| Overall Statistics | |
|---|---|
| Total Orders | 10579 |
| Average Win | 0.17% |
| Average Loss | -0.14% |
| Compounding Annual Return | 18.519% |
| Drawdown | 17.300% |
| Expectancy | 0.152 |
| Start Equity | 1000000 |
| End Equity | 3012421.8 |
| Net Profit | 201.242% |
| Sharpe Ratio | 0.823 |
| Sortino Ratio | 1.026 |
| Probabilistic Sharpe Ratio | 48.451% |
| Loss Rate | 48% |
| Win Rate | 52% |
| Profit-Loss Ratio | 1.22 |
| Alpha | 0.08 |
| Beta | 0.252 |
| Annual Standard Deviation | 0.127 |
| Annual Variance | 0.016 |
| Information Ratio | 0.033 |
| Tracking Error | 0.174 |
| Treynor Ratio | 0.415 |
| Total Fees | $77340.45 |
| Estimated Strategy Capacity | $0 |
| Lowest Capacity Asset | WMT R735QTJ8XC9X |
| Portfolio Turnover | 32.72% |
# region imports
from AlgorithmImports import *
# endregion
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data import *
from QuantConnect.Indicators import *
from datetime import timedelta, datetime
import numpy as np
import pandas as pd
import torch
import os
import torch.nn as nn
from strategy import Strategy
def calculate_ema(prices, span):
    """Return the exponential moving average of ``prices`` as a NumPy array.

    The average is computed with pandas' recursive form (``adjust=False``)
    for the given ``span``; the result has one value per input price.
    """
    price_series = pd.Series(prices)
    smoothed = price_series.ewm(span=span, adjust=False).mean()
    return smoothed.values
def calculate_rsi(prices, period=14):
    """Compute a Wilder-smoothed Relative Strength Index for ``prices``.

    Args:
        prices: Sequence of prices (list or array); converted to a float
            array so integer inputs do not truncate the RSI values.
        period: Smoothing period used for the average gain/loss.

    Returns:
        A float array with one RSI value per price. The first ``period``
        entries all hold the seed RSI. As a special case kept for backward
        compatibility, the scalar ``100`` is returned when the seed window
        contains no losses.
    """
    # Work on floats: the output buffer takes its dtype from this array, so
    # integer prices would otherwise round every RSI value down to an int.
    prices = np.asarray(prices, dtype=float)
    deltas = np.diff(prices)

    # Seed the averages from the first period + 1 price changes.
    seed = deltas[:period + 1]
    up = seed[seed >= 0].sum() / period
    down = -seed[seed < 0].sum() / period
    if down == 0:
        return 100

    rs = up / down
    rsi = np.zeros_like(prices)
    rsi[:period] = 100. - 100. / (1. + rs)

    for i in range(period, len(prices)):
        # deltas[i - 1] is the change that produced prices[i].
        delta = deltas[i - 1]
        if delta > 0:
            upval = delta
            downval = 0.
        else:
            upval = 0.
            downval = -delta

        # Wilder smoothing of the average gain and average loss.
        up = (up * (period - 1) + upval) / period
        down = (down * (period - 1) + downval) / period

        rs = up / down if down != 0 else float('inf')
        rsi[i] = 100. - 100. / (1. + rs)
    return rsi
def calculate_macd(prices, fast=12, slow=26, signal=9):
    """Return the MACD line, its signal line and the histogram for ``prices``.

    The MACD line is the fast EMA minus the slow EMA, the signal line is an
    EMA of the MACD line, and the histogram is their difference. All three
    are NumPy arrays with one value per input price.
    """
    def _ema(values, span):
        # Recursive (adjust=False) exponential moving average.
        return pd.Series(values).ewm(span=span, adjust=False).mean().values

    price_array = np.array(prices)
    macd_line = _ema(price_array, fast) - _ema(price_array, slow)
    signal_line = _ema(macd_line, signal)
    return macd_line, signal_line, macd_line - signal_line
def calculate_atr(high, low, close, period=14):
    """Compute the Average True Range with Wilder smoothing.

    Args:
        high, low, close: Equal-length sequences of bar highs, lows and
            closes. They are converted to float arrays internally.
        period: Smoothing period.

    Returns:
        A float array with one ATR value per bar; the first value equals the
        first bar's high-low range. An empty input yields an empty array.
        NaN inputs propagate into the affected true-range values.

    Raises:
        ValueError: If the three inputs do not have the same length.
    """
    if len(high) != len(low) or len(high) != len(close):
        raise ValueError("Input arrays must have the same length")

    high = np.asarray(high, dtype=float)
    low = np.asarray(low, dtype=float)
    close = np.asarray(close, dtype=float)

    n = len(high)
    if n == 0:
        # Nothing to compute; indexing element 0 below would fail.
        return np.zeros(0)

    # True range: the first bar has no previous close, so it is high - low.
    tr = high - low
    prev_close = close[:-1]
    tr[1:] = np.maximum.reduce([
        tr[1:],
        np.abs(high[1:] - prev_close),
        np.abs(low[1:] - prev_close),
    ])

    # Wilder smoothing is recursive, so it stays an explicit loop.
    atr = np.zeros_like(tr)
    atr[0] = tr[0]
    for i in range(1, n):
        atr[i] = (atr[i - 1] * (period - 1) + tr[i]) / period
    return atr
class Algorithm(QCAlgorithm):
def Initialize(self):
    """Configure the backtest, subscriptions, indicators, schedules and state.

    The algorithm starts in "KQT" mode. Attributes prefixed ``kqt_`` hold the
    state of the KQT sleeve and attributes prefixed ``rc_`` hold the state of
    the RiskControl sleeve. SPY history is requested up front and pushed into
    the rolling window and the SPY moving averages, after which the model
    weights are probed via ``TryLoadModelWeights``.
    """
    self.SetStartDate(2019, 1, 1)
    self.SetEndDate(2025, 12, 31)
    self.SetCash(1000000)

    self.previous_portfolio_value = 0
    self.current_strategy_mode = "KQT"
    self.vix = self.AddIndex("VIX", Resolution.Daily).Symbol
    self.vix_threshold = 30
    self.SetBenchmark("SPY")
    self.strategy = Strategy()

    # --- KQT sleeve state ---
    self.kqt_lookback = 60
    self.kqt_tickers = []
    self.kqt_symbols = {}
    self.kqt_sector_mappings = {}
    self.strategy.sector_mappings = self.kqt_sector_mappings
    self.kqt_stock_data = {}
    self.kqt_current_predictions = {}
    self.kqt_previous_positions = {}
    self.kqt_stopped_out = set()

    # --- RiskControl sleeve state ---
    self.rc_spy = self.AddEquity("SPY", Resolution.Daily).Symbol
    self.rc_bil = self.AddEquity("BIL", Resolution.Daily).Symbol
    self.rc_selected_by_market_cap = []
    self.rc_rebalance_flag = False
    self.rc_spy_30day_window = RollingWindow[float](30)
    self.rc_entry_prices = {}
    self.rc_previous_bil_allocation = 0.0
    self.rc_trend_lookback = 10
    self.rc_spy_prices = {}
    self.rc_max_spy_history = 60
    self.rc_stop_loss_base = 0.04
    self.rc_dynamic_stop_weight = 0.5
    self.rc_atr_period = 14
    self.rc_atr = {}
    self.rc_defensive_positions = set()
    self.rc_last_defensive_update = datetime(1900, 1, 1)
    self.rc_last_rebalance_date = datetime(1900, 1, 1)

    # Portfolio split used while in RiskControl mode: RC manages 75%,
    # KQT keeps the remaining 25%.
    self.rc_portfolio_fraction = 0.75
    self.kqt_portfolio_fraction_in_rc = 1.0 - self.rc_portfolio_fraction

    # Rebalance tolerances (2%) to reduce trading frequency.
    self.kqt_rebalance_tolerance = 0.02
    self.rc_rebalance_tolerance = 0.02

    # Cooldown between strategy-mode switches.
    self.last_mode_switch_time = None
    self.min_days_between_switches = 5

    # --- Defensive instruments ---
    self.rc_sh = self.AddEquity("SH", Resolution.Daily).Symbol
    self.rc_psq = self.AddEquity("PSQ", Resolution.Daily).Symbol
    self.rc_dog = self.AddEquity("DOG", Resolution.Daily).Symbol
    self.rc_rwm = self.AddEquity("RWM", Resolution.Daily).Symbol
    self.rc_eum = self.AddEquity("EUM", Resolution.Daily).Symbol
    self.rc_myd = self.AddEquity("MYY", Resolution.Daily).Symbol
    self.rc_gld = self.AddEquity("GLD", Resolution.Daily).Symbol
    self.rc_ief = self.AddEquity("IEF", Resolution.Daily).Symbol
    self.rc_bnd = self.AddEquity("BND", Resolution.Daily).Symbol
    self.rc_xlp = self.AddEquity("XLP", Resolution.Daily).Symbol
    self.rc_xlu = self.AddEquity("XLU", Resolution.Daily).Symbol
    self.rc_xlv = self.AddEquity("XLV", Resolution.Daily).Symbol
    self.rc_vht = self.AddEquity("VHT", Resolution.Daily).Symbol
    self.rc_vdc = self.AddEquity("VDC", Resolution.Daily).Symbol
    self.rc_inverse_etfs = [self.rc_sh, self.rc_psq, self.rc_dog, self.rc_rwm, self.rc_eum, self.rc_myd]
    self.rc_alternative_defensive = [self.rc_gld, self.rc_ief, self.rc_bnd]
    self.rc_sector_defensive = [self.rc_xlp, self.rc_xlu, self.rc_xlv, self.rc_vht, self.rc_vdc]
    self.rc_all_defensive = self.rc_inverse_etfs + self.rc_alternative_defensive + self.rc_sector_defensive
    self.rc_all_defensive_set = set(self.rc_all_defensive)

    # Register exactly one ATR per symbol. Creating a second ATR for a symbol
    # that already has one leaves the first indicator registered but
    # unreferenced. The resulting rc_atr keys are the defensive symbols plus
    # BIL and SPY.
    for symbol in self.rc_all_defensive + [self.rc_bil, self.rc_spy]:
        if self.Securities.ContainsKey(symbol):
            self.rc_atr[symbol] = self.ATR(symbol, self.rc_atr_period, Resolution.Daily)
        else:
            self.Log(f"Warning: Symbol {symbol} not found during ATR registration.")
    self.rc_diagnostic_mode = True

    # --- Universe and schedules ---
    self.UniverseSettings.Resolution = Resolution.Daily
    self._universe = self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
    self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(10, 0), self.TradeExecute)
    self.Schedule.On(self.DateRules.MonthStart(self.rc_spy), self.TimeRules.AfterMarketOpen(self.rc_spy, 30), self.SetRebalanceFlag)
    self.Schedule.On(self.DateRules.WeekStart(self.rc_spy, DayOfWeek.Wednesday), self.TimeRules.AfterMarketOpen(self.rc_spy, 30), self.MonthlyRebalance)
    self.Schedule.On(self.DateRules.WeekStart(self.rc_spy, DayOfWeek.Monday), self.TimeRules.AfterMarketOpen(self.rc_spy, 60), self.WeeklyDefensiveAdjustment)
    self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.AfterMarketOpen(self.vix, 5), self.CheckVixAndManageState)

    # --- SPY trend indicators ---
    self.spy = self.rc_spy
    self.spy_sma50 = self.SMA(self.spy, 50, Resolution.Daily)
    self.spy_sma200 = self.SMA(self.spy, 200, Resolution.Daily)
    # EMA10 is used by the cash-replacement logic.
    self.spy_ema10 = self.EMA(self.spy, 10, Resolution.Daily)

    # Warm-up must cover the longest indicator (SMA200) plus a small buffer.
    warmup_period_days = 205
    self.SetWarmUp(warmup_period_days, Resolution.Daily)

    # Prime the rolling window and SPY indicators from history.
    # NOTE(review): these indicators are also updated manually in OnData;
    # confirm they are not additionally auto-updated by the engine.
    history_spy = self.History(self.spy, warmup_period_days, Resolution.Daily)
    if not history_spy.empty:
        if 'close' in history_spy.columns and self.spy in history_spy.index.levels[0]:
            for time, row in history_spy.loc[self.spy].iterrows():
                close_price = row["close"]
                self.rc_spy_30day_window.Add(close_price)
                self.spy_sma50.Update(time, close_price)
                self.spy_sma200.Update(time, close_price)
                self.spy_ema10.Update(time, close_price)
        else:
            self.Log("Warning: 'close' column not found in SPY history during warmup.")

    self.TryLoadModelWeights()
