| Overall Statistics |
|
Total Orders 3714 Average Win 0.48% Average Loss -0.44% Compounding Annual Return 11.240% Drawdown 21.500% Expectancy 0.260 Start Equity 75000 End Equity 424406.53 Net Profit 465.875% Sharpe Ratio 0.574 Sortino Ratio 0.439 Probabilistic Sharpe Ratio 8.622% Loss Rate 40% Win Rate 60% Profit-Loss Ratio 1.10 Alpha 0 Beta 0 Annual Standard Deviation 0.109 Annual Variance 0.012 Information Ratio 0.761 Tracking Error 0.109 Treynor Ratio 0 Total Fees €0.00 Estimated Strategy Capacity €61000.00 Lowest Capacity Asset XAUUSD 8I Portfolio Turnover 40.71% Drawdown Recovery 798 |
# region imports
from AlgorithmImports import *
from dataclasses import dataclass, field
from typing import Optional, Literal, List
from datetime import datetime, timedelta
# endregion
# ============================================================================
# v12 (2026-04-10): Custom data for yfinance-style indicators
#
# The deployed signal configs (sharefolder/strategyconfigs/*.json) use
# source_type: yfinance_ratio for futures (GC=F, CL=F, HG=F) and ETFs
# (XLV, SPY, EFA), and deribit_volatility_index for BTC DVOL.
#
# QC's native futures chain + equity feed produces z-scores that differ from
# yfinance's continuous futures + ETF closes by ~0.3-0.5 z-score units
# (proven 2026-04-10: yfinance-recomputed C064 z-score matches offline to
# 5 decimal places; QC-native C064 z differs by mean abs 0.275).
#
# YFCustomData below reads pre-uploaded CSV files from ObjectStore under
# the 'yf/' prefix. Files are written by fetch_custom_data.py and uploaded
# via `lean cloud object-store set`.
# ============================================================================
class YFCustomData(PythonData):
    """Custom daily-close feed read from CSV files in the ObjectStore.

    Each file is stored as ``yf/<ticker>.csv`` with ``date,close`` rows.
    The closes come from yfinance (futures GC=F/CL=F/HG=F, ETFs
    XLV/SPY/EFA) or from Deribit DVOL (BTC).
    """

    def get_source(self, config: SubscriptionDataConfig, date: datetime, is_live: bool) -> SubscriptionDataSource:
        """Return the ObjectStore location of the CSV for the subscribed ticker."""
        # config.symbol.value is the ticker that was handed to add_data (e.g. "GC_F").
        object_store_key = f"yf/{config.symbol.value}.csv"
        return SubscriptionDataSource(
            object_store_key,
            SubscriptionTransportMedium.OBJECT_STORE,
            FileFormat.CSV,
        )

    def reader(self, config: SubscriptionDataConfig, line: str, date: datetime, is_live: bool) -> BaseData:
        """Turn one ``date,close`` CSV row into a data point.

        Returns None for blank rows, for the header row and for rows whose
        date or close cannot be parsed.
        """
        if not line or not line.strip():
            return None
        if line.startswith("d"):  # header row "date,close"
            return None

        columns = line.split(",")
        try:
            session_date = datetime.strptime(columns[0], "%Y-%m-%d")
            close_value = float(columns[1])
        except (ValueError, IndexError):
            return None

        point = YFCustomData()
        point.symbol = config.symbol
        # v15 (2026-04-11): next-day delivery. The row for day D is stamped
        # to end at D + 1 day, so a signal evaluated intraday on D sees D-1
        # as its latest close (needed for the Condition B matched-settings
        # diagnostic against the offline asymmetric fix).
        point.time = session_date
        point.end_time = session_date + timedelta(days=1)
        point.value = close_value
        return point
# ============================================================================
# FIX LOG (vs original Mia code):
# v1: Daily close data for z-score (was hourly → wrong window)
# v2: Price cache in on_data (scheduled event had empty slice)
# v3: Concurrent positions (MT5 live logic stacks positions)
# + per-position timed exit with counter-orders
# + configurable position sizing
# + enhanced logging for MT5 comparison
# v4: Exchange-specific daily close times (was single 18:00 UTC for all)
# + Warmup guard to prevent trades during warmup period
# + Reduces z-score divergence vs MT5 (timing was main source)
# v5: C240E fred_subtract inner-join fix (was two misaligned RollingWindows)
# + Pre-computes DGS10-T10YIE on same-day pairs only
# + Matches MT5's fred_subtract: s1.join(s2, how='inner')
# v6: Micro Futures (MGC, MES) replace ETF proxies (GLD, SPY)
# + ATR-based dynamic position sizing (matches MT5 --equal-risk --risk-pct 0.01)
# + Proper continuous contract handling for trading targets
# + EUR→USD conversion for cross-currency risk calculation
# v7: ATR fix — proper True Range from daily H/L/C (was close-to-close)
# + Tracks daily high/low/close from hourly OHLC bars per symbol
# + Computes ATR as SMA of True Range: max(H-L, |H-Cprev|, |L-Cprev|)
# v8: OANDA spot CFDs replace micro futures for trading targets
# + XAUUSD (spot gold CFD) replaces MGC — same price index as MT5 GAUUSD
# + SPX500USD (S&P 500 CFD) replaces MES — same price index as MT5 SP500
# + Eliminates futures basis/roll noise that caused 15% win/loss flips
# + No more futures chain resolution needed for trading targets
# + Simpler code path: CFDs are always tradable, no expiry handling
# v9: Match MT5 sizing logic exactly (from portfolio_backtest_live_logic.py)
# + Account scaled 100× to €75,000: QC enforces lot_size=1 for XAUUSD
# (orders must be integer oz). At €750: qty=0.4 → rejected by QC engine.
# At €75K: qty=40.5 → rounds to 41 oz, smooth compounding.
# Compare PERCENTAGE returns with MT5 benchmark (+2316%).
# + ATR period: 14 → 20 (matches MT5 compute_atr(df, period=20))
# + Margin cap: 8% → 5% (matches MT5 default max_margin_pct=0.05)
# + SPX500USD lot_size=0.1 → round to 0.1 steps (min 0.1).
# v10: CRITICAL FIX — set_brokerage_model(OANDA) for 50× CFD leverage
# + v9 used DefaultBrokerageModel (2× leverage) → all XAUUSD trades were 1 oz
# + OANDA model provides 50× leverage → proper ATR-based sizing works
# + C302 (GDX) disabled — OANDA can't trade Equity (only ~€300 MT5 profit)
# + NO exchange_open check on exits (it blocked CFD exits → stuck positions)
# + MarketOnOpen not an issue since C302 (only equity target) is disabled
# + Added debug logging in _compute_quantity for sizing verification
# v11: TIMEZONE FIX — algorithm timezone set to UTC (was NY → 4h shift)
# + MT5 schedule times are UTC. QC defaulted to America/New_York.
# schedule_hour=19 fired at 19:00 ET = 23:00 UTC → AFTER Friday close.
# All 199 Friday entries (~20% of total) were rejected as MarketOnOpen.
# + Now set_time_zone("Etc/UTC") → entries at 19:xx UTC, always in-session.
# + Weekend exit skip: don't attempt market_order on Sat / Sun<22:00 UTC
# for CFDs/Forex. Avoids 10,000+ Invalid MarketOnOpen spam orders.
# Positions stay in _open_positions, retry once market reopens Sun 22:00.
# + BTCUSD (crypto) exempt from weekend skip — trades 24/7.
# ============================================================================
@dataclass
class StrategyConfig:
    """Configuration for a single mean-reversion strategy.

    One instance is created per strategy in ``_setup_strategies`` and stored
    in ``_strategies`` under ``name``.
    """
    name: str  # unique strategy id, e.g. "C064"; used as the registry key
    # Indicator inputs. In _calculate_z_score the series is symbol_1 / symbol_2,
    # except when symbol_2 is the literal "CONSTANT" (symbol_1 used alone).
    indicator_symbol_1: str
    indicator_symbol_2: str
    target_symbol: str  # instrument that is traded, e.g. "XAUUSD"
    window: int  # Rolling window in TRADING DAYS
    threshold: float  # presumably the |z-score| entry trigger — consumer not visible here
    exit_hours: int  # presumably hours until the timed exit — consumer not visible here
    schedule_hour: int  # UTC
    schedule_minute: int  # UTC
    direction: Literal["both", "positive"]
    direction_on_positive_trigger: Literal["buy", "sell"]
    weekdays_only: bool = True  # True: scheduled Monday-Friday; False: every day
