| Overall Statistics |
|
Total Orders 2169 Average Win 0.77% Average Loss -0.80% Compounding Annual Return -22.228% Drawdown 70.700% Expectancy -0.103 Net Profit -63.382% Sharpe Ratio -0.98 Sortino Ratio -1.347 Probabilistic Sharpe Ratio 0.000% Loss Rate 54% Win Rate 46% Profit-Loss Ratio 0.97 Alpha -0.138 Beta -0.36 Annual Standard Deviation 0.17 Annual Variance 0.029 Information Ratio -0.827 Tracking Error 0.3 Treynor Ratio 0.463 Total Fees $248165.98 Estimated Strategy Capacity $760000.00 Lowest Capacity Asset VXXB WRBPJAJZ2Q91 Portfolio Turnover 44.50% |
from AlgorithmImports import *
from QuantConnect.Securities.Option import OptionPriceModels
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from scipy import poly1d, signal
from datetime import datetime, timedelta
import pytz as pytz
from sklearn.metrics import r2_score, mean_absolute_error
from hmmlearn import hmm
from scipy.signal import stft, gaussian, butter, filtfilt, detrend, medfilt, savgol_filter
from scipy.fft import fft
from scipy.interpolate import interp1d
from numpy import quantile
from inspect import signature
from statsmodels.nonparametric.smoothers_lowess import lowess
def SST_J(x, Fs, hlength=None, hop=1, n=None, hf=np.inf, lf=0, ths=1):
if hlength is None:
raise ValueError("Select a window length.")
if Fs is None:
raise ValueError("Select a sampling rate.")
# Window bandwidth
sigma = 0.15
# Do reassignment
squeeze_flag = True
# Organize input
x = np.array(x).flatten()
if np.any(np.isnan(x)):
x = interp1d(np.where(~np.isnan(x))[0], x[~np.isnan(x)], kind='pchip', fill_value="extrapolate")(np.arange(len(x)))
# Time (samples)
NN = len(x)
t = np.arange(1, NN + 1, hop)
tcol = len(t)
# N-point fft
n = int(n)
n = n + 1 - n % 2
N = 2 * (n - 1)
N = int(N)
# Make sure window length is odd
hlength = hlength + 1 - hlength % 2
Lh = (hlength - 1) // 2
# Gaussian window and its derivative
ex = np.linspace(-0.5, 0.5, hlength)
h = np.exp(-ex*2 / (2 * sigma*2))
dh = -ex / sigma**2 * h
# Perform convolution
tfr = np.zeros((N, tcol))
tfr2 = np.zeros((N, tcol))
for icol in range(tcol):
ti=t[icol]
tau = np.arange(-min(n - 1, Lh, ti - 1), min(n - 1, Lh, NN - ti) + 1)
indices = np.remainder(N + tau, N)
rSig = x[ti - 1 + tau]
tfr[indices, icol] = rSig * h[Lh + tau]
tfr2[indices, icol] = rSig * dh[Lh + tau]
# Fourier transform
tfr = fft(tfr, axis=0)
tfr = tfr[:n, :]
# Non-negative frequency axis
frequency = Fs / 2 * np.linspace(0, 1, n)
if squeeze_flag:
tfr2 = fft(tfr2, axis=0)
tfr2 = tfr2[:n, :]
# Crop output
u = frequency <= hf
tfr = tfr[u, :]
frequency = frequency[u]
if not squeeze_flag:
return tfr, tfr, frequency
# Crop
tfr2 = tfr2[u, :]
# Reassignment threshold
ths = quantile(np.abs(tfr), (1 - ths),axis=0)
# Omega operator
neta = len(frequency)
omega = -np.inf * np.ones((neta, tcol))
ups = np.abs(tfr) > ths
omega[ups] = np.round(N / hlength * np.imag(tfr2[ups] / tfr[ups] / (2 * np.pi)))
# Mapped out of range
index = np.tile(np.arange(1, neta + 1).reshape(-1, 1), (1, tcol))
omega = index - omega
id = (omega < 1) | (omega > neta) | ~ups
omega[id] = index[id]
sst = tfr
sst[id] = 0
# Reassignment
id = omega + neta * np.arange(tcol)
id_flat = np.transpose(id.ravel())
sst_flat = np.transpose(sst.ravel())
# Need real part and imaginary part also
temp_real = np.bincount(id_flat.astype(int), np.real(sst_flat), minlength=tcol * neta)
temp_imag = np.bincount(id_flat.astype(int), np.imag(sst_flat), minlength=tcol * neta)
# Reshape the result to match the desired shape [neta, tcol]
sst = np.zeros((neta,tcol),dtype=np.complex_)
if temp_real.size == tcol*neta:
sst = sst + np.transpose(np.reshape(temp_real[:], (tcol, neta)))
sst = sst + np.transpose(np.reshape(temp_imag[:], (tcol, neta)))*(0 + 1j)
else:
sst = sst + np.transpose(np.reshape(temp_real[:-1], (tcol, neta)))
sst = sst + np.transpose(np.reshape(temp_imag[:-1], (tcol, neta)))*(0 + 1j)
# Crop
# tfr[frequency < lf, :] = 0
# sst[frequency < lf, :] = 0
frequency = frequency[frequency >= lf]
return sst, tfr, frequency
def curve_ext_m(P, lambda_):
eps = 1e-8
E = P / np.sum(P)
E = -np.log(E + eps)
m, n = E.shape
FVal = np.full((m, n), np.inf)
FVal[0, :] = E[0, :]
c = np.zeros(m, dtype=int)
for ii in range(1, m): # time
for jj in range(n): # freq
# Calculate the penalty term
for kk in range(n):
FVal[ii, jj] = min(FVal[ii, jj], FVal[ii - 1, kk] + lambda_ * (kk - jj) ** 2)
# Add the SST value at time ii and freq jj
FVal[ii, jj] += E[ii, jj]
c[-1] = np.argmin(FVal[-1, :])
for ii in range(m - 2, -1, -1):
val = FVal[ii + 1, c[ii + 1]] - E[ii + 1, c[ii + 1]]
for kk in range(n):
if abs(val - FVal[ii, kk] - lambda_ * (kk - c[ii + 1]) ** 2) < eps:
c[ii] = kk
break
if c[ii] == 0:
c[ii] = n // 2
return c, FVal
def recon_sqSTFT_v2(tfrsq, tfrsqtic, Hz, c, Band, coeff):
alpha = tfrsqtic[1] - tfrsqtic[0]
RR = round(Band / (Hz * alpha))
recon = []
C = 2 * alpha / coeff
for kk in range(len(c)):
idx = range(max(0, c[kk] - RR), min(len(tfrsqtic), c[kk] + RR))
recon.append(C * np.sum(tfrsq[idx, kk]))
return np.array(recon)
def do_stuff(sig, std_bool, smooth_bool, hlength, hf, win, n):
# SST Parameters
hop = 1
lf = 0.005
sigma = 0.15
price = np.array(sig)
PAD = 1
if std_bool:
sig = sig.pct_change().rolling(win).std() * (252**0.5)
sig = sig.to_numpy()
sig = sig[~np.isnan(sig)]
sig = sig.reshape((-1,1))
else:
sig = sig.to_numpy()
if sig.size % 2 ==0:
sig = sig[1:]
# Signal preprocessing
b_hp, a_hp = butter(6, 0.5 / 50, 'high')
b_lp, a_lp = butter(6, 20 / 50, 'low')
sig = np.transpose(sig)
sig = filtfilt(b_hp, a_hp, sig)
sig = filtfilt(b_lp, a_lp, sig)
# Apply median filter
trend = medfilt(sig, 1)
trend = np.ravel(trend)
# Apply LOESS smoothing
# The fraction parameter in lowess is analogous to the window length in MATLAB's smooth
trend_loess = lowess(trend, np.arange(len(trend)), frac=0.1, return_sorted=False)
# Detrend the signal
sig = sig - trend_loess
# Smooth check
if smooth_bool:
sig = savgol_filter(sig, 51, 11)
# temp = sig
if PAD == 1:
# Do SST and Curve Extraction
temp = sig
start_sig = sig[:,0:round(hlength/2)]
end_sig = sig[:,-round(hlength/2):]
sig = np.concatenate((start_sig[::-1], sig, end_sig[::-1]), axis = 1)
[sst, tfr, frequency] = SST_J(sig, 1, hlength, hop, n, hf, lf, ths=1)
sst = sst[:, round(hlength/2):-round(hlength/2)]
[c, Fval] = curve_ext_m(np.transpose(abs(sst)), 4)
# Make sure window length is odd
hlength = hlength + 1 - hlength % 2
Lh = (hlength - 1) // 2
# Gaussian window and its derivative
ex = np.linspace(-0.5, 0.5, hlength)
h = np.exp(-ex*2 / (2 * sigma*2))
x1 = recon_sqSTFT_v2(sst, frequency, 1, c, 0.1, h[(hlength+1)//2])
x2 = recon_sqSTFT_v2(sst, frequency, 1, 2*c, 0.1, h[(hlength+1)//2])
x3 = recon_sqSTFT_v2(sst, frequency, 1, 3*c, 0.1, h[(hlength+1)//2])
recon = x1+x2+x3
temp = temp.flatten()
arr1 = abs(temp - np.transpose(np.real(recon)))
arr2 = abs(np.diff(np.log(price)))
else:
# Do SST and Curve Extraction
[sst, tfr, frequency] = SST_J(sig, 1, hlength, hop, n, hf, lf, ths=1)
[c, Fval] = curve_ext_m(np.transpose(abs(sst)), 4)
# Make sure window length is odd
hlength = hlength + 1 - hlength % 2
Lh = (hlength - 1) // 2
# Gaussian window and its derivative
ex = np.linspace(-0.5, 0.5, hlength)
h = np.exp(-ex*2 / (2 * sigma*2))
x1 = recon_sqSTFT_v2(sst, frequency, 1, c, 0.1, h[(hlength+1)//2])
x2 = recon_sqSTFT_v2(sst, frequency, 1, 2*c, 0.1, h[(hlength+1)//2])
x3 = recon_sqSTFT_v2(sst, frequency, 1, 3*c, 0.1, h[(hlength+1)//2])
recon = x1+x2+x3
sig = sig.flatten()
arr1 = abs(sig - np.transpose(np.real(recon)))
arr2 = abs(np.diff(np.log(price)))
return np.quantile(arr1[-60:], 0.5), np.quantile(arr1, 0.2), np.quantile(arr1, 0.4), np.quantile(arr1, 0.5), np.quantile(arr1, 0.6), np.quantile(arr1, 0.8)
class BasicTemplateAlgorithm(QCAlgorithm):
def Initialize(self):
# Parameters to change
self.SST_resolution = 'Minute'
self.SST_lookback = 2250 # 60*7.5*5
self.SST_window = 181 # 60*3+1
self.SST_rolling_window = 60
self.n = 129
#1. Backtesting Period Selection
# self.TrainingPeriod = "OOSA"
#2. Backtesting Period Lookup
self.SetStartDate(2020, 1, 1)
#self.SetEndDate(2020, 1, 2)
self.SetEndDate(2024, 1, 1)
# self.SetStartDate(2023, 6, 1)
# self.SetEndDate(2024, 3, 11)
# if self.TrainingPeriod == "test":
# self.SetStartDate(2021, 11, 1)
# self.SetEndDate(2021, 11, 19)
# if self.TrainingPeriod == "IS":
# self.SetStartDate(2017, 1, 1)
# self.SetEndDate(2021, 1, 1)
# self.start_date = datetime(2018, 1, 1, 0, 0)
# self.end_date = datetime(2023, 7, 1, 0, 0)
# if self.TrainingPeriod == "OOSA":
# self.SetStartDate(2022, 1, 1)
# self.SetEndDate(2022, 11, 1)
# if self.TrainingPeriod == "OOSB":
# self.SetStartDate(2016, 1, 1)
# self.SetEndDate(2017, 1, 1)
# if self.TrainingPeriod == "OOSC":
# self.SetStartDate(2010, 1, 1)
# self.SetEndDate(2011, 1, 1)
# if self.TrainingPeriod == "LT":
# self.SetStartDate(2022, 11, 9)
# self.SetEndDate(2022, 12, 9)
# if self.TrainingPeriod == "ST":
# self.SetStartDate(2020, 3, 1)
# self.SetEndDate(2020, 3, 31)
# if self.TrainingPeriod == "BS":
# self.SetStartDate(2019, 1, 1)
# self.SetEndDate(2023, 12, 31)
#3. Set Max Drawdown
self.AddRiskManagement(MaximumDrawdownPercentPortfolio(0.15, True))
#4. Set Initial Cash and Warmup Data
self.SetCash(10000000)
self.SetWarmup(2300)
#5. Assign Symbols for SST
self.sst_symbols =[]
if self.SST_resolution == 'Hour':
self.AddEquity('SPY', Resolution.Hour)
elif self.SST_resolution == 'Minute':
self.AddEquity('SPY', Resolution.Minute)
elif self.SST_resolution == 'Second':
self.AddEquity('SPY', Resolution.Second)
self.sst_symbols.append(self.Symbol('SPY'))
# if self.SST_resolution == 'Hour':
# self.AddEquity('VXX', Resolution.Hour)
# elif self.SST_resolution == 'Minute':
# self.AddEquity('VXX', Resolution.Minute)
# elif self.SST_resolution == 'Second':
# self.AddEquity('VXX', Resolution.Second)
self.sst_symbols.append(self.Symbol('SPY'))
self.sym = self.sst_symbols[0]
self.lookback = self.SST_lookback
# self.lookback = 160*8
security = self.AddEquity("VXX", Resolution.Minute)
self.temp_security = self.Symbol("VXX")
# self.rsi = self.RSI('VXX', 120, Resolution.Minute)
self.invested_bool = 0
# #6. Assign Symbols for HMM
# tickers = ["SPY"]
# self.symbols = []
# for ticker in tickers:
# self.symbols.append(self.AddEquity(ticker, Resolution.Hour).Symbol)
# If V.X.X. then
self.s = "SHORT "
self.l = "LONG "
# # If S.V.X.Y. then
# self.s = "LONG "
# self.l = "SHORT "
def OnData(self, slice: Slice):
#1. Return if warming up
if self.IsWarmingUp:
return
#2. Check trading and liquidation criterion every day at noon
if int(self.Time.minute) == 15:
self.df = self.History(self.sst_symbols, self.lookback)
self.dg = self.df["close"].unstack(level=0)
short_avg, quantile_20, quantile_40, quantile_50, quantile_60, quantile_80 = do_stuff(self.dg, True, False, self.SST_window, 0.25, self.SST_rolling_window, self.n)
if not self.invested_bool:
if short_avg < quantile_40:
self.SetHoldings("VXX", 0.3)
self.invested_bool = 1
self.temp = 0
self.priceVXX = self.Portfolio['VXX'].Price
self.quant = quantile_50
self.Debug(self.l + 'VXX at ' + str(self.Time) + " (SST: short avg low REVERSE TREND)")
elif short_avg > quantile_60:
self.SetHoldings("VXX", -0.3)
self.invested_bool = 1
self.temp = 2
self.priceVXX = self.Portfolio['VXX'].Price
self.quant = quantile_50
self.Debug(self.s + 'VXX at ' + str(self.Time) + " (SST: short avg high REVERSE TREND)")
else:
if self.temp == 0 and short_avg > self.quant:
self.Liquidate()
self.invested_bool = 0
self.Debug('Liquidate ' + 'VXX at ' + str(self.Time))
elif self.temp == 2 and short_avg < self.quant:
self.Liquidate()
self.invested_bool = 0
self.Debug('Liquidate ' + 'VXX at ' + str(self.Time))
#2. Check stop loss criterion every hour of every day
# if self.invested_bool:
# if (self.temp == 0 or self.temp == 3) and (self.Portfolio['VXX'].Price < self.priceVXX*0.95):
# self.Liquidate()
# self.invested_bool = 0
# self.Debug ('Stop Loss ' + self.l+ 'at ' + str(self.Time))
# elif (self.temp == 1 or self.temp == 2) and (self.Portfolio['VXX'].Price > self.priceVXX*1.05):
# self.Liquidate()
# self.invested_bool = 0
# self.Debug ('Stop Loss ' + self.s+ 'at ' + str(self.Time))