| Overall Statistics |
|
Total Trades 1 Average Win 0% Average Loss 0% Compounding Annual Return 76.769% Drawdown 31.500% Expectancy 0 Net Profit 58.711% Sharpe Ratio 1.703 Probabilistic Sharpe Ratio 60.393% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0.934 Beta -0.219 Annual Standard Deviation 0.449 Annual Variance 0.201 Information Ratio -0.02 Tracking Error 0.7 Treynor Ratio -3.491 Total Fees $1.00 |
import time
from functools import wraps
def time_method(func):
    """Decorator that reports slow calls through the algorithm's ``Debug`` log.

    Every invocation of the wrapped callable is timed. When a call takes
    longer than ``time_thresh`` seconds, a message is sent to the ``Debug``
    method of the first positional argument (for a decorated algorithm method
    that is ``self``). If there is no positional argument, or it has no
    callable ``Debug`` attribute, the report is skipped so that the result of
    the wrapped call is never lost because of the reporting step.

    :param func: callable to wrap
    :return: wrapper that forwards all arguments and returns ``func``'s result
    """
    @wraps(func)
    def timed(*args, **kw):
        time_thresh = 1  # Function time taken printed if greater than this number
        # perf_counter is monotonic, so the measured duration cannot be
        # distorted by system clock adjustments.
        ts = time.perf_counter()
        result = func(*args, **kw)
        elapsed = time.perf_counter() - ts
        if elapsed > time_thresh:
            algo = args[0] if args else None
            debug = getattr(algo, "Debug", None)
            if callable(debug):
                debug("%r took %2.2f seconds to run." % (func.__name__, elapsed))
        return result
    return timed


import pandas as pd
import numpy as np
from sadf import get_sadf
from datetime import datetime, timedelta
from collections import deque
class TransdimensionalCalibratedChamber(QCAlgorithm):
    """Single-symbol strategy driven by a custom SADF indicator.

    While flat, the algorithm buys when the indicator value is below
    ``confidence_threshold``. While long, it liquidates when the indicator is
    above the threshold and the indicator's current and previous returns are
    both negative.
    """

    def Initialize(self):
        """Configure symbol, cash, brokerage, benchmark, universe and warm-up."""
        # parameters for one stock
        self.symbol = Symbol.Create("AAPL", SecurityType.Equity, Market.USA)
        self.cash_invested = 10000
        self.confidence_threshold = 1.1
        # Created in OnSecuritiesChanged; defined here so OnData can test for
        # it instead of failing with AttributeError.
        self.sadf = None

        # date, equity, brokerage and benchmark
        self.SetStartDate(2020, 1, 1)
        # self.SetEndDate(2020, 10, 10)
        self.SetCash(self.cash_invested)
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        self.SetBenchmark(self.symbol)

        # universe; use for fundamental data
        self.SetUniverseSelection(FineFundamentalUniverseSelectionModel(self.CoarseSelectionFunction, self.FineSelectionFunction, None, None))
        self.UniverseSettings.Resolution = Resolution.Daily
        # old way
        # self.AddEquity(self.symbol, Resolution.Hour, fillDataForward=True).SetDataNormalizationMode(DataNormalizationMode.Adjusted)

        # warm up period
        self.lookback = 150
        self.SetWarmUp(self.lookback)

        # # frequency between minute and 30 min
        # thirtyMinuteConsolidator = TradeBarConsolidator(timedelta(minutes=30))
        # self.SubscriptionManager.AddConsolidator("SPY", thirtyMinuteConsolidator)
        # self.RegisterIndicator("SPY", self.sadf, thirtyMinuteConsolidator)

    def OnData(self, data):
        '''OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.
        Arguments:
            data: Slice object keyed by symbol containing the stock data
        '''
        # if there is no bar data (only stock splits, dividends etc.) then skip
        if self.symbol not in data.Bars:
            return
        # continue only if warm up has finished
        if self.IsWarmingUp:
            return
        # the indicator only exists once the security has been added
        if self.sadf is None:
            return

        # Alpha
        if not self.Portfolio[self.symbol].Invested and self.sadf.Value < self.confidence_threshold:
            # buy the quantity affordable with the manually tracked cash
            size = int(self.cash_invested / data[self.symbol].Close)
            self.MarketOrder(self.symbol, size)
        elif self.Portfolio[self.symbol].IsLong and self.sadf.Value > self.confidence_threshold:
            # = self.close.pct_change(periods=1)
            if self.sadf.CurrentReturn < 0 and self.sadf.PreviousReturn < 0:
                # carry the open profit/loss into the tracked cash before exiting
                self.cash_invested = self.cash_invested + self.Portfolio[self.symbol].UnrealizedProfit
                self.Liquidate(self.symbol)

    def CoarseSelectionFunction(self, coarse):
        """Coarse universe filter: always select the single configured symbol."""
        return [self.symbol]

    def FineSelectionFunction(self, fine):
        """Fine universe filter: always select the single configured symbol."""
        return [self.symbol]

    def OnSecuritiesChanged(self, changes):
        """Create and register the SADF indicator when the traded symbol is added.

        Arguments:
            changes: security additions and removals reported by the engine
        '''
        """
        for security in changes.AddedSecurities:
            # Only the traded symbol gets an indicator; this keeps the symbol
            # used to build the indicator and the one it is registered on
            # identical, and avoids replacing the indicator for other additions.
            if security.Symbol != self.symbol:
                continue
            self.sadf = SadfIndicator('sadf', self.lookback, self, security.Symbol)
            self.RegisterIndicator(security.Symbol, self.sadf, Resolution.Daily)
            self.Log(self.sadf.ValuePe)
