| Overall Statistics | Value |
| --- | --- |
| Total Orders | 4007 |
| Average Win | 0.20% |
| Average Loss | -0.37% |
| Compounding Annual Return | 35.089% |
| Drawdown | 26.100% |
| Expectancy | 0.346 |
| Start Equity | 100000 |
| End Equity | 676979.80 |
| Net Profit | 576.980% |
| Sharpe Ratio | 1.084 |
| Sortino Ratio | 1.211 |
| Probabilistic Sharpe Ratio | 60.084% |
| Loss Rate | 13% |
| Win Rate | 87% |
| Profit-Loss Ratio | 0.55 |
| Alpha | 0.159 |
| Beta | 0.777 |
| Annual Standard Deviation | 0.213 |
| Annual Variance | 0.045 |
| Information Ratio | 0.812 |
| Tracking Error | 0.171 |
| Treynor Ratio | 0.296 |
| Total Fees | $4147.55 |
| Estimated Strategy Capacity | $1800000000.00 |
| Lowest Capacity Asset | BIL TT1EBZ21QWKL |
| Portfolio Turnover | 2.47% |
from AlgorithmImports import *
import numpy as np
from datetime import timedelta
# --- Add SciPy Import ---
import scipy.optimize as sco
# --- End SciPy Import ---
# --- Corrected Optimization Import (Keep for potential future use/reference, though not calling Optimize) ---
from QuantConnect.Algorithm.Framework.Portfolio import MaximumSharpeRatioPortfolioOptimizer
# --- End Corrected Optimization Import ---
class FunnyAlgo(QCAlgorithm):
def Initialize(self):
    """Configure the backtest and create every piece of state the handlers read.

    Sets the date range, cash and brokerage model; adds SPY, BIL and a fixed
    list of inverse / defensive ETFs at daily resolution; registers the
    coarse/fine universe; seeds the 30-bar SPY rolling window from history;
    schedules the rebalance callbacks; and registers ATR indicators for the
    defensive ETFs, BIL and SPY.
    """
    # --- Backtest window, capital and basic parameters ---
    self.SetStartDate(2019, 1, 1)
    self.SetEndDate(2025, 10, 22)
    self.SetCash(100000)
    self.annual_risk_free_rate = 0.02  # assumed 2% annual risk-free rate
    self.atr_period = 14

    # NOTE(review): two brokerage models are configured. The second call runs
    # later, so presumably it replaces the Interactive Brokers model set
    # here -- confirm which one is intended.
    self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.CASH)
    self.MAX_ALLOCATION_TARGET = 0.99  # target slightly below 100% invested
    self.SetBrokerageModel(DefaultBrokerageModel(AccountType.CASH))

    # --- Core securities and universe ---
    self.UniverseSettings.Resolution = Resolution.Daily
    self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol
    self.bil = self.AddEquity("BIL", Resolution.Daily).Symbol
    self.SetWarmUp(max(63 + 2, self.atr_period + 2), Resolution.Daily)
    self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)

    # --- Momentum caches ---
    # MonthlyRebalance reads `raw_momentum_scores_20d` and
    # `raw_momentum_scores`; the original only created the misspelled
    # `rawm_momentum_scores_20d`, so both correctly-named caches start empty
    # here. The misspelled name is kept as an alias in case code outside
    # this view still refers to it.
    self.raw_momentum_scores_20d = {}
    self.raw_momentum_scores = {}
    self.rawm_momentum_scores_20d = self.raw_momentum_scores_20d

    # --- Rebalance bookkeeping ---
    self.selected_by_market_cap = []  # (Symbol, MarketCap) pairs from fine selection
    self.rebalance_flag = False
    self.spy_30day_window = RollingWindow[float](30)
    self.entry_prices = {}  # Symbol -> reference price used by the stop-loss
    self.previous_bil_allocation = 0.0

    # SetRebalanceFlag arms the flag at month start; MonthlyRebalance is
    # invoked on the weekly schedule and only acts when the flag is set.
    self.Schedule.On(self.DateRules.MonthStart(self.spy),
                     self.TimeRules.AfterMarketOpen(self.spy, 30),
                     self.SetRebalanceFlag)
    self.Schedule.On(self.DateRules.WeekStart(self.spy, DayOfWeek.Wednesday),
                     self.TimeRules.AfterMarketOpen(self.spy, 30),
                     self.MonthlyRebalance)

    # Seed the rolling window with historical SPY closes.
    history = self.History(self.spy, 30, Resolution.Daily)
    if not history.empty:
        for _, row in history.loc[self.spy].iterrows():
            self.spy_30day_window.Add(row["close"])

    # --- Market trend tracking ---
    self.trend_lookback = 10
    self.spy_prices = {}  # date -> SPY close
    self.max_spy_history = 60  # days of price history to keep

    # --- Stop-loss parameters ---
    self.stop_loss_base = 0.04
    self.dynamic_stop_weight = 0.5  # weight of the ATR signal in the blended threshold
    self.top_momentum_stocks = set()
    self.trailing_highs = {}

    # --- Inverse ETFs ---
    self.sh = self.AddEquity("SH", Resolution.Daily).Symbol    # Inverse S&P 500
    self.psq = self.AddEquity("PSQ", Resolution.Daily).Symbol  # Inverse Nasdaq-100
    self.dog = self.AddEquity("DOG", Resolution.Daily).Symbol  # Inverse Dow Jones
    self.rwm = self.AddEquity("RWM", Resolution.Daily).Symbol  # Inverse Russell 2000
    self.eum = self.AddEquity("EUM", Resolution.Daily).Symbol  # Inverse Emerging Markets
    self.myd = self.AddEquity("MYY", Resolution.Daily).Symbol  # Inverse Mid-Cap 400

    # --- Alternative defensive ETFs (not inverse) ---
    self.gld = self.AddEquity("GLD", Resolution.Daily).Symbol  # Gold
    self.ief = self.AddEquity("IEF", Resolution.Daily).Symbol  # 7-10 Year Treasury
    self.bnd = self.AddEquity("BND", Resolution.Daily).Symbol  # Total Bond Market

    # --- Sector-based defensive ETFs ---
    self.xlp = self.AddEquity("XLP", Resolution.Daily).Symbol  # Consumer Staples
    self.xlu = self.AddEquity("XLU", Resolution.Daily).Symbol  # Utilities
    self.xlv = self.AddEquity("XLV", Resolution.Daily).Symbol  # Healthcare
    self.vht = self.AddEquity("VHT", Resolution.Daily).Symbol  # Vanguard Healthcare
    self.vdc = self.AddEquity("VDC", Resolution.Daily).Symbol  # Vanguard Consumer Staples

    # Group all defensive ETFs together.
    self.inverse_etfs = [self.sh, self.psq, self.dog, self.rwm, self.eum, self.myd]
    self.alternative_defensive = [self.gld, self.ief, self.bnd]
    self.sector_defensive = [self.xlp, self.xlu, self.xlv, self.vht, self.vdc]
    self.all_defensive = self.inverse_etfs + self.alternative_defensive + self.sector_defensive

    self.diagnostic_mode = True  # enable detailed diagnostics

    # --- Weekly defensive adjustment ---
    self.defensive_positions = set()
    self.last_defensive_update = datetime(1900, 1, 1)
    self.Schedule.On(self.DateRules.WeekStart(self.spy, DayOfWeek.Monday),
                     self.TimeRules.AfterMarketOpen(self.spy, 60),
                     self.WeeklyDefensiveAdjustment)

    # --- Work deferred to the next OnData call ---
    self.pending_bil_purchase_amount = 0.0
    self.pending_monthly_targets = None  # targets stored by MonthlyRebalance
    self.pending_weekly_targets = None   # targets stored by WeeklyDefensiveAdjustment

    self.inverse_positions = set()
    self.inverse_lookback_short = 7   # short momentum window
    self.inverse_lookback_med = 15    # medium-term momentum window

    # ATR indicators for the defensive ETFs, BIL and SPY.
    self.atr = {}
    for symbol in self.all_defensive + [self.bil, self.spy]:
        self.atr[symbol] = self.ATR(symbol, self.atr_period, Resolution.Daily)

    # --- GLD cash parking ---
    self.gld_cash_parking_active = False
