Overall Statistics
Total Orders
940
Average Win
2.84%
Average Loss
-1.68%
Compounding Annual Return
50.130%
Drawdown
87.100%
Expectancy
0.435
Start Equity
100000
End Equity
2389392.48
Net Profit
2289.392%
Sharpe Ratio
0.938
Sortino Ratio
1.282
Probabilistic Sharpe Ratio
21.645%
Loss Rate
47%
Win Rate
53%
Profit-Loss Ratio
1.69
Alpha
0.492
Beta
1.101
Annual Standard Deviation
0.623
Annual Variance
0.388
Information Ratio
0.835
Tracking Error
0.599
Treynor Ratio
0.531
Total Fees
$0.00
Estimated Strategy Capacity
$510000.00
Lowest Capacity Asset
SEIUSD 2XR
Portfolio Turnover
1.59%
# region imports

from AlgorithmImports import *

import scipy.cluster.hierarchy as sch, random, numpy as np

from scipy.spatial.distance import squareform

# endregion


class StrategicCryptoReserveAlgorithm(QCAlgorithm):
    """Monthly-rebalanced HRP portfolio over the top-volume Coinbase crypto pairs.

    Universe: the 10 highest-USD-volume Coinbase pairs quoted in the account
    currency (stablecoins excluded), refreshed at each month start. Weights come
    from a Hierarchical Risk Parity model tilted by trailing Sortino ratios and
    a momentum screen; 90% of equity is deployed, 10% kept as a cash buffer.
    """

    def initialize(self) -> None:
        # Eight-year backtest window ending 2025-03-01.
        self.set_end_date(2025, 3, 1)
        self.set_start_date(self.end_date - timedelta(8*365))
        # Account currency defaults to USD.

        # Coinbase pairs quoted in the account currency, excluding stablecoins.
        stablecoins = {'DAI', 'USDT', 'USDC'}
        pair_properties = self.symbol_properties_database.get_symbol_properties_list(Market.COINBASE)
        self._market_pairs = [
            entry.key.symbol
            for entry in pair_properties
            if entry.value.quote_currency == self.account_currency
            and entry.value.market_ticker.split('-')[0] not in stablecoins
        ]

        # Monthly universe refresh and rebalance, both on UTC month starts.
        self.time_rules.set_default_time_zone(TimeZones.UTC)
        month_start = self.date_rules.month_start()
        self.universe_settings.schedule.on(month_start)
        self.universe_settings.resolution = Resolution.DAILY
        self._universe = self.add_universe(CryptoUniverse.coinbase(self._select_assets))

        self.schedule.on(month_start, self.time_rules.midnight, self._rebalance)

        # HRP model; the 'lookback_months' parameter is converted to ~30-day months.
        self._hrp = HierarchicalRiskParity(self, self.get_parameter('lookback_months', 12) * 30)

    def _select_assets(self, data):
        """Keep the 10 highest-USD-volume pairs from the filtered market list."""
        candidates = [c for c in data if str(c.symbol.id).split()[0] in self._market_pairs]
        candidates.sort(key=lambda c: c.volume_in_usd)
        chosen = [c.symbol for c in candidates[-10:]]
        self.plot('Universe', 'Size', len(chosen))
        return chosen

    def _rebalance(self):
        """Recompute HRP weights for the current universe and set targets."""
        symbols = self._universe.selected
        if not symbols:
            return

        sortino_scores = self._hrp.calculate_sortino(symbols)
        momentum_assets = self._hrp.identify_momentum(symbols)
        weights = self._hrp.weights(symbols, sortino_scores, momentum_assets)

        # Chart each asset's target weight.
        for symbol, weight in weights.items():
            self.plot("HRP Weights", str(symbol), weight)

        # 0.9x scaling keeps a 10% cash buffer; True liquidates dropped assets.
        targets = [PortfolioTarget(symbol, 0.9 * weight) for symbol, weight in weights.items()]
        self.set_holdings(targets, True)

    def on_order_event(self, order_event):
        """Log every filled order with its fill price."""
        if order_event.status == OrderStatus.FILLED:
            self.log(f"Filled {order_event.symbol} at {order_event.fill_price}")


