| Overall Statistics | |
|---|---|
| Total Orders | 776 |
| Average Win | 0.09% |
| Average Loss | -0.09% |
| Compounding Annual Return | 24.754% |
| Drawdown | 0.500% |
| Expectancy | 0.101 |
| Start Equity | 100000 |
| End Equity | 103305.47 |
| Net Profit | 3.305% |
| Sharpe Ratio | 4.296 |
| Sortino Ratio | 7.496 |
| Probabilistic Sharpe Ratio | 99.571% |
| Loss Rate | 45% |
| Win Rate | 55% |
| Profit-Loss Ratio | 0.99 |
| Alpha | 0.103 |
| Beta | 0.035 |
| Annual Standard Deviation | 0.024 |
| Annual Variance | 0.001 |
| Information Ratio | 0.884 |
| Tracking Error | 0.101 |
| Treynor Ratio | 2.956 |
| Total Fees | $1.05 |
| Estimated Strategy Capacity | $9300000.00 |
| Lowest Capacity Asset | DE R735QTJ8XC9X |
| Portfolio Turnover | 255.41% |
#region imports
from AlgorithmImports import *
#endregion
from scipy.stats import linregress
import math
import numpy as np
class Consolidator:
    """Bollinger-band mean-reversion trader for one (child, parent) pair.

    The traded spread is ``s1 - hedge_ratio * s2``. The spread is fed into
    Bollinger Bands; the pair is entered when the *quoted* spread (built from
    bid/ask) leaves the bands and exited when it crosses back over the
    middle band.
    """

    limits = False

    def __init__(self, s1, s2, algo, smooth, z, ratio):
        """Create indicators and portfolio targets, then warm up the bands.

        Args:
            s1: Symbol of the child leg (``y`` in the hedge regression).
            s2: Symbol of the parent leg (``x`` in the hedge regression).
            algo: Owning algorithm. This class reads ``res``, ``runs``,
                ``fit_period``, ``fit_res``, ``_SWAP_XY``, ``db_lvl`` and
                calls its history/order/debug helpers.
            smooth: Bollinger Band look-back period.
            z: Band-width multiplier passed to ``BollingerBands``.
            ratio: Initial hedge ratio. A value of ``1`` is treated as
                "not fitted yet" by ``OnData``.
        """
        self.s1 = s1
        self.s2 = s2
        self.algo = algo
        self.hedge_ratio = ratio
        self.smooth = smooth
        self.z = z
        self.child = s1
        self.parent = s2

        # Latest quotes per leg; the *_time stamps start as None so the
        # "same event" comparison in OnData never hits a missing attribute.
        self.s1_ask = None
        self.s1_bid = None
        self.s2_bid = None
        self.s2_ask = None
        self.s1_time = None
        self.s2_time = None

        self.warmed_up = False
        self.S1 = Identity('S1')
        self.S2 = Identity('S2')
        self.sym_str = f'{self.s1}/{self.s2}'
        cons = self.algo.res
        self.is_invested = None
        self.order_tickets = []
        # MAYBE dont use any consolidator? Just subscribe to self.algo.res?
        self.algo.RegisterIndicator(self.s1, self.S1, cons)
        self.algo.RegisterIndicator(self.s2, self.S2, cons)

        # NOTE(review): the composite below captures the ratio passed in at
        # construction; GetHedgeRatio() later changes self.hedge_ratio only,
        # so a re-fit does not appear to reach this indicator -- confirm.
        self.series = IndicatorExtensions.Minus(
            self.S1, IndicatorExtensions.Times(self.S2, self.hedge_ratio))
        self.bb = BollingerBands(self.smooth, self.z, MovingAverageType.Exponential)

        n_pairs = len(self.algo.runs)
        per_symbol = .9 / n_pairs  # Take approx 2x leverage, little below
        self.per_symbol = per_symbol
        # This will not work with ensemble
        self.long_targets = [PortfolioTarget(self.s1, per_symbol), PortfolioTarget(self.s2, -per_symbol)]
        self.short_targets = [PortfolioTarget(self.s1, -per_symbol), PortfolioTarget(self.s2, per_symbol)]
        self.flat_targets = [PortfolioTarget(self.s1, 0.0), PortfolioTarget(self.s2, 0.0)]

        # Warm-up is best effort: a failure must not prevent the pair from
        # being created, but the reason is reported instead of being lost.
        try:
            self.WarmUp()
        except Exception as err:
            self.algo.Debug(f'Cannot Run Warmup for {self.sym_str}: {err!r}')

    def WarmUp(self):
        """Prime the Bollinger Bands from historical closes of both legs."""
        n = max(1000, self.smooth + 10)
        history = self.algo.History([self.s1, self.s2], n, self.algo.res)
        hist = history.unstack(level=0).close
        hist.dropna(inplace=True)  # keep only timestamps where both legs traded
        for tm, row in hist.iterrows():
            value = row[self.s1] - row[self.s2] * self.hedge_ratio
            self.bb.Update(tm, value)
        self.warmed_up = True

    def GetHedgeRatio(self, history=None):
        """Fit ``hedge_ratio`` as the regression slope between the two legs.

        Args:
            history: Optional frame indexed by symbol with a ``close`` column.
                When omitted, history is requested from the algorithm using
                ``fit_period`` / ``fit_res``.

        Returns:
            ``True`` when a ratio was fitted, ``False`` when history could
            not be obtained.
        """
        if history is None:
            try:
                history = self.algo.History([self.s1, self.s2], self.algo.fit_period, self.algo.fit_res)
                hist = history.unstack(level=0).close
                hist.dropna(inplace=True)
                p1 = hist[self.s1]
                p2 = hist[self.s2]
            except Exception as err:
                self.algo.Debug(f' --------- CANNOT get History, therefore hedge ratio ------------- {err!r}')
                return False
        else:
            p1 = history.loc[self.s1].close
            p2 = history.loc[self.s2].close

        # s1 is officially child  -> y
        # s2 is officially parent -> x
        # linregress takes (x, y); _SWAP_XY deliberately reverses that.
        if self.algo._SWAP_XY:
            reg = linregress(p1, p2)
        else:
            reg = linregress(p2, p1)
        self.hedge_ratio = reg.slope
        self.algo.Debug(f'Ratio Fit --> {self.s1} - {self.hedge_ratio} * {self.s2}')
        return True

    def OnData(self, data):
        """Consume one data slice: refresh quotes, update bands, run entries."""
        if (not data.Bars.ContainsKey(self.s1)) or (not data.Bars.ContainsKey(self.s2)):
            return

        for symbol, quote_bar in data.QuoteBars.items():
            if symbol == self.s1:
                self.s1_bid = quote_bar.Bid.Close
                self.s1_ask = quote_bar.Ask.Close
                self.s1_time = quote_bar.EndTime
            if symbol == self.s2:
                self.s2_bid = quote_bar.Bid.Close
                self.s2_ask = quote_bar.Ask.Close
                self.s2_time = quote_bar.EndTime

        if self.hedge_ratio == 1:
            self.GetHedgeRatio()

        # Ensure on same event, of both products.
        if self.s1_time != self.s2_time:
            return

        # Do not push a placeholder value into the bands.
        if not self.series.IsReady:
            return

        # Update first, then test readiness: if the historical warm-up failed
        # the bands must still be able to fill from live data.
        self.bb.Update(self.algo.Time, self.series.Current.Value)
        if not self.bb.IsReady:
            return

        if self.hedge_ratio != 1.0:
            self.EntryLogic()
        # if self.algo.plot:
        #     self.PlotSpread()

    @property
    def QuoteReady(self):
        """True once a bid and an ask have been seen for both legs."""
        quotes = (self.s1_ask, self.s1_bid, self.s2_ask, self.s2_bid)
        return all(q is not None for q in quotes)

    def EntryLogic(self):
        """Enter or exit the pair based on the quoted spread vs. the bands."""
        if not self.bb.IsReady: return
        if not self.series.IsReady: return
        if not self.QuoteReady: return

        ratio = self.hedge_ratio
        # Price of buying / selling one unit of spread at the touch. The side
        # of the s2 quote that is crossed depends on the sign of the ratio.
        if ratio > 0:
            buy_price = self.s1_ask - ratio * self.s2_bid
            sell_price = self.s1_bid - ratio * self.s2_ask
        else:
            buy_price = self.s1_ask - ratio * self.s2_ask
            sell_price = self.s1_bid - ratio * self.s2_bid

        lower = self.bb.LowerBand.Current.Value
        middle = self.bb.MiddleBand.Current.Value
        upper = self.bb.UpperBand.Current.Value

        # NOTE: a MarketOrder-per-leg variant (needed for ensembles sharing a
        # symbol) was tried previously but behaved differently; SetHoldings
        # on fixed targets is what is traded here.
        if not self.is_invested:
            if buy_price < lower:
                # below the lower band -> long the spread
                self._transition(self.long_targets, 'long', 'LE', 'LE : ', buy_price, '<', lower)
            elif sell_price > upper:
                # above the upper band -> short the spread
                self._transition(self.short_targets, 'short', 'SE', 'SE: ', sell_price, '>', upper)
        elif self.is_invested == 'long':
            # exit when the spread crosses back over the mean
            if sell_price > middle:
                self._transition(self.flat_targets, None, 'LX', 'LX ', sell_price, '>', middle)
        elif self.is_invested == 'short':
            if buy_price < middle:
                self._transition(self.flat_targets, None, 'SX', 'SX ', buy_price, '<', middle)

    def _transition(self, targets, state, code, label, price, op, band):
        """Apply ``targets``, record the new position state and log it."""
        self.algo.SetHoldings(targets, tag=f"{code} -- {self.sym_str}: {price} {op} {band}")
        self.is_invested = state
        if self.algo.db_lvl >= 2: self.algo.Debug(f'{self.sym_str} {price} {op} {band}')
        if self.algo.db_lvl >= 1: self.algo.Debug(f'{label}{self.sym_str}')

    def PlotSpread(self):
        """Plot the current spread and its three bands on the equity chart."""
        serie = self.series.Current.Value
        self.algo.Plot("Strategy Equity", f"{self.sym_str}", serie)
        self.algo.Plot("Strategy Equity", f"{self.sym_str}_Upper", self.bb.UpperBand.Current.Value)
        self.algo.Plot("Strategy Equity", f"{self.sym_str}_Middle", self.bb.MiddleBand.Current.Value)
        self.algo.Plot("Strategy Equity", f"{self.sym_str}_Lower", self.bb.LowerBand.Current.Value)
        self.algo.Debug(f'Spread Plotted...')

    # region QuoteLogic
    def SpreadLimitOrder(self, dir=1):
        """Place a limit order on each leg at the current touch.

        Args:
            dir: ``1`` to buy the spread, anything else to sell it.
        """
        s1 = self.s1
        s2 = self.s2
        ps = self.per_symbol
        if not self.QuoteReady: return

        sign = np.sign(self.hedge_ratio)
        if dir == 1:
            q1 = self.algo.CalculateOrderQuantity(s1, ps)
            q2 = self.algo.CalculateOrderQuantity(s2, -ps * sign)
            p1 = self.s1_ask
            p2 = self.s2_bid if self.hedge_ratio > 0 else self.s2_ask
        else:
            q1 = self.algo.CalculateOrderQuantity(s1, -ps)
            q2 = self.algo.CalculateOrderQuantity(s2, ps * sign)
            p1 = self.s1_bid
            p2 = self.s2_ask if self.hedge_ratio > 0 else self.s2_bid

        self.s1_ticket = self.algo.LimitOrder(s1, q1, p1)
        self.s2_ticket = self.algo.LimitOrder(s2, q2, p2)
        self.order_tickets.append(self.s1_ticket)
        self.order_tickets.append(self.s2_ticket)

    @property
    def OrderIds(self):
        """Ids of every order ticket this pair has submitted."""
        return [ticket.OrderId for ticket in self.order_tickets]

    def OnParentUpdate(self, order: "OrderEvent"):
        """Hook for order events on the parent leg (currently a placeholder)."""
        self.algo.Transactions.GetOrderById(order.OrderId)

    def OnChildUpdate(self, order: "OrderEvent"):
        """Hook for order events on the child leg (currently a placeholder)."""
        self.algo.Transactions.GetOrderById(order.OrderId)
        # if order.Status == OrderStatus.Filled:
        #     self.algo.Debug(f"{self.algo.Time}: {order}")

    def OnOrderUpdate(self, order: "OrderEvent"):
        """
        Treated as a delegate method, called via Algo.main, if routed via id to this instance
        https://www.quantconnect.com/docs/v2/writing-algorithms/trading-and-orders/order-events#01-Introduction

        The event is forwarded to the child and/or parent hook according to
        the symbol of the order it refers to.
        """
        placed = self.algo.Transactions.GetOrderById(order.OrderId)
        if placed.Symbol == self.child:
            self.OnChildUpdate(order)
        if placed.Symbol == self.parent:
            self.OnParentUpdate(order)

    # Original (misspelled) name kept so existing callers keep working.
    OnOrderUdate = OnOrderUpdate
# endregion
#region imports
from AlgorithmImports import *
from scipy.stats import linregress
import math
from Consolidator import Consolidator
from datetime import timedelta
from enum import Enum
#endregion
class Run:
    """Parameter bundle describing one traded pair."""

    def __init__(self, child: str, parent: str, smooth: int, z: float, ratio: float = 1.0):
        """Store the pair and its model parameters.

        Args:
            child: Ticker of the child leg.
            parent: Ticker of the parent leg.
            smooth: Bollinger Band look-back period.
            z: Bollinger Band width multiplier.
            ratio: Hedge ratio between the legs. Defaults to ``1.0`` so the
                ``base`` constructor (which supplies no ratio) can build an
                instance.
        """
        self.child = child
        self.parent = parent
        self.smooth = smooth
        self.z = z
        self.ratio = ratio

    @classmethod
    def base(cls, child: str, parent: str):
        """Build a run with the default look-back (150) and width (1.5)."""
        return cls(child, parent, 150, 1.5)

    def __repr__(self) -> str:
        # Runs are interpolated into Debug messages; make them readable.
        return (f"{type(self).__name__}({self.child!r}, {self.parent!r}, "
                f"{self.smooth!r}, {self.z!r}, {self.ratio!r})")
# How often do we re-fit the hedge ratio.
class FitFrequency(Enum):
    """Cadence at which the hedge ratio is re-estimated."""

    DAILY = 0
    WEEKLY = 1
    MONTHLY = 2
