| Overall Statistics |
|
Total Orders 2897 Average Win 0.01% Average Loss -0.01% Compounding Annual Return -99.496% Drawdown 4.300% Expectancy -0.481 Start Equity 1000000 End Equity 956869.23 Net Profit -4.313% Sharpe Ratio -5.962 Sortino Ratio -48.887 Probabilistic Sharpe Ratio 0% Loss Rate 79% Win Rate 21% Profit-Loss Ratio 1.50 Alpha 0 Beta 0 Annual Standard Deviation 0.17 Annual Variance 0.029 Information Ratio -5.638 Tracking Error 0.17 Treynor Ratio 0 Total Fees $41399.67 Estimated Strategy Capacity $0 Lowest Capacity Asset ZN YYI8PYJMKWF9 Portfolio Turnover 48524.91% Drawdown Recovery 0 |
# AMERAFRIC CAPITAL - PRODUCTION ALGORITHM
# ==============================================
# Live Machine Learning Futures Trading System
#
# This algorithm combines the robust trading protocol from the POC signal processor with real-time ML inference.
# It loads a pickle model, processes live tick data into minute bars, generates real-time predictions,
# and executes trades with sophisticated position management and optional slippage modeling.
# import clr
# from System import *
# from System.Collections.Generic import List
# from QuantConnect import *
# from QuantConnect.Algorithm import *
from AlgorithmImports import *
# from QuantConnect.Data import *
# from QuantConnect.Data.Consolidators import *
# import pandas as pd
# from datetime import datetime, timedelta
import pickle
# import numpy as np
# import json
import random
class AmerafricBondFuturesTrader(QCAlgorithm):
    """
    Production-ready ML trading algorithm for futures contracts.
    Integrates live model inference with trading logic from original prototype.

    The methods of this class subscribe to tick data for the 10-Year Treasury
    Note future, consolidate it into one-minute bars, feed each bar to a
    pickled model loaded from the ObjectStore and act on the resulting
    long/short signal with market orders.
    """
