Streaming Data

Key Concepts

Introduction

There are two techniques to import data into your algorithm. You can either manually import the entire file or stream the file line-by-line into your algorithm's OnDataon_data event. This page explores streaming a file's contents into your algorithm line-by-line. The data you import can be from a remote server or the Object Store.

Data Formats

Common data formats are CSV, JSON, and XML, but you can use any file type that can be read over the internet. For Excel files, double check the raw data format for parsing in the data reader, since data will be formatted for convenient visualization in Excel application view. To avoid confusion of data format, save the spreadsheet as a CSV file and open it in a text editor to confirm the raw data format.

The data in the file must be in chronological order. If you import from a remote file provider, each request has a one-second overhead, so package your custom data to minimize requests. Bundle dates together where possible to speed up execution. The Object Store file provider gives you the fastest execution because you don't need to download the files on every execution.

Set Data Sources

The GetSourceget_source method in your custom data class instructs LEAN where to find the data.

public class MyCustomDataType : BaseData
{
    public override SubscriptionDataSource GetSource(
        SubscriptionDataConfig config,
        DateTime date,
        bool isLiveMode)
    {
        if (isLiveMode)
        {
            return new SubscriptionDataSource("https://www.bitstamp.net/api/ticker/", SubscriptionTransportMedium.Rest);
        }

        var source = $"http://my-ftp-server.com/{config.Symbol.Value}/{date:yyyyMMdd}.csv";
        return new SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile);
    }
}
class MyCustomDataType(PythonData):
    def get_source(self,
         config: SubscriptionDataConfig,
         date: datetime,
         isLiveMode: bool) -> SubscriptionDataSource:
        
         if isLiveMode:
            return SubscriptionDataSource("https://www.bitstamp.net/api/ticker/", SubscriptionTransportMedium.REST)

        source = f"http://my-ftp-server.com/{config.symbol.value}/{date:%Y%M%d}.csv"
        return SubscriptionDataSource(source, SubscriptionTransportMedium.REMOTE_FILE)

The following table describes the arguments the GetSourceget_source method accepts:

ArgumentData TypeDescription
configSubscriptionDataConfigThe subscription configuration
dateDateTimedatetimeDate of this source file
isLiveModebooltrueTrue if algorithm is running in live mode

You can use these arguments to create SubscriptionDataSource objects representing different locations and formats. The following table describes the arguments the SubscriptionDataSource accepts:

ArgumentData TypeDescriptionDefault Value
sourcestringstrData source location
transportMediumSubscriptionTransportMediumThe transport medium to be used to retrieve data from the source
formatFileFormatThe format of the data within the sourceFileFormat.Csv
headersIEnumerable<KeyValuePair<string, string>>The headers to be used for this source. In cloud algorithms, each of the key-value pairs can consist of up to 1,000 characters.nullNone

The FileFormat enumeration has the following members:

The SubscriptionTransportMedium enumeration has the following members:

MemberDescriptionExample
LocalFileThe data comes from diskLean.DataSource.CBOE
RemoteFileThe data is downloaded from a remote sourceCustom Securities Examples
RestThe data comes from a rest call that is polled and returns a single line/data point of informationLiveModelive_mode case of Demonstration Algorithm
ObjectStoreThe data comes from the object storeExample of Custom Data

Parse Custom Data

The Readerreader method of your custom data class takes one line of data from the source location and parses it into one of your custom objects. You can add as many properties to your custom data objects as you need, but the following table describes the properties you must set. When there is no useable data in a line, the method should return nullNone. LEAN repeatedly calls the Readerreader method until the date/time advances or it reaches the end of the file.

PropertyDescription
SymbolYou can set this property to config.Symbol.
TimetimeThe time when the data sample starts.
EndTimeend_timeThe time when the data sample ends and when LEAN should add the sample to a Slice.
ValueThe default data point value.

The following table describes the arguments the Readerreader method accepts:

ArgumentData TypeDescription
configSubscriptionDataConfigThe subscription configuration
linestringstrContent from the requested data source
dateDateTimedatetimeDate of this source file
isLiveModebooltrueTrue if algorithm is running in live mode

You can use these arguments to create BaseData objects from different sources.

