| Overall Statistics |
|
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio -19.14 Tracking Error 0.129 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset |
# MIT License # Copyright (c) [2022] [https://github.com/AdamWLabs/tsfracdiff] # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE.
# region imports
from AlgorithmImports import *
# endregion
from tsfracdiff import FractionalDifferentiator
class MuscularOrangeCaribou(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2010, 1, 1)
self.SetEndDate(2010, 1, 5)
self.SetCash(1000)
self.AddEquity("SPY", Resolution.Minute)
self.ModelStuff()
def OnData(self, data: Slice):
pass
def ModelStuff(self):
df = self.History(self.Securities.Keys, timedelta(days=3), Resolution.Minute)
# Prices
close = df.unstack(level=0)['close']
# Fractionally differentiate
fracDiff = FractionalDifferentiator()
close_stationary = fracDiff.FitTransform( close )
# Invert the transform if needed
close = fracDiff.InverseTransform( close_stationary )
# See the estimated orders
fracDiff.orders
# Model Stuff
returnfrom unit_root_tests import *
import pandas as pd
import numpy as np
class FractionalDifferentiator:
def __init__(self, maxOrderBound=1, significance=0.01, precision=0.01,
unitRootTest='PP', unitRootTestConfig={}):
"""
Provides estimation of the minimum fractional order of differentiation required for stationarity
and data transformations.
The available stationarity/unit root tests are:
-----------------------------------------------
- 'PP' : Phillips and Perron (1988) [default]
- 'ADF' : Augmented Dickey-Fuller (Said & Dickey, 1984)
Parameters:
-----------
maxOrderBound (float) Maximum real-valued order to search in (0, maxOrderBound)
significance (float) Statistical significance level
precision (float) Precision of estimated order
unitRootTest (str) Unit-root/stationarity tests: ['PP','ADF']
unitRootTestConfig (dict) Optional keyword arguments to pass to unit root tests
Attributes:
-----------
orders (list) Estimated minimum orders of differentiation
Example:
--------
```
# A pandas.DataFrame/np.array with potentially non-stationary time series
df
# Automatic stationary transformation with minimal information loss
from tsfracdiff import FractionalDifferentiator
fracDiff = FractionalDifferentiator()
df = fracDiff.fit_transform(df)
```
"""
self.maxOrderBound = maxOrderBound
self.significance = significance
self.precision = precision
# Critical value checks
checkCV = False
cv_sig = None
if (self.significance in [0.01, 0.05, 0.1]):
checkCV = True
cv_sig = str(int(self.significance * 100)) + '%'
# Unit-root/Stationarity tests
if unitRootTest == 'PP':
self.UnitRootTest = PhillipsPerron(significance=significance, checkCV=checkCV, cv_sig=cv_sig)
elif unitRootTest == 'ADF':
self.UnitRootTest = ADFuller(significance=significance, checkCV=checkCV, cv_sig=cv_sig)
else:
raise Exception('Please specify a valid unit root test.')
self.UnitRootTest.config.update( unitRootTestConfig )
# States
self.isFitted = False
self.orders = []
self.lagData = None
def Fit(self, df, parallel=True):
"""
Estimates the minimum orders of differencing required for stationarity.
Parameters:
-----------
df (pandas.DataFrame/np.array) Raw data
parallel (bool) Use multithreading if true (default). Requires `joblib`.
"""
df = pd.DataFrame(df).sort_index()
# Estimate minimum order of differencing
if parallel:
try:
import multiprocessing
from joblib import Parallel, delayed
from functools import partial
except ImportError:
raise Exception('The module `joblib` is required for parallelization.')
def ApplyParallel(df, func, **kwargs):
n_jobs = min(df.shape[1], multiprocessing.cpu_count())
res = Parallel(n_jobs=n_jobs, prefer='threads')( delayed(partial(func, **kwargs))(x) for x in np.array_split(df, df.shape[1], axis=1) )
return res
orders = ApplyParallel(df, self._MinimumOrderSearch, upperOrder=self.maxOrderBound, first_run=True)
else:
orders = []
for j in range(df.shape[1]):
orders.append( self._MinimumOrderSearch(df.iloc[:,j], upperOrder=self.maxOrderBound, first_run=True) )
self.orders = orders
# Store lagged data for inverse-transformations
numLags = [ (len(self._GetMemoryWeights(order)) - 1) for order in self.orders ]
self.lagData = [ df.iloc[:,j].head(lag) for j,lag in enumerate(numLags) ]
self.isFitted = True
return
def FitTransform(self, df, parallel=True):
"""
Estimates the minimum orders and returns a fractionally differentiated dataframe.
Parameters
----------
df (pandas.DataFrame/np.array) Raw data
parallel (bool) Use multithreading if true (default). Requires `joblib`.
"""
if not self.isFitted:
self.Fit(df, parallel=parallel)
fracDiffed = self.Transform(df)
return fracDiffed
def Transform(self, df):
"""
Applies a fractional differentiation transformation based on estimated orders.
Parameters
----------
df (pandas.DataFrame/np.array) Raw data
"""
if not self.isFitted:
raise Exception('Fit the model first.')
df = pd.DataFrame(df).sort_index()
fracDiffed = []
for j in range(df.shape[1]):
x = self._FracDiff(df.iloc[:,j], order=self.orders[j])
fracDiffed.append( x )
fracDiffed = pd.concat(fracDiffed, axis=1)
return fracDiffed
def InverseTransform(self, fracDiffed):
"""
Inverts the fractional differentiation transformation.
Note that the full dataframe exactly as returned by `.transform()` is required,
including any `NaN` padded missing values.
Parameters
----------
fracDiffed (pandas.DataFrame/np.array) Fractionally differentiated data
"""
if not self.isFitted:
raise Exception('Fit the model first.')
fracDiffed = pd.DataFrame(fracDiffed).sort_index()
X = []
for j in range(fracDiffed.shape[1]):
memoryWeights = self._GetMemoryWeights(self.orders[j])
K = len(memoryWeights)
# Initial values
lagData = self.lagData[j]
lag_idx = lagData.index
# Differenced values
X_tilde = fracDiffed.iloc[:,j].dropna()
idx = X_tilde.index
# Iteratively invert transformation
X_vals = np.ravel(self.lagData[j])
X_tilde = np.ravel(X_tilde.values)
for t in range(len(X_tilde)):
x = -np.sum( memoryWeights[:-1] * X_vals[-(K-1):] ) + X_tilde[t]
X_vals = np.append(X_vals, x)
idx = np.concatenate( (lag_idx.values, idx.values) )
X_vals = pd.Series(X_vals, index=idx)
X.append( X_vals )
X = pd.concat([ x for x in X ], axis=1)
X.columns = fracDiffed.columns
return X
def _GetMemoryWeights(self, order, memoryThreshold=1e-4):
"""
Returns an array of memory weights for each time lag.
Parameters:
-----------
order (float) Order of fracdiff
memoryThreshold (float) Minimum magnitude of weight significance
"""
memoryWeights = [1,]
k = 1
while True:
weight = -memoryWeights[-1] * ( order - k + 1 ) / k # Iteratively generate next lag weight
if abs(weight) < memoryThreshold:
break
memoryWeights.append(weight)
k += 1
return np.array(list(reversed(memoryWeights)))
def _FracDiff(self, ts, order=1, memoryWeights=None):
"""
Differentiates a time series based on a real-valued order.
Parameters:
-----------
ts (pandas.Series) Univariate time series
order (float) Order of differentiation
memoryWeights (array) Optional pre-computed weights
"""
if memoryWeights is None:
memoryWeights = self._GetMemoryWeights(order)
K = len(memoryWeights)
fracDiffedSeries = ts.rolling(K).apply(lambda x: np.sum( x * memoryWeights ), raw=True)
fracDiffedSeries = fracDiffedSeries.iloc[(K-1):]
return fracDiffedSeries
def _MinimumOrderSearch(self, ts, lowerOrder=0, upperOrder=1, first_run=False):
"""
Binary search algorithm for estimating the minimum order of differentiation required for stationarity.
Parameters
----------
ts (pandas.Series) Univariate time series
lowerOrder (float) Lower bound on order
upperOrder (float) Upper bound on order
first_run (bool) For testing endpoints of order bounds
"""
## Convergence criteria
if abs( upperOrder - lowerOrder ) <= self.precision:
return upperOrder
## Initial run: Test endpoints
if first_run:
lowerFracDiff = self._FracDiff(ts, order=lowerOrder).dropna()
upperFracDiff = self._FracDiff(ts, order=upperOrder).dropna()
# Unit root tests
lowerStationary = self.UnitRootTest.IsStationary( lowerFracDiff )
upperStationary = self.UnitRootTest.IsStationary( upperFracDiff )
# Series is I(0)
if lowerStationary:
return lowerOrder
# Series is I(k>>1)
if not upperStationary:
print('Warning: Time series is explosive. Increase upper bounds.')
return upperOrder
## Binary Search: Test midpoint
midOrder = ( lowerOrder + upperOrder ) / 2
midFracDiff = self._FracDiff(ts, order=midOrder).dropna()
midStationary = self.UnitRootTest.IsStationary( midFracDiff )
# Series is weakly stationary in [lowerOrder, midOrder]
if midStationary:
return self._MinimumOrderSearch(ts, lowerOrder=lowerOrder, upperOrder=midOrder)
# Series is weakly stationary in [midOrder, upperOrder]
else:
return self._MinimumOrderSearch(ts, lowerOrder=midOrder, upperOrder=upperOrder)
import arch
from arch.unitroot import PhillipsPerron as PP
from arch.unitroot import ADF
## TODO: Ng and Perron (2001)?
class PhillipsPerron:
"""
Unit root testing via Phillips and Perron (1988). This test is robust to
serial correlation and heteroskedasticity.
References:
-----------
Phillips, P. C. B., & Perron, P. (1988). Testing for a unit root in time series regression.
Biometrika, 75(2), 335–346. https://doi.org/10.1093/biomet/75.2.335
"""
def __init__(self,
config={ 'trend' : 'n', 'test_type' : 'tau'},
significance=0.01,
checkCV=False,
cv_sig=None):
self.config = config
self.significance = significance
self.checkCV = checkCV
self.cv_sig = cv_sig
def IsStationary(self, ts):
"""
Performs a unit root test.
"""
testResults = PP(ts, trend=self.config['trend'], test_type=self.config['test_type'])
pval, cv, stat = testResults.pvalue, testResults.critical_values, testResults.stat
result = self.HypothesisTest(pval, cv, stat)
return result
def HypothesisTest(self, pval, cv, stat):
"""
Null Hypothesis: Time series is integrated of order I(1)
Alt Hypothesis: Time series is integrated of order I(k<1)
"""
# Reject the hypothesis
if (pval < self.significance) or ( self.checkCV and (stat < cv.get(self.cv_sig, 0)) ):
return True
# Fail to reject the hypothesis
else:
return False
class ADFuller:
"""
Unit root testing via Said and Dickey (1984). This test assumes a parametric
ARMA structure to correct for serial correlation but assumes the errors are homoskedastic.
References:
-----------
Said E. Said, & Dickey, D. A. (1984). Testing for Unit Roots in Autoregressive-Moving Average
Models of Unknown Order. Biometrika, 71(3), 599–607. https://doi.org/10.2307/2336570
"""
def __init__(self,
config={ 'trend' : 'n', 'method' : 'AIC'},
significance=0.01,
checkCV=False,
cv_sig=None):
self.config = config
self.significance = significance
self.checkCV = checkCV
self.cv_sig = cv_sig
## Compatability workaround //
# arch <= 4.17 uses capital letters but newer versions use lowercase
if (str(arch.__version__) > '4.17'):
if self.config.get('method') == 'AIC':
self.config['method'] = 'aic'
elif self.config.get('method') == 'BIC':
self.config['method'] = 'bic'
def IsStationary(self, ts):
"""
Performs a unit root test.
"""
testResults = ADF(ts, trend=self.config['trend'], method=self.config['method'])
pval, cv, stat = testResults.pvalue, testResults.critical_values, testResults.stat
result = self.HypothesisTest(pval, cv, stat)
return result
def HypothesisTest(self, pval, cv, stat):
"""
Null Hypothesis: Gamma = 0 (Unit root)
Alt Hypothesis: Gamma < 0
"""
# Reject the hypothesis
if (pval < self.significance) or ( self.checkCV and (stat < cv.get(self.cv_sig, 0)) ):
return True
# Fail to reject the hypothesis
else:
return False