| Overall Statistics |
|
Total Orders 225 Average Win 4.87% Average Loss -0.55% Compounding Annual Return 42.186% Drawdown 26.500% Expectancy 3.938 Start Equity 100000 End Equity 773841.59 Net Profit 673.842% Sharpe Ratio 1.339 Sortino Ratio 1.473 Probabilistic Sharpe Ratio 80.018% Loss Rate 50% Win Rate 50% Profit-Loss Ratio 8.88 Alpha 0 Beta 0 Annual Standard Deviation 0.206 Annual Variance 0.043 Information Ratio 1.466 Tracking Error 0.206 Treynor Ratio 0 Total Fees $539.08 Estimated Strategy Capacity $0 Lowest Capacity Asset LLY R735QTJ8XC9X Portfolio Turnover 1.21% |
from AlgorithmImports import *
import numpy as np
from datetime import timedelta
# --- Add SciPy Import ---
import scipy.optimize as sco
# --- End SciPy Import ---
# --- Corrected Optimization Import (Keep for potential future use/reference, though not calling Optimize) ---
from QuantConnect.Algorithm.Framework.Portfolio import MaximumSharpeRatioPortfolioOptimizer
# --- End Corrected Optimization Import ---
class FunnyAlgo(QCAlgorithm):
def Initialize(self):
    """Configure the backtest, data subscriptions, schedules and strategy state.

    Sets the backtest window and cash, subscribes SPY, BIL and the defensive /
    inverse ETF list at daily resolution, registers the coarse/fine universe
    selection, schedules the rebalance and weekly defensive callbacks, and
    creates every attribute the other methods read.
    """
    self.SetStartDate(2019, 1, 1)
    self.SetEndDate(2024, 10, 22)
    self.SetCash(100000)
    self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN)

    self.annual_risk_free_rate = 0.02  # Assumed 2% annual risk-free rate for the Sharpe objective
    self.atr_period = 14

    self.UniverseSettings.Resolution = Resolution.Daily
    self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol
    self.bil = self.AddEquity("BIL", Resolution.Daily).Symbol

    # Warm up long enough for both the 63-day optimisation lookback and the ATR period.
    self.SetWarmUp(max(63 + 2, self.atr_period + 2), Resolution.Daily)
    self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)

    # --- Core rebalance state ---
    self.selected_by_market_cap = []
    self.rebalance_flag = False
    self.spy_30day_window = RollingWindow[float](30)
    self.entry_prices = {}
    self.previous_bil_allocation = 0.0
    # Created here so MonthlyRebalance can read it even when the momentum
    # calculation exits early (no universe yet / empty history).
    self.raw_momentum_scores = {}

    # SetRebalanceFlag arms the rebalance; MonthlyRebalance consumes the flag.
    self.Schedule.On(self.DateRules.MonthStart(self.spy),
                     self.TimeRules.AfterMarketOpen(self.spy, 30),
                     self.SetRebalanceFlag)
    self.Schedule.On(self.DateRules.WeekStart(self.spy, DayOfWeek.Wednesday),
                     self.TimeRules.AfterMarketOpen(self.spy, 30),
                     self.MonthlyRebalance)

    # Seed the SPY rolling window from history so the SMA is usable immediately.
    history = self.History(self.spy, 30, Resolution.Daily)
    if not history.empty:
        for time, row in history.loc[self.spy].iterrows():
            self.spy_30day_window.Add(row["close"])

    # --- Market trend tracking ---
    self.trend_lookback = 10
    self.spy_prices = {}
    self.max_spy_history = 60  # Calendar days of SPY closes to keep

    # --- Stop-loss parameters ---
    self.stop_loss_base = 0.04
    self.dynamic_stop_weight = 0.5  # Share of the ATR signal blended into the stop threshold
    self.top_momentum_stocks = set()
    self.trailing_highs = {}

    # --- Inverse ETFs ---
    self.sh = self.AddEquity("SH", Resolution.Daily).Symbol    # Inverse S&P 500
    self.psq = self.AddEquity("PSQ", Resolution.Daily).Symbol  # Inverse Nasdaq-100
    self.dog = self.AddEquity("DOG", Resolution.Daily).Symbol  # Inverse Dow Jones
    self.rwm = self.AddEquity("RWM", Resolution.Daily).Symbol  # Inverse Russell 2000
    self.eum = self.AddEquity("EUM", Resolution.Daily).Symbol  # Inverse Emerging Markets
    self.myd = self.AddEquity("MYY", Resolution.Daily).Symbol  # Inverse Mid-Cap 400
    # --- Alternative defensive ETFs (not inverse) ---
    self.gld = self.AddEquity("GLD", Resolution.Daily).Symbol  # Gold
    self.ief = self.AddEquity("IEF", Resolution.Daily).Symbol  # 7-10 Year Treasury
    self.bnd = self.AddEquity("BND", Resolution.Daily).Symbol  # Total Bond Market
    # --- Defensive sector ETFs ---
    self.xlp = self.AddEquity("XLP", Resolution.Daily).Symbol  # Consumer Staples
    self.xlu = self.AddEquity("XLU", Resolution.Daily).Symbol  # Utilities
    self.xlv = self.AddEquity("XLV", Resolution.Daily).Symbol  # Healthcare
    self.vht = self.AddEquity("VHT", Resolution.Daily).Symbol  # Vanguard Healthcare
    self.vdc = self.AddEquity("VDC", Resolution.Daily).Symbol  # Vanguard Consumer Staples

    self.inverse_etfs = [self.sh, self.psq, self.dog, self.rwm, self.eum, self.myd]
    self.alternative_defensive = [self.gld, self.ief, self.bnd]
    self.sector_defensive = [self.xlp, self.xlu, self.xlv, self.vht, self.vdc]
    self.all_defensive = self.inverse_etfs + self.alternative_defensive + self.sector_defensive

    self.diagnostic_mode = True  # Enables detailed Debug output

    # --- Defensive position tracking ---
    self.defensive_positions = set()
    self.last_defensive_update = datetime(1900, 1, 1)
    self.Schedule.On(self.DateRules.WeekStart(self.spy, DayOfWeek.Monday),
                     self.TimeRules.AfterMarketOpen(self.spy, 60),
                     self.WeeklyDefensiveAdjustment)

    self.inverse_positions = set()
    self.inverse_lookback_short = 7   # Short momentum window (days)
    self.inverse_lookback_med = 15    # Medium momentum window (days)

    # ATR indicators for the defensive ETFs, BIL and SPY.
    self.atr = {}
    for symbol in self.all_defensive + [self.bil, self.spy]:
        self.atr[symbol] = self.ATR(symbol, self.atr_period, Resolution.Daily)
def CoarseSelectionFunction(self, coarse):
    """Return the symbols of coarse entries that pass the basic screen.

    An entry passes when it has fundamental data, its price is above 5 and
    its market equals ``Market.USA``. Input order is preserved.
    """
    eligible = []
    for security in coarse:
        if not security.HasFundamentalData:
            continue
        if not (security.Price > 5):
            continue
        if not (security.Market == Market.USA):
            continue
        eligible.append(security.Symbol)
    return eligible
def FineSelectionFunction(self, fine):
    """Pick the 30 largest qualifying companies by market capitalisation.

    An entry qualifies when its market cap exceeds 1e10 and its
    ``SecurityReference.SecurityType`` equals ``"ST00000001"``. The chosen
    ``(Symbol, MarketCap)`` pairs are stored on ``selected_by_market_cap``
    (largest first) and the symbols are returned in the same order.
    """
    candidates = []
    for stock in fine:
        if stock.MarketCap > 1e10 and stock.SecurityReference.SecurityType == "ST00000001":
            candidates.append(stock)
    candidates.sort(key=lambda stock: stock.MarketCap, reverse=True)
    largest = candidates[:30]
    self.selected_by_market_cap = [(stock.Symbol, stock.MarketCap) for stock in largest]
    return [symbol for symbol, _ in self.selected_by_market_cap]
