| Overall Statistics |  |
| --- | --- |
| Total Orders | 5798 |
| Average Win | 0.43% |
| Average Loss | -0.67% |
| Compounding Annual Return | 40.508% |
| Drawdown | 43.300% |
| Expectancy | 0.301 |
| Start Equity | 1000000 |
| End Equity | 7663994.39 |
| Net Profit | 666.399% |
| Sharpe Ratio | 1.025 |
| Sortino Ratio | 1.134 |
| Probabilistic Sharpe Ratio | 47.001% |
| Loss Rate | 21% |
| Win Rate | 79% |
| Profit-Loss Ratio | 0.64 |
| Alpha | 0.122 |
| Beta | 1.535 |
| Annual Standard Deviation | 0.282 |
| Annual Variance | 0.08 |
| Information Ratio | 1.179 |
| Tracking Error | 0.153 |
| Treynor Ratio | 0.188 |
| Total Fees | $13482.43 |
| Estimated Strategy Capacity | $0 |
| Lowest Capacity Asset | NB R735QTJ8XC9X |
| Portfolio Turnover | 2.76% |
# region imports
from AlgorithmImports import *
# endregion
# QuantConnect Research Notebook Environment
from QuantConnect import *
from QuantConnect.Data.Market import *
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from sklearn.preprocessing import RobustScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')
import math
import gc
import os
# Memory hygiene helper for long research sessions
def free_memory():
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
# Set seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)
class DirectionalAccuracyLoss(nn.Module):
def __init__(self, alpha=0.5):
super().__init__()
self.alpha = alpha
self.mse = nn.MSELoss()
def forward(self, pred, target):
mse_loss = self.mse(pred, target)
direction_loss = torch.mean((torch.sign(pred) != torch.sign(target)).float())
return self.alpha * mse_loss + (1 - self.alpha) * direction_loss
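# Hedged usage sketch (illustrative numbers, not part of the strategy): the loss
# blends squared error with the fraction of sign mismatches, so a forecast with
# the right direction but imperfect magnitude is penalized less than one that
# gets the direction wrong. Here one of three signs disagrees, so the direction
# term contributes 0.5 * (1/3) on top of 0.5 * MSE.
if __name__ == "__main__":
    _loss_fn = DirectionalAccuracyLoss(alpha=0.5)
    _pred = torch.tensor([0.5, -0.2, 0.1])
    _target = torch.tensor([0.4, -0.3, -0.1])
    print(_loss_fn(_pred, _target).item())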
class QuaternionLinear(nn.Module):
def __init__(self, in_features, out_features, bias=True):
super().__init__()
assert in_features % 4 == 0, f"in_features must be divisible by 4, got {in_features}"
real_in_features = in_features // 4
real_out_features = out_features // 4 if out_features % 4 == 0 else out_features
self.r_weight = nn.Parameter(torch.Tensor(real_out_features, real_in_features))
self.i_weight = nn.Parameter(torch.Tensor(real_out_features, real_in_features))
self.j_weight = nn.Parameter(torch.Tensor(real_out_features, real_in_features))
self.k_weight = nn.Parameter(torch.Tensor(real_out_features, real_in_features))
if bias:
self.r_bias = nn.Parameter(torch.Tensor(real_out_features))
self.i_bias = nn.Parameter(torch.Tensor(real_out_features))
self.j_bias = nn.Parameter(torch.Tensor(real_out_features))
self.k_bias = nn.Parameter(torch.Tensor(real_out_features))
else:
self.register_parameter('r_bias', None)
self.register_parameter('i_bias', None)
self.register_parameter('j_bias', None)
self.register_parameter('k_bias', None)
self.bias = bias
self.real_out_features = real_out_features
self.out_features = real_out_features * 4 if out_features % 4 == 0 else out_features
self.reset_parameters()
def reset_parameters(self):
nn.init.kaiming_uniform_(self.r_weight, a=math.sqrt(5))
nn.init.kaiming_uniform_(self.i_weight, a=math.sqrt(5))
nn.init.kaiming_uniform_(self.j_weight, a=math.sqrt(5))
nn.init.kaiming_uniform_(self.k_weight, a=math.sqrt(5))
if self.bias:
fan_in = self.r_weight.size(1)
bound = 1 / math.sqrt(fan_in)
nn.init.uniform_(self.r_bias, -bound, bound)
nn.init.uniform_(self.i_bias, -bound, bound)
nn.init.uniform_(self.j_bias, -bound, bound)
nn.init.uniform_(self.k_bias, -bound, bound)
def forward(self, input):
batch_dims = input.shape[:-1]
input_reshaped = input.reshape(-1, input.shape[-1])
r, i, j, k = torch.chunk(input_reshaped, 4, dim=-1)
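        # The four blocks below implement a quaternion (Hamilton) product between
        # the input components (r, i, j, k) and the weight quaternion: the real
        # part is r*r - i*i - j*j - k*k, and the i, j, k parts follow the usual
        # quaternion sign pattern.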
out_r = F.linear(r, self.r_weight) - F.linear(i, self.i_weight) - \
F.linear(j, self.j_weight) - F.linear(k, self.k_weight)
out_i = F.linear(r, self.i_weight) + F.linear(i, self.r_weight) + \
F.linear(j, self.k_weight) - F.linear(k, self.j_weight)
out_j = F.linear(r, self.j_weight) - F.linear(i, self.k_weight) + \
F.linear(j, self.r_weight) + F.linear(k, self.i_weight)
out_k = F.linear(r, self.k_weight) + F.linear(i, self.j_weight) - \
F.linear(j, self.i_weight) + F.linear(k, self.r_weight)
if self.bias:
out_r += self.r_bias
out_i += self.i_bias
out_j += self.j_bias
out_k += self.k_bias
out = torch.cat([out_r, out_i, out_j, out_k], dim=-1)
if self.out_features != self.real_out_features * 4:
out = out[:, :self.out_features]
return out.reshape(*batch_dims, -1)
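# Hedged shape check (example sizes only): the feature dimension is treated as
# four interleaved quaternion components, so in_features must be divisible by 4.
if __name__ == "__main__":
    _qlin = QuaternionLinear(in_features=16, out_features=32)
    print(_qlin(torch.randn(8, 16)).shape)  # expected: torch.Size([8, 32])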
# Koopman operator: encode the state into observables, advance them with a
# learned linear operator, then decode back to state space
class KoopmanOperator(nn.Module):
def __init__(self, state_dim, koopman_dim):
super().__init__()
self.state_dim = state_dim
self.padded_state_dim = ((state_dim + 3) // 4) * 4
self.koopman_dim = koopman_dim
self.padded_koopman_dim = ((koopman_dim + 3) // 4) * 4
self.encoder = nn.Sequential(
QuaternionLinear(self.padded_state_dim, self.padded_koopman_dim * 2),
nn.LeakyReLU(0.2),
QuaternionLinear(self.padded_koopman_dim * 2, self.padded_koopman_dim)
)
self.koopman_matrix = nn.Parameter(torch.Tensor(self.padded_koopman_dim, self.padded_koopman_dim))
self.decoder = nn.Sequential(
QuaternionLinear(self.padded_koopman_dim, self.padded_koopman_dim * 2),
nn.LeakyReLU(0.2),
QuaternionLinear(self.padded_koopman_dim * 2, self.padded_state_dim)
)
nn.init.eye_(self.koopman_matrix)
self.koopman_matrix.data += 0.01 * torch.randn_like(self.koopman_matrix)
def forward(self, x, steps=1):
orig_shape = x.shape
batch_dims = orig_shape[:-1]
if self.state_dim != self.padded_state_dim:
padding = torch.zeros(*batch_dims, self.padded_state_dim - self.state_dim,
device=x.device, dtype=x.dtype)
x_padded = torch.cat([x, padding], dim=-1)
else:
x_padded = x
if len(batch_dims) > 1:
x_padded = x_padded.reshape(-1, self.padded_state_dim)
g = self.encoder(x_padded)
if steps == 1:
g_next = torch.matmul(g, self.koopman_matrix)
else:
koopman_power = torch.matrix_power(self.koopman_matrix, steps)
g_next = torch.matmul(g, koopman_power)
x_pred = self.decoder(g_next)
if self.state_dim != self.padded_state_dim:
x_pred = x_pred[..., :self.state_dim]
if len(batch_dims) > 1:
x_pred = x_pred.reshape(*batch_dims, -1)
g = g.reshape(*batch_dims, -1)
g_next = g_next.reshape(*batch_dims, -1)
return x_pred, g, g_next
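# Hedged sketch (example dimensions only): the operator advances encoded
# observables one step with K, or several steps via torch.matrix_power(K, n).
if __name__ == "__main__":
    _koop = KoopmanOperator(state_dim=10, koopman_dim=8)
    _state = torch.randn(4, 10)
    _x1, _g, _g_next = _koop(_state, steps=1)   # single application of K
    _x3, _, _ = _koop(_state, steps=3)          # K cubed
    print(_x1.shape, _x3.shape)                 # both torch.Size([4, 10])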
# Cross-stock attention: each sequence's final hidden state attends to related
# stocks in the same batch
class CrossStockAttention(nn.Module):
def __init__(self, hidden_size, dropout=0.1):
super().__init__()
self.hidden_size = hidden_size
self.query = nn.Linear(hidden_size, hidden_size)
self.key = nn.Linear(hidden_size, hidden_size)
self.value = nn.Linear(hidden_size, hidden_size)
self.output = nn.Linear(hidden_size, hidden_size)
self.dropout = nn.Dropout(dropout)
self.norm = nn.LayerNorm(hidden_size)
self.scaling = hidden_size ** -0.5
def forward(self, x, stock_ids):
batch_size, seq_len, _ = x.shape
last_states = x[:, -1, :] # [batch_size, hidden_size]
q = self.query(last_states)
k = self.key(last_states)
v = self.value(last_states)
mask = stock_ids.unsqueeze(1) == stock_ids.unsqueeze(0)
mask = mask.float().to(x.device)
# Compute attention scores with masking
scores = torch.matmul(q, k.transpose(0, 1)) * self.scaling
scores = scores * mask + -1e9 * (1 - mask) # Apply mask
attn_weights = F.softmax(scores, dim=-1)
attn_weights = self.dropout(attn_weights)
context = torch.matmul(attn_weights, v)
output = self.output(context)
output = self.norm(output + last_states)
return output
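# Hedged sketch (toy batch): stock_ids controls the attention mask, so rows
# sharing an id attend to each other while other pairs are masked out.
if __name__ == "__main__":
    _xattn = CrossStockAttention(hidden_size=16)
    _xattn.eval()                               # disable dropout for a deterministic check
    _hidden = torch.randn(6, 30, 16)            # 6 sequences of 30 steps
    _ids = torch.tensor([0, 0, 1, 1, 2, 2])     # two sequences per stock
    print(_xattn(_hidden, _ids).shape)          # torch.Size([6, 16])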
# EnhancedKQT: embedding + positional encoding -> transformer encoder ->
# cross-stock attention -> Koopman operator -> output head
class EnhancedKQT(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, dropout=0.1, nhead=4):
super().__init__()
self.input_size = input_size
        # Keep the internal width a multiple of 4 so quaternion layers divide evenly
        self.hidden_size = hidden_size * 4
        # Koopman width; must match the dimensions of any pre-trained weights loaded later
        self.koopman_dim = max(1024, self.hidden_size // 4)
# Simpler embedding
self.embedding = nn.Sequential(
nn.Linear(input_size, self.hidden_size),
nn.GELU(),
nn.Dropout(dropout)
)
        # Learned positional encoding, tiled at inference when the sequence exceeds 32 steps
self.pos_encoding = nn.Parameter(torch.randn(1, 32, self.hidden_size) * 0.02)
# Smaller transformer
self.transformer_layers = nn.ModuleList([
TransformerEncoderLayer(self.hidden_size, nhead, self.hidden_size, dropout)
for _ in range(1) # Only use 1 layer
])
self.cross_attn = CrossStockAttention(self.hidden_size, dropout)
self.koopman = KoopmanOperator(self.hidden_size, self.koopman_dim)
self.output_proj = nn.Linear(self.hidden_size, 1)
def forward(self, x, stock_ids=None):
batch_size, seq_len, _ = x.shape
# Simpler embedding
x = self.embedding(x)
# Add positional encoding
pos_enc = self.pos_encoding
if seq_len > pos_enc.size(1):
repeat_factor = (seq_len // pos_enc.size(1)) + 1
pos_enc = pos_enc.repeat(1, repeat_factor, 1)[:, :seq_len, :]
else:
pos_enc = pos_enc[:, :seq_len, :]
x = x + pos_enc
# Apply transformer layers
for layer in self.transformer_layers:
x = layer(x)
# Get final sequence representation
last_hidden = x[:, -1, :]
# Apply cross-stock attention if stock_ids are provided
if stock_ids is not None:
cross_attended = self.cross_attn(x, stock_ids)
# Weighted combination instead of simple addition
alpha = 0.7 # Weight for original representation
last_hidden = alpha * last_hidden + (1-alpha) * cross_attended
# Apply Koopman operator (standard, not multi-scale)
pred_state, _, _ = self.koopman(last_hidden)
# Project to output
return self.output_proj(pred_state)
class QuaternionAttention(nn.Module):
def __init__(self, embed_dim, num_heads, dropout=0.1):
super().__init__()
self.embed_dim = embed_dim
self.padded_embed_dim = ((embed_dim + 3) // 4) * 4
self.num_heads = num_heads
        # Per-head width rounded down to a multiple of 4; falls back to 4 if the rounding reaches zero
        self.head_dim = (self.padded_embed_dim // (num_heads * 4)) * 4 or 4
self.qk_dim = self.head_dim * num_heads
self.v_dim = self.padded_embed_dim
self.q_proj = QuaternionLinear(self.padded_embed_dim, self.qk_dim)
self.k_proj = QuaternionLinear(self.padded_embed_dim, self.qk_dim)
self.v_proj = QuaternionLinear(self.padded_embed_dim, self.v_dim)
self.out_proj = nn.Linear(self.v_dim, embed_dim)
self.dropout = nn.Dropout(dropout)
self.scaling = self.head_dim ** -0.5
def forward(self, query, key=None, value=None, attn_mask=None):
if key is None:
key = query
if value is None:
value = query
batch_size, seq_len, _ = query.shape
if query.size(-1) != self.padded_embed_dim:
padding = torch.zeros(batch_size, seq_len,
self.padded_embed_dim - query.size(-1),
device=query.device, dtype=query.dtype)
query_padded = torch.cat([query, padding], dim=-1)
key_padded = torch.cat([key, padding], dim=-1)
value_padded = torch.cat([value, padding], dim=-1)
else:
query_padded, key_padded, value_padded = query, key, value
q = self.q_proj(query_padded).view(batch_size, seq_len, self.num_heads, -1).transpose(1, 2)
k = self.k_proj(key_padded).view(batch_size, seq_len, self.num_heads, -1).transpose(1, 2)
v = self.v_proj(value_padded).view(batch_size, seq_len, self.num_heads, -1).transpose(1, 2)
scores = torch.matmul(q, k.transpose(-2, -1)) * self.scaling
if attn_mask is not None:
scores = scores.masked_fill(attn_mask, float('-inf'))
attn_weights = F.softmax(scores, dim=-1)
attn_weights = self.dropout(attn_weights)
output = torch.matmul(attn_weights, v).transpose(1, 2).contiguous().view(batch_size, seq_len, -1)
return self.out_proj(output)
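# Hedged shape check (example sizes only): multi-head attention whose q/k/v
# projections are quaternion linear maps; the result is projected back to embed_dim.
if __name__ == "__main__":
    _qattn = QuaternionAttention(embed_dim=16, num_heads=2)
    _qattn.eval()
    print(_qattn(torch.randn(2, 10, 16)).shape)  # torch.Size([2, 10, 16])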
class TransformerEncoderLayer(nn.Module):
def __init__(self, d_model, nhead, dim_feedforward, dropout=0.1):
super().__init__()
self.self_attn = QuaternionAttention(d_model, nhead, dropout)
self.linear1 = nn.Linear(d_model, dim_feedforward)
self.linear2 = nn.Linear(dim_feedforward, d_model)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
self.dropout1 = nn.Dropout(dropout)
self.dropout2 = nn.Dropout(dropout)
self.activation = nn.GELU()
def forward(self, src, src_mask=None):
src2 = self.self_attn(src)
src = src + self.dropout1(src2)
src = self.norm1(src)
src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
src = src + self.dropout2(src2)
return self.norm2(src)
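# Hedged end-to-end sketch (toy sizes): EnhancedKQT is instantiated here, after
# TransformerEncoderLayer is defined, since its __init__ resolves that class at
# call time. Three 30-day windows of 5 features each map to one prediction per stock.
if __name__ == "__main__":
    _model = EnhancedKQT(input_size=5, hidden_size=1, num_layers=2)
    _model.eval()
    with torch.no_grad():
        _out = _model(torch.randn(3, 30, 5), torch.tensor([0, 1, 2]))
    print(_out.shape)  # torch.Size([3, 1])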
class SectorAwareCrossAttention(nn.Module):
def __init__(self, hidden_size, dropout=0.1):
super().__init__()
self.hidden_size = hidden_size
self.query = nn.Linear(hidden_size, hidden_size)
self.key = nn.Linear(hidden_size, hidden_size)
self.value = nn.Linear(hidden_size, hidden_size)
self.output = nn.Linear(hidden_size, hidden_size)
self.dropout = nn.Dropout(dropout)
self.norm = nn.LayerNorm(hidden_size)
self.scaling = hidden_size ** -0.5
def forward(self, x, stock_ids):
batch_size, seq_len, _ = x.shape
last_states = x[:, -1, :] # [batch_size, hidden_size]
q = self.query(last_states)
k = self.key(last_states)
v = self.value(last_states)
scores = torch.matmul(q, k.transpose(0, 1)) * self.scaling
attn_weights = F.softmax(scores, dim=-1)
attn_weights = self.dropout(attn_weights)
context = torch.matmul(attn_weights, v)
output = self.output(context)
output = self.norm(output + last_states)
return output
class MultiScaleKoopmanOperator(nn.Module):
def __init__(self, state_dim, koopman_dim):
super().__init__()
self.state_dim = state_dim
self.padded_state_dim = ((state_dim + 3) // 4) * 4
self.koopman_dim = koopman_dim
self.padded_koopman_dim = ((koopman_dim + 3) // 4) * 4
self.encoder = nn.Sequential(
QuaternionLinear(self.padded_state_dim, self.padded_koopman_dim * 2),
nn.LeakyReLU(0.2),
nn.LayerNorm(self.padded_koopman_dim * 2),
QuaternionLinear(self.padded_koopman_dim * 2, self.padded_koopman_dim)
)
self.koopman_matrix_fast = nn.Parameter(torch.Tensor(self.padded_koopman_dim, self.padded_koopman_dim))
self.koopman_matrix_medium = nn.Parameter(torch.Tensor(self.padded_koopman_dim, self.padded_koopman_dim))
self.koopman_matrix_slow = nn.Parameter(torch.Tensor(self.padded_koopman_dim, self.padded_koopman_dim))
self.decoder = nn.Sequential(
QuaternionLinear(self.padded_koopman_dim, self.padded_koopman_dim * 2),
nn.LeakyReLU(0.2),
nn.LayerNorm(self.padded_koopman_dim * 2),
QuaternionLinear(self.padded_koopman_dim * 2, self.padded_state_dim)
)
nn.init.eye_(self.koopman_matrix_fast)
nn.init.eye_(self.koopman_matrix_medium)
nn.init.eye_(self.koopman_matrix_slow)
self.koopman_matrix_fast.data += 0.015 * torch.randn_like(self.koopman_matrix_fast)
self.koopman_matrix_medium.data += 0.01 * torch.randn_like(self.koopman_matrix_medium)
self.koopman_matrix_slow.data += 0.005 * torch.randn_like(self.koopman_matrix_slow)
self.timescale_blend = nn.Parameter(torch.ones(3) / 3)
def forward(self, x, steps=1):
orig_shape = x.shape
batch_dims = orig_shape[:-1]
# Pad input if needed
if self.state_dim != self.padded_state_dim:
padding = torch.zeros(*batch_dims, self.padded_state_dim - self.state_dim,
device=x.device, dtype=x.dtype)
x_padded = torch.cat([x, padding], dim=-1)
else:
x_padded = x
if len(batch_dims) > 1:
x_padded = x_padded.reshape(-1, self.padded_state_dim)
g = self.encoder(x_padded)
if steps == 1:
g_next_fast = torch.matmul(g, self.koopman_matrix_fast)
g_next_medium = torch.matmul(g, self.koopman_matrix_medium)
g_next_slow = torch.matmul(g, self.koopman_matrix_slow)
else:
koopman_power_fast = torch.matrix_power(self.koopman_matrix_fast, steps)
koopman_power_medium = torch.matrix_power(self.koopman_matrix_medium, max(1, steps // 2))
koopman_power_slow = torch.matrix_power(self.koopman_matrix_slow, max(1, steps // 4))
g_next_fast = torch.matmul(g, koopman_power_fast)
g_next_medium = torch.matmul(g, koopman_power_medium)
g_next_slow = torch.matmul(g, koopman_power_slow)
blend_weights = F.softmax(self.timescale_blend, dim=0)
g_next = blend_weights[0] * g_next_fast + blend_weights[1] * g_next_medium + blend_weights[2] * g_next_slow
x_pred = self.decoder(g_next)
if self.state_dim != self.padded_state_dim:
x_pred = x_pred[..., :self.state_dim]
if len(batch_dims) > 1:
x_pred = x_pred.reshape(*batch_dims, -1)
g = g.reshape(*batch_dims, -1)
g_next = g_next.reshape(*batch_dims, -1)
return x_pred, g, g_next
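# Hedged sketch (example dimensions): three Koopman matrices evolve the same
# observables at fast, medium, and slow rates; a learned softmax blends them.
# With steps=4 the fast matrix is raised to the 4th power, medium to the 2nd,
# and slow to the 1st.
if __name__ == "__main__":
    _mkoop = MultiScaleKoopmanOperator(state_dim=10, koopman_dim=8)
    _x_pred, _g, _g_next = _mkoop(torch.randn(4, 10), steps=4)
    print(_x_pred.shape)  # torch.Size([4, 10])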
# region imports
from AlgorithmImports import *
# endregion
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data import *
from QuantConnect.Indicators import *
from datetime import timedelta
import numpy as np
import pandas as pd
import torch
import os
import torch.nn as nn
from sklearn.preprocessing import RobustScaler
from classes import DirectionalAccuracyLoss, QuaternionLinear, KoopmanOperator, EnhancedKQT, TransformerEncoderLayer, SectorAwareCrossAttention, CrossStockAttention, QuaternionAttention, MultiScaleKoopmanOperator
class KQTStrategy:
def __init__(self):
self.model = None
self.lookback = 30
self.scalers = {}
self.feature_cols = []
self.stock_to_id = {}
        self.sector_mappings = {
            # Technology
            "AAPL": "Technology", "MSFT": "Technology", "NVDA": "Technology",
"GOOGL": "Technology", "GOOG": "Technology", "META": "Technology",
"CSCO": "Technology", "ADBE": "Technology", "INTC": "Technology",
"AMD": "Technology", "ORCL": "Technology", "IBM": "Technology",
"QCOM": "Technology", "AMAT": "Technology", "CRM": "Technology",
"AMZN": "Consumer", "TSLA": "Consumer", "HD": "Consumer",
"MCD": "Consumer", "SBUX": "Consumer", "NKE": "Consumer",
"WMT": "Consumer", "COST": "Consumer", "TJX": "Consumer",
"LOW": "Consumer", "DIS": "Consumer", "NFLX": "Consumer",
# Financial
"JPM": "Financial", "BRK-B": "Financial", "V": "Financial",
"MA": "Financial", "BAC": "Financial", "GS": "Financial",
"WFC": "Financial", "AXP": "Financial", "MS": "Financial",
"BLK": "Financial", "C": "Financial",
# Healthcare
"UNH": "Healthcare", "JNJ": "Healthcare", "LLY": "Healthcare",
"PFE": "Healthcare", "MRK": "Healthcare", "ABBV": "Healthcare",
"ABT": "Healthcare", "TMO": "Healthcare", "MDT": "Healthcare",
"BMY": "Healthcare", "ISRG": "Healthcare", "REGN": "Healthcare",
# Energy & Industrial
"XOM": "Energy", "CVX": "Energy", "COP": "Energy",
"AVGO": "Industrial", "HON": "Industrial", "UPS": "Industrial",
"CAT": "Industrial", "DE": "Industrial", "BA": "Industrial",
"UNP": "Industrial", "RTX": "Industrial", "GE": "Industrial",
# Other sectors
"PG": "Consumer Staples", "KO": "Consumer Staples", "PEP": "Consumer Staples",
}
self.adaptive_threshold = 0.2
self.pred_std = 1.0
self.current_regime = "neutral"
self.portfolio_returns = []
self.defensive_mode = False
self.previous_day_hit_stops = []
def initialize_model(self, input_size, hidden_size=3, num_layers=2, dropout=0.25, n_heads=6):
self.model = EnhancedKQT(input_size, hidden_size, num_layers, dropout, n_heads)
def init_weights(m):
if isinstance(m, nn.Linear):
nn.init.xavier_uniform_(m.weight)
if m.bias is not None:
nn.init.zeros_(m.bias)
self.model.apply(init_weights)
self.model.eval() # Set to evaluation mode for inference
return self.model
def load_model_weights(self, weights_path):
if self.model is None:
raise ValueError("Model must be initialized before loading weights")
try:
self.model.load_state_dict(torch.load(weights_path))
self.model.eval()
return True
except Exception as e:
print(f"Error loading model weights: {e}")
return False
def save_model_weights(self, weights_path):
if self.model is None:
raise ValueError("Model not initialized, cannot save weights")
try:
torch.save(self.model.state_dict(), weights_path)
return True
except Exception as e:
print(f"Error saving model weights: {e}")
return False
def create_sliding_sequences(self, df, feature_cols, lookback, stride=1):
X = []
for i in range(0, len(df) - lookback + 1, stride):
X.append(df.iloc[i:i+lookback][feature_cols].values.astype(np.float32))
return np.array(X)
def clip_outliers(self, df, cols, lower=0.01, upper=0.99):
df_copy = df.copy()
for col in cols:
if col in df_copy.columns:
q_low = df_copy[col].quantile(lower)
q_high = df_copy[col].quantile(upper)
df_copy.loc[df_copy[col] < q_low, col] = q_low
df_copy.loc[df_copy[col] > q_high, col] = q_high
return df_copy
def filter_features_to_match_model(self, df, feature_cols, required_count=5):
"""Ensure we have exactly the required number of features"""
if len(feature_cols) == required_count:
return feature_cols
# First, prioritize the lag returns (most important)
lag_features = [col for col in feature_cols if 'return_lag' in col]
# Next, add in the most predictive technical features in a fixed order
tech_priority = ['roc_5', 'volatility_10', 'ma_cross', 'dist_ma20', 'momentum_1m',
'oversold', 'overbought', 'roc_diff', 'volatility_regime']
prioritized_features = lag_features.copy()
for feat in tech_priority:
if feat in feature_cols and len(prioritized_features) < required_count:
prioritized_features.append(feat)
# If still not enough, add remaining features
remaining = [col for col in feature_cols if col not in prioritized_features]
while len(prioritized_features) < required_count and remaining:
prioritized_features.append(remaining.pop(0))
# If too many, truncate
return prioritized_features[:required_count]
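    # Worked example (comment only): with feature_cols = ['return_lag1' ...
    # 'return_lag5', 'ma_cross', 'roc_5'] and required_count=5, the five lag
    # features alone fill the budget, so the technical columns are dropped and
    # the lag order is preserved.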
def add_technical_features(self, df):
if 'Close' not in df.columns:
return df
df['ma5'] = df['Close'].rolling(5).mean() / df['Close'] - 1 # Relative to price
df['ma20'] = df['Close'].rolling(20).mean() / df['Close'] - 1
df['ma_cross'] = df['ma5'] - df['ma20'] # Moving average crossover signal
df['volatility_10'] = df['Close'].pct_change().rolling(10).std()
df['volatility_ratio'] = df['Close'].pct_change().rolling(5).std() / df['Close'].pct_change().rolling(20).std()
df['roc_5'] = df['Close'].pct_change(5)
df['roc_10'] = df['Close'].pct_change(10)
df['roc_diff'] = df['roc_5'] - df['roc_10']
df['dist_ma20'] = (df['Close'] / df['Close'].rolling(20).mean() - 1)
return df.fillna(0)
def add_enhanced_features(self, df):
"""Add enhanced technical features"""
df['volatility_trend'] = df['volatility_10'].pct_change(5)
df['volatility_regime'] = (df['volatility_10'] > df['volatility_10'].rolling(20).mean()).astype(int)
if 'volume' in df.columns:
df['vol_ma_ratio'] = df['volume'] / df['volume'].rolling(20).mean()
df['vol_price_trend'] = df['vol_ma_ratio'] * df['roc_5']
df['momentum_1m'] = df['Close'].pct_change(20)
df['momentum_3m'] = df['Close'].pct_change(60)
df['momentum_breadth'] = (
(df['roc_5'] > 0).astype(int) +
(df['momentum_1m'] > 0).astype(int) +
(df['momentum_3m'] > 0).astype(int)
) / 3
df['mean_rev_signal'] = -1 * df['dist_ma20'] * df['volatility_10']
df['oversold'] = (df['dist_ma20'] < -2 * df['volatility_10']).astype(int)
df['overbought'] = (df['dist_ma20'] > 2 * df['volatility_10']).astype(int)
df['regime_change'] = (np.sign(df['ma_cross']) != np.sign(df['ma_cross'].shift(1))).astype(int)
df['risk_adj_momentum'] = df['roc_5'] / (df['volatility_10'] + 0.001)
return df
def prepare_stock_data(self, stock_data, ticker, is_training=False):
"""Prepare data for a single stock"""
if len(stock_data) < self.lookback + 5: # Need enough data
return None, None
stock_df = pd.DataFrame({
'Close': stock_data['close'].values,
'time': stock_data['time'].values
})
if 'volume' in stock_data.columns:
stock_df['volume'] = stock_data['volume'].values
stock_df = stock_df.sort_values('time').reset_index(drop=True)
stock_df['pct_return'] = stock_df['Close'].pct_change().shift(-1) * 100
        # Build the feature set: lagged returns first, then technical features
feature_cols = []
# Add basic lag features
for i in range(1, 6):
col_name = f'return_lag{i}'
stock_df[col_name] = stock_df['pct_return'].shift(i)
feature_cols.append(col_name)
# Add technical features
stock_df = self.add_technical_features(stock_df)
stock_df = self.add_enhanced_features(stock_df)
# Add all potential features
additional_features = ['ma_cross', 'volatility_10', 'roc_5', 'roc_diff', 'dist_ma20']
enhanced_features = ['volatility_trend', 'volatility_regime', 'momentum_1m',
'momentum_breadth', 'mean_rev_signal', 'oversold',
'overbought', 'regime_change', 'risk_adj_momentum']
for col in additional_features + enhanced_features:
if col in stock_df.columns:
feature_cols.append(col)
# Filter to the exact number of features expected by the model
        model_feature_count = 5  # must equal the input feature count the model was trained with
feature_cols = self.filter_features_to_match_model(stock_df, feature_cols, model_feature_count)
if not self.feature_cols:
self.feature_cols = feature_cols.copy()
stock_df = stock_df.dropna().reset_index(drop=True)
# Handle outliers
stock_df = self.clip_outliers(stock_df, feature_cols)
        # Scale features, fitting a fresh scaler when training or on first sight of a ticker
if ticker not in self.scalers or is_training:
# Check if we have data
if len(stock_df) == 0 or len(feature_cols) == 0:
return None, stock_df # Return early if no data
# Check if any features are empty/nan
if stock_df[feature_cols].isna().any().any() or stock_df[feature_cols].empty:
# Fill NaNs with zeros
stock_df[feature_cols] = stock_df[feature_cols].fillna(0)
# Ensure we have data
if len(stock_df[feature_cols]) > 0:
try:
scaler = RobustScaler()
stock_df[feature_cols] = scaler.fit_transform(stock_df[feature_cols])
self.scalers[ticker] = scaler
except Exception as e:
print(f"Scaling error for {ticker}: {str(e)}")
# Use a simple standardization as fallback
for col in feature_cols:
mean = stock_df[col].mean()
std = stock_df[col].std()
if std > 0:
stock_df[col] = (stock_df[col] - mean) / std
else:
stock_df[col] = 0
else:
return None, stock_df # Return early if empty after processing
else:
# Use existing scaler
scaler = self.scalers[ticker]
try:
stock_df[feature_cols] = scaler.transform(stock_df[feature_cols])
except Exception as e:
print(f"Transform error for {ticker}: {str(e)}")
# Simple standardization fallback
for col in feature_cols:
if col in stock_df.columns and len(stock_df[col]) > 0:
mean = stock_df[col].mean()
std = stock_df[col].std()
if std > 0:
stock_df[col] = (stock_df[col] - mean) / std
else:
stock_df[col] = 0
# Create sequences for prediction
X = self.create_sliding_sequences(stock_df, feature_cols, self.lookback, stride=1)
if len(X) == 0:
return None, stock_df
return X, stock_df
def predict_returns(self, X, ticker):
"""Make predictions for a single stock"""
if self.model is None:
return 0
if ticker not in self.stock_to_id:
self.stock_to_id[ticker] = len(self.stock_to_id)
stock_id = self.stock_to_id[ticker]
try:
X_tensor = torch.tensor(X, dtype=torch.float32)
stock_ids = torch.tensor([stock_id] * len(X), dtype=torch.long)
with torch.no_grad():
predictions = self.model(X_tensor, stock_ids)
# Convert to standard Python float for safety
return float(predictions.detach().numpy().flatten()[-1])
except Exception as e:
print(f"Prediction error for {ticker}: {e}")
return 0 # Return neutral prediction on error
def detect_market_regime(self, daily_returns, lookback=10):
"""Detect current market regime based on portfolio returns"""
if len(daily_returns) >= 1:
market_return = np.mean(daily_returns)
market_vol = np.std(daily_returns)
if len(self.portfolio_returns) >= 3:
recent_returns = self.portfolio_returns[-min(lookback, len(self.portfolio_returns)):]
avg_recent_return = np.mean(recent_returns)
if len(self.portfolio_returns) >= 5:
very_recent = np.mean(self.portfolio_returns[-3:])
less_recent = np.mean(self.portfolio_returns[-min(8, len(self.portfolio_returns)):-3])
trend_change = very_recent - less_recent
if trend_change > 0.5 and avg_recent_return > 0.2:
return "breakout_bullish"
elif trend_change < -0.5 and avg_recent_return < -0.2:
return "breakdown_bearish"
if avg_recent_return > 0.15:
if market_return > 0:
return "bullish_strong"
else:
return "bullish_pullback"
elif avg_recent_return < -0.3:
if market_return < -0.2:
return "bearish_high_vol"
else:
return "bearish_low_vol"
elif avg_recent_return > 0 and market_return > 0:
return "bullish"
elif avg_recent_return < 0 and market_return < 0:
return "bearish"
if market_return > -0.05:
return "neutral"
else:
return "bearish"
return "neutral"
def detect_bearish_signals(self, recent_returns):
"""Detect early warning signs of bearish conditions"""
bearish_signals = 0
signal_strength = 0
if len(self.portfolio_returns) >= 5:
recent_portfolio_returns = self.portfolio_returns[-5:]
pos_days = sum(1 for r in recent_portfolio_returns if r > 0)
neg_days = sum(1 for r in recent_portfolio_returns if r < 0)
if neg_days > pos_days:
bearish_signals += 1
signal_strength += 0.2 * (neg_days - pos_days)
if len(self.portfolio_returns) >= 10:
recent_vol = np.std(self.portfolio_returns[-5:])
older_vol = np.std(self.portfolio_returns[-10:-5])
if recent_vol > older_vol * 1.3: # 30% volatility increase
bearish_signals += 1
signal_strength += 0.3 * (recent_vol/older_vol - 1)
if len(self.portfolio_returns) >= 5:
if self.portfolio_returns[-1] < 0 and self.portfolio_returns[-2] > 0.3:
bearish_signals += 1
signal_strength += 0.3
return bearish_signals, signal_strength
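    # Worked example (comment only): if the last five portfolio returns are
    # [-0.4, 0.5, -0.2, -0.6, 0.1], there are 3 down days vs 2 up days, so the
    # breadth check fires (signals = 1, strength = 0.2); a 30%+ rise in 5-day
    # volatility versus the prior five days would add a second signal.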
def generate_positions(self, prediction_data, current_returns=None):
"""Generate position sizing based on predictions"""
if not prediction_data:
return {}
# Update market regime
if current_returns is not None:
self.current_regime = self.detect_market_regime(current_returns)
bearish_count, bearish_strength = self.detect_bearish_signals(current_returns)
self.defensive_mode = bearish_count >= 2 or bearish_strength > 0.5
base_threshold = 0.25 * self.pred_std
if self.current_regime in ["bullish_strong", "breakout_bullish"]:
self.adaptive_threshold = base_threshold * 0.4
elif self.current_regime in ["bearish_high_vol", "breakdown_bearish"]:
self.adaptive_threshold = base_threshold * 2.5
elif self.current_regime in ["bearish", "bearish_low_vol"]:
self.adaptive_threshold = base_threshold * 1.6
elif self.current_regime in ["bullish_pullback"]:
self.adaptive_threshold = base_threshold * 0.9
else: # neutral or other regimes
self.adaptive_threshold = base_threshold * 0.75
positions = {}
sector_data = {}
for ticker, data in prediction_data.items():
pred_return = data["pred_return"]
sector = self.sector_mappings.get(ticker, "Unknown")
if sector not in sector_data:
sector_data[sector] = []
sector_data[sector].append({
"ticker": ticker,
"pred_return": pred_return,
"composite_score": pred_return / self.adaptive_threshold
})
# Rank sectors by predicted return
sector_avg_scores = {}
for sector, stocks in sector_data.items():
sector_avg_scores[sector] = np.mean([s["pred_return"] for s in stocks])
ranked_sectors = sorted(sector_avg_scores.keys(), key=lambda x: sector_avg_scores[x], reverse=True)
# Focus on top 2 sectors
top_sectors = ranked_sectors[:min(2, len(ranked_sectors))]
# Allocate within top sectors - focus on stocks with strongest signals
for sector in top_sectors:
sector_stocks = sorted(sector_data[sector], key=lambda x: x["pred_return"], reverse=True)
# Take top 2 stocks in each selected sector
top_stocks = sector_stocks[:min(2, len(sector_stocks))]
            # Size each position by signal strength, capped to prevent overallocation
            for stock in top_stocks:
                ticker = stock["ticker"]
                signal_strength = stock["pred_return"] / (0.2 * self.pred_std)
                size = min(0.4, max(0.1, 0.2 * signal_strength))
positions[ticker] = size
# Defensive adjustments
if self.defensive_mode or self.current_regime in ["bearish_high_vol", "bearish_low_vol", "breakdown_bearish"]:
# 1. Reduce overall position sizes
scaling_factor = 0.6 if self.defensive_mode else 0.8
for ticker in positions:
positions[ticker] *= scaling_factor
# 2. Add inverse positions (shorts) as hedges if we have bearish predictions
if len(positions) > 0:
negative_preds = {t: data["pred_return"] for t, data in prediction_data.items()
if data["pred_return"] < 0 and t not in positions}
if negative_preds:
worst_stocks = sorted(negative_preds.items(), key=lambda x: x[1])[:2]
for ticker, pred in worst_stocks:
hedge_size = -0.2 if self.defensive_mode else -0.15
positions[ticker] = hedge_size
# Position count limits
max_positions = 5
if self.current_regime in ["bullish_strong", "breakout_bullish"]:
max_positions = 6 # More positions in strong bull markets
elif self.current_regime in ["bearish_high_vol", "breakdown_bearish"]:
max_positions = 3 # Fewer positions in bear markets
# Trim to max positions if needed
if len(positions) > max_positions:
sorted_positions = sorted(positions.items(), key=lambda x: abs(prediction_data[x[0]]["composite_score"]), reverse=True)
positions = {ticker: pos for ticker, pos in sorted_positions[:max_positions]}
# Handle extreme market stress (defensive shutdown)
extreme_market_stress = False
if len(self.portfolio_returns) >= 3:
recent_returns = self.portfolio_returns[-3:]
if sum(r < -1.0 for r in recent_returns) >= 2: # 2+ days of >1% losses
extreme_market_stress = True
if extreme_market_stress:
# Convert to all cash except small hedges
new_positions = {}
for ticker, position in positions.items():
if position < 0:
# Keep small hedge positions only
new_positions[ticker] = max(-0.2, position)
positions = new_positions
return positions
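    # Worked example (comment only): two stocks in a top sector with
    # signal_strength 2.5 and 0.3 get sizes min(0.4, 0.2*2.5) = 0.4 and
    # max(0.1, 0.2*0.3) = 0.1; in defensive mode both are then scaled by 0.6.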
def get_stop_loss_level(self):
"""Get appropriate stop-loss level based on market regime"""
if self.current_regime in ["bullish_strong", "breakout_bullish"]:
if self.defensive_mode:
return -2.0 # Tighter in defensive mode
else:
return -3.5 # More room for positions to breathe
elif self.current_regime in ["bearish_high_vol", "breakdown_bearish"]:
return -1.5 # Tighter stop-loss in bearish regimes
else:
if self.defensive_mode:
return -1.8
else:
return -2.5
def update_portfolio_returns(self, daily_return):
"""Update portfolio return history"""
self.portfolio_returns.append(daily_return)
if len(self.portfolio_returns) > 60: # Keep a rolling window
self.portfolio_returns = self.portfolio_returns[-60:]
def update_model_calibration(self, all_predictions):
"""Update prediction standard deviation for threshold calibration"""
        all_pred_values = list(all_predictions.values())
        if len(all_pred_values) > 5:
            self.pred_std = np.std(all_pred_values)
# region imports
from AlgorithmImports import *
# endregion
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data import *
from QuantConnect.Indicators import *
from datetime import timedelta
import numpy as np
import pandas as pd
import torch
import os
import torch.nn as nn
from classes import DirectionalAccuracyLoss, QuaternionLinear, KoopmanOperator, EnhancedKQT, TransformerEncoderLayer, SectorAwareCrossAttention, CrossStockAttention, QuaternionAttention, MultiScaleKoopmanOperator
from kqtstrategy import KQTStrategy
class KQTAlgorithm(QCAlgorithm):
def Initialize(self):
"""Initialize the algorithm"""
# Set start date, end date, and cash
self.SetStartDate(2019, 1, 3)
self.SetEndDate(2025, 3, 26)
self.SetCash(1000000)
self.previous_portfolio_value = 0
# Set benchmark to SPY
self.SetBenchmark("SPY")
# Initialize the KQT strategy
self.strategy = KQTStrategy()
self.lookback = 60 # Need enough data for technical indicators
# Universe of stocks to trade
self.tickers = [
"AAPL", "MSFT", "AMZN", "NVDA", "GOOGL", "META", "TSLA", "JPM",
"XOM", "V", "JNJ", "PG", "HD", "CVX", "MRK", "COST", "KO", "PEP",
"ADBE", "WMT", "MCD", "BAC", "CSCO", "ACN", "NFLX", "CMCSA"
]
# Add each equity to our universe AND store the Symbol objects
self.symbols = {} # Add this line
for ticker in self.tickers:
self.symbols[ticker] = self.AddEquity(ticker, Resolution.Daily).Symbol # Store the Symbol
# Add SPY for market data
self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol
# Storage for historical data and predictions
self.stock_data = {}
self.current_predictions = {}
self.previous_positions = {}
# Track stopped out positions
self.stopped_out = set()
        # Schedule the daily trading function for mid-morning
self.Schedule.On(self.DateRules.EveryDay(),
self.TimeRules.At(10, 0), # 10:00 AM Eastern
self.TradeExecute)
# Initialize model with feature count
        feature_count = 18  # placeholder; TryLoadModelWeights re-initializes with the exact input size found in saved weights
self.strategy.initialize_model(input_size=feature_count)
# Load pre-trained model weights if available
self.TryLoadModelWeights()
def TryLoadModelWeights(self):
"""Try to load model weights from ObjectStore with proper dimensions"""
try:
            if self.ObjectStore.ContainsKey("kqt_model_weights2"):
self.Debug("Found model weights in ObjectStore, loading...")
# Get base64 encoded string
encoded_bytes = self.ObjectStore.Read("kqt_model_weights2")
# Decode back to binary
import base64
model_bytes = base64.b64decode(encoded_bytes)
# Save temporarily to file
import tempfile
with tempfile.NamedTemporaryFile(delete=False, suffix='.pth') as temp:
temp_path = temp.name
temp.write(model_bytes)
# CRITICAL: Load weights first to check dimensions
try:
state_dict = torch.load(temp_path)
# Extract input and hidden dimensions from weights
# Look at the embedding layer weight to get input size
input_shape = state_dict['embedding.0.weight'].shape
actual_input_size = input_shape[1]
# Look at Koopman matrix to determine hidden size
koopman_shape = state_dict['koopman.koopman_matrix'].shape
koopman_dim = koopman_shape[0]
                    # Invert the sizing used in EnhancedKQT, where koopman_dim = max(1024, hidden_size)
                    hidden_size = 1  # default
                    if koopman_dim == 256:
                        hidden_size = 1
                    elif koopman_dim == 1024:
                        hidden_size = 4
                    else:
                        hidden_size = koopman_dim // 256  # heuristic fallback
self.Debug(f"Detected dimensions - input_size: {actual_input_size}, hidden_size: {hidden_size}, koopman_dim: {koopman_dim}")
# Reinitialize model with EXACT dimensions from the saved weights
self.strategy.initialize_model(
input_size=actual_input_size,
hidden_size=hidden_size, # This is critical - must match saved weights
num_layers=2,
dropout=0.25,
n_heads=8
)
# Now load the weights with matching dimensions
self.strategy.load_model_weights(temp_path)
self.Debug("Successfully loaded model weights")
except Exception as inner_e:
self.Debug(f"Error loading weights: {str(inner_e)}")
# Clean up
import os
os.unlink(temp_path)
else:
self.Debug("No model weights found in ObjectStore")
except Exception as e:
self.Debug(f"Error loading model weights: {str(e)}")
def OnData(self, data):
"""OnData event is the primary entry point for your algorithm."""
# We're using scheduled events instead of processing each data point
pass
def TradeExecute(self):
"""Execute trading logic daily"""
# Skip if not trading day
if not self.IsMarketOpen(self.spy):
return
self.Debug(f"TradeExecute running on {self.Time}")
# 1. Update historical data for all stocks
self.UpdateHistoricalData()
# 2. Generate predictions for each stock
self.current_predictions = self.GeneratePredictions()
self.Debug(f"Current predictions: {self.current_predictions}")
# 3. Check for stop losses
self.ProcessStopLosses()
# 4. Generate new position sizes
market_returns = self.GetMarketReturns()
target_positions = self.strategy.generate_positions(self.current_predictions, market_returns)
self.Debug(f"Target positions before execution: {target_positions}")
self.Debug(f"Market returns: {market_returns}")
# 5. Execute trades to reach target positions
self.ExecuteTrades(target_positions)
# 6. Update portfolio return for regime detection
daily_return = self.CalculatePortfolioReturn()
self.strategy.update_portfolio_returns(daily_return)
# 7. Add this line to store today's value for tomorrow's calculation
self.previous_portfolio_value = self.Portfolio.TotalPortfolioValue
self.Debug(f"Generated {len(target_positions)} positions: {target_positions}")
def CalculatePortfolioReturn(self):
"""Calculate today's portfolio return"""
# Get the portfolio value change
current_value = self.Portfolio.TotalPortfolioValue
# Use our stored previous value instead of the non-existent property
if self.previous_portfolio_value > 0:
return (current_value / self.previous_portfolio_value - 1) * 100 # as percentage
# On first day, just store the value and return 0
self.previous_portfolio_value = current_value
return 0
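    # Worked example (comment only): a move from 1,000,000 to 1,012,000 in
    # TotalPortfolioValue returns (1,012,000 / 1,000,000 - 1) * 100 = 1.2.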
def UpdateHistoricalData(self):
"""Fetch and update historical data for all symbols"""
for ticker in self.tickers:
symbol = self.Securities[ticker].Symbol
# Request sufficient history for features
history = self.History(symbol, self.lookback, Resolution.Daily)
if history.empty or len(history) < self.lookback:
self.Debug(f"Not enough historical data for {ticker}, skipping")
continue
# Store historical data
if isinstance(history.index, pd.MultiIndex):
history_reset = history.reset_index()
symbol_data = history_reset[history_reset['symbol'] == symbol]
self.stock_data[ticker] = symbol_data
else:
self.stock_data[ticker] = history
def GetMarketReturns(self):
"""Get recent market returns for regime detection"""
spy_history = self.History(self.spy, 10, Resolution.Daily)
if spy_history.empty:
return []
# Handle both MultiIndex and regular index formats
if isinstance(spy_history.index, pd.MultiIndex):
spy_history_reset = spy_history.reset_index()
spy_history_filtered = spy_history_reset[spy_history_reset['symbol'] == self.spy]
spy_prices = spy_history_filtered['close'].values
else:
spy_prices = spy_history['close'].values
# Calculate returns
spy_returns = []
for i in range(1, len(spy_prices)):
daily_return = (spy_prices[i] / spy_prices[i-1] - 1) * 100
spy_returns.append(daily_return)
return spy_returns
def GeneratePredictions(self):
"""Generate predictions for all stocks"""
predictions = {}
self.Debug(f"Generating predictions with {len(self.stock_data)} stocks in data")
for ticker, history in self.stock_data.items():
try:
if len(history) < self.lookback:
continue
# Prepare data for this stock
X, stock_df = self.strategy.prepare_stock_data(history, ticker)
if X is None or len(X) == 0:
# FALLBACK: Simple prediction if ML fails
if isinstance(history.index, pd.MultiIndex):
history_reset = history.reset_index()
closes = history_reset[history_reset['symbol'] == self.symbols[ticker]]['close'].values
else:
closes = history['close'].values
if len(closes) > 20:
# Simple momentum strategy for fallback
short_ma = np.mean(closes[-5:])
long_ma = np.mean(closes[-20:])
momentum = closes[-1] / closes[-10] - 1 if len(closes) > 10 else 0
pred_score = momentum + 0.5 * (short_ma/long_ma - 1)
predictions[ticker] = {
"pred_return": pred_score * 2,
"composite_score": pred_score * 5
}
self.Debug(f"Used fallback prediction for {ticker}: {pred_score}")
continue
# Generate prediction with ML model
pred_return = self.strategy.predict_returns(X, ticker)
# Store prediction
predictions[ticker] = {
"pred_return": pred_return,
"composite_score": pred_return / self.strategy.adaptive_threshold
}
self.Debug(f"ML prediction for {ticker}: {pred_return}")
except Exception as e:
self.Debug(f"Error processing {ticker}: {str(e)}")
continue
self.Debug(f"Generated {len(predictions)} predictions")
return predictions
def ProcessStopLosses(self):
"""Check and process stop loss orders"""
stop_loss_level = self.strategy.get_stop_loss_level()
for ticker in self.tickers:
if not self.Portfolio[ticker].Invested:
continue
symbol = self.Securities[ticker].Symbol
position = self.Portfolio[ticker]
# Get today's return
history = self.History(symbol, 2, Resolution.Daily)
if history.empty or len(history) < 2:
continue
# Handle both MultiIndex and regular index formats
if isinstance(history.index, pd.MultiIndex):
history_reset = history.reset_index()
symbol_data = history_reset[history_reset['symbol'] == symbol]
if len(symbol_data) < 2:
continue
close_prices = symbol_data['close'].values
else:
close_prices = history['close'].values
daily_return = (close_prices[-1] / close_prices[-2] - 1) * 100
position_type = "long" if position.Quantity > 0 else "short"
hit_stop = False
if position_type == "long" and daily_return < stop_loss_level:
hit_stop = True
self.Debug(f"Stop loss triggered for {ticker} (long): {daily_return:.2f}%")
elif position_type == "short" and daily_return > -stop_loss_level:
hit_stop = True
self.Debug(f"Stop loss triggered for {ticker} (short): {daily_return:.2f}%")
if hit_stop:
self.stopped_out.add(ticker)
def ExecuteTrades(self, target_positions):
"""Execute trades to reach target positions"""
if not target_positions:
self.Debug("No target positions received")
return
self.Debug(f"Executing trades for {len(target_positions)} positions")
# Calculate total allocation first to check for overallocation
portfolio_value = self.Portfolio.TotalPortfolioValue
total_allocation = sum(abs(weight) for weight in target_positions.values())
# Scale down if total allocation exceeds 100%
if total_allocation > 1.0:
scaling_factor = 0.95 / total_allocation # Leave a small buffer
self.Debug(f"Scaling positions by {scaling_factor:.2f} to prevent overallocation")
for ticker in target_positions:
target_positions[ticker] *= scaling_factor
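        # Worked example (comment only): gross exposure of 1.20 gives
        # scaling_factor = 0.95 / 1.20 ≈ 0.79, bringing totals just under 95%.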
# Execute trades to reach target positions
for ticker, target_weight in target_positions.items():
symbol = self.symbols[ticker]
current_security = self.Securities[symbol]
# Calculate target share amount
price = current_security.Price
target_value = portfolio_value * target_weight
target_shares = int(target_value / price) if price > 0 else 0
# Get current holdings
holding = self.Portfolio[symbol]
current_shares = holding.Quantity
# Calculate shares to trade
shares_to_trade = target_shares - current_shares
                # Skip zero-quantity orders
if abs(shares_to_trade) > 0:
try:
# Place the order
if shares_to_trade > 0:
self.MarketOrder(symbol, shares_to_trade)
self.Debug(f"BUY {shares_to_trade} shares of {ticker}")
else:
self.MarketOrder(symbol, shares_to_trade) # Negative for sell
self.Debug(f"SELL {abs(shares_to_trade)} shares of {ticker}")
except Exception as e:
self.Debug(f"Order error for {ticker}: {str(e)}")
# If buying fails, try with a smaller order
if shares_to_trade > 0:
try:
reduced_shares = max(1, int(shares_to_trade * 0.5))
self.MarketOrder(symbol, reduced_shares)
self.Debug(f"Reduced BUY to {reduced_shares} shares of {ticker}")
except:
self.Debug(f"Skipping order for {ticker} due to insufficient buying power")
# Store current positions for next day
self.previous_positions = target_positions.copy()