| Overall Statistics |
|
Total Orders 8036 Average Win 0.04% Average Loss -0.03% Compounding Annual Return 5.770% Drawdown 3.100% Expectancy 0.037 Start Equity 1000000 End Equity 1052838.39 Net Profit 5.284% Sharpe Ratio 0.489 Sortino Ratio 0.612 Probabilistic Sharpe Ratio 41.070% Loss Rate 49% Win Rate 51% Profit-Loss Ratio 1.04 Alpha 0.018 Beta -0.069 Annual Standard Deviation 0.05 Annual Variance 0.002 Information Ratio 0.55 Tracking Error 0.224 Treynor Ratio -0.355 Total Fees $0.00 Estimated Strategy Capacity $290000000.00 Lowest Capacity Asset RGBK R735QTJ8XC9X Portfolio Turnover 32.34% Drawdown Recovery 71 |
# region imports
from AlgorithmImports import *
import numpy as np
import pandas as pd
import math
import heapq
from typing import Union
# endregion
class DelistingAwareTopDollarVolumeUniverse(QCAlgorithm):
    """Top-dollar-volume universe with delisting protection and a
    Kakushadze-style regularized mean-reversion optimizer.

    Pipeline: coarse dollar-volume ranking -> fine delisting/IPO filter ->
    daily volatility screen -> optimized, industry-neutral long/short targets.
    """

    def initialize(self) -> None:
        """Configure dates, capital, parameters, universe, and schedules."""
        # --- Backtest window and capital ---
        self.set_start_date(2022, 1, 1)
        self.set_end_date(2022, 12, 1)
        self.set_cash(1000000)

        # --- Strategy parameters ---
        self.truncate = 0.05              # per-position weight cap
        self._ipo_days = 180              # minimum seasoning since IPO
        self._lookback_period = 60        # covariance estimation window (days)
        self._mean_reversion_window = 20  # moving-average window for the signal
        self._lambda_reg = 0.1            # ridge penalty on the covariance
        self._transaction_cost = 0.001    # cost per dollar traded
        self._target_positions = 40       # total long + short positions

        # Most recent optimizer output, retained between rebalances.
        self._previous_weights: Dict[Symbol, float] = {}

        # --- Universe configuration (daily, fill-forward) ---
        self.universe_settings.resolution = Resolution.DAILY
        self.universe_settings.fill_forward = True
        self.target_count: int = 500      # symbols surviving fine selection
        self.coarse_count: int = 1000     # symbols surviving coarse selection
        self.universe_size: int = 200     # symbols after the volatility screen
        self.coarse_dv: dict = {}         # same-day coarse dollar volume cache
        self.blacklist: set = set()       # symbols never allowed to re-enter
        self.selected: list = []          # last fine-selection output
        self.last_selection_date = None
        self.industry_set: set = set()

        self.set_security_initializer(self.debug_cost_model)
        self.add_universe(self.coarse_selection_function, self.fine_selection_function)

        self._universe_symbols: Set[Symbol] = set()
        self.final_universe: Set[Symbol] = set()
        self.symbol_to_industry: Dict[Symbol, int] = {}

        # SPY serves purely as a scheduling anchor for market-open timing.
        self.anchor = Symbol.create("SPY", SecurityType.EQUITY, Market.USA)
        self.schedule.on(self.date_rules.every_day(self.anchor),
                         self.time_rules.before_market_open(self.anchor, 10),
                         self.update_volatility_filter)
        self.schedule.on(self.date_rules.every_day(self.anchor),
                         self.time_rules.before_market_open(self.anchor, 5),
                         self.enter_position)
def debug_cost_model(self, security: Security) -> None:
    """Security initializer: zero fees and slippage so backtest results
    isolate the strategy's own P&L from execution-cost assumptions."""
    security.set_fee_model(ConstantFeeModel(0))
    security.set_slippage_model(ConstantSlippageModel(0))