def Initialize(self):
    """
    Initialize algorithm with ML model loading, tick data subscription, and trading parameters.

    Every tunable value is read through GetParameter with the default shown
    next to it. When time slippage is enabled, ProcessPendingOrders is
    scheduled to run every 10 seconds so delayed orders get released.
    """
    # =====================================
    # ALGORITHM CONFIGURATION
    # =====================================
    # Backtest settings - ignored during live trading
    self.SetStartDate(2025, 9, 1)
    self.SetEndDate(2025, 9, 3)
    self.SetCash(1000000)  # Initial capital
    # Set brokerage model to Interactive Brokers for realistic execution
    self.SetBrokerageModel(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE)

    # =====================================
    # TRADING WINDOW PARAMETERS
    # =====================================
    # Trading hours - only trade within this window (defaults 06:15 - 16:15)
    self.tw_open = time(int(self.GetParameter("tw_open_hr", 6)), int(self.GetParameter("tw_open_min", 15)))
    self.tw_close = time(int(self.GetParameter("tw_close_hr", 16)), int(self.GetParameter("tw_close_min", 15)))

    # =====================================
    # TRADING STATE VARIABLES
    # =====================================
    self.current_position = None    # 1=long, -1=short, None=no position
    self.current_symbol_str = None  # Contract key of the traded contract (e.g., "TYZ21")
    self.current_symbol_obj = None  # Symbol object of the traded contract
    # Contract key -> Symbol for every contract seen in a futures chain
    self.discovered_contracts = {}

    # =====================================
    # SLIPPAGE MODELING
    # =====================================
    # TIME SLIPPAGE TOGGLE - off for live/paper trading, on for backtesting.
    # A supplied parameter arrives as text, and bool() of any non-empty
    # string (including "False" or "0") is True, so parse it explicitly.
    slippage_flag = str(self.GetParameter("add_time_slippage", "False")).strip().lower()
    self.add_time_slippage = slippage_flag in ("1", "true", "yes", "on")
    # Orders waiting for their delayed execution time
    self.pending_orders = {}
    # Bounds of the random delay, in seconds
    self.min_slippage_seconds = int(self.GetParameter("min_slip_ss", 30))
    self.max_slippage_seconds = int(self.GetParameter("max_slip_ss", 120))
    # Fixed seed so the random delays are reproducible between runs
    random.seed(42)

    # =====================================
    # POSITION SIZING PARAMETERS
    # =====================================
    # Fraction of the account to trade (default 0.7)
    self.pct_acct_to_trade = float(self.GetParameter("pct_acct_to_trade", 0.7))
    # Fixed contract count; 0 means size by percentage instead
    self.fixed_contract_size = int(self.GetParameter("fixed_contract_size", 0))

    # =====================================
    # ML MODEL SETUP
    # =====================================
    self.ml_model = None       # Set by LoadMLModel
    self.model_loaded = False  # True once the model has been unpickled
    # Per-contract containers used while building model inputs
    self.minute_bars = {}         # Recent minute-bar rows per contract
    self.current_minute_bar = {}  # Placeholder per contract
    self.tick_data_buffer = {}    # Ticks collected since the last minute bar
    self.last_tick_price = {}     # Last price per symbol, for tick classification

    # =====================================
    # FUTURES CONTRACT SETUP
    # =====================================
    # Canonical 10-Year Treasury Note future, tick resolution, extended hours
    self.ty_future = self.AddFuture(Futures.Financials.Y_10_TREASURY_NOTE, Resolution.TICK, extendedMarketHours=True)
    self.ty_future.SetFilter(0, 360)  # Wide filter to discover contracts across timeframe
    # Not assigned anywhere else in this method; per-contract consolidators
    # are created in SetupDataConsolidation
    self.consolidator = None

    # =====================================
    # INITIALIZATION SEQUENCE
    # =====================================
    self.LoadMLModel()
    self.Log("Production Algorithm initialized successfully")
    self.Log(f"ML Model loaded: {self.model_loaded}")
    self.Log(f"Time slippage: {'ENABLED' if self.add_time_slippage else 'DISABLED'}")
    self.Log(f"Position sizing: pct={self.pct_acct_to_trade}, fixed={self.fixed_contract_size}")
    self.Log(f"Trading window: {self.tw_open} - {self.tw_close}")

    # Delayed orders are only produced when slippage modeling is on
    if self.add_time_slippage:
        self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.Every(timedelta(seconds=10)), self.ProcessPendingOrders)
def InTradingWindow(self):
    """
    Check if current time is within trading window.

    Both window bounds are inclusive. When the opening time is later than
    the closing time the window is treated as spanning midnight
    (e.g. 18:00 -> 06:00).

    Returns:
        bool: True when the time of day of self.Time falls inside the window
    """
    now = self.Time.time()
    if self.tw_open <= self.tw_close:
        return self.tw_open <= now <= self.tw_close
    # Overnight window: inside if after the open OR before the close
    return now >= self.tw_open or now <= self.tw_close
def LoadMLModel(self):
    """
    Read "amerafric_model.pkl" from the ObjectStore and unpickle it.

    On success the object is stored in self.ml_model and self.model_loaded
    becomes True. An empty payload only logs an error; an exception logs the
    error and asks the algorithm to quit.
    """
    import io

    try:
        raw = self.ObjectStore.ReadBytes("amerafric_model.pkl")
        if not raw or len(raw) <= 0:
            self.Log("ERROR: Could not load ML model from ObjectStore - file empty or missing")
            self.model_loaded = False
            return

        # NOTE(review): unpickling executes code embedded in the payload;
        # confirm that only trusted parties can write this ObjectStore key.
        self.ml_model = pickle.load(io.BytesIO(raw))
        self.model_loaded = True
        self.Log("ML model loaded successfully from ObjectStore")
        # Log model type for verification
        self.Log(f"Model type: {type(self.ml_model).__name__}")
    except Exception as e:
        self.Log(f"ERROR loading ML model: {str(e)}")
        self.model_loaded = False
        self.quit("Error loading ML model")
