Overall Statistics
Total Orders
692
Average Win
1.01%
Average Loss
-0.90%
Compounding Annual Return
0.567%
Drawdown
25.000%
Expectancy
0.020
Start Equity
10000000
End Equity
10243635.47
Net Profit
2.436%
Sharpe Ratio
-0.097
Sortino Ratio
-0.078
Probabilistic Sharpe Ratio
1.182%
Loss Rate
52%
Win Rate
48%
Profit-Loss Ratio
1.13
Alpha
-0.008
Beta
-0.036
Annual Standard Deviation
0.116
Annual Variance
0.013
Information Ratio
-0.456
Tracking Error
0.222
Treynor Ratio
0.318
Total Fees
$218303.79
Estimated Strategy Capacity
$950000.00
Lowest Capacity Asset
VXXB WRBPJAJZ2Q91
Portfolio Turnover
13.34%
from AlgorithmImports import *
from QuantConnect.Securities.Option import OptionPriceModels
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from scipy import poly1d, signal
from datetime import datetime, timedelta
import pytz as pytz
from sklearn.metrics import r2_score, mean_absolute_error
from hmmlearn import hmm
from scipy.signal import stft, gaussian, butter, filtfilt, detrend, medfilt, savgol_filter
from scipy.fft import fft
from scipy.interpolate import interp1d
from numpy import quantile
from inspect import signature
from statsmodels.nonparametric.smoothers_lowess import lowess
 
def SST_J(x, Fs, hlength=None, hop=1, n=None, hf=np.inf, lf=0, ths=1):
    if hlength is None:
        raise ValueError("Select a window length.")
    if Fs is None:
        raise ValueError("Select a sampling rate.")

    # Window bandwidth
    sigma = 0.15

    # Do reassignment
    squeeze_flag = True

    # Organize input
    x = np.array(x).flatten()
    if np.any(np.isnan(x)):
        x = interp1d(np.where(~np.isnan(x))[0], x[~np.isnan(x)], kind='pchip', fill_value="extrapolate")(np.arange(len(x)))

    # Time (samples)
    NN = len(x)
    t = np.arange(1, NN + 1, hop)
    tcol = len(t)

    # N-point fft
    n = int(n)
    n = n + 1 - n % 2
    N = 2 * (n - 1)
    N = int(N)

    # Make sure window length is odd
    hlength = hlength + 1 - hlength % 2
    Lh = (hlength - 1) // 2

    # Gaussian window and its derivative
    ex = np.linspace(-0.5, 0.5, hlength)
    h = np.exp(-ex**2 / (2 * sigma**2))
    dh = -ex / sigma**2 * h

    # Perform convolution
    tfr = np.zeros((N, tcol))
    tfr2 = np.zeros((N, tcol))

    for icol in range(tcol):
        ti=t[icol]
        tau = np.arange(-min(n - 1, Lh, ti - 1), min(n - 1, Lh, NN - ti) + 1)
        indices = np.remainder(N + tau, N)
        rSig = x[ti - 1 + tau]
        tfr[indices, icol] = rSig * h[Lh + tau]
        tfr2[indices, icol] = rSig * dh[Lh + tau]

    # Fourier transform
    tfr = fft(tfr, axis=0)
    tfr = tfr[:n, :]

    # Non-negative frequency axis
    frequency = Fs / 2 * np.linspace(0, 1, n)

    if squeeze_flag:
        tfr2 = fft(tfr2, axis=0)
        tfr2 = tfr2[:n, :]

    # Crop output
    u = frequency <= hf
    tfr = tfr[u, :]
    frequency = frequency[u]

    if not squeeze_flag:
        return tfr, tfr, frequency

    # Crop
    tfr2 = tfr2[u, :]

    # Reassignment threshold 
    ths = quantile(np.abs(tfr), (1 - ths),axis=0)

    # Omega operator
    neta = len(frequency)
    omega = -np.inf * np.ones((neta, tcol))
    ups = np.abs(tfr) > ths
    omega[ups] = np.round(N / hlength * np.imag(tfr2[ups] / tfr[ups] / (2 * np.pi)))

    # Mapped out of range
    index = np.tile(np.arange(1, neta + 1).reshape(-1, 1), (1, tcol))
    omega = index - omega
    id = (omega < 1) | (omega > neta) | ~ups
    omega[id] = index[id]
    sst = tfr
    sst[id] = 0

    # Reassignment
    id = omega + neta * np.arange(tcol)
    id_flat = np.transpose(id.ravel())
    sst_flat = np.transpose(sst.ravel())

    # Need real part and imaginary part also
    temp_real = np.bincount(id_flat.astype(int), np.real(sst_flat), minlength=tcol * neta)
    temp_imag = np.bincount(id_flat.astype(int), np.imag(sst_flat), minlength=tcol * neta)

    # Reshape the result to match the desired shape [neta, tcol]
    sst = np.zeros((neta,tcol),dtype=np.complex_)


    if temp_real.size == tcol*neta:
        sst = sst + np.transpose(np.reshape(temp_real[:], (tcol, neta)))
        sst = sst + np.transpose(np.reshape(temp_imag[:], (tcol, neta)))*(0 + 1j)
    else:
        sst = sst + np.transpose(np.reshape(temp_real[:-1], (tcol, neta)))
        sst = sst + np.transpose(np.reshape(temp_imag[:-1], (tcol, neta)))*(0 + 1j)

    # Crop
    # tfr[frequency < lf, :] = 0
    # sst[frequency < lf, :] = 0
    frequency = frequency[frequency >= lf]

    return sst, tfr, frequency


