| Overall Statistics | |
|---|---|
| Total Orders | 782 |
| Average Win | 1.77% |
| Average Loss | -1.34% |
| Compounding Annual Return | 31.462% |
| Drawdown | 36.500% |
| Expectancy | 0.298 |
| Start Equity | 1000000 |
| End Equity | 4212989.57 |
| Net Profit | 321.299% |
| Sharpe Ratio | 0.804 |
| Sortino Ratio | 0.983 |
| Probabilistic Sharpe Ratio | 27.448% |
| Loss Rate | 44% |
| Win Rate | 56% |
| Profit-Loss Ratio | 1.32 |
| Alpha | 0.181 |
| Beta | 0.622 |
| Annual Standard Deviation | 0.295 |
| Annual Variance | 0.087 |
| Information Ratio | 0.517 |
| Tracking Error | 0.283 |
| Treynor Ratio | 0.382 |
| Total Fees | $23209.16 |
| Estimated Strategy Capacity | $77000000.00 |
| Lowest Capacity Asset | GE R735QTJ8XC9X |
| Portfolio Turnover | 11.78% |
| Drawdown Recovery | 835 |
#region imports
from AlgorithmImports import *
import numpy as np
from collections import deque
import statistics as stat
import pickle
from datetime import datetime, timedelta
from scipy.optimize import minimize
#endregion
class Q2PlaygroundAlgorithm(QCAlgorithm):
def initialize(self):
    """Set up the backtest window, strategy parameters, state and schedules.

    Registers SPY (regime proxy) and GLD (defensive asset), installs the
    coarse/fine universe, and schedules ``update_market_regime`` to run
    every day 30 minutes after SPY's market open.
    """
    # ---- Backtest window, capital and security seeding ----
    self.set_start_date(2019, 3, 1)
    self.set_end_date(2024, 6, 1)
    self.set_cash(1000000)
    seeder = FuncSecuritySeeder(self.get_last_known_prices)
    self.set_security_initializer(
        BrokerageModelSecurityInitializer(self.brokerage_model, seeder)
    )

    # ---- Tunable parameters ----
    # Hard-coded here; each one could instead be read from the project
    # parameters, e.g. self.get_parameter("p_lookback", 252).
    self.p_lookback = 252
    self.p_num_coarse = 200
    self.p_num_fine = 70
    self.p_num_long = 4
    self.p_adjustment_step = 1.0
    self.p_n_portfolios = 1000
    self.p_short_lookback = 63
    self.p_rand_seed = 13
    # 'weekday' advances the schedule by 5 calendar days; the accepted
    # values are the ones handled in get_next_adjustment_date.
    self.p_adjustment_frequency = 'weekday'

    self.universe_settings.resolution = Resolution.DAILY

    # ---- Selection / rebalancing state ----
    self._momp = {}                              # Symbol -> momentum indicator
    self._lookback = self.p_lookback             # indicator lookback period
    self._num_coarse = self.p_num_coarse         # symbols kept by coarse selection
    self._num_fine = self.p_num_fine             # symbols kept by fine selection
    self._num_long = self.p_num_long             # symbols held at once
    self._rebalance = False
    self.current_holdings = set()                # symbols currently targeted
    self.target_weights = {}                     # Symbol -> target weight
    self.adjustment_step = self.p_adjustment_step  # fraction of the gap closed per rebalance
    self.first_trade_date = None
    self.next_adjustment_date = None

    # ---- Diagnostics collected for the end-of-run report ----
    self.momentum_history = {}                   # per-symbol momentum readings over time
    self.portfolio_history = []                  # one record per rebalance
    self.rebalance_dates = []                    # timestamps of every rebalance
    self.days_since_rebalance = 0
    self.portfolio_value_at_rebalance = None
    self.daily_returns_since_rebalance = []
    self.momentum_decay_tracker = {}             # momentum of held symbols after selection
    self.correlation_tracker = {}                # correlation stats keyed by time
    self.volatility_tracker = {}                 # volatility stats keyed by time
    self.individual_performance = {}             # per-symbol weight/momentum/price log

    # ---- Fixed instruments ----
    # SPY drives the regime detection; GLD is the defensive holding.
    self.market_proxy = self.add_equity("SPY", Resolution.DAILY).symbol
    self.defensive_asset = self.add_equity("GLD", Resolution.DAILY).symbol

    # ---- Regime detection settings ----
    self.regime_indicators = {}
    self.current_regime = "MOMENTUM"             # either "MOMENTUM" or "DEFENSIVE"
    self.regime_history = []
    self.bear_signal_threshold = 0.45
    # Indicator weights for the composite bear score (they sum to 1.0).
    self.regime_weights = dict(
        ma_200_signal=0.30,
        ma_50_signal=0.25,
        momentum_signal=0.20,
        volatility_signal=0.15,
        drawdown_signal=0.10,
    )

    # ---- Allocation per regime ----
    self.defensive_gold_allocation = 0.95
    self.defensive_equity_allocation = 0.05
    self.momentum_allocation = 1.0

    # Last composite score, kept for the rebalance log lines.
    self.last_bear_score = 0.0

    self.log("STRATEGY_INITIALIZED: Enhanced momentum strategy with Market Regime Detection")
    self.log(f"REGIME_DETECTION: Multi-factor bear signal threshold: {self.bear_signal_threshold}")
    self.log(
        f"ALLOCATION_SETTINGS: Momentum={self.momentum_allocation:.0%} equity | "
        f"Defensive={self.defensive_gold_allocation:.0%} gold + {self.defensive_equity_allocation:.0%} equity"
    )
    self.log("DEFENSIVE_ASSET: GLD (Gold) for bear market protection")

    self.add_universe(self._coarse_selection_function, self._fine_selection_function)

    # Re-evaluate the market regime once per day.
    self.schedule.on(
        self.date_rules.every_day(),
        self.time_rules.after_market_open(self.market_proxy, 30),
        self.update_market_regime,
    )
