| Overall Statistics |
|
Total Orders 337 Average Win 2.52% Average Loss -1.21% Compounding Annual Return -17.650% Drawdown 51.700% Expectancy -0.346 Start Equity 30000 End Equity 16745.08 Net Profit -44.183% Sharpe Ratio -0.707 Sortino Ratio -0.583 Probabilistic Sharpe Ratio 0.037% Loss Rate 79% Win Rate 21% Profit-Loss Ratio 2.09 Alpha -0.149 Beta 0.282 Annual Standard Deviation 0.19 Annual Variance 0.036 Information Ratio -0.888 Tracking Error 0.213 Treynor Ratio -0.475 Total Fees $369.78 Estimated Strategy Capacity $1500000.00 Lowest Capacity Asset BHVN WK8OR2T99BOL Portfolio Turnover 5.21% |
# region imports
from AlgorithmImports import *
from enum import Enum
from collections import deque
from datetime import timedelta
# endregion
class SpyCandidatesMomentumBreakoutStrategy(QCAlgorithm):
    """Momentum breakout strategy on a weekly-refreshed US equity universe.

    The universe is rebuilt on Tuesdays: a coarse fundamental filter is
    followed by a momentum ranking (fine filter). Selection is skipped
    entirely while the "COMP" index trend filter is not positive. Entries,
    sizing and exits are delegated to the framework models installed in
    ``initialize``.
    """

    # Minimum number of fine-filter survivors required before trading at all.
    MIN_CANDIDATES = 30

    def initialize(self):
        """Configure the backtest window, data, parameters and framework models."""
        # Locally Lean installs free sample data, to download more data please visit
        # https://www.quantconnect.com/docs/v2/lean-cli/datasets/downloading-data
        self.set_start_date(2021, 1, 1)
        self.set_end_date(2025, 1, 1)
        self._initial_cash = 30000
        self._leverage = 1.0
        self.set_cash(self._initial_cash)

        # Minute bars for universe members; selection is scheduled on Tuesdays.
        self.universe_settings.resolution = Resolution.MINUTE
        self.universe_settings.schedule.on(
            self.date_rules.every(DayOfWeek.TUESDAY))
        self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE,
                                 account_type=AccountType.MARGIN)
        self.set_benchmark("SPY")
        self.universe_settings.asynchronous = False

        # Simulated periodic deposits (see make_monthly_contribution /
        # make_yearly_contribution).
        self._monthly_contribution = self.get_parameter(
            "monthly_deposit", default_value=200.0)
        self._yearly_contribution = self.get_parameter(
            "yearly_deposit", default_value=10000.0)
        # The contribution handlers skip their very first invocation; the flag
        # must exist before either handler can run.
        self._has_initialised = False

        # Universe selection
        self.add_universe_selection(
            FundamentalUniverseSelectionModel(self._fundamental_filter_function))
        self.universe_last_update_week = -1
        self.universe_last_update_month = -1

        # Strategy parameters
        self.atr_period = self.get_parameter("atr_period", default_value=14)
        self.k = self.get_parameter("k", default_value=1.0)
        self.risk_factor = self.get_parameter(
            "risk_factor", default_value=0.01)
        self.tp_multiple_factor = self.get_parameter(
            "tp_multiple_factor", default_value=3.0)
        self.sl_atr_period = self.get_parameter(
            "sl_atr_period", default_value=14)
        self.initial_sl_atr_multiple = self.get_parameter(
            "initial_sl_atr_multiple", default_value=4.0)
        self.trailing_sl_atr_multiple = self.get_parameter(
            "trailing_sl_atr_multiple", default_value=1.0)
        self.sl_sma_period = self.get_parameter(
            "sl_sma_period", default_value=20)
        self.poi_index = self.get_parameter("POI", default_value=0)
        self.n_bar_low_lookback = self.get_parameter(
            "n_bar_low_lookback", default_value=5)
        self.adx_period = self.get_parameter("adx_period", default_value=15)
        self.adx_low_threshold = self.get_parameter(
            "adx_low_threshold", default_value=20)
        self.adx_top_threshold = self.get_parameter(
            "adx_top_threshold", default_value=40)

        # VIX data (the VIX-based filter itself is currently not applied).
        self.vix = self.add_index(
            "VIX", market=Market.USA, resolution=Resolution.DAILY).symbol
        self.vix_consolidator = TradeBarConsolidator(timedelta(days=1))
        self.vix_roc_5d = self.roc(self.vix, 5, resolution=Resolution.DAILY)
        self.warm_up_indicator(self.vix, self.vix_roc_5d,
                               resolution=Resolution.DAILY)
        # NOTE(review): the parameter key is spelled "vix_buy_threshod"; it is
        # kept as-is so existing parameter files keep working.
        self.vix_buy_threshold = self.get_parameter(
            "vix_buy_threshod", default_value=25.0)
        self.risky_roc_threshold = self.get_parameter(
            "risky_roc_threshold", default_value=0.1)

        # "COMP" index trend filter: 10/20-day SMAs and the 60-day rate of
        # change of each SMA.
        self.ixic = self.add_index(
            "COMP", market=Market.USA, resolution=Resolution.DAILY).symbol
        self._ixic_10ma = self.sma(
            self.ixic, period=10, resolution=Resolution.DAILY)
        self._ixic_20ma = self.sma(
            self.ixic, period=20, resolution=Resolution.DAILY)
        self._ixic_roc_10ma = ROCOnMovingAverage(
            "ROC_on_10MA", sma_period=10, roc_period=60)
        self._ixic_roc_20ma = ROCOnMovingAverage(
            "ROC_on_20MA", sma_period=20, roc_period=60)
        self.register_indicator(
            self.ixic, self._ixic_roc_10ma, resolution=Resolution.DAILY)
        self.register_indicator(
            self.ixic, self._ixic_roc_20ma, resolution=Resolution.DAILY)
        self.warm_up_indicator(
            self.ixic, self._ixic_roc_10ma, resolution=Resolution.DAILY)
        self.warm_up_indicator(
            self.ixic, self._ixic_roc_20ma, resolution=Resolution.DAILY)

        # Momentum ranking parameters
        self.top_n = self.get_parameter("top_n", default_value=20)
        self.roc_periods = self.get_parameter("roc_periods", default_value=100)
        self.roc_level = self.get_parameter("roc_level", default_value=0.6)

        # Alpha model
        poi = POI(self.poi_index)
        self.alpha_model = MomentumAlphaModel(self,
                                              atr_period=self.atr_period,
                                              k=self.k,
                                              adx_period=self.adx_period,
                                              adx_low_threshold=self.adx_low_threshold,
                                              adx_top_threshold=self.adx_top_threshold,
                                              poi=poi)
        self.set_alpha(self.alpha_model)

        # Per-trade risk budget and profit target derived from available margin.
        full_margin = self._full_margin()
        self.debug("Full Margin: %.2f" % full_margin)
        self.risk_taking = full_margin * self.risk_factor
        self.profit_taking = self.risk_taking * self.tp_multiple_factor
        self._install_risk_models()
        # self.add_risk_management(NBarLowRiskModel(algorithm=self, lookback=self.n_bar_low_lookback))
        self.set_execution(CashExecutionModel(algorithm=self))

    def _full_margin(self) -> float:
        """Return (margin used + margin remaining) scaled by the configured leverage."""
        return (self.portfolio.total_margin_used
                + self.portfolio.margin_remaining) * self._leverage

    def _install_risk_models(self) -> None:
        """(Re)build the portfolio construction and risk models from the current budget."""
        self.set_portfolio_construction(PCM(algorithm=self,
                                            max_risk=self.risk_taking,
                                            stop_loss_atr=self.initial_sl_atr_multiple))
        composite_risk_management = CompositeRiskManagementModel(
            FixedAmountRiskManagementModel(max_loses=self.risk_taking),
            FixedAmountProfitTakingModel(max_profit=self.profit_taking),
            ATRTrailingStopRiskModel(algorithm=self,
                                     sl_sma_period=self.sl_sma_period,
                                     sl_atr_period=self.sl_atr_period,
                                     stop_atr_multiple=self.trailing_sl_atr_multiple)
        )
        self.set_risk_management(composite_risk_management)

    def _coarse_filter(self, fundamentals: list[Fundamental]) -> list[Symbol]:
        """Return symbols passing the fundamental pre-screen.

        A security is kept when all of the following hold:
        - it has fundamental data
        - its market is USA
        - market cap >= 150,000,000
        - volume > 500,000
        - price >= 10
        """
        return [
            security.symbol
            for security in fundamentals
            if security.has_fundamental_data
            and security.market == Market.USA
            and security.market_cap >= 150e6
            and security.volume > 500_000
            and security.price >= 10
        ]

    def _fine_filter(self, symbols: list[Symbol]):
        """Rank symbols by composite momentum and return the strongest ``top_n``.

        Each symbol is warmed up manually from 260 daily bars. It qualifies
        when its momentum data is ready and its ``roc_periods``-bar rate of
        change exceeds ``roc_level``. If fewer than ``MIN_CANDIDATES`` symbols
        qualify, an empty list is returned (no trading).
        """
        momentum_by_symbol = {}
        for symbol in symbols:
            if symbol in ["VIX", "COMP"]:
                continue
            history = self.history(symbol,
                                   periods=260,
                                   resolution=Resolution.DAILY)
            if len(history) < 260:
                continue
            data = MomentumData(symbol=symbol,
                                roc_days_period=self.roc_periods)
            # Manual warm-up; each row's index is read as (symbol, time).
            for bar in history.itertuples():
                data.update(TradeBar(
                    bar.Index[1], bar.Index[0], bar.open, bar.high, bar.low, bar.close, bar.volume))
            if data.is_ready and data.roc_n_days > self.roc_level:
                momentum_by_symbol[symbol] = data.composite_momentum

        # Strongest composite momentum first.
        result = sorted(momentum_by_symbol,
                        key=momentum_by_symbol.get, reverse=True)
        if len(result) < self.MIN_CANDIDATES:
            self.debug("%s::Too few stocks available: %s. Not trade. " %
                       (self.time, len(result)))
            return []
        return result[:self.top_n]

    def _fundamental_filter_function(self, fundamental: list[Fundamental]) -> list[Symbol]:
        """Universe selection callback.

        Returns ``Universe.UNCHANGED`` when a selection already completed in
        the current ISO week, an empty list while the index trend filter is
        negative, and otherwise the fine-filtered symbols.
        """
        current_week = self.time.isocalendar().week
        if self.universe_last_update_week == current_week:
            return Universe.UNCHANGED

        safe_condition = (
            (self._ixic_10ma.current.value > self._ixic_20ma.current.value) and
            (self._ixic_roc_10ma.current.value > 0.02) and
            (self._ixic_roc_20ma.current.value > 0.02)
        )
        # When the index trend filter fails, hold no universe and skip the
        # (expensive) filtering. The week marker is deliberately not updated
        # on this path, so the filter is evaluated again on the next call.
        if not safe_condition:
            return []

        coarse_filtered_symbols = self._coarse_filter(fundamental)
        self.debug("%s stocks left after coarse filter. " %
                   len(coarse_filtered_symbols))
        fine_filtered_symbols = self._fine_filter(coarse_filtered_symbols)
        self.debug("%s stocks left after fine filter. " %
                   len(fine_filtered_symbols))
        self.universe_last_update_week = current_week
        return fine_filtered_symbols

    def _apply_contribution(self, amount: float) -> None:
        """Deposit ``amount`` USD and refresh the risk budget and risk models.

        The very first invocation (shared between the monthly and yearly
        handlers) only flips ``_has_initialised`` and deposits nothing. The
        risk budget never shrinks: it is the max of the previous budget and
        the freshly computed one.
        """
        if not self._has_initialised:
            self._has_initialised = True
            return
        self.portfolio.cash_book["USD"].add_amount(amount)
        full_margin = self._full_margin()
        self.risk_taking = max(self.risk_taking,
                               full_margin * self.risk_factor)
        self.profit_taking = self.risk_taking * self.tp_multiple_factor
        self.debug("Full Margin: %.2f, New Risk Taking: %.2f" %
                   (full_margin, self.risk_taking))
        self._install_risk_models()
        self.debug("Extra Deposit of %.2f. Current equity value: %.2f" %
                   (amount, self.portfolio.total_portfolio_value))

    def make_monthly_contribution(self) -> None:
        """Simulate the monthly cash deposit."""
        self._apply_contribution(self._monthly_contribution)

    def make_yearly_contribution(self) -> None:
        """Simulate the yearly cash deposit."""
        self._apply_contribution(self._yearly_contribution)
