| Overall Statistics |
|
Total Orders 259 Average Win 1.96% Average Loss -1.17% Compounding Annual Return 44.263% Drawdown 23.900% Expectancy 0.682 Start Equity 100000.00 End Equity 433913.64 Net Profit 333.914% Sharpe Ratio 1.465 Sortino Ratio 1.571 Probabilistic Sharpe Ratio 80.201% Loss Rate 37% Win Rate 63% Profit-Loss Ratio 1.67 Alpha 0.271 Beta 0.29 Annual Standard Deviation 0.201 Annual Variance 0.04 Information Ratio 0.91 Tracking Error 0.235 Treynor Ratio 1.015 Total Fees $670.67 Estimated Strategy Capacity $360000.00 Lowest Capacity Asset SOLUSDT 2XR Portfolio Turnover 3.29% Drawdown Recovery 550 |
"""
Advanced Features Module
Implementation of sophisticated market analysis features from research
"""
from AlgorithmImports import *
import numpy as np
# Phase 3: Removed pandas import - no longer needed since we eliminated History() calls
from params import (ENABLE_INTRABAR_PYRAMIDS, PYRAMID_MIN_MOVE, PYRAMID_COOLDOWN, PYRAMID_K1, PYRAMID_K2,
CRYPTO_VOL_GATE, CRYPTO_MIN_VOL, CRYPTO_VOL_LOOKBACK,
ENABLE_VOL_FALLBACKS, LOW_VOL_SIGNAL_THRESHOLD,
TREND_EMA_FAST, TREND_EMA_SLOW, TREND_SLOPE_THRESHOLD, CRYPTO_SLOPE_THRESHOLD, SHARPE_THRESHOLD,
SHARPE_LOOKBACK)
# Phase 3: Removed _ensure_datetime_index helper - no longer needed since we eliminated History() calls
def detect_intrabar_pyramid_opportunities(algorithm, symbol, indicator_bundle, current_direction, position_data):
    """
    Detect pyramid (add-on) opportunities within the current bar.

    A level counts as triggered when the bar closed beyond it while the
    opposite extreme of the same bar was still on the near side of it, i.e.
    price crossed the level during the bar.

    Args:
        algorithm: QCAlgorithm instance (reads CurrentSlice and Time, logs via Debug)
        symbol: Symbol to check
        indicator_bundle: IndicatorBundle for the symbol (source of the ATR value)
        current_direction: 1 for long; any other value is handled as short
        position_data: Dictionary with position information including
            'last_pyramid_time'; None is accepted and treated as empty

    Returns:
        dict: {'pyramid1': bool, 'pyramid2': bool}. Both are False when the
        feature is disabled, the cooldown is still running, the bar moved less
        than PYRAMID_MIN_MOVE, ATR or the lookback level is unavailable, or an
        error occurs.
    """
    def no_signal():
        # Fresh dict per call so callers may mutate the result safely.
        return {'pyramid1': False, 'pyramid2': False}

    if not ENABLE_INTRABAR_PYRAMIDS:
        return no_signal()

    try:
        bar = algorithm.CurrentSlice[symbol]
        if bar is None:
            return no_signal()

        # Cooldown check - PYRAMID_COOLDOWN is in days for daily bars
        last_pyramid_time = (position_data or {}).get('last_pyramid_time')
        if last_pyramid_time is not None:
            elapsed = algorithm.Time - last_pyramid_time
            if elapsed.total_seconds() < PYRAMID_COOLDOWN * 24 * 60 * 60:
                return no_signal()

        # Require a minimum open-to-close move before considering any level.
        if hasattr(bar, 'Open') and hasattr(bar, 'Close') and bar.Open != 0:
            price_move_pct = abs((bar.Close - bar.Open) / bar.Open)
            if price_move_pct < PYRAMID_MIN_MOVE:
                return no_signal()

        # A bundle that is not ready reports None for every indicator; treat
        # that as "no ATR" instead of letting the comparison raise.
        atr = indicator_bundle.get_values().get('atr')
        if atr is None or atr <= 0:
            return no_signal()

        # Imported here because logic.py imports this module at load time.
        from logic import BreakoutLogic
        logic = BreakoutLogic(algorithm)

        is_long = current_direction == 1
        if is_long:
            reference = logic._get_lookback_high(symbol)
        else:
            reference = logic._get_lookback_low(symbol)
        if reference <= 0:
            return no_signal()

        def crossed(level):
            # Conservative assumption: the close is beyond the level and the
            # bar's opposite extreme touched it, so the cross happened
            # inside this bar.
            if is_long:
                return bar.Close > level and hasattr(bar, 'Low') and bar.Low <= level
            return bar.Close < level and hasattr(bar, 'High') and bar.High >= level

        # Longs stack levels above the lookback high, shorts below the low.
        side = 1 if is_long else -1
        return {
            'pyramid1': crossed(reference + side * PYRAMID_K1 * atr),
            'pyramid2': crossed(reference + side * PYRAMID_K2 * atr)
        }
    except Exception as e:
        algorithm.Debug(f"Error in detect_intrabar_pyramid_opportunities for {symbol}: {e}")
        return no_signal()
def calculate_crypto_volatility_gate(algorithm, symbol):
    """
    Decide whether a crypto asset is volatile enough for the looser OR-filter.

    Callers use the result to pick OR logic (True) or the stricter AND logic
    (False). Every situation in which the answer cannot be computed resolves
    to True, so trading is never blocked by missing data.

    Args:
        algorithm: QCAlgorithm instance (needs `indicators` and `Debug`)
        symbol: Symbol to check

    Returns:
        bool: True if realized volatility is at least CRYPTO_MIN_VOL, or if the
        gate is disabled, the symbol is not crypto, no bundle/data exists, or an
        error occurs. False only when volatility is measured and too low.
    """
    # Gating disabled
    if not CRYPTO_VOL_GATE:
        return True

    # Only tickers quoted in USD/USDT are treated as crypto here.
    ticker = str(symbol)
    if not ticker.endswith(('USD', 'USDT')):
        return True

    try:
        if symbol not in algorithm.indicators:
            algorithm.Debug(f"No indicator bundle for crypto volatility gate: {symbol}")
            return True

        bundle = algorithm.indicators[symbol]
        realized_vol = bundle.get_realized_volatility(CRYPTO_VOL_LOOKBACK)

        # The bundle reports 0 while its return window is still filling.
        if realized_vol <= 0:
            algorithm.Debug(f"Insufficient data for crypto volatility gate: {symbol}")
            return True

        vol_sufficient = realized_vol >= CRYPTO_MIN_VOL
        algorithm.Debug(f"CRYPTO VOL GATE {symbol}: Realized Vol={realized_vol:.2%}, "
                        f"Threshold={CRYPTO_MIN_VOL:.2%}, Sufficient={vol_sufficient}")
        return vol_sufficient
    except Exception as e:
        algorithm.Debug(f"Error in calculate_crypto_volatility_gate for {symbol}: {e}")
        return True
def calculate_low_volatility_fallback(algorithm, symbol, year_signals):
    """
    Report whether the relaxed "low volatility" entry rules should be used.

    The year counts as low-volatility when fewer signals than
    LOW_VOL_SIGNAL_THRESHOLD have been generated so far.

    Args:
        algorithm: QCAlgorithm instance (used only for Debug logging)
        symbol: Symbol being analyzed (used only in the log line)
        year_signals: Number of signals generated this year

    Returns:
        bool: True if relaxed conditions apply; always False when
        ENABLE_VOL_FALLBACKS is off.
    """
    if not ENABLE_VOL_FALLBACKS:
        return False

    below_threshold = year_signals < LOW_VOL_SIGNAL_THRESHOLD
    if not below_threshold:
        return below_threshold

    algorithm.Debug(f"LOW VOL FALLBACK {symbol}: Signals={year_signals}, "
                    f"Threshold={LOW_VOL_SIGNAL_THRESHOLD}, Using relaxed conditions")
    return below_threshold
def calculate_trend_based_regime_filter(algorithm, symbol, direction='long'):
    """
    Route a symbol to its asset-specific trend regime filter.

    Routing is by substring of str(symbol):
    - contains 'GLD': Sharpe-based filter
    - contains 'MSFT' or 'GOOGL': weekly EMA filter
    - anything else (including crypto): daily trend filter

    Args:
        algorithm: QCAlgorithm instance
        symbol: Symbol to analyze
        direction: 'long', 'short', or 'both'

    Returns:
        bool or tuple: whatever the selected filter returns - a bool for
        'long'/'short', a (long_ok, short_ok) tuple for 'both'. If the filter
        raises, the error is logged and the result is permissive
        (True or (True, True)).
    """
    try:
        ticker = str(symbol)
        if 'GLD' in ticker:
            selected = _calculate_sharpe_based_filter
        elif 'MSFT' in ticker or 'GOOGL' in ticker:
            selected = _calculate_weekly_ema_filter
        else:
            selected = _calculate_daily_trend_filter
        return selected(algorithm, symbol, direction)
    except Exception as e:
        algorithm.Debug(f"Error in calculate_trend_based_regime_filter for {symbol}: {e}")
        # Default to allow trading
        return (True, True) if direction == 'both' else True
def _calculate_sharpe_based_filter(algorithm, symbol, direction='long'):
    """
    Sharpe-based regime filter, used for GLD (trends weakly).

    The same verdict (Sharpe above SHARPE_THRESHOLD) is applied to longs and
    shorts alike. A missing bundle, a Sharpe of exactly 0 (what the bundle
    reports when it has too little data) or any error yields a permissive
    result.

    Returns:
        bool for 'long'/'short'; (verdict, verdict) tuple for 'both'.
    """
    def shaped(verdict):
        # 'both' callers expect a (long_ok, short_ok) pair.
        return (verdict, verdict) if direction == 'both' else verdict

    try:
        if symbol not in algorithm.indicators:
            return shaped(True)

        sharpe = algorithm.indicators[symbol].get_sharpe_ratio(SHARPE_LOOKBACK)
        if sharpe == 0:
            return shaped(True)

        regime_ok = sharpe > SHARPE_THRESHOLD
        algorithm.Debug(f"SHARPE FILTER {symbol}: Sharpe={sharpe:.2f}, "
                        f"Threshold={SHARPE_THRESHOLD:.2f}, OK={regime_ok}")
        return shaped(regime_ok)
    except Exception as e:
        algorithm.Debug(f"Error in _calculate_sharpe_based_filter for {symbol}: {e}")
        return shaped(True)
def _calculate_weekly_ema_filter(algorithm, symbol, direction='long'):
    """
    Weekly EMA regime filter, used for MSFT/GOOGL (smoother than daily).

    Longs are allowed when the 13-period weekly EMA is above the 26-period
    one, shorts when it is below. A missing bundle, unavailable EMA values or
    any error yields a permissive result.

    Returns:
        bool for 'long'/'short'; (long_ok, short_ok) tuple otherwise.
    """
    def permissive():
        return (True, True) if direction == 'both' else True

    try:
        if symbol not in algorithm.indicators:
            return permissive()

        fast, slow = algorithm.indicators[symbol].get_weekly_ema_values()
        if fast is None or slow is None:
            return permissive()

        long_ok = fast > slow
        short_ok = fast < slow
        algorithm.Debug(f"WEEKLY EMA FILTER {symbol}: EMA13={fast:.2f}, EMA26={slow:.2f}, "
                        f"LongOK={long_ok}, ShortOK={short_ok}")

        if direction == 'long':
            return long_ok
        if direction == 'short':
            return short_ok
        return long_ok, short_ok  # 'both'
    except Exception as e:
        algorithm.Debug(f"Error in _calculate_weekly_ema_filter for {symbol}: {e}")
        return permissive()
def _calculate_daily_trend_filter(algorithm, symbol, direction='long'):
    """
    Daily trend regime filter with crypto volatility gating.

    Combines two signals:
    - trend: bundle 'ema20' versus 'ema200'
    - momentum: average per-bar change of the close between close_window[0]
      and close_window[4], compared with an asset-specific threshold
      (CRYPTO_SLOPE_THRESHOLD for tickers ending in USD/USDT, otherwise
      TREND_SLOPE_THRESHOLD)

    Crypto that passes calculate_crypto_volatility_gate needs either signal
    (OR); everything else needs both (AND). A missing or not-ready bundle,
    non-positive EMAs or any error yields a permissive result.

    Returns:
        bool for 'long'/'short'; (long_ok, short_ok) tuple otherwise.
    """
    def permissive():
        return (True, True) if direction == 'both' else True

    try:
        if symbol not in algorithm.indicators:
            return permissive()

        bundle = algorithm.indicators[symbol]
        if not bundle.is_ready:
            return permissive()

        snapshot = bundle.get_values()
        current_ema_fast = snapshot.get('ema20', 0)
        current_ema_slow = snapshot.get('ema200', 0)
        if current_ema_fast <= 0 or current_ema_slow <= 0:
            return permissive()

        # Slope is approximated from closes rather than from the EMA itself.
        # NOTE(review): index 4 is four bars back but the change is divided
        # by 5; left as-is because the thresholds are tuned against it.
        ema_slope = 0.0
        closes = bundle.close_window
        if closes.Count >= 5:
            latest_close = closes[0]
            earlier_close = closes[4]
            if earlier_close > 0:
                ema_slope = (latest_close - earlier_close) / earlier_close / 5

        uptrend = current_ema_fast > current_ema_slow
        downtrend = current_ema_fast < current_ema_slow

        is_crypto = str(symbol).endswith(('USD', 'USDT'))
        slope_threshold = CRYPTO_SLOPE_THRESHOLD if is_crypto else TREND_SLOPE_THRESHOLD
        momentum_up = ema_slope > slope_threshold
        momentum_down = ema_slope < -slope_threshold

        # The volatility gate is consulted for crypto only; low-volatility
        # crypto falls back to the same strict AND rule as other assets to
        # avoid crypto winter bleeding.
        if is_crypto and calculate_crypto_volatility_gate(algorithm, symbol):
            long_regime_ok = uptrend or momentum_up
            short_regime_ok = downtrend or momentum_down
        else:
            long_regime_ok = uptrend and momentum_up
            short_regime_ok = downtrend and momentum_down

        algorithm.Debug(f"DAILY TREND FILTER {symbol}: EMAs={current_ema_fast:.2f}/{current_ema_slow:.2f}, "
                        f"Slope={ema_slope:.4f}, LongOK={long_regime_ok}, ShortOK={short_regime_ok}")

        if direction == 'long':
            return long_regime_ok
        if direction == 'short':
            return short_regime_ok
        return long_regime_ok, short_regime_ok  # 'both'
    except Exception as e:
        algorithm.Debug(f"Error in _calculate_daily_trend_filter for {symbol}: {e}")
        return permissive()
def apply_low_volatility_adjustments(algorithm, symbol, conditions, is_fallback=False):
    """
    Apply low volatility fallback adjustments to entry conditions.

    Currently a placeholder: when the fallback is active it returns a shallow
    copy of `conditions` without changing any threshold, and logs that the
    fallback was applied.

    Args:
        algorithm: QCAlgorithm instance (used only for Debug logging)
        symbol: Symbol being analyzed
        conditions: Dictionary of current conditions
        is_fallback: Whether to apply fallback adjustments

    Returns:
        dict: The original `conditions` object when the fallback is inactive,
        disabled via ENABLE_VOL_FALLBACKS, or copying fails; otherwise a copy.
    """
    if not (is_fallback and ENABLE_VOL_FALLBACKS):
        return conditions

    try:
        # Work on a copy so the caller's dictionary is never modified.
        relaxed = conditions.copy()
        # Note: RSI and Bollinger Band relaxations would be applied to
        # `relaxed` here; the exact form depends on how conditions are
        # structured, so nothing is adjusted yet.
        algorithm.Debug(f"FALLBACK ADJUSTMENTS applied for {symbol}")
        return relaxed
    except Exception as e:
        algorithm.Debug(f"Error applying low volatility adjustments for {symbol}: {e}")
        return conditions


