# Overall Statistics
# region imports
from AlgorithmImports import *
from datetime import timedelta
from typing import List, Dict
import numpy as np
# endregion


class ReleaseRecord:
    """One earnings release: its date, the reported EPS and two EPS estimates."""

    def __init__(self, release_date, eps, ws_eps, ez_eps):
        # Date of the release; EpsHistory orders its records by this value.
        self.release_date = release_date
        # Reported EPS, then the Wall Street and the Estimize consensus estimates.
        self.eps, self.ws_eps, self.ez_eps = eps, ws_eps, ez_eps


class EpsHistory:
    """Earnings releases for one security, kept sorted by release date."""

    def __init__(self):
        # Release records in ascending ``release_date`` order.
        self.records = []

    @staticmethod
    def _is_present(value):
        """Return True when *value* is neither None nor NaN/NaT."""
        # NaN and NaT compare unequal to themselves, so this needs no imports.
        return value is not None and value == value

    def add(self, record):
        """Insert *record*, keeping ``records`` sorted by release date.

        Records whose release date is missing (None or NaN/NaT) are ignored
        because they cannot be ordered.
        """
        if not self._is_present(record.release_date):
            return
        records = self.records
        records.append(record)
        # Releases normally arrive in chronological order; only pay for a
        # sort when the new record actually landed out of place. The sort is
        # stable, so records sharing a date keep their insertion order.
        if len(records) > 1 and record.release_date < records[-2].release_date:
            records.sort(key=lambda r: r.release_date)

    def get_recent(self):
        """Return the record with the latest release date, or None if empty."""
        return self.records[-1] if self.records else None

    def _surprises(self, lookback, estimate_attr):
        """Return ``eps - estimate`` for the last *lookback* records.

        Records where either number is missing (None or NaN) are skipped so
        that a single bad value cannot poison downstream statistics.
        """
        # ``records[-0:]`` would return the whole list, so handle it up front.
        if lookback <= 0:
            return []
        surprises = []
        for record in self.records[-lookback:]:
            estimate = getattr(record, estimate_attr)
            if self._is_present(record.eps) and self._is_present(estimate):
                surprises.append(record.eps - estimate)
        return surprises

    def get_surprises_ws(self, lookback):
        """Return EPS surprises versus ``ws_eps`` over the last *lookback* records."""
        return self._surprises(lookback, "ws_eps")

    def get_surprises_ez(self, lookback):
        """Return EPS surprises versus ``ez_eps`` over the last *lookback* records."""
        return self._surprises(lookback, "ez_eps")


class PositionInfo:
    """Bookkeeping for one open position: entry date, side and age."""

    def __init__(self, entry_date, direction):
        self.entry_date, self.direction = entry_date, direction
        # Starts at zero; EstimizeAugmentedPEAD._rebalance increments it on each run.
        self.days_held = 0


class SymbolData:
    """Per-security state kept by the algorithm."""

    def __init__(self):
        # PositionInfo of the open position in this security, if any.
        self.position = None
        # Latest Estimize release received and not yet consumed by a rebalance.
        self.pending_release = None
        # Handle of this security's Estimize release subscription.
        self.estimize_sym = None
        # Past releases used to compute earnings surprises.
        self.eps_history = EpsHistory()


