| Overall Statistics | |
| --- | --- |
| Total Orders | 0 |
| Average Win | 0% |
| Average Loss | 0% |
| Compounding Annual Return | 0% |
| Drawdown | 0% |
| Expectancy | 0 |
| Start Equity | 30000.00 |
| End Equity | 30000 |
| Net Profit | 0% |
| Sharpe Ratio | 0 |
| Sortino Ratio | 0 |
| Probabilistic Sharpe Ratio | 0% |
| Loss Rate | 0% |
| Win Rate | 0% |
| Profit-Loss Ratio | 0 |
| Alpha | 0 |
| Beta | 0 |
| Annual Standard Deviation | 0 |
| Annual Variance | 0 |
| Information Ratio | -0.416 |
| Tracking Error | 0.379 |
| Treynor Ratio | 0 |
| Total Fees | $0.00 |
| Estimated Strategy Capacity | $0 |
| Lowest Capacity Asset | |
| Portfolio Turnover | 0% |
from AlgorithmImports import *
import numpy as np
import pandas as pd
import random
from collections import defaultdict, deque
import pickle
import base64
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.preprocessing import RobustScaler
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
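# Configuration container: centralizes run mode, dates, starting cash, fee/slippage settings,
# symbols, indicator periods, RL/MBPO hyperparameters, and the backtest-recommendation
# thresholds; several values can be overridden via QuantConnect parameters (GetParameter).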
class Config:
def __init__(self, algorithm):
# Project requirements and constraints: configurable parameters
self.mode = "training"
self.start_date = datetime(2019, 1, 1)
self.end_date = datetime(2025, 3, 31) # Training period
self.initial_cash = 30000
self.commission_rate = float(algorithm.GetParameter("commission_rate") or 0.001)
self.slippage = float(algorithm.GetParameter("slippage") or 0.0)
self.allocation = 0.2
self.benchmark_symbol = "BTCUSDT"
self.trading_symbol = "ETHUSDT"
self.exchange = algorithm.GetParameter("exchange", "binance")
self.resolution_training = Resolution.Daily
self.model_version = algorithm.GetParameter("model_version", "v01")
self.random_seed = int(algorithm.GetParameter("random_seed", "168") or 42)
# End of project requirements and constraints parameters
# Daily trading strategy parameters
self.trend_confirmation_period = 3
self.profit_target_pct = 0.03
self.stop_loss_pct = 0.02
# State configuration
self.state_config = {
'price_bins': 12,
'volume_bins': 8,
'rsi_bins': 6,
'macd_bins': 5,
'sma_cross': True,
'volatility_bins': 4,
'trend_strength_bins': 5,
'price_level_bins': 7
}
# RL parameters
self.learning_rate = 0.05
self.discount_factor = 0.95
self.epsilon = 1.0
self.epsilon_min = 0.05
self.epsilon_decay = 0.995
self.episodes = 15
self.min_avg_reward = -0.2
# MBPO parameters
self.model_rollout_length = 3
self.model_train_freq = 30
self.synthetic_data_ratio = 2
self.min_real_samples = 300
self.replay_buffer_size = 8000
self.patience = 6
self.min_improvement = 0.0001
self.min_training_days = 100
self.warmup_period = 100
self.reward_scaling = 10.0
self.log_frequency = 1
self.max_consecutive_failures = 5
self.max_consecutive_negative_rewards = 8
self.max_failed_model_updates = 4
# Indicator parameters
self.sma_short_period = int(algorithm.GetParameter("sma_short") or 20)
self.sma_long_period = int(algorithm.GetParameter("sma_long") or 60)
self.ema_short_period = 9
self.rsi_period = 14
# For the revised signal, we will use RSI relative to 50 (rather than extremes)
self.rsi_overbought = 50
self.rsi_oversold = 50
self.macd_fast = 12
self.macd_slow = 26
self.macd_signal = 9
# Backtest recommendation thresholds
self.success_rate_threshold = 0.52
self.sharpe_ratio_threshold = 1.0
self.strategy_accuracy_threshold = 0.48
def get_market(self):
exchange_key = self.exchange.lower()
if exchange_key == "binance":
return Market.Binance
elif exchange_key in ["coinbase", "gdax"]:
return Market.GDAX
elif exchange_key == "bitfinex":
return Market.Bitfinex
elif exchange_key == "kraken":
return Market.Kraken
return Market.Binance
def get_model_prefix(self):
return f"{self.model_version}_{self.random_seed}"
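# Lightweight market model maintained bar by bar: stores price/volume history, computes
# SMA/EMA/RSI/MACD, rolling volatility, trend direction/strength, and support/resistance
# levels, and exposes a 9-dimensional state vector plus rule-based trade signals.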
class DailyTradingModel:
def __init__(self, config):
self.config = config
# Price data
self.close_prices = []
self.high_prices = []
self.low_prices = []
self.volume = []
self.daily_returns = []
# Indicators
self.sma_short = []
self.sma_long = []
self.ema_short = []
self.rsi = []
self.macd = []
self.macd_signal = []
self.macd_hist = []
# Analysis
self.volatility_history = []
self.support_levels = []
self.resistance_levels = []
self.trend_direction = 0 # -1: down, 0: sideways, 1: up
self.trend_strength = 0 # 0-10 scale
# For RSI calculation
self.avg_gain = None
self.avg_loss = None
def update(self, price, high, low, volume):
self.close_prices.append(price)
self.high_prices.append(high)
self.low_prices.append(low)
self.volume.append(volume)
if len(self.close_prices) > 1:
daily_return = (price - self.close_prices[-2]) / self.close_prices[-2]
self.daily_returns.append(daily_return)
else:
self.daily_returns.append(0)
if len(self.close_prices) >= 20:
recent_prices = self.close_prices[-20:]
self.volatility_history.append(np.std(recent_prices) / np.mean(recent_prices))
else:
self.volatility_history.append(0)
self._update_indicators()
self._update_trend_analysis()
self._update_support_resistance()
def _update_indicators(self):
closes = np.array(self.close_prices)
if len(closes) >= self.config.sma_short_period:
self.sma_short.append(np.mean(closes[-self.config.sma_short_period:]))
else:
self.sma_short.append(closes[-1] if closes.size > 0 else 0)
if len(closes) >= self.config.sma_long_period:
self.sma_long.append(np.mean(closes[-self.config.sma_long_period:]))
else:
self.sma_long.append(closes[-1] if closes.size > 0 else 0)
self._calculate_ema(closes, self.ema_short, self.config.ema_short_period)
self._calculate_rsi(closes)
self._calculate_macd(closes)
def _calculate_ema(self, prices, ema_list, period):
if len(prices) == 0:
ema_list.append(0)
return
if len(ema_list) == 0:
if len(prices) >= period:
ema_list.append(np.mean(prices[-period:]))
else:
ema_list.append(prices[-1])
return
multiplier = 2 / (period + 1)
ema_list.append((prices[-1] - ema_list[-1]) * multiplier + ema_list[-1])
def _calculate_rsi(self, prices):
if len(prices) <= self.config.rsi_period:
self.rsi.append(50)
return
delta = np.diff(prices)
if self.avg_gain is None or self.avg_loss is None:
gains = [x if x > 0 else 0 for x in delta[-self.config.rsi_period:]]
losses = [-x if x < 0 else 0 for x in delta[-self.config.rsi_period:]]
self.avg_gain = np.mean(gains)
self.avg_loss = np.mean(losses)
else:
current_gain = delta[-1] if delta[-1] > 0 else 0
current_loss = -delta[-1] if delta[-1] < 0 else 0
self.avg_gain = ((self.avg_gain * (self.config.rsi_period - 1)) + current_gain) / self.config.rsi_period
self.avg_loss = ((self.avg_loss * (self.config.rsi_period - 1)) + current_loss) / self.config.rsi_period
if self.avg_loss == 0:
rsi_value = 100
else:
rs = self.avg_gain / self.avg_loss
rsi_value = 100 - (100 / (1 + rs))
self.rsi.append(rsi_value)
def _calculate_macd(self, prices):
if len(prices) < self.config.macd_slow:
self.macd.append(0)
self.macd_signal.append(0)
self.macd_hist.append(0)
return
ema12 = self._calculate_vectorized_ema(prices, self.config.macd_fast)
ema26 = self._calculate_vectorized_ema(prices, self.config.macd_slow)
macd_line = ema12[-1] - ema26[-1]
macd_values = [p[0] - p[1] for p in zip(ema12, ema26)]
if len(macd_values) >= self.config.macd_signal:
signal_line = np.mean(macd_values[-self.config.macd_signal:])
else:
signal_line = macd_line
histogram = macd_line - signal_line
self.macd.append(macd_line)
self.macd_signal.append(signal_line)
self.macd_hist.append(histogram)
def _calculate_vectorized_ema(self, prices, period):
if len(prices) < period:
return [prices[-1]] * len(prices)
return pd.Series(prices).ewm(span=period, adjust=False).mean().values
def _update_trend_analysis(self):
# Trend direction/strength are still computed here, although the state features and trade signals below use the SMA cross directly.
if len(self.close_prices) < self.config.trend_confirmation_period + 1:
self.trend_direction = 0
self.trend_strength = 0
return
short_term_change = (self.close_prices[-1] / self.close_prices[-self.config.trend_confirmation_period]) - 1
if len(self.ema_short) >= 2 and len(self.sma_short) >= 2 and len(self.sma_long) >= 2:
ema_short_slope = (self.ema_short[-1] / self.ema_short[-2]) - 1
sma_short_slope = (self.sma_short[-1] / self.sma_short[-2]) - 1
sma_long_slope = (self.sma_long[-1] / self.sma_long[-2]) - 1
signals = [
short_term_change > 0,
self.sma_short[-1] > self.sma_long[-1],
ema_short_slope > 0,
sma_short_slope > 0,
sma_long_slope > 0
]
positive_count = sum(signals)
negative_count = len(signals) - positive_count
if positive_count > negative_count + 1:
self.trend_direction = 1
self.trend_strength = min(10, int((positive_count / len(signals)) * 10))
elif negative_count > positive_count + 1:
self.trend_direction = -1
self.trend_strength = min(10, int((negative_count / len(signals)) * 10))
else:
self.trend_direction = 0
self.trend_strength = min(5, abs(positive_count - negative_count))
else:
self.trend_direction = 1 if short_term_change > 0.01 else (-1 if short_term_change < -0.01 else 0)
self.trend_strength = min(5, int(abs(short_term_change * 100)))
def _update_support_resistance(self):
if len(self.close_prices) < 20:
return
prices = np.array(self.close_prices)
window = min(5, len(prices) // 10)
if len(self.close_prices) % 20 == 0:
self.support_levels = []
self.resistance_levels = []
for i in range(window, len(prices) - window):
if all(prices[i] > prices[i-j] for j in range(1, window+1)) and all(prices[i] > prices[i+j] for j in range(1, window+1)):
self.resistance_levels.append(prices[i])
if all(prices[i] < prices[i-j] for j in range(1, window+1)) and all(prices[i] < prices[i+j] for j in range(1, window+1)):
self.support_levels.append(prices[i])
self.support_levels = self.support_levels[-5:] if self.support_levels else [min(prices)]
self.resistance_levels = self.resistance_levels[-5:] if self.resistance_levels else [max(prices)]
def get_state_features(self):
if len(self.close_prices) < 2:
return np.zeros(9)
price_change = (self.close_prices[-1] / self.close_prices[-2]) - 1
volume_ratio = self.volume[-1] / np.mean(self.volume[-10:]) if len(self.volume) >= 10 else 1.0
rsi_value = self.rsi[-1] if self.rsi else 50
macd_hist_value = self.macd_hist[-1] if self.macd_hist else 0
sma_cross = 1 if (len(self.sma_short) > 0 and len(self.sma_long) > 0 and self.sma_short[-1] > self.sma_long[-1]) else -1
volatility = self.volatility_history[-1] if self.volatility_history else 0
# Use the SMA comparison for trend instead of complex internal trend analysis:
if len(self.sma_short) and len(self.sma_long):
trend_signal = 1 if self.sma_short[-1] > self.sma_long[-1] else -1
else:
trend_signal = 0
trend_str = trend_signal
price_level = 0.5
if self.support_levels and self.resistance_levels:
closest_support = min(self.support_levels, key=lambda x: abs(x - self.close_prices[-1]))
closest_resistance = min(self.resistance_levels, key=lambda x: abs(x - self.close_prices[-1]))
range_total = closest_resistance - closest_support
if range_total > 0:
price_level = (self.close_prices[-1] - closest_support) / range_total
price_level = min(1.0, max(0.0, price_level))
sr_proximity = 0
if self.support_levels and self.resistance_levels:
support_dist = (self.close_prices[-1] - closest_support) / self.close_prices[-1] if closest_support > 0 else 1
resist_dist = (closest_resistance - self.close_prices[-1]) / self.close_prices[-1] if closest_resistance > 0 else 1
if support_dist < resist_dist and support_dist < 0.02:
sr_proximity = -1
elif resist_dist < support_dist and resist_dist < 0.02:
sr_proximity = 1
return np.array([
price_change,
volume_ratio,
rsi_value / 100,
macd_hist_value,
sma_cross,
volatility,
trend_str, # trend signal here is simply +1 or -1 from SMA cross
price_level,
sr_proximity
])
def get_trade_signals(self):
"""
Revised trade signal generation:
- Trend signal: based on SMA cross.
- Mean reversion: based on RSI relative to 50.
- MACD signal: based on current MACD vs MACD signal line.
Each individual signal now always returns +1 or -1 when data is available.
"""
signals = {}
# Trend signal from SMA cross:
if len(self.sma_short) > 0 and len(self.sma_long) > 0:
if self.sma_short[-1] > self.sma_long[-1]:
trend_signal = 1
else:
trend_signal = -1
# Confidence: relative difference
trend_conf = abs(self.sma_short[-1] - self.sma_long[-1]) / (abs(self.sma_long[-1]) + 1e-6)
else:
trend_signal, trend_conf = 0, 0
# Mean reversion using RSI relative to 50:
if self.rsi:
rsi_value = self.rsi[-1]
if rsi_value < 50:
mean_rev_signal = 1
mean_rev_conf = (50 - rsi_value) / 50.0
elif rsi_value > 50:
mean_rev_signal = -1
mean_rev_conf = (rsi_value - 50) / 50.0
else:
mean_rev_signal, mean_rev_conf = 0, 0
else:
mean_rev_signal, mean_rev_conf = 0, 0
# MACD signal:
if len(self.macd) > 0 and len(self.macd_signal) > 0:
if self.macd[-1] > self.macd_signal[-1]:
macd_signal = 1
macd_conf = abs(self.macd[-1] - self.macd_signal[-1]) / (abs(self.macd_signal[-1]) + 1e-6)
else:
macd_signal = -1
macd_conf = abs(self.macd_signal[-1] - self.macd[-1]) / (abs(self.macd_signal[-1]) + 1e-6)
else:
macd_signal, macd_conf = 0, 0
# Weighted consensus (weights can be adjusted further):
weighted = trend_signal * trend_conf * 0.4 + mean_rev_signal * mean_rev_conf * 0.3 + macd_signal * macd_conf * 0.3
if weighted > 0:
final_action = 1 # Long
elif weighted < 0:
final_action = 2 # Short
else:
final_action = 0 # Hold
final_confidence = min(1.0, abs(weighted))
signals['action'] = final_action
signals['confidence'] = final_confidence
signals['trend'] = trend_signal
signals['mean_reversion'] = mean_rev_signal
signals['macd_cross'] = macd_signal
return signals
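# Learned dynamics model for MBPO: one pipeline (RobustScaler + GradientBoostingRegressor)
# per state dimension, each predicting the next value of that dimension from the current
# state and action. Tracks validation error, keeps the best model snapshot, and counts
# consecutive training failures.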
class TransitionModel:
def __init__(self, state_dim, action_dim, random_seed=42):
self.state_dim = state_dim
self.action_dim = action_dim
self.random_seed = random_seed
self.models = []
for i in range(state_dim):
model = Pipeline([
('scaler', RobustScaler()),
('regressor', GradientBoostingRegressor(n_estimators=40, max_depth=3, learning_rate=0.05,
subsample=0.8, min_samples_leaf=10, random_state=random_seed))
])
self.models.append(model)
self.trained = False
self.validation_error = float('inf')
self.train_attempts = 0
self.consecutive_failures = 0
self.best_models = None
self.best_error = float('inf')
def train(self, states, actions, next_states):
if len(states) < 200:
return False
self.train_attempts += 1
X = np.hstack((states, actions.reshape(-1, 1)))
try:
X_train, X_val, y_train, y_val = train_test_split(X, next_states, test_size=0.2,
random_state=self.random_seed + self.train_attempts)
total_error = 0
for i in range(self.state_dim):
self.models[i].fit(X_train, y_train[:, i])
y_pred = self.models[i].predict(X_val)
total_error += mean_squared_error(y_val[:, i], y_pred)
new_validation_error = total_error / self.state_dim
if new_validation_error < self.validation_error or not self.trained:
self.validation_error = new_validation_error
self.trained = True
self.consecutive_failures = 0
if new_validation_error < self.best_error:
import copy
self.best_models = copy.deepcopy(self.models)
self.best_error = new_validation_error
return True
else:
self.consecutive_failures += 1
if self.best_models is not None and new_validation_error > 1.5 * self.best_error:
self.models = self.best_models
self.validation_error = self.best_error
return True
except Exception as e:
self.consecutive_failures += 1
print(f"Model training error: {str(e)}")
return False
def predict(self, state, action):
if not self.trained:
return None
try:
X = np.hstack((state.reshape(1, -1), np.array([[action]])))
predictions = np.zeros(self.state_dim)
for i in range(self.state_dim):
predictions[i] = self.models[i].predict(X)[0]
return predictions
except Exception as e:
print(f"Prediction error: {str(e)}")
return None
def needs_reset(self, max_failures):
return self.consecutive_failures >= max_failures
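# Learned reward model for MBPO: a single gradient-boosting pipeline predicting the scaled
# one-step reward from the current state and action, with the same best-snapshot and
# failure-tracking logic as the transition model.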
class RewardModel:
def __init__(self, state_dim, action_dim, random_seed=42):
self.state_dim = state_dim
self.action_dim = action_dim
self.random_seed = random_seed
self.model = Pipeline([
('scaler', RobustScaler()),
('regressor', GradientBoostingRegressor(n_estimators=30, max_depth=3, learning_rate=0.05,
subsample=0.8, min_samples_leaf=10, random_state=random_seed))
])
self.trained = False
self.validation_error = float('inf')
self.train_attempts = 0
self.consecutive_failures = 0
self.best_model = None
self.best_error = float('inf')
def train(self, states, actions, rewards):
if len(states) < 200:
return False
self.train_attempts += 1
X = np.hstack((states, actions.reshape(-1, 1)))
y = rewards
try:
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2,
random_state=self.random_seed + self.train_attempts)
self.model.fit(X_train, y_train)
y_pred = self.model.predict(X_val)
new_validation_error = mean_squared_error(y_val, y_pred)
if new_validation_error < self.validation_error or not self.trained:
self.validation_error = new_validation_error
self.trained = True
self.consecutive_failures = 0
if new_validation_error < self.best_error:
import copy
self.best_model = copy.deepcopy(self.model)
self.best_error = new_validation_error
return True
else:
self.consecutive_failures += 1
if self.best_model is not None and new_validation_error > 1.5 * self.best_error:
self.model = self.best_model
self.validation_error = self.best_error
return True
except Exception as e:
self.consecutive_failures += 1
print(f"Reward model training error: {str(e)}")
return False
def predict(self, state, action):
if not self.trained:
return 0.0
try:
X = np.hstack((state.reshape(1, -1), np.array([[action]])))
return self.model.predict(X)[0]
except Exception:
return 0.0
def needs_reset(self, max_failures):
return self.consecutive_failures >= max_failures
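# Fixed-size FIFO buffer of (state, action, reward, next_state) transitions, sampled
# uniformly for model training and for the distance checks used during synthetic rollouts.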
class ReplayBuffer:
def __init__(self, buffer_size=10000):
self.buffer = deque(maxlen=buffer_size)
def add(self, state, action, reward, next_state):
self.buffer.append((state, action, reward, next_state))
def sample(self, batch_size):
batch_size = min(len(self.buffer), batch_size)
batch = random.sample(self.buffer, batch_size)
states = np.array([x[0] for x in batch])
actions = np.array([x[1] for x in batch])
rewards = np.array([x[2] for x in batch])
next_states = np.array([x[3] for x in batch])
return states, actions, rewards, next_states
def __len__(self):
return len(self.buffer)
def clear(self):
self.buffer.clear()
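# Training algorithm: a QCAlgorithm that replays historical daily bars to train a tabular
# Q-learning agent, augmented MBPO-style with synthetic experience generated from the
# learned transition and reward models. The resulting Q-table and performance stats are
# persisted to the Object Store, presumably for consumption by a separate backtest/live
# trading algorithm.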
class MBPOTrainingAlgorithm(QCAlgorithm):
def Initialize(self):
self.Debug("Initializing Daily Trading MBPO Training Algorithm...")
self.config = Config(self)
self.SetStartDate(self.config.start_date)
self.SetEndDate(self.config.end_date)
self.SetCash(self.config.initial_cash)
self.Debug(f"Using random seed: {self.config.random_seed}")
market = self.config.get_market()
self.SetBrokerageModel(DefaultBrokerageModel(AccountType.Margin))
self.btcSymbol = self.AddCrypto(self.config.benchmark_symbol, self.config.resolution_training, market).Symbol
self.SetBenchmark(self.btcSymbol)
self.symbol = self.AddCrypto(self.config.trading_symbol, self.config.resolution_training, market).Symbol
self.trading_model = DailyTradingModel(self.config)
self.sma20 = self.SMA(self.symbol, self.config.sma_short_period, self.config.resolution_training)
self.sma50 = self.SMA(self.symbol, self.config.sma_long_period, self.config.resolution_training)
self.rsi = self.RSI(self.symbol, self.config.rsi_period, MovingAverageType.Simple, self.config.resolution_training)
self.macd = self.MACD(self.symbol, self.config.macd_fast, self.config.macd_slow, self.config.macd_signal,
MovingAverageType.Exponential, self.config.resolution_training)
self.SetWarmUp(TimeSpan.FromDays(self.config.warmup_period))
self.actions = [0, 1, 2] # Hold, Long, Short
self.state_dim = 9 # Daily trading state features
self.action_dim = 1
self.training_data_loaded = False
self.training_index = None
self.training_data_length = None
self.last_training_action_time = None
self.training_action_interval = timedelta(days=1)
self.epsilon = self.config.epsilon
self.epsilon_min = self.config.epsilon_min
self.epsilon_decay = self.config.epsilon_decay
self.best_avg_reward = -float('inf')
self.no_improvement_count = 0
self.model_reset_count = 0
self.consecutive_negative_rewards = 0
self.model_update_failures = 0  # counter checked in TrainStep before any model reset has occurred
self.log_count = 0
self.episode_rewards = []
self.daily_returns = []
self.sharpe_ratios = []
self.performance_stats = {}
self.strategy_accuracy = {'trend': [], 'mean_reversion': [], 'macd_cross': []}
self.transition_model = TransitionModel(self.state_dim, self.action_dim, self.config.random_seed)
self.reward_model = RewardModel(self.state_dim, self.action_dim, self.config.random_seed)
self.replay_buffer = ReplayBuffer(self.config.replay_buffer_size)
random.seed(self.config.random_seed)
np.random.seed(self.config.random_seed)
self.days_since_model_update = 0
self.q_table = defaultdict(lambda: np.zeros(len(self.actions)))
self.current_episode = 0
self.training_complete = False
self.current_episode_reward = 0
self.last_action = 0
self.model_prediction_errors = []
self.Debug(f"Model prefix: {self.config.get_model_prefix()}")
def LogWithRateLimit(self, message, force=False):
self.log_count += 1
if force or self.log_count % 5 == 0:
self.Debug(message)
def LoadTrainingData(self):
try:
history = self.History(self.symbol, 300, Resolution.Daily)
if history.empty or len(history) < self.config.min_training_days:
self.LogWithRateLimit("Not enough daily historical data to train.", True)
self.training_complete = True
self.training_data_length = 0
self.training_index = 0
return
self.LogWithRateLimit(f"Loaded {len(history)} daily bars for training.", True)
history = history.loc[self.symbol]
self.tr_closes = history['close'].values
self.tr_highs = history['high'].values
self.tr_lows = history['low'].values
self.tr_volumes = history['volume'].values
# Build returns, state vectors, and signals bar by bar so each row only uses data available up to that bar
self.tr_raw_states = []
self.tr_signals = []
for i in range(len(self.tr_closes)):
    if i > 0:
        self.daily_returns.append((self.tr_closes[i] - self.tr_closes[i-1]) / self.tr_closes[i-1])
    self.trading_model.update(self.tr_closes[i], self.tr_highs[i], self.tr_lows[i], self.tr_volumes[i])
    if i < 20:
        self.tr_raw_states.append(np.zeros(self.state_dim))
        self.tr_signals.append({'action': 0, 'confidence': 0, 'trend': 0, 'mean_reversion': 0, 'macd_cross': 0})
    else:
        self.tr_raw_states.append(self.trading_model.get_state_features())
        signal = self.trading_model.get_trade_signals()
        signal['trend'] = signal.get('trend', 0)
        signal['mean_reversion'] = signal.get('mean_reversion', 0)
        signal['macd_cross'] = signal.get('macd_cross', 0)
        self.tr_signals.append(signal)
self.tr_raw_states = np.array(self.tr_raw_states)
self.training_data_length = len(self.tr_closes)
self.training_index = min(60, self.training_data_length - 2)
self.LogWithRateLimit(f"Training data length: {self.training_data_length}", True)
self.LogWithRateLimit(f"Starting Episode {self.current_episode+1}.", True)
except Exception as e:
self.LogWithRateLimit(f"Error loading training data: {str(e)}", True)
self.training_complete = True
self.training_data_length = 0
self.training_index = 0
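# Maps the 9-dimensional continuous state vector to a discrete tuple used as the Q-table key.
# Note that the bin boundaries are hardcoded here rather than driven by Config.state_config.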
def DiscretizeState(self, state_vector):
discretized = []
price_change = np.clip(state_vector[0], -0.05, 0.05)
price_change_bin = int((price_change + 0.05) / 0.1 * 5)
price_change_bin = min(4, max(0, price_change_bin))
discretized.append(price_change_bin)
volume_ratio = state_vector[1]
volume_bin = 0 if volume_ratio < 0.7 else (2 if volume_ratio > 1.3 else 1)
discretized.append(volume_bin)
rsi = state_vector[2] * 100
if rsi < 30:
rsi_bin = 0
elif rsi < 45:
rsi_bin = 1
elif rsi < 55:
rsi_bin = 2
elif rsi < 70:
rsi_bin = 3
else:
rsi_bin = 4
discretized.append(rsi_bin)
macd = state_vector[3]
macd_bin = 0 if macd < -0.01 else (2 if macd > 0.01 else 1)
discretized.append(macd_bin)
sma_bin = 0 if state_vector[4] == -1 else 1
discretized.append(sma_bin)
volatility = state_vector[5]
vol_bin = 0 if volatility < 0.01 else (2 if volatility > 0.03 else 1)
discretized.append(vol_bin)
trend = state_vector[6]
trend_bin = int((trend + 1) / 2 * 3)
trend_bin = min(2, max(0, trend_bin))
discretized.append(trend_bin)
sr_proximity = state_vector[8]
price_level = state_vector[7]
if sr_proximity == -1:
level_bin = 0
elif sr_proximity == 1:
level_bin = 2
else:
level_bin = 1
discretized.append(level_bin)
return tuple(discretized)
def ResetModels(self):
self.LogWithRateLimit(f"Resetting models (reset #{self.model_reset_count+1})", True)
self.transition_model = TransitionModel(self.state_dim, self.action_dim, self.config.random_seed)
self.reward_model = RewardModel(self.state_dim, self.action_dim, self.config.random_seed)
self.replay_buffer.clear()
self.model_update_failures = 0
self.model_prediction_errors = []
self.model_reset_count += 1
self.consecutive_negative_rewards = 0
self.epsilon = min(0.8, max(0.5, self.epsilon * 1.2))
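# One training step per simulated day: picks an action (rule-based signal, epsilon-greedy
# exploration, or greedy Q-table lookup), computes the real reward and next state from the
# preloaded history, stores the transition, periodically retrains the dynamics/reward models,
# applies a Q-learning update, and generates synthetic rollouts. Also handles episode
# boundaries, epsilon decay, early stopping, and model resets.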
def TrainStep(self):
if self.training_data_length is None or self.training_index is None:
self.LogWithRateLimit("Training data not loaded. Skipping TrainStep.")
return
if self.training_data_length <= 60:
self.LogWithRateLimit("Insufficient training data length. Stopping training.", True)
self.training_complete = True
return
if self.consecutive_negative_rewards >= self.config.max_consecutive_negative_rewards:
if self.model_reset_count < 2:
self.ResetModels()
else:
self.LogWithRateLimit("Early stopping due to consecutive negative rewards.", True)
self.training_complete = True
self.EvaluateTrainingResults()
self.SaveQTable()
return
if self.training_index >= self.training_data_length - 1:
self.episode_rewards.append(self.current_episode_reward)
episode_returns = []
if len(self.episode_rewards) > 1:
returns = []
for i in range(1, len(self.episode_rewards)):
prev_reward = self.episode_rewards[i-1]
if abs(prev_reward) > 1e-6:
returns.append((self.episode_rewards[i] - prev_reward) / abs(prev_reward))
else:
returns.append(0)
episode_returns = np.nan_to_num(returns, nan=0)
if len(episode_returns) > 0:
sharpe = self.CalculateSharpe(episode_returns)
self.sharpe_ratios.append(sharpe)
self.current_episode += 1
self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
should_log = (self.current_episode % self.config.log_frequency == 0 or
self.current_episode == 1 or
self.current_episode == self.config.episodes)
if should_log:
self.LogWithRateLimit(f"Episode {self.current_episode} done. Reward: {self.current_episode_reward:.2f}, Epsilon: {self.epsilon:.3f}", True)
if len(self.model_prediction_errors) > 0 and should_log:
median_error = np.median(self.model_prediction_errors)
self.LogWithRateLimit(f"Model prediction median error: {median_error:.4f}")
if self.current_episode > 1:
recent_window = min(3, len(self.episode_rewards))
current_avg_reward = np.mean(self.episode_rewards[-recent_window:])
if current_avg_reward <= 0:
self.consecutive_negative_rewards += 1
else:
self.consecutive_negative_rewards = 0
if current_avg_reward > self.best_avg_reward + self.config.min_improvement:
self.best_avg_reward = current_avg_reward
self.no_improvement_count = 0
self.SaveQTable("best_qtable.pkl")
else:
self.no_improvement_count += 1
if self.no_improvement_count >= self.config.patience:
if self.model_reset_count < 2:
self.ResetModels()
self.no_improvement_count = 0
else:
self.LogWithRateLimit(f"No improvement for {self.config.patience} episodes. Stopping early.", True)
self.training_complete = True
self.EvaluateTrainingResults()
self.SaveQTable()
return
if self.current_episode >= self.config.episodes:
self.training_complete = True
self.EvaluateTrainingResults()
self.SaveQTable()
return
self.training_index = min(60, self.training_data_length - 2)
self.current_episode_reward = 0
self.last_action = 0
return
i = self.training_index
try:
current_state_raw = self.tr_raw_states[i].copy()
current_state_tuple = self.DiscretizeState(current_state_raw)
investor_signal = self.tr_signals[i]
self.days_since_model_update += 1
if (self.days_since_model_update >= self.config.model_train_freq and
len(self.replay_buffer) >= self.config.min_real_samples):
success = self.UpdateDynamicsModel()
if not success:
self.model_update_failures += 1
if self.model_update_failures >= self.config.max_failed_model_updates:
if self.model_reset_count < 2:
self.ResetModels()
else:
self.training_complete = True
self.EvaluateTrainingResults()
self.SaveQTable()
return
else:
self.model_update_failures = 0
self.days_since_model_update = 0
if random.random() < 0.3 and investor_signal['confidence'] > 0.4:
action = investor_signal['action']
elif random.random() < self.epsilon:
action = random.choice(self.actions)
else:
action = int(np.argmax(self.q_table[current_state_tuple]))
if i + 1 >= len(self.tr_raw_states):
self.LogWithRateLimit(f"Index {i+1} out of range in training data.")
self.training_complete = True
return
real_next_state_raw = self.tr_raw_states[i+1].copy()
real_next_state_tuple = self.DiscretizeState(real_next_state_raw)
price_return = (self.tr_closes[i+1] - self.tr_closes[i]) / self.tr_closes[i]
reward = self.CalculateDailyReward(action, self.tr_closes[i], self.tr_closes[i+1], self.tr_signals[i])
self.current_episode_reward += reward
scaled_reward = reward * self.config.reward_scaling
self.replay_buffer.add(current_state_raw, action, scaled_reward, real_next_state_raw)
if self.transition_model.trained and self.reward_model.trained:
try:
predicted_next_state = self.transition_model.predict(current_state_raw, action)
if predicted_next_state is not None:
prediction_error = np.mean(((real_next_state_raw - predicted_next_state) / (np.maximum(np.abs(real_next_state_raw), 1e-6)))**2)
self.model_prediction_errors.append(min(prediction_error, 100.0))
except Exception:
pass
for strategy_name in ['trend', 'mean_reversion', 'macd_cross']:
signal_value = investor_signal.get(strategy_name, 0)
if signal_value != 0:
is_correct = (signal_value > 0 and price_return > 0) or (signal_value < 0 and price_return < 0)
self.strategy_accuracy[strategy_name].append(1 if is_correct else 0)
if random.random() < 0.5:
best_next_action = int(np.argmax(self.q_table[real_next_state_tuple]))
td_target = scaled_reward + self.config.discount_factor * self.q_table[real_next_state_tuple][best_next_action]
td_error = td_target - self.q_table[current_state_tuple][action]
self.q_table[current_state_tuple][action] += self.config.learning_rate * td_error
if (self.transition_model.trained and self.reward_model.trained and
len(self.replay_buffer) >= self.config.min_real_samples):
self.GenerateSyntheticExperience(current_state_raw, action)
self.last_action = action
self.training_index += 1
except Exception as e:
self.LogWithRateLimit(f"Error in TrainStep: {str(e)}")
self.training_index = self.training_data_length
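# Annualized (sqrt(252)) Sharpe ratio of a list of returns; returns 0 when there are fewer
# than two valid points or zero variance.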
def CalculateSharpe(self, returns, risk_free_rate=0.0):
if len(returns) < 2:
return 0.0
returns_array = np.array(returns)
returns_array = returns_array[~np.isnan(returns_array)]
if len(returns_array) < 2:
return 0.0
excess_returns = returns_array - risk_free_rate
std_dev = np.std(excess_returns)
if std_dev == 0:
return 0.0
return np.mean(excess_returns) / std_dev * np.sqrt(252)
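# Reward shaping for a single day: long earns the price return and short its negative, each
# minus half the commission rate. Days with an absolute move under 1% reset the reward to a
# small positive constant; bonuses are then added for agreeing with the rule-based signal and
# for holding a consistent position in calm markets, and a small penalty applies when
# positioned through very large moves.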
def CalculateDailyReward(self, action, current_price, next_price, signal_info):
if current_price <= 0 or not np.isfinite(current_price) or not np.isfinite(next_price):
return 0
price_return = (next_price - current_price) / current_price
commission_penalty = self.config.commission_rate * 0.5
if action == 1:
reward = price_return - commission_penalty
elif action == 2:
reward = -price_return - commission_penalty
else:
reward = 0.0
if abs(price_return) < 0.01:
reward = 0.001
if action == signal_info['action']:
reward += 0.01
else:
reward -= 0.005
if self.last_action == action and action != 0 and abs(price_return) < 0.02:
reward += 0.005
if action != 0 and abs(price_return) > 0.05:
reward -= 0.005
return reward
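# Samples up to 2000 transitions from the replay buffer and retrains both learned models;
# returns True only when both model updates report success.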
def UpdateDynamicsModel(self):
if len(self.replay_buffer) < self.config.min_real_samples:
return False
sample_size = min(len(self.replay_buffer), 2000)
states, actions, rewards, next_states = self.replay_buffer.sample(sample_size)
transition_success = self.transition_model.train(states, actions, next_states)
reward_success = self.reward_model.train(states, actions, rewards)
return transition_success and reward_success
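# MBPO-style imagination: starting from the given real state/action, rolls the learned models
# forward for a few steps, aborting if a predicted state drifts too far from states in the
# replay buffer, and applies down-weighted expected-value Q-table updates using the predicted
# (discounted) rewards.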
def GenerateSyntheticExperience(self, init_state, init_action):
if not (self.transition_model.trained and self.reward_model.trained):
return
num_synthetic = min(self.config.synthetic_data_ratio, 3)
for _ in range(num_synthetic):
state = init_state.copy()
action = init_action
for _ in range(min(self.config.model_rollout_length, 3)):
next_state = self.transition_model.predict(state, action)
if next_state is None:
break
try:
states_sample, _, _, _ = self.replay_buffer.sample(min(500, len(self.replay_buffer)))
distances = np.sqrt(np.sum((states_sample - next_state.reshape(1, -1)) ** 2, axis=1))
if np.min(distances) > 5.0:
break
reward = self.reward_model.predict(state, action) * 0.8
state_tuple = self.DiscretizeState(state)
next_state_tuple = self.DiscretizeState(next_state)
next_q_values = self.q_table[next_state_tuple]
expected_q = sum([self.GetActionProbability(next_state_tuple, a) * next_q_values[a]
for a in range(len(self.actions))])
td_target = reward + self.config.discount_factor * expected_q
td_error = td_target - self.q_table[state_tuple][action]
self.q_table[state_tuple][action] += self.config.learning_rate * 0.3 * td_error
state = next_state
best_next_action = int(np.argmax(self.q_table[next_state_tuple]))
if random.random() < max(self.epsilon, 0.4):
action = random.choice(self.actions)
else:
action = best_next_action
except Exception:
break
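# Probability of taking `action` in `state_tuple` under the current epsilon-greedy policy;
# used to form the expected value over next actions in synthetic updates.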
def GetActionProbability(self, state_tuple, action):
q_values = self.q_table[state_tuple]
max_q = np.max(q_values)
probs = np.ones(len(self.actions)) * self.epsilon / len(self.actions)
best_actions = np.where(q_values == max_q)[0]
probs[best_actions] += (1 - self.epsilon) / len(best_actions)
return probs[action]
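# Summarizes training: reward statistics, success rate, Sharpe ratio of episode rewards,
# model prediction error, and per-strategy signal accuracy; compares them against the Config
# thresholds, logs a go/no-go recommendation for backtesting, reloads the best Q-table, and
# saves the stats.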
def EvaluateTrainingResults(self):
avg_reward = np.mean(self.episode_rewards) if self.episode_rewards else 0
max_reward = max(self.episode_rewards) if self.episode_rewards else 0
min_reward = min(self.episode_rewards) if self.episode_rewards else 0
avg_sharpe = np.mean(self.sharpe_ratios) if self.sharpe_ratios else 0
model_error = np.median(self.model_prediction_errors) if self.model_prediction_errors else float('inf')
reward_std = np.std(self.episode_rewards) if len(self.episode_rewards) > 1 else float('inf')
reward_consistency = 0 if reward_std == float('inf') or reward_std == 0 else avg_reward / reward_std
positive_episodes = sum(1 for r in self.episode_rewards if r > 0)
total_episodes = len(self.episode_rewards) if self.episode_rewards else 0
success_rate = positive_episodes / total_episodes if total_episodes > 0 else 0
strategy_accuracy = {}
for strategy, results in self.strategy_accuracy.items():
strategy_accuracy[strategy] = sum(results) / len(results) if results else 0
strategy_values = [acc for acc in strategy_accuracy.values() if acc > 0]
avg_strategy_accuracy = np.mean(strategy_values) if strategy_values else 0
self.performance_stats = {
'avg_reward': avg_reward,
'max_reward': max_reward,
'min_reward': min_reward,
'reward_std': reward_std,
'reward_consistency': reward_consistency,
'success_rate': success_rate,
'avg_sharpe': avg_sharpe,
'model_error': model_error,
'episodes_completed': self.current_episode,
'total_episodes': self.config.episodes,
'strategy_accuracy': strategy_accuracy,
'avg_strategy_accuracy': avg_strategy_accuracy
}
self.Debug("=" * 40)
self.Debug("TRAINING PERFORMANCE SUMMARY")
self.Debug("=" * 40)
self.Debug(f"Episodes: {self.current_episode}/{self.config.episodes}")
self.Debug(f"Avg Reward: {avg_reward:.4f} [{min_reward:.4f}, {max_reward:.4f}]")
self.Debug(f"Reward Consistency: {reward_consistency:.4f}")
self.Debug(f"Success Rate: {success_rate:.1%}")
self.Debug(f"Sharpe Ratio: {avg_sharpe:.4f}")
self.Debug(f"Model Error: {model_error:.4f}")
self.Debug("-" * 30)
self.Debug("STRATEGY ACCURACY")
for strategy, accuracy in strategy_accuracy.items():
self.Debug(f"{strategy.replace('_', ' ').title()}: {accuracy:.1%}")
self.Debug("=" * 40)
good_avg_reward = avg_reward >= self.config.min_avg_reward
good_success_rate = success_rate >= self.config.success_rate_threshold
good_sharpe = avg_sharpe >= self.config.sharpe_ratio_threshold
good_strategy = avg_strategy_accuracy >= self.config.strategy_accuracy_threshold
reasons = []
if good_avg_reward:
reasons.append("Average reward meets minimum threshold")
else:
reasons.append("Average reward below minimum threshold")
if good_success_rate:
reasons.append("Success rate meets the threshold")
else:
reasons.append("Success rate below threshold")
if good_sharpe:
reasons.append("Sharpe ratio meets threshold")
else:
reasons.append("Sharpe ratio below threshold")
if good_strategy:
reasons.append("Strategy accuracy meets threshold")
else:
reasons.append("Strategy accuracy needs improvement")
is_ready = good_avg_reward and good_success_rate and good_sharpe and good_strategy
self.performance_stats['is_ready_for_backtest'] = is_ready
self.performance_stats['recommendation_reasons'] = reasons
self.Debug("BACKTEST RECOMMENDATION")
self.Debug("=" * 40)
if is_ready:
self.Debug("RECOMMENDATION: PROCEED WITH BACKTEST")
else:
self.Debug("RECOMMENDATION: FURTHER TRAINING NEEDED")
self.Debug("Rationale:")
for reason in reasons:
self.Debug(f"- {reason}")
self.Debug("=" * 40)
try:
self.LoadBestQTable()
except Exception:
pass
self.SaveModelStats()
def SaveQTable(self, filename=None):
if filename is None:
filename = "qtable.pkl"
prefixed_filename = f"{self.config.get_model_prefix()}_{filename}"
try:
qtable_to_save = dict(self.q_table)
qtable_data = pickle.dumps(qtable_to_save)
qtable_data_b64 = base64.b64encode(qtable_data).decode('utf-8')
if self.ObjectStore is not None:
self.ObjectStore.Save(prefixed_filename, qtable_data_b64)
self.LogWithRateLimit(f"QTable saved as '{prefixed_filename}'", True)
except Exception as e:
self.LogWithRateLimit(f"Error saving QTable: {str(e)}", True)
def SaveModelStats(self):
try:
stats = {
'avg_reward': np.mean(self.episode_rewards) if self.episode_rewards else 0,
'max_reward': max(self.episode_rewards) if self.episode_rewards else 0,
'min_reward': min(self.episode_rewards) if self.episode_rewards else 0,
'episodes': self.current_episode,
'sharpe_ratio': np.mean(self.sharpe_ratios) if self.sharpe_ratios else 0,
'training_complete': self.training_complete,
'timestamp': str(self.Time),
'random_seed': self.config.random_seed,
'model_version': self.config.model_version,
'performance_summary': self.performance_stats
}
stats_data = pickle.dumps(stats)
stats_data_b64 = base64.b64encode(stats_data).decode('utf-8')
filename = f"{self.config.get_model_prefix()}_model_stats.pkl"
if self.ObjectStore is not None:
self.ObjectStore.Save(filename, stats_data_b64)
self.LogWithRateLimit(f"Model stats saved as '{filename}'", True)
except Exception as e:
self.LogWithRateLimit(f"Error saving model stats: {str(e)}", True)
def LoadBestQTable(self):
try:
best_qtable_filename = f"{self.config.get_model_prefix()}_best_qtable.pkl"
qtable_data_b64 = self.ObjectStore.Read(best_qtable_filename)
if qtable_data_b64 is not None:
qtable_data = base64.b64decode(qtable_data_b64)
loaded_dict = pickle.loads(qtable_data)
self.q_table = defaultdict(lambda: np.zeros(len(self.actions)), loaded_dict)
self.LogWithRateLimit(f"Loaded best QTable: {best_qtable_filename}", True)
return True
except Exception as e:
self.LogWithRateLimit(f"Could not load best QTable: {str(e)}")
return False
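# Event loop: after warm-up, the first bar triggers a one-time load of the historical
# training data; subsequent bars advance offline training by one TrainStep per elapsed day.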
def OnData(self, data):
if self.IsWarmingUp:
return
current_time = self.Time
if not self.training_data_loaded:
self.LoadTrainingData()
self.training_data_loaded = True
self.last_training_action_time = current_time
if (self.last_training_action_time is None or
current_time - self.last_training_action_time >= self.training_action_interval):
if self.training_data_length and self.training_data_length > 1:
self.TrainStep()
self.last_training_action_time = current_time
def OnEndOfAlgorithm(self):
if not self.training_complete:
self.LogWithRateLimit("Algorithm ending, finalizing training...", True)
self.EvaluateTrainingResults()
self.SaveQTable()