"""
Technical Indicators Module - Step 4 Implementation
Simple indicator bundle with QC indicators and warm-up functionality
"""
from AlgorithmImports import *
class IndicatorBundle:
    """
    Bundle of technical indicators and rolling price windows for one symbol.

    Holds QC indicator objects (ATR, EMA20, EMA200, RSI, Bollinger Bands, ADX)
    plus rolling windows of closes, highs, lows and bar-to-bar returns, so the
    strategy can derive thresholds and lookback levels without History() calls.
    All rolling windows are read newest-first (index 0 is the latest bar).
    """

    def __init__(self, algorithm, symbol):
        """
        Create the indicators and rolling windows for `symbol`.

        Args:
            algorithm: QCAlgorithm instance (used for Debug and Securities)
            symbol: Symbol this bundle tracks
        """
        from params import CRYPTO_VOL_LOOKBACK, SHARPE_LOOKBACK, LOOKBACK_PERIOD

        self.algorithm = algorithm
        self.symbol = symbol
        self.is_ready = False

        # QC indicator objects; all are updated manually in update_indicators().
        self.atr = AverageTrueRange(14)
        self.ema20 = ExponentialMovingAverage(20)
        self.ema200 = ExponentialMovingAverage(200)
        self.rsi = RelativeStrengthIndex(14)
        self.bb = BollingerBands(20, 2)
        self.adx = AverageDirectionalIndex(14)

        # Return windows feeding the crypto volatility gate and Sharpe filter.
        self.daily_returns = RollingWindow[float](CRYPTO_VOL_LOOKBACK + 10)
        self.sharpe_returns = RollingWindow[float](SHARPE_LOOKBACK + 10)

        # Every 5th close is sampled as an approximate weekly close.
        self.weekly_closes = RollingWindow[float](30)
        self.weekly_bar_count = 0

        # Up to 252 closes for the percentile-based adaptive thresholds.
        self.close_window = RollingWindow[float](252)

        # One extra slot so the current bar can be excluded from lookbacks.
        self.high_window = RollingWindow[float](LOOKBACK_PERIOD + 1)
        self.low_window = RollingWindow[float](LOOKBACK_PERIOD + 1)

        # Indicators whose IsReady flags decide the bundle's readiness.
        self.indicators = {
            'atr': self.atr,
            'ema20': self.ema20,
            'ema200': self.ema200,
            'rsi': self.rsi,
            'bb': self.bb,
            'adx': self.adx
        }

        # Previous close, needed to turn closes into returns.
        self.previous_close = None

    def warm_up(self, history):
        """
        Warm up indicators with historical data so they're ready before use.

        Args:
            history: Historical price data from QC History() method; rows are
                read via iterrows() with the timestamp taken from index[1]
        """
        if history.empty:
            self.algorithm.Debug(f"No historical data available for {self.symbol}")
            return

        for index, row in history.iterrows():
            # Rebuild a TradeBar so history flows through the same update
            # path as live data.
            bar = TradeBar(
                index[1],  # timestamp
                self.symbol,
                row['open'],
                row['high'],
                row['low'],
                row['close'],
                row['volume'] if 'volume' in row else 0
            )
            self.update_indicators(bar)

        self._check_ready_status()
        if self.is_ready:
            self.algorithm.Debug(f"Indicators warmed up for {self.symbol}")
        else:
            self.algorithm.Debug(f"WARNING: Some indicators not ready for {self.symbol}")

    def update_indicators(self, data):
        """
        Update all indicators and rolling windows with new price data.

        Args:
            data: Price data (TradeBar or QuoteBar); ignored if it has no Close
        """
        if not hasattr(data, 'Close'):
            return

        # Bar-based indicators take the whole bar.
        self.atr.Update(data)
        self.adx.Update(data)

        # Value-based indicators take (timestamp, value).
        timestamp = data.EndTime
        close_price = data.Close
        self.ema20.Update(timestamp, close_price)
        self.ema200.Update(timestamp, close_price)
        self.rsi.Update(timestamp, close_price)
        self.bb.Update(timestamp, close_price)

        self.close_window.Add(close_price)
        if hasattr(data, 'High') and hasattr(data, 'Low'):
            self.high_window.Add(data.High)
            self.low_window.Add(data.Low)

        # Simple return versus the previous close; skipped on the first bar.
        if self.previous_close is not None and self.previous_close > 0:
            daily_return = (close_price - self.previous_close) / self.previous_close
            self.daily_returns.Add(daily_return)
            self.sharpe_returns.Add(daily_return)

        # Approximate weekly sampling: keep every 5th close.
        self.weekly_bar_count += 1
        if self.weekly_bar_count % 5 == 0:
            self.weekly_closes.Add(close_price)

        self.previous_close = close_price

        if not self.is_ready:
            self._check_ready_status()

    def _check_ready_status(self):
        """Flip is_ready (and log once) when every tracked indicator is ready."""
        all_ready = all(indicator.IsReady for indicator in self.indicators.values())
        if all_ready and not self.is_ready:
            self.is_ready = True
            self.algorithm.Debug(f"All indicators ready for {self.symbol}")

    def get_values(self):
        """
        Get current values of all indicators including adaptive parameters.

        Returns:
            dict: When ready, indicator values ('atr', 'ema20', 'ema200',
            'rsi', 'bb_upper', 'bb_middle', 'bb_lower', 'adx') merged with the
            'adaptive_*' parameters. When not ready, a dict mapping each
            tracked indicator name to None.
        """
        if not self.is_ready:
            return {name: None for name in self.indicators.keys()}

        current_price = self.algorithm.Securities[self.symbol].Price
        atr_value = self.atr.Current.Value
        adaptive_params = self._calculate_adaptive_parameters(current_price, atr_value)

        base_values = {
            'atr': atr_value,
            'ema20': self.ema20.Current.Value,
            'ema200': self.ema200.Current.Value,
            'rsi': self.rsi.Current.Value,
            'bb_upper': self.bb.UpperBand.Current.Value,
            'bb_middle': self.bb.MiddleBand.Current.Value,
            'bb_lower': self.bb.LowerBand.Current.Value,
            'adx': self.adx.Current.Value
        }
        base_values.update(adaptive_params)
        return base_values

    def _calculate_adaptive_parameters(self, current_price, atr_value):
        """
        Calculate adaptive parameters based on current market conditions.

        Starts from an asset-class baseline chosen by substring of the symbol
        and tilts it gently by how ATR-as-percent-of-price compares with
        BASELINE_ATR_PCT; every result is clamped to its MIN/MAX bounds.

        Args:
            current_price: Current symbol price
            atr_value: Current ATR value

        Returns:
            dict: 'adaptive_k', 'adaptive_stop_atr', 'adaptive_trail_atr',
            'adaptive_vol_mult'. The static defaults are returned when price or
            ATR is non-positive or USE_ADAPTIVE_PARAMS is off.
        """
        from params import (K_DEFAULT, K_MIN, K_MAX, STOP_ATR_MULT_DEFAULT, STOP_ATR_MULT_MIN,
                            STOP_ATR_MULT_MAX, TRAIL_ATR_MULT_DEFAULT, TRAIL_ATR_MULT_MIN,
                            TRAIL_ATR_MULT_MAX, VOL_MULT_DEFAULT, VOL_MULT_MIN, VOL_MULT_MAX,
                            USE_ADAPTIVE_PARAMS, CRYPTO_K_BASE, CRYPTO_STOP_BASE,
                            EQUITY_K_BASE, EQUITY_STOP_BASE, BOND_K_BASE, BOND_STOP_BASE)

        # Invalid data, or the emergency kill-switch: use static defaults.
        if current_price <= 0 or atr_value <= 0 or not USE_ADAPTIVE_PARAMS:
            return {
                'adaptive_k': K_DEFAULT,
                'adaptive_stop_atr': STOP_ATR_MULT_DEFAULT,
                'adaptive_trail_atr': TRAIL_ATR_MULT_DEFAULT,
                'adaptive_vol_mult': VOL_MULT_DEFAULT
            }

        # Asset-class baseline, matched by substring of the symbol text.
        symbol_str = str(self.symbol)
        if any(crypto in symbol_str for crypto in ['BTC', 'ETH', 'SOL', 'USD']):
            base_k = CRYPTO_K_BASE
            base_stop = CRYPTO_STOP_BASE
            base_vol_mult = VOL_MULT_DEFAULT * 1.1
        elif 'GLD' in symbol_str:
            base_k = BOND_K_BASE
            base_stop = BOND_STOP_BASE
            base_vol_mult = VOL_MULT_DEFAULT * 0.85
        else:
            # Equities
            base_k = EQUITY_K_BASE
            base_stop = EQUITY_STOP_BASE
            base_vol_mult = VOL_MULT_DEFAULT

        atr_pct = atr_value / current_price

        from params import BASELINE_ATR_PCT
        try:
            # Clamp extreme ratios, then apply small exponents so the tilt
            # stays gentle (slightly steeper for stops than for K).
            ratio = max(0.5, min(2.0, atr_pct / BASELINE_ATR_PCT))
            k_adj = base_k * (ratio ** 0.15)
            stop_adj = base_stop * (ratio ** 0.20)
        except (ZeroDivisionError, TypeError, ValueError):
            # Unusable baseline: fall back to the untilted asset-class values.
            k_adj = base_k
            stop_adj = base_stop

        adaptive_k = max(K_MIN, min(K_MAX, k_adj))
        adaptive_stop_atr = max(STOP_ATR_MULT_MIN, min(STOP_ATR_MULT_MAX, stop_adj))

        # Trail follows the stop with a fixed multiplier.
        adaptive_trail_atr = max(TRAIL_ATR_MULT_MIN,
                                 min(TRAIL_ATR_MULT_MAX, adaptive_stop_atr * 1.5))

        if atr_pct >= 0.025:  # High volatility
            adaptive_vol_mult = base_vol_mult * 1.05
        elif atr_pct <= 0.008:  # Low volatility
            adaptive_vol_mult = base_vol_mult * 0.95
        else:
            adaptive_vol_mult = base_vol_mult
        adaptive_vol_mult = max(VOL_MULT_MIN, min(VOL_MULT_MAX, adaptive_vol_mult))

        return {
            'adaptive_k': adaptive_k,
            'adaptive_stop_atr': adaptive_stop_atr,
            'adaptive_trail_atr': adaptive_trail_atr,
            'adaptive_vol_mult': adaptive_vol_mult
        }

    @staticmethod
    def _newest(window, count):
        """Return the `count` most recent entries of `window`, newest first."""
        return [window[i] for i in range(count)]

    @staticmethod
    def _return_stats(window, lookback_days):
        """
        Return (mean, sample standard deviation) of the latest `lookback_days`
        returns, or None when fewer than max(lookback_days, 20) returns are
        stored or the sample itself would hold fewer than 20 values.
        """
        if window.Count < max(lookback_days, 20):
            return None
        sample = [window[i] for i in range(min(lookback_days, window.Count))]
        if len(sample) < 20:
            return None
        mean_return = sum(sample) / len(sample)
        variance = sum((r - mean_return) ** 2 for r in sample) / (len(sample) - 1)
        return mean_return, variance ** 0.5

    @staticmethod
    def _prior_bars(window, lookback_periods):
        """
        Return the `lookback_periods` entries preceding the newest one, or an
        empty list when the window holds fewer than lookback_periods + 1 bars.
        """
        if window.Count < lookback_periods + 1:
            return []
        # Start from 1 to exclude the current bar (avoids same-bar comparison).
        return [window[i] for i in range(1, lookback_periods + 1)]

    def rolling_percentile(self, window_days=252, percentile=50):
        """
        Calculate a percentile of recent close prices (linear interpolation).

        Args:
            window_days: Number of bars to look back (default 252 = 1 year)
            percentile: Percentile to calculate; values outside 0-100 are
                clamped to that range

        Returns:
            float: Percentile value, or 0 if fewer than min(window_days, 20)
            closes are stored or fewer than 5 fall inside the window
        """
        if self.close_window.Count < min(window_days, 20):
            return 0.0

        data_points = self._newest(self.close_window,
                                   min(window_days, self.close_window.Count))
        if len(data_points) < 5:
            return 0.0

        data_points.sort()
        # Clamp so an out-of-range request cannot index past either end.
        bounded = max(0.0, min(100.0, percentile))
        rank = (bounded / 100.0) * (len(data_points) - 1)
        lower_index = int(rank)
        fraction = rank - lower_index
        if fraction == 0:
            return data_points[lower_index]
        lower = data_points[lower_index]
        upper = data_points[lower_index + 1]
        return lower + (upper - lower) * fraction

    def adaptive_rsi_bounds(self):
        """
        Generate adaptive RSI bounds from the spread of recent closes.

        The spread is the 25th-75th percentile range of the last 90 closes
        relative to the latest close: wide spreads widen the bounds, narrow
        spreads tighten them.

        Returns:
            tuple: (lower_bound, upper_bound) for RSI; (30, 70) when the bundle
            is not ready, fewer than 50 closes are stored, or the latest close
            or either percentile is non-positive
        """
        if not self.is_ready or self.close_window.Count < 50:
            return (30, 70)

        current_price = self.close_window[0]
        price_25th = self.rolling_percentile(90, 25)
        price_75th = self.rolling_percentile(90, 75)
        # current_price is the divisor below, so it must be positive too.
        if current_price <= 0 or price_25th <= 0 or price_75th <= 0:
            return (30, 70)

        volatility_ratio = (price_75th - price_25th) / current_price
        if volatility_ratio > 0.15:  # High volatility
            return (25, 75)
        if volatility_ratio < 0.05:  # Low volatility
            return (35, 65)
        return (30, 70)  # Medium volatility

    def adaptive_bb_threshold(self):
        """
        Generate adaptive Bollinger Band threshold from recent price range.

        The range is the 10th-90th percentile spread of the last 20 closes
        relative to the latest close.

        Returns:
            float: 0.5 for a wide range, 0.3 for a narrow one, otherwise 0.4;
            also 0.4 when the bundle is not ready, fewer than 30 closes are
            stored, or the range cannot be computed
        """
        if not self.is_ready or self.close_window.Count < 30:
            return 0.4

        recent_high = self.rolling_percentile(20, 90)
        recent_low = self.rolling_percentile(20, 10)
        current_price = self.close_window[0]
        if recent_high <= recent_low or current_price <= 0:
            return 0.4

        recent_range = (recent_high - recent_low) / current_price
        if recent_range > 0.20:  # High volatility
            return 0.5
        if recent_range < 0.05:  # Low volatility
            return 0.3
        return 0.4  # Medium volatility

    def get_realized_volatility(self, lookback_days):
        """
        Calculate realized volatility from the stored returns.

        Args:
            lookback_days: Number of returns to include. Values below 20
                always produce 0, because at least 20 samples are required.

        Returns:
            float: Sample standard deviation of returns scaled by sqrt(365),
            or 0 if insufficient data
        """
        stats = self._return_stats(self.daily_returns, lookback_days)
        if stats is None:
            return 0.0
        _, std_dev = stats
        # Annualized with 365 because crypto trades every day.
        return std_dev * (365 ** 0.5)

    def get_sharpe_ratio(self, lookback_days):
        """
        Calculate Sharpe ratio from the stored returns (0% risk-free rate).

        Args:
            lookback_days: Number of returns to include. Values below 20
                always produce 0, because at least 20 samples are required.

        Returns:
            float: mean/std of returns scaled by sqrt(252), or 0 if there is
            insufficient data or the returns have zero variance
        """
        stats = self._return_stats(self.sharpe_returns, lookback_days)
        if stats is None:
            return 0.0
        mean_return, std_dev = stats
        if std_dev == 0:
            return 0.0
        return (mean_return / std_dev) * (252 ** 0.5)

    def get_weekly_ema_values(self):
        """
        Calculate 13- and 26-period EMAs over the sampled weekly closes.

        Both EMAs are seeded with the oldest of the 26 most recent weekly
        closes and rolled forward with multiplier 2 / (period + 1).

        Returns:
            tuple: (ema_13, ema_26) or (None, None) if fewer than 26 weekly
            closes are stored
        """
        if self.weekly_closes.Count < 26:
            return (None, None)

        # The window is newest-first; the EMA recursion needs oldest-first.
        weekly_data = self._newest(self.weekly_closes, 26)
        weekly_data.reverse()

        mult_13 = 2.0 / (13 + 1)
        mult_26 = 2.0 / (26 + 1)
        ema_13 = weekly_data[0]
        ema_26 = weekly_data[0]
        for price in weekly_data[1:]:
            ema_13 = (price * mult_13) + (ema_13 * (1 - mult_13))
            ema_26 = (price * mult_26) + (ema_26 * (1 - mult_26))
        return (ema_13, ema_26)

    def get_lookback_high(self, lookback_periods=None):
        """
        Get the highest high over the lookback period, excluding the current bar.

        Args:
            lookback_periods: Number of periods to look back (uses LOOKBACK_PERIOD if None)

        Returns:
            float: Highest high over lookback period, or 0 if insufficient data
        """
        if lookback_periods is None:
            from params import LOOKBACK_PERIOD
            lookback_periods = LOOKBACK_PERIOD

        highs = self._prior_bars(self.high_window, lookback_periods)
        return max(highs) if highs else 0.0

    def get_lookback_low(self, lookback_periods=None):
        """
        Get the lowest low over the lookback period, excluding the current bar.

        Args:
            lookback_periods: Number of periods to look back (uses LOOKBACK_PERIOD if None)

        Returns:
            float: Lowest low over lookback period, or 0 if insufficient data
        """
        if lookback_periods is None:
            from params import LOOKBACK_PERIOD
            lookback_periods = LOOKBACK_PERIOD

        lows = self._prior_bars(self.low_window, lookback_periods)
        return min(lows) if lows else 0.0


