| Overall Statistics | |
| --- | --- |
| Total Orders | 982 |
| Average Win | 0.95% |
| Average Loss | -0.74% |
| Compounding Annual Return | -8.667% |
| Drawdown | 55.700% |
| Expectancy | -0.088 |
| Start Equity | 10000000 |
| End Equity | 6800090.43 |
| Net Profit | -31.999% |
| Sharpe Ratio | -0.488 |
| Sortino Ratio | -0.633 |
| Probabilistic Sharpe Ratio | 0.064% |
| Loss Rate | 60% |
| Win Rate | 40% |
| Profit-Loss Ratio | 1.28 |
| Alpha | -0.033 |
| Beta | -0.433 |
| Annual Standard Deviation | 0.148 |
| Annual Variance | 0.022 |
| Information Ratio | -0.558 |
| Tracking Error | 0.291 |
| Treynor Ratio | 0.167 |
| Total Fees | $217804.31 |
| Estimated Strategy Capacity | $1500000.00 |
| Lowest Capacity Asset | VXXB WRBPJAJZ2Q91 |
| Portfolio Turnover | 18.91% |
from AlgorithmImports import *
from QuantConnect.Securities.Option import OptionPriceModels
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from datetime import datetime, timedelta
import pytz as pytz
from sklearn.metrics import r2_score, mean_absolute_error
from hmmlearn import hmm
from scipy.signal import butter, filtfilt, medfilt, savgol_filter
from scipy.fft import fft
from scipy.interpolate import PchipInterpolator
from numpy import quantile
from inspect import signature
from statsmodels.nonparametric.smoothers_lowess import lowess
def SST_J(x, Fs, hlength=None, hop=1, n=None, hf=np.inf, lf=0, ths=1):
    """Synchrosqueezed STFT of signal x sampled at rate Fs.

    Returns (sst, tfr, frequency): the synchrosqueezed transform, the plain
    STFT, and the retained non-negative frequency axis (same units as Fs).
    """
if hlength is None:
raise ValueError("Select a window length.")
if Fs is None:
raise ValueError("Select a sampling rate.")
# Window bandwidth
sigma = 0.15
# Do reassignment
squeeze_flag = True
# Organize input
x = np.array(x).flatten()
    if np.any(np.isnan(x)):
        # Fill NaN gaps with a shape-preserving (PCHIP) interpolant
        good = ~np.isnan(x)
        x = PchipInterpolator(np.where(good)[0], x[good], extrapolate=True)(np.arange(len(x)))
# Time (samples)
NN = len(x)
t = np.arange(1, NN + 1, hop)
tcol = len(t)
# N-point fft
    if n is None:
        raise ValueError("Select an FFT length n.")
    n = int(n)
    n = n + 1 - n % 2
N = 2 * (n - 1)
N = int(N)
# Make sure window length is odd
hlength = hlength + 1 - hlength % 2
Lh = (hlength - 1) // 2
# Gaussian window and its derivative
ex = np.linspace(-0.5, 0.5, hlength)
h = np.exp(-ex**2 / (2 * sigma**2))
dh = -ex / sigma**2 * h
# Perform convolution
tfr = np.zeros((N, tcol))
tfr2 = np.zeros((N, tcol))
for icol in range(tcol):
        ti = t[icol]
tau = np.arange(-min(n - 1, Lh, ti - 1), min(n - 1, Lh, NN - ti) + 1)
indices = np.remainder(N + tau, N)
rSig = x[ti - 1 + tau]
tfr[indices, icol] = rSig * h[Lh + tau]
tfr2[indices, icol] = rSig * dh[Lh + tau]
# Fourier transform
tfr = fft(tfr, axis=0)
tfr = tfr[:n, :]
# Non-negative frequency axis
frequency = Fs / 2 * np.linspace(0, 1, n)
if squeeze_flag:
tfr2 = fft(tfr2, axis=0)
tfr2 = tfr2[:n, :]
# Crop output
u = frequency <= hf
tfr = tfr[u, :]
frequency = frequency[u]
if not squeeze_flag:
return tfr, tfr, frequency
# Crop
tfr2 = tfr2[u, :]
# Reassignment threshold
    ths = quantile(np.abs(tfr), 1 - ths, axis=0)
# Omega operator
neta = len(frequency)
omega = -np.inf * np.ones((neta, tcol))
ups = np.abs(tfr) > ths
omega[ups] = np.round(N / hlength * np.imag(tfr2[ups] / tfr[ups] / (2 * np.pi)))
# Mapped out of range
index = np.tile(np.arange(1, neta + 1).reshape(-1, 1), (1, tcol))
omega = index - omega
id = (omega < 1) | (omega > neta) | ~ups
omega[id] = index[id]
    sst = tfr.copy()  # copy so the returned STFT is not modified below
    sst[id] = 0
    # Reassignment: accumulate each coefficient into its reassigned frequency
    # bin using 0-based flat indices (row + neta * column)
    lin_idx = ((omega - 1) + neta * np.arange(tcol)).ravel().astype(int)
    sst_flat = sst.ravel()
    # Real and imaginary parts are accumulated separately
    temp_real = np.bincount(lin_idx, np.real(sst_flat), minlength=tcol * neta)
    temp_imag = np.bincount(lin_idx, np.imag(sst_flat), minlength=tcol * neta)
    # Reshape the accumulated values back to [neta, tcol]
    sst = (temp_real + 1j * temp_imag).reshape((tcol, neta)).T
# Crop
# tfr[frequency < lf, :] = 0
# sst[frequency < lf, :] = 0
frequency = frequency[frequency >= lf]
return sst, tfr, frequency
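# Illustrative sketch (not called by the algorithm): a minimal SST_J usage
# example on a synthetic sinusoid. The signal, sampling rate and window
# parameters below are assumptions chosen only to show the output shapes;
# it relies on the module-level numpy import above.
def _demo_sst_j():
    t = np.arange(401)                                # synthetic sample index
    x = np.sin(2 * np.pi * 0.05 * t)                  # single tone at 0.05 cycles/sample
    sst, tfr, frequency = SST_J(x, Fs=1, hlength=65, hop=1, n=257, hf=0.25)
    # sst/tfr have one row per retained frequency bin and one column per sample
    print(sst.shape, tfr.shape, frequency.shape)
    # the bin with the largest average magnitude should sit near the tone frequency
    print(frequency[np.argmax(np.abs(sst).mean(axis=1))])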
def curve_ext_m(P, lambda_):
    """Extract a single ridge from an energy map P (rows = time, cols = frequency)
    by dynamic programming with a quadratic frequency-jump penalty lambda_."""
eps = 1e-8
E = P / np.sum(P)
E = -np.log(E + eps)
m, n = E.shape
FVal = np.full((m, n), np.inf)
FVal[0, :] = E[0, :]
c = np.zeros(m, dtype=int)
for ii in range(1, m): # time
for jj in range(n): # freq
# Calculate the penalty term
for kk in range(n):
FVal[ii, jj] = min(FVal[ii, jj], FVal[ii - 1, kk] + lambda_ * (kk - jj) ** 2)
# Add the SST value at time ii and freq jj
FVal[ii, jj] += E[ii, jj]
c[-1] = np.argmin(FVal[-1, :])
for ii in range(m - 2, -1, -1):
val = FVal[ii + 1, c[ii + 1]] - E[ii + 1, c[ii + 1]]
for kk in range(n):
if abs(val - FVal[ii, kk] - lambda_ * (kk - c[ii + 1]) ** 2) < eps:
c[ii] = kk
break
if c[ii] == 0:
c[ii] = n // 2
return c, FVal
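# Illustrative sketch (not called by the algorithm): curve_ext_m on a small
# synthetic energy map with a known ridge. The sizes are assumptions and are
# kept tiny because the dynamic program is O(time * freq^2) in pure Python.
def _demo_curve_ext_m():
    m, n_freq = 50, 20
    P = np.full((m, n_freq), 0.01)                    # weak background energy
    ridge = 5 + np.arange(m) // 10                    # slowly rising ridge (bins 5..9)
    P[np.arange(m), ridge] = 1.0                      # concentrate energy on the ridge
    c, _ = curve_ext_m(P, 1.0)
    print(np.abs(c - ridge).max())                    # extracted curve should hug the ridge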
def recon_sqSTFT_v2(tfrsq, tfrsqtic, Hz, c, Band, coeff):
    """Reconstruct a time-domain component from the synchrosqueezed transform
    tfrsq by summing, at each time, a frequency band of half-width Band around
    the ridge indices c."""
alpha = tfrsqtic[1] - tfrsqtic[0]
RR = round(Band / (Hz * alpha))
recon = []
C = 2 * alpha / coeff
for kk in range(len(c)):
idx = range(max(0, c[kk] - RR), min(len(tfrsqtic), c[kk] + RR))
recon.append(C * np.sum(tfrsq[idx, kk]))
return np.array(recon)
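# Illustrative sketch (not called by the algorithm): chain SST_J, curve_ext_m
# and recon_sqSTFT_v2 to recover a single tone, mirroring the steps inside
# do_stuff. The tone, window length and reconstruction band are assumptions.
def _demo_recon():
    t = np.arange(201)
    x = np.sin(2 * np.pi * 0.05 * t)
    hlength, n = 65, 257
    sst, _, frequency = SST_J(x, Fs=1, hlength=hlength, n=n, hf=0.25)
    c, _ = curve_ext_m(np.abs(sst).T, 4)              # ridge index per time sample
    h = np.exp(-np.linspace(-0.5, 0.5, hlength) ** 2 / (2 * 0.15 ** 2))
    recon = recon_sqSTFT_v2(sst, frequency, 1, c, 0.1, h[(hlength + 1) // 2])
    # correlation with the input tone; expected to be high away from the edges
    print(np.corrcoef(x, np.real(recon))[0, 1])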
def do_stuff(sig, std_bool, smooth_bool, hlength, hf, win, n):
    """Decompose a price series into a LOESS trend plus an SST-based seasonal
    component and return quantiles of the absolute residual, the latest trend
    and seasonal values, and the latest absolute residual ("noise")."""
trend = None
seas = None
# SST Parameters
hop = 1
lf = 0.005
sigma = 0.15
price = np.array(sig)
PAD = 1
if std_bool:
sig = sig.pct_change().rolling(win).std() * (252**0.5)
sig = sig.to_numpy()
sig = sig[~np.isnan(sig)]
sig = sig.reshape((-1,1))
else:
sig = sig.to_numpy()
sig = sig.reshape((-1,1))
if sig.size % 2 ==0:
sig = sig[1:]
# Signal preprocessing
#b_hp, a_hp = butter(6, 0.5 / 50, 'high')
#b_lp, a_lp = butter(6, 20 / 50, 'low')
sig = np.transpose(sig)
#sig = filtfilt(b_hp, a_hp, sig)
#sig = filtfilt(b_lp, a_lp, sig)
# Apply median filter
# trend = medfilt(sig, 1)
trend = np.ravel(sig)
# Apply LOESS smoothing
# The fraction parameter in lowess is analogous to the window length in MATLAB's smooth
trend_loess = lowess(trend, np.arange(len(trend)), frac=0.1, return_sorted=False)
# Detrend the signal
sig = sig - trend_loess
# Smooth check
if smooth_bool:
        sig = savgol_filter(sig, 21, 11)  # Savitzky-Golay smoothing; window length (odd) and polynomial order may need tuning
# temp = sig
if PAD == 1:
# Do SST and Curve Extraction
temp = sig
        start_sig = sig[:, 0:round(hlength/2)]
        end_sig = sig[:, -round(hlength/2):]
        # Mirror-pad both ends along the time axis to reduce edge effects
        sig = np.concatenate((start_sig[:, ::-1], sig, end_sig[:, ::-1]), axis=1)
[sst, tfr, frequency] = SST_J(sig, 1, hlength, hop, n, hf, lf, ths=1)
sst = sst[:, round(hlength/2):-round(hlength/2)]
[c, Fval] = curve_ext_m(np.transpose(abs(sst)), 4)
# Make sure window length is odd
hlength = hlength + 1 - hlength % 2
Lh = (hlength - 1) // 2
# Gaussian window and its derivative
ex = np.linspace(-0.5, 0.5, hlength)
h = np.exp(-ex**2 / (2 * sigma**2))
x1 = recon_sqSTFT_v2(sst, frequency, 1, c, 0.1, h[(hlength+1)//2])
x2 = recon_sqSTFT_v2(sst, frequency, 1, 2*c, 0.1, h[(hlength+1)//2])
x3 = recon_sqSTFT_v2(sst, frequency, 1, 3*c, 0.1, h[(hlength+1)//2])
recon = x1+x2+x3
temp = temp.flatten()
#arr1 = temp - np.transpose(np.real(recon))
arr1 = abs(temp - np.transpose(np.real(recon)))
trend = trend_loess[-1]
seas = np.transpose(np.real(recon)).flatten()
seas = seas[-1]
trend_seas = trend_loess + np.transpose(np.real(recon))
else:
# Do SST and Curve Extraction
[sst, tfr, frequency] = SST_J(sig, 1, hlength, hop, n, hf, lf, ths=1)
[c, Fval] = curve_ext_m(np.transpose(abs(sst)), 4)
# Make sure window length is odd
hlength = hlength + 1 - hlength % 2
Lh = (hlength - 1) // 2
# Gaussian window and its derivative
ex = np.linspace(-0.5, 0.5, hlength)
h = np.exp(-ex**2 / (2 * sigma**2))
x1 = recon_sqSTFT_v2(sst, frequency, 1, c, 0.1, h[(hlength+1)//2])
x2 = recon_sqSTFT_v2(sst, frequency, 1, 2*c, 0.1, h[(hlength+1)//2])
x3 = recon_sqSTFT_v2(sst, frequency, 1, 3*c, 0.1, h[(hlength+1)//2])
recon = x1+x2+x3
sig = sig.flatten()
#arr1 = sig - np.transpose(np.real(recon))
arr1 = abs(sig - np.transpose(np.real(recon)))
trend = trend_loess[-1]
seas = np.transpose(np.real(recon)).flatten()
seas = seas[-1]
trend_seas = trend_loess + np.transpose(np.real(recon))
return np.quantile(arr1[-7:], 0.5), np.quantile(arr1, 0.2), np.quantile(arr1, 0.3), np.quantile(arr1, 0.4), np.quantile(arr1, 0.5), np.quantile(arr1, 0.6), np.quantile(arr1, 0.7), np.quantile(arr1, 0.8), trend, seas, arr1[-1]
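# Illustrative sketch (not called by the algorithm): drive do_stuff with a
# synthetic hourly price series. OnData passes a one-column DataFrame of
# closes instead, which follows the same code path; the random walk, seed and
# parameter values here are assumptions for demonstration only.
def _demo_do_stuff():
    rng = np.random.default_rng(0)
    prices = pd.Series(30 + np.cumsum(rng.normal(0, 0.2, 161)))   # 161 "hourly" closes
    out = do_stuff(prices, False, True, 64 + 1, 0.25, 60, 257)
    short_median, q20, q30, q40, q50, q60, q70, q80, trend, seas, noise = out
    # the entry rule in OnData compares the short-window median to quantile_70
    print(short_median, q70, trend, seas, noise)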
class BasicTemplateAlgorithm(QCAlgorithm):
def Initialize(self):
# Parameters to change
self.SST_resolution = 'Hour'
self.SST_lookback = 160 # 160 trading hours
self.SST_window = 64+1
self.SST_rolling_window = 60 #NOT BEING USED
self.n = 257 # 257 faster
#1. Backtesting Period Selection
# self.TrainingPeriod = "OOSA"
#2. Backtesting Period Lookup
# self.SetStartDate(2020, 1, 1)
# self.SetEndDate(2024, 1, 1)
self.SetStartDate(2020, 1, 1)
self.SetEndDate(2024, 4, 1)
# if self.TrainingPeriod == "test":
# self.SetStartDate(2021, 11, 1)
# self.SetEndDate(2021, 11, 19)
# if self.TrainingPeriod == "IS":
# self.SetStartDate(2017, 1, 1)
# self.SetEndDate(2021, 1, 1)
# self.start_date = datetime(2018, 1, 1, 0, 0)
# self.end_date = datetime(2023, 7, 1, 0, 0)
# if self.TrainingPeriod == "OOSA":
# self.SetStartDate(2022, 1, 1)
# self.SetEndDate(2022, 11, 1)
# if self.TrainingPeriod == "OOSB":
# self.SetStartDate(2016, 1, 1)
# self.SetEndDate(2017, 1, 1)
# if self.TrainingPeriod == "OOSC":
# self.SetStartDate(2010, 1, 1)
# self.SetEndDate(2011, 1, 1)
# if self.TrainingPeriod == "LT":
# self.SetStartDate(2022, 11, 9)
# self.SetEndDate(2022, 12, 9)
# if self.TrainingPeriod == "ST":
# self.SetStartDate(2020, 3, 1)
# self.SetEndDate(2020, 3, 31)
# if self.TrainingPeriod == "BS":
# self.SetStartDate(2019, 1, 1)
# self.SetEndDate(2023, 12, 31)
#3. Set Max Drawdown
self.AddRiskManagement(MaximumDrawdownPercentPortfolio(0.15, True))
#4. Set Initial Cash and Warmup Data
self.SetCash(10000000)
self.SetWarmup(300)
#5. Assign Symbols for SST
self.sst_symbols =[]
if self.SST_resolution == 'Hour':
self.AddEquity('SPY', Resolution.Hour)
self.AddEquity('VXX', Resolution.Hour)
elif self.SST_resolution == 'Minute':
self.AddEquity('SPY', Resolution.Minute)
self.AddEquity('VXX', Resolution.Minute)
elif self.SST_resolution == 'Second':
self.AddEquity('SPY', Resolution.Second)
            self.AddEquity('VXX', Resolution.Second)
self.sst_symbols.append(self.Symbol('SPY'))
# if self.SST_resolution == 'Hour':
# self.AddEquity('VXX', Resolution.Hour)
# elif self.SST_resolution == 'Minute':
# self.AddEquity('VXX', Resolution.Minute)
# elif self.SST_resolution == 'Second':
# self.AddEquity('VXX', Resolution.Second)
# self.sst_symbols.append(self.Symbol('VXX'))
self.sym = self.sst_symbols[0]
self.lookback = self.SST_lookback
# self.lookback = 160*8
# security = self.AddEquity("VXX", Resolution.Hour)
# self.temp_security = self.Symbol("VXX")
# self.rsi = self.RSI('VXX', 120, Resolution.Minute)
self.invested_bool = 0
# #6. Assign Symbols for HMM
# tickers = ["SPY"]
# self.symbols = []
# for ticker in tickers:
# self.symbols.append(self.AddEquity(ticker, Resolution.Hour).Symbol)
        # Labels used in Debug messages when trading VXX
        self.s = "SHORT "
        self.l = "LONG "
        # # Labels when trading SVXY instead (inverse ETF, so the sides flip)
        # self.s = "LONG "
        # self.l = "SHORT "
def OnData(self, slice: Slice):
#1. Return if warming up
if self.IsWarmingUp:
return
        #2. Check trading and liquidation criteria on every bar (the noon-only check below is disabled)
        # if int(self.Time.hour) == 12:
        if True:
self.df = self.History(self.sst_symbols, self.lookback)
self.dg = self.df["close"].unstack(level=0)
short_avg, quantile_20, quantile_30, quantile_40, quantile_50, quantile_60, quantile_70, quantile_80, trend, seas, noise = do_stuff(self.dg, False, True, self.SST_window, 0.25, self.SST_rolling_window, self.n)
self.trend = trend
self.seas = seas
self.noise = noise
self.Plot('Quantiles', '30', quantile_30)
self.Plot('Quantiles', '40', quantile_40)
self.Plot('Quantiles', '50', quantile_50)
self.Plot('Quantiles', '60', quantile_60)
self.Plot('Quantiles', '70', quantile_70)
self.Plot('Quantiles', 'Short Median', short_avg)
self.Plot('VXX', 'VXX', self.Securities["VXX"].Close)
# self.Debug(f"Type of trend: {type(trend)}, Value: {trend}")
# self.Debug(f"Type of seas: {type(seas)}, Value: {seas}")
# self.Debug(f"Type of noise: {type(noise)}, Value: {noise}")
# # # Example check and plot a single value
# # if isinstance(self.trend, np.ndarray) and self.trend.size > 0:
# self.Plot('Trend', 'Trend', self.trend) # Ensure it's a float and plot the last value
# # Similarly for 'seas' and 'noise'
# # if isinstance(self.seas, np.ndarray) and self.seas.size > 0:
# self.Plot('Seasonality', 'Seasonality', self.seas)
# # if isinstance(self.noise, np.ndarray) and self.noise.size > 0:
# self.Plot('Noise', 'Noise', self.noise)
self.Plot('Trend', 'Trend', self.trend)
self.Plot('Seasonality', 'Seasonality', self.seas)
self.Plot('Noise', 'Noise', self.noise)
self.Plot('Everything', 'Trend+Seasonality', self.trend+self.seas)
self.Plot('Everything', 'Noise+Seasonality', self.noise+self.seas)
self.Plot('Everything', 'SPY', self.Securities["SPY"].Close)
# self.Plot('Everything', 'Trend+Seasonality+Noise', self.trend+self.seas+self.noise)
# self.Plot('Everything', 'VXX', self.dg[-1])
# follow vol spikes
if not self.invested_bool:
if short_avg > quantile_70:
# following spike vol
self.SetHoldings("VXX", 0.3)
self.invested_bool = 1
self.priceVXX = self.Portfolio['VXX'].Price
self.quant = quantile_60
                    self.Debug(self.l + 'VXX at ' + str(self.Time) + " (SST: VXX vol spike)")
else:
if short_avg < self.quant:
self.Liquidate()
self.invested_bool = 0
self.Debug('Liquidate ' + 'VXX at ' + str(self.Time))
        #3. Stop-loss check every bar (currently disabled)
# if self.invested_bool:
# if (self.temp == 0 or self.temp == 3) and (self.Portfolio['VXX'].Price < self.priceVXX*0.9):
# self.Liquidate()
# self.invested_bool = 0
# self.Debug ('Stop Loss ' + self.l+ 'at ' + str(self.Time))
# elif (self.temp == 1 or self.temp == 2) and (self.Portfolio['VXX'].Price > self.priceVXX*1.1):
# self.Liquidate()
# self.invested_bool = 0
# self.Debug ('Stop Loss ' + self.s+ 'at ' + str(self.Time))
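# Illustrative sketch (not part of the algorithm): inspecting the decomposition
# in the QuantConnect Research environment. QuantBook, AddEquity and History
# are standard research calls; the lookback and parameter values simply mirror
# Initialize and are otherwise assumptions.
def _demo_research():
    qb = QuantBook()
    spy = qb.AddEquity("SPY", Resolution.Hour).Symbol
    closes = qb.History(spy, 160, Resolution.Hour)["close"].unstack(level=0)
    out = do_stuff(closes, False, True, 64 + 1, 0.25, 60, 257)
    short_median, quantile_70 = out[0], out[6]
    print(short_median > quantile_70)                 # the OnData entry condition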