def coarse_selection_function(self, coarse: List[CoarseFundamental]) -> List[Symbol]:
    """Coarse stage: keep the top ``coarse_count`` symbols by dollar volume.

    Applies basic liquidity/data checks, skips blacklisted symbols, and
    caches each survivor's same-day dollar volume in ``self.coarse_dv``
    for re-ranking during the fine stage.

    Fixes vs. previous version: the return annotation mixed a type with
    the ``Universe.UNCHANGED`` sentinel value (invalid as an annotation),
    and a dead triple-quoted code block was left behind as a no-op
    expression; both removed.
    """
    # Reset the buffer each selection so fine ranking never sees stale values.
    self.coarse_dv = {}
    if not coarse:
        return []
    # Basic tradability filters; drop blacklisted (delisted) symbols outright.
    candidates = [
        c for c in coarse
        if c is not None
        and c.has_fundamental_data
        and c.price is not None and c.price > 5
        and c.volume is not None and c.volume > 10000
        and c.symbol not in self.blacklist
    ]
    # nlargest is O(n log k): cheaper than a full sort for k << n.
    top = heapq.nlargest(
        self.coarse_count,
        candidates,
        key=lambda c: float(c.dollar_volume) if c.dollar_volume is not None else 0.0,
    )
    # Cache same-day coarse dollar volume for reuse in the fine stage.
    for c in top:
        self.coarse_dv[c.symbol] = (
            float(c.dollar_volume) if c.dollar_volume is not None else 0.0
        )
    return [c.symbol for c in top]
def fine_selection_function(self, fine: List[FineFundamental]) -> List[Symbol]:
    """Fine stage: drop symbols with a concrete delisting date or recent IPO.

    A symbol is kept when its SecurityReference exists, its delisting date
    is unset (None, or the .NET default date with year <= 1), and its IPO
    is at least ``self._ipo_days`` old. Survivors are re-ranked by the
    same-day coarse dollar volume and the top ``target_count`` returned.

    Fix vs. previous version: removed the redundant ``[:target_count]``
    slice (``nlargest`` already caps the result length).
    """
    cutoff = self.time - timedelta(days=self._ipo_days)

    def seasoned(f: FineFundamental) -> bool:
        # Require a plausible IPO date at least `_ipo_days` before today.
        sr = f.security_reference
        if sr is None:
            return False
        ipo = sr.ipo_date
        if ipo is None:
            return False
        try:
            # Guard against .NET default/min dates masquerading as IPO dates.
            if ipo.year < 1900:
                return False
        except Exception:
            return False
        return ipo <= cutoff

    def is_active_no_delisting(ff: FineFundamental) -> bool:
        # Exception-safe access to delisting metadata; exclude on any doubt.
        try:
            sr = ff.security_reference
            if sr is None:
                return False
            dd = sr.delisting_date
            if dd is None:
                return True
            # The .NET default min date (0001-01-01, year <= 1) means
            # "no delisting scheduled" and counts as active.
            year_attr = getattr(dd, 'Year', None)
            if year_attr is None:
                # Fallback to the Python datetime attribute casing.
                year_attr = getattr(dd, 'year', None)
            if year_attr is None:
                return False
            return int(year_attr) <= 1
        except Exception:
            return False

    # Track selection timestamp for observability on every exit path.
    self.last_selection_date = self.time.date()
    if not fine:
        self.selected = []
        return []
    kept = [f for f in fine
            if f is not None and is_active_no_delisting(f) and seasoned(f)]
    # Rank by the coarse-stage dollar volume captured earlier today;
    # nlargest already limits the output to target_count entries.
    ranked = heapq.nlargest(
        self.target_count,
        kept,
        key=lambda f: float(self.coarse_dv.get(f.symbol, 0.0)),
    )
    symbols = [f.symbol for f in ranked]
    self.selected = symbols
    return symbols
def on_data(self, slice: Slice) -> None:
    """React to delisting events: go risk-off and blacklist the symbol."""
    if slice is None or slice.delistings is None:
        return
    for kvp in slice.delistings:
        symbol, delisting = kvp.key, kvp.value
        if delisting is None:
            continue
        if delisting.type == DelistingType.WARNING:
            # Exit ahead of the delisting, then ensure it never re-enters.
            holding = self.portfolio[symbol] if self.portfolio is not None else None
            if holding is not None and holding.invested:
                self.liquidate(symbol, "Delisting warning")
            self.blacklist.add(symbol)
            self.remove_security(symbol)
        elif delisting.type == DelistingType.DELISTED:
            # Already delisted: just purge and blacklist.
            self.blacklist.add(symbol)
            self.remove_security(symbol)
