| Overall Statistics |
|
Total Orders 2055 Average Win 0.41% Average Loss -0.42% Compounding Annual Return 5.664% Drawdown 26.500% Expectancy 0.240 Start Equity 100000 End Equity 277513.87 Net Profit 177.514% Sharpe Ratio 0.293 Sortino Ratio 0.299 Probabilistic Sharpe Ratio 1.026% Loss Rate 37% Win Rate 63% Profit-Loss Ratio 0.98 Alpha -0.003 Beta 0.376 Annual Standard Deviation 0.074 Annual Variance 0.005 Information Ratio -0.409 Tracking Error 0.11 Treynor Ratio 0.057 Total Fees $0.00 Estimated Strategy Capacity $3300000.00 Lowest Capacity Asset BND TRO5ZARLX6JP Portfolio Turnover 3.36% Drawdown Recovery 712 |
# The explicit AddReference is often not strictly needed but is kept for compatibility.
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Indicators")
AddReference("QuantConnect.Common")
from System import *
from QuantConnect import * # Imports core enums like OrderEventStatus and AccountType
from QuantConnect.Brokerages import BrokerageName
from QuantConnect import AccountType
from QuantConnect.Algorithm import *
from QuantConnect.Indicators import *
import pandas as pd
import numpy as np
from math import ceil, floor
import scipy.stats as stats
import sklearn.mixture as mix
from datetime import datetime, timedelta
import time
import decimal as d
import json
from sklearn import mixture as mix
# --- Integer value of QuantConnect's OrderStatus.Filled enum member (3), kept as a plain constant so fill checks do not depend on the enum import
ORDER_STATUS_FILLED = 3
# ------------------------------------------------------------------------------
# --- ALGO UTILS FUNCTIONS (INTEGRATED) ---
# ------------------------------------------------------------------------------
def how_many_days(current_date, most_recent_date):
    """Return how many daily bars to request from the history API.

    The result is the whole-day part of ``current_date - most_recent_date``
    plus one, so it can be zero or negative when ``most_recent_date`` is
    later than ``current_date``.
    """
    elapsed = current_date - most_recent_date
    whole_days = int(elapsed.days)
    return whole_days + 1
def zero_days_to_request(days_to_request):
    """Return True when fewer than one day of history is needed."""
    nothing_to_fetch = 1 > days_to_request
    return nothing_to_fetch
def make_update_df(old, new, lookback):
    """Merge stored rows with newly fetched rows and keep the most recent ones.

    Parameters
    ----------
    old : pandas.DataFrame
        Previously stored rows.
    new : pandas.DataFrame
        Newly fetched rows in the same layout as ``old``. A frame with a
        MultiIndex and a ``'close'`` column is first reduced to its close
        values with index level 0 moved to the columns.
    lookback : int
        Number of trailing rows to return.

    Returns
    -------
    pandas.DataFrame
        Combined rows, one per index label (the later-supplied row wins),
        sorted by index and truncated to the last ``lookback`` rows.
    """
    # The previous check looked at the *names* of the column levels, which
    # never contain 'close'; test the column labels themselves instead.
    if (
        isinstance(new, pd.DataFrame)
        and isinstance(new.index, pd.MultiIndex)
        and 'close' in new.columns
    ):
        new = new['close'].unstack(level=0)
    both = pd.concat([old, new])
    # De-duplicate on the index, not on the values: two different dates may
    # legitimately carry identical prices, while a repeated date must
    # collapse to a single (most recent) row.
    both = both[~both.index.duplicated(keep='last')]
    return both.sort_index().iloc[-lookback:]
def make_returns(df):
    """Return the natural log of each value divided by the previous row's value.

    The first row has no predecessor and therefore comes back as NaN.
    """
    previous_row = df.shift(1)
    price_ratio = df / previous_row
    return np.log(price_ratio)
def make_gmm(n_components, random_state):
    """Build an unfitted ``GaussianMixture`` with the given component count and seed."""
    settings = {"n_components": n_components, "random_state": random_state}
    return mix.GaussianMixture(**settings)
def make_final_pred_df(pred_rows, cols, thres, sym):
    """Turn prediction rows into a frame and flag buy signals.

    A ``symbol`` column is set to ``sym``. The class whose mean in the first
    row is larger (class 0 on ties) is treated as the high-return state, and
    ``buys`` is 1 wherever that class's probability exceeds ``thres``, else 0.
    Only the last row of the frame is returned.
    """
    frame = pd.DataFrame(pred_rows, columns=cols)
    frame['symbol'] = sym

    # Pick the probability column of whichever class has the larger mean.
    mean_class_0 = frame['last_mean_class_0'].iloc[0]
    mean_class_1 = frame['last_mean_class_1'].iloc[0]
    if mean_class_1 > mean_class_0:
        prob_column = 'last_prob_class_1'
    else:
        prob_column = 'last_prob_class_0'

    frame['buys'] = np.where(frame[prob_column] > thres, 1, 0)
    return frame.tail(1)
# --- Other Utilities (Remaining functions from algo_utils, unused by main logic) ---
def calc_quantile_var(data, alpha=0.05):
    """Return the ``alpha`` quantile of ``data`` via its ``quantile`` method."""
    quantile_value = data.quantile(alpha)
    return quantile_value
def calc_historical_var(data, alpha=0.05):
    """Return the ``alpha`` quantile of ``data``, squeezing a DataFrame first."""
    series = data.squeeze() if isinstance(data, pd.DataFrame) else data
    return calc_quantile_var(series, alpha=alpha)
def get_open_order_secs(open_orders):
    """Return the ``Symbol`` of every order in ``open_orders`` (empty list if falsy)."""
    if not open_orders:
        return []
    return [pending.Symbol for pending in open_orders]
# ------------------------------------------------------------------------------
# init parameter registry
# ------------------------------------------------------------------------------
PARAMETER_REGISTRY = dict()


def register_param(name, value):
    """Store ``value`` under ``name`` in ``PARAMETER_REGISTRY`` and return it unchanged."""
    PARAMETER_REGISTRY.update({name: value})
    return value
