Streaming Data
Key Concepts
Introduction
There are two techniques to import data into your algorithm: you can manually import the entire file, or you can stream the file line-by-line into your algorithm's OnData event. This page explores streaming a file's contents into your algorithm line-by-line. The data you import can come from a remote server or the Object Store.
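As a rough illustration of the difference, the plain-Python sketch below contrasts reading a whole file at once with yielding it one line at a time. It does not use LEAN: the function names are hypothetical, the sample rows are made up, and an in-memory io.StringIO stands in for the remote file.

```python
import io
from typing import Iterator, List, TextIO


def import_entire_file(handle: TextIO) -> List[str]:
    """Read the whole source into memory at once."""
    return handle.read().splitlines()


def stream_lines(handle: TextIO) -> Iterator[str]:
    """Yield the source one line at a time, reading lazily."""
    for raw in handle:
        yield raw.rstrip("\n")


# Hypothetical sample content: a date and a value per line
SAMPLE = "20240102,101.5\n20240103,102.0\n"

all_rows = import_entire_file(io.StringIO(SAMPLE))  # both lines are now in memory
streamed = stream_lines(io.StringIO(SAMPLE))
first = next(streamed)  # only the first line has been consumed so far
```

Because stream_lines is a generator, nothing past the first line is read until the caller asks for it, which is the idea behind line-by-line streaming.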
Data Formats
Common data formats are CSV, JSON, and XML, but you can use any file type that can be read over the internet. Excel displays data formatted for convenient viewing, which can differ from the raw values your data reader receives, so double-check the raw format of Excel data before you write the parser. To confirm the raw format, save the spreadsheet as a CSV file and open it in a text editor.
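One way to confirm the raw date layout after exporting a spreadsheet to CSV is to test the first field of a raw line against a few candidate formats. This is a minimal sketch: the candidate list and the detect_date_format name are hypothetical, so extend them for your own file.

```python
import csv
import io
from datetime import datetime
from typing import Optional, Sequence

# Hypothetical candidate layouts; add the ones your export might use
CANDIDATE_FORMATS = ("%Y-%m-%d", "%Y%m%d", "%m/%d/%Y", "%d/%m/%Y")


def detect_date_format(raw_line: str, formats: Sequence[str] = CANDIDATE_FORMATS) -> Optional[str]:
    """Return the first format that parses the line's first CSV field, or None."""
    first_field = next(csv.reader(io.StringIO(raw_line)))[0].strip()
    for fmt in formats:
        try:
            datetime.strptime(first_field, fmt)
            return fmt
        except ValueError:
            continue  # this layout doesn't match; try the next one
    return None
```

The first matching format wins, so a value such as 03/04/2011 is ambiguous between month-first and day-first layouts; confirm the layout against a row whose day is greater than 12.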
The data in the file must be in chronological order. If you import from a remote file provider, each request has a one-second overhead, so package your custom data to minimize the number of requests: bundle multiple dates into each file where possible. The Object Store file provider gives you the fastest execution because you don't need to download the files on every execution.
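Both rules in this paragraph can be checked with a little code and arithmetic. The sketch below assumes the one-second-per-request overhead quoted above and roughly 252 trading days per year; both helper names are hypothetical.

```python
from datetime import datetime
from typing import Iterable, Optional


def is_chronological(timestamps: Iterable[datetime]) -> bool:
    """True if every timestamp is at or after the one before it."""
    previous: Optional[datetime] = None
    for ts in timestamps:
        if previous is not None and ts < previous:
            return False
        previous = ts
    return True


def request_overhead_seconds(num_requests: int, seconds_per_request: float = 1.0) -> float:
    """Fixed cost of fetching num_requests remote files, one request each."""
    return num_requests * seconds_per_request


# Ten years of data: one file per trading day (about 252 per year) vs one file per year
daily_files = request_overhead_seconds(10 * 252)
yearly_files = request_overhead_seconds(10)
```

With one file per trading day, the request overhead alone is about 2,520 seconds (42 minutes); bundling each year into one file reduces it to about 10 seconds.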
Set Data Sources
The GetSource method in your custom data class instructs LEAN where to find the data. This method must return a SubscriptionDataSource object, which contains the data location and format.
The following table describes the arguments the SubscriptionDataSource constructor accepts:
Argument | Data Type | Description | Default Value |
---|---|---|---|
source | string (C#) / str (Python) | Data source location | |
transportMedium | SubscriptionTransportMedium | The transport medium to be used to retrieve data from the source | |
format | FileFormat | The format of the data within the source | FileFormat.Csv |
headers | IEnumerable<KeyValuePair<string, string>> | The headers to be used for this source. In cloud algorithms, each of the key-value pairs can consist of up to 1,000 characters. | null (C#) / None (Python) |
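If you pass request headers, you may want to check them against the cloud limit before running the algorithm. This sketch assumes the 1,000-character limit applies to the key and value combined, which the table does not state explicitly; validate_headers is a hypothetical helper, not part of LEAN.

```python
from typing import Dict

# Limit quoted for cloud algorithms; assumed to cover len(key) + len(value)
MAX_HEADER_PAIR_CHARS = 1000


def validate_headers(headers: Dict[str, str], limit: int = MAX_HEADER_PAIR_CHARS) -> None:
    """Raise ValueError if any header's key plus value exceeds the limit."""
    for key, value in headers.items():
        length = len(key) + len(value)
        if length > limit:
            raise ValueError(f"header {key!r} has {length} characters; the limit is {limit}")
```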
The SubscriptionTransportMedium enumeration has the following members:
The FileFormat enumeration has the following members:
The following table describes the arguments the GetSource method accepts:
Argument | Data Type | Description |
---|---|---|
config | SubscriptionDataConfig | The subscription configuration |
date | DateTime (C#) / datetime (Python) | Date of this source file |
isLiveMode | bool | true (C#) / True (Python) if the algorithm is running in live mode |
You can use these arguments to create SubscriptionDataSource objects representing different locations and formats.
public class MyCustomDataType : BaseData
{
    public override SubscriptionDataSource GetSource(
        SubscriptionDataConfig config,
        DateTime date,
        bool isLiveMode)
    {
        if (isLiveMode)
        {
            return new SubscriptionDataSource("https://www.bitstamp.net/api/ticker/", SubscriptionTransportMedium.Rest);
        }

        var source = $"http://my-ftp-server.com/{config.Symbol.Value}/{date:yyyyMMdd}.csv";
        return new SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile);
    }
}
from AlgorithmImports import *
from datetime import datetime

class MyCustomDataType(PythonData):
    def GetSource(self, config: SubscriptionDataConfig, date: datetime, isLiveMode: bool) -> SubscriptionDataSource:
        if isLiveMode:
            return SubscriptionDataSource("https://www.bitstamp.net/api/ticker/", SubscriptionTransportMedium.Rest)

        # %m is the month; %M would insert the minutes instead
        source = f"http://my-ftp-server.com/{config.Symbol.Value}/{date:%Y%m%d}.csv"
        return SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile)
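The date handling in GetSource is easy to get wrong, so it can help to test the URL logic in isolation. The sketch below mirrors the branching of the example above in plain Python; SourceSpec is a hypothetical stand-in for SubscriptionDataSource, and the FTP URL is the same placeholder the example uses.

```python
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class SourceSpec:
    """Hypothetical stand-in for SubscriptionDataSource: a location plus a transport name."""
    source: str
    transport: str


def get_source(ticker: str, day: date, is_live_mode: bool) -> SourceSpec:
    if is_live_mode:
        # Live: poll a REST endpoint for the latest value
        return SourceSpec("https://www.bitstamp.net/api/ticker/", "Rest")
    # Backtest: one remote CSV per ticker per day; %m is the month
    return SourceSpec(f"http://my-ftp-server.com/{ticker}/{day:%Y%m%d}.csv", "RemoteFile")


# %M means minutes, which are always 00 for a date, so this renders as "20210020"
wrong_stamp = f"{date(2021, 6, 20):%Y%M%d}"
```

Because %M silently produces a valid-looking string, a wrong format code fails only when the server reports a missing file, which is why a small test like this is worthwhile.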
Parse Custom Data
The Reader method of your custom data class takes one line of data from the source location and parses it into one of your custom objects. You can add as many properties to your custom data objects as you need, but the following table describes the properties you must set. When a line contains no usable data, the method should return null (C#) or None (Python). LEAN repeatedly calls the Reader method until the date/time advances or it reaches the end of the file.
Property | Description |
---|---|
Symbol | You can set this property to config.Symbol. |
Time | The time when the data sample starts. |
EndTime | The time when the data sample ends and when LEAN should add the sample to a Slice. |
Value | The default data point value. |
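To make this contract concrete, the sketch below models a data point with the four required properties and a loop that calls a reader on each line and discards the None results. It is a plain-Python approximation: CustomPoint, read_line, and stream are hypothetical names, the input format is assumed to be yyyyMMdd,value, and the loop does not reproduce how LEAN schedules Reader calls.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, Iterable, List, Optional


@dataclass
class CustomPoint:
    """Hypothetical stand-in for a BaseData object with the required properties."""
    symbol: str
    time: datetime      # when the sample starts
    end_time: datetime  # when the sample is emitted into a Slice
    value: float        # default data point value


def read_line(symbol: str, line: str) -> Optional[CustomPoint]:
    """Parse 'yyyyMMdd,value'; return None for blank or header lines."""
    if not line.strip() or not line[0].isdigit():
        return None
    stamp, value = line.split(",")[:2]
    start = datetime.strptime(stamp, "%Y%m%d")
    return CustomPoint(symbol, start, start + timedelta(days=1), float(value))


def stream(symbol: str, lines: Iterable[str],
           reader: Callable[[str, str], Optional[CustomPoint]]) -> List[CustomPoint]:
    """Call the reader once per line and keep only the usable points."""
    points = []
    for line in lines:
        point = reader(symbol, line)
        if point is not None:
            points.append(point)
    return points


points = stream("MYDATA", ["date,value", "20240102,1.5", "", "20240103,2.0"], read_line)
```

For daily data, end_time is one day after time, so each sample only becomes visible once its day has closed.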
The following table describes the arguments the Reader method accepts:
Argument | Data Type | Description |
---|---|---|
config | SubscriptionDataConfig | The subscription configuration |
line | string (C#) / str (Python) | Content from the requested data source |
date | DateTime (C#) / datetime (Python) | Date of this source file |
isLiveMode | bool | true (C#) / True (Python) if the algorithm is running in live mode |
You can use these arguments to create BaseData objects from different sources.
public class MyCustomDataType : BaseData
{
    public override BaseData Reader(
        SubscriptionDataConfig config,
        string line,
        DateTime date,
        bool isLiveMode)
    {
        // Skip blank lines
        if (string.IsNullOrWhiteSpace(line))
        {
            return null;
        }

        if (isLiveMode)
        {
            var custom = JsonConvert.DeserializeObject<MyCustomDataType>(line);
            custom.Symbol = config.Symbol;
            custom.EndTime = DateTime.UtcNow.ConvertFromUtc(config.ExchangeTimeZone);
            return custom;
        }

        // Skip header rows and other lines that don't start with a date
        if (!char.IsDigit(line[0]))
        {
            return null;
        }

        var data = line.Split(',');
        // Parse the start time first: inside an object initializer, `Time` would
        // refer to this reader instance, not to the new object
        var time = DateTime.ParseExact(data[0], "yyyyMMdd", CultureInfo.InvariantCulture);
        return new MyCustomDataType()
        {
            Symbol = config.Symbol,
            Time = time,
            EndTime = time.AddDays(1),
            Value = data[1].IfNotNullOrEmpty(
                s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture)),
        };
    }
}
from AlgorithmImports import *
import json
from datetime import datetime, timedelta

class MyCustomDataType(PythonData):
    def Reader(self, config: SubscriptionDataConfig, line: str, date: datetime, isLiveMode: bool) -> BaseData:
        # Skip blank lines
        if not line.strip():
            return None

        custom = MyCustomDataType()
        custom.Symbol = config.Symbol

        if isLiveMode:
            data = json.loads(line)
            custom.EndTime = Extensions.ConvertFromUtc(datetime.utcnow(), config.ExchangeTimeZone)
            custom.Value = data["value"]
            return custom

        # Skip header rows and other lines that don't start with a date
        if not line[0].isdigit():
            return None

        data = line.split(',')
        custom.EndTime = datetime.strptime(data[0], '%Y%m%d') + timedelta(1)
        custom.Value = float(data[1])
        return custom
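The live branch stamps each value with its arrival time converted to the exchange time zone. The plain-Python sketch below shows the same idea outside LEAN. It assumes a JSON payload with a value field, as in the example above, and uses a fixed UTC offset as a simplification; a named exchange time zone, such as the one in config.ExchangeTimeZone, would also account for daylight saving time. Passing now_utc in, rather than reading the clock inside the function, keeps it testable.

```python
import json
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional


def parse_live_line(line: str, exchange_tz: timezone, now_utc: datetime) -> Optional[Dict[str, object]]:
    """Parse a live JSON payload and stamp it with its arrival time in exchange time."""
    if not line.strip():
        return None  # nothing usable in this line
    payload = json.loads(line)
    # Convert the UTC arrival time to exchange time, then drop tzinfo (naive exchange time)
    end_time = now_utc.astimezone(exchange_tz).replace(tzinfo=None)
    return {"end_time": end_time, "value": float(payload["value"])}


# Assumption: a fixed UTC-5 offset stands in for the exchange time zone
EXCHANGE_TZ = timezone(timedelta(hours=-5))
point = parse_live_line('{"value": "421.86"}', EXCHANGE_TZ,
                        datetime(2024, 1, 2, 15, 0, tzinfo=timezone.utc))
```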
Live Trading Considerations
In live trading, LEAN passes custom data to your algorithm as soon as it arrives. The arrival time may not align with the time of other slices, so design your algorithm to handle unsynchronized data.
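One common pattern for unsynchronized data is to cache the latest custom value with its timestamp and use it only while it is recent. The sketch below is a hypothetical helper in plain Python, not a LEAN API; the five-minute window in the usage example is an arbitrary assumption you would tune to your data's update frequency.

```python
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple


class LatestValues:
    """Keep the most recent value per symbol and report whether it is still fresh."""

    def __init__(self, max_age: timedelta):
        self.max_age = max_age
        self._latest: Dict[str, Tuple[datetime, float]] = {}

    def update(self, symbol: str, time: datetime, value: float) -> None:
        """Record a value when custom data arrives, whenever that is."""
        self._latest[symbol] = (time, value)

    def get_fresh(self, symbol: str, now: datetime) -> Optional[float]:
        """Return the cached value if it arrived within max_age of now, else None."""
        entry = self._latest.get(symbol)
        if entry is None:
            return None
        time, value = entry
        return value if now - time <= self.max_age else None


cache = LatestValues(max_age=timedelta(minutes=5))
cache.update("BTC", datetime(2024, 1, 2, 10, 0), 421.86)
```

In an algorithm, you would call update when the custom data arrives and get_fresh when another slice needs the value, so stale custom data is ignored instead of being mixed with newer prices.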
Demonstration Algorithm
The following example algorithm implements a custom data source for the Bitstamp API.
using System;
using System.Globalization;
using System.Linq;
using Newtonsoft.Json;
using QuantConnect.Data;

namespace QuantConnect.Algorithm.CSharp
{
    public class CustomDataBitstampAlgorithm : QCAlgorithm
    {
        private Symbol _customDataSymbol;

        public override void Initialize()
        {
            SetStartDate(2012, 9, 13);
            SetEndDate(2021, 6, 20);

            _customDataSymbol = AddData<Bitstamp>("BTC", Resolution.Daily).Symbol;

            var history = History<Bitstamp>(_customDataSymbol, 200, Resolution.Daily);
            Debug($"We got {history.Count()} items from historical data request of {_customDataSymbol}.");
        }

        public void OnData(Bitstamp data)
        {
            Log($"{data.EndTime}: Close: {data.Close}");
            Plot(_customDataSymbol, "Price", data.Close);
        }

        public class Bitstamp : BaseData
        {
            [JsonProperty("timestamp")] public int Timestamp = 0;
            [JsonProperty("open")] public decimal Open = 0;
            [JsonProperty("high")] public decimal High = 0;
            [JsonProperty("low")] public decimal Low = 0;
            [JsonProperty("last")] public decimal Close = 0;
            [JsonProperty("bid")] public decimal Bid = 0;
            [JsonProperty("ask")] public decimal Ask = 0;
            [JsonProperty("vwap")] public decimal WeightedPrice = 0;
            [JsonProperty("volume")] public decimal VolumeBTC = 0;
            public decimal VolumeUSD = 0;

            public override SubscriptionDataSource GetSource(SubscriptionDataConfig config, DateTime date, bool isLiveMode)
            {
                if (isLiveMode)
                {
                    return new SubscriptionDataSource("https://www.bitstamp.net/api/ticker/", SubscriptionTransportMedium.Rest);
                }

                var source = "https://raw.githubusercontent.com/QuantConnect/Documentation/master/Resources/datasets/custom-data/bitstampusd.csv";
                return new SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile);
            }

            public override BaseData Reader(SubscriptionDataConfig config, string line, DateTime date, bool isLiveMode)
            {
                if (string.IsNullOrWhiteSpace(line))
                {
                    return null;
                }

                var coin = new Bitstamp() { Symbol = config.Symbol };

                if (isLiveMode)
                {
                    // Example line format:
                    // {"high": "441.00", "last": "421.86", "timestamp": "1411606877", "bid": "421.96", "vwap": "428.58", "volume": "14120.40683975", "low": "418.83", "ask": "421.99"}
                    coin = JsonConvert.DeserializeObject<Bitstamp>(line);
                    // Deserializing creates a new object, so set the symbol again
                    coin.Symbol = config.Symbol;
                    coin.EndTime = DateTime.UtcNow.ConvertFromUtc(config.ExchangeTimeZone);
                    coin.Time = coin.EndTime.AddDays(-1);
                    coin.Value = coin.Close;
                    return coin;
                }

                // Example line format:
                // Date,Open,High,Low,Close,Volume (BTC),Volume (Currency),Weighted Price
                // 2011-09-13,5.8,6.0,5.65,5.97,58.37138238,346.0973893944,5.929230648356
                if (!char.IsDigit(line[0]))
                {
                    return null;
                }

                var data = line.Split(',');
                coin.Value = data[4].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                // Skip rows with a zero close
                if (coin.Value == 0)
                {
                    return null;
                }

                coin.Time = DateTime.Parse(data[0], CultureInfo.InvariantCulture);
                coin.EndTime = coin.Time.AddDays(1);
                coin.Open = data[1].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.High = data[2].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.Low = data[3].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.VolumeBTC = data[5].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.VolumeUSD = data[6].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.WeightedPrice = data[7].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.Close = coin.Value;
                return coin;
            }
        }
    }
}
from AlgorithmImports import *
import json
from datetime import datetime, timedelta

class CustomDataBitstampAlgorithm(QCAlgorithm):
    def Initialize(self):
        self.SetStartDate(2012, 9, 13)
        self.SetEndDate(2021, 6, 20)
        self.SetCash(100000)

        # Define the symbol and "type" of our generic data
        self.custom_data_symbol = self.AddData(Bitstamp, "BTC").Symbol

        history = self.History(Bitstamp, self.custom_data_symbol, 200, Resolution.Daily)
        self.Debug(f"We got {len(history)} items from historical data request of {self.custom_data_symbol}.")

    def OnData(self, slice):
        # Custom data may not be present in every slice
        if self.custom_data_symbol not in slice:
            return
        data = slice[self.custom_data_symbol]
        self.Log(f'{data.EndTime}: Close: {data.Close}')
        self.Plot(self.custom_data_symbol, 'Price', data.Close)


class Bitstamp(PythonData):
    def GetSource(self, config, date, isLiveMode):
        if isLiveMode:
            return SubscriptionDataSource("https://www.bitstamp.net/api/ticker/", SubscriptionTransportMedium.Rest)

        source = "https://raw.githubusercontent.com/QuantConnect/Documentation/master/Resources/datasets/custom-data/bitstampusd.csv"
        return SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile)

    def Reader(self, config, line, date, isLiveMode):
        if not line.strip():
            return None

        coin = Bitstamp()
        coin.Symbol = config.Symbol

        if isLiveMode:
            # Example line format:
            # {"high": "441.00", "last": "421.86", "timestamp": "1411606877", "bid": "421.96", "vwap": "428.58", "volume": "14120.40683975", "low": "418.83", "ask": "421.99"}
            liveBTC = json.loads(line)

            # If value is zero, return None
            coin.Value = float(liveBTC["last"])
            if coin.Value == 0:
                return None

            coin.EndTime = Extensions.ConvertFromUtc(datetime.utcnow(), config.ExchangeTimeZone)
            coin.Time = coin.EndTime - timedelta(1)
            coin["Open"] = float(liveBTC["open"])
            coin["High"] = float(liveBTC["high"])
            coin["Low"] = float(liveBTC["low"])
            coin["Close"] = coin.Value
            coin["Ask"] = float(liveBTC["ask"])
            coin["Bid"] = float(liveBTC["bid"])
            coin["VolumeBTC"] = float(liveBTC["volume"])
            coin["WeightedPrice"] = float(liveBTC["vwap"])
            return coin

        # Example line format:
        # Date,Open,High,Low,Close,Volume (BTC),Volume (Currency),Weighted Price
        # 2011-09-13,5.8,6.0,5.65,5.97,58.37138238,346.0973893944,5.929230648356
        if not line[0].isdigit():
            return None

        data = line.split(',')

        # If value is zero, return None
        coin.Value = float(data[4])
        if coin.Value == 0:
            return None

        coin.Time = datetime.strptime(data[0], "%Y-%m-%d")
        coin.EndTime = coin.Time + timedelta(1)
        coin["Open"] = float(data[1])
        coin["High"] = float(data[2])
        coin["Low"] = float(data[3])
        coin["Close"] = coin.Value
        coin["VolumeBTC"] = float(data[5])
        coin["VolumeUSD"] = float(data[6])
        coin["WeightedPrice"] = float(data[7])
        return coin
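Because the backtest branch of the Reader is plain string parsing, you can unit test it outside LEAN. The sketch below is a pure-Python version of that branch under the column order shown in the example comment; parse_bitstamp_row is a hypothetical name, and it returns a plain dict instead of a Bitstamp object.

```python
from datetime import datetime, timedelta
from typing import Dict, Optional

# Column order after the date, taken from the example header in the demo
COLUMNS = ("Open", "High", "Low", "Close", "VolumeBTC", "VolumeUSD", "WeightedPrice")


def parse_bitstamp_row(line: str) -> Optional[Dict[str, object]]:
    """Parse one CSV row; return None for blank lines, headers, and zero closes."""
    if not line.strip() or not line[0].isdigit():
        return None
    fields = line.split(",")
    if float(fields[4]) == 0:
        return None  # same rule as the demo: drop rows with a zero close
    start = datetime.strptime(fields[0], "%Y-%m-%d")
    row: Dict[str, object] = {"Time": start, "EndTime": start + timedelta(days=1)}
    row.update({name: float(text) for name, text in zip(COLUMNS, fields[1:8])})
    return row


row = parse_bitstamp_row("2011-09-13,5.8,6.0,5.65,5.97,58.37138238,346.0973893944,5.929230648356")
```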