Overall Statistics
Total Orders
490
Average Win
0.25%
Average Loss
-0.27%
Compounding Annual Return
0.129%
Drawdown
2.800%
Expectancy
0.012
Start Equity
100000
End Equity
100634.79
Net Profit
0.635%
Sharpe Ratio
-3.234
Sortino Ratio
-4.148
Probabilistic Sharpe Ratio
0.922%
Loss Rate
48%
Win Rate
52%
Profit-Loss Ratio
0.93
Alpha
-0.026
Beta
0.004
Annual Standard Deviation
0.008
Annual Variance
0
Information Ratio
-0.684
Tracking Error
0.175
Treynor Ratio
-7.196
Total Fees
$490.00
Estimated Strategy Capacity
$470000000.00
Lowest Capacity Asset
QQQ RIWIV7K5Z9LX
Portfolio Turnover
2.66%
Drawdown Recovery
104
# region imports
from AlgorithmImports import *
from collections import deque
# endregion

class PairsTradingStrategyMACrossover(QCAlgorithm):
    """
    Pairs Trading Strategy - Converted from TradeStation EasyLanguage
    Trades the spread between two correlated securities using moving average crossovers.
    
    Original logic:
    - Long the Spread: Buy Symbol1, Short Symbol2
    - Short the Spread: Short Symbol1, Buy Symbol2
    - Entry: Moving average crossover of price ratio
    - No pyramiding, automatic position reversal
    """
    
    def initialize(self):
        """Initialize the algorithm"""
        
        # ===== CONFIGURATION PARAMETERS =====
        self.set_start_date(2020, 1, 1)
        self.set_end_date(2024, 12, 1)
        self.set_cash(100000)
        
        # Pair symbols to trade
        self.symbol1_ticker = "SPY"  # Data1 equivalent
        self.symbol2_ticker = "QQQ"  # Data2 equivalent
        
        # Trading parameters
        self.dollars_per_symbol = 10000  # Amount to invest in each leg
        self.fast_period = 5   # Fast MA period for ratio
        self.slow_period = 10  # Slow MA period for ratio
        
        # Add equity symbols
        self.symbol1 = self.add_equity(self.symbol1_ticker, Resolution.DAILY).symbol
        self.symbol2 = self.add_equity(self.symbol2_ticker, Resolution.DAILY).symbol
        
        # Set benchmark
        self.set_benchmark(self.symbol1)
        
        # ===== STATE VARIABLES =====
        # Position states: 0=Flat, 1=Long Spread, -1=Short Spread, 2=Other
        self.position_state = 0
        
        # Main states: 0=Idle, 1=Setup, 2=Exiting, 3=Flat, 4=Ready, 5=Entering, 6=In Position
        self.main_state = 0
        
        # Order state: 0=No orders, 1=Orders pending
        self.order_state = 0
        
        # Entry flags
        self.go_long = False
        self.go_short = False
        self.first_trade = False
        
        # Last signal direction for pyramiding prevention
        self.last_dot_direction = 0  # 1=Long signal, -1=Short signal
        
        # Track quantities
        self.symbol1_quantity = 0
        self.symbol2_quantity = 0
        
        # ===== INDICATORS =====
        # Store price ratio history for moving averages
        self.ratio_window = RollingWindow[float](self.slow_period + 1)
        
        # ===== WARMUP =====
        self.set_warmup(self.slow_period + 1, Resolution.DAILY)
        
        # ===== SCHEDULE =====
        # Execute at market close (similar to EasyLanguage bar close logic)
        self.schedule.on(
            self.date_rules.every_day(self.symbol1),
            self.time_rules.before_market_close(self.symbol1, 1),
            self.on_bar_close
        )
        
        # ===== LOGGING =====
        self.log_level = 1  # 0=None, 1=Trades only, 2=Debug
        
    def on_data(self, data):
        """Update price ratio on each bar"""
        if self.is_warming_up:
            return
            
        # Ensure we have prices for both symbols
        if not (data.contains_key(self.symbol1) and data.contains_key(self.symbol2)):
            return
            
        price1 = data[self.symbol1].close
        price2 = data[self.symbol2].close
        
        if price1 > 0 and price2 > 0:
            ratio = price1 / price2
            self.ratio_window.add(ratio)
    
    def on_bar_close(self):
        """Main logic executed at bar close"""
        
        # Skip if warming up or not enough data
        if self.is_warming_up or not self.ratio_window.is_ready:
            return
        
        # Update position state
        self.update_position_state()
        
        # Check for pending orders
        self.check_open_orders()
        
        # Calculate moving averages of ratio
        fast_ma = self.calculate_fast_ma()
        slow_ma = self.calculate_slow_ma()
        
        if fast_ma is None or slow_ma is None:
            return
        
        # Get previous values for crossover detection
        prev_fast_ma = self.calculate_fast_ma(1)
        prev_slow_ma = self.calculate_slow_ma(1)
        
        if prev_fast_ma is None or prev_slow_ma is None:
            return
        
        # Detect crossovers
        long_entry_condition = prev_fast_ma >= prev_slow_ma and fast_ma < slow_ma  # Cross under
        short_entry_condition = prev_fast_ma <= prev_slow_ma and fast_ma > slow_ma  # Cross over
        
        # Log signals
        if long_entry_condition:
            self.log_trade(f"LONG SIGNAL: Fast MA ({fast_ma:.4f}) crossed under Slow MA ({slow_ma:.4f})")
        if short_entry_condition:
            self.log_trade(f"SHORT SIGNAL: Fast MA ({fast_ma:.4f}) crossed over Slow MA ({slow_ma:.4f})")
        
        # Update last dot direction (for visualization/tracking)
        if long_entry_condition:
            self.last_dot_direction = 1
        if short_entry_condition:
            self.last_dot_direction = -1
        
        # Evaluate entry conditions
        self.evaluate_entry_conditions(long_entry_condition, short_entry_condition)
    
    def calculate_fast_ma(self, offset=0):
        """Calculate fast moving average of ratio"""
        if self.ratio_window.count < self.fast_period + offset:
            return None
        
        total = 0
        for i in range(offset, self.fast_period + offset):
            total += self.ratio_window[i]
        
        return total / self.fast_period
    
    def calculate_slow_ma(self, offset=0):
        """Calculate slow moving average of ratio"""
        if self.ratio_window.count < self.slow_period + offset:
            return None
        
        total = 0
        for i in range(offset, self.slow_period + offset):
            total += self.ratio_window[i]
        
        return total / self.slow_period
    
    def evaluate_entry_conditions(self, long_entry_condition, short_entry_condition):
        """Evaluate entry conditions and manage state machine"""
        
        # Conditions for first trade
        first_trade_ok = not self.first_trade and self.main_state == 0
        
        # Conditions for reversal
        rev_to_long_ok = self.main_state == 6 and self.position_state == -1
        rev_to_short_ok = self.main_state == 6 and self.position_state == 1
        
        # Check for invalid position state on reversal
        if self.first_trade:
            if long_entry_condition and self.position_state != -1 and self.last_dot_direction == -1:
                self.error("Incorrect position for reversing exists (Long signal)")
                return
            if short_entry_condition and self.position_state != 1 and self.last_dot_direction == 1:
                self.error("Incorrect position for reversing exists (Short signal)")
                return
        
        # Long entry logic
        if long_entry_condition and self.order_state == 0 and (rev_to_long_ok or first_trade_ok):
            self.main_state = 1
            self.go_long = True
            self.go_short = False
            self.first_trade = True
            self.exit_existing_positions()
        
        # Short entry logic
        elif short_entry_condition and self.order_state == 0 and (rev_to_short_ok or first_trade_ok):
            self.main_state = 1
            self.go_short = True
            self.go_long = False
            self.first_trade = True
            self.exit_existing_positions()
    
    def exit_existing_positions(self):
        """Exit existing positions before reversing"""
        
        if self.main_state == 1:
            self.main_state = 2
        
        if self.position_state == 1:  # Long the Spread
            self.log_trade(f"Exiting Long Spread: Sell {abs(self.symbol1_quantity)} {self.symbol1_ticker}, Cover {abs(self.symbol2_quantity)} {self.symbol2_ticker}")
            
            # Exit long position in symbol1
            if self.symbol1_quantity > 0:
                self.liquidate(self.symbol1)
            
            # Exit short position in symbol2
            if self.symbol2_quantity < 0:
                self.liquidate(self.symbol2)
        
        elif self.position_state == -1:  # Short the Spread
            self.log_trade(f"Exiting Short Spread: Cover {abs(self.symbol1_quantity)} {self.symbol1_ticker}, Sell {abs(self.symbol2_quantity)} {self.symbol2_ticker}")
            
            # Exit short position in symbol1
            if self.symbol1_quantity < 0:
                self.liquidate(self.symbol1)
            
            # Exit long position in symbol2
            if self.symbol2_quantity > 0:
                self.liquidate(self.symbol2)
        
        self.update_position_state()
    
    def update_position_state(self):
        """Update position state based on current holdings"""
        
        # Get current quantities
        if self.portfolio[self.symbol1].invested:
            self.symbol1_quantity = self.portfolio[self.symbol1].quantity
        else:
            self.symbol1_quantity = 0
        
        if self.portfolio[self.symbol2].invested:
            self.symbol2_quantity = self.portfolio[self.symbol2].quantity
        else:
            self.symbol2_quantity = 0
        
        # Determine position state
        long_the_spread = self.symbol1_quantity > 0 and self.symbol2_quantity < 0
        short_the_spread = self.symbol1_quantity < 0 and self.symbol2_quantity > 0
        
        if long_the_spread:
            self.position_state = 1
        elif short_the_spread:
            self.position_state = -1
        elif self.symbol1_quantity == 0 and self.symbol2_quantity == 0:
            self.position_state = 0  # Flat
            
            # Update main state machine
            if self.main_state == 2:
                self.main_state = 3
                self.check_open_orders()
        else:
            self.position_state = 2  # Other (unexpected)
            self.error(f"Unexpected position state: Symbol1={self.symbol1_quantity}, Symbol2={self.symbol2_quantity}")
        
        # Check for spread position after entry
        if (long_the_spread or short_the_spread) and self.main_state == 5:
            self.main_state = 6
        
        # Validate position matches expected
        if self.main_state == 6 and self.position_state == 2:
            self.error("Position does not match expected")
    
    def check_open_orders(self):
        """Check for open orders"""
        
        open_orders = self.transactions.get_open_orders()
        
        if len(open_orders) > 0:
            self.order_state = 1
        else:
            self.order_state = 0
            
            # If we were waiting for orders to clear, proceed to entry
            if self.main_state == 3:
                self.main_state = 4
                self.entry_method()
    
    def entry_method(self):
        """Execute entry orders"""
        
        if self.main_state != 4:
            return
        
        if self.go_long:
            self.go_long = False
            self.issue_long_spread_order()
        elif self.go_short:
            self.go_short = False
            self.issue_short_spread_order()
    
    def issue_long_spread_order(self):
        """Issue orders to go long the spread (Buy Symbol1, Short Symbol2)"""
        
        if self.main_state == 4:
            self.main_state = 5
        
        price1 = self.securities[self.symbol1].close
        price2 = self.securities[self.symbol2].close
        
        if price1 > 0 and price2 > 0:
            # Calculate quantities based on dollar allocation
            qty1 = int(self.dollars_per_symbol / price1)
            qty2 = int(self.dollars_per_symbol / price2)
            
            self.log_trade(f"ENTERING LONG SPREAD: Buy {qty1} {self.symbol1_ticker} @ ${price1:.2f}, Short {qty2} {self.symbol2_ticker} @ ${price2:.2f}")
            
            # Buy symbol1
            self.market_order(self.symbol1, qty1)
            
            # Short symbol2
            self.market_order(self.symbol2, -qty2)
        
        self.update_position_state()
    
    def issue_short_spread_order(self):
        """Issue orders to go short the spread (Short Symbol1, Buy Symbol2)"""
        
        if self.main_state == 4:
            self.main_state = 5
        
        price1 = self.securities[self.symbol1].close
        price2 = self.securities[self.symbol2].close
        
        if price1 > 0 and price2 > 0:
            # Calculate quantities based on dollar allocation
            qty1 = int(self.dollars_per_symbol / price1)
            qty2 = int(self.dollars_per_symbol / price2)
            
            self.log_trade(f"ENTERING SHORT SPREAD: Short {qty1} {self.symbol1_ticker} @ ${price1:.2f}, Buy {qty2} {self.symbol2_ticker} @ ${price2:.2f}")
            
            # Short symbol1
            self.market_order(self.symbol1, -qty1)
            
            # Buy symbol2
            self.market_order(self.symbol2, qty2)
        
        self.update_position_state()
    
    def on_order_event(self, order_event):
        """Handle order events"""
        
        if order_event.status == OrderStatus.FILLED:
            order = self.transactions.get_order_by_id(order_event.order_id)
            self.log_debug(f"Order filled: {order.symbol} {order.quantity} @ ${order_event.fill_price:.2f}")
            
            # Update position state after fills
            self.update_position_state()
            self.check_open_orders()
    
    def log_trade(self, message):
        """Log trade messages"""
        if self.log_level >= 1:
            self.log(message)
    
    def log_debug(self, message):
        """Log debug messages"""
        if self.log_level >= 2:
            self.debug(message)