def OnData(self, data):
    """
    Handle one data slice: register consolidation for the selected contract
    of every futures chain, then forward each tick to ProcessTickData.

    Parameters:
        data: The slice; its FutureChains and Ticks collections are read
    """
    # --- Futures contract discovery ---
    chains = data.FutureChains
    if chains.Count > 0:
        for _chain_symbol, chain in chains.items():
            front = self.SelectActiveContract(chain)
            if front is None:
                continue
            self.SetupDataConsolidation(front)

    # --- Tick processing: each entry holds a list of ticks for one symbol ---
    for tick_symbol, ticks in data.Ticks.items():
        for tick in ticks:
            self.ProcessTickData(tick_symbol, tick)
def SelectActiveContract(self, chain):
    """
    Pick the contract to trade from a futures chain.

    Every contract in the chain is recorded in self.discovered_contracts
    (and logged the first time it is seen). Among contracts with more than
    7 days to expiry, the one with the earliest expiry is returned, ties
    broken by higher volume.

    Parameters:
        chain: The futures chain data
    Returns:
        The selected contract or None if no suitable contract found
    """
    def is_viable(candidate):
        # NOTE(review): `Volume >= 0` rejects nothing for non-negative volumes,
        # and the days check already implies Expiry > self.Time.
        remaining = candidate.Expiry - self.Time
        return candidate.Volume >= 0 and candidate.Expiry > self.Time and remaining.days > 7

    tradable = []
    for contract in chain:
        contract_symbol = contract.Symbol
        key = self.GetContractKey(contract_symbol)
        if key not in self.discovered_contracts:
            self.discovered_contracts[key] = contract_symbol
            self.Log(f"Discovered contract: {key} -> {contract_symbol}")
        if is_viable(contract):
            tradable.append(contract)

    if not tradable:
        return None
    # Earliest expiry first; higher volume wins a tie
    return min(tradable, key=lambda c: (c.Expiry, -c.Volume))
def SetupDataConsolidation(self, contract):
    """
    Register a one-minute tick consolidator for a contract, once per symbol.

    Parameters:
        contract: The futures contract to setup consolidation for
    """
    symbol = contract.Symbol
    key = symbol.value
    # Already registered for this symbol - nothing to do
    if key in self.minute_bars:
        return

    # Per-contract containers, keyed by the symbol's string value
    self.minute_bars[key] = []
    self.current_minute_bar[key] = None
    self.tick_data_buffer[key] = []

    def forward_bar(sender, bar):
        # Route the finished bar to the handler together with its symbol
        self.OnMinuteBarConsolidated(symbol, bar)

    minute_consolidator = TickConsolidator(timedelta(minutes=1))
    minute_consolidator.DataConsolidated += forward_bar
    self.SubscriptionManager.AddConsolidator(symbol, minute_consolidator)
    self.Log(f"Setup data consolidation for {symbol}")
def ProcessTickData(self, symbol, tick):
    """
    Append one tick to the buffer of its contract.

    Ticks for symbols without a buffer are ignored. At most the latest 1000
    ticks are kept per symbol.

    Parameters:
        symbol: The contract symbol
        tick: The tick data
    """
    key = symbol.value
    if key not in self.tick_data_buffer:
        return

    # Ticks without a positive Quantity are given a weight of 1
    has_size = hasattr(tick, 'Quantity') and tick.Quantity > 0
    record = {
        'time': tick.Time,
        'price': float(tick.Price),
        'volume': float(tick.Quantity) if has_size else 1,
        'tick_type': self.ClassifyTick(tick),  # Up, Down, or Same
    }

    ticks = self.tick_data_buffer[key]
    ticks.append(record)
    # Cap the buffer so it cannot grow without bound
    if len(ticks) > 1000:
        self.tick_data_buffer[key] = ticks[-1000:]
def ClassifyTick(self, tick):
    """
    Compare a tick's price with the previous price seen for the same symbol.

    The stored last price is updated on every call.

    Parameters:
        tick: The tick data
    Returns:
        String: 'up', 'down', or 'same' (the first tick of a symbol is 'same')
    """
    key = tick.Symbol.value
    current = float(tick.Price)
    previous = self.last_tick_price.get(key)
    # Remember this price for the next comparison
    self.last_tick_price[key] = current

    if previous is None:
        return 'same'
    if current > previous:
        return 'up'
    if current < previous:
        return 'down'
    return 'same'
