# Overall Statistics
# ------------------
# Total Orders:                 1499
# Average Win:                  0.74%
# Average Loss:                 -0.40%
# Compounding Annual Return:    -0.219%
# Drawdown:                     34.300%
# Expectancy:                   0.002
# Start Equity:                 1000000
# End Equity:                   989084.98
# Net Profit:                   -1.092%
# Sharpe Ratio:                 -0.097
# Sortino Ratio:                -0.097
# Probabilistic Sharpe Ratio:   0.784%
# Loss Rate:                    65%
# Win Rate:                     35%
# Profit-Loss Ratio:            1.84
# Alpha:                        0
# Beta:                         0
# Annual Standard Deviation:    0.195
# Annual Variance:              0.038
# Information Ratio:            0.101
# Tracking Error:               0.195
# Treynor Ratio:                0
# Total Fees:                   $13748.15
# Estimated Strategy Capacity:  $2800000000.00
# Lowest Capacity Asset:        ZN Z0WW26QHBAJP
# Portfolio Turnover:           38.89%
# Drawdown Recovery:            209
#region imports
from AlgorithmImports import *
from utils import get_position_size
from futures import categories
#endregion
class AdjustedTrendAlphaModel(AlphaModel):
    """Trend-following alpha model built on risk-adjusted EMA crossovers.

    For every continuous future the model estimates the annualized standard
    deviation of percentage returns, sizes a risk-targeted position, combines
    several capped EMA-crossover (EWMAC) forecasts, and emits at most one
    batch of insights per day on the currently mapped contracts.  The
    combined forecast and the position are stored on the mapped contract
    so that the portfolio construction model can read them.
    """

    _BUSINESS_DAYS_IN_YEAR = 256
    _FORECAST_SCALAR_BY_SPAN = {64: 1.91, 32: 2.79, 16: 4.1, 8: 5.95, 4: 8.53, 2: 12.1}  # Table 29 on page 177
    # Forecast diversification multiplier, keyed by the number of rules combined.
    _FDM_BY_RULE_COUNT = {1: 1.0, 2: 1.03, 3: 1.08, 4: 1.13, 5: 1.19, 6: 1.26}
    _SCALE_AND_CAP_MAPPING_MULTIPLIER = 1.25  # Given on page 254

    def __init__(self, algorithm, emac_filters, abs_forecast_cap, sigma_span, target_risk, blend_years):
        """Store the model parameters and derive the EMA spans.

        Args:
            algorithm: The algorithm instance; used by the consolidation
                handler to look up securities.
            emac_filters: Number of EWMAC rules.  The fast spans are
                2**1 .. 2**emac_filters and each slow span is 4x its fast span.
            abs_forecast_cap: Absolute cap applied to individual and combined
                forecasts.
            sigma_span: Span (and minimum number of periods) of the
                exponentially weighted standard deviation of returns.
            target_risk: Target risk used when sizing positions.
            blend_years: Number of years of history retained for blending the
                sigma estimate.
        """
        self._algorithm = algorithm
        # Continuous futures tracked by this model.  Kept per instance: a
        # class-level list would be shared by every model instance.
        self._futures = []
        self._emac_spans = [2**x for x in range(1, emac_filters + 1)]
        self._fast_ema_spans = self._emac_spans
        # "Any ratio between the two moving average lengths of two and six gives statistically indistinguishable results." (p.165)
        self._slow_ema_spans = [fast_span * 4 for fast_span in self._emac_spans]
        self._all_ema_spans = sorted(set(self._fast_ema_spans + self._slow_ema_spans))

        self._annualization_factor = self._BUSINESS_DAYS_IN_YEAR ** 0.5

        self._abs_forecast_cap = abs_forecast_cap
        self._sigma_span = sigma_span
        self._target_risk = target_risk
        self._blend_years = blend_years

        # Instrument Diversification Multiplier. Hardcoded in https://gitfront.io/r/user-4000052/iTvUZwEUN2Ta/AFTS-CODE/blob/chapter8.py
        self._idm = 1.5
        self._categories = categories
        # History window kept for each future: the blending period plus the longest EMA span (in days).
        self._total_lookback = timedelta(365 * self._blend_years + self._all_ema_spans[-1])

        # Day of month on which insights were last emitted.
        self._day = -1

    def update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]:
        """Create the daily insights for all futures with enough history.

        Args:
            algorithm: The running algorithm.
            data: The current data slice.

        Returns:
            A list with one price insight per tradable future, or an empty
            list when it is too early in the warm-up, insights were already
            emitted today, or the slice carries no trade bars.
        """
        # Remember which contract each continuous future currently maps to so
        # the consolidation handler can tell which bars feed the raw history.
        if data.quote_bars.count:
            for future in self._futures:
                future.latest_mapped = future.mapped

        # If warming up and still > 7 days before start date, don't do anything
        # We use a 7-day buffer so that the algorithm has active insights when warm-up ends
        if algorithm.start_date - algorithm.time > timedelta(7):
            return []

        if self._day == data.time.day or data.bars.count == 0:
            return []

        # Estimate the standard deviation of % daily returns for each future
        sigma_pct_by_future = {}
        for future in self._futures:
            sigma_pct = self._estimate_std_of_pct_returns(future.raw_history, future.adjusted_history)
            if sigma_pct is None:
                continue
            sigma_pct_by_future[future] = sigma_pct

        # Create insights.
        insights = []
        weight_by_symbol = get_position_size(
            {future.symbol: self._categories[future.symbol] for future in sigma_pct_by_future}
        )
        for symbol, instrument_weight in weight_by_symbol.items():
            future = algorithm.securities[symbol]
            current_contract = algorithm.securities[future.mapped]
            # "The price should be for the expiry date we currently hold (not the back-adjusted price)" (p.55)
            daily_risk_price_terms = sigma_pct_by_future[future] / (self._annualization_factor) * current_contract.price
            # Without a positive risk estimate (e.g. a zero price or a flat
            # return history) the divisions below would yield inf/NaN values.
            if not daily_risk_price_terms > 0:
                continue

            # Calculate target position
            position = (
                (algorithm.portfolio.total_portfolio_value * self._idm * instrument_weight * self._target_risk)
                / (future.symbol_properties.contract_multiplier * daily_risk_price_terms * (self._annualization_factor))
            )

            # Adjust target position based on forecast
            capped_forecast_by_span = {}
            for span in self._emac_spans:
                risk_adjusted_ewmac = future.ewmac_by_span[span].current.value / daily_risk_price_terms
                scaled_forecast_for_ewmac = risk_adjusted_ewmac * self._FORECAST_SCALAR_BY_SPAN[span]
                capped_forecast_by_span[span] = self._map_forecast(span, scaled_forecast_for_ewmac)

            # Calculate a weighted average of capped forecasts (p. 194)
            raw_combined_forecast = sum(capped_forecast_by_span.values()) / len(capped_forecast_by_span)
            # Apply a forecast diversification multiplier to keep the average forecast at 10 (p 193-194)
            scaled_combined_forecast = raw_combined_forecast * self._FDM_BY_RULE_COUNT[len(capped_forecast_by_span)]
            capped_combined_forecast = max(min(scaled_combined_forecast, self._abs_forecast_cap), -self._abs_forecast_cap)

            if capped_combined_forecast * position == 0:
                continue
            # Save some data for the PCM
            current_contract.forecast = capped_combined_forecast
            current_contract.position = position

            # Create the insights; each expires one second before the next market open.
            local_time = Extensions.convert_to(algorithm.time, algorithm.time_zone, future.exchange.time_zone)
            expiry = future.exchange.hours.get_next_market_open(local_time, False) - timedelta(seconds=1)
            direction = InsightDirection.UP if capped_combined_forecast * position > 0 else InsightDirection.DOWN
            insights.append(Insight.price(future.mapped, expiry, direction))

        if insights:
            self._day = data.time.day

        return insights

    def _map_forecast(self, span, scaled_forecast_for_ewmac):
        """Map the scaled forecast of one EWMAC rule to its capped forecast."""
        if span == 2:  # "Double V" forecast mapping (page 253-254)
            if scaled_forecast_for_ewmac < -20:
                return 0
            elif -20 <= scaled_forecast_for_ewmac < -10:
                return -40 - (2 * scaled_forecast_for_ewmac)
            elif -10 <= scaled_forecast_for_ewmac < 10:
                return 2 * scaled_forecast_for_ewmac
            elif 10 <= scaled_forecast_for_ewmac < 20:
                return 40 - (2 * scaled_forecast_for_ewmac)
            else:
                return 0

        if span in [4, 64]:  # "Scale and cap" forecast mapping
            if scaled_forecast_for_ewmac < -15:
                return -15 * self._SCALE_AND_CAP_MAPPING_MULTIPLIER
            elif -15 <= scaled_forecast_for_ewmac < 15:
                return scaled_forecast_for_ewmac * self._SCALE_AND_CAP_MAPPING_MULTIPLIER
            else:
                return 15 * self._SCALE_AND_CAP_MAPPING_MULTIPLIER

        # Normal forecast capping
        return max(min(scaled_forecast_for_ewmac, self._abs_forecast_cap), -self._abs_forecast_cap)

    def _estimate_std_of_pct_returns(self, raw_history, adjusted_history):
        """Estimate the annualized standard deviation of percentage returns.

        Args:
            raw_history: Series of closes of the mapped (unadjusted) contract.
            adjusted_history: Series of closes of the continuous contract.

        Returns:
            The blended annualized sigma estimate, or None when there is not
            enough history to produce an estimate.
        """
        # Align history of raw and adjusted prices
        idx = sorted(set(adjusted_history.index).intersection(set(raw_history.index)))
        adjusted_history_aligned = adjusted_history.loc[idx]
        raw_history_aligned = raw_history.loc[idx]

        # Calculate exponentially weighted standard deviation of returns:
        # adjusted price changes divided by the previous raw price.
        returns = adjusted_history_aligned.diff().dropna() / raw_history_aligned.shift(1).dropna()
        rolling_ewmstd_pct_returns = returns.ewm(span=self._sigma_span, min_periods=self._sigma_span).std().dropna()
        if rolling_ewmstd_pct_returns.empty:  # Not enough history
            return None

        # Annualize sigma estimate
        annualized_rolling_ewmstd_pct_returns = rolling_ewmstd_pct_returns * (self._annualization_factor)

        # Blend the sigma estimate (p.80): 30% average of the rolling estimate, 70% latest value.
        blended_estimate = (
            0.3 * annualized_rolling_ewmstd_pct_returns.mean()
            + 0.7 * annualized_rolling_ewmstd_pct_returns.iloc[-1]
        )
        return blended_estimate

    def _consolidation_handler(self, sender: object, consolidated_bar: TradeBar) -> None:
        """Append a consolidated daily close to the matching price history.

        Bars of a canonical (continuous) symbol update its adjusted history.
        Bars of an individual contract update the raw history of its
        continuous future, but only while that contract is the mapped one.
        Both histories are trimmed to the model's total lookback window.
        """
        security = self._algorithm.securities[consolidated_bar.symbol]
        end_date = consolidated_bar.end_time.date()
        if security.symbol.is_canonical():
            # Update adjusted history
            security.adjusted_history.loc[end_date] = consolidated_bar.close
            security.adjusted_history = security.adjusted_history[
                security.adjusted_history.index >= end_date - self._total_lookback
            ]
        else:
            # Update raw history
            continuous_contract = self._algorithm.securities[security.symbol.canonical]
            if consolidated_bar.symbol == continuous_contract.latest_mapped:
                continuous_contract.raw_history.loc[end_date] = consolidated_bar.close
                continuous_contract.raw_history = continuous_contract.raw_history[
                    continuous_contract.raw_history.index >= end_date - self._total_lookback
                ]

    def on_securities_changed(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
        """Attach or detach consolidators, histories and indicators.

        Args:
            algorithm: The running algorithm.
            changes: The securities added to and removed from the universe.
        """
        for security in changes.added_securities:
            symbol = security.symbol

            # Create a consolidator to update the history
            security.consolidator = TradeBarConsolidator(timedelta(1))
            security.consolidator.data_consolidated += self._consolidation_handler
            algorithm.subscription_manager.add_consolidator(symbol, security.consolidator)

            if security.symbol.is_canonical():
                # Add some members to track price history.  An explicit float
                # dtype avoids the version-dependent default of an empty Series.
                security.adjusted_history = pd.Series(dtype=float)
                security.raw_history = pd.Series(dtype=float)

                # Create indicators for the continuous contract
                ema_by_span = {span: algorithm.ema(symbol, span, Resolution.DAILY) for span in self._all_ema_spans}
                security.ewmac_by_span = {}
                for i, fast_span in enumerate(self._emac_spans):
                    security.ewmac_by_span[fast_span] = IndicatorExtensions.minus(
                        ema_by_span[fast_span], ema_by_span[self._slow_ema_spans[i]]
                    )

                security.automatic_indicators = ema_by_span.values()

                self._futures.append(security)

        for security in changes.removed_securities:
            # Remove consolidator + indicators.
            algorithm.subscription_manager.remove_consolidator(security.symbol, security.consolidator)
            if security.symbol.is_canonical():
                for indicator in security.automatic_indicators:
                    algorithm.deregister_indicator(indicator)
                # Stop generating insights for a future that left the universe.
                self._futures = [future for future in self._futures if future.symbol != security.symbol]
# region imports
from AlgorithmImports import *
# endregion
# Maps each continuous-future symbol traded by the strategy to its
# (category, subcategory) labels.
categories = {
    # 10-year Treasury note future on CBOT.
    Symbol.create(
        Futures.Financials.Y_10_TREASURY_NOTE, SecurityType.FUTURE, Market.CBOT
    ): ("Fixed Income", "Bonds"),
    # E-mini S&P 500 future on CME.
    Symbol.create(
        Futures.Indices.SP_500_E_MINI, SecurityType.FUTURE, Market.CME
    ): ("Equity", "US"),
}
# region imports
from AlgorithmImports import *
from universe import AdvancedFuturesUniverseSelectionModel
from alpha import AdjustedTrendAlphaModel
from portfolio import BufferedPortfolioConstructionModel
# endregion
class AdjustedTrendFuturesAlgorithm(QCAlgorithm):
    """Algorithm wiring the futures universe, the adjusted-trend alpha model
    and the buffered portfolio construction model together."""

    def initialize(self):
        """Configure the backtest, the framework models and the warm-up period."""
        # Backtest over the 5 * 365 days that precede the end date.
        backtest_length = timedelta(5*365)
        self.set_start_date(self.end_date - backtest_length)
        self.set_cash(1_000_000)
        self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN)

        self.settings.seed_initial_prices = True
        self.settings.minimum_order_margin_portfolio_percentage = 0

        # Continuous contracts: back-adjusted prices, rolled on open interest.
        self.universe_settings.data_normalization_mode = DataNormalizationMode.BACKWARDS_PANAMA_CANAL
        self.universe_settings.data_mapping_mode = DataMappingMode.OPEN_INTEREST
        self.add_universe_selection(AdvancedFuturesUniverseSelectionModel())

        # Strategy parameters (overridable through the project parameters).
        emac_filters = self.get_parameter("emac_filters", 6)
        blend_years = self.get_parameter("blend_years", 3)  # Number of years to use when blending sigma estimates
        abs_forecast_cap = self.get_parameter("abs_forecast_cap", 20)  # Hardcoded on p.173
        sigma_span = self.get_parameter("sigma_span", 32)  # Hardcoded to 32 on p.604
        target_risk = self.get_parameter("target_risk", 0.2)  # Recommend value is 0.2 on p.75
        alpha_model = AdjustedTrendAlphaModel(
            algorithm=self,
            emac_filters=emac_filters,
            abs_forecast_cap=abs_forecast_cap,
            sigma_span=sigma_span,
            target_risk=target_risk,
            blend_years=blend_years,
        )
        self.add_alpha(alpha_model)

        # Rebalancing is driven solely by _rebalance_func.
        self.settings.rebalance_portfolio_on_security_changes = False
        self.settings.rebalance_portfolio_on_insight_changes = False
        self._total_count = 0
        self._day = -1
        buffer_scaler = self.get_parameter("buffer_scaler", 0.1)  # Hardcoded on p.167 & p.173
        portfolio_model = BufferedPortfolioConstructionModel(self._rebalance_func, buffer_scaler)
        self.set_portfolio_construction(portfolio_model)

        self.add_risk_management(NullRiskManagementModel())
        self.set_execution(ImmediateExecutionModel())

        # Warm up long enough for the sigma blend and the slowest EMA, plus a 7-day buffer.
        warm_up_days = max(365*blend_years, 4*2**emac_filters) + 7
        self.set_warm_up(timedelta(warm_up_days))

    def _rebalance_func(self, time):
        """Return `time` when the portfolio should be rebalanced, else None.

        A rebalance is requested when the total insight count or the day of
        month changed since the last rebalance, the algorithm is not warming
        up, and the current slice contains quote bars.
        """
        has_new_insights = self._total_count != self.insights.total_count
        if not (has_new_insights or self._day != self.time.day):
            return None
        if self.is_warming_up:
            return None
        if not self.current_slice.quote_bars.count > 0:
            return None

        # Remember the state that triggered this rebalance.
        self._total_count = self.insights.total_count
        self._day = self.time.day
        return time