# -----------------------------------------------------------------------------
# algorithm class
# -----------------------------------------------------------------------------
class TradingWithGMM(QCAlgorithm):
    """Long-only strategy that buys symbols flagged by a Gaussian mixture model.

    On Mondays and Fridays the scheduled callbacks refresh the stored daily
    closes, fit a GMM to each uninvested symbol's log returns, buy the
    symbols whose latest observation falls in the higher-mean component with
    probability above ``THRES``, and liquidate holdings whose latest buy fill
    is at least ``_holding_period`` days old.
    """

    def Initialize(self):
        """Set dates, cash, brokerage, universe, parameters, charts and schedules."""
        self.INIT_PORTFOLIO_CASH = register_param('portfolio starting cash', 100000)
        self.SetStartDate(2007, 4, 10)
        self.SetEndDate(2025, 10, 13)
        self.SetCash(self.INIT_PORTFOLIO_CASH)
        self.SetBrokerageModel(BrokerageName.ALPACA, AccountType.Margin)

        # Universe: store Symbol objects rather than ticker strings.
        self.BASE_SYMBOL_TICKER = register_param('base symbol for algorithm management: ', 'SPY')
        self.tickers = ["SPY", "QQQ", "DIA", "TLT", "GLD", "EFA", "EEM", "BND", "VNQ"]
        self.symbols = []
        for ticker in self.tickers:
            security = self.AddEquity(ticker, Resolution.Minute)
            self.symbols.append(security.Symbol)
        self.BASE_SYMBOL = self.Securities[self.BASE_SYMBOL_TICKER].Symbol
        self.exchange = self.Securities[self.BASE_SYMBOL].Exchange

        self.openMarketOnOpenOrders = []
        self._init_prices = False
        self._longs = list()
        self._shorts = list()

        # Strategy parameters (also recorded in PARAMETER_REGISTRY for the log dump below).
        register_param('symbols: ', [s.Value for s in self.symbols])
        self._holding_period = register_param('holding period (days)', 30)
        self.LOOKBACK = register_param('historical lookback (days)', 60)
        self.BET_SIZE = register_param('bet size (%)', 1/len(self.symbols))
        self.RANDOM_STATE = register_param('random_state', 777)
        self.ALPHA = register_param('gmm alpha', 0.95)
        self.N_COMPONENTS = register_param('gmm n components', 2)
        self.THRES = register_param('threshold probability for buy signal', 0.9)
        self.SAMPLES = register_param('number of samples for bootstrap', 1000)
        self.HISTORY_RESOLUTION = Resolution.Daily
        register_param('history api resolution', str(self.HISTORY_RESOLUTION))

        # Diagnostics chart.
        self.splotName = 'Strategy Info'
        sPlot = Chart(self.splotName)
        for position, series_name in enumerate(('RAM', 'Time', 'Cash', 'Leverage')):
            sPlot.AddSeries(Series(series_name, SeriesType.Line, position))
        self.AddChart(sPlot)
        self.time_to_run_main_algo = 0

        # Monday/Friday pipeline, in minutes after the base symbol's market open.
        pipeline = (
            (5, self.init_prices),
            (10, self.run_main_algo),
            (30, self.send_orders),
            (35, self.check_liquidate),
        )
        for minutes_after_open, callback in pipeline:
            self.Schedule.On(
                self.DateRules.Every(DayOfWeek.Monday, DayOfWeek.Friday),
                self.TimeRules.AfterMarketOpen(self.BASE_SYMBOL, minutes_after_open),
                Action(callback),
            )
        self.Schedule.On(
            self.DateRules.EveryDay(self.BASE_SYMBOL),
            self.TimeRules.AfterMarketOpen(self.BASE_SYMBOL, 40),
            Action(self.CHART_RAM),
        )
        self.Debug('\n'+'-'*77+'\nPARAMETER REGISTRY\n{}...'.format(json.dumps(PARAMETER_REGISTRY, indent=2)))

    def init_prices(self):
        """Load the initial close-price table once; later calls are no-ops."""
        if not self.symbols:
            self.Log('no symbols')
            return
        if self._init_prices:
            return
        history = self.History(self.symbols, self.LOOKBACK, self.HISTORY_RESOLUTION)
        # An empty history frame has no 'close' column; try again on the next call.
        if "close" not in history.columns:
            self.Log('init_prices >> no history returned, prices not initialised')
            return
        self.prices = history["close"].unstack(level=0).astype(np.float32)
        self._init_prices = True

    def update_prices(self):
        """Append daily closes newer than the last stored row."""
        if not self._init_prices:
            # self.prices does not exist yet; build it instead of failing on the attribute.
            self.init_prices()
            return
        most_recent_date = self.prices.index.max()
        days_to_request = how_many_days(self.Time, most_recent_date)
        if zero_days_to_request(days_to_request):
            return
        history = self.History(self.symbols, days_to_request, self.HISTORY_RESOLUTION)
        if "close" not in history.columns:
            return
        new_prices = history["close"].unstack(level=0).astype(np.float32)
        self.prices = make_update_df(self.prices, new_prices, self.LOOKBACK)

    def check_liquidate(self):
        """Liquidate holdings whose latest buy fill is older than the holding period.

        Fill events are recognised by comparing their status with
        ``ORDER_STATUS_FILLED`` and requiring a positive fill quantity.
        """
        self.Log('\n'+'-'*77+'\n[{}] checking liquidation status...'.format(self.UtcTime))
        # Materialise the tickets once so they can be scanned for every holding.
        all_tickets = list(self.Transactions.GetOrderTickets())
        for holding in self.Portfolio.Values:
            if not holding.Invested:
                continue
            buy_fill_times = [
                event.UtcTime
                for ticket in all_tickets
                if ticket.Symbol == holding.Symbol
                for event in ticket.OrderEvents
                if event.Status == ORDER_STATUS_FILLED and event.FillQuantity > 0
            ]
            if not buy_fill_times:
                continue
            entry_time = max(buy_fill_times)
            if self.UtcTime >= (entry_time + timedelta(self._holding_period)):
                self.Liquidate(holding.Symbol)
                fmt_args = (self.UtcTime, holding.Symbol.Value, entry_time, self.UtcTime - entry_time)
                self.Log('[{}] liquidating... {}, order date: {}, time delta: {}'.format(*fmt_args))

    def _build_prediction_row(self, sym, sym_obj):
        """Fit the GMM for one symbol and return its prediction row, or None if it cannot be fitted."""
        if sym not in self.prices.columns:
            return None
        train_px = self.prices
        train_ts = make_returns(train_px)[sym].dropna()
        train_ts = train_ts[np.isfinite(train_ts)]
        if train_ts.shape[0] < self.N_COMPONENTS + 1:
            self.Debug('{} train data has too few samples (<{})'.format(str(sym), self.N_COMPONENTS + 1))
            return None
        tmp_X_train = train_ts.values.reshape(-1, 1)

        gmm = make_gmm(n_components=self.N_COMPONENTS, random_state=self.RANDOM_STATE).fit(tmp_X_train)
        hidden_states = gmm.predict(tmp_X_train)
        hidden_state_prob = pd.DataFrame(gmm.predict_proba(tmp_X_train), columns=['s1', 's2'], index=train_ts.index)
        hs_prob_df = pd.concat([train_ts.to_frame(), hidden_state_prob], axis=1)

        # Returns of the observations assigned (probability > 0.5) to each component.
        s1_returns = hs_prob_df.query('abs(s1)>0.5')[sym]
        s2_returns = hs_prob_df.query('abs(s2)>0.5')[sym]

        last_state = hidden_states[-1]
        last_mean = gmm.means_[last_state][0]
        last_var = np.diag(gmm.covariances_[last_state])[0]
        rvs = gmm.sample(self.SAMPLES)[0]
        low_ci, high_ci = stats.norm.interval(confidence=self.ALPHA, loc=np.mean(rvs), scale=np.std(rvs))
        tmp_ret = np.log(float(self.Securities[sym_obj].Price) / train_px[sym].iloc[-1])

        return (
            train_ts.index[-1], last_state, last_mean, np.sqrt(last_var),
            low_ci, high_ci, tmp_ret,
            gmm.means_.ravel()[0], gmm.means_.ravel()[1],
            np.sqrt(np.diag(gmm.covariances_[0]))[0], np.sqrt(np.diag(gmm.covariances_[1]))[0],
            # Explicit positional access; Series[int] on a labelled Series is deprecated.
            hidden_state_prob.iloc[-1, 0], hidden_state_prob.iloc[-1, 1],
            s1_returns.mean(), s2_returns.mean(), s1_returns.std(), s2_returns.std(),
        )

    def run_main_algo(self):
        """Refresh prices and rebuild ``self._longs`` from the per-symbol GMM signal."""
        self.Log('\n'+'-'*77+'\n[{}] Begin main algorithm computation...'.format(self.UtcTime))
        start_time = time.time()
        self.update_prices()
        self._algo_data = False
        self._longs = list()
        self._shorts = list()
        if not self._init_prices:
            self.Debug('run_main_algo >> prices not initialised, skipping computation')
            return
        cols = ['Dates', 'ith_state', 'ith_ret', 'ith_std', 'low_ci', 'high_ci', 'current_return',
                'last_mean_class_0', 'last_mean_class_1', 'last_std_class_0', 'last_std_class_1',
                'last_prob_class_0', 'last_prob_class_1', 'avg_class_0_mean', 'avg_class_1_mean',
                'avg_class_0_std', 'avg_class_1_std']
        for sym_obj in self.symbols:
            sym = sym_obj.Value
            # Deliberately best-effort: one symbol failing must not stop the others.
            try:
                self.Log('checking symbol: {}'.format(str(sym)))
                if self.Portfolio[sym_obj].Invested:
                    self.Debug('missing or invested in {}'.format(sym))
                    continue
                row = self._build_prediction_row(sym, sym_obj)
                if row is None:
                    continue
                self.Debug('{} rowzz:\n{}'.format(str(sym), row))
                pred_df = make_final_pred_df([row], cols, self.THRES, sym)
                if pred_df.iloc[-1].loc['buys'] == 1:
                    self._longs.append(sym_obj)
                    self.Debug('>>> BUY SIGNAL GENERATED for {} (Prob > {}) <<<'.format(sym, self.THRES))
            except Exception as e:
                self.Debug('{} error: {}'.format(sym, e))
                continue
        self.Debug('Final Longs List for next order run: {}'.format([s.Value for s in self._longs]))
        self.time_to_run_main_algo = time.time() - start_time
        self.Plot(self.splotName, 'Time', self.time_to_run_main_algo)

    def send_orders(self):
        """Send a market order for every symbol in ``self._longs`` that is not yet held."""
        self.Log('\n'+'-'*77+'\n[{}] checking buy sell arrays to send orders...'.format(self.UtcTime))
        if not self._longs:
            self.Log('send_orders >> no longs listed, no orders sent...')
            return
        for sym_obj in self._longs:
            if self.Portfolio[sym_obj].Invested:
                self.Debug('Skipping {} - Already invested.'.format(sym_obj.Value))
                continue
            quantity = self.CalculateOrderQuantity(sym_obj, self.BET_SIZE)
            if quantity == 0:
                # Nothing to buy at this bet size; avoid submitting an empty order.
                self.Debug('Skipping {} - computed order quantity is zero.'.format(sym_obj.Value))
                continue
            self.Log('[{}] SENDING MARKET ORDER for {}...'.format(self.UtcTime, sym_obj.Value))
            self.MarketOrder(sym_obj, quantity)

    def OnData(self, data):
        """Unused; all work is done in scheduled callbacks."""
        pass

    def CHART_RAM(self):
        """Plot memory use, leverage (when portfolio value is non-zero) and cash."""
        self.Plot(self.splotName, 'RAM', OS.ApplicationMemoryUsed/1024.)
        P = self.Portfolio
        if P.TotalPortfolioValue != 0:
            self.track_account_leverage = P.TotalAbsoluteHoldingsCost/P.TotalPortfolioValue
            self.Plot(self.splotName, 'Leverage', float(self.track_account_leverage))
        self.Plot(self.splotName, 'Cash', float(self.Portfolio.Cash))
# The explicit AddReference is often not strictly needed but is kept for compatibility.
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Indicators")
AddReference("QuantConnect.Common")
from System import *
from QuantConnect import *
from QuantConnect.Brokerages import BrokerageName
from QuantConnect import AccountType
from QuantConnect.Algorithm import *
from QuantConnect.Indicators import *
from AlgorithmImports import *
import pandas as pd
import numpy as np
from math import ceil, floor
import scipy.stats as stats
import sklearn.mixture as mix
from datetime import datetime, timedelta
import time
import decimal as d
import json
from sklearn import mixture as mix
# --- Integer value of QuantConnect's OrderStatus.Filled enum member (3), kept as a plain constant so fill checks do not depend on the enum import
ORDER_STATUS_FILLED = 3
# ------------------------------------------------------------------------------
# --- ALGO UTILS FUNCTIONS (INTEGRATED) ---
# ------------------------------------------------------------------------------
def how_many_days(current_date, most_recent_date):
    """Return how many daily bars to request from the history API.

    The result is the whole-day part of ``current_date - most_recent_date``
    plus one, so it can be zero or negative when ``most_recent_date`` is
    later than ``current_date``.
    """
    elapsed = current_date - most_recent_date
    whole_days = int(elapsed.days)
    return whole_days + 1
def zero_days_to_request(days_to_request):
    """Return True when fewer than one day of history is needed."""
    nothing_to_fetch = 1 > days_to_request
    return nothing_to_fetch
def make_update_df(old, new, lookback):
    """Merge stored rows with newly fetched rows and keep the most recent ones.

    Parameters
    ----------
    old : pandas.DataFrame
        Previously stored rows.
    new : pandas.DataFrame
        Newly fetched rows in the same layout as ``old``.
    lookback : int
        Number of trailing rows to return.

    Returns
    -------
    pandas.DataFrame
        Combined rows, one per index label (the later-supplied row wins),
        sorted by index and truncated to the last ``lookback`` rows.
    """
    both = pd.concat([old, new])
    # De-duplicate on the index, not on the values: two different dates may
    # legitimately carry identical prices, while a repeated date must
    # collapse to a single (most recent) row.
    both = both[~both.index.duplicated(keep='last')]
    return both.sort_index().iloc[-lookback:]
def make_returns(df):
    """Return the natural log of each value divided by the previous row's value.

    The first row has no predecessor and therefore comes back as NaN.
    """
    previous_row = df.shift(1)
    price_ratio = df / previous_row
    return np.log(price_ratio)
def make_gmm(n_components, random_state):
    """Build an unfitted ``GaussianMixture`` with the given component count and seed."""
    settings = {"n_components": n_components, "random_state": random_state}
    return mix.GaussianMixture(**settings)
def make_final_pred_df(pred_rows, cols, thres, sym):
    """Turn prediction rows into a frame and flag buy signals.

    A ``symbol`` column is set to ``sym``. The class whose mean in the first
    row is larger (class 0 on ties) is treated as the high-return state, and
    ``buys`` is 1 wherever that class's probability exceeds ``thres``, else 0.
    Only the last row of the frame is returned.
    """
    frame = pd.DataFrame(pred_rows, columns=cols)
    frame['symbol'] = sym

    # Pick the probability column of whichever class has the larger mean.
    mean_class_0 = frame['last_mean_class_0'].iloc[0]
    mean_class_1 = frame['last_mean_class_1'].iloc[0]
    if mean_class_1 > mean_class_0:
        prob_column = 'last_prob_class_1'
    else:
        prob_column = 'last_prob_class_0'

    frame['buys'] = np.where(frame[prob_column] > thres, 1, 0)
    return frame.tail(1)
