| Overall Statistics |
|
Total Orders 364 Average Win 0.43% Average Loss -0.16% Compounding Annual Return 6.285% Drawdown 11.800% Expectancy 2.320 Start Equity 1000000 End Equity 2496489.93 Net Profit 149.649% Sharpe Ratio 0.466 Sortino Ratio 0.499 Probabilistic Sharpe Ratio 11.450% Loss Rate 9% Win Rate 91% Profit-Loss Ratio 2.63 Alpha -0.002 Beta 0.339 Annual Standard Deviation 0.059 Annual Variance 0.004 Information Ratio -0.594 Tracking Error 0.1 Treynor Ratio 0.082 Total Fees $572.78 Estimated Strategy Capacity $160000000.00 Lowest Capacity Asset GLD T3SKPOF94JFP Portfolio Turnover 0.15% Drawdown Recovery 606 |
from AlgorithmImports import *
from collections import defaultdict, deque
from datetime import timedelta
import math, json, os, csv
import numpy as np
# =============================
# Core Constants
# =============================
PHI2 = ((1 + 5**0.5) / 2.0) ** 2
STABILITY_THRESHOLD = 0.60
STABILITY_MARGIN = 0.01
UP_THRESHOLD = 0.60 # loosened for more capture
DOWN_THRESHOLD = 0.40
FALLBACK_COOLDOWN = 40
HIST_BINS = 20
# Risk parameters
BASE_STOP_PCT = 0.05
BASE_TAKE_PCT = 0.12 # widened slightly
# Portfolio guard (staged drawdown)
GLOBAL_DRAWDOWN_LIMITS = [0.10, 0.20, 0.25]
MIN_COOLDOWN_DAYS = 7
GLOBAL_COOLDOWN_DAYS = 30
# Volatility targeting
TARGET_VOL = 0.15 # raised from 0.10 → more growth
VOL_LOOKBACK = 60
VOL_WARMUP = 126 # ~6 months warmup
# Correlation filter
CORR_LOOKBACK = 60
CORR_SKIP = 0.90
CORR_TAPER = 0.75
# Leverage control
MAX_EFFECTIVE_LEVERAGE = 1.5
# =============================
# Helper Functions
# =============================
def sigmoid(x: float) -> float:
return 1.0 / (1.0 + math.exp(-x))
def shannon_entropy(p):
arr = np.array(p, dtype=float)
arr = arr[arr > 0]
return float(-(arr * np.log(arr)).sum()) if arr.size > 0 else 0.0
def tsallis_entropy(p, q: float = 2.0):
arr = np.array(p, dtype=float)
return float((1.0 - (arr**q).sum()) / (q - 1.0))
def kl_divergence(p, q):
p = np.array(p, dtype=float)
q = np.array(q, dtype=float)
mask = (p > 0) & (q > 0)
if not mask.any():
return 0.0
return float((p[mask] * np.log(p[mask] / q[mask])).sum())
def regime_stability(entropy_deltas):
s = sum(abs(x) for x in entropy_deltas)
return 1.0 / (1.0 + math.exp(-s / PHI2))
def spectral_resonance(returns):
if len(returns) < 8:
return 0.0
arr = np.array(returns, dtype=float)
arr -= arr.mean()
fft_vals = np.fft.fft(arr)
power = np.abs(fft_vals) ** 2
half = len(power) // 2
low_band = power[1: max(2, half // 4)]
total = power[1:half]
if total.sum() == 0:
return 0.0
return float(low_band.sum() / total.sum())
def wct_phi(delta_entropy_values):
total_shift = sum(abs(x) for x in delta_entropy_values)
scaled = total_shift / PHI2
return 1.0 / (1.0 + math.exp(-scaled))
def realized_vol(returns):
if len(returns) < 2:
return 0.0
daily_vol = np.std(returns)
return daily_vol * math.sqrt(252)
def rolling_correlation(x, y):
if len(x) < 2 or len(y) < 2:
return 0.0
if len(x) != len(y):
n = min(len(x), len(y))
x = list(x)[-n:]
y = list(y)[-n:]
return float(np.corrcoef(x, y)[0,1]) if len(x) > 1 else 0.0
# =============================
# Risk Management Model
# =============================
class SnapfrontRiskModel(RiskManagementModel):
def __init__(self, algorithm):
super().__init__()
self.algorithm = algorithm
def ManageRisk(self, algorithm, targets):
targets_to_liquidate = []
current_value = algorithm.Portfolio.TotalPortfolioValue
dd = 1 - (current_value / self.algorithm.starting_equity)
self.algorithm.drawdown_pct = dd
if dd > GLOBAL_DRAWDOWN_LIMITS[2]:
algorithm.Debug(f"Drawdown {dd:.2%} exceeds {GLOBAL_DRAWDOWN_LIMITS[2]:.0%}, liquidating all at {algorithm.Time.date()}")
targets_to_liquidate += [
PortfolioTarget(symbol, 0) for symbol, holding in algorithm.Portfolio.items() if holding.Invested
]
self.algorithm.last_global_liquidation = algorithm.Time
return targets_to_liquidate
return targets_to_liquidate
# =============================
# Alpha Model
# =============================
class SnapfrontAlphaModel(AlphaModel):
def __init__(self, lookback=60, atrs=None, parent=None):
super().__init__()
self.lookback = lookback
self.windows = {}
self.entropies = defaultdict(lambda: deque(maxlen=lookback))
self.last_distributions = {}
self.last_stability = defaultdict(float)
self.metrics = defaultdict(dict)
self.last_signal_time = defaultdict(lambda: None)
self.atrs = atrs if atrs is not None else {}
self.parent = parent
self.portfolio_returns = deque(maxlen=VOL_LOOKBACK)
self.symbol_returns = defaultdict(lambda: deque(maxlen=VOL_LOOKBACK))
def Update(self, algorithm, data) -> list[Insight]:
insights = []
if self.parent and self.parent.last_global_liquidation:
days_since_liq = (algorithm.Time - self.parent.last_global_liquidation).days
if days_since_liq < MIN_COOLDOWN_DAYS:
return insights
if days_since_liq < GLOBAL_COOLDOWN_DAYS:
portfolio_wct = np.mean([m.get("wct_phi", 0.5) for m in self.metrics.values()]) if self.metrics else 0.5
if portfolio_wct < 0.70:
return insights
try:
for symbol, window in self.windows.items():
if symbol not in data:
continue
bar = data[symbol]
if bar is None or not hasattr(bar, "Close"):
continue
window.Add(bar.Close)
if window.Count < self.lookback:
continue
closes = [window[i] for i in range(window.Count)]
rets = [math.log(closes[i] / closes[i - 1]) for i in range(1, len(closes)) if closes[i - 1] > 0]
if not rets:
continue
self.symbol_returns[symbol].extend(rets)
if len(rets) > 0:
self.portfolio_returns.append(np.mean(rets))
min_r, max_r = min(rets), max(rets)
if max_r == min_r:
continue
bin_size = (max_r - min_r) / HIST_BINS
hist = [0] * HIST_BINS
for r in rets:
idx = int((r - min_r) / bin_size) if bin_size > 0 else 0
idx = min(HIST_BINS - 1, max(0, idx))
hist[idx] += 1
total = float(sum(hist))
p = [h / total for h in hist] if total > 0 else [1.0 / HIST_BINS] * HIST_BINS
hs = shannon_entropy(p)
ht = tsallis_entropy(p, q=2.0)
delta_h = hs - (self.entropies[symbol][-1] if self.entropies[symbol] else 0.0)
self.entropies[symbol].append(hs)
drift = kl_divergence(p, self.last_distributions[symbol]) if symbol in self.last_distributions else 0.0
self.last_distributions[symbol] = p
stability = regime_stability([delta_h])
resonance = spectral_resonance(rets)
wct = wct_phi([delta_h])
raw_score = 2.0 * drift + 1.5 * resonance + 0.8 * stability - 0.3 * ht
score = sigmoid(raw_score)
sym_vol = realized_vol(self.symbol_returns[symbol])
spy_rets = self.symbol_returns.get(self.parent.spy_symbol, deque())
corr = rolling_correlation(self.symbol_returns[symbol], spy_rets)
self.metrics[symbol] = {
"stability": stability,
"drift": drift,
"resonance": resonance,
"entropy_shannon": hs,
"entropy_tsallis": ht,
"score": score,
"wct_phi": wct,
"realized_vol": sym_vol,
"corr_spy": corr
}
atr_val = self.atrs[symbol].Current.Value if symbol in self.atrs else 0
vol_ok = atr_val > 0 and (atr_val / bar.Close) < 0.025
emit = False
last_signal = self.last_signal_time[symbol]
if wct >= 0.70:
horizon_days = 12
elif wct >= 0.50:
horizon_days = 6
else:
horizon_days = 2
if vol_ok and stability > STABILITY_THRESHOLD and stability >= self.last_stability[symbol] - STABILITY_MARGIN:
if last_signal is None or (algorithm.Time - last_signal).days >= FALLBACK_COOLDOWN:
if score > UP_THRESHOLD:
insights.append(Insight.Price(symbol, timedelta(days=horizon_days),
InsightDirection.Up, magnitude=score,
weight=score * stability))
emit = True
elif score < DOWN_THRESHOLD:
insights.append(Insight.Price(symbol, timedelta(days=horizon_days),
InsightDirection.Down, magnitude=1 - score,
weight=score * stability))
emit = True
if not emit and wct >= 0.50 and (last_signal is None or (algorithm.Time - last_signal).days >= 40):
direction = InsightDirection.Up if score >= 0.5 else InsightDirection.Down
insights.append(Insight.Price(symbol, timedelta(days=horizon_days),
direction, magnitude=abs(score - 0.5) + 0.5,
weight=score * stability))
emit = True
if emit:
self.last_signal_time[symbol] = algorithm.Time
self.last_stability[symbol] = stability
except Exception as e:
algorithm.Debug(f"update crashed: {e}")
return insights
def OnSecuritiesChanged(self, algorithm, changes):
for security in changes.AddedSecurities:
if security.Symbol not in self.windows:
self.windows[security.Symbol] = RollingWindow[float](self.lookback)
history = algorithm.History(security.Symbol, self.lookback, Resolution.Daily)
for bar in history.itertuples():
self.windows[security.Symbol].Add(bar.close)
# =============================
# Portfolio Construction
# =============================
class CoherenceWeightedPortfolio(PortfolioConstructionModel):
def __init__(self, alpha_model, parent):
super().__init__()
self.alpha_model = alpha_model
self.parent = parent
self.day_counter = 0
def CreateTargets(self, algorithm, insights):
if len(insights) == 0:
return []
self.day_counter += 1
weights = {}
total_weight = 0.0
for ins in insights:
if ins.Weight is None or ins.Weight <= 0:
continue
symbol = ins.Symbol
m = self.alpha_model.metrics.get(symbol, {})
sym_vol = m.get("realized_vol", 0.2)
corr = m.get("corr_spy", 0.0)
if corr > CORR_SKIP:
continue
elif corr > CORR_TAPER:
adj_weight = (ins.Weight / max(sym_vol, 1e-3)) * 0.25
else:
adj_weight = ins.Weight / max(sym_vol, 1e-3)
weights[symbol] = adj_weight
total_weight += adj_weight
if total_weight == 0:
return []
port_vol = realized_vol(list(self.alpha_model.portfolio_returns))
if self.day_counter < VOL_WARMUP:
scale = 0.5
else:
scale = TARGET_VOL / max(port_vol, 1e-6) if port_vol > 0 else 1.0
scale = min(1.0, max(0.25, scale))
dd = getattr(self.parent, "drawdown_pct", 0.0)
if dd > GLOBAL_DRAWDOWN_LIMITS[2]:
return []
elif dd > GLOBAL_DRAWDOWN_LIMITS[1]:
dd_scale = 0.25
elif dd > GLOBAL_DRAWDOWN_LIMITS[0]:
dd_scale = 0.5
else:
dd_scale = 1.0
scale *= dd_scale
targets = []
for symbol, w in weights.items():
norm_weight = (w / total_weight) * scale
targets.append(PortfolioTarget.Percent(algorithm, symbol, norm_weight))
total_exposure = sum(abs(h.HoldingsValue) for h in algorithm.Portfolio.Values)
equity = algorithm.Portfolio.TotalPortfolioValue
eff_lev = total_exposure / equity if equity > 0 else 0
self.parent.effective_leverage = eff_lev
if eff_lev > MAX_EFFECTIVE_LEVERAGE:
for t in targets:
t.Quantity *= 0.5
return targets
# =============================
# Main Algorithm
# =============================
class SnapfrontAlgorithm(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2010, 1, 1)
self.SetEndDate(2025, 1, 1)
self.SetCash(1000000)
self.starting_equity = self.Portfolio.TotalPortfolioValue
self.last_global_liquidation = None
self.drawdown_pct = 0.0
self.effective_leverage = 0.0
self.log_dir = "results"
os.makedirs(self.log_dir, exist_ok=True)
self.json_log_path = os.path.join(self.log_dir, "metrics_log.jsonl")
self.csv_log_path = os.path.join(self.log_dir, "metrics_log.csv")
with open(self.csv_log_path, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow([
"time","symbol","stability","drift","resonance",
"entropy_shannon","entropy_tsallis","score","wct_phi",
"realized_vol","corr_spy","stop_loss_threshold","take_profit_threshold",
"holding_period_days","pnl",
"portfolio_vol_estimate","drawdown_pct","effective_leverage"
])
self.atrs = {}
for ticker in ["SPY", "QQQ", "IWM", "GLD"]:
security = self.AddEquity(ticker, Resolution.Daily)
symbol = security.Symbol
self.atrs[symbol] = self.ATR(symbol, 14, MovingAverageType.Simple, Resolution.Daily)
self.spy_symbol = self.Symbol("SPY")
alpha = SnapfrontAlphaModel(lookback=60, atrs=self.atrs, parent=self)
self.alpha_model = alpha
self.AddAlpha(alpha)
self.SetPortfolioConstruction(CoherenceWeightedPortfolio(alpha, self))
self.SetExecution(ImmediateExecutionModel())
self.SetRiskManagement(SnapfrontRiskModel(self))
def OnOrderEvent(self, order_event: OrderEvent):
if order_event.Status == OrderStatus.Filled:
fill = {
"time": str(self.Time),
"symbol": str(order_event.Symbol),
"direction": str(order_event.Direction),
"quantity": order_event.FillQuantity,
"fill_price": order_event.FillPrice,
"order_id": order_event.OrderId
}
self.Debug(json.dumps(fill))
with open(self.json_log_path, "a") as f:
f.write(json.dumps({"type":"order", **fill}) + "\n")
def OnEndOfDay(self, symbol: Symbol):
if symbol in self.alpha_model.metrics:
m = self.alpha_model.metrics[symbol]
holding = self.Portfolio[symbol]
pnl = (holding.Price - holding.AveragePrice)/holding.AveragePrice if holding.Invested else 0.0
stop_threshold = BASE_STOP_PCT / (m["wct_phi"] + 0.25)
if m["wct_phi"] >= 0.70:
take_threshold = 3.5 * stop_threshold
elif m["wct_phi"] >= 0.50:
take_threshold = BASE_TAKE_PCT * (m["wct_phi"] + 0.6)
else:
take_threshold = BASE_TAKE_PCT * 1.8
port_vol = realized_vol(list(self.alpha_model.portfolio_returns))
summary = {
"time": str(self.Time),
"symbol": str(symbol),
"stability": float(m["stability"]),
"drift": float(m["drift"]),
"resonance": float(m["resonance"]),
"entropy_shannon": float(m["entropy_shannon"]),
"entropy_tsallis": float(m["entropy_tsallis"]),
"score": float(m["score"]),
"wct_phi": float(m["wct_phi"]),
"realized_vol": float(m["realized_vol"]),
"corr_spy": float(m["corr_spy"]),
"stop_loss_threshold": float(stop_threshold),
"take_profit_threshold": float(take_threshold),
"holding_period_days": (12 if m["wct_phi"] >= 0.70 else 6 if m["wct_phi"] >= 0.50 else 2),
"pnl": float(pnl),
"portfolio_vol_estimate": float(port_vol),
"drawdown_pct": float(self.drawdown_pct),
"effective_leverage": float(self.effective_leverage)
}
self.Debug(json.dumps(summary))
with open(self.json_log_path, "a") as f:
f.write(json.dumps({"type":"metrics", **summary}) + "\n")
with open(self.csv_log_path, "a", newline="") as f:
writer = csv.writer(f)
writer.writerow(summary.values())