def CheckVixAndManageState(self):
    """Switch between "KQT" and "RiskControl" mode based on VIX and SPY trend.

    From KQT mode, RiskControl is entered when VIX is above
    ``vix_threshold`` and the SPY trend is bearish. From RiskControl mode,
    KQT is resumed when VIX is below 20 and the trend is bullish. A switch is
    only performed when at least ``min_days_between_switches`` days have
    passed since the previous one; otherwise the attempt is just logged.
    """
    securities = self.Securities
    inputs_ready = (
        securities.ContainsKey(self.vix) and securities[self.vix].HasData
        and securities.ContainsKey(self.spy) and securities[self.spy].HasData
        and self.spy_sma50.IsReady and self.spy_sma200.IsReady
    )
    if not inputs_ready:
        self.Log("Data not ready for state check (VIX, SPY, or SMAs).")
        return

    vix_value = securities[self.vix].Price
    spy_price = securities[self.spy].Price
    sma50_value = self.spy_sma50.Current.Value
    sma200_value = self.spy_sma200.Current.Value

    bearish = spy_price < sma200_value or sma50_value < sma200_value
    bullish = spy_price > sma200_value and sma50_value > sma200_value

    mode = self.current_strategy_mode
    if mode == "KQT":
        switch_wanted = vix_value > self.vix_threshold and bearish
    elif mode == "RiskControl":
        switch_wanted = vix_value < 20 and bullish
    else:
        return
    if not switch_wanted:
        return

    # Enforce the cooldown between mode switches.
    last_switch = self.last_mode_switch_time
    if last_switch is not None and (self.Time - last_switch).days < self.min_days_between_switches:
        self.Log(f"Conditions met but too soon since last switch ({(self.Time - self.last_mode_switch_time).days} days).")
        return

    if mode == "KQT":
        self.Log(f"Conditions met to ENTER RiskControl: VIX {vix_value:.2f} > {self.vix_threshold} AND Bearish Trend (SPY {spy_price:.2f} vs SMA200 {sma200_value:.2f}, SMA50 {sma50_value:.2f} vs SMA200 {sma200_value:.2f}).")
        self.EnterRiskControlMode()
    else:
        self.Log(f"Conditions met to EXIT RiskControl: VIX {vix_value:.2f} < 20 AND Bullish Trend (SPY {spy_price:.2f} > SMA200 {sma200_value:.2f}, SMA50 {sma50_value:.2f} > SMA200 {sma200_value:.2f}).")
        self.ExitRiskControlMode()
    self.last_mode_switch_time = self.Time
# --- Modify Coarse/Fine Selection to remove BIL ---
def CoarseSelectionFunction(self, coarse):
    """Coarse universe filter whose behaviour depends on the strategy mode.

    KQT mode: symbols priced above 5 with fundamental data, ordered by
    dollar volume (descending), capped at 500.
    RiskControl mode: every US symbol priced above 5 with fundamental data,
    plus the defensive ETFs (BIL excluded), SPY, SH and IEF, de-duplicated.
    Any other mode: an empty list.
    """
    mode = self.current_strategy_mode

    if mode == "KQT":
        eligible = [c for c in coarse if c.Price > 5 and c.HasFundamentalData]
        # Stable sort, so ties keep their incoming order.
        eligible.sort(key=lambda c: c.DollarVolume, reverse=True)
        return [c.Symbol for c in eligible[:500]]

    if mode == "RiskControl":
        symbols = [
            c.Symbol
            for c in coarse
            if c.HasFundamentalData and c.Price > 5 and c.Market == Market.USA
        ]
        symbols += list(self.rc_all_defensive_set - {self.rc_bil})
        symbols += [self.rc_spy, self.rc_sh, self.rc_ief]
        return list(set(symbols))

    return []
def FineSelectionFunction(self, fine):
    """Fine universe filter; also refreshes the KQT symbol bookkeeping.

    Regardless of mode, the 100 largest names (by market cap) that have
    fundamental data become the KQT universe: ``kqt_tickers``,
    ``kqt_symbols`` and ``kqt_sector_mappings`` are rebuilt and the strategy
    object is pointed at the new sector mapping.

    Returns the KQT symbols in KQT mode. In RiskControl mode it returns the
    union of the top 30 stocks above a 1e10 market cap, the defensive ETFs
    (BIL excluded), SPY, SH, IEF and the KQT symbols. Any other mode yields
    an empty list.
    """
    def sector_of(stock, ticker):
        # Sector code as a string, or "Unknown" when it cannot be read.
        try:
            if stock.AssetClassification is not None and hasattr(stock.AssetClassification, 'MorningstarSectorCode'):
                return str(stock.AssetClassification.MorningstarSectorCode)
        except Exception as e:
            self.Debug(f"FineSelection - Error getting sector for {ticker}: {str(e)}")
        return "Unknown"

    with_fundamentals = [f for f in fine if f.HasFundamentalData]
    kqt_selected = sorted(with_fundamentals, key=lambda x: x.MarketCap, reverse=True)[:100]

    # Start from fresh containers, then repopulate them.
    self.kqt_tickers, self.kqt_symbols, self.kqt_sector_mappings = [], {}, {}
    for stock in kqt_selected:
        ticker = stock.Symbol.Value
        self.kqt_tickers.append(ticker)
        self.kqt_symbols[ticker] = stock.Symbol
        self.kqt_sector_mappings[ticker] = sector_of(stock, ticker)

    # Keep the strategy's view of sector mappings in sync.
    self.strategy.sector_mappings = self.kqt_sector_mappings

    mode = self.current_strategy_mode
    if mode == "KQT":
        self.Log(f"FineSelection (KQT Mode): Returning {len(self.kqt_symbols)} KQT symbols.")
        return list(self.kqt_symbols.values())

    if mode == "RiskControl":
        large_caps = [x for x in fine if x.SecurityReference.SecurityType == "ST00000001" and x.MarketCap > 1e10]
        large_caps.sort(key=lambda x: x.MarketCap, reverse=True)
        top_caps = large_caps[:30]
        self.rc_selected_by_market_cap = [(x.Symbol, x.MarketCap) for x in top_caps]
        rc_equity_symbols = [x.Symbol for x in top_caps]

        # RC equities + RC defensive ETFs + KQT symbols, de-duplicated.
        symbols = set(rc_equity_symbols)
        symbols.update(self.rc_all_defensive_set - {self.rc_bil})
        symbols.update([self.rc_spy, self.rc_sh, self.rc_ief])
        symbols.update(self.kqt_symbols.values())
        self.Log(f"FineSelection (RC Mode): Returning {len(symbols)} combined symbols (RC Equity: {len(rc_equity_symbols)}, RC Def: {len(self.rc_all_defensive_set)}, KQT: {len(self.kqt_symbols)}).")
        return list(symbols)

    return []
# --- Modify OnSecuritiesChanged to handle BIL removal ---
def OnSecuritiesChanged(self, changes):
    """React to universe changes: clean up removed names, add ATRs for new ones.

    For each removed security the KQT bookkeeping is purged, its ATR (BIL
    excepted) is deregistered and any open position is liquidated. While in
    RiskControl mode, newly added defensive ETFs / SPY that have no ATR yet
    get one registered.
    """
    self.Log(f"OnSecuritiesChanged ({self.current_strategy_mode} mode): Added {len(changes.AddedSecurities)}, Removed {len(changes.RemovedSecurities)}")

    for removed in changes.RemovedSecurities:
        symbol = removed.Symbol
        ticker = symbol.Value

        # KQT cleanup.
        if ticker in self.kqt_tickers:
            self.kqt_tickers.remove(ticker)
        for registry in (self.kqt_symbols, self.kqt_sector_mappings, self.kqt_stock_data):
            registry.pop(ticker, None)

        # RC ATR cleanup (BIL's ATR is kept).
        if symbol in self.rc_atr and symbol != self.rc_bil:
            try:
                self.DeregisterIndicator(self.rc_atr[symbol])
                del self.rc_atr[symbol]
            except Exception as e:
                self.Log(f"Error removing ATR for {removed.Symbol.Value}: {e}")

        # Close any position in a security that left the universe.
        if self.Portfolio[symbol].Invested:
            self.Log(f"Liquidating {removed.Symbol.Value} due to removal from universe.")
            self.Liquidate(symbol)

    if self.current_strategy_mode != "RiskControl":
        return

    # Symbols that should carry an ATR while RiskControl is active.
    atr_candidates = (self.rc_all_defensive_set - {self.rc_bil}) | {self.rc_spy}
    for added in changes.AddedSecurities:
        if added.Symbol not in atr_candidates or added.Symbol in self.rc_atr:
            continue
        try:
            self.rc_atr[added.Symbol] = self.ATR(added.Symbol, self.rc_atr_period, Resolution.Daily)
        except Exception as e:
            self.Log(f"Error adding ATR for {added.Symbol.Value}: {e}")
# --- Modify EnterRiskControlMode ---
def EnterRiskControlMode(self):
    """Switch to RiskControl mode and seed the RC sleeve with GLD.

    Positions that are neither GLD/SH/IEF nor part of the current KQT
    universe are liquidated, transient KQT state is reset (symbols, tickers
    and sector mappings are kept), the mode flag is flipped and
    ``rc_portfolio_fraction`` of the portfolio is targeted at GLD. The KQT
    sleeve is then managed by ``TradeExecute``.
    """
    self.Log("Transitioning to RiskControl: Liquidating non-essential KQT assets.")

    # Holdings that must survive the transition.
    protected = {self.rc_gld, self.rc_sh, self.rc_ief} | set(self.kqt_symbols.values())

    liquidated_count = 0
    for holding in self.Portfolio.Values:
        if not holding.Invested or holding.Symbol in protected:
            continue
        self.Log(f"  Liquidating {holding.Symbol.Value} (Non-RC, Non-KQT asset).")
        self.Liquidate(holding.Symbol)
        liquidated_count += 1
    self.Log(f"Liquidated {liquidated_count} non-essential assets.")

    # Reset transient KQT state; the universe bookkeeping stays intact.
    self.kqt_current_predictions = {}
    self.kqt_previous_positions = {}
    self.kqt_stopped_out.clear()

    self.current_strategy_mode = "RiskControl"
    # NOTE(review): confirm that replacing the universe selection model at
    # runtime has the intended effect alongside the AddUniverse registration.
    self.SetUniverseSelection(FineFundamentalUniverseSelectionModel(self.CoarseSelectionFunction, self.FineSelectionFunction))

    self.Log(f"Setting initial RiskControl position to {self.rc_portfolio_fraction*100}% GLD.")
    # NOTE(review): the earlier comment here said SetHoldings clears other
    # positions; no liquidate flag is passed, so confirm the intent.
    self.SetHoldings(self.rc_gld, self.rc_portfolio_fraction)
    self.rc_last_rebalance_date = self.Time