@dataclass
class OpenPosition:
    """Individual tracked position (concurrent positions allowed per strategy).

    Instances are kept in ``MacroMeanReversionPortfolio._open_positions``.
    """
    position_id: int  # presumably drawn from the _next_position_id counter — verify
    strategy_name: str  # StrategyConfig.name of the strategy that opened it
    target_symbol_str: str  # e.g. "NZDUSD", "XAUUSD", "GDX"
    order_symbol: Symbol  # actual QC Symbol used for the order
    entry_time: datetime
    exit_time: datetime  # presumably entry_time + exit_hours — verify against the entry code
    direction: int  # +1 long, -1 short
    quantity: float  # absolute quantity ordered
    entry_price: float
    z_score: float  # z-score recorded for this position (presumably the entry signal)
class MacroMeanReversionPortfolio(QCAlgorithm):
    """
    Multi-strategy macro mean-reversion portfolio on QuantConnect.

    As configured in initialize(): EUR account, UTC algorithm clock and the
    OANDA margin brokerage model. _setup_strategies() registers eight active
    strategies; C302 (target GDX) is commented out.

    NOTE(review): this docstring previously read "9-Strategy ... IB via
    QuantConnect", "Keeps DefaultBrokerageModel" and "ATR(14)". The code in
    this class calls set_brokerage_model(BrokerageName.OANDA_BROKERAGE, ...)
    and uses ATR_PERIOD = 20, so those statements were stale.

    Compare PERCENTAGE returns with MT5 benchmark (+2316%).

    Intended to mirror the MT5 live-logic:
    - Concurrent positions per strategy (no dedup, no caps — same as MT5)
    - Timed exits per individual position
    - Daily z-score calculation matching pandas rolling().std() (ddof=1)
    - ATR(ATR_PERIOD) from daily H/L/C True Range for position sizing
    - Spot CFDs match MT5 CFD pricing (no futures basis/roll noise)
    """
    # ── Risk Configuration ────────────────────────────────────────────────
    # v15-matched (2026-04-11): Condition B diagnostic — match offline's
    # asymmetric-fix settings exactly. Signal uses D-1 close (v13 bar delivery
    # reverted), margin cap effectively disabled, balance-denominated.
    # Goal: see if return converges with offline's +1458% when margin/leverage
    # mechanics are equalized.
    # NOTE(review): this note used to say "1% risk", but RISK_PCT = 0.005 is
    # 0.5% of the risk base — confirm which value is intended.
    RISK_PCT = 0.005  # fraction of the risk base put at risk per trade
    ATR_PERIOD = 20  # number of daily True Range values averaged by _compute_atr
    MAX_MARGIN_PER_TRADE = 10.0  # margin cap as a multiple of the risk base (see _compute_quantity)
    USE_BALANCE_FOR_SIZING = True  # True: size on portfolio value minus unrealized profit
    USE_STRATEGY_RISK_WEIGHTS = False  # True: scale risk by per-strategy weights
    USE_STATIC_V5_WEIGHTS = False  # True: take weights from STATIC_WEIGHTS instead of inverse-vol
    STATIC_WEIGHTS = {}  # strategy name -> weight; unlisted strategies default to 1.0
    DUMP_ZSCORES = False
    # ── OBSERVABILITY ONLY — NOT SIZING (2026-04-10) ──────────────────────
    # DUMP_ZSCORES (above) gates a custom chart dump for diagnostic z-score
    # comparison.
    # NOTE(review): an earlier comment said v14 flipped this flag back on,
    # but it is False here — confirm.
    #
    # Contract multiplier: ATR * multiplier = USD risk per unit
    # XAUUSD: 1 unit = 1 troy oz gold, $1 move = $1 per unit
    # SPX500USD: 1 unit = 1 index point, $1 move = $1 per unit
    # GDX: equity, $1 per $1 move per share
    # Forex: 1 unit, ATR in USD directly
    # BTCUSD: 1 BTC, ATR in USD directly
    # Membership in this dict also decides which symbols get True Range
    # tracking in _add_symbol.
    CONTRACT_MULTIPLIER = {
        "XAUUSD": 1,  # OANDA spot gold CFD (1 unit = 1 oz)
        "SPX500USD": 1,  # v8: S&P 500 CFD (1 unit = 1 index point)
        # "GDX": 1, # v10: disabled — OANDA can't trade Equity
        "NZDUSD": 1,
        "GBPUSD": 1,
        "BTCUSD": 1,
    }
    # ── CFDs: OANDA spot instruments (v8 trading targets) ─────────────────
    OANDA_CFDS = {"XAUUSD", "SPX500USD"}
    # ── Futures: INDICATOR-ONLY (v12: emptied — now custom data from yfinance) ──
    # While this set is empty, the futures branches in _add_symbol and
    # on_data are never taken.
    INDICATOR_FUTURES = set()
    # ── v12: yfinance/Deribit custom data routing ──
    # Maps strategy-config indicator_symbol_X -> ObjectStore key (without yf/ prefix)
    # These files are pre-uploaded via fetch_custom_data.py + `lean cloud object-store set`.
    YF_CUSTOM_MAP = {
        "GC": "GC_F",
        "CL": "CL_F",
        "HG": "HG_F",
        "SPY": "SPY",
        "XLV": "XLV",
        "EFA": "EFA",
        "BVIV": "DVOL_BTC",  # Deribit BTC DVOL (what the offline config calls deribit_volatility_index)
    }
    # ── Exchange-specific daily close times (UTC) ────────────────────────
    # symbol -> (hour, minute). _schedule_strategies groups symbols by time
    # and schedules one _capture_daily_close call per distinct time.
    DAILY_CLOSE_TIMES = {
        # Futures (indicator-only)
        "GC": (18, 30),
        "HG": (18, 30),
        "CL": (18, 30),
        # v8: OANDA CFDs (trading targets)
        "XAUUSD": (21, 0),  # NY 5pm rollover, same as forex
        "SPX500USD": (20, 0),  # NYSE close 4pm ET = 20:00 UTC
        # US Equities / ETFs (indicator and/or trading targets)
        "SPY": (20, 0),
        "GDX": (20, 0),
        "XLV": (20, 0),
        "EFA": (20, 0),
        # Forex (trading targets)
        "NZDUSD": (21, 0),
        "GBPUSD": (21, 0),
        # Crypto
        "BTCUSD": (0, 0),
    }