def OnMinuteBarConsolidated(self, symbol, bar):
    """
    Handle newly consolidated minute bar data. This is where ML inference happens.

    Builds the model input row from the bar and the ticks buffered for it,
    keeps the 100 most recent rows per contract, clears the tick buffer and,
    when a model is loaded, hands the row to GenerateSignalAndTrade. Any
    exception is logged instead of propagated.

    Parameters:
        symbol: The contract symbol
        bar: The consolidated minute bar
    """
    try:
        # Buffers are keyed by the symbol's string value everywhere else;
        # looking up by the Symbol object would always miss.
        key = symbol.value
        tick_buffer = self.tick_data_buffer.get(key, [])
        vwap, tick_counts = self.CalculateTickMetrics(tick_buffer, bar)

        # Row layout expected by the model and the order tagging code
        minute_data = {
            'Date': bar.Time.date(),
            'Time': bar.Time.strftime('%H:%M:%S'),
            'Date and Time': bar.Time,
            'Symbol': self.GetContractKey(symbol),
            'Open': float(bar.Open),
            'High': float(bar.High),
            'Low': float(bar.Low),
            'Close': float(bar.Close),
            'VWAP': vwap,
            'Volume': float(bar.Volume),
            'Up Ticks': tick_counts['up'],
            'Down Ticks': tick_counts['down'],
            'Same Ticks': tick_counts['same'],
            'Tick Count': tick_counts['total']
        }

        # Store the row, keeping only recent bars to limit memory usage
        history = self.minute_bars.setdefault(key, [])
        history.append(minute_data)
        if len(history) > 100:
            self.minute_bars[key] = history[-100:]

        # Start collecting ticks for the next minute
        self.tick_data_buffer[key] = []

        if self.model_loaded:
            self.GenerateSignalAndTrade(symbol, minute_data)

        self.Log(f"New minute bar: {symbol} - C:{bar.Close:.4f}, V:{bar.Volume}, VWAP:{vwap:.4f}")
    except Exception as e:
        self.Log(f"ERROR processing minute bar for {symbol}: {str(e)}")
def CalculateTickMetrics(self, tick_buffer, bar):
    """
    Calculate VWAP and tick count metrics from tick data buffer.

    With an empty buffer the values are approximated from the bar: VWAP is
    the mean of high, low and close, and the tick counts are derived from
    one tenth of the bar volume.

    Parameters:
        tick_buffer: List of tick dicts ('price', 'volume', 'tick_type') for the minute
        bar: The consolidated minute bar
    Returns:
        Tuple: (vwap, tick_counts_dict) with keys 'up', 'down', 'same', 'total'
    """
    if not tick_buffer:
        # Fallback to bar data if no tick data available
        vwap = (float(bar.High) + float(bar.Low) + float(bar.Close)) / 3.0
        approx_ticks = bar.Volume / 10
        # NOTE(review): 'total' here is not the sum of the other three counts
        tick_counts = {
            'up': 1,
            'down': 1,
            'same': int(max(1, approx_ticks)),
            'total': int(max(3, approx_ticks))
        }
        return vwap, tick_counts

    # Single pass: accumulate volume, price*volume and the per-type counts
    total_volume = 0
    traded_value = 0
    tick_counts = {'up': 0, 'down': 0, 'same': 0}
    for tick in tick_buffer:
        total_volume += tick['volume']
        traded_value += tick['price'] * tick['volume']
        kind = tick['tick_type']
        if kind in tick_counts:
            tick_counts[kind] += 1
    tick_counts['total'] = len(tick_buffer)

    # Without any volume there is nothing to weight by; use the close
    vwap = traded_value / total_volume if total_volume > 0 else float(bar.Close)
    return vwap, tick_counts
