| Overall Statistics | |
| --- | --- |
| Total Orders | 2 |
| Average Win | 4.94% |
| Average Loss | 0% |
| Compounding Annual Return | 8021.440% |
| Drawdown | 1.600% |
| Expectancy | 0 |
| Start Equity | 100000 |
| End Equity | 104936.72 |
| Net Profit | 4.937% |
| Sharpe Ratio | 132.787 |
| Sortino Ratio | 0 |
| Probabilistic Sharpe Ratio | 0% |
| Loss Rate | 0% |
| Win Rate | 100% |
| Profit-Loss Ratio | 0 |
| Alpha | 61.431 |
| Beta | -1.589 |
| Annual Standard Deviation | 0.452 |
| Annual Variance | 0.205 |
| Information Ratio | 120.982 |
| Tracking Error | 0.49 |
| Treynor Ratio | -37.81 |
| Total Fees | $53.66 |
| Estimated Strategy Capacity | $300000.00 |
| Lowest Capacity Asset | PRSC SRALEY945R39 |
| Portfolio Turnover | 47.06% |
# region imports
from AlgorithmImports import *
# endregion
# Your New Python File
class Consolidator:
period = 7
def __init__(self, symbol, algo):
self.symbol = symbol
self.algo = algo
self.tgt_pct = self.algo.naive_tgt_pct
self.stp_pct = self.algo.naive_stp_pct
self.rsi = self.algo.RSI(self.symbol, self.period, MovingAverageType.Wilders)
# __init__ runs when the security is added, so we can't warm the RSI up here.
# if self.algo.extended_hours and self.algo.res == Resolution.MINUTE:
# # we can't really warm it up here unless we're using minute resolution and extended hours (only minute uses ETH)
def warmup_rsi(self):
if self.algo.extended_hours and self.algo.res == Resolution.MINUTE:
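# NOTE (added): fetch a few extra bars (period + 3) so the RSI has some margin beyond its minimum warm-up length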
history = self.algo.History([self.symbol], self.period + 3, self.algo.res, extendedMarketHours=self.algo.extended_hours).loc[self.symbol]
for row in history.itertuples():
self.rsi.Update(row.Index, row.close)
self.algo.Debug(f'warming up... with {row.Index}, {row.close}')
def profit_target(self):
target_threshold = self.tgt_pct
symbol = self.symbol
holding = self.algo.Portfolio[symbol]
if target_threshold == 0.0: return
urpnl_pct = holding.UnrealizedProfitPercent
# Check if the unrealized profit is above the target threshold
if urpnl_pct > target_threshold:
self.algo.Debug(f"Flattening {symbol} -- {urpnl_pct} > {target_threshold}")
self.algo.Liquidate(symbol, tag=f"TP -- {urpnl_pct*100}% gain")
def stop_loss(self):
stp_threshold = self.stp_pct
symbol = self.symbol
holding = self.algo.Portfolio[symbol]
if stp_threshold == 0.0: return
stp_threshold = -1 * abs(stp_threshold)
urpnl_pct = holding.UnrealizedProfitPercent
# Check if the unrealized loss is below the stop threshold
if urpnl_pct < stp_threshold:
self.algo.Debug(f"Flattening {symbol} -- {urpnl_pct} < {stp_threshold}")
self.algo.Liquidate(symbol, tag=f"SL -- {urpnl_pct*100}%")
def on_bar(self, bar):
self.rsi.Update(bar.EndTime, bar.Close)
@property
def warmed_up(self):
return self.rsi.IsReady
@property
def RSI(self):
if self.warmed_up:
return self.rsi.current.value
else:
return 0.0 # Bit of a hack
# region imports
from AlgorithmImports import *
# endregion
class Graveyard:
# PROBABLY want to do this with a scheduler, at 4am and 9:30am
# DON'T know if this will work, because we may not have today's history data TODAY!
def try_get_gaps(self, symbols: List[Symbol], assert_date: bool = True):
gap_data_by_symbol = {}
today = self.Time.date()
# Get daily data for prior close (RTH only)
daily = self.History(symbols, timedelta(days=3), Resolution.Daily)
history_minute = self.History(symbols, timedelta(days=3), Resolution.Minute)
history_minute_eth = self.History(symbols, timedelta(days=3), Resolution.Minute, extendedMarketHours=True)
# Get the latest date in the minute data (today's date)
latest_minute_date = history_minute.index.get_level_values('time').date.max()
# Check if latest minute data date matches today's date
if latest_minute_date != today and assert_date:
self.Debug(f"Warning: Data is not from today! Latest data date: {latest_minute_date}, Expected date: {today}")
return None
# Filter today's minute data for RTH and ETH
today_minute_data_rth = history_minute[history_minute.index.get_level_values('time').date == latest_minute_date]
today_minute_data_eth = history_minute_eth[history_minute_eth.index.get_level_values('time').date == latest_minute_date]
prior_close_rth = daily.groupby('symbol')['close'].last()
# Get the last close price from ETH for prior day
prior_close_eth = history_minute_eth[history_minute_eth.index.get_level_values('time').date == latest_minute_date - timedelta(days=1)]
prior_close_eth = prior_close_eth.groupby('symbol')['close'].last()
# Get today's ETH open (first price of the day during ETH)
today_open_eth = today_minute_data_eth.groupby('symbol')['open'].first()
# Get today's RTH open (first price during regular trading hours)
today_open_rth = today_minute_data_rth.groupby('symbol')['open'].first()
# Loop through symbols and store all values in a result object
for symbol in symbols:
try:
# Calculate the gaps
gap_rth = ((today_open_rth[symbol] - prior_close_rth[symbol]) / prior_close_rth[symbol]) * 100
gap_eth = ((today_open_eth[symbol] - prior_close_eth[symbol]) / prior_close_eth[symbol]) * 100
# Store the calculated data for each symbol
gap_data_by_symbol[symbol] = {
"gap_rth": gap_rth,
"gap_eth": gap_eth,
"today_open_rth": today_open_rth[symbol],
"today_open_eth": today_open_eth[symbol],
"prior_close_rth": prior_close_rth[symbol],
"prior_close_eth": prior_close_eth[symbol]
}
print(gap_data_by_symbol)
except KeyError:
self.Debug(f"Symbol {symbol} data not available for gap calculation.")
return gap_data_by_symbol
def RunGapCalculation(self):
symbols = list(self.symbol_data.keys())
gap_data = self.try_get_gaps(symbols)
if gap_data:
self.Debug(f"Gap Data: {gap_data}")
self.gap_data = gap_data
else:
self.Debug("No gap data available.")
self.gap_data = {}
# region imports
from AlgorithmImports import *
from Consolidator import Consolidator
# endregion
"""
Capitalization notes
mega cap -- 200b+
(big 5)
large cap -- 10b - 200b
(mcdonalds, nike)
mid cap -- 2b - 10b
(etsy, roku)
small cap -- 300m - 2b
(five below, cargurus)
micro cap -- 50m - 300m
"""
market_cap_categories = {
"Mega Cap": {"min": 2.00e11, "max": float('inf')},
"Large Cap": {"min": 1.00e10, "max": 2.00e11},
"Mid Cap": {"min": 2.00e9, "max": 1.00e10},
"Small Cap": {"min": 3.00e8, "max": 2.00e9},
"Micro Cap": {"min": 5.00e7, "max": 3.00e8},
"Nano Cap": {"min": 0, "max": 5.00e7}
}
"""
TODO: replace this big, unwieldy dictionary with our consolidator class + fields!
move some of the calculations to that class, as well, even as statics.
Add targets! I think stops are probably a bad idea, maybe impossible, but we can certainly try.
consider adding filters for signal, say high of day for longs, etc.
"""
class EnergeticVioletShark(QCAlgorithm):
# Basic Universe Params
max_univ_price = 20
min_univ_price = 5
max_market_cap = market_cap_categories['Micro Cap']['max']
min_market_cap = market_cap_categories['Micro Cap']['min']
# Finer Universe Filters
volume_lookback_days = 7
# volume multiple is today vs avg of X days
min_gap_pct = 15
# Long filters
min_premkt_vol_mult = 1.0
min_regmkt_vol_mult = 1.5
min_rsi = 50
# We keep the top n1 by premkt volume multiple (today's premkt volume / avg premkt volume per hour, averaged)
n1 = 40
# we keep the top n2 by regmkt volume multiple (today's regmkt volume / avg regmkt volume per hour, averaged)
n2 = 10
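# Entries come from the intersection of these two top-n lists (see AfterRTHOpen below).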
# Short filter (not enabled yet)
max_regmkt_vol_mult = 1.5
# If target or stop is 0.0, not used.
# .05 = 5%
naive_tgt_pct = 0.05
# naive_stp_pct can be positive or negative; its absolute value is taken and applied as a negative threshold
naive_stp_pct = 0.00
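# e.g. naive_tgt_pct = 0.05 liquidates once unrealized gain exceeds 5%;
# naive_stp_pct = 0.03 (or -0.03) would liquidate once unrealized P&L drops below -3%.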
res = Resolution.MINUTE
# testing only
debug_lvl = 1
flip_short = False
def initialize(self):
self.set_start_date(2024, 10, 1)
self.set_end_date(2024, 10, 5)
self.set_cash(100_000)
self.bm = self.add_equity("SPY", self.res).symbol
self.AddUniverse(self.SelectCoarse, self.SelectFine)
self.extended_hours = True
# self.UniverseSettings.Resolution = self.res
if self.extended_hours:
self.universe_settings.extended_market_hours = True
# self.Schedule.On(self.DateRules.EveryDay(self.bm), self.TimeRules.At(9, 31), self.RunGapCalculation)
# self.Schedule.On(self.DateRules.EveryDay(self.bm), self.TimeRules.At(5, 00), self.AfterETHOpen) # DOES NOT WORK
self.Schedule.On(self.DateRules.EveryDay(self.bm), self.TimeRules.At(9, 31), self.AfterRTHOpen)
self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.BeforeMarketClose("SPY", 1), self.EODX)
self.symbol_list = []
self.symbol_data = {}
self.gap_data = {}
self.longs = {}
self.shorts = {}
# region Event Handlers
def EODX(self):
self.liquidate(tag="EOD Exit")
"""
def AfterETHOpen(self):
symbols = list(self.symbol_data.keys())
self.gap_data = self.get_prior_closes(symbols)
data = self.current_slice
if data.contains_key(self.bm):
bars = data.bars
for symbol in symbols:
if data.contains_key(symbol):
eth_open = bars[symbol].open
self.gap_data[symbol]['eth_open'] = eth_open
self.Debug(f'updated {symbol} at time {self.Time} with eth open: {eth_open}')
else:
self.Debug(f'failed to update {symbol} at time {self.time} -- dropping.')
# self.gap_data.pop(symbol)
# SO little will work in ETH, so don't do it.
else:
self.debug("no data... wtf (ETH)")
"""
def AfterRTHOpen(self):
symbols = list(self.symbol_data.keys())
try:
self.gap_data = self.get_prior_closes(symbols)
except Exception:
self.Debug('failed to get any data... strange.')
return
data = self.current_slice
if data.contains_key(self.bm):
bars = data.bars
for symbol in symbols:
if symbol not in self.gap_data: continue
if data.contains_key(symbol):
_open = bars[symbol].open
_prior_rth_close = self.gap_data[symbol].get('prior_close_rth', None)
self.gap_data[symbol]['rth_open'] = _open
if _prior_rth_close:
_gap_rth_pct = ((_open - _prior_rth_close) / _prior_rth_close) * 100
if _gap_rth_pct > self.min_gap_pct:
self.gap_data[symbol]['rth_gap_pct'] = _gap_rth_pct
if self.debug_lvl > 2: self.Debug(f'{str(symbol)} --> viable gap pct: {_gap_rth_pct}')
else:
if self.debug_lvl > 2: self.Debug(f'{str(symbol)} dropped -- nonviable gap {_gap_rth_pct}')
self.gap_data.pop(symbol)
continue
if self.debug_lvl > 2: self.Debug(f'updated {symbol} at time {self.Time} with rth open: {_open}')
# RSI Checks
symbol_data = self.symbol_data.get(symbol, None)
if symbol_data:
# NOTE -- we could try this, warmup w extended NOW
# DOES not work.... strange.
# symbol_data.warmup_rsi()
rsi_value = symbol_data.RSI
if rsi_value > self.min_rsi:
self.gap_data[symbol]['rsi'] = rsi_value
else:
self.gap_data.pop(symbol, None)
else:
self.gap_data.pop(symbol, None)
else:
# self.Debug(f'failed to update {symbol} at time {self.time} -- dropping')
if symbol in self.gap_data:
self.gap_data.pop(symbol)
if symbol in self.symbol_data:
self.unsubscribe(symbol)
else:
self.debug("no data... wtf")
return
# ----------- NOTE -- we're now done with symbol data
# that's taken us as far as we can get with the traditional flow.
# the rest is history-call hacks to grab ETH data from earlier today, prior to this moment.
self.Debug(f'viable gappers: {len(self.gap_data)}')
to_fetch = list(self.gap_data.keys())
if len(to_fetch) == 0:
return
try:
volume_multiples_df = self.fetch_volume_multiples(to_fetch).dropna()
except Exception:
return
if volume_multiples_df.empty:
return
self.Debug(f'volume multiples df shape -- {volume_multiples_df.shape}')
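# NOTE (added): volume_multiples_df is indexed by hour of day (0-23) with one column per symbol;
# .loc label slicing is inclusive, so hour 8 lands in both the pre-market and regular-session slices.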
pre_mkt = volume_multiples_df.loc[4:8]
reg_mkt = volume_multiples_df.loc[8:]
# ---------------------------- ENTRY filter
pma = pre_mkt.mean()
rma = reg_mkt.mean()
maybe_longs = []
maybe_shorts = []
# Maybe easier to filter HERE, we should have this data now...
for symbol in to_fetch:
_pma = pma[symbol]
_rma = rma[symbol]
ok_for_long = _pma > self.min_premkt_vol_mult and _rma > self.min_regmkt_vol_mult
ok_for_short = _pma > self.min_premkt_vol_mult and _rma < self.max_regmkt_vol_mult
# NOTE -- we want to find things that are NOT in this _rma > min reg for SHORTS !
if ok_for_long or ok_for_short:
data = self.gap_data[symbol]
data['volume_df'] = volume_multiples_df[symbol]
data['pre_mkt_mult'] = _pma
data['reg_mkt_mult'] = _rma
if ok_for_long:
maybe_longs.append(symbol)
if ok_for_short:
maybe_shorts.append(symbol)
else:
if symbol in self.gap_data:
self.gap_data.pop(symbol)
if symbol in self.symbol_data:
self.unsubscribe(symbol)
# LONG filtering!
# for longs we want to find the LARGEST mult reg mkt, WITH big pre mkt volume.
# WE want something that is on pace for a 5x multiple, really.
top_by_pma = self.get_top_by_key(self.gap_data, key='pre_mkt_mult', asc=False, n=self.n1)
top_by_rma = self.get_top_by_key(self.gap_data, key='reg_mkt_mult', asc=False, n=self.n2)
# Extract the symbols from both lists
pma_symbols = {item[0] for item in top_by_pma}
rma_symbols = {item[0] for item in top_by_rma}
common_symbols = pma_symbols & rma_symbols
self.longs = {symbol: self.gap_data[symbol] for symbol in common_symbols if symbol in maybe_longs}
# SHORTS: we want to sort on the LARGEST gap and find WEAK volume for it; it's a bit of a different pipeline, really.
# self.shorts = {symbol: self.gap_data[symbol] for symbol in common_short_symbols if symbol in maybe_shorts}
# NOW we need to do high-of-day tracking, probably.
n_longs = len(self.longs)
if n_longs == 0: return
mult = -1 if self.flip_short else 1
full_wt = .95
wt = mult * full_wt / n_longs
self.Debug(f'# entries: {n_longs}')
for symbol, data in self.longs.items():
self.Debug(f'buying: {data}')
self.set_holdings(symbol, wt)
@staticmethod
def get_top_by_key(gap_data, key='reg_mkt_mult', asc=True, n=5):
return sorted(gap_data.items(), key=lambda x: x[1][key], reverse=not asc)[:n]
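# e.g. get_top_by_key(self.gap_data, key='pre_mkt_mult', asc=False, n=self.n1)
# returns the n1 (symbol, data) pairs with the largest pre-market volume multiple.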
# endregion
# region Universe
def SelectCoarse(self, coarse):
filtered = [ x for x in coarse if x.Price < self.max_univ_price and x.Price > self.min_univ_price]
filtered = [x for x in filtered if x.HasFundamentalData and x.MarketCap < self.max_market_cap and x.MarketCap > self.min_market_cap]
chosen = [x.Symbol for x in filtered]
if self.debug_lvl > 0:
_symbol_str_list = [str(i) for i in chosen]
self.Debug(f'Universe ( {len(chosen)} symbols )')
self.symbol_list = chosen
return self.symbol_list
def SelectFine(self, fine):
return [f.Symbol for f in fine]
def on_securities_changed(self, changes):
for security in changes.AddedSecurities:
symbol = security.Symbol
self.symbol_data[symbol] = Consolidator(symbol, self)
for security in changes.RemovedSecurities:
# Handle the removed securities if necessary
if security.Symbol in self.symbol_data:
self.unsubscribe(security.Symbol)
def unsubscribe(self, symbol):
self.RemoveSecurity(symbol)
self.symbol_data.pop(symbol)
# endregion
def on_data(self, data: Slice):
if not data.contains_key(self.bm): return
n_symbols = len(self.symbol_data)
for symbol, instance in self.symbol_data.items():
if not data.contains_key(symbol): continue
trade_bar = data[symbol]
instance.on_bar(trade_bar)
maybe_rsi = instance.rsi.current.value if instance.warmed_up else 0
if self.debug_lvl >= 2: self.Debug(f"{self.time} -- {symbol}: warmed up? {instance.warmed_up} -- rsi {maybe_rsi}")
if self.portfolio[symbol].invested:
instance.profit_target()
instance.stop_loss()
# region Helpers
def get_prior_closes(self, symbols: List[Symbol]):
prior_closes_by_symbol = {}
daily_rth = self.History(symbols, timedelta(days=3), Resolution.Daily, extendedMarketHours=False)
daily_eth = self.History(symbols, timedelta(days=3), Resolution.Daily, extendedMarketHours=True)
# Use the latest available date in the data
latest_rth = daily_rth.groupby('symbol')['close'].last()
latest_eth = daily_eth.groupby('symbol')['close'].last()
# Loop through symbols and store the prior closes in the result object
for symbol in symbols:
try:
# Store prior RTH and ETH close for each symbol
prior_closes_by_symbol[symbol] = {
"prior_close_rth": latest_rth[symbol],
"prior_close_eth": latest_eth[symbol]
}
except KeyError:
self.Debug(f"Symbol {symbol} data not available for prior close calculation.")
return prior_closes_by_symbol
def fetch_volume_multiples(self, symbols: List[Symbol]):
# Fetch minute-level historical data for the past 7 days with extended market hours
history_minute = self.History(symbols, timedelta(days=self.volume_lookback_days), Resolution.Minute, extendedMarketHours=True)
history_minute_flat = history_minute.reset_index()
if history_minute.empty:
self.Debug("History minute data is empty")
return pd.DataFrame()
history_minute_flat = history_minute_flat.set_index('time')
# Resample to hourly intervals for all symbols at once, summing the volume
history_hourly_resampled = history_minute_flat.groupby('symbol').resample('H').agg({'volume': 'sum'}).reset_index()
last_date = history_hourly_resampled['time'].dt.date.max()
# Split data into 'today' (last available date) and 'before_today' for all symbols
today_data = history_hourly_resampled[history_hourly_resampled['time'].dt.date == last_date].copy()
before_today_data = history_hourly_resampled[history_hourly_resampled['time'].dt.date < last_date].copy()
# Create 'hour' column for grouping purposes
today_data['hour'] = today_data['time'].dt.hour
before_today_data['hour'] = before_today_data['time'].dt.hour
# Calculate the average volume per hour for past days, grouped by symbol and hour
avg_volume_before_today = before_today_data.groupby(['symbol', 'hour'])['volume'].mean().unstack()
# Calculate today's volume by hour for each symbol
avg_volume_today = today_data.groupby(['symbol', 'hour'])['volume'].mean().unstack()
# Reindex both DataFrames to ensure all hours (0-23) are present for each symbol
all_hours = range(24)
avg_volume_before_today = avg_volume_before_today.reindex(columns=all_hours, fill_value=np.nan)
avg_volume_today = avg_volume_today.reindex(columns=all_hours, fill_value=np.nan)
# Calculate the safe volume multiples (today's volume / average past volume)
volume_multiples = avg_volume_today / avg_volume_before_today
# Display the results
vol_mults_safe = volume_multiples.T
return vol_mults_safe
def _check_today_in_hist(self):
symbols = ["SPY"]
today = self.Time.date()
history = self.History(symbols, timedelta(days=3), Resolution.MINUTE)
latest_date = history.index.get_level_values('time').date.max()
if latest_date == today:
self.Debug(f"History includes today's data: {latest_date}")
else:
self.Debug(f"History does not include today's data. Latest data is from: {latest_date}")
# endregion