def initialize(self):
    """Configure the backtest, create all state containers and wire up data and schedules.

    Order matters: the state dictionaries must exist before
    ``_subscribe_data`` and ``_schedule_strategies`` run, because those
    methods fill ``_symbols``, ``_indicator_daily`` and
    ``_daily_true_ranges``.
    """
    self.set_start_date(2010, 1, 1)
    self.set_end_date(2026, 4, 5)
    self.set_account_currency("EUR")
    # v11: Algorithm timezone = UTC. MT5 strategy times are UTC.
    # QC default was America/New_York → schedule_hour=19 fired at 23:xx UTC
    # on Friday, AFTER CFD market close → all Friday entries rejected.
    self.set_time_zone("Etc/UTC")
    # v10: OANDA brokerage model — provides 50× leverage for CFDs/Forex.
    # Caveat: rejects Equity orders → C302 (GDX) disabled.
    self.set_brokerage_model(BrokerageName.OANDA_BROKERAGE, AccountType.MARGIN)
    # v9: Scaled 100× — QC enforces lot_size=1 for XAUUSD (integer oz only).
    # At €750: qty=0.4 → rejected. At €75K: qty=40.5 → smooth compounding.
    self.set_cash(75000)
    self.set_warm_up(timedelta(days=250))

    # ── Data structures ────────────────────────────────────────────────
    self._strategies = {}
    self._indicator_daily = {}  # RollingWindow[float] per symbol — close only, for z-score
    self._last_indicator_date = {}
    self._latest_price = {}  # Hourly price cache
    self._symbols = {}  # str → QC Symbol
    self._latest_future_symbol = {}  # "GC"/"CL"/"HG" → last known front-month Symbol (indicators only)

    # ── v7: Daily OHLC tracking for True Range ATR ─────────────────────
    self._intraday_hlc = {}  # symbol_str → {"high", "low", "close", "date"}
    self._daily_true_ranges = {}  # symbol_str → RollingWindow[float] of daily TRs
    self._prev_daily_close = {}  # symbol_str → previous day's close (for TR calc)

    # ── v12: date-aligned indicator history for custom-data z-score ────
    # Maps symbol_str -> { date: close_value }. Populated alongside
    # _indicator_daily (RollingWindow) in on_data. _calculate_z_score
    # uses this for date-based inner-join when both indicators are custom
    # data, fixing the position-based alignment drift from before v12.
    self._indicator_by_date: dict = {}  # {symbol_str: {date: float}}

    # ── v3: Concurrent position tracking ───────────────────────────────
    self._open_positions: List[OpenPosition] = []
    self._next_position_id = 1
    self._trade_log = []

    # ── V4: per-strategy inverse-vol weights (lazy-init on first trade) ──
    self._strategy_risk_weights: Optional[dict] = None

    # ── Z-score tracking for comparison ────────────────────────────────
    self._daily_z_scores = {}

    # ── v5: Pre-computed FRED subtract series (inner-join on date) ─────
    self._fred_subtract_daily = RollingWindow[float](300)
    self._fred_subtract_last_date = None
    self._fred_today_values = {}

    self._setup_strategies()
    self._subscribe_data()
    self._schedule_strategies()

    # NOTE(review): this is assigned after every subscription has been
    # added; confirm it still has the intended effect at this point.
    self.universe_settings.extended_market_hours = True

    # The risk percentage is printed with ":g" rather than ":.0f": with
    # RISK_PCT = 0.005 the old format rendered 0.5 as "0", so the startup
    # line claimed "Risk 0%".
    self.debug(f"v11 | EUR {self.portfolio.cash:,.0f} | Risk {self.RISK_PCT * 100:g}% | ATR({self.ATR_PERIOD}) | OANDA 50x | TZ=UTC")

    self.schedule.on(
        self.date_rules.every_day(),
        self.time_rules.at(23, 30),
        self._log_daily_summary
    )
# ── Strategy Configuration ─────────────────────────────────────────────
def _setup_strategies(self):
    """Build every active StrategyConfig and register it in ``_strategies`` by name."""
    # One row per strategy:
    #   name, indicator 1, indicator 2, target, window, threshold, exit_hours,
    #   (schedule hour, schedule minute) in UTC, direction,
    #   direction_on_positive_trigger, weekdays_only
    rows = (
        ("C063", "HG", "CL", "NZDUSD", 60, 2.5, 48, (19, 0), "both", "buy", True),
        # v9: XAUUSD = OANDA spot gold CFD (same price as MT5 GAUUSD.s)
        ("C064", "GC", "CL", "XAUUSD", 60, 1.5, 48, (19, 10), "both", "buy", True),
        # v8: SPX500USD = S&P 500 CFD (was MES futures)
        ("C166D", "XLV", "SPY", "SPX500USD", 63, 2.0, 48, (19, 35), "both", "sell", True),
        ("C185C", "EFA", "SPY", "NZDUSD", 63, 2.0, 48, (19, 20), "both", "buy", True),
        ("C185D", "EFA", "SPY", "GBPUSD", 63, 2.0, 48, (19, 15), "both", "buy", True),
        # v8: SPX500USD = S&P 500 CFD (was MES futures)
        ("C240E", "FRED/DGS10", "FRED/T10YIE", "SPX500USD", 126, 2.0, 48, (19, 25), "both", "sell", True),
        # v10: C302 DISABLED — targets GDX (Equity), OANDA model rejects Equity orders.
        # C302 contributes only ~€300 in MT5 (vs C064's €10,308). Negligible impact.
        # ("C302", "GC", "CL", "GDX", 60, 2.0, 120, (19, 30), "both", "buy", True),
        ("C304", "HG", "CL", "NZDUSD", 60, 2.0, 120, (19, 40), "both", "buy", True),
        ("C399", "BVIV", "CONSTANT", "BTCUSD", 60, 2.0, 120, (22, 45), "positive", "buy", False),
    )
    for (name, first_indicator, second_indicator, target, window, threshold,
         exit_hours, (hour, minute), direction, on_positive, weekdays_only) in rows:
        self._strategies[name] = StrategyConfig(
            name=name,
            indicator_symbol_1=first_indicator,
            indicator_symbol_2=second_indicator,
            target_symbol=target,
            window=window,
            threshold=threshold,
            exit_hours=exit_hours,
            schedule_hour=hour,
            schedule_minute=minute,
            direction=direction,
            direction_on_positive_trigger=on_positive,
            weekdays_only=weekdays_only,
        )
