| Overall Statistics |
|
Total Orders 234 Average Win 2.56% Average Loss -1.85% Compounding Annual Return 604.030% Drawdown 14.200% Expectancy 0.516 Net Profit 53.074% Sharpe Ratio 5.825 Sortino Ratio 14.645 Probabilistic Sharpe Ratio 91.843% Loss Rate 36% Win Rate 64% Profit-Loss Ratio 1.38 Alpha 0 Beta 0 Annual Standard Deviation 0.599 Annual Variance 0.359 Information Ratio 5.917 Tracking Error 0.599 Treynor Ratio 0 Total Fees $175.42 Estimated Strategy Capacity $65000000.00 Lowest Capacity Asset QQQ YGMACVCCBZDY|QQQ RIWIV7K5Z9LX Portfolio Turnover 321.23% |
from QuantConnect.Algorithm import QCAlgorithm
from QuantConnect.Data.Custom import *
from QuantConnect.Orders import *
from QuantConnect.Securities.Option import OptionPriceModels
from datetime import timedelta, datetime, date, timedelta
import csv
import pytz
import io
from io import StringIO
import pandas as pd
from QuantConnect import OptionRight
from QuantConnect import Resolution
from QuantConnect import DataNormalizationMode
from QuantConnect.Brokerages import *
from QuantConnect import SecurityType
'''
REFERENCE:
https://www.quantconnect.com/docs/v2/writing-algorithms/trading-and-orders/order-types/combo-leg-limit-orders
https://www.quantconnect.com/docs/v2/writing-algorithms/securities/asset-classes/us-equity/handling-data
'''
class SPXWTradingAlgorithm(QCAlgorithm):
def Initialize(self):
##self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage)
self.SetStartDate(2024, 1, 1)
self.SetEndDate(2024, 3, 20)
self.SetCash(10000)
self.UniverseSettings.Asynchronous = True
# spx = self.AddIndex("SPX").Symbol
# option = self.AddIndexOption(spx, "SPXW") # SPXW is the target non-standard contract
self.resolution = Resolution.Minute # TEMP
self.seconds_delta = 61 # TEMP
self.equity = self.AddEquity('QQQ', self.resolution)
self.equity.SetDataNormalizationMode(DataNormalizationMode.Raw)
self.option = self.AddOption(self.equity.Symbol, self.resolution)
self.option.SetFilter(self.option_chain_filter)
self.symbol = self.option.Symbol
self.assets = []
#self.Schedule.On(self.DateRules.EveryDay("QQQ"), self.TimeRules.BeforeMarketClose("QQQ", 1), self.ClosePositions)
self.assets.append(self.equity.Symbol)
# trigger every 60 secs - during trading hours
# self.Schedule.On(self.DateRules.EveryDay(self.symbol),
# self.TimeRules.Every(timedelta(seconds=self.seconds_delta)),
# self.Trade)
self.is_backtest = True
self.relevant_row = None
self.sheet_url = 'https://docs.google.com/spreadsheets/d/1wwadCU8msu6FEUJt1ANoZS2qMO2MWiheARrdm7zaQlM/export?format=csv'
self.full_sheet = None
self.last_trade_date = None
# self.tz = pytz.timezone('America/New_York')
# Scheduled function to run at 3:59 pm to close any short positions if in-the-money
self.Schedule.On(self.DateRules.EveryDay(self.symbol), \
self.TimeRules.At(15, 59), \
self.close_in_the_money_shorts)
def option_chain_filter(self, option_chain):
return option_chain.IncludeWeeklys()\
.Strikes(-10, 10)\
.Expiration(timedelta(0), timedelta(1))
def retry_with_backoff(self, fn, retries=10, min_backoff=5):
x = 0
while True:
try:
return fn()
except:
if x == retries:
tprint("raise")
raise
sleep = min_backoff * x
tprint(f"sleep: {sleep}")
time.sleep(sleep)
x += 1
def download_sheet_data(self):
csv_string = self.Download(self.sheet_url)
df_sheet = pd.read_csv(StringIO(csv_string), sep=",")
return df_sheet
def fetch_sheet_data_update(self, current_time):
"""Download google sheet data and return row for the requested date"""
# csv_string = self.Download(self.sheet_url)
# df_sheet = pd.read_csv(StringIO(csv_string), sep=",")
if (self.full_sheet is None) or (not self.is_backtest):
self.full_sheet = self.retry_with_backoff(self.download_sheet_data)
self.Debug(f'Downloaded Sheet has {len(self.full_sheet)} rows')
self.full_sheet['trigger_datetime'] = self.full_sheet['Trigger Time'].apply(lambda x: datetime.strptime(x, '%Y-%m-%d-%H:%M:%S')) # , tz=self.tz
prev_cutoff = current_time - timedelta(seconds=self.seconds_delta)
mask = self.full_sheet['trigger_datetime'].between(prev_cutoff, current_time, inclusive='left')
next_trade = self.full_sheet[mask]
if len(next_trade) == 1:
self.Debug(f'Found a trade between {prev_cutoff} and {current_time}')
return next_trade.squeeze().to_dict()
elif len(next_trade) > 1:
self.Debug(f'Multiple trades in sheet between {prev_cutoff} and {current_time}')
return
else:
self.Debug(f'No trades found in sheet')
return
def get_right_for_option_type(self, option_type):
"""Map option type strings to QC `right` [C -> 0; P -> 1]"""
if option_type =='C':
return 0
elif option_type == 'P':
return 1
else:
self.Debug("Invalid option type: " + option_type)
def get_nearest_contract(self, opt_chain, strike_threshold, option_type):
"""Select the contract with the largest strike less than the threshold for the option type """
right = self.get_right_for_option_type(option_type)
chosen_contract = None
for x in opt_chain:
if x.Right == right and x.Strike <= strike_threshold:
if chosen_contract is None or chosen_contract.Strike < strike_threshold:
chosen_contract = x
if chosen_contract is not None:
self.Debug(f"For {strike_threshold=} {option_type=}: using {chosen_contract.Strike}")
else:
self.Debug(f"Could not find any contract for {strike_threshold=} {option_type=}")
return chosen_contract
def compute_mid(self, sec):
return 0.5 * (sec.BidPrice + sec.AskPrice)
def get_side_multiplier(self, side: str) -> int:
if side in ['BUY', 'B']:
return 1
elif side in ['SELL', 'S']:
return -1
else:
self.Debug("Unknown side: " + side)
return
def create_leg(self, opt_chain, trade_details, leg_num) -> float:
assert leg_num in (1, 2) # Only two legs supported right now
contract = self.get_nearest_contract(opt_chain, trade_details[f'Strike {leg_num}'], trade_details[f'Right {leg_num}'])
if contract is None:
return None, None
mult = self.get_side_multiplier(trade_details[f'Action {leg_num}'])
contribution_to_limit = self.compute_mid(contract) * mult
leg = Leg.Create(contract.Symbol, mult) # TODO: allow for different qty across legs
return leg, contribution_to_limit
def GenerateTrade(self, slice):
if self.IsWarmingUp:
return
self.Debug(f'Triggered at {self.Time}')
optionchain = slice.OptionChains.get(self.symbol)
if optionchain is None:
self.Debug(f"Current option chain does not contain {self.symbol}. Skipping.")
return
trade_details = self.fetch_sheet_data_update(self.Time)
if trade_details is None:
return
# expiry is same as current date for 0DTE - check timezone of remote host
expiry = datetime.strptime(str(trade_details['TWS Contract Date']), '%Y%m%d')
contracts = [i for i in optionchain if i.Expiry == expiry]
if len(contracts) == 0:
self.Debug(f"Not enough option contracts for {self.symbol} and {expiry=}")
return
quantity = int(trade_details['Order Quantity'])
legs = []
limit_prc = 0
for leg_num in (1, 2):
this_leg, this_limit_prc = self.create_leg(contracts, trade_details, leg_num)
if this_leg is None:
self.Debug(f'Skipping because no leg found for {leg_num=}')
return
legs.append(this_leg)
limit_prc += this_limit_prc
self.ComboLimitOrder(legs, quantity=quantity, limitPrice=limit_prc)
self.last_trade_date = self.Time.date()
self.Debug(f'Generated order with {limit_prc=}')
# TODO: make limit price competeitive by modifying the combo order limit price every few seconds
def OnOrderEvent(self, orderEvent):
if orderEvent.Status == OrderStatus.Filled:
self.Debug("Order filled: " + str(orderEvent))
def OnData(self, slice):
if self.last_trade_date != self.Time.date():
self.GenerateTrade(slice)
else:
open_orders = self.Transactions.GetOpenOrders()
if len(open_orders) > 0:
self.refresh_order_price(open_orders) # TODO: increase frequency for refresh
def refresh_order_price(self, open_orders):
pass # TODO
def close_in_the_money_shorts(self):
"""Close any short option positions which are in-the-money"""
self.cancel_pending_orders() # To prevent any fills after this function executes
equity_price = self.Securities[self.equity.Symbol].Price
for security_holding in self.Portfolio.Values:
if security_holding.Symbol.SecurityType == SecurityType.Option and security_holding.Quantity < 0:
option_strike = security_holding.Symbol.ID.StrikePrice
option_right = security_holding.Symbol.ID.OptionRight
is_itm = (option_right == OptionRight.Call and option_strike < equity_price) or \
(option_right == OptionRight.Put and option_strike > equity_price)
if is_itm:
self.Liquidate(security_holding.Symbol) # Liquidate all positions?
self.Debug(f"Closed ITM short position for {security_holding.Symbol} to be safe")
else:
self.Debug(f"Short option position is not ITM. Phew!")
def cancel_pending_orders(self):
try:
# https://www.quantconnect.com/docs/v2/writing-algorithms/trading-and-orders/order-management/order-tickets
# Cancel if order is (New / Submitted / Partially filled)
order_id = self.Transactions.LastOrderId # assumption is that there is only one order per day
ticket = self.Transactions.GetOrderTicket(order_id)
if ticket.Status in (1, 2, 3):
ticket.Cancel()
self.Debug(f"Cancelled pending order for {self.Time.date()}")
except:
self.Debug(f"No order for {self.Time.date()} to Cancel!")