def curve_ext_m(P, lambda_):
    eps = 1e-8
    E = P / np.sum(P)
    E = -np.log(E + eps)
    m, n = E.shape
    FVal = np.full((m, n), np.inf)
    FVal[0, :] = E[0, :]
    c = np.zeros(m, dtype=int)
    for ii in range(1, m):  # time
        for jj in range(n):  # freq
            # Calculate the penalty term
            for kk in range(n):
                FVal[ii, jj] = min(FVal[ii, jj], FVal[ii - 1, kk] + lambda_ * (kk - jj) ** 2)

            # Add the SST value at time ii and freq jj
            FVal[ii, jj] += E[ii, jj]
    c[-1] = np.argmin(FVal[-1, :])
    for ii in range(m - 2, -1, -1):
        val = FVal[ii + 1, c[ii + 1]] - E[ii + 1, c[ii + 1]]
        for kk in range(n):
            if abs(val - FVal[ii, kk] - lambda_ * (kk - c[ii + 1]) ** 2) < eps:
                c[ii] = kk
                break
        if c[ii] == 0:
            c[ii] = n // 2
    return c, FVal


def recon_sqSTFT_v2(tfrsq, tfrsqtic, Hz, c, Band, coeff):
    alpha = tfrsqtic[1] - tfrsqtic[0]
    RR = round(Band / (Hz * alpha))
    recon = []
    C = 2 * alpha / coeff
    for kk in range(len(c)):
        idx = range(max(0, c[kk] - RR), min(len(tfrsqtic), c[kk] + RR))
        recon.append(C * np.sum(tfrsq[idx, kk]))
    return np.array(recon)