# ── Data Subscription ──────────────────────────────────────────────────
def _subscribe_data(self):
    """Subscribe each distinct indicator and target symbol exactly once.

    The placeholder indicator "CONSTANT" is not a data feed and is skipped.
    Symbols are added in strategy order: indicators first, then the target.
    """
    seen = set()
    for cfg in self._strategies.values():
        indicators = [
            ticker
            for ticker in (cfg.indicator_symbol_1, cfg.indicator_symbol_2)
            if ticker != "CONSTANT"
        ]
        for ticker in (*indicators, cfg.target_symbol):
            if ticker in seen:
                continue
            self._add_symbol(ticker)
            seen.add(ticker)
    # v10: removed per-symbol logging
def _add_symbol(self, symbol_str: str):
    """Subscribe ``symbol_str`` through the feed that matches its kind.

    Routing order: yfinance/Deribit custom data, FRED, indicator futures,
    OANDA CFDs, Coinbase crypto / OANDA forex, and finally equities. Every
    successful subscription is registered in ``_symbols`` with a 300-slot
    daily-close window; symbols listed in CONTRACT_MULTIPLIER also get a
    True Range window. Any failure is reported via ``self.error`` and
    swallowed, so one bad symbol does not abort initialization.
    """
    def track(qc_symbol):
        # Shared bookkeeping for every feed type.
        self._symbols[symbol_str] = qc_symbol
        self._indicator_daily[symbol_str] = RollingWindow[float](300)
        self._last_indicator_date[symbol_str] = None

    try:
        # v12: yfinance custom data (futures + ETFs + BTC DVOL). Returns
        # early, so these symbols never reach the True Range setup below.
        if symbol_str in self.YF_CUSTOM_MAP:
            object_key = self.YF_CUSTOM_MAP[symbol_str]
            track(self.add_data(YFCustomData, object_key, Resolution.DAILY).symbol)
            return

        if symbol_str.startswith("FRED/"):
            series_code = symbol_str.split("/")[1]
            track(self.add_data(Fred, series_code, Resolution.DAILY).symbol)
        elif symbol_str in self.INDICATOR_FUTURES:
            futures_by_root = {
                "GC": Futures.Metals.GOLD,
                "CL": Futures.Energy.CRUDE_OIL_WTI,
                "HG": Futures.Metals.COPPER,
            }
            continuous = self.add_future(
                futures_by_root[symbol_str],
                Resolution.HOUR,
                data_mapping_mode=DataMappingMode.OPEN_INTEREST,
                data_normalization_mode=DataNormalizationMode.RAW,
                contract_depth_offset=0
            )
            continuous.set_filter(0, 90)
            track(continuous.symbol)
        elif symbol_str in self.OANDA_CFDS:
            # OANDA spot CFDs — XAUUSD (gold), SPX500USD (S&P 500)
            track(self.add_cfd(symbol_str, Resolution.HOUR, Market.OANDA).symbol)
        elif symbol_str == "BTCUSD":
            track(self.add_crypto("BTCUSD", Resolution.HOUR, Market.COINBASE).symbol)
        elif symbol_str in ("NZDUSD", "GBPUSD"):
            track(self.add_forex(symbol_str, Resolution.HOUR, Market.OANDA).symbol)
        else:
            # Anything else is treated as an equity ticker.
            track(self.add_equity(symbol_str, Resolution.HOUR).symbol)

        # v7: daily True Range tracking for trading targets
        if symbol_str in self.CONTRACT_MULTIPLIER:
            self._daily_true_ranges[symbol_str] = RollingWindow[float](self.ATR_PERIOD + 5)
    except Exception as e:
        self.error(f"Failed to add {symbol_str}: {str(e)}")
# ── Scheduling ─────────────────────────────────────────────────────────
def _schedule_strategies(self):
    """Register all scheduled events: strategy checks, hourly exits, daily close captures."""
    business_days = (
        DayOfWeek.MONDAY,
        DayOfWeek.TUESDAY,
        DayOfWeek.WEDNESDAY,
        DayOfWeek.THURSDAY,
        DayOfWeek.FRIDAY,
    )

    # One signal check per strategy at its configured UTC time; weekday-only
    # strategies get one event per business day.
    for cfg in self._strategies.values():
        if cfg.weekdays_only:
            day_rules = (self.date_rules.every(weekday) for weekday in business_days)
        else:
            day_rules = (self.date_rules.every_day() for _ in range(1))
        for day_rule in day_rules:
            self.schedule.on(
                day_rule,
                self.time_rules.at(cfg.schedule_hour, cfg.schedule_minute),
                lambda s=cfg: self._check_strategy(s))

    # Timed exits are polled every hour.
    self.schedule.on(
        self.date_rules.every_day(),
        self.time_rules.every(timedelta(hours=1)),
        self._check_timed_exits)

    # v4: Exchange-specific daily close captures — one event per distinct
    # close time, covering every symbol that shares it.
    symbols_by_close_time = {}
    for ticker, (close_hour, close_minute) in self.DAILY_CLOSE_TIMES.items():
        symbols_by_close_time.setdefault((close_hour, close_minute), []).append(ticker)
    for (close_hour, close_minute), tickers in symbols_by_close_time.items():
        self.schedule.on(
            self.date_rules.every_day(),
            self.time_rules.at(close_hour, close_minute),
            lambda syms=tickers, hh=close_hour, mm=close_minute: self._capture_daily_close(syms, hh, mm))
