| Overall Statistics |
|
Total Orders 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Start Equity 100000 End Equity 100000 Net Profit 0% Sharpe Ratio 0 Sortino Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio 0 Tracking Error 0 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset Portfolio Turnover 0% |
# region imports
from AlgorithmImports import *
from collections import defaultdict
# endregion
"""
This algorithm is designed to identify and quantify discrepancies between manually aggregated minute-level market data
and the automatically consolidated data produced by QuantConnect's TradeBarConsolidator.
By consolidating data manually for multiple resolutions (e.g., 5 minutes, 30 minutes) and comparing it to the consolidator
output, the algorithm highlights any differences in open, high, low, close, and volume values, providing statistical
summaries of the deviations.
Note: WorkingData is also affected; in fact, incorrect-looking WorkingData was how I identified the problem.
"""
class ConsolidatorTestAlgorithm(QCAlgorithm):
    """Compare TradeBarConsolidator bars against hand-aggregated minute bars.

    Minute trade bars for one symbol are buffered in ``OnData`` together with
    their end times. Every bar emitted by a registered consolidator is queued
    and later compared field by field (open, high, low, close, volume) with
    the aggregate of the buffered minute bars that fall inside the same time
    window. Differences above ``diff_display_thresh`` are logged as they are
    found, and per-field summary statistics are logged at the end of the run.
    """

    def Initialize(self):
        """Set up the backtest window, the subscription and the consolidators."""
        self.SetStartDate(2020, 1, 6)
        self.SetEndDate(self.start_date)
        self.SetCash(100000)

        # Mismatch severity display threshold: only mismatches whose relative
        # difference exceeds this fraction are logged (0 shows every diff).
        self.diff_display_thresh = 0.05

        # Single symbol under test.
        self.ext_hrs = True
        self.symbol = self.AddEquity(
            "AAPL", Resolution.Minute, extendedMarketHours=self.ext_hrs
        ).Symbol

        # Expected vs. actually observed consolidation end times, per period.
        self.expected_consolidation_times = defaultdict(list)
        self.actual_consolidation_times = defaultdict(list)
        # period -> field name -> list of (abs_diff, rel_diff) tuples.
        self.mismatch_stats = defaultdict(lambda: defaultdict(list))
        # period -> buffered minute-bar samples awaiting comparison.
        self.manual_aggregation = defaultdict(list)
        # period -> consolidated-bar snapshots awaiting comparison.
        self.pending_consolidated = defaultdict(list)

        # One consolidator per target resolution, e.g. ['5m', '30m'].
        self.target_resolutions = ['30m']
        self.consolidators = {}
        for res in self.target_resolutions:
            consolidator = TradeBarConsolidator(self.str_to_timedelta(res))
            consolidator.DataConsolidated += self.OnDataConsolidated
            self.SubscriptionManager.AddConsolidator(self.symbol, consolidator)
            self.consolidators[res] = consolidator

        self.log("(a) Mismatches: Consolidator vs Manual Aggregation")

    def OnData(self, data):
        """Buffer the current minute trade bar and run any comparisons now due.

        Args:
            data: The slice delivered by the engine for the current time step.
        """
        # A TradeBarConsolidator only consumes trade bars, so read the trade
        # bar explicitly and skip slices that carry none for this symbol.
        if not data.Bars.ContainsKey(self.symbol):
            return
        minute_bar = data.Bars[self.symbol]

        current_time = self.Time
        hours = self.Securities[self.symbol].Exchange.Hours
        market_open_time = hours.GetNextMarketOpen(current_time.date(), self.ext_hrs)
        market_close_time = hours.GetNextMarketClose(current_time, self.ext_hrs)
        if not (market_open_time <= current_time < market_close_time):
            return

        sample = {
            'end_time': minute_bar.EndTime,
            'open': minute_bar.Open,
            'high': minute_bar.High,
            'low': minute_bar.Low,
            'close': minute_bar.Close,
            'volume': minute_bar.Volume,
        }
        for period in self.consolidators:
            self.manual_aggregation[period].append(dict(sample))

            # A consolidation is expected whenever the elapsed time since the
            # market open is a whole multiple of the interval.
            interval = self.str_to_timedelta(period)
            if (current_time - market_open_time) % interval == timedelta(0):
                self.expected_consolidation_times[period].append(current_time)

            # Every minute bar ending at or before this one has been buffered,
            # so consolidated bars ending no later than it can be checked.
            self._compare_ready_bars(period, minute_bar.EndTime)

    def OnDataConsolidated(self, sender, bar):
        """Record a consolidated bar and queue it for comparison.

        The comparison is deferred and matched by timestamp rather than by
        callback order, because this handler can run before ``OnData`` has
        delivered the final minute bar of the bar's window; comparing here
        would shift the manual aggregate by one minute.

        Args:
            sender: The consolidator that emitted ``bar``.
            bar: The consolidated trade bar.
        """
        period = next(
            (key for key, value in self.consolidators.items() if value == sender),
            None,
        )
        if period is None:
            # Not one of the consolidators registered in Initialize.
            return

        self.actual_consolidation_times[period].append(bar.EndTime)
        # Snapshot the values instead of holding on to the bar object.
        self.pending_consolidated[period].append({
            'time': bar.Time,
            'end_time': bar.EndTime,
            'open': bar.Open,
            'high': bar.High,
            'low': bar.Low,
            'close': bar.Close,
            'volume': bar.Volume,
        })

    def _compare_ready_bars(self, period, up_to=None):
        """Compare queued consolidated bars that end at or before ``up_to``.

        With ``up_to`` set to None every queued bar for ``period`` is compared.
        """
        waiting = []
        for consolidated in self.pending_consolidated[period]:
            if up_to is not None and consolidated['end_time'] > up_to:
                waiting.append(consolidated)
            else:
                self._compare_bar(period, consolidated)
        self.pending_consolidated[period] = waiting

    def _compare_bar(self, period, consolidated):
        """Compare one consolidated-bar snapshot with the manual aggregate."""
        end_time = consolidated['end_time']
        manual_data = self.AggregateManualData(period, consolidated['time'], end_time)
        if manual_data:
            open_price, high_price, low_price, close_price, volume = manual_data
            self.RecordMismatchStats('Open', period, end_time, consolidated['open'], open_price)
            self.RecordMismatchStats('Close', period, end_time, consolidated['close'], close_price)
            self.RecordMismatchStats('High', period, end_time, consolidated['high'], high_price)
            self.RecordMismatchStats('Low', period, end_time, consolidated['low'], low_price)
            self.RecordMismatchStats('Volume', period, end_time, consolidated['volume'], volume)

        # Drop the minute bars this window consumed (and anything older).
        self.manual_aggregation[period] = [
            sample for sample in self.manual_aggregation[period]
            if sample['end_time'] > end_time
        ]

    def AggregateManualData(self, period, start=None, end=None):
        """Aggregate buffered minute bars for ``period`` into OHLCV values.

        Args:
            period: Resolution key such as ``'30m'``.
            start: Optional exclusive lower bound on a sample's end time.
            end: Optional inclusive upper bound on a sample's end time.
                When both bounds are None the whole buffer is aggregated.

        Returns:
            ``(open, high, low, close, volume)``, or None when no buffered
            sample falls inside the window.
        """
        samples = [
            sample for sample in self.manual_aggregation.get(period, ())
            if (start is None or sample['end_time'] > start)
            and (end is None or sample['end_time'] <= end)
        ]
        if not samples:
            return None

        open_price = samples[0]['open']
        close_price = samples[-1]['close']
        high_price = max(sample['high'] for sample in samples)
        low_price = min(sample['low'] for sample in samples)
        volume = sum(sample['volume'] for sample in samples)
        return open_price, high_price, low_price, close_price, volume

    def str_to_timedelta(self, time_str):
        """Convert a string such as ``'5m'``, ``'2h'`` or ``'1d'`` to a timedelta.

        Raises:
            ValueError: If the string is empty, has a non-integer amount, or
                ends in a unit other than ``m``, ``h`` or ``d``.
        """
        unit_keywords = {'m': 'minutes', 'h': 'hours', 'd': 'days'}
        keyword = unit_keywords.get(time_str[-1:])
        if keyword is None:
            raise ValueError(f"Invalid time string {time_str}")
        try:
            value = int(time_str[:-1])
        except ValueError:
            raise ValueError(f"Invalid time string {time_str}") from None
        return timedelta(**{keyword: value})

    def RecordMismatchStats(self, field_name, period, end_time, actual, expected):
        """Store the difference between two values and log it if large enough.

        Args:
            field_name: Bar field being compared, e.g. ``'Open'``.
            period: Resolution key such as ``'30m'``.
            end_time: End time of the consolidated bar, used in the log line.
            actual: Value reported by the consolidator.
            expected: Value obtained from the manual aggregation.
        """
        abs_diff = abs(actual - expected)
        if expected != 0:
            rel_diff = abs_diff / abs(expected)
        elif abs_diff == 0:
            # Both values are zero (e.g. a zero-volume bar): a perfect match,
            # not an infinite deviation.
            rel_diff = 0.0
        else:
            rel_diff = float('inf')

        self.mismatch_stats[period][field_name].append((abs_diff, rel_diff))

        if abs_diff > self.diff_display_thresh * abs(expected):
            self.log(f"{end_time} | {field_name} mismatch for {self.symbol} ({period}): {actual} vs {expected} (abs_diff={abs_diff}, rel_diff={rel_diff:.4%})")

    def OnEndOfAlgorithm(self):
        """Flush outstanding comparisons, then report missing bars and statistics."""
        # Consolidated bars that arrived after the last minute bar are still
        # queued; compare them before summarising.
        for period in list(self.pending_consolidated):
            self._compare_ready_bars(period)

        for period, expected_times in self.expected_consolidation_times.items():
            actual_times = set(self.actual_consolidation_times.get(period, []))
            missing_times = [time for time in expected_times if time not in actual_times]
            if missing_times:
                self.Debug(f"Missing consolidations for {self.symbol} ({period}): {missing_times}")

        # Calculate and print statistics
        self.log("(b) Summary Statistics:")
        for period, field_stats in self.mismatch_stats.items():
            for field_name, diffs in field_stats.items():
                if not diffs:
                    continue
                abs_diffs, rel_diffs = zip(*diffs)
                self.log(f"{field_name} ({period}): "
                         f"Mean Abs Diff = {np.mean(abs_diffs):.4f}, "
                         f"Max Abs Diff = {np.max(abs_diffs):.4f}, "
                         f"Mean Rel Diff = {np.mean(rel_diffs):.4%}, "
                         f"Max Rel Diff = {np.max(rel_diffs):.4%}")