# region imports
from AlgorithmImports import *
from sklearn.cluster import KMeans
import numpy as np
from collections import Counter
from datetime import timedelta
# endregion
class KMeansPairsSelector(QCAlgorithm):
    """Pairs-trading algorithm that selects one pair per month via K-means.

    At each month start the 100 largest stocks by market cap are clustered
    on their last 60 daily returns.  Inside every cluster each pair is
    scored by the z-score of its normalized price spread over the last 20
    bars, and the pair with the largest absolute z-score is traded with one
    leg at +50% and the other at -50% of the portfolio.  A daily check
    closes the pair once the spread has reverted or the maximum holding
    period has elapsed.
    """

    # Number of largest-cap stocks kept by the universe filter.
    _UNIVERSE_SIZE = 100
    # K-means cluster count; also the minimum number of usable symbols,
    # since K-means needs at least as many samples as clusters.
    _CLUSTER_COUNT = 10
    # Number of daily returns used as the clustering feature vector.
    _RETURN_LOOKBACK = 60
    # Number of daily bars used for the spread z-score.
    _Z_WINDOW = 20

    def initialize(self):
        """Configure cash, backtest window, universe, state and schedules."""
        self.set_cash(200000)
        self.settings.seed_initial_prices = True
        self.set_start_date(self.end_date - timedelta(5 * 365))
        self._universe = self.add_universe(self._select_fundamentals)

        # Trade state: the tracked pair and when it was entered.
        self._current_pair = None
        self._entry_date = None
        # Thresholds on |z-score| for entering and exiting a trade.
        self._entry_threshold = 0.5
        self._exit_threshold = 0.2
        # Maximum holding period, measured in calendar days.
        self._max_hold_days = 20

        # NOTE(review): "SPY" is never subscribed in this class; confirm the
        # time rules resolve its exchange hours without an explicit add.
        # Monthly pair selection + entry
        self.schedule.on(
            self.date_rules.month_start(),
            self.time_rules.after_market_open("SPY", 30),
            self._rebalance
        )
        # Daily: check exits and reversion
        self.schedule.on(
            self.date_rules.every_day(),
            self.time_rules.after_market_open("SPY", 60),
            self._check_exits
        )

    def _select_fundamentals(self, fundamentals: List[Fundamental]) -> List[Symbol]:
        """Return the symbols of the largest stocks by market cap."""
        by_market_cap = sorted(fundamentals, key=lambda f: f.market_cap, reverse=True)
        return [f.symbol for f in by_market_cap[:self._UNIVERSE_SIZE]]

    def _in_trading_window(self):
        """Return True when the algorithm clock is between 09:45 and 15:59."""
        hour = self.time.hour
        minute = self.time.minute
        return (hour == 9 and minute >= 45) or (10 <= hour <= 15)

    def _has_price(self, sym):
        """Return True when ``sym`` has received data and a positive price."""
        security = self.securities[sym]
        return security.has_data and security.price > 0

    def _liquidate_pair(self):
        """Close both legs of the tracked pair and reset the trade state.

        A leg is only liquidated when it has a price and the clock is inside
        the trading window.  The state is kept when any leg remains invested
        so that a later call can retry instead of losing track of an open
        position.
        """
        if self._current_pair is None:
            return
        for sym in self._current_pair:
            if not self.portfolio[sym].invested:
                continue
            if self._has_price(sym) and self._in_trading_window():
                self.liquidate(sym)
        if any(self.portfolio[sym].invested for sym in self._current_pair):
            # Something could not be closed; keep tracking the pair.
            return
        self._current_pair = None
        self._entry_date = None

    def _compute_z_score(self, sym_a, sym_b, prices_df, window=20):
        """Return the signed z-score of the normalized spread of two symbols.

        Both price series are rebased to 1.0 at the start of the window and
        the spread is ``a - b``.  A positive result means ``sym_a`` is rich
        relative to ``sym_b``.

        Args:
            sym_a: Column label of the first leg in ``prices_df``.
            sym_b: Column label of the second leg in ``prices_df``.
            prices_df: DataFrame of prices with one column per symbol.
            window: Number of trailing rows to use.

        Returns:
            The z-score of the latest spread value, or None when it cannot
            be computed (too few rows, zero base price, zero or NaN standard
            deviation, or a non-finite result).
        """
        recent = prices_df[[sym_a, sym_b]].iloc[-window:]
        if len(recent) < 2:
            return None
        base_a = recent[sym_a].iloc[0]
        base_b = recent[sym_b].iloc[0]
        if base_a == 0 or base_b == 0:
            # Rebasing would divide by zero.
            return None
        spread = recent[sym_a] / base_a - recent[sym_b] / base_b
        spread_std = spread.std()
        if spread_std == 0 or np.isnan(spread_std):
            return None
        z_score = (spread.iloc[-1] - spread.mean()) / spread_std
        return z_score if np.isfinite(z_score) else None

    def _check_exits(self):
        """Close the tracked pair on max holding time or spread reversion."""
        if self._current_pair is None:
            return
        sym_a, sym_b = self._current_pair

        # Max hold days exit
        if self._entry_date is not None:
            hold_days = (self.time - self._entry_date).days
            if hold_days >= self._max_hold_days:
                self._liquidate_pair()
                return

        if not self._has_price(sym_a) or not self._has_price(sym_b):
            return

        # Reversion exit
        history = self.history(
            list(self._current_pair), self._Z_WINDOW + 1, Resolution.DAILY
        )
        if history.empty or 'close' not in history.columns:
            return
        prices = history['close'].unstack(level=0)
        try:
            prices = prices[[sym_a, sym_b]].dropna()
        except KeyError:
            # History came back for only one of the two legs.
            return
        if len(prices) < self._Z_WINDOW:
            return
        z_score = self._compute_z_score(sym_a, sym_b, prices, self._Z_WINDOW)
        if z_score is None:
            return
        if abs(z_score) < self._exit_threshold:
            self._liquidate_pair()

    def _load_universe_prices(self):
        """Return daily closes for the selected universe, or None.

        Columns with any missing value are dropped.  None is returned when
        the universe or the cleaned price table has fewer symbols than the
        cluster count, or when no close history is available.
        """
        selected = self._universe.selected
        if selected is None:
            return None
        symbols = list(selected)
        if len(symbols) < self._CLUSTER_COUNT:
            return None
        history = self.history(symbols, self._RETURN_LOOKBACK + 1, Resolution.DAILY)
        if history.empty or 'close' not in history.columns:
            return None
        prices = history['close'].unstack(level=0).dropna(axis=1, how='any')
        if prices.shape[1] < self._CLUSTER_COUNT:
            return None
        return prices

    def _find_best_pair(self, prices):
        """Cluster symbols on returns and pick the most stretched pair.

        Args:
            prices: DataFrame of daily closes, one column per symbol.

        Returns:
            ``(pair, z_score)`` where ``pair`` is a 2-tuple of column labels
            and ``z_score`` is the signed spread z-score of that pair, or
            ``(None, None)`` when there are too few returns or no pair has a
            computable, non-zero z-score.
        """
        returns = prices.pct_change().dropna()
        if len(returns) < self._RETURN_LOOKBACK:
            return None, None
        returns = returns.iloc[-self._RETURN_LOOKBACK:]

        # One feature vector (row) per symbol.
        kmeans = KMeans(n_clusters=self._CLUSTER_COUNT, random_state=42, n_init=10)
        labels = kmeans.fit_predict(returns.T.values)

        # Group column labels by cluster, preserving column order.
        members_by_cluster = {}
        for column, label in zip(prices.columns, labels):
            members_by_cluster.setdefault(label, []).append(column)

        best_pair = None
        best_z = None
        best_abs_z = 0.0
        for members in members_by_cluster.values():
            for idx, sym_a in enumerate(members):
                for sym_b in members[idx + 1:]:
                    z = self._compute_z_score(sym_a, sym_b, prices, self._Z_WINDOW)
                    if z is not None and abs(z) > best_abs_z:
                        best_abs_z = abs(z)
                        # Keep the sign: it decides which leg is shorted.
                        best_z = z
                        best_pair = (sym_a, sym_b)
        return best_pair, best_z

    def _rebalance(self):
        """Select this month's pair and enter it when the spread is stretched.

        Liquidates the tracked pair when no pair can be selected.  When the
        selected pair differs from the tracked one, the old pair is closed
        first; if it cannot be closed it stays tracked and no switch is made.
        A positive z-score shorts the first leg and buys the second; a
        negative z-score does the opposite.
        """
        prices = self._load_universe_prices()
        if prices is None:
            self._liquidate_pair()
            return

        best_pair, z_score = self._find_best_pair(prices)
        if best_pair is None:
            self._liquidate_pair()
            return

        # Liquidate old pair if it changed
        if self._current_pair is not None and (
            best_pair[0] != self._current_pair[0] or best_pair[1] != self._current_pair[1]
        ):
            self._liquidate_pair()
            if self._current_pair is not None:
                # Old legs are still open; keep managing them via exits.
                return

        self._current_pair = best_pair
        sym_a, sym_b = best_pair

        # If already in a trade, let _check_exits handle exits
        if self.portfolio[sym_a].invested or self.portfolio[sym_b].invested:
            return
        if not self._in_trading_window():
            return
        if not self._has_price(sym_a) or not self._has_price(sym_b):
            return

        # Never open a trade that the exit rule would close immediately.
        if abs(z_score) < self._exit_threshold:
            return

        if z_score > self._entry_threshold:
            # sym_a is rich relative to sym_b: short a, long b.
            weight_a, weight_b = -0.5, 0.5
        elif z_score < -self._entry_threshold:
            # sym_a is cheap relative to sym_b: long a, short b.
            weight_a, weight_b = 0.5, -0.5
        else:
            return

        self.set_holdings([
            PortfolioTarget(sym_a, weight_a),
            PortfolioTarget(sym_b, weight_b)
        ], liquidate_existing_holdings=True)
        self._entry_date = self.time