| Overall Statistics |
|
Total Orders 12 Average Win 0.49% Average Loss -0.62% Compounding Annual Return -3.065% Drawdown 0.800% Expectancy -0.104 Start Equity 250000 End Equity 249339.87 Net Profit -0.264% Sharpe Ratio -3.24 Sortino Ratio -17.866 Probabilistic Sharpe Ratio 26.097% Loss Rate 50% Win Rate 50% Profit-Loss Ratio 0.79 Alpha -0.075 Beta -0.018 Annual Standard Deviation 0.024 Annual Variance 0.001 Information Ratio -1.728 Tracking Error 0.097 Treynor Ratio 4.364 Total Fees $234.63 Estimated Strategy Capacity $120000.00 Lowest Capacity Asset SIDU XUBT0M6O6L45 Portfolio Turnover 9.06% Drawdown Recovery 0 |
"""
STRAT-1 Trading Algorithm for QuantConnect
"""
from AlgorithmImports import *
from datetime import timedelta, time
import numpy as np
class Strat1Algorithm(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2024, 1, 1)
self.SetEndDate(2024, 1, 31)
self.SetCash(250000)
# ========== WATCHLIST SCREENING CRITERIA (Applied at midnight) ==========
# MUST MATCH POLYGON EXACTLY
self.VOL_DAYS = 1
self.MIN_VOL1 = 100_000_000 # Minimum dollar volume ($100M) - EXACT
self.APTR_DAYS = 14
self.MIN_APTR14 = 7 # Minimum ATR% (7%) - EXACT
self.BMU_DAYS = 15
self.MIN_BMU_PCT = 0.5 # Minimum bottom-up move (50%) - EXACT MATCH TO POLYGON
self.SMA_PERIODS = [10]
self.MAX_50_SMA_EXTENSION = 4 # Max ATRs above 50 SMA - EXACT MATCH TO POLYGON
self.TOP_N = 50 # Top N stocks by volume
# ========== EXECUTION CRITERIA (Applied during market hours) ==========
self.PHOD_REQUIRED = 1 # 1 = require price > PHOD, 0 = ignore
self.ORH_REQUIRED = 1 # 1 = require price > ORH, 0 = ignore
self.PMH_REQUIRED = 1 # 1 = require price > PMH, 0 = ignore
self.EOD_SELL_OPTION = 1 # 1 = force sell at EOD, 0 = hold overnight
self.LOD_STOP_ENABLED = 0 # 1 = exit if price breaks LOD, 0 = ignore
self.MAX_DRAWDOWN_PCT = 10 # Max daily drawdown % before stopping
self.MAX_VOLUME_PCT = 0.25 # Max position as % of first 5-min volume
# ========== POSITION MANAGEMENT ==========
self.INITIAL_EQUITY = 250000
self.BUY_PERCENT = 0.35 # Position size (35% of portfolio)
self.STOP_PCT = 0.05 # Stop loss (5%)
self.MAX_POSITIONS = 10 # Maximum concurrent positions
# ========== TECHNICAL INDICATORS ==========
self.FAST_EMA_PERIOD = 6 # Fast EMA for MACD
self.SLOW_EMA_PERIOD = 20 # Slow EMA for MACD
self.MACD_SIGNAL_PERIOD = 9 # MACD signal line
self.SELL_EMA_PERIOD = 6 # EMA for exit signal
self.SELL_EMA_AGG = 5 # Time aggregation in minutes for exit EMA
# STATE
self.symbols = {}
self.watchlist = []
self.trades = []
self.triggered = set()
self.last_screen = None
# SPY for market hours
self.AddEquity("SPY", Resolution.Daily)
# SCHEDULES (Trading days only)
# Note: Universe selection runs at midnight automatically
self.Schedule.On(
self.DateRules.EveryDay("SPY"),
self.TimeRules.At(9, 35),
self.CaptureORH
)
self.Schedule.On(
self.DateRules.EveryDay("SPY"),
self.TimeRules.At(15, 59),
self.EOD
)
self.Schedule.On(
self.DateRules.EveryDay("SPY"),
self.TimeRules.At(0, 1),
self.Reset
)
# Universe
self.AddUniverse(self.Coarse)
self.SetWarmUp(timedelta(days=60))
def Coarse(self, coarse):
# Only run once per day at midnight
if self.IsWarmingUp:
return Universe.Unchanged
# Check if we already screened today
if self.last_screen == self.Time.date():
return Universe.Unchanged
# Only run at midnight (when universe selection naturally runs)
if self.Time.hour != 0:
return Universe.Unchanged
# Only run on trading days - check if market is open
if not self.Securities["SPY"].Exchange.DateIsOpen(self.Time.date()):
return Universe.Unchanged
# Filter for liquid stocks
filtered = []
for s in coarse:
if s.HasFundamentalData and s.Price > 5 and s.DollarVolume > 10_000_000:
filtered.append({'sym': s.Symbol, 'vol': s.DollarVolume})
# Sort by volume and take top candidates
filtered.sort(key=lambda x: x['vol'], reverse=True)
syms = [x['sym'] for x in filtered[:1000]] # Get more candidates for screening
# Apply screening immediately
self.ApplyScreen(syms)
# Return a subset for subscription
return [x['sym'] for x in filtered[:200]]
def ApplyScreen(self, syms):
# Print screening start
self.Debug(f"\nScreening {len(syms)} stocks for {self.Time.date()}...")
results = []
# Track filter statistics like Polygon
filter_stats = {
'insufficient_data': 0,
'volume': 0,
'aptr': 0,
'bmu': 0,
'bmu_high': 0,
'sma': 0,
'sma_ext': 0,
'passed': 0
}
# BATCH HISTORY REQUEST - Much faster!
history_data = self.History(syms, 75, Resolution.Daily)
for sym in syms:
try:
# Extract symbol's data from batch result
if sym in history_data.index.levels[0]:
h = history_data.loc[sym]
else:
filter_stats['insufficient_data'] += 1
continue
if h.empty or len(h) < self.BMU_DAYS + 1:
filter_stats['insufficient_data'] += 1
continue
# Check criteria with stats tracking
r = self.CheckCriteria(sym, h, filter_stats)
if r:
results.append(r)
filter_stats['passed'] += 1
except Exception as e:
self.Debug(f"Error screening {sym}: {e}")
pass
# Sort and select
results.sort(key=lambda x: x['vol'], reverse=True)
self.watchlist = results[:self.TOP_N]
# Update universe
self.UpdateSymbols()
self.last_screen = self.Time.date()
# Print filter breakdown like Polygon
self.Debug(f"Filter breakdown - Volume: {filter_stats['volume']}, APTR: {filter_stats['aptr']}, BMU: {filter_stats['bmu']}, BMU_High: {filter_stats['bmu_high']}, SMA: {filter_stats['sma']}, SMA_Ext: {filter_stats['sma_ext']}, Passed: {filter_stats['passed']}")
if self.watchlist:
# Print date and watchlist
self.Debug(f"")
self.Debug(f"{'='*60}")
self.Debug(f"WATCHLIST FOR {self.Time.date()}")
self.Debug(f"{'='*60}")
self.Debug(f"Total Qualified: {len(self.watchlist)} stocks")
self.Debug(f"Top 10: {', '.join([w['ticker'] for w in self.watchlist[:10]])}")
# Show first 5 with details
for i, w in enumerate(self.watchlist[:5], 1):
self.Debug(f" {i}. {w['ticker']}: Vol=${w['vol']/1e6:.0f}M, APTR={w['aptr']:.1f}%")
else:
self.Debug(f"NO STOCKS QUALIFIED FOR {self.Time.date()}")
def CheckCriteria(self, sym, h, stats=None):
# Volume
if len(h) < 2:
return None
# Handle QuantConnect's DataFrame structure - flatten if needed
if hasattr(h, 'levels'): # Multi-index DataFrame
h = h.reset_index(level=0, drop=True)
# Normalize column names to lowercase
h.columns = [c.lower() for c in h.columns]
# Ensure required columns exist
if 'close' not in h.columns or 'volume' not in h.columns:
self.Debug(f"Missing columns for {sym}: {list(h.columns)}")
return None
# Volume filter - calculate average dollar volume over previous VOL_DAYS
# Skip the current day (index 0/last) and use the previous VOL_DAYS
if len(h) <= self.VOL_DAYS:
return None # Not enough data for volume calculation
vol_dollars_sum = 0
valid_vol_days = 0
# Start from -2 (previous day) and go back VOL_DAYS
for i in range(2, min(self.VOL_DAYS + 2, len(h) + 1)):
idx = -i # -2, -3, etc.
day_data = h.iloc[idx]
vol_dollars_sum += day_data['close'] * day_data['volume']
valid_vol_days += 1
if valid_vol_days == 0:
if stats: stats['volume'] += 1
return None
avg_vol_dollars = vol_dollars_sum / valid_vol_days
if avg_vol_dollars < self.MIN_VOL1:
if stats: stats['volume'] += 1
return None
curr = h.iloc[-1]
price = curr['close']
# APTR
aptr = self.CalcAPTR(h, 14)
if aptr < self.MIN_APTR14:
if stats: stats['aptr'] += 1
return None
# BMU - EXACTLY like Polygon
# Look back BMU_DAYS (15) from yesterday, not including today
if len(h) >= self.BMU_DAYS + 1:
# Get the last 16 days (including today), then exclude today
closes_with_today = h['close'].iloc[-(self.BMU_DAYS + 1):].values
highs_with_today = h['high'].iloc[-(self.BMU_DAYS + 1):].values
# Exclude today (last element) to match Polygon logic
past_closes = closes_with_today[:-1] # Last 15 days, not including today
past_highs = highs_with_today[:-1] # Last 15 days, not including today
# Find minimum close in the past 15 days
past_close_low = np.min(past_closes)
# BMU = (current_price - past_low) / past_low
bmu = (price - past_close_low) / past_close_low if past_close_low > 0 else 0
if bmu < self.MIN_BMU_PCT:
if stats: stats['bmu'] += 1
return None
# Price must not be at 15-day high
past_high = np.max(past_highs)
if price > past_high:
if stats: stats['bmu_high'] += 1
return None
# SMA10
if len(h) >= 10:
sma10 = h['close'].iloc[-10:].mean()
if price < sma10:
if stats: stats['sma'] += 1
return None
# SMA50 ext
if len(h) >= 50:
sma50 = h['close'].iloc[-50:].mean()
atr_d = (aptr / 100) * price
ext = (price - sma50) / atr_d if atr_d > 0 else 0
if ext > self.MAX_50_SMA_EXTENSION:
if stats: stats['sma_ext'] += 1
return None
# Get previous day's high for PHOD
prev = h.iloc[-2]
return {
'sym': sym,
'ticker': sym.Value,
'vol': int(avg_vol_dollars),
'aptr': aptr,
'phod': prev['high']
}
def CalcAPTR(self, h, p):
if len(h) < 2:
return 0
# Handle QuantConnect's DataFrame structure
if hasattr(h, 'levels'): # Multi-index DataFrame
h = h.reset_index(level=0, drop=True)
# Normalize column names to lowercase
h.columns = [c.lower() for c in h.columns]
# Ensure required columns exist
if 'close' not in h.columns or 'high' not in h.columns or 'low' not in h.columns:
return 0
trs = []
days = min(p, len(h) - 1)
for i in range(days):
idx = -(i + 1)
t = h.iloc[idx]
y = h.iloc[idx - 1]
tr = max(
t['high'] - t['low'],
abs(t['high'] - y['close']),
abs(t['low'] - y['close'])
)
trs.append(tr)
if not trs:
return 0
atr = sum(trs) / len(trs)
price = h.iloc[-1]['close']
return (atr / price) * 100 if price > 0 else 0
def UpdateSymbols(self):
tickers = [w['ticker'] for w in self.watchlist[:20]]
# Remove old
for s in list(self.symbols.keys()):
if self.symbols[s]['t'] not in tickers:
if self.Portfolio[s].Invested:
self.Liquidate(s)
del self.symbols[s]
# Add new
for w in self.watchlist[:20]:
t = w['ticker']
if t not in [s['t'] for s in self.symbols.values()]:
try:
sym = self.AddEquity(t, Resolution.Minute).Symbol
self.symbols[sym] = {
't': t,
'ema6': self.EMA(sym, self.FAST_EMA_PERIOD, Resolution.Minute),
'ema20': self.EMA(sym, self.SLOW_EMA_PERIOD, Resolution.Minute),
'ema_exit': self.EMA(sym, self.SELL_EMA_PERIOD, Resolution.Minute),
'pmh': 0,
'orh': 0,
'phod': w.get('phod', 0),
'macd': False
}
except:
pass
def OnData(self, data):
if self.IsWarmingUp:
return
t = self.Time.time()
if t < time(9, 30) or t >= time(15, 59):
return
if t >= time(15, 0):
# Exit only
for s, d in self.symbols.items():
if self.Portfolio[s].Invested and data.ContainsKey(s):
self.CheckExit(s, data[s].Close)
return
# Trade
for s, d in self.symbols.items():
if not data.ContainsKey(s):
continue
p = data[s].Close
# Update MACD
if d['ema6'].IsReady and d['ema20'].IsReady:
d['macd'] = d['ema6'].Current.Value > d['ema20'].Current.Value
# Entry
if not self.Portfolio[s].Invested and s not in self.triggered:
if len(self.trades) < self.MAX_POSITIONS:
if self.CheckEntry(s, p):
self.Buy(s, p)
# Exit
elif self.Portfolio[s].Invested:
self.CheckExit(s, p)
def CheckEntry(self, s, p):
d = self.symbols[s]
# Must have bullish MACD
if not d['macd']:
return False
# Check PHOD requirement
if self.PHOD_REQUIRED and d['phod'] > 0 and p <= d['phod']:
return False
# Check ORH requirement
if self.ORH_REQUIRED and d['orh'] > 0 and p <= d['orh']:
return False
# Check PMH requirement
if self.PMH_REQUIRED and d['pmh'] > 0 and p <= d['pmh']:
return False
return True
def Buy(self, s, p):
shares = int((self.Portfolio.TotalPortfolioValue * self.BUY_PERCENT) / p)
if shares <= 0:
return
self.MarketOrder(s, shares)
self.StopMarketOrder(s, -shares, p * (1 - self.STOP_PCT))
self.trades.append(s)
self.triggered.add(s)
self.Log(f"BUY {self.symbols[s]['t']} @ ${p:.2f}")
def CheckExit(self, s, p):
d = self.symbols[s]
if d['ema_exit'].IsReady and p < d['ema_exit'].Current.Value:
self.Liquidate(s)
self.Log(f"EXIT {d['t']} - EMA")
elif not d['macd']:
self.Liquidate(s)
self.Log(f"EXIT {d['t']} - MACD")
def CaptureORH(self):
if self.IsWarmingUp:
return
self.Debug(f"\n=== Capturing ORH/PMH levels at {self.Time} ===")
if not self.symbols:
return
# BATCH HISTORY REQUESTS - Much faster!
symbols_list = list(self.symbols.keys())
try:
# Get pre-market data for all symbols at once
pm_data = self.History(symbols_list, 330, Resolution.Minute)
# Get opening range data for all symbols at once
orh_data = self.History(symbols_list, 5, Resolution.Minute)
for s, d in self.symbols.items():
try:
# Process PMH
if not pm_data.empty and s in pm_data.index.levels[0]:
pm = pm_data.loc[s]
# Normalize column names
pm.columns = [c.lower() for c in pm.columns]
if 'high' in pm.columns:
d['pmh'] = pm['high'].max()
# Process ORH
if not orh_data.empty and s in orh_data.index.levels[0]:
orh = orh_data.loc[s]
# Normalize column names
orh.columns = [c.lower() for c in orh.columns]
if 'high' in orh.columns:
d['orh'] = orh['high'].max()
except:
pass
except:
# Fallback to individual requests if batch fails
for s, d in self.symbols.items():
try:
pm = self.History(s, 330, Resolution.Minute)
if not pm.empty:
if hasattr(pm, 'levels'):
pm = pm.reset_index(level=0, drop=True)
pm.columns = [c.lower() for c in pm.columns]
if 'high' in pm.columns:
d['pmh'] = pm['high'].max()
orh = self.History(s, 5, Resolution.Minute)
if not orh.empty:
if hasattr(orh, 'levels'):
orh = orh.reset_index(level=0, drop=True)
orh.columns = [c.lower() for c in orh.columns]
if 'high' in orh.columns:
d['orh'] = orh['high'].max()
except:
pass
def EOD(self):
# Force exit all positions if enabled
if self.EOD_SELL_OPTION:
for s in self.Portfolio.Keys:
if self.Portfolio[s].Invested:
self.Liquidate(s)
if s in self.symbols:
self.Log(f"EOD EXIT {self.symbols[s]['t']}")
def Reset(self):
self.trades = []
self.triggered = set()