# ── Data Handling ──────────────────────────────────────────────────────
def on_data(self, data: Slice):
    """Cache latest prices every hour; store FRED + yfinance custom directly (already daily).

    Four passes over the subscribed symbols, in this order:
      1. yfinance/Deribit custom data -> _latest_price, _indicator_daily and
         the date-keyed _indicator_by_date.
      2. FRED series -> _indicator_daily, plus the same-day DGS10 - T10YIE
         difference into _fred_subtract_daily.
      3. Indicator futures -> _latest_price / _latest_future_symbol
         (no iterations while INDICATOR_FUTURES is empty).
      4. Everything else (CFDs, forex, crypto, equities) -> _latest_price and
         the intraday high/low/close used for the True Range.
    """
    today = self.time.date()
    # v12: yfinance custom data (GC/CL/HG/SPY/XLV/EFA/BVIV) → direct daily close.
    # Populate BOTH _indicator_daily (RollingWindow, for backward compat with
    # _calculate_z_score fallback) AND _indicator_by_date (dict keyed by the
    # bar's date, for date-aligned inner-join).
    # NOTE(review): an earlier comment said the bar date is bar.end_time - 1,
    # but the code below keys on bar.time.date() — confirm which date ends up
    # in bar.time after reader() assigns both time and end_time.
    for symbol_str, symbol in self._symbols.items():
        if symbol_str not in self.YF_CUSTOM_MAP:
            continue
        if symbol in data and data[symbol] is not None:
            bar = data[symbol]
            # Prefer bar.value; fall back to bar.price when value is missing or falsy.
            price = float(bar.value) if hasattr(bar, "value") and bar.value else 0.0
            if price <= 0 and hasattr(bar, "price"):
                price = float(bar.price)
            if price > 0:
                self._latest_price[symbol_str] = price
                # Use the bar's OWN date (from bar.time), not today's algorithm date
                bar_date = bar.time.date() if hasattr(bar, "time") else today
                # At most one RollingWindow entry per bar date.
                if self._last_indicator_date.get(symbol_str) != bar_date:
                    self._indicator_daily[symbol_str].add(price)
                    self._last_indicator_date[symbol_str] = bar_date
                # Also store in date-keyed dict for date-aligned z-score lookup
                if symbol_str not in self._indicator_by_date:
                    self._indicator_by_date[symbol_str] = {}
                self._indicator_by_date[symbol_str][bar_date] = price
    # FRED → direct to daily RollingWindow + v5 inner-join subtract
    for symbol_str, symbol in self._symbols.items():
        if symbol_str.startswith("FRED/") and symbol in data:
            # Only the first value seen per algorithm date is recorded.
            if self._last_indicator_date.get(symbol_str) != today:
                value = data[symbol].value
                self._indicator_daily[symbol_str].add(value)
                self._last_indicator_date[symbol_str] = today
                fred_code = symbol_str.split("/")[1]
                if fred_code in ("DGS10", "T10YIE"):
                    # Start a fresh pair on the first of the two series seen today.
                    if self._fred_subtract_last_date != today:
                        self._fred_today_values = {}
                        self._fred_subtract_last_date = today
                    self._fred_today_values[fred_code] = value
                    # The difference is emitted only once both legs carry a
                    # value for the same date (inner join on date).
                    if "DGS10" in self._fred_today_values and "T10YIE" in self._fred_today_values:
                        diff = self._fred_today_values["DGS10"] - self._fred_today_values["T10YIE"]
                        self._fred_subtract_daily.add(diff)
    # Indicator Futures → cache latest price from mapped contract (for z-score only)
    for symbol_str in self.INDICATOR_FUTURES:
        if symbol_str not in self._symbols:
            continue
        canonical = self._symbols[symbol_str]
        security = self.securities.get(canonical)
        if security is not None and security.mapped is not None:
            mapped_contract = self.securities.get(security.mapped)
            if mapped_contract is not None and mapped_contract.price > 0:
                self._latest_price[symbol_str] = mapped_contract.price
                self._latest_future_symbol[symbol_str] = security.mapped
        else:
            # No mapped contract: take the nearest chain contract that
            # expires more than 5 days out and has a positive last price.
            if data.future_chains.contains_key(canonical):
                chain = data.future_chains[canonical]
                if len(chain) > 0:
                    min_expiry = self.time + timedelta(days=5)
                    valid = [c for c in chain if c.expiry > min_expiry and c.last_price > 0]
                    if valid:
                        contract = sorted(valid, key=lambda x: x.expiry)[0]
                        self._latest_price[symbol_str] = contract.last_price
                        self._latest_future_symbol[symbol_str] = contract.symbol
    # CFDs, Equities, Forex, Crypto → cache latest price + update intraday H/L/C
    for symbol_str, symbol in self._symbols.items():
        if (symbol_str.startswith("FRED/") or
                symbol_str in self.INDICATOR_FUTURES or
                symbol_str in self.YF_CUSTOM_MAP):
            continue  # v12: custom data symbols handled above
        if symbol in data and data[symbol] is not None:
            bar = data[symbol]
            # Use the bar close when it exists and is positive, else bar.price.
            price = bar.close if hasattr(bar, 'close') and bar.close > 0 else bar.price
            if price > 0:
                self._latest_price[symbol_str] = price
            # v7: Update intraday H/L/C for ATR-eligible symbols
            if symbol_str in self._daily_true_ranges:
                self._update_intraday_hlc_from_bar(symbol_str, bar)
# ── v7: Intraday OHLC tracking for True Range ─────────────────────────
def _update_intraday_hlc_from_bar(self, symbol_str: str, bar):
    """Fold one intraday bar into the running high/low/close for ``symbol_str``.

    The running bar lives in ``_intraday_hlc[symbol_str]`` as a dict with
    ``high``, ``low``, ``close``, ``date`` (date of the latest update) and,
    once ``_capture_daily_close`` has closed a session for the symbol,
    ``carry=True``.

    Rollover rules:
      * Carry bar: bars that arrive after the captured close belong to the
        NEXT session. They keep accumulating across the calendar-date change
        and are finalized by the next ``_capture_daily_close``.
      * No carry bar (no close captured yet, e.g. a symbol whose capture
        never matches the bar date): the calendar-date change finalizes the
        previous day, as before.

    Fix: previously the bars between the captured close and midnight formed
    their own mini-bar that was finalized at the date change. That added a
    second, tiny True Range per day for symbols with a non-midnight close
    and biased the ATR (and so the position size) accordingly.
    """
    today = self.time.date()

    # Prefer the bar's own high/low/close; fall back to a flat bar built
    # from close (or price) when those fields are missing or close is unusable.
    if hasattr(bar, 'high') and hasattr(bar, 'low') and bar.close > 0:
        h = bar.high
        l = bar.low
        c = bar.close
    else:
        c = bar.close if hasattr(bar, 'close') and bar.close > 0 else bar.price
        if c <= 0:
            return
        h = c
        l = c

    hlc = self._intraday_hlc.get(symbol_str)

    if hlc is not None and hlc.get("date") != today and not hlc.get("carry"):
        # Calendar-day rollover for a symbol with no captured close:
        # finalize the previous day's bar, then start a new one.
        if hlc.get("date") is not None:
            self._finalize_daily_bar(symbol_str, hlc)
        hlc = None

    if hlc is None:
        self._intraday_hlc[symbol_str] = {
            "high": h, "low": l, "close": c, "date": today
        }
    else:
        hlc["high"] = max(hlc["high"], h)
        hlc["low"] = min(hlc["low"], l)
        hlc["close"] = c
        # Stamp the latest update date so _capture_daily_close can tell
        # whether the session received any data today.
        hlc["date"] = today


def _finalize_daily_bar(self, symbol_str: str, hlc: dict):
    """
    Compute True Range for the completed daily bar and store it.
    TR = max(H - L, |H - Cprev|, |L - Cprev|)

    Without a previous close the range H - L is used. Only positive values
    are pushed to ``_daily_true_ranges[symbol_str]``; the bar's close always
    becomes the new previous close.
    """
    high = hlc["high"]
    low = hlc["low"]
    close = hlc["close"]
    prev_close = self._prev_daily_close.get(symbol_str)
    if prev_close is not None and prev_close > 0:
        tr = max(high - low,
                 abs(high - prev_close),
                 abs(low - prev_close))
    else:
        tr = high - low
    if tr > 0:
        rw = self._daily_true_ranges.get(symbol_str)
        if rw is not None:
            rw.add(tr)
    self._prev_daily_close[symbol_str] = close


