| Overall Statistics | |
|---|---|
| Total Orders | 1054 |
| Average Win | 1.34% |
| Average Loss | -0.95% |
| Compounding Annual Return | 35.997% |
| Drawdown | 35.500% |
| Expectancy | 0.337 |
| Start Equity | 1000000 |
| End Equity | 5035245.64 |
| Net Profit | 403.525% |
| Sharpe Ratio | 0.953 |
| Sortino Ratio | 1.18 |
| Probabilistic Sharpe Ratio | 40.775% |
| Loss Rate | 44% |
| Win Rate | 56% |
| Profit-Loss Ratio | 1.40 |
| Alpha | 0.206 |
| Beta | 0.575 |
| Annual Standard Deviation | 0.271 |
| Annual Variance | 0.073 |
| Information Ratio | 0.637 |
| Tracking Error | 0.263 |
| Treynor Ratio | 0.45 |
| Total Fees | $30779.05 |
| Estimated Strategy Capacity | $1300000000.00 |
| Lowest Capacity Asset | GE R735QTJ8XC9X |
| Portfolio Turnover | 13.30% |
| Drawdown Recovery | 541 |
#region imports
from AlgorithmImports import *
import numpy as np
from collections import deque
import statistics as stat
import pickle
from datetime import datetime, timedelta
#endregion
class Q2PlaygroundAlgorithm(QCAlgorithm):
def initialize(self):
    """Set up the backtest: dates, cash, parameters, tracking state, assets, universe and schedule."""
    # Backtest window and starting capital.
    self.set_start_date(2019, 3, 1)
    self.set_end_date(2024, 6, 1)
    self.set_cash(1000000)

    # Securities are seeded through get_last_known_prices when they are initialised.
    initializer = BrokerageModelSecurityInitializer(
        self.brokerage_model,
        FuncSecuritySeeder(self.get_last_known_prices),
    )
    self.set_security_initializer(initializer)

    # ---------------------------- PARAMETERS ----------------------------
    # Values are hard-coded here; they are not read via get_parameter().
    self.p_lookback = 252            # Momentum indicator lookback period
    self.p_num_coarse = 200          # Symbols kept by coarse selection
    self.p_num_fine = 70             # Symbols kept by fine selection
    self.p_num_long = 4              # Symbols with open positions
    self.p_adjustment_step = 1.0     # Fraction of the gap to the target weight closed per adjustment
    self.p_n_portfolios = 1000       # Random portfolios sampled by optimize_portfolio
    self.p_short_lookback = 63       # History length used by optimize_portfolio
    self.p_rand_seed = 13            # Seed for the random portfolio search
    # Handled by get_next_adjustment_date: 'weekday' (5 days), 'bi-weekly' (14 days), 'monthly'.
    self.p_adjustment_frequency = 'weekday'
    # ---------------------------------------------------------------------

    self.universe_settings.resolution = Resolution.DAILY

    # Working copies of the parameters used by the selection / trading logic.
    self._lookback = self.p_lookback
    self._num_coarse = self.p_num_coarse
    self._num_fine = self.p_num_fine
    self._num_long = self.p_num_long
    self.adjustment_step = self.p_adjustment_step

    # Trading state.
    self._momp = {}                   # Momentum indicator keyed by Symbol
    self._rebalance = False
    self.current_holdings = set()     # Symbols currently selected
    self.target_weights = {}          # Target weight keyed by Symbol
    self.first_trade_date = None
    self.next_adjustment_date = None

    # Analysis containers.
    self.momentum_history = {}        # Momentum values over time
    self.portfolio_history = []       # Portfolio composition at each rebalance
    self.rebalance_dates = []         # All rebalancing dates
    self.days_since_rebalance = 0
    self.portfolio_value_at_rebalance = None
    self.daily_returns_since_rebalance = []
    self.momentum_decay_tracker = {}  # Momentum of holdings after selection
    self.correlation_tracker = {}     # Correlation summaries keyed by time
    self.volatility_tracker = {}      # Volatility summaries keyed by time
    self.individual_performance = {}  # Per-asset weight / momentum / price records

    # Market regime detection state.
    self.regime_indicators = {}
    self.current_regime = "MOMENTUM"  # "MOMENTUM" or "DEFENSIVE"
    self.regime_history = []
    self.bear_signal_threshold = 0.45
    self.last_bear_score = 0.0
    # Weight of each indicator in the composite bear score (these sum to 1.0).
    self.regime_weights = dict(
        ma_200_signal=0.30,
        ma_50_signal=0.25,
        momentum_signal=0.20,
        volatility_signal=0.15,
        drawdown_signal=0.10,
    )

    # Allocation settings per regime.
    self.defensive_gold_allocation = 0.95    # Gold share while DEFENSIVE
    self.defensive_equity_allocation = 0.05  # Equity share while DEFENSIVE
    self.momentum_allocation = 1.0           # Equity share while MOMENTUM

    # SPY is the market proxy for regime detection; GLD is the defensive asset.
    self.market_proxy = self.add_equity("SPY", Resolution.DAILY).symbol
    self.defensive_asset = self.add_equity("GLD", Resolution.DAILY).symbol

    self.add_universe(self._coarse_selection_function, self._fine_selection_function)

    # Re-evaluate the market regime every day, 30 minutes after the proxy's market open.
    self.schedule.on(
        self.date_rules.every_day(),
        self.time_rules.after_market_open(self.market_proxy, 30),
        self.update_market_regime,
    )
