Overall Statistics
Total Trades
19
Average Win
5.45%
Average Loss
-0.88%
Compounding Annual Return
14.813%
Drawdown
23.200%
Expectancy
3.778
Net Profit
33.720%
Sharpe Ratio
0.726
Probabilistic Sharpe Ratio
28.590%
Loss Rate
33%
Win Rate
67%
Profit-Loss Ratio
6.17
Alpha
0.083
Beta
0.026
Annual Standard Deviation
0.156
Annual Variance
0.024
Information Ratio
-1.687
Tracking Error
0.63
Treynor Ratio
4.345
Total Fees
$953.66
Estimated Strategy Capacity
$1100000.00
Lowest Capacity Asset
BNBUSDT 18N
###################################################
#
#  Smart Rolling window
#  ========================
#  Convenience object to build on RollingWindow functionality
#
#  Methods:
#  -------------------------
#  mySmartWindow.IsRising()
#  mySmartWindow.IsFalling()
#  mySmartWindow.crossedAboveValue(value)
#  mySmartWindow.crossedBelowValue(value)
#  mySmartWindow.crossedAbove(otherWindow)
#  mySmartWindow.crossedBelow(otherWindow)
#  mySmartWindow.IsFlat(decimalPrecision)
#  mySmartWindow.hasAtLeastThisMany(value)
#
#
#  Author:ekz
###################################################

class SmartRollingWindow():
    
    def __init__(self, windowType, windowLength):
        self.window    = None
        self.winLength = windowLength

        if (windowType is "int"):self.window = RollingWindow[int](windowLength)
        elif (windowType is "bool"):self.window = RollingWindow[bool](windowLength)
        elif (windowType is "float"):self.window = RollingWindow[float](windowLength)
        elif (windowType is "TradeBar"):self.window = RollingWindow[TradeBar](windowLength)

    def crossedAboveValue(self, value):return (self.window[1] <= value < self.window[0])
    def crossedBelowValue(self, value): return (self.window[1] >= value > self.window[0])

    def crossedAbove(self, series): return (self.window[1] <= series[1] and self.window[0] > series[0])
    def crossedBelow(self, series): return (self.window[1] >= series[1] and self.window[0] < series[0])

    def isAbove(self, series): return (self.window[0] > series[0])
    def isBelow(self, series): return (self.window[0] < series[0])
    
    def isFlat(self):    return (self.window[1] == self.window[0])
    def isFalling(self): return (self.window[1] > self.window[0])
    def isRising(self):  return (self.window[1] < self.window[0])

    def Add(self,value): 
        self.window.Add(value)

    def IsReady(self):
        return (self.window is not None) and \
               (self.window.Count >= self.winLength) ## TODO: just use rw.IsReady?
    
    def __getitem__(self, index):
        return self.window[index]
