Streaming Data

Key Concepts

Introduction

There are two techniques to import data into your algorithm. You can either manually import the entire file or stream the file line-by-line into your algorithm's OnData event. This page explores streaming a file's contents into your algorithm line-by-line.

Data Formats

Common data formats are CSV, JSON, and XML, but you can use any file type that can be read over the internet. Each request has a one-second overhead, so you should package your custom data to minimize requests. Bundle dates together where possible to speed up execution. Just ensure the data in the file is in chronological order.
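
As a rough illustration of bundling, the sketch below merges several per-day CSV snippets into a single chronologically ordered file body. It is plain Python and does not use LEAN. The `YYYYMMDD,value` row layout and the `bundle_daily_rows` helper are assumptions made for this example, not part of the LEAN API.

```python
import csv
import io
from datetime import datetime

def bundle_daily_rows(daily_chunks):
    """Merge per-day CSV snippets into one CSV string sorted by date.

    Each row is assumed to look like "YYYYMMDD,value" (hypothetical layout).
    """
    rows = []
    for chunk in daily_chunks:
        for row in csv.reader(io.StringIO(chunk)):
            # Skip blank lines and header rows that do not start with a date.
            if not row or not row[0].strip().isdigit():
                continue
            rows.append((datetime.strptime(row[0].strip(), "%Y%m%d"), row))

    # Sort on the parsed date so the bundled file is chronological.
    rows.sort(key=lambda pair: pair[0])

    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    for _, row in rows:
        writer.writerow(row)
    return out.getvalue()

bundled = bundle_daily_rows([
    "20210103,33000.5\n",
    "date,value\n20210101,29300.0\n",
    "20210102,32100.25\n",
])
print(bundled)
```

Serving one bundled file like this means a single request covers many dates, instead of one request per day.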

Set Data Sources

The GetSource method in your custom data class instructs LEAN where to find the data. This method must return a SubscriptionDataSource object, which contains the data location and format.

The following table describes the arguments the SubscriptionDataSource accepts:

Argument | Data Type | Description | Default Value
source | string (C#) / str (Python) | Data source location |
transportMedium | SubscriptionTransportMedium | The transport medium to be used to retrieve data from the source |
format | FileFormat | The format of the data within the source | FileFormat.Csv
headers | IEnumerable<KeyValuePair<string, string>> | The headers to be used for this source. In cloud algorithms, each of the key-value pairs can consist of up to 1,000 characters. | null (C#) / None (Python)

The SubscriptionTransportMedium enumeration has the following members:

The FileFormat enumeration has the following members:

The following table describes the arguments the GetSource method accepts:

Argument | Data Type | Description
config | SubscriptionDataConfig | The subscription configuration
date | DateTime (C#) / datetime (Python) | Date of this source file
isLiveMode | bool | true (C#) or True (Python) if the algorithm is running in live mode

You can use these arguments to create SubscriptionDataSource objects representing different locations and formats.

public class MyCustomDataType : BaseData
{
    public override SubscriptionDataSource GetSource(
        SubscriptionDataConfig config,
        DateTime date,
        bool isLiveMode)
    {
        // In live mode, poll a REST endpoint.
        if (isLiveMode)
        {
            return new SubscriptionDataSource("https://www.bitstamp.net/api/ticker/", SubscriptionTransportMedium.Rest);
        }

        // In backtests, download one remote file per symbol and date.
        var source = $"http://my-ftp-server.com/{config.Symbol.Value}/{date:yyyyMMdd}.csv";
        return new SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile);
    }
}
class MyCustomDataType(PythonData):
    def GetSource(self,
            config: SubscriptionDataConfig,
            date: datetime,
            isLiveMode: bool) -> SubscriptionDataSource:

        # In live mode, poll a REST endpoint.
        if isLiveMode:
            return SubscriptionDataSource("https://www.bitstamp.net/api/ticker/", SubscriptionTransportMedium.Rest)

        # In backtests, download one remote file per symbol and date.
        # Use %m (month) in the date format, not %M (minute).
        source = f"http://my-ftp-server.com/{config.Symbol.Value}/{date:%Y%m%d}.csv"
        return SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile)
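
The date-stamped file name is easy to get wrong, so it can help to check the string formatting on its own before wiring it into GetSource. The following is a minimal sketch in plain Python. The `build_source_url` helper and `BASE_URL` constant are hypothetical names introduced for this illustration, and the host is the placeholder used in the example above.

```python
from datetime import datetime

# Placeholder host taken from the example above; replace it with your own server.
BASE_URL = "http://my-ftp-server.com"

def build_source_url(ticker: str, date: datetime) -> str:
    """Build the per-symbol, per-date file location (hypothetical helper)."""
    # %Y%m%d gives year, zero-padded month, and zero-padded day.
    # %M would insert the minute of the hour instead of the month.
    return f"{BASE_URL}/{ticker}/{date:%Y%m%d}.csv"

url = build_source_url("BTC", datetime(2021, 6, 20))
print(url)  # http://my-ftp-server.com/BTC/20210620.csv
```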

Parse Custom Data

The Reader method of your custom data class takes one line of data from the source location and parses it into one of your custom objects. You can add as many properties to your custom data objects as you need, but the following table describes the properties you must set. When there is no usable data in a line, the method should return null (C#) or None (Python). LEAN repeatedly calls the Reader method until the date/time advances or it reaches the end of the file.

Property | Description
Symbol | You can set this property to config.Symbol.
Time | The time when the data sample starts.
EndTime | The time when the data sample ends and when LEAN should add the sample to a Slice.
Value | The default data point value.

The following table describes the arguments the Reader method accepts:

Argument | Data Type | Description
config | SubscriptionDataConfig | The subscription configuration
line | string (C#) / str (Python) | Content from the requested data source
date | DateTime (C#) / datetime (Python) | Date of this source file
isLiveMode | bool | true (C#) or True (Python) if the algorithm is running in live mode

You can use these arguments to create BaseData objects from different sources.

public class MyCustomDataType : BaseData
{
    public override BaseData Reader(
        SubscriptionDataConfig config,
        string line,
        DateTime date,
        bool isLiveMode)
    {
        // Skip empty lines.
        if (string.IsNullOrWhiteSpace(line))
        {
            return null;
        }

        if (isLiveMode)
        {
            // Live data arrives as a JSON object.
            var custom = JsonConvert.DeserializeObject<MyCustomDataType>(line);
            custom.Symbol = config.Symbol;
            custom.EndTime = DateTime.UtcNow.ConvertFromUtc(config.ExchangeTimeZone);
            return custom;
        }

        // Skip header rows and other lines that don't start with a date.
        if (!char.IsDigit(line[0]))
        {
            return null;
        }

        var data = line.Split(',');
        // Parse the start time into a local variable first. Inside the object
        // initializer, Time would refer to this instance, not the new object.
        var time = DateTime.ParseExact(data[0], "yyyyMMdd", CultureInfo.InvariantCulture);
        return new MyCustomDataType()
        {
            Symbol = config.Symbol,
            Time = time,
            EndTime = time.AddDays(1),
            Value = data[1].IfNotNullOrEmpty(
                s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture)),
        };
    }
}
class MyCustomDataType(PythonData):
    def Reader(self,
            config: SubscriptionDataConfig,
            line: str,
            date: datetime,
            isLiveMode: bool) -> BaseData:

        # Skip empty lines.
        if not line.strip():
            return None

        custom = MyCustomDataType()
        custom.Symbol = config.Symbol

        if isLiveMode:
            # Live data arrives as a JSON object.
            data = json.loads(line)
            custom.EndTime = Extensions.ConvertFromUtc(datetime.utcnow(), config.ExchangeTimeZone)
            custom.Value = data["value"]
            return custom

        # Skip header rows and other lines that don't start with a date.
        if not line[0].isdigit():
            return None

        data = line.split(',')
        custom.EndTime = datetime.strptime(data[0], '%Y%m%d') + timedelta(1)
        custom.Value = float(data[1])
        return custom
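
To see the parsing rules in isolation, the sketch below mimics the backtest branch of the Reader method in plain Python, without LEAN. The `parse_line` function, the dictionary it returns, and the sample lines are assumptions made for this illustration. A real Reader returns an instance of your custom data class instead.

```python
from datetime import datetime, timedelta

def parse_line(line: str):
    """Return a dict with Time, EndTime, and Value, or None for unusable lines."""
    if not line.strip():
        return None  # blank line: nothing to parse
    if not line[0].isdigit():
        return None  # header row or other non-data line

    fields = line.strip().split(",")
    start = datetime.strptime(fields[0], "%Y%m%d")
    return {
        "Time": start,                          # when the sample starts
        "EndTime": start + timedelta(days=1),   # when the sample becomes available
        "Value": float(fields[1]),
    }

# Feed lines one at a time, as a line-by-line stream would, and drop the Nones.
lines = ["date,value", "", "20210101,29300.0", "20210102,32100.25"]
samples = [s for s in (parse_line(l) for l in lines) if s is not None]
for s in samples:
    print(s["EndTime"].date(), s["Value"])
```

Only the two dated rows produce samples. The header and the blank line return None, which is the signal that a line contains no usable data.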

Demonstration Algorithm

The following example algorithm implements a custom data source for the Bitstamp API.

using System;
using System.Globalization;
using System.Linq;
using Newtonsoft.Json;
using QuantConnect.Data;

namespace QuantConnect.Algorithm.CSharp
{
    public class CustomDataBitstampAlgorithm : QCAlgorithm
    {
        private Symbol _customDataSymbol;

        public override void Initialize()
        {
            SetStartDate(2012, 9, 13);
            SetEndDate(2021, 6, 20);

            _customDataSymbol = AddData<Bitstamp>("BTC", Resolution.Daily).Symbol;

            var history = History<Bitstamp>(_customDataSymbol, 200, Resolution.Daily);
            Debug($"We got {history.Count()} items from historical data request of {_customDataSymbol}.");
        }

        public void OnData(Bitstamp data)
        {
            Log($"{data.EndTime}: Close: {data.Close}");
            Plot(_customDataSymbol, "Price", data.Close);
        }

        public class Bitstamp : BaseData
        {
            [JsonProperty("timestamp")]
            public int Timestamp = 0;
            [JsonProperty("open")]
            public decimal Open = 0;
            [JsonProperty("high")]
            public decimal High = 0;
            [JsonProperty("low")]
            public decimal Low = 0;
            [JsonProperty("last")]
            public decimal Close = 0;
            [JsonProperty("bid")]
            public decimal Bid = 0;
            [JsonProperty("ask")]
            public decimal Ask = 0;
            [JsonProperty("vwap")]
            public decimal WeightedPrice = 0;
            [JsonProperty("volume")]
            public decimal VolumeBTC = 0;
            public decimal VolumeUSD = 0;

            public override SubscriptionDataSource GetSource(SubscriptionDataConfig config, DateTime date, bool isLiveMode)
            {
                if (isLiveMode)
                {
                    return new SubscriptionDataSource("https://www.bitstamp.net/api/ticker/", SubscriptionTransportMedium.Rest);
                }

                var source = "https://raw.githubusercontent.com/QuantConnect/Documentation/master/Resources/datasets/custom-data/bitstampusd.csv";
                return new SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile);
            }

            public override BaseData Reader(SubscriptionDataConfig config, string line, DateTime date, bool isLiveMode)
            {
                if (string.IsNullOrWhiteSpace(line.Trim()))
                {
                    return null;
                }

                var coin = new Bitstamp() {Symbol = config.Symbol};

                if (isLiveMode)
                {
                    //Example Line Format:
                    //{"high": "441.00", "last": "421.86", "timestamp": "1411606877", "bid": "421.96", "vwap": "428.58", "volume": "14120.40683975", "low": "418.83", "ask": "421.99"}
                    coin = JsonConvert.DeserializeObject<Bitstamp>(line);
                    // Deserializing creates a new object, so set the Symbol again.
                    coin.Symbol = config.Symbol;
                    coin.EndTime = DateTime.UtcNow.ConvertFromUtc(config.ExchangeTimeZone);
                    coin.Time = coin.EndTime.AddDays(-1);
                    coin.Value = coin.Close;
                    return coin;
                }

                //Example Line Format:
                //Date,Open,High,Low,Close,Volume (BTC),Volume (Currency),Weighted Price
                //2011-09-13,5.8,6.0,5.65,5.97,58.37138238,346.0973893944,5.929230648356
                if (!char.IsDigit(line[0]))
                {
                    return null;
                }

                var data = line.Split(',');
                coin.Value = data[4].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                if (coin.Value == 0)
                {
                    return null;
                }

                coin.Time = DateTime.Parse(data[0], CultureInfo.InvariantCulture);
                coin.EndTime = coin.Time.AddDays(1);
                coin.Open = data[1].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.High = data[2].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.Low = data[3].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.VolumeBTC = data[5].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.VolumeUSD = data[6].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.WeightedPrice = data[7].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
                coin.Close = coin.Value;
                return coin;
            }
        }
    }
}
from AlgorithmImports import *
import json

class CustomDataBitstampAlgorithm(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2012, 9, 13)
        self.SetEndDate(2021, 6, 20)
        self.SetCash(100000)

        # Define the symbol and "type" of our generic data:
        self.custom_data_symbol = self.AddData(Bitstamp, "BTC").Symbol

        history = self.History(Bitstamp, self.custom_data_symbol, 200, Resolution.Daily)
        self.Debug(f"We got {len(history)} items from historical data request of {self.custom_data_symbol}.")


    def OnData(self, slice):
        if self.custom_data_symbol not in slice:
            return

        data = slice[self.custom_data_symbol]
        self.Log(f'{data.EndTime}: Close: {data.Close}')
        self.Plot(self.custom_data_symbol, 'Price', data.Close)


class Bitstamp(PythonData):

    def GetSource(self, config, date, isLiveMode):
        if isLiveMode:
            return SubscriptionDataSource("https://www.bitstamp.net/api/ticker/", SubscriptionTransportMedium.Rest)

        source = "https://raw.githubusercontent.com/QuantConnect/Documentation/master/Resources/datasets/custom-data/bitstampusd.csv"
        return SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile)

    def Reader(self, config, line, date, isLiveMode):

        if not line.strip():
            return None

        coin = Bitstamp()
        coin.Symbol = config.Symbol

        if isLiveMode:
            # Example Line Format:
            # {"high": "441.00", "last": "421.86", "timestamp": "1411606877", "bid": "421.96", "vwap": "428.58", "volume": "14120.40683975", "low": "418.83", "ask": "421.99"}
            liveBTC = json.loads(line)

            # If value is zero, return None
            coin.Value = float(liveBTC["last"])
            if coin.Value == 0:
                return None

            coin.EndTime =  Extensions.ConvertFromUtc(datetime.utcnow(), config.ExchangeTimeZone)
            coin.Time = coin.EndTime - timedelta(1)
            coin["Open"] = float(liveBTC["open"])
            coin["High"] = float(liveBTC["high"])
            coin["Low"] = float(liveBTC["low"])
            coin["Close"] = coin.Value
            coin["Ask"] = float(liveBTC["ask"])
            coin["Bid"] = float(liveBTC["bid"])
            coin["VolumeBTC"] = float(liveBTC["volume"])
            coin["WeightedPrice"] = float(liveBTC["vwap"])
            return coin

        # Example Line Format:
        # Date,Open,High,Low,Close,Volume (BTC),Volume (Currency),Weighted Price
        # 2011-09-13,5.8,6.0,5.65,5.97,58.37138238,346.0973893944,5.929230648356
        if not line[0].isdigit():
            return None

        data = line.split(',')

        # If value is zero, return None
        coin.Value = float(data[4])
        if coin.Value == 0:
            return None

        coin.Time = datetime.strptime(data[0], "%Y-%m-%d")
        coin.EndTime = coin.Time + timedelta(1)
        coin["Open"] = float(data[1])
        coin["High"] = float(data[2])
        coin["Low"] = float(data[3])
        coin["Close"] = coin.Value
        coin["VolumeBTC"] = float(data[5])
        coin["VolumeUSD"] = float(data[6])
        coin["WeightedPrice"] = float(data[7])
        return coin

To save this algorithm to your cloud projects, clone it.
