# Overall Statistics  (stray pasted header; kept as a comment so the module parses)
from AlgorithmImports import *
import os
import sys
import shutil
import subprocess
from datetime import timedelta


class AutoGluonRayRepro(QCAlgorithm):
    """Minimal algorithm that reproduces an AutoGluon fit failure involving Ray.

    The algorithm does no trading. After warm-up it runs exactly once:
    it logs environment diagnostics (Python executable, Ray module/CLI
    versions, ``aiohttp_cors`` availability), fits a small AutoGluon
    ``TabularPredictor`` on synthetic data, runs one prediction, logs any
    exception with its traceback, and then quits.

    Toggle ``prevent_ray_in_autogluon_fit`` in :meth:`initialize` to switch
    between the two fit configurations being compared.
    """

    def initialize(self) -> None:
        """Configure the backtest window, the repro switch and the run-once flag."""
        self.set_start_date(2025, 12, 20)
        self.set_end_date(2025, 12, 22)
        self.set_cash(100000)

        # --- Repro switch -------------------------------------------------
        # True:  pass fit options intended to keep AutoGluon from using Ray;
        #        per the original report, the fit then succeeds.
        # False: leave those options empty (AutoGluon defaults); per the
        #        original report, the fit then crashes in QC Cloud.
        self.prevent_ray_in_autogluon_fit = False

        # No trading needed, but add at least one security so the algorithm is "normal"
        self.add_equity("SPY", Resolution.DAILY)
        # Warmup not strictly needed, but it makes it easy to run once deterministically
        self.set_warm_up(timedelta(days=1))
        # Guards on_data so the reproduction is executed only once.
        self._ran = False
        self.debug("Initialize complete. Waiting for warmup to finish...")

    def on_data(self, data: Slice) -> None:  # type: ignore
        """Run the reproduction once after warm-up, then quit the algorithm.

        Args:
            data: The current data slice; not used by the reproduction.
        """
        # Run exactly once after warmup
        if self.is_warming_up or self._ran:
            return

        self._ran = True
        self.debug("Warmup finished. Starting AutoGluon reproduction...")

        try:
            self._log_environment_diagnostics()
            self._run_autogluon_repro()
        except BaseException as e:
            # Deliberately broad: this is the top-level boundary of the repro
            # and every failure must reach the logs. The algorithm quits in
            # the ``finally`` clause regardless, so nothing keeps running
            # after a swallowed exception.
            import traceback

            self.error(f"AutoGluon reproduction failed with exception: {repr(e)}")
            self.error(traceback.format_exc())
        finally:
            # Stop quickly to keep logs small
            self.debug("Quitting algorithm.")
            self.quit()

    @staticmethod
    def _build_synthetic_frames(n_rows: int = 300, n_train: int = 240, seed: int = 123):
        """Build a small synthetic regression dataset and split it.

        Args:
            n_rows: Total number of rows to generate.
            n_train: Number of leading rows used for training; the remaining
                rows form the test set.
            seed: Seed for the NumPy random generator.

        Returns:
            A ``(train_df, test_df)`` tuple of DataFrames with columns
            ``f1``, ``f2``, ``f3`` and ``label``, each with a fresh index.
        """
        # Imported here to keep the repro self-contained.
        import numpy as np
        import pandas as pd

        rng = np.random.default_rng(seed)
        # The draw order (f1, f2, f3, then noise) determines the generated
        # values for a given seed, so it must stay as is.
        frame = pd.DataFrame({
            "f1": rng.normal(size=n_rows),
            "f2": rng.normal(size=n_rows),
            "f3": rng.integers(0, 5, size=n_rows),
        })
        # A simple numeric regression target: linear in the features plus noise.
        noise = rng.normal(scale=0.1, size=n_rows)
        frame["label"] = 0.3 * frame["f1"] - 0.7 * frame["f2"] + 0.1 * frame["f3"] + noise

        train_df = frame.iloc[:n_train].reset_index(drop=True)
        test_df = frame.iloc[n_train:].reset_index(drop=True)
        return train_df, test_df

    def _run_autogluon_repro(self) -> None:
        """Fit AutoGluon on the synthetic data and run one predict call.

        Exceptions are not handled here; they propagate to the caller, which
        logs them.
        """
        # Imported here to keep the repro self-contained.
        from autogluon.tabular import TabularPredictor

        train_df, test_df = self._build_synthetic_frames()

        # Use a writable path in QC environment
        model_path = os.path.join(os.getcwd(), "ag_repro_model")
        self.debug(f"About to fit AutoGluon. model_path={model_path}")

        predictor = TabularPredictor(
            label="label",
            problem_type="regression",
            eval_metric="spearmanr",
            path=model_path,
            log_to_file=True,
        )

        if self.prevent_ray_in_autogluon_fit:
            # Keep the fit as sequential/simple as possible and disable Ray logging.
            ds_args = {"memory_safe_fits": False, "enable_ray_logging": False}
            # This avoids using Ray for ensemble (fold) fitting.
            ag_args_ensemble = {"fold_fitting_strategy": "sequential_local"}
        else:
            # Empty dicts: no overrides, AutoGluon uses its own defaults.
            ds_args = {}
            ag_args_ensemble = {}

        # ---- The minimal fit configuration requested ----
        predictor = predictor.fit(
            train_data=train_df,
            presets="high_quality",
            hyperparameters={"GBM": {}},
            ds_args=ds_args,
            ag_args_ensemble=ag_args_ensemble,
            # NOTE: "auto" leaves the CPU count up to AutoGluon; it does not
            # by itself restrict parallelism.
            num_cpus="auto",
            verbosity=2,
        )

        self.debug("AutoGluon fit completed. Running a tiny predict step...")
        predictor.predict(test_df.drop(columns=["label"]))
        self.debug("Predict step completed. Repro finished successfully (no crash).")

    def _log_environment_diagnostics(self) -> None:
        """Log signals that help spot a Ray version/environment mismatch.

        Each check is independent and best-effort: a failure is logged via
        ``self.error`` and the remaining checks still run.
        """
        self.debug(f"sys.executable: {sys.executable}")
        self.debug(f"PATH (prefix): {os.environ.get('PATH', '')[:200]}...")

        # Ray import diagnostics (may be different from ray CLI)
        try:
            import ray

            self.debug(f"ray imported version: {ray.__version__}")
            self.debug(f"ray.__file__: {ray.__file__}")
        except Exception as e:
            self.error(f"Failed to import ray: {repr(e)}")

        # CLI diagnostics
        try:
            which_ray = shutil.which("ray")
            self.debug(f"which ray: {which_ray}")
        except Exception as e:
            self.error(f"Failed to locate ray CLI: {repr(e)}")

        try:
            # The timeout keeps a hanging CLI from blocking the algorithm;
            # a TimeoutExpired is reported by the handler below.
            completed = subprocess.run(
                ["ray", "--version"],
                text=True,
                capture_output=True,
                check=False,
                timeout=30,
            )
            self.debug(f"ray --version rc: {completed.returncode}")
            self.debug(f"ray --version stdout: {completed.stdout.strip()[:300]}")
            self.debug(f"ray --version stderr: {completed.stderr.strip()[:300]}")
        except Exception as e:
            self.error(f"Failed to run 'ray --version': {repr(e)}")

        # Optional deps check
        try:
            import importlib.util

            # Look the module up *before* importing it, so the presence line
            # is logged even when the import itself fails.
            spec = importlib.util.find_spec("aiohttp_cors")
            self.debug(f"aiohttp_cors present? {spec is not None}")

            import aiohttp_cors

            self.debug(f"aiohttp_cors import ok: {aiohttp_cors.__file__}")
        except Exception as e:
            self.error(f"aiohttp_cors import check failed: {repr(e)}")