##########################################################################
# Inspired by @nitay-rabinovich at QuqntConnect
# https://www.quantconnect.com/forum/discussion/12768/share-kalman-filter-crossovers-for-crypto-and-smart-rollingwindows/p1/comment-38144
##########################################################################
#
# EMA Crossover In a Crypto Universe 
# ---------------------------------------------
# FOR EDUCATIONAL PURPOSES ONLY. DO NOT DEPLOY.
#
#
# Entry:
# -------
# Minimum volume threshold traded 
#         and
# Price > Fast Daily EMA 
#         and
# Fast Daily EMA > Slow Daily EMA
#
# Exit:
# ------
# Price < Slow Daily EMA 
#       or 
# Slow Daily EMA < Fast Daily EMA
#
# Additional Consideration:
# --------------------------
# Max exposure pct: Total % of available capital to trade with at any time
# Max holdings: Total # of positions that can be held simultaneously
# Rebalance Weekly: If false,  only rebalance when we add/remove positions  
# UseMomWeight: If true, rebalance w/momentum-based weights (top gainers=more weight)
#
#########################################################################
from AlgorithmImports import *
import operator
import math
from SmartRollingWindow import *
class EMACrossoverUniverse(QCAlgorithm):
    
    ## 
    def Initialize(self):
        self.InitAlgoParams()
        self.InitAssets()
        self.InitUniverse()
        self.InitBacktestParams()        
        self.ScheduleRoutines()

    ## Set backtest params: dates, cash, etc. Called from Initialize().
    ## ----------------------------------------------------------------
    def InitBacktestParams(self):
        self.SetStartDate(2020, 1, 1)
        # self.SetEndDate(2019, 2, 1)  
        self.SetCash(100000)
        self.SetBenchmark(Symbol.Create("BTCUSDT", SecurityType.Crypto, Market.Binance))

    
    def InitUniverse(self):
        self.UniverseSettings.Resolution = Resolution.Daily
        self.symDataDict = { }

        self.UniverseTickers = ["ETHUSDT", "BNBUSDT", "ADAUSDT", "BTCUSDT"]
        # self.UniverseTickers = ["SOLUSDT", "ETHUSDT", "BNBUSDT", "ADAUSDT", "BTCUSDT"]

        ## More test tickers
        ##0z
        # self.UniverseTickers = ["ANTUSDT","BATUSDT","BNBUSDT","BNTUSDT",
        #                         "BTCUSDT", "BTGUSDT",
        #                         "DAIUSDT","DASHUSDT","DGBUSDT",
        #                         "EOSUSDT","ETCUSDT",
        #                         "ETHUSDT","FUNUSDT",
        #                         "IOTAUSDT","KNCUSDT","LRCUSDT",
        #                         "LTCUSDT","MKRUSDT",
        #                         "NEOUSDT","OMGUSDT",
        #                         "PNTUSDT","QTUMUSDT","REQUSDT",
        #                         "STORJUSDT","TRXUSDT","UTKUSDT","VETUSDT",
        #                         "XLMUSDT","XMRUSDT",
        #                         "XRPUSDT","XTZUSDT","XVGUSDT","ZECUSDT",
        #                         "ZILUSDT","ZRXUSDT"]
        
        universeSymbols = []
        for symbol in self.UniverseTickers:

            universeSymbols.append(Symbol.Create(symbol, SecurityType.Crypto, Market.Binance))
            
        self.SetUniverseSelection(ManualUniverseSelectionModel(universeSymbols))


    # --------------------
    def InitAlgoParams(self):
        
        self.emaSlowPeriod    = int(self.GetParameter('emaSlowPeriod'))
        self.emaFastPeriod    = int(self.GetParameter('emaFastPeriod'))
        self.mompPeriod       = int(self.GetParameter('mompPeriod'))       # used for momentum based weight
        self.minimumVolPeriod = int(self.GetParameter('minimumVolPeriod')) # used for volume threshold
        self.warmupPeriod     = max(self.emaSlowPeriod, self.mompPeriod, self.minimumVolPeriod)
        
        self.useMomWeight     = (int(self.GetParameter("useMomWeight")) == 1)                 
        self.maxExposurePct   = float(self.GetParameter("maxExposurePct"))/100
        self.rebalanceWeekly  = (int(self.GetParameter("rebalanceWeekly")) == 1)

        self.minimumVolume    = int(self.GetParameter("minimumVolume")) 
        self.maxHoldings      = int(self.GetParameter("maxHoldings")) 
        
        ## Experimental:
        ## self.maxSecurityDrawDown  = float(self.GetParameter("maxSecurityDrawDown"))
    
    # --------------------
    def InitAssets(self):    
        self.symbol = "BTCUSDT" 
        self.SetBrokerageModel(BrokerageName.Binance, AccountType.Cash)
        self.SetAccountCurrency("USDT")
        self.AddCrypto(self.symbol, Resolution.Daily)

        self.EnableAutomaticIndicatorWarmUp = True
        self.SetWarmUp(timedelta(self.warmupPeriod))
        self.SelectedSymbolsAndWeights = {}
        
        ## Experimental:
        ## self.AddRiskManagement(MaximumUnrealizedProfitPercentPerSecurity(self.maxSecurityDrawDown))
    
    ## Schedule routines
    ## ------------------------
    def ScheduleRoutines(self):
        
        ## TODO: 
        ## Check if rebalancing has happened in the last 7 days, 
        ## If it has, do not rebalance again 
        if(self.rebalanceWeekly):
            self.Schedule.On( self.DateRules.WeekStart(self.symbol),
                              self.TimeRules.AfterMarketOpen(self.symbol, 31),
                              self.RebalanceHoldings )

    ##
    ## Check if we are already holding the max # of open positions.
    ## TODO:
    ## When we start using limit orders, include pending holdings
    ## ------------------------------------------------------------
    @property
    def PortfolioAtCapacity(self):
        numHoldings = len([x.Key for x in self.Portfolio if x.Value.Invested])
        return numHoldings >= self.maxHoldings
    
    ## TODO:
    ## Test logic below for pending holdings
    # pendingOrders  = len( [x for x in self.Transactions.GetOpenOrders()
    #                         if x.Direction == OrderDirection.Buy
    #                             and x.Type == OrderType.Limit ] )
    
                             
    ## Check for  signals
    ## ------------------------------------------------
    def OnData(self, dataSlice):

        ## loop through the symbols in the slice
        for symbol in dataSlice.Keys:

            ## if we have this symbol in our data dictioary
            if symbol in self.symDataDict:

                symbolData = self.symDataDict[symbol] 

                ## Update the symbol with the data slice data
                symbolData.OnSymbolData(self.Securities[symbol].Price, dataSlice[symbol])
                
                ## If we're invested in this symbol, manage any open positions
                if(self.Portfolio[symbolData.symbol.Value].Invested):
                    symbolData.ManageOpenPositions()
                
                ## otherwise, if we're not invested, check for entry signal
                else:

                    ## First check if we are at capacity for new positions. 
                    ##
                    ## TODO: 
                    ## For Go-Live, note that the portfolio capacity may not be accurate while
                    ## checking it inside this for-loop. It will be accurate after the positions 
                    ## have been open. IE: When the orders are actually filled.                    
                    if(not self.PortfolioAtCapacity):            
                        if( symbolData.EntrySignalFired() ):
                            self.OpenNewPosition(symbolData.symbol)
                    
                            ## TODO: 
                            ## For Go-Live, call OnNewPositionOpened only after 
                            ## the order is actually filled
                            symbolData.OnNewPositionOpened()
      
    ## Logic to rebalance our portfolio of holdings. 
    ## We will either rebalance with equal weighting, 
    ## or assign weights based on momentum. 
    ##
    ## TODO: 
    ## Check if rebalancing has happened in the last 7 days, 
    ## If it has, do not rebalance again    
    ## -----------------------------------------------------
    def RebalanceHoldings(self, rebalanceCurrHoldings=False):
        # try:
            
        if self.useMomWeight:
            momentumSum = sum(self.symDataDict[symbol].momp.Current.Value for symbol in self.SelectedSymbolsAndWeights)
            if (momentumSum == 0):
                self.useMomWeight = False
            
        # symbolsAndActualWeights = {k: v for k, v in }
        for symbol in self.SelectedSymbolsAndWeights:
            if self.useMomWeight:
                symbolWeight = round((self.symDataDict[symbol].momp.Current.Value / momentumSum),4) 
            else:
                symbolWeight = round(1/len(self.SelectedSymbolsAndWeights),4)
            
            self.SetWeightedHolding(symbol,symbolWeight)

        sortedSymbolsAndWeights = {k: v for k, v in sorted(self.SelectedSymbolsAndWeights.items(), key=lambda item: item[1], reverse=True)};
        for symbol in sortedSymbolsAndWeights:
            self.SetSymbolHoldings(symbol)
    
    def SetSymbolHoldings(self, symbol):
        adjustedWeight = self.SelectedSymbolsAndWeights[symbol]
        cash = self.Portfolio.TotalPortfolioValue - self.Portfolio.TotalHoldingsValue
        percent = adjustedWeight * self.maxExposurePct
        cost = self.Portfolio.TotalPortfolioValue * percent
        orderMsg = f"{symbol} | alloc. ({round(adjustedWeight*100,2)}% adjusted) "
        if (cost > cash):
            percent = self.GetTruncatedValue(cash / self.Portfolio.TotalPortfolioValue, 3)
        if(self.Portfolio[symbol].Invested): 
            orderMsg = f"[Re-Balancing] {orderMsg}"
        else:
            orderMsg = f"[NEW Addition] {orderMsg}"
            
        self.SetHoldings(symbol, percent, tag=orderMsg)   

    ## Allocate the specified weight (pct) of the portfolio value to 
    ## the specified symbol. This weight will first be adjusted to consider
    ## cost basis, whether the position is already open and has profit.
    ## We are doing this to solve the problem where re-balancing causes winners
    ## to reduce in position size.
    ## --------–--------–--------–--------–--------–--------–--------–--------–
    def SetWeightedHolding(self,symbol,symbolWeight):

        ## Calculate the basis (the denominator) for rebalancing weights
        ## This is the sum of costs basis, plus uninvested cash 

        if( self.Portfolio.Invested ):
            # numHoldings = len([x.Key for x in self.Portfolio if x.Value.Invested])
            totalCostBasis = sum( [x.Value.HoldingsCost for x in self.Portfolio if x.Value.Invested] )
        else: 
            totalCostBasis = 0.0             

        ## it's okay if this includes cash reserved for pending orders
        ## because we have alread considered those orders in the symbolsAndWeights list
        cashAvailable = self.Portfolio.TotalPortfolioValue - self.Portfolio.TotalHoldingsValue
        # cashAvailable  = self.Portfolio.CashBook["USDT"].Amount
        weightingBasis = totalCostBasis + cashAvailable
        amtToInvest    = weightingBasis * symbolWeight 

        ## if already invested, our adjusted weight needs to account for
        ## the profits gained, so we adjust the 'amt to invest' based on 
        ## unrealized profit pct of the position.  

        if(self.Portfolio[symbol].Invested):
            profitPct           = self.Portfolio[symbol].UnrealizedProfitPercent
            adjustedAmtToInvest = amtToInvest * (1 + profitPct)
            adjustedWeight      = adjustedAmtToInvest / self.Portfolio.TotalPortfolioValue
            
        else:
            adjustedWeight = amtToInvest / self.Portfolio.TotalPortfolioValue
            
        symbolWeight   = self.GetTruncatedValue(symbolWeight,3)
        adjustedWeight = self.GetTruncatedValue(adjustedWeight,3)

        self.SelectedSymbolsAndWeights[symbol] = adjustedWeight

        # ## TODO: Calculate order qty instead of using % setholdings
        # ## https://github.com/QuantConnect/Lean/blob/master/Algorithm.Python/BasicTemplateCryptoAlgorithm.py
        # orderMsg = f"{symbol} | {round(symbolWeight*100,2)}% alloc. ({round(adjustedWeight*100,2)}% adjusted) "
        
        # if(self.Portfolio[symbol].Invested): 
        #     orderMsg = f"[Re-Balancing] {orderMsg}"
        # else:
        #     orderMsg = f"[NEW Addition] {orderMsg}"
            
        # self.SetHoldings(symbol, adjustedWeight * self.maxExposurePct, tag=orderMsg)            



    ## Adding the symbol to our dictionary will ensure
    ## that it gets processed in the rebalancing routine
    ## -------------------------------------------------
    def OpenNewPosition(self, symbol):
        self.SelectedSymbolsAndWeights[symbol] = 0
        self.RebalanceHoldings()    
                    
    ## Removing the symbol from our dictionary will ensure
    ## that it wont get processed in the rebalancing routine
    ## -----------------------------------------------------
    def ExitPosition(self, symbol, exitMsg=""):
        profitPct   = round(self.Securities[symbol].Holdings.UnrealizedProfitPercent,2)
        self.Liquidate(symbol, tag=f"SELL {symbol.Value} ({profitPct}% profit) [{exitMsg}]")
        if(symbol in self.SelectedSymbolsAndWeights.keys()):
            self.SelectedSymbolsAndWeights.pop(symbol)
        
        ## TODO:
        ## Before go-live, wait until liquidation has happened before rebalancing
        ## Perhaps Call RebalanceHoldings after an order event has occured. 
        self.RebalanceHoldings()
        return

            
    ## Create new symboldata object and add to our dictionary
    ## ------------------------------------------------------
    def OnSecuritiesChanged(self, changes):
        for security in changes.AddedSecurities:
            symbol  = security.Symbol
            if( symbol in self.UniverseTickers and \
                symbol not in self.symDataDict.keys()):
                self.symDataDict[symbol] = SymbolData(symbol, self) 

    def GetTruncatedValue(self, value, decPlaces):
        truncFactor  = 10.0 ** decPlaces
        return math.trunc(value * truncFactor) / truncFactor
    
