| Overall Statistics |
|
Total Trades 12 Average Win 9.04% Average Loss -6.97% Compounding Annual Return 4.092% Drawdown 7.100% Expectancy 0.531 Net Profit 22.234% Sharpe Ratio 0.898 Probabilistic Sharpe Ratio 37.747% Loss Rate 33% Win Rate 67% Profit-Loss Ratio 1.30 Alpha 0.031 Beta 0.029 Annual Standard Deviation 0.038 Annual Variance 0.001 Information Ratio -0.47 Tracking Error 0.174 Treynor Ratio 1.198 Total Fees $125.19 |
# source for computation: https://arxiv.org/pdf/1411.5062.pdf
### IMPORTANT: PLEASE NOTE THAT WE USE THETA FOR THE MEAN AND MU FOR THE DRIFT,
### WHILE OTHER SOURCES, E.G. WIKIPEDIA, USE MU FOR THE MEAN AND THETA FOR THE DRIFT
import math
from math import sqrt, exp, log # exp(n) == e^n, log(n) == ln(n)
import scipy.optimize as so
import numpy as np
def __compute_log_likelihood(params, *args):
    '''
    Compute the negated average log likelihood of an Ornstein-Uhlenbeck fit;
    this is the objective function minimized by scipy.
    Found in (2.2) in the linked paper.

    input: params - (theta, mu, sigma), the values scipy adjusts while
                    minimizing the value we return
           *args  - (X, dt), values we don't change but still need:
                    X is the observed series, dt the time increment
    returns: the NEGATED average log likelihood for the given parameters
    raises: ValueError if X holds fewer than two observations
    '''
    theta, mu, sigma = params
    X, dt = args

    values = np.asarray(X, dtype=float)
    # (2.2) averages over the transitions x[i-1] -> x[i]; a series of
    # len(X) observations contains len(X) - 1 of them
    n = len(values) - 1
    if n < 1:
        raise ValueError('need at least two observations to fit an OU process')

    decay = exp(-mu * dt)
    sigma_tilde_squared = sigma ** 2 * (1 - exp(-2 * mu * dt)) / (2 * mu)

    # one-step prediction errors for every transition at once
    residuals = values[1:] - values[:-1] * decay - theta * (1 - decay)
    summation_term = -float(np.sum(residuals ** 2)) / (2 * n * sigma_tilde_squared)

    log_likelihood = (-log(2 * math.pi) / 2) + (-log(sqrt(sigma_tilde_squared))) + summation_term

    # since we want to maximize the log likelihood, we minimize its negation
    # (scipy doesn't support maximize)
    return -log_likelihood
def estimate_coefficients_MLE(X, dt, tol=1e-4):
    '''
    Estimates Ornstein-Uhlenbeck coefficients (θ, µ, σ) of the given array
    using the Maximum Likelihood Estimation method

    input: X - array-like time series data to be fit as an OU process
           dt - time increment (1 / days(start date - end date))
           tol - tolerance for termination, forwarded to scipy's minimize
                 (smaller tolerance means higher precision)
    returns: θ, µ, σ, Average Log Likelihood
    '''
    # theta ∈ ℝ, mu > 0, sigma > 0
    # the lower bounds are 1e-5 rather than 0 b/c scipy bounds are inclusive,
    # and mu = 0 or sigma = 0 causes a division by 0 in the likelihood
    bounds = ((None, None), (1e-5, None), (1e-5, None))

    # initial guesses for theta, mu, sigma: start theta at the sample mean
    initial_guess = (np.mean(X), 100, 100)

    result = so.minimize(
        __compute_log_likelihood,
        initial_guess,
        args=(X, dt),
        bounds=bounds,
        tol=tol,
    )

    # .x gets the optimized parameters, .fun gets the optimized value
    theta, mu, sigma = result.x
    max_log_likelihood = -result.fun  # undo negation from __compute_log_likelihood

    return theta, mu, sigma, max_log_likelihood