# --- Modify ExitRiskControlMode ---
def ExitRiskControlMode(self):
    """Switch back to KQT mode and unwind the RiskControl sleeve.

    Every invested RC-managed symbol (defensive ETFs, SH, IEF, GLD and the
    RC equity picks) that is not also a KQT symbol is liquidated, the RC
    state is reset and the mode flag is set to "KQT".
    """
    self.Log("Transitioning back to KQT: Liquidating RiskControl assets.")

    rc_etfs = (self.rc_all_defensive_set - {self.rc_bil}) | {self.rc_sh, self.rc_ief, self.rc_gld}
    rc_equities = {symbol for symbol, _ in self.rc_selected_by_market_cap}
    # KQT-managed names are never unwound here.
    rc_symbols_to_liquidate = (rc_etfs | rc_equities) - set(self.kqt_symbols.values())

    liquidated_count = 0
    for holding in self.Portfolio.Values:
        if not (holding.Invested and holding.Symbol in rc_symbols_to_liquidate):
            continue
        self.Log(f"  Liquidating {holding.Symbol.Value} (RC asset).")
        self.Liquidate(holding.Symbol)
        liquidated_count += 1
    self.Log(f"Liquidated {liquidated_count} RiskControl assets.")

    # Reset RC state.
    self.rc_selected_by_market_cap = []
    self.rc_rebalance_flag = False
    self.rc_entry_prices = {}
    self.rc_defensive_positions.clear()

    self.current_strategy_mode = "KQT"
    self.SetUniverseSelection(FineFundamentalUniverseSelectionModel(self.CoarseSelectionFunction, self.FineSelectionFunction))
# --- Modify OnData ---
def OnData(self, data):
    """Update SPY-derived state and run RiskControl stop-losses.

    On every SPY bar the close is pushed into the rolling window, the SPY
    moving averages and the dated price cache (pruned to
    ``rc_max_spy_history`` days). In RiskControl mode each invested position
    that is neither a KQT symbol nor SH/IEF is checked against a stop
    threshold derived from ``rc_stop_loss_base``, the market trend and, when
    ready, the symbol's ATR; breached positions are liquidated.
    """
    if data.Bars.ContainsKey(self.spy):
        spy_close = data.Bars[self.spy].Close
        if hasattr(self, 'rc_spy_30day_window'):
            self.rc_spy_30day_window.Add(spy_close)
        for indicator_name in ('spy_sma50', 'spy_sma200', 'spy_ema10'):
            if hasattr(self, indicator_name):
                getattr(self, indicator_name).Update(self.Time, spy_close)
        if hasattr(self, 'rc_spy_prices'):
            today = self.Time.date()
            self.rc_spy_prices[today] = spy_close
            expired = [day for day in self.rc_spy_prices if (today - day).days > self.rc_max_spy_history]
            for day in expired:
                self.rc_spy_prices.pop(day)

    if self.current_strategy_mode != "RiskControl":
        return

    market_trend = self._rc_calculateMarketTrend()
    # KQT names have their own stop logic; SH/IEF act as cash replacements.
    exempt_symbols = {self.rc_sh, self.rc_ief} | set(self.kqt_symbols.values())

    for symbol in list(self.Portfolio.Keys):
        if symbol not in self.Portfolio or not self.Portfolio[symbol].Invested:
            continue
        if symbol in exempt_symbols:
            continue
        holding = self.Portfolio[symbol]

        current_price = self.Securities[symbol].Price if self.Securities.ContainsKey(symbol) else 0
        if current_price <= 0:
            continue

        if symbol not in self.rc_entry_prices:
            # Fall back to the average fill price when no entry was recorded.
            if holding.AveragePrice > 0:
                self.rc_entry_prices[symbol] = holding.AveragePrice
                self.Log(f"RC Stop-Loss Warning: Missing entry price for {symbol}. Using average price {holding.AveragePrice}.")
            else:
                self.Log(f"RC Stop-Loss Warning: Cannot check stop-loss for {symbol}. Missing entry price and invalid average price.")
                continue

        entry_price = self.rc_entry_prices[symbol]
        if entry_price <= 0:
            continue
        price_drop = (entry_price - current_price) / entry_price

        # Tighten the stop in a falling market, loosen it in a rising one.
        stop_threshold = self.rc_stop_loss_base
        if market_trend < -0.03:
            stop_threshold *= 0.9
        elif market_trend > 0.03:
            stop_threshold *= 1.1

        # Blend in the ATR-implied volatility when the indicator is ready.
        if symbol in self.rc_atr and self.rc_atr[symbol].IsReady:
            current_atr = self.rc_atr[symbol].Current.Value
            atr_pct = current_atr / current_price if current_price > 0 else 0
            effective_weight = self.rc_dynamic_stop_weight
            if atr_pct > stop_threshold * 1.2:
                # Cap the ATR influence for very volatile names.
                effective_weight = min(self.rc_dynamic_stop_weight, 0.3)
            stop_threshold = ((1 - effective_weight) * stop_threshold + effective_weight * atr_pct)

        if price_drop >= stop_threshold:
            self.Log(f"RiskControl Stop-loss triggered for RC asset {symbol} at {current_price}, drop: {price_drop*100:.1f}%, threshold: {stop_threshold*100:.1f}%")
            self.Liquidate(symbol, "RiskControl Stop Loss")
            self.rc_entry_prices.pop(symbol, None)
# --- TradeExecute remains the same ---
def TradeExecute(self):
    """Daily KQT cycle: refresh data, predict, apply stops and trade.

    Runs only when SPY is subscribed and its exchange is open. KQT history
    and predictions are refreshed in every mode (when KQT symbols exist). In
    KQT mode the KQT sleeve targets 100% of the portfolio; in RiskControl
    mode it targets ``kqt_portfolio_fraction_in_rc`` while the RC sleeve is
    handled by the scheduled RC routines.
    """
    if not self.Securities.ContainsKey(self.spy) or not self.Securities[self.spy].Exchange.ExchangeOpen:
        return

    self.Log(f"TradeExecute running in {self.current_strategy_mode} mode at {self.Time}")

    if self.kqt_symbols:
        self.Log(f"KQT: Current universe size: {len(self.kqt_tickers)}")
        self.UpdateKQTHistoricalData()
        self.kqt_current_predictions = self.GenerateKQTPredictions()
    else:
        self.Log("KQT: No symbols available, skipping Update/Prediction.")

    mode = self.current_strategy_mode
    if mode == "KQT":
        self.ProcessKQTStopLosses()
        kqt_fraction = 1.0
    elif mode == "RiskControl":
        self.Log("RiskControl mode active. Running KQT logic for its fraction.")
        # Stops apply only to KQT-managed symbols.
        self.ProcessKQTStopLosses(managed_symbols=set(self.kqt_symbols.values()))
        kqt_fraction = self.kqt_portfolio_fraction_in_rc
    else:
        return

    market_returns = self.GetMarketReturns()
    target_positions = self.strategy.generate_positions(self.kqt_current_predictions, market_returns, algorithm=self)
    self.ExecuteKQTTrades(target_positions, target_portfolio_fraction=kqt_fraction)

    # Feed the realised portfolio return back into the strategy.
    daily_return = self.CalculatePortfolioReturn()
    self.strategy.update_portfolio_returns(daily_return)
    self.previous_portfolio_value = self.Portfolio.TotalPortfolioValue
def UpdateKQTHistoricalData(self):
    """Rebuild ``kqt_stock_data`` with recent daily closes per KQT ticker.

    Requests ``kqt_lookback + 5`` daily bars for all KQT symbols and keeps,
    per ticker, the array of closes when at least ``kqt_lookback`` values
    are available. With no tickers or no history the cache is emptied.
    """
    self.Log(f"KQT: Updating history for {len(self.kqt_tickers)} tickers.")

    active_tickers = list(self.kqt_symbols.keys())
    history = None
    if active_tickers:
        requested = [self.kqt_symbols[ticker] for ticker in active_tickers]
        history = self.History(requested, self.kqt_lookback + 5, Resolution.Daily)
    if history is None or history.empty:
        self.kqt_stock_data = {}
        return

    # Flatten the index so rows can be filtered by the 'symbol' column.
    history = history.reset_index()
    refreshed = {}
    for ticker in active_tickers:
        rows = history[history['symbol'] == self.kqt_symbols[ticker]]
        closes = rows['close'].values if not rows.empty else np.array([])
        if len(closes) >= self.kqt_lookback:
            refreshed[ticker] = closes

    self.kqt_stock_data = refreshed
    self.Log(f"KQT: Updated history for {len(self.kqt_stock_data)} tickers.")
def GenerateKQTPredictions(self):
    """Build momentum-based fallback predictions from cached closes.

    For every ticker in ``kqt_stock_data`` that is still in ``kqt_symbols``
    and has more than 20 closes, the score is the 10-bar momentum plus half
    the relative gap between the 5-bar and 20-bar means; the predicted
    return is twice that score.

    Returns:
        dict mapping ticker to ``{"pred_return", "composite_score"}``, where
        the composite score is the predicted return divided by 0.1. Tickers
        whose prediction is not finite (e.g. zero or NaN closes) are
        skipped so that NaN/inf never reaches position sizing.
    """
    predictions = {}
    self.Log(f"KQT: Generating fallback predictions for {len(self.kqt_stock_data)} stocks.")

    threshold = 0.1
    for ticker, closes in self.kqt_stock_data.items():
        if ticker not in self.kqt_symbols:
            continue
        try:
            if len(closes) <= 20:
                continue
            # Division by a zero close is handled by the finiteness check
            # below, so silence NumPy's warnings for these expressions.
            with np.errstate(divide="ignore", invalid="ignore"):
                short_ma = np.mean(closes[-5:])
                long_ma = np.mean(closes[-20:])
                momentum = closes[-1] / closes[-10] - 1
                pred_score = momentum + 0.5 * (short_ma / long_ma - 1) if long_ma != 0 else momentum
            pred_return = pred_score * 2
            if not np.isfinite(pred_return):
                self.Log(f"KQT: Skipping {ticker}: non-finite prediction from price data.")
                continue
            predictions[ticker] = {
                "pred_return": pred_return,
                "composite_score": pred_return / threshold,
            }
        except Exception as e:
            self.Log(f"KQT Error processing {ticker} in GenerateKQTPredictions: {str(e)}")
            continue

    self.Log(f"KQT: Generated {len(predictions)} predictions.")
    return predictions