# --- Other Utilities (Remaining functions from algo_utils, unused by main logic) ---
def calc_quantile_var(data, alpha=0.05):
    """Return the ``alpha`` quantile of ``data`` via its ``quantile`` method."""
    quantile_value = data.quantile(alpha)
    return quantile_value
def calc_historical_var(data, alpha=0.05):
    """Return the ``alpha`` quantile of ``data``, squeezing a DataFrame first."""
    series = data.squeeze() if isinstance(data, pd.DataFrame) else data
    return calc_quantile_var(series, alpha=alpha)
def get_open_order_secs(open_orders):
    """Return the ``Symbol`` of every order in ``open_orders`` (empty list if falsy)."""
    if not open_orders:
        return []
    return [pending.Symbol for pending in open_orders]
# ------------------------------------------------------------------------------
# init parameter registry
# ------------------------------------------------------------------------------
PARAMETER_REGISTRY = dict()


def register_param(name, value):
    """Store ``value`` under ``name`` in ``PARAMETER_REGISTRY`` and return it unchanged."""
    PARAMETER_REGISTRY.update({name: value})
    return value
# -----------------------------------------------------------------------------
# algorithm class
# -----------------------------------------------------------------------------
class TradingWithGMM(QCAlgorithm):
    """Long-only strategy that buys symbols flagged by a Gaussian mixture model.

    On Mondays and Fridays the scheduled callbacks refresh the stored daily
    closes, fit a GMM to each uninvested symbol's log returns, buy the
    symbols whose latest observation falls in the higher-mean component with
    probability above ``THRES``, and liquidate holdings whose latest buy fill
    is at least ``_holding_period`` days old.
    """

    def Initialize(self):
        """Set dates, cash, brokerage, universe, parameters, charts and schedules."""
        self.INIT_PORTFOLIO_CASH = register_param('portfolio starting cash', 100000)
        self.SetStartDate(2007, 4, 10)
        self.SetEndDate(2025, 10, 13)
        self.SetCash(self.INIT_PORTFOLIO_CASH)
        # Previously: self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        self.set_brokerage_model(BrokerageName.ALPACA, AccountType.Margin)

        # Universe: store Symbol objects rather than ticker strings.
        self.BASE_SYMBOL_TICKER = register_param('base symbol for algorithm management: ', 'SPY')
        self.tickers = [self.BASE_SYMBOL_TICKER, "QQQ", "DIA", "TLT", "GLD", "EFA", "EEM", "BND", "VNQ"]
        self.symbols = []
        for ticker in self.tickers:
            security = self.AddEquity(ticker, Resolution.Minute)
            self.symbols.append(security.Symbol)
        self.BASE_SYMBOL = self.Securities[self.BASE_SYMBOL_TICKER].Symbol
        self.exchange = self.Securities[self.BASE_SYMBOL].Exchange

        self.openMarketOnOpenOrders = []
        self._init_prices = False
        self._longs = list()
        self._shorts = list()

        # Strategy parameters (also recorded in PARAMETER_REGISTRY for the log dump below).
        register_param('symbols: ', [s.Value for s in self.symbols])
        self._holding_period = register_param('holding period (days)', 30)
        self.LOOKBACK = register_param('historical lookback (days)', 252*3)
        self.BET_SIZE = register_param('bet size (%)', 1/len(self.symbols))
        self.RANDOM_STATE = register_param('random_state', 777)
        self.ALPHA = register_param('gmm alpha', 0.95)
        self.N_COMPONENTS = register_param('gmm n components', 2)
        self.THRES = register_param('threshold probability for buy signal', 0.9)
        self.SAMPLES = register_param('number of samples for bootstrap', 1000)
        self.HISTORY_RESOLUTION = Resolution.Daily
        register_param('history api resolution', str(self.HISTORY_RESOLUTION))

        # Diagnostics chart.
        self.splotName = 'Strategy Info'
        sPlot = Chart(self.splotName)
        for position, series_name in enumerate(('RAM', 'Time', 'Cash', 'Leverage')):
            sPlot.AddSeries(Series(series_name, SeriesType.Line, position))
        self.AddChart(sPlot)
        self.time_to_run_main_algo = 0

        # Monday/Friday pipeline, in minutes after the base symbol's market open.
        # send_orders submits market orders 30 minutes after the open.
        pipeline = (
            (5, self.init_prices),
            (10, self.run_main_algo),
            (30, self.send_orders),
            (35, self.check_liquidate),
        )
        for minutes_after_open, callback in pipeline:
            self.Schedule.On(
                self.DateRules.Every(DayOfWeek.Monday, DayOfWeek.Friday),
                self.TimeRules.AfterMarketOpen(self.BASE_SYMBOL, minutes_after_open),
                Action(callback),
            )
        self.Schedule.On(
            self.DateRules.EveryDay(self.BASE_SYMBOL),
            self.TimeRules.AfterMarketOpen(self.BASE_SYMBOL, 40),
            Action(self.CHART_RAM),
        )
        self.Debug('\n'+'-'*77+'\nPARAMETER REGISTRY\n{}...'.format(json.dumps(PARAMETER_REGISTRY, indent=2)))

    def init_prices(self):
        """Load the initial close-price table once; later calls are no-ops."""
        if not self.symbols:
            self.Log('no symbols')
            return
        if self._init_prices:
            return
        history = self.History(self.symbols, self.LOOKBACK, self.HISTORY_RESOLUTION)
        # An empty history frame has no 'close' column; try again on the next call.
        if "close" not in history.columns:
            self.Log('init_prices >> no history returned, prices not initialised')
            return
        self.prices = history["close"].unstack(level=0).astype(np.float32)
        self._init_prices = True

    def update_prices(self):
        """Append daily closes newer than the last stored row."""
        if not self._init_prices:
            # self.prices does not exist yet; build it instead of failing on the attribute.
            self.init_prices()
            return
        most_recent_date = self.prices.index.max()
        days_to_request = how_many_days(self.Time, most_recent_date)
        if zero_days_to_request(days_to_request):
            return
        history = self.History(self.symbols, days_to_request, self.HISTORY_RESOLUTION)
        if "close" not in history.columns:
            return
        new_prices = history["close"].unstack(level=0).astype(np.float32)
        self.prices = make_update_df(self.prices, new_prices, self.LOOKBACK)

    def check_liquidate(self):
        """Liquidate holdings whose latest buy fill is older than the holding period.

        Fill events are recognised by comparing their status with
        ``ORDER_STATUS_FILLED`` and requiring a positive fill quantity.
        """
        self.Log('\n'+'-'*77+'\n[{}] checking liquidation status...'.format(self.UtcTime))
        # Materialise the tickets once so they can be scanned for every holding.
        all_tickets = list(self.Transactions.GetOrderTickets())
        for holding in self.Portfolio.Values:
            if not holding.Invested:
                continue
            buy_fill_times = [
                event.UtcTime
                for ticket in all_tickets
                if ticket.Symbol == holding.Symbol
                for event in ticket.OrderEvents
                if event.Status == ORDER_STATUS_FILLED and event.FillQuantity > 0
            ]
            if not buy_fill_times:
                continue
            entry_time = max(buy_fill_times)
            if self.UtcTime >= (entry_time + timedelta(self._holding_period)):
                self.Liquidate(holding.Symbol)
                fmt_args = (self.UtcTime, holding.Symbol.Value, entry_time, self.UtcTime - entry_time)
                self.Log('[{}] liquidating... {}, order date: {}, time delta: {}'.format(*fmt_args))

    def _build_prediction_row(self, sym, sym_obj):
        """Fit the GMM for one symbol and return its prediction row, or None if it cannot be fitted."""
        if sym not in self.prices.columns:
            return None
        train_px = self.prices
        train_ts = make_returns(train_px)[sym].dropna()
        train_ts = train_ts[np.isfinite(train_ts)]
        # Fewer than 50 usable returns: skip the symbol silently.
        if train_ts.shape[0] < 50:
            return None
        tmp_X_train = train_ts.values.reshape(-1, 1)

        gmm = make_gmm(n_components=self.N_COMPONENTS, random_state=self.RANDOM_STATE).fit(tmp_X_train)
        hidden_states = gmm.predict(tmp_X_train)
        hidden_state_prob = pd.DataFrame(gmm.predict_proba(tmp_X_train), columns=['s1', 's2'], index=train_ts.index)
        hs_prob_df = pd.concat([train_ts.to_frame(), hidden_state_prob], axis=1)

        # Returns of the observations assigned (probability > 0.5) to each component.
        s1_returns = hs_prob_df.query('abs(s1)>0.5')[sym]
        s2_returns = hs_prob_df.query('abs(s2)>0.5')[sym]

        last_state = hidden_states[-1]
        last_mean = gmm.means_[last_state][0]
        last_var = np.diag(gmm.covariances_[last_state])[0]
        rvs = gmm.sample(self.SAMPLES)[0]
        low_ci, high_ci = stats.norm.interval(confidence=self.ALPHA, loc=np.mean(rvs), scale=np.std(rvs))
        tmp_ret = np.log(float(self.Securities[sym_obj].Price) / train_px[sym].iloc[-1])

        return (
            train_ts.index[-1], last_state, last_mean, np.sqrt(last_var),
            low_ci, high_ci, tmp_ret,
            gmm.means_.ravel()[0], gmm.means_.ravel()[1],
            np.sqrt(np.diag(gmm.covariances_[0]))[0], np.sqrt(np.diag(gmm.covariances_[1]))[0],
            # Explicit positional access; Series[int] on a labelled Series is deprecated.
            hidden_state_prob.iloc[-1, 0], hidden_state_prob.iloc[-1, 1],
            s1_returns.mean(), s2_returns.mean(), s1_returns.std(), s2_returns.std(),
        )

    def run_main_algo(self):
        """Refresh prices and rebuild ``self._longs`` from the per-symbol GMM signal."""
        self.Log('\n'+'-'*77+'\n[{}] Begin main algorithm computation...'.format(self.UtcTime))
        start_time = time.time()
        self.update_prices()
        self._algo_data = False
        self._longs = list()
        self._shorts = list()
        if not self._init_prices:
            self.Debug('run_main_algo >> prices not initialised, skipping computation')
            return
        cols = ['Dates', 'ith_state', 'ith_ret', 'ith_std', 'low_ci', 'high_ci', 'current_return',
                'last_mean_class_0', 'last_mean_class_1', 'last_std_class_0', 'last_std_class_1',
                'last_prob_class_0', 'last_prob_class_1', 'avg_class_0_mean', 'avg_class_1_mean',
                'avg_class_0_std', 'avg_class_1_std']
        for sym_obj in self.symbols:
            sym = sym_obj.Value
            # Deliberately best-effort: one symbol failing must not stop the others.
            try:
                self.Log('checking symbol: {}'.format(str(sym)))
                if self.Portfolio[sym_obj].Invested:
                    self.Debug('missing or invested in {}'.format(sym))
                    continue
                row = self._build_prediction_row(sym, sym_obj)
                if row is None:
                    continue
                self.Debug('{} rowzz:\n{}'.format(str(sym), row))
                pred_df = make_final_pred_df([row], cols, self.THRES, sym)
                if pred_df.iloc[-1].loc['buys'] == 1:
                    self._longs.append(sym_obj)
                    self.Debug('>>> BUY SIGNAL GENERATED for {} (Prob > {}) <<<'.format(sym, self.THRES))
            except Exception as e:
                self.Debug('{} error: {}'.format(sym, e))
                continue
        self.Debug('Final Longs List for next order run: {}'.format([s.Value for s in self._longs]))
        self.time_to_run_main_algo = time.time() - start_time
        self.Plot(self.splotName, 'Time', self.time_to_run_main_algo)

    def send_orders(self):
        """Send a market order for every symbol in ``self._longs`` that is not yet held."""
        self.Log('\n'+'-'*77+'\n[{}] checking buy sell arrays to send orders...'.format(self.UtcTime))
        if not self._longs:
            self.Log('send_orders >> no longs listed, no orders sent...')
            return
        for sym_obj in self._longs:
            if self.Portfolio[sym_obj].Invested:
                self.Debug('Skipping {} - Already invested.'.format(sym_obj.Value))
                continue
            quantity = self.CalculateOrderQuantity(sym_obj, self.BET_SIZE)
            if quantity == 0:
                # Nothing to buy at this bet size; avoid submitting an empty order.
                self.Debug('Skipping {} - computed order quantity is zero.'.format(sym_obj.Value))
                continue
            self.Log('[{}] SENDING MARKET ORDER for {}...'.format(self.UtcTime, sym_obj.Value))
            self.MarketOrder(sym_obj, quantity)

    def OnData(self, data):
        """Unused; all work is done in scheduled callbacks."""
        pass

    def CHART_RAM(self):
        """Plot memory use, leverage (when portfolio value is non-zero) and cash."""
        self.Plot(self.splotName, 'RAM', OS.ApplicationMemoryUsed/1024.)
        P = self.Portfolio
        if P.TotalPortfolioValue != 0:
            self.track_account_leverage = P.TotalAbsoluteHoldingsCost/P.TotalPortfolioValue
            self.Plot(self.splotName, 'Leverage', float(self.track_account_leverage))
        self.Plot(self.splotName, 'Cash', float(self.Portfolio.Cash))
