Streaming Data

Precomputed ML Predictions

Introduction

QuantConnect's point-in-time backtesting engine is one of its greatest strengths: in machine learning strategies, it forces every feature and prediction to be calculated using only the data available on that historical day, which delivers realism and eliminates look-ahead bias. The trade-off is that this discipline means every small change to your portfolio rules normally triggers a full rerun of the feature-engineering and model-inference pipeline inside the backtest engine.

To avoid this cost, you can precompute the machine learning predictions in the Research Environment and then stream them into an algorithm as a custom universe dataset. This two-file pattern separates the expensive feature engineering and model training from the algorithm, which becomes a pure portfolio engine. It provides the following benefits:

  • Speed - Backtests run much faster because they skip feature calculation and model inference.
  • Iteration speed - Test many thresholds or risk overlays without rerunning the ML pipeline.
  • Reproducibility - Predictions are frozen in the Object Store, so there is no drift between backtest runs.

This page explains how to implement a project for precomputing machine learning predictions in the Research Environment and streaming them into a backtest as a custom universe dataset.

Create ML Predictions

In a Research Environment, gather universe and price history, calculate factors and labels, train a model, and export daily predictions as JSON to the Object Store. When you define factors in the notebook, strictly use trailing data as inputs. If you use future data, you may introduce look-ahead bias into the backtest.

import json
from datetime import datetime, timedelta

import pandas as pd
from sklearn.ensemble import GradientBoostingRegressor

# Define the training and validation periods.
validation_end = datetime(2026, 3, 12)  # or datetime.now()
validation_start = validation_end - timedelta(30)
train_end = validation_start
train_start = train_end - timedelta(90)

# Get the SP500 constituents history.
qb = QuantBook()
spy = qb.add_equity('SPY')
universe = qb.add_universe(qb.universe.etf(spy))
universe_history = qb.universe_history(universe, train_start, validation_end)
symbols_by_date = {
    time: [constituent.symbol for constituent in constituents]  # Timestamps are midnight Tuesday-Saturday
    for (_, time), constituents in universe_history.items()
}

# Get price history for all the assets in the universe.
symbols = list(set().union(*symbols_by_date.values()))
start_buffer = timedelta(40)  # Add buffers to warm up factors and labels
end_buffer = timedelta(5)
history = qb.history(symbols, train_start-start_buffer, validation_end+end_buffer, Resolution.DAILY)

# Calculate the feature and labels.
def add_features_and_label(asset_history):
    open_ = asset_history["open"]
    close = asset_history["close"]
    volume = asset_history["volume"]
    daily_return = close.pct_change()
    dataset = pd.DataFrame()
    # Feature 1: 10-day momentum
    dataset["momentum_10d"] = close / close.shift(10) - 1
    # Feature 2: 20-day volatility
    dataset["volatility_20d"] = daily_return.rolling(window=20).std()
    # Feature 3: Relative volume
    dataset["relative_volume"] = volume / volume.rolling(window=20).mean()
    # Label: return from next open to following open
    # The rebalancing logic in the algorithm uses market-on-open orders. 
    # The opening auction is the earliest possible time to trade after 
    # the factors update at market close.
    dataset["label"] = open_.shift(-2) / open_.shift(-1) - 1
    return dataset

dataset = history.groupby(level='symbol').apply(add_features_and_label).droplevel(0)

# Remove rows from the dataset for when the asset wouldn't have been in the universe.
indices = []
rows = []
for universe_dt, symbols in symbols_by_date.items():
    previous_close_dt = spy.exchange.hours.get_next_market_close(universe_dt - timedelta(1), False)
    for symbol in symbols:
        indices.append((universe_dt, symbol))
        rows.append(dataset.loc[(symbol, previous_close_dt)])
cleaned_dataset = pd.DataFrame(rows, index=pd.MultiIndex.from_tuples(indices, names=['time', 'symbol']))

# Split the dataset into train and validation subsets (and drop NaN rows).
t = cleaned_dataset.index.get_level_values("time")
train_data = cleaned_dataset[(train_start <= t) & (t < validation_start)].dropna()
validation_data = cleaned_dataset[(validation_start <= t) & (t <= validation_end)].dropna()
X_train = train_data.drop('label', axis=1)
y_train = train_data['label']
X_val = validation_data.drop('label', axis=1)
y_val = validation_data['label']

# Train the model.
model = GradientBoostingRegressor(
    n_estimators=100,
    learning_rate=0.1,
    max_depth=3,
    random_state=42
)
model.fit(X_train, y_train)

# Calculate predictions on the validation set.
predictions = pd.Series(model.predict(X_val), index=X_val.index)

# Convert the predictions Series into a JSON object.
df = predictions.rename("prediction").reset_index()
df['symbol'] = df['symbol'].apply(str)
df['time'] = df['time'].dt.strftime('%Y-%m-%d')
predictions_json = [
    {
        'date': date,
        'prediction_by_symbol': group.set_index('symbol')['prediction'].to_dict()
    }
    for date, group in df.groupby('time')
]
# Save the predictions JSON to the Object Store.
key = 'research-to-backtest-factors.json'
qb.object_store.save(key, json.dumps(predictions_json))
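Before wiring up the algorithm, you can sanity-check the serialization step on a small synthetic Series. The sketch below applies the same grouping logic as the notebook; the symbol identifiers and dates are illustrative placeholders, not real security IDs:

```python
import json
from datetime import datetime

import pandas as pd

# Build a tiny predictions Series with the same (time, symbol) MultiIndex
# shape that the validation predictions have in the notebook above.
index = pd.MultiIndex.from_tuples(
    [
        (datetime(2026, 3, 10), "AAPL XYZ"),  # hypothetical symbol IDs
        (datetime(2026, 3, 10), "MSFT ABC"),
        (datetime(2026, 3, 11), "AAPL XYZ"),
    ],
    names=["time", "symbol"],
)
predictions = pd.Series([0.01, -0.02, 0.03], index=index)

# Apply the same transformation as the notebook: one JSON object per day,
# mapping each symbol to its predicted return.
df = predictions.rename("prediction").reset_index()
df["symbol"] = df["symbol"].apply(str)
df["time"] = df["time"].dt.strftime("%Y-%m-%d")
predictions_json = [
    {
        "date": date,
        "prediction_by_symbol": group.set_index("symbol")["prediction"].to_dict(),
    }
    for date, group in df.groupby("time")
]

print(json.dumps(predictions_json, indent=2))
```

Each element of the resulting list holds one day of predictions, which is exactly the shape the custom universe reader in the next section unfolds.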

Define a Custom Universe

In the main.py file of your project, define a custom data type that reads the JSON predictions from the Object Store.

class PredictionUniverse(PythonData):

    def get_source(self, config, date, is_live_mode):
        # Read from the JSON file we saved in the Object Store.
        return SubscriptionDataSource(
            'research-to-backtest-factors.json', 
            SubscriptionTransportMedium.OBJECT_STORE, 
            FileFormat.UNFOLDING_COLLECTION
        )

    def reader(self, config, line, date, is_live):
        objects = []
        # Iterate through each day.
        for obj in json.loads(line):
            end_time = datetime.strptime(obj["date"], "%Y-%m-%d")
            # Iterate through each stock on this day.
            for symbol, prediction in obj['prediction_by_symbol'].items():
                stock = PredictionUniverse()
                stock.symbol = Symbol(SecurityIdentifier.parse(symbol), "")
                stock.end_time = end_time
                stock.value = prediction
                objects.append(stock)
        # The BaseDataCollection contains multiple PredictionUniverse objects
        # for each day. Set its `time` to match the `end_time` of one of the objects.
        # We pack the collection and timestamp it with the last timestamp (-1) to signal 
        # that the collection has data prior to (or including) that timestamp.
        return BaseDataCollection(objects[-1].end_time, config.symbol, objects)
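Setting the LEAN types aside, the parsing in reader reduces to a flat unfold of the JSON collection: one record per stock per day. A minimal plain-Python sketch of that step, with tuples standing in for PredictionUniverse objects and illustrative symbol IDs:

```python
import json
from datetime import datetime

# One line of the Object Store file: a list of daily objects, each mapping
# symbols to predicted returns (symbol IDs here are illustrative).
line = json.dumps([
    {"date": "2026-03-10", "prediction_by_symbol": {"AAPL XYZ": 0.01, "MSFT ABC": -0.02}},
    {"date": "2026-03-11", "prediction_by_symbol": {"AAPL XYZ": 0.03}},
])

# Unfold the collection into one (end_time, symbol, value) record per stock
# per day, mirroring how reader emits one PredictionUniverse object each.
records = [
    (datetime.strptime(obj["date"], "%Y-%m-%d"), symbol, prediction)
    for obj in json.loads(line)
    for symbol, prediction in obj["prediction_by_symbol"].items()
]

for end_time, symbol, value in records:
    print(end_time.date(), symbol, value)
```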

Stream Predictions in Backtests

In the main.py file of your project, subscribe to the custom universe type you defined in the preceding step. To rebalance the portfolio on each new set of daily predictions, add a Scheduled Event.

class PredictionUniverseAlgorithm(QCAlgorithm):

    def initialize(self):
        self.set_start_date(2026, 2, 10) 
        self.set_cash(100_000)
        self.settings.seed_initial_prices = True
        self._return_prediction_threshold = 0
        # Add the universe of custom data.
        self.universe_settings.resolution = Resolution.DAILY
        self._universe = self.add_universe(PredictionUniverse, self._select_assets)
        # Add SPY so the Scheduled Event below can reference its trading calendar.
        self.add_equity('SPY')
        # Add a Scheduled Event to rebalance the portfolio every day.
        self.schedule.on(self.date_rules.every_day('SPY'), self.time_rules.at(8, 0), self._rebalance)

    def _select_assets(self, data):
        # Select the assets with a return prediction above some threshold.
        return [stock.symbol for stock in data if stock.value > self._return_prediction_threshold]
    
    def _rebalance(self):
        # Form an equal-weighted portfolio with the assets in the universe.
        symbols = self._universe.selected
        if not symbols:
            return
        targets = [PortfolioTarget(symbol, 1/len(symbols)) for symbol in symbols]
        self.set_holdings(targets, True)
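Because the predictions are frozen in the Object Store, sweeping _return_prediction_threshold only re-runs the cheap selection step, never the ML pipeline. The selection and equal-weighting logic above boils down to the following sketch (the predictions dictionary is hypothetical):

```python
# Hypothetical daily predictions (symbol -> predicted next-day return).
prediction_by_symbol = {"AAA": 0.012, "BBB": -0.004, "CCC": 0.007, "DDD": 0.0}

def target_weights(prediction_by_symbol, threshold=0.0):
    """Replicate _select_assets + _rebalance: keep symbols whose predicted
    return exceeds the threshold, then equal-weight the survivors."""
    selected = [s for s, p in prediction_by_symbol.items() if p > threshold]
    if not selected:
        return {}
    return {s: 1 / len(selected) for s in selected}

# Each threshold produces a different portfolio from the same frozen
# predictions; no feature engineering or model inference is repeated.
for threshold in (0.0, 0.005, 0.01):
    print(threshold, target_weights(prediction_by_symbol, threshold))
```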