class HierarchicalRiskParity:
    """Hierarchical Risk Parity allocator with Sortino and momentum tilts.

    Implements Lopez de Prado's HRP pipeline: correlation distance, single-linkage
    clustering, quasi-diagonalization, and recursive bisection. The resulting base
    weights are scaled by trailing Sortino scores and a momentum bonus, then
    renormalized to sum to 1.
    """

    def __init__(self, algorithm, lookback=365):
        # Algorithm handle for history requests; lookback is in trading days.
        self._algorithm = algorithm
        self._lookback = lookback

    def weights(self, symbols, sortino_scores, momentum_assets):
        """Return normalized HRP weights tilted by Sortino scores and momentum."""
        history = self._algorithm.history(symbols, self._lookback, Resolution.DAILY)
        daily_returns = history.close.unstack(0).pct_change()[1:]
        covariance = daily_returns.cov()
        correlation = daily_returns.corr()

        dist = self._distance(correlation)
        linkage_matrix = sch.linkage(squareform(dist), 'single')

        leaf_order = self._quasi_diagonalization(linkage_matrix)
        ordered_assets = correlation.index[leaf_order].tolist()

        hrp_weights = self._recursive_bisection(covariance, ordered_assets)

        # Tilt: scale each weight by (1 + sortino) and boost momentum names 10%.
        for asset in ordered_assets:
            if asset in sortino_scores:
                hrp_weights[asset] *= 1 + sortino_scores[asset]
            if asset in momentum_assets:
                hrp_weights[asset] *= 1.1

        return hrp_weights / hrp_weights.sum()

    def _distance(self, corr):
        """Map correlations to the HRP distance metric d = sqrt((1 - rho) / 2)."""
        half_spread = (1 - corr) / 2.0
        return half_spread ** 0.5

    def _quasi_diagonalization(self, link):
        """Order leaf indices so highly correlated assets end up adjacent."""
        link = link.astype(int)
        # Seed with the two children of the final (root) merge.
        order = pd.Series([link[-1, 0], link[-1, 1]])
        leaf_count = link[-1, 3]

        # Ids >= leaf_count are clusters; expand each into its two children
        # until only leaf (asset) indices remain.
        while order.max() >= leaf_count:
            order.index = range(0, 2 * order.shape[0], 2)  # leave gaps for children
            clusters = order[order >= leaf_count]
            positions = clusters.index
            children = clusters.values - leaf_count
            order[positions] = link[children, 0]  # left child replaces the cluster
            right_children = pd.Series(link[children, 1], index=positions + 1)
            order = pd.concat([order, right_children]).sort_index()
            order.index = range(order.shape[0])  # renumber 0..k-1

        return order.tolist()

    def _recursive_bisection(self, cov, sort_ix):
        """Allocate by repeatedly halving the ordered list, splitting capital
        inversely to each half's cluster variance."""
        weights = pd.Series(1.0, index=sort_ix)
        clusters = [sort_ix]

        while clusters:
            halves = []
            for cluster in clusters:
                if len(cluster) > 1:
                    mid = len(cluster) // 2
                    halves += [cluster[:mid], cluster[mid:]]
            clusters = halves

            # Halves arrive in sibling pairs: (left, right) of each bisection.
            for k in range(0, len(clusters), 2):
                left, right = clusters[k], clusters[k + 1]
                var_left = self._cluster_variance(cov, left)
                var_right = self._cluster_variance(cov, right)
                split = 1 - var_left / (var_left + var_right)
                weights[left] *= split
                weights[right] *= 1 - split

        return weights

    def calculate_sortino(self, symbols):
        """Per-symbol Sortino proxy: mean daily return over downside deviation.

        Returns 0 for a symbol with no usable downside observations (the NaN
        deviation fails the > 0 check).
        """
        history = self._algorithm.history(symbols, self._lookback, Resolution.DAILY)
        daily_returns = history.close.unstack(0).pct_change()[1:]

        scores = {}
        for symbol in symbols:
            series = daily_returns[symbol]
            downside = np.std(series[series < 0])
            scores[symbol] = series.mean() / downside if downside > 0 else 0
        return scores

    def identify_momentum(self, symbols):
        """Return the symbols whose latest 30-day mean daily return exceeds 2%."""
        history = self._algorithm.history(symbols, 90, Resolution.DAILY)
        daily_returns = history.close.unstack(0).pct_change()[1:]

        winners = []
        for symbol in symbols:
            latest_avg = daily_returns[symbol].rolling(30).mean().iloc[-1]
            if latest_avg > 0.02:
                winners.append(symbol)
        return winners

    def _cluster_variance(self, cov, cluster_items):
        """Variance of a cluster under its inverse-variance weighting."""
        cluster_cov = cov.loc[cluster_items, cluster_items]
        w = self._inverse_variance_weights(cluster_cov).reshape(-1, 1)
        return np.dot(np.dot(w.T, cluster_cov), w)[0, 0]

    def _inverse_variance_weights(self, cov):
        """Inverse-variance portfolio weights, normalized to sum to 1."""
        inverse_var = 1 / np.diag(cov)
        return inverse_var / inverse_var.sum()