# this import was fused onto the return statement above in the original line
import ou_mle as ou
import numpy as np
import pandas as pd
from OptimalStopping import OptimalStopping
from datetime import datetime
from collections import deque
class Model:
    '''
    How to use Model:
    1. Model.Update() in OnData (including during Warmup)
    2. if Model.Ready2Train -> Model.Train()
    2.1. Retrain periodically
    3. Buy Portfolio if Model.IsEnter()
    4. If bought, sell if Model.IsExit()
    '''

    def __init__(self):
        self.optimal_stopping = None
        self.alloc_B = -1

        self.time = deque(maxlen=252)  # RW's aren't supported for datetimes
        self.close_A = deque(maxlen=252)
        self.close_B = deque(maxlen=252)

        # represents portfolio value of holding
        # $1 of stock A and -$alloc_B of stock B
        self.portfolio = None

    def Update(self, time, close_A, close_B):
        '''
        Adds a new point of data to our model, which will be used in the future for training/retraining
        '''
        if self.portfolio is not None:
            self.portfolio.Update(close_A, close_B)

        self.time.append(time)
        self.close_A.append(close_A)
        self.close_B.append(close_B)

    @property
    def Ready2Train(self):
        '''
        returns true iff our model has enough data to train
        (i.e. the rolling window of closes is full)
        '''
        return len(self.close_A) == self.close_A.maxlen

    @property
    def IsReady(self):
        '''
        returns true iff our model is ready to provide signals
        '''
        return self.optimal_stopping is not None

    def Train(self, r=.05, c=.05):
        '''
        Computes our OU and B-Allocation coefficients

        input: r - discount rate forwarded to OptimalStopping
               c - cost of trading forwarded to OptimalStopping

        Does nothing when the window is not full yet, or when the window
        spans less than one day (dt = 1 / days would be undefined).
        '''
        if not self.Ready2Train:
            return

        days = (self.time[-1] - self.time[0]).days
        if days <= 0:
            return

        ts_A = np.array(self.close_A)
        ts_B = np.array(self.close_B)
        dt = 1.0 / days

        theta, mu, sigma, self.alloc_B = self.__argmax_B_alloc(ts_A, ts_B, dt)

        try:
            if self.optimal_stopping is None:
                self.optimal_stopping = OptimalStopping(theta, mu, sigma, r, c)
            else:
                self.optimal_stopping.UpdateFields(theta, mu, sigma, r, c)
        except Exception:
            # sometimes we get weird OU Coefficients that lead to unsolveable Optimal Stopping
            self.optimal_stopping = None

        # NOTE(review): the OU fit above uses values normalised by the FIRST
        # price of the window, while this Portfolio is normalised by the
        # LATEST price - confirm this difference is intended
        self.portfolio = Portfolio(ts_A[-1], ts_B[-1], self.alloc_B)

    def AllocationB(self):
        '''
        Returns the current $ allocation to stock B (-1 until trained)
        '''
        return self.alloc_B

    def IsEnter(self):
        '''
        Return True if it is optimal to enter the Pairs Trade, False otherwise
        (including when the model has not been trained successfully yet)
        '''
        if not self.IsReady or self.portfolio is None:
            return False
        return self.portfolio.Value() <= self.optimal_stopping.Entry()

    def IsExit(self):
        '''
        Return True if it is optimal to exit the Pairs Trade, False otherwise
        (including when the model has not been trained successfully yet)
        '''
        if not self.IsReady or self.portfolio is None:
            return False
        return self.portfolio.Value() >= self.optimal_stopping.Exit()

    def __compute_portfolio_values(self, ts_A, ts_B, alloc_B):
        '''
        Compute the portfolio values over time when holding $1 of stock A
        and -$alloc_B of stock B

        input: ts_A - time-series of price data of stock A,
               ts_B - time-series of price data of stock B
        outputs: Portfolio values of holding $1 of stock A and -$alloc_B of stock B
        '''
        # dividing builds new arrays, so the callers' series are left untouched
        normalised_A = ts_A / ts_A[0]
        normalised_B = ts_B / ts_B[0]
        return normalised_A - alloc_B * normalised_B

    def __argmax_B_alloc(self, ts_A, ts_B, dt):
        '''
        Finds the $ allocation ratio to stock B to maximize the log likelihood
        from the fit of portfolio values to an OU process

        input: ts_A - time-series of price data of stock A,
               ts_B - time-series of price data of stock B
               dt - time increment (1 / days(start date - end date))
        returns: θ*, µ*, σ*, B*
        '''
        candidates = np.linspace(.01, 1, 100)

        # one OU fit per candidate allocation: (theta, mu, sigma, log likelihood)
        fits = [
            ou.estimate_coefficients_MLE(
                self.__compute_portfolio_values(ts_A, ts_B, alloc), dt)
            for alloc in candidates
        ]

        best = int(np.argmax([fit[3] for fit in fits]))
        theta, mu, sigma, _ = fits[best]
        return theta, mu, sigma, candidates[best]

    def get_coefficients(self):
        '''
        Returns the OU coefficients of our model (None if the model is not ready)
        '''
        if not self.IsReady:
            return None
        return self.optimal_stopping.theta, self.optimal_stopping.mu, self.optimal_stopping.sigma

    def __repr__(self):
        '''
        String representation of the OU coefficients of our model
        '''
        if not self.IsReady:
            return 'Not ready'
        theta, mu, sigma = self.get_coefficients()
        return f'θ: {theta:.2} μ: {mu:.2} σ: {sigma:.2}'
class Portfolio:
    '''
    Tracks the value of holding $1 of stock A and -$alloc_B of stock B,
    measured relative to the prices supplied at construction.
    '''

    def __init__(self, price_A, price_B, alloc_B):
        '''
        input: price_A, price_B - reference prices; also the starting
                                  current prices
               alloc_B - $ amount of stock B held short per $1 of stock A
        '''
        self.alloc_B = alloc_B
        self.init_price_A = self.curr_price_A = price_A
        self.init_price_B = self.curr_price_B = price_B

    def Update(self, new_price_A, new_price_B):
        '''
        Record the latest prices of both stocks
        '''
        self.curr_price_A, self.curr_price_B = new_price_A, new_price_B

    def Value(self):
        '''
        returns: current portfolio value, each leg scaled by its reference price
        '''
        long_leg = self.curr_price_A / self.init_price_A
        short_leg = self.alloc_B * self.curr_price_B / self.init_price_B
        return long_leg - short_leg


# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2020 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from Model import Model
class ModulatedMultidimensionalAtmosphericScrubbers(QCAlgorithm):
    '''
    Pairs-trading algorithm on GLD (stock A) and SLV (stock B) that takes its
    entry and exit signals from Model.
    '''

    def Initialize(self):
        '''
        Sets the backtest window, cash and benchmark, subscribes to both
        equities, and registers the periodic model training.
        '''
        self.SetStartDate(2015, 8, 15)
        self.SetEndDate(2020, 8, 15)
        self.SetCash(100000)
        self.SetBenchmark('SPY')

        equity_A = self.AddEquity('GLD', Resolution.Daily)
        self.A = equity_A.Symbol
        equity_B = self.AddEquity('SLV', Resolution.Daily)
        self.B = equity_B.Symbol

        self.SetWarmup(252)
        self.model = Model()

        # TrainModel is registered for every month start; it decides itself
        # whether a retrain is actually due
        self.Train(self.DateRules.MonthStart('GLD'), self.TimeRules.Midnight, self.TrainModel)

    def OnData(self, data):
        '''
        Feeds the latest closes to the model (also during warm-up) and then
        enters or liquidates the pair according to the model's signals.
        '''
        close_A = data[self.A].Close
        close_B = data[self.B].Close
        self.model.Update(self.Time, close_A, close_B)

        # no trading while warming up or before the model can give signals
        if self.IsWarmingUp or not self.model.IsReady:
            return

        if self.Portfolio.Invested:
            # holding the portfolio: liquidate once the model says to sell
            if self.model.IsExit():
                self.Liquidate()
        elif self.model.IsEnter():
            # not holding the portfolio and the model says to buy it
            self.SetHoldings(self.A, 1)
            self.SetHoldings(self.B, -self.model.AllocationB())

    def TrainModel(self):
        '''
        Retrains the model in months 1, 4, 7 and 10 once enough data has been
        collected; liquidates if training leaves the model without signals.
        '''
        # skip until the model has enough data, and in months where
        # month % 3 != 1 (i.e. retrain quarterly)
        if not self.model.Ready2Train or self.Time.month % 3 != 1:
            return

        self.model.Train()

        if self.model.IsReady:
            self.Log(self.model)
        else:
            self.Liquidate()