class POI(Enum):
    """Point of initiation: the reference price a breakout threshold is measured from."""

    # Breakout is measured from the previous session's close.
    LAST_DAY_CLOSE = 0
    # Breakout is measured from the current session's open.
    CURRENT_DAY_OPEN = 1
class ROCOnMovingAverage(PythonIndicator):
    """Rate of change of a simple moving average.

    Each bar first updates an SMA; once the SMA is ready its value is fed
    into a RateOfChange indicator. ``value`` stays at 0.0 until both inner
    indicators are ready.
    """

    def __init__(self, name, sma_period: int, roc_period: int):
        """
        Args:
            name: Indicator name.
            sma_period: Period of the inner simple moving average.
            roc_period: Period of the rate of change applied to the SMA.
        """
        super().__init__()
        self.name = name
        self.sma_period = sma_period
        self.roc_period = roc_period
        # The ROC only starts receiving samples once the SMA is ready, so the
        # two warm-ups are sequential, not parallel: max(sma, roc) bars are
        # not enough. sma_period + roc_period bars cover the chain.
        self.warm_up_period = sma_period + roc_period
        self._ma = SimpleMovingAverage(period=self.sma_period)
        self._roc = RateOfChange(period=self.roc_period)
        self.value = 0.0

    def update(self, input: IndicatorDataPoint) -> bool:
        """Feed one bar into the indicator.

        Args:
            input: The bar to process; must be a TradeBar.

        Returns:
            True once both inner indicators are ready.

        Raises:
            TypeError: If ``input`` is not a TradeBar.
        """
        if not isinstance(input, TradeBar):
            raise TypeError(
                'ROCOnMovingAverage.update: input must be a TradeBar')
        self._ma.update(input)
        # Only ready SMA values are forwarded to the ROC.
        if self._ma.is_ready:
            self._roc.update(input.time, self._ma.current.value)
        if self.is_ready:
            self.value = self._roc.current.value
            self.current = IndicatorDataPoint(input.time, float(self.value))
        return self.is_ready

    @property
    def is_ready(self):
        """True when both the SMA and the ROC have enough samples.

        Readiness is taken from the inner indicators rather than from
        ``value != 0.0`` so that a genuine 0.0 rate of change is not reported
        as "not ready".
        """
        return self._ma.is_ready and self._roc.is_ready
