Overall Statistics
Total Trades
0
Average Win
0%
Average Loss
0%
Compounding Annual Return
0%
Drawdown
0%
Expectancy
0
Net Profit
0%
Sharpe Ratio
0
Probabilistic Sharpe Ratio
0%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0
Beta
0
Annual Standard Deviation
0
Annual Variance
0
Information Ratio
3.037
Tracking Error
0.128
Treynor Ratio
0
Total Fees
$0.00
Estimated Strategy Capacity
$0
Lowest Capacity Asset
from scipy.signal import argrelextrema
import numpy as np
import pandas as pd

def find_extremes(df, order):
    o,h,l,c = [df[x].values for x in ['open','high','low','close']]
    local_min_idx = np.array(argrelextrema(l, np.less, order = order)[0])
    local_max_idx = np.array(argrelextrema(h, np.greater, order = order)[0]) 
    
    local_min, local_max = [], []
    [local_min.append(l[x]) for x in local_min_idx];
    [local_max.append(l[x]) for x in local_max_idx];
    
    return {
        'max': (local_max, local_max_idx), 
        'min': (local_min, local_min_idx)
    }

def get_boundaries(local_idx, local, n, degrees): 
    bounds = []
    for i in range(0, len(local_idx) - n): 
        coefs = np.polyfit(local_idx[i:i+n], local[i:i+n], degrees, True)
        bounds.append([local_idx[i+n], *coefs])
        
    bounds = pd.DataFrame(bounds).rename({0: 'index'}, axis=1).set_index('index')
    #bounds  = pd.DataFrame(bounds).set_index(0)
    cols = list(range(0, degrees+1))
    cols.reverse()
    bounds.columns = cols
    return bounds

def find_c(df, order, lookback, degrees): 
    extremas = find_extremes(df, order)
    c_max, c_min  = extremas['max'], extremas['min']
    bound_max  = get_boundaries(c_max[1], c_max[0], lookback, degrees)
    bound_min  = get_boundaries(c_min[1], c_min[0], lookback, degrees)
    for deg in bound_max.columns: 
        bound_max[deg] = bound_max.index ** deg * bound_max[deg]
        bound_min[deg] = bound_min.index ** deg * bound_min[deg]
    
    bound_max = bound_max.sum(axis=1)
    bound_max.rename('ip_max',inplace=True)
    bound_min = bound_min.sum(axis=1)
    bound_min.rename('ip_min',inplace=True)
    return {
        'max': bound_max, 
        'min': bound_min
    }

def assest_performance(ip): 
    ip['ret'] = np.log(ip.close/ip.close.shift(1))
    ip['under'] = (ip.close < ip.ip_min).shift(1)
    ip['over'] = (ip.close > ip.ip_max).shift(1)
    ip['neutral'] = (ip.close > ip.ip_min) & (ip.close < ip.ip_max)
    ip['buys'] = ip.neutral & ip.under
    ip['sells'] = ip.neutral & ip.over
    
    buys_idx = ip[ip.buys].index
    sells_idx = ip[ip.sells].index

    idx_pairs = []
    for idx in buys_idx: 
        try: 
            sell = sells_idx[sells_idx > idx][0]
            idx_pairs.append((idx, sell))
        except: 
            continue
            
    rets = []
    for idx in idx_pairs: 
        start,end = idx
        df = ip.ret.iloc[start:end]
        rets.append(df.sum())
    rets = np.array(rets)
    try: 
        ret = np.sum(rets)
        winrate = len(rets[rets>0])/len(rets)
        score = ret
        n_trades  = len(idx_pairs)
        fees = n_trades * .004
        score -= fees
    except:
        return (0, 0, 0, 0)
    
    return score, ret, winrate, n_trades
 
def get_params(df): 
    fib = [2,3,5,8,11,13,15,20,30,50,70]
    best_score  = -1
    best_pair = (-1 ,-1)
    for order in range(len(fib) - 1): 
        for lookback in fib[order+1:]: 
            try: 
                minmax = find_c(df, fib[order], lookback, 1)
                ma, mi = minmax['max'], minmax['min']
                ip = pd.DataFrame(df.reset_index().close).join(ma).join(mi).ffill().dropna()
                score, ret, winrate, n_trades = assest_performance(ip)
                pair = (fib[order], lookback)
                if score > best_score: 
                    best_score = score
                    best_pair = pair
                print(f' order: {fib[order]}, lookback: {lookback}')
                print(f' score: {score}, return: {ret}, winrate: {winrate}, n_trades: {lookback} \n')
            except: 
                continue
    return best_pair, best_score
from signals import *
from funcs import * 
from sklearn.preprocessing import StandardScaler, MaxAbsScaler

