from AlgorithmImports import *
import os
import sys
import shutil
import subprocess
from datetime import timedelta
class AutoGluonRayRepro(QCAlgorithm):
def initialize(self) -> None:
self.set_start_date(2025, 12, 20)
self.set_end_date(2025, 12, 22)
self.set_cash(100000)
#HAVE A LOOK HERE
# When set to True, uses a fit configuration that avoids Ray usage inside AutoGluon -> the fit will work
# When set to False, uses the requested fit configuration that triggers Ray usage (default in AutoGluon) -> the fit will crash in QC Cloud
self.prevent_ray_in_autogluon_fit = False
# No trading needed, but add at least one security so the algorithm is "normal"
self.add_equity("SPY", Resolution.DAILY)
# Warmup not strictly needed, but it makes it easy to run once deterministically
self.set_warm_up(timedelta(days=1))
self._ran = False
self.debug("Initialize complete. Waiting for warmup to finish...")
def on_data(self, data: Slice) -> None: # type: ignore
# Run exactly once after warmup
if self.is_warming_up or self._ran:
return
self._ran = True
self.debug("Warmup finished. Starting AutoGluon reproduction...")
try:
self._log_environment_diagnostics()
# Import inside the function to keep the repro self-contained
import pandas as pd
import numpy as np
from autogluon.tabular import TabularPredictor
# ---- Minimal synthetic dataset ----
# Keep it small but non-trivial.
n = 300
rng = np.random.default_rng(123)
df = pd.DataFrame({
"f1": rng.normal(size=n),
"f2": rng.normal(size=n),
"f3": rng.integers(0, 5, size=n),
})
# A simple numeric regression target
df["label"] = 0.3 * df["f1"] - 0.7 * df["f2"] + 0.1 * df["f3"] + rng.normal(scale=0.1, size=n)
train_df = df.iloc[:240].reset_index(drop=True)
test_df = df.iloc[240:].reset_index(drop=True)
# Use a writable path in QC environment
model_path = os.path.join(os.getcwd(), "ag_repro_model")
self.debug(f"About to fit AutoGluon. model_path={model_path}")
predictor = TabularPredictor(
label="label",
problem_type="regression",
eval_metric="spearmanr",
path=model_path,
log_to_file=True,
)
# ---- The minimal fit configuration requested ----
predictor = predictor.fit(
train_data=train_df,
presets="high_quality",
hyperparameters={"GBM": {}},
# Try to keep it as sequential/simple as possible and disable Ray logging
ds_args={"memory_safe_fits": False, "enable_ray_logging": False} if self.prevent_ray_in_autogluon_fit else {},
# This avvoids using ray for ensemble fitting
ag_args_ensemble={"fold_fitting_strategy": "sequential_local"} if self.prevent_ray_in_autogluon_fit else {},
num_cpus="auto", # makes it more deterministic + reduces parallelism
verbosity=2,
)
self.debug("AutoGluon fit completed. Running a tiny predict step...")
_ = predictor.predict(test_df.drop(columns=["label"]))
self.debug("Predict step completed. Repro finished successfully (no crash).")
except BaseException as e:
# Dump exception so QC support sees it in logs
self.error(f"AutoGluon reproduction failed with exception: {repr(e)}")
import traceback
self.error(traceback.format_exc())
finally:
# Stop quickly to keep logs small
self.debug("Quitting algorithm.")
self.quit()
def _log_environment_diagnostics(self) -> None:
"""Logs the key signals that show Ray version/environment mismatch in QC Cloud."""
self.debug(f"sys.executable: {sys.executable}")
self.debug(f"PATH (prefix): {os.environ.get('PATH', '')[:200]}...")
# Ray import diagnostics (may be different from ray CLI)
try:
import ray
self.debug(f"ray imported version: {ray.__version__}")
self.debug(f"ray.__file__: {ray.__file__}")
except BaseException as e:
self.error(f"Failed to import ray: {repr(e)}")
# CLI diagnostics
try:
which_ray = shutil.which("ray")
self.debug(f"which ray: {which_ray}")
except BaseException as e:
self.error(f"Failed to locate ray CLI: {repr(e)}")
try:
p = subprocess.run(["ray", "--version"], text=True, capture_output=True)
self.debug(f"ray --version rc: {p.returncode}")
self.debug(f"ray --version stdout: {p.stdout.strip()[:300]}")
self.debug(f"ray --version stderr: {p.stderr.strip()[:300]}")
except BaseException as e:
self.error(f"Failed to run 'ray --version': {repr(e)}")
# Optional deps check
try:
import pkgutil
import aiohttp_cors
self.debug(f"aiohttp_cors import ok: {aiohttp_cors.__file__}")
self.debug(f"aiohttp_cors present? {pkgutil.find_loader('aiohttp_cors') is not None}")
except BaseException as e:
self.error(f"aiohttp_cors import check failed: {repr(e)}")