| Overall Statistics | |
|---|---|
| Total Orders | 14089 |
| Average Win | 0.18% |
| Average Loss | -0.09% |
| Compounding Annual Return | -0.210% |
| Drawdown | 28.900% |
| Expectancy | 0.002 |
| Start Equity | 100000000 |
| End Equity | 97144911.4 |
| Net Profit | -2.855% |
| Sharpe Ratio | -0.244 |
| Sortino Ratio | -0.221 |
| Probabilistic Sharpe Ratio | 0.001% |
| Loss Rate | 67% |
| Win Rate | 33% |
| Profit-Loss Ratio | 2.04 |
| Alpha | -0.015 |
| Beta | -0.014 |
| Annual Standard Deviation | 0.068 |
| Annual Variance | 0.005 |
| Information Ratio | -0.645 |
| Tracking Error | 0.159 |
| Treynor Ratio | 1.188 |
| Total Fees | $12212976.10 |
| Estimated Strategy Capacity | $1600000000.00 |
| Lowest Capacity Asset | ES YLZ9Z50BJE2P |
| Portfolio Turnover | 140.65% |
# region imports
from AlgorithmImports import *
from realized_gamma import RealizedGamma
# endregion
class FuturesIntradayTrendFollowingWithRealizedGammaAlgorithm(QCAlgorithm):
    """Intraday trend-following on the E-mini S&P 500 future, gated by realized gamma.

    At every trading interval the algorithm updates a per-interval
    `RealizedGamma` indicator with the return since yesterday's close, and
    sizes a position in the currently mapped contract proportional to that
    return whenever the indicator value is negative and the price agrees
    with the VWAP. Open positions are liquidated when the price crosses the
    VWAP and are closed with a market-on-close order before the close.
    """

    def initialize(self):
        """Configure the backtest, subscribe to the future and schedule events."""
        self.set_start_date(2011, 1, 1)
        self.set_end_date(2024, 10, 1)
        self.set_cash(100_000_000)
        self.settings.minimum_order_margin_portfolio_percentage = 0

        # Strategy parameters.
        self._trading_interval_length = timedelta(minutes=60)
        # Look-back in trading days (values in paper: 5, 20, 60, 120).
        self._realized_gamma_period = 20
        # Scales the target weight up to utilize more cash.
        self._weight_scaler = 5

        # Subscribe to the E-mini continuous contract.
        future = self.add_future(
            Futures.Indices.SP_500_E_MINI,
            data_normalization_mode=DataNormalizationMode.BACKWARDS_RATIO,
            data_mapping_mode=DataMappingMode.OPEN_INTEREST,
            contract_depth_offset=0
        )
        future.set_filter(0, 180)
        # Per-security state used by the scheduled events.
        future.realized_gamma_by_time = {}
        future.yesterdays_close = None
        future.previous_interval_close = None
        self._future = future

        # Scheduled Events: record the close at midnight, rebalance every interval.
        every_day = self.date_rules.every_day(future.symbol)
        self.schedule.on(every_day, self.time_rules.midnight, self._record_close_price)
        self.schedule.on(
            every_day,
            self.time_rules.every(self._trading_interval_length),
            self._rebalance
        )
        # Liquidate everything at the market close. By default, MOC orders
        # must be placed at least 15.5 minutes before the close.
        self.schedule.on(
            every_day,
            self.time_rules.before_market_close(future.symbol, 16),
            self._close_position
        )

        # Warm up long enough to fill the indicators.
        warm_up_days = int(1.5 * self._realized_gamma_period)
        self.set_warm_up(timedelta(warm_up_days))

    def _record_close_price(self):
        """Store the current price of the future as yesterday's close."""
        self._future.yesterdays_close = self._future.price

    def _rebalance(self):
        """Update the interval indicators and, when allowed, reset the position."""
        future = self._future
        now = self.time
        step = self._trading_interval_length
        hours = future.exchange.hours

        # Nothing to do without a reference close.
        if not future.yesterdays_close:
            return
        # Wait until the market was open one interval ago.
        if not hours.is_open(now - step, False):
            return

        # Fetch (or lazily create) the indicator keyed by this time of day.
        indicators = future.realized_gamma_by_time
        trading_interval = (now.hour, now.minute)
        if trading_interval not in indicators:
            indicators[trading_interval] = RealizedGamma(
                trading_interval, self._realized_gamma_period
            )
        realized_gamma = indicators[trading_interval]

        # Feed the indicator with the return since yesterday's close.
        return_since_last_close = future.price / future.yesterdays_close - 1
        is_ready = realized_gamma.update(IndicatorDataPoint(now, return_since_last_close))
        if is_ready:
            self.plot('Realized Gamma', str(trading_interval), realized_gamma.value)

        # Give the previous interval's indicator its label: the return
        # realized over the interval that has just ended.
        if future.previous_interval_close:
            earlier = now - step
            earlier_interval = (earlier.hour, earlier.minute)
            if earlier_interval in indicators:
                interval_return = future.price / future.previous_interval_close - 1
                indicators[earlier_interval].add_label(interval_return)

        # Remember this interval's closing price for the next call.
        future.previous_interval_close = future.price

        # Only trade after warm-up, with a ready indicator, and when the
        # market is still open just before the end of the coming interval.
        if self.is_warming_up:
            return
        if not realized_gamma.ready:
            return
        if not hours.is_open(now + step - timedelta(seconds=1), False):
            return

        # Exposure only when the realized gamma is negative (trending market),
        # proportional to the return since yesterday's close, and only long
        # (short) when the price is not below (above) the VWAP.
        security = self.securities[future.mapped]
        vwap = security.vwap.current.value
        trending = int(realized_gamma.value < 0)
        if return_since_last_close < 0:
            vwap_agrees = int(security.price <= vwap)
        else:
            vwap_agrees = int(security.price >= vwap)
        weight = trending * self._weight_scaler * return_since_last_close * vwap_agrees
        self.set_holdings(future.mapped, weight)

    def _close_position(self):
        """Flatten the mapped contract with a market-on-close order, if held."""
        quantity = self.portfolio[self._future.mapped].quantity
        if not quantity:
            return
        self.market_on_close_order(self._future.mapped, -quantity)

    def on_data(self, data):
        """Liquidate the mapped contract once its price crosses the VWAP against the position."""
        if self.is_warming_up:
            return
        security = self.securities[self._future.mapped]
        vwap = security.vwap.current.value
        holdings = security.holdings
        long_below_vwap = holdings.is_long and security.price < vwap
        short_above_vwap = holdings.is_short and security.price > vwap
        if not (long_below_vwap or short_above_vwap):
            return
        self.liquidate(security.symbol, tag=f'Crossed VWAP; Price={security.price}; VWAP: {vwap}')

    def on_securities_changed(self, changes):
        """Attach a VWAP indicator to every security that is added."""
        for added in changes.added_securities:
            added.vwap = self.vwap(added.symbol)