def GenerateSignalAndTrade(self, symbol, minute_data):
    """
    Generate ML prediction signal and execute trading logic.

    A prediction equal to 1 becomes a long signal (1); every other value
    becomes a short signal (-1). Any exception is logged instead of
    propagated.

    Parameters:
        symbol: The contract symbol
        minute_data: Dictionary containing the minute bar data
    """
    try:
        # The feature columns handed to the model, in this order
        feature_columns = [
            'Open', 'High', 'Low', 'Close', 'VWAP', 'Volume',
            'Up Ticks', 'Down Ticks', 'Same Ticks', 'Tick Count'
        ]
        # One-row frame restricted to the model's columns
        model_input = pd.DataFrame([minute_data])[feature_columns]
        prediction = self.ml_model.predict(model_input)

        # Accept both indexable results and a bare scalar
        pred_value = prediction[0] if hasattr(prediction, '__getitem__') else prediction
        trading_signal = 1 if pred_value == 1 else -1  # 1 for long, -1 for short
        self.Log(f"ML Prediction: {pred_value} -> Trading Signal: {trading_signal}")

        # Pass the extracted value; indexing `prediction` again here would
        # raise for a scalar result and drop the signal.
        self.ExecuteMLSignal(symbol, trading_signal, minute_data, pred_value)
    except Exception as e:
        self.Log(f"ERROR in ML signal generation: {str(e)}")
def ExecuteMLSignal(self, symbol, signal, signal_data, prediction):
    """
    Turn an ML signal into an order, either immediately or after a random delay.

    Nothing happens outside the trading window, or when the signal matches
    the position already held in the same contract.

    Parameters:
        symbol: The contract symbol
        signal: Trading signal (1 for long, -1 for short)
        signal_data: The minute bar data that generated the signal
        prediction: Raw model prediction value
    """
    # Outside trading hours: ignore the signal entirely
    if not self.InTradingWindow():
        return

    symbol_str = self.GetContractKey(symbol)
    bar_time = signal_data['Date and Time']
    # Identifier used in the logs to follow this signal
    signal_id = f"{bar_time}_{symbol_str}"
    self.Log(f"Processing ML signal: Time={bar_time}, Symbol={symbol_str}, Signal={signal}")

    same_direction = self.current_position == signal
    same_contract = self.current_symbol_str == symbol_str
    if same_direction and same_contract:
        self.Log(f"Maintaining current position: Symbol={self.current_symbol_str}, Direction={self.current_position}")
        return

    if not self.add_time_slippage:
        # No delay modeling: build the order description and run it now
        tick_summary = f"Up-{signal_data['Up Ticks']} Dn-{signal_data['Down Ticks']} Sm-{signal_data['Same Ticks']}"
        self.ExecuteScheduledOrder({
            'signal_direction': signal,
            'signal_symbol_str': symbol_str,
            'symbol_obj': symbol,
            'signal_id': signal_id,
            'original_signal_time': self.Time,
            'vwap': signal_data['VWAP'],
            'ticks': tick_summary,
            'prediction': prediction,
        })
        self.Log("Order executed immediately (no slippage)")
        return

    # Delay modeling: queue the order for a randomly delayed execution time
    delay = self.GenerateTimeSlippage()
    execution_time = self.Time + delay
    self.ScheduleOrder(execution_time, signal, symbol_str, symbol, signal_id, signal_data, prediction)
    self.Log(f"Order scheduled for execution at {execution_time} (slippage: {delay.total_seconds()}s)")
# ==============================================
# TRADING LOGIC FROM ORIGINAL SIGNAL PROCESSOR
# ==============================================
def GenerateTimeSlippage(self):
    """
    Generate a random time slippage between min_slippage_seconds and max_slippage_seconds.

    The two bounds are used in ascending order, so a configuration with the
    minimum above the maximum still yields a delay instead of raising.

    Returns:
        A timedelta object representing the random delay
    """
    # random.randint raises ValueError when its lower bound exceeds the upper
    low, high = sorted((self.min_slippage_seconds, self.max_slippage_seconds))
    slippage_seconds = random.randint(low, high)
    self.Log(f"Generated time slippage: {slippage_seconds} seconds")
    return timedelta(seconds=slippage_seconds)