class CompositeRSMomentum(PythonIndicator):
    """Composite momentum: 2.5 * ROC(63) + ROC(126) + ROC(189) + ROC(252)."""

    def __init__(self, name):
        """
        Args:
            name: Indicator name.
        """
        super().__init__()
        # A custom indicator needs name, time and value attributes plus an
        # update method.
        self.name = name
        self.warm_up_period = 252
        self.time = datetime.min
        self.value: float = 0.0
        # Component rates of change
        self.roc63 = RateOfChange(63)
        self.roc126 = RateOfChange(126)
        self.roc189 = RateOfChange(189)
        self.roc252 = RateOfChange(252)

    def update(self, input: IndicatorDataPoint) -> bool:
        """Feed one bar into every component ROC and refresh the composite.

        Args:
            input: The bar to process; must be a TradeBar.

        Returns:
            The current ``is_ready`` state.

        Raises:
            TypeError: If ``input`` is not a TradeBar.
        """
        if not isinstance(input, TradeBar):
            raise TypeError(
                'CompositeRSMomentum.update: input must be a TradeBar')
        self.time = input.time
        components = (self.roc63, self.roc126, self.roc189, self.roc252)
        for roc in components:
            roc.update(input)
        # The composite is only computed once every component is ready.
        if all(roc.is_ready for roc in components):
            self.value = (
                2.5 * self.roc63.current.value +
                self.roc126.current.value +
                self.roc189.current.value +
                self.roc252.current.value)
            self.current = IndicatorDataPoint(input.time, float(self.value))
        return self.is_ready

    @property
    def is_ready(self):
        """True only while the composite value is strictly positive.

        NOTE(review): this conflates "warmed up" with "positive momentum": a
        fully warmed-up indicator with a zero or negative composite reports
        not-ready. Callers in this file rely on that to drop such symbols, so
        the behaviour is kept; confirm whether it is intentional.
        """
        return self.value > 0
class MomentumData:
    """Momentum indicators for one symbol, fed manually bar by bar."""

    def __init__(self, symbol, roc_days_period=100) -> None:
        """
        Args:
            symbol: The symbol these indicators belong to.
            roc_days_period: Period of the stand-alone rate-of-change indicator.
        """
        self.symbol = symbol
        self._latest_price = 0.0
        self._composite_momentum = CompositeRSMomentum("composite_rs_momentum")
        self._roc_n_days = RateOfChange(period=roc_days_period)
        self._sma_50 = SimpleMovingAverage(period=50)

    @staticmethod
    def _current_or_nan(indicator) -> float:
        """Return the indicator's current value, or NaN while it is not ready."""
        if indicator.is_ready:
            return indicator.current.value
        return float('nan')

    def update(self, bar: TradeBar) -> None:
        """Push one bar into every indicator and remember its close."""
        self._composite_momentum.update(bar)
        self._roc_n_days.update(bar)
        self._sma_50.update(bar)
        self._latest_price = bar.close

    @property
    def composite_momentum(self) -> float:
        """Latest composite momentum value (NaN while not ready)."""
        return self._current_or_nan(self._composite_momentum)

    @property
    def roc_n_days(self) -> float:
        """Latest N-bar rate of change (NaN while not ready)."""
        return self._current_or_nan(self._roc_n_days)

    @property
    def sma_50(self) -> float:
        """Latest 50-bar simple moving average (NaN while not ready)."""
        return self._current_or_nan(self._sma_50)

    @property
    def latest_price(self) -> float:
        """Close of the most recent bar passed to ``update``."""
        return self._latest_price

    @property
    def is_ready(self):
        """True when composite momentum and ROC are ready and a positive close was seen."""
        if not self._composite_momentum.is_ready:
            return False
        if not self._roc_n_days.is_ready:
            return False
        return self._latest_price > 0.0
