| Overall Statistics |
|
Total Orders 3654 Average Win 8.87% Average Loss -5.48% Compounding Annual Return 88.137% Drawdown 86.500% Expectancy 0.147 Start Equity 100000 End Equity 2066281.43 Net Profit 1966.281% Sharpe Ratio 0.902 Sortino Ratio 8.999 Probabilistic Sharpe Ratio 0.065% Loss Rate 56% Win Rate 44% Profit-Loss Ratio 1.62 Alpha 2.382 Beta 0.23 Annual Standard Deviation 2.666 Annual Variance 7.106 Information Ratio 0.866 Tracking Error 2.669 Treynor Ratio 10.433 Total Fees $4277.71 Estimated Strategy Capacity $0 Lowest Capacity Asset FB V6OIPNZEM8V9 Portfolio Turnover 4.85% |
import numpy as np
from AlgorithmImports import *
class AssetWeightCalculator:
def __init__(self, algorithm: QCAlgorithm):
    """
    Bind the calculator to its host algorithm and subscribe to BIL.

    Args:
        algorithm: The running algorithm instance; it is used by the other
            methods for history requests and logging.
    """
    self.algorithm = algorithm
    # Hourly BIL subscription; calculate_sharpe_ratio reads it as the
    # risk-free reference.
    bil_security = algorithm.add_equity("BIL", Resolution.HOUR)
    self.risk_free = bil_security
def coarse_selection(self, coarse):
    """
    First-pass universe filter.

    Keeps securities priced above 10 that have fundamental data, orders
    them by dollar volume (largest first) and returns the symbols of the
    top 200.
    """
    eligible = []
    for candidate in coarse:
        if candidate.price > 10 and candidate.has_fundamental_data:
            eligible.append(candidate)
    # Stable descending sort: ties keep their incoming order.
    eligible.sort(key=lambda candidate: candidate.dollar_volume, reverse=True)
    return [candidate.symbol for candidate in eligible][:200]
def fine_selection(self, fine):
    """
    Second-pass universe filter.

    Keeps the symbols whose market cap is present and greater than 10e9,
    logs how many passed, and returns them ordered by rank_stocks.
    """
    large_caps = []
    for security in fine:
        cap = security.market_cap
        if cap is None or not cap > 10e9:
            continue
        large_caps.append(security.symbol)
    self.algorithm.debug(f"Fine Selection: {len(large_caps)} symbols passed filters")
    # Ranking inside the universe filter means the candidates are re-ranked
    # on every universe update.
    return self.rank_stocks(large_caps)
def calculate_sharpe_ratio(self, symbol, period=4914):
    """
    Compute an annualized Sharpe ratio for one symbol from hourly closes.

    Args:
        symbol: A Symbol, or a KeyValuePair-like object whose ``Key``
            attribute holds the Symbol.
        period: Number of hourly bars to request. The default 4914 equals
            3 * 1638, i.e. three times the per-year bar count used below.

    Returns:
        The annualized Sharpe ratio, or None when there is no history, the
        ratio is undefined (zero or non-finite volatility) or an error
        occurred while computing it.
    """
    # Hourly bars per year used for annualizing (1638 = 252 * 6.5;
    # presumably trading days times session hours).
    bars_per_year = 1638
    try:
        # If a KeyValuePair was received, only keep the symbol.
        if hasattr(symbol, "Key"):
            symbol = symbol.Key
        history = self.algorithm.history([symbol], period, Resolution.HOUR)
        if history.empty:
            self.algorithm.debug(f"No history for {symbol.value}")
            return None
        rf_history = self.algorithm.history(self.risk_free.symbol, 1, Resolution.HOUR)
        # NOTE(review): this divides BIL's latest close *price* by 100 and
        # treats it as an annual rate; confirm that is the intended proxy.
        risk_free_rate = rf_history['close'].iloc[-1] / 100 if not rf_history.empty else 0.02
        returns = history['close'].pct_change().dropna()
        excess_returns = returns - (risk_free_rate / bars_per_year)
        mean_excess_return = excess_returns.mean() * bars_per_year
        std_dev = excess_returns.std() * np.sqrt(bars_per_year)
        # With fewer than two returns std() is NaN; NaN != 0 is True, so an
        # explicit finiteness check is needed to keep NaN out of the ranking.
        if not np.isfinite(std_dev) or std_dev == 0:
            return None
        sharpe = mean_excess_return / std_dev
        return sharpe if np.isfinite(sharpe) else None
    except Exception as e:
        # getattr keeps the handler itself from raising on odd inputs.
        self.algorithm.debug(f"Error calculating Sharpe for {getattr(symbol, 'value', symbol)}: {str(e)}")
        return None
def rank_stocks(self, symbols, top_n=20):
    """
    Rank symbols by Sharpe ratio and return the best ones.

    Args:
        symbols: Symbols (or KeyValuePair-like objects exposing ``Key``).
        top_n: Maximum number of symbols to return (default 20).

    Returns:
        Up to ``top_n`` symbols ordered from highest to lowest Sharpe
        ratio. Symbols whose Sharpe ratio is None or not finite are
        dropped. An empty list is returned when nothing can be ranked.
    """
    if not symbols:
        self.algorithm.debug("No symbols to rank")
        return []
    self.algorithm.debug(f"Ranking {len(symbols)} symbols")
    # Convert from key/value pairs if necessary.
    symbols = [s.Key if hasattr(s, 'Key') else s for s in symbols]
    scores = {symbol: self.calculate_sharpe_ratio(symbol) for symbol in symbols}
    # NaN compares False against everything, which would make the sort
    # order arbitrary, so only finite scores are kept.
    valid_scores = {k: v for k, v in scores.items() if v is not None and np.isfinite(v)}
    self.algorithm.debug(f"Valid Sharpe ratios: {len(valid_scores)} out of {len(symbols)}")
    if not valid_scores:
        return []
    sorted_scores = sorted(valid_scores, key=valid_scores.get, reverse=True)[:top_n]
    self.algorithm.log(f"All symbols before ranking: {[s.value for s in symbols]}")
    self.algorithm.log(f"Symbols after filtering: {[s.value for s in valid_scores.keys()]}")
    return sorted_scores
def normalize_scores(self, scores):
    """
    Convert raw scores to z-scores.

    Args:
        scores: Mapping of symbol -> numeric score.

    Returns:
        A dict with the same keys whose values are
        ``(score - mean) / population_std``. When all scores are equal
        every value is 0, and an empty input yields an empty dict.
    """
    if not scores:
        # np.mean/np.std on an empty array would emit RuntimeWarnings and
        # produce NaN; there is nothing to normalize anyway.
        return {}
    values = np.array(list(scores.values()))
    mean = np.mean(values)
    std_dev = np.std(values)
    if std_dev == 0:
        # No variation in scores: assign equal normalized scores.
        return {symbol: 0 for symbol in scores}
    return {symbol: (score - mean) / std_dev for symbol, score in scores.items()}
from AlgorithmImports import *
class MACDSignalGenerator:
def __init__(self, algorithm: QCAlgorithm, symbols: list, cash_buffer: float = 0.05):
    """
    Store the host algorithm, the tracked symbols and the MACD variants.

    Args:
        algorithm: The running algorithm instance.
        symbols: List of tracked symbols (stored by reference).
        cash_buffer: Fraction of the portfolio kept out of position sizing.
    """
    self.algorithm = algorithm
    self.symbols = symbols
    self.cash_buffer = cash_buffer
    # Layout: {symbol: {variant name: MACD indicator}}
    self.macd_indicators = {}
    # (variant name, fast period, slow period, signal period)
    variant_periods = (
        ("slow", 12, 26, 9),
        ("slow-med", 9, 19, 5),
        ("med-fast", 7, 15, 3),
        ("fast", 5, 12, 2),
    )
    self.macd_variants = {
        name: {"fast": fast, "slow": slow, "signal": signal}
        for name, fast, slow, signal in variant_periods
    }
def remove_symbols(self, symbols: list):
    """
    Liquidate and stop tracking the given symbols.

    For every symbol the position is liquidated, its MACD indicators are
    unregistered and dropped, and the symbol is removed from
    ``self.symbols`` so that position sizing no longer counts it.

    Args:
        symbols: Symbols to remove.
    """
    removed = set(symbols)
    for symbol in symbols:
        # Liquidate the position before removing its indicators.
        self.algorithm.liquidate(symbol)
        for macd in self.macd_indicators.pop(symbol, {}).values():
            self.algorithm.unregister_indicator(macd)
    # calculate_position_sizes divides by len(self.symbols); without this
    # the denominator would keep counting symbols that are no longer
    # traded. Updated in place because the list is shared by reference.
    self.symbols[:] = [s for s in self.symbols if s not in removed]
def add_symbols(self, new_symbols):
    """
    Start tracking new symbols and build warmed-up MACD indicators for them.

    Every new symbol is added to ``self.symbols`` (once). For symbols that
    currently have tradable price data and hourly history, one MACD
    indicator per configured variant is created and warmed up with the
    fetched closes.

    Args:
        new_symbols: Iterable of symbols to add. An empty iterable is a
            no-op.
    """
    new_symbols = list(new_symbols)
    if not new_symbols:
        # Nothing to add: skip the history request entirely.
        return
    self.algorithm.debug(f"Attempting to add symbols: {[s.value for s in new_symbols]}")
    # 35 bars = slow period (26) + signal period (9) of the slowest variant.
    history = self.algorithm.history(new_symbols, 35, Resolution.HOUR)
    history_symbols = history.index.get_level_values(0)
    self.algorithm.debug(f"History data available for: {history_symbols.unique()}")
    # Track each symbol only once; duplicates would inflate the divisor
    # used in calculate_position_sizes.
    for symbol in new_symbols:
        if symbol not in self.symbols:
            self.symbols.append(symbol)
    for symbol in new_symbols:
        security = self.algorithm.securities[symbol]
        if not (security.has_data and security.is_tradable and security.price > 0):
            self.algorithm.debug(f"Waiting for valid price data: {symbol.value}")
            continue
        if symbol not in history_symbols:
            self.algorithm.debug(f"No history data for: {symbol.value}")
            continue
        symbol_history = history.loc[symbol]
        self.algorithm.debug(f"History rows for {symbol.value}: {len(symbol_history)}")
        # Only create the per-symbol entry once indicators can really be
        # built, so no empty entries are left behind.
        variants = self.macd_indicators.setdefault(symbol, {})
        for variant, params in self.macd_variants.items():
            macd = self.algorithm.macd(
                symbol=symbol,
                fast_period=params["fast"],
                slow_period=params["slow"],
                signal_period=params["signal"],
                type=MovingAverageType.EXPONENTIAL,
                resolution=Resolution.HOUR,
                selector=Field.CLOSE
            )
            # Warm up the MACD with the historical closes.
            for time, row in symbol_history.iterrows():
                macd.update(time, row['close'])
            variants[variant] = macd
def calculate_position_sizes(self):
    """
    Derive a long-only position size per symbol and MACD variant.

    The size is ``max_position * (fast - slow) / slow * 70`` where
    ``max_position`` is the investable fraction ``(1 - cash_buffer)``
    split evenly across all tracked symbols and variants. The result is
    clamped to the range [0, 0.1]. Variants whose indicator is not ready,
    or whose slow average is 0, get a size of 0.

    Returns:
        ``{symbol: {variant: size}}``; empty when nothing is tracked.
    """
    position_sizes = {}
    max_position_limit = 0.1  # hard cap for a single (symbol, variant) size
    signal_scalar = 70        # amplifies the relative EMA gap; acts like a leverage setting
    if not self.symbols or not self.macd_indicators:
        self.algorithm.debug("No symbols available for position calculation")
        return position_sizes
    # Largest size a single variant may take before the hard cap.
    max_position = (1 - self.cash_buffer) / (len(self.symbols) * len(self.macd_variants))
    for symbol, variants in self.macd_indicators.items():
        sizes = position_sizes[symbol] = {}
        for variant, macd in variants.items():
            if not macd.is_ready:
                sizes[variant] = 0
                continue
            slow_value = macd.slow.current.value
            if slow_value == 0:
                # The relative distance is undefined; avoid dividing by zero.
                sizes[variant] = 0
                continue
            # Distance between the fast and slow averages.
            distance = macd.fast.current.value - slow_value
            # Relative distance expressed as a scaled fraction of max_position.
            position_size = max_position * (distance / slow_value) * signal_scalar
            # Only allow positive positions, capped at the maximum.
            sizes[variant] = max(0, min(position_size, max_position_limit))
    return position_sizes
# region imports
from AlgorithmImports import *
# endregion
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data import *
from QuantConnect.Indicators import *
from datetime import timedelta
import numpy as np
import pandas as pd
import torch
import os
import torch.nn as nn
from strategy import KQTStrategy
def calculate_ema(prices, span):
    """Return the exponential moving average of *prices* as a NumPy array.

    The average is computed recursively (``adjust=False``), which mimics
    how many trading platforms calculate an EMA.
    """
    series = pd.Series(prices)
    smoothed = series.ewm(span=span, adjust=False).mean()
    return smoothed.values
def calculate_rsi(prices, period=14):
    """Calculate the Relative Strength Index for a price series.

    Args:
        prices: Sequence of prices (list or array, int or float).
        period: Smoothing period.

    Returns:
        A float NumPy array with the same length as *prices*. The first
        ``period`` entries hold the seed RSI; later entries use a running
        smoothed average of gains and losses. Whenever the average loss
        is zero the RSI is 100.
    """
    # Work in float so integer input does not truncate the RSI values.
    prices = np.asarray(prices, dtype=float)
    rsi = np.zeros_like(prices)
    if prices.size == 0:
        return rsi

    def rsi_from(avg_gain, avg_loss):
        # No downward movement means RSI == 100 (RS would be infinite).
        if avg_loss == 0:
            return 100.
        return 100. - 100. / (1. + avg_gain / avg_loss)

    deltas = np.diff(prices)
    seed = deltas[:period + 1]
    up = seed[seed >= 0].sum() / period
    down = -seed[seed < 0].sum() / period
    # A loss-free seed window fills the seed entries with 100 and keeps
    # going, so the return value is always an array.
    rsi[:period] = rsi_from(up, down)
    for i in range(period, len(prices)):
        delta = deltas[i - 1]
        if delta > 0:
            upval = delta
            downval = 0.
        else:
            upval = 0.
            downval = -delta
        up = (up * (period - 1) + upval) / period
        down = (down * (period - 1) + downval) / period
        rsi[i] = rsi_from(up, down)
    return rsi
def calculate_macd(prices, fast=12, slow=26, signal=9):
    """Return the MACD line, its signal line and the histogram.

    All three are NumPy arrays with the same length as *prices*; the
    histogram is the MACD line minus the signal line.
    """
    def smooth(values, span):
        # Recursive EMA (adjust=False) of a 1-D sequence.
        return pd.Series(values).ewm(span=span, adjust=False).mean().values

    price_array = np.array(prices)
    macd_line = smooth(price_array, fast) - smooth(price_array, slow)
    signal_line = smooth(macd_line, signal)
    return macd_line, signal_line, macd_line - signal_line
def calculate_atr(high, low, close, period=14):
    """Calculate the Average True Range.

    Args:
        high, low, close: Equal-length price sequences.
        period: Smoothing period.

    Returns:
        NumPy array of ATR values, one per bar.

    Raises:
        ValueError: If the three inputs differ in length.
    """
    bar_count = len(high)
    if not (bar_count == len(low) and bar_count == len(close)):
        raise ValueError("Input arrays must have the same length")

    # True range: the first bar only has its own high-low span available.
    true_range = np.zeros(bar_count)
    true_range[0] = high[0] - low[0]
    for idx in range(1, bar_count):
        prev_close = close[idx - 1]
        true_range[idx] = max(
            high[idx] - low[idx],
            abs(high[idx] - prev_close),
            abs(low[idx] - prev_close),
        )

    # Recursive smoothing seeded with the first true range.
    smoothed = np.zeros_like(true_range)
    smoothed[0] = true_range[0]
    for idx in range(1, bar_count):
        smoothed[idx] = (smoothed[idx - 1] * (period - 1) + true_range[idx]) / period
    return smoothed
class KQTAlgorithm(QCAlgorithm):
def Initialize(self):
    """Set up the backtest window, strategy, universe, state and schedule."""
    # --- Plain bookkeeping state ---------------------------------------
    self.previous_portfolio_value = 0
    self.lookback = 60            # bars of history requested per symbol
    self.tickers = []
    self.symbols = {}             # ticker -> Symbol
    self.sector_mappings = {}     # ticker -> sector label
    self.stock_data = {}          # ticker -> historical data frame
    self.current_predictions = {}
    self.previous_positions = {}
    self.stopped_out = set()      # tickers whose stop loss was triggered

    # --- Backtest window, capital and benchmark -------------------------
    self.SetStartDate(2019, 1, 1)
    self.SetEndDate(2024, 12, 31)
    self.SetCash(1000000)
    self.SetBenchmark("SPY")

    # --- Strategy object; it shares the sector dictionary ---------------
    self.strategy = KQTStrategy()
    self.strategy.sector_mappings = self.sector_mappings

    # --- Data subscriptions ---------------------------------------------
    self._universe = self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
    self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol

    # --- Daily trading callback at 10:00 --------------------------------
    every_day = self.DateRules.EveryDay()
    at_ten = self.TimeRules.At(10, 0)
    self.Schedule.On(every_day, at_ten, self.TradeExecute)
    # Loading of pre-trained weights is currently disabled:
    # self.TryLoadModelWeights()
def CoarseSelectionFunction(self, coarse):
    """Return the symbols of the 500 securities with the highest dollar volume."""
    ranked = list(coarse)
    ranked.sort(key=lambda security: security.DollarVolume, reverse=True)
    return [security.Symbol for security in ranked[:500]]
def FineSelectionFunction(self, fine):
    """
    Keep the 100 largest securities by market cap and record their sectors.

    For each kept security the first available classification attribute
    (in the order listed below) is stored in ``self.sector_mappings``
    under the ticker; "Unknown" is stored when none is available.

    Returns:
        The symbols of the selected securities, largest market cap first.
    """
    def public_names(obj):
        return [attr for attr in dir(obj) if not attr.startswith('_')]

    selected = sorted(fine, key=lambda security: security.MarketCap, reverse=True)[:100]

    # Log the available properties of the first item to aid debugging.
    if len(selected) > 0:
        first = selected[0]
        self.Debug(f"Available fundamental properties: {public_names(first)}")
        if hasattr(first, 'AssetClassification'):
            self.Debug(f"AssetClassification properties: {public_names(first.AssetClassification)}")

    # (attribute name, convert to str?) in priority order.
    sector_fields = (
        ('MorningstarSectorCode', True),
        ('MorningstarIndustryCode', True),
        ('GicsCode', True),
        ('Sector', False),
        ('Industry', False),
    )
    for security in selected:
        ticker = security.Symbol.Value
        sector = "Unknown"
        try:
            classification = getattr(security, 'AssetClassification', None)
            if classification is not None:
                for field_name, as_text in sector_fields:
                    if hasattr(classification, field_name):
                        raw = getattr(classification, field_name)
                        sector = str(raw) if as_text else raw
                        break
        except Exception as e:
            self.Debug(f"Error getting sector for {ticker}: {str(e)}")
        self.sector_mappings[ticker] = sector
    return [security.Symbol for security in selected]
def OnSecuritiesChanged(self, changes):
    """
    Keep the ticker bookkeeping in sync with universe changes.

    Added securities are registered in ``self.tickers`` and
    ``self.symbols``; removed ones are purged from those and from the
    sector and history caches.
    """
    added_securities = changes.AddedSecurities
    removed_securities = changes.RemovedSecurities
    self.Debug(f"Universe changed: Added {len(added_securities)}, Removed {len(removed_securities)}")

    for security in added_securities:
        ticker = security.Symbol.Value
        self.Debug(f"Added: {ticker}")
        if ticker in self.tickers:
            continue
        self.tickers.append(ticker)
        self.symbols[ticker] = security.Symbol

    for security in removed_securities:
        ticker = security.Symbol.Value
        self.Debug(f"Removed: {ticker}")
        if ticker in self.tickers:
            self.tickers.remove(ticker)
        # Drop every cached entry for the ticker, if present.
        for cache in (self.symbols, self.sector_mappings, self.stock_data):
            cache.pop(ticker, None)
def TryLoadModelWeights(self):
    """
    Try to read model weights from the ObjectStore and inspect them.

    The stored value under "kqt_model_weights" is base64-decoded, written
    to a temporary ``.pth`` file and loaded with ``torch.load`` to detect
    the input size from ``embedding.0.weight``. The temporary file is
    always removed. All failures are logged via ``self.Debug`` and never
    raised.
    """
    import base64
    import os
    import tempfile
    try:
        if not self.ObjectStore.ContainsKey("kqt_model_weights"):
            self.Debug("No model weights found in ObjectStore")
            return
        self.Debug("Found model weights in ObjectStore, loading...")
        # Stored as a base64 string; decode back to the binary payload.
        encoded_bytes = self.ObjectStore.Read("kqt_model_weights")
        model_bytes = base64.b64decode(encoded_bytes)
        temp_path = None
        try:
            with tempfile.NamedTemporaryFile(delete=False, suffix='.pth') as temp:
                temp_path = temp.name
                temp.write(model_bytes)
            try:
                # SECURITY NOTE: torch.load unpickles the file; only use it
                # on weights this project stored itself.
                # map_location keeps the load working on CPU-only machines.
                state_dict = torch.load(temp_path, map_location="cpu")
                input_shape = state_dict['embedding.0.weight'].shape
                actual_input_size = input_shape[1]
                self.Debug(f"Detected input size from weights: {actual_input_size}")
                self.Debug("Successfully loaded model weights")
            except Exception as inner_e:
                self.Debug(f"Error examining weights: {str(inner_e)}")
        finally:
            # Remove the temp file even if writing or logging failed.
            if temp_path is not None and os.path.exists(temp_path):
                os.unlink(temp_path)
    except Exception as e:
        self.Debug(f"Error loading model weights: {str(e)}")
