| Overall Statistics |
|
Total Orders 859 Average Win 0.07% Average Loss -0.06% Compounding Annual Return 2.442% Drawdown 4.100% Expectancy 0.096 Start Equity 1000000.00 End Equity 1024535.86 Net Profit 2.454% Sharpe Ratio -0.909 Sortino Ratio -1.205 Probabilistic Sharpe Ratio 27.621% Loss Rate 52% Win Rate 48% Profit-Loss Ratio 1.30 Alpha -0.035 Beta -0.018 Annual Standard Deviation 0.038 Annual Variance 0.001 Information Ratio 0.071 Tracking Error 0.072 Treynor Ratio 1.874 Total Fees $0.00 Estimated Strategy Capacity $42000000.00 Lowest Capacity Asset GBPUSD 8G Portfolio Turnover 31.02% |
#region imports
from AlgorithmImports import *
import pywt
from sklearn.svm import SVR
from sklearn.model_selection import GridSearchCV
#endregion
class SVMWavelet:
    '''
    One-step-ahead forecaster for a 1-D series: wavelet-decompose the series,
    forecast every coefficient array with an SVR model, then reconstruct.
    '''

    def forecast(self, data):
        '''
        Decomposes 1-D array "data" into multiple components using Discrete Wavelet Transform,
        denoises each detail component using thresholding,
        uses Support Vector Regression (SVR) to forecast each component,
        then recombines the components for the aggregate forecast

        returns: the value of the aggregate forecast 1 time-step into the future
        '''
        w = pywt.Wavelet('sym10')  # Daubechies/Symlets are good choices for denoising
        threshold = 0.5

        # Decompose into wavelet components
        coeffs = pywt.wavedec(data, w)

        # if we want at least 3 levels (components), solve for:
        #   log2(len(data) / wave_length - 1) >= 3
        # in this case, since wave_length(sym10) == 20, after solving we get len(data) >= 152,
        # hence why our RollingWindow is of length 152 in main.py
        for i, component in enumerate(coeffs):
            if i > 0:
                # we don't want to threshold the approximation coefficients (index 0)
                # NOTE(review): the cutoff uses the signed maximum of the component,
                # not its largest magnitude -- confirm this is intended.
                component = pywt.threshold(component, threshold * max(component))
            forecasted = self._svm_forecast(component)
            # Shift the component one step to the left and put the forecast in
            # the slot that the roll wrapped around to the end.
            shifted = np.roll(component, -1)
            shifted[-1] = forecasted
            coeffs[i] = shifted

        datarec = pywt.waverec(coeffs, w)
        return datarec[-1]

    def _svm_forecast(self, data, sample_size=10):
        '''
        Partitions "data" into rolling windows of "sample_size" values, fits an SVR
        model (C and epsilon chosen by grid search on negative mean squared error)
        that maps each window to the value following it, then forecasts the
        value one time-step into the future from the last "sample_size" values

        returns: the forecasted value
        '''
        X, y = self._partition_array(data, size=sample_size)

        param_grid = {'C': [.05, .1, .5, 1, 5, 10], 'epsilon': [0.001, 0.005, 0.01, 0.05, 0.1]}
        gsc = GridSearchCV(SVR(), param_grid, scoring='neg_mean_squared_error')

        model = gsc.fit(X, y).best_estimator_

        # data[np.newaxis, -sample_size:] is the most recent window as a single row
        return model.predict(data[np.newaxis, -sample_size:])[0]

    def _partition_array(self, arr, size=None, splits=None):
        '''
        Partitions 1-D array "arr" in a rolling fashion if "size" is specified,
        else divides it into consecutive chunks of len(arr) // "splits" values

        Exactly one of "size" and "splits" must be given.

        returns: array of partitioned arrays, array of the values 1 step ahead of each partitioned array
        raises: ValueError if both or neither of "size" and "splits" are None
        '''
        arrs = []
        values = []

        if not (bool(size is None) ^ bool(splits is None)):
            raise ValueError('Size XOR Splits should not be None')

        if size:
            # Rolling windows: arr[i:i + size] is paired with the value right after it
            arrs = [arr[i:i + size] for i in range(len(arr) - size)]
            values = [arr[i] for i in range(size, len(arr))]
        elif splits:
            size = len(arr) // splits
            remainder = len(arr) % size
            # The last element must be a target value, so the chunks are aligned to
            # end one element before the end of "arr"; whatever does not fit is
            # skipped at the front.
            start = (remainder if remainder else size) - 1
            arrs = [arr[i:i + size] for i in range(start, len(arr) - 1, size)]
            values = [arr[i] for i in range(start + size, len(arr), size)]

        return np.array(arrs), np.array(values)