class SelectionData:
    """Per-symbol daily indicators plus intraday reference prices.

    Daily indicators (ATR, stop-loss SMA/ATR, EMAs, SMAs, ADX) are fed by a
    one-day consolidator. A 30-minute consolidator records the session's
    first-bar open/high/low, the latest 30-minute close, and the previous
    session's last 30-minute close.
    """

    def __init__(self,
                 algorithm: QCAlgorithm,
                 symbol: Symbol,
                 atr_period: int = 14,
                 sl_atr_period: int = 14,
                 rsi_period: int = 14,
                 sl_sma_period: int = 20,
                 k: float = 1.0,
                 adx_period: int = 15,
                 poi: POI = POI.CURRENT_DAY_OPEN) -> None:
        """
        Args:
            algorithm: Algorithm used to register indicators and consolidators.
            symbol: Symbol this data belongs to.
            atr_period: Period of the breakout ATR.
            sl_atr_period: Period of the stop-loss ATR.
            rsi_period: Stored only; no RSI indicator is currently created.
            sl_sma_period: Period of the stop-loss SMA.
            k: ATR multiple added to/subtracted from the point of initiation.
            adx_period: Period of the ADX.
            poi: Which reference price the breakout thresholds start from.
        """
        self._algo = algorithm
        self._symbol = symbol
        self._algo.debug("Symbol: %s" % symbol)

        # Parameters
        self.atr_period = atr_period
        self.rsi_period = rsi_period
        self.sl_sma_period = sl_sma_period
        self.sl_atr_period = sl_atr_period
        self.k = k
        self.poi = poi
        self.adx_period = adx_period

        # Volatility and stop-loss indicators (daily)
        self._atr = AverageTrueRange(
            name="ATR", period=self.atr_period, movingAverageType=MovingAverageType.SIMPLE)
        self._sl_sma = SimpleMovingAverage("sl_sma", period=self.sl_sma_period)
        self._sl_atr = AverageTrueRange(
            name="sl_ATR", period=self.sl_atr_period, movingAverageType=MovingAverageType.SIMPLE)
        # Moving averages (daily)
        self._ema_5 = ExponentialMovingAverage("EMA5", period=5)
        self._ema_10 = ExponentialMovingAverage("EMA10", period=10)
        self._sma_20 = SimpleMovingAverage("SMA20", period=20)
        self._sma_50 = SimpleMovingAverage("SMA50", period=50)
        self._sma_200 = SimpleMovingAverage("SMA200", period=200)
        # ADX (daily)
        self._adx = AverageDirectionalIndex(self.adx_period)

        # Intraday reference prices maintained by the 30-minute handler
        self._last_day_close = 0.0
        self._current_day_open = 0.0
        self._current_day_close = 0.0
        self._current_day_low = 0.0
        self._current_day_high = 0.0
        self._breakout_threshold = 0.0
        # Date of the session the reference prices belong to
        self._last_update_date = None
        # Kept so the 30-minute consolidator can be removed on deregistration
        self._intraday_consolidator = None

        self.register_indicators()

    @property
    def _daily_indicators(self) -> tuple:
        """All indicators driven by the daily consolidator, in registration order."""
        return (self._atr, self._sl_sma, self._sl_atr,
                self._ema_5, self._ema_10,
                self._sma_20, self._sma_50, self._sma_200,
                self._adx)

    def register_indicators(self):
        """Create the consolidators, then register and warm up every daily indicator."""
        # 30-minute bars drive the intraday reference prices.
        _30_minute_consolidator = TradeBarConsolidator(timedelta(minutes=30))
        _30_minute_consolidator.data_consolidated += self._on_30_minute_consolidated
        self._intraday_consolidator = _30_minute_consolidator
        # One shared daily consolidator feeds all daily indicators.
        _daily_consolidator = TradeBarConsolidator(timedelta(days=1))

        for indicator in self._daily_indicators:
            self._algo.register_indicator(
                self._symbol, indicator, _daily_consolidator)
        # Warm up from daily history so the indicators are usable immediately.
        for indicator in self._daily_indicators:
            self._algo.warm_up_indicator(
                self._symbol, indicator, resolution=Resolution.DAILY)

        self._algo.subscription_manager.add_consolidator(
            self._symbol, _30_minute_consolidator)
        self._algo.subscription_manager.add_consolidator(
            self._symbol, _daily_consolidator)

    def deregister_indicators(self):
        """Detach the daily indicators and the 30-minute consolidator.

        The 30-minute consolidator is not bound to any indicator, so
        deregistering the indicators cannot release it; it is removed
        explicitly, otherwise its handler would keep firing for a symbol that
        is no longer tracked.
        """
        for indicator in self._daily_indicators:
            self._algo.deregister_indicator(indicator)
        if self._intraday_consolidator is not None:
            self._algo.subscription_manager.remove_consolidator(
                self._symbol, self._intraday_consolidator)
            self._intraday_consolidator = None

    @staticmethod
    def _current_or_nan(indicator) -> float:
        """Return the indicator's current value, or NaN while it is not ready."""
        return indicator.current.value if indicator.is_ready else float('nan')

    @property
    def atr(self) -> float:
        """Most recent breakout ATR value (NaN while not ready)."""
        return self._current_or_nan(self._atr)

    @property
    def sl_sma(self) -> float:
        """Most recent stop-loss SMA value (NaN while not ready)."""
        return self._current_or_nan(self._sl_sma)

    @property
    def sl_atr(self) -> float:
        """Most recent stop-loss ATR value (NaN while not ready)."""
        return self._current_or_nan(self._sl_atr)

    @property
    def ema_5(self) -> float:
        """Most recent 5-day EMA (NaN while not ready)."""
        return self._current_or_nan(self._ema_5)

    @property
    def ema_10(self) -> float:
        """Most recent 10-day EMA (NaN while not ready)."""
        return self._current_or_nan(self._ema_10)

    @property
    def sma_20(self) -> float:
        """Most recent 20-day SMA (NaN while not ready)."""
        return self._current_or_nan(self._sma_20)

    @property
    def sma_50(self) -> float:
        """Most recent 50-day SMA (NaN while not ready)."""
        return self._current_or_nan(self._sma_50)

    @property
    def sma_200(self) -> float:
        """Most recent 200-day SMA (NaN while not ready)."""
        return self._current_or_nan(self._sma_200)

    @property
    def adx(self) -> float:
        """Most recent ADX value (NaN while not ready)."""
        return self._current_or_nan(self._adx)

    @property
    def plus_di(self) -> float:
        """Most recent +DI of the ADX (NaN while the ADX is not ready)."""
        return self._adx.positive_directional_index.current.value if self._adx.is_ready else float('nan')

    def poi_value(self, poi) -> float:
        """Return the reference price for the given point of initiation.

        Raises:
            ValueError: If ``poi`` is not a member of POI.
        """
        if poi is POI.LAST_DAY_CLOSE:
            return self._last_day_close
        elif poi is POI.CURRENT_DAY_OPEN:
            return self._current_day_open
        else:
            raise ValueError(
                "poi must be a value of the enum POI, got %s" % poi)

    @property
    def long_breakout_threshold(self) -> float:
        """Point of initiation plus ``k`` ATRs."""
        return self.poi_value(self.poi) + self.k * self.atr

    @property
    def short_breakout_threshold(self) -> float:
        """Point of initiation minus ``k`` ATRs."""
        return self.poi_value(self.poi) - self.k * self.atr

    def _on_30_minute_consolidated(self, sender, bar: TradeBar) -> None:
        """Handle a consolidated 30-minute bar.

        On the first bar of a new date, the previous 30-minute close becomes
        the last-day close and the bar's open/low/high become the session
        reference values (they are NOT extended by later bars of the same
        date). Every bar updates the latest 30-minute close.
        """
        bar_date = bar.time.date()
        if bar_date != self._last_update_date:
            self._last_day_close = self._current_day_close
            self._current_day_open = bar.open
            self._current_day_low = bar.low
            self._current_day_high = bar.high
            self._last_update_date = bar_date
        self._current_day_close = bar.close

    def update_indicators(self, time: datetime, price: float) -> None:
        """Hook for minute-level indicator updates; intentionally a no-op.

        All indicators here are updated by consolidators.
        """
        pass

    def reset_indicators(self):
        """Reset every daily indicator, including the stop-loss ATR."""
        for indicator in self._daily_indicators:
            indicator.reset()

    @property
    def is_ready(self) -> bool:
        """True when the intraday reference prices are set and the trend/volatility indicators are ready."""
        all_ready = (
            self._current_day_close > 0.0 and
            self._last_day_close > 0.0 and
            self._current_day_low > 0.0 and
            self._current_day_high > 0.0 and
            self._ema_5.is_ready and
            self._ema_10.is_ready and
            self._sma_20.is_ready and
            self._sma_50.is_ready and
            self._sma_200.is_ready and
            self._sl_sma.is_ready and
            self._atr.is_ready and
            self._adx.is_ready
        )
        return all_ready
