Overall Statistics
Total Trades
10790
Average Win
0.06%
Average Loss
-0.04%
Compounding Annual Return
5.657%
Drawdown
10.700%
Expectancy
0.211
Net Profit
55.752%
Sharpe Ratio
0.608
Probabilistic Sharpe Ratio
8.426%
Loss Rate
53%
Win Rate
47%
Profit-Loss Ratio
1.55
Alpha
0.038
Beta
0.041
Annual Standard Deviation
0.068
Annual Variance
0.005
Information Ratio
-0.267
Tracking Error
0.163
Treynor Ratio
1.007
Total Fees
$462.86
Estimated Strategy Capacity
$230000.00
Lowest Capacity Asset
TAPA T62ATFS9H5R9
from AlgorithmImports import *
# endregion

class BettingAgainstCorrelationInSP500Stocks(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2015, 1, 1)
        self.SetCash(10000)
        
        self.volatility_period:int = 12*21
        self.correlation_period:int = 5*12*21
        self.quantile:int = 4
        self.portfolio_percentage:float = 1.
        
        self.prices:dict[Symbol, RollingWindow] = {}
        self.weights:dict = {}

        self.exchanges:list[str] = ['NYS']

        self.market_symbol:Symbol = self.AddEquity('SPY', Resolution.Daily).Symbol
        self.prices[self.market_symbol] = RollingWindow[float](self.correlation_period)

        self.max_cap_weight:float = .1
        self.long_leg_corr_treshold:float = 0.
        self.long_leg_corr_substitute:float = .001

        self.coarse_count:int = 1000
        self.selection_flag:bool = False
        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
        self.SetSecurityInitializer(lambda security: security.SetMarketPrice(self.GetLastKnownPrice(security)))
        self.Schedule.On(self.DateRules.MonthStart(self.market_symbol), self.TimeRules.BeforeMarketClose(self.market_symbol, 0), self.Selection)
        self.SetWarmUp(self.correlation_period)

    def OnSecuritiesChanged(self, changes):
        for security in changes.AddedSecurities:
            security.SetFeeModel(CustomFeeModel())
            #security.SetLeverage(10)

    def CoarseSelectionFunction(self, coarse):
        # update daily prices
        for equity in coarse:
            symbol:Symbol = equity.Symbol

            if symbol in self.prices:
                self.prices[symbol].Add(equity.AdjustedPrice)

        if not self.selection_flag:
            return Universe.Unchanged
        
        if self.coarse_count < 300:
            selected:list = sorted([x for x in coarse if x.HasFundamentalData and x.Market == 'usa'],
                    key=lambda x: x.DollarVolume, reverse=True)[:self.coarse_count]
        else:
            selected:list = [x for x in coarse if x.HasFundamentalData and x.Market == 'usa']

        # warm up stock prices
        for stock in selected:
            symbol:Symbol = stock.Symbol
            
            if symbol not in self.prices:
                self.prices[symbol] = RollingWindow[float](self.correlation_period)
                history = self.History(symbol, self.volatility_period, Resolution.Daily)
                if history.empty:
                    continue

                closes = history.loc[symbol].close

                for time, close in closes.iteritems():
                    self.prices[symbol].Add(close)

        # make sure SPY prices are ready
        if not self.prices[self.market_symbol].IsReady:
            return Universe.Unchanged

        return [symbol for symbol, prices in self.prices.items() if prices.IsReady]

    def FineSelectionFunction(self, fine):
        fine = [x for x in fine if x.SecurityReference.ExchangeId in self.exchanges]

        if len(fine) > self.coarse_count:
            sorted_by_market_cap = sorted(fine, key = lambda x: x.MarketCap, reverse=True)
            fine = sorted_by_market_cap[:self.coarse_count]

        volatility:dict[Symbol, list[float, list]] = {}
        stocks_returns:dict[Symbol, np.ndarray] = {}

        # calculate volatility and store daily returns
        for stock in fine:
            symbol:Symbol = stock.Symbol

            prices:np.ndarray = np.array([x for x in self.prices[symbol]])
            returns:np.ndarray = prices[:-1] / prices[1:] - 1
            vol_value:float = np.std(returns[:self.volatility_period])

            volatility[symbol] = vol_value
            stocks_returns[symbol] = returns

        # make sure enough stocks has volatility value
        if len(volatility) < self.quantile:
            return Universe.Unchanged

        quantile:int = int(len(volatility) / self.quantile)
        sorted_by_vol:list[Symbol] = [x[0] for x in sorted(volatility.items(), key=lambda item: item[1])]

        market_prices:np.ndarray = np.array([x for x in self.prices[self.market_symbol]])
        market_returns:np.ndarray = market_prices[:-1] / market_prices[1:] - 1

        # create long and short portfolio part
        for i in range(self.quantile):
            long_leg:list[tuple[Symbol, float]] = []
            short_leg:list[tuple[Symbol, float]] = []

            total_long_corr:float = 0
            total_short_corr:float = 0

            correlation:dict[Symbol, float] = {}
            curr_quantile_stocks:list[Symbol] = sorted_by_vol[i * quantile : (i + 1) * quantile]

            for symbol in curr_quantile_stocks:
                stock_returns:np.ndarray = stocks_returns[symbol]
                correlation_matrix:np.ndarray = np.corrcoef(stock_returns, market_returns)
                corr_value:float = correlation_matrix[0][1]
                correlation[symbol] = corr_value

            corr_median:float = np.median(list(correlation.values()))

            for symbol, corr_value in correlation.items():
                # within each quartile we go long (short) low (high) correlation stocks using the median as a threshold
                if corr_value >= corr_median:
                    short_leg.append((symbol, corr_value))
                    total_short_corr += abs(corr_value)
                else:
                    if corr_value < self.long_leg_corr_treshold:
                        corr_value = self.long_leg_corr_substitute

                    long_leg.append((symbol, corr_value))
                    total_long_corr += 1 / abs(corr_value)

            # weights calculations
            for symbol, corr_value in long_leg:
                w:float = ((1 / corr_value) / total_long_corr) * (1 / self.quantile) * self.portfolio_percentage
                w = min(self.max_cap_weight, w) # weight cap
                self.weights[symbol] = w
                
            for symbol, corr_value in short_leg:
                w:float = (corr_value / total_short_corr) * (1 / self.quantile) * self.portfolio_percentage
                w = min(self.max_cap_weight, w) # weight cap
                self.weights[symbol] = -w

        return list(self.weights.keys())
        
    def OnData(self, data):
        # rebalance monthly
        if not self.selection_flag:
            return
        self.selection_flag = False
        
        # trade execution
        invested = [x.Key for x in self.Portfolio if x.Value.Invested]
        for symbol in invested:
            if symbol not in self.weights:
                self.Liquidate(symbol)
                
        for symbol, w in self.weights.items():
            if self.Securities[symbol].IsTradable and self.Securities[symbol].Price != 0:
                self.SetHoldings(symbol, w)
                
        self.weights.clear()
        
    def Selection(self):
        self.selection_flag = True
        
# Custom fee model
class CustomFeeModel(FeeModel):
    def GetOrderFee(self, parameters):
        fee = parameters.Security.Price * parameters.Order.AbsoluteQuantity * 0.001
        return OrderFee(CashAmount(fee, "USD"))