Overall Statistics

Total Orders: 421
Average Win: 0.17%
Average Loss: -0.19%
Compounding Annual Return: 1.867%
Drawdown: 21.500%
Expectancy: 0.237
Start Equity: $100,000.00
End Equity: $115,963.79
Net Profit: 15.964%
Sharpe Ratio: -0.223
Sortino Ratio: -0.294
Probabilistic Sharpe Ratio: 0.636%
Loss Rate: 35%
Win Rate: 65%
Profit-Loss Ratio: 0.90
Alpha: -0.018
Beta: 0.081
Annual Standard Deviation: 0.053
Annual Variance: 0.003
Information Ratio: -0.622
Tracking Error: 0.15
Treynor Ratio: -0.146
Total Fees: $0.00
Estimated Strategy Capacity: $0
Lowest Capacity Asset: XCUUSD 8I
Portfolio Turnover: 0.38%
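
A quick consistency check on the reported expectancy, as a sketch assuming the common definition expectancy = win rate * profit-loss ratio - loss rate (the statistics engine's exact formula and rounding may differ):

# Hypothetical sanity check of the reported figures; not part of the strategy code.
win_rate, loss_rate = 0.65, 0.35
profit_loss_ratio = 0.90  # average win / |average loss|
print(round(win_rate * profit_loss_ratio - loss_rate, 3))  # 0.235, in line with the reported 0.237
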
# region imports
from AlgorithmImports import *

import numpy as np
import pandas as pd
import scipy.cluster.hierarchy as sch
from scipy.spatial.distance import squareform
# endregion

class StrategicCryptoReserveAlgorithm(QCAlgorithm):

    def initialize(self) -> None:
        self.set_end_date(2025, 3, 1)
        self.set_start_date(self.end_date - timedelta(8*365))
        self._hrp = HierarchicalRiskParity()
        self._lookback = timedelta(365)
        # Add Crypto universe.
        self._market = Market.COINBASE
        self._market_pairs = [
            x.key.symbol 
            for x in self.symbol_properties_database.get_symbol_properties_list(self._market) 
            if (x.value.quote_currency == self.account_currency and   # Account currency is USD
                x.value.market_ticker.split('-')[0] not in ['USDT', 'USDC'])  # Remove stable coins
        ]
        self.time_rules.set_default_time_zone(TimeZones.UTC)
        date_rule = self.date_rules.month_start()
        self.universe_settings.schedule.on(date_rule)
        self.universe_settings.resolution = Resolution.DAILY
        self._universe = self.add_universe(CryptoUniverse.coinbase(self._select_assets))

        # Add Forex pairs. 
        # EUR, GBP, and JPY are currencies in the Special Drawing Rights (SDR) basket - part of the financial reserves managed by the Treasury, used for monetary stability.
        # Source: https://home.treasury.gov/data/us-international-reserve-position/01312025
        self._symbols = [self.add_forex(t, Resolution.DAILY).symbol for t in ['EURUSD', 'GBPUSD', 'USDJPY']]

        # Add CFDs. 
        # Gold is a financial reserve managed by the Treasury, used for monetary stability.
        # Copper, Platinum, and Palladium are National Defense Stockpile (NDS) materials, used for defense/tech.
        # Oil is a stockpile used for energy.
        self._symbols.extend([self.add_cfd(t, Resolution.DAILY).symbol for t in ['XAUUSD', 'XCUUSD', 'XPTUSD', 'XPDUSD', 'WTICOUSD']])

        # Schedule rebalances.
        self.schedule.on(self.date_rules.month_start(), self.time_rules.midnight, self._rebalance)

    def _select_assets(self, data):
        # Keep USD-quoted, non-stablecoin pairs, then select the 10 with the greatest USD volume.
        selected = [c for c in data if str(c.symbol.id).split()[0] in self._market_pairs]
        selected = [c.symbol for c in sorted(selected, key=lambda c: c.volume_in_usd)[-10:]]
        self.plot('Universe', 'Size', len(selected))
        return selected

    def _rebalance(self):
        if not self._universe.selected:
            return
        # Get daily returns of the securities.
        crypto_history = self.history(self._universe.selected, self._lookback, Resolution.DAILY).close.unstack(0)
        pct_nans = crypto_history.isna().sum() / crypto_history.shape[0]
        crypto_history = crypto_history[pct_nans[pct_nans < 0.5].index]  # Only keep cryptos with at least 50% non-NaN values.
        
        cfd_forex_history = self.history(self._symbols, self._lookback, Resolution.DAILY).close.unstack(0)
        # Shift the CFD/Forex timestamps forward one day so their daily bars line up with the crypto bars' end-of-day index.
        cfd_forex_history.index = pd.DatetimeIndex([t.date() + timedelta(1) for t in cfd_forex_history.index])
        # Invert USDJPY so the series represents the USD value of JPY, matching the other USD-quoted series.
        cfd_forex_history['USDJPY'] = 1/cfd_forex_history['USDJPY']

        history = pd.concat([crypto_history, cfd_forex_history], axis=1).dropna()
        daily_returns = history.pct_change()[1:]

        # Calculate weights.
        weight_by_symbol = self._hrp.weights(daily_returns)
        weight_by_symbol['USDJPY'] *= -1  # Short USDJPY to get long JPY exposure, since the weight was computed on the inverted (JPY-in-USD) series.

        # Rebalance with HRP.
        self.set_holdings([PortfolioTarget(symbol, 0.95*weight) for symbol, weight in weight_by_symbol.items()], True)


class HierarchicalRiskParity:
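    # Overview of what the methods below do (Hierarchical Risk Parity, per Lopez de Prado):
    #   1) Tree clustering: build a single-linkage hierarchy from a correlation-based
    #      distance matrix (_correlDist + sch.linkage).
    #   2) Quasi-diagonalization: reorder the assets so correlated ones are adjacent
    #      (_getQuasiDiag).
    #   3) Recursive bisection: split the ordered list in halves and allocate capital
    #      inversely to each half's cluster variance (_getRecBipart, _getClusterVar, _getIVP).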

    def weights(self, daily_returns):
        # 1) Covariance and correlation of the daily returns.
        cov, corr = daily_returns.cov(), daily_returns.corr()
        # 2) Tree clustering on the correlation-distance matrix.
        dist = self._correlDist(corr)
        link = sch.linkage(squareform(dist), 'single')
        # 3) Quasi-diagonalization: recover the sorted asset labels.
        sortIx = self._getQuasiDiag(link)
        sortIx = corr.index[sortIx].tolist()
        # 4) Capital allocation by recursive bisection.
        return self._getRecBipart(cov, sortIx)

    def _getIVP(self, cov, **kargs):
        # Compute the inverse-variance portfolio.
        ivp = 1. / np.diag(cov)
        ivp /= ivp.sum()
        return ivp

    def _getClusterVar(self, cov, cItems):
        # Compute the variance of a cluster under its inverse-variance weights.
        cov_ = cov.loc[cItems, cItems]  # matrix slice
        w_ = self._getIVP(cov_).reshape(-1, 1)
        cVar = np.dot(np.dot(w_.T, cov_), w_)[0, 0]
        return cVar

    def _getQuasiDiag(self, link):
        # Sort clustered items by distance.
        link = link.astype(int)
        sortIx = pd.Series([link[-1, 0], link[-1, 1]])
        numItems = link[-1, 3]  # number of original items
        while sortIx.max() >= numItems:
            sortIx.index = range(0, sortIx.shape[0] * 2, 2)  # make space
            df0 = sortIx[sortIx >= numItems]  # find clusters
            i = df0.index
            j = df0.values - numItems
            sortIx[i] = link[j, 0]  # item 1
            df0 = pd.Series(link[j, 1], index=i + 1)
            sortIx = pd.concat([sortIx, df0])  # item 2
            sortIx = sortIx.sort_index()  # re-sort
            sortIx.index = range(sortIx.shape[0])  # re-index
        return sortIx.tolist()

    def _getRecBipart(self, cov, sortIx):
        # Compute the HRP allocation by recursive bisection.
        w = pd.Series(1.0, index=sortIx)
        cItems = [sortIx]  # initialize all items in one cluster
        while len(cItems) > 0:
            # Bi-section: split every cluster with more than one item in half.
            cItems = [i[j:k] for i in cItems for j, k in ((0, len(i) // 2), (len(i) // 2, len(i))) if len(i) > 1]
            for i in range(0, len(cItems), 2):  # parse in pairs
                cItems0 = cItems[i]      # cluster 1
                cItems1 = cItems[i + 1]  # cluster 2
                cVar0 = self._getClusterVar(cov, cItems0)
                cVar1 = self._getClusterVar(cov, cItems1)
                alpha = 1 - cVar0 / (cVar0 + cVar1)
                w[cItems0] *= alpha      # weight 1
                w[cItems1] *= 1 - alpha  # weight 2
        return w
    
    def _correlDist(self, corr):
        # Distance matrix based on correlation, where 0 <= d[i,j] <= 1.
        # This is a proper distance metric.
        dist = ((1 - corr) / 2.) ** .5
        return dist
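

For reference, a minimal standalone sketch (assumptions: synthetic returns and hypothetical column labels) of how the allocator above behaves on its own: the raw HRP weights are non-negative and sum to one, before the USDJPY sign flip and the 0.95 scaling applied in _rebalance.

# Standalone sketch, not part of the backtest: run the HRP allocator on synthetic
# daily returns and confirm the weights form a long-only portfolio summing to ~1.
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
synthetic_returns = pd.DataFrame(
    rng.normal(0, 0.01, size=(365, 4)),
    columns=['BTCUSD', 'ETHUSD', 'XAUUSD', 'EURUSD']  # hypothetical labels
)
hrp_weights = HierarchicalRiskParity().weights(synthetic_returns)
print(hrp_weights.round(3))         # one weight per asset
print(round(hrp_weights.sum(), 6))  # ~1.0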