# --- Modify ProcessKQTStopLosses ---
def ProcessKQTStopLosses(self, managed_symbols=None):
    """Liquidate KQT positions whose latest daily move breaches the stop.

    Args:
        managed_symbols: Optional collection of symbols; when given, only
            KQT tickers whose symbol is in it are checked.

    The stop level comes from ``self.strategy.get_stop_loss_level()``. A
    long is stopped when its last daily return (in percent) is below that
    level; a short when the return exceeds the level's absolute value.
    ``kqt_stopped_out`` is cleared first and then filled with the tickers
    stopped in this pass.
    """
    stop_loss_level = self.strategy.get_stop_loss_level()
    self.kqt_stopped_out.clear()

    if managed_symbols is None:
        tickers_to_check = list(self.kqt_symbols.keys())
    else:
        tickers_to_check = [t for t, s in self.kqt_symbols.items() if s in managed_symbols]

    for ticker in tickers_to_check:
        symbol = self.kqt_symbols[ticker]
        position = self.Portfolio[symbol]
        if not position.Invested:
            continue

        history = self.History(symbol, 2, Resolution.Daily)
        if history.empty or len(history) < 2:
            continue
        if symbol in history.index:
            close_prices = history.loc[symbol]['close']
        else:
            close_prices = pd.Series()
        if len(close_prices) < 2:
            continue

        daily_return = (close_prices.iloc[-1] / close_prices.iloc[-2] - 1) * 100
        is_long = position.Quantity > 0

        if is_long and daily_return < stop_loss_level:
            self.Log(f"KQT Stop loss triggered for {ticker} (long): {daily_return:.2f}% < {stop_loss_level:.2f}%")
        elif not is_long and daily_return > abs(stop_loss_level):
            self.Log(f"KQT Stop loss triggered for {ticker} (short): {daily_return:.2f}% > {abs(stop_loss_level):.2f}%")
        else:
            continue

        self.kqt_stopped_out.add(ticker)
        self.Liquidate(symbol, f"KQT Stop Loss {daily_return:.2f}%")
# --- Modify ExecuteKQTTrades ---
def ExecuteKQTTrades(self, target_positions, target_portfolio_fraction=1.0):
    """Translate KQT target weights into portfolio targets and submit them.

    Args:
        target_positions: Mapping of ticker to raw target weight, expressed
            relative to the KQT sleeve.
        target_portfolio_fraction: Share of the total portfolio the KQT
            sleeve may use (1.0 in KQT mode).

    Raw weights are scaled down so their gross sum does not exceed 0.99,
    then multiplied by the sleeve fraction. A target is submitted only when
    the raw weight is at least 0.035 and the position is new or differs from
    its current weight by more than the rebalance tolerance (halved when the
    fraction is below 1.0). Invested KQT names that are stopped out,
    insignificant or untargeted receive a zero target, subject to the same
    tolerance. RC-managed symbols are never touched.
    """
    mode_prefix = "KQT" if target_portfolio_fraction == 1.0 else f"KQT({target_portfolio_fraction*100:.0f}%)"
    self.Log(f"--- {mode_prefix} ExecuteTrades START ---")

    total_portfolio_value = self.Portfolio.TotalPortfolioValue
    kqt_effective_portfolio_value = total_portfolio_value * target_portfolio_fraction
    if kqt_effective_portfolio_value <= 0:
        self.Log(f"{mode_prefix} ExecuteTrades: Zero or negative effective portfolio value ({kqt_effective_portfolio_value:.2f}). Cannot execute trades.")
        return

    final_targets = []
    processed_symbols = set()
    kqt_managed_symbols = set(self.kqt_symbols.values())
    min_exec_weight = 0.035  # relative to the KQT fraction

    # A smaller sleeve uses a tighter tolerance.
    current_rebalance_tolerance = self.kqt_rebalance_tolerance
    if target_portfolio_fraction < 1.0:
        current_rebalance_tolerance = self.kqt_rebalance_tolerance * 0.5
        self.Log(f"{mode_prefix}: Using reduced rebalance tolerance: {current_rebalance_tolerance}")

    # Cap gross raw exposure at 99% of the sleeve.
    max_allowed_allocation_raw = 0.99
    gross_raw_allocation = sum(abs(weight) for weight in target_positions.values())
    if gross_raw_allocation > max_allowed_allocation_raw:
        scaling_factor_raw = max_allowed_allocation_raw / gross_raw_allocation
    else:
        scaling_factor_raw = 1.0

    rc_symbols_to_preserve = (self.rc_all_defensive_set - {self.rc_bil}) | {self.rc_sh, self.rc_ief, self.rc_gld}
    if hasattr(self, 'rc_selected_by_market_cap'):
        rc_symbols_to_preserve.update({s for s, _ in self.rc_selected_by_market_cap})

    def already_targeted(sym, liquidations_only=False):
        # True when final_targets already holds a (zero-quantity) target.
        return any(
            sym == t.Symbol
            for t in final_targets
            if not liquidations_only or t.Quantity == 0
        )

    # Pass 1: walk the requested targets.
    for ticker, raw_target_weight in target_positions.items():
        scaled_raw_target_weight = raw_target_weight * scaling_factor_raw
        target_weight_overall = scaled_raw_target_weight * target_portfolio_fraction

        if ticker not in self.kqt_symbols:
            continue
        symbol = self.kqt_symbols[ticker]
        processed_symbols.add(symbol)
        if ticker in self.kqt_stopped_out or symbol in rc_symbols_to_preserve:
            continue

        try:
            target_weight_float = float(target_weight_overall)
        except ValueError:
            continue
        if not np.isfinite(target_weight_float):
            continue

        current_holding = self.Portfolio[symbol]
        current_weight_overall = current_holding.HoldingsValue / total_portfolio_value if total_portfolio_value > 0 and current_holding.Invested else 0.0
        weight_difference = abs(target_weight_float - current_weight_overall)
        is_significant_target_raw = abs(scaled_raw_target_weight) >= min_exec_weight
        is_currently_invested = current_holding.Invested

        if is_significant_target_raw:
            if not is_currently_invested or weight_difference > current_rebalance_tolerance:
                self.Log(f"{mode_prefix}: Setting overall target for {ticker} to {target_weight_float:.4f} (Current: {current_weight_overall:.4f}, Diff: {weight_difference:.4f}, Tol: {current_rebalance_tolerance})")
                final_targets.append(PortfolioTarget(symbol, target_weight_float))
            else:
                self.Log(f"{mode_prefix}: Skipping overall target for {ticker} ({target_weight_float:.4f}), change vs current ({current_weight_overall:.4f}) within tolerance.")
        elif is_currently_invested:
            if weight_difference > current_rebalance_tolerance:
                self.Log(f"{mode_prefix}: Liquidating {ticker} (Current: {current_weight_overall:.4f}) due to near-zero target ({target_weight_float:.4f}) and diff > tol {current_rebalance_tolerance}.")
                final_targets.append(PortfolioTarget(symbol, 0))
            else:
                self.Log(f"{mode_prefix}: Skipping liquidation for {ticker} ({target_weight_float:.4f}), change vs current ({current_weight_overall:.4f}) within tolerance.")

    # Pass 2: unwind KQT holdings that were stopped out or not targeted.
    for holding in self.Portfolio.Values:
        if not holding.Invested:
            continue
        symbol = holding.Symbol
        if symbol in rc_symbols_to_preserve or symbol not in kqt_managed_symbols:
            continue
        ticker = symbol.Value
        current_weight_overall = holding.HoldingsValue / total_portfolio_value
        exceeds_tolerance = abs(0 - current_weight_overall) > current_rebalance_tolerance

        if ticker in self.kqt_stopped_out and not already_targeted(symbol, liquidations_only=True):
            if exceeds_tolerance:
                self.Log(f"{mode_prefix}: Adding liquidation target for stopped-out {ticker} (Current: {current_weight_overall:.4f}, Tol: {current_rebalance_tolerance})")
                final_targets.append(PortfolioTarget(symbol, 0))
            continue

        if symbol not in processed_symbols and not already_targeted(symbol):
            if exceeds_tolerance:
                self.Log(f"{mode_prefix}: Adding liquidation target for untargeted KQT holding {ticker} (Current: {current_weight_overall:.4f}, Tol: {current_rebalance_tolerance})")
                final_targets.append(PortfolioTarget(symbol, 0))
            else:
                self.Log(f"{mode_prefix}: Skipping liquidation for untargeted KQT {ticker}, current weight ({current_weight_overall:.4f}) within tolerance from zero.")

    if final_targets:
        self.Log(f"{mode_prefix}: Submitting {len(final_targets)} targets to SetHoldings after tolerance check.")
        self.SetHoldings(final_targets)
    else:
        self.Log(f"{mode_prefix}: No KQT targets needed after tolerance check.")

    self.Log(f"--- {mode_prefix} ExecuteTrades END ---")
    self.kqt_previous_positions = target_positions
def GetMarketReturns(self):
    """Return up to the last 10 daily SPY returns, in percent.

    Based on a 15-bar daily history request; an empty list is returned when
    fewer than two closes are available.
    """
    spy_history = self.History(self.spy, 15, Resolution.Daily)
    if spy_history.empty or len(spy_history) < 2:
        return []

    if self.spy in spy_history.index:
        spy_prices = spy_history.loc[self.spy]['close']
    else:
        spy_prices = pd.Series()
    if len(spy_prices) < 2:
        return []

    pct_returns = spy_prices.pct_change().dropna() * 100
    return pct_returns.tolist()[-10:]
def CalculatePortfolioReturn(self):
    """Return the portfolio change versus the stored previous value, in percent.

    Compares ``self.Portfolio.TotalPortfolioValue`` with
    ``self.previous_portfolio_value``. Returns ``0`` when the stored previous
    value is not positive.
    """
    value_now = self.Portfolio.TotalPortfolioValue
    baseline = self.previous_portfolio_value
    if not baseline > 0:
        return 0
    return (value_now / baseline - 1) * 100
def TryLoadModelWeights(self):
    """Try to read serialized model weights from the ObjectStore.

    Looks up the ``"kqt_model_weights"`` key, base64-decodes the stored
    payload, deserializes it with ``torch.load`` and reports the second
    dimension of ``embedding.0.weight`` through ``self.Debug``. Every failure
    is reported through ``self.Debug`` instead of being raised.

    NOTE(review): the decoded state dict is only inspected here; this method
    does not apply it to a model. Confirm whether a ``load_state_dict`` call
    is expected somewhere.
    """
    import base64
    import io

    try:
        if not self.ObjectStore.ContainsKey("kqt_model_weights"):
            self.Debug("No model weights found in ObjectStore")
            return

        self.Debug("Found model weights in ObjectStore, loading...")
        encoded_bytes = self.ObjectStore.Read("kqt_model_weights")
        model_bytes = base64.b64decode(encoded_bytes)

        # Deserialize straight from memory. The previous version wrote a
        # temp file with delete=False and only removed it on the success
        # path, so any error during loading left the file behind.
        state_dict = torch.load(io.BytesIO(model_bytes))

        input_shape = state_dict['embedding.0.weight'].shape
        actual_input_size = input_shape[1]
        self.Debug(f"Detected input size from weights: {actual_input_size}")
        self.Debug("Successfully loaded model weights")
    except Exception as e:
        # Best-effort load: the algorithm keeps running without the weights.
        self.Debug(f"Error loading model weights: {str(e)}")
def SetRebalanceFlag(self):
    """Raise the RiskControl rebalance flag when invoked on a Wednesday.

    Does nothing unless ``self.current_strategy_mode`` equals
    ``"RiskControl"`` and ``self.Time.weekday()`` is 2.
    """
    wednesday = 2
    if self.current_strategy_mode != "RiskControl":
        return
    if self.Time.weekday() != wednesday:
        return
    self.rc_rebalance_flag = True
    self.Log("RiskControl: Set rebalance flag for Wednesday.")