"""
Trading Logic Module - Step 8 Implementation
Core breakout entry logic with short selling and pyramiding support
Enhanced with four advanced features from research
"""
from AlgorithmImports import *
from params import (LOOKBACK_PERIOD, BASE_POSITION_SIZE,
ENABLE_SHORTS, ENABLE_PYRAMIDING, PYRAMID_K1, PYRAMID_K2,
PYRAMID_SIZE_MULT, MAX_TOTAL_SIZE_MULT)
from advanced_features import (detect_intrabar_pyramid_opportunities, calculate_crypto_volatility_gate,
calculate_low_volatility_fallback, calculate_trend_based_regime_filter,
apply_low_volatility_adjustments)
class BreakoutLogic:
"""
Implements core breakout entry logic for Step 8.
Enhanced with short selling and pyramiding capabilities.
"""
def __init__(self, algorithm):
self.algorithm = algorithm
# Track recent data for lookback calculations
self.recent_data = {} # symbol -> list of recent price data
def should_enter_long(self, symbol, indicator_bundle):
"""
Check if we should enter a long position for this symbol.
Core logic from research notebook:
1. Price breaks above 18-period high + K * ATR
2. Basic momentum: close > 21-period EMA
3. Indicators are ready
Args:
symbol: Symbol to check
indicator_bundle: IndicatorBundle instance for the symbol
Returns:
bool: True if should enter long position
"""
if not indicator_bundle.is_ready:
self.algorithm.Debug(f"{symbol}: Indicators not ready for LONG entry")
return False
try:
# Get current indicator values
values = indicator_bundle.get_values()
current_price = self.algorithm.Securities[symbol].Price
atr = values.get('atr', 0)
ema20 = values.get('ema20', 0)
if atr <= 0 or ema20 <= 0 or current_price <= 0:
self.algorithm.Debug(f"{symbol}: Invalid indicator values for LONG - ATR={atr}, EMA20={ema20}, Price={current_price}")
return False
# Get adaptive parameters from indicator bundle
adaptive_k = values.get('adaptive_k', 1.25) # Use adaptive K
# Get recent high for lookback calculation
lookback_high = self._get_lookback_high(symbol)
if lookback_high <= 0:
self.algorithm.Debug(f"{symbol}: No lookback high available for LONG")
return False # Not enough data yet
# Core breakout condition: close > lookback_high + K * ATR
k_value = adaptive_k
breakout_level = lookback_high + (k_value * atr)
breakout_condition = current_price >= breakout_level
# Basic momentum condition: close > EMA20 (upward momentum for longs)
momentum_condition = current_price > ema20
# Additional logging for debugging
self.algorithm.Debug(f"{symbol} LONG: Price={current_price:.2f}, LookbackHigh={lookback_high:.2f}, "
f"ATR={atr:.3f}, K={k_value:.2f}, BreakoutLevel={breakout_level:.2f}, "
f"EMA20={ema20:.2f}")
self.algorithm.Debug(f"{symbol} LONG: BreakoutCondition={breakout_condition}, "
f"MomentumCondition={momentum_condition}")
# Combined entry condition
should_enter = breakout_condition and momentum_condition
if should_enter:
self.algorithm.Debug(f"LONG ENTRY SIGNAL: {symbol} - Price={current_price:.2f} > "
f"Breakout={breakout_level:.2f} AND Price > EMA20={ema20:.2f}")
return should_enter
except Exception as e:
self.algorithm.Debug(f"Error in should_enter_long for {symbol}: {e}")
return False
def should_enter_short(self, symbol, indicator_bundle):
    """
    Check if we should enter a short position for this symbol.

    Step 8: New short selling capability.
    Phase 3.3: Added regime filter to block shorts in bull markets.
    Phase 3.5: Added borrow filter to skip expensive or unavailable shorts.

    A short is signalled only when every gate below passes, in this order:
        1. ENABLE_SHORTS is on.
        2. _check_short_regime_filter() allows shorts.
        3. The security is shortable and its borrow rate does not exceed
           EXPENSIVE_BORROW_THRESHOLD. This gate is best effort: if the
           borrow data cannot be read the problem is logged and the
           remaining gates are still evaluated.
        4. The indicator bundle is ready and ATR, EMA20 and price are positive.
        5. Breakout: price <= lookback low - adaptive K * ATR.
        6. Momentum: price < EMA20.

    Args:
        symbol: Symbol to check
        indicator_bundle: IndicatorBundle instance for the symbol

    Returns:
        bool: True if should enter short position
    """
    if not ENABLE_SHORTS:
        return False

    # Phase 3.3: Check regime filter before proceeding
    if not self._check_short_regime_filter():
        return False

    # Phase 3.5: Skip expensive or unavailable shorts (borrow filter)
    try:
        from params import EXPENSIVE_BORROW_THRESHOLD
        security = self.algorithm.Securities[symbol]
        # Both attributes are optional on the security, hence the permissive defaults
        borrow_rate = getattr(security, 'BorrowingCostRate', 0.0)
        is_shortable = getattr(security, 'IsShortable', True)
        if (not is_shortable) or (borrow_rate > EXPENSIVE_BORROW_THRESHOLD):
            self.algorithm.Debug(f"SKIP SHORT {symbol}: BorrowRate={borrow_rate:.4f} IsShortable={is_shortable}")
            return False
    except Exception as e:
        # Borrow data is optional, so a failure here must not block the signal,
        # but it is logged so a permanently disabled filter does not go unnoticed.
        self.algorithm.Debug(f"{symbol}: Borrow filter skipped for SHORT - {e}")

    if not indicator_bundle.is_ready:
        self.algorithm.Debug(f"{symbol}: Indicators not ready for SHORT entry")
        return False

    try:
        # Get current indicator values
        values = indicator_bundle.get_values()
        current_price = self.algorithm.Securities[symbol].Price
        atr = values.get('atr', 0)
        ema20 = values.get('ema20', 0)

        if atr <= 0 or ema20 <= 0 or current_price <= 0:
            self.algorithm.Debug(f"{symbol}: Invalid indicator values for SHORT - ATR={atr}, EMA20={ema20}, Price={current_price}")
            return False

        # Adaptive K comes from the indicator bundle; 1.25 is the fallback
        k_value = values.get('adaptive_k', 1.25)

        # Get recent low for lookback calculation
        lookback_low = self._get_lookback_low(symbol)
        if lookback_low <= 0:
            self.algorithm.Debug(f"{symbol}: No lookback low available for SHORT")
            return False  # Not enough data yet

        # Core breakout condition: close < lookback_low - K * ATR
        breakout_level = lookback_low - (k_value * atr)
        breakout_condition = current_price <= breakout_level

        # Basic momentum condition: close < EMA20 (downward momentum for shorts)
        momentum_condition = current_price < ema20

        # Additional logging for debugging
        self.algorithm.Debug(f"{symbol} SHORT: Price={current_price:.2f}, LookbackLow={lookback_low:.2f}, "
                             f"ATR={atr:.3f}, K={k_value:.2f}, BreakoutLevel={breakout_level:.2f}, "
                             f"EMA20={ema20:.2f}")
        self.algorithm.Debug(f"{symbol} SHORT: BreakoutCondition={breakout_condition}, "
                             f"MomentumCondition={momentum_condition}")

        # Combined entry condition
        should_enter = breakout_condition and momentum_condition
        if should_enter:
            self.algorithm.Debug(f"SHORT ENTRY SIGNAL: {symbol} - Price={current_price:.2f} < "
                                 f"Breakout={breakout_level:.2f} AND Price < EMA20={ema20:.2f}")
        return should_enter
    except Exception as e:
        self.algorithm.Debug(f"Error in should_enter_short for {symbol}: {e}")
        return False
def check_pyramid_opportunities(self, symbol, indicator_bundle, current_direction):
    """
    Report which pyramid add-on levels the current price has reached.

    Step 8: Pyramiding capability.

    Levels are measured from the lookback extreme on the side of the open
    position: lookback high + K * ATR for longs, lookback low - K * ATR for
    any other direction value (treated as short), with K taken from
    PYRAMID_K1 and PYRAMID_K2.

    Args:
        symbol: Symbol to check
        indicator_bundle: IndicatorBundle instance for the symbol
        current_direction: Current position direction (1 for long, -1 for short)

    Returns:
        dict: {'pyramid1': bool, 'pyramid2': bool}. Both flags are False when
        pyramiding is disabled, indicators are not ready, ATR/price/reference
        level are not positive, the pyramid cooldown is active, or an error
        occurs.
    """
    if not (ENABLE_PYRAMIDING and indicator_bundle.is_ready):
        return {'pyramid1': False, 'pyramid2': False}

    try:
        snapshot = indicator_bundle.get_values()
        price_now = self.algorithm.Securities[symbol].Price
        atr_now = snapshot.get('atr', 0)

        # Invalid inputs and an active cooldown both mean "nothing to add"
        if (atr_now <= 0 or price_now <= 0
                or not self.algorithm.risk_manager.check_pyramid_cooldown(symbol)):
            return {'pyramid1': False, 'pyramid2': False}

        is_long = current_direction == 1
        if is_long:
            anchor = self._get_lookback_high(symbol)
        else:
            anchor = self._get_lookback_low(symbol)
        if anchor <= 0:
            return {'pyramid1': False, 'pyramid2': False}

        step1 = PYRAMID_K1 * atr_now
        step2 = PYRAMID_K2 * atr_now
        if is_long:
            # Longs add above the lookback high
            level1, level2 = anchor + step1, anchor + step2
            reached1, reached2 = price_now >= level1, price_now >= level2
        else:
            # Shorts add below the lookback low
            level1, level2 = anchor - step1, anchor - step2
            reached1, reached2 = price_now <= level1, price_now <= level2

        side = "LONG" if is_long else "SHORT"
        self.algorithm.Debug(f"{symbol} {side} PYRAMID: P1_Level={level1:.2f}, "
                             f"P2_Level={level2:.2f}, Price={price_now:.2f}")
        return {'pyramid1': reached1, 'pyramid2': reached2}
    except Exception as e:
        self.algorithm.Debug(f"Error checking pyramid opportunities for {symbol}: {e}")
        return {'pyramid1': False, 'pyramid2': False}
def check_intrabar_pyramid_opportunities(self, symbol, indicator_bundle, current_direction, position_data):
    """
    Run the intrabar pyramid detector for an open position.

    Advanced Feature #1: Intrabar Pyramid Detection. Delegates to the
    module-level detect_intrabar_pyramid_opportunities() unless pyramiding
    is switched off via ENABLE_PYRAMIDING.

    Args:
        symbol: Symbol to check
        indicator_bundle: IndicatorBundle instance
        current_direction: Current position direction (1 for long, -1 for short)
        position_data: Dictionary with position metadata

    Returns:
        dict: {'pyramid1': bool, 'pyramid2': bool}
    """
    if ENABLE_PYRAMIDING:
        # Hand over to the advanced intrabar detection
        return detect_intrabar_pyramid_opportunities(
            self.algorithm,
            symbol,
            indicator_bundle,
            current_direction,
            position_data,
        )
    return {'pyramid1': False, 'pyramid2': False}
def check_regime_filter(self, symbol, direction='long'):
    """
    Evaluate the trend-based regime filter for a symbol.

    Advanced Feature #4: Complex Regime Filters. Thin wrapper that forwards
    to calculate_trend_based_regime_filter() with this logic's algorithm.

    Args:
        symbol: Symbol to check
        direction: 'long', 'short', or 'both'

    Returns:
        bool or tuple: Whether current regime allows trading in specified direction(s)
    """
    regime_verdict = calculate_trend_based_regime_filter(self.algorithm, symbol, direction)
    return regime_verdict
def should_enter_long_advanced(self, symbol, indicator_bundle, year_signals=0):
    """
    Long entry decision with the advanced feature set layered on top.

    The basic breakout signal from should_enter_long() must fire first, then
    the regime filter must allow longs. The low-volatility fallback flag is
    computed and logged but does not change the decision. Crypto volatility
    gating is handled inside the regime filter.

    Args:
        symbol: Symbol to check
        indicator_bundle: IndicatorBundle instance for the symbol
        year_signals: Number of signals generated this year for fallback logic

    Returns:
        bool: True if should enter long position; False when indicators are
        not ready, a gate rejects the entry, or an error occurs
    """
    if not indicator_bundle.is_ready:
        self.algorithm.Debug(f"{symbol}: Indicators not ready for ADVANCED LONG entry")
        return False

    try:
        # Gate 1: basic breakout conditions
        breakout_ok = self.should_enter_long(symbol, indicator_bundle)
        if breakout_ok:
            # Gate 2: Advanced Feature #4 - Complex Regime Filter
            regime_allows = self.check_regime_filter(symbol, 'long')
            if regime_allows:
                # Advanced Feature #3 - Low Volatility Fallbacks (informational only)
                fallback_active = calculate_low_volatility_fallback(self.algorithm, symbol, year_signals)
                # Advanced Feature #2 - Crypto Volatility Gating is already
                # covered by the regime filter for crypto assets
                self.algorithm.Debug(f"ADVANCED LONG ENTRY {symbol}: BasicOK={breakout_ok}, "
                                     f"RegimeOK={regime_allows}, LowVolFallback={fallback_active}")
                return True
            self.algorithm.Debug(f"{symbol}: REGIME FILTER blocked long entry")
        return False
    except Exception as e:
        self.algorithm.Debug(f"Error in should_enter_long_advanced for {symbol}: {e}")
        return False
def should_enter_short_advanced(self, symbol, indicator_bundle, year_signals=0):
    """
    Short entry decision with the advanced feature set layered on top.

    Shorts must be enabled, the basic breakout signal from
    should_enter_short() must fire, and the regime filter must allow shorts.
    The low-volatility fallback flag is computed and logged but does not
    change the decision. Crypto volatility gating is handled inside the
    regime filter.

    Args:
        symbol: Symbol to check
        indicator_bundle: IndicatorBundle instance for the symbol
        year_signals: Number of signals generated this year for fallback logic

    Returns:
        bool: True if should enter short position; False when shorts are
        disabled, indicators are not ready, a gate rejects the entry, or an
        error occurs
    """
    if not ENABLE_SHORTS:
        return False

    if not indicator_bundle.is_ready:
        self.algorithm.Debug(f"{symbol}: Indicators not ready for ADVANCED SHORT entry")
        return False

    try:
        # Gate 1: basic breakout conditions
        breakout_ok = self.should_enter_short(symbol, indicator_bundle)
        if breakout_ok:
            # Gate 2: Advanced Feature #4 - Complex Regime Filter
            regime_allows = self.check_regime_filter(symbol, 'short')
            if regime_allows:
                # Advanced Feature #3 - Low Volatility Fallbacks (informational only)
                fallback_active = calculate_low_volatility_fallback(self.algorithm, symbol, year_signals)
                # Advanced Feature #2 - Crypto Volatility Gating is already
                # covered by the regime filter for crypto assets
                self.algorithm.Debug(f"ADVANCED SHORT ENTRY {symbol}: BasicOK={breakout_ok}, "
                                     f"RegimeOK={regime_allows}, LowVolFallback={fallback_active}")
                return True
            self.algorithm.Debug(f"{symbol}: REGIME FILTER blocked short entry")
        return False
    except Exception as e:
        self.algorithm.Debug(f"Error in should_enter_short_advanced for {symbol}: {e}")
        return False
def get_position_size(self, symbol, entry_type='primary', direction=1):
    """
    Compute the allocation for a new entry or pyramid leg on ``symbol``.

    Step 8: Enhanced with pyramiding support.
    Phase 3.4: Added direction parameter for Kelly short edge discount.

    The volatility-based size from risk_manager is used as the base. Pyramid
    legs are scaled by PYRAMID_SIZE_MULT, and the result is trimmed so that
    the symbol's total size never exceeds base * MAX_TOTAL_SIZE_MULT.

    Args:
        symbol: Symbol to size
        entry_type: 'primary', 'pyramid1', 'pyramid2'
        direction: 1 for long, -1 for short

    Returns:
        float: Position size as percentage (varies based on ATR and entry type).
        BASE_POSITION_SIZE is returned when indicators are missing, not ready
        or invalid, or when an error occurs; 0.0 when no room is left under
        the total size cap.
    """
    try:
        bundles = self.algorithm.indicators
        if symbol not in bundles:
            self.algorithm.Debug(f"No indicators for {symbol}, using base position size")
            return BASE_POSITION_SIZE

        bundle = bundles[symbol]
        if not bundle.is_ready:
            self.algorithm.Debug(f"Indicators not ready for {symbol}, using base position size")
            return BASE_POSITION_SIZE

        snapshot = bundle.get_values()
        price_now = self.algorithm.Securities[symbol].Price
        atr_now = snapshot.get('atr', 0)
        if atr_now <= 0 or price_now <= 0:
            self.algorithm.Debug(f"Invalid values for {symbol}: ATR={atr_now}, Price={price_now}")
            return BASE_POSITION_SIZE

        # Base size comes from the volatility-based formula in risk_manager
        from risk_manager import calculate_volatility_based_position_size
        base = calculate_volatility_based_position_size(
            self.algorithm, symbol, atr_now, price_now, direction
        )

        # Only pyramid legs are scaled; primary and unknown entry types use the base size
        is_pyramid_leg = entry_type in ('pyramid1', 'pyramid2')
        size = base * PYRAMID_SIZE_MULT if is_pyramid_leg else base

        # Trim the leg so the symbol's total stays under the cap
        held = self.algorithm.risk_manager.get_total_position_size(symbol)
        ceiling = base * MAX_TOTAL_SIZE_MULT
        if held + size > ceiling:
            size = max(0, ceiling - held)
            if size <= 0:
                self.algorithm.Debug(f"Cannot add {entry_type} for {symbol}: would exceed max total size")
                return 0.0

        self.algorithm.Debug(f"POSITION SIZE {symbol} ({entry_type}): Base={base:.2%}, "
                             f"Final={size:.2%}, Current={held:.2%}")
        return size
    except Exception as e:
        self.algorithm.Debug(f"Error getting position size for {symbol}: {e}")
        return BASE_POSITION_SIZE
# Phase 1.3: Removed _get_asset_params - now using adaptive parameters from indicator bundle
def _check_short_regime_filter(self):
    """
    Phase 3.3: Decide whether the market regime permits short selling.

    Shorts are blocked while the SPY price is at or above its 'ema200'
    indicator value. The filter fails open: when it is disabled, when SPY
    indicators are missing, not ready or invalid, or when an error occurs,
    shorts are allowed.

    Returns:
        bool: True if shorts are allowed
    """
    from params import ENABLE_SHORT_REGIME_FILTER
    if not ENABLE_SHORT_REGIME_FILTER:
        return True  # Filter disabled

    try:
        algo = self.algorithm

        # SPY must be tracked by the algorithm for the filter to apply
        if not hasattr(algo, 'spy_symbol') or algo.spy_symbol not in algo.indicators:
            algo.Debug("SPY regime filter: SPY indicators not available, allowing shorts")
            return True

        spy_bundle = algo.indicators[algo.spy_symbol]
        if not spy_bundle.is_ready:
            algo.Debug("SPY regime filter: SPY indicators not ready, allowing shorts")
            return True

        snapshot = spy_bundle.get_values()
        price = algo.Securities[algo.spy_symbol].Price
        ema200 = snapshot.get('ema200', 0)
        if ema200 <= 0 or price <= 0:
            algo.Debug("SPY regime filter: Invalid SPY data, allowing shorts")
            return True

        # Bear market (price below the 200 EMA) is the only regime that allows shorts
        shorts_allowed = price < ema200
        if shorts_allowed:
            return shorts_allowed
        algo.Debug(f"SPY REGIME FILTER: Blocking shorts - SPY ${price:.2f} > EMA200 ${ema200:.2f}")
        return shorts_allowed
    except Exception as e:
        self.algorithm.Debug(f"Error in SPY regime filter: {e}, allowing shorts")
        return True
def _get_lookback_high(self, symbol):
    """
    Return the highest high over the lookback period for ``symbol``.

    Phase 3: Reads the value from the symbol's indicator bundle instead of
    issuing a History() call.

    Args:
        symbol: Symbol to get lookback high for

    Returns:
        float: Highest high over lookback period, or 0 if the symbol has no
        indicator bundle, the bundle reports a non-positive value, or an
        error occurs
    """
    try:
        bundles = self.algorithm.indicators
        if symbol in bundles:
            highest = bundles[symbol].get_lookback_high()
            if highest <= 0:
                self.algorithm.Debug(f"{symbol}: Insufficient data for lookback high")
                return 0
            return highest

        self.algorithm.Debug(f"{symbol}: No indicator bundle for lookback high")
        return 0
    except Exception as e:
        self.algorithm.Debug(f"Error getting lookback high for {symbol}: {e}")
        return 0
def _get_lookback_low(self, symbol):
    """
    Get the lowest low over the lookback period.

    Step 8: New function for short selling.
    Phase 3: Now uses indicator bundle instead of History() call.

    Args:
        symbol: Symbol to get lookback low for

    Returns:
        float: Lowest low over lookback period, or 0 if the symbol has no
        indicator bundle, the bundle reports a non-positive value, or an
        error occurs
    """
    try:
        # Phase 3: Use indicator bundle instead of History() call
        if symbol not in self.algorithm.indicators:
            self.algorithm.Debug(f"{symbol}: No indicator bundle for lookback low")
            return 0

        indicator_bundle = self.algorithm.indicators[symbol]
        lookback_low = indicator_bundle.get_lookback_low()
        if lookback_low <= 0:
            self.algorithm.Debug(f"{symbol}: Insufficient data for lookback low")
            return 0
        return lookback_low
    except Exception as e:
        # Any failure is reported as "no data" so callers simply skip the signal
        self.algorithm.Debug(f"Error getting lookback low for {symbol}: {e}")
        return 0