def OnData(self, data):
    """Data event handler; intentionally does nothing.

    Trading is driven by the scheduled TradeExecute callback instead of
    reacting to each incoming data point.
    """
    return None
def TradeExecute(self):
    """
    Run one full trading cycle.

    Sequence: optional emergency-mode gate, market-open check, history
    refresh, predictions, stop-loss scan, target generation, order
    execution, and finally the portfolio-return bookkeeping.
    """
    # Emergency safety mode: no trading at all during the first 2 days
    # after the transition date.
    if getattr(self, 'EMERGENCY_SAFETY_MODE', False):
        if hasattr(self, 'transition_date'):
            days_since_transition = (self.Time - self.transition_date).days
        else:
            days_since_transition = 0
        self.Debug(f"EMERGENCY SAFETY MODE: Day {days_since_transition+1} after transition. " +
                   f"Max allowed leverage: {self.MAX_LEVERAGE*100:.0f}%, Position cap: {self.POSITION_CAP*100:.0f}%")
        if days_since_transition < 2:
            self.Debug(f"EMERGENCY SAFETY: Skipping trading entirely for the first 2 days after transition")
            return

    # Skip when SPY is not loaded yet or its exchange is closed.
    spy_loaded = self.Securities.ContainsKey(self.spy)
    if not (spy_loaded and self.Securities[self.spy].Exchange.ExchangeOpen):
        return

    self.Debug(f"Current universe size: {len(self.tickers)}")
    self.Debug(f"Using sectors: {list(self.sector_mappings.keys())[:5]}...")
    self.Debug(f"TradeExecute running on {self.Time}")

    # 1-3: refresh data, predict, scan for stop losses.
    self.UpdateHistoricalData()
    self.current_predictions = self.GeneratePredictions()
    self.ProcessStopLosses()

    # 4: turn predictions plus recent market returns into target weights.
    market_returns = self.GetMarketReturns()
    target_positions = self.strategy.generate_positions(self.current_predictions, market_returns)
    self.Debug(f"Target positions before execution: {target_positions}")
    self.Debug(f"Market returns: {market_returns}")

    # 5: trade towards the targets.
    self.ExecuteTrades(target_positions)

    # 6-7: feed the portfolio return to the strategy, then remember
    # today's value for the next run's calculation.
    daily_return = self.CalculatePortfolioReturn()
    self.strategy.update_portfolio_returns(daily_return)
    self.previous_portfolio_value = self.Portfolio.TotalPortfolioValue
    self.Debug(f"Generated {len(target_positions)} positions: {target_positions}")
def CalculatePortfolioReturn(self):
    """
    Return the percentage change of the portfolio value since the stored
    previous value.

    When no positive previous value is stored yet, the current value is
    stored and 0 is returned.
    """
    current_value = self.Portfolio.TotalPortfolioValue
    baseline = self.previous_portfolio_value
    if not baseline > 0:
        # First run: only remember the value.
        self.previous_portfolio_value = current_value
        return 0
    return (current_value / baseline - 1) * 100  # as percentage
def UpdateHistoricalData(self):
    """
    Refresh ``self.stock_data`` with ``self.lookback`` daily bars per ticker.

    Tickers without a stored Symbol, or with fewer rows than the lookback,
    are skipped (their previous entry, if any, is left untouched).
    """
    for ticker in self.tickers:
        # Use the stored Symbol object rather than looking up by ticker.
        try:
            symbol = self.symbols[ticker]
        except KeyError:
            self.Debug(f"Symbol not found for ticker {ticker}, skipping")
            continue

        history = self.History(symbol, self.lookback, Resolution.Daily)
        if history.empty or len(history) < self.lookback:
            self.Debug(f"Not enough historical data for {ticker}, skipping")
            continue

        if not isinstance(history.index, pd.MultiIndex):
            self.stock_data[ticker] = history
            continue
        # Flatten the MultiIndex and keep only this symbol's rows.
        flat = history.reset_index()
        self.stock_data[ticker] = flat[flat['symbol'] == symbol]
def GetMarketReturns(self):
    """
    Return SPY's recent daily percentage returns.

    Requests 10 daily bars for SPY and returns the close-to-close changes
    in percent as a list (empty when no history is available).
    """
    spy_history = self.History(self.spy, 10, Resolution.Daily)
    if spy_history.empty:
        return []

    # The history may come back with a MultiIndex or a regular index.
    if isinstance(spy_history.index, pd.MultiIndex):
        flat = spy_history.reset_index()
        closes = flat[flat['symbol'] == self.spy]['close'].values
    else:
        closes = spy_history['close'].values

    return [
        (current / previous - 1) * 100
        for previous, current in zip(closes[:-1], closes[1:])
    ]
def GeneratePredictions(self):
    """
    Build a prediction entry for every ticker that has enough history.

    The strategy's model is used when it can prepare input features;
    otherwise a simple momentum / moving-average score is used as a
    fallback. Errors for one ticker are logged and do not stop the loop.

    Returns:
        ``{ticker: {"pred_return": ..., "composite_score": ...}}``.
    """
    predictions = {}
    self.Debug(f"Generating predictions with {len(self.stock_data)} stocks in data")
    for ticker, history in self.stock_data.items():
        try:
            rows = len(history)
            if rows < self.lookback:
                self.Debug(f"Skipping {ticker}: Not enough history ({rows} < {self.lookback})")
                continue

            X, stock_df = self.strategy.prepare_stock_data(history, ticker)
            features_ready = X is not None and len(X) != 0

            if features_ready:
                pred_return = self.strategy.predict_returns(X, ticker)
                if pred_return is None:
                    # The model could not produce a value for this ticker.
                    self.Debug(f"ML prediction for {ticker} returned None. Skipping.")
                    continue
                threshold = self.strategy.adaptive_threshold
                # Avoid division by zero when the threshold is 0.
                composite = pred_return / threshold if threshold != 0 else pred_return
                predictions[ticker] = {
                    "pred_return": pred_return,
                    "composite_score": composite
                }
                self.Debug(f"ML prediction for {ticker}: {pred_return}")
                continue

            # FALLBACK: simple momentum score when no features are available.
            if isinstance(history.index, pd.MultiIndex):
                flat = history.reset_index()
                closes = flat[flat['symbol'] == self.symbols[ticker]]['close'].values
            else:
                closes = history['close'].values
            if len(closes) > 20:
                short_ma = np.mean(closes[-5:])
                long_ma = np.mean(closes[-20:])
                momentum = closes[-1] / closes[-10] - 1
                pred_score = momentum + 0.5 * (short_ma/long_ma - 1)
                predictions[ticker] = {
                    "pred_return": pred_score * 2,
                    "composite_score": pred_score * 5
                }
                self.Debug(f"Used fallback prediction for {ticker}: {pred_score}")
        except Exception as e:
            # Any failure in data prep or prediction skips just this ticker.
            self.Debug(f"Error processing {ticker} in GeneratePredictions loop: {str(e)}")
            import traceback
            self.Debug(traceback.format_exc())
            continue
    self.Debug(f"Generated {len(predictions)} predictions")
    return predictions
def ProcessStopLosses(self):
    """
    Flag invested tickers whose latest daily return breached the stop level.

    The stop level comes from the strategy. Longs are flagged when the
    return is below the level, shorts when it is above the negated level.
    Flagged tickers are added to ``self.stopped_out``.
    """
    stop_loss_level = self.strategy.get_stop_loss_level()
    for ticker in self.tickers:
        if ticker not in self.symbols:
            continue
        symbol = self.symbols[ticker]
        position = self.Portfolio[symbol]
        if not position.Invested:
            continue

        # Two daily bars are needed for one close-to-close return.
        history = self.History(symbol, 2, Resolution.Daily)
        if history.empty or len(history) < 2:
            continue
        if isinstance(history.index, pd.MultiIndex):
            flat = history.reset_index()
            symbol_rows = flat[flat['symbol'] == symbol]
            if len(symbol_rows) < 2:
                continue
            close_prices = symbol_rows['close'].values
        else:
            close_prices = history['close'].values
        daily_return = (close_prices[-1] / close_prices[-2] - 1) * 100

        is_long = position.Quantity > 0
        if is_long and daily_return < stop_loss_level:
            self.Debug(f"Stop loss triggered for {ticker} (long): {daily_return:.2f}%")
            self.stopped_out.add(ticker)
        elif not is_long and daily_return > -stop_loss_level:
            self.Debug(f"Stop loss triggered for {ticker} (short): {daily_return:.2f}%")
            self.stopped_out.add(ticker)
def ExecuteTrades(self, target_positions):
    """
    Trade towards the target weights while enforcing allocation limits.

    Steps: scale the targets down to the leverage limit, cap individual
    weights, apply the extra emergency-mode restrictions, place market
    orders within the remaining margin, and finally trigger emergency
    deleveraging when the resulting allocation exceeds the limit by more
    than 5%.

    Args:
        target_positions: Mapping ticker -> target portfolio weight
            (negative for short). The mapping itself is not modified.

    Notes:
        Tickers that are no longer in ``self.symbols`` and securities
        without a positive price are skipped instead of being traded.
    """
    if not target_positions:
        self.Debug("No target positions received")
        return
    # Work on a copy so the caps below never mutate the caller's dict.
    target_positions = dict(target_positions)

    # Limits: conservative defaults, overridden in emergency safety mode.
    in_emergency_mode = hasattr(self, 'EMERGENCY_SAFETY_MODE') and self.EMERGENCY_SAFETY_MODE
    max_leverage_allowed = 0.8
    max_position_size = 0.15
    if in_emergency_mode:
        max_leverage_allowed = self.MAX_LEVERAGE
        max_position_size = self.POSITION_CAP
        self.Debug(f"EMERGENCY SAFETY: Limiting allocation to {max_leverage_allowed*100:.0f}% with {max_position_size*100:.0f}% position cap")

    # Log the current leverage BEFORE making changes.
    portfolio_value = self.Portfolio.TotalPortfolioValue
    current_holdings_value = sum(holding.AbsoluteHoldingsValue for holding in self.Portfolio.Values if holding.Invested)
    current_leverage = current_holdings_value / portfolio_value if portfolio_value > 0 else 0
    self.Debug(f"CURRENT LEVERAGE: {current_leverage:.3f}x - Holdings value: ${current_holdings_value:.2f}, Portfolio value: ${portfolio_value:.2f}")

    # Scale all targets down proportionally when they exceed the limit.
    total_target_allocation = sum(abs(weight) for weight in target_positions.values())
    self.Debug(f"ALLOCATION CHECK: Target allocation {total_target_allocation:.3f}x vs limit {max_leverage_allowed:.3f}x")
    if total_target_allocation > max_leverage_allowed:
        scaling_factor = max_leverage_allowed / total_target_allocation
        original_positions = target_positions
        target_positions = {ticker: weight * scaling_factor for ticker, weight in original_positions.items()}
        self.Debug(f"LEVERAGE PROTECTION: Scaling down target allocation from {total_target_allocation:.3f}x to {max_leverage_allowed:.3f}x")
        # Log a sample (first 5) of the position changes.
        for ticker in list(original_positions)[:5]:
            self.Debug(f" {ticker}: {original_positions[ticker]:.4f} → {target_positions[ticker]:.4f}")

    # Cap each individual weight, keeping its sign.
    for ticker in list(target_positions.keys()):
        if abs(target_positions[ticker]) > max_position_size:
            original = target_positions[ticker]
            target_positions[ticker] = max_position_size * (1 if original > 0 else -1)
            self.Debug(f"POSITION CAP: Limiting {ticker} from {original:.4f} to {target_positions[ticker]:.4f}")

    # Emergency mode: keep only the 10 largest-weight positions.
    if in_emergency_mode:
        max_positions = 10
        if len(target_positions) > max_positions:
            top_positions = sorted(target_positions.items(), key=lambda x: abs(x[1]), reverse=True)[:max_positions]
            target_positions = {ticker: weight for ticker, weight in top_positions}
            self.Debug(f"SAFETY LIMIT: Restricted to top {max_positions} positions during emergency mode")

    expected_total_allocation = sum(abs(weight) for weight in target_positions.values())
    self.Debug(f"FINAL CHECK: Expected allocation after all safety measures: {expected_total_allocation:.3f}x")

    # Emergency mode: keep at least 30% in cash.
    if in_emergency_mode:
        cash_buffer = 0.3
        if expected_total_allocation > (1.0 - cash_buffer):
            final_scaling = (1.0 - cash_buffer) / expected_total_allocation
            target_positions = {ticker: weight * final_scaling for ticker, weight in target_positions.items()}
            self.Debug(f"CASH BUFFER: Added {cash_buffer*100:.0f}% cash buffer, final scaling: {final_scaling:.3f}x")

    # Local estimate of the margin left while orders are being placed.
    available_buying_power = self.Portfolio.MarginRemaining
    self.Debug(f"Starting available margin: ${available_buying_power:.2f}")

    for ticker, target_weight in target_positions.items():
        symbol = self.symbols.get(ticker)
        if symbol is None:
            # The ticker left the universe after the targets were built.
            self.Debug(f"Skipped {ticker} order: symbol no longer tracked")
            continue
        price = self.Securities[symbol].Price
        if price <= 0:
            # Without a price the share count cannot be computed; treating
            # the target as 0 shares would sell the whole position.
            self.Debug(f"Skipped {ticker} order: no valid price")
            continue
        target_shares = int(portfolio_value * target_weight / price)
        shares_to_trade = target_shares - self.Portfolio[symbol].Quantity
        if shares_to_trade == 0:
            continue
        try:
            # Required margin for this order, with a 1% buffer.
            required_buying_power = abs(shares_to_trade) * price * 1.01
            if shares_to_trade < 0:
                self.MarketOrder(symbol, shares_to_trade)  # negative quantity sells
                self.Debug(f"SELL {abs(shares_to_trade)} shares of {ticker}")
                # Selling frees buying power; assume 2% impact/fees.
                available_buying_power += abs(shares_to_trade) * price * 0.98
            elif required_buying_power <= available_buying_power:
                self.MarketOrder(symbol, shares_to_trade)
                self.Debug(f"BUY {shares_to_trade} shares of {ticker} (${shares_to_trade * price:.2f})")
                available_buying_power -= required_buying_power
            else:
                # Not enough buying power: buy as many shares as affordable.
                max_affordable = int(available_buying_power / (price * 1.01))
                if max_affordable > 0:
                    self.MarketOrder(symbol, max_affordable)
                    self.Debug(f"REDUCED BUY {max_affordable} shares of {ticker} due to buying power constraints")
                    available_buying_power -= max_affordable * price * 1.01
                else:
                    self.Debug(f"Skipped {ticker} order: Insufficient buying power")
        except Exception as e:
            self.Debug(f"Order error for {ticker}: {str(e)}")

    # Store the targets for the next run.
    self.previous_positions = target_positions.copy()

    # Verify the resulting allocation over the targeted tickers and
    # deleverage if it exceeds the limit by more than 5%.
    invested_value = sum(
        self.Portfolio[self.symbols[ticker]].AbsoluteHoldingsValue
        for ticker in target_positions
        if ticker in self.symbols and self.Portfolio[self.symbols[ticker]].Invested
    )
    current_allocation = invested_value / portfolio_value if portfolio_value > 0 else 0
    if current_allocation > max_leverage_allowed * 1.05:
        self.Debug(f"EMERGENCY: Current allocation {current_allocation*100:.1f}% exceeds limit, executing deleveraging")
        self._ExecuteEmergencyDeleveraging(max_leverage_allowed)
def _LimitSectorExposure(self, target_positions, max_sector_allocation):
"""Limit exposure to any single sector during post-transition period"""
# Group positions by sector
sector_allocations = {}
for ticker, weight in target_positions.items():
sector = self.sector_mappings.get(ticker, "Unknown")
sector_allocations[sector] = sector_allocations.get(sector, 0) + abs(weight)
# Identify sectors that exceed the limit
for sector, allocation in sector_allocations.items():
if allocation > max_sector_allocation:
scaling_factor = max_sector_allocation / allocation
self.Debug(f"Limiting {sector} sector from {allocation*100:.1f}% to {max_sector_allocation*100:.1f}%")
# Scale down all positions in this sector
for ticker in list(target_positions.keys()):
if self.sector_mappings.get(ticker, "Unknown") == sector:
target_positions[ticker] *= scaling_factor
def _ExecuteEmergencyDeleveraging(self, target_allocation):
"""Emergency deleveraging procedure to quickly reduce position sizes"""
self.Debug("EXECUTING EMERGENCY DELEVERAGING")
# Get current total allocation
portfolio_value = self.Portfolio.TotalPortfolioValue
current_allocation = sum(holding.AbsoluteHoldingsValue for holding in self.Portfolio.Values
if holding.Invested) / portfolio_value
if current_allocation <= target_allocation:
self.Debug(f"Current allocation {current_allocation*100:.1f}% already within target {target_allocation*100:.1f}%")
return
# Calculate how much we need to reduce positions by
reduction_factor = target_allocation / current_allocation
self.Debug(f"Reducing all positions by {(1-reduction_factor)*100:.1f}%")
# Get all positions sorted by size (largest first)
positions = sorted(
[(kvp.Key, kvp.Value) for kvp in self.Portfolio if kvp.Value.Invested],
key=lambda p: abs(p[1].HoldingsValue),
reverse=True
)
# Reduce each position proportionally
for symbol, position in positions:
current_shares = position.Quantity
# Calculate reduced position
new_shares = int(current_shares * reduction_factor)
shares_to_sell = current_shares - new_shares
# Only execute if the change is significant
if abs(shares_to_sell) > 0:
self.MarketOrder(symbol, -shares_to_sell) # Negative to reduce position
self.Debug(f"DELEVERAGING: Reduced {symbol} by {abs(shares_to_sell)} shares")
self.Debug(f"DELEVERAGING COMPLETE: Target allocation {target_allocation*100:.1f}%")
def _ExecuteEmergencyCircuitBreaker(self, target_leverage):
    """Last-resort circuit breaker: liquidate the largest positions until leverage is safe.

    Leverage is the sum of absolute holdings values divided by total
    portfolio value (treated as 0 when the portfolio value is not positive).
    If it exceeds ``target_leverage``, whole positions are liquidated from
    largest to smallest until the liquidated value covers the excess
    ``holdings_value - target_leverage * portfolio_value``.

    Args:
        target_leverage: Maximum acceptable gross leverage (``1.0`` == 100%).

    Returns:
        None. Positions are closed through ``self.Liquidate``.
    """
    self.Debug("!!! EXECUTING EMERGENCY CIRCUIT BREAKER !!!")

    portfolio_value = self.Portfolio.TotalPortfolioValue
    holdings_value = sum(
        holding.AbsoluteHoldingsValue
        for holding in self.Portfolio.Values
        if holding.Invested
    )
    current_leverage = holdings_value / portfolio_value if portfolio_value > 0 else 0
    if current_leverage <= target_leverage:
        self.Debug(f"Circuit breaker check passed: {current_leverage:.2f}x <= {target_leverage:.2f}x")
        return

    reduction_factor = target_leverage / current_leverage
    self.Debug(f"EMERGENCY: Reducing all positions by {(1-reduction_factor)*100:.1f}%")

    # Snapshot each position's value BEFORE any liquidation. The holding
    # objects are live: once a liquidation fills, their value reads 0, which
    # would keep the running total at 0 and liquidate the whole book.
    positions = sorted(
        [(kvp.Key, kvp.Value.AbsoluteHoldingsValue) for kvp in self.Portfolio if kvp.Value.Invested],
        key=lambda item: item[1],
        reverse=True
    )

    total_reduced = 0
    target_reduction = holdings_value - (target_leverage * portfolio_value)
    for symbol, position_value in positions:
        if total_reduced >= target_reduction:
            break
        # Largest positions are closed entirely for immediate effect.
        self.Liquidate(symbol)
        total_reduced += position_value
        self.Debug(f"EMERGENCY LIQUIDATION: Completely liquidated {symbol} (${position_value:.2f})")

    self.Debug(f"CIRCUIT BREAKER COMPLETE: Liquidated ${total_reduced:.2f} in emergency response")
