Importing Data
Bulk Downloads
Introduction
There are two techniques to import data into your algorithm. You can either manually import the entire file or stream the file line-by-line into your algorithm's on_data event. This page explores importing an entire file for manual use.
Instead of downloading the file from a remote file provider, you can upload the file to the Object Store (with the Algorithm Lab or with the CLI) for faster execution.
Recommended Use Cases
The batch import technique is outside of LEAN's awareness or control, so LEAN can't enforce good practices. However, the batch import technique is a good fit for loading the following datasets:
- Loading data into the Object Store
- Trained AI models
- Well-defined historical price datasets
- Parameter and setting imports, such as symbol lists
Download Files
The download method (Download in C#) downloads the content served from a local file or URL and then returns it as a string.
Basic Usage
// C#
var file = Download("<filePathOrURL>");

# Python
file = self.download("<filePathOrURL>")
# If your file is in CSV format, convert it to a DataFrame with the `read_csv` method.
from io import StringIO
import pandas as pd
df = pd.read_csv(StringIO(file))
# If your file is in JSON format, parse it with the `loads` method.
import json
data = json.loads(file)
# If your file is in XML format, parse it with the `fromstring` method.
import xml.etree.ElementTree as ET
root = ET.fromstring(file)
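As a quick illustration of the parsing steps above, the following self-contained sketch uses small inline sample strings in place of a downloaded file; the file contents here are hypothetical stand-ins for whatever `self.download` returns:

```python
from io import StringIO
import json
import pandas as pd

# Hypothetical downloaded contents, standing in for self.download(...)
csv_file = "date,close\n2024-01-02,468.1\n2024-01-03,467.3\n"
json_file = '{"symbol": "SPY", "close": 467.3}'

# CSV text -> DataFrame
df = pd.read_csv(StringIO(csv_file))

# JSON text -> dict
data = json.loads(json_file)

print(df.shape)        # (2, 2)
print(data["symbol"])  # SPY
```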
Download Method Arguments
The download method can accept header settings, a username, and a password for authentication.

| Argument | Data Type | Description | Default Value |
|---|---|---|---|
| `address` | `string` / `str` | A string containing the URI to download | |
| `headers` | `IEnumerable<KeyValuePair<string, string>>` / `Dict[str, str]` | Defines header values to add to the request | `Enumerable.Empty<KeyValuePair<string, string>>()` / `dict()` |
| `userName` / `user_name` | `string` / `str` | The user name associated with the credentials | `null` / `None` |
| `password` | `string` / `str` | The password for the user name associated with the credentials | `null` / `None` |
Download Request Headers
// C#
var headers = new Dictionary<string, string> { { "1", "1" } };
Download(address, headers);
Download(address, headers, userName, password);

# Python
headers = { "1": "1" }
self.download(address, headers)
self.download(address, headers, user_name, password)
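For intuition on what a username/password pair amounts to, here is a sketch of standard HTTP Basic authentication built with Python's standard library: the credentials are base64-encoded as `user:pass` into an `Authorization` header. The credentials below are placeholders, and this is the standard HTTP construction, not a claim about the download method's internals:

```python
from base64 import b64encode
from urllib.request import Request

headers = {"1": "1"}
user_name, password = "my_user", "my_pass"  # placeholder credentials

# Standard HTTP Basic authentication: base64("user:pass")
token = b64encode(f"{user_name}:{password}".encode("ascii")).decode("ascii")
headers["Authorization"] = f"Basic {token}"

# Attach the headers to a request object (no network call is made here).
request = Request("https://example.com/data.csv", headers=headers)
print(request.get_header("Authorization"))  # Basic bXlfdXNlcjpteV9wYXNz
```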
Transport Binary Data
Follow these steps to transport binary files with joblib:
- Add the following imports to your local program:
- Serialize your object and save it to a file. This Base64 serialization is necessary because the default `joblib.dump` serialization produces a binary file that the `download` method can't read without data corruption.
- Save the file to one of the supported sources. For example, save the file to Dropbox.
- Download the remote file into your project.
- Restore the object.
import joblib
from io import BytesIO
from base64 import b64encode, b64decode
buffer = BytesIO()
joblib.dump(my_object, buffer)
base64_str = b64encode(buffer.getvalue()).decode('ascii')
with open("my_model.b64", "w") as f:
f.write(base64_str)
base64_str = self.download("<fileURL>")
model_bytes = b64decode(base64_str.encode('ascii'))
restored_model = joblib.load(BytesIO(model_bytes))
Follow these steps to transport binary files with pickle:
- Add the following imports to your local program:
- Serialize your object and save it to a file.
- Save the file to one of the supported sources. For example, save the file to Dropbox.
- Download the remote file into your project.
- Restore the object.
import pickle
from base64 import b64encode, b64decode
pickle_bytes = pickle.dumps(my_object)
base64_str = b64encode(pickle_bytes).decode('ascii')
with open("my_model.b64", "w") as f:
f.write(base64_str)
base64_str = self.download("<fileURL>")
model_bytes = b64decode(base64_str.encode('ascii'))
restored_model = pickle.loads(model_bytes)
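Putting the pickle steps together, the following self-contained round trip serializes an object, base64-encodes it (standing in for the upload/download leg), and restores it. The in-memory string plays the role of the remote file, and the object is a toy stand-in for a trained model:

```python
import pickle
from base64 import b64encode, b64decode

my_object = {"weights": [0.2, 0.8], "bias": -0.1}  # stand-in for a trained model

# Serialize and base64-encode, as you would before uploading the file.
pickle_bytes = pickle.dumps(my_object)
base64_str = b64encode(pickle_bytes).decode("ascii")

# ...transport base64_str as a text file (e.g., via Dropbox)...

# Decode and restore, as you would after downloading the file.
restored = pickle.loads(b64decode(base64_str.encode("ascii")))
print(restored == my_object)  # True
```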
Examples
The following examples demonstrate common practices for bulk downloading data.
Example 1: Download Machine Learning Model
The following algorithm uses a Scikit-Learn machine learning model to predict SPY price changes and places orders according to the prediction. To obtain the model, we either retrieve it from the Object Store if one exists or download it from a Dropbox link with the download method.
import joblib
import numpy as np
from io import BytesIO
from base64 import b64decode, b64encode
from sklearn.svm import SVR
from sklearn.model_selection import GridSearchCV
class BulkDownloadExampleAlgorithm(QCAlgorithm):
def initialize(self) -> None:
self.set_start_date(2024, 9, 1)
self.set_end_date(2024, 12, 31)
self.set_cash(100000)
# Request SPY data for model training, prediction, and trading.
self._symbol = self.add_equity("SPY", Resolution.DAILY).symbol
# 2-year data to train the model.
training_length = 252*2
self.training_data = RollingWindow(training_length)
# Warm up the training dataset to train the model immediately.
history = self.history[TradeBar](self._symbol, training_length, Resolution.DAILY)
for trade_bar in history:
self.training_data.add(trade_bar)
# Retrieve the already trained model from the object store for immediate use.
if self.object_store.contains_key("sklearn_model"):
self.model = joblib.load(self.object_store.get_file_path("sklearn_model"))
# Otherwise, bulk-download the base64-encoded model from an external source (Dropbox in this example).
else:
url = "https://www.dropbox.com/scl/fi/tj7wmpv1u2ysvejb0skpm/sklearn_model_b64?rlkey=pvcnuvqrp78oz1t73rytgnvdx&dl=1"
base64_str = self.download(url)
model_bytes = b64decode(base64_str.encode('ascii'))
self.model = joblib.load(BytesIO(model_bytes))
# Train the model to use the prediction right away.
self.train(self.my_training_method)
# Recalibrate the model weekly to ensure its accuracy on the updated domain.
self.train(self.date_rules.every(DayOfWeek.SUNDAY), self.time_rules.at(8,0), self.my_training_method)
    def get_features_and_labels(self, n_steps=5) -> tuple:
# Train and predict the return data, which is more normalized and stationary.
training_df = self.pandas_converter.get_data_frame[TradeBar](list(self.training_data)[::-1])
daily_pct_change = training_df.pct_change().dropna()
# Stack the data for 5-day OHLCV data per each sample to train with.
features = []
labels = []
for i in range(len(daily_pct_change)-n_steps):
features.append(daily_pct_change.iloc[i:i+n_steps].values.flatten())
labels.append(daily_pct_change['close'].iloc[i+n_steps])
features = np.array(features)
labels = np.array(labels)
return features, labels
def my_training_method(self) -> None:
# Prepare the processed training data.
features, labels = self.get_features_and_labels()
# Recalibrate the model based on updated data.
if isinstance(self.model, GridSearchCV):
self.model = self.model.fit(features, labels).best_estimator_
else:
self.model = self.model.fit(features, labels)
def on_data(self, slice: Slice) -> None:
bar = slice.bars.get(self._symbol)
if bar:
self.training_data.add(bar)
# Get predictions by the updated features.
features, _ = self.get_features_and_labels()
prediction = self.model.predict(features[-1].reshape(1, -1))
prediction = float(prediction)
# If the predicted direction is going upward, buy SPY.
if prediction > 0:
self.set_holdings(self._symbol, 1)
# If the predicted direction is going downward, sell SPY.
elif prediction < 0:
self.set_holdings(self._symbol, -1)
def on_end_of_algorithm(self) -> None:
# Store the model in the object store to retrieve it in other instances if the algorithm stops.
model_key = "sklearn_model"
joblib.dump(self.model, self.object_store.get_file_path(model_key))
self._save_base64_encoded_version(model_key)
def _save_base64_encoded_version(self, model_key):
# Save a base64-encoded version of the model to upload to an external source (e.g., Dropbox).
buffer = BytesIO()
joblib.dump(self.model, buffer)
content = b64encode(buffer.getvalue()).decode('ascii')
path = self.object_store.get_file_path(f"{model_key}_b64")
self.object_store.save(path, content)
# Locally, you may want to use the open method
#with open(path, "w") as f:
# f.write(content)
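The sliding-window feature construction in get_features_and_labels can be seen in isolation with plain NumPy and pandas. This sketch uses a toy returns DataFrame in place of the algorithm's history; the column names and window length mirror the example above but the data is synthetic:

```python
import numpy as np
import pandas as pd

n_steps = 3
rng = np.random.default_rng(0)
# Toy stand-in for daily_pct_change: 10 days of OHLCV percentage changes.
daily_pct_change = pd.DataFrame(
    rng.normal(0, 0.01, size=(10, 5)),
    columns=["open", "high", "low", "close", "volume"],
)

features, labels = [], []
for i in range(len(daily_pct_change) - n_steps):
    # Flatten n_steps days of OHLCV returns into one feature vector...
    features.append(daily_pct_change.iloc[i:i + n_steps].values.flatten())
    # ...and label it with the following day's close-to-close return.
    labels.append(daily_pct_change["close"].iloc[i + n_steps])

features, labels = np.array(features), np.array(labels)
print(features.shape, labels.shape)  # (7, 15) (7,)
```

Each sample packs `n_steps * 5` values, so the model sees the full recent OHLCV return history rather than a single column.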
Other Examples
For more examples, see the following algorithms: