# region imports
from AlgorithmImports import *
from datetime import timedelta
from typing import List, Dict
import numpy as np
# endregion
class ReleaseRecord:
    """Snapshot of one earnings release and the estimates it is compared against.

    Attributes:
        release_date: When the figures were released.
        eps: The reported earnings per share.
        ws_eps: The first consensus estimate (callers in this file pass the
            release's ``wall_street_eps_estimate``).
        ez_eps: The second consensus estimate (callers in this file pass the
            release's ``consensus_eps_estimate``).
    """

    def __init__(self, release_date, eps, ws_eps, ez_eps) -> None:
        # Values are stored exactly as given; no validation or conversion.
        self.release_date, self.eps = release_date, eps
        self.ws_eps, self.ez_eps = ws_eps, ez_eps
class EpsHistory:
    """Chronologically ordered collection of earnings-release records.

    Records are any objects exposing ``release_date``, ``eps``, ``ws_eps``
    and ``ez_eps`` attributes. ``records`` is kept sorted by ``release_date``
    (oldest first) after every insertion.
    """

    def __init__(self):
        self.records = []

    @staticmethod
    def _is_missing(value):
        """Return True for ``None`` and for NaN-like values (NaN, NaT).

        NaN-like values are the only ones that compare unequal to themselves,
        which lets us detect them without importing math/numpy/pandas.
        """
        return value is None or value != value

    def add(self, record):
        """Insert ``record`` and keep ``records`` sorted by release date.

        Records without a usable release date (``None`` or NaN/NaT) are
        ignored because they cannot be placed in chronological order.
        """
        if self._is_missing(record.release_date):
            return
        self.records.append(record)
        # list.sort is stable, so records sharing a date keep insertion order.
        self.records.sort(key=lambda x: x.release_date)

    def get_recent(self):
        """Return the record with the latest release date, or None if empty."""
        if not self.records:
            return None
        return self.records[-1]

    def _surprises(self, estimate_attr, lookback):
        """Return ``eps - estimate`` for the last ``lookback`` records.

        Args:
            estimate_attr: Name of the estimate attribute to compare with
                (``"ws_eps"`` or ``"ez_eps"``).
            lookback: Maximum number of most-recent records to inspect.

        Returns:
            Surprises in chronological order. Records whose reported EPS or
            estimate is missing (None/NaN) are skipped, so the result may be
            shorter than ``lookback``.
        """
        # Guard explicitly: records[-0:] would be the *whole* list, not an
        # empty window, and a negative lookback would drop the newest records.
        if lookback <= 0:
            return []
        surprises = []
        for record in self.records[-lookback:]:
            estimate = getattr(record, estimate_attr)
            if self._is_missing(record.eps) or self._is_missing(estimate):
                continue
            surprises.append(record.eps - estimate)
        return surprises

    def get_surprises_ws(self, lookback):
        """Return reported-minus-``ws_eps`` surprises over the last ``lookback`` records."""
        return self._surprises("ws_eps", lookback)

    def get_surprises_ez(self, lookback):
        """Return reported-minus-``ez_eps`` surprises over the last ``lookback`` records."""
        return self._surprises("ez_eps", lookback)
class PositionInfo:
    """Bookkeeping for one open position.

    Attributes:
        entry_date: The date the position was opened.
        direction: Side of the position as supplied by the caller.
        days_held: Counter that starts at zero; it is advanced by the code
            that owns the position, not by this class.
    """

    def __init__(self, entry_date, direction) -> None:
        self.entry_date, self.direction = entry_date, direction
        self.days_held: int = 0
class SymbolData:
    """Per-security state container.

    Every instance starts with an empty ``EpsHistory`` and with the three
    optional slots (``estimize_sym``, ``pending_release``, ``position``)
    unset; the owning code fills them in later.
    """

    def __init__(self) -> None:
        # Optional slots all begin unset.
        self.estimize_sym = self.pending_release = self.position = None
        # Fresh, unshared history for this security.
        self.eps_history = EpsHistory()