def update_market_regime(self):
    """Recompute the bear-market indicators and update ``self.current_regime``.

    Pulls up to 250 daily closes of ``self.market_proxy`` and derives five
    binary signals (1 = bearish): price below the 200- and 50-bar mean,
    a sharp short-term loss, elevated 20-bar volatility, and a drawdown
    from the window's peak. The signals are combined with
    ``self.regime_weights`` (renormalised over the signals that could be
    computed) into a score in [0, 1].

    The regime flips to "DEFENSIVE" when the score reaches
    ``self.bear_signal_threshold`` and back to "MOMENTUM" only once it
    falls to ``threshold - 0.1`` or lower (hysteresis).

    Side effects: overwrites ``self.regime_indicators``, appends to
    ``self.regime_history``, sets ``self.last_bear_score`` and logs.
    """
    if self.market_proxy not in self.securities:
        return

    history = self.history(self.market_proxy, 250, Resolution.DAILY)
    if history.empty:
        return

    prices = history.loc[self.market_proxy]['close']
    current_price = prices.iloc[-1]
    n_bars = len(prices)
    returns = prices.pct_change().dropna()

    indicators = {}

    # 1. Trend: latest close below its moving average.
    if n_bars >= 200:
        indicators['ma_200_signal'] = 1 if current_price < prices.tail(200).mean() else 0
    if n_bars >= 50:
        indicators['ma_50_signal'] = 1 if current_price < prices.tail(50).mean() else 0

    # 2. Momentum: any look-back breaching its loss threshold is bearish.
    # The comparison bar is prices.iloc[-period].
    loss_thresholds = {10: -0.08, 20: -0.10, 50: -0.12}
    momentum_signals = [
        1 if (current_price / prices.iloc[-period] - 1) < threshold else 0
        for period, threshold in loss_thresholds.items()
        if n_bars >= period
    ]
    if momentum_signals:
        indicators['momentum_signal'] = max(momentum_signals)

    # 3. Volatility: 20-bar annualised vol more than 80% above the 100-bar figure.
    if n_bars >= 50 and len(returns) >= 20:
        current_vol = returns.tail(20).std() * np.sqrt(252)
        avg_vol = returns.tail(100).std() * np.sqrt(252) if len(returns) >= 100 else current_vol
        indicators['volatility_signal'] = 1 if current_vol > avg_vol * 1.8 else 0

    # 4. Drawdown from the highest close in the window. Measured on prices
    # directly so that a peak on the very first bar is not lost (a
    # cumulative-return series starts at the second bar).
    if n_bars >= 100:
        current_drawdown = current_price / prices.max() - 1
        indicators['drawdown_signal'] = 1 if current_drawdown < -0.08 else 0

    self.regime_indicators = indicators

    # Weighted average over the indicators that were available.
    bear_score = 0
    total_weight = 0
    for name, weight in self.regime_weights.items():
        if name in indicators:
            bear_score += indicators[name] * weight
            total_weight += weight
    if total_weight > 0:
        bear_score = bear_score / total_weight

    # Hysteresis: entering DEFENSIVE needs the full threshold, leaving it
    # needs the score to drop 0.1 below the threshold.
    old_regime = self.current_regime
    if old_regime == "MOMENTUM":
        if bear_score >= self.bear_signal_threshold:
            self.current_regime = "DEFENSIVE"
    elif bear_score <= (self.bear_signal_threshold - 0.1):
        self.current_regime = "MOMENTUM"

    if old_regime != self.current_regime:
        self.log(f"REGIME_CHANGE: {old_regime} -> {self.current_regime} | Bear Score: {bear_score:.3f}")
        self.log(f" Indicators: MA200={indicators.get('ma_200_signal', 0)} "
                 f"MA50={indicators.get('ma_50_signal', 0)} "
                 f"Mom={indicators.get('momentum_signal', 0)} "
                 f"Vol={indicators.get('volatility_signal', 0)} "
                 f"DD={indicators.get('drawdown_signal', 0)}")

    self.regime_history.append({
        'date': self.time,
        'regime': self.current_regime,
        'bear_score': bear_score,
        'indicators': indicators.copy(),
        'spy_price': current_price
    })

    # Periodic status line every 20 recorded evaluations.
    if len(self.regime_history) % 20 == 0:
        self.log(f"REGIME_STATUS: {self.current_regime} | Bear Score: {bear_score:.3f} | SPY: ${current_price:.2f}")

    self.last_bear_score = bear_score
