| Overall Statistics |
|
Total Orders 145 Average Win 0.63% Average Loss -0.62% Compounding Annual Return -10.147% Drawdown 14.600% Expectancy -0.124 Start Equity 1000000 End Equity 898002.18 Net Profit -10.200% Sharpe Ratio -1.434 Sortino Ratio -1.299 Probabilistic Sharpe Ratio 1.002% Loss Rate 57% Win Rate 43% Profit-Loss Ratio 1.02 Alpha 0 Beta 0 Annual Standard Deviation 0.086 Annual Variance 0.007 Information Ratio -0.815 Tracking Error 0.086 Treynor Ratio 0 Total Fees $2316.07 Estimated Strategy Capacity $1500000.00 Lowest Capacity Asset PX R735QTJ8XC9X Portfolio Turnover 29.22% |
# region imports
from AlgorithmImports import *
from universe import SectorETFUniverseSelectionModel
from portfolio import CointegratedVectorPortfolioConstructionModel
# endregion
class ETFPairsTrading(QCAlgorithm):
    """ETF-constituent pairs-trading algorithm.

    Combines the framework's Pearson-correlation pair-selection alpha with a
    custom cointegrating-vector portfolio construction model that sizes the
    two legs of each signaled pair.
    """

    def initialize(self):
        self.set_start_date(2023, 3, 1)  # Set Start Date
        self.set_end_date(2024, 3, 1)
        self.set_cash(1000000)  # Set Strategy Cash

        lookback = self.get_parameter("lookback", 60)  # lookback window (days) on correlation & cointegration
        threshold = self.get_parameter("threshold", 2)  # we want at least 2+% expected profit margin to cover fees

        self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN)
        # This is intended as an intra-day strategy, so apply the pattern-day-trading margin model.
        self.set_security_initializer(lambda security: security.set_margin_model(PatternDayTradingMarginModel()))

        # Universe settings must be configured before the universe selection model is attached.
        self.universe_settings.resolution = Resolution.MINUTE
        self.universe_settings.data_normalization_mode = DataNormalizationMode.RAW
        self.set_universe_selection(SectorETFUniverseSelectionModel(self.universe_settings))

        # This alpha model helps to pick the most correlated pair
        # and emits a signal when they show mispricing expected to stay active for a predicted period.
        # https://www.quantconnect.com/docs/v2/writing-algorithms/algorithm-framework/alpha/supported-models#09-Pearson-Correlation-Pairs-Trading-Model
        self.add_alpha(PearsonCorrelationPairsTradingAlphaModel(lookback, Resolution.DAILY, threshold=threshold))

        # We use the cointegrating vector to decide the relative movement magnitude of the paired assets.
        self.pcm = CointegratedVectorPortfolioConstructionModel(self, lookback, Resolution.DAILY)
        self.pcm.rebalance_portfolio_on_security_changes = False
        self.set_portfolio_construction(self.pcm)

        # Avoid catastrophic loss at portfolio level from the pair-trading strategy.
        #self.add_risk_management(MaximumDrawdownPercentPortfolio(0.08))

        self.set_warm_up(timedelta(90))

    def on_data(self, slice):
        # Splits/dividends invalidate the warmed-up return indicators, so re-warm them.
        if slice.splits or slice.dividends:
            self.pcm.handle_corporate_actions(self, slice)
