I'm really struggling to get this to work. 

I am focused on getting a backtest to work current. I want to import custom data alongside add_equity. It's second's resolution, via Polygon, and so this returns a json with many entries. I am having difficulty returning the custom data appropriately, and accessing its slice in on_data. I have tried UnfoldingCollections, as well as CSV and JSON format. I have tried using yield instead of returning a BaseDataCollection as well. 

 

Could someone please assist me in getting this to return data properly? Thank you.

 


from AlgorithmImports import *
from QuantConnect import *
from QuantConnect.Data import *
from QuantConnect.Python import *

import requests
import json
from datetime import datetime, timedelta
import pytz
from typing import Dict, Any



class MyTradingAlgo(QCAlgorithm):
    def initialize(self):
        equity = self.add_equity(ticker, Resolution.SECOND, extended_market_hours=False, fill_forward=True)
 
        symbol = equity.symbol
        self.symbol_cache.append(symbol)
        symbol_properties = self.symbol_properties_database.get_symbol_properties(symbol.id.market, symbol, SecurityType.EQUITY, 'USD')
        security_exchange_hours = self.market_hours_database.get_exchange_hours(symbol.id.market, symbol, SecurityType.EQUITY)
        
        polygon_live = self.add_data(PolygonLiveData, ticker, properties=symbol_properties, exchange_hours=security_exchange_hours, resolution=Resolution.SECOND, fill_forward=True)
        polygon_live_symbol = polygon_live.symbol

        if not hasattr(self, 'polygon_symbol_cache'):
            self.polygon_symbol_cache = {}
        self.polygon_symbol_cache[symbol] = polygon_live_symbol

    def on_data(self, data: Slice):
         try:
             if hasattr(algorithm, 'polygon_symbol_cache') and symbol in algorithm.polygon_symbol_cache:
                 
                 polygon_symbol = algorithm.polygon_symbol_cache[symbol]
                 polygon_dict = data.get(PolygonLiveData)
                 
                 # if polygon_dict and polygon_dict.ContainsKey(polygon_symbol):
                 #     poly_data = polygon_dict[polygon_symbol]
                 #     vwap = poly_data["Vwap"]
                 #     num_transactions = poly_data["Num_transactions"]
                 #     poly_bar_close = poly_data["Close"]
                 #     poly_bar_open = poly_data["Open"]
                 #     poly_bar_high = poly_data["High"]
                 #     poly_bar_low = poly_data["Low"]
                 #     poly_bar_volume = poly_data["Volume"]
                 #     #poly_original_time_stamp = poly_data['polygon_timestamp']
                 #     poly_time = poly_data.EndTime

                 
                 if polygon_dict and polygon_dict.ContainsKey(polygon_symbol):
                     # This is the BaseDataCollection returned from reader()
                     collection = polygon_dict[polygon_symbol]

                     # Access individual PolygonLiveData bars
                     if collection.Data and len(collection.Data) > 0:
                         poly_bar = collection.Data[-1]  # last bar in the collection

                         vwap = poly_bar["Vwap"]
                         num_transactions = poly_bar["Num_transactions"]
                         poly_bar_close = poly_bar["Close"]
                         poly_bar_open = poly_bar["Open"]
                         poly_bar_high = poly_bar["High"]
                         poly_bar_low = poly_bar["Low"]
                         poly_bar_volume = poly_bar["Volume"]
                         poly_time = poly_bar.EndTime

                         self.Log(f"Polygon VWAP={vwap}, Close={poly_bar_close}, Time={poly_time}")
                 
         except Exception as e:
             pass

class PolygonLiveData(PythonData):

    def get_source(self, config: SubscriptionDataConfig, date: datetime, is_live_mode: bool) -> SubscriptionDataSource:
        """Return the Polygon aggregates endpoint URL for the given date."""
        ticker = str(config.Symbol.Value)
        api_key = "Redacted"
        base_url = "https://api.polygon.io"

        # --- Backtesting mode ---
        eastern = pytz.timezone("US/Eastern")
        from datetime import datetime as dt
        # Define market hours (Polygon timestamps are UTC)
        market_open = eastern.localize(dt(date.year, date.month, date.day, 0, 00))
        market_close = eastern.localize(dt(date.year, date.month, date.day, 23, 59))

        start_time = int(market_open.timestamp() * 1000)
        end_time = int(market_close.timestamp() * 1000)

        # Polygon aggregate bars endpoint (JSON format)
        endpoint = f"/v2/aggs/ticker/{ticker}/range/1/second/{start_time}/{end_time}"
        full_url = f"{base_url}{endpoint}?adjusted=true&sort=asc&limit=50000&apikey={api_key}"

        # Each JSON response = one collection of many bars
        return SubscriptionDataSource(
            source=full_url,
            transport_medium=SubscriptionTransportMedium.REMOTE_FILE,
            format=FileFormat.UNFOLDING_COLLECTION
        )

    def reader(self, config: SubscriptionDataConfig, line: str, date: datetime, is_live_mode: bool):
        """Parse Polygon JSON response (one per day) into a BaseDataCollection."""
        line = (line or "").strip()
        if not line or line.startswith("v") or line.startswith("ticker"):
            return None

        try:
            # Expecting one JSON object per line (from Polygon)
            if not line.startswith("{"):
                return None

            json_response = json.loads(line)
            results = json_response.get("results", [])
            if not results:
                return None

            eastern = pytz.timezone("US/Eastern")
            bars = []

            for bar in results:
                ts_ms = bar["t"]
                o, h, l, c = bar["o"], bar["h"], bar["l"], bar["c"]
                v = bar["v"]
                vw = bar.get("vw", 0)
                n = bar.get("n", 0)

                # Convert timestamp (ms since epoch UTC) → datetime Eastern
                dt_utc = datetime.fromtimestamp(ts_ms / 1000.0, tz=pytz.UTC)
                time_est = dt_utc.astimezone(eastern).replace(microsecond=0)

                datum = PolygonLiveData()
                datum.Symbol = config.Symbol
                datum.Time = time_est
                datum.EndTime = time_est
                datum.Value = float(c)

                datum["Open"] = float(o)
                datum["High"] = float(h)
                datum["Low"] = float(l)
                datum["Close"] = float(c)
                datum["Volume"] = int(v)
                datum["Num_transactions"] = int(n)
                datum["Vwap"] = float(vw)

                bars.append(datum)

            if not bars:
                return None

            # Return one BaseDataCollection (unfolded from this JSON)
            return BaseDataCollection(bars[-1].EndTime, config.Symbol, bars)

        except Exception as e:
            self.Error(f"Error parsing Polygon JSON: {e}")
            return None
                
    def RequiresMapping(self):
        """Indicates this data is linked to equity symbols"""
        return True
    
    def DefaultResolution(self):
        """Default resolution for this data source"""
        return Resolution.SECOND
    
    def SupportedResolutions(self):
        """Supported resolutions for this data source"""
        return [Resolution.SECOND]