def update_market_regime(self):
    """Recompute the bear-market indicators from market-proxy history and update the regime.

    Each available indicator is 1 (bearish) or 0. They are combined into a
    weighted score using ``self.regime_weights``; the regime switches to
    "DEFENSIVE" at ``bear_signal_threshold`` and back to "MOMENTUM" once the
    score is at least 0.1 below that threshold. Every evaluation is appended
    to ``self.regime_history``.
    """
    proxy = self.market_proxy
    if proxy not in self.securities:
        return

    bars = self.history(proxy, 250, Resolution.DAILY)
    if bars.empty:
        return

    prices = bars.loc[proxy]['close']
    current_price = prices.iloc[-1]
    n_bars = len(prices)

    # Start from a clean set of indicators on every evaluation.
    self.regime_indicators = {}
    signals = self.regime_indicators

    # 1. Price below its 200- / 50-bar moving average.
    for window, key in ((200, 'ma_200_signal'), (50, 'ma_50_signal')):
        if n_bars >= window:
            average = prices.tail(window).mean()
            signals[key] = 1 if current_price < average else 0

    # 2. Momentum: bearish if any look-back shows a drop beyond its limit.
    drop_limits = {10: -0.08, 20: -0.10, 50: -0.12}
    momentum_flags = []
    for span, limit in drop_limits.items():
        if n_bars >= span:
            change = (current_price / prices.iloc[-span] - 1)
            momentum_flags.append(1 if change < limit else 0)
    if momentum_flags:
        signals['momentum_signal'] = max(momentum_flags)

    # 3. Volatility: recent 20-return volatility more than 1.8x the 100-return level.
    if n_bars >= 50:
        returns = prices.pct_change().dropna()
        if len(returns) >= 20:
            current_vol = returns.tail(20).std() * np.sqrt(252)
            if len(returns) >= 100:
                avg_vol = returns.tail(100).std() * np.sqrt(252)
            else:
                avg_vol = current_vol
            signals['volatility_signal'] = 1 if current_vol > avg_vol * 1.8 else 0

    # 4. Drawdown of the compounded return series from its running peak.
    if n_bars >= 100:
        growth = (1 + prices.pct_change().dropna()).cumprod()
        peak = growth.expanding().max().iloc[-1]
        current_drawdown = (growth.iloc[-1] - peak) / peak
        signals['drawdown_signal'] = 1 if current_drawdown < -0.08 else 0

    # Weighted average over the indicators that could be computed.
    bear_score = 0
    total_weight = 0
    for name, weight in self.regime_weights.items():
        if name not in signals:
            continue
        bear_score += signals[name] * weight
        total_weight += weight
    if total_weight > 0:
        bear_score = bear_score / total_weight

    # Hysteresis: leaving DEFENSIVE needs a lower score than entering it.
    threshold = self.bear_signal_threshold
    if self.current_regime == "MOMENTUM":
        if bear_score >= threshold:
            self.current_regime = "DEFENSIVE"
    elif bear_score <= (threshold - 0.1):
        self.current_regime = "MOMENTUM"

    self.regime_history.append({
        'date': self.time,
        'regime': self.current_regime,
        'bear_score': bear_score,
        'indicators': signals.copy(),
        'spy_price': current_price
    })
    self.last_bear_score = bear_score
