| Overall Statistics | |
|---|---|
| Total Trades | 0 |
| Average Win | 0% |
| Average Loss | 0% |
| Compounding Annual Return | 0% |
| Drawdown | 0% |
| Expectancy | 0 |
| Net Profit | 0% |
| Sharpe Ratio | 0 |
| Probabilistic Sharpe Ratio | 0% |
| Loss Rate | 0% |
| Win Rate | 0% |
| Profit-Loss Ratio | 0 |
| Alpha | 0 |
| Beta | 0 |
| Annual Standard Deviation | 0 |
| Annual Variance | 0 |
| Information Ratio | 0.283 |
| Tracking Error | 0.199 |
| Treynor Ratio | 0 |
| Total Fees | $0.00 |
from collections import Counter
import dateutil
import json
from missingdatatracker import *
import parameters
class ParticleVentralContainmentField(QCAlgorithm):
    '''
    Select a universe of securities then see how many data points are missing for them at hourly resolution.

    The universe is picked once, on the first coarse selection, and every selected symbol gets a
    MissingDataTracker that counts received and missing hourly bars.
    '''

    def Initialize(self):
        '''Set up the tracking state, the backtest window, the cash and the hourly coarse universe.'''
        # Symbols for which a Delisted event has been seen; OnData stops tracking them.
        self.delisted = set()
        # Symbol -> MissingDataTracker, filled in by CoarseFilter.
        self.missingDataTrackers = {}
        # Set by the first CoarseFilter call so the universe is only selected once.
        self.universeSelected = False

        self.Debug(parameters.final)
        startDate = datetime.strptime(parameters.start, "%Y-%m-%d")
        endDate = datetime.strptime(parameters.end, "%Y-%m-%d")
        self.SetStartDate(startDate)
        self.SetEndDate(endDate)
        self.SetCash(100000)  # Set Strategy Cash

        self.UniverseSettings.Resolution = Resolution.Hour
        # NOTE(review): fill-forward is switched off, presumably so that a gap shows up as an
        # absent bar instead of a repeated one - confirm against the LEAN documentation.
        self.UniverseSettings.FillForward = False
        self.coarseUniverse = self.AddUniverse(self.CoarseFilter)

        self.usEquitiesHours = self.MarketHoursDatabase.GetEntry("usa", None, SecurityType.Equity).ExchangeHours
        # (time, symbol) pairs recorded by OnData while the US equity market was closed.
        self.dataWhenMarketClosed = []

    def CoarseFilter(self, coarse):
        '''
        Select the universe on the first call and leave it unchanged afterwards.

        Returns the symbols of at most 500 US equities, those with the largest DollarVolume,
        on the first call and Universe.Unchanged on every later call.
        '''
        if self.universeSelected:
            return Universe.Unchanged
        self.universeSelected = True

        usEquities = [c for c in coarse if c.Symbol.ID.Market.lower() == "usa" and c.Symbol.SecurityType == SecurityType.Equity]
        usEquities.sort(key=lambda c: c.DollarVolume, reverse=True)
        universe = [c.Symbol for c in usEquities[:500]]
        self.Debug(f"{[s.Value for s in universe]}")

        for symbol in universe:
            if symbol not in self.missingDataTrackers:
                self.missingDataTrackers[symbol] = MissingDataTracker(self, symbol)
        return universe

    def OnData(self, slice):
        '''
        Update the trackers from one slice of data.

        Delisted symbols have their open run closed and are no longer tracked. On the hour,
        between 09:00 and 16:00, each remaining universe symbol is counted as received when the
        slice holds a bar for it with Volume > 0, and as missing when it does not and the
        security's exchange is open at the current time.
        '''
        for kvp in slice.Delistings:
            if kvp.Value.Type == DelistingType.Delisted:
                if kvp.Key in self.missingDataTrackers:
                    self.Debug(f"{kvp.Key.Value} delisted")
                    self.missingDataTrackers[kvp.Key].TrackRun()
                    self.delisted.add(kvp.Key)

        if 9 <= self.Time.hour <= 16 and self.Time.minute == 0:
            # The answer is the same for every symbol, so ask once per slice.
            # Subtract 1 minute because last data comes at 4pm when market is closed
            marketClosed = not self.usEquitiesHours.IsOpen(self.Time - timedelta(minutes=1), False)

            for symbol in self.coarseUniverse.Securities.Keys:
                if marketClosed:
                    self.dataWhenMarketClosed.append((self.Time, symbol))
                if symbol in self.delisted:
                    continue

                mdt = self.missingDataTrackers[symbol]
                # Each tracker is updated at most once per timestamp.
                if mdt.lastTime != self.Time:
                    if slice.Bars.ContainsKey(symbol) and slice.Bars[symbol].Volume > 0:
                        mdt.DataReceived(self.Time)
                    elif self.Securities[symbol].Exchange.Hours.IsOpen(self.Time, False):
                        mdt.DataMissing(self.Time)

            self.SetRuntimeStatistic("received", sum(mdt.receivedCnt for mdt in self.missingDataTrackers.values()))
            self.SetRuntimeStatistic("missing", sum(mdt.missingCnt for mdt in self.missingDataTrackers.values()))
            self.SetRuntimeStatistic("symbolsWithMissing", sum(mdt.missingCnt > 0 for mdt in self.missingDataTrackers.values()))

    def OnEndOfAlgorithm(self):
        '''
        Close every open run, sanity-check the runs and log the totals plus a JSON dump of all runs.

        If two consecutive runs of a tracker overlap, that tracker is logged and the method
        returns without writing the summary.
        '''
        # Output in a format that makes it easy to verify in research environment
        # dict mapping security identifier to array of arrays
        obj = {}
        totalReceived = 0
        totalMissing = 0
        symbolsWithMissing = 0

        for (symbol, mdt) in self.missingDataTrackers.items():
            mdt.TrackRun()

            # Check correctness before we dump a bunch of logs.
            # Every consecutive pair is compared: a run must end before the next one starts.
            for (previous, following) in zip(mdt.runs, mdt.runs[1:]):
                if previous[1] >= following[0]:
                    self.Debug("Runs overlap")
                    self.Debug(f"{symbol.Value} rcvd={mdt.receivedCnt} missing={mdt.missingCnt} runs={[(a.isoformat(), b.isoformat()) for (a,b) in mdt.runs]}")
                    return

            totalReceived += mdt.receivedCnt
            totalMissing += mdt.missingCnt
            if mdt.runs:
                symbolsWithMissing += 1
                obj[str(symbol.ID)] = [[start.isoformat(), end.isoformat()] for (start, end) in mdt.runs]

        # Number of recorded (time, symbol) pairs per timestamp.
        dmc = Counter(entry[0] for entry in self.dataWhenMarketClosed)
        self.Debug(f"len(dataWhenMarketClosed)={len(self.dataWhenMarketClosed)}")
        self.Debug(f"{dmc}")

        # Report nan instead of raising ZeroDivisionError when nothing was counted.
        totalPoints = totalMissing + totalReceived
        missingRate = totalMissing / totalPoints if totalPoints else float("nan")
        self.Debug(f"Missing rate = {totalMissing}/{totalPoints} = {missingRate}")

        securityCount = self.coarseUniverse.Securities.Count
        symbolRate = symbolsWithMissing / securityCount if securityCount else float("nan")
        self.Debug(f"Symbols with missing data = {symbolsWithMissing}/{securityCount} = {symbolRate}")

        self.Debug(json.dumps(obj))