# --- RiskControl rebalance (operates on the RC fraction of the portfolio) ---
def MonthlyRebalance(self):
    """Rebalance the RiskControl (RC) share of the portfolio.

    Runs only when ``self.current_strategy_mode`` is ``"RiskControl"`` and
    ``self.rc_rebalance_flag`` is set. Weights are first computed relative to
    the RC fraction (equity / defensive ETFs / cash replacement) and then
    scaled by ``self.rc_portfolio_fraction`` into whole-portfolio weights.
    A target is submitted only when the position is new or its weight differs
    from the target by more than ``self.rc_rebalance_tolerance``. KQT symbols
    that are currently held receive a target equal to their current weight so
    the ``SetHoldings`` call leaves them in place.

    Side effects: clears ``rc_rebalance_flag`` and ``rc_entry_prices``,
    rebuilds ``rc_defensive_positions``, records entry prices for newly
    targeted equities, writes log lines and may call ``SetHoldings``.
    """
    if self.current_strategy_mode != "RiskControl" or not self.rc_rebalance_flag:
        return
    self.Log("--- RiskControl MonthlyRebalance START (Managing RC Fraction) ---")
    total_portfolio_value = self.Portfolio.TotalPortfolioValue
    if total_portfolio_value <= 0:
        self.Log("RiskControl MonthlyRebalance: Zero or negative portfolio value.")
        return

    # The flag is consumed before the readiness check below, so a rebalance
    # that finds the indicators not ready waits for the next flag.
    self.rc_rebalance_flag = False
    self.rc_entry_prices.clear()  # Clear RC entry prices

    # Check indicators are ready
    if not hasattr(self, 'rc_spy_30day_window') or self.rc_spy_30day_window.Count < 30 or not self.spy_ema10.IsReady:
        self.Log("RiskControl: Waiting for enough SPY history (SMA30/EMA10) for rebalance.")
        return

    spy_price = self.Securities[self.spy].Price
    sma_30 = sum(self.rc_spy_30day_window) / self.rc_spy_30day_window.Count
    market_deviation = (spy_price / sma_30) - 1.0 if sma_30 > 0 else 0.0
    market_trend = self._rc_calculateMarketTrend()
    is_spy_trend_down = spy_price < self.spy_ema10.Current.Value

    # --- Calculate RC Allocations (relative to the RC fraction) ---
    # Max defensive potential *within the RC fraction*
    max_defensive_potential_rc = 0.0
    if market_deviation < -0.01 or market_trend < -0.01:
        max_defensive_potential_rc = 0.40  # 40% of the RC fraction
    elif market_deviation < 0.01:
        max_defensive_potential_rc = 0.20  # 20% of the RC fraction

    all_defensive_allocations_rc = self._rc_evaluateDefensiveETFs(
        market_deviation, market_trend, max_defensive_potential_rc,  # Max allocation relative to RC fraction
        etfs_to_evaluate=list(self.rc_all_defensive_set - {self.rc_bil})
    )
    total_defensive_allocation_rc = sum(all_defensive_allocations_rc.values())  # Sum of weights relative to RC fraction

    # Equity weight potential *within the RC fraction* (1.0 == 100% of it)
    equity_weight_rc = 1.0 - total_defensive_allocation_rc
    if market_deviation < -0.03 or market_trend < -0.02:
        equity_weight_rc *= 0.5
    elif market_deviation < 0.0 or market_trend < 0.0:
        equity_weight_rc *= 0.75
    equity_weight_rc = max(0, equity_weight_rc)

    # Cash replacement allocation *within the RC fraction*
    cash_replacement_weight_rc = max(0, 1.0 - equity_weight_rc - total_defensive_allocation_rc)
    self.Log(f"RC Rebalance (RC Fraction) - Market Dev: {market_deviation:.2%}, Trend: {market_trend:.2%}, SPY Trend Down: {is_spy_trend_down}")
    self.Log(f"RC Final Allocation Targets (within RC {self.rc_portfolio_fraction*100}%): Equity {equity_weight_rc:.1%}, Defensive {total_defensive_allocation_rc:.1%}, CashReplace {cash_replacement_weight_rc:.1%}")

    # --- Equity Weighting (market-cap weighted, within RC fraction) ---
    # A second, stale copy of this filter used to follow. It referenced
    # names that are never defined in this method and raised NameError
    # whenever no stock passed the filter; it has been removed.
    momentum_scores = self._rc_calculateSimpleMomentum()
    filtered_stocks_rc = [(s, mcap) for s, mcap in self.rc_selected_by_market_cap if momentum_scores.get(s, 1.0) >= 0.9]
    if not filtered_stocks_rc and equity_weight_rc > 0.01:
        self.Log("RC Rebalance: No RC stocks passed momentum filter. RC Equity weight will be 0.")
        equity_weight_rc = 0.0
        cash_replacement_weight_rc = max(0, 1.0 - total_defensive_allocation_rc)  # Recalculate within RC fraction
        self.Log(f"RC Adjusted Allocation (within RC {self.rc_portfolio_fraction*100}%): Equity 0%, Defensive {total_defensive_allocation_rc:.1%}, CashReplace {cash_replacement_weight_rc:.1%}")

    total_market_cap_rc = sum([x[1] for x in filtered_stocks_rc])
    # Weights relative to the RC equity portion
    equity_weights_rc_raw = {s: (mcap / total_market_cap_rc) for s, mcap in filtered_stocks_rc} if total_market_cap_rc > 0 else {}
    # Scale by the target equity weight within the RC fraction
    equity_weights_rc = {s: w * equity_weight_rc for s, w in equity_weights_rc_raw.items()} if equity_weight_rc > 0 else {}

    # --- Build Final Targets (relative to TOTAL portfolio) ---
    final_targets = []
    # Symbols RC wants to hold this cycle, including those left untouched
    # because they are already within tolerance of their target.
    symbols_targeted_for_rc_investment = set()

    # Define RC managed symbols (used for liquidation check)
    rc_equity_symbols = {s for s, _ in self.rc_selected_by_market_cap}
    rc_managed_symbols = rc_equity_symbols | (self.rc_all_defensive_set - {self.rc_bil}) | {self.rc_sh, self.rc_ief, self.rc_gld}

    # 1. RC Equity Targets
    if equity_weight_rc > 0:
        for symbol, target_weight_rc_frac in equity_weights_rc.items():
            # Scale to overall portfolio weight
            target_weight_overall = target_weight_rc_frac * self.rc_portfolio_fraction
            if target_weight_overall > 0.001:  # Min threshold overall
                current_holding = self.Portfolio[symbol]
                current_weight_overall = current_holding.HoldingsValue / total_portfolio_value if total_portfolio_value > 0 and current_holding.Invested else 0.0
                weight_difference = abs(target_weight_overall - current_weight_overall)
                if not current_holding.Invested or weight_difference > self.rc_rebalance_tolerance:
                    self.Log(f"RC Rebalance: Setting RC Equity overall target {symbol.Value} to {target_weight_overall:.4f} (Current: {current_weight_overall:.4f}, Diff: {weight_difference:.4f})")
                    final_targets.append(PortfolioTarget(symbol, target_weight_overall))
                    symbols_targeted_for_rc_investment.add(symbol)
                    if self.Securities.ContainsKey(symbol) and self.Securities[symbol].HasData:
                        self.rc_entry_prices[symbol] = self.Securities[symbol].Price
                else:
                    self.Log(f"RC Rebalance: Skipping RC Equity overall target {symbol.Value} ({target_weight_overall:.4f}), change vs current ({current_weight_overall:.4f}) within tolerance.")
                    if current_holding.Invested:
                        symbols_targeted_for_rc_investment.add(symbol)

    # 2. RC Defensive Targets
    self.rc_defensive_positions.clear()
    if total_defensive_allocation_rc > 0:
        for symbol, target_weight_rc_frac in all_defensive_allocations_rc.items():
            if symbol == self.rc_bil:
                continue
            # Scale to overall portfolio weight
            target_weight_overall = target_weight_rc_frac * self.rc_portfolio_fraction
            if target_weight_overall > 0.001:
                current_holding_def = self.Portfolio[symbol]
                current_weight_overall_def = current_holding_def.HoldingsValue / total_portfolio_value if total_portfolio_value > 0 and current_holding_def.Invested else 0.0
                weight_difference_def = abs(target_weight_overall - current_weight_overall_def)
                if not current_holding_def.Invested or weight_difference_def > self.rc_rebalance_tolerance:
                    self.Log(f"RC Rebalance: Setting RC Defensive overall target {symbol.Value} to {target_weight_overall:.4f} (Current: {current_weight_overall_def:.4f}, Diff: {weight_difference_def:.4f})")
                    final_targets.append(PortfolioTarget(symbol, target_weight_overall))
                    symbols_targeted_for_rc_investment.add(symbol)
                    self.rc_defensive_positions.add(symbol)
                else:
                    self.Log(f"RC Rebalance: Skipping RC Defensive overall target {symbol.Value} ({target_weight_overall:.4f}), change vs current ({current_weight_overall_def:.4f}) within tolerance.")
                    if current_holding_def.Invested:
                        symbols_targeted_for_rc_investment.add(symbol)
                        self.rc_defensive_positions.add(symbol)

    # 3. RC Cash Replacement Target
    if cash_replacement_weight_rc > 0.001:
        target_symbol = self.rc_sh if is_spy_trend_down else self.rc_ief
        other_cr_symbol = self.rc_ief if target_symbol == self.rc_sh else self.rc_sh
        # Scale to overall portfolio weight
        target_weight_overall = cash_replacement_weight_rc * self.rc_portfolio_fraction
        current_holding_cr = self.Portfolio[target_symbol]
        current_weight_overall_cr = current_holding_cr.HoldingsValue / total_portfolio_value if total_portfolio_value > 0 and current_holding_cr.Invested else 0.0
        weight_difference_cr = abs(target_weight_overall - current_weight_overall_cr)
        other_holding = self.Portfolio[other_cr_symbol]
        if not current_holding_cr.Invested or weight_difference_cr > self.rc_rebalance_tolerance or other_holding.Invested:
            self.Log(f"RC Rebalance: Setting RC CashReplace overall target {target_symbol.Value} to {target_weight_overall:.4f} (Current: {current_weight_overall_cr:.4f}, Diff: {weight_difference_cr:.4f})")
            final_targets.append(PortfolioTarget(target_symbol, target_weight_overall))
            symbols_targeted_for_rc_investment.add(target_symbol)
            # Liquidate other CR if held significantly
            other_current_weight_overall = other_holding.HoldingsValue / total_portfolio_value if total_portfolio_value > 0 and other_holding.Invested else 0.0
            if other_holding.Invested and abs(0 - other_current_weight_overall) > self.rc_rebalance_tolerance:
                self.Log(f"RC Rebalance: Liquidating other RC CashReplace {other_cr_symbol.Value} (Current: {other_current_weight_overall:.4f})")
                final_targets.append(PortfolioTarget(other_cr_symbol, 0))
        else:
            self.Log(f"RC Rebalance: Skipping RC CashReplace overall target {target_symbol.Value} ({target_weight_overall:.4f}), change vs current ({current_weight_overall_cr:.4f}) within tolerance.")
            if current_holding_cr.Invested:
                symbols_targeted_for_rc_investment.add(target_symbol)

    # --- Preserve KQT Holdings ---
    # Get current KQT holdings and their weights
    kqt_symbols_held = {s for s in self.kqt_symbols.values() if self.Portfolio[s].Invested}
    kqt_targets_to_preserve = []
    for symbol in kqt_symbols_held:
        # Don't add a preservation target for a symbol already set to zero
        is_kqt_liquidating = any(t.Symbol == symbol and t.Quantity == 0 for t in final_targets)
        if not is_kqt_liquidating:
            current_weight = self.Portfolio[symbol].HoldingsValue / total_portfolio_value
            # Add a target to maintain the current KQT position weight
            kqt_targets_to_preserve.append(PortfolioTarget(symbol, current_weight))
            self.Log(f"RC Rebalance: Preserving KQT holding {symbol.Value} at current weight {current_weight:.4f}")

    # 4. Liquidate Untargeted RC Assets
    # Holdings skipped above as "within tolerance" are still wanted; without
    # including them here they would be treated as untargeted and sold.
    symbols_targeted_overall = (
        {t.Symbol for t in final_targets}
        | {t.Symbol for t in kqt_targets_to_preserve}
        | symbols_targeted_for_rc_investment
    )
    for holding in self.Portfolio.Values:
        # Only consider liquidating RC managed symbols
        is_rc_asset = holding.Symbol in rc_managed_symbols or holding.Symbol == self.rc_bil  # Include BIL check just in case
        if holding.Invested and is_rc_asset and holding.Symbol not in symbols_targeted_overall:
            current_weight_overall = holding.HoldingsValue / total_portfolio_value
            if abs(0 - current_weight_overall) > self.rc_rebalance_tolerance:
                self.Log(f"RC Rebalance: Liquidating untargeted RC asset {holding.Symbol.Value} (Current: {current_weight_overall:.4f})")
                final_targets.append(PortfolioTarget(holding.Symbol, 0))
            else:
                self.Log(f"RC Rebalance: Skipping liquidation for untargeted RC {holding.Symbol.Value}, current weight ({current_weight_overall:.4f}) within tolerance from zero.")

    # Combine RC targets and KQT preservation targets
    combined_targets = final_targets + kqt_targets_to_preserve

    # --- Execute Trades ---
    if combined_targets:
        # Consolidate targets for the same symbol (e.g., if KQT and RC somehow target the same)
        final_target_dict = {}
        for target in combined_targets:
            # Last target wins if duplicated (preservation target might override an RC target)
            final_target_dict[target.Symbol] = target.Quantity
        final_targets_consolidated = [PortfolioTarget(symbol, weight) for symbol, weight in final_target_dict.items()]
        self.Log(f"RC Rebalance: Submitting {len(final_targets_consolidated)} combined targets (RC + KQT Preservation) to SetHoldings.")
        self.SetHoldings(final_targets_consolidated)
    else:
        self.Log("RC Rebalance: No RC targets needed after tolerance check, KQT positions preserved.")