#region imports
from AlgorithmImports import *
#endregion
class BufferedPortfolioConstructionModel(EqualWeightingPortfolioConstructionModel):
    """Portfolio construction model that only trades when the current holdings
    fall outside a buffer zone around the optimal position."""

    def __init__(self, rebalance, buffer_scaler):
        """
        Args:
            rebalance: Rebalancing argument forwarded to the base model.
            buffer_scaler: Fraction of the absolute position used as the
                half-width of the buffer zone.
        """
        super().__init__(rebalance)
        self._buffer_scaler = buffer_scaler

    def create_targets(self, algorithm: QCAlgorithm, insights: List[Insight]) -> List[PortfolioTarget]:
        """Build targets that move holdings to the nearest edge of the buffer.

        Args:
            algorithm: The running algorithm.
            insights: The insights to create targets for.  Each insight's
                security must carry the `forecast` and `position` attributes.

        Returns:
            Targets for contracts whose holdings lie outside their buffer
            zone, followed by the base model's zero-quantity targets.
        """
        base_targets = super().create_targets(algorithm, insights)

        buffered_targets = []
        for insight in insights:
            contract = algorithm.securities[insight.symbol]
            # The forecast is divided by 10 (presumably because forecasts are
            # scaled so that 10 is the average -- confirm against the alpha model).
            ideal_quantity = contract.forecast * contract.position / 10

            # Create buffer zone to reduce churn
            half_width = self._buffer_scaler * abs(contract.position)
            floor_quantity = round(ideal_quantity - half_width)
            ceiling_quantity = round(ideal_quantity + half_width)

            # Move holdings to the closest buffer edge; leave them alone when
            # they are already inside the zone.
            held = contract.holdings.quantity
            if held < floor_quantity:
                buffered_targets.append(PortfolioTarget(insight.symbol, floor_quantity))
            elif held > ceiling_quantity:
                buffered_targets.append(PortfolioTarget(insight.symbol, ceiling_quantity))

        # Liquidate contracts that have an expired insight
        buffered_targets.extend(target for target in base_targets if target.quantity == 0)
        return buffered_targets
