| Overall Statistics |
|
Total Orders 869 Average Win 0.59% Average Loss -0.69% Compounding Annual Return 2.698% Drawdown 18.300% Expectancy 0.171 Start Equity 100000 End Equity 163714.23 Net Profit 63.714% Sharpe Ratio 0.009 Sortino Ratio 0.008 Probabilistic Sharpe Ratio 0.023% Loss Rate 37% Win Rate 63% Profit-Loss Ratio 0.85 Alpha -0.018 Beta 0.281 Annual Standard Deviation 0.063 Annual Variance 0.004 Information Ratio -0.522 Tracking Error 0.125 Treynor Ratio 0.002 Total Fees $0.00 Estimated Strategy Capacity $11000000.00 Lowest Capacity Asset TLT SGNKIKYGE9NP Portfolio Turnover 1.83% Drawdown Recovery 3119 |
#region imports
from AlgorithmImports import *
#endregion
import pandas as pd
import numpy as np
from math import ceil, floor
import scipy.stats as stats
import sklearn.mixture as mix
from datetime import datetime, timedelta
import time
import decimal as d
import json
#######################################################################
# gmm functions
#######################################################################
def make_gmm(n_components=None, max_iter=150, random_state=None):
    """fn: build an unfitted gaussian mixture estimator

    # args
        n_components: number of mixture components
        max_iter: maximum EM iterations per initialisation
        random_state: seed forwarded to the estimator
    # returns
        unfitted `mix.GaussianMixture` configured with 100 random
        initialisations (`n_init=100`, `init_params='random'`)
    """
    return mix.GaussianMixture(
        n_components=n_components,
        max_iter=max_iter,
        n_init=100,
        init_params='random',
        random_state=random_state,
    )
def make_returns(df):
    """fn: log returns of a price frame/series, NaN rows removed"""
    # price(t) / price(t-1); the first row has no predecessor and becomes NaN
    previous = df.shift(1)
    log_returns = np.log(df / previous)
    return log_returns.dropna()
#######################################################################
# pred df functions
#######################################################################
def in_range(df):
    """fn: flag rows whose current return lies strictly inside the CI

    Adds an integer `in_range` column: 1 when the row's index label belongs
    to a row satisfying `low_ci < current_return < high_ci`, else 0.
    """
    inside = df.query("low_ci < current_return < high_ci")
    # membership is decided by index label, exactly like the label lookup
    flags = df.index.isin(inside.index).astype(int)
    return df.assign(in_range=flags)
def get_state_prob(df):
    """fn: add the probability of each row's own predicted state

    For every row, picks `last_prob_class_<ith_state>` and stores it in a
    new `state_prob` column.

    # args
        df: frame with `ith_state`, `last_prob_class_0`, `last_prob_class_1`
    # returns
        copy of `df` with the extra `state_prob` column
    """
    needed = df[['ith_state', 'last_prob_class_0', 'last_prob_class_1']]
    # Plain attribute lookup instead of evaluating a formatted expression
    # string; the int cast keeps a float-typed state (0.0 / 1.0) working.
    state_prob = [
        getattr(row, 'last_prob_class_{}'.format(int(row.ith_state)))
        for row in needed.itertuples()
    ]
    return df.assign(state_prob=state_prob)
def get_outlier_direction(df):
    """fn: label each row by where the current return sits vs. `high_ci`

    Adds a `direction` column: 'too_high' when `current_return > high_ci`,
    otherwise 'too_low'.
    """
    labels = [
        'too_high' if actual > upper else 'too_low'
        for upper, actual in zip(df['high_ci'], df['current_return'])
    ]
    return df.assign(direction=labels)
def buys(df, thres=0.5):
    """fn: add a binary `buys` column

    A row is a buy (1) when the state/mean pairing matches (state 0 with
    `mu_diff > 0`, or state 1 with `mu_diff < 0`) and, in addition,
    `in_range == 1`, `state_prob > thres` and `direction == 'too_low'`.
    Every other row gets 0.
    """
    def _is_buy(row):
        # the two original branches differ only in this state/mean pairing
        pairing = ((row.ith_state == 0 and row.mu_diff > 0)
                   or (row.ith_state == 1 and row.mu_diff < 0))
        return (pairing
                and row.in_range == 1
                and row.state_prob > thres
                and row.direction == 'too_low')

    signals = [1 if _is_buy(row) else 0 for row in df.itertuples()]
    return df.assign(buys=signals)