def _coarse_selection_function(self, coarse):
    """Coarse universe filter: the most liquid stocks above a price floor.

    Returns ``Universe.UNCHANGED`` while the next adjustment date has not
    been reached. Otherwise flags a rebalance, initialises the trading
    dates on the first call, and returns the symbols of the top
    ``self._num_coarse`` entries by dollar volume among those that have
    fundamental data and trade above the floor (8.0 in the DEFENSIVE
    regime, 5.0 otherwise).
    """
    due_date = self.next_adjustment_date
    if due_date and self.time < due_date:
        return Universe.UNCHANGED

    self._rebalance = True
    if not self.first_trade_date:
        self.first_trade_date = self.time
        self.next_adjustment_date = self.get_next_adjustment_date(self.time)

    # Higher price floor while defensive.
    price_floor = 5.0 if self.current_regime != "DEFENSIVE" else 8.0

    eligible = [c for c in coarse if c.has_fundamental_data and c.price > price_floor]
    eligible.sort(key=lambda c: c.dollar_volume, reverse=True)
    return [c.symbol for c in eligible[:self._num_coarse]]
def _fine_selection_function(self, fine):
    """Fine universe filter: keep the largest companies by market cap.

    Keeps ``self._num_fine`` symbols normally and only 80% of that count
    (truncated to an int) while the regime is DEFENSIVE.
    """
    keep = self._num_fine
    if self.current_regime == "DEFENSIVE":
        # Narrower list while defensive.
        keep = int(self._num_fine * 0.8)

    largest_first = sorted(fine, key=lambda f: f.market_cap, reverse=True)
    return [f.symbol for f in largest_first[:keep]]
def on_data(self, data):
    """Per-bar handler: update indicators, track diagnostics, rebalance when due.

    ``data`` is accepted for the engine callback signature but not read;
    prices come from ``self.securities``.

    On non-rebalance bars only diagnostics are recorded. When
    ``self._rebalance`` is set, the portfolio is re-allocated according to
    ``self.current_regime`` (gold-heavy in DEFENSIVE, optimised momentum
    basket otherwise), a record is appended to ``self.portfolio_history``
    and the next adjustment date is scheduled.
    """
    self._refresh_momentum_indicators()
    self._record_return_since_rebalance()

    # Bootstrap the schedule if nothing is invested and no start was recorded.
    if not self.portfolio.invested and not self.first_trade_date:
        self.first_trade_date = self.time
        self.next_adjustment_date = self.get_next_adjustment_date(self.time, initial=True)
        self._rebalance = True
        self.log(f"FIRST_TRADE_SETUP: Setting first trade date to {self.time}")

    if not self._rebalance:
        self._log_momentum_decay()
        return

    self.log(f"REBALANCE_START: Date {self.time} | Regime: {self.current_regime} | Days since last: {self.days_since_rebalance}")
    self.log(f" Bear Score: {self.last_bear_score:.3f} | Threshold: {self.bear_signal_threshold}")

    if self.current_regime == "DEFENSIVE":
        portfolio_record = self._rebalance_defensive()
    else:
        portfolio_record = self._rebalance_momentum()

    # Bookkeeping shared by both regimes.
    self.portfolio_history.append(portfolio_record)
    self.rebalance_dates.append(self.time)

    self.days_since_rebalance = 0
    self.portfolio_value_at_rebalance = self.portfolio.total_portfolio_value
    self.daily_returns_since_rebalance = []
    self._rebalance = False
    self.next_adjustment_date = self.get_next_adjustment_date(self.time)
    self.log(f"REBALANCE_END: Next rebalance scheduled for {self.next_adjustment_date}")

def _refresh_momentum_indicators(self):
    """Feed the latest close into every momentum indicator and log ready values."""
    for symbol, mom in self._momp.items():
        price = self.securities[symbol].close
        mom.update(self.time, price)
        readings = self.momentum_history.setdefault(symbol, [])
        if mom.is_ready:
            readings.append({
                'date': self.time,
                'momentum': mom.current.value,
                'price': price
            })

def _record_return_since_rebalance(self):
    """Record the portfolio return relative to its value at the last rebalance."""
    baseline = self.portfolio_value_at_rebalance
    if baseline is None:
        return
    current_value = self.portfolio.total_portfolio_value
    self.daily_returns_since_rebalance.append({
        'date': self.time,
        'days_since_rebalance': self.days_since_rebalance,
        # Key name kept for the report; the value is the return since the rebalance.
        'daily_return': (current_value - baseline) / baseline,
        'portfolio_value': current_value
    })
    self.days_since_rebalance += 1

def _initial_momentum_since_rebalance(self, symbol):
    """Return the first momentum tracked for ``symbol`` after the latest rebalance.

    The tracker is never cleared, so entries are walked backwards until one
    dated at or before the last rebalance is found. Returns ``None`` when
    the symbol has no entry in the current holding period.
    """
    entries = self.momentum_decay_tracker.get(symbol)
    if not entries:
        return None
    period_start = self.rebalance_dates[-1] if self.rebalance_dates else None
    first_entry = None
    for entry in reversed(entries):
        if period_start is not None and entry['date'] <= period_start:
            break
        first_entry = entry
    return None if first_entry is None else first_entry['momentum']