def CoarseSelectionFunction(self, coarse):
    """Return the symbols of coarse entries that pass the basic screen.

    An entry passes when it has fundamental data, its price is above 5 and
    its market is `Market.USA`. Input order is preserved.
    """
    symbols = []
    for candidate in coarse:
        if not candidate.HasFundamentalData:
            continue
        if not (candidate.Price > 5):
            continue
        if not (candidate.Market == Market.USA):
            continue
        symbols.append(candidate.Symbol)
    return symbols
def FineSelectionFunction(self, fine):
    """Pick the 30 largest entries by market cap and remember them.

    Only entries with a market cap above 1e10 and a security-reference type
    of "ST00000001" are considered. The chosen (Symbol, MarketCap) pairs are
    stored in `self.selected_by_market_cap`; the symbols are returned in
    descending market-cap order.
    """
    eligible = []
    for stock in fine:
        if stock.MarketCap > 1e10 and stock.SecurityReference.SecurityType == "ST00000001":
            eligible.append(stock)

    # list.sort is stable, matching sorted(..., reverse=True) for ties.
    eligible.sort(key=lambda stock: stock.MarketCap, reverse=True)
    largest = eligible[:30]

    self.selected_by_market_cap = [(stock.Symbol, stock.MarketCap) for stock in largest]
    return [stock.Symbol for stock in largest]
def SetRebalanceFlag(self):
    """Arm `rebalance_flag`, but only when the current algorithm time is a Wednesday.

    On any other weekday the flag is left untouched.
    """
    wednesday = 2  # datetime.weekday(): Monday == 0
    if self.Time.weekday() != wednesday:
        return
    self.rebalance_flag = True
def OnData(self, data):
    """Per-bar handler: execute deferred work, track SPY, enforce stop-losses.

    Order of operations:
      1. Return immediately while warming up.
      2. If nothing is held and GLD parking is not active, put 50% in GLD
         and return so the order can process.
      3. Execute pending monthly targets, else pending weekly targets
         (stored by the scheduled callbacks), then return.
      4. Execute a BIL purchase queued by an earlier stop-loss.
      5. Record the SPY close and prune old SPY prices.
      6. Liquidate non-defensive, non-BIL holdings whose drop from their
         entry price reaches the stop threshold, and queue the estimated
         proceeds for a BIL purchase on the next call.

    Args:
        data: the data slice for this time step; only `data.Bars` is read.
    """
    if self.IsWarmingUp:
        return

    # --- GLD cash parking: only when the portfolio holds nothing at all ---
    if not self.Portfolio.Invested and not self.gld_cash_parking_active:
        self.Debug("Portfolio is 0% invested. Attempting to park 50% in GLD.")
        if self.Securities.ContainsKey(self.gld) and self.Securities[self.gld].Price > 0:
            self.SetHoldings(self.gld, 0.50)
            self.gld_cash_parking_active = True
            self.entry_prices[self.gld] = self.Securities[self.gld].Price
            self.Debug("GLD cash parking initiated. Exiting OnData for this step to allow order processing.")
            return
        else:
            self.Debug(f"GLD ({self.gld}) price is zero or security not found, cannot initiate cash parking in GLD.")

    # Once a real rebalance is queued, the rebalance owns the GLD position.
    if self.gld_cash_parking_active and (
            self.pending_monthly_targets is not None or self.pending_weekly_targets is not None):
        self.Debug("Main strategy rebalance is pending. GLD cash parking is now considered inactive. Rebalance will manage GLD.")
        self.gld_cash_parking_active = False

    # --- Execute pending rebalances first ---
    if self.pending_monthly_targets is not None:
        monthly_targets = self.pending_monthly_targets
        self._execute_pending_targets(monthly_targets, "Monthly", "Monthly Rebalance")
        # Clear the queue: without this the same targets were re-applied on
        # every subsequent bar.
        self.pending_monthly_targets = None
        return  # same as the weekly branch: avoid conflicting orders this step

    if self.pending_weekly_targets is not None:
        weekly_targets = self.pending_weekly_targets
        self._execute_pending_targets(weekly_targets, "Weekly", "Weekly Adjustment")
        # WeeklyDefensiveAdjustment defers this bookkeeping to execution
        # time, so record the symbols that were just targeted.
        self.defensive_positions.update(
            target.Symbol for target in weekly_targets if target.Quantity > 0)
        self.pending_weekly_targets = None
        self.last_defensive_update = self.Time  # stamped only after execution
        return

    # --- BIL purchase queued by a stop-loss on an earlier call ---
    if self.pending_bil_purchase_amount > 0:
        bil_price = self.Securities[self.bil].Price
        if bil_price > 0:
            # Never try to spend more than the cash currently available.
            settled_cash = self.Portfolio.Cash
            buy_amount = min(self.pending_bil_purchase_amount, settled_cash)
            if buy_amount > 1.0:
                bil_quantity = buy_amount / bil_price
                self.MarketOrder(self.bil, bil_quantity)
                self.Debug(f"Executing delayed BIL purchase of ${buy_amount:.2f}")
            else:
                self.Debug(f"Skipping delayed BIL purchase, amount ${buy_amount:.2f} too small or no settled cash.")
        else:
            self.Debug(f"Could not execute delayed BIL purchase, BIL price is zero.")
        self.pending_bil_purchase_amount = 0.0  # reset regardless of outcome

    if not data.Bars.ContainsKey(self.spy):
        return

    # --- SPY tracking ---
    spy_close = data.Bars[self.spy].Close
    today = self.Time.date()
    self.spy_30day_window.Add(spy_close)
    self.spy_prices[today] = spy_close
    stale_dates = [day for day in self.spy_prices
                   if (today - day).days > self.max_spy_history]
    for day in stale_dates:
        self.spy_prices.pop(day)

    market_trend = self._calculateMarketTrend()

    # --- Stop-loss checks ---
    stop_loss_triggered_this_step = False
    cash_from_stop_loss = 0.0  # estimated proceeds generated during this call
    for kvp in list(self.Portfolio):  # iterate over a copy in case of modification
        symbol = kvp.Key
        holding = kvp.Value
        if not holding.Invested or symbol == self.bil or symbol in self.all_defensive:
            continue

        current_price = self.Securities[symbol].Price
        if current_price <= 0:
            # No usable price: it would read as a 100% drop (and divide by
            # zero in the ATR blend), so skip instead of liquidating.
            continue

        # Fall back to the average fill price when no entry price is recorded.
        entry_price = self.entry_prices.get(symbol, holding.AveragePrice)
        if entry_price == 0:
            continue
        price_drop = (entry_price - current_price) / entry_price

        stop_threshold = self._stop_loss_threshold(symbol, current_price, market_trend)
        if price_drop >= stop_threshold:
            # Estimate the proceeds before liquidating.
            estimated_proceeds = holding.AbsoluteQuantity * current_price
            self.Liquidate(symbol)
            stop_loss_triggered_this_step = True
            cash_from_stop_loss += estimated_proceeds
            self.Debug(f"Stop-loss triggered for {symbol} at {self.Securities[symbol].Price}, drop: {price_drop*100:.1f}%, threshold: {stop_threshold*100:.1f}%")
            self.entry_prices.pop(symbol, None)

    # The BIL purchase itself happens on the next OnData call.
    if stop_loss_triggered_this_step:
        self.pending_bil_purchase_amount += cash_from_stop_loss
        self.Debug(f"Stop-loss generated estimated ${cash_from_stop_loss:.2f}. Will attempt BIL purchase next OnData.")


def _execute_pending_targets(self, targets, label, description):
    """Log a cash-vs-target estimate, apply `targets`, then refresh entry prices.

    Args:
        targets: list of portfolio targets; `Quantity` is read as a
            portfolio fraction.
        label: "Monthly" or "Weekly", used in the log lines.
        description: wording for the execution log line, e.g.
            "Monthly Rebalance".
    """
    settled_cash = self.Portfolio.Cash
    total_portfolio_value = self.Portfolio.TotalPortfolioValue
    estimated_target_value = 0
    if total_portfolio_value > 0:
        for target in targets:
            if self.Securities.ContainsKey(target.Symbol) and self.Securities[target.Symbol].Price > 0:
                estimated_target_value += target.Quantity * total_portfolio_value
    self.Debug(f"{label} Execution: Settled Cash ${settled_cash:.2f}, Estimated Target Value ${estimated_target_value:.2f}")
    if estimated_target_value > settled_cash:
        self.Debug(f"WARNING: Estimated {label.lower()} target value exceeds settled cash BEFORE SetHoldings call!")

    self.Debug(f"Executing delayed {description} SetHoldings for {len(targets)} targets.")
    self.SetHoldings(targets)
    self._update_entry_prices_after_rebalance(targets)