def ScheduleOrder(self, execution_time, signal_direction, signal_symbol_str, symbol_obj, signal_id, signal_data, prediction):
    """
    Schedule an order for future execution with time slippage.

    The order description is stored in self.pending_orders until
    ProcessPendingOrders finds it due.

    Parameters:
        execution_time: Datetime at which the order becomes due
        signal_direction: 1 for long, -1 for short
        signal_symbol_str: Contract key of the signal's contract
        symbol_obj: Symbol object to trade
        signal_id: Identifier of the originating signal
        signal_data: Minute bar row; its VWAP and tick counts go into the order tag
        prediction: Raw model prediction value
    """
    order_details = {
        'signal_direction': signal_direction,
        'signal_symbol_str': signal_symbol_str,
        'symbol_obj': symbol_obj,
        'signal_id': signal_id,
        'original_signal_time': self.Time,
        'vwap': signal_data['VWAP'],
        'ticks': f"Up-{signal_data['Up Ticks']} Dn-{signal_data['Down Ticks']} Sm-{signal_data['Same Ticks']}",
        'prediction': prediction
    }
    # Key on execution time AND signal id: two signals can come due at the
    # same instant, and a time-only key would silently drop the earlier one.
    execution_key = f"{execution_time.strftime('%Y-%m-%d %H:%M:%S.%f')}|{signal_id}"
    self.pending_orders[execution_key] = {
        'execution_time': execution_time,
        'order_details': order_details
    }
    self.Log(f"Pending order added: {signal_id} scheduled for {execution_time}")
def ProcessPendingOrders(self):
    """
    Process any pending orders that are due for execution.

    Orders whose execution time is at or before self.Time are executed in
    ascending execution-time order and then removed from the queue.
    """
    if not self.pending_orders:
        return

    current_time = self.Time
    # Collect first: the dict must not change size while it is iterated
    due_orders = [
        (execution_key, pending_order)
        for execution_key, pending_order in self.pending_orders.items()
        if current_time >= pending_order['execution_time']
    ]
    # Dict order is insertion order; replay the orders chronologically
    due_orders.sort(key=lambda item: item[1]['execution_time'])

    for execution_key, pending_order in due_orders:
        self.ExecuteScheduledOrder(pending_order['order_details'])
        del self.pending_orders[execution_key]
def ExecuteScheduledOrder(self, order_details):
    """
    Execute a scheduled order that has reached its execution time.

    Skipped outside the trading window. When the requested direction or
    contract differs from the tracked position, the existing position is
    closed and a market order for the new direction is sent.

    Parameters:
        order_details: Dict with 'signal_direction', 'signal_symbol_str',
            'symbol_obj', 'signal_id', 'original_signal_time', 'vwap',
            'ticks' and 'prediction'
    """
    # Check trading window before executing scheduled orders too
    if not self.InTradingWindow():
        self.Log("Skipping scheduled order execution - outside trading window")
        return

    signal_direction = order_details['signal_direction']
    signal_symbol_str = order_details['signal_symbol_str']
    symbol_obj = order_details['symbol_obj']
    signal_id = order_details['signal_id']

    self.Log(f"Executing order: {signal_id}")
    if self.add_time_slippage:
        actual_execution_delay = self.Time - order_details['original_signal_time']
        self.Log(f"Execution delay: {actual_execution_delay.total_seconds()} seconds")

    # Nothing to do when the tracked position already matches the signal
    already_positioned = (self.current_position is not None and
                          self.current_position == signal_direction and
                          self.current_symbol_str == signal_symbol_str)
    if already_positioned:
        return

    # Close existing position if any
    if self.current_position is not None:
        self.ClosePosition()
        self.Log(f"Closed position: {self.current_symbol_str}, Direction: {self.current_position}")
        # Mark the state flat now: the sizing checks below may return early,
        # and stale state would make later signals look already filled.
        self.current_position = None
        self.current_symbol_str = None
        self.current_symbol_obj = None

    # Calculate quantity to trade
    if self.fixed_contract_size > 0:
        # A fixed size must fit within 100% of the account
        max_affordable = self.CalculateFutureOrderQuantity(symbol_obj, 1.0)
        if abs(max_affordable) < self.fixed_contract_size:
            self.Log(f"Cannot afford fixed size {self.fixed_contract_size} - max: {abs(max_affordable)}")
            return
        quantity = self.fixed_contract_size
    else:
        quantity = self.CalculateFutureOrderQuantity(symbol_obj, self.pct_acct_to_trade)

    if quantity == 0:
        self.Log(f"Cannot afford position for {signal_symbol_str}")
        return

    # Order tag carrying the model data behind this trade
    tag = f"VWAP:{order_details['vwap']:.4f}|{order_details['ticks']}|Pred:{order_details['prediction']}"

    # Open new position
    if signal_direction == 1:
        self.MarketOrder(symbol_obj, abs(quantity), tag=tag)  # Long
        self.current_position = 1
        self.Log(f"Opened LONG: {signal_symbol_str} quantity {abs(quantity)}")
    else:
        self.MarketOrder(symbol_obj, -abs(quantity), tag=tag)  # Short
        self.current_position = -1
        self.Log(f"Opened SHORT: {signal_symbol_str} quantity {abs(quantity)}")

    # Update current symbol tracking
    self.current_symbol_str = signal_symbol_str
    self.current_symbol_obj = symbol_obj