#region imports
from AlgorithmImports import *
from SVMWavelet import SVMWavelet
#endregion
class SVMWaveletAlphaModel(AlphaModel):
    '''
    Alpha model that turns SVMWavelet one-step price forecasts into weighted
    up/down price insights, evaluated at most once per day-of-month value.
    '''

    def __init__(self, period):
        '''
        period: number of closing prices kept per symbol for forecasting
        '''
        self._period = period
        self._wavelet = SVMWavelet()
        self._symbol_data = {}  # symbol -> SymbolData
        self._day = -1  # day-of-month of the last evaluation

    def update(self, algorithm, data):
        '''
        Forecasts the next price of every tracked symbol whose market is open and
        emits a one-day insight when the forecast moves more than 0.5% away from
        the latest price.

        returns: list of new insights (empty if already evaluated today)
        '''
        today = algorithm.time.day
        if today == self._day:
            return []
        self._day = today

        insights = []
        for symbol, symbol_data in self._symbol_data.items():
            if not algorithm.is_market_open(symbol):
                continue

            prices = symbol_data.prices()
            forecasted_value = self._wavelet.forecast(prices)

            # if the sums of the weights > 1, IWPCM normalizes the sum to 1, which
            # means we don't need to worry about normalizing them
            weight = (forecasted_value / prices[-1]) - 1

            direction = None
            if weight > 0.005:
                direction = InsightDirection.UP
            elif weight < -0.005:
                direction = InsightDirection.DOWN

            if direction is not None:
                insights.append(Insight.price(symbol, timedelta(1), direction, weight=abs(weight)))

            # NOTE(review): the original indentation of this call was lost; it is
            # assumed to run once for every processed symbol -- confirm.
            algorithm.insights.cancel([symbol])

        return insights

    def on_securities_changed(self, algorithm, changed):
        '''
        Creates a SymbolData for every added security and disposes the one
        belonging to every removed security.
        '''
        for added in changed.added_securities:
            symbol = added.symbol
            self._symbol_data[symbol] = SymbolData(algorithm, symbol, self._period)

        for removed in changed.removed_securities:
            stale = self._symbol_data.pop(removed.symbol, None)
            if stale:
                stale.dispose()
class SymbolData:
    '''
    Per-symbol state: a rolling window of consolidated closing prices that is
    warmed up from history and then fed by a registered consolidator.
    '''

    def __init__(self, algorithm, symbol, period):
        '''
        algorithm: the algorithm instance used for subscriptions and history
        symbol: the symbol to track
        period: capacity of the rolling window / number of history bars requested
        '''
        self._algorithm = algorithm
        self._symbol = symbol
        self._close = RollingWindow[float](period)

        # Consolidate quote bars over timedelta(1) and record each close
        consolidator = QuoteBarConsolidator(timedelta(1))
        consolidator.data_consolidated += self._on_consolidated
        self._consolidator = consolidator
        algorithm.subscription_manager.add_consolidator(symbol, consolidator)

        # Warm up: push daily history bars through the same consolidator
        for history_bar in algorithm.history[QuoteBar](symbol, period, Resolution.DAILY):
            self._consolidator.update(history_bar)

    def _on_consolidated(self, sender, bar):
        '''Consolidator callback: store the close of the consolidated bar.'''
        self._close.add(bar.close)

    def dispose(self):
        '''Clear the stored prices and unregister the consolidator.'''
        self._close.reset()
        self._algorithm.subscription_manager.remove_consolidator(self._symbol, self._consolidator)

    def prices(self):
        '''
        returns: the stored closes as a numpy array in the reverse of the
        window's iteration order
        '''
        window_order = np.array(list(self._close))
        return window_order[::-1]
#region imports
from AlgorithmImports import *
from alpha import SVMWaveletAlphaModel
from portfolio import LeveragedWeightingPortfolioConstructionModel
#endregion
class OptimizedUncoupledRegulators(QCAlgorithm):
    '''
    Forex algorithm wiring the SVM-wavelet alpha model to a leveraged
    weighting portfolio construction model on four OANDA pairs.
    '''

    def initialize(self):
        '''
        Configures the backtest window, cash, brokerage, benchmark, universe,
        alpha model and portfolio construction model.
        '''
        self.set_start_date(2023, 3, 1)
        self.set_end_date(2024, 3, 1)
        self.set_cash(1000000)

        # Tunable parameters with their defaults
        period = self.get_parameter("period", 152)
        leverage = self.get_parameter("leverage", 20)

        self.set_brokerage_model(BrokerageName.OANDA_BROKERAGE, AccountType.MARGIN)
        self.set_benchmark(SecurityType.FOREX, "EURUSD")

        settings = self.universe_settings
        settings.leverage = leverage
        settings.resolution = Resolution.MINUTE

        tickers = ["EURJPY", "GBPUSD", "AUDCAD", "NZDCHF"]
        forex_symbols = []
        for ticker in tickers:
            forex_symbols.append(Symbol.create(ticker, SecurityType.FOREX, Market.OANDA))
        self.set_universe_selection(ManualUniverseSelectionModel(forex_symbols))

        self.set_alpha(SVMWaveletAlphaModel(period))

        # The rebalance function always returns None
        # NOTE(review): presumably this disables scheduled rebalancing -- confirm.
        portfolio_model = LeveragedWeightingPortfolioConstructionModel(lambda dt: None, leverage)
        self.set_portfolio_construction(portfolio_model)
#region imports
from AlgorithmImports import *
#endregion
class LeveragedWeightingPortfolioConstructionModel(EqualWeightingPortfolioConstructionModel):
    '''
    Portfolio construction model whose target percent for each insight is
    direction * insight weight * leverage.
    '''

    def __init__(self, rebalance = Resolution.DAILY, leverage = 20):
        '''
        rebalance: rebalancing argument forwarded unchanged to the base class
        leverage: multiplier applied to every insight weight
        '''
        super().__init__(rebalance)
        self.leverage = leverage

    def should_create_target_for_insight(self, insight):
        '''returns: True only for insights that carry a weight'''
        has_weight = insight.weight is not None
        return has_weight

    def determine_target_percent(self, activeInsights):
        '''
        returns: dict mapping each active insight to its signed, leveraged weight;
        insights that fail respect_portfolio_bias are treated as FLAT
        '''
        targets = {}
        for active in activeInsights:
            if self.respect_portfolio_bias(active):
                direction = active.direction
            else:
                direction = InsightDirection.FLAT
            targets[active] = direction * active.weight * self.leverage
        return targets