Consolidator Types
Time Period Consolidators
Introduction
Time period consolidators aggregate data based on a period of time like a number of seconds, minutes, or days. If a time period consolidator aggregates data over multiple days, the multi-day aggregation cycle starts and repeats based on the time stamp of the first data point you use to update the consolidator.
Consolidate Trade Bars
TradeBar
consolidators aggregate TradeBar
objects into TradeBar
objects of the same size or larger. Follow these steps to create and manage a TradeBar
consolidator based on a period of time:
- Create the consolidator.
timedelta
PeriodsTimeSpan
PeriodsResolution
Periods- Add an event handler to the consolidator.
- Define the consolidation handler.
- Update the consolidator.
- Automatic Updates
- Manual Updates
- If you create consolidators for securities in a dynamic universe and register them for automatic updates, remove the consolidator when the security leaves the universe.
To set the time period for the consolidator, you can use either a timedelta
TimeSpan
or Resolution
object. The consolidator time period must be greater than or equal to the resolution of the security subscription. For instance, you can aggregate minute bars into 10-minute bars, but you can't aggregate hour bars into 10-minute bars.
_consolidator = new TradeBarConsolidator(TimeSpan.FromDays(1)); // Aliases: // _consolidator = CreateConsolidator(TimeSpan.FromDays(1), typeof(TradeBar)); // _consolidator = ResolveConsolidator(_symbol, TimeSpan.FromDays(1));
self._consolidator = TradeBarConsolidator(timedelta(days=1)) # Aliases: # self._consolidator = self.create_consolidator(timedelta(days=1), TradeBar) # self._consolidator = self.resolve_consolidator(self._symbol, timedelta(days=1))
The time period is relative to the data time zone, not the algorithm time zone. If you consolidate Crypto data into daily bars, the event handler receives the consolidated bars at midnight 12:00 AM Coordinated Universal Time (UTC), regardless of the algorithm time zone.
The Resolution
enumeration has the following members:
_consolidator = TradeBarConsolidator.FromResolution(Resolution.Daily); // Alias: // _consolidator = ResolveConsolidator(_symbol, Resolution.Daily);
self._consolidator = TradeBarConsolidator.from_resolution(Resolution.DAILY) # Alias: # self._consolidator = self.resolve_consolidator(self._symbol, Resolution.DAILY)
If the security subscription in your algorithm provides TradeBar
and QuoteBar
data, ResolveConsolidator
resolve_consolidator
returns a TradeBarConsolidator
.
If you consolidate on an hourly basis, the consolidator ends at the top of the hour, not every hour after the market open. For US Equities, that's 10 AM Eastern Time (ET), not 10:30 AM ET.
_consolidator.DataConsolidated += ConsolidationHandler;
self._consolidator.data_consolidated += self._consolidation_handler
LEAN passes consolidated bars to the consolidator event handler in your algorithm. The most common error when creating consolidators is to put parenthesis ()
at the end of your method name when setting the event handler of the consolidator. If you use parenthesis, the method executes and the result is passed as the event handler instead of the method itself. Remember to pass the name of your method to the event system. Specifically, it should be ConsolidationHandler
self._consolidation_handler
, not ConsolidationHandler()
self._consolidation_handler()
.
void ConsolidationHandler(object sender, TradeBar consolidatedBar) { }
def _consolidation_handler(self, sender: object, consolidated_bar: TradeBar) -> None: pass
The consolidation event handler receives bars when the consolidated bar closes based on the data time zone. If you subscribe to minute resolution data for Bitcoin and create an hourly consolidator, you receive consolidated bars at the top of each hour. However, if you subscribe to minute resolution data for the regular trading hours of US Equities and create a daily consolidator, you receive consolidated bars at market close or midnight, depending on your DailyPreciseEndTime
daily_precise_end_time
setting.
You can automatically or manually update the consolidator.
To automatically update a consolidator with data from the security subscription, call the AddConsolidator
add_consolidator
method of the Subscription Manager.
self.subscription_manager.add_consolidator(self._symbol, self._consolidator)
SubscriptionManager.AddConsolidator(_symbol, _consolidator);
Manual updates let you control when the consolidator updates and what data you use to update it. If you need to warm up a consolidator with data outside of the warm-up period, you can manually update the consolidator. To manually update a consolidator, call its Update
update
method with a TradeBar
object. You can update the consolidator with data from the Slice object in the OnData
on_data
method or with data from a history request.
# Example 1: Update the consolidator with data from the Slice object def on_data(self, slice: Slice) -> None: trade_bar = slice.bars[self._symbol] self._consolidator.update(trade_bar) # Example 2: Update the consolidator with data from a history request history = self.history[TradeBar](self._symbol, 30, Resolution.MINUTE) for trade_bar in history: self._consolidator.update(trade_bar)
// Example 1: Update the consolidator with data from the Slice object public override void OnData(Slice slice) { var tradeBar = slice.Bars[_symbol]; _consolidator.Update(tradeBar); } // Example 2: Update the consolidator with data from a history request var history = History<TradeBar>(_symbol, 30, Resolution.Minute); foreach (var tradeBar in history) { _consolidator.Update(tradeBar); }
SubscriptionManager.RemoveConsolidator(_symbol, _consolidator);
self.subscription_manager.remove_consolidator(self._symbol, self._consolidator)
If you have a dynamic universe and don't remove consolidators, they compound internally, causing your algorithm to slow down and eventually die once it runs out of RAM. For an example of removing consolidators from universe subscriptions, see the GasAndCrudeOilEnergyCorrelationAlphaGasAndCrudeOilEnergyCorrelationAlpha in the LEAN GitHub repository.
You can also use the Consolidate
helper method to create period consolidators and register them for automatic updates. With just one line of code, you can create data in any time period based on a timedelta
TimeSpan
or Resolution
object:
TimeSpan
Periodstimedelta
PeriodsResolution
Periods
_consolidator = Consolidate(_symbol, TimeSpan.FromDays(1), ConsolidationHandler);
self._consolidator = self.consolidate(self._symbol, timedelta(days=1), self._consolidation_handler)
_consolidator = Consolidate(_symbol, Resolution.Daily, ConsolidationHandler);
self._consolidator = self.consolidate(self._symbol, Resolution.DAILY, self._consolidation_handler)
If you use the Consolidate
consolidate
helper method, the consolidation handler doesn't receive an object
argument.
void ConsolidationHandler(TradeBar consolidatedBar) { }
def _consolidation_handler(self, consolidated_bar: TradeBar) -> None: pass
Consolidate Quote Bars
QuoteBar
consolidators aggregate QuoteBar
objects into QuoteBar
objects of the same size or larger. Follow these steps to create and manage a QuoteBar
consolidator based on a period of time:
- Create the consolidator.
timedelta
PeriodsTimeSpan
PeriodsResolution
Periods- Add an event handler to the consolidator.
- Define the consolidation handler.
- Update the consolidator.
- Automatic Updates
- Manual Updates
- If you create consolidators for securities in a dynamic universe and register them for automatic updates, remove the consolidator when the security leaves the universe.
To set the time period for the consolidator, you can use either a timedelta
TimeSpan
or Resolution
object. The consolidator time period must be greater than or equal to the resolution of the security subscription. For instance, you can aggregate minute bars into 10-minute bars, but you can't aggregate hour bars into 10-minute bars.
_consolidator = new QuoteBarConsolidator(TimeSpan.FromDays(1)); // Aliases: // _consolidator = CreateConsolidator(TimeSpan.FromDays(1), typeof(QuoteBar)); // _consolidator = ResolveConsolidator(_symbol, TimeSpan.FromDays(1));
self._consolidator = QuoteBarConsolidator(timedelta(days=1)) # Aliases: # self._consolidator = self.create_consolidator(timedelta(days=1), QuoteBar) # self._consolidator = self.resolve_consolidator(self._symbol, timedelta(days=1))
The time period is relative to the data time zone, not the algorithm time zone. If you consolidate Crypto data into daily bars, the event handler receives the consolidated bars at midnight 12:00 AM Coordinated Universal Time (UTC), regardless of the algorithm time zone.
The Resolution
enumeration has the following members:
_consolidator = ResolveConsolidator(_symbol, Resolution.Daily);
self._consolidator = self.resolve_consolidator(self._symbol, Resolution.DAILY)
If the security subscription in your algorithm provides TradeBar
and QuoteBar
data, ResolveConsolidator
resolve_consolidator
returns a TradeBarConsolidator
.
If you consolidate on an hourly basis, the consolidator ends at the top of the hour, not every hour after the market open. For US Equities, that's 10 AM Eastern Time (ET), not 10:30 AM ET.
_consolidator.DataConsolidated += ConsolidationHandler;
self._consolidator.data_consolidated += self._consolidation_handler
LEAN passes consolidated bars to the consolidator event handler in your algorithm. The most common error when creating consolidators is to put parenthesis ()
at the end of your method name when setting the event handler of the consolidator. If you use parenthesis, the method executes and the result is passed as the event handler instead of the method itself. Remember to pass the name of your method to the event system. Specifically, it should be ConsolidationHandler
self._consolidation_handler
, not ConsolidationHandler()
self._consolidation_handler()
.
void ConsolidationHandler(object sender, QuoteBar consolidatedBar) { }
def _consolidation_handler(self, sender: object, consolidated_bar: QuoteBar) -> None: pass
The consolidation event handler receives bars when the consolidated bar closes based on the data time zone. If you subscribe to minute resolution data for Bitcoin and create an hourly consolidator, you receive consolidated bars at the top of each hour. However, if you subscribe to minute resolution data for the regular trading hours of US Equities and create a daily consolidator, you receive consolidated bars at market close or midnight, depending on your DailyPreciseEndTime
daily_precise_end_time
setting.
You can automatically or manually update the consolidator.
To automatically update a consolidator with data from the security subscription, call the AddConsolidator
add_consolidator
method of the Subscription Manager.
self.subscription_manager.add_consolidator(self._symbol, self._consolidator)
SubscriptionManager.AddConsolidator(_symbol, _consolidator);
Manual updates let you control when the consolidator updates and what data you use to update it. If you need to warm up a consolidator with data outside of the warm-up period, you can manually update the consolidator. To manually update a consolidator, call its Update
update
method with a QuoteBar
object. You can update the consolidator with data from the Slice object in the OnData
on_data
method or with data from a history request.
# Example 1: Update the consolidator with data from the Slice object def on_data(self, slice: Slice) -> None: quote_bar = slice.quote_bars[self._symbol] self._consolidator.update(quote_bar) # Example 2: Update the consolidator with data from a history request history = self.history[QuoteBar](self._symbol, 30, Resolution.MINUTE) for quote_bar in history: self._consolidator.update(quote_bar)
// Example 1: Update the consolidator with data from the Slice object public override void OnData(Slice slice) { var quoteBar = slice.QuoteBars[_symbol]; _consolidator.Update(quoteBar); } // Example 2: Update the consolidator with data from a history request var history = History<QuoteBar>(_symbol, 30, Resolution.Minute); foreach (var quoteBar in history) { _consolidator.Update(quoteBar); }
SubscriptionManager.RemoveConsolidator(_symbol, _consolidator);
self.subscription_manager.remove_consolidator(self._symbol, self._consolidator)
If you have a dynamic universe and don't remove consolidators, they compound internally, causing your algorithm to slow down and eventually die once it runs out of RAM. For an example of removing consolidators from universe subscriptions, see the GasAndCrudeOilEnergyCorrelationAlphaGasAndCrudeOilEnergyCorrelationAlpha in the LEAN GitHub repository.
You can also use the Consolidate
helper method to create period consolidators and register them for automatic updates. With just one line of code, you can create data in any time period based on a timedelta
TimeSpan
or Resolution
object:
TimeSpan
Periodstimedelta
PeriodsResolution
Periods
_consolidator = Consolidate<QuoteBar>(_symbol, TimeSpan.FromDays(1), TickType.Quote, ConsolidationHandler);
self._consolidator = self.consolidate(self._symbol, timedelta(days=1), TickType.Quote, self._consolidation_handler)
_consolidator = Consolidate<QuoteBar>(_symbol, Resolution.Daily, TickType.Quote, ConsolidationHandler);
self._consolidator = self.consolidate(self._symbol, Resolution.DAILY, TickType.Quote, self._consolidation_handler)
If you use the Consolidate
consolidate
helper method, the consolidation handler doesn't receive an object
argument.
void ConsolidationHandler(QuoteBar consolidatedBar) { }
def _consolidation_handler(self, consolidated_bar: QuoteBar) -> None: pass
Consolidate Trade Ticks
Tick
consolidators aggregate Tick
objects into TradeBar
objects. Follow these steps to create and manage a Tick
consolidator based on a period of time:
- Create the consolidator.
timedelta
PeriodsTimeSpan
PeriodsResolution
Periods- Add an event handler to the consolidator.
- Define the consolidation handler.
- Update the consolidator.
- Automatic Updates
- Manual Updates
- If you create consolidators for securities in a dynamic universe and register them for automatic updates, remove the consolidator when the security leaves the universe.
To set the time period for the consolidator, you can use either a timedelta
TimeSpan
or Resolution
object. The consolidator time period must be greater than or equal to the resolution of the security subscription. For instance, you can aggregate minute bars into 10-minute bars, but you can't aggregate hour bars into 10-minute bars.
_consolidator = new TickConsolidator(TimeSpan.FromMilliseconds(100)); // Aliases: // _consolidator = CreateConsolidator(TimeSpan.FromMilliseconds(100), typeof(Tick)); // _consolidator = ResolveConsolidator(_symbol, TimeSpan.FromMilliseconds(100));
self._consolidator = TickConsolidator(timedelta(milliseconds=100)) # Aliases: # self._consolidator = self.create_consolidator(timedelta(milliseconds=100), Tick) # self._consolidator = self.resolve_consolidator(self._symbol, timedelta(milliseconds=100))
The time period is relative to the data time zone, not the algorithm time zone. If you consolidate Crypto data into daily bars, the event handler receives the consolidated bars at midnight 12:00 AM Coordinated Universal Time (UTC), regardless of the algorithm time zone.
The Resolution
enumeration has the following members:
_consolidator = ResolveConsolidator(_symbol, Resolution.Second);
self._consolidator = self.resolve_consolidator(self._symbol, Resolution.SECOND)
If you consolidate on an hourly basis, the consolidator ends at the top of the hour, not every hour after the market open. For US Equities, that's 10 AM Eastern Time (ET), not 10:30 AM ET.
_consolidator.DataConsolidated += ConsolidationHandler;
self._consolidator.data_consolidated += self._consolidation_handler
LEAN passes consolidated bars to the consolidator event handler in your algorithm. The most common error when creating consolidators is to put parenthesis ()
at the end of your method name when setting the event handler of the consolidator. If you use parenthesis, the method executes and the result is passed as the event handler instead of the method itself. Remember to pass the name of your method to the event system. Specifically, it should be ConsolidationHandler
self._consolidation_handler
, not ConsolidationHandler()
self._consolidation_handler()
.
void ConsolidationHandler(object sender, TradeBar consolidatedBar) { }
def _consolidation_handler(self, sender: object, consolidated_bar: TradeBar) -> None: pass
The consolidation event handler receives bars when the consolidated bar closes based on the data time zone. If you subscribe to tick resolution data for Bitcoin and create an hourly consolidator, you receive consolidated bars at the top of each hour. However, if you subscribe to tick resolution data for the regular trading hours of US Equities and create a daily consolidator, you receive consolidated bars at market close or midnight, depending on your DailyPreciseEndTime
daily_precise_end_time
setting.
You can automatically or manually update the consolidator.
To automatically update a consolidator with data from the security subscription, call the AddConsolidator
add_consolidator
method of the Subscription Manager.
self.subscription_manager.add_consolidator(self._symbol, self._consolidator)
SubscriptionManager.AddConsolidator(_symbol, _consolidator);
Manual updates let you control when the consolidator updates and what data you use to update it. If you need to warm up a consolidator with data outside of the warm-up period, you can manually update the consolidator. To manually update a consolidator, call its Update
update
method with a Tick
object. You can update the consolidator with data from the Slice object in the OnData
on_data
method or with data from a history request.
# Example 1: Update the consolidator with data from the Slice object def on_data(self, slice: Slice) -> None: ticks = slice.ticks[self._symbol] for tick in ticks: self._consolidator.update(tick) # Example 2: Update the consolidator with data from a history request ticks = self.history[Tick](self._symbol, timedelta(minutes=3), Resolution.TICK) for tick in ticks: self._consolidator.update(tick)
// Example 1: Update the consolidator with data from the Slice object public override void OnData(Slice slice) { var ticks = slice.Ticks[_symbol]; foreach (var tick in ticks) { _consolidator.Update(tick); } } // Example 2: Update the consolidator with data from a history request var ticks = History<Tick>(_symbol, TimeSpan.FromMinutes(3), Resolution.Tick); foreach (var tick in ticks) { _consolidator.Update(tick); }
SubscriptionManager.RemoveConsolidator(_symbol, _consolidator);
self.subscription_manager.remove_consolidator(self._symbol, self._consolidator)
If you have a dynamic universe and don't remove consolidators, they compound internally, causing your algorithm to slow down and eventually die once it runs out of RAM. For an example of removing consolidators from universe subscriptions, see the GasAndCrudeOilEnergyCorrelationAlphaGasAndCrudeOilEnergyCorrelationAlpha in the LEAN GitHub repository.
You can also use the Consolidate
helper method to create period consolidators and register them for automatic updates. With just one line of code, you can create data in any time period based on a timedelta
TimeSpan
or Resolution
object:
TimeSpan
Periodstimedelta
PeriodsResolution
Periods
_consolidator = Consolidate(_symbol, TimeSpan.FromMilliseconds(100), ConsolidationHandler);
self._consolidator = self.consolidate(self._symbol, timedelta(milliseconds=100), self._consolidation_handler)
_consolidator = Consolidate(_symbol, Resolution.Second, ConsolidationHandler);
self._consolidator = self.consolidate(self._symbol, Resolution.SECOND, self._consolidation_handler)
If you use the Consolidate
consolidate
helper method, the consolidation handler doesn't receive an object
argument.
void ConsolidationHandler(TradeBar consolidatedBar) { }
def _consolidation_handler(self, consolidated_bar: TradeBar) -> None: pass
Consolidate Quote Ticks
Tick
quote bar consolidators aggregate Tick
objects that represent quotes into QuoteBar
objects. Follow these steps to create and manage a Tick
quote bar consolidator based on a period of time:
- Create the consolidator.
timedelta
PeriodsTimeSpan
PeriodsResolution
Periods- Add an event handler to the consolidator.
- Define the consolidation handler.
- Update the consolidator.
- Automatic Updates
- Manual Updates
- If you create consolidators for securities in a dynamic universe and register them for automatic updates, remove the consolidator when the security leaves the universe.
To set the time period for the consolidator, you can use either a timedelta
TimeSpan
or Resolution
object. The consolidator time period must be greater than or equal to the resolution of the security subscription. For instance, you can aggregate minute bars into 10-minute bars, but you can't aggregate hour bars into 10-minute bars.
_consolidator = new TickQuoteBarConsolidator(TimeSpan.FromMilliseconds(100)); // Aliases: // _consolidator = CreateConsolidator(TimeSpan.FromMilliseconds(100), typeof(Tick), TickType.Quote); // _consolidator = ResolveConsolidator(_symbol, TimeSpan.FromMilliseconds(100), typeof(QuoteBar));
self._consolidator = TickQuoteBarConsolidator(timedelta(milliseconds=100)) # Aliases: # self._consolidator = self.create_consolidator(timedelta(milliseconds=100), Tick, TickType.QUOTE) # self._consolidator = self.resolve_consolidator(self._symbol, timedelta(milliseconds=100))
The time period is relative to the data time zone, not the algorithm time zone. If you consolidate Crypto data into daily bars, the event handler receives the consolidated bars at midnight 12:00 AM Coordinated Universal Time (UTC), regardless of the algorithm time zone.
The Resolution
enumeration has the following members:
_consolidator = ResolveConsolidator(_symbol, Resolution.Second);
self._consolidator = self.resolve_consolidator(self._symbol, Resolution.SECOND)
If you consolidate on an hourly basis, the consolidator ends at the top of the hour, not every hour after the market open. For US Equities, that's 10 AM Eastern Time (ET), not 10:30 AM ET.
_consolidator.DataConsolidated += ConsolidationHandler;
self._consolidator.data_consolidated += self._consolidation_handler
LEAN passes consolidated bars to the consolidator event handler in your algorithm. The most common error when creating consolidators is to put parenthesis ()
at the end of your method name when setting the event handler of the consolidator. If you use parenthesis, the method executes and the result is passed as the event handler instead of the method itself. Remember to pass the name of your method to the event system. Specifically, it should be ConsolidationHandler
self._consolidation_handler
, not ConsolidationHandler()
self._consolidation_handler()
.
void ConsolidationHandler(object sender, QuoteBar consolidatedBar) { }
def _consolidation_handler(self, sender: object, consolidated_bar: QuoteBar) -> None: pass
The consolidation event handler receives bars when the consolidated bar closes based on the data time zone. If you subscribe to tick resolution data for Bitcoin and create an hourly consolidator, you receive consolidated bars at the top of each hour. However, if you subscribe to tick resolution data for the regular trading hours of US Equities and create a daily consolidator, you receive consolidated bars at market close or midnight, depending on your DailyPreciseEndTime
daily_precise_end_time
setting.
You can automatically or manually update the consolidator.
To automatically update a consolidator with data from the security subscription, call the AddConsolidator
add_consolidator
method of the Subscription Manager.
self.subscription_manager.add_consolidator(self._symbol, self._consolidator)
SubscriptionManager.AddConsolidator(_symbol, _consolidator);
Manual updates let you control when the consolidator updates and what data you use to update it. If you need to warm up a consolidator with data outside of the warm-up period, you can manually update the consolidator. To manually update a consolidator, call its Update
update
method with a Tick
object. You can update the consolidator with data from the Slice object in the OnData
on_data
method or with data from a history request.
# Example 1: Update the consolidator with data from the Slice object def on_data(self, slice: Slice) -> None: ticks = slice.ticks[self._symbol] for tick in ticks: self._consolidator.update(tick) # Example 2: Update the consolidator with data from a history request ticks = self.history[Tick](self._symbol, timedelta(minutes=3), Resolution.TICK) for tick in ticks: self._consolidator.update(tick)
// Example 1: Update the consolidator with data from the Slice object public override void OnData(Slice slice) { var ticks = slice.Ticks[_symbol]; foreach (var tick in ticks) { _consolidator.Update(tick); } } // Example 2: Update the consolidator with data from a history request var ticks = History<Tick>(_symbol, TimeSpan.FromMinutes(3), Resolution.Tick); foreach (var tick in ticks) { _consolidator.Update(tick); }
SubscriptionManager.RemoveConsolidator(_symbol, _consolidator);
self.subscription_manager.remove_consolidator(self._symbol, self._consolidator)
If you have a dynamic universe and don't remove consolidators, they compound internally, causing your algorithm to slow down and eventually die once it runs out of RAM. For an example of removing consolidators from universe subscriptions, see the GasAndCrudeOilEnergyCorrelationAlphaGasAndCrudeOilEnergyCorrelationAlpha in the LEAN GitHub repository.
You can also use the Consolidate
helper method to create period consolidators and register them for automatic updates. With just one line of code, you can create data in any time period based on a timedelta
TimeSpan
or Resolution
object:
TimeSpan
Periodstimedelta
PeriodsResolution
Periods
_consolidator = Consolidate<QuoteBar>(_symbol, TimeSpan.FromMilliseconds(100), TickType.Quote, ConsolidationHandler);
self._consolidator = self.consolidate(self._symbol, timedelta(milliseconds=100), TickType.Quote, self._consolidation_handler)
_consolidator = Consolidate<QuoteBar>(_symbol, Resolution.Second, TickType.Quote, ConsolidationHandler);
self._consolidator = self.consolidate(self._symbol, Resolution.SECOND, TickType.Quote, self._consolidation_handler)
If you use the Consolidate
consolidate
helper method, the consolidation handler doesn't receive an object
argument.
void ConsolidationHandler(QuoteBar consolidatedBar) { }
def _consolidation_handler(self, consolidated_bar: QuoteBar) -> None: pass
Consolidate Other Data
<dataType>
consolidators aggregate various types of data objects into <dataType>
objects of the same size or larger. If you consolidate custom data or alternative datasets, check the definition of the data class to see its data type. Follow these steps to create and manage a <dataType>
consolidator based on a period of time:
- Create the consolidator.
timedelta
PeriodsTimeSpan
PeriodsResolution
Periods- Add an event handler to the consolidator.
- Define the consolidation handler.
- Update the consolidator.
- Automatic Updates
- Manual Updates
- If you create consolidators for securities in a dynamic universe and register them for automatic updates, remove the consolidator when the security leaves the universe.
To set the time period for the consolidator, you can use either a timedelta
TimeSpan
or Resolution
object. The consolidator time period must be greater than or equal to the resolution of the security subscription. For instance, you can aggregate minute bars into 10-minute bars, but you can't aggregate hour bars into 10-minute bars.
_consolidator = new DynamicDataConsolidator(TimeSpan.FromDays(1)); // Aliases: // _consolidator = CreateConsolidator(TimeSpan.FromDays(1), typeof(<dataType>)); // _consolidator = ResolveConsolidator(_symbol, TimeSpan.FromDays(1));
self._consolidator = DynamicDataConsolidator(timedelta(days=1)) # Aliases: # self._consolidator = self.create_consolidator(timedelta(days=1), <dataType>) # self._consolidator = self.resolve_consolidator(self._symbol, timedelta(days=1))
The time period is relative to the data time zone, not the algorithm time zone. If you consolidate Crypto data into daily bars, the event handler receives the consolidated bars at midnight 12:00 AM Coordinated Universal Time (UTC), regardless of the algorithm time zone.
The Resolution
enumeration has the following members:
_consolidator = ResolveConsolidator(_symbol, Resolution.Daily);
self._consolidator = self.resolve_consolidator(self._symbol, Resolution.DAILY)
If you consolidate on an hourly basis, the consolidator ends at the top of the hour, not every hour after the market open. For US Equities, that's 10 AM Eastern Time (ET), not 10:30 AM ET.
_consolidator.DataConsolidated += ConsolidationHandler;
self._consolidator.data_consolidated += self._consolidation_handler
LEAN passes consolidated bars to the consolidator event handler in your algorithm. The most common error when creating consolidators is to put parenthesis ()
at the end of your method name when setting the event handler of the consolidator. If you use parenthesis, the method executes and the result is passed as the event handler instead of the method itself. Remember to pass the name of your method to the event system. Specifically, it should be ConsolidationHandler
self._consolidation_handler
, not ConsolidationHandler()
self._consolidation_handler()
.
void ConsolidationHandler(object sender, <dataType> consolidatedBar) { }
def _consolidation_handler(self, sender: object, consolidated_bar: <dataType>) -> None: pass
The consolidation event handler receives bars when the consolidated bar closes based on the data time zone.
You can automatically or manually update the consolidator.
To automatically update a consolidator with data from the data subscription, call the AddConsolidator
add_consolidator
method of the Subscription Manager.
self.subscription_manager.add_consolidator(self._symbol, self._consolidator)
SubscriptionManager.AddConsolidator(_symbol, _consolidator);
Manual updates let you control when the consolidator updates and what data you use to update it. If you need to warm up a consolidator with data outside of the warm-up period, you can manually update the consolidator. To manually update a consolidator, call its Update
update
method with a <dataType>
object. You can update the consolidator with data from the Slice object in the OnData
on_data
method or with data from a history request.
# Example 1: Update the consolidator with data from the Slice object def on_data(self, slice: Slice) -> None: if self._symbol in slice: self._consolidator.update(slice[self._symbol]) # Example 2: Update the consolidator with data from a history request history = self.history[<dataType>](self._symbol, 30, Resolution.DAILY) for data in history: self._consolidator.update(data)
// Example 1: Update the consolidator with data from the Slice object public override void OnData(Slice slice) { if slice.ContainsKey(_symbol) { _consolidator.Update(slice[_symbol]); } } // Example 2: Update the consolidator with data from a history request var history = History<<dataType>>(_symbol, 30, Resolution.Daily); foreach (var data in history) { _consolidator.Update(data); }
SubscriptionManager.RemoveConsolidator(_symbol, _consolidator);
self.subscription_manager.remove_consolidator(self._symbol, self._consolidator)
If you have a dynamic universe and don't remove consolidators, they compound internally, causing your algorithm to slow down and eventually die once it runs out of RAM. For an example of removing consolidators from universe subscriptions, see the GasAndCrudeOilEnergyCorrelationAlphaGasAndCrudeOilEnergyCorrelationAlpha in the LEAN GitHub repository.
You can also use the Consolidate
helper method to create period consolidators and register them for automatic updates. With just one line of code, you can create data in any time period based on a timedelta
TimeSpan
or Resolution
object:
TimeSpan
Periodstimedelta
PeriodsResolution
Periods
_consolidator = Consolidate(_symbol, TimeSpan.FromDays(1), ConsolidationHandler);
self._consolidator = self.consolidate(self._symbol, timedelta(days=1), self._consolidation_handler)
_consolidator = Consolidate(_symbol, Resolution.Daily, ConsolidationHandler);
self._consolidator = self.consolidate(self._symbol, Resolution.DAILY, self._consolidation_handler)
If you use the Consolidate
consolidate
helper method, the consolidation handler doesn't receive an object
argument.
void ConsolidationHandler(<dataType> consolidatedBar) { }
def _consolidation_handler(self, consolidated_bar: <dataType>) -> None: pass
Examples
The following examples demonstrate common practices for implementing time-period consolidators.
Example 1: RSI-MACD Trading
The following algorithm utilizes a 5-minute trade bar consolidator to feed the smoothed trade bar to an RSI indicator and a 15-minute quote bar consolidator to feed the smoothed quote bar to a MACD indicator, generating trade entry signals during an uptrend and oversold conditions while exiting when the trend turns down.
public class TimePeriodConsolidatorAlgorithm : QCAlgorithm { private Symbol _spy; // Create RSI and MACD indicators for trade signal generation. private RelativeStrengthIndex _rsi = new(14); private MovingAverageConvergenceDivergence _macd = new(12, 26, 9); public override void Initialize() { SetStartDate(2021, 1, 1); SetEndDate(2021, 2, 1); // Request SPY data to feed to indicators and trade. _spy = AddEquity("SPY").Symbol; // Create a 5-minute trade bar consolidator to feed the smoothed bar to the RSI indicator for the price action entry signal. var rsiConsolidator = new TradeBarConsolidator(TimeSpan.FromMinutes(5)); // Create a 15-minute quote bar consolidator to feed the smoothed bar to the MACD consolidator for trend entry/exit signal. var macdConsolidator = new QuoteBarConsolidator(TimeSpan.FromMinutes(15)); // Subscribe the consolidators to SPY data to automatically update the indicators. RegisterIndicator(_spy, _rsi, rsiConsolidator); RegisterIndicator(_spy, _macd, macdConsolidator); // Warm up the indicators by updating the consolidators with historical data. var tradeBars = History<TradeBar>(_spy, 5 * _rsi.WarmUpPeriod, Resolution.Minute); foreach (var bar in tradeBars) { rsiConsolidator.Update(bar); } var quoteBars = History<QuoteBar>(_spy, 15 * _macd.WarmUpPeriod, Resolution.Minute); foreach (var bar in quoteBars) { macdConsolidator.Update(bar); } } public override void OnData(Slice slice) { // Entry rule: To maximize return, the MACD histogram must be positive (positive trend) and the RSI must be below 30 (oversold). if (_rsi < 30 && _macd.Histogram > 0 && !Portfolio[_spy].IsLong) { SetHoldings(_spy, 0.5m); } // Exit rule: MACD histogram is below 0 (negative trend) else if (_macd.Histogram < 0 && Portfolio[_spy].IsLong) { Liquidate(_spy); } } }
class UniverseSettingsAlgorithm(QCAlgorithm): # Create RSI and MACD indicators for trade signal generation. _rsi = RelativeStrengthIndex(14) _macd = MovingAverageConvergenceDivergence(12, 26, 9) def initialize(self) -> None: self.set_start_date(2021, 1, 1) self.set_end_date(2021, 2, 1) # Request SPY data to feed to indicators and trade. self.spy = self.add_equity("SPY").symbol # Create a 5-minute trade bar consolidator to feed the smoothed bar to the RSI indicator for the price action entry signal. rsi_consolidator = TradeBarConsolidator(timedelta(minutes=5)) # Create a 15-minute quote bar consolidator to feed the smoothed bar to the MACD consolidator for trend entry/exit signal. macd_consolidator = QuoteBarConsolidator(timedelta(minutes=15)) # Subscribe the consolidators to SPY data to automatically update the indicators. self.register_indicator(self.spy, self._rsi, rsi_consolidator) self.register_indicator(self.spy, self._macd, macd_consolidator) # Warm up the indicators by updating the consolidators with historical data. trade_bars = self.history[TradeBar](self.spy, 5 * self._rsi.warm_up_period, Resolution.MINUTE) for bar in trade_bars: rsi_consolidator.update(bar) quote_bars = self.history[QuoteBar](self.spy, 15 * self._macd.warm_up_period, Resolution.MINUTE) for bar in quote_bars: macd_consolidator.update(bar) def on_data(self, slice: Slice) -> None: # Entry rule: MACD histogram is positive (positive trend) and RSI is below 30 (oversold) to maximize return. if self._rsi.current.value < 30 and self._macd.histogram.current.value > 0 and not self.portfolio[self.spy].is_long: self.set_holdings(self.spy, 0.5) # Exit rule: MACD histogram is below 0 (negative trend) elif self._macd.histogram.current.value < 0 and self.portfolio[self.spy].is_long: self.liquidate(self.spy)
Other Examples
For more examples, see the following algorithms: