| Overall Statistics |
|
Total Orders 62 Average Win 7.24% Average Loss -2.52% Compounding Annual Return 1824.749% Drawdown 18.400% Expectancy 0.523 Start Equity 100000 End Equity 127516.09 Net Profit 27.516% Sharpe Ratio 12.108 Sortino Ratio 23.287 Probabilistic Sharpe Ratio 69.300% Loss Rate 61% Win Rate 39% Profit-Loss Ratio 2.88 Alpha 15.015 Beta -3.517 Annual Standard Deviation 1.161 Annual Variance 1.348 Information Ratio 11.656 Tracking Error 1.183 Treynor Ratio -3.997 Total Fees $0.00 Estimated Strategy Capacity $5000.00 Lowest Capacity Asset SNTG XPY9HNNNXVFP Portfolio Turnover 157.50% |
# region imports
from AlgorithmImports import *
# endregion
'''
We need to do the 'gap' check -- let's do this with a scheduled event on the open.
Then we need to do a continuous check of the daily pct change -- let's reference the open price (store it in the SymbolData, on the OnStart event).
1% trailing
3% fixed stop.
Exit EOD
'''
from datetime import timedelta
class TechnicalUniverseAlgorithm(QCAlgorithm):
    """Intraday gap strategy over a coarse-filtered equity universe.

    Each day the coarse filter keeps symbols whose latest close is above or
    below their moving average (tracked per symbol in ``SymbolData``). One
    minute after the open a scheduled event records the opening bar and prunes
    symbols whose gap has the wrong sign. ``OnData`` then applies the stop
    loss, and -- at most once per calendar day -- opens positions in the
    largest gaps. Everything is liquidated one minute before the close.
    """

    gap_pct = 0  # Requires a gap > this, or a gap < -1 * this (daily gap)
    # `top_gaps` caps the sorted candidate lists; `top_final` is how many of
    # those are actually traded per side.
    top_gaps = 50
    top_final = 1
    lvg = 1.0
    # 1 == long only, -1 == short only, 0 == both sides (allocation is halved).
    direction = -1
    # Positions whose unrealized profit percent falls below this are closed.
    # NOTE(review): assumes `unrealized_profit_percent` is a fraction
    # (-0.01 == -1%) -- confirm against the LEAN documentation.
    stop_pct = -0.01
    # Share of the book held back from the gap trades (the SPY hedge that
    # would have used it is currently disabled).
    hedge_pct = .05

    def Initialize(self):
        """Configure dates, cash, universe, security initializer and schedules."""
        self.SetStartDate(2024, 1, 1)
        self.SetEndDate(2024, 1, 30)
        self.SetCash(100000)

        # NOTE(review): the universe is subscribed at daily resolution while
        # OnMarketOpen looks for a bar one minute after the open; this may be
        # why bars are often missing there -- confirm the intended resolution.
        self.UniverseSettings.Resolution = Resolution.Daily
        self.UniverseSettings.Leverage = self.lvg

        # Only one security initializer can be active: a second
        # SetSecurityInitializer call replaces the first. Register a single
        # entry point (CustomSecurityInitializer) that applies the brokerage
        # defaults / zero fees first and the strategy leverage afterwards.
        self._brokerage_initializer = MySecurityInitializer(
            self.brokerage_model,
            FuncSecuritySeeder(self.get_last_known_prices),
        )
        self.SetSecurityInitializer(self.CustomSecurityInitializer)

        self.bm = self.AddEquity('SPY').Symbol
        self.coarse_count = 10
        self.averages = {}  # Symbol -> SymbolData

        # These are lists of SymbolData instances (and their symbols).
        self.Above = []
        self.Below = []
        self.BelowSymbols = []
        self.AboveSymbols = []
        self.Univ = []

        # Dates (datetime.date) on which a position has already been opened.
        self.entries_by_date = {}
        self.warmed_up = False

        self.SetWarmup(timedelta(days=1))
        self.AddUniverse(self.CoarseSelectionFilter)

        self.Schedule.On(self.DateRules.EveryDay("SPY"),
                         self.TimeRules.AfterMarketOpen("SPY", 1),
                         self.OnMarketOpen)
        self.Schedule.On(self.DateRules.EveryDay("SPY"),
                         self.TimeRules.BeforeMarketClose("SPY", 1),
                         self.EOD)

    # region Schedule Handlers
    def CustomSecurityInitializer(self, security):
        """Initialize a new security: brokerage models first, then leverage."""
        self._brokerage_initializer.initialize(security)
        security.SetLeverage(self.lvg)

    def EOD(self):
        """Flatten the whole portfolio (scheduled one minute before the close)."""
        self.Liquidate()

    def OnMarketOpen(self):
        """Capture open-of-day metrics and prune the candidate lists.

        For every candidate with a bar in the current slice, store the bar's
        open and volume, then drop it from ``Above`` if its gap is below
        ``gap_pct`` or from ``Below`` if its gap is above ``-gap_pct``.
        """
        data = self.CurrentSlice
        if data is None:
            return
        bars = data.Bars

        # Iterate over a fresh concatenated list so removing from
        # self.Above / self.Below inside the loop is safe.
        for inst in self.Above + self.Below:
            symbol = inst.symbol
            if not bars.ContainsKey(symbol):
                # No opening bar today: clear any open left over from an
                # earlier session so GapPct reports 0 instead of a stale gap.
                inst.day_open = None
                continue

            bar = bars[symbol]
            inst.day_open = bar.Open
            inst.open_volume = bar.Volume

            if inst in self.Above and inst.GapPct < self.gap_pct:
                self.Above.remove(inst)
                self.AboveSymbols.remove(symbol)
                # TODO: could also remove from Securities, here -- make it 'faster'
            if inst in self.Below and inst.GapPct > -1 * self.gap_pct:
                self.Below.remove(inst)
                self.BelowSymbols.remove(symbol)
    # endregion

    # region Universe
    def CoarseSelectionFilter(self, coarse: List[Fundamental]):
        """Update per-symbol averages and return today's trend candidates.

        The input is first narrowed by shares outstanding, traded volume and
        market cap. On the first call the averages are seeded from 50 daily
        history bars; afterwards each symbol is updated with its adjusted
        price. Returns the symbols whose last value is above or below their
        moving average.
        """
        coarse = [
            x for x in coarse
            if x.CompanyProfile.SharesOutstanding > 1e6
            and x.DollarVolume > 1e6 * x.AdjustedPrice
            and x.CompanyProfile.MarketCap > 3e6
        ]

        if not self.warmed_up:
            history = self.History([c.Symbol for c in coarse], 50, Resolution.Daily)
            for c in coarse:
                symbol = c.Symbol
                try:
                    df = history.loc[symbol]
                except KeyError:
                    # Symbol missing from the batch request: retry on its own.
                    try:
                        df = self.History(symbol, 50, Resolution.Daily).loc[symbol]
                    except Exception:
                        # Best effort: skip symbols with no usable history.
                        continue
                for idx, row in df.iterrows():
                    if symbol not in self.averages:
                        self.averages[symbol] = SymbolData(symbol)
                    self.averages[symbol].update(idx, row['close'])
            self.warmed_up = True
        else:
            for cf in coarse:
                if cf.Symbol not in self.averages:
                    self.averages[cf.Symbol] = SymbolData(cf.Symbol)
                # Updates the SymbolData object with current EOD price
                self.averages[cf.Symbol].update(cf.EndTime, cf.AdjustedPrice)

        # Only symbols that passed today's filters are candidates; entries
        # left in self.averages from earlier days carry a stale last_close
        # and stale trend flags.
        todays_symbols = {c.Symbol for c in coarse}
        current = [sd for sym, sd in self.averages.items() if sym in todays_symbols]

        self.Above = [sd for sd in current if sd.above]
        self.Below = [sd for sd in current if sd.below]
        self.AboveSymbols = [i.symbol for i in self.Above]
        self.BelowSymbols = [i.symbol for i in self.Below]

        # we need to return only the symbol objects
        return [x.symbol for x in self.Above + self.Below]

    def OnSecuritiesChanged(self, changes):
        """Liquidate holdings in securities that left the universe."""
        for security in changes.RemovedSecurities:
            if security.Invested:
                self.Liquidate(security.Symbol)
        self.Log(f'Added: {len(changes.AddedSecurities)}')
    # endregion

    def OnData(self, slice):
        """Run exits on every slice; run entries until one fills today."""
        self.ExitLogic()
        # Key on the calendar date itself (`date()` must be called: the bare
        # attribute is a bound method and never matches a stored key).
        today = self.time.date()
        if today not in self.entries_by_date:
            self.EntryLogic()
            if self.portfolio.invested:
                self.entries_by_date[today] = True

    # region entry + exit
    def EntryLogic(self):
        """Open positions in the largest gaps, sized by leverage and direction.

        Shorts the largest positive gaps when ``direction <= 0`` and buys the
        largest negative gaps when ``direction >= 0``; does nothing when the
        portfolio is already invested or when no candidate has a non-zero gap.
        """
        bars = self.CurrentSlice.Bars
        both = self.Above + self.Below

        # Refresh each candidate's open-to-now return from the latest bar.
        for inst in both:
            if bars.ContainsKey(inst.symbol):
                inst.DayPctReturn(bars[inst.symbol].Close)

        gaps_up = [i for i in both if i.GapPct > 0]
        gaps_down = [i for i in both if i.GapPct < 0]
        self.Log(f'pos gaps: {len(gaps_up)}')
        self.Log(f'neg gaps: {len(gaps_down)}')
        if not gaps_up and not gaps_down:
            return

        # Rank each side separately so a short is only ever placed on a real
        # gap up and a long only on a real gap down.
        largest_pos_gaps = sorted(
            gaps_up, key=lambda x: x.GapPct, reverse=True)[:self.top_gaps]
        largest_neg_gaps = sorted(
            gaps_down, key=lambda x: x.GapPct)[:self.top_gaps]

        # Divide the leveraged amount (less the hedge reserve) between the
        # traded names; long/short mode uses both sides, so halve it.
        allocation = (.99 * self.lvg - self.hedge_pct) / self.top_final
        if self.direction == 0:
            allocation /= 2

        if self.Portfolio.Invested:
            return

        # Short Fades: largest gaps up.
        if self.direction <= 0:
            for inst in largest_pos_gaps[:self.top_final]:
                self.SetHoldings(inst.symbol, -1 * allocation)
                self.log(f'SELLING large gap in {inst.symbol}')

        # Long Fades: largest gaps down.
        if self.direction >= 0:
            for inst in largest_neg_gaps[:self.top_final]:
                self.SetHoldings(inst.symbol, allocation)
                self.log(f'BUYING large gap down in {inst.symbol}')

        # Disabled hedge, kept for reference:
        # hedge_alloc = self.hedge_pct * self.direction * -1
        # if long_short:
        #     self.SetHoldings(self.bm, hedge_alloc)

    def ExitLogic(self):
        """Liquidate any holding whose unrealized profit percent is below ``stop_pct``."""
        if self.stop_pct == 0:
            return
        # Snapshot the invested symbols before sending any liquidation orders.
        invested = [symbol for symbol, holding in self.Portfolio.items() if holding.Invested]
        for symbol in invested:
            urpnl = self.portfolio[symbol].unrealized_profit_percent
            if urpnl < self.stop_pct:
                self.liquidate(symbol, tag=f"SL -- {urpnl}")
    # endregion