def SetRebalanceFlag(self):
    """Arm ``rebalance_flag`` when the callback fires on a Wednesday.

    On any other weekday the flag is left exactly as it was.
    """
    wednesday = 2  # datetime.weekday(): Monday == 0
    if self.Time.weekday() != wednesday:
        return
    self.rebalance_flag = True
def OnData(self, data):
    """Track SPY closes and apply the per-position stop-loss on every bar.

    Does nothing when the slice has no SPY bar. Otherwise it appends the SPY
    close to the 30-day window and to ``spy_prices`` (pruning entries older
    than ``max_spy_history`` calendar days), then checks every invested
    holding other than BIL and the defensive ETFs against a stop threshold.
    If any stop fires, available cash is swept into whole shares of BIL.

    Args:
        data: The current data slice; only ``data.Bars`` is read.
    """
    if not data.Bars.ContainsKey(self.spy):
        return

    spy_close = data.Bars[self.spy].Close
    self.spy_30day_window.Add(spy_close)

    # Record today's close and drop entries that fell out of the retention window.
    today = self.Time.date()
    self.spy_prices[today] = spy_close
    expired = [day for day in self.spy_prices
               if (today - day).days > self.max_spy_history]
    for day in expired:
        self.spy_prices.pop(day)

    market_trend = self._calculateMarketTrend()
    stop_loss_triggered = False

    for kvp in self.Portfolio:
        symbol = kvp.Key
        holding = kvp.Value
        # BIL and the defensive ETFs are managed by the rebalance logic instead.
        if not holding.Invested or symbol == self.bil or symbol in self.all_defensive:
            continue

        current_price = self.Securities[symbol].Price
        # A non-positive price means no usable quote; treating it as a 100%
        # drop would liquidate on missing data and divide by zero below.
        if current_price <= 0:
            continue

        # Fall back to the average fill price when no entry price was recorded.
        entry_price = self.entry_prices.get(symbol, holding.AveragePrice)
        if entry_price == 0:
            continue
        price_drop = (entry_price - current_price) / entry_price

        # Base threshold, tightened in a falling market and loosened in a rising one.
        stop_threshold = 0.045
        if market_trend < -0.03:
            stop_threshold *= 0.9
        elif market_trend > 0.03:
            stop_threshold *= 1.1

        # Blend in ATR (as a fraction of price) when an indicator exists and is ready.
        if symbol in self.atr and self.atr[symbol].IsReady:
            atr_pct = self.atr[symbol].Current.Value / current_price
            effective_weight = self.dynamic_stop_weight
            # Cap the ATR weight when volatility is far above the base threshold.
            if atr_pct > stop_threshold * 1.2:
                effective_weight = min(self.dynamic_stop_weight, 0.3)
            stop_threshold = ((1 - effective_weight) * stop_threshold +
                              effective_weight * atr_pct)

        if price_drop >= stop_threshold:
            self.Liquidate(symbol)
            stop_loss_triggered = True
            self.Debug(f"Stop-loss triggered for {symbol} at {current_price}, drop: {price_drop*100:.1f}%, threshold: {stop_threshold*100:.1f}%")
            if symbol in self.entry_prices:
                del self.entry_prices[symbol]

    if not stop_loss_triggered:
        return

    # Park freed cash in BIL.
    available_cash = self.Portfolio.Cash
    if available_cash <= 0:
        return
    bil_price = self.Securities[self.bil].Price
    if bil_price > 0:
        # Equities trade in whole shares; round down so the order never exceeds cash.
        bil_quantity = int(available_cash / bil_price)
        if bil_quantity > 0:
            self.MarketOrder(self.bil, bil_quantity)
            self.Debug(f"Invested ${available_cash:0.2f} in BIL after stop-loss")
    else:
        self.Debug(f"Could not invest in BIL after stop-loss, BIL price is zero.")