def _log_momentum_decay(self):
    """Track and log the momentum of current holdings between rebalances."""
    if not self.current_holdings:
        return

    tracked = []        # symbols whose indicator is ready today
    current_values = []
    for symbol in self.current_holdings:
        mom = self._momp.get(symbol)
        if mom is None or not mom.is_ready:
            continue
        value = mom.current.value
        tracked.append(symbol)
        current_values.append(value)
        self.momentum_decay_tracker.setdefault(symbol, []).append({
            'date': self.time,
            'days_since_selection': self.days_since_rebalance,
            'momentum': value
        })

    if not current_values:
        return

    avg_momentum = sum(current_values) / len(current_values)
    min_momentum = min(current_values)
    max_momentum = max(current_values)
    momentum_spread = max_momentum - min_momentum
    momentum_decline_threshold = 0.02  # 2% decline from the initial average

    if self.days_since_rebalance > 0:
        initial_values = [
            value for value in (self._initial_momentum_since_rebalance(s) for s in tracked)
            if value is not None
        ]
        if initial_values:
            avg_initial_momentum = sum(initial_values) / len(initial_values)
            # A zero base has no meaningful relative decline; abs() keeps
            # the sign correct when the initial momentum is negative.
            if avg_initial_momentum != 0:
                momentum_decline = (avg_initial_momentum - avg_momentum) / abs(avg_initial_momentum)
                if momentum_decline > momentum_decline_threshold:
                    self.log(f"MOMENTUM_EXHAUSTION_SIGNAL: Day {self.days_since_rebalance} | "
                             f"Decline: {momentum_decline*100:.2f}% | "
                             f"Current Avg: {avg_momentum:.4f} | "
                             f"Initial Avg: {avg_initial_momentum:.4f}")

    self.log(f"MOMENTUM_TRACK: Day {self.days_since_rebalance} | "
             f"Avg: {avg_momentum:.4f} | Min: {min_momentum:.4f} | "
             f"Max: {max_momentum:.4f} | Spread: {momentum_spread:.4f} | "
             f"Holdings: {len(self.current_holdings)}")

def _rebalance_defensive(self):
    """Allocate to gold plus one top-momentum equity; return the history record."""
    self.log(f"DEFENSIVE_MODE: Allocating {self.defensive_gold_allocation*100:.1f}% to gold, {self.defensive_equity_allocation*100:.1f}% to equity")

    # Close every open position except gold before re-allocating.
    for symbol in list(self.portfolio.keys()):
        if self.portfolio[symbol].invested and symbol != self.defensive_asset:
            self.liquidate(symbol, "Defensive regime - reallocating to gold")

    gold_allocation = self.defensive_gold_allocation
    self.set_holdings(self.defensive_asset, gold_allocation)
    self.log(f"GOLD_ALLOCATION: GLD at {gold_allocation*100:.1f}%")
    if self.defensive_asset in self._momp and self._momp[self.defensive_asset].is_ready:
        self.log(f" Gold momentum: {self._momp[self.defensive_asset].current.value:.4f}")

    # Small equity sleeve: the strongest momentum symbol, excluding gold and the proxy.
    ranked = sorted(
        [s for s, m in self._momp.items()
         if m.is_ready and s != self.defensive_asset and s != self.market_proxy],
        key=lambda s: self._momp[s].current.value, reverse=True)
    if ranked:
        top_momentum = ranked[0]
        equity_allocation = self.defensive_equity_allocation
        self.set_holdings(top_momentum, equity_allocation)
        self.log(f"DEFENSIVE_EQUITY: {top_momentum} at {equity_allocation*100:.1f}% allocation")
        self.log(f" Equity momentum: {self._momp[top_momentum].current.value:.4f}")
        self.current_holdings = {top_momentum}
    else:
        self.current_holdings = set()
        self.log("DEFENSIVE_MODE: No suitable equity - gold only")

    return {
        'date': self.time,
        'days_since_last_rebalance': self.days_since_rebalance,
        'regime': 'DEFENSIVE',
        'equity_symbols': [str(s) for s in self.current_holdings] if self.current_holdings else [],
        'gold_allocation': self.defensive_gold_allocation,
        'equity_allocation': self.defensive_equity_allocation,
        'bear_score': self.last_bear_score,
        'portfolio_value': self.portfolio.total_portfolio_value
    }

def _rebalance_momentum(self):
    """Select the top-momentum symbols, optimise weights, trade; return the record."""
    ranked = sorted([s for s, m in self._momp.items() if m.is_ready],
                    key=lambda s: self._momp[s].current.value, reverse=True)
    selected = ranked[:self._num_long]
    new_holdings = set(selected)

    self.log(f"MOMENTUM_MODE: Full momentum strategy with {len(selected)} assets")

    # Show the top 15 candidates to make the selection auditable.
    self.log("TOP_MOMENTUM_CANDIDATES:")
    for rank, symbol in enumerate(ranked[:15], start=1):
        momentum_val = self._momp[symbol].current.value
        status = "SELECTED" if symbol in selected else "NOT_SELECTED"
        self.log(f" Rank {rank}: {symbol} | Momentum: {momentum_val:.4f} | {status}")

    # Coming out of a non-momentum rebalance the book may still hold the
    # defensive allocation even if the symbol set matches, so force a trade.
    regime_switched = (bool(self.portfolio_history)
                       and self.portfolio_history[-1].get('regime') != 'MOMENTUM')

    holdings_changed = new_holdings != self.current_holdings
    if holdings_changed or self.first_trade_date == self.time or regime_switched:
        if selected:
            if holdings_changed:
                added_symbols = new_holdings - self.current_holdings
                removed_symbols = self.current_holdings - new_holdings
                maintained_symbols = new_holdings & self.current_holdings
                self.log("PORTFOLIO_CHANGES:")
                self.log(f" Added: {[str(s) for s in added_symbols]}")
                self.log(f" Removed: {[str(s) for s in removed_symbols]}")
                self.log(f" Maintained: {[str(s) for s in maintained_symbols]}")

            if len(selected) > 1:
                self.analyze_correlations(selected)
            self.analyze_volatility(selected)

            optimal_weights = self.optimize_portfolio(selected)
            self.target_weights = dict(zip(selected, optimal_weights))

            self.log("OPTIMAL_WEIGHTS:")
            for symbol, weight in self.target_weights.items():
                self.log(f" {symbol}: {weight:.4f}")

            self.current_holdings = new_holdings
            self.adjust_portfolio()
        else:
            self.log("WARNING: No securities selected for momentum portfolio")

    return {
        'date': self.time,
        'days_since_last_rebalance': self.days_since_rebalance,
        'regime': 'MOMENTUM',
        'symbols': [str(s) for s in selected],
        'momentum_values': [self._momp[s].current.value for s in selected],
        'bear_score': self.last_bear_score,
        'portfolio_value': self.portfolio.total_portfolio_value
    }
