| Overall Statistics |
|
Total Orders 6498 Average Win 0.26% Average Loss -0.18% Compounding Annual Return 29.921% Drawdown 20.500% Expectancy 0.260 Start Equity 1000000 End Equity 4567006.29 Net Profit 356.701% Sharpe Ratio 1.255 Sortino Ratio 1.478 Probabilistic Sharpe Ratio 81.510% Loss Rate 47% Win Rate 53% Profit-Loss Ratio 1.39 Alpha 0.143 Beta 0.386 Annual Standard Deviation 0.148 Annual Variance 0.022 Information Ratio 0.46 Tracking Error 0.168 Treynor Ratio 0.48 Total Fees $116266.71 Estimated Strategy Capacity $0 Lowest Capacity Asset DIS R735QTJ8XC9X Portfolio Turnover 33.06% |
# region imports
from AlgorithmImports import *
# endregion
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data import *
from QuantConnect.Indicators import *
# --- Add explicit import ---
from QuantConnect.Algorithm.Framework.Portfolio import PortfolioTarget
# ---
from datetime import timedelta
import numpy as np
import pandas as pd
# Assuming KQTStrategy is correctly defined in strategy.py
from strategy import KQTStrategy
class KQTStrategyModule:
def __init__(self, algorithm):
self.algorithm = algorithm
self.strategy = KQTStrategy()
self.lookback = 60
self.tickers = []
self.symbols = {}
self.sector_mappings = {}
self.strategy.sector_mappings = self.sector_mappings # Share the dictionary
# Add SPY for market data (used within KQT logic)
self.spy = self.algorithm.AddEquity("SPY", Resolution.Daily).Symbol
# Storage for historical data and predictions
self.stock_data = {}
self.current_predictions = {}
self.previous_positions = {}
self.previous_portfolio_value = 0
# Track stopped out positions
self.stopped_out = set()
def Initialize(self):
"""Initialize specific settings for KQT mode"""
self.algorithm.SetBenchmark("SPY") # KQT uses SPY benchmark
# Universe selection is specific to KQT
self._universe = self.algorithm.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
# Schedule the trading function
self.trade_schedule = self.algorithm.Schedule.On(self.algorithm.DateRules.EveryDay(),
self.algorithm.TimeRules.At(10, 0), # 10:00 AM Eastern
self.TradeExecute)
self.previous_portfolio_value = self.algorithm.Portfolio.TotalPortfolioValue # Initialize for return calc
def Activate(self):
"""Actions to take when this strategy becomes active."""
self.algorithm.Log("Activating KQT Strategy Module")
# Ensure universe is active
if not self._universe:
self._universe = self.algorithm.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
# Ensure schedule is active (QC schedule API doesn't have enable/disable, rely on flag in TradeExecute)
self.previous_portfolio_value = self.algorithm.Portfolio.TotalPortfolioValue # Reset portfolio value baseline
def Deactivate(self):
"""Actions to take when this strategy becomes inactive."""
self.algorithm.Log("Deactivating KQT Strategy Module")
# Remove universe specific to KQT? Or keep symbols? Keep for now.
# self.algorithm.RemoveUniverse(self._universe) # Be careful if symbols are shared
# self._universe = None
# Clear internal state if necessary
self.current_predictions = {}
self.previous_positions = {}
self.stock_data = {}
self.stopped_out.clear()
def CoarseSelectionFunction(self, coarse):
# Use algorithm time
if self.algorithm.Time.hour == 9 and self.algorithm.Time.minute == 30: # Avoid excessive logging
self.algorithm.Log(f"KQT Coarse Selection: {len(coarse)} symbols")
sorted_by_dollar_volume = sorted(coarse, key=lambda x: x.DollarVolume, reverse=True)
return [x.Symbol for x in sorted_by_dollar_volume[:500]]
def FineSelectionFunction(self, fine):
# Use algorithm time
if self.algorithm.Time.hour == 9 and self.algorithm.Time.minute == 30: # Avoid excessive logging
self.algorithm.Log(f"KQT Fine Selection: {len(fine)} symbols")
sorted_by_market_cap = sorted(fine, key=lambda x: x.MarketCap, reverse=True)
selected = sorted_by_market_cap[:100]
current_symbols = set(self.symbols.keys())
new_symbols = set()
for f in selected:
ticker = f.Symbol.Value
new_symbols.add(ticker)
if ticker not in self.symbols:
self.symbols[ticker] = f.Symbol # Store symbol object
# Try multiple ways to get sector information
sector = "Unknown"
try:
if hasattr(f, 'AssetClassification') and f.AssetClassification is not None:
if hasattr(f.AssetClassification, 'MorningstarSectorCode'): sector = str(f.AssetClassification.MorningstarSectorCode)
elif hasattr(f.AssetClassification, 'GicsSectorCode'): sector = str(f.AssetClassification.GicsSectorCode) # Common alternative
elif hasattr(f.AssetClassification, 'Sector'): sector = f.AssetClassification.Sector
elif hasattr(f.AssetClassification, 'Industry'): sector = f.AssetClassification.Industry # Fallback
except Exception as e:
self.algorithm.Debug(f"Error getting sector for {ticker}: {str(e)}")
self.sector_mappings[ticker] = sector
# Update tickers list based on fine selection
self.tickers = [ticker for ticker in self.tickers if ticker in new_symbols]
for ticker in new_symbols:
if ticker not in self.tickers:
self.tickers.append(ticker)
# Clean up symbols/mappings for removed tickers from fine selection
removed_symbols = current_symbols - new_symbols
for ticker in removed_symbols:
if ticker in self.symbols: del self.symbols[ticker]
if ticker in self.sector_mappings: del self.sector_mappings[ticker]
if ticker in self.stock_data: del self.stock_data[ticker]
if ticker in self.tickers: self.tickers.remove(ticker)
return [self.symbols[ticker] for ticker in self.tickers]
def OnSecuritiesChanged(self, changes):
# This might be redundant if FineSelectionFunction handles updates,
# but good for explicitly handling removals reported by QC.
self.algorithm.Log(f"KQT OnSecuritiesChanged: Added {len(changes.AddedSecurities)}, Removed {len(changes.RemovedSecurities)}")
# Additions are handled by FineSelection
for removed in changes.RemovedSecurities:
ticker = removed.Symbol.Value
if ticker in self.tickers: self.tickers.remove(ticker)
if ticker in self.symbols: del self.symbols[ticker]
if ticker in self.sector_mappings: del self.sector_mappings[ticker]
if ticker in self.stock_data: del self.stock_data[ticker]
# Liquidate position if removed from universe
if self.algorithm.Portfolio[removed.Symbol].Invested:
self.algorithm.Log(f"Liquidating {ticker} due to removal from KQT universe.")
self.algorithm.Liquidate(removed.Symbol)
def OnData(self, data):
"""Handle daily data updates if necessary (primary logic is in TradeExecute)."""
pass
def TradeExecute(self):
"""Execute trading logic daily before market close"""
# Check if this strategy is active in the main algorithm
if not self.algorithm.is_kqt_active:
# self.algorithm.Debug("KQT TradeExecute skipped: Not active.") # Keep this commented unless debugging activation issues
return
# Market Open Check
if not self.algorithm.Securities.ContainsKey(self.spy) or not self.algorithm.Securities[self.spy].Exchange.ExchangeOpen:
# self.algorithm.Debug(f"KQT Skipping TradeExecute: Market closed or SPY not ready. Time: {self.algorithm.Time}") # Keep this commented unless debugging market open issues
return
# --- ADDED LOG ---
self.algorithm.Log(f"--> KQT TradeExecute STARTING on {self.algorithm.Time}. Active: {self.algorithm.is_kqt_active}. Universe size: {len(self.tickers)}")
# 1. Update historical data
self.UpdateHistoricalData()
# --- ADDED LOG ---
self.algorithm.Log(f"--> KQT Updated historical data. {len(self.stock_data)} stocks with data.")
# 2. Generate predictions (now always uses fallback)
self.current_predictions = self.GeneratePredictions()
# --- ADDED LOG ---
self.algorithm.Log(f"--> KQT Generated {len(self.current_predictions)} predictions: {self.current_predictions}") # Log generated predictions
# 3. Check for stop losses
self.ProcessStopLosses()
# --- ADDED LOG ---
self.algorithm.Log(f"--> KQT Processed stop losses. Stopped out: {self.stopped_out}")
# 4. Generate new position sizes
market_returns = self.GetMarketReturns()
# --- ADDED LOG ---
self.algorithm.Log(f"--> KQT Market returns for position gen: {market_returns}") # Log market returns input
target_positions = self.strategy.generate_positions(self.current_predictions, market_returns, algorithm=self.algorithm) # Pass self.algorithm for logging
# --- Logging moved inside generate_positions ---
# self.algorithm.Log(f"KQT Target positions from strategy: {target_positions}")
# 5. Execute trades
self.ExecuteTrades(target_positions)
# --- ADDED LOG ---
self.algorithm.Log(f"--> KQT Executed trades.")
# 6. Update portfolio return for regime detection
daily_return = self.CalculatePortfolioReturn()
self.strategy.update_portfolio_returns(daily_return)
# --- ADDED LOG ---
self.algorithm.Log(f"--> KQT Updated portfolio returns. Daily return: {daily_return:.4f}")
# 7. Store today's value for tomorrow's calculation
self.previous_portfolio_value = self.algorithm.Portfolio.TotalPortfolioValue
# --- ADDED LOG ---
self.algorithm.Log(f"--> KQT TradeExecute FINISHED on {self.algorithm.Time}.")
def CalculatePortfolioReturn(self):
"""Calculate today's portfolio return"""
current_value = self.algorithm.Portfolio.TotalPortfolioValue
if self.previous_portfolio_value > 0:
daily_return = (current_value / self.previous_portfolio_value - 1) * 100
else:
daily_return = 0
return daily_return
def UpdateHistoricalData(self):
"""Fetch and update historical data for all symbols"""
active_tickers = list(self.symbols.keys()) # Use currently selected symbols
if not active_tickers:
self.algorithm.Log("KQT UpdateHistoricalData: No active tickers.")
self.stock_data = {}
return
symbols_to_request = [self.symbols[ticker] for ticker in active_tickers]
history = self.algorithm.History(symbols_to_request, self.lookback + 5, Resolution.Daily) # Get slightly more for indicators
if history.empty:
self.algorithm.Log("KQT UpdateHistoricalData: History request returned empty.")
self.stock_data = {}
return
history = history.reset_index() # This creates 'time' and 'symbol' columns
for ticker in active_tickers:
symbol_obj = self.symbols[ticker]
# Filter history for the specific symbol
symbol_history = history[history['symbol'] == symbol_obj]
if symbol_history.empty or len(symbol_history) < self.lookback:
if ticker in self.stock_data: del self.stock_data[ticker] # Remove old data if insufficient now
continue
self.stock_data[ticker] = symbol_history
def GetMarketReturns(self):
"""Get recent market returns for regime detection"""
spy_history = self.algorithm.History(self.spy, 15, Resolution.Daily) # Need ~10 returns
if spy_history.empty or len(spy_history) < 2:
return []
spy_prices = spy_history.loc[self.spy]['close'] if self.spy in spy_history.index else pd.Series()
if len(spy_prices) < 2: return []
spy_returns = spy_prices.pct_change().dropna() * 100
return spy_returns.tolist()[-10:] # Return last 10 days
def GeneratePredictions(self):
"""Generate predictions for all stocks using ONLY the momentum fallback logic."""
predictions = {}
# Use Debug for potentially verbose logs
self.algorithm.Debug(f"KQT Generating fallback predictions for {len(self.stock_data)} stocks.")
for ticker, history_df in self.stock_data.items():
if ticker not in self.symbols: continue # Ensure symbol exists
try:
if history_df.empty or len(history_df) < 20: # Need at least 20 for fallback MA
self.algorithm.Debug(f"KQT Skipping {ticker}: Not enough history ({len(history_df)}) for fallback.")
continue
# --- Fallback Momentum Logic ---
closes = history_df['close'].values
if len(closes) > 20:
short_ma = np.mean(closes[-5:])
long_ma = np.mean(closes[-20:])
momentum = closes[-1] / closes[-10] - 1 if len(closes) > 10 else 0
pred_score = momentum + 0.5 * (short_ma/long_ma - 1) if long_ma != 0 else momentum
pred_return = pred_score * 2 # Keep scaling from original fallback
# Store the prediction
threshold = 0.1 # Example: Use a fixed threshold for fallback scores
predictions[ticker] = {
"pred_return": pred_return,
"composite_score": pred_return / threshold if threshold != 0 else pred_return
}
else:
self.algorithm.Debug(f"KQT Fallback skipped for {ticker}: Not enough close prices ({len(closes)}).")
continue
# --- End Fallback Logic ---
except Exception as e:
self.algorithm.Log(f"KQT Error processing {ticker} in GeneratePredictions: {str(e)}")
import traceback
self.algorithm.Log(traceback.format_exc())
continue
return predictions
def ProcessStopLosses(self):
"""Check and process stop loss orders"""
stop_loss_level = self.strategy.get_stop_loss_level()
self.stopped_out.clear() # Reset daily
for ticker in list(self.symbols.keys()): # Iterate over current universe
symbol = self.symbols[ticker]
if not self.algorithm.Portfolio[symbol].Invested:
continue
position = self.algorithm.Portfolio[symbol]
# Use history to get daily return
history = self.algorithm.History(symbol, 2, Resolution.Daily)
if history.empty or len(history) < 2: continue
close_prices = history.loc[symbol]['close'] if symbol in history.index else pd.Series()
if len(close_prices) < 2: continue
daily_return = (close_prices.iloc[-1] / close_prices.iloc[-2] - 1) * 100
position_type = "long" if position.Quantity > 0 else "short"
hit_stop = False
if position_type == "long" and daily_return < stop_loss_level:
hit_stop = True
self.algorithm.Log(f"KQT Stop loss triggered for {ticker} (long): {daily_return:.2f}% < {stop_loss_level:.2f}%")
elif position_type == "short" and daily_return > abs(stop_loss_level): # Stop loss for shorts is positive return
hit_stop = True
self.algorithm.Log(f"KQT Stop loss triggered for {ticker} (short): {daily_return:.2f}% > {abs(stop_loss_level):.2f}%")
if hit_stop:
self.stopped_out.add(ticker)
self.algorithm.Liquidate(symbol, f"KQT Stop Loss {daily_return:.2f}%")
def ExecuteTrades(self, target_positions):
"""Execute trades to reach target positions using CalculateOrderQuantity and MarketOrder"""
self.algorithm.Log(f"--- KQT ExecuteTrades START ---") # Mark start clearly
portfolio_value = self.algorithm.Portfolio.TotalPortfolioValue
# --- Implement Liquidation Logic for Empty Targets ---
if not target_positions:
self.algorithm.Log("KQT: No target positions received. Liquidating existing KQT assets.")
liquidated_count = 0
# Iterate through symbols managed by KQT
for ticker, symbol in self.symbols.items():
if self.algorithm.Portfolio[symbol].Invested:
if ticker not in self.stopped_out:
self.algorithm.Log(f"KQT: Liquidating {ticker} ({symbol.ID}) due to empty target list.")
# --- Use Liquidate ---
self.algorithm.Liquidate(symbol, "KQT Empty Target")
liquidated_count += 1
else:
self.algorithm.Log(f"KQT: Skipping liquidation for {ticker} (empty target, but stopped out).")
self.algorithm.Log(f"KQT: Liquidated {liquidated_count} assets due to empty target list.")
self.algorithm.Log(f"--- KQT ExecuteTrades END (No Targets) ---")
return
# --- End Liquidation Logic ---
self.algorithm.Log(f"KQT Received {len(target_positions)} target positions: {target_positions}")
if portfolio_value <= 0:
self.algorithm.Log("KQT ExecuteTrades: Zero or negative portfolio value.")
self.algorithm.Log(f"--- KQT ExecuteTrades END (Zero Value) ---")
return
# Log portfolio state BEFORE execution
holdings_before = {kvp.Key.Value: kvp.Value.Quantity for kvp in self.algorithm.Portfolio if kvp.Value.Invested}
self.algorithm.Log(f"Portfolio holdings BEFORE KQT execution: {holdings_before}")
# Scale positions
total_allocation = sum(abs(weight) for weight in target_positions.values())
# --- Adjust Max Allocation Cap ---
max_allowed_allocation = 0.8 # Increased from 0.7, matching original KQT cap
# ---
if total_allocation > max_allowed_allocation:
scaling_factor = max_allowed_allocation / total_allocation
self.algorithm.Log(f"KQT: Scaling positions by {scaling_factor:.3f} to meet max allocation {max_allowed_allocation*100}%. Original total: {total_allocation:.3f}")
scaled_targets = {ticker: weight * scaling_factor for ticker, weight in target_positions.items()}
else:
scaled_targets = target_positions.copy() # Use a copy
self.algorithm.Log(f"KQT Scaled target weights: {scaled_targets}")
processed_symbols = set()
# --- Use CalculateOrderQuantity and MarketOrder ---
# Execute trades for target positions
for ticker, target_weight in scaled_targets.items():
if ticker in self.stopped_out:
self.algorithm.Log(f"KQT: Skipping trade for {ticker}, recently stopped out.")
continue
if ticker not in self.symbols:
self.algorithm.Log(f"KQT: Skipping trade for {ticker}, not in current universe symbols map.")
continue
symbol = self.symbols[ticker]
processed_symbols.add(symbol) # Track symbols targeted by KQT
try:
target_weight_float = float(target_weight)
except Exception as cast_e:
self.algorithm.Error(f"Could not cast target_weight '{target_weight}' to float for {ticker}: {cast_e}")
continue
if not np.isfinite(target_weight_float):
self.algorithm.Error(f"Invalid target weight for {ticker}: {target_weight_float}. Skipping.")
continue
# --- Minimum weight check ---
min_abs_weight = 0.0001
if abs(target_weight_float) < min_abs_weight:
self.algorithm.Log(f"Target weight for {ticker} ({target_weight_float:.6f}) is below minimum {min_abs_weight}.")
# If weight is near zero, ensure position is closed if currently held
if self.algorithm.Portfolio[symbol].Invested:
self.algorithm.Log(f" Liquidating {ticker} due to near-zero target weight.")
# --- Use Liquidate ---
self.algorithm.Liquidate(symbol, "KQT Near-Zero Target")
continue
# ---
# Calculate the order quantity
quantity = self.algorithm.CalculateOrderQuantity(symbol, target_weight_float)
self.algorithm.Log(f"Calculated quantity for {ticker} (Target: {target_weight_float:.4f}): {quantity}")
if quantity != 0:
# Place the market order
self.algorithm.Log(f"Placing MarketOrder for {ticker}, Quantity: {quantity}")
try:
order_ticket = self.algorithm.MarketOrder(symbol, quantity)
if order_ticket.Status == OrderStatus.Invalid:
self.algorithm.Error(f"MarketOrder failed for {ticker}, Quantity: {quantity}. Reason: {order_ticket.GetErrorMessage()}")
else:
self.algorithm.Log(f" Order submitted for {ticker}: ID {order_ticket.OrderId}, Status {order_ticket.Status}")
except Exception as order_e:
self.algorithm.Error(f"Exception placing MarketOrder for {ticker}, Quantity: {quantity}: {order_e}")
else:
# If quantity is 0, but we hold the asset, liquidate it (target weight is non-zero but rounds to 0 shares)
if self.algorithm.Portfolio[symbol].Invested:
self.algorithm.Log(f"Calculated quantity is 0 for {ticker}, but position is held. Liquidating.")
# --- Use Liquidate ---
self.algorithm.Liquidate(symbol, "KQT Zero Quantity Target")
# else: # Optional log if quantity is 0 and not held
# self.algorithm.Log(f"Calculated quantity is 0 for {ticker}, no position held.")
# Liquidate KQT-managed positions no longer in target (and not stopped out)
self.algorithm.Log(f"Checking for KQT assets to liquidate (not in scaled targets: {list(scaled_targets.keys())})")
for kqt_ticker, kqt_symbol in self.symbols.items():
# Check if the symbol was processed OR if it's invested but shouldn't be
if kqt_symbol not in processed_symbols and self.algorithm.Portfolio[kqt_symbol].Invested:
# This KQT asset is held but wasn't in the target_positions
if kqt_ticker not in self.stopped_out:
self.algorithm.Log(f"KQT: Liquidating {kqt_ticker} ({kqt_symbol.ID}) as it's no longer targeted by KQT.")
# --- Use Liquidate ---
self.algorithm.Liquidate(kqt_symbol, "KQT No Longer Targeted")
else:
self.algorithm.Log(f"KQT: {kqt_ticker} not targeted, but skipping liquidation (recently stopped out).")
# Log portfolio state AFTER execution
holdings_after = {kvp.Key.Value: kvp.Value.Quantity for kvp in self.algorithm.Portfolio if kvp.Value.Invested}
self.algorithm.Log(f"Portfolio holdings AFTER KQT execution: {holdings_after}")
self.algorithm.Log(f"--- KQT ExecuteTrades END ---")
# region imports
from AlgorithmImports import *
# endregion
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data import *
from QuantConnect.Indicators import *
from datetime import timedelta
import numpy as np
import pandas as pd
from sklearn.preprocessing import RobustScaler
class KQTStrategy:
def __init__(self):
self.lookback = 30
self.scalers = {}
self.feature_cols = []
self.stock_to_id = {}
self.sector_mappings = {}
self.adaptive_threshold = 0.1
self.pred_std = 1.0
self.current_regime = "neutral"
self.portfolio_returns = []
self.defensive_mode = False
self.previous_day_hit_stops = []
self.algorithm = None
def calculate_portfolio_risk_score(self, market_returns):
"""Calculate a portfolio risk score (0-100) to scale overall exposure"""
risk_score = 50 # Neutral starting point
# VIX-like volatility measurement using SPY returns
if len(market_returns) >= 5:
recent_vol = np.std(market_returns[-5:]) * np.sqrt(252) # Annualized
longer_vol = np.std(market_returns[-10:]) * np.sqrt(252) if len(market_returns) >= 10 else recent_vol
# Volatility spike detection
vol_ratio = recent_vol / longer_vol if longer_vol > 0 else 1
if vol_ratio > 1.5: # Sharp volatility increase
risk_score -= 30
elif vol_ratio > 1.2:
risk_score -= 15
# Consecutive negative days
if len(market_returns) >= 3:
neg_days = sum(1 for r in market_returns[-3:] if r < 0)
if neg_days == 3: # Three consecutive down days
risk_score -= 20
elif neg_days == 2:
risk_score -= 10
# Trend direction
if len(market_returns) >= 10:
avg_recent = np.mean(market_returns[-5:])
avg_older = np.mean(market_returns[-10:-5])
trend_change = avg_recent - avg_older
# Declining trend
if trend_change < -0.3:
risk_score -= 15
# Accelerating uptrend
elif trend_change > 0.3 and avg_recent > 0:
risk_score += 10
return max(10, min(100, risk_score)) # Constrain between 10-100
def detect_market_regime(self, daily_returns, lookback=10):
"""Detect current market regime based on portfolio returns"""
if len(daily_returns) >= 1:
market_return = np.mean(daily_returns)
market_vol = np.std(daily_returns)
if len(self.portfolio_returns) >= 3:
recent_returns = self.portfolio_returns[-min(lookback, len(self.portfolio_returns)):]
avg_recent_return = np.mean(recent_returns)
if len(self.portfolio_returns) >= 5:
very_recent = np.mean(self.portfolio_returns[-3:])
less_recent = np.mean(self.portfolio_returns[-min(8, len(self.portfolio_returns)):-3])
trend_change = very_recent - less_recent
if trend_change > 0.5 and avg_recent_return > 0.2:
return "breakout_bullish"
elif trend_change < -0.5 and avg_recent_return < -0.2:
return "breakdown_bearish"
if avg_recent_return > 0.15:
if market_return > 0:
return "bullish_strong"
else:
return "bullish_pullback"
elif avg_recent_return < -0.3:
if market_return < -0.2:
return "bearish_high_vol"
else:
return "bearish_low_vol"
elif avg_recent_return > 0 and market_return > 0:
return "bullish"
elif avg_recent_return < 0 and market_return < 0:
return "bearish"
if market_return > -0.05:
return "neutral"
else:
return "bearish"
return "neutral"
def detect_bearish_signals(self, recent_returns):
"""Detect early warning signs of bearish conditions"""
bearish_signals = 0
signal_strength = 0
if len(self.portfolio_returns) >= 5:
recent_portfolio_returns = self.portfolio_returns[-5:]
pos_days = sum(1 for r in recent_portfolio_returns if r > 0)
neg_days = sum(1 for r in recent_portfolio_returns if r < 0)
if neg_days > pos_days:
bearish_signals += 1
signal_strength += 0.2 * (neg_days - pos_days)
if len(self.portfolio_returns) >= 10:
recent_vol = np.std(self.portfolio_returns[-5:])
older_vol = np.std(self.portfolio_returns[-10:-5])
if recent_vol > older_vol * 1.3: # 30% volatility increase
bearish_signals += 1
signal_strength += 0.3 * (recent_vol/older_vol - 1)
if len(self.portfolio_returns) >= 5:
if self.portfolio_returns[-1] < 0 and self.portfolio_returns[-2] > 0.3:
bearish_signals += 1
signal_strength += 0.3
return bearish_signals, signal_strength
def generate_positions(self, prediction_data, current_returns=None, algorithm=None): # Add algorithm parameter
"""Generate position sizing based on predictions with improved diversification"""
# Store algorithm instance for logging
if algorithm:
self.algorithm = algorithm
else:
# Fallback if algorithm instance isn't passed (should not happen from module)
print("Warning: Algorithm instance not provided to generate_positions for logging.")
log_func = print
log_func = self.algorithm.Log if self.algorithm else print # Use Log for important info
# --- Logging Start ---
log_func(f"--- generate_positions ---")
log_func(f"Input predictions count: {len(prediction_data)}") # Log count instead of full dict initially
# self.algorithm.Debug(f"Input predictions data: {prediction_data}") # Use Debug for potentially large dict
log_func(f"Input market returns: {current_returns}")
# --- Logging End ---
if not prediction_data:
log_func("generate_positions: No prediction data provided.")
return {}
# Update market regime
if current_returns is not None and len(current_returns) > 0:
self.current_regime = self.detect_market_regime(current_returns)
bearish_count, bearish_strength = self.detect_bearish_signals(current_returns)
self.defensive_mode = bearish_count >= 2 or bearish_strength > 0.5
else:
# Default if no returns provided
self.current_regime = "neutral"
self.defensive_mode = False
# --- Define Bullish Regimes and Tech Sector ---
bullish_regimes = {"bullish_strong", "breakout_bullish", "bullish", "bullish_pullback"}
is_bullish = self.current_regime in bullish_regimes
# !!! IMPORTANT: Verify this identifier matches your actual sector data (e.g., GICS code '45') !!!
TECH_SECTOR_IDENTIFIER = '45'
tech_boost_factor = 1.15 # Apply a 15% boost in bullish regimes
# ---
# Calculate portfolio risk score (0-100)
portfolio_risk_score = self.calculate_portfolio_risk_score(current_returns if current_returns else [])
# Convert to a scaling factor (0.1 to 1.0)
risk_scaling = portfolio_risk_score / 100
# --- INCREASE MIN RISK SCALING FLOOR ---
min_risk_scaling = 0.75 # Increased from 0.4 to 0.75 (ensures at least 75% of potential allocation is used)
# ---
risk_scaling = max(min_risk_scaling, risk_scaling)
# ---
# --- Logging ---
log_func(f"Regime: {self.current_regime}, Defensive Mode: {self.defensive_mode}")
log_func(f"Portfolio Risk Score: {portfolio_risk_score}, Risk Scaling (min {min_risk_scaling}): {risk_scaling:.2f}")
# --- Logging End ---
# Adjust threshold based on regime (using the fixed default threshold now)
base_threshold = self.adaptive_threshold # Use the fixed threshold from __init__
current_threshold = base_threshold # Keep it simple for now, regime adjustment might need tuning for fallback scores
# --- Logging ---
log_func(f"Using Threshold: {current_threshold}")
# --- Logging End ---
positions = {}
# Group stocks by sector
sector_data = {}
valid_predictions = 0
for ticker, data in prediction_data.items():
# Ensure data has the expected keys
if "pred_return" not in data:
log_func(f"Warning: Missing 'pred_return' for {ticker}")
continue
pred_return = data["pred_return"]
sector = self.sector_mappings.get(ticker, "Unknown")
# --- Apply Tech Boost in Bullish Regime ---
boost_applied = False
if is_bullish and sector == TECH_SECTOR_IDENTIFIER:
original_pred = pred_return
pred_return *= tech_boost_factor
boost_applied = True
# Optional Debug Log:
# self.algorithm.Debug(f"Applied {tech_boost_factor}x boost to {ticker} (Tech) in {self.current_regime} regime. Original: {original_pred:.4f}, Boosted: {pred_return:.4f}")
# ---
if sector not in sector_data:
sector_data[sector] = []
sector_data[sector].append({
"ticker": ticker,
"pred_return": pred_return, # Use potentially boosted value
# Use the current_threshold for composite score
"composite_score": pred_return / current_threshold if current_threshold != 0 else pred_return
})
valid_predictions += 1
# --- ADDED LOG ---
log_func(f"Found {valid_predictions} valid predictions.")
# ---
if valid_predictions == 0:
log_func("generate_positions: No valid predictions after filtering.")
return {}
# Rank sectors by average predicted return
sector_avg_scores = {}
for sector, stocks in sector_data.items():
if stocks: # Ensure sector has stocks
sector_avg_scores[sector] = np.mean([s["pred_return"] for s in stocks])
else:
sector_avg_scores[sector] = -np.inf # Penalize empty sectors
ranked_sectors = sorted(sector_avg_scores.keys(), key=lambda x: sector_avg_scores[x], reverse=True)
# --- Reduce Sector Count ---
top_sector_count = 4 if portfolio_risk_score > 60 else 3 # Reduced from 5/4
# ---
top_sectors = ranked_sectors[:min(top_sector_count, len(ranked_sectors))]
# --- Logging ---
log_func(f"Ranked Sectors: {ranked_sectors}")
log_func(f"Top Sectors Selected ({top_sector_count}): {top_sectors}")
# --- Logging End ---
# --- Reduce Stocks Per Sector ---
stocks_per_sector = 3 if self.current_regime in ["bullish_strong", "breakout_bullish"] else 2 # Reduced from 4/3
# ---
# Allocate within top sectors
selected_stocks_for_positioning = []
for sector in top_sectors:
if sector not in sector_data: continue # Skip if sector somehow has no data
sector_stocks = sorted(sector_data[sector], key=lambda x: x["pred_return"], reverse=True)
top_stocks_in_sector = sector_stocks[:min(stocks_per_sector, len(sector_stocks))]
selected_stocks_for_positioning.extend(top_stocks_in_sector)
# --- Logging ---
log_func(f"Sector '{sector}': Top stocks {[s['ticker'] for s in top_stocks_in_sector]} with scores {[f'{s:.3f}' for s in [st['pred_return'] for st in top_stocks_in_sector]]}")
# --- Logging End ---
# --- Log count before filtering ---
log_func(f"Selected {len(selected_stocks_for_positioning)} stocks across top sectors before size filtering.")
# ---
# Calculate position sizes for selected stocks
log_func(f"Calculating positions for selected stocks.") # Log count
for stock in selected_stocks_for_positioning:
ticker = stock["ticker"]
# Use pred_return directly for signal strength with fallback scores
signal_strength = stock["pred_return"]
# --- Adjust Base Size Calculation & Filter (Less Aggressive) ---
# Decreased multiplier
# Kept max base size high (0.6) - allows concentration if signal is strong
# Increased filter threshold
base_size_multiplier = 1.2 # Decreased from 1.5 to 1.2
max_base_size = 0.5 # Decreased from 0.6 to 0.5
min_base_size_threshold = 0.02 # Increased from 0.05 to 0.06
base_size = min(max_base_size, max(0.01, base_size_multiplier * signal_strength))
# --- Hysteresis Check (Optional) ---
# entry_threshold_multiplier = 1.1 # Require 10% higher base size to enter than to stay
# previously_held = ticker in self.algorithm.kqt_previous_positions and self.algorithm.kqt_previous_positions[ticker] > 0
# required_threshold = min_base_size_threshold if previously_held else min_base_size_threshold * entry_threshold_multiplier
# if base_size > required_threshold:
# --- Original Check (No Hysteresis) ---
if base_size > min_base_size_threshold: # Use the increased threshold
# ---
final_size = base_size * risk_scaling
# --- Increase minimum final size threshold ---
min_final_size = 0.015 # Increased from 0.04 to 0.055 (5.5%)
if final_size >= min_final_size:
positions[ticker] = final_size
# --- Logging ---
self.algorithm.Debug(f" Ticker: {ticker}, Signal: {signal_strength:.3f}, Base Size: {base_size:.3f}, Final Size: {final_size:.3f}")
# --- Logging End ---
else:
self.algorithm.Debug(f" Ticker: {ticker}, Signal: {signal_strength:.3f}, Base Size: {base_size:.3f}, Final Size ({final_size:.3f}) too small after risk scaling (Min: {min_final_size}), skipping.")
else:
self.algorithm.Debug(f" Ticker: {ticker}, Signal: {signal_strength:.3f}, Base Size ({base_size:.3f}) too small or negative (Threshold: {min_base_size_threshold}), skipping.")
# Defensive adjustments
if self.defensive_mode or self.current_regime in ["bearish_high_vol", "bearish_low_vol", "breakdown_bearish"]:
# --- Soften Defensive Scaling ---
scaling_factor = 0.9 if self.defensive_mode else 0.99 # Increased from 0.7/0.85
# ---
log_func(f"Defensive Adjustment: Scaling positions by {scaling_factor}")
for ticker in list(positions.keys()): # Iterate over keys copy
positions[ticker] *= scaling_factor
# Use the increased min_final_size as the post-scaling check too
# --- Use the SAME min_final_size threshold after scaling ---
if positions[ticker] < min_final_size: # Check against the 4% threshold again
log_func(f" Removing {ticker} due to small size ({positions[ticker]:.4f}) after defensive scaling (Min: {min_final_size}).")
del positions[ticker]
# --- Temporarily Disable Hedges ---
# Add hedges (shorts) based on negative predictions
# if portfolio_risk_score < 40:
# negative_preds = {t: data["pred_return"] for t, data in prediction_data.items()
# if "pred_return" in data and data["pred_return"] < -0.05 and t not in positions} # Threshold for shorting
#
# if negative_preds:
# worst_stocks = sorted(negative_preds.items(), key=lambda x: x[1])[:2]
# log_func(f"Defensive Adjustment: Adding Hedges for {worst_stocks}")
# for ticker, pred in worst_stocks:
# hedge_size = -0.15 if self.defensive_mode else -0.1
# positions[ticker] = hedge_size
# log_func(f" Adding hedge {ticker} with size {hedge_size}")
# ---
# --- Logging Final ---
log_func(f"Final positions generated ({len(positions)}): {positions}")
log_func(f"--- generate_positions END ---")
# --- Logging End ---
return positions
def get_stop_loss_level(self):
"""Get appropriate stop-loss level based on market regime"""
if self.current_regime in ["bullish_strong", "breakout_bullish"]:
if self.defensive_mode:
return -2.0 # Tighter in defensive mode
else:
return -3.5 # More room for positions to breathe
elif self.current_regime in ["bearish_high_vol", "breakdown_bearish"]:
return -1.5 # Tighter stop-loss in bearish regimes
else:
if self.defensive_mode:
return -1.8
else:
return -2.5
def update_portfolio_returns(self, daily_return):
"""Update portfolio return history"""
self.portfolio_returns.append(daily_return)
if len(self.portfolio_returns) > 60: # Keep a rolling window
self.portfolio_returns = self.portfolio_returns[-60:]
# region imports
from AlgorithmImports import *
# endregion
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data import *
from QuantConnect.Indicators import *
from datetime import timedelta, datetime
import numpy as np
import pandas as pd
import torch
import os
import torch.nn as nn
from strategy import KQTStrategy
def calculate_ema(prices, span):
return pd.Series(prices).ewm(span=span, adjust=False).mean().values
def calculate_rsi(prices, period=14):
deltas = np.diff(prices)
seed = deltas[:period+1]
up = seed[seed >= 0].sum()/period
down = -seed[seed < 0].sum()/period
if down == 0: return 100
rs = up/down
rsi = np.zeros_like(prices)
rsi[:period] = 100. - 100./(1. + rs)
for i in range(period, len(prices)):
delta = deltas[i-1]
if delta > 0:
upval = delta
downval = 0.
else:
upval = 0.
downval = -delta
up = (up * (period-1) + upval) / period
down = (down * (period-1) + downval) / period
rs = up/down if down != 0 else float('inf')
rsi[i] = 100. - 100./(1. + rs)
return rsi
def calculate_macd(prices, fast=12, slow=26, signal=9):
prices = np.array(prices)
ema_fast = pd.Series(prices).ewm(span=fast, adjust=False).mean().values
ema_slow = pd.Series(prices).ewm(span=slow, adjust=False).mean().values
macd_line = ema_fast - ema_slow
signal_line = pd.Series(macd_line).ewm(span=signal, adjust=False).mean().values
histogram = macd_line - signal_line
return macd_line, signal_line, histogram
def calculate_atr(high, low, close, period=14):
if len(high) != len(low) or len(high) != len(close):
raise ValueError("Input arrays must have the same length")
tr = np.zeros(len(high))
tr[0] = high[0] - low[0]
for i in range(1, len(tr)):
tr[i] = max(
high[i] - low[i],
abs(high[i] - close[i-1]),
abs(low[i] - close[i-1])
)
atr = np.zeros_like(tr)
atr[0] = tr[0]
for i in range(1, len(atr)):
atr[i] = (atr[i-1] * (period-1) + tr[i]) / period
return atr
class KQTAlgorithm(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2019, 1, 1)
self.SetEndDate(2024, 12, 31)
self.SetCash(1000000)
self.previous_portfolio_value = 0
self.current_strategy_mode = "KQT"
self.vix = self.AddIndex("VIX", Resolution.Daily).Symbol
self.vix_threshold = 30
self.SetBenchmark("SPY")
self.strategy = KQTStrategy()
self.kqt_lookback = 60
self.kqt_tickers = []
self.kqt_symbols = {}
self.kqt_sector_mappings = {}
self.strategy.sector_mappings = self.kqt_sector_mappings
self.kqt_stock_data = {}
self.kqt_current_predictions = {}
self.kqt_previous_positions = {}
self.kqt_stopped_out = set()
self.rc_spy = self.AddEquity("SPY", Resolution.Daily).Symbol
self.rc_bil = self.AddEquity("BIL", Resolution.Daily).Symbol
self.rc_selected_by_market_cap = []
self.rc_rebalance_flag = False
self.rc_spy_30day_window = RollingWindow[float](30)
self.rc_entry_prices = {}
self.rc_previous_bil_allocation = 0.0
self.rc_trend_lookback = 10
self.rc_spy_prices = {}
self.rc_max_spy_history = 60
self.rc_stop_loss_base = 0.04
self.rc_dynamic_stop_weight = 0.5
self.rc_atr_period = 14
self.rc_atr = {}
self.rc_defensive_positions = set()
self.rc_last_defensive_update = datetime(1900, 1, 1)
self.rc_last_rebalance_date = datetime(1900, 1, 1)
# Modified tolerances to reduce trading frequency
self.kqt_rebalance_tolerance = 0.02 # Increased from 0.01 to 2%
self.rc_rebalance_tolerance = 0.02 # Increased from 0.01 to 2%
# Add cooldown for mode switching
self.last_mode_switch_time = None
self.min_days_between_switches = 5
self.rc_sh = self.AddEquity("SH", Resolution.Daily).Symbol
self.rc_psq = self.AddEquity("PSQ", Resolution.Daily).Symbol
self.rc_dog = self.AddEquity("DOG", Resolution.Daily).Symbol
self.rc_rwm = self.AddEquity("RWM", Resolution.Daily).Symbol
self.rc_eum = self.AddEquity("EUM", Resolution.Daily).Symbol
self.rc_myd = self.AddEquity("MYY", Resolution.Daily).Symbol
self.rc_gld = self.AddEquity("GLD", Resolution.Daily).Symbol
self.rc_ief = self.AddEquity("IEF", Resolution.Daily).Symbol
self.rc_bnd = self.AddEquity("BND", Resolution.Daily).Symbol
self.rc_xlp = self.AddEquity("XLP", Resolution.Daily).Symbol
self.rc_xlu = self.AddEquity("XLU", Resolution.Daily).Symbol
self.rc_xlv = self.AddEquity("XLV", Resolution.Daily).Symbol
self.rc_vht = self.AddEquity("VHT", Resolution.Daily).Symbol
self.rc_vdc = self.AddEquity("VDC", Resolution.Daily).Symbol
self.rc_inverse_etfs = [self.rc_sh, self.rc_psq, self.rc_dog, self.rc_rwm, self.rc_eum, self.rc_myd]
self.rc_alternative_defensive = [self.rc_gld, self.rc_ief, self.rc_bnd]
self.rc_sector_defensive = [self.rc_xlp, self.rc_xlu, self.rc_xlv, self.rc_vht, self.rc_vdc]
self.rc_all_defensive = self.rc_inverse_etfs + self.rc_alternative_defensive + self.rc_sector_defensive
self.rc_diagnostic_mode = True
for symbol in self.rc_all_defensive + [self.rc_bil, self.rc_spy]:
self.rc_atr[symbol] = self.ATR(symbol, self.rc_atr_period, Resolution.Daily)
self.UniverseSettings.Resolution = Resolution.Daily
self._universe = self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(10, 0), self.TradeExecute)
self.Schedule.On(self.DateRules.MonthStart(self.rc_spy), self.TimeRules.AfterMarketOpen(self.rc_spy, 30), self.SetRebalanceFlag)
self.Schedule.On(self.DateRules.WeekStart(self.rc_spy, DayOfWeek.Wednesday), self.TimeRules.AfterMarketOpen(self.rc_spy, 30), self.MonthlyRebalance)
self.Schedule.On(self.DateRules.WeekStart(self.rc_spy, DayOfWeek.Monday), self.TimeRules.AfterMarketOpen(self.rc_spy, 60), self.WeeklyDefensiveAdjustment)
self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.AfterMarketOpen(self.vix, 5), self.CheckVixAndManageState)
self.spy = self.rc_spy
self.spy_sma50 = self.SMA(self.spy, 50, Resolution.Daily)
self.spy_sma200 = self.SMA(self.spy, 200, Resolution.Daily)
history_spy = self.History(self.spy, 200, Resolution.Daily)
if not history_spy.empty:
for time, row in history_spy.loc[self.spy].iterrows():
close_price = row["close"]
self.rc_spy_30day_window.Add(close_price)
self.spy_sma50.Update(time, close_price)
self.spy_sma200.Update(time, close_price)
self.TryLoadModelWeights()
def CheckVixAndManageState(self):
if not self.Securities.ContainsKey(self.vix) or not self.Securities[self.vix].HasData or \
not self.Securities.ContainsKey(self.spy) or not self.Securities[self.spy].HasData or \
not self.spy_sma50.IsReady or not self.spy_sma200.IsReady:
self.Log("Data not ready for state check (VIX, SPY, or SMAs).")
return
vix_value = self.Securities[self.vix].Price
spy_price = self.Securities[self.spy].Price
sma50_value = self.spy_sma50.Current.Value
sma200_value = self.spy_sma200.Current.Value
is_bearish_trend = spy_price < sma200_value or sma50_value < sma200_value
is_bullish_trend = spy_price > sma200_value and sma50_value > sma200_value
is_vix_high = vix_value > self.vix_threshold
is_vix_low = vix_value < 20
if self.current_strategy_mode == "KQT":
if is_vix_high and is_bearish_trend:
if self.last_mode_switch_time is None or (self.Time - self.last_mode_switch_time).days >= self.min_days_between_switches:
self.Log(f"Conditions met to ENTER RiskControl: VIX {vix_value:.2f} > {self.vix_threshold} AND Bearish Trend (SPY {spy_price:.2f} vs SMA200 {sma200_value:.2f}, SMA50 {sma50_value:.2f} vs SMA200 {sma200_value:.2f}).")
self.EnterRiskControlMode()
self.last_mode_switch_time = self.Time
else:
self.Log(f"Conditions met but too soon since last switch ({(self.Time - self.last_mode_switch_time).days} days).")
elif self.current_strategy_mode == "RiskControl":
if is_vix_low and is_bullish_trend:
if self.last_mode_switch_time is None or (self.Time - self.last_mode_switch_time).days >= self.min_days_between_switches:
self.Log(f"Conditions met to EXIT RiskControl: VIX {vix_value:.2f} < 20 AND Bullish Trend (SPY {spy_price:.2f} > SMA200 {sma200_value:.2f}, SMA50 {sma50_value:.2f} > SMA200 {sma200_value:.2f}).")
self.ExitRiskControlMode()
self.last_mode_switch_time = self.Time
else:
self.Log(f"Conditions met but too soon since last switch ({(self.Time - self.last_mode_switch_time).days} days).")
# Remaining methods unchanged to preserve performance
def CoarseSelectionFunction(self, coarse):
if self.current_strategy_mode == "KQT":
sorted_by_dollar_volume = sorted(coarse, key=lambda x: x.DollarVolume, reverse=True)
return [x.Symbol for x in sorted_by_dollar_volume[:500]]
elif self.current_strategy_mode == "RiskControl":
filtered = [x for x in coarse if x.HasFundamentalData and x.Price > 5 and x.Market == Market.USA]
symbols = [x.Symbol for x in filtered]
symbols.extend(self.rc_all_defensive)
symbols.append(self.rc_bil)
return list(set(symbols))
else:
return []
def FineSelectionFunction(self, fine):
if self.current_strategy_mode == "KQT":
sorted_by_market_cap = sorted(fine, key=lambda x: x.MarketCap, reverse=True)
selected = sorted_by_market_cap[:100]
self.kqt_tickers = []
self.kqt_symbols = {}
for f in selected:
ticker = f.Symbol.Value
self.kqt_tickers.append(ticker)
self.kqt_symbols[ticker] = f.Symbol
sector = "Unknown"
try:
if hasattr(f, 'AssetClassification') and f.AssetClassification is not None:
if hasattr(f.AssetClassification, 'MorningstarSectorCode'): sector = str(f.AssetClassification.MorningstarSectorCode)
elif hasattr(f.AssetClassification, 'GicsSectorCode'): sector = str(f.AssetClassification.GicsSectorCode)
elif hasattr(f.AssetClassification, 'Sector'): sector = f.AssetClassification.Sector
elif hasattr(f.AssetClassification, 'Industry'): sector = f.AssetClassification.Industry
except Exception as e:
self.Debug(f"KQT Fine - Error getting sector for {ticker}: {str(e)}")
self.kqt_sector_mappings[ticker] = sector
return [f.Symbol for f in selected]
elif self.current_strategy_mode == "RiskControl":
equity_fine = [x for x in fine if x.SecurityReference.SecurityType == "ST00000001" and x.MarketCap > 1e10]
sorted_by_cap = sorted(equity_fine, key=lambda x: x.MarketCap, reverse=True)[:30]
self.rc_selected_by_market_cap = [(x.Symbol, x.MarketCap) for x in sorted_by_cap]
symbols = [x.Symbol for x in sorted_by_cap]
symbols.extend(self.rc_all_defensive)
symbols.append(self.rc_bil)
return list(set(symbols))
else:
return []
def OnSecuritiesChanged(self, changes):
self.Log(f"OnSecuritiesChanged ({self.current_strategy_mode} mode): Added {len(changes.AddedSecurities)}, Removed {len(changes.RemovedSecurities)}")
for removed in changes.RemovedSecurities:
ticker = removed.Symbol.Value
if ticker in self.kqt_tickers: self.kqt_tickers.remove(ticker)
if ticker in self.kqt_symbols: del self.kqt_symbols[ticker]
if ticker in self.kqt_sector_mappings: del self.kqt_sector_mappings[ticker]
if ticker in self.kqt_stock_data: del self.kqt_stock_data[ticker]
if self.Portfolio[removed.Symbol].Invested:
self.Log(f"Liquidating {removed.Symbol.Value} due to removal from universe.")
self.Liquidate(removed.Symbol)
def EnterRiskControlMode(self):
self.Log("Transitioning to RiskControl: Liquidating KQT assets.")
liquidated_count = 0
rc_symbols_to_keep = set(self.rc_all_defensive + [self.rc_bil, self.rc_spy])
for holding in self.Portfolio.Values:
if holding.Invested and holding.Symbol not in rc_symbols_to_keep:
self.Log(f" Liquidating {holding.Symbol.Value} (KQT asset).")
self.Liquidate(holding.Symbol)
liquidated_count += 1
self.Log(f"Liquidated {liquidated_count} KQT assets.")
self.kqt_current_predictions = {}
self.kqt_previous_positions = {}
self.kqt_stopped_out.clear()
self.current_strategy_mode = "RiskControl"
self.SetUniverseSelection(FineFundamentalUniverseSelectionModel(self.CoarseSelectionFunction, self.FineSelectionFunction))
self.Log("Setting initial RiskControl position to 100% BIL.")
self.SetHoldings(self.rc_bil, 1.0)
self.rc_previous_bil_allocation = 1.0
self.rc_last_rebalance_date = self.Time
def ExitRiskControlMode(self):
self.Log("Transitioning back to KQT: Liquidating RiskControl assets.")
liquidated_count = 0
rc_symbols_to_liquidate = set(self.rc_all_defensive + [self.rc_bil])
for holding in self.Portfolio.Values:
if holding.Invested and holding.Symbol in rc_symbols_to_liquidate:
self.Log(f" Liquidating {holding.Symbol.Value} (RC asset).")
self.Liquidate(holding.Symbol)
liquidated_count += 1
self.Log(f"Liquidated {liquidated_count} RiskControl assets.")
self.rc_selected_by_market_cap = []
self.rc_rebalance_flag = False
self.rc_entry_prices = {}
self.rc_previous_bil_allocation = 0.0
self.rc_defensive_positions.clear()
self.current_strategy_mode = "KQT"
self.SetUniverseSelection(FineFundamentalUniverseSelectionModel(self.CoarseSelectionFunction, self.FineSelectionFunction))
def OnData(self, data):
if data.Bars.ContainsKey(self.spy):
spy_close = data.Bars[self.spy].Close
self.rc_spy_30day_window.Add(spy_close)
self.spy_sma50.Update(self.Time, spy_close)
self.spy_sma200.Update(self.Time, spy_close)
self.rc_spy_prices[self.Time.date()] = spy_close
dates_to_remove = [date for date in self.rc_spy_prices if (self.Time.date() - date).days > self.rc_max_spy_history]
for date in dates_to_remove: self.rc_spy_prices.pop(date)
if self.current_strategy_mode == "RiskControl":
stop_loss_triggered = False
market_trend = self._rc_calculateMarketTrend()
for symbol in list(self.Portfolio.Keys):
if symbol not in self.Portfolio: continue
holding = self.Portfolio[symbol]
if holding.Invested and symbol != self.rc_bil:
current_price = self.Securities[symbol].Price
if current_price <= 0: continue
if symbol not in self.rc_entry_prices:
if holding.AveragePrice > 0:
self.rc_entry_prices[symbol] = holding.AveragePrice
self.Log(f"Warning: Missing entry price for {symbol}. Using average price {holding.AveragePrice} for stop-loss check.")
else:
self.Log(f"Warning: Cannot check stop-loss for {symbol}. Missing entry price and invalid average price.")
continue
entry_price = self.rc_entry_prices[symbol]
if entry_price <= 0: continue
price_drop = (entry_price - current_price) / entry_price
stop_threshold = self.rc_stop_loss_base
if market_trend < -0.03: stop_threshold *= 0.9
elif market_trend > 0.03: stop_threshold *= 1.1
if symbol in self.rc_atr and self.rc_atr[symbol].IsReady:
current_atr = self.rc_atr[symbol].Current.Value
atr_pct = current_atr / current_price if current_price > 0 else 0
effective_weight = self.rc_dynamic_stop_weight
if atr_pct > stop_threshold * 1.2: effective_weight = min(self.rc_dynamic_stop_weight, 0.3)
stop_threshold = ((1 - effective_weight) * stop_threshold + effective_weight * atr_pct)
if price_drop >= stop_threshold:
self.Log(f"RiskControl Stop-loss triggered for {symbol} at {current_price}, drop: {price_drop*100:.1f}%, threshold: {stop_threshold*100:.1f}%")
self.Liquidate(symbol, "RiskControl Stop Loss")
stop_loss_triggered = True
if symbol in self.rc_entry_prices: del self.rc_entry_prices[symbol]
def TradeExecute(self):
if not self.Securities.ContainsKey(self.spy) or not self.Securities[self.spy].Exchange.ExchangeOpen:
return
self.Log(f"TradeExecute running in {self.current_strategy_mode} mode at {self.Time}")
if self.current_strategy_mode == "KQT":
self.Log(f"KQT: Current universe size: {len(self.kqt_tickers)}")
self.UpdateKQTHistoricalData()
self.kqt_current_predictions = self.GenerateKQTPredictions()
self.ProcessKQTStopLosses()
market_returns = self.GetMarketReturns()
target_positions = self.strategy.generate_positions(self.kqt_current_predictions, market_returns, algorithm=self)
self.ExecuteKQTTrades(target_positions)
daily_return = self.CalculatePortfolioReturn()
self.strategy.update_portfolio_returns(daily_return)
self.previous_portfolio_value = self.Portfolio.TotalPortfolioValue
elif self.current_strategy_mode == "RiskControl":
self.Log("RiskControl mode active. Main logic runs on schedule.")
pass
def UpdateKQTHistoricalData(self):
self.Log(f"KQT: Updating history for {len(self.kqt_tickers)} tickers.")
active_tickers = list(self.kqt_symbols.keys())
if not active_tickers:
self.kqt_stock_data = {}
return
symbols_to_request = [self.kqt_symbols[ticker] for ticker in active_tickers]
history = self.History(symbols_to_request, self.kqt_lookback + 5, Resolution.Daily)
if history.empty:
self.kqt_stock_data = {}
return
history = history.reset_index()
new_stock_data = {}
for ticker in active_tickers:
symbol_obj = self.kqt_symbols[ticker]
symbol_history = history[history['symbol'] == symbol_obj]
if not symbol_history.empty and len(symbol_history) >= self.kqt_lookback:
new_stock_data[ticker] = symbol_history
self.kqt_stock_data = new_stock_data
self.Log(f"KQT: Updated history for {len(self.kqt_stock_data)} tickers.")
def GenerateKQTPredictions(self):
predictions = {}
self.Log(f"KQT: Generating fallback predictions for {len(self.kqt_stock_data)} stocks.")
for ticker, history_df in self.kqt_stock_data.items():
if ticker not in self.kqt_symbols: continue
try:
closes = history_df['close'].values
if len(closes) > 20:
short_ma = np.mean(closes[-5:])
long_ma = np.mean(closes[-20:])
momentum = closes[-1] / closes[-10] - 1 if len(closes) > 10 else 0
pred_score = momentum + 0.5 * (short_ma/long_ma - 1) if long_ma != 0 else momentum
pred_return = pred_score * 2
threshold = 0.1
predictions[ticker] = {
"pred_return": pred_return,
"composite_score": pred_return / threshold if threshold != 0 else pred_return
}
except Exception as e:
self.Log(f"KQT Error processing {ticker} in GenerateKQTPredictions: {str(e)}")
continue
self.Log(f"KQT: Generated {len(predictions)} predictions.")
return predictions
def ProcessKQTStopLosses(self):
stop_loss_level = self.strategy.get_stop_loss_level()
self.kqt_stopped_out.clear()
for ticker in list(self.kqt_symbols.keys()):
symbol = self.kqt_symbols[ticker]
if not self.Portfolio[symbol].Invested: continue
position = self.Portfolio[symbol]
history = self.History(symbol, 2, Resolution.Daily)
if history.empty or len(history) < 2: continue
close_prices = history.loc[symbol]['close'] if symbol in history.index else pd.Series()
if len(close_prices) < 2: continue
daily_return = (close_prices.iloc[-1] / close_prices.iloc[-2] - 1) * 100
position_type = "long" if position.Quantity > 0 else "short"
hit_stop = False
if position_type == "long" and daily_return < stop_loss_level:
hit_stop = True
self.Log(f"KQT Stop loss triggered for {ticker} (long): {daily_return:.2f}% < {stop_loss_level:.2f}%")
elif position_type == "short" and daily_return > abs(stop_loss_level):
hit_stop = True
self.Log(f"KQT Stop loss triggered for {ticker} (short): {daily_return:.2f}% > {abs(stop_loss_level):.2f}%")
if hit_stop:
self.kqt_stopped_out.add(ticker)
self.Liquidate(symbol, f"KQT Stop Loss {daily_return:.2f}%")
def ExecuteKQTTrades(self, target_positions):
self.Log(f"--- KQT ExecuteTrades START ---")
portfolio_value = self.Portfolio.TotalPortfolioValue
if portfolio_value <= 0:
self.Log("KQT ExecuteTrades: Zero or negative portfolio value. Cannot execute trades.")
return
final_targets = []
processed_symbols = set()
kqt_managed_symbols = set(self.kqt_symbols.values())
min_exec_weight = 0.035
initial_total_allocation = sum(abs(weight) for weight in target_positions.values())
self.Log(f"KQT: Initial target allocation sum from strategy: {initial_total_allocation:.3f}")
max_allowed_allocation = 0.99
scaling_factor = 1.0
if initial_total_allocation > max_allowed_allocation:
scaling_factor = max_allowed_allocation / initial_total_allocation
self.Log(f"KQT: Scaling positions by {scaling_factor:.3f} to meet max allocation {max_allowed_allocation*100}%. Original total: {initial_total_allocation:.3f}")
for ticker, target_weight in target_positions.items():
scaled_target_weight = target_weight * scaling_factor
if ticker in self.kqt_stopped_out:
self.Log(f"KQT: Skipping trade for {ticker}, recently stopped out. Will check for liquidation later.")
if ticker in self.kqt_symbols:
processed_symbols.add(self.kqt_symbols[ticker])
continue
if ticker not in self.kqt_symbols:
self.Log(f"KQT: Skipping trade for {ticker}, not in current KQT symbols map.")
continue
symbol = self.kqt_symbols[ticker]
processed_symbols.add(symbol)
try:
target_weight_float = float(scaled_target_weight)
if not np.isfinite(target_weight_float):
raise ValueError("Non-finite weight")
except ValueError:
self.Log(f"KQT: Invalid/Non-finite target weight for {ticker}: {scaled_target_weight}. Skipping.")
continue
current_holding = self.Portfolio[symbol]
current_weight = current_holding.HoldingsValue / portfolio_value if portfolio_value > 0 and current_holding.Invested else 0.0
weight_difference = abs(target_weight_float - current_weight)
is_significant_target = abs(target_weight_float) >= min_exec_weight
is_currently_invested = current_holding.Invested
if is_significant_target:
if not is_currently_invested or weight_difference > self.kqt_rebalance_tolerance:
self.Log(f"KQT: Setting target for {ticker} to {target_weight_float:.4f} (Current: {current_weight:.4f}, Diff: {weight_difference:.4f}, Tol: {self.kqt_rebalance_tolerance})")
final_targets.append(PortfolioTarget(symbol, target_weight_float))
else:
self.Log(f"KQT: Skipping target for {ticker} ({target_weight_float:.4f}), change vs current ({current_weight:.4f}) within tolerance.")
elif is_currently_invested:
if weight_difference > self.kqt_rebalance_tolerance:
self.Log(f"KQT: Liquidating {ticker} (Current: {current_weight:.4f}) due to near-zero target ({target_weight_float:.4f}) and significant difference.")
final_targets.append(PortfolioTarget(symbol, 0))
else:
self.Log(f"KQT: Skipping liquidation for {ticker} ({target_weight_float:.4f}), change vs current ({current_weight:.4f}) within tolerance.")
for holding in self.Portfolio.Values:
if not holding.Invested or holding.Symbol not in kqt_managed_symbols:
continue
symbol = holding.Symbol
ticker = symbol.Value
current_weight = holding.HoldingsValue / portfolio_value
if ticker in self.kqt_stopped_out and symbol not in [t.Symbol for t in final_targets if t.Quantity == 0]:
if current_weight > self.kqt_rebalance_tolerance:
self.Log(f"KQT: Adding liquidation target for stopped-out {ticker} (Current: {current_weight:.4f})")
final_targets.append(PortfolioTarget(symbol, 0))
continue
if symbol not in processed_symbols and symbol not in [t.Symbol for t in final_targets]:
if current_weight > self.kqt_rebalance_tolerance:
self.Log(f"KQT: Adding liquidation target for untargeted holding {ticker} (Current: {current_weight:.4f})")
final_targets.append(PortfolioTarget(symbol, 0))
else:
self.Log(f"KQT: Skipping liquidation for untargeted {ticker}, current weight ({current_weight:.4f}) within tolerance from zero.")
if final_targets:
self.Log(f"KQT: Submitting {len(final_targets)} targets to SetHoldings after tolerance check.")
self.SetHoldings(final_targets)
else:
self.Log("KQT: No targets needed after tolerance check.")
self.Log(f"--- KQT ExecuteTrades END ---")
self.kqt_previous_positions = target_positions
def GetMarketReturns(self):
spy_history = self.History(self.spy, 15, Resolution.Daily)
if spy_history.empty or len(spy_history) < 2: return []
spy_prices = spy_history.loc[self.spy]['close'] if self.spy in spy_history.index else pd.Series()
if len(spy_prices) < 2: return []
spy_returns = spy_prices.pct_change().dropna() * 100
return spy_returns.tolist()[-10:]
def CalculatePortfolioReturn(self):
current_value = self.Portfolio.TotalPortfolioValue
if self.previous_portfolio_value > 0:
return (current_value / self.previous_portfolio_value - 1) * 100
return 0
def TryLoadModelWeights(self):
try:
if self.ObjectStore.ContainsKey("kqt_model_weights"):
self.Debug("Found model weights in ObjectStore, loading...")
encoded_bytes = self.ObjectStore.Read("kqt_model_weights")
import base64
model_bytes = base64.b64decode(encoded_bytes)
import tempfile
with tempfile.NamedTemporaryFile(delete=False, suffix='.pth') as temp:
temp_path = temp.name
temp.write(model_bytes)
state_dict = torch.load(temp_path)
input_shape = state_dict['embedding.0.weight'].shape
actual_input_size = input_shape[1]
self.Debug(f"Detected input size from weights: {actual_input_size}")
self.Debug("Successfully loaded model weights")
import os
os.unlink(temp_path)
else:
self.Debug("No model weights found in ObjectStore")
except Exception as e:
self.Debug(f"Error loading model weights: {str(e)}")
def SetRebalanceFlag(self):
if self.current_strategy_mode == "RiskControl":
if self.Time.weekday() == 2:
self.rc_rebalance_flag = True
self.Log("RiskControl: Set rebalance flag for Wednesday.")
def MonthlyRebalance(self):
if self.current_strategy_mode != "RiskControl" or not self.rc_rebalance_flag:
return
self.Log("--- RiskControl MonthlyRebalance START ---")
portfolio_value = self.Portfolio.TotalPortfolioValue
if portfolio_value <= 0:
self.Log("RiskControl MonthlyRebalance: Zero or negative portfolio value.")
return
self.rc_rebalance_flag = False
self.rc_entry_prices.clear()
if self.rc_spy_30day_window.Count < 30:
self.Log("RiskControl: Waiting for enough SPY history for rebalance.")
return
spy_price = self.Securities[self.spy].Price
sma_30 = sum(self.rc_spy_30day_window) / self.rc_spy_30day_window.Count
market_deviation = (spy_price / sma_30) - 1.0 if sma_30 > 0 else 0.0
market_trend = self._rc_calculateMarketTrend()
bil_weight = 0.0
bil_weight = min(bil_weight, 0.10)
self.Log(f"RC Rebalance: Calculated initial BIL weight: {bil_weight:.1%}")
defensive_etf_potential = bil_weight * 0.4
all_defensive_allocations = self._rc_evaluateDefensiveETFs(market_deviation, market_trend, defensive_etf_potential)
total_defensive_allocation = sum(all_defensive_allocations.values())
bil_weight -= total_defensive_allocation
if total_defensive_allocation > 0.05: bil_weight = min(bil_weight, 0.15)
elif total_defensive_allocation > 0.01: bil_weight = min(bil_weight, 0.25)
bil_weight = max(0, bil_weight)
bil_weight = min(bil_weight, 0.10)
self.Log(f"RC Rebalance: Adjusted BIL weight after defensive: {bil_weight:.1%}")
equity_weight = max(0, 1.0 - bil_weight - total_defensive_allocation)
self.Log(f"RC Rebalance - Market Dev: {market_deviation:.2%}, Trend: {market_trend:.2%}")
self.Log(f"RC Final Allocation Targets: Equity {equity_weight:.1%}, BIL {bil_weight:.1%}, Defensive {total_defensive_allocation:.1%}")
momentum_scores = self._rc_calculateSimpleMomentum()
filtered_stocks = [(s, mcap) for s, mcap in self.rc_selected_by_market_cap if momentum_scores.get(s, 1.0) >= 0.9]
if len(filtered_stocks) < 20: filtered_stocks = self.rc_selected_by_market_cap
total_market_cap = sum([x[1] for x in filtered_stocks])
equity_weights = {s: (mcap / total_market_cap) * equity_weight for s, mcap in filtered_stocks} if total_market_cap > 0 and equity_weight > 0 else {}
final_targets = []
symbols_targeted_for_investment = set()
rc_equity_symbols = {s for s, _ in self.rc_selected_by_market_cap}
rc_managed_symbols = rc_equity_symbols | set(self.rc_all_defensive) | {self.rc_bil}
if equity_weight > 0:
for symbol, target_weight in equity_weights.items():
if target_weight > 0.001:
current_holding = self.Portfolio[symbol]
current_weight = current_holding.HoldingsValue / portfolio_value if portfolio_value > 0 and current_holding.Invested else 0.0
weight_difference = abs(target_weight - current_weight)
if not current_holding.Invested or weight_difference > self.rc_rebalance_tolerance:
self.Log(f"RC Rebalance: Setting Equity target {symbol.Value} to {target_weight:.4f} (Current: {current_weight:.4f}, Diff: {weight_difference:.4f})")
final_targets.append(PortfolioTarget(symbol, target_weight))
symbols_targeted_for_investment.add(symbol)
self.rc_entry_prices[symbol] = self.Securities[symbol].Price
else:
self.Log(f"RC Rebalance: Skipping Equity target {symbol.Value} ({target_weight:.4f}), change vs current ({current_weight:.4f}) within tolerance.")
if current_holding.Invested: symbols_targeted_for_investment.add(symbol)
current_holding_bil = self.Portfolio[self.rc_bil]
current_weight_bil = current_holding_bil.HoldingsValue / portfolio_value if portfolio_value > 0 and current_holding_bil.Invested else 0.0
weight_difference_bil = abs(bil_weight - current_weight_bil)
if bil_weight > 0.001:
if not current_holding_bil.Invested or weight_difference_bil > self.rc_rebalance_tolerance:
self.Log(f"RC Rebalance: Setting BIL target to {bil_weight:.4f} (Current: {current_weight_bil:.4f}, Diff: {weight_difference_bil:.4f})")
final_targets.append(PortfolioTarget(self.rc_bil, bil_weight))
symbols_targeted_for_investment.add(self.rc_bil)
else:
self.Log(f"RC Rebalance: Skipping BIL target ({bil_weight:.4f}), change vs current ({current_weight_bil:.4f}) within tolerance.")
if current_holding_bil.Invested: symbols_targeted_for_investment.add(self.rc_bil)
self.rc_defensive_positions.clear()
if total_defensive_allocation > 0:
for symbol, target_weight in all_defensive_allocations.items():
if target_weight > 0.001:
current_holding_def = self.Portfolio[symbol]
current_weight_def = current_holding_def.HoldingsValue / portfolio_value if portfolio_value > 0 and current_holding_def.Invested else 0.0
weight_difference_def = abs(target_weight - current_weight_def)
if not current_holding_def.Invested or weight_difference_def > self.rc_rebalance_tolerance:
self.Log(f"RC Rebalance: Setting Defensive target {symbol.Value} to {target_weight:.4f} (Current: {current_weight_def:.4f}, Diff: {weight_difference_def:.4f})")
final_targets.append(PortfolioTarget(symbol, target_weight))
symbols_targeted_for_investment.add(symbol)
self.rc_defensive_positions.add(symbol)
self.rc_entry_prices[symbol] = self.Securities[symbol].Price
else:
self.Log(f"RC Rebalance: Skipping Defensive target {symbol.Value} ({target_weight:.4f}), change vs current ({current_weight_def:.4f}) within tolerance.")
if current_holding_def.Invested:
symbols_targeted_for_investment.add(symbol)
self.rc_defensive_positions.add(symbol)
for holding in self.Portfolio.Values:
if holding.Invested and holding.Symbol in rc_managed_symbols and holding.Symbol not in symbols_targeted_for_investment:
current_weight = holding.HoldingsValue / portfolio_value
if abs(0 - current_weight) > self.rc_rebalance_tolerance:
self.Log(f"RC Rebalance: Liquidating untargeted RC asset {holding.Symbol.Value} (Current: {current_weight:.4f})")
final_targets.append(PortfolioTarget(holding.Symbol, 0))
else:
self.Log(f"RC Rebalance: Skipping liquidation for untargeted {holding.Symbol.Value}, current weight ({current_weight:.4f}) within tolerance from zero.")
if final_targets:
self.Log(f"RC Rebalance: Submitting {len(final_targets)} targets to SetHoldings after tolerance check.")
self.SetHoldings(final_targets)
else:
self.Log("RC Rebalance: No targets needed after tolerance check.")
self.rc_last_rebalance_date = self.Time
self.rc_previous_bil_allocation = self.Portfolio[self.rc_bil].HoldingsValue / portfolio_value if portfolio_value > 0 else 0
self.Log(f"--- RiskControl MonthlyRebalance END (New BIL Alloc: {self.rc_previous_bil_allocation:.1%}) ---")
def WeeklyDefensiveAdjustment(self):
if self.current_strategy_mode != "RiskControl": return
days_since_rebalance = (self.Time.date() - self.rc_last_rebalance_date.date()).days
if days_since_rebalance < 3: return
days_since_update = (self.Time.date() - self.rc_last_defensive_update.date()).days
if days_since_update < 5: return
self.Log("--- RiskControl WeeklyDefensiveAdjustment START ---")
portfolio_value = self.Portfolio.TotalPortfolioValue
if portfolio_value <= 0:
self.Log("RC Weekly: Zero or negative portfolio value.")
return
spy_price = self.Securities[self.spy].Price
sma_30 = sum(self.rc_spy_30day_window) / self.rc_spy_30day_window.Count if self.rc_spy_30day_window.Count > 0 else spy_price
market_deviation = (spy_price / sma_30) - 1.0 if sma_30 > 0 else 0.0
market_trend = self._rc_calculateMarketTrend()
if market_deviation > 0.04 and market_trend > 0.03:
self.Log("RC Weekly: Market too strong, skipping defensive additions.")
liquidate_existing_defensives = False
for s in self.rc_all_defensive:
if self.Portfolio[s].Invested:
liquidate_existing_defensives = True
break
if liquidate_existing_defensives:
self.Log("RC Weekly: Liquidating existing defensive positions due to strong market.")
liquidation_targets = []
for s in self.rc_all_defensive:
if self.Portfolio[s].Invested:
current_weight = self.Portfolio[s].HoldingsValue / portfolio_value
if abs(0 - current_weight) > self.rc_rebalance_tolerance:
liquidation_targets.append(PortfolioTarget(s, 0))
if liquidation_targets:
self.SetHoldings(liquidation_targets)
self.rc_defensive_positions.clear()
self.rc_last_defensive_update = self.Time
else:
self.Log("RC Weekly: Existing defensive positions within tolerance from zero, no liquidation needed.")
return
current_bil_holding = self.Portfolio[self.rc_bil]
current_bil_weight = current_bil_holding.HoldingsValue / portfolio_value if portfolio_value > 0 and current_bil_holding.Invested else 0.0
total_invested_pct = sum(h.HoldingsValue for h in self.Portfolio.Values if h.Invested) / portfolio_value if portfolio_value > 0 else 0.0
available_allocation = max(0, 0.99 - total_invested_pct)
max_defensive_pct_from_bil = 0.25
potential_allocation_from_bil = current_bil_weight * max_defensive_pct_from_bil
potential_allocation = min(available_allocation, potential_allocation_from_bil)
if potential_allocation < 0.01:
self.Log(f"RC Weekly: Not enough potential allocation ({potential_allocation:.2%}) from BIL ({current_bil_weight:.1%}) or available space ({available_allocation:.1%}).")
return
self.Log(f"RC Weekly - Market Dev: {market_deviation:.2%}, Trend: {market_trend:.2%}")
self.Log(f"RC Weekly - BIL: {current_bil_weight:.1%}, Potential Defensive: {potential_allocation:.1%}")
new_defensive_allocations = self._rc_evaluateDefensiveETFs(market_deviation, market_trend, potential_allocation)
final_targets = []
symbols_targeted_for_investment = set()
symbols_targeted_for_investment.add(self.rc_bil)
total_new_defensive_target = sum(new_defensive_allocations.values())
if total_new_defensive_target > potential_allocation:
scale = potential_allocation / total_new_defensive_target if total_new_defensive_target > 0 else 0
self.Log(f"RC Weekly: Scaling new defensive targets by {scale:.3f} (Total: {total_new_defensive_target:.2%}, Potential: {potential_allocation:.2%})")
for s in new_defensive_allocations: new_defensive_allocations[s] *= scale
total_new_defensive_target = sum(new_defensive_allocations.values())
intended_bil_target_weight = max(0, current_bil_weight - total_new_defensive_target)
current_rc_defensive_symbols = set(self.rc_all_defensive)
for symbol in current_rc_defensive_symbols:
target_weight = new_defensive_allocations.get(symbol, 0.0)
current_holding = self.Portfolio[symbol]
current_weight = current_holding.HoldingsValue / portfolio_value if portfolio_value > 0 and current_holding.Invested else 0.0
weight_difference = abs(target_weight - current_weight)
if target_weight > 0.01:
if not current_holding.Invested or weight_difference > self.rc_rebalance_tolerance:
self.Log(f"RC Weekly: Setting Defensive target {symbol.Value} to {target_weight:.4f} (Current: {current_weight:.4f}, Diff: {weight_difference:.4f})")
final_targets.append(PortfolioTarget(symbol, target_weight))
symbols_targeted_for_investment.add(symbol)
self.rc_entry_prices[symbol] = self.Securities[symbol].Price
else:
self.Log(f"RC Weekly: Skipping Defensive target {symbol.Value} ({target_weight:.4f}), change vs current ({current_weight:.4f}) within tolerance.")
if current_holding.Invested: symbols_targeted_for_investment.add(symbol)
elif current_holding.Invested:
if abs(0 - current_weight) > self.rc_rebalance_tolerance:
self.Log(f"RC Weekly: Liquidating Defensive {symbol.Value} (Current: {current_weight:.4f}) due to zero target.")
final_targets.append(PortfolioTarget(symbol, 0))
else:
self.Log(f"RC Weekly: Skipping Defensive liquidation {symbol.Value}, current weight ({current_weight:.4f}) within tolerance from zero.")
weight_difference_bil = abs(intended_bil_target_weight - current_bil_weight)
if weight_difference_bil > self.rc_rebalance_tolerance:
self.Log(f"RC Weekly: Adjusting BIL target to {intended_bil_target_weight:.4f} (Current: {current_bil_weight:.4f}, Diff: {weight_difference_bil:.4f})")
final_targets.append(PortfolioTarget(self.rc_bil, intended_bil_target_weight))
if intended_bil_target_weight <= 0.001:
if self.rc_bil in symbols_targeted_for_investment:
symbols_targeted_for_investment.remove(self.rc_bil)
else:
self.Log(f"RC Weekly: Skipping BIL adjustment ({intended_bil_target_weight:.4f}), change vs current ({current_bil_weight:.4f}) within tolerance.")
if final_targets:
self.Log(f"RC Weekly: Submitting {len(final_targets)} targets to SetHoldings after tolerance check.")
self.SetHoldings(final_targets)
self.rc_defensive_positions.clear()
for target in final_targets:
if target.Symbol in self.rc_all_defensive and target.Quantity > 0:
self.rc_defensive_positions.add(target.Symbol)
self.rc_last_defensive_update = self.Time
else:
self.Log("RC Weekly: No targets needed after tolerance check.")
current_defensive = set()
for s in self.rc_all_defensive:
if self.Portfolio[s].Invested:
current_defensive.add(s)
self.rc_defensive_positions = current_defensive
self.Log("--- RiskControl WeeklyDefensiveAdjustment END ---")
def _rc_calculateMarketTrend(self):
if len(self.rc_spy_prices) < self.rc_trend_lookback + 1: return 0
dates = sorted(self.rc_spy_prices.keys())
if len(dates) <= self.rc_trend_lookback: return 0
recent_price = self.rc_spy_prices[dates[-1]]
older_price = self.rc_spy_prices[dates[-self.rc_trend_lookback]]
return (recent_price / older_price) - 1.0 if older_price > 0 else 0.0
def _rc_calculateSimpleMomentum(self):
momentum_scores = {}
symbols = [sym for sym, _ in self.rc_selected_by_market_cap]
if not symbols: return momentum_scores
history = self.History(symbols, 30, Resolution.Daily)
if history.empty: return momentum_scores
for symbol in symbols:
if symbol in history.index.get_level_values(0):
prices = history.loc[symbol]['close']
if len(prices) >= 30:
mom = prices.iloc[-1] / prices.iloc[0] - 1 if prices.iloc[0] > 0 else 0.0
momentum_scores[symbol] = min(1.3, max(0.7, 1 + (mom * 2)))
return momentum_scores
def _rc_evaluateDefensiveETFs(self, market_deviation, market_trend, max_allocation):
self.Log(f"RC: Evaluating defensive ETFs. Max Alloc: {max_allocation:.2%}")
allocations = {symbol: 0 for symbol in self.rc_all_defensive}
if market_deviation > 0.04 and market_trend > 0.02:
self.Log("RC EvalDef: Market too strong, skipping.")
return allocations
history = self.History(self.rc_all_defensive + [self.spy], 60, Resolution.Daily)
if history.empty:
self.Log("RC EvalDef: History empty, skipping.")
return allocations
spy_perf = {}
if self.spy in history.index.get_level_values(0):
spy_prices = history.loc[self.spy]['close']
if len(spy_prices) >= 30:
spy_perf = {
"5d": spy_prices.iloc[-1] / spy_prices.iloc[-5] - 1 if len(spy_prices) >= 5 and spy_prices.iloc[-5] > 0 else 0,
"10d": spy_prices.iloc[-1] / spy_prices.iloc[-10] - 1 if len(spy_prices) >= 10 and spy_prices.iloc[-10] > 0 else 0,
"20d": spy_prices.iloc[-1] / spy_prices.iloc[-20] - 1 if len(spy_prices) >= 20 and spy_prices.iloc[-20] > 0 else 0,
"30d": spy_prices.iloc[-1] / spy_prices.iloc[-30] - 1 if len(spy_prices) >= 30 and spy_prices.iloc[-30] > 0 else 0
}
etf_scores = {}
for group_name, group in [("Inverse", self.rc_inverse_etfs), ("Alternative", self.rc_alternative_defensive), ("Sector", self.rc_sector_defensive)]:
for symbol in group:
if symbol in history.index.get_level_values(0):
prices = history.loc[symbol]['close']
if len(prices) >= 30:
perf = {}
perf["5d"] = prices.iloc[-1] / prices.iloc[-5] - 1 if len(prices) >= 5 and prices.iloc[-5] > 0 else 0
perf["10d"] = prices.iloc[-1] / prices.iloc[-10] - 1 if len(prices) >= 10 and prices.iloc[-10] > 0 else 0
perf["20d"] = prices.iloc[-1] / prices.iloc[-20] - 1 if len(prices) >= 20 and prices.iloc[-20] > 0 else 0
perf["30d"] = prices.iloc[-1] / prices.iloc[-30] - 1 if len(prices) >= 30 and prices.iloc[-30] > 0 else 0
rel_perf = {p: perf[p] - spy_perf.get(p, 0) for p in spy_perf}
score = 0
if symbol in self.rc_inverse_etfs:
if market_deviation < -0.02: score = (perf["5d"] * 0.4) + (perf["10d"] * 0.4) + (perf["30d"] * 0.2) + (rel_perf.get("5d",0) + rel_perf.get("10d",0)) * 0.15
else: score = (perf["5d"] * 0.6) + (perf["10d"] * 0.3) + (perf["30d"] * 0.1)
elif symbol in self.rc_alternative_defensive:
score = (perf["5d"] * 0.3) + (perf["10d"] * 0.4) + (perf["30d"] * 0.3)
if market_deviation < -0.03: score += rel_perf.get("10d",0) * 0.2
else:
abs_score = (perf["5d"] * 0.3) + (perf["10d"] * 0.3) + (perf["30d"] * 0.4)
rel_score = (rel_perf.get("5d",0) * 0.3) + (rel_perf.get("10d",0) * 0.3) + (rel_perf.get("30d",0) * 0.4)
if market_deviation < -0.02: score = (abs_score * 0.4) + (rel_score * 0.6)
else: score = (abs_score * 0.6) + (rel_score * 0.4)
etf_scores[symbol] = score
threshold = -0.007
if market_deviation < -0.03: threshold = -0.01
candidates = {s: score for s, score in etf_scores.items() if score > threshold}
if not candidates:
self.Log("RC EvalDef: No candidates passed threshold.")
return allocations
sorted_candidates = sorted(candidates.items(), key=lambda x: x[1], reverse=True)
allocation_pct = 0.0
if market_deviation < -0.05 or market_trend < -0.04: allocation_pct = 0.95
elif market_deviation < -0.03 or market_trend < -0.02: allocation_pct = 0.8
elif market_deviation < -0.01 or market_trend < -0.01: allocation_pct = 0.6
else: allocation_pct = 0.4
best_score = sorted_candidates[0][1] if sorted_candidates else 0
allocation_pct *= min(1.0, max(0.5, (best_score + 0.02) * 4))
num_etfs = 1
if (market_deviation < -0.04 or market_trend < -0.03) and len(sorted_candidates) > 1:
num_etfs = min(2, len(sorted_candidates))
remaining_allocation = max_allocation * allocation_pct
total_score = sum(score for _, score in sorted_candidates[:num_etfs])
if total_score > 0:
for i in range(num_etfs):
symbol, score = sorted_candidates[i]
weight = score / total_score if total_score > 0 else 1.0/num_etfs
etf_allocation = remaining_allocation * weight
if etf_allocation >= 0.02:
allocations[symbol] = etf_allocation
self.Log(f"RC EvalDef: Allocating {etf_allocation:.1%} to {symbol.Value} (Score: {score:.3f})")
return allocationsfrom AlgorithmImports import *
import numpy as np
from datetime import timedelta, datetime
class RiskControlStrategyModule:
def __init__(self, algorithm):
self.algorithm = algorithm
self.selected_by_market_cap = []
self.rebalance_flag = False
self.entry_prices = {}
self.previous_bil_allocation = 0.0
self.last_rebalance_date = datetime(1900, 1, 1)
self.last_defensive_update = datetime(1900, 1, 1)
# Symbols managed by this strategy
self.spy = self.algorithm.AddEquity("SPY", Resolution.Daily).Symbol
self.bil = self.algorithm.AddEquity("BIL", Resolution.Daily).Symbol
self.sh = self.algorithm.AddEquity("SH", Resolution.Daily).Symbol
self.psq = self.algorithm.AddEquity("PSQ", Resolution.Daily).Symbol
self.dog = self.algorithm.AddEquity("DOG", Resolution.Daily).Symbol
self.rwm = self.algorithm.AddEquity("RWM", Resolution.Daily).Symbol
self.eum = self.algorithm.AddEquity("EUM", Resolution.Daily).Symbol
self.myd = self.algorithm.AddEquity("MYY", Resolution.Daily).Symbol # Check ticker MYY vs MYD
self.gld = self.algorithm.AddEquity("GLD", Resolution.Daily).Symbol
self.ief = self.algorithm.AddEquity("IEF", Resolution.Daily).Symbol
self.bnd = self.algorithm.AddEquity("BND", Resolution.Daily).Symbol
self.xlp = self.algorithm.AddEquity("XLP", Resolution.Daily).Symbol
self.xlu = self.algorithm.AddEquity("XLU", Resolution.Daily).Symbol
self.xlv = self.algorithm.AddEquity("XLV", Resolution.Daily).Symbol
self.vht = self.algorithm.AddEquity("VHT", Resolution.Daily).Symbol
self.vdc = self.algorithm.AddEquity("VDC", Resolution.Daily).Symbol
self.inverse_etfs = [self.sh, self.psq, self.dog, self.rwm, self.eum, self.myd]
self.alternative_defensive = [self.gld, self.ief, self.bnd]
self.sector_defensive = [self.xlp, self.xlu, self.xlv, self.vht, self.vdc]
self.all_defensive = self.inverse_etfs + self.alternative_defensive + self.sector_defensive
self.risk_control_symbols = set([self.spy, self.bil] + self.all_defensive) # Symbols primarily managed here
self.defensive_positions = set() # Tracks symbols currently held as defensive by this strategy
# Indicators
self.spy_30day_window = RollingWindow[float](30)
self.atr_period = 14
self.atr = {}
self.spy_prices = {} # For trend calculation
self.max_spy_history = 60
self.trend_lookback = 10
# Settings
self.stop_loss_base = 0.04
self.dynamic_stop_weight = 0.5
self.diagnostic_mode = True # Enable detailed diagnostics
# Initialize indicators for relevant symbols
self._initialize_indicators()
def _initialize_indicators(self):
# Initialize rolling window with historical data
history = self.algorithm.History(self.spy, 30, Resolution.Daily)
if not history.empty:
for time, row in history.loc[self.spy].iterrows():
self.spy_30day_window.Add(row["close"])
# Initialize ATR for key symbols
for symbol in self.all_defensive + [self.bil, self.spy]:
# Check if symbol exists in Securities before creating indicator
if self.algorithm.Securities.ContainsKey(symbol):
self.atr[symbol] = self.algorithm.ATR(symbol, self.atr_period, Resolution.Daily)
else:
self.algorithm.Log(f"Warning: Symbol {symbol} not found for ATR initialization.")
def Initialize(self):
"""Initialize specific settings for Risk Control mode"""
self.algorithm.Log("Initializing Risk Control Strategy Module specific settings.")
# Set benchmark? Or keep the main one? Let's keep the main one (SPY).
# Universe selection specific to Risk Control
self._universe = self.algorithm.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
# Schedules specific to Risk Control
self.rebalance_schedule = self.algorithm.Schedule.On(self.algorithm.DateRules.MonthStart(self.spy),
self.algorithm.TimeRules.AfterMarketOpen(self.spy, 30),
self.SetRebalanceFlag)
self.monthly_schedule = self.algorithm.Schedule.On(self.algorithm.DateRules.WeekStart(self.spy, DayOfWeek.Wednesday),
self.algorithm.TimeRules.AfterMarketOpen(self.spy, 30),
self.MonthlyRebalance)
self.weekly_def_schedule = self.algorithm.Schedule.On(self.algorithm.DateRules.WeekStart(self.spy, DayOfWeek.Monday),
self.algorithm.TimeRules.AfterMarketOpen(self.spy, 60),
self.WeeklyDefensiveAdjustment)
def Activate(self):
"""Actions to take when this strategy becomes active."""
self.algorithm.Log("Activating Risk Control Strategy Module")
# Ensure universe is active
if not self._universe:
self._universe = self.algorithm.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
# Reset state variables
self.entry_prices.clear()
self.previous_bil_allocation = 0.0
self.last_rebalance_date = self.algorithm.Time - timedelta(days=31) # Ensure rebalance runs soon
self.last_defensive_update = self.algorithm.Time - timedelta(days=8) # Ensure weekly runs soon
self.defensive_positions.clear()
# Re-initialize indicators if needed (e.g., if algorithm restarted)
self._initialize_indicators()
# Trigger initial rebalance maybe? Or let the schedule handle it. Let schedule handle.
self.algorithm.Log("Risk Control Module Activated. Waiting for scheduled rebalance.")
def Deactivate(self):
"""Actions to take when this strategy becomes inactive."""
self.algorithm.Log("Deactivating Risk Control Strategy Module")
# Remove universe specific to Risk Control?
# self.algorithm.RemoveUniverse(self._universe)
# self._universe = None
# Clear internal state
self.selected_by_market_cap = []
self.entry_prices.clear()
# Liquidate positions held by this strategy? The main handler should do this.
def CoarseSelectionFunction(self, coarse):
# Use algorithm time
# if self.algorithm.Time.hour == 9 and self.algorithm.Time.minute == 30:
# self.algorithm.Log(f"RiskControl Coarse Selection: {len(coarse)} symbols")
filtered = [x for x in coarse if x.HasFundamentalData
and x.Price > 5
and x.Market == Market.USA]
# Return only symbols needed for FineSelection, not all 500 like KQT
return [x.Symbol for x in filtered]
def FineSelectionFunction(self, fine):
# Use algorithm time
# if self.algorithm.Time.hour == 9 and self.algorithm.Time.minute == 30:
# self.algorithm.Log(f"RiskControl Fine Selection: {len(fine)} symbols")
# Filter for large cap stocks for the equity portion
filtered = [x for x in fine if x.MarketCap > 1e10
and x.SecurityReference.SecurityType == "ST00000001"] # Common Stock
sorted_by_cap = sorted(filtered, key=lambda x: x.MarketCap, reverse=True)[:30] # Top 30 large caps
self.selected_by_market_cap = [(x.Symbol, x.MarketCap) for x in sorted_by_cap]
# Return symbols for the equity part + ensure defensive symbols are subscribed
equity_symbols = [x.Symbol for x in sorted_by_cap]
all_needed_symbols = set(equity_symbols + self.all_defensive + [self.spy, self.bil])
return list(all_needed_symbols)
def OnSecuritiesChanged(self, changes):
# Only log if active?
# if self.algorithm.is_risk_control_active:
# self.algorithm.Log(f"RiskControl OnSecuritiesChanged: Added {len(changes.AddedSecurities)}, Removed {len(changes.RemovedSecurities)}")
# Handle removals if necessary, FineSelection should handle additions/updates for equity part
for removed in changes.RemovedSecurities:
# If a selected large-cap stock is removed, update the list
self.selected_by_market_cap = [(s, mc) for s, mc in self.selected_by_market_cap if s != removed.Symbol]
# If a defensive ETF is removed (unlikely), log warning
if removed.Symbol in self.all_defensive:
self.algorithm.Log(f"Warning: Defensive ETF {removed.Symbol.Value} removed from universe.")
# Liquidate if holding a removed equity position
if self.algorithm.Portfolio[removed.Symbol].Invested and removed.Symbol not in self.risk_control_symbols:
self.algorithm.Log(f"RiskControl: Liquidating {removed.Symbol.Value} due to removal from universe.")
self.algorithm.Liquidate(removed.Symbol)
def SetRebalanceFlag(self):
# Only set flag if this strategy is active
if not self.algorithm.is_risk_control_active: return
# Original logic: Set flag on MonthStart, Rebalance on Wednesday after MonthStart
# This seems slightly complex. Let's simplify: Rebalance on first Wednesday of month.
# The MonthlyRebalance check `if not self.rebalance_flag: return` handles this.
# Let's keep the original flag logic for now.
if self.algorithm.Time.weekday() == 2: # Wednesday
self.rebalance_flag = True
self.algorithm.Log("RiskControl: Rebalance flag SET for Wednesday.")
def OnData(self, data):
# Only run if this strategy is active
if not self.algorithm.is_risk_control_active: return
# Update SPY price window and history
if data.Bars.ContainsKey(self.spy):
self.spy_30day_window.Add(data.Bars[self.spy].Close)
self.spy_prices[self.algorithm.Time.date()] = data.Bars[self.spy].Close
# Remove old prices
dates_to_remove = [date for date in self.spy_prices if (self.algorithm.Time.date() - date).days > self.max_spy_history]
for date in dates_to_remove: self.spy_prices.pop(date, None)
else:
return # Need SPY data for checks
market_trend = self._calculateMarketTrend()
stop_loss_triggered = False
# Check stop-loss triggers
# Iterate safely over portfolio copy
for symbol, holding in list(self.algorithm.Portfolio.items()):
if not holding.Invested or symbol == self.bil: continue # Skip cash-like BIL
# Only manage stops for symbols relevant to this strategy?
# Or manage all stops when active? Let's manage all non-BIL stops.
# is_risk_control_asset = symbol in self.risk_control_symbols or symbol in [s for s, mc in self.selected_by_market_cap]
# if not is_risk_control_asset: continue
current_price = holding.Price
if current_price <= 0: continue # Skip if price is invalid
if symbol not in self.entry_prices:
self.entry_prices[symbol] = holding.AveragePrice # Use average price if entry price not set
entry_price = self.entry_prices.get(symbol, holding.AveragePrice)
if entry_price <= 0: continue # Skip if entry price is invalid
price_drop_pct = (entry_price - current_price) / entry_price
# Calculate dynamic stop threshold
stop_threshold = self.stop_loss_base
if market_trend < -0.03: stop_threshold *= 0.9
elif market_trend > 0.03: stop_threshold *= 1.1
# Incorporate ATR
if symbol in self.atr and self.atr[symbol].IsReady:
current_atr = self.atr[symbol].Current.Value
atr_pct = current_atr / current_price if current_price > 0 else 0
effective_weight = self.dynamic_stop_weight
if atr_pct > stop_threshold * 1.2: effective_weight = min(self.dynamic_stop_weight, 0.3)
stop_threshold = ((1 - effective_weight) * stop_threshold + effective_weight * atr_pct)
# Check stop loss condition (use percentage drop)
if price_drop_pct >= stop_threshold:
self.algorithm.Log(f"RiskControl Stop-loss triggered for {symbol.Value}: Drop {price_drop_pct*100:.1f}% >= Threshold {stop_threshold*100:.1f}%")
self.algorithm.Liquidate(symbol, f"RiskControl Stop Loss {price_drop_pct*100:.1f}%")
stop_loss_triggered = True
if symbol in self.entry_prices: del self.entry_prices[symbol] # Remove entry price after stop
# If stop-loss triggered, invest remaining cash in BIL
if stop_loss_triggered:
cash = self.algorithm.Portfolio.Cash
if cash > 100: # Minimum amount to invest
bil_price = self.algorithm.Securities[self.bil].Price
if bil_price > 0:
bil_quantity = self.algorithm.CalculateOrderQuantity(self.bil, 1.0) # Allocate 100% of *available cash*
if bil_quantity != 0:
self.algorithm.MarketOrder(self.bil, bil_quantity, tag="RiskControl Post-StopLoss")
self.algorithm.Log(f"RiskControl: Invested remaining cash ({cash:.2f}) in BIL after stop-loss.")
def WeeklyDefensiveAdjustment(self):
# Only run if this strategy is active
if not self.algorithm.is_risk_control_active: return
self.algorithm.Log("RiskControl: Running Weekly Defensive Adjustment Check")
# Skip if monthly rebalance just happened
days_since_rebalance = (self.algorithm.Time.date() - self.last_rebalance_date.date()).days
if days_since_rebalance < 3:
self.algorithm.Log("RiskControl WeeklyDefensive: Skipping, too soon after monthly rebalance.")
return
# Skip if updated recently
days_since_update = (self.algorithm.Time.date() - self.last_defensive_update.date()).days
if days_since_update < 5:
self.algorithm.Log("RiskControl WeeklyDefensive: Skipping, updated within the last 5 days.")
return
# Calculate market conditions
if not self.spy_30day_window.IsReady or self.spy_30day_window.Count == 0:
self.algorithm.Log("RiskControl WeeklyDefensive: SPY window not ready.")
return
spy_price = self.algorithm.Securities[self.spy].Price
if spy_price <= 0: return
sma_30 = sum(self.spy_30day_window) / self.spy_30day_window.Count
market_deviation = (spy_price / sma_30) - 1.0 if sma_30 > 0 else 0
market_trend = self._calculateMarketTrend()
# Skip in strong bull markets
if market_deviation > 0.04 and market_trend > 0.03:
self.algorithm.Log("RiskControl WeeklyDefensive: Skipping, strong bull market conditions.")
return
# Calculate total portfolio value and current defensive allocation
portfolio_value = self.algorithm.Portfolio.TotalPortfolioValue
if portfolio_value <= 0: return
current_equity_value = sum(holding.HoldingsValue for symbol, holding in self.algorithm.Portfolio.items()
if holding.Invested and symbol not in self.risk_control_symbols) # Equities not part of core defensive set
current_defensive_etf_value = sum(holding.HoldingsValue for symbol, holding in self.algorithm.Portfolio.items()
if holding.Invested and symbol in self.all_defensive)
current_bil_value = self.algorithm.Portfolio[self.bil].HoldingsValue if self.algorithm.Portfolio[self.bil].Invested else 0
total_invested_pct = (current_equity_value + current_defensive_etf_value + current_bil_value) / portfolio_value
# Available room for *new* defensive positions (target max 98% invested)
available_allocation_pct = max(0, 0.98 - total_invested_pct)
# Potential allocation comes from reducing BIL or cash
# Let's simplify: Max defensive allocation is a fixed % of portfolio, e.g., 25%
max_total_defensive_pct = 0.25
current_defensive_etf_pct = current_defensive_etf_value / portfolio_value
# How much more can we allocate to defensive ETFs?
potential_increase_pct = max(0, max_total_defensive_pct - current_defensive_etf_pct)
# Limit by available cash/BIL reduction room
potential_increase_pct = min(potential_increase_pct, available_allocation_pct)
if self.diagnostic_mode:
self.algorithm.Log(f"WEEKLY CHECK - Market: Dev {market_deviation*100:.2f}%, Trend {market_trend*100:.2f}%")
self.algorithm.Log(f"Current Defensive ETF%: {current_defensive_etf_pct*100:.2f}%, Potential Increase%: {potential_increase_pct*100:.2f}%")
if self.defensive_positions:
self.algorithm.Log(f"Current defensive positions: {[s.Value for s in self.defensive_positions]}")
# Evaluate defensive ETFs based on potential increase
new_target_allocations = self._evaluateDefensiveETFs(market_deviation, market_trend, potential_increase_pct) # Pass potential *increase*
# Calculate changes needed
changes_made = False
current_defensive_holdings = {s: self.algorithm.Portfolio[s].HoldingsValue / portfolio_value
for s in self.all_defensive if self.algorithm.Portfolio[s].Invested}
# Add/Increase positions
for symbol, target_increase_pct in new_target_allocations.items():
if target_increase_pct > 0.01: # Minimum meaningful change
current_pct = current_defensive_holdings.get(symbol, 0)
new_target_total_pct = current_pct + target_increase_pct
self.algorithm.Log(f"WeeklyDefensive: Setting {symbol.Value} to {new_target_total_pct*100:.2f}%")
self.algorithm.SetHoldings(symbol, new_target_total_pct)
self.defensive_positions.add(symbol) # Track holdings
if symbol not in self.entry_prices: self.entry_prices[symbol] = self.algorithm.Securities[symbol].Price
changes_made = True
# Decrease/Remove positions (logic not in original _evaluateDefensiveETFs, needs adding or handle in MonthlyRebalance)
# For now, weekly only adds/increases based on available room and positive eval. Monthly handles reduction.
if changes_made:
self.last_defensive_update = self.algorithm.Time
def MonthlyRebalance(self):
# --- ADDED GUARD AND LOG ---
if not self.algorithm.is_risk_control_active:
# self.algorithm.Debug("RC MonthlyRebalance skipped: Not active.") # Keep commented unless debugging activation
return
self.algorithm.Log(f"--> RC MonthlyRebalance STARTING on {self.algorithm.Time}. Active: {self.algorithm.is_risk_control_active}")
# --- END GUARD AND LOG ---
if not self.rebalance_flag:
self.algorithm.Debug("RC MonthlyRebalance skipped: Rebalance flag not set.")
return
self.rebalance_flag = False
self.entry_prices.clear() # Reset entry prices at rebalance
if not self.spy_30day_window.IsReady:
self.algorithm.Log("RiskControl MonthlyRebalance: Waiting for SPY window.")
return
spy_price = self.algorithm.Securities[self.spy].Price
if spy_price <= 0 or self.spy_30day_window.Count == 0: return
sma_30 = sum(self.spy_30day_window) / self.spy_30day_window.Count
market_deviation = (spy_price / sma_30) - 1.0 if sma_30 > 0 else 0
market_trend = self._calculateMarketTrend()
# BIL allocation logic (simplified for clarity, original logic was complex)
bil_weight = 0.0
if market_deviation < -0.01: # Below MA
bil_weight = min(0.6, abs(market_deviation) * 5) # Scale up to 60% based on deviation
elif market_deviation < 0.03: # Slightly above MA
bil_weight = 0.1 # Small allocation
# else: bil_weight = 0 in strong uptrend
# Ensure minimum BIL based on previous month (original logic)
# min_bil_allocation = self.previous_bil_allocation * 0.8 # Default 20% reduction allowed
# if market_deviation > 0.05: min_bil_allocation = self.previous_bil_allocation * 0.7
# elif market_deviation > 0.02: min_bil_allocation = self.previous_bil_allocation * 0.75
# bil_weight = max(bil_weight, min_bil_allocation)
# Cap BIL weight based on market condition (simplified)
if market_deviation > 0.05: bil_weight = min(bil_weight, 0.15) # Cap 15%
elif market_deviation > 0.0: bil_weight = min(bil_weight, 0.30) # Cap 30%
else: bil_weight = min(bil_weight, 0.60) # Cap 60%
# Determine allocation available for defensive ETFs (e.g., up to 40% of potential BIL)
# Let's use a simpler approach: Allocate a fixed max % to defensive ETFs based on market
max_defensive_etf_allocation = 0.0
if market_deviation < -0.05: max_defensive_etf_allocation = 0.30 # Max 30% in strong downturn
elif market_deviation < -0.01: max_defensive_etf_allocation = 0.20 # Max 20% in mild downturn
elif market_deviation < 0.02: max_defensive_etf_allocation = 0.10 # Max 10% in neutral/weak uptrend
# Evaluate defensive ETFs
if self.diagnostic_mode: self._runDefensiveETFDiagnostics(market_deviation, market_trend)
defensive_allocations = self._evaluateDefensiveETFs(market_deviation, market_trend, max_defensive_etf_allocation)
total_defensive_allocation = sum(defensive_allocations.values())
# Calculate remaining weight for equity portion
equity_weight = 1.0 - bil_weight - total_defensive_allocation
if equity_weight < 0: # Ensure non-negative equity weight
scale_down = (1.0 - bil_weight) / total_defensive_allocation if total_defensive_allocation > 0 else 0
total_defensive_allocation *= scale_down
for s in defensive_allocations: defensive_allocations[s] *= scale_down
equity_weight = 0
self.algorithm.Log("RiskControl: Scaled down defensive ETFs, equity weight reached zero.")
self.algorithm.Log(f"RiskControl Allocation: Equity {equity_weight*100:.1f}%, BIL {bil_weight*100:.1f}%, Defensive ETFs {total_defensive_allocation*100:.1f}%")
# Select and weight equity portion (Top 30 large caps, market cap weighted)
equity_weights = {}
if equity_weight > 0 and self.selected_by_market_cap:
# Apply momentum filter (optional, keep simple for now)
# momentum_scores = self._calculateSimpleMomentum()
# filtered_stocks = [(s, mc) for s, mc in self.selected_by_market_cap if momentum_scores.get(s, 1.0) >= 0.9]
# if len(filtered_stocks) < 20: filtered_stocks = self.selected_by_market_cap # Revert if too few
filtered_stocks = self.selected_by_market_cap # Use all selected
total_market_cap = sum([mc for s, mc in filtered_stocks])
if total_market_cap > 0:
equity_weights = {s: (mc / total_market_cap) * equity_weight for s, mc in filtered_stocks}
# --- Execute Trades ---
portfolio_value = self.algorithm.Portfolio.TotalPortfolioValue
traded_symbols = set()
# Set Equity Positions
for symbol, weight in equity_weights.items():
if weight > 0.001: # Minimum weight
self.algorithm.SetHoldings(symbol, weight)
traded_symbols.add(symbol)
self.entry_prices[symbol] = self.algorithm.Securities[symbol].Price
# Set BIL Position
if bil_weight > 0.001:
self.algorithm.SetHoldings(self.bil, bil_weight)
traded_symbols.add(self.bil)
else:
if self.algorithm.Portfolio[self.bil].Invested:
self.algorithm.Liquidate(self.bil, "RiskControl BIL Rebalance")
# Set Defensive ETF Positions
self.defensive_positions.clear() # Reset tracked defensive positions
for symbol, weight in defensive_allocations.items():
if weight > 0.001:
self.algorithm.SetHoldings(symbol, weight)
traded_symbols.add(symbol)
self.defensive_positions.add(symbol) # Track active defensive holdings
self.entry_prices[symbol] = self.algorithm.Securities[symbol].Price
# self.algorithm.Log(f"RiskControl: Allocated {weight*100:.2f}% to defensive ETF {symbol.Value}")
# Liquidate positions not targeted in this rebalance
for symbol, holding in list(self.algorithm.Portfolio.items()):
if holding.Invested and symbol not in traded_symbols:
# Check if it's an equity or defensive ETF that should be liquidated
is_equity = symbol in [s for s, mc in self.selected_by_market_cap]
is_defensive = symbol in self.all_defensive
if is_equity or is_defensive or symbol == self.bil: # Liquidate BIL if weight is 0
self.algorithm.Log(f"RiskControl: Liquidating {symbol.Value} (no longer targeted).")
self.algorithm.Liquidate(symbol, "RiskControl Monthly Rebalance")
# Update trackers
self.last_rebalance_date = self.algorithm.Time
self.previous_bil_allocation = bil_weight # Store for potential future use
self.algorithm.Log(f"--> RC MonthlyRebalance FINISHED on {self.algorithm.Time}.")
def _calculateMarketTrend(self):
"""Calculate recent market trend using SPY price history"""
if len(self.spy_prices) < self.trend_lookback + 1: return 0
dates = sorted(self.spy_prices.keys())
if len(dates) <= self.trend_lookback: return 0
recent_price = self.spy_prices[dates[-1]]
older_price = self.spy_prices[dates[-self.trend_lookback]]
return (recent_price / older_price) - 1.0 if older_price > 0 else 0
def _calculateSimpleMomentum(self):
"""Calculate simple momentum scores for stock filtering (Optional)"""
momentum_scores = {}
symbols = [sym for sym, _ in self.selected_by_market_cap]
if not symbols: return momentum_scores
history = self.algorithm.History(symbols, 30, Resolution.Daily)
if history.empty: return momentum_scores
for symbol in symbols:
if symbol in history.index.get_level_values(0):
prices = history.loc[symbol]['close']
if len(prices) >= 30:
mom = prices.iloc[-1] / prices.iloc[0] - 1
momentum_scores[symbol] = min(1.3, max(0.7, 1 + (mom * 2)))
return momentum_scores
def _runDefensiveETFDiagnostics(self, market_deviation, market_trend):
"""Run detailed diagnostics on all defensive ETFs"""
if not self.diagnostic_mode: return
self.algorithm.Log("--- RiskControl Defensive ETF Diagnostics ---")
symbols_to_request = self.all_defensive + [self.spy]
history = self.algorithm.History(symbols_to_request, 90, Resolution.Daily)
if history.empty:
self.algorithm.Log("Diagnostics: History request failed.")
return
spy_perf = {}
if self.spy in history.index.get_level_values(0):
spy_prices = history.loc[self.spy]['close']
if len(spy_prices) >= 30:
spy_perf = {
"7d": spy_prices.iloc[-1] / spy_prices.iloc[-7] - 1 if len(spy_prices) >= 7 else 0,
"15d": spy_prices.iloc[-1] / spy_prices.iloc[-15] - 1 if len(spy_prices) >= 15 else 0,
"30d": spy_prices.iloc[-1] / spy_prices.iloc[-30] - 1
}
self.algorithm.Log(f"DIAGNOSTIC - Market: Dev {market_deviation*100:.2f}%, Trend {market_trend*100:.2f}%, SPY 30d: {spy_perf.get('30d', 0)*100:.2f}%")
for symbol in self.all_defensive:
if symbol in history.index.get_level_values(0):
prices = history.loc[symbol]['close']
if len(prices) >= 30:
perf_7d = prices.iloc[-1] / prices.iloc[-7] - 1 if len(prices) >= 7 else 0
perf_15d = prices.iloc[-1] / prices.iloc[-15] - 1 if len(prices) >= 15 else 0
perf_30d = prices.iloc[-1] / prices.iloc[-30] - 1
rel_30d = perf_30d - spy_perf.get('30d', 0)
self.algorithm.Log(f" {symbol.Value}: 7d: {perf_7d*100:.2f}%, 15d: {perf_15d*100:.2f}%, 30d: {perf_30d*100:.2f}%, Rel30d: {rel_30d*100:.2f}%")
self.algorithm.Log("--- End Diagnostics ---")
def _evaluateDefensiveETFs(self, market_deviation, market_trend, max_total_allocation):
"""Enhanced defensive ETF evaluation with scoring"""
allocations = {symbol: 0 for symbol in self.all_defensive}
if max_total_allocation <= 0: return allocations # No room for defensive ETFs
# Skip if market is very bullish
if market_deviation > 0.04 and market_trend > 0.02:
return allocations
symbols_to_request = self.all_defensive + [self.spy]
history = self.algorithm.History(symbols_to_request, 60, Resolution.Daily)
if history.empty: return allocations
spy_perf = {}
if self.spy in history.index.get_level_values(0):
spy_prices = history.loc[self.spy]['close']
if len(spy_prices) >= 30:
spy_perf = { # Use consistent periods
"5d": spy_prices.iloc[-1] / spy_prices.iloc[-5] - 1 if len(spy_prices) >= 5 else 0,
"10d": spy_prices.iloc[-1] / spy_prices.iloc[-10] - 1 if len(spy_prices) >= 10 else 0,
"20d": spy_prices.iloc[-1] / spy_prices.iloc[-20] - 1 if len(spy_prices) >= 20 else 0,
"30d": spy_prices.iloc[-1] / spy_prices.iloc[-30] - 1
}
etf_scores = {}
for symbol in self.all_defensive:
if symbol in history.index.get_level_values(0):
prices = history.loc[symbol]['close']
if len(prices) >= 30:
perf = {
"5d": prices.iloc[-1] / prices.iloc[-5] - 1 if len(prices) >= 5 else 0,
"10d": prices.iloc[-1] / prices.iloc[-10] - 1 if len(prices) >= 10 else 0,
"20d": prices.iloc[-1] / prices.iloc[-20] - 1 if len(prices) >= 20 else 0,
"30d": prices.iloc[-1] / prices.iloc[-30] - 1
}
rel_perf = {p: perf[p] - spy_perf.get(p, 0) for p in perf}
# Scoring logic (simplified example)
# Weight recent performance and relative performance
abs_score = (perf["5d"] * 0.4) + (perf["10d"] * 0.3) + (perf["20d"] * 0.2) + (perf["30d"] * 0.1)
rel_score = (rel_perf["5d"] * 0.4) + (rel_perf["10d"] * 0.3) + (rel_perf["20d"] * 0.2) + (rel_perf["30d"] * 0.1)
# Combine scores based on ETF type and market condition
score = 0
is_inverse = symbol in self.inverse_etfs
is_alternative = symbol in self.alternative_defensive
is_sector = symbol in self.sector_defensive
if market_deviation < -0.02: # Downtrend
if is_inverse: score = abs_score # Inverse should go up
elif is_alternative: score = (abs_score * 0.5) + (rel_score * 0.5) # Balance abs/rel
elif is_sector: score = (abs_score * 0.3) + (rel_score * 0.7) # Favor relative outperformance
else: # Neutral / Uptrend
if is_inverse: score = abs_score * 0.5 # Penalize inverse unless strongly positive
elif is_alternative: score = abs_score # Absolute return matters more
elif is_sector: score = (abs_score * 0.6) + (rel_score * 0.4) # Favor absolute
etf_scores[symbol] = score
# Filter candidates with positive scores
candidates = {s: score for s, score in etf_scores.items() if score > 0.001} # Small positive threshold
if not candidates: return allocations
# Sort candidates
sorted_candidates = sorted(candidates.items(), key=lambda x: x[1], reverse=True)
# Select top N candidates (e.g., top 2)
num_etfs_to_select = min(2, len(sorted_candidates))
selected_etfs = sorted_candidates[:num_etfs_to_select]
# Allocate proportionally to score among selected ETFs
total_score_selected = sum(score for _, score in selected_etfs)
if total_score_selected > 0:
for symbol, score in selected_etfs:
weight = score / total_score_selected
allocations[symbol] = max_total_allocation * weight
# self.algorithm.Log(f"RiskControl EvalDefensive: Selected {symbol.Value}, Score: {score:.4f}, Alloc: {allocations[symbol]*100:.2f}%")
return allocations
from AlgorithmImports import *
import numpy as np
from datetime import timedelta
class MarketCapWeightedSP500Tracker(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2019, 1, 1)
self.SetEndDate(2025, 1, 1)
self.SetCash(100000)
self.UniverseSettings.Resolution = Resolution.Daily
self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol
self.bil = self.AddEquity("BIL", Resolution.Daily).Symbol
self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
self.selected_by_market_cap = []
self.rebalance_flag = False
self.spy_30day_window = RollingWindow[float](30)
self.entry_prices = {}
self.previous_bil_allocation = 0.0
self.Schedule.On(self.DateRules.MonthStart(self.spy),
self.TimeRules.AfterMarketOpen(self.spy, 30),
self.SetRebalanceFlag)
self.Schedule.On(self.DateRules.WeekStart(self.spy, DayOfWeek.Wednesday),
self.TimeRules.AfterMarketOpen(self.spy, 30),
self.MonthlyRebalance)
# Initialize rolling window with historical data
history = self.History(self.spy, 30, Resolution.Daily)
if not history.empty:
for time, row in history.loc[self.spy].iterrows():
self.spy_30day_window.Add(row["close"])
# Add simple tracking of market trend
self.trend_lookback = 10
self.spy_prices = {}
self.max_spy_history = 60 # Days of price history to keep
# Add dynamic stop-loss enhancement
self.stop_loss_base = 0.04 # Reduced base stop-loss threshold
self.dynamic_stop_weight = 0.5 # Blend 50% ATR signal with base threshold
# Expanded list of inverse and defensive ETFs
# Original inverse ETFs
self.sh = self.AddEquity("SH", Resolution.Daily).Symbol # Inverse S&P 500
self.psq = self.AddEquity("PSQ", Resolution.Daily).Symbol # Inverse Nasdaq-100
self.dog = self.AddEquity("DOG", Resolution.Daily).Symbol # Inverse Dow Jones
self.rwm = self.AddEquity("RWM", Resolution.Daily).Symbol # Inverse Russell 2000
self.eum = self.AddEquity("EUM", Resolution.Daily).Symbol # Inverse Emerging Markets
self.myd = self.AddEquity("MYY", Resolution.Daily).Symbol # Inverse Mid-Cap 400
# Alternative defensive ETFs (not inverse but potentially good in downturns)
self.gld = self.AddEquity("GLD", Resolution.Daily).Symbol # Gold
self.ief = self.AddEquity("IEF", Resolution.Daily).Symbol # 7-10 Year Treasury
self.bnd = self.AddEquity("BND", Resolution.Daily).Symbol # Total Bond Market
# Sector-based defensive ETFs (often outperform in bear markets)
self.xlp = self.AddEquity("XLP", Resolution.Daily).Symbol # Consumer Staples
self.xlu = self.AddEquity("XLU", Resolution.Daily).Symbol # Utilities
self.xlv = self.AddEquity("XLV", Resolution.Daily).Symbol # Healthcare
self.vht = self.AddEquity("VHT", Resolution.Daily).Symbol # Vanguard Healthcare
self.vdc = self.AddEquity("VDC", Resolution.Daily).Symbol # Vanguard Consumer Staples
# Group all defensive ETFs together
self.inverse_etfs = [self.sh, self.psq, self.dog, self.rwm, self.eum, self.myd]
self.alternative_defensive = [self.gld, self.ief, self.bnd]
self.sector_defensive = [self.xlp, self.xlu, self.xlv, self.vht, self.vdc]
self.all_defensive = self.inverse_etfs + self.alternative_defensive + self.sector_defensive
# Add diagnostic logging capability
self.diagnostic_mode = True # Enable detailed diagnostics
# Initialize positions tracking and add weekly tactical adjustment
self.defensive_positions = set()
self.last_defensive_update = datetime(1900, 1, 1)
# Add weekly defensive ETF evaluation schedule
self.Schedule.On(self.DateRules.WeekStart(self.spy, DayOfWeek.Monday),
self.TimeRules.AfterMarketOpen(self.spy, 60), # After main rebalance
self.WeeklyDefensiveAdjustment)
# Initialize positions tracking
self.inverse_positions = set()
# Add inverse ETF lookback windows for better momentum calculation
self.inverse_lookback_short = 7 # 1 week momentum window
self.inverse_lookback_med = 15 # Medium-term momentum
# Add ATR indicators for enhanced volatility-based stop-loss calculation
self.atr_period = 14
self.atr = {}
# Register ATR for key symbols (defensive ETFs, BIL, and SPY)
for symbol in self.all_defensive + [self.bil, self.spy]:
self.atr[symbol] = self.ATR(symbol, self.atr_period, Resolution.Daily)
# Initialize defensive strategy handler
self.defensive_strategy = DefensiveStrategyHandler(self, {})
self.defensive_strategy.Initialize()
def CoarseSelectionFunction(self, coarse):
filtered = [x for x in coarse if x.HasFundamentalData
and x.Price > 5
and x.Market == Market.USA]
return [x.Symbol for x in filtered]
def FineSelectionFunction(self, fine):
filtered = [x for x in fine if x.MarketCap > 1e10
and x.SecurityReference.SecurityType == "ST00000001"]
sorted_by_cap = sorted(filtered, key=lambda x: x.MarketCap, reverse=True)[:30]
self.selected_by_market_cap = [(x.Symbol, x.MarketCap) for x in sorted_by_cap]
return [x.Symbol for x in sorted_by_cap]
def SetRebalanceFlag(self):
if self.Time.weekday() == 2: # Wednesday
self.rebalance_flag = True
def OnData(self, data):
# Update price window
if not data.Bars.ContainsKey(self.spy): return
self.spy_30day_window.Add(data.Bars[self.spy].Close)
# Track prices for trend calculation
self.spy_prices[self.Time.date()] = data.Bars[self.spy].Close
# Remove old prices
dates_to_remove = []
for date in self.spy_prices.keys():
if (self.Time.date() - date).days > self.max_spy_history:
dates_to_remove.append(date)
for date in dates_to_remove:
self.spy_prices.pop(date)
market_trend = self._calculateMarketTrend()
# Track if any stop-loss was triggered
stop_loss_triggered = False
# Check stop-loss triggers with improved dynamic thresholds
for kvp in self.Portfolio:
symbol = kvp.Key
holding = kvp.Value
if holding.Invested and symbol != self.bil:
current_price = self.Securities[symbol].Price
if symbol not in self.entry_prices:
self.entry_prices[symbol] = current_price
price_drop = (self.entry_prices[symbol] - current_price) / self.entry_prices[symbol]
# Start with the base threshold and adjust based on market trend
stop_threshold = self.stop_loss_base
if market_trend < -0.03:
stop_threshold *= 0.9 # tighten in downtrends
elif market_trend > 0.03:
stop_threshold *= 1.1 # loosen in uptrends
# Incorporate ATR if ready with adjustment to prevent overreaction in high volatility
if symbol in self.atr and self.atr[symbol].IsReady:
current_atr = self.atr[symbol].Current.Value
atr_pct = current_atr / current_price
# If ATR is excessively high versus our base, use a lower weight to temper the effect
effective_weight = self.dynamic_stop_weight
if atr_pct > stop_threshold * 1.2:
effective_weight = min(self.dynamic_stop_weight, 0.3)
stop_threshold = ((1 - effective_weight) * stop_threshold +
effective_weight * atr_pct)
if price_drop >= stop_threshold:
self.Liquidate(symbol)
stop_loss_triggered = True
self.Debug(f"Stop-loss triggered for {symbol} at {current_price}, drop: {price_drop*100:.1f}%, threshold: {stop_threshold*100:.1f}%")
# If any stop-loss was triggered, invest all available cash in BIL
if stop_loss_triggered:
available_cash = self.Portfolio.Cash + self.Portfolio.UnsettledCash
if available_cash > 0:
bil_price = self.Securities[self.bil].Price
bil_quantity = available_cash / bil_price
self.MarketOrder(self.bil, bil_quantity)
self.Debug(f"Invested ${available_cash:0.2f} in BIL after stop-loss")
# Call defensive strategy handler
self.defensive_strategy.OnData(data)
def WeeklyDefensiveAdjustment(self):
"""Weekly check and adjustment for defensive ETF positions"""
# Skip if we've done the monthly rebalance recently
days_since_rebalance = (self.Time.date() - self.last_rebalance_date.date()).days if hasattr(self, 'last_rebalance_date') else 999
if days_since_rebalance < 3:
return
# Skip if we've updated defensive positions recently
days_since_update = (self.Time.date() - self.last_defensive_update.date()).days
if days_since_update < 5: # At most once a week
return
# Calculate current market conditions
spy_price = self.Securities[self.spy].Price
sma_30 = sum(self.spy_30day_window) / self.spy_30day_window.Count if self.spy_30day_window.Count > 0 else spy_price
market_deviation = (spy_price / sma_30) - 1.0
market_trend = self._calculateMarketTrend()
# Skip in strong bull markets
if market_deviation > 0.04 and market_trend > 0.03:
return
# Calculate total invested amount including all positions
total_invested = sum(holding.HoldingsValue for holding in self.Portfolio.Values
if holding.Invested) / self.Portfolio.TotalPortfolioValue
# If we're already fully invested, can't add more defensive positions
if total_invested >= 0.98: # Allow small buffer for rounding errors
self.Debug(f"Already fully invested ({total_invested:.2f}), skipping defensive adjustments")
return
# Calculate available room for defensive positions
available_allocation = max(0, 0.99 - total_invested) # Keep tiny buffer
# Calculate how much is currently allocated to defensive positions
current_defensive_value = sum(self.Portfolio[s].HoldingsValue
for s in self.defensive_positions
if self.Portfolio.ContainsKey(s) and self.Portfolio[s].Invested)
# Calculate current BIL allocation
current_bil_value = self.Portfolio[self.bil].HoldingsValue if self.Portfolio[self.bil].Invested else 0
bil_allocation = current_bil_value / self.Portfolio.TotalPortfolioValue
# Limit potential allocation to available room
max_defensive_pct = min(0.25, available_allocation / bil_allocation if bil_allocation > 0 else 0)
potential_allocation = bil_allocation * max_defensive_pct
# Make sure we don't exceed available room
potential_allocation = min(potential_allocation, available_allocation)
# Super detailed diagnostics for current defensive positions
if self.diagnostic_mode and self.defensive_positions:
self.Debug(f"WEEKLY CHECK - Current defensive positions:")
for symbol in self.defensive_positions:
if self.Portfolio.ContainsKey(symbol) and self.Portfolio[symbol].Invested:
position = self.Portfolio[symbol]
entry = self.entry_prices.get(symbol, position.AveragePrice)
current = self.Securities[symbol].Price
pnl_pct = (current / entry) - 1 if entry > 0 else 0
self.Debug(f" {symbol}: PnL {pnl_pct*100:.2f}%, Value ${position.HoldingsValue:.2f}")
# Evaluate current defensive positions and potential new ones
self.Debug(f"WEEKLY CHECK - Market: Dev {market_deviation*100:.2f}%, Trend {market_trend*100:.2f}%")
self.Debug(f"BIL allocation: {bil_allocation*100:.2f}%, Potential defensive: {potential_allocation*100:.2f}%")
# Run the defensive ETF evaluation
new_allocations = self._evaluateDefensiveETFs(market_deviation, market_trend, potential_allocation)
# Calculate which positions to add, modify, or remove
positions_to_add = {}
positions_to_remove = set()
# Process existing positions
for symbol in self.defensive_positions:
# If position should be kept but maybe at different allocation
if symbol in new_allocations and new_allocations[symbol] > 0:
current_pct = self.Portfolio[symbol].HoldingsValue / self.Portfolio.TotalPortfolioValue if self.Portfolio.ContainsKey(symbol) else 0
target_pct = new_allocations[symbol]
# If allocation difference is significant, adjust position
if abs(target_pct - current_pct) > 0.01:
positions_to_add[symbol] = target_pct
# Remove from new allocations dict to avoid double-processing
new_allocations.pop(symbol)
else:
# Position should be removed
positions_to_remove.add(symbol)
# Add any remaining new positions
for symbol, allocation in new_allocations.items():
if allocation > 0.01: # Minimum meaningful allocation
positions_to_add[symbol] = allocation
# Check if we'll exceed our allocation limits with new positions
total_new_allocation = sum(positions_to_add.values())
if total_new_allocation > available_allocation:
# Scale back allocations to fit available space
scale_factor = available_allocation / total_new_allocation
for symbol in positions_to_add:
positions_to_add[symbol] *= scale_factor
self.Debug(f"Scaled defensive allocations to fit available space: {scale_factor:.4f}")
# Execute trades if needed
if positions_to_add or positions_to_remove:
self.Debug(f"WEEKLY ADJUSTMENT - Making defensive position changes")
# Remove positions no longer needed
for symbol in positions_to_remove:
self.Liquidate(symbol)
self.defensive_positions.remove(symbol)
self.Debug(f"Removed defensive position: {symbol}")
# Add or adjust positions
for symbol, allocation in positions_to_add.items():
self.SetHoldings(symbol, allocation)
self.defensive_positions.add(symbol)
self.entry_prices[symbol] = self.Securities[symbol].Price
self.Debug(f"Updated defensive position: {symbol} to {allocation*100:.2f}%")
self.last_defensive_update = self.Time
def MonthlyRebalance(self):
if not self.rebalance_flag: return
self.rebalance_flag = False
self.entry_prices.clear() # Reset entry prices at rebalance
if self.spy_30day_window.Count < 30:
self.Debug("Waiting for enough SPY history.")
return
spy_price = self.Securities[self.spy].Price
sma_30 = sum(self.spy_30day_window) / 30
# Calculate market deviation for better decisions
market_deviation = (spy_price / sma_30) - 1.0
market_trend = self._calculateMarketTrend()
# Enhanced BIL allocation logic with lower caps
bil_weight = 0.0
if spy_price < sma_30:
# Enhanced formula for better downside protection
base_weight = (sma_30 - spy_price) / sma_30
if base_weight > 0.08: # Significant drop
# Lower cap on BIL for significant drops
bil_weight = min(base_weight * 1.1, 0.7) # Cap at 70% (was 90%)
else:
bil_weight = min(base_weight, 0.6) # Cap at 60% (was 80%)
# Enhanced reduction rule for better returns in bull markets
if market_deviation > 0.05: # Strong bull market
min_bil_allocation = self.previous_bil_allocation * 0.7 # 30% reduction
elif market_deviation > 0.02: # Modest bull market
min_bil_allocation = self.previous_bil_allocation * 0.75 # 25% reduction
else:
min_bil_allocation = self.previous_bil_allocation * 0.8 # Standard 20% reduction
bil_weight = max(bil_weight, min_bil_allocation)
# Lower caps on BIL in all market conditions
if market_deviation > 0.08: # Very strong bull
bil_weight = min(bil_weight, 0.15) # Cap at 15% (was 20%)
elif market_deviation > 0.05: # Strong bull
bil_weight = min(bil_weight, 0.25) # Cap at 25% (was 30%)
elif market_deviation > 0.0: # Mild bull
bil_weight = min(bil_weight, 0.4) # Cap at 40% (new tier)
elif market_deviation > -0.03: # Neutral
bil_weight = min(bil_weight, 0.5) # Cap at 50% (new tier)
else: # Bear
bil_weight = min(bil_weight, 0.6) # Cap at 60% (new tier)
# Calculate how much of the original BIL allocation to potentially use for inverse ETFs
original_bil = bil_weight
# Use only a portion of BIL for inverse ETFs, keeping some as BIL
inverse_etf_potential = original_bil * 0.4 # Use 40% of BIL allocation for inverse ETFs
bil_weight = original_bil - inverse_etf_potential
# Run diagnostics on defensive ETFs
if self.diagnostic_mode:
self._runDefensiveETFDiagnostics(market_deviation, market_trend)
# Evaluate inverse ETFs for possible allocation
inverse_allocations = self._evaluateInverseETFs(market_deviation, market_trend, inverse_etf_potential)
# Include alternative defensive ETFs in evaluation
all_defensive_allocations = self._evaluateDefensiveETFs(market_deviation, market_trend, inverse_etf_potential)
# Calculate total allocation to defensive ETFs
total_defensive_allocation = sum(all_defensive_allocations.values())
# Set aside remainder as cash (won't be allocated)
cash_reserve = inverse_etf_potential - total_defensive_allocation
# Calculate weight for equity portion
equity_weight = 1.0 - total_defensive_allocation
# Ensure total allocation never exceeds 100%
total_allocation = bil_weight + total_defensive_allocation + equity_weight
if total_allocation > 1.0:
# Scale back components proportionally
scale_factor = 1.0 / total_allocation
bil_weight *= scale_factor
equity_weight *= scale_factor
# Scale each defensive allocation
for symbol in all_defensive_allocations:
all_defensive_allocations[symbol] *= scale_factor
total_defensive_allocation = sum(all_defensive_allocations.values())
self.Debug(f"Scaled allocations to prevent leverage: {scale_factor:.4f}")
self.Debug(f"Allocation breakdown: Equity {equity_weight*100:.1f}%, BIL {bil_weight*100:.1f}%, " +
f"Defensive ETFs {total_defensive_allocation*100:.1f}%, Cash {cash_reserve*100:.1f}%")
# Enhance stock selection with simple momentum filter
momentum_scores = self._calculateSimpleMomentum()
# Filter out worst momentum stocks
filtered_stocks = []
for symbol, mcap in self.selected_by_market_cap:
score = momentum_scores.get(symbol, 1.0)
if score >= 0.9: # Keep only neutral or positive momentum stocks
filtered_stocks.append((symbol, mcap))
# If we filtered too many, revert to original list
if len(filtered_stocks) < 20:
filtered_stocks = self.selected_by_market_cap
# Calculate weights using the filtered stocks
total_market_cap = sum([x[1] for x in filtered_stocks])
weights = {x[0]: (x[1] / total_market_cap) * equity_weight for x in filtered_stocks}
invested = set()
for symbol, weight in weights.items():
if weight > 0:
self.SetHoldings(symbol, weight)
invested.add(symbol)
self.entry_prices[symbol] = self.Securities[symbol].Price
# Set BIL position
if bil_weight > 0:
self.SetHoldings(self.bil, bil_weight)
invested.add(self.bil)
else:
self.Liquidate(self.bil)
# Set defensive ETF positions
for symbol, weight in all_defensive_allocations.items():
if weight > 0:
self.SetHoldings(symbol, weight)
invested.add(symbol)
self.defensive_positions.add(symbol) # Using renamed set
self.entry_prices[symbol] = self.Securities[symbol].Price
self.Debug(f"Allocated {weight*100:.2f}% to defensive ETF {symbol}")
elif symbol in self.defensive_positions:
self.Liquidate(symbol)
self.defensive_positions.remove(symbol)
# Update last rebalance date tracker
self.last_rebalance_date = self.Time
# Store current BIL allocation for next month's minimum
self.previous_bil_allocation = self.Portfolio[self.bil].HoldingsValue / self.Portfolio.TotalPortfolioValue
self.Debug(f"New BIL allocation: {bil_weight*100:0.2f}% (Minimum was {min_bil_allocation*100:0.2f}%)")
# Liquidate positions not in current selection
for kvp in self.Portfolio:
symbol = kvp.Key
if (kvp.Value.Invested and symbol not in invested
and symbol != self.spy and symbol not in self.defensive_positions):
self.Liquidate(symbol)
def _calculateMarketTrend(self):
"""Calculate recent market trend using price history"""
if len(self.spy_prices) < self.trend_lookback + 1:
return 0 # Not enough data
dates = sorted(self.spy_prices.keys())
if len(dates) <= self.trend_lookback:
return 0
recent_price = self.spy_prices[dates[-1]]
older_price = self.spy_prices[dates[-self.trend_lookback]]
return (recent_price / older_price) - 1.0
def _calculateSimpleMomentum(self):
"""Calculate simple momentum scores for stock filtering"""
momentum_scores = {}
symbols = [sym for sym, _ in self.selected_by_market_cap]
if not symbols:
return momentum_scores
# Get 30 days of history for all stocks
history = self.History(symbols, 30, Resolution.Daily)
if history.empty:
return momentum_scores
# Calculate simple momentum (30-day price change)
for symbol in symbols:
if symbol in history.index.get_level_values(0):
prices = history.loc[symbol]['close']
if len(prices) >= 30:
# 30-day momentum
mom = prices.iloc[-1] / prices.iloc[0] - 1
# Convert to a score between 0.7 and 1.3
# Center around 1.0, with range based on 15% move
momentum_scores[symbol] = min(1.3, max(0.7, 1 + (mom * 2)))
return momentum_scores
def _evaluateInverseETFs(self, market_deviation, market_trend, max_allocation):
"""Enhanced evaluation of inverse ETFs with more sensitive criteria"""
allocations = {symbol: 0 for symbol in self.inverse_etfs}
# More permissive consideration of inverse ETFs
if market_deviation > 0.04 and market_trend > 0.02:
return allocations # Only skip in very strong bull markets
# Get more history for better momentum calculation
history = self.History(self.inverse_etfs, 45, Resolution.Daily)
if history.empty:
return allocations
# Enhanced momentum scoring
momentum_scores = {}
volatility_scores = {}
for symbol in self.inverse_etfs:
if symbol in history.index.get_level_values(0):
prices = history.loc[symbol]['close']
if len(prices) >= 30:
# Multiple timeframe momentum - more emphasis on recent performance
mom_7d = prices.iloc[-1] / prices.iloc[-7] - 1 if len(prices) >= 7 else 0
mom_15d = prices.iloc[-1] / prices.iloc[-15] - 1 if len(prices) >= 15 else 0
mom_30d = prices.iloc[-1] / prices.iloc[0] - 1
# Weight recent momentum much more heavily
momentum = (mom_7d * 0.5) + (mom_15d * 0.3) + (mom_30d * 0.2)
# Calculate volatility (lower is better for inverse ETFs)
returns = [prices.iloc[i+1]/prices.iloc[i]-1 for i in range(min(20, len(prices)-1))]
volatility = np.std(returns) if returns else 0
# Calculate short-term rate of change (acceleration)
if len(prices) >= 10:
recent_5d_change = prices.iloc[-1] / prices.iloc[-5] - 1
prev_5d_change = prices.iloc[-6] / prices.iloc[-10] - 1
acceleration = recent_5d_change - prev_5d_change
else:
acceleration = 0
# Momentum score adds weight for accelerating performance
momentum_scores[symbol] = momentum + (acceleration * 0.5)
volatility_scores[symbol] = volatility
# More aggressive filtering - consider even small positive momentum
positive_momentum_etfs = {s: score for s, score in momentum_scores.items() if score > -0.005}
# No allocation if no ETFs have at least neutral momentum
if not positive_momentum_etfs:
self.Debug("No inverse ETFs showing acceptable momentum - keeping as cash")
return allocations
# Enhanced selection: favor momentum but consider volatility too
best_candidates = []
for symbol, score in positive_momentum_etfs.items():
volatility = volatility_scores.get(symbol, 1.0)
# Adjust score: higher momentum is good, lower volatility is good
adjusted_score = score - (volatility * 0.5)
best_candidates.append((symbol, score, adjusted_score))
# Sort by adjusted score
best_candidates.sort(key=lambda x: x[2], reverse=True)
# More aggressive allocation model
allocation_pct = 0.0
# Allocate based on market conditions with more sensitivity
if market_deviation < -0.05:
allocation_pct = 1.0 # Use 100% of available inverse allocation
elif market_deviation < -0.03:
allocation_pct = 0.8 # Use 80% of available inverse allocation
elif market_deviation < -0.01:
allocation_pct = 0.6 # Use 60% of available inverse allocation
elif market_deviation < 0.01: # Even in slight bull market if momentum is positive
allocation_pct = 0.4 # Use 40% of available inverse allocation
else:
allocation_pct = 0.2 # Use 20% only if momentum is strong enough
# No candidates or market conditions don't justify allocation
if not best_candidates or allocation_pct < 0.1:
return allocations
# Take top 1-2 ETFs depending on market conditions
num_etfs = 1
if market_deviation < -0.04 and len(best_candidates) > 1:
num_etfs = 2 # Use two ETFs in stronger downtrends
# Allocate to best ETF(s)
remaining_allocation = max_allocation * allocation_pct
for i in range(min(num_etfs, len(best_candidates))):
symbol, raw_score, _ = best_candidates[i]
# Allocate proportionally to momentum strength, with a minimum threshold
etf_weight = min(1.0, max(0.3, raw_score * 3)) if raw_score > 0 else 0.3
# Calculate allocation for this ETF
etf_allocation = remaining_allocation * etf_weight / num_etfs
# Only allocate if it's a meaningful amount
if etf_allocation >= 0.01: # At least 1% allocation
allocations[symbol] = etf_allocation
self.Debug(f"Selected inverse ETF {symbol} with momentum {raw_score:.2%}, allocating {etf_allocation*100:.2f}%")
return allocations
def _runDefensiveETFDiagnostics(self, market_deviation, market_trend):
"""Run detailed diagnostics on all defensive ETFs"""
# Get extensive history for analysis
history = self.History(self.all_defensive + [self.spy], 90, Resolution.Daily)
if history.empty:
return
spy_perf = {}
if self.spy in history.index.get_level_values(0):
spy_prices = history.loc[self.spy]['close']
if len(spy_prices) >= 30:
spy_perf = {
"7d": spy_prices.iloc[-1] / spy_prices.iloc[-7] - 1 if len(spy_prices) >= 7 else 0,
"15d": spy_prices.iloc[-1] / spy_prices.iloc[-15] - 1 if len(spy_prices) >= 15 else 0,
"30d": spy_prices.iloc[-1] / spy_prices.iloc[-30] - 1
}
# Log market conditions
self.Debug(f"DIAGNOSTIC - Market: Deviation {market_deviation*100:.2f}%, " +
f"Trend {market_trend*100:.2f}%, SPY 30d: {spy_perf.get('30d', 0)*100:.2f}%")
# Analyze each ETF
for symbol in self.all_defensive:
if symbol in history.index.get_level_values(0):
prices = history.loc[symbol]['close']
if len(prices) >= 30:
# Calculate multiple timeframe performance
perf_7d = prices.iloc[-1] / prices.iloc[-7] - 1 if len(prices) >= 7 else 0
perf_15d = prices.iloc[-1] / prices.iloc[-15] - 1 if len(prices) >= 15 else 0
perf_30d = prices.iloc[-1] / prices.iloc[-30] - 1
# Calculate recent acceleration
recent_5d = prices.iloc[-1] / prices.iloc[-5] - 1 if len(prices) >= 5 else 0
prev_5d = prices.iloc[-6] / prices.iloc[-10] - 1 if len(prices) >= 10 else 0
accel = recent_5d - prev_5d
# Calculate relative performance vs SPY
rel_perf = {}
for period, spy_val in spy_perf.items():
if period == "7d":
rel_perf[period] = perf_7d - spy_val
elif period == "15d":
rel_perf[period] = perf_15d - spy_val
elif period == "30d":
rel_perf[period] = perf_30d - spy_val
# Log detailed ETF statistics
self.Debug(f" {symbol}: 7d: {perf_7d*100:.2f}%, 15d: {perf_15d*100:.2f}%, " +
f"30d: {perf_30d*100:.2f}%, Accel: {accel*100:.2f}%, " +
f"Rel30d: {rel_perf.get('30d', 0)*100:.2f}%")
def _evaluateDefensiveETFs(self, market_deviation, market_trend, max_allocation):
"""Enhanced defensive ETF evaluation with sector rotation"""
allocations = {symbol: 0 for symbol in self.all_defensive}
# Skip if market is very bullish
if market_deviation > 0.04 and market_trend > 0.02:
return allocations
# Get history for all defensive options and SPY
history = self.History(self.all_defensive + [self.spy], 60, Resolution.Daily)
if history.empty:
return allocations
# Detailed diagnostics on all ETFs
self.Debug(f"DEFENSIVE ETF PERFORMANCE DETAILS:")
# Calculate SPY performance for relative comparisons
spy_perf = {}
if self.spy in history.index.get_level_values(0):
spy_prices = history.loc[self.spy]['close']
if len(spy_prices) >= 30:
spy_perf = {
"5d": spy_prices.iloc[-1] / spy_prices.iloc[-5] - 1 if len(spy_prices) >= 5 else 0,
"10d": spy_prices.iloc[-1] / spy_prices.iloc[-10] - 1 if len(spy_prices) >= 10 else 0,
"20d": spy_prices.iloc[-1] / spy_prices.iloc[-20] - 1 if len(spy_prices) >= 20 else 0,
"30d": spy_prices.iloc[-1] / spy_prices.iloc[-30] - 1
}
self.Debug(f" SPY: 5d: {spy_perf['5d']*100:.1f}%, 10d: {spy_perf['10d']*100:.1f}%, " +
f"20d: {spy_perf['20d']*100:.1f}%, 30d: {spy_perf['30d']*100:.1f}%")
# Enhanced scoring system with different criteria for different ETF types
etf_scores = {}
# Process each ETF by type
for group_name, group in [("Inverse", self.inverse_etfs),
("Alternative", self.alternative_defensive),
("Sector", self.sector_defensive)]:
self.Debug(f" {group_name} ETFs:")
for symbol in group:
if symbol in history.index.get_level_values(0):
prices = history.loc[symbol]['close']
if len(prices) >= 30:
# Calculate absolute momentum components
perf = {}
perf["5d"] = prices.iloc[-1] / prices.iloc[-5] - 1 if len(prices) >= 5 else 0
perf["10d"] = prices.iloc[-1] / prices.iloc[-10] - 1 if len(prices) >= 10 else 0
perf["20d"] = prices.iloc[-1] / prices.iloc[-20] - 1 if len(prices) >= 20 else 0
perf["30d"] = prices.iloc[-1] / prices.iloc[-30] - 1
# Calculate relative outperformance vs SPY
rel_perf = {}
for period, spy_val in spy_perf.items():
rel_perf[period] = perf[period] - spy_val
# Log detailed performance
self.Debug(f" {symbol}: 5d: {perf['5d']*100:.1f}% (rel: {rel_perf['5d']*100:+.1f}%), " +
f"10d: {perf['10d']*100:.1f}% (rel: {rel_perf['10d']*100:+.1f}%), " +
f"30d: {perf['30d']*100:.1f}% (rel: {rel_perf['30d']*100:+.1f}%)")
# Inverse ETFs need to show positive momentum in down markets
if symbol in self.inverse_etfs:
# In downtrends, rising inverse ETFs are good
if market_deviation < -0.02:
score = (perf["5d"] * 0.4) + (perf["10d"] * 0.4) + (perf["30d"] * 0.2)
# Bonus for relative outperformance
score += (rel_perf["5d"] + rel_perf["10d"]) * 0.15
else:
# Less emphasis on long-term performance in neutral markets
score = (perf["5d"] * 0.6) + (perf["10d"] * 0.3) + (perf["30d"] * 0.1)
# Alternative defensive (bonds, gold) - focus on absolute return
elif symbol in self.alternative_defensive:
# Less dramatic movements, need lower thresholds
score = (perf["5d"] * 0.3) + (perf["10d"] * 0.4) + (perf["30d"] * 0.3)
# In downtrends, emphasize relative performance more
if market_deviation < -0.03:
score += rel_perf["10d"] * 0.2 # Bonus for outperformance
# Sector ETFs - focus on relative outperformance
else:
# These should have positive absolute returns and outperform SPY
abs_score = (perf["5d"] * 0.3) + (perf["10d"] * 0.3) + (perf["30d"] * 0.4)
rel_score = (rel_perf["5d"] * 0.3) + (rel_perf["10d"] * 0.3) + (rel_perf["30d"] * 0.4)
# Balance absolute and relative performance
if market_deviation < -0.02:
# In downtrends, relative outperformance is more important
score = (abs_score * 0.4) + (rel_score * 0.6)
else:
# In neutral markets, absolute performance matters more
score = (abs_score * 0.6) + (rel_score * 0.4)
etf_scores[symbol] = score
# Find candidates with appropriate momentum based on market conditions
threshold = -0.007 # Default threshold
if market_deviation < -0.03:
threshold = -0.01 # More permissive in stronger downturns
candidates = {s: score for s, score in etf_scores.items() if score > threshold}
if not candidates:
self.Debug("No defensive ETFs showed sufficient momentum - keeping as cash")
return allocations
# Sort and log candidate scores
sorted_candidates = sorted(candidates.items(), key=lambda x: x[1], reverse=True)
self.Debug(f"Top 5 defensive candidates:")
for symbol, score in sorted_candidates[:5]:
group = "Inverse" if symbol in self.inverse_etfs else "Alternative" if symbol in self.alternative_defensive else "Sector"
self.Debug(f" {symbol} ({group}): Score {score*100:.2f}%")
# Set allocation percent based on market conditions and trend
allocation_pct = 0.0
if market_deviation < -0.05 or market_trend < -0.04:
allocation_pct = 0.95 # Almost all available allocation
elif market_deviation < -0.03 or market_trend < -0.02:
allocation_pct = 0.8
elif market_deviation < -0.01 or market_trend < -0.01:
allocation_pct = 0.6
else:
allocation_pct = 0.4
# Adjust allocation based on strength of best candidate
best_score = sorted_candidates[0][1] if sorted_candidates else 0
allocation_pct *= min(1.0, max(0.5, (best_score + 0.02) * 4))
# Determine number of ETFs to use - more in stronger downtrends
num_etfs = 1
if (market_deviation < -0.04 or market_trend < -0.03) and len(sorted_candidates) > 1:
num_etfs = min(2, len(sorted_candidates))
# Allocate to best candidates
remaining_allocation = max_allocation * allocation_pct
total_score = sum(score for _, score in sorted_candidates[:num_etfs])
if total_score > 0:
for i in range(num_etfs):
symbol, score = sorted_candidates[i]
# Weight by relative score
weight = score / total_score if total_score > 0 else 1.0/num_etfs
# Calculate allocation
etf_allocation = remaining_allocation * weight
# Only allocate if meaningful
if etf_allocation >= 0.02: # 2% minimum allocation
allocations[symbol] = etf_allocation
etf_type = "Inverse" if symbol in self.inverse_etfs else "Alternative" if symbol in self.alternative_defensive else "Sector"
self.Debug(f"Selected {etf_type} ETF {symbol} with score {score*100:.2f}%, allocating {etf_allocation*100:.2f}%")
return allocations
class DefensiveStrategyHandler:
def __init__(self, algorithm, config):
self.algo = algorithm
self.bnd = None
self.defensive_positions = set()
self.atr = {}
self.stop_loss_base = 0.04
self.dynamic_stop_weight = 0.5
self.tickers = [] # Initialize tickers as an empty list
def Initialize(self):
self.bnd = self.algo.AddEquity("BND", Resolution.Daily).Symbol
self.atr[self.bnd] = self.algo.ATR(self.bnd, 14, Resolution.Daily)
def OnData(self, data):
self.algo.Debug("DefensiveStrategyHandler: Processing OnData.")
if not self.tickers:
self.algo.Debug("DefensiveStrategyHandler: No tickers to process.")
return
self.CheckStopLosses()
self.AdjustDefensivePositions()
self.algo.Debug("DefensiveStrategyHandler: Adjusted defensive positions.")
def CheckStopLosses(self):
for symbol in self.defensive_positions:
holding = self.algo.Portfolio[symbol]
if holding.Invested:
current_price = self.algo.Securities[symbol].Price
entry_price = holding.AveragePrice
price_drop = (entry_price - current_price) / entry_price
stop_threshold = self.stop_loss_base
if symbol in self.atr and self.atr[symbol].IsReady:
atr_value = self.atr[symbol].Current.Value
atr_pct = atr_value / current_price
stop_threshold = ((1 - self.dynamic_stop_weight) * stop_threshold +
self.dynamic_stop_weight * atr_pct)
if price_drop >= stop_threshold:
self.algo.Liquidate(symbol)
def AdjustDefensivePositions(self):
if not self.algo.Portfolio[self.bnd].Invested:
self.algo.SetHoldings(self.bnd, 0.8)
def GetManagedSymbols(self):
return [self.bnd]
def OnSwitchOut(self):
self.algo.Debug("DefensiveStrategyHandler: Switched out.")
for symbol in self.GetManagedSymbols():
self.algo.Liquidate(symbol)
self.tickers.clear()
self.algo.Debug("DefensiveStrategyHandler: Liquidated positions and cleared tickers.")
def OnSwitchIn(self):
self.algo.Debug("DefensiveStrategyHandler: Switched in.")
self.tickers.clear() # Clear tickers to ensure fresh start
self.algo.Debug("DefensiveStrategyHandler: Cleared tickers.")# region imports
from AlgorithmImports import *
# endregion
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data import *
from QuantConnect.Indicators import *
from datetime import timedelta
import numpy as np
import pandas as pd
from sklearn.preprocessing import RobustScaler
class KQTStrategy:
def __init__(self):
self.lookback = 30
self.scalers = {}
self.feature_cols = []
self.stock_to_id = {}
self.sector_mappings = {}
self.adaptive_threshold = 0.1
self.pred_std = 1.0
self.current_regime = "neutral"
self.portfolio_returns = []
self.defensive_mode = False
self.previous_day_hit_stops = []
self.algorithm = None
def calculate_portfolio_risk_score(self, market_returns):
"""Calculate a portfolio risk score (0-100) to scale overall exposure"""
risk_score = 50 # Neutral starting point
# VIX-like volatility measurement using SPY returns
if len(market_returns) >= 5:
recent_vol = np.std(market_returns[-5:]) * np.sqrt(252) # Annualized
longer_vol = np.std(market_returns[-10:]) * np.sqrt(252) if len(market_returns) >= 10 else recent_vol
# Volatility spike detection
vol_ratio = recent_vol / longer_vol if longer_vol > 0 else 1
if vol_ratio > 1.5: # Sharp volatility increase
risk_score -= 30
elif vol_ratio > 1.2:
risk_score -= 15
# Consecutive negative days
if len(market_returns) >= 3:
neg_days = sum(1 for r in market_returns[-3:] if r < 0)
if neg_days == 3: # Three consecutive down days
risk_score -= 20
elif neg_days == 2:
risk_score -= 10
# Trend direction
if len(market_returns) >= 10:
avg_recent = np.mean(market_returns[-5:])
avg_older = np.mean(market_returns[-10:-5])
trend_change = avg_recent - avg_older
# Declining trend
if trend_change < -0.3:
risk_score -= 15
# Accelerating uptrend
elif trend_change > 0.3 and avg_recent > 0:
risk_score += 10
return max(10, min(100, risk_score)) # Constrain between 10-100
def detect_market_regime(self, daily_returns, lookback=10):
"""Detect current market regime based on portfolio returns"""
if len(daily_returns) >= 1:
market_return = np.mean(daily_returns)
market_vol = np.std(daily_returns)
if len(self.portfolio_returns) >= 3:
recent_returns = self.portfolio_returns[-min(lookback, len(self.portfolio_returns)):]
avg_recent_return = np.mean(recent_returns)
if len(self.portfolio_returns) >= 5:
very_recent = np.mean(self.portfolio_returns[-3:])
less_recent = np.mean(self.portfolio_returns[-min(8, len(self.portfolio_returns)):-3])
trend_change = very_recent - less_recent
if trend_change > 0.5 and avg_recent_return > 0.2:
return "breakout_bullish"
elif trend_change < -0.5 and avg_recent_return < -0.2:
return "breakdown_bearish"
if avg_recent_return > 0.15:
if market_return > 0:
return "bullish_strong"
else:
return "bullish_pullback"
elif avg_recent_return < -0.3:
if market_return < -0.2:
return "bearish_high_vol"
else:
return "bearish_low_vol"
elif avg_recent_return > 0 and market_return > 0:
return "bullish"
elif avg_recent_return < 0 and market_return < 0:
return "bearish"
if market_return > -0.05:
return "neutral"
else:
return "bearish"
return "neutral"
def detect_bearish_signals(self, recent_returns):
"""Detect early warning signs of bearish conditions"""
bearish_signals = 0
signal_strength = 0
if len(self.portfolio_returns) >= 5:
recent_portfolio_returns = self.portfolio_returns[-5:]
pos_days = sum(1 for r in recent_portfolio_returns if r > 0)
neg_days = sum(1 for r in recent_portfolio_returns if r < 0)
if neg_days > pos_days:
bearish_signals += 1
signal_strength += 0.2 * (neg_days - pos_days)
if len(self.portfolio_returns) >= 10:
recent_vol = np.std(self.portfolio_returns[-5:])
older_vol = np.std(self.portfolio_returns[-10:-5])
if recent_vol > older_vol * 1.3: # 30% volatility increase
bearish_signals += 1
signal_strength += 0.3 * (recent_vol/older_vol - 1)
if len(self.portfolio_returns) >= 5:
if self.portfolio_returns[-1] < 0 and self.portfolio_returns[-2] > 0.3:
bearish_signals += 1
signal_strength += 0.3
return bearish_signals, signal_strength
def generate_positions(self, prediction_data, current_returns=None, algorithm=None): # Add algorithm parameter
"""Generate position sizing based on predictions with improved diversification"""
# Store algorithm instance for logging
if algorithm:
self.algorithm = algorithm
else:
# Fallback if algorithm instance isn't passed (should not happen from module)
print("Warning: Algorithm instance not provided to generate_positions for logging.")
log_func = print
log_func = self.algorithm.Log if self.algorithm else print # Use Log for important info
# --- Logging Start ---
log_func(f"--- generate_positions ---")
log_func(f"Input predictions count: {len(prediction_data)}") # Log count instead of full dict initially
# self.algorithm.Debug(f"Input predictions data: {prediction_data}") # Use Debug for potentially large dict
log_func(f"Input market returns: {current_returns}")
# --- Logging End ---
if not prediction_data:
log_func("generate_positions: No prediction data provided.")
return {}
# Update market regime
if current_returns is not None and len(current_returns) > 0:
self.current_regime = self.detect_market_regime(current_returns)
bearish_count, bearish_strength = self.detect_bearish_signals(current_returns)
self.defensive_mode = bearish_count >= 2 or bearish_strength > 0.5
else:
# Default if no returns provided
self.current_regime = "neutral"
self.defensive_mode = False
# --- Define Bullish Regimes and Tech Sector ---
bullish_regimes = {"bullish_strong", "breakout_bullish", "bullish", "bullish_pullback"}
is_bullish = self.current_regime in bullish_regimes
# !!! IMPORTANT: Verify this identifier matches your actual sector data (e.g., GICS code '45') !!!
TECH_SECTOR_IDENTIFIER = '45'
tech_boost_factor = 1.15 # Apply a 15% boost in bullish regimes
# ---
# Calculate portfolio risk score (0-100)
portfolio_risk_score = self.calculate_portfolio_risk_score(current_returns if current_returns else [])
# Convert to a scaling factor (0.1 to 1.0)
risk_scaling = portfolio_risk_score / 100
# --- INCREASE MIN RISK SCALING FLOOR ---
min_risk_scaling = 0.75 # Increased from 0.4 to 0.75 (ensures at least 75% of potential allocation is used)
# ---
risk_scaling = max(min_risk_scaling, risk_scaling)
# ---
# --- Logging ---
log_func(f"Regime: {self.current_regime}, Defensive Mode: {self.defensive_mode}")
log_func(f"Portfolio Risk Score: {portfolio_risk_score}, Risk Scaling (min {min_risk_scaling}): {risk_scaling:.2f}")
# --- Logging End ---
# Adjust threshold based on regime (using the fixed default threshold now)
base_threshold = self.adaptive_threshold # Use the fixed threshold from __init__
current_threshold = base_threshold # Keep it simple for now, regime adjustment might need tuning for fallback scores
# --- Logging ---
log_func(f"Using Threshold: {current_threshold}")
# --- Logging End ---
positions = {}
# Group stocks by sector
sector_data = {}
valid_predictions = 0
for ticker, data in prediction_data.items():
# Ensure data has the expected keys
if "pred_return" not in data:
log_func(f"Warning: Missing 'pred_return' for {ticker}")
continue
pred_return = data["pred_return"]
sector = self.sector_mappings.get(ticker, "Unknown")
# --- Apply Tech Boost in Bullish Regime ---
boost_applied = False
if is_bullish and sector == TECH_SECTOR_IDENTIFIER:
original_pred = pred_return
pred_return *= tech_boost_factor
boost_applied = True
# Optional Debug Log:
# self.algorithm.Debug(f"Applied {tech_boost_factor}x boost to {ticker} (Tech) in {self.current_regime} regime. Original: {original_pred:.4f}, Boosted: {pred_return:.4f}")
# ---
if sector not in sector_data:
sector_data[sector] = []
sector_data[sector].append({
"ticker": ticker,
"pred_return": pred_return, # Use potentially boosted value
# Use the current_threshold for composite score
"composite_score": pred_return / current_threshold if current_threshold != 0 else pred_return
})
valid_predictions += 1
# --- ADDED LOG ---
log_func(f"Found {valid_predictions} valid predictions.")
# ---
if valid_predictions == 0:
log_func("generate_positions: No valid predictions after filtering.")
return {}
# Rank sectors by average predicted return
sector_avg_scores = {}
for sector, stocks in sector_data.items():
if stocks: # Ensure sector has stocks
sector_avg_scores[sector] = np.mean([s["pred_return"] for s in stocks])
else:
sector_avg_scores[sector] = -np.inf # Penalize empty sectors
ranked_sectors = sorted(sector_avg_scores.keys(), key=lambda x: sector_avg_scores[x], reverse=True)
# --- Reduce Sector Count ---
top_sector_count = 4 if portfolio_risk_score > 60 else 3 # Reduced from 5/4
# ---
top_sectors = ranked_sectors[:min(top_sector_count, len(ranked_sectors))]
# --- Logging ---
log_func(f"Ranked Sectors: {ranked_sectors}")
log_func(f"Top Sectors Selected ({top_sector_count}): {top_sectors}")
# --- Logging End ---
# --- Reduce Stocks Per Sector ---
stocks_per_sector = 3 if self.current_regime in ["bullish_strong", "breakout_bullish"] else 2 # Reduced from 4/3
# ---
# Allocate within top sectors
selected_stocks_for_positioning = []
for sector in top_sectors:
if sector not in sector_data: continue # Skip if sector somehow has no data
sector_stocks = sorted(sector_data[sector], key=lambda x: x["pred_return"], reverse=True)
top_stocks_in_sector = sector_stocks[:min(stocks_per_sector, len(sector_stocks))]
selected_stocks_for_positioning.extend(top_stocks_in_sector)
# --- Logging ---
log_func(f"Sector '{sector}': Top stocks {[s['ticker'] for s in top_stocks_in_sector]} with scores {[f'{s:.3f}' for s in [st['pred_return'] for st in top_stocks_in_sector]]}")
# --- Logging End ---
# --- Log count before filtering ---
log_func(f"Selected {len(selected_stocks_for_positioning)} stocks across top sectors before size filtering.")
# ---
# Calculate position sizes for selected stocks
log_func(f"Calculating positions for selected stocks.") # Log count
for stock in selected_stocks_for_positioning:
ticker = stock["ticker"]
# Use pred_return directly for signal strength with fallback scores
signal_strength = stock["pred_return"]
# --- Adjust Base Size Calculation & Filter (Less Aggressive) ---
# Decreased multiplier
# Kept max base size high (0.6) - allows concentration if signal is strong
# Increased filter threshold
base_size_multiplier = 1.5 # Decreased from 2.0
max_base_size = 0.6
min_base_size_threshold = 0.05 # Increased from 0.02
base_size = min(max_base_size, max(0.01, base_size_multiplier * signal_strength))
# --- Hysteresis Check (Optional) ---
# entry_threshold_multiplier = 1.1 # Require 10% higher base size to enter than to stay
# previously_held = ticker in self.algorithm.kqt_previous_positions and self.algorithm.kqt_previous_positions[ticker] > 0
# required_threshold = min_base_size_threshold if previously_held else min_base_size_threshold * entry_threshold_multiplier
# if base_size > required_threshold:
# --- Original Check (No Hysteresis) ---
if base_size > min_base_size_threshold: # Use the increased threshold
# ---
final_size = base_size * risk_scaling
# --- Increase minimum final size threshold ---
min_final_size = 0.04 # Increased from 0.03 to 4%
if final_size >= min_final_size:
positions[ticker] = final_size
# --- Logging ---
self.algorithm.Debug(f" Ticker: {ticker}, Signal: {signal_strength:.3f}, Base Size: {base_size:.3f}, Final Size: {final_size:.3f}")
# --- Logging End ---
else:
self.algorithm.Debug(f" Ticker: {ticker}, Signal: {signal_strength:.3f}, Base Size: {base_size:.3f}, Final Size ({final_size:.3f}) too small after risk scaling (Min: {min_final_size}), skipping.")
else:
self.algorithm.Debug(f" Ticker: {ticker}, Signal: {signal_strength:.3f}, Base Size ({base_size:.3f}) too small or negative (Threshold: {min_base_size_threshold}), skipping.")
# Defensive adjustments
if self.defensive_mode or self.current_regime in ["bearish_high_vol", "bearish_low_vol", "breakdown_bearish"]:
# --- Soften Defensive Scaling ---
scaling_factor = 0.9 if self.defensive_mode else 0.99 # Increased from 0.7/0.85
# ---
log_func(f"Defensive Adjustment: Scaling positions by {scaling_factor}")
for ticker in list(positions.keys()): # Iterate over keys copy
positions[ticker] *= scaling_factor
# Use the increased min_final_size as the post-scaling check too
# --- Use the SAME min_final_size threshold after scaling ---
if positions[ticker] < min_final_size: # Check against the 4% threshold again
log_func(f" Removing {ticker} due to small size ({positions[ticker]:.4f}) after defensive scaling (Min: {min_final_size}).")
del positions[ticker]
# --- Temporarily Disable Hedges ---
# Add hedges (shorts) based on negative predictions
# if portfolio_risk_score < 40:
# negative_preds = {t: data["pred_return"] for t, data in prediction_data.items()
# if "pred_return" in data and data["pred_return"] < -0.05 and t not in positions} # Threshold for shorting
#
# if negative_preds:
# worst_stocks = sorted(negative_preds.items(), key=lambda x: x[1])[:2]
# log_func(f"Defensive Adjustment: Adding Hedges for {worst_stocks}")
# for ticker, pred in worst_stocks:
# hedge_size = -0.15 if self.defensive_mode else -0.1
# positions[ticker] = hedge_size
# log_func(f" Adding hedge {ticker} with size {hedge_size}")
# ---
# --- Logging Final ---
log_func(f"Final positions generated ({len(positions)}): {positions}")
log_func(f"--- generate_positions END ---")
# --- Logging End ---
return positions
def get_stop_loss_level(self):
"""Get appropriate stop-loss level based on market regime"""
if self.current_regime in ["bullish_strong", "breakout_bullish"]:
if self.defensive_mode:
return -2.0 # Tighter in defensive mode
else:
return -3.5 # More room for positions to breathe
elif self.current_regime in ["bearish_high_vol", "breakdown_bearish"]:
return -1.5 # Tighter stop-loss in bearish regimes
else:
if self.defensive_mode:
return -1.8
else:
return -2.5
def update_portfolio_returns(self, daily_return):
"""Update portfolio return history"""
self.portfolio_returns.append(daily_return)
if len(self.portfolio_returns) > 60: # Keep a rolling window
self.portfolio_returns = self.portfolio_returns[-60:]