def make_final_pred_df(pred_rows, cols, thres, sym):
    """fn: turn raw prediction rows into the signal dataframe

    # args
        pred_rows: iterable of row tuples
        cols: column names matching each tuple
        thres: probability threshold forwarded to `buys`
        sym: accepted for interface compatibility; not used here
    # returns
        frame indexed by 'Dates' (also kept as a column) with the derived
        `mu_diff`, `std_diff`, `in_range`, `state_prob`, `direction` and
        `buys` columns
    """
    frame = pd.DataFrame(pred_rows, columns=cols)
    frame = frame.assign(mu_diff=frame.avg_class_0_mean - frame.avg_class_1_mean)
    frame = frame.assign(std_diff=frame.avg_class_0_std - frame.avg_class_1_std)
    frame = in_range(frame)
    frame = get_state_prob(frame)
    frame = get_outlier_direction(frame)
    frame = buys(frame, thres=thres)
    frame = frame.set_index('Dates')
    # keep the dates available as a regular column as well as the index
    return frame.assign(Dates=frame.index)
#######################################################################
# updating historical timeseries dataframes
#######################################################################
def how_many_days(current_date, most_recent_date):
    """whole days elapsed between the two dates, used to size the
    history api request
    # args: both are datetime objects
    """
    elapsed = current_date - most_recent_date
    return elapsed.days
def zero_days_to_request(days_to_request):
    """check whether there is nothing to request from the history api

    # args
        days_to_request: numeric day count
    # returns
        True when no (zero or a negative number of) days are missing,
        False otherwise. The caller decides what to do; this function
        itself never exits anything.
    """
    # Always return a real bool (previously fell through to None), and
    # treat a non-positive count as "nothing to request".
    return days_to_request <= 0
def make_update_df(old, new, lookback):
    """combines and cleans numeric timeseries dataframes
    for updates
    # args
        old, new: pandas dataframes sharing the same kind of index
        lookback: numeric, number of most recent rows to keep
    # returns
        both: combined dataframe with one row per index label (the row
            from `new` wins on overlap), sorted by index and trimmed to
            the last `lookback` rows
    """
    # combine datasets
    both = pd.concat([old, new])
    # De-duplicate on the index, not on the row values: two different dates
    # with identical prices must both survive, while a date present in both
    # inputs must appear only once (latest version kept).
    both = both[~both.index.duplicated(keep='last')]
    # clean it up and keep only lookback period
    return both.sort_index().iloc[-lookback:]
#######################################################################
# order execution functions
#######################################################################
def get_open_order_secs(open_orders):
    """func to collect the `Symbol` of every open order

    Returns an empty list when `open_orders` is empty or otherwise falsy.
    """
    if not open_orders:  # nothing open
        return []
    return [pending.Symbol for pending in open_orders]
# The explicit AddReference is often not strictly needed but is kept for compatibility.
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Indicators")
AddReference("QuantConnect.Common")
from System import *
from QuantConnect import *
from QuantConnect.Brokerages import BrokerageName
from QuantConnect import AccountType
from QuantConnect.Algorithm import *
from QuantConnect.Indicators import *
from AlgorithmImports import *
import pandas as pd
import numpy as np
from math import ceil, floor
import scipy.stats as stats
import sklearn.mixture as mix
from datetime import datetime, timedelta
import time
import decimal as d
import json
from sklearn import mixture as mix
# Integer that check_liquidate compares against each order event's Status to
# recognise filled orders.
# NOTE(review): presumably mirrors the numeric value of Lean's "Filled" order
# status; the earlier note called the enum "OrderEventStatus" - confirm the
# enum name and value against the Lean API before relying on it.
ORDER_STATUS_FILLED = 3
# ------------------------------------------------------------------------------
# --- ALGO UTILS FUNCTIONS (INTEGRATED) ---
# ------------------------------------------------------------------------------
def how_many_days(current_date, most_recent_date):
    """Return the whole-day gap between the two dates plus one."""
    gap = current_date - most_recent_date
    whole_days = int(gap.days)
    return whole_days + 1