from AlgorithmImports import *
# Import helper modules (same directory)
from universe import UniverseSelector
from indicators import IndicatorBundle
from portfolio_sizing import PortfolioManager
from logic import BreakoutLogic
from risk import RiskManager
import params
class SkeletonAlgorithm(QCAlgorithm):
"""
Minimal QuantConnect algorithm skeleton for backtesting, paper, and live trading.
Step 9: Event-based exits and trailing runner with sophisticated scale-out strategy.
Usage:
1. Backtest: Run to verify event-based exit logic with scale-outs and runner trails.
2. Paper / Live: Deploy this project in QuantConnect, switch to live mode, and
choose your preferred broker (IB, Tradier, etc.)
3. Strategy development: Implements complete breakout strategy with event-based exits,
40% scale-out at profit target, and 60% runner with event-based trail.
"""
def Initialize(self):
# ---------------------------------------
# Basic algorithm setup
# ---------------------------------------
self.SetStartDate(2020, 1, 1)
self.SetEndDate(2023, 12, 31)
self.SetCash(100000)
# Initialize strategy components
self.universe_selector = UniverseSelector(self)
self.portfolio_manager = PortfolioManager(self)
self.breakout_logic = BreakoutLogic(self) # Step 5: Add breakout logic
self.risk_manager = RiskManager(self) # Step 6: Add risk management
# Register coarse fundamental universe for dynamic equity candidate feed (if enabled)
if params.USE_LIVE_CANDIDATE_POOL:
self.AddUniverse(self._coarse_selection)
# Track yearly signals for low volatility fallback feature
self.yearly_signals = {} # symbol -> signal count
# Track indicators for each symbol (Step 4 requirement)
self.indicators = {}
# Phase 3.3: Add SPY for regime filtering
self.spy_symbol = self.AddEquity("SPY", Resolution.Daily).Symbol
self.spy_indicators = IndicatorBundle(self, self.spy_symbol)
# Warm up SPY indicators
spy_history = self.History(self.spy_symbol, 250, Resolution.Daily)
self.spy_indicators.warm_up(spy_history)
self.indicators[self.spy_symbol] = self.spy_indicators
# Get initial universe
initial_universe = self.universe_selector.get_universe()
# Add securities and set up indicators
for symbol_str in initial_universe:
symbol = None
try:
# Decide whether the symbol is crypto or equity/ETF
if symbol_str.endswith("USD") or symbol_str.endswith("USDT") or symbol_str.endswith("USDC"):
symbol = self.AddCrypto(symbol_str, Resolution.Daily).Symbol
else:
symbol = self.AddEquity(symbol_str, Resolution.Daily).Symbol
# Create indicator bundle for this symbol (Step 4 requirement)
indicator_bundle = IndicatorBundle(self, symbol)
# Warm up indicators with historical data
history = self.History(symbol, 250, Resolution.Daily)
indicator_bundle.warm_up(history)
# Store bundle in self.indicators (Step 4 requirement)
self.indicators[symbol] = indicator_bundle
except Exception as e:
# Skip symbols that cannot be added in the current QC data feed
self.Debug(f"WARNING: Could not add {symbol_str}: {e}")
# Set benchmark
self.SetBenchmark("SPY")
# Schedule dynamic universe updates
# Scan every Monday 10:00 NY time for new additions
self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday),
self.TimeRules.At(10, 0),
self.universe_selector.update_universe)
# Monthly prune on first business day, 10:30 NY time
self.Schedule.On(self.DateRules.MonthStart(),
self.TimeRules.At(10, 30),
self.universe_selector.update_universe)
# Rebalancing will now happen in OnData when daily bars arrive
# This ensures we have valid prices and complete daily bars
self.rebalance_ready = False
# Log algorithm start
mode = "LIVE" if self.LiveMode else "BACKTEST"
self.Debug(f"Algorithm started in {mode} mode with {len(initial_universe)} symbols")
self.Debug(f"Enhanced with 4 advanced features: Intrabar Pyramids, Crypto Vol Gating, Low Vol Fallbacks, Complex Regime Filters")
self.Debug(f"Dynamic universe enabled: {params.ENABLE_DYNAMIC_UNIVERSE}, Static universe enabled: {params.ENABLE_STATIC_UNIVERSE}")
def OnData(self, data):
"""
Main data handler - processes new market data.
Step 9: Enhanced with event-based exits and trailing runner logic.
Step 10: Added risk governor with daily equity curve tracking.
"""
# Reset yearly signals at start of each year
if self.Time.month == 1 and self.Time.day == 1:
self.yearly_signals = {}
self.Debug("YEARLY RESET: Signal counters reset for low volatility fallback feature")
# Phase 4: Moved equity curve update to OnEndOfDay
# Update all indicator bundles first (including SPY)
for symbol in data.Keys:
if symbol in self.indicators:
bar = data[symbol]
if hasattr(bar, 'Close') and bar.Close > 0:
self.indicators[symbol].update_indicators(bar)
# Only trade if we have valid daily data for the symbols
if len(data) > 0:
# Step 9: Check for exits FIRST (before new entries)
self.check_for_exits(data)
# Then check for new entries
self.check_for_entries(data)
def OnEndOfDay(self, symbol):
"""
Phase 4: Handle end-of-day processing including equity curve updates.
Moved from OnData to reduce frequency and improve performance.
"""
# Update daily equity curve for risk governor
self.risk_manager.update_equity_curve()
# Emit periodic performance summary (once per week on Friday)
if self.Time.weekday() == 4: # Friday
self.risk_manager.emit_periodic_summary()
def check_for_exits(self, data):
"""
Step 9: Check for event-based exits - scale-outs, runner trails, and adaptive stops.
This is the sophisticated exit strategy from the research notebook.
"""
# Get all symbols with active risk states
symbols_with_positions = list(self.risk_manager.risk_states.keys())
for symbol in symbols_with_positions:
# Skip if no current data for this symbol
if symbol not in data or data[symbol] is None:
continue
bar = data[symbol]
if not hasattr(bar, 'Close') or bar.Close <= 0:
continue
current_price = bar.Close
current_high = getattr(bar, 'High', current_price)
current_low = getattr(bar, 'Low', current_price)
# Get indicator values for ATR
if symbol not in self.indicators:
continue
indicator_bundle = self.indicators[symbol]
if not indicator_bundle.is_ready:
continue
values = indicator_bundle.get_values()
atr_value = values.get('atr', 0)
if atr_value <= 0:
continue
risk_state = self.risk_manager.risk_states[symbol]
# Update bars held counter
risk_state.bars_held += 1
# Update adaptive stops based on survival time and scale-out status
risk_state.current_stop = self.risk_manager.calculate_adaptive_stops(
symbol, current_price, atr_value
)
# Update event-based trail (only moves on new highs/lows after scale-out)
trail_updated = self.risk_manager.update_event_based_trail(
symbol, current_high, current_low, atr_value
)
# ===== EXIT CHECKS (in order of priority) =====
# 1. Check for scale-out at profit target (40% of position)
if self.risk_manager.check_scale_out_conditions(symbol, current_price, current_high, current_low):
# Process the 40% scale-out
scale_out_pnl, scale_out_size, scale_out_return = self.risk_manager.process_scale_out(
symbol, current_price
)
# Reduce position by 40% (keep 60% as runner)
try:
current_holding = self.Portfolio[symbol]
portfolio_value = self.Portfolio.TotalPortfolioValue
if portfolio_value > 0:
current_holdings_pct = current_holding.HoldingsValue / portfolio_value
new_holdings_pct = current_holdings_pct * 0.6 # Keep 60%
self.SetHoldings(symbol, new_holdings_pct)
self.Debug(f"SCALE-OUT COMPLETE: {symbol} - 40% exited at ${current_price:.2f}, "
f"60% remains as runner (Return: {scale_out_return:.2%})")
# Step 10: Record scale-out trade for analytics
portfolio_value = self.Portfolio.TotalPortfolioValue
trade_pnl = scale_out_size * scale_out_return * portfolio_value
self.risk_manager.record_trade(scale_out_return, trade_pnl)
else:
self.Debug(f"Portfolio value error during scale-out for {symbol}")
except Exception as e:
self.Debug(f"Error during scale-out for {symbol}: {e}")
# Continue to check other conditions (don't return early)
# 2. Check for stop loss exit (full position)
elif current_price <= risk_state.current_stop if risk_state.direction == 1 else current_price >= risk_state.current_stop:
# Stop loss hit - exit full remaining position
self.Liquidate(symbol)
# Calculate final P&L
final_pnl, final_size, final_return = self.risk_manager.calculate_combined_pnl(
symbol, current_price, risk_state.remaining_position
)
direction_str = "LONG" if risk_state.direction == 1 else "SHORT"
self.Debug(f"STOP LOSS: {symbol} {direction_str} - Exited at ${current_price:.2f}, "
f"Stop: ${risk_state.current_stop:.2f} (Return: {final_return:.2%})")
# Close out risk state
self.risk_manager.close_position(symbol, "STOP_LOSS")
# Step 10: Record trade for analytics
portfolio_value = self.Portfolio.TotalPortfolioValue
trade_pnl = final_size * final_return * portfolio_value
self.risk_manager.record_trade(final_return, trade_pnl)
# Update portfolio manager
self.portfolio_manager.update_position(symbol, 0)
# 3. Check for runner trail exit (remaining 60% after scale-out)
elif self.risk_manager.check_runner_trail_exit(symbol, current_price, current_high, current_low):
# Runner trail hit - exit remaining position
self.Liquidate(symbol)
# Calculate runner P&L
runner_pnl, runner_size, runner_return = self.risk_manager.calculate_combined_pnl(
symbol, current_price, risk_state.remaining_position
)
direction_str = "LONG" if risk_state.direction == 1 else "SHORT"
self.Debug(f"RUNNER TRAIL: {symbol} {direction_str} - Exited at ${current_price:.2f}, "
f"Trail: ${risk_state.runner_trail:.2f} (Return: {runner_return:.2%})")
# Close out risk state
self.risk_manager.close_position(symbol, "RUNNER_TRAIL")
# Step 10: Record trade for analytics
portfolio_value = self.Portfolio.TotalPortfolioValue
trade_pnl = runner_size * runner_return * portfolio_value
self.risk_manager.record_trade(runner_return, trade_pnl)
# Update portfolio manager
self.portfolio_manager.update_position(symbol, 0)
def check_for_entries(self, data):
"""
Check for new entry opportunities on each daily bar.
Step 8: Enhanced with short selling and pyramiding capabilities.
"""
# Get current universe
universe = self.universe_selector.get_universe()
# Process each symbol in universe
for symbol_str in universe:
try:
# Get symbol object (may need to create if it's new from dynamic universe)
if symbol_str.endswith("USD") or symbol_str.endswith("USDT") or symbol_str.endswith("USDC"):
symbol = self.Symbol(symbol_str)
else:
symbol = self.Symbol(symbol_str)
# Skip if we don't have an indicator bundle for this symbol
if symbol not in self.indicators:
continue
except Exception as e:
self.Debug(f"Error processing symbol {symbol_str}: {e}")
continue
# Skip if we don't have data for this symbol
if symbol not in data or data[symbol] is None:
continue
indicator_bundle = self.indicators[symbol]
# Only check symbols with valid prices
bar = data[symbol]
if not hasattr(bar, 'Close'):
continue
current_price = bar.Close
if current_price <= 0:
continue
# Skip if indicators not ready
if not indicator_bundle.is_ready:
continue
values = indicator_bundle.get_values()
# Step 8: Check for new entries and pyramid opportunities
current_position = self.Portfolio[symbol].Quantity
is_long = current_position > 0
is_short = current_position < 0
has_position = is_long or is_short
if not has_position:
# No position - check for new entries
# Check for long entry with advanced features
year_signals = self.yearly_signals.get(symbol, 0)
should_enter_long = self.breakout_logic.should_enter_long_advanced(symbol, indicator_bundle, year_signals)
if should_enter_long:
position_size = self.breakout_logic.get_position_size(symbol, 'primary', direction=1)
if position_size > 0:
# Enter long position
self.SetHoldings(symbol, position_size)
self.Debug(f"LONG ENTRY: {symbol} with {position_size:.1%} allocation")
# Track yearly signals for low volatility fallback
self.yearly_signals[symbol] = self.yearly_signals.get(symbol, 0) + 1
# Create risk state for long position
entry_price = current_price
atr_value = values.get('atr', 0)
if atr_value > 0:
self.risk_manager.create_risk_state(
symbol, entry_price, position_size, atr_value,
direction=1, entry_type='primary'
)
# Update portfolio manager
quantity = self.Portfolio[symbol].Quantity
self.portfolio_manager.update_position(symbol, quantity)
continue # Skip short check since we just entered long
# Check for short entry with advanced features (only if shorts enabled and no long entry)
year_signals = self.yearly_signals.get(symbol, 0)
should_enter_short = self.breakout_logic.should_enter_short_advanced(symbol, indicator_bundle, year_signals)
if should_enter_short:
position_size = self.breakout_logic.get_position_size(symbol, 'primary', direction=-1)
if position_size > 0:
# Enter short position (negative size)
self.SetHoldings(symbol, -position_size)
self.Debug(f"SHORT ENTRY: {symbol} with {position_size:.1%} allocation (short)")
# Track yearly signals for low volatility fallback
self.yearly_signals[symbol] = self.yearly_signals.get(symbol, 0) + 1
# Create risk state for short position
entry_price = current_price
atr_value = values.get('atr', 0)
if atr_value > 0:
self.risk_manager.create_risk_state(
symbol, entry_price, position_size, atr_value,
direction=-1, entry_type='primary'
)
# Update portfolio manager
quantity = self.Portfolio[symbol].Quantity
self.portfolio_manager.update_position(symbol, quantity)
else:
# Has position - check for pyramid opportunities
current_direction = 1 if is_long else -1
# Check if we can add pyramid legs
if self.risk_manager.can_add_pyramid_leg(symbol,
self.breakout_logic.get_position_size(symbol, 'primary', direction=current_direction)):
# Check pyramid opportunities
pyramid_ops = self.breakout_logic.check_pyramid_opportunities(
symbol, indicator_bundle, current_direction
)
# Process pyramid1 opportunity
if pyramid_ops['pyramid1']:
pyramid_size = self.breakout_logic.get_position_size(symbol, 'pyramid1', direction=current_direction)
if pyramid_size > 0:
# Add pyramid leg (same direction as existing position)
try:
current_holding = self.Portfolio[symbol]
portfolio_value = self.Portfolio.TotalPortfolioValue
if portfolio_value > 0:
current_holdings_pct = current_holding.HoldingsValue / portfolio_value
additional_size = pyramid_size * current_direction
new_holdings_pct = current_holdings_pct + additional_size
self.SetHoldings(symbol, new_holdings_pct)
direction_str = "LONG" if current_direction == 1 else "SHORT"
self.Debug(f"PYRAMID1 {direction_str}: {symbol} adding {pyramid_size:.1%} "
f"(total now {abs(new_holdings_pct):.1%})")
# Add to risk state
entry_price = current_price
atr_value = values.get('atr', 0)
if atr_value > 0:
self.risk_manager.create_risk_state(
symbol, entry_price, pyramid_size, atr_value,
direction=current_direction, entry_type='pyramid1'
)
# Update portfolio manager
quantity = self.Portfolio[symbol].Quantity
self.portfolio_manager.update_position(symbol, quantity)
else:
self.Debug(f"Portfolio value error for {symbol} pyramid1")
except Exception as e:
self.Debug(f"Error adding pyramid1 for {symbol}: {e}")
# Process pyramid2 opportunity
elif pyramid_ops['pyramid2']: # elif to avoid double-adding
pyramid_size = self.breakout_logic.get_position_size(symbol, 'pyramid2', direction=current_direction)
if pyramid_size > 0:
# Add pyramid leg (same direction as existing position)
try:
current_holding = self.Portfolio[symbol]
portfolio_value = self.Portfolio.TotalPortfolioValue
if portfolio_value > 0:
current_holdings_pct = current_holding.HoldingsValue / portfolio_value
additional_size = pyramid_size * current_direction
new_holdings_pct = current_holdings_pct + additional_size
self.SetHoldings(symbol, new_holdings_pct)
direction_str = "LONG" if current_direction == 1 else "SHORT"
self.Debug(f"PYRAMID2 {direction_str}: {symbol} adding {pyramid_size:.1%} "
f"(total now {abs(new_holdings_pct):.1%})")
# Add to risk state
entry_price = current_price
atr_value = values.get('atr', 0)
if atr_value > 0:
self.risk_manager.create_risk_state(
symbol, entry_price, pyramid_size, atr_value,
direction=current_direction, entry_type='pyramid2'
)
# Update portfolio manager
quantity = self.Portfolio[symbol].Quantity
self.portfolio_manager.update_position(symbol, quantity)
else:
self.Debug(f"Portfolio value error for {symbol} pyramid2")
except Exception as e:
self.Debug(f"Error adding pyramid2 for {symbol}: {e}")
# Log portfolio status periodically
invested_symbols = [symbol for symbol in self.Portfolio.Keys if self.Portfolio[symbol].Invested]
if len(invested_symbols) > 0:
total_exposure = sum(abs(holding.HoldingsValue) for holding in self.Portfolio.Values) / self.Portfolio.TotalPortfolioValue
long_count = sum(1 for symbol in invested_symbols if self.Portfolio[symbol].Quantity > 0)
short_count = sum(1 for symbol in invested_symbols if self.Portfolio[symbol].Quantity < 0)
self.Debug(f"Portfolio: {len(invested_symbols)} positions ({long_count} long, {short_count} short), "
f"{total_exposure:.1%} exposure")
def Rebalance(self):
    """
    Backward-compatible hook for the old scheduled rebalance.

    Trading decisions are made in OnData now; this method only emits a
    debug line so that legacy callers keep working.
    """
    notice = "Rebalance called - trading logic now runs in OnData with valid prices"
    self.Debug(notice)