# region imports
from AlgorithmImports import *
from sklearn.linear_model import LinearRegression
# endregion
class RealizedGamma(PythonIndicator):
    """Regression-based realized gamma indicator for one intraday interval.

    The training set pairs the return from the previous close to time t
    (feature) with the return from t to t + trading_interval (label).
    Once `period` pairs are available, `value` is the negative of the
    interval return the linear model predicts for the latest feature.

    A feature passed to `update` is only committed to the training set when
    its label arrives through `add_label`, so the two arrays always hold
    the same number of rows and each row comes from the same session.
    """

    def __init__(self, trading_interval, period, fit_intercept=True):
        """Create the indicator.

        Args:
            trading_interval: Identifier of the interval; only used in the name.
            period: Number of (feature, label) pairs kept for training.
            fit_intercept: Forwarded to `LinearRegression`.
        """
        self.name = f'RealizedGamma({trading_interval}, {period})'
        self.time = datetime.min
        self.value = 0
        # Defined up front so `ready` can be read before the first update.
        self.ready = False
        self._X = np.array([])  # Return from previous close to t.
        self._y = np.array([])  # Return from t to t+trading_interval.
        # Latest feature still waiting for its label.
        self._pending_x = None
        self._period = period
        self._model = LinearRegression(fit_intercept=fit_intercept)

    def update(self, input):
        """Process a new feature sample and refresh `value` when ready.

        Args:
            input: Data point whose `value` is the return from the previous
                close to t and whose `time` is t.

        Returns:
            True when `period` training pairs were available and `value`
            was recomputed, False otherwise.
        """
        # Check if there is sufficient training data.
        self.ready = len(self._y) == self._period
        if self.ready:
            # Fit on the committed pairs only; the current sample has no
            # label yet and must not be part of the training data.
            self._model.fit(self._X.reshape(-1, 1), self._y.reshape(-1, 1))
            # The value is the opposite (negative) of the predicted return
            # from t to t+trading_interval.
            self.value = -self._model.predict([[input.value]])[0][0]
        # Hold the sample until its label arrives. A previous sample that
        # never received a label is discarded here, which keeps the
        # features and labels aligned.
        self._pending_x = input.value
        self.time = input.time
        return self.ready

    def add_label(self, label):
        """Commit the pending feature together with its realized label.

        Args:
            label: Return from t to t+trading_interval for the most recent
                sample passed to `update`.

        A label that arrives without a pending feature is ignored, since
        it has no matching row in the training data.
        """
        if self._pending_x is None:
            return
        self._X = np.append(self._X, self._pending_x)[-self._period:]
        self._y = np.append(self._y, label)[-self._period:]
        self._pending_x = None