Overall Statistics
Total Orders
1833
Average Win
0.87%
Average Loss
-0.93%
Compounding Annual Return
-22.344%
Drawdown
72.400%
Expectancy
-0.105
Net Profit
-63.600%
Sharpe Ratio
-1.01
Sortino Ratio
-1.414
Probabilistic Sharpe Ratio
0.000%
Loss Rate
54%
Win Rate
46%
Profit-Loss Ratio
0.93
Alpha
-0.14
Beta
-0.352
Annual Standard Deviation
0.167
Annual Variance
0.028
Information Ratio
-0.841
Tracking Error
0.297
Treynor Ratio
0.479
Total Fees
$203088.73
Estimated Strategy Capacity
$630000.00
Lowest Capacity Asset
VXXB WRBPJAJZ2Q91
Portfolio Turnover
37.60%
from AlgorithmImports import *
from QuantConnect.Securities.Option import OptionPriceModels
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from scipy import poly1d, signal
from datetime import datetime, timedelta
import pytz as pytz
from sklearn.metrics import r2_score, mean_absolute_error
from hmmlearn import hmm
from scipy.signal import stft, gaussian, butter, filtfilt, detrend, medfilt, savgol_filter
from scipy.fft import fft
from scipy.interpolate import interp1d
from numpy import quantile
from inspect import signature
from statsmodels.nonparametric.smoothers_lowess import lowess
 
def SST_J(x, Fs, hlength=None, hop=1, n=None, hf=np.inf, lf=0, ths=1):
    if hlength is None:
        raise ValueError("Select a window length.")
    if Fs is None:
        raise ValueError("Select a sampling rate.")

    # Window bandwidth
    sigma = 0.15

    # Do reassignment
    squeeze_flag = True

    # Organize input
    x = np.array(x).flatten()
    if np.any(np.isnan(x)):
        x = interp1d(np.where(~np.isnan(x))[0], x[~np.isnan(x)], kind='pchip', fill_value="extrapolate")(np.arange(len(x)))

    # Time (samples)
    NN = len(x)
    t = np.arange(1, NN + 1, hop)
    tcol = len(t)

    # N-point fft
    n = int(n)
    n = n + 1 - n % 2
    N = 2 * (n - 1)
    N = int(N)

    # Make sure window length is odd
    hlength = hlength + 1 - hlength % 2
    Lh = (hlength - 1) // 2

    # Gaussian window and its derivative
    ex = np.linspace(-0.5, 0.5, hlength)
    h = np.exp(-ex*2 / (2 * sigma*2))
    dh = -ex / sigma**2 * h

    # Perform convolution
    tfr = np.zeros((N, tcol))
    tfr2 = np.zeros((N, tcol))

    for icol in range(tcol):
        ti=t[icol]
        tau = np.arange(-min(n - 1, Lh, ti - 1), min(n - 1, Lh, NN - ti) + 1)
        indices = np.remainder(N + tau, N)
        rSig = x[ti - 1 + tau]
        tfr[indices, icol] = rSig * h[Lh + tau]
        tfr2[indices, icol] = rSig * dh[Lh + tau]

    # Fourier transform
    tfr = fft(tfr, axis=0)
    tfr = tfr[:n, :]

    # Non-negative frequency axis
    frequency = Fs / 2 * np.linspace(0, 1, n)

    if squeeze_flag:
        tfr2 = fft(tfr2, axis=0)
        tfr2 = tfr2[:n, :]

    # Crop output
    u = frequency <= hf
    tfr = tfr[u, :]
    frequency = frequency[u]

    if not squeeze_flag:
        return tfr, tfr, frequency

    # Crop
    tfr2 = tfr2[u, :]

    # Reassignment threshold 
    ths = quantile(np.abs(tfr), (1 - ths),axis=0)

    # Omega operator
    neta = len(frequency)
    omega = -np.inf * np.ones((neta, tcol))
    ups = np.abs(tfr) > ths
    omega[ups] = np.round(N / hlength * np.imag(tfr2[ups] / tfr[ups] / (2 * np.pi)))

    # Mapped out of range
    index = np.tile(np.arange(1, neta + 1).reshape(-1, 1), (1, tcol))
    omega = index - omega
    id = (omega < 1) | (omega > neta) | ~ups
    omega[id] = index[id]
    sst = tfr
    sst[id] = 0

    # Reassignment
    id = omega + neta * np.arange(tcol)
    id_flat = np.transpose(id.ravel())
    sst_flat = np.transpose(sst.ravel())

    # Need real part and imaginary part also
    temp_real = np.bincount(id_flat.astype(int), np.real(sst_flat), minlength=tcol * neta)
    temp_imag = np.bincount(id_flat.astype(int), np.imag(sst_flat), minlength=tcol * neta)

    # Reshape the result to match the desired shape [neta, tcol]
    sst = np.zeros((neta,tcol),dtype=np.complex_)


    if temp_real.size == tcol*neta:
        sst = sst + np.transpose(np.reshape(temp_real[:], (tcol, neta)))
        sst = sst + np.transpose(np.reshape(temp_imag[:], (tcol, neta)))*(0 + 1j)
    else:
        sst = sst + np.transpose(np.reshape(temp_real[:-1], (tcol, neta)))
        sst = sst + np.transpose(np.reshape(temp_imag[:-1], (tcol, neta)))*(0 + 1j)

    # Crop
    # tfr[frequency < lf, :] = 0
    # sst[frequency < lf, :] = 0
    frequency = frequency[frequency >= lf]

    return sst, tfr, frequency