def on_securities_changed(self, changes):
    """Maintain the momentum indicators as securities enter and leave.

    Removed securities that had an indicator are dropped from
    ``self._momp`` and liquidated. Added securities get a new
    ``MomentumPercent`` indicator. Every indicator that is not yet ready is
    then warmed up from ``1 + self._lookback`` daily closes of history.
    """
    for security in changes.removed_securities:
        symbol = security.symbol
        if self._momp.pop(symbol, None) is not None:
            self.liquidate(symbol, 'Removed from universe')

    for security in changes.added_securities:
        if security.symbol not in self._momp:
            self._momp[security.symbol] = MomentumPercent(self._lookback)

    # Warm up whatever is not ready yet (new additions and earlier partial warm-ups).
    pending = [symbol for symbol, mom in self._momp.items() if not mom.is_ready]
    if not pending:
        # Nothing to warm up: skip the history request entirely.
        return

    history = self.history(pending, 1 + self._lookback, Resolution.DAILY)
    if history.empty:
        return

    closes = history.close.unstack(level=0)
    for symbol in pending:
        ticker = symbol.id.to_string()
        if ticker not in closes:
            continue
        indicator = self._momp[symbol]
        for time, value in closes[ticker].dropna().items():
            indicator.update(IndicatorDataPoint(symbol, time.date(), value))
def analyze_correlations(self, selected_symbols):
    """Log and store pairwise return correlations of the selected assets.

    Uses at most 30 daily bars (``min(30, self.p_short_lookback)``). When
    more than 10 return rows are available, the average, maximum and
    minimum pairwise correlation are logged and saved in
    ``self.correlation_tracker`` under the current time. Any exception is
    logged as ``CORRELATION_ERROR`` instead of being raised.
    """
    try:
        window = min(30, self.p_short_lookback)
        closes = self.history(selected_symbols, window, Resolution.DAILY)['close'].unstack(level=0)
        returns = closes.pct_change().dropna()
        if len(returns) <= 10:
            # Too few observations for a meaningful correlation.
            return

        corr_matrix = returns.corr()

        # Visit every unordered pair once.
        pair_values = []
        for position, first in enumerate(selected_symbols):
            for second in selected_symbols[position + 1:]:
                first_key = first.id.to_string()
                second_key = second.id.to_string()
                if first_key in corr_matrix.columns and second_key in corr_matrix.columns:
                    value = corr_matrix.loc[first_key, second_key]
                    if not np.isnan(value):
                        pair_values.append(value)

        if not pair_values:
            return

        avg_correlation = sum(pair_values) / len(pair_values)
        max_correlation = max(pair_values)
        min_correlation = min(pair_values)
        self.log(f"CORRELATION_ANALYSIS: Avg: {avg_correlation:.4f} | Max: {max_correlation:.4f} | Min: {min_correlation:.4f}")

        self.correlation_tracker[self.time] = {
            'avg_correlation': avg_correlation,
            'max_correlation': max_correlation,
            'min_correlation': min_correlation,
            'symbols': [str(s) for s in selected_symbols]
        }
    except Exception as e:
        self.log(f"CORRELATION_ERROR: {str(e)}")
def analyze_volatility(self, selected_symbols):
    """Log and store annualised volatility statistics of the selected assets.

    Uses at most 30 daily bars (``min(30, self.p_short_lookback)``). When
    more than 10 return rows are available, each asset with more than 5
    returns contributes ``std * sqrt(252)``; the average, maximum and
    minimum are logged and saved in ``self.volatility_tracker`` under the
    current time. Any exception is logged as ``VOLATILITY_ERROR``.
    """
    try:
        window = min(30, self.p_short_lookback)
        closes = self.history(selected_symbols, window, Resolution.DAILY)['close'].unstack(level=0)
        returns = closes.pct_change().dropna()
        if len(returns) <= 10:
            return

        volatilities = []
        for symbol in selected_symbols:
            key = symbol.id.to_string()
            if key not in returns.columns:
                continue
            asset_returns = returns[key].dropna()
            if len(asset_returns) > 5:
                # Annualise the daily standard deviation.
                volatilities.append(asset_returns.std() * np.sqrt(252))

        if not volatilities:
            return

        avg_volatility = sum(volatilities) / len(volatilities)
        max_volatility = max(volatilities)
        min_volatility = min(volatilities)
        self.log(f"VOLATILITY_ANALYSIS: Avg: {avg_volatility:.4f} | Max: {max_volatility:.4f} | Min: {min_volatility:.4f}")

        self.volatility_tracker[self.time] = {
            'avg_volatility': avg_volatility,
            'max_volatility': max_volatility,
            'min_volatility': min_volatility,
            'symbols': [str(s) for s in selected_symbols]
        }
    except Exception as e:
        self.log(f"VOLATILITY_ERROR: {str(e)}")