def WeeklyDefensiveAdjustment(self):
    """Weekly check and adjustment for defensive ETF positions.

    Skips when a rebalance ran within the last 3 days, when defensive
    positions were updated within the last 5 days, when SPY is both more
    than 4% above its 30-day average and trending up more than 3%, or when
    the portfolio is already at least 98% invested. Otherwise it asks
    ``_evaluateDefensiveETFs`` for target weights funded out of the BIL
    allocation, liquidates tracked defensive positions that no longer have a
    target, and sets the remaining targets.
    """
    # Skip if the monthly rebalance ran recently.
    days_since_rebalance = (self.Time.date() - self.last_rebalance_date.date()).days if hasattr(self, 'last_rebalance_date') else 999
    if days_since_rebalance < 3:
        return
    # Skip if defensive positions were updated recently.
    days_since_update = (self.Time.date() - self.last_defensive_update.date()).days
    if days_since_update < 5:
        return

    total_value = self.Portfolio.TotalPortfolioValue
    if total_value <= 0:
        return  # No meaningful weights can be computed.

    # Current market conditions.
    spy_price = self.Securities[self.spy].Price
    sma_30 = sum(self.spy_30day_window) / self.spy_30day_window.Count if self.spy_30day_window.Count > 0 else spy_price
    if sma_30 <= 0:
        return  # No SPY price yet.
    market_deviation = (spy_price / sma_30) - 1.0
    market_trend = self._calculateMarketTrend()

    # Skip in strong bull markets.
    if market_deviation > 0.04 and market_trend > 0.03:
        return

    # Fraction of the portfolio currently invested across all positions.
    total_invested = sum(holding.HoldingsValue for holding in self.Portfolio.Values
                         if holding.Invested) / total_value
    if total_invested >= 0.98:
        self.Debug(f"Already fully invested ({total_invested:.2f}), skipping defensive adjustments")
        return
    available_allocation = max(0, 0.99 - total_invested)  # Keep a small buffer

    # Defensive ETFs are funded from at most 25% of the BIL allocation,
    # and never more than the free room in the portfolio.
    current_bil_value = self.Portfolio[self.bil].HoldingsValue if self.Portfolio[self.bil].Invested else 0
    bil_allocation = current_bil_value / total_value
    max_defensive_pct = min(0.25, available_allocation / bil_allocation if bil_allocation > 0 else 0)
    potential_allocation = min(bil_allocation * max_defensive_pct, available_allocation)

    if self.diagnostic_mode and self.defensive_positions:
        self.Debug(f"WEEKLY CHECK - Current defensive positions:")
        for symbol in self.defensive_positions:
            if self.Portfolio.ContainsKey(symbol) and self.Portfolio[symbol].Invested:
                position = self.Portfolio[symbol]
                entry = self.entry_prices.get(symbol, position.AveragePrice)
                current = self.Securities[symbol].Price
                pnl_pct = (current / entry) - 1 if entry > 0 else 0
                self.Debug(f" {symbol}: PnL {pnl_pct*100:.2f}%, Value ${position.HoldingsValue:.2f}")
    self.Debug(f"WEEKLY CHECK - Market: Dev {market_deviation*100:.2f}%, Trend {market_trend*100:.2f}%")
    self.Debug(f"BIL allocation: {bil_allocation*100:.2f}%, Potential defensive: {potential_allocation*100:.2f}%")

    new_allocations = self._evaluateDefensiveETFs(market_deviation, market_trend, potential_allocation)

    targets_to_set = {}
    positions_to_remove = set()

    # Existing positions: keep/adjust when the new target is above 1%, else remove.
    current_defensive_in_portfolio = {s for s in self.defensive_positions
                                      if self.Portfolio.ContainsKey(s) and self.Portfolio[s].Invested}
    for symbol in current_defensive_in_portfolio:
        target_pct = new_allocations.get(symbol, 0)
        if target_pct > 0.01:
            current_pct = self.Portfolio[symbol].HoldingsValue / total_value
            # Only trade when the weight would move by more than 0.5%.
            if abs(target_pct - current_pct) > 0.005:
                targets_to_set[symbol] = target_pct
            # Already handled; keep it out of the "new positions" pass below.
            if symbol in new_allocations:
                del new_allocations[symbol]
        else:
            positions_to_remove.add(symbol)

    # Remaining entries are new positions.
    for symbol, allocation in new_allocations.items():
        if allocation > 0.01:
            targets_to_set[symbol] = allocation

    # Check the targets fit into the room left after removals.
    total_new_allocation_target = sum(targets_to_set.values())
    current_kept_defensive_alloc = sum(self.Portfolio[s].HoldingsValue / total_value
                                       for s in current_defensive_in_portfolio
                                       if s not in positions_to_remove)
    estimated_portfolio_pct_after_removal = total_invested - sum(
        self.Portfolio[s].HoldingsValue / total_value
        for s in positions_to_remove if self.Portfolio.ContainsKey(s))
    required_allocation_increase = total_new_allocation_target - current_kept_defensive_alloc
    available_room_strict = max(0, 1.0 - estimated_portfolio_pct_after_removal)
    if required_allocation_increase > available_room_strict + 0.001:
        self.Debug(f"Warning: Required defensive increase ({required_allocation_increase*100:.2f}%) exceeds available room ({available_room_strict*100:.2f}%). Scaling targets.")
        # Fallback: scale down only the increase portion of each target.
        if required_allocation_increase > 0:
            scale_factor = available_room_strict / required_allocation_increase
            for symbol, target in list(targets_to_set.items()):
                current_pct = self.Portfolio[symbol].HoldingsValue / total_value if symbol in current_defensive_in_portfolio else 0
                increase_part = max(0, target - current_pct)
                targets_to_set[symbol] = current_pct + (increase_part * scale_factor)

    if not (targets_to_set or positions_to_remove):
        return

    self.Debug(f"WEEKLY ADJUSTMENT - Making defensive position changes")

    # Snapshot pre-trade weights: once SetHoldings has run, holdings already sit
    # at the target and "is this an add/increase?" can no longer be answered.
    pre_trade_pct = {}
    for symbol in targets_to_set:
        if self.Portfolio.ContainsKey(symbol) and self.Portfolio[symbol].Invested:
            pre_trade_pct[symbol] = self.Portfolio[symbol].HoldingsValue / total_value
        else:
            pre_trade_pct[symbol] = 0.0

    # --- Removals first ---
    liquidated_symbols = set()
    for symbol in positions_to_remove:
        if self.Portfolio.ContainsKey(symbol) and self.Portfolio[symbol].Invested:
            self.Liquidate(symbol)
            liquidated_symbols.add(symbol)
            self.Debug(f"Removed defensive position: {symbol}")
    self.defensive_positions -= liquidated_symbols
    for symbol in liquidated_symbols:
        if symbol in self.entry_prices:
            del self.entry_prices[symbol]

    # --- New targets ---
    target_list = []
    updated_symbols = set()
    for symbol, allocation in targets_to_set.items():
        if allocation > 0.001:
            target_list.append(PortfolioTarget(symbol, allocation))
            updated_symbols.add(symbol)
            self.Debug(f"Targeting defensive position: {symbol} to {allocation*100:.2f}%")
    if target_list:
        self.SetHoldings(target_list)
        self.defensive_positions.update(updated_symbols)
        for symbol in updated_symbols:
            # Record a fresh entry price only when the position was opened or increased.
            if targets_to_set[symbol] > pre_trade_pct[symbol]:
                self.entry_prices[symbol] = self.Securities[symbol].Price

    self.last_defensive_update = self.Time
def MonthlyRebalance(self):
    """Rebalance the portfolio across equities, BIL and defensive ETFs.

    Runs only when ``rebalance_flag`` is set (the flag is consumed). Steps:
    derive a BIL target from SPY's position relative to its 30-day average,
    carve defensive-ETF weights out of that target, select equities by
    momentum, weight them with a max-Sharpe optimisation (equal weight on
    failure), place all targets with one ``SetHoldings`` call and liquidate
    every other non-SPY holding.
    """
    if not self.rebalance_flag:
        return
    self.rebalance_flag = False

    # Equity entry prices are re-established by this rebalance; BIL and
    # defensive ETF entries are kept.
    for symbol in [s for s in self.entry_prices
                   if s != self.bil and s not in self.all_defensive]:
        del self.entry_prices[symbol]
    self.trailing_highs.clear()

    if self.spy_30day_window.Count < 30:
        self.Debug("Waiting for enough SPY history.")
        return

    spy_price = self.Securities[self.spy].Price
    sma_30 = sum(self.spy_30day_window) / 30
    market_deviation = (spy_price / sma_30) - 1.0
    market_trend = self._calculateMarketTrend()

    initial_bil_weight = self._rebalanceBilTarget(spy_price, sma_30, market_deviation, market_trend)

    # --- Defensive ETF allocation: at most 40% of the BIL target ---
    defensive_etf_potential = initial_bil_weight * 0.40
    if self.diagnostic_mode:
        self._runDefensiveETFDiagnostics(market_deviation, market_trend)
    all_defensive_allocations = self._evaluateDefensiveETFs(market_deviation, market_trend, defensive_etf_potential)
    total_defensive_allocation = sum(all_defensive_allocations.values())
    # BIL keeps whatever the defensive ETFs did not take; equities get the rest.
    final_bil_weight = max(0, initial_bil_weight - total_defensive_allocation)
    equity_weight = max(0, 1.0 - final_bil_weight - total_defensive_allocation)

    # --- Sanity check: weights should sum to 100% ---
    total_allocation = final_bil_weight + total_defensive_allocation + equity_weight
    if abs(total_allocation - 1.0) > 0.01:
        self.Debug(f"Warning: Pre-normalization total allocation is {total_allocation:.3f}. Re-normalizing.")
        if total_allocation <= 0:
            final_bil_weight = 0
            total_defensive_allocation = 0
            equity_weight = 1.0
            all_defensive_allocations = {s: 0 for s in self.all_defensive}
        else:
            scale_factor = 1.0 / total_allocation
            final_bil_weight *= scale_factor
            equity_weight *= scale_factor
            for symbol in all_defensive_allocations:
                all_defensive_allocations[symbol] *= scale_factor
            total_defensive_allocation = sum(all_defensive_allocations.values())
    self.Debug(f"Allocation breakdown: Equity {equity_weight*100:.1f}%, BIL {final_bil_weight*100:.1f}%, " +
               f"Defensive ETFs {total_defensive_allocation*100:.1f}%")

    # --- Equity selection and weighting ---
    momentum_scores = self._calculateSimpleMomentum()
    # The raw map may be missing if the momentum calculation exited early.
    raw_scores = getattr(self, 'raw_momentum_scores', None) or {}

    if raw_scores:
        sorted_by_raw_mom = sorted(raw_scores.items(), key=lambda item: item[1], reverse=True)
        num_top_stocks = 1
        if len(sorted_by_raw_mom) > 1 and sorted_by_raw_mom[1][1] > 0.05:
            num_top_stocks = 2
        top_momentum_stocks = [item[0] for item in sorted_by_raw_mom[:num_top_stocks] if item[1] > 0]
        self.Debug(f"Identified Top {len(top_momentum_stocks)} Momentum Stocks: {top_momentum_stocks}")
        self.top_momentum_stocks = set(top_momentum_stocks)
    else:
        self.top_momentum_stocks = set()

    filtered_symbols = self._rebalanceSelectSymbols(momentum_scores, raw_scores)
    weights = self._rebalanceOptimizeWeights(filtered_symbols, equity_weight)

    # --- Build the target list ---
    targets = []
    final_symbols_targeted = set()
    for symbol, weight in weights.items():
        if weight > 0.001:
            targets.append(PortfolioTarget(symbol, weight))
            final_symbols_targeted.add(symbol)
    if final_bil_weight > 0.001:
        targets.append(PortfolioTarget(self.bil, final_bil_weight))
        final_symbols_targeted.add(self.bil)
    active_defensive_targets = {}
    for symbol, weight in all_defensive_allocations.items():
        if weight > 0.001:
            targets.append(PortfolioTarget(symbol, weight))
            final_symbols_targeted.add(symbol)
            active_defensive_targets[symbol] = weight
            self.defensive_positions.add(symbol)
        elif symbol in self.defensive_positions:
            # No allocation any more: stop tracking; the sweep below liquidates it.
            self.defensive_positions.remove(symbol)
            if symbol in self.entry_prices:
                del self.entry_prices[symbol]

    # Decide which entry prices to refresh BEFORE trading: afterwards holdings
    # already sit at their targets and new/increased positions look unchanged.
    symbols_to_update_entry_price = set()
    for symbol in final_symbols_targeted:
        target_weight = 0
        if symbol in weights:
            target_weight = weights[symbol]
        elif symbol == self.bil:
            target_weight = final_bil_weight
        elif symbol in active_defensive_targets:
            target_weight = active_defensive_targets[symbol]
        held = self.Portfolio.ContainsKey(symbol) and self.Portfolio[symbol].Invested
        current_holding_pct = (self.Portfolio[symbol].HoldingsValue / self.Portfolio.TotalPortfolioValue
                               if held else 0)
        if not held or target_weight > current_holding_pct + 0.001:
            symbols_to_update_entry_price.add(symbol)

    # --- Single SetHoldings call ---
    if targets:
        self.Debug(f"Setting {len(targets)} targets.")
        self.SetHoldings(targets)
    else:
        # Nothing to buy; the untargeted sweep below closes every non-SPY holding.
        self.Debug("No targets to set, liquidating non-SPY portfolio.")

    # Record entry prices after the orders were placed.
    for symbol in symbols_to_update_entry_price:
        if self.Securities.ContainsKey(symbol) and self.Securities[symbol].Price > 0:
            self.entry_prices[symbol] = self.Securities[symbol].Price
        elif symbol in self.entry_prices:
            del self.entry_prices[symbol]

    # --- Liquidate untargeted positions (SPY is left alone) ---
    symbols_to_liquidate = [symbol for symbol in self.Portfolio.Keys
                            if self.Portfolio[symbol].Invested
                            and symbol != self.spy
                            and symbol not in final_symbols_targeted]
    if symbols_to_liquidate:
        self.Debug(f"Liquidating {len(symbols_to_liquidate)} untargeted symbols: {[s.Value for s in symbols_to_liquidate]}")
        for symbol in symbols_to_liquidate:
            self.Liquidate(symbol)
            if symbol in self.entry_prices:
                del self.entry_prices[symbol]
            if symbol in self.defensive_positions:
                self.defensive_positions.remove(symbol)

    self.last_rebalance_date = self.Time
    # Remember the BIL target; the next rebalance uses it as a decaying floor.
    self.previous_bil_allocation = final_bil_weight
    self.Debug(f"Target BIL allocation set to: {final_bil_weight*100:.2f}%.")


