Overall Statistics
Total Orders
78
Average Win
0.52%
Average Loss
-0.60%
Compounding Annual Return
-0.378%
Drawdown
2.700%
Expectancy
-0.018
Start Equity
1000000
End Equity
995123.31
Net Profit
-0.488%
Sharpe Ratio
-2.422
Sortino Ratio
-2.652
Probabilistic Sharpe Ratio
6.713%
Loss Rate
47%
Win Rate
53%
Profit-Loss Ratio
0.87
Alpha
-0.058
Beta
0.012
Annual Standard Deviation
0.024
Annual Variance
0.001
Information Ratio
-0.612
Tracking Error
0.15
Treynor Ratio
-4.762
Total Fees
$210.81
Estimated Strategy Capacity
$0
Lowest Capacity Asset
AAPL R735QTJ8XC9X
Portfolio Turnover
2.49%
#region imports
from AlgorithmImports import *
#endregion

import numpy as np
from collections import deque

class PairsTradingZScoreAlgorithm(QCAlgorithm):
    """Z-score pairs trade on the AAPL/MSFT price ratio.

    A rolling window of the last ``window`` prices is kept for each symbol.
    On every scheduled run the z-score of the current price ratio
    (symbol1 / symbol2) against that window is computed:

    * flat and z >  entry_threshold  -> short the spread (sell 1, buy 2)
    * flat and z < -entry_threshold  -> long the spread (buy 1, sell 2)
    * long spread and z > -exit_threshold  -> close both legs
    * short spread and z <  exit_threshold -> close both legs
    """

    def Initialize(self):
        """Configure dates, cash, symbols, parameters and the daily schedule."""
        # Settings
        self.SetStartDate(2024, 1, 1)
        self.SetEndDate(2025, 4, 15)
        self.SetCash(1000000)

        # Symbols
        self.symbol1 = self.AddEquity("AAPL", Resolution.Daily).Symbol
        self.symbol2 = self.AddEquity("MSFT", Resolution.Daily).Symbol

        # Parameters
        self.window = 20
        self.entry_threshold = 1.5
        self.exit_threshold = 0.5
        self.position_size = 0.3  # 30% of available cash

        # Data storage: bounded deques drop the oldest price automatically.
        self.prices1 = deque(maxlen=self.window)
        self.prices2 = deque(maxlen=self.window)

        # Tracking state
        self.current_position = 0  # 0 = no position, 1 = long spread, -1 = short spread

        # Flags
        self.history_loaded = False

        # Schedule daily trading decisions
        self.Schedule.On(
            self.DateRules.EveryDay(self.symbol1),
            self.TimeRules.AfterMarketOpen(self.symbol1, 30),
            self.TradeLogic,
        )

    def OnData(self, data):
        """Seed the rolling windows from history on the first data event.

        ``data`` itself is not read; trading happens in ``TradeLogic``.
        The flag is set after the first attempt regardless of outcome, so
        history is requested at most once.
        """
        if not self.history_loaded:
            self.LoadHistory()
            self.history_loaded = True

    def LoadHistory(self):
        """Fill both price windows from the last ``window`` daily closes.

        Only timestamps present for *both* symbols are used, so the two
        windows always hold prices from the same bars. Logs and returns
        without touching the windows when history is empty or a symbol is
        missing from it.
        """
        history = self.History([self.symbol1, self.symbol2], self.window, Resolution.Daily)
        if history.empty:
            self.Debug("No history data available.")
            return

        symbols_in_history = history.index.get_level_values(0)
        if self.symbol1 not in symbols_in_history or self.symbol2 not in symbols_in_history:
            self.Debug("History loading error: Missing symbols.")
            return

        hist1 = history.loc[self.symbol1]["close"]
        hist2 = history.loc[self.symbol2]["close"]

        # Align on timestamp rather than position: if one symbol is missing
        # a bar, a positional zip would pair prices from different dates and
        # corrupt every ratio after the gap.
        shared_times = hist1.index.intersection(hist2.index)
        for p1, p2 in zip(hist1.loc[shared_times], hist2.loc[shared_times]):
            self.prices1.append(p1)
            self.prices2.append(p2)

        self.Debug(f"History loaded: {len(self.prices1)} AAPL, {len(self.prices2)} MSFT")

    def TradeLogic(self):
        """Scheduled entry point: update windows, compute z-score, trade.

        Returns early (after logging) when either security has no data, a
        price is NaN or non-positive, or the windows are not yet full.
        """
        sec1 = self.Securities[self.symbol1]
        sec2 = self.Securities[self.symbol2]
        if not sec1.HasData or not sec2.HasData:
            self.Debug("Waiting for data...")
            return

        price1 = sec1.Price
        price2 = sec2.Price

        if np.isnan(price1) or np.isnan(price2) or price1 <= 0 or price2 <= 0:
            self.Debug(f"Invalid price data at {self.Time.date()}: price1={price1}, price2={price2}")
            return

        # The current observation is pushed before the statistics are taken,
        # so the window mean/std below include today's ratio.
        self.prices1.append(price1)
        self.prices2.append(price2)

        if len(self.prices1) < self.window or len(self.prices2) < self.window:
            self.Debug("Not enough rolling window yet.")
            return

        # Calculate ratio and z-score
        ratio = np.array(self.prices1) / np.array(self.prices2)
        mean = np.mean(ratio)
        std = np.std(ratio)

        if std == 0:
            std = 1e-6  # avoid division by zero

        current_ratio = price1 / price2
        z_score = (current_ratio - mean) / std

        self.Debug(f"Date: {self.Time.date()}, Z-Score: {z_score:.2f}")

        # Trading logic
        if self.current_position == 0:
            if z_score > self.entry_threshold:
                self.EnterShortSpread(price1, price2)
            elif z_score < -self.entry_threshold:
                self.EnterLongSpread(price1, price2)

        elif self.current_position == 1:
            # Long spread was opened at a very negative z; close once it
            # has reverted above -exit_threshold.
            if z_score > -self.exit_threshold:
                self.ExitPosition()

        elif self.current_position == -1:
            # Short spread was opened at a very positive z; close once it
            # has reverted below +exit_threshold.
            if z_score < self.exit_threshold:
                self.ExitPosition()

    def _enter_spread(self, price1, price2, direction):
        """Size and submit both legs of a spread trade.

        ``direction`` is +1 for long spread (buy symbol1, sell symbol2) and
        -1 for short spread (sell symbol1, buy symbol2). Half of
        ``position_size`` of current cash is allotted to each leg.

        Returns ``(qty1, qty2)`` (unsigned share counts) when orders were
        submitted, or ``None`` when either leg would be zero shares, in
        which case nothing is ordered and the position state is unchanged.
        """
        cash_to_use = self.Portfolio.Cash * self.position_size
        qty1 = int(cash_to_use / (2 * price1))
        qty2 = int(cash_to_use / (2 * price2))

        if qty1 <= 0 or qty2 <= 0:
            return None

        self.MarketOrder(self.symbol1, direction * qty1)
        self.MarketOrder(self.symbol2, -direction * qty2)
        self.current_position = direction
        return qty1, qty2

    def EnterLongSpread(self, price1, price2):
        """Buy symbol1 and sell symbol2, sized from the given prices."""
        legs = self._enter_spread(price1, price2, 1)
        if legs is not None:
            qty1, qty2 = legs
            self.Debug(f"Entered Long Spread: Buy {qty1} {self.symbol1.Value}, Sell {qty2} {self.symbol2.Value}")

    def EnterShortSpread(self, price1, price2):
        """Sell symbol1 and buy symbol2, sized from the given prices."""
        legs = self._enter_spread(price1, price2, -1)
        if legs is not None:
            qty1, qty2 = legs
            self.Debug(f"Entered Short Spread: Sell {qty1} {self.symbol1.Value}, Buy {qty2} {self.symbol2.Value}")

    def ExitPosition(self):
        """Flatten whatever is held in both symbols and reset the state."""
        for symbol in (self.symbol1, self.symbol2):
            held = self.Portfolio[symbol].Quantity
            if held != 0:
                self.MarketOrder(symbol, -held)

        self.Debug("Exited spread position.")
        self.current_position = 0