Importing Data

Bulk Downloads

Introduction

There are two techniques to import data into your algorithm. You can either manually import the entire file or stream the file line-by-line into your algorithm's OnData (C#) or on_data (Python) event. This page explores importing an entire file for manual use.

Instead of downloading the file from a remote file provider, you can upload the file to the Object Store (with the Algorithm Lab or with the CLI) for faster execution.

Recommended Use Cases

The batch import technique is outside of LEAN's awareness or control, so LEAN can't enforce good practices. However, the batch import technique is good for loading the following datasets:

  • Loading data into the Object Store
  • Trained AI Models
  • Well-defined historical price datasets
  • Parameters and setting imports such as Symbol lists
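
As a rough illustration of the last use case, the following sketch parses a downloaded ticker list into clean ticker strings. The file layout (one ticker per line with an optional "ticker" header row) and the helper name parse_ticker_list are assumptions for illustration, not part of the LEAN API.

```python
def parse_ticker_list(text: str) -> list[str]:
    """Parse text with one ticker per line into a de-duplicated list.

    Hypothetical helper: assumes an optional 'ticker' header row and
    ignores blank lines and surrounding whitespace.
    """
    tickers = []
    for line in text.splitlines():
        ticker = line.strip().upper()
        # Skip blank lines and the assumed header row.
        if not ticker or ticker == "TICKER":
            continue
        # Keep the first occurrence of each ticker, preserving file order.
        if ticker not in tickers:
            tickers.append(ticker)
    return tickers


# Stand-in for the string that self.download("<filePathOrURL>") returns.
sample = "ticker\nspy\nAAPL\n\nmsft \nSPY\n"
tickers = parse_ticker_list(sample)
```

Inside an algorithm, you would typically loop over the result and subscribe to each ticker.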

Download Files

The Download (C#) or download (Python) method downloads the content served from a local file or URL and then returns it as a string.

Basic Usage

// C#
var file = Download("<filePathOrURL>");

# Python
file = self.download("<filePathOrURL>")

# If your file is in CSV format, convert it to a DataFrame with the `read_csv` method.
from io import StringIO
import pandas as pd
df = pd.read_csv(StringIO(file))

# If your file is in JSON format, parse it with the `loads` method.
import json
data = json.loads(file)

# If your file is in XML format, parse it with the `fromstring` method.
import xml.etree.ElementTree as ET
root = ET.fromstring(file)

Download Method Arguments

The Download (C#) or download (Python) method can accept header settings, a username, and a password for authentication.

Argument (C# / Python) | Data Type (C# / Python) | Description | Default Value (C# / Python)
address | string / str | A string containing the URI to download | (none)
headers | IEnumerable<KeyValuePair<string, string>> / Dict[str,str] | Defines header values to add to the request | Enumerable.Empty<KeyValuePair<string, string>>() / dict()
userName / user_name | string / str | The user name associated with the credentials | null / None
password | string / str | The password for the user name associated with the credentials | null / None

Download Request Headers

// C#
var headers = new Dictionary<string, string> { { "1", "1" } };
Download(address, headers);
Download(address, headers, userName, password);

# Python
headers = { "1": "1" }
self.download(address, headers)
self.download(address, headers, user_name, password)
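
For a slightly more realistic case, many HTTP endpoints expect a token in an Authorization header. The sketch below builds such a dictionary. The helper name build_auth_headers, the Bearer scheme, the Accept value, and the token placeholder are all assumptions for illustration, so check what your file provider actually requires.

```python
def build_auth_headers(token: str) -> dict[str, str]:
    """Build request headers for a hypothetical token-protected endpoint."""
    return {
        # Assumed scheme; some providers use a custom header name instead.
        "Authorization": f"Bearer {token}",
        "Accept": "text/csv",
    }


headers = build_auth_headers("<yourToken>")
# Inside an algorithm, pass the dictionary as the second argument:
# content = self.download("<fileURL>", headers)
```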

Save Files

When you download a remote file, save it into the Object Store so that you don't have to download the file again. If you need to import the file multiple times, it's faster to import it from the Object Store rather than repeatedly downloading the file from the remote file provider.
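
One way to sketch this download-once pattern is shown below. The helper name get_text_with_cache and the key "my_data" are hypothetical. The helper expects a store that exposes contains_key, read, and save (in an algorithm, self.object_store) and a download callable (in an algorithm, self.download). The in-memory store and fake download exist only so the sketch runs outside LEAN.

```python
def get_text_with_cache(store, download, key: str, url: str) -> str:
    """Return the text stored under `key`, downloading and caching it on a miss."""
    if store.contains_key(key):
        return store.read(key)
    content = download(url)
    # Only cache non-empty content so a failed download isn't persisted.
    if content:
        store.save(key, content)
    return content


class InMemoryStore:
    """Stand-in for the Object Store so the sketch runs outside LEAN."""

    def __init__(self):
        self._data = {}

    def contains_key(self, key):
        return key in self._data

    def read(self, key):
        return self._data[key]

    def save(self, key, text):
        self._data[key] = text
        return True


calls = []

def fake_download(url):
    # Record each call so we can confirm the second request hits the cache.
    calls.append(url)
    return "a,b\n1,2\n"


store = InMemoryStore()
first = get_text_with_cache(store, fake_download, "my_data", "<fileURL>")
second = get_text_with_cache(store, fake_download, "my_data", "<fileURL>")
```

In an algorithm, the equivalent call would be get_text_with_cache(self.object_store, self.download, "my_data", "<fileURL>").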

Transport Binary Data

Follow these steps to transport binary files with joblib:

  1. Add the following imports to your local program:
      import joblib
      from io import BytesIO
      from base64 import b64encode, b64decode
  2. Serialize your object and save it to a file. This Base64 serialization is necessary because the default joblib.dump serialization produces a binary file that the download method can't read without data corruption.
      buffer = BytesIO()
      joblib.dump(my_object, buffer)
      base64_str = b64encode(buffer.getvalue()).decode('ascii')
      with open("my_model.b64", "w") as f:
          f.write(base64_str)
  3. Save the file to one of the supported sources. For example, save the file to Dropbox.
  4. Download the remote file into your project.
      base64_str = self.download("<fileURL>")
  5. Restore the object.
      model_bytes = b64decode(base64_str.encode('ascii'))
      restored_model = joblib.load(BytesIO(model_bytes))
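
To see why the Base64 step matters, the following sketch pushes serialized bytes through a text decode and encode cycle, which is roughly what happens when binary content is returned as a string. It uses pickle from the standard library as a stand-in for joblib, and the UTF-8 decoding is an assumption about the failure mode rather than a description of LEAN's internals.

```python
import pickle
from base64 import b64encode, b64decode

raw = pickle.dumps({"weights": [0.1, 0.2, 0.3]})

# Forcing the raw bytes through a text round trip loses information,
# because the pickle stream contains bytes that are not valid UTF-8.
as_text = raw.decode("utf-8", errors="replace")
survives_as_text = as_text.encode("utf-8") == raw

# Base64 text is plain ASCII, so it passes through a string unchanged.
b64_text = b64encode(raw).decode("ascii")
survives_as_base64 = b64decode(b64_text.encode("ascii")) == raw
```

Here survives_as_text is False and survives_as_base64 is True, which is the reason the steps above write a .b64 text file instead of the raw dump.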

Follow these steps to transport binary files with pickle:

  1. Add the following imports to your local program:
      import pickle
      from base64 import b64encode, b64decode
  2. Serialize your object and save it to a file.
      pickle_bytes = pickle.dumps(my_object)
      base64_str = b64encode(pickle_bytes).decode('ascii')
      with open("my_model.b64", "w") as f:
          f.write(base64_str)
  3. Save the file to one of the supported sources. For example, save the file to Dropbox.
  4. Download the remote file into your project.
      base64_str = self.download("<fileURL>")
  5. Restore the object.
      model_bytes = b64decode(base64_str.encode('ascii'))
      restored_model = pickle.loads(model_bytes)
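
Before you upload the file, you can check the whole round trip locally. The sketch below runs the pickle steps end to end against a temporary file. The sample object is made up for illustration, and reading the file from disk stands in for the self.download call.

```python
import os
import pickle
import tempfile
from base64 import b64encode, b64decode

# Hypothetical object to transport; any picklable object works.
my_object = {"symbols": ["SPY", "QQQ"], "lookback": 20}

# Local side: serialize, Base64-encode, and write the text file to upload.
path = os.path.join(tempfile.mkdtemp(), "my_model.b64")
with open(path, "w") as f:
    f.write(b64encode(pickle.dumps(my_object)).decode("ascii"))

# Algorithm side: this read stands in for self.download("<fileURL>").
with open(path) as f:
    base64_str = f.read()

restored = pickle.loads(b64decode(base64_str.encode("ascii")))
```

If restored equals my_object, the file is safe to upload to your file provider.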

Examples

The following examples demonstrate common practices for bulk downloading data.

Example 1: Download Machine Learning Model

The following algorithm uses a Scikit-Learn machine learning model to predict SPY price changes and places orders according to the prediction. To obtain the model, the algorithm retrieves it from the Object Store if it's already saved there. Otherwise, it downloads the model from a Dropbox link with the download method.

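# Provides QCAlgorithm, Resolution, TradeBar, RollingWindow, and np.
from AlgorithmImports import *
import joblib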
from io import BytesIO
from base64 import b64decode, b64encode
from sklearn.svm import SVR
from sklearn.model_selection import GridSearchCV

class BulkDownloadExampleAlgorithm(QCAlgorithm):
    def initialize(self) -> None:
        self.set_start_date(2024, 9, 1)
        self.set_end_date(2024, 12, 31)
        self.set_cash(100000)
        # Request SPY data for model training, prediction, and trading.
        self._symbol = self.add_equity("SPY", Resolution.DAILY).symbol

        # 2-year data to train the model.
        training_length = 252*2
        self.training_data = RollingWindow(training_length)
        # Warm up the training dataset to train the model immediately.
        history = self.history[TradeBar](self._symbol, training_length, Resolution.DAILY)
        for trade_bar in history:
            self.training_data.add(trade_bar)

        # Retrieve the already trained model from the object store for immediate use.
        if self.object_store.contains_key("sklearn_model"):
            self.model = joblib.load(self.object_store.get_file_path("sklearn_model"))
        # Otherwise, bulk-download the base64-encoded model from an external source (Dropbox in this example).
        else:
            url = "https://www.dropbox.com/scl/fi/tj7wmpv1u2ysvejb0skpm/sklearn_model_b64?rlkey=pvcnuvqrp78oz1t73rytgnvdx&dl=1"
            base64_str = self.download(url)
            model_bytes = b64decode(base64_str.encode('ascii'))
            self.model = joblib.load(BytesIO(model_bytes))

        # Train the model to use the prediction right away.
        self.train(self.my_training_method)
        # Recalibrate the model weekly to ensure its accuracy on the updated domain.
        self.train(self.date_rules.every(DayOfWeek.SUNDAY), self.time_rules.at(8,0), self.my_training_method)
        
    def get_features_and_labels(self, n_steps=5) -> tuple:
        # Train and predict the return data, which is more normalized and stationary.
        training_df = self.pandas_converter.get_data_frame[TradeBar](list(self.training_data)[::-1])
        daily_pct_change = training_df.pct_change().dropna()

        # Stack n_steps days of OHLCV returns into each sample; the label is the next day's close return.
        features = []
        labels = []
        for i in range(len(daily_pct_change)-n_steps):
            features.append(daily_pct_change.iloc[i:i+n_steps].values.flatten())
            labels.append(daily_pct_change['close'].iloc[i+n_steps])
        features = np.array(features)
        labels = np.array(labels)

        return features, labels

    def my_training_method(self) -> None:
        # Prepare the processed training data.
        features, labels = self.get_features_and_labels()
        # Recalibrate the model based on updated data.
        if isinstance(self.model, GridSearchCV):
            self.model = self.model.fit(features, labels).best_estimator_
        else:
            self.model = self.model.fit(features, labels)

    def on_data(self, slice: Slice) -> None:
        bar = slice.bars.get(self._symbol)
        if bar:
            self.training_data.add(bar)

        # Get predictions by the updated features.
        features, _ = self.get_features_and_labels()
        prediction = self.model.predict(features[-1].reshape(1, -1))
        # predict returns a one-element array, so extract the scalar explicitly.
        prediction = float(prediction[0])

        # If the predicted direction is going upward, buy SPY.
        if prediction > 0:
            self.set_holdings(self._symbol, 1)
        # If the predicted direction is going downward, sell SPY.
        elif prediction < 0:            
            self.set_holdings(self._symbol, -1)

    def on_end_of_algorithm(self) -> None:
        # Store the model in the object store to retrieve it in other instances if the algorithm stops.
        model_key = "sklearn_model"
        joblib.dump(self.model, self.object_store.get_file_path(model_key))
        self._save_base64_encoded_version(model_key)

    def _save_base64_encoded_version(self, model_key):
        # Save a base64-encoded version of the model to upload to an external source (e.g., Dropbox).
        buffer = BytesIO()
        joblib.dump(self.model, buffer)
        content = b64encode(buffer.getvalue()).decode('ascii')
        path = self.object_store.get_file_path(f"{model_key}_b64")
        self.object_store.save(path, content)

        # Locally, you may want to use the open method
        #with open(path, "w") as f:
        #    f.write(content)