class YXTaker(QCAlgorithm):
    """Algorithm that trades several Bollinger-band spread pairs at once.

    Each entry of ``runs`` becomes one ``Consolidator`` which receives every
    data slice and manages its own entries and exits.
    """

    # Fit parameters.
    fit_period = 500
    fit_res = Resolution.Hour
    fit_freq = FitFrequency.WEEKLY
    # This will fit on 500 hours -- just what I was using to begin with.
    _SWAP_XY = False
    # Makes no sense, but for some reason swapping this works?

    # Model Parameters. (defaults)
    res = Resolution.Minute  # Could do Tick/ Second...
    bb_len = 120
    n_sig = .8

    # Params overridden by run in Runs -- below.
    runs = [
        Run('EWC', 'EWA', 500, 1.5, 1.5),
        Run('XOM', 'CVX', 150, 3.5, .75),
        Run('QQQ', 'XLK', 150, 2, 2.1),
        # Run('AMAT','XLC', 500, 3, 3)
        Run("CAT", 'DE', 500, 3, 1.5),  # Works very well.
        # Run("WFC",'MS', 1000, 2, .63), # Works more of LATE! earlier, eh.
        Run("V", 'MA', 350, 2, .58),
        # Run("BLK", "C", 350, 2, 13),
        # Run("WMT","TGT", 250, 1, .33),
        # Horrible pair -- so interesting.
    ]

    # Debug parameters.
    db_lvl = 2
    plot = False

    def Initialize(self):
        """Subscribe to every leg, build one Consolidator per run, set fees."""
        self.SetStartDate(2024, 3, 15)
        self.SetCash(100000)
        self.AddEquity('SPY')
        self._universe_bullshit = [[i.child, i.parent] for i in self.runs]
        self.SafeOpt()
        self.Pairs = {}
        # self.SetWarmup(timedelta(days=5))

        # ticker -> Symbol, so a ticker shared by two runs is added only once
        _added = {}
        for run in self.runs:
            try:
                s1 = self._symbol_for(run.child, _added)
                s2 = self._symbol_for(run.parent, _added)
                # Regardless of how added, we now have symbols, and tickers -- good to go.
                self.Pairs[(run.child, run.parent)] = Consolidator(
                    s1, s2, self, run.smooth, run.z, run.ratio)
            except Exception as err:
                # One bad pair must not stop the others, but say why it failed.
                self.Debug(f'Cannot Add {run.child}/{run.parent}: {err!r}')

        for symbol, security in self.Securities.items():
            security.SetSlippageModel(ConstantSlippageModel(0))
            security.SetFeeModel(MakerTakerModel())

        # region Re-Fit Callbacks
        # if self.fit_freq == FitFrequency.DAILY:
        #     self.Schedule.On(self.DateRules.EveryDay("SPY"),
        #                      self.TimeRules.AfterMarketOpen("SPY", -10),
        #                      self.ReFit)
        # elif self.fit_freq == FitFrequency.WEEKLY:
        #     self.Schedule.On(self.DateRules.WeekStart("SPY"),
        #                      self.TimeRules.AfterMarketOpen("SPY", -10),
        #                      self.ReFit)
        # elif self.fit_freq == FitFrequency.MONTHLY:
        #     self.Schedule.On(self.DateRules.MonthStart("SPY"),
        #                      self.TimeRules.AfterMarketOpen("SPY", -10),
        #                      self.ReFit)
        # endregion

    def _symbol_for(self, ticker, cache):
        """Return the Symbol for ``ticker``, subscribing on first use."""
        if ticker not in cache:
            cache[ticker] = self.AddEquity(ticker, self.res).Symbol
        return cache[ticker]

    def ReFit(self):
        """Re-estimate the hedge ratio of every pair."""
        for inst in self.Pairs.values():
            inst.GetHedgeRatio()

    def OnData(self, data):
        """Fan the data slice out to every pair."""
        for inst in self.Pairs.values():
            inst.OnData(data)

    def SafeOpt(self):
        """Override ``bb_len`` / ``n_sig`` from algorithm parameters when set."""
        tst = self.GetParameter('bb-len')
        if tst: self.bb_len = int(tst)
        tst = self.GetParameter('n-sig')
        if tst: self.n_sig = float(tst)

    def OnOrderEvent(self, orderEvent: OrderEvent) -> None:
        """Route an order event to whichever pair(s) submitted that order."""
        # order = self.Transactions.GetOrderById(orderEvent.OrderId)
        for inst in self.Pairs.values():
            if orderEvent.OrderId in inst.OrderIds:
                inst.OnOrderUpdate(orderEvent)
        # Can do any bookkeeping, order tracking, logging here.