class SadfIndicator(PythonIndicator):
    """Custom indicator holding rolling windows of price, time and PE ratio.

    Once ``period`` observations have been collected, each update recomputes
    the last SADF value of the price window (``Value``), the last SADF value
    of the PE-ratio window (``ValuePe``) and the two most recent one-period
    price returns (``CurrentReturn`` and ``PreviousReturn``).
    """

    def __init__(self, name, period, algorithm, symbol):
        """Set up the rolling windows and default outputs.

        Arguments:
            name: indicator name
            period: number of observations kept in each rolling window
            algorithm: algorithm instance, used for logging, plotting and
                looking up the security
            symbol: symbol whose security object supplies the PE ratio
        """
        self.Name = name
        self.period = period
        self.Time = datetime.min
        self.Value = 0
        self.ValuePe = 0
        self.queue = deque(maxlen=period)
        self.queueTime = deque(maxlen=period)
        self.queuePe = deque(maxlen=period)
        self.CurrentReturn = 0
        # Read by the algorithm alongside CurrentReturn, so it needs a default
        # before the first full window is available.
        self.PreviousReturn = 0
        self.algorithm = algorithm
        self.security = algorithm.Securities[symbol]

    def sadf_last(self, value):
        """Return the most recent SADF statistic of ``value``.

        Arguments:
            value: series passed to ``get_sadf`` (linear model, constant
                added, 4 lags, minimum sample length 50)
        Returns:
            The last element of the SADF series, or 0 when it is empty.
        """
        sadf_linear = get_sadf(
            value,
            min_length=50,
            add_const=True,
            model='linear',
            # phi=0.5,
            lags=4)
        if len(sadf_linear) > 0:
            return sadf_linear.values[-1].item()
        return 0

    def Update(self, input):
        """Push a new data point into the windows and refresh the outputs.

        Arguments:
            input: data point exposing ``Price`` and ``EndTime``
        Returns:
            True once the rolling window holds ``period`` observations.
        """
        self.algorithm.Log(f'Input type: {type(input)}')
        pe_ratio = self.security.Fundamentals.ValuationRatios.NormalizedPERatio
        self.algorithm.Plot('Normalized PE', 'Ratio', pe_ratio)

        self.queue.appendleft(input.Price)
        self.queueTime.appendleft(input.EndTime)
        self.queuePe.appendleft(pe_ratio)
        self.Time = input.EndTime

        # The deques are capped at ``period`` items, so a strict ``>`` test
        # could never pass; the window is ready exactly when it is full.
        window_full = len(self.queue) == self.queue.maxlen
        if window_full:
            close_ = pd.Series(self.queue, index=self.queueTime).rename('close').sort_index()
            pe_ = pd.Series(self.queuePe, index=self.queueTime).rename('pe').sort_index()
            returns = close_.pct_change(periods=1)
            # Positional access: the index holds timestamps, not integers.
            self.CurrentReturn = returns.iloc[-1]
            self.PreviousReturn = returns.iloc[-2]
            self.Value = self.sadf_last(close_)
            self.ValuePe = self.sadf_last(pe_)
        # self.IsReady = count == self.queue.maxlen
        return window_full
