| Overall Statistics | |
| --- | --- |
| Total Orders | 341 |
| Average Win | 0.16% |
| Average Loss | -0.15% |
| Compounding Annual Return | 11.871% |
| Drawdown | 3.500% |
| Expectancy | 0.103 |
| Start Equity | 100000 |
| End Equity | 102487.86 |
| Net Profit | 2.488% |
| Sharpe Ratio | 0.356 |
| Sortino Ratio | 0.521 |
| Probabilistic Sharpe Ratio | 49.069% |
| Loss Rate | 45% |
| Win Rate | 55% |
| Profit-Loss Ratio | 1.02 |
| Alpha | 0.075 |
| Beta | 0.288 |
| Annual Standard Deviation | 0.086 |
| Annual Variance | 0.007 |
| Information Ratio | 1.525 |
| Tracking Error | 0.121 |
| Treynor Ratio | 0.106 |
| Total Fees | $444.89 |
| Estimated Strategy Capacity | $700000000.00 |
| Lowest Capacity Asset | EWW R735QTJ8XC9X |
| Portfolio Turnover | 55.40% |
# region imports
from AlgorithmImports import *
# endregion
import math
import random
from collections import deque
class RLPortfolioAllocation(QCAlgorithm):
def Initialize(self):
# Set start date and cash
self.SetStartDate(2025, 1, 1)
self.InitCash = 100000
self.set_cash(self.InitCash)
# Configure benchmark charting
self.MKT = self.add_equity("SPY", Resolution.DAILY).Symbol
self.spy = []
self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.BeforeMarketClose('SPY', 15),
self.atdayend)
        # Portfolio allocation percentage (80% of the portfolio will be invested)
        self.portfolio_allocation_percent = 0.80
# Set rebalance frequency (daily, weekly, or monthly)
self.rebalance_frequency = "daily" # Options: "daily", "weekly", "monthly"
# Define tickers to trade
self.tickers = ["TLT", "XBI", "XLU", "SLV", "GLD", "GDXJ", "GDX", "EWW", "XRT", "EWY", "XHB", "IWM", "EWJ", "XLP", "EEM", "IYR", "DIA", "XLV", "EFA", "XLB", "DJX", "XLY", "SMH", "OIH", "KRE", "FXI", "XOP", "EWZ", "XME", "XLE", "QQQ", "XLK", "XLF", "SPY"]
self.symbols = [self.AddEquity(ticker, Resolution.Daily).Symbol for ticker in self.tickers]
for symbol in self.symbols:
option = self.add_option(symbol, Resolution.DAILY)
option.set_filter(self._contract_selector)
# Cache for forward IV values
self.forward_iv_cache = {}
# RL parameters
self.learning_rate = 0.1 # Learning rate
self.gamma = 0.95 # Discount factor
self.epsilon = 1.0 # Initial exploration rate
self.epsilon_decay = 0.995 # Exploration decay
self.epsilon_min = 0.01 # Minimum exploration
self.memory = deque(maxlen=2000) # Replay memory
self.batch_size = 32 # Training batch size
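        # Note: epsilon decays multiplicatively once per rebalance, so with daily
        # rebalancing it takes roughly ln(0.01)/ln(0.995) ~= 919 rebalances to reach
        # epsilon_min; over a short backtest the agent is still mostly exploring.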
# Define state and action spaces
        self.state_size = len(self.tickers) * 3  # Price change, realized volatility, and forward IV for each ticker
self.action_size = 11 # Allocation percentages (0%, 10%, 20%, ..., 100%)
# Initialize Q-table for simplicity (a neural network would be better for production)
# Q[state][action] = expected future reward
self.q_table = {}
# Track performance metrics
self.portfolio_returns = []
self.previous_portfolio_value = self.Portfolio.TotalPortfolioValue
# Set next rebalance time
self.SetNextRebalance()
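    # Control flow: OnData triggers Rebalance on the schedule set by
    # SetNextRebalance; atdayend (scheduled 15 minutes before the close)
    # computes the daily reward and trains the Q-table via ReplayExperience.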
def _contract_selector(self, option_filter_universe: OptionFilterUniverse) -> OptionFilterUniverse:
return (
option_filter_universe
.include_weeklys()
            #.calls_only()  # Optionally restrict to calls only
            .expiration(10, 30)
            .IV(0, 100)  # Keep contracts with implied volatility between 0% and 100%
)
def SetNextRebalance(self):
current_time = self.Time
if self.rebalance_frequency == "daily":
self.next_rebalance = current_time + timedelta(days=1)
elif self.rebalance_frequency == "weekly":
            # Days until next Monday (weekday(): Monday=0 ... Sunday=6)
            days_to_add = 7 - current_time.weekday()
self.next_rebalance = current_time + timedelta(days=days_to_add)
elif self.rebalance_frequency == "monthly":
# Move to first day of next month
if current_time.month == 12:
next_month = datetime(current_time.year + 1, 1, 1)
else:
next_month = datetime(current_time.year, current_time.month + 1, 1)
self.next_rebalance = next_month
self.Log(f"Next rebalance scheduled for: {self.next_rebalance}")
def OnData(self, data):
# Update forward IV calculations when we receive option data
for symbol in self.symbols:
if self.Time.day == 1 or symbol not in self.forward_iv_cache: # Calculate on first day of month or if not calculated yet
forward_iv = self.get_forward_implied_volatility(symbol)
if forward_iv is not None:
self.forward_iv_cache[symbol] = forward_iv
self.Log(f"Updated forward IV for {symbol}: {forward_iv:.4f}")
if self.Time >= self.next_rebalance:
self.Rebalance(data)
self.SetNextRebalance()
def get_forward_implied_volatility(self, underlying_symbol):
# Fetch the option chain for the given symbol
option_chain = self.option_chain(underlying_symbol, flatten=True).data_frame
self.Debug(f"Fetched option chain for {underlying_symbol} at {self.Time}")
if option_chain is None or option_chain.empty:
self.Debug(f"Option chain empty: {option_chain}")
self.Log(f"Option chain empty: {option_chain}")
return None
# Calculate the next month
next_month = self.Time.month % 12 + 1 # Get the next month
next_year = self.Time.year if next_month > self.Time.month else self.Time.year + 1
# Filter for options expiring in the next month (long-term options)
next_month_options = option_chain[
(option_chain['expiry'].dt.month == next_month) &
(option_chain['expiry'].dt.year == next_year)
]
if next_month_options.empty:
self.error(f"No options available for the next month ({next_month})")
self.Log(f"No options available for the next month ({next_month})")
return None
# Filter for options expiring this month (short-term options)
current_month_options = option_chain[
(option_chain['expiry'].dt.month == self.Time.month) &
(option_chain['expiry'].dt.year == self.Time.year)
]
if current_month_options.empty:
self.error(f"No options available for the current month ({self.Time.month})")
self.Log(f"No options available for the current month ({self.Time.month})")
return None
        # Select ATM options by moneyness (strike / spot) within each expiry bucket;
        # filtering each subset directly avoids misaligned boolean masks from the full chain
        underlying_price = self.Securities[underlying_symbol].Price
        atm_threshold = 0.05  # Only options within 5% of ATM are selected.
        short_term_atm_options = current_month_options[
            abs(current_month_options['strike'] / underlying_price - 1) <= atm_threshold
        ]
        long_term_atm_options = next_month_options[
            abs(next_month_options['strike'] / underlying_price - 1) <= atm_threshold
        ]
if short_term_atm_options.empty:
self.error(f"No ATM options available for the current month ({self.Time.month})")
self.Log(f"No ATM options available for the current month ({self.Time.month})")
return None
if long_term_atm_options.empty:
self.error(f"No ATM options available for the next month ({next_month})")
self.Log(f"No ATM options available for the next month ({next_month})")
return None
# Extract the implied volatility for both sets of ATM options
short_term_iv = short_term_atm_options['impliedvolatility'].mean()
long_term_iv = long_term_atm_options['impliedvolatility'].mean()
        # Time to expiry of each leg, in years (T1 must be the near-dated expiry,
        # otherwise the forward IV degenerates to the long-term IV)
        T1 = (current_month_options['expiry'].iloc[0] - self.Time).days / 365.0
        T2 = (next_month_options['expiry'].iloc[0] - self.Time).days / 365.0
if T2 == T1:
self.Debug(f"Error: Time to expiry for both options is the same for {underlying_symbol}. Cannot calculate forward IV.")
self.Log(f"Error: Time to expiry for both options is the same for {underlying_symbol}. Cannot calculate forward IV.")
return None
if (T2 * long_term_iv**2 - T1 * short_term_iv**2) < 0:
self.Debug(f"Error: The calculated value for forward IV is negative for {underlying_symbol}, T1={T1} T2={T2}")
self.Log(f"Error: The calculated value for forward IV is negative for {underlying_symbol}, T1={T1} T2={T2}")
return None
# Calculate forward implied volatility using the formula
try:
forward_iv = math.sqrt((T2 * long_term_iv**2 - T1 * short_term_iv**2) / (T2 - T1))
return forward_iv
except ValueError as e:
self.Debug(f"Math error while calculating forward IV for {underlying_symbol}: {e}")
return None
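    # The calculation above follows the standard forward-variance identity,
    #   sigma_fwd = sqrt((T2 * sigma2^2 - T1 * sigma1^2) / (T2 - T1)).
    # Illustrative numbers (not from this backtest): T1 = 30/365, T2 = 60/365,
    # sigma1 = 0.20, sigma2 = 0.25 gives sigma_fwd = sqrt(0.0850) ~= 0.29.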
def GetState(self, data):
"""Convert market data to state representation with fallback for missing data"""
state = []
for symbol in self.symbols:
try:
# Get price history (last 10 days)
history = self.History(symbol, 10, Resolution.Daily)
# If we don't have enough history, use default values
if len(history) < 10:
self.Log(f"Insufficient history for {symbol.Value}, using default values")
                    # Defaults: no price change, low volatility, low forward IV
                    state.extend([0.0, 0.001, 0.001])
continue
# Calculate features
prices = history["close"].values
returns = np.diff(prices) / prices[:-1]
# 5-day price change
price_change = (prices[-1] / prices[-5]) - 1
# 10-day volatility
volatility = np.std(returns)
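                # Fall back to realized volatility when no forward IV has been cached yet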
forward_iv = self.forward_iv_cache.get(symbol, volatility)
state.extend([price_change, volatility, forward_iv])
except Exception as e:
# Log the error but continue with default values
self.Log(f"Error calculating state for {symbol.Value}: {str(e)}")
                # Defaults: no price change, low volatility, low forward IV
                state.extend([0.0, 0.001, 0.001])
# Convert to tuple for dictionary key
return tuple(state)
def ChooseAction(self, state):
"""Select allocation percentages using epsilon-greedy policy"""
if state not in self.q_table:
self.q_table[state] = np.zeros(self.action_size)
# Exploration: random action
if random.random() < self.epsilon:
# Generate a valid allocation (must sum to 100%)
allocations = []
for _ in range(len(self.symbols) - 1):
# Limit remaining allocation choices
remaining = 100 - sum(allocations)
alloc = random.randint(0, min(100, remaining))
allocations.append(alloc)
# Last allocation makes sum = 100%
allocations.append(100 - sum(allocations))
return allocations
# Exploitation: best action based on Q-values
else:
# For simplicity, we'll use the Q-values to determine allocations sequentially
# This is a simplified approach; a better method would consider joint allocations
allocations = []
remaining = 100
for i in range(len(self.symbols) - 1):
                # Mask actions whose allocation percentage would exceed the remaining budget
                masked_q = self.q_table[state].copy()
                for a in range(self.action_size):
                    if (a / (self.action_size - 1)) * 100 > remaining:
                        masked_q[a] = -np.inf
                # Choose the best feasible allocation percentage
                alloc = int((np.argmax(masked_q) / (self.action_size - 1)) * 100)
                alloc = min(alloc, remaining)  # Guard against rounding past the budget
allocations.append(alloc)
remaining -= alloc
# Last allocation is whatever remains to reach 100%
allocations.append(remaining)
return allocations
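    # Action encoding note: with action_size = 11, index a corresponds to an
    # allocation of (a / 10) * 100%, i.e. 0%, 10%, ..., 100%. Random exploration
    # can produce any integer percentage, which ReplayExperience quantizes back
    # to the nearest lower decile via int((alloc / 100) * (action_size - 1)).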
def Rebalance(self, data):
"""Execute the RL-based portfolio rebalancing"""
self.Log(f"Rebalancing portfolio on {self.Time}")
# Get current state
state = self.GetState(data)
if state is None:
self.Log("Insufficient data for state calculation")
return
# Get action (allocation percentages)
allocations = self.ChooseAction(state)
# Calculate portfolio value before rebalancing
portfolio_value_before = self.Portfolio.TotalPortfolioValue
        # Execute rebalance, deploying only portfolio_allocation_percent (80%) of equity
        for i, symbol in enumerate(self.symbols):
            # Scale the allocation by the portfolio_allocation_percent (80%)
scaled_percent = (allocations[i] / 100.0) * self.portfolio_allocation_percent
self.SetHoldings(symbol, scaled_percent)
self.Log(f"Allocated {scaled_percent:.2%} to {symbol.Value}")
# Log cash percentage
cash_percent = 1 - self.portfolio_allocation_percent
self.Log(f"Keeping {cash_percent:.2%} in cash")
# Store the state and action for learning after we observe the reward
self.current_state = state
self.current_action = allocations
# Decay epsilon for exploration
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
def atdayend(self):
"""Update the RL model based on portfolio performance"""
if hasattr(self, 'current_state') and hasattr(self, 'current_action'):
# Calculate daily return as reward
daily_return = self.Portfolio.TotalPortfolioValue / self.previous_portfolio_value - 1
# Store current value for next day's calculation
self.previous_portfolio_value = self.Portfolio.TotalPortfolioValue
self.portfolio_returns.append(daily_return)
# Store experience in replay memory
# For simplicity, we use a placeholder for next_state
# In a more sophisticated implementation, we would calculate the actual next state
next_state = self.current_state # Placeholder
reward = daily_return * 100 # Scale for better learning
self.memory.append((self.current_state, self.current_action, reward, next_state))
# Learn from batch of experiences
if len(self.memory) >= self.batch_size:
self.ReplayExperience()
self.record_vars()
def ReplayExperience(self):
"""Learn from past experiences using replay memory"""
# Sample batch of experiences
batch = random.sample(self.memory, min(len(self.memory), self.batch_size))
for state, action, reward, next_state in batch:
# If this state isn't in our Q-table, initialize it
if state not in self.q_table:
self.q_table[state] = np.zeros(self.action_size)
# If next_state isn't in our Q-table, initialize it
if next_state not in self.q_table:
self.q_table[next_state] = np.zeros(self.action_size)
# Update Q-value using the Bellman equation
# For simplicity, we update each symbol allocation independently
for i, alloc in enumerate(action):
# Convert allocation to action index
action_idx = int((alloc / 100) * (self.action_size - 1))
# Current Q-value
current_q = self.q_table[state][action_idx]
# Next maximum Q-value
max_next_q = np.max(self.q_table[next_state])
# Update Q-value
new_q = current_q + self.learning_rate * (reward + self.gamma * max_next_q - current_q)
self.q_table[state][action_idx] = new_q
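    # The update above is standard tabular Q-learning:
    #   Q(s, a) <- Q(s, a) + lr * (reward + gamma * max_a' Q(s', a') - Q(s, a))
    # e.g. with Q(s, a) = 0, reward = 0.5, max Q(s', .) = 0.2, lr = 0.1, gamma = 0.95:
    #   Q(s, a) <- 0 + 0.1 * (0.5 + 0.95 * 0.2 - 0) = 0.069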
def OnEndOfAlgorithm(self):
"""Log performance metrics at end of algorithm"""
self.Log(f"Final Portfolio Value: ${self.Portfolio.TotalPortfolioValue}")
self.Log(f"Average Daily Return: {np.mean(self.portfolio_returns):.4%}")
self.Log(f"Final Exploration Rate (Epsilon): {self.epsilon:.4f}")
self.Log(f"Q-Table Size: {len(self.q_table)}")
# Plot SPY benchmark
def record_vars(self):
        hist = self.History(self.MKT, 2, Resolution.Daily)['close'].unstack(level=0).dropna()
self.spy.append(hist[self.MKT].iloc[-1])
spy_perf = self.spy[-1] / self.spy[0] * self.InitCash
self.Plot('Strategy Equity', 'SPY', spy_perf)