def _rebalanceBilTarget(self, spy_price, sma_30, market_deviation, market_trend):
    """Return the BIL weight before defensive ETFs are carved out of it."""
    initial_bil_weight = 0.0
    strong_bull_market = market_deviation > 0.03 and market_trend > 0.01

    # Base weight grows with SPY's shortfall below its 30-day average.
    if spy_price < sma_30:
        deviation_pct = (sma_30 - spy_price) / sma_30
        if deviation_pct > 0.05:
            initial_bil_weight = min(deviation_pct * 1.0, 0.50)
        elif deviation_pct > 0.01:
            initial_bil_weight = min(deviation_pct * 0.8, 0.40)

    if not strong_bull_market:
        # Floor: a fraction of the previous BIL target, decaying faster the
        # further SPY sits above its average.
        if market_deviation > 0.02:
            min_bil_allocation = self.previous_bil_allocation * 0.6
        elif market_deviation > 0.00:
            min_bil_allocation = self.previous_bil_allocation * 0.7
        else:
            min_bil_allocation = self.previous_bil_allocation * 0.8
        initial_bil_weight = max(initial_bil_weight, min_bil_allocation)
        self.Debug(f"Applied min_bil_allocation floor: {min_bil_allocation*100:.1f}% (Market Dev <= 3% or Trend <= 1%)")
    else:
        initial_bil_weight = 0
        self.Debug(f"Strong bull market (Dev > 3% and Trend > 1%), targeting minimal BIL.")

    # Overall caps by market deviation.
    if market_deviation > 0.05:
        cap = 0.01
    elif market_deviation > 0.03:
        cap = 0.05
    elif market_deviation > 0.01:
        cap = 0.15
    elif market_deviation > -0.02:
        cap = 0.35
    else:
        cap = 0.50
    return min(initial_bil_weight, cap)


def _rebalanceSelectSymbols(self, momentum_scores, raw_scores):
    """Return universe symbols that pass the momentum filter.

    A symbol passes with a score of at least 1.05, or failing that with raw
    weighted momentum above 1%. Both maps are keyed by Symbol. If fewer than
    five symbols pass, the ten largest by market cap are used instead.
    """
    filtered_symbols = []
    for symbol, _ in self.selected_by_market_cap:
        score = momentum_scores.get(symbol, 0.0)
        if score >= 1.05:
            filtered_symbols.append(symbol)
            continue
        raw = raw_scores.get(symbol)
        if raw is not None and raw > 0.01:
            filtered_symbols.append(symbol)
            self.Debug(f"Including {symbol} based on raw momentum {raw:.2f} despite score {score:.2f}")
    if len(filtered_symbols) < 5:
        self.Debug(f"Filtered too few stocks ({len(filtered_symbols)}), reverting to top 10 by cap.")
        filtered_symbols = [s for s, _ in self.selected_by_market_cap[:10]]
    return filtered_symbols


def _rebalanceEqualWeights(self, symbols, equity_weight):
    """Split ``equity_weight`` evenly across ``symbols`` (empty dict if none)."""
    count = len(symbols)
    return {symbol: equity_weight / count for symbol in symbols} if count > 0 else {}