def _coarse_selection_function(self, coarse):
'''Enhanced coarse selection with regime awareness'''
if self.next_adjustment_date and self.time < self.next_adjustment_date:
return Universe.UNCHANGED
self._rebalance = True
if not self.first_trade_date:
self.first_trade_date = self.time
self.next_adjustment_date = self.get_next_adjustment_date(self.time)
self._rebalance = True
# In defensive regime, be more selective (higher price threshold)
min_price = 8.0 if self.current_regime == "DEFENSIVE" else 5.0
selected = sorted([x for x in coarse if x.has_fundamental_data and x.price > min_price],
key=lambda x: x.dollar_volume, reverse=True)
return [x.symbol for x in selected[:self._num_coarse]]
def _fine_selection_function(self, fine):
'''Enhanced fine selection with regime awareness'''
# In defensive regime, prefer larger, more stable companies
if self.current_regime == "DEFENSIVE":
# Sort by market cap and select more conservative stocks
selected = sorted(fine, key=lambda f: f.market_cap, reverse=True)
return [x.symbol for x in selected[:int(self._num_fine * 0.8)]] # Be more selective
else:
# Normal momentum selection
selected = sorted(fine, key=lambda f: f.market_cap, reverse=True)
return [x.symbol for x in selected[:self._num_fine]]
def on_data(self, data):
    """Per-bar handler: refresh indicators, record tracking data and rebalance when flagged.

    When ``self._rebalance`` is not set, only the tracking containers are
    updated. When it is set, the portfolio is rebuilt according to
    ``self.current_regime`` and the next adjustment date is scheduled.
    """
    now = self.time

    # Feed the latest close into every momentum indicator and log ready values.
    for symbol, mom in self._momp.items():
        close = self.securities[symbol].close
        mom.update(now, close)
        samples = self.momentum_history.setdefault(symbol, [])
        if mom.is_ready:
            samples.append({
                'date': now,
                'momentum': mom.current.value,
                'price': close
            })

    # Record the return relative to the portfolio value at the last rebalance.
    baseline = self.portfolio_value_at_rebalance
    if baseline is not None:
        current_value = self.portfolio.total_portfolio_value
        self.daily_returns_since_rebalance.append({
            'date': now,
            'days_since_rebalance': self.days_since_rebalance,
            'daily_return': (current_value - baseline) / baseline,
            'portfolio_value': current_value
        })
        self.days_since_rebalance += 1

    # Nothing held and no trade date recorded yet: start trading now.
    if not (self.portfolio.invested or self.first_trade_date):
        self.first_trade_date = now
        self.next_adjustment_date = self.get_next_adjustment_date(now, initial=True)
        self._rebalance = True

    if not self._rebalance:
        # Between rebalances only track how the momentum of the holdings evolves.
        for symbol in self.current_holdings:
            indicator = self._momp.get(symbol)
            if indicator is None or not indicator.is_ready:
                continue
            self.momentum_decay_tracker.setdefault(symbol, []).append({
                'date': now,
                'days_since_selection': self.days_since_rebalance,
                'momentum': indicator.current.value
            })
        return

    if self.current_regime == "DEFENSIVE":
        # DEFENSIVE: move into the defensive asset plus one small equity position.
        hedge = self.defensive_asset
        for symbol in list(self.portfolio.keys()):
            if self.portfolio[symbol].invested and symbol != hedge:
                self.liquidate(symbol, "Defensive regime - reallocating to gold")
        self.set_holdings(hedge, self.defensive_gold_allocation)

        # Highest-momentum symbol, excluding the defensive asset and the market proxy.
        ranked = sorted(
            [sym for sym, ind in self._momp.items()
             if ind.is_ready and sym != hedge and sym != self.market_proxy],
            key=lambda sym: self._momp[sym].current.value,
            reverse=True,
        )
        if ranked:
            leader = ranked[0]
            self.set_holdings(leader, self.defensive_equity_allocation)
            self.current_holdings = {leader}
        else:
            self.current_holdings = set()

        portfolio_record = {
            'date': now,
            'days_since_last_rebalance': self.days_since_rebalance,
            'regime': 'DEFENSIVE',
            'equity_symbols': [str(s) for s in self.current_holdings],
            'gold_allocation': self.defensive_gold_allocation,
            'equity_allocation': self.defensive_equity_allocation,
            'bear_score': self.last_bear_score,
            'portfolio_value': self.portfolio.total_portfolio_value
        }
    else:
        # MOMENTUM: hold the top ``_num_long`` symbols by momentum.
        ranked = sorted(
            [sym for sym, ind in self._momp.items() if ind.is_ready],
            key=lambda sym: self._momp[sym].current.value,
            reverse=True,
        )
        selected = ranked[:self._num_long]
        new_holdings = set(selected)

        # Trade only when the selection changed (or on the very first trade date).
        selection_changed = new_holdings != self.current_holdings or self.first_trade_date == now
        if selection_changed and len(selected) > 0:
            if len(selected) > 1:
                self.analyze_correlations(selected)
            self.analyze_volatility(selected)
            optimal_weights = self.optimize_portfolio(selected)
            self.target_weights = dict(zip(selected, optimal_weights))
            self.current_holdings = new_holdings
            self.adjust_portfolio()

        portfolio_record = {
            'date': now,
            'days_since_last_rebalance': self.days_since_rebalance,
            'regime': 'MOMENTUM',
            'symbols': [str(s) for s in selected],
            'momentum_values': [self._momp[s].current.value for s in selected],
            'bear_score': self.last_bear_score,
            'portfolio_value': self.portfolio.total_portfolio_value
        }

    # Bookkeeping shared by both regimes.
    self.portfolio_history.append(portfolio_record)
    self.rebalance_dates.append(now)
    self.days_since_rebalance = 0
    self.portfolio_value_at_rebalance = self.portfolio.total_portfolio_value
    self.daily_returns_since_rebalance = []
    self._rebalance = False
    self.next_adjustment_date = self.get_next_adjustment_date(now)
