Overall Statistics
Total Trades
0
Average Win
0%
Average Loss
0%
Compounding Annual Return
0%
Drawdown
0%
Expectancy
0
Net Profit
0%
Sharpe Ratio
0
Probabilistic Sharpe Ratio
0%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0
Beta
0
Annual Standard Deviation
0
Annual Variance
0
Information Ratio
-19.14
Tracking Error
0.129
Treynor Ratio
0
Total Fees
$0.00
Estimated Strategy Capacity
$0
Lowest Capacity Asset
# MIT License

# Copyright (c) [2022] [https://github.com/AdamWLabs/tsfracdiff]

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# region imports
from AlgorithmImports import *
# endregion

from tsfracdiff import FractionalDifferentiator

class MuscularOrangeCaribou(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2010, 1, 1)
        self.SetEndDate(2010, 1, 5)
        self.SetCash(1000)
        self.AddEquity("SPY", Resolution.Minute)
        self.ModelStuff()

    def OnData(self, data: Slice):
        pass

    def ModelStuff(self):

        df = self.History(self.Securities.Keys, timedelta(days=3), Resolution.Minute)

        # Prices
        close = df.unstack(level=0)['close']

        # Fractionally differentiate
        fracDiff = FractionalDifferentiator()
        close_stationary = fracDiff.FitTransform( close )

        # Invert the transform if needed
        close = fracDiff.InverseTransform( close_stationary )

        # See the estimated orders
        fracDiff.orders

        # Model Stuff
        
        
        return
from unit_root_tests import *

import pandas as pd
import numpy as np