# region imports
from AlgorithmImports import *
# endregion
# The explicit AddReference is often not strictly needed but is kept for compatibility.
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Indicators")
AddReference("QuantConnect.Common")
from System import *
from QuantConnect import *
# Explicitly import necessary enums from their correct locations
from QuantConnect.Brokerages import BrokerageName
from QuantConnect import AccountType
from QuantConnect.Algorithm import *
from QuantConnect.Indicators import *
import pandas as pd
import numpy as np
from math import ceil, floor
import scipy.stats as stats
import sklearn.mixture as mix
from datetime import datetime, timedelta
import time
import decimal as d
import json
from sklearn import mixture as mix
# --- Integer value of QuantConnect's OrderStatus.Filled enum member (3), kept as a plain constant so fill checks do not depend on the enum import
ORDER_STATUS_FILLED = 3
# ------------------------------------------------------------------------------
# --- ALGO UTILS FUNCTIONS (INTEGRATED) ---
# ------------------------------------------------------------------------------
def how_many_days(current_date, most_recent_date):
    """Return how many daily bars to request from the history API.

    The result is the whole-day part of ``current_date - most_recent_date``
    plus one, so it can be zero or negative when ``most_recent_date`` is
    later than ``current_date``.
    """
    elapsed = current_date - most_recent_date
    whole_days = int(elapsed.days)
    return whole_days + 1
def zero_days_to_request(days_to_request):
    """Return True when fewer than one day of history is needed."""
    nothing_to_fetch = 1 > days_to_request
    return nothing_to_fetch
def make_update_df(old, new, lookback):
    """Merge stored rows with newly fetched rows and keep the most recent ones.

    Parameters
    ----------
    old : pandas.DataFrame
        Previously stored rows.
    new : pandas.DataFrame
        Newly fetched rows in the same layout as ``old``. A frame with a
        MultiIndex and a ``'close'`` column is first reduced to its close
        values with index level 0 moved to the columns.
    lookback : int
        Number of trailing rows to return.

    Returns
    -------
    pandas.DataFrame
        Combined rows, one per index label (the later-supplied row wins),
        sorted by index and truncated to the last ``lookback`` rows.
    """
    # The previous check looked at the *names* of the column levels, which
    # never contain 'close'; test the column labels themselves instead.
    if (
        isinstance(new, pd.DataFrame)
        and isinstance(new.index, pd.MultiIndex)
        and 'close' in new.columns
    ):
        new = new['close'].unstack(level=0)
    both = pd.concat([old, new])
    # De-duplicate on the index, not on the values: two different dates may
    # legitimately carry identical prices, while a repeated date must
    # collapse to a single (most recent) row.
    both = both[~both.index.duplicated(keep='last')]
    return both.sort_index().iloc[-lookback:]
# Placeholder for functions not needed in this benchmark, but required for structural completeness
def make_returns(df):
    """Return the natural log of each value divided by the previous row's value.

    The first row has no predecessor and therefore comes back as NaN.
    """
    previous_row = df.shift(1)
    price_ratio = df / previous_row
    return np.log(price_ratio)
# -----------------------------------------------------------------------------
# algorithm class
# -----------------------------------------------------------------------------
class EqualWeightBenchmark(QCAlgorithm):
"""
Equal Weight Benchmark Strategy, adjusted for current QuantConnect standards.
"""
def Initialize(self):
    """Configure the backtest window, universe, sizing parameters, charts and schedules."""
    self.INIT_PORTFOLIO_CASH = 100000
    self.SetStartDate(2007, 10, 1)
    self.SetEndDate(2025, 10, 13)
    self.SetCash(self.INIT_PORTFOLIO_CASH)

    # Brokerage model. An Interactive Brokers margin model was used before:
    # self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
    self.set_brokerage_model(BrokerageName.ALPACA, AccountType.Margin)

    # Universe: ticker strings for labels, Symbol objects for API calls.
    self.BASE_SYMBOL_TICKER = "SPY"
    self.ticker_strings = [
        self.BASE_SYMBOL_TICKER, "QQQ", "DIA", "TLT", "GLD",
        "EFA", "EEM", "BND", "VNQ",
    ]
    self.symbols = []
    for ticker in self.ticker_strings:
        self.symbols.append(self.AddEquity(ticker, Resolution.Minute).Symbol)
    self.BASE_SYMBOL = self.Securities[self.BASE_SYMBOL_TICKER].Symbol
    self.exchange = self.Securities[self.BASE_SYMBOL].Exchange

    # Sizing and bookkeeping parameters.
    self._init_prices = False
    self.LOOKBACK = 252  # trading days
    self.LEVERAGE = 1.0
    self.BET_SIZE = 1 / len(self.ticker_strings) * self.LEVERAGE
    self.TOLERANCE = 0.025
    self.RANDOM_STATE = 7
    self.HISTORY_RESOLUTION = Resolution.Daily

    # Diagnostics chart: RAM, computation time, cash and leverage.
    self.splotName = "Strategy Info"
    info_chart = Chart(self.splotName)
    for position, series_name in enumerate(("RAM", "Time", "Cash", "Leverage")):
        info_chart.AddSeries(Series(series_name, SeriesType.Line, position))
    self.AddChart(info_chart)
    self.time_to_run_main_algo = 0

    # One weight series per ticker.
    self.splotName3 = "Security Weights Info"
    weights_chart = Chart(self.splotName3)
    for position, ticker in enumerate(self.ticker_strings):
        weights_chart.AddSeries(Series(ticker, SeriesType.Line, position))
    self.AddChart(weights_chart)

    # Scheduled callbacks, all keyed to the base symbol's trading calendar.
    base = self.BASE_SYMBOL
    self.Schedule.On(self.DateRules.EveryDay(base), self.TimeRules.AfterMarketOpen(base, 5), Action(self.init_prices))
    self.Schedule.On(self.DateRules.MonthStart(base), self.TimeRules.AfterMarketOpen(base, 10), Action(self.rebalance))
    self.Schedule.On(self.DateRules.EveryDay(base), self.TimeRules.AfterMarketOpen(base, 40), Action(self.CHART_RAM))
    self.Schedule.On(self.DateRules.EveryDay(base), self.TimeRules.BeforeMarketClose(base, 70), Action(self.CHART_SECURITY_WEIGHTS))
def init_prices(self):
    """Load the initial close-price table once.

    Logs and returns when there are no symbols, and does nothing when the
    table has already been loaded.
    """
    if not self.symbols:
        self.Log("no symbols")
        return
    if self._init_prices:
        return

    # History is requested with the list of Symbol objects.
    history = self.History(self.symbols, self.LOOKBACK, self.HISTORY_RESOLUTION)
    closes = history["close"].unstack(level=0)
    self.prices = closes.astype(np.float32)
    self._init_prices = True
    return
def update_prices(self):
    """Fetch closes newer than the last stored row and merge them into ``self.prices``.

    Returns without changes when no days need requesting or when the
    history frame has no ``"close"`` column.
    """
    latest_stored = self.prices.index.max()
    now = self.Time
    days_to_request = how_many_days(now, latest_stored)
    if zero_days_to_request(days_to_request):
        return

    fresh = self.History(self.symbols, days_to_request, self.HISTORY_RESOLUTION)
    if "close" not in fresh.columns:
        return

    fresh_closes = fresh["close"].unstack(level=0).astype(np.float32)
    self.prices = make_update_df(self.prices, fresh_closes, self.LOOKBACK)
    return
def check_current_weight(self, symbol):
    """
    Return the fraction of total portfolio value currently held in ``symbol``.

    ``symbol`` is the key used to look the security up in
    ``self.Securities``. Returns 0.0 when the total portfolio value is zero.
    """
    portfolio = self.Portfolio
    if portfolio.TotalPortfolioValue == 0:
        return 0.0
    # holdings value lives on the Security object, not on the Symbol
    holdings_value = float(self.Securities[symbol].Holdings.HoldingsValue)
    return holdings_value / float(portfolio.TotalPortfolioValue)
def rebalance(self):
    """Reset every symbol whose weight drifted outside the tolerance band.

    The band is ``BET_SIZE`` plus or minus ``TOLERANCE * BET_SIZE``. Symbols
    outside it are sent back to ``BET_SIZE`` via ``SetHoldings``. The
    wall-clock run time is plotted on the strategy-info chart.
    """
    separator = "\n" + "-" * 77
    self.Log(
        separator
        + "\n[{}] Begin main algorithm computation...".format(self.UtcTime)
    )
    started_at = time.time()
    self.update_prices()
    for sym_obj in self.symbols:
        weight_now = self.check_current_weight(sym_obj)
        band = self.TOLERANCE * self.BET_SIZE
        floor_weight = self.BET_SIZE - band
        ceiling_weight = self.BET_SIZE + band
        # only trade when the weight has left the band
        if weight_now < floor_weight or weight_now > ceiling_weight:
            self.SetHoldings(sym_obj, self.BET_SIZE)
    self.time_to_run_main_algo = time.time() - started_at
    self.Plot(self.splotName, "Time", self.time_to_run_main_algo)
    return
