Streaming Data

Key Concepts

Introduction

There are two techniques to import data into your algorithm. You can either manually import the entire file or stream the file line-by-line into your algorithm's OnData/on_data event. This page explores streaming a file's contents into your algorithm line-by-line. The data you import can be from a remote server or the Object Store.

Data Formats

Common data formats are CSV, JSON, XML, and ZIP, but you can use any file type that can be read over the internet. For Excel files, check the raw data format before you parse it in the data reader, because the Excel application displays values in a format chosen for convenient viewing. To avoid confusion about the data format, save the spreadsheet as a CSV file and open it in a text editor to confirm the raw data format.

If you import from a remote file provider, each request has a one-second overhead, so package your custom data to minimize requests. Bundle dates together where possible to speed up execution. The Object Store file provider gives you the fastest execution because you don't need to download the files on every execution.
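
As a rough sketch of the bundling advice, the following script merges several per-day CSV payloads into one chronologically ordered file, so a backtest needs a single request instead of one per date. The bundle_daily_files helper, the "yyyyMMdd,value" layout, and the sample rows are assumptions for illustration.

```python
from datetime import date
from typing import Dict


def bundle_daily_files(daily_files: Dict[date, str]) -> str:
    """Merge per-day CSV payloads into one chronologically ordered CSV."""
    lines = []
    for day in sorted(daily_files):
        for line in daily_files[day].splitlines():
            if line.strip():  # Skip blank lines.
                lines.append(line.strip())
    return "\n".join(lines) + "\n"


# Hypothetical per-day payloads in "yyyyMMdd,value" format.
daily_files = {
    date(2021, 1, 5): "20210105,101.5\n",
    date(2021, 1, 4): "20210104,100.0\n",
}
bundled = bundle_daily_files(daily_files)
print(bundled)
```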

Point-In-Time Tickers

Tickers are string shortcodes that represent an asset. Some examples of popular tickers include "AAPL" for Apple Inc. and "IBM" for International Business Machines Corporation. Tickers often change when a company rebrands or undergoes a merger or reverse merger.

The ticker of an asset is not the same as the Symbol. Symbol objects are permanent and track the underlying entity. When a company rebrands or changes its name, the Symbol object remains constant, giving algorithms a way to reliably track assets over time.

Tickers are also often reused by different brokerages. For example, Coinbase, a leading US crypto brokerage, lists the "BTCUSD" ticker for trading. Its competitor, Bitfinex, also lists "BTCUSD". You can trade both tickers with LEAN. Symbol objects allow LEAN to identify which market you reference in your algorithms.

To create a Symbol object for a point-in-time ticker, call the GenerateEquity/generate_equity method to create the security identifier and then call the Symbol constructor. For example, Heliogen, Inc. changed its ticker from ATHN to HLGN on December 31, 2021. To convert the ATHN ticker to the Equity Symbol, type:

# Generate an Equity security identifier for a ticker and then create a symbol with it.
ticker = "ATHN"
security_id = SecurityIdentifier.generate_equity(ticker, Market.USA, mapping_resolve_date=datetime(2021, 12, 1))
symbol = Symbol(security_id, ticker)
// Generate an Equity security identifier for a ticker and then create a symbol with it.
var ticker = "ATHN";
var securityID = SecurityIdentifier.GenerateEquity(ticker, Market.USA, mappingResolveDate:new DateTime(2021, 12, 1));
var symbol = new Symbol(securityID, ticker);

In the preceding code snippet, the mappingResolveDate/mapping_resolve_date argument must be a date when the point-in-time ticker was trading.

For examples of point-in-time tickers in custom data types, refer to the CSV Format Example and JSON Format Example.
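
To see why the resolve date matters, consider the toy model of a ticker history below. It is only a conceptual illustration: the TICKER_HISTORY table, the resolve helper, the "entity-1" identifier, and the ATHN start date are all invented, and LEAN resolves tickers from its own mapping data rather than from anything like this.

```python
from datetime import date
from typing import Optional

# Toy ticker history: (first date the ticker applied, ticker, permanent id).
TICKER_HISTORY = [
    (date(2021, 1, 1), "ATHN", "entity-1"),   # Start date is made up.
    (date(2021, 12, 31), "HLGN", "entity-1"),
]


def resolve(ticker: str, on: date) -> Optional[str]:
    """Return the id of the entity that traded under `ticker` on date `on`."""
    for i, (start, name, entity_id) in enumerate(TICKER_HISTORY):
        # A ticker applies until the next entry of the same entity starts.
        later = [s for s, _, e in TICKER_HISTORY[i + 1:] if e == entity_id]
        end = min(later) if later else date.max
        if name == ticker and start <= on < end:
            return entity_id
    return None


print(resolve("ATHN", date(2021, 12, 1)))   # ATHN was still in use on this date.
print(resolve("ATHN", date(2022, 1, 15)))   # ATHN no longer maps to anything here.
print(resolve("HLGN", date(2022, 1, 15)))   # Same entity under its new ticker.
```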

Set Data Sources

The GetSource/get_source method in your custom data class instructs LEAN where to find the data.

public class MyCustomDataType : BaseData
{
    public override SubscriptionDataSource GetSource(
        SubscriptionDataConfig config,
        DateTime date,
        bool isLiveMode)
    {
        if (isLiveMode)
        {
            return new SubscriptionDataSource("https://www.bitstamp.net/api/ticker/", SubscriptionTransportMedium.Rest);
        }

        var source = $"http://my-ftp-server.com/{config.Symbol.Value}/{date:yyyyMMdd}.csv";
        return new SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile);

        /*
        // Example of loading from the Object Store:
        return new SubscriptionDataSource(Bitstamp.KEY, 
            SubscriptionTransportMedium.ObjectStore);

        // Example of loading a remote zip file:
        return new SubscriptionDataSource(
            "https://cdn.quantconnect.com/uploads/multi_csv_zipped_file.zip",
            SubscriptionTransportMedium.RemoteFile,
            FileFormat.ZipEntryName);

        // Example of loading a remote zip file and accessing a CSV file inside it:
        return new SubscriptionDataSource(
            "https://cdn.quantconnect.com/uploads/multi_csv_zipped_file.zip#csv_file_10.csv",
            SubscriptionTransportMedium.RemoteFile,
            FileFormat.ZipEntryName);
        */
    }
}
class MyCustomDataType(PythonData):
    def get_source(self,
         config: SubscriptionDataConfig,
         date: datetime,
         is_live_mode: bool) -> SubscriptionDataSource:
        
        if is_live_mode:
            return SubscriptionDataSource("https://www.bitstamp.net/api/ticker/", SubscriptionTransportMedium.REST)

        # %m is the month; %M would insert the minutes.
        source = f"http://my-ftp-server.com/{config.symbol.value}/{date:%Y%m%d}.csv"
        return SubscriptionDataSource(source, SubscriptionTransportMedium.REMOTE_FILE)

        # Example of loading from the Object Store:
        # return SubscriptionDataSource(Bitstamp.KEY, SubscriptionTransportMedium.OBJECT_STORE)

        # Example of loading a remote zip file:
        # return SubscriptionDataSource(
        #     "https://cdn.quantconnect.com/uploads/multi_csv_zipped_file.zip",
        #     SubscriptionTransportMedium.REMOTE_FILE,
        #     FileFormat.ZIP_ENTRY_NAME
        # )

        # Example of loading a remote zip file and accessing a CSV file inside it:
        # return SubscriptionDataSource(
        #     "https://cdn.quantconnect.com/uploads/multi_csv_zipped_file.zip#csv_file_10.csv",
        #     SubscriptionTransportMedium.REMOTE_FILE,
        #     FileFormat.ZIP_ENTRY_NAME
        # )
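
The date-based file name is easy to get wrong in Python because %M formats minutes, not months. The following quick check runs outside LEAN. It reuses the placeholder host from the example above, and build_source_url is a hypothetical helper name.

```python
from datetime import datetime


def build_source_url(ticker: str, date: datetime) -> str:
    # %m is the zero-padded month; %M would insert the minutes instead.
    return f"http://my-ftp-server.com/{ticker}/{date:%Y%m%d}.csv"


url = build_source_url("ABC", datetime(2021, 12, 1))
print(url)  # http://my-ftp-server.com/ABC/20211201.csv
```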

The following table describes the arguments the GetSource/get_source method accepts:

| Argument | Data Type | Description |
| --- | --- | --- |
| config | SubscriptionDataConfig | The subscription configuration |
| date | DateTime/datetime | Date of this source file |
| isLiveMode/is_live_mode | bool | true/True if algorithm is running in live mode |

You can use these arguments to create SubscriptionDataSource objects representing different locations and formats. The following table describes the arguments the SubscriptionDataSource accepts:

| Argument | Data Type | Description | Default Value |
| --- | --- | --- | --- |
| source | string/str | Data source location | |
| transportMedium/transport_medium | SubscriptionTransportMedium | The transport medium to be used to retrieve data from the source | |
| format | FileFormat | The format of the data within the source | FileFormat.Csv |
| headers | IEnumerable<KeyValuePair<string, string>> | The headers to be used for this source. In cloud algorithms, each of the key-value pairs can consist of up to 1,000 characters. | null/None |

The FileFormat enumeration has the following members:

The SubscriptionTransportMedium enumeration has the following members:

| Member | Description | Example |
| --- | --- | --- |
| LocalFile/LOCAL_FILE | The data comes from disk | Lean.DataSource.CBOE |
| RemoteFile/REMOTE_FILE | The data is downloaded from a remote source | Custom Securities Examples |
| Rest/REST | The data comes from a REST call that is polled and returns a single line/data point of information | LiveMode/live_mode case of Demonstration Algorithm |
| ObjectStore/OBJECT_STORE | The data comes from the Object Store | Example of Custom Data |

Parse Custom Data

The Reader/reader method of your custom data class takes one line of data from the source location and parses it into one of your custom objects. You can add as many properties to your custom data objects as you need, but the following table describes the properties you must set. When there is no usable data in a line, the method should return null/None. LEAN repeatedly calls the Reader/reader method until the date/time advances or it reaches the end of the file.

| Property | Description |
| --- | --- |
| Symbol/symbol | You can set this property to config.Symbol/config.symbol. |
| Time/time | The time when the data sample starts. |
| EndTime/end_time | The time when the data sample ends and when LEAN should add the sample to a Slice. |
| Value/value | The default data point value (decimal/float). |

The following table describes the arguments the Reader/reader method accepts:

| Argument | Data Type | Description |
| --- | --- | --- |
| config | SubscriptionDataConfig | The subscription configuration |
| line | string/str | Content from the requested data source |
| date | DateTime/datetime | Date of this source file |
| isLiveMode/is_live_mode | bool | true/True if algorithm is running in live mode |

You can use these arguments to create BaseData objects from different sources.

public class MyCustomDataType : BaseData
{
    public override BaseData Reader(
        SubscriptionDataConfig config,
        string line,
        DateTime date,
        bool isLiveMode)
    {
        if (string.IsNullOrWhiteSpace(line))
        {
            return null;
        }

        if (isLiveMode)
        {
            var custom = JsonConvert.DeserializeObject<MyCustomDataType>(line);
            // The deserialized object doesn't know its Symbol, so set it here.
            custom.Symbol = config.Symbol;
            custom.EndTime = DateTime.UtcNow.ConvertFromUtc(config.ExchangeTimeZone);
            return custom;
        }

        if (!char.IsDigit(line[0]))
        {
            return null;
        }

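        var data = line.Split(',');
        // Parse the start time first. Inside the object initializer, `Time` would
        // refer to this reader instance, not to the object being created.
        var time = DateTime.ParseExact(data[0], "yyyyMMdd", CultureInfo.InvariantCulture);
        return new MyCustomDataType()
        {
            Time = time,
            EndTime = time.AddDays(1),
            Symbol = config.Symbol,
            Value = data[1].IfNotNullOrEmpty(
                s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture)),
        };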
    }
}
class MyCustomDataType(PythonData):
    def reader(self,
         config: SubscriptionDataConfig,
         line: str,
         date: datetime,
         is_live_mode: bool) -> BaseData:

        if not line.strip():
            return None

        custom = MyCustomDataType()
        custom.symbol = config.symbol

        if is_live_mode:
            data = json.loads(line)
            custom.end_time =  Extensions.convert_from_utc(datetime.utcnow(), config.exchange_time_zone)
            custom.value = data["value"]
            return custom

        if not line[0].isdigit():
            return None

        data = line.split(',')
        custom.end_time = datetime.strptime(data[0], '%Y%m%d') + timedelta(1)
        custom.value = float(data[1])
        return custom
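
Because LEAN calls the method once per line, it can help to keep the parsing logic in a plain function that you can test without the engine. The following is a minimal sketch, assuming the "yyyyMMdd,value" CSV layout used above. The parse_line function and the dictionary it returns are illustrative stand-ins, not LEAN types.

```python
from datetime import datetime, timedelta
from typing import Optional


def parse_line(line: str) -> Optional[dict]:
    """Parse one 'yyyyMMdd,value' row; return None for unusable lines."""
    line = line.strip()
    # Skip blank lines and header rows that don't start with a digit.
    if not line or not line[0].isdigit():
        return None
    fields = line.split(',')
    start = datetime.strptime(fields[0], '%Y%m%d')
    return {
        "time": start,
        "end_time": start + timedelta(days=1),
        "value": float(fields[1]),
    }


rows = ["date,value", "", "20210104,100.5"]
parsed = [parse_line(row) for row in rows]
print(parsed)
```

The header row and the blank line both produce None, which mirrors how the reader signals that a line holds no usable data.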

Unsorted Data

By default, LEAN expects the data in chronological order. If the data is unsorted, set the Sort/sort property of the SubscriptionDataSource object to true/True.

public class MyCustomDataType : BaseData
{
    public override SubscriptionDataSource GetSource(
        SubscriptionDataConfig config,
        DateTime date,
        bool isLiveMode)
    {
        var source = $"http://my-ftp-server.com/{config.Symbol.Value}/{date:yyyyMMdd}.csv";
        return new SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile)
        { 
            Sort = true
        };
    }
}
class MyCustomDataType(PythonData):
    def get_source(self,
            config: SubscriptionDataConfig,
            date: datetime,
            is_live_mode: bool) -> SubscriptionDataSource:
            
        source = f"http://my-ftp-server.com/{config.symbol.value}/{date:%Y%m%d}.csv"
        subscription = SubscriptionDataSource(source, SubscriptionTransportMedium.REMOTE_FILE)
        subscription.sort = True
        return subscription

LEAN uses the EndTime/end_time property of your custom data to sort it.
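
Conceptually, sorting reorders the parsed points by their end time before they reach your algorithm. The plain-Python sketch below illustrates that ordering with hypothetical records. It is not LEAN's implementation.

```python
from datetime import datetime

# Hypothetical parsed points in file order (not chronological).
points = [
    {"end_time": datetime(2021, 1, 6), "value": 3.0},
    {"end_time": datetime(2021, 1, 4), "value": 1.0},
    {"end_time": datetime(2021, 1, 5), "value": 2.0},
]

# Order the points by end time, oldest first.
ordered = sorted(points, key=lambda point: point["end_time"])
values = [point["value"] for point in ordered]
print(values)  # [1.0, 2.0, 3.0]
```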

Object Store

The Object Store file provider gives you the fastest execution because you don't need to download the files on every execution. To upload the data files into the Object Store, use the Algorithm Lab, CLI, or Research Environment. To then pull the data from the Object Store into an algorithm, set the custom data source to read from the file in the Object Store and parse the data in the Reader/reader method.

public class MyCustomDataType : BaseData
{
    public override SubscriptionDataSource GetSource(
        SubscriptionDataConfig config,
        DateTime date,
        bool isLiveMode)
    {
        return new SubscriptionDataSource("<YourCSVKey>", SubscriptionTransportMedium.ObjectStore, FileFormat.Csv);
        // return new SubscriptionDataSource("<YourJSONKey>", SubscriptionTransportMedium.ObjectStore, FileFormat.UnfoldingCollection);
    }
}
class MyCustomDataType(PythonData):
    def get_source(self,
            config: SubscriptionDataConfig,
            date: datetime,
            is_live_mode: bool) -> SubscriptionDataSource:
            
        return SubscriptionDataSource("<your_csv_key>", SubscriptionTransportMedium.OBJECT_STORE, FileFormat.CSV)
        # return SubscriptionDataSource("<your_json_key>", SubscriptionTransportMedium.OBJECT_STORE, FileFormat.UNFOLDING_COLLECTION)

Set Properties

To set the Symbol Properties of the custom data, provide a SymbolProperties object when you subscribe to the dataset. The ticker you pass to the SymbolProperties constructor and the AddData/add_data method must be the same.

To set the Exchange Hours of the custom data, provide a SecurityExchangeHours object when you subscribe to the dataset. The default hours are for the market to be open 24/7.

var ticker = "ABC";
var properties = new SymbolProperties("description", "USD", 1, 0.01m, 0.01m, ticker);
var hours = MarketHoursDatabase.GetEntry(Market.USA, "SPY", SecurityType.Equity).ExchangeHours;
AddData<CustomData>(ticker, properties, hours);
ticker = "ABC"
properties = SymbolProperties("description", "USD", 1, 0.01, 0.01, ticker)
hours = MarketHoursDatabase.get_entry(Market.USA, "SPY", SecurityType.EQUITY).exchange_hours
self.add_data(CustomData, ticker, properties, hours)

Live Trading Considerations

In live trading, we pass custom data to your algorithm as soon as it arrives. The time it arrives may not align with the time of other slices. Design your algorithm to handle unsynchronized data so that you don't run into issues.
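
One way to handle unsynchronized data is to cache the latest custom data point and check its age before you act on it. The following is a minimal sketch in plain Python. The LatestValueCache class, its method names, and the one-hour threshold are assumptions for illustration and are not part of LEAN.

```python
from datetime import datetime, timedelta
from typing import Optional


class LatestValueCache:
    """Keep the most recent custom data point and reject stale reads."""

    def __init__(self, max_age: timedelta):
        self._max_age = max_age
        self._value: Optional[float] = None
        self._end_time: Optional[datetime] = None

    def update(self, value: float, end_time: datetime) -> None:
        # Ignore points that are older than the one already cached.
        if self._end_time is None or end_time >= self._end_time:
            self._value = value
            self._end_time = end_time

    def get(self, now: datetime) -> Optional[float]:
        # Return None when there is no data or it is older than max_age.
        if self._end_time is None or now - self._end_time > self._max_age:
            return None
        return self._value


cache = LatestValueCache(max_age=timedelta(hours=1))
cache.update(421.86, datetime(2021, 1, 4, 10, 0))
fresh = cache.get(datetime(2021, 1, 4, 10, 30))
stale = cache.get(datetime(2021, 1, 4, 12, 0))
print(fresh, stale)
```

In an algorithm, you could call update when a custom data point arrives and call get when another data feed triggers a trading decision, and skip the decision when get returns None.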

Demonstration Algorithm

The following example algorithm implements a custom data source for the Bitstamp API.

using Newtonsoft.Json;

public class CustomDataBitstampAlgorithm : QCAlgorithm
{
    private Symbol _customDataSymbol;

    public override void Initialize()
    {
        SetStartDate(2020, 9, 1);
        SetEndDate(2020, 12, 31);
        // Define the symbol and "type" of our custom data.
        _customDataSymbol = AddData<Bitstamp>("BTC", Resolution.Daily).Symbol;
        // Get some historical data.
        var history = History<Bitstamp>(_customDataSymbol, 200, Resolution.Daily);
    }

    // Get the data of the current day.
    public void OnData(Bitstamp data)
    {
        Plot(_customDataSymbol, "Price", data.Close);
    }

    public class Bitstamp : BaseData
    {
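        public int Timestamp = 0;
        public decimal Open = 0, High = 0, Low = 0, Bid = 0, Ask = 0, VolumeUSD = 0;
        // Map the live ticker's JSON keys that don't match the field names.
        [JsonProperty("last")] public decimal Close = 0;
        [JsonProperty("vwap")] public decimal WeightedPrice = 0;
        [JsonProperty("volume")] public decimal VolumeBTC = 0;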
        
        public override SubscriptionDataSource GetSource(SubscriptionDataConfig config, DateTime date, bool isLiveMode)
        {
            if (isLiveMode)
            {
                return new SubscriptionDataSource("https://www.bitstamp.net/api/ticker/", SubscriptionTransportMedium.Rest);
            }
            var source = "https://raw.githubusercontent.com/QuantConnect/Documentation/master/Resources/datasets/custom-data/bitstampusd.csv";
            return new SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile);
        }

        public override BaseData Reader(SubscriptionDataConfig config, string line, DateTime date, bool isLiveMode)
        {
            if (string.IsNullOrWhiteSpace(line))
            {
                return null;
            }
            var coin = new Bitstamp() {Symbol = config.Symbol};
            // In live trading, parse the JSON file.
            if (isLiveMode)
            {
                //Example Line Format:
                //{"high": "441.00", "last": "421.86", "timestamp": "1411606877", "bid": "421.96", "vwap": "428.58", "volume": "14120.40683975", "low": "418.83", "ask": "421.99"}
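                coin = JsonConvert.DeserializeObject<Bitstamp>(line);
                // The deserialized object replaces the one created above, so set its Symbol again.
                coin.Symbol = config.Symbol;
                coin.EndTime = DateTime.UtcNow.ConvertFromUtc(config.ExchangeTimeZone);
                coin.Time = coin.EndTime.AddDays(-1);
                coin.Value = coin.Close;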
                return coin;
            }

            // In backtests, parse the CSV file.
            //Example Line Format:
            //Date,Open,High,Low,Close,Volume (BTC),Volume (Currency),Weighted Price
            //2011-09-13,5.8,6.0,5.65,5.97,58.37138238,346.0973893944,5.929230648356
            if (!char.IsDigit(line[0]))
            {
                return null;
            }
            var data = line.Split(',');
            // If value is zero, return null.
            coin.Value = data[4].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
            if (coin.Value == 0)
            {
                return null;
            }
            coin.Time = DateTime.Parse(data[0], CultureInfo.InvariantCulture);
            coin.EndTime = coin.Time.AddDays(1);
            coin.Open = data[1].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
            coin.High = data[2].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
            coin.Low = data[3].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
            coin.VolumeBTC = data[5].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
            coin.VolumeUSD = data[6].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
            coin.WeightedPrice = data[7].IfNotNullOrEmpty(s => decimal.Parse(s, NumberStyles.Any, CultureInfo.InvariantCulture));
            coin.Close = coin.Value;
            return coin;
        }
    }
}
class CustomDataBitstampAlgorithm(QCAlgorithm):

    def initialize(self):
        self.set_start_date(2020, 9, 1)
        self.set_end_date(2020, 12, 31)
        self.set_cash(100000)
        # Define the symbol and "type" of our generic data:
        self._custom_data_symbol = self.add_data(Bitstamp, "BTC").symbol
        # Get some historical data.
        history = self.history(
            Bitstamp, self._custom_data_symbol, 200, Resolution.DAILY
        )

    def on_data(self, slice):
        # Get the data of the current day.
        data = slice.get(self._custom_data_symbol)
        if not data:
            return
        self.plot(self._custom_data_symbol, 'Price', data.close)


class Bitstamp(PythonData):

    def get_source(self, config, date, is_live_mode):
        if is_live_mode:
            return SubscriptionDataSource(
                'https://www.bitstamp.net/api/ticker/', 
                SubscriptionTransportMedium.REST
            )
        return SubscriptionDataSource(
            "https://raw.githubusercontent.com/QuantConnect/Documentation/master/Resources/datasets/custom-data/bitstampusd.csv", 
            SubscriptionTransportMedium.REMOTE_FILE
        )

    def reader(self, config, line, date, is_live_mode):
        if not line.strip():
            return None
        coin = Bitstamp()
        coin.symbol = config.symbol
        # In live trading, parse the JSON file.
        if is_live_mode:
            # Example Line Format:
            # {"high": "441.00", "last": "421.86", "timestamp": "1411606877", "bid": "421.96", "vwap": "428.58", "volume": "14120.40683975", "low": "418.83", "ask": "421.99"}
            live_btc = json.loads(line)
            # If value is zero, return None
            coin.value = float(live_btc["last"])
            if coin.value == 0:
                return None
            coin.end_time =  Extensions.convert_from_utc(
                datetime.utcnow(), config.exchange_time_zone
            )
            coin.time = coin.end_time - timedelta(1)
            coin["Open"] = float(live_btc["open"])
            coin["High"] = float(live_btc["high"])
            coin["Low"] = float(live_btc["low"])
            coin["Close"] = coin.value
            coin["Ask"] = float(live_btc["ask"])
            coin["Bid"] = float(live_btc["bid"])
            coin["VolumeBTC"] = float(live_btc["volume"])
            coin["WeightedPrice"] = float(live_btc["vwap"])
            return coin

        # In backtests, parse the CSV file.
        # Example Line Format:
        # Date,Open,High,Low,Close,Volume (BTC),Volume (Currency),Weighted Price
        # 2011-09-13,5.8,6.0,5.65,5.97,58.37138238,346.0973893944,5.929230648356
        if not line[0].isdigit():
            return None
        data = line.split(',')
        # If value is zero, return None
        coin.value = float(data[4])
        if coin.value == 0:
            return None
        coin.time = datetime.strptime(data[0], "%Y-%m-%d")
        coin.end_time = coin.time + timedelta(1)
        coin["Open"] = float(data[1])
        coin["High"] = float(data[2])
        coin["Low"] = float(data[3])
        coin["Close"] = coin.value
        coin["VolumeBTC"] = float(data[5])
        coin["VolumeUSD"] = float(data[6])
        coin["WeightedPrice"] = float(data[7])
        return coin
