| Overall Statistics |
|
Total Orders 48 Average Win 0.48% Average Loss -0.29% Compounding Annual Return 118.963% Drawdown 1.400% Expectancy 0.440 Start Equity 1000000.00 End Equity 1030517.38 Net Profit 3.052% Sharpe Ratio 6.616 Sortino Ratio 12.001 Probabilistic Sharpe Ratio 88.200% Loss Rate 46% Win Rate 54% Profit-Loss Ratio 1.66 Alpha 0.643 Beta 0.149 Annual Standard Deviation 0.113 Annual Variance 0.013 Information Ratio 0.412 Tracking Error 0.123 Treynor Ratio 5.006 Total Fees $0.00 Estimated Strategy Capacity $210000.00 Lowest Capacity Asset LTCUSD 2XR Portfolio Turnover 136.48% |
import numpy as np
import pandas as pd
import statsmodels.api as sm
from datetime import datetime
from AlgorithmImports import *
from arch.unitroot import ADF
from arch.unitroot import KPSS
from arch.unitroot import PhillipsPerron
from sklearn.linear_model import LinearRegression
###Minute Scale
class CointegrationAlgorithm(QCAlgorithm):
    """Minute-resolution spread-trading strategy on LTCUSD and SOLUSD.

    While the algorithm clock hour is >= 20 or <= 13 the strategy:

    1. runs unit-root tests on the first-differenced closes of every
       subscribed security (``stationarity_tests``);
    2. if they pass, regresses the first symbol on the remaining ones and
       derives mean +/- ``sigma`` standard-deviation bounds for the spread
       (``create_model``);
    3. sells the spread above the upper bound, buys it below the lower
       bound, and closes at the opposite bound or at a stop level
       (``check_spread`` / ``check_stop``).

    During the remaining hours (14-19) all state is cleared and every
    position is closed.
    """

    def Initialize(self) -> None:
        """Configure the backtest window, cash, brokerage model, strategy
        parameters and crypto subscriptions."""
        self.SetStartDate(2024, 6, 1)  # Set Start Date
        self.SetEndDate(2024, 6, 14)  # Set End Date
        self.SetCash(1000000)  # Set Strategy Cash
        self.set_brokerage_model(BrokerageName.QuantConnectBrokerage, AccountType.MARGIN)

        # Strategy parameters.
        self.formulation_hours = 21  # hours to look back, optimized parameter
        self.sigma = 3.5  # upper/lower boundary width in std devs, optimized parameter
        self.stop_loss_constant = 0.05  # initial stop offset, as a fraction of the entry bound
        self.trailing_constant = 0.02  # trailing offset, as a fraction of the current spread
        self.lev = 1  # spread units per order; recomputed by calc_qty before each entry

        # Model state.
        self.model = False
        self.cointegrated = False
        self.model_date = None
        self.coef = None
        self.mean = None
        self.upper_bound = None
        self.lower_bound = None
        self.spread_val = None

        # Position state.
        self.position = False
        self.pos_type = None
        self.stop_loss_spread_val = None
        self.trailing_spread_val = None

        # Add Crypto assets. The first symbol is the regression's dependent
        # variable; every following symbol is a regressor.
        self.symbol1 = "LTCUSD"
        self.symbol2 = "SOLUSD"
        self.symbols = [self.symbol1, self.symbol2]
        self.amount_assets = len(self.symbols)
        self.a1 = self.AddCrypto(self.symbol1, Resolution.Minute, leverage=1.0).Symbol
        self.a2 = self.AddCrypto(self.symbol2, Resolution.Minute, leverage=1.0).Symbol

        # An int argument is interpreted as a bar count, so pass a time span
        # to warm up for `formulation_hours` hours as the name implies.
        self.SetWarmUp(timedelta(hours=self.formulation_hours))

    def OnData(self, data) -> None:
        """Per-bar entry point: build the model if needed, otherwise trade it.

        `data` is the slice supplied by the engine; it is not read here
        because prices are pulled through history requests instead.
        """
        # checks for warm-up period
        if self.is_warming_up:
            return

        hour = self.Time.hour
        if not (hour >= 20 or hour <= 13):
            # Outside the trading window (hours 14-19): liquidate all
            # positions and clear vars so a fresh model is built later.
            self.clear_vars()
            self.close_all_positions()
            self.debug("clearing vars and liquidating")
            return

        if not self.model and not self.cointegrated:
            # No model yet: test the series and build one as soon as they pass.
            self.stationarity_tests()
            if self.cointegrated:
                self.create_model()
            return

        # We have a model built; check whether the spread is at a boundary.
        self.calculate_spread_value()
        self.check_spread()  # check to enter/exit trades
        if self.position:
            self.check_stop()

    def close_all_positions(self) -> None:
        """Set the target holding of every traded symbol to zero."""
        for ticker in self.symbols:
            self.SetHoldings(ticker, 0)

    def check_spread(self) -> None:
        """Close an open spread position at its target bound, or open a new
        one when the spread is outside the bounds and no position is held."""
        prices = [self.securities[symbol].Price for symbol in self.symbols]
        price_line = f"{self.symbol1}: {prices[0]}, {self.symbol2}: {prices[1]}, {self.symbol2}: {prices[1]}"

        if self.position:
            # Close out the existing position once the opposite bound is hit.
            if self.pos_type == "LONG" and self.spread_val > self.upper_bound:
                closing = "Long"
            elif self.pos_type == "SHORT" and self.spread_val < self.lower_bound:
                closing = "Short"
            else:
                return
            self.close_all_positions()
            self.debug(price_line)
            self.debug(f"Closing {closing} spread_value {self.spread_val} lower bound:{self.lower_bound}, upper bound: {self.upper_bound}, coefs:{self.coef}")
            self.clear_vars_exit()
            return

        # Enter a new position.
        if self.spread_val > self.upper_bound:
            # We sell the spread; the stop sits above the upper bound.
            side, label, pos_type, bound = -1, "Short", "SHORT", self.upper_bound
        elif self.spread_val < self.lower_bound:
            # We buy the spread; the stop sits below the lower bound.
            side, label, pos_type, bound = 1, "Long", "LONG", self.lower_bound
        else:
            return

        if not self._open_spread(side):
            return
        self.debug(f"{price_line} QTY Spread: {self.lev}, Cash avail:{self.portfolio.cash}")
        self.debug(f"{label} spread spread_value {self.spread_val} lower bound:{self.lower_bound}, upper bound: {self.upper_bound}, buy {self.symbol2}:{self.coef}")
        self.position = True
        self.pos_type = pos_type
        # Offset by the magnitude of the bound itself so the stop is always on
        # the losing side, even when the bound and the spread straddle zero.
        self.stop_loss_spread_val = self._offset_level(bound, self.stop_loss_constant, -side)
        self.trailing_spread_val = self.stop_loss_spread_val

    def _open_spread(self, side: int) -> bool:
        """Size and submit one market order per leg.

        `side` is +1 to buy the spread and -1 to sell it. Returns False
        without ordering when no positive size could be computed.
        """
        self.calc_qty()
        if not self.lev > 0:
            return False
        for ticker, weight in zip(self.symbols, self.coef):
            self.market_order(ticker, side * self.lev * weight)
        return True

    @staticmethod
    def _offset_level(level, fraction, direction):
        """Return `level` shifted by `fraction` of its magnitude, upwards when
        `direction` is +1 and downwards when it is -1."""
        return level + direction * abs(level) * fraction

    def check_stop(self) -> None:
        """Close the open position if the spread crossed the fixed or the
        trailing stop level; otherwise re-anchor the trailing level to the
        current spread."""
        message = None
        if self.pos_type == "LONG":
            if self.spread_val < self.stop_loss_spread_val:
                message = f"Long pos stopped out: {self.spread_val}"
            elif self.spread_val < self.trailing_spread_val:
                message = f"stopped out trailing, spread_val: {self.spread_val}"
        elif self.pos_type == "SHORT":
            if self.spread_val > self.stop_loss_spread_val:
                message = f"Short pos stopped out: {self.spread_val}"
            elif self.spread_val > self.trailing_spread_val:
                message = f"stopped out trailing, spread_val: {self.spread_val}"

        if message is not None:
            self.close_all_positions()
            self.debug(message)
            # A stop-out also discards the model so it is rebuilt from scratch.
            self.clear_vars()
            return

        # update trailing stop
        if self.pos_type == "LONG":
            self.trailing_spread_val = self._offset_level(self.spread_val, self.trailing_constant, -1)
        elif self.pos_type == "SHORT":
            self.trailing_spread_val = self._offset_level(self.spread_val, self.trailing_constant, 1)

    def clear_vars_exit(self) -> None:
        """Reset position state only; the fitted model is kept."""
        self.position = False
        self.pos_type = None
        self.stop_loss_spread_val = None
        self.trailing_spread_val = None

    def calc_qty(self) -> None:
        """Set `self.lev` to the number of spread units that 80% of the
        available cash buys at current prices (0 when no price is available)."""
        avail_cash = self.portfolio.cash
        prices = [self.Securities[symbol].Price for symbol in self.symbols]
        total_value = sum(abs(weight * price) for weight, price in zip(self.coef, prices))
        pct_portfolio = 0.80
        if not total_value > 0:
            # No usable prices yet; a zero size makes the caller skip the entry.
            self.lev = 0
            return
        self.lev = (pct_portfolio * avail_cash) / total_value

    def clear_vars(self) -> None:
        """Reset both the model state and the position state."""
        self.model = False
        self.cointegrated = False
        self.model_date = None
        self.coef = None
        self.upper_bound = None
        self.lower_bound = None
        self.mean = None
        self.spread_val = None
        self.clear_vars_exit()

    def _collect_closes(self, *history_args):
        """Return ``{str(symbol): close Series}`` for every subscribed
        security, or None as soon as one history request comes back empty.

        `history_args` are forwarded to ``self.History`` after the symbol list.
        """
        closes = {}
        for symbol, _security in self.securities.items():
            history = self.History([symbol], *history_args)
            if len(history) == 0:
                return None
            closes[str(symbol)] = history.loc[symbol]['close']
        return closes

    def _latest_spread(self, closes):
        """Return the spread at the last row of `closes` under `self.coef`."""
        # Select columns explicitly so they line up with `self.coef`
        # regardless of the order in which securities were iterated.
        frame = pd.DataFrame.from_dict(closes)[self.symbols]
        return np.dot(frame.iloc[-1].values, np.array(self.coef))

    def calculate_spread_value(self) -> None:
        """Refresh `self.spread_val` from the latest minute closes; the
        previous value is kept when any history request returns no rows."""
        closes = self._collect_closes(5, Resolution.Minute)
        if closes is None:
            return
        self.spread_val = self._latest_spread(closes)

    def stationarity_tests(self) -> None:
        """Set `self.cointegrated` when every security's differenced closes
        pass the ADF, Phillips-Perron and KPSS tests.

        NOTE(review): these tests run on each asset's first differences, not
        on the regression residual, so they check the individual series
        rather than the spread itself - confirm this is intended.
        """
        lookback = timedelta(hours=self.formulation_hours)
        for symbol, _security in self.securities.items():
            history = self.History([symbol], lookback, Resolution.Minute)
            if len(history) == 0:
                return  # nothing to test yet; try again on the next bar
            close_changes = history.loc[symbol]['close'].diff().dropna()
            adf = ADF(close_changes)
            pp = PhillipsPerron(close_changes)
            kpss = KPSS(close_changes)
            # check whether the series passes all the tests
            passed = adf.pvalue < 0.05 and pp.pvalue < 0.05 and kpss.pvalue > 0.10
            if not passed:
                return
        # made it through all of the tickers, so flag the set as cointegrated
        self.cointegrated = True

    def create_model(self) -> None:
        """Fit the hedge ratios and derive the spread's trading bounds.

        Regresses the first symbol's closes on the closes of all remaining
        symbols, stores ``[1, -beta_1, ...]`` in `self.coef`, and sets the
        mean and the mean +/- ``sigma`` * std bounds of the fitted spread.
        """
        closes = self._collect_closes(timedelta(hours=self.formulation_hours), Resolution.Minute)
        if closes is None:
            # No data to fit on; drop the flag so the tests and the fit are retried.
            self.cointegrated = False
            return

        dependent = closes[self.symbols[0]].values
        regressors = np.column_stack([closes[ticker].values for ticker in self.symbols[1:]])
        lin_model = LinearRegression()
        lin_model.fit(regressors, dependent)

        # The intercept is not subtracted; it is absorbed by the spread's mean.
        spread = dependent - regressors @ lin_model.coef_
        spread_std = np.std(spread)
        self.mean = np.mean(spread)
        self.upper_bound = self.mean + self.sigma * spread_std
        self.lower_bound = self.mean - self.sigma * spread_std
        self.coef = [1] + list(-1*lin_model.coef_)

        self.spread_val = self._latest_spread(closes)
        self.model = True
        self.model_date = self.Time
        self.debug(f"time:{self.Time}, spread val:{self.spread_val}, lower_bound:{self.lower_bound}, upper_bound:{self.upper_bound}")