Overall Statistics
Total Trades
1792
Average Win
0.26%
Average Loss
-0.06%
Compounding Annual Return
14.198%
Drawdown
53.600%
Expectancy
2.997
Net Profit
459.963%
Sharpe Ratio
0.895
Probabilistic Sharpe Ratio
26.734%
Loss Rate
21%
Win Rate
79%
Profit-Loss Ratio
4.07
Alpha
0.131
Beta
-0.044
Annual Standard Deviation
0.141
Annual Variance
0.02
Information Ratio
0.122
Tracking Error
0.241
Treynor Ratio
-2.886
Total Fees
$2005.75
from datetime import timedelta
import numpy as np
from scipy import stats
from collections import deque

class ModulatedMultidimensionalReplicator(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2008, 1, 1)     
        #self.SetEndDate(2020, 1, 1)        
        self.SetCash(100000)

        tickers = ["QQQ","FDN","TLH","TLT"]

        self.SetUniverseSelection(CustomUniverseSelectionModel("CustomUniverseSelectionModel", lambda time: tickers))    
        self.UniverseSettings.Resolution = Resolution.Daily
        
        self.AddAlpha(MOMAlphaModel())
        
        self.Settings.RebalancePortfolioOnInsightChanges = False          
        self.Settings.RebalancePortfolioOnSecurityChanges = False
        
        self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel(self.DateRules.Every(DayOfWeek.Monday)))
        self.SetExecution(ImmediateExecutionModel()) 
  
        
class MOMAlphaModel(AlphaModel): 
    def __init__(self):
        
        self.indi = {} 
        self.indi_Update = {}
    
        self.wind=200
        self.num=3
        
        self.securities = [] 
        
    def OnSecuritiesChanged(self, algorithm, changes):
        
        for security in changes.AddedSecurities:
            self.securities.append(security)
            symbol = security.Symbol
            
            self.indi[symbol] = My_Custom('My_Custom', symbol, self.wind)
            algorithm.RegisterIndicator(symbol, self.indi[symbol], Resolution.Daily)
        
            history = algorithm.History(symbol, self.wind, Resolution.Daily)
            self.indi[symbol].Warmup(history)

          
    def Update(self, algorithm, data):
        insights = []
        
        #### filter out low correlation
        # filter top X    
        sort = sorted(self.indi.items(), key=lambda x: x[1].Corr, reverse=True)[:self.num*2] 
        filter1 = [x[0] for x in sort ]
        
        # filter treshold
        #filter1 = [x[0] for x in self.indi.items() if (self.indi[x[0]].Corr < 0.5) ]
        
        #### filter out slope == 0
        filter2 = [x[0] for x in self.indi.items() if (self.indi[x[0]].Slope > 0) ]

        for security in self.securities:
            symbol = security.Symbol
            if symbol in filter1 and symbol in filter2:
                self.indi_Update[symbol] = self.indi[symbol] 
        
        #if self.indicator == True: 
        ordered = sorted(self.indi_Update.items(), key=lambda x: x[1].Slope, reverse=True)[:self.num]        
        for x in ordered:
            symbol = x[0]
            insights.append( Insight.Price(symbol, timedelta(1), InsightDirection.Up) ) 
        
        # for testing
        # algorithm.Plot("Custom_Slope", "Value", list(self.indi.values())[0].Slope *10000)

        return insights
        
        
# Python implementation of Custom Indicator
class My_Custom:
    def __init__(self, name, symbol, period):
        self.symbol = symbol
        self.Name = name
        self.Time = datetime.min
        self.Value = 0
        self.Slope = 0
        self.Corr = 0

        self.queue = deque(maxlen=period)
        self.IsReady = False

    # Update method is mandatory
    def Update(self, input):
        return self.Update2(input.Time, input.Close)
    
    def Update2(self, time, value):
        self.queue.appendleft(value)
        count = len(self.queue)
        self.Time = time
        
        self.IsReady = count == self.queue.maxlen
        
        #### start here the indicator calulation
        if self.IsReady:    
            y = np.log(self.queue)
            x = [range(len(y))]
            slope, corr = stats.linregress(x, y)[0], stats.linregress(x, y)[2]
            self.Slope = slope # value is very small an will display 0 if not multiplyed
            self.Corr = corr  # value is very small an will display 0 if not multiplyed
            self.Value = slope / corr 
        #### finish the custom indicator
        
        # for testing self.IsReady = False
        self.IsReady = False
        
        return self.IsReady 
        
        
    def Warmup(self,history):
        for index, row in history.loc[self.symbol].iterrows():
            self.Update2(index, row['close'])