| Overall Statistics | |
|---|---|
| Total Trades | 126 |
| Average Win | 0.07% |
| Average Loss | -0.09% |
| Compounding Annual Return | -24.263% |
| Drawdown | 2.600% |
| Expectancy | -0.382 |
| Net Profit | -2.258% |
| Sharpe Ratio | -6.879 |
| Probabilistic Sharpe Ratio | 0.178% |
| Loss Rate | 64% |
| Win Rate | 36% |
| Profit-Loss Ratio | 0.73 |
| Alpha | -0.235 |
| Beta | -0.118 |
| Annual Standard Deviation | 0.03 |
| Annual Variance | 0.001 |
| Information Ratio | 0.271 |
| Tracking Error | 0.17 |
| Treynor Ratio | 1.747 |
| Total Fees | $126.84 |
| Estimated Strategy Capacity | $4200000.00 |
#from riskManagement import *
from datetime import timedelta
class EMAMomentumUniverse(QCAlgorithm):
    """EMA-momentum equity universe where each holding is paired with a long put.

    Coarse selection keeps up to 10 of the 100 most liquid stocks priced above
    $10 whose fast EMA is above their slow EMA (see ``SelectionData``).  Every
    equity entering the universe is bought at a 5% target weight together with
    one put contract; when it leaves the universe both legs are liquidated.
    """

    def Initialize(self):
        """Configure the backtest window, cash, universe and bookkeeping state."""
        self.SetStartDate(2015, 1, 1)
        self.SetEndDate(2015, 2, 1)
        self.SetCash(100000)
        self.SetBenchmark("SPY")
        self.UniverseSettings.Resolution = Resolution.Minute
        # Coarse filter that defines the investment universe.
        self.AddUniverse(self.CoarseSelectionFunction)
        self.SetExecution(ImmediateExecutionModel())
        # Symbol -> SelectionData (EMA pair) cache used by the coarse filter.
        self.averages = {}
        # Not referenced anywhere else in this class; kept so the instance
        # attributes stay the same.
        self.hist = RollingWindow[float](390 * 22)
        # Most recently purchased option contract (set by BuyPut).
        self.contract = None
        self.SetSecurityInitializer(self.security_initializer)
        # Securities waiting to be bought / liquidated on the next OnData call.
        self.buys = []
        self.sells = []
        # Equity symbol -> put contract symbol bought alongside it.
        self.contract_by_equity = {}

    def security_initializer(self, security):
        """Apply per-security settings when a security is added.

        Equities are switched to raw prices; options are seeded with their
        last known price.
        """
        if security.Type == SecurityType.Equity:
            security.SetDataNormalizationMode(DataNormalizationMode.Raw)
        elif security.Type == SecurityType.Option:
            security.SetMarketPrice(self.GetLastKnownPrice(security))

    def CoarseSelectionFunction(self, universe):
        """Return up to 10 symbols whose fast EMA is above their slow EMA.

        Candidates are the 100 highest dollar-volume entries priced above 10.
        Each candidate's EMA pair is created on first sight from 300 daily
        bars of history and then fed the day's adjusted price.
        """
        by_dollar_volume = sorted(universe, key=lambda c: c.DollarVolume, reverse=True)
        candidates = [c for c in by_dollar_volume if c.Price > 10][:100]

        selected = []
        for coarse in candidates:
            symbol = coarse.Symbol

            if symbol not in self.averages:
                # Warm the indicators up from 300 daily bars of history.
                history = self.History(symbol, 300, Resolution.Daily)
                if history.empty or 'close' not in history.columns:
                    continue
                self.averages[symbol] = SelectionData(history.loc[symbol].close)

            tracker = self.averages[symbol]
            tracker.update(self.Time, coarse.AdjustedPrice)

            # Candidates are visited in descending dollar-volume order, so the
            # final slice keeps the most liquid qualifying symbols.
            if tracker.is_ready() and tracker.fast > tracker.slow:
                selected.append(symbol)

        return selected[:10]

    def OnData(self, data):
        """Process the pending liquidation and purchase queues."""
        for security in self.sells.copy():
            # Close the position explicitly when the security has left the
            # universe.
            self.Liquidate(security.Symbol)
            # BuyPut may have found no contract, so only liquidate a real
            # contract symbol: Liquidate(None) would close the whole portfolio.
            contract = self.contract_by_equity.pop(security.Symbol, None)
            if contract is not None:
                self.Liquidate(contract)
            self.sells.remove(security)

        for security in self.buys.copy():
            symbol = security.Symbol
            if security.Type != SecurityType.Equity:
                # Only equities are traded from this queue; drop anything else
                # instead of letting it accumulate.
                self.buys.remove(security)
                continue
            if not data.ContainsKey(symbol) or data[symbol] is None:
                # No data for this equity yet; retry on the next slice.
                continue
            self.SetHoldings(symbol, 0.05)
            contract = self.BuyPut(symbol)
            if contract is not None:
                self.contract_by_equity[symbol] = contract
            self.buys.remove(security)

    def OnSecuritiesChanged(self, changes):
        """Queue added equities for purchase and removed securities for sale."""
        for security in changes.AddedSecurities:
            # Option contracts added by BuyPut also arrive here; they are
            # bought directly in BuyPut, so only equities are queued.
            if security.Type == SecurityType.Equity:
                self.buys.append(security)
        for security in changes.RemovedSecurities:
            # A security that left the universe before it was bought must not
            # be bought afterwards.
            if security in self.buys:
                self.buys.remove(security)
            self.sells.append(security)

    # Disabled covered-call variant, kept for reference only.  It relies on
    # self.symbol, which this class never sets.
    # def SellCall(self):
    #     contracts = self.OptionChainProvider.GetOptionContractList(self.symbol, self.Time)
    #     self.Debug(f"SellCall: {len(contracts)}")
    #     if len(contracts) == 0: return
    #     min_expiry = 0
    #     max_expiry = 40
    #     filtered_contracts = [i for i in contracts if min_expiry <= (i.ID.Date.date() - self.Time.date()).days <= max_expiry]
    #     call = [x for x in filtered_contracts if x.ID.OptionRight == 0]
    #     if len(call) == 0: return
    #     price = self.Securities[self.symbol].Price
    #     self.contract = sorted(sorted(call, key = lambda x: abs(price - x.ID.StrikePrice)),
    #                            key = lambda x: x.ID.Date, reverse=True)[0]
    #     self.AddOptionContract(self.contract, Resolution.Minute)
    #     self.MarketOrder(self.contract, -1)

    def BuyPut(self, symbol):
        """Buy one put on ``symbol`` and return its contract symbol.

        Considers puts expiring within the next 0-40 days, takes the latest
        expiry among them and, for that expiry, the strike closest to the
        current price.  Returns ``None`` when no suitable contract is listed.
        """
        contracts = self.OptionChainProvider.GetOptionContractList(symbol, self.Time)
        self.Debug(f"BuyPut: {len(contracts)}")
        if len(contracts) == 0:
            return None

        min_expiry = 0
        max_expiry = 40
        today = self.Time.date()
        puts = [
            c for c in contracts
            if min_expiry <= (c.ID.Date.date() - today).days <= max_expiry
            and c.ID.OptionRight == OptionRight.Put
        ]
        if not puts:
            return None

        price = self.Securities[symbol].Price
        # Two stable sorts: nearest strike first, then latest expiry first, so
        # element 0 is the closest-to-the-money put of the latest expiry.
        nearest_strike_first = sorted(puts, key=lambda c: abs(price - c.ID.StrikePrice))
        self.contract = sorted(nearest_strike_first, key=lambda c: c.ID.Date, reverse=True)[0]

        self.AddOptionContract(self.contract, Resolution.Minute)
        self.MarketOrder(self.contract, 1)
        return self.contract