def _capture_daily_close(self, symbols_to_capture: list, hour: int, minute: int):
    """
    v4: Store ONE price per trading day, at the exchange-correct close time.
    v7: Also finalize the intraday H/L/C bar for ATR-eligible symbols.

    ``hour`` and ``minute`` identify the schedule slot and are not used in
    the body.

    After finalizing, the slot is re-seeded with a flat carry bar at the
    session close instead of being cleared. Bars arriving later the same
    date are then attributed to the next session rather than being
    finalized as an extra mini-bar at midnight (see
    ``_update_intraday_hlc_from_bar``). Seeding at the close does not alter
    the next True Range, because that close is also the previous close the
    range is measured against.

    NOTE(review): symbols in YF_CUSTOM_MAP that also appear in
    DAILY_CLOSE_TIMES pass through the price capture below as well, which
    can append their cached custom-data close to _indicator_daily a second
    time — confirm this is intended.
    """
    today = self.time.date()
    for symbol_str in symbols_to_capture:
        if symbol_str not in self._indicator_daily:
            continue
        if self._last_indicator_date.get(symbol_str) == today:
            continue
        price = self._latest_price.get(symbol_str)
        if price is not None and price > 0:
            self._indicator_daily[symbol_str].add(price)
            self._last_indicator_date[symbol_str] = today
        # v7: Finalize intraday bar at daily close time
        if symbol_str in self._daily_true_ranges:
            hlc = self._intraday_hlc.get(symbol_str)
            # Only a bar that received data today is closed; a carry bar
            # with no bars yet has date None and is left untouched.
            if hlc is not None and hlc.get("date") == today:
                self._finalize_daily_bar(symbol_str, hlc)
                session_close = hlc["close"]
                self._intraday_hlc[symbol_str] = {
                    "high": session_close,
                    "low": session_close,
                    "close": session_close,
                    "date": None,
                    "carry": True,
                }
# ── ATR Calculation (v7: True Range) ──────────────────────────────────
def _compute_atr(self, target_str: str) -> Optional[float]:
    """Return the ATR over ATR_PERIOD daily bars for ``target_str``, or None.

    v7: the primary source is the mean of the latest ATR_PERIOD daily True
    Range values (TR = max(H-L, |H-Cprev|, |L-Cprev|), aggregated from
    hourly bars). When too few True Ranges exist, or their mean is not
    positive, the mean absolute close-to-close move scaled by 2.5 is used
    instead; that needs ATR_PERIOD + 1 stored closes.
    """
    period = self.ATR_PERIOD

    tr_window = self._daily_true_ranges.get(target_str)
    if tr_window is not None and tr_window.count >= period:
        mean_true_range = sum(tr_window[k] for k in range(period)) / period
        if mean_true_range > 0:
            return mean_true_range

    # Fallback: close-to-close × 2.5 (only during early warmup)
    close_window = self._indicator_daily.get(target_str)
    if close_window is None or close_window.count < period + 1:
        return None
    daily_moves = [abs(close_window[k] - close_window[k + 1]) for k in range(period)]
    proxy_atr = sum(daily_moves) / len(daily_moves)
    proxy_atr *= 2.5
    if proxy_atr > 0:
        return proxy_atr
    return None
def _get_eur_usd_rate(self) -> float:
    """Return the USD-per-EUR rate used to convert EUR risk into USD.

    Computed as the inverse of the cash book's USD conversion rate. When
    that rate is unavailable, not positive, or the lookup raises, 1.08 is
    returned.
    """
    usd_per_eur = 1.08
    try:
        eur_per_usd = self.portfolio.cash_book["USD"].conversion_rate
        if eur_per_usd > 0:
            usd_per_eur = 1.0 / eur_per_usd
    except Exception:
        usd_per_eur = 1.08
    return usd_per_eur
def _compute_strategy_weights(self):
    """Fill ``_strategy_risk_weights`` with one risk multiplier per strategy.

    V4: inverse-volatility weights, where volatility is ATR / price
        annualized with sqrt(252).
    V5: STATIC_WEIGHTS are used instead when USE_STATIC_V5_WEIGHTS is True;
        strategies missing from that dict get 1.0.

    Weights are divided by their mean so the average weight is 1 and the
    total risk budget is unchanged. A strategy without usable data gets
    1.0; if no strategy has usable data, all weights are 1.0.
    """
    static_mode = self.USE_STATIC_V5_WEIGHTS

    if static_mode:
        raw = dict(self.STATIC_WEIGHTS)
        # Fill any strategies not listed with 1.0 so nothing is accidentally zeroed
        for cfg in self._strategies.values():
            if cfg.name not in raw:
                raw[cfg.name] = 1.0
    else:
        raw = {}
        for cfg in self._strategies.values():
            atr = self._compute_atr(cfg.target_symbol)
            price = self._latest_price.get(cfg.target_symbol, 0)
            inverse_vol = None
            if not (atr is None or atr <= 0 or price <= 0):
                annual_vol = (atr / price) * (252.0 ** 0.5)
                if not annual_vol <= 0:
                    inverse_vol = 1.0 / annual_vol
            raw[cfg.name] = inverse_vol

    usable = {name: value for name, value in raw.items() if value is not None}
    if not usable:
        self._strategy_risk_weights = {name: 1.0 for name in raw}
        self.debug("V5/V4 weights: data unavailable, falling back to equal weights")
        return

    mean = sum(usable.values()) / len(usable)
    weights = {
        name: (value / mean) if value is not None else 1.0
        for name, value in raw.items()
    }
    self._strategy_risk_weights = weights

    weights_str = ", ".join(f"{k}={v:.2f}" for k, v in weights.items())
    label = "V5 static" if self.USE_STATIC_V5_WEIGHTS else "V4 inverse-vol"
    self.debug(f"{label} weights (mean-preserving): {weights_str}")
def _compute_quantity(self, target_str: str, strategy: Optional['StrategyConfig'] = None) -> Optional[float]:
    """Size an order for ``target_str`` from its ATR (MT5 --equal-risk --risk-pct style).

    Two limits apply and the smaller quantity wins:
      1. ATR-based: risk_usd / (ATR * contract multiplier)
      2. Margin cap: margin used may not exceed
         MAX_MARGIN_PER_TRADE * risk base (skipped when no price is cached)

    V4: when USE_STRATEGY_RISK_WEIGHTS is on and ``strategy`` is given, both
    limits are scaled by that strategy's weight. The result is rounded to
    the instrument's lot step and floored at its minimum size. Returns None
    when no ATR is available or the risk per unit is not positive.
    """
    atr = self._compute_atr(target_str)
    if atr is None:
        return None
    usd_risk_per_unit = atr * self.CONTRACT_MULTIPLIER.get(target_str, 1)
    if usd_risk_per_unit <= 0:
        return None

    # V2: risk base is either equity or balance (equity minus open PnL).
    risk_base_eur = self.portfolio.total_portfolio_value
    if self.USE_BALANCE_FOR_SIZING:
        risk_base_eur = risk_base_eur - self.portfolio.total_unrealized_profit

    # V4: per-strategy weight, computed lazily on the first call that needs it.
    weight = 1.0
    if self.USE_STRATEGY_RISK_WEIGHTS:
        if self._strategy_risk_weights is None:
            self._compute_strategy_weights()
        if strategy is not None:
            weight = self._strategy_risk_weights.get(strategy.name, 1.0)

    risk_eur = risk_base_eur * self.RISK_PCT * weight
    usd_per_eur = self._get_eur_usd_rate()

    # Constraint 1: ATR-based quantity
    atr_qty = (risk_eur * usd_per_eur) / usd_risk_per_unit
    # Constraint 2: margin budget per trade (same risk base, same weight)
    margin_budget_usd = risk_base_eur * self.MAX_MARGIN_PER_TRADE * weight * usd_per_eur

    raw_qty = atr_qty
    last_price = self._latest_price.get(target_str, 0)
    if last_price > 0:
        if target_str in self.OANDA_CFDS:
            margin_rate = 0.05  # OANDA CFD margin: ~5% for gold and indices
        elif target_str in ("NZDUSD", "GBPUSD"):
            margin_rate = 0.02
        else:
            margin_rate = 0.50  # BTCUSD and anything unlisted
        margin_per_unit = last_price * margin_rate
        if margin_per_unit > 0:
            raw_qty = min(atr_qty, margin_budget_usd / margin_per_unit)

    # v9: QC lot sizes from symbol-properties-database.csv
    if target_str == "SPX500USD":
        # OANDA SPX500USD: lot_size=0.1 — 0.1-unit steps
        return max(0.1, round(raw_qty, 1))
    if target_str == "BTCUSD":
        # Coinbase: min_order=0.01 BTC
        return max(0.01, round(raw_qty, 4))
    # XAUUSD (integer oz enforced by the engine), forex (1 unit as practical
    # minimum) and everything else: whole units, at least 1.
    return max(1, round(raw_qty))