class MomentumAlphaModel(AlphaModel):
    """Alpha model emitting one-day UP insights on trend-aligned ATR breakouts.

    No insights are produced while the algorithm's index trend filter is
    negative. Short entries are currently disabled.
    """
    name = "MomentumAlphaModel"

    def __init__(self,
                 algorithm: QCAlgorithm,
                 atr_period: int = 14,
                 k: float = 1.0,
                 adx_period: int = 15,
                 adx_low_threshold: int = 20,
                 adx_top_threshold: int = 40,
                 poi: POI = POI.LAST_DAY_CLOSE) -> None:
        """
        Args:
            algorithm: Owning algorithm; its index filter indicators are read in ``update``.
            atr_period: ATR period passed to each symbol's SelectionData.
            k: ATR multiple for the breakout threshold.
            adx_period: ADX period passed to each symbol's SelectionData.
            adx_low_threshold: ADX must be above this value to buy.
            adx_top_threshold: ADX must be below this value to buy.
            poi: Point of initiation for the breakout threshold.
        """
        self.algorithm = algorithm
        super().__init__()
        self.algorithm.debug("%s Alpha Model Initialised. " %
                             self.algorithm.time)
        # Per-instance store; a class-level dict would be shared by every
        # instance of the model.
        self.symbol_data_by_symbol: Dict[Symbol, SelectionData] = {}
        # Parameters
        self.atr_period = atr_period
        self.k = k
        self.adx_period = adx_period
        self.adx_low_threshold = adx_low_threshold
        self.adx_top_threshold = adx_top_threshold
        self.poi = poi
        # For debugging
        self._last_date = None

    def update(self, algorithm: QCAlgorithm, data: Slice) -> list[Insight]:
        """Return UP insights for symbols that satisfy the breakout conditions.

        A symbol qualifies when it is not held, its data is ready and was
        refreshed today, the latest 30-minute close is above the prior
        session close, price > EMA10 > SMA20 > SMA50, ADX lies strictly
        between the low and top thresholds, and price exceeds the long
        breakout threshold.
        """
        owner = self.algorithm
        safe_condition = (
            (owner._ixic_10ma.current.value > owner._ixic_20ma.current.value) and
            (owner._ixic_roc_10ma.current.value > 0.02) and
            (owner._ixic_roc_20ma.current.value > 0.02)
        )
        # No new entries while the index trend filter is negative.
        if not safe_condition:
            return []

        insights = []
        today = algorithm.time.date()
        for symbol, symbol_data in self.symbol_data_by_symbol.items():
            # Skip symbols whose intraday data has not been refreshed today.
            if not symbol_data.is_ready or symbol_data._last_update_date != today:
                continue
            symbol_price = algorithm.securities[symbol].price
            if symbol_price == 0:
                continue
            # TODO: Add Volatility Breakout Logic here
            buy_condition = (
                (not algorithm.portfolio[symbol].invested) and
                # latest 30-minute close above the prior session close
                (symbol_data._current_day_close > symbol_data._last_day_close) and
                (symbol_price > symbol_data.ema_10 > symbol_data.sma_20 > symbol_data.sma_50) and
                (symbol_data.adx > self.adx_low_threshold) and
                (symbol_data.adx < self.adx_top_threshold) and
                (symbol_price > symbol_data.long_breakout_threshold))
            if buy_condition:
                insights.append(Insight.price(symbol,
                                              resolution=Resolution.DAILY,
                                              bar_count=1,
                                              direction=InsightDirection.UP))
        return insights

    def on_securities_changed(self, algorithm, changes):
        """Create or reset SelectionData for added securities and release it for removed ones.

        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm
        """
        for added in changes.added_securities:
            # The filter indices are not trading candidates.
            if added.symbol in ["VIX", "COMP"]:
                continue
            symbol_data = self.symbol_data_by_symbol.get(added.symbol)
            if symbol_data is None:
                self.symbol_data_by_symbol[added.symbol] = SelectionData(
                    algorithm=self.algorithm,
                    symbol=added.symbol,
                    atr_period=self.atr_period,
                    k=self.k,
                    adx_period=self.adx_period,
                    poi=self.poi)
            else:
                # a security that was already initialized was re-added, reset the indicators
                symbol_data.reset_indicators()
            algorithm.debug("AlphaModel::Added: %s" % added.symbol)

        for removed in changes.removed_securities:
            data = self.symbol_data_by_symbol.pop(removed.symbol, None)
            if data is not None:
                # release the indicators/consolidators of the removed symbol
                data.deregister_indicators()
            algorithm.debug("AlphaModel::Removed: %s" % removed.symbol)