class EstimizeAugmentedPEAD(QCAlgorithm):

    def initialize(self):
        """Set up the backtest, the universe, the strategy parameters and the daily schedule."""
        # Backtest window and starting capital.
        self.set_start_date(2025, 1, 1)
        self.set_cash(100000)

        # Engine settings.
        settings = self.settings
        settings.seed_initial_prices = True
        settings.rebalance_portfolio_on_security_changes = False
        settings.free_portfolio_value_percentage = 0.05

        # Subscriptions: SPY plus the universe chosen by _select_fine.
        self.add_equity("SPY", Resolution.DAILY)
        self._universe = self.add_universe(self._select_fine)

        # Per-symbol state and the positions currently being tracked.
        self._symbol_data: Dict[Symbol, SymbolData] = {}
        self._active_positions: Dict[Symbol, PositionInfo] = {}

        # Strategy parameters.
        self._lookback_quarters = 8        # releases used for the surprise sigma
        self._max_hold_days = 60           # rebalance runs before a position is closed
        self._gross_leverage_cap = 0.30    # exposure budget per direction
        self._gap_sigma_threshold = 4.0    # gap limit in multiples of volatility
        self._volatility_lookback = 20     # daily bars used for volatility
        self._min_surprises = 2            # minimum surprises needed to score

        # Run _rebalance every day at 09:31.
        every_day = self.date_rules.every_day()
        at_0931 = self.time_rules.at(9, 31)
        self.schedule.on(every_day, at_0931, self._rebalance)

    def _select_fine(self, fundamentals: List[Fundamental]) -> List[Symbol]:
        """Return the symbols of the 200 most liquid eligible securities.

        A security is eligible when its price is above 5.0 and its dollar
        volume is above 5,000,000. Eligible securities are ranked by dollar
        volume, highest first, and the first 200 are kept.
        """
        ranked = sorted(
            (
                f for f in fundamentals
                if f.price > 5.0 and f.dollar_volume > 5_000_000
            ),
            key=lambda f: f.dollar_volume,
            reverse=True,
        )
        return [f.symbol for f in ranked[:200]]

    def on_securities_changed(self, changes):
        """Attach or detach Estimize release data as securities enter or leave.

        For every added security a ``SymbolData`` is created, an
        ``EstimizeRelease`` subscription is added, and the EPS history is
        warmed up from five years of past releases. For every removed
        security the Estimize subscription is dropped and its state is
        discarded; an open position stays tracked in ``_active_positions``
        so that ``_rebalance`` can still close it.
        """
        for security in changes.added_securities:
            symbol = security.symbol
            sd = SymbolData()
            # add_data returns a Security; the Slice lookups in on_data and
            # remove_security below need the dataset's Symbol.
            sd.estimize_sym = self.add_data(EstimizeRelease, symbol).symbol
            self._symbol_data[symbol] = sd

            # Warm up historical Estimize releases so the stock can trade
            # immediately. The typed request on the dataset symbol yields
            # EstimizeRelease objects, so the same properties on_data reads
            # are used here instead of guessing DataFrame column names.
            releases = self.history[EstimizeRelease](
                sd.estimize_sym, timedelta(days=5 * 365), Resolution.DAILY
            )
            for release in releases:
                if release.eps is None:
                    continue
                sd.eps_history.add(ReleaseRecord(
                    release.release_date,
                    release.eps,
                    release.wall_street_eps_estimate,
                    release.consensus_eps_estimate,
                ))

        for security in changes.removed_securities:
            symbol = security.symbol
            sd = self._symbol_data.pop(symbol, None)
            if sd is None:
                continue
            # Keep tracking an open position so it is still aged and closed.
            if sd.position is not None:
                self._active_positions[symbol] = sd.position
            if sd.estimize_sym is not None:
                self.remove_security(sd.estimize_sym)

    def on_data(self, data: Slice):
        """Store each Estimize release found in *data* and mark it as pending.

        A release with a non-None EPS is appended to the security's EPS
        history and saved as ``pending_release`` for ``_rebalance`` to act on.
        """
        for sd in self._symbol_data.values():
            key = sd.estimize_sym
            if key is None or not data.contains_key(key):
                continue

            release = data[key]
            if release is None or release.eps is None:
                continue

            record = ReleaseRecord(
                release.release_date,
                release.eps,
                release.wall_street_eps_estimate,
                release.consensus_eps_estimate
            )
            sd.eps_history.add(record)
            sd.pending_release = release

    def _rebalance(self):
        """Age open positions, close expired ones, then trade new releases.

        Steps, in order:

        1. Every tracked position gets one more day; positions that reach
           ``_max_hold_days`` are liquidated and forgotten.
        2. Every pending release dated before today is scored. A release is
           consumed the first time it is evaluated, whatever the outcome, so
           stale earnings news is never traded days or weeks later.
        3. Candidates are ranked by composite score; the top decile is
           bought and the bottom decile is sold short.
        """
        today = self.time.date()

        # Step 1: age positions and close the ones held long enough. The loop
        # runs over a copy, so deleting from the dict inside it is safe.
        for symbol, pos in list(self._active_positions.items()):
            pos.days_held += 1
            if pos.days_held < self._max_hold_days:
                continue
            self.liquidate(symbol)
            del self._active_positions[symbol]
            if symbol in self._symbol_data:
                self._symbol_data[symbol].position = None

        # Step 2: score the releases that became tradable.
        candidates = []
        for symbol, sd in list(self._symbol_data.items()):
            release = sd.pending_release
            if release is None:
                continue

            r_date = release.release_date
            if isinstance(r_date, datetime):
                r_date = r_date.date()
            if r_date is None:
                # A release without a date can never become tradable.
                sd.pending_release = None
                continue
            if r_date >= today:
                # Released today (or later): wait for a later rebalance.
                continue

            # Consume the release now. Leaving it pending after a rejection
            # would re-evaluate it every day and could open a position on
            # old news once a blocking condition happens to clear.
            sd.pending_release = None

            if sd.position is not None or symbol in self._active_positions:
                continue

            composite = self._score_latest_release(sd)
            if composite is None:
                continue

            if not self._pass_gap_check(symbol):
                continue

            candidates.append((symbol, composite))

        if not candidates:
            return

        # Step 3: rank and pick the extreme deciles (at least one name each).
        candidates.sort(key=lambda c: c[1])
        n_decile = max(1, len(candidates) // 10)
        longs = candidates[-n_decile:]
        shorts = candidates[:n_decile]

        if len(candidates) == 1:
            # A lone candidate is both the top and the bottom decile; trading
            # both lists would buy it and immediately flip it short. Let the
            # sign of its score pick the side instead.
            score = candidates[0][1]
            longs = candidates if score > 0 else []
            shorts = candidates if score < 0 else []

        self._enter_positions(longs, 1)
        self._enter_positions(shorts, -1)

    def _score_latest_release(self, sd):
        """Return the composite surprise score for *sd*'s latest release.

        The score averages the standardized surprise versus ``ws_eps``, the
        standardized surprise versus ``ez_eps``, and the relative gap between
        the two estimates signed like the ``ws_eps`` surprise. Returns None
        when the score cannot be computed: no history, a missing EPS or
        estimate, too few past surprises, zero surprise dispersion, or a
        non-finite result.
        """
        recent = sd.eps_history.get_recent()
        if recent is None:
            return None
        # Both surprises need both numbers; bail out before subtracting None.
        if recent.eps is None or recent.ws_eps is None or recent.ez_eps is None:
            return None

        ws_surprises = sd.eps_history.get_surprises_ws(self._lookback_quarters)
        ez_surprises = sd.eps_history.get_surprises_ez(self._lookback_quarters)
        if (len(ws_surprises) < self._min_surprises
                or len(ez_surprises) < self._min_surprises):
            return None

        sigma_ws = np.std(ws_surprises)
        sigma_ez = np.std(ez_surprises)
        if sigma_ws == 0 or sigma_ez == 0:
            return None

        sue_ws = (recent.eps - recent.ws_eps) / sigma_ws
        sue_ez = (recent.eps - recent.ez_eps) / sigma_ez

        # Relative disagreement between the estimates; skipped when ws_eps is
        # too close to zero for the ratio to be meaningful.
        if abs(recent.ws_eps) > 1e-6:
            delta = (recent.ez_eps - recent.ws_eps) / abs(recent.ws_eps)
        else:
            delta = 0.0

        composite = (sue_ws + sue_ez + np.sign(sue_ws) * abs(delta)) / 3.0
        # NaN/inf scores cannot be ranked reliably, so drop them.
        if not np.isfinite(composite):
            return None
        return composite

    def _pass_gap_check(self, symbol):
        """Return False when the opening gap is too large relative to volatility.

        The gap is ``|open - previous daily close| / previous daily close``,
        where the previous close is the last bar of a daily history request.
        It is compared with ``_gap_sigma_threshold`` times the standard
        deviation of daily close-to-close returns over
        ``_volatility_lookback`` days. The check fails open: whenever the
        data needed is missing or degenerate it returns True.

        NOTE(review): assumes the security has intraday data so that
        ``security.open`` at the 09:31 rebalance is today's first bar, and
        that the daily history ends with yesterday's bar - confirm.
        """
        security = self.securities.get(symbol)
        if security is None or security.open == 0:
            return True

        history = self.history(symbol, self._volatility_lookback + 1, Resolution.DAILY)
        if history is None or len(history) < 3:
            return True

        if hasattr(history, "close"):
            closes = list(history["close"].dropna())
        else:
            closes = [bar.close for bar in history if hasattr(bar, "close")]
        if len(closes) < 3:
            return True

        # Measure the gap against the prior daily close. Comparing the open
        # and close of the same latest bar only measures that bar's range.
        prev_close = closes[-1]
        if prev_close == 0:
            return True
        gap = abs(security.open - prev_close) / prev_close

        # Signed returns over every consecutive pair of closes: the sigma of
        # absolute returns understates volatility, and the most recent return
        # belongs in the sample just like in _get_volatility.
        returns = [
            (cur - prev) / prev
            for prev, cur in zip(closes, closes[1:])
            if prev != 0
        ]
        if len(returns) < 2:
            return True

        vol = np.std(returns)
        if vol == 0:
            return True

        return gap <= self._gap_sigma_threshold * vol

    def _current_gross_exposure(self, direction):
        """Return the portfolio fraction held by tracked positions on one side.

        Sums ``|holdings_value| / total_portfolio_value`` over the entries of
        ``_active_positions`` whose direction equals *direction*.
        """
        portfolio = self.portfolio
        same_side = (
            portfolio[symbol]
            for symbol, pos in self._active_positions.items()
            if pos.direction == direction
        )
        # Start from 0.0 so the result is a float even with no positions.
        return sum(
            (
                abs(holding.holdings_value) / portfolio.total_portfolio_value
                for holding in same_side
                if holding is not None
            ),
            0.0,
        )

    def _enter_positions(self, candidates, direction):
        """Open inverse-volatility weighted positions for *candidates*.

        *candidates* is a list of ``(symbol, score)`` pairs and *direction*
        is the sign applied to every weight. Weights are proportional to
        ``1 / volatility``, sum to ``_gross_leverage_cap`` in absolute terms,
        and are scaled down to the budget still unused on this side. Each
        symbol is registered as an active position before the targets are
        sent to ``set_holdings``.
        """
        if not candidates:
            return

        cap = self._gross_leverage_cap
        available = cap - self._current_gross_exposure(direction)
        if available <= 0:
            return

        # Keep only symbols with a positive volatility estimate.
        sized = []
        for symbol, _score in candidates:
            vol = self._get_volatility(symbol)
            if vol > 0:
                sized.append((symbol, 1.0 / vol))
        if not sized:
            return

        total_inv = sum(inv for _, inv in sized)
        if total_inv == 0:
            return

        weights = [(inv / total_inv) * cap * direction for _, inv in sized]

        # Shrink the batch when it would exceed the remaining budget.
        total_new = sum(abs(w) for w in weights)
        if total_new > available:
            scale = available / total_new
            weights = [w * scale for w in weights]

        targets = []
        for (symbol, _inv), weight in zip(sized, weights):
            targets.append(PortfolioTarget(symbol, weight))
            pos = PositionInfo(self.time.date(), direction)
            self._active_positions[symbol] = pos
            sd = self._symbol_data.get(symbol)
            if sd is not None:
                sd.position = pos

        self.set_holdings(targets, liquidate_existing_holdings=False)

    def _get_volatility(self, symbol):
        """Return the std of daily log returns scaled by sqrt(252).

        Uses ``_volatility_lookback + 1`` daily bars. Falls back to 0.5 when
        fewer than five bars come back, fewer than two closes are usable, or
        no return can be computed.
        """
        fallback = 0.5

        bars = self.history(symbol, self._volatility_lookback + 1, Resolution.DAILY)
        if bars is None or len(bars) < 5:
            return fallback

        if hasattr(bars, "close"):
            closes = list(bars["close"].dropna())
        else:
            closes = [bar.close for bar in bars if hasattr(bar, "close")]
        if len(closes) < 2:
            return fallback

        # Log return of each close against its predecessor, skipping pairs
        # whose earlier close is not positive.
        log_returns = [
            np.log(cur / prev)
            for prev, cur in zip(closes, closes[1:])
            if prev > 0
        ]
        if not log_returns:
            return fallback

        return np.std(log_returns) * np.sqrt(252)