| Overall Statistics |
|
Total Trades 19996 Average Win 0.44% Average Loss -0.32% Compounding Annual Return 99.395% Drawdown 38.300% Expectancy 0.167 Net Profit 21459.215% Sharpe Ratio 2.08 Probabilistic Sharpe Ratio 97.424% Loss Rate 51% Win Rate 49% Profit-Loss Ratio 1.39 Alpha 0.801 Beta -1.26 Annual Standard Deviation 0.339 Annual Variance 0.115 Information Ratio 1.416 Tracking Error 0.445 Treynor Ratio -0.56 Total Fees $68169.01 Estimated Strategy Capacity $610000.00 Lowest Capacity Asset DIST R735QTJ8XC9X |
#region imports
from AlgorithmImports import *
#endregion
# 11613 trend0 custom python indicator by Derek Melchin
# inspired by Warren Harding https://www.quantconnect.com/forum/discussion/11613/algo-trend0/p1
class TrendPower(PythonIndicator):
def __init__(self, name, period, power):
self.Name = name
self.period = period
self.power = power
self.Time = datetime.min
self.Value = 0
self.prices = np.array([])
self.times = np.array([])
self.IndicatorIsReady = False
# def Update(self, input):
def Update(self, timeIndex, value):
self.prices = np.append(self.prices, value)[-self.period:]
timeStr = f"{timeIndex.hour}:{timeIndex.minute}:{timeIndex.second}"
self.times = np.append(self.times, timeStr)[-self.period:]
if len(self.prices) != self.period:
self.Value = 0
return False
self.Value = self.calc_trend()
self.IndicatorIsReady = True
return True
def calc_trend(self):
changes = np.array([])
for i in range(len(self.prices) - 1):
_return = (self.prices[i + 1] - self.prices[i]) / self.prices[i]
changes = np.append(changes, _return)
return self.power_weighted_moving_average(changes)
def power_weighted_moving_average(self, changes):
return self.weighted_average(changes, self.power_weights(len(changes)))
def power_weights(self, length):
weights = np.array([])
for i in range(length):
w = i + 1
weights = np.append(weights, w**self.power)
return weights
def weighted_average(self, changes, weights):
products = []
for i in range(len(changes)):
products.append(changes[i] * weights[i])
return sum(products) / sum(weights)#region imports
from AlgorithmImports import *
#endregion
######
# The Trend Power Universe Selector
# ----------------------------------------------------
# Ikezi Kamanu
#
# Entry:
# -------
# Short Stock with the higest positive Trend Power (measured over last 10 hours)
#
# Exit:
# -------
# Exit when Trend power goes negative
#
########################################################################################
from TrendPowerIndicator import *
class TrendPowerUniverseStrategy(QCAlgorithm):
# =====================================
def Initialize(self):
self.SetStartDate(2015, 1, 1)
self.SetCash(10000)
self.SPY = self.AddEquity("SPY", Resolution.Hour)
self.UniverseSettings.Resolution = Resolution.Hour
self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
self.symDataDict = { }
# self.SetSecurityInitializer(lambda x: x.SetMarketPrice(self.GetLastKnownPrice(x)))
self.maxHoldings = int(self.GetParameter("maxHoldings"))
self.pctPerHolding = float(self.GetParameter("pctPerHolding"))/100
# =====================================
def OnSecuritiesChanged(self, changes):
for security in changes.AddedSecurities:
security.SetLeverage(10)
def OnData(self, dataSlice):
# Update Indicators
# ----------------------------
for symbol in dataSlice.Keys:
if symbol in self.symDataDict:
symbolData = self.symDataDict[symbol]
symbolData.UpdateIndicatorsWithBars(dataSlice[symbol])
if( self.Portfolio[symbol].Invested and symbolData.ExitSignalFired() ):
self.Liquidate(symbol)
# If during market hours, Look for opportunity
# ---------------------------------------------
if(self.Securities["SPY"].Exchange.DateTimeIsOpen(self.Time)):
numHoldings = len([x.Key for x in self.Portfolio if x.Value.Invested])
# IF we have less than our max holdings, add more, up to max.
if( numHoldings < self.maxHoldings):
addHoldingsCount = self.maxHoldings - numHoldings
trendersSymData = [self.symDataDict[symb] for symb in self.symDataDict \
if (dataSlice.ContainsKey(symb) and \
(self.symDataDict[symb].TrendPowerValue is not None) and \
(self.symDataDict[symb].TrendPowerValue > 0))]
topTrendersSymData = sorted(trendersSymData, key=lambda symbolData: symbolData.TrendPowerValue, reverse=True)
topToBuy = topTrendersSymData[:addHoldingsCount]
for symbolData in topToBuy:
orderMsg = f"self.indicator.Value = {symbolData.TrendPowerValue}"
self.Log (f"{self.Time} - {orderMsg}")
self.SetHoldings(symbolData.symbol, -self.pctPerHolding, False, orderMsg)
# self.SetHoldings(symbolData.symbol, 0.10)
# =====================================
def CoarseSelectionFunction(self, universe):
coarseuniverse = sorted(universe, key=lambda c: c.DollarVolume, reverse=True)
coarseuniverse = [c for c in coarseuniverse if c.Price > 200][:200]
return [x.Symbol for x in coarseuniverse]
# =====================================
def FineSelectionFunction(self, universe):
self.symDataDict = {} # reset the dictionary
fineUniverse = [x for x in universe if x.SecurityReference.IsPrimaryShare
and x.SecurityReference.SecurityType == "ST00000001"
and x.SecurityReference.IsDepositaryReceipt == 0
and x.CompanyReference.IsLimitedPartnership == 0]
# -------------
# Debug Block:
# -------------
# Force universe to be just AMZN, for Performance comparison
# --------------------------------------------------------------
# amzn = self.AddEquity("AMZN", Resolution.Hour).Symbol
# history = self.History(amzn, 10, Resolution.Hour)
# tmpSymbolData = SymbolData(amzn, history, self)
# if amzn not in self.symDataDict:
# self.symDataDict[amzn] = tmpSymbolData
# return [amzn]
# ---------------------------------------------------------------
selected = []
for element in fineUniverse:
symbol = element.Symbol
history = self.History(symbol, 10, Resolution.Hour)
tmpSymbolData = SymbolData(symbol, history, self)
# Store data for this symbol, seed it with some history
# -----------------------------------------------------
if symbol not in self.symDataDict:
self.symDataDict[symbol] = tmpSymbolData
# If the indicators are ready for this symbol, add to universe
# --------------------------------------------------------------
if tmpSymbolData.IsReady() and tmpSymbolData.EntrySignalFired():
selected.append(symbol)
self.symDataDict[symbol] = tmpSymbolData
return selected
##################################
#
# SymbolData Class
#
##################################
class SymbolData():
# ==========================================
# Constructor. Accepts History array
# ==========================================
def __init__(self, theSymbol, history, algo):
# Algo / Symbol / Price reference
# ----------------------------------------
self.algo = algo
self.symbol = theSymbol
self.lastPrice = 0
# Initialize our trend power indicator
# ----------------------------------------
self.TrendPower = TrendPower('TrendPower', period = 10, power = 1.5)
# Loop over the history data and update the indicator
# -------------------------------------------------------------
if history.empty or 'close' not in history.columns:
return
for index, row in history.loc[theSymbol].iterrows():
tradeBar = TradeBar()
tradeBar.Close = row['close']
tradeBar.Open = row['open']
tradeBar.High = row['high']
tradeBar.Low = row['low']
tradeBar.Volume = row['volume']
tradeBar.Time = index
tradeBar.Period = timedelta(hours=1) # todo: address this. it may not always be correct.
tradeBar.Symbol = theSymbol
self.UpdateIndicatorsWithBars(tradeBar)
# =====================================
def UpdateIndicatorsWithBars(self, tradeBar):
if((tradeBar is not None) and (self.TrendPower is not None)):
self.TrendPower.Update(tradeBar.Time, tradeBar.Close)
# =====================================
@property
def TrendPowerValue(self):
if(self.TrendPower.IndicatorIsReady):
return self.TrendPower.Value
else:
return None
# =====================================
def IsReady(self):
return self.TrendPower.IndicatorIsReady
# =====================================
def EntrySignalFired(self):
return (self.TrendPower.IndicatorIsReady) and (self.TrendPower.Value > 0)
# =====================================
def ExitSignalFired(self):
return (self.TrendPower.IndicatorIsReady) and (self.TrendPower.Value < 0)