Consolidator Types
Market Hour Aware Consolidators
Introduction
Market hour aware consolidators aggregate data based on the asset's trading hours. For example, if you create a 7-minute consolidator, the first consolidated bar of each day spans the first 7 minutes of the regular trading session. If you enable extended market hours for the asset and the consolidator, the first consolidated bar of each day spans the first 7 minutes of the pre-market trading session and continue in 7-minute increments from there.
Consolidate Trade Bars
TradeBar consolidators aggregate TradeBar objects into TradeBar objects of the same size or larger. Follow these steps to create and manage a TradeBar consolidator based on regular or extended market hours:
- Create the consolidator.
- Add an event handler to the consolidator.
- Define the consolidation handler.
- Update the consolidator.
- Automatic Updates
- Manual Updates
- If you create consolidators for securities in a dynamic universe and register them for automatic updates, remove the consolidator when the security leaves the universe.
To create a market hour aware consolidator, pass the following arguments to the MarketHourAwareConsolidator constructor:
| Argument | Data Type | Description |
|---|---|---|
dailyStrictEndTimeEnableddaily_strict_end_time_enabled | bool | Whether daily bars should have an EndTimeend_time that matches the market close time (Truetrue) instead of the following midnight (Falsefalse). This argument usually matches your DailyPreciseEndTimedaily_precise_end_time setting. |
period | TimeSpantimedelta | The duration of the consolidation period. The first consolidation period of each day always starts at the beginning of the regular or extended market hours. |
dataTypedata_type | Typetype | The type of data the consolidator produces. In this case, use TradeBar. |
tickTypetick_type | TickType | The type of data to consolidate. In this case, use TickType.TradeTickType.TRADE. |
extendedMarketHoursextended_market_hours | bool | Whether or not to consolidate data from extended market hours. If Falsefalse, the consolidator ignores data that occurs while the market is closed and anchors bars to the regular market open. If Truetrue, it anchors bars to the start of the extended market hours. |
_consolidator = new MarketHourAwareConsolidator(true, TimeSpan.FromMinutes(7), typeof(TradeBar), TickType.Trade, false);
self._consolidator = MarketHourAwareConsolidator(True, timedelta(minutes=7), TradeBar, TickType.TRADE, False)
_consolidator.DataConsolidated += ConsolidationHandler;
self._consolidator.data_consolidated += self._consolidation_handler
LEAN passes consolidated bars to the consolidator event handler in your algorithm. The most common error when creating consolidators is to put parenthesis () at the end of your method name when setting the event handler of the consolidator. If you use parenthesis, the method executes and the result is passed as the event handler instead of the method itself. Remember to pass the name of your method to the event system. Specifically, it should be ConsolidationHandlerself._consolidation_handler, not ConsolidationHandler()self._consolidation_handler().
void ConsolidationHandler(object sender, TradeBar consolidatedBar)
{
} def _consolidation_handler(self, sender: object, consolidated_bar: TradeBar) -> None:
pass
When the consolidation period ends, LEAN passes the consolidated bar to the consolidation handler.
You can automatically or manually update the consolidator.
To automatically update a consolidator with data from the security subscription, call the AddConsolidatoradd_consolidator method of the Subscription Manager.
self.subscription_manager.add_consolidator(self._symbol, self._consolidator)
SubscriptionManager.AddConsolidator(_symbol, _consolidator);
Manual updates let you control when the consolidator updates and what data you use to update it. If you need to warm up a consolidator with data outside of the warm-up period, you can manually update the consolidator. To manually update a consolidator, call its Updateupdate method with a TradeBar object. You can update the consolidator with data from the Slice object in the OnDataon_data method or with data from a history request.
# Example 1: Update the consolidator with data from the Slice object
def on_data(self, slice: Slice) -> None:
trade_bar = slice.bars[self._symbol]
self._consolidator.update(trade_bar)
# Example 2: Update the consolidator with data from a history request
history = self.history[TradeBar](self._symbol, 30, Resolution.MINUTE)
for trade_bar in history:
self._consolidator.update(trade_bar) // Example 1: Update the consolidator with data from the Slice object
public override void OnData(Slice slice)
{
var tradeBar = slice.Bars[_symbol];
_consolidator.Update(tradeBar);
}
// Example 2: Update the consolidator with data from a history request
var history = History<TradeBar>(_symbol, 30, Resolution.Minute);
foreach (var tradeBar in history)
{
_consolidator.Update(tradeBar);
}
SubscriptionManager.RemoveConsolidator(_symbol, _consolidator);
self.subscription_manager.remove_consolidator(self._symbol, self._consolidator)
If you have a dynamic universe and don't remove consolidators, they compound internally, causing your algorithm to slow down and eventually die once it runs out of RAM. For an example of removing consolidators from universe subscriptions, see the GasAndCrudeOilEnergyCorrelationAlphaGasAndCrudeOilEnergyCorrelationAlpha in the LEAN GitHub repository.
Consolidate Quote Bars
QuoteBar consolidators aggregate QuoteBar objects into QuoteBar objects of the same size or larger. Follow these steps to create and manage a QuoteBar consolidator based on regular or extended market hours:
- Create the consolidator.
- Add an event handler to the consolidator.
- Define the consolidation handler.
- Update the consolidator.
- Automatic Updates
- Manual Updates
- If you create consolidators for securities in a dynamic universe and register them for automatic updates, remove the consolidator when the security leaves the universe.
To create a market hour aware consolidator, pass the following arguments to the MarketHourAwareConsolidator constructor:
| Argument | Data Type | Description |
|---|---|---|
dailyStrictEndTimeEnableddaily_strict_end_time_enabled | bool | Whether daily bars should have an EndTimeend_time that matches the market close time (Truetrue) instead of the following midnight (Falsefalse). This argument usually matches your DailyPreciseEndTimedaily_precise_end_time setting. |
period | TimeSpantimedelta | The duration of the consolidation period. The first consolidation period of each day always starts at the beginning of the regular or extended market hours. |
dataTypedata_type | Typetype | The type of data the consolidator produces. In this case, use QuoteBar. |
tickTypetick_type | TickType | The type of data to consolidate. In this case, use TickType.QuoteTickType.QUOTE. |
extendedMarketHoursextended_market_hours | bool | Whether or not to consolidate data from extended market hours. If Falsefalse, the consolidator ignores data that occurs while the market is closed and anchors bars to the regular market open. If Truetrue, it anchors bars to the start of the extended market hours. |
_consolidator = new MarketHourAwareConsolidator(true, TimeSpan.FromMinutes(7), typeof(QuoteBar), TickType.Quote, false);
self._consolidator = MarketHourAwareConsolidator(True, timedelta(minutes=7), QuoteBar, TickType.QUOTE, False)
_consolidator.DataConsolidated += ConsolidationHandler;
self._consolidator.data_consolidated += self._consolidation_handler
LEAN passes consolidated bars to the consolidator event handler in your algorithm. The most common error when creating consolidators is to put parenthesis () at the end of your method name when setting the event handler of the consolidator. If you use parenthesis, the method executes and the result is passed as the event handler instead of the method itself. Remember to pass the name of your method to the event system. Specifically, it should be ConsolidationHandlerself._consolidation_handler, not ConsolidationHandler()self._consolidation_handler().
void ConsolidationHandler(object sender, QuoteBar consolidatedBar)
{
} def _consolidation_handler(self, sender: object, consolidated_bar: QuoteBar) -> None:
pass
When the consolidation period ends, LEAN passes the consolidated bar to the consolidation handler.
You can automatically or manually update the consolidator.
To automatically update a consolidator with data from the security subscription, call the AddConsolidatoradd_consolidator method of the Subscription Manager.
self.subscription_manager.add_consolidator(self._symbol, self._consolidator)
SubscriptionManager.AddConsolidator(_symbol, _consolidator);
Manual updates let you control when the consolidator updates and what data you use to update it. If you need to warm up a consolidator with data outside of the warm-up period, you can manually update the consolidator. To manually update a consolidator, call its Updateupdate method with a QuoteBar object. You can update the consolidator with data from the Slice object in the OnDataon_data method or with data from a history request.
# Example 1: Update the consolidator with data from the Slice object
def on_data(self, slice: Slice) -> None:
quote_bar = slice.quote_bars[self._symbol]
self._consolidator.update(quote_bar)
# Example 2: Update the consolidator with data from a history request
history = self.history[QuoteBar](self._symbol, 30, Resolution.MINUTE)
for quote_bar in history:
self._consolidator.update(quote_bar) // Example 1: Update the consolidator with data from the Slice object
public override void OnData(Slice slice)
{
var quoteBar = slice.QuoteBars[_symbol];
_consolidator.Update(quoteBar);
}
// Example 2: Update the consolidator with data from a history request
var history = History<QuoteBar>(_symbol, 30, Resolution.Minute);
foreach (var quoteBar in history)
{
_consolidator.Update(quoteBar);
}
SubscriptionManager.RemoveConsolidator(_symbol, _consolidator);
self.subscription_manager.remove_consolidator(self._symbol, self._consolidator)
If you have a dynamic universe and don't remove consolidators, they compound internally, causing your algorithm to slow down and eventually die once it runs out of RAM. For an example of removing consolidators from universe subscriptions, see the GasAndCrudeOilEnergyCorrelationAlphaGasAndCrudeOilEnergyCorrelationAlpha in the LEAN GitHub repository.
Consolidate Trade Ticks
Tick consolidators aggregate Tick objects into TradeBar objects. Follow these steps to create and manage a Tick consolidator based on regular or extended market hours:
- Create the consolidator.
- Add an event handler to the consolidator.
- Define the consolidation handler.
- Update the consolidator.
- Automatic Updates
- Manual Updates
- If you create consolidators for securities in a dynamic universe and register them for automatic updates, remove the consolidator when the security leaves the universe.
To create a market hour aware consolidator, pass the following arguments to the MarketHourAwareConsolidator constructor:
| Argument | Data Type | Description |
|---|---|---|
dailyStrictEndTimeEnableddaily_strict_end_time_enabled | bool | Whether daily bars should have an EndTimeend_time that matches the market close time (Truetrue) instead of the following midnight (Falsefalse). This argument usually matches your DailyPreciseEndTimedaily_precise_end_time setting. |
period | TimeSpantimedelta | The duration of the consolidation period. The first consolidation period of each day always starts at the beginning of the regular or extended market hours. |
dataTypedata_type | Typetype | The type of data the consolidator produces. In this case, use Tick. |
tickTypetick_type | TickType | The type of data to consolidate. In this case, use TickType.TradeTickType.TRADE. |
extendedMarketHoursextended_market_hours | bool | Whether or not to consolidate data from extended market hours. If Falsefalse, the consolidator ignores data that occurs while the market is closed and anchors bars to the regular market open. If Truetrue, it anchors bars to the start of the extended market hours. |
_consolidator = new MarketHourAwareConsolidator(true, TimeSpan.FromMinutes(7), typeof(Tick), TickType.Trade, false);
self._consolidator = MarketHourAwareConsolidator(True, timedelta(minutes=7), Tick, TickType.TRADE, False)
_consolidator.DataConsolidated += ConsolidationHandler;
self._consolidator.data_consolidated += self._consolidation_handler
LEAN passes consolidated bars to the consolidator event handler in your algorithm. The most common error when creating consolidators is to put parenthesis () at the end of your method name when setting the event handler of the consolidator. If you use parenthesis, the method executes and the result is passed as the event handler instead of the method itself. Remember to pass the name of your method to the event system. Specifically, it should be ConsolidationHandlerself._consolidation_handler, not ConsolidationHandler()self._consolidation_handler().
void ConsolidationHandler(object sender, TradeBar consolidatedBar)
{
} def _consolidation_handler(self, sender: object, consolidated_bar: TradeBar) -> None:
pass
When the consolidation period ends, LEAN passes the consolidated bar to the consolidation handler.
You can automatically or manually update the consolidator.
To automatically update a consolidator with data from the security subscription, call the AddConsolidatoradd_consolidator method of the Subscription Manager.
self.subscription_manager.add_consolidator(self._symbol, self._consolidator)
SubscriptionManager.AddConsolidator(_symbol, _consolidator);
Manual updates let you control when the consolidator updates and what data you use to update it. If you need to warm up a consolidator with data outside of the warm-up period, you can manually update the consolidator. To manually update a consolidator, call its Updateupdate method with a Tick object. You can update the consolidator with data from the Slice object in the OnDataon_data method or with data from a history request.
# Example 1: Update the consolidator with data from the Slice object
def on_data(self, slice: Slice) -> None:
ticks = slice.ticks[self._symbol]
for tick in ticks:
self._consolidator.update(tick)
# Example 2: Update the consolidator with data from a history request
ticks = self.history[Tick](self._symbol, timedelta(minutes=3), Resolution.TICK)
for tick in ticks:
self._consolidator.update(tick) // Example 1: Update the consolidator with data from the Slice object
public override void OnData(Slice slice)
{
var ticks = slice.Ticks[_symbol];
foreach (var tick in ticks)
{
_consolidator.Update(tick);
}
}
// Example 2: Update the consolidator with data from a history request
var ticks = History<Tick>(_symbol, TimeSpan.FromMinutes(3), Resolution.Tick);
foreach (var tick in ticks)
{
_consolidator.Update(tick);
}
SubscriptionManager.RemoveConsolidator(_symbol, _consolidator);
self.subscription_manager.remove_consolidator(self._symbol, self._consolidator)
If you have a dynamic universe and don't remove consolidators, they compound internally, causing your algorithm to slow down and eventually die once it runs out of RAM. For an example of removing consolidators from universe subscriptions, see the GasAndCrudeOilEnergyCorrelationAlphaGasAndCrudeOilEnergyCorrelationAlpha in the LEAN GitHub repository.
Consolidate Quote Ticks
Tick quote bar consolidators aggregate Tick objects that represent quotes into QuoteBar objects. Follow these steps to create and manage a Tick quote bar consolidator based on regular or extended market hours:
- Create the consolidator.
- Add an event handler to the consolidator.
- Define the consolidation handler.
- Update the consolidator.
- Automatic Updates
- Manual Updates
- If you create consolidators for securities in a dynamic universe and register them for automatic updates, remove the consolidator when the security leaves the universe.
To create a market hour aware consolidator, pass the following arguments to the MarketHourAwareConsolidator constructor:
| Argument | Data Type | Description |
|---|---|---|
dailyStrictEndTimeEnableddaily_strict_end_time_enabled | bool | Whether daily bars should have an EndTimeend_time that matches the market close time (Truetrue) instead of the following midnight (Falsefalse). This argument usually matches your DailyPreciseEndTimedaily_precise_end_time setting. |
period | TimeSpantimedelta | The duration of the consolidation period. The first consolidation period of each day always starts at the beginning of the regular or extended market hours. |
dataTypedata_type | Typetype | The type of data the consolidator produces. In this case, use Tick. |
tickTypetick_type | TickType | The type of data to consolidate. In this case, use TickType.QuoteTickType.QUOTE. |
extendedMarketHoursextended_market_hours | bool | Whether or not to consolidate data from extended market hours. If Falsefalse, the consolidator ignores data that occurs while the market is closed and anchors bars to the regular market open. If Truetrue, it anchors bars to the start of the extended market hours. |
_consolidator = new MarketHourAwareConsolidator(true, TimeSpan.FromMinutes(7), typeof(Tick), TickType.Quote, false);
self._consolidator = MarketHourAwareConsolidator(True, timedelta(minutes=7), Tick, TickType.QUOTE, False)
_consolidator.DataConsolidated += ConsolidationHandler;
self._consolidator.data_consolidated += self._consolidation_handler
LEAN passes consolidated bars to the consolidator event handler in your algorithm. The most common error when creating consolidators is to put parenthesis () at the end of your method name when setting the event handler of the consolidator. If you use parenthesis, the method executes and the result is passed as the event handler instead of the method itself. Remember to pass the name of your method to the event system. Specifically, it should be ConsolidationHandlerself._consolidation_handler, not ConsolidationHandler()self._consolidation_handler().
void ConsolidationHandler(object sender, QuoteBar consolidatedBar)
{
} def _consolidation_handler(self, sender: object, consolidated_bar: QuoteBar) -> None:
pass
When the consolidation period ends, LEAN passes the consolidated bar to the consolidation handler.
You can automatically or manually update the consolidator.
To automatically update a consolidator with data from the security subscription, call the AddConsolidatoradd_consolidator method of the Subscription Manager.
self.subscription_manager.add_consolidator(self._symbol, self._consolidator)
SubscriptionManager.AddConsolidator(_symbol, _consolidator);
Manual updates let you control when the consolidator updates and what data you use to update it. If you need to warm up a consolidator with data outside of the warm-up period, you can manually update the consolidator. To manually update a consolidator, call its Updateupdate method with a Tick object. You can update the consolidator with data from the Slice object in the OnDataon_data method or with data from a history request.
# Example 1: Update the consolidator with data from the Slice object
def on_data(self, slice: Slice) -> None:
ticks = slice.ticks[self._symbol]
for tick in ticks:
self._consolidator.update(tick)
# Example 2: Update the consolidator with data from a history request
ticks = self.history[Tick](self._symbol, timedelta(minutes=3), Resolution.TICK)
for tick in ticks:
self._consolidator.update(tick) // Example 1: Update the consolidator with data from the Slice object
public override void OnData(Slice slice)
{
var ticks = slice.Ticks[_symbol];
foreach (var tick in ticks)
{
_consolidator.Update(tick);
}
}
// Example 2: Update the consolidator with data from a history request
var ticks = History<Tick>(_symbol, TimeSpan.FromMinutes(3), Resolution.Tick);
foreach (var tick in ticks)
{
_consolidator.Update(tick);
}
SubscriptionManager.RemoveConsolidator(_symbol, _consolidator);
self.subscription_manager.remove_consolidator(self._symbol, self._consolidator)
If you have a dynamic universe and don't remove consolidators, they compound internally, causing your algorithm to slow down and eventually die once it runs out of RAM. For an example of removing consolidators from universe subscriptions, see the GasAndCrudeOilEnergyCorrelationAlphaGasAndCrudeOilEnergyCorrelationAlpha in the LEAN GitHub repository.
Reset Consolidators
To reset a consolidator, call its Resetreset method.
self._consolidator.reset()
_consolidator.Reset();
If you are live trading Equities or backtesting Equities without the adjusted data normalization mode,
reset your consolidators when splits and dividends occur.
When a split or dividend occurs while the consolidator is in the process of building a bar, the open, high, and low may reflect prices from before the split or dividend.
To avoid issues, call the consolidator's Resetreset method and then warm it up with ScaledRawSCALED_RAW data from a history request.
def on_data(self, data: Slice):
# When a split or dividend occurs...
if (data.splits.contains_key(self._symbol) and data.splits[self._symbol].type == SplitType.SPLIT_OCCURRED or
data.dividends.contains_key(self._symbol)):
# If the consolidator is working on a bar...
if self._consolidator.working_data:
# Get adjusted prices for the time period of the working bar.
history = self.history[TradeBar](self._symbol, self._consolidator.working_data.time, self.time, data_normalization_mode=DataNormalizationMode.SCALED_RAW)
# Reset the consolidator.
self._consolidator.reset()
# Warm-up the consolidator with the adjusted price data.
for bar in history:
self._consolidator.update(bar) public override void OnData(Slice data)
{
// When a split or dividend occurs...
if ((data.Splits.ContainsKey(_symbol) && data.Splits[_symbol].Type == SplitType.SplitOccurred) ||
data.Dividends.ContainsKey(_symbol))
{
// If the consolidator is working on a bar...
if (_consolidator.WorkingData != null)
{
// Get adjusted prices for the time period of the working bar.
var history = History<TradeBar>(_symbol, _consolidator.WorkingData.Time, Time, dataNormalizationMode: DataNormalizationMode.ScaledRaw);
// Reset the consolidator.
_consolidator.Reset();
// Warm-up the consolidator with the adjusted price data.
foreach (var bar in history)
{
_consolidator.Update(bar);
}
}
}
}
Examples
The following examples demonstrate some common practices for market hour aware consolidators.
Example 1: Intraday Bars Anchored to the Market Open
The following algorithm creates a MarketHourAwareConsolidator with a 7-minute period and registers it for automatic updates.
The consolidator anchors intraday bars to the market open, so the first 7-minute bar of each day arrives 7 minutes after the market opens.
public class MarketHourAwareConsolidatorExampleAlgorithm : QCAlgorithm
{
private MarketHourAwareConsolidator _consolidator;
public override void Initialize()
{
SetStartDate(2024, 9, 1);
SetEndDate(2024, 12, 31);
var symbol = AddEquity("SPY", Resolution.Minute).Symbol;
// Create a market hour aware consolidator that builds 7-minute TradeBar objects anchored to the market open.
_consolidator = new MarketHourAwareConsolidator(true, TimeSpan.FromMinutes(7), typeof(TradeBar), TickType.Trade, false);
// Attach a consolidation handler. The DataConsolidated event provides the bar as an
// IBaseData object, so cast it to a TradeBar.
_consolidator.DataConsolidated += (sender, consolidated) => OnBar(sender, (TradeBar)consolidated);
// Register the consolidator for automatic updates.
SubscriptionManager.AddConsolidator(symbol, _consolidator);
}
private void OnBar(object sender, TradeBar bar)
{
// Plot the close of each 7-minute bar built from regular trading hours only.
Plot("SPY", "7-Minute Close", bar.Close);
}
} class MarketHourAwareConsolidatorExampleAlgorithm(QCAlgorithm):
def initialize(self) -> None:
self.set_start_date(2024, 9, 1)
self.set_end_date(2024, 12, 31)
self._symbol = self.add_equity("SPY", Resolution.MINUTE).symbol
# Create a market hour aware consolidator that builds 7-minute TradeBar objects anchored to the market open.
self._consolidator = MarketHourAwareConsolidator(True, timedelta(minutes=7), TradeBar, TickType.TRADE, False)
# Attach a consolidation handler.
self._consolidator.data_consolidated += self._on_bar
# Register the consolidator for automatic updates.
self.subscription_manager.add_consolidator(self._symbol, self._consolidator)
def _on_bar(self, sender: object, bar: TradeBar) -> None:
# Plot the close of each 7-minute bar built from regular trading hours only.
self.plot("SPY", "7-Minute Close", bar.close)