def on_securities_changed(self, changes):
    """Keep the momentum indicators in sync with the universe.

    Removed securities that had an indicator lose it and are liquidated.
    Added securities get a new ``MomentumPercent`` indicator. Every indicator
    that is not ready yet is then warmed up from daily close history.

    Args:
        changes: Security changes; its ``removed_securities`` and
            ``added_securities`` collections are read.
    """
    # Drop indicators of removed securities and close their positions.
    for security in changes.removed_securities:
        symbol = security.symbol
        if self._momp.pop(symbol, None) is not None:
            self.liquidate(symbol, 'Removed from universe')

    for security in changes.added_securities:
        if security.symbol not in self._momp:
            self._momp[security.symbol] = MomentumPercent(self._lookback)

    # Only indicators that are not ready need history; skip the request when there are none.
    pending = [symbol for symbol, mom in self._momp.items() if not mom.is_ready]
    if not pending:
        return

    history = self.history(pending, 1 + self._lookback, Resolution.DAILY)
    if history.empty:
        return

    # One column of closes per symbol.
    closes = history.close.unstack(level=0)
    for symbol in pending:
        ticker = symbol.id.to_string()
        if ticker not in closes:
            continue
        for time, value in closes[ticker].dropna().items():
            self._momp[symbol].update(IndicatorDataPoint(symbol, time.date(), value))
