Overall Statistics
Total Trades
417
Average Win
4.98%
Average Loss
-2.65%
Compounding Annual Return
-10.980%
Drawdown
88.000%
Expectancy
0.154
Net Profit
-37.222%
Sharpe Ratio
0.233
Probabilistic Sharpe Ratio
1.040%
Loss Rate
60%
Win Rate
40%
Profit-Loss Ratio
1.88
Alpha
0.288
Beta
-0.257
Annual Standard Deviation
1.088
Annual Variance
1.183
Information Ratio
0.107
Tracking Error
1.108
Treynor Ratio
-0.984
Total Fees
$19143.23
Estimated Strategy Capacity
$160000.00
Lowest Capacity Asset
AAC VUE7AQ4QYD0L
#region imports
from AlgorithmImports import *
#endregion
# https://quantpedia.com/Screener/Details/77
from QuantConnect.Data.UniverseSelection import *
from QuantConnect.Python import PythonData
from collections import deque
from datetime import datetime
import math
import numpy as np
import pandas as pd
import scipy as sp
from decimal import Decimal


class BetaFactorInStocks(QCAlgorithm):
    """Monthly long/short strategy that ranks stocks by their beta to the market.

    Once a month the five lowest-beta stocks are bought and the five
    highest-beta stocks are sold short, using rank-based weights so that each
    side sums to half of the portfolio. Beta is computed from a rolling window
    of 252 one-period returns of each stock against the Wilshire 5000 index
    series (``Fred.Wilshire.Price5000``).
    """

    def Initialize(self):
        """Set up the backtest period, data subscriptions, market-return window and schedule."""
        self.SetStartDate(2018, 1, 1)
        self.SetEndDate(2022, 1, 1)
        self.SetCash(1000000)

        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverse(self.CoarseSelectionFunction)
        self.AddEquity("SPY", Resolution.Daily)

        # Add Wilshire 5000 index data from FRED
        self.price5000 = self.AddData(Fred, Fred.Wilshire.Price5000, Resolution.Daily).Symbol

        # Setup a RollingWindow to hold market return
        self.market_return = RollingWindow[float](252)
        # Use a ROC indicator to convert market price index into return, and save it to the RollingWindow
        self.roc = self.ROC(self.price5000, 1)
        self.roc.Updated += lambda sender, updated: self.market_return.Add(updated.Value)
        # Warm up the indicator (and therefore the window) from history
        hist = self.History(self.price5000, 253, Resolution.Daily)
        for point in hist.itertuples():
            self.roc.Update(point.Index[1], point.value)

        # Per-symbol return trackers, keyed by Symbol
        self.data = {}
        # Set by the monthly schedule; cleared once the rebalance has been traded
        self.monthly_rebalance = False
        # Selected symbols, ordered by ascending beta; None until a selection is made
        self.long = None
        self.short = None

        self.Schedule.On(self.DateRules.MonthStart("SPY"), self.TimeRules.AfterMarketOpen("SPY"), self.rebalance)

    def CoarseSelectionFunction(self, coarse):
        """Update return trackers and, when a rebalance is pending, pick the beta extremes.

        Args:
            coarse: iterable of coarse universe entries; each must expose
                ``Symbol``, ``EndTime`` and ``AdjustedPrice``.

        Returns:
            The five lowest-beta plus the five highest-beta symbols when a
            rebalance is pending and more than 10 symbols qualify; otherwise
            an empty list.
        """
        # Every entry feeds its tracker, whether or not we rebalance today
        for c in coarse:
            if c.Symbol not in self.data:
                self.data[c.Symbol] = SymbolData(c.Symbol)
            self.data[c.Symbol].Update(c.EndTime, c.AdjustedPrice)

        if not self.monthly_rebalance:
            return []

        # Only symbols priced above 5 with a full return window are eligible
        filtered_data = {
            symbol: data
            for symbol, data in self.data.items()
            if data.last_price > 5 and data.IsReady()
        }

        if len(filtered_data) <= 10:
            # Not enough candidates: cancel this month's rebalance
            self.monthly_rebalance = False
            return []

        # Sort the symbols in ascending order by beta value
        sorted_beta = sorted(filtered_data, key=lambda x: filtered_data[x].beta(self.market_return))
        self.long = sorted_beta[:5]
        self.short = sorted_beta[-5:]
        return self.long + self.short

    def rebalance(self):
        """Scheduled callback: flag that a monthly rebalance is due."""
        self.monthly_rebalance = True

    def OnData(self, data):
        """Trade the pending rebalance once the long and short lists are available.

        Args:
            data: the current data slice (not read by this handler).
        """
        if not self.monthly_rebalance:
            return

        # The flag can be set before the selection has produced the lists.
        # Check first, because the liquidation pass below needs both lists.
        if self.long is None or self.short is None:
            return

        # Liquidate symbols not in the universe anymore
        targets = self.long + self.short
        for symbol in self.Portfolio.Keys:
            if self.Portfolio[symbol].Invested and symbol not in targets:
                self.Liquidate(symbol)

        # Long side: rank 0 is the lowest beta and gets the largest weight.
        # Multipliers run n..1 so the weights sum to exactly 0.5.
        long_count = len(self.long)
        long_scale_factor = 0.5 / sum(range(1, long_count + 1))
        for rank, symbol in enumerate(self.long):
            self.SetHoldings(symbol, (long_count - rank) * long_scale_factor)

        # Short side: the last entry is the highest beta and gets the largest
        # short weight. Multipliers run 1..n so the weights sum to -0.5.
        short_count = len(self.short)
        short_scale_factor = 0.5 / sum(range(1, short_count + 1))
        for rank, symbol in enumerate(self.short):
            self.SetHoldings(symbol, -(rank + 1) * short_scale_factor)

        # Rebalance done; wait for the next scheduled trigger and selection
        self.monthly_rebalance = False
        self.long = None
        self.short = None


class SymbolData:
    """Latest price and a rolling window of one-period returns for one symbol."""

    def __init__(self, symbol):
        """Create an empty tracker for ``symbol``."""
        self.Symbol = symbol
        # Most recent non-zero price seen by Update
        self.last_price = 0
        # Holds up to 252 rate-of-change values
        self.returns = RollingWindow[float](252)
        self.roc = RateOfChange(1)
        # Each indicator update pushes its new value into the window
        self.roc.Updated += lambda sender, updated: self.returns.Add(updated.Value)

    def Update(self, time, price):
        """Record a new price observation; zero prices are ignored."""
        if price != 0:
            self.last_price = price
            self.roc.Update(time, price)

    def IsReady(self):
        """Return True once both the indicator and the return window are ready."""
        return self.roc.IsReady and self.returns.IsReady

    def beta(self, market_ret):
        """Return the beta of this symbol's returns against the market returns.

        Args:
            market_ret: iterable of market returns with the same number of
                observations as ``self.returns``.

        Returns:
            Covariance of asset and market returns divided by the variance of
            the market returns. Both come from the same covariance matrix, so
            they use the same estimator. The result is NaN when the market
            variance is zero.
        """
        asset_return = np.array(list(self.returns), dtype=np.float64)
        market_return = np.array(list(market_ret), dtype=np.float64)
        # Row/column 0 is the asset, row/column 1 is the market
        covariance = np.cov(asset_return, market_return)
        return covariance[0][1] / covariance[1][1]