def WeeklyDefensiveAdjustment(self):
"""Adjust RiskControl defensive ETF positions between rebalances.

Returns immediately unless the strategy mode is "RiskControl", at least
3 days have passed since ``rc_last_rebalance_date`` and at least 5 days
since ``rc_last_defensive_update``.

Two paths follow:
* Strong market (deviation > 0.04 and trend > 0.03): zero targets are built
  for invested RC defensive symbols whose weight exceeds the tolerance, they
  are submitted together with KQT preservation targets, and the method
  returns.
* Otherwise: defensive ETFs are evaluated with a cap of 0.10 (relative to
  the RC fraction), scaled by ``rc_portfolio_fraction``, and submitted
  together with KQT preservation targets when they pass the thresholds.

NOTE(review): the original indentation of this block was lost, so the
nesting of the final "update defensive positions tracking based on current
holdings" statements (inside the last ``else`` or after the ``if/else``)
cannot be confirmed from here.
"""
if self.current_strategy_mode != "RiskControl": return
# Throttle: skip shortly after a rebalance or a previous defensive update.
days_since_rebalance = (self.Time.date() - self.rc_last_rebalance_date.date()).days
if days_since_rebalance < 3: return
days_since_update = (self.Time.date() - self.rc_last_defensive_update.date()).days
if days_since_update < 5: return
self.Log("--- RiskControl WeeklyDefensiveAdjustment START (Managing RC Fraction) ---")
total_portfolio_value = self.Portfolio.TotalPortfolioValue
if total_portfolio_value <= 0:
self.Log("RC Weekly: Zero or negative portfolio value.")
return
# NOTE: computed but not read anywhere else in this method.
rc_effective_portfolio_value = total_portfolio_value * self.rc_portfolio_fraction
spy_price = self.Securities[self.spy].Price
# Mean of the rolling SPY window; falls back to the current price when the window is empty.
sma_30 = sum(self.rc_spy_30day_window) / self.rc_spy_30day_window.Count if self.rc_spy_30day_window.Count > 0 else spy_price
market_deviation = (spy_price / sma_30) - 1.0 if sma_30 > 0 else 0.0
market_trend = self._rc_calculateMarketTrend()
# Define RC managed symbols (excluding KQT)
rc_equity_symbols = {s for s, _ in self.rc_selected_by_market_cap}
rc_managed_symbols = rc_equity_symbols | (self.rc_all_defensive_set - {self.rc_bil}) | {self.rc_sh, self.rc_ief, self.rc_gld}
# Strong market check: Liquidate RC defensives?
if market_deviation > 0.04 and market_trend > 0.03:
self.Log("RC Weekly: Market too strong, checking existing RC defensive positions.")
liquidation_targets = []
rc_defensives_to_check = (self.rc_all_defensive_set - {self.rc_bil}) | {self.rc_sh, self.rc_ief, self.rc_gld} # Include GLD, SH, IEF
for s in rc_defensives_to_check:
if self.Portfolio[s].Invested:
current_weight_overall = self.Portfolio[s].HoldingsValue / total_portfolio_value
# Only positions whose weight exceeds the tolerance get a zero target.
if abs(0 - current_weight_overall) > self.rc_rebalance_tolerance:
self.Log(f"RC Weekly: Liquidating RC defensive {s.Value} due to strong market (Current: {current_weight_overall:.4f}).")
liquidation_targets.append(PortfolioTarget(s, 0))
# Preserve KQT holdings during this liquidation
kqt_symbols_held = {s for s in self.kqt_symbols.values() if self.Portfolio[s].Invested}
kqt_targets_to_preserve = []
for symbol in kqt_symbols_held:
# Skip preservation when the symbol already has a zero target above.
is_kqt_liquidating = any(t.Symbol == symbol and t.Quantity == 0 for t in liquidation_targets)
if not is_kqt_liquidating:
current_weight = self.Portfolio[symbol].HoldingsValue / total_portfolio_value
kqt_targets_to_preserve.append(PortfolioTarget(symbol, current_weight))
combined_targets = liquidation_targets + kqt_targets_to_preserve
if combined_targets:
# Keyed by symbol: a later target for the same symbol replaces an earlier one.
final_target_dict = {t.Symbol: t.Quantity for t in combined_targets}
final_targets_consolidated = [PortfolioTarget(s, w) for s, w in final_target_dict.items()]
self.Log(f"RC Weekly: Submitting {len(final_targets_consolidated)} targets (RC Defensive Liq + KQT Preservation).")
self.SetHoldings(final_targets_consolidated)
self.rc_defensive_positions.clear() # Clear RC defensive tracking
self.rc_last_defensive_update = self.Time
else:
self.Log("RC Weekly: No significant RC defensive positions to liquidate.")
return # Skip further adjustments if market is strong
# --- Weekly Adjustment Logic (Operate within RC Fraction) ---
# Calculate current RC allocation (excluding KQT)
# NOTE: the two values below are computed but not read later in this method.
current_rc_invested_value = sum(h.HoldingsValue for h in self.Portfolio.Values if h.Invested and h.Symbol in rc_managed_symbols)
current_rc_weight_overall = current_rc_invested_value / total_portfolio_value if total_portfolio_value > 0 else 0.0
# Available space within the RC fraction is not derived from current holdings;
# a fixed cap on new defensive allocation is used instead.
max_new_defensive_potential_rc = 0.10 # Allow adding up to 10% of the RC fraction in new defensives weekly
self.Log(f"RC Weekly - Market Dev: {market_deviation:.2%}, Trend: {market_trend:.2%}")
self.Log(f"RC Weekly - Potential new defensive allocation (within RC {self.rc_portfolio_fraction*100}%): {max_new_defensive_potential_rc:.1%}")
# Evaluate *new* defensive candidates (excluding BIL)
new_defensive_allocations_rc = self._rc_evaluateDefensiveETFs(
market_deviation, market_trend, max_new_defensive_potential_rc,
etfs_to_evaluate=list(self.rc_all_defensive_set - {self.rc_bil})
)
final_targets = []
# Written below but not read again in this method.
symbols_targeted_for_rc_investment = set() # Track symbols targeted by this RC adjustment
# NOTE: computed but not read later in this method.
total_new_defensive_target_rc = sum(new_defensive_allocations_rc.values()) # Relative to RC fraction
# Add new/adjust existing RC defensive positions
current_rc_defensive_symbols = (self.rc_all_defensive_set - {self.rc_bil}) | {self.rc_sh, self.rc_ief, self.rc_gld} # Symbols RC might adjust
for symbol in current_rc_defensive_symbols:
target_weight_rc_frac = new_defensive_allocations_rc.get(symbol, 0.0) # Get *new* target weight within RC fraction
target_weight_overall = target_weight_rc_frac * self.rc_portfolio_fraction # Scale to overall
current_holding = self.Portfolio[symbol]
current_weight_overall = current_holding.HoldingsValue / total_portfolio_value if total_portfolio_value > 0 and current_holding.Invested else 0.0
weight_difference = abs(target_weight_overall - current_weight_overall)
# Only set target if adding new allocation or significantly adjusting existing
if target_weight_overall > 0.005: # Min threshold for weekly adjustment
if not current_holding.Invested or weight_difference > self.rc_rebalance_tolerance:
self.Log(f"RC Weekly: Setting RC Defensive overall target {symbol.Value} to {target_weight_overall:.4f} (Current: {current_weight_overall:.4f}, Diff: {weight_difference:.4f})")
final_targets.append(PortfolioTarget(symbol, target_weight_overall))
symbols_targeted_for_rc_investment.add(symbol)
if symbol not in self.rc_entry_prices: # Set entry price if new
if self.Securities.ContainsKey(symbol) and self.Securities[symbol].HasData:
self.rc_entry_prices[symbol] = self.Securities[symbol].Price
else:
self.Log(f"RC Weekly: Skipping RC Defensive overall target {symbol.Value} ({target_weight_overall:.4f}), change vs current ({current_weight_overall:.4f}) within tolerance.")
if current_holding.Invested: symbols_targeted_for_rc_investment.add(symbol) # Still considered targeted by RC
# Existing defensives without a new target are not given a zero target here.
elif current_holding.Invested:
# Keep existing RC defensives unless market is strong (handled above) or monthly rebalance removes them
symbols_targeted_for_rc_investment.add(symbol) # Mark as 'managed' by RC this cycle
# --- Preserve KQT Holdings ---
kqt_symbols_held = {s for s in self.kqt_symbols.values() if self.Portfolio[s].Invested}
kqt_targets_to_preserve = []
for symbol in kqt_symbols_held:
is_kqt_liquidating = any(t.Symbol == symbol and t.Quantity == 0 for t in final_targets) # Check if RC is somehow targeting KQT asset
if not is_kqt_liquidating:
current_weight = self.Portfolio[symbol].HoldingsValue / total_portfolio_value
kqt_targets_to_preserve.append(PortfolioTarget(symbol, current_weight))
# Combine RC targets and KQT preservation targets
combined_targets = final_targets + kqt_targets_to_preserve
if combined_targets:
# Keyed by symbol: preservation targets come last, so they win on duplicates.
final_target_dict = {t.Symbol: t.Quantity for t in combined_targets}
final_targets_consolidated = [PortfolioTarget(s, w) for s, w in final_target_dict.items()]
self.Log(f"RC Weekly: Submitting {len(final_targets_consolidated)} combined targets (RC Adj + KQT Preservation).")
self.SetHoldings(final_targets_consolidated)
# Update defensive positions tracking from the submitted positive targets
self.rc_defensive_positions.clear()
for target in final_targets_consolidated:
if target.Symbol in current_rc_defensive_symbols and target.Quantity > 0:
self.rc_defensive_positions.add(target.Symbol)
self.rc_last_defensive_update = self.Time
else:
self.Log("RC Weekly: No RC targets needed after tolerance check, KQT positions preserved.")
# Update defensive positions tracking based on current holdings
# NOTE(review): nesting of the next five statements is ambiguous in this view (see docstring).
current_defensive = set()
for s in current_rc_defensive_symbols:
if self.Portfolio[s].Invested:
current_defensive.add(s)
self.rc_defensive_positions = current_defensive
self.Log("--- RiskControl WeeklyDefensiveAdjustment END ---")
def _rc_calculateMarketTrend(self):
    """Return the relative SPY price change over the trend lookback.

    Reads ``self.rc_spy_prices`` (keyed by sortable dates) and compares the
    price at the latest key with the price at the key ``rc_trend_lookback``
    positions from the end of the sorted keys. Returns ``0`` when fewer than
    ``rc_trend_lookback + 1`` prices are stored, and ``0.0`` when the older
    price is not positive.
    """
    lookback = self.rc_trend_lookback
    price_by_date = self.rc_spy_prices
    if len(price_by_date) < lookback + 1:
        return 0

    ordered_dates = sorted(price_by_date.keys())
    if len(ordered_dates) <= lookback:
        return 0

    latest = price_by_date[ordered_dates[-1]]
    reference = price_by_date[ordered_dates[-lookback]]
    if not reference > 0:
        return 0.0
    return (latest / reference) - 1.0
