Overall Statistics

Total Orders                  28
Average Win                   19.42%
Average Loss                  -3.31%
Compounding Annual Return     17.446%
Drawdown                      30.000%
Expectancy                    2.430
Start Equity                  10000
End Equity                    26262.98
Net Profit                    162.630%
Sharpe Ratio                  0.57
Sortino Ratio                 0.401
Probabilistic Sharpe Ratio    15.144%
Loss Rate                     50%
Win Rate                      50%
Profit-Loss Ratio             5.86
Alpha                         0.098
Beta                          0.129
Annual Standard Deviation     0.196
Annual Variance               0.038
Information Ratio             0.035
Tracking Error                0.242
Treynor Ratio                 0.869
Total Fees                    $28.16
Estimated Strategy Capacity   $970000000.00
Lowest Capacity Asset         MSTR RBGP9S2961YD
Portfolio Turnover            0.67%
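As a quick sanity check, the reported Expectancy is consistent with the usual definition, win rate x profit-loss ratio - loss rate: 0.50 x 5.86 - 0.50 ≈ 2.43. Likewise, growing the $10,000 starting equity to $26,262.98 corresponds to the reported 162.63% net profit.
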
from AlgorithmImports import *
import numpy as np
import pandas as pd
import random
from collections import defaultdict

class FixedQLearningTradingAlgorithm(QCAlgorithm):
    
    def Initialize(self):
        # Set start/end dates and initial capital
        self.SetStartDate(2019, 1, 1)
        self.SetEndDate(2024, 12, 31)
        self.SetCash(10000)
        
        # Add benchmark security first, then set as benchmark
        self.spySymbol = self.AddEquity("SPY", Resolution.Daily).Symbol
        self.SetBenchmark(self.spySymbol)
        
        # Main trading symbol
        self.symbol = self.AddEquity("MSTR", Resolution.Daily).Symbol

        # --------------------
        # RL Parameters
        # --------------------
        self.learning_rate = 0.1
        self.discount_factor = 0.95
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.episodes = 50
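        # With 50 monthly episodes and a 0.995 decay, epsilon ends near 0.995**50 ≈ 0.78, so exploration stays significant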
        
        # --------------------
        # State space configuration
        # --------------------
        self.state_config = {
            'price_bins': 10,
            'volume_bins': 5,
            'rsi_bins': 4,
            'macd_bins': 3,
            'sma_cross': True
        }
        
        # Action space: 0=Hold, 1=Buy, 2=Sell
        self.actions = [0, 1, 2]
        
        # Initialize Q-table with default zeros
        self.q_table = defaultdict(lambda: np.zeros(len(self.actions)))
        
        # Training tracking
        self.episode_rewards = []
        self.current_episode = 0
        self.training_complete = False
        self.episode_numbers = []
        
        # Warm up period for indicators
        self.SetWarmUp(200, Resolution.Daily)
        
        # --------------------
        # Indicators
        # --------------------
        self.sma20 = self.SMA(self.symbol, 20, Resolution.Daily)
        self.sma50 = self.SMA(self.symbol, 50, Resolution.Daily)
        self.rsi   = self.RSI(self.symbol, 14, MovingAverageType.Simple, Resolution.Daily)
        self.macd  = self.MACD(self.symbol, 12, 26, 9, MovingAverageType.Exponential, Resolution.Daily)
        
        # Schedule training to run monthly
        self.Schedule.On(
            self.DateRules.MonthEnd(self.symbol), 
            self.TimeRules.AfterMarketOpen(self.symbol, 30), 
            self.TrainModel
        )
        
        # Initialize chart for learning curve
        self.SetupLearningCurveChart()
    
    def SetupLearningCurveChart(self):
        """Sets up the chart for plotting training performance."""
        learning_curve = Chart("Training Performance")
        
        reward_series = Series("Episode Reward")
        reward_series.SeriesType = SeriesType.Line
        reward_series.Color = Color.Blue
        
        moving_avg_series = Series("Moving Average")
        moving_avg_series.SeriesType = SeriesType.Line
        moving_avg_series.Color = Color.Red
        
        learning_curve.AddSeries(reward_series)
        learning_curve.AddSeries(moving_avg_series)
        
        self.AddChart(learning_curve)
    
    def TrainModel(self):
        """Runs a single episode of Q-learning each time it's scheduled."""
        if self.IsWarmingUp or self.training_complete:
            return
            
        self.Log(f"Starting training episode {self.current_episode + 1}/{self.episodes}")
        
        try:
            # Get historical data (1 year)
            history = self.History(self.symbol, 252, Resolution.Daily)
            if history.empty or len(history) < 100:
                self.Log("Insufficient history data for training")
                return
            
            closes = history['close'].values
            volumes = history['volume'].values
            
            # Compute indicators from historical data
            sma20 = history['close'].rolling(20).mean().values
            sma50 = history['close'].rolling(50).mean().values
            rsi_values = self.CalculateRSI(history['close'], 14)
            macd_values = self.CalculateMACD(history['close'])
            
            # Discretize states
            price_bins = self.Discretize(closes, self.state_config['price_bins'])
            volume_bins = self.Discretize(volumes, self.state_config['volume_bins'])
            rsi_bins = self.Discretize(rsi_values, self.state_config['rsi_bins'])
            macd_bins = self.Discretize(macd_values, self.state_config['macd_bins'])
            
            episode_reward = 0
            
            # Training loop
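            # Start at index 50 so the 50-day SMA (and the shorter indicators) are defined for every state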
            for i in range(50, len(history) - 1):
                current_state = self.CreateState(
                    price_bins[i],
                    volume_bins[i],
                    rsi_bins[i],
                    macd_bins[i],
                    sma20[i] > sma50[i]
                )
                
                # Epsilon-greedy selection
                if random.random() < self.epsilon:
                    action = random.choice(self.actions)
                else:
                    action = np.argmax(self.q_table[current_state])
                
                # Reward calculation
                current_price = closes[i]
                next_price = closes[i + 1]
                reward = self.CalculateReward(action, current_price, next_price)
                episode_reward += reward
                
                # Next state
                next_state = self.CreateState(
                    price_bins[i+1],
                    volume_bins[i+1],
                    rsi_bins[i+1],
                    macd_bins[i+1],
                    sma20[i+1] > sma50[i+1]
                )
                
                # Q-learning update
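                # Tabular Q-learning: Q(s,a) <- Q(s,a) + alpha * (r + gamma * max_a' Q(s',a') - Q(s,a))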
                best_next_action = np.argmax(self.q_table[next_state])
                td_target = reward + self.discount_factor * self.q_table[next_state][best_next_action]
                td_error = td_target - self.q_table[current_state][action]
                self.q_table[current_state][action] += self.learning_rate * td_error
            
            # Track training progress
            self.episode_rewards.append(float(episode_reward))
            self.episode_numbers.append(self.current_episode + 1)
            self.current_episode += 1
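            # Decay exploration after each episode, but never below epsilon_min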
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
            
            # Log progress
            self.Log(f"Episode {self.current_episode} completed. Reward: {episode_reward:.2f}, Epsilon: {self.epsilon:.3f}")
            
            # Update chart
            self.UpdateLearningCurve()
            
            # Check completion
            if self.current_episode >= self.episodes:
                self.training_complete = True
                self.LogTrainingPerformance()
                self.Log("Training complete. Switching to live trading mode.")
                
        except Exception as e:
            self.Error(f"Error during training: {str(e)}")
    
    def UpdateLearningCurve(self):
        """Updates the learning curve chart with the latest reward data."""
        if len(self.episode_rewards) == 0:
            return
            
        try:
            current_episode = self.current_episode
            current_reward = self.episode_rewards[-1]
            
            # Moving average window ~ 10% of episodes or at least 3
            window_size = max(3, len(self.episode_rewards) // 10)
            moving_avg = pd.Series(self.episode_rewards).rolling(window=window_size, min_periods=1).mean().values[-1]
            
            # Ensure the values are finite (not NaN or inf)
            if not np.isfinite(current_reward) or not np.isfinite(moving_avg):
                self.Log(f"Skipping plot due to invalid values - Episode: {current_episode}, Reward: {current_reward}, Moving Avg: {moving_avg}")
                return
            
            # Log the values for debugging
            self.Log(f"Plotting - Episode: {current_episode}, Reward: {current_reward}, Moving Avg: {moving_avg}")
            
            # Use the simpler 3-parameter Plot method (Y-value only, X-axis is time)
            self.Plot("Training Performance", "Episode Reward", current_reward)
            self.Plot("Training Performance", "Moving Average", moving_avg)
            
        except Exception as e:
            self.Error(f"Error updating learning curve: {str(e)}")
    
    def LogTrainingPerformance(self):
        """Logs final training statistics."""
        self.Log(f"Final training rewards: {self.episode_rewards}")
        self.Log(f"Average reward: {np.mean(self.episode_rewards):.2f}")
        self.Log(f"Max reward: {max(self.episode_rewards):.2f}")
        self.Log(f"Min reward: {min(self.episode_rewards):.2f}")
    
    def OnData(self, data):
        """Executes real-time decisions after training is complete."""
        if self.IsWarmingUp or not self.training_complete or not data.ContainsKey(self.symbol):
            return
            
        try:
            current_state = self.GetCurrentState()
            action = np.argmax(self.q_table[current_state])
            
            if action == 1 and not self.Portfolio[self.symbol].Invested:
                self.SetHoldings(self.symbol, 0.5)  # Buy with 50% allocation
            elif action == 2 and self.Portfolio[self.symbol].Invested:
                self.Liquidate(self.symbol)
                
        except Exception as e:
            self.Error(f"Error in OnData: {str(e)}")
    
    def GetCurrentState(self):
        """Retrieves the current market state for the live bar."""
        # Ensure all indicators are ready
        if not (self.sma20.IsReady and self.sma50.IsReady and 
                self.rsi.IsReady and self.macd.IsReady):
            return (0, 0, 0, 0, 0)
            
        price = self.Securities[self.symbol].Close
        volume = self.Securities[self.symbol].Volume
        rsi = self.rsi.Current.Value
        macd = self.macd.Current.Value
        sma_cross = self.sma20.Current.Value > self.sma50.Current.Value
        
        # Historical data for binning
        history = self.History(self.symbol, 252, Resolution.Daily)
        if history.empty:
            return (0, 0, 0, 0, 0)
        
        price_bin = self.DiscretizeValue(price, history['close'], self.state_config['price_bins'])
        volume_bin = self.DiscretizeValue(volume, history['volume'], self.state_config['volume_bins'])
        rsi_bin = self.DiscretizeValue(rsi, list(range(0, 101)), self.state_config['rsi_bins'])
        # Bin MACD against the same MACD distribution used in training, not raw price returns
        macd_bin = self.DiscretizeValue(macd, self.CalculateMACD(history['close']), self.state_config['macd_bins'])
        
        return self.CreateState(price_bin, volume_bin, rsi_bin, macd_bin, sma_cross)
    
    def CreateState(self, price_bin, volume_bin, rsi_bin, macd_bin, sma_cross):
        """Creates a tuple representing the discretized state."""
        return (price_bin, volume_bin, rsi_bin, macd_bin, int(sma_cross))
    
    def CalculateReward(self, action, current_price, next_price):
        """Simple reward: profit/loss percentage based on action, with safeguards."""
        # Avoid division by zero and invalid prices
        if current_price == 0 or not np.isfinite(current_price) or not np.isfinite(next_price):
            return 0  # Return neutral reward if prices are invalid
        
        if action == 1:  # Buy
            return (next_price - current_price) / current_price
        elif action == 2:  # Sell
            return (current_price - next_price) / current_price
        return 0  # Hold
    
    def Discretize(self, values, num_bins):
        """Discretize a 1D array into the specified number of quantile bins."""
        if len(values) == 0:
            return np.zeros(0, dtype=int)
        bins = pd.qcut(values, num_bins, labels=False, duplicates='drop')
        # Map NaN bins (e.g. from indicator warm-up) to bin 0 and return integer labels
        return np.nan_to_num(bins, nan=0).astype(int)
    
    def DiscretizeValue(self, value, values, bin_count):
        """
        Discretize a single value against the distribution of 'values',
        using an explicit bin count so live states line up with the training bins.
        """
        if not isinstance(values, (list, pd.Series, np.ndarray)) or len(values) == 0:
            return 0
        
        try:
            if isinstance(values, list):
                values = pd.Series(values)
            
            # Use qcut to find how many distinct bins the distribution actually supports
            bins = pd.qcut(values, bin_count, labels=False, duplicates='drop')
            max_bin = bins.max()
            if pd.isna(max_bin):
                return 0
            max_bin = int(max_bin)
            
            # Place the value on a linear scale across the observed [min..max] range
            value_range = values.max() - values.min()
            if value_range == 0:
                return 0
            val_bin = int((value - values.min()) / value_range * max_bin)
            return min(max(val_bin, 0), max_bin)
        except Exception:
            return 0
    
    def CalculateRSI(self, prices, window):
        """Calculates RSI values (simple python version)."""
        delta = prices.diff()
        gain = delta.where(delta > 0, 0)
        loss = -delta.where(delta < 0, 0)
        avg_gain = gain.rolling(window).mean()
        avg_loss = loss.rolling(window).mean()
        rs = avg_gain / avg_loss
        return 100 - (100 / (1 + rs))
    
    def CalculateMACD(self, prices):
        """Calculates MACD (12/26 EMA difference)."""
        ema12 = prices.ewm(span=12, adjust=False).mean()
        ema26 = prices.ewm(span=26, adjust=False).mean()
        return ema12 - ema26