def analyze_correlations(self, selected_symbols):
    """Record pairwise return-correlation statistics for the selected assets.

    Daily closes over ``min(30, p_short_lookback)`` bars are converted to
    returns; when more than 10 return rows are available, the average, maximum
    and minimum pairwise correlation are stored in
    ``self.correlation_tracker`` under the current algorithm time.

    This is best-effort diagnostics: a failure is reported through
    ``self.debug`` and never propagates to the caller.

    Args:
        selected_symbols: Sequence of symbols to compare.
    """
    from itertools import combinations

    try:
        short_lookback = min(30, self.p_short_lookback)
        history = self.history(selected_symbols, short_lookback, Resolution.DAILY)
        returns = history['close'].unstack(level=0).pct_change().dropna()
        if len(returns) <= 10:
            # Not enough observations for a meaningful correlation.
            return

        corr_matrix = returns.corr()
        keys = [symbol.id.to_string() for symbol in selected_symbols]
        correlations = []
        for first, second in combinations(keys, 2):
            if first in corr_matrix and second in corr_matrix:
                corr_val = corr_matrix.loc[first, second]
                if not np.isnan(corr_val):
                    correlations.append(corr_val)

        if correlations:
            self.correlation_tracker[self.time] = {
                'avg_correlation': sum(correlations) / len(correlations),
                'max_correlation': max(correlations),
                'min_correlation': min(correlations),
                'symbols': [str(s) for s in selected_symbols]
            }
    except Exception as e:
        # Diagnostics must not interrupt trading, but the failure should be visible.
        self.debug(f"analyze_correlations skipped: {e}")
def analyze_volatility(self, selected_symbols):
    """Record annualised volatility statistics for the selected assets.

    Daily closes over ``min(30, p_short_lookback)`` bars are converted to
    returns; when more than 10 return rows are available, each asset with more
    than 5 returns contributes ``std * sqrt(252)``. The average, maximum and
    minimum are stored in ``self.volatility_tracker`` under the current
    algorithm time.

    This is best-effort diagnostics: a failure is reported through
    ``self.debug`` and never propagates to the caller.

    Args:
        selected_symbols: Sequence of symbols to analyse.
    """
    try:
        short_lookback = min(30, self.p_short_lookback)
        history = self.history(selected_symbols, short_lookback, Resolution.DAILY)
        returns = history['close'].unstack(level=0).pct_change().dropna()
        if len(returns) <= 10:
            # Not enough observations for a meaningful estimate.
            return

        annualisation = np.sqrt(252)
        volatilities = []
        for symbol in selected_symbols:
            key = symbol.id.to_string()
            if key not in returns:
                continue
            symbol_returns = returns[key].dropna()
            if len(symbol_returns) > 5:
                volatilities.append(symbol_returns.std() * annualisation)

        if volatilities:
            self.volatility_tracker[self.time] = {
                'avg_volatility': sum(volatilities) / len(volatilities),
                'max_volatility': max(volatilities),
                'min_volatility': min(volatilities),
                'symbols': [str(s) for s in selected_symbols]
            }
    except Exception as e:
        # Diagnostics must not interrupt trading, but the failure should be visible.
        self.debug(f"analyze_volatility skipped: {e}")