def _stop_loss_threshold(self, symbol, current_price, market_trend):
    """Return the fractional drop from entry price that triggers a stop-loss.

    Starts from 4.5%, tightens by 10% when `market_trend` is below -3% and
    loosens by 10% when it is above +3%. If a ready ATR indicator exists for
    `symbol`, the result is blended with ATR as a fraction of price; the ATR
    weight is capped at 0.3 when ATR exceeds 1.2x the threshold.

    `current_price` must be positive.
    """
    threshold = 0.045
    if market_trend < -0.03:
        threshold *= 0.9
    elif market_trend > 0.03:
        threshold *= 1.1

    if symbol in self.atr and self.atr[symbol].IsReady:
        atr_pct = self.atr[symbol].Current.Value / current_price
        effective_weight = self.dynamic_stop_weight
        if atr_pct > threshold * 1.2:
            # Very high volatility: damp the ATR influence.
            effective_weight = min(self.dynamic_stop_weight, 0.3)
        threshold = (1 - effective_weight) * threshold + effective_weight * atr_pct
    return threshold
def _update_entry_prices_after_rebalance(self, targets_just_set):
"""Updates entry prices for symbols included in a delayed SetHoldings call."""
symbols_targeted = {t.Symbol for t in targets_just_set}
total_portfolio_value = self.Portfolio.TotalPortfolioValue
if total_portfolio_value == 0: return # Avoid division by zero
for target in targets_just_set:
symbol = target.Symbol
target_weight = target.Quantity # PortfolioTarget Quantity is the target percentage
# Check if it's a new position or an increase
is_new_or_increase = False
if not self.Portfolio.ContainsKey(symbol) or not self.Portfolio[symbol].Invested:
is_new_or_increase = True
else:
current_holding_pct = self.Portfolio[symbol].HoldingsValue / total_portfolio_value
# Use a small tolerance for increase check
if target_weight > current_holding_pct + 0.001:
is_new_or_increase = True
if is_new_or_increase:
if self.Securities.ContainsKey(symbol) and self.Securities[symbol].Price > 0:
self.entry_prices[symbol] = self.Securities[symbol].Price
self.Debug(f"Updated entry price for {symbol} to {self.entry_prices[symbol]} after delayed SetHoldings.")
elif symbol in self.entry_prices:
# Should not happen if logic is correct, but safety check
del self.entry_prices[symbol]
def WeeklyDefensiveAdjustment(self):
    """Weekly check and adjustment for defensive ETF positions.

    Skips when a monthly rebalance ran within the last 3 days, when the
    defensive positions were updated within the last 5 days, or when the
    portfolio is already invested at or above `MAX_ALLOCATION_TARGET`. In a
    strong bull market (SPY more than 4% above its 30-bar average and trend
    above 3%) tracked defensive positions are liquidated instead.

    Otherwise the defensive ETFs are re-evaluated with a budget of at most
    25% of the current BIL weight (capped by the uninvested room). Tracked
    positions without a target are liquidated immediately; the remaining
    targets are scaled to fit the allocation cap and stored in
    `pending_weekly_targets`, which OnData executes on its next call.
    """
    today = self.Time.date()

    # Skip if the monthly rebalance ran recently. `last_rebalance_date` is
    # not created in Initialize, so it may be missing.
    last_rebalance = getattr(self, 'last_rebalance_date', None)
    days_since_rebalance = (today - last_rebalance.date()).days if last_rebalance is not None else 999
    if days_since_rebalance < 3:
        return

    # Skip if defensive positions were updated recently (at most once a week).
    days_since_update = (today - self.last_defensive_update.date()).days
    if days_since_update < 5:
        return

    # --- Current market conditions ---
    spy_price = self.Securities[self.spy].Price
    window_count = self.spy_30day_window.Count
    sma_30 = sum(self.spy_30day_window) / window_count if window_count > 0 else spy_price
    if sma_30 <= 0:
        # No usable SPY price yet; the deviation below would divide by zero.
        return
    market_deviation = (spy_price / sma_30) - 1.0
    market_trend = self._calculateMarketTrend()

    # --- Strong bull market: drop defensive positions and stop ---
    if market_deviation > 0.04 and market_trend > 0.03:
        if self.defensive_positions:
            self.Debug("Strong bull market detected, liquidating existing defensive positions.")
            for symbol in list(self.defensive_positions):  # copy: the set is cleared below
                if self.Portfolio.ContainsKey(symbol) and self.Portfolio[symbol].Invested:
                    self.Liquidate(symbol)
                self.entry_prices.pop(symbol, None)
            self.defensive_positions.clear()
            self.last_defensive_update = self.Time
        return

    # --- Invested fraction before any adjustment ---
    total_portfolio_value = self.Portfolio.TotalPortfolioValue
    if total_portfolio_value == 0:
        return
    total_invested_value = sum(holding.HoldingsValue for holding in self.Portfolio.Values
                               if holding.Invested)
    total_invested_pct = total_invested_value / total_portfolio_value

    if total_invested_pct >= self.MAX_ALLOCATION_TARGET:
        self.Debug(f"WeeklyDefensive: Already invested near/above target max ({total_invested_pct*100:.2f}% >= {self.MAX_ALLOCATION_TARGET*100:.1f}%), skipping defensive additions/adjustments.")
        return

    # Room left below the overall allocation cap.
    available_allocation_total = max(0, self.MAX_ALLOCATION_TARGET - total_invested_pct)

    # Budget: up to 25% of the BIL weight, capped by the available room.
    bil_invested = self.Portfolio.ContainsKey(self.bil) and self.Portfolio[self.bil].Invested
    current_bil_value = self.Portfolio[self.bil].HoldingsValue if bil_invested else 0
    bil_allocation_pct = current_bil_value / total_portfolio_value
    potential_allocation = min(bil_allocation_pct * 0.25, available_allocation_total)

    # --- Diagnostics for the tracked defensive positions ---
    if self.diagnostic_mode and self.defensive_positions:
        self.Debug(f"WEEKLY CHECK - Current defensive positions:")
        for symbol in self.defensive_positions:
            if self.Portfolio.ContainsKey(symbol) and self.Portfolio[symbol].Invested:
                position = self.Portfolio[symbol]
                entry = self.entry_prices.get(symbol, position.AveragePrice)
                current = self.Securities[symbol].Price
                pnl_pct = (current / entry) - 1 if entry > 0 else 0
                self.Debug(f" {symbol}: PnL {pnl_pct*100:.2f}%, Value ${position.HoldingsValue:.2f} ({position.HoldingsValue/total_portfolio_value*100:.2f}%)")

    self.Debug(f"WEEKLY CHECK - Market: Dev {market_deviation*100:.2f}%, Trend {market_trend*100:.2f}%")
    self.Debug(f"BIL allocation: {bil_allocation_pct*100:.2f}%, Potential defensive: {potential_allocation*100:.2f}%, Available Room: {available_allocation_total*100:.2f}%")

    new_allocations = self._evaluateDefensiveETFs(market_deviation, market_trend, potential_allocation)

    # --- Determine targets and removals ---
    targets_to_set = {}
    positions_to_remove = set()
    current_defensive_in_portfolio = {s for s in self.defensive_positions
                                      if self.Portfolio.ContainsKey(s) and self.Portfolio[s].Invested}
    for symbol in current_defensive_in_portfolio:
        target_pct = new_allocations.get(symbol, 0)
        current_pct = self.Portfolio[symbol].HoldingsValue / total_portfolio_value
        if target_pct > 0.001:
            # Re-target only on a meaningful change; otherwise hold the
            # position at its current weight. (The original condition also
            # tested `symbol not in targets_to_set`, which is always true
            # here and made the hold branch unreachable.)
            if abs(target_pct - current_pct) > 0.005:
                targets_to_set[symbol] = target_pct
            else:
                targets_to_set[symbol] = current_pct
            new_allocations.pop(symbol, None)
        else:
            positions_to_remove.add(symbol)

    # Add any remaining new positions.
    for symbol, allocation in new_allocations.items():
        if allocation > 0.001:
            targets_to_set[symbol] = allocation

    # --- Scaling logic ---
    value_of_positions_to_remove = sum(self.Portfolio[s].HoldingsValue
                                       for s in positions_to_remove if self.Portfolio.ContainsKey(s))
    # Targets are absolute portfolio fractions, so what is already held in a
    # targeted symbol is replaced by its target, not added to it. Leave that
    # value out of the base so it is not counted twice against the cap.
    value_of_positions_retargeted = sum(
        self.Portfolio[s].HoldingsValue for s in targets_to_set
        if self.Portfolio.ContainsKey(s) and self.Portfolio[s].Invested)
    estimated_portfolio_pct_after_removal = max(
        0,
        (total_invested_value - value_of_positions_to_remove - value_of_positions_retargeted)
        / total_portfolio_value)

    total_target_defensive_allocation = sum(targets_to_set.values())
    max_allowable_defensive_allocation = max(0, self.MAX_ALLOCATION_TARGET - estimated_portfolio_pct_after_removal)

    scaled_targets = targets_to_set.copy()
    scale_factor = 1.0
    if total_target_defensive_allocation > max_allowable_defensive_allocation + 1e-6:
        self.Debug(f"Warning: Weekly defensive targets ({total_target_defensive_allocation*100:.2f}%) exceed available room ({max_allowable_defensive_allocation*100:.2f}% based on target max {self.MAX_ALLOCATION_TARGET*100:.1f}%). Scaling targets.")
        if total_target_defensive_allocation > 0:
            # Scale every target down proportionally to fit the room.
            scale_factor = max_allowable_defensive_allocation / total_target_defensive_allocation
            self.Debug(f"Scaling weekly defensive targets by factor: {scale_factor:.4f}")
            for symbol in scaled_targets:
                scaled_targets[symbol] *= scale_factor
            total_target_defensive_allocation = sum(scaled_targets.values())
        else:
            scaled_targets = {}
            total_target_defensive_allocation = 0
            self.Debug("Warning: total_target_defensive_allocation is zero during scaling, clearing targets.")

    final_total_allocation = estimated_portfolio_pct_after_removal + total_target_defensive_allocation
    self.Debug(f"Estimated total allocation after weekly adjustment: {final_total_allocation*100:.2f}% (Target Max: {self.MAX_ALLOCATION_TARGET*100:.1f}%)")

    # --- Execute trades (removals now, targets later) ---
    if not (scaled_targets or positions_to_remove):
        self.Debug("WeeklyDefensive: No changes needed for defensive positions.")
        self.last_defensive_update = self.Time
        return

    self.Debug(f"WEEKLY ADJUSTMENT - Processing changes. Targets: {len(scaled_targets)}, Removals: {len(positions_to_remove)}")

    # Removals are executed immediately.
    liquidated_symbols = set()
    if positions_to_remove:
        self.Debug(f"WeeklyDefensive: Removing {len(positions_to_remove)} defensive positions NOW: {[s.Value for s in positions_to_remove]}")
        for symbol in positions_to_remove:
            if self.Portfolio.ContainsKey(symbol) and self.Portfolio[symbol].Invested:
                self.Liquidate(symbol)
                liquidated_symbols.add(symbol)
        self.defensive_positions -= liquidated_symbols
        for symbol in liquidated_symbols:
            self.entry_prices.pop(symbol, None)

    # New / adjusted targets are stored for the next OnData call.
    targets_to_set_later = []
    for symbol, allocation in scaled_targets.items():
        # Smaller threshold than above because the targets may have been scaled.
        if allocation > 1e-5:
            targets_to_set_later.append(PortfolioTarget(symbol, allocation))
            self.Debug(f"WeeklyDefensive: Intending to target {symbol} at {allocation*100:.2f}% (Scaled)")

    if targets_to_set_later:
        self.Debug(f"WeeklyDefensive: Storing {len(targets_to_set_later)} scaled defensive targets for execution in next OnData.")
        # No SetHoldings, entry-price update or date stamp here: those
        # happen when OnData executes the stored targets.
        self.pending_weekly_targets = targets_to_set_later
    else:
        self.Debug("WeeklyDefensive: No new/adjusted targets to set.")
        self.pending_weekly_targets = None
        self.last_defensive_update = self.Time