def OnOrderEvent(self, orderEvent):
    """
    Order-event hook; only completely filled orders are acted upon.

    For a filled order the fill is logged and the portfolio manager's
    quantity for that symbol is re-synced from the live portfolio.
    Events with any other status are ignored.
    """
    if orderEvent.Status != OrderStatus.Filled:
        return

    self.Debug(f"Order filled: {orderEvent.Symbol} - {orderEvent.FillQuantity} shares at ${orderEvent.FillPrice}")

    # Mirror the post-fill holding into the portfolio manager's bookkeeping
    filled_symbol = orderEvent.Symbol
    held_quantity = self.Portfolio[filled_symbol].Quantity
    self.portfolio_manager.update_position(filled_symbol, held_quantity)
def _coarse_selection(self, coarse):
"""Runs every trading day before market open. Supplies a fresh,
dollar-volume-ranked symbol list to UniverseSelector without forcing
subscriptions, so compute cost stays low."""
try:
# Filter for valid equities with price > $1
valid_equities = [c for c in coarse if c.Price > 1 and c.HasFundamentalData]
# Sort by dollar volume (highest first)
top_equities = sorted(valid_equities, key=lambda x: x.DollarVolume, reverse=True)
# Take top N equities
top_symbols = top_equities[:params.EQUITY_CANDIDATE_COUNT]
# Push list of ticker strings to UniverseSelector
symbol_strings = [c.Symbol.Value for c in top_symbols]
self.universe_selector.refresh_equity_candidates(symbol_strings)
# Return empty list - we're only harvesting tickers, not subscribing
return []
except Exception as e:
self.Debug(f"ERROR in _coarse_selection: {e}")
return [] # region imports
from AlgorithmImports import *
# endregion
"""
Strategy Parameters Module
Core parameters extracted from research notebook for breakout strategy
"""
# Core strategy parameters
LOOKBACK_PERIOD = 18 # Bars for high/low lookback
ATR_PERIOD = 14 # ATR calculation period
EMA_PERIOD = 21 # Trend EMA period
VOL_MA_PERIOD = 20 # Volume moving average period
CONF = 2 # Confirmation bars for entry
FEE = 0.02 # Trading fee percentage (NOTE(review): unit not evident here - 0.02% or 2%? confirm against the consumer)
# Phase 1: Adaptive parameters (replaced ASSET_PARAMS)
# Global defaults that adapt based on symbol characteristics
USE_ADAPTIVE_PARAMS = False # Emergency kill-switch for adaptive logic
K_DEFAULT = 1.25 # Static default when adaptive disabled
K_MIN = 0.90 # Allow smaller K for low-vol assets
K_MAX = 1.80 # Clamp K so it never becomes too strict
STOP_ATR_MULT_DEFAULT = 1.50 # Static default when adaptive disabled
STOP_ATR_MULT_MIN = 0.70 # Allow tighter stops for low vol
STOP_ATR_MULT_MAX = 2.20 # Limit stop widening
TRAIL_ATR_MULT_DEFAULT = 2.20 # Static default when adaptive disabled
TRAIL_ATR_MULT_MIN = 1.20
TRAIL_ATR_MULT_MAX = 3.00
VOL_MULT_DEFAULT = 1.0 # Default volume multiplier
VOL_MULT_MIN = 0.7 # Minimum volume multiplier
VOL_MULT_MAX = 1.3 # Maximum volume multiplier
# Asset class baselines (when adaptive enabled)
CRYPTO_K_BASE = 1.40
CRYPTO_STOP_BASE = 1.80
EQUITY_K_BASE = 1.20
EQUITY_STOP_BASE = 1.40
BOND_K_BASE = 1.00
BOND_STOP_BASE = 1.20
# ATR percentage thresholds for adaptive scaling
HIGH_VOLATILITY_THRESHOLD = 0.025 # 2.5% daily ATR considered high vol
LOW_VOLATILITY_THRESHOLD = 0.008 # 0.8% daily ATR considered low vol
# Position sizing parameters
BASE_POSITION_SIZE = 0.20 # 20% base position size for Step 5
MAX_POSITION_SIZE = 0.30 # 30% maximum position size
MIN_POSITION_SIZE = 0.15 # 15% minimum position size
# Step 7: Volatility-based position sizing parameters
BASELINE_ATR_PCT = 0.015 # 1.5% baseline ATR for position sizing
# Phase 5: Removed POSITION_SIZE_MULTIPLIER - now using Kelly-proxy sizing
SHARPE_LOOKBACK = 90 # Lookback period for Sharpe ratio calculation
MIN_DATA_BARS = 50 # Minimum data bars needed for position sizing
# Step 8: Short side + pyramiding (multi-K) parameters
ENABLE_SHORTS = False # Disable shorts while optimizing universe scaling
ENABLE_PYRAMIDING = False # Disable pyramiding for testing impact
TRADING_SIDE = 'long' # Long only for Phase 2
SHORT_BORROW_FEE = 0.01 # Additional fee for shorts (0.01% = 1bp)
# Phase 3: Short-book repair parameters
ASYMMETRIC_SHORT_STOPS = True # Use tighter stops for shorts
SHORT_STOP_MULTIPLIER = 0.75 # 25% tighter stops for shorts (1.5 -> 1.125 ATR)
ENABLE_SHORT_REGIME_FILTER = True # Block shorts in bull markets
SPY_EMA_PERIOD = 200 # EMA period for SPY regime filter
KELLY_SHORT_EDGE_DISCOUNT = 0.5 # Halve Kelly edge estimate for shorts
EXPENSIVE_BORROW_THRESHOLD = 0.0003 # Skip shorts with borrow > 3bp/day
# Phase 0: Borrow cost tracking
DAILY_BORROW_RATE_DEFAULT = 0.0001 # 1 bp/day default borrow rate
ENABLE_BORROW_COST_TRACKING = True # Track daily borrow costs
# Multi-K Pyramiding parameters
PYRAMID_K1 = 1.0 # First pyramid level K value
PYRAMID_K2 = 1.5 # Second pyramid level K value
PYRAMID_SIZE_MULT = 0.5 # Size multiplier for pyramid adds (50%)
MAX_TOTAL_SIZE_MULT = 1.5 # Maximum total position size (1.5x base)
PYRAMID_COOLDOWN = 3 # Cooldown between pyramid attempts, described as bars (NOTE(review): RiskManager.check_pyramid_cooldown converts this value as days - confirm the intended unit)
# Step 9: Event-based exits & trailing runner parameters
SCALE_OUT_PERCENT = 0.40 # Exit 40% at profit target (preserves 60% for runner)
EVENT_TRAIL_DISTANCE = 3.0 # 3x ATR distance for event-based trail
PROFIT_TARGET_MULT = 2.8 # 2.8x ATR for scale-out target
# ATR-Scaled "Soft Stop" System
ENABLE_SOFT_STOPS = True # Enable adaptive stop management
SOFT_STOP_BARS = 8 # Bars position must survive before stop tightening
SOFT_STOP_MULTIPLIER = 0.75 # Tighten stops to 75% of original after survival period
POST_SCALE_STOP_MULT = 0.6 # Tighten runner stops to 60% after scale-out
# Enhanced Pyramiding Detection
ENABLE_INTRABAR_PYRAMIDS = False # Detect pyramid opportunities within bars
PYRAMID_MIN_MOVE = 0.005 # Minimum 0.5% move to trigger intrabar pyramid
# Step 10: Risk Governor & Analytics parameters
ENABLE_RISK_GOVERNOR = True # Enable equity curve governor
DRAWDOWN_LOOKBACK = 90 # Rolling drawdown lookback period
MAX_DRAWDOWN_THRESHOLD = 0.08 # Max 8% rolling drawdown before risk reduction
RISK_REDUCTION_FACTOR = 0.5 # Halve position sizes during drawdown periods
WIN_RATE_LOOKBACK = 90 # Rolling win rate lookback for recent performance
MIN_WIN_RATE_THRESHOLD = 0.35 # Minimum 35% win rate to maintain full sizing
# Low Volatility Fallbacks (DISABLED - Phase 2 cleanup)
LOW_VOL_SIGNAL_THRESHOLD = 10 # Minimum signals per year before fallback
ENABLE_VOL_FALLBACKS = False # Disabled - feature was incomplete
# FALLBACK_RSI_THRESH = 0.3 # Removed - unused parameters
# FALLBACK_BB_THRESH = 0.2 # Removed - unused parameters
# Crypto Volatility Gating
CRYPTO_VOL_GATE = True # Enable crypto volatility gating
CRYPTO_MIN_VOL = 0.40 # Minimum 40% rolling 100-day realized volatility
CRYPTO_VOL_LOOKBACK = 100 # 100-day realized volatility lookback
# Complex Regime Filter Parameters
TREND_EMA_FAST = 20 # Fast EMA for trend detection
TREND_EMA_SLOW = 200 # Slow EMA for trend detection
TREND_SLOPE_THRESHOLD = 0.003 # 0.3% daily slope threshold for stocks
CRYPTO_SLOPE_THRESHOLD = 0.004 # 0.4% daily slope threshold for crypto (24/7 markets)
SHARPE_THRESHOLD = 0.0 # Sharpe threshold for GLD filter (rolling Sharpe > 0)
# ─── Hybrid Universe Control ──────────────────────────
ENABLE_STATIC_UNIVERSE = True # keep your core list untouched
ENABLE_DYNAMIC_UNIVERSE = True # master switch for the add-on sleeve
STATIC_UNIVERSE_LIST = [ # your proven base – NEVER altered
"BTCUSD", "ETHUSD", "SOLUSDT",
"NVDA", "TSLA", "AAPL", "MSFT", "GOOGL", "GLD"
]
# ─── Dynamic sleeve sizing rules ──────────────────────
DYN_UNIVERSE_MIN = 3 # never run with < 3 extra names
DYN_UNIVERSE_MAX = 15 # cap to protect liquidity / focus
MAX_ADDITIONS_PER_SCAN = 3 # at most 3 fresh names per scan
MAX_DROPS_PER_PRUNE = 5 # protect turnover on the way out
# ─── Scan / prune cadence (empirically robust) ───────
DYNAMIC_SCAN_INTERVAL_DAYS = 7 # **weekly** add scan (Mon 10 am)
DYNAMIC_PRUNE_INTERVAL_DAYS = 30 # **monthly** cull (1st trading day)
# ─── Edge-score inputs (literature based) ─────────────
EDGE_LOOKBACK_DAYS = 63 # 3-month momentum window
EDGE_SHARPE_LOOKBACK_DAYS = 63
MIN_AVG_DOLLAR_VOLUME = 10_000_000 # liquidity filter
MOMENTUM_PERCENTILE_THRESH = 75 # only keep top quartile of returns
MIN_SHARPE_THRESHOLD = 1.0 # positive, risk-adjusted
MAX_ATR_PCT_THRESHOLD = 0.05 # skip > 5 % daily ATR assets
MAX_CORRELATION_THRESHOLD = 0.80 # diversification guard
# ─── Live candidate pool sizing ───────────────────────
EQUITY_CANDIDATE_COUNT = 200 # top-volume equities to consider each week
# ─── Candidate pool source control ────────────────────
USE_LIVE_CANDIDATE_POOL = False # True: use QC coarse universe, False: use static list # region imports
from AlgorithmImports import *
# endregion
"""
Portfolio Management Module
Handles position sizing, risk management, and portfolio optimization
"""
class PortfolioManager:
    """Tracks per-symbol quantities and hands out equal-weight allocations."""

    def __init__(self, algorithm):
        self.algorithm = algorithm
        self.positions = {}
        self.max_positions = 10  # Will be configurable later

    def calculate_position_size(self, symbol, signal_strength=1.0):
        """Return the portfolio fraction to allocate to a symbol.

        Placeholder sizing: an equal-weight slice (1 / max_positions)
        scaled by the signal strength.

        Args:
            symbol: Symbol to calculate size for (not used by the current logic)
            signal_strength (float): Strength of the signal (0-1)
        Returns:
            float: Position size as percentage of portfolio, 0.0 when
                max_positions is not positive
        """
        if self.max_positions <= 0:
            return 0.0
        equal_weight = 1.0 / self.max_positions
        return equal_weight * signal_strength

    def can_open_position(self, symbol):
        """Tell whether there is still room for another position.

        Args:
            symbol: Symbol to check (not used by the current logic)
        Returns:
            bool: True while the number of non-zero tracked positions is
                below max_positions
        """
        occupied_slots = sum(1 for qty in self.positions.values() if qty != 0)
        return occupied_slots < self.max_positions

    def update_position(self, symbol, quantity):
        """Record the latest quantity held for a symbol.

        Args:
            symbol: Symbol to update
            quantity: New position quantity
        """
        self.positions[symbol] = quantity

    def get_position(self, symbol):
        """Look up the tracked quantity for a symbol.

        Args:
            symbol: Symbol to check
        Returns:
            float: Current position quantity, 0 if the symbol is untracked
        """
        return self.positions[symbol] if symbol in self.positions else 0

    def get_portfolio_exposure(self):
        """Report total portfolio exposure.

        Returns:
            float: Total exposure as percentage (placeholder, always 0.0)
        """
        # Placeholder - will calculate actual exposure later
        return 0.0