# ── Z-Score Calculation ────────────────────────────────────────────────
def _calculate_z_score(self, strategy: StrategyConfig):
    """Compute the rolling z-score of a strategy's indicator series.

    The first element of the selected series (treated as the current
    observation) is compared against the mean and sample standard deviation
    (ddof=1) of the first ``strategy.window`` elements, itself included.

    Series selection, in priority order:
      * ``strategy.name == "C240E"``: values are read from
        ``self._fred_subtract_daily``.
      * ``indicator_symbol_2 == "CONSTANT"``: single series, no ratio. Uses
        ``self._indicator_by_date`` when symbol 1 is present there, otherwise
        ``self._indicator_daily``.
      * both symbols present in ``self._indicator_by_date``: v12 date-aligned
        path, an inner join of the two series by date (newest first), taking
        the ratio symbol_1 / symbol_2.
      * otherwise: legacy position-based ratio of the two
        ``self._indicator_daily`` series.

    Ratio points whose denominator is zero are dropped; that can leave fewer
    than ``window`` values, in which case no z-score is produced.

    Returns:
        ``(z_score, current_value, mean, std)``, or ``(None, None, None, None)``
        when there is not enough data, fewer than two observations are
        available, or the standard deviation is zero.
    """
    no_result = (None, None, None, None)
    window = strategy.window
    sym1 = strategy.indicator_symbol_1
    sym2 = strategy.indicator_symbol_2

    data1 = self._indicator_daily.get(sym1)
    data2 = self._indicator_daily.get(sym2)
    # Symbol 1 must always be warmed up, whichever path is taken below.
    if data1 is None or data1.count < window:
        return no_result

    if strategy.name == "C240E":
        if self._fred_subtract_daily.count < window:
            return no_result
        values = [self._fred_subtract_daily[i] for i in range(window)]
    elif sym2 == "CONSTANT":
        # C399: single-series case, no ratio. Prefer date-based if available.
        if sym1 in self._indicator_by_date:
            by_date = self._indicator_by_date[sym1]
            newest_first = sorted(by_date.keys(), reverse=True)
            values = [by_date[dt] for dt in newest_first[:window]]
        else:
            values = [data1[i] for i in range(min(window, data1.count))]
    elif sym1 in self._indicator_by_date and sym2 in self._indicator_by_date:
        # v12 date-aligned path: inner-join the two symbols by date.
        d1 = self._indicator_by_date[sym1]
        d2 = self._indicator_by_date[sym2]
        common_dates = sorted(set(d1.keys()) & set(d2.keys()), reverse=True)
        if len(common_dates) < window:
            return no_result
        values = [d1[dt] / d2[dt] for dt in common_dates[:window] if d2[dt] != 0]
    else:
        # Legacy fallback: position-based iteration (native data paths).
        if data2 is None or data2.count < window:
            return no_result
        usable = min(window, data1.count, data2.count)
        values = [data1[i] / data2[i] for i in range(usable) if data2[i] != 0]

    if len(values) < window:
        return no_result

    window_values = values[:window]
    n = len(window_values)
    # Sample variance divides by (n - 1): it is undefined for fewer than two
    # observations, so bail out instead of raising on window sizes of 0 or 1.
    if n < 2:
        return no_result

    current_value = window_values[0]
    mean = sum(window_values) / n
    variance = sum((x - mean) ** 2 for x in window_values) / (n - 1)
    std = variance ** 0.5
    if std == 0:
        return no_result
    return (current_value - mean) / std, current_value, mean, std
# ── Signal Logic ───────────────────────────────────────────────────────
def _get_signal(self, strategy: StrategyConfig, z_score: float) -> int:
    """Translate a z-score into a trade direction.

    Returns 0 (no trade) while ``|z_score|`` is within ``strategy.threshold``,
    and also for negative z-scores when ``strategy.direction == "positive"``.
    Otherwise returns +1 (buy) or -1 (sell): a breach above the threshold
    follows ``strategy.direction_on_positive_trigger``, and any other breach
    takes the opposite side.
    """
    limit = strategy.threshold
    if abs(z_score) <= limit:
        return 0
    if z_score < 0 and strategy.direction == "positive":
        return 0

    buy_on_upper = strategy.direction_on_positive_trigger == "buy"
    upper_breach = z_score > limit
    # Buy when (upper breach, buy-on-upper) agree; sell when they disagree.
    return 1 if upper_breach == buy_on_upper else -1
# ── Strategy Check ─────────────────────────────────────────────────────
def _check_strategy(self, strategy: StrategyConfig):
    """Evaluate one strategy and trade if its z-score triggers a signal.

    Does nothing during warm-up or when no z-score can be computed. Otherwise
    the z-score snapshot is stored in ``self._daily_z_scores`` under the
    current date and strategy name, optionally plotted, and, when
    ``_get_signal`` returns non-zero, passed to ``_execute_trade`` together
    with the number of positions already open for this strategy.
    """
    if self.is_warming_up:
        return

    z_score, raw_value, rolling_mean, rolling_std = self._calculate_z_score(strategy)
    if z_score is None:
        return

    snapshot = {
        'indicator': f"{strategy.indicator_symbol_1}/{strategy.indicator_symbol_2}",
        'raw_value': raw_value,
        'z_score': z_score,
        'threshold': strategy.threshold,
        'rolling_mean': rolling_mean,
        'rolling_std': rolling_std,
    }
    today = self.time.date()
    self._daily_z_scores.setdefault(today, {})[strategy.name] = snapshot

    # === OBSERVABILITY ONLY — NOT SIZING/SIGNAL (2026-04-10) ============
    # Per-strategy custom chart of the raw z-score, so it can be fetched via
    # the backtest charts API and compared against the offline run. Reads a
    # value already computed above; it does not influence trading.
    if self.DUMP_ZSCORES:
        self.plot(f"Z_{strategy.name}", "z", float(z_score))
    # === END OBSERVABILITY ================================================

    signal = self._get_signal(strategy, z_score)
    if signal == 0:
        # v10: no "No signal" logging — saves 90%+ of log volume
        return

    same_strategy = [p for p in self._open_positions
                     if p.strategy_name == strategy.name]
    self._execute_trade(strategy, signal, z_score, len(same_strategy))