# PENDULUM SYSTEM for QuantConnect: switches between the KQT and RiskControl algorithms
from AlgorithmImports import *
from datetime import datetime, timedelta
import calendar
from kqtalgo import KQTAlgorithm
from riskcontrol import MarketCapWeightedSP500Tracker
class PendulumSystem(QCAlgorithm):
def Initialize(self):
    """Initialize the pendulum system algorithm.

    Configures the backtest window, brokerage model and cash, subscribes to
    SPY, VIX, BIL and the defensive ETFs, registers the universe, builds both
    child algorithms (KQT is active by default) and schedules the hourly VIX
    check plus the two daily callbacks.

    The switching/tracking defaults are assigned BEFORE the child algorithms
    are built: ``_InitializeRiskControlAlgorithm`` writes
    ``risk_control_initialized`` and ``market_metrics`` on this object, and
    assigning the defaults afterwards would overwrite those results.
    """
    # Backtest window, brokerage model and starting cash
    self.SetStartDate(2020, 1, 1)
    self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.CASH)
    self.SetEndDate(2025, 1, 1)
    self.SetCash(1000000)

    # SPY for market data
    self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol
    # VIX for monitoring only (not for trading)
    self.vix = self.AddIndex("VIX", Resolution.Hour).Symbol
    # BIL is needed by the RiskControl algorithm
    self.bil = self.AddEquity("BIL", Resolution.Daily).Symbol

    # Defensive ETFs required by RiskControl, as (attribute name, ticker).
    # The order here is also the order of ``all_defensive_etfs``.
    defensive_etfs = (
        # Inverse ETFs
        ("sh", "SH"),
        ("psq", "PSQ"),
        ("dog", "DOG"),
        ("rwm", "RWM"),
        ("eum", "EUM"),
        ("myd", "MYY"),  # attribute name differs from the ticker; other methods read ``myd``
        # Alternative defensive ETFs
        ("gld", "GLD"),
        ("ief", "IEF"),
        ("bnd", "BND"),
        # Sector defensive ETFs
        ("xlp", "XLP"),
        ("xlu", "XLU"),
        ("xlv", "XLV"),
        ("vht", "VHT"),
        ("vdc", "VDC"),
    )
    for attr_name, ticker in defensive_etfs:
        setattr(self, attr_name, self.AddEquity(ticker, Resolution.Daily).Symbol)

    # Benchmark and universe
    self.SetBenchmark("SPY")
    self.UniverseSettings.Resolution = Resolution.Daily
    self.UniverseSettings.ExtendedMarketHours = False
    self._universe = self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)

    # Reference to all ETFs for RiskControl to use
    self.all_defensive_etfs = [getattr(self, attr_name) for attr_name, _ in defensive_etfs]

    # --- Switching / tracking state (must precede child construction) ---
    self.in_risk_control_mode = False
    self.risk_control_end_date = None
    self.months_in_risk_control = 0
    self.vix_trigger_months = set()  # (year, month) pairs where VIX has triggered
    # Whether the initial setup has been done for each algorithm
    self.risk_control_initialized = False
    self.kqt_initialized = False
    # When the last switch to RiskControl occurred
    self.switch_date = None
    # Error tracking
    self.error_count = 0
    self.max_errors = 10  # revert to KQT after this many serious errors
    # Forced-rebalance bookkeeping used after an algorithm switch
    self.force_rebalance = False
    self.force_rebalance_date = None
    self.force_immediate_rebalance = False
    self.days_since_switch = 0
    self.force_deep_allocation = False
    # Current market metrics, kept for debugging
    self.market_metrics = {}

    # Build both child algorithms; KQT is active by default
    self.kqt_algo = self._InitializeKQTAlgorithm()
    self.risk_control_algo = self._InitializeRiskControlAlgorithm()
    self.active_algo = self.kqt_algo
    self.current_algo_name = "KQT"

    # Hourly VIX check
    self.Schedule.On(self.DateRules.EveryDay(),
                     self.TimeRules.Every(TimeSpan.FromHours(1)),
                     self.CheckVIX)
    # Daily notification of which algorithm is active (9:31 AM)
    self.Schedule.On(self.DateRules.EveryDay(),
                     self.TimeRules.At(9, 31),
                     self.NotifyActiveAlgorithm)
    # Daily logic execution for the active algorithm
    # (same time as KQTAlgorithm.TradeExecute)
    self.Schedule.On(self.DateRules.EveryDay(),
                     self.TimeRules.At(10, 0),
                     self.ExecuteActiveAlgorithmLogic)

    self.Debug(f"PendulumSystem initialized with default algorithm: {self.current_algo_name}")
def _InitializeKQTAlgorithm(self):
    """Create a fresh KQT child algorithm wired to this algorithm.

    The new instance first receives this algorithm's shared properties and
    order wrappers, and only then has its own ``Initialize`` called.

    Returns:
        The initialized ``KQTAlgorithm`` instance.
    """
    child = KQTAlgorithm()
    # Sharing must happen before Initialize so the child already sees the
    # shared properties when it sets itself up.
    self._ShareAlgorithmProperties(child)
    child.Initialize()
    return child
def _InitializeRiskControlAlgorithm(self):
"""Create and prime the RiskControl child algorithm (MarketCapWeightedSP500Tracker).

Creates the child instance, shares this algorithm's symbols, Securities and
Portfolio with it, pre-creates its ATR indicators and tracking collections,
seeds its SPY price state from 60 daily history bars, then calls the child's
own Initialize() and switches its rebalance flags on.

Returns:
The configured risk-control instance. If constructing or priming it raises,
the error is logged and a bare QCAlgorithm placeholder (with Debug redirected
to this algorithm) is returned instead.

NOTE(review): indentation was lost in this copy of the file, so the exact
nesting of the statements below cannot be confirmed from here.
"""
try:
# Create a new instance with the correct classname
risk_control = MarketCapWeightedSP500Tracker()
# Share common properties
self._ShareAlgorithmProperties(risk_control)
# Explicitly set required properties before Initialize
# Share ETF symbols directly to member variables
# (_ShareAlgorithmProperties also copies these when its own try block succeeds)
risk_control.spy = self.spy
risk_control.bil = self.bil
risk_control.sh = self.sh
risk_control.psq = self.psq
risk_control.dog = self.dog
risk_control.rwm = self.rwm
risk_control.eum = self.eum
risk_control.myd = self.myd
risk_control.gld = self.gld
risk_control.ief = self.ief
risk_control.bnd = self.bnd
risk_control.xlp = self.xlp
risk_control.xlu = self.xlu
risk_control.xlv = self.xlv
risk_control.vht = self.vht
risk_control.vdc = self.vdc
# Set the securities dictionaries
risk_control.Securities = self.Securities
risk_control.Portfolio = self.Portfolio
# Initialize RiskControl with correct state
risk_control.rebalance_flag = True # Force rebalance at start
risk_control.diagnostic_mode = True # Enable detailed logging
# Initialize the proper ATR tracking for all symbols
# NOTE(review): self.ATR(...) is called for every symbol on each invocation, and
# SwitchToRiskControl invokes this method on every switch - presumably a new set of
# indicators is created each time; confirm whether the old ones need cleanup.
risk_control.atr = {}
for symbol in self.all_defensive_etfs + [self.bil, self.spy]:
risk_control.atr[symbol] = self.ATR(symbol, 14, Resolution.Daily)
# Initialize tracking collections
risk_control.spy_30day_window = RollingWindow[float](30)
risk_control.entry_prices = {}
risk_control.previous_bil_allocation = 0.0
risk_control.inverse_positions = set()
risk_control.defensive_positions = set()
risk_control.last_defensive_update = datetime(1900, 1, 1)
# Do explicit pre-fill of historical data BEFORE calling Initialize
history = self.History(self.spy, 60, Resolution.Daily)
# Pre-populate SPY price history data
if not history.empty:
self.Debug(f"Loading {len(history)} days of historical SPY data into spy_30day_window")
# Handle different index formats
# NOTE(review): `pd` is not imported by name in the visible import lines; presumably
# it is provided by `from AlgorithmImports import *` - confirm.
if isinstance(history.index, pd.MultiIndex):
spy_prices = history.loc[self.spy]['close'].values
else:
spy_prices = history['close'].values
# IMPORTANT: Organize prices in correct chronological order
# Must add OLDEST prices first, then move to newest
# (assumes the history frame is ordered oldest-first - TODO confirm)
prices_to_add = spy_prices[-30:] # Get the 30 most recent prices
# Add prices to the window in the correct order (oldest first)
for price in prices_to_add:
risk_control.spy_30day_window.Add(price)
self.Debug(f"Filled spy_30day_window with {risk_control.spy_30day_window.Count} days of data")
# Also fill market price dictionary for trend calculation
risk_control.spy_prices = {}
# Extract the bar dates; how depends on the index type
if isinstance(history.index, pd.MultiIndex):
# For MultiIndex, properly extract dates from the level containing timestamps
# This works with format: (symbol, timestamp)
dates = []
for idx in history.index:
if idx[0] == self.spy: # Ensure we're getting dates for SPY
# The timestamp is the second element, regardless of index structure
dates.append(idx[1].date())
else:
# For DatetimeIndex, convert each timestamp to date directly
dates = [idx.date() for idx in history.index]
# Make sure we have matching lengths
min_len = min(len(dates), len(spy_prices))
# Add recent prices to the spy_prices dictionary (keyed by date)
for i in range(min_len):
risk_control.spy_prices[dates[i]] = spy_prices[i]
self.Debug(f"Added {len(risk_control.spy_prices)} prices to spy_prices dictionary")
# Initialize algorithm but catch any exceptions
try:
# Call the original Initialize method
# NOTE(review): the child's Initialize is defined outside this file section; whether it
# resets any of the attributes primed above cannot be seen from here.
risk_control.Initialize()
# Calculate and store current market metrics for debugging
if risk_control.spy_30day_window.Count >= 30:
spy_price = self.Securities[self.spy].Price
sma_30 = sum(risk_control.spy_30day_window) / 30
# Relative distance of the current SPY price from its 30-bar average
market_deviation = (spy_price / sma_30) - 1.0
market_trend = risk_control._calculateMarketTrend()
self.market_metrics = {
"spy_price": spy_price,
"sma_30": sma_30,
"market_deviation": market_deviation,
"market_trend": market_trend
}
self.Debug(f"Market metrics: SPY={spy_price:.2f}, SMA30={sma_30:.2f}, " +
f"Deviation={market_deviation*100:.2f}%, Trend={market_trend*100:.2f}%")
# Override rebalance flag to force a rebalance
risk_control.rebalance_flag = True
risk_control.force_rebalance_override = True
# Back-date the last rebalance by 60 days
risk_control.last_rebalance_date = self.Time - timedelta(days=60) # Force a rebalance
# NOTE(review): this flag is only ever set to True in this method; it is not reset
# here when a later re-initialization fails.
self.risk_control_initialized = True
except Exception as e:
self.Error(f"Error initializing RiskControl algorithm: {str(e)}")
import traceback
self.Debug(traceback.format_exc())
return risk_control
except Exception as e:
self.Error(f"Error creating RiskControl algorithm: {str(e)}")
import traceback
self.Debug(traceback.format_exc())
# Return a placeholder risk control algorithm that won't crash
placeholder = QCAlgorithm()
placeholder.Debug = self.Debug
return placeholder
def _ShareAlgorithmProperties(self, algo):
    """Wire a child algorithm to this algorithm's state and order routing.

    Copies the shared managers, logging callables and known symbols onto
    ``algo``, then installs wrappers so the child's ``MarketOrder``,
    ``SetHoldings`` and ``Liquidate`` calls are executed by this algorithm.
    The market-order and set-holdings wrappers refuse to act on the VIX
    index. Any failure is logged; the child is returned either way.

    Args:
        algo: The child algorithm instance to configure.

    Returns:
        The same ``algo`` object.
    """
    try:
        # Broker, trader and other essential properties
        for shared_name in ("Securities", "Portfolio", "Transactions", "BrokerageModel", "Debug"):
            setattr(algo, shared_name, getattr(self, shared_name))
        algo.Error = self.Error if hasattr(self, 'Error') else self.Debug

        # Symbols the children need; skipped when not defined yet
        for symbol_attr in ("spy", "bil"):
            if hasattr(self, symbol_attr):
                setattr(algo, symbol_attr, getattr(self, symbol_attr))

        # Defensive ETFs are only handed to the RiskControl algorithm
        if isinstance(algo, MarketCapWeightedSP500Tracker):
            etf_attrs = ('sh', 'psq', 'dog', 'rwm', 'eum', 'myd', 'gld', 'ief', 'bnd',
                         'xlp', 'xlu', 'xlv', 'vht', 'vdc')
            for etf_attr in etf_attrs:
                if hasattr(self, etf_attr):
                    setattr(algo, etf_attr, getattr(self, etf_attr))

        # Optional providers: shared only when the parent has them
        for provider_attr in ('SubscriptionManager', 'OptionChainProvider', 'FutureChainProvider'):
            if hasattr(self, provider_attr):
                setattr(algo, provider_attr, getattr(self, provider_attr))

        # --- Order routing: children trade through the main algorithm ---
        def wrapped_market_order(symbol, quantity, tag=""):
            try:
                # VIX is an index and must never be traded
                if symbol == self.vix:
                    self.Debug(f"Blocked attempt to trade VIX index")
                    return None
                return self.MarketOrder(symbol, quantity, asynchronous=False, tag=tag)
            except Exception as e:
                self.Error(f"Error in wrapped_market_order for {symbol}: {str(e)}")
                return None

        def wrapped_set_holdings(symbol, percentage, liquidateExistingHoldings=False, tag=""):
            try:
                if symbol == self.vix:
                    self.Debug(f"Blocked attempt to set holdings for VIX index")
                    return None
                return self.SetHoldings(symbol, percentage, liquidateExistingHoldings, tag)
            except Exception as e:
                self.Error(f"Error in SetHoldings for {symbol}: {str(e)}")
                return None

        def wrapped_liquidate(symbol=None, tag=""):
            try:
                return self.Liquidate(symbol, tag)
            except Exception as e:
                self.Error(f"Error in Liquidate for {symbol}: {str(e)}")
                return None

        algo.MarketOrder = wrapped_market_order
        algo.SetHoldings = wrapped_set_holdings
        algo.Liquidate = wrapped_liquidate

        # Other methods and accessors the children rely on
        algo.History = self.History

        # Time is exposed through getters rather than assigned directly
        def current_time():
            return self.Time

        def current_utc_time():
            return self.UtcTime

        algo.GetCurrentTime = current_time
        algo.GetUtcTime = current_utc_time

        algo.UniverseSettings = self.UniverseSettings
        algo.ObjectStore = self.ObjectStore
        return algo
    except Exception as e:
        self.Error(f"Error in _ShareAlgorithmProperties: {str(e)}")
        import traceback
        self.Debug(traceback.format_exc())
        # Hand the algorithm back even if property sharing failed part-way
        return algo
def OnSecuritiesChanged(self, changes):
    """Relay universe changes to whichever child algorithm is active.

    Args:
        changes: The securities-changed payload, passed through untouched.
    """
    # Nothing to do when the active algorithm has no such handler.
    if not hasattr(self.active_algo, "OnSecuritiesChanged"):
        return
    self.active_algo.OnSecuritiesChanged(changes)
def OnData(self, data):
    """Main data handler: maintain RiskControl SPY state and forward the slice.

    While in Risk Control mode this updates ``days_since_switch`` and feeds
    the SPY close into the active algorithm's ``spy_30day_window`` and
    ``spy_prices`` (pruned to the last 60 days), each only if the active
    algorithm actually has that attribute. The data is then forwarded to the
    active algorithm. If an immediate rebalance is pending and the SPY
    exchange is open, the forced rebalance is run once.

    Any exception is logged through ``Error``/``Debug`` instead of propagating.

    Args:
        data: The data slice for this time step; must expose ``Bars``.
    """
    try:
        # ``switch_date`` starts out as None, so test the value rather than
        # the attribute's existence; subtracting None would abort the handler
        # before the data is forwarded.
        if self.in_risk_control_mode and getattr(self, 'switch_date', None) is not None:
            self.days_since_switch = (self.Time - self.switch_date).days

        if self.in_risk_control_mode and self.spy in data.Bars:
            spy_price = data.Bars[self.spy].Close
            # A placeholder algorithm has no price window; skip it the same
            # way the price dictionary is skipped.
            if hasattr(self.active_algo, 'spy_30day_window'):
                self.active_algo.spy_30day_window.Add(spy_price)
            if hasattr(self.active_algo, 'spy_prices'):
                today = self.Time.date()
                self.active_algo.spy_prices[today] = spy_price
                # Keep only the last 60 days; collect first so the dict is
                # not mutated while being iterated.
                stale_dates = [d for d in self.active_algo.spy_prices if (today - d).days > 60]
                for stale in stale_dates:
                    self.active_algo.spy_prices.pop(stale)

        # Forward data to the active algorithm
        self.active_algo.OnData(data)

        # One-shot rebalance right after switching to RiskControl
        if self.force_immediate_rebalance and self.in_risk_control_mode:
            if self.Securities.ContainsKey(self.spy) and self.Securities[self.spy].Exchange.ExchangeOpen:
                self.Debug(f"Executing immediate rebalance {self.days_since_switch} days after switching to RiskControl")
                self.active_algo.rebalance_flag = True
                self.ForceRiskControlRebalance()
                self.force_immediate_rebalance = False
                self.Debug("Immediate rebalance completed")
                # Verify portfolio state after rebalance
                self.LogRiskControlPortfolio()
    except Exception as e:
        self.Error(f"Error in OnData: {str(e)}")
        import traceback
        self.Debug(traceback.format_exc())
def CheckVIX(self):
    """Check the VIX level and switch into, extend, or leave Risk Control mode.

    * VIX above the threshold while not in Risk Control: switch to Risk
      Control and record the current (year, month) as triggered.
    * VIX above the threshold while already in Risk Control, in a month that
      has not triggered yet: record the month and extend the period.
    * In Risk Control and the end date has been reached: switch back to KQT.

    A status line is logged when the clock reads exactly 10:00. Any exception
    is logged through ``Error``/``Debug`` instead of propagating.
    """
    # The trigger level lives in one place so the log text always reports
    # the value the comparison really uses.
    vix_threshold = 29
    try:
        if not self.Securities.ContainsKey(self.vix):
            self.Debug("VIX data not available")
            return

        current_vix = self.Securities[self.vix].Price
        current_month_year = (self.Time.year, self.Time.month)

        # Status log once per day at 10:00 AM
        if self.Time.hour == 10 and self.Time.minute == 0:
            self.Debug(f"Current VIX: {current_vix:.2f}, Current Algorithm: {self.current_algo_name}")
            # Extra diagnostics while in risk control mode
            if self.in_risk_control_mode:
                days_since_switch = (self.Time - self.switch_date).days if self.switch_date else "unknown"
                self.Debug(f"Days since switch to RiskControl: {days_since_switch}, Error count: {self.error_count}")

        vix_elevated = current_vix > vix_threshold
        if vix_elevated and not self.in_risk_control_mode:
            self.Debug(f"VIX TRIGGER: VIX at {current_vix:.2f} > {vix_threshold}, switching to Risk Control algorithm")
            self.SwitchToRiskControl()
            # Mark this month as having triggered VIX
            self.vix_trigger_months.add(current_month_year)
        elif vix_elevated and self.in_risk_control_mode:
            # Each new month above the threshold extends the period once
            if current_month_year not in self.vix_trigger_months:
                self.vix_trigger_months.add(current_month_year)
                self.ExtendRiskControlPeriod()

        # Time to switch back to KQT?
        if self.in_risk_control_mode and self.Time >= self.risk_control_end_date:
            self.Debug("Risk Control period ended, switching back to KQT algorithm")
            self.SwitchToKQT()
    except Exception as e:
        self.Error(f"Error in CheckVIX: {str(e)}")
        import traceback
        self.Debug(traceback.format_exc())
def SwitchToRiskControl(self):
    """Hand control from KQT to a freshly initialized Risk Control algorithm.

    Liquidates everything, rebuilds the Risk Control child, makes it the
    active algorithm for an initial three-month period, raises every
    forced-rebalance flag and, when the child offers ``MonthlyRebalance``,
    runs it straight away. On any exception the error counter is incremented
    and, once it reaches ``max_errors``, the system reverts to KQT.
    """
    try:
        self.Debug("Starting switch to RiskControl...")

        # Start from a flat book
        self.Liquidate()
        self.Debug("Positions liquidated")

        # A brand-new child guarantees a clean state
        self.Debug("Reinitializing RiskControl algorithm...")
        self.risk_control_algo = self._InitializeRiskControlAlgorithm()

        self.force_deep_allocation = True
        self.Debug("Set force_deep_allocation to ensure proper portfolio allocation")

        # The override flag ends up True either way; only the log differs
        if not hasattr(self.risk_control_algo, 'force_rebalance_override'):
            self.Debug("Adding missing force_rebalance_override flag to RiskControl")
        self.risk_control_algo.force_rebalance_override = True

        # Activate the child
        self.active_algo = self.risk_control_algo
        self.current_algo_name = "RiskControl"
        self.in_risk_control_mode = True

        # Initial period: three months from now
        self.months_in_risk_control = 3
        self.risk_control_end_date = self._AddMonths(self.Time, 3)

        # Market conditions at switch time
        self._DeepLogMarketConditions()

        # Remember when the switch happened
        self.switch_date = self.Time
        self.days_since_switch = 0

        # Every rebalance trigger is armed
        self.force_immediate_rebalance = True
        self.force_rebalance = True
        self.force_rebalance_date = self.Time.date()
        self.Debug(f"Switched to Risk Control algorithm until {self.risk_control_end_date.strftime('%Y-%m-%d')}")

        # Rebalance immediately when the child supports it
        if hasattr(self.active_algo, 'MonthlyRebalance'):
            self.Debug("DIRECTLY executing MonthlyRebalance after switch")
            child = self.active_algo
            child.rebalance_flag = True
            child.force_rebalance_override = True
            # Back-dating the last rebalance forces a new one
            child.last_rebalance_date = self.Time - timedelta(days=60)
            child.MonthlyRebalance()

        # Diagnostics and immediate results
        self._ForceAllocationTest()
        self.LogRiskControlPortfolio()
    except Exception as e:
        self.Error(f"Error in SwitchToRiskControl: {str(e)}")
        import traceback
        self.Debug(traceback.format_exc())
        self.error_count += 1
        # Too many errors: fall back to KQT
        if self.error_count >= self.max_errors:
            self.Debug("Too many errors, reverting to KQT algorithm")
            self.SwitchToKQT()