class MissingDataTracker:
    '''
    Count received and missing data points for one symbol and record runs of consecutive missing points.
    '''

    def __init__(self, algorithm, symbol):
        '''Start with zero counts, no open run and lastTime set to the algorithm's current Time.'''
        self.algorithm = algorithm
        self.symbol = symbol
        # Time of the first missing point of the run in progress; None while no run is open.
        self.runStart = None
        # Time passed to the most recent DataReceived / DataMissing call.
        self.lastTime = algorithm.Time
        # Closed runs as (time of first missing point, time of last missing point) tuples.
        self.runs = []
        self.receivedCnt = 0
        self.missingCnt = 0

    def DataReceived(self, time):
        '''Record a received data point at `time`, closing any run of missing points first.'''
        # Close the run before lastTime moves on, so the run ends at the last missing point.
        self.TrackRun()
        self.receivedCnt += 1
        self.lastTime = time

    def DataMissing(self, time):
        '''Record a missing data point at `time`, opening a new run if none is in progress.'''
        # Compare against None explicitly so a falsy timestamp still counts as an open run.
        if self.runStart is None:
            self.runStart = time
        self.missingCnt += 1
        self.lastTime = time

    def TrackRun(self):
        '''Append the run in progress, if any, to `runs` as (runStart, lastTime) and reset it.'''
        if self.runStart is not None:
            self.runs.append((self.runStart, self.lastTime))
            self.runStart = None


from datetime import date
import json
# The following line is what gets replaced by script
_parametersJson = '{"start": "2000-01-01", "end": "2000-12-31"}'

# Default values for the parameters that are passed to the backtesting framework.
# Only the keys listed here end up in `final`; any other key in the JSON is ignored.
_defaults = {
    "start": date(2020, 1, 1).isoformat(),
    "end": date(2020, 12, 31).isoformat(),
}

_parsedJson = json.loads(_parametersJson)

# Effective parameters: the JSON value where one is supplied, the default otherwise.
final = {
    name: (_parsedJson[name] if name in _parsedJson else fallback)
    for (name, fallback) in _defaults.items()
}

start, end = final["start"], final["end"]