| Overall Statistics | Value |
|---|---|
| Total Orders | 977 |
| Average Win | 2.95% |
| Average Loss | -1.65% |
| Compounding Annual Return | 48.915% |
| Drawdown | 87.500% |
| Expectancy | 0.454 |
| Start Equity | 100000 |
| End Equity | 2422983.26 |
| Net Profit | 2322.983% |
| Sharpe Ratio | 0.922 |
| Sortino Ratio | 1.263 |
| Probabilistic Sharpe Ratio | 20.484% |
| Loss Rate | 48% |
| Win Rate | 52% |
| Profit-Loss Ratio | 1.79 |
| Alpha | 0.483 |
| Beta | 1.094 |
| Annual Standard Deviation | 0.62 |
| Annual Variance | 0.385 |
| Information Ratio | 0.821 |
| Tracking Error | 0.597 |
| Treynor Ratio | 0.523 |
| Total Fees | $0.00 |
| Estimated Strategy Capacity | $2600000.00 |
| Lowest Capacity Asset | JASMYUSD 2XR |
| Portfolio Turnover | 1.63% |
# region imports
from AlgorithmImports import *
import scipy.cluster.hierarchy as sch,random,numpy as np
from scipy.cluster.hierarchy import linkage
from scipy.spatial.distance import squareform
# endregion
class StrategicCryptoReserveAlgorithm(QCAlgorithm):
    """Monthly-rebalanced Coinbase Crypto portfolio weighted by Hierarchical Risk Parity."""

    def initialize(self) -> None:
        """Set the backtest window, the monthly Crypto universe, and the rebalance schedule."""
        # Backtest the 8 * 365 days that end on 2025-03-01.
        self.set_end_date(2025, 3, 1)
        backtest_span = timedelta(8*365)
        self.set_start_date(self.end_date - backtest_span)

        # Collect every Coinbase pair that is quoted in the account currency
        # (USD, per the original author's note) and whose base asset is not
        # one of the listed stablecoins.
        stablecoins = ['DAI', 'USDT', 'USDC']
        self._market_pairs = []
        for entry in self.symbol_properties_database.get_symbol_properties_list(Market.COINBASE):
            properties = entry.value
            if properties.quote_currency != self.account_currency:
                continue
            base_currency = properties.market_ticker.split('-')[0]
            if base_currency in stablecoins:
                continue
            self._market_pairs.append(entry.key.symbol)

        # Add a daily-resolution Crypto universe that is re-selected at the
        # start of each month; time rules default to UTC.
        self.time_rules.set_default_time_zone(TimeZones.UTC)
        month_start = self.date_rules.month_start()
        self.universe_settings.schedule.on(month_start)
        self.universe_settings.resolution = Resolution.DAILY
        self._universe_size = self.get_parameter('universe_size', 10)
        coinbase_universe = CryptoUniverse.coinbase(self._select_assets)
        self._universe = self.add_universe(coinbase_universe)

        # Rebalance at midnight on the same month-start dates.
        self.schedule.on(month_start, self.time_rules.midnight, self._rebalance)

        # The HRP lookback is expressed in days: `lookback_months` * 30.
        lookback_days = self.get_parameter('lookback_months', 12)*30
        self._hrp = HeirarchicalRiskParity(self, lookback_days)

    def _select_assets(self, data):
        """Return the symbols of the `universe_size` eligible pairs with the largest USD volume.

        A pair is eligible when the first whitespace-separated token of its
        symbol id is one of the tickers collected in `initialize`. The result
        is ordered by ascending `volume_in_usd`.
        """
        eligible = [c for c in data if str(c.symbol.id).split()[0] in self._market_pairs]
        # In-place stable sort, ascending by USD volume; the tail holds the
        # most heavily traded pairs.
        eligible.sort(key=lambda c: c.volume_in_usd)
        most_traded = eligible[-self._universe_size:]
        selected = [c.symbol for c in most_traded]
        self.plot('Universe', 'Size', len(selected))
        return selected

    def _rebalance(self):
        """Target 90% of each HRP weight for the currently selected universe."""
        symbols = self._universe.selected
        if symbols:
            hrp_weights = self._hrp.weights(symbols)
            targets = [PortfolioTarget(symbol, 0.9*weight) for symbol, weight in hrp_weights.items()]
            # The second argument asks LEAN to liquidate holdings that are not
            # among the targets.
            self.set_holdings(targets, True)