## IN CASE HISTORY DATA IS NEEDED LATER
# df = self.History(self.tickers, 10, Resolution.Daily).close
# prices = pd.concat([df.loc[x] for x in self.tickers], axis = 1)
# prices.columns = self.tickers
# Copyright 2019, Hudson and Thames Quantitative Research
# All rights reserved
# Read more: https://github.com/hudson-and-thames/mlfinlab/blob/master/LICENSE.txt
"""
Explosiveness tests: SADF
"""
from typing import Union, Tuple
import pandas as pd
import numpy as np
# pylint: disable=invalid-name
def _get_sadf_at_t(X: pd.DataFrame, y: pd.DataFrame, min_length: int, model: str, phi: float) -> float:
    """
    Advances in Financial Machine Learning, Snippet 17.2, page 258.
    SADF's Inner Loop (get SADF value at t)

    The end of the sample is fixed at the last row; the start is moved forward
    one row at a time while at least ``min_length`` rows remain, and the
    largest statistic over all start points is returned.

    :param X: (pd.DataFrame) Lagged values, constants, trend coefficients
    :param y: (pd.DataFrame) Y values (either y or y.diff())
    :param min_length: (int) Minimum number of samples needed for estimation
    :param model: (str) Either 'linear', 'quadratic', 'sm_poly_1', 'sm_poly_2', 'sm_exp', 'sm_power'
    :param phi: (float) Coefficient to penalize large sample lengths when computing SMT, in [0, 1]
    :return: (float) SADF statistics for y.index[-1]; -inf when no start point gave a usable estimate
    """
    bsadf = -np.inf
    for start in range(0, y.shape[0] - min_length + 1):
        y_, X_ = y[start:], X[start:]
        b_mean_, b_std_ = get_betas(X_, y_)
        # get_betas signals a singular design matrix with NaN
        if np.isnan(b_mean_[0]):
            continue
        b_mean_, b_std_ = b_mean_[0, 0], b_std_[0, 0] ** 0.5
        # TODO: Rewrite logic of this module to avoid division by zero
        with np.errstate(invalid='ignore'):
            all_adf = b_mean_ / b_std_
        if model[:2] == 'sm':
            # Penalize by the length of the sub-sample actually fitted. The
            # full length y.shape[0] is the same for every start point, so it
            # could never change which sub-sample attains the supremum.
            all_adf = np.abs(all_adf) / (y_.shape[0] ** phi)
        if all_adf > bsadf:
            bsadf = all_adf
    return bsadf