# ── Trade Execution (v8: simplified — no futures chain for trading) ───
def _execute_trade(self, strategy: StrategyConfig, signal: int,
                   z_score: float, concurrent: int):
    """Open a position for ``strategy`` in the direction given by ``signal``.

    Args:
        strategy: configuration supplying the name, target symbol and
            ``exit_hours`` holding period.
        signal: +1 to buy, -1 to sell; multiplies the sized quantity.
        z_score: triggering z-score, stored on the tracked position.
        concurrent: count of positions already open for this strategy, as
            supplied by the caller; not consulted in this method.

    The trade is silently skipped during warm-up, when no positive price is
    known for the target, when ``_compute_quantity`` returns ``None``, or when
    the estimated margin exceeds ``portfolio.margin_remaining``. An
    unsubscribed target is reported via ``self.error``. A position is tracked
    only if the order ticket is neither INVALID nor CANCELED.
    """
    if self.is_warming_up:
        return

    target_str = strategy.target_symbol
    if target_str not in self._symbols:
        self.error(f"{strategy.name}: {target_str} not subscribed")
        return

    # v8: trading targets are all CFDs/Forex/Equity/Crypto, so the Symbol held
    # in _symbols is traded directly (no futures contract resolution).
    symbol = self._symbols[target_str]

    entry_price = self._latest_price.get(target_str, 0)
    if entry_price <= 0:
        return

    # ATR-based dynamic position sizing (V4: strategy passed for weight lookup)
    base_qty = self._compute_quantity(target_str, strategy)
    if base_qty is None:
        return
    quantity = signal * base_qty

    # Pre-trade margin check: 5% for OANDA CFDs, 2% for the two forex pairs,
    # 50% for everything else.
    if target_str in self.OANDA_CFDS:
        margin_rate = 0.05
    elif target_str in ("NZDUSD", "GBPUSD"):
        margin_rate = 0.02
    else:
        margin_rate = 0.50
    estimated_margin = abs(base_qty) * entry_price * margin_rate
    if self.portfolio.margin_remaining < estimated_margin:
        return

    try:
        ticket = self.market_order(symbol, quantity)
        # v10: don't track the position if the order was rejected
        rejected = (ticket.status == OrderStatus.INVALID
                    or ticket.status == OrderStatus.CANCELED)
        if rejected:
            return

        opened_at = self.time
        pos = OpenPosition(
            position_id=self._next_position_id,
            strategy_name=strategy.name,
            target_symbol_str=target_str,
            order_symbol=symbol,
            entry_time=opened_at,
            exit_time=opened_at + timedelta(hours=strategy.exit_hours),
            direction=signal,
            quantity=abs(quantity),
            entry_price=entry_price,
            z_score=z_score,
        )
        self._open_positions.append(pos)
        self._next_position_id += 1

        dir_str = "SELL" if signal <= 0 else "BUY"
        self.debug(f"{strategy.name} #{pos.position_id} {dir_str} {abs(quantity):.4g} {target_str} @ {entry_price:.2f}")
    except Exception as e:
        self.error(f"{strategy.name}: Order failed: {e}")
# ── Timed Exits (v8: simplified — no expired contract handling) ───────
def _check_timed_exits(self):
    """Close every tracked position whose ``exit_time`` has been reached.

    v11: CFD/Forex positions are left untouched while their market is known
    to be closed (algorithm time is UTC): all of Saturday, Sunday before
    22:00 and Friday from 22:00. Sending market orders in that window gets
    them converted to MarketOnOpen and rejected, which piled up ~10,000
    Invalid orders over a backtest. BTCUSD is exempt (trades 24/7).

    A position is removed from ``self._open_positions`` only once its closing
    order is neither INVALID nor CANCELED and the trade has been logged;
    otherwise it stays in the list and is retried on a later call.
    """
    if self.is_warming_up:
        return

    now = self.time
    weekday = now.weekday()  # Mon=0 … Sat=5, Sun=6
    hour = now.hour
    if weekday == 5:
        cfd_forex_closed = True          # Saturday — all closed
    elif weekday == 6:
        cfd_forex_closed = hour < 22     # Sunday before 22:00 UTC
    elif weekday == 4:
        cfd_forex_closed = hour >= 22    # Friday after 22:00 UTC
    else:
        cfd_forex_closed = False

    # Due positions, minus non-crypto ones while their market is shut.
    due = [
        p for p in self._open_positions
        if now >= p.exit_time
        and (not cfd_forex_closed or p.target_symbol_str == "BTCUSD")
    ]

    closed_ids = set()
    for pos in due:
        close_qty = -pos.direction * pos.quantity
        # v10: check ticket status — catch any remaining edge-case rejects.
        try:
            ticket = self.market_order(pos.order_symbol, close_qty)
            rejected = (ticket.status == OrderStatus.INVALID
                        or ticket.status == OrderStatus.CANCELED)
            if rejected:
                continue  # keep in _open_positions, retry next hour
            self._log_closed_trade(pos)
            closed_ids.add(pos.position_id)
        except Exception as e:
            self.error(f"Close failed #{pos.position_id}: {e}")

    if closed_ids:
        self._open_positions = [p for p in self._open_positions
                                if p.position_id not in closed_ids]
def _log_closed_trade(self, pos: OpenPosition):
    """Append a summary record of the closed position to ``self._trade_log``.

    The exit price is the latest known price for the position's target
    symbol (0 when none is known) and the exit timestamp is the current
    algorithm time. Holding time is rounded to 0.1 hours and the z-score to
    three decimals.
    """
    closed_at = self.time
    held_seconds = (closed_at - pos.entry_time).total_seconds()
    side = "buy" if pos.direction > 0 else "sell"
    record = {
        "id": pos.position_id,
        "strategy": pos.strategy_name,
        "symbol": pos.target_symbol_str,
        "direction": side,
        "entry": pos.entry_time.strftime("%Y-%m-%d %H:%M"),
        "exit": closed_at.strftime("%Y-%m-%d %H:%M"),
        "entry_price": pos.entry_price,
        "exit_price": self._latest_price.get(pos.target_symbol_str, 0),
        "quantity": pos.quantity,
        "hold_hours": round(held_seconds / 3600, 1),
        "z_score": round(pos.z_score, 3),
    }
    # v10: no close logging — trade details are in the trade CSV
    self._trade_log.append(record)
# ── Reporting ──────────────────────────────────────────────────────────
def _log_daily_summary(self):
    """No-op kept as a stub: the daily summary was removed in v10, saving ~600 log lines."""
    return None
def on_end_of_algorithm(self):
    """Emit a one-line final summary through ``self.debug``.

    Reports the number of logged trades, the final portfolio value and the
    percentage return measured against a fixed starting capital of 75,000.
    """
    starting_capital = 75000.0
    final_equity = self.portfolio.total_portfolio_value
    gain = final_equity - starting_capital
    pct_return = (gain / starting_capital) * 100
    trade_count = len(self._trade_log)
    self.debug(f"v11 FINAL: {trade_count} trades | EUR {final_equity:,.0f} | {pct_return:+.1f}% (MT5: +2316%)")
    # ObjectStore dump removed — QC CLI/API blocks download for non-institutional
    # accounts. Replaced with per-strategy custom-chart plotting in _check_strategy
    # (gated by DUMP_ZSCORES), which can be fetched via backtests/chart/read.