class PCM(PortfolioConstructionModel):
"""
A portfolio construction model that buy stocks starting from strongest momentum
until cash runs out
"""
removed_securities = []
def __init__(self,
algorithm: QCAlgorithm,
max_risk: float,
k: float = 1.0,
atr_period: int = 14,
stop_loss_atr: float = 4.0,
portfolio_bias: PortfolioBias = PortfolioBias.LONG_SHORT,
resolution: Resolution = Resolution.DAILY):
"""Initialize the model
Args:
algorithm (QCAlgorithm)
risk_factor (float): Risk to take as a fraction of portfolio
resolution (Resolution): The Bar resolution to consider
"""
# Parameters
self.algorithm = algorithm
self.max_risk = max_risk
self.stop_loss_atr = stop_loss_atr
self.atr_period = atr_period
self.k = k
self.resolution = resolution
# Track trading days
self._trading_days = {}
self.time_stop = 40
if portfolio_bias == PortfolioBias.SHORT:
raise ArgumentException(
"Long position must be allowed in RiskParityPortfolioConstructionModel.")
# Collection of data
self.symbol_data_by_symbol: Dict[Symbol, SelectionData] = {}
if not self.symbol_data_by_symbol:
# Re-initialise with existing symbols
for kvp in algorithm.securities:
symbol = kvp.key
if symbol in ["VIX", "COMP"]:
continue
# Collect history
symbol_data = SelectionData(
algorithm=self.algorithm,
symbol=symbol,
atr_period=self.atr_period,
k=self.k,
poi=POI.LAST_DAY_CLOSE)
# algorithm.debug("[RISK MODEL] %s::%s ATR=%s" % (algorithm.time, symbol, symbol_data.atr.current.value))
self.symbol_data_by_symbol[symbol] = symbol_data
# Debug
self.algorithm.debug(
"Initialise TopMomentumPortfolioConstructionModel")
def create_targets(self, algorithm: QCAlgorithm, insights: list[Insight]) -> list[PortfolioTargetCollection]:
"""
Creating targets based on the insights generated
"""
targets = []
# algorithm.debug("[CREATE TARGETS] %s Create targets" % algorithm.time)
for insight in insights:
symbol = insight.symbol
if algorithm.portfolio[symbol].invested:
continue
if insight.direction == InsightDirection.UP:
data = self.symbol_data_by_symbol[symbol]
sl_atr = data.sl_atr
entry_price = algorithm.securities[symbol].price
last_bar_low = data._current_day_low
initial_stop = last_bar_low - self.stop_loss_atr * sl_atr
# Position Sizing
risk = self.max_risk
pp = entry_price - initial_stop
if pp == 0:
# If pp is too small just ignore
continue
shares = risk / pp
# algorithm.debug("[TARGET] %s::%s Entry: %.2f, First bar Low: %.2f, Stop: %.2f. Buy %s shares. " %
# (algorithm.time, symbol, entry_price, last_bar_low, initial_stop, shares))
targets.append(PortfolioTarget(
symbol, shares, tag="Long Entry"))
if symbol not in self._trading_days:
self._trading_days[symbol] = algorithm.time
# if insight.direction == InsightDirection.DOWN:
# data = self.symbol_data_by_symbol[symbol]
# sl_atr = data.sl_atr
# entry_price = algorithm.securities[symbol].price
# last_bar_high = data._current_day_high
# initial_stop = last_bar_high + self.stop_loss_atr * sl_atr
# # Position Sizing
# risk = self.max_risk
# pp = entry_price - initial_stop
# shares = risk / pp
# algorithm.debug("[TARGET] %s::%s Entry: %.2f, Stop: %.2f. Short %s shares. " %
# (algorithm.time, symbol, entry_price, initial_stop, shares))
# targets.append(PortfolioTarget(
# symbol, shares, tag="Short Entry"))
for symbol in self.removed_securities.copy():
if algorithm.portfolio[symbol].invested:
if symbol not in self._trading_days:
# Suppose the removal signal is sent
continue
td = (algorithm.time - self._trading_days[symbol]).days
_pnl = algorithm.portfolio[symbol].unrealized_profit_percent
if td > 40 and _pnl < 0.05:
algorithm.debug(
"[CURRENTLY INVESTED REMOVING FROM PORTFOLIO]: %s::%s (Trading days: %d; PNL: %.2f)" %
(algorithm.time, symbol, td, _pnl))
targets.append(PortfolioTarget(
symbol, 0, tag="Rebalance Remove"))
self.removed_securities.remove(symbol)
# self._trading_days[symbol] = None
del self._trading_days[symbol]
return targets
def on_securities_changed(self, algorithm, changes):
    """Keep per-symbol data and the removed-symbol list in sync with the universe.

    Added securities (except "VIX" and "COMP") get a new ``SelectionData``
    built with ``self.algorithm``, or have their existing indicators reset
    when the symbol is already tracked. Removed securities are dropped from
    ``symbol_data_by_symbol``, have their indicators deregistered, and are
    recorded in ``removed_securities``.

    Args:
        algorithm: The algorithm instance that experienced the change in
            securities; used here only for debug logging.
        changes: The security additions and removals from the algorithm.
    """
    for added in changes.added_securities:
        symbol = added.symbol
        if symbol in ("VIX", "COMP"):
            continue
        symbol_data = self.symbol_data_by_symbol.get(symbol)
        if symbol_data is None:
            symbol_data = SelectionData(
                algorithm=self.algorithm,
                symbol=symbol,
                atr_period=self.atr_period,
                k=self.k,
                poi=POI.LAST_DAY_CLOSE)
            self.symbol_data_by_symbol[symbol] = symbol_data
        else:
            # A security that was already initialised was re-added:
            # reset its indicators instead of rebuilding the data object.
            symbol_data.reset_indicators()
        algorithm.debug(
            "TopMomentumPortfolioConstructionModel::Added: %s" % symbol)

    for removed in changes.removed_securities:
        symbol = removed.symbol
        data = self.symbol_data_by_symbol.pop(symbol, None)
        # Record each symbol at most once. A duplicate entry would survive
        # a single list.remove() call and leave a stale "removed" marker.
        if symbol not in self.removed_securities:
            self.removed_securities.append(symbol)
        if data is not None:
            # Clean up our consolidators.
            data.deregister_indicators()
        algorithm.debug(
            "TopMomentumPortfolioConstructionModel::Removed: %s" % symbol)