def optimize_portfolio(self, selected_symbols):
    """Return long-only weights that maximise the Sortino ratio.

    Daily returns over ``self.p_short_lookback`` bars are fed to SLSQP with
    weights bounded to [0, 1] and constrained to sum to 1.

    Args:
        selected_symbols: assets to weight. The returned array is aligned
            with this sequence position by position.

    Returns:
        numpy array of weights, one per entry of ``selected_symbols``.
        Falls back to equal weights when history is missing or too short,
        or when the optimiser fails; an empty array for an empty selection.
    """
    import pandas as pd  # local import: only needed to re-order the columns

    short_lookback = self.p_short_lookback
    n_assets = len(selected_symbols)
    if n_assets == 0:
        return np.array([])

    # Initial guess and fallback: equal allocation.
    x0 = np.ones(n_assets) / n_assets

    history = self.history(selected_symbols, short_lookback, Resolution.DAILY)
    if history.empty:
        self.log("OPTIMIZATION_FAILED: No price history - Using equal weights")
        return x0
    closes = history['close'].unstack(level=0)

    # unstack() orders columns by the index, not by selected_symbols, so
    # rebuild the frame in the caller's order; otherwise the weights would
    # be attached to the wrong assets.
    try:
        ordered = [closes[symbol] for symbol in selected_symbols]
    except KeyError:
        self.log("OPTIMIZATION_FAILED: Missing price history for a selected asset - Using equal weights")
        return x0
    returns = pd.concat(ordered, axis=1).pct_change().dropna()
    if len(returns) < 2:
        self.log("OPTIMIZATION_FAILED: Not enough return observations - Using equal weights")
        return x0

    self.log(f"OPTIMIZATION_START: Assets: {n_assets} | Method: SLSQP | Lookback: {short_lookback}")

    mean_returns = returns.mean().values
    cov_matrix = returns.cov().values
    return_matrix = returns.values  # hoisted: reused on every objective call

    def negative_sortino_ratio(weights):
        """Negative Sortino ratio of the weighted portfolio (for minimisation)."""
        portfolio_return = np.sum(mean_returns * weights) * short_lookback
        portfolio_returns = return_matrix @ weights
        downside_returns = portfolio_returns[portfolio_returns < 0]
        if len(downside_returns) == 0:
            downside_deviation = 1e-8  # avoid division by zero
        else:
            downside_deviation = np.sqrt(np.mean(downside_returns**2)) * np.sqrt(short_lookback)
        sortino_ratio = portfolio_return / downside_deviation if downside_deviation > 0 else 0
        return -sortino_ratio

    constraints = {'type': 'eq', 'fun': lambda w: np.sum(w) - 1}
    bounds = [(0, 1) for _ in range(n_assets)]

    try:
        result = minimize(
            negative_sortino_ratio,
            x0,
            method='SLSQP',
            bounds=bounds,
            constraints=constraints,
            options={'maxiter': 1000, 'ftol': 1e-6}
        )
    except Exception as e:
        self.log(f"OPTIMIZATION_ERROR: {str(e)} - Using equal weights")
        return x0

    if not result.success:
        self.log(f"OPTIMIZATION_FAILED: {result.message} - Using equal weights")
        return x0

    # The solver can leave tiny negative weights or a sum slightly off 1.
    optimal_weights = np.clip(result.x, 0.0, None)
    total = optimal_weights.sum()
    if not np.isfinite(total) or total <= 0:
        self.log("OPTIMIZATION_FAILED: Degenerate weights - Using equal weights")
        return x0
    optimal_weights = optimal_weights / total

    final_sortino = -result.fun
    portfolio_return = np.sum(mean_returns * optimal_weights) * short_lookback
    portfolio_std = np.sqrt(np.dot(optimal_weights.T, np.dot(cov_matrix * short_lookback, optimal_weights)))
    self.log(f"OPTIMIZATION_RESULT: Sortino: {final_sortino:.4f} | Return: {portfolio_return:.4f} | StdDev: {portfolio_std:.4f}")
    return optimal_weights