def on_securities_changed(self, changes: SecurityChanges) -> None:
    """Track universe membership; guard against non-tradable additions.

    Fixes vs. previous version: (1) non-tradable or already-delisted
    additions were blacklisted and removed but then STILL inserted into
    ``_universe_symbols``/``symbol_to_industry`` by a second loop — they
    are now skipped; (2) cleanup used an unguarded dict lookup whose
    result was unused and could raise KeyError — replaced with a
    defaulted ``pop``.
    """
    if changes is None:
        return
    for security in changes.added_securities:
        if security is None:
            continue
        symbol = security.symbol
        # Reject anything that arrives non-tradable or delisted; never
        # track it as part of the investable universe.
        if (not security.is_tradable) or security.is_delisted:
            self.blacklist.add(symbol)
            self.remove_security(symbol)
            continue
        self._universe_symbols.add(symbol)
        # Cache the industry code (fallback to -1 when classification missing).
        f = security.fundamentals
        industry = (f.asset_classification.morningstar_sector_code
                    if f and f.asset_classification else -1)
        self.symbol_to_industry[symbol] = industry
    for security in changes.removed_securities:
        if security is None:
            continue
        if security.symbol in self._universe_symbols:
            self._universe_symbols.remove(security.symbol)
            self.liquidate(security.symbol)
        # Defaulted pop: safe even if the symbol was never cached.
        self.symbol_to_industry.pop(security.symbol, None)
def update_volatility_filter(self):
    """Keep the ``universe_size`` least-volatile symbols with daily std < 2%.

    Runs before the open; writes survivors into ``self.final_universe``.
    When history is unavailable the previous final universe is retained.

    Fixes vs. previous version: the docstring/log claimed a 1.5% cutoff
    while the code uses 0.02 (2%) — the message now matches the code;
    the hard-coded 200 cap now uses the configured ``self.universe_size``.
    """
    if not self._universe_symbols:
        self.final_universe = set()
        return
    symbols = list(self._universe_symbols)
    # ~1 month of daily closes is enough for a rough volatility estimate.
    history = self.history(symbols, 22, Resolution.DAILY)
    if history.empty or 'close' not in history.columns:
        return  # no data: keep the existing final_universe
    close_prices = history['close'].unstack(level=0)
    returns = close_prices.pct_change().dropna()
    volatility_data = []
    for symbol in symbols:
        if symbol not in returns.columns:
            continue
        symbol_returns = returns[symbol].dropna()
        # Need at least 10 observations for a meaningful estimate.
        if len(symbol_returns) < 10:
            continue
        std_dev = symbol_returns.std()
        if std_dev < 0.02:  # absolute daily-volatility ceiling (2%)
            volatility_data.append((symbol, std_dev))
    # Least volatile first; cap at the configured universe size.
    volatility_data.sort(key=lambda item: item[1])
    self.final_universe = {s for s, _ in volatility_data[:self.universe_size]}
    self.log(f"Volatility Filter: {len(self._universe_symbols)} -> "
             f"{len(self.final_universe)} stocks (std < 2%)")
def enter_position(self):
    """Build and execute targets via the Kakushadze mean-reversion framework."""
    symbols = list(self.final_universe)
    if not symbols:
        return
    bars_needed = self._lookback_period + self._mean_reversion_window
    history = self.history(symbols, bars_needed, Resolution.DAILY)
    if history.empty or 'close' not in history.columns:
        return
    close = history['close'].unstack(level=0)
    returns = close.pct_change().dropna()
    if len(returns) < self._lookback_period:
        return

    # Mean-reversion signal mu: minus the moving-average deviation,
    # normalized by volatility and price. Price above its MA gives a
    # negative signal (expect a fall); below gives a positive one.
    latest_close = close.iloc[-1]
    ma = close.rolling(window=self._mean_reversion_window).mean().iloc[-1]
    vol = returns.rolling(window=self._mean_reversion_window).std().iloc[-1]
    mu = (-(latest_close - ma) / (vol * latest_close)).fillna(0)

    # Covariance estimate over the most recent lookback window.
    cov_matrix = returns.tail(self._lookback_period).cov().fillna(0)

    # Current portfolio weights, needed for the transaction-cost term.
    current_weights = pd.Series(0.0, index=mu.index)
    total_value = self.portfolio.total_portfolio_value
    for symbol in mu.index:
        if self.portfolio[symbol].invested:
            current_weights[symbol] = (
                self.portfolio[symbol].holdings_value / total_value
            )

    # Regularized solve, then leverage/concentration constraints.
    optimal_weights = self._solve_kakushadze_optimization(
        mu, cov_matrix, current_weights
    )
    optimal_weights = self._apply_portfolio_constraints(optimal_weights)
    # Retain for the next iteration's bookkeeping.
    self._previous_weights = optimal_weights.to_dict()

    # Translate weights into targets; skip dust-sized weights and symbols
    # with no known last price.
    targets = []
    for symbol, weight in optimal_weights.items():
        if abs(weight) <= 1e-4:
            continue
        if self.get_last_known_price(self.securities[symbol]) is not None:
            targets.append(PortfolioTarget(symbol, weight))
    if targets:
        self.set_holdings(targets, liquidate_existing_holdings=True)

    # Belt-and-braces: close anything held that left the tracked universe.
    for symbol in list(self.portfolio.keys()):
        if self.portfolio[symbol].invested and symbol not in self._universe_symbols:
            self.liquidate(symbol)