def _rc_calculateSimpleMomentum(self):
    """Score each RC-selected stock from its price change over 30 daily bars.

    For every symbol in ``self.rc_selected_by_market_cap`` that has at least
    30 closes in the requested history, the score is
    ``1 + 2 * (last_close / first_close - 1)`` clamped to ``[0.7, 1.3]``.
    Symbols without enough history are left out of the result.

    Returns:
        dict: symbol -> momentum score; empty when nothing is selected or the
        history request comes back empty.
    """
    scores = {}
    tracked = [sym for sym, _ in self.rc_selected_by_market_cap]
    if not tracked:
        return scores

    bars = self.History(tracked, 30, Resolution.Daily)
    if bars.empty:
        return scores

    for sym in tracked:
        if sym not in bars.index.get_level_values(0):
            continue
        closes = bars.loc[sym]['close']
        if len(closes) < 30:
            continue
        change = closes.iloc[-1] / closes.iloc[0] - 1 if closes.iloc[0] > 0 else 0.0
        scores[sym] = min(1.3, max(0.7, 1 + (change * 2)))
    return scores
# --- Defensive ETF evaluation (optionally restricted to a caller-supplied ETF list) ---
def _rc_evaluateDefensiveETFs(self, market_deviation, market_trend, max_allocation, etfs_to_evaluate=None):
    """
    Score defensive ETFs and split a capped allocation among the best ones.

    Args:
        market_deviation (float): SPY deviation from its 30-day SMA.
        market_trend (float): SPY trend value supplied by the caller.
        max_allocation (float): Upper bound for all defensive ETFs combined.
        etfs_to_evaluate (list, optional): ETF Symbols to evaluate.
                                           Defaults to self.rc_all_defensive.

    Returns:
        dict: one entry per evaluated symbol; the value is its allocation, or
        0 when nothing is allocated to it.
    """
    def trailing_returns(closes):
        # Change from the close `offset` positions from the end to the last
        # close; 0 when the series is too short or that close is not positive.
        result = {}
        for label, offset in (("5d", 5), ("10d", 10), ("20d", 20), ("30d", 30)):
            if len(closes) >= offset and closes.iloc[-offset] > 0:
                result[label] = closes.iloc[-1] / closes.iloc[-offset] - 1
            else:
                result[label] = 0
        return result

    # Resolve the evaluation universe and make sure it is a list
    if etfs_to_evaluate is None:
        etfs_to_evaluate = self.rc_all_defensive
    if not isinstance(etfs_to_evaluate, list):
        etfs_to_evaluate = list(etfs_to_evaluate)
    self.Log(f"RC: Evaluating {len(etfs_to_evaluate)} defensive ETFs. Max Alloc: {max_allocation:.2%}")

    allocations = {symbol: 0 for symbol in etfs_to_evaluate}
    if not etfs_to_evaluate:
        self.Log("RC EvalDef: No ETFs provided to evaluate.")
        return allocations
    if market_deviation > 0.04 and market_trend > 0.02:
        self.Log("RC EvalDef: Market too strong, skipping.")
        return allocations

    # History for the evaluated ETFs plus SPY as the benchmark
    history = self.History(etfs_to_evaluate + [self.spy], 60, Resolution.Daily)
    if history.empty:
        self.Log("RC EvalDef: History empty, skipping.")
        return allocations

    spy_perf = {}
    if self.spy in history.index.get_level_values(0):
        spy_closes = history.loc[self.spy]['close']
        if len(spy_closes) >= 30:
            spy_perf = trailing_returns(spy_closes)

    etf_scores = {}
    for symbol in etfs_to_evaluate:
        # The scoring formula depends on which configured group holds the symbol
        if symbol in self.rc_inverse_etfs:
            group_name = "Inverse"
        elif symbol in self.rc_alternative_defensive:
            group_name = "Alternative"
        elif symbol in self.rc_sector_defensive:
            group_name = "Sector"
        else:
            group_name = "Unknown"

        if symbol not in history.index.get_level_values(0):
            continue
        closes = history.loc[symbol]['close']
        if len(closes) < 30:
            continue

        perf = trailing_returns(closes)
        # Relative performance only exists for windows SPY has values for
        rel_perf = {p: perf[p] - spy_perf.get(p, 0) for p in spy_perf}

        score = 0
        if group_name == "Inverse":
            if market_deviation < -0.02:
                score = (perf["5d"] * 0.4) + (perf["10d"] * 0.4) + (perf["30d"] * 0.2) + (rel_perf.get("5d",0) + rel_perf.get("10d",0)) * 0.15
            else:
                score = (perf["5d"] * 0.6) + (perf["10d"] * 0.3) + (perf["30d"] * 0.1)
        elif group_name == "Alternative":
            score = (perf["5d"] * 0.3) + (perf["10d"] * 0.4) + (perf["30d"] * 0.3)
            if market_deviation < -0.03:
                score += rel_perf.get("10d",0) * 0.2
        elif group_name == "Sector":
            abs_score = (perf["5d"] * 0.3) + (perf["10d"] * 0.3) + (perf["30d"] * 0.4)
            rel_score = (rel_perf.get("5d",0) * 0.3) + (rel_perf.get("10d",0) * 0.3) + (rel_perf.get("30d",0) * 0.4)
            if market_deviation < -0.02:
                score = (abs_score * 0.4) + (rel_score * 0.6)
            else:
                score = (abs_score * 0.6) + (rel_score * 0.4)
        # Symbols in no group keep the neutral score of 0
        etf_scores[symbol] = score

    threshold = -0.01 if market_deviation < -0.03 else -0.007
    candidates = {s: score for s, score in etf_scores.items() if score > threshold}
    if not candidates:
        self.Log("RC EvalDef: No candidates passed threshold.")
        return allocations
    sorted_candidates = sorted(candidates.items(), key=lambda x: x[1], reverse=True)

    # Share of max_allocation to deploy, by market weakness
    if market_deviation < -0.05 or market_trend < -0.04:
        allocation_pct = 0.95
    elif market_deviation < -0.03 or market_trend < -0.02:
        allocation_pct = 0.8
    elif market_deviation < -0.01 or market_trend < -0.01:
        allocation_pct = 0.6
    else:
        allocation_pct = 0.4

    best_score = sorted_candidates[0][1]
    allocation_pct *= min(1.0, max(0.5, (best_score + 0.02) * 4))

    # Spread over two ETFs only in a clearly weak market
    num_etfs = 1
    if (market_deviation < -0.04 or market_trend < -0.03) and len(sorted_candidates) > 1:
        num_etfs = min(2, len(sorted_candidates))

    remaining_allocation = max_allocation * allocation_pct
    chosen = sorted_candidates[:num_etfs]
    total_score = sum(score for _, score in chosen)
    if total_score > 0:
        for symbol, score in chosen:
            etf_allocation = remaining_allocation * (score / total_score)
            # Allocations below 2% are dropped
            if etf_allocation >= 0.02 and symbol in allocations:
                allocations[symbol] = etf_allocation
                self.Log(f"RC EvalDef: Allocating {etf_allocation:.1%} to {symbol.Value} (Score: {score:.3f})")
    return allocations
# region imports
from AlgorithmImports import *
# endregion
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data import *
from QuantConnect.Indicators import *
from datetime import timedelta
import numpy as np
import pandas as pd
from sklearn.preprocessing import RobustScaler
from collections import deque
class Strategy:
def __init__(self):
    """Set up empty model metadata and neutral regime/risk state."""
    # Model / feature metadata
    self.lookback = 30
    self.feature_cols = []
    self.scalers = {}
    self.stock_to_id = {}
    self.sector_mappings = {}

    # Prediction thresholds
    self.adaptive_threshold = 0.1
    self.pred_std = 1.0

    # Regime and risk tracking
    self.current_regime = "neutral"
    self.defensive_mode = False
    self.previous_day_hit_stops = []
    self.portfolio_returns = deque(maxlen=60)  # rolling window of returns

    # Algorithm handle; None until one is assigned
    self.algorithm = None
def calculate_portfolio_risk_score(self, market_returns):
    """Return a risk score in [10, 100] used to scale overall exposure.

    Starts from a neutral 50 and adjusts it with three independent checks on
    ``market_returns`` (a sequence of returns, most recent last):

    * volatility spike: std of the last 5 values vs. the last 10 values;
    * number of negative values among the last 3;
    * mean of the last 5 values vs. mean of the 5 before them.
    """
    returns = np.asarray(market_returns)
    n_obs = len(returns)
    score = 50  # neutral starting point

    # Volatility spike: short-window vol relative to the longer window
    if n_obs >= 5:
        annualizer = np.sqrt(252)
        short_vol = np.std(returns[-5:]) * annualizer
        long_vol = np.std(returns[-10:]) * annualizer if n_obs >= 10 else short_vol
        spike = short_vol / long_vol if long_vol > 0 else 1
        if spike > 1.5:
            score -= 30
        elif spike > 1.2:
            score -= 15

    # Negative days among the last three
    if n_obs >= 3:
        down_days = np.sum(returns[-3:] < 0)
        if down_days == 3:
            score -= 20
        elif down_days == 2:
            score -= 10

    # Direction of the recent average versus the preceding one
    if n_obs >= 10:
        recent_mean = np.mean(returns[-5:])
        prior_mean = np.mean(returns[-10:-5])
        drift = recent_mean - prior_mean
        if drift < -0.3:
            score -= 15
        elif drift > 0.3 and recent_mean > 0:
            score += 10

    return max(10, min(100, score))
