Consolidating Data

Key Concepts

Introduction

Consolidating data allows you to create bars of any length from smaller bars. Consolidation is commonly used to combine one-minute price bars into longer bars such as 10-20 minute bars. Consolidated bars are helpful because price movement over a longer period can sometimes contain less noise and bars of exotic length are less researched than standard bars, so they may present more opportunities to capture alpha.

To consolidate data, create a Consolidator object and register it for data. The built-in consolidators make it easy to create consolidated bars without introducing bugs. In the following sections, we will introduce the different types of consolidators and show you how to shape data into any form.

Data Shapes and Sizes

Consolidators usually produce output data that is the same format as the input data. They aggregate small TradeBar objects into a large TradeBar, small QuoteBar objects into a large QuoteBar, and Tick objects into either a TradeBar or QuoteBar.

Many asset classes in QuantConnect have data for both trades and quotes. By default, consolidators aggregate data into TradeBar objects. Forex data is the only exception, which consolidators aggregate into QuoteBar objects since Forex data doesn't have TradeBar objects.

Create Consolidators

The process to manually create a consolidator depends on the type of consolidator you want to create. For instructions on each type of consolidator, see Consolidator Types.

To create consolidators with the shortcut method, call the CreateConsolidator method. The timedeltaTimeSpan you pass to the method should be at a lower resolution than the security resolution. For instance, if the security resolution is minute, the timedeltaTimeSpan should span at least 1 minute.

# Create a daily TradeBar consolidator
self.tradebar_consolidator = self.CreateConsolidator(timedelta(days=1), TradeBar)

# Create a minute QuoteBar consolidator
self.quotebar_consolidator = self.CreateConsolidator(timedelta(minutes=1), QuoteBar)

# Create a tick consolidator
self.tick_consolidator = self.CreateConsolidator(timedelta(milliseconds=1), Tick)

# Create a tick quote consolidator
self.tick_quote_consolidator = self.CreateConsolidator(timedelta(milliseconds=1), Tick, TickType.Quote)
// Create a daily TradeBar consolidator
var tradebarConsolidator = CreateConsolidator(TimeSpan.FromDays(1), typeof(TradeBar));

// Create a minute QuoteBar consolidator
var quoteBarConsolidator = CreateConsolidator(TimeSpan.FromMinutes(1), typeof(QuoteBar));

// Create a tick consolidator
var tickConsolidator = CreateConsolidator(TimeSpan.FromMilliseconds(1), typeof(Tick));

// Create a tick quote consolidator
var tickQuoteConsolidator = CreateConsolidator(TimeSpan.FromMilliseconds(1), typeof(Tick), TickType.Quote);

To create the default consolidator for a specific Symbol and resolution, call the ResolveConsolidator method. If the data resolution supports both TradeBar and QuoteBar objects, the method returns a TradeBarConsolidator.

# Create a 10-day consolidator
self.ten_day_consolidator = self.ResolveConsolidator(symbol, timedelta(days=10))

# Create a 1-minute consolidator
self.one_min_consolidator = self.ResolveConsolidator(symbol, Resolution.Minute)

# Create a tick consolidator
self.tick_quote_consolidator = self.ResolveConsolidator(symbol, Resolution.Tick)
// Create a 10-day consolidator
var tenDayConsolidator = ResolveConsolidator(symbol, TimeSpan.FromDays(10));

// Create a 1-minute consolidator
var oneMinuteConsolidator = ResolveConsolidator(symbol, Resolution.Minute);

// Create a tick consolidator
var tickConsolidator = ResolveConsolidator(symbol, Resolution.Tick);

Receive Consolidated Bars

LEAN passes consolidated bars to the consolidator event handler in your algorithm. To add an event handler to a consolidator, attach the method name to the DataConsolidated property. If you call the Consolidate method to create a consolidator, pass the event handler method name to the Consolidate method instead.

_consolidator.DataConsolidated += ConsolidationHandler;
self.consolidator.DataConsolidated += self.consolidation_handler

The most common error when creating consolidators is to put parenthesis () at the end of your method name when setting the event handler of the consolidator. If you use parenthesis, the method executes and the result is passed as the event handler instead of the method itself. Remember to pass the name of your method to the event system. Specifically, it should be ConsolidationHandlerself.consolidation_handler, not ConsolidationHandler()self.consolidation_handler().

The event handler can have any name, but it must have the required parameters. Depending on how you're using the consolidator system, you must use one of the following method headers. If you call a consolidator constructor to create a consolidator, the event handler accepts two parameters. If you call the Consolidate method to create a consolidator, the event handler accepts one parameter.

// Manually Created Event Handler
void ConsolidationHandler(object sender, TradeBar consolidatedBar)
{

}

void ConsolidationHandler(object sender, RenkoBar consolidatedBar)
{

}


// Consolidate() Event Handler
void ConsolidationHandler(TradeBar consolidatedBar)
{

}
# Manually Created Event Handlers
def consolidation_handler(self, sender: object, consolidated_bar: TradeBar) -> None:
    pass

def consolidation_handler(self, sender: object, consolidated_bar: RenkoBar) -> None:
    pass


