| Overall Statistics | |
|---|---|
| Total Orders | 0 |
| Average Win | 0% |
| Average Loss | 0% |
| Compounding Annual Return | 0% |
| Drawdown | 0% |
| Expectancy | 0 |
| Start Equity | 100000 |
| End Equity | 100000 |
| Net Profit | 0% |
| Sharpe Ratio | 0 |
| Sortino Ratio | 0 |
| Probabilistic Sharpe Ratio | 0% |
| Loss Rate | 0% |
| Win Rate | 0% |
| Profit-Loss Ratio | 0 |
| Alpha | 0 |
| Beta | 0 |
| Annual Standard Deviation | 0 |
| Annual Variance | 0 |
| Information Ratio | -0.702 |
| Tracking Error | 0.142 |
| Treynor Ratio | 0 |
| Total Fees | $0.00 |
| Estimated Strategy Capacity | $0 |
| Lowest Capacity Asset | |
| Portfolio Turnover | 0% |
| Drawdown Recovery | 0 |
from AlgorithmImports import *
from datetime import timedelta
class FutureCLGCInflationHedge(QCAlgorithm):
    """Mean-reversion trade on the gold / crude-oil (GC / CL) futures price ratio.

    Once a day the GC/CL close ratio is appended to a 30-observation window and
    a z-score of the newest ratio against that window is computed.  A z-score
    above +2 opens a short-ratio trade (short GC, long CL); a z-score below -2
    opens a long-ratio trade (long GC, short CL).  The trade is closed when the
    z-score crosses back through zero.  The CL leg is scaled by a hedge ratio
    that is re-fitted monthly from the same 30-observation price windows.

    Signals are computed from the continuous-contract prices, but orders are
    sent to the currently *mapped* contracts: the canonical continuous symbols
    returned by ``add_future`` are not tradable.
    """

    def initialize(self):
        """Set the backtest window and cash, subscribe to data, and schedule the checks."""
        self.set_start_date(self.end_date - timedelta(5*365))
        self.set_cash(100000)
        # Add continuous futures (used for pricing; orders go to `.mapped`).
        self._gc = self.add_future("GC", Resolution.DAILY)
        self._cl = self.add_future("CL", Resolution.DAILY)
        # Rolling windows holding the 30 most recent observations.
        self._gc_prices = RollingWindow[float](30)
        self._cl_prices = RollingWindow[float](30)
        self._ratio_window = RollingWindow[float](30)
        self._hedge_ratio = 1.0
        self._state = 0  # 0=flat, 1=long ratio, -1=short ratio
        # Contracts the open trade was placed in (None while flat).
        self._gc_contract = None
        self._cl_contract = None
        # Schedule daily signal check before market open.
        # The subscribed symbol is passed explicitly so the rule does not
        # depend on resolving the bare ticker string.
        self.schedule.on(
            self.date_rules.every_day(self._gc.symbol),
            self.time_rules.at(7, 0),
            self._check_signals
        )
        # Monthly hedge ratio re-fit before market open.
        self.schedule.on(
            self.date_rules.month_start(self._gc.symbol),
            self.time_rules.at(7, 15),
            self._refit_hedge_ratio
        )

    def _refit_hedge_ratio(self):
        """Re-estimate the hedge ratio as the least-squares slope of GC on CL.

        Does nothing until both price windows hold 30 observations, and keeps
        the previous ratio when CL shows no variation in the window.
        """
        if self._gc_prices.count < 30 or self._cl_prices.count < 30:
            return
        gc_list = list(self._gc_prices)
        cl_list = list(self._cl_prices)
        n = len(gc_list)
        mean_gc = sum(gc_list) / n
        mean_cl = sum(cl_list) / n
        # Both windows are filled in lockstep, so pairing by position is valid;
        # ordering is irrelevant for covariance and variance.
        cov = sum((g - mean_gc) * (c - mean_cl) for g, c in zip(gc_list, cl_list))
        var_cl = sum((c - mean_cl) ** 2 for c in cl_list)
        if var_cl != 0:
            # NOTE(review): this slope is in price units (GC points per CL
            # point) yet is later applied to portfolio weights - confirm that
            # this sizing is intended.
            self._hedge_ratio = cov / var_cl

    def _mapped_contracts(self):
        """Return the (GC, CL) contracts currently mapped, or None if either is missing."""
        gc_contract = self._gc.mapped
        cl_contract = self._cl.mapped
        if gc_contract is None or cl_contract is None:
            return None
        return gc_contract, cl_contract

    def _check_signals(self):
        """Record today's prices, compute the ratio z-score, and act on it."""
        gc_price = self.securities[self._gc.symbol].close
        cl_price = self.securities[self._cl.symbol].close
        if gc_price == 0 or cl_price == 0:
            return  # no price yet for at least one leg
        ratio = gc_price / cl_price
        self._gc_prices.add(gc_price)
        self._cl_prices.add(cl_price)
        self._ratio_window.add(ratio)
        if self._ratio_window.count < 30:
            return
        ratios = list(self._ratio_window)
        mean_ratio = sum(ratios) / len(ratios)
        variance = sum((r - mean_ratio) ** 2 for r in ratios) / len(ratios)
        std_ratio = variance ** 0.5
        if std_ratio == 0:
            return
        z_score = (ratio - mean_ratio) / std_ratio

        if self._state == 0:
            # Entry logic: +2 -> short the ratio, -2 -> long the ratio.
            if z_score > 2:
                direction = -1
            elif z_score < -2:
                direction = 1
            else:
                return
            # Only commit to the state once there are contracts to trade;
            # otherwise the strategy would believe it is in a position that
            # was never opened.
            if self._mapped_contracts() is None:
                return
            self._state = direction
            self._enter_positions()
        elif (self._state == 1 and z_score >= 0) or (self._state == -1 and z_score <= 0):
            # Exit when z crosses 0.
            self._exit_positions()
        else:
            self._roll_positions_if_needed()

    def _enter_positions(self):
        """Set target weights on the mapped contracts according to ``self._state``.

        The GC leg follows the sign of the state and the CL leg always takes
        the opposite sign.  Weights are normalised so their absolute values sum
        to 1.
        """
        contracts = self._mapped_contracts()
        if contracts is None:
            return
        gc_contract, cl_contract = contracts
        gc_target = 0.5 * self._state
        # The magnitude of the hedge ratio sizes the CL leg; its sign is
        # ignored so a negative fit cannot put both legs on the same side.
        cl_target = -0.5 * self._state * abs(self._hedge_ratio)
        gross = abs(gc_target) + abs(cl_target)
        if gross > 0:
            gc_target = gc_target / gross
            cl_target = cl_target / gross
        self.set_holdings(gc_contract, gc_target)
        self.set_holdings(cl_contract, cl_target)
        self._gc_contract = gc_contract
        self._cl_contract = cl_contract

    def _exit_positions(self):
        """Liquidate the contracts of the open trade and return to the flat state."""
        for contract in (self._gc_contract, self._cl_contract):
            if contract is not None:
                self.liquidate(contract)
        self._gc_contract = None
        self._cl_contract = None
        self._state = 0

    def _roll_positions_if_needed(self):
        """Move an open trade to the newly mapped contracts after a roll."""
        contracts = self._mapped_contracts()
        if contracts is None:
            return
        held = (self._gc_contract, self._cl_contract)
        stale = [
            old for old, new in zip(held, contracts)
            if old is not None and old != new
        ]
        if not stale:
            return
        for contract in stale:
            self.liquidate(contract)
        # Re-establish both legs on the current contracts.
        self._enter_positions()

    def on_data(self, data: Slice):
        """Unused: all logic runs from the scheduled events."""
        pass