def adjust_portfolio(self):
    """Move the portfolio towards ``self.target_weights``.

    Open positions that are not targets are liquidated (gold is kept while
    the regime is DEFENSIVE). Each target is then moved from its current
    weight towards the target by ``self.adjustment_step`` (1.0 = jump
    straight to the target). Reductions are placed before increases so
    sales can fund the purchases. Every adjustment is recorded in
    ``self.individual_performance`` and an allocation breakdown is logged.
    """
    # Only open positions matter: the portfolio also lists every security
    # that was merely subscribed, which would flood the log and issue
    # pointless liquidations.
    invested_symbols = {s for s in self.portfolio.keys() if self.portfolio[s].invested}
    target_symbols = set(self.target_weights.keys())

    self.log("PORTFOLIO_ADJUSTMENT_START:")
    self.log(f" Current symbols: {[str(s) for s in invested_symbols]}")
    self.log(f" Target symbols: {[str(s) for s in target_symbols]}")

    for symbol in invested_symbols - target_symbols:
        # Keep gold while defensive unless it is explicitly targeted.
        if self.current_regime == "DEFENSIVE" and symbol == self.defensive_asset:
            continue
        current_holding = self.portfolio[symbol]
        self.log(f" LIQUIDATING: {symbol} | Current value: ${current_holding.holdings_value:.2f}")
        self.liquidate(symbol)

    # Plan all adjustments first so they can be ordered.
    total_value = self.portfolio.total_portfolio_value
    plan = []
    for symbol, target_weight in self.target_weights.items():
        if symbol in self.portfolio and total_value:
            current_weight = self.portfolio[symbol].holdings_value / total_value
        else:
            current_weight = 0
        adjusted_weight = current_weight * (1 - self.adjustment_step) + target_weight * self.adjustment_step
        plan.append((symbol, current_weight, target_weight, adjusted_weight))

    # Largest reductions first, largest increases last (stable for ties).
    plan.sort(key=lambda row: row[3] - row[1])

    for symbol, current_weight, target_weight, adjusted_weight in plan:
        self.log(f" ADJUSTING: {symbol} | Current: {current_weight:.4f} | Target: {target_weight:.4f} | Adjusted: {adjusted_weight:.4f}")

        self.individual_performance.setdefault(symbol, []).append({
            'date': self.time,
            'weight': adjusted_weight,
            'momentum': self._momp[symbol].current.value if symbol in self._momp and self._momp[symbol].is_ready else None,
            'price': self.securities[symbol].close if symbol in self.securities else None
        })

        self.set_holdings(symbol, adjusted_weight)

    self.log(f"PORTFOLIO_ADJUSTMENT_COMPLETE: Total value: ${self.portfolio.total_portfolio_value:.2f}")

    # Allocation breakdown: equity excludes gold and the market proxy.
    equity_value = sum(self.portfolio[symbol].holdings_value for symbol in self.portfolio.keys()
                       if symbol != self.defensive_asset and symbol != self.market_proxy and self.portfolio[symbol].invested)
    gold_holding = self.portfolio[self.defensive_asset]
    gold_value = gold_holding.holdings_value if gold_holding.invested else 0
    cash_value = self.portfolio.cash
    total_value = self.portfolio.total_portfolio_value

    def share(value):
        """Percentage of total portfolio value; 0 when the total is 0."""
        return value / total_value * 100 if total_value else 0.0

    self.log(f"ALLOCATION_BREAKDOWN: Equity: ${equity_value:.0f} ({share(equity_value):.1f}%) | "
             f"Gold: ${gold_value:.0f} ({share(gold_value):.1f}%) | "
             f"Cash: ${cash_value:.0f} ({share(cash_value):.1f}%)")
def get_next_adjustment_date(self, current_date, initial=False):
    """Return the next rebalance date for ``self.p_adjustment_frequency``.

    Args:
        current_date: datetime to step forward from.
        initial: accepted for backward compatibility; it does not change
            the result.

    Returns:
        ``current_date`` plus 5 days ('weekday'), 7 days ('weekly') or
        14 days ('bi-weekly'); for 'monthly', the first day of the next
        month with the time of day preserved.

    Raises:
        ValueError: if the configured frequency is not one of the above.
    """
    frequency = self.p_adjustment_frequency

    fixed_steps = {'weekday': 5, 'weekly': 7, 'bi-weekly': 14}
    if frequency in fixed_steps:
        return current_date + timedelta(days=fixed_steps[frequency])

    if frequency == 'monthly':
        # First of the month + 32 days always lands in the next month.
        somewhere_next_month = current_date.replace(day=1) + timedelta(days=32)
        return somewhere_next_month.replace(day=1)

    raise ValueError(f"Unsupported adjustment frequency: {self.p_adjustment_frequency}")
def analyze_momentum_decay_patterns(self):
    """Log a summary of how the momentum of held symbols evolved.

    For symbols with more than 5 tracked readings in
    ``self.momentum_decay_tracker``, logs the average/min/max momentum per
    reading index (first 30), flags index-to-index drops above 2%, and
    then logs the average recorded return per ``days_since_rebalance``
    from ``self.daily_returns_since_rebalance`` (first 20 days).
    """
    self.log("=" * 80)
    self.log("MOMENTUM DECAY ANALYSIS SUMMARY")
    self.log("=" * 80)

    # Only symbols with enough readings are analysed.
    decay_patterns = {
        symbol: readings
        for symbol, readings in self.momentum_decay_tracker.items()
        if len(readings) > 5
    }

    if decay_patterns:
        max_days = max(len(readings) for readings in decay_patterns.values())
        daily_avg_momentum = {}
        for day in range(min(30, max_days)):
            day_momentums = [
                readings[day]['momentum']
                for readings in decay_patterns.values()
                if day < len(readings)
            ]
            if day_momentums:
                daily_avg_momentum[day] = {
                    'avg': sum(day_momentums) / len(day_momentums),
                    'min': min(day_momentums),
                    'max': max(day_momentums),
                    'count': len(day_momentums)
                }

        self.log("AVERAGE MOMENTUM DECAY BY DAYS SINCE SELECTION:")
        for day, stats in daily_avg_momentum.items():
            self.log(f"Day {day:2d}: Avg={stats['avg']:7.4f} | Min={stats['min']:7.4f} | Max={stats['max']:7.4f} | N={stats['count']:3d}")

        # Relative drop versus the previous index. A zero base has no
        # defined percentage change and is skipped; abs() keeps the sign
        # right when the previous average is negative.
        momentum_drops = []
        prev_avg = None
        for day, stats in daily_avg_momentum.items():
            if prev_avg is not None and prev_avg != 0:
                drop_pct = (prev_avg - stats['avg']) / abs(prev_avg) * 100
                momentum_drops.append({'day': day, 'drop_pct': drop_pct})
            prev_avg = stats['avg']

        significant_drops = [d for d in momentum_drops if d['drop_pct'] > 2.0]
        if significant_drops:
            self.log("\nSIGNIFICANT MOMENTUM DROPS (>2%):")
            for drop in significant_drops:
                self.log(f" Day {drop['day']}: {drop['drop_pct']:.2f}% momentum drop")

    if self.daily_returns_since_rebalance:
        self.log("\nPERFORMANCE BY DAYS SINCE REBALANCE:")
        performance_by_day = {}
        for return_data in self.daily_returns_since_rebalance:
            performance_by_day.setdefault(return_data['days_since_rebalance'], []).append(
                return_data['daily_return'])

        for day in sorted(performance_by_day)[:20]:
            returns = performance_by_day[day]
            avg_return = sum(returns) / len(returns)
            self.log(f"Day {day:2d}: Avg daily return = {avg_return*100:6.3f}% (N={len(returns)})")
