| Overall Statistics |
|
Total Orders 5297 Average Win 0.19% Average Loss -0.24% Compounding Annual Return 55.098% Drawdown 22.600% Expectancy 0.325 Start Equity 100000 End Equity 898584.45 Net Profit 798.584% Sharpe Ratio 1.736 Sortino Ratio 2.229 Probabilistic Sharpe Ratio 96.809% Loss Rate 26% Win Rate 74% Profit-Loss Ratio 0.79 Alpha 0.287 Beta 0.819 Annual Standard Deviation 0.195 Annual Variance 0.038 Information Ratio 1.728 Tracking Error 0.159 Treynor Ratio 0.414 Total Fees $8065.50 Estimated Strategy Capacity $0 Lowest Capacity Asset GOOCV VP83T1ZUHROL Portfolio Turnover 13.86% Drawdown Recovery 315 |
from AlgorithmImports import *
from datetime import timedelta
from strategy_a import StrategyA
from strategy_b import StrategyB
from utils import record_vars
# Developed by Jo Walz | https://github.com/jowalz
# Quantitative Research & AI Systems Lab
# Experimental ML-driven systematic trading research
# Adaptive alpha models | Portfolio optimization | Regime-aware systems
#example of a multi-algo strategy combining Strategy A and Strategy B with dynamic allocation
class VectorHarvestAI(QCAlgorithm):
def Initialize(self):
# Set start date
# self.set_start_date(2026, 1, 1)
self.set_start_date(self.end_date - timedelta(5*365))
self.initial_cash = 100000
self.set_cash(self.initial_cash)
self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN)
self.settings.minimum_order_margin_portfolio_percentage = 0
# Warm up historical data before placing initial trades
self.set_warmup(timedelta(days=5))
# Add benchmark
self.benchmark_symbol = "SPY"
self.spy_prices = []
equity = self.add_equity(self.benchmark_symbol, Resolution.DAILY)
self.set_benchmark(equity.symbol)
# Target strategy allocations
self.strategy_a_allocation = 0.0
self.strategy_b_allocation = 1.0
# Multi-algo strategy setup
self.StrategyA = StrategyA(self, allocation=self.strategy_a_allocation)
self.StrategyA.initialize()
self.StrategyB = StrategyB(self, allocation=self.strategy_b_allocation)
self.StrategyB.initialize()
self.set_benchmark(self.benchmark_symbol)
def OnData(self, data):
if self.is_warming_up:
return
# Ensure strategies place initial orders only after warmup
try:
if hasattr(self.StrategyA, 'place_initial_orders'):
self.StrategyA.place_initial_orders()
except Exception:
pass
try:
if hasattr(self.StrategyB, 'place_initial_orders'):
self.StrategyB.place_initial_orders()
except Exception:
pass
record_vars(self)
# Universe changes
def OnSecuritiesChanged(self, changes):
self.StrategyA.on_securities_changed(changes)
self.StrategyB.on_securities_changed(changes)from AlgorithmImports import *
class StrategyA:
"""Simple buy-and-hold Strategy A implementation."""
def __init__(self, algo: QCAlgorithm, allocation=1.0):
self.algo = algo
self.allocation = float(allocation)
def initialize(self):
a = self.algo
# Simple buy-and-hold strategy for AAPL.
a._aapl = a.add_equity("AAPL", Resolution.DAILY).Symbol
a.set_benchmark(a._aapl)
# Defer allocation until after warmup / price data is available
self._pending_allocation = True
def on_securities_changed(self, changes):
# When securities are added, try placing pending initial orders.
# SecurityChanges attribute names vary between implementations (Added/added/etc.).
added = None
for attr in ('Added', 'added', 'AddedSecurities', 'added_securities', 'added_symbols', 'AddedSymbols'):
added = getattr(changes, attr, None)
if added:
break
if added and self._pending_allocation:
try:
self.place_initial_orders()
except Exception:
pass
def place_initial_orders(self):
a = self.algo
if not getattr(a, '_aapl', None):
return
if a.is_warming_up:
return
sym = a._aapl
if not a.securities.contains_key(sym):
return
sec = a.securities[sym]
if not getattr(sec, 'has_data', False):
return
# Avoid duplicate orders
try:
if a.portfolio[sym].invested:
self._pending_allocation = False
return
except Exception:
pass
a.set_holdings(sym, self.allocation)
self._pending_allocation = False
from AlgorithmImports import *
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
from strategy_b_submodel.execution import ExecutionManager
from strategy_b_submodel.portfolio import PortfolioManager
from strategy_b_submodel.universe import StrategyUniverseManager
from strategy_b_submodel.alpha import MarketRegimeSignal, SecurityScorer
#STRATEGY B: Regime-aware multi-factor strategy with dynamic allocation and risk management
class StrategyB:
def __init__(self, algo: QCAlgorithm, allocation=1.0):
self.algo = algo
self.allocation = float(allocation)
def initialize(self):
a = self.algo
# Core assets
a._spy = a.add_equity("SPY", Resolution.DAILY).Symbol
a.set_benchmark(a._spy)
# Hedge: NEM as gold proxy for EU trading
a._gld = a.add_equity("NEM", Resolution.DAILY).Symbol
# Market regime: VIX
a._vix = a.add_data(CBOE, "VIX", Resolution.DAILY).Symbol
# Module managers attached to this strategy instance
self.execution_manager = ExecutionManager(a)
self.portfolio_mgr = PortfolioManager(a)
self.universe_mgr = StrategyUniverseManager(a)
self.regime_signal = MarketRegimeSignal(a)
self.scorer = SecurityScorer(a)
# Configure universe manager
self.universe_mgr.set_coarse_count(int(a.get_parameter("coarse_count") or 2000))
self.universe_mgr.set_max_universe(int(a.get_parameter("max_universe") or 150))
self.universe_mgr.set_min_ipo_days(int(a.get_parameter("min_ipo_days") or 365))
# Configure portfolio manager
self.portfolio_mgr.set_long_gross(float(a.get_parameter("long_gross") or 0.9))
self.portfolio_mgr.set_short_gross(float(a.get_parameter("short_gross") or 0.6))
self.portfolio_mgr.set_ml_tilt(float(a.get_parameter("ml_tilt") or 0.25))
self.portfolio_mgr.set_top_weight_limits(
float(a.get_parameter("top_weight_min") or 0.0),
float(a.get_parameter("top_weight_max") or 0.35)
)
# Configure scorer
self.scorer.set_lookback_bars(int(a.get_parameter("lookback_bars") or 260))
self.scorer.set_sma_length(int(a.get_parameter("sma_len") or 195))
self.scorer.set_extension_k(float(a.get_parameter("ext_k") or 2.0))
self.scorer.set_momentum_k(float(a.get_parameter("mom_k") or 1.75))
self.scorer.set_score_threshold(float(a.get_parameter("score_threshold") or 0.85))
# Risk parameters
self._stop_atr = float(a.get_parameter("stop_atr") or 2.0)
self._long_trail_1 = float(a.get_parameter("long_trail_1") or 0.095)
self._long_trail_2 = float(a.get_parameter("long_trail_2") or 0.07)
self._long_trail_3 = float(a.get_parameter("long_trail_3") or 0.0485)
self._top_n = int(a.get_parameter("top_n") or 1)
# Universe settings and registration
a.universe_settings.resolution = Resolution.DAILY
a.add_universe(self.CoarseSelection, self.FineSelection)
# Schedule rebalancing and risk checks (using "SPY" string as in original)
a.schedule.on(
a.date_rules.every_day("SPY"),
a.time_rules.after_market_open("SPY", 30),
self.CheckSignal_Long
)
a.schedule.on(
a.date_rules.month_start("SPY"),
a.time_rules.after_market_open("SPY", 60),
self.TrainModel
)
a.schedule.on(
a.date_rules.every(DayOfWeek.MONDAY),
a.time_rules.after_market_open(a._spy, 30),
self.Rebalance_Short
)
a.schedule.on(
a.date_rules.every_day(a._spy),
a.time_rules.after_market_open(a._spy, 160),
self.RiskCheck_Short
)
a.schedule.on(
a.date_rules.every_day(a._spy),
a.time_rules.after_market_open(a._spy, 90),
self.RiskCheck_Long
)
a.set_warmup(252)
# Universe handlers
def CoarseSelection(self, coarse):
return self.universe_mgr.coarse_selection(coarse)
def FineSelection(self, fine):
return self.universe_mgr.fine_selection(fine, self.algo.time.month, self.portfolio_mgr)
def on_securities_changed(self, changes):
if hasattr(self.universe_mgr, 'on_securities_changed'):
try:
self.universe_mgr.on_securities_changed(changes)
except Exception:
pass
# Model training
def TrainModel(self):
self.regime_signal.train(self.algo._vix, self.algo._spy)
# Daily long signal evaluation
def CheckSignal_Long(self):
a = self.algo
if a.is_warming_up:
return
# Exit longs not in top set
self.execution_manager.liquidate_non_top_longs(
self.portfolio_mgr.get_top_set(),
a._spy,
a._gld
)
# Get market data
vix_hist = a.history([a._vix], 100, Resolution.DAILY)
spy_hist = a.history([a._spy], 200, Resolution.DAILY)
if vix_hist.empty or spy_hist.empty:
return
if "close" not in vix_hist.columns or "close" not in spy_hist.columns:
return
# Extract VIX closes
if isinstance(vix_hist.index, pd.MultiIndex):
if a._vix not in vix_hist.index.get_level_values(0):
return
vix_closes = vix_hist.xs(a._vix)["close"].values
else:
if a._vix not in vix_hist.index:
return
vix_closes = vix_hist.loc[a._vix]["close"].values
# Extract SPY closes
if isinstance(spy_hist.index, pd.MultiIndex):
if a._spy not in spy_hist.index.get_level_values(0):
return
spy_closes = spy_hist.xs(a._spy)["close"].values
else:
if a._spy not in spy_hist.index:
return
spy_closes = spy_hist.loc[a._spy]["close"].values
if len(vix_closes) < 50 or len(spy_closes) < 200:
return
# Calculate regime signals
current_vix = float(vix_closes[-1])
vix_sma = float(np.mean(vix_closes[-20:]))
vix_p80 = float(np.percentile(vix_closes, 80))
spy_current = float(spy_closes[-1])
spy_sma50 = float(np.mean(spy_closes[-50:]))
spy_sma200 = float(np.mean(spy_closes[-200:]))
spy_5d_ret = float(spy_closes[-1] / spy_closes[-5] - 1)
# Get ML signal
ml_bullish, _ = self.regime_signal.get_signal(vix_closes, spy_closes)
LG = self.portfolio_mgr.get_long_gross() * self.allocation
# Regime-based allocation
if current_vix > vix_p80 and spy_5d_ret < -0.03:
weight = 1.0 if ml_bullish else 0.85
eq_w = LG * weight
self.portfolio_mgr.allocate_top_positions(
eq_w,
list(self.portfolio_mgr.get_top_set()),
ml_bullish=ml_bullish,
execution_mgr=self.execution_manager
)
self.execution_manager.safe_set_holdings(a._gld, LG * (1.0 - weight))
return
if current_vix < 13 and spy_current > spy_sma50 * 1.05:
self.portfolio_mgr.allocate_top_positions(
LG * 0.40,
list(self.portfolio_mgr.get_top_set()),
ml_bullish=ml_bullish,
execution_mgr=self.execution_manager
)
self.execution_manager.safe_set_holdings(a._gld, LG * 0.40)
return
if 20 < current_vix < vix_sma:
weight = 0.85 if ml_bullish else 0.70
eq_w = LG * weight
self.portfolio_mgr.allocate_top_positions(
eq_w,
list(self.portfolio_mgr.get_top_set()),
ml_bullish=ml_bullish,
execution_mgr=self.execution_manager
)
self.execution_manager.safe_set_holdings(a._gld, LG * (1.0 - weight))
return
if current_vix > vix_sma * 1.2:
self.portfolio_mgr.allocate_top_positions(
0.0,
list(self.portfolio_mgr.get_top_set()),
ml_bullish=ml_bullish,
execution_mgr=self.execution_manager
)
self.execution_manager.safe_set_holdings(a._gld, LG * 0.50)
return
if spy_current > spy_sma200:
base = 0.90 if ml_bullish else 0.70
self.portfolio_mgr.allocate_top_positions(
LG * base,
list(self.portfolio_mgr.get_top_set()),
ml_bullish=ml_bullish,
execution_mgr=self.execution_manager
)
self.execution_manager.safe_set_holdings(a._gld, LG * (1.0 - base))
else:
self.portfolio_mgr.allocate_top_positions(
LG * 0.30,
list(self.portfolio_mgr.get_top_set()),
ml_bullish=ml_bullish,
execution_mgr=self.execution_manager
)
self.execution_manager.safe_set_holdings(a._gld, LG * 0.50)
# Daily long risk management
def RiskCheck_Long(self):
self.execution_manager.check_long_trailing_stops(
self._long_trail_1,
self._long_trail_2,
self._long_trail_3
)
# Weekly short rebalancing
def Rebalance_Short(self):
a = self.algo
if a.is_warming_up:
return
active = self.portfolio_mgr.get_active_universe()
if not active:
return
scored = []
for sym in active:
if sym == a._spy:
continue
if not self.universe_mgr.validate_symbol(sym, a):
continue
meets_criteria, score_info = self.scorer.meets_criteria(sym)
if not meets_criteria or score_info is None:
continue
score, close_now, atr20 = score_info
scored.append((score, sym, close_now, atr20))
scored.sort(reverse=True, key=lambda x: x[0])
picked = scored[:self._top_n]
selected = [sym for _, sym, _, _ in picked]
# Exit unselected shorts
self.execution_manager.liquidate_non_selected_shorts(selected, a._spy, a._gld)
# Enter selected shorts
if selected:
w = -abs(self.portfolio_mgr.get_short_gross() * self.allocation) / float(len(selected))
for _, sym, close_now, atr20 in picked:
self.execution_manager.safe_set_holdings(sym, w)
self.execution_manager.record_short_entry(sym, close_now, atr20)
# Daily short risk check
def RiskCheck_Short(self):
self.execution_manager.check_short_atr_stops(self._stop_atr)
class CBOE(PythonData):
def GetSource(self, config, date, isLive):
return SubscriptionDataSource(
"https://cdn.cboe.com/api/global/us_indices/daily_prices/VIX_History.csv",
SubscriptionTransportMedium.REMOTE_FILE
)
def Reader(self, config, line, date, isLive):
if not (line.strip() and line[0].isdigit()):
return None
data = line.split(',')
if len(data) < 5:
return None
date_parts = data[0].split('/')
if len(date_parts) != 3:
return None
month_str, day_str, year_str = date_parts
if not (month_str.isdigit() and day_str.isdigit() and year_str.isdigit()):
return None
month = int(month_str)
day = int(day_str)
year = int(year_str)
if month < 1 or month > 12 or year < 1900 or year > 2100:
return None
is_leap = (year % 4 == 0 and (year % 100 != 0 or year % 400 == 0))
mdays = [31, 29 if is_leap else 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
if day < 1 or day > mdays[month - 1]:
return None
def is_float_text(value):
if not value:
return False
if value[0] in '+-':
value = value[1:]
if value.count('.') > 1:
return False
return value.replace('.', '', 1).isdigit()
if not all(is_float_text(x) for x in data[1:5]):
return None
bar = type(self)()
bar.Symbol = config.Symbol
bar.Time = datetime(year, month, day)
bar.Value = float(data[4])
bar["close"] = float(data[4])
bar["open"] = float(data[1])
bar["high"] = float(data[2])
bar["low"] = float(data[3])
return bar
from .alpha import MarketRegimeSignal, SecurityScorer
from .execution import ExecutionManager
from .portfolio import PortfolioManager
from .universe import StrategyUniverseManager
__all__ = [
"MarketRegimeSignal",
"SecurityScorer",
"ExecutionManager",
"PortfolioManager",
"StrategyUniverseManager",
]
"""
Public QC Strategy Framework - Alpha Module
Handles machine learning models, feature generation, and trading signals.
Generates both market regime signals and individual security scores.
"""
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from AlgorithmImports import *
class MarketRegimeSignal:
"""Generates market regime signals for allocation decisions."""
def __init__(self, algorithm):
"""
Initialize market regime signal generator.
Args:
algorithm: Reference to the QCAlgorithm instance
"""
self.algorithm = algorithm
self.model = RandomForestClassifier(n_estimators=100, max_depth=5, random_state=42)
self.scaler = StandardScaler()
self.trained = False
self.min_training_samples = 504
def get_features(self, vix_closes, spy_closes):
"""
Generate ML features from VIX and SPY history.
Creates 11 features covering:
- VIX levels, trend, and percentile
- SPY momentum and moving averages
- Volatility metrics
Args:
vix_closes: Array of VIX close prices
spy_closes: Array of SPY close prices
Returns:
Feature array, or None if insufficient data
"""
if len(vix_closes) < 50 or len(spy_closes) < 200:
return None
# VIX features
current_vix = vix_closes[-1]
vix_sma20 = np.mean(vix_closes[-20:])
vix_sma50 = np.mean(vix_closes[-50:])
vix_std = np.std(vix_closes[-20:])
vix_zscore = (current_vix - vix_sma20) / vix_std if vix_std > 0 else 0.0
vix_percentile = float(np.sum(vix_closes < current_vix)) / float(len(vix_closes))
# SPY features
spy_current = spy_closes[-1]
spy_sma50 = np.mean(spy_closes[-50:])
spy_sma200 = np.mean(spy_closes[-200:])
spy_5d_ret = spy_closes[-1] / spy_closes[-5] - 1
spy_10d_ret = spy_closes[-1] / spy_closes[-10] - 1
spy_20d_ret = spy_closes[-1] / spy_closes[-20] - 1
spy_vol = np.std(np.diff(spy_closes[-21:]) / spy_closes[-21:-1])
return [
float(current_vix),
float(vix_zscore),
float(vix_percentile),
float(current_vix / vix_sma20) if vix_sma20 != 0 else 1.0,
float(current_vix / vix_sma50) if vix_sma50 != 0 else 1.0,
float(spy_5d_ret),
float(spy_10d_ret),
float(spy_20d_ret),
float(spy_current / spy_sma50) if spy_sma50 != 0 else 1.0,
float(spy_current / spy_sma200) if spy_sma200 != 0 else 1.0,
float(spy_vol * np.sqrt(252)),
]
def train(self, vix_symbol, spy_symbol):
"""
Train the monthly ML model.
Creates training labels based on forward 21-day SPY returns (>2% = bullish).
Uses 800 days of history with 200-bar minimum lookback.
Args:
vix_symbol: VIX symbol for history
spy_symbol: SPY symbol for history
Returns:
Boolean indicating success
"""
if self.algorithm.is_warming_up:
return False
# Get history
vix_hist = self.algorithm.history([vix_symbol], 800, Resolution.DAILY)
spy_hist = self.algorithm.history([spy_symbol], 800, Resolution.DAILY)
if vix_hist.empty or spy_hist.empty:
return False
if "close" not in vix_hist.columns or "close" not in spy_hist.columns:
return False
# Extract time series
if isinstance(vix_hist.index, pd.MultiIndex):
if vix_symbol not in vix_hist.index.get_level_values(0):
return False
vix_closes = vix_hist.xs(vix_symbol)["close"].values
else:
if vix_symbol not in vix_hist.index:
return False
vix_closes = vix_hist.loc[vix_symbol]["close"].values
if isinstance(spy_hist.index, pd.MultiIndex):
if spy_symbol not in spy_hist.index.get_level_values(0):
return False
spy_closes = spy_hist.xs(spy_symbol)["close"].values
else:
if spy_symbol not in spy_hist.index:
return False
spy_closes = spy_hist.loc[spy_symbol]["close"].values
if len(vix_closes) < self.min_training_samples or len(spy_closes) < self.min_training_samples:
return False
# Create training data
X, y = [], []
for i in range(200, len(spy_closes) - 21):
feats = self.get_features(vix_closes[:i], spy_closes[:i])
if feats is None:
continue
label = 1 if spy_closes[i + 21] / spy_closes[i] > 0.02 else 0
X.append(feats)
y.append(label)
if len(X) < 100:
return False
# Train model
X = np.array(X)
y = np.array(y)
self.scaler.fit(X)
Xs = self.scaler.transform(X)
self.model.fit(Xs, y)
self.trained = True
return True
def get_signal(self, vix_closes, spy_closes):
"""
Generate market regime signal.
Args:
vix_closes: Array of VIX close prices
spy_closes: Array of SPY close prices
Returns:
Tuple of (is_bullish, confidence)
- is_bullish: Boolean, True if model predicts positive return
- confidence: Float 0-1, model's confidence in prediction
"""
if not self.trained:
return False, 0.5
feats = self.get_features(vix_closes, spy_closes)
if feats is None:
return False, 0.5
X = self.scaler.transform([feats])
prob_array = self.model.predict_proba(X)[0]
if len(prob_array) == 2:
prob = float(prob_array[1])
else:
prob = 0.7 if self.model.predict(X)[0] == 1 else 0.5
return prob > 0.6, prob
def is_trained(self):
"""Check if model has been trained."""
return self.trained
def set_min_training_samples(self, count):
"""Set minimum samples required for training."""
self.min_training_samples = int(count)
class SecurityScorer:
"""Generates trading signals for individual securities."""
def __init__(self, algorithm):
"""
Initialize security scorer.
Args:
algorithm: Reference to the QCAlgorithm instance
"""
self.algorithm = algorithm
self.lookback_bars = 260
self.n_list = [10, 10, 40, 60, 90, 100]
self.sma_len = 195
self.ext_k = 2.0
self.mom_k = 1.75
self.score_threshold = 0.85
def _atr(self, df, n):
"""
Calculate Average True Range.
Args:
df: DataFrame with OHLC data
n: Period for ATR calculation
Returns:
ATR value, or None if insufficient data
"""
w = df.shape[0]
if w < n + 1:
return None
s = 0.0
for i in range(1, n + 1):
hi = float(df["high"].iloc[w - i])
lo = float(df["low"].iloc[w - i])
cl = float(df["close"].iloc[w - i])
s += max(hi - lo, abs(hi - cl), abs(lo - cl))
return s / float(n)
def _hurst_like(self, df, n, bump):
"""
Compute a Hurst-like exponent for trend/mean-reversion characteristics.
Args:
df: DataFrame with OHLC data
n: Period for calculation
bump: Adjustment factor based on period
Returns:
Hurst exponent, or None if calculation fails
"""
atr = self._atr(df, n)
if atr is None or atr <= 0:
return None
high_max = float(df["high"].tail(n).max())
low_min = float(df["low"].tail(n).min())
span = high_max - low_min
if span <= 0:
return None
h = (np.log(span) - np.log(atr)) / np.log(float(n))
if h > 0.45:
h += bump
elif h < 0.45:
h -= bump
return float(h)
def compute_score(self, symbol):
"""
Compute comprehensive trading score for a security.
Evaluates trend strength, extension, and momentum filters.
Args:
symbol: Symbol to score
Returns:
Tuple of (score, is_extended, has_momentum, close, atr), or None if fails
"""
df = self.algorithm.history(symbol, self.lookback_bars, Resolution.DAILY)
if df is None or df.empty:
return None
if isinstance(df.index, pd.MultiIndex):
df = df.xs(symbol)
if len(df) < max(self.n_list) + 6:
return None
# Compute Hurst exponents for multiple periods
hvals = []
for n in self.n_list:
hv = self._hurst_like(df, n, 0.01 + 0.0002 * n)
if hv is not None:
hvals.append(hv)
if len(hvals) < 4:
return None
havg = float(sum(hvals) / float(len(hvals)))
agree = int(sum(1 for x in hvals if x > 0.6))
close_now = float(df["close"].iloc[-1])
sma = float(df["close"].tail(self.sma_len).mean())
atr20 = self._atr(df, 20)
if atr20 is None or atr20 <= 0:
return None
close_5 = float(df["close"].iloc[-5])
ext_ok = (close_now - sma) > self.ext_k * atr20
mom_ok = (close_now - close_5) > self.mom_k * atr20
score = havg + 0.02 * max(0, agree - 3)
return float(score), bool(ext_ok), bool(mom_ok), close_now, float(atr20)
def meets_criteria(self, symbol):
"""
Check if security meets all trading criteria.
Args:
symbol: Symbol to evaluate
Returns:
Tuple of (meets_criteria, score_info), or (False, None) if fails
"""
result = self.compute_score(symbol)
if result is None:
return False, None
score, ext_ok, mom_ok, close_now, atr20 = result
if score >= self.score_threshold and ext_ok and mom_ok:
return True, (score, close_now, atr20)
return False, None
def set_lookback_bars(self, bars):
"""Set lookback period for score calculation."""
self.lookback_bars = int(bars)
def set_sma_length(self, length):
"""Set SMA length for extension calculation."""
self.sma_len = int(length)
def set_extension_k(self, k):
"""Set extension threshold multiplier."""
self.ext_k = float(k)
def set_momentum_k(self, k):
"""Set momentum threshold multiplier."""
self.mom_k = float(k)
def set_score_threshold(self, threshold):
"""Set minimum score threshold for trading."""
self.score_threshold = float(threshold)
"""
Public QC Strategy Framework - Execution Module
Handles order placement, position sizing, and risk management.
Provides safe order execution respecting portfolio constraints.
"""
import numpy as np
from AlgorithmImports import *
class ExecutionManager:
"""Manages order execution and risk management for the strategy."""
def __init__(self, algorithm):
"""
Initialize the execution manager.
Args:
algorithm: Reference to the QCAlgorithm instance
"""
self.algorithm = algorithm
self.long_trail = {}
self.entry = {}
def safe_set_holdings(self, symbol, target_weight):
"""
Place an order with portfolio margin constraints.
Respects available margin and clips target weight to prevent margin calls.
Args:
symbol: The symbol to trade
target_weight: Desired portfolio weight (-1 to 1)
"""
pv = float(self.algorithm.portfolio.total_portfolio_value)
if pv <= 0:
return
mr = float(self.algorithm.portfolio.margin_remaining)
max_abs = max(0.0, mr / pv)
w = float(np.clip(float(target_weight), -max_abs, max_abs))
self.algorithm.set_holdings(symbol, w)
def cap_and_renormalize(self, weights, total, wmin, wmax):
"""
Cap individual weights and renormalize to target allocation.
Ensures each weight is within [wmin, wmax] and sum equals target total
through iterative adjustment.
Args:
weights: Array of portfolio weights
total: Target total weight (usually <= 1.0)
wmin: Minimum weight per position (0 if no constraint)
wmax: Maximum weight per position (0 if no constraint)
Returns:
np.array: Adjusted weights
"""
w = np.array(weights, dtype=float)
n = len(w)
if n == 0:
return w
if wmin > 0:
w = np.maximum(w, wmin)
if wmax > 0:
w = np.minimum(w, wmax)
target = float(total)
for _ in range(10):
s = float(np.sum(w))
diff = target - s
if abs(diff) < 1e-8:
break
if diff > 0:
adjustable = [i for i in range(n) if (wmax <= 0 or w[i] < wmax - 1e-12)]
else:
adjustable = [i for i in range(n) if (wmin <= 0 or w[i] > wmin + 1e-12)]
if not adjustable:
break
incr = diff / float(len(adjustable))
for i in adjustable:
w[i] += incr
if wmin > 0:
w = np.maximum(w, wmin)
if wmax > 0:
w = np.minimum(w, wmax)
return w
def liquidate_position(self, symbol):
"""
Liquidate a position completely.
Args:
symbol: The symbol to liquidate
"""
self.algorithm.liquidate(symbol)
self.long_trail.pop(symbol, None)
self.entry.pop(symbol, None)
def liquidate_non_top_longs(self, top_set, spy, gld):
"""
Exit long positions that are not in the top set.
Args:
top_set: Set of symbols to keep as longs
spy: SPY symbol (to exclude)
gld: GLD/hedge symbol (to exclude)
"""
for kvp in self.algorithm.portfolio:
sym = kvp.key
if sym.security_type != SecurityType.EQUITY:
continue
if sym in (spy, gld):
continue
holding = kvp.value
if holding.invested and holding.quantity > 0 and sym not in top_set:
self.liquidate_position(sym)
def liquidate_non_selected_shorts(self, selected, spy, gld):
"""
Exit short positions that are not in the selected list.
Args:
selected: List of symbols to keep as shorts
spy: SPY symbol (to exclude)
gld: GLD/hedge symbol (to exclude)
"""
for kvp in self.algorithm.portfolio:
sym = kvp.key
if sym in (spy, gld):
continue
holding = kvp.value
if holding.invested and holding.quantity < 0 and sym not in selected:
self.liquidate_position(sym)
def ensure_long_trail_state(self, symbol, target_weight):
"""
Initialize or update trailing stop state for a long position.
Args:
symbol: The symbol being tracked
target_weight: The target weight for this position
"""
if not self.algorithm.securities.contains_key(symbol):
return
px = float(self.algorithm.securities[symbol].Price)
if px <= 0:
return
st = self.long_trail.get(symbol)
if st is None:
self.long_trail[symbol] = {
"high": px,
"stage": 0,
"target_w": float(target_weight)
}
else:
st["target_w"] = float(target_weight)
def check_long_trailing_stops(self, long_trail_1, long_trail_2, long_trail_3):
"""
Apply trailing stop exits to long positions.
Uses a 3-stage trailing stop mechanism:
- Stage 0->1: First trailing stop hit, reduce to 2/3
- Stage 1->2: Second trailing stop hit, reduce to 1/3
- Stage 2->3: Third trailing stop hit, liquidate
Args:
long_trail_1: First trailing stop threshold (drawdown %)
long_trail_2: Second trailing stop threshold (drawdown %)
long_trail_3: Third trailing stop threshold (drawdown %)
"""
if self.algorithm.is_warming_up:
return
for sym in list(self.long_trail.keys()):
if (
not self.algorithm.securities.contains_key(sym)
or not self.algorithm.portfolio[sym].invested
or self.algorithm.portfolio[sym].quantity <= 0
):
self.long_trail.pop(sym, None)
continue
px = float(self.algorithm.securities[sym].price)
if px <= 0:
continue
st = self.long_trail.get(sym)
if st is None:
continue
if px > float(st["high"]):
st["high"] = px
high = float(st["high"])
if high <= 0:
continue
dd = (high - px) / high
stage = int(st["stage"])
full_target_w = float(st["target_w"])
if stage == 0 and dd >= long_trail_1:
new_w = full_target_w * (2.0 / 3.0)
self.safe_set_holdings(sym, new_w)
st["stage"] = 1
st["high"] = px
elif stage == 1 and dd >= long_trail_2:
new_w = full_target_w * (1.0 / 3.0)
self.safe_set_holdings(sym, new_w)
st["stage"] = 2
st["high"] = px
elif stage == 2 and dd >= long_trail_3:
self.liquidate_position(sym)
def check_short_atr_stops(self, stop_atr_multiplier):
"""
Exit short positions when losses exceed ATR-based stop levels.
Args:
stop_atr_multiplier: Multiple of entry ATR for stop placement
"""
if self.algorithm.is_warming_up:
return
exits = []
for sym, info in list(self.entry.items()):
if not self.algorithm.securities.contains_key(sym) or not self.algorithm.portfolio[sym].invested:
self.entry.pop(sym, None)
continue
if self.algorithm.portfolio[sym].quantity >= 0:
self.entry.pop(sym, None)
continue
price = float(self.algorithm.securities[sym].Price)
if price <= 0:
continue
entry = float(info["entry_price"])
atr = float(info["entry_atr"])
if atr <= 0:
continue
if (price - entry) > stop_atr_multiplier * atr:
exits.append(sym)
for sym in exits:
self.liquidate_position(sym)
def record_short_entry(self, symbol, entry_price, entry_atr):
"""
Record entry information for short position risk tracking.
Args:
symbol: The short symbol
entry_price: Entry price for stop calculation
entry_atr: Entry ATR for stop calculation
"""
if symbol not in self.entry:
self.entry[symbol] = {
"entry_price": entry_price,
"entry_atr": entry_atr
}
def get_entry_price(self, symbol):
"""Get recorded entry price for a symbol."""
if symbol in self.entry:
return float(self.entry[symbol]["entry_price"])
return None
def get_entry_atr(self, symbol):
"""Get recorded entry ATR for a symbol."""
if symbol in self.entry:
return float(self.entry[symbol]["entry_atr"])
return None
"""
Public QC Strategy Framework - Portfolio Module
Manages portfolio state, position tracking, and allocation logic.
Handles top set management and weight distribution.
"""
import numpy as np
from AlgorithmImports import *
class PortfolioManager:
"""Manages portfolio state, positions, and allocation decisions."""
def __init__(self, algorithm):
"""
Initialize the portfolio manager.
Args:
algorithm: Reference to the QCAlgorithm instance
"""
self.algorithm = algorithm
self.top_set = set()
self.active = []
self.last_top_month = -1
# Configuration parameters
self.long_gross = 0.9
self.short_gross = 0.6
self.ml_tilt = 0.25
self.top_weight_max = 0.35
self.top_weight_min = 0.0
def update_top_set(self, fine, current_month):
"""
Update the top set of stocks for long positions.
Selects the top 4 stocks by market cap monthly.
Args:
fine: List of fundamental data
current_month: Current month (to determine if update needed)
"""
if current_month != self.last_top_month:
fine_mc = [f for f in fine if f.MarketCap and f.MarketCap > 0]
fine_mc.sort(key=lambda f: f.MarketCap, reverse=True)
self.top_set = set([f.Symbol for f in fine_mc[:4]])
self.last_top_month = current_month
def get_top_set(self):
"""Get the current top set of stocks."""
return self.top_set
def set_active_universe(self, symbols):
"""
Set the active universe of short candidates.
Args:
symbols: List of candidate symbols for shorting
"""
self.active = symbols
def get_active_universe(self):
"""Get the active universe of short candidates."""
return self.active
def pick_overweight_symbol(self, symbols):
"""
Select the highest market cap symbol for overweighting.
Args:
symbols: List of symbols to choose from (typically top set)
Returns:
Symbol with highest market cap, or first symbol if no cap data
"""
best = symbols[0]
best_cap = -1.0
for sym in symbols:
cap_val = -1.0
sec = self.algorithm.securities[sym] if self.algorithm.securities.contains_key(sym) else None
if sec is not None and sec.Fundamentals is not None:
cap = sec.Fundamentals.MarketCap
if cap and cap > 0:
cap_val = float(cap)
if cap_val > best_cap:
best_cap = cap_val
best = sym
return best
def allocate_top_positions(self, total_weight, symbols, ml_bullish=False, execution_mgr=None):
"""
Allocate capital to top long positions with optional ML tilt.
When ML is bullish, overweight the highest market cap stock.
Args:
total_weight: Total gross weight to allocate
symbols: List of top symbols to allocate to
ml_bullish: Whether ML signal is bullish (for tilt)
execution_mgr: ExecutionManager instance for placing orders
"""
if not symbols or not execution_mgr:
return
TW = float(total_weight)
if TW <= 0:
for sym in symbols:
if self.algorithm.portfolio[sym].invested and self.algorithm.portfolio[sym].quantity > 0:
self.algorithm.liquidate(sym)
return
n = len(symbols)
base = TW / float(n)
weights = np.array([base] * n, dtype=float)
# Apply ML tilt: increase overweight on highest market cap stock
if ml_bullish and self.ml_tilt > 0 and n >= 2:
ow = self.pick_overweight_symbol(symbols)
i_ow = symbols.index(ow)
extra = base * float(self.ml_tilt)
weights[i_ow] += extra
sub = extra / float(n - 1)
for i in range(n):
if i != i_ow:
weights[i] -= sub
# Cap and renormalize weights
weights = execution_mgr.cap_and_renormalize(
weights,
TW,
self.top_weight_min,
self.top_weight_max
)
# Place orders and track trailing stops
for i, sym in enumerate(symbols):
w = float(weights[i])
execution_mgr.safe_set_holdings(sym, w)
execution_mgr.ensure_long_trail_state(sym, w)
def get_long_gross(self):
"""Get configured long gross weight."""
return self.long_gross
def get_short_gross(self):
"""Get configured short gross weight."""
return self.short_gross
def get_ml_tilt(self):
"""Get configured ML tilt factor."""
return self.ml_tilt
def set_long_gross(self, value):
"""Set long gross weight."""
self.long_gross = float(value)
def set_short_gross(self, value):
"""Set short gross weight."""
self.short_gross = float(value)
def set_ml_tilt(self, value):
"""Set ML tilt factor."""
self.ml_tilt = float(value)
def set_top_weight_limits(self, wmin, wmax):
"""
Set weight constraints for top positions.
Args:
wmin: Minimum individual position weight
wmax: Maximum individual position weight
"""
self.top_weight_min = float(wmin)
self.top_weight_max = float(wmax)
def get_current_positions(self, symbol_type=SecurityType.EQUITY):
"""
Get all current positions of a given type.
Args:
symbol_type: Security type to filter (default: EQUITY)
Returns:
Dict of {symbol: quantity}
"""
positions = {}
for kvp in self.algorithm.portfolio:
sym = kvp.key
if sym.security_type != symbol_type:
continue
holding = kvp.value
if holding.invested:
positions[sym] = holding.quantity
return positions
def get_position_weight(self, symbol):
"""
Get current portfolio weight of a position.
Args:
symbol: The symbol to check
Returns:
Portfolio weight as percentage of total portfolio value
"""
pv = float(self.algorithm.portfolio.total_portfolio_value)
if pv <= 0:
return 0.0
holding = self.algorithm.portfolio[symbol]
return float(holding.HoldingsValue) / pv
def get_leverage(self):
"""
Get current portfolio leverage.
Returns:
Gross leverage (sum of absolute position weights)
"""
gross = 0.0
pv = float(self.algorithm.portfolio.total_portfolio_value)
if pv <= 0:
return 0.0
for kvp in self.algorithm.portfolio:
holding = kvp.value
if holding.invested:
gross += abs(float(holding.HoldingsValue) / pv)
return gross
def get_short_count(self):
"""Get count of active short positions."""
count = 0
for kvp in self.algorithm.portfolio:
holding = kvp.value
if holding.invested and holding.quantity < 0:
count += 1
return count
def get_long_count(self):
"""Get count of active long positions."""
count = 0
for kvp in self.algorithm.portfolio:
holding = kvp.value
if holding.invested and holding.quantity > 0:
count += 1
return count
"""
Public QC Strategy Framework - Universe Module
Handles universe selection, coarse/fine filtering, and security screening.
Manages the investment universe for both long and short strategies.
"""
from datetime import datetime
from AlgorithmImports import *
class StrategyUniverseManager:
"""Manages universe selection and security filtering."""
def __init__(self, algorithm):
"""
Initialize the universe manager.
Args:
algorithm: Reference to the QCAlgorithm instance
"""
self.algorithm = algorithm
self.coarse_count = 2000
self.max_universe = 150
self.min_ipo_days = 365
def coarse_selection(self, coarse):
"""
Perform coarse universe selection.
Filters for liquid, tradable equities with:
- Price > $5
- Dollar volume > $20M
- Fundamental data available
Args:
coarse: Coarse universe data from QCAlgorithm
Returns:
List of symbols meeting criteria, sorted by dollar volume
"""
filtered = [
c for c in coarse
if c.HasFundamentalData
and c.Price is not None and c.Price > 5
and c.DollarVolume is not None and c.DollarVolume > 2e7
]
filtered.sort(key=lambda c: c.DollarVolume, reverse=True)
return [c.Symbol for c in filtered[:self.coarse_count]]
def fine_selection(self, fine, current_month, portfolio_manager):
"""
Perform fine universe selection.
Selects top market cap names for long positions and builds short candidate
universe with fundamental filters (market cap minimum, IPO age requirement).
Args:
fine: Fine universe data from QCAlgorithm
current_month: Current month to track monthly updates
portfolio_manager: PortfolioManager instance for updating top set
Returns:
Tuple of (short_candidates, top_set_symbols)
"""
today = self.algorithm.time.date()
# Update top set monthly: top 4 by market cap
portfolio_manager.update_top_set(fine, current_month)
top_set = portfolio_manager.get_top_set()
# Filter for short candidates
kept = []
for f in fine:
# Minimum market cap filter
if not f.MarketCap or f.MarketCap < 1_000_000_000:
continue
# IPO date validation
sr = f.SecurityReference
if sr is None or sr.IPODate is None:
continue
days_since_ipo = (today - sr.IPODate.date()).days
if days_since_ipo < self.min_ipo_days:
continue
kept.append(f)
# Sort by dollar volume for ordering
kept.sort(key=lambda f: f.DollarVolume, reverse=True)
short_candidates = [f.Symbol for f in kept[:self.max_universe]]
# Update portfolio manager with active universe
portfolio_manager.set_active_universe(short_candidates)
# Return combined set for subscriptions
return list(set(short_candidates) | top_set)
def set_coarse_count(self, count):
"""
Set the number of symbols to keep after coarse selection.
Args:
count: Number of symbols to keep
"""
self.coarse_count = int(count)
def set_max_universe(self, count):
"""
Set the maximum number of short candidates.
Args:
count: Maximum short candidate count
"""
self.max_universe = int(count)
def set_min_ipo_days(self, days):
"""
Set the minimum days since IPO requirement.
Args:
days: Minimum days since IPO
"""
self.min_ipo_days = int(days)
def get_coarse_count(self):
"""Get current coarse selection count."""
return self.coarse_count
def get_max_universe(self):
"""Get current maximum universe size."""
return self.max_universe
def get_min_ipo_days(self):
"""Get current minimum IPO days requirement."""
return self.min_ipo_days
@staticmethod
def validate_symbol(symbol, algorithm):
"""
Validate that a symbol has required data for analysis.
Args:
symbol: Symbol to validate
algorithm: Reference to QCAlgorithm
Returns:
Boolean indicating if symbol is valid
"""
if not algorithm.securities.contains_key(symbol):
return False
sec = algorithm.securities[symbol]
if not sec.has_data:
return False
price = float(sec.Price)
if price <= 0:
return False
return True
@staticmethod
def get_sorted_by_volume(symbols, algorithm):
"""
Sort symbols by dollar volume in descending order.
Args:
symbols: List of symbols to sort
algorithm: Reference to QCAlgorithm
Returns:
Sorted list of symbols
"""
volumes = []
for sym in symbols:
if not algorithm.securities.contains_key(sym):
continue
sec = algorithm.securities[sym]
if sec.has_data:
volumes.append((sym, float(sec.volume)))
volumes.sort(key=lambda x: x[1], reverse=True)
return [sym for sym, _ in volumes]
@staticmethod
def get_sorted_by_price(symbols, algorithm, descending=True):
"""
Sort symbols by price.
Args:
symbols: List of symbols to sort
algorithm: Reference to QCAlgorithm
descending: If True, sort high to low; else low to high
Returns:
Sorted list of symbols
"""
prices = []
for sym in symbols:
if not algorithm.securities.contains_key(sym):
continue
sec = algorithm.securities[sym]
if sec.has_data:
price = float(sec.Price)
if price > 0:
prices.append((sym, price))
prices.sort(key=lambda x: x[1], reverse=descending)
return [sym for sym, _ in prices]
def filter_by_price_range(self, symbols, min_price=0, max_price=None):
"""
Filter symbols by price range.
Args:
symbols: List of symbols to filter
min_price: Minimum price (default 0)
max_price: Maximum price (default None = unlimited)
Returns:
Filtered list of symbols
"""
result = []
for sym in symbols:
if not self.algorithm.securities.contains_key(sym):
continue
sec = self.algorithm.securities[sym]
price = float(sec.Price)
if price >= min_price:
if max_price is None or price <= max_price:
result.append(sym)
return result
def filter_by_volume(self, symbols, min_volume):
"""
Filter symbols by minimum dollar volume.
Args:
symbols: List of symbols to filter
min_volume: Minimum dollar volume
Returns:
Filtered list of symbols
"""
result = []
for sym in symbols:
if not self.algorithm.securities.contains_key(sym):
continue
sec = self.algorithm.securities[sym]
if float(sec.volume) >= min_volume:
result.append(sym)
return result
from AlgorithmImports import Resolution
def record_vars(algo):
"""
Records benchmark and strategy performance for plotting.
"""
try:
symbol = algo.benchmark_symbol
history = algo.History([symbol], 2, Resolution.Daily)
if history.empty:
algo.Debug("[RECORD VARS] History is empty.")
return
latest_price = history.loc[symbol].iloc[-1]["close"]
if not hasattr(algo, "spy_prices"):
algo.spy_prices = []
algo.spy_prices.append(latest_price)
if len(algo.spy_prices) > 1:
perf = algo.spy_prices[-1] / algo.spy_prices[0] * algo.initial_cash
algo.Plot("Strategy Equity", f"{symbol} Benchmark", perf)
# Debug info
# algo.Debug(f"[RECORD VARS] Perf: {perf:.2f}, Prices: {algo.spy_prices[-2:]}")
except Exception as e:
algo.Debug(f"[RECORD VARS ERROR] {e}")