| Overall Statistics |
|
Total Trades 30 Average Win 6.61% Average Loss -6.28% Compounding Annual Return 22.878% Drawdown 5.500% Expectancy 0.173 Net Profit 13.133% Sharpe Ratio 1.193 Probabilistic Sharpe Ratio 54.438% Loss Rate 43% Win Rate 57% Profit-Loss Ratio 1.05 Alpha 0.135 Beta 0.115 Annual Standard Deviation 0.124 Annual Variance 0.015 Information Ratio 0.11 Tracking Error 0.324 Treynor Ratio 1.292 Total Fees $1028.63 Estimated Strategy Capacity $1700000.00 Lowest Capacity Asset H UHDQ76M9UDET |
# region imports
import numpy as np
from AlgorithmImports import *
from decimal import Decimal
from io import StringIO
from collections import deque
from time import gmtime, strftime
# endregion
"""
outline:
get pairs monthly
every day compute the spread using the computed weights
if spread is above mean, buy spread
reverse position when spread crossovers the mean
if spread is below mean, sell spread
reverse position when spread crossovers the mean
new features:
- don't reenter spreads from timeout until it has crossed the sma at least once
|- how to do it? store trades, exit type, and flag as not tradeable until currpx > sma for long and vice versa
|- updated logic so that spread is calculated on every interval to check spread crossing after trade timeout
"""
class StockDataSource(PythonData):
def GetSource(self, config, date, isLiveMode):
url = (
# "https://www.dropbox.com/s/2l73mu97gcehmh7/daily-stock-picker-live.csv?dl=1" # Brian's
"https://www.dropbox.com/s/t4nafyooof485h7/pair_csv.csv?dl=1" # mine
if isLiveMode
# else "https://docs.google.com/spreadsheets/d/e/2PACX-1vRi02Q7O9jJG6Nl04PnpXBld_HhYyRBZchvQocMHnopjCN7jIQz6i1JYRKLPpvu3D5WPOakfEeNxZ-_/pub?gid=0&single=true&output=csv" # Brian's
else "https://docs.google.com/spreadsheets/d/1jtMR6fAewQzDb08cg5qsCzXNliq5zpTvWCm7goVdDMc/export?format=csv"
)
return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)
def Reader(self, config, line, date, isLiveMode):
if not (line.strip() and line[0].isdigit()):
return None
stocks = StockDataSource()
stocks.Symbol = config.Symbol
def get_point_in_time_symbol(ticker):
sid = SecurityIdentifier.GenerateEquity(
ticker, Market.USA, mappingResolveDate = date)
return Symbol(sid, ticker)
csv = line.split(",")
if isLiveMode:
stocks.Time = date
stocks["Symbols"] = [get_point_in_time_symbol(t) for t in csv]
else:
stocks.Time = datetime.strptime(csv[0], "%Y-%m-%d")
stocks["Symbols"] = [get_point_in_time_symbol(t) for t in csv[1:]]
return stocks
def quantize(number, digits=-2):
"""
convert messy floating point to clean num of digits
"""
num_places = Decimal(10) ** digits
q = Decimal(number).quantize(num_places)
q = float(q)
return q
# Custom slippage implementation
class CustomSlippageModel:
def __init__(self, algorithm):
self.algorithm = algorithm
def GetSlippageApproximation(self, asset, order):
# custom slippage math
slippage = asset.Price * 0.0001 * np.log10(2 * float(order.AbsoluteQuantity))
self.algorithm.Debug(f"CustomSlippageModel: {slippage:.2f}")
return slippage
class symbolData:
def __init__(
self,
pair,
bb_multiple,
window,
algorithm,
):
self.pair = pair
self.symbol1 = self.pair[0]
self.symbol2 = self.pair[1]
self.bb_multiple = bb_multiple
self.window = window
self.spread = None
self.intraday_spread = None
self.roll_mean = None
self.roll_std = None
self.bb_upper = None
self.bb_lower = None
self.long_spread = False
self.short_spread = False
self.in_trade = False
self.trade_entry_timestamp = None
self.algo = algorithm
self.trade_data = None
self.trade_timeout = False
def reset_trade_flags(self):
self.long_spread = False
self.short_spread = False
self.in_trade = False
self.trade_entry_timestamp = None
self.trade_data = None
self.trade_timeout = False
return
def spread_crossed_since_trade(self):
"""this should be used only if trade is timed out"""
# time diff
ts = self.trade_data["timestamp"].iloc[0]
tmp_spread = self.spread.loc[ts:]
tmp_roll_mean = self.roll_mean.loc[ts:]
if self.long_spread:
if any(tmp_spread > tmp_roll_mean):
self.reset_trade_flags()
elif self.short_spread:
if any(tmp_spread < tmp_roll_mean):
self.reset_trade_flags()
def check_trade_timeout(self):
if self.in_trade and self.trade_entry_timestamp is not None:
# numpy.busday_count(start, end)
trade_duration = np.busday_count(
self.trade_entry_timestamp.strftime("%Y-%m-%d"),
self.algo.Time.strftime("%Y-%m-%d"),
)
if trade_duration >= self.algo.max_trade_duration:
for symbol in self.pair:
# how many shares do we hold of the symbol in this pair trade
# liquidate only the amount for that pair trade
held_quantity = self.trade_data.loc[symbol, "quantity"]
if self.long_spread:
self.algo.MarketOnCloseOrder(
symbol,
-1 * held_quantity,
tag=f"TRADE TIMEOUT LONG SPREAD::{str(self.pair[0])}-{str(self.pair[1])}",
)
# need to track over time to confirm that spread crosses sma before anymore long positions
# cannot reset all flags before this happens
self.trade_timeout = True
self.spread_crossed_since_trade()
elif self.short_spread:
self.algo.MarketOnCloseOrder(
symbol,
-1 * held_quantity,
tag=f"TRADE TIMEOUT SHORT SPREAD::{str(self.pair[0])}-{str(self.pair[1])}",
)
# need to track over time to confirm that spread crosses sma before anymore long positions
# cannot reset all flags before this happens
self.trade_timeout = True
self.spread_crossed_since_trade()
return
def compute_spread(self, prices, intraday_prices):
# use ratio spread
self.spread = prices[self.symbol1] / prices[self.symbol2]
# intraday spread for entering positions
self.intraday_spread = (
intraday_prices[self.symbol1].iloc[-1]
/ intraday_prices[self.symbol2].iloc[-1]
)
roll_mean = self.spread.rolling(self.window).mean()
roll_std = self.spread.rolling(self.window).std()
self.roll_mean = roll_mean
self.roll_std = roll_std
bb_upper = roll_mean + self.bb_multiple * roll_std
bb_lower = roll_mean - self.bb_multiple * roll_std
self.bb_upper = bb_upper
self.bb_lower = bb_lower
return
def is_short_trigger(self):
cond = self.intraday_spread > self.bb_upper.iloc[-1]
if cond:
return True
return False
def is_long_trigger(self):
cond = self.intraday_spread < self.bb_lower.iloc[-1]
if cond:
return True
return False
class pairsTrader(QCAlgorithm):
def Initialize(self):
self.cash = 1_000_000
self.SetStartDate(2020, 1, 1) # Set Start Date
self.SetEndDate(2022, 4, 1)
self.SetCash(self.cash) # Set Strategy Cash
self.UniverseSettings.ExtendedMarketHours = True
self.UniverseSettings.DataNormalizationMode = DataNormalizationMode.Adjusted
self.pairs = None
self.symbols = None
self.AddUniverse(StockDataSource, "my-stock-data-source", self.stockDataSource)
self.splotName = "Spread"
sPlot = Chart(self.splotName)
sPlot.AddSeries(Series("spread", SeriesType.Line, 0))
sPlot.AddSeries(Series("rollmean", SeriesType.Line, 0))
sPlot.AddSeries(Series("bb upper", SeriesType.Line, 0))
sPlot.AddSeries(Series("bb lower", SeriesType.Line, 0))
self.AddChart(sPlot)
self.PLOT = False
self.pPlotName = "Concurrent Positions"
pPlot = Chart(self.pPlotName)
pPlot.AddSeries(Series("num positions", SeriesType.Line, 0))
self.AddChart(pPlot)
#####################################
# algo parameters
self.allocation = int(0.5 * self.cash / 2) # len(self.pairs))
self.Debug(f"default allocation to each pair trade: {self.allocation}")
MarketOnCloseOrder.SubmissionTimeBuffer = timedelta(minutes=10)
self.bb_multiple = 2.0 # for spread bollinger bands
self.window = 20 # for spread bollinger bands
self.lookback = int(60) # for price data
self.intraday_lookback = 10 # for intraday history call
self.portfolio_equity_history = list() # for anl vol of port
self.portfolio_anl_vol_lookback = 60 # for anl vol of port
self.history_resolution = Resolution.Daily
self.max_trade_duration = 7
self.target_vol = 0.10
self.spy = self.AddEquity("SPY").Symbol
self.SetBenchmark(self.spy)
self.symbol_data = dict()
#####################################
# schedule functions
self.Schedule.On(
self.DateRules.EveryDay(self.spy),
self.TimeRules.At(9, 30),
self.plot_num_positions,
)
self.Schedule.On(
self.DateRules.EveryDay(self.spy), self.TimeRules.At(9, 25), self.get_pairs
)
self.Schedule.On(
self.DateRules.EveryDay(self.spy), self.TimeRules.At(15, 45), self.trade
)
self.Schedule.On(
self.DateRules.EveryDay(self.spy),
self.TimeRules.At(15, 30),
self.check_trade_exit,
)
#####################################
# algo functions
def stockDataSource(self, data):
self.symbols = [symbol for item in data for symbol in item["Symbols"]]
self.pairs = []
for i, v in enumerate(self.symbols, start=1):
if i % 2 == 0:
chunk = self.symbols[i - 2 : i]
pair = []
for symbol in chunk:
pair.append(symbol)
pair = tuple(pair)
self.pairs.append(pair)
self.Debug(f"{self.Time} pairs: {[[str(tick) for tick in tupl] for tupl in self.pairs] }")
# self.Debug(f"{self.Time} pairs: {str(self.pairs[0][0])}")
return self.symbols
def get_daily_prices(self):
"""
get price history and make sure each stock has minimum amount of data
"""
prices = self.get_history(
self.symbols, self.lookback, Resolution.Daily, "close", "close prices"
)
if prices.empty:
return prices
prices = prices["close"].unstack(level=0).dropna()
return prices
def get_intraday_prices(self):
"""
get price history and make sure each stock has minimum amount of data
"""
prices = self.get_history(
self.symbols, self.intraday_lookback, Resolution.Minute, "close", "close prices"
)
if prices.empty:
return prices
prices = prices["close"].unstack(level=0).dropna()
return prices
##################
def get_history(self, symbol, lookback, resolution, column, data_label):
"""
- function to download security history and check that the column we need
exists in the dataframe. if it is missing it retries twice.
- returns empty series if column not found
"""
hist = self.History(symbol, lookback, resolution)
if column not in hist.columns:
self.Debug(f"{self.Time} {data_label} {column} data missing")
hist = self.History(symbol, lookback, resolution)
if column not in hist.columns:
self.Debug(f"{self.Time} {data_label} {column} data missing")
hist = self.History(symbol, lookback, resolution)
if column not in hist.columns:
self.Debug(
f"{self.Time} {data_label} {column} data missing no trades today"
)
return pd.Series()
return hist
##################
def plot_num_positions(self):
"""function to plot the daily number of concurrent positions"""
if len(self.symbol_data.keys()) < 1:
return
num_concurrent_positions = 0
for k in self.symbol_data.keys():
if self.symbol_data[k].in_trade:
num_concurrent_positions += 1
self.Plot(
self.pPlotName,
"num positions",
num_concurrent_positions if np.isfinite(num_concurrent_positions) else 0,
)
return
##################
def manage_pair_universe(self):
"""
remove pairs no longer in universe and liquidate if in any trades
"""
current_pairs = list(self.symbol_data.keys())
for pair in current_pairs:
sd = self.symbol_data[pair]
if pair not in self.pairs:
if sd.in_trade:
for symbol in sd.pair:
qty = sd.trade_data.loc[symbol, "quantity"]
self.MarketOnCloseOrder(
symbol, -1 * qty, tag="symbol no longer in universe"
)
del self.symbol_data[pair]
return
##################
def get_pairs(self):
"""
get all pairs for trading
"""
if self.symbols is None:
self.Debug(f"[{self.Time}] missing symbols inside get_pairs")
return
# manage change in pair universe
self.manage_pair_universe()
prices = self.get_daily_prices()
if prices.empty:
return
for pair in self.pairs:
if pair not in self.symbol_data.keys():
self.symbol_data[pair] = symbolData(
pair,
self.bb_multiple,
self.window,
self,
)
# self.Debug(f"pairs:\n{pair_df['pair']}")
return
##################
def pair_can_trade(self, pair):
"""
make sure the pair is tradeable:
1) by ensuring the security price is populated
2) that trade has not been timed out waiting for another crossover
"""
sd = self.symbol_data[pair]
for symbol in pair:
if not self.Securities[symbol].Close > 0:
self.Debug(f"{self.Time} {symbol} is missing price data")
return False
if sd.trade_timeout:
if not sd.spread_crossed_since_trade():
self.Log(
f"{self.Time} {[str(p) for p in pair]} has trade timed out, waiting for crossover"
)
return False
return True
##################
def buy_spread(self, sd, pair, allocation=None):
"""
function to buy spread
"""
if allocation is None:
allocation = self.allocation
wt1 = allocation
wt2 = allocation * -1
pair0_shares = int(wt1 / self.Securities[pair[0]].Close)
pair1_shares = int(wt2 / self.Securities[pair[1]].Close)
self.Debug(
f"LONG spread : {str(sd.pair[0])} dv={wt1:.2f} shares={pair0_shares} vs {str(sd.pair[1])} dv={wt2:.2f} shares={pair1_shares}"
)
# send market on close orders instead
mkto0 = self.MarketOnCloseOrder(pair[0], pair0_shares)
mkto1 = self.MarketOnCloseOrder(pair[1], pair1_shares)
# save trade data
sd.trade_data = pd.DataFrame(
index=[pair[0], pair[1]], columns=["timestamp", "market_order", "quantity"]
)
sd.trade_data.loc[pair[0], :] = (mkto0.Time, mkto0, mkto0.Quantity)
sd.trade_data.loc[pair[1], :] = (mkto1.Time, mkto1, mkto1.Quantity)
# set trade flags
sd.in_trade = True
sd.long_spread = True
sd.trade_entry_timestamp = self.Time
return
##################
def short_spread(self, sd, pair, allocation=None):
"""
function to short spread
"""
if allocation is None:
allocation = self.allocation
wt1 = allocation * -1
wt2 = allocation
pair0_shares = int(wt1 / self.Securities[pair[0]].Close)
pair1_shares = int(wt2 / self.Securities[pair[1]].Close)
self.Debug(
f"SHORT spread : {str(sd.pair[0])} dv={wt1:.2f} shares={pair0_shares} vs {str(sd.pair[1])} dv={wt2:.2f} shares={pair1_shares}"
)
# send market on close orders instead
mkto0 = self.MarketOnCloseOrder(pair[0], pair0_shares)
mkto1 = self.MarketOnCloseOrder(pair[1], pair1_shares)
# save trade data
sd.trade_data = pd.DataFrame(
index=[pair[0], pair[1]], columns=["timestamp", "market_order", "quantity"]
)
sd.trade_data.loc[pair[0], :] = (mkto0.Time, mkto0, mkto0.Quantity)
sd.trade_data.loc[pair[1], :] = (mkto1.Time, mkto1, mkto1.Quantity)
# set trade flags
sd.in_trade = True
sd.short_spread = True
sd.trade_entry_timestamp = self.Time
return
##################
def calc_and_log_portfolio_anl_vol(self):
# accumulate portfolio equity to log rolling annualized vol of portfolio
self.portfolio_equity_history.append(
(self.Time, self.Portfolio.TotalPortfolioValue)
)
eq_s = pd.DataFrame(self.portfolio_equity_history).set_index(0)[
1
] # .drop_duplicates()
if len(eq_s) >= self.portfolio_anl_vol_lookback:
anl_vol = eq_s.pct_change().dropna().std() * np.sqrt(252)
self.Log(f"{self.Time} PORTFOLIO rolling annualized std: {anl_vol:.2%}")
return
##################
def calc_and_log_pair_anl_vol(self, pair, symbol_data):
if len(symbol_data.roll_mean.dropna()) > 3:
anl_vol = symbol_data.roll_mean.pct_change().dropna().std() * np.sqrt(252)
self.Log(f"{self.Time} {[str(p) for p in pair]} rolling annualized std: {anl_vol:.2%}")
self.Log(
f"spread {symbol_data.spread.iloc[-1] if np.isfinite(symbol_data.spread.iloc[-1]) else 0}"
)
self.Log(
f"rollmean {symbol_data.roll_mean.iloc[-1] if np.isfinite(symbol_data.roll_mean.iloc[-1]) else 0}"
)
self.Log(
f"bb upper {symbol_data.bb_upper.iloc[-1] if np.isfinite(symbol_data.bb_upper.iloc[-1]) else 0}"
)
self.Log(
f"bb lower {symbol_data.bb_lower.iloc[-1] if np.isfinite(symbol_data.bb_lower.iloc[-1]) else 0}"
)
return anl_vol
##################
def trade(self):
"""
function to implement trades
"""
# accumulate portfolio equity to log rolling annualized vol of portfolio
self.calc_and_log_portfolio_anl_vol()
if self.symbols is None:
self.Debug(f"[{self.Time}] missing symbols inside trade")
return
# get prices
prices = self.get_daily_prices()
if prices.empty:
return
# get intraday prices for trade entry and exit
intraday_prices = self.get_intraday_prices()
if intraday_prices.empty:
return
for pair in self.symbol_data.copy().keys():
sd = self.symbol_data[pair]
# if already in trade check to see if our maximum trade duration is breached
if sd.in_trade and not sd.trade_timeout:
sd.check_trade_timeout()
# compute spread and boundaries
sd.compute_spread(prices, intraday_prices)
# log annualized vol for pair
pair_anl_vol = self.calc_and_log_pair_anl_vol(pair, sd)
# need a minimum number of vol
if pair_anl_vol < 0.1:
pair_anl_vol = 0.1
if not self.pair_can_trade(pair):
continue
# plot spread data for debugging individual pairs
# NOTE: only works for a single pair set
if self.PLOT:
self.Plot(
self.splotName,
"spread",
sd.spread.iloc[-1] if np.isfinite(sd.spread.iloc[-1]) else 0,
)
self.Plot(
self.splotName,
"rollmean",
sd.roll_mean.iloc[-1] if np.isfinite(sd.roll_mean.iloc[-1]) else 0,
)
self.Plot(
self.splotName,
"bb upper",
sd.bb_upper.iloc[-1] if np.isfinite(sd.bb_upper.iloc[-1]) else 0,
)
self.Plot(
self.splotName,
"bb lower",
sd.bb_lower.iloc[-1] if np.isfinite(sd.bb_lower.iloc[-1]) else 0,
)
# if not in trade for this pair
if not sd.in_trade:
allocation = (
self.Portfolio.TotalPortfolioValue
* self.target_vol
/ pair_anl_vol
/ len(self.symbols)
)
allocation = quantize(allocation)
# is buy trigger?
if sd.is_long_trigger():
self.Debug(
f"{self.Time}::{[str(p) for p in pair]} {pair_anl_vol:.2%} ${allocation:,}"
)
self.buy_spread(sd, pair, allocation)
# or short triggered?
elif sd.is_short_trigger():
self.Debug(
f"{self.Time}::{[str(p) for p in pair]} {pair_anl_vol:.2%} ${allocation:,}"
)
self.short_spread(sd, pair, allocation)
return
def check_trade_exit(self):
if self.symbols is None:
self.Debug(f"[{self.Time}] missing symbols inside check_trade_exit")
return
# get prices
prices = self.get_daily_prices()
if prices.empty:
return
# get intraday prices for trade entry and exit
intraday_prices = self.get_intraday_prices()
if intraday_prices.empty:
return
# no trades after 4pm RTH
if self.Time.hour >= 16:
return
for pair in self.symbol_data.copy().keys():
sd = self.symbol_data[pair]
# if already in trade check to see if our maximum trade duration is breached
if sd.in_trade and not sd.trade_timeout:
sd.check_trade_timeout()
# compute spread and boundaries
sd.compute_spread(prices, intraday_prices)
if not self.pair_can_trade(pair):
continue
# if already in trade for this pair
if sd.in_trade:
# if long spread check that spread is >= roll mean
if sd.long_spread:
spread_gt_mean = sd.intraday_spread >= sd.roll_mean.iloc[-1]
if spread_gt_mean:
for symbol in sd.pair:
qty = sd.trade_data.loc[symbol, "quantity"]
# exit only the quantity of shares involved in that trade
self.MarketOnCloseOrder(
symbol,
-1 * qty,
tag=f"TP LONG SPREAD::{str(sd.pair[0])}-{str(sd.pair[1])} | sprd: {sd.intraday_spread:.2f} rollmean: {sd.roll_mean.iloc[-1]:.2f}",
)
sd.reset_trade_flags()
self.Debug(
f"tp liquidating long spread: {str(sd.pair[0])} vs {str(sd.pair[1])} | sprd: {sd.intraday_spread:.2f} rollmean: {sd.roll_mean.iloc[-1]:.2f}"
)
# else if short spread check that spread is <= roll mean
elif sd.short_spread:
spread_lt_mean = sd.intraday_spread <= sd.roll_mean.iloc[-1]
if spread_lt_mean:
for symbol in sd.pair:
# exit only the quantity of shares involved in that trade
qty = sd.trade_data.loc[symbol, "quantity"]
self.MarketOnCloseOrder(
symbol,
-1 * qty,
tag=f"TP SHORT SPREAD::{str(sd.pair[0])}-{str(sd.pair[1])} | sprd: {sd.intraday_spread:.2f} rollmean: {sd.roll_mean.iloc[-1]:.2f}",
)
sd.reset_trade_flags()
self.Debug(
f"tp liquidating short spread: {str(sd.pair[0])} vs {str(sd.pair[1])} | sprd: {sd.intraday_spread:.2f} rollmean: {sd.roll_mean.iloc[-1]:.2f}"
)
##################
def OnSecuritiesChanged(self, changes):
self._changes = changes
for security in changes.AddedSecurities:
security.MarginModel = PatternDayTradingMarginModel()
self.Debug(self.Time)
def OnData(self, data):
"""OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.
Arguments:
data: Slice object keyed by symbol containing the stock data
"""
if self._changes is None:
return