# Overall Statistics
#
# | Statistic                   | Value           |
# |-----------------------------|-----------------|
# | Total Orders                | 3921            |
# | Average Win                 | 0.16%           |
# | Average Loss                | -0.14%          |
# | Compounding Annual Return   | -72.874%        |
# | Drawdown                    | 73.300%         |
# | Expectancy                  | -0.480          |
# | Start Equity                | 100000          |
# | End Equity                  | 27025.66        |
# | Net Profit                  | -72.974%        |
# | Sharpe Ratio                | -2.738          |
# | Sortino Ratio               | -2.166          |
# | Probabilistic Sharpe Ratio  | 0.000%          |
# | Loss Rate                   | 76%             |
# | Win Rate                    | 24%             |
# | Profit-Loss Ratio           | 1.16            |
# | Alpha                       | 0               |
# | Beta                        | 0               |
# | Annual Standard Deviation   | 0.233           |
# | Annual Variance             | 0.054           |
# | Information Ratio           | -2.502          |
# | Tracking Error              | 0.233           |
# | Treynor Ratio               | 0               |
# | Total Fees                  | $9674.99        |
# | Estimated Strategy Capacity | $6800000.00     |
# | Lowest Capacity Asset       | ZN YOFW7JXIF9D1 |
# | Portfolio Turnover          | 1948.82%        |
# | Drawdown Recovery           | 0               |
# QUANTCONNECT ALGORITHM - Bond Futures ML Trading
# ===============================================
# Minimal algorithm that uses ONNX model to trade bond futures based on ML predictions
import numpy as np
import pandas as pd
import json
from AlgorithmImports import *
import onnxruntime as ort
class BondFuturesMLTrader(QCAlgorithm):
    """Trade 10-year Treasury note futures from an ONNX model's predictions.

    On every data slice the front contract (nearest expiry with non-zero
    volume) is selected, its latest bar is appended to a short rolling
    history, six price features are computed from that history, and the
    model's predicted class is mapped to a long (+1) or short (-1) position.
    """

    # Minimum number of stored bars required before features are computed
    # (the longest look-back used is the 10-bar simple moving average).
    MIN_FEATURE_BARS = 10

    def Initialize(self):
        """Initialize algorithm with ML model and trading parameters."""
        # Algorithm settings
        self.SetStartDate(2024, 1, 1)
        self.SetEndDate(2024, 12, 31)
        self.SetCash(100000)

        # Set brokerage model
        self.SetBrokerageModel(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE)

        # Add Treasury futures; the filter keeps contracts up to 90 days out
        self.ty_future = self.AddFuture(Futures.Financials.Y_10_TREASURY_NOTE, Resolution.MINUTE)
        self.ty_future.SetFilter(0, 90)

        # Trading state
        self.current_contract = None
        self.current_position = None  # 1 = long, -1 = short, None = no position

        # Model variables
        self.onnx_session = None
        self.feature_columns = []

        # Data storage for feature calculation
        self.price_data = {}  # symbol -> dict of parallel lists (time/open/high/low/close)
        self.max_history_length = 20  # Keep last 20 bars for moving averages

        # Position sizing
        self.position_size_pct = 0.5  # Use 50% of portfolio per trade

        # Load ML model and metadata (failures are logged, not raised)
        self.LoadModel()
        self.Log("Algorithm initialized with ML model")

    def LoadModel(self):
        """Load the ONNX model and metadata from ObjectStore.

        Reads "bond_futures_model.onnx" into ``self.onnx_session`` and
        "model_metadata.json" into ``self.feature_columns``. Any failure is
        logged; if the model itself cannot be loaded ``self.onnx_session``
        stays ``None`` and ``GeneratePrediction`` returns ``None``.
        """
        try:
            # Load ONNX model
            onnx_bytes = bytes(self.ObjectStore.read_bytes("bond_futures_model.onnx"))
            if not onnx_bytes:
                self.Log("ERROR: Could not load ONNX model from ObjectStore")
                return
            self.onnx_session = ort.InferenceSession(onnx_bytes)
            self.Log("ONNX model loaded successfully")

            # Load metadata. NOTE: feature_columns is only logged here;
            # CalculateFeatures builds its vector in a fixed, hard-coded order.
            metadata_str = self.ObjectStore.Read("model_metadata.json")
            if not metadata_str:
                self.Log("ERROR: Could not load model metadata")
                return
            metadata = json.loads(metadata_str)
            self.feature_columns = metadata['feature_columns']
            self.Log(f"Model metadata loaded. Features: {self.feature_columns}")
            self.Log(f"Model accuracy: {metadata.get('model_accuracy', 'N/A')}")
        except Exception as e:
            # Best-effort load: a missing key or corrupt file must not stop the algorithm.
            self.Log(f"ERROR loading model: {str(e)}")

    def OnData(self, data):
        """Main data processing and trading logic.

        For each futures chain in ``data``, picks the nearest-expiry contract
        that reports non-zero volume, records its latest bar, and trades on
        the resulting prediction.
        """
        for chain_symbol, chain in data.FutureChains.items():
            # Front month = closest expiry among contracts with volume
            contracts = [contract for contract in chain if contract.Volume > 0]
            if not contracts:
                continue
            front_contract = min(contracts, key=lambda c: c.Expiry)

            # No new bar for this contract -> features are unchanged, skip inference
            if not self.UpdatePriceData(front_contract):
                continue

            # Generate prediction if we have enough data
            prediction = self.GeneratePrediction(front_contract)
            if prediction is not None:
                self.ExecuteTrading(front_contract, prediction)

    def UpdatePriceData(self, contract):
        """Append the contract's bar from the current slice to its history.

        Returns:
            True when a bar was appended, False when the current slice holds
            no data for the contract's symbol.
        """
        symbol = contract.Symbol

        # self.current_slice is not assigned in this class; presumably it is
        # the latest Slice exposed by QCAlgorithm. Indexing a Slice with a
        # symbol it does not contain raises, so check membership first.
        current = self.current_slice
        if current is None or not current.ContainsKey(symbol):
            return False
        bar = current[symbol]

        # Initialize data storage for new contracts
        history = self.price_data.setdefault(
            symbol,
            {'time': [], 'open': [], 'high': [], 'low': [], 'close': []},
        )

        # Add current bar data
        history['time'].append(bar.Time)
        history['open'].append(float(bar.Open))
        history['high'].append(float(bar.High))
        history['low'].append(float(bar.Low))
        history['close'].append(float(bar.Close))

        # Keep only recent data; all series grow in lockstep, so trim them
        # in place by the same amount instead of rebuilding each list.
        excess = len(history['close']) - self.max_history_length
        if excess > 0:
            for series in history.values():
                del series[:excess]
        return True

    def CalculateFeatures(self, symbol):
        """Calculate ML features from the stored price history of ``symbol``.

        Returns:
            A ``(1, 6)`` float array ordered as price_range, oc_ratio,
            hl_ratio, price_vs_sma5, price_vs_sma10, volatility_5; or ``None``
            when fewer than ``MIN_FEATURE_BARS`` bars are stored, a feature
            is not finite, or the calculation fails.
        """
        data = self.price_data.get(symbol)
        if data is None or len(data['close']) < self.MIN_FEATURE_BARS:
            return None

        try:
            closes = np.array(data['close'])
            current_close = closes[-1]
            current_open = data['open'][-1]
            current_high = data['high'][-1]
            current_low = data['low'][-1]

            # Price-based features
            price_range = (current_high - current_low) / current_close if current_close != 0 else 0
            oc_ratio = current_open / current_close if current_close != 0 else 1
            hl_ratio = current_high / current_low if current_low != 0 else 1

            # Moving averages (at least MIN_FEATURE_BARS closes are available here)
            sma_5 = np.mean(closes[-5:])
            sma_10 = np.mean(closes[-10:])
            price_vs_sma5 = current_close / sma_5 if sma_5 != 0 else 1
            price_vs_sma10 = current_close / sma_10 if sma_10 != 0 else 1

            # Volatility: std of the last 5 one-bar returns
            returns = np.diff(closes[-6:]) / closes[-6:-1]
            volatility_5 = np.std(returns)

            # Create feature array in the same order as training
            features = np.array([
                price_range,
                oc_ratio,
                hl_ratio,
                price_vs_sma5,
                price_vs_sma10,
                volatility_5,
            ]).reshape(1, -1)  # Reshape for ONNX input

            # A zero close inside the return window yields inf/nan rather
            # than an exception; never feed that to the model.
            if not np.isfinite(features).all():
                return None
            return features
        except Exception as e:
            self.Log(f"ERROR calculating features: {str(e)}")
            return None

    def GeneratePrediction(self, contract):
        """Generate ML prediction for trading direction.

        Returns:
            1 (long) when the model's first predicted label equals 1,
            -1 (short) otherwise, or ``None`` when no model is loaded,
            features are unavailable, or inference fails.
        """
        if self.onnx_session is None:
            return None

        features = self.CalculateFeatures(contract.Symbol)
        if features is None:
            return None

        try:
            input_name = self.onnx_session.get_inputs()[0].name
            outputs = self.onnx_session.run(None, {input_name: features.astype(np.float32)})

            # First output is usually the predicted label; flatten so both
            # (1,) and (1, 1) shaped outputs yield a scalar.
            predicted_class = np.asarray(outputs[0]).ravel()[0]

            # Convert to trading signal: 1 = long, -1 = short
            trading_signal = 1 if predicted_class == 1 else -1
            self.Log(f"ML Prediction for {contract.Symbol}: {predicted_class} -> Signal: {trading_signal}")
            return trading_signal
        except Exception as e:
            self.Log(f"ERROR making prediction: {str(e)}")
            return None

    def ExecuteTrading(self, contract, signal):
        """Execute trades based on ML signal.

        Rolls to ``contract`` if it differs from the tracked contract, then
        flips the position when ``signal`` differs from the tracked one.

        Args:
            contract: Futures contract to trade.
            signal: 1 for long, -1 for short.
        """
        symbol = contract.Symbol

        # Update current contract if it changed
        if self.current_contract != symbol:
            previous = self.current_contract
            # Close any existing position in the old contract
            if previous is not None and self.Portfolio[previous].Invested:
                self.Liquidate(previous)
                self.Log(f"Closed position in {previous}")
            self.current_contract = symbol
            # We hold nothing in the new contract yet; without this reset the
            # stale direction would block re-entry until the signal flipped.
            self.current_position = None

        # Nothing to do when already positioned in the signalled direction
        if self.current_position == signal:
            return

        # Close existing position in this contract, if any
        if self.Portfolio[symbol].Invested:
            self.Liquidate(symbol)
            self.Log(f"Liquidated existing position")
        # Flat from here until a new order is placed
        self.current_position = None

        # Calculate position size
        quantity = self.CalculatePositionSize(contract)
        if quantity == 0:
            self.Log("Cannot calculate position size")
            return

        # Open new position
        if signal == 1:
            self.MarketOrder(symbol, abs(quantity))
            self.current_position = 1
            self.Log(f"Opened LONG position: {symbol} quantity {abs(quantity)}")
        elif signal == -1:
            self.MarketOrder(symbol, -abs(quantity))
            self.current_position = -1
            self.Log(f"Opened SHORT position: {symbol} quantity {abs(quantity)}")

        # Log the action
        self.Log(f"Position changed based on ML signal: {signal}")

    def CalculatePositionSize(self, contract):
        """Calculate number of contracts to trade.

        Returns:
            ``position_size_pct`` of total portfolio value divided by the
            contract's notional (price * multiplier), floored to an int with
            a minimum of 1; or 0 when price/multiplier are not positive or
            the lookup fails.
        """
        try:
            security = self.Securities[contract.Symbol]
            contract_multiplier = security.SymbolProperties.ContractMultiplier
            current_price = security.Price
            if current_price <= 0 or contract_multiplier <= 0:
                return 0

            # Calculate position size based on percentage of portfolio
            allocation = self.Portfolio.TotalPortfolioValue * self.position_size_pct

            # Number of contracts = allocation / (price * multiplier)
            contracts = int(allocation / (current_price * contract_multiplier))

            # NOTE: the floor of 1 means a single contract is traded even
            # when its notional exceeds the allocation.
            return max(1, contracts)
        except Exception as e:
            self.Log(f"ERROR calculating position size: {str(e)}")
            return 0

    def OnEndOfAlgorithm(self):
        """Log the final portfolio value and any open position at shutdown."""
        self.Log(f"Algorithm completed. Final portfolio value: {self.Portfolio.TotalPortfolioValue}")
        if self.Portfolio.Invested:
            self.Log(f"Final position: {self.current_position} in {self.current_contract}")