class HeirarchicalRiskParity:
    """Hierarchical Risk Parity (HRP) weights computed from daily closing prices.

    The (misspelled) class name is kept unchanged because other code
    references it.

    The computation has three steps:
      1. cluster the assets with single linkage on a correlation distance,
      2. quasi-diagonalize: order the assets by walking the cluster tree,
      3. recursively bisect that ordering, splitting weight between the two
         halves in inverse proportion to their cluster variances.
    """

    def __init__(self, algorithm, lookback=365):
        """Store the data source and the history length.

        Args:
            algorithm: Object exposing `history(symbols, periods, resolution)`.
            lookback: Number of daily bars requested for each weight
                calculation.
        """
        self._algorithm = algorithm
        self._lookback = lookback

    def weights(self, symbols):
        """Return HRP weights for `symbols` as a Series indexed by asset.

        Daily returns are taken from the closing prices of the last
        `lookback` daily bars; the first row (which has no previous close)
        is discarded. See `_weights_from_returns` for the edge cases.
        """
        history = self._algorithm.history(symbols, self._lookback, Resolution.DAILY)
        daily_returns = history.close.unstack(0).pct_change()[1:]
        return self._weights_from_returns(daily_returns)

    def _weights_from_returns(self, daily_returns):
        """Compute HRP weights from a (time x asset) frame of daily returns.

        Assets whose sample variance is NaN or not strictly positive are
        left out of the result: they cannot be clustered by correlation nor
        weighted by inverse variance. With no usable asset an empty Series
        is returned; with exactly one usable asset it receives weight 1.0
        (clustering needs at least two assets).
        """
        cov = daily_returns.cov()
        usable = [
            asset
            for asset, variance in zip(cov.index, np.diag(cov))
            if np.isfinite(variance) and variance > 0
        ]
        if not usable:
            return pd.Series(dtype=float)
        if len(usable) == 1:
            return pd.Series(1.0, index=usable)

        # Step 1) Cluster assets based on daily returns.
        cov = cov.loc[usable, usable]
        corr = daily_returns[usable].corr()
        distance = self._distance(corr)
        # checks=False: `_distance` already guarantees a zero diagonal, and
        # the exact symmetry/diagonal checks would otherwise reject
        # harmless floating-point noise.
        condensed = squareform(distance.to_numpy(), checks=False)
        link = sch.linkage(condensed, 'single')

        # Step 2) Quasi-diagonalization
        sort_ix = self._quasi_diagonalization(link)
        sort_ix = corr.index[sort_ix].tolist()  # recover labels

        # Step 3) Recursive bisection
        return self._recursive_bisection(cov, sort_ix)

    def _distance(self, corr):
        """Return the correlation distance d = sqrt((1 - corr) / 2), 0 <= d <= 1.

        Correlations are clipped to [-1, 1] and the diagonal is forced to
        exactly 0, so rounding error in the correlation estimate (e.g. a
        diagonal entry of 1.0000000000000002) cannot produce NaN or a
        non-zero self-distance.
        """
        clipped = corr.clip(lower=-1.0, upper=1.0)
        values = (((1.0 - clipped) / 2.0) ** 0.5).to_numpy(copy=True)
        np.fill_diagonal(values, 0.0)
        return pd.DataFrame(values, index=corr.index, columns=corr.columns)

    def _quasi_diagonalization(self, link):
        """Return the original item indices ordered by walking the linkage tree.

        Starting from the two children of the final merge, every entry that
        refers to a merged cluster (index >= number of original items) is
        repeatedly replaced, in place, by that cluster's two children until
        only original items remain.
        """
        link = link.astype(int)
        num_items = link[-1, 3]  # number of original items
        order = pd.Series([link[-1, 0], link[-1, 1]])
        while order.max() >= num_items:
            # Spread entries over even labels so each expanded cluster can
            # place its second child on the following odd label.
            order.index = range(0, order.shape[0] * 2, 2)
            clusters = order[order >= num_items]
            positions = clusters.index
            rows = clusters.values - num_items  # linkage row of each cluster
            order.loc[positions] = link[rows, 0]  # first child
            second = pd.Series(link[rows, 1], index=positions + 1)  # second child
            order = pd.concat([order, second]).sort_index()
            order.index = range(order.shape[0])  # re-index
        return order.tolist()

    def _recursive_bisection(self, cov, sort_ix):
        """Allocate weights by repeatedly halving the ordered asset list.

        Every asset starts at weight 1. Each cluster with more than one
        asset is cut in half (the first half gets the smaller share of an
        odd count); the halves' weights are scaled by `alpha` and
        `1 - alpha`, where `alpha = 1 - var_first / (var_first + var_second)`.
        """
        w = pd.Series(1.0, index=sort_ix)
        clusters = [sort_ix]  # initialize all items in one cluster
        while clusters:
            # Drop single-asset clusters; split every other cluster in two.
            clusters = [
                half
                for cluster in clusters
                if len(cluster) > 1
                for half in (cluster[:len(cluster) // 2], cluster[len(cluster) // 2:])
            ]
            # Halves were appended pairwise, so siblings sit next to each other.
            for first, second in zip(clusters[::2], clusters[1::2]):
                var_first = self._cluster_variance(cov, first)
                var_second = self._cluster_variance(cov, second)
                alpha = 1 - var_first / (var_first + var_second)
                w.loc[first] *= alpha
                w.loc[second] *= 1 - alpha
        return w

    def _cluster_variance(self, cov, cluster_items):
        """Return the variance of the inverse-variance portfolio of `cluster_items`."""
        cluster_cov = cov.loc[cluster_items, cluster_items]  # matrix slice
        weights = self._inverse_variance_weights(cluster_cov).reshape(-1, 1)
        return np.dot(np.dot(weights.T, cluster_cov), weights)[0, 0]

    def _inverse_variance_weights(self, cov, **kargs):
        """Return weights proportional to 1 / variance, normalized to sum to 1.

        Extra keyword arguments are accepted and ignored.
        """
        inverse_var = 1 / np.diag(cov)
        return inverse_var / inverse_var.sum()