def _solve_kakushadze_optimization(
    self,
    mu: pd.Series,
    cov_matrix: pd.DataFrame,
    current_weights: pd.Series,
) -> pd.Series:
    """Solve the regularized mean-reversion optimization.

    Objective (Kakushadze): minimize
        w' C w - mu' w + lambda * ||w||^2 + kappa * ||w - w0||
    implemented as the closed-form solve
        w = (C + lambda I)^(-1) (mu - kappa * sgn(w0)).

    NOTE(review): the paper's cost term uses sgn(w - w0); this code
    approximates it with sgn(w0) since w is unknown before the solve —
    confirm this simplification is intended.
    """
    n = len(mu)
    # Ridge-regularize the covariance for better conditioning: C + lambda*I.
    ridge_cov = cov_matrix + self._lambda_reg * np.eye(n)
    # Shrink the signal by the transaction-cost penalty term.
    adjusted_mu = mu - self._transaction_cost * np.sign(current_weights)
    # Align rows/columns to the signal's index before solving.
    aligned = ridge_cov.loc[mu.index, mu.index].values
    try:
        solution = np.linalg.solve(aligned, adjusted_mu.values)
    except np.linalg.LinAlgError:
        # Singular matrix: fall back to the least-squares pseudo-solution.
        solution = np.linalg.lstsq(aligned, adjusted_mu.values, rcond=None)[0]
    return pd.Series(solution, index=mu.index)
def _apply_portfolio_constraints(self, weights: pd.Series) -> pd.Series:
    """Apply industry neutralization, leverage, and concentration limits.

    Steps: demean weights within each industry, scale to 0.90 gross
    leverage, clip each position at +/- ``self.truncate``, keep only the
    top N names per side, then re-normalize to 0.90 gross.

    Bug fix: the long and short weight vectors were previously assigned
    to each other's index positions; pandas index-alignment turned those
    mismatched assignments into NaNs, wiping every retained position.
    Each side is now written back to its own index.
    """
    # Demean within industry so each sector nets to ~zero exposure.
    industries = pd.Series(
        {s: self.symbol_to_industry.get(s, -1) for s in weights.index}
    )
    neutralized = weights.groupby(industries).transform(lambda x: x - x.mean())
    # Scale to the target gross leverage (0.90).
    gross = neutralized.abs().sum()
    if gross > 0:
        neutralized = neutralized / gross * 0.90
    # Hard cap on any single position.
    neutralized = neutralized.clip(-self.truncate, self.truncate)
    # Concentrate in the strongest names on each side.
    long_weights = neutralized[neutralized > 0].nlargest(self._target_positions // 2)
    short_weights = neutralized[neutralized < 0].nsmallest(self._target_positions // 2)
    # Zero out everything else, then restore each side to its OWN index.
    final_weights = pd.Series(0.0, index=weights.index)
    final_weights[long_weights.index] = long_weights
    final_weights[short_weights.index] = short_weights
    # Re-normalize after truncation so gross leverage is exactly 0.90.
    total = final_weights.abs().sum()
    if total > 0:
        final_weights = final_weights / total * 0.90
    return final_weights