class FractionalDifferentiator:
    
    def __init__(self, maxOrderBound=1, significance=0.01, precision=0.01, 
                       unitRootTest='PP', unitRootTestConfig={}):
        """
        Provides estimation of the minimum fractional order of differentiation required for stationarity
        and data transformations.
        
        The available stationarity/unit root tests are:
        -----------------------------------------------
            - 'PP'  : Phillips and Perron (1988) [default]
            - 'ADF' : Augmented Dickey-Fuller (Said & Dickey, 1984)

        Parameters:
        -----------
            maxOrderBound       (float) Maximum real-valued order to search in (0, maxOrderBound)
            significance        (float) Statistical significance level
            precision           (float) Precision of estimated order
            unitRootTest        (str)   Unit-root/stationarity tests: ['PP','ADF']
            unitRootTestConfig  (dict)  Optional keyword arguments to pass to unit root tests

        Attributes:
        -----------
            orders              (list)  Estimated minimum orders of differentiation

        Example:
        --------
        ```
	    # A pandas.DataFrame/np.array with potentially non-stationary time series
        df 
	
	    # Automatic stationary transformation with minimal information loss
        from tsfracdiff import FractionalDifferentiator
        fracDiff = FractionalDifferentiator()
        df = fracDiff.fit_transform(df)
        ```
        """
        self.maxOrderBound = maxOrderBound
        self.significance = significance
        self.precision = precision
        
        # Critical value checks
        checkCV = False
        cv_sig = None
        if (self.significance in [0.01, 0.05, 0.1]):
            checkCV = True
            cv_sig = str(int(self.significance * 100)) + '%'
        
        # Unit-root/Stationarity tests
        if unitRootTest == 'PP':
            self.UnitRootTest = PhillipsPerron(significance=significance, checkCV=checkCV, cv_sig=cv_sig)
        elif unitRootTest == 'ADF':
            self.UnitRootTest = ADFuller(significance=significance, checkCV=checkCV, cv_sig=cv_sig)
        else:
            raise Exception('Please specify a valid unit root test.')
        self.UnitRootTest.config.update( unitRootTestConfig )

        # States
        self.isFitted = False
        self.orders = []
        self.lagData = None
        
    def Fit(self, df, parallel=True):
        """
        Estimates the minimum orders of differencing required for stationarity.
        
        Parameters:
        -----------
            df       (pandas.DataFrame/np.array) Raw data
            parallel (bool) Use multithreading if true (default). Requires `joblib`.
        """
        df = pd.DataFrame(df).sort_index()
        
        # Estimate minimum order of differencing
        if parallel:
            try:
                import multiprocessing
                from joblib import Parallel, delayed
                from functools import partial
            except ImportError:
                raise Exception('The module `joblib` is required for parallelization.')

            def ApplyParallel(df, func, **kwargs):
                n_jobs = min(df.shape[1], multiprocessing.cpu_count())
                res = Parallel(n_jobs=n_jobs, prefer='threads')( delayed(partial(func, **kwargs))(x) for x in np.array_split(df, df.shape[1], axis=1) )
                return res
            orders = ApplyParallel(df, self._MinimumOrderSearch, upperOrder=self.maxOrderBound, first_run=True)
        else:
            orders = []
            for j in range(df.shape[1]):
                orders.append( self._MinimumOrderSearch(df.iloc[:,j], upperOrder=self.maxOrderBound, first_run=True) )
        self.orders = orders
        
        # Store lagged data for inverse-transformations
        numLags = [ (len(self._GetMemoryWeights(order)) - 1) for order in self.orders ]
        self.lagData = [ df.iloc[:,j].head(lag) for j,lag in enumerate(numLags) ]
        
        self.isFitted = True

        return
        
    def FitTransform(self, df, parallel=True):
        """
        Estimates the minimum orders and returns a fractionally differentiated dataframe.
        
        Parameters
        ----------
            df       (pandas.DataFrame/np.array) Raw data
            parallel (bool) Use multithreading if true (default). Requires `joblib`.
        """
        if not self.isFitted: 
            self.Fit(df, parallel=parallel)
        fracDiffed = self.Transform(df)
        return fracDiffed
    
    def Transform(self, df):
        """
        Applies a fractional differentiation transformation based on estimated orders.

        Parameters
        ----------
            df  (pandas.DataFrame/np.array) Raw data
        """
        if not self.isFitted: 
            raise Exception('Fit the model first.')
            
        df = pd.DataFrame(df).sort_index()
        fracDiffed = []
        for j in range(df.shape[1]):
            x = self._FracDiff(df.iloc[:,j], order=self.orders[j])
            fracDiffed.append( x )
        fracDiffed = pd.concat(fracDiffed, axis=1)
        return fracDiffed
    
    def InverseTransform(self, fracDiffed):
        """
        Inverts the fractional differentiation transformation. 
        Note that the full dataframe exactly as returned by `.transform()` is required,
        including any `NaN` padded missing values.
        
        Parameters
        ----------
            fracDiffed (pandas.DataFrame/np.array) Fractionally differentiated data
        """
        if not self.isFitted: 
            raise Exception('Fit the model first.')
        
        fracDiffed = pd.DataFrame(fracDiffed).sort_index()
        X = []
        for j in range(fracDiffed.shape[1]):
            memoryWeights = self._GetMemoryWeights(self.orders[j])
            K = len(memoryWeights)
            
            # Initial values
            lagData = self.lagData[j]
            lag_idx = lagData.index
            
            # Differenced values
            X_tilde = fracDiffed.iloc[:,j].dropna()
            idx = X_tilde.index
            
            # Iteratively invert transformation
            X_vals = np.ravel(self.lagData[j])
            X_tilde = np.ravel(X_tilde.values)
            for t in range(len(X_tilde)):
                x = -np.sum( memoryWeights[:-1] * X_vals[-(K-1):] ) + X_tilde[t]
                X_vals = np.append(X_vals, x)
            idx = np.concatenate( (lag_idx.values, idx.values) )
            X_vals = pd.Series(X_vals, index=idx)
            X.append( X_vals )
        X = pd.concat([ x for x in X ], axis=1)
        X.columns = fracDiffed.columns
        return X
    
    def _GetMemoryWeights(self, order, memoryThreshold=1e-4):
        """
        Returns an array of memory weights for each time lag.

        Parameters:
        -----------
            order           (float) Order of fracdiff
            memoryThreshold (float) Minimum magnitude of weight significance
        """
        memoryWeights = [1,]
        k = 1
        while True:
            weight = -memoryWeights[-1] * ( order - k + 1 ) / k # Iteratively generate next lag weight
            if abs(weight) < memoryThreshold:
                break
            memoryWeights.append(weight)
            k += 1
        return np.array(list(reversed(memoryWeights)))
    
    def _FracDiff(self, ts, order=1, memoryWeights=None):
        """
        Differentiates a time series based on a real-valued order.

        Parameters:
        -----------
            ts            (pandas.Series) Univariate time series
            order         (float) Order of differentiation
            memoryWeights (array) Optional pre-computed weights
        """
        if memoryWeights is None:
            memoryWeights = self._GetMemoryWeights(order)

        K = len(memoryWeights)
        fracDiffedSeries = ts.rolling(K).apply(lambda x: np.sum( x * memoryWeights ), raw=True)
        fracDiffedSeries = fracDiffedSeries.iloc[(K-1):]
        
        return fracDiffedSeries
    
    def _MinimumOrderSearch(self, ts, lowerOrder=0, upperOrder=1, first_run=False):
        """
        Binary search algorithm for estimating the minimum order of differentiation required for stationarity.
        
        Parameters
        ----------
            ts                   (pandas.Series) Univariate time series
            lowerOrder           (float) Lower bound on order
            upperOrder           (float) Upper bound on order
            first_run            (bool)  For testing endpoints of order bounds
        """  
        ## Convergence criteria
        if abs( upperOrder - lowerOrder ) <= self.precision:
            return upperOrder
        
        ## Initial run: Test endpoints
        if first_run:
            lowerFracDiff = self._FracDiff(ts, order=lowerOrder).dropna()
            upperFracDiff = self._FracDiff(ts, order=upperOrder).dropna()
            
            # Unit root tests
            lowerStationary = self.UnitRootTest.IsStationary( lowerFracDiff )
            upperStationary = self.UnitRootTest.IsStationary( upperFracDiff )

            # Series is I(0)
            if lowerStationary:
                return lowerOrder
            # Series is I(k>>1)
            if not upperStationary:                                                        
                print('Warning: Time series is explosive. Increase upper bounds.')
                return upperOrder
            
        ## Binary Search: Test midpoint
        midOrder = ( lowerOrder + upperOrder ) / 2                                      
        midFracDiff = self._FracDiff(ts, order=midOrder).dropna()
        midStationary = self.UnitRootTest.IsStationary( midFracDiff )
        
        # Series is weakly stationary in [lowerOrder, midOrder]
        if midStationary:
            return self._MinimumOrderSearch(ts, lowerOrder=lowerOrder, upperOrder=midOrder)
        # Series is weakly stationary in [midOrder, upperOrder]
        else:
            return self._MinimumOrderSearch(ts, lowerOrder=midOrder, upperOrder=upperOrder)
        
