| Overall Statistics |
|
Total Trades 133 Average Win 3.81% Average Loss -2.56% Compounding Annual Return 25.785% Drawdown 38.600% Expectancy 0.303 Net Profit 28.324% Sharpe Ratio 0.674 Probabilistic Sharpe Ratio 29.582% Loss Rate 48% Win Rate 52% Profit-Loss Ratio 1.49 Alpha 0.181 Beta 0.474 Annual Standard Deviation 0.588 Annual Variance 0.346 Information Ratio -0.098 Tracking Error 0.607 Treynor Ratio 0.837 Total Fees $5684.81 Estimated Strategy Capacity $16000000.00 Lowest Capacity Asset DOTUSDT 18N |
###################################################
#
# Smart Rolling window
# ========================
# Convenience object to build on RollingWindow functionality
#
# Methods:
# -------------------------
# mySmartWindow.IsRising()
# mySmartWindow.IsFalling()
# mySmartWindow.crossedAboveValue(value)
# mySmartWindow.crossedBelowValue(value)
# mySmartWindow.crossedAbove(otherWindow)
# mySmartWindow.crossedBelow(otherWindow)
# mySmartWindow.IsFlat(decimalPrecision)
# mySmartWindow.hasAtLeastThisMany(value)
#
#
# Author:ekz
###################################################
class SmartRollingWindow():
def __init__(self, windowType, windowLength):
self.window = None
self.winLength = windowLength
if (windowType is "int"):self.window = RollingWindow[int](windowLength)
elif (windowType is "bool"):self.window = RollingWindow[bool](windowLength)
elif (windowType is "float"):self.window = RollingWindow[float](windowLength)
elif (windowType is "TradeBar"):self.window = RollingWindow[TradeBar](windowLength)
def crossedAboveValue(self, value):return (self.window[1] <= value < self.window[0])
def crossedBelowValue(self, value): return (self.window[1] >= value > self.window[0])
def crossedAbove(self, series): return (self.window[1] <= series[1] and self.window[0] > series[0])
def crossedBelow(self, series): return (self.window[1] >= series[1] and self.window[0] < series[0])
def isAbove(self, series): return (self.window[0] > series[0])
def isBelow(self, series): return (self.window[0] < series[0])
def isFlat(self): return (self.window[1] == self.window[0])
def isFalling(self): return (self.window[1] > self.window[0])
def isRising(self): return (self.window[1] < self.window[0])
def Add(self,value):
self.window.Add(value)
def IsReady(self):
return (self.window is not None) and \
(self.window.Count >= self.winLength) ## TODO: just use rw.IsReady?
def __getitem__(self, index):
return self.window[index]##########################################################################
# Inspired by @nitay-rabinovich at QuqntConnect
# https://www.quantconnect.com/forum/discussion/12768/share-kalman-filter-crossovers-for-crypto-and-smart-rollingwindows/p1/comment-38144
##########################################################################
#
# EMA Crossover In a Crypto Universe
# ---------------------------------------------
# FOR EDUCATIONAL PURPOSES ONLY. DO NOT DEPLOY.
#
#
# Entry:
# -------
# Minimum volume threshold traded
# and
# Price > Fast Daily EMA
# and
# Fast Daily EMA > Slow Daily EMA
#
# Exit:
# ------
# Price < Slow Daily EMA
# or
# Slow Daily EMA < Fast Daily EMA
#
# Additional Consideration:
# --------------------------
# Max exposure pct: Total % of available capital to trade with at any time
# Max holdings: Total # of positions that can be held simultaneously
# Rebalance Weekly: If false, only rebalance when we add/remove positions
# UseMomWeight: If true, rebalance w/momentum-based weights (top gainers=more weight)
#
#########################################################################
import datetime
import os
import traceback
import typing
from io import StringIO
import QuantConnect
import requests
from AlgorithmImports import *
import operator
import math
from SmartRollingWindow import *
class EMACrossoverUniverse(QCAlgorithm):
##
def Initialize(self):
self.InitAlgoParams()
self.InitAssets()
self.InitUniverse()
self.InitBacktestParams()
self.ScheduleRoutines()
try:
data = StringIO(self.Download(
"https://api.onedrive.com/v1.0/shares/u!aHR0cHM6Ly8xZHJ2Lm1zL3UvcyFBdDhVc2x1c0MxYkxnYVFrUGlob0tZSWVMRnhxeVE_ZT02V0t0d0U/root/content"))
self.df = pd.read_csv(data, delimiter=",")
self.df = self.df.set_index(pd.DatetimeIndex(self.df[self.df.columns[0]], name="date")).drop(self.df.columns[0], axis=1)
except Exception:
ex = traceback.format_exc
print(ex)
def SelectTopMarketCapCoins(self, date, n):
return self.df.loc[date].nlargest(n).keys().tolist()
## Set backtest params: dates, cash, etc. Called from Initialize().
## ----------------------------------------------------------------
def InitBacktestParams(self):
self.SetStartDate(2021, 1, 1)
self.SetEndDate(2022, 1, 31)
self.SetCash(100000)
self.SetBenchmark(Symbol.Create("BTCUSDT", SecurityType.Crypto, Market.Binance))
def InitUniverse(self):
self.UniverseSettings.Resolution = Resolution.Daily
self.symDataDict = { }
self.UniverseTickers = ["SOLUSDT", "ETHUSDT", "BNBUSDT", "ADAUSDT", "BTCUSDT"]
## More test tickers
##0z
# self.UniverseTickers = ["ANTUSDT","BATUSDT","BNBUSDT","BNTUSDT",
# "BTCUSDT", "BTGUSDT",
# "DAIUSDT","DASHUSDT","DGBUSDT",
# "EOSUSDT","ETCUSDT",
# "ETHUSDT","FUNUSDT",
# "IOTAUSDT","KNCUSDT","LRCUSDT",
# "LTCUSDT","MKRUSDT",
# "NEOUSDT","OMGUSDT",
# "PNTUSDT","QTUMUSDT","REQUSDT",
# "STORJUSDT","TRXUSDT","UTKUSDT","VETUSDT",
# "XLMUSDT","XMRUSDT",
# "XRPUSDT","XTZUSDT","XVGUSDT","ZECUSDT",
# "ZILUSDT","ZRXUSDT"]
universeSymbols = []
self.SetUniverseSelection(ScheduledUniverseSelectionModel(
self.DateRules.WeekStart(),
self.TimeRules.At(0, 0),
self.SelectSymbols
))
def SelectSymbols(self, dateTime):
symbols = []
top = self.SelectTopMarketCapCoins(dateTime.date(), 5)
self.Debug(f"Selected {top} at {dateTime.date()}")
for t in top:
if t in ["USDT", "INNBCL", "BSV", "HEX"]:
continue
symbols.append(Symbol.Create(t+"USDT", SecurityType.Crypto, Market.Binance))
self.UniverseTickers = symbols
return symbols
# --------------------
def InitAlgoParams(self):
self.emaSlowPeriod = int(self.GetParameter('emaSlowPeriod'))
self.emaFastPeriod = int(self.GetParameter('emaFastPeriod'))
self.mompPeriod = int(self.GetParameter('mompPeriod')) # used for momentum based weight
self.minimumVolPeriod = int(self.GetParameter('minimumVolPeriod')) # used for volume threshold
self.warmupPeriod = max(self.emaSlowPeriod, self.mompPeriod, self.minimumVolPeriod)
self.useMomWeight = (int(self.GetParameter("useMomWeight")) == 1)
self.maxExposurePct = float(self.GetParameter("maxExposurePct"))/100
self.rebalanceWeekly = (int(self.GetParameter("rebalanceWeekly")) == 1)
self.minimumVolume = int(self.GetParameter("minimumVolume"))
self.maxHoldings = int(self.GetParameter("maxHoldings"))
self.minAmountToBeInvested = int(self.GetParameter("minAmountToBeInvested"))
## Experimental:
## self.maxSecurityDrawDown = float(self.GetParameter("maxSecurityDrawDown"))
# --------------------
def InitAssets(self):
self.symbol = "BTCUSDT"
self.SetBrokerageModel(BrokerageName.Binance, AccountType.Cash)
self.SetAccountCurrency("USDT")
self.AddCrypto(self.symbol, Resolution.Daily)
self.EnableAutomaticIndicatorWarmUp = True
self.SetWarmUp(timedelta(self.warmupPeriod))
self.SelectedSymbolsAndWeights = {}
## Experimental:
## self.AddRiskManagement(MaximumUnrealizedProfitPercentPerSecurity(self.maxSecurityDrawDown))
## Schedule routines
## ------------------------
def ScheduleRoutines(self):
## TODO:
## Check if rebalancing has happened in the last 7 days,
## If it has, do not rebalance again
if(self.rebalanceWeekly):
self.Schedule.On( self.DateRules.WeekStart(self.symbol),
self.TimeRules.AfterMarketOpen(self.symbol, 31),
self.RebalanceHoldings )
##
## Check if we are already holding the max # of open positions.
## TODO:
## When we start using limit orders, include pending holdings
## ------------------------------------------------------------
@property
def PortfolioAtCapacity(self):
numHoldings = len([x.Key for x in self.Portfolio if self.IsInvested(x.Key)])
return numHoldings >= self.maxHoldings
## TODO:
## Test logic below for pending holdings
# pendingOrders = len( [x for x in self.Transactions.GetOpenOrders()
# if x.Direction == OrderDirection.Buy
# and x.Type == OrderType.Limit ] )
## Check for signals
## ------------------------------------------------
def OnData(self, dataSlice):
## loop through the symbols in the slice
for symbol in dataSlice.Keys:
## if we have this symbol in our data dictioary
if symbol in self.symDataDict:
symbolData = self.symDataDict[symbol]
## Update the symbol with the data slice data
symbolData.OnSymbolData(self.Securities[symbol].Price, dataSlice[symbol])
## If we're invested in this symbol, manage any open positions
if self.IsInvested(symbol):
symbolData.ManageOpenPositions()
## otherwise, if we're not invested, check for entry signal
else:
## First check if we are at capacity for new positions.
##
## TODO:
## For Go-Live, note that the portfolio capacity may not be accurate while
## checking it inside this for-loop. It will be accurate after the positions
## have been open. IE: When the orders are actually filled.
if(not self.PortfolioAtCapacity):
if( symbolData.EntrySignalFired() ):
self.OpenNewPosition(symbolData.symbol)
## TODO:
## For Go-Live, call OnNewPositionOpened only after
## the order is actually filled
symbolData.OnNewPositionOpened()
## Logic to rebalance our portfolio of holdings.
## We will either rebalance with equal weighting,
## or assign weights based on momentum.
##
## TODO:
## Check if rebalancing has happened in the last 7 days,
## If it has, do not rebalance again
## -----------------------------------------------------
def RebalanceHoldings(self, rebalanceCurrHoldings=False):
# try:
if self.useMomWeight:
momentumSum = sum(self.symDataDict[symbol].momp.Current.Value for symbol in self.SelectedSymbolsAndWeights)
if (momentumSum <= 0):
self.useMomWeight = False
# symbolsAndActualWeights = {k: v for k, v in }
for symbol in self.SelectedSymbolsAndWeights:
if self.useMomWeight:
symbolWeight = round((self.symDataDict[symbol].momp.Current.Value / momentumSum),4)
else:
symbolWeight = round(1/len(self.SelectedSymbolsAndWeights),4)
self.SetWeightedHolding(symbol,symbolWeight)
sortedSymbolsAndWeights = {k: v for k, v in sorted(self.SelectedSymbolsAndWeights.items(), key=lambda item: item[1], reverse=True)}
for symbol in sortedSymbolsAndWeights:
self.SetSymbolHoldings(symbol)
def SetSymbolHoldings(self, symbol):
adjustedWeight = self.SelectedSymbolsAndWeights[symbol]
cash = self.Portfolio.TotalPortfolioValue - self.Portfolio.TotalHoldingsValue
percent = adjustedWeight * self.maxExposurePct
cost = self.Portfolio.TotalPortfolioValue * percent
orderMsg = f"{symbol} | alloc. ({round(adjustedWeight*100,2)}% adjusted) "
if (cost > cash):
percent = self.GetTruncatedValue(cash / self.Portfolio.TotalPortfolioValue, 3)
if(self.Portfolio[symbol].Invested):
orderMsg = f"[Re-Balancing] {orderMsg}"
else:
orderMsg = f"[NEW Addition] {orderMsg}"
self.SetHoldings(symbol, percent, tag=orderMsg)
## Allocate the specified weight (pct) of the portfolio value to
## the specified symbol. This weight will first be adjusted to consider
## cost basis, whether the position is already open and has profit.
## We are doing this to solve the problem where re-balancing causes winners
## to reduce in position size.
## --------–--------–--------–--------–--------–--------–--------–--------–
def SetWeightedHolding(self,symbol,symbolWeight):
## Calculate the basis (the denominator) for rebalancing weights
## This is the sum of costs basis, plus uninvested cash
if( self.Portfolio.Invested ):
# numHoldings = len([x.Key for x in self.Portfolio if x.Value.Invested])
totalCostBasis = sum( [x.Value.HoldingsCost for x in self.Portfolio if x.Value.Invested] )
else:
totalCostBasis = 0.0
## it's okay if this includes cash reserved for pending orders
## because we have alread considered those orders in the symbolsAndWeights list
cashAvailable = self.Portfolio.TotalPortfolioValue - self.Portfolio.TotalHoldingsValue
# cashAvailable = self.Portfolio.CashBook["USDT"].Amount
weightingBasis = totalCostBasis + cashAvailable
amtToInvest = weightingBasis * symbolWeight
## if already invested, our adjusted weight needs to account for
## the profits gained, so we adjust the 'amt to invest' based on
## unrealized profit pct of the position.
if(self.Portfolio[symbol].Invested):
profitPct = self.Portfolio[symbol].UnrealizedProfitPercent
adjustedAmtToInvest = amtToInvest * (1 + profitPct)
adjustedWeight = adjustedAmtToInvest / self.Portfolio.TotalPortfolioValue
else:
adjustedWeight = amtToInvest / self.Portfolio.TotalPortfolioValue
symbolWeight = self.GetTruncatedValue(symbolWeight,3)
adjustedWeight = self.GetTruncatedValue(adjustedWeight,3)
self.SelectedSymbolsAndWeights[symbol] = adjustedWeight
# ## TODO: Calculate order qty instead of using % setholdings
# ## https://github.com/QuantConnect/Lean/blob/master/Algorithm.Python/BasicTemplateCryptoAlgorithm.py
# orderMsg = f"{symbol} | {round(symbolWeight*100,2)}% alloc. ({round(adjustedWeight*100,2)}% adjusted) "
# if(self.Portfolio[symbol].Invested):
# orderMsg = f"[Re-Balancing] {orderMsg}"
# else:
# orderMsg = f"[NEW Addition] {orderMsg}"
# self.SetHoldings(symbol, adjustedWeight * self.maxExposurePct, tag=orderMsg)
## Adding the symbol to our dictionary will ensure
## that it gets processed in the rebalancing routine
## -------------------------------------------------
def OpenNewPosition(self, symbol):
self.SelectedSymbolsAndWeights[symbol] = 0
self.RebalanceHoldings()
## Removing the symbol from our dictionary will ensure
## that it wont get processed in the rebalancing routine
## -----------------------------------------------------
def ExitPosition(self, symbol, exitMsg=""):
profitPct = round(self.Securities[symbol].Holdings.UnrealizedProfitPercent,2)
self.Liquidate(symbol, tag=f"SELL {symbol.Value} ({profitPct}% profit) [{exitMsg}]")
self.SelectedSymbolsAndWeights.pop(symbol)
## TODO:
## Before go-live, wait until liquidation has happened before rebalancing
## Perhaps Call RebalanceHoldings after an order event has occured.
self.RebalanceHoldings()
return
## Create new symboldata object and add to our dictionary
## ------------------------------------------------------
def OnSecuritiesChanged(self, changes):
for security in changes.AddedSecurities:
symbol = security.Symbol
if( symbol in self.UniverseTickers and \
symbol not in self.symDataDict.keys()):
self.symDataDict[symbol] = SymbolData(symbol, self)
def GetTruncatedValue(self, value, decPlaces):
truncFactor = 10.0 ** decPlaces
return math.trunc(value * truncFactor) / truncFactor
def IsInvested(self, symbol):
return self.Portfolio[symbol].Invested and self.Portfolio[symbol].Quantity * self.Securities[symbol].Price > self.minAmountToBeInvested
##################################
# SymbolData Class
##################################
class SymbolData():
def __init__(self, theSymbol, algo):
## Algo / Symbol / Price reference
self.algo = algo
self.symbol = theSymbol
self.lastPrice = 0
self.price = 0
## Initialize indicators
self.InitIndicators()
## ----------------------------------------
def InitIndicators(self):
self.indicators = { 'EMA_FAST' : self.algo.EMA(self.symbol,self.algo.emaFastPeriod,Resolution.Daily),
'EMA_SLOW' : self.algo.EMA(self.symbol,self.algo.emaSlowPeriod,Resolution.Daily),
'30DAY_VOL' : IndicatorExtensions.Times( self.algo.SMA(self.symbol,self.algo.minimumVolPeriod, Resolution.Daily, Field.Volume),
self.algo.SMA(self.symbol,self.algo.minimumVolPeriod, Resolution.Daily, Field.Close)),
'MOMP' : self.algo.MOMP(self.symbol,self.algo.mompPeriod,Resolution.Daily)}
## for easy reference from main algo
self.momp = self.indicators['MOMP']
for key, indicator in self.indicators.items():
self.algo.WarmUpIndicator(self.symbol, indicator, Resolution.Minute)
self.emaFastWindow = SmartRollingWindow("float", 2)
self.emaSlowWindow = SmartRollingWindow("float", 2)
self.lastPriceWindow = SmartRollingWindow("float", 2)
## ----------------------------------------
def OnSymbolData(self, lastKnownPrice, tradeBar):
self.lastPrice = lastKnownPrice
self.UpdateRollingWindows()
self.PlotCharts()
## ----------------------------------------
def UpdateRollingWindows(self):
self.emaFastWindow.Add(self.indicators['EMA_FAST'].Current.Value)
self.emaSlowWindow.Add(self.indicators['EMA_SLOW'].Current.Value)
self.lastPriceWindow.Add(self.lastPrice)
## ----------------------------------------
def IsReady(self):
return (self.indicators['EMA_FAST'].IsReady and self.indicators['EMA_SLOW'].IsReady \
and self.indicators['30DAY_VOL'].IsReady)
## ----------------------------------------
def MinimumVolTraded(self):
if( self.indicators['30DAY_VOL'].IsReady ):
dollarVolume = self.indicators['30DAY_VOL'].Current.Value
if( dollarVolume >= self.algo.minimumVolume ):
return True
return False
## ----------------------------------------
def EntrySignalFired(self):
if( self.IsReady() ):
if( self.MinimumVolTraded() ):
if( self.emaFastWindow.isAbove(self.emaSlowWindow) and \
self.lastPriceWindow.isAbove(self.emaFastWindow) ):
return True
return False
## ----------------------------------------
def ExitSignalFired(self):
if( self.IsReady() ):
if ( self.lastPriceWindow.isBelow(self.emaSlowWindow) or \
self.emaSlowWindow.isAbove(self.emaFastWindow) ):
return True
return False
## Logic to run immediately after a new position is opened.
## ---------------------------------------------------------
def OnNewPositionOpened(self):
# self.algo.Log(f"[BOUGHT {self.symbol.Value}] @ ${self.lastPrice:.2f}")
return
## Manage open positions if any. ie: close them, update stops, add to them, etc
## Called periodically, eg: from a scheduled routine
##
## TODO:
## Consilder also liquidating if volume or liquidity thresholds arent met
## -----------------------------------------------------------------------------
def ManageOpenPositions(self):
## if( not self.MinimumVolTraded() ):
## self.ExitPosition(exitMsg="Trading volume below threshold")
if(self.ExitSignalFired()):
self.ExitPosition(exitMsg="Exit Signal Fired")
# ----------------------------------------
def ExitPosition(self, exitMsg):
# self.algo.Log(f"[SELL {self.symbol.Value}] @ ${self.lastPrice:.2f}")
self.algo.ExitPosition(self.symbol, exitMsg)
# ----------------------------------------
def PlotCharts(self):
## To Plot charts, comment out the below
# self.algo.Plot(f"{self.symbol}-charts", "Price", self.lastPriceWindow[0])
# self.algo.Plot(f"{self.symbol}-charts", "EMA Fast", self.indicators['EMA_FAST'].Current.Value)
# self.algo.Plot(f"{self.symbol}-charts", "EMA Slow", self.indicators['EMA_SLOW'].Current.Value)
return
# Your New Python File