class FixedAmountRiskManagementModel(RiskManagementModel):
    """Risk model that closes a holding once its unrealized loss passes a fixed amount."""

    def __init__(self, max_loses: float = 500):
        """Store the loss limit as a non-positive threshold.

        Args:
            max_loses: Largest unrealized loss tolerated per holding. The
                sign is ignored; the value is stored as ``-abs(max_loses)``.
        """
        self.max_loses = -abs(max_loses)

    def manage_risk(self, algorithm, targets):
        """Emit liquidation targets for holdings that breached the loss limit.

        Args:
            algorithm: The algorithm instance.
            targets: The current portfolio targets; not consulted by this model.

        Returns:
            A list with one zero-quantity ``PortfolioTarget`` (tag
            "Fixed Amount SL") per invested security whose unrealized profit
            is below ``self.max_loses``.
        """
        stop_outs = []
        for entry in algorithm.securities:
            held = entry.value
            if held.invested:
                open_pnl = held.holdings.unrealized_profit
                if open_pnl < self.max_loses:
                    ticker = held.symbol
                    algorithm.debug(
                        "[STOP OUT] %s::%s - PNL: %.2f (Max Loss: %.2f)"
                        % (algorithm.time, ticker, open_pnl, self.max_loses))
                    # Drop active insights for the symbol before closing it.
                    algorithm.insights.cancel([ticker])
                    stop_outs.append(
                        PortfolioTarget(ticker, 0, tag="Fixed Amount SL"))
        return stop_outs
class FixedAmountProfitTakingModel(RiskManagementModel):
    """Risk model that closes a holding once its unrealized profit passes a fixed amount."""

    def __init__(self, max_profit: float = 1500):
        """Store the take-profit level.

        Args:
            max_profit: Unrealized profit per holding above which the
                position is closed.
        """
        self.max_profit = max_profit

    def manage_risk(self, algorithm, targets):
        """Emit liquidation targets for holdings that reached the profit level.

        Args:
            algorithm: The algorithm instance.
            targets: The current portfolio targets; not consulted by this model.

        Returns:
            A list with one zero-quantity ``PortfolioTarget`` (tag
            "Fixed Amount TP") per invested security whose unrealized profit
            is above ``self.max_profit``.
        """
        take_profits = []
        for entry in algorithm.securities:
            held = entry.value
            if held.invested:
                open_pnl = held.holdings.unrealized_profit
                if open_pnl > self.max_profit:
                    ticker = held.symbol
                    algorithm.debug(
                        "[TP] %s::%s - PNL: %.2f"
                        % (algorithm.time, ticker, open_pnl))
                    # Drop active insights for the symbol before closing it.
                    algorithm.insights.cancel([ticker])
                    take_profits.append(
                        PortfolioTarget(ticker, 0, tag="Fixed Amount TP"))
        return take_profits
class ATRTrailingStopRiskModel(RiskManagementModel):
    """Trailing stop for long holdings based on a moving average minus an ATR band.

    For every invested long position whose ``SelectionData`` is ready, the
    stop level is ``sl_sma - stop_atr_multiple * sl_atr``. When the security
    price is below that level a zero-quantity target tagged
    "Trailing Stop Loss" is emitted. Short positions are not handled.

    Args:
        algorithm: The algorithm instance; passed to ``SelectionData`` and
            used to seed data for securities that already exist.
        sl_sma_period (int): Forwarded to ``SelectionData`` as ``sl_sma_period``.
        sl_atr_period (int): Forwarded to ``SelectionData`` as ``sl_atr_period``.
        stop_atr_multiple (float): Number of ``sl_atr`` units subtracted from
            ``sl_sma`` to obtain the stop level.
    """

    def __init__(self,
                 algorithm,
                 sl_sma_period=20,
                 sl_atr_period=14,
                 stop_atr_multiple=2):
        self.algorithm = algorithm
        # Parameters
        self.sl_atr_period = sl_atr_period
        self.stop_atr_multiple = stop_atr_multiple
        self.sl_sma_period = sl_sma_period
        # Per-symbol indicator state, keyed by symbol.
        self.symbol_data_by_symbol: Dict[Symbol, SelectionData] = {}
        # Seed with the securities that exist when the model is created.
        for kvp in algorithm.securities:
            symbol = kvp.key
            if symbol in ("VIX", "COMP"):
                continue
            self.symbol_data_by_symbol[symbol] = self._create_symbol_data(symbol)

    def _create_symbol_data(self, symbol):
        """Build the SelectionData used to compute the stop level for ``symbol``."""
        return SelectionData(
            algorithm=self.algorithm,
            symbol=symbol,
            sl_atr_period=self.sl_atr_period,
            sl_sma_period=self.sl_sma_period,
            poi=POI.LAST_DAY_CLOSE)

    def manage_risk(self, algorithm: QCAlgorithm, targets: list[PortfolioTarget]) -> list[PortfolioTarget]:
        """Return liquidation targets for long holdings trading below their stop.

        Args:
            algorithm: The algorithm instance.
            targets: The current portfolio targets; not consulted by this model.

        Returns:
            Zero-quantity targets for the stopped-out symbols; an empty list
            when nothing needs to change.
        """
        updated_targets = []
        for kvp in algorithm.securities:
            symbol = kvp.key
            security = kvp.value
            holding = algorithm.portfolio[symbol]
            if not holding.invested:
                continue
            # An invested symbol may have no tracked data (e.g. one of the
            # skipped tickers); skip it instead of raising KeyError.
            symbol_data = self.symbol_data_by_symbol.get(symbol)
            if symbol_data is None:
                continue
            if not (holding.is_long and symbol_data.is_ready):
                continue
            sl = symbol_data.sl_sma - self.stop_atr_multiple * symbol_data.sl_atr
            price = security.price
            if price < sl:
                algorithm.debug(
                    "[STOP OUT] %s::%s %.2f < %.2f (Trailing SL) - PNL: %.2f"
                    % (algorithm.time, symbol, price, sl,
                       security.holdings.unrealized_profit))
                updated_targets.append(PortfolioTarget(
                    symbol, quantity=0, tag="Trailing Stop Loss"))
        return updated_targets

    def on_securities_changed(self, algorithm, changes):
        """Create or reset per-symbol data when securities are added.

        Removed securities are deliberately not cleaned up here, so their
        data stays in ``symbol_data_by_symbol`` (presumably so positions
        that are still held remain covered by the stop -- confirm).

        Args:
            algorithm: The algorithm instance that experienced the change in
                securities; used here only for debug logging.
            changes: The security additions and removals from the algorithm.
        """
        for added in changes.added_securities:
            symbol = added.symbol
            if symbol in ("VIX", "COMP"):
                continue
            symbol_data = self.symbol_data_by_symbol.get(symbol)
            if symbol_data is None:
                self.symbol_data_by_symbol[symbol] = self._create_symbol_data(symbol)
            else:
                # A security that was already initialised was re-added:
                # reset its indicators.
                symbol_data.reset_indicators()
            algorithm.debug(
                "ATRTrailingStopRiskModel::Added: %s" % symbol)