# self.Consolidate() Event Handler
def consolidation_handler(self, consolidated_bar: TradeBar) -> None:
    pass

The consolidation event handlers receive bars when the consolidated bar closes based on the data time zone. If you subscribe to minute resolution data for Bitcoin and create an hourly consolidator, you receive consolidated bars at the top of each hour. However, if you subscribe to minute resolution data for the regular trading hours of US Equities and create a daily consolidator, you receive consolidated bars at 9:31 AM Eastern Time (ET). The consolidated bar for US Equities doesn't close at 4:00 PM ET because the day isn't over. The consolidated bar for US Equities also doesn't close at midnight because your algorithm doesn't receive minute resolution data after 4:00 PM ET until 9:31 AM ET.

Update Aggregated Bars

You can update the consolidator manually or automatically.

Manual Updates

To manually update a consolidator, call its Update method. Depending on the type of consolidator it is, pass a Tick, TradeBar, or QuoteBar to the method. You can update the consolidator with data from the Slice object in the OnData method or with data from a history request.

# Example 1: Update the consolidator with data from the Slice object
def OnData(self, slice: Slice) -> None:
    trade_bar = slice.Bars[self.symbol]
    self.consolidator.Update(trade_bar)

# Example 2.1: Update the consolidator with data from a history request
history = self.History(self.symbol, 30, Resolution.Minute)
for index, row in history.iterrows():
    trade_bar = TradeBar(index[1], self.symbol, row.open, row.high, row.low, row.close, row.volume)
    self.consolidator.Update(trade_bar)

# Example 2.2: Update the consolidator with data from a tradebar history request
history = self.History[TradeBar](self.symbol, 30, Resolution.Minute)
for trade_bar in history:
    self.consolidator.Update(trade_bar)
// Example 1: Update the consolidator with data from the Slice object
public override void OnData(Slice slice)
{
    var tradeBar = slice.Bars[_symbol];
    _consolidator.Update(tradeBar);
}

// Example 2: Update the consolidator with data from a history request
var history = History<TradeBar>(_symbol, 30, Resolution.Minute);
foreach (var tradeBar in history)
{
    _consolidator.Update(tradeBar);
}

Automatic Updates

To automatically update a consolidator with a data subscription, call the AddConsolidator method on the Subscription Manager.

def Initialize(self) -> None:
    # Create the security subscription
    self.symbol = self.AddEquity("QQQ", Resolution.Minute).Symbol

    # Create the consolidator and attach the event handler
    self.consolidator = TradeBarConsolidator(timedelta(minutes=30))
    self.consolidator.DataConsolidated += self.consolidation_handler

    # Register the consolidator for automatic updates
    self.SubscriptionManager.AddConsolidator(self.symbol, self.consolidator)
public override void Initialize()
{ 
     // Create the security subscription
    _symbol = AddEquity("QQQ", Resolution.Minute).Symbol;

    // Create the consolidator and attach the event handler
    _consolidator = new TradeBarConsolidator(TimeSpan.FromMinutes(30));
    _consolidator.DataConsolidated += ConsolidationHandler;

    // Register the consolidator for automatic updates
    SubscriptionManager.AddConsolidator(_symbol, _consolidator);
}

Execution Sequence

The algorithm manager calls events in the following order:

  1. Scheduled Events
  2. Consolidation event handlers
  3. OnData event handler

This event flow is important to note. For instance, if your consolidation handlers or OnData event handler appends data to a RollingWindow and you use that RollingWindow in your Scheduled Event, when the Scheduled Event executes, the RollingWindow won't contain the most recent data.

Remove Consolidators

If you manually create a consolidator for a universe subscription, remove it once the security leaves your universe. If you don't remove consolidators, they compound internally, causing your algorithm to slow down and eventually die once it runs out of RAM.

To remove a consolidator, call the RemoveConsolidator method. You need to save a reference to each consolidator you want to remove, so we recommend you use a class to organize all of the Symbol-specific objects you create over the lifetime of a security in your universe. For an example of removing consolidators from universe subscriptions, see the GasAndCrudeOilEnergyCorrelationAlphaGasAndCrudeOilEnergyCorrelationAlpha in the LEAN GitHub repository.

SubscriptionManager.RemoveConsolidator(symbol, myConsolidator)
self.SubscriptionManager.RemoveConsolidator(self.symbol, self.consolidator)

Consolidator History

To access historical bars that were passed to your consolidation handler, save the bars as they are produced. You can use a RollingWindow to save the consolidated bars and easily access them later on in your algorithm.

// Create a class member to store the RollingWindow
_window = new RollingWindow<TradeBar>(2);

// In the consolidation handler, add consolidated bars to the RollingWindow
private void ConsolidationHandler(object sender, TradeBar consolidatedBar)
{
    _window.Add(consolidatedBar);
}
# Create a class member to store the RollingWindow
self.window = RollingWindow[TradeBar](2)

# In the consolidation handler, add consolidated bars to the RollingWindow
def consolidation_handler(self, sender: object, consolidated_bar: TradeBar) -> None:
    self.window.Add(consolidated_bar)

You can also see our Videos. You can also get in touch with us via Discord.

Did you find this page helpful?

Contribute to the documentation: