| Overall Statistics |
|
Total Orders 478 Average Win 9.18% Average Loss -4.86% Compounding Annual Return 80.850% Drawdown 84.300% Expectancy 0.471 Start Equity 10000000.00 End Equity 446838017.52 Net Profit 4368.380% Sharpe Ratio 1.228 Sortino Ratio 1.877 Probabilistic Sharpe Ratio 38.760% Loss Rate 49% Win Rate 51% Profit-Loss Ratio 1.89 Alpha 0.699 Beta 1.5 Annual Standard Deviation 0.686 Annual Variance 0.47 Information Ratio 1.163 Tracking Error 0.642 Treynor Ratio 0.561 Total Fees $4305381.02 Estimated Strategy Capacity $18000000.00 Lowest Capacity Asset NKD YT87FWY1TBLT Portfolio Turnover 24.26% |
# region imports
from AlgorithmImports import *
from itertools import combinations
import statsmodels.api as sm
from scipy.stats import norm
# endregion
class EquityIndexFuturesBasePairsAlgorithm(QCAlgorithm):
def initialize(self):
self.set_start_date(2019, 1, 1)
self.set_end_date(2025, 5, 28)
self.set_cash(10_000_000)
self.settings.automatic_indicator_warm_up = True
self._month = 0
# Define some parameters.
self._selectivity_level = self.get_parameter('selectivity_level', 0.5) # 0.05 = 5%
self._minimum_observations = 24 # months
self._maximum_observations = 10*12 # months
# Add some Equity Index Futures.
self._futures = []
tickers = [
Futures.Indices.EURO_STOXX_50,
Futures.Indices.NIKKEI_225_DOLLAR,
Futures.Indices.NASDAQ_100_E_MINI,
Futures.Indices.RUSSELL_2000_E_MINI,
Futures.Indices.SP_500_E_MINI
]
for ticker in tickers:
# Add the Future contracts and define the continuous contract settings.
future = self.add_future(
ticker,
data_mapping_mode=DataMappingMode.FIRST_DAY_MONTH,
data_normalization_mode=DataNormalizationMode.BACKWARDS_PANAMA_CANAL,
contract_depth_offset=0
)
# Select the 2 front contracts.
future.set_filter(lambda universe: universe.contracts(lambda symbols: sorted(symbols, key=lambda s: s.id.date)[:2]))
# Create a Series to hold the monthly returns.
future.returns = pd.Series()
roc = RateOfChange(1)
self.consolidate(future.symbol, Calendar.MONTHLY, lambda bar, roc=roc: roc.update(bar.end_time, bar.close))
roc.updated += lambda indicator, indicator_data_point, future=future: self._update_returns_history(indicator, indicator_data_point, future)
# Create a DataFrame to hold the factor history.
future.factors = pd.DataFrame()
# Define the momentum factor: Average trailing 12-month return (using monthly returns)
momentum = IndicatorExtensions.sma(roc, 12)
momentum.updated += lambda indicator, indicator_data_point, future=future: self._update_factor_history(indicator, indicator_data_point, future)
# Create an indicator we'll need to calculate the value factor (5-year mean price).
future.mean_price = self.sma(future.symbol, 5*252, Resolution.DAILY)
# Save a reference to the Future for later.
self._futures.append(future)
# Warm up the factor and label history.
self.set_warm_up(timedelta(400 + 24*31))
def _update_returns_history(self, indicator, indicator_data_point, future):
if indicator.is_ready:
future.returns.loc[indicator_data_point.end_time] = indicator_data_point.value
def _update_factor_history(self, indicator, indicator_data_point, future):
t = indicator_data_point.end_time
# Momentum
if indicator.is_ready:
future.factors.loc[t, 'momentum'] = indicator_data_point.value
# Value
if future.mean_price.is_ready:
future.factors.loc[t, 'value'] = 1 - future.price / future.mean_price.current.value
# Carry
if future.symbol not in self.current_slice.future_chains:
return
chain = self.current_slice.future_chains[future.symbol]
if len(list(chain.contracts)) < 2:
return
front_contract, next_contract = sorted(chain, key=lambda c: c.expiry)
carry = front_contract.last_price - next_contract.last_price
months_between_contracts = round((next_contract.expiry - front_contract.expiry).days / 30)
expiry_difference_in_years = abs(months_between_contracts) / 12
annualized_carry = carry / expiry_difference_in_years
future.factors.loc[t, 'carry'] = annualized_carry
def on_data(self, data: Slice):
# Rebalance at the start of each month, when all of the markets are open.
if (self.is_warming_up or
self._month == self.time.month or
not all([self.is_market_open(future.symbol) and future.symbol in data.future_chains for future in self._futures])):
return
self._month = self.time.month
# Calculate the weight of each Future.
weight_by_symbol = self._create_weights()
# Re-scale weights to keep unit gross exposure.
abs_weight_sum = sum([abs(x) for x in weight_by_symbol.values()])
if not abs_weight_sum:
return
weight_by_symbol = {s: w/abs_weight_sum for s, w in weight_by_symbol.items()}
# Rebalance the portfolio.
self.set_holdings([PortfolioTarget(symbol, weight/2) for symbol, weight in weight_by_symbol.items()], True)
def _create_weights(self):
# Calculate the conditional expected return of the base pairs for each factor.
thetas_by_factor = {}
for future_i, future_j in combinations(self._futures, 2):
# Analyze each factor independently.
for factor in set(future_i.factors.columns) & set(future_j.factors.columns):
# Fit the base pairs models.
x_i = future_i.factors[factor].dropna()
x_j = future_j.factors[factor].dropna()
if len(x_i) < self._minimum_observations or len(x_j) < self._minimum_observations:
continue
r_i = future_i.returns.shift(-1)[:-1] # `shift` so that it represents forward return
r_j = future_j.returns.shift(-1)[:-1]
idx = sorted(list(set(r_i.index) & set(r_j.index) & set(x_i.index) & set(x_j.index)))[-self._maximum_observations:]
X = sm.add_constant(pd.concat([x_i[idx], x_j[idx]], axis=1))
mu_i, b_ii, b_ji = sm.OLS(r_i[idx], X).fit().params
mu_j, b_ij, b_jj = sm.OLS(r_j[idx], X).fit().params
# Decompose theta into the three components.
if x_i[-1] == x_j[-1]:
continue
m_i, m_j = x_i.mean(), x_j.mean()
s_i, s_j = x_i.std(), x_j.std()
p_ij = x_i.corr(x_j)
s_ij = (s_i**2 + s_j**2 - (2*s_i*s_j*p_ij))**(1/2)
y_ij = -((m_i - m_j)/s_ij)
cdf = norm.cdf(y_ij)
pdf = norm.pdf(y_ij)
oa_factor = ((b_ii*s_i**2 + b_jj*s_j**2 - (b_ii+b_jj)*s_i*s_j*p_ij) / s_ij)
ca_factor = ((b_ij*s_i**2 + b_ji*s_j**2 - (b_ij+b_ji)*s_i*s_j*p_ij) / s_ij)
if x_i[-1] > x_j[-1]:
ue = mu_i - mu_j # Unexplained effect of asset means
oa = b_ii*m_i - b_jj*m_j + (pdf / (1-cdf)) * oa_factor # Own-asset effect
ca = -(b_ij*m_i - b_ji*m_j + (pdf / (1-cdf)) * ca_factor) # Cross-asset effect
pair = (future_i, future_j)
else:
ue = mu_j - mu_i # Unexplained effect of asset means
oa = b_jj*m_j - b_ii*m_i + (pdf / cdf) * oa_factor # Own-asset effect
ca = -(b_ji*m_j - b_ij*m_i + (pdf / cdf) * ca_factor) # Cross-asset effect
pair = (future_j, future_i)
if factor not in thetas_by_factor:
thetas_by_factor[factor] = {}
thetas_by_factor[factor][pair] = ue + oa + ca
# Create a dictionary to track the weights.
weight_by_symbol = {future.mapped: 0 for future in self._futures}
for theta_by_pair in thetas_by_factor.values():
# Select the top s% of the base pairs.
selected_pairs = [kvp[0] for kvp in sorted(theta_by_pair.items(), key=lambda kvp: kvp[1], reverse=True)[:int(self._selectivity_level*len(theta_by_pair))]]
# Calculate portfolio weights.
n = 2*len(selected_pairs)
denominator = n**2 - int(n % 2 != 0) # `n % 2 != 0` checks if n is odd
if not denominator:
continue
a = (2*(n-1))/denominator
h = 4/denominator
for i, pair in enumerate(selected_pairs):
weight = (a - i*h) / len(thetas_by_factor) # Divide to give equal weight to each factor.
weight_by_symbol[pair[0].mapped] += weight # Long the Future with the larger signal.
weight_by_symbol[pair[1].mapped] -= weight # Short the Future with the smaller signal.
return weight_by_symbol