def OnData(self, data):
    """Data event handler; intentionally does nothing.

    The work in this algorithm is done by the scheduled functions, so the
    incoming ``data`` is ignored.
    """
    return None
def CHART_RAM(self):
    """Plot memory usage, account leverage and cash on the strategy-info chart.

    Leverage is total absolute holdings cost divided by total portfolio
    value and is refreshed only while the portfolio value is non-zero.
    """
    # Once a day or something reasonable to prevent spam
    self.Plot(self.splotName, "RAM", OS.ApplicationMemoryUsed / 1024.0)
    portfolio = self.Portfolio
    if portfolio.TotalPortfolioValue != 0:
        self.track_account_leverage = (
            portfolio.TotalAbsoluteHoldingsCost / portfolio.TotalPortfolioValue
        )
    # If the portfolio value was zero on every call so far the attribute was
    # never assigned; fall back to 0.0 instead of raising AttributeError.
    leverage = getattr(self, "track_account_leverage", 0.0)
    self.Plot(self.splotName, "Leverage", float(leverage))
    self.Plot(self.splotName, "Cash", float(self.Portfolio.Cash))
    return
def CHART_SECURITY_WEIGHTS(self):
    """Plot each symbol's share of total portfolio value, in percent.

    One point per symbol in ``self.symbols`` is written to the chart named
    ``self.splotName3``, using the symbol's ``Value`` as the series name.
    A zero portfolio value is plotted as a weight of 0.0.
    """
    portfolio = self.Portfolio
    for sym_obj in self.symbols:
        # holdings are reached through the Security object
        security = self.Securities[sym_obj]
        if portfolio.TotalPortfolioValue != 0:
            pct = float(security.Holdings.HoldingsValue) / float(portfolio.TotalPortfolioValue) * 100
        else:
            pct = 0.0
        self.Plot(self.splotName3, sym_obj.Value, pct)
    return
# The explicit AddReference is often not strictly needed but is kept for compatibility.
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Indicators")
AddReference("QuantConnect.Common")
from System import *
from QuantConnect import *
from QuantConnect.Brokerages import BrokerageName
from QuantConnect import AccountType
from QuantConnect.Algorithm import *
from QuantConnect.Indicators import *
import pandas as pd
import numpy as np
from math import ceil, floor
import scipy.stats as stats
import sklearn.mixture as mix
from datetime import datetime, timedelta
import time
import decimal as d
import json
from sklearn import mixture as mix
# --- Integer value of QuantConnect's OrderStatus.Filled enum member (== 3), compared against OrderEvent.Status
ORDER_STATUS_FILLED = 3
# ------------------------------------------------------------------------------
# --- ALGO UTILS FUNCTIONS (INTEGRATED) ---
# ------------------------------------------------------------------------------
def how_many_days(current_date, most_recent_date):
    """Return how many daily bars to request: the whole-day gap plus one."""
    elapsed = current_date - most_recent_date
    return int(elapsed.days) + 1
def zero_days_to_request(days_to_request):
    """Return True when fewer than one day is missing, i.e. nothing to fetch."""
    nothing_missing = 1 > days_to_request
    return nothing_missing
def make_update_df(old, new, lookback):
    """Merge new price rows into the stored frame and keep the last ``lookback`` rows.

    Args:
        old: stored price frame indexed by date, one column per symbol.
        new: either a frame shaped like ``old``, or a raw history frame with a
            ``close`` column and a MultiIndex, which is unstacked on level 0.
        lookback: number of most recent rows to keep.

    Returns:
        The combined frame, sorted by index, with one row per index label.
    """
    # Accept a raw history frame as well as an already unstacked one.
    if (
        isinstance(new, pd.DataFrame)
        and isinstance(new.index, pd.MultiIndex)
        and "close" in new.columns
    ):
        new = new["close"].unstack(level=0)
    both = pd.concat([old, new])
    # De-duplicate on the index, not on row values: value-based
    # drop_duplicates() discards days on which prices did not change and
    # keeps two rows for a date that was re-fetched with different values.
    # keep="last" prefers the freshly requested row.
    both = both[~both.index.duplicated(keep="last")]
    return both.sort_index().iloc[-lookback:]
def make_returns(df):
    """Return one-period log returns of a price frame (first row is NaN)."""
    previous = df.shift(1)
    price_ratio = df / previous
    return np.log(price_ratio)
def make_gmm(n_components, random_state):
    """Build an unfitted GaussianMixture with the given size and seed."""
    settings = {"n_components": n_components, "random_state": random_state}
    return mix.GaussianMixture(**settings)
def make_final_pred_df(pred_rows, cols, thres, sym):
    """Turn prediction rows into a frame and flag a buy on the last row.

    The component with the larger mean in the first row is treated as the
    high-return state. ``buys`` is 1 where that component's probability
    exceeds ``thres`` and 0 otherwise. Only the final row is returned.
    """
    frame = pd.DataFrame(pred_rows, columns=cols)
    frame['symbol'] = sym
    # pick the probability column of whichever class has the higher mean
    mean_class_1 = frame['last_mean_class_1'].iloc[0]
    mean_class_0 = frame['last_mean_class_0'].iloc[0]
    if mean_class_1 > mean_class_0:
        signal_col = 'last_prob_class_1'
    else:
        signal_col = 'last_prob_class_0'
    frame['buys'] = np.where(frame[signal_col] > thres, 1, 0)
    return frame.tail(1)
# --- Other Utilities (Remaining functions from algo_utils, unused by main logic) ---
def calc_quantile_var(data, alpha=0.05):
    """Return the ``alpha`` quantile of ``data``."""
    return data.quantile(alpha)


def calc_historical_var(data, alpha=0.05):
    """Return the ``alpha`` quantile of ``data``, squeezing a DataFrame first."""
    series = data.squeeze() if isinstance(data, pd.DataFrame) else data
    return calc_quantile_var(series, alpha=alpha)
def get_open_order_secs(open_orders):
    """Return the ``Symbol`` of every open order, or an empty list if there are none."""
    return [order.Symbol for order in open_orders] if open_orders else []
# ------------------------------------------------------------------------------
# init parameter registry
# ------------------------------------------------------------------------------
PARAMETER_REGISTRY = {}


def register_param(name, value):
    """Store ``value`` under ``name`` in PARAMETER_REGISTRY and return it unchanged."""
    PARAMETER_REGISTRY.update({name: value})
    return value
