Consolidator Types
Calendar Consolidators
Introduction
Calendar consolidators aggregate data based on specific periods of time like a weekly basis, quarterly basis, or any schedule with specific start and end times. In contrast to time period consolidators, the bars that calendar consolidators produce always end on the same schedule, regardless of the first data point the consolidator receives.
Consolidate Trade Bars
TradeBar consolidators aggregate TradeBar objects into TradeBar objects of the same size or larger. Follow these steps to create and manage a TradeBar consolidator based on custom start and end periods:
- Create the consolidator.
- Standard Periods
- Custom Periods
- Add an event handler to the consolidator.
- Define the consolidation handler.
- Update the consolidator.
- Automatic Updates
- Manual Updates
- If you create consolidators for securities in a dynamic universe and register them for automatic updates, remove the consolidator when the security leaves the universe.
To set the time period for the consolidator, you can use the built-in CalendarInfo objects (standard periods) or create your own (custom periods).
The following table describes the helper methods that the Calendar class provides to create the built-in CalendarInfo objects:
Method (C# / Python) | Description |
---|---|
Calendar.Weekly / Calendar.WEEKLY | Computes the start of week (previous Monday) of the given date/time |
Calendar.Monthly / Calendar.MONTHLY | Computes the start of month (1st of the current month) of the given date/time |
Calendar.Quarterly / Calendar.QUARTERLY | Computes the start of quarter (1st of the starting month of the current quarter) of the given date/time |
Calendar.Yearly / Calendar.YEARLY | Computes the start of year (1st of the current year) of the given date/time |
_consolidator = new TradeBarConsolidator(Calendar.Weekly);
// Alias:
// _consolidator = CreateConsolidator(Calendar.Weekly, typeof(TradeBar));
self._consolidator = TradeBarConsolidator(Calendar.WEEKLY)
# Alias:
# self._consolidator = self.create_consolidator(Calendar.WEEKLY, TradeBar)
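As a rough illustration of the start-of-period computations in the preceding table, the following standalone Python sketch reproduces them with the standard library. The function names are hypothetical, the sketch assumes each period starts at midnight, and it is not LEAN's implementation.

```python
from datetime import datetime, timedelta


def _midnight(dt: datetime) -> datetime:
    # Drop the time-of-day component
    return dt.replace(hour=0, minute=0, second=0, microsecond=0)


def start_of_week(dt: datetime) -> datetime:
    # weekday() is 0 on Monday, so this steps back to the Monday on or before dt
    return _midnight(dt) - timedelta(days=dt.weekday())


def start_of_month(dt: datetime) -> datetime:
    return _midnight(dt).replace(day=1)


def start_of_quarter(dt: datetime) -> datetime:
    # Quarters start in months 1, 4, 7, and 10
    first_month = 3 * ((dt.month - 1) // 3) + 1
    return _midnight(dt).replace(month=first_month, day=1)


def start_of_year(dt: datetime) -> datetime:
    return _midnight(dt).replace(month=1, day=1)


sample = datetime(2024, 5, 15, 10, 30)  # a Wednesday
print(start_of_week(sample))     # 2024-05-13 00:00:00
print(start_of_month(sample))    # 2024-05-01 00:00:00
print(start_of_quarter(sample))  # 2024-04-01 00:00:00
print(start_of_year(sample))     # 2024-01-01 00:00:00
```

Because every start is derived from the date alone, two data points in the same week map to the same bar regardless of which one arrives first.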
If you need something more specific than the preceding time periods, define a method to set the start time and period of the consolidated bars. The method should receive a DateTime (C#) or datetime (Python) object that's based in the data time zone and should return a CalendarInfo object, which contains the start time of the bar in the data time zone and the duration of the consolidation period. The following example demonstrates how to create a custom consolidator for weekly bars:
_consolidator = new TradeBarConsolidator(datetime =>
{
    // Each bar spans 7 days
    var period = TimeSpan.FromDays(7);
    // Move the time of day to 17:00
    var timeSpan = new TimeSpan(17, 0, 0);
    var newDateTime = datetime.Date + timeSpan;
    // Number of days to step back (DayOfWeek is 0 on Sunday)
    var delta = 1 + (int)newDateTime.DayOfWeek;
    if (delta > 6)
    {
        delta = 0;
    }
    var start = newDateTime.AddDays(-delta);
    return new CalendarInfo(start, period);
});
# Define a consolidation period method
def _consolidation_period(self, dt: datetime) -> CalendarInfo:
    # Each bar spans 7 days
    period = timedelta(7)
    # Move the time of day to 17:00
    dt = dt.replace(hour=17, minute=0, second=0, microsecond=0)
    # Number of days to step back (weekday() is 0 on Monday)
    delta = 1+dt.weekday()
    if delta > 6:
        delta = 0
    start = dt-timedelta(delta)
    return CalendarInfo(start, period)
# Create the consolidator with the consolidation period method
self._consolidator = TradeBarConsolidator(self._consolidation_period)
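To see what the Python consolidation period method returns for a given timestamp, the following sketch runs the same arithmetic outside LEAN. CalendarInfoSketch is a hypothetical stand-in for CalendarInfo that only stores the start and the period; it is an assumption made so the example is self-contained.

```python
from datetime import datetime, timedelta
from typing import NamedTuple


class CalendarInfoSketch(NamedTuple):
    """Hypothetical stand-in for CalendarInfo: bar start and bar duration."""
    start: datetime
    period: timedelta


def consolidation_period(dt: datetime) -> CalendarInfoSketch:
    period = timedelta(7)
    # Same arithmetic as the method shown above
    dt = dt.replace(hour=17, minute=0, second=0, microsecond=0)
    delta = 1 + dt.weekday()
    if delta > 6:
        delta = 0
    return CalendarInfoSketch(dt - timedelta(delta), period)


info = consolidation_period(datetime(2024, 5, 15, 10, 30))  # a Wednesday
print(info.start)                # 2024-05-12 17:00:00
print(info.start + info.period)  # 2024-05-19 17:00:00
```

With this arithmetic, a Wednesday timestamp maps to a bar that runs from Sunday 17:00 to the following Sunday 17:00. A timestamp on Sunday before 17:00 maps to a start later on that same day, so check such edge cases against your data before relying on the method.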
_consolidator.DataConsolidated += ConsolidationHandler;
self._consolidator.data_consolidated += self._consolidation_handler
LEAN passes consolidated bars to the consolidator event handler in your algorithm. The most common error when creating consolidators is to put parentheses () at the end of your method name when setting the event handler of the consolidator. If you use parentheses, the method executes and its result is passed as the event handler instead of the method itself. Remember to pass the name of your method to the event system. Specifically, it should be ConsolidationHandler (C#) or self._consolidation_handler (Python), not ConsolidationHandler() or self._consolidation_handler().
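The difference between passing a method and calling it can be shown with a small standalone sketch. EventSketch is a hypothetical stand-in for an event that supports the += operator; it is not LEAN's event type, and how LEAN reacts to a non-callable handler may differ from the TypeError raised here.

```python
class EventSketch:
    """Hypothetical event type that accepts handlers through +=."""

    def __init__(self):
        self._handlers = []

    def __iadd__(self, handler):
        if not callable(handler):
            raise TypeError("The handler must be callable")
        self._handlers.append(handler)
        return self

    def fire(self, sender, bar):
        for handler in self._handlers:
            handler(sender, bar)


received = []


def consolidation_handler(sender, bar):
    received.append(bar)


event = EventSketch()

# Correct: pass the function itself
event += consolidation_handler
event.fire(None, "weekly bar")

# Incorrect: the parentheses call the function immediately and
# try to register its return value (None) as the handler
try:
    event += consolidation_handler(None, "called too early")
    registered_result = True
except TypeError:
    registered_result = False

print(received)           # ['weekly bar', 'called too early']
print(registered_result)  # False
```

In the incorrect case the handler body runs once at registration time, and nothing useful is subscribed to the event.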
void ConsolidationHandler(object sender, TradeBar consolidatedBar) { }
def _consolidation_handler(self, sender: object, consolidated_bar: TradeBar) -> None: pass
If you use a custom consolidation period method, LEAN passes the consolidated bar to the consolidation handler when the consolidation period ends. The Time and EndTime properties (time and end_time in Python) of the consolidated bar reflect the data time zone, but the Time (time in Python) property of the algorithm still reflects the algorithm time zone.
You can automatically or manually update the consolidator.
To automatically update a consolidator with data from the security subscription, call the AddConsolidator (C#) or add_consolidator (Python) method of the Subscription Manager.
self.subscription_manager.add_consolidator(self._symbol, self._consolidator)
SubscriptionManager.AddConsolidator(_symbol, _consolidator);
Manual updates let you control when the consolidator updates and what data you use to update it. If you need to warm up a consolidator with data outside of the warm-up period, you can manually update the consolidator. To manually update a consolidator, call its Update (C#) or update (Python) method with a TradeBar object. You can update the consolidator with data from the Slice object in the OnData (C#) or on_data (Python) method or with data from a history request.
# Example 1: Update the consolidator with data from the Slice object
def on_data(self, slice: Slice) -> None:
    trade_bar = slice.bars[self._symbol]
    self._consolidator.update(trade_bar)
# Example 2: Update the consolidator with data from a history request
history = self.history[TradeBar](self._symbol, 30, Resolution.MINUTE)
for trade_bar in history:
    self._consolidator.update(trade_bar)
// Example 1: Update the consolidator with data from the Slice object
public override void OnData(Slice slice)
{
    var tradeBar = slice.Bars[_symbol];
    _consolidator.Update(tradeBar);
}
// Example 2: Update the consolidator with data from a history request
var history = History<TradeBar>(_symbol, 30, Resolution.Minute);
foreach (var tradeBar in history)
{
    _consolidator.Update(tradeBar);
}
SubscriptionManager.RemoveConsolidator(_symbol, _consolidator);
self.subscription_manager.remove_consolidator(self._symbol, self._consolidator)
If you have a dynamic universe and don't remove consolidators, they accumulate internally, causing your algorithm to slow down and eventually crash once it runs out of RAM. For an example of removing consolidators from universe subscriptions, see the GasAndCrudeOilEnergyCorrelationAlpha in the LEAN GitHub repository.
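One way to avoid leaking consolidators is to keep a dictionary that maps each symbol to its consolidator and to remove the entry when the security leaves the universe. The following standalone sketch shows that bookkeeping pattern. SubscriptionManagerSketch and UniverseBookkeeping are hypothetical stand-ins, not LEAN classes; in an algorithm, the same logic would typically run where you handle universe changes.

```python
class SubscriptionManagerSketch:
    """Hypothetical stand-in that records registered consolidators."""

    def __init__(self):
        self.registered = []

    def add_consolidator(self, symbol, consolidator):
        self.registered.append((symbol, consolidator))

    def remove_consolidator(self, symbol, consolidator):
        self.registered.remove((symbol, consolidator))


class UniverseBookkeeping:
    """Tracks one consolidator per symbol so it can be removed later."""

    def __init__(self, subscription_manager):
        self._manager = subscription_manager
        self._consolidator_by_symbol = {}

    def on_added(self, symbol):
        consolidator = object()  # placeholder for a real consolidator
        self._consolidator_by_symbol[symbol] = consolidator
        self._manager.add_consolidator(symbol, consolidator)

    def on_removed(self, symbol):
        # pop() returns None if the symbol was never tracked
        consolidator = self._consolidator_by_symbol.pop(symbol, None)
        if consolidator is not None:
            self._manager.remove_consolidator(symbol, consolidator)


manager = SubscriptionManagerSketch()
book = UniverseBookkeeping(manager)
book.on_added("SPY")
book.on_added("AAPL")
book.on_removed("SPY")
print([symbol for symbol, _ in manager.registered])  # ['AAPL']
```

Storing the consolidator reference matters because the removal call needs the same object that was registered.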
You can also use the Consolidate (C#) or consolidate (Python) helper method to create calendar consolidators and register them for automatic updates.
_consolidator = Consolidate(_symbol, Calendar.Weekly, ConsolidationHandler);
self._consolidator = self.consolidate(self._symbol, Calendar.WEEKLY, self._consolidation_handler)
If you use the Consolidate (C#) or consolidate (Python) helper method, the consolidation handler doesn't receive the sender argument of type object.
void ConsolidationHandler(TradeBar consolidatedBar) { }
def _consolidation_handler(self, consolidated_bar: TradeBar) -> None: pass
Consolidate Quote Bars
QuoteBar consolidators aggregate QuoteBar objects into QuoteBar objects of the same size or larger. Follow these steps to create and manage a QuoteBar consolidator based on custom start and end periods:
- Create the consolidator.
- Standard Periods
- Custom Periods
- Add an event handler to the consolidator.
- Define the consolidation handler.
- Update the consolidator.
- Automatic Updates
- Manual Updates
- If you create consolidators for securities in a dynamic universe and register them for automatic updates, remove the consolidator when the security leaves the universe.
To set the time period for the consolidator, you can use the built-in CalendarInfo objects (standard periods) or create your own (custom periods).
The following table describes the helper methods that the Calendar class provides to create the built-in CalendarInfo objects:
Method (C# / Python) | Description |
---|---|
Calendar.Weekly / Calendar.WEEKLY | Computes the start of week (previous Monday) of the given date/time |
Calendar.Monthly / Calendar.MONTHLY | Computes the start of month (1st of the current month) of the given date/time |
Calendar.Quarterly / Calendar.QUARTERLY | Computes the start of quarter (1st of the starting month of the current quarter) of the given date/time |
Calendar.Yearly / Calendar.YEARLY | Computes the start of year (1st of the current year) of the given date/time |
_consolidator = new QuoteBarConsolidator(Calendar.Weekly);
// Alias:
// _consolidator = CreateConsolidator(Calendar.Weekly, typeof(QuoteBar));
self._consolidator = QuoteBarConsolidator(Calendar.WEEKLY)
# Alias:
# self._consolidator = self.create_consolidator(Calendar.WEEKLY, QuoteBar)
If you need something more specific than the preceding time periods, define a method to set the start time and period of the consolidated bars. The method should receive a DateTime (C#) or datetime (Python) object that's based in the data time zone and should return a CalendarInfo object, which contains the start time of the bar in the data time zone and the duration of the consolidation period. The following example demonstrates how to create a custom consolidator for weekly bars:
_consolidator = new QuoteBarConsolidator(datetime =>
{
    // Each bar spans 7 days
    var period = TimeSpan.FromDays(7);
    // Move the time of day to 17:00
    var timeSpan = new TimeSpan(17, 0, 0);
    var newDateTime = datetime.Date + timeSpan;
    // Number of days to step back (DayOfWeek is 0 on Sunday)
    var delta = 1 + (int)newDateTime.DayOfWeek;
    if (delta > 6)
    {
        delta = 0;
    }
    var start = newDateTime.AddDays(-delta);
    return new CalendarInfo(start, period);
});
# Define a consolidation period method
def _consolidation_period(self, dt: datetime) -> CalendarInfo:
    # Each bar spans 7 days
    period = timedelta(7)
    # Move the time of day to 17:00
    dt = dt.replace(hour=17, minute=0, second=0, microsecond=0)
    # Number of days to step back (weekday() is 0 on Monday)
    delta = 1+dt.weekday()
    if delta > 6:
        delta = 0
    start = dt-timedelta(delta)
    return CalendarInfo(start, period)
# Create the consolidator with the consolidation period method
self._consolidator = QuoteBarConsolidator(self._consolidation_period)
_consolidator.DataConsolidated += ConsolidationHandler;
self._consolidator.data_consolidated += self._consolidation_handler
LEAN passes consolidated bars to the consolidator event handler in your algorithm. The most common error when creating consolidators is to put parentheses () at the end of your method name when setting the event handler of the consolidator. If you use parentheses, the method executes and its result is passed as the event handler instead of the method itself. Remember to pass the name of your method to the event system. Specifically, it should be ConsolidationHandler (C#) or self._consolidation_handler (Python), not ConsolidationHandler() or self._consolidation_handler().
void ConsolidationHandler(object sender, QuoteBar consolidatedBar) { }
def _consolidation_handler(self, sender: object, consolidated_bar: QuoteBar) -> None: pass
If you use a custom consolidation period method, LEAN passes the consolidated bar to the consolidation handler when the consolidation period ends. The Time and EndTime properties (time and end_time in Python) of the consolidated bar reflect the data time zone, but the Time (time in Python) property of the algorithm still reflects the algorithm time zone.
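The practical consequence is that a bar's end time and the algorithm's clock can show different wall-clock values for the same instant. The following sketch illustrates this with fixed UTC offsets from the standard library. The choice of UTC for the data and UTC-5 for the algorithm is an assumption made for illustration, and fixed offsets ignore daylight saving time.

```python
from datetime import datetime, timedelta, timezone

# Assumed time zones for illustration only
data_tz = timezone.utc
algorithm_tz = timezone(timedelta(hours=-5), "UTC-5")

# End time of a consolidated bar, expressed in the data time zone
bar_end_data = datetime(2024, 1, 12, 22, 0, tzinfo=data_tz)

# The same instant as seen on a clock in the algorithm time zone
bar_end_algorithm = bar_end_data.astimezone(algorithm_tz)

print(bar_end_data.strftime("%Y-%m-%d %H:%M"))       # 2024-01-12 22:00
print(bar_end_algorithm.strftime("%Y-%m-%d %H:%M"))  # 2024-01-12 17:00
```

When you compare bar timestamps with the algorithm time, convert one of them first so that both values are in the same time zone.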
You can automatically or manually update the consolidator.
To automatically update a consolidator with data from the security subscription, call the AddConsolidator (C#) or add_consolidator (Python) method of the Subscription Manager.
self.subscription_manager.add_consolidator(self._symbol, self._consolidator)
SubscriptionManager.AddConsolidator(_symbol, _consolidator);
Manual updates let you control when the consolidator updates and what data you use to update it. If you need to warm up a consolidator with data outside of the warm-up period, you can manually update the consolidator. To manually update a consolidator, call its Update (C#) or update (Python) method with a QuoteBar object. You can update the consolidator with data from the Slice object in the OnData (C#) or on_data (Python) method or with data from a history request.
# Example 1: Update the consolidator with data from the Slice object
def on_data(self, slice: Slice) -> None:
    quote_bar = slice.quote_bars[self._symbol]
    self._consolidator.update(quote_bar)
# Example 2: Update the consolidator with data from a history request
history = self.history[QuoteBar](self._symbol, 30, Resolution.MINUTE)
for quote_bar in history:
    self._consolidator.update(quote_bar)
// Example 1: Update the consolidator with data from the Slice object
public override void OnData(Slice slice)
{
    var quoteBar = slice.QuoteBars[_symbol];
    _consolidator.Update(quoteBar);
}
// Example 2: Update the consolidator with data from a history request
var history = History<QuoteBar>(_symbol, 30, Resolution.Minute);
foreach (var quoteBar in history)
{
    _consolidator.Update(quoteBar);
}
SubscriptionManager.RemoveConsolidator(_symbol, _consolidator);
self.subscription_manager.remove_consolidator(self._symbol, self._consolidator)
If you have a dynamic universe and don't remove consolidators, they accumulate internally, causing your algorithm to slow down and eventually crash once it runs out of RAM. For an example of removing consolidators from universe subscriptions, see the GasAndCrudeOilEnergyCorrelationAlpha in the LEAN GitHub repository.
You can also use the Consolidate (C#) or consolidate (Python) helper method to create calendar consolidators and register them for automatic updates.
_consolidator = Consolidate<QuoteBar>(_symbol, Calendar.Weekly, TickType.Quote, ConsolidationHandler);
self._consolidator = self.consolidate(self._symbol, Calendar.WEEKLY, TickType.QUOTE, self._consolidation_handler)
If you use the Consolidate (C#) or consolidate (Python) helper method, the consolidation handler doesn't receive the sender argument of type object.
void ConsolidationHandler(QuoteBar consolidatedBar) { }
def _consolidation_handler(self, consolidated_bar: QuoteBar) -> None: pass
Consolidate Trade Ticks
Tick consolidators aggregate Tick objects into TradeBar objects. Follow these steps to create and manage a Tick consolidator based on custom start and end periods:
- Create the consolidator.
- Standard Periods
- Custom Periods
- Add an event handler to the consolidator.
- Define the consolidation handler.
- Update the consolidator.
- Automatic Updates
- Manual Updates
- If you create consolidators for securities in a dynamic universe and register them for automatic updates, remove the consolidator when the security leaves the universe.
To set the time period for the consolidator, you can use the built-in CalendarInfo objects (standard periods) or create your own (custom periods).
The following table describes the helper methods that the Calendar class provides to create the built-in CalendarInfo objects:
Method (C# / Python) | Description |
---|---|
Calendar.Weekly / Calendar.WEEKLY | Computes the start of week (previous Monday) of the given date/time |
Calendar.Monthly / Calendar.MONTHLY | Computes the start of month (1st of the current month) of the given date/time |
Calendar.Quarterly / Calendar.QUARTERLY | Computes the start of quarter (1st of the starting month of the current quarter) of the given date/time |
Calendar.Yearly / Calendar.YEARLY | Computes the start of year (1st of the current year) of the given date/time |
_consolidator = new TickConsolidator(Calendar.Weekly);
// Alias:
// _consolidator = CreateConsolidator(Calendar.Weekly, typeof(Tick));
self._consolidator = TickConsolidator(Calendar.WEEKLY)
# Alias:
# self._consolidator = self.create_consolidator(Calendar.WEEKLY, Tick)
If you need something more specific than the preceding time periods, define a method to set the start time and period of the consolidated bars. The method should receive a DateTime (C#) or datetime (Python) object that's based in the data time zone and should return a CalendarInfo object, which contains the start time of the bar in the data time zone and the duration of the consolidation period. The following example demonstrates how to create a custom consolidator for weekly bars:
_consolidator = new TickConsolidator(datetime =>
{
    // Each bar spans 7 days
    var period = TimeSpan.FromDays(7);
    // Move the time of day to 17:00
    var timeSpan = new TimeSpan(17, 0, 0);
    var newDateTime = datetime.Date + timeSpan;
    // Number of days to step back (DayOfWeek is 0 on Sunday)
    var delta = 1 + (int)newDateTime.DayOfWeek;
    if (delta > 6)
    {
        delta = 0;
    }
    var start = newDateTime.AddDays(-delta);
    return new CalendarInfo(start, period);
});
# Define a consolidation period method
def _consolidation_period(self, dt: datetime) -> CalendarInfo:
    # Each bar spans 7 days
    period = timedelta(7)
    # Move the time of day to 17:00
    dt = dt.replace(hour=17, minute=0, second=0, microsecond=0)
    # Number of days to step back (weekday() is 0 on Monday)
    delta = 1+dt.weekday()
    if delta > 6:
        delta = 0
    start = dt-timedelta(delta)
    return CalendarInfo(start, period)
# Create the consolidator with the consolidation period method
self._consolidator = TickConsolidator(self._consolidation_period)
_consolidator.DataConsolidated += ConsolidationHandler;
self._consolidator.data_consolidated += self._consolidation_handler
LEAN passes consolidated bars to the consolidator event handler in your algorithm. The most common error when creating consolidators is to put parentheses () at the end of your method name when setting the event handler of the consolidator. If you use parentheses, the method executes and its result is passed as the event handler instead of the method itself. Remember to pass the name of your method to the event system. Specifically, it should be ConsolidationHandler (C#) or self._consolidation_handler (Python), not ConsolidationHandler() or self._consolidation_handler().
void ConsolidationHandler(object sender, TradeBar consolidatedBar) { }
def _consolidation_handler(self, sender: object, consolidated_bar: TradeBar) -> None: pass
If you use a custom consolidation period method, LEAN passes the consolidated bar to the consolidation handler when the consolidation period ends. The Time and EndTime properties (time and end_time in Python) of the consolidated bar reflect the data time zone, but the Time (time in Python) property of the algorithm still reflects the algorithm time zone.
You can automatically or manually update the consolidator.
To automatically update a consolidator with data from the security subscription, call the AddConsolidator (C#) or add_consolidator (Python) method of the Subscription Manager.
self.subscription_manager.add_consolidator(self._symbol, self._consolidator)
SubscriptionManager.AddConsolidator(_symbol, _consolidator);
Manual updates let you control when the consolidator updates and what data you use to update it. If you need to warm up a consolidator with data outside of the warm-up period, you can manually update the consolidator. To manually update a consolidator, call its Update (C#) or update (Python) method with a Tick object. You can update the consolidator with data from the Slice object in the OnData (C#) or on_data (Python) method or with data from a history request.
# Example 1: Update the consolidator with data from the Slice object
def on_data(self, slice: Slice) -> None:
    ticks = slice.ticks[self._symbol]
    for tick in ticks:
        self._consolidator.update(tick)
# Example 2: Update the consolidator with data from a history request
ticks = self.history[Tick](self._symbol, timedelta(minutes=3), Resolution.TICK)
for tick in ticks:
    self._consolidator.update(tick)
// Example 1: Update the consolidator with data from the Slice object
public override void OnData(Slice slice)
{
    var ticks = slice.Ticks[_symbol];
    foreach (var tick in ticks)
    {
        _consolidator.Update(tick);
    }
}
// Example 2: Update the consolidator with data from a history request
var ticks = History<Tick>(_symbol, TimeSpan.FromMinutes(3), Resolution.Tick);
foreach (var tick in ticks)
{
    _consolidator.Update(tick);
}
SubscriptionManager.RemoveConsolidator(_symbol, _consolidator);
self.subscription_manager.remove_consolidator(self._symbol, self._consolidator)
If you have a dynamic universe and don't remove consolidators, they accumulate internally, causing your algorithm to slow down and eventually crash once it runs out of RAM. For an example of removing consolidators from universe subscriptions, see the GasAndCrudeOilEnergyCorrelationAlpha in the LEAN GitHub repository.
You can also use the Consolidate (C#) or consolidate (Python) helper method to create calendar consolidators and register them for automatic updates.
_consolidator = Consolidate(_symbol, Calendar.Weekly, ConsolidationHandler);
self._consolidator = self.consolidate(self._symbol, Calendar.WEEKLY, self._consolidation_handler)
If you use the Consolidate (C#) or consolidate (Python) helper method, the consolidation handler doesn't receive the sender argument of type object.
void ConsolidationHandler(TradeBar consolidatedBar) { }
def _consolidation_handler(self, consolidated_bar: TradeBar) -> None: pass
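To illustrate what aggregating trade ticks into a trade bar means, the following standalone sketch builds one open-high-low-close-volume bar from a list of ticks. TradeTickSketch, TradeBarSketch, and aggregate_trades are hypothetical names, and the sketch is a simplified model rather than the logic of LEAN's TickConsolidator.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class TradeTickSketch:
    """Hypothetical trade tick: a price and a traded quantity."""
    price: float
    quantity: float


@dataclass
class TradeBarSketch:
    """Hypothetical trade bar built from the ticks of one period."""
    open: float
    high: float
    low: float
    close: float
    volume: float


def aggregate_trades(ticks: Iterable[TradeTickSketch]) -> TradeBarSketch:
    ticks = list(ticks)
    if not ticks:
        raise ValueError("At least one tick is required to build a bar")
    prices = [tick.price for tick in ticks]
    return TradeBarSketch(
        open=prices[0],        # first trade of the period
        high=max(prices),
        low=min(prices),
        close=prices[-1],      # last trade of the period
        volume=sum(tick.quantity for tick in ticks),
    )


bar = aggregate_trades([
    TradeTickSketch(100.0, 10),
    TradeTickSketch(101.5, 5),
    TradeTickSketch(99.5, 20),
    TradeTickSketch(100.5, 15),
])
print(bar)
```

A calendar consolidator applies this kind of aggregation to all ticks whose timestamps fall inside the same calendar period.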
Consolidate Quote Ticks
Tick quote bar consolidators aggregate Tick objects that represent quotes into QuoteBar objects. Follow these steps to create and manage a Tick quote bar consolidator based on custom start and end periods:
- Create the consolidator.
- Standard Periods
- Custom Periods
- Add an event handler to the consolidator.
- Define the consolidation handler.
- Update the consolidator.
- Automatic Updates
- Manual Updates
- If you create consolidators for securities in a dynamic universe and register them for automatic updates, remove the consolidator when the security leaves the universe.
To set the time period for the consolidator, you can use the built-in CalendarInfo objects (standard periods) or create your own (custom periods).
The following table describes the helper methods that the Calendar class provides to create the built-in CalendarInfo objects:
Method (C# / Python) | Description |
---|---|
Calendar.Weekly / Calendar.WEEKLY | Computes the start of week (previous Monday) of the given date/time |
Calendar.Monthly / Calendar.MONTHLY | Computes the start of month (1st of the current month) of the given date/time |
Calendar.Quarterly / Calendar.QUARTERLY | Computes the start of quarter (1st of the starting month of the current quarter) of the given date/time |
Calendar.Yearly / Calendar.YEARLY | Computes the start of year (1st of the current year) of the given date/time |
_consolidator = new TickQuoteBarConsolidator(Calendar.Weekly);
// Alias:
// _consolidator = CreateConsolidator(Calendar.Weekly, typeof(Tick), TickType.Quote);
self._consolidator = TickQuoteBarConsolidator(Calendar.WEEKLY)
# Alias:
# self._consolidator = self.create_consolidator(Calendar.WEEKLY, Tick, TickType.QUOTE)
If you need something more specific than the preceding time periods, define a method to set the start time and period of the consolidated bars. The method should receive a DateTime (C#) or datetime (Python) object that's based in the data time zone and should return a CalendarInfo object, which contains the start time of the bar in the data time zone and the duration of the consolidation period. The following example demonstrates how to create a custom consolidator for weekly bars:
_consolidator = new TickQuoteBarConsolidator(datetime =>
{
    // Each bar spans 7 days
    var period = TimeSpan.FromDays(7);
    // Move the time of day to 17:00
    var timeSpan = new TimeSpan(17, 0, 0);
    var newDateTime = datetime.Date + timeSpan;
    // Number of days to step back (DayOfWeek is 0 on Sunday)
    var delta = 1 + (int)newDateTime.DayOfWeek;
    if (delta > 6)
    {
        delta = 0;
    }
    var start = newDateTime.AddDays(-delta);
    return new CalendarInfo(start, period);
});
# Define a consolidation period method
def _consolidation_period(self, dt: datetime) -> CalendarInfo:
    # Each bar spans 7 days
    period = timedelta(7)
    # Move the time of day to 17:00
    dt = dt.replace(hour=17, minute=0, second=0, microsecond=0)
    # Number of days to step back (weekday() is 0 on Monday)
    delta = 1+dt.weekday()
    if delta > 6:
        delta = 0
    start = dt-timedelta(delta)
    return CalendarInfo(start, period)
# Create the consolidator with the consolidation period method
self._consolidator = TickQuoteBarConsolidator(self._consolidation_period)
_consolidator.DataConsolidated += ConsolidationHandler;
self._consolidator.data_consolidated += self._consolidation_handler
LEAN passes consolidated bars to the consolidator event handler in your algorithm. The most common error when creating consolidators is to put parentheses () at the end of your method name when setting the event handler of the consolidator. If you use parentheses, the method executes and its result is passed as the event handler instead of the method itself. Remember to pass the name of your method to the event system. Specifically, it should be ConsolidationHandler (C#) or self._consolidation_handler (Python), not ConsolidationHandler() or self._consolidation_handler().
void ConsolidationHandler(object sender, QuoteBar consolidatedBar) { }
def _consolidation_handler(self, sender: object, consolidated_bar: QuoteBar) -> None: pass
If you use a custom consolidation period method, LEAN passes the consolidated bar to the consolidation handler when the consolidation period ends. The Time and EndTime properties (time and end_time in Python) of the consolidated bar reflect the data time zone, but the Time (time in Python) property of the algorithm still reflects the algorithm time zone.
You can automatically or manually update the consolidator.
To automatically update a consolidator with data from the security subscription, call the AddConsolidator (C#) or add_consolidator (Python) method of the Subscription Manager.
self.subscription_manager.add_consolidator(self._symbol, self._consolidator)
SubscriptionManager.AddConsolidator(_symbol, _consolidator);
Manual updates let you control when the consolidator updates and what data you use to update it. If you need to warm up a consolidator with data outside of the warm-up period, you can manually update the consolidator. To manually update a consolidator, call its Update (C#) or update (Python) method with a Tick object. You can update the consolidator with data from the Slice object in the OnData (C#) or on_data (Python) method or with data from a history request.
# Example 1: Update the consolidator with data from the Slice object
def on_data(self, slice: Slice) -> None:
    ticks = slice.ticks[self._symbol]
    for tick in ticks:
        self._consolidator.update(tick)
# Example 2: Update the consolidator with data from a history request
ticks = self.history[Tick](self._symbol, timedelta(minutes=3), Resolution.TICK)
for tick in ticks:
    self._consolidator.update(tick)
// Example 1: Update the consolidator with data from the Slice object
public override void OnData(Slice slice)
{
    var ticks = slice.Ticks[_symbol];
    foreach (var tick in ticks)
    {
        _consolidator.Update(tick);
    }
}
// Example 2: Update the consolidator with data from a history request
var ticks = History<Tick>(_symbol, TimeSpan.FromMinutes(3), Resolution.Tick);
foreach (var tick in ticks)
{
    _consolidator.Update(tick);
}
SubscriptionManager.RemoveConsolidator(_symbol, _consolidator);
self.subscription_manager.remove_consolidator(self._symbol, self._consolidator)
If you have a dynamic universe and don't remove consolidators, they accumulate internally, causing your algorithm to slow down and eventually crash once it runs out of RAM. For an example of removing consolidators from universe subscriptions, see the GasAndCrudeOilEnergyCorrelationAlpha in the LEAN GitHub repository.
You can also use the Consolidate (C#) or consolidate (Python) helper method to create calendar consolidators and register them for automatic updates.
_consolidator = Consolidate<QuoteBar>(_symbol, Calendar.Weekly, TickType.Quote, ConsolidationHandler);
self._consolidator = self.consolidate(self._symbol, Calendar.WEEKLY, TickType.QUOTE, self._consolidation_handler)
If you use the Consolidate (C#) or consolidate (Python) helper method, the consolidation handler doesn't receive the sender argument of type object.
void ConsolidationHandler(QuoteBar consolidatedBar) { }
def _consolidation_handler(self, consolidated_bar: QuoteBar) -> None: pass
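To illustrate how quote ticks differ from trade ticks when they are aggregated, the following standalone sketch builds separate open-high-low-close values for the bid side and the ask side. QuoteTickSketch, ohlc, and aggregate_quotes are hypothetical names. The sketch assumes every tick carries both a bid and an ask price, which is a simplification and not a description of LEAN's TickQuoteBarConsolidator.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Tuple


@dataclass
class QuoteTickSketch:
    """Hypothetical quote tick with both sides of the book."""
    bid_price: float
    ask_price: float


def ohlc(values: List[float]) -> Tuple[float, float, float, float]:
    # (open, high, low, close) of a non-empty price sequence
    return values[0], max(values), min(values), values[-1]


def aggregate_quotes(
    ticks: Iterable[QuoteTickSketch],
) -> Dict[str, Tuple[float, float, float, float]]:
    ticks = list(ticks)
    if not ticks:
        raise ValueError("At least one tick is required to build a bar")
    return {
        "bid": ohlc([tick.bid_price for tick in ticks]),
        "ask": ohlc([tick.ask_price for tick in ticks]),
    }


quote_bar = aggregate_quotes([
    QuoteTickSketch(99.0, 99.2),
    QuoteTickSketch(99.5, 99.8),
    QuoteTickSketch(98.5, 98.9),
    QuoteTickSketch(99.2, 99.4),
])
print(quote_bar["bid"])  # (99.0, 99.5, 98.5, 99.2)
print(quote_bar["ask"])  # (99.2, 99.8, 98.9, 99.4)
```

Unlike the trade case, no volume is summed here because quotes describe available prices rather than executed trades.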
Consolidate Other Data
<dataType>
consolidators aggregate various types of data objects into <dataType> objects of the same size or larger. If you consolidate custom data or alternative datasets, check the definition of the data class to see its data type. Follow these steps to create and manage a <dataType> consolidator based on custom start and end periods:
- Create the consolidator.
- Standard Periods
- Custom Periods
- Add an event handler to the consolidator.
- Define the consolidation handler.
- Update the consolidator.
- Automatic Updates
- Manual Updates
- If you create consolidators for securities in a dynamic universe and register them for automatic updates, remove the consolidator when the security leaves the universe.
To set the time period for the consolidator, you can use the built-in CalendarInfo objects (standard periods) or create your own (custom periods).
The following table describes the helper methods that the Calendar class provides to create the built-in CalendarInfo objects:
Method (C# / Python) | Description |
---|---|
Calendar.Weekly / Calendar.WEEKLY | Computes the start of week (previous Monday) of the given date/time |
Calendar.Monthly / Calendar.MONTHLY | Computes the start of month (1st of the current month) of the given date/time |
Calendar.Quarterly / Calendar.QUARTERLY | Computes the start of quarter (1st of the starting month of the current quarter) of the given date/time |
Calendar.Yearly / Calendar.YEARLY | Computes the start of year (1st of the current year) of the given date/time |
_consolidator = new DynamicDataConsolidator(Calendar.Weekly);
// Alias:
// _consolidator = CreateConsolidator(Calendar.Weekly, typeof(<dataType>));
self._consolidator = DynamicDataConsolidator(Calendar.WEEKLY)
# Alias:
# self._consolidator = self.create_consolidator(Calendar.WEEKLY, <dataType>)
If you need something more specific than the preceding time periods, define a method to set the start time and period of the consolidated bars. The method should receive a DateTime (C#) or datetime (Python) object that's based in the data time zone and should return a CalendarInfo object, which contains the start time of the bar in the data time zone and the duration of the consolidation period. The following example demonstrates how to create a custom consolidator for weekly bars:
_consolidator = new DynamicDataConsolidator(datetime =>
{
    // Each bar spans 7 days
    var period = TimeSpan.FromDays(7);
    // Move the time of day to 17:00
    var timeSpan = new TimeSpan(17, 0, 0);
    var newDateTime = datetime.Date + timeSpan;
    // Number of days to step back (DayOfWeek is 0 on Sunday)
    var delta = 1 + (int)newDateTime.DayOfWeek;
    if (delta > 6)
    {
        delta = 0;
    }
    var start = newDateTime.AddDays(-delta);
    return new CalendarInfo(start, period);
});
# Define a consolidation period method
def _consolidation_period(self, dt: datetime) -> CalendarInfo:
    # Each bar spans 7 days
    period = timedelta(7)
    # Move the time of day to 17:00
    dt = dt.replace(hour=17, minute=0, second=0, microsecond=0)
    # Number of days to step back (weekday() is 0 on Monday)
    delta = 1+dt.weekday()
    if delta > 6:
        delta = 0
    start = dt-timedelta(delta)
    return CalendarInfo(start, period)
# Create the consolidator with the consolidation period method
self._consolidator = DynamicDataConsolidator(self._consolidation_period)
_consolidator.DataConsolidated += ConsolidationHandler;
self._consolidator.data_consolidated += self._consolidation_handler
LEAN passes consolidated bars to the consolidator event handler in your algorithm. The most common error when creating consolidators is to put parentheses () at the end of your method name when setting the event handler of the consolidator. If you use parentheses, the method executes and its result is passed as the event handler instead of the method itself. Remember to pass the name of your method to the event system. Specifically, it should be ConsolidationHandler (C#) or self._consolidation_handler (Python), not ConsolidationHandler() or self._consolidation_handler().
void ConsolidationHandler(object sender, <dataType> consolidatedBar) { }
def _consolidation_handler(self, sender: object, consolidated_bar: <dataType>) -> None: pass
If you use a custom consolidation period method, LEAN passes the consolidated bar to the consolidation handler when the consolidation period ends. The Time and EndTime properties (time and end_time in Python) of the consolidated bar reflect the data time zone, but the Time (time in Python) property of the algorithm still reflects the algorithm time zone.
You can automatically or manually update the consolidator.
To automatically update a consolidator with data from the data subscription, call the AddConsolidator (C#) or add_consolidator (Python) method of the Subscription Manager.
self.subscription_manager.add_consolidator(self._symbol, self._consolidator)
SubscriptionManager.AddConsolidator(_symbol, _consolidator);
Manual updates let you control when the consolidator updates and what data you use to update it. If you need to warm up a consolidator with data outside of the warm-up period, you can manually update the consolidator. To manually update a consolidator, call its Update (C#) or update (Python) method with a <dataType> object. You can update the consolidator with data from the Slice object in the OnData (C#) or on_data (Python) method or with data from a history request.
# Example 1: Update the consolidator with data from the Slice object
def on_data(self, slice: Slice) -> None:
    if self._symbol in slice:
        self._consolidator.update(slice[self._symbol])

# Example 2: Update the consolidator with data from a history request
history = self.history[<dataType>](self._symbol, 30, Resolution.DAILY)
for data in history:
    self._consolidator.update(data)
// Example 1: Update the consolidator with data from the Slice object
public override void OnData(Slice slice)
{
    if (slice.ContainsKey(_symbol))
    {
        _consolidator.Update(slice[_symbol]);
    }
}

// Example 2: Update the consolidator with data from a history request
var history = History<<dataType>>(_symbol, 30, Resolution.Daily);
foreach (var data in history)
{
    _consolidator.Update(data);
}
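To make the effect of a manual update loop concrete, the following toy consolidator shows how repeated update calls build a working bar and eventually emit it. It is a simplified sketch, not LEAN's implementation: Bar, ToyCalendarConsolidator, and weekly are hypothetical names, and the toy emits a bar only when the next data point arrives at or after the period end.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, List, Optional, Tuple


@dataclass
class Bar:
    """Hypothetical minimal bar; not LEAN's TradeBar."""
    time: datetime
    open: float
    high: float
    low: float
    close: float


class ToyCalendarConsolidator:
    """Illustrative only: aggregates bars into calendar periods."""

    def __init__(self, period_func: Callable[[datetime], Tuple[datetime, timedelta]]):
        self._period_func = period_func
        self._period_end: Optional[datetime] = None
        self.working_data: Optional[Bar] = None
        self.handlers: List[Callable[[object, Bar], None]] = []

    def update(self, bar: Bar) -> None:
        # Emit the working bar once data arrives at or after the period end.
        if self.working_data is not None and bar.time >= self._period_end:
            for handler in self.handlers:
                handler(self, self.working_data)
            self.working_data = None
        if self.working_data is None:
            # Start a new working bar at the calendar period that contains this data point.
            start, period = self._period_func(bar.time)
            self._period_end = start + period
            self.working_data = Bar(start, bar.open, bar.high, bar.low, bar.close)
        else:
            w = self.working_data
            w.high, w.low, w.close = max(w.high, bar.high), min(w.low, bar.low), bar.close


def weekly(dt: datetime) -> Tuple[datetime, timedelta]:
    # Week starts on Monday at midnight.
    start = (dt - timedelta(days=dt.weekday())).replace(hour=0, minute=0, second=0, microsecond=0)
    return start, timedelta(days=7)


consolidated: List[Bar] = []
consolidator = ToyCalendarConsolidator(weekly)
consolidator.handlers.append(lambda sender, bar: consolidated.append(bar))

# Feed ten daily bars starting Monday 2021-01-04, as a history loop would.
for i in range(10):
    price = 100.0 + i
    consolidator.update(Bar(datetime(2021, 1, 4) + timedelta(days=i), price, price + 1, price - 1, price))

print(len(consolidated), consolidated[0].open, consolidated[0].close)
```

After the loop, one full weekly bar has been emitted and a second bar is still in progress, which is the state a warm-up loop typically leaves a consolidator in.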
SubscriptionManager.RemoveConsolidator(_symbol, _consolidator);
self.subscription_manager.remove_consolidator(self._symbol, self._consolidator)
If you have a dynamic universe and don't remove consolidators, they accumulate internally, which slows down your algorithm until it eventually runs out of RAM and stops. For an example of removing consolidators from universe subscriptions, see the GasAndCrudeOilEnergyCorrelationAlpha algorithm in the LEAN GitHub repository.
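One way to avoid the leak is to keep a per-symbol record of every consolidator you register so that you can remove exactly that object later. The sketch below shows only the bookkeeping pattern: FakeSubscriptionManager and ConsolidatorRegistry are hypothetical names, and the fake manager merely records calls to methods named like the ones above.

```python
from typing import Callable, Dict


class FakeSubscriptionManager:
    """Hypothetical stand-in that records registrations; not the LEAN class."""

    def __init__(self) -> None:
        self.registered = set()

    def add_consolidator(self, symbol: str, consolidator: object) -> None:
        self.registered.add((symbol, id(consolidator)))

    def remove_consolidator(self, symbol: str, consolidator: object) -> None:
        self.registered.discard((symbol, id(consolidator)))


class ConsolidatorRegistry:
    """Keeps one consolidator per symbol so it can be removed later."""

    def __init__(self, manager, factory: Callable[[], object]) -> None:
        self._manager = manager
        self._factory = factory
        self.by_symbol: Dict[str, object] = {}

    def on_securities_changed(self, added, removed) -> None:
        for symbol in added:
            if symbol not in self.by_symbol:
                consolidator = self._factory()
                self.by_symbol[symbol] = consolidator
                self._manager.add_consolidator(symbol, consolidator)
        for symbol in removed:
            # Remove the same object that was registered for this symbol.
            consolidator = self.by_symbol.pop(symbol, None)
            if consolidator is not None:
                self._manager.remove_consolidator(symbol, consolidator)


manager = FakeSubscriptionManager()
registry = ConsolidatorRegistry(manager, factory=object)

registry.on_securities_changed(added=["SPY", "AAPL"], removed=[])
registry.on_securities_changed(added=["MSFT"], removed=["SPY"])
print(sorted(registry.by_symbol))  # ['AAPL', 'MSFT']
```

In an algorithm, the same bookkeeping would live in the handler that receives universe changes, with the add and remove calls going to the Subscription Manager.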
You can also use the Consolidate (C#) or consolidate (Python) helper method to create calendar consolidators and register them for automatic updates.
_consolidator = Consolidate(_symbol, Calendar.Weekly, ConsolidationHandler);
self._consolidator = self.consolidate(self._symbol, Calendar.WEEKLY, self._consolidation_handler)
If you use the Consolidate (C#) or consolidate (Python) helper method, the consolidation handler doesn't receive an object argument.
void ConsolidationHandler(<dataType> consolidatedBar) { }
def _consolidation_handler(self, consolidated_bar: <dataType>) -> None: pass
Reset Consolidators
To reset a consolidator, call its Reset (C#) or reset (Python) method.
self._consolidator.reset()
_consolidator.Reset();
If you are live trading Equities or backtesting Equities without the adjusted data normalization mode, reset your consolidators when splits and dividends occur. When a split or dividend occurs while the consolidator is in the process of building a bar, the open, high, and low may reflect prices from before the split or dividend. To avoid issues, call the consolidator's Reset (C#) or reset (Python) method and then warm it up with ScaledRaw (C#) or SCALED_RAW (Python) data from a history request.
def on_data(self, data: Slice):
    # When a split or dividend occurs...
    if (data.splits.contains_key(self._symbol) and data.splits[self._symbol].type == SplitType.SPLIT_OCCURRED or
        data.dividends.contains_key(self._symbol)):
        # If the consolidator is working on a bar...
        if self._consolidator.working_data:
            # Get adjusted prices for the time period of the working bar.
            history = self.history[TradeBar](self._symbol, self._consolidator.working_data.time, self.time, data_normalization_mode=DataNormalizationMode.SCALED_RAW)
            # Reset the consolidator.
            self._consolidator.reset()
            # Warm-up the consolidator with the adjusted price data.
            for bar in history:
                self._consolidator.update(bar)
public override void OnData(Slice data)
{
    // When a split or dividend occurs...
    if ((data.Splits.ContainsKey(_symbol) && data.Splits[_symbol].Type == SplitType.SplitOccurred) ||
        data.Dividends.ContainsKey(_symbol))
    {
        // If the consolidator is working on a bar...
        if (_consolidator.WorkingData != null)
        {
            // Get adjusted prices for the time period of the working bar.
            var history = History<TradeBar>(_symbol, _consolidator.WorkingData.Time, Time, dataNormalizationMode: DataNormalizationMode.ScaledRaw);
            // Reset the consolidator.
            _consolidator.Reset();
            // Warm-up the consolidator with the adjusted price data.
            foreach (var bar in history)
            {
                _consolidator.Update(bar);
            }
        }
    }
}
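The reason for the reset can be seen with a little arithmetic. The sketch below uses made-up prices around an assumed 4-for-1 split and a hypothetical summarize helper. It only illustrates how a working bar that mixes price scales becomes misleading and how rescaled history avoids that.

```python
# Assumed raw closes around a 4-for-1 split (illustrative numbers only).
pre_split_closes = [400.0, 404.0, 396.0]  # raw prices before the split
post_split_closes = [100.0, 101.0]         # raw prices after the split
split_factor = 0.25                        # 4-for-1 split


def summarize(closes):
    # Build (open, high, low, close) from a sequence of closes.
    return closes[0], max(closes), min(closes), closes[-1]


# Without a reset, the working bar mixes the two price scales.
mixed = summarize(pre_split_closes + post_split_closes)

# After a reset, warming up with rescaled history puts every price
# on the post-split scale.
rescaled = [price * split_factor for price in pre_split_closes]
consistent = summarize(rescaled + post_split_closes)

print(mixed)       # (400.0, 404.0, 100.0, 101.0)
print(consistent)  # (100.0, 101.0, 99.0, 101.0)
```

The mixed bar suggests a 75% drop that never happened in economic terms, while the consistent bar shows the actual price range.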
Examples
The following examples demonstrate some common practices for calendar consolidators.
Example 1: Daily Futures Bars
Daily bars from the CME and Yahoo Finance represent the price action from 5 PM Central Time (CT) to 4 PM CT on the following day. In contrast, the regular daily resolution bars on QuantConnect represent the price action from 12 AM Eastern Time (ET) to 12 AM ET the following day. To create daily bars that represent the same period of time as the daily bars from the CME, the following algorithm consolidates minute bars (during regular and extended market hours) into daily bars that span from 5 PM CT to 4 PM CT on the following day. The algorithm then uses these consolidated daily bars to place take profit and stop loss orders as a function of the daily Average True Range.
public class CalendarConsolidatorExampleAlgorithm : QCAlgorithm
{
    private Future _future;
    private decimal _takeProfitPrice, _stopLossPrice;
    // Create an ATR indicator to set take profit and stop loss levels.
    private AverageTrueRange _atr = new(10);
    // Create a day tracker to avoid over-trading.
    private int _day = -1;

    public override void Initialize()
    {
        SetStartDate(2021, 1, 1);
        SetEndDate(2022, 1, 1);
        // Add ES Futures data and configure the continuous contract settings
        // so you can update the ATR with a smooth feed of prices.
        _future = AddFuture(
            Futures.Indices.SP500EMini,
            extendedMarketHours: true,
            dataNormalizationMode: DataNormalizationMode.BackwardsRatio,
            dataMappingMode: DataMappingMode.OpenInterest,
            contractDepthOffset: 0
        );
        // Create a calendar consolidator to generate daily ES Futures bars
        // since market starts at 6pm EST to 5pm EST next day.
        var consolidator = new TradeBarConsolidator(FutureTradingHour);
        consolidator.DataConsolidated += OnConsolidated;
        // Warm up the consolidator and ATR indicator.
        var history = History<TradeBar>(_future.Symbol, 14500, Resolution.Minute);
        foreach (var bar in history)
        {
            consolidator.Update(bar);
        }
        // Subscribe the consolidator for automatic updates with ES data.
        SubscriptionManager.AddConsolidator(_future.Symbol, consolidator);
    }

    private CalendarInfo FutureTradingHour(DateTime datetime)
    {
        // Set the open time of the bar to be 6 PM EST.
        var start = datetime.Date.AddHours(datetime.Hour < 18 ? -6 : 18);
        // Set the close time of the bar to be 5 PM EST the next day, which is 23 hours later.
        return new CalendarInfo(start, TimeSpan.FromHours(23));
    }

    private void OnConsolidated(object sender, TradeBar bar)
    {
        // Update the ATR indicator since we want the daily ATR to set the stop price.
        _atr.Update(bar);
        // Set the daily take profit and stop price levels as a function of the previous
        // close price and the ATR.
        _takeProfitPrice = bar.Close + _atr * 2m;
        _stopLossPrice = bar.Close - _atr * 2m;
    }

    public override void OnData(Slice slice)
    {
        // Liquidate the current contract in the continuous contract series
        // when it's about to expire.
        if (slice.SymbolChangedEvents.TryGetValue(_future.Symbol, out var changedEvent))
        {
            Liquidate(changedEvent.OldSymbol);
        }
        // If we're not invested, buy the current contract in the continuous contract series.
        if (!Portfolio.Invested && Securities[_future.Mapped].Price != 0 && _day != slice.Time.Day)
        {
            MarketOrder(_future.Mapped, 1);
        }
        // If we're already invested, check if we've hit the TP/SL level.
        else if (Portfolio.Invested && slice.Bars.TryGetValue(_future.Symbol, out var bar))
        {
            // Check if the current price is above profit taker or below stop loss.
            if (bar.Close > _takeProfitPrice || bar.Close < _stopLossPrice)
            {
                // Exit the position.
                Liquidate();
            }
        }
        // Update the daily tracker so we only update our position once per day.
        _day = slice.Time.Day;
    }
}
class CalendarConsolidatorExampleAlgorithm(QCAlgorithm):

    def initialize(self) -> None:
        self.set_start_date(2021, 1, 1)
        self.set_end_date(2022, 1, 1)
        # Create an ATR indicator to set take profit and stop loss levels.
        self._atr = AverageTrueRange(10)
        # Create a day tracker to avoid overtrading.
        self._day = -1
        # Add ES Futures data and configure the continuous contract settings
        # so you can update the ATR with a smooth feed of prices.
        self._future = self.add_future(
            Futures.Indices.SP_500_E_MINI,
            extended_market_hours=True,
            data_normalization_mode=DataNormalizationMode.BACKWARDS_RATIO,
            data_mapping_mode=DataMappingMode.OPEN_INTEREST,
            contract_depth_offset=0
        )
        # Create a calendar consolidator to generate daily ES Futures bars
        # since market starts at 6pm EST to 5pm EST next day.
        consolidator = TradeBarConsolidator(self._future_trading_hour)
        consolidator.data_consolidated += self._on_consolidated
        # Warm up the consolidator and ATR indicator.
        history = self.history[TradeBar](self._future.symbol, 14500, Resolution.MINUTE)
        for bar in history:
            consolidator.update(bar)
        # Subscribe the consolidator for automatic updates with ES data.
        self.subscription_manager.add_consolidator(self._future.symbol, consolidator)

    def _future_trading_hour(self, dt: datetime) -> CalendarInfo:
        # Set the open time of the bar to be 6 PM EST.
        start = dt.replace(hour=0, minute=0, second=0, microsecond=0)
        if dt.hour < 18:
            start -= timedelta(hours=6)
        else:
            start += timedelta(hours=18)
        # Set the close time of the bar to be 5 PM EST the next day, which is 23 hours later.
        return CalendarInfo(start, timedelta(hours=23))

    def _on_consolidated(self, sender: object, bar: TradeBar) -> None:
        # Update the ATR indicator since we want the daily ATR to set the stop price.
        self._atr.update(bar)
        # Set the daily take profit and stop price levels as a function of the previous
        # close price and the ATR.
        self._take_profit_price = bar.close + self._atr.current.value * 2
        self._stop_loss_price = bar.close - self._atr.current.value * 2

    def on_data(self, slice: Slice) -> None:
        # Liquidate the current contract in the continuous contract series
        # when it's about to expire.
        changed_event = slice.symbol_changed_events.get(self._future.symbol)
        if changed_event:
            self.liquidate(changed_event.old_symbol)
        # If we're not invested, buy the current contract in the continuous contract series.
        if (not self.portfolio.invested and self.securities[self._future.mapped].price and
            self._day != slice.time.day):
            self.market_order(self._future.mapped, 1)
        # If we're already invested, check if we've hit the TP/SL level.
        elif self.portfolio.invested and self._future.symbol in slice.bars:
            current_price = slice.bars[self._future.symbol].close
            # Check if the current price is above profit taker or below stop loss.
            if current_price > self._take_profit_price or current_price < self._stop_loss_price:
                # Exit the position.
                self.liquidate()
        # Update the daily tracker so we only update our position once per day.
        self._day = slice.time.day
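To verify the session boundaries that the period method in this example produces, the same arithmetic can be run on sample timestamps without LEAN. In this sketch, future_trading_hour is a hypothetical standalone copy that returns a plain (start, period) tuple instead of a CalendarInfo, and the sample times are assumptions.

```python
from datetime import datetime, timedelta
from typing import Tuple


def future_trading_hour(dt: datetime) -> Tuple[datetime, timedelta]:
    # Same arithmetic as the example's period method, returning a plain
    # tuple instead of a LEAN CalendarInfo.
    start = dt.replace(hour=0, minute=0, second=0, microsecond=0)
    if dt.hour < 18:
        start -= timedelta(hours=6)   # 6 PM on the previous day
    else:
        start += timedelta(hours=18)  # 6 PM on the same day
    return start, timedelta(hours=23)


for sample in (datetime(2021, 1, 5, 10, 0), datetime(2021, 1, 5, 19, 30)):
    start, period = future_trading_hour(sample)
    print(sample, "->", start, "to", start + period)
```

A 10 AM timestamp on January 5 belongs to the bar that opened at 6 PM on January 4, while a 7:30 PM timestamp on January 5 already belongs to the next bar. Timestamps between 5 PM and 6 PM map to a period that has already ended, which is harmless only as long as no data arrives during that hour.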
Example 2: Create Daily QuoteBar Objects That Start at 5 PM
The following algorithm consolidates minute resolution Forex data into daily QuoteBar objects that open and close at 5 PM Eastern Time.
public class CalendarConsolidatorExampleAlgorithm : QCAlgorithm
{
    private dynamic _pair;

    public override void Initialize()
    {
        SetStartDate(2020, 1, 1);
        // Add the USDJPY Forex pair.
        _pair = AddForex("USDJPY");
        // Create an EMA indicator for the pair.
        _pair.Ema = new ExponentialMovingAverage(10);
        // Create a QuoteBar consolidator with a custom consolidation period.
        var consolidator = new QuoteBarConsolidator(DailyForexConsolidationPeriod);
        // Attach a consolidation handler that will receive the consolidated bars.
        consolidator.DataConsolidated += ConsolidationHandler;
        // Subscribe the consolidator for automatic updates with the prices of the pair.
        SubscriptionManager.AddConsolidator(_pair.Symbol, consolidator);
        // Register the indicator for automatic updates with the consolidated bars.
        RegisterIndicator<IndicatorDataPoint>(_pair.Symbol, _pair.Ema, consolidator);
        // Warm up the consolidator and indicator.
        var history = History<QuoteBar>(_pair.Symbol, 29000, Resolution.Minute);
        foreach (var bar in history)
        {
            consolidator.Update(bar);
        }
    }

    // Define the consolidation period.
    private CalendarInfo DailyForexConsolidationPeriod(DateTime dt)
    {
        // Set the start of the bar to be 5 PM ET.
        var start = dt.Date;
        if (dt.Hour < 17)
        {
            start = start.AddHours(-7);
        }
        else
        {
            start = start.AddHours(17);
        }
        // Set the end of the bar to be 5 PM ET the next day.
        return new CalendarInfo(start, TimeSpan.FromDays(1));
    }

    private void ConsolidationHandler(object sender, QuoteBar consolidatedBar)
    {
        // Wait until the indicator is ready and the algorithm is running.
        if (!_pair.Ema.IsReady || IsWarmingUp)
        {
            return;
        }
        // Plot the closing price and the EMA.
        Plot(consolidatedBar.Symbol.ToString(), "Close", consolidatedBar.Close);
        Plot(consolidatedBar.Symbol.ToString(), "EMA", _pair.Ema.Current.Value);
    }
}
class CalendarConsolidatorExampleAlgorithm(QCAlgorithm):

    def initialize(self) -> None:
        self.set_start_date(2020, 1, 1)
        # Add the USDJPY Forex pair.
        self._pair = self.add_forex("USDJPY")
        # Create an EMA indicator for the pair.
        self._pair.ema = ExponentialMovingAverage(10)
        # Create a QuoteBar consolidator with a custom consolidation period.
        consolidator = QuoteBarConsolidator(self._daily_forex_consolidation_period)
        # Attach a consolidation handler that will receive the consolidated bars.
        consolidator.data_consolidated += self._consolidation_handler
        # Subscribe the consolidator for automatic updates with the prices of the pair.
        self.subscription_manager.add_consolidator(self._pair.symbol, consolidator)
        # Register the indicator for automatic updates with the consolidated bars.
        self.register_indicator(self._pair.symbol, self._pair.ema, consolidator)
        # Warm up the consolidator and indicator.
        history = self.history[QuoteBar](self._pair.symbol, 29000, Resolution.MINUTE)
        for bar in history:
            consolidator.update(bar)

    # Define the consolidation period.
    def _daily_forex_consolidation_period(self, dt: datetime) -> CalendarInfo:
        # Set the start of the bar to be 5 PM ET.
        start = dt.replace(hour=0, minute=0, second=0, microsecond=0)
        if dt.hour < 17:
            start -= timedelta(hours=7)
        else:
            start += timedelta(hours=17)
        # Set the end of the bar to be 5 PM ET the next day.
        return CalendarInfo(start, timedelta(1))

    def _consolidation_handler(self, sender: object, consolidated_bar: QuoteBar) -> None:
        # Wait until the indicator is ready and the algorithm is running.
        if not self._pair.ema.is_ready or self.is_warming_up:
            return
        # Plot the closing price and the EMA.
        self.plot(str(consolidated_bar.symbol), 'Close', consolidated_bar.close)
        self.plot(str(consolidated_bar.symbol), 'EMA', self._pair.ema.current.value)
Example 3: Monthly Candlesticks
The following algorithm consolidates minute-resolution data into monthly TradeBar objects and then plots them as candlesticks.
public class CalendarConsolidatorExampleAlgorithm : QCAlgorithm
{
    public override void Initialize()
    {
        // Add an asset.
        var symbol = AddEquity("SPY").Symbol;
        // Create a consolidator that produces monthly TradeBar objects.
        // This is the standard way:
        //var consolidator = new TradeBarConsolidator(Calendar.Monthly);
        // However, you can accomplish the same thing with a custom period:
        var consolidator = new TradeBarConsolidator(CustomMonthly);
        // Add a consolidation handler that plots the monthly bars as candlesticks.
        consolidator.DataConsolidated += (_, bar) => Plot(
            "Monthly Candlestick", symbol.Value, bar.Open, bar.High, bar.Low, bar.Close
        );
        // Register the consolidator for automatic updates.
        SubscriptionManager.AddConsolidator(symbol, consolidator);
    }

    public CalendarInfo CustomMonthly(DateTime dt)
    {
        // Set the start time to the beginning of the month.
        var start = dt.AddDays(1 - dt.Day).Date;
        // Set the end time to the beginning of next month.
        var end = start.AddMonths(1);
        return new CalendarInfo(start, (end - start));
    }
}
from dateutil.relativedelta import relativedelta

class CalendarConsolidatorExampleAlgorithm(QCAlgorithm):

    def initialize(self) -> None:
        # Add an asset.
        symbol = self.add_equity("SPY").symbol
        # Create a consolidator that produces monthly TradeBar objects.
        # This is the standard way:
        #consolidator = TradeBarConsolidator(Calendar.MONTHLY)
        # However, you can accomplish the same thing with a custom period:
        consolidator = TradeBarConsolidator(self._custom_monthly)
        # Add a consolidation handler that plots the monthly bars as candlesticks.
        consolidator.data_consolidated += lambda _, bar: self.plot(
            "Monthly Candlestick", symbol.value, bar.open, bar.high, bar.low, bar.close
        )
        # Register the consolidator for automatic updates.
        self.subscription_manager.add_consolidator(symbol, consolidator)

    def _custom_monthly(self, dt: datetime) -> CalendarInfo:
        # Set the start time to the beginning of the month.
        start = dt.replace(day=1).date()
        # Set the end time to the beginning of next month.
        end = start + relativedelta(months=1)
        return CalendarInfo(start, end - start)
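Because months differ in length, the period returned by a custom monthly method changes from bar to bar. The following sketch checks that with the standard library only. It is not the example's code: custom_monthly is a hypothetical standalone function that returns a plain (start, period) tuple and replaces relativedelta with explicit month and year arithmetic.

```python
from datetime import datetime, timedelta
from typing import Tuple


def custom_monthly(dt: datetime) -> Tuple[datetime, timedelta]:
    # Start of the month that contains dt.
    start = dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    # Start of the next month, rolling the year over after December.
    if start.month == 12:
        end = start.replace(year=start.year + 1, month=1)
    else:
        end = start.replace(month=start.month + 1)
    return start, end - start


print(custom_monthly(datetime(2021, 2, 15, 9, 31))[1].days)    # 28
print(custom_monthly(datetime(2020, 2, 15, 9, 31))[1].days)    # 29
print(custom_monthly(datetime(2021, 12, 31, 15, 59))[1].days)  # 31
```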
Other Examples
For more examples, see the following algorithms: