from QuantConnect.Indicators.CandlestickPatterns import Piercing
class PierceAlpha(AlphaModel):
symbol_data_by_symbol = {}
def Update(self, algorithm, slice):
self.insights = []
for symbol, symbol_data in self.symbol_data_by_symbol.items():
## Check for all Symbols in current data Slice
if not (slice.ContainsKey(symbol) and slice[symbol] is not None):
continue
if symbol_data.indicator.IsReady:
indicator_value = symbol_data.indicator.Current.Value
algorithm.Plot("Indicator", str(symbol), indicator_value)
if indicator_value == 1:
#self.algorithm.Debug(f"Time: {slice.Time} pattern is 1 going long on {symbol}")
self.insights.append(Insight.Price(symbol, timedelta(days = 5), InsightDirection.Up, None, None, None, 1))
elif indicator_value == -1:
#self.algorithm.Debug(f"{symbol} pattern is -1 going short")
self.insights.append(Insight.Price(symbol, timedelta(days = 5), InsightDirection.Down))
return self.insights
def OnSecuritiesChanged(self, algorithm, changes):
for security in changes.AddedSecurities:
symbol = security.Symbol
self.symbol_data_by_symbol[symbol] = SymbolData(symbol, algorithm)
for security in changes.RemovedSecurities:
symbol_data = self.symbol_data_by_symbol.pop(security.Symbol, None)
if symbol_data:
symbol_data.dispose()
class SymbolData:
def __init__(self, symbol, algorithm):
self.symbol = symbol
self.algorithm = algorithm
self.indicator = Piercing()
# Setup consolidator to update indicator
self.consolidator = TradeBarConsolidator(1)
self.consolidator.DataConsolidated += self.consolidation_handler
algorithm.SubscriptionManager.AddConsolidator(self.symbol, self.consolidator)
def consolidation_handler(self, sender, consolidated):
self.indicator.Update(consolidated)
def dispose(self):
self.algorithm.SubscriptionManager.RemoveConsolidator(self.symbol, self.consolidator)
class InsightWeightingPortfolioConstructionModel(EqualWeightingPortfolioConstructionModel):
'''Provides an implementation of IPortfolioConstructionModel that generates percent targets based on the
Insight.Weight. The target percent holdings of each Symbol is given by the Insight.Weight from the last
active Insight for that symbol.
For insights of direction InsightDirection.Up, long targets are returned and for insights of direction
InsightDirection.Down, short targets are returned.
If the sum of all the last active Insight per symbol is bigger than 1, it will factor down each target
percent holdings proportionally so the sum is 1.
It will ignore Insight that have no Insight.Weight value.'''
def __init__(self, rebalance = Resolution.Daily, portfolioBias = PortfolioBias.LongShort):
'''Initialize a new instance of InsightWeightingPortfolioConstructionModel
Args:
rebalance: Rebalancing parameter. If it is a timedelta, date rules or Resolution, it will be converted into a function.
If None will be ignored.
The function returns the next expected rebalance time for a given algorithm UTC DateTime.
The function returns null if unknown, in which case the function will be called again in the
next loop. Returning current time will trigger rebalance.
portfolioBias: Specifies the bias of the portfolio (Short, Long/Short, Long)'''
super().__init__(rebalance, portfolioBias)
def ShouldCreateTargetForInsight(self, insight):
'''Method that will determine if the portfolio construction model should create a
target for this insight
Args:
insight: The insight to create a target for'''
# Ignore insights that don't have Weight value
return insight.Weight is not None
def DetermineTargetPercent(self, activeInsights):
'''Will determine the target percent for each insight
Args:
activeInsights: The active insights to generate a target for'''
result = {}
# We will adjust weights proportionally in case the sum is > 1 so it sums to 1.
weightSums = sum(self.GetValue(insight) for insight in activeInsights if self.RespectPortfolioBias(insight))
weightFactor = 1.0
if weightSums > 1:
weightFactor = 1 / weightSums
for insight in activeInsights:
result[insight] = (insight.Direction if self.RespectPortfolioBias(insight) else InsightDirection.Flat) * self.GetValue(insight) * weightFactor
return result
def GetValue(self, insight):
'''Method that will determine which member will be used to compute the weights and gets its value
Args:
insight: The insight to create a target for
Returns:
The value of the selected insight member'''
return insight.Weight
class TrailingStopRiskModel(RiskManagementModel):
'''Provides an implementation of IRiskManagementModel that limits the maximum possible loss
measured from the highest unrealized profit'''
def __init__(self, algorithm, maximumDrawdownPercent):
'''Initializes a new instance of the TrailingStopRiskManagementModel class
Args:
maximumDrawdownPercent: The maximum percentage drawdown allowed for algorithm portfolio compared with the highest unrealized profit, defaults to 5% drawdown'''
self.maximumDrawdownPercent = -abs(maximumDrawdownPercent)
self.trailingHighs = dict()
self.algorithm = algorithm
def ManageRisk(self, algorithm, targets):
'''Manages the algorithm's risk at each time step
Args:
algorithm: The algorithm instance
targets: The current portfolio targets to be assessed for risk'''
riskAdjustedTargets = list()
for kvp in algorithm.Securities:
symbol = kvp.Key
security = kvp.Value
# Remove if not invested
if not security.Invested:
self.trailingHighs.pop(symbol, None)
continue
# Add newly invested securities
if symbol not in self.trailingHighs:
self.trailingHighs[symbol] = security.Holdings.AveragePrice # Set to average holding cost
continue
# Check for new highs and update - set to tradebar high
if self.trailingHighs[symbol] < security.High:
self.trailingHighs[symbol] = security.High
continue
# Check for securities past the drawdown limit
securityHigh = self.trailingHighs[symbol]
drawdown = (security.Low / securityHigh) - 1
if drawdown < self.maximumDrawdownPercent:
# liquidate
#self.algorithm.Debug(f"{algorithm.Time}Past the drawdown limit... Liquidating")
riskAdjustedTargets.append(PortfolioTarget(symbol, 0))
return riskAdjustedTargets
from CandleStickAlphaModel import PierceAlpha # Alpha Model
from ImmediateExecution import ImmediateExecutionModel # Execution Model
from TrailingStopModel import TrailingStopRiskModel # Risk Model
from InsightWeightingPortfolioConstructionModel import InsightWeightingPortfolioConstructionModel # Portfolio Construction
from datetime import timedelta
# https://www.quantconnect.com/forum/discussion/7684/creating-consolidated-data-from-universe-filters/p1
class DynamicNadionCoil(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2020, 7, 7) # Set Start Date
self.SetCash(10000) # Set Strategy Cash
self.SetWarmUp(20)
# Manual Universe Selection
# self.symbol = self.AddEquity("TSLA", Resolution.Minute, Market.USA).Symbol
# self.consolidated = self.Consolidate(self.symbol, timedelta(days = 2), self.OneDayBarHandler)
self.numberOfSymbolsCoarse = 50
self.numberOfSymbolsFine = 30
self.dollarVolumeBySymbol = {}
self.lastMonth = -1
self.symbol = []
self.UniverseSettings.Resolution = Resolution.Daily
self.SetUniverseSelection(CoarseFundamentalUniverseSelectionModel(self.CoarseSelectionFunction))
self.AddAlpha(PierceAlpha())
self.SetExecution(ImmediateExecutionModel())
self.SetPortfolioConstruction(InsightWeightingPortfolioConstructionModel())
self.SetRiskManagement(TrailingStopRiskModel(self, maximumDrawdownPercent = .1))
def CoarseSelectionFunction(self, coarse):
# The stocks must have fundamental data
# The stock must have positive previous-day close price
# The stock must have positive volume on the previous trading day
if self.Time.month == self.lastMonth:
return Universe.Unchanged
self.lastMonth = self.Time.month
sortedByDollarVolume = sorted([x for x in coarse if x.HasFundamentalData and x.Volume > 0 and x.Price > 0],
key = lambda x: x.DollarVolume, reverse=True)[:self.numberOfSymbolsCoarse]
self.dollarVolumeBySymbol = {x.Symbol:x.DollarVolume for x in sortedByDollarVolume}
# If no security has met the QC500 criteria, the universe is unchanged.
# A new selection will be attempted on the next trading day as self.lastMonth is not updated
if len(self.dollarVolumeBySymbol) == 0:
return Universe.Unchanged
# return the symbol objects our sorted collection
return list(self.dollarVolumeBySymbol.keys())