# -----------------------------------------------------------------------------
# algorithm class
# -----------------------------------------------------------------------------
class TradingWithGMM(QCAlgorithm):
    """Long-only ETF strategy that buys on Gaussian-mixture regime signals.

    On Mondays and Fridays the algorithm fits a Gaussian mixture model to
    the daily log returns of every symbol it is not invested in, queues a
    buy when ``make_final_pred_df`` flags the symbol, sends market orders
    for the queued symbols, and liquidates positions whose latest buy fill
    is older than the holding period.
    """

    def Initialize(self):
        """Configure dates, cash, brokerage, universe, parameters, chart and schedules."""
        self.INIT_PORTFOLIO_CASH = register_param('portfolio starting cash', 100000)
        self.SetStartDate(2007, 4, 10)
        self.SetEndDate(2025, 10, 13)  # Set End Date
        self.SetCash(self.INIT_PORTFOLIO_CASH)
        self.SetBrokerageModel(BrokerageName.ALPACA, AccountType.Margin)

        # Correct Symbol Handling: keep Symbol objects for every API call
        self.BASE_SYMBOL_TICKER = register_param('base symbol for algorithm management: ', 'SPY')
        self.tickers = ["SPY", "QQQ", "DIA", "TLT", "GLD", "EFA", "EEM", "BND", "VNQ"]
        self.symbols = []
        for ticker in self.tickers:
            security = self.AddEquity(ticker, Resolution.Minute)
            self.symbols.append(security.Symbol)
        self.BASE_SYMBOL = self.Securities[self.BASE_SYMBOL_TICKER].Symbol
        self.exchange = self.Securities[self.BASE_SYMBOL].Exchange
        self.openMarketOnOpenOrders = []
        self._init_prices = False
        self._longs = list()
        self._shorts = list()
        register_param('symbols: ', [s.Value for s in self.symbols])

        self._holding_period = register_param('holding period (days)', 30)
        # MODIFICATION 1: Set lookback to 252 days
        self.LOOKBACK = register_param('historical lookback (days)', 252)
        self.BET_SIZE = register_param('bet size (%)', 1/len(self.symbols))
        self.RANDOM_STATE = register_param('random_state', 777)
        self.ALPHA = register_param('gmm alpha', 0.95)
        # MODIFICATION 2: Set GMM components to 2 (run_main_algo labels the
        # probability columns 's1' and 's2', so it relies on this value)
        self.N_COMPONENTS = register_param('gmm n components', 2)
        self.THRES = register_param('threshold probability for buy signal', 0.9)
        self.SAMPLES = register_param('number of samples for bootstrap', 1000)
        self.HISTORY_RESOLUTION = Resolution.Daily
        register_param('history api resolution', str(self.HISTORY_RESOLUTION))

        # Charting setup
        self.splotName = 'Strategy Info'
        sPlot = Chart(self.splotName)
        sPlot.AddSeries(Series('RAM', SeriesType.Line, 0))
        sPlot.AddSeries(Series('Time', SeriesType.Line, 1))
        sPlot.AddSeries(Series('Cash', SeriesType.Line, 2))
        sPlot.AddSeries(Series('Leverage', SeriesType.Line, 3))
        self.AddChart(sPlot)
        self.time_to_run_main_algo = 0

        # Scheduled functions, in minutes after the base symbol's market open.
        # Orders are sent as market orders at T+30 min (see send_orders).
        monday_friday_jobs = (
            (5, self.init_prices),
            (10, self.run_main_algo),
            (30, self.send_orders),
            (35, self.check_liquidate),
        )
        for minutes, handler in monday_friday_jobs:
            self.Schedule.On(
                self.DateRules.Every(DayOfWeek.Monday, DayOfWeek.Friday),
                self.TimeRules.AfterMarketOpen(self.BASE_SYMBOL, minutes),
                Action(handler),
            )
        self.Schedule.On(
            self.DateRules.EveryDay(self.BASE_SYMBOL),
            self.TimeRules.AfterMarketOpen(self.BASE_SYMBOL, 40),
            Action(self.CHART_RAM),
        )
        self.Debug('\n'+'-'*77+'\nPARAMETER REGISTRY\n{}...'.format(json.dumps(PARAMETER_REGISTRY, indent=2)))

    def init_prices(self):
        """Load the initial ``LOOKBACK`` window of closes into ``self.prices`` (once)."""
        if not self.symbols:
            self.Log('no symbols')
            return
        if self._init_prices:
            return
        history = self.History(self.symbols, self.LOOKBACK, self.HISTORY_RESOLUTION)
        # An empty history frame has no "close" column; indexing it would raise
        # KeyError. Leave the flag unset so the next scheduled call retries.
        if "close" not in history.columns:
            self.Log('init_prices >> history request returned no close data')
            return
        self.prices = history["close"].unstack(level=0).astype(np.float32)
        self._init_prices = True

    def update_prices(self):
        """Append any missing daily closes to ``self.prices``."""
        most_recent_date = self.prices.index.max()
        current_date = self.Time
        days_to_request = how_many_days(current_date, most_recent_date)
        if zero_days_to_request(days_to_request):
            return
        history = self.History(self.symbols, days_to_request, self.HISTORY_RESOLUTION)
        # Same guard as init_prices: no data means nothing to merge.
        if "close" not in history.columns:
            return
        new_prices = history["close"].unstack(level=0).astype(np.float32)
        self.prices = make_update_df(self.prices, new_prices, self.LOOKBACK)
        return

    def check_liquidate(self):
        """
        Liquidate holdings whose latest buy fill is older than the holding period.

        Uses GetOrderTickets() with manual filtering and compares order event
        status against the integer constant ORDER_STATUS_FILLED (3).
        """
        self.Log('\n'+'-'*77+'\n[{}] checking liquidation status...'.format(self.UtcTime))
        # Retrieve ALL order tickets once and materialize them, because the
        # collection is scanned again for every invested holding.
        all_tickets = list(self.Transactions.GetOrderTickets())
        for holding in self.Portfolio.Values:
            if not holding.Invested:
                continue
            # Filter all tickets to find those matching the current holding's Symbol
            symbol_tickets = [t for t in all_tickets if t.Symbol == holding.Symbol]
            latest_fill_event = None
            # Find the latest FILL event that was a BUY order (positive fill quantity)
            for ticket in symbol_tickets:
                fill_events = [e for e in ticket.OrderEvents if e.Status == ORDER_STATUS_FILLED and e.FillQuantity > 0]
                if fill_events:
                    current_latest_fill = max(fill_events, key=lambda x: x.UtcTime)
                    if latest_fill_event is None or current_latest_fill.UtcTime > latest_fill_event.UtcTime:
                        latest_fill_event = current_latest_fill
            if latest_fill_event:
                entry_time = latest_fill_event.UtcTime
                # Check if the current time is past the entry time + holding period
                if self.UtcTime >= (entry_time + timedelta(self._holding_period)):
                    self.Liquidate(holding.Symbol)
                    fmt_args = (self.UtcTime, holding.Symbol.Value, entry_time, self.UtcTime - entry_time)
                    self.Log('[{}] liquidating... {}, order date: {}, time delta: {}'.format(*fmt_args))
        return

    def run_main_algo(self):
        """Fit a GMM per uninvested symbol and collect buy candidates in ``self._longs``.

        Errors raised while processing one symbol are logged with Debug and
        the loop moves on to the next symbol.
        """
        self.Log('\n'+'-'*77+'\n[{}] Begin main algorithm computation...'.format(self.UtcTime))
        start_time = time.time()
        self._algo_data = False
        self._longs = list()
        self._shorts = list()
        # self.prices only exists after init_prices succeeded; without it
        # update_prices would raise AttributeError.
        if not self._init_prices:
            self.Log('run_main_algo >> prices not initialized, skipping...')
            return
        self.update_prices()
        for sym_obj in self.symbols:
            sym = sym_obj.Value
            try:
                self.Log('checking symbol: {}'.format(str(sym)))
                pred_rows = list()
                # Only run the GMM calculation for symbols we do not hold
                if not self.Portfolio[sym_obj].Invested:
                    if sym not in self.prices.columns:
                        continue
                    train_px = self.prices.copy()
                    train_ts = make_returns(train_px)[sym].dropna()
                    train_ts = train_ts[np.isfinite(train_ts)]
                    if train_ts.shape[0] < self.N_COMPONENTS + 1:
                        self.Debug('{} train data has too few samples (<{})'.format(str(sym), self.N_COMPONENTS + 1))
                        continue
                    tmp_X_train = train_ts.values.reshape(-1, 1)
                    ### fit GMM ###
                    gmm = make_gmm(n_components=self.N_COMPONENTS, random_state=self.RANDOM_STATE).fit(tmp_X_train)
                    hidden_states = gmm.predict(tmp_X_train)
                    hidden_state_prob = pd.DataFrame(gmm.predict_proba(tmp_X_train), columns=['s1', 's2'], index=train_ts.index)
                    state_df = train_ts.to_frame()
                    hs_prob_df = pd.concat([state_df, hidden_state_prob], axis=1)
                    # Returns observed while each state had probability above 0.5
                    s1_returns = hs_prob_df.query('abs(s1)>0.5')[sym]
                    s2_returns = hs_prob_df.query('abs(s2)>0.5')[sym]
                    s1_mu = s1_returns.mean()
                    s2_mu = s2_returns.mean()
                    s1_std = s1_returns.std()
                    s2_std = s2_returns.std()
                    # GMM state and interval calculation
                    last_state = hidden_states[-1]
                    last_mean = gmm.means_[last_state][0]
                    last_var = np.diag(gmm.covariances_[last_state])[0]
                    rvs = gmm.sample(self.SAMPLES)[0]
                    low_ci, high_ci = stats.norm.interval(confidence=self.ALPHA, loc=np.mean(rvs), scale=np.std(rvs))
                    tmp_ret = np.log(float(self.Securities[sym_obj].Price) / train_px[sym].iloc[-1])
                    # .iloc[-1, k] is explicit positional access; integer keys
                    # on a label-indexed Series (iloc[-1][k]) are deprecated.
                    row = (train_ts.index[-1], last_state, last_mean, np.sqrt(last_var),
                           low_ci, high_ci, tmp_ret,
                           gmm.means_.ravel()[0], gmm.means_.ravel()[1],
                           np.sqrt(np.diag(gmm.covariances_[0]))[0], np.sqrt(np.diag(gmm.covariances_[1]))[0],
                           hidden_state_prob.iloc[-1, 0], hidden_state_prob.iloc[-1, 1],
                           s1_mu, s2_mu, s1_std, s2_std)
                    pred_rows.append(row)
                    self.Debug('{} rowzz:\n{}'.format(str(sym), row))
                if pred_rows:
                    cols = ['Dates', 'ith_state', 'ith_ret', 'ith_std', 'low_ci', 'high_ci', 'current_return',
                            'last_mean_class_0', 'last_mean_class_1', 'last_std_class_0', 'last_std_class_1',
                            'last_prob_class_0', 'last_prob_class_1', 'avg_class_0_mean', 'avg_class_1_mean',
                            'avg_class_0_std', 'avg_class_1_std']
                    pred_df = make_final_pred_df(pred_rows, cols, self.THRES, sym)
                    if pred_df.iloc[-1].loc['buys'] == 1:
                        self._longs.append(sym_obj)
                        self.Debug('>>> BUY SIGNAL GENERATED for {} (Prob > {}) <<<'.format(sym, self.THRES))
                else:
                    self.Debug('missing or invested in {}'.format(sym))
            except Exception as e:
                # best effort: one failing symbol must not stop the others
                self.Debug('{} error: {}'.format(sym, e))
                continue
        self.Debug('Final Longs List for next order run: {}'.format([s.Value for s in self._longs]))
        self.time_to_run_main_algo = time.time() - start_time
        self.Plot(self.splotName, 'Time', self.time_to_run_main_algo)
        return

    def send_orders(self):
        """Send a market order sized to ``BET_SIZE`` for each queued, uninvested long."""
        self.Log('\n'+'-'*77+'\n[{}] checking buy sell arrays to send orders...'.format(self.UtcTime))
        if self._longs:
            for sym_obj in self._longs:
                if not self.Portfolio[sym_obj].Invested:
                    # Use MarketOrder for execution 30 minutes after open
                    self.Log('[{}] SENDING MARKET ORDER for {}...'.format(self.UtcTime, sym_obj.Value))
                    self.MarketOrder(sym_obj, self.CalculateOrderQuantity(sym_obj, self.BET_SIZE))
                else:
                    self.Debug('Skipping {} - Already invested.'.format(sym_obj.Value))
        else:
            self.Log('send_orders >> no longs listed, no orders sent...')
        return

    def OnData(self, data):
        """Data event handler; unused because the scheduled functions do the work."""
        pass

    def CHART_RAM(self):
        """Plot memory usage, account leverage and cash on the strategy-info chart."""
        self.Plot(self.splotName, 'RAM', OS.ApplicationMemoryUsed/1024.)
        P = self.Portfolio
        if P.TotalPortfolioValue != 0:
            self.track_account_leverage = P.TotalAbsoluteHoldingsCost/P.TotalPortfolioValue
        # The attribute is never assigned while the portfolio value is zero;
        # fall back to 0.0 instead of raising AttributeError.
        leverage = getattr(self, 'track_account_leverage', 0.0)
        self.Plot(self.splotName, 'Leverage', float(leverage))
        self.Plot(self.splotName, 'Cash', float(self.Portfolio.Cash))
        return
# The explicit AddReference is often not strictly needed but is kept for compatibility.
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Indicators")
AddReference("QuantConnect.Common")
from System import *
from QuantConnect import * # Imports core enums like OrderEventStatus and AccountType
from QuantConnect.Brokerages import BrokerageName
from QuantConnect import AccountType
from QuantConnect.Algorithm import *
from QuantConnect.Indicators import *
import pandas as pd
import numpy as np
from math import ceil, floor
import scipy.stats as stats
import sklearn.mixture as mix
from datetime import datetime, timedelta
import time
import decimal as d
import json
from sklearn import mixture as mix
# --- Integer value of QuantConnect's OrderStatus.Filled enum member (== 3), compared against OrderEvent.Status
ORDER_STATUS_FILLED = 3
# ------------------------------------------------------------------------------
# --- ALGO UTILS FUNCTIONS (INTEGRATED) ---
# ------------------------------------------------------------------------------
def how_many_days(current_date, most_recent_date):
    """Return the count of daily bars to request: whole days elapsed, plus one."""
    whole_days = (current_date - most_recent_date).days
    return 1 + int(whole_days)
def zero_days_to_request(days_to_request):
    """Report whether the stored history is current (fewer than one day missing)."""
    if days_to_request < 1:
        return True
    return False