def zero_days_to_request(days_to_request):
    """Tell whether fewer than one day of history is missing."""
    nothing_missing = days_to_request < 1
    return nothing_missing
def make_update_df(old, new, lookback):
    """Combine and clean numeric timeseries dataframes for updates.

    Args:
        old: existing dataframe.
        new: freshly requested dataframe; its rows win when an index label
            appears in both inputs.
        lookback: number of most recent rows to keep.

    Returns:
        The combined frame with one row per index label, sorted by index
        and trimmed to the last ``lookback`` rows.
    """
    both = pd.concat([old, new])
    # De-duplicate on the index rather than on row values: distinct dates
    # that happen to share prices must be kept, and a date present in both
    # inputs must appear only once (latest version kept).
    both = both[~both.index.duplicated(keep='last')]
    return both.sort_index().iloc[-lookback:]
def make_returns(df):
    """Compute log returns of a price frame/series (leading NaN row kept)."""
    # ratio of each price to the one before it; the first row has none
    price_ratio = df / df.shift(1)
    return np.log(price_ratio)
def make_gmm(n_components, random_state):
    """Create an unfitted GaussianMixture with the given size and seed."""
    settings = {
        'n_components': n_components,
        'random_state': random_state,
    }
    return mix.GaussianMixture(**settings)
def make_final_pred_df(pred_rows, cols, thres, sym):
    """Build the prediction frame and derive the buy flag.

    The class whose mean (taken from the first row) is larger is treated as
    the high-return state; ``buys`` is 1 where that class's probability
    exceeds ``thres`` and 0 elsewhere. Only the last row is returned.
    """
    frame = pd.DataFrame(pred_rows, columns=cols)
    frame['symbol'] = sym

    # Pick the probability column of whichever class has the higher mean.
    class_1_is_high = (frame['last_mean_class_1'].iloc[0]
                       > frame['last_mean_class_0'].iloc[0])
    prob_col = 'last_prob_class_1' if class_1_is_high else 'last_prob_class_0'

    frame['buys'] = np.where(frame[prob_col] > thres, 1, 0)
    return frame.tail(1)
# --- Other Utilities (Remaining functions from algo_utils, unused by main logic) ---
def calc_quantile_var(data, alpha=0.05):
    """Return the ``alpha`` quantile of ``data``."""
    return data.quantile(q=alpha)
def calc_historical_var(data, alpha=0.05):
    """Return the ``alpha`` quantile of ``data``, squeezing a DataFrame first."""
    series = data.squeeze() if isinstance(data, pd.DataFrame) else data
    return calc_quantile_var(series, alpha=alpha)
def get_open_order_secs(open_orders):
    """Return the ``Symbol`` of each open order, or ``[]`` when there are none."""
    return [pending.Symbol for pending in open_orders] if open_orders else []
# ------------------------------------------------------------------------------
# init parameter registry
# ------------------------------------------------------------------------------
# name -> value record of every parameter registered through register_param
PARAMETER_REGISTRY = dict()


def register_param(name, value):
    """Record ``value`` under ``name`` in PARAMETER_REGISTRY and hand it back.

    Registering an existing name overwrites the earlier value.
    """
    PARAMETER_REGISTRY.update({name: value})
    return value
# -----------------------------------------------------------------------------
# algorithm class
# -----------------------------------------------------------------------------
class TradingWithGMM(QCAlgorithm):
    """Long-only strategy driven by a two-component Gaussian mixture model.

    On the scheduled days the algorithm, for every symbol it is not invested
    in, fits a GMM to the symbol's daily log returns, builds a one-row
    prediction frame and queues the symbol as a long when
    `make_final_pred_df` flags it as a buy. Queued longs are bought with
    market orders and liquidated once the configured holding period has
    elapsed since the latest buy fill.
    """

    def Initialize(self):
        """Register parameters, subscribe to the universe and schedule the jobs."""
        self.INIT_PORTFOLIO_CASH = register_param('portfolio starting cash', 100000)
        self.SetStartDate(2007, 4, 10)
        #self.SetEndDate(2019,12,31)
        self.SetCash(self.INIT_PORTFOLIO_CASH)
        #self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage,
        #                       AccountType.Margin)
        self.set_brokerage_model(BrokerageName.ALPACA, AccountType.Margin)

        # Correct Symbol Handling
        self.BASE_SYMBOL_TICKER = register_param('base symbol for algorithm management: ', 'SPY')
        self.tickers = [self.BASE_SYMBOL_TICKER, "QQQ", "DIA", "TLT", "GLD", "EFA", "EEM"]
        self.symbols = []
        for ticker in self.tickers:
            security = self.AddEquity(ticker, Resolution.Minute)
            self.symbols.append(security.Symbol)
        self.BASE_SYMBOL = self.Securities[self.BASE_SYMBOL_TICKER].Symbol
        self.exchange = self.Securities[self.BASE_SYMBOL].Exchange

        self.openMarketOnOpenOrders = []
        self._init_prices = False  # flipped to True once init_prices has loaded history
        self._longs = list()
        self._shorts = list()

        register_param('symbols: ', [s.Value for s in self.symbols])
        self._holding_period = register_param('holding period (days)', 30)
        self.LOOKBACK = register_param('historical lookback (days)', 252*3)
        self.BET_SIZE = register_param('bet size (%)', 1/len(self.symbols))
        self.RANDOM_STATE = register_param('random_state', 777)
        self.ALPHA = register_param('gmm alpha', 0.95)
        self.N_COMPONENTS = register_param('gmm n components', 2)
        self.THRES = register_param('threshold probability for buy signal', 0.9)  # Reverted to 0.9
        self.SAMPLES = register_param('number of samples for bootstrap', 1000)
        self.HISTORY_RESOLUTION = Resolution.Daily
        register_param('history api resolution', str(self.HISTORY_RESOLUTION))

        # Charting setup remains the same
        self.splotName = 'Strategy Info'
        sPlot = Chart(self.splotName)
        sPlot.AddSeries(Series('RAM', SeriesType.Line, 0))
        sPlot.AddSeries(Series('Time', SeriesType.Line, 1))
        sPlot.AddSeries(Series('Cash', SeriesType.Line, 2))
        sPlot.AddSeries(Series('Leverage', SeriesType.Line, 3))
        self.AddChart(sPlot)
        self.time_to_run_main_algo = 0

        # Scheduled functions: the offsets (minutes after the open) fix the
        # order in which the jobs run: prices -> signals -> orders -> liquidation.
        self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday, DayOfWeek.Friday),
                         self.TimeRules.AfterMarketOpen(self.BASE_SYMBOL, 5),
                         Action(self.init_prices))
        self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday, DayOfWeek.Friday),
                         self.TimeRules.AfterMarketOpen(self.BASE_SYMBOL, 10),
                         Action(self.run_main_algo))
        # Changed to MarketOrder for execution at T+30 min (see send_orders)
        self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday, DayOfWeek.Friday),
                         self.TimeRules.AfterMarketOpen(self.BASE_SYMBOL, 30),
                         Action(self.send_orders))
        self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday, DayOfWeek.Friday),
                         self.TimeRules.AfterMarketOpen(self.BASE_SYMBOL, 35),
                         Action(self.check_liquidate))
        self.Schedule.On(self.DateRules.EveryDay(self.BASE_SYMBOL),
                         self.TimeRules.AfterMarketOpen(self.BASE_SYMBOL, 40),
                         Action(self.CHART_RAM))

        self.Debug('\n'+'-'*77+'\nPARAMETER REGISTRY\n{}...'.format(json.dumps(PARAMETER_REGISTRY, indent=2)))

    def init_prices(self):
        """Load the initial close-price history into `self.prices` (runs once)."""
        if not self.symbols:
            self.Log('no symbols')
            return
        if self._init_prices:
            return
        self.prices = (self.History(self.symbols, self.LOOKBACK, self.HISTORY_RESOLUTION)["close"]
                       .unstack(level=0)
                       .astype(np.float32))
        self._init_prices = True

    def update_prices(self):
        """Append the bars missing from `self.prices`, keeping LOOKBACK rows."""
        if not self._init_prices:
            # self.prices does not exist until init_prices has succeeded;
            # load it now instead of failing on the attribute access below.
            self.init_prices()
            return
        most_recent_date = self.prices.index.max()
        current_date = self.Time
        days_to_request = how_many_days(current_date, most_recent_date)
        if zero_days_to_request(days_to_request):
            return
        new_prices = (self.History(self.symbols, days_to_request, self.HISTORY_RESOLUTION)["close"]
                      .unstack(level=0)
                      .astype(np.float32))
        self.prices = make_update_df(self.prices, new_prices, self.LOOKBACK)
        return

    def check_liquidate(self):
        """Liquidate holdings whose holding period has expired.

        For every invested holding, the latest filled order event with a
        positive fill quantity is looked up across all order tickets of that
        symbol; the position is liquidated once the current UTC time is at
        least `_holding_period` days past that fill.
        """
        self.Log('\n'+'-'*77+'\n[{}] checking liquidation status...'.format(self.UtcTime))
        # Retrieve ALL order tickets once
        all_tickets = self.Transactions.GetOrderTickets()
        for holding in self.Portfolio.Values:
            if not holding.Invested:
                continue
            # Filter all tickets to find those matching the current holding's Symbol
            symbol_tickets = [t for t in all_tickets if t.Symbol == holding.Symbol]
            latest_fill_event = None
            # Find the latest FILL event that was a BUY order
            for ticket in symbol_tickets:
                # Status is compared against the module-level integer constant
                fill_events = [e for e in ticket.OrderEvents
                               if e.Status == ORDER_STATUS_FILLED and e.FillQuantity > 0]
                if fill_events:
                    # Find the latest fill event across all tickets
                    current_latest_fill = max(fill_events, key=lambda x: x.UtcTime)
                    if latest_fill_event is None or current_latest_fill.UtcTime > latest_fill_event.UtcTime:
                        latest_fill_event = current_latest_fill
            if latest_fill_event:
                entry_time = latest_fill_event.UtcTime
                # Check if the current time is past the entry time + holding period
                if self.UtcTime >= (entry_time + timedelta(days=self._holding_period)):
                    self.Liquidate(holding.Symbol)
                    fmt_args = (self.UtcTime, holding.Symbol.Value, entry_time, self.UtcTime - entry_time)
                    self.Log('[{}] liquidating... {}, order date: {}, time delta: {}'.format(*fmt_args))
        return

    def run_main_algo(self):
        """Refresh prices, fit a GMM per non-invested symbol and queue the longs.

        Rebuilds `self._longs` from scratch on every run. Any exception raised
        while processing a symbol is reported through `Debug` and that symbol
        is skipped.
        """
        self.Log('\n'+'-'*77+'\n[{}] Begin main algorithm computation...'.format(self.UtcTime))
        start_time = time.time()
        self.update_prices()
        self._algo_data = False
        self._longs = list()
        self._shorts = list()
        for sym_obj in self.symbols:
            sym = sym_obj.Value
            try:
                self.Log('checking symbol: {}'.format(str(sym)))
                pred_rows = list()
                # Check if we should perform the GMM calculation
                if (not self.Portfolio[sym_obj].Invested):
                    if sym not in self.prices.columns:
                        continue
                    train_px = self.prices.copy()
                    train_ts = make_returns(train_px)[sym].dropna()
                    train_ts = train_ts[np.isfinite(train_ts)]
                    # too little history to fit on
                    if train_ts.shape[0] < 50:
                        continue
                    tmp_X_train = train_ts.values.reshape(-1, 1)

                    ### fit GMM ###
                    gmm = make_gmm(n_components=self.N_COMPONENTS, random_state=self.RANDOM_STATE).fit(tmp_X_train)
                    hidden_states = gmm.predict(tmp_X_train)
                    hidden_state_prob = pd.DataFrame(gmm.predict_proba(tmp_X_train),
                                                     columns=['s1', 's2'], index=train_ts.index)
                    state_df = train_ts.to_frame()
                    hs_prob_df = (pd.concat([state_df, hidden_state_prob], axis=1))

                    # mean / std of the returns on days where each state's
                    # probability exceeds 0.5
                    s1_mu = hs_prob_df.query('abs(s1)>0.5')[sym].mean()
                    s2_mu = hs_prob_df.query('abs(s2)>0.5')[sym].mean()
                    s1_std = hs_prob_df.query('abs(s1)>0.5')[sym].std()
                    s2_std = hs_prob_df.query('abs(s2)>0.5')[sym].std()

                    # GMM state and interval calculation
                    last_state = hidden_states[-1]
                    last_mean = gmm.means_[last_state][0]
                    last_var = np.diag(gmm.covariances_[last_state])[0]
                    rvs = gmm.sample(self.SAMPLES)[0]
                    # Corrected argument name for scipy.stats compatibility
                    low_ci, high_ci = stats.norm.interval(confidence=self.ALPHA,
                                                          loc=np.mean(rvs), scale=np.std(rvs))
                    # log return from the last stored close to the current price
                    tmp_ret = np.log(float(self.Securities[sym_obj].Price) / train_px[sym].iloc[-1])

                    ### Row creation is now safely inside the calculation block
                    # .iloc[-1, k] reads the last row by position; indexing the
                    # row Series with a bare integer would be a label lookup
                    # on the 's1'/'s2' labels in current pandas.
                    row = (train_ts.index[-1], last_state, last_mean, np.sqrt(last_var),
                           low_ci, high_ci, tmp_ret,
                           gmm.means_.ravel()[0], gmm.means_.ravel()[1],
                           np.sqrt(np.diag(gmm.covariances_[0]))[0], np.sqrt(np.diag(gmm.covariances_[1]))[0],
                           hidden_state_prob.iloc[-1, 0], hidden_state_prob.iloc[-1, 1],
                           s1_mu, s2_mu, s1_std, s2_std)
                    pred_rows.append(row)
                    self.Debug('{} rowzz:\n{}'.format(str(sym), row))

                if pred_rows:
                    cols = ['Dates', 'ith_state', 'ith_ret', 'ith_std', 'low_ci', 'high_ci', 'current_return',
                            'last_mean_class_0', 'last_mean_class_1', 'last_std_class_0', 'last_std_class_1',
                            'last_prob_class_0', 'last_prob_class_1', 'avg_class_0_mean', 'avg_class_1_mean',
                            'avg_class_0_std', 'avg_class_1_std']
                    pred_df = make_final_pred_df(pred_rows, cols, self.THRES, sym)
                    if pred_df.iloc[-1].loc['buys'] == 1:
                        self._longs.append(sym_obj)
                        self.Debug('>>> BUY SIGNAL GENERATED for {} (Prob > {}) <<<'.format(sym, self.THRES))
                else:
                    # no row was produced, i.e. the symbol is already held
                    self.Debug('missing or invested in {}'.format(sym))
            except Exception as e:
                # best effort: report and move on to the next symbol
                self.Debug('{} error: {}'.format(sym, e))
                continue
        self.Debug('Final Longs List for next order run: {}'.format([s.Value for s in self._longs]))
        self.time_to_run_main_algo = time.time() - start_time
        self.Plot(self.splotName, 'Time', self.time_to_run_main_algo)
        return

    def send_orders(self):
        """Send a market order for every queued long that is not yet held."""
        self.Log('\n'+'-'*77+'\n[{}] checking buy sell arrays to send orders...'.format(self.UtcTime))
        if self._longs:
            for sym_obj in self._longs:
                if not self.Portfolio[sym_obj].Invested:
                    # Use MarketOrder for execution 30 minutes after open
                    self.Log('[{}] SENDING MARKET ORDER for {}...'.format(self.UtcTime, sym_obj.Value))
                    self.MarketOrder(sym_obj, self.CalculateOrderQuantity(sym_obj, self.BET_SIZE))
                else:
                    self.Debug('Skipping {} - Already invested.'.format(sym_obj.Value))
        else:
            self.Log('send_orders >> no longs listed, no orders sent...')
        return

    def OnData(self, data):
        """Unused: all work happens in the scheduled functions."""
        pass

    def CHART_RAM(self):
        """Plot memory use, leverage and cash on the 'Strategy Info' chart."""
        self.Plot(self.splotName, 'RAM', OS.ApplicationMemoryUsed/1024.)
        P = self.Portfolio
        # leverage is undefined for a zero-value portfolio, so skip that series
        if P.TotalPortfolioValue != 0:
            self.track_account_leverage = P.TotalAbsoluteHoldingsCost/P.TotalPortfolioValue
            self.Plot(self.splotName, 'Leverage', float(self.track_account_leverage))
        self.Plot(self.splotName, 'Cash', float(self.Portfolio.Cash))
        return