Overall Statistics
Total Orders
528
Average Win
1.50%
Average Loss
-1.50%
Compounding Annual Return
-3.295%
Drawdown
45.400%
Expectancy
-0.103
Start Equity
100000
End Equity
60472.16
Net Profit
-39.528%
Sharpe Ratio
-0.761
Sortino Ratio
-0.589
Probabilistic Sharpe Ratio
0.000%
Loss Rate
55%
Win Rate
45%
Profit-Loss Ratio
1.00
Alpha
0
Beta
0
Annual Standard Deviation
0.051
Annual Variance
0.003
Information Ratio
-0.424
Tracking Error
0.051
Treynor Ratio
0
Total Fees
$1304.16
Estimated Strategy Capacity
$16000000000.00
Lowest Capacity Asset
ZB YQXHW27VDWYT
Portfolio Turnover
15.91%
Drawdown Recovery
154
# region imports
from AlgorithmImports import *
import numpy as np
from collections import deque
# endregion

# ======================================================================
# Curve Spread / Butterfly Mean-Reversion — CME Treasury FUTURES on IBKR
# ----------------------------------------------------------------------
# IBKR version. On Interactive Brokers you trade the real CME futures
# (ZT/ZF/ZN/ZB), not CFDs, so vs the OANDA build:
#   * Instruments are exchange futures via AddFuture (continuous contracts).
#   * NO CFD financing model — futures carry is in the roll, which the
#     BackwardsRatio continuous contract already captures. Adding a daily
#     financing charge here would double-count carry.
#   * IB commissions + futures margin are modelled by the IB brokerage model.
#   * Rolls are real: the reconciler flattens rolled-out contracts and
#     rebuilds the position on the new front month.
#   * Regression runs on DOLLAR values (price x point value), so ZT's
#     $2,000/point (vs $1,000 for ZF/ZN/ZB) is handled and the hedge ratios
#     are correctly DV01-neutral across tenors.
#   * IB accepts MarketOnOpen, so orders are plain market orders (no buffer).
#
# Leg convention: leg_a = DEPENDENT (regressed on others); residual = spread.
#   2-leg = two tenors (e.g. 10 vs 30); 3-leg = belly(leg_a) + two wings (fly).
# Flip use_regime_filter 0/1 across two backtests for the filter on/off curves.
#
# Not investment advice. Backtested results are not live results.
# ======================================================================

# tenor -> (LEAN future symbol, point value $/point)
TENOR = {
    "2":  (Futures.Financials.Y2TreasuryNote,  2000.0),   # ZT: $200k face
    "5":  (Futures.Financials.Y5TreasuryNote,  1000.0),   # ZF
    "10": (Futures.Financials.Y10TreasuryNote, 1000.0),   # ZN
    "30": (Futures.Financials.Y30TreasuryBond, 1000.0),   # ZB
}


