| Overall Statistics |  |
| --- | --- |
| Total Orders | 0 |
| Average Win | 0% |
| Average Loss | 0% |
| Compounding Annual Return | 0% |
| Drawdown | 0% |
| Expectancy | 0 |
| Start Equity | 30000.00 |
| End Equity | 30000 |
| Net Profit | 0% |
| Sharpe Ratio | 0 |
| Sortino Ratio | 0 |
| Probabilistic Sharpe Ratio | 0% |
| Loss Rate | 0% |
| Win Rate | 0% |
| Profit-Loss Ratio | 0 |
| Alpha | 0 |
| Beta | 0 |
| Annual Standard Deviation | 0 |
| Annual Variance | 0 |
| Information Ratio | -0.404 |
| Tracking Error | 0.379 |
| Treynor Ratio | 0 |
| Total Fees | $0.00 |
| Estimated Strategy Capacity | $0 |
| Lowest Capacity Asset |  |
| Portfolio Turnover | 0% |
from AlgorithmImports import *
import numpy as np, pandas as pd, random
from collections import defaultdict, deque
import pickle, base64
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.preprocessing import RobustScaler
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
class Config:
def __init__(self, algorithm):
self.mode, self.start_date, self.end_date = "training", datetime(2019,1,1), datetime(2025,3,31)
self.initial_cash, self.allocation = 30000, 0.2
self.commission_rate = float(algorithm.GetParameter("commission_rate") or 0.001)
self.slippage = float(algorithm.GetParameter("slippage") or 0.0)
self.benchmark_symbol, self.trading_symbol = "BTCUSDT", "ETHUSDT"
self.exchange = algorithm.GetParameter("exchange", "binance")
self.resolution_training = Resolution.Daily
self.model_version = algorithm.GetParameter("model_version", "v01")
self.random_seed = int(algorithm.GetParameter("random_seed", "138") or 42)
self.trend_confirmation_period, self.profit_target_pct, self.stop_loss_pct = 3, 0.03, 0.02
self.state_config = {'price_bins':12, 'volume_bins':8, 'rsi_bins':6, 'macd_bins':5, 'sma_cross':True, 'volatility_bins':4, 'trend_strength_bins':5, 'price_level_bins':7}
self.learning_rate, self.discount_factor = 0.1, 0.95
self.epsilon, self.epsilon_min, self.epsilon_decay = 1.0, 0.1, 0.98
self.episodes, self.min_avg_reward = 30, -0.3
self.model_rollout_length, self.model_train_freq = 3, 20
self.min_real_samples, self.replay_buffer_size = 200, 8000
self.patience, self.min_improvement = 100, 0.0001
self.min_training_days, self.warmup_period = 60, 100
self.reward_scaling, self.log_frequency = 2.0, 1
self.max_consecutive_failures = 5
self.sma_short_period = int(algorithm.GetParameter("sma_short") or 20)
self.sma_long_period = int(algorithm.GetParameter("sma_long") or 60)
self.ema_short_period, self.rsi_period = 9, 14
self.rsi_overbought, self.rsi_oversold = 70, 30
self.macd_fast, self.macd_slow, self.macd_signal = 12, 26, 9
self.success_rate_threshold = 0.52
self.sharpe_ratio_threshold = 0.8
self.strategy_accuracy_threshold = 0.48
def get_market(self): return {"binance":Market.Binance, "coinbase":Market.GDAX, "gdax":Market.GDAX, "bitfinex":Market.Bitfinex, "kraken":Market.Kraken}.get(self.exchange.lower(), Market.Binance)
def get_model_prefix(self): return f"{self.model_version}_{self.random_seed}"
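# DailyTradingModel keeps rolling close/high/low/volume history and derives the
# technical features used throughout the algorithm: SMA/EMA values, Wilder-smoothed
# RSI, MACD with signal line, a voted trend direction/strength, local
# support/resistance levels, the 9-element state vector from get_state_features(),
# and the rule-based trade signals from get_trade_signals().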
class DailyTradingModel:
def __init__(self, config):
self.config = config
self.close_prices, self.high_prices, self.low_prices, self.volume = [], [], [], []
self.daily_returns, self.sma_short, self.sma_long, self.ema_short = [], [], [], []
self.rsi, self.macd, self.macd_signal, self.macd_hist = [], [], [], []
self.volatility_history, self.support_levels, self.resistance_levels = [], [], []
        self.trend_direction = self.trend_strength = 0
        self.avg_gain = self.avg_loss = None  # seeded from the first full RSI window in _calculate_rsi
def update(self, price, high, low, volume):
self.close_prices.append(price)
self.high_prices.append(high)
self.low_prices.append(low)
self.volume.append(volume)
self.daily_returns.append((price - self.close_prices[-2])/self.close_prices[-2] if len(self.close_prices)>1 else 0)
self.volatility_history.append(np.std(self.close_prices[-20:])/np.mean(self.close_prices[-20:]) if len(self.close_prices)>=20 else 0)
self._update_indicators()
self._update_trend_analysis()
self._update_support_resistance()
def _update_indicators(self):
closes = np.array(self.close_prices)
self.sma_short.append(np.mean(closes[-self.config.sma_short_period:]) if len(closes)>=self.config.sma_short_period else closes[-1] if closes.size>0 else 0)
self.sma_long.append(np.mean(closes[-self.config.sma_long_period:]) if len(closes)>=self.config.sma_long_period else closes[-1] if closes.size>0 else 0)
self._calculate_ema(closes, self.ema_short, self.config.ema_short_period)
self._calculate_rsi(closes)
self._calculate_macd(closes)
def _calculate_ema(self, prices, ema_list, period):
if len(prices) == 0: ema_list.append(0); return
if len(ema_list) == 0:
ema_list.append(np.mean(prices[-period:]) if len(prices)>=period else prices[-1])
return
multiplier = 2/(period+1)
ema_list.append((prices[-1]-ema_list[-1])*multiplier + ema_list[-1])
def _calculate_rsi(self, prices):
if len(prices) <= self.config.rsi_period:
self.rsi.append(50)
return
delta = np.diff(prices)
if self.avg_gain is None or self.avg_loss is None:
gains = [x if x>0 else 0 for x in delta[-self.config.rsi_period:]]
losses = [-x if x<0 else 0 for x in delta[-self.config.rsi_period:]]
self.avg_gain, self.avg_loss = np.mean(gains), np.mean(losses)
else:
current_gain = delta[-1] if delta[-1]>0 else 0
current_loss = -delta[-1] if delta[-1]<0 else 0
self.avg_gain = ((self.avg_gain*(self.config.rsi_period-1))+current_gain)/self.config.rsi_period
self.avg_loss = ((self.avg_loss*(self.config.rsi_period-1))+current_loss)/self.config.rsi_period
self.rsi.append(100 if self.avg_loss==0 else 100-(100/(1+self.avg_gain/self.avg_loss)))
def _calculate_macd(self, prices):
if len(prices) < self.config.macd_slow:
self.macd.append(0)
self.macd_signal.append(0)
self.macd_hist.append(0)
return
ema12 = pd.Series(prices).ewm(span=self.config.macd_fast, adjust=False).mean()
ema26 = pd.Series(prices).ewm(span=self.config.macd_slow, adjust=False).mean()
macd_line = ema12.iloc[-1]-ema26.iloc[-1]
macd_values = ema12-ema26
signal_line = macd_values.ewm(span=self.config.macd_signal, adjust=False).mean().iloc[-1]
self.macd.append(macd_line)
self.macd_signal.append(signal_line)
self.macd_hist.append(macd_line-signal_line)
def _update_trend_analysis(self):
if len(self.close_prices) < self.config.trend_confirmation_period+1:
self.trend_direction = self.trend_strength = 0
return
short_term_change = (self.close_prices[-1]/self.close_prices[-self.config.trend_confirmation_period])-1
if len(self.ema_short)>=2 and len(self.sma_short)>=2 and len(self.sma_long)>=2:
signals = [short_term_change>0, self.sma_short[-1]>self.sma_long[-1],
(self.ema_short[-1]/self.ema_short[-2])-1>0,
(self.sma_short[-1]/self.sma_short[-2])-1>0,
(self.sma_long[-1]/self.sma_long[-2])-1>0]
positive = sum(signals)
if positive > len(signals)-positive+1:
self.trend_direction, self.trend_strength = 1, min(10,int((positive/len(signals))*10))
elif len(signals)-positive > positive+1:
self.trend_direction, self.trend_strength = -1, min(10,int(((len(signals)-positive)/len(signals))*10))
else:
self.trend_direction, self.trend_strength = 0, min(5,abs(positive-(len(signals)-positive)))
else:
self.trend_direction = 1 if short_term_change>0.01 else (-1 if short_term_change<-0.01 else 0)
self.trend_strength = min(5,int(abs(short_term_change*100)))
def _update_support_resistance(self):
if len(self.close_prices)<20: return
prices = np.array(self.close_prices)
window = min(5,len(prices)//10)
if len(self.close_prices)%20==0:
self.support_levels, self.resistance_levels = [], []
for i in range(window,len(prices)-window):
if all(prices[i]>prices[i-j] for j in range(1,window+1)) and all(prices[i]>prices[i+j] for j in range(1,window+1)):
self.resistance_levels.append(prices[i])
if all(prices[i]<prices[i-j] for j in range(1,window+1)) and all(prices[i]<prices[i+j] for j in range(1,window+1)):
self.support_levels.append(prices[i])
self.support_levels = self.support_levels[-5:] if self.support_levels else [min(prices)]
self.resistance_levels = self.resistance_levels[-5:] if self.resistance_levels else [max(prices)]
def get_state_features(self):
if len(self.close_prices)<2: return np.zeros(9)
price_change = (self.close_prices[-1]/self.close_prices[-2])-1
volume_ratio = self.volume[-1]/np.mean(self.volume[-10:]) if len(self.volume)>=10 else 1.0
rsi_value = self.rsi[-1] if self.rsi else 50
macd_hist_value = self.macd_hist[-1] if self.macd_hist else 0
sma_cross = 1 if (len(self.sma_short)>0 and len(self.sma_long)>0 and self.sma_short[-1]>self.sma_long[-1]) else -1
volatility = self.volatility_history[-1] if self.volatility_history else 0
trend_signal = self.trend_direction
price_level, sr_proximity = 0.5, 0
if self.support_levels and self.resistance_levels:
closest_support = min(self.support_levels,key=lambda x:abs(x-self.close_prices[-1]))
closest_resistance = min(self.resistance_levels,key=lambda x:abs(x-self.close_prices[-1]))
range_total = closest_resistance-closest_support
if range_total>0:
price_level = min(1.0,max(0.0,(self.close_prices[-1]-closest_support)/range_total))
support_dist = (self.close_prices[-1]-closest_support)/self.close_prices[-1] if closest_support>0 else 1
resist_dist = (closest_resistance-self.close_prices[-1])/self.close_prices[-1] if closest_resistance>0 else 1
if support_dist<resist_dist and support_dist<0.02: sr_proximity = -1
elif resist_dist<support_dist and resist_dist<0.02: sr_proximity = 1
return np.array([price_change,volume_ratio,rsi_value/100,macd_hist_value,
sma_cross,volatility,trend_signal,price_level,sr_proximity])
def get_trade_signals(self):
trend_signal = trend_conf = mean_rev_signal = mean_rev_conf = macd_signal = macd_conf = 0
if len(self.sma_short)>0 and len(self.sma_long)>0:
trend_signal = 1 if self.sma_short[-1]>self.sma_long[-1] else -1
trend_conf = abs(self.sma_short[-1]-self.sma_long[-1])/self.sma_long[-1]
if self.rsi:
rsi_value = self.rsi[-1]
if rsi_value<self.config.rsi_oversold:
mean_rev_signal, mean_rev_conf = 1, (self.config.rsi_oversold-rsi_value)/self.config.rsi_oversold
elif rsi_value>self.config.rsi_overbought:
mean_rev_signal, mean_rev_conf = -1, (rsi_value-self.config.rsi_overbought)/(100-self.config.rsi_overbought)
if len(self.macd)>0 and len(self.macd_signal)>0:
macd_signal = 1 if self.macd[-1]>self.macd_signal[-1] else -1
macd_conf = abs(self.macd[-1]-self.macd_signal[-1])/(abs(self.macd_signal[-1])+1e-6)
weighted = trend_signal*trend_conf*0.4 + mean_rev_signal*mean_rev_conf*0.3 + macd_signal*macd_conf*0.3
return {'action': 1 if weighted>0.2 else (2 if weighted<-0.2 else 0),
'confidence': min(1.0,abs(weighted)) if abs(weighted)>=0.2 else 0.0,
'trend': trend_signal, 'mean_reversion': mean_rev_signal, 'macd_cross': macd_signal}
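# TransitionModel is the learned dynamics model: one RobustScaler +
# GradientBoostingRegressor pipeline per state dimension, each predicting that
# component of the next state from the concatenated (state, action) input.
# A held-out split tracks validation MSE, and the best-performing model set is
# kept for rollback if a later refit regresses badly.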
class TransitionModel:
def __init__(self, state_dim, action_dim, random_seed=42):
self.state_dim, self.action_dim, self.random_seed = state_dim, action_dim, random_seed
self.models = [Pipeline([('scaler',RobustScaler()),
('regressor',GradientBoostingRegressor(n_estimators=40,max_depth=3,
learning_rate=0.05,subsample=0.8,min_samples_leaf=10,
random_state=random_seed))]) for _ in range(state_dim)]
self.trained = False
self.validation_error = float('inf')
self.train_attempts = self.consecutive_failures = 0
self.best_models, self.best_error = None, float('inf')
def train(self, states, actions, next_states):
if len(states)<200: return False
self.train_attempts += 1
X = np.hstack((states,actions.reshape(-1,1)))
try:
X_train,X_val,y_train,y_val = train_test_split(X,next_states,test_size=0.2,
random_state=self.random_seed+self.train_attempts)
total_error = 0
for i in range(self.state_dim):
self.models[i].fit(X_train,y_train[:,i])
y_pred = self.models[i].predict(X_val)
total_error += mean_squared_error(y_val[:,i],y_pred)
new_validation_error = total_error/self.state_dim
if new_validation_error<self.validation_error or not self.trained:
self.validation_error = new_validation_error
self.trained = True
self.consecutive_failures = 0
if new_validation_error<self.best_error:
import copy
self.best_models = copy.deepcopy(self.models)
self.best_error = new_validation_error
return True
else:
self.consecutive_failures += 1
if self.best_models is not None and new_validation_error>1.5*self.best_error:
self.models = self.best_models
self.validation_error = self.best_error
return True
except Exception:
self.consecutive_failures += 1
return False
def predict(self, state, action):
if not self.trained: return None
try:
X = np.hstack((state.reshape(1,-1),np.array([[action]])))
return np.array([m.predict(X)[0] for m in self.models])
except Exception: return None
def needs_reset(self, max_failures):
return self.consecutive_failures >= max_failures
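# RewardModel mirrors TransitionModel but fits a single regressor mapping
# (state, action) to the observed scaled reward, with the same validation and
# best-model rollback logic.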
class RewardModel:
def __init__(self, state_dim, action_dim, random_seed=42):
self.state_dim, self.action_dim, self.random_seed = state_dim, action_dim, random_seed
self.model = Pipeline([('scaler',RobustScaler()),
('regressor',GradientBoostingRegressor(n_estimators=30,max_depth=3,
learning_rate=0.05,subsample=0.8,min_samples_leaf=10,
random_state=random_seed))])
self.trained = False
self.validation_error = float('inf')
self.train_attempts = self.consecutive_failures = 0
self.best_model, self.best_error = None, float('inf')
def train(self, states, actions, rewards):
if len(states)<200: return False
self.train_attempts += 1
X = np.hstack((states,actions.reshape(-1,1)))
try:
X_train,X_val,y_train,y_val = train_test_split(X,rewards,test_size=0.2,
random_state=self.random_seed+self.train_attempts)
self.model.fit(X_train,y_train)
y_pred = self.model.predict(X_val)
new_validation_error = mean_squared_error(y_val,y_pred)
if new_validation_error<self.validation_error or not self.trained:
self.validation_error = new_validation_error
self.trained = True
self.consecutive_failures = 0
if new_validation_error<self.best_error:
import copy
self.best_model = copy.deepcopy(self.model)
self.best_error = new_validation_error
return True
else:
self.consecutive_failures += 1
if self.best_model is not None and new_validation_error>1.5*self.best_error:
self.model = self.best_model
self.validation_error = self.best_error
return True
except Exception:
self.consecutive_failures += 1
return False
def predict(self, state, action):
if not self.trained: return 0.0
try:
X = np.hstack((state.reshape(1,-1),np.array([[action]])))
return self.model.predict(X)[0]
except Exception: return 0.0
def needs_reset(self, max_failures):
return self.consecutive_failures >= max_failures
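# Fixed-size FIFO buffer of real (state, action, reward, next_state) tuples;
# sample() draws a uniform random mini-batch for fitting the learned models.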
class ReplayBuffer:
def __init__(self, buffer_size=10000):
self.buffer = deque(maxlen=buffer_size)
def add(self, state, action, reward, next_state):
self.buffer.append((state,action,reward,next_state))
def sample(self, batch_size):
batch_size = min(len(self.buffer),batch_size)
batch = random.sample(self.buffer,batch_size)
return map(np.array,zip(*batch))
def __len__(self): return len(self.buffer)
def clear(self): self.buffer.clear()
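# MBPOTrainingAlgorithm replays historical daily bars offline: it never places
# orders (hence the all-zero backtest statistics) and instead runs tabular
# Q-learning over discretized states, with epsilon-greedy exploration that is
# occasionally steered by the rule-based signals. The transition and reward
# models are refit every model_train_freq steps from the replay buffer; in this
# version their predictions are used only to monitor one-step prediction error,
# and synthetic rollouts (model_rollout_length) are not generated. The learned
# Q-table and training statistics are saved to the ObjectStore at the end.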
class MBPOTrainingAlgorithm(QCAlgorithm):
def Initialize(self):
self.Debug("Initializing Daily Trading MBPO Training Algorithm...")
self.config = Config(self)
self.SetStartDate(self.config.start_date)
self.SetEndDate(self.config.end_date)
self.SetCash(self.config.initial_cash)
self.Debug(f"Using random seed: {self.config.random_seed}")
market = self.config.get_market()
self.SetBrokerageModel(DefaultBrokerageModel(AccountType.Margin))
self.btcSymbol = self.AddCrypto(self.config.benchmark_symbol,
self.config.resolution_training,market).Symbol
self.SetBenchmark(self.btcSymbol)
self.symbol = self.AddCrypto(self.config.trading_symbol,
self.config.resolution_training,market).Symbol
self.trading_model = DailyTradingModel(self.config)
self.sma20 = self.SMA(self.symbol,self.config.sma_short_period,self.config.resolution_training)
self.sma50 = self.SMA(self.symbol,self.config.sma_long_period,self.config.resolution_training)
self.rsi = self.RSI(self.symbol,self.config.rsi_period,MovingAverageType.Simple,
self.config.resolution_training)
self.macd = self.MACD(self.symbol,self.config.macd_fast,self.config.macd_slow,
self.config.macd_signal,MovingAverageType.Exponential,
self.config.resolution_training)
self.SetWarmUp(TimeSpan.FromDays(self.config.warmup_period))
self.actions = [0,1,2]
self.state_dim, self.action_dim = 9, 1
self.training_data_loaded = False
self.training_index = self.training_data_length = None
self.last_training_action_time = None
self.training_action_interval = timedelta(days=1)
self.epsilon = self.config.epsilon
self.epsilon_min = self.config.epsilon_min
self.epsilon_decay = self.config.epsilon_decay
self.best_avg_reward = -float('inf')
self.no_improvement_count = self.model_reset_count = 0
self.consecutive_negative_rewards = self.log_count = 0
self.episode_rewards = []
self.daily_returns = []
self.sharpe_ratios = []
self.performance_stats = {}
self.strategy_accuracy = {'trend':[],'mean_reversion':[],'macd_cross':[]}
self.transition_model = TransitionModel(self.state_dim,self.action_dim,self.config.random_seed)
self.reward_model = RewardModel(self.state_dim,self.action_dim,self.config.random_seed)
self.replay_buffer = ReplayBuffer(self.config.replay_buffer_size)
random.seed(self.config.random_seed)
np.random.seed(self.config.random_seed)
self.days_since_model_update = 0
self.q_table = defaultdict(lambda:np.zeros(len(self.actions)))
self.current_episode = 0
self.training_complete = False
self.current_episode_reward = 0
self.last_action = 0
self.model_prediction_errors = []
self.Debug(f"Model prefix: {self.config.get_model_prefix()}")
def LogWithRateLimit(self, message, force=False):
self.log_count += 1
if force or self.log_count%self.config.log_frequency==0:
self.Debug(message)
def LoadTrainingData(self):
try:
history = self.History(self.symbol,300,Resolution.Daily)
if history.empty or len(history)<self.config.min_training_days:
self.LogWithRateLimit("Not enough daily historical data to train.",True)
self.training_complete = True
self.training_data_length = self.training_index = 0
return
self.LogWithRateLimit(f"Loaded {len(history)} daily bars for training.",True)
history = history.loc[self.symbol]
self.tr_closes = history['close'].values
self.tr_highs = history['high'].values
self.tr_lows = history['low'].values
self.tr_volumes = history['volume'].values
for i in range(1,len(self.tr_closes)):
self.daily_returns.append((self.tr_closes[i]-self.tr_closes[i-1])/self.tr_closes[i-1])
            # Build states and signals bar-by-bar: capture get_state_features() and
            # get_trade_signals() immediately after each update so row i reflects
            # bar i rather than the final state of the fully replayed history.
            self.tr_raw_states, self.tr_signals = [], []
            for i in range(len(self.tr_closes)):
                self.trading_model.update(self.tr_closes[i],self.tr_highs[i],
                                          self.tr_lows[i],self.tr_volumes[i])
                if i < 20:
                    self.tr_raw_states.append(np.zeros(self.state_dim))
                    self.tr_signals.append({'action':0,'confidence':0,'trend':0,
                                            'mean_reversion':0,'macd_cross':0})
                else:
                    self.tr_raw_states.append(self.trading_model.get_state_features())
                    self.tr_signals.append(self.trading_model.get_trade_signals())
            self.tr_raw_states = np.array(self.tr_raw_states)
self.training_data_length = len(self.tr_closes)
self.training_index = min(60,self.training_data_length-2)
self.LogWithRateLimit(f"Training data length: {self.training_data_length}",True)
self.LogWithRateLimit(f"Starting Episode {self.current_episode+1}.",True)
except Exception as e:
self.LogWithRateLimit(f"Error loading training data: {str(e)}",True)
self.training_complete = True
self.training_data_length = self.training_index = 0
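    # Map the 9 continuous features into small integer bins so the state becomes a
    # hashable Q-table key: clipped price change, volume ratio, RSI, MACD histogram,
    # SMA cross, volatility, trend direction, price level within the
    # support/resistance range, and proximity to support or resistance.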
def DiscretizeState(self, state_vector):
price_change = np.clip(state_vector[0],-0.05,0.05)
volume_ratio = state_vector[1]
rsi = state_vector[2]*100
macd = state_vector[3]
sma_cross = state_vector[4]
volatility = state_vector[5]
trend = state_vector[6]
price_level = state_vector[7]
sr_proximity = state_vector[8]
return tuple([
min(4,max(0,int((price_change+0.05)/0.1*5))),
0 if volume_ratio<0.7 else (2 if volume_ratio>1.3 else 1),
0 if rsi<30 else (1 if rsi<45 else (2 if rsi<55 else (3 if rsi<70 else 4))),
0 if macd<-0.01 else (2 if macd>0.01 else 1),
0 if sma_cross==-1 else 1,
0 if volatility<0.01 else (2 if volatility>0.03 else 1),
min(2,max(0,int((trend+1)/2*3))),
min(2,max(0,int(price_level*3))),
1 if sr_proximity>0 else (0 if sr_proximity<0 else 2)
])
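    # Reward shaping: long (1) earns the next-day return minus a scaled commission,
    # short (2) earns the negated return minus commission, and holding (0) earns a
    # small bonus when the move is smaller than the commission. Agreement with the
    # rule-based signal adds a bonus (disagreement a penalty), and the total is
    # scaled by recent realized volatility.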
def CalculateDailyReward(self, action, current_price, next_price, signal_info):
if current_price<=0 or not np.isfinite(current_price) or not np.isfinite(next_price):
return 0
price_return = (next_price-current_price)/current_price
base_commission = self.config.commission_rate
scaled_commission = base_commission*(1+abs(price_return))
reward = 0
if action==1:
reward = price_return-scaled_commission
elif action==2:
reward = -price_return-scaled_commission
else:
reward = 0.001 if abs(price_return)<base_commission else 0
if action in [1,2]:
signal_bonus = 0.01 if action==signal_info['action'] else -0.005
reward += signal_bonus*(1+abs(price_return))
        # The volatility history lives on the trading model, not on the algorithm itself.
        if self.trading_model.volatility_history:
            vol_scale = min(2.0,max(0.5,1.0/(self.trading_model.volatility_history[-1]+1e-6)))
            reward *= vol_scale
return reward
def CalculateSharpe(self, returns, risk_free_rate=0.0):
try:
if len(returns)<2: return 0.0
arr = np.array(returns)
arr = arr[np.isfinite(arr)]
if len(arr)<2: return 0.0
excess = arr-risk_free_rate
stddev = np.std(excess)
if stddev==0 or not np.isfinite(stddev): return 0.0
sharpe = np.mean(excess)/stddev*np.sqrt(252)
return np.clip(sharpe,-10.0,10.0)
except Exception as e:
self.LogWithRateLimit(f"Error calculating Sharpe ratio: {str(e)}")
return 0.0
def UpdateTrainingMetrics(self, reward, action, current_price, next_price, signal_info):
self.current_episode_reward += reward
price_return = (next_price-current_price)/current_price
for strat in ['trend', 'mean_reversion', 'macd_cross']:
signal_val = signal_info.get(strat, 0)
if signal_val != 0:
is_correct = (signal_val>0 and price_return>0) or (signal_val<0 and price_return<0)
self.strategy_accuracy[strat].append(1 if is_correct else 0)
if self.transition_model.trained and self.reward_model.trained:
try:
predicted_next_state = self.transition_model.predict(self.tr_raw_states[self.training_index], action)
if predicted_next_state is not None:
prediction_error = np.mean(((self.tr_raw_states[self.training_index+1]-predicted_next_state)/
(np.maximum(np.abs(self.tr_raw_states[self.training_index+1]),1e-6)))**2)
self.model_prediction_errors.append(min(prediction_error,100.0))
except Exception: pass
def TrainStep(self):
if self.training_data_length is None or self.training_index is None:
self.LogWithRateLimit("Training data not loaded. Skipping TrainStep.")
return
try:
if self.training_data_length <= self.config.min_training_days:
self.LogWithRateLimit(f"Insufficient training data length ({self.training_data_length})")
self.training_complete = True
return
if self.training_index >= self.training_data_length-1:
self.HandleEpisodeEnd()
return
i = self.training_index
current_state_raw = self.tr_raw_states[i].copy()
current_state_tuple = self.DiscretizeState(current_state_raw)
            # Action selection: with 30% probability follow a confident rule-based
            # signal, otherwise act epsilon-greedily over the Q-table
if random.random() < 0.3 and self.tr_signals[i]['confidence'] > 0.4:
action = self.tr_signals[i]['action']
elif random.random() < self.epsilon:
action = random.choice(self.actions)
else:
action = int(np.argmax(self.q_table[current_state_tuple]))
if i+1 >= len(self.tr_raw_states):
self.LogWithRateLimit(f"Index {i+1} out of range in training data.")
self.training_complete = True
return
next_state_raw = self.tr_raw_states[i+1].copy()
next_state_tuple = self.DiscretizeState(next_state_raw)
reward = self.CalculateDailyReward(action, self.tr_closes[i], self.tr_closes[i+1], self.tr_signals[i])
self.UpdateTrainingMetrics(reward, action, self.tr_closes[i], self.tr_closes[i+1], self.tr_signals[i])
scaled_reward = reward * self.config.reward_scaling
self.replay_buffer.add(current_state_raw, action, scaled_reward, next_state_raw)
            # Periodically refit the learned transition and reward models on a
            # sampled batch of real transitions from the replay buffer
self.days_since_model_update += 1
if (self.days_since_model_update >= self.config.model_train_freq and
len(self.replay_buffer) >= self.config.min_real_samples):
states, actions, rewards, next_states = self.replay_buffer.sample(2000)
trans_success = self.transition_model.train(states, actions, next_states)
reward_success = self.reward_model.train(states, actions, rewards)
if not (trans_success and reward_success):
if self.model_reset_count < 2:
self.transition_model = TransitionModel(self.state_dim, self.action_dim, self.config.random_seed)
self.reward_model = RewardModel(self.state_dim, self.action_dim, self.config.random_seed)
self.model_reset_count += 1
self.days_since_model_update = 0
            # Tabular Q-learning update: Q(s,a) += lr * (r + gamma * max_a' Q(s',a') - Q(s,a))
best_next_action = int(np.argmax(self.q_table[next_state_tuple]))
td_target = scaled_reward + self.config.discount_factor * self.q_table[next_state_tuple][best_next_action]
td_error = td_target - self.q_table[current_state_tuple][action]
self.q_table[current_state_tuple][action] += self.config.learning_rate * td_error
self.last_action = action
self.training_index += 1
except Exception as e:
self.LogWithRateLimit(f"Error in TrainStep: {str(e)}")
if self.current_episode < self.config.episodes:
self.HandleEpisodeEnd()
else:
self.training_complete = True
def HandleEpisodeEnd(self):
self.episode_rewards.append(self.current_episode_reward)
if len(self.episode_rewards) > 1:
returns = []
for i in range(1, len(self.episode_rewards)):
prev = self.episode_rewards[i-1]
returns.append((self.episode_rewards[i]-prev)/(abs(prev)+1e-6))
if returns:
sharpe = self.CalculateSharpe(returns)
self.sharpe_ratios.append(sharpe)
self.current_episode += 1
self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
# Log episode results
self.LogWithRateLimit(f"Episode {self.current_episode} done. Reward: {self.current_episode_reward:.2f}, Epsilon: {self.epsilon:.3f}", True)
if len(self.model_prediction_errors) > 0:
median_error = np.median(self.model_prediction_errors)
self.LogWithRateLimit(f"Model prediction median error: {median_error:.4f}")
if self.current_episode >= self.config.episodes:
self.training_complete = True
self.EvaluateTrainingResults()
self.SaveQTable()
return
self.training_index = min(60, self.training_data_length-2)
self.current_episode_reward = 0
self.last_action = 0
# Adjust learning parameters based on performance
if len(self.episode_rewards) >= 3:
recent_rewards = self.episode_rewards[-3:]
avg_recent_reward = np.mean(recent_rewards)
if avg_recent_reward < self.config.min_avg_reward:
self.epsilon = min(0.8, self.epsilon*1.1)
reward_std = np.std(recent_rewards)
if reward_std > abs(avg_recent_reward):
self.config.learning_rate *= 0.95
else:
self.config.learning_rate = min(0.1, self.config.learning_rate*1.05)
def EvaluateTrainingResults(self):
avg_reward = np.mean(self.episode_rewards) if self.episode_rewards else 0
max_reward = max(self.episode_rewards) if self.episode_rewards else 0
min_reward = min(self.episode_rewards) if self.episode_rewards else 0
avg_sharpe = np.mean(self.sharpe_ratios) if self.sharpe_ratios else 0
model_err = np.median(self.model_prediction_errors) if self.model_prediction_errors else float('inf')
reward_std = np.std(self.episode_rewards) if len(self.episode_rewards)>1 else float('inf')
reward_consistency = avg_reward/reward_std if reward_std not in [0,float('inf')] else 0
success_rate = sum(1 for r in self.episode_rewards if r>0)/len(self.episode_rewards) if self.episode_rewards else 0
strategy_accuracy = {strat: sum(res)/len(res) if res else 0
for strat,res in self.strategy_accuracy.items()}
        accuracies = [acc for acc in strategy_accuracy.values() if acc>0]
        avg_strategy_accuracy = np.mean(accuracies) if accuracies else 0
self.performance_stats = {
'avg_reward': avg_reward, 'max_reward': max_reward, 'min_reward': min_reward,
'reward_std': reward_std, 'reward_consistency': reward_consistency,
'success_rate': success_rate, 'avg_sharpe': avg_sharpe,
'model_error': model_err, 'episodes_completed': self.current_episode,
'total_episodes': self.config.episodes, 'strategy_accuracy': strategy_accuracy,
'avg_strategy_accuracy': avg_strategy_accuracy
}
self.Debug("="*40)
self.Debug("TRAINING PERFORMANCE SUMMARY")
self.Debug("="*40)
self.Debug(f"Episodes: {self.current_episode}/{self.config.episodes}")
self.Debug(f"Avg Reward: {avg_reward:.4f} [{min_reward:.4f}, {max_reward:.4f}]")
self.Debug(f"Reward Consistency: {reward_consistency:.4f}")
self.Debug(f"Success Rate: {success_rate:.1%}")
self.Debug(f"Sharpe Ratio: {avg_sharpe:.4f}")
self.Debug(f"Model Error: {model_err:.4f}")
self.Debug("-"*30)
self.Debug("STRATEGY ACCURACY")
for strat,acc in strategy_accuracy.items():
self.Debug(f"{strat.replace('_',' ').title()}: {acc:.1%}")
good_avg = avg_reward >= self.config.min_avg_reward
good_success = success_rate >= self.config.success_rate_threshold
good_sharpe = avg_sharpe >= self.config.sharpe_ratio_threshold
good_strat = avg_strategy_accuracy >= self.config.strategy_accuracy_threshold
reasons = [
"Average reward meets minimum threshold" if good_avg else "Average reward below minimum threshold",
"Success rate meets the threshold" if good_success else "Success rate below threshold",
"Sharpe ratio meets threshold" if good_sharpe else "Sharpe ratio below threshold",
"Strategy accuracy meets threshold" if good_strat else "Strategy accuracy needs improvement"
]
is_ready = good_avg and good_success and good_sharpe and good_strat
self.performance_stats['is_ready_for_backtest'] = is_ready
self.performance_stats['recommendation_reasons'] = reasons
self.Debug("BACKTEST RECOMMENDATION")
self.Debug("="*40)
self.Debug("RECOMMENDATION: " + ("PROCEED WITH BACKTEST" if is_ready else "FURTHER TRAINING NEEDED"))
self.Debug("Rationale:")
for r in reasons:
self.Debug(f"- {r}")
self.Debug("="*40)
        try:
            # LoadBestQTable is not defined on this class; the guarded call is a
            # harmless no-op left as a hook for restoring a previously saved table.
            self.LoadBestQTable()
        except Exception:
            pass
self.SaveModelStats()
def SaveQTable(self, filename=None):
if filename is None:
filename = "qtable.pkl"
prefixed_filename = f"{self.config.get_model_prefix()}_{filename}"
try:
qtable_data = pickle.dumps(dict(self.q_table))
qtable_b64 = base64.b64encode(qtable_data).decode('utf-8')
if self.ObjectStore is not None:
self.ObjectStore.Save(prefixed_filename, qtable_b64)
self.LogWithRateLimit(f"QTable saved as '{prefixed_filename}'", True)
except Exception as e:
self.LogWithRateLimit(f"Error saving QTable: {str(e)}", True)
def SaveModelStats(self):
try:
stats = {
'avg_reward': np.mean(self.episode_rewards) if self.episode_rewards else 0,
'max_reward': max(self.episode_rewards) if self.episode_rewards else 0,
'min_reward': min(self.episode_rewards) if self.episode_rewards else 0,
'episodes': self.current_episode,
'sharpe_ratio': np.mean(self.sharpe_ratios) if self.sharpe_ratios else 0,
'training_complete': self.training_complete,
'timestamp': str(self.Time),
'random_seed': self.config.random_seed,
'model_version': self.config.model_version,
'performance_summary': self.performance_stats
}
stats_data = pickle.dumps(stats)
stats_b64 = base64.b64encode(stats_data).decode('utf-8')
filename = f"{self.config.get_model_prefix()}_model_stats.pkl"
if self.ObjectStore is not None:
self.ObjectStore.Save(filename, stats_b64)
self.LogWithRateLimit(f"Model stats saved as '{filename}'", True)
except Exception as e:
self.LogWithRateLimit(f"Error saving model stats: {str(e)}", True)
def OnData(self, data):
if self.IsWarmingUp:
return
current_time = self.Time
if not self.training_data_loaded:
self.LoadTrainingData()
self.training_data_loaded = True
self.last_training_action_time = current_time
if (self.last_training_action_time is None or
current_time-self.last_training_action_time >= self.training_action_interval):
if self.training_data_length and self.training_data_length > 1:
self.TrainStep()
self.last_training_action_time = current_time
def OnEndOfAlgorithm(self):
if not self.training_complete:
self.LogWithRateLimit("Algorithm ending, finalizing training...", True)
self.EvaluateTrainingResults()
self.SaveQTable()
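
# ---------------------------------------------------------------------------
# Illustrative sketch (not invoked by the training run above): a companion
# backtest algorithm could restore the saved Q-table by reversing the
# base64/pickle encoding used in SaveQTable and then act greedily on it.
# The helper names load_q_table and greedy_action are assumptions for
# illustration; the key layout "<model_version>_<random_seed>_qtable.pkl"
# comes from SaveQTable, and ObjectStore.ContainsKey/Read are the standard
# LEAN read-side counterparts of the ObjectStore.Save call used above.
# ---------------------------------------------------------------------------
def load_q_table(algorithm, prefix, n_actions=3):
    """Read and decode a Q-table previously stored by SaveQTable."""
    key = f"{prefix}_qtable.pkl"
    table = defaultdict(lambda: np.zeros(n_actions))
    if algorithm.ObjectStore.ContainsKey(key):
        raw = base64.b64decode(algorithm.ObjectStore.Read(key))
        table.update(pickle.loads(raw))  # SaveQTable pickles a plain dict
    return table

def greedy_action(q_table, state_tuple):
    """Pick the highest-valued action (0=hold, 1=long, 2=short) for a discretized state."""
    return int(np.argmax(q_table[state_tuple]))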