# region imports
from AlgorithmImports import *
from futures import categories
# endregion
class AdvancedFuturesUniverseSelectionModel(FutureUniverseSelectionModel):
    """Universe selection model for the futures listed in `categories`."""

    def __init__(self) -> None:
        """Register the symbol selector with a one-day refresh interval."""
        refresh_interval = timedelta(1)
        super().__init__(refresh_interval, self._select_future_chain_symbols)
        self._symbols = [symbol for symbol in categories]

    def _select_future_chain_symbols(self, utc_time: datetime) -> List[Symbol]:
        """Return the fixed list of future symbols, regardless of `utc_time`."""
        return self._symbols

    def filter(self, filter: FutureFilterUniverse) -> FutureFilterUniverse:
        """Restrict the chain via `expiration(0, 365)` (presumably days to expiry)."""
        min_expiry, max_expiry = 0, 365
        return filter.expiration(min_expiry, max_expiry)
#region imports
from AlgorithmImports import *
#endregion
def get_position_size(group):
    """Split a unit weight evenly across categories, subcategories and symbols.

    Args:
        group: Mapping of symbol -> (category, subcategory).

    Returns:
        A dict of symbol -> weight.  Every category receives an equal share,
        which is divided equally among its subcategories and then equally
        among the symbols of each subcategory.
    """
    # member_counts[category][subcategory] -> number of symbols with that label pair.
    member_counts = {}
    for category, subcategory in group.values():
        per_subcategory = member_counts.setdefault(category, {})
        per_subcategory[subcategory] = per_subcategory.get(subcategory, 0) + 1

    n_categories = len(member_counts)

    weights = {}
    for symbol, (category, subcategory) in group.items():
        per_subcategory = member_counts[category]
        # Divide step by step: category share, subcategory share, symbol share.
        weights[symbol] = 1 / n_categories / len(per_subcategory) / per_subcategory[subcategory]
    return weights