import arch
from arch.unitroot import PhillipsPerron as PP
from arch.unitroot import ADF

## TODO: Ng and Perron (2001)?

class PhillipsPerron:
    """
    Unit root testing via Phillips and Perron (1988). This test is robust to
    serial correlation and heteroskedasticity.

    References:
    -----------
    Phillips, P. C. B., & Perron, P. (1988). Testing for a unit root in time series regression. 
    Biometrika, 75(2), 335–346. https://doi.org/10.1093/biomet/75.2.335
    """
    
    def __init__(self, 
                config={ 'trend' : 'n', 'test_type' : 'tau'}, 
                significance=0.01,
                checkCV=False, 
                cv_sig=None):
        self.config = config
        self.significance = significance
        self.checkCV = checkCV
        self.cv_sig = cv_sig

    def IsStationary(self, ts):
        """
        Performs a unit root test.
        """

        testResults = PP(ts, trend=self.config['trend'], test_type=self.config['test_type'])
        pval, cv, stat = testResults.pvalue, testResults.critical_values, testResults.stat

        result = self.HypothesisTest(pval, cv, stat)

        return result

    def HypothesisTest(self, pval, cv, stat):
        """
        Null Hypothesis: Time series is integrated of order I(1)
        Alt Hypothesis: Time series is integrated of order I(k<1)
        """
        
        # Reject the hypothesis
        if (pval < self.significance) or ( self.checkCV and (stat < cv.get(self.cv_sig, 0)) ):
            return True
        # Fail to reject the hypothesis
        else:
            return False

class ADFuller:
    """
    Unit root testing via Said and Dickey (1984). This test assumes a parametric
    ARMA structure to correct for serial correlation but assumes the errors are homoskedastic.

    References:
    -----------
    Said E. Said, & Dickey, D. A. (1984). Testing for Unit Roots in Autoregressive-Moving Average 
    Models of Unknown Order. Biometrika, 71(3), 599–607. https://doi.org/10.2307/2336570
    """
    def __init__(self, 
                config={ 'trend' : 'n', 'method' : 'AIC'}, 
                significance=0.01,
                checkCV=False, 
                cv_sig=None):
        self.config = config
        self.significance = significance
        self.checkCV = checkCV
        self.cv_sig = cv_sig

        ## Compatability workaround //
        #   arch <= 4.17 uses capital letters but newer versions use lowercase
        if (str(arch.__version__) > '4.17'):
            if self.config.get('method') == 'AIC':
                self.config['method'] = 'aic'
            elif self.config.get('method') == 'BIC':
                self.config['method'] = 'bic'

    def IsStationary(self, ts):
        """
        Performs a unit root test.
        """

        testResults = ADF(ts, trend=self.config['trend'], method=self.config['method'])
        pval, cv, stat = testResults.pvalue, testResults.critical_values, testResults.stat

        result = self.HypothesisTest(pval, cv, stat)

        return result

    def HypothesisTest(self, pval, cv, stat):
        """
        Null Hypothesis: Gamma = 0 (Unit root)
        Alt Hypothesis: Gamma < 0
        """
        
        # Reject the hypothesis
        if (pval < self.significance) or ( self.checkCV and (stat < cv.get(self.cv_sig, 0)) ):
            return True
        # Fail to reject the hypothesis
        else:
            return False