class EstimizeAugmentedPEAD(QCAlgorithm):
    """Earnings-surprise strategy driven by Estimize release data.

    For each security in a liquid universe the algorithm keeps a history of
    earnings releases (reported EPS plus two consensus estimates). Once a
    release date has passed, a composite standardized-surprise score is
    computed; the highest-scoring names are bought and the lowest-scoring
    names are sold short, sized by inverse volatility and capped at
    ``_gross_leverage_cap`` per direction. Positions are liquidated after
    ``_max_hold_days`` rebalance runs.
    """

    def initialize(self):
        """Configure the backtest, the universe, strategy parameters and the daily rebalance."""
        self.set_start_date(2025, 1, 1)
        self.set_cash(100000)
        self.settings.seed_initial_prices = True
        self.settings.rebalance_portfolio_on_security_changes = False
        self.settings.free_portfolio_value_percentage = 0.05
        spy = self.add_equity("SPY", Resolution.DAILY).symbol
        self._universe = self.add_universe(self._select_fine)
        self._symbol_data: Dict[Symbol, SymbolData] = {}
        self._active_positions: Dict[Symbol, PositionInfo] = {}
        self._lookback_quarters = 8
        self._max_hold_days = 60
        self._gross_leverage_cap = 0.30
        self._gap_sigma_threshold = 4.0
        self._volatility_lookback = 20
        self._min_surprises = 2
        # Tie the date rule to SPY so the rebalance only runs on trading days.
        # A bare every_day() also fires on weekends/holidays, which would
        # inflate days_held and submit orders while the market is closed.
        self.schedule.on(
            self.date_rules.every_day(spy),
            self.time_rules.at(9, 31),
            self._rebalance,
        )

    def _select_fine(self, fundamentals: List[Fundamental]) -> List[Symbol]:
        """Return the 200 highest-dollar-volume symbols priced above 5 with dollar volume above 5M."""
        liquid = [
            f for f in fundamentals
            if f.price > 5.0 and f.dollar_volume > 5_000_000
        ]
        liquid.sort(key=lambda f: f.dollar_volume, reverse=True)
        return [f.symbol for f in liquid[:200]]

    @staticmethod
    def _row_value(row, *names):
        """Return the first usable attribute of ``row`` among ``names``.

        Missing attributes, ``None`` and NaN/NaT cells (which compare unequal
        to themselves) all count as unusable; ``None`` is returned when no
        candidate name yields a value.
        """
        for name in names:
            value = getattr(row, name, None)
            if value is not None and value == value:
                return value
        return None

    def _warm_up_history(self, sd):
        """Seed ``sd.eps_history`` with up to five years of past Estimize releases."""
        hist = self.history(
            EstimizeRelease, sd.estimize_sym, timedelta(days=5 * 365), Resolution.DAILY
        )
        if hist is None or len(hist) == 0:
            return
        for row in hist.itertuples():
            eps = self._row_value(row, "eps")
            if eps is None:
                continue
            # NOTE(review): history DataFrame column naming differs between
            # LEAN versions (snake_case vs. plain lowercase), so both
            # spellings are tried - confirm against the deployed version.
            sd.eps_history.add(ReleaseRecord(
                self._row_value(row, "release_date", "releasedate"),
                eps,
                self._row_value(row, "wall_street_eps_estimate", "wallstreetepsestimate"),
                self._row_value(row, "consensus_eps_estimate", "consensusepsestimate"),
            ))

    def on_securities_changed(self, changes):
        """Subscribe added equities to Estimize data and clean up removed ones.

        Added equities get a ``SymbolData`` entry, an ``EstimizeRelease``
        subscription and a warmed-up EPS history. For removed securities the
        Estimize subscription is dropped; an open position is kept in
        ``_active_positions`` so it is still aged and eventually liquidated.
        """
        for security in changes.added_securities:
            symbol = security.symbol
            # The Estimize subscriptions created below are securities too and
            # may be reported through this handler; only equities are tracked.
            if symbol.security_type != SecurityType.EQUITY:
                continue
            sd = SymbolData()
            # add_data returns the Security; keep its Symbol, which is what
            # slice lookups, history requests and remove_security expect.
            sd.estimize_sym = self.add_data(EstimizeRelease, symbol).symbol
            self._symbol_data[symbol] = sd
            # Warm-up historical Estimize releases so the stock can trade immediately
            self._warm_up_history(sd)
        for security in changes.removed_securities:
            symbol = security.symbol
            sd = self._symbol_data.pop(symbol, None)
            if sd is None:
                continue
            if sd.position is not None:
                self._active_positions[symbol] = sd.position
            if sd.estimize_sym is not None:
                self.remove_security(sd.estimize_sym)

    def on_data(self, data: Slice):
        """Record newly arrived Estimize releases and mark them as pending evaluation."""
        for symbol, sd in self._symbol_data.items():
            if sd.estimize_sym is None:
                continue
            if not data.contains_key(sd.estimize_sym):
                continue
            release = data[sd.estimize_sym]
            if release is None or release.eps is None:
                continue
            sd.eps_history.add(ReleaseRecord(
                release.release_date,
                release.eps,
                release.wall_street_eps_estimate,
                release.consensus_eps_estimate,
            ))
            sd.pending_release = release

    def _expire_positions(self):
        """Age every tracked position by one run and liquidate those at the hold limit."""
        for symbol, pos in list(self._active_positions.items()):
            pos.days_held += 1
            if pos.days_held < self._max_hold_days:
                continue
            self.liquidate(symbol)
            del self._active_positions[symbol]
            sd = self._symbol_data.get(symbol)
            if sd is not None:
                sd.position = None

    def _composite_score(self, eps_history):
        """Return the composite surprise score for the latest release, or None.

        The score averages the two standardized surprises (latest surprise
        divided by the standard deviation of recent surprises) and a term
        carrying the sign of the first one with the magnitude of the relative
        spread between the two estimates. None is returned when the latest
        release lacks a value, when there are too few past surprises, or when
        a standard deviation is zero or not finite.
        """
        recent = eps_history.get_recent()
        if recent is None:
            return None
        # Every figure is needed below; a missing estimate would otherwise
        # raise a TypeError on the subtraction.
        if recent.eps is None or recent.ws_eps is None or recent.ez_eps is None:
            return None
        ws_surprises = eps_history.get_surprises_ws(self._lookback_quarters)
        ez_surprises = eps_history.get_surprises_ez(self._lookback_quarters)
        if len(ws_surprises) < self._min_surprises or len(ez_surprises) < self._min_surprises:
            return None
        sigma_ws = np.std(ws_surprises)
        sigma_ez = np.std(ez_surprises)
        # "not > 0" rejects both a zero and a NaN standard deviation.
        if not (sigma_ws > 0 and sigma_ez > 0):
            return None
        sue_ws = (recent.eps - recent.ws_eps) / sigma_ws
        sue_ez = (recent.eps - recent.ez_eps) / sigma_ez
        if abs(recent.ws_eps) > 1e-6:
            delta = (recent.ez_eps - recent.ws_eps) / abs(recent.ws_eps)
        else:
            delta = 0.0
        composite = (sue_ws + sue_ez + np.sign(sue_ws) * abs(delta)) / 3.0
        if not np.isfinite(composite):
            return None
        return float(composite)

    def _rebalance(self):
        """Daily routine: expire old positions, then trade releases whose date has passed."""
        today = self.time.date()
        self._expire_positions()

        candidates = []
        for symbol, sd in list(self._symbol_data.items()):
            release = sd.pending_release
            if release is None:
                continue
            r_date = release.release_date
            if isinstance(r_date, datetime):
                r_date = r_date.date()
            if r_date is None:
                # An undated release can never become due; drop it.
                sd.pending_release = None
                continue
            if r_date >= today:
                continue
            # Each release is evaluated exactly once. Leaving a rejected
            # release pending would re-run the filters every day and could
            # open a trade long after the announcement.
            sd.pending_release = None
            if sd.position is not None or symbol in self._active_positions:
                continue
            composite = self._composite_score(sd.eps_history)
            if composite is None:
                continue
            if not self._pass_gap_check(symbol):
                continue
            candidates.append((symbol, composite))

        if not candidates:
            return
        candidates.sort(key=lambda x: x[1])
        n = len(candidates)
        if n == 1:
            # With a single candidate the top and bottom slices would be the
            # same symbol (bought, then immediately flipped short); choose
            # the side from the sign of its score instead.
            score = candidates[0][1]
            if score > 0:
                self._enter_positions(candidates, 1)
            elif score < 0:
                self._enter_positions(candidates, -1)
            return
        # For n >= 2, n_decile <= n // 2, so the two slices never overlap.
        n_decile = max(1, n // 10)
        self._enter_positions(candidates[-n_decile:], 1)
        self._enter_positions(candidates[:n_decile], -1)

    def _recent_closes(self, symbol, min_rows):
        """Return recent daily closes for ``symbol``, oldest first.

        Requests ``_volatility_lookback + 1`` daily bars and returns an empty
        list when the request yields fewer than ``min_rows`` rows.
        """
        history = self.history(symbol, self._volatility_lookback + 1, Resolution.DAILY)
        if history is None or len(history) < min_rows:
            return []
        if hasattr(history, "close"):
            return list(history["close"].dropna())
        return [bar.close for bar in history if hasattr(bar, "close")]

    def _pass_gap_check(self, symbol):
        """Return False when the latest open-to-close move is extreme relative to recent moves.

        The move ``|open - close| / close`` of the security's current bar is
        compared with ``_gap_sigma_threshold`` times the standard deviation
        of recent absolute daily returns. Whenever the data needed for the
        comparison is unavailable the check passes (returns True).
        """
        security = self.securities.get(symbol)
        if security is None or security.close == 0 or security.open == 0:
            return True
        # NOTE(review): open and close both come from the security's latest
        # bar, so this measures that bar's own move rather than an overnight
        # gap versus the prior close - confirm that is the intent.
        gap = abs(security.open - security.close) / security.close
        closes = self._recent_closes(symbol, 3)
        if len(closes) < 3:
            return True
        # The baseline deliberately mirrors the existing window: the most
        # recent close is left out of the return series.
        # NOTE(review): the std is taken over *absolute* returns, which is
        # smaller than the std of signed returns - confirm intent.
        baseline = closes[:-1]
        returns = [
            abs((cur - prev) / prev)
            for prev, cur in zip(baseline, baseline[1:])
            if prev != 0
        ]
        if len(returns) < 2:
            return True
        vol = np.std(returns)
        if vol == 0:
            return True
        return gap <= self._gap_sigma_threshold * vol

    def _current_gross_exposure(self, direction):
        """Return the summed absolute weight of tracked positions on one side.

        Args:
            direction: Side to measure (the value stored in ``PositionInfo.direction``).

        Returns:
            Sum of ``|holdings_value| / total_portfolio_value`` over tracked
            positions with that direction; 0.0 when the portfolio value is
            not positive (weights are undefined in that case).
        """
        portfolio_value = self.portfolio.total_portfolio_value
        if portfolio_value <= 0:
            return 0.0
        total = 0.0
        for symbol, pos in self._active_positions.items():
            if pos.direction != direction:
                continue
            holding = self.portfolio[symbol]
            if holding is None:
                continue
            total += abs(holding.holdings_value) / portfolio_value
        return total

    def _enter_positions(self, candidates, direction):
        """Open inverse-volatility-weighted positions for ``candidates`` on one side.

        Args:
            candidates: Iterable of ``(symbol, score)`` pairs; only the symbols are used.
            direction: ``1`` for long, ``-1`` for short.

        Weights sum to ``_gross_leverage_cap`` in absolute terms and are
        scaled down to the room left under that cap for this direction. Each
        traded symbol is registered in ``_active_positions`` and, if still
        tracked, on its ``SymbolData``.
        """
        if not candidates:
            return
        if self.portfolio.total_portfolio_value <= 0:
            # No equity to allocate; target weights would be meaningless.
            return
        available = self._gross_leverage_cap - self._current_gross_exposure(direction)
        if available <= 0:
            return

        sized = []
        for symbol, _score in candidates:
            vol = self._get_volatility(symbol)
            if vol > 0:  # also filters out a NaN volatility
                sized.append((symbol, 1.0 / vol))
        if not sized:
            return
        total_inv = sum(inv for _symbol, inv in sized)
        if total_inv == 0:
            return

        weights = [
            (inv / total_inv) * self._gross_leverage_cap * direction
            for _symbol, inv in sized
        ]
        total_new = sum(abs(w) for w in weights)
        if total_new > available:
            scale = available / total_new
            weights = [w * scale for w in weights]

        targets = []
        entry_date = self.time.date()
        for (symbol, _inv), weight in zip(sized, weights):
            targets.append(PortfolioTarget(symbol, weight))
            pos = PositionInfo(entry_date, direction)
            self._active_positions[symbol] = pos
            if symbol in self._symbol_data:
                self._symbol_data[symbol].position = pos
        self.set_holdings(targets, liquidate_existing_holdings=False)

    def _get_volatility(self, symbol):
        """Return annualized volatility of daily log returns for ``symbol``.

        Falls back to 0.5 when there is not enough history to compute it.
        """
        closes = self._recent_closes(symbol, 5)
        if len(closes) < 2:
            return 0.5
        # Non-positive prices are skipped so the logarithm stays defined.
        returns = [
            np.log(cur / prev)
            for prev, cur in zip(closes, closes[1:])
            if prev > 0 and cur > 0
        ]
        if not returns:
            return 0.5
        return np.std(returns) * np.sqrt(252)