def _rebalanceOptimizeWeights(self, filtered_symbols, equity_weight):
    """Return ``{symbol: weight}`` summing to ``equity_weight``.

    Maximises the Sharpe ratio (via ``self.negative_sharpe_ratio`` and SLSQP,
    long-only, fully invested) over 63 daily bars. Falls back to equal
    weights when history is insufficient, the optimiser fails, or any error
    occurs. Returns an empty dict when there is nothing to allocate.
    """
    if not (filtered_symbols and equity_weight > 0.01):
        self.Debug(f"Skipping optimization. Filtered Symbols: {len(filtered_symbols)}, Equity Weight: {equity_weight:.2f}")
        return {}

    self.Debug(f"Optimizing portfolio for {len(filtered_symbols)} symbols with target equity weight {equity_weight:.2f}")
    try:
        lookback_days = 63
        history = self.History(filtered_symbols, lookback_days, Resolution.Daily)
        if history.empty or len(history.index.levels[0]) < len(filtered_symbols):
            self.Debug("Insufficient history for optimization, falling back to equal weight.")
            return self._rebalanceEqualWeights(filtered_symbols, equity_weight)

        # Daily returns, one column per symbol.
        returns = history['close'].unstack(level=0).pct_change().dropna()
        if returns.empty or len(returns.columns) == 0 or len(returns) < 2:
            raise ValueError("Returns DataFrame is empty or too short after processing history.")
        symbols_in_returns = list(returns.columns)
        num_assets = len(symbols_in_returns)

        # Restrict to requested symbols if some dropped out during processing.
        filtered_symbols_in_returns = [s for s in filtered_symbols if s in symbols_in_returns]
        if len(filtered_symbols_in_returns) != len(filtered_symbols):
            self.Debug(f"Warning: Some symbols dropped during history fetch/processing for optimization. Optimizing for {len(filtered_symbols_in_returns)} symbols.")
            if not filtered_symbols_in_returns:
                raise ValueError("No symbols remaining after history processing for optimization.")
            returns = returns[filtered_symbols_in_returns]
            symbols_in_returns = filtered_symbols_in_returns
            num_assets = len(symbols_in_returns)

        # Annualised mean returns and covariance.
        mu = returns.mean() * 252
        S = returns.cov() * 252
        constraints = ({'type': 'eq', 'fun': lambda w: np.sum(w) - 1})
        bounds = tuple((0.0, 1.0) for _ in range(num_assets))
        initial_weights = np.array(num_assets * [1. / num_assets])
        opt_result = sco.minimize(self.negative_sharpe_ratio,
                                  initial_weights,
                                  args=(mu, S, self.annual_risk_free_rate),
                                  method='SLSQP',
                                  bounds=bounds,
                                  constraints=constraints)
        if not opt_result.success:
            self.Debug(f"SciPy optimization failed: {opt_result.message}. Falling back to equal weight for {len(filtered_symbols)} symbols.")
            return self._rebalanceEqualWeights(filtered_symbols, equity_weight)

        optimized_weights_raw = opt_result.x
        length_ok = (optimized_weights_raw is not None
                     and hasattr(optimized_weights_raw, '__len__')
                     and len(optimized_weights_raw) == len(symbols_in_returns))
        if not length_ok:
            raise ValueError(f"Optimization result length mismatch. Expected {len(symbols_in_returns)}, Got {len(optimized_weights_raw) if optimized_weights_raw is not None else 'None'}")

        # Clip negatives, normalise to 1, then scale to the equity sleeve.
        temp_weights = {symbol: max(0, optimized_weights_raw[i])
                        for i, symbol in enumerate(symbols_in_returns)}
        raw_sum = sum(temp_weights.values())
        if raw_sum > 1e-6:
            norm_factor = 1.0 / raw_sum
            weights = {symbol: w * norm_factor * equity_weight for symbol, w in temp_weights.items()}
        else:
            self.Debug("Warning: Sum of raw optimized weights is near zero. Falling back to equal weight for optimized symbols.")
            weights = self._rebalanceEqualWeights(symbols_in_returns, equity_weight)
        self.Debug(f"SciPy Optimization successful. Raw weights length: {len(optimized_weights_raw)}")

        # Guard against drift from the target sum.
        final_sum = sum(weights.values())
        if abs(final_sum - equity_weight) > 0.01:
            self.Debug(f"Warning: Final optimized weight sum {final_sum:.3f} differs from target {equity_weight:.3f}. Re-normalizing again.")
            if final_sum > 1e-6:
                renorm_factor = equity_weight / final_sum
                for symbol in weights:
                    weights[symbol] *= renorm_factor
            else:
                weights = self._rebalanceEqualWeights(symbols_in_returns, equity_weight)

        weights_str = str({s.Value: f'{w:.3f}' for s, w in weights.items()})
        if len(weights_str) > 500:
            weights_str = weights_str[:500] + "...}"
        self.Debug(f"Final optimized weights ({len(weights)} symbols): {weights_str}")
        return weights
    except Exception as e:
        # Deliberate best-effort: any failure degrades to equal weighting.
        self.Error(f"Error during portfolio optimization: {e}. Falling back to equal weight.")
        return self._rebalanceEqualWeights(filtered_symbols, equity_weight)
def _calculateMarketTrend(self):
    """Return SPY's fractional price change over the trend lookback window.

    Reads ``self.spy_prices`` (a mapping whose keys sort chronologically and
    whose values are prices) and ``self.trend_lookback``.

    Returns:
        ``latest / reference - 1.0``, where ``reference`` is the price found
        ``trend_lookback`` entries from the end of the key-sorted series.
        Returns ``0`` when fewer than ``trend_lookback + 1`` prices are stored
        or when the reference price is not positive.
    """
    lookback = self.trend_lookback
    if len(self.spy_prices) < lookback + 1:
        return 0  # Not enough data

    dates = sorted(self.spy_prices)
    recent_price = self.spy_prices[dates[-1]]
    # NOTE(review): dates[-lookback] spans lookback - 1 intervals even though
    # the guard above demands lookback + 1 points; left unchanged so existing
    # signals keep their values - confirm the intended window.
    older_price = self.spy_prices[dates[-lookback]]
    if older_price <= 0:
        # A zero reference price would raise ZeroDivisionError; treat the
        # trend as flat instead of aborting the caller.
        return 0
    return (recent_price / older_price) - 1.0
def _calculateSimpleMomentum(self):
    """Score every selected symbol by blended 10/30/60-bar momentum.

    Requests 60 daily bars for the symbols in ``self.selected_by_market_cap``
    (an iterable of ``(symbol, value)`` pairs) and blends three look-back
    returns with weights 0.4 / 0.4 / 0.2.

    Side effects:
        ``self.raw_momentum_scores`` is replaced on every call, including the
        early-exit paths, so stale raw scores from a previous call can never
        be paired with a fresh (possibly empty) score dictionary.

    Returns:
        dict mapping symbol to a score: ``0.8`` for blended momentum below
        -5%, ``0.85 + momentum`` for mildly negative momentum, and
        ``1 + 1.5*m + 2*m**2`` capped at ``2.0`` otherwise. Symbols with fewer
        than 60 bars get the default ``0.9`` (raw score ``-0.1``). Symbols
        absent from the history result are left unscored.
    """
    momentum_scores = {}
    raw_momentum = {}  # Raw weighted momentum, kept for later ranking
    # Publish the (initially empty) raw-score dict up front; it is filled in
    # place below, so every return path leaves a consistent attribute behind.
    self.raw_momentum_scores = raw_momentum

    symbols = [sym for sym, _ in self.selected_by_market_cap]
    if not symbols:
        return momentum_scores

    history = self.History(symbols, 60, Resolution.Daily)
    if history.empty:
        return momentum_scores

    # Build the symbol level once instead of once per symbol.
    symbols_with_history = history.index.get_level_values(0)

    for symbol in symbols:
        if symbol not in symbols_with_history:
            continue

        prices = history.loc[symbol]['close']
        if len(prices) < 60:
            momentum_scores[symbol] = 0.9  # Default score
            raw_momentum[symbol] = -0.1  # Assign low raw score
            continue

        latest = prices.iloc[-1]
        lookback_returns = []
        for lag in (10, 30, 60):
            base = prices.iloc[-lag]
            # A non-positive base price cannot produce a meaningful return.
            lookback_returns.append(latest / base - 1 if base > 0 else 0)
        mom_10d, mom_30d, mom_60d = lookback_returns

        weighted_momentum = (mom_10d * 0.4) + (mom_30d * 0.4) + (mom_60d * 0.2)
        raw_momentum[symbol] = weighted_momentum

        if weighted_momentum < -0.05:
            score = 0.8
        elif weighted_momentum < 0:
            score = 0.85 + weighted_momentum
        else:
            score = 1.0 + (weighted_momentum * 1.5) + (weighted_momentum**2 * 2.0)
            score = min(2.0, score)
        momentum_scores[symbol] = score

    return momentum_scores
