Streaming Data
Key Concepts
Introduction
There are two techniques to import data into your algorithm. You can either manually import the entire file or stream the file line-by-line into your algorithm's OnData (C#) or on_data (Python) event. This page explores streaming a file's contents into your algorithm line-by-line. The data you import can be from a remote server or the Object Store.
Data Formats
Common data formats are CSV, JSON, XML, and ZIP, but you can use any file type that can be read over the internet. For Excel files, double-check the raw data format before you parse it in the data reader, because the Excel application view formats values for display and may not match the underlying data. To avoid confusion, save the spreadsheet as a CSV file and open it in a text editor to confirm the raw data format.
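As an alternative to a text editor, you can inspect the first raw lines programmatically. The following is only a minimal sketch: the helper name inspect_raw_lines, the candidate delimiter list, and the sample text are assumptions made for illustration and are not part of LEAN.

```python
from collections import Counter


def inspect_raw_lines(text: str, max_lines: int = 3,
                      candidates: tuple = (",", ";", "\t", "|")) -> dict:
    """Return the first raw lines (as repr strings) and the most frequent candidate delimiter."""
    lines = [line for line in text.splitlines() if line.strip()][:max_lines]
    counts = Counter()
    for line in lines:
        for delimiter in candidates:
            counts[delimiter] += line.count(delimiter)
    # Report None when no candidate delimiter appears at all.
    delimiter = counts.most_common(1)[0][0] if any(counts.values()) else None
    return {"raw_lines": [repr(line) for line in lines], "delimiter": delimiter}


# Hypothetical sample: the text you would see after saving a sheet as CSV.
sample = "20200102,101.5\n20200103,102.25\n"
report = inspect_raw_lines(sample)
for raw in report["raw_lines"]:
    print(raw)
print("delimiter:", repr(report["delimiter"]))
```

Printing with repr makes hidden characters, such as tabs or stray quotes, visible before you write the parsing logic.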
The data in the file must be in chronological order. If you import from a remote file provider, each request has a one-second overhead, so package your custom data to minimize requests. Bundle dates together where possible to speed up execution. The Object Store file provider gives you the fastest execution because you don't need to download the files on every execution.
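Bundling several dates into one chronologically ordered file can be sketched as follows. This is an illustration under stated assumptions: the helper names bundle_rows and is_chronological, the yyyyMMdd,value row layout, and the sample values are hypothetical and not part of LEAN.

```python
from datetime import datetime


def bundle_rows(rows_by_date: dict) -> str:
    """Merge per-date values into one CSV body, oldest date first."""
    lines = []
    for day in sorted(rows_by_date):  # datetime keys sort chronologically
        lines.append(f"{day:%Y%m%d},{rows_by_date[day]}")
    return "\n".join(lines)


def is_chronological(csv_text: str) -> bool:
    """Check that the first column (yyyyMMdd) never goes backwards."""
    dates = [datetime.strptime(line.split(",")[0], "%Y%m%d")
             for line in csv_text.splitlines() if line.strip()]
    return all(earlier <= later for earlier, later in zip(dates, dates[1:]))


# Hypothetical input that arrives out of order.
rows = {datetime(2020, 1, 3): 102.25, datetime(2020, 1, 2): 101.5}
bundled = bundle_rows(rows)
print(bundled)
print(is_chronological(bundled))
```

Serving one bundled file instead of one file per date means fewer requests, and the order check catches files that would otherwise be streamed out of sequence.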
Set Data Sources
The GetSource (C#) or get_source (Python) method in your custom data class instructs LEAN where to find the data.
public class MyCustomDataType : BaseData
{
    public override SubscriptionDataSource GetSource(
        SubscriptionDataConfig config,
        DateTime date,
        bool isLiveMode)
    {
        if (isLiveMode)
        {
            return new SubscriptionDataSource("https://www.bitstamp.net/api/ticker/", SubscriptionTransportMedium.Rest);
        }

        var source = $"http://my-ftp-server.com/{config.Symbol.Value}/{date:yyyyMMdd}.csv";
        return new SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile);

        /*
        // Example of loading from the Object Store:
        return new SubscriptionDataSource(Bitstamp.KEY, SubscriptionTransportMedium.ObjectStore);

        // Example of loading a remote zip file:
        return new SubscriptionDataSource(
            "https://cdn.quantconnect.com/uploads/multi_csv_zipped_file.zip",
            SubscriptionTransportMedium.RemoteFile,
            FileFormat.ZipEntryName);

        // Example of loading a remote zip file and accessing a CSV file inside it:
        return new SubscriptionDataSource(
            "https://cdn.quantconnect.com/uploads/multi_csv_zipped_file.zip#csv_file_10.csv",
            SubscriptionTransportMedium.RemoteFile,
            FileFormat.ZipEntryName);
        */
    }
}
class MyCustomDataType(PythonData):

    def get_source(self,
                   config: SubscriptionDataConfig,
                   date: datetime,
                   is_live_mode: bool) -> SubscriptionDataSource:
        if is_live_mode:
            return SubscriptionDataSource("https://www.bitstamp.net/api/ticker/", SubscriptionTransportMedium.REST)

        # %m is the month; %M would insert the minutes instead.
        source = f"http://my-ftp-server.com/{config.symbol.value}/{date:%Y%m%d}.csv"
        return SubscriptionDataSource(source, SubscriptionTransportMedium.REMOTE_FILE)

        # Example of loading from the Object Store:
        # return SubscriptionDataSource(Bitstamp.KEY, SubscriptionTransportMedium.OBJECT_STORE)

        # Example of loading a remote zip file:
        # return SubscriptionDataSource(
        #     "https://cdn.quantconnect.com/uploads/multi_csv_zipped_file.zip",
        #     SubscriptionTransportMedium.REMOTE_FILE,
        #     FileFormat.ZIP_ENTRY_NAME
        # )

        # Example of loading a remote zip file and accessing a CSV file inside it:
        # return SubscriptionDataSource(
        #     "https://cdn.quantconnect.com/uploads/multi_csv_zipped_file.zip#csv_file_10.csv",
        #     SubscriptionTransportMedium.REMOTE_FILE,
        #     FileFormat.ZIP_ENTRY_NAME
        # )
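When the file name contains the date, the Python format codes are easy to mix up. The following standalone sketch builds a per-date URL outside of LEAN; the function name build_source_url and its base_url parameter are hypothetical, and the host is the placeholder used on this page.

```python
from datetime import datetime


def build_source_url(ticker: str, date: datetime,
                     base_url: str = "http://my-ftp-server.com") -> str:
    """Build the URL of the file that holds the data for the given date."""
    # %Y = four-digit year, %m = zero-padded month, %d = zero-padded day.
    return f"{base_url}/{ticker}/{date:%Y%m%d}.csv"


when = datetime(2021, 6, 20, 9, 30)
url = build_source_url("ABC", when)
print(url)                  # http://my-ftp-server.com/ABC/20210620.csv
print(f"{when:%Y%M%d}")     # 20213020 (%M is minutes, not the month)
```

Testing the URL builder on its own with a known date is a cheap way to catch a wrong format code before LEAN requests a file that doesn't exist.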
The following table describes the arguments the GetSource (C#) or get_source (Python) method accepts:
Argument | Data Type | Description |
---|---|---|
config | SubscriptionDataConfig | The subscription configuration |
date | DateTime / datetime | Date of this source file |
isLiveMode / is_live_mode | bool | true (True in Python) if the algorithm is running in live mode |
You can use these arguments to create SubscriptionDataSource objects representing different locations and formats. The following table describes the arguments the SubscriptionDataSource constructor accepts:
Argument | Data Type | Description | Default Value |
---|---|---|---|
source | string / str | Data source location | |
transportMedium / transport_medium | SubscriptionTransportMedium | The transport medium to be used to retrieve data from the source | |
format | FileFormat | The format of the data within the source | FileFormat.Csv |
headers | IEnumerable<KeyValuePair<string, string>> | The headers to be used for this source. In cloud algorithms, each of the key-value pairs can consist of up to 1,000 characters. | null / None |
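The FileFormat enumeration members used on this page are Csv, ZipEntryName, and UnfoldingCollection (CSV, ZIP_ENTRY_NAME, and UNFOLDING_COLLECTION in Python).
The SubscriptionTransportMedium enumeration has the following members: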
Member | Description | Example |
---|---|---|
LocalFile / LOCAL_FILE | The data comes from disk | Lean.DataSource.CBOE |
RemoteFile / REMOTE_FILE | The data is downloaded from a remote source | Custom Securities Examples |
Rest / REST | The data comes from a REST call that is polled and returns a single line/data point of information | Live mode case of the Demonstration Algorithm |
ObjectStore / OBJECT_STORE | The data comes from the Object Store | Example of Custom Data |
Parse Custom Data
The Reader (C#) or reader (Python) method of your custom data class takes one line of data from the source location and parses it into one of your custom objects. You can add as many properties to your custom data objects as you need, but the following table describes the properties you must set. When there is no usable data in a line, the method should return null (None in Python). LEAN repeatedly calls the Reader / reader method until the date/time advances or it reaches the end of the file.
Property | Description |
---|---|
Symbol / symbol | You can set this property to config.Symbol / config.symbol. |
Time / time | The time when the data sample starts. |
EndTime / end_time | The time when the data sample ends and when LEAN should add the sample to a Slice. |
Value / value | The default data point value (decimal in C#, float in Python). |
The following table describes the arguments the Reader (C#) or reader (Python) method accepts:
Argument | Data Type | Description |
---|---|---|
config | SubscriptionDataConfig | The subscription configuration |
line | string / str | Content from the requested data source |
date | DateTime / datetime | Date of this source file |
isLiveMode / is_live_mode | bool | true (True in Python) if the algorithm is running in live mode |
You can use these arguments to create BaseData objects from different sources.
public class MyCustomDataType : BaseData
{
    public override BaseData Reader(
        SubscriptionDataConfig config,
        string line,
        DateTime date,
        bool isLiveMode)
    {
        // Skip empty lines.
        if (string.IsNullOrWhiteSpace(line))
        {
            return null;
        }

        if (isLiveMode)
        {
            // Live mode: each line is a JSON object.
            var custom = JsonConvert.DeserializeObject<MyCustomDataType>(line);
            custom.EndTime = DateTime.UtcNow.ConvertFromUtc(config.ExchangeTimeZone);
            return custom;
        }

        // Skip header lines, which don't start with a digit.
        if (!char.IsDigit(line[0]))
        {
            return null;
        }

        var data = line.Split(',');
        // Parse the start time first so that EndTime is derived from this row
        // and not from the Time property of the current instance.
        var time = DateTime.ParseExact(data[0], "yyyyMMdd", CultureInfo.InvariantCulture);
        return new MyCustomDataType()
        {
            Time = time,
            EndTime = time.AddDays(1),
            Symbol = config.Symbol,
            Value = data[1].IfNotNullOrEmpty(
                s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture)),
        };
    }
}
class MyCustomDataType(PythonData):

    def reader(self,
               config: SubscriptionDataConfig,
               line: str,
               date: datetime,
               is_live_mode: bool) -> BaseData:
        # Skip empty lines.
        if not line.strip():
            return None

        custom = MyCustomDataType()
        custom.symbol = config.symbol

        if is_live_mode:
            # Live mode: each line is a JSON object.
            data = json.loads(line)
            custom.end_time = Extensions.convert_from_utc(datetime.utcnow(), config.exchange_time_zone)
            custom.value = data["value"]
            return custom

        # Skip header lines, which don't start with a digit.
        if not line[0].isdigit():
            return None

        data = line.split(',')
        custom.end_time = datetime.strptime(data[0], '%Y%m%d') + timedelta(1)
        custom.value = float(data[1])
        return custom
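The parsing rules described in this section (return nothing for unusable lines, set the start time, end time, and value otherwise) can be exercised outside of LEAN. The following is a minimal sketch under stated assumptions: ParsedPoint and parse_line are hypothetical stand-ins and not LEAN types, and the row layout is assumed to be yyyyMMdd,value with daily samples.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class ParsedPoint:
    """Stand-in for a custom data object (hypothetical, not a LEAN type)."""
    symbol: str
    time: datetime
    end_time: datetime
    value: float


def parse_line(symbol: str, line: str) -> Optional[ParsedPoint]:
    # Skip blank lines and header lines that don't start with a digit.
    if not line.strip() or not line[0].isdigit():
        return None
    fields = line.strip().split(",")
    try:
        start = datetime.strptime(fields[0], "%Y%m%d")
        value = float(fields[1])
    except (IndexError, ValueError):
        # Malformed row: report "no usable data" instead of raising.
        return None
    # A daily sample starts at `start` and ends one day later.
    return ParsedPoint(symbol, start, start + timedelta(days=1), value)


for raw in ["date,value", "", "20200102,101.5", "20200103,"]:
    print(repr(raw), "->", parse_line("ABC", raw))
```

Only the third sample line produces an object; the header, the blank line, and the row with a missing value all yield None, which mirrors what the reader should return for lines without usable data.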
Object Store
The Object Store file provider gives you the fastest execution because you don't need to download the files on every execution.
To upload the data files into the Object Store, use the Algorithm Lab, CLI, or Research Environment.
To then pull the data from the Object Store into an algorithm, set the custom data source to read from the file in the Object Store and parse the data in the Reader (C#) or reader (Python) method.
public class MyCustomDataType : BaseData
{
    public override SubscriptionDataSource GetSource(
        SubscriptionDataConfig config,
        DateTime date,
        bool isLiveMode)
    {
        return new SubscriptionDataSource("<YourCSVKey>", SubscriptionTransportMedium.ObjectStore, FileFormat.Csv);
        // return new SubscriptionDataSource("<YourJSONKey>", SubscriptionTransportMedium.ObjectStore, FileFormat.UnfoldingCollection);
    }
}
class MyCustomDataType(PythonData):

    def get_source(self,
                   config: SubscriptionDataConfig,
                   date: datetime,
                   is_live_mode: bool) -> SubscriptionDataSource:
        return SubscriptionDataSource("<your_csv_key>", SubscriptionTransportMedium.OBJECT_STORE, FileFormat.CSV)
        # return SubscriptionDataSource("<your_json_key>", SubscriptionTransportMedium.OBJECT_STORE, FileFormat.UNFOLDING_COLLECTION)
Set Properties
To set the Symbol Properties of the custom data, provide a SymbolProperties object when you subscribe to the dataset. The ticker you pass to the SymbolProperties constructor and the AddData (C#) or add_data (Python) method must be the same.
To set the Exchange Hours of the custom data, provide a SecurityExchangeHours object when you subscribe to the dataset. The default hours are for the market to be open 24/7.
var ticker = "ABC";
// The numeric arguments are decimals, so the literals need the m suffix.
var properties = new SymbolProperties("description", "USD", 1, 0.01m, 0.01m, ticker);
var hours = MarketHoursDatabase.GetEntry(Market.USA, "SPY", SecurityType.Equity).ExchangeHours;
AddData<CustomData>(ticker, properties, hours);
ticker = "ABC"
properties = SymbolProperties("description", "USD", 1, 0.01, 0.01, ticker)
hours = MarketHoursDatabase.get_entry(Market.USA, "SPY", SecurityType.EQUITY).exchange_hours
self.add_data(CustomData, ticker, properties, hours)
Live Trading Considerations
In live trading, we pass custom data to your algorithm as soon as it arrives. The time it arrives may not align with the time of other slices. Design your algorithm to handle unsynchronized data so that you don't run into issues.
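One possible way to cope with data that arrives at arbitrary times is to cache the most recent point and treat it as missing once it is too old. The following is a sketch of that idea only, not a LEAN feature: the class name LatestValueCache, the five-minute tolerance, and the sample timestamps are assumptions made for illustration.

```python
from datetime import datetime, timedelta
from typing import Optional


class LatestValueCache:
    """Remember the most recent custom data point and reject stale reads."""

    def __init__(self, max_age: timedelta):
        self._max_age = max_age
        self._value: Optional[float] = None
        self._end_time: Optional[datetime] = None

    def update(self, end_time: datetime, value: float) -> None:
        # Ignore late arrivals that are older than the stored point.
        if self._end_time is None or end_time >= self._end_time:
            self._end_time, self._value = end_time, value

    def get(self, now: datetime) -> Optional[float]:
        # Return None when nothing has arrived yet or the point is too old.
        if self._end_time is None or now - self._end_time > self._max_age:
            return None
        return self._value


cache = LatestValueCache(max_age=timedelta(minutes=5))
cache.update(datetime(2021, 6, 20, 9, 31, 17), 421.86)
print(cache.get(datetime(2021, 6, 20, 9, 32)))  # 421.86 (43 seconds old)
print(cache.get(datetime(2021, 6, 20, 9, 40)))  # None (older than 5 minutes)
```

In an algorithm, you would call update when a custom data point arrives and get when other data triggers a decision, so that the decision never silently relies on an outdated value.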
Demonstration Algorithm
The following example algorithm implements a custom data source for the Bitstamp API.
using System;
using System.Globalization;
using System.Linq;
using Newtonsoft.Json;
using QuantConnect.Data;

namespace QuantConnect.Algorithm.CSharp
{
    public class CustomDataBitstampAlgorithm : QCAlgorithm
    {
        private Symbol _customDataSymbol;

        public override void Initialize()
        {
            SetStartDate(2012, 9, 13);
            SetEndDate(2021, 6, 20);

            _customDataSymbol = AddData<Bitstamp>("BTC", Resolution.Daily).Symbol;

            var history = History<Bitstamp>(_customDataSymbol, 200, Resolution.Daily);
            Debug($"We got {history.Count()} items from historical data request of {_customDataSymbol}.");
        }

        public void OnData(Bitstamp data)
        {
            Log($"{data.EndTime}: Close: {data.Close}");
            Plot(_customDataSymbol, "Price", data.Close);
        }

        public class Bitstamp : BaseData
        {
            [JsonProperty("timestamp")]
            public int Timestamp = 0;
            [JsonProperty("open")]
            public decimal Open = 0;
            [JsonProperty("high")]
            public decimal High = 0;
            [JsonProperty("low")]
            public decimal Low = 0;
            [JsonProperty("last")]
            public decimal Close = 0;
            [JsonProperty("bid")]
            public decimal Bid = 0;
            [JsonProperty("ask")]
            public decimal Ask = 0;
            [JsonProperty("vwap")]
            public decimal WeightedPrice = 0;
            [JsonProperty("volume")]
            public decimal VolumeBTC = 0;
            public decimal VolumeUSD = 0;

            public override SubscriptionDataSource GetSource(SubscriptionDataConfig config, DateTime date, bool isLiveMode)
            {
                if (isLiveMode)
                {
                    return new SubscriptionDataSource("https://www.bitstamp.net/api/ticker/", SubscriptionTransportMedium.Rest);
                }

                var source = "https://raw.githubusercontent.com/QuantConnect/Documentation/master/Resources/datasets/custom-data/bitstampusd.csv";
                return new SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile);
            }

            public override BaseData Reader(SubscriptionDataConfig config, string line, DateTime date, bool isLiveMode)
            {
                if (string.IsNullOrWhiteSpace(line))
                {
                    return null;
                }

                var coin = new Bitstamp() {Symbol = config.Symbol};

                if (isLiveMode)
                {
                    //Example Line Format:
                    //{"high": "441.00", "last": "421.86", "timestamp": "1411606877", "bid": "421.96", "vwap": "428.58", "volume": "14120.40683975", "low": "418.83", "ask": "421.99"}
                    coin = JsonConvert.DeserializeObject<Bitstamp>(line);
                    coin.EndTime = DateTime.UtcNow.ConvertFromUtc(config.ExchangeTimeZone);
                    coin.Time = coin.EndTime.AddDays(-1);
                    coin.Value = coin.Close;
                    return coin;
                }

                //Example Line Format:
                //Date,Open,High,Low,Close,Volume (BTC),Volume (Currency),Weighted Price
                //2011-09-13,5.8,6.0,5.65,5.97,58.37138238,346.0973893944,5.929230648356
                if (!char.IsDigit(line[0]))
                {
                    return null;
                }

                var data = line.Split(',');
                coin.Value = data[4].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                // Skip rows without a close price.
                if (coin.Value == 0)
                {
                    return null;
                }

                coin.Time = DateTime.Parse(data[0], CultureInfo.InvariantCulture);
                coin.EndTime = coin.Time.AddDays(1);
                coin.Open = data[1].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.High = data[2].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.Low = data[3].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.VolumeBTC = data[5].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.VolumeUSD = data[6].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.WeightedPrice = data[7].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.Close = coin.Value;
                return coin;
            }
        }
    }
}
from AlgorithmImports import *
import json


class CustomDataBitstampAlgorithm(QCAlgorithm):

    def initialize(self):
        self.set_start_date(2012, 9, 13)
        self.set_end_date(2021, 6, 20)
        self.set_cash(100000)

        # Define the symbol and "type" of our generic data:
        self._custom_data_symbol = self.add_data(Bitstamp, "BTC").symbol

        history = self.history(Bitstamp, self._custom_data_symbol, 200, Resolution.DAILY)
        self.debug(f"We got {len(history)} items from historical data request of {self._custom_data_symbol}.")

    def on_data(self, slice):
        if self._custom_data_symbol not in slice:
            return

        data = slice[self._custom_data_symbol]
        self.log(f'{data.end_time}: Close: {data.close}')
        self.plot(self._custom_data_symbol, 'Price', data.close)


class Bitstamp(PythonData):

    def get_source(self, config, date, is_live_mode):
        if is_live_mode:
            return SubscriptionDataSource("https://www.bitstamp.net/api/ticker/", SubscriptionTransportMedium.REST)

        source = "https://raw.githubusercontent.com/QuantConnect/Documentation/master/Resources/datasets/custom-data/bitstampusd.csv"
        return SubscriptionDataSource(source, SubscriptionTransportMedium.REMOTE_FILE)

    def reader(self, config, line, date, is_live_mode):
        if not line.strip():
            return None

        coin = Bitstamp()
        coin.symbol = config.symbol

        if is_live_mode:
            # Example Line Format:
            # {"high": "441.00", "last": "421.86", "timestamp": "1411606877", "bid": "421.96", "vwap": "428.58", "volume": "14120.40683975", "low": "418.83", "ask": "421.99"}
            live_btc = json.loads(line)

            # If value is zero, return None
            coin.value = float(live_btc["last"])
            if coin.value == 0:
                return None

            coin.end_time = Extensions.convert_from_utc(datetime.utcnow(), config.exchange_time_zone)
            coin.time = coin.end_time - timedelta(1)
            coin["Open"] = float(live_btc["open"])
            coin["High"] = float(live_btc["high"])
            coin["Low"] = float(live_btc["low"])
            coin["Close"] = coin.value
            coin["Ask"] = float(live_btc["ask"])
            coin["Bid"] = float(live_btc["bid"])
            coin["VolumeBTC"] = float(live_btc["volume"])
            coin["WeightedPrice"] = float(live_btc["vwap"])
            return coin

        # Example Line Format:
        # Date,Open,High,Low,Close,Volume (BTC),Volume (Currency),Weighted Price
        # 2011-09-13,5.8,6.0,5.65,5.97,58.37138238,346.0973893944,5.929230648356
        if not line[0].isdigit():
            return None

        data = line.split(',')

        # If value is zero, return None
        coin.value = float(data[4])
        if coin.value == 0:
            return None

        coin.time = datetime.strptime(data[0], "%Y-%m-%d")
        coin.end_time = coin.time + timedelta(1)
        coin["Open"] = float(data[1])
        coin["High"] = float(data[2])
        coin["Low"] = float(data[3])
        coin["Close"] = coin.value
        coin["VolumeBTC"] = float(data[5])
        coin["VolumeUSD"] = float(data[6])
        coin["WeightedPrice"] = float(data[7])
        return coin
To save this algorithm to your cloud projects, clone it.