| Overall Statistics |
|
Total Trades 1337 Average Win 0.00% Average Loss 0.00% Compounding Annual Return 0.009% Drawdown 0.100% Expectancy 0.040 Net Profit 0.047% Sharpe Ratio 0.163 Probabilistic Sharpe Ratio 1.802% Loss Rate 55% Win Rate 45% Profit-Loss Ratio 1.30 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio -0.707 Tracking Error 0.17 Treynor Ratio -0.883 Total Fees $9.78 |
import SVMWavelet as svmw
import numpy as np
class OptimizedUncoupledRegulators(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2015, 9, 20) # Set Start Date
self.SetCash(1000000) # Set Strategy Cash
period = 152
self.SetWarmup(period)
self.SetBrokerageModel(AlphaStreamsBrokerageModel())
self.SetPortfolioConstruction(InsightWeightingPortfolioConstructionModel(lambda time: None))
self.SetAlpha(SVMWaveletAlphaModel(period))
# 1549 data points with FXCM, 1560 data points with Oanda
self.AddForex('EURJPY', Resolution.Daily, Market.Oanda)
class SVMWaveletAlphaModel(AlphaModel):
def __init__(self, period):
self.period = period
self.closes = {}
def Update(self, algorithm, data):
for symbol, closes in self.closes.items():
if data.ContainsKey(symbol) and data[symbol] is not None:
closes.Add(data[symbol].Close)
if algorithm.IsWarmingUp:
return []
insights = []
for symbol, closes in self.closes.items():
if not closes.IsReady:
continue
recent_close = closes[0]
forecasted_value = svmw.forecast(np.array(list(closes))[::-1])
# if the sums of the weights > 1, IWPCM normalizes the sum to 1, which
# means we don't need to worry about normalizing them
weight = (forecasted_value / recent_close) - 1
insights.append(self.InsightHelper(symbol, weight))
return insights
def InsightHelper(self, symbol, percentage):
if abs(percentage) < 0.001:
return Insight.Price(symbol, timedelta(1), InsightDirection.Flat)
elif percentage > 0:
return Insight.Price(symbol, timedelta(1), InsightDirection.Up, None, None, None, percentage)
else:
return Insight.Price(symbol, timedelta(1), InsightDirection.Down, None, None, None, abs(percentage))
def OnSecuritiesChanged(self, algorithm, changed):
for security in changed.AddedSecurities:
self.closes[security.Symbol] = RollingWindow[float](self.period)
for security in changed.RemovedSecurities:
self.closes.pop(security.Symbol)import pywt
import numpy as np
from sklearn.svm import SVR
from sklearn.model_selection import GridSearchCV
def forecast(data):
'''
Decomposes 1-D array "data" into multiple components using Discrete Wavelet Transform,
denoises each component using thresholding,
use Support Vector Regression (SVR) to forecast each component,
recombine components for aggregate forecast
returns: the value of the aggregate forecast 1 time-step into the future
'''
w = pywt.Wavelet('sym10') # Daubechies/Symlets are good choices for denoising
threshold = 0.5
# Decompose into wavelet components
coeffs = pywt.wavedec(data, w)
# if we want at least 3 levels, solve for:
# log2(len(data) / wave_length - 1) >= 3
# in this case, since we wave_length(sym10) == 20, after solving we get len(data) >= 152,
# hence why our RollingWindow is of length 152 in main.py
for i in range(1, len(coeffs)):
coeffs[i] = pywt.threshold(coeffs[i], threshold*max(coeffs[i]))
forecasted = __svm_forecast(coeffs[i])
coeffs[i] = np.roll(coeffs[i], -1)
coeffs[i][-1] = forecasted
datarec = pywt.waverec(coeffs, w)
return datarec[-1]
def __svm_forecast(data, sample_size=10):
'''
Paritions "data" and fits an SVM model to this data, then forecasts the
value one time-step into the future
'''
X, y = __partition_array(data, size=sample_size)
param_grid = {'C': [.05, .1, .5, 1, 5, 10], 'epsilon': [0.001, 0.005, 0.01, 0.05, 0.1]}
gsc = GridSearchCV(SVR(), param_grid, scoring='neg_mean_squared_error')
model = gsc.fit(X, y).best_estimator_
return model.predict(data[np.newaxis, -sample_size:])[0]
def __partition_array(arr, size=None, splits=None):
'''
partitions 1-D array "arr" in a Rolling fashion if "size" is specified,
else, divides the into "splits" pieces
returns: list of paritioned arrays, list of the values 1 step ahead of each partitioned array
'''
arrs = []
values = []
if not (bool(size is None) ^ bool(splits is None)):
raise ValueError('Size XOR Splits should not be None')
if size:
arrs = [arr[i:i + size] for i in range(len(arr) - size)]
values = [arr[i] for i in range(size, len(arr))]
elif splits:
size = len(arr) // splits
if len(arr) % size == 0:
arrs = [arr[i:i + size] for i in range(size - 1, len(arr) - 1, size)]
values = [arr[i] for i in range(2 * size - 1, len(arr), size)]
else:
arrs = [arr[i:i + size] for i in range(len(arr) % size - 1, len(arr) - 1, size)]
values = [arr[value].iloc[i] for i in range(len(arr) % size + size - 1, len(arr), size)]
return np.array(arrs), np.array(values)