def _AnalyzeRiskControlPositions(self):
    """Log a diagnostic read-out of what RiskControl is expected to be doing.

    Only active in Risk Control mode and when SPY is available. Compares the
    current SPY price with the average of the active algorithm's 30-entry
    SPY window, logs the matching market-regime label, optionally runs the
    child's defensive-ETF diagnostics, and finally logs the BIL share of the
    portfolio.
    """
    if not self.in_risk_control_mode:
        return

    if not self.Securities.ContainsKey(self.spy):
        self.Debug("Cannot analyze RiskControl - SPY not available")
        return

    spy_price = self.Securities[self.spy].Price
    child = self.active_algo

    window_ready = hasattr(child, 'spy_30day_window') and child.spy_30day_window.Count >= 30
    if not window_ready:
        self.Debug("ANALYSIS - Not enough history in spy_30day_window for complete analysis")
    else:
        sma_30 = sum(child.spy_30day_window) / 30
        market_deviation = (spy_price / sma_30) - 1.0
        self.Debug(f"ANALYSIS - Current SPY: {spy_price:.2f}, 30-day SMA: {sma_30:.2f}")
        self.Debug(f"ANALYSIS - Market deviation: {market_deviation*100:.2f}%")

        # Regime labels, checked from most to least bullish; the deviation
        # must be strictly above a floor to earn its label.
        regimes = (
            (0.05, "ANALYSIS - Strong bull market detected, minimal defensive positions expected"),
            (0, "ANALYSIS - Mild bull market detected, moderate defensive positions expected"),
            (-0.03, "ANALYSIS - Neutral market detected, balanced allocation expected"),
        )
        for floor, label in regimes:
            if market_deviation > floor:
                self.Debug(label)
                break
        else:
            self.Debug("ANALYSIS - Bear market detected, heavy defensive positions expected")

        # Defensive scores, when the child can produce them
        if hasattr(child, '_runDefensiveETFDiagnostics'):
            self.Debug("ANALYSIS - Running detailed defensive ETF diagnostics...")
            if hasattr(child, '_calculateMarketTrend'):
                trend = child._calculateMarketTrend()
            else:
                trend = 0
            child._runDefensiveETFDiagnostics(market_deviation, trend)

    # BIL share of the whole portfolio
    self.Debug(f"ANALYSIS - Current BIL allocation: {self.Portfolio[self.bil].HoldingsValue/self.Portfolio.TotalPortfolioValue*100:.2f}%")
def SwitchToKQT(self):
    """Hand control back from Risk Control to a freshly initialized KQT algorithm.

    Liquidates all positions (logging how many are still invested right
    after the call), replaces the Risk Control child's SPY window with an
    empty one when it has one, rebuilds the KQT child and resets all
    Risk-Control period tracking.
    """
    self.Debug("Liquidating all positions before switch to KQT...")
    self.Liquidate()

    # Report what is still open right after the liquidation call
    still_invested = [holding for holding in self.Portfolio.Values if holding.Invested]
    positions_count = len(still_invested)
    self.Debug(f"After liquidation call: {positions_count} positions still invested")

    has_price_window = (
        hasattr(self, 'risk_control_algo')
        and hasattr(self.risk_control_algo, 'spy_30day_window')
    )
    if has_price_window:
        self.Debug("Clearing RiskControl market data state")
        self.risk_control_algo.spy_30day_window = RollingWindow[float](30)

    # A brand-new KQT child guarantees a clean state
    self.kqt_algo = self._InitializeKQTAlgorithm()
    self.active_algo = self.kqt_algo
    self.current_algo_name = "KQT"

    # Leave Risk Control mode and forget its period bookkeeping
    self.in_risk_control_mode = False
    self.risk_control_end_date = None
    self.months_in_risk_control = 0
    self.vix_trigger_months.clear()

    self.Debug("Switched to KQT algorithm")
def ExtendRiskControlPeriod(self):
    """Push the Risk Control end date out by one calendar month.

    Also bumps the ``months_in_risk_control`` counter and logs the new end
    date in ``YYYY-MM-DD`` form.
    """
    self.months_in_risk_control = self.months_in_risk_control + 1
    current_end = self.risk_control_end_date
    self.risk_control_end_date = self._AddMonths(current_end, 1)
    self.Debug(f"Extended Risk Control period by 1 month. New end date: {self.risk_control_end_date.strftime('%Y-%m-%d')}")
def _AddMonths(self, date, months):
    """Return ``date`` shifted by ``months`` calendar months.

    The day is clamped to the last day of the target month when it would
    overflow, and a date that falls on the last day of its own month always
    maps to the last day of the target month. Hour, minute and second are
    carried over; the result is a new ``datetime``.

    Args:
        date: The starting ``datetime``.
        months: Number of months to add.
    """
    # Work with a zero-based month index so divmod yields the year carry.
    year_shift, month_index = divmod(date.month - 1 + months, 12)
    year = date.year + year_shift
    month = month_index + 1

    target_month_end = calendar.monthrange(year, month)[1]
    source_month_end = calendar.monthrange(date.year, date.month)[1]

    lands_on_month_end = date.day > target_month_end or date.day == source_month_end
    day = target_month_end if lands_on_month_end else date.day

    return datetime(year, month, day, date.hour, date.minute, date.second)
def NotifyActiveAlgorithm(self):
    """Log which algorithm is active, plus the days left while in Risk Control mode."""
    if not self.in_risk_control_mode:
        self.Debug(f"Active Algorithm: {self.current_algo_name}")
        return

    # Whole days between now and the scheduled end of the period
    time_left = self.risk_control_end_date - self.Time
    days_remaining = time_left.days
    self.Debug(f"Active Algorithm: {self.current_algo_name}, Days remaining: {days_remaining}")
def OnOrderEvent(self, orderEvent):
    """Relay an order event to the active child algorithm.

    Args:
        orderEvent: The order event, passed through untouched.
    """
    # Children without an order-event handler are simply skipped.
    if not hasattr(self.active_algo, "OnOrderEvent"):
        return
    self.active_algo.OnOrderEvent(orderEvent)
def OnEndOfAlgorithm(self):
    """Give both child algorithms the chance to run their end-of-run hook.

    KQT is notified first, then Risk Control; a child without an
    ``OnEndOfAlgorithm`` attribute is skipped.
    """
    for child_attr in ("kqt_algo", "risk_control_algo"):
        child = getattr(self, child_attr)
        if hasattr(child, "OnEndOfAlgorithm"):
            child.OnEndOfAlgorithm()
def ExecuteActiveAlgorithmLogic(self):
"""Run the active child algorithm's trading logic (scheduled callback).

Returns immediately when SPY is missing or its exchange is closed. Otherwise
it runs a pending forced rebalance, then either calls the KQT algorithm's
TradeExecute (with VIX filtered out of its generated positions) or drives
the RiskControl algorithm's MonthlyRebalance / WeeklyDefensiveAdjustment and
logs portfolio diagnostics. Exceptions are logged and counted in
error_count; when the count reaches max_errors while RiskControl is active,
the system reverts to KQT.

NOTE(review): indentation was lost in this copy of the file, so the exact
nesting of the statements below cannot be confirmed from here.
"""
# Check if market is open
if not self.Securities.ContainsKey(self.spy) or not self.Securities[self.spy].Exchange.ExchangeOpen:
return
try:
# Check if we need to force a rebalance after algorithm switch
# (ForceRiskControlRebalance itself does nothing unless RiskControl is the current algorithm)
if self.force_rebalance and self.Time.date() >= self.force_rebalance_date:
self.Debug("Executing forced rebalance after algorithm switch")
self.ForceRiskControlRebalance()
self.force_rebalance = False
self.force_rebalance_date = None
# For KQT algorithm, call TradeExecute directly
if self.current_algo_name == "KQT" and hasattr(self.active_algo, "TradeExecute"):
self.Debug(f"Executing KQT Algorithm's TradeExecute method")
# Make sure target positions don't include VIX
original_generate_positions = self.active_algo.strategy.generate_positions
def filtered_generate_positions(prediction_data, current_returns=None):
positions = original_generate_positions(prediction_data, current_returns)
# Remove VIX from target positions
# NOTE(review): presumably positions is keyed by ticker string; verify against the strategy
if 'VIX' in positions:
self.Debug(f"Removing VIX from target positions")
del positions['VIX']
return positions
# Monkey patch the strategy function to filter out VIX
self.active_algo.strategy.generate_positions = filtered_generate_positions
try:
self.active_algo.TradeExecute()
self.Debug("KQT TradeExecute completed successfully")
except Exception as e:
self.Error(f"Error in KQT TradeExecute: {str(e)}")
import traceback
self.Debug(traceback.format_exc())
self.error_count += 1
# Restore original function
# NOTE(review): the restore is not in a `finally`; it relies on the except clause above
# catching whatever TradeExecute raises.
self.active_algo.strategy.generate_positions = original_generate_positions
# Enhanced RiskControl execution logic
elif self.current_algo_name == "RiskControl":
# Log current RiskControl status with more details
if hasattr(self.active_algo, 'rebalance_flag'):
self.Debug(f"RiskControl status: rebalance_flag={self.active_algo.rebalance_flag}, " +
f"days_since_switch={self.days_since_switch}")
# Show portfolio breakdown for debugging
if self.days_since_switch < 3 or self.Time.day == 1: # First of month or after switch
self.LogRiskControlPortfolio()
# Check if RiskControl is properly initialized
if not self.risk_control_initialized:
self.Debug("RiskControl not properly initialized, attempting to fix...")
self.risk_control_algo = self._InitializeRiskControlAlgorithm()
self.active_algo = self.risk_control_algo
# The flag is set here without checking what the re-initialization returned
self.risk_control_initialized = True
return
# datetime.weekday(): Monday is 0, Wednesday is 2
today_is_wednesday = self.Time.weekday() == 2
today_is_monday = self.Time.weekday() == 0
first_week_of_month = self.Time.day <= 7
# CRITICAL: Force monthly rebalance during first few days OR on first Wednesday
force_rebalance = (self.days_since_switch < 3) or (first_week_of_month and today_is_wednesday)
# Check if it's time for monthly rebalance or forced rebalance
if hasattr(self.active_algo, "MonthlyRebalance"):
# The first operand is already included in force_rebalance above
if (first_week_of_month and today_is_wednesday) or force_rebalance or self.active_algo.rebalance_flag:
self.Debug(f"Executing RiskControl Algorithm's MonthlyRebalance method (forced={force_rebalance})")
# Set the rebalance flag to true before calling
self.active_algo.rebalance_flag = True
try:
# Using SetRebalanceFlag first if available
if hasattr(self.active_algo, 'SetRebalanceFlag'):
self.active_algo.SetRebalanceFlag()
# Execute the rebalance
self.active_algo.MonthlyRebalance()
self.Debug("MonthlyRebalance completed successfully")
# Log portfolio after rebalance
self.LogRiskControlPortfolio()
except Exception as e:
self.Error(f"Error in MonthlyRebalance: {str(e)}")
import traceback
self.Debug(traceback.format_exc())
self.error_count += 1
# Also ensure WeeklyDefensiveAdjustment gets called on Mondays
if today_is_monday and hasattr(self.active_algo, "WeeklyDefensiveAdjustment"):
self.Debug(f"Executing RiskControl Algorithm's WeeklyDefensiveAdjustment method")
try:
self.active_algo.WeeklyDefensiveAdjustment()
self.Debug("WeeklyDefensiveAdjustment completed successfully")
# Log portfolio after weekly adjustment
if self.Time.day <= 14: # First two weeks only to reduce log spam
self.LogRiskControlPortfolio()
except Exception as e:
self.Error(f"Error in WeeklyDefensiveAdjustment: {str(e)}")
import traceback
self.Debug(traceback.format_exc())
self.error_count += 1
# Add detailed market analysis EVERY day to help diagnose the issue
self._DeepLogMarketConditions()
# Add portfolio composition check
if self.Portfolio.Invested:
total_value = self.Portfolio.TotalPortfolioValue
# "Equity" here means every holding that is neither BIL nor one of the defensive ETFs
equity_value = sum(self.Portfolio[s].HoldingsValue
for s in self.Portfolio.Keys
if s != self.bil and s not in self.all_defensive_etfs)
bil_value = self.Portfolio[self.bil].HoldingsValue if self.Portfolio[self.bil].Invested else 0
self.Debug(f"DAILY PORTFOLIO CHECK: Equity: {equity_value/total_value*100:.1f}%, " +
f"BIL: {bil_value/total_value*100:.1f}%, Cash: {self.Portfolio.Cash/total_value*100:.1f}%")
# Force rebalance if we still see no equity positions after several days
if (self.days_since_switch > 3 and
sum(1 for s in self.Portfolio.Keys
if self.Portfolio[s].Invested and
s != self.bil and
s not in self.all_defensive_etfs) == 0):
self.Debug("EMERGENCY: No equity positions detected days after switch, forcing rebalance")
self.ForceRiskControlRebalance()
except Exception as e:
self.Error(f"Error in ExecuteActiveAlgorithmLogic: {str(e)}")
import traceback
self.Debug(traceback.format_exc())
self.error_count += 1
# If too many errors, revert to KQT
if self.error_count >= self.max_errors and self.current_algo_name == "RiskControl":
self.Debug("Too many errors in RiskControl, reverting to KQT algorithm")
self.SwitchToKQT()
def ForceRiskControlRebalance(self):
"""Force the RiskControl algorithm to rebalance right now.

Does nothing unless RiskControl is the current algorithm and exposes
MonthlyRebalance. Sets both rebalance flags on the active algorithm, tries
to populate its selected_by_market_cap list of (symbol, market cap) tuples
when it is missing or empty, and then calls MonthlyRebalance. Any exception
is logged through Error/Debug instead of propagating.

NOTE(review): indentation was lost in this copy of the file, so the exact
nesting of the statements below cannot be confirmed from here.
"""
try:
if self.current_algo_name == "RiskControl" and hasattr(self.active_algo, 'MonthlyRebalance'):
self.Debug("Executing immediate rebalance after switch to RiskControl")
# CRITICAL: Force both flags directly without any conditions
self.active_algo.rebalance_flag = True
self.active_algo.force_rebalance_override = True
self.Debug("ForceRiskControlRebalance: Set BOTH rebalance flags to True")
# CRITICAL: Make sure we have stocks in selected_by_market_cap
if not hasattr(self.active_algo, 'selected_by_market_cap') or len(self.active_algo.selected_by_market_cap) == 0:
self.Debug("WARNING: selected_by_market_cap is empty, manually triggering Fine selection")
# Feed the already-subscribed securities through the coarse selection function and
# pass its result to the fine selection function.
# NOTE(review): both selection functions are defined outside this view and are also
# registered as the universe callbacks in Initialize; verify that they (and the
# HasFundamentalData / Market attributes used in the filter) work with Security
# objects. Any exception raised here propagates to the except clause at the bottom,
# so MonthlyRebalance is not reached in that case.
coarse_universe = self.FineSelectionFunction(self.CoarseSelectionFunction([
x for x in self.Securities.Values
if x.HasFundamentalData and x.Price > 5 and x.Market == Market.USA
]))
# Despite its name, coarse_universe holds the result of the fine selection call
self.active_algo.selected_by_market_cap = []
# Try to get market cap data for these symbols
for symbol in coarse_universe:
if self.Securities.ContainsKey(symbol):
security = self.Securities[symbol]
if hasattr(security, 'Fundamentals') and security.Fundamentals is not None:
market_cap = security.Fundamentals.MarketCap
self.active_algo.selected_by_market_cap.append((symbol, market_cap))
else:
# Use price as a proxy for market cap if fundamentals not available
self.active_algo.selected_by_market_cap.append((symbol, security.Price * 1000000))
# If we still don't have enough stocks, use a fixed list of large caps
if len(self.active_algo.selected_by_market_cap) < 10:
self.Debug("Still not enough stocks, adding fallback large cap stocks")
fallback_tickers = ["AAPL", "MSFT", "AMZN", "GOOGL", "META", "NVDA", "XOM", "JPM", "JNJ", "PG"]
for ticker in fallback_tickers:
# Try to find the symbol in our Securities collection
# (a fallback ticker is only added if it is already present there and not yet selected)
for s in self.Securities.Keys:
if s.Value == ticker and s not in [x[0] for x in self.active_algo.selected_by_market_cap]:
price = self.Securities[s].Price
# Use price as a proxy for market cap
# (scaled by 1e9 here, versus 1e6 in the proxy above)
self.active_algo.selected_by_market_cap.append((s, price * 1000000000))
self.Debug(f"Added fallback stock: {ticker}")
self.Debug(f"Selected {len(self.active_algo.selected_by_market_cap)} stocks for market cap weighting")
self.active_algo.MonthlyRebalance()
self.Debug("MonthlyRebalance executed with forced conditions")
except Exception as e:
self.Error(f"Error in ForceRiskControlRebalance: {str(e)}")
import traceback
self.Debug(traceback.format_exc())
def LogRiskControlPortfolio(self):
    """Log a breakdown of the current portfolio for RiskControl diagnostics.

    Emits through ``self.Debug``: the cash balance, an allocation summary
    split into BIL / inverse ETFs / defensive ETFs / remaining equity, the
    ten largest invested positions, the number of invested positions and,
    when ``self.current_algo_name`` is "RiskControl", the active algorithm's
    tracked defensive positions and rebalance flag (each only if present).

    Percentages are relative to ``Portfolio.TotalPortfolioValue`` and are
    reported as 0 when that value is zero.
    """
    self.Debug("CURRENT PORTFOLIO HOLDINGS:")
    total_value = self.Portfolio.TotalPortfolioValue
    cash_value = self.Portfolio.Cash

    def percent_of_total(value):
        # A zero portfolio value must not crash a pure logging routine.
        return value / total_value * 100 if total_value else 0.0

    def invested_value(symbols):
        # Sum of holdings value over the symbols that are currently invested.
        return sum(self.Portfolio[s].HoldingsValue
                   for s in symbols
                   if self.Portfolio[s].Invested)

    self.Debug(f"Cash: ${cash_value:.2f} ({percent_of_total(cash_value):.2f}%)")

    # Symbol groups used both for the summary and for labelling positions
    inverse_etfs = [self.sh, self.psq, self.dog, self.rwm, self.eum, self.myd]
    defensive_etfs = [self.gld, self.ief, self.bnd, self.xlp,
                      self.xlu, self.xlv, self.vht, self.vdc]

    # Calculate major allocation groups
    bil_value = invested_value([self.bil])
    inverse_value = invested_value(inverse_etfs)
    defensive_value = invested_value(defensive_etfs)
    # Equity is the remainder after cash and the three groups above.
    # The cash *amount* is subtracted here, not the cash percentage.
    equity_value = total_value - cash_value - bil_value - inverse_value - defensive_value

    self.Debug(f"ALLOCATION SUMMARY:")
    self.Debug(f" BIL: ${bil_value:.2f} ({percent_of_total(bil_value):.2f}%)")
    self.Debug(f" Inverse: ${inverse_value:.2f} ({percent_of_total(inverse_value):.2f}%)")
    self.Debug(f" Defensive: ${defensive_value:.2f} ({percent_of_total(defensive_value):.2f}%)")
    self.Debug(f" Equity: ${equity_value:.2f} ({percent_of_total(equity_value):.2f}%)")

    # Log invested positions, largest first
    invested_positions = [holding for holding in self.Portfolio.Values if holding.Invested]
    invested_positions.sort(key=lambda p: p.HoldingsValue, reverse=True)
    self.Debug(f"TOP 10 HOLDINGS:")
    for i, position in enumerate(invested_positions[:10]):
        symbol = position.Symbol
        shares = position.Quantity
        value = position.HoldingsValue
        percent = percent_of_total(value)
        # Highlight position type
        if symbol == self.bil:
            position_type = "BIL"
        elif symbol in inverse_etfs:
            position_type = "Inverse"
        elif symbol in defensive_etfs:
            position_type = "Defensive"
        else:
            position_type = "Equity"
        self.Debug(f" {i+1}. {symbol.Value} ({position_type}): {shares} shares, ${value:.2f} ({percent:.2f}%)")

    # Log total positions count
    self.Debug(f"Total positions: {len(invested_positions)}")

    # Log defensive tracking data from RiskControl algorithm
    if self.current_algo_name == "RiskControl":
        if hasattr(self.active_algo, 'defensive_positions'):
            defensive_symbols = [s.Value for s in self.active_algo.defensive_positions]
            self.Debug(f"Current tracked defensive positions: {defensive_symbols}")
        if hasattr(self.active_algo, 'rebalance_flag'):
            self.Debug(f"RiskControl rebalance_flag: {self.active_algo.rebalance_flag}")