def detect_market_regime(self, daily_returns, lookback=10):
"""Classify the current market regime as a string label.

Combines the mean of ``daily_returns`` (named ``market_return`` below) with
statistics of ``self.portfolio_returns``.

Args:
    daily_returns: sequence of returns; converted with ``np.asarray``.
    lookback: maximum number of trailing ``self.portfolio_returns`` entries
        averaged into ``avg_recent_return``.

Returns:
    One of "breakout_bullish", "breakdown_bearish", "bullish_strong",
    "bullish_pullback", "bearish_high_vol", "bearish_low_vol", "bullish",
    "bearish" or "neutral".

NOTE(review): the original indentation of this block was lost; in
particular, the nesting level of the final ``market_return > -0.05``
fallback cannot be confirmed from here.
"""
daily_returns = np.asarray(daily_returns)
if len(daily_returns) >= 1:
market_return = np.mean(daily_returns)
# NOTE: market_vol is computed but not read anywhere else in this method.
market_vol = np.std(daily_returns)
if len(self.portfolio_returns) >= 3:
# Average of at most `lookback` trailing portfolio returns.
recent_returns = np.asarray(self.portfolio_returns)[-min(lookback, len(self.portfolio_returns)):]
avg_recent_return = np.mean(recent_returns)
if len(self.portfolio_returns) >= 5:
# Mean of the last 3 returns versus the up-to-5 returns before them.
very_recent = np.mean(np.asarray(self.portfolio_returns)[-3:])
less_recent = np.mean(np.asarray(self.portfolio_returns)[-min(8, len(self.portfolio_returns)):-3])
trend_change = very_recent - less_recent
if trend_change > 0.5 and avg_recent_return > 0.2:
return "breakout_bullish"
elif trend_change < -0.5 and avg_recent_return < -0.2:
return "breakdown_bearish"
if avg_recent_return > 0.15:
if market_return > 0:
return "bullish_strong"
else:
return "bullish_pullback"
elif avg_recent_return < -0.3:
# The high/low "vol" labels are chosen from market_return, not from a volatility value.
if market_return < -0.2:
return "bearish_high_vol"
else:
return "bearish_low_vol"
elif avg_recent_return > 0 and market_return > 0:
return "bullish"
elif avg_recent_return < 0 and market_return < 0:
return "bearish"
# Fallback decided by market_return alone.
if market_return > -0.05:
return "neutral"
else:
return "bearish"
return "neutral"
def detect_bearish_signals(self, recent_returns):
    """Count early bearish warning signs in the stored portfolio returns.

    Three checks run against ``self.portfolio_returns``: more negative than
    positive values among the last 5, a volatility rise of more than 30%
    between the previous and the latest 5 values, and a negative latest value
    right after a value above 0.3.

    ``recent_returns`` is accepted but not read by this method.

    Returns:
        tuple: (number of triggered signals, accumulated signal strength).
    """
    history = np.asarray(self.portfolio_returns)
    n_obs = len(history)
    count = 0
    strength = 0

    # More down days than up days in the latest five observations
    if n_obs >= 5:
        last_five = history[-5:]
        ups = np.sum(last_five > 0)
        downs = np.sum(last_five < 0)
        if downs > ups:
            count += 1
            strength += 0.2 * (downs - ups)

    # Volatility expansion versus the preceding five observations
    if n_obs >= 10:
        vol_now = np.std(history[-5:])
        vol_before = np.std(history[-10:-5])
        if vol_now > vol_before * 1.3:  # 30% volatility increase
            count += 1
            strength += 0.3 * (vol_now/vol_before - 1)

    # Reversal: a loss immediately after a gain above 0.3
    if n_obs >= 5 and history[-1] < 0 and history[-2] > 0.3:
        count += 1
        strength += 0.3

    return count, strength
def generate_positions(self, prediction_data, current_returns=None, algorithm=None):
    """Convert per-ticker return predictions into position sizes.

    Steps:
      1. Refresh ``self.current_regime`` / ``self.defensive_mode`` from
         ``current_returns`` (neutral / non-defensive when none are given).
      2. Group predictions by sector (``self.sector_mappings``), boosting
         predictions in sector ``'45'`` by 15% while the regime is bullish.
      3. Keep the top sectors by mean predicted return and the top stocks
         inside each of them.
      4. Size each kept stock from its predicted return, scale by the
         portfolio risk score, and drop sizes below the minimum.
      5. In defensive mode or a bearish regime, shrink every size slightly
         and drop the ones that fall below the minimum.

    Args:
        prediction_data: Mapping of ticker -> dict. Entries without a
            ``"pred_return"`` key are ignored.
        current_returns: Optional sequence (list, tuple or NumPy array) of
            recent returns used for regime detection and risk scoring.
        algorithm: Optional algorithm object exposing ``Log`` and ``Debug``.
            When provided it replaces ``self.algorithm``. Without one, all
            messages go to ``print``.

    Returns:
        dict mapping ticker -> final position size (presumably a fraction
        of portfolio value; confirm against the caller). Empty when there
        is no usable prediction.
    """
    if algorithm:
        self.algorithm = algorithm

    # Both channels fall back to print so the method stays usable without an
    # attached algorithm (the per-stock messages previously went straight to
    # self.algorithm.Debug and failed when it was None).
    if self.algorithm:
        log_func = self.algorithm.Log
        debug_func = self.algorithm.Debug
    else:
        log_func = debug_func = print

    log_func(f"--- generate_positions ---")
    if not prediction_data:
        log_func("generate_positions: No prediction data provided.")
        return {}

    # Use len() rather than truthiness: bool() of a multi-element NumPy
    # array raises ValueError.
    has_returns = current_returns is not None and len(current_returns) > 0

    # Update market regime
    if has_returns:
        self.current_regime = self.detect_market_regime(current_returns)
        bearish_count, bearish_strength = self.detect_bearish_signals(current_returns)
        self.defensive_mode = bearish_count >= 2 or bearish_strength > 0.5
    else:
        self.current_regime = "neutral"
        self.defensive_mode = False

    bullish_regimes = {"bullish_strong", "breakout_bullish", "bullish", "bullish_pullback"}
    is_bullish = self.current_regime in bullish_regimes
    TECH_SECTOR_IDENTIFIER = '45'
    tech_boost_factor = 1.15

    portfolio_risk_score = self.calculate_portfolio_risk_score(current_returns if has_returns else [])
    # Risk score is treated as a percentage, floored so positions are never
    # scaled below 60% of their base size.
    min_risk_scaling = 0.60
    risk_scaling = max(min_risk_scaling, portfolio_risk_score / 100)
    log_func(f"Regime: {self.current_regime}, Defensive Mode: {self.defensive_mode}")
    log_func(f"Portfolio Risk Score: {portfolio_risk_score}, Risk Scaling (min {min_risk_scaling}): {risk_scaling:.2f}")

    # The threshold is only reported here; it does not filter predictions.
    current_threshold = self.adaptive_threshold
    log_func(f"Using Threshold: {current_threshold}")

    # Group usable predictions by sector.
    sector_data = {}
    valid_predictions = 0
    for ticker, data in prediction_data.items():
        if "pred_return" not in data:
            continue
        pred_return = data["pred_return"]
        sector = self.sector_mappings.get(ticker, "Unknown")
        if is_bullish and sector == TECH_SECTOR_IDENTIFIER:
            pred_return *= tech_boost_factor
        sector_data.setdefault(sector, []).append({
            "ticker": ticker,
            "pred_return": pred_return,
        })
        valid_predictions += 1
    log_func(f"Found {valid_predictions} valid predictions.")
    if valid_predictions == 0:
        return {}

    # Rank sectors by mean predicted return; a higher risk score admits one
    # extra sector.
    sector_avg_scores = {
        sector: np.mean([s["pred_return"] for s in stocks])
        for sector, stocks in sector_data.items()
    }
    ranked_sectors = sorted(sector_avg_scores, key=sector_avg_scores.get, reverse=True)
    top_sector_count = 4 if portfolio_risk_score > 60 else 3
    top_sectors = ranked_sectors[:top_sector_count]
    log_func(f"Top Sectors Selected ({top_sector_count}): {top_sectors}")

    stocks_per_sector = 4 if self.current_regime in ["bullish_strong", "breakout_bullish"] else 3
    selected_stocks_for_positioning = []
    for sector in top_sectors:
        sector_stocks = sorted(sector_data[sector], key=lambda x: x["pred_return"], reverse=True)
        top_stocks_in_sector = sector_stocks[:stocks_per_sector]
        selected_stocks_for_positioning.extend(top_stocks_in_sector)
        log_func(f"Sector '{sector}': Top stocks {[s['ticker'] for s in top_stocks_in_sector]}")
    log_func(f"Selected {len(selected_stocks_for_positioning)} stocks across top sectors before size filtering.")

    # Sizing parameters (loop-invariant, so defined once).
    base_size_multiplier = 1.5
    max_base_size = 0.6
    min_base_size_threshold = 0.03
    min_final_size = 0.025

    positions = {}
    log_func(f"Calculating positions for selected stocks.")
    for stock in selected_stocks_for_positioning:
        ticker = stock["ticker"]
        signal_strength = stock["pred_return"]
        # Clamp to [0.01, max_base_size]; non-positive signals land on 0.01
        # and are rejected by the threshold below.
        base_size = min(max_base_size, max(0.01, base_size_multiplier * signal_strength))
        if base_size <= min_base_size_threshold:
            debug_func(f" Ticker: {ticker}, Base Size ({base_size:.3f}) too small or negative (Threshold: {min_base_size_threshold}), skipping.")
            continue
        final_size = base_size * risk_scaling
        if final_size >= min_final_size:
            positions[ticker] = final_size
            debug_func(f" Ticker: {ticker}, Signal: {signal_strength:.3f}, Base Size: {base_size:.3f}, Final Size: {final_size:.3f}")
        else:
            debug_func(f" Ticker: {ticker}, Final Size ({final_size:.3f}) too small after risk scaling (Min: {min_final_size}), skipping.")

    # Defensive adjustments
    if self.defensive_mode or self.current_regime in ["bearish_high_vol", "bearish_low_vol", "breakdown_bearish"]:
        scaling_factor = 0.95 if self.defensive_mode else 0.99
        log_func(f"Defensive Adjustment: Scaling positions by {scaling_factor}")
        min_final_size_after_scale = 0.025
        # Iterate over a snapshot of the keys because entries may be deleted.
        for ticker in list(positions):
            positions[ticker] *= scaling_factor
            if positions[ticker] < min_final_size_after_scale:
                log_func(f" Removing {ticker} due to small size ({positions[ticker]:.4f}) after defensive scaling (Min: {min_final_size_after_scale}).")
                del positions[ticker]

    # Hedges remain disabled for now
    log_func(f"Final positions generated ({len(positions)}): {positions}")
    log_func(f"--- generate_positions END ---")
    return positions
def get_stop_loss_level(self):
    """Return the stop-loss level for the current regime.

    Bearish high-volatility and breakdown regimes always use -1.5.
    Strongly bullish and breakout regimes use -3.5, tightened to -2.0 in
    defensive mode. Every other regime uses -2.5, tightened to -1.8 in
    defensive mode.
    """
    regime = self.current_regime

    # Bearish regimes get the tightest stop regardless of defensive mode.
    if regime in ("bearish_high_vol", "breakdown_bearish"):
        return -1.5

    # Strong bullish regimes leave the most room unless we are defensive.
    if regime in ("bullish_strong", "breakout_bullish"):
        return -2.0 if self.defensive_mode else -3.5

    return -1.8 if self.defensive_mode else -2.5
def update_portfolio_returns(self, daily_return):
    """Record one more daily return in the portfolio return history.

    Args:
        daily_return: The value to append to ``self.portfolio_returns``.
    """
    history = self.portfolio_returns
    # No trimming here: the history is presumably a deque created with a
    # maxlen, which discards its oldest entry on its own (confirm where the
    # attribute is initialised).
    history.append(daily_return)