# region imports from AlgorithmImports import * # endregion """ Risk Management Module - Modular Structure Main entry point that imports from specialized modules """ # Import all classes and functions from the modular structure from risk_state import RiskState, PositionLeg from risk_manager import RiskManager, calculate_volatility_based_position_size from risk_analytics import RiskAnalytics # Re-export everything for backward compatibility __all__ = ['RiskState', 'PositionLeg', 'RiskManager', 'RiskAnalytics', 'calculate_volatility_based_position_size']
"""
Risk Analytics Module
Contains analytics and risk governor functionality for portfolio performance tracking
"""
from AlgorithmImports import *
from typing import List
from params import (ENABLE_RISK_GOVERNOR, DRAWDOWN_LOOKBACK, MAX_DRAWDOWN_THRESHOLD,
RISK_REDUCTION_FACTOR, WIN_RATE_LOOKBACK, MIN_WIN_RATE_THRESHOLD)
class RiskAnalytics:
    """
    Handles risk analytics and risk governor functionality.
    Step 10: Risk governor with equity curve tracking and adaptive sizing.

    The instance accumulates at most one equity snapshot per calendar day
    plus every recorded trade return, and derives drawdown, Sharpe, CAGR,
    a Kelly-proxy position size and the drawdown-based risk multiplier
    from them.
    """

    def __init__(self, algorithm):
        self.algorithm = algorithm
        # Step 10: Risk governor tracking
        self.equity_curve = []  # Track daily equity values
        self.trade_returns = []  # Track individual trade returns
        self.last_equity_update = None  # Track last equity update time
        # Phase 5: Kelly-proxy position sizing
        self.kelly_trade_returns = RollingWindow[float](50)  # Last 50 trades for Kelly calculation
        # Analytics tracking
        self.total_trades = 0
        self.winning_trades = 0
        self.total_pnl = 0.0
        self.equity_high_water_mark = 0.0
        self.max_drawdown = 0.0
        # Performance metrics
        self.sharpe_ratio = 0.0
        self.win_rate = 0.0
        self.avg_trade_return = 0.0
        self.profit_factor = 0.0

    def update_equity_curve(self, current_time=None):
        """Append today's equity to the curve and refresh drawdown stats.

        Args:
            current_time: Timestamp of the update; defaults to algorithm.Time.
                A second call on the same calendar date is ignored.
        """
        if current_time is None:
            current_time = self.algorithm.Time
        # Only update once per day
        if (self.last_equity_update is not None and
                current_time.date() == self.last_equity_update.date()):
            return
        current_equity = self.algorithm.Portfolio.TotalPortfolioValue
        self.equity_curve.append(current_equity)
        self.last_equity_update = current_time
        # Update high water mark and max drawdown
        if current_equity > self.equity_high_water_mark:
            self.equity_high_water_mark = current_equity
        high_water_mark = self.equity_high_water_mark
        if high_water_mark <= 0:
            # No positive peak recorded yet, so a drawdown ratio is undefined
            # (dividing here used to raise ZeroDivisionError).
            return
        current_drawdown = (high_water_mark - current_equity) / high_water_mark
        if current_drawdown > self.max_drawdown:
            self.max_drawdown = current_drawdown

    def calculate_adaptive_risk_multiplier(self):
        """Calculate risk multiplier based on the rolling equity drawdown.

        Returns:
            float: RISK_REDUCTION_FACTOR when the drawdown over the last
                DRAWDOWN_LOOKBACK snapshots exceeds MAX_DRAWDOWN_THRESHOLD,
                otherwise 1.0 (also when the governor is disabled, history
                is too short, or the calculation fails).
        """
        if not ENABLE_RISK_GOVERNOR or len(self.equity_curve) < DRAWDOWN_LOOKBACK:
            return 1.0
        try:
            # Calculate rolling drawdown
            recent_equity = self.equity_curve[-DRAWDOWN_LOOKBACK:]
            max_equity = max(recent_equity)
            current_equity = recent_equity[-1]
            current_drawdown = (max_equity - current_equity) / max_equity
            # Phase 4: Remove win-rate logic, keep only drawdown rule
            if current_drawdown > MAX_DRAWDOWN_THRESHOLD:
                self.algorithm.Debug(f"RISK GOVERNOR: Drawdown risk detected - {current_drawdown:.2%} > {MAX_DRAWDOWN_THRESHOLD:.2%}")
                risk_multiplier = RISK_REDUCTION_FACTOR
                self.algorithm.Debug(f"RISK GOVERNOR: Reducing position sizes by {(1-risk_multiplier)*100:.0f}%")
                return risk_multiplier
            return 1.0  # No risk reduction
        except Exception as e:
            self.algorithm.Debug(f"Error calculating risk multiplier: {e}")
            return 1.0

    def record_trade(self, trade_return, trade_pnl):
        """Record a completed trade and refresh the running trade statistics.

        Args:
            trade_return: Fractional return of the trade (> 0 counts as a win)
            trade_pnl: Profit or loss of the trade, added to total_pnl
        """
        self.trade_returns.append(trade_return)
        self.total_trades += 1
        self.total_pnl += trade_pnl
        if trade_return > 0:
            self.winning_trades += 1
        # Phase 5: Update Kelly rolling window
        self.kelly_trade_returns.Add(trade_return)
        # Update performance metrics
        self.win_rate = self.winning_trades / self.total_trades if self.total_trades > 0 else 0.0
        self.avg_trade_return = sum(self.trade_returns) / len(self.trade_returns) if self.trade_returns else 0.0
        # Profit factor needs at least one winner and one loser; otherwise 0.0
        winning_trades = [r for r in self.trade_returns if r > 0]
        losing_trades = [r for r in self.trade_returns if r < 0]
        if winning_trades and losing_trades:
            gross_profit = sum(winning_trades)
            gross_loss = abs(sum(losing_trades))
            self.profit_factor = gross_profit / gross_loss if gross_loss > 0 else 0.0
        else:
            self.profit_factor = 0.0

    def calculate_sharpe_ratio(self, lookback_days=252):
        """Calculate the annualized Sharpe ratio from the equity curve.

        Assumes a 0% risk-free rate and annualizes with sqrt(252).

        Args:
            lookback_days: Maximum number of most recent snapshots to use
        Returns:
            float: Annualized Sharpe ratio, or 0.0 when there is too little
                data, zero volatility, or the calculation fails
        """
        if len(self.equity_curve) < 2:
            return 0.0
        try:
            equity_values = self.equity_curve[-lookback_days:] if len(self.equity_curve) > lookback_days else self.equity_curve
            if len(equity_values) < 2:
                return 0.0
            # Simple returns between consecutive snapshots
            daily_returns = [
                (later - earlier) / earlier
                for earlier, later in zip(equity_values, equity_values[1:])
            ]
            if len(daily_returns) < 2:
                return 0.0
            mean_return = sum(daily_returns) / len(daily_returns)
            # Sample standard deviation (n - 1 in the denominator)
            variance = sum((r - mean_return) ** 2 for r in daily_returns) / (len(daily_returns) - 1)
            std_dev = variance ** 0.5
            if std_dev == 0:
                return 0.0
            # Annualize the Sharpe ratio
            return (mean_return / std_dev) * (252 ** 0.5)
        except Exception as e:
            self.algorithm.Debug(f"Error calculating Sharpe ratio: {e}")
            return 0.0

    def calculate_max_drawdown(self):
        """Calculate maximum peak-to-trough drawdown over the whole equity curve.

        Returns:
            float: Largest fractional drawdown, or 0.0 with fewer than two
                snapshots or when the calculation fails
        """
        if len(self.equity_curve) < 2:
            return 0.0
        try:
            max_dd = 0.0
            peak = self.equity_curve[0]
            for equity in self.equity_curve:
                if equity > peak:
                    peak = equity
                drawdown = (peak - equity) / peak
                if drawdown > max_dd:
                    max_dd = drawdown
            return max_dd
        except Exception as e:
            self.algorithm.Debug(f"Error calculating max drawdown: {e}")
            return 0.0

    def calculate_cagr(self):
        """Calculate Compound Annual Growth Rate from the equity curve.

        Snapshots are treated as daily data with 252 periods per year.

        Returns:
            float: CAGR as a fraction; 0.0 with fewer than two snapshots, a
                non-positive starting value or a failed calculation; -1.0
                when the ending equity is zero or negative
        """
        if len(self.equity_curve) < 2:
            return 0.0
        try:
            start_value = self.equity_curve[0]
            end_value = self.equity_curve[-1]
            if start_value <= 0:
                return 0.0
            if end_value <= 0:
                # A negative base with a fractional exponent would produce a
                # complex number; a wiped-out account is a -100% return.
                return -1.0
            # N snapshots span N - 1 daily periods
            years = (len(self.equity_curve) - 1) / 252.0
            return ((end_value / start_value) ** (1.0 / years)) - 1.0
        except Exception as e:
            self.algorithm.Debug(f"Error calculating CAGR: {e}")
            return 0.0

    def calculate_kelly_position_size(self, k=0.5, direction=1):
        """
        Calculate Kelly-proxy position size based on recent trade returns.
        Phase 5: Kelly formula approximation.
        Phase 3.4: Added direction parameter for short edge discount.
        Args:
            k: Kelly multiplier (0.5 = half-Kelly for safety)
            direction: 1 for long, -1 for short (applies edge discount for shorts)
        Returns:
            float: Position size between 0.10 and 0.30 (0.20 when fewer than
                10 trades are available or the calculation fails)
        """
        if self.kelly_trade_returns.Count < 10:  # Need minimum trades
            return 0.20  # Default position size
        try:
            from params import KELLY_SHORT_EDGE_DISCOUNT
            returns = [self.kelly_trade_returns[i] for i in range(self.kelly_trade_returns.Count)]
            if len(returns) < 2:
                return 0.20
            # Variance is a property of the observed returns, so it is taken
            # around the sample mean *before* any short-side discount.
            mean_return = sum(returns) / len(returns)
            variance = sum((r - mean_return) ** 2 for r in returns) / (len(returns) - 1)
            edge = mean_return
            # Phase 3.4: Apply edge discount for shorts
            if direction == -1:  # Short position
                edge = mean_return * KELLY_SHORT_EDGE_DISCOUNT
                self.algorithm.Debug(f"KELLY SHORT: Applied {KELLY_SHORT_EDGE_DISCOUNT}x edge discount")
            if variance <= 0 or edge <= 0:
                return 0.10  # Minimum size if no edge or variance issues
            # Kelly formula: f = edge / variance
            kelly_fraction = k * edge / variance
            # Apply bounds: 10% minimum, 30% maximum
            kelly_fraction = max(0.10, min(0.30, kelly_fraction))
            self.algorithm.Debug(f"KELLY SIZING: Edge={edge:.3%}, Var={variance:.6f}, "
                                 f"Kelly={kelly_fraction:.2%} (k={k})")
            return kelly_fraction
        except Exception as e:
            self.algorithm.Debug(f"Error calculating Kelly position size: {e}")
            return 0.20  # Fallback

    def emit_periodic_summary(self):
        """Emit a multi-line performance summary through algorithm.Debug.

        Does nothing with fewer than two equity snapshots; failures are
        logged instead of raised.
        """
        if len(self.equity_curve) < 2:
            return
        try:
            # Calculate key metrics
            current_equity = self.equity_curve[-1]
            cagr = self.calculate_cagr()
            max_dd = self.calculate_max_drawdown()
            sharpe = self.calculate_sharpe_ratio()
            # Format summary
            summary = f"""
=== PERFORMANCE SUMMARY ===
Current Equity: ${current_equity:,.2f}
CAGR: {cagr:.2%}
Max Drawdown: {max_dd:.2%}
Sharpe Ratio: {sharpe:.2f}
Win Rate: {self.win_rate:.2%}
Total Trades: {self.total_trades}
Avg Trade Return: {self.avg_trade_return:.2%}
Profit Factor: {self.profit_factor:.2f}
Risk Multiplier: {self.calculate_adaptive_risk_multiplier():.2f}
=========================""".strip()
            self.algorithm.Debug(summary)
        except Exception as e:
            self.algorithm.Debug(f"Error generating performance summary: {e}")

    def get_performance_metrics(self):
        """Get current performance metrics as dictionary"""
        return {
            'total_trades': self.total_trades,
            'winning_trades': self.winning_trades,
            'win_rate': self.win_rate,
            'avg_trade_return': self.avg_trade_return,
            'profit_factor': self.profit_factor,
            'total_pnl': self.total_pnl,
            'max_drawdown': self.max_drawdown,
            'equity_high_water_mark': self.equity_high_water_mark,
            'current_equity': self.equity_curve[-1] if self.equity_curve else 0.0,
            'cagr': self.calculate_cagr(),
            'sharpe_ratio': self.calculate_sharpe_ratio(),
            'risk_multiplier': self.calculate_adaptive_risk_multiplier()
        }
"""
Risk Manager Module
Core risk management functionality with position tracking and exit logic
"""
from AlgorithmImports import *
from typing import Dict, Tuple, Optional
from params import (PROFIT_TARGET_MULT, ENABLE_SOFT_STOPS, SOFT_STOP_BARS,
SOFT_STOP_MULTIPLIER, POST_SCALE_STOP_MULT, EVENT_TRAIL_DISTANCE,
SCALE_OUT_PERCENT, PYRAMID_COOLDOWN, ENABLE_BORROW_COST_TRACKING,
DAILY_BORROW_RATE_DEFAULT, STOP_ATR_MULT_DEFAULT, ASYMMETRIC_SHORT_STOPS,
SHORT_STOP_MULTIPLIER)
from risk_state import RiskState, PositionLeg
from risk_analytics import RiskAnalytics
def calculate_volatility_based_position_size(algorithm, symbol, atr_value, current_price, direction=1):
    """
    Size a position as a fraction of the portfolio, clamped to 10%-30%.

    Phase 5: When the algorithm exposes risk_manager.analytics, the
    Kelly-proxy size is used and gently scaled by relative volatility.
    Phase 3.4: direction is forwarded so shorts get their edge discount.
    Without analytics, a pure ATR-based size is used instead. Any error
    is logged and the 20% base size is returned.
    """
    try:
        kelly_available = hasattr(algorithm, 'risk_manager') and algorithm.risk_manager.analytics

        if not kelly_available:
            # Fallback to simple volatility-based sizing if analytics not available
            from params import BASELINE_ATR_PCT
            atr_pct = atr_value / current_price
            size = 0.20 / (atr_pct / BASELINE_ATR_PCT) ** 0.5
            size = max(0.10, min(0.30, size))
            algorithm.Debug(f"FALLBACK SIZING {symbol}: ATR%={atr_pct:.3%}, Size={size:.2%}")
            return size

        # Phase 5: Kelly-proxy sizing from risk analytics
        kelly_size = algorithm.risk_manager.analytics.calculate_kelly_position_size(k=0.5, direction=direction)
        from params import BASELINE_ATR_PCT
        atr_pct = atr_value / current_price
        # Fourth root keeps the volatility adjustment gentle; capped to 0.8x-1.2x
        volatility_factor = min(1.2, max(0.8, (BASELINE_ATR_PCT / atr_pct) ** 0.25))
        final_size = max(0.10, min(0.30, kelly_size * volatility_factor))  # Ensure bounds
        algorithm.Debug(f"KELLY SIZING {symbol}: Kelly={kelly_size:.2%}, "
                        f"VolFactor={volatility_factor:.2f}, Final={final_size:.2%}")
        return final_size
    except Exception as e:
        algorithm.Debug(f"Error calculating position size for {symbol}: {e}")
        return 0.20  # Fallback to base size
class RiskManager:
"""
Manages risk states for all positions with pyramiding and short support.
Step 8: Enhanced for multiple legs and bidirectional trading.
Step 10: Added risk governor and analytics functionality.
"""
def __init__(self, algorithm):
    # Host algorithm; supplies time, logging and portfolio access
    self.algorithm = algorithm
    # Step 10: analytics / risk-governor helper bound to the same algorithm
    self.analytics = RiskAnalytics(algorithm)
    # Active risk states, keyed by symbol
    self.risk_states: Dict[str, RiskState] = {}
def create_risk_state(self, symbol, entry_price, position_size, atr_value, direction, entry_type='primary'):
    """
    Start risk tracking for a new position, or append a leg to an existing one.
    Phase 1.4: Uses adaptive parameters instead of ASSET_PARAMS.

    Args:
        symbol: Symbol of the position
        entry_price: Entry price of the position
        position_size: Position size as percentage of portfolio
        atr_value: ATR value at entry time
        direction: 1 for long, -1 for short
        entry_type: 'primary', 'pyramid1', 'pyramid2'
    Returns:
        RiskState: The new state, or the existing one (returned untouched
            when the requested direction disagrees with it)
    """
    algo = self.algorithm

    # Stop multiplier: static default unless the symbol's indicator bundle is ready
    adaptive_stop_atr = STOP_ATR_MULT_DEFAULT
    if symbol in algo.indicators and algo.indicators[symbol].is_ready:
        values = algo.indicators[symbol].get_values()
        adaptive_stop_atr = values.get('adaptive_stop_atr', STOP_ATR_MULT_DEFAULT)

    # Existing state means this is a pyramid add
    if symbol in self.risk_states:
        risk_state = self.risk_states[symbol]
        if risk_state.direction == direction:
            risk_state.add_position_leg(entry_price, position_size, entry_type, direction, algo.Time)
            algo.Debug(f"PYRAMID ADDED: {symbol} - Total size: {risk_state.total_position_size:.2%}")
        else:
            algo.Debug(f"ERROR: Direction mismatch for {symbol}. "
                       f"Existing: {risk_state.direction}, New: {direction}")
        return risk_state

    # Fresh position: a single leg, stops and target filled in just below
    opened_at = algo.Time
    first_leg = PositionLeg(
        entry_price=entry_price,
        position_size=position_size,
        entry_time=opened_at,
        entry_type=entry_type,
        direction=direction
    )
    risk_state = RiskState(
        position_legs=[first_leg],
        total_position_size=position_size,
        direction=direction,
        original_stop=0.0,
        current_stop=0.0,
        profit_target=0.0,
        scaled_out=False,
        remaining_position=1.0,
        runner_trail=0.0,
        highest_high_since_entry=entry_price if direction == 1 else 0.0,
        lowest_low_since_entry=entry_price if direction == -1 else 0.0,
        bars_held=0,
        is_active=True,
        entry_time=opened_at,
        last_update_time=opened_at,
        last_pyramid_time=opened_at
    )

    if direction == 1:  # Long: stop below entry, target above
        risk_state.original_stop = entry_price - (adaptive_stop_atr * atr_value)
        risk_state.current_stop = risk_state.original_stop
        risk_state.profit_target = entry_price + (PROFIT_TARGET_MULT * atr_value)
    else:  # Short: stop above entry, target below
        # Phase 3.2: asymmetric (tighter) stop applies to true shorts only
        if ASYMMETRIC_SHORT_STOPS and direction == -1:
            final_stop_atr = adaptive_stop_atr * SHORT_STOP_MULTIPLIER
        else:
            final_stop_atr = adaptive_stop_atr
        risk_state.original_stop = entry_price + (final_stop_atr * atr_value)
        risk_state.current_stop = risk_state.original_stop
        risk_state.profit_target = entry_price - (PROFIT_TARGET_MULT * atr_value)

    self.risk_states[symbol] = risk_state

    direction_str = "LONG" if direction == 1 else "SHORT"
    algo.Debug(f"RISK STATE CREATED: {symbol} {direction_str} - Entry=${entry_price:.2f}, "
               f"Stop=${risk_state.original_stop:.2f}, Target=${risk_state.profit_target:.2f}, "
               f"ATR={atr_value:.3f}")
    return risk_state
def close_position(self, symbol, exit_reason):
    """
    Close a position, log its outcome and drop it from active tracking.
    Phase 0.2: Added borrow cost calculation for short positions.

    Does nothing when the symbol has no risk state.

    Args:
        symbol: Symbol to close
        exit_reason: Reason for closing (STOP_LOSS, PROFIT_TARGET, etc.)
    """
    if symbol not in self.risk_states:
        return

    risk_state = self.risk_states[symbol]
    # Snapshot the direction up front so the log label below cannot be
    # affected by whatever clear_position() resets.
    direction = risk_state.direction
    current_price = self.algorithm.Securities[symbol].Price

    # Calculate P&L across all legs BEFORE clearing them.
    # NOTE(review): total_pnl is price move x size fraction, not dollars -
    # confirm the intended unit before comparing it with dollar amounts.
    total_pnl = 0.0
    total_size = 0.0
    for leg in risk_state.position_legs:
        if direction == 1:  # Long
            leg_pnl = (current_price - leg.entry_price) * leg.position_size
        else:  # Short
            leg_pnl = (leg.entry_price - current_price) * leg.position_size
        total_pnl += leg_pnl
        total_size += leg.position_size

    weighted_entry = risk_state.get_weighted_entry_price()
    # Calculate return percentage
    if weighted_entry != 0:
        if direction == 1:
            pnl_pct = (current_price - weighted_entry) / weighted_entry
        else:
            pnl_pct = (weighted_entry - current_price) / weighted_entry
    else:
        pnl_pct = 0.0

    # Phase 0.2: Calculate borrow cost for short positions
    total_borrow_cost = 0.0
    if ENABLE_BORROW_COST_TRACKING and direction == -1:
        total_borrow_cost = self._calculate_borrow_cost(risk_state, current_price)
        if total_borrow_cost > 0:
            # A flat exit has zero P&L; the ratio is undefined there and
            # dividing would abort the close with ZeroDivisionError.
            if total_pnl != 0:
                pnl_share = f"{total_borrow_cost/abs(total_pnl)*100:.1f}% of P&L"
            else:
                pnl_share = "P&L is zero"
            self.algorithm.Debug(f"BORROW COST {symbol}: ${total_borrow_cost:.2f} "
                                 f"({pnl_share})")

    # Now clear position data
    risk_state.clear_position()

    direction_str = "LONG" if direction == 1 else "SHORT"
    borrow_str = f", BorrowCost=${total_borrow_cost:.2f}" if total_borrow_cost > 0 else ""
    self.algorithm.Debug(f"POSITION CLOSED: {symbol} {direction_str} - Reason={exit_reason}, "
                         f"AvgEntry=${weighted_entry:.2f}, Exit=${current_price:.2f}, "
                         f"Return={pnl_pct:.2%}, TotalSize={total_size:.2%}{borrow_str}")

    # Remove from active tracking
    del self.risk_states[symbol]
def _calculate_borrow_cost(self, risk_state, current_price):
"""
Calculate total borrow cost for a short position.
Phase 0.2: Added borrow cost tracking.
Args:
risk_state: RiskState object for the position
current_price: Current symbol price
Returns:
float: Total borrow cost in dollars
"""
try:
total_borrow_cost = 0.0
current_time = self.algorithm.Time
for leg in risk_state.position_legs:
if risk_state.direction == -1: # Only for short positions
# Calculate days held for this leg
time_diff = current_time - leg.entry_time
days_held = max(1, time_diff.total_seconds() / (24 * 60 * 60)) # At least 1 day
# Calculate position value
position_value = leg.position_size * self.algorithm.Portfolio.TotalPortfolioValue
# Calculate borrow cost (daily rate * position value * days)
leg_borrow_cost = DAILY_BORROW_RATE_DEFAULT * position_value * days_held
total_borrow_cost += leg_borrow_cost
return total_borrow_cost
except Exception as e:
self.algorithm.Debug(f"Error calculating borrow cost: {e}")
return 0.0
def get_total_position_size(self, symbol):
    """Return the combined size of all legs for a symbol, 0.0 if untracked."""
    tracked = symbol in self.risk_states
    return self.risk_states[symbol].total_position_size if tracked else 0.0