def curve_ext_m(P, lambda_):
    eps = 1e-8
    E = P / np.sum(P)
    E = -np.log(E + eps)
    m, n = E.shape
    FVal = np.full((m, n), np.inf)
    FVal[0, :] = E[0, :]
    c = np.zeros(m, dtype=int)
    for ii in range(1, m):  # time
        for jj in range(n):  # freq
            # Calculate the penalty term
            for kk in range(n):
                FVal[ii, jj] = min(FVal[ii, jj], FVal[ii - 1, kk] + lambda_ * (kk - jj) ** 2)

            # Add the SST value at time ii and freq jj
            FVal[ii, jj] += E[ii, jj]
    c[-1] = np.argmin(FVal[-1, :])
    for ii in range(m - 2, -1, -1):
        val = FVal[ii + 1, c[ii + 1]] - E[ii + 1, c[ii + 1]]
        for kk in range(n):
            if abs(val - FVal[ii, kk] - lambda_ * (kk - c[ii + 1]) ** 2) < eps:
                c[ii] = kk
                break
        if c[ii] == 0:
            c[ii] = n // 2
    return c, FVal


def recon_sqSTFT_v2(tfrsq, tfrsqtic, Hz, c, Band, coeff):
    alpha = tfrsqtic[1] - tfrsqtic[0]
    RR = round(Band / (Hz * alpha))
    recon = []
    C = 2 * alpha / coeff
    for kk in range(len(c)):
        idx = range(max(0, c[kk] - RR), min(len(tfrsqtic), c[kk] + RR))
        recon.append(C * np.sum(tfrsq[idx, kk]))
    return np.array(recon)


