| Overall Statistics |
|
Total Orders 557 Average Win 0.95% Average Loss -0.17% Compounding Annual Return 22.952% Drawdown 13.100% Expectancy 2.386 Start Equity 100000 End Equity 330163.65 Net Profit 230.164% Sharpe Ratio 1.212 Sortino Ratio 1.267 Probabilistic Sharpe Ratio 83.675% Loss Rate 49% Win Rate 51% Profit-Loss Ratio 5.68 Alpha 0.089 Beta 0.422 Annual Standard Deviation 0.111 Annual Variance 0.012 Information Ratio 0.216 Tracking Error 0.129 Treynor Ratio 0.319 Total Fees $625.65 Estimated Strategy Capacity $0 Lowest Capacity Asset COST R735QTJ8XC9X Portfolio Turnover 1.03% |
import numpy as np
from AlgorithmImports import *
class AssetWeightCalculator:
def __init__(self, algorithm: QCAlgorithm):
self.algorithm = algorithm
self.risk_free = self.algorithm.add_equity("BIL", Resolution.HOUR)
def coarse_selection(self, coarse):
"""
Selects stonks, first filter
"""
# Sorts by dollar volume before taking top 200
sorted_by_volume = sorted([x for x in coarse if x.price > 10 and x.has_fundamental_data],
key=lambda x: x.dollar_volume,
reverse=True)
return [x.symbol for x in sorted_by_volume][:200]
def fine_selection(self, fine):
"""
Selects stonks, second filter
"""
filtered = [x.symbol for x in fine if x.market_cap is not None and x.market_cap > 10e9]
self.algorithm.debug(f"Fine Selection: {len(filtered)} symbols passed filters")
# Doing it this way makes it so that stocks are ranked on each universe update and then the macds can be redone with the scheduler in main
ranked_symbols = self.rank_stocks(filtered)
return ranked_symbols
def calculate_sharpe_ratio(self, symbol, period=4914): # This is 3 yrs worth of trading days
"""
Calculates the sharpe
"""
try:
# If a KeyValuePair was recieved only take the symbol
if hasattr(symbol, "Key"):
symbol = symbol.Key
history = self.algorithm.history([symbol], period, Resolution.HOUR)
if history.empty:
self.algorithm.debug(f"No history for {symbol.value}")
return None
# Get risk-free rate
rf_history = self.algorithm.history(self.risk_free.symbol, 1, Resolution.HOUR)
risk_free_rate = rf_history['close'].iloc[-1]/100 if not rf_history.empty else 0.02 # Default to 2% if no data
# Sharpe ratio logic
returns = history['close'].pct_change().dropna()
excess_returns = returns - (risk_free_rate/1638)
mean_excess_return = excess_returns.mean() * 1638
std_dev = excess_returns.std() * np.sqrt(1638)
return mean_excess_return / std_dev if std_dev != 0 else None
except Exception as e:
self.algorithm.debug(f"Error calculating Sharpe for {symbol.value}: {str(e)}")
return None
def rank_stocks(self, symbols):
"""
Ranks da top 50 stocks based on sharpe
"""
if not symbols:
self.algorithm.debug("No symbols to rank")
return []
self.algorithm.debug(f"Ranking {len(symbols)} symbols")
# Converting from key pair if neccessary
symbols = [s.Key if hasattr(s, 'Key') else s for s in symbols]
scores = {symbol: self.calculate_sharpe_ratio(symbol) for symbol in symbols}
valid_scores = {k: v for k, v in scores.items() if v is not None}
self.algorithm.debug(f"Valid Sharpe ratios: {len(valid_scores)} out of {len(symbols)}")
if not valid_scores:
return []
sorted_scores = sorted(valid_scores, key=valid_scores.get, reverse=True)[:20]
self.algorithm.log(f"All symbols before ranking: {[s.value for s in symbols]}")
self.algorithm.log(f"Symbols after filtering: {[s.value for s in valid_scores.keys()]}")
return sorted_scores
def normalize_scores(self, scores):
"""
The list of scores from the ranking method are
normalized using a z score so that an additive
operation may be used in WeightCombiner()
"""
values = np.array(list(scores.values()))
mean = np.mean(values)
std_dev = np.std(values)
if std_dev == 0:
# If no variation in scores, assign equal normalized scores
return {symbol: 0 for symbol in scores.keys()}
normalized_scores = {symbol: (score - mean) / std_dev for symbol, score in scores.items()}
print(normalized_scores) #To see output for debugging
return normalized_scores
from AlgorithmImports import *
class MACDSignalGenerator:
def __init__(self, algorithm: QCAlgorithm, symbols: list, cash_buffer: float = 0.05):
self.algorithm = algorithm
self.symbols = symbols
self.cash_buffer = cash_buffer
self.macd_indicators = {} # {symbol: {variant: MACD}}
# Define MACD parameters for different variants
self.macd_variants = {
"slow": {"fast": 12, "slow": 26, "signal": 9},
"slow-med": {"fast": 9, "slow": 19, "signal": 5},
"med-fast": {"fast": 7, "slow": 15, "signal": 3},
"fast": {"fast": 5, "slow": 12, "signal": 2},
}
def remove_symbols(self, symbols: list):
"""
Removes MACD indicators for the specified symbols.
"""
for symbol in symbols:
# Liquidate position before removing indicator
self.algorithm.liquidate(symbol)
# Unregister and delete indicators tied to each symbol
if symbol in self.macd_indicators:
for macd in self.macd_indicators[symbol].values(): # Better: gets MACD objects directly
self.algorithm.unregister_indicator(macd)
del self.macd_indicators[symbol]
def add_symbols(self, new_symbols):
"""
Add in the new symbols that are given by AssetWeightCalculator.
"""
# Log initial attempt
self.algorithm.debug(f"Attempting to add symbols: {[s.value for s in new_symbols]}")
# Get historical data for new symbols
history = self.algorithm.history([s for s in new_symbols],
35, # Longest MACD period needed
Resolution.HOUR)
# Log history data availability
self.algorithm.debug(f"History data available for: {history.index.get_level_values(0).unique()}")
self.symbols.extend(new_symbols)
for symbol in new_symbols:
security = self.algorithm.securities[symbol]
# Detailed security check logging
# self.algorithm.debug(f"Security {symbol.value} check:"
# f" has_data={security.has_data},"
# f" is_tradable={security.is_tradable},"
# f" price={security.price}")
# Checking if price is 0
if not (security.has_data and security.is_tradable and security.price > 0):
self.algorithm.debug(f"Waiting for valid price data: {symbol.value}")
continue
# Adding the symbol
if symbol not in self.macd_indicators:
self.macd_indicators[symbol] = {}
# Get symbol's historical data
if symbol not in history.index.get_level_values(0):
self.algorithm.debug(f"No history data for: {symbol.value}")
continue
symbol_history = history.loc[symbol]
self.algorithm.debug(f"History rows for {symbol.value}: {len(symbol_history)}")
for variant, params in self.macd_variants.items():
macd = self.algorithm.macd(
symbol=symbol,
fast_period=params["fast"],
slow_period=params["slow"],
signal_period=params["signal"],
type=MovingAverageType.EXPONENTIAL,
resolution=Resolution.HOUR,
selector=Field.CLOSE
)
self.macd_indicators[symbol][variant] = macd
# Warm up MACD with historical data
for time, row in symbol_history.iterrows():
macd.update(time, row['close'])
self.macd_indicators[symbol][variant] = macd
def calculate_position_sizes(self):
position_sizes = {}
max_position_limit = 0.1
# Check if we have any symbols to process
if not self.symbols or not self.macd_indicators:
self.algorithm.debug("No symbols available for position calculation")
return position_sizes
# Calculating the maximum one variant can be in size
max_position = (1 - self.cash_buffer) / (len(self.symbols) * len(self.macd_variants))
for symbol in self.macd_indicators:
position_sizes[symbol] = {}
for variant, macd in self.macd_indicators[symbol].items():
if macd.is_ready:
security = self.algorithm.securities[symbol]
# Detailed security check logging
# self.algorithm.debug(f"Position Check for {symbol.value}:"
# f" has_data={security.has_data},"
# f" is_tradable={security.is_tradable},"
# f" price={security.price},"
# f" last_data={security.get_last_data() is not None},")
# More comprehensive check
# if not (security.has_data and
# security.is_tradable and
# security.price > 0 and
# security.get_last_data() is not None):
# self.algorithm.debug(f"Security not ready: {symbol.value}")
# continue
# Distance between fast and slow
distance = macd.fast.current.value - macd.slow.current.value
# Normalize the distance as a percentage difference and then as a fraction of max position
position_size = max_position * (distance / macd.slow.current.value) * 70 # Scalar value of max_position, the scalar integer can be though of as a form of leverage setting
# Only allow positive positions, cap at maximum
position_size = max(0, min(position_size, max_position_limit))
position_sizes[symbol][variant] = position_size
#self.algorithm.debug(f"Calculated position for {symbol.value} {variant}: {position_size}")
else:
position_sizes[symbol][variant] = 0
# Running daily cause the logging is too heavy hourly
if self.algorithm.time.hour == 10 and self.algorithm.time.minute == 0:
rounded_positions = [(s.value, {k: round(v, 5) for k, v in sizes.items()}) for s, sizes in position_sizes.items()]
#self.algorithm.debug(f"Daily position sizes proposed: {rounded_positions}")
return position_sizesfrom AlgorithmImports import *
import numpy as np
from datetime import timedelta
class MarketCapWeightedSP500Tracker(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2019, 1, 1)
self.SetEndDate(2025, 1, 1)
self.SetCash(100000)
self.UniverseSettings.Resolution = Resolution.Daily
self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol
self.bil = self.AddEquity("BIL", Resolution.Daily).Symbol
self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
self.selected_by_market_cap = []
self.rebalance_flag = False
self.spy_30day_window = RollingWindow[float](30)
self.entry_prices = {}
self.previous_bil_allocation = 0.0
self.Schedule.On(self.DateRules.MonthStart(self.spy),
self.TimeRules.AfterMarketOpen(self.spy, 30),
self.SetRebalanceFlag)
self.Schedule.On(self.DateRules.WeekStart(self.spy, DayOfWeek.Wednesday),
self.TimeRules.AfterMarketOpen(self.spy, 30),
self.MonthlyRebalance)
# Initialize rolling window with historical data
history = self.History(self.spy, 30, Resolution.Daily)
if not history.empty:
for time, row in history.loc[self.spy].iterrows():
self.spy_30day_window.Add(row["close"])
# Add simple tracking of market trend
self.trend_lookback = 10
self.spy_prices = {}
self.max_spy_history = 60 # Days of price history to keep
# Add dynamic stop-loss enhancement
self.stop_loss_base = 0.04 # Reduced base stop-loss threshold
self.dynamic_stop_weight = 0.5 # Blend 50% ATR signal with base threshold
# Expanded list of inverse and defensive ETFs
# Original inverse ETFs
self.sh = self.AddEquity("SH", Resolution.Daily).Symbol # Inverse S&P 500
self.psq = self.AddEquity("PSQ", Resolution.Daily).Symbol # Inverse Nasdaq-100
self.dog = self.AddEquity("DOG", Resolution.Daily).Symbol # Inverse Dow Jones
self.rwm = self.AddEquity("RWM", Resolution.Daily).Symbol # Inverse Russell 2000
self.eum = self.AddEquity("EUM", Resolution.Daily).Symbol # Inverse Emerging Markets
self.myd = self.AddEquity("MYY", Resolution.Daily).Symbol # Inverse Mid-Cap 400
# Alternative defensive ETFs (not inverse but potentially good in downturns)
self.gld = self.AddEquity("GLD", Resolution.Daily).Symbol # Gold
self.ief = self.AddEquity("IEF", Resolution.Daily).Symbol # 7-10 Year Treasury
self.bnd = self.AddEquity("BND", Resolution.Daily).Symbol # Total Bond Market
# Sector-based defensive ETFs (often outperform in bear markets)
self.xlp = self.AddEquity("XLP", Resolution.Daily).Symbol # Consumer Staples
self.xlu = self.AddEquity("XLU", Resolution.Daily).Symbol # Utilities
self.xlv = self.AddEquity("XLV", Resolution.Daily).Symbol # Healthcare
self.vht = self.AddEquity("VHT", Resolution.Daily).Symbol # Vanguard Healthcare
self.vdc = self.AddEquity("VDC", Resolution.Daily).Symbol # Vanguard Consumer Staples
# Group all defensive ETFs together
self.inverse_etfs = [self.sh, self.psq, self.dog, self.rwm, self.eum, self.myd]
self.alternative_defensive = [self.gld, self.ief, self.bnd]
self.sector_defensive = [self.xlp, self.xlu, self.xlv, self.vht, self.vdc]
self.all_defensive = self.inverse_etfs + self.alternative_defensive + self.sector_defensive
# Add diagnostic logging capability
self.diagnostic_mode = True # Enable detailed diagnostics
# Initialize positions tracking and add weekly tactical adjustment
self.defensive_positions = set()
self.last_defensive_update = datetime(1900, 1, 1)
# Add weekly defensive ETF evaluation schedule
self.Schedule.On(self.DateRules.WeekStart(self.spy, DayOfWeek.Monday),
self.TimeRules.AfterMarketOpen(self.spy, 60), # After main rebalance
self.WeeklyDefensiveAdjustment)
# Initialize positions tracking
self.inverse_positions = set()
# Add inverse ETF lookback windows for better momentum calculation
self.inverse_lookback_short = 7 # 1 week momentum window
self.inverse_lookback_med = 15 # Medium-term momentum
# Add ATR indicators for enhanced volatility-based stop-loss calculation
self.atr_period = 14
self.atr = {}
# Register ATR for key symbols (defensive ETFs, BIL, and SPY)
for symbol in self.all_defensive + [self.bil, self.spy]:
self.atr[symbol] = self.ATR(symbol, self.atr_period, Resolution.Daily)
def CoarseSelectionFunction(self, coarse):
filtered = [x for x in coarse if x.HasFundamentalData
and x.Price > 5
and x.Market == Market.USA]
return [x.Symbol for x in filtered]
def FineSelectionFunction(self, fine):
filtered = [x for x in fine if x.MarketCap > 1e10
and x.SecurityReference.SecurityType == "ST00000001"]
sorted_by_cap = sorted(filtered, key=lambda x: x.MarketCap, reverse=True)[:30]
self.selected_by_market_cap = [(x.Symbol, x.MarketCap) for x in sorted_by_cap]
return [x.Symbol for x in sorted_by_cap]
def SetRebalanceFlag(self):
if self.Time.weekday() == 2: # Wednesday
self.rebalance_flag = True
def OnData(self, data):
# Update price window
if not data.Bars.ContainsKey(self.spy): return
self.spy_30day_window.Add(data.Bars[self.spy].Close)
# Track prices for trend calculation
self.spy_prices[self.Time.date()] = data.Bars[self.spy].Close
# Remove old prices
dates_to_remove = []
for date in self.spy_prices.keys():
if (self.Time.date() - date).days > self.max_spy_history:
dates_to_remove.append(date)
for date in dates_to_remove:
self.spy_prices.pop(date)
market_trend = self._calculateMarketTrend()
# Track if any stop-loss was triggered
stop_loss_triggered = False
# Check stop-loss triggers with improved dynamic thresholds
for kvp in self.Portfolio:
symbol = kvp.Key
holding = kvp.Value
if holding.Invested and symbol != self.bil:
current_price = self.Securities[symbol].Price
if symbol not in self.entry_prices:
self.entry_prices[symbol] = current_price
price_drop = (self.entry_prices[symbol] - current_price) / self.entry_prices[symbol]
# Start with the base threshold and adjust based on market trend
stop_threshold = self.stop_loss_base
if market_trend < -0.03:
stop_threshold *= 0.9 # tighten in downtrends
elif market_trend > 0.03:
stop_threshold *= 1.1 # loosen in uptrends
# Incorporate ATR if ready with adjustment to prevent overreaction in high volatility
if symbol in self.atr and self.atr[symbol].IsReady:
current_atr = self.atr[symbol].Current.Value
atr_pct = current_atr / current_price
# If ATR is excessively high versus our base, use a lower weight to temper the effect
effective_weight = self.dynamic_stop_weight
if atr_pct > stop_threshold * 1.2:
effective_weight = min(self.dynamic_stop_weight, 0.3)
stop_threshold = ((1 - effective_weight) * stop_threshold +
effective_weight * atr_pct)
if price_drop >= stop_threshold:
self.Liquidate(symbol)
stop_loss_triggered = True
self.Debug(f"Stop-loss triggered for {symbol} at {current_price}, drop: {price_drop*100:.1f}%, threshold: {stop_threshold*100:.1f}%")
# If any stop-loss was triggered, invest all available cash in BIL
if stop_loss_triggered:
available_cash = self.Portfolio.Cash + self.Portfolio.UnsettledCash
if available_cash > 0:
bil_price = self.Securities[self.bil].Price
bil_quantity = available_cash / bil_price
self.MarketOrder(self.bil, bil_quantity)
self.Debug(f"Invested ${available_cash:0.2f} in BIL after stop-loss")
def WeeklyDefensiveAdjustment(self):
"""Weekly check and adjustment for defensive ETF positions"""
# Skip if we've done the monthly rebalance recently
days_since_rebalance = (self.Time.date() - self.last_rebalance_date.date()).days if hasattr(self, 'last_rebalance_date') else 999
if days_since_rebalance < 3:
return
# Skip if we've updated defensive positions recently
days_since_update = (self.Time.date() - self.last_defensive_update.date()).days
if days_since_update < 5: # At most once a week
return
# Calculate current market conditions
spy_price = self.Securities[self.spy].Price
sma_30 = sum(self.spy_30day_window) / self.spy_30day_window.Count if self.spy_30day_window.Count > 0 else spy_price
market_deviation = (spy_price / sma_30) - 1.0
market_trend = self._calculateMarketTrend()
# Skip in strong bull markets
if market_deviation > 0.04 and market_trend > 0.03:
return
# Calculate total invested amount including all positions
total_invested = sum(holding.HoldingsValue for holding in self.Portfolio.Values
if holding.Invested) / self.Portfolio.TotalPortfolioValue
# If we're already fully invested, can't add more defensive positions
if total_invested >= 0.98: # Allow small buffer for rounding errors
self.Debug(f"Already fully invested ({total_invested:.2f}), skipping defensive adjustments")
return
# Calculate available room for defensive positions
available_allocation = max(0, 0.99 - total_invested) # Keep tiny buffer
# Calculate how much is currently allocated to defensive positions
current_defensive_value = sum(self.Portfolio[s].HoldingsValue
for s in self.defensive_positions
if self.Portfolio.ContainsKey(s) and self.Portfolio[s].Invested)
# Calculate current BIL allocation
current_bil_value = self.Portfolio[self.bil].HoldingsValue if self.Portfolio[self.bil].Invested else 0
bil_allocation = current_bil_value / self.Portfolio.TotalPortfolioValue
# Limit potential allocation to available room
max_defensive_pct = min(0.25, available_allocation / bil_allocation if bil_allocation > 0 else 0)
potential_allocation = bil_allocation * max_defensive_pct
# Make sure we don't exceed available room
potential_allocation = min(potential_allocation, available_allocation)
# Super detailed diagnostics for current defensive positions
if self.diagnostic_mode and self.defensive_positions:
self.Debug(f"WEEKLY CHECK - Current defensive positions:")
for symbol in self.defensive_positions:
if self.Portfolio.ContainsKey(symbol) and self.Portfolio[symbol].Invested:
position = self.Portfolio[symbol]
entry = self.entry_prices.get(symbol, position.AveragePrice)
current = self.Securities[symbol].Price
pnl_pct = (current / entry) - 1 if entry > 0 else 0
self.Debug(f" {symbol}: PnL {pnl_pct*100:.2f}%, Value ${position.HoldingsValue:.2f}")
# Evaluate current defensive positions and potential new ones
self.Debug(f"WEEKLY CHECK - Market: Dev {market_deviation*100:.2f}%, Trend {market_trend*100:.2f}%")
self.Debug(f"BIL allocation: {bil_allocation*100:.2f}%, Potential defensive: {potential_allocation*100:.2f}%")
# Run the defensive ETF evaluation
new_allocations = self._evaluateDefensiveETFs(market_deviation, market_trend, potential_allocation)
# Calculate which positions to add, modify, or remove
positions_to_add = {}
positions_to_remove = set()
# Process existing positions
for symbol in self.defensive_positions:
# If position should be kept but maybe at different allocation
if symbol in new_allocations and new_allocations[symbol] > 0:
current_pct = self.Portfolio[symbol].HoldingsValue / self.Portfolio.TotalPortfolioValue if self.Portfolio.ContainsKey(symbol) else 0
target_pct = new_allocations[symbol]
# If allocation difference is significant, adjust position
if abs(target_pct - current_pct) > 0.01:
positions_to_add[symbol] = target_pct
# Remove from new allocations dict to avoid double-processing
new_allocations.pop(symbol)
else:
# Position should be removed
positions_to_remove.add(symbol)
# Add any remaining new positions
for symbol, allocation in new_allocations.items():
if allocation > 0.01: # Minimum meaningful allocation
positions_to_add[symbol] = allocation
# Check if we'll exceed our allocation limits with new positions
total_new_allocation = sum(positions_to_add.values())
if total_new_allocation > available_allocation:
# Scale back allocations to fit available space
scale_factor = available_allocation / total_new_allocation
for symbol in positions_to_add:
positions_to_add[symbol] *= scale_factor
self.Debug(f"Scaled defensive allocations to fit available space: {scale_factor:.4f}")
# Execute trades if needed
if positions_to_add or positions_to_remove:
self.Debug(f"WEEKLY ADJUSTMENT - Making defensive position changes")
# Remove positions no longer needed
for symbol in positions_to_remove:
self.Liquidate(symbol)
self.defensive_positions.remove(symbol)
self.Debug(f"Removed defensive position: {symbol}")
# Add or adjust positions
for symbol, allocation in positions_to_add.items():
self.SetHoldings(symbol, allocation)
self.defensive_positions.add(symbol)
self.entry_prices[symbol] = self.Securities[symbol].Price
self.Debug(f"Updated defensive position: {symbol} to {allocation*100:.2f}%")
self.last_defensive_update = self.Time
def MonthlyRebalance(self):
if not self.rebalance_flag: return
self.rebalance_flag = False
self.entry_prices.clear() # Reset entry prices at rebalance
if self.spy_30day_window.Count < 30:
self.Debug("Waiting for enough SPY history.")
return
spy_price = self.Securities[self.spy].Price
sma_30 = sum(self.spy_30day_window) / 30
# Calculate market deviation for better decisions
market_deviation = (spy_price / sma_30) - 1.0
market_trend = self._calculateMarketTrend()
# Enhanced BIL allocation logic with lower caps
bil_weight = 0.0
if spy_price < sma_30:
# Enhanced formula for better downside protection
base_weight = (sma_30 - spy_price) / sma_30
if base_weight > 0.08: # Significant drop
# Lower cap on BIL for significant drops
bil_weight = min(base_weight * 1.1, 0.7) # Cap at 70% (was 90%)
else:
bil_weight = min(base_weight, 0.6) # Cap at 60% (was 80%)
# Enhanced reduction rule for better returns in bull markets
if market_deviation > 0.05: # Strong bull market
min_bil_allocation = self.previous_bil_allocation * 0.7 # 30% reduction
elif market_deviation > 0.02: # Modest bull market
min_bil_allocation = self.previous_bil_allocation * 0.75 # 25% reduction
else:
min_bil_allocation = self.previous_bil_allocation * 0.8 # Standard 20% reduction
bil_weight = max(bil_weight, min_bil_allocation)
# Lower caps on BIL in all market conditions
if market_deviation > 0.08: # Very strong bull
bil_weight = min(bil_weight, 0.15) # Cap at 15% (was 20%)
elif market_deviation > 0.05: # Strong bull
bil_weight = min(bil_weight, 0.25) # Cap at 25% (was 30%)
elif market_deviation > 0.0: # Mild bull
bil_weight = min(bil_weight, 0.4) # Cap at 40% (new tier)
elif market_deviation > -0.03: # Neutral
bil_weight = min(bil_weight, 0.5) # Cap at 50% (new tier)
else: # Bear
bil_weight = min(bil_weight, 0.6) # Cap at 60% (new tier)
# Calculate how much of the original BIL allocation to potentially use for inverse ETFs
original_bil = bil_weight
# Use only a portion of BIL for inverse ETFs, keeping some as BIL
inverse_etf_potential = original_bil * 0.4 # Use 40% of BIL allocation for inverse ETFs
bil_weight = original_bil - inverse_etf_potential
# Run diagnostics on defensive ETFs
if self.diagnostic_mode:
self._runDefensiveETFDiagnostics(market_deviation, market_trend)
# Evaluate inverse ETFs for possible allocation
inverse_allocations = self._evaluateInverseETFs(market_deviation, market_trend, inverse_etf_potential)
# Include alternative defensive ETFs in evaluation
all_defensive_allocations = self._evaluateDefensiveETFs(market_deviation, market_trend, inverse_etf_potential)
# Calculate total allocation to defensive ETFs
total_defensive_allocation = sum(all_defensive_allocations.values())
# Set aside remainder as cash (won't be allocated)
cash_reserve = inverse_etf_potential - total_defensive_allocation
# Calculate weight for equity portion
equity_weight = 1.0 - total_defensive_allocation
# Ensure total allocation never exceeds 100%
total_allocation = bil_weight + total_defensive_allocation + equity_weight
if total_allocation > 1.0:
# Scale back components proportionally
scale_factor = 1.0 / total_allocation
bil_weight *= scale_factor
equity_weight *= scale_factor
# Scale each defensive allocation
for symbol in all_defensive_allocations:
all_defensive_allocations[symbol] *= scale_factor
total_defensive_allocation = sum(all_defensive_allocations.values())
self.Debug(f"Scaled allocations to prevent leverage: {scale_factor:.4f}")
self.Debug(f"Allocation breakdown: Equity {equity_weight*100:.1f}%, BIL {bil_weight*100:.1f}%, " +
f"Defensive ETFs {total_defensive_allocation*100:.1f}%, Cash {cash_reserve*100:.1f}%")
# Enhance stock selection with simple momentum filter
momentum_scores = self._calculateSimpleMomentum()
# Filter out worst momentum stocks
filtered_stocks = []
for symbol, mcap in self.selected_by_market_cap:
score = momentum_scores.get(symbol, 1.0)
if score >= 0.9: # Keep only neutral or positive momentum stocks
filtered_stocks.append((symbol, mcap))
# If we filtered too many, revert to original list
if len(filtered_stocks) < 20:
filtered_stocks = self.selected_by_market_cap
# Calculate weights using the filtered stocks
total_market_cap = sum([x[1] for x in filtered_stocks])
weights = {x[0]: (x[1] / total_market_cap) * equity_weight for x in filtered_stocks}
invested = set()
for symbol, weight in weights.items():
if weight > 0:
self.SetHoldings(symbol, weight)
invested.add(symbol)
self.entry_prices[symbol] = self.Securities[symbol].Price
# Set BIL position
if bil_weight > 0:
self.SetHoldings(self.bil, bil_weight)
invested.add(self.bil)
else:
self.Liquidate(self.bil)
# Set defensive ETF positions
for symbol, weight in all_defensive_allocations.items():
if weight > 0:
self.SetHoldings(symbol, weight)
invested.add(symbol)
self.defensive_positions.add(symbol) # Using renamed set
self.entry_prices[symbol] = self.Securities[symbol].Price
self.Debug(f"Allocated {weight*100:.2f}% to defensive ETF {symbol}")
elif symbol in self.defensive_positions:
self.Liquidate(symbol)
self.defensive_positions.remove(symbol)
# Update last rebalance date tracker
self.last_rebalance_date = self.Time
# Store current BIL allocation for next month's minimum
self.previous_bil_allocation = self.Portfolio[self.bil].HoldingsValue / self.Portfolio.TotalPortfolioValue
self.Debug(f"New BIL allocation: {bil_weight*100:0.2f}% (Minimum was {min_bil_allocation*100:0.2f}%)")
# Liquidate positions not in current selection
for kvp in self.Portfolio:
symbol = kvp.Key
if (kvp.Value.Invested and symbol not in invested
and symbol != self.spy and symbol not in self.defensive_positions):
self.Liquidate(symbol)
def _calculateMarketTrend(self):
"""Calculate recent market trend using price history"""
if len(self.spy_prices) < self.trend_lookback + 1:
return 0 # Not enough data
dates = sorted(self.spy_prices.keys())
if len(dates) <= self.trend_lookback:
return 0
recent_price = self.spy_prices[dates[-1]]
older_price = self.spy_prices[dates[-self.trend_lookback]]
return (recent_price / older_price) - 1.0
def _calculateSimpleMomentum(self):
"""Calculate simple momentum scores for stock filtering"""
momentum_scores = {}
symbols = [sym for sym, _ in self.selected_by_market_cap]
if not symbols:
return momentum_scores
# Get 30 days of history for all stocks
history = self.History(symbols, 30, Resolution.Daily)
if history.empty:
return momentum_scores
# Calculate simple momentum (30-day price change)
for symbol in symbols:
if symbol in history.index.get_level_values(0):
prices = history.loc[symbol]['close']
if len(prices) >= 30:
# 30-day momentum
mom = prices.iloc[-1] / prices.iloc[0] - 1
# Convert to a score between 0.7 and 1.3
# Center around 1.0, with range based on 15% move
momentum_scores[symbol] = min(1.3, max(0.7, 1 + (mom * 2)))
return momentum_scores
def _evaluateInverseETFs(self, market_deviation, market_trend, max_allocation):
"""Enhanced evaluation of inverse ETFs with more sensitive criteria"""
allocations = {symbol: 0 for symbol in self.inverse_etfs}
# More permissive consideration of inverse ETFs
if market_deviation > 0.04 and market_trend > 0.02:
return allocations # Only skip in very strong bull markets
# Get more history for better momentum calculation
history = self.History(self.inverse_etfs, 45, Resolution.Daily)
if history.empty:
return allocations
# Enhanced momentum scoring
momentum_scores = {}
volatility_scores = {}
for symbol in self.inverse_etfs:
if symbol in history.index.get_level_values(0):
prices = history.loc[symbol]['close']
if len(prices) >= 30:
# Multiple timeframe momentum - more emphasis on recent performance
mom_7d = prices.iloc[-1] / prices.iloc[-7] - 1 if len(prices) >= 7 else 0
mom_15d = prices.iloc[-1] / prices.iloc[-15] - 1 if len(prices) >= 15 else 0
mom_30d = prices.iloc[-1] / prices.iloc[0] - 1
# Weight recent momentum much more heavily
momentum = (mom_7d * 0.5) + (mom_15d * 0.3) + (mom_30d * 0.2)
# Calculate volatility (lower is better for inverse ETFs)
returns = [prices.iloc[i+1]/prices.iloc[i]-1 for i in range(min(20, len(prices)-1))]
volatility = np.std(returns) if returns else 0
# Calculate short-term rate of change (acceleration)
if len(prices) >= 10:
recent_5d_change = prices.iloc[-1] / prices.iloc[-5] - 1
prev_5d_change = prices.iloc[-6] / prices.iloc[-10] - 1
acceleration = recent_5d_change - prev_5d_change
else:
acceleration = 0
# Momentum score adds weight for accelerating performance
momentum_scores[symbol] = momentum + (acceleration * 0.5)
volatility_scores[symbol] = volatility
# More aggressive filtering - consider even small positive momentum
positive_momentum_etfs = {s: score for s, score in momentum_scores.items() if score > -0.005}
# No allocation if no ETFs have at least neutral momentum
if not positive_momentum_etfs:
self.Debug("No inverse ETFs showing acceptable momentum - keeping as cash")
return allocations
# Enhanced selection: favor momentum but consider volatility too
best_candidates = []
for symbol, score in positive_momentum_etfs.items():
volatility = volatility_scores.get(symbol, 1.0)
# Adjust score: higher momentum is good, lower volatility is good
adjusted_score = score - (volatility * 0.5)
best_candidates.append((symbol, score, adjusted_score))
# Sort by adjusted score
best_candidates.sort(key=lambda x: x[2], reverse=True)
# More aggressive allocation model
allocation_pct = 0.0
# Allocate based on market conditions with more sensitivity
if market_deviation < -0.05:
allocation_pct = 1.0 # Use 100% of available inverse allocation
elif market_deviation < -0.03:
allocation_pct = 0.8 # Use 80% of available inverse allocation
elif market_deviation < -0.01:
allocation_pct = 0.6 # Use 60% of available inverse allocation
elif market_deviation < 0.01: # Even in slight bull market if momentum is positive
allocation_pct = 0.4 # Use 40% of available inverse allocation
else:
allocation_pct = 0.2 # Use 20% only if momentum is strong enough
# No candidates or market conditions don't justify allocation
if not best_candidates or allocation_pct < 0.1:
return allocations
# Take top 1-2 ETFs depending on market conditions
num_etfs = 1
if market_deviation < -0.04 and len(best_candidates) > 1:
num_etfs = 2 # Use two ETFs in stronger downtrends
# Allocate to best ETF(s)
remaining_allocation = max_allocation * allocation_pct
for i in range(min(num_etfs, len(best_candidates))):
symbol, raw_score, _ = best_candidates[i]
# Allocate proportionally to momentum strength, with a minimum threshold
etf_weight = min(1.0, max(0.3, raw_score * 3)) if raw_score > 0 else 0.3
# Calculate allocation for this ETF
etf_allocation = remaining_allocation * etf_weight / num_etfs
# Only allocate if it's a meaningful amount
if etf_allocation >= 0.01: # At least 1% allocation
allocations[symbol] = etf_allocation
self.Debug(f"Selected inverse ETF {symbol} with momentum {raw_score:.2%}, allocating {etf_allocation*100:.2f}%")
return allocations
def _runDefensiveETFDiagnostics(self, market_deviation, market_trend):
"""Run detailed diagnostics on all defensive ETFs"""
# Get extensive history for analysis
history = self.History(self.all_defensive + [self.spy], 90, Resolution.Daily)
if history.empty:
return
spy_perf = {}
if self.spy in history.index.get_level_values(0):
spy_prices = history.loc[self.spy]['close']
if len(spy_prices) >= 30:
spy_perf = {
"7d": spy_prices.iloc[-1] / spy_prices.iloc[-7] - 1 if len(spy_prices) >= 7 else 0,
"15d": spy_prices.iloc[-1] / spy_prices.iloc[-15] - 1 if len(spy_prices) >= 15 else 0,
"30d": spy_prices.iloc[-1] / spy_prices.iloc[-30] - 1
}
# Log market conditions
self.Debug(f"DIAGNOSTIC - Market: Deviation {market_deviation*100:.2f}%, " +
f"Trend {market_trend*100:.2f}%, SPY 30d: {spy_perf.get('30d', 0)*100:.2f}%")
# Analyze each ETF
for symbol in self.all_defensive:
if symbol in history.index.get_level_values(0):
prices = history.loc[symbol]['close']
if len(prices) >= 30:
# Calculate multiple timeframe performance
perf_7d = prices.iloc[-1] / prices.iloc[-7] - 1 if len(prices) >= 7 else 0
perf_15d = prices.iloc[-1] / prices.iloc[-15] - 1 if len(prices) >= 15 else 0
perf_30d = prices.iloc[-1] / prices.iloc[-30] - 1
# Calculate recent acceleration
recent_5d = prices.iloc[-1] / prices.iloc[-5] - 1 if len(prices) >= 5 else 0
prev_5d = prices.iloc[-6] / prices.iloc[-10] - 1 if len(prices) >= 10 else 0
accel = recent_5d - prev_5d
# Calculate relative performance vs SPY
rel_perf = {}
for period, spy_val in spy_perf.items():
if period == "7d":
rel_perf[period] = perf_7d - spy_val
elif period == "15d":
rel_perf[period] = perf_15d - spy_val
elif period == "30d":
rel_perf[period] = perf_30d - spy_val
# Log detailed ETF statistics
self.Debug(f" {symbol}: 7d: {perf_7d*100:.2f}%, 15d: {perf_15d*100:.2f}%, " +
f"30d: {perf_30d*100:.2f}%, Accel: {accel*100:.2f}%, " +
f"Rel30d: {rel_perf.get('30d', 0)*100:.2f}%")
def _evaluateDefensiveETFs(self, market_deviation, market_trend, max_allocation):
"""Enhanced defensive ETF evaluation with sector rotation"""
allocations = {symbol: 0 for symbol in self.all_defensive}
# Skip if market is very bullish
if market_deviation > 0.04 and market_trend > 0.02:
return allocations
# Get history for all defensive options and SPY
history = self.History(self.all_defensive + [self.spy], 60, Resolution.Daily)
if history.empty:
return allocations
# Detailed diagnostics on all ETFs
self.Debug(f"DEFENSIVE ETF PERFORMANCE DETAILS:")
# Calculate SPY performance for relative comparisons
spy_perf = {}
if self.spy in history.index.get_level_values(0):
spy_prices = history.loc[self.spy]['close']
if len(spy_prices) >= 30:
spy_perf = {
"5d": spy_prices.iloc[-1] / spy_prices.iloc[-5] - 1 if len(spy_prices) >= 5 else 0,
"10d": spy_prices.iloc[-1] / spy_prices.iloc[-10] - 1 if len(spy_prices) >= 10 else 0,
"20d": spy_prices.iloc[-1] / spy_prices.iloc[-20] - 1 if len(spy_prices) >= 20 else 0,
"30d": spy_prices.iloc[-1] / spy_prices.iloc[-30] - 1
}
self.Debug(f" SPY: 5d: {spy_perf['5d']*100:.1f}%, 10d: {spy_perf['10d']*100:.1f}%, " +
f"20d: {spy_perf['20d']*100:.1f}%, 30d: {spy_perf['30d']*100:.1f}%")
# Enhanced scoring system with different criteria for different ETF types
etf_scores = {}
# Process each ETF by type
for group_name, group in [("Inverse", self.inverse_etfs),
("Alternative", self.alternative_defensive),
("Sector", self.sector_defensive)]:
self.Debug(f" {group_name} ETFs:")
for symbol in group:
if symbol in history.index.get_level_values(0):
prices = history.loc[symbol]['close']
if len(prices) >= 30:
# Calculate absolute momentum components
perf = {}
perf["5d"] = prices.iloc[-1] / prices.iloc[-5] - 1 if len(prices) >= 5 else 0
perf["10d"] = prices.iloc[-1] / prices.iloc[-10] - 1 if len(prices) >= 10 else 0
perf["20d"] = prices.iloc[-1] / prices.iloc[-20] - 1 if len(prices) >= 20 else 0
perf["30d"] = prices.iloc[-1] / prices.iloc[-30] - 1
# Calculate relative outperformance vs SPY
rel_perf = {}
for period, spy_val in spy_perf.items():
rel_perf[period] = perf[period] - spy_val
# Log detailed performance
self.Debug(f" {symbol}: 5d: {perf['5d']*100:.1f}% (rel: {rel_perf['5d']*100:+.1f}%), " +
f"10d: {perf['10d']*100:.1f}% (rel: {rel_perf['10d']*100:+.1f}%), " +
f"30d: {perf['30d']*100:.1f}% (rel: {rel_perf['30d']*100:+.1f}%)")
# Inverse ETFs need to show positive momentum in down markets
if symbol in self.inverse_etfs:
# In downtrends, rising inverse ETFs are good
if market_deviation < -0.02:
score = (perf["5d"] * 0.4) + (perf["10d"] * 0.4) + (perf["30d"] * 0.2)
# Bonus for relative outperformance
score += (rel_perf["5d"] + rel_perf["10d"]) * 0.15
else:
# Less emphasis on long-term performance in neutral markets
score = (perf["5d"] * 0.6) + (perf["10d"] * 0.3) + (perf["30d"] * 0.1)
# Alternative defensive (bonds, gold) - focus on absolute return
elif symbol in self.alternative_defensive:
# Less dramatic movements, need lower thresholds
score = (perf["5d"] * 0.3) + (perf["10d"] * 0.4) + (perf["30d"] * 0.3)
# In downtrends, emphasize relative performance more
if market_deviation < -0.03:
score += rel_perf["10d"] * 0.2 # Bonus for outperformance
# Sector ETFs - focus on relative outperformance
else:
# These should have positive absolute returns and outperform SPY
abs_score = (perf["5d"] * 0.3) + (perf["10d"] * 0.3) + (perf["30d"] * 0.4)
rel_score = (rel_perf["5d"] * 0.3) + (rel_perf["10d"] * 0.3) + (rel_perf["30d"] * 0.4)
# Balance absolute and relative performance
if market_deviation < -0.02:
# In downtrends, relative outperformance is more important
score = (abs_score * 0.4) + (rel_score * 0.6)
else:
# In neutral markets, absolute performance matters more
score = (abs_score * 0.6) + (rel_score * 0.4)
etf_scores[symbol] = score
# Find candidates with appropriate momentum based on market conditions
threshold = -0.007 # Default threshold
if market_deviation < -0.03:
threshold = -0.01 # More permissive in stronger downturns
candidates = {s: score for s, score in etf_scores.items() if score > threshold}
if not candidates:
self.Debug("No defensive ETFs showed sufficient momentum - keeping as cash")
return allocations
# Sort and log candidate scores
sorted_candidates = sorted(candidates.items(), key=lambda x: x[1], reverse=True)
self.Debug(f"Top 5 defensive candidates:")
for symbol, score in sorted_candidates[:5]:
group = "Inverse" if symbol in self.inverse_etfs else "Alternative" if symbol in self.alternative_defensive else "Sector"
self.Debug(f" {symbol} ({group}): Score {score*100:.2f}%")
# Set allocation percent based on market conditions and trend
allocation_pct = 0.0
if market_deviation < -0.05 or market_trend < -0.04:
allocation_pct = 0.95 # Almost all available allocation
elif market_deviation < -0.03 or market_trend < -0.02:
allocation_pct = 0.8
elif market_deviation < -0.01 or market_trend < -0.01:
allocation_pct = 0.6
else:
allocation_pct = 0.4
# Adjust allocation based on strength of best candidate
best_score = sorted_candidates[0][1] if sorted_candidates else 0
allocation_pct *= min(1.0, max(0.5, (best_score + 0.02) * 4))
# Determine number of ETFs to use - more in stronger downtrends
num_etfs = 1
if (market_deviation < -0.04 or market_trend < -0.03) and len(sorted_candidates) > 1:
num_etfs = min(2, len(sorted_candidates))
# Allocate to best candidates
remaining_allocation = max_allocation * allocation_pct
total_score = sum(score for _, score in sorted_candidates[:num_etfs])
if total_score > 0:
for i in range(num_etfs):
symbol, score = sorted_candidates[i]
# Weight by relative score
weight = score / total_score if total_score > 0 else 1.0/num_etfs
# Calculate allocation
etf_allocation = remaining_allocation * weight
# Only allocate if meaningful
if etf_allocation >= 0.02: # 2% minimum allocation
allocations[symbol] = etf_allocation
etf_type = "Inverse" if symbol in self.inverse_etfs else "Alternative" if symbol in self.alternative_defensive else "Sector"
self.Debug(f"Selected {etf_type} ETF {symbol} with score {score*100:.2f}%, allocating {etf_allocation*100:.2f}%")
return allocations# region imports
from AlgorithmImports import *
# endregion
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data import *
from QuantConnect.Indicators import *
from datetime import timedelta
import numpy as np
import pandas as pd
import torch
import os
import torch.nn as nn
from sklearn.preprocessing import RobustScaler
class KQTStrategy:
def __init__(self):
self.model = None
self.lookback = 30
self.scalers = {}
self.feature_cols = []
self.stock_to_id = {}
self.sector_mappings = {}
self.adaptive_threshold = 0.2
self.pred_std = 1.0
self.current_regime = "neutral"
self.portfolio_returns = []
self.defensive_mode = False
self.previous_day_hit_stops = []
def create_sliding_sequences(self, df, feature_cols, lookback, stride=1):
X = []
for i in range(0, len(df) - lookback + 1, stride):
X.append(df.iloc[i:i+lookback][feature_cols].values.astype(np.float32))
return np.array(X)
def clip_outliers(self, df, cols, lower=0.01, upper=0.99):
df_copy = df.copy()
for col in cols:
if col in df_copy.columns:
q_low = df_copy[col].quantile(lower)
q_high = df_copy[col].quantile(upper)
df_copy.loc[df_copy[col] < q_low, col] = q_low
df_copy.loc[df_copy[col] > q_high, col] = q_high
return df_copy
def filter_features_to_match_model(self, df, feature_cols, required_count=5):
"""Ensure we have exactly the required number of features"""
if len(feature_cols) == required_count:
return feature_cols
# First, prioritize the lag returns (most important)
lag_features = [col for col in feature_cols if 'return_lag' in col]
# Next, add in the most predictive technical features in a fixed order
tech_priority = ['roc_5', 'volatility_10', 'ma_cross', 'dist_ma20', 'momentum_1m',
'oversold', 'overbought', 'roc_diff', 'volatility_regime']
prioritized_features = lag_features.copy()
for feat in tech_priority:
if feat in feature_cols and len(prioritized_features) < required_count:
prioritized_features.append(feat)
# If still not enough, add remaining features
remaining = [col for col in feature_cols if col not in prioritized_features]
while len(prioritized_features) < required_count and remaining:
prioritized_features.append(remaining.pop(0))
# If too many, truncate
return prioritized_features[:required_count]
def add_technical_features(self, df):
if 'Close' not in df.columns:
return df
df['ma5'] = df['Close'].rolling(5).mean() / df['Close'] - 1 # Relative to price
df['ma20'] = df['Close'].rolling(20).mean() / df['Close'] - 1
df['ma_cross'] = df['ma5'] - df['ma20'] # Moving average crossover signal
df['volatility_10'] = df['Close'].pct_change().rolling(10).std()
df['volatility_ratio'] = df['Close'].pct_change().rolling(5).std() / df['Close'].pct_change().rolling(20).std()
df['roc_5'] = df['Close'].pct_change(5)
df['roc_10'] = df['Close'].pct_change(10)
df['roc_diff'] = df['roc_5'] - df['roc_10']
df['dist_ma20'] = (df['Close'] / df['Close'].rolling(20).mean() - 1)
return df.fillna(0)
def add_enhanced_features(self, df):
"""Add enhanced technical features"""
df['volatility_trend'] = df['volatility_10'].pct_change(5)
df['volatility_regime'] = (df['volatility_10'] > df['volatility_10'].rolling(20).mean()).astype(int)
if 'volume' in df.columns:
df['vol_ma_ratio'] = df['volume'] / df['volume'].rolling(20).mean()
df['vol_price_trend'] = df['vol_ma_ratio'] * df['roc_5']
df['momentum_1m'] = df['Close'].pct_change(20)
df['momentum_3m'] = df['Close'].pct_change(60)
df['momentum_breadth'] = (
(df['roc_5'] > 0).astype(int) +
(df['momentum_1m'] > 0).astype(int) +
(df['momentum_3m'] > 0).astype(int)
) / 3
df['mean_rev_signal'] = -1 * df['dist_ma20'] * df['volatility_10']
df['oversold'] = (df['dist_ma20'] < -2 * df['volatility_10']).astype(int)
df['overbought'] = (df['dist_ma20'] > 2 * df['volatility_10']).astype(int)
df['regime_change'] = (np.sign(df['ma_cross']) != np.sign(df['ma_cross'].shift(1))).astype(int)
df['risk_adj_momentum'] = df['roc_5'] / (df['volatility_10'] + 0.001)
return df
def prepare_stock_data(self, stock_data, ticker, is_training=False):
"""Prepare data for a single stock"""
if len(stock_data) < self.lookback + 5: # Need enough data
return None, None
stock_df = pd.DataFrame({
'Close': stock_data['close'].values,
'time': stock_data['time'].values
})
if 'volume' in stock_data.columns:
stock_df['volume'] = stock_data['volume'].values
stock_df = stock_df.sort_values('time').reset_index(drop=True)
stock_df['pct_return'] = stock_df['Close'].pct_change().shift(-1) * 100
# In prepare_stock_data, replace the feature cols section with:
feature_cols = []
# Add basic lag features
for i in range(1, 6):
col_name = f'return_lag{i}'
stock_df[col_name] = stock_df['pct_return'].shift(i)
feature_cols.append(col_name)
# Add technical features
stock_df = self.add_technical_features(stock_df)
stock_df = self.add_enhanced_features(stock_df)
# Add all potential features
additional_features = ['ma_cross', 'volatility_10', 'roc_5', 'roc_diff', 'dist_ma20']
enhanced_features = ['volatility_trend', 'volatility_regime', 'momentum_1m',
'momentum_breadth', 'mean_rev_signal', 'oversold',
'overbought', 'regime_change', 'risk_adj_momentum']
for col in additional_features + enhanced_features:
if col in stock_df.columns:
feature_cols.append(col)
# Filter to the exact number of features expected by the model
model_feature_count = 5 # Use the exact count from your model
feature_cols = self.filter_features_to_match_model(stock_df, feature_cols, model_feature_count)
if not self.feature_cols:
self.feature_cols = feature_cols.copy()
stock_df = stock_df.dropna().reset_index(drop=True)
# Handle outliers
stock_df = self.clip_outliers(stock_df, feature_cols)
# Replace the scaling code in prepare_stock_data with this:
# Scale features
if ticker not in self.scalers or is_training:
# Check if we have data
if len(stock_df) == 0 or len(feature_cols) == 0:
return None, stock_df # Return early if no data
# Check if any features are empty/nan
if stock_df[feature_cols].isna().any().any() or stock_df[feature_cols].empty:
# Fill NaNs with zeros
stock_df[feature_cols] = stock_df[feature_cols].fillna(0)
# Ensure we have data
if len(stock_df[feature_cols]) > 0:
try:
scaler = RobustScaler()
stock_df[feature_cols] = scaler.fit_transform(stock_df[feature_cols])
self.scalers[ticker] = scaler
except Exception as e:
print(f"Scaling error for {ticker}: {str(e)}")
# Use a simple standardization as fallback
for col in feature_cols:
mean = stock_df[col].mean()
std = stock_df[col].std()
if std > 0:
stock_df[col] = (stock_df[col] - mean) / std
else:
stock_df[col] = 0
else:
return None, stock_df # Return early if empty after processing
else:
# Use existing scaler
scaler = self.scalers[ticker]
try:
stock_df[feature_cols] = scaler.transform(stock_df[feature_cols])
except Exception as e:
print(f"Transform error for {ticker}: {str(e)}")
# Simple standardization fallback
for col in feature_cols:
if col in stock_df.columns and len(stock_df[col]) > 0:
mean = stock_df[col].mean()
std = stock_df[col].std()
if std > 0:
stock_df[col] = (stock_df[col] - mean) / std
else:
stock_df[col] = 0
# Create sequences for prediction
X = self.create_sliding_sequences(stock_df, feature_cols, self.lookback, stride=1)
if len(X) == 0:
return None, stock_df
return X, stock_df
# Add to strategy.py in KQTStrategy class
def calculate_portfolio_risk_score(self, market_returns):
"""Calculate a portfolio risk score (0-100) to scale overall exposure"""
risk_score = 50 # Neutral starting point
# VIX-like volatility measurement using SPY returns
if len(market_returns) >= 5:
recent_vol = np.std(market_returns[-5:]) * np.sqrt(252) # Annualized
longer_vol = np.std(market_returns[-10:]) * np.sqrt(252) if len(market_returns) >= 10 else recent_vol
# Volatility spike detection
vol_ratio = recent_vol / longer_vol if longer_vol > 0 else 1
if vol_ratio > 1.5: # Sharp volatility increase
risk_score -= 30
elif vol_ratio > 1.2:
risk_score -= 15
# Consecutive negative days
if len(market_returns) >= 3:
neg_days = sum(1 for r in market_returns[-3:] if r < 0)
if neg_days == 3: # Three consecutive down days
risk_score -= 20
elif neg_days == 2:
risk_score -= 10
# Trend direction
if len(market_returns) >= 10:
avg_recent = np.mean(market_returns[-5:])
avg_older = np.mean(market_returns[-10:-5])
trend_change = avg_recent - avg_older
# Declining trend
if trend_change < -0.3:
risk_score -= 15
# Accelerating uptrend
elif trend_change > 0.3 and avg_recent > 0:
risk_score += 10
return max(10, min(100, risk_score)) # Constrain between 10-100
def predict_returns(self, X, ticker):
"""Make predictions for a single stock"""
if self.model is None:
return 0
if ticker not in self.stock_to_id:
self.stock_to_id[ticker] = len(self.stock_to_id)
stock_id = self.stock_to_id[ticker]
try:
X_tensor = torch.tensor(X, dtype=torch.float32)
stock_ids = torch.tensor([stock_id] * len(X), dtype=torch.long)
with torch.no_grad():
predictions = self.model(X_tensor, stock_ids)
# Convert to standard Python float for safety
return float(predictions.detach().numpy().flatten()[-1])
except Exception as e:
print(f"Prediction error for {ticker}: {e}")
return 0 # Return neutral prediction on error
def detect_market_regime(self, daily_returns, lookback=10):
"""Detect current market regime based on portfolio returns"""
if len(daily_returns) >= 1:
market_return = np.mean(daily_returns)
market_vol = np.std(daily_returns)
if len(self.portfolio_returns) >= 3:
recent_returns = self.portfolio_returns[-min(lookback, len(self.portfolio_returns)):]
avg_recent_return = np.mean(recent_returns)
if len(self.portfolio_returns) >= 5:
very_recent = np.mean(self.portfolio_returns[-3:])
less_recent = np.mean(self.portfolio_returns[-min(8, len(self.portfolio_returns)):-3])
trend_change = very_recent - less_recent
if trend_change > 0.5 and avg_recent_return > 0.2:
return "breakout_bullish"
elif trend_change < -0.5 and avg_recent_return < -0.2:
return "breakdown_bearish"
if avg_recent_return > 0.15:
if market_return > 0:
return "bullish_strong"
else:
return "bullish_pullback"
elif avg_recent_return < -0.3:
if market_return < -0.2:
return "bearish_high_vol"
else:
return "bearish_low_vol"
elif avg_recent_return > 0 and market_return > 0:
return "bullish"
elif avg_recent_return < 0 and market_return < 0:
return "bearish"
if market_return > -0.05:
return "neutral"
else:
return "bearish"
return "neutral"
def detect_bearish_signals(self, recent_returns):
"""Detect early warning signs of bearish conditions"""
bearish_signals = 0
signal_strength = 0
if len(self.portfolio_returns) >= 5:
recent_portfolio_returns = self.portfolio_returns[-5:]
pos_days = sum(1 for r in recent_portfolio_returns if r > 0)
neg_days = sum(1 for r in recent_portfolio_returns if r < 0)
if neg_days > pos_days:
bearish_signals += 1
signal_strength += 0.2 * (neg_days - pos_days)
if len(self.portfolio_returns) >= 10:
recent_vol = np.std(self.portfolio_returns[-5:])
older_vol = np.std(self.portfolio_returns[-10:-5])
if recent_vol > older_vol * 1.3: # 30% volatility increase
bearish_signals += 1
signal_strength += 0.3 * (recent_vol/older_vol - 1)
if len(self.portfolio_returns) >= 5:
if self.portfolio_returns[-1] < 0 and self.portfolio_returns[-2] > 0.3:
bearish_signals += 1
signal_strength += 0.3
return bearish_signals, signal_strength
def generate_positions(self, prediction_data, current_returns=None):
"""Generate position sizing based on predictions with improved diversification"""
if not prediction_data:
return {}
# Update market regime
if current_returns is not None:
self.current_regime = self.detect_market_regime(current_returns)
bearish_count, bearish_strength = self.detect_bearish_signals(current_returns)
self.defensive_mode = bearish_count >= 2 or bearish_strength > 0.5
# Calculate portfolio risk score (0-100)
portfolio_risk_score = self.calculate_portfolio_risk_score(current_returns if current_returns else [])
# Convert to a scaling factor (0.1 to 1.0)
risk_scaling = portfolio_risk_score / 100
base_threshold = 0.25 * self.pred_std
if self.current_regime in ["bullish_strong", "breakout_bullish"]:
self.adaptive_threshold = base_threshold * 0.4
elif self.current_regime in ["bearish_high_vol", "breakdown_bearish"]:
self.adaptive_threshold = base_threshold * 2.5
elif self.current_regime in ["bearish", "bearish_low_vol"]:
self.adaptive_threshold = base_threshold * 1.6
elif self.current_regime in ["bullish_pullback"]:
self.adaptive_threshold = base_threshold * 0.9
else: # neutral or other regimes
self.adaptive_threshold = base_threshold * 0.75
positions = {}
# Group stocks by sector
sector_data = {}
for ticker, data in prediction_data.items():
pred_return = data["pred_return"]
sector = self.sector_mappings.get(ticker, "Unknown")
if sector not in sector_data:
sector_data[sector] = []
sector_data[sector].append({
"ticker": ticker,
"pred_return": pred_return,
"composite_score": pred_return / self.adaptive_threshold
})
# Rank sectors by predicted return
sector_avg_scores = {}
for sector, stocks in sector_data.items():
sector_avg_scores[sector] = np.mean([s["pred_return"] for s in stocks])
# CHANGE: Include more sectors (3-4 instead of just 2)
ranked_sectors = sorted(sector_avg_scores.keys(), key=lambda x: sector_avg_scores[x], reverse=True)
top_sector_count = 3 if portfolio_risk_score > 60 else 2 # More diversification in lower risk periods
top_sectors = ranked_sectors[:min(top_sector_count, len(ranked_sectors))]
# CHANGE: Allow more stocks per sector in bull markets
stocks_per_sector = 3 if self.current_regime in ["bullish_strong", "breakout_bullish"] else 2
# Allocate within top sectors - focus on stocks with strongest signals
for sector in top_sectors:
sector_stocks = sorted(sector_data[sector], key=lambda x: x["pred_return"], reverse=True)
# Take top N stocks in each selected sector
top_stocks = sector_stocks[:min(stocks_per_sector, len(sector_stocks))]
# CHANGE: Make position size proportional to signal strength but limited by volatility
for stock in top_stocks:
ticker = stock["ticker"]
signal_strength = stock["pred_return"] / (0.2 * self.pred_std)
# Base size calculation
base_size = min(0.3, max(0.05, 0.15 * signal_strength))
# Scale by portfolio risk
final_size = base_size * risk_scaling
positions[ticker] = final_size
# Defensive adjustments
if self.defensive_mode or self.current_regime in ["bearish_high_vol", "bearish_low_vol", "breakdown_bearish"]:
# 1. Reduce overall position sizes
scaling_factor = 0.5 if self.defensive_mode else 0.7 # More aggressive reduction
for ticker in positions:
positions[ticker] *= scaling_factor
# 2. Add inverse positions (shorts) as hedges if we have bearish predictions
if len(positions) > 0 and portfolio_risk_score < 40: # Only hedge in higher risk environments
negative_preds = {t: data["pred_return"] for t, data in prediction_data.items()
if data["pred_return"] < 0 and t not in positions}
if negative_preds:
worst_stocks = sorted(negative_preds.items(), key=lambda x: x[1])[:2]
for ticker, pred in worst_stocks:
hedge_size = -0.15 if self.defensive_mode else -0.1
positions[ticker] = hedge_size
return positions
def get_stop_loss_level(self):
"""Get appropriate stop-loss level based on market regime"""
if self.current_regime in ["bullish_strong", "breakout_bullish"]:
if self.defensive_mode:
return -2.0 # Tighter in defensive mode
else:
return -3.5 # More room for positions to breathe
elif self.current_regime in ["bearish_high_vol", "breakdown_bearish"]:
return -1.5 # Tighter stop-loss in bearish regimes
else:
if self.defensive_mode:
return -1.8
else:
return -2.5
def update_portfolio_returns(self, daily_return):
"""Update portfolio return history"""
self.portfolio_returns.append(daily_return)
if len(self.portfolio_returns) > 60: # Keep a rolling window
self.portfolio_returns = self.portfolio_returns[-60:]
def update_model_calibration(self, all_predictions):
"""Update prediction standard deviation for threshold calibration"""
all_pred_values = [p for p in all_predictions.values()]
if len(all_pred_values) > 5:
self.pred_std = np.std(all_pred_values)