def optimize_portfolio(self, selected_symbols):
    """Choose long-only weights by random search over a Sortino-style score.

    ``p_n_portfolios`` random weight vectors (seeded with ``p_rand_seed``) are
    scored as ``mean return * p_short_lookback`` divided by a downside term
    built from the squared negative daily returns; the best-scoring vector is
    returned.

    Args:
        selected_symbols: Sequence of symbols; the returned weights follow
            this order.

    Returns:
        A 1-D numpy array of weights summing to 1, one per selected symbol.
        Equal weights are returned when history is empty, a symbol has no
        column in the history, or no return rows remain. An empty array is
        returned for an empty selection.
    """
    n_assets = len(selected_symbols)
    if n_assets == 0:
        return np.array([])
    equal_weights = np.full(n_assets, 1.0 / n_assets)

    short_lookback = self.p_short_lookback
    n_portfolios = self.p_n_portfolios
    if n_portfolios < 1:
        return equal_weights

    history = self.history(selected_symbols, short_lookback, Resolution.DAILY)
    if history.empty:
        return equal_weights
    returns = history['close'].unstack(level=0).pct_change().dropna()

    keys = [symbol.id.to_string() for symbol in selected_symbols]
    if returns.empty or any(key not in returns for key in keys):
        return equal_weights

    # Build the return matrix column-by-column in the order of selected_symbols so
    # that weight i always belongs to selected_symbols[i], whatever column order
    # the unstacked history frame happens to use.
    asset_returns = np.column_stack([returns[key].to_numpy(dtype=float) for key in keys])
    mean_returns = asset_returns.mean(axis=0)
    downside_sq = np.minimum(0.0, asset_returns) ** 2

    # A local generator gives the same draws as seeding the global one, without
    # altering numpy's global random state.
    rng = np.random.RandomState(self.p_rand_seed)
    weights = rng.random_sample((n_portfolios, n_assets))
    weights /= weights.sum(axis=1, keepdims=True)

    expected = weights @ mean_returns * short_lookback
    downside = np.sqrt((downside_sq @ weights.T).mean(axis=0))
    # Score is 0 where there is no downside, matching the guard against division by zero.
    scores = np.divide(expected, downside, out=np.zeros_like(expected), where=downside > 0)
    return weights[int(np.argmax(scores))].copy()
def adjust_portfolio(self):
    """Move the portfolio toward ``self.target_weights``.

    Symbols in the portfolio that are not targeted are liquidated (the
    defensive asset is kept while the regime is DEFENSIVE). Each targeted
    symbol is then set to a blend of its current and target weight controlled
    by ``self.adjustment_step``, and the applied weight is logged in
    ``self.individual_performance``.
    """
    targets = self.target_weights

    # Close positions that are no longer targeted.
    untargeted = set(self.portfolio.keys()) - set(targets.keys())
    for symbol in untargeted:
        keep_hedge = self.current_regime == "DEFENSIVE" and symbol == self.defensive_asset
        if not keep_hedge:
            self.liquidate(symbol)

    step = self.adjustment_step
    for symbol, target_weight in targets.items():
        if symbol in self.portfolio:
            current_weight = self.portfolio[symbol].holdings_value / self.portfolio.total_portfolio_value
        else:
            current_weight = 0
        # step == 1.0 jumps straight to the target; smaller steps move gradually.
        adjusted_weight = current_weight * (1 - step) + target_weight * step

        momentum = None
        if symbol in self._momp and self._momp[symbol].is_ready:
            momentum = self._momp[symbol].current.value
        price = self.securities[symbol].close if symbol in self.securities else None

        self.individual_performance.setdefault(symbol, []).append({
            'date': self.time,
            'weight': adjusted_weight,
            'momentum': momentum,
            'price': price
        })
        self.set_holdings(symbol, adjusted_weight)
def get_next_adjustment_date(self, current_date, initial=False):
    """Return the date of the next portfolio adjustment.

    The step depends on ``self.p_adjustment_frequency``:
    'weekday' adds 5 calendar days, 'weekly' adds 7, 'bi-weekly' adds 14 and
    'monthly' returns the first day of the following month (keeping the
    time-of-day of ``current_date``).

    Args:
        current_date: Date (or datetime) to step forward from.
        initial: Accepted for compatibility with existing callers; it does
            not change the result.

    Raises:
        ValueError: If the configured frequency is not one of the above.
    """
    frequency = self.p_adjustment_frequency
    fixed_steps = {
        'weekday': timedelta(days=5),
        'weekly': timedelta(days=7),
        'bi-weekly': timedelta(days=14),
    }
    if frequency in fixed_steps:
        return current_date + fixed_steps[frequency]
    if frequency == 'monthly':
        # Day 1 plus 32 days always lands in the next month; snap back to its first day.
        following_month = current_date.replace(day=1) + timedelta(days=32)
        return following_month.replace(day=1)
    raise ValueError(f"Unsupported adjustment frequency: {self.p_adjustment_frequency}")
def on_end_of_algorithm(self):
    """End-of-run hook; currently a placeholder that performs no analysis."""
    return None