| Overall Statistics | Value |
|---|---|
| Total Orders | 101 |
| Average Win | 10.37% |
| Average Loss | -2.58% |
| Compounding Annual Return | 5.799% |
| Drawdown | 26.700% |
| Expectancy | 1.205 |
| Start Equity | 100000 |
| End Equity | 387101.14 |
| Net Profit | 287.101% |
| Sharpe Ratio | 0.241 |
| Sortino Ratio | 0.206 |
| Probabilistic Sharpe Ratio | 0.062% |
| Loss Rate | 56% |
| Win Rate | 44% |
| Profit-Loss Ratio | 4.01 |
| Alpha | 0.009 |
| Beta | 0.379 |
| Annual Standard Deviation | 0.099 |
| Annual Variance | 0.01 |
| Information Ratio | -0.129 |
| Tracking Error | 0.126 |
| Treynor Ratio | 0.063 |
| Total Fees | $670.68 |
| Estimated Strategy Capacity | $1200000000.00 |
| Lowest Capacity Asset | SPY R735QTJ8XC9X |
| Portfolio Turnover | 1.15% |
# region imports
from AlgorithmImports import *
import itertools
# endregion
class WalkForwardOptimizationGridSearchAlgorithm(QCAlgorithm):
    """Trade an EMA crossover on SPY whose periods are re-optimized monthly.

    At the start of every month a grid search scores each
    (short_ema, long_ema) pair on the trailing 365 calendar days of daily
    prices. The best-scoring pair replaces the indicators that `on_data`
    uses to decide between being fully long and being flat.
    """

    def initialize(self):
        """Configure the backtest, the search space and the optimization schedule."""
        self.set_start_date(2000, 1, 1)
        self.set_end_date(2024, 1, 1)
        self.set_cash(100_000)
        self.settings.automatic_indicator_warm_up = True
        self._security = self.add_equity("SPY", Resolution.DAILY)
        self._symbol = self._security.symbol
        # The indicators are created by the first optimization session.
        self._short_ema = None
        self._long_ema = None
        # Set the optimization search space.
        self._parameter_sets = self._generate_parameter_sets(
            {
                'short_ema': (10, 50, 10),  # min, max, step
                'long_ema': (60, 200, 10),
            }
        )
        # Define the optimization objective function.
        objective = self._cumulative_return
        # Schedule periodic optimization sessions.
        self.train(
            self.date_rules.month_start(self._symbol),
            self.time_rules.midnight,
            lambda: self._do_wfo(self._optimization_func, max, objective)
        )
        # Set a warm-up period so we hit one of the optimization sessions
        # before we start trading.
        self.set_warm_up(timedelta(45))

    def _generate_parameter_sets(self, search_space):
        """Expand a search space into every combination of parameter values.

        Args:
            search_space: Mapping of parameter name to a (min, max, step)
                tuple. The range is built with
                `np.arange(min, max + step, step)`, so `max` is included
                when it lies on the grid.

        Returns:
            A list of dicts, one per combination, mapping each parameter
            name to its value.
        """
        # `.tolist()` yields native Python numbers instead of NumPy scalars,
        # which keeps the logged dicts readable and avoids handing
        # `np.int64` values to non-NumPy APIs.
        ranges = {
            parameter_name: np.arange(min_, max_ + step_size, step_size).tolist()
            for parameter_name, (min_, max_, step_size) in search_space.items()
        }
        return [
            dict(zip(ranges, combination))
            for combination in itertools.product(*ranges.values())
        ]

    def _do_wfo(self, optimization_func, min_max, objective):
        """Run one grid-search session and apply the best parameter set.

        Args:
            optimization_func: Callable `(prices, parameter_set, objective)`
                that returns the score of one parameter set.
            min_max: Callable that picks the best value from a list of
                scores (for example the built-in `max` or `min`).
            objective: Callable forwarded to `optimization_func`.
        """
        # Get the historical data we need to calculate the scores.
        history = self.history(self._symbol, timedelta(365), Resolution.DAILY)
        if history.empty:
            # `.loc[self._symbol]` would raise KeyError on an empty frame.
            self.log(f"{self.time}; No history available; optimization skipped")
            return
        prices = history.loc[self._symbol]
        # Calculate the score of each parameter set.
        scores = [
            optimization_func(prices, parameter_set, objective)
            for parameter_set in self._parameter_sets
        ]
        # Record the grid search results.
        for parameter_set, score in zip(self._parameter_sets, scores):
            self.log(
                f"{self.time}; Parameters: {parameter_set}; Score: {score}"
            )
        # Ignore parameter sets that could not be scored (NaN/inf); comparing
        # against NaN would make the selected set depend on list order.
        valid_scores = [score for score in scores if np.isfinite(score)]
        if not valid_scores:
            self.log(f"{self.time}; No valid scores; optimization skipped")
            return
        # Find the parameter set that optimizes the objective function.
        optimal_parameters = self._parameter_sets[
            scores.index(min_max(valid_scores))
        ]
        for name, value in optimal_parameters.items():
            self.plot('Parameters', name, value)
        # Adjust the algorithm's logic.
        self._update_algorithm_logic(optimal_parameters)

    def _optimization_func(self, data, parameter_set, objective):
        """Score one parameter set with a vectorized EMA-crossover backtest.

        Args:
            data: DataFrame with at least 'open' and 'close' columns.
            parameter_set: Dict with the 'short_ema' and 'long_ema' periods.
            objective: Callable that maps the strategy's daily returns
                (a Series) to a score.

        Returns:
            Whatever `objective` returns for the simulated daily returns.
        """
        short_period = int(parameter_set['short_ema'])
        long_period = int(parameter_set['long_ema'])
        close = data['close']
        # `span=N` gives the smoothing factor 2 / (N + 1) of a conventional
        # N-period EMA. The first positional argument of `ewm` is `com`,
        # which would give 1 / (N + 1) and so score a much slower average
        # than the N-period indicator created in `_update_algorithm_logic`.
        short_ema = close.ewm(span=short_period, min_periods=short_period).mean()
        long_ema = close.ewm(span=long_period, min_periods=long_period).mean()
        # +1 when the short EMA is above the long EMA, -1 when below. Exact
        # ties carry the previous exposure forward. `np.nan` (rather than
        # `pd.NA`) keeps the Series in a float dtype.
        # NOTE(review): this scores a long/short strategy, while `on_data`
        # only goes long or flat -- confirm that the mismatch is intended.
        exposure = (
            np.sign((short_ema - long_ema).dropna())
            .replace(0, np.nan)
            .ffill()
            .shift(1)
        )
        # ^ shift(1) because we enter the position on the next day.
        asset_daily_returns = data['open'].pct_change().shift(-1)
        # ^ shift(-1) because we want each entry to be the return from
        # the current day to the next day.
        strategy_daily_returns = (exposure * asset_daily_returns).dropna()
        return objective(strategy_daily_returns)

    def _cumulative_return(self, daily_returns):
        """Return the compounded return of a Series of simple daily returns.

        Returns NaN when `daily_returns` is empty so that the caller can
        recognize the parameter set as unscored.
        """
        if daily_returns.empty:
            return np.nan
        # `.iloc[-1]` is an explicit positional lookup; `[-1]` on a
        # date-indexed Series is treated as a label lookup by newer pandas.
        return (daily_returns + 1).cumprod().iloc[-1] - 1

    def _update_algorithm_logic(self, optimal_parameters):
        """Replace the trading indicators with ones built from new periods.

        Args:
            optimal_parameters: Dict with the 'short_ema' and 'long_ema'
                periods to trade with from now on.
        """
        # Remove the old indicators.
        if self._short_ema is not None:
            self.deregister_indicator(self._short_ema)
        if self._long_ema is not None:
            self.deregister_indicator(self._long_ema)
        # Create the new indicators. The period is a whole number of bars,
        # so pass it as a native int.
        self._short_ema = self.ema(
            self._symbol, int(optimal_parameters['short_ema']), Resolution.DAILY
        )
        self._long_ema = self.ema(
            self._symbol, int(optimal_parameters['long_ema']), Resolution.DAILY
        )

    def on_data(self, data):
        """Go fully long while the short EMA is above the long EMA, else flat."""
        if self.is_warming_up:
            return
        # The indicators only exist after the first optimization session.
        if self._short_ema is None or self._long_ema is None:
            return
        if not (self._short_ema.is_ready and self._long_ema.is_ready):
            return
        short_value = self._short_ema.current.value
        long_value = self._long_ema.current.value
        holdings = self._security.holdings
        # Case 1: Short EMA is above long EMA
        if short_value > long_value and not holdings.is_long:
            self.set_holdings(self._symbol, 1)
        # Case 2: Short EMA is below long EMA. The strategy never shorts, so
        # only a long position needs to be closed.
        elif short_value < long_value and holdings.is_long:
            self.set_holdings(self._symbol, 0)