def do_stuff(sig, std_bool, smooth_bool, hlength, hf, win, n):
    trend = None
    seas = None
    # SST Parameters
    hop = 1
    lf = 0.005
    sigma = 0.15
    price = np.array(sig)

    PAD = 1

    if std_bool:
        sig = sig.pct_change().rolling(win).std() * (252**0.5)
        sig = sig.to_numpy()
        sig = sig[~np.isnan(sig)]
        sig = sig.reshape((-1,1))
    else:
        sig = sig.to_numpy()
        sig = sig.reshape((-1,1))

    if sig.size % 2 ==0:
        sig = sig[1:]

    # Signal preprocessing
    b_hp, a_hp = butter(6, 0.5 / 50, 'high')
    b_lp, a_lp = butter(6, 20 / 50, 'low')
    sig = np.transpose(sig)
    #sig = filtfilt(b_hp, a_hp, sig)
    #sig = filtfilt(b_lp, a_lp, sig)
   

    # Apply median filter
    # trend = medfilt(sig, 1)
    trend = np.ravel(sig)

    # Apply LOESS smoothing
    # The fraction parameter in lowess is analogous to the window length in MATLAB's smooth
    trend_loess = lowess(trend, np.arange(len(trend)), frac=0.1, return_sorted=False)

    # Detrend the signal
    sig = sig - trend_loess

    # Smooth check
    if smooth_bool:
        sig = savgol_filter(sig, 21, 11) # May need to change parameters (signal, window filter odd, polynomial order for regression)

    # temp = sig
    if PAD == 1:
        # Do SST and Curve Extraction
        temp = sig
        start_sig = sig[:,0:round(hlength/2)]
        end_sig = sig[:,-round(hlength/2):]
        sig = np.concatenate((start_sig[::-1], sig, end_sig[::-1]), axis = 1)
        [sst, tfr, frequency] = SST_J(sig, 1, hlength, hop, n, hf, lf, ths=1)
        sst = sst[:, round(hlength/2):-round(hlength/2)]

        [c, Fval] = curve_ext_m(np.transpose(abs(sst)), 4)

        # Make sure window length is odd
        hlength = hlength + 1 - hlength % 2
        Lh = (hlength - 1) // 2

        # Gaussian window and its derivative
        ex = np.linspace(-0.5, 0.5, hlength)
        h = np.exp(-ex**2 / (2 * sigma**2))

        x1 = recon_sqSTFT_v2(sst, frequency, 1, c, 0.1, h[(hlength+1)//2])
        x2 = recon_sqSTFT_v2(sst, frequency, 1, 2*c, 0.1, h[(hlength+1)//2])
        x3 = recon_sqSTFT_v2(sst, frequency, 1, 3*c, 0.1, h[(hlength+1)//2])
        
        recon = x1+x2+x3
        temp = temp.flatten()
        arr1 = temp - np.transpose(np.real(recon))

        trend = trend_loess[-1]
        seas = np.transpose(np.real(recon)).flatten()
        seas = seas[-1]
        trend_seas = trend_loess + np.transpose(np.real(recon))
    else:
        # Do SST and Curve Extraction

        [sst, tfr, frequency] = SST_J(sig, 1, hlength, hop, n, hf, lf, ths=1)

        [c, Fval] = curve_ext_m(np.transpose(abs(sst)), 4)

        # Make sure window length is odd
        hlength = hlength + 1 - hlength % 2
        Lh = (hlength - 1) // 2

        # Gaussian window and its derivative
        ex = np.linspace(-0.5, 0.5, hlength)
        h = np.exp(-ex**2 / (2 * sigma**2))

        x1 = recon_sqSTFT_v2(sst, frequency, 1, c, 0.1, h[(hlength+1)//2])
        x2 = recon_sqSTFT_v2(sst, frequency, 1, 2*c, 0.1, h[(hlength+1)//2])
        x3 = recon_sqSTFT_v2(sst, frequency, 1, 3*c, 0.1, h[(hlength+1)//2])
        
        recon = x1+x2+x3
        sig = sig.flatten()
        arr1 = sig - np.transpose(np.real(recon))
        #arr1 = abs(sig - np.transpose(np.real(recon)))

        trend = trend_loess[-1]
        seas = np.transpose(np.real(recon)).flatten()
        seas = seas[-1]
        trend_seas = trend_loess + np.transpose(np.real(recon))

    return np.quantile(arr1[-20:], 0.5), np.quantile(arr1, 0.2), np.quantile(arr1, 0.3), np.quantile(arr1, 0.4), np.quantile(arr1, 0.5), np.quantile(arr1, 0.6), np.quantile(arr1, 0.7), np.quantile(arr1, 0.8), trend, seas, arr1[-1]

class BasicTemplateAlgorithm(QCAlgorithm):
 
    def Initialize(self):
        # Parameters to change
        self.SST_resolution = 'Hour'
        self.SST_lookback = 160
        self.SST_window = 64+1
        self.SST_rolling_window = 60 #NOT BEING USED
        self.n = 257

        

        #1. Backtesting Period Selection
        # self.TrainingPeriod = "OOSA"
 
        #2. Backtesting Period Lookup
        # self.SetStartDate(2020, 1, 1)
        # self.SetEndDate(2024, 1, 1)
        self.SetStartDate(2020, 1, 1)
        self.SetEndDate(2024, 4, 1)

        # if self.TrainingPeriod == "test":
        #     self.SetStartDate(2021, 11, 1)
        #     self.SetEndDate(2021, 11, 19)
        # if self.TrainingPeriod == "IS":
        #     self.SetStartDate(2017, 1, 1)
        #     self.SetEndDate(2021, 1, 1)
        #     self.start_date = datetime(2018, 1, 1, 0, 0)
        #     self.end_date = datetime(2023, 7, 1, 0, 0)
        # if self.TrainingPeriod == "OOSA":
        #     self.SetStartDate(2022, 1, 1)
        #     self.SetEndDate(2022, 11, 1)
        # if self.TrainingPeriod == "OOSB":
        #     self.SetStartDate(2016, 1, 1)
        #     self.SetEndDate(2017, 1, 1)
        # if self.TrainingPeriod == "OOSC":
        #     self.SetStartDate(2010, 1, 1)
        #     self.SetEndDate(2011, 1, 1)
        # if self.TrainingPeriod == "LT":
        #     self.SetStartDate(2022, 11, 9)
        #     self.SetEndDate(2022, 12, 9)
        # if self.TrainingPeriod == "ST":
        #     self.SetStartDate(2020, 3, 1)
        #     self.SetEndDate(2020, 3, 31)
        # if self.TrainingPeriod == "BS":
        #     self.SetStartDate(2019, 1, 1)
        #     self.SetEndDate(2023, 12, 31)
 
        #3. Set Max Drawdown
        self.AddRiskManagement(MaximumDrawdownPercentPortfolio(0.15, True))

        #4. Set Initial Cash and Warmup Data
        self.SetCash(10000000)
        self.SetWarmup(300)

        #5. Assign Symbols for SST
        self.sst_symbols =[]
        # if self.SST_resolution == 'Hour':
        #     self.AddEquity('SPY', Resolution.Hour)
        # elif self.SST_resolution == 'Minute':
        #     self.AddEquity('SPY', Resolution.Minute)
        # elif self.SST_resolution == 'Second':
        #     self.AddEquity('SPY', Resolution.Second)
        # self.sst_symbols.append(self.Symbol('SPY'))
        if self.SST_resolution == 'Hour':
            self.AddEquity('VXX', Resolution.Hour)
        elif self.SST_resolution == 'Minute':
            self.AddEquity('VXX', Resolution.Minute)
        elif self.SST_resolution == 'Second':
            self.AddEquity('VXX', Resolution.Second)
        self.sst_symbols.append(self.Symbol('VXX'))
        self.sym = self.sst_symbols[0]
        self.lookback = self.SST_lookback
        # self.lookback = 160*8
        # security = self.AddEquity("VXX", Resolution.Hour)
        # self.temp_security = self.Symbol("VXX")
        # self.rsi = self.RSI('VXX', 120, Resolution.Minute)
        self.invested_bool = 0
 
        # #6. Assign Symbols for HMM
        # tickers = ["SPY"]
        # self.symbols = []
        # for ticker in tickers:
        #     self.symbols.append(self.AddEquity(ticker, Resolution.Hour).Symbol)

        
        # If V.X.X. then
        self.s = "SHORT "
        self.l = "LONG "

        # # If S.V.X.Y. then
        # self.s = "LONG "
        # self.l = "SHORT "
  
    def OnData(self, slice: Slice):
        #1. Return if warming up
        if self.IsWarmingUp:
            return

        #2. Check trading and liquidation criterion every day at noon
        # if int(self.Time.hour) == 12:
        if True:
            self.df = self.History(self.sst_symbols, self.lookback)
            self.dg = self.df["close"].unstack(level=0)
            short_avg, quantile_20, quantile_30, quantile_40, quantile_50, quantile_60, quantile_70, quantile_80, trend, seas, noise  = do_stuff(self.dg, False, True, self.SST_window, 0.25, self.SST_rolling_window, self.n)

            self.trend = trend
            self.seas = seas
            self.noise = noise

            self.Plot('Quantiles', '20', quantile_20)
            self.Plot('Quantiles', '40', quantile_40)
            self.Plot('Quantiles', '50', quantile_50)
            self.Plot('Quantiles', '60', quantile_60)
            self.Plot('Quantiles', '80', quantile_80)
            self.Plot('Quantiles', 'Short Median', short_avg)

            
            # self.Debug(f"Type of trend: {type(trend)}, Value: {trend}")
            # self.Debug(f"Type of seas: {type(seas)}, Value: {seas}")
            # self.Debug(f"Type of noise: {type(noise)}, Value: {noise}")




            # # # Example check and plot a single value
            # # if isinstance(self.trend, np.ndarray) and self.trend.size > 0:
            # self.Plot('Trend', 'Trend', self.trend)  # Ensure it's a float and plot the last value


            # # Similarly for 'seas' and 'noise'
            # # if isinstance(self.seas, np.ndarray) and self.seas.size > 0:
            # self.Plot('Seasonality', 'Seasonality', self.seas)


            # # if isinstance(self.noise, np.ndarray) and self.noise.size > 0:
            # self.Plot('Noise', 'Noise', self.noise)

            self.Plot('Trend', 'Trend', self.trend)
            self.Plot('Seasonality', 'Seasonality', self.seas)
            self.Plot('Noise', 'Noise', self.noise)

            self.Plot('Everything', 'Trend+Seasonality', self.trend+self.seas)
            self.Plot('Everything', 'Noise+Seasonality', self.noise+self.seas)
            self.Plot('Everything', 'VXX', self.Securities["VXX"].Close)
            # self.Plot('Everything', 'Trend+Seasonality+Noise', self.trend+self.seas+self.noise)
            # self.Plot('Everything', 'VXX', self.dg[-1])

            


            if not self.invested_bool:
                if short_avg < quantile_30:
                    self.SetHoldings("VXX", 0.3)
                    self.invested_bool = 1
                    self.temp = 0
                    self.priceVXX = self.Portfolio['VXX'].Price
                    self.quant = quantile_40
                    self.Debug(self.l + 'VXX at ' + str(self.Time) + " (SST: short avg low REVERSE TREND)")
                elif short_avg > quantile_70:
                    self.SetHoldings("VXX", -0.3)
                    self.invested_bool = 1
                    self.temp = 2
                    self.priceVXX = self.Portfolio['VXX'].Price
                    self.quant = quantile_60
                    self.Debug(self.s + 'VXX at ' + str(self.Time) + " (SST: short avg high REVERSE TREND)")   
            else:
                if self.temp == 0 and short_avg > self.quant:
                    self.Liquidate()
                    self.invested_bool = 0
                    self.Debug('Liquidate ' + 'VXX at ' + str(self.Time))
                elif self.temp == 2 and short_avg < self.quant:
                    self.Liquidate()
                    self.invested_bool = 0
                    self.Debug('Liquidate ' + 'VXX at ' + str(self.Time))
        #2. Check stop loss criterion every hour of every day
        # if self.invested_bool:
        #     if (self.temp == 0 or self.temp == 3) and (self.Portfolio['VXX'].Price < self.priceVXX*0.9):
        #         self.Liquidate()
        #         self.invested_bool = 0
        #         self.Debug ('Stop Loss ' + self.l+ 'at ' + str(self.Time))
        #     elif (self.temp == 1 or self.temp == 2) and (self.Portfolio['VXX'].Price > self.priceVXX*1.1):
        #         self.Liquidate()
        #         self.invested_bool = 0
        #         self.Debug ('Stop Loss ' + self.s+ 'at ' + str(self.Time))