def make_update_df(old, new, lookback):
    """Merge new price rows into the stored frame and keep the last ``lookback`` rows.

    Args:
        old: stored price frame indexed by date, one column per symbol.
        new: either a frame shaped like ``old``, or a raw history frame with a
            ``close`` column and a MultiIndex, which is unstacked on level 0.
        lookback: number of most recent rows to keep.

    Returns:
        The combined frame, sorted by index, with one row per index label.
    """
    is_raw_history = (
        isinstance(new, pd.DataFrame)
        and isinstance(new.index, pd.MultiIndex)
        and "close" in new.columns
    )
    if is_raw_history:
        new = new["close"].unstack(level=0)
    combined = pd.concat([old, new])
    # Rows are identified by their date, so de-duplicate on the index.
    # Value-based drop_duplicates() removes days with unchanged prices and
    # lets a re-fetched date appear twice; keep="last" keeps the newest fetch.
    unique_dates = ~combined.index.duplicated(keep="last")
    return combined[unique_dates].sort_index().iloc[-lookback:]
def make_returns(df):
    """Compute log returns between consecutive rows; the first row has no return."""
    lagged = df.shift(1)
    return np.log(df / lagged)
def make_gmm(n_components, random_state):
    """Return an unfitted GaussianMixture for the requested component count and seed."""
    model = mix.GaussianMixture(
        n_components=n_components,
        random_state=random_state,
    )
    return model
def make_final_pred_df(pred_rows, cols, thres, sym):
    """Build the prediction frame and mark the buy signal on its final row.

    The class whose mean is larger in the first row counts as the
    high-return state. ``buys`` is 1 when that class's probability is above
    ``thres`` and 0 otherwise. The returned frame holds only the last row.
    """
    pred = pd.DataFrame(pred_rows, columns=cols)
    pred['symbol'] = sym
    class_1_is_high = pred['last_mean_class_1'].iloc[0] > pred['last_mean_class_0'].iloc[0]
    # use the probability of whichever class is the high-return state
    prob_col = 'last_prob_class_1' if class_1_is_high else 'last_prob_class_0'
    pred['buys'] = np.where(pred[prob_col] > thres, 1, 0)
    return pred.tail(1)
# --- Other Utilities (Remaining functions from algo_utils, unused by main logic) ---
def calc_quantile_var(data, alpha=0.05):
    """Return the ``alpha`` quantile of ``data``."""
    quantile_value = data.quantile(alpha)
    return quantile_value


def calc_historical_var(data, alpha=0.05):
    """Return the ``alpha`` quantile of ``data``; a DataFrame is squeezed first."""
    if isinstance(data, pd.DataFrame):
        data = data.squeeze()
    return calc_quantile_var(data, alpha=alpha)
def get_open_order_secs(open_orders):
    """Collect the ``Symbol`` of each open order; an empty or missing list gives []."""
    if not open_orders:
        return []
    return [order.Symbol for order in open_orders]
# ------------------------------------------------------------------------------
# init parameter registry
# ------------------------------------------------------------------------------
PARAMETER_REGISTRY = dict()


def register_param(name, value):
    """Remember ``value`` under ``name`` in PARAMETER_REGISTRY, then hand it back."""
    PARAMETER_REGISTRY.__setitem__(name, value)
    return value
# -----------------------------------------------------------------------------
# algorithm class
# -----------------------------------------------------------------------------
class TradingWithGMM(QCAlgorithm):
    """Long-only ETF strategy that buys on Gaussian-mixture regime signals.

    On Mondays and Fridays the algorithm fits a Gaussian mixture model to
    the daily log returns (60-bar lookback) of every symbol it is not
    invested in, queues a buy when ``make_final_pred_df`` flags the symbol,
    sends market orders for the queued symbols, and liquidates positions
    whose latest buy fill is older than the holding period.
    """

    def Initialize(self):
        """Configure dates, cash, brokerage, universe, parameters, chart and schedules."""
        self.INIT_PORTFOLIO_CASH = register_param('portfolio starting cash', 100000)
        self.SetStartDate(2007, 4, 10)
        self.SetEndDate(2025, 10, 13)  # Set End Date
        self.SetCash(self.INIT_PORTFOLIO_CASH)
        self.SetBrokerageModel(BrokerageName.ALPACA, AccountType.Margin)

        # Correct Symbol Handling: keep Symbol objects for every API call
        self.BASE_SYMBOL_TICKER = register_param('base symbol for algorithm management: ', 'SPY')
        self.tickers = ["SPY", "QQQ", "DIA", "TLT", "GLD", "EFA", "EEM", "BND", "VNQ"]
        self.symbols = []
        for ticker in self.tickers:
            security = self.AddEquity(ticker, Resolution.Minute)
            self.symbols.append(security.Symbol)
        self.BASE_SYMBOL = self.Securities[self.BASE_SYMBOL_TICKER].Symbol
        self.exchange = self.Securities[self.BASE_SYMBOL].Exchange
        self.openMarketOnOpenOrders = []
        self._init_prices = False
        self._longs = list()
        self._shorts = list()
        register_param('symbols: ', [s.Value for s in self.symbols])

        self._holding_period = register_param('holding period (days)', 30)
        # MODIFICATION 1: Set lookback to 60 days
        self.LOOKBACK = register_param('historical lookback (days)', 60)
        self.BET_SIZE = register_param('bet size (%)', 1/len(self.symbols))
        self.RANDOM_STATE = register_param('random_state', 777)
        self.ALPHA = register_param('gmm alpha', 0.95)
        # MODIFICATION 2: Set GMM components to 2 (run_main_algo labels the
        # probability columns 's1' and 's2', so it relies on this value)
        self.N_COMPONENTS = register_param('gmm n components', 2)
        self.THRES = register_param('threshold probability for buy signal', 0.9)
        self.SAMPLES = register_param('number of samples for bootstrap', 1000)
        self.HISTORY_RESOLUTION = Resolution.Daily
        register_param('history api resolution', str(self.HISTORY_RESOLUTION))

        # Charting setup
        self.splotName = 'Strategy Info'
        sPlot = Chart(self.splotName)
        sPlot.AddSeries(Series('RAM', SeriesType.Line, 0))
        sPlot.AddSeries(Series('Time', SeriesType.Line, 1))
        sPlot.AddSeries(Series('Cash', SeriesType.Line, 2))
        sPlot.AddSeries(Series('Leverage', SeriesType.Line, 3))
        self.AddChart(sPlot)
        self.time_to_run_main_algo = 0

        # Scheduled functions, in minutes after the base symbol's market open
        monday_friday_jobs = (
            (5, self.init_prices),
            (10, self.run_main_algo),
            (30, self.send_orders),
            (35, self.check_liquidate),
        )
        for minutes, handler in monday_friday_jobs:
            self.Schedule.On(
                self.DateRules.Every(DayOfWeek.Monday, DayOfWeek.Friday),
                self.TimeRules.AfterMarketOpen(self.BASE_SYMBOL, minutes),
                Action(handler),
            )
        self.Schedule.On(
            self.DateRules.EveryDay(self.BASE_SYMBOL),
            self.TimeRules.AfterMarketOpen(self.BASE_SYMBOL, 40),
            Action(self.CHART_RAM),
        )
        self.Debug('\n'+'-'*77+'\nPARAMETER REGISTRY\n{}...'.format(json.dumps(PARAMETER_REGISTRY, indent=2)))

    def init_prices(self):
        """Load the initial ``LOOKBACK`` window of closes into ``self.prices`` (once)."""
        if not self.symbols:
            self.Log('no symbols')
            return
        if self._init_prices:
            return
        history = self.History(self.symbols, self.LOOKBACK, self.HISTORY_RESOLUTION)
        # An empty history frame has no "close" column; indexing it would raise
        # KeyError. Leave the flag unset so the next scheduled call retries.
        if "close" not in history.columns:
            self.Log('init_prices >> history request returned no close data')
            return
        self.prices = history["close"].unstack(level=0).astype(np.float32)
        self._init_prices = True

    def update_prices(self):
        """Append any missing daily closes to ``self.prices``."""
        most_recent_date = self.prices.index.max()
        current_date = self.Time
        days_to_request = how_many_days(current_date, most_recent_date)
        if zero_days_to_request(days_to_request):
            return
        history = self.History(self.symbols, days_to_request, self.HISTORY_RESOLUTION)
        # Same guard as init_prices: no data means nothing to merge.
        if "close" not in history.columns:
            return
        new_prices = history["close"].unstack(level=0).astype(np.float32)
        self.prices = make_update_df(self.prices, new_prices, self.LOOKBACK)
        return

    def check_liquidate(self):
        """
        Liquidate holdings whose latest buy fill is older than the holding period.

        Uses GetOrderTickets() with manual filtering and compares order event
        status against the integer constant ORDER_STATUS_FILLED (3).
        """
        self.Log('\n'+'-'*77+'\n[{}] checking liquidation status...'.format(self.UtcTime))
        # Retrieve ALL order tickets once and materialize them, because the
        # collection is scanned again for every invested holding.
        all_tickets = list(self.Transactions.GetOrderTickets())
        for holding in self.Portfolio.Values:
            if not holding.Invested:
                continue
            # Filter all tickets to find those matching the current holding's Symbol
            symbol_tickets = [t for t in all_tickets if t.Symbol == holding.Symbol]
            latest_fill_event = None
            # Find the latest FILL event that was a BUY order (positive fill quantity)
            for ticket in symbol_tickets:
                fill_events = [e for e in ticket.OrderEvents if e.Status == ORDER_STATUS_FILLED and e.FillQuantity > 0]
                if fill_events:
                    current_latest_fill = max(fill_events, key=lambda x: x.UtcTime)
                    if latest_fill_event is None or current_latest_fill.UtcTime > latest_fill_event.UtcTime:
                        latest_fill_event = current_latest_fill
            if latest_fill_event:
                entry_time = latest_fill_event.UtcTime
                # Check if the current time is past the entry time + holding period
                if self.UtcTime >= (entry_time + timedelta(self._holding_period)):
                    self.Liquidate(holding.Symbol)
                    fmt_args = (self.UtcTime, holding.Symbol.Value, entry_time, self.UtcTime - entry_time)
                    self.Log('[{}] liquidating... {}, order date: {}, time delta: {}'.format(*fmt_args))
        return

    def run_main_algo(self):
        """Fit a GMM per uninvested symbol and collect buy candidates in ``self._longs``.

        Errors raised while processing one symbol are logged with Debug and
        the loop moves on to the next symbol.
        """
        self.Log('\n'+'-'*77+'\n[{}] Begin main algorithm computation...'.format(self.UtcTime))
        start_time = time.time()
        self._algo_data = False
        self._longs = list()
        self._shorts = list()
        # self.prices only exists after init_prices succeeded; without it
        # update_prices would raise AttributeError.
        if not self._init_prices:
            self.Log('run_main_algo >> prices not initialized, skipping...')
            return
        self.update_prices()
        for sym_obj in self.symbols:
            sym = sym_obj.Value
            try:
                self.Log('checking symbol: {}'.format(str(sym)))
                pred_rows = list()
                # Only run the GMM calculation for symbols we do not hold
                if not self.Portfolio[sym_obj].Invested:
                    if sym not in self.prices.columns:
                        continue
                    train_px = self.prices.copy()
                    train_ts = make_returns(train_px)[sym].dropna()
                    train_ts = train_ts[np.isfinite(train_ts)]
                    # Ensure enough data points for GMM
                    if train_ts.shape[0] < self.N_COMPONENTS + 1:
                        self.Debug('{} train data has too few samples (<{})'.format(str(sym), self.N_COMPONENTS + 1))
                        continue
                    tmp_X_train = train_ts.values.reshape(-1, 1)
                    ### fit GMM ###
                    gmm = make_gmm(n_components=self.N_COMPONENTS, random_state=self.RANDOM_STATE).fit(tmp_X_train)
                    hidden_states = gmm.predict(tmp_X_train)
                    hidden_state_prob = pd.DataFrame(gmm.predict_proba(tmp_X_train), columns=['s1', 's2'], index=train_ts.index)
                    state_df = train_ts.to_frame()
                    hs_prob_df = pd.concat([state_df, hidden_state_prob], axis=1)
                    # Returns observed while each state had probability above 0.5
                    s1_returns = hs_prob_df.query('abs(s1)>0.5')[sym]
                    s2_returns = hs_prob_df.query('abs(s2)>0.5')[sym]
                    s1_mu = s1_returns.mean()
                    s2_mu = s2_returns.mean()
                    s1_std = s1_returns.std()
                    s2_std = s2_returns.std()
                    # GMM state and interval calculation
                    last_state = hidden_states[-1]
                    last_mean = gmm.means_[last_state][0]
                    last_var = np.diag(gmm.covariances_[last_state])[0]
                    rvs = gmm.sample(self.SAMPLES)[0]
                    low_ci, high_ci = stats.norm.interval(confidence=self.ALPHA, loc=np.mean(rvs), scale=np.std(rvs))
                    tmp_ret = np.log(float(self.Securities[sym_obj].Price) / train_px[sym].iloc[-1])
                    # .iloc[-1, k] is explicit positional access; integer keys
                    # on a label-indexed Series (iloc[-1][k]) are deprecated.
                    row = (train_ts.index[-1], last_state, last_mean, np.sqrt(last_var),
                           low_ci, high_ci, tmp_ret,
                           gmm.means_.ravel()[0], gmm.means_.ravel()[1],
                           np.sqrt(np.diag(gmm.covariances_[0]))[0], np.sqrt(np.diag(gmm.covariances_[1]))[0],
                           hidden_state_prob.iloc[-1, 0], hidden_state_prob.iloc[-1, 1],
                           s1_mu, s2_mu, s1_std, s2_std)
                    pred_rows.append(row)
                    self.Debug('{} rowzz:\n{}'.format(str(sym), row))
                if pred_rows:
                    cols = ['Dates', 'ith_state', 'ith_ret', 'ith_std', 'low_ci', 'high_ci', 'current_return',
                            'last_mean_class_0', 'last_mean_class_1', 'last_std_class_0', 'last_std_class_1',
                            'last_prob_class_0', 'last_prob_class_1', 'avg_class_0_mean', 'avg_class_1_mean',
                            'avg_class_0_std', 'avg_class_1_std']
                    pred_df = make_final_pred_df(pred_rows, cols, self.THRES, sym)
                    if pred_df.iloc[-1].loc['buys'] == 1:
                        self._longs.append(sym_obj)
                        self.Debug('>>> BUY SIGNAL GENERATED for {} (Prob > {}) <<<'.format(sym, self.THRES))
                else:
                    self.Debug('missing or invested in {}'.format(sym))
            except Exception as e:
                # best effort: one failing symbol must not stop the others
                self.Debug('{} error: {}'.format(sym, e))
                continue
        self.Debug('Final Longs List for next order run: {}'.format([s.Value for s in self._longs]))
        self.time_to_run_main_algo = time.time() - start_time
        self.Plot(self.splotName, 'Time', self.time_to_run_main_algo)
        return

    def send_orders(self):
        """Send a market order sized to ``BET_SIZE`` for each queued, uninvested long."""
        self.Log('\n'+'-'*77+'\n[{}] checking buy sell arrays to send orders...'.format(self.UtcTime))
        if self._longs:
            for sym_obj in self._longs:
                if not self.Portfolio[sym_obj].Invested:
                    # Use MarketOrder for execution 30 minutes after open
                    self.Log('[{}] SENDING MARKET ORDER for {}...'.format(self.UtcTime, sym_obj.Value))
                    self.MarketOrder(sym_obj, self.CalculateOrderQuantity(sym_obj, self.BET_SIZE))
                else:
                    self.Debug('Skipping {} - Already invested.'.format(sym_obj.Value))
        else:
            self.Log('send_orders >> no longs listed, no orders sent...')
        return

    def OnData(self, data):
        """Data event handler; unused because the scheduled functions do the work."""
        pass

    def CHART_RAM(self):
        """Plot memory usage, account leverage and cash on the strategy-info chart."""
        self.Plot(self.splotName, 'RAM', OS.ApplicationMemoryUsed/1024.)
        P = self.Portfolio
        if P.TotalPortfolioValue != 0:
            self.track_account_leverage = P.TotalAbsoluteHoldingsCost/P.TotalPortfolioValue
        # The attribute is never assigned while the portfolio value is zero;
        # fall back to 0.0 instead of raising AttributeError.
        leverage = getattr(self, 'track_account_leverage', 0.0)
        self.Plot(self.splotName, 'Leverage', float(leverage))
        self.Plot(self.splotName, 'Cash', float(self.Portfolio.Cash))
        return