##################################
# SymbolData Class
##################################
class SymbolData():
    
    def __init__(self, theSymbol, algo):

        ## Algo / Symbol / Price reference
        self.algo       = algo
        self.symbol     = theSymbol
        self.lastPrice  = 0
        self.price      = 0

        ## Initialize indicators
        self.InitIndicators()
            
            
    ## ----------------------------------------
    def InitIndicators(self):

        self.indicators = { 'EMA_FAST'  : self.algo.EMA(self.symbol,self.algo.emaFastPeriod,Resolution.Daily),
                            'EMA_SLOW'  : self.algo.EMA(self.symbol,self.algo.emaSlowPeriod,Resolution.Daily),
                            '30DAY_VOL' : IndicatorExtensions.Times( self.algo.SMA(self.symbol,self.algo.minimumVolPeriod, Resolution.Daily, Field.Volume), 
                                                                     self.algo.SMA(self.symbol,self.algo.minimumVolPeriod, Resolution.Daily, Field.Close)),
                            'MOMP'      : self.algo.MOMP(self.symbol,self.algo.mompPeriod,Resolution.Daily)}
        
        ## for easy reference from main algo
        self.momp = self.indicators['MOMP']
                            
        for key, indicator in self.indicators.items():
            self.algo.WarmUpIndicator(self.symbol, indicator, Resolution.Minute)

        self.emaFastWindow  = SmartRollingWindow("float", 2)
        self.emaSlowWindow  = SmartRollingWindow("float", 2)
        self.lastPriceWindow    = SmartRollingWindow("float", 2)
        
    ## ----------------------------------------
    def OnSymbolData(self, lastKnownPrice, tradeBar):
        self.lastPrice = lastKnownPrice
        self.UpdateRollingWindows()
        self.PlotCharts()
        
    ## ----------------------------------------
    def UpdateRollingWindows(self):
        self.emaFastWindow.Add(self.indicators['EMA_FAST'].Current.Value)
        self.emaSlowWindow.Add(self.indicators['EMA_SLOW'].Current.Value)
        self.lastPriceWindow.Add(self.lastPrice)
        
    ## ----------------------------------------
    def IsReady(self):
        return (self.indicators['EMA_FAST'].IsReady and self.indicators['EMA_SLOW'].IsReady \
            and self.indicators['30DAY_VOL'].IsReady)

    ## ----------------------------------------
    def MinimumVolTraded(self):
        if( self.indicators['30DAY_VOL'].IsReady ):
            dollarVolume = self.indicators['30DAY_VOL'].Current.Value
            if( dollarVolume >= self.algo.minimumVolume ):
                return True
                
        return False
        
    ## ----------------------------------------
    def EntrySignalFired(self):
        if( self.IsReady() ):
            if( self.MinimumVolTraded() ):
                if( self.emaFastWindow.isAbove(self.emaSlowWindow) and \
                    self.lastPriceWindow.isAbove(self.emaFastWindow) ):
                    return True
        
        return False

    ## ----------------------------------------
    def ExitSignalFired(self):
        if( self.IsReady() ):
            if ( self.lastPriceWindow.isBelow(self.emaSlowWindow) or \
                 self.emaSlowWindow.isAbove(self.emaFastWindow) ):
                return True
        
        return False

    ## Logic to run immediately after a new position is opened.
    ## ---------------------------------------------------------
    def OnNewPositionOpened(self):            
        # self.algo.Log(f"[BOUGHT {self.symbol.Value}] @ ${self.lastPrice:.2f}")    
        return
    
    ## Manage open positions if any. ie: close them, update stops, add to them, etc
    ## Called periodically, eg: from a scheduled routine
    ##
    ## TODO:
    ## Consilder also liquidating if volume or liquidity thresholds arent met
    ## -----------------------------------------------------------------------------
    def ManageOpenPositions(self):

        ## if( not self.MinimumVolTraded() ):
        ##     self.ExitPosition(exitMsg="Trading volume below threshold")
        
        if(self.ExitSignalFired()):        
            self.ExitPosition(exitMsg="Exit Signal Fired")
        
        
    # ----------------------------------------
    def ExitPosition(self, exitMsg): 
        # self.algo.Log(f"[SELL {self.symbol.Value}] @ ${self.lastPrice:.2f}") 
        self.algo.ExitPosition(self.symbol, exitMsg)
        


    # ----------------------------------------
    def PlotCharts(self):    
    
        ## To Plot charts, comment out the below
        # self.algo.Plot(f"{self.symbol}-charts", "Price", self.lastPriceWindow[0])        
        # self.algo.Plot(f"{self.symbol}-charts", "EMA Fast", self.indicators['EMA_FAST'].Current.Value)            
        # self.algo.Plot(f"{self.symbol}-charts", "EMA Slow", self.indicators['EMA_SLOW'].Current.Value)            
        
        return