public class MyCustomDataType : BaseData
{
    public override BaseData Reader(
        SubscriptionDataConfig config,
        string line,
        DateTime date,
        bool isLiveMode)
    {
        if (string.IsNullOrWhiteSpace(line.Trim()))
        {
            return null;
        }

        if (isLiveMode)
        {
            var custom = JsonConvert.DeserializeObject<MyCustomDataType>(line);
            custom.EndTime = DateTime.UtcNow.ConvertFromUtc(config.ExchangeTimeZone);
            return custom;
        }

        if (!char.IsDigit(line[0]))
        {
            return null;
        }

        var data = line.Split(',');
        return new MyCustomDataType()
        {
            Time = DateTime.ParseExact(data[0], "yyyyMMdd", CultureInfo.InvariantCulture),
            EndTime = Time.AddDays(1),
            Symbol = config.Symbol,
            Value = data[1].IfNotNullOrEmpty(
                s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture)),
        };
    }
}
class MyCustomDataType(PythonData):
    def reader(self,
         config: SubscriptionDataConfig,
         line: str,
         date: datetime,
         isLiveMode: bool) -> BaseData:

        if not line.strip():
            return None

        custom = MyCustomDataType()
        custom.symbol = config.symbol

        if isLiveMode:
            data = json.loads(line)
            custom.end_time =  Extensions.convert_from_utc(datetime.utcnow(), config.exchange_time_zone)
            custom.value = data["value"]
            return custom

        if not line[0].isdigit():
            return None

        data = line.split(',')
        custom.end_time = datetime.strptime(data[0], '%Y%m%d') + timedelta(1)
        custom.value = float(data[1])
        return custom

Live Trading Considerations

In live trading, we pass custom data to your algorithm as soon as it arrives. The time it arrives may not align with the time of other slices. Design your algorithm to handle unsychronized data so that you don't run into issues.

Demonstration Algorithm

The following example algorithm implements a custom data source for the Bitstamp API.

using Newtonsoft.Json;

namespace QuantConnect.Algorithm.CSharp
{
    public class CustomDataBitstampAlgorithm : QCAlgorithm
    {
        private Symbol _customDataSymbol;

        public override void Initialize()
        {
            SetStartDate(2012, 9, 13);
            SetEndDate(2021, 6, 20);

            _customDataSymbol = AddData<Bitstamp>("BTC", Resolution.Daily).Symbol;

            var history = History<Bitstamp>(_customDataSymbol, 200, Resolution.Daily);
            Debug($"We got {history.Count()} items from historical data request of {_customDataSymbol}.");
        }

        public void OnData(Bitstamp data)
        {
            Log($"{data.EndTime}: Close: {data.Close}");
            Plot(_customDataSymbol, "Price", data.Close);
        }

        public class Bitstamp : BaseData
        {
            [JsonProperty("timestamp")]
            public int Timestamp = 0;
            [JsonProperty("open")]
            public decimal Open = 0;
            [JsonProperty("high")]
            public decimal High = 0;
            [JsonProperty("low")]
            public decimal Low = 0;
            [JsonProperty("last")]
            public decimal Close = 0;
            [JsonProperty("bid")]
            public decimal Bid = 0;
            [JsonProperty("ask")]
            public decimal Ask = 0;
            [JsonProperty("vwap")]
            public decimal WeightedPrice = 0;
            [JsonProperty("volume")]
            public decimal VolumeBTC = 0;
            public decimal VolumeUSD = 0;

            public override SubscriptionDataSource GetSource(SubscriptionDataConfig config, DateTime date, bool isLiveMode)
            {
                if (isLiveMode)
                {
                    return new SubscriptionDataSource("https://www.bitstamp.net/api/ticker/", SubscriptionTransportMedium.Rest);
                }

                var source = "https://raw.githubusercontent.com/QuantConnect/Documentation/master/Resources/datasets/custom-data/bitstampusd.csv";
                return new SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile);
            }