def _evaluateInverseETFs(self, market_deviation, market_trend, max_allocation):
    """Score the inverse ETFs and allocate part of ``max_allocation`` to the best.

    Args:
        market_deviation: Market deviation signal. More negative values unlock
            a larger share of ``max_allocation`` and, below -4%, a second ETF.
        market_trend: Market trend signal; together with ``market_deviation``
            it is only used to skip allocation in strongly bullish conditions.
        max_allocation: Upper bound (as a portfolio fraction) for inverse ETFs.

    Returns:
        dict mapping every symbol in ``self.inverse_etfs`` to a target weight;
        symbols that were not selected keep ``0``.
    """
    allocations = {symbol: 0 for symbol in self.inverse_etfs}

    # Only skip in very strong bull markets.
    if market_deviation > 0.04 and market_trend > 0.02:
        return allocations

    history = self.History(self.inverse_etfs, 45, Resolution.Daily)
    if history.empty:
        return allocations

    symbols_with_history = history.index.get_level_values(0)

    # (symbol, momentum score, volatility-adjusted score)
    best_candidates = []
    for symbol in self.inverse_etfs:
        if symbol not in symbols_with_history:
            continue
        prices = history.loc[symbol]['close']
        if len(prices) < 30:
            continue

        latest = prices.iloc[-1]
        mom_7d = latest / prices.iloc[-7] - 1
        mom_15d = latest / prices.iloc[-15] - 1
        # Anchor on the bar 30 entries from the end: the request above is for
        # 45 bars, so the first row would be a ~45-day return, not 30-day.
        mom_30d = latest / prices.iloc[-30] - 1
        # Weight recent momentum much more heavily.
        momentum = (mom_7d * 0.5) + (mom_15d * 0.3) + (mom_30d * 0.2)

        # Volatility of the most recent 20 daily returns (lower is better).
        closes = np.asarray(prices.iloc[-21:], dtype=float)
        daily_returns = closes[1:] / closes[:-1] - 1
        volatility = np.std(daily_returns)

        # Short-term acceleration: latest 5-bar change minus the one before.
        recent_5d_change = latest / prices.iloc[-5] - 1
        prev_5d_change = prices.iloc[-6] / prices.iloc[-10] - 1
        acceleration = recent_5d_change - prev_5d_change

        score = momentum + (acceleration * 0.5)
        # Permissive filter: tolerate momentum down to -0.5%.
        if score > -0.005:
            best_candidates.append((symbol, score, score - (volatility * 0.5)))

    if not best_candidates:
        self.Debug("No inverse ETFs showing acceptable momentum - keeping as cash")
        return allocations

    # Higher momentum is good, lower volatility is good.
    best_candidates.sort(key=lambda x: x[2], reverse=True)

    # Share of max_allocation to deploy, by severity of the market deviation.
    if market_deviation < -0.05:
        allocation_pct = 1.0
    elif market_deviation < -0.03:
        allocation_pct = 0.8
    elif market_deviation < -0.01:
        allocation_pct = 0.6
    elif market_deviation < 0.01:
        allocation_pct = 0.4
    else:
        allocation_pct = 0.2

    # Use two ETFs in stronger downtrends, otherwise only the best one.
    num_etfs = 1
    if market_deviation < -0.04 and len(best_candidates) > 1:
        num_etfs = 2

    remaining_allocation = max_allocation * allocation_pct
    for symbol, raw_score, _ in best_candidates[:num_etfs]:
        # Scale by momentum strength, with a floor of 30% of the slot.
        etf_weight = min(1.0, max(0.3, raw_score * 3)) if raw_score > 0 else 0.3
        etf_allocation = remaining_allocation * etf_weight / num_etfs
        # Only allocate if it is a meaningful amount (at least 1%).
        if etf_allocation >= 0.01:
            allocations[symbol] = etf_allocation
            self.Debug(f"Selected inverse ETF {symbol} with momentum {raw_score:.2%}, allocating {etf_allocation*100:.2f}%")
    return allocations
def _runDefensiveETFDiagnostics(self, market_deviation, market_trend):
    """Log a performance snapshot for SPY and every defensive ETF.

    Pulls 90 daily bars for ``self.all_defensive`` plus ``self.spy`` and emits
    one ``Debug`` line describing market conditions, followed by one line per
    ETF that has at least 30 bars (7/15/30-bar returns, 5-bar acceleration and
    30-bar return relative to SPY). Nothing is returned or stored.

    Args:
        market_deviation: Market deviation signal, logged as a percentage.
        market_trend: Market trend signal, logged as a percentage.
    """
    bars = self.History(self.all_defensive + [self.spy], 90, Resolution.Daily)
    if bars.empty:
        return

    def change_over(series, lag):
        # Return from the bar `lag` entries before the end to the last bar.
        return series.iloc[-1] / series.iloc[-lag] - 1

    # SPY reference returns; stays empty when SPY has too little history.
    spy_perf = {}
    if self.spy in bars.index.get_level_values(0):
        spy_closes = bars.loc[self.spy]['close']
        if len(spy_closes) >= 30:
            spy_perf = {
                "7d": change_over(spy_closes, 7),
                "15d": change_over(spy_closes, 15),
                "30d": change_over(spy_closes, 30),
            }

    self.Debug(f"DIAGNOSTIC - Market: Deviation {market_deviation*100:.2f}%, " +
               f"Trend {market_trend*100:.2f}%, SPY 30d: {spy_perf.get('30d', 0)*100:.2f}%")

    for etf in self.all_defensive:
        if etf not in bars.index.get_level_values(0):
            continue
        closes = bars.loc[etf]['close']
        if len(closes) < 30:
            continue

        week = change_over(closes, 7)
        fortnight = change_over(closes, 15)
        month = change_over(closes, 30)

        # Acceleration: latest 5-bar change minus the 5-bar change before it.
        latest_leg = change_over(closes, 5)
        previous_leg = closes.iloc[-6] / closes.iloc[-10] - 1
        accel = latest_leg - previous_leg

        # Only the 30-bar relative figure is reported; 0 without SPY data.
        rel_month = month - spy_perf["30d"] if "30d" in spy_perf else 0

        self.Debug(f" {etf}: 7d: {week*100:.2f}%, 15d: {fortnight*100:.2f}%, " +
                   f"30d: {month*100:.2f}%, Accel: {accel*100:.2f}%, " +
                   f"Rel30d: {rel_month*100:.2f}%")