def CoarseSelectionFunction(self, coarse):
    """Coarse universe filter, delegated to the active algorithm when in KQT mode.

    When ``self.current_algo_name`` is "KQT" and the active algorithm exposes
    ``CoarseSelectionFunction``, its result is returned unchanged. Otherwise
    the symbols of all entries that have fundamental data, a price above 5
    and ``Market.USA`` as market are returned, in input order.
    """
    delegate_to_active = (self.current_algo_name == "KQT"
                          and hasattr(self.active_algo, "CoarseSelectionFunction"))
    if delegate_to_active:
        return self.active_algo.CoarseSelectionFunction(coarse)

    # Default implementation for RiskControl or as fallback
    symbols = []
    for candidate in coarse:
        if (candidate.HasFundamentalData
                and candidate.Price > 5
                and candidate.Market == Market.USA):
            symbols.append(candidate.Symbol)
    return symbols
def FineSelectionFunction(self, fine):
    """Fine universe filter, delegated to the active algorithm when in KQT mode.

    When ``self.current_algo_name`` is "KQT" and the active algorithm exposes
    ``FineSelectionFunction``, its result is returned unchanged. Otherwise
    the entries with a market cap above 1e10 and security type "ST00000001"
    are ranked by market cap (largest first) and the symbols of the top 30
    are returned. In "RiskControl" mode the same top 30 are also written to
    ``self.active_algo.selected_by_market_cap`` as (symbol, market cap) pairs.
    """
    if self.current_algo_name == "KQT" and hasattr(self.active_algo, "FineSelectionFunction"):
        return self.active_algo.FineSelectionFunction(fine)

    # Default implementation for RiskControl or as fallback
    large_caps = []
    for stock in fine:
        if stock.MarketCap > 1e10 and stock.SecurityReference.SecurityType == "ST00000001":
            large_caps.append(stock)
    large_caps.sort(key=lambda stock: stock.MarketCap, reverse=True)
    top_by_cap = large_caps[:30]

    # IMPORTANT: in RiskControl mode its selected_by_market_cap is updated directly
    if self.current_algo_name == "RiskControl":
        self.active_algo.selected_by_market_cap = [
            (stock.Symbol, stock.MarketCap) for stock in top_by_cap
        ]
        self.Debug(f"Updated RiskControl selected_by_market_cap with {len(top_by_cap)} stocks")

    return [stock.Symbol for stock in top_by_cap]
def _EnsureEnoughHistory(self):
"""Ensure RiskControl has enough historical data for decisions"""
if not self.in_risk_control_mode:
return
# Check if we need to populate the 30-day window
if hasattr(self.active_algo, 'spy_30day_window') and self.active_algo.spy_30day_window.Count < 30:
self.Debug("Populating spy_30day_window with historical data")
# Get 60 days of history to ensure we have enough
history = self.History(self.spy, 60, Resolution.Daily)
if not history.empty:
if isinstance(history.index, pd.MultiIndex):
prices = history.loc[self.spy]['close'].values
else:
prices = history['close'].values
# Clear existing data and fill with historical data
self.active_algo.spy_30day_window = RollingWindow[float](30)
# Add prices to the window in the correct order (oldest first)
for price in prices[-30:]: # Take the most recent 30 days
self.active_algo.spy_30day_window.Add(price)
self.Debug(f"Filled spy_30day_window with {self.active_algo.spy_30day_window.Count} days of data")
def _DeepLogMarketConditions(self):
"""Log extensive market conditions for debugging RiskControl issues"""
self.Debug("=========== MARKET CONDITIONS DEEP LOG ===========")
# Get SPY data
if not self.Securities.ContainsKey(self.spy):
self.Debug("SPY not available")
return
spy_price = self.Securities[self.spy].Price
# Get all the historic data
history = self.History(self.spy, 60, Resolution.Daily)
if history.empty:
self.Debug("No SPY history available")
return
# Get closing prices
if isinstance(history.index, pd.MultiIndex):
spy_prices = history.loc[self.spy]['close'].values
else:
spy_prices = history['close'].values
# Calculate key metrics
if len(spy_prices) >= 30:
sma_30 = np.mean(spy_prices[-30:])
market_deviation = (spy_price / sma_30) - 1.0
# Calculate trend metrics
if len(spy_prices) >= 10:
trend_10d = (spy_prices[-1] / spy_prices[-10]) - 1.0
else:
trend_10d = 0
# Store these metrics for reference
self.market_metrics = {
"spy_price": spy_price,
"sma_30": sma_30,
"market_deviation": market_deviation,
"trend_10d": trend_10d,
"last_30_prices": spy_prices[-30:].tolist(),
"spy_window_size": self.active_algo.spy_30day_window.Count if hasattr(self.active_algo, 'spy_30day_window') else 0
}
# Log extensive details
self.Debug(f"SPY price: {spy_price:.2f}")
self.Debug(f"30-day SMA: {sma_30:.2f}")
self.Debug(f"Market deviation: {market_deviation*100:.2f}%")
self.Debug(f"10-day trend: {trend_10d*100:.2f}%")
self.Debug(f"Window count: {self.active_algo.spy_30day_window.Count}")
# Log where we are in the market cycle
if market_deviation > 0.05:
self.Debug("MARKET CONDITION: Strong bull market (deviation > 5%)")
elif market_deviation > 0.02:
self.Debug("MARKET CONDITION: Moderate bull market (deviation > 2%)")
elif market_deviation > 0:
self.Debug("MARKET CONDITION: Mild bull market (deviation > 0%)")
elif market_deviation > -0.03:
self.Debug("MARKET CONDITION: Neutral market (deviation > -3%)")
else:
self.Debug("MARKET CONDITION: Bear market (deviation <= -3%)")
# CRITICAL: Add extensive log of allocation decisions
self._LogExpectedAllocations(market_deviation, trend_10d)
else:
self.Debug(f"Not enough price history: {len(spy_prices)} days available, need 30")
self.Debug("===================================================")
def _LogExpectedAllocations(self, market_deviation, market_trend):
"""Log what allocations should be expected based on market conditions"""
self.Debug("EXPECTED ALLOCATION BASED ON MARKET CONDITIONS:")
# Replicate RiskControl logic to determine expected allocations
bil_weight = 0.0
if market_deviation < 0:
# Enhanced formula for better downside protection
base_weight = -market_deviation # Convert to positive number
if base_weight > 0.08: # Significant drop
# Lower cap on BIL for significant drops
bil_weight = min(base_weight * 1.1, 0.7) # Cap at 70% (was 90%)
else:
bil_weight = min(base_weight, 0.6) # Cap at 60% (was 80%)
# Estimate what allocation should be
if market_deviation > 0.08: # Very strong bull
expected_bil = min(0.15, bil_weight)
expected_equity = 0.85
expected_defensive = 0
elif market_deviation > 0.05: # Strong bull
expected_bil = min(0.25, bil_weight)
expected_equity = 0.75
expected_defensive = 0
elif market_deviation > 0.0: # Mild bull
expected_bil = min(0.4, bil_weight)
expected_equity = 0.6
expected_defensive = 0
elif market_deviation > -0.03: # Neutral
expected_bil = min(0.5, bil_weight)
expected_equity = 0.5
expected_defensive = 0 # Small defensive allocation possible
else: # Bear
expected_bil = min(0.6, bil_weight)
expected_equity = 0.4
expected_defensive = max(0, 0.2 * (-market_deviation - 0.03) * 10) # More defensive in deeper corrections
self.Debug(f"Expected BIL: {expected_bil*100:.1f}%")
self.Debug(f"Expected equity: {expected_equity*100:.1f}%")
self.Debug(f"Expected defensive ETFs: {expected_defensive*100:.1f}%")
# Make all of this information available to MonthlyRebalance for diagnostic purposes
if hasattr(self.active_algo, 'expected_allocations'):
self.active_algo.expected_allocations = {
'bil': expected_bil,
'equity': expected_equity,
'defensive': expected_defensive
}
def _ForceAllocationTest(self):
"""Directly test allocation to make sure we can allocate to other assets"""
if not self.in_risk_control_mode or not self.force_deep_allocation:
return
self.Debug("FORCE ALLOCATION TEST: Directly testing allocation capabilities")
# Clear the force deep allocation flag so this only runs once
self.force_deep_allocation = False
try:
# CRITICAL: Directly try buying a very small amount of a defensive ETF
test_symbol = self.gld # Gold is a good test case
test_allocation = 0.05 # Small test allocation (5%)
self.Debug(f"Testing direct allocation to {test_symbol}")
self.active_algo.SetHoldings(test_symbol, test_allocation)
# Also try a small equity position
if len(self.active_algo.selected_by_market_cap) > 0:
equity_symbol = self.active_algo.selected_by_market_cap[0][0]
self.Debug(f"Testing direct allocation to equity {equity_symbol}")
self.active_algo.SetHoldings(equity_symbol, 0.05)
# Instead of using Tomorrow(), schedule for the next trading day at market open
# FIX: Use EveryDay and check if it's the next day in the handler
self.next_check_date = self.Time.date() + timedelta(days=1)
self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(10, 0), self.CheckAllocationTestWrapper)
except Exception as e:
self.Error(f"Force allocation test failed: {str(e)}")
import traceback
self.Debug(traceback.format_exc())
def CheckAllocationTestWrapper(self):
    """Run ``CheckAllocationTest`` once, on or after ``self.next_check_date``.

    Does nothing while ``next_check_date`` is unset or still in the future.
    When due, the attribute is deleted before the check runs so later
    invocations are no-ops.
    """
    if not hasattr(self, 'next_check_date'):
        return
    if self.Time.date() < self.next_check_date:
        return
    # Drop the marker first so the check cannot run a second time
    del self.next_check_date
    self.CheckAllocationTest()
def CheckAllocationTest(self):
    """Report whether the forced allocation test produced non-BIL positions.

    Counts invested portfolio entries as BIL, defensive ETF (members of
    ``self.all_defensive_etfs``) or equity (everything else) and logs the
    tally. If neither defensive nor equity positions exist,
    ``self._FakeMarketConditions`` is invoked as a last resort.
    """
    self.Debug("CHECKING ALLOCATION TEST RESULTS:")

    tally = {"bil": 0, "defensive": 0, "equity": 0}
    for entry in self.Portfolio:
        held_symbol = entry.Key
        if not entry.Value.Invested:
            continue
        if held_symbol == self.bil:
            bucket = "bil"
        elif held_symbol in self.all_defensive_etfs:
            bucket = "defensive"
        else:
            bucket = "equity"
        tally[bucket] += 1

    self.Debug(f"Current positions: BIL: {tally['bil']}, Defensive ETFs: {tally['defensive']}, Equities: {tally['equity']}")

    if tally["defensive"] or tally["equity"]:
        self.Debug("Allocation test successful: Portfolio includes non-BIL positions")
        return

    self.Debug("CRITICAL ERROR: Only BIL is being held despite force allocation test")
    # Last-resort attempt: fake the market conditions to force allocation
    self._FakeMarketConditions()
def _FakeMarketConditions(self):
"""Last resort: Attempt to fake market conditions to force allocation"""
if not self.in_risk_control_mode:
return
self.Debug("ATTEMPTING EMERGENCY FIX: Faking market conditions to force allocation")
try:
# Safety check to make sure we have the RiskControl algorithm
if not hasattr(self.active_algo, 'MonthlyRebalance'):
self.Debug("Cannot fake market conditions: Not a RiskControl algorithm")
return
# Create a fake bear market by manipulating the 30-day window
# This is a hack, but it might fix the issue in a live system
spy_price = self.Securities[self.spy].Price
# Create a set of fake prices 20% higher than current price
fake_high_prices = [spy_price * 1.2] * 30
# Replace the window with our fake high prices
self.active_algo.spy_30day_window = RollingWindow[float](30)
for price in fake_high_prices:
self.active_algo.spy_30day_window.Add(price)
# This should create a severe negative market deviation
# which should force allocation to defensive ETFs
self.Debug("Created fake bear market conditions (-16.7% deviation)")
# Force rebalance with these fake conditions
self.active_algo.rebalance_flag = True
self.active_algo.force_rebalance_override = True
self.active_algo.MonthlyRebalance()
# Log the results
self.LogRiskControlPortfolio()
except Exception as e:
self.Error(f"Failed to fake market conditions: {str(e)}")
import traceback
self.Debug(traceback.format_exc())
from AlgorithmImports import *
import numpy as np
from datetime import timedelta
class MarketCapWeightedSP500Tracker(QCAlgorithm):
def Initialize(self):
    """Configure the backtest and create all state used by the algorithm.

    Sets the backtest period (2019-01-01 to 2025-01-01) and starting cash,
    adds SPY, BIL and the inverse/defensive ETFs at daily resolution,
    registers the coarse/fine universe, warms up the 30-day SPY window from
    history, creates one ATR indicator per defensive ETF plus BIL and SPY,
    and schedules the rebalance-flag, rebalance and weekly defensive
    adjustment handlers.
    """
    daily = Resolution.Daily

    # Backtest period and capital
    self.SetStartDate(2019, 1, 1)
    self.SetEndDate(2025, 1, 1)
    self.SetCash(100000)
    self.UniverseSettings.Resolution = daily

    # Core symbols and universe
    self.spy = self.AddEquity("SPY", daily).Symbol
    self.bil = self.AddEquity("BIL", daily).Symbol
    self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)

    # Rebalance state
    self.selected_by_market_cap = []
    self.rebalance_flag = False
    self.spy_30day_window = RollingWindow[float](30)
    self.entry_prices = {}
    self.previous_bil_allocation = 0.0

    # The flag is evaluated at month start; the rebalance handler itself is
    # scheduled on the weekly rule
    self.Schedule.On(self.DateRules.MonthStart(self.spy),
                     self.TimeRules.AfterMarketOpen(self.spy, 30),
                     self.SetRebalanceFlag)
    self.Schedule.On(self.DateRules.WeekStart(self.spy, DayOfWeek.Wednesday),
                     self.TimeRules.AfterMarketOpen(self.spy, 30),
                     self.MonthlyRebalance)

    # Initialize rolling window with historical data
    warmup = self.History(self.spy, 30, daily)
    if not warmup.empty:
        for _, bar in warmup.loc[self.spy].iterrows():
            self.spy_30day_window.Add(bar["close"])

    # Simple tracking of market trend
    self.trend_lookback = 10
    self.spy_prices = {}
    self.max_spy_history = 60  # Days of price history to keep

    # Dynamic stop-loss settings
    self.stop_loss_base = 0.04  # Reduced base stop-loss threshold
    self.dynamic_stop_weight = 0.5  # Blend 50% ATR signal with base threshold

    # (attribute name, ticker) pairs, registered in this order
    inverse_tickers = (
        ("sh", "SH"),    # Inverse S&P 500
        ("psq", "PSQ"),  # Inverse Nasdaq-100
        ("dog", "DOG"),  # Inverse Dow Jones
        ("rwm", "RWM"),  # Inverse Russell 2000
        ("eum", "EUM"),  # Inverse Emerging Markets
        ("myd", "MYY"),  # Inverse Mid-Cap 400
    )
    # Alternative defensive ETFs (not inverse but potentially good in downturns)
    alternative_tickers = (
        ("gld", "GLD"),  # Gold
        ("ief", "IEF"),  # 7-10 Year Treasury
        ("bnd", "BND"),  # Total Bond Market
    )
    # Sector-based defensive ETFs (often outperform in bear markets)
    sector_tickers = (
        ("xlp", "XLP"),  # Consumer Staples
        ("xlu", "XLU"),  # Utilities
        ("xlv", "XLV"),  # Healthcare
        ("vht", "VHT"),  # Vanguard Healthcare
        ("vdc", "VDC"),  # Vanguard Consumer Staples
    )

    def register(pairs):
        # Add each ticker, expose its Symbol as self.<attribute>, return the Symbols
        symbols = []
        for attribute, ticker in pairs:
            symbol = self.AddEquity(ticker, daily).Symbol
            setattr(self, attribute, symbol)
            symbols.append(symbol)
        return symbols

    # Group all defensive ETFs together
    self.inverse_etfs = register(inverse_tickers)
    self.alternative_defensive = register(alternative_tickers)
    self.sector_defensive = register(sector_tickers)
    self.all_defensive = self.inverse_etfs + self.alternative_defensive + self.sector_defensive

    # Diagnostic logging capability
    self.diagnostic_mode = True  # Enable detailed diagnostics

    # Defensive position tracking and weekly tactical adjustment
    self.defensive_positions = set()
    self.last_defensive_update = datetime(1900, 1, 1)
    self.Schedule.On(self.DateRules.WeekStart(self.spy, DayOfWeek.Monday),
                     self.TimeRules.AfterMarketOpen(self.spy, 60),  # After main rebalance
                     self.WeeklyDefensiveAdjustment)

    # Inverse position tracking
    self.inverse_positions = set()
    # Inverse ETF lookback windows for momentum calculation
    self.inverse_lookback_short = 7  # 1 week momentum window
    self.inverse_lookback_med = 15  # Medium-term momentum

    # ATR indicators for volatility-based stop-loss calculation, registered
    # for the defensive ETFs, BIL and SPY
    self.atr_period = 14
    self.atr = {}
    for tracked in self.all_defensive + [self.bil, self.spy]:
        self.atr[tracked] = self.ATR(tracked, self.atr_period, daily)

    # Flag for forced rebalance override
    self.force_rebalance_override = False
    # Log of rebalance decisions
    self.rebalance_log = []
    # Expected allocations tracking for diagnostics
    self.expected_allocations = {'bil': 0, 'equity': 0, 'defensive': 0}
    # Flag for when debugging allocations
    self.debug_allocation_details = True
def CoarseSelectionFunction(self, coarse):
    """Return the symbols of coarse entries worth passing to fine selection.

    An entry qualifies when it has fundamental data, a price above 5 and
    ``Market.USA`` as its market. Input order is preserved.
    """
    symbols = []
    for candidate in coarse:
        if (candidate.HasFundamentalData
                and candidate.Price > 5
                and candidate.Market == Market.USA):
            symbols.append(candidate.Symbol)
    return symbols
def FineSelectionFunction(self, fine):
    """Select the 30 largest qualifying stocks by market cap.

    Keeps entries with a market cap above 1e10 and security type
    "ST00000001", ranks them by market cap (largest first) and truncates to
    30. The (symbol, market cap) pairs are stored in
    ``self.selected_by_market_cap``; the symbols are returned.
    """
    large_caps = []
    for stock in fine:
        if stock.MarketCap > 1e10 and stock.SecurityReference.SecurityType == "ST00000001":
            large_caps.append(stock)
    large_caps.sort(key=lambda stock: stock.MarketCap, reverse=True)

    ranked_pairs = []
    symbols = []
    for stock in large_caps[:30]:
        ranked_pairs.append((stock.Symbol, stock.MarketCap))
        symbols.append(stock.Symbol)
    self.selected_by_market_cap = ranked_pairs
    return symbols
def SetRebalanceFlag(self):
    """Arm ``self.rebalance_flag`` when the algorithm time falls on a Wednesday.

    On any other weekday the flag is left exactly as it was.
    """
    wednesday = 2  # datetime.weekday(): Monday is 0
    if self.Time.weekday() != wednesday:
        return
    self.rebalance_flag = True
def OnData(self, data):
    """Update SPY tracking state and apply dynamic stop-losses.

    Ignored entirely when ``data`` has no SPY bar. Otherwise the SPY close
    is pushed into the 30-day window and into ``self.spy_prices`` (entries
    older than ``self.max_spy_history`` days are dropped). Every invested
    holding except BIL is then checked against a stop-loss threshold that:

    * starts at ``self.stop_loss_base``,
    * is tightened by 10% in a downtrend (trend < -3%) and loosened by 10%
      in an uptrend (trend > 3%),
    * is blended with ATR as a fraction of price when an ATR indicator for
      the symbol is ready (with a reduced weight if ATR is high).

    Holdings whose drop from the recorded entry price reaches the threshold
    are liquidated. If any were, available cash is put into BIL.

    Holdings without a positive price are skipped, and a non-positive
    recorded entry price is replaced by the current price, so a missing
    quote cannot raise ``ZeroDivisionError``. The BIL purchase is skipped
    when BIL has no positive price for the same reason.
    """
    # Update price window
    if not data.Bars.ContainsKey(self.spy):
        return
    spy_close = data.Bars[self.spy].Close
    today = self.Time.date()
    self.spy_30day_window.Add(spy_close)

    # Track prices for trend calculation and drop the ones that are too old
    self.spy_prices[today] = spy_close
    stale_dates = [day for day in self.spy_prices
                   if (today - day).days > self.max_spy_history]
    for day in stale_dates:
        del self.spy_prices[day]

    market_trend = self._calculateMarketTrend()

    # Track if any stop-loss was triggered
    stop_loss_triggered = False
    for kvp in self.Portfolio:
        symbol = kvp.Key
        holding = kvp.Value
        if not holding.Invested or symbol == self.bil:
            continue

        current_price = self.Securities[symbol].Price
        if current_price <= 0:
            # No usable quote: neither the drop nor ATR% can be computed
            continue

        entry_price = self.entry_prices.get(symbol, 0)
        if entry_price <= 0:
            # First sighting (or unusable record): start tracking from now
            entry_price = current_price
            self.entry_prices[symbol] = entry_price
        price_drop = (entry_price - current_price) / entry_price

        # Start with the base threshold and adjust based on market trend
        stop_threshold = self.stop_loss_base
        if market_trend < -0.03:
            stop_threshold *= 0.9  # tighten in downtrends
        elif market_trend > 0.03:
            stop_threshold *= 1.1  # loosen in uptrends

        # Incorporate ATR if ready, tempering its effect in high volatility
        if symbol in self.atr and self.atr[symbol].IsReady:
            atr_pct = self.atr[symbol].Current.Value / current_price
            effective_weight = self.dynamic_stop_weight
            if atr_pct > stop_threshold * 1.2:
                effective_weight = min(self.dynamic_stop_weight, 0.3)
            stop_threshold = ((1 - effective_weight) * stop_threshold +
                              effective_weight * atr_pct)

        if price_drop >= stop_threshold:
            self.Liquidate(symbol)
            stop_loss_triggered = True
            self.Debug(f"Stop-loss triggered for {symbol} at {current_price}, drop: {price_drop*100:.1f}%, threshold: {stop_threshold*100:.1f}%")

    # If any stop-loss was triggered, invest all available cash in BIL
    if not stop_loss_triggered:
        return
    available_cash = self.Portfolio.Cash + self.Portfolio.UnsettledCash
    if available_cash > 0:
        bil_price = self.Securities[self.bil].Price
        if bil_price > 0:
            self.MarketOrder(self.bil, available_cash / bil_price)
            self.Debug(f"Invested ${available_cash:0.2f} in BIL after stop-loss")
