| Overall Statistics |
|
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio 3.037 Tracking Error 0.128 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset |
from scipy.signal import argrelextrema
import numpy as np
import pandas as pd
def find_extremes(df, order):
o,h,l,c = [df[x].values for x in ['open','high','low','close']]
local_min_idx = np.array(argrelextrema(l, np.less, order = order)[0])
local_max_idx = np.array(argrelextrema(h, np.greater, order = order)[0])
local_min, local_max = [], []
[local_min.append(l[x]) for x in local_min_idx];
[local_max.append(l[x]) for x in local_max_idx];
return {
'max': (local_max, local_max_idx),
'min': (local_min, local_min_idx)
}
def get_boundaries(local_idx, local, n, degrees):
bounds = []
for i in range(0, len(local_idx) - n):
coefs = np.polyfit(local_idx[i:i+n], local[i:i+n], degrees, True)
bounds.append([local_idx[i+n], *coefs])
bounds = pd.DataFrame(bounds).rename({0: 'index'}, axis=1).set_index('index')
#bounds = pd.DataFrame(bounds).set_index(0)
cols = list(range(0, degrees+1))
cols.reverse()
bounds.columns = cols
return bounds
def find_c(df, order, lookback, degrees):
extremas = find_extremes(df, order)
c_max, c_min = extremas['max'], extremas['min']
bound_max = get_boundaries(c_max[1], c_max[0], lookback, degrees)
bound_min = get_boundaries(c_min[1], c_min[0], lookback, degrees)
for deg in bound_max.columns:
bound_max[deg] = bound_max.index ** deg * bound_max[deg]
bound_min[deg] = bound_min.index ** deg * bound_min[deg]
bound_max = bound_max.sum(axis=1)
bound_max.rename('ip_max',inplace=True)
bound_min = bound_min.sum(axis=1)
bound_min.rename('ip_min',inplace=True)
return {
'max': bound_max,
'min': bound_min
}
def assest_performance(ip):
ip['ret'] = np.log(ip.close/ip.close.shift(1))
ip['under'] = (ip.close < ip.ip_min).shift(1)
ip['over'] = (ip.close > ip.ip_max).shift(1)
ip['neutral'] = (ip.close > ip.ip_min) & (ip.close < ip.ip_max)
ip['buys'] = ip.neutral & ip.under
ip['sells'] = ip.neutral & ip.over
buys_idx = ip[ip.buys].index
sells_idx = ip[ip.sells].index
idx_pairs = []
for idx in buys_idx:
try:
sell = sells_idx[sells_idx > idx][0]
idx_pairs.append((idx, sell))
except:
continue
rets = []
for idx in idx_pairs:
start,end = idx
df = ip.ret.iloc[start:end]
rets.append(df.sum())
rets = np.array(rets)
try:
ret = np.sum(rets)
winrate = len(rets[rets>0])/len(rets)
score = ret
n_trades = len(idx_pairs)
fees = n_trades * .004
score -= fees
except:
return (0, 0, 0, 0)
return score, ret, winrate, n_trades
def get_params(df):
fib = [2,3,5,8,11,13,15,20,30,50,70]
best_score = -1
best_pair = (-1 ,-1)
for order in range(len(fib) - 1):
for lookback in fib[order+1:]:
try:
minmax = find_c(df, fib[order], lookback, 1)
ma, mi = minmax['max'], minmax['min']
ip = pd.DataFrame(df.reset_index().close).join(ma).join(mi).ffill().dropna()
score, ret, winrate, n_trades = assest_performance(ip)
pair = (fib[order], lookback)
if score > best_score:
best_score = score
best_pair = pair
print(f' order: {fib[order]}, lookback: {lookback}')
print(f' score: {score}, return: {ret}, winrate: {winrate}, n_trades: {lookback} \n')
except:
continue
return best_pair, best_scorefrom signals import *
from funcs import *
from sklearn.preprocessing import StandardScaler, MaxAbsScaler
class volatility(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2021, 9, 1)
self.SetCash(10000)
self.symbolData = {}
self.lookback, self.period = 500, 1
self.resolution = Resolution.Minute
assets_ = ['SPY']
vols_ = ['VIXY']
self.asset_symb = []
tickers = assets_ + vols_
for ticker in tickers :
equity = self.AddEquity(ticker, self.resolution)
equity.SetDataNormalizationMode(DataNormalizationMode.Raw)
symbol = equity.Symbol
if ticker in assets_:
self.asset_symb.append(symbol)
if ticker in vols_:
option = self.AddOption(ticker, self.resolution)
option.SetFilter(-1, 1, timedelta(1), timedelta(20))
self.Securities[symbol].FeeModel = ConstantFeeModel(0)
self.symbolData[symbol] = symbolData(self, symbol)
consolidator = TradeBarConsolidator(int(self.period))
consolidator.DataConsolidated += self.OnDataConsolidated
self.SubscriptionManager.AddConsolidator(symbol, consolidator)
def OnDataConsolidated(self, sender, bar):
symbol = bar.Symbol
time = bar.Time
ohlcv = [bar.Open,bar.High, bar.Low, bar.Close, bar.Volume]
self.symbolData[symbol].update_df(time, ohlcv)
pass
def OnData(self, data):
#option_invested = [x.Key for x in self.Portfolio if x.Value.Invested and x.Value.Type == SecurityType.Option]
#for option in option_invested:
# if self.Time + timedelta(1) > option.ID.Date:
# self.Liquidate(option)
if not self.Portfolio.Invested:
d_vol,negpos = dvol(self.symbolData[self.asset_symb[0]].df, 20)
#scaler = MaxAbsScaler()
#s_vol = scaler.fit_transform(d_vol)
#self.Debug(s_vol.shape)
s_vol = d_vol.values[-1]
if abs(s_vol) > 1:
for i in data.OptionChains:
chains = i.Value
buyOption(self, chains, s_vol)
pass
def buyOption(self, chains, pos):
expiry = sorted(chains, key = lambda x: x.Expiry, reverse = True)[0].Expiry
opt_type = OptionRight.Call if pos < 0 else OptionRight.Put
if pos < 0:
contracts = [i for i in chains if i.Expiry == expiry and i.Right == OptionRight.Call]
if pos > 0:
contracts = [i for i in chains if i.Expiry == expiry and i.Right == OptionRight.Put]
contracts = sorted(contracts, key = lambda x: abs(x.Strike - x.UnderlyingLastPrice))
if len(contracts) == 0:
return
self.contract = contracts[0]
quantity = self.Portfolio.TotalPortfolioValue / self.contract.AskPrice
quantity = int(.05 * quantity / 100)
self.Buy(self.contract, quantity)
class symbolData:
def __init__(self, algorithm, symbol):
self.symbol = symbol
self.df = algorithm.History(
[symbol],
algorithm.lookback * algorithm.period,
algorithm.resolution,
)[['open','high','low','close','volume']]
def update_df(self, time, ohlcv):
ohlcv = pd.DataFrame(
[ohlcv],
columns = ['open','high','low','close','volume'],
index = [time]
)
self.df = self.df.append(ohlcv).iloc[1:]
def ret(ohlc):
ret = np.log(ohlc.close/ohlc.close.shift(1))
return ret.rename('log_ret')
def dvol(ohlc, period = 5):
ret = np.log(ohlc.close/ohlc.close.shift(1))
posvol,negvol = [
ret.apply(lambda x: x if x > 0 else 0).rolling(period).std().diff(),
ret.apply(lambda x: x if x < 0 else 0).rolling(period).std().diff()
]
return (posvol-negvol).rename(f'dvol_{period}'), (posvol, negvol)
def obv(df):
obv = ta.OBV(df.close/df.volume)
obv /= max(abs(obv))
return obv.rename('obv')
def bias(ohlc, fast, slow):
close = ohlc.close.values
last = close[-1]
ma_fast = close[-fast:].mean()
ma_slow = close[-slow:].mean()
s1 = 1 if ma_fast > ma_slow else -1
s2 = 1 if last > ma_fast else 0
s3 = 0 if last > ma_slow else -1
s = s1 + s2 + s3
return s