| Overall Statistics |
|
Total Orders 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Start Equity 30000.00 End Equity 30000 Net Profit 0% Sharpe Ratio 0 Sortino Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio -1.744 Tracking Error 0.652 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset Portfolio Turnover 0% |
from AlgorithmImports import *
import numpy as np
import pandas as pd
import random
from collections import defaultdict
import pickle
import base64
from datetime import timedelta, datetime
import sys
class Config:
def __init__(self, algorithm):
# Core configurable parameters
self.mode = algorithm.GetParameter("mode", "backtesting")
# Handle start_date
start_date_param = algorithm.GetParameter("start_date", "2019-01-01")
if isinstance(start_date_param, str):
try:
self.start_date = datetime.strptime(start_date_param, "%Y-%m-%d")
except ValueError as e:
algorithm.Debug(f"Error parsing start_date '{start_date_param}': {str(e)}. Using default 2019-01-01.")
self.start_date = datetime(2019, 1, 1)
elif isinstance(start_date_param, datetime):
self.start_date = start_date_param
else:
algorithm.Debug(f"Invalid start_date type: {type(start_date_param)}. Using default 2019-01-01.")
self.start_date = datetime(2019, 1, 1)
# Handle end_date
end_date_param = algorithm.GetParameter("end_date", "2025-03-31")
if isinstance(end_date_param, str):
try:
self.end_date = datetime.strptime(end_date_param, "%Y-%m-%d")
except ValueError as e:
algorithm.Debug(f"Error parsing end_date '{end_date_param}': {str(e)}. Using default 2025-03-31.")
self.end_date = datetime(2025, 3, 31)
elif isinstance(end_date_param, datetime):
self.end_date = end_date_param
else:
algorithm.Debug(f"Invalid end_date type: {type(end_date_param)}. Using default 2025-03-31.")
self.end_date = datetime(2025, 3, 31)
self.initial_cash = float(algorithm.GetParameter("initial_cash", 30000))
self.allocation = float(algorithm.GetParameter("allocation", 0.2))
self.benchmark_symbol = algorithm.GetParameter("benchmark_symbol", "BTCUSDT")
self.trading_symbol = algorithm.GetParameter("trading_symbol", "ETHUSDT")
self.exchange = algorithm.GetParameter("exchange", "binance")
self.resolution_training = Resolution.Daily
self.model_version = algorithm.GetParameter("model_version", "v02")
self.random_seed = int(algorithm.GetParameter("random_seed", 138))
# Additional backtesting parameters
self.q_table_path = algorithm.GetParameter("q_table_path", f"{self.model_version}_{self.random_seed}_qtable.pkl")
self.max_position_size = float(algorithm.GetParameter("max_position_size", 0.5))
self.commission_rate = float(algorithm.GetParameter("commission_rate", 0.0005))
self.slippage = float(algorithm.GetParameter("slippage", 0.0))
self.profit_target_pct = float(algorithm.GetParameter("profit_target_pct", 0.03))
self.stop_loss_pct = float(algorithm.GetParameter("stop_loss_pct", 0.02))
self.max_consecutive_losses = int(algorithm.GetParameter("max_consecutive_losses", 5))
self.log_frequency = int(algorithm.GetParameter("log_frequency", 1))
# Technical indicator periods
self.sma_short_period = int(algorithm.GetParameter("sma_short", 20))
self.sma_long_period = int(algorithm.GetParameter("sma_long", 60))
self.ema_short_period = int(algorithm.GetParameter("ema_short", 9))
self.rsi_period = int(algorithm.GetParameter("rsi_period", 14))
self.macd_fast = int(algorithm.GetParameter("macd_fast", 12))
self.macd_slow = int(algorithm.GetParameter("macd_slow", 26))
self.macd_signal = int(algorithm.GetParameter("macd_signal", 9))
# Performance thresholds
self.success_rate_threshold = float(algorithm.GetParameter("success_rate_threshold", 0.50))
self.sharpe_ratio_threshold = float(algorithm.GetParameter("sharpe_ratio_threshold", 0.5))
self.strategy_accuracy_threshold = float(algorithm.GetParameter("strategy_accuracy_threshold", 0.50))
def get_market(self):
return {
"binance": Market.Binance,
"coinbase": Market.GDAX,
"gdax": Market.GDAX,
"bitfinex": Market.Bitfinex,
"kraken": Market.Kraken
}.get(self.exchange.lower(), Market.Binance)
def get_model_prefix(self):
return f"{self.model_version}_{self.random_seed}"
class DailyTradingModel:
def __init__(self, config):
self.config = config
self.close_prices, self.high_prices, self.low_prices, self.volume = [], [], [], []
self.daily_returns, self.sma_short, self.sma_long, self.ema_short = [], [], [], []
self.rsi, self.macd, self.macd_signal, self.macd_hist = [], [], [], []
self.volatility_history, self.support_levels, self.resistance_levels = [], [], []
self.trend_direction = self.trend_strength = self.avg_gain = self.avg_loss = None
def update(self, close, high, low, volume=None):
# Validate inputs
if not all(np.isfinite(x) for x in [close, high, low]):
close, high, low = 1.0, 1.0, 1.0 # Fallback to default if invalid
self.close_prices.append(close)
self.high_prices.append(high)
self.low_prices.append(low)
self.volume.append(volume if volume is not None and np.isfinite(volume) else 0)
self.daily_returns.append((close - self.close_prices[-2])/self.close_prices[-2] if len(self.close_prices) > 1 else 0)
vol = np.std(self.close_prices[-20:])/np.mean(self.close_prices[-20:]) if len(self.close_prices) >= 20 else 0
self.volatility_history.append(0 if not np.isfinite(vol) else vol)
self._update_indicators()
self._update_trend_analysis()
self._update_support_resistance()
def _update_indicators(self):
closes = np.array(self.close_prices)
sma_short = np.mean(closes[-self.config.sma_short_period:]) if len(closes) >= self.config.sma_short_period else (closes[-1] if closes.size > 0 else 0)
sma_long = np.mean(closes[-self.config.sma_long_period:]) if len(closes) >= self.config.sma_long_period else (closes[-1] if closes.size > 0 else 0)
self.sma_short.append(0 if not np.isfinite(sma_short) else sma_short)
self.sma_long.append(0 if not np.isfinite(sma_long) else sma_long)
self._calculate_ema(closes, self.ema_short, self.config.ema_short_period)
self._calculate_rsi(closes)
self._calculate_macd(closes)
def _calculate_ema(self, prices, ema_list, period):
if len(prices) == 0:
ema_list.append(0)
return
if len(ema_list) == 0:
ema = np.mean(prices[-period:]) if len(prices) >= period else prices[-1]
ema_list.append(0 if not np.isfinite(ema) else ema)
return
multiplier = 2 / (period + 1)
ema = (prices[-1] - ema_list[-1]) * multiplier + ema_list[-1]
ema_list.append(0 if not np.isfinite(ema) else ema)
def _calculate_rsi(self, prices):
if len(prices) <= self.config.rsi_period:
self.rsi.append(50)
return
delta = np.diff(prices)
gains = np.array([max(0, d) for d in delta[-self.config.rsi_period:]])
losses = np.array([abs(min(0, d)) for d in delta[-self.config.rsi_period:]])
if self.avg_gain is None or self.avg_loss is None:
self.avg_gain = np.mean(gains)
self.avg_loss = np.mean(losses)
else:
self.avg_gain = (self.avg_gain * (self.config.rsi_period - 1) + gains[-1]) / self.config.rsi_period
self.avg_loss = (self.avg_loss * (self.config.rsi_period - 1) + losses[-1]) / self.config.rsi_period
self.avg_gain = 0 if not np.isfinite(self.avg_gain) else self.avg_gain
self.avg_loss = 0 if not np.isfinite(self.avg_loss) else self.avg_loss
if self.avg_loss == 0:
if self.avg_gain == 0:
self.rsi.append(50)
else:
self.rsi.append(85)
else:
rs = self.avg_gain / self.avg_loss
rsi = 100 - (100 / (1 + rs))
self.rsi.append(50 if not np.isfinite(rsi) else rsi)
def _calculate_macd(self, prices):
if len(prices) < self.config.macd_slow:
self.macd.append(0)
self.macd_signal.append(0)
self.macd_hist.append(0)
return
ema12 = pd.Series(prices).ewm(span=self.config.macd_fast, adjust=False).mean()
ema26 = pd.Series(prices).ewm(span=self.config.macd_slow, adjust=False).mean()
macd_line = ema12.iloc[-1] - ema26.iloc[-1]
macd_values = ema12 - ema26
signal_line = macd_values.ewm(span=self.config.macd_signal, adjust=False).mean().iloc[-1]
macd_line = 0 if not np.isfinite(macd_line) else macd_line
signal_line = 0 if not np.isfinite(signal_line) else signal_line
self.macd.append(macd_line)
self.macd_signal.append(signal_line)
self.macd_hist.append(macd_line - signal_line)
def _update_trend_analysis(self):
if len(self.close_prices) < 3:
self.trend_direction = self.trend_strength = 0
return
short_term_change = (self.close_prices[-1] / self.close_prices[-3]) - 1
if len(self.ema_short) >= 2 and len(self.sma_short) >= 2 and len(self.sma_long) >= 2:
signals = [
short_term_change > 0,
self.sma_short[-1] > self.sma_long[-1],
(self.ema_short[-1] / self.ema_short[-2]) - 1 > 0,
(self.sma_short[-1] / self.sma_short[-2]) - 1 > 0,
(self.sma_long[-1] / self.sma_long[-2]) - 1 > 0
]
positive = sum(signals)
if positive > len(signals) - positive + 1:
self.trend_direction, self.trend_strength = 1, min(10, int((positive / len(signals)) * 10))
elif len(signals) - positive > positive + 1:
self.trend_direction, self.trend_strength = -1, min(10, int(((len(signals) - positive) / len(signals)) * 10))
else:
self.trend_direction, self.trend_strength = 0, min(5, abs(positive - (len(signals) - positive)))
else:
self.trend_direction = 1 if short_term_change > 0.01 else (-1 if short_term_change < -0.01 else 0)
self.trend_strength = min(5, int(abs(short_term_change * 100)))
def _update_support_resistance(self):
if len(self.close_prices) < 20:
return
prices = np.array(self.close_prices)
window = min(5, len(prices) // 10)
if len(self.close_prices) % 20 == 0:
self.support_levels, self.resistance_levels = [], []
for i in range(window, len(prices) - window):
if all(prices[i] > prices[i - j] for j in range(1, window + 1)) and all(prices[i] > prices[i + j] for j in range(1, window + 1)):
self.resistance_levels.append(prices[i])
if all(prices[i] < prices[i - j] for j in range(1, window + 1)) and all(prices[i] < prices[i + j] for j in range(1, window + 1)):
self.support_levels.append(prices[i])
self.support_levels = self.support_levels[-5:] if self.support_levels else [min(prices)]
self.resistance_levels = self.resistance_levels[-5:] if self.resistance_levels else [max(prices)]
def get_state_features(self):
if len(self.close_prices) < 2:
return np.zeros(9)
price_change = (self.close_prices[-1] / self.close_prices[-2]) - 1
price_change = 0 if not np.isfinite(price_change) else price_change
avg_volume = np.mean(self.volume[-10:]) if len(self.volume) >= 10 else 1.0
volume_ratio = self.volume[-1] / avg_volume if avg_volume != 0 else 1.0
volume_ratio = 1.0 if not np.isfinite(volume_ratio) else volume_ratio
rsi_value = self.rsi[-1] if self.rsi else 50
macd_hist_value = self.macd_hist[-1] if self.macd_hist else 0
sma_cross = 1 if (len(self.sma_short) > 0 and len(self.sma_long) > 0 and self.sma_short[-1] > self.sma_long[-1]) else -1
volatility = self.volatility_history[-1] if self.volatility_history else 0
trend_signal = self.trend_direction if self.trend_direction is not None else 0
price_level, sr_proximity = 0.5, 0
if self.support_levels and self.resistance_levels:
closest_support = min(self.support_levels, key=lambda x: abs(x - self.close_prices[-1]))
closest_resistance = min(self.resistance_levels, key=lambda x: abs(x - self.close_prices[-1]))
range_total = closest_resistance - closest_support
if range_total > 0:
price_level = min(1.0, max(0.0, (self.close_prices[-1] - closest_support) / range_total))
support_dist = (self.close_prices[-1] - closest_support) / self.close_prices[-1] if closest_support > 0 else 1
resist_dist = (closest_resistance - self.close_prices[-1]) / self.close_prices[-1] if closest_resistance > 0 else 1
if support_dist < resist_dist and support_dist < 0.02:
sr_proximity = -1
elif resist_dist < support_dist and resist_dist < 0.02:
sr_proximity = 1
price_level = 0.5 if not np.isfinite(price_level) else price_level
features = [
price_change, volume_ratio, rsi_value / 100, macd_hist_value,
sma_cross, volatility, trend_signal, price_level, sr_proximity
]
return np.array([0 if not np.isfinite(x) else x for x in features])
def get_trade_signals(self):
trend_signal = trend_conf = mean_rev_signal = mean_rev_conf = macd_signal = macd_conf = 0
if len(self.sma_short) > 0 and len(self.sma_long) > 0:
trend_signal = 1 if self.sma_short[-1] > self.sma_long[-1] else -1
trend_conf = min(1.0, abs(self.sma_short[-1] - self.sma_long[-1]) / self.sma_long[-1])
trend_conf = 0 if not np.isfinite(trend_conf) else trend_conf
if self.rsi:
rsi_value = self.rsi[-1]
rsi_dev = abs(rsi_value - 50)
if rsi_value < 50:
mean_rev_signal = 1
mean_rev_conf = min(1.0, rsi_dev / 25)
else:
mean_rev_signal = -1
mean_rev_conf = min(1.0, rsi_dev / 25)
mean_rev_conf = 0 if not np.isfinite(mean_rev_conf) else mean_rev_conf
if len(self.macd) > 0 and len(self.macd_signal) > 0:
macd_signal = 1 if self.macd[-1] > self.macd_signal[-1] else -1
macd_conf = min(1.0, abs(self.macd[-1] - self.macd_signal[-1]) / (abs(self.macd_signal[-1]) + 1e-6))
macd_conf = 0 if not np.isfinite(macd_conf) else macd_conf
weighted = (trend_signal * trend_conf * 0.3 +
mean_rev_signal * mean_rev_conf * 0.3 +
macd_signal * macd_conf * 0.4)
weighted = 0 if not np.isfinite(weighted) else weighted
if abs(weighted) < 0.2:
final_action = 0
final_confidence = 0.0
else:
final_action = 1 if weighted > 0 else 2
final_confidence = min(1.0, abs(weighted))
return {
'action': final_action,
'confidence': final_confidence,
'trend': trend_signal,
'mean_reversion': mean_rev_signal,
'macd_cross': macd_signal
}
def CalculateDailyReward(self, action, current_price, next_price, next_high, next_low, signal_info):
if current_price <= 0 or not np.isfinite(current_price) or not np.isfinite(next_price):
return 0
price_return = (next_price - current_price) / current_price
price_return = 0 if not np.isfinite(price_return) else price_return
commission = self.config.commission_rate
stop_loss = -self.config.stop_loss_pct
take_profit = self.config.profit_target_pct
reward = 0
if action == 1: # Buy
if next_low <= current_price * (1 + stop_loss):
reward = stop_loss - commission
elif next_high >= current_price * (1 + take_profit):
reward = take_profit - commission
else:
reward = price_return - commission
elif action == 2: # Sell (short)
if next_high >= current_price * (1 - stop_loss):
reward = stop_loss - commission
elif next_low <= current_price * (1 - take_profit):
reward = take_profit - commission
else:
reward = -price_return - commission
else: # Hold
reward = 0
if action in [1, 2]:
signal_bonus = 0.05 if action == signal_info['action'] else -0.02
reward += signal_bonus
if len(self.volatility_history) > 0:
vol_scale = min(1.5, max(0.8, 1.0 / (self.volatility_history[-1] + 1e-6)))
vol_scale = 1.0 if not np.isfinite(vol_scale) else vol_scale
reward *= vol_scale
return reward
class MBPOBacktestAlgorithm(QCAlgorithm):
def Initialize(self):
self.Debug("Initializing MBPO Backtesting Algorithm...")
# Log Python version and environment info for debugging
self.Debug(f"Python version: {sys.version}")
self.Debug(f"NumPy version: {np.__version__}")
self.Debug(f"Pandas version: {pd.__version__}")
self.config = Config(self)
# Set backtest parameters
self.SetStartDate(self.config.start_date)
self.SetEndDate(self.config.end_date)
self.SetCash(self.config.initial_cash)
# Configure brokerage for Binance paper trading
self.SetBrokerageModel(BrokerageName.Binance, AccountType.Margin)
self.SetSecurityInitializer(FuncSecurityInitializer(self._initialize_security))
# Add symbols
market = self.config.get_market()
self.btcSymbol = self.AddCrypto(self.config.benchmark_symbol, self.config.resolution_training, market).Symbol
self.SetBenchmark(self.btcSymbol)
self.symbol = self.AddCrypto(self.config.trading_symbol, self.config.resolution_training, market).Symbol
# Initialize trading model
self.trading_model = DailyTradingModel(self.config)
# Initialize indicators
self.sma20 = self.SMA(self.symbol, self.config.sma_short_period, self.config.resolution_training)
self.sma50 = self.SMA(self.symbol, self.config.sma_long_period, self.config.resolution_training)
self.rsi = self.RSI(self.symbol, self.config.rsi_period, MovingAverageType.Simple, self.config.resolution_training)
self.macd = self.MACD(self.symbol, self.config.macd_fast, self.config.macd_slow,
self.config.macd_signal, MovingAverageType.Exponential, self.config.resolution_training)
# Set warmup period to ensure indicators are ready
warmup_days = max(self.config.sma_long_period, self.config.macd_slow, self.config.rsi_period) + 20 # Increased buffer
self.SetWarmUp(TimeSpan.FromDays(warmup_days))
# Initialize state and action parameters
self.state_dim = 9
self.actions = [0, 1, 2] # Hold, Buy, Sell
self.q_table = defaultdict(lambda: np.zeros(len(self.actions)))
self.q_table_loaded = False
self.last_action_time = None
self.action_interval = timedelta(days=1)
# Performance tracking
self.daily_returns = []
self.sharpe_ratios = []
self.strategy_accuracy = {'trend': [], 'mean_reversion': [], 'macd_cross': []}
self.consecutive_losses = 0
self.portfolio_values = []
self.trade_count = 0
self.log_count = 0
# Set random seed
random.seed(self.config.random_seed)
np.random.seed(self.config.random_seed)
self.Debug(f"Model prefix: {self.config.get_model_prefix()}")
self.Debug(f"Backtest mode: {self.config.mode}")
def _initialize_security(self, security):
security.SetFeeModel(ConstantFeeModel(self.config.commission_rate * security.Price))
security.SetSlippageModel(ConstantSlippageModel(self.config.slippage))
def LogWithRateLimit(self, message, force=False):
self.log_count += 1
if force or self.log_count % self.config.log_frequency == 0:
self.Debug(message)
def LoadQTable(self):
try:
if self.ObjectStore.ContainsKey(self.config.q_table_path):
qtable_b64 = self.ObjectStore.Read(self.config.q_table_path)
qtable_data = base64.b64decode(qtable_b64)
loaded_table = pickle.loads(qtable_data)
# Convert loaded table to defaultdict to ensure missing keys are handled
self.q_table = defaultdict(lambda: np.zeros(len(self.actions)), loaded_table)
self.q_table_loaded = True
self.LogWithRateLimit(f"QTable loaded from '{self.config.q_table_path}'", True)
else:
self.LogWithRateLimit(f"QTable not found at '{self.config.q_table_path}'", True)
self.q_table_loaded = False
except Exception as e:
self.LogWithRateLimit(f"Error loading QTable: {str(e)}", True)
self.q_table_loaded = False
def DiscretizeState(self, state_vector):
# Ensure all values are finite and within expected ranges
state_vector = np.nan_to_num(state_vector, nan=0.0, posinf=1.0, neginf=-1.0)
price_change = np.clip(state_vector[0], -0.05, 0.05)
volume_ratio = state_vector[1]
rsi = state_vector[2] * 100
macd = state_vector[3]
sma_cross = state_vector[4]
volatility = state_vector[5]
trend = state_vector[6]
price_level = state_vector[7]
sr_proximity = state_vector[8]
discretized = (
min(4, max(0, int((price_change + 0.05) / 0.1 * 5))),
0 if volume_ratio < 0.7 else (2 if volume_ratio > 1.3 else 1),
0 if rsi < 30 else (1 if rsi < 45 else (2 if rsi < 55 else (3 if rsi < 70 else 4))),
0 if macd < -0.01 else (2 if macd > 0.01 else 1),
0 if sma_cross == -1 else 1,
0 if volatility < 0.01 else (2 if volatility > 0.03 else 1),
min(2, max(0, int((trend + 1) / 2 * 3))),
min(2, max(0, int(price_level * 3))),
1 if sr_proximity > 0 else (0 if sr_proximity < 0 else 2)
)
# Validate that all elements are hashable
if not all(isinstance(x, (int, float)) for x in discretized):
raise ValueError(f"Invalid state tuple elements: {discretized}")
return discretized
def CalculateSharpe(self, returns, risk_free_rate=0.0):
try:
if len(returns) < 2:
return 0.0
returns = np.array(returns)
returns = returns[np.abs(returns) < np.percentile(np.abs(returns), 95)]
if len(returns) < 2:
return 0.0
excess = returns - risk_free_rate
stddev = np.std(excess)
if stddev == 0 or not np.isfinite(stddev):
return 0.0
periods = min(252, len(returns) * 4)
sharpe = np.mean(excess) / stddev * np.sqrt(periods)
return np.clip(sharpe, -10.0, 10.0)
except Exception as e:
self.LogWithRateLimit(f"Error calculating Sharpe ratio: {str(e)}")
return 0.0
def CalculateMaxDrawdown(self, returns):
if not returns:
return 0.0
cumulative = np.cumsum(returns)
peak = np.maximum.accumulate(cumulative)
drawdown = peak - cumulative
return np.max(drawdown) if len(drawdown) > 0 else 0.0
def UpdatePerformanceMetrics(self, action, current_price, signal_info):
portfolio_value = self.Portfolio.TotalPortfolioValue
self.portfolio_values.append(portfolio_value)
if len(self.portfolio_values) > 1:
daily_return = (self.portfolio_values[-1] - self.portfolio_values[-2]) / self.portfolio_values[-2]
daily_return = 0 if not np.isfinite(daily_return) else daily_return
self.daily_returns.append(daily_return)
price_return = (current_price - self.trading_model.close_prices[-2]) / self.trading_model.close_prices[-2] if len(self.trading_model.close_prices) > 1 else 0
price_return = 0 if not np.isfinite(price_return) else price_return
for strat in ['trend', 'mean_reversion', 'macd_cross']:
signal_val = signal_info.get(strat, 0)
if signal_val != 0:
is_correct = (signal_val > 0 and price_return > 0) or (signal_val < 0 and price_return < 0)
self.strategy_accuracy[strat].append(1 if is_correct else 0)
def SaveBacktestStats(self):
try:
avg_return = np.mean(self.daily_returns) if self.daily_returns else 0
sharpe = self.CalculateSharpe(self.daily_returns)
max_drawdown = self.CalculateMaxDrawdown(self.daily_returns)
strategy_accuracy = {strat: sum(res) / len(res) if res else 0 for strat, res in self.strategy_accuracy.items()}
avg_strategy_accuracy = np.mean([acc for acc in strategy_accuracy.values() if acc > 0]) if strategy_accuracy else 0
stats = {
'avg_daily_return': avg_return,
'sharpe_ratio': sharpe,
'max_drawdown': max_drawdown,
'trade_count': self.trade_count,
'strategy_accuracy': strategy_accuracy,
'avg_strategy_accuracy': avg_strategy_accuracy,
'portfolio_value': self.Portfolio.TotalPortfolioValue,
'timestamp': str(self.Time),
'random_seed': self.config.random_seed,
'model_version': self.config.model_version
}
stats_data = pickle.dumps(stats)
stats_b64 = base64.b64encode(stats_data).decode('utf-8')
filename = f"{self.config.get_model_prefix()}_backtest_stats.pkl"
if self.ObjectStore is not None:
self.ObjectStore.Save(filename, stats_b64)
self.LogWithRateLimit(f"Backtest stats saved as '{filename}'", True)
except Exception as e:
self.LogWithRateLimit(f"Error saving backtest stats: {str(e)}", True)
def OnData(self, data):
if self.IsWarmingUp or not self.q_table_loaded:
if not self.q_table_loaded:
self.LoadQTable()
return
# Ensure indicators are ready
if not self.sma20.IsReady or not self.sma50.IsReady or not self.rsi.IsReady or not self.macd.IsReady:
self.LogWithRateLimit("Indicators not ready, skipping OnData", True)
return
current_time = self.Time
if self.last_action_time is None or (current_time - self.last_action_time) >= self.action_interval:
try:
# Validate data availability
if self.symbol not in data or not data[self.symbol]:
self.LogWithRateLimit("No data available for symbol", True)
return
bar = data[self.symbol]
if isinstance(bar, TradeBar):
close = bar.Close
high = bar.High
low = bar.Low
volume = bar.Volume
elif isinstance(bar, QuoteBar):
bid_close = bar.Bid.Close if bar.Bid else bar.LastBidPrice
ask_close = bar.Ask.Close if bar.Ask else bar.LastAskPrice
bid_high = bar.Bid.High if bar.Bid else bar.LastBidPrice
ask_high = bar.Ask.High if bar.Ask else bar.LastAskPrice
bid_low = bar.Bid.Low if bar.Bid else bar.LastBidPrice
ask_low = bar.Ask.Low if bar.Ask else bar.LastAskPrice
close = (bid_close + ask_close) / 2 if (bid_close and ask_close and np.isfinite(bid_close) and np.isfinite(ask_close)) else (bar.Value if bar.Value and np.isfinite(bar.Value) else 1.0)
high = (bid_high + ask_high) / 2 if (bid_high and ask_high and np.isfinite(bid_high) and np.isfinite(ask_high)) else (bar.Value if bar.Value and np.isfinite(bar.Value) else 1.0)
low = (bid_low + ask_low) / 2 if (bid_low and ask_low and np.isfinite(bid_low) and np.isfinite(ask_low)) else (bar.Value if bar.Value and np.isfinite(bar.Value) else 1.0)
volume = None
else:
self.LogWithRateLimit(f"Unexpected bar type: {type(bar)}", True)
return
# Validate bar data
if not all(np.isfinite(x) for x in [close, high, low]) or close <= 0 or high <= 0 or low <= 0:
self.LogWithRateLimit(f"Invalid bar data: close={close}, high={high}, low={low}", True)
return
self.trading_model.update(close, high, low, volume)
# Get state and signals
current_state_raw = self.trading_model.get_state_features()
current_state_tuple = self.DiscretizeState(current_state_raw)
# Log state for debugging
self.LogWithRateLimit(f"State: {current_state_raw}, Tuple: {current_state_tuple}")
# Select action from Q-table
if not all(isinstance(x, (int, float)) for x in current_state_tuple):
self.LogWithRateLimit(f"Invalid state tuple: {current_state_tuple}", True)
return
# Access q_table with defensive check
q_values = self.q_table[current_state_tuple]
if not isinstance(q_values, np.ndarray) or len(q_values) != len(self.actions):
self.LogWithRateLimit(f"Invalid Q-table entry for state {current_state_tuple}: {q_values}", True)
q_values = np.zeros(len(self.actions))
action = int(np.argmax(q_values))
# Calculate position size
current_price = self.trading_model.close_prices[-1]
portfolio_value = self.Portfolio.TotalPortfolioValue
max_allocation = portfolio_value * self.config.allocation
max_position = portfolio_value * self.config.max_position_size
quantity = min(max_allocation / current_price, max_position / current_price)
quantity = max(0.001, quantity)
# Execute trade
holding = self.Portfolio[self.symbol]
is_long = holding.Quantity > 0
is_short = holding.Quantity < 0
if action == 1: # Buy
if is_short:
self.SetHoldings(self.symbol, 0)
self.LogWithRateLimit(f"Closed short position at {current_price}", True)
if not is_long and quantity > 0:
self.SetHoldings(self.symbol, self.config.allocation)
self.trade_count += 1
self.LogWithRateLimit(f"Buy {quantity:.4f} {self.config.trading_symbol} at {current_price}", True)
elif action == 2: # Sell/Short
if is_long:
self.SetHoldings(self.symbol, 0)
self.LogWithRateLimit(f"Closed long position at {current_price}", True)
if not is_short and quantity > 0:
self.SetHoldings(self.symbol, -self.config.allocation)
self.trade_count += 1
self.LogWithRateLimit(f"Short {quantity:.4f} {self.config.trading_symbol} at {current_price}", True)
else: # Hold
if is_long or is_short:
self.SetHoldings(self.symbol, 0)
self.LogWithRateLimit(f"Closed position at {current_price}", True)
# Update performance metrics
self.UpdatePerformanceMetrics(action, current_price, signal_info=self.trading_model.get_trade_signals())
# Check for stop-loss/take-profit
if holding.Quantity != 0:
unrealized_pnl = holding.UnrealizedProfitPercent
unrealized_pnl = 0 if not np.isfinite(unrealized_pnl) else unrealized_pnl
if unrealized_pnl <= -self.config.stop_loss_pct:
self.SetHoldings(self.symbol, 0)
self.consecutive_losses += 1
self.LogWithRateLimit(f"Stop-loss triggered at {current_price}, PNL: {unrealized_pnl:.2%}", True)
elif unrealized_pnl >= self.config.profit_target_pct:
self.SetHoldings(self.symbol, 0)
self.consecutive_losses = 0
self.LogWithRateLimit(f"Take-profit triggered at {current_price}, PNL: {unrealized_pnl:.2%}", True)
# Circuit breaker
if self.consecutive_losses >= self.config.max_consecutive_losses:
self.LogWithRateLimit("Circuit breaker triggered: too many consecutive losses", True)
self.SetHoldings(self.symbol, 0)
self.Quit("Terminating due to excessive losses")
self.last_action_time = current_time
except Exception as e:
self.LogWithRateLimit(f"Error in OnData: {str(e)}", True)
def OnEndOfAlgorithm(self):
self.LogWithRateLimit("Backtest completed, saving final stats...", True)
self.SaveBacktestStats()
# Log final performance summary
avg_return = np.mean(self.daily_returns) if self.daily_returns else 0
sharpe = self.CalculateSharpe(self.daily_returns)
max_drawdown = self.CalculateMaxDrawdown(self.daily_returns)
strategy_accuracy = {strat: sum(res) / len(res) if res else 0 for strat, res in self.strategy_accuracy.items()}
avg_strategy_accuracy = np.mean([acc for acc in strategy_accuracy.values() if acc > 0]) if strategy_accuracy else 0
self.Debug("=" * 40)
self.Debug("BACKTEST PERFORMANCE SUMMARY")
self.Debug("=" * 40)
self.Debug(f"Total Trades: {self.trade_count}")
self.Debug(f"Final Portfolio Value: {self.Portfolio.TotalPortfolioValue:.2f}")
self.Debug(f"Average Daily Return: {avg_return:.4f}")
self.Debug(f"Sharpe Ratio: {sharpe:.4f}")
self.Debug(f"Max Drawdown: {max_drawdown:.4f}")
self.Debug("-" * 30)
self.Debug("STRATEGY ACCURACY")
for strat, acc in strategy_accuracy.items():
self.Debug(f"{strat.replace('_', ' ').title()}: {acc:.1%}")
self.Debug("=" * 40)