| Overall Statistics | Value |
|---|---|
| Total Orders | 319 |
| Average Win | 0.03% |
| Average Loss | -0.03% |
| Compounding Annual Return | -99.908% |
| Drawdown | 3.900% |
| Expectancy | -0.717 |
| Start Equity | 100000 |
| End Equity | 96162.72 |
| Net Profit | -3.837% |
| Sharpe Ratio | -4.028 |
| Sortino Ratio | 0 |
| Probabilistic Sharpe Ratio | 0% |
| Loss Rate | 86% |
| Win Rate | 14% |
| Profit-Loss Ratio | 1.04 |
| Alpha | 0 |
| Beta | 0 |
| Annual Standard Deviation | 0.23 |
| Annual Variance | 0.053 |
| Information Ratio | -4.018 |
| Tracking Error | 0.23 |
| Treynor Ratio | 0 |
| Total Fees | $787.93 |
| Estimated Strategy Capacity | $1300000000.00 |
| Lowest Capacity Asset | ZN XUIP59QUPVS5 |
| Portfolio Turnover | 14406.36% |
# AMERAFRIC CAPITAL
# -------------------------------------------
# Futures Signal Trader - Executes trades based on pre-generated signals from a CSV file
#
# Implementation follows the Technical Specification for QuantConnect Futures Signal Trader
#
# The algorithm:
# 1. Loads trading signals from a CSV file stored in ObjectStore
# 2. Executes trades based on these signals at the appropriate times
# 3. Always maintains a market position (long or short)
# 4. Trades the specific futures contracts as specified in the signals
import clr
from System import *
from System.Collections.Generic import List
from QuantConnect import *
from QuantConnect.Algorithm import *
from AlgorithmImports import *
from QuantConnect.Data import *
import pandas as pd
from datetime import datetime, timedelta
import csv
import io
import json
class FuturesSignalTrader(QCAlgorithm):
    """
    Trade 10-Year Treasury Note futures from pre-generated signals.

    Signals are read once from a CSV file in the ObjectStore. A scheduled
    check runs every minute, looks for a signal whose timestamp lies close
    to the current algorithm time and, when one is found, switches to the
    contract and direction (1 = long, -1 = short) named by that signal.
    """

    # ObjectStore key under which the signals CSV is stored.
    SIGNALS_OBJECT_STORE_KEY = "US_Treasury_10Y_Futures_subset.csv"

    # Columns the signals CSV must contain.
    REQUIRED_COLUMNS = ('Date and Time', 'Symbol', 'pred_ens')

    # Futures month letter codes, in both directions.
    MONTH_TO_CODE = {1: 'F', 2: 'G', 3: 'H', 4: 'J', 5: 'K', 6: 'M',
                     7: 'N', 8: 'Q', 9: 'U', 10: 'V', 11: 'X', 12: 'Z'}
    CODE_TO_MONTH = {'F': 1, 'G': 2, 'H': 3, 'J': 4, 'K': 5, 'M': 6,
                     'N': 7, 'Q': 8, 'U': 9, 'V': 10, 'X': 11, 'Z': 12}

    def Initialize(self):
        """
        Initialize algorithm parameters, load CSV data and set up the initial state.
        """
        # Backtest window and starting capital.
        self.SetStartDate(2021, 9, 27)
        self.SetEndDate(2021, 9, 28)
        self.SetCash(100000)
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage)

        # State tracking.
        self.current_position = None      # 1 = long, -1 = short, None = no position yet
        self.current_symbol_str = None    # CSV-style symbol currently traded (e.g. "TYZ21")
        self.current_symbol_obj = None    # Matching QuantConnect Symbol object
        self.processed_signals = set()    # Ids of signals that were already handled
        self.signals_df = None            # Parsed signals; stays None when loading fails
        self.signal_time_window = timedelta(seconds=30)  # Tolerance when matching signal times

        # CSV-style contract key -> QuantConnect Symbol, filled in by OnData.
        self.discovered_contracts = {}

        # Subscribe to the 10-Year Treasury Note future with a wide expiry
        # filter so contracts across the timeframe show up in the chain.
        self.ty_future = self.AddFuture(Futures.Financials.Y_10_TREASURY_NOTE, Resolution.Minute, extendedMarketHours=True)
        self.ty_future.SetFilter(0, 360)

        self.LoadSignalsFromObjectStore()

        self.Log("Algorithm initialized successfully")
        # signals_df is None when loading failed; report zero instead of
        # crashing on len(None).
        signal_count = 0 if self.signals_df is None else len(self.signals_df)
        self.Log(f"Loaded {signal_count} signals from CSV")

        # Check for signals every minute.
        self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.Every(timedelta(minutes=1)), self.CheckForSignals)

    def LoadSignalsFromObjectStore(self):
        """
        Load and parse the signals CSV file from ObjectStore.

        On success ``self.signals_df`` holds the signals sorted by timestamp.
        On any failure (missing/empty file, missing columns, parse error) the
        problem is logged and ``self.signals_df`` is left as None.
        """
        self.signals_df = None
        try:
            csv_string = self.ObjectStore.Read(self.SIGNALS_OBJECT_STORE_KEY)
            if not csv_string:
                self.Log("ERROR: CSV file not found in ObjectStore or is empty")
                return

            frame = pd.read_csv(io.StringIO(csv_string), parse_dates=['Date and Time'])

            missing_columns = [col for col in self.REQUIRED_COLUMNS if col not in frame.columns]
            if missing_columns:
                self.Log(f"ERROR: CSV missing required columns: {missing_columns}")
                return

            frame = frame.sort_values('Date and Time')
        except Exception as e:
            self.Log(f"ERROR loading signals from ObjectStore: {str(e)}")
            return

        # Publish the frame only once it is fully validated, so the rest of
        # the algorithm never sees a half-usable table.
        self.signals_df = frame
        self.Log(f"Successfully loaded {len(frame)} signals from CSV")

        # Log a few sample signals for verification.
        for _, row in frame.head(3).iterrows():
            self.Log(f"Sample signal: Time={row['Date and Time']}, Symbol={row['Symbol']}, Direction={row['pred_ens']}")

    def OnData(self, data):
        """
        OnData event handler, used to discover futures contracts from the chains.

        Every contract seen in a futures chain is recorded in
        ``self.discovered_contracts`` under its CSV-style key.
        """
        for _, chain in data.FutureChains.items():
            for contract in chain:
                contract_key = self.GetContractKey(contract.Symbol)
                if contract_key not in self.discovered_contracts:
                    self.discovered_contracts[contract_key] = contract.Symbol
                    self.Log(f"Discovered contract: {contract_key} -> {contract.Symbol}")

    def GetContractKey(self, symbol):
        """
        Create a standardized key for a futures contract symbol that matches the format in our CSV.
        E.g., "TYZ21" for December 2021 10-Year Treasury Note futures.

        Parameters:
            symbol: The QuantConnect Symbol object
        Returns:
            A string key in the format used in the signals CSV
        """
        underlying = symbol.ID.Symbol
        expiry = symbol.ID.Date

        month_code = self.MONTH_TO_CODE.get(expiry.month, '')

        # QC names the 10-Year T-Note future "ZN"; the CSV is keyed on "TY".
        if underlying == "ZN":
            underlying = "TY"

        # Product + month code + 2-digit year, e.g. TYZ21.
        year_str = str(expiry.year)[-2:]
        return f"{underlying}{month_code}{year_str}"

    def CheckForSignals(self):
        """
        Check if there are any signals that match the current algorithm time.
        This is scheduled to run every minute.
        """
        if self.signals_df is None or len(self.signals_df) == 0:
            return

        signal = self.FindSignalForTime(self.Time)
        if signal is None:
            return

        # Unique identifier used to avoid executing the same signal twice.
        signal_id = f"{signal['Date and Time']}_{signal['Symbol']}"
        if signal_id in self.processed_signals:
            self.Log(f"Signal already processed: {signal_id}")
            return

        signal_symbol_str = signal['Symbol']

        # Only 1 (long) and -1 (short) are valid directions. Anything else
        # (0, NaN, text) is skipped rather than silently traded as a short.
        try:
            signal_direction = int(signal['pred_ens'])
        except (TypeError, ValueError):
            signal_direction = None
        if signal_direction not in (1, -1):
            self.processed_signals.add(signal_id)
            self.Log(f"ERROR: Invalid signal direction {signal['pred_ens']!r} for {signal_id}; expected 1 or -1")
            return

        self.Log(f"Processing new signal: Time={signal['Date and Time']}, Symbol={signal_symbol_str}, Direction={signal_direction}")

        symbol_obj = self.GetFuturesSymbol(signal_symbol_str)
        if symbol_obj is None:
            # Not marked as processed: the contract may be discovered by the
            # next check while the signal is still inside the time window.
            self.Log(f"ERROR: Could not resolve symbol: {signal_symbol_str}. Known contracts: {list(self.discovered_contracts.keys())}")
            return

        # From here on the signal is acted upon, so never handle it again.
        self.processed_signals.add(signal_id)

        position_change_needed = (
            self.current_position is None
            or self.current_position != signal_direction
            or self.current_symbol_str != signal_symbol_str
        )
        if not position_change_needed:
            self.Log(f"Maintaining current position: Symbol={self.current_symbol_str}, Direction={self.current_position}")
            return

        # Close the existing position, if any, before opening the new one.
        if self.current_position is not None:
            self.ClosePosition()
            self.Log(f"Closed position: {self.current_symbol_str}, Direction: {self.current_position}")

        # The direction doubles as the order quantity: +1 buys, -1 sells one contract.
        self.MarketOrder(symbol_obj, signal_direction)
        self.current_position = signal_direction
        side = "LONG" if signal_direction == 1 else "SHORT"
        self.Log(f"Opened {side} position: {signal_symbol_str} ({symbol_obj})")

        self.current_symbol_str = signal_symbol_str
        self.current_symbol_obj = symbol_obj
        self.Log(f"Position changed: Symbol={signal_symbol_str}, Direction={signal_direction}")

    @staticmethod
    def _to_timestamp(moment):
        """Convert a datetime to a pandas Timestamp truncated to whole seconds."""
        return pd.Timestamp(moment.year, moment.month, moment.day,
                            moment.hour, moment.minute, moment.second)

    def FindSignalForTime(self, target_time):
        """
        Find a signal that matches the current algorithm time within the acceptable time window.

        Parameters:
            target_time: The current algorithm time to match against
        Returns:
            The earliest signal row within +/- ``self.signal_time_window`` of
            target_time (bounds inclusive) as a dict, or None if no match found
        """
        if self.signals_df is None:
            return None

        window_start = self._to_timestamp(target_time - self.signal_time_window)
        window_end = self._to_timestamp(target_time + self.signal_time_window)

        timestamps = self.signals_df['Date and Time']
        matching_signals = self.signals_df[(timestamps >= window_start) & (timestamps <= window_end)]
        if len(matching_signals) == 0:
            return None

        # The frame is sorted by timestamp, so the first row is the earliest match.
        return matching_signals.iloc[0].to_dict()

    def GetFuturesSymbol(self, symbol_str):
        """
        Get the QuantConnect Symbol object for a futures contract based on the symbol string from the CSV.
        Uses the discovered contracts from OnData to find the matching QuantConnect contract.

        Parameters:
            symbol_str: The futures symbol string from CSV (e.g., "TYZ21")
        Returns:
            The corresponding QuantConnect Symbol object, or None if not found
        """
        # Fast path: the contract was already discovered (or cached) under this key.
        if symbol_str in self.discovered_contracts:
            return self.discovered_contracts[symbol_str]

        # Otherwise parse the string and look for a discovered contract that
        # expires in the same month and year.
        try:
            if not (len(symbol_str) >= 4 and symbol_str.startswith("TY")):
                self.Log(f"Unsupported symbol format: {symbol_str}")
                return None

            # Example: TYZ21 -> month code "Z" (December), year "21".
            month = self.CODE_TO_MONTH.get(symbol_str[2].upper())
            year_str = symbol_str[3:]
            # Two-digit years are assumed to be 20xx.
            year = 2000 + int(year_str) if len(year_str) == 2 else int(year_str)

            # Iterate over a snapshot because a match adds a new key to the dict.
            for contract_key, symbol in list(self.discovered_contracts.items()):
                contract_date = symbol.ID.Date
                if contract_date.year == year and contract_date.month == month:
                    self.Log(f"Found similar contract for {symbol_str}: {contract_key} -> {symbol}")
                    # Cache this mapping for future lookups.
                    self.discovered_contracts[symbol_str] = symbol
                    return symbol

            self.Log(f"No matching contract found for {symbol_str} with month={month}, year={year}")
            return None
        except Exception as e:
            self.Log(f"ERROR parsing symbol {symbol_str}: {str(e)}")
            return None

    def ClosePosition(self):
        """
        Close any existing positions for the current symbol.
        """
        if self.current_symbol_obj is None:
            return
        # Check the holding of the tracked contract itself rather than the
        # portfolio-wide flag, so only a real position in it is liquidated.
        if self.Portfolio[self.current_symbol_obj].Invested:
            self.Liquidate(self.current_symbol_obj)
            self.Log(f"Liquidated position for {self.current_symbol_str}")