def can_add_pyramid_leg(self, symbol, base_position_size):
    """Ask the symbol's risk state whether another pyramid leg is allowed.

    Returns False when the symbol has no risk state; otherwise whatever
    RiskState.can_add_pyramid reports for the current algorithm time.
    base_position_size is accepted but not used here.
    """
    return (symbol in self.risk_states
            and self.risk_states[symbol].can_add_pyramid(self.algorithm.Time))
def check_pyramid_cooldown(self, symbol):
    """Tell whether enough time has passed since the last pyramid add.

    Returns True for untracked symbols and for states whose
    last_pyramid_time is still datetime.min.
    NOTE(review): PYRAMID_COOLDOWN is converted as days here; confirm the
    unit against its definition in params.
    """
    if symbol not in self.risk_states:
        return True
    last_add = self.risk_states[symbol].last_pyramid_time
    if last_add == datetime.min:
        return True
    cooldown_seconds = PYRAMID_COOLDOWN * 24 * 60 * 60
    elapsed = self.algorithm.Time - last_add
    return elapsed.total_seconds() >= cooldown_seconds
# ===== STEP 9: EVENT-BASED EXIT METHODS =====
def calculate_adaptive_stops(self, symbol, current_price, atr_value):
    """Return the stop level for a position, tightened once it has survived.

    Untracked symbols yield 0.0. With soft stops disabled the state's
    current stop is returned. Before SOFT_STOP_BARS bars the original stop
    is returned; afterwards the entry-to-stop distance is scaled by
    SOFT_STOP_MULTIPLIER (POST_SCALE_STOP_MULT once scaled out) and the
    result is never allowed to be wider than the original stop.
    current_price and atr_value are accepted but not used.
    """
    if symbol not in self.risk_states:
        return 0.0
    risk_state = self.risk_states[symbol]
    if not ENABLE_SOFT_STOPS:
        return risk_state.current_stop

    try:
        if risk_state.bars_held < SOFT_STOP_BARS:
            return risk_state.original_stop

        weighted_entry = risk_state.get_weighted_entry_price()
        # The post-scale-out multiplier replaces the soft-stop one (preserve remaining 60%)
        shrink = POST_SCALE_STOP_MULT if risk_state.scaled_out else SOFT_STOP_MULTIPLIER

        if risk_state.direction == 1:  # Long position
            tightened_stop = weighted_entry - (weighted_entry - risk_state.original_stop) * shrink
            # Ensure stop is never widened for longs
            return max(risk_state.original_stop, tightened_stop)

        # Short position (direction == -1)
        tightened_stop = weighted_entry + (risk_state.original_stop - weighted_entry) * shrink
        # Ensure stop is never widened for shorts
        return min(risk_state.original_stop, tightened_stop)
    except Exception as e:
        self.algorithm.Debug(f"Error calculating adaptive stops for {symbol}: {e}")
        return risk_state.original_stop if symbol in self.risk_states else 0.0
def update_event_based_trail(self, symbol, current_high, current_low, atr_value):
    """Move the runner trail when the position prints a new extreme.

    Only acts on tracked, already scaled-out positions. When the risk
    state reports a new high/low, the trail is set EVENT_TRAIL_DISTANCE
    ATRs below the bar low (long) or above the bar high (short).

    Returns:
        bool: True if the trail was updated, False otherwise
    """
    if symbol not in self.risk_states:
        return False
    risk_state = self.risk_states[symbol]

    # The runner trail only exists after the scale-out
    if not risk_state.scaled_out:
        return False
    if not risk_state.update_highs_lows(current_high, current_low):
        return False

    if risk_state.direction == 1:  # Long positions
        risk_state.runner_trail = current_low - EVENT_TRAIL_DISTANCE * atr_value
        self.algorithm.Debug(f"EVENT TRAIL UPDATED (LONG): {symbol} - New High: ${current_high:.2f}, "
                             f"Trail: ${risk_state.runner_trail:.2f}")
        return True
    if risk_state.direction == -1:  # Short positions
        risk_state.runner_trail = current_high + EVENT_TRAIL_DISTANCE * atr_value
        self.algorithm.Debug(f"EVENT TRAIL UPDATED (SHORT): {symbol} - New Low: ${current_low:.2f}, "
                             f"Trail: ${risk_state.runner_trail:.2f}")
        return True
    return False
def check_scale_out_conditions(self, symbol, current_price, current_high, current_low):
    """Report whether the bar reached the profit target for a first scale-out.

    A long triggers when ``current_high`` is at or above the stored
    ``profit_target``; a short triggers when ``current_low`` is at or below
    it. Positions that are untracked or have already scaled out never trigger.

    Args:
        symbol: Key into ``self.risk_states``.
        current_price: Accepted for signature compatibility; not used here.
        current_high: High compared against the target for longs.
        current_low: Low compared against the target for shorts.

    Returns:
        bool: True when the target was touched (a Debug line is emitted).
    """
    if symbol not in self.risk_states:
        return False
    state = self.risk_states[symbol]

    # A position scales out at most once
    if state.scaled_out:
        return False

    if state.direction == 1 and current_high >= state.profit_target:
        self.algorithm.Debug(
            f"SCALE-OUT TRIGGERED (LONG): {symbol} - High: ${current_high:.2f} >= "
            f"Target: ${state.profit_target:.2f}"
        )
        return True

    if state.direction == -1 and current_low <= state.profit_target:
        self.algorithm.Debug(
            f"SCALE-OUT TRIGGERED (SHORT): {symbol} - Low: ${current_low:.2f} <= "
            f"Target: ${state.profit_target:.2f}"
        )
        return True

    return False
def check_runner_trail_exit(self, symbol, current_price, current_high, current_low):
    """Report whether the remaining runner position was stopped by its trail.

    Only evaluated after a scale-out while some position remains. A long
    exits when ``current_low`` is at or below ``runner_trail``; a short exits
    when ``current_high`` is at or above it.

    Args:
        symbol: Key into ``self.risk_states``.
        current_price: Accepted for signature compatibility; not used here.
        current_high: High compared against the trail for shorts.
        current_low: Low compared against the trail for longs.

    Returns:
        bool: True when the trail was hit (a Debug line is emitted).
    """
    if symbol not in self.risk_states:
        return False
    state = self.risk_states[symbol]

    # The runner exists only after the scale-out and while size is left
    runner_active = state.scaled_out and not state.remaining_position <= 0
    if not runner_active:
        return False

    if state.direction == 1 and current_low <= state.runner_trail:
        self.algorithm.Debug(
            f"RUNNER TRAIL EXIT (LONG): {symbol} - Low: ${current_low:.2f} <= "
            f"Trail: ${state.runner_trail:.2f}"
        )
        return True

    if state.direction == -1 and current_high >= state.runner_trail:
        self.algorithm.Debug(
            f"RUNNER TRAIL EXIT (SHORT): {symbol} - High: ${current_high:.2f} >= "
            f"Trail: ${state.runner_trail:.2f}"
        )
        return True

    return False
def process_scale_out(self, symbol, current_price):
    """Book the partial scale-out and switch the position into runner mode.

    The exit figures come from ``calculate_combined_pnl`` using the
    ``SCALE_OUT_PERCENT`` fraction. The risk state is then flagged as scaled
    out, ``remaining_position`` is reduced by that same fraction, and the
    runner trail is seeded at ``current_price`` for both directions.

    Args:
        symbol: Key into ``self.risk_states``.
        current_price: Price used for the exit and as the initial trail level.

    Returns:
        tuple: ``(pnl, exit_size, weighted_return)``; three zeros when the
        symbol is not tracked.
    """
    if symbol not in self.risk_states:
        return 0.0, 0.0, 0.0  # pnl, exit_size, return
    state = self.risk_states[symbol]

    # Figures for the slice being closed, aggregated over every leg
    pnl, exit_size, exit_return = self.calculate_combined_pnl(
        symbol, current_price, SCALE_OUT_PERCENT
    )

    state.scaled_out = True
    state.remaining_position -= SCALE_OUT_PERCENT
    # Long and short start the runner trail at the same level: the exit price
    state.runner_trail = current_price

    self.algorithm.Debug(
        f"SCALE-OUT PROCESSED: {symbol} - Size: {exit_size:.2%}, "
        f"PnL: ${pnl:.2f}, Remaining: {state.remaining_position:.1%}"
    )
    return pnl, exit_size, exit_return
def calculate_combined_pnl(self, symbol, current_price, exit_percentage):
    """Aggregate P&L over every leg for a partial exit at ``current_price``.

    Each leg contributes ``position_size * exit_percentage`` of size; its P&L
    is the favourable price move times that size. The return figure is the
    favourable move measured from the weighted average entry price. Any
    direction other than 1 is treated as short.

    Args:
        symbol: Key into ``self.risk_states``.
        current_price: Exit price applied to all legs.
        exit_percentage: Fraction of each leg being closed.

    Returns:
        tuple: ``(total_pnl, total_size, weighted_return)``; three zeros when
        the symbol is not tracked, and a zero return when the weighted entry
        price is zero.
    """
    if symbol not in self.risk_states:
        return 0.0, 0.0, 0.0
    state = self.risk_states[symbol]
    is_long = state.direction == 1

    pnl_sum = 0.0
    size_sum = 0.0
    for leg in state.position_legs:
        exited = leg.position_size * exit_percentage
        if is_long:
            move = current_price - leg.entry_price
        else:
            move = leg.entry_price - current_price
        size_sum += exited
        pnl_sum += move * exited

    avg_entry = state.get_weighted_entry_price()
    if avg_entry == 0:
        # No meaningful entry basis, so no percentage return can be quoted
        return pnl_sum, size_sum, 0.0

    if is_long:
        pct_move = (current_price - avg_entry) / avg_entry
    else:
        pct_move = (avg_entry - current_price) / avg_entry
    return pnl_sum, size_sum, pct_move
# ===== STEP 10: ANALYTICS INTEGRATION =====
def update_equity_curve(self, current_time=None):
    """Update daily equity curve for risk governor calculations.

    Thin pass-through to ``self.analytics.update_equity_curve``.

    Args:
        current_time: Forwarded unchanged to the analytics object. Defaults to
            None; how analytics interprets None is not visible from here.
    """
    self.analytics.update_equity_curve(current_time)
def record_trade(self, trade_return, trade_pnl):
    """Record completed trade for analytics.

    Thin pass-through to ``self.analytics.record_trade``.

    Args:
        trade_return: Return of the completed trade, forwarded unchanged.
        trade_pnl: P&L of the completed trade, forwarded unchanged.
    """
    self.analytics.record_trade(trade_return, trade_pnl)
def emit_periodic_summary(self):
    """Emit periodic performance summary.

    Thin pass-through to ``self.analytics.emit_periodic_summary``; nothing is
    returned to the caller.
    """
    self.analytics.emit_periodic_summary()
def get_adaptive_risk_multiplier(self):
    """Get current adaptive risk multiplier.

    Returns:
        Whatever ``self.analytics.calculate_adaptive_risk_multiplier()``
        returns, passed through unchanged.
    """
    return self.analytics.calculate_adaptive_risk_multiplier()


"""
Risk State Module
Contains the RiskState and PositionLeg dataclasses for position tracking
"""
from AlgorithmImports import *
from dataclasses import dataclass, field
from typing import List
from params import PYRAMID_COOLDOWN, MAX_TOTAL_SIZE_MULT, PYRAMID_SIZE_MULT
@dataclass
class PositionLeg:
    """One entry (leg) of a possibly pyramided position.

    Instances validate themselves on construction: price and size must be
    strictly positive and the direction must be +1 or -1.

    Raises:
        ValueError: If any of the three checks above fails.
    """

    entry_price: float
    position_size: float
    entry_time: datetime
    entry_type: str  # 'primary', 'pyramid1', 'pyramid2'
    direction: int  # 1 for long, -1 for short

    def __post_init__(self):
        """Reject legs with a non-positive price/size or an unknown direction."""
        price, size, side = self.entry_price, self.position_size, self.direction
        # Checks run in field order so the first offending field is reported
        if price <= 0:
            raise ValueError(f"Invalid entry price: {price}")
        if size <= 0:
            raise ValueError(f"Invalid position size: {size}")
        if side not in (1, -1):
            raise ValueError(f"Invalid direction: {side}")
@dataclass
class RiskState:
    """Per-position risk bookkeeping with pyramiding support (Step 9).

    Holds the individual entry legs together with the stop, profit target and
    runner-trail levels, plus the extremes seen since entry that drive the
    event-based trail.

    Raises:
        ValueError: On construction, if ``total_position_size`` is negative
            or ``direction`` is not one of -1, 0, 1.
    """

    # Position tracking
    position_legs: List[PositionLeg] = field(default_factory=list)
    total_position_size: float = 0.0
    direction: int = 0  # +1 for long, -1 for short, 0 for no position

    # Core risk levels
    original_stop: float = 0.0
    current_stop: float = 0.0
    profit_target: float = 0.0

    # Step 9: event-based exit tracking
    scaled_out: bool = False
    remaining_position: float = 1.0  # Percentage of position remaining
    runner_trail: float = 0.0  # Event-based trail level
    highest_high_since_entry: float = 0.0  # For long positions
    lowest_low_since_entry: float = 0.0  # For short positions
    bars_held: int = 0  # For adaptive stop management

    # State flags and timestamps
    is_active: bool = True
    entry_time: datetime = field(default_factory=lambda: datetime.min)
    last_update_time: datetime = field(default_factory=lambda: datetime.min)
    last_pyramid_time: datetime = field(default_factory=lambda: datetime.min)  # Track last pyramid entry for cooldown

    def __post_init__(self):
        """Validate the size and direction supplied at construction."""
        if self.total_position_size < 0:
            raise ValueError(f"Total position size cannot be negative: {self.total_position_size}")
        if self.direction not in (-1, 0, 1):
            raise ValueError(f"Direction must be -1, 0, or 1: {self.direction}")

    def add_position_leg(self, entry_price: float, position_size: float, entry_type: str, direction: int, current_time: datetime = None):
        """Append a leg and fold it into the aggregate position state.

        Args:
            entry_price: Fill price of the new leg.
            position_size: Size of the new leg; added to the running total.
            entry_type: 'primary', 'pyramid1' or 'pyramid2'. Pyramid entries
                also stamp ``last_pyramid_time`` for the cooldown check.
            direction: 1 for long, -1 for short; becomes the overall direction.
            current_time: Timestamp of the entry. Falls back to the wall
                clock (``datetime.now()``) when omitted.

        Raises:
            ValueError: Propagated from ``PositionLeg`` validation.
        """
        stamp = datetime.now() if current_time is None else current_time
        new_leg = PositionLeg(
            entry_price=entry_price,
            position_size=position_size,
            entry_time=stamp,
            entry_type=entry_type,
            direction=direction,
        )
        self.position_legs.append(new_leg)
        self.total_position_size += position_size
        self.direction = direction  # The latest leg defines the overall side

        if entry_type in ('pyramid1', 'pyramid2'):
            self.last_pyramid_time = stamp

        # Only the very first leg seeds the entry time and the tracked extreme
        if len(self.position_legs) != 1:
            return
        self.entry_time = new_leg.entry_time
        if direction == 1:
            self.highest_high_since_entry = entry_price
        else:
            self.lowest_low_since_entry = entry_price

    def get_weighted_entry_price(self) -> float:
        """Return the size-weighted average entry price, or 0.0 if undefined."""
        if not self.position_legs:
            return 0.0
        notional = sum(leg.entry_price * leg.position_size for leg in self.position_legs)
        if self.total_position_size > 0:
            return notional / self.total_position_size
        return 0.0

    def update_highs_lows(self, current_high: float, current_low: float):
        """Record a new favourable extreme for the event-based trail.

        Longs track the highest high, shorts the lowest low.

        Returns:
            bool: True only when a new extreme was stored.
        """
        if self.direction == 1 and current_high > self.highest_high_since_entry:
            self.highest_high_since_entry = current_high
            return True
        if self.direction == -1 and current_low < self.lowest_low_since_entry:
            self.lowest_low_since_entry = current_low
            return True
        return False

    def can_add_pyramid(self, current_time: datetime) -> bool:
        """Tell whether another pyramid leg is allowed right now.

        Requires the cooldown since the last pyramid entry to have elapsed and
        the total size to be below ``MAX_TOTAL_SIZE_MULT`` times the base size.
        """
        elapsed = current_time - self.last_pyramid_time
        # PYRAMID_COOLDOWN is in days
        if elapsed.total_seconds() < PYRAMID_COOLDOWN * 24 * 60 * 60:
            return False
        size_cap = self.get_base_position_size() * MAX_TOTAL_SIZE_MULT
        return self.total_position_size < size_cap

    def get_base_position_size(self) -> float:
        """Return the base size derived from the first leg (0.0 with no legs).

        A first leg that is not tagged 'primary' is divided by
        ``PYRAMID_SIZE_MULT`` to recover the base size.
        """
        if not self.position_legs:
            return 0.0
        first = self.position_legs[0]
        divisor = 1.0 if first.entry_type == 'primary' else PYRAMID_SIZE_MULT
        return first.position_size / divisor

    def clear_position(self):
        """Drop all legs and reset exit tracking; the state becomes inactive.

        Stop/target levels and the timestamps are left untouched.
        """
        self.position_legs.clear()
        self.direction = 0
        self.bars_held = 0
        self.scaled_out = False
        self.is_active = False
        self.remaining_position = 1.0
        self.total_position_size = 0.0
        self.runner_trail = 0.0
        self.highest_high_since_entry = 0.0
        self.lowest_low_since_entry = 0.0