def _get_y_x(series: pd.Series, model: str, lags: Union[int, list],
             add_const: bool) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Advances in Financial Machine Learning, Snippet 17.2, page 258-259.
    Build the regressors and the target for the chosen test specification.

    For 'linear' and 'quadratic' the target is the differenced series and the
    regressors are its lags, the lagged level, an optional constant and the
    trend terms. For the 'sm_*' models the target is the level (or its log)
    and the regressors are rebuilt from a constant plus trend terms only.

    :param series: (pd.Series) Series to prepare for test statistics generation (for example log prices)
    :param model: (str) Either 'linear', 'quadratic', 'sm_poly_1', 'sm_poly_2', 'sm_exp', 'sm_power'
    :param lags: (int or list) Either number of lags to use or array of specified lags
    :param add_const: (bool) Flag to add constant
    :return: (pd.DataFrame, pd.DataFrame) X and y, with the tested coefficient's column first in X
    :raises ValueError: if ``model`` is not one of the supported names
    """
    frame = pd.DataFrame(series)
    increments = frame.diff().dropna()

    x = _lag_df(increments, lags).dropna()
    x['y_lagged'] = frame.shift(1).loc[x.index]  # y_(t-1)
    y = increments.loc[x.index]

    if add_const is True:
        x['const'] = 1

    if model in ('linear', 'quadratic'):
        x['trend'] = np.arange(x.shape[0])  # t = 0, 1, 2, ...
        if model == 'quadratic':
            x['quad_trend'] = np.arange(x.shape[0]) ** 2  # t^2 = 0, 1, 4, ...
        beta_column = 'y_lagged'
    elif model in ('sm_poly_1', 'sm_poly_2', 'sm_exp', 'sm_power'):
        levels = frame.loc[y.index]
        # only sm_poly_1 regresses the raw level; the others use its log
        y = levels if model == 'sm_poly_1' else np.log(levels)
        x = pd.DataFrame(index=y.index)
        x['const'] = 1
        if model == 'sm_power':
            # TODO: Rewrite logic of this module to avoid division by zero
            with np.errstate(divide='ignore'):
                x['log_trend'] = np.log(np.arange(x.shape[0]))
            beta_column = 'log_trend'
        else:
            x['trend'] = np.arange(x.shape[0])
            beta_column = 'trend'
            if model != 'sm_exp':
                x['quad_trend'] = np.arange(x.shape[0]) ** 2
                beta_column = 'quad_trend'
    else:
        raise ValueError('Unknown model')

    # Put the tested coefficient's column first so callers can read it at position 0
    ordered = list(x.columns)
    ordered.remove(beta_column)
    ordered.insert(0, beta_column)
    return x[ordered], y
def _lag_df(df: pd.DataFrame, lags: Union[int, list]) -> pd.DataFrame:
"""
Advances in Financial Machine Learning, Snipet 17.3, page 259.
Apply Lags to DataFrame
:param df: (int or list) Either number of lags to use or array of specified lags
:param lags: (int or list) Lag(s) to use
:return: (pd.DataFrame) Dataframe with lags
"""
df_lagged = pd.DataFrame()
if isinstance(lags, int):
lags = range(1, lags + 1)
else:
lags = [int(lag) for lag in lags]
for lag in lags:
temp_df = df.shift(lag).copy(deep=True)
temp_df.columns = [str(i) + '_' + str(lag) for i in temp_df.columns]
df_lagged = df_lagged.join(temp_df, how='outer')
return df_lagged
def get_betas(X: pd.DataFrame, y: pd.DataFrame) -> Tuple[np.array, np.array]:
    """
    Advances in Financial Machine Learning, Snippet 17.4, page 259.
    Least-squares fit of y on X: coefficient estimates and their covariance.

    :param X: (pd.DataFrame) Features(factors)
    :param y: (pd.DataFrame) Outcomes
    :return: (np.array, np.array) Betas and variances of estimates; NaN placeholders
        ([nan], [[nan, nan]]) when X'X cannot be inverted
    """
    gram = np.dot(X.T, X)
    moment = np.dot(X.T, y)

    try:
        gram_inv = np.linalg.inv(gram)
    except np.linalg.LinAlgError:
        # singular X'X: no estimate available
        return [np.nan], [[np.nan, np.nan]]

    b_mean = np.dot(gram_inv, moment)
    residuals = y - np.dot(X, b_mean)
    dof = X.shape[0] - X.shape[1]  # observations minus estimated coefficients
    b_var = np.dot(residuals.T, residuals) / dof * gram_inv
    return b_mean, b_var
def _sadf_outer_loop(X: pd.DataFrame, y: pd.DataFrame, min_length: int, model: str, phi: float,
                     molecule: list) -> pd.Series:
    """
    Compute the SADF statistic at every end point listed in ``molecule``.

    For each label, X and y are cut off at that label (inclusive, via ``.loc``)
    and the inner loop is run on the resulting arrays.

    :param X: (pd.DataFrame) Features(factors)
    :param y: (pd.DataFrame) Outcomes
    :param min_length: (int) Minimum number of observations
    :param model: (str) Either 'linear', 'quadratic', 'sm_poly_1', 'sm_poly_2', 'sm_exp', 'sm_power'
    :param phi: (float) Coefficient to penalize large sample lengths when computing SMT, in [0, 1]
    :param molecule: (list) Indices to get SADF
    :return: (pd.Series) SADF statistics, indexed by ``molecule``
    """
    stats = [
        _get_sadf_at_t(
            X.loc[:label].values,
            y.loc[:label].values.reshape(-1, 1),  # column vector for the regression
            min_length,
            model,
            phi,
        )
        for label in molecule
    ]
    return pd.Series(stats, index=molecule, dtype='float64')
def get_sadf(series: pd.Series, model: str, lags: Union[int, list], min_length: int, add_const: bool = False,
             phi: float = 0, num_threads: int = 8, verbose: bool = True) -> pd.Series:
    """
    Advances in Financial Machine Learning, p. 258-259.
    Supremum ADF (SADF) statistics for a series.

    At each end point t the ADF regression is fitted on samples that all end
    at t while the start point moves forward, leaving at least ``min_length``
    observations; SADF(t) is the largest statistic over those samples.

    For the sub- or super-martingale ('sm_*') models, the variance of beta of a
    weak long-run bubble may be smaller than that of a strong short-run bubble,
    which biases the method towards long-run bubbles. The coefficient phi in
    [0, 1] penalizes large samples to correct for this:
    ADF_penalized = ADF / (sample_length ^ phi)

    :param series: (pd.Series) Series for which SADF statistics are generated
    :param model: (str) Either 'linear', 'quadratic', 'sm_poly_1', 'sm_poly_2', 'sm_exp', 'sm_power'
    :param lags: (int or list) Either number of lags to use or array of specified lags
    :param min_length: (int) Minimum number of observations needed for estimation
    :param add_const: (bool) Flag to add constant
    :param phi: (float) Coefficient to penalize large sample lengths when computing SMT, in [0, 1]
    :param num_threads: (int) Number of cores to use (not read by this function body)
    :param verbose: (bool) Flag to report progress on asynch jobs (not read by this function body)
    :return: (pd.Series) SADF statistics
    """
    features, target = _get_y_x(series, model, lags, add_const)
    # the first ``min_length`` rows cannot serve as end points
    end_points = target.index[min_length:target.shape[0]]
    return _sadf_outer_loop(features, target, min_length, model, phi, end_points)


# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")
AddReference("QuantConnect.Indicators")
from QuantConnect import *
from QuantConnect.Indicators import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import *
class EmaCrossAlphaModel(AlphaModel):
    '''Alpha model that emits price insights when a fast and a slow EMA cross'''

    def __init__(self,
                 fastPeriod = 12,
                 slowPeriod = 26,
                 resolution = Resolution.Daily):
        '''Initializes a new instance of the EmaCrossAlphaModel class
        Args:
            fastPeriod: The fast EMA period
            slowPeriod: The slow EMA period
            resolution: The resolution the EMAs are created with'''
        self.fastPeriod = fastPeriod
        self.slowPeriod = slowPeriod
        self.resolution = resolution
        # insights last for fastPeriod bars of the chosen resolution
        barSpan = Extensions.ToTimeSpan(resolution)
        self.predictionInterval = Time.Multiply(barSpan, fastPeriod)
        self.symbolDataBySymbol = {}

        resolutionString = Extensions.GetEnumString(resolution, Resolution)
        self.Name = '{}({},{},{})'.format(self.__class__.__name__, fastPeriod, slowPeriod, resolutionString)

    def Update(self, algorithm, data):
        '''Produces insights from the current state of every tracked symbol's EMAs.
        Args:
            algorithm: The algorithm instance
            data: The new data available
        Returns:
            The new insights generated'''
        insights = []
        for symbol, symbolData in self.symbolDataBySymbol.items():
            fast, slow = symbolData.Fast, symbolData.Slow

            if fast.IsReady and slow.IsReady:
                if symbolData.FastIsOverSlow:
                    # fast was above on the previous pass and is now below
                    if slow > fast:
                        insights.append(Insight.Price(symbolData.Symbol, self.predictionInterval, InsightDirection.Down))
                elif symbolData.SlowIsOverFast:
                    # fast was not above on the previous pass and is now above
                    if fast > slow:
                        insights.append(Insight.Price(symbolData.Symbol, self.predictionInterval, InsightDirection.Up))

            # remember the ordering for the next pass
            symbolData.FastIsOverSlow = fast > slow

        return insights

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Creates EMAs for newly added securities and resets them for re-added ones
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm'''
        for added in changes.AddedSecurities:
            known = self.symbolDataBySymbol.get(added.Symbol)
            if known is not None:
                # already tracked: the security was re-added, start its indicators afresh
                known.Fast.Reset()
                known.Slow.Reset()
                continue

            # first time this symbol is seen: create fast/slow EMAs
            fresh = SymbolData(added)
            fresh.Fast = algorithm.EMA(added.Symbol, self.fastPeriod, self.resolution)
            fresh.Slow = algorithm.EMA(added.Symbol, self.slowPeriod, self.resolution)
            self.symbolDataBySymbol[added.Symbol] = fresh
class SymbolData:
    '''Per-symbol state kept by the alpha model: the security, its two
    indicators and the last observed fast/slow ordering'''

    def __init__(self, security):
        '''Stores the security and its symbol; the indicators start unset
        Args:
            security: The security this record belongs to'''
        self.Security = security
        self.Symbol = security.Symbol
        # indicators are attached by the owner after construction
        self.Fast = self.Slow = None

        # Whether the fast indicator was above the slow one at the last check;
        # kept so that the same signal is not emitted repeatedly
        self.FastIsOverSlow = False

    @property
    def SlowIsOverFast(self):
        '''Logical negation of FastIsOverSlow'''
        if self.FastIsOverSlow:
            return False
        return True