# source for computation: https://arxiv.org/pdf/1411.5062.pdf
from math import sqrt, exp
import scipy.integrate as si
import scipy.optimize as so
import numpy as np
class OptimalStopping:
    '''
    Optimal Stopping Provides Functions for computing the Optimal Entry and Exit for our Pairs Portfolio
    Functions V and J are the functions used to calculate the Exit and Entry values, respectively
    '''

    def __init__(self, theta, mu, sigma, r, c):
        '''
        theta, mu, sigma - Ornstein-Uhlenbeck Coefficients
            (note we use self.theta for mean and self.mu for drift,
            while some sources use self.mu for mean and self.theta for drift)
        r - investor's subjective discount rate
        c - cost of trading

        Any exception raised while solving for the thresholds propagates
        to the caller.
        '''
        self.theta = theta
        self.mu = mu
        self.sigma = sigma
        self.r = r
        self.c = c
        self._recompute_thresholds()

    def _recompute_thresholds(self):
        # solves for the exit level b* and entry level d* from the current fields
        # order matters: d() evaluates V(), which reads b_star and F_of_b
        self.b_star = self.b()
        self.F_of_b = self.F(self.b_star)
        self.d_star = self.d()

    def UpdateFields(self, theta=None, mu=None, sigma=None, r=None, c=None):
        '''
        Update our OU Coefficients (arguments left as None keep their value)
        and recompute the entry/exit thresholds.

        If the thresholds cannot be solved for the new values, every field is
        restored to its previous value and the exception is re-raised, so the
        coefficients and thresholds never disagree.
        '''
        previous_state = dict(self.__dict__)

        if theta is not None:
            self.theta = theta
        if mu is not None:
            self.mu = mu
        if sigma is not None:
            self.sigma = sigma
        if r is not None:
            self.r = r
        if c is not None:
            self.c = c

        try:
            self._recompute_thresholds()
        except Exception:
            self.__dict__.clear()
            self.__dict__.update(previous_state)
            raise

    def Entry(self):
        '''
        Optimal value to enter/buy the portfolio
        '''
        return self.d_star

    def Exit(self):
        '''
        Optimal value to exit/liquidate the portfolio
        '''
        return self.b_star

    def V(self, x):
        # equation 4.2, solution of equation posed by 2.3
        if x < self.b_star:
            return (self.b_star - self.c) * self.F(x) / self.F_of_b
        else:
            return x - self.c

    def _integral(self, offset):
        # shared integral behind F and G; offset is the signed distance
        # between x and theta in the direction the caller needs
        # the power and slope do not depend on u, so compute them once
        power = self.r / self.mu - 1
        slope = sqrt(2 * self.mu / self.sigma ** 2) * offset

        def integrand(u):
            return u ** power * exp(slope * u - u ** 2 / 2)

        return si.quad(integrand, 0, np.inf)[0]

    def F(self, x):
        # equation 3.3
        return self._integral(x - self.theta)

    def G(self, x):
        # equation 3.4
        return self._integral(self.theta - x)

    def b(self):
        # estimates b* using equation 4.3
        def func(b):
            return self.F(b) - (b - self.c) * self.Prime(self.F, b)

        # finds the root of function between the interval [0, 1]
        # (brentq raises ValueError when func has the same sign at both ends)
        return so.brentq(func, 0, 1)

    def d(self):
        # estimates d* using equation 4.11
        def func(d):
            return (self.G(d) * (self.Prime(self.V, d) - 1)) - (self.Prime(self.G, d) * (self.V(d) - d - self.c))

        # finds the root of function between the interval [0, 1]
        return so.brentq(func, 0, 1)

    def Prime(self, f, x, h=1e-4):
        # given f, estimates f'(x) using the (forward) difference quotient formula
        # WARNING: LOWER h VALUES CAN LEAD TO WEIRD RESULTS
        return (f(x + h) - f(x)) / h