class CurveSpreadIBKR(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2010, 1, 1)
        self.SetEndDate(2024, 12, 31)
        self.SetCash(100_000)
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)

        a = self.GetParameter("leg_a") or "10"
        b = self.GetParameter("leg_b") or "30"
        c = self.GetParameter("leg_c") or ""           # "" -> 2-leg; set for a fly
        tenors = [a, b] + ([c] if c else [])

        common = dict(dataNormalizationMode=DataNormalizationMode.BackwardsRatio,
                      dataMappingMode=DataMappingMode.OpenInterest, contractDepthOffset=0)
        self.futures, self.pv = [], []
        for t in tenors:
            sym, pv = TENOR[t]
            f = self.AddFuture(sym, Resolution.Daily, **common)
            f.SetFilter(0, 120)
            self.futures.append(f)
            self.pv.append(pv)
        self.Log(f"Legs (dependent first): {[TENOR[t][0] for t in tenors]}")

        self.use_regime_filter = int(self.GetParameter("use_regime_filter", 1)) == 1
        self.entry_z   = float(self.GetParameter("entry_z", 2.0))
        self.exit_z    = float(self.GetParameter("exit_z", 0.5))
        self.stop_z    = float(self.GetParameter("stop_z", 3.8))
        self.reg_window= int(self.GetParameter("reg_window", 60))
        self.hl_mult   = float(self.GetParameter("hl_window_mult", 3.0))
        self.min_win   = int(self.GetParameter("min_window", 15))
        self.trend_lb  = int(self.GetParameter("trend_lookback", 20))
        self.trend_th  = float(self.GetParameter("trend_strength", 1.0))
        self.gross_mult = float(self.GetParameter("gross_notional_mult", 1.0))  # gross notional / equity

        maxlen = self.reg_window + 10
        self.px = [deque(maxlen=maxlen) for _ in self.futures]   # DOLLAR-value series (price * pv)
        self.in_trade = False
        self.target_units = None       # list of desired NET contracts per leg; None = flat
        self._logged = False
        self.SetWarmUp(maxlen, Resolution.Daily)

    def OnData(self, slice: Slice):
        if not all(self.Securities[f.Symbol].HasData and self.Securities[f.Symbol].Price > 0
                   for f in self.futures):
            return
        for i, f in enumerate(self.futures):
            self.px[i].append(float(self.Securities[f.Symbol].Price) * self.pv[i])
        if self.IsWarmingUp or len(self.px[0]) < self.reg_window:
            return

        if not self._logged:
            for f in self.futures:
                self.Log(f"data check {f.Symbol.Value}: HasData={self.Securities[f.Symbol].HasData}")
            self._logged = True

        cols = [np.array(p, dtype=float)[-self.reg_window:] for p in self.px]
        coefs, _, resid = self._ols_multi(cols[0], cols[1:])
        if coefs is None:
            return
        hl = self._ou_half_life(resid)
        zwin = int(np.clip(self.hl_mult * hl, self.min_win, self.reg_window)) \
            if (hl is not None and np.isfinite(hl) and hl > 1) else self.reg_window
        seg = resid[-zwin:]
        sd = seg.std()
        if sd == 0:
            return
        z = (resid[-1] - seg.mean()) / sd

        in_trend = False
        if self.use_regime_filter:
            slope = self._ema_slope(resid, self.trend_lb, self.trend_lb)
            dres = np.diff(resid)
            dvol = dres[-zwin:].std() if len(dres) >= 2 else sd
            denom = dvol * np.sqrt(self.trend_lb)
            in_trend = (abs(slope) / denom > self.trend_th) if denom > 0 else False

        is_flat = not any(h.Invested for h in self.Portfolio.Values)

        if not self.in_trade and is_flat:
            if not in_trend and z > self.entry_z:
                tu = self._freeze(-1, coefs)
                if tu is not None:
                    self.target_units, self.in_trade = tu, True
            elif not in_trend and z < -self.entry_z:
                tu = self._freeze(+1, coefs)
                if tu is not None:
                    self.target_units, self.in_trade = tu, True
        elif self.in_trade:
            if abs(z) < self.exit_z or abs(z) > self.stop_z or (self.use_regime_filter and in_trend):
                self.target_units, self.in_trade = None, False

        self._reconcile()
        self.Plot("Signal", "z", float(z))

    # per-leg contract targets sized to a gross-notional fraction of equity.
    # coefs are dollar-DV01 ratios -> integer-rounded contracts are DV01-neutral.
    def _freeze(self, direction, coefs):
        p = [self.px[i][-1] for i in range(len(self.futures))]   # last dollar values
        gross_per_unit = p[0] + sum(abs(c) * p[j + 1] for j, c in enumerate(coefs))
        if gross_per_unit <= 0:
            return None
        u = max(1, int(self.Portfolio.TotalPortfolioValue * self.gross_mult / gross_per_unit))
        return [direction * u] + [int(round(-direction * c * u)) for c in coefs]

    # drive holdings toward target on the CURRENT front month; flatten rolled-out
    # contracts (roll) and flatten everything when target is None (exit).
    def _reconcile(self):
        desired = {}
        if self.target_units is not None:
            for i, f in enumerate(self.futures):
                m = f.Mapped
                if m is not None:
                    desired[m] = desired.get(m, 0) + self.target_units[i]
        syms = set(desired.keys())
        for h in self.Portfolio.Values:
            if h.Invested:
                syms.add(h.Symbol)
        if not syms:
            return
        self.Transactions.CancelOpenOrders()
        for sym in syms:
            delta = desired.get(sym, 0) - float(self.Portfolio[sym].Quantity)
            if abs(delta) >= 1:
                self.MarketOrder(sym, int(round(delta)))

    # ---- numerics ----
    @staticmethod
    def _ols_multi(y, regressors):
        n = len(y)
        A = np.column_stack(list(regressors) + [np.ones(n)])
        try:
            sol, *_ = np.linalg.lstsq(A, y, rcond=None)
        except Exception:
            return None, None, None
        return sol[:-1], sol[-1], y - A @ sol

    @staticmethod
    def _ou_half_life(s):
        if len(s) < 10:
            return None
        ds, lag = np.diff(s), s[:-1]
        A = np.vstack([lag, np.ones_like(lag)]).T
        try:
            theta = np.linalg.lstsq(A, ds, rcond=None)[0][0]
        except Exception:
            return None
        if theta >= 0:
            return None
        phi = 1.0 + theta
        return -np.log(2) / np.log(phi) if phi > 0 else None

    @staticmethod
    def _ema_slope(s, span, lookback):
        a = 2.0 / (span + 1.0)
        e = s[0]
        out = np.empty_like(s)
        for i, v in enumerate(s):
            e = a * v + (1 - a) * e
            out[i] = e
        return 0.0 if len(out) <= lookback else out[-1] - out[-1 - lookback]