def WeeklyDefensiveAdjustment(self):
    """Weekly check and adjustment for defensive ETF positions.

    Skipped when a rebalance ran within the last 3 days (if
    ``last_rebalance_date`` exists), when defensive positions were updated
    within the last 5 days, in a strong bull market (deviation > 4% and
    trend > 3%), or when the portfolio is already at least 98% invested.
    It is also skipped, with a log message, when the portfolio value or the
    SPY average is not positive, because the ratios below cannot be
    computed.

    Otherwise the room for defensive positions is derived from the
    uninvested share and the current BIL allocation (at most 25% of BIL),
    ``self._evaluateDefensiveETFs`` proposes target weights, and existing
    defensive positions are adjusted, added or liquidated to match.
    ``self.last_defensive_update`` is set only when trades were placed.
    """
    today = self.Time.date()

    # Skip if we've done the monthly rebalance recently
    if hasattr(self, 'last_rebalance_date'):
        if (today - self.last_rebalance_date.date()).days < 3:
            return
    # Skip if we've updated defensive positions recently (at most once a week)
    if (today - self.last_defensive_update.date()).days < 5:
        return

    total_value = self.Portfolio.TotalPortfolioValue
    if total_value <= 0:
        # Every allocation below is a ratio to the total portfolio value
        self.Debug("Portfolio value is not positive, skipping defensive adjustments")
        return

    # Calculate current market conditions
    spy_price = self.Securities[self.spy].Price
    window_count = self.spy_30day_window.Count
    sma_30 = sum(self.spy_30day_window) / window_count if window_count > 0 else spy_price
    if sma_30 <= 0:
        self.Debug("No usable SPY price yet, skipping defensive adjustments")
        return
    market_deviation = (spy_price / sma_30) - 1.0
    market_trend = self._calculateMarketTrend()

    # Skip in strong bull markets
    if market_deviation > 0.04 and market_trend > 0.03:
        return

    # Calculate total invested share including all positions
    total_invested = sum(holding.HoldingsValue for holding in self.Portfolio.Values
                         if holding.Invested) / total_value
    # If we're already fully invested, can't add more defensive positions
    if total_invested >= 0.98:  # Allow small buffer for rounding errors
        self.Debug(f"Already fully invested ({total_invested:.2f}), skipping defensive adjustments")
        return
    # Available room for defensive positions, keeping a tiny buffer
    available_allocation = max(0, 0.99 - total_invested)

    # Calculate current BIL allocation
    bil_holding = self.Portfolio[self.bil]
    current_bil_value = bil_holding.HoldingsValue if bil_holding.Invested else 0
    bil_allocation = current_bil_value / total_value

    # At most 25% of the BIL allocation, and never more than the free room
    if bil_allocation > 0:
        max_defensive_pct = min(0.25, available_allocation / bil_allocation)
    else:
        max_defensive_pct = 0
    potential_allocation = min(bil_allocation * max_defensive_pct, available_allocation)

    # Detailed diagnostics for current defensive positions
    if self.diagnostic_mode and self.defensive_positions:
        self.Debug(f"WEEKLY CHECK - Current defensive positions:")
        for symbol in self.defensive_positions:
            if self.Portfolio.ContainsKey(symbol) and self.Portfolio[symbol].Invested:
                position = self.Portfolio[symbol]
                entry = self.entry_prices.get(symbol, position.AveragePrice)
                current = self.Securities[symbol].Price
                pnl_pct = (current / entry) - 1 if entry > 0 else 0
                self.Debug(f" {symbol}: PnL {pnl_pct*100:.2f}%, Value ${position.HoldingsValue:.2f}")

    self.Debug(f"WEEKLY CHECK - Market: Dev {market_deviation*100:.2f}%, Trend {market_trend*100:.2f}%")
    self.Debug(f"BIL allocation: {bil_allocation*100:.2f}%, Potential defensive: {potential_allocation*100:.2f}%")

    # Run the defensive ETF evaluation
    new_allocations = self._evaluateDefensiveETFs(market_deviation, market_trend, potential_allocation)

    # Calculate which positions to add, modify, or remove
    positions_to_add = {}
    positions_to_remove = set()

    # Process existing positions
    for symbol in self.defensive_positions:
        if symbol in new_allocations and new_allocations[symbol] > 0:
            # Position is kept; adjust it only if the change is significant
            if self.Portfolio.ContainsKey(symbol):
                current_pct = self.Portfolio[symbol].HoldingsValue / total_value
            else:
                current_pct = 0
            target_pct = new_allocations[symbol]
            if abs(target_pct - current_pct) > 0.01:
                positions_to_add[symbol] = target_pct
            # Remove from new allocations dict to avoid double-processing
            new_allocations.pop(symbol)
        else:
            # Position should be removed
            positions_to_remove.add(symbol)

    # Add any remaining new positions
    for symbol, allocation in new_allocations.items():
        if allocation > 0.01:  # Minimum meaningful allocation
            positions_to_add[symbol] = allocation

    # Scale back allocations if they exceed the available room
    total_new_allocation = sum(positions_to_add.values())
    if total_new_allocation > available_allocation:
        scale_factor = available_allocation / total_new_allocation
        for symbol in positions_to_add:
            positions_to_add[symbol] *= scale_factor
        self.Debug(f"Scaled defensive allocations to fit available space: {scale_factor:.4f}")

    # Execute trades if needed
    if positions_to_add or positions_to_remove:
        self.Debug(f"WEEKLY ADJUSTMENT - Making defensive position changes")
        # Remove positions no longer needed
        for symbol in positions_to_remove:
            self.Liquidate(symbol)
            self.defensive_positions.remove(symbol)
            self.Debug(f"Removed defensive position: {symbol}")
        # Add or adjust positions
        for symbol, allocation in positions_to_add.items():
            self.SetHoldings(symbol, allocation)
            self.defensive_positions.add(symbol)
            self.entry_prices[symbol] = self.Securities[symbol].Price
            self.Debug(f"Updated defensive position: {symbol} to {allocation*100:.2f}%")
        self.last_defensive_update = self.Time
def MonthlyRebalance(self):
    """Rebalance the portfolio across equities, BIL and defensive ETFs.

    Runs only when ``rebalance_flag`` or ``force_rebalance_override`` is set.
    Both flags are cleared and ``entry_prices`` is emptied before sizing.

    Steps:
      1. Compare the current SPY price with the mean of ``spy_30day_window``
         (the window is padded with the current price when it holds fewer
         than 30 values).
      2. Derive a BIL weight from the shortfall below that mean, floor it at
         a fraction of ``previous_bil_allocation`` and cap it by regime.
      3. Offer 40% of that BIL weight to ``_evaluateDefensiveETFs``.
      4. Make sure there is an equity list (falling back to a fixed ticker
         list, then to SPY only), drop weak-momentum names and weight the
         rest by market cap.
      5. Send the orders, liquidate everything no longer selected and log a
         summary.

    Returns:
        None. The result is the orders placed through ``SetHoldings`` /
        ``Liquidate`` and the updated bookkeeping attributes.
    """
    # Check if we're forcing a rebalance regardless of flag
    if not self.rebalance_flag and not self.force_rebalance_override:
        self.Debug("MonthlyRebalance called but both rebalance_flag and force_rebalance_override are False - skipping")
        return
    self.Debug(f"EXECUTING MONTHLY REBALANCE - rebalance_flag: {self.rebalance_flag}, override: {self.force_rebalance_override}")
    # Reset the override flag after using it
    self.force_rebalance_override = False
    self.rebalance_flag = False
    self.entry_prices.clear()  # Reset entry prices at rebalance

    # Pad a short window with the current price so the average below is
    # always taken over 30 values.
    if self.spy_30day_window.Count < 30:
        self.Debug(f"ERROR: Not enough SPY history. Window count: {self.spy_30day_window.Count}")
        spy_price = self.Securities[self.spy].Price
        while self.spy_30day_window.Count < 30:
            self.spy_30day_window.Add(spy_price)
        self.Debug(f"Filled insufficient window with current price. New count: {self.spy_30day_window.Count}")

    # Comprehensive logging of the 30-day window data
    window_data = [self.spy_30day_window[i] for i in range(self.spy_30day_window.Count)]
    self.Debug(f"SPY 30-day window first 5 values: {window_data[:5]}")
    self.Debug(f"SPY 30-day window last 5 values: {window_data[-5:]}")
    spy_price = self.Securities[self.spy].Price
    sma_30 = sum(self.spy_30day_window) / 30
    self.Debug(f"RAW DATA - Current SPY price: {spy_price:.2f}, 30-day SMA: {sma_30:.2f}")

    # Calculate market deviation for better decisions
    market_deviation = (spy_price / sma_30) - 1.0
    market_trend = self._calculateMarketTrend()
    self.Debug(f"REBALANCE ANALYSIS - Market deviation: {market_deviation*100:.2f}%, Trend: {market_trend*100:.2f}%")

    # BIL weight grows with the shortfall of SPY below its average.
    bil_weight = 0.0
    if spy_price < sma_30:
        base_weight = (sma_30 - spy_price) / sma_30
        if base_weight > 0.08:  # Significant drop
            bil_weight = min(base_weight * 1.1, 0.7)  # Cap at 70%
        else:
            bil_weight = min(base_weight, 0.6)  # Cap at 60%

    # BIL may only shrink by a bounded fraction per rebalance; the stronger
    # the market, the larger the permitted reduction.
    if market_deviation > 0.05:  # Strong bull market
        min_bil_allocation = self.previous_bil_allocation * 0.7  # 30% reduction
    elif market_deviation > 0.02:  # Modest bull market
        min_bil_allocation = self.previous_bil_allocation * 0.75  # 25% reduction
    else:
        min_bil_allocation = self.previous_bil_allocation * 0.8  # Standard 20% reduction

    # A floor of exactly 0 is the normal "no previous BIL" case, so only a
    # negative or NaN floor is reported as invalid.
    if np.isnan(min_bil_allocation) or min_bil_allocation <= 0:
        if np.isnan(min_bil_allocation) or min_bil_allocation < 0:
            self.Debug("WARNING: min_bil_allocation was invalid, reset to 0")
        min_bil_allocation = 0
    bil_weight = max(bil_weight, min_bil_allocation)

    # Debug the BIL allocation decision in detail
    self.Debug(f"BIL ALLOCATION CALCULATION:")
    self.Debug(f" Initial bil_weight based on deviation: {bil_weight*100:.2f}%")
    self.Debug(f" Previous BIL allocation: {self.previous_bil_allocation*100:.2f}%")
    self.Debug(f" Minimum BIL allocation: {min_bil_allocation*100:.2f}%")

    # Regime-dependent caps on BIL
    original_bil_weight = bil_weight  # Save for debugging
    if market_deviation > 0.08:  # Very strong bull
        bil_weight = min(bil_weight, 0.15)
        self.Debug(f" Applied very strong bull market cap: {bil_weight*100:.2f}%")
    elif market_deviation > 0.05:  # Strong bull
        bil_weight = min(bil_weight, 0.25)
        self.Debug(f" Applied strong bull market cap: {bil_weight*100:.2f}%")
    elif market_deviation > 0.0:  # Mild bull
        bil_weight = min(bil_weight, 0.4)
        self.Debug(f" Applied mild bull market cap: {bil_weight*100:.2f}%")
    elif market_deviation > -0.03:  # Neutral
        bil_weight = min(bil_weight, 0.5)
        self.Debug(f" Applied neutral market cap: {bil_weight*100:.2f}%")
    else:  # Bear
        bil_weight = min(bil_weight, 0.6)
        self.Debug(f" Applied bear market cap: {bil_weight*100:.2f}%")
    if bil_weight != original_bil_weight:
        self.Debug(f" BIL allocation was capped from {original_bil_weight*100:.2f}% to {bil_weight*100:.2f}%")

    # 40% of the BIL weight becomes the budget for defensive ETFs; the rest
    # stays in BIL.
    original_bil = bil_weight
    inverse_etf_potential = original_bil * 0.4
    bil_weight = original_bil - inverse_etf_potential

    # Run diagnostics on defensive ETFs
    if self.diagnostic_mode:
        self._runDefensiveETFDiagnostics(market_deviation, market_trend)

    # _evaluateDefensiveETFs already covers the inverse ETFs. The separate
    # _evaluateInverseETFs call was dropped: its result was never used, yet
    # it requested history and logged allocations that were never placed.
    all_defensive_allocations = self._evaluateDefensiveETFs(market_deviation, market_trend, inverse_etf_potential)
    total_defensive_allocation = sum(all_defensive_allocations.values())
    # Unused part of the defensive budget (reported in the log line below)
    cash_reserve = inverse_etf_potential - total_defensive_allocation
    # NOTE(review): the equity weight does not subtract BIL or the cash
    # reserve, so the leverage guard below rescales whenever BIL > 0.
    # Confirm this is the intended sizing.
    equity_weight = 1.0 - total_defensive_allocation

    # Ensure total allocation never exceeds 100%
    total_allocation = bil_weight + total_defensive_allocation + equity_weight
    if total_allocation > 1.0:
        scale_factor = 1.0 / total_allocation
        bil_weight *= scale_factor
        equity_weight *= scale_factor
        for symbol in all_defensive_allocations:
            all_defensive_allocations[symbol] *= scale_factor
        total_defensive_allocation = sum(all_defensive_allocations.values())
        self.Debug(f"Scaled allocations to prevent leverage: {scale_factor:.4f}")
    self.Debug(f"Allocation breakdown: Equity {equity_weight*100:.1f}%, BIL {bil_weight*100:.1f}%, " +
               f"Defensive ETFs {total_defensive_allocation*100:.1f}%, Cash {cash_reserve*100:.1f}%")

    # The fallback must run BEFORE weights are computed. It used to run
    # afterwards, so fallback stocks never received a weight and no equity
    # was bought.
    if len(self.selected_by_market_cap) == 0:
        self.Debug("CRITICAL ERROR: No stocks in selected_by_market_cap!")
        fallback_tickers = ["AAPL", "MSFT", "AMZN", "GOOGL", "META", "NVDA", "BRK.B", "JNJ", "V", "PG", "UNH", "JPM", "HD", "MA", "XOM"]
        self.Debug(f"Using fallback list of {len(fallback_tickers)} large cap stocks")
        fallback_stocks = []
        for ticker in fallback_tickers:
            try:
                # Only tickers already subscribed in Securities can be used
                symbol = None
                for kvp in self.Securities:
                    if kvp.Key.Value == ticker:
                        symbol = kvp.Key
                        break
                if symbol is not None:
                    # Shares outstanding are unknown here, so price stands
                    # in for market cap.
                    price = self.Securities[symbol].Price
                    fake_mcap = price * 1000000000
                    fallback_stocks.append((symbol, fake_mcap))
                    self.Debug(f"Added fallback stock: {ticker}")
            except Exception as e:
                # Best effort: one bad ticker must not abort the rebalance
                self.Debug(f"Error adding fallback stock {ticker}: {str(e)}")
        if len(fallback_stocks) > 0:
            self.selected_by_market_cap = fallback_stocks
            self.Debug(f"Added {len(fallback_stocks)} fallback stocks")
        else:
            # Nothing usable: hold SPY for the equity sleeve and stop here
            self.Debug("EMERGENCY FALLBACK: Using SPY as only equity position")
            self.SetHoldings(self.spy, equity_weight)
            self.SetHoldings(self.bil, bil_weight)
            # Set BIL allocation for next time
            self.previous_bil_allocation = self.Portfolio[self.bil].HoldingsValue / self.Portfolio.TotalPortfolioValue
            return

    # Log how many stocks we have
    self.Debug(f"Using {len(self.selected_by_market_cap)} stocks for market cap weighting")

    # Drop the weakest-momentum names (symbols without a score count as
    # neutral, 1.0).
    momentum_scores = self._calculateSimpleMomentum()
    filtered_stocks = [
        (symbol, mcap)
        for symbol, mcap in self.selected_by_market_cap
        if momentum_scores.get(symbol, 1.0) >= 0.9
    ]
    # If we filtered too many, revert to original list
    if len(filtered_stocks) < 20:
        filtered_stocks = self.selected_by_market_cap

    # Market-cap weights over the filtered list, scaled to the equity sleeve
    total_market_cap = sum(mcap for _, mcap in filtered_stocks)
    if total_market_cap > 0:
        weights = {symbol: (mcap / total_market_cap) * equity_weight for symbol, mcap in filtered_stocks}
    elif filtered_stocks:
        # All caps are zero (e.g. fallback prices not loaded yet): weight
        # equally instead of dividing by zero.
        equal_share = equity_weight / len(filtered_stocks)
        weights = {symbol: equal_share for symbol, _ in filtered_stocks}
    else:
        weights = {}

    # First-seen market cap per symbol, for the log lines below
    mcap_by_symbol = {}
    for symbol, mcap in filtered_stocks:
        mcap_by_symbol.setdefault(symbol, mcap)

    self.Debug(f"EQUITY ALLOCATION DETAILS:")
    self.Debug(f"Filtered stocks count: {len(filtered_stocks)}")
    self.Debug(f"Total market cap: ${total_market_cap:,.2f}")
    self.Debug(f"Total equity weight: {equity_weight*100:.2f}%")
    top_equities = sorted(weights.items(), key=lambda x: x[1], reverse=True)[:10]
    self.Debug(f"Top 10 equity allocations:")
    for symbol, weight in top_equities:
        self.Debug(f" {symbol.Value}: {weight*100:.2f}% (Market cap: ${mcap_by_symbol[symbol]:,.2f})")

    # Place equity orders and track the outcome
    successful_allocations = []
    failed_allocations = []
    invested = set()
    for symbol, weight in weights.items():
        if weight > 0:
            try:
                if weight >= 0.005:  # Minimum 0.5% allocation
                    order_ticket = self.SetHoldings(symbol, weight)
                    if order_ticket:
                        invested.add(symbol)
                        successful_allocations.append((symbol.Value, weight))
                        self.entry_prices[symbol] = self.Securities[symbol].Price
                    else:
                        failed_allocations.append((symbol.Value, weight, "Order ticket null"))
                else:
                    self.Debug(f"Skipping {symbol.Value} - allocation too small: {weight*100:.2f}%")
            except Exception as e:
                # One rejected order must not stop the remaining ones
                failed_allocations.append((symbol.Value, weight, str(e)))
                self.Debug(f"Error setting holdings for {symbol.Value}: {str(e)}")

    # Log allocation results
    self.Debug(f"Successfully allocated to {len(successful_allocations)} equity positions")
    if failed_allocations:
        self.Debug(f"Failed to allocate to {len(failed_allocations)} positions:")
        for symbol, weight, reason in failed_allocations[:5]:  # Log first 5 failures
            self.Debug(f" {symbol}: {weight*100:.2f}% - Reason: {reason}")

    # Set BIL position
    if bil_weight > 0:
        self.SetHoldings(self.bil, bil_weight)
        invested.add(self.bil)
    else:
        self.Liquidate(self.bil)

    # Set defensive ETF positions
    for symbol, weight in all_defensive_allocations.items():
        if weight > 0:
            self.SetHoldings(symbol, weight)
            invested.add(symbol)
            self.defensive_positions.add(symbol)
            self.entry_prices[symbol] = self.Securities[symbol].Price
            self.Debug(f"Allocated {weight*100:.2f}% to defensive ETF {symbol}")
        elif symbol in self.defensive_positions:
            self.Liquidate(symbol)
            self.defensive_positions.remove(symbol)

    # Update last rebalance date tracker
    self.last_rebalance_date = self.Time
    # Store current BIL allocation for next month's minimum
    self.previous_bil_allocation = self.Portfolio[self.bil].HoldingsValue / self.Portfolio.TotalPortfolioValue
    self.Debug(f"New BIL allocation: {bil_weight*100:0.2f}% (Minimum was {min_bil_allocation*100:0.2f}%)")

    # Liquidate positions not in current selection (SPY and tracked
    # defensive positions are left alone).
    for kvp in self.Portfolio:
        symbol = kvp.Key
        if (kvp.Value.Invested and symbol not in invested
                and symbol != self.spy and symbol not in self.defensive_positions):
            self.Liquidate(symbol)

    def log_allocations(weights, defensive_allocations, bil):
        """Write the target weights that were just sent to the log."""
        self.Debug(f"FINAL ALLOCATION SUMMARY:")
        self.Debug(f" BIL allocation: {bil*100:.2f}%")
        self.Debug(f" Equity allocations: {len(weights)} positions")
        if weights:
            total_equity = sum(weights.values())
            self.Debug(f" Total equity weight: {total_equity*100:.2f}%")
            top_equities = sorted(weights.items(), key=lambda x: x[1], reverse=True)[:5]
            for sym, wt in top_equities:
                self.Debug(f" {sym.Value}: {wt*100:.2f}%")
        self.Debug(f" Defensive allocations: {len(defensive_allocations)} positions")
        if defensive_allocations:
            total_defensive = sum(defensive_allocations.values())
            self.Debug(f" Total defensive weight: {total_defensive*100:.2f}%")
            for sym, wt in defensive_allocations.items():
                self.Debug(f" {sym.Value}: {wt*100:.2f}%")

    # Summarise the targets (orders have already been sent at this point)
    log_allocations(weights, all_defensive_allocations, bil_weight)

    # Cross-check the targets against what the portfolio actually holds
    self.Debug("VERIFICATION OF FINAL PORTFOLIO:")
    invested_positions = [kvp for kvp in self.Portfolio.Values if kvp.Invested]
    equity_positions = [p for p in invested_positions if p.Symbol != self.bil and p.Symbol not in self.defensive_positions]
    self.Debug(f"Final equity positions: {len(equity_positions)}/{len(filtered_stocks)}")
    self.Debug(f"Final defensive positions: {len([p for p in invested_positions if p.Symbol in self.defensive_positions])}")
    self.Debug(f"BIL position: {1 if self.Portfolio[self.bil].Invested else 0}")
