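# Overall Statistics (backtest result reported for the algorithm below)
#
# | Metric                      | Value               |
# |-----------------------------|---------------------|
# | Total Orders                | 966                 |
# | Average Win                 | 1.64%               |
# | Average Loss                | -1.08%              |
# | Compounding Annual Return   | 28.071%             |
# | Drawdown                    | 46.000%             |
# | Expectancy                  | 0.300               |
# | Start Equity                | 1000000             |
# | End Equity                  | 3672105.27          |
# | Net Profit                  | 267.211%            |
# | Sharpe Ratio                | 0.726               |
# | Sortino Ratio               | 0.872               |
# | Probabilistic Sharpe Ratio  | 22.029%             |
# | Loss Rate                   | 49%                 |
# | Win Rate                    | 51%                 |
# | Profit-Loss Ratio           | 1.52                |
# | Alpha                       | 0.126               |
# | Beta                        | 0.984               |
# | Annual Standard Deviation   | 0.297               |
# | Annual Variance             | 0.088               |
# | Information Ratio           | 0.509               |
# | Tracking Error              | 0.244               |
# | Treynor Ratio               | 0.219               |
# | Total Fees                  | $28472.68           |
# | Estimated Strategy Capacity | $770000000.00       |
# | Lowest Capacity Asset       | ETN R735QTJ8XC9X    |
# | Portfolio Turnover          | 12.98%              |
# | Drawdown Recovery           | 832                 |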
#region imports
from AlgorithmImports import *
import numpy as np
from collections import deque
import statsmodels.api as sm
import statistics as stat
import pickle
from datetime import datetime, timedelta
#endregion
class Q2PlaygroundAlgorithm(QCAlgorithm):
def initialize(self):
    """Set up the backtest window, strategy parameters and bookkeeping state.

    Everything the other methods read from ``self`` is created here, and the
    coarse/fine universe selection callbacks are registered last.
    """
    # Backtest window and starting capital.
    self.set_start_date(2019, 3, 1)
    self.set_end_date(2024, 6, 1)
    self.set_cash(1000000)

    # Securities are initialised through the brokerage model and seeded via
    # get_last_known_prices.
    seeder = FuncSecuritySeeder(self.get_last_known_prices)
    self.set_security_initializer(
        BrokerageModelSecurityInitializer(self.brokerage_model, seeder)
    )

    # ------------------------- strategy parameters -------------------------
    # Hard-coded here. A disabled variant of this block read the same names
    # through self.get_parameter(name, default).
    self.p_lookback = 252          # bars used by the momentum indicator
    self.p_short_lookback = 63     # bars used by the weight optimisation
    self.p_num_coarse = 200        # symbols kept by coarse selection
    self.p_num_fine = 70           # symbols kept by fine selection
    self.p_num_long = 4            # symbols held at once
    self.p_n_portfolios = 1000     # random portfolios tried per optimisation
    self.p_rand_seed = 13          # seed for the random portfolios
    self.p_adjustment_step = 1.0   # blend factor between current and target weight
    self.p_adjustment_frequency = 'weekly'  # Can be 'monthly', 'weekly', 'bi-weekly'

    # Working copies of the parameters used by the selection/rebalance code.
    self._lookback = self.p_lookback
    self._num_coarse = self.p_num_coarse
    self._num_fine = self.p_num_fine
    self._num_long = self.p_num_long
    self.adjustment_step = self.p_adjustment_step

    # ----------------------------- trading state ----------------------------
    self._momp = {}                 # Symbol -> MomentumPercent indicator
    self._rebalance = False         # set by coarse selection / first-trade setup
    self.current_holdings = set()   # symbols chosen at the latest rebalance
    self.target_weights = {}        # Symbol -> target portfolio weight
    self.first_trade_date = None
    self.next_adjustment_date = None

    # --------------------------- diagnostics state --------------------------
    self.momentum_history = {}              # Symbol -> list of daily momentum records
    self.momentum_decay_tracker = {}        # Symbol -> momentum records while held
    self.individual_performance = {}        # Symbol -> records written at each adjustment
    self.correlation_tracker = {}           # time -> correlation summary of the selection
    self.volatility_tracker = {}            # time -> volatility summary of the selection
    self.portfolio_history = []             # one record per executed rebalance
    self.rebalance_dates = []               # times of executed rebalances
    self.daily_returns_since_rebalance = []
    self.days_since_rebalance = 0
    self.portfolio_value_at_rebalance = None

    # Daily bars for every universe member, then register the universe.
    self.universe_settings.resolution = Resolution.DAILY
    self.add_universe(self._coarse_selection_function, self._fine_selection_function)
def _coarse_selection_function(self, coarse):
    """Return the most liquid symbols, or leave the universe untouched.

    Until ``next_adjustment_date`` is reached the universe is reported as
    unchanged.  Otherwise the rebalance flag is raised, the first-trade
    bookkeeping is initialised on the very first pass, and the symbols that
    have fundamental data and a price above 5 are ranked by dollar volume;
    the top ``_num_coarse`` are returned.
    """
    not_due_yet = self.next_adjustment_date and self.time < self.next_adjustment_date
    if not_due_yet:
        return Universe.UNCHANGED

    # A new selection always goes together with a rebalance in on_data.
    self._rebalance = True
    if not self.first_trade_date:
        self.first_trade_date = self.time
        self.next_adjustment_date = self.get_next_adjustment_date(self.time)

    eligible = [c for c in coarse if c.has_fundamental_data and c.price > 5]
    eligible.sort(key=lambda c: c.dollar_volume, reverse=True)
    return [c.symbol for c in eligible[:self._num_coarse]]
def _fine_selection_function(self, fine):
    """Return the symbols of the ``_num_fine`` largest securities by market cap."""
    by_market_cap = list(fine)
    by_market_cap.sort(key=lambda security: security.market_cap, reverse=True)
    largest = by_market_cap[:self._num_fine]
    return [security.symbol for security in largest]
def on_data(self, data):
    """Update indicators, record diagnostics, and rebalance when flagged.

    Stages, in order:

    1. Push the latest close of every tracked symbol into its momentum
       indicator and append ready values to ``momentum_history``.
    2. Once a rebalance has happened, record the portfolio return relative to
       its value at that rebalance and advance ``days_since_rebalance``.
    3. If nothing is invested and no first trade date exists yet, initialise
       the schedule and raise the rebalance flag.
    4. If ``_rebalance`` is not set, log momentum statistics of the current
       holdings and return without trading.
    5. Otherwise rank the ready indicators, keep the top ``_num_long`` and,
       when that set differs from ``current_holdings`` (or this is the first
       trade date), compute new weights and call ``adjust_portfolio``.

    Args:
        data: Object passed in by the engine; it is not read by this method.
    """
    # ---- 1. feed indicators and keep their history -------------------------
    for symbol, mom in self._momp.items():
        close = self.securities[symbol].close
        mom.update(self.time, close)
        history = self.momentum_history.setdefault(symbol, [])
        if mom.is_ready:
            history.append({
                'date': self.time,
                'momentum': mom.current.value,
                'price': close
            })

    # ---- 2. performance since the last executed rebalance -------------------
    if self.portfolio_value_at_rebalance is not None:
        current_value = self.portfolio.total_portfolio_value
        # NOTE: despite the key name, this is the return accumulated since the
        # rebalance, not a one-day return.
        daily_return = (current_value - self.portfolio_value_at_rebalance) / self.portfolio_value_at_rebalance
        self.daily_returns_since_rebalance.append({
            'date': self.time,
            'days_since_rebalance': self.days_since_rebalance,
            'daily_return': daily_return,
            'portfolio_value': current_value
        })
        self.days_since_rebalance += 1

    # ---- 3. first-trade setup ----------------------------------------------
    if not self.portfolio.invested and not self.first_trade_date:
        self.first_trade_date = self.time
        self.next_adjustment_date = self.get_next_adjustment_date(self.time, initial=True)
        self._rebalance = True
        self.log(f"FIRST_TRADE_SETUP: Setting first trade date to {self.time}")

    # ---- 4. no rebalance today: only log momentum decay --------------------
    if not self._rebalance:
        if self.current_holdings:
            current_momentum_values = []
            for symbol in self.current_holdings:
                indicator = self._momp.get(symbol)
                if indicator is None or not indicator.is_ready:
                    continue
                momentum_val = indicator.current.value
                current_momentum_values.append(momentum_val)
                self.momentum_decay_tracker.setdefault(symbol, []).append({
                    'date': self.time,
                    'days_since_selection': self.days_since_rebalance,
                    'momentum': momentum_val
                })

            if current_momentum_values:
                avg_momentum = sum(current_momentum_values) / len(current_momentum_values)
                min_momentum = min(current_momentum_values)
                max_momentum = max(current_momentum_values)
                momentum_spread = max_momentum - min_momentum
                momentum_decline_threshold = 0.02  # 2% decline from initial

                if self.days_since_rebalance > 0:
                    # "Initial" momentum = first value recorded after the most
                    # recent executed rebalance.  The tracker keeps entries from
                    # earlier holding periods too, so walk backwards and stop at
                    # the first entry that predates that rebalance.
                    last_rebalance = self.rebalance_dates[-1] if self.rebalance_dates else None
                    initial_momentums = []
                    for symbol in self.current_holdings:
                        first_in_period = None
                        for entry in reversed(self.momentum_decay_tracker.get(symbol, ())):
                            if last_rebalance is not None and entry['date'] <= last_rebalance:
                                break
                            first_in_period = entry
                        if first_in_period is not None:
                            initial_momentums.append(first_in_period['momentum'])

                    if initial_momentums:
                        avg_initial_momentum = sum(initial_momentums) / len(initial_momentums)
                        # A zero baseline has no meaningful relative decline;
                        # abs() keeps the sign of "decline" correct when the
                        # baseline momentum is negative.
                        if avg_initial_momentum != 0:
                            momentum_decline = (avg_initial_momentum - avg_momentum) / abs(avg_initial_momentum)
                            if momentum_decline > momentum_decline_threshold:
                                self.log(f"MOMENTUM_EXHAUSTION_SIGNAL: Day {self.days_since_rebalance} | "
                                         f"Decline: {momentum_decline*100:.2f}% | "
                                         f"Current Avg: {avg_momentum:.4f} | "
                                         f"Initial Avg: {avg_initial_momentum:.4f}")

                self.log(f"MOMENTUM_TRACK: Day {self.days_since_rebalance} | "
                         f"Avg: {avg_momentum:.4f} | Min: {min_momentum:.4f} | "
                         f"Max: {max_momentum:.4f} | Spread: {momentum_spread:.4f} | "
                         f"Holdings: {len(self.current_holdings)}")
        return

    # ---- 5. rebalance -------------------------------------------------------
    # Rank every symbol whose indicator is ready, strongest momentum first.
    sorted_mom = sorted((symbol for symbol, mom in self._momp.items() if mom.is_ready),
                        key=lambda symbol: self._momp[symbol].current.value,
                        reverse=True)
    selected = sorted_mom[:self._num_long]
    new_holdings = set(selected)

    self.log(f"REBALANCE_START: Date {self.time} | Days since last rebalance: {self.days_since_rebalance}")

    # Show the top 15 so the log reveals what the selection was chosen from.
    self.log(f"TOP_MOMENTUM_CANDIDATES:")
    for rank, symbol in enumerate(sorted_mom[:15], start=1):
        momentum_val = self._momp[symbol].current.value
        status = "SELECTED" if symbol in new_holdings else "NOT_SELECTED"
        self.log(f" Rank {rank}: {symbol} | Momentum: {momentum_val:.4f} | {status}")

    # Trade only when the selection changed, or on the first trade date.
    holdings_changed = new_holdings != self.current_holdings
    if holdings_changed or self.first_trade_date == self.time:
        if selected:
            if holdings_changed:
                added_symbols = new_holdings - self.current_holdings
                removed_symbols = self.current_holdings - new_holdings
                maintained_symbols = new_holdings & self.current_holdings
                self.log(f"PORTFOLIO_CHANGES:")
                self.log(f" Added: {[str(s) for s in added_symbols]}")
                self.log(f" Removed: {[str(s) for s in removed_symbols]}")
                self.log(f" Maintained: {[str(s) for s in maintained_symbols]}")

            # Diagnostics on the new selection.
            if len(selected) > 1:
                self.analyze_correlations(selected)
            self.analyze_volatility(selected)

            # Record the composition before the counters are reset below.
            self.portfolio_history.append({
                'date': self.time,
                'days_since_last_rebalance': self.days_since_rebalance,
                'symbols': [str(s) for s in selected],
                'momentum_values': [self._momp[s].current.value for s in selected],
                'portfolio_value': self.portfolio.total_portfolio_value
            })
            self.rebalance_dates.append(self.time)

            # Restart the per-holding-period tracking.
            self.days_since_rebalance = 0
            self.portfolio_value_at_rebalance = self.portfolio.total_portfolio_value
            self.daily_returns_since_rebalance = []

            optimal_weights = self.optimize_portfolio(selected)
            self.target_weights = dict(zip(selected, optimal_weights))

            self.log(f"OPTIMAL_WEIGHTS:")
            for symbol, weight in self.target_weights.items():
                self.log(f" {symbol}: {weight:.4f}")

            self.current_holdings = new_holdings
            self.adjust_portfolio()
        else:
            self.log(f"WARNING: No securities selected for portfolio")

    self._rebalance = False
    self.next_adjustment_date = self.get_next_adjustment_date(self.time)
    self.log(f"REBALANCE_END: Next rebalance scheduled for {self.next_adjustment_date}")
def on_securities_changed(self, changes):
    """Keep the indicator map in sync with the universe and warm up new entries.

    * Removed securities lose their indicator; if one existed, the position
      is liquidated with the tag ``'Removed from universe'``.
    * Added securities get a ``MomentumPercent`` indicator of ``_lookback``
      periods.
    * Every indicator that is not ready yet is fed daily closes from a history
      request of ``1 + _lookback`` bars.

    Args:
        changes: Object exposing ``removed_securities`` and
            ``added_securities``; each item exposes ``symbol``.
    """
    for security in changes.removed_securities:
        symbol = security.symbol
        if self._momp.pop(symbol, None) is not None:
            self.liquidate(symbol, 'Removed from universe')

    for security in changes.added_securities:
        if security.symbol not in self._momp:
            self._momp[security.symbol] = MomentumPercent(self._lookback)

    # Only indicators that still need data are warmed up; skip the history
    # request entirely when there are none.
    pending_symbols = [symbol for symbol, mom in self._momp.items() if not mom.is_ready]
    if not pending_symbols:
        return

    history = self.history(pending_symbols, 1 + self._lookback, Resolution.DAILY)
    if history.empty:
        return

    # One column of closes per symbol, addressed by the symbol's id string.
    closes = history.close.unstack(level=0)
    for symbol in pending_symbols:
        ticker = symbol.id.to_string()
        if ticker not in closes:
            continue
        for time, value in closes[ticker].dropna().items():
            item = IndicatorDataPoint(symbol, time.date(), value)
            self._momp[symbol].update(item)
def analyze_correlations(self, selected_symbols):
    """Log and store pairwise return correlations of the selected symbols.

    Daily closes over ``min(30, p_short_lookback)`` bars are turned into
    percentage returns.  With more than 10 return rows, the average, maximum
    and minimum of the non-NaN pairwise correlations are logged and saved in
    ``correlation_tracker`` under the current time.  Any exception is logged
    as ``CORRELATION_ERROR`` instead of being raised.
    """
    try:
        window = min(30, self.p_short_lookback)  # shorter window than the optimiser uses
        closes = self.history(selected_symbols, window, Resolution.DAILY)['close']
        returns = closes.unstack(level=0).pct_change().dropna()
        if len(returns) <= 10:  # not enough observations
            return

        corr_matrix = returns.corr()
        ids = [symbol.id.to_string() for symbol in selected_symbols]

        # Visit every unordered pair exactly once.
        pair_values = []
        for position, first_id in enumerate(ids):
            for second_id in ids[position + 1:]:
                if first_id in corr_matrix.columns and second_id in corr_matrix.columns:
                    value = corr_matrix.loc[first_id, second_id]
                    if not np.isnan(value):
                        pair_values.append(value)

        if not pair_values:
            return

        avg_correlation = sum(pair_values) / len(pair_values)
        max_correlation = max(pair_values)
        min_correlation = min(pair_values)
        self.log(f"CORRELATION_ANALYSIS: Avg: {avg_correlation:.4f} | Max: {max_correlation:.4f} | Min: {min_correlation:.4f}")

        self.correlation_tracker[self.time] = {
            'avg_correlation': avg_correlation,
            'max_correlation': max_correlation,
            'min_correlation': min_correlation,
            'symbols': [str(s) for s in selected_symbols]
        }
    except Exception as e:
        self.log(f"CORRELATION_ERROR: {str(e)}")
def analyze_volatility(self, selected_symbols):
    """Log and store annualised volatility statistics of the selected symbols.

    Daily closes over ``min(30, p_short_lookback)`` bars are turned into
    percentage returns.  With more than 10 return rows, each symbol that has
    more than 5 returns contributes ``std * sqrt(252)``; the average, maximum
    and minimum are logged and saved in ``volatility_tracker`` under the
    current time.  Any exception is logged as ``VOLATILITY_ERROR``.
    """
    try:
        window = min(30, self.p_short_lookback)
        closes = self.history(selected_symbols, window, Resolution.DAILY)['close']
        returns = closes.unstack(level=0).pct_change().dropna()
        if len(returns) <= 10:
            return

        annualised = []
        for symbol in selected_symbols:
            column = symbol.id.to_string()
            if column not in returns.columns:
                continue
            series = returns[column].dropna()
            if len(series) > 5:
                annualised.append(series.std() * np.sqrt(252))  # Annualized volatility

        if not annualised:
            return

        avg_volatility = sum(annualised) / len(annualised)
        max_volatility = max(annualised)
        min_volatility = min(annualised)
        self.log(f"VOLATILITY_ANALYSIS: Avg: {avg_volatility:.4f} | Max: {max_volatility:.4f} | Min: {min_volatility:.4f}")

        self.volatility_tracker[self.time] = {
            'avg_volatility': avg_volatility,
            'max_volatility': max_volatility,
            'min_volatility': min_volatility,
            'symbols': [str(s) for s in selected_symbols]
        }
    except Exception as e:
        self.log(f"VOLATILITY_ERROR: {str(e)}")
def optimize_portfolio(self, selected_symbols):
    """Choose long-only weights by random search over candidate portfolios.

    ``p_n_portfolios`` random weight vectors (seeded with ``p_rand_seed``) are
    scored on daily returns from the last ``p_short_lookback`` bars.  The score
    is ``mean return * lookback`` divided by the square root of the weighted
    mean of squared negative returns; the best-scoring vector wins.

    The returned weights are aligned with ``selected_symbols``: element ``i``
    belongs to ``selected_symbols[i]``.  Return columns are therefore looked up
    by ``symbol.id.to_string()`` (the convention used elsewhere in this file)
    rather than taken in whatever order the unstacked history frame has.
    Symbols without return data receive weight 0.

    Args:
        selected_symbols: Symbols to weight, in the order the caller will zip
            them with the result.

    Returns:
        numpy.ndarray of ``len(selected_symbols)`` weights.  When a search is
        run the weights sum to 1; when there is no usable history, equal
        weights are returned.
    """
    short_lookback = self.p_short_lookback
    n_assets = len(selected_symbols)
    n_portfolios = self.p_n_portfolios

    returns = (
        self.history(selected_symbols, short_lookback, Resolution.DAILY)['close']
        .unstack(level=0)
        .pct_change()
        .dropna()
    )

    self.log(f"OPTIMIZATION_START: Assets: {n_assets} | Portfolios to test: {n_portfolios} | Lookback: {short_lookback}")

    # Positions (within selected_symbols) that actually have a return column.
    column_ids = [symbol.id.to_string() for symbol in selected_symbols]
    usable = [pos for pos, column in enumerate(column_ids) if column in returns.columns]

    if not usable or len(returns) < 2:
        # Mean/covariance are undefined without at least two return rows.
        self.log("OPTIMIZATION_FALLBACK: insufficient return history, using equal weights")
        return np.full(n_assets, 1.0 / n_assets) if n_assets else np.zeros(0)

    # Return matrix with one column per usable symbol, in selection order.
    matrix = np.column_stack(
        [returns[column_ids[pos]].to_numpy(dtype=float) for pos in usable]
    )

    # Statistics that do not depend on the candidate weights: compute once.
    mean_returns = matrix.mean(axis=0)
    scaled_cov = np.atleast_2d(np.cov(matrix, rowvar=False)) * short_lookback
    mean_downside_sq = (np.minimum(0.0, matrix) ** 2).mean(axis=0)

    # A private generator keeps the search reproducible without reseeding the
    # process-wide NumPy random state.
    rng = np.random.RandomState(self.p_rand_seed)
    candidates = rng.random_sample((n_portfolios, len(usable)))
    candidates /= candidates.sum(axis=1, keepdims=True)

    portfolio_returns = (candidates @ mean_returns) * short_lookback
    variances = np.einsum('ij,jk,ik->i', candidates, scaled_cov, candidates)
    portfolio_stddevs = np.sqrt(np.maximum(variances, 0.0))
    downside_stddevs = np.sqrt(candidates @ mean_downside_sq)

    # Candidates with no downside at all score 0, as before.
    sortino_ratios = np.divide(
        portfolio_returns,
        downside_stddevs,
        out=np.zeros(n_portfolios),
        where=downside_stddevs > 0,
    )

    best = int(np.argmax(sortino_ratios))
    self.log(f"OPTIMIZATION_RESULT: Best Sortino: {sortino_ratios[best]:.4f} | Return: {portfolio_returns[best]:.4f} | StdDev: {portfolio_stddevs[best]:.4f}")

    # Scatter the winning weights back to their positions in selected_symbols.
    weights = np.zeros(n_assets)
    weights[usable] = candidates[best]
    return weights
def adjust_portfolio(self):
    """Move the portfolio toward ``target_weights``.

    Every symbol in ``portfolio`` that is not a target is liquidated.  Each
    target is then set to a blend of its current and target weight:
    ``current * (1 - adjustment_step) + target * adjustment_step``.

    Details:
      * Only holdings reporting ``invested`` are listed as current symbols or
        logged as LIQUIDATING; ``liquidate`` is still called for every
        non-target symbol.
      * Current weights are measured once, before any order is placed.
      * Targets are processed in order of weight change, largest reduction
        first, so reductions are submitted before increases.
      * Each adjustment appends a record to ``individual_performance``.
    """
    target_symbols = set(self.target_weights)
    known_symbols = set(self.portfolio.keys())
    held_symbols = {symbol for symbol in known_symbols if self.portfolio[symbol].invested}

    self.log(f"PORTFOLIO_ADJUSTMENT_START:")
    self.log(f" Current symbols: {sorted(str(s) for s in held_symbols)}")
    self.log(f" Target symbols: {sorted(str(s) for s in target_symbols)}")

    # Liquidate everything that is no longer a target.
    for symbol in known_symbols - target_symbols:
        current_holding = self.portfolio[symbol]
        if current_holding.invested:
            self.log(f" LIQUIDATING: {symbol} | Current value: ${current_holding.holdings_value:.2f}")
        self.liquidate(symbol)

    # Snapshot current weights first so every target is measured against the
    # same portfolio value.
    total_value = self.portfolio.total_portfolio_value
    plan = []
    for symbol, target_weight in self.target_weights.items():
        if symbol in self.portfolio and total_value:
            current_weight = self.portfolio[symbol].holdings_value / total_value
        else:
            current_weight = 0
        adjusted_weight = current_weight * (1 - self.adjustment_step) + target_weight * self.adjustment_step
        plan.append((symbol, current_weight, target_weight, adjusted_weight))

    # Reductions before increases.
    plan.sort(key=lambda row: row[3] - row[1])

    for symbol, current_weight, target_weight, adjusted_weight in plan:
        self.log(f" ADJUSTING: {symbol} | Current: {current_weight:.4f} | Target: {target_weight:.4f} | Adjusted: {adjusted_weight:.4f}")

        indicator = self._momp.get(symbol)
        self.individual_performance.setdefault(symbol, []).append({
            'date': self.time,
            'weight': adjusted_weight,
            'momentum': indicator.current.value if indicator is not None and indicator.is_ready else None,
            'price': self.securities[symbol].close if symbol in self.securities else None
        })

        self.set_holdings(symbol, adjusted_weight)

    self.log(f"PORTFOLIO_ADJUSTMENT_COMPLETE: Total value: ${self.portfolio.total_portfolio_value:.2f}")
def get_next_adjustment_date(self, current_date, initial=False):
    """Return the next rebalance time for the configured frequency.

    ``'weekly'`` and ``'bi-weekly'`` add 7 and 14 days to ``current_date``;
    ``'monthly'`` returns the first day of the following month, keeping the
    time of day.  ``initial`` is accepted but does not change the result.

    Raises:
        ValueError: if ``p_adjustment_frequency`` is none of the above.
    """
    frequency = self.p_adjustment_frequency
    fixed_day_steps = {'weekly': 7, 'bi-weekly': 14}

    if frequency in fixed_day_steps:
        return current_date + timedelta(days=fixed_day_steps[frequency])

    if frequency != 'monthly':
        raise ValueError(f"Unsupported adjustment frequency: {self.p_adjustment_frequency}")

    # 32 days after the 1st always falls in the following month; snapping back
    # to day 1 gives that month's first day.
    month_start = current_date.replace(day=1)
    return (month_start + timedelta(days=32)).replace(day=1)
def analyze_momentum_decay_patterns(self):
    """Log a summary of how momentum and returns evolve after selection.

    Two sections are written to the log:

    * Momentum decay: for every symbol with more than 5 tracker entries, the
      entry at list position ``d`` is treated as "day ``d``".  For the first
      30 positions the average/min/max across symbols is logged, followed by
      the days on which the average fell by more than 2% relative to the
      previous day.
    * Performance: the mean of the recorded ``daily_return`` values grouped by
      ``days_since_rebalance`` (first 20 groups).
    """
    self.log("=" * 80)
    self.log("MOMENTUM DECAY ANALYSIS SUMMARY")
    self.log("=" * 80)

    # Keep only symbols with enough observations.
    decay_patterns = {
        symbol: momentum_data
        for symbol, momentum_data in self.momentum_decay_tracker.items()
        if len(momentum_data) > 5
    }

    if decay_patterns:
        max_days = max(len(data) for data in decay_patterns.values())
        daily_avg_momentum = {}
        for day in range(min(30, max_days)):  # Look at first 30 days
            day_momentums = [
                symbol_data[day]['momentum']
                for symbol_data in decay_patterns.values()
                if day < len(symbol_data)
            ]
            if day_momentums:
                daily_avg_momentum[day] = {
                    'avg': sum(day_momentums) / len(day_momentums),
                    'min': min(day_momentums),
                    'max': max(day_momentums),
                    'count': len(day_momentums)
                }

        self.log("AVERAGE MOMENTUM DECAY BY DAYS SINCE SELECTION:")
        for day, stats in daily_avg_momentum.items():
            self.log(f"Day {day:2d}: Avg={stats['avg']:7.4f} | Min={stats['min']:7.4f} | Max={stats['max']:7.4f} | N={stats['count']:3d}")

        # Day-over-day change of the average momentum.
        momentum_drops = []
        prev_avg = None
        for day, stats in daily_avg_momentum.items():
            # A zero previous average has no relative change; abs() keeps the
            # sign of "drop" correct when the previous average is negative.
            if prev_avg is not None and prev_avg != 0:
                drop_pct = (prev_avg - stats['avg']) / abs(prev_avg) * 100
                momentum_drops.append({'day': day, 'drop_pct': drop_pct})
            prev_avg = stats['avg']

        significant_drops = [d for d in momentum_drops if d['drop_pct'] > 2.0]  # More than 2% drop
        if significant_drops:
            self.log("\nSIGNIFICANT MOMENTUM DROPS (>2%):")
            for drop in significant_drops:
                self.log(f" Day {drop['day']}: {drop['drop_pct']:.2f}% momentum drop")

    # Portfolio performance grouped by days since rebalance.
    if self.daily_returns_since_rebalance:
        self.log("\nPERFORMANCE BY DAYS SINCE REBALANCE:")
        performance_by_day = {}
        for return_data in self.daily_returns_since_rebalance:
            performance_by_day.setdefault(return_data['days_since_rebalance'], []).append(
                return_data['daily_return']
            )

        for day in sorted(performance_by_day)[:20]:  # First 20 days
            returns = performance_by_day[day]
            avg_return = sum(returns) / len(returns)
            self.log(f"Day {day:2d}: Avg daily return = {avg_return*100:6.3f}% (N={len(returns)})")
def on_end_of_algorithm(self):
    """Write the end-of-run analysis to the log.

    Logs the rebalancing frequency, number of executed rebalances and final
    portfolio value, then the momentum-decay summary, the last (up to) 10
    portfolio compositions numbered by their position in the full history,
    and summaries of the stored correlation and volatility statistics.
    """
    self.log("=" * 100)
    self.log("ALGORITHM COMPLETED - COMPREHENSIVE ANALYSIS")
    self.log("=" * 100)

    self.log(f"Rebalancing frequency: {self.p_adjustment_frequency}")
    self.log(f"Total rebalances: {len(self.rebalance_dates)}")
    self.log(f"Final portfolio value: ${self.portfolio.total_portfolio_value:.2f}")

    self.analyze_momentum_decay_patterns()

    # Last 10 rebalances.  Numbering starts from the slice's real offset so
    # it stays 1-based and positive when fewer than 10 records exist.
    self.log("\nPORTFOLIO COMPOSITION HISTORY:")
    recent_records = self.portfolio_history[-10:]
    first_number = len(self.portfolio_history) - len(recent_records) + 1
    for number, record in enumerate(recent_records, start=first_number):
        momentum_values = record['momentum_values']
        avg_momentum = sum(momentum_values) / len(momentum_values) if momentum_values else float('nan')
        self.log(f"Rebalance {number}: {record['date'].strftime('%Y-%m-%d')} | "
                 f"Symbols: {record['symbols']} | "
                 f"Avg Momentum: {avg_momentum:.4f}")

    if self.correlation_tracker:
        avg_correlations = [data['avg_correlation'] for data in self.correlation_tracker.values()]
        self.log(f"\nCORRELATION SUMMARY:")
        self.log(f" Average correlation across all rebalances: {sum(avg_correlations)/len(avg_correlations):.4f}")
        self.log(f" Max average correlation: {max(avg_correlations):.4f}")
        self.log(f" Min average correlation: {min(avg_correlations):.4f}")

    if self.volatility_tracker:
        avg_volatilities = [data['avg_volatility'] for data in self.volatility_tracker.values()]
        self.log(f"\nVOLATILITY SUMMARY:")
        self.log(f" Average volatility across all rebalances: {sum(avg_volatilities)/len(avg_volatilities):.4f}")
        self.log(f" Max average volatility: {max(avg_volatilities):.4f}")
        self.log(f" Min average volatility: {min(avg_volatilities):.4f}")

    self.log("=" * 100)