def MonthlyRebalance(self):
if not self.rebalance_flag: return
self.rebalance_flag = False
symbols_to_clear = [s for s in self.entry_prices if s != self.bil and s not in self.all_defensive]
for symbol in symbols_to_clear:
if symbol in self.entry_prices:
del self.entry_prices[symbol]
self.trailing_highs.clear()
if self.spy_30day_window.Count < 30:
self.Debug("MonthlyRebalance: Waiting for enough SPY history.")
return
spy_price = self.Securities[self.spy].Price
if self.spy_30day_window.Count == 0:
self.Debug("MonthlyRebalance: SPY 30-day window is empty.")
return
sma_30 = sum(self.spy_30day_window) / self.spy_30day_window.Count
market_deviation = (spy_price / sma_30) - 1.0
market_trend = self._calculateMarketTrend()
# --- BIL Allocation Logic (largely unchanged from your provided version) ---
initial_bil_weight = 0.0
strong_bull_market = market_deviation > 0.03 and market_trend > 0.01
if spy_price < sma_30:
deviation_pct = (sma_30 - spy_price) / sma_30
if deviation_pct > 0.05: initial_bil_weight = min(deviation_pct * 1.0, 0.50)
elif deviation_pct > 0.01: initial_bil_weight = min(deviation_pct * 0.8, 0.40)
if not strong_bull_market:
min_bil_factor = 0.8
if market_deviation > 0.02: min_bil_factor = 0.6
elif market_deviation > 0.00: min_bil_factor = 0.7
min_bil_allocation = self.previous_bil_allocation * min_bil_factor
initial_bil_weight = max(initial_bil_weight, min_bil_allocation)
else:
initial_bil_weight = 0
if market_deviation > 0.05: initial_bil_weight = min(initial_bil_weight, 0.01)
elif market_deviation > 0.03: initial_bil_weight = min(initial_bil_weight, 0.05)
elif market_deviation > 0.01: initial_bil_weight = min(initial_bil_weight, 0.15)
elif market_deviation > -0.02: initial_bil_weight = min(initial_bil_weight, 0.35)
else: initial_bil_weight = min(initial_bil_weight, 0.50)
# --- End BIL Allocation ---
# --- Defensive ETF Allocation ---
# defensive_etf_potential is how much of BIL could be shifted to defensive
defensive_etf_potential_from_bil = initial_bil_weight * 0.40
if self.diagnostic_mode: self._runDefensiveETFDiagnostics(market_deviation, market_trend)
# _evaluateDefensiveETFs now returns allocations that sum up to its 'max_allocation' if conditions are met
# 'max_allocation' for _evaluateDefensiveETFs should be the portion of the portfolio we are willing to give to defensives
# This portion comes from the initial_bil_weight.
all_defensive_allocations = self._evaluateDefensiveETFs(market_deviation, market_trend, defensive_etf_potential_from_bil)
total_defensive_allocation = sum(all_defensive_allocations.values())
# Final BIL weight is initial target MINUS what was actually used by defensive ETFs
final_bil_weight = max(0, initial_bil_weight - total_defensive_allocation)
# --- End Defensive ETF Allocation ---
equity_weight = max(0, 1.0 - final_bil_weight - total_defensive_allocation)
# Sanity check
intended_total = final_bil_weight + total_defensive_allocation + equity_weight
if abs(intended_total - 1.0) > 0.001 and intended_total > 0:
self.Debug(f"MonthlyRebalance: Pre-opt total {intended_total:.4f} != 1.0. Normalizing equity_weight.")
equity_weight = max(0, 1.0 - final_bil_weight - total_defensive_allocation)
self.Debug(f"MonthlyRebalance Pre-Opt: Equity {equity_weight*100:.2f}%, BIL {final_bil_weight*100:.2f}%, Defensive {total_defensive_allocation*100:.2f}%")
# --- Begin Modified Equity Selection Block in MonthlyRebalance ---
# Calculate enhanced momentum scores (this also sets self.raw_momentum_scores_20d)
momentum_scores = self._calculateSimpleMomentum()
strong_equity_candidates = []
sma_50_lookback = 50
# Load historical data for 50-day SMA calculation for the selected stocks
candidate_symbols_for_sma = [s for s, _ in self.selected_by_market_cap if isinstance(s, Symbol)]
history_for_sma = {}
if candidate_symbols_for_sma:
hist_data_sma = self.History(candidate_symbols_for_sma, sma_50_lookback + 1, Resolution.Daily)
if not hist_data_sma.empty:
for sym_obj in candidate_symbols_for_sma:
if sym_obj in hist_data_sma.index.get_level_values(0):
history_for_sma[sym_obj] = hist_data_sma.loc[sym_obj]['close']
for symbol_obj, mcap in self.selected_by_market_cap:
if not isinstance(symbol_obj, Symbol):
continue
current_price = self.Securities[symbol_obj].Price
if current_price == 0:
continue
# Condition 1: Use a higher momentum score threshold for entry (from 1.10 to 1.15)
score = momentum_scores.get(symbol_obj, 0.0)
if score < 1.15:
continue
# Condition 2: Price must be above the 50-day SMA
sma50 = 0
prices_for_sma = history_for_sma.get(symbol_obj)
if prices_for_sma is not None and len(prices_for_sma) >= sma_50_lookback:
sma50 = prices_for_sma.rolling(window=sma_50_lookback).mean().iloc[-1]
if sma50 == 0 or current_price <= sma50:
continue
# Condition 3: Require a higher 20-day raw momentum (from 1% to 1.5% gain)
raw_mom_20d = self.raw_momentum_scores_20d.get(symbol_obj, -1.0)
if raw_mom_20d <= 0.015:
continue
strong_equity_candidates.append(symbol_obj)
self.Debug(f"Strong Equity Candidate: {symbol_obj.Value} (Score: {score:.2f}, Price: {current_price:.2f} > SMA50: {sma50:.2f}, 20d Mom: {raw_mom_20d*100:.2f}%)")
optimization_symbols = []
if len(strong_equity_candidates) >= 3:
self.Debug(f"Using {len(strong_equity_candidates)} strong equity candidates for optimization.")
optimization_symbols = strong_equity_candidates
else:
self.Debug(f"Too few strong candidates ({len(strong_equity_candidates)}). Falling back to broader momentum selection.")
fallback_candidates = []
for symbol_obj, mcap in self.selected_by_market_cap:
if not isinstance(symbol_obj, Symbol):
continue
score = momentum_scores.get(symbol_obj, 0.0)
current_price = self.Securities[symbol_obj].Price
if current_price == 0:
continue
sma50 = 0
prices_for_sma = history_for_sma.get(symbol_obj)
if prices_for_sma is not None and len(prices_for_sma) >= sma_50_lookback:
sma50 = prices_for_sma.rolling(window=sma_50_lookback).mean().iloc[-1]
passes_fallback = False
if score >= 1.08: # slightly lower threshold for fallback
passes_fallback = True
elif self.raw_momentum_scores.get(symbol_obj, -1) > 0.015 and (sma50 == 0 or current_price > sma50):
passes_fallback = True
if passes_fallback:
fallback_candidates.append(symbol_obj)
if len(fallback_candidates) >= 5:
optimization_symbols = fallback_candidates[:15]
self.Debug(f"Using {len(optimization_symbols)} fallback equity candidates.")
elif len(self.selected_by_market_cap) > 0:
self.Debug(f"Fallback also too few. Using top 10 by market cap.")
optimization_symbols = [s for s, _ in self.selected_by_market_cap[:10]]
else:
self.Debug("No equities available even from market cap list.")
optimization_symbols = []
# --- End Modified Equity Selection Block ---
equity_weights_optimized = {}
if optimization_symbols and equity_weight > 0.001:
self.Debug(f"Optimizing for {len(optimization_symbols)} symbols, equity target {equity_weight*100:.2f}%")
try:
lookback_days_opt = 63
# Ensure optimization_symbols are valid Symbol objects
valid_opt_symbols = [s for s in optimization_symbols if isinstance(s, Symbol) and self.Securities.ContainsKey(s) and self.Securities[s].Price > 0]
if not valid_opt_symbols:
self.Debug("No valid symbols for optimization after final checks.")
else:
history_opt = self.History(valid_opt_symbols, lookback_days_opt, Resolution.Daily)
if history_opt.empty or len(history_opt.index.levels[0]) < len(valid_opt_symbols) / 2: # Allow if some symbols have history
self.Debug("Insufficient history for many optimization symbols, falling to equal weight.")
num_stocks = len(valid_opt_symbols)
equity_weights_optimized = {s: equity_weight / num_stocks for s in valid_opt_symbols} if num_stocks > 0 else {}
else:
returns_df = history_opt['close'].unstack(level=0).pct_change().dropna()
# Filter returns_df for columns (symbols) that have enough data points for cov matrix
min_obs_for_cov = max(2, len(returns_df.columns) // 2) # Heuristic
returns_df = returns_df.dropna(axis=1, thresh=min_obs_for_cov)
if returns_df.empty or len(returns_df.columns) < 2 or len(returns_df) < 2:
self.Debug(f"Returns DataFrame insufficient after processing. Cols: {len(returns_df.columns)}, Rows: {len(returns_df)}. Falling to equal weight.")
num_stocks = len(valid_opt_symbols) # Fallback to original valid_opt_symbols
equity_weights_optimized = {s: equity_weight / num_stocks for s in valid_opt_symbols} if num_stocks > 0 else {}
else:
symbols_in_returns = list(returns_df.columns) # These are Symbol objects
num_assets = len(symbols_in_returns)
mu = returns_df.mean() * 252
S = returns_df.cov() * 252
constraints = ({'type': 'eq', 'fun': lambda w: np.sum(w) - 1}) # Sum of weights for optimizer = 1
bounds = tuple((0.0, 1.0) for _ in range(num_assets)) # No shorting, max 100%
initial_weights = np.array(num_assets * [1. / num_assets])
opt_result = sco.minimize(self.negative_sharpe_ratio, initial_weights,
args=(mu, S, self.annual_risk_free_rate),
method='SLSQP', bounds=bounds, constraints=constraints)
if opt_result.success:
optimized_weights_raw = opt_result.x
temp_weights_sum_one = {}
for i, symbol_in_opt in enumerate(symbols_in_returns):
temp_weights_sum_one[symbol_in_opt] = max(0, optimized_weights_raw[i])
# Normalize these weights to sum to 1 again (due to max(0, ..))
current_sum_raw = sum(temp_weights_sum_one.values())
if current_sum_raw > 1e-6:
for s_key in temp_weights_sum_one:
# Scale by equity_weight to get final portfolio percentage
equity_weights_optimized[s_key] = (temp_weights_sum_one[s_key] / current_sum_raw) * equity_weight
else: # Sum of weights is zero, fallback
self.Debug("Sum of raw optimized weights (after max(0,..)) is zero. Equal weight fallback.")
num_stocks_eq = len(symbols_in_returns)
equity_weights_optimized = {s: equity_weight / num_stocks_eq for s in symbols_in_returns} if num_stocks_eq > 0 else {}
self.Debug(f"Optimization successful for {len(equity_weights_optimized)} equities.")
else:
self.Debug(f"Optimization failed: {opt_result.message}. Equal weight for {len(symbols_in_returns)} symbols.")
num_stocks_eq = len(symbols_in_returns)
equity_weights_optimized = {s: equity_weight / num_stocks_eq for s in symbols_in_returns} if num_stocks_eq > 0 else {}
except Exception as e:
self.Error(f"Error during equity optimization: {e}. Equal weight fallback.")
num_stocks = len(optimization_symbols) # Fallback to initial list for optimization
equity_weights_optimized = {s: equity_weight / num_stocks for s in optimization_symbols} if num_stocks > 0 else {}
else:
self.Debug(f"Skipping equity optimization. Symbols: {len(optimization_symbols)}, Equity Weight: {equity_weight*100:.2f}%")
# --- End Portfolio Optimization ---
# --- Final Allocation Scaling & Target Setting (largely unchanged) ---
final_targets_unscaled = {}
if final_bil_weight > 0 : final_targets_unscaled[self.bil] = final_bil_weight
for symbol, weight in all_defensive_allocations.items():
if weight > 0: final_targets_unscaled[symbol] = final_targets_unscaled.get(symbol, 0) + weight
for symbol, weight in equity_weights_optimized.items():
if weight > 0: final_targets_unscaled[symbol] = final_targets_unscaled.get(symbol, 0) + weight
total_unscaled_allocation = sum(w for w in final_targets_unscaled.values() if w > 0)
scale_factor = 1.0
if total_unscaled_allocation > self.MAX_ALLOCATION_TARGET:
if total_unscaled_allocation > 1e-9:
scale_factor = self.MAX_ALLOCATION_TARGET / total_unscaled_allocation
final_targets_scaled = {}
scaled_equity_total = 0
scaled_defensive_total = 0
scaled_bil_total = 0
for symbol, weight in final_targets_unscaled.items():
scaled_weight = max(0, weight * scale_factor)
if scaled_weight > 1e-5: # Keep if meaningful
final_targets_scaled[symbol] = scaled_weight
if symbol == self.bil: scaled_bil_total = scaled_weight
elif symbol in self.all_defensive: scaled_defensive_total += scaled_weight
else: scaled_equity_total += scaled_weight
final_total_scaled_sum = scaled_bil_total + scaled_defensive_total + scaled_equity_total
self.Debug(f"MonthlyRebalance Final Scaled: Equity {scaled_equity_total*100:.2f}%, BIL {scaled_bil_total*100:.2f}%, Defensive {scaled_defensive_total*100:.2f}%, Total: {final_total_scaled_sum*100:.2f}%")
targets_to_set_later = []
final_symbols_targeted_this_rebalance = set()
active_defensive_targets_this_rebalance = {}
for symbol, weight in final_targets_scaled.items():
if weight > 1e-5:
targets_to_set_later.append(PortfolioTarget(symbol, weight))
final_symbols_targeted_this_rebalance.add(symbol)
if symbol in self.all_defensive:
active_defensive_targets_this_rebalance[symbol] = weight
# Liquidate positions not in final_symbols_targeted_this_rebalance
symbols_to_liquidate_now = []
for symbol_in_portfolio in list(self.Portfolio.Keys):
holding = self.Portfolio[symbol_in_portfolio]
if holding.Invested and symbol_in_portfolio != self.spy: # Exclude SPY if it's just for data
if symbol_in_portfolio not in final_symbols_targeted_this_rebalance:
symbols_to_liquidate_now.append(symbol_in_portfolio)
if symbols_to_liquidate_now:
unique_to_liquidate = list(set(symbols_to_liquidate_now)) # Ensure unique
self.Debug(f"MonthlyRebalance: Liquidating {len(unique_to_liquidate)} symbols NOW: {[s.Value for s in unique_to_liquidate]}")
for sym_to_liq in unique_to_liquidate:
self.Liquidate(sym_to_liq)
if sym_to_liq in self.entry_prices: del self.entry_prices[sym_to_liq]
# self.defensive_positions is updated in OnData after SetHoldings confirms defensive trades
if targets_to_set_later:
self.Debug(f"MonthlyRebalance: Storing {len(targets_to_set_later)} targets for OnData. Total target weight: {final_total_scaled_sum:.4f}")
self.pending_monthly_targets = targets_to_set_later
else:
self.Debug("MonthlyRebalance: No targets to set after scaling.")
self.pending_monthly_targets = None
self.previous_bil_allocation = scaled_bil_total # Update based on intended scaled BIL
# self.last_rebalance_date will be updated in OnData after SetHoldings
# ...existing code...
def _calculateMarketTrend(self):
    """Return SPY's fractional price change across the trend lookback window.

    Reads ``self.spy_prices`` (a mapping from sortable date keys to prices)
    and ``self.trend_lookback``. The newest stored price is compared with the
    price found ``trend_lookback`` entries from the end of the date-sorted
    keys.

    Returns:
        ``recent / older - 1.0``, or ``0`` when the lookback is not positive,
        when no more than ``trend_lookback`` prices are stored, or when the
        reference price is missing or not positive.
    """
    lookback = self.trend_lookback
    # A non-positive lookback would turn dates[-lookback] into an index from
    # the front of the list (dates[-0] is the oldest entry), so report no trend.
    if lookback <= 0 or len(self.spy_prices) <= lookback:
        return 0  # Not enough data
    dates = sorted(self.spy_prices.keys())
    recent_price = self.spy_prices[dates[-1]]
    older_price = self.spy_prices[dates[-lookback]]
    # Guard the division: a zero or missing reference price carries no signal.
    if older_price is None or older_price <= 0:
        return 0
    return (recent_price / older_price) - 1.0
# ...existing code...
def _calculateSimpleMomentum(self):
    """Score every market-cap-selected symbol by blended multi-window momentum.

    Requests 60 daily bars for each ``Symbol`` found in
    ``self.selected_by_market_cap`` and derives, per symbol:

    * a weighted raw momentum (40% / 40% / 20% of the change of the latest
      close versus the closes at positions -10 / -30 / -60), stored in
      ``self.raw_momentum_scores``;
    * the change versus the close at position -20, stored in
      ``self.raw_momentum_scores_20d``;
    * a score derived from the weighted momentum: 0.8 below -5%, 0.85 plus
      the momentum when slightly negative, otherwise a convex boost capped
      at 2.0.

    Symbols with too few bars, no history, or a momentum that is not a finite
    number receive the defaults (score 0.9, raw momentum -0.1).

    Returns:
        dict mapping each symbol to its momentum score (empty when there are
        no symbols to score).
    """
    default_score = 0.9
    default_raw = -0.1

    momentum_scores = {}
    # Ensure self.selected_by_market_cap provides Symbol objects
    symbols = [sym_obj for sym_obj, _ in self.selected_by_market_cap if isinstance(sym_obj, Symbol)]
    if not symbols:
        self.raw_momentum_scores = {}
        self.raw_momentum_scores_20d = {}
        return momentum_scores

    # 60 bars cover the longest window; the 20-bar figure reuses the same data.
    history_lookback = 60
    history = self.History(symbols, history_lookback, Resolution.Daily)
    if history.empty:
        self.raw_momentum_scores = {s: default_raw for s in symbols}
        self.raw_momentum_scores_20d = {s: default_raw for s in symbols}
        return {s: default_score for s in symbols}

    def change_since(prices, bars_back):
        # Change of the latest close versus the close at -bars_back; a
        # non-positive (or NaN) reference price yields 0, as before.
        reference = prices.iloc[-bars_back]
        return prices.iloc[-1] / reference - 1 if reference > 0 else 0

    # Build the symbol level of the index once instead of once per symbol.
    symbols_with_history = history.index.get_level_values(0)

    temp_raw_momentum = {}      # Weighted 10/30/60
    temp_raw_momentum_20d = {}  # Raw 20
    for symbol_obj in symbols:
        score = default_score
        weighted_momentum = default_raw
        mom_20d = default_raw

        if symbol_obj in symbols_with_history:
            prices = history.loc[symbol_obj]['close']
            bar_count = len(prices)

            if bar_count >= 60:
                candidate = (
                    (change_since(prices, 10) * 0.4)
                    + (change_since(prices, 30) * 0.4)
                    + (change_since(prices, 60) * 0.2)
                )
                # A NaN close would otherwise fall through every comparison
                # and min(2.0, nan) returns 2.0, i.e. the *maximum* score for
                # bad data. Keep the defaults in that case.
                if np.isfinite(candidate):
                    weighted_momentum = candidate
                    if weighted_momentum < -0.05:
                        score = 0.8
                    elif weighted_momentum < 0:
                        score = 0.85 + (weighted_momentum * 1)
                    else:
                        score = 1.0 + (weighted_momentum * 1.5) + (weighted_momentum**2 * 2.0)
                        score = min(2.0, score)

            if bar_count >= 20:
                candidate_20d = change_since(prices, 20)
                if np.isfinite(candidate_20d):
                    mom_20d = candidate_20d

        momentum_scores[symbol_obj] = score
        temp_raw_momentum[symbol_obj] = weighted_momentum
        temp_raw_momentum_20d[symbol_obj] = mom_20d

    self.raw_momentum_scores = temp_raw_momentum
    self.raw_momentum_scores_20d = temp_raw_momentum_20d
    return momentum_scores
# ...existing code...
def _evaluateInverseETFs(self, market_deviation, market_trend, max_allocation):
    """Pick the strongest inverse ETF(s) and size them within ``max_allocation``.

    Args:
        market_deviation: Market deviation signal supplied by the caller; more
            negative values unlock a larger share of ``max_allocation``.
        market_trend: Recent market trend; used only for the strong-bull exit.
        max_allocation: Portfolio fraction available to inverse ETFs.

    Returns:
        dict mapping every symbol in ``self.inverse_etfs`` to its target
        fraction (0 for symbols that were not selected).
    """
    allocations = {symbol: 0 for symbol in self.inverse_etfs}

    # Only skip in very strong bull markets
    if market_deviation > 0.04 and market_trend > 0.02:
        return allocations

    history = self.History(self.inverse_etfs, 45, Resolution.Daily)
    if history.empty:
        return allocations

    def change_since(prices, bars_back):
        # Change of the latest close versus the close at -bars_back; a
        # non-positive reference price yields 0 instead of inf/NaN.
        reference = prices.iloc[-bars_back]
        return prices.iloc[-1] / reference - 1 if reference > 0 else 0

    # Build the symbol level of the index once instead of once per ETF.
    symbols_with_history = history.index.get_level_values(0)

    momentum_scores = {}
    volatility_scores = {}
    for symbol in self.inverse_etfs:
        if symbol not in symbols_with_history:
            continue
        prices = history.loc[symbol]['close']
        if len(prices) < 30:
            continue

        # Multiple timeframe momentum - more emphasis on recent performance.
        mom_7d = change_since(prices, 7)
        mom_15d = change_since(prices, 15)
        # Measured against the close at -30, like the sibling defensive
        # methods; the first bar of the 45-bar window is not a 30-bar lookback.
        mom_30d = change_since(prices, 30)
        momentum = (mom_7d * 0.5) + (mom_15d * 0.3) + (mom_30d * 0.2)

        # Volatility of the 20 most recent daily returns (lower is better);
        # pairs with a non-positive previous close are skipped.
        recent = prices.iloc[-21:]
        returns = [
            curr / prev - 1
            for prev, curr in zip(recent.iloc[:-1], recent.iloc[1:])
            if prev > 0
        ]
        volatility = np.std(returns) if returns else 0

        # Acceleration: latest 5-bar change minus the 5-bar change before it.
        recent_5d_change = change_since(prices, 5)
        prev_reference = prices.iloc[-10]
        prev_5d_change = prices.iloc[-6] / prev_reference - 1 if prev_reference > 0 else 0
        acceleration = recent_5d_change - prev_5d_change

        momentum_scores[symbol] = momentum + (acceleration * 0.5)
        volatility_scores[symbol] = volatility

    # Accept anything down to slightly negative momentum.
    positive_momentum_etfs = {s: score for s, score in momentum_scores.items() if score > -0.005}
    if not positive_momentum_etfs:
        self.Debug("No inverse ETFs showing acceptable momentum - keeping as cash")
        return allocations

    # Rank by momentum penalised by volatility, keeping the raw score for sizing.
    best_candidates = []
    for symbol, score in positive_momentum_etfs.items():
        volatility = volatility_scores.get(symbol, 1.0)
        adjusted_score = score - (volatility * 0.5)
        best_candidates.append((symbol, score, adjusted_score))
    best_candidates.sort(key=lambda x: x[2], reverse=True)

    # Share of max_allocation to deploy; every branch yields at least 0.2, so
    # no "too small to bother" check is needed afterwards.
    if market_deviation < -0.05:
        allocation_pct = 1.0
    elif market_deviation < -0.03:
        allocation_pct = 0.8
    elif market_deviation < -0.01:
        allocation_pct = 0.6
    elif market_deviation < 0.01:
        allocation_pct = 0.4
    else:
        allocation_pct = 0.2

    # Use two ETFs in stronger downtrends, otherwise only the best one.
    num_etfs = 2 if market_deviation < -0.04 and len(best_candidates) > 1 else 1

    allocation_budget = max_allocation * allocation_pct
    for symbol, raw_score, _ in best_candidates[:num_etfs]:
        # Scale with momentum strength, floored at 30% of the per-ETF budget.
        etf_weight = min(1.0, max(0.3, raw_score * 3)) if raw_score > 0 else 0.3
        etf_allocation = allocation_budget * etf_weight / num_etfs
        # Only allocate if it's a meaningful amount (at least 1%).
        if etf_allocation >= 0.01:
            allocations[symbol] = etf_allocation
            self.Debug(f"Selected inverse ETF {symbol} with momentum {raw_score:.2%}, allocating {etf_allocation*100:.2f}%")
    return allocations
def _runDefensiveETFDiagnostics(self, market_deviation, market_trend):
    """Write a performance snapshot of SPY and each defensive ETF to the debug log.

    Requests 90 daily bars for ``self.all_defensive`` plus SPY, logs the
    supplied market deviation/trend next to SPY's change versus its close at
    position -30, then logs one line per defensive ETF that has at least 30
    closes. The method only calls ``self.History`` and ``self.Debug`` and
    returns nothing.
    """
    bars = self.History(self.all_defensive + [self.spy], 90, Resolution.Daily)
    if bars.empty:
        return

    def change_between(closes, newer, older):
        # Fractional change from closes.iloc[older] to closes.iloc[newer].
        return closes.iloc[newer] / closes.iloc[older] - 1

    tickers_in_history = bars.index.get_level_values(0)

    # SPY's 30-bar change is the only benchmark figure that reaches the log.
    spy_30d = None
    if self.spy in tickers_in_history:
        spy_closes = bars.loc[self.spy]['close']
        if len(spy_closes) >= 30:
            spy_30d = change_between(spy_closes, -1, -30)

    # NOTE(review): logged unconditionally; the 0 fallback covers missing SPY data.
    spy_30d_logged = spy_30d if spy_30d is not None else 0
    self.Debug(
        f"DIAGNOSTIC - Market: Deviation {market_deviation*100:.2f}%, "
        f"Trend {market_trend*100:.2f}%, SPY 30d: {spy_30d_logged*100:.2f}%"
    )

    for etf in self.all_defensive:
        if etf not in tickers_in_history:
            continue
        closes = bars.loc[etf]['close']
        if len(closes) < 30:
            continue

        # Multiple timeframe performance (30+ closes are guaranteed here).
        perf_7d = change_between(closes, -1, -7)
        perf_15d = change_between(closes, -1, -15)
        perf_30d = change_between(closes, -1, -30)

        # Latest 5-bar change minus the 5-bar change that preceded it.
        accel = change_between(closes, -1, -5) - change_between(closes, -6, -10)

        # Performance relative to SPY; 0 when SPY's figure is unavailable.
        rel_30d = perf_30d - spy_30d if spy_30d is not None else 0

        self.Debug(
            f" {etf}: 7d: {perf_7d*100:.2f}%, 15d: {perf_15d*100:.2f}%, "
            f"30d: {perf_30d*100:.2f}%, Accel: {accel*100:.2f}%, "
            f"Rel30d: {rel_30d*100:.2f}%"
        )
# ...existing code...
def _evaluateDefensiveETFs(self, market_deviation, market_trend, max_allocation):
    """Score all defensive ETFs and split a regime-dependent budget among the best.

    Args:
        market_deviation: Market deviation signal supplied by the caller.
        market_trend: Recent market trend supplied by the caller.
        max_allocation: Upper bound on the combined defensive allocation.

    Returns:
        dict mapping every symbol in ``self.all_defensive`` to a target
        fraction; symbols that are not selected stay at 0.
    """
    allocations = dict.fromkeys(self.all_defensive, 0)

    # Very bullish market: nothing defensive is wanted at all.
    if market_deviation > 0.04 and market_trend > 0.02:
        self.Debug("_evaluateDefensiveETFs: Market very bullish, returning zero allocations.")
        return allocations

    history = self.History(self.all_defensive + [self.spy], 60, Resolution.Daily)
    if history.empty:
        return allocations

    def trailing_return(series, bars):
        # Change of the latest close versus the close at -bars; 0 when the
        # series is too short or the reference close is not positive.
        if len(series) < bars:
            return 0
        base = series.iloc[-bars]
        if base > 0:
            return series.iloc[-1] / base - 1
        return 0

    def window_returns(series):
        # Trailing returns for the four windows used in scoring and logging.
        return {
            label: trailing_return(series, bars)
            for label, bars in (("5d", 5), ("10d", 10), ("20d", 20), ("30d", 30))
        }

    def group_label(sym):
        # Human-readable ETF family used in log lines.
        if sym in self.inverse_etfs:
            return "Inverse"
        if sym in self.alternative_defensive:
            return "Alternative"
        return "Sector"

    self.Debug(f"DEFENSIVE ETF PERFORMANCE DETAILS (Max Alloc: {max_allocation:.2%}):")

    known_symbols = history.index.get_level_values(0)

    # Benchmark returns; stays empty when SPY has fewer than 30 closes.
    spy_perf = {}
    if self.spy in known_symbols:
        spy_closes = history.loc[self.spy]['close']
        if len(spy_closes) >= 30:
            spy_perf = window_returns(spy_closes)
    # NOTE(review): logged unconditionally; the .get defaults cover an empty spy_perf.
    self.Debug(
        f" SPY: 5d: {spy_perf.get('5d',0)*100:.1f}%, 10d: {spy_perf.get('10d',0)*100:.1f}%, "
        f"20d: {spy_perf.get('20d',0)*100:.1f}%, 30d: {spy_perf.get('30d',0)*100:.1f}%"
    )

    etf_scores = {}
    etf_groups = (
        ("Inverse", self.inverse_etfs),
        ("Alternative", self.alternative_defensive),
        ("Sector", self.sector_defensive),
    )
    for group_name, members in etf_groups:
        self.Debug(f" {group_name} ETFs:")
        for symbol in members:
            if symbol not in known_symbols:
                continue
            closes = history.loc[symbol]['close']
            if len(closes) < 30:
                continue

            perf = window_returns(closes)
            rel_perf = {window: perf[window] - spy_val for window, spy_val in spy_perf.items()}
            p5, p10, p30 = perf["5d"], perf["10d"], perf["30d"]
            rel5 = rel_perf.get("5d", 0)
            rel10 = rel_perf.get("10d", 0)
            rel30 = rel_perf.get("30d", 0)

            self.Debug(
                f" {symbol.Value}: 5d: {p5*100:.1f}% (rel: {rel5*100:+.1f}%), "
                f"10d: {p10*100:.1f}% (rel: {rel10*100:+.1f}%), "
                f"30d: {p30*100:.1f}% (rel: {rel30*100:+.1f}%)"
            )

            if symbol in self.inverse_etfs:
                if market_deviation < -0.02:
                    # Bearish: balanced windows plus a bonus for beating SPY.
                    score = (p5 * 0.4) + (p10 * 0.4) + (p30 * 0.2)
                    score += (rel5 + rel10) * 0.15
                else:
                    # Neutral or bullish: lean on the shortest window.
                    score = (p5 * 0.6) + (p10 * 0.3) + (p30 * 0.1)
                    if market_deviation > 0:
                        score -= 0.01  # Penalize in up-markets
            elif symbol in self.alternative_defensive:
                score = (p5 * 0.3) + (p10 * 0.4) + (p30 * 0.3)
                if market_deviation < -0.03:
                    score += rel10 * 0.2
                elif market_deviation > 0.01:
                    score -= abs(market_deviation * 0.1)  # Penalize if market is up
            else:
                # Sector defensive: blend absolute and SPY-relative performance.
                abs_score = (p5 * 0.3) + (p10 * 0.3) + (p30 * 0.4)
                rel_score = (rel5 * 0.3) + (rel10 * 0.3) + (rel30 * 0.4)
                if market_deviation < -0.02:
                    score = (abs_score * 0.4) + (rel_score * 0.6)
                else:
                    score = (abs_score * 0.6) + (rel_score * 0.4)
                    if market_deviation > 0.01:
                        score -= abs(market_deviation*0.05)  # Penalize if market is up
            etf_scores[symbol] = score

    # Fraction of max_allocation permitted by the current market regime.
    if market_deviation < -0.05 or market_trend < -0.04:      # Very bearish
        regime_factor = 0.95
    elif market_deviation < -0.03 or market_trend < -0.02:    # Bearish
        regime_factor = 0.80
    elif market_deviation < -0.01 or market_trend < -0.01:    # Slightly bearish
        regime_factor = 0.50
    elif market_deviation < 0.01 and market_trend < 0.005:    # Neutral
        regime_factor = 0.20
    elif market_deviation < 0.02 and market_trend < 0.01:     # Slightly bullish
        regime_factor = 0.10
    else:                                                     # Clearly bullish
        regime_factor = 0.00
    self.Debug(f"Defensive Allocation Factor based on market regime: {regime_factor:.2f}")

    if regime_factor <= 0:
        self.Debug("Market conditions suggest no defensive allocation. Returning zero allocations.")
        return allocations

    # The score floor is loosened a little in clearly negative markets.
    min_score = -0.01 if market_deviation < -0.03 else -0.005
    candidates = {sym: sc for sym, sc in etf_scores.items() if sc > min_score}
    if not candidates:
        self.Debug("No defensive ETFs met score threshold - keeping as cash")
        return allocations

    ranked = sorted(candidates.items(), key=lambda pair: pair[1], reverse=True)
    self.Debug("Top Defensive Candidates (before allocation factor):")
    for symbol, score in ranked[:3]:
        self.Debug(f" {symbol.Value} ({group_label(symbol)}): Score {score*100:.2f}%")

    # Shrink the budget when even the best candidate is weak:
    # score 0.01 -> 1.0, score 0 -> 0.5, score -0.01 -> 0.
    best_score = ranked[0][1]
    score_strength_factor = min(1.0, max(0.0, (best_score + 0.01) * 50))
    target_total = max_allocation * regime_factor * score_strength_factor
    if target_total <= 0.001:
        self.Debug("Final defensive allocation target is negligible after score strength adjustment.")
        return allocations

    # One ETF by default; up to two when conditions are more severe.
    num_etfs = 1
    if len(ranked) > 1:
        if market_deviation < -0.03 or market_trend < -0.02:
            num_etfs = min(2, len(ranked))
        elif market_deviation < -0.01 or market_trend < -0.01:
            num_etfs = min(max(1, len(ranked) // 2), 2)

    # Only strictly positive scores take part in the proportional split.
    funded = [(sym, sc) for sym, sc in ranked[:num_etfs] if sc > 0]
    if not funded:
        self.Debug("No candidates with positive scores for allocation.")
        return allocations

    total_positive_score = sum(sc for _, sc in funded)
    if not total_positive_score > 0:
        self.Debug("Total positive score is zero, no defensive allocation.")
        return allocations

    for symbol, score in funded:
        etf_allocation = target_total * (score / total_positive_score)
        if etf_allocation >= 0.01:  # Minimum 1% allocation for an individual ETF
            allocations[symbol] = etf_allocation
            etf_type = group_label(symbol)
            self.Debug(f"Selected {etf_type} ETF {symbol.Value} with score {score*100:.2f}%, allocating {etf_allocation*100:.2f}%")
    return allocations
# ...existing code...
def portfolio_performance(self, weights, mean_returns, cov_matrix):
    """Return the expected return and standard deviation of a weighted portfolio.

    Args:
        weights: Sequence of asset weights, positionally aligned with
            ``mean_returns`` and with the rows/columns of ``cov_matrix``.
        mean_returns: Per-asset expected returns.
        cov_matrix: Covariance matrix of asset returns.

    Returns:
        Tuple ``(portfolio_return, portfolio_std_dev)``.
    """
    weights = np.asarray(weights, dtype=float)
    returns_p = np.sum(mean_returns * weights)
    variance = np.dot(weights, np.dot(cov_matrix, weights))
    # Round-off (or a covariance estimate that is not positive semi-definite)
    # can push the quadratic form slightly below zero; clamp it so the
    # optimizer's objective does not turn into NaN.
    std_dev = np.sqrt(max(variance, 0.0))
    return returns_p, std_dev
def negative_sharpe_ratio(self, weights, mean_returns, cov_matrix, risk_free_rate=0.0):
    """Return the negated Sharpe ratio of the weighted portfolio.

    The sign is flipped so that a minimizer maximizes the Sharpe ratio. Return
    and volatility come from ``self.portfolio_performance``; a 1e-9 term is
    added to the volatility so the division never hits an exact zero.
    """
    expected_return, volatility = self.portfolio_performance(weights, mean_returns, cov_matrix)
    excess_return = expected_return - risk_free_rate
    return -excess_return / (volatility + 1e-9)