def on_end_of_algorithm(self):
    """Log the end-of-run report.

    Covers run totals, time spent in each regime and the last regime
    changes, the momentum decay analysis, the last 10 rebalances, and the
    correlation/volatility summaries collected during the run.
    """
    self.log("=" * 100)
    self.log("ALGORITHM COMPLETED - COMPREHENSIVE ANALYSIS WITH REGIME DETECTION")
    self.log("=" * 100)
    self.log(f"Rebalancing frequency: {self.p_adjustment_frequency}")
    self.log(f"Total rebalances: {len(self.rebalance_dates)}")
    self.log(f"Final portfolio value: ${self.portfolio.total_portfolio_value:.2f}")

    # Regime analysis
    if self.regime_history:
        momentum_days = sum(1 for r in self.regime_history if r['regime'] == 'MOMENTUM')
        defensive_days = sum(1 for r in self.regime_history if r['regime'] == 'DEFENSIVE')
        total_days = len(self.regime_history)
        self.log("\nREGIME ANALYSIS:")
        self.log(f" Total trading days: {total_days}")
        self.log(f" Momentum days: {momentum_days} ({momentum_days/total_days*100:.1f}%)")
        self.log(f" Defensive days: {defensive_days} ({defensive_days/total_days*100:.1f}%)")

        # A change is a record whose regime differs from the previous one.
        regime_changes = []
        prev_regime = None
        for record in self.regime_history:
            if prev_regime and prev_regime != record['regime']:
                regime_changes.append(record)
            prev_regime = record['regime']

        self.log(f" Total regime changes: {len(regime_changes)}")
        self.log(" Recent regime changes:")
        for change in regime_changes[-5:]:
            self.log(f" {change['date'].strftime('%Y-%m-%d')}: -> {change['regime']} (Score: {change['bear_score']:.3f})")

    self.analyze_momentum_decay_patterns()

    # Last 10 rebalances, numbered by their position in the full history
    # (starts at 1 even when fewer than 10 records exist).
    self.log("\nPORTFOLIO COMPOSITION HISTORY (with Regime):")
    recent_records = self.portfolio_history[-10:]
    first_number = len(self.portfolio_history) - len(recent_records) + 1
    for number, record in enumerate(recent_records, start=first_number):
        regime = record.get('regime', 'UNKNOWN')
        bear_score = record.get('bear_score', 0.0)
        prefix = (f"Rebalance {number}: {record['date'].strftime('%Y-%m-%d')} | "
                  f"Regime: {regime} | Bear Score: {bear_score:.3f} | ")

        if regime == 'DEFENSIVE':
            equity_symbols = record.get('equity_symbols', [])
            gold_alloc = record.get('gold_allocation', 0.0)
            equity_alloc = record.get('equity_allocation', 0.0)
            self.log(prefix +
                     f"Gold: {gold_alloc*100:.0f}% | "
                     f"Equity: {equity_symbols} ({equity_alloc*100:.0f}%)")
        elif record.get('momentum_values'):
            momentum_values = record['momentum_values']
            avg_momentum = sum(momentum_values) / len(momentum_values)
            symbols = record.get('symbols', [])
            self.log(prefix + f"Symbols: {symbols} | Avg Momentum: {avg_momentum:.4f}")
        else:
            symbols = record.get('symbols', [])
            self.log(prefix + f"Symbols: {symbols}")

    # Correlation analysis summary
    if self.correlation_tracker:
        avg_correlations = [entry['avg_correlation'] for entry in self.correlation_tracker.values()]
        self.log("\nCORRELATION SUMMARY:")
        self.log(f" Average correlation across all rebalances: {sum(avg_correlations)/len(avg_correlations):.4f}")
        self.log(f" Max average correlation: {max(avg_correlations):.4f}")
        self.log(f" Min average correlation: {min(avg_correlations):.4f}")

    # Volatility analysis summary
    if self.volatility_tracker:
        avg_volatilities = [entry['avg_volatility'] for entry in self.volatility_tracker.values()]
        self.log("\nVOLATILITY SUMMARY:")
        self.log(f" Average volatility across all rebalances: {sum(avg_volatilities)/len(avg_volatilities):.4f}")
        self.log(f" Max average volatility: {max(avg_volatilities):.4f}")
        self.log(f" Min average volatility: {min(avg_volatilities):.4f}")

    self.log("=" * 100)