            public override BaseData Reader(SubscriptionDataConfig config, string line, DateTime date, bool isLiveMode)
            {
                if (string.IsNullOrWhiteSpace(line.Trim()))
                {
                    return null;
                }

                var coin = new Bitstamp() {Symbol = config.Symbol};

                if (isLiveMode)
                {
                    //Example Line Format:
                    //{"high": "441.00", "last": "421.86", "timestamp": "1411606877", "bid": "421.96", "vwap": "428.58", "volume": "14120.40683975", "low": "418.83", "ask": "421.99"}
                    coin = JsonConvert.DeserializeObject<Bitstamp>(line);
                    coin.EndTime = DateTime.UtcNow.ConvertFromUtc(config.ExchangeTimeZone);
                    coin.Time = coin.EndTime.AddDays(-1);
                    coin.Value = coin.Close;
                    return coin;
                }

                //Example Line Format:
                //Date      Open   High    Low     Close   Volume (BTC)    Volume (Currency)   Weighted Price
                //2011-09-13 5.8    6.0     5.65    5.97    58.37138238,    346.0973893944      5.929230648356
                if (!char.IsDigit(line[0]))
                {
                    return null;
                }

                var data = line.Split(',');
                coin.Value = data[4].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                if (coin.Value == 0)
                {
                    return null;
                }

                coin.Time = DateTime.Parse(data[0], CultureInfo.InvariantCulture);
                coin.EndTime = coin.Time.AddDays(1);
                coin.Open = data[1].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.High = data[2].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.Low = data[3].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.VolumeBTC = data[5].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.VolumeUSD = data[6].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.WeightedPrice = data[7].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.Close = coin.Value;
                return coin;
            }
        }
    }
}
class CustomDataBitstampAlgorithm(QCAlgorithm):

    def initialize(self):
        self.set_start_date(2012, 9, 13)
        self.set_end_date(2021, 6, 20)
        self.set_cash(100000)

        # Define the symbol and "type" of our generic data:
        self.custom_data_symbol = self.add_data(Bitstamp, "BTC").symbol

        history = self.history(Bitstamp, self.custom_data_symbol, 200, Resolution.DAILY)
        self.debug(f"We got {len(history)} items from historical data request of {self.custom_data_symbol}.")


    def on_data(self, slice):
        if self.custom_data_symbol not in slice:
            return

        data = slice[self.custom_data_symbol]
        self.log(f'{data.end_time}: Close: {data.close}')
        self.plot(self.custom_data_symbol, 'Price', data.close)


class Bitstamp(PythonData):

    def get_source(self, config, date, isLiveMode):
        if isLiveMode:
            return SubscriptionDataSource("https://www.bitstamp.net/api/ticker/", SubscriptionTransportMedium.REST)

        source = "https://raw.githubusercontent.com/QuantConnect/Documentation/master/Resources/datasets/custom-data/bitstampusd.csv"
        return SubscriptionDataSource(source, SubscriptionTransportMedium.REMOTE_FILE)

    def reader(self, config, line, date, isLiveMode):

        if not line.strip():
            return None

        coin = Bitstamp()
        coin.symbol = config.symbol

        if isLiveMode:
            # Example Line Format:
            # {"high": "441.00", "last": "421.86", "timestamp": "1411606877", "bid": "421.96", "vwap": "428.58", "volume": "14120.40683975", "low": "418.83", "ask": "421.99"}
            live_b_t_c = json.loads(line)

            # If value is zero, return None
            coin.value = float(live_b_t_c["last"])
            if coin.value == 0:
                return None

            coin.end_time =  Extensions.convert_from_utc(datetime.utcnow(), config.exchange_time_zone)
            coin.time = coin.end_time - timedelta(1)
            coin["Open"] = float(live_b_t_c["open"])
            coin["High"] = float(live_b_t_c["high"])
            coin["Low"] = float(live_b_t_c["low"])
            coin["Close"] = coin.value
            coin["Ask"] = float(live_b_t_c["ask"])
            coin["Bid"] = float(live_b_t_c["bid"])
            coin["VolumeBTC"] = float(live_b_t_c["volume"])
            coin["WeightedPrice"] = float(live_b_t_c["vwap"])
            return coin

        # Example Line Format:
        # Date      Open   High    Low     Close   Volume (BTC)    Volume (Currency)   Weighted Price
        # 2011-09-13 5.8    6.0     5.65    5.97    58.37138238,    346.0973893944      5.929230648356
        if not line[0].isdigit():
            return None

        data = line.split(',')

        # If value is zero, return None
        coin.value = float(data[4])
        if coin.value == 0:
            return None

        coin.time = datetime.strptime(data[0], "%Y-%m-%d")
        coin.end_time = coin.time + timedelta(1)
        coin["Open"] = float(data[1])
        coin["High"] = float(data[2])
        coin["Low"] = float(data[3])
        coin["Close"] = coin.value
        coin["VolumeBTC"] = float(data[5])
        coin["VolumeUSD"] = float(data[6])
        coin["WeightedPrice"] = float(data[7])
        return coin

To save this algorithm to your cloud projects, clone itclone it.

You can also see our Videos. You can also get in touch with us via Discord.

Did you find this page helpful?

Contribute to the documentation: