| Overall Statistics |
|
Total Orders 628 Average Win 0.30% Average Loss -0.28% Compounding Annual Return -0.645% Drawdown 9.400% Expectancy -0.028 Start Equity 10000000 End Equity 9728552.12 Net Profit -2.714% Sharpe Ratio -0.704 Sortino Ratio -0.461 Probabilistic Sharpe Ratio 0.308% Loss Rate 53% Win Rate 47% Profit-Loss Ratio 1.06 Alpha -0.028 Beta 0.031 Annual Standard Deviation 0.037 Annual Variance 0.001 Information Ratio -0.638 Tracking Error 0.181 Treynor Ratio -0.833 Total Fees $69598.74 Estimated Strategy Capacity $59000000.00 Lowest Capacity Asset AAPL R735QTJ8XC9X Portfolio Turnover 12.10% |
from AlgorithmImports import *
from QuantConnect.Securities.Option import OptionPriceModels
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from scipy import poly1d, signal
from datetime import datetime, timedelta
import pytz as pytz
from sklearn.metrics import r2_score, mean_absolute_error
from hmmlearn import hmm
from scipy.signal import stft, gaussian, butter, filtfilt, detrend, medfilt, savgol_filter
from scipy.fft import fft
from scipy.interpolate import interp1d
from numpy import quantile
from inspect import signature
from statsmodels.nonparametric.smoothers_lowess import lowess
def SST_J(x, Fs, hlength=None, hop=1, n=None, hf=np.inf, lf=0, ths=1):
if hlength is None:
raise ValueError("Select a window length.")
if Fs is None:
raise ValueError("Select a sampling rate.")
# Window bandwidth
sigma = 0.15
# Do reassignment
squeeze_flag = True
# Organize input
x = np.array(x).flatten()
if np.any(np.isnan(x)):
x = interp1d(np.where(~np.isnan(x))[0], x[~np.isnan(x)], kind='pchip', fill_value="extrapolate")(np.arange(len(x)))
# Time (samples)
NN = len(x)
t = np.arange(1, NN + 1, hop)
tcol = len(t)
# N-point fft
n = int(n)
n = n + 1 - n % 2
N = 2 * (n - 1)
N = int(N)
# Make sure window length is odd
hlength = hlength + 1 - hlength % 2
Lh = (hlength - 1) // 2
# Gaussian window and its derivative
ex = np.linspace(-0.5, 0.5, hlength)
h = np.exp(-ex**2 / (2 * sigma**2))
dh = -ex / sigma**2 * h
# Perform convolution
tfr = np.zeros((N, tcol))
tfr2 = np.zeros((N, tcol))
for icol in range(tcol):
ti=t[icol]
tau = np.arange(-min(n - 1, Lh, ti - 1), min(n - 1, Lh, NN - ti) + 1)
indices = np.remainder(N + tau, N)
rSig = x[ti - 1 + tau]
tfr[indices, icol] = rSig * h[Lh + tau]
tfr2[indices, icol] = rSig * dh[Lh + tau]
# Fourier transform
tfr = fft(tfr, axis=0)
tfr = tfr[:n, :]
# Non-negative frequency axis
frequency = Fs / 2 * np.linspace(0, 1, n)
if squeeze_flag:
tfr2 = fft(tfr2, axis=0)
tfr2 = tfr2[:n, :]
# Crop output
u = frequency <= hf
tfr = tfr[u, :]
frequency = frequency[u]
if not squeeze_flag:
return tfr, tfr, frequency
# Crop
tfr2 = tfr2[u, :]
# Reassignment threshold
ths = quantile(np.abs(tfr), (1 - ths),axis=0)
# Omega operator
neta = len(frequency)
omega = -np.inf * np.ones((neta, tcol))
ups = np.abs(tfr) > ths
omega[ups] = np.round(N / hlength * np.imag(tfr2[ups] / tfr[ups] / (2 * np.pi)))
# Mapped out of range
index = np.tile(np.arange(1, neta + 1).reshape(-1, 1), (1, tcol))
omega = index - omega
id = (omega < 1) | (omega > neta) | ~ups
omega[id] = index[id]
sst = tfr
sst[id] = 0
# Reassignment
id = omega + neta * np.arange(tcol)
id_flat = np.transpose(id.ravel())
sst_flat = np.transpose(sst.ravel())
# Need real part and imaginary part also
temp_real = np.bincount(id_flat.astype(int), np.real(sst_flat), minlength=tcol * neta)
temp_imag = np.bincount(id_flat.astype(int), np.imag(sst_flat), minlength=tcol * neta)
# Reshape the result to match the desired shape [neta, tcol]
sst = np.zeros((neta,tcol),dtype=np.complex_)
if temp_real.size == tcol*neta:
sst = sst + np.transpose(np.reshape(temp_real[:], (tcol, neta)))
sst = sst + np.transpose(np.reshape(temp_imag[:], (tcol, neta)))*(0 + 1j)
else:
sst = sst + np.transpose(np.reshape(temp_real[:-1], (tcol, neta)))
sst = sst + np.transpose(np.reshape(temp_imag[:-1], (tcol, neta)))*(0 + 1j)
# Crop
# tfr[frequency < lf, :] = 0
# sst[frequency < lf, :] = 0
frequency = frequency[frequency >= lf]
return sst, tfr, frequency
def curve_ext_m(P, lambda_):
eps = 1e-8
E = P / np.sum(P)
E = -np.log(E + eps)
m, n = E.shape
FVal = np.full((m, n), np.inf)
FVal[0, :] = E[0, :]
c = np.zeros(m, dtype=int)
for ii in range(1, m): # time
for jj in range(n): # freq
# Calculate the penalty term
for kk in range(n):
FVal[ii, jj] = min(FVal[ii, jj], FVal[ii - 1, kk] + lambda_ * (kk - jj) ** 2)
# Add the SST value at time ii and freq jj
FVal[ii, jj] += E[ii, jj]
c[-1] = np.argmin(FVal[-1, :])
for ii in range(m - 2, -1, -1):
val = FVal[ii + 1, c[ii + 1]] - E[ii + 1, c[ii + 1]]
for kk in range(n):
if abs(val - FVal[ii, kk] - lambda_ * (kk - c[ii + 1]) ** 2) < eps:
c[ii] = kk
break
if c[ii] == 0:
c[ii] = n // 2
return c, FVal
def recon_sqSTFT_v2(tfrsq, tfrsqtic, Hz, c, Band, coeff):
alpha = tfrsqtic[1] - tfrsqtic[0]
RR = round(Band / (Hz * alpha))
recon = []
C = 2 * alpha / coeff
for kk in range(len(c)):
idx = range(max(0, c[kk] - RR), min(len(tfrsqtic), c[kk] + RR))
recon.append(C * np.sum(tfrsq[idx, kk]))
return np.array(recon)
def do_stuff(sig, std_bool, smooth_bool, hlength, hf, win, n):
trend = None
seas = None
# SST Parameters
hop = 1
lf = 0.005
sigma = 0.15
price = np.array(sig)
PAD = 1
if std_bool:
sig = sig.pct_change().rolling(win).std() * (252**0.5)
sig = sig.to_numpy()
sig = sig[~np.isnan(sig)]
sig = sig.reshape((-1,1))
else:
sig = sig.to_numpy()
sig = sig.reshape((-1,1))
if sig.size % 2 ==0:
sig = sig[1:]
# Signal preprocessing
#b_hp, a_hp = butter(6, 0.5 / 50, 'high')
#b_lp, a_lp = butter(6, 20 / 50, 'low')
sig = np.transpose(sig)
#sig = filtfilt(b_hp, a_hp, sig)
#sig = filtfilt(b_lp, a_lp, sig)
# Apply median filter
# trend = medfilt(sig, 1)
trend = np.ravel(sig)
# Apply LOESS smoothing
# The fraction parameter in lowess is analogous to the window length in MATLAB's smooth
trend_loess = lowess(trend, np.arange(len(trend)), frac=0.1, return_sorted=False)
# Detrend the signal
sig = sig - trend_loess
# Smooth check
if smooth_bool:
sig = savgol_filter(sig, 21, 11) # May need to change parameters (signal, window filter odd, polynomial order for regression)
# temp = sig
if PAD == 1:
# Do SST and Curve Extraction
temp = sig
start_sig = sig[:,0:round(hlength/2)]
end_sig = sig[:,-round(hlength/2):]
sig = np.concatenate((start_sig[::-1], sig, end_sig[::-1]), axis = 1)
[sst, tfr, frequency] = SST_J(sig, 1, hlength, hop, n, hf, lf, ths=1)
sst = sst[:, round(hlength/2):-round(hlength/2)]
[c, Fval] = curve_ext_m(np.transpose(abs(sst)), 4)
# Make sure window length is odd
hlength = hlength + 1 - hlength % 2
Lh = (hlength - 1) // 2
# Gaussian window and its derivative
ex = np.linspace(-0.5, 0.5, hlength)
h = np.exp(-ex**2 / (2 * sigma**2))
x1 = recon_sqSTFT_v2(sst, frequency, 1, c, 0.1, h[(hlength+1)//2])
x2 = recon_sqSTFT_v2(sst, frequency, 1, 2*c, 0.1, h[(hlength+1)//2])
x3 = recon_sqSTFT_v2(sst, frequency, 1, 3*c, 0.1, h[(hlength+1)//2])
recon = x1+x2+x3
temp = temp.flatten()
arr1 = temp - np.transpose(np.real(recon))
#arr1 = abs(temp - np.transpose(np.real(recon)))
trend = trend_loess[-1]
seas = np.transpose(np.real(recon)).flatten()
seas = seas[-1]
trend_seas = trend_loess + np.transpose(np.real(recon))
else:
# Do SST and Curve Extraction
[sst, tfr, frequency] = SST_J(sig, 1, hlength, hop, n, hf, lf, ths=1)
[c, Fval] = curve_ext_m(np.transpose(abs(sst)), 4)
# Make sure window length is odd
hlength = hlength + 1 - hlength % 2
Lh = (hlength - 1) // 2
# Gaussian window and its derivative
ex = np.linspace(-0.5, 0.5, hlength)
h = np.exp(-ex**2 / (2 * sigma**2))
x1 = recon_sqSTFT_v2(sst, frequency, 1, c, 0.1, h[(hlength+1)//2])
x2 = recon_sqSTFT_v2(sst, frequency, 1, 2*c, 0.1, h[(hlength+1)//2])
x3 = recon_sqSTFT_v2(sst, frequency, 1, 3*c, 0.1, h[(hlength+1)//2])
recon = x1+x2+x3
sig = sig.flatten()
arr1 = sig - np.transpose(np.real(recon))
#arr1 = abs(sig - np.transpose(np.real(recon)))
trend = trend_loess[-1]
seas = np.transpose(np.real(recon)).flatten()
seas = seas[-1]
trend_seas = trend_loess + np.transpose(np.real(recon))
return np.quantile(arr1[-20:], 0.5), np.quantile(arr1, 0.2), np.quantile(arr1, 0.3), np.quantile(arr1, 0.4), np.quantile(arr1, 0.5), np.quantile(arr1, 0.6), np.quantile(arr1, 0.7), np.quantile(arr1, 0.8), trend, seas, arr1[-1]
class BasicTemplateAlgorithm(QCAlgorithm):
def Initialize(self):
# Parameters to change
self.SST_resolution = 'Hour'
self.SST_lookback = 160
self.SST_window = 64+1
self.SST_rolling_window = 60 #NOT BEING USED
self.n = 257
#1. Backtesting Period Selection
# self.TrainingPeriod = "OOSA"
#2. Backtesting Period Lookup
# self.SetStartDate(2020, 1, 1)
# self.SetEndDate(2024, 1, 1)
self.SetStartDate(2020, 1, 1)
self.SetEndDate(2024, 4, 1)
# if self.TrainingPeriod == "test":
# self.SetStartDate(2021, 11, 1)
# self.SetEndDate(2021, 11, 19)
# if self.TrainingPeriod == "IS":
# self.SetStartDate(2017, 1, 1)
# self.SetEndDate(2021, 1, 1)
# self.start_date = datetime(2018, 1, 1, 0, 0)
# self.end_date = datetime(2023, 7, 1, 0, 0)
# if self.TrainingPeriod == "OOSA":
# self.SetStartDate(2022, 1, 1)
# self.SetEndDate(2022, 11, 1)
# if self.TrainingPeriod == "OOSB":
# self.SetStartDate(2016, 1, 1)
# self.SetEndDate(2017, 1, 1)
# if self.TrainingPeriod == "OOSC":
# self.SetStartDate(2010, 1, 1)
# self.SetEndDate(2011, 1, 1)
# if self.TrainingPeriod == "LT":
# self.SetStartDate(2022, 11, 9)
# self.SetEndDate(2022, 12, 9)
# if self.TrainingPeriod == "ST":
# self.SetStartDate(2020, 3, 1)
# self.SetEndDate(2020, 3, 31)
# if self.TrainingPeriod == "BS":
# self.SetStartDate(2019, 1, 1)
# self.SetEndDate(2023, 12, 31)
#3. Set Max Drawdown
self.AddRiskManagement(MaximumDrawdownPercentPortfolio(0.15, True))
#4. Set Initial Cash and Warmup Data
self.SetCash(10000000)
self.SetWarmup(300)
#5. Assign Symbols for SST
self.sst_symbols =[]
if self.SST_resolution == 'Hour':
self.AddEquity('AAPL', Resolution.Hour)
elif self.SST_resolution == 'Minute':
self.AddEquity('AAPL', Resolution.Minute)
elif self.SST_resolution == 'Second':
self.AddEquity('AAPL', Resolution.Second)
self.sst_symbols.append(self.Symbol('AAPL'))
# if self.SST_resolution == 'Hour':
# self.AddEquity('VXX', Resolution.Hour)
# elif self.SST_resolution == 'Minute':
# self.AddEquity('VXX', Resolution.Minute)
# elif self.SST_resolution == 'Second':
# self.AddEquity('VXX', Resolution.Second)
# self.sst_symbols.append(self.Symbol('VXX'))
self.sym = self.sst_symbols[0]
self.lookback = self.SST_lookback
# self.lookback = 160*8
# security = self.AddEquity("VXX", Resolution.Hour)
# self.temp_security = self.Symbol("VXX")
# self.rsi = self.RSI('VXX', 120, Resolution.Minute)
self.invested_bool = 0
# #6. Assign Symbols for HMM
# tickers = ["SPY"]
# self.symbols = []
# for ticker in tickers:
# self.symbols.append(self.AddEquity(ticker, Resolution.Hour).Symbol)
# If V.X.X. then
self.s = "SHORT "
self.l = "LONG "
# # If S.V.X.Y. then
# self.s = "LONG "
# self.l = "SHORT "
def OnData(self, slice: Slice):
#1. Return if warming up
if self.IsWarmingUp:
return
#2. Check trading and liquidation criterion every day at noon
# if int(self.Time.hour) == 12:
if True:
self.df = self.History(self.sst_symbols, self.lookback)
self.dg = self.df["close"].unstack(level=0)
short_avg, quantile_20, quantile_30, quantile_40, quantile_50, quantile_60, quantile_70, quantile_80, trend, seas, noise = do_stuff(self.dg, False, True, self.SST_window, 0.25, self.SST_rolling_window, self.n)
self.trend = trend
self.seas = seas
self.noise = noise
self.Plot('Quantiles', '20', quantile_20)
self.Plot('Quantiles', '40', quantile_40)
self.Plot('Quantiles', '50', quantile_50)
self.Plot('Quantiles', '60', quantile_60)
self.Plot('Quantiles', '80', quantile_80)
self.Plot('Quantiles', 'Short Median', short_avg)
# self.Debug(f"Type of trend: {type(trend)}, Value: {trend}")
# self.Debug(f"Type of seas: {type(seas)}, Value: {seas}")
# self.Debug(f"Type of noise: {type(noise)}, Value: {noise}")
# # # Example check and plot a single value
# # if isinstance(self.trend, np.ndarray) and self.trend.size > 0:
# self.Plot('Trend', 'Trend', self.trend) # Ensure it's a float and plot the last value
# # Similarly for 'seas' and 'noise'
# # if isinstance(self.seas, np.ndarray) and self.seas.size > 0:
# self.Plot('Seasonality', 'Seasonality', self.seas)
# # if isinstance(self.noise, np.ndarray) and self.noise.size > 0:
# self.Plot('Noise', 'Noise', self.noise)
self.Plot('Trend', 'Trend', self.trend)
self.Plot('Seasonality', 'Seasonality', self.seas)
self.Plot('Noise', 'Noise', self.noise)
self.Plot('Everything', 'Trend+Seasonality', self.trend+self.seas)
self.Plot('Everything', 'Noise+Seasonality', self.noise+self.seas)
self.Plot('Everything', 'SPY', self.Securities["AAPL"].Close)
# self.Plot('Everything', 'Trend+Seasonality', self.trend+self.seas)
# self.Plot('Everything', 'Noise+Seasonality', self.noise+self.seas)
# self.Plot('Everything', 'VXX', self.Securities["VXX"].Close)
# if not self.invested_bool:
# if short_avg < quantile_30:
# self.SetHoldings("VXX", 0.3)
# self.invested_bool = 1
# self.temp = 0
# self.priceVXX = self.Portfolio['VXX'].Price
# self.quant = quantile_40
# self.Debug(self.l + 'VXX at ' + str(self.Time) + " (SST: short avg low REVERSE TREND)")
# elif short_avg > quantile_70:
# self.SetHoldings("VXX", -0.3)
# self.invested_bool = 1
# self.temp = 2
# self.priceVXX = self.Portfolio['VXX'].Price
# self.quant = quantile_60
# self.Debug(self.s + 'VXX at ' + str(self.Time) + " (SST: short avg high REVERSE TREND)")
# else:
# if self.temp == 0 and short_avg > self.quant:
# self.Liquidate()
# self.invested_bool = 0
# self.Debug('Liquidate ' + 'VXX at ' + str(self.Time))
# elif self.temp == 2 and short_avg < self.quant:
# self.Liquidate()
# self.invested_bool = 0
# self.Debug('Liquidate ' + 'VXX at ' + str(self.Time))
if not self.invested_bool:
if short_avg < quantile_30:
self.SetHoldings("AAPL", 0.3)
self.invested_bool = 1
self.temp = 0
self.priceVXX = self.Portfolio['AAPL'].Price
self.quant = quantile_40
self.Debug(self.l + 'AAPL at ' + str(self.Time) + " (SST: short avg low REVERSE TREND)")
elif short_avg > quantile_70:
self.SetHoldings("AAPL", -0.3)
self.invested_bool = 1
self.temp = 2
self.priceVXX = self.Portfolio['AAPL'].Price
self.quant = quantile_60
self.Debug(self.s + 'AAPL at ' + str(self.Time) + " (SST: short avg high REVERSE TREND)")
else:
if self.temp == 0 and short_avg > self.quant:
self.Liquidate()
self.invested_bool = 0
self.Debug('Liquidate ' + 'AAPL at ' + str(self.Time))
elif self.temp == 2 and short_avg < self.quant:
self.Liquidate()
self.invested_bool = 0
self.Debug('Liquidate ' + 'AAPL at ' + str(self.Time))
#2. Check stop loss criterion every hour of every day
# if self.invested_bool:
# if (self.temp == 0 or self.temp == 3) and (self.Portfolio['VXX'].Price < self.priceVXX*0.9):
# self.Liquidate()
# self.invested_bool = 0
# self.Debug ('Stop Loss ' + self.l+ 'at ' + str(self.Time))
# elif (self.temp == 1 or self.temp == 2) and (self.Portfolio['VXX'].Price > self.priceVXX*1.1):
# self.Liquidate()
# self.invested_bool = 0
# self.Debug ('Stop Loss ' + self.s+ 'at ' + str(self.Time))