class MakerTakerModel(FeeModel):
    """Per-share maker/taker fee schedule.

    Market and stop-market orders pay the taker rate; every other order type
    is charged the maker rate (negative by default, i.e. a rebate).
    """

    def __init__(self, maker = -.0016, taker = .003):
        """
        Args:
            maker: Fee per share for non-market orders (negative = rebate).
            taker: Fee per share for market / stop-market orders.
        """
        self.maker = maker
        self.taker = taker

    def GetOrderFee(self, parameters: OrderFeeParameters) -> OrderFee:
        """Return the USD fee for the order in ``parameters``."""
        # Order.Quantity is signed (negative for sells). Fees depend on the
        # number of shares, not the direction, so use the magnitude;
        # otherwise a sell flips the sign of the fee.
        shares = abs(parameters.Order.Quantity)
        ord_type = parameters.Order.Type
        if ord_type in [OrderType.Market, OrderType.StopMarket]:
            fee_usd = self.taker * shares
        else:
            fee_usd = self.maker * shares
        return OrderFee(CashAmount(fee_usd, 'USD'))
#region imports
'''
Ratio taken from Research (06 - 12)
Tested out of sample from 12 +
'''
from AlgorithmImports import *
#endregion
from scipy.stats import linregress
import math
class EMAMomentumUniverse(QCAlgorithm):
    """Single-pair Bollinger-band mean-reversion strategy.

    Trades the spread ``s1 - series_hedge * s2``: long the spread below the
    lower band, short above the upper band, flat again once it crosses the
    middle band.
    """

    res = Resolution.Hour
    s1 = 'GOOG'
    s2 = 'GOOGL'
    # DISC, DISCA, NWS, NWSA, UA, UAA, HPE, HPQ ? Others?
    bb_len = 120
    n_sig = .8
    # Fixed ratio used to build the traded spread (previously an inline literal).
    series_hedge = 1.3188

    def Initialize(self):
        """Set the backtest window, subscribe both legs and build indicators."""
        # Define backtest window and portfolio cash
        self.SetStartDate(2012, 6, 10)
        self.SetEndDate(2012, 6, 20)
        # self.SetEndDate(2012 + 5, 6, 10) # Try running for 3 years after?
        # self.SetEndDate(2021, 6, 9) # Try running for 10 yrs after?
        self.SetCash(100000)

        # Add the assets and save the symbol objects (to be referred later)
        self.ewc_symbol = self.AddEquity(self.s1, self.res).Symbol
        self.ewa_symbol = self.AddEquity(self.s2, self.res).Symbol

        # Identity indicators simply repeat the latest value of each leg
        self.ewc_identity = Identity("My_EWC")
        self.ewa_identity = Identity("My_EWA")
        self.RegisterIndicator(self.ewc_symbol, self.ewc_identity, self.res)
        self.RegisterIndicator(self.ewa_symbol, self.ewa_identity, self.res)

        # Fitted lazily in OnEndOfDay. NOTE(review): the fitted value is only
        # logged; the traded spread below uses series_hedge -- confirm intent.
        self.hedge = None

        # The combined portfolio is itself an indicator so it updates as data
        # arrives. QCAlgorithm already has a Portfolio attribute, hence "series".
        self.series = IndicatorExtensions.Minus(
            self.ewc_identity,
            IndicatorExtensions.Times(self.ewa_identity, self.series_hedge))
        self.bb = BollingerBands(self.bb_len, self.n_sig, MovingAverageType.Exponential)

        self._set_targets()
        self.is_invested = None
        self.first_time = True

        for symbol, security in self.Securities.items():
            security.SetSlippageModel(ConstantSlippageModel(0))
            security.SetFeeModel(MakerTakerModel())

    def _set_targets(self):
        """Define long (buy s1 / sell s2) and short (sell s1 / buy s2) targets."""
        self.long_targets = [PortfolioTarget(self.ewc_symbol, 0.9), PortfolioTarget(self.ewa_symbol, -0.9)]
        self.short_targets = [PortfolioTarget(self.ewc_symbol, -0.9), PortfolioTarget(self.ewa_symbol, 0.9)]

    def GetHedgeRatio(self):
        """Fit ``self.hedge`` as the slope of s1 closes regressed on s2 closes."""
        p1 = self.History(self.ewa_symbol, 500, self.res).loc[self.ewa_symbol].close
        p2 = self.History(self.ewc_symbol, 500, self.res).loc[self.ewc_symbol].close
        # The two requests are independent and may not cover the same bars;
        # linregress needs equal-length inputs, so keep shared timestamps only.
        p1, p2 = p1.align(p2, join='inner')
        reg = linregress(p1, p2)
        self.hedge = reg.slope
        self._set_targets()

    def OnEndOfDay(self):
        """Try to fit the hedge ratio once a day until it succeeds."""
        if self.hedge is not None:
            return
        try:
            self.GetHedgeRatio()
        except Exception as err:
            # Retry tomorrow, but surface the reason instead of hiding it.
            self.Debug(f'Hedge ratio fit failed: {err!r}')
            return
        self.first_time = False
        self.Debug(f' ------------------------------------- HEDGE QTY SET {self.hedge} --------------------------------------------------- ')

    def OnData(self, data):
        """Update the bands with the current spread and trade band crossings."""
        # Both legs must have a bar in this slice.
        if (not data.Bars.ContainsKey(self.ewc_symbol)) or (not data.Bars.ContainsKey(self.ewa_symbol)):
            return

        serie = self.series.Current.Value
        self.bb.Update(self.Time, serie)
        # Wait until the band look-back window is filled.
        if not self.bb.IsReady:
            return

        # Limit-order entries at the last price were tried here previously
        # ("Pricing model not working?"); SetHoldings is what is traded.
        if not self.is_invested:
            if serie < self.bb.LowerBand.Current.Value:
                # below the lower band -> long the spread
                self.SetHoldings(self.long_targets)
                self.is_invested = 'long'
            elif serie > self.bb.UpperBand.Current.Value:
                # above the upper band -> short the spread
                self.SetHoldings(self.short_targets)
                self.is_invested = 'short'
        # if invested, exit when the spread crosses the mean
        elif self.is_invested == 'long':
            if serie > self.bb.MiddleBand.Current.Value:
                self.Liquidate()
                self.is_invested = None
        elif self.is_invested == 'short':
            if serie < self.bb.MiddleBand.Current.Value:
                self.Liquidate()
                self.is_invested = None
class MakerTakerModel(FeeModel):
    """Per-share maker/taker fee schedule.

    Market and stop-market orders pay the taker rate; every other order type
    is charged the maker rate (negative by default, i.e. a rebate).
    """

    def __init__(self, maker = -.0016, taker = .003):
        """
        Args:
            maker: Fee per share for non-market orders (negative = rebate).
            taker: Fee per share for market / stop-market orders.
        """
        self.maker = maker
        self.taker = taker

    def GetOrderFee(self, parameters: OrderFeeParameters) -> OrderFee:
        """Return the USD fee for the order in ``parameters``."""
        # Order.Quantity is signed (negative for sells). Fees depend on the
        # number of shares, not the direction, so use the magnitude;
        # otherwise a sell flips the sign of the fee.
        shares = abs(parameters.Order.Quantity)
        ord_type = parameters.Order.Type
        if ord_type in [OrderType.Market, OrderType.StopMarket]:
            fee_usd = self.taker * shares
        else:
            fee_usd = self.maker * shares
        return OrderFee(CashAmount(fee_usd, 'USD'))