def CalculateFutureOrderQuantity(self, symbol, pct):
    """
    Size a futures position as a share of total portfolio value.

    The allocation (portfolio value * pct) is divided by the value of one
    contract (price * contract multiplier) and truncated to an integer.

    Parameters:
        symbol: Contract to size
        pct: Fraction of the portfolio value to allocate
    Returns:
        Number of contracts; 0 for an unknown symbol or a non-positive
        price or multiplier
    """
    securities = self.Securities
    if not securities.ContainsKey(symbol):
        return 0

    security = securities[symbol]
    multiplier = security.SymbolProperties.ContractMultiplier
    price = security.Price
    # Without a usable price or multiplier the contract value is undefined
    if price <= 0 or multiplier <= 0:
        return 0

    budget = self.Portfolio.TotalPortfolioValue * pct
    contract_value = price * multiplier
    return int(budget / contract_value)
def ClosePosition(self):
    """
    Close any existing positions for the current symbol.

    Does nothing when no symbol is tracked or when the portfolio holds no
    position in the tracked symbol.
    """
    if self.current_symbol_obj is None:
        return
    # Check the tracked symbol's own holding, not the portfolio-wide flag,
    # so the liquidation and its log only happen for a real position.
    if self.Portfolio[self.current_symbol_obj].Invested:
        self.Liquidate(self.current_symbol_obj)
        self.Log(f"Liquidated position for {self.current_symbol_str}")
def GetContractKey(self, symbol):
    """
    Build a short contract key such as "TYZ21" from a futures symbol.

    The key is the underlying ticker (with "ZN" rewritten to "TY"), the
    month letter of the symbol's date and the last two digits of its year.

    Parameters:
        symbol: Symbol whose ID.Symbol and ID.Date are read
    Returns:
        String: the contract key
    """
    identifier = symbol.ID
    root = identifier.Symbol
    expiry = identifier.Date

    # Month number -> futures month letter (1 = F ... 12 = Z)
    letter_by_month = dict(zip(range(1, 13), "FGHJKMNQUVXZ"))
    month_letter = letter_by_month.get(expiry.month, '')

    # Convert ZN to TY for consistency
    root = "TY" if root == "ZN" else root
    two_digit_year = str(expiry.year)[-2:]
    return f"{root}{month_letter}{two_digit_year}"
def OnEndOfAlgorithm(self):
    """
    Log a closing summary: final portfolio value, the open position (if the
    portfolio is invested) and every pending order that never executed.
    """
    log = self.Log
    log("Production Algorithm completed")
    log(f"Final portfolio value: {self.Portfolio.TotalPortfolioValue}")

    if self.Portfolio.Invested:
        log(f"Final position: {self.current_position} in {self.current_symbol_str}")

    if not self.pending_orders:
        return
    log(f"Unexecuted pending orders: {len(self.pending_orders)}")
    for pending in self.pending_orders.values():
        details = pending['order_details']
        log(f" Unexecuted: {details['signal_id']} at {pending['execution_time']}")