| Overall Statistics | |
| --- | --- |
| Total Orders | 28 |
| Average Win | 19.42% |
| Average Loss | -3.31% |
| Compounding Annual Return | 17.446% |
| Drawdown | 30.000% |
| Expectancy | 2.430 |
| Start Equity | 10000 |
| End Equity | 26262.98 |
| Net Profit | 162.630% |
| Sharpe Ratio | 0.57 |
| Sortino Ratio | 0.401 |
| Probabilistic Sharpe Ratio | 15.144% |
| Loss Rate | 50% |
| Win Rate | 50% |
| Profit-Loss Ratio | 5.86 |
| Alpha | 0.098 |
| Beta | 0.129 |
| Annual Standard Deviation | 0.196 |
| Annual Variance | 0.038 |
| Information Ratio | 0.035 |
| Tracking Error | 0.242 |
| Treynor Ratio | 0.869 |
| Total Fees | $28.16 |
| Estimated Strategy Capacity | $970000000.00 |
| Lowest Capacity Asset | MSTR RBGP9S2961YD |
| Portfolio Turnover | 0.67% |
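(A quick sanity check on the table: the profit-loss ratio is the average win divided by the average loss, 19.42% / 3.31% ≈ 5.87, and the reported expectancy of 2.43 is consistent with win rate × profit-loss ratio − loss rate = 0.5 × 5.86 − 0.5.)
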
from AlgorithmImports import *
import numpy as np
import pandas as pd
import random
from collections import defaultdict
class FixedQLearningTradingAlgorithm(QCAlgorithm):
    def Initialize(self):
        # Set start/end dates and initial capital
        self.SetStartDate(2019, 1, 1)
        self.SetEndDate(2024, 12, 31)
        self.SetCash(10000)

        # Add benchmark security first, then set as benchmark
        self.spySymbol = self.AddEquity("SPY", Resolution.Daily).Symbol
        self.SetBenchmark(self.spySymbol)

        # Main trading symbol
        self.symbol = self.AddEquity("MSTR", Resolution.Daily).Symbol

        # --------------------
        # RL Parameters
        # --------------------
        self.learning_rate = 0.1
        self.discount_factor = 0.95
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.episodes = 50

        # --------------------
        # State space configuration
        # --------------------
        self.state_config = {
            'price_bins': 10,
            'volume_bins': 5,
            'rsi_bins': 4,
            'macd_bins': 3,
            'sma_cross': True
        }

        # Action space: 0=Hold, 1=Buy, 2=Sell
        self.actions = [0, 1, 2]

        # Initialize Q-table with default zeros
        self.q_table = defaultdict(lambda: np.zeros(len(self.actions)))
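        # With the bin counts above, the discrete state space has at most
        # 10 * 5 * 4 * 3 * 2 = 1,200 (price, volume, RSI, MACD, SMA-cross) states;
        # the defaultdict only materializes rows for states that are actually visited.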

        # Training tracking
        self.episode_rewards = []
        self.current_episode = 0
        self.training_complete = False
        self.episode_numbers = []

        # Warm up period for indicators
        self.SetWarmUp(200, Resolution.Daily)

        # --------------------
        # Indicators
        # --------------------
        self.sma20 = self.SMA(self.symbol, 20, Resolution.Daily)
        self.sma50 = self.SMA(self.symbol, 50, Resolution.Daily)
        self.rsi = self.RSI(self.symbol, 14, MovingAverageType.Simple, Resolution.Daily)
        self.macd = self.MACD(self.symbol, 12, 26, 9, MovingAverageType.Exponential, Resolution.Daily)

        # Schedule training to run monthly
        self.Schedule.On(
            self.DateRules.MonthEnd(self.symbol),
            self.TimeRules.AfterMarketOpen(self.symbol, 30),
            self.TrainModel
        )

        # Initialize chart for learning curve
        self.SetupLearningCurveChart()
    def SetupLearningCurveChart(self):
        """Sets up the chart for plotting training performance."""
        learning_curve = Chart("Training Performance")

        reward_series = Series("Episode Reward")
        reward_series.SeriesType = SeriesType.Line
        reward_series.Color = Color.Blue

        moving_avg_series = Series("Moving Average")
        moving_avg_series.SeriesType = SeriesType.Line
        moving_avg_series.Color = Color.Red

        learning_curve.AddSeries(reward_series)
        learning_curve.AddSeries(moving_avg_series)
        self.AddChart(learning_curve)
    def TrainModel(self):
        """Runs a single episode of Q-learning each time it's scheduled."""
        if self.IsWarmingUp or self.training_complete:
            return

        self.Log(f"Starting training episode {self.current_episode + 1}/{self.episodes}")

        try:
            # Get historical data (1 year)
            history = self.History(self.symbol, 252, Resolution.Daily)
            if history.empty or len(history) < 100:
                self.Log("Insufficient history data for training")
                return

            closes = history['close'].values
            volumes = history['volume'].values

            # Compute indicators from historical data
            sma20 = history['close'].rolling(20).mean().values
            sma50 = history['close'].rolling(50).mean().values
            rsi_values = self.CalculateRSI(history['close'], 14)
            macd_values = self.CalculateMACD(history['close'])

            # Discretize states
            price_bins = self.Discretize(closes, self.state_config['price_bins'])
            volume_bins = self.Discretize(volumes, self.state_config['volume_bins'])
            rsi_bins = self.Discretize(rsi_values, self.state_config['rsi_bins'])
            macd_bins = self.Discretize(macd_values, self.state_config['macd_bins'])

            episode_reward = 0

            # Training loop
            for i in range(50, len(history) - 1):
                current_state = self.CreateState(
                    price_bins[i],
                    volume_bins[i],
                    rsi_bins[i],
                    macd_bins[i],
                    sma20[i] > sma50[i]
                )

                # Epsilon-greedy selection
                if random.random() < self.epsilon:
                    action = random.choice(self.actions)
                else:
                    action = np.argmax(self.q_table[current_state])
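
                # With probability epsilon the agent explores a random action;
                # otherwise it exploits the current greedy action. Epsilon starts
                # at 1.0 and decays by 0.5% after each episode (see below), down to
                # a floor of 0.01, so early episodes explore and later ones exploit.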

                # Reward calculation
                current_price = closes[i]
                next_price = closes[i + 1]
                reward = self.CalculateReward(action, current_price, next_price)
                episode_reward += reward

                # Next state
                next_state = self.CreateState(
                    price_bins[i + 1],
                    volume_bins[i + 1],
                    rsi_bins[i + 1],
                    macd_bins[i + 1],
                    sma20[i + 1] > sma50[i + 1]
                )

                # Q-learning update
                best_next_action = np.argmax(self.q_table[next_state])
                td_target = reward + self.discount_factor * self.q_table[next_state][best_next_action]
                td_error = td_target - self.q_table[current_state][action]
                self.q_table[current_state][action] += self.learning_rate * td_error
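                # The three lines above are the standard tabular Q-learning update:
                #   Q(s, a) <- Q(s, a) + alpha * [r + gamma * max_a' Q(s', a') - Q(s, a)]
                # with alpha = self.learning_rate and gamma = self.discount_factor.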

            # Track training progress
            self.episode_rewards.append(float(episode_reward))
            self.episode_numbers.append(self.current_episode + 1)
            self.current_episode += 1
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

            # Log progress
            self.Log(f"Episode {self.current_episode} completed. Reward: {episode_reward:.2f}, Epsilon: {self.epsilon:.3f}")

            # Update chart
            self.UpdateLearningCurve()

            # Check completion
            if self.current_episode >= self.episodes:
                self.training_complete = True
                self.LogTrainingPerformance()
                self.Log("Training complete. Switching to live trading mode.")

        except Exception as e:
            self.Error(f"Error during training: {str(e)}")
    def UpdateLearningCurve(self):
        """Updates the learning curve chart with the latest reward data."""
        if len(self.episode_rewards) == 0:
            return

        try:
            current_episode = self.current_episode
            current_reward = self.episode_rewards[-1]

            # Moving average window ~ 10% of episodes or at least 3
            window_size = max(3, len(self.episode_rewards) // 10)
            moving_avg = pd.Series(self.episode_rewards).rolling(window=window_size, min_periods=1).mean().values[-1]

            # Ensure the values are finite (not NaN or inf)
            if not np.isfinite(current_reward) or not np.isfinite(moving_avg):
                self.Log(f"Skipping plot due to invalid values - Episode: {current_episode}, Reward: {current_reward}, Moving Avg: {moving_avg}")
                return

            # Log the values for debugging
            self.Log(f"Plotting - Episode: {current_episode}, Reward: {current_reward}, Moving Avg: {moving_avg}")

            # Use the simpler 3-parameter Plot method (Y-value only, X-axis is time)
            self.Plot("Training Performance", "Episode Reward", current_reward)
            self.Plot("Training Performance", "Moving Average", moving_avg)

        except Exception as e:
            self.Error(f"Error updating learning curve: {str(e)}")
    def LogTrainingPerformance(self):
        """Logs final training statistics."""
        self.Log(f"Final training rewards: {self.episode_rewards}")
        self.Log(f"Average reward: {np.mean(self.episode_rewards):.2f}")
        self.Log(f"Max reward: {max(self.episode_rewards):.2f}")
        self.Log(f"Min reward: {min(self.episode_rewards):.2f}")
    def OnData(self, data):
        """Executes real-time decisions after training is complete."""
        if self.IsWarmingUp or not self.training_complete or not data.ContainsKey(self.symbol):
            return

        try:
            current_state = self.GetCurrentState()
            action = np.argmax(self.q_table[current_state])

            if action == 1 and not self.Portfolio[self.symbol].Invested:
                self.SetHoldings(self.symbol, 0.5)  # Buy with 50% allocation
            elif action == 2 and self.Portfolio[self.symbol].Invested:
                self.Liquidate(self.symbol)

        except Exception as e:
            self.Error(f"Error in OnData: {str(e)}")
    def GetCurrentState(self):
        """Retrieves the current market state for the live bar."""
        # Ensure all indicators are ready
        if not (self.sma20.IsReady and self.sma50.IsReady and
                self.rsi.IsReady and self.macd.IsReady):
            return (0, 0, 0, 0, 0)

        price = self.Securities[self.symbol].Close
        volume = self.Securities[self.symbol].Volume
        rsi = self.rsi.Current.Value
        macd = self.macd.Current.Value
        sma_cross = self.sma20.Current.Value > self.sma50.Current.Value

        # Historical data for binning
        history = self.History(self.symbol, 252, Resolution.Daily)
        if history.empty:
            return (0, 0, 0, 0, 0)

        price_bin = self.DiscretizeValue(price, history['close'])
        volume_bin = self.DiscretizeValue(volume, history['volume'])
        rsi_bin = self.DiscretizeValue(rsi, [x for x in range(0, 101)])
        macd_bin = self.DiscretizeValue(macd, history['close'].pct_change().dropna())

        return self.CreateState(price_bin, volume_bin, rsi_bin, macd_bin, sma_cross)
    def CreateState(self, price_bin, volume_bin, rsi_bin, macd_bin, sma_cross):
        """Creates a tuple representing the discretized state."""
        return (price_bin, volume_bin, rsi_bin, macd_bin, int(sma_cross))
    def CalculateReward(self, action, current_price, next_price):
        """Simple reward: profit/loss percentage based on action, with safeguards."""
        # Avoid division by zero and invalid prices
        if current_price == 0 or not np.isfinite(current_price) or not np.isfinite(next_price):
            return 0  # Return neutral reward if prices are invalid
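
        # Worked example: with current_price = 100 and next_price = 103, Buy earns
        # +0.03 and Sell earns -0.03; a Sell is rewarded only when the price falls
        # (i.e. for being flat ahead of a drop), and Hold always earns 0.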
        if action == 1:  # Buy
            return (next_price - current_price) / current_price
        elif action == 2:  # Sell
            return (current_price - next_price) / current_price
        return 0  # Hold
    def Discretize(self, values, num_bins):
        """Discretize a 1D array into the specified number of bins."""
        if len(values) == 0:
            return np.zeros(0)
        bins = pd.qcut(values, num_bins, labels=False, duplicates='drop')
        return np.nan_to_num(bins, nan=0)
    def DiscretizeValue(self, value, values):
        """
        Discretize a single value based on the distribution of 'values'.
        Chooses bin counts from self.state_config.
        """
        if not isinstance(values, (list, pd.Series, np.ndarray)) or len(values) == 0:
            return 0

        try:
            if isinstance(values, list):
                values = pd.Series(values)

            # Decide how many bins to use
            bin_count = self.state_config['price_bins']
            if hasattr(values, 'name'):
                if values.name == 'volume':
                    bin_count = self.state_config['volume_bins']
                elif values.name == 'close':
                    bin_count = self.state_config['price_bins']

            # Heuristic check for RSI range (0..100)
            if values.min() >= 0 and values.max() <= 100:
                bin_count = self.state_config['rsi_bins']

            # If it doesn't fit above, default to MACD bin count
            if bin_count == self.state_config['price_bins'] and not (values.min() >= 0 and values.max() <= 100):
                bin_count = self.state_config['macd_bins']

            # Use qcut to determine bin edges
            bins = pd.qcut(values, bin_count, labels=False, duplicates='drop')
            max_bin = bins.max()
            if pd.isna(max_bin):
                return 0
            max_bin = int(max_bin)

            # Guard against a degenerate (constant) distribution
            value_range = values.max() - values.min()
            if value_range == 0:
                return 0

            # Place the value into the same [min..max] range, clamped to valid bins
            val_bin = min(
                max(0, int((value - values.min()) / value_range * max_bin)),
                max_bin
            )
            return val_bin
        except Exception:
            return 0
    def CalculateRSI(self, prices, window):
        """Calculates RSI values (simple python version)."""
        delta = prices.diff()
        gain = delta.where(delta > 0, 0)
        loss = -delta.where(delta < 0, 0)
        avg_gain = gain.rolling(window).mean()
        avg_loss = loss.rolling(window).mean()
        rs = avg_gain / avg_loss
        return 100 - (100 / (1 + rs))
    def CalculateMACD(self, prices):
        """Calculates MACD (12/26 EMA difference)."""
        ema12 = prices.ewm(span=12, adjust=False).mean()
        ema26 = prices.ewm(span=26, adjust=False).mean()
        return ema12 - ema26