def _calculateMarketTrend(self):
    """Return the fractional change between the latest and an earlier SPY price.

    ``spy_prices`` is read as a mapping from a sortable key (dates) to price.
    The latest entry is compared with the entry ``trend_lookback`` positions
    from the end of the sorted keys.

    Returns:
        ``latest / earlier - 1.0``, or 0 when there are fewer than
        ``trend_lookback + 1`` prices, when ``trend_lookback`` is below 1,
        or when the earlier price is missing or zero.
    """
    lookback = self.trend_lookback
    # A lookback of 0 would make dates[-0] silently select the OLDEST entry
    if lookback < 1 or len(self.spy_prices) < lookback + 1:
        return 0  # Not enough data
    dates = sorted(self.spy_prices)
    recent_price = self.spy_prices[dates[-1]]
    # NOTE(review): dates[-lookback] is lookback-1 entries before the latest
    # one; confirm whether dates[-(lookback + 1)] was intended.
    older_price = self.spy_prices[dates[-lookback]]
    if not older_price:
        # A price of 0 (or None) was recorded; avoid dividing by zero
        return 0
    return (recent_price / older_price) - 1.0
def _calculateSimpleMomentum(self):
    """Score every selected stock by its price change over the fetched bars.

    Requests 30 daily bars for the symbols in ``selected_by_market_cap`` and
    maps the first-to-last close change ``m`` to ``1 + 2 * m``, clamped to
    the range [0.7, 1.3].

    Returns:
        dict of symbol -> score. Symbols absent from the history, or with
        fewer than 30 closes, are left out. Empty when nothing is selected
        or no history comes back.
    """
    tickers = [ticker for ticker, _ in self.selected_by_market_cap]
    if len(tickers) == 0:
        return {}

    # Get 30 days of history for all stocks
    bars = self.History(tickers, 30, Resolution.Daily)
    if bars.empty:
        return {}

    listed = bars.index.get_level_values(0)
    scores = {}
    for ticker in tickers:
        if ticker not in listed:
            continue
        closes = bars.loc[ticker]['close']
        if len(closes) < 30:
            continue
        # Change from the first to the last fetched close
        change = closes.iloc[-1] / closes.iloc[0] - 1
        # Centre on 1.0; a 15% move reaches either clamp
        scaled = 1 + (change * 2)
        scores[ticker] = min(1.3, max(0.7, scaled))
    return scores
def _evaluateInverseETFs(self, market_deviation, market_trend, max_allocation):
    """Pick the inverse ETF(s) with the best recent momentum and size them.

    Args:
        market_deviation: Fractional gap between the SPY price and its
            average (negative when SPY is below it).
        market_trend: Recent fractional SPY trend.
        max_allocation: Portfolio fraction available for inverse ETFs.

    Returns:
        dict mapping every symbol in ``inverse_etfs`` to a portfolio
        fraction; 0 for ETFs that were not selected. All zeros when the
        market is strongly bullish, no history is returned, or no ETF has a
        momentum score above -0.5%.
    """
    allocations = {symbol: 0 for symbol in self.inverse_etfs}
    # Only skip in very strong bull markets
    if market_deviation > 0.04 and market_trend > 0.02:
        return allocations

    # 45 bars: 30 for the longest momentum leg plus some slack
    history = self.History(self.inverse_etfs, 45, Resolution.Daily)
    if history.empty:
        return allocations

    available = history.index.get_level_values(0)
    momentum_scores = {}
    volatility_scores = {}
    for symbol in self.inverse_etfs:
        if symbol not in available:
            continue
        prices = history.loc[symbol]['close']
        if len(prices) < 30:
            continue
        last = prices.iloc[-1]
        # Multiple timeframe momentum, recent performance weighted most
        mom_7d = last / prices.iloc[-7] - 1
        mom_15d = last / prices.iloc[-15] - 1
        # Anchored 30 bars back. iloc[0] would span the whole 45-bar
        # request and no longer match the "30d" leg.
        mom_30d = last / prices.iloc[-30] - 1
        momentum = (mom_7d * 0.5) + (mom_15d * 0.3) + (mom_30d * 0.2)

        # Volatility of the MOST RECENT 20 daily returns (previously the
        # oldest 20 returns of the request were used).
        recent = prices.iloc[-21:].to_numpy(dtype=float)
        returns = recent[1:] / recent[:-1] - 1
        volatility = float(np.std(returns)) if returns.size else 0

        # Short-term acceleration: latest 5-bar change minus the previous one
        recent_5d_change = last / prices.iloc[-5] - 1
        prev_5d_change = prices.iloc[-6] / prices.iloc[-10] - 1
        acceleration = recent_5d_change - prev_5d_change

        momentum_scores[symbol] = momentum + (acceleration * 0.5)
        volatility_scores[symbol] = volatility

    # Accept anything from roughly flat momentum upward
    positive_momentum_etfs = {s: score for s, score in momentum_scores.items() if score > -0.005}
    if not positive_momentum_etfs:
        self.Debug("No inverse ETFs showing acceptable momentum - keeping as cash")
        return allocations

    # Rank by momentum penalised by volatility; keep the raw score for sizing
    best_candidates = [
        (symbol, score, score - (volatility_scores.get(symbol, 1.0) * 0.5))
        for symbol, score in positive_momentum_etfs.items()
    ]
    best_candidates.sort(key=lambda x: x[2], reverse=True)

    # Share of the budget to deploy, by depth of the drawdown
    if market_deviation < -0.05:
        allocation_pct = 1.0
    elif market_deviation < -0.03:
        allocation_pct = 0.8
    elif market_deviation < -0.01:
        allocation_pct = 0.6
    elif market_deviation < 0.01:
        allocation_pct = 0.4
    else:
        allocation_pct = 0.2

    # No candidates or market conditions don't justify allocation
    if not best_candidates or allocation_pct < 0.1:
        return allocations

    # Use two ETFs in stronger downtrends, otherwise one
    num_etfs = 1
    if market_deviation < -0.04 and len(best_candidates) > 1:
        num_etfs = 2

    remaining_allocation = max_allocation * allocation_pct
    for symbol, raw_score, _ in best_candidates[:num_etfs]:
        # Scale with momentum strength, never below 30% of the per-ETF share
        etf_weight = min(1.0, max(0.3, raw_score * 3)) if raw_score > 0 else 0.3
        etf_allocation = remaining_allocation * etf_weight / num_etfs
        # Only allocate if it's a meaningful amount (at least 1%)
        if etf_allocation >= 0.01:
            allocations[symbol] = etf_allocation
            self.Debug(f"Selected inverse ETF {symbol} with momentum {raw_score:.2%}, allocating {etf_allocation*100:.2f}%")
    return allocations
def _runDefensiveETFDiagnostics(self, market_deviation, market_trend):
    """Log recent performance of every defensive ETF next to SPY.

    Requests 90 daily bars for ``all_defensive`` plus SPY, then writes one
    market summary line followed by one line per ETF that has at least 30
    closes. Nothing is returned and no state is changed.

    Args:
        market_deviation: Fractional market deviation, echoed in the log.
        market_trend: Fractional market trend, echoed in the log.
    """
    bars = self.History(self.all_defensive + [self.spy], 90, Resolution.Daily)
    if bars.empty:
        return
    listed = bars.index.get_level_values(0)

    # SPY benchmark returns; left empty when SPY has too little history
    benchmark = {}
    if self.spy in listed:
        spy_closes = bars.loc[self.spy]['close']
        if len(spy_closes) >= 30:
            spy_last = spy_closes.iloc[-1]
            benchmark = {
                "7d": spy_last / spy_closes.iloc[-7] - 1,
                "15d": spy_last / spy_closes.iloc[-15] - 1,
                "30d": spy_last / spy_closes.iloc[-30] - 1,
            }

    # Log market conditions
    self.Debug(f"DIAGNOSTIC - Market: Deviation {market_deviation*100:.2f}%, " +
               f"Trend {market_trend*100:.2f}%, SPY 30d: {benchmark.get('30d', 0)*100:.2f}%")

    for etf in self.all_defensive:
        if etf not in listed:
            continue
        closes = bars.loc[etf]['close']
        if len(closes) < 30:
            continue
        last = closes.iloc[-1]
        # Multiple timeframe performance
        perf_7d = last / closes.iloc[-7] - 1
        perf_15d = last / closes.iloc[-15] - 1
        perf_30d = last / closes.iloc[-30] - 1
        # Acceleration: latest 5-bar change minus the one before it
        recent_5d = last / closes.iloc[-5] - 1
        prev_5d = closes.iloc[-6] / closes.iloc[-10] - 1
        accel = recent_5d - prev_5d
        # Only the 30d relative figure is reported; 0 without a benchmark
        rel_30d = perf_30d - benchmark["30d"] if "30d" in benchmark else 0
        self.Debug(f" {etf}: 7d: {perf_7d*100:.2f}%, 15d: {perf_15d*100:.2f}%, " +
                   f"30d: {perf_30d*100:.2f}%, Accel: {accel*100:.2f}%, " +
                   f"Rel30d: {rel_30d*100:.2f}%")
def _evaluateDefensiveETFs(self, market_deviation, market_trend, max_allocation):
    """Score all defensive ETFs and split part of ``max_allocation`` among the best.

    ETFs are scored group by group (``inverse_etfs``,
    ``alternative_defensive``, ``sector_defensive``) from their 5/10/30-bar
    returns and, where used, their returns relative to SPY.

    Args:
        market_deviation: Fractional gap between the SPY price and its
            average (negative when SPY is below it).
        market_trend: Recent fractional SPY trend.
        max_allocation: Portfolio fraction available for defensive ETFs.

    Returns:
        dict mapping every symbol in ``all_defensive`` to a portfolio
        fraction; 0 for ETFs that were not selected. The fractions never add
        up to more than ``max_allocation``.
    """
    allocations = {symbol: 0 for symbol in self.all_defensive}
    # Skip if market is very bullish
    if market_deviation > 0.04 and market_trend > 0.02:
        return allocations

    # Get history for all defensive options and SPY
    history = self.History(self.all_defensive + [self.spy], 60, Resolution.Daily)
    if history.empty:
        return allocations
    available = history.index.get_level_values(0)

    def period_returns(prices):
        """Return the 5/10/20/30-bar returns of a close series (>= 30 bars)."""
        last = prices.iloc[-1]
        return {f"{bars}d": last / prices.iloc[-bars] - 1 for bars in (5, 10, 20, 30)}

    def group_of(symbol):
        """Label a symbol with the name of its ETF group for log lines."""
        if symbol in self.inverse_etfs:
            return "Inverse"
        if symbol in self.alternative_defensive:
            return "Alternative"
        return "Sector"

    self.Debug(f"DEFENSIVE ETF PERFORMANCE DETAILS:")
    # SPY returns for relative comparisons; stays empty when SPY is missing
    # from the history or has fewer than 30 bars.
    spy_perf = {}
    if self.spy in available:
        spy_prices = history.loc[self.spy]['close']
        if len(spy_prices) >= 30:
            spy_perf = period_returns(spy_prices)
            self.Debug(f" SPY: 5d: {spy_perf['5d']*100:.1f}%, 10d: {spy_perf['10d']*100:.1f}%, " +
                       f"20d: {spy_perf['20d']*100:.1f}%, 30d: {spy_perf['30d']*100:.1f}%")

    etf_scores = {}
    for group_name, group in [("Inverse", self.inverse_etfs),
                              ("Alternative", self.alternative_defensive),
                              ("Sector", self.sector_defensive)]:
        self.Debug(f" {group_name} ETFs:")
        for symbol in group:
            if symbol not in available:
                continue
            prices = history.loc[symbol]['close']
            if len(prices) < 30:
                continue
            perf = period_returns(prices)
            # Without a SPY benchmark the relative figures are neutral (0).
            # Previously rel_perf stayed empty and the lookups below raised
            # KeyError.
            rel_perf = {
                period: (value - spy_perf[period]) if period in spy_perf else 0.0
                for period, value in perf.items()
            }
            self.Debug(f" {symbol}: 5d: {perf['5d']*100:.1f}% (rel: {rel_perf['5d']*100:+.1f}%), " +
                       f"10d: {perf['10d']*100:.1f}% (rel: {rel_perf['10d']*100:+.1f}%), " +
                       f"30d: {perf['30d']*100:.1f}% (rel: {rel_perf['30d']*100:+.1f}%)")

            if symbol in self.inverse_etfs:
                if market_deviation < -0.02:
                    # In downtrends, rising inverse ETFs are good
                    score = (perf["5d"] * 0.4) + (perf["10d"] * 0.4) + (perf["30d"] * 0.2)
                    # Bonus for relative outperformance
                    score += (rel_perf["5d"] + rel_perf["10d"]) * 0.15
                else:
                    # Less emphasis on long-term performance in neutral markets
                    score = (perf["5d"] * 0.6) + (perf["10d"] * 0.3) + (perf["30d"] * 0.1)
            elif symbol in self.alternative_defensive:
                # Bonds/gold style assets: absolute return first
                score = (perf["5d"] * 0.3) + (perf["10d"] * 0.4) + (perf["30d"] * 0.3)
                if market_deviation < -0.03:
                    score += rel_perf["10d"] * 0.2  # Bonus for outperformance
            else:
                # Sector ETFs: blend absolute and relative performance
                abs_score = (perf["5d"] * 0.3) + (perf["10d"] * 0.3) + (perf["30d"] * 0.4)
                rel_score = (rel_perf["5d"] * 0.3) + (rel_perf["10d"] * 0.3) + (rel_perf["30d"] * 0.4)
                if market_deviation < -0.02:
                    # In downtrends, relative outperformance is more important
                    score = (abs_score * 0.4) + (rel_score * 0.6)
                else:
                    # In neutral markets, absolute performance matters more
                    score = (abs_score * 0.6) + (rel_score * 0.4)
            etf_scores[symbol] = score

    # Minimum score; more permissive in stronger downturns
    threshold = -0.01 if market_deviation < -0.03 else -0.007
    candidates = {s: score for s, score in etf_scores.items() if score > threshold}
    if not candidates:
        self.Debug("No defensive ETFs showed sufficient momentum - keeping as cash")
        return allocations

    # Sort and log candidate scores
    sorted_candidates = sorted(candidates.items(), key=lambda x: x[1], reverse=True)
    self.Debug(f"Top 5 defensive candidates:")
    for symbol, score in sorted_candidates[:5]:
        self.Debug(f" {symbol} ({group_of(symbol)}): Score {score*100:.2f}%")

    # Share of the budget to deploy, by market condition and trend
    if market_deviation < -0.05 or market_trend < -0.04:
        allocation_pct = 0.95  # Almost all available allocation
    elif market_deviation < -0.03 or market_trend < -0.02:
        allocation_pct = 0.8
    elif market_deviation < -0.01 or market_trend < -0.01:
        allocation_pct = 0.6
    else:
        allocation_pct = 0.4

    # Scale by the strength of the best candidate (between 50% and 100%)
    best_score = sorted_candidates[0][1]
    allocation_pct *= min(1.0, max(0.5, (best_score + 0.02) * 4))

    # Use two ETFs in stronger downtrends, otherwise one
    num_etfs = 1
    if (market_deviation < -0.04 or market_trend < -0.03) and len(sorted_candidates) > 1:
        num_etfs = 2

    remaining_allocation = max_allocation * allocation_pct
    selected = sorted_candidates[:num_etfs]
    # Candidates may have slightly negative scores. Those are clipped to 0
    # for weighting; otherwise one weight could exceed 1 and push the total
    # above the budget.
    positive_total = sum(max(score, 0.0) for _, score in selected)
    if positive_total > 0:
        for symbol, score in selected:
            weight = max(score, 0.0) / positive_total
            etf_allocation = remaining_allocation * weight
            # Only allocate if meaningful (2% minimum)
            if etf_allocation >= 0.02:
                allocations[symbol] = etf_allocation
                self.Debug(f"Selected {group_of(symbol)} ETF {symbol} with score {score*100:.2f}%, allocating {etf_allocation*100:.2f}%")
    return allocations
# region imports
from AlgorithmImports import *
# endregion
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data import *
from QuantConnect.Indicators import *
from datetime import timedelta
import numpy as np
import pandas as pd
import torch
import os
import torch.nn as nn
from sklearn.preprocessing import RobustScaler
class KQTStrategy:
def __init__(self):
    """Create a strategy with no model loaded and default settings."""
    # Model and preprocessing state
    self.model = None
    self.scalers = dict()       # looked up by ticker in prepare_stock_data
    self.feature_cols = list()  # filled on the first prepare_stock_data call
    self.lookback = 30          # prepare_stock_data needs lookback + 5 rows

    # Lookup tables, empty until populated elsewhere
    self.stock_to_id = dict()
    self.sector_mappings = dict()

    # Signal and risk state at its starting values
    self.adaptive_threshold = 0.2
    self.pred_std = 1.0
    self.current_regime = "neutral"
    self.defensive_mode = False
    self.portfolio_returns = list()
    self.previous_day_hit_stops = list()
def create_sliding_sequences(self, df, feature_cols, lookback, stride=1):
    """Stack overlapping row windows of ``df`` into a single float32 array.

    Args:
        df: Frame holding the feature columns.
        feature_cols: Columns to keep, in output order.
        lookback: Number of consecutive rows per window.
        stride: Step between the starts of successive windows.

    Returns:
        ``np.ndarray`` with one window per entry along the first axis; an
        empty 1-D array when ``df`` has fewer than ``lookback`` rows.
    """
    # Convert once, then cut the windows out of the converted matrix
    matrix = df[feature_cols].values.astype(np.float32)
    last_start = len(df) - lookback
    windows = [
        matrix[start:start + lookback]
        for start in range(0, last_start + 1, stride)
    ]
    return np.array(windows)
def clip_outliers(self, df, cols, lower=0.01, upper=0.99):
    """Return a copy of ``df`` with the listed columns limited to a quantile band.

    Args:
        df: Input frame; it is not modified.
        cols: Columns to limit; names missing from ``df`` are ignored.
        lower: Quantile used as the floor.
        upper: Quantile used as the ceiling.

    Returns:
        A new DataFrame where values below the floor or above the ceiling
        are replaced by the floor/ceiling of their column.
    """
    clipped = df.copy()
    present = [name for name in cols if name in clipped.columns]
    for name in present:
        floor = clipped[name].quantile(lower)
        ceiling = clipped[name].quantile(upper)
        too_low = clipped[name] < floor
        clipped.loc[too_low, name] = floor
        too_high = clipped[name] > ceiling
        clipped.loc[too_high, name] = ceiling
    return clipped
def filter_features_to_match_model(self, df, feature_cols, required_count=5):
    """Reduce ``feature_cols`` to at most ``required_count`` names by priority.

    Priority: every ``return_lag`` column, then the technical features in a
    fixed order, then whatever is left in its original order.

    Args:
        df: Not used by the selection.
        feature_cols: Candidate feature names.
        required_count: Number of features wanted.

    Returns:
        ``feature_cols`` itself when it already has the wanted length;
        otherwise a new list with up to ``required_count`` names (fewer if
        there are not enough candidates).
    """
    if len(feature_cols) == required_count:
        return feature_cols

    # Lag returns come first (most important)
    chosen = [name for name in feature_cols if 'return_lag' in name]

    # Then the technical features, in this fixed order
    tech_priority = ['roc_5', 'volatility_10', 'ma_cross', 'dist_ma20', 'momentum_1m',
                     'oversold', 'overbought', 'roc_diff', 'volatility_regime']
    for name in tech_priority:
        if len(chosen) >= required_count:
            break
        if name in feature_cols:
            chosen.append(name)

    # Top up with the remaining columns in their original order
    leftovers = [name for name in feature_cols if name not in chosen]
    missing = required_count - len(chosen)
    if missing > 0:
        chosen.extend(leftovers[:missing])

    # If too many, truncate
    return chosen[:required_count]