class MySecurityInitializer(BrokerageModelSecurityInitializer):
    """Brokerage-model security initializer that charges no fees."""

    def __init__(self, brokerage_model: IBrokerageModel, security_seeder: ISecuritySeeder) -> None:
        """Forward the brokerage model and seeder to the base initializer."""
        super().__init__(brokerage_model, security_seeder)

    def initialize(self, security: Security) -> None:
        """Apply the base initialization, then replace the fee model.

        The superclass call runs first; the fee model is overwritten
        afterwards with a constant fee of zero.
        """
        super().initialize(security)
        zero_fee = ConstantFeeModel(0)
        security.set_fee_model(zero_fee)
class SymbolData(object):
    """Per-symbol state: moving-average trend flags plus open-of-day figures.

    Attributes:
        symbol: the symbol this record belongs to.
        sma: simple moving average fed by ``update``.
        above / below: whether the last value was above / below the average
            (both stay False until the average is ready).
        last_close: last value passed to ``update``.
        day_open / open_volume: set externally from the opening bar.
        day_ret_pct: last result stored by ``DayPctReturn``.
    """

    def __init__(self, symbol, period=50):
        """Create the record.

        Args:
            symbol: symbol to track.
            period: length of the simple moving average (defaults to 50).
        """
        self.symbol = symbol
        self.sma = SimpleMovingAverage(period)
        self.above = False
        self.below = False
        self.last_close = None
        self.day_open = None
        # Initialised here so it can be read before the first open is captured.
        self.open_volume = None
        self.day_ret_pct = None

    def update(self, time, value):
        """Feed a new value to the average and refresh the trend flags."""
        self.sma.Update(time, value)
        self.last_close = value
        if self.sma.IsReady:
            average = self.sma.Current.Value
            self.above = value > average
            self.below = value < average

    @property
    def GapPct(self):
        """Gap from ``last_close`` to ``day_open`` in percent.

        Returns 0 when either price is missing or zero, which also avoids a
        division by zero.
        """
        if self.last_close and self.day_open:
            return ((self.day_open - self.last_close) / self.last_close) * 100
        return 0

    def DayPctReturn(self, current_close):
        """Return (and store) the percent move from ``day_open`` to ``current_close``.

        Returns 0 when no open is known; in that case the stored
        ``day_ret_pct`` is cleared so an earlier value does not linger.
        """
        if not self.day_open:
            self.day_ret_pct = None
            return 0
        self.day_ret_pct = ((current_close - self.day_open) / self.day_open) * 100
        return self.day_ret_pct
#region imports
from AlgorithmImports import *
#endregion
# Your New Python File
# NOTE: the triple-quoted string below is a bare expression and is never
# executed. It appears to be an alternative draft of the open-of-day pruning
# loop (one pass over self.Above, one over self.Below), kept for reference.
# NOTE(review): in the second loop the draft calls
# `self.Below.remove(above_obj)`, i.e. it passes the first loop's variable
# rather than `below_obj` -- looks like a copy/paste slip; fix before reviving.
'''
for above_obj in self.Above[:]:
symbol = above_obj.symbol
if not data.Bars.ContainsKey(symbol): continue
bar = data.Bars[symbol]
above_obj.day_open = bar.Open
if above_obj.GapPct < self.gap_pct:
self.Above.remove(above_obj)
self.AboveSymbols.remove(symbol)
else:
above_obj.open_volume = bar.Volume
for below_obj in self.Below[:]:
symbol = below_obj.symbol
if not data.Bars.ContainsKey(symbol): continue
bar = data.Bars[symbol]
below_obj.day_open = bar.Open
if below_obj.GapPct > -1 * self.gap_pct: # Negative gap desired.
self.Below.remove(above_obj)
self.BelowSymbols.remove(symbol)
else:
below_obj.open_volume = bar.Volume
'''