| Overall Statistics |
|
Total Orders 6597 Average Win 0.20% Average Loss -0.16% Compounding Annual Return 17.945% Drawdown 22.500% Expectancy 0.218 Start Equity 1000000 End Equity 2158879.65 Net Profit 115.888% Sharpe Ratio 0.773 Sortino Ratio 0.852 Probabilistic Sharpe Ratio 38.930% Loss Rate 45% Win Rate 55% Profit-Loss Ratio 1.22 Alpha 0.067 Beta 0.444 Annual Standard Deviation 0.138 Annual Variance 0.019 Information Ratio 0.117 Tracking Error 0.15 Treynor Ratio 0.24 Total Fees $25584.23 Estimated Strategy Capacity $23000000.00 Lowest Capacity Asset MAR R9GTMYCXKZ8L Portfolio Turnover 11.29% |
# region imports
from AlgorithmImports import *
# endregion
class EmpiricalCumulativeDensityFunction(PythonIndicator):
def __init__(self, roc_period, lookback_period):
self.name = f'ECDF_{roc_period}_{lookback_period}'
self.warm_up_period = lookback_period
self.time = datetime.min
self.value = 0
self.current = IndicatorDataPoint(self.time, self.value)
self.roc = RateOfChange(roc_period)
self.roc.name = 'ECDF_ROC'
self._returns = np.array([])
def update(self, t, price):
self.time = t
if not self.roc.update(t, price):
self.is_ready2 = False
return
roc = self.roc.current.value
if len(self._returns) < self.warm_up_period:
self._returns = np.append(self._returns, roc)
self.is_ready2 = False
return
if roc > 0:
denominator = len(self._returns[self._returns >= 0])
self.value = len(self._returns[self._returns >= roc]) / denominator if denominator else 0
else:
denominator = len(self._returns[self._returns <= 0])
self.value = len(self._returns[self._returns <= roc]) / denominator if denominator else 0
self._returns = np.append(self._returns, roc)[1:]
self.is_ready2 = True
self.current = IndicatorDataPoint(t, self.value)# region imports
from AlgorithmImports import *
from collections import deque
# endregion
class HurstExponent(PythonIndicator):
def __init__(self, period):
self.name = 'hurst'
self.warm_up_period = period
self.time = datetime.min
self.value = 0
self.current = IndicatorDataPoint(self.time, self.value)
self._queue = deque(maxlen=period)
def update(self, t, price):
self._queue.append(price)
if len(self._queue) < self.warm_up_period:
self.is_ready2 = False
return
prices = np.array(self._queue)
lags = range(2, 20)
# Calculate differences for each lag.
tau = [np.std(np.diff(prices, lag)) for lag in lags]
# Perform regression to find Hurst exponent
reg = np.polyfit(np.log(lags), np.log(tau), 1)
self.value = reg[0] / 2
self.is_ready2 = True
self.current = IndicatorDataPoint(t, self.value)# region imports
from AlgorithmImports import *
from symbol_data import SymbolData
from trade import Trade
import xgboost as xgb
from sklearn.preprocessing import StandardScaler
# endregion
# Sources:
# - https://www.quantitativo.com/p/a-mean-reversion-strategy-from-first
# - https://www.quantitativo.com/p/machine-learning-and-the-probability
# - https://www.quantitativo.com/p/long-and-short-mean-reversion-machine
class ProbabilityOfBouncingBackAlgorithm(QCAlgorithm):
def initialize(self):
self.set_start_date(2020, 1, 1)
self.set_end_date(2024, 8, 29)
self.set_cash(1_000_000)
self.universe_settings.resolution = Resolution.HOUR
self.add_universe(self._get_asset_prices)
etf = Symbol.create('QQQ', SecurityType.EQUITY, Market.USA)
self._universe = self.add_universe(self.universe.etf(etf, universe_filter_func=self._select_assets))
self._vix = self.add_data(CBOE, "VIX", Resolution.DAILY)
self._vix.sma = self.sma(self._vix.symbol, 15)
self._vix_sma_factor = self.get_parameter('vix_sma_factor', 0.15)
self._long_exposure_by_regime = [0.1, 1.1] # bear, bull
self.train(self.date_rules.year_start(), self.time_rules.midnight, self._train_model)
self.schedule.on(self.date_rules.every_day(etf), self.time_rules.after_market_open(etf, 30), self._trade)
self.set_warm_up(self.start_date - datetime(2015, 1, 1)) # Min start date for QQQ
self._symbol_data_by_symbol = {}
self._trade_by_symbol = {}
self._sorted_by_probability = []
self._scaler = StandardScaler()
self._max_universe_size = self.get_parameter('max_universe_size', 20)
self._ecdf_threshold = self.get_parameter('ecdf_threshold', 0.15)
self._probability_threshold = self.get_parameter('probability_threshold', 0.55)
self._stop_loss_factor = self.get_parameter('stop_loss_factor', 0.95)
self._max_hold_duration = timedelta(self.get_parameter('max_hold_duration', 6))
def _get_asset_prices(self, fundamentals):
# Save the current price of the assets so we can update the
# factors in _select_assets.
self._fundamental_by_symbol = {f.symbol: f for f in fundamentals}
return []
def _select_assets(self, constituents):
# Create SymbolData objects for assets that just entered the ETF.
etf_symbols = [c.symbol for c in constituents]
new_symbols = []
for symbol in etf_symbols:
if symbol not in self._symbol_data_by_symbol:
new_symbols.append(symbol)
self._symbol_data_by_symbol[symbol] = SymbolData(self._max_hold_duration.days - 1)
# Warm up the factors for assets that just entered the ETF (or we
# haven't seen yet).
history = self.history[TradeBar](new_symbols, 2*252, Resolution.DAILY)
history_length = len(list(history))
for i, bars in enumerate(history):
for symbol, bar in bars.items():
self._symbol_data_by_symbol[symbol].update(bar.end_time, bar.close, bar.close * bar.volume, i == history_length-1)
# Update the factors for the rest of the assets we're tracking.
for symbol, symbol_data in self._symbol_data_by_symbol.items():
if symbol not in new_symbols and symbol in self._fundamental_by_symbol:
f = self._fundamental_by_symbol[symbol]
self._symbol_data_by_symbol[symbol].update(self.time, f.price, f.dollar_volume, symbol in etf_symbols)
if self.is_warming_up:
return []
# Select a subset of the current ETF constituents.
probability_by_symbol = {}
for c in constituents:
# Filter 1: price >= $1
if c.symbol not in self._fundamental_by_symbol or not self._fundamental_by_symbol[c.symbol].price >= 1:
continue
symbol_data = self._symbol_data_by_symbol[c.symbol]
# Filter 2: Factor values are ready.
if not symbol_data.is_ready:
continue
# Filter 3: ROC(3) < 0 and 3-day ECDF < self._ecdf_threshold.
if not (symbol_data.ecdf.roc.current.value < 0 and symbol_data.ecdf.value < self._ecdf_threshold):
continue
# Filter 4: P(bouncing back) > self._probability_threshold
raw_factors = symbol_data.factor_history.iloc[-1].drop(['in_etf', 'ECDF_ROC']).values.reshape(1, -1)
p = self._model.predict(xgb.DMatrix(self._scaler.transform(raw_factors)))[0]
if p > self._probability_threshold:
probability_by_symbol[c.symbol] = p
self.plot('Universe', 'Size', len(probability_by_symbol))
# Return <=10 assets with the greatest P(bouncing back).
self._sorted_by_probability = [symbol for symbol, _ in sorted(probability_by_symbol.items(), key=lambda x: x[1], reverse=True)[:self._max_universe_size]]
return self._sorted_by_probability
def on_warmup_finished(self):
self._train_model()
def _train_model(self):
if self.is_warming_up:
return
# Get training samples.
factors = []
labels = []
for symbol, symbol_data in self._symbol_data_by_symbol.items():
if not symbol_data.is_ready:
continue
# Select samples that have `in_etf`, `ECDF_3_252` < self._ecdf_threshold, and ECDF_ROC < 0.
factor_history = symbol_data.factor_history[
(symbol_data.factor_history['in_etf']) &
(symbol_data.factor_history['ECDF_3_252'] < self._ecdf_threshold) &
(symbol_data.factor_history['ECDF_ROC'] < 0)
].dropna().drop(['in_etf', 'ECDF_ROC'], axis=1)
# Align this asset's factor and labels.
label_history = symbol_data.label_history
idx = sorted(list(set(label_history.index).intersection(set(factor_history.index))))
factor_history = factor_history.loc[idx]
label_history = label_history.loc[idx]
# Append this asset's factors and labels to the total set of
# factors/labels.
if not (factor_history.empty or label_history.empty):
factors.extend(factor_history.values.tolist())
labels.extend(label_history.values.tolist())
factors = np.array(factors)
labels = np.array(labels)
# Apply pre-processing to the factors.
factors = self._scaler.fit_transform(factors)
# Train the model.
self._model = xgb.train(
{'objective': 'binary:logistic'},
xgb.DMatrix(factors, label=labels),
num_boost_round=2
)
def _trade(self):
# Manage existing trades.
for symbol in list(self._trade_by_symbol.keys()):
if self._trade_by_symbol[symbol].closed(self.time):
self.liquidate(symbol)
del self._trade_by_symbol[symbol]
# Get the market regime (1 = bull; 0 = bear).
regime = int(self._vix.price <= (1+self._vix_sma_factor) * self._vix.sma.current.value)
weight = self._long_exposure_by_regime[regime] / self._max_universe_size
# Open new trades if we haven't hit the maximum yet.
for symbol in self._sorted_by_probability:
if len(self._trade_by_symbol) >= self._max_universe_size or symbol in self._trade_by_symbol:
continue
quantity = self.calculate_order_quantity(symbol, weight)
if quantity:
self._trade_by_symbol[symbol] = Trade(self, symbol, quantity, self._stop_loss_factor, self._max_hold_duration)
# region imports
from AlgorithmImports import *
from ecdf import EmpiricalCumulativeDensityFunction
from hurst import HurstExponent
# endregion
class SymbolData:
def __init__(self, hold_duration):
self._hold_duration = hold_duration
# Define features.
self.ecdf = EmpiricalCumulativeDensityFunction(3, 252)
ecdf_indicators = [EmpiricalCumulativeDensityFunction(3, 21*i) for i in [3, 6, 9]]
self.roc = RateOfChange(21)
roc_indicators = [RateOfChange(21*i) for i in [1, 3, 6, 9, 12]]
self.rsi = RelativeStrengthIndex(14)
rsi_indicators = [RelativeStrengthIndex(21*i) for i in [3, 6, 9, 12]]
self._sma = SimpleMovingAverage(200)
self._price = Identity('price')
self.sma_distance = IndicatorExtensions.over(self._sma, self._price)
self.dollar_volume = Identity('dollar_volume')
self.relative_dollar_volume = IndicatorExtensions.of(RateOfChange(21), self.dollar_volume)
self.hurst_exponent = HurstExponent(252)
self.factor_history = pd.DataFrame()
self.factors_to_update = [self.ecdf, self.roc, self.rsi, self._sma, self._price, self.hurst_exponent]
self.factors_to_update.extend(ecdf_indicators)
self.factors_to_update.extend(roc_indicators)
self.factors_to_update.extend(rsi_indicators)
self.factors_to_record = [self.ecdf, self.ecdf.roc, self.roc, self.rsi, self.sma_distance, self.hurst_exponent, self.dollar_volume, self.relative_dollar_volume]
self.factors_to_record.extend(ecdf_indicators)
self.factors_to_record.extend(roc_indicators)
self.factors_to_record.extend(rsi_indicators)
self._prices = pd.Series()
def update(self, t, price, dollar_volume, in_etf):
self.factor_history.loc[t, 'in_etf'] = in_etf
# Calculate the latest factor values give the new price.
for factor in self.factors_to_update:
factor.update(t, price)
self.dollar_volume.update(t, dollar_volume)
for factor in self.factors_to_record:
if factor.is_ready or (hasattr(factor, 'is_ready2') and factor.is_ready2):
self.factor_history.loc[t, factor.name] = factor.current.value
# Calculate the latest label values give the new price.
self._prices.loc[t] = price
self.label_history = (self._prices.shift(-self._hold_duration) > self._prices).iloc[:-self._hold_duration].astype(int)
@property
def is_ready(self):
return all([factor.is_ready or (hasattr(factor, 'is_ready2') and factor.is_ready2) for factor in self.factors_to_update])
# region imports
from AlgorithmImports import *
# endregion
class Trade:
def __init__(self, algorithm, symbol, quantity, stop_price_factor, max_hold_duration):
# Enter trade.
algorithm.market_order(symbol, quantity)
# Set the stop loss order.
self._ticket = algorithm.stop_market_order(
symbol, -quantity, (1-stop_price_factor) * algorithm.securities[symbol].price
)
# Set time-based exit.
self._exit_time = algorithm.time + max_hold_duration
def closed(self, time):
return self._ticket.status == OrderStatus.FILLED or time >= self._exit_time