| Overall Statistics |
|
Total Orders 477 Average Win 0.91% Average Loss -0.21% Compounding Annual Return 22.138% Drawdown 13.200% Expectancy 1.949 Start Equity 1000000 End Equity 3175591.13 Net Profit 217.559% Sharpe Ratio 1.184 Sortino Ratio 1.223 Probabilistic Sharpe Ratio 82.037% Loss Rate 44% Win Rate 56% Profit-Loss Ratio 4.25 Alpha 0.082 Beta 0.442 Annual Standard Deviation 0.109 Annual Variance 0.012 Information Ratio 0.179 Tracking Error 0.123 Treynor Ratio 0.292 Total Fees $2025.47 Estimated Strategy Capacity $0 Lowest Capacity Asset NB R735QTJ8XC9X Portfolio Turnover 0.89% |
import numpy as np
from AlgorithmImports import *
class AssetWeightCalculator:
def __init__(self, algorithm: QCAlgorithm):
self.algorithm = algorithm
self.risk_free = self.algorithm.add_equity("BIL", Resolution.HOUR)
def coarse_selection(self, coarse):
"""
Selects stonks, first filter
"""
# Sorts by dollar volume before taking top 200
sorted_by_volume = sorted([x for x in coarse if x.price > 10 and x.has_fundamental_data],
key=lambda x: x.dollar_volume,
reverse=True)
return [x.symbol for x in sorted_by_volume][:200]
def fine_selection(self, fine):
"""
Selects stonks, second filter
"""
filtered = [x.symbol for x in fine if x.market_cap is not None and x.market_cap > 10e9]
self.algorithm.debug(f"Fine Selection: {len(filtered)} symbols passed filters")
# Doing it this way makes it so that stocks are ranked on each universe update and then the macds can be redone with the scheduler in main
ranked_symbols = self.rank_stocks(filtered)
return ranked_symbols
def calculate_sharpe_ratio(self, symbol, period=4914): # This is 3 yrs worth of trading days
"""
Calculates the sharpe
"""
try:
# If a KeyValuePair was recieved only take the symbol
if hasattr(symbol, "Key"):
symbol = symbol.Key
history = self.algorithm.history([symbol], period, Resolution.HOUR)
if history.empty:
self.algorithm.debug(f"No history for {symbol.value}")
return None
# Get risk-free rate
rf_history = self.algorithm.history(self.risk_free.symbol, 1, Resolution.HOUR)
risk_free_rate = rf_history['close'].iloc[-1]/100 if not rf_history.empty else 0.02 # Default to 2% if no data
# Sharpe ratio logic
returns = history['close'].pct_change().dropna()
excess_returns = returns - (risk_free_rate/1638)
mean_excess_return = excess_returns.mean() * 1638
std_dev = excess_returns.std() * np.sqrt(1638)
return mean_excess_return / std_dev if std_dev != 0 else None
except Exception as e:
self.algorithm.debug(f"Error calculating Sharpe for {symbol.value}: {str(e)}")
return None
def rank_stocks(self, symbols):
"""
Ranks da top 50 stocks based on sharpe
"""
if not symbols:
self.algorithm.debug("No symbols to rank")
return []
self.algorithm.debug(f"Ranking {len(symbols)} symbols")
# Converting from key pair if neccessary
symbols = [s.Key if hasattr(s, 'Key') else s for s in symbols]
scores = {symbol: self.calculate_sharpe_ratio(symbol) for symbol in symbols}
valid_scores = {k: v for k, v in scores.items() if v is not None}
self.algorithm.debug(f"Valid Sharpe ratios: {len(valid_scores)} out of {len(symbols)}")
if not valid_scores:
return []
sorted_scores = sorted(valid_scores, key=valid_scores.get, reverse=True)[:20]
self.algorithm.log(f"All symbols before ranking: {[s.value for s in symbols]}")
self.algorithm.log(f"Symbols after filtering: {[s.value for s in valid_scores.keys()]}")
return sorted_scores
def normalize_scores(self, scores):
"""
The list of scores from the ranking method are
normalized using a z score so that an additive
operation may be used in WeightCombiner()
"""
values = np.array(list(scores.values()))
mean = np.mean(values)
std_dev = np.std(values)
if std_dev == 0:
# If no variation in scores, assign equal normalized scores
return {symbol: 0 for symbol in scores.keys()}
normalized_scores = {symbol: (score - mean) / std_dev for symbol, score in scores.items()}
print(normalized_scores) #To see output for debugging
return normalized_scores
from AlgorithmImports import *
class MACDSignalGenerator:
def __init__(self, algorithm: QCAlgorithm, symbols: list, cash_buffer: float = 0.05):
self.algorithm = algorithm
self.symbols = symbols
self.cash_buffer = cash_buffer
self.macd_indicators = {} # {symbol: {variant: MACD}}
# Define MACD parameters for different variants
self.macd_variants = {
"slow": {"fast": 12, "slow": 26, "signal": 9},
"slow-med": {"fast": 9, "slow": 19, "signal": 5},
"med-fast": {"fast": 7, "slow": 15, "signal": 3},
"fast": {"fast": 5, "slow": 12, "signal": 2},
}
def remove_symbols(self, symbols: list):
"""
Removes MACD indicators for the specified symbols.
"""
for symbol in symbols:
# Liquidate position before removing indicator
self.algorithm.liquidate(symbol)
# Unregister and delete indicators tied to each symbol
if symbol in self.macd_indicators:
for macd in self.macd_indicators[symbol].values(): # Better: gets MACD objects directly
self.algorithm.unregister_indicator(macd)
del self.macd_indicators[symbol]
def add_symbols(self, new_symbols):
"""
Add in the new symbols that are given by AssetWeightCalculator.
"""
# Log initial attempt
self.algorithm.debug(f"Attempting to add symbols: {[s.value for s in new_symbols]}")
# Get historical data for new symbols
history = self.algorithm.history([s for s in new_symbols],
35, # Longest MACD period needed
Resolution.HOUR)
# Log history data availability
self.algorithm.debug(f"History data available for: {history.index.get_level_values(0).unique()}")
self.symbols.extend(new_symbols)
for symbol in new_symbols:
security = self.algorithm.securities[symbol]
# Detailed security check logging
# self.algorithm.debug(f"Security {symbol.value} check:"
# f" has_data={security.has_data},"
# f" is_tradable={security.is_tradable},"
# f" price={security.price}")
# Checking if price is 0
if not (security.has_data and security.is_tradable and security.price > 0):
self.algorithm.debug(f"Waiting for valid price data: {symbol.value}")
continue
# Adding the symbol
if symbol not in self.macd_indicators:
self.macd_indicators[symbol] = {}
# Get symbol's historical data
if symbol not in history.index.get_level_values(0):
self.algorithm.debug(f"No history data for: {symbol.value}")
continue
symbol_history = history.loc[symbol]
self.algorithm.debug(f"History rows for {symbol.value}: {len(symbol_history)}")
for variant, params in self.macd_variants.items():
macd = self.algorithm.macd(
symbol=symbol,
fast_period=params["fast"],
slow_period=params["slow"],
signal_period=params["signal"],
type=MovingAverageType.EXPONENTIAL,
resolution=Resolution.HOUR,
selector=Field.CLOSE
)
self.macd_indicators[symbol][variant] = macd
# Warm up MACD with historical data
for time, row in symbol_history.iterrows():
macd.update(time, row['close'])
self.macd_indicators[symbol][variant] = macd
def calculate_position_sizes(self):
position_sizes = {}
max_position_limit = 0.1
# Check if we have any symbols to process
if not self.symbols or not self.macd_indicators:
self.algorithm.debug("No symbols available for position calculation")
return position_sizes
# Calculating the maximum one variant can be in size
max_position = (1 - self.cash_buffer) / (len(self.symbols) * len(self.macd_variants))
for symbol in self.macd_indicators:
position_sizes[symbol] = {}
for variant, macd in self.macd_indicators[symbol].items():
if macd.is_ready:
security = self.algorithm.securities[symbol]
# Detailed security check logging
# self.algorithm.debug(f"Position Check for {symbol.value}:"
# f" has_data={security.has_data},"
# f" is_tradable={security.is_tradable},"
# f" price={security.price},"
# f" last_data={security.get_last_data() is not None},")
# More comprehensive check
# if not (security.has_data and
# security.is_tradable and
# security.price > 0 and
# security.get_last_data() is not None):
# self.algorithm.debug(f"Security not ready: {symbol.value}")
# continue
# Distance between fast and slow
distance = macd.fast.current.value - macd.slow.current.value
# Normalize the distance as a percentage difference and then as a fraction of max position
position_size = max_position * (distance / macd.slow.current.value) * 70 # Scalar value of max_position, the scalar integer can be though of as a form of leverage setting
# Only allow positive positions, cap at maximum
position_size = max(0, min(position_size, max_position_limit))
position_sizes[symbol][variant] = position_size
#self.algorithm.debug(f"Calculated position for {symbol.value} {variant}: {position_size}")
else:
position_sizes[symbol][variant] = 0
# Running daily cause the logging is too heavy hourly
if self.algorithm.time.hour == 10 and self.algorithm.time.minute == 0:
rounded_positions = [(s.value, {k: round(v, 5) for k, v in sizes.items()}) for s, sizes in position_sizes.items()]
#self.algorithm.debug(f"Daily position sizes proposed: {rounded_positions}")
return position_sizesfrom AlgorithmImports import *
import numpy as np
from datetime import timedelta
class MarketCapWeightedSP500Tracker(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2019, 1, 1)
self.SetEndDate(2025, 1, 1)
self.SetCash(1000000)
self.UniverseSettings.Resolution = Resolution.Daily
self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol
self.bil = self.AddEquity("BIL", Resolution.Daily).Symbol
# Add tactical ETFs - much smaller set
self.spxl = self.AddEquity("SPXL", Resolution.Daily).Symbol # 3x S&P 500 bull
self.spxs = self.AddEquity("SPXS", Resolution.Daily).Symbol # 3x S&P 500 bear
# Much smaller tactical allocation - only 8% of portfolio
self.tactical_allocation = 0.08
self.tactical_positions = set()
self.last_tactical_trade = datetime(1900, 1, 1)
# Add volatility tracking
self.spy_volatility_window = RollingWindow[float](20)
self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
self.selected_by_market_cap = []
self.rebalance_flag = False
self.spy_30day_window = RollingWindow[float](30)
self.entry_prices = {}
self.previous_bil_allocation = 0.0
self.Schedule.On(self.DateRules.MonthStart(self.spy),
self.TimeRules.AfterMarketOpen(self.spy, 30),
self.SetRebalanceFlag)
self.Schedule.On(self.DateRules.WeekStart(self.spy, DayOfWeek.Wednesday),
self.TimeRules.AfterMarketOpen(self.spy, 30),
self.MonthlyRebalance)
# Weekly tactical adjustment (not daily - reduces turnover)
self.Schedule.On(self.DateRules.WeekStart(self.spy, DayOfWeek.Monday),
self.TimeRules.AfterMarketOpen(self.spy, 45), # After main rebalance
self.WeeklyTacticalAdjustment)
# Initialize rolling window with historical data
history = self.History(self.spy, 30, Resolution.Daily)
if not history.empty:
for time, row in history.loc[self.spy].iterrows():
self.spy_30day_window.Add(row["close"])
def CoarseSelectionFunction(self, coarse):
filtered = [x for x in coarse if x.HasFundamentalData
and x.Price > 5
and x.Market == Market.USA]
return [x.Symbol for x in filtered]
def FineSelectionFunction(self, fine):
filtered = [x for x in fine if x.MarketCap > 1e10
and x.SecurityReference.SecurityType == "ST00000001"]
sorted_by_cap = sorted(filtered, key=lambda x: x.MarketCap, reverse=True)[:30]
self.selected_by_market_cap = [(x.Symbol, x.MarketCap) for x in sorted_by_cap]
return [x.Symbol for x in sorted_by_cap]
def SetRebalanceFlag(self):
if self.Time.weekday() == 2: # Wednesday
self.rebalance_flag = True
def OnData(self, data):
# Update price window
if not data.Bars.ContainsKey(self.spy): return
self.spy_30day_window.Add(data.Bars[self.spy].Close)
# Update volatility tracking
if data.Bars.ContainsKey(self.spy):
current_price = data.Bars[self.spy].Close
if self.spy_volatility_window.Count > 0:
previous_price = self.spy_volatility_window[0]
if previous_price > 0: # Ensure no division by zero
daily_return = (current_price - previous_price) / previous_price
self.spy_volatility_window.Add(daily_return)
else:
# First value should be price
self.spy_volatility_window.Add(current_price)
# Track if any stop-loss was triggered
stop_loss_triggered = False
# Check for stop-loss triggers on all holdings
for kvp in self.Portfolio:
symbol = kvp.Key
holding = kvp.Value
if holding.Invested and symbol != self.bil:
current_price = self.Securities[symbol].Price
if symbol not in self.entry_prices:
self.entry_prices[symbol] = current_price
price_drop = (self.entry_prices[symbol] - current_price) / self.entry_prices[symbol]
# Slightly more nuanced stop-loss threshold
stop_threshold = 0.05
# For larger positions, use slightly tighter stop-loss to protect portfolio
position_value = holding.HoldingsValue
portfolio_pct = position_value / self.Portfolio.TotalPortfolioValue
if portfolio_pct > 0.08: # If position is over 8% of portfolio
stop_threshold = 0.048 # Slightly tighter stop
if price_drop >= stop_threshold:
stop_loss_triggered = True
# Liquidate the position
self.Liquidate(symbol)
self.Debug(f"Stop-loss triggered for {symbol} at {current_price}, drop: {price_drop*100:.2f}%")
# If any stop-loss was triggered, invest all available cash in BIL
if stop_loss_triggered:
# Calculate total available cash (including unsettled cash from sales)
available_cash = self.Portfolio.Cash + self.Portfolio.UnsettledCash
if available_cash > 0:
bil_price = self.Securities[self.bil].Price
if bil_price is not None and bil_price > 0:
bil_quantity = available_cash / bil_price
self.MarketOrder(self.bil, bil_quantity)
self.Debug(f"Invested ${available_cash:0.2f} in BIL after stop-loss")
else:
self.Debug("BIL price not available or zero; skipping buy.")
def WeeklyTacticalAdjustment(self):
"""Weekly tactical adjustment that uses a very small portion (8%) of the portfolio"""
# Only calculate market state if we have enough history
if self.spy_30day_window.Count < 30:
return
# Check how far we are from monthly rebalance to avoid conflicting trades
days_since_rebalance = (self.Time.date() - self.last_rebalance_date.date()).days if hasattr(self, 'last_rebalance_date') else 999
if days_since_rebalance < 3:
return # Skip if we just rebalanced the main portfolio
# Calculate current market state
spy_price = self.Securities[self.spy].Price
sma_30 = sum(self.spy_30day_window) / self.spy_30day_window.Count
# Market state: percentage deviation from 30-day SMA
market_deviation = (spy_price / sma_30) - 1.0
# Calculate market volatility - ensure we have enough values
market_volatility = 0
if self.spy_volatility_window.Count >= 20:
returns = [self.spy_volatility_window[i] for i in range(1, min(20, self.spy_volatility_window.Count))]
if returns:
market_volatility = np.std(returns) * np.sqrt(252) # Annualized
# Close existing tactical positions if conditions have changed
for symbol in list(self.tactical_positions): # Make a copy since we'll modify during iteration
# Liquidate tactical positions that no longer match market conditions
if symbol == self.spxl and market_deviation < 0.02: # No longer in strong bull
self.Liquidate(symbol)
self.tactical_positions.remove(symbol)
self.Debug(f"Tactical: Closed bull position due to changing market conditions")
elif symbol == self.spxs and market_deviation > -0.02: # No longer in strong bear
self.Liquidate(symbol)
self.tactical_positions.remove(symbol)
self.Debug(f"Tactical: Closed bear position due to changing market conditions")
# Calculate portfolio value excluding tactical positions
tactical_value = sum(self.Portfolio[s].HoldingsValue for s in self.tactical_positions
if self.Portfolio.ContainsKey(s) and self.Portfolio[s].Invested)
core_portfolio_value = self.Portfolio.TotalPortfolioValue - tactical_value
# Strong Bull Market: SPY > 5% above 30-day SMA and previous month has low BIL allocation
if market_deviation > 0.05 and self.previous_bil_allocation < 0.3:
# Avoid if volatility is too high (> 30% annualized)
if market_volatility < 0.30 and self.spxl not in self.tactical_positions:
# Allocate to the 3x bull ETF
target_value = core_portfolio_value * self.tactical_allocation
current_price = self.Securities[self.spxl].Price
if current_price > 0:
shares = target_value / current_price
self.MarketOrder(self.spxl, shares)
self.tactical_positions.add(self.spxl)
self.Debug(f"Tactical: Allocated {self.tactical_allocation*100:.1f}% to SPXL in strong bull market")
# Strong Bear Market: SPY > 5% below 30-day SMA and high BIL allocation
elif market_deviation < -0.05 and self.previous_bil_allocation > 0.4:
# Only in high volatility environments
if market_volatility > 0.20 and self.spxs not in self.tactical_positions:
# Allocate to the 3x bear ETF
target_value = core_portfolio_value * self.tactical_allocation
current_price = self.Securities[self.spxs].Price
if current_price > 0:
shares = target_value / current_price
self.MarketOrder(self.spxs, shares)
self.tactical_positions.add(self.spxs)
self.Debug(f"Tactical: Allocated {self.tactical_allocation*100:.1f}% to SPXS in strong bear market")
self.last_tactical_trade = self.Time
def MonthlyRebalance(self):
if not self.rebalance_flag: return
self.rebalance_flag = False
self.last_rebalance_date = self.Time # Track when we do the monthly rebalance
self.entry_prices.clear() # Reset entry prices at rebalance
if self.spy_30day_window.Count < 30:
self.Debug("Waiting for enough SPY history.")
return
spy_price = self.Securities[self.spy].Price
sma_30 = sum(self.spy_30day_window) / 30
# Calculate new BIL allocation based on SMA
bil_weight = 0.0
if spy_price < sma_30:
# Original formula with slight adjustment to react faster to deeper drops
base_weight = (sma_30 - spy_price) / sma_30
# Use a slightly more aggressive allocation when significantly below SMA
if base_weight > 0.08: # Slightly earlier trigger (8% below vs 10%)
bil_weight = min(base_weight * 1.2, 1.0) # 20% more aggressive
else:
bil_weight = min(base_weight, 1.0)
# Apply reduction rule from previous month (maintain original approach)
min_bil_allocation = self.previous_bil_allocation * 0.8
bil_weight = max(bil_weight, min_bil_allocation)
# Cap at 80% to ensure some equity exposure remains even in severe downturns
bil_weight = min(bil_weight, 0.8)
# If market is rising strongly, reduce BIL allocation more aggressively
if spy_price > sma_30 * 1.03: # More responsive to uptrends (3% above SMA)
bil_weight = min(bil_weight, self.previous_bil_allocation * 0.75) # Slightly faster reduction
# Add an even stronger reduction in very strong uptrends
if spy_price > sma_30 * 1.08: # Very strong uptrend
bil_weight = min(bil_weight, self.previous_bil_allocation * 0.65) # Much faster reduction
# Tactical positions should not affect normal position sizing
# Calculate core portfolio value excluding tactical positions
tactical_value = sum(self.Portfolio[s].HoldingsValue for s in self.tactical_positions
if self.Portfolio.ContainsKey(s) and self.Portfolio[s].Invested)
core_portfolio_value = self.Portfolio.TotalPortfolioValue - tactical_value
equity_weight = 1.0 - bil_weight
if not self.selected_by_market_cap:
self.Debug("No stocks selected for investment.")
return
total_market_cap = sum([x[1] for x in self.selected_by_market_cap])
weights = {x[0]: (x[1] / total_market_cap) * equity_weight for x in self.selected_by_market_cap}
# Limit individual stock exposure to improve diversification
capped_weights = {}
for symbol, weight in weights.items():
# Slightly higher weight cap for largest companies
market_cap = next((cap for sym, cap in self.selected_by_market_cap if sym == symbol), 0)
if market_cap > 5e11: # Over $500B market cap
capped_weights[symbol] = min(weight, equity_weight * 0.12) # Allow up to 12%
else:
capped_weights[symbol] = min(weight, equity_weight * 0.10) # Standard 10% cap
# Normalize the weights after capping
total_capped_weight = sum(capped_weights.values())
if total_capped_weight > 0:
normalized_weights = {symbol: (weight / total_capped_weight) * equity_weight
for symbol, weight in capped_weights.items()}
else:
normalized_weights = weights
invested = set()
for symbol, weight in normalized_weights.items():
if weight > 0.005: # Small minimum position threshold (0.5%)
self.SetHoldings(symbol, weight)
invested.add(symbol)
self.entry_prices[symbol] = self.Securities[symbol].Price
if bil_weight > 0:
self.SetHoldings(self.bil, bil_weight)
invested.add(self.bil)
else:
self.Liquidate(self.bil)
# Store current BIL allocation for next month's minimum
self.previous_bil_allocation = self.Portfolio[self.bil].HoldingsValue / self.Portfolio.TotalPortfolioValue
self.Debug(f"New BIL allocation: {bil_weight*100:0.2f}% (Minimum was {min_bil_allocation*100:0.2f}%)")
# Add conditional logging to help track performance
if spy_price < sma_30:
self.Debug(f"Market below 30d SMA by {((sma_30 - spy_price) / sma_30 * 100):0.2f}%")
else:
self.Debug(f"Market above 30d SMA by {((spy_price - sma_30) / sma_30 * 100):0.2f}%")
# Add portfolio statistics to logs
self.Debug(f"Total invested: {self.Portfolio.Invested:0.2f}, Cash: {self.Portfolio.Cash:0.2f}")
# Liquidate positions not in current selection
for kvp in self.Portfolio:
symbol = kvp.Key
if (kvp.Value.Invested and symbol not in invested
and symbol != self.spy and symbol not in self.tactical_positions):
self.Liquidate(symbol)# region imports
from AlgorithmImports import *
# endregion
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data import *
from QuantConnect.Indicators import *
from datetime import timedelta
import numpy as np
import pandas as pd
import torch
import os
import torch.nn as nn
from sklearn.preprocessing import RobustScaler
class KQTStrategy:
def __init__(self):
self.model = None
self.lookback = 30
self.scalers = {}
self.feature_cols = []
self.stock_to_id = {}
self.sector_mappings = {}
self.adaptive_threshold = 0.2
self.pred_std = 1.0
self.current_regime = "neutral"
self.portfolio_returns = []
self.defensive_mode = False
self.previous_day_hit_stops = []
self.entry_prices = {}
def create_sliding_sequences(self, df, feature_cols, lookback, stride=1):
X = []
for i in range(0, len(df) - lookback + 1, stride):
X.append(df.iloc[i:i+lookback][feature_cols].values.astype(np.float32))
return np.array(X)
def clip_outliers(self, df, cols, lower=0.01, upper=0.99):
df_copy = df.copy()
for col in cols:
if col in df_copy.columns:
q_low = df_copy[col].quantile(lower)
q_high = df_copy[col].quantile(upper)
df_copy.loc[df_copy[col] < q_low, col] = q_low
df_copy.loc[df_copy[col] > q_high, col] = q_high
return df_copy
def filter_features_to_match_model(self, df, feature_cols, required_count=5):
"""Ensure we have exactly the required number of features"""
if len(feature_cols) == required_count:
return feature_cols
# First, prioritize the lag returns (most important)
lag_features = [col for col in feature_cols if 'return_lag' in col]
# Next, add in the most predictive technical features in a fixed order
tech_priority = ['roc_5', 'volatility_10', 'ma_cross', 'dist_ma20', 'momentum_1m',
'oversold', 'overbought', 'roc_diff', 'volatility_regime']
prioritized_features = lag_features.copy()
for feat in tech_priority:
if feat in feature_cols and len(prioritized_features) < required_count:
prioritized_features.append(feat)
# If still not enough, add remaining features
remaining = [col for col in feature_cols if col not in prioritized_features]
while len(prioritized_features) < required_count and remaining:
prioritized_features.append(remaining.pop(0))
# If too many, truncate
return prioritized_features[:required_count]
def add_technical_features(self, df):
if 'Close' not in df.columns:
return df
df['ma5'] = df['Close'].rolling(5).mean() / df['Close'] - 1 # Relative to price
df['ma20'] = df['Close'].rolling(20).mean() / df['Close'] - 1
df['ma_cross'] = df['ma5'] - df['ma20'] # Moving average crossover signal
df['volatility_10'] = df['Close'].pct_change().rolling(10).std()
df['volatility_ratio'] = df['Close'].pct_change().rolling(5).std() / df['Close'].pct_change().rolling(20).std()
df['roc_5'] = df['Close'].pct_change(5)
df['roc_10'] = df['Close'].pct_change(10)
df['roc_diff'] = df['roc_5'] - df['roc_10']
df['dist_ma20'] = (df['Close'] / df['Close'].rolling(20).mean() - 1)
return df.fillna(0)
def add_enhanced_features(self, df):
"""Add enhanced technical features"""
df['volatility_trend'] = df['volatility_10'].pct_change(5)
df['volatility_regime'] = (df['volatility_10'] > df['volatility_10'].rolling(20).mean()).astype(int)
if 'volume' in df.columns:
df['vol_ma_ratio'] = df['volume'] / df['volume'].rolling(20).mean()
df['vol_price_trend'] = df['vol_ma_ratio'] * df['roc_5']
df['momentum_1m'] = df['Close'].pct_change(20)
df['momentum_3m'] = df['Close'].pct_change(60)
df['momentum_breadth'] = (
(df['roc_5'] > 0).astype(int) +
(df['momentum_1m'] > 0).astype(int) +
(df['momentum_3m'] > 0).astype(int)
) / 3
df['mean_rev_signal'] = -1 * df['dist_ma20'] * df['volatility_10']
df['oversold'] = (df['dist_ma20'] < -2 * df['volatility_10']).astype(int)
df['overbought'] = (df['dist_ma20'] > 2 * df['volatility_10']).astype(int)
df['regime_change'] = (np.sign(df['ma_cross']) != np.sign(df['ma_cross'].shift(1))).astype(int)
df['risk_adj_momentum'] = df['roc_5'] / (df['volatility_10'] + 0.001)
return df
def prepare_stock_data(self, stock_data, ticker, is_training=False):
"""Prepare data for a single stock"""
if len(stock_data) < self.lookback + 5: # Need enough data
return None, None
stock_df = pd.DataFrame({
'Close': stock_data['close'].values,
'time': stock_data['time'].values
})
if 'volume' in stock_data.columns:
stock_df['volume'] = stock_data['volume'].values
stock_df = stock_df.sort_values('time').reset_index(drop=True)
stock_df['pct_return'] = stock_df['Close'].pct_change().shift(-1) * 100
# In prepare_stock_data, replace the feature cols section with:
feature_cols = []
# Add basic lag features
for i in range(1, 6):
col_name = f'return_lag{i}'
stock_df[col_name] = stock_df['pct_return'].shift(i)
feature_cols.append(col_name)
# Add technical features
stock_df = self.add_technical_features(stock_df)
stock_df = self.add_enhanced_features(stock_df)
# Add all potential features
additional_features = ['ma_cross', 'volatility_10', 'roc_5', 'roc_diff', 'dist_ma20']
enhanced_features = ['volatility_trend', 'volatility_regime', 'momentum_1m',
'momentum_breadth', 'mean_rev_signal', 'oversold',
'overbought', 'regime_change', 'risk_adj_momentum']
for col in additional_features + enhanced_features:
if col in stock_df.columns:
feature_cols.append(col)
# Filter to the exact number of features expected by the model
model_feature_count = 5 # Use the exact count from your model
feature_cols = self.filter_features_to_match_model(stock_df, feature_cols, model_feature_count)
if not self.feature_cols:
self.feature_cols = feature_cols.copy()
stock_df = stock_df.dropna().reset_index(drop=True)
# Handle outliers
stock_df = self.clip_outliers(stock_df, feature_cols)
# Replace the scaling code in prepare_stock_data with this:
# Scale features
if ticker not in self.scalers or is_training:
# Check if we have data
if len(stock_df) == 0 or len(feature_cols) == 0:
return None, stock_df # Return early if no data
# Check if any features are empty/nan
if stock_df[feature_cols].isna().any().any() or stock_df[feature_cols].empty:
# Fill NaNs with zeros
stock_df[feature_cols] = stock_df[feature_cols].fillna(0)
# Ensure we have data
if len(stock_df[feature_cols]) > 0:
try:
scaler = RobustScaler()
stock_df[feature_cols] = scaler.fit_transform(stock_df[feature_cols])
self.scalers[ticker] = scaler
except Exception as e:
print(f"Scaling error for {ticker}: {str(e)}")
# Use a simple standardization as fallback
for col in feature_cols:
mean = stock_df[col].mean()
std = stock_df[col].std()
if std > 0:
stock_df[col] = (stock_df[col] - mean) / std
else:
stock_df[col] = 0
else:
return None, stock_df # Return early if empty after processing
else:
# Use existing scaler
scaler = self.scalers[ticker]
try:
stock_df[feature_cols] = scaler.transform(stock_df[feature_cols])
except Exception as e:
print(f"Transform error for {ticker}: {str(e)}")
# Simple standardization fallback
for col in feature_cols:
if col in stock_df.columns and len(stock_df[col]) > 0:
mean = stock_df[col].mean()
std = stock_df[col].std()
if std > 0:
stock_df[col] = (stock_df[col] - mean) / std
else:
stock_df[col] = 0
# Create sequences for prediction
X = self.create_sliding_sequences(stock_df, feature_cols, self.lookback, stride=1)
if len(X) == 0:
return None, stock_df
return X, stock_df
# Add to strategy.py in KQTStrategy class
def calculate_portfolio_risk_score(self, market_returns):
"""Calculate a portfolio risk score (0-100) to scale overall exposure"""
risk_score = 50 # Neutral starting point
# VIX-like volatility measurement using SPY returns
if len(market_returns) >= 5:
recent_vol = np.std(market_returns[-5:]) * np.sqrt(252) # Annualized
longer_vol = np.std(market_returns[-10:]) * np.sqrt(252) if len(market_returns) >= 10 else recent_vol
# Volatility spike detection
vol_ratio = recent_vol / longer_vol if longer_vol > 0 else 1
if vol_ratio > 1.5: # Sharp volatility increase
risk_score -= 30
elif vol_ratio > 1.2:
risk_score -= 15
# Consecutive negative days
if len(market_returns) >= 3:
neg_days = sum(1 for r in market_returns[-3:] if r < 0)
if neg_days == 3: # Three consecutive down days
risk_score -= 20
elif neg_days == 2:
risk_score -= 10
# Trend direction
if len(market_returns) >= 10:
avg_recent = np.mean(market_returns[-5:])
avg_older = np.mean(market_returns[-10:-5])
trend_change = avg_recent - avg_older
# Declining trend
if trend_change < -0.3:
risk_score -= 15
# Accelerating uptrend
elif trend_change > 0.3 and avg_recent > 0:
risk_score += 10
return max(10, min(100, risk_score)) # Constrain between 10-100
def predict_returns(self, X, ticker):
"""Make predictions for a single stock"""
if self.model is None:
return 0
if ticker not in self.stock_to_id:
self.stock_to_id[ticker] = len(self.stock_to_id)
stock_id = self.stock_to_id[ticker]
try:
X_tensor = torch.tensor(X, dtype=torch.float32)
stock_ids = torch.tensor([stock_id] * len(X), dtype=torch.long)
with torch.no_grad():
predictions = self.model(X_tensor, stock_ids)
# Convert to standard Python float for safety
return float(predictions.detach().numpy().flatten()[-1])
except Exception as e:
print(f"Prediction error for {ticker}: {e}")
return 0 # Return neutral prediction on error
def adjust_for_market_conditions(self, positions, spy_sma_signal, defensive_mode):
"""Adjust positions based on market conditions from SPY moving average"""
adjusted_positions = positions.copy()
# If we're below SPY SMA, reduce position sizing
if spy_sma_signal < 0 or defensive_mode:
# Calculate how far below SMA we are (0 to 1 scale)
reduction_factor = min(0.5, max(0.2, abs(spy_sma_signal))) if spy_sma_signal < 0 else 0.25
# Reduce all position sizes
for ticker in adjusted_positions:
adjusted_positions[ticker] *= (1 - reduction_factor)
return adjusted_positions
def update_stopped_out_tickers(self, stopped_out_today):
"""Update the list of recently stopped out tickers"""
# Add new stopped out tickers to the list
self.previous_day_hit_stops.extend(stopped_out_today)
# Keep only the most recent 10 stopped out tickers
if len(self.previous_day_hit_stops) > 10:
self.previous_day_hit_stops = self.previous_day_hit_stops[-10:]
def get_stop_loss_level(self):
"""Get appropriate stop-loss level based on market regime"""
# Use fixed 5% stop-loss from Drawdownminimize.py
return -5.0
def check_stop_losses(self, symbol_data):
"""Check if any positions should be stopped out based on 5% drop from entry"""
# This would be called from the algorithm with current price data
# Return list of tickers that hit stop-loss
stopped_out = []
for symbol, price_data in symbol_data.items():
if symbol in self.entry_prices:
entry_price = self.entry_prices[symbol]
current_price = price_data['current_price']
# 5% drop from entry
if (entry_price - current_price) / entry_price >= 0.05:
stopped_out.append(symbol)
return stopped_out
def detect_market_regime(self, daily_returns, lookback=10, spy_ma_signal=0):
"""Detect current market regime based on portfolio returns and SPY MA"""
# Make regime detection more sensitive to negative SPY MA signals
# CRITICAL CHANGE: Immediately shift to bearish on SPY below MA
if spy_ma_signal < 0:
# How far below MA determines the severity
if spy_ma_signal < -0.05: # More than 5% below MA
return "bearish_high_vol"
elif spy_ma_signal < -0.02: # More than 2% below MA
return "bearish"
else:
# Slight MA crossover shifts neutral regimes bearish and bullish to neutral
if self.current_regime == "bullish":
return "neutral"
elif self.current_regime == "neutral":
return "bearish_low_vol"
if len(daily_returns) >= 1:
market_return = np.mean(daily_returns)
market_vol = np.std(daily_returns)
# Incorporate SPY moving average signal (negative when below MA)
if spy_ma_signal < -0.02: # SPY at least 2% below 30-day MA
if self.current_regime == "neutral":
return "bearish"
elif self.current_regime in ["bullish", "bullish_pullback"]:
return "neutral"
elif self.current_regime == "bearish":
return "bearish_high_vol" # Increase bearish conviction
if len(self.portfolio_returns) >= 3:
recent_returns = self.portfolio_returns[-min(lookback, len(self.portfolio_returns)):]
avg_recent_return = np.mean(recent_returns)
if len(self.portfolio_returns) >= 5:
very_recent = np.mean(self.portfolio_returns[-3:])
less_recent = np.mean(self.portfolio_returns[-min(8, len(self.portfolio_returns)):-3])
trend_change = very_recent - less_recent
if trend_change > 0.5 and avg_recent_return > 0.2:
return "breakout_bullish"
elif trend_change < -0.5 and avg_recent_return < -0.2:
return "breakdown_bearish"
if avg_recent_return > 0.15:
if market_return > 0:
return "bullish_strong"
else:
return "bullish_pullback"
elif avg_recent_return < -0.3:
if market_return < -0.2:
return "bearish_high_vol"
else:
return "bearish_low_vol"
elif avg_recent_return > 0 and market_return > 0:
return "bullish"
elif avg_recent_return < 0 and market_return < 0:
return "bearish"
if market_return > -0.05:
return "neutral"
else:
return "bearish"
return "neutral"
def detect_bearish_signals(self, recent_returns):
"""Detect early warning signs of bearish conditions"""
bearish_signals = 0
signal_strength = 0
if len(self.portfolio_returns) >= 5:
recent_portfolio_returns = self.portfolio_returns[-5:]
pos_days = sum(1 for r in recent_portfolio_returns if r > 0)
neg_days = sum(1 for r in recent_portfolio_returns if r < 0)
if neg_days > pos_days:
bearish_signals += 1
signal_strength += 0.2 * (neg_days - pos_days)
if len(self.portfolio_returns) >= 10:
recent_vol = np.std(self.portfolio_returns[-5:])
older_vol = np.std(self.portfolio_returns[-10:-5])
if recent_vol > older_vol * 1.3: # 30% volatility increase
bearish_signals += 1
signal_strength += 0.3 * (recent_vol/older_vol - 1)
if len(self.portfolio_returns) >= 5:
if self.portfolio_returns[-1] < 0 and self.portfolio_returns[-2] > 0.3:
bearish_signals += 1
signal_strength += 0.3
return bearish_signals, signal_strength
def generate_positions(self, prediction_data, current_returns=None, spy_sma_signal=0):
"""Generate position sizing based on predictions with improved defensive capabilities"""
if not prediction_data:
return {}
# Update market regime - now including SPY MA signal in the detection
if current_returns is not None:
self.current_regime = self.detect_market_regime(current_returns, 10, spy_sma_signal)
bearish_count, bearish_strength = self.detect_bearish_signals(current_returns)
# Set defensive mode if bearish signals or below SPY MA
self.defensive_mode = (bearish_count >= 1 or # Reduced from 2 to 1
bearish_strength > 0.3 or # Reduced from 0.5 to 0.3
spy_sma_signal < 0) # ANY negative signal from MA
# Calculate portfolio risk score (0-100)
portfolio_risk_score = self.calculate_portfolio_risk_score(current_returns if current_returns else [])
# Convert to a scaling factor (0.1 to 1.0)
risk_scaling = portfolio_risk_score / 100
# Adjust threshold based on regime
base_threshold = 0.25 * self.pred_std
if self.current_regime in ["bullish_strong", "breakout_bullish"]:
self.adaptive_threshold = base_threshold * 0.4
elif self.current_regime in ["bearish_high_vol", "breakdown_bearish"]:
self.adaptive_threshold = base_threshold * 2.5
elif self.current_regime in ["bearish", "bearish_low_vol"]:
self.adaptive_threshold = base_threshold * 1.6
elif self.current_regime in ["bullish_pullback"]:
self.adaptive_threshold = base_threshold * 0.9
else: # neutral or other regimes
self.adaptive_threshold = base_threshold * 0.75
positions = {}
# Group stocks by sector
sector_data = {}
for ticker, data in prediction_data.items():
pred_return = data["pred_return"]
sector = self.sector_mappings.get(ticker, "Unknown")
if sector not in sector_data:
sector_data[sector] = []
sector_data[sector].append({
"ticker": ticker,
"pred_return": pred_return,
"composite_score": pred_return / self.adaptive_threshold
})
# Rank sectors by predicted return
sector_avg_scores = {}
for sector, stocks in sector_data.items():
sector_avg_scores[sector] = np.mean([s["pred_return"] for s in stocks])
# CHANGE: Include more sectors (3-4 instead of just 2)
ranked_sectors = sorted(sector_avg_scores.keys(), key=lambda x: sector_avg_scores[x], reverse=True)
top_sector_count = 3 if portfolio_risk_score > 60 else 2 # More diversification in lower risk periods
top_sectors = ranked_sectors[:min(top_sector_count, len(ranked_sectors))]
# CHANGE: Allow more stocks per sector in bull markets
stocks_per_sector = 3 if self.current_regime in ["bullish_strong", "breakout_bullish"] else 2
# Allocate within top sectors - focus on stocks with strongest signals
for sector in top_sectors:
sector_stocks = sorted(sector_data[sector], key=lambda x: x["pred_return"], reverse=True)
# Take top N stocks in each selected sector
top_stocks = sector_stocks[:min(stocks_per_sector, len(sector_stocks))]
# CHANGE: Make position size proportional to signal strength but limited by volatility
for stock in top_stocks:
ticker = stock["ticker"]
signal_strength = stock["pred_return"] / (0.2 * self.pred_std)
# Base size calculation
base_size = min(0.3, max(0.05, 0.15 * signal_strength))
# Scale by portfolio risk
final_size = base_size * risk_scaling
positions[ticker] = final_size
# Apply unified defensive logic at the end (remove redundant sections)
if self.defensive_mode or self.current_regime in ["bearish_high_vol", "bearish_low_vol", "breakdown_bearish"] or spy_sma_signal < 0:
# STRONGER SCALING: Adjust scaling based on severity
if spy_sma_signal < -0.05 or self.current_regime == "bearish_high_vol":
scaling_factor = 0.3 # Reduced from 0.4 to 0.3
elif spy_sma_signal < -0.02 or self.current_regime == "bearish_low_vol":
scaling_factor = 0.5 # Reduced from 0.6 to 0.5
else:
scaling_factor = 0.7
for ticker in positions:
positions[ticker] *= scaling_factor
# Remove tickers that recently hit stop losses
for ticker in list(positions.keys()):
if ticker in self.previous_day_hit_stops:
positions.pop(ticker, None)
return positions
def update_portfolio_returns(self, daily_return):
"""Update portfolio return history"""
self.portfolio_returns.append(daily_return)
if len(self.portfolio_returns) > 60: # Keep a rolling window
self.portfolio_returns = self.portfolio_returns[-60:]
def update_model_calibration(self, all_predictions):
"""Update prediction standard deviation for threshold calibration"""
all_pred_values = [p for p in all_predictions.values()]
if len(all_pred_values) > 5:
self.pred_std = np.std(all_pred_values)
def calculate_bil_allocation(self, spy_price, sma_30):
"""Calculate BIL allocation based on SPY price and SMA."""
bill_drop_intensity = (sma_30 - spy_price) / sma_30
if spy_price < sma_30:
bil_weight = min(bill_drop_intensity * 1.5, 1.0) # Stronger response
if bill_drop_intensity > 0.05: # SPY >5% below MA
bil_weight = min(0.7, bil_weight * 1.3) # Even stronger response
else:
bil_weight = 0.0
return bil_weight