Overall Statistics
Total Orders
0
Average Win
0%
Average Loss
0%
Compounding Annual Return
0%
Drawdown
0%
Expectancy
0
Start Equity
100000
End Equity
100000
Net Profit
0%
Sharpe Ratio
0
Sortino Ratio
0
Probabilistic Sharpe Ratio
0%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0
Beta
0
Annual Standard Deviation
0
Annual Variance
0
Information Ratio
-0.702
Tracking Error
0.142
Treynor Ratio
0
Total Fees
$0.00
Estimated Strategy Capacity
$0
Lowest Capacity Asset
Portfolio Turnover
0%
Drawdown Recovery
0
from AlgorithmImports import *
from datetime import timedelta


class FutureCLGCInflationHedge(QCAlgorithm):

    def initialize(self):
        self.set_start_date(self.end_date - timedelta(5*365))
        self.set_cash(100000)
        
        # Add continuous futures
        self._gc = self.add_future("GC", Resolution.DAILY)
        self._cl = self.add_future("CL", Resolution.DAILY)
        
        # Rolling windows for 30-day history
        self._gc_prices = RollingWindow[float](30)
        self._cl_prices = RollingWindow[float](30)
        self._ratio_window = RollingWindow[float](30)
        
        self._hedge_ratio = 1.0
        self._state = 0  # 0=flat, 1=long ratio, -1=short ratio
        
        # Schedule daily signal check before market open
        self.schedule.on(
            self.date_rules.every_day("GC"),
            self.time_rules.at(7, 0),
            self._check_signals
        )
        
        # Monthly hedge ratio re-fit before market open
        self.schedule.on(
            self.date_rules.month_start("GC"),
            self.time_rules.at(7, 15),
            self._refit_hedge_ratio
        )

    def _refit_hedge_ratio(self):
        if self._gc_prices.count < 30 or self._cl_prices.count < 30:
            return
        
        gc_list = list(self._gc_prices)[::-1]
        cl_list = list(self._cl_prices)[::-1]
        n = len(gc_list)
        
        mean_gc = sum(gc_list) / n
        mean_cl = sum(cl_list) / n
        
        cov = sum((gc_list[i] - mean_gc) * (cl_list[i] - mean_cl) for i in range(n))
        var_cl = sum((cl_list[i] - mean_cl) ** 2 for i in range(n))
        
        if var_cl != 0:
            self._hedge_ratio = cov / var_cl

    def _check_signals(self):
        gc_price = self.securities[self._gc.symbol].close
        cl_price = self.securities[self._cl.symbol].close
        
        if gc_price == 0 or cl_price == 0:
            return
            
        ratio = gc_price / cl_price
        
        self._gc_prices.add(gc_price)
        self._cl_prices.add(cl_price)
        self._ratio_window.add(ratio)
        
        if self._ratio_window.count < 30:
            return
            
        ratios = list(self._ratio_window)[::-1]
        mean_ratio = sum(ratios) / len(ratios)
        variance = sum((r - mean_ratio) ** 2 for r in ratios) / len(ratios)
        std_ratio = variance ** 0.5
        
        if std_ratio == 0:
            return
            
        z_score = (ratio - mean_ratio) / std_ratio
        
        # Entry logic
        if self._state == 0:
            if z_score > 2:
                # Ratio is high, short GC long CL (short the ratio)
                self._state = -1
                self._enter_positions()
            elif z_score < -2:
                # Ratio is low, long GC short CL (long the ratio)
                self._state = 1
                self._enter_positions()
        else:
            # Exit when z crosses 0
            if (self._state == 1 and z_score >= 0) or (self._state == -1 and z_score <= 0):
                self.liquidate(self._gc.symbol)
                self.liquidate(self._cl.symbol)
                self._state = 0

    def _enter_positions(self):
        gc_target = 0.5 * self._state
        cl_target = -0.5 * self._state * self._hedge_ratio
        
        gross = abs(gc_target) + abs(cl_target)
        if gross > 0:
            gc_target = gc_target / gross
            cl_target = cl_target / gross
            
        self.set_holdings(self._gc.symbol, gc_target)
        self.set_holdings(self._cl.symbol, cl_target)

    def on_data(self, data: Slice):
        pass