class volatility(QCAlgorithm):
    
    def Initialize(self):
        self.SetStartDate(2021, 9, 1)   
        self.SetCash(10000)           
        self.symbolData = {}
        self.lookback, self.period = 500, 1
        self.resolution = Resolution.Minute
        
        assets_ = ['SPY']
        vols_    = ['VIXY']
        
        self.asset_symb = []
        tickers = assets_ + vols_ 
        
        for ticker in tickers :
            equity = self.AddEquity(ticker, self.resolution)
            equity.SetDataNormalizationMode(DataNormalizationMode.Raw)
            symbol = equity.Symbol
            
            if ticker in assets_: 
                self.asset_symb.append(symbol)
            if ticker in vols_: 
                option = self.AddOption(ticker, self.resolution)
                option.SetFilter(-1, 1, timedelta(1), timedelta(20))
            
            self.Securities[symbol].FeeModel = ConstantFeeModel(0)
            self.symbolData[symbol] = symbolData(self, symbol)
            
            consolidator = TradeBarConsolidator(int(self.period))
            consolidator.DataConsolidated += self.OnDataConsolidated
            self.SubscriptionManager.AddConsolidator(symbol, consolidator)
            
            
    def OnDataConsolidated(self, sender, bar):
        symbol = bar.Symbol
        time   = bar.Time
        ohlcv   = [bar.Open,bar.High, bar.Low, bar.Close, bar.Volume]
        self.symbolData[symbol].update_df(time, ohlcv)
        pass
    
    def OnData(self, data): 
        #option_invested = [x.Key for x in self.Portfolio if x.Value.Invested and x.Value.Type == SecurityType.Option]
        
        #for option in option_invested: 
        #    if self.Time + timedelta(1) > option.ID.Date: 
        #        self.Liquidate(option)
        
        if not self.Portfolio.Invested:
            d_vol,negpos  = dvol(self.symbolData[self.asset_symb[0]].df, 20)
            #scaler = MaxAbsScaler()
            #s_vol  = scaler.fit_transform(d_vol)
            #self.Debug(s_vol.shape)
            s_vol  = d_vol.values[-1]
            if abs(s_vol) > 1: 
                for i in data.OptionChains: 
                    chains = i.Value
                    buyOption(self, chains, s_vol)
        
        pass
    
    def buyOption(self, chains, pos): 
        expiry = sorted(chains, key = lambda x: x.Expiry, reverse = True)[0].Expiry
        opt_type = OptionRight.Call if pos < 0 else OptionRight.Put
        if pos < 0: 
            contracts  = [i for i in chains if i.Expiry == expiry and i.Right == OptionRight.Call]
        if pos > 0: 
            contracts  = [i for i in chains if i.Expiry == expiry and i.Right == OptionRight.Put]
            
        contracts = sorted(contracts, key = lambda x: abs(x.Strike - x.UnderlyingLastPrice))
        if len(contracts) == 0: 
            return
        self.contract = contracts[0]
        
        quantity = self.Portfolio.TotalPortfolioValue / self.contract.AskPrice
        quantity = int(.05 * quantity / 100)
        self.Buy(self.contract, quantity)
            
            
            
class symbolData: 
    def __init__(self, algorithm, symbol): 
        self.symbol   = symbol
        self.df     = algorithm.History(
            [symbol],
            algorithm.lookback * algorithm.period,
            algorithm.resolution,
            )[['open','high','low','close','volume']]
    
    def update_df(self, time, ohlcv): 
        ohlcv = pd.DataFrame(
            [ohlcv], 
            columns = ['open','high','low','close','volume'], 
            index = [time]
            )
        self.df = self.df.append(ohlcv).iloc[1:]
        
        
        
        
        
def ret(ohlc):
    ret = np.log(ohlc.close/ohlc.close.shift(1))
    return ret.rename('log_ret')

def dvol(ohlc, period = 5): 
    ret = np.log(ohlc.close/ohlc.close.shift(1))
    posvol,negvol = [
        ret.apply(lambda x: x if x > 0 else 0).rolling(period).std().diff(),
        ret.apply(lambda x: x if x < 0 else 0).rolling(period).std().diff()
    ]
    return (posvol-negvol).rename(f'dvol_{period}'), (posvol, negvol)
    
def obv(df): 
    obv = ta.OBV(df.close/df.volume)
    obv /= max(abs(obv))
    return obv.rename('obv')
    
def bias(ohlc, fast, slow): 
    
    close = ohlc.close.values
    last  = close[-1]

    ma_fast = close[-fast:].mean()
    ma_slow = close[-slow:].mean()
    
    s1 = 1 if ma_fast > ma_slow else -1
    s2 = 1 if last > ma_fast else 0
    s3 = 0 if last > ma_slow else -1
    s  = s1 + s2 + s3    
    return s