Consolidator Types
Count Consolidators
Introduction
Count consolidators aggregate data based on a number of bars or ticks. This type of consolidator aggregates n samples together, regardless of the time period the samples cover. Managing count consolidators is similar to managing time period consolidators, except you create count consolidators with an integer argument instead of a time-based argument.
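To make the idea of count-based aggregation concrete, here is a minimal pure-Python sketch. It is a toy model, not LEAN's implementation: the `Bar` and `CountBarConsolidator` classes and the `handlers` list are hypothetical names introduced only for illustration. It shows n input bars being merged into one output bar with no reference to timestamps.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class Bar:
    """Hypothetical stand-in for a trade bar (not LEAN's TradeBar)."""
    open: float
    high: float
    low: float
    close: float
    volume: int


class CountBarConsolidator:
    """Toy model: merge every `count` input bars into one output bar."""

    def __init__(self, count: int) -> None:
        self.count = count
        self.handlers: List[Callable[[Bar], None]] = []
        self._working: Optional[Bar] = None
        self._seen = 0

    def update(self, bar: Bar) -> None:
        if self._working is None:
            # The first sample of a group sets the open.
            self._working = Bar(bar.open, bar.high, bar.low, bar.close, bar.volume)
        else:
            w = self._working
            w.high = max(w.high, bar.high)
            w.low = min(w.low, bar.low)
            w.close = bar.close
            w.volume += bar.volume
        self._seen += 1
        if self._seen == self.count:
            # The n-th sample completes the group and fires the handlers.
            finished, self._working, self._seen = self._working, None, 0
            for handler in self.handlers:
                handler(finished)


consolidated: List[Bar] = []
consolidator = CountBarConsolidator(3)
consolidator.handlers.append(consolidated.append)

# Seven input bars: two full groups of three, one sample left pending.
for price in [10.0, 12.0, 11.0, 9.0, 9.5, 13.0, 8.0]:
    consolidator.update(Bar(price, price + 1, price - 1, price, 100))
```

After the loop, `consolidated` holds two bars and the seventh sample is still waiting for two more inputs, however much time passes before they arrive.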
Consolidate Trade Bars
TradeBar consolidators aggregate TradeBar objects into TradeBar objects of the same size or larger. Follow these steps to create and manage a TradeBar consolidator based on a number of samples:
- Create the consolidator.
- Add an event handler to the consolidator.
- Define the consolidation handler.
- Update the consolidator.
  - Automatic Updates
  - Manual Updates
- If you create consolidators for securities in a dynamic universe and register them for automatic updates, remove the consolidator when the security leaves the universe.
To create a count consolidator, pass the number of samples to the consolidator constructor.
_consolidator = new TradeBarConsolidator(10);
self._consolidator = TradeBarConsolidator(10)
_consolidator.DataConsolidated += ConsolidationHandler;
self._consolidator.data_consolidated += self._consolidation_handler
LEAN passes consolidated bars to the consolidator event handler in your algorithm. The most common error when creating consolidators is to put parentheses () at the end of your method name when setting the event handler of the consolidator. If you use parentheses, the method executes and its result is passed as the event handler instead of the method itself. Remember to pass the name of your method to the event system. Specifically, it should be ConsolidationHandler (C#) or self._consolidation_handler (Python), not ConsolidationHandler() or self._consolidation_handler().
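The difference between passing a method and calling it can be seen in a small standalone sketch. The `Event` class below is a hypothetical stand-in for an event with `+=` subscription; it is not LEAN's event type, and how LEAN itself reports this mistake is not shown here.

```python
from typing import Callable, List


class Event:
    """Hypothetical minimal event: stores callables and invokes them later."""

    def __init__(self) -> None:
        self.handlers: List[Callable[[str], None]] = []

    def __iadd__(self, handler: Callable[[str], None]) -> "Event":
        if not callable(handler):
            raise TypeError("handler must be callable; did you add '()' by mistake?")
        self.handlers.append(handler)
        return self

    def fire(self, payload: str) -> None:
        for handler in self.handlers:
            handler(payload)


received: List[str] = []


def consolidation_handler(bar: str = "called too early") -> None:
    received.append(bar)


event = Event()
event += consolidation_handler          # Correct: pass the function itself.
event.fire("bar 1")

try:
    event += consolidation_handler()    # Wrong: runs now and passes None.
    mistake_detected = False
except TypeError:
    mistake_detected = True
```

In the wrong form, the handler body runs once at subscription time, and what reaches the event is its return value (`None`), not the function.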
void ConsolidationHandler(object sender, TradeBar consolidatedBar)
{
}
def _consolidation_handler(self, sender: object, consolidated_bar: TradeBar) -> None:
    pass
When the consolidator receives the n-th data point, it passes the consolidated bar to the event handler.
You can automatically or manually update the consolidator.
To automatically update a consolidator with data from the security subscription, call the AddConsolidator (C#) or add_consolidator (Python) method of the Subscription Manager.
self.subscription_manager.add_consolidator(self._symbol, self._consolidator)
SubscriptionManager.AddConsolidator(_symbol, _consolidator);
Manual updates let you control when the consolidator updates and what data you use to update it. If you need to warm up a consolidator with data outside of the warm-up period, you can manually update the consolidator. To manually update a consolidator, call its Update (C#) or update (Python) method with a TradeBar object. You can update the consolidator with data from the Slice object in the OnData (C#) or on_data (Python) method or with data from a history request.
# Example 1: Update the consolidator with data from the Slice object
def on_data(self, slice: Slice) -> None:
    trade_bar = slice.bars[self._symbol]
    self._consolidator.update(trade_bar)

# Example 2: Update the consolidator with data from a history request
history = self.history[TradeBar](self._symbol, 30, Resolution.MINUTE)
for trade_bar in history:
    self._consolidator.update(trade_bar)

// Example 1: Update the consolidator with data from the Slice object
public override void OnData(Slice slice)
{
    var tradeBar = slice.Bars[_symbol];
    _consolidator.Update(tradeBar);
}

// Example 2: Update the consolidator with data from a history request
var history = History<TradeBar>(_symbol, 30, Resolution.Minute);
foreach (var tradeBar in history)
{
    _consolidator.Update(tradeBar);
}
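As a rough illustration of why manual updates are useful for warm-up, the following standalone sketch feeds a toy consolidator with samples from a pretend history request and then with live samples. The `CountConsolidator` class and the price lists are hypothetical, not LEAN objects; the point is only that samples from both sources count toward the same n.

```python
from typing import List


class CountConsolidator:
    """Toy model: emits the list of samples collected every `count` updates."""

    def __init__(self, count: int) -> None:
        self.count = count
        self.emitted: List[List[float]] = []
        self._working: List[float] = []

    def update(self, sample: float) -> None:
        self._working.append(sample)
        if len(self._working) == self.count:
            self.emitted.append(self._working)
            self._working = []


consolidator = CountConsolidator(10)

# Hypothetical history-request result: 7 closing prices.
history = [100.0 + i for i in range(7)]
for close in history:
    consolidator.update(close)
bars_after_warm_up = len(consolidator.emitted)

# Three live samples complete the first 10-sample group.
for close in [107.0, 108.0, 109.0]:
    consolidator.update(close)
```

Without the seven warm-up samples, the first consolidated bar would only appear after ten live samples.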
SubscriptionManager.RemoveConsolidator(_symbol, _consolidator);
self.subscription_manager.remove_consolidator(self._symbol, self._consolidator)
If you have a dynamic universe and don't remove consolidators, they accumulate internally, which slows down your algorithm until it eventually runs out of RAM and stops. For an example of removing consolidators from universe subscriptions, see GasAndCrudeOilEnergyCorrelationAlpha in the LEAN GitHub repository.
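The accumulation problem can be sketched with a toy registry. `SubscriptionRegistry` below is a hypothetical stand-in for whatever holds registered consolidators; only the method names mirror the add_consolidator and remove_consolidator calls shown above. The simulation assumes one symbol enters and one leaves the universe each day.

```python
from typing import Dict, List


class SubscriptionRegistry:
    """Hypothetical stand-in for the component that holds registered consolidators."""

    def __init__(self) -> None:
        self.consolidators: Dict[str, List[object]] = {}

    def add_consolidator(self, symbol: str, consolidator: object) -> None:
        self.consolidators.setdefault(symbol, []).append(consolidator)

    def remove_consolidator(self, symbol: str, consolidator: object) -> None:
        self.consolidators[symbol].remove(consolidator)
        if not self.consolidators[symbol]:
            del self.consolidators[symbol]


def run(cleanup: bool) -> int:
    """Simulate 100 days of universe churn; return how many consolidators remain."""
    registry = SubscriptionRegistry()
    owned: Dict[str, object] = {}
    for day in range(100):
        entering = f"SYM{day}"
        owned[entering] = object()
        registry.add_consolidator(entering, owned[entering])
        leaving = f"SYM{day - 1}"
        if cleanup and leaving in owned:
            registry.remove_consolidator(leaving, owned.pop(leaving))
    return sum(len(items) for items in registry.consolidators.values())


leaked = run(cleanup=False)
kept = run(cleanup=True)
```

With cleanup, only the consolidator for the symbol currently in the universe remains. In an algorithm, the universe-change callback (on_securities_changed in the Python API) is a natural place for the removal call.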
Consolidate Quote Bars
QuoteBar consolidators aggregate QuoteBar objects into QuoteBar objects of the same size or larger. Follow these steps to create and manage a QuoteBar consolidator based on a number of samples:
- Create the consolidator.
- Add an event handler to the consolidator.
- Define the consolidation handler.
- Update the consolidator.
  - Automatic Updates
  - Manual Updates
- If you create consolidators for securities in a dynamic universe and register them for automatic updates, remove the consolidator when the security leaves the universe.
To create a count consolidator, pass the number of samples to the consolidator constructor.
_consolidator = new QuoteBarConsolidator(10);
self._consolidator = QuoteBarConsolidator(10)
_consolidator.DataConsolidated += ConsolidationHandler;
self._consolidator.data_consolidated += self._consolidation_handler
LEAN passes consolidated bars to the consolidator event handler in your algorithm. The most common error when creating consolidators is to put parentheses () at the end of your method name when setting the event handler of the consolidator. If you use parentheses, the method executes and its result is passed as the event handler instead of the method itself. Remember to pass the name of your method to the event system. Specifically, it should be ConsolidationHandler (C#) or self._consolidation_handler (Python), not ConsolidationHandler() or self._consolidation_handler().
void ConsolidationHandler(object sender, QuoteBar consolidatedBar)
{
}
def _consolidation_handler(self, sender: object, consolidated_bar: QuoteBar) -> None:
    pass
When the consolidator receives the n-th data point, it passes the consolidated bar to the event handler.
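For quote data, each side of the book is aggregated separately. The sketch below is a simplified model under that assumption: `OHLC`, `Quote`, `merge`, `consolidate`, and `flat` are hypothetical helpers, not LEAN types, and the model ignores bid and ask sizes.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class OHLC:
    open: float
    high: float
    low: float
    close: float


@dataclass
class Quote:
    """Hypothetical stand-in for a quote bar: one OHLC per side."""
    bid: OHLC
    ask: OHLC


def merge(working: Optional[OHLC], incoming: OHLC) -> OHLC:
    """Fold one more sample into the working OHLC for a single side."""
    if working is None:
        return OHLC(incoming.open, incoming.high, incoming.low, incoming.close)
    return OHLC(working.open, max(working.high, incoming.high),
                min(working.low, incoming.low), incoming.close)


def consolidate(quotes: List[Quote], count: int) -> List[Quote]:
    """Merge each full run of `count` quotes; an incomplete tail emits nothing."""
    out: List[Quote] = []
    for start in range(0, len(quotes) - count + 1, count):
        bid: Optional[OHLC] = None
        ask: Optional[OHLC] = None
        for quote in quotes[start:start + count]:
            bid, ask = merge(bid, quote.bid), merge(ask, quote.ask)
        out.append(Quote(bid, ask))
    return out


def flat(price: float) -> OHLC:
    return OHLC(price, price, price, price)


# Five samples with a constant 0.25 spread; count=2 leaves the fifth pending.
quotes = [Quote(flat(b), flat(b + 0.25)) for b in [10.0, 10.5, 9.5, 10.25, 11.0]]
result = consolidate(quotes, 2)
```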
You can automatically or manually update the consolidator.
To automatically update a consolidator with data from the security subscription, call the AddConsolidator (C#) or add_consolidator (Python) method of the Subscription Manager.
self.subscription_manager.add_consolidator(self._symbol, self._consolidator)
SubscriptionManager.AddConsolidator(_symbol, _consolidator);
Manual updates let you control when the consolidator updates and what data you use to update it. If you need to warm up a consolidator with data outside of the warm-up period, you can manually update the consolidator. To manually update a consolidator, call its Update (C#) or update (Python) method with a QuoteBar object. You can update the consolidator with data from the Slice object in the OnData (C#) or on_data (Python) method or with data from a history request.
# Example 1: Update the consolidator with data from the Slice object
def on_data(self, slice: Slice) -> None:
    quote_bar = slice.quote_bars[self._symbol]
    self._consolidator.update(quote_bar)

# Example 2: Update the consolidator with data from a history request
history = self.history[QuoteBar](self._symbol, 30, Resolution.MINUTE)
for quote_bar in history:
    self._consolidator.update(quote_bar)

// Example 1: Update the consolidator with data from the Slice object
public override void OnData(Slice slice)
{
    var quoteBar = slice.QuoteBars[_symbol];
    _consolidator.Update(quoteBar);
}

// Example 2: Update the consolidator with data from a history request
var history = History<QuoteBar>(_symbol, 30, Resolution.Minute);
foreach (var quoteBar in history)
{
    _consolidator.Update(quoteBar);
}
SubscriptionManager.RemoveConsolidator(_symbol, _consolidator);
self.subscription_manager.remove_consolidator(self._symbol, self._consolidator)
If you have a dynamic universe and don't remove consolidators, they accumulate internally, which slows down your algorithm until it eventually runs out of RAM and stops. For an example of removing consolidators from universe subscriptions, see GasAndCrudeOilEnergyCorrelationAlpha in the LEAN GitHub repository.
Consolidate Trade Ticks
Tick consolidators aggregate Tick objects into TradeBar objects. Follow these steps to create and manage a Tick consolidator based on a number of samples:
- Create the consolidator.
- Add an event handler to the consolidator.
- Define the consolidation handler.
- Update the consolidator.
  - Automatic Updates
  - Manual Updates
- If you create consolidators for securities in a dynamic universe and register them for automatic updates, remove the consolidator when the security leaves the universe.
To create a count consolidator, pass the number of samples to the consolidator constructor.
_consolidator = new TickConsolidator(10);
self._consolidator = TickConsolidator(10)
_consolidator.DataConsolidated += ConsolidationHandler;
self._consolidator.data_consolidated += self._consolidation_handler
LEAN passes consolidated bars to the consolidator event handler in your algorithm. The most common error when creating consolidators is to put parentheses () at the end of your method name when setting the event handler of the consolidator. If you use parentheses, the method executes and its result is passed as the event handler instead of the method itself. Remember to pass the name of your method to the event system. Specifically, it should be ConsolidationHandler (C#) or self._consolidation_handler (Python), not ConsolidationHandler() or self._consolidation_handler().
void ConsolidationHandler(object sender, TradeBar consolidatedBar)
{
}
def _consolidation_handler(self, sender: object, consolidated_bar: TradeBar) -> None:
    pass
When the consolidator receives the n-th data point, it passes the consolidated bar to the event handler.
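The step from individual trade ticks to a bar can be sketched as follows. This is a simplified model, assuming each tick is a (price, quantity) pair and that the bar's volume is the sum of the tick quantities; `TradeBarLike` and `ticks_to_bars` are hypothetical names, not part of LEAN.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class TradeBarLike:
    """Hypothetical stand-in for the bar a tick consolidator emits."""
    open: float
    high: float
    low: float
    close: float
    volume: int


def ticks_to_bars(ticks: List[Tuple[float, int]], count: int) -> List[TradeBarLike]:
    """Build one bar per full group of `count` (price, quantity) ticks."""
    bars: List[TradeBarLike] = []
    for start in range(0, len(ticks) - count + 1, count):
        group = ticks[start:start + count]
        prices = [price for price, _ in group]
        bars.append(TradeBarLike(
            open=prices[0],
            high=max(prices),
            low=min(prices),
            close=prices[-1],
            volume=sum(quantity for _, quantity in group),
        ))
    return bars


# Seven ticks with count=3: two bars, and the last tick stays pending.
ticks = [(100.0, 5), (101.0, 2), (99.5, 10), (100.5, 1), (102.0, 3), (101.5, 4), (103.0, 7)]
bars = ticks_to_bars(ticks, 3)
```

Because each bar covers a fixed number of trades, bars form quickly in busy periods and slowly in quiet ones.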
You can automatically or manually update the consolidator.
To automatically update a consolidator with data from the security subscription, call the AddConsolidator (C#) or add_consolidator (Python) method of the Subscription Manager.
self.subscription_manager.add_consolidator(self._symbol, self._consolidator)
SubscriptionManager.AddConsolidator(_symbol, _consolidator);
Manual updates let you control when the consolidator updates and what data you use to update it. If you need to warm up a consolidator with data outside of the warm-up period, you can manually update the consolidator. To manually update a consolidator, call its Update (C#) or update (Python) method with a Tick object. You can update the consolidator with data from the Slice object in the OnData (C#) or on_data (Python) method or with data from a history request.
# Example 1: Update the consolidator with data from the Slice object
def on_data(self, slice: Slice) -> None:
    ticks = slice.ticks[self._symbol]
    for tick in ticks:
        self._consolidator.update(tick)

# Example 2: Update the consolidator with data from a history request
ticks = self.history[Tick](self._symbol, timedelta(minutes=3), Resolution.TICK)
for tick in ticks:
    self._consolidator.update(tick)

// Example 1: Update the consolidator with data from the Slice object
public override void OnData(Slice slice)
{
    var ticks = slice.Ticks[_symbol];
    foreach (var tick in ticks)
    {
        _consolidator.Update(tick);
    }
}

// Example 2: Update the consolidator with data from a history request
var ticks = History<Tick>(_symbol, TimeSpan.FromMinutes(3), Resolution.Tick);
foreach (var tick in ticks)
{
    _consolidator.Update(tick);
}
SubscriptionManager.RemoveConsolidator(_symbol, _consolidator);
self.subscription_manager.remove_consolidator(self._symbol, self._consolidator)
If you have a dynamic universe and don't remove consolidators, they accumulate internally, which slows down your algorithm until it eventually runs out of RAM and stops. For an example of removing consolidators from universe subscriptions, see GasAndCrudeOilEnergyCorrelationAlpha in the LEAN GitHub repository.
Consolidate Quote Ticks
Tick quote bar consolidators aggregate Tick objects that represent quotes into QuoteBar objects. Follow these steps to create and manage a Tick quote bar consolidator based on a number of samples:
- Create the consolidator.
- Add an event handler to the consolidator.
- Define the consolidation handler.
- Update the consolidator.
  - Automatic Updates
  - Manual Updates
- If you create consolidators for securities in a dynamic universe and register them for automatic updates, remove the consolidator when the security leaves the universe.
To create a count consolidator, pass the number of samples to the consolidator constructor.
_consolidator = new TickQuoteBarConsolidator(10);
self._consolidator = TickQuoteBarConsolidator(10)
_consolidator.DataConsolidated += ConsolidationHandler;
self._consolidator.data_consolidated += self._consolidation_handler
LEAN passes consolidated bars to the consolidator event handler in your algorithm. The most common error when creating consolidators is to put parentheses () at the end of your method name when setting the event handler of the consolidator. If you use parentheses, the method executes and its result is passed as the event handler instead of the method itself. Remember to pass the name of your method to the event system. Specifically, it should be ConsolidationHandler (C#) or self._consolidation_handler (Python), not ConsolidationHandler() or self._consolidation_handler().
void ConsolidationHandler(object sender, QuoteBar consolidatedBar)
{
}
def _consolidation_handler(self, sender: object, consolidated_bar: QuoteBar) -> None:
    pass
When the consolidator receives the n-th data point, it passes the consolidated bar to the event handler.
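Because this consolidator works on ticks that represent quotes, a tick stream that also carries trades raises the question of which ticks count toward n. The sketch below assumes that trade ticks are skipped and not counted; treat that as an assumption to verify against the LEAN source. `TickLike` and `QuoteTickCounter` are hypothetical names, and the summary keeps only a few fields for brevity.

```python
from typing import Dict, List, NamedTuple


class TickLike(NamedTuple):
    """Hypothetical tick; `kind` is "quote" or "trade"."""
    kind: str
    bid: float = 0.0
    ask: float = 0.0


class QuoteTickCounter:
    """Toy model: summarizes every `count` quote ticks, skipping trade ticks."""

    def __init__(self, count: int) -> None:
        self.count = count
        self.bars: List[Dict[str, float]] = []
        self._working: List[TickLike] = []

    def update(self, tick: TickLike) -> None:
        if tick.kind != "quote":
            return  # Assumption: trade ticks are ignored and not counted.
        self._working.append(tick)
        if len(self._working) == self.count:
            w = self._working
            self.bars.append({
                "bid_open": w[0].bid,
                "bid_close": w[-1].bid,
                "ask_high": max(t.ask for t in w),
                "ask_low": min(t.ask for t in w),
            })
            self._working = []


stream = [
    TickLike("quote", bid=10.0, ask=10.25),
    TickLike("trade"),
    TickLike("quote", bid=10.5, ask=10.75),
    TickLike("trade"),
    TickLike("quote", bid=9.75, ask=10.0),
    TickLike("quote", bid=10.0, ask=10.25),
]
counter = QuoteTickCounter(3)
for tick in stream:
    counter.update(tick)
```

Six ticks arrive, but only four are quotes, so exactly one 3-quote bar is produced and one quote tick remains pending.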
You can automatically or manually update the consolidator.
To automatically update a consolidator with data from the security subscription, call the AddConsolidator (C#) or add_consolidator (Python) method of the Subscription Manager.
self.subscription_manager.add_consolidator(self._symbol, self._consolidator)
SubscriptionManager.AddConsolidator(_symbol, _consolidator);
Manual updates let you control when the consolidator updates and what data you use to update it. If you need to warm up a consolidator with data outside of the warm-up period, you can manually update the consolidator. To manually update a consolidator, call its Update (C#) or update (Python) method with a Tick object. You can update the consolidator with data from the Slice object in the OnData (C#) or on_data (Python) method or with data from a history request.
# Example 1: Update the consolidator with data from the Slice object
def on_data(self, slice: Slice) -> None:
    ticks = slice.ticks[self._symbol]
    for tick in ticks:
        self._consolidator.update(tick)

# Example 2: Update the consolidator with data from a history request
ticks = self.history[Tick](self._symbol, timedelta(minutes=3), Resolution.TICK)
for tick in ticks:
    self._consolidator.update(tick)

// Example 1: Update the consolidator with data from the Slice object
public override void OnData(Slice slice)
{
    var ticks = slice.Ticks[_symbol];
    foreach (var tick in ticks)
    {
        _consolidator.Update(tick);
    }
}

// Example 2: Update the consolidator with data from a history request
var ticks = History<Tick>(_symbol, TimeSpan.FromMinutes(3), Resolution.Tick);
foreach (var tick in ticks)
{
    _consolidator.Update(tick);
}
SubscriptionManager.RemoveConsolidator(_symbol, _consolidator);
self.subscription_manager.remove_consolidator(self._symbol, self._consolidator)
If you have a dynamic universe and don't remove consolidators, they accumulate internally, which slows down your algorithm until it eventually runs out of RAM and stops. For an example of removing consolidators from universe subscriptions, see GasAndCrudeOilEnergyCorrelationAlpha in the LEAN GitHub repository.
Consolidate Other Data
<dataType> consolidators aggregate various types of data objects into <dataType> objects of the same size or larger. If you consolidate custom data or alternative datasets, check the definition of the data class to see its data type. Follow these steps to create and manage a <dataType> consolidator based on a number of samples:
- Create the consolidator.
- Add an event handler to the consolidator.
- Define the consolidation handler.
- Update the consolidator.
  - Automatic Updates
  - Manual Updates
- If you create consolidators for securities in a dynamic universe and register them for automatic updates, remove the consolidator when the security leaves the universe.
To create a count consolidator, pass the number of samples to the consolidator constructor.
_consolidator = new DynamicDataConsolidator(10);
self._consolidator = DynamicDataConsolidator(10)
_consolidator.DataConsolidated += ConsolidationHandler;
self._consolidator.data_consolidated += self._consolidation_handler
LEAN passes consolidated bars to the consolidator event handler in your algorithm. The most common error when creating consolidators is to put parentheses () at the end of your method name when setting the event handler of the consolidator. If you use parentheses, the method executes and its result is passed as the event handler instead of the method itself. Remember to pass the name of your method to the event system. Specifically, it should be ConsolidationHandler (C#) or self._consolidation_handler (Python), not ConsolidationHandler() or self._consolidation_handler().
void ConsolidationHandler(object sender, <dataType> consolidatedBar)
{
}
def _consolidation_handler(self, sender: object, consolidated_bar: <dataType>) -> None:
    pass
When the consolidator receives the n-th data point, it passes the consolidated bar to the event handler.
You can automatically or manually update the consolidator.
To automatically update a consolidator with data from the data subscription, call the AddConsolidator (C#) or add_consolidator (Python) method of the Subscription Manager.
self.subscription_manager.add_consolidator(self._symbol, self._consolidator)
SubscriptionManager.AddConsolidator(_symbol, _consolidator);
Manual updates let you control when the consolidator updates and what data you use to update it. If you need to warm up a consolidator with data outside of the warm-up period, you can manually update the consolidator. To manually update a consolidator, call its Update (C#) or update (Python) method with a <dataType> object. You can update the consolidator with data from the Slice object in the OnData (C#) or on_data (Python) method or with data from a history request.
# Example 1: Update the consolidator with data from the Slice object
def on_data(self, slice: Slice) -> None:
    if self._symbol in slice:
        self._consolidator.update(slice[self._symbol])

# Example 2: Update the consolidator with data from a history request
history = self.history[<dataType>](self._symbol, 30, Resolution.DAILY)
for data in history:
    self._consolidator.update(data)

// Example 1: Update the consolidator with data from the Slice object
public override void OnData(Slice slice)
{
    if (slice.ContainsKey(_symbol))
    {
        _consolidator.Update(slice[_symbol]);
    }
}

// Example 2: Update the consolidator with data from a history request
var history = History<<dataType>>(_symbol, 30, Resolution.Daily);
foreach (var data in history)
{
    _consolidator.Update(data);
}
SubscriptionManager.RemoveConsolidator(_symbol, _consolidator);
self.subscription_manager.remove_consolidator(self._symbol, self._consolidator)
If you have a dynamic universe and don't remove consolidators, they accumulate internally, which slows down your algorithm until it eventually runs out of RAM and stops. For an example of removing consolidators from universe subscriptions, see GasAndCrudeOilEnergyCorrelationAlpha in the LEAN GitHub repository.
Reset Consolidators
To reset a consolidator, call its Reset (C#) or reset (Python) method.
self._consolidator.reset()
_consolidator.Reset();
If you are live trading Equities or backtesting Equities without the adjusted data normalization mode, reset your consolidators when splits and dividends occur.
When a split or dividend occurs while the consolidator is in the process of building a bar, the open, high, and low may reflect prices from before the split or dividend.
To avoid issues, call the consolidator's Reset (C#) or reset (Python) method and then warm it up with ScaledRaw (C#) or SCALED_RAW (Python) data from a history request.
def on_data(self, data: Slice):
    # When a split or dividend occurs...
    if ((data.splits.contains_key(self._symbol) and data.splits[self._symbol].type == SplitType.SPLIT_OCCURRED) or
            data.dividends.contains_key(self._symbol)):
        # If the consolidator is working on a bar...
        if self._consolidator.working_data:
            # Get adjusted prices for the time period of the working bar.
            history = self.history[TradeBar](self._symbol, self._consolidator.working_data.time, self.time, data_normalization_mode=DataNormalizationMode.SCALED_RAW)
            # Reset the consolidator.
            self._consolidator.reset()
            # Warm up the consolidator with the adjusted price data.
            for bar in history:
                self._consolidator.update(bar)

public override void OnData(Slice data)
{
    // When a split or dividend occurs...
    if ((data.Splits.ContainsKey(_symbol) && data.Splits[_symbol].Type == SplitType.SplitOccurred) ||
        data.Dividends.ContainsKey(_symbol))
    {
        // If the consolidator is working on a bar...
        if (_consolidator.WorkingData != null)
        {
            // Get adjusted prices for the time period of the working bar.
            var history = History<TradeBar>(_symbol, _consolidator.WorkingData.Time, Time, dataNormalizationMode: DataNormalizationMode.ScaledRaw);
            // Reset the consolidator.
            _consolidator.Reset();
            // Warm up the consolidator with the adjusted price data.
            foreach (var bar in history)
            {
                _consolidator.Update(bar);
            }
        }
    }
}
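To see why the reset matters, consider a toy working bar that spans a hypothetical 4-for-1 split. The `WorkingBar` class and the prices below are invented for illustration and are not LEAN objects; the rescaled list stands in for the history request with scaled raw data.

```python
from typing import Optional


class WorkingBar:
    """Toy consolidator state: tracks the high and low of the bar being built."""

    def __init__(self) -> None:
        self.high: Optional[float] = None
        self.low: Optional[float] = None

    def update(self, price: float) -> None:
        self.high = price if self.high is None else max(self.high, price)
        self.low = price if self.low is None else min(self.low, price)

    def reset(self) -> None:
        self.high = self.low = None


pre_split = [400.0, 404.0, 396.0]   # Raw prices before a hypothetical 4-for-1 split.
post_split = 100.5                  # First raw price after the split.
split_factor = 0.25

# Without a reset, the working bar mixes two price scales.
stale = WorkingBar()
for price in pre_split + [post_split]:
    stale.update(price)

# With a reset, re-feed the same period on the post-split scale.
fresh = WorkingBar()
for price in pre_split:
    fresh.update(price)
fresh.reset()
for price in [p * split_factor for p in pre_split] + [post_split]:
    fresh.update(price)
```

The stale bar reports a high of 404 against a low of 100.5, a range that never traded on a single price scale, while the reset bar stays within 99 to 101.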
Examples
The following examples demonstrate common practices for implementing count consolidators.
Example 1: Tick Count Consolidator
public class CountConsolidatorAlgorithm : QCAlgorithm
{
    private Symbol _spy;
    // Create an EMA indicator for trade signal generation.
    private ExponentialMovingAverage _ema = new(14);
    // Track the day of the last trade check so the algorithm trades at most once per day.
    private int _day = -1;

    public override void Initialize()
    {
        SetStartDate(2024, 9, 1);
        SetEndDate(2024, 12, 31);
        // Request SPY data to feed to indicators and trade.
        _spy = AddEquity("SPY", Resolution.Tick).Symbol;
        // Consolidate every 10,000 trade ticks into one bar and feed those bars to the EMA for trend entry/exit signals.
        var tickConsolidator = new TickConsolidator(10000);
        // Register the consolidator with the SPY subscription so it automatically updates the indicator.
        RegisterIndicator(_spy, _ema, tickConsolidator);
        SetWarmUp(120);
    }

    public override void OnData(Slice slice)
    {
        if (_ema.IsReady && slice.Ticks.TryGetValue(_spy, out var ticks) && slice.Time.Day != _day)
        {
            // Get the most recent tick in the slice and use its price.
            var lastTick = ticks.MaxBy(x => x.EndTime);
            // Trade the trend: go long above the EMA and short below it.
            if (lastTick.Price > _ema && !Portfolio[_spy].IsLong)
            {
                SetHoldings(_spy, 0.5m);
            }
            else if (lastTick.Price < _ema && !Portfolio[_spy].IsShort)
            {
                SetHoldings(_spy, -0.5m);
            }
            _day = slice.Time.Day;
        }
    }
}

class CountConsolidatorAlgorithm(QCAlgorithm):
    # Create an EMA indicator for trade signal generation.
    _ema = ExponentialMovingAverage(14)
    # Track the day of the last trade check so the algorithm trades at most once per day.
    _day = -1

    def initialize(self) -> None:
        self.set_start_date(2024, 9, 1)
        self.set_end_date(2024, 12, 31)
        # Request SPY data to feed to indicators and trade.
        self.spy = self.add_equity("SPY", Resolution.TICK).symbol
        # Consolidate every 10,000 trade ticks into one bar and feed those bars to the EMA for trend entry/exit signals.
        tick_consolidator = TickConsolidator(10000)
        # Register the consolidator with the SPY subscription so it automatically updates the indicator.
        self.register_indicator(self.spy, self._ema, tick_consolidator)
        self.set_warm_up(120)

    def on_data(self, slice: Slice) -> None:
        ticks = slice.ticks.get(self.spy)
        if self._ema.is_ready and ticks and self._day != slice.time.day:
            # Get the most recent tick in the slice and use its price.
            last_tick = sorted(ticks, key=lambda x: x.end_time, reverse=True)[0]
            ema = self._ema.current.value
            # Trade the trend: go long above the EMA and short below it.
            if last_tick.price > ema and not self.portfolio[self.spy].is_long:
                self.set_holdings(self.spy, 0.5)
            elif last_tick.price < ema and not self.portfolio[self.spy].is_short:
                self.set_holdings(self.spy, -0.5)
            self._day = slice.time.day