#region imports
from AlgorithmImports import *
#endregion
import pandas as pd
import numpy as np
from math import ceil, floor
import scipy.stats as stats
import sklearn.mixture as mix
from datetime import datetime, timedelta
import time
import decimal as d
import json
#######################################################################
# gmm functions
#######################################################################
def make_gmm(n_components=None, max_iter=150, random_state=None):
    """fn: build an unfitted GaussianMixture using 100 random initializations"""
    return mix.GaussianMixture(
        n_components=n_components,
        max_iter=max_iter,
        n_init=100,
        init_params='random',
        random_state=random_state,
    )
def make_returns(df):
    """fn: log returns between consecutive rows, with NaN rows dropped"""
    price_ratio = df / df.shift(1)
    log_returns = np.log(price_ratio)
    return log_returns.dropna()
#######################################################################
# pred df functions
#######################################################################
def in_range(df):
    """fn: add binary column marking rows whose return lies strictly inside the CI"""
    hits = df.query("low_ci < current_return < high_ci").index
    flags = [int(label in hits) for label in df.index]
    return df.assign(in_range=flags)
def get_state_prob(df):
    """fn: add ``state_prob``, the probability of each row's own state

    For every row the value is taken from ``last_prob_class_<ith_state>``.
    The input frame is not modified; a new frame is returned.
    """
    prob_cols = df[['ith_state', 'last_prob_class_0', 'last_prob_class_1']]
    # Plain attribute lookup replaces pd.eval on a formatted string. int()
    # also lets float-typed state labels (0.0, 1.0) resolve to a column name.
    state_prob = [
        getattr(row, f'last_prob_class_{int(row.ith_state)}')
        for row in prob_cols.itertuples()
    ]
    return df.assign(state_prob=state_prob)
def get_outlier_direction(df):
    """fn: add ``direction``: 'too_high' when current_return > high_ci, else 'too_low'"""
    labels = [
        'too_high' if actual > upper else 'too_low'
        for upper, actual in df[['high_ci', 'current_return']].itertuples(index=False)
    ]
    return df.assign(direction=labels)
def buys(df, thres=0.5):
    """fn: add binary ``buys`` column

    A row is a buy when its state agrees with the sign of ``mu_diff``
    (state 0 with mu_diff > 0, or state 1 with mu_diff < 0), it is in
    range, its state probability exceeds ``thres`` and its direction is
    'too_low'.
    """
    signals = []
    for row in df.itertuples():
        state_matches = (
            (row.ith_state == 0 and row.mu_diff > 0)
            or (row.ith_state == 1 and row.mu_diff < 0)
        )
        is_buy = (
            state_matches
            and row.in_range == 1
            and row.state_prob > thres
            and row.direction == 'too_low'
        )
        signals.append(1 if is_buy else 0)
    return df.assign(buys=signals)
def make_final_pred_df(pred_rows, cols, thres, sym):
    """fn: build the prediction frame and run the signal pipeline on it

    Adds ``mu_diff`` and ``std_diff`` (class 0 minus class 1), then the
    in-range flag, state probability, outlier direction and buy columns.
    The result is indexed by ``Dates`` and also keeps ``Dates`` as a column.
    ``sym`` is accepted but not used.
    """
    frame = pd.DataFrame(pred_rows, columns=cols)
    frame = frame.assign(mu_diff=frame.avg_class_0_mean - frame.avg_class_1_mean)
    frame = frame.assign(std_diff=frame.avg_class_0_std - frame.avg_class_1_std)
    frame = in_range(frame)
    frame = get_state_prob(frame)
    frame = get_outlier_direction(frame)
    frame = buys(frame, thres=thres)
    frame = frame.set_index('Dates')
    return frame.assign(Dates=frame.index)
#######################################################################
# updating historical timeseries dataframes
#######################################################################
def how_many_days(current_date, most_recent_date):
    """compute how many days to request from history api

    # args: both are datetime objects
    # returns: whole days between the two dates
    """
    gap = current_date - most_recent_date
    return gap.days
def zero_days_to_request(days_to_request):
    """check whether there are no days to request from the history api

    # args
        days_to_request: numeric gap in days
    # returns
        True when nothing is missing (zero or negative gap), else False
    """
    # Always return a real bool (the old version fell through to None) and
    # treat a negative gap as up to date, so a non-positive count is never
    # passed on to a history request.
    return days_to_request <= 0
def make_update_df(old, new, lookback):
    """combines and cleans numeric timeseries dataframes
    for updates

    # args
        old, new: pandas dataframes
        lookback: numeric
    # returns
        both: combined dataframe, one row per index label,
              limited to the last `lookback` rows
    """
    # combine datasets
    both = pd.concat([old, new])
    # Keep one row per index label. Value-based drop_duplicates() would
    # remove rows that merely repeat earlier values and would keep the same
    # label twice if its values changed; keep="last" prefers rows from `new`.
    both = both[~both.index.duplicated(keep="last")]
    # keep only lookback period
    return both.sort_index().iloc[-lookback:]
#######################################################################
# order execution functions
#######################################################################
def get_open_order_secs(open_orders):
    """func to return the Symbol of every open order,
    or an empty list when the open order list is not populated
    """
    if not open_orders:
        return []
    return [order.Symbol for order in open_orders]