def do_stuff(sig, std_bool, smooth_bool, hlength, hf, win, n):
    # SST Parameters
    hop = 1
    lf = 0.005
    sigma = 0.15
    price = np.array(sig)

    PAD = 1

    if std_bool:
        sig = sig.pct_change().rolling(win).std() * (252**0.5)
        sig = sig.to_numpy()
        sig = sig[~np.isnan(sig)]
        sig = sig.reshape((-1,1))
    else:
        sig = sig.to_numpy()

    if sig.size % 2 ==0:
        sig = sig[1:]

    # Signal preprocessing
    b_hp, a_hp = butter(6, 0.5 / 50, 'high')
    b_lp, a_lp = butter(6, 20 / 50, 'low')
    sig = np.transpose(sig)
    sig = filtfilt(b_hp, a_hp, sig)
    sig = filtfilt(b_lp, a_lp, sig)

    # Apply median filter
    trend = medfilt(sig, 1)
    trend = np.ravel(trend)

    # Apply LOESS smoothing
    # The fraction parameter in lowess is analogous to the window length in MATLAB's smooth
    trend_loess = lowess(trend, np.arange(len(trend)), frac=0.1, return_sorted=False)

    # Detrend the signal
    sig = sig - trend_loess

    # Smooth check
    if smooth_bool:
        sig = savgol_filter(sig, 51, 11)

    # temp = sig
    if PAD == 1:
        # Do SST and Curve Extraction
        temp = sig
        start_sig = sig[:,0:round(hlength/2)]
        end_sig = sig[:,-round(hlength/2):]
        sig = np.concatenate((start_sig[::-1], sig, end_sig[::-1]), axis = 1)
        [sst, tfr, frequency] = SST_J(sig, 1, hlength, hop, n, hf, lf, ths=1)
        sst = sst[:, round(hlength/2):-round(hlength/2)]

        [c, Fval] = curve_ext_m(np.transpose(abs(sst)), 4)

        # Make sure window length is odd
        hlength = hlength + 1 - hlength % 2
        Lh = (hlength - 1) // 2

        # Gaussian window and its derivative
        ex = np.linspace(-0.5, 0.5, hlength)
        h = np.exp(-ex*2 / (2 * sigma*2))

        x1 = recon_sqSTFT_v2(sst, frequency, 1, c, 0.1, h[(hlength+1)//2])
        x2 = recon_sqSTFT_v2(sst, frequency, 1, 2*c, 0.1, h[(hlength+1)//2])
        x3 = recon_sqSTFT_v2(sst, frequency, 1, 3*c, 0.1, h[(hlength+1)//2])
        
        recon = x1+x2+x3
        temp = temp.flatten()
        arr1 = abs(temp - np.transpose(np.real(recon)))
        arr2 = abs(np.diff(np.log(price)))
    else:
        # Do SST and Curve Extraction

        [sst, tfr, frequency] = SST_J(sig, 1, hlength, hop, n, hf, lf, ths=1)

        [c, Fval] = curve_ext_m(np.transpose(abs(sst)), 4)

        # Make sure window length is odd
        hlength = hlength + 1 - hlength % 2
        Lh = (hlength - 1) // 2

        # Gaussian window and its derivative
        ex = np.linspace(-0.5, 0.5, hlength)
        h = np.exp(-ex*2 / (2 * sigma*2))

        x1 = recon_sqSTFT_v2(sst, frequency, 1, c, 0.1, h[(hlength+1)//2])
        x2 = recon_sqSTFT_v2(sst, frequency, 1, 2*c, 0.1, h[(hlength+1)//2])
        x3 = recon_sqSTFT_v2(sst, frequency, 1, 3*c, 0.1, h[(hlength+1)//2])
        
        recon = x1+x2+x3
        sig = sig.flatten()
        arr1 = abs(sig - np.transpose(np.real(recon)))
        arr2 = abs(np.diff(np.log(price)))

    return np.quantile(arr1[-60:], 0.5), np.quantile(arr1, 0.2), np.quantile(arr1, 0.4), np.quantile(arr1, 0.5), np.quantile(arr1, 0.6), np.quantile(arr1, 0.8)

class BasicTemplateAlgorithm(QCAlgorithm):
 
    def Initialize(self):
        # Parameters to change
        self.SST_resolution = 'Minute'
        self.SST_lookback = 2250 # 60*7.5*5
        self.SST_window = 451 # 60*7.5+1
        self.SST_rolling_window = 60
        self.n = 129

        

        #1. Backtesting Period Selection
        # self.TrainingPeriod = "OOSA"
 
        #2. Backtesting Period Lookup
        self.SetStartDate(2020, 1, 1)
        #self.SetEndDate(2020, 1, 2)
        self.SetEndDate(2024, 1, 1)

        # self.SetStartDate(2023, 6, 1)
        # self.SetEndDate(2024, 3, 11)

        # if self.TrainingPeriod == "test":
        #     self.SetStartDate(2021, 11, 1)
        #     self.SetEndDate(2021, 11, 19)
        # if self.TrainingPeriod == "IS":
        #     self.SetStartDate(2017, 1, 1)
        #     self.SetEndDate(2021, 1, 1)
        #     self.start_date = datetime(2018, 1, 1, 0, 0)
        #     self.end_date = datetime(2023, 7, 1, 0, 0)
        # if self.TrainingPeriod == "OOSA":
        #     self.SetStartDate(2022, 1, 1)
        #     self.SetEndDate(2022, 11, 1)
        # if self.TrainingPeriod == "OOSB":
        #     self.SetStartDate(2016, 1, 1)
        #     self.SetEndDate(2017, 1, 1)
        # if self.TrainingPeriod == "OOSC":
        #     self.SetStartDate(2010, 1, 1)
        #     self.SetEndDate(2011, 1, 1)
        # if self.TrainingPeriod == "LT":
        #     self.SetStartDate(2022, 11, 9)
        #     self.SetEndDate(2022, 12, 9)
        # if self.TrainingPeriod == "ST":
        #     self.SetStartDate(2020, 3, 1)
        #     self.SetEndDate(2020, 3, 31)
        # if self.TrainingPeriod == "BS":
        #     self.SetStartDate(2019, 1, 1)
        #     self.SetEndDate(2023, 12, 31)
 
        #3. Set Max Drawdown
        self.AddRiskManagement(MaximumDrawdownPercentPortfolio(0.15, True))

        #4. Set Initial Cash and Warmup Data
        self.SetCash(10000000)
        self.SetWarmup(2300)

        #5. Assign Symbols for SST
        self.sst_symbols =[]
        if self.SST_resolution == 'Hour':
            self.AddEquity('SPY', Resolution.Hour)
        elif self.SST_resolution == 'Minute':
            self.AddEquity('SPY', Resolution.Minute)
        elif self.SST_resolution == 'Second':
            self.AddEquity('SPY', Resolution.Second)
        self.sst_symbols.append(self.Symbol('SPY'))
        # if self.SST_resolution == 'Hour':
        #     self.AddEquity('VXX', Resolution.Hour)
        # elif self.SST_resolution == 'Minute':
        #     self.AddEquity('VXX', Resolution.Minute)
        # elif self.SST_resolution == 'Second':
        #     self.AddEquity('VXX', Resolution.Second)
        self.sst_symbols.append(self.Symbol('SPY'))
        self.sym = self.sst_symbols[0]
        self.lookback = self.SST_lookback
        # self.lookback = 160*8
        security = self.AddEquity("VXX", Resolution.Minute)
        self.temp_security = self.Symbol("VXX")
        # self.rsi = self.RSI('VXX', 120, Resolution.Minute)
        self.invested_bool = 0
 
        # #6. Assign Symbols for HMM
        # tickers = ["SPY"]
        # self.symbols = []
        # for ticker in tickers:
        #     self.symbols.append(self.AddEquity(ticker, Resolution.Hour).Symbol)

        
        # If V.X.X. then
        self.s = "SHORT "
        self.l = "LONG "

        # # If S.V.X.Y. then
        # self.s = "LONG "
        # self.l = "SHORT "
  
    def OnData(self, slice: Slice):
        #1. Return if warming up
        if self.IsWarmingUp:
            return

        #2. Check trading and liquidation criterion every day at noon
        if int(self.Time.minute) == 30:
            self.df = self.History(self.sst_symbols, self.lookback)
            self.dg = self.df["close"].unstack(level=0)
            short_avg, quantile_20, quantile_40, quantile_50, quantile_60, quantile_80  = do_stuff(self.dg, True, False, self.SST_window, 0.25, self.SST_rolling_window, self.n)

            if not self.invested_bool:
                if short_avg < quantile_40:
                    self.SetHoldings("VXX", 0.3)
                    self.invested_bool = 1
                    self.temp = 0
                    self.priceVXX = self.Portfolio['VXX'].Price
                    self.quant = quantile_50
                    self.Debug(self.l + 'VXX at ' + str(self.Time) + " (SST: short avg low REVERSE TREND)")
                elif short_avg > quantile_60:
                    self.SetHoldings("VXX", -0.3)
                    self.invested_bool = 1
                    self.temp = 2
                    self.priceVXX = self.Portfolio['VXX'].Price
                    self.quant = quantile_50
                    self.Debug(self.s + 'VXX at ' + str(self.Time) + " (SST: short avg high REVERSE TREND)")   
            else:
                if self.temp == 0 and short_avg > self.quant:
                    self.Liquidate()
                    self.invested_bool = 0
                    self.Debug('Liquidate ' + 'VXX at ' + str(self.Time))
                elif self.temp == 2 and short_avg < self.quant:
                    self.Liquidate()
                    self.invested_bool = 0
                    self.Debug('Liquidate ' + 'VXX at ' + str(self.Time))
        #2. Check stop loss criterion every hour of every day
        # if self.invested_bool:
        #     if (self.temp == 0 or self.temp == 3) and (self.Portfolio['VXX'].Price < self.priceVXX*0.95):
        #         self.Liquidate()
        #         self.invested_bool = 0
        #         self.Debug ('Stop Loss ' + self.l+ 'at ' + str(self.Time))
        #     elif (self.temp == 1 or self.temp == 2) and (self.Portfolio['VXX'].Price > self.priceVXX*1.05):
        #         self.Liquidate()
        #         self.invested_bool = 0
        #         self.Debug ('Stop Loss ' + self.s+ 'at ' + str(self.Time))