def _evaluateDefensiveETFs(self, market_deviation, market_trend, max_allocation):
    """Score all defensive ETFs and allocate to the strongest one or two.

    ETFs are scored from 5/10/30-bar returns, with a group-specific formula
    for inverse, alternative and sector ETFs, using 60 daily bars. Candidates
    above a momentum threshold are ranked; the top one (or two in stronger
    downtrends) share a slice of ``max_allocation``.

    Args:
        market_deviation: Market deviation signal; more negative means a
            stronger downturn and a larger slice of ``max_allocation``.
        market_trend: Market trend signal, used alongside the deviation.
        max_allocation: Upper bound (as a portfolio fraction) for all
            defensive ETFs combined.

    Returns:
        dict mapping every symbol in ``self.all_defensive`` to a target
        weight (``0`` when not selected). The weights never sum to more than
        ``max_allocation``.
    """
    allocations = {symbol: 0 for symbol in self.all_defensive}

    # Skip if market is very bullish.
    if market_deviation > 0.04 and market_trend > 0.02:
        return allocations

    history = self.History(self.all_defensive + [self.spy], 60, Resolution.Daily)
    if history.empty:
        return allocations

    def horizon_returns(closes):
        # Return from `days` entries before the end to the last bar.
        latest = closes.iloc[-1]
        return {f"{days}d": latest / closes.iloc[-days] - 1 for days in (5, 10, 20, 30)}

    def etf_type_of(symbol):
        if symbol in self.inverse_etfs:
            return "Inverse"
        if symbol in self.alternative_defensive:
            return "Alternative"
        return "Sector"

    self.Debug("DEFENSIVE ETF PERFORMANCE DETAILS:")
    symbols_with_history = history.index.get_level_values(0)

    # SPY performance for relative comparisons (empty if unavailable).
    spy_perf = {}
    if self.spy in symbols_with_history:
        spy_prices = history.loc[self.spy]['close']
        if len(spy_prices) >= 30:
            spy_perf = horizon_returns(spy_prices)
            self.Debug(f" SPY: 5d: {spy_perf['5d']*100:.1f}%, 10d: {spy_perf['10d']*100:.1f}%, " +
                       f"20d: {spy_perf['20d']*100:.1f}%, 30d: {spy_perf['30d']*100:.1f}%")

    etf_scores = {}
    for group_name, group in (("Inverse", self.inverse_etfs),
                              ("Alternative", self.alternative_defensive),
                              ("Sector", self.sector_defensive)):
        self.Debug(f" {group_name} ETFs:")
        for symbol in group:
            if symbol not in symbols_with_history:
                continue
            prices = history.loc[symbol]['close']
            if len(prices) < 30:
                continue

            perf = horizon_returns(prices)
            # Relative outperformance vs SPY. Without a usable SPY series the
            # relative terms are neutral (0.0) instead of raising KeyError.
            rel_perf = {
                period: (value - spy_perf[period]) if period in spy_perf else 0.0
                for period, value in perf.items()
            }

            self.Debug(f" {symbol}: 5d: {perf['5d']*100:.1f}% (rel: {rel_perf['5d']*100:+.1f}%), " +
                       f"10d: {perf['10d']*100:.1f}% (rel: {rel_perf['10d']*100:+.1f}%), " +
                       f"30d: {perf['30d']*100:.1f}% (rel: {rel_perf['30d']*100:+.1f}%)")

            if symbol in self.inverse_etfs:
                if market_deviation < -0.02:
                    # In downtrends, rising inverse ETFs are good.
                    score = (perf["5d"] * 0.4) + (perf["10d"] * 0.4) + (perf["30d"] * 0.2)
                    # Bonus for relative outperformance.
                    score += (rel_perf["5d"] + rel_perf["10d"]) * 0.15
                else:
                    # Less emphasis on long-term performance in neutral markets.
                    score = (perf["5d"] * 0.6) + (perf["10d"] * 0.3) + (perf["30d"] * 0.1)
            elif symbol in self.alternative_defensive:
                # Bonds, gold: focus on absolute return.
                score = (perf["5d"] * 0.3) + (perf["10d"] * 0.4) + (perf["30d"] * 0.3)
                if market_deviation < -0.03:
                    score += rel_perf["10d"] * 0.2  # Bonus for outperformance
            else:
                # Sector ETFs: blend absolute and relative performance.
                abs_score = (perf["5d"] * 0.3) + (perf["10d"] * 0.3) + (perf["30d"] * 0.4)
                rel_score = (rel_perf["5d"] * 0.3) + (rel_perf["10d"] * 0.3) + (rel_perf["30d"] * 0.4)
                if market_deviation < -0.02:
                    # In downtrends, relative outperformance matters more.
                    score = (abs_score * 0.4) + (rel_score * 0.6)
                else:
                    score = (abs_score * 0.6) + (rel_score * 0.4)
            etf_scores[symbol] = score

    # More permissive threshold in stronger downturns.
    threshold = -0.01 if market_deviation < -0.03 else -0.007
    candidates = {s: score for s, score in etf_scores.items() if score > threshold}
    if not candidates:
        self.Debug("No defensive ETFs showed sufficient momentum - keeping as cash")
        return allocations

    sorted_candidates = sorted(candidates.items(), key=lambda x: x[1], reverse=True)
    self.Debug("Top 5 defensive candidates:")
    for symbol, score in sorted_candidates[:5]:
        group = etf_type_of(symbol)
        self.Debug(f" {symbol} ({group}): Score {score*100:.2f}%")

    # Share of max_allocation to deploy, by market conditions and trend.
    if market_deviation < -0.05 or market_trend < -0.04:
        allocation_pct = 0.95
    elif market_deviation < -0.03 or market_trend < -0.02:
        allocation_pct = 0.8
    elif market_deviation < -0.01 or market_trend < -0.01:
        allocation_pct = 0.6
    else:
        allocation_pct = 0.4

    # Scale by the strength of the best candidate (between 50% and 100%).
    best_score = sorted_candidates[0][1]
    allocation_pct *= min(1.0, max(0.5, (best_score + 0.02) * 4))

    # Use two ETFs in stronger downtrends.
    num_etfs = 1
    if (market_deviation < -0.04 or market_trend < -0.03) and len(sorted_candidates) > 1:
        num_etfs = 2

    remaining_allocation = max_allocation * allocation_pct
    selected = sorted_candidates[:num_etfs]
    # The threshold admits slightly negative scores. Clip them to zero before
    # normalising so each weight stays within [0, 1]; dividing by a sum that
    # includes a negative score could push one weight above 1 and the total
    # allocation above max_allocation.
    weight_basis = [max(score, 0.0) for _, score in selected]
    total_score = sum(weight_basis)
    if total_score > 0:
        for (symbol, score), basis in zip(selected, weight_basis):
            etf_allocation = remaining_allocation * (basis / total_score)
            # Only allocate if meaningful (2% minimum allocation).
            if etf_allocation >= 0.02:
                allocations[symbol] = etf_allocation
                etf_type = etf_type_of(symbol)
                self.Debug(f"Selected {etf_type} ETF {symbol} with score {score*100:.2f}%, allocating {etf_allocation*100:.2f}%")
    return allocations
def portfolio_performance(self, weights, mean_returns, cov_matrix):
    """Return the portfolio's expected return and standard deviation.

    Args:
        weights: Asset weights (any sequence; copied into a NumPy array).
        mean_returns: Per-asset mean returns, aligned with ``weights``.
        cov_matrix: Covariance matrix of the asset returns.

    Returns:
        Tuple ``(expected_return, std_dev)`` where ``expected_return`` is the
        weighted sum of ``mean_returns`` and ``std_dev`` is
        ``sqrt(w' * cov * w)``.
    """
    w = np.array(weights)
    expected_return = np.sum(mean_returns * w)
    # Quadratic form w' * cov * w, evaluated right-to-left.
    cov_times_w = np.dot(cov_matrix, w)
    variance = np.dot(w.T, cov_times_w)
    return expected_return, np.sqrt(variance)
def negative_sharpe_ratio(self, weights, mean_returns, cov_matrix, risk_free_rate=0.0):
    """Return the negated Sharpe ratio of the weighted portfolio.

    The sign is flipped so that a minimiser maximises the Sharpe ratio.

    Args:
        weights: Asset weights.
        mean_returns: Per-asset mean returns.
        cov_matrix: Covariance matrix of the asset returns.
        risk_free_rate: Return subtracted from the portfolio return.

    Returns:
        ``-(portfolio_return - risk_free_rate) / (portfolio_std + 1e-9)``.
    """
    expected_return, volatility = self.portfolio_performance(weights, mean_returns, cov_matrix)
    excess_return = expected_return - risk_free_rate
    # The small epsilon keeps the ratio finite when volatility is zero.
    return -excess_return / (volatility + 1e-9)
