| Overall Statistics |
|
Total Orders 490 Average Win 0.25% Average Loss -0.27% Compounding Annual Return 0.129% Drawdown 2.800% Expectancy 0.012 Start Equity 100000 End Equity 100634.79 Net Profit 0.635% Sharpe Ratio -3.234 Sortino Ratio -4.148 Probabilistic Sharpe Ratio 0.922% Loss Rate 48% Win Rate 52% Profit-Loss Ratio 0.93 Alpha -0.026 Beta 0.004 Annual Standard Deviation 0.008 Annual Variance 0 Information Ratio -0.684 Tracking Error 0.175 Treynor Ratio -7.196 Total Fees $490.00 Estimated Strategy Capacity $470000000.00 Lowest Capacity Asset QQQ RIWIV7K5Z9LX Portfolio Turnover 2.66% Drawdown Recovery 104 |
# region imports
from AlgorithmImports import *
from collections import deque
# endregion
class PairsTradingStrategyMACrossover(QCAlgorithm):
"""
Pairs Trading Strategy - Converted from TradeStation EasyLanguage
Trades the spread between two correlated securities using moving average crossovers.
Original logic:
- Long the Spread: Buy Symbol1, Short Symbol2
- Short the Spread: Short Symbol1, Buy Symbol2
- Entry: Moving average crossover of price ratio
- No pyramiding, automatic position reversal
"""
def initialize(self):
"""Initialize the algorithm"""
# ===== CONFIGURATION PARAMETERS =====
self.set_start_date(2020, 1, 1)
self.set_end_date(2024, 12, 1)
self.set_cash(100000)
# Pair symbols to trade
self.symbol1_ticker = "SPY" # Data1 equivalent
self.symbol2_ticker = "QQQ" # Data2 equivalent
# Trading parameters
self.dollars_per_symbol = 10000 # Amount to invest in each leg
self.fast_period = 5 # Fast MA period for ratio
self.slow_period = 10 # Slow MA period for ratio
# Add equity symbols
self.symbol1 = self.add_equity(self.symbol1_ticker, Resolution.DAILY).symbol
self.symbol2 = self.add_equity(self.symbol2_ticker, Resolution.DAILY).symbol
# Set benchmark
self.set_benchmark(self.symbol1)
# ===== STATE VARIABLES =====
# Position states: 0=Flat, 1=Long Spread, -1=Short Spread, 2=Other
self.position_state = 0
# Main states: 0=Idle, 1=Setup, 2=Exiting, 3=Flat, 4=Ready, 5=Entering, 6=In Position
self.main_state = 0
# Order state: 0=No orders, 1=Orders pending
self.order_state = 0
# Entry flags
self.go_long = False
self.go_short = False
self.first_trade = False
# Last signal direction for pyramiding prevention
self.last_dot_direction = 0 # 1=Long signal, -1=Short signal
# Track quantities
self.symbol1_quantity = 0
self.symbol2_quantity = 0
# ===== INDICATORS =====
# Store price ratio history for moving averages
self.ratio_window = RollingWindow[float](self.slow_period + 1)
# ===== WARMUP =====
self.set_warmup(self.slow_period + 1, Resolution.DAILY)
# ===== SCHEDULE =====
# Execute at market close (similar to EasyLanguage bar close logic)
self.schedule.on(
self.date_rules.every_day(self.symbol1),
self.time_rules.before_market_close(self.symbol1, 1),
self.on_bar_close
)
# ===== LOGGING =====
self.log_level = 1 # 0=None, 1=Trades only, 2=Debug
def on_data(self, data):
"""Update price ratio on each bar"""
if self.is_warming_up:
return
# Ensure we have prices for both symbols
if not (data.contains_key(self.symbol1) and data.contains_key(self.symbol2)):
return
price1 = data[self.symbol1].close
price2 = data[self.symbol2].close
if price1 > 0 and price2 > 0:
ratio = price1 / price2
self.ratio_window.add(ratio)
def on_bar_close(self):
"""Main logic executed at bar close"""
# Skip if warming up or not enough data
if self.is_warming_up or not self.ratio_window.is_ready:
return
# Update position state
self.update_position_state()
# Check for pending orders
self.check_open_orders()
# Calculate moving averages of ratio
fast_ma = self.calculate_fast_ma()
slow_ma = self.calculate_slow_ma()
if fast_ma is None or slow_ma is None:
return
# Get previous values for crossover detection
prev_fast_ma = self.calculate_fast_ma(1)
prev_slow_ma = self.calculate_slow_ma(1)
if prev_fast_ma is None or prev_slow_ma is None:
return
# Detect crossovers
long_entry_condition = prev_fast_ma >= prev_slow_ma and fast_ma < slow_ma # Cross under
short_entry_condition = prev_fast_ma <= prev_slow_ma and fast_ma > slow_ma # Cross over
# Log signals
if long_entry_condition:
self.log_trade(f"LONG SIGNAL: Fast MA ({fast_ma:.4f}) crossed under Slow MA ({slow_ma:.4f})")
if short_entry_condition:
self.log_trade(f"SHORT SIGNAL: Fast MA ({fast_ma:.4f}) crossed over Slow MA ({slow_ma:.4f})")
# Update last dot direction (for visualization/tracking)
if long_entry_condition:
self.last_dot_direction = 1
if short_entry_condition:
self.last_dot_direction = -1
# Evaluate entry conditions
self.evaluate_entry_conditions(long_entry_condition, short_entry_condition)
def calculate_fast_ma(self, offset=0):
"""Calculate fast moving average of ratio"""
if self.ratio_window.count < self.fast_period + offset:
return None
total = 0
for i in range(offset, self.fast_period + offset):
total += self.ratio_window[i]
return total / self.fast_period
def calculate_slow_ma(self, offset=0):
"""Calculate slow moving average of ratio"""
if self.ratio_window.count < self.slow_period + offset:
return None
total = 0
for i in range(offset, self.slow_period + offset):
total += self.ratio_window[i]
return total / self.slow_period
def evaluate_entry_conditions(self, long_entry_condition, short_entry_condition):
"""Evaluate entry conditions and manage state machine"""
# Conditions for first trade
first_trade_ok = not self.first_trade and self.main_state == 0
# Conditions for reversal
rev_to_long_ok = self.main_state == 6 and self.position_state == -1
rev_to_short_ok = self.main_state == 6 and self.position_state == 1
# Check for invalid position state on reversal
if self.first_trade:
if long_entry_condition and self.position_state != -1 and self.last_dot_direction == -1:
self.error("Incorrect position for reversing exists (Long signal)")
return
if short_entry_condition and self.position_state != 1 and self.last_dot_direction == 1:
self.error("Incorrect position for reversing exists (Short signal)")
return
# Long entry logic
if long_entry_condition and self.order_state == 0 and (rev_to_long_ok or first_trade_ok):
self.main_state = 1
self.go_long = True
self.go_short = False
self.first_trade = True
self.exit_existing_positions()
# Short entry logic
elif short_entry_condition and self.order_state == 0 and (rev_to_short_ok or first_trade_ok):
self.main_state = 1
self.go_short = True
self.go_long = False
self.first_trade = True
self.exit_existing_positions()
def exit_existing_positions(self):
"""Exit existing positions before reversing"""
if self.main_state == 1:
self.main_state = 2
if self.position_state == 1: # Long the Spread
self.log_trade(f"Exiting Long Spread: Sell {abs(self.symbol1_quantity)} {self.symbol1_ticker}, Cover {abs(self.symbol2_quantity)} {self.symbol2_ticker}")
# Exit long position in symbol1
if self.symbol1_quantity > 0:
self.liquidate(self.symbol1)
# Exit short position in symbol2
if self.symbol2_quantity < 0:
self.liquidate(self.symbol2)
elif self.position_state == -1: # Short the Spread
self.log_trade(f"Exiting Short Spread: Cover {abs(self.symbol1_quantity)} {self.symbol1_ticker}, Sell {abs(self.symbol2_quantity)} {self.symbol2_ticker}")
# Exit short position in symbol1
if self.symbol1_quantity < 0:
self.liquidate(self.symbol1)
# Exit long position in symbol2
if self.symbol2_quantity > 0:
self.liquidate(self.symbol2)
self.update_position_state()
def update_position_state(self):
"""Update position state based on current holdings"""
# Get current quantities
if self.portfolio[self.symbol1].invested:
self.symbol1_quantity = self.portfolio[self.symbol1].quantity
else:
self.symbol1_quantity = 0
if self.portfolio[self.symbol2].invested:
self.symbol2_quantity = self.portfolio[self.symbol2].quantity
else:
self.symbol2_quantity = 0
# Determine position state
long_the_spread = self.symbol1_quantity > 0 and self.symbol2_quantity < 0
short_the_spread = self.symbol1_quantity < 0 and self.symbol2_quantity > 0
if long_the_spread:
self.position_state = 1
elif short_the_spread:
self.position_state = -1
elif self.symbol1_quantity == 0 and self.symbol2_quantity == 0:
self.position_state = 0 # Flat
# Update main state machine
if self.main_state == 2:
self.main_state = 3
self.check_open_orders()
else:
self.position_state = 2 # Other (unexpected)
self.error(f"Unexpected position state: Symbol1={self.symbol1_quantity}, Symbol2={self.symbol2_quantity}")
# Check for spread position after entry
if (long_the_spread or short_the_spread) and self.main_state == 5:
self.main_state = 6
# Validate position matches expected
if self.main_state == 6 and self.position_state == 2:
self.error("Position does not match expected")
def check_open_orders(self):
"""Check for open orders"""
open_orders = self.transactions.get_open_orders()
if len(open_orders) > 0:
self.order_state = 1
else:
self.order_state = 0
# If we were waiting for orders to clear, proceed to entry
if self.main_state == 3:
self.main_state = 4
self.entry_method()
def entry_method(self):
"""Execute entry orders"""
if self.main_state != 4:
return
if self.go_long:
self.go_long = False
self.issue_long_spread_order()
elif self.go_short:
self.go_short = False
self.issue_short_spread_order()
def issue_long_spread_order(self):
"""Issue orders to go long the spread (Buy Symbol1, Short Symbol2)"""
if self.main_state == 4:
self.main_state = 5
price1 = self.securities[self.symbol1].close
price2 = self.securities[self.symbol2].close
if price1 > 0 and price2 > 0:
# Calculate quantities based on dollar allocation
qty1 = int(self.dollars_per_symbol / price1)
qty2 = int(self.dollars_per_symbol / price2)
self.log_trade(f"ENTERING LONG SPREAD: Buy {qty1} {self.symbol1_ticker} @ ${price1:.2f}, Short {qty2} {self.symbol2_ticker} @ ${price2:.2f}")
# Buy symbol1
self.market_order(self.symbol1, qty1)
# Short symbol2
self.market_order(self.symbol2, -qty2)
self.update_position_state()
def issue_short_spread_order(self):
"""Issue orders to go short the spread (Short Symbol1, Buy Symbol2)"""
if self.main_state == 4:
self.main_state = 5
price1 = self.securities[self.symbol1].close
price2 = self.securities[self.symbol2].close
if price1 > 0 and price2 > 0:
# Calculate quantities based on dollar allocation
qty1 = int(self.dollars_per_symbol / price1)
qty2 = int(self.dollars_per_symbol / price2)
self.log_trade(f"ENTERING SHORT SPREAD: Short {qty1} {self.symbol1_ticker} @ ${price1:.2f}, Buy {qty2} {self.symbol2_ticker} @ ${price2:.2f}")
# Short symbol1
self.market_order(self.symbol1, -qty1)
# Buy symbol2
self.market_order(self.symbol2, qty2)
self.update_position_state()
def on_order_event(self, order_event):
"""Handle order events"""
if order_event.status == OrderStatus.FILLED:
order = self.transactions.get_order_by_id(order_event.order_id)
self.log_debug(f"Order filled: {order.symbol} {order.quantity} @ ${order_event.fill_price:.2f}")
# Update position state after fills
self.update_position_state()
self.check_open_orders()
def log_trade(self, message):
"""Log trade messages"""
if self.log_level >= 1:
self.log(message)
def log_debug(self, message):
"""Log debug messages"""
if self.log_level >= 2:
self.debug(message)