| Overall Statistics |
|
Total Orders 28 Average Win 2.72% Average Loss -0.83% Compounding Annual Return 0.444% Drawdown 5.900% Expectancy 0.125 Start Equity 7500 End Equity 7633.95 Net Profit 1.786% Sharpe Ratio -0.3 Sortino Ratio -0.338 Probabilistic Sharpe Ratio 1.586% Loss Rate 74% Win Rate 26% Profit-Loss Ratio 3.27 Alpha -0.01 Beta -0.004 Annual Standard Deviation 0.034 Annual Variance 0.001 Information Ratio -0.546 Tracking Error 0.191 Treynor Ratio 2.333 Total Fees $28.00 Estimated Strategy Capacity $590000000.00 Lowest Capacity Asset XLE RGRPZX100F39 Portfolio Turnover 0.31% |
#region imports
from AlgorithmImports import *
#from utils import GetPositionSize
#from futures import categories
#endregion
class TrendAlphaModel(AlphaModel):
# _sma_fast = SimpleMovingAverage(16)
# _sma_slow = SimpleMovingAverage(64)
# def __init__(self, algorithm: QCAlgorithm, resolution) -> None:
# self._algorithm = algorithm
# self.resolution = Resolution.DAILY
# # self.prediction_interval = Extensions.to_time_span(resolution) #Time.multiply(Extensions.to_time_span(resolution), fast_period)
# # self.symbol_data_by_symbol = {}
# # Warm up the SMA indicator.
# current = algorithm.time
# # provider = algorithm.risk_free_interest_rate_model
# dt = current - timedelta(100)
# while dt <= current:
# #rate = provider.get_interest_rate(dt)
# self._sma_fast.update(dt, rate)
# self._sma_slow.update(dt, rate)
# # self._was_rising = rate > self._sma.current.value
# dt += timedelta(1)
# # Set a schedule to update the interest rate trend indicator every day.
# algorithm.schedule.on(
# algorithm.date_rules.every_day(),
# algorithm.time_rules.at(0, 1),
# self.update_interest_rate
# )
# def update_interest_rate(self) -> None:
# # Update interest rate to the SMA indicator to estimate its trend.
# rate = self._algorithm.risk_free_interest_rate_model.get_interest_rate(self._algorithm.time)
# self._sma.update(self._algorithm.time, rate)
# self._was_rising = rate > self._sma.current.value
# def update(self, algorithm: QCAlgorithm, slice: Slice) -> List[Insight]:
# insights = []
# # Split the forexes by whether the quote currency is USD.
# quote_usd = []
# base_usd = []
# for symbol, security in algorithm.securities.items():
# if security.quote_currency.symbol == Currencies.USD:
# quote_usd.append(symbol)
# else:
# base_usd.append(symbol)
# rate = algorithm.risk_free_interest_rate_model.get_interest_rate(algorithm.time)
# # During the rising interest rate cycle, long the forexes with USD as the base currency and short the ones with USD as the quote currency.
# if rate > self._sma.current.value:
# insights.extend(
# [Insight.price(symbol, timedelta(1), InsightDirection.UP) for symbol in base_usd] +
# [Insight.price(symbol, timedelta(1), InsightDirection.DOWN) for symbol in quote_usd]
# )
# # During the downward interest rate cycle, short the forexes with USD as the base currency and long the ones with USD as the quote currency.
# elif rate < self._sma.current.value:
# insights.extend(
# [Insight.price(symbol, timedelta(1), InsightDirection.DOWN) for symbol in base_usd] +
# [Insight.price(symbol, timedelta(1), InsightDirection.UP) for symbol in quote_usd]
# )
# # If the interest rate cycle is steady for a long time, we expect a flip in the cycle.
# elif self._was_rising:
# insights.extend(
# [Insight.price(symbol, timedelta(1), InsightDirection.DOWN) for symbol in base_usd] +
# [Insight.price(symbol, timedelta(1), InsightDirection.UP) for symbol in quote_usd]
# )
# else:
# insights.extend(
# [Insight.price(symbol, timedelta(1), InsightDirection.UP) for symbol in base_usd] +
# [Insight.price(symbol, timedelta(1), InsightDirection.DOWN) for symbol in quote_usd]
# )
# return insights
# From old file: emac strategy
def __init__(self,
fast_period = 16,
slow_period = 64,
resolution = Resolution.DAILY):
'''Initializes a new instance of the EmaCrossAlphaModel class
Args:
fast_period: The fast EMA period
slow_period: The slow EMA period'''
self.fast_period = fast_period
self.slow_period = slow_period
self.resolution = resolution
#self.prediction_interval = Extensions.to_time_span(resolution) #
self.prediction_interval = Time.multiply(Extensions.to_time_span(resolution), fast_period)
self.symbol_data_by_symbol = {}
resolution_string = Extensions.get_enum_string(resolution, Resolution)
self.name = '{}({},{},{})'.format(self.__class__.__name__, fast_period, slow_period, resolution_string)
def update(self, algorithm, data):
'''Updates this alpha model with the latest data from the algorithm.
This is called each time the algorithm receives data for subscribed securities
Args:
algorithm: The algorithm instance
data: The new data available
Returns:
The new insights generated'''
insights = []
for symbol, symbol_data in self.symbol_data_by_symbol.items():
if symbol_data.fast.is_ready and symbol_data.slow.is_ready:
if symbol_data.fast_is_over_slow:
if symbol_data.slow > symbol_data.fast:
insights.append(Insight.price(symbol_data.symbol, self.prediction_interval, InsightDirection.DOWN))
elif symbol_data.slow_is_over_fast:
if symbol_data.fast > symbol_data.slow:
insights.append(Insight.price(symbol_data.symbol, self.prediction_interval, InsightDirection.UP))
symbol_data.fast_is_over_slow = symbol_data.fast > symbol_data.slow
return insights
def on_securities_changed(self, algorithm, changes):
'''Event fired each time the we add/remove securities from the data feed
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm'''
for added in changes.added_securities:
symbol_data = self.symbol_data_by_symbol.get(added.symbol)
if symbol_data is None:
symbol_data = SymbolData(added, self.fast_period, self.slow_period, algorithm, self.resolution)
self.symbol_data_by_symbol[added.symbol] = symbol_data
else:
# a security that was already initialized was re-added, reset the indicators
symbol_data.fast.reset()
symbol_data.slow.reset()
for removed in changes.removed_securities:
data = self.symbol_data_by_symbol.pop(removed.symbol, None)
if data is not None:
# clean up our consolidators
data.remove_consolidators()
class SymbolData:
'''Contains data specific to a symbol required by this model'''
def __init__(self, security, fast_period, slow_period, algorithm, resolution):
self.security = security
self.symbol = security.symbol
self.algorithm = algorithm
self.fast_consolidator = algorithm.resolve_consolidator(security.symbol, resolution)
self.slow_consolidator = algorithm.resolve_consolidator(security.symbol, resolution)
algorithm.subscription_manager.add_consolidator(security.symbol, self.fast_consolidator)
algorithm.subscription_manager.add_consolidator(security.symbol, self.slow_consolidator)
# create fast/slow SMAs
self.fast = SimpleMovingAverage(security.symbol, fast_period) #, SimpleMovingAverage.smoothing_factor_default(fast_period))
self.slow = SimpleMovingAverage(security.symbol, slow_period) #, SimpleMovingAverage.smoothing_factor_default(slow_period))
algorithm.register_indicator(security.symbol, self.fast, self.fast_consolidator);
algorithm.register_indicator(security.symbol, self.slow, self.slow_consolidator);
algorithm.warm_up_indicator(security.symbol, self.fast, resolution);
algorithm.warm_up_indicator(security.symbol, self.slow, resolution);
# True if the fast is above the slow, otherwise false.
# This is used to prevent emitting the same signal repeatedly
self.fast_is_over_slow = False
def remove_consolidators(self):
self.algorithm.subscription_manager.remove_consolidator(self.security.symbol, self.fast_consolidator)
self.algorithm.subscription_manager.remove_consolidator(self.security.symbol, self.slow_consolidator)
@property
def slow_is_over_fast(self):
return not self.fast_is_over_slow
# def OnSecuritiesChanged(self, algorithm, changes):
# '''Event fired each time the we add/remove securities from the data feed
# Args:
# algorithm: The algorithm instance that experienced the change in securities
# changes: The security additions and removals from the algorithm'''
# pass
# # # Remove security from sector set
# # for security in changes.RemovedSecurities:
# # for sector in self.sectors:
# # if security in self.sectors[sector]:
# # self.sectors[sector].remove(security)
# # # Add security to corresponding sector set
# # for security in changes.AddedSecurities:
# # sector = security.Fundamentals.AssetClassification.MorningstarSectorCode
# # if sector not in self.sectors:
# # self.sectors[sector] = set()
# # self.sectors[sector].add(security)# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from AlgorithmImports import *
class EmaCrossAlphaModel(AlphaModel):
'''Alpha model that uses an EMA cross to create insights'''
def __init__(self,
fast_period = 12,
slow_period = 26,
resolution = Resolution.DAILY):
'''Initializes a new instance of the EmaCrossAlphaModel class
Args:
fast_period: The fast EMA period
slow_period: The slow EMA period'''
self.fast_period = fast_period
self.slow_period = slow_period
self.resolution = resolution
self.prediction_interval = Time.multiply(Extensions.to_time_span(resolution), fast_period)
self.symbol_data_by_symbol = {}
resolution_string = Extensions.get_enum_string(resolution, Resolution)
self.name = '{}({},{},{})'.format(self.__class__.__name__, fast_period, slow_period, resolution_string)
def update(self, algorithm, data):
'''Updates this alpha model with the latest data from the algorithm.
This is called each time the algorithm receives data for subscribed securities
Args:
algorithm: The algorithm instance
data: The new data available
Returns:
The new insights generated'''
insights = []
for symbol, symbol_data in self.symbol_data_by_symbol.items():
if symbol_data.fast.is_ready and symbol_data.slow.is_ready:
if symbol_data.fast_is_over_slow:
if symbol_data.slow > symbol_data.fast:
insights.append(Insight.price(symbol_data.symbol, self.prediction_interval, InsightDirection.DOWN))
elif symbol_data.slow_is_over_fast:
if symbol_data.fast > symbol_data.slow:
insights.append(Insight.price(symbol_data.symbol, self.prediction_interval, InsightDirection.UP))
symbol_data.fast_is_over_slow = symbol_data.fast > symbol_data.slow
return insights
def on_securities_changed(self, algorithm, changes):
'''Event fired each time the we add/remove securities from the data feed
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm'''
for added in changes.added_securities:
symbol_data = self.symbol_data_by_symbol.get(added.symbol)
if symbol_data is None:
symbol_data = SymbolData(added, self.fast_period, self.slow_period, algorithm, self.resolution)
self.symbol_data_by_symbol[added.symbol] = symbol_data
else:
# a security that was already initialized was re-added, reset the indicators
symbol_data.fast.reset()
symbol_data.slow.reset()
for removed in changes.removed_securities:
data = self.symbol_data_by_symbol.pop(removed.symbol, None)
if data is not None:
# clean up our consolidators
data.remove_consolidators()
class SymbolData:
'''Contains data specific to a symbol required by this model'''
def __init__(self, security, fast_period, slow_period, algorithm, resolution):
self.security = security
self.symbol = security.symbol
self.algorithm = algorithm
self.fast_consolidator = algorithm.resolve_consolidator(security.symbol, resolution)
self.slow_consolidator = algorithm.resolve_consolidator(security.symbol, resolution)
algorithm.subscription_manager.add_consolidator(security.symbol, self.fast_consolidator)
algorithm.subscription_manager.add_consolidator(security.symbol, self.slow_consolidator)
# create fast/slow EMAs
self.fast = ExponentialMovingAverage(security.symbol, fast_period, ExponentialMovingAverage.smoothing_factor_default(fast_period))
self.slow = ExponentialMovingAverage(security.symbol, slow_period, ExponentialMovingAverage.smoothing_factor_default(slow_period))
algorithm.register_indicator(security.symbol, self.fast, self.fast_consolidator);
algorithm.register_indicator(security.symbol, self.slow, self.slow_consolidator);
algorithm.warm_up_indicator(security.symbol, self.fast, resolution);
algorithm.warm_up_indicator(security.symbol, self.slow, resolution);
# True if the fast is above the slow, otherwise false.
# This is used to prevent emitting the same signal repeatedly
self.fast_is_over_slow = False
def remove_consolidators(self):
self.algorithm.subscription_manager.remove_consolidator(self.security.symbol, self.fast_consolidator)
self.algorithm.subscription_manager.remove_consolidator(self.security.symbol, self.slow_consolidator)
@property
def slow_is_over_fast(self):
return not self.fast_is_over_slow
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''
Add the stop loss order logic to the Alpha model.
When the stop loss is hit,
emit a flat insight for the security.
'''
from AlgorithmImports import *
class SmaCrossAlphaModel(AlphaModel):
'''Alpha model that uses an SMA cross to create insights'''
def __init__(self,
fast_period = 16,
slow_period = 64,
resolution = Resolution.DAILY):
'''Initializes a new instance of the EmaCrossAlphaModel class
Args:
fast_period: The fast SMA period
slow_period: The slow SMA period'''
self.fast_period = fast_period
self.slow_period = slow_period
self.resolution = resolution
self.prediction_interval = Time.multiply(Extensions.to_time_span(resolution), 1) #fast_period)
self.symbol_data_by_symbol = {}
resolution_string = Extensions.get_enum_string(resolution, Resolution)
self.name = '{}({},{},{})'.format(self.__class__.__name__, fast_period, slow_period, resolution_string)
def update(self, algorithm, data):
'''Updates this alpha model with the latest data from the algorithm.
This is called each time the algorithm receives data for subscribed securities
Args:
algorithm: The algorithm instance
data: The new data available
Returns:
The new insights generated'''
# # Schedule the event
# if (data.Time.hour, data.Time.minute) != (9, 31): return
insights = []
for symbol, symbol_data in self.symbol_data_by_symbol.items():
security = algorithm.securities[symbol]
# algorithm.Log(f"Total holdings value is {round(algorithm.Portfolio[symbol].holdings_value,2)}")
if symbol_data.fast.is_ready and symbol_data.slow.is_ready:
if symbol_data.fast_is_over_slow: # and security.holdings.quantity==0:
if symbol_data.slow > symbol_data.fast:
insights.append(Insight.price(symbol_data.symbol, self.prediction_interval, InsightDirection.DOWN))
#insight_is_down = True
elif symbol_data.slow_is_over_fast: # and security.holdings.quantity==0:
if symbol_data.fast > symbol_data.slow:
insights.append(Insight.price(symbol_data.symbol, self.prediction_interval, InsightDirection.UP))
#insight_is_down = False
#symbol_data.fast_is_over_slow = insight_is_down #symbol_data.fast > symbol_data.slow
symbol_data.fast_is_over_slow = symbol_data.fast > symbol_data.slow
return insights
def on_securities_changed(self, algorithm, changes):
'''Event fired each time the we add/remove securities from the data feed
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm'''
for added in changes.added_securities:
symbol_data = self.symbol_data_by_symbol.get(added.symbol)
if symbol_data is None:
symbol_data = SymbolData(added, self.fast_period, self.slow_period, algorithm, self.resolution)
self.symbol_data_by_symbol[added.symbol] = symbol_data
else:
# a security that was already initialized was re-added, reset the indicators
symbol_data.fast.reset()
symbol_data.slow.reset()
for removed in changes.removed_securities:
data = self.symbol_data_by_symbol.pop(removed.symbol, None)
if data is not None:
# clean up our consolidators
data.remove_consolidators()
class SymbolData:
'''Contains data specific to a symbol required by this model'''
def __init__(self, security, fast_period, slow_period, algorithm, resolution):
self.security = security
self.symbol = security.symbol
self.algorithm = algorithm
self.fast_consolidator = algorithm.resolve_consolidator(security.symbol, resolution)
self.slow_consolidator = algorithm.resolve_consolidator(security.symbol, resolution)
algorithm.subscription_manager.add_consolidator(security.symbol, self.fast_consolidator)
algorithm.subscription_manager.add_consolidator(security.symbol, self.slow_consolidator)
# create fast/slow SMAs
self.fast = SimpleMovingAverage(security.symbol, fast_period) #, SimpleMovingAverage.smoothing_factor_default(fast_period))
self.slow = SimpleMovingAverage(security.symbol, slow_period) #, SimpleMovingAverage.smoothing_factor_default(slow_period))
algorithm.register_indicator(security.symbol, self.fast, self.fast_consolidator);
algorithm.register_indicator(security.symbol, self.slow, self.slow_consolidator);
algorithm.warm_up_indicator(security.symbol, self.fast, resolution);
algorithm.warm_up_indicator(security.symbol, self.slow, resolution);
# True if the fast is above the slow, otherwise false.
# This is used to prevent emitting the same signal repeatedly
self.fast_is_over_slow = False
def remove_consolidators(self):
self.algorithm.subscription_manager.remove_consolidator(self.security.symbol, self.fast_consolidator)
self.algorithm.subscription_manager.remove_consolidator(self.security.symbol, self.slow_consolidator)
@property
def slow_is_over_fast(self):
return not self.fast_is_over_slow# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from AlgorithmImports import *
class TradierExecutionModel(ExecutionModel):
'''Provides an implementation of IExecutionModel that submits market orders at a specific time to achieve the desired portfolio targets'''
def __init__(self):
'''Initializes a new instance of the TradierExecutionModel class'''
self.targets_collection = PortfolioTargetCollection()
def execute(self, algorithm, targets):
'''Immediately submits orders for the specified portfolio targets.
Args:
algorithm: The algorithm instance
targets: The portfolio targets to be ordered'''
# Schedule the event
#if (self.Time.hour,self.Time.minute) != (9, 31): return
# for performance we check count value, OrderByMarginImpact and ClearFulfilled are expensive to call
self.targets_collection.add_range(targets)
if not self.targets_collection.is_empty:
for target in self.targets_collection.order_by_margin_impact(algorithm):
security = algorithm.securities[target.symbol]
# calculate remaining quantity to be ordered
quantity = OrderSizing.get_unordered_quantity(algorithm, target, security, True)
if quantity != 0:
above_minimum_portfolio = BuyingPowerModelExtensions.above_minimum_order_margin_portfolio_percentage(
security.buying_power_model,
security,
quantity,
algorithm.portfolio,
algorithm.settings.minimum_order_margin_portfolio_percentage)
if above_minimum_portfolio:
algorithm.market_order(security, quantity)
elif not PortfolioTarget.minimum_order_margin_percentage_warning_sent:
# will trigger the warning if it has not already been sent
PortfolioTarget.minimum_order_margin_percentage_warning_sent = False
self.targets_collection.clear_fulfilled(algorithm)# region imports
from AlgorithmImports import *
from universe import StarterUniverseSelectionModel
from alpha_smac import SmaCrossAlphaModel
from execution import TradierExecutionModel
from portfolio import BufferedPortfolioConstructionModel
from risk_management import TrailingStopUsingInstrumentalRiskManagementModel
#from utils import GetPositionSize
import numpy as np
# endregion
'''
OBSERVATION:
Currently, the algo changes trading direction if the insight changes, regardless the current position's direction.
The algo needs modification to change its behavior.
'''
class StarterAlgorithmMultiAssets(QCAlgorithm):
def initialize(self):
self.set_start_date(2019, 1, 1)
self.set_end_date(2023, 1, 1)
self.set_cash(7500)
# # Add a custom chart to track the SMA cross
# symbol = self.add_equity("XLE", Resolution.DAILY).symbol
# self.ema_fast = self.sma(symbol, 16)
# self.ema_slow = self.sma(symbol, 64)
# chart = Chart('SMA Cross')
# chart.add_series(Series('Fast', SeriesType.LINE, 0))
# chart.add_series(Series('Slow', SeriesType.LINE, 0))
# self.add_chart(chart)
# Use Tradier brokerage model
# self.set_brokerage_model(BrokerageName.TRADIER_BROKERAGE, AccountType.MARGIN)
# # Fix the issue that Tradier does not suppor short order using GTC
# self.DefaultOrderProperties.TimeInForce = TimeInForce.Day
self.UniverseSettings.Resolution = Resolution.DAILY
self.universe_settings.data_normalization_mode = DataNormalizationMode.ADJUSTED
# Universe Model
self.add_universe_selection(StarterUniverseSelectionModel())
# Alpha Model
self.alpha_model = SmaCrossAlphaModel()
self.AddAlpha(self.alpha_model)
# Portfolio construction model
self.set_portfolio_construction(BufferedPortfolioConstructionModel(
timedelta(days=1),
self.get_parameter("buffer_scaler", 0.1)
))
# Disable automatic portfolio rebalancing upon insight change, allowing for manual control over when portfolio adjustments are made based on insights.
#self.settings.rebalance_portfolio_on_insight_changes = False
# Risk model
self.SetRiskManagement(TrailingStopUsingInstrumentalRiskManagementModel())
# Execution model
self.SetExecution(ImmediateExecutionModel())
# Schedule order submission
# self.Schedule.On(self.DateRules.EveryDay('SPY'), self.TimeRules.AfterMarketOpen('SPY', 1), self.__AfterOpen)
# Warming up
def OnEndOfAlgorithm(self):
self.Debug(self.alpha_model.name)
self.Debug(self.alpha_model.prediction_interval)
self.Debug(f"Printing {len(self.alpha_model.symbol_data_by_symbol)} mapping(s)...")
# for symbol, _symbolData in self.alpha_model.symbol_data_by_symbol.items():
# self.Debug( str(symbol) + " " + str(_symbolData))
# def IsRebalanceDue(self, time):
# # Rebalance when all of the following are true:
# # - There are new insights or old insights have been cancelled since the last rebalance
# # - The algorithm isn't warming up
# # - There is QuoteBar data in the current slice
# # latest_expiry_time = sorted([insight.close_time_utc for insight in self.insights], reverse=True)[0] if self.insights.count else None
# # if self._previous_expiry_time != latest_expiry_time and not self.is_warming_up and self.current_slice.quote_bars.count > 0:
# # self._previous_expiry_time = latest_expiry_time
# # return time
# # return None
# for key in self.portfolio.keys():
# self.debug(key)
#self.underlying = self.portfolio.keys #][0] #.security_holding.symbol
# Add underlying
# self.underlying = "XLE" #"TNA" # "IWM" #"UUP", "XLK", "DBA", "USO", "GLD", "DIA", "SPY"
# self.add_equity("SPY", Resolution.MINUTE, data_normalization_mode=DataNormalizationMode.RAW)
# # self.add_equity(self.underlying, Resolution.MINUTE, data_normalization_mode=DataNormalizationMode.ADJUSTED)
# self._symbol = self.underlying
# self.benchmark = self.underlying
# # Build history for indicators
# #history_trade_bar =
# self.history[TradeBar](self._symbol, 100, Resolution.DAILY) #, dataNormalizationMode=DataNormalizationMode.SCALED_RAW)
# # Warm up the close price and trade bar rolling windows with the previous 100-day trade bar data
# self.rollingwindow = RollingWindow[TradeBar](100) # Create a class member to store the RollingWindow
# self.Consolidate(self._symbol, Resolution.DAILY, self.CustomBarHandler)
# # Risk parameters
# self.target_risk = 0.12 # percentage
# self.instrument_look_back = 30 # number of days
# self.instrument_risk_annualizer = 16 #math.sqrt(256/self.instrument_look_back)
# self.stop_price_instrument_ratio = 0.5 # percentage
# # Strategy parameters
# self.short_look_back = 16
# self.long_look_back = 64
# # self.short_ma = self.sma(self._symbol, short_look_back) #, Resolution.DAILY)
# # self.long_ma = self.sma(self._symbol, long_look_back) #, Resolution.DAILY)
# # self.short_ma = SimpleMovingAverage(short_look_back)
# # self.long_ma = SimpleMovingAverage(long_look_back)
# # self._sma.window.size = 5
# # self.set_warm_up(long_look_back) # Warm up for the long ma
# self.strategy_direction = ""
# self.new_strategy_direction = ""
# # Order ticket for our stop order, Datetime when stop order was last hit
# self.stop_market_ticket = None
def OnData(self, data):
# Exit positions that aren't backed by existing insights.
# If you don't want this behavior, delete this method definition.
if self.IsWarmingUp:
return
# Schedule the event
if (self.Time.hour,self.Time.minute) != (9, 31): return
self.plot_sma()
# self.Log(f"Total portfolio value is {self.Portfolio.TotalPortfolioValue}")
def plot_sma(self):
"""Plot the chart"""
# pass
self.plot('SMA Cross', self.alpha_model.fast_is_over_slow)
self.plot('SMA Cross', 'Fast', self.alpha_model.fast.current.value)
self.plot('SMA Cross', 'Slow', self.alpha_model.slow.current.value)
# self.debug(self.Time)
# # if not self.checked_symbols_from_previous_deployment:
# for security_holding in self.Portfolio.Values:
# self.debug("Here")
# if not security_holding.Invested:
# continue
# symbol = security_holding.Symbol
# if not self.Insights.HasActiveInsights(symbol, self.UtcTime):
# self.undesired_symbols_from_previous_deployment.append(symbol)
# self.checked_symbols_from_previous_deployment = True
# self.debug("There")
# for symbol in self.undesired_symbols_from_previous_deployment[:]:
# if self.IsMarketOpen(symbol):
# self.Liquidate(symbol, tag="Holding from previous deployment that's no longer desired")
# self.undesired_symbols_from_previous_deployment.remove(symbol)
# def on_data(self, data):
# # Warming up the data and indicator
# # if not self.long_ma.is_ready: return
# if not self.rollingwindow.is_ready: return
# # Schedule the event
# if (self.Time.hour,self.Time.minute) != (9, 31): return
# # Get current close
# self.trade_bar_df = self.pandas_converter.get_data_frame[TradeBar](list(self.rollingwindow)[::-1])
# self.current_close = self.trade_bar_df.close[-1]
# # Calculate the trade signal
# self.cal_trade_signal()
# # self.debug(self.current_close)
# # Calculate annualized instrument risk
# self.cal_annualized_instrument_risk()
# self.plot("Risks", "Annualized Instrument Risk", self.annalized_instrument_risk)
# # Get stop adjustment
# stop_adjustment = self.annalized_instrument_risk*self.stop_price_instrument_ratio
# # Calculate the stop price
# # self.starting_stop_price_long = np.nan #self.securities[self.underlying].close # initial value
# # self.starting_stop_price_short = np.nan #self.securities[self.underlying].close # initial value
# self.starting_stop_price_long = self.current_close - stop_adjustment
# self.starting_stop_price_short = self.current_close + stop_adjustment
# if not self.portfolio.invested and self.new_strategy_direction == self.strategy_direction: return
# # No signal change to trigger an open trade.
# if not self.portfolio.invested and self.new_strategy_direction != self.strategy_direction:
# # Calculate the trade size
# self.cal_trade_size()
# # Open trade and compute the stop price
# self.open_trade()
# # Open daily Trailing-Stop-Loss trade
# if self.new_strategy_direction == "long":
# self.stop_price = self.starting_stop_price_long
# self.stop_market_ticket = self.open_daily_tsl_trade(-self.size)
# else:
# self.stop_price = self.starting_stop_price_short
# self.stop_market_ticket = self.open_daily_tsl_trade(self.size)
# # Start tracking price for trailing stop loss order
# self.highest_price = self.current_close #self.securities[self.underlying].close
# self.lowest_price = self.current_close #self.securities[self.underlying].close
# if self.portfolio.invested:
# # update trailing-stop-loss price
# self.update_tsl_order()
# #Plot the moving stop price on "Data Chart" with "Stop Price" series name
# self.plot_data()
def on_order_event(self, order_event):
if order_event.status != OrderStatus.FILLED:
return
self.Debug(order_event)
# def CustomBarHandler(self, bar):
# self.rollingwindow.add(bar)
# def Instrument_Risk(self, df) -> float:
# self.daily_returns = df.close.pct_change()
# self.risk = self.current_close * self.daily_returns[-self.instrument_look_back:].std()
# return self.risk
# def cal_annualized_instrument_risk(self):
# #self.Debug("Warming up is ready." + str(trade_bar_df.tail())) #close.pct_change())) #self.close_price_window[1]))
# self.instrument_risk = self.Instrument_Risk(self.trade_bar_df) #self.close_price_window)
# # self.Debug(str(self.instrument_risk))
# self.annalized_instrument_risk = self.instrument_risk * 16 # self.instrument_risk_annualizer * self.instrument_risk
# # self.Debug(f"Annualized risk by price points is {round(self.annalized_instrument_risk, 2)}")
# def cal_trade_size(self):
# self.target_exposure = self.target_risk*self.portfolio.total_portfolio_value
# self.Debug(f"Target exposure is {round(self.target_exposure, 2)}.")
# self.notional_exposure = self.target_exposure/self.annalized_instrument_risk
# self.size = math.floor(self.notional_exposure)
# self.Debug(f"Trade size is {str(self.size)}.")
# def cal_trade_signal(self):
# self.long_ma = self.trade_bar_df.close.rolling(window = self.long_look_back).mean()
# self.short_ma = self.trade_bar_df.close.rolling(window = self.short_look_back).mean()
# self.short_ma_current_value = self.short_ma[-1]
# self.long_ma_current_value = self.long_ma[-1]
# if self.short_ma_current_value > self.long_ma_current_value :
# self.new_strategy_direction = "long"
# else:
# self.new_strategy_direction = "short"
# if self.new_strategy_direction != self.strategy_direction:
# self.debug(f"Strategy direction changed to : {self.new_strategy_direction}.")
# def open_trade(self):
# self.strategy_direction = self.new_strategy_direction
# if self.strategy_direction == "long":
# self.market_order(self.underlying, self.size)
# else:
# self.market_order(self.underlying, -1 * self.size)
# def open_daily_tsl_trade(self, size):
# self.stop_market_order(self.underlying, size, self.stop_price)
# def update_tsl_order(self):
# self.start_stop_price = self.stop_price
# if self.strategy_direction == "long":
# if self.current_close > self.highest_price:
# delta = self.highest_price - self.current_close
# self.stop_price = self.start_stop_price - delta
# self.highest_price = self.current_close
# else:
# self.stop_price = self.start_stop_price
# self.stop_market_ticket = self.open_daily_tsl_trade(-self.size)
# if self.strategy_direction == "short":
# if self.current_close < self.lowest_price:
# delta = self.lowest_price - self.current_close
# self.stop_price = self.start_stop_price - delta
# self.lowest_price = self.current_close
# else:
# self.stop_price = self.start_stop_price
# self.stop_market_ticket = self.open_daily_tsl_trade(self.size)
# # def plot_data(self):
# # self.plot("Data Chart", "Asset Price", self.current_close)
# # self.plot("Data Chart", "Stop Price", self.stop_price)
# def plot_data(self):
# self.plot("Data Chart", "Asset Price", self.current_close)
# self.plot("Data Chart", "Stop Price", self.stop_price)
# self.plot("Data Chart", "Short MA", self.short_ma_current_value)
# self.plot("Data Chart", "Long MA", self.long_ma_current_value)#region imports
from AlgorithmImports import *
import numpy as np
#endregion
class BufferedPortfolioConstructionModel(EqualWeightingPortfolioConstructionModel):
def __init__(self, rebalance, buffer_scaler):
super().__init__(rebalance)
self.buffer_scaler = buffer_scaler
self.is_size_risk_adjusted = True
self.target_risk = 0.12
if self.is_size_risk_adjusted == True:
self.target_exposure = self.target_risk #* self.portfolio.total_portfolio_value
else:
self.target_exposure = 1 #self.portfolio.total_portfolio_value
def create_targets(self, algorithm: QCAlgorithm, active_insights: List[Insight]) -> List[PortfolioTarget]:
# targets = super().determine_target_percent(insights)
'''Will determine the target percent for each insight
Args:
active_insights: The active insights to generate a target for'''
targets = {}
# give equal weighting to each security
# count = sum(x.direction != InsightDirection.FLAT and self.respect_portfolio_bias(x) for x in active_insights)
count = len(active_insights) #sum(self.respect_portfolio_bias(x) for x in active_insights)
percent = 0 if count == 0 else self.target_exposure #1.0 / count
for insight in active_insights:
algorithm.Log(f"Insights to begin with -> {insight.direction}, {algorithm.Portfolio[insight.symbol].Invested}")
targets[insight] = (insight.direction if self.respect_portfolio_bias(insight) else InsightDirection.FLAT) * percent
#return result
total_portfolio_value = algorithm.Portfolio.TotalPortfolioValue
adj_targets = []
for insight in active_insights:
security = algorithm.securities[insight.symbol]
# optimal_position = security.position
# optimal_position = targets[insight] #.target.security.position
optimal_position = targets[insight]*total_portfolio_value / security.Price # insight.Weight is the desired weight.
# optimal_position_quantity = optimal_position * self.TotalPortfolioValueHolding / security.Price
#optimal_position_quantity = int(optimal_position_quantity)
# Create buffer zone to reduce churn
buffer_width = self.buffer_scaler * abs(optimal_position)
upper_buffer = round(optimal_position + buffer_width)
lower_buffer = round(optimal_position - buffer_width)
# Determine quantity to put holdings into buffer zone
current_holdings = algorithm.Portfolio[insight.symbol].quantity #security.holdings.quantity
if not algorithm.Portfolio[insight.symbol].invested: #current_holdings == 0:
quantity = optimal_position
# if lower_buffer <= current_holdings <= upper_buffer:
# continue
# elif current_holdings > 0:
# quantity = lower_buffer if current_holdings < lower_buffer else upper_buffer
# else:
# quantity = upper_buffer if current_holdings > upper_buffer else lower_buffer
else:
quantity = optimal_position #current_holdings
# algorithm.Log(f"CreateTargets Add -> {insight.Symbol} {insight.Direction} {quantity} ")
# Place trades
adj_targets.append(PortfolioTarget(insight.symbol, quantity))
# else:
# adj_targets.append(PortfolioTarget(insight.symbol, current_holdings))
return adj_targets
# for target in targets:
# if target.quantity == 0:
# adj_targets.append(target)
# return adj_targets
from AlgorithmImports import *
class TrailingStopUsingInstrumentalRiskManagementModel(RiskManagementModel):
'''Provides an implementation of RiskManagementModel that limits the maximum loss per share propotional to the instrumental risk measurd as
the dollar value of the annualized standard deviation of price'''
ret_by_symbol = {}
def __init__(self, stop_price_instrument_ratio = 0.5):
'''Initializes a new instance of the TrailingStopRiskManagementModel class
Args:
maximum_drawdown_percent: The maximum percentage drawdown allowed for algorithm portfolio compared with the highest unrealized profit, defaults to 5% drawdown'''
self.stop_price_instrument_ratio = stop_price_instrument_ratio
self.trailing_absolute_holdings_state = dict()
def manage_risk(self, algorithm, targets):
'''Manages the algorithm's risk at each time step
Args:
algorithm: The algorithm instance
targets: The current portfolio targets to be assessed for risk'''
risk_adjusted_targets = list()
# algorithm.plot("Risks", "Annualized Instrument Risk", self.annalized_instrument_risk)
for kvp in algorithm.securities:
symbol = kvp.key
security = kvp.value
# Remove if not invested
if not security.invested:
self.trailing_absolute_holdings_state.pop(symbol, None)
continue
position = PositionSide.LONG if security.holdings.is_long else PositionSide.SHORT
absolute_holdings_value = security.holdings.absolute_holdings_value
trailing_absolute_holdings_state = self.trailing_absolute_holdings_state.get(symbol)
# Add newly invested security (if doesn't exist) or reset holdings state (if position changed)
if trailing_absolute_holdings_state == None or position != trailing_absolute_holdings_state.position:
self.trailing_absolute_holdings_state[symbol] = trailing_absolute_holdings_state = HoldingsState(position, security.holdings.absolute_holdings_cost)
trailing_absolute_holdings_value = trailing_absolute_holdings_state.absolute_holdings_value
# Check for new max (for long position) or min (for short position) absolute holdings value
if ((position == PositionSide.LONG and trailing_absolute_holdings_value < absolute_holdings_value) or
(position == PositionSide.SHORT and trailing_absolute_holdings_value > absolute_holdings_value)):
self.trailing_absolute_holdings_state[symbol].absolute_holdings_value = absolute_holdings_value
continue
# Debug
# algorithm.Log(f"Trailing holdings value {round(trailing_absolute_holdings_value,2)}")
# drawdown = abs((trailing_absolute_holdings_value - absolute_holdings_value) / trailing_absolute_holdings_value)
drawdown_2 = abs((trailing_absolute_holdings_value - absolute_holdings_value) / trailing_absolute_holdings_value)
drawdown = drawdown_2
self.target_drawdown_percent = self.get_data(symbol)
algorithm.Log(f"drawdown is {drawdown} vs target {self.target_drawdown_percent}")
if self.target_drawdown_percent < drawdown:
# Cancel insights
algorithm.insights.cancel([ symbol ]);
self.trailing_absolute_holdings_state.pop(symbol, None)
# liquidate
risk_adjusted_targets.append(PortfolioTarget(symbol, 0))
return risk_adjusted_targets
def get_data(self, symbol: Symbol) -> float:
symbol_data = self.ret_by_symbol.get(symbol)
if not symbol_data:
return 0
self.annalized_instrument_risk = symbol_data.std_ret * 16
target_drawdown_percent = self.annalized_instrument_risk * self.stop_price_instrument_ratio
return target_drawdown_percent
def on_securities_changed(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
for added in changes.added_securities:
# Add SymbolData class to handle log returns.
self.ret_by_symbol[added.symbol] = SymbolData(algorithm, added.symbol)
for removed in changes.removed_securities:
# Stop subscription on the data to release computational resources.
symbol_data = self.ret_by_symbol.pop(removed.symbol, None)
if symbol_data:
symbol_data.dispose()
class SymbolData:
def __init__(self, algorithm: QCAlgorithm, symbol: Symbol) -> None:
self.algorithm = algorithm
self.symbol = symbol
# Since the return is assumed log-normal, we use the log return indicator to calculate TVaR later.
self.ret = RateOfChange(1)
# Register the indicator for automatic updating for daily returns.
algorithm.register_indicator(symbol, self.ret, Resolution.DAILY)
# Set up a rolling window to save the return for calculating the mean and SD for TVaR calculation.
self.window = RollingWindow[float](252)
# Add a handler to save the return to the rolling window.
self.ret.updated += lambda _, point: self.window.add(point.value)
# Warm up the rolling window.
history = algorithm.history[TradeBar](symbol, 252, Resolution.DAILY)
for bar in history:
self.ret.update(bar.end_time, bar.close)
# algorithm.Log(f"Close prices for risk calculation are {history[-2:]}.")
@property
def is_ready(self) -> bool:
return self.window.is_ready
@property
def std_ret(self) -> float:
# Standard deviation of return for instrument risk calculation.
return np.std(list(self.window)) #, ddof=1)
def dispose(self) -> None:
# Stop subscription on the data to release computational resources.
self.algorithm.deregister_indicator(self.ret)
class HoldingsState:
def __init__(self, position, absolute_holdings_value):
self.position = position
self.absolute_holdings_value = absolute_holdings_value#region imports
from AlgorithmImports import *
#endregion
class StarterUniverseSelectionModel(ManualUniverseSelectionModel):
def __init__(self):
tickers = [
# "SPY", # iShares MSCI Japan Index ETF
"XLE", # iShare Energy Sector
#"AAPL", # AAPL stock
# "EFNL", # iShares MSCI Finland Capped Investable Market Index ETF
# "EWW", # iShares MSCI Mexico Inv. Mt. Idx
# "ERUS", # iShares MSCI Russia ETF
# "IVV", # iShares S&P 500 Index
# "AUD", # Australia Bond Index Fund
# "EWQ", # iShares MSCI France Index ETF
# "EWH", # iShares MSCI Hong Kong Index ETF
# "EWI", # iShares MSCI Italy Index ETF
# "EWY", # iShares MSCI South Korea Index ETF
# "EWP", # iShares MSCI Spain Index ETF
# "EWD", # iShares MSCI Sweden Index ETF
# "EWL", # iShares MSCI Switzerland Index ETF
# "EWC", # iShares MSCI Canada Index ETF
# "EWZ", # iShares MSCI Brazil Index ETF
# "EWO", # iShares MSCI Austria Investable Mkt Index ETF
# "EWK", # iShares MSCI Belgium Investable Market Index ETF
# "BRAQ", # Global X Brazil Consumer ETF
# "ECH" # iShares MSCI Chile Investable Market Index ETF
]
symbols = [Symbol.create(ticker, SecurityType.EQUITY, Market.USA) for ticker in tickers]
super().__init__(symbols)
#region imports
from AlgorithmImports import *
#endregion
def GetPositionSize(group):
subcategories = {}
for category, subcategory in group.values():
if category not in subcategories:
subcategories[category] = {subcategory: 0}
elif subcategory not in subcategories[category]:
subcategories[category][subcategory] = 0
subcategories[category][subcategory] += 1
category_count = len(subcategories.keys())
subcategory_count = {category: len(subcategory.keys()) for category, subcategory in subcategories.items()}
weights = {}
for symbol in group:
category, subcategory = group[symbol]
weight = 1 / category_count / subcategory_count[category] / subcategories[category][subcategory]
weights[symbol] = weight
return weights