"""
Based on 'In & Out' strategy by Peter Guenther 10-04-2020
expanded/inspired by Tentor Testivis, Dan Whitnable (Quantopian), Vladimir, and Thomas Chang.
https://www.quantopian.com/posts/new-strategy-in-and-out
"""
from AlgorithmImports import *
# endregion
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data import *
from QuantConnect.Indicators import *
# Import packages
import numpy as np
import pandas as pd
import scipy as sc
class InOut(QCAlgorithm):
def Initialize(self):
    """Configure the backtest, subscribe to all ETFs and schedule rebalancing.

    Sets the backtest window and cash, adds minute-resolution equity
    subscriptions for the traded assets and the signal ETFs, initialises the
    in/out state and registers the two scheduled rebalance callbacks
    (120 minutes after the QQQ open: every day, and at week end).
    """
    self.SetStartDate(2024, 1, 1)
    self.SetEndDate(2026, 1, 1)
    self.SetCash(1000000)
    self.UniverseSettings.Resolution = Resolution.Daily

    # Base number of days to stay out of the market (3 trading weeks).
    self.INI_WAIT_DAYS = 15

    def subscribe(ticker):
        # Add a minute-resolution equity subscription and return its Symbol.
        return self.AddEquity(ticker, Resolution.Minute).Symbol

    # Traded assets: the market proxy and the two bond ETFs held when out.
    self.MRKT = subscribe('QQQ')
    self.TLT = subscribe('TLT')
    self.IEF = subscribe('IEF')

    # Signal ETFs.
    self.PRDC = subscribe('XLI')  # production (industrials)
    self.METL = subscribe('DBB')  # input prices (metals)
    self.NRES = subscribe('IGE')  # input prices (natural resources)
    self.DEBT = subscribe('SHY')  # cost of debt (bond yield)
    self.USDX = subscribe('UUP')  # safe haven (USD)
    self.GOLD = subscribe('GLD')  # gold
    self.SLVA = subscribe('SLV')  # vs silver
    self.UTIL = subscribe('XLU')  # utilities
    self.SHCU = subscribe('FXF')  # safe haven (CHF)
    self.RICU = subscribe('FXA')  # risk currency (AUD)
    self.INDU = self.PRDC  # vs industrials

    self.FORPAIRS = [self.GOLD, self.SLVA, self.UTIL, self.SHCU, self.RICU]
    self.SIGNALS = [self.PRDC, self.METL, self.NRES, self.DEBT, self.USDX]

    # Target weights while 'in' and while 'out' of the market.
    self.HLD_IN = {self.MRKT: 1.0}
    self.HLD_OUT = {self.TLT: .5, self.IEF: .5}

    # State: in/out indicator, day counters and the adaptive wait period.
    self.be_in = 1
    self.dcount = 0  # count of total days since start
    self.outday = 0  # dcount at the moment be_in was last set to 0
    self.WDadjvar = self.INI_WAIT_DAYS

    self.Schedule.On(
        self.DateRules.EveryDay(),
        self.TimeRules.AfterMarketOpen('QQQ', 120),
        self.rebalance_when_out_of_the_market,
    )
    self.Schedule.On(
        self.DateRules.WeekEnd(),
        self.TimeRules.AfterMarketOpen('QQQ', 120),
        self.rebalance_when_in_the_market,
    )
def rebalance_when_out_of_the_market(self):
    """Update the in/out signal and move to the 'out' holdings when needed.

    Compares each signal's latest return (price versus the 11-bar average
    centred roughly 60 bars earlier) with the 1st percentile of its own
    sample from 252 daily bars. Any extreme reading switches the strategy
    'out' and restarts the wait period; once the wait period has elapsed the
    strategy is flagged 'in' again. While 'out', the 'in' holdings are closed
    and the 'out' holdings are set to their target weights.

    Side effects:
        Updates ``self.pairlist``, ``self.WDadjvar``, ``self.be_in``,
        ``self.outday`` and ``self.dcount``; may call ``SetHoldings``; plots
        the in/out state, the number of out signals and the wait days. When
        the history request yields fewer than two complete rows the day is
        only counted and a debug message is logged.
    """
    history = self.History(
        self.SIGNALS + [self.MRKT] + self.FORPAIRS, 252, Resolution.Daily)
    hist = None
    if not history.empty and 'close' in history.columns:
        hist = history['close'].unstack(level=0).dropna()
    if hist is None or len(hist) < 2:
        # An empty result has no 'close' column and fewer than two rows
        # cannot feed the .iloc[-1] / .iloc[-2] look-ups below; skip the
        # signal update but keep counting days so the wait period advances.
        self.Debug("In/Out: insufficient history, signal update skipped")
        self.dcount += 1
        return

    # Average of the prices 55..65 bars back (11 terms, oldest first).
    lagged_sum = hist.shift(65)
    for lag in range(64, 54, -1):
        lagged_sum = lagged_sum + hist.shift(lag)
    hist_shift = lagged_sum / 11
    returns_sample = hist / hist_shift - 1

    # Reverse code USDX: sort largest changes to bottom.
    returns_sample[self.USDX] = returns_sample[self.USDX] * (-1)
    # For pairs, take the returns differential, reverse coded.
    returns_sample['G_S'] = -(returns_sample[self.GOLD] - returns_sample[self.SLVA])
    returns_sample['U_I'] = -(returns_sample[self.UTIL] - returns_sample[self.INDU])
    returns_sample['C_A'] = -(returns_sample[self.SHCU] - returns_sample[self.RICU])
    self.pairlist = ['G_S', 'U_I', 'C_A']

    # Extreme observations; statistical significance = 1%.
    pctl_b = np.nanpercentile(returns_sample, 1, axis=0)
    latest = returns_sample.iloc[-1]
    previous = returns_sample.iloc[-2]
    extreme_b = latest < pctl_b

    def wait_multiplier(haven, risk):
        # Stretch the wait when the safe haven is up while the risk asset
        # has just turned from positive to negative.
        if latest[haven] > 0 and latest[risk] < 0 and previous[risk] > 0:
            return self.INI_WAIT_DAYS
        return 1

    # Wait days: 50% decay of the previous value, floored at the base wait
    # scaled by the largest pair multiplier.
    self.WDadjvar = int(
        max(0.50 * self.WDadjvar,
            self.INI_WAIT_DAYS * max(1,
                                     wait_multiplier(self.GOLD, self.SLVA),
                                     wait_multiplier(self.UTIL, self.INDU),
                                     wait_multiplier(self.SHCU, self.RICU)))
    )
    adjwaitdays = min(60, self.WDadjvar)

    # Determine whether 'in' or 'out' of the market.
    signal_flags = extreme_b[self.SIGNALS + self.pairlist]
    if signal_flags.any():
        self.be_in = False
        self.outday = self.dcount
    if self.dcount >= self.outday + adjwaitdays:
        self.be_in = True
    self.dcount += 1

    # Swap to 'out' assets if applicable.
    if not self.be_in:
        for asset in self.HLD_IN:
            self.SetHoldings(asset, 0)
        for asset, weight in self.HLD_OUT.items():
            self.SetHoldings(asset, weight)

    self.Plot("In Out", "in_market", int(self.be_in))
    self.Plot("In Out", "num_out_signals", signal_flags.sum())
    self.Plot("Wait Days", "waitdays", adjwaitdays)
def rebalance_when_in_the_market(self):
    """Move the portfolio to the 'in' holdings while the signal says 'in'.

    Does nothing when ``self.be_in`` is falsy. Otherwise every asset in
    ``self.HLD_OUT`` is set to a zero weight first, then each asset in
    ``self.HLD_IN`` is set to its configured target weight.
    """
    if not self.be_in:
        return

    # Close the 'out' holdings before establishing the 'in' positions.
    for out_asset in self.HLD_OUT:
        self.SetHoldings(out_asset, 0)
    for in_asset, target_weight in self.HLD_IN.items():
        self.SetHoldings(in_asset, target_weight)