#region imports
from AlgorithmImports import *
from Portfolio.EqualWeightingPortfolioConstructionModel import EqualWeightingPortfolioConstructionModel
from arch.unitroot.cointegration import engle_granger
from utils import reset_and_warm_up
#endregion
class CointegratedVectorPortfolioConstructionModel(EqualWeightingPortfolioConstructionModel):
    """Sizes pair-trade legs by their Engle-Granger cointegrating vector.

    For the active insights it regresses the assets' daily log returns; when
    the cointegration is significant (p <= 0.05) each insight's target weight
    is the absolute value of its cointegrating-vector component, normalized so
    absolute weights sum to 1, signed by the insight's direction. Otherwise
    zero-quantity targets are emitted.
    """

    def __init__(self, algorithm, lookback = 252, resolution = Resolution.MINUTE,
        rebalance = Expiry.END_OF_WEEK) -> None:
        super().__init__(rebalance, PortfolioBias.LONG_SHORT)
        self.algorithm = algorithm
        # Number of daily log-return points kept per security for the regression.
        self.lookback = lookback
        # Resolution of the history requests used to warm up the indicators.
        self.resolution = resolution

    def should_create_target_for_insight(self, insight: Insight) -> bool:
        # Ignore the insight if the asset already has an open position in the same direction.
        return self.should_create_new_target(insight.symbol, insight.direction)

    def determine_target_percent(self, active_insights: List[Insight]) -> Dict[Insight, float]:
        """Return the target weight for each active insight.

        All-zero targets are returned when fewer than two usable return series
        exist or the Engle-Granger test is not significant.
        """
        # With fewer than 2 active insights no valid pair trade can result.
        if len(active_insights) < 2:
            self.live_log(self.algorithm, 'PortfolioConstructionModel: Less than 2 insights. Create zero-quantity targets')
            return {insight: 0 for insight in active_insights}

        # Daily log returns for the cointegrating-vector regression, columns
        # built in insight order. (BUGFIX: the previous version iterated the
        # securities dict, so column order could disagree with the insight
        # order that the regression output was later zipped against.)
        logr = pd.DataFrame({insight.symbol: self.returns(self.algorithm.securities[insight.symbol])
                             for insight in active_insights})
        # Fill nans with the column mean; if a whole column is nan, drop it.
        logr = logr.fillna(logr.mean()).dropna(axis=1)
        # Make sure at least 2 return series survived.
        if logr.shape[1] < 2:
            self.live_log(self.algorithm, 'PortfolioConstructionModel: Less than 2 usable return series. Create zero-quantity targets.')
            return {insight: 0 for insight in active_insights}

        # Obtain the cointegrating vector of all signaled assets for statistical arbitrage.
        model = engle_granger(logr.iloc[:, 0], logr.iloc[:, 1:], trend='n', lags=0)
        # If the relationship is not significant, stay flat.
        if model.pvalue > 0.05:
            return {insight: 0 for insight in active_insights}

        # Normalization for the budget constraint: absolute weights sum to 1.
        coint_vector = model.cointegrating_vector
        total_weight = sum(abs(coint_vector))

        # engle_granger returns one component per column (dependent series
        # first), so pair components with insights via the column symbols.
        # Insights whose column was dropped above keep a zero target.
        insight_by_symbol = {insight.symbol: insight for insight in active_insights}
        result = {insight: 0 for insight in active_insights}
        for symbol, weight in zip(logr.columns, coint_vector):
            insight = insight_by_symbol[symbol]
            # Paired assets' components carry opposite signs; the insight
            # direction supplies the sign of the trade.
            result[insight] = abs(weight) / total_weight * insight.direction
        return result

    def on_securities_changed(self, algorithm, changes):
        # Keep per-security indicator state in sync with universe changes.
        self.live_log(algorithm, f'PortfolioConstructionModel.on_securities_changed: Changes: {changes}')
        super().on_securities_changed(algorithm, changes)
        for added in changes.added_securities:
            self.init_security_data(algorithm, added)
        for removed in changes.removed_securities:
            self.dispose_security_data(algorithm, removed)

    def handle_corporate_actions(self, algorithm, slice):
        # Splits/dividends invalidate the warmed-up return history:
        # re-warm each affected symbol's indicator from adjusted history.
        symbols = set(slice.dividends.keys())
        symbols.update(slice.splits.keys())
        for symbol in symbols:
            self.warm_up_indicator(algorithm.securities[symbol])

    def live_log(self, algorithm, message):
        # Log only in live mode to keep backtest logs quiet.
        if algorithm.live_mode:
            algorithm.log(message)

    def init_security_data(self, algorithm, security):
        # Rolling window storing the historical daily log returns (newest first).
        security['window'] = RollingWindow[IndicatorDataPoint](self.lookback)
        # Daily log return used to estimate the cointegrating vector.
        security['logr'] = LogReturn(1)
        security['logr'].updated += lambda _, updated: security['window'].add(IndicatorDataPoint(updated.end_time, updated.value))
        security['consolidator'] = TradeBarConsolidator(timedelta(1))
        # Subscribe the consolidator and indicator to data for automatic updates.
        algorithm.register_indicator(security.symbol, security['logr'], security['consolidator'])
        algorithm.subscription_manager.add_consolidator(security.symbol, security['consolidator'])
        self.warm_up_indicator(security)

    def warm_up_indicator(self, security):
        # Reset and re-feed the indicator from history; the helper returns a
        # replacement consolidator because the old one cannot be reset.
        self.reset(security)
        security['consolidator'] = reset_and_warm_up(self.algorithm, security, self.resolution, self.lookback)

    def reset(self, security):
        # Clear both the indicator and its backing window.
        security['logr'].reset()
        security['window'].reset()

    def dispose_security_data(self, algorithm, security):
        # Drop indicator state and unhook the consolidator for removed securities.
        self.reset(security)
        algorithm.subscription_manager.remove_consolidator(security.symbol, security['consolidator'])

    def should_create_new_target(self, symbol, direction):
        # Trade only when flat or when the signal flips against the open position.
        quantity = self.algorithm.portfolio[symbol].quantity
        return quantity == 0 or direction != int(np.sign(quantity))

    def returns(self, security):
        # The rolling window iterates newest-first; reverse to chronological order.
        return pd.Series(
            data = [x.value for x in security['window']],
            index = [x.end_time for x in security['window']])[::-1]
#region imports
from AlgorithmImports import *
#endregion
class SectorETFUniverseSelectionModel(ETFConstituentsUniverseSelectionModel):
    """Universe of the largest-weight constituents of a single sector ETF.

    Constituents of one sector ETF tend to be correlated, which suits the
    pairs-trading alpha downstream.
    NOTE(review): IYM is iShares U.S. Basic Materials, not a tech ETF as the
    original comment claimed -- confirm the intended sector.
    """

    def __init__(self, universe_settings: UniverseSettings = None) -> None:
        etf_symbol = Symbol.create("IYM", SecurityType.EQUITY, Market.USA)
        super().__init__(etf_symbol, universe_settings, self.etf_constituents_filter)

    def etf_constituents_filter(self, constituents: List[ETFConstituentData]) -> List[Symbol]:
        # Keep only constituents with a reported weight, rank by weight
        # (largest first), and take the top 10 to reduce slippage and keep
        # the algorithm fast.
        weighted = (c for c in constituents if c.weight)
        top_ten = sorted(weighted, key=lambda c: c.weight, reverse=True)[:10]
        return [c.symbol for c in top_ten]
#region imports
from AlgorithmImports import *
#endregion
def reset_and_warm_up(algorithm, security, resolution, lookback = None):
    """Reset a security's log-return indicator and re-warm it from history.

    Returns the replacement consolidator: the existing one cannot be reset,
    so it is removed and a fresh daily TradeBarConsolidator is registered in
    its place.
    """
    log_return = security['logr']
    old_consolidator = security['consolidator']
    if not lookback:
        # Default to exactly enough bars to warm the indicator.
        lookback = log_return.warm_up_period

    # Request history before swapping consolidators, since the request is
    # typed by the old consolidator's input type.
    bars = list(algorithm.history[old_consolidator.input_type](
        security.symbol, lookback, resolution,
        data_normalization_mode = DataNormalizationMode.SCALED_RAW))

    log_return.reset()

    # Replace the consolidator, since we cannot reset it.
    # (Not ideal: we don't know the original consolidator's type and period here.)
    algorithm.subscription_manager.remove_consolidator(security.symbol, old_consolidator)
    fresh_consolidator = TradeBarConsolidator(timedelta(1))
    algorithm.register_indicator(security.symbol, log_return, fresh_consolidator)

    # Feed all but the most recent bar through the new consolidator
    # (presumably skipping the last bar avoids double-counting the bar that
    # live data will deliver -- confirm against the caller's timing).
    for bar in bars[:-1]:
        fresh_consolidator.update(bar)
    return fresh_consolidator
'''
# In main.py, OnData and call HandleCorporateActions for framework models (if necessary)
def on_data(self, slice):
if slice.splits or slice.dividends:
self.alpha.handle_corporate_actions(self, slice)
self.pcm.handle_corporate_actions(self, slice)
self.risk.handle_corporate_actions(self, slice)
self.execution.handle_corporate_actions(self, slice)
# In the framework models, add
from utils import reset_and_warm_up
and implement HandleCorporateActions. E.g.:
def handle_corporate_actions(self, algorithm, slice):
for symbol, data in self.symbol_data.items():
    if slice.splits.contains_key(symbol) or slice.dividends.contains_key(symbol):
data.warm_up_indicator()
where WarmUpIndicator will call ResetAndWarmUp for each indicator/consolidator pair
'''