# region imports
from AlgorithmImports import *
# endregion
"""
Signal Generation Module
Handles entry and exit signal logic
"""
class SignalGenerator:
    """Generates trading signals based on indicators and market conditions.

    The signal logic is still a placeholder: every call currently yields a
    neutral result (no entry, no exit, zero size).
    """

    def __init__(self, algorithm):
        """Store the owning algorithm and start with an empty signal cache.

        Args:
            algorithm: QCAlgorithm instance this generator belongs to.
        """
        self.algorithm = algorithm
        self.signals = {}

    def generate_signals(self, symbol, indicator_bundle):
        """Generate entry and exit signals for a symbol.

        Args:
            symbol: Symbol to generate signals for.
            indicator_bundle: IndicatorBundle instance for the symbol; only
                its ``is_ready`` flag is consulted at the moment.

        Returns:
            dict: Keys ``"entry"`` ("long", "short" or None), ``"exit"``
            (True/False, currently None) and ``"position_size"``.
        """
        if not indicator_bundle.is_ready:
            return {"entry": None, "exit": None, "position_size": 0}
        # Placeholder signal logic - will be expanded in later steps.
        # For now, just return neutral signals.
        return {
            "entry": None,  # Will be "long", "short", or None
            "exit": None,  # Will be True/False
            "position_size": 0,  # Will be calculated position size
        }

    def get_entry_signal(self, symbol, indicator_bundle):
        """Get entry signal for a symbol.

        Args:
            symbol: Symbol to check.
            indicator_bundle: IndicatorBundle instance.

        Returns:
            str: "long", "short", or None.
        """
        signals = self.generate_signals(symbol, indicator_bundle)
        return signals.get("entry")

    def get_exit_signal(self, symbol, indicator_bundle):
        """Get exit signal for a symbol.

        Args:
            symbol: Symbol to check.
            indicator_bundle: IndicatorBundle instance.

        Returns:
            bool: True if should exit position, False otherwise.
        """
        signals = self.generate_signals(symbol, indicator_bundle)
        # The "exit" key is always present and is None while neutral, so a
        # .get() default never applies; coerce to honour the bool contract.
        return bool(signals.get("exit"))


"""
Universe Selection Module
Handles symbol selection and universe management with hybrid static + dynamic approach
"""
import numpy as np
from datetime import datetime, timedelta
import params
from AlgorithmImports import *
class UniverseSelector:
"""Handles hybrid universe selection with static base + dynamic add-ons"""
def __init__(self, algorithm):
    """Set up the static list, an empty dynamic list and the candidate pool.

    Args:
        algorithm: QCAlgorithm instance used for logging and data access.
    """
    self.algorithm = algorithm

    # Static universe (never changes); empty when the feature is switched off
    if params.ENABLE_STATIC_UNIVERSE:
        self.static_universe = params.STATIC_UNIVERSE_LIST
    else:
        self.static_universe = []

    # Dynamic universe (changes based on edge scoring)
    self.dynamic_universe = []

    # Dates of the most recent add-scan and prune; None until the first run
    self.last_scan = None
    self.last_prune = None

    # Symbols the add-scan is allowed to pick from
    self.candidate_pool = self._build_candidate_pool()

    # Caches for edge scores and correlations
    self.edge_scores = {}
    self.correlation_cache = {}
    self.correlation_cache_date = None

    if params.USE_LIVE_CANDIDATE_POOL:
        candidate_mode = "Live"
    else:
        candidate_mode = "Static"
    self.algorithm.Debug(f"UniverseSelector initialized: Static={len(self.static_universe)}, Dynamic enabled={params.ENABLE_DYNAMIC_UNIVERSE}, Candidate pool={candidate_mode}")
def get_universe(self):
    """Return the symbols currently eligible for trading.

    Static symbols come first, then dynamic ones; each group is included only
    when its feature flag in ``params`` is enabled. Duplicates are dropped,
    keeping the first occurrence.

    Returns:
        list: Ordered, de-duplicated symbol strings (static + dynamic).
    """
    combined = []
    if params.ENABLE_STATIC_UNIVERSE:
        combined += self.static_universe
    if params.ENABLE_DYNAMIC_UNIVERSE:
        combined += self.dynamic_universe
    # dict keys keep insertion order, which gives an order-preserving de-dup
    return list(dict.fromkeys(combined))
def refresh_equity_candidates(self, symbol_list):
    """Replace the equity portion of candidate_pool with a fresh list
    coming from the algorithm's Coarse universe filter.

    Only acts when ``params.USE_LIVE_CANDIDATE_POOL`` is enabled. The
    equities injected by the previous refresh are remembered and dropped on
    the next one, so the pool always consists of the original ETF/crypto base
    plus the latest equity list. (Classifying the base by ticker length or
    suffix does not work: nearly all equity tickers are five characters or
    fewer, so stale equities would be kept forever.)

    The resulting pool keeps a deterministic order: retained symbols first,
    then new equities in the order given, without duplicates.

    Args:
        symbol_list: Ticker strings of the currently eligible equities.
    """
    # Only refresh if live candidate pool is enabled
    if not params.USE_LIVE_CANDIDATE_POOL:
        return

    incoming = list(symbol_list)

    # Everything that was NOT added by an earlier refresh is the static base
    # (ETFs and crypto pairs) and must survive the refresh.
    previous_live = getattr(self, "_live_equity_candidates", set())
    retained = [s for s in self.candidate_pool if s not in previous_live]
    retained_set = set(retained)

    # New equities, de-duplicated in order; symbols already in the base are
    # not tracked as "live" so they can never be dropped later.
    fresh = list(dict.fromkeys(s for s in incoming if s not in retained_set))

    self._live_equity_candidates = set(fresh)
    self.candidate_pool = retained + fresh

    self.algorithm.Debug(f"CANDIDATE REFRESH: Updated pool with {len(incoming)} equities, "
                         f"total candidates: {len(self.candidate_pool)}")
def update_universe(self):
    """Run the add-scan and/or prune when their day intervals have elapsed.

    Does nothing unless ``params.ENABLE_DYNAMIC_UNIVERSE`` is set. Each task
    runs on the first call and then whenever at least its configured number
    of days has passed since its last run; the add-scan goes first.
    """
    if not params.ENABLE_DYNAMIC_UNIVERSE:
        return

    today = self.algorithm.Time.date()

    # ---- Additions scan -------------------------------------------------------
    scan_due = (self.last_scan is None
                or (today - self.last_scan).days >= params.DYNAMIC_SCAN_INTERVAL_DAYS)
    if scan_due:
        self._run_add_scan()
        self.last_scan = today

    # ---- Pruning scan ---------------------------------------------------------
    prune_due = (self.last_prune is None
                 or (today - self.last_prune).days >= params.DYNAMIC_PRUNE_INTERVAL_DAYS)
    if prune_due:
        self._run_prune()
        self.last_prune = today
def _build_candidate_pool(self):
    """Assemble the initial candidate symbols for dynamic selection.

    Returns:
        list: Equities first (empty in live-pool mode, where they are filled
        in later via ``refresh_equity_candidates``), then ETFs, then crypto
        pairs.
    """
    # Proven static equity list, used only when the live pool is disabled
    static_equities = [
        "AAPL", "MSFT", "GOOGL", "GOOG", "AMZN", "TSLA", "META", "NVDA", "BRK.B", "UNH",
        "JNJ", "V", "WMT", "JPM", "PG", "MA", "HD", "CVX", "ABBV", "PFE",
        "BAC", "KO", "AVGO", "PEP", "TMO", "COST", "DIS", "ABT", "DHR", "VZ",
        "ADBE", "NFLX", "CRM", "XOM", "NKE", "CMCSA", "T", "QCOM", "TXN", "NEE",
        "LLY", "AMD", "HON", "UPS", "LOW", "INTC", "ORCL", "PM", "IBM", "SPGI",
        "INTU", "GS", "CAT", "AMGN", "BKNG", "ISRG", "AXP", "NOW", "DE", "TJX",
        "SCHW", "SYK", "BLK", "ADP", "GILD", "MDLZ", "TMUS", "CI", "LRCX", "CB",
        "ZTS", "MMC", "CVS", "FIS", "MO", "USB", "SO", "DUK", "BSX", "REGN",
        "PYPL", "EQIX", "ITW", "CL", "AON", "NSC", "GE", "PNC", "FCX", "SHW",
        "ATVI", "MU", "F", "GM", "SNOW", "COIN", "PLTR", "RBLX", "DKNG", "UBER",
    ]

    # Major ETFs (static list)
    etf_tickers = [
        "SPY", "QQQ", "IWM", "EFA", "EEM", "VTI", "VEA", "VWO", "GLD", "SLV",
        "TLT", "IEF", "HYG", "LQD", "XLF", "XLK", "XLE", "XLV", "XLI", "XLP",
        "ARKK", "ARKQ", "ARKG", "ARKW", "SQQQ", "TQQQ", "UPRO", "SPXL", "VIX", "UVXY",
    ]

    # Major crypto pairs (static list)
    crypto_pairs = [
        "BTCUSD", "ETHUSD", "ADAUSD", "SOLUSDT", "DOTUSD", "AVAXUSD", "MATICUSD",
        "ALGOUSD", "ATOMUSD", "LINKUSD", "UNIUSD", "AAVEUSD", "SUSHIUSD", "COMPUSD",
        "LTCUSD", "BCHUSD", "XRPUSD", "XLMUSD", "TRXUSD", "EOSUSD",
    ]

    # In live mode equities are populated dynamically via refresh_equity_candidates()
    equity_tickers = [] if params.USE_LIVE_CANDIDATE_POOL else static_equities

    return [*equity_tickers, *etf_tickers, *crypto_pairs]
def _run_add_scan(self):
    """Score the candidate pool and add the best newcomers to the dynamic universe.

    Candidates already in the static or dynamic universe are skipped. Only
    strictly positive edge scores qualify; they are taken best-first until
    either the per-scan limit or the remaining dynamic capacity is used up.
    Any exception is reported through ``Debug`` instead of propagating.
    """
    try:
        self.algorithm.Debug("DYNAMIC SCAN: Starting add scan...")

        # (ticker, score) pairs for every eligible candidate
        ranked = []
        for ticker in self.candidate_pool:
            already_held = ticker in self.static_universe or ticker in self.dynamic_universe
            if already_held:
                continue
            score = self._calculate_edge_score(ticker)
            if score is not None and score > 0:
                ranked.append((ticker, score))

        # Highest edge score first
        ranked.sort(key=lambda pair: pair[1], reverse=True)

        # Budget: per-scan cap, further limited by free dynamic slots
        budget = min(params.MAX_ADDITIONS_PER_SCAN,
                     params.DYN_UNIVERSE_MAX - len(self.dynamic_universe))
        added = 0
        for ticker, score in ranked:
            if added >= budget:
                break
            if not self._add_symbol_to_dynamic_universe(ticker):
                continue
            added += 1
            self.algorithm.Debug(f"DYNAMIC ADD: {ticker} (edge_score={score:.3f})")

        self.algorithm.Debug(f"DYNAMIC SCAN: Added {added} symbols. Dynamic universe size: {len(self.dynamic_universe)}")
    except Exception as e:
        self.algorithm.Debug(f"ERROR in _run_add_scan: {e}")
def _run_prune(self):
    """Drop dynamic symbols whose edge score is missing or no longer positive.

    Every dynamic symbol is re-scored first; at most
    ``params.MAX_DROPS_PER_PRUNE`` of the failing ones (in universe order) are
    then removed. Any exception is reported through ``Debug``.
    """
    try:
        self.algorithm.Debug("DYNAMIC PRUNE: Starting prune scan...")

        # Re-evaluate everything before removing, so the list being iterated
        # is never mutated mid-loop
        failing = []
        for ticker in self.dynamic_universe:
            score = self._calculate_edge_score(ticker)
            if score is None or score <= 0:
                failing.append(ticker)

        # Limit removals per prune cycle
        to_drop = failing[:params.MAX_DROPS_PER_PRUNE]

        for ticker in to_drop:
            self._remove_symbol_from_dynamic_universe(ticker)
            self.algorithm.Debug(f"DYNAMIC REMOVE: {ticker}")

        self.algorithm.Debug(f"DYNAMIC PRUNE: Removed {len(to_drop)} symbols. Dynamic universe size: {len(self.dynamic_universe)}")
    except Exception as e:
        self.algorithm.Debug(f"ERROR in _run_prune: {e}")
def _calculate_edge_score(self, symbol_str):
    """Calculate the edge score for a symbol from daily history.

    The score is ``sharpe * total_return / volatility`` over
    ``params.EDGE_LOOKBACK_DAYS`` daily bars, where Sharpe and volatility are
    annualised with sqrt(252).

    Args:
        symbol_str: Ticker string of the candidate.

    Returns:
        float or None: The edge score, or None when the symbol cannot be
        resolved, history is too short, any filter rejects it (dollar volume,
        minimum Sharpe, maximum ATR%, negative total return) or an error
        occurs. Errors are reported through ``Debug`` rather than raised.
    """
    try:
        # Get historical data
        symbol = self._get_symbol_object(symbol_str)
        if symbol is None:
            return None
        history = self.algorithm.History(symbol, params.EDGE_LOOKBACK_DAYS, Resolution.Daily)
        # Require at least half of the requested bars
        if history.empty or len(history) < params.EDGE_LOOKBACK_DAYS // 2:
            return None

        close = history['close']
        returns = close.pct_change().dropna()
        if len(returns) < 20:
            return None

        # Total return over the window
        last_close = close.iloc[-1]
        total_return = (last_close / close.iloc[0]) - 1

        # Volatility (annualized); the daily std is computed once and reused
        daily_std = returns.std()
        volatility = daily_std * np.sqrt(252)
        if volatility <= 0:
            return None

        # Sharpe ratio (falls back to 0 if the std is not a positive number)
        sharpe = returns.mean() / daily_std * np.sqrt(252) if daily_std > 0 else 0

        # ATR percentage: 14-bar mean true range relative to the last close
        prev_close = close.shift(1)
        high_low = history['high'] - history['low']
        high_close = abs(history['high'] - prev_close)
        low_close = abs(history['low'] - prev_close)
        true_range = np.maximum(high_low, np.maximum(high_close, low_close))
        atr = true_range.rolling(14).mean().iloc[-1]
        atr_pct = atr / last_close if last_close > 0 else 0

        # Average dollar volume (treated as illiquid if volume data is missing)
        if 'volume' in history.columns:
            avg_dollar_volume = (close * history['volume']).mean()
        else:
            avg_dollar_volume = 0

        # Apply filters
        if avg_dollar_volume < params.MIN_AVG_DOLLAR_VOLUME:
            return None
        if sharpe < params.MIN_SHARPE_THRESHOLD:
            return None
        if atr_pct > params.MAX_ATR_PCT_THRESHOLD:
            return None
        # Basic momentum filter (simplified stand-in for a percentile check)
        if total_return < 0:
            return None

        return sharpe * total_return / volatility
    except Exception as e:
        # Still best-effort (the candidate is simply skipped), but no longer
        # silent: a systematic data problem would otherwise be invisible.
        self.algorithm.Debug(f"ERROR calculating edge score for {symbol_str}: {e}")
        return None
def _get_symbol_object(self, symbol_str):
    """Build a Symbol object from a ticker string.

    Tickers ending in USD, USDT or USDC are created as Binance crypto
    symbols; everything else as a US equity.

    NOTE(review): the crypto market is fixed to Binance here, while
    ``_add_symbol_to_dynamic_universe`` calls ``AddCrypto`` without a market
    argument -- confirm both resolve to the same market.

    Args:
        symbol_str: Ticker string.

    Returns:
        The created Symbol, or None if creation fails for any ordinary
        error. Interpreter-level signals such as KeyboardInterrupt are not
        swallowed.
    """
    try:
        if symbol_str.endswith(("USD", "USDT", "USDC")):
            return Symbol.Create(symbol_str, SecurityType.Crypto, Market.Binance)
        return Symbol.Create(symbol_str, SecurityType.Equity, Market.USA)
    except Exception:
        return None
def _add_symbol_to_dynamic_universe(self, symbol_str):
    """Subscribe a symbol, add it to the dynamic universe and set up indicators.

    Tickers ending in USD, USDT or USDC are added as crypto, all others as
    equities, both at daily resolution. When the algorithm exposes an
    ``indicators`` mapping, an ``IndicatorBundle`` is registered for the new
    symbol and warmed up with 250 daily bars. A failed warm-up is logged and
    leaves the (un-warmed) bundle registered.

    Args:
        symbol_str: Ticker string to add.

    Returns:
        bool: True if the symbol was added; False if the dynamic universe is
        full or an error occurred (the error is reported through ``Debug``).
    """
    try:
        # Check if we're at capacity
        if len(self.dynamic_universe) >= params.DYN_UNIVERSE_MAX:
            return False

        # Add to algorithm
        if symbol_str.endswith(("USD", "USDT", "USDC")):
            symbol = self.algorithm.AddCrypto(symbol_str, Resolution.Daily).Symbol
        else:
            symbol = self.algorithm.AddEquity(symbol_str, Resolution.Daily).Symbol

        # Add to dynamic universe
        self.dynamic_universe.append(symbol_str)

        # Initialize indicators if needed
        if hasattr(self.algorithm, 'indicators'):
            from indicators import IndicatorBundle
            indicator_bundle = IndicatorBundle(self.algorithm, symbol)
            try:
                # Register before warming up: if the warm-up fails the bundle
                # must still exist so the indicators can fill in over time.
                self.algorithm.indicators[symbol] = indicator_bundle
                history = self.algorithm.History(symbol, 250, Resolution.Daily)
                indicator_bundle.warm_up(history)
            except Exception as e:
                # Best-effort: the symbol stays in the universe un-warmed
                self.algorithm.Debug(f"WARNING: indicator warm-up failed for {symbol_str}: {e}")

        return True
    except Exception as e:
        self.algorithm.Debug(f"ERROR adding symbol {symbol_str}: {e}")
        return False
def _remove_symbol_from_dynamic_universe(self, symbol_str):
    """Take a symbol out of the dynamic universe and tidy up after it.

    For a symbol that is currently in the dynamic universe: remove it from
    the list, liquidate any open holding, and delete its entry from the
    algorithm's ``indicators`` mapping if one exists. Errors are reported
    through ``Debug`` instead of propagating.

    Args:
        symbol_str: Ticker string to remove.
    """
    try:
        if symbol_str not in self.dynamic_universe:
            return
        self.dynamic_universe.remove(symbol_str)

        algo = self.algorithm
        symbol = self._get_symbol_object(symbol_str)

        # Liquidate position if held
        if symbol and algo.Portfolio[symbol].Invested:
            algo.Liquidate(symbol)

        # Clean up indicators
        if hasattr(algo, 'indicators') and symbol in algo.indicators:
            del algo.indicators[symbol]
    except Exception as e:
        self.algorithm.Debug(f"ERROR removing symbol {symbol_str}: {e}")
def is_valid_symbol(self, symbol):
    """Tell whether a symbol belongs to the current trading universe.

    Args:
        symbol: Symbol to validate; compared by its ``str()`` form.

    Returns:
        bool: True if ``symbol`` is not None and its string form is in
        ``get_universe()``.
    """
    if symbol is None:
        return False
    return str(symbol) in self.get_universe()