class CashExecutionModel(ExecutionModel):
    """Execution model that sends market orders subject to margin and re-entry rules.

    Rules applied per target in ``execute``:
      * a buy is skipped when an order for the symbol was already sent today;
      * a buy is skipped when the symbol is already invested (no scaling in);
      * a sell is skipped when the symbol is not invested (no shorting);
      * a buy is skipped when its initial margin exceeds the margin still
        available in this pass.

    Args:
        algorithm: The algorithm instance; stored on ``self.algorithm``.
    """

    def __init__(self, algorithm):
        self.algorithm = algorithm
        super().__init__()
        # Per-instance collection: a class-level attribute would be shared
        # by every CashExecutionModel instance.
        self.targets_collection = PortfolioTargetCollection()
        # Date of the most recent order sent per symbol.
        self._last_execution_date = {}
        # Number of closed trades already reported by on_order_event.
        self._last_closed_trades_count = 0

    def execute(self, algorithm, targets: list[PortfolioTarget]):
        """Submit market orders for the given portfolio targets.

        Args:
            algorithm: The algorithm instance.
            targets: The portfolio targets to be ordered.
        """
        self.targets_collection.add_range(targets)
        if self.targets_collection.is_empty:
            return

        # Running estimate of the margin left after the orders sent so far.
        available_margin = algorithm.portfolio.margin_remaining
        today = algorithm.time.date()

        for target in self.targets_collection:
            security = algorithm.securities[target.symbol]
            symbol = security.symbol
            # Remaining quantity to be ordered for this target.
            quantity = OrderSizing.get_unordered_quantity(
                algorithm, target, security, True)
            if quantity == 0:
                continue

            invested = algorithm.portfolio[symbol].invested
            if quantity > 0:
                # No second entry on a day the symbol was already traded.
                if self._last_execution_date.get(symbol) == today:
                    continue
                # Don't adjust positions that are already open.
                if invested:
                    continue
            elif not invested:
                # Selling without a position would open a short.
                continue

            # NOTE(review): confirm the sign of the margin returned for a
            # negative quantity; it is added back as-is for sells below.
            required_margin = security.buying_power_model.get_initial_margin_requirement(
                InitialMarginParameters(security, quantity))
            if quantity > 0 and required_margin.value > available_margin:
                continue

            algorithm.market_order(security, quantity, tag=target.tag)

            if quantity > 0:
                available_margin -= required_margin.value
            else:
                available_margin += required_margin.value

            # Refresh the date on every order. Writing it only once would
            # make the same-day guard work on the first trading date only.
            self._last_execution_date[symbol] = today

        self.targets_collection.clear()  # Remove all symbols

    def on_order_event(self, algorithm: QCAlgorithm, orderEvent: OrderEvent) -> None:
        """Log every trade that closed since the previous filled order event.

        Args:
            algorithm: The algorithm instance.
            orderEvent: The order event; only FILLED events are processed.
        """
        if orderEvent.status != OrderStatus.FILLED:
            return

        closed_trades = algorithm.trade_builder.closed_trades
        current_closed_trades_count = len(closed_trades)
        if current_closed_trades_count <= self._last_closed_trades_count:
            return

        # Only the trades appended since the last report are new.
        new_closed_trades = closed_trades[
            self._last_closed_trades_count:current_closed_trades_count]
        for trade in new_closed_trades:
            algorithm.debug("[EXECUTE] %s::%s - Change in qty: %d@%.2f PNL: %.2f (%s)" %
                            (algorithm.time,
                             trade.symbol,
                             trade.quantity,
                             trade.entry_price,
                             trade.profit_loss,
                             orderEvent.ticket.tag))
        self._last_closed_trades_count = current_closed_trades_count
class VIXRiskModel(RiskManagementModel):
    """Risk model that liquidates all holdings while the VIX rate of change is high.

    Args:
        vix_threshold (float): VIX level threshold. Stored on the instance
            but not consulted by ``manage_risk``.
        vix_roc_period (int): Period of the rate-of-change indicator on "VIX".
        roc_threshold (float): Holdings are liquidated while the indicator
            value is above this threshold.
    """

    def __init__(self,
                 vix_threshold: float = 25.0,
                 vix_roc_period: int = 5,
                 roc_threshold: float = 0.0):
        self.vix_threshold = vix_threshold
        self.vix_roc_period = vix_roc_period
        self.roc_threshold = roc_threshold
        # Created on first use; see _get_vix_roc.
        self._vix_roc = None

    def _get_vix_roc(self, algorithm):
        """Return the VIX rate-of-change indicator, creating it only once.

        Calling ``algorithm.roc`` on every risk pass would create another
        indicator and issue another warm-up request each time. The indicator
        is pinned to daily resolution to match the daily warm-up data.
        """
        if self._vix_roc is None:
            self._vix_roc = algorithm.roc(
                "VIX", self.vix_roc_period, Resolution.DAILY)
            algorithm.warm_up_indicator(
                "VIX", self._vix_roc, resolution=Resolution.DAILY)
        return self._vix_roc

    def manage_risk(self, algorithm: QCAlgorithm, targets: list[PortfolioTarget]) -> list[PortfolioTarget]:
        """Liquidate every invested security while the VIX ROC exceeds the threshold.

        Positions are closed directly through ``algorithm.liquidate``, so the
        returned list is always empty.

        Args:
            algorithm: The algorithm instance.
            targets: The current portfolio targets; not consulted by this model.

        Returns:
            An empty list.
        """
        updated_targets = []
        vix_roc = self._get_vix_roc(algorithm)
        if not vix_roc.is_ready:
            algorithm.error("[VIXRiskModel] Waring: vix_roc is not ready")
            return []

        # The indicator value does not change during this pass, so the
        # threshold test is done once instead of per security.
        roc_value = vix_roc.current.value
        if not roc_value > self.roc_threshold:
            return updated_targets

        for kvp in algorithm.securities:
            symbol = kvp.key
            if not algorithm.portfolio[symbol].invested:
                continue
            algorithm.debug("[VIX Risk Model] %s::VIX: %.2f, VIX_ROC: %.2f. Liquidate %s" % (
                algorithm.time,
                algorithm.securities["VIX"].close,
                roc_value, symbol))
            algorithm.liquidate(symbol)
        return updated_targets