class SelectionData(object):
    """Fast (100) / slow (300) EMA pair tracking the trend of one symbol."""

    def __init__(self, closes):
        """Create both EMAs and warm them up.

        Args:
            closes: pandas Series of closing prices indexed by time.
        """
        # The fast EMA must exceed the slow EMA by this factor to count as an
        # uptrend in update().
        self.tolerance = 1.01
        self.fast = ExponentialMovingAverage(100)
        self.slow = ExponentialMovingAverage(300)
        self.is_uptrend = False
        self.scale = 0

        # Series.iteritems() was removed in pandas 2.0; items() yields the
        # same (index, value) pairs on old and new versions alike.
        for time, close in closes.items():
            self.fast.Update(time, close)
            self.slow.Update(time, close)

    def update(self, time, value):
        """Feed one new price into both EMAs and refresh the trend fields.

        Both indicators are always updated.  Chaining the two Update calls
        with ``and`` would skip the slow EMA whenever the fast one reports
        False, leaving the two averages built from different samples.
        """
        fast_ready = self.fast.Update(time, value)
        slow_ready = self.slow.Update(time, value)
        if fast_ready and slow_ready:
            fast = self.fast.Current.Value
            slow = self.slow.Current.Value
            self.is_uptrend = fast > slow * self.tolerance
            if self.is_uptrend:
                # Spread between the EMAs relative to their midpoint.
                self.scale = (fast - slow) / ((fast + slow) / 2.0)

    def is_ready(self):
        """Return the slow EMA's IsReady flag."""
        return self.slow.IsReady