def add_technical_features(self, df):
    """Add moving-average, volatility and rate-of-change columns from ``Close``.

    The columns are written into ``df`` itself; the value returned is a copy
    of it with every NaN replaced by 0. A frame without a ``Close`` column
    is returned unchanged.

    Args:
        df: Price frame, expected to contain a ``Close`` column.

    Returns:
        The NaN-free frame with the added columns ``ma5``, ``ma20``,
        ``ma_cross``, ``volatility_10``, ``volatility_ratio``, ``roc_5``,
        ``roc_10``, ``roc_diff`` and ``dist_ma20``.
    """
    if 'Close' not in df.columns:
        return df

    close = df['Close']
    daily_return = close.pct_change()
    sma_20 = close.rolling(20).mean()

    # Moving averages expressed relative to the current price
    df['ma5'] = close.rolling(5).mean() / close - 1
    df['ma20'] = sma_20 / close - 1
    df['ma_cross'] = df['ma5'] - df['ma20']  # crossover signal

    # Volatility of daily returns and its short/long ratio
    df['volatility_10'] = daily_return.rolling(10).std()
    df['volatility_ratio'] = daily_return.rolling(5).std() / daily_return.rolling(20).std()

    # Rate of change over 5 and 10 rows, and their difference
    df['roc_5'] = close.pct_change(5)
    df['roc_10'] = close.pct_change(10)
    df['roc_diff'] = df['roc_5'] - df['roc_10']

    # Distance of the price from its 20-row average
    df['dist_ma20'] = close / sma_20 - 1
    return df.fillna(0)
def add_enhanced_features(self, df):
    """Add volatility, volume, momentum and mean-reversion columns to ``df``.

    Reads ``Close``, ``volatility_10``, ``roc_5``, ``dist_ma20`` and
    ``ma_cross``, so those columns must already be present. The volume
    features are only added when a ``volume`` column exists.

    Args:
        df: Frame to extend; it is modified in place.

    Returns:
        The same ``df`` object with the new columns (NaNs are not filled).
    """
    volatility = df['volatility_10']
    close = df['Close']

    # Volatility: 5-row change and an above-average flag
    df['volatility_trend'] = volatility.pct_change(5)
    df['volatility_regime'] = (volatility > volatility.rolling(20).mean()).astype(int)

    # Volume relative to its 20-row mean, combined with short-term ROC
    if 'volume' in df.columns:
        df['vol_ma_ratio'] = df['volume'] / df['volume'].rolling(20).mean()
        df['vol_price_trend'] = df['vol_ma_ratio'] * df['roc_5']

    # Momentum over 20 and 60 rows, and the share of horizons that are up
    df['momentum_1m'] = close.pct_change(20)
    df['momentum_3m'] = close.pct_change(60)
    horizons_up = sum(
        (df[name] > 0).astype(int)
        for name in ('roc_5', 'momentum_1m', 'momentum_3m')
    )
    df['momentum_breadth'] = horizons_up / 3

    # Mean reversion: distance from the average scaled by volatility, with
    # flags for stretches beyond two volatility units
    band = 2 * volatility
    df['mean_rev_signal'] = -1 * df['dist_ma20'] * volatility
    df['oversold'] = (df['dist_ma20'] < -band).astype(int)
    df['overbought'] = (df['dist_ma20'] > band).astype(int)

    # 1 on rows where the sign of the MA crossover differs from the row before
    cross_sign = np.sign(df['ma_cross'])
    df['regime_change'] = (cross_sign != cross_sign.shift(1)).astype(int)

    # The small constant keeps the ratio finite when volatility is 0
    df['risk_adj_momentum'] = df['roc_5'] / (volatility + 0.001)
    return df
def prepare_stock_data(self, stock_data, ticker, is_training=False):
    """Prepare scaled model-input sequences for a single stock.

    Args:
        stock_data: Price history exposing ``close`` and ``time`` columns
            and, optionally, ``volume``.
        ticker: Key under which the fitted scaler is cached in
            ``self.scalers``.
        is_training: When true a new scaler is fitted even if one is
            already cached for ``ticker``.

    Returns:
        ``(None, None)`` when ``stock_data`` has fewer than
        ``self.lookback + 5`` rows; ``(None, stock_df)`` when no usable
        rows/features remain or no sequence could be built; otherwise
        ``(X, stock_df)`` where ``X`` is the output of
        ``self.create_sliding_sequences`` and ``stock_df`` the processed
        frame.
    """

    def _standardize(frame, columns):
        """Z-score ``columns`` of ``frame`` in place; zero-variance columns become 0."""
        for col in columns:
            if col not in frame.columns or len(frame[col]) == 0:
                continue
            mean = frame[col].mean()
            std = frame[col].std()
            if std > 0:
                frame[col] = (frame[col] - mean) / std
            else:
                frame[col] = 0

    if len(stock_data) < self.lookback + 5:  # Need enough data
        return None, None

    stock_df = pd.DataFrame({
        'Close': stock_data['close'].values,
        'time': stock_data['time'].values,
    })
    if 'volume' in stock_data.columns:
        stock_df['volume'] = stock_data['volume'].values
    stock_df = stock_df.sort_values('time').reset_index(drop=True)

    # Next-bar percentage return; the lag features below shift it back by
    # at least one bar, so they only hold returns that are already known.
    stock_df['pct_return'] = stock_df['Close'].pct_change().shift(-1) * 100

    feature_cols = []
    # Basic lag features.
    for i in range(1, 6):
        col_name = f'return_lag{i}'
        stock_df[col_name] = stock_df['pct_return'].shift(i)
        feature_cols.append(col_name)

    # Technical features.
    stock_df = self.add_technical_features(stock_df)
    stock_df = self.add_enhanced_features(stock_df)

    # Candidate features; only those actually present are offered.
    additional_features = ['ma_cross', 'volatility_10', 'roc_5', 'roc_diff', 'dist_ma20']
    enhanced_features = ['volatility_trend', 'volatility_regime', 'momentum_1m',
                         'momentum_breadth', 'mean_rev_signal', 'oversold',
                         'overbought', 'regime_change', 'risk_adj_momentum']
    feature_cols.extend(
        col for col in additional_features + enhanced_features
        if col in stock_df.columns
    )

    # Filter to the exact number of features expected by the model.
    model_feature_count = 5
    feature_cols = self.filter_features_to_match_model(stock_df, feature_cols, model_feature_count)
    if not self.feature_cols:
        # Remember the first selection made.
        self.feature_cols = feature_cols.copy()

    stock_df = stock_df.dropna().reset_index(drop=True)
    # Handle outliers.
    stock_df = self.clip_outliers(stock_df, feature_cols)

    # Nothing to scale or predict from. Checked before either scaling path
    # so a cached scaler is never asked to transform an empty frame (which
    # would only fail and log a misleading transform error).
    if len(stock_df) == 0 or len(feature_cols) == 0:
        return None, stock_df

    if ticker not in self.scalers or is_training:
        # Fit a fresh scaler; any NaN left in the features is zeroed first.
        stock_df[feature_cols] = stock_df[feature_cols].fillna(0)
        try:
            scaler = RobustScaler()
            stock_df[feature_cols] = scaler.fit_transform(stock_df[feature_cols])
            self.scalers[ticker] = scaler
        except Exception as e:
            print(f"Scaling error for {ticker}: {str(e)}")
            # Use a simple standardization as fallback.
            _standardize(stock_df, feature_cols)
    else:
        # Use the scaler cached for this ticker.
        scaler = self.scalers[ticker]
        try:
            stock_df[feature_cols] = scaler.transform(stock_df[feature_cols])
        except Exception as e:
            print(f"Transform error for {ticker}: {str(e)}")
            # Simple standardization fallback.
            _standardize(stock_df, feature_cols)

    # Create sequences for prediction.
    X = self.create_sliding_sequences(stock_df, feature_cols, self.lookback, stride=1)
    if len(X) == 0:
        return None, stock_df
    return X, stock_df
# Add to strategy.py in KQTStrategy class
def calculate_portfolio_risk_score(self, market_returns):
    """Score current market conditions to scale overall exposure.

    Starts from a neutral 50 and applies penalties for a volatility
    spike, recent down days and a deteriorating trend, plus a bonus for
    an accelerating uptrend.

    Args:
        market_returns: Sequence of market returns, most recent last.

    Returns:
        The score clamped to the range 10..100.
    """
    n_obs = len(market_returns)
    score = 50  # Neutral starting point

    # Short-window volatility versus a longer window (annualized).
    if n_obs >= 5:
        annualizer = np.sqrt(252)
        short_vol = np.std(market_returns[-5:]) * annualizer
        if n_obs >= 10:
            long_vol = np.std(market_returns[-10:]) * annualizer
        else:
            long_vol = short_vol
        spike = short_vol / long_vol if long_vol > 0 else 1
        if spike > 1.5:  # Sharp volatility increase
            score -= 30
        elif spike > 1.2:
            score -= 15

    # Down days among the last three observations.
    if n_obs >= 3:
        down_days = sum(1 for value in market_returns[-3:] if value < 0)
        score -= {3: 20, 2: 10}.get(down_days, 0)

    # Latest five-observation average versus the five before it.
    if n_obs >= 10:
        latest_avg = np.mean(market_returns[-5:])
        drift = latest_avg - np.mean(market_returns[-10:-5])
        if drift < -0.3:  # Declining trend
            score -= 15
        elif drift > 0.3 and latest_avg > 0:  # Accelerating uptrend
            score += 10

    return max(10, min(100, score))
def predict_returns(self, X, ticker):
    """Run the model on one stock's sequences and return the last prediction.

    Args:
        X: Batch of input sequences convertible to a float32 tensor.
        ticker: Stock identifier; it is assigned the next free integer id
            in ``self.stock_to_id`` the first time it is seen.

    Returns:
        The final element of the flattened model output as a Python
        float, or 0 when no model is loaded or the prediction fails.
    """
    if self.model is None:
        return 0

    # Reuse the ticker's id, registering a new one on first sight.
    stock_id = self.stock_to_id.setdefault(ticker, len(self.stock_to_id))

    try:
        features = torch.tensor(X, dtype=torch.float32)
        id_batch = torch.tensor([stock_id] * len(X), dtype=torch.long)
        with torch.no_grad():
            outputs = self.model(features, id_batch)
        # Convert to standard Python float for safety.
        last_value = outputs.detach().numpy().flatten()[-1]
        return float(last_value)
    except Exception as e:
        print(f"Prediction error for {ticker}: {e}")
        return 0  # Neutral prediction on error
def detect_market_regime(self, daily_returns, lookback=10):
    """Classify the current regime from market and portfolio returns.

    Args:
        daily_returns: Returns whose mean is used as the market return.
        lookback: Maximum number of trailing entries of
            ``self.portfolio_returns`` averaged for the recent return.

    Returns:
        One of the regime labels used by the strategy ("neutral",
        "bullish", "bearish", "bullish_strong", "bullish_pullback",
        "bearish_high_vol", "bearish_low_vol", "breakout_bullish",
        "breakdown_bearish").
    """
    if len(daily_returns) < 1:
        return "neutral"

    market_return = np.mean(daily_returns)
    history = self.portfolio_returns
    n_hist = len(history)

    if n_hist >= 3:
        avg_recent_return = np.mean(history[-min(lookback, n_hist):])

        # With five or more observations, look for a sharp change between
        # the last three returns and the ones just before them.
        if n_hist >= 5:
            newest = np.mean(history[-3:])
            previous = np.mean(history[-min(8, n_hist):-3])
            trend_change = newest - previous
            if trend_change > 0.5 and avg_recent_return > 0.2:
                return "breakout_bullish"
            if trend_change < -0.5 and avg_recent_return < -0.2:
                return "breakdown_bearish"

        if avg_recent_return > 0.15:
            return "bullish_strong" if market_return > 0 else "bullish_pullback"
        if avg_recent_return < -0.3:
            return "bearish_high_vol" if market_return < -0.2 else "bearish_low_vol"
        if avg_recent_return > 0 and market_return > 0:
            return "bullish"
        if avg_recent_return < 0 and market_return < 0:
            return "bearish"

    # No portfolio-based verdict: decide on the market return alone.
    return "neutral" if market_return > -0.05 else "bearish"
def detect_bearish_signals(self, recent_returns):
    """Detect early warning signs of bearish conditions.

    Only ``self.portfolio_returns`` is inspected; ``recent_returns`` is
    accepted for interface compatibility and is not read.

    Three checks are made: more losing than winning days in the last
    five, a rise of more than 30% in volatility versus the preceding
    five returns, and a negative latest return straight after a gain
    above 0.3.

    Args:
        recent_returns: Unused.

    Returns:
        ``(bearish_signals, signal_strength)``: the number of checks that
        fired and their accumulated strength.
    """
    history = self.portfolio_returns
    bearish_signals = 0
    signal_strength = 0

    if len(history) < 5:
        # Every check needs at least five observations.
        return bearish_signals, signal_strength

    last_five = history[-5:]

    # Losing days outnumber winning days.
    pos_days = sum(1 for r in last_five if r > 0)
    neg_days = sum(1 for r in last_five if r < 0)
    if neg_days > pos_days:
        bearish_signals += 1
        signal_strength += 0.2 * (neg_days - pos_days)

    # Volatility expansion against the previous five returns.
    if len(history) >= 10:
        recent_vol = np.std(last_five)
        older_vol = np.std(history[-10:-5])
        if recent_vol > older_vol * 1.3:  # 30% volatility increase
            bearish_signals += 1
            # A perfectly flat baseline (older_vol == 0) has no defined
            # relative increase; dividing would yield an infinite
            # strength, so the signal is counted without a magnitude.
            if older_vol > 0:
                signal_strength += 0.3 * (recent_vol / older_vol - 1)

    # Reversal: a loss immediately after a sizeable gain.
    if history[-1] < 0 and history[-2] > 0.3:
        bearish_signals += 1
        signal_strength += 0.3

    return bearish_signals, signal_strength
def generate_positions(self, prediction_data, current_returns=None):
    """Generate position sizes from per-ticker predictions.

    When ``current_returns`` is given, ``self.current_regime`` and
    ``self.defensive_mode`` are refreshed first. ``self.adaptive_threshold``
    is always updated from ``self.pred_std`` and the current regime.

    Args:
        prediction_data: Mapping ``ticker -> {"pred_return": value, ...}``.
        current_returns: Optional sequence of recent market returns (a
            list or a numpy array).

    Returns:
        Mapping ``ticker -> signed portfolio weight``. Long weights come
        from the best-predicted stocks of the best-ranked sectors;
        negative weights are hedges added only in defensive conditions.
        The absolute weights are scaled to sum to at most 0.95 and each
        one is capped individually. Empty when ``prediction_data`` is
        empty.
    """
    if not prediction_data:
        return {}

    # Update market regime.
    if current_returns is not None:
        self.current_regime = self.detect_market_regime(current_returns)
        bearish_count, bearish_strength = self.detect_bearish_signals(current_returns)
        self.defensive_mode = bearish_count >= 2 or bearish_strength > 0.5

    # Portfolio risk score converted to a scaling factor. The explicit
    # None test matters: truth-testing a multi-element numpy array raises
    # ValueError.
    returns_for_risk = [] if current_returns is None else current_returns
    portfolio_risk_score = self.calculate_portfolio_risk_score(returns_for_risk)
    risk_scaling = portfolio_risk_score / 100

    # Regime-dependent signal threshold.
    threshold_multipliers = {
        "bullish_strong": 0.4,
        "breakout_bullish": 0.4,
        "bearish_high_vol": 2.5,
        "breakdown_bearish": 2.5,
        "bearish": 1.6,
        "bearish_low_vol": 1.6,
        "bullish_pullback": 0.9,
    }
    base_threshold = 0.25 * self.pred_std
    # Neutral and any unlisted regime use 0.75.
    self.adaptive_threshold = base_threshold * threshold_multipliers.get(self.current_regime, 0.75)

    # Group stocks by sector (insertion order is kept so ties sort stably).
    sector_data = {}
    for ticker, data in prediction_data.items():
        sector = self.sector_mappings.get(ticker, "Unknown")
        sector_data.setdefault(sector, []).append({
            "ticker": ticker,
            "pred_return": data["pred_return"],
        })

    # Rank sectors by mean predicted return.
    sector_avg_scores = {
        sector: np.mean([s["pred_return"] for s in stocks])
        for sector, stocks in sector_data.items()
    }
    ranked_sectors = sorted(sector_avg_scores, key=sector_avg_scores.get, reverse=True)

    # NOTE(review): calculate_portfolio_risk_score as written never exceeds
    # 60, so the three-sector branch looks unreachable; confirm whether
    # `>= 60` was intended before changing it.
    top_sector_count = 3 if portfolio_risk_score > 60 else 2
    top_sectors = ranked_sectors[:top_sector_count]

    # Allow more stocks per sector in bull markets.
    bullish_regimes = ("bullish_strong", "breakout_bullish")
    stocks_per_sector = 3 if self.current_regime in bullish_regimes else 2

    # With zero spread across predictions there is no relative signal, so
    # every pick falls back to the minimum base size instead of dividing
    # by zero.
    signal_unit = 0.2 * self.pred_std

    positions = {}
    for sector in top_sectors:
        ranked_stocks = sorted(sector_data[sector], key=lambda s: s["pred_return"], reverse=True)
        for stock in ranked_stocks[:stocks_per_sector]:
            if signal_unit > 0:
                signal_strength = stock["pred_return"] / signal_unit
            else:
                signal_strength = 0.0
            # Size proportional to signal strength, bounded to [0.05, 0.3],
            # then scaled by portfolio risk.
            base_size = min(0.3, max(0.05, 0.15 * signal_strength))
            positions[stock["ticker"]] = base_size * risk_scaling

    # Defensive adjustments.
    bearish_regimes = ("bearish_high_vol", "bearish_low_vol", "breakdown_bearish")
    if self.defensive_mode or self.current_regime in bearish_regimes:
        # 1. Reduce overall position sizes.
        scaling_factor = 0.5 if self.defensive_mode else 0.7
        for ticker in positions:
            positions[ticker] *= scaling_factor

        # 2. Add short hedges, only when the risk score is low.
        if positions and portfolio_risk_score < 40:
            negative_preds = {
                t: data["pred_return"]
                for t, data in prediction_data.items()
                if data["pred_return"] < 0 and t not in positions
            }
            hedge_size = -0.15 if self.defensive_mode else -0.1
            # Short the two most negative predictions.
            for ticker, _pred in sorted(negative_preds.items(), key=lambda kv: kv[1])[:2]:
                positions[ticker] = hedge_size

    # Per-position cap derived from the number of long picks.
    total_position_count = sum(
        min(stocks_per_sector, len(sector_data.get(sector, [])))
        for sector in top_sectors
    )
    max_position_size = min(0.3, max(0.05, 0.95 / max(total_position_count, 1)))

    # Keep gross exposure at or below 0.95.
    total_allocation = sum(abs(size) for size in positions.values())
    if total_allocation > 0.95:
        scaling_factor = 0.95 / total_allocation
        for ticker in positions:
            positions[ticker] *= scaling_factor

    # Ensure no single position is too large, keeping its sign.
    for ticker, size in list(positions.items()):
        if abs(size) > max_position_size:
            positions[ticker] = max_position_size if size > 0 else -max_position_size

    return positions
def get_stop_loss_level(self):
    """Return the stop-loss level for the current regime and defensive flag.

    Returns:
        A negative number: -1.5 in the high-volatility/breakdown bearish
        regimes, -2.0 or -3.5 in the strong/breakout bullish regimes and
        -1.8 or -2.5 otherwise, the tighter value of each pair applying
        while ``self.defensive_mode`` is set.
    """
    regime = self.current_regime

    # Tighter stop-loss in bearish regimes, regardless of defensive mode.
    if regime in ("bearish_high_vol", "breakdown_bearish"):
        return -1.5

    # Bullish regimes give positions more room unless defensive.
    if regime in ("bullish_strong", "breakout_bullish"):
        return -2.0 if self.defensive_mode else -3.5

    return -1.8 if self.defensive_mode else -2.5
def update_portfolio_returns(self, daily_return):
    """Record a new portfolio return, keeping only the latest 60.

    Args:
        daily_return: Return to append to ``self.portfolio_returns``.
    """
    window = 60  # Rolling window length
    history = self.portfolio_returns
    history.append(daily_return)
    surplus = len(history) - window
    if surplus > 0:
        # Rebind to a fresh list holding only the newest entries.
        self.portfolio_returns = history[surplus:]
def update_model_calibration(self, all_predictions):
    """Update the prediction standard deviation used for threshold calibration.

    ``self.pred_std`` is replaced by the standard deviation of the given
    predictions when there are more than five of them and that spread is
    a positive, finite number. Otherwise the previous calibration is
    kept: a zero spread (all predictions identical) would collapse every
    quantity scaled by ``self.pred_std``.

    Args:
        all_predictions: Mapping whose values are the numeric predictions.
    """
    values = list(all_predictions.values())
    if len(values) <= 5:
        return

    spread = np.std(values)
    if np.isfinite(spread) and spread > 0:
        self.pred_std = spread