| Overall Statistics |
|
Total Orders 42423 Average Win 0.03% Average Loss -0.01% Compounding Annual Return 635.112% Drawdown 2.300% Expectancy 0.901 Start Equity 1000000 End Equity 7433611.28 Net Profit 643.361% Sharpe Ratio 26.164 Sortino Ratio 145.519 Probabilistic Sharpe Ratio 100% Loss Rate 51% Win Rate 49% Profit-Loss Ratio 2.88 Alpha 0 Beta 0 Annual Standard Deviation 0.114 Annual Variance 0.013 Information Ratio 26.247 Tracking Error 0.114 Treynor Ratio 0 Total Fees $1451337.42 Estimated Strategy Capacity $35000000000.00 Lowest Capacity Asset ZN Y4H3VAWAKI79 Portfolio Turnover 7560.48% |
# AMERAFRIC CAPITAL
# -------------------------------------------
# Futures Signal Trader - Executes trades based on pre-generated signals from a CSV file
# WITH TIME SLIPPAGE MODELING
#
# Implementation follows the Technical Specification for QuantConnect Futures Signal Trader
# Added: Random time slippage between 30 seconds and 2 minutes to simulate realistic execution delays
#
# The algorithm:
# 1. Loads trading signals from a CSV file stored in ObjectStore
# 2. Adds random time slippage (30s-2min) before executing trades
# 3. Executes trades based on these signals at the appropriate times (plus slippage)
# 4. Always maintains a market position (long or short)
# 5. Trades the specific futures contracts as specified in the signals
import clr
from System import *
from System.Collections.Generic import List
from QuantConnect import *
from QuantConnect.Algorithm import *
from AlgorithmImports import *
from QuantConnect.Data import *
import pandas as pd
from datetime import datetime, timedelta
import csv
import io
import json
import random
class FuturesSignalTrader(QCAlgorithm):
def Initialize(self):
"""
Initialize algorithm parameters, load CSV data and set up the initial state.
"""
# Algorithm settings
self.SetStartDate(2021, 9, 27) # Set to accommodate the sample data timeframe
self.SetEndDate(2022, 9, 28) # Set a reasonable end date
# self.SetEndDate(2024, 5, 15) # Set a reasonable end date
self.SetCash(1000000) # Set initial capital
# Set brokerage model to Interactive Brokers
self.SetBrokerageModel(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE)
# Initialize state tracking variables
self.current_position = None # Current position state (1=long, -1=short, None=no position)
self.current_symbol_str = None # Current symbol being traded as string (e.g., "TYU12")
self.current_symbol_obj = None # Current QuantConnect Symbol object
self.processed_signals = set() # Track processed signals to avoid duplicates
self.signals_df = None # DataFrame to hold parsed signals
self.signal_time_window = timedelta(seconds=30) # Time window for matching signals
# Dictionary to store discovered contract symbols
self.discovered_contracts = {}
# TIME SLIPPAGE MODELING
# -----------------------------------
# TIME SLIPPAGE TOGGLE - Set to False to disable time slippage
self.add_time_slippage = False # Set to False for immediate execution
# Dictionary to store pending orders with their scheduled execution times
self.pending_orders = {} # Format: {execution_time: {signal_data, symbol_obj, etc.}}
# Time slippage parameters (30 seconds to 2 minutes)
self.min_slippage_seconds = int(self.get_parameter("min_slip_ss", 30))
self.max_slippage_seconds = self.min_slippage_seconds + 1 # modify this for slippage range
# Random seed for reproducible results (optional - remove for true randomness)
random.seed(42) # Remove this line if you want different randomness each run
# POSITION SIZING PARAMETERS
# -----------------------------------
# Percentage of account to trade (default 50% = 0.5)
self.pct_acct_to_trade = float(self.get_parameter("pct_acct_to_trade", 0.7))
# Fixed contract size (default 0 = use percentage)
self.fixed_contract_size = int(self.get_parameter("fixed_contract_size", 0))
# Add the canonical futures symbols that we might need to trade
# For 10-Year Treasury Note futures (TY)
self.ty_future = self.AddFuture(Futures.Financials.Y_10_TREASURY_NOTE, Resolution.MINUTE, extendedMarketHours=True)
self.ty_future.SetFilter(0, 360) # Wide filter to discover contracts across the timeframe
# Load signals from CSV in ObjectStore
self.signal_time_column = 'Signal Timestamp'
self.LoadSignalsFromObjectStore()
# Log initialization completed
self.Log("Algorithm initialized successfully with time slippage modeling")
if self.add_time_slippage:
self.Log(f"Time slippage ENABLED: {self.min_slippage_seconds}-{self.max_slippage_seconds} seconds")
else:
self.Log("Time slippage DISABLED: Orders execute immediately")
self.Log(f"Position sizing: pct_acct_to_trade={self.pct_acct_to_trade}, fixed_contract_size={self.fixed_contract_size}")
self.Log(f"Loaded {len(self.signals_df)} signals from CSV")
# Schedule function to check for signals every minute
self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.Every(timedelta(minutes=1)), self.CheckForSignals)
# Schedule function to process pending orders every 10 seconds for more precise timing
self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.Every(timedelta(seconds=10)), self.ProcessPendingOrders)
def LoadSignalsFromObjectStore(self):
"""
Load and parse the signals CSV file from ObjectStore.
"""
try:
# Attempt to load the CSV file from ObjectStore
csv_string = self.ObjectStore.Read("Signal_US_Treasury_10Y_Futures_subset.csv")
if csv_string is None or csv_string == "":
self.Log("ERROR: CSV file not found in ObjectStore or is empty")
return
# Parse CSV data into a DataFrame
csv_io = io.StringIO(csv_string)
self.signals_df = pd.read_csv(csv_io, parse_dates=[self.signal_time_column])
# Validate the required columns exist
required_columns = [self.signal_time_column, 'Symbol', 'pred_ens']
missing_columns = [col for col in required_columns if col not in self.signals_df.columns]
if missing_columns:
self.Log(f"ERROR: CSV missing required columns: {missing_columns}")
return
# Sort signals by timestamp
self.signals_df = self.signals_df.sort_values(self.signal_time_column)
self.Log(f"Successfully loaded {len(self.signals_df)} signals from CSV")
# Log a few sample signals for verification
for idx, row in self.signals_df.head(3).iterrows():
self.Log(f"Sample signal: Time={row[self.signal_time_column]}, Symbol={row['Symbol']}, Direction={row['pred_ens']}")
except Exception as e:
self.Log(f"ERROR loading signals from ObjectStore: {str(e)}")
def OnData(self, data):
"""
OnData event handler. This is the primary event for algorithm data processing.
We use this to discover futures contracts from the chains.
"""
# Look for futures chains to discover specific contracts
if data.FutureChains.Count > 0:
# Process each futures chain to discover contracts
for chain_symbol, chain in data.FutureChains.items():
# Check each contract in the chain
for contract in chain:
# Extract the contract's underlying symbol, month, and year for identification
contract_key = self.GetContractKey(contract.Symbol)
if contract_key not in self.discovered_contracts:
self.discovered_contracts[contract_key] = contract.Symbol
self.Log(f"Discovered contract: {contract_key} -> {contract.Symbol}")
def GetContractKey(self, symbol):
"""
Create a standardized key for a futures contract symbol that matches the format in our CSV.
E.g., "TYZ21" for December 2021 10-Year Treasury Note futures.
Parameters:
symbol: The QuantConnect Symbol object
Returns:
A string key in the format used in the signals CSV
"""
# Extract underlying, year, and month from the Symbol
underlying = symbol.ID.Symbol
expiry = symbol.ID.Date
# Convert month to letter code (Z for December, etc.)
month_codes = {1: 'F', 2: 'G', 3: 'H', 4: 'J', 5: 'K', 6: 'M',
7: 'N', 8: 'Q', 9: 'U', 10: 'V', 11: 'X', 12: 'Z'}
month_code = month_codes.get(expiry.month, '')
# For 10-Year T-Note futures, QC uses "ZN" but our CSV might use "TY"
if underlying == "ZN":
underlying = "TY"
# Format as TYZ21 (product + month code + 2-digit year)
year_str = str(expiry.year)[-2:] # Last 2 digits of year
return f"{underlying}{month_code}{year_str}"
def GenerateTimeSlippage(self):
"""
Generate a random time slippage between min_slippage_seconds and max_slippage_seconds.
Returns:
A timedelta object representing the random delay
"""
# Generate random number of seconds within the specified range
slippage_seconds = random.randint(self.min_slippage_seconds, self.max_slippage_seconds)
self.Log(f"Generated time slippage: {slippage_seconds} seconds")
return timedelta(seconds=slippage_seconds)
def CheckForSignals(self):
"""
Check if there are any signals that match the current algorithm time.
Instead of executing immediately, schedule the order with time slippage.
This is scheduled to run every minute.
"""
if self.signals_df is None or len(self.signals_df) == 0:
return
current_time = self.Time
# Find signal for current time
signal = self.FindSignalForTime(current_time)
if signal is None:
return
# Create a unique identifier for this signal
signal_id = f"{signal[self.signal_time_column]}_{signal['Symbol']}"
# Check if we've already processed this signal
if signal_id in self.processed_signals:
self.Log(f"Signal already processed: {signal_id}")
return
# Mark as processed to avoid duplicate processing
self.processed_signals.add(signal_id)
# Get signal direction and symbol
signal_direction = int(signal['pred_ens']) # 1 for long, -1 for short
signal_symbol_str = signal['Symbol']
self.Log(f"New signal detected: Time={signal[self.signal_time_column]}, Symbol={signal_symbol_str}, Direction={signal_direction}")
# Get the QuantConnect Symbol object for this contract
symbol_obj = self.GetFuturesSymbol(signal_symbol_str)
if symbol_obj is None:
self.Log(f"ERROR: Could not resolve symbol: {signal_symbol_str}. Known contracts: {list(self.discovered_contracts.keys())}")
return
if self.add_time_slippage:
# Generate random time slippage and schedule order
time_slippage = self.GenerateTimeSlippage()
execution_time = current_time + time_slippage
# Schedule the order for future execution
self.ScheduleOrder(execution_time, signal_direction, signal_symbol_str, symbol_obj, signal_id)
self.Log(f"Order scheduled for execution at {execution_time} (slippage: {time_slippage.total_seconds()} seconds)")
else:
# Execute immediately without time slippage
order_details = {
'signal_direction': signal_direction,
'signal_symbol_str': signal_symbol_str,
'symbol_obj': symbol_obj,
'signal_id': signal_id,
'original_signal_time': current_time
}
self.ExecuteScheduledOrder(order_details)
self.Log(f"Order executed immediately (no time slippage)")
def ScheduleOrder(self, execution_time, signal_direction, signal_symbol_str, symbol_obj, signal_id):
"""
Schedule an order for future execution with time slippage.
Parameters:
execution_time: When the order should be executed
signal_direction: 1 for long, -1 for short
signal_symbol_str: String representation of the symbol
symbol_obj: QuantConnect Symbol object
signal_id: Unique identifier for the signal
"""
# Store the pending order details
order_details = {
'signal_direction': signal_direction,
'signal_symbol_str': signal_symbol_str,
'symbol_obj': symbol_obj,
'signal_id': signal_id,
'original_signal_time': self.Time
}
# Use execution time as key (convert to string for dictionary key)
execution_key = execution_time.strftime("%Y-%m-%d %H:%M:%S.%f")
self.pending_orders[execution_key] = {
'execution_time': execution_time,
'order_details': order_details
}
self.Log(f"Pending order added: {signal_id} scheduled for {execution_time}")
def ProcessPendingOrders(self):
"""
Process any pending orders that are due for execution.
This is scheduled to run every 10 seconds for more precise timing.
"""
if not self.pending_orders:
return
current_time = self.Time
orders_to_execute = []
# Find orders that are due for execution
for execution_key, pending_order in self.pending_orders.items():
execution_time = pending_order['execution_time']
if current_time >= execution_time:
orders_to_execute.append((execution_key, pending_order))
# Execute the due orders
for execution_key, pending_order in orders_to_execute:
self.ExecuteScheduledOrder(pending_order['order_details'])
# Remove the executed order from pending orders
del self.pending_orders[execution_key]
def ExecuteScheduledOrder(self, order_details):
"""
Execute a scheduled order that has reached its execution time.
Parameters:
order_details: Dictionary containing the order details
"""
signal_direction = order_details['signal_direction']
signal_symbol_str = order_details['signal_symbol_str']
symbol_obj = order_details['symbol_obj']
signal_id = order_details['signal_id']
original_signal_time = order_details['original_signal_time']
actual_execution_delay = self.Time - original_signal_time
self.Log(f"Executing delayed order: {signal_id}")
self.Log(f"Actual execution delay: {actual_execution_delay.total_seconds()} seconds")
# Check if we need to change position
if self.current_position is None or self.current_position != signal_direction or self.current_symbol_str != signal_symbol_str:
# Close existing position if any
if self.current_position is not None:
self.ClosePosition()
self.Log(f"Closed position: {self.current_symbol_str}, Direction: {self.current_position}")
# Calculate quantity to trade
if self.fixed_contract_size > 0:
# Check if we can afford the fixed contract size
max_affordable = self.CalculateFutureOrderQuantity(symbol_obj, 1.0) # Max we can afford
if abs(max_affordable) < self.fixed_contract_size:
self.Log(f"Cannot afford fixed contract size {self.fixed_contract_size} for {signal_symbol_str} - max affordable: {abs(max_affordable)}")
return
quantity = self.fixed_contract_size
else:
quantity = self.CalculateFutureOrderQuantity(symbol_obj, (self.pct_acct_to_trade))
# Check if we can afford the position
if quantity == 0:
self.Log(f"Cannot afford position for {signal_symbol_str} - insufficient account value")
return
# Open new position
if signal_direction == 1:
self.MarketOrder(symbol_obj, abs(quantity)) # Long
self.current_position = 1
self.Log(f"Opened LONG position: {signal_symbol_str} ({symbol_obj}) quantity {abs(quantity)} with {actual_execution_delay.total_seconds()}s delay")
else:
self.MarketOrder(symbol_obj, -abs(quantity)) # Short
self.current_position = -1
self.Log(f"Opened SHORT position: {signal_symbol_str} ({symbol_obj}) quantity {abs(quantity)} with {actual_execution_delay.total_seconds()}s delay")
# Update current symbol tracking
self.current_symbol_str = signal_symbol_str
self.current_symbol_obj = symbol_obj
# Log the action taken
self.Log(f"Position changed with slippage: Symbol={signal_symbol_str}, Direction={signal_direction}, Quantity={abs(quantity)}, Delay={actual_execution_delay.total_seconds()}s")
else:
# No position change needed
self.Log(f"Maintaining current position (delayed signal): Symbol={self.current_symbol_str}, Direction={self.current_position}")
def CalculateFutureOrderQuantity(self, symbol, pct):
"""
Calculate the number of futures contracts to trade based on portfolio percentage,
accounting for the contract multiplier.
Parameters:
symbol: The futures contract symbol
pct: Percentage of portfolio to allocate (0.0 to 1.0)
Returns:
Number of contracts to trade (integer)
"""
if not self.Securities.ContainsKey(symbol):
return 0
security = self.Securities[symbol]
contract_multiplier = security.SymbolProperties.ContractMultiplier
current_price = security.Price
if current_price <= 0 or contract_multiplier <= 0:
return 0
portfolio_value = self.Portfolio.TotalPortfolioValue
allocation = portfolio_value * pct
# Calculate number of contracts: allocation / (price * multiplier)
contracts = int(allocation / (current_price * contract_multiplier))
return contracts
def FindSignalForTime(self, target_time):
"""
Find a signal that matches the current algorithm time within the acceptable time window.
Parameters:
target_time: The current algorithm time to match against
Returns:
The matching signal row as a dict, or None if no match found
"""
# Calculate the time range to look for signals
time_range_start = target_time - self.signal_time_window
time_range_end = target_time + self.signal_time_window
# Convert to pandas datetime for comparison
target_time_pd = pd.Timestamp(target_time.year, target_time.month, target_time.day,
target_time.hour, target_time.minute, target_time.second)
time_range_start_pd = pd.Timestamp(time_range_start.year, time_range_start.month, time_range_start.day,
time_range_start.hour, time_range_start.minute, time_range_start.second)
time_range_end_pd = pd.Timestamp(time_range_end.year, time_range_end.month, time_range_end.day,
time_range_end.hour, time_range_end.minute, time_range_end.second)
# Find signals within the time range
matching_signals = self.signals_df[
(self.signals_df[self.signal_time_column] >= time_range_start_pd) &
(self.signals_df[self.signal_time_column] <= time_range_end_pd)
]
if len(matching_signals) == 0:
return None
# Get the first matching signal
return matching_signals.iloc[0].to_dict()
def GetFuturesSymbol(self, symbol_str):
"""
Get the QuantConnect Symbol object for a futures contract based on the symbol string from the CSV.
Uses the discovered contracts from OnData to find the matching QuantConnect contract.
Parameters:
symbol_str: The futures symbol string from CSV (e.g., "TYZ21")
Returns:
The corresponding QuantConnect Symbol object, or None if not found
"""
# Check if we've already discovered this contract
if symbol_str in self.discovered_contracts:
return self.discovered_contracts[symbol_str]
# If not yet discovered, check if we can parse it and find a similar contract
try:
# For TY futures specifically, try to parse the month and year
if len(symbol_str) >= 4 and symbol_str.startswith("TY"):
# Example: TYZ21 -> product="TY", month_code="Z" (December), year="21"
product = symbol_str[:2] # TY
month_code = symbol_str[2] # Z
year_str = symbol_str[3:] # 21
# Convert month code to month number
month_codes_to_num = {'F': 1, 'G': 2, 'H': 3, 'J': 4, 'K': 5, 'M': 6,
'N': 7, 'Q': 8, 'U': 9, 'V': 10, 'X': 11, 'Z': 12}
month = month_codes_to_num.get(month_code.upper())
# Parse year (assuming 20xx for simplicity)
if len(year_str) == 2:
year = 2000 + int(year_str)
else:
year = int(year_str)
# Check if we have any contracts for this product with matching month/year
for contract_key, symbol in self.discovered_contracts.items():
contract_date = symbol.ID.Date
if contract_date.year == year and contract_date.month == month:
self.Log(f"Found similar contract for {symbol_str}: {contract_key} -> {symbol}")
# Cache this mapping for future lookups
self.discovered_contracts[symbol_str] = symbol
return symbol
self.Log(f"No matching contract found for {symbol_str} with month={month}, year={year}")
return None
else:
self.Log(f"Unsupported symbol format: {symbol_str}")
return None
except Exception as e:
self.Log(f"ERROR parsing symbol {symbol_str}: {str(e)}")
return None
def ClosePosition(self):
"""
Close any existing positions for the current symbol.
"""
if self.current_symbol_obj is not None and self.Portfolio.Invested:
self.Liquidate(self.current_symbol_obj)
self.Log(f"Liquidated position for {self.current_symbol_str}")
def OnEndOfAlgorithm(self):
"""
Called at the end of the algorithm. Log any pending orders that weren't executed.
"""
if self.pending_orders:
self.Log(f"Algorithm ended with {len(self.pending_orders)} pending orders:")
for execution_key, pending_order in self.pending_orders.items():
order_details = pending_order['order_details']
self.Log(f" Unexecuted: {order_details['signal_id']} scheduled for {pending_order['execution_time']}")