Overall Statistics
Total Orders
8036
Average Win
0.04%
Average Loss
-0.03%
Compounding Annual Return
5.770%
Drawdown
3.100%
Expectancy
0.037
Start Equity
1000000
End Equity
1052838.39
Net Profit
5.284%
Sharpe Ratio
0.489
Sortino Ratio
0.612
Probabilistic Sharpe Ratio
41.070%
Loss Rate
49%
Win Rate
51%
Profit-Loss Ratio
1.04
Alpha
0.018
Beta
-0.069
Annual Standard Deviation
0.05
Annual Variance
0.002
Information Ratio
0.55
Tracking Error
0.224
Treynor Ratio
-0.355
Total Fees
$0.00
Estimated Strategy Capacity
$290000000.00
Lowest Capacity Asset
RGBK R735QTJ8XC9X
Portfolio Turnover
32.34%
Drawdown Recovery
71
# region imports
from AlgorithmImports import *
import numpy as np
import pandas as pd
import math
import heapq
from typing import Union
# endregion

class DelistingAwareTopDollarVolumeUniverse(QCAlgorithm):
   
    def initialize(self) -> None:
        """Set up the backtest: dates, cash, parameters, state, universe and schedules.

        The backtest runs from 2022-01-01 to 2022-12-01 with 1,000,000 of
        starting cash. ``debug_cost_model`` is registered as the security
        initializer, the coarse + fine universe is added, and two callbacks
        are scheduled against the SPY anchor symbol: the volatility filter
        10 minutes before the market open and the rebalance 5 minutes before
        the market open.
        """
        # --- Backtest window and starting capital ---
        self.set_start_date(2022, 1, 1)
        self.set_end_date(2022, 12, 1)
        self.set_cash(1000000)

        # --- Universe data settings: daily bars with fill-forward enabled ---
        self.universe_settings.resolution = Resolution.DAILY
        self.universe_settings.fill_forward = True

        # --- Selection and sizing parameters ---
        self._ipo_days = 180   # minimum age since IPO, in days (used by the fine filter)
        self.truncate = 0.05   # per-position weight clip (used by the constraint step)
        self.coarse_count: int = 1000
        self.target_count: int = 500
        self.universe_size: int = 200

        # --- Kakushadze framework parameters ---
        self._lookback_period = 60          # rows of returns used for covariance estimation
        self._mean_reversion_window = 20    # moving-average window for the mean-reversion signal
        self._lambda_reg = 0.1              # ridge penalty added to the covariance diagonal
        self._transaction_cost = 0.001      # transaction cost per dollar traded
        self._target_positions = 40         # total number of positions to hold

        # --- Mutable bookkeeping ---
        self._previous_weights: Dict[Symbol, float] = {}   # weights from the last rebalance
        self.coarse_dv: dict = {}                          # same-day coarse dollar volume per symbol
        self.blacklist: set = set()                        # symbols never to select again
        self.selected: list = []                           # last fine-stage selection
        self.last_selection_date = None
        self.industry_set: set = set()
        self._universe_symbols: Set[Symbol] = set()        # symbols currently tracked
        self.final_universe: Set[Symbol] = set()           # symbols that passed the volatility filter
        self.symbol_to_industry: Dict[Symbol, int] = {}    # cached sector code per symbol

        # The initializer is registered before the universe is added, as in
        # the original ordering.
        self.set_security_initializer(self.debug_cost_model)
        self.add_universe(self.coarse_selection_function, self.fine_selection_function)

        # SPY drives the date/time rules of the scheduled callbacks.
        self.anchor = Symbol.create("SPY", SecurityType.EQUITY, Market.USA)

        # Volatility filter first (T-10 minutes), then the rebalance (T-5 minutes).
        pre_open_jobs = (
            (10, self.update_volatility_filter),
            (5, self.enter_position),
        )
        for minutes_before_open, job in pre_open_jobs:
            self.schedule.on(
                self.date_rules.every_day(self.anchor),
                self.time_rules.before_market_open(self.anchor, minutes_before_open),
                job,
            )

        # No warm-up is requested; the call below is left disabled.
        #self.set_warmup(22,Resolution.DAILY)

    def debug_cost_model(self, security: Security) -> None:
        """Security initializer that installs zero fees and zero slippage.

        Args:
            security: The security being initialized; it receives a
                ``ConstantFeeModel(0)`` and a ``ConstantSlippageModel(0)``.
        """
        frictionless_fees = ConstantFeeModel(0)
        security.set_fee_model(frictionless_fees)

        no_slippage = ConstantSlippageModel(0)
        security.set_slippage_model(no_slippage)

    def coarse_selection_function(self, coarse: List[CoarseFundamental]) -> List[Symbol] | Universe.UNCHANGED:
        """Coarse stage: return the top ``coarse_count`` symbols by dollar volume.

        An entry is eligible when it has fundamental data, a price above 5,
        a volume above 10000 and is not blacklisted. The dollar volume of
        every returned symbol is stored in ``self.coarse_dv`` so the fine
        stage can rank by it.

        Args:
            coarse: Coarse fundamental entries for the day (may be ``None``).

        Returns:
            The selected symbols, or an empty list when there is no input.
        """
        # The buffer is cleared on every call, including the early-return
        # paths, so the fine stage never sees stale values.
        self.coarse_dv = {}

        # NOTE: a Monday-only guard (returning Universe.UNCHANGED on every
        # other weekday) was disabled here in the original code.

        if coarse is None:
            return []

        # Materialize the input because it is traversed more than once.
        candidates = [entry for entry in coarse if entry is not None]
        if not candidates:
            return []

        def dollar_volume_of(entry) -> float:
            """Dollar volume as a float; a missing value counts as 0.0."""
            raw = entry.dollar_volume
            return 0.0 if raw is None else float(raw)

        def is_eligible(entry) -> bool:
            """Basic data/liquidity checks plus the blacklist check."""
            if not entry.has_fundamental_data:
                return False
            if entry.price is None or not (entry.price > 5):
                return False
            if entry.volume is None or not (entry.volume > 10000):
                return False
            return entry.symbol not in self.blacklist

        eligible = [entry for entry in candidates if is_eligible(entry)]
        leaders = heapq.nlargest(self.coarse_count, eligible, key=dollar_volume_of)

        # Remember the same-day dollar volume of every symbol handed on.
        self.coarse_dv.update((entry.symbol, dollar_volume_of(entry)) for entry in leaders)

        return [entry.symbol for entry in leaders]

    def fine_selection_function(self, fine: List[FineFundamental]) -> List[Symbol]:
        """Fine stage: drop delisting-dated and recently listed names, then re-rank.

        An entry is kept when its security reference exists, its delisting
        date is ``None`` or has a year <= 1 (the default "unset" date), and
        its IPO date is a plausible date (year >= 1900) at least
        ``_ipo_days`` days before the current algorithm time. Survivors are
        ranked by the dollar volume captured in the coarse stage and the top
        ``target_count`` symbols are returned.

        ``self.selected`` and ``self.last_selection_date`` are updated on
        every call, including when the input is missing or empty.

        Args:
            fine: Fine fundamental entries for the day (may be ``None``).

        Returns:
            The selected symbols, best dollar volume first.
        """
        ipo_cutoff = self.time - timedelta(days=self._ipo_days)

        def remember(selection) -> None:
            """Store the selection and the date it was made."""
            self.selected = selection
            self.last_selection_date = self.time.date()

        def has_seasoned_ipo(candidate) -> bool:
            """True when the IPO date is known, plausible and old enough."""
            reference = candidate.security_reference
            ipo_date = None if reference is None else reference.ipo_date
            if ipo_date is None:
                return False
            try:
                implausible = ipo_date.year < 1900
            except Exception:
                return False
            if implausible:
                return False
            return ipo_date <= ipo_cutoff

        def has_no_delisting_date(candidate) -> bool:
            """True when no concrete delisting date is set; any error excludes."""
            try:
                reference = candidate.security_reference
                if reference is None:
                    return False
                delisting_date = reference.delisting_date
                if delisting_date is None:
                    return True
                # Try the .NET-style attribute first, then the Python one.
                year = getattr(delisting_date, 'Year', None)
                if year is None:
                    year = getattr(delisting_date, 'year', None)
                # A year <= 1 is the default minimum date, i.e. "not set".
                return year is not None and int(year) <= 1
            except Exception:
                # Conservative: anything unreadable is excluded.
                return False

        if fine is None:
            remember([])
            return []

        candidates = [entry for entry in fine if entry is not None]
        if not candidates:
            remember([])
            return []

        # The delisting check runs first; the IPO check only runs on survivors.
        eligible = [
            entry for entry in candidates
            if has_no_delisting_date(entry) and has_seasoned_ipo(entry)
        ]

        # Rank by the coarse-stage dollar volume (0.0 when none was recorded).
        ranked = heapq.nlargest(
            self.target_count,
            eligible,
            key=lambda entry: float(self.coarse_dv.get(entry.symbol, 0.0)),
        )
        symbols = [entry.symbol for entry in ranked]

        remember(symbols)
        return symbols

    def on_data(self, slice: Slice) -> None:
        """React to delisting events: liquidate on warning, then blacklist and remove.

        For a delisting WARNING the position (if invested) is liquidated.
        For both WARNING and DELISTED events the symbol is added to
        ``self.blacklist`` and removed from the algorithm. Other event types
        are ignored.

        Args:
            slice: The current data slice (may be ``None``).
        """
        if slice is None or slice.delistings is None:
            return

        for entry in slice.delistings:
            symbol = entry.key
            event = entry.value
            if event is None:
                continue

            event_type = event.type
            is_warning = event_type == DelistingType.WARNING
            if not (is_warning or event_type == DelistingType.DELISTED):
                continue

            if is_warning:
                # Risk-off immediately, before the security stops trading.
                portfolio = self.portfolio
                if portfolio is not None:
                    holding = portfolio[symbol]
                    if holding is not None and holding.invested:
                        self.liquidate(symbol, "Delisting warning")

            # Both event types end with the symbol blacklisted and removed.
            self.blacklist.add(symbol)
            self.remove_security(symbol)

    def on_securities_changed(self, changes: SecurityChanges) -> None:
        """Keep the tracked symbol set and the industry cache in sync with the universe.

        Added securities that are not tradable or are already delisted are
        blacklisted and removed, and are NOT tracked. Every other added
        security is recorded in ``_universe_symbols`` and its Morningstar
        sector code is cached in ``symbol_to_industry`` (-1 when fundamentals
        or the asset classification are missing).

        Removed securities are dropped from ``_universe_symbols``,
        ``final_universe`` and the industry cache; those that were tracked
        are liquidated.

        Args:
            changes: The securities added to / removed from the universe
                (may be ``None``).
        """
        if changes is None:
            return

        for security in changes.added_securities:
            if security is None:
                continue
            symbol = security.symbol

            # Guardrail: never track something that cannot be traded.
            if (not security.is_tradable) or security.is_delisted:
                self.blacklist.add(symbol)
                self.remove_security(symbol)
                continue

            self._universe_symbols.add(symbol)

            # Cache the industry code (fallback to -1 if missing).
            fundamentals = security.fundamentals
            classification = fundamentals.asset_classification if fundamentals else None
            self.symbol_to_industry[symbol] = (
                classification.morningstar_sector_code if classification else -1
            )

        for security in changes.removed_securities:
            if security is None:
                continue
            symbol = security.symbol

            # pop/discard tolerate symbols that were never cached or tracked.
            self.symbol_to_industry.pop(symbol, None)
            # Without this the rebalance could still see the symbol when the
            # volatility filter keeps its previous result.
            self.final_universe.discard(symbol)

            if symbol in self._universe_symbols:
                self._universe_symbols.discard(symbol)
                self.liquidate(symbol)
    
    def update_volatility_filter(self):
        """Rebuild ``final_universe`` from the least volatile tracked symbols.

        Requests 22 daily bars for every symbol in ``_universe_symbols`` and
        computes close-to-close returns. A symbol qualifies when it has at
        least 10 return observations and their standard deviation is below
        2% (0.02). Qualifying symbols are sorted by standard deviation
        ascending and the first ``universe_size`` become ``final_universe``.

        When nothing is tracked, ``final_universe`` is emptied. When the
        history request yields no close prices, ``final_universe`` is left
        unchanged.
        """
        max_daily_std = 0.02     # volatility ceiling on daily returns
        min_observations = 10    # fewer returns than this is not a meaningful estimate
        max_names = self.universe_size

        if not self._universe_symbols:
            self.final_universe = set()
            return

        symbols = list(self._universe_symbols)

        # 22 daily bars give at most 21 daily returns per symbol.
        history = self.history(symbols, 22, Resolution.DAILY)

        if history.empty or 'close' not in history.columns:
            # No data available: keep the existing final_universe.
            return

        close_prices = history['close'].unstack(level=0)

        # Missing values are dropped per symbol below. Dropping rows across
        # the whole frame would let one symbol with a short history remove
        # observations from every other symbol.
        returns = close_prices.pct_change()

        ranked = []
        for symbol in symbols:
            if symbol not in returns.columns:
                continue

            symbol_returns = returns[symbol].dropna()
            if len(symbol_returns) < min_observations:
                continue

            std_dev = symbol_returns.std()
            if std_dev < max_daily_std:
                ranked.append((symbol, std_dev))

        # Least volatile first, capped at max_names.
        ranked.sort(key=lambda pair: pair[1])
        self.final_universe = {symbol for symbol, _ in ranked[:max_names]}

        self.log(
            f"Volatility Filter: {len(self._universe_symbols)} -> "
            f"{len(self.final_universe)} stocks (std < {max_daily_std:.1%})"
        )

    def enter_position(self):
        """Rebalance into the mean-reversion portfolio built from ``final_universe``.

        Steps:
          1. Request ``_lookback_period + _mean_reversion_window`` daily bars
             for every symbol in ``final_universe``.
          2. Build the signal ``mu``: minus the gap between the last close
             and its moving average, divided by (rolling return volatility
             times last close). Undefined or infinite values become 0.
          3. Estimate the return covariance over the last
             ``_lookback_period`` rows (NaN entries set to 0).
          4. Solve for raw weights, apply the portfolio constraints and
             submit the targets with ``liquidate_existing_holdings=True``.
          5. Liquidate every invested symbol no longer in ``_universe_symbols``.

        Returns early without trading when the universe is empty, no close
        prices are available, no symbol has enough history, fewer than
        ``_lookback_period`` complete return rows remain, or the portfolio
        value is not positive.
        """
        symbols = list(self.final_universe)
        if not symbols:
            return

        bars_needed = self._lookback_period + self._mean_reversion_window
        history = self.history(symbols, bars_needed, Resolution.DAILY)

        if history.empty or 'close' not in history.columns:
            return

        close = history['close'].unstack(level=0)
        returns = close.pct_change()

        # Drop symbols that cannot supply a full lookback of returns. The
        # row-wise dropna below would otherwise let a single thin history
        # shrink the sample for everyone and cancel the whole rebalance.
        has_enough_history = returns.count() >= self._lookback_period
        close = close.loc[:, has_enough_history]
        returns = returns.loc[:, has_enough_history].dropna()

        if close.shape[1] == 0 or len(returns) < self._lookback_period:
            return

        # Step 1: mean-reversion signal (mu in the paper).
        # Price above its moving average -> negative signal (expect reversion
        # down); price below -> positive signal (expect reversion up).
        current_prices = close.iloc[-1]
        moving_avg = close.rolling(window=self._mean_reversion_window).mean().iloc[-1]
        volatility = returns.rolling(window=self._mean_reversion_window).std().iloc[-1]

        mu = -(current_prices - moving_avg) / (volatility * current_prices)
        # Zero volatility yields +/-inf (or NaN for 0/0); neither may reach
        # the solver, so both are treated as "no signal".
        mu = mu.replace([np.inf, -np.inf], np.nan).fillna(0)

        # Step 2: covariance matrix (C in the paper) from recent returns.
        recent_returns = returns.tail(self._lookback_period)
        cov_matrix = recent_returns.cov().fillna(0)

        # Step 3: current portfolio weights, used for the transaction-cost term.
        portfolio_value = self.portfolio.total_portfolio_value
        if portfolio_value <= 0:
            # Weights are a fraction of portfolio value; nothing sensible
            # can be sized without positive equity.
            return

        current_weights = pd.Series(0.0, index=mu.index)
        for symbol in mu.index:
            holding = self.portfolio[symbol]
            if holding.invested:
                current_weights[symbol] = holding.holdings_value / portfolio_value

        # Step 4: regularized solve with the transaction-cost adjustment.
        optimal_weights = self._solve_kakushadze_optimization(
            mu, cov_matrix, current_weights
        )

        # Step 5: neutralize, clip, select and scale.
        optimal_weights = self._apply_portfolio_constraints(optimal_weights)

        # Remember the weights of this rebalance.
        self._previous_weights = optimal_weights.to_dict()

        # Step 6: build targets; skip non-finite or negligible weights and
        # symbols without a last known price.
        targets = []
        for symbol, weight in optimal_weights.items():
            if not np.isfinite(weight) or abs(weight) <= 1e-4:
                continue
            lookup = self.get_last_known_price(self.securities[symbol])
            if lookup is not None:
                targets.append(PortfolioTarget(symbol, weight))

        if targets:
            self.set_holdings(targets, liquidate_existing_holdings=True)

        # Clean up positions that are no longer in the tracked universe.
        for symbol in list(self.portfolio.keys()):
            if self.portfolio[symbol].invested and symbol not in self._universe_symbols:
                self.liquidate(symbol)
    
    def _solve_kakushadze_optimization(
        self, 
        mu: pd.Series, 
        cov_matrix: pd.DataFrame,
        current_weights: pd.Series
    ) -> pd.Series:
        """Solve the regularized optimization problem from Kakushadze's paper.
        
        The optimization problem is:
        minimize: w' * C * w - μ' * w + λ * ||w||^2 + κ * ||w - w0||
        
        Where:
        - C is the covariance matrix
        - μ is the mean-reversion signal (expected returns)
        - λ is the regularization parameter (ridge penalty)
        - κ is the transaction cost parameter
        - w0 is the current portfolio weights
        """
        n = len(mu)
        
        # Create regularized covariance matrix: C_reg = C + λI
        regularized_cov = cov_matrix + self._lambda_reg * np.eye(n)
        
        # Adjust signal for transaction costs
        # Penalize deviation from current weights
        adjusted_mu = mu.copy()
        
        # Add transaction cost penalty to the signal
        # Reduce signal for positions we'd need to increase, increase for ones we'd decrease
        weight_diff = current_weights
        tc_adjustment = self._transaction_cost * np.sign(weight_diff)
        adjusted_mu = adjusted_mu - tc_adjustment
        
        # Solve: w = C_reg^(-1) * μ_adjusted
        try:
            # Align indices
            aligned_cov = regularized_cov.loc[mu.index, mu.index]
            
            # Solve the system
            optimal_weights = np.linalg.solve(aligned_cov.values, adjusted_mu.values)
            optimal_weights = pd.Series(optimal_weights, index=mu.index)
            
        except np.linalg.LinAlgError:
            # If singular, use pseudo-inverse
            aligned_cov = regularized_cov.loc[mu.index, mu.index]
            optimal_weights = np.linalg.lstsq(aligned_cov.values, adjusted_mu.values, rcond=None)[0]
            optimal_weights = pd.Series(optimal_weights, index=mu.index)
        
        return optimal_weights
    
    def _apply_portfolio_constraints(self, weights: pd.Series) -> pd.Series:
        """Apply portfolio constraints: leverage, position limits, and concentration."""
        
        # Industry neutralization
        industries = pd.Series({s: self.symbol_to_industry.get(s, -1) for s in weights.index})
        neutralized = weights.groupby(industries).transform(lambda x: x - x.mean())
        
        # Normalize to target gross leverage (e.g., 0.9)
        gross_leverage = neutralized.abs().sum()
        if gross_leverage > 0:
            neutralized = neutralized / gross_leverage * 0.90
        
        # Apply position limits
        neutralized = neutralized.clip(-self.truncate, self.truncate)
        
        # Keep only top N long and short positions
        long_weights = neutralized[neutralized > 0].nlargest(self._target_positions // 2)
        short_weights = neutralized[neutralized < 0].nsmallest(self._target_positions // 2)
        
        # Zero out other positions
        final_weights = pd.Series(0.0, index=weights.index)
        final_weights[long_weights.index] = short_weights
        final_weights[short_weights.index] = long_weights
        
        # Final normalization
        total = final_weights.abs().sum()
        if total > 0:
            final_weights = final_weights / total * 0.90
        
        return final_weights