Overall Statistics
Total Orders
1
Average Win
0%
Average Loss
0%
Compounding Annual Return
-98.729%
Drawdown
20.700%
Expectancy
0
Start Equity
100000
End Equity
87020.25
Net Profit
-12.980%
Sharpe Ratio
-1.1
Sortino Ratio
-1.98
Probabilistic Sharpe Ratio
20.372%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
-0.002
Beta
0.999
Annual Standard Deviation
0.872
Annual Variance
0.761
Information Ratio
-0.258
Tracking Error
0.007
Treynor Ratio
-0.96
Total Fees
$1.77
Estimated Strategy Capacity
$4100000000.00
Lowest Capacity Asset
SPY R735QTJ8XC9X
Portfolio Turnover
8.58%
Drawdown Recovery
1
from AlgorithmImports import *
from parser import MarketRegimeSignal


class MarketRegimeAlgorithm(QCAlgorithm):
    """Trade SPY from a custom "REGIME" signal feed.

    Goes fully long SPY when the latest signal's regime is "risk_on" and
    liquidates SPY when it is "risk_off". Any other regime value leaves the
    position unchanged.
    """

    def initialize(self) -> None:
        """Configure the backtest window, cash, and data subscriptions."""
        self.set_start_date(2020, 3, 2)
        self.set_end_date(2020, 3, 13)
        self.set_cash(100_000)
        # add_equity returns the security object; orders below use its .symbol.
        self._spy = self.add_equity("SPY", Resolution.DAILY)
        # Store the symbol to reference it later
        self._regime_symbol = self.add_data(MarketRegimeSignal, "REGIME", Resolution.DAILY).symbol
        self.log(f"Initialized Custom Data: {self._regime_symbol.value}")

    def on_data(self, slice: Slice) -> None:
        """Log the slice contents and act on the regime signal if present.

        Args:
            slice: The data delivered for the current time step.
        """
        self.log(f"on_data: {self.time}")
        slice_keys = [str(key) for key in slice.keys()]
        self.log(f"slice has: {slice_keys}")

        # Test presence with the same Symbol that is used for the lookup,
        # rather than relying on the ticker string being resolved to it.
        if not slice.contains_key(self._regime_symbol):
            return

        self.log("Success")
        data = slice[self._regime_symbol]
        self.debug(f"Check data: {data.time.date()} | Regime: {data.regime} | Score: {data.score}")

        # Trading logic: pass the Symbol explicitly instead of the security
        # object so the calls do not depend on implicit conversion.
        spy_symbol = self._spy.symbol
        if data.regime == "risk_on":
            self.set_holdings(spy_symbol, 1.0)
        elif data.regime == "risk_off":
            self.liquidate(spy_symbol)
from AlgorithmImports import *

class MarketRegimeSignal(PythonData):
    """Custom data type holding one market-regime signal per CSV row.

    Each row must have at least six comma-separated columns, in order:
    date (YYYY-MM-DD), ticker, score (float), regime, is_confirmed
    ("true"/anything else), note.
    """

    def get_source(self, config, date, is_live_mode):
        """Return the Object Store source of the signal file.

        The same source is returned regardless of ``date``.
        """
        # NOTE(review): the original author states LEAN decompresses the
        # .zip automatically when FileFormat.CSV is used - confirm for
        # Object Store sources.
        return SubscriptionDataSource(
            "market-regime-signals.zip",
            SubscriptionTransportMedium.OBJECT_STORE,
            FileFormat.CSV
        )

    def reader(self, config, line, date, is_live_mode):
        """Parse one CSV line into a ``MarketRegimeSignal``.

        Args:
            config: Subscription configuration; supplies the symbol.
            line: One raw text line from the source file.
            date: Date supplied by the engine (not used for filtering).
            is_live_mode: Unused.

        Returns:
            A populated ``MarketRegimeSignal``, or ``None`` for blank lines,
            rows with fewer than six columns, and rows whose first column is
            not a ``YYYY-MM-DD`` date (which includes the header row).

        Raises:
            ValueError: If the score column is not a valid float.
        """
        if not line.strip():
            return None

        fields = [field.strip() for field in line.split(',')]
        if len(fields) < 6:
            return None

        # Identify header/malformed rows by the date column itself. A
        # substring search for "date" over the whole line would also drop
        # valid rows whose note merely contains that text (e.g. "updated").
        try:
            row_date = datetime.strptime(fields[0], "%Y-%m-%d")
        except ValueError:
            return None

        # Rows are deliberately NOT filtered against `date`: the single file
        # holds every date, and discarding rows whose date differs from
        # `date` throws signals away.
        # NOTE(review): this relies on the engine releasing each point only
        # once its end_time has been reached - confirm against the
        # QuantConnect custom data documentation.
        data = MarketRegimeSignal()
        data.symbol = config.symbol
        data.time = row_date
        # The signal spans the whole day, so it ends one day after it starts.
        data.end_time = data.time + timedelta(days=1)
        data.ticker = fields[1]
        data.score = float(fields[2])
        data.regime = fields[3]
        data.is_confirmed = fields[4].lower() == 'true'
        data.note = fields[5]
        data.value = data.score
        return data