| Overall Statistics |
|
Total Orders 57 Average Win 0.61% Average Loss -0.28% Compounding Annual Return 24.576% Drawdown 1.200% Expectancy 0.403 Start Equity 100000 End Equity 101863.4 Net Profit 1.863% Sharpe Ratio 2.551 Sortino Ratio 3.82 Probabilistic Sharpe Ratio 80.620% Loss Rate 56% Win Rate 44% Profit-Loss Ratio 2.16 Alpha -0.413 Beta 0.11 Annual Standard Deviation 0.054 Annual Variance 0.003 Information Ratio -13.934 Tracking Error 0.35 Treynor Ratio 1.244 Total Fees $121.60 Estimated Strategy Capacity $0 Lowest Capacity Asset TQQQ 32FCI8YE0RD2E|TQQQ UK280CGTCB51 Portfolio Turnover 0.45% |
#region imports
from AlgorithmImports import *
#endregion
from Initialization import SetupBaseStructure
from Alpha.Utils import Scanner, Order, Stats
from Tools import ContractUtils, Logger, Underlying
from Strategy import Leg, Position, OrderType, WorkingOrder
"""
NOTE: We can't use multiple inheritance in Python because this is a managed class. We will use composition instead so in
order to call the methods of SetupBaseStructure we'll call then using self.setup.methodName().
----------------------------------------------------------------------------------------------------------------------------------------
The base class for all the alpha models. It is used to setup the base structure of the algorithm and to run the strategies.
This class has some configuration capabilities that can be used to setup the strategies more easily by just changing the
configuration parameters.
Here are the default values for the configuration parameters:
scheduleStartTime: time(9, 30, 0)
scheduleStopTime: None
scheduleFrequency: timedelta(minutes = 5)
maxActivePositions: 1
dte: 0
dteWindow: 0
----------------------------------------------------------------------------------------------------------------------------------------
The workflow of the algorithm is the following:
`Update` method gets called every minute
- If the market is closed, the algorithm exits
- If algorithm is warming up, the algorithm exits
- The Scanner class is used to filter the option chain
- If the chain is empty, the algorithm exits
- The CreateInsights method is called
- Inside the GetOrder method is called
"""
class Base(AlphaModel):
# Internal counter for all the orders
orderCount = 0
DEFAULT_PARAMETERS = {
# The start time at which the algorithm will start scheduling the strategy execution
# (to open new positions). No positions will be opened before this time
"scheduleStartTime": time(9, 30, 0),
# The stop time at which the algorithm will look to open a new position.
"scheduleStopTime": None, # time(13, 0, 0),
# Periodic interval with which the algorithm will check to open new positions
"scheduleFrequency": timedelta(minutes=5),
# Minimum time distance between opening two consecutive trades
"minimumTradeScheduleDistance": timedelta(days=1),
# If True, the order is not placed if the legs are already part of an existing position.
"checkForDuplicatePositions": True,
# Maximum number of open positions at any given time
"maxActivePositions": 1,
# Maximum quantity used to scale each position. If the target premium cannot be reached within this
# quantity (i.e. premium received is too low), the position is not going to be opened
"maxOrderQuantity": 1,
# If True, the order is submitted as long as it does not exceed the maxOrderQuantity.
"validateQuantity": True,
# Days to Expiration
"dte": 0,
# The size of the window used to filter the option chain: options expiring in the range [dte-dteWindow, dte] will be selected
"dteWindow": 0,
# DTE Threshold. This is ignored if self.dte < self.dteThreshold
"dteThreshold": 21,
# Controls whether to use the furthest (True) or the earliest (False) expiration date when multiple expirations are available in the chain
"useFurthestExpiry": True,
# Controls whether to consider the DTE of the last closed position when opening a new one:
# If True, the Expiry date of the new position is selected such that the open DTE is the nearest to the DTE of the closed position
"dynamicDTESelection": False,
# Coarse filter for the Universe selection. It selects nStrikes on both sides of the ATM strike for each available expiration
"nStrikesLeft": 200, # 200 SPX @ 3820 & 3910C w delta @ 1.95 => 90/5 = 18
"nStrikesRight": 200, # 200
# Controls what happens when an open position reaches/crosses the dteThreshold ( -> DTE(openPosition) <= dteThreshold)
# - If True, the position is closed as soon as the dteThreshold is reached, regardless of whether the position is profitable or not
# - If False, once the dteThreshold is reached, the position is closed as soon as it is profitable
"forceDteThreshold": False,
# DIT Threshold. This is ignored if self.dte < self.ditThreshold
"ditThreshold": None,
"hardDitThreshold": None,
# Controls what happens when an open position reaches/crosses the ditThreshold ( -> DIT(openPosition) >= ditThreshold)
# - If True, the position is closed as soon as the ditThreshold is reached, regardless of whether the position is profitable or not
# - If False, once the ditThreshold is reached, the position is closed as soon as it is profitable
# - If self.hardDitThreashold is set, the position is closed once the hardDitThreashold is
# crossed, regardless of whether forceDitThreshold is True or False
"forceDitThreshold": False,
# Slippage used to set Limit orders
"slippage": 0.0,
# Used when validateBidAskSpread = True. if the ratio between the bid-ask spread and the
# mid-price is higher than this parameter, the order is not executed
"bidAskSpreadRatio": 0.3,
# If True, the order mid-price is validated to make sure the Bid-Ask spread is not too wide.
# - The order is not submitted if the ratio between Bid-Ask spread of the entire order and its mid-price is more than self.bidAskSpreadRatio
"validateBidAskSpread": False,
# Control whether to allow multiple positions to be opened for the same Expiration date
"allowMultipleEntriesPerExpiry": False,
# Controls whether to include details on each leg (open/close fill price and descriptive statistics about mid-price, Greeks, and IV)
"includeLegDetails": False,
# The frequency (in minutes) with which the leg details are updated (used only if includeLegDetails = True)
"legDatailsUpdateFrequency": 30,
# Controls whether to track the details on each leg across the life of the trade
"trackLegDetails": False,
# Controls which greeks are included in the output log
# "greeksIncluded": ["Delta", "Gamma", "Vega", "Theta", "Rho", "Vomma", "Elasticity"],
"greeksIncluded": [],
# Controls whether to compute the greeks for the strategy. If True, the greeks will be computed and stored in the contract under BSMGreeks.
"computeGreeks": False,
# The time (on expiration day) at which any position that is still open will closed
"marketCloseCutoffTime": time(15, 45, 0),
# Limit Order Management
"useLimitOrders": True,
# Adjustment factor applied to the Mid-Price to set the Limit Order:
# - Credit Strategy:
# Adj = 0.3 --> sets the Limit Order price 30% higher than the current Mid-Price
# - Debit Strategy:
# Adj = -0.2 --> sets the Limit Order price 20% lower than the current Mid-Price
"limitOrderRelativePriceAdjustment": 0,
# Set expiration for Limit orders. This tells us how much time a limit order will stay in pending mode before it gets a fill.
"limitOrderExpiration": timedelta(hours=8),
# Alternative method to set the absolute price (per contract) of the Limit Order. This method is used if a number is specified
# Unless you know that your price target can get a fill, it is advisable to use a relative adjustment or you may never get your order filled
# - Credit Strategy:
# AbsolutePrice = 1.5 --> sets the Limit Order price at exactly 1.5$
# - Debit Strategy:
# AbsolutePrice = -2.3 --> sets the Limit Order price at exactly -2.3$
"limitOrderAbsolutePrice": None,
# Target <credit|debit> premium amount: used to determine the number of contracts needed to reach the desired target amount
# - targetPremiumPct --> target premium is expressed as a percentage of the total Portfolio Net Liq (0 < targetPremiumPct < 1)
# - targetPremium --> target premium is a fixed dollar amount
# If both are specified, targetPremiumPct takes precedence. If none of them are specified,
# the number of contracts specified by the maxOrderQuantity parameter is used.
"targetPremiumPct": None,
# You can't have one without the other in this case below.
# Minimum premium accepted for opening a new position. Setting this to None disables it.
"minPremium": None,
# Maximum premium accepted for opening a new position. Setting this to None disables it.
"maxPremium": None,
"targetPremium": None,
# Defines how the profit target is calculated. Valid options are (case insensitive):
# - Premium: the profit target is a percentage of the premium paid/received.
# - Theta: the profit target is calculated based on the theta value of the position evaluated
# at self.thetaProfitDays from the time of entering the trade
# - TReg: the profit target is calculated as a percentage of the TReg (MaxLoss + openPremium)
# - Margin: the profit target is calculted as a percentage of the margin requirement (calculated based on
# self.portfolioMarginStress percentage upside/downside movement of the underlying)
"profitTargetMethod": "Premium",
# Profit Target Factor (Multiplier of the premium received/paid when the position was opened)
"profitTarget": 0.6,
# Number of days into the future at which the theta of the position is calculated. Used if profitTargetMethod = "Theta"
"thetaProfitDays": None,
# Delta and Wing size used for Naked Put/Call and Spreads
"delta": 10,
"wingSize": 10,
# Put/Call delta for Iron Condor
"putDelta": 10,
"callDelta": 10,
# Net delta for Straddle, Iron Fly and Butterfly (using ATM strike if netDelta = None)
"netDelta": None,
# Put/Call Wing size for Iron Condor, Iron Fly
"putWingSize": 10,
"callWingSize": 10,
# Butterfly specific parameters
"butteflyType": None,
"butterflyLeftWingSize": 10,
"butterflyRightWingSize": 10,
# useSlice determines if we should use the chainOption slice data instead of optionProvider. Default is set to FALSE
"useSlice": True,
}
def __init__(self, context):
self.context = context
# Set default name (use the class name)
self.name = type(self).__name__
# Set the Strategy Name (optional)
self.nameTag = self.name
# Set the logger
self.logger = Logger(context, className=type(self).__name__, logLevel=context.logLevel)
self.order = Order(context, self)
# This adds all the parameters to the class. We can also access them via self.parameter("parameterName")
self.context.structure.AddConfiguration(parent=self, **self.getMergedParameters())
# Initialize the contract utils
self.contractUtils = ContractUtils(context)
# Initialize the stats dictionary
# This will hold any details related to the underlying.
# For example, the underlying price at the time of opening of day
self.stats = Stats()
self.logger.debug(f'{self.name} -> __init__')
@staticmethod
def getNextOrderId():
Base.orderCount += 1
return Base.orderCount
@classmethod
def getMergedParameters(cls):
# Merge the DEFAULT_PARAMETERS from both classes
return {**cls.DEFAULT_PARAMETERS, **getattr(cls, "PARAMETERS", {})}
@classmethod
def parameter(cls, key, default=None):
return cls.getMergedParameters().get(key, default)
def update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]:
insights = []
# Start the timer
self.context.executionTimer.start('Alpha.Base -> Update')
self.logger.debug(f'{self.name} -> update -> start')
self.logger.debug(f'Is Warming Up: {self.context.IsWarmingUp}')
self.logger.debug(f'Is Market Open: {self.context.IsMarketOpen(self.underlyingSymbol)}')
self.logger.debug(f'Time: {self.context.Time}')
# Exit if the algorithm is warming up or the market is closed (avoid processing orders on the last minute as these will be executed the following day)
if self.context.IsWarmingUp or\
not self.context.IsMarketOpen(self.underlyingSymbol) or\
self.context.Time.time() >= time(16, 0, 0):
return insights
self.logger.debug(f'Did Alpha UPDATE after warmup?!?')
# This thing just passes the data to the performance tool so we can keep track of all
# symbols. This should not be needed if the culprit of the slonwess of backtesting is sorted.
self.context.performance.OnUpdate(data)
# Update the stats dictionary
self.syncStats()
# Check if the workingOrders are still OK to execute
self.context.structure.checkOpenPositions()
# Run the strategies to open new positions
filteredChain, lastClosedOrderTag = Scanner(self.context, self).Call(data)
self.logger.debug(f'Did Alpha SCAN')
self.logger.debug(f'Last Closed Order Tag: {lastClosedOrderTag}')
if filteredChain is not None:
if self.stats.hasOptions == False:
self.logger.info(f"Found options {self.context.Time.strftime('%A, %Y-%m-%d %H:%M')}")
self.stats.hasOptions = True
insights = self.CreateInsights(filteredChain, lastClosedOrderTag, data)
elif self.stats.hasOptions is None and self.context.Time.time() >= time(9, 35, 0):
self.stats.hasOptions = False
self.logger.info(f"No options data for {self.context.Time.strftime('%A, %Y-%m-%d %H:%M')}")
self.logger.debug(f"NOTE: Why could this happen? A: The filtering of the chain caused no contracts to be returned. Make sure to make a check on this.")
# Stop the timer
self.context.executionTimer.stop('Alpha.Base -> Update')
return Insight.Group(insights)
# Get the order with extra filters applied by the strategy
def GetOrder(self, chain):
raise NotImplementedError("GetOrder() not implemented")
# Previous method CreateOptionPosition.py#OpenPosition
def CreateInsights(self, chain, lastClosedOrderTag=None, data = Slice) -> List[Insight]:
insights = []
# Call the getOrder method of the class implementing OptionStrategy
order = self.getOrder(chain, data)
# Execute the order
# Exit if there is no order to process
if order is None:
return insights
# Start the timer
self.context.executionTimer.start('Alpha.Base -> CreateInsights')
# Get the context
context = self.context
order = [order] if not isinstance(order, list) else order
for o in order:
self.logger.debug(f"CreateInsights -> strategyId: {o['strategyId']}, strikes: {o['strikes']}")
for single_order in order:
position, workingOrder = self.buildOrderPosition(single_order, lastClosedOrderTag)
self.logger.debug(f"CreateInsights -> position: {position}")
self.logger.debug(f"CreateInsights -> workingOrder: {workingOrder}")
if position is None:
continue
if self.hasDuplicateLegs(single_order):
self.logger.debug(f"CreateInsights -> Duplicate legs found in order: {single_order}")
continue
orderId = position.orderId
orderTag = position.orderTag
insights.extend(workingOrder.insights)
# Add this position to the global dictionary
context.allPositions[orderId] = position
context.openPositions[orderTag] = orderId
# Keep track of all the working orders
context.workingOrders[orderTag] = {}
# Map each contract to the openPosition dictionary (key: expiryStr)
context.workingOrders[orderTag] = workingOrder
self.logger.debug(f"CreateInsights -> insights: {insights}")
# Stop the timer
self.context.executionTimer.stop('Alpha.Base -> CreateInsights')
return insights
def buildOrderPosition(self, order, lastClosedOrderTag=None):
# Get the context
context = self.context
# Get the list of contracts
contracts = order["contracts"]
self.logger.debug(f"buildOrderPosition -> contracts: {len(contracts)}")
# Exit if there are no contracts
if (len(contracts) == 0):
return [None, None]
useLimitOrders = self.useLimitOrders
useMarketOrders = not useLimitOrders
# Current timestamp
currentDttm = self.context.Time
strategyId = order["strategyId"]
contractSide = order["contractSide"]
# midPrices = order["midPrices"]
strikes = order["strikes"]
# IVs = order["IV"]
expiry = order["expiry"]
targetPremium = order["targetPremium"]
maxOrderQuantity = order["maxOrderQuantity"]
orderQuantity = order["orderQuantity"]
bidAskSpread = order["bidAskSpread"]
orderMidPrice = order["orderMidPrice"]
limitOrderPrice = order["limitOrderPrice"]
maxLoss = order["maxLoss"]
targetProfit = order.get("targetProfit", None)
# Expiry String
expiryStr = expiry.strftime("%Y-%m-%d")
self.logger.debug(f"buildOrderPosition -> expiry: {expiry}, expiryStr: {expiryStr}")
# Validate the order prior to submit
if ( # We have a minimum order quantity
orderQuantity == 0
# The sign of orderMidPrice must be consistent with whether this is a credit strategy (+1) or debit strategy (-1)
or np.sign(orderMidPrice) != 2 * int(order["creditStrategy"]) - 1
# Exit if the order quantity exceeds the maxOrderQuantity
or (self.validateQuantity and orderQuantity > maxOrderQuantity)
# Make sure the bid-ask spread is not too wide before opening the position.
# Only for Market orders. In case of limit orders, this validation is done at the time of execution of the Limit order
or (useMarketOrders and self.validateBidAskSpread
and abs(bidAskSpread) >
self.bidAskSpreadRatio * abs(orderMidPrice))):
return [None, None]
self.logger.debug(f"buildOrderPosition -> orderMidPrice: {orderMidPrice}, orderQuantity: {orderQuantity}, maxOrderQuantity: {maxOrderQuantity}")
# Get the current price of the underlying
underlyingPrice = self.contractUtils.getUnderlyingLastPrice(contracts[0])
# Get the Order Id and add it to the order dictionary
orderId = self.getNextOrderId()
# Create unique Tag to keep track of the order when the fill occurs
orderTag = f"{strategyId}-{orderId}"
strategyLegs = []
self.logger.debug(f"buildOrderPosition -> strategyLegs: {strategyLegs}")
for contract in contracts:
key = order["contractSideDesc"][contract.Symbol]
leg = Leg(
key=key,
strike=strikes[key],
expiry=order["contractExpiry"][key],
contractSide=contractSide[contract.Symbol],
symbol=contract.Symbol,
contract=contract,
)
strategyLegs.append(leg)
position = Position(
orderId=orderId,
orderTag=orderTag,
strategy=self,
strategyTag=self.nameTag,
strategyId=strategyId,
legs=strategyLegs,
expiry=expiry,
expiryStr=expiryStr,
targetProfit=targetProfit,
linkedOrderTag=lastClosedOrderTag,
contractSide=contractSide,
openDttm=currentDttm,
openDt=currentDttm.strftime("%Y-%m-%d"),
openDTE=(expiry.date() - currentDttm.date()).days,
limitOrder=useLimitOrders,
targetPremium=targetPremium,
orderQuantity=orderQuantity,
maxOrderQuantity=maxOrderQuantity,
openOrderMidPrice=orderMidPrice,
openOrderMidPriceMin=orderMidPrice,
openOrderMidPriceMax=orderMidPrice,
openOrderBidAskSpread=bidAskSpread,
openOrderLimitPrice=limitOrderPrice,
# underlyingPriceAtOrderOpen=underlyingPrice,
underlyingPriceAtOpen=underlyingPrice,
openOrder=OrderType(
limitOrderExpiryDttm=context.Time + self.limitOrderExpiration,
midPrice=orderMidPrice,
limitOrderPrice=limitOrderPrice,
bidAskSpread=bidAskSpread,
maxLoss=maxLoss
)
)
self.logger.debug(f"buildOrderPosition -> position: {position}")
# Create combo orders by using the provided method instead of always calling MarketOrder.
insights = []
# Create the orders
for contract in contracts:
# Get the contract side (Long/Short)
orderSide = contractSide[contract.Symbol]
insight = Insight.Price(
contract.Symbol,
position.openOrder.limitOrderExpiryDttm,
InsightDirection.Down if orderSide == -1 else InsightDirection.Up
)
insights.append(insight)
self.logger.debug(f"buildOrderPosition -> insights: {insights}")
# Map each contract to the openPosition dictionary (key: expiryStr)
workingOrder = WorkingOrder(
positionKey=orderId,
insights=insights,
limitOrderPrice=limitOrderPrice,
orderId=orderId,
strategy=self,
strategyTag=self.nameTag,
useLimitOrder=useLimitOrders,
orderType="open",
fills=0
)
self.logger.debug(f"buildOrderPosition -> workingOrder: {workingOrder}")
return [position, workingOrder]
def hasDuplicateLegs(self, order):
# Check if checkForDuplicatePositions is enabled
if not self.checkForDuplicatePositions:
return False
# Get the context
context = self.context
# Get the list of contracts
contracts = order["contracts"]
openPositions = context.openPositions
"""
workingOrders = context.workingOrders
# Get a list of orderIds from openPositions and workingOrders
orderIds = list(openPositions.keys()) + [workingOrder.orderId for workingOrder in workingOrders.values()]
# Iterate through the list of orderIds
for orderId in orderIds:
"""
# Iterate through open positions
for orderTag, orderId in list(openPositions.items()):
position = context.allPositions[orderId]
# Check if the expiry matches
if position.expiryStr != order["expiry"].strftime("%Y-%m-%d"):
continue
# Check if the strategy matches (if allowMultipleEntriesPerExpiry is False)
if not self.allowMultipleEntriesPerExpiry and position.strategyId == order["strategyId"]:
return True
# Compare legs
position_legs = set((leg.strike, leg.contractSide) for leg in position.legs)
order_legs = set((contract.Strike, order["contractSide"][contract.Symbol]) for contract in contracts)
if position_legs == order_legs:
return True
return False
"""
This method is called every minute to update the stats dictionary.
"""
def syncStats(self):
# Get the current day
currentDay = self.context.Time.date()
# Update the underlyingPriceAtOpen to be set at the start of each day
underlying = Underlying(self.context, self.underlyingSymbol)
if currentDay != self.stats.currentDay:
self.logger.trace(f"Previous day: {self.stats.currentDay} data {self.stats.underlyingPriceAtOpen}, {self.stats.highOfTheDay}, {self.stats.lowOfTheDay}")
self.stats.underlyingPriceAtOpen = underlying.Price()
# Update the high/low of the day
self.stats.highOfTheDay = underlying.Close()
self.stats.lowOfTheDay = underlying.Close()
# Add a dictionary to keep track of whether the price has touched the EMAs
self.stats.touchedEMAs = {}
self.logger.debug(f"Updating stats for {currentDay} Open: {self.stats.underlyingPriceAtOpen}, High: {self.stats.highOfTheDay}, Low: {self.stats.lowOfTheDay}")
self.stats.currentDay = currentDay
self.stats.hasOptions = None
# This is like poor mans consolidator
frequency = 5 # minutes
# Continue the processing only if we are at the specified schedule
if self.context.Time.minute % frequency != 0:
return None
# This should add the data for the underlying symbol chart.
self.context.charting.updateCharts(symbol = self.underlyingSymbol)
# Update the high/low of the day
self.stats.highOfTheDay = max(self.stats.highOfTheDay, underlying.Close())
self.stats.lowOfTheDay = min(self.stats.lowOfTheDay, underlying.Close())
# The method will be called each time a consolidator is receiving data. We have a default one of 5 minutes
# so if we need something to happen every 5 minutes this can be used for that.
def dataConsolidated(self, sender, consolidated):
pass
def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
# Security additions and removals are pushed here.
# This can be used for setting up algorithm state.
# changes.AddedSecurities
# changes.RemovedSecurities
pass
#region imports
from AlgorithmImports import *
#endregion
from .Base import Base
class CCModel(Base):
PARAMETERS = {
# The start time at which the algorithm will start scheduling the strategy execution (to open new positions). No positions will be opened before this time
"scheduleStartTime": time(9, 30, 0),
# The stop time at which the algorithm will look to open a new position.
"scheduleStopTime": time(16, 0, 0),
# Periodic interval with which the algorithm will check to open new positions
"scheduleFrequency": timedelta(minutes = 5),
# Maximum number of open positions at any given time
"maxActivePositions": 1,
# Control whether to allow multiple positions to be opened for the same Expiration date
"allowMultipleEntriesPerExpiry": False,
# Minimum time distance between opening two consecutive trades
"minimumTradeScheduleDistance": timedelta(minutes=10),
# Days to Expiration
"dte": 7,
# The size of the window used to filter the option chain: options expiring in the range [dte-dteWindow, dte] will be selected
"dteWindow": 14,
"useLimitOrders": True,
"limitOrderRelativePriceAdjustment": 0.2,
# Alternative method to set the absolute price (per contract) of the Limit Order. This method is used if a number is specified
"limitOrderAbsolutePrice": 0.30,
"limitOrderExpiration": timedelta(minutes=15),
# Coarse filter for the Universe selection. It selects nStrikes on both sides of the ATM
# strike for each available expiration
# Example: 200 SPX @ 3820 & 3910C w delta @ 1.95 => 90/5 = 18
"nStrikesLeft": 35,
"nStrikesRight": 35,
# TODO fix this and set it based on buying power.
# "maxOrderQuantity": 25,
"validateQuantity": False,
"targetPremiumPct": 0.015,
# Minimum premium accepted for opening a new position. Setting this to None disables it.
"minPremium": 0.05,
# Maximum premium accepted for opening a new position. Setting this to None disables it.
"maxPremium": 0.8,
# Profit Target Factor (Multiplier of the premium received/paid when the position was opened)
"profitTarget": 0.4,
"bidAskSpreadRatio": 0.4,
"validateBidAskSpread": True,
"marketCloseCutoffTime": None, #time(15, 45, 0),
# Put/Call Wing size for Iron Condor, Iron Fly
# "targetPremium": 500,
}
def __init__(self, context):
# Call the Base class __init__ method
super().__init__(context)
# You can change the name here
self.name = "CCModel"
self.nameTag = "CCModel"
self.ticker = "TSLA"
self.context.structure.AddUnderlying(self, self.ticker)
def getOrder(self, chain, data):
if data.ContainsKey(self.underlyingSymbol):
self.logger.debug(f"CCModel -> getOrder: Data contains key {self.underlyingSymbol}")
# Based on maxActivePositions set to 1. We should already check if there is an open position or
# working order. If there is, then this will not even run.
call = self.order.getNakedOrder(
chain,
'call',
fromPrice = self.minPremium,
toPrice = self.maxPremium,
sell=True
)
self.logger.debug(f"CCModel -> getOrder: Call: {call}")
if call is not None:
return [call]
else:
return None
else:
return None
#region imports
from AlgorithmImports import *
#endregion
from .Base import Base
from Data.GoogleSheetsData import GoogleSheetsData
class FPLModel(Base):
PARAMETERS = {
# The start time at which the algorithm will start scheduling the strategy execution (to open new positions). No positions will be opened before this time
"scheduleStartTime": time(9, 20, 0),
# The stop time at which the algorithm will look to open a new position.
"scheduleStopTime": None, # time(13, 0, 0),
# Periodic interval with which the algorithm will check to open new positions
"scheduleFrequency": timedelta(minutes = 5),
# Maximum number of open positions at any given time
"maxActivePositions": 2,
# Days to Expiration
"dte": 0,
# The size of the window used to filter the option chain: options expiring in the range [dte-dteWindow, dte] will be selected
"dteWindow": 0,
"useLimitOrders": True,
"limitOrderRelativePriceAdjustment": 0.2,
"limitOrderAbsolutePrice": 0.5,
"limitOrderExpiration": timedelta(hours=1),
# Coarse filter for the Universe selection. It selects nStrikes on both sides of the ATM
# strike for each available expiration
# Example: 200 SPX @ 3820 & 3910C w delta @ 1.95 => 90/5 = 18
"nStrikesLeft": 18,
"nStrikesRight": 18,
"maxOrderQuantity": 40,
# Minimum premium accepted for opening a new position. Setting this to None disables it.
"minPremium": 0.50,
# Maximum premium accepted for opening a new position. Setting this to None disables it.
"maxPremium": 1.5,
"profitTarget": 0.5,
"bidAskSpreadRatio": 0.4,
"validateBidAskSpread": True,
"marketCloseCutoffTime": None, #time(15, 45, 0),
# "targetPremium": 500,
}
def __init__(self, context):
# Call the Base class __init__ method
super().__init__(context)
# You can change the name here
self.name = "FPLModel"
self.nameTag = "FPL"
self.ticker = "SPX"
self.context.structure.AddUnderlying(self, self.ticker)
self.customSymbol = self.context.AddData(GoogleSheetsData, "SPXT", Resolution.Minute).Symbol
def getOrder(self, chain, data):
if data.ContainsKey(self.customSymbol):
self.logger.info(f'L: just got a new trade!! {data[self.customSymbol]}')
print(f'P: just got a new trade!! {data[self.customSymbol]}')
trade_instructions = data[self.customSymbol]
tradeType = trade_instructions.Type
condor = False
self.logger.info(f'L: instructions: {trade_instructions}')
print(f'P: instructions: {trade_instructions}')
if tradeType == 'Call Credit Spreads':
action = 'call'
strike = trade_instructions.CallStrike
elif tradeType == 'Put Credit Spreads':
action = 'put'
strike = trade_instructions.PutStrike
elif tradeType == 'Iron Condor':
callStrike = trade_instructions.CallStrike
putStrike = trade_instructions.PutStrike
condor = True
else:
return None
if condor:
return self.order.getIronCondorOrder(
chain,
callStrike = callStrike,
putStrike = putStrike,
callWingSize = 5,
putWingSize = 5
)
else:
return self.order.getSpreadOrder(
chain,
action,
strike=strike,
wingSize=5,
sell=True
)
else:
return None
# if not chain.ContainsKey('SPXTRADES'):
# return []
# customTrades = chain['SPXTRADES']
# if customTrades is None:
# return []
# # Check if the current time is past the instructed time
# if self.context.Time < customTrades.Time:
# return []
# # Use the customTrades data to generate insights
# tradeType = customTrades.Type
# call_strike = customTrades.CallStrike
# put_strike = customTrades.PutStrike
# minimum_premium = customTrades.MinimumPremium
# self.Log(f'{data.EndTime}: Close: {data.Close}')
# self.Plot(self.custom_data_symbol, 'Price', data.Close)
# strike = self.context.underlyingPrice() + self.parameters["distance"]
# region imports
from AlgorithmImports import *
# endregion
from .Base import Base
from Tools import Underlying
class IBS(Base):
PARAMETERS = {
# The start time at which the algorithm will start scheduling the strategy execution (to open new positions). No positions will be opened before this time
"scheduleStartTime": time(9, 30, 0),
# The stop time at which the algorithm will look to open a new position.
"scheduleStopTime": time(16, 0, 0),
# Periodic interval with which the algorithm will check to open new positions
"scheduleFrequency": timedelta(minutes = 5),
# Maximum number of open positions at any given time
"maxActivePositions": 10,
# Control whether to allow multiple positions to be opened for the same Expiration date
"allowMultipleEntriesPerExpiry": True,
# Minimum time distance between opening two consecutive trades
"minimumTradeScheduleDistance": timedelta(minutes=10),
# Days to Expiration
"dte": 30,
# The size of the window used to filter the option chain: options expiring in the range [dte-dteWindow, dte] will be selected
"dteWindow": 10,
"dteThreshold": None,
"useLimitOrders": True,
"limitOrderRelativePriceAdjustment": 0.2,
# Alternative method to set the absolute price (per contract) of the Limit Order. This method is used if a number is specified
"limitOrderAbsolutePrice": 1.0,
"limitOrderExpiration": timedelta(minutes=10),
# Coarse filter for the Universe selection. It selects nStrikes on both sides of the ATM
# strike for each available expiration
# Example: 200 SPX @ 3820 & 3910C w delta @ 1.95 => 90/5 = 18
"nStrikesLeft": 18,
"nStrikesRight": 18,
# TODO fix this and set it based on buying power.
# "maxOrderQuantity": 200,
# COMMENT OUT this one below because it caused the orderQuantity to be 162 and maxOrderQuantity to be 10 so it would not place trades.
"targetPremiumPct": 0.01,
"validateQuantity": False,
# Minimum premium accepted for opening a new position. Setting this to None disables it.
"minPremium": 0.9,
# Maximum premium accepted for opening a new position. Setting this to None disables it.
"maxPremium": 1.2,
# Profit Target Factor (Multiplier of the premium received/paid when the position was opened)
"profitTarget": 1.0,
"bidAskSpreadRatio": 0.4,
"validateBidAskSpread": True,
"marketCloseCutoffTime": time(15, 45, 0),
# Put/Call Wing size for Iron Condor, Iron Fly
"putWingSize": 5,
# "targetPremium": 500,
}
def __init__(self, context):
# Call the Base class __init__ method
super().__init__(context)
# You can change the name here
self.name = "IBS"
self.nameTag = "IBS"
self.ticker = "TQQQ"
self.context.structure.AddUnderlying(self, self.ticker)
self.logger.debug(f"{self.__class__.__name__} -> __init__ -> AddUnderlying")
self._ibs = InternalBarStrength()
self.context.register_indicator(self.underlyingSymbol, self._ibs, Resolution.DAILY)
def getOrder(self, chain, data):
self.logger.debug(f"{self.__class__.__name__} -> getOrder -> start")
self.logger.debug(f"IBS -> getOrder -> data.ContainsKey(self.underlyingSymbol): {data.ContainsKey(self.underlyingSymbol)}")
self.logger.debug(f"IBS -> getOrder -> Underlying Symbol: {self.underlyingSymbol}")
current_time = self.context.Time.time()
market_open_time = time(9, 35, 0)
# self.logger.debug(f"IBS -> getOrder -> Current Time: {current_time}")
# self.logger.debug(f"IBS -> getOrder -> Market Open Time: {market_open_time}")
# Check if it's market open time
if current_time != market_open_time:
# self.logger.debug(f"IBS -> getOrder -> Not market open time, returning None")
return None
if data.ContainsKey(self.underlyingSymbol):
ibs_value = self._ibs.current.value
self.logger.debug(f"IBS -> getOrder -> IBS Value: {ibs_value}")
if ibs_value > 0.2:
self.logger.debug(f"IBS -> getOrder -> IBS Value > 0.2, returning None")
return None
underlying = Underlying(self.context, self.underlyingSymbol)
self.logger.debug(f"IBS -> getOrder: Data contains key {self.underlyingSymbol}")
put = self.order.getSpreadOrder(
chain,
'put',
delta=30,
strike=underlying.Price(),
wingSize=self.putWingSize,
sell=True
)
self.logger.debug(f"IBS -> getOrder: Put order details: {put}")
if put is not None:
return [put]
else:
self.logger.debug(f"IBS -> getOrder: Data does not contain key {self.underlyingSymbol}, returning None")
return None#region imports
from AlgorithmImports import *
#endregion
from .Base import Base
class SPXButterfly(Base):
PARAMETERS = {
# The start time at which the algorithm will start scheduling the strategy execution (to open new positions). No positions will be opened before this time
"scheduleStartTime": time(9, 30, 0),
# The stop time at which the algorithm will look to open a new position.
"scheduleStopTime": time(16, 0, 0),
# Periodic interval with which the algorithm will check to open new positions
"scheduleFrequency": timedelta(minutes = 15),
# Maximum number of open positions at any given time
"maxActivePositions": 30,
# Control whether to allow multiple positions to be opened for the same Expiration date
"allowMultipleEntriesPerExpiry": True,
# Minimum time distance between opening two consecutive trades
"minimumTradeScheduleDistance": timedelta(minutes=10),
# Days to Expiration
"dte": 0,
# The size of the window used to filter the option chain: options expiring in the range [dte-dteWindow, dte] will be selected
"dteWindow": 0,
"useLimitOrders": True,
"limitOrderRelativePriceAdjustment": 0.2,
# Alternative method to set the absolute price (per contract) of the Limit Order. This method is used if a number is specified
"limitOrderExpiration": timedelta(minutes=20),
# Coarse filter for the Universe selection. It selects nStrikes on both sides of the ATM
# strike for each available expiration
# Example: 200 SPX @ 3820 & 3910C w delta @ 1.95 => 90/5 = 18
"nStrikesLeft": 18,
"nStrikesRight": 18,
# TODO fix this and set it based on buying power.
"maxOrderQuantity": 1000,
"targetPremiumPct": 0.015,
# Minimum premium accepted for opening a new position. Setting this to None disables it.
"minPremium": None,
# Maximum premium accepted for opening a new position. Setting this to None disables it.
"maxPremium": None,
# Profit Target Factor (Multiplier of the premium received/paid when the position was opened)
# "profitTarget": 1.0,
"bidAskSpreadRatio": 0.4,
"validateBidAskSpread": True,
"marketCloseCutoffTime": time(16, 10, 0),
# Put/Call Wing size for Iron Condor, Iron Fly
"butterflyLeftWingSize": 35,
"butterflyRightWingSize": 35,
# "targetPremium": 500,
}
def __init__(self, context):
# Call the Base class __init__ method
super().__init__(context)
# You can change the name here
self.name = "SPXButterfly"
self.nameTag = "SPXButterfly"
self.ticker = "SPX"
self.context.structure.AddUnderlying(self, self.ticker)
def getOrder(self, chain, data):
# Open trades at 13:00
if data.ContainsKey(self.underlyingSymbol):
trade_times = [time(9, 45, 0)]
current_time = self.context.Time.time()
if current_time not in trade_times:
return None
fly = self.order.getIronFlyOrder(
chain,
callWingSize=self.butterflyLeftWingSize,
putWingSize=self.butterflyRightWingSize,
sell=True
)
if fly is not None:
return fly
else:
return None
else:
return None
#region imports
from AlgorithmImports import *
#endregion
from .Base import Base
class SPXCondor(Base):
PARAMETERS = {
# The start time at which the algorithm will start scheduling the strategy execution (to open new positions). No positions will be opened before this time
"scheduleStartTime": time(9, 30, 0),
# The stop time at which the algorithm will look to open a new position.
"scheduleStopTime": time(16, 0, 0),
# Periodic interval with which the algorithm will check to open new positions
"scheduleFrequency": timedelta(minutes = 15),
# Maximum number of open positions at any given time
"maxActivePositions": 30,
# Control whether to allow multiple positions to be opened for the same Expiration date
"allowMultipleEntriesPerExpiry": True,
# Minimum time distance between opening two consecutive trades
"minimumTradeScheduleDistance": timedelta(minutes=10),
# Days to Expiration
"dte": 0,
# The size of the window used to filter the option chain: options expiring in the range [dte-dteWindow, dte] will be selected
"dteWindow": 0,
"useLimitOrders": True,
"limitOrderRelativePriceAdjustment": 0.2,
# Alternative method to set the absolute price (per contract) of the Limit Order. This method is used if a number is specified
"limitOrderAbsolutePrice": 0.90,
"limitOrderExpiration": timedelta(minutes=5),
# Coarse filter for the Universe selection. It selects nStrikes on both sides of the ATM
# strike for each available expiration
# Example: 200 SPX @ 3820 & 3910C w delta @ 1.95 => 90/5 = 18
"nStrikesLeft": 18,
"nStrikesRight": 18,
# TODO fix this and set it based on buying power.
"maxOrderQuantity": 1000,
"targetPremiumPct": 0.015,
# Minimum premium accepted for opening a new position. Setting this to None disables it.
"minPremium": None,
# Maximum premium accepted for opening a new position. Setting this to None disables it.
"maxPremium": None,
# Profit Target Factor (Multiplier of the premium received/paid when the position was opened)
"profitTarget": 1.0,
"bidAskSpreadRatio": 0.4,
"validateBidAskSpread": True,
"marketCloseCutoffTime": None, #time(15, 45, 0),
# Put/Call Wing size for Iron Condor, Iron Fly
"putWingSize": 5,
"callWingSize": 5,
# "targetPremium": 500,
}
def __init__(self, context):
# Call the Base class __init__ method
super().__init__(context)
# You can change the name here
self.name = "SPXCondor"
self.nameTag = "SPXCondor"
self.ticker = "SPX"
self.context.structure.AddUnderlying(self, self.ticker)
def getOrder(self, chain, data):
# Best time to open the trade: 9:45 + 10:15 + 12:30 + 13:00 + 13:30 + 13:45 + 14:00 + 15:00 + 15:15 + 15:45
# https://tradeautomationtoolbox.com/byob-ticks/?save=admZ4dG
if data.ContainsKey(self.underlyingSymbol):
trade_times = [time(9, 45, 0), time(13, 10, 0), time(15, 15, 0)]
current_time = self.context.Time.time()
if current_time not in trade_times:
return None
strike = self.order.strategyBuilder.getATMStrike(chain)
condor = self.order.getIronCondorOrder(
chain,
callStrike=strike + 30,
callWingSize=self.callWingSize,
putStrike=strike - 30,
putWingSize=self.putWingSize,
sell=True
)
if condor is not None:
return condor
else:
return None
else:
return None
#region imports
from AlgorithmImports import *
#endregion
from .Base import Base
class SPXic(Base):
PARAMETERS = {
# The start time at which the algorithm will start scheduling the strategy execution (to open new positions). No positions will be opened before this time
"scheduleStartTime": time(9, 30, 0),
# The stop time at which the algorithm will look to open a new position.
"scheduleStopTime": time(16, 0, 0),
# Periodic interval with which the algorithm will check to open new positions
"scheduleFrequency": timedelta(minutes = 5),
# Maximum number of open positions at any given time
"maxActivePositions": 10,
# Control whether to allow multiple positions to be opened for the same Expiration date
"allowMultipleEntriesPerExpiry": True,
# Minimum time distance between opening two consecutive trades
"minimumTradeScheduleDistance": timedelta(minutes=10),
# Days to Expiration
"dte": 0,
# The size of the window used to filter the option chain: options expiring in the range [dte-dteWindow, dte] will be selected
"dteWindow": 0,
"useLimitOrders": True,
"limitOrderRelativePriceAdjustment": 0.2,
# Alternative method to set the absolute price (per contract) of the Limit Order. This method is used if a number is specified
"limitOrderAbsolutePrice": 1.0,
"limitOrderExpiration": timedelta(minutes=5),
# Coarse filter for the Universe selection. It selects nStrikes on both sides of the ATM
# strike for each available expiration
# Example: 200 SPX @ 3820 & 3910C w delta @ 1.95 => 90/5 = 18
"nStrikesLeft": 18,
"nStrikesRight": 18,
# TODO fix this and set it based on buying power.
# "maxOrderQuantity": 200,
# COMMENT OUT this one below because it caused the orderQuantity to be 162 and maxOrderQuantity to be 10 so it would not place trades.
"targetPremiumPct": 0.01,
"validateQuantity": False,
# Minimum premium accepted for opening a new position. Setting this to None disables it.
"minPremium": 0.9,
# Maximum premium accepted for opening a new position. Setting this to None disables it.
"maxPremium": 1.2,
# Profit Target Factor (Multiplier of the premium received/paid when the position was opened)
"profitTarget": 1.0,
"bidAskSpreadRatio": 0.4,
"validateBidAskSpread": True,
"marketCloseCutoffTime": time(15, 45, 0),
# Put/Call Wing size for Iron Condor, Iron Fly
"putWingSize": 10,
"callWingSize": 10,
# "targetPremium": 500,
}
def __init__(self, context):
# Call the Base class __init__ method
super().__init__(context)
# You can change the name here
self.name = "SPXic"
self.nameTag = "SPXic"
self.ticker = "SPX"
self.context.structure.AddUnderlying(self, self.ticker)
self.logger.debug(f"{self.__class__.__name__} -> __init__ -> AddUnderlying")
def getOrder(self, chain, data):
self.logger.debug(f"{self.__class__.__name__} -> getOrder -> start")
self.logger.debug(f"SPXic -> getOrder -> data.ContainsKey(self.underlyingSymbol): {data.ContainsKey(self.underlyingSymbol)}")
self.logger.debug(f"SPXic -> getOrder -> Underlying Symbol: {self.underlyingSymbol}")
# Best time to open the trade: 9:45 + 10:15 + 12:30 + 13:00 + 13:30 + 13:45 + 14:00 + 15:00 + 15:15 + 15:45
# https://tradeautomationtoolbox.com/byob-ticks/?save=admZ4dG
if data.ContainsKey(self.underlyingSymbol):
self.logger.debug(f"SPXic -> getOrder: Data contains key {self.underlyingSymbol}")
# trade_times = [time(9, 45, 0), time(10, 15, 0), time(12, 30, 0), time(13, 0, 0), time(13, 30, 0), time(13, 45, 0), time(14, 0, 0), time(15, 0, 0), time(15, 15, 0), time(15, 45, 0)]
trade_times = [time(9, 45, 0), time(10, 15, 0), time(12, 30, 0), time(13, 0, 0), time(13, 30, 0), time(13, 45, 0), time(14, 0, 0)]
# trade_times = [time(hour, minute, 0) for hour in range(9, 15) for minute in range(0, 60, 30) if not (hour == 15 and minute > 0)]
# Remove the microsecond from the current time
current_time = self.context.Time.time().replace(microsecond=0)
self.logger.debug(f"SPXic -> getOrder -> current_time: {current_time}")
self.logger.debug(f"SPXic -> getOrder -> trade_times: {trade_times}")
self.logger.debug(f"SPXic -> getOrder -> current_time in trade_times: {current_time in trade_times}")
if current_time not in trade_times:
return None
call = self.order.getSpreadOrder(
chain,
'call',
fromPrice=self.minPremium,
toPrice=self.maxPremium,
wingSize=self.callWingSize,
sell=True
)
put = self.order.getSpreadOrder(
chain,
'put',
fromPrice=self.minPremium,
toPrice=self.maxPremium,
wingSize=self.putWingSize,
sell=True
)
self.logger.debug(f"SPXic -> getOrder: Call: {call}")
self.logger.debug(f"SPXic -> getOrder: Put: {put}")
if call is not None and put is not None:
return [call, put]
else:
return None
else:
return None
#region imports
from AlgorithmImports import *
#endregion
import numpy as np
from .OrderBuilder import OrderBuilder
from Tools import ContractUtils, BSM, Logger
from Strategy import Position
class Order:
def __init__(self, context, base):
self.context = context
self.base = base
# Set the logger
self.logger = Logger(context, className=type(self).__name__, logLevel=context.logLevel)
# Initialize the BSM pricing model
self.bsm = BSM(context)
# Initialize the contract utils
self.contractUtils = ContractUtils(context)
# Initialize the Strategy Builder
self.strategyBuilder = OrderBuilder(context)
# Function to evaluate the P&L of the position
def fValue(self, spotPrice, contracts, sides=None, atTime=None, openPremium=None):
# Compute the theoretical value at the given Spot price and point in time
prices = np.array(
[
self.bsm.bsmPrice(
contract,
sigma=contract.BSMImpliedVolatility,
spotPrice=spotPrice,
atTime=atTime,
)
for contract in contracts
]
)
# Total value of the position
value = openPremium + sum(prices * np.array(sides))
return value
def getPayoff(self, spotPrice, contracts, sides):
# Exit if there are no contracts to process
if len(contracts) == 0:
return 0
# Initialize the counter
n = 0
# initialize the payoff
payoff = 0
for contract in contracts:
# direction: Call -> +1, Put -> -1
direction = 2*int(contract.Right == OptionRight.Call)-1
# Add the payoff of the current contract
payoff += sides[n] * max(0, direction * (spotPrice - contract.Strike))
# Increment the counter
n += 1
# Return the payoff
return payoff
def computeOrderMaxLoss(self, contracts, sides):
# Exit if there are no contracts to process
if len(contracts) == 0:
return 0
# Get the current price of the underlying
UnderlyingLastPrice = self.contractUtils.getUnderlyingLastPrice(contracts[0])
# Evaluate the payoff at the extreme (spotPrice = 0)
maxLoss = self.getPayoff(0, contracts, sides)
# Evaluate the payoff at each strike
for contract in contracts:
maxLoss = min(maxLoss, self.getPayoff(contract.Strike, contracts, sides))
# Evaluate the payoff at the extreme (spotPrice = 10x higher)
maxLoss = min(maxLoss, self.getPayoff(UnderlyingLastPrice*10, contracts, sides))
# Cap the payoff at zero: we are only interested in losses
maxLoss = min(0, maxLoss)
# Return the max loss
return maxLoss
def getMaxOrderQuantity(self):
# Get the context
context = self.context
# Get the maximum order quantity parameter
maxOrderQuantity = self.base.maxOrderQuantity
# Get the targetPremiumPct
targetPremiumPct = self.base.targetPremiumPct
# Check if we are using dynamic premium targeting
if targetPremiumPct != None:
# Scale the maxOrderQuantity consistently with the portfolio growth
maxOrderQuantity = round(maxOrderQuantity * (1 + context.Portfolio.TotalProfit / context.initialAccountValue))
# Make sure we don't go below the initial parameter value
maxOrderQuantity = max(self.base.maxOrderQuantity, maxOrderQuantity)
# Return the result
return maxOrderQuantity
def isDuplicateOrder(self, contracts, sides):
# Loop through all working orders of this strategy
for orderTag in list(self.context.workingOrders):
# Get the current working order
workingOrder = self.context.workingOrders.get(orderTag)
# Check if the number of contracts of this working order is the same as the number of contracts in the input list
if workingOrder and workingOrder.insights == len(contracts):
# Initialize the isDuplicate flag. Assume it's duplicate unless we find a mismatch
isDuplicate = True
# Loop through each pair (contract, side)
for contract, side in zip(contracts, sides):
# Get the details of the contract
contractInfo = workingOrder.get(contract.Symbol)
# If we cannot find this contract then it's not a duplicate
if contractInfo == None:
isDuplicate = False
break
# Get the orderSide and expiryStr properties
orderSide = contractInfo.get("orderSide")
expiryStr = contractInfo.get("expiryStr")
# Check for a mismatch
if (orderSide != side # Found the contract but it's on a different side (Sell/Buy)
or expiryStr != contract.Expiry.strftime("%Y-%m-%d") # Found the contract but it's on a different Expiry
):
# It's not a duplicate. Brake this innermost loop
isDuplicate = False
break
# Exit if we found a duplicate
if isDuplicate:
return isDuplicate
# If we got this far, there are no duplicates
return False
def limitOrderPrice(self, sides, orderMidPrice):
# Get the limitOrderAbsolutePrice
limitOrderAbsolutePrice = self.base.limitOrderAbsolutePrice
# Get the minPremium and maxPremium to determine the limit price based on that.
minPremium = self.base.minPremium
maxPremium = self.base.maxPremium
# Get the limitOrderRelativePriceAdjustment
limitOrderRelativePriceAdjustment = self.base.limitOrderRelativePriceAdjustment or 0.0
# Compute Limit Order price
if limitOrderAbsolutePrice is not None:
if abs(orderMidPrice) < 1e-5:
limitOrderRelativePriceAdjustment = 0
else:
# Compute the relative price adjustment (needed to adjust each leg with the same proportion)
limitOrderRelativePriceAdjustment = limitOrderAbsolutePrice / orderMidPrice - 1
# Use the specified absolute price
limitOrderPrice = limitOrderAbsolutePrice
else:
# Set the Limit Order price (including slippage)
limitOrderPrice = orderMidPrice * (1 + limitOrderRelativePriceAdjustment)
# Compute the total slippage
totalSlippage = sum(list(map(abs, sides))) * self.base.slippage
# Add slippage to the limit order
limitOrderPrice -= totalSlippage
# Adjust the limit order price based on minPremium and maxPremium
if minPremium is not None and limitOrderPrice < minPremium:
limitOrderPrice = minPremium
if maxPremium is not None and limitOrderPrice > maxPremium:
limitOrderPrice = maxPremium
return limitOrderPrice
# Create dictionary with the details of the order to be submitted
def getOrderDetails(self, contracts, sides, strategy, sell=True, strategyId=None, expiry=None, sidesDesc=None):
# Exit if there are no contracts to process
if not contracts:
return
# Exit if we already have a working order for the same set of contracts and sides
if self.isDuplicateOrder(contracts, sides):
return
# Get the context
context = self.context
# Set the Strategy Id (if not specified)
strategyId = strategyId or strategy.replace(" ", "")
# Get the Expiration from the first contract (unless otherwise specified
expiry = expiry or contracts[0].Expiry
# Get the last trading day for the given expiration date (in case it falls on a holiday)
expiryLastTradingDay = self.context.lastTradingDay(expiry)
# Set the date/time threshold by which the position must be closed (on the last trading day before expiration)
expiryMarketCloseCutoffDttm = None
if self.base.marketCloseCutoffTime != None:
expiryMarketCloseCutoffDttm = datetime.combine(expiryLastTradingDay, self.base.marketCloseCutoffTime)
# Dictionary to map each contract symbol to the side (short/long)
contractSide = {}
# Dictionary to map each contract symbol to its description
contractSideDesc = {}
# Dictionary to map each contract symbol to the actual contract object
contractDictionary = {}
# Dictionaries to keep track of all the strikes, Delta and IV
strikes = {}
delta = {}
gamma = {}
vega = {}
theta = {}
rho = {}
vomma = {}
elasticity = {}
IV = {}
midPrices = {}
contractExpiry = {}
# Compute the Greeks for each contract (if not already available)
if self.base.computeGreeks:
self.bsm.setGreeks(contracts)
# Compute the Mid-Price and Bid-Ask spread for the full order
orderMidPrice = 0.0
bidAskSpread = 0.0
# Get the slippage parameter (if available)
slippage = self.base.slippage or 0.0
# Get the maximum order quantity
maxOrderQuantity = self.getMaxOrderQuantity()
# Get the targetPremiumPct
targetPremiumPct = self.base.targetPremiumPct
# Check if we are using dynamic premium targeting
if targetPremiumPct != None:
# Make sure targetPremiumPct is bounded to the range [0, 1])
targetPremiumPct = max(0.0, min(1.0, targetPremiumPct))
# Compute the target premium as a percentage of the total net portfolio value
targetPremium = context.Portfolio.TotalPortfolioValue * targetPremiumPct
else:
targetPremium = self.base.targetPremium
# Check if we have a description for the contracts
if sidesDesc == None:
# Temporary dictionaries to lookup a description
optionTypeDesc = {OptionRight.Put: "Put", OptionRight.Call: "Call"}
optionSideDesc = {-1: "short", 1: "long"}
# create a description for each contract: <long|short><Call|Put>
sidesDesc = list(map(lambda contract, side: f"{optionSideDesc[np.sign(side)]}{optionTypeDesc[contract.Right]}", contracts, sides))
n = 0
for contract in contracts:
# Contract Side: +n -> Long, -n -> Short
orderSide = sides[n]
# Contract description (<long|short><Call|Put>)
orderSideDesc = sidesDesc[n]
# Store it in the dictionary
contractSide[contract.Symbol] = orderSide
contractSideDesc[contract.Symbol] = orderSideDesc
contractDictionary[contract.Symbol] = contract
# Set the strike in the dictionary -> "<short|long><Call|Put>": <strike>
strikes[f"{orderSideDesc}"] = contract.Strike
# Add the contract expiration time and add 16 hours to the market close
contractExpiry[f"{orderSideDesc}"] = contract.Expiry + timedelta(hours = 16)
if hasattr(contract, "BSMGreeks"):
# Set the Greeks and IV in the dictionary -> "<short|long><Call|Put>": <greek|IV>
delta[f"{orderSideDesc}"] = contract.BSMGreeks.Delta
gamma[f"{orderSideDesc}"] = contract.BSMGreeks.Gamma
vega[f"{orderSideDesc}"] = contract.BSMGreeks.Vega
theta[f"{orderSideDesc}"] = contract.BSMGreeks.Theta
rho[f"{orderSideDesc}"] = contract.BSMGreeks.Rho
vomma[f"{orderSideDesc}"] = contract.BSMGreeks.Vomma
elasticity[f"{orderSideDesc}"] = contract.BSMGreeks.Elasticity
IV[f"{orderSideDesc}"] = contract.BSMImpliedVolatility
# Get the latest mid-price
midPrice = self.contractUtils.midPrice(contract)
# Store the midPrice in the dictionary -> "<short|long><Call|Put>": midPrice
midPrices[f"{orderSideDesc}"] = midPrice
# Compute the bid-ask spread
bidAskSpread += self.contractUtils.bidAskSpread(contract)
# Adjusted mid-price (include slippage). Take the sign of orderSide to determine the direction of the adjustment
# adjustedMidPrice = midPrice + np.sign(orderSide) * slippage
# Keep track of the total credit/debit or the order
orderMidPrice -= orderSide * midPrice
# Increment counter
n += 1
limitOrderPrice = self.limitOrderPrice(sides=sides, orderMidPrice=orderMidPrice)
# Round the prices to the nearest cent
orderMidPrice = round(orderMidPrice, 2)
limitOrderPrice = round(limitOrderPrice, 2)
# Determine which price is used to compute the order quantity
if self.base.useLimitOrders:
# Use the Limit Order price
qtyMidPrice = limitOrderPrice
else:
# Use the contract mid-price
qtyMidPrice = orderMidPrice
if targetPremium == None:
# No target premium was provided. Use maxOrderQuantity
orderQuantity = maxOrderQuantity
else:
# Make sure we are not exceeding the available portfolio margin
targetPremium = min(context.Portfolio.MarginRemaining, targetPremium)
# Determine the order quantity based on the target premium
if abs(qtyMidPrice) <= 1e-5:
orderQuantity = 1
else:
orderQuantity = abs(targetPremium / (qtyMidPrice * 100))
# Different logic for Credit vs Debit strategies
if sell: # Credit order
# Sell at least one contract
orderQuantity = max(1, round(orderQuantity))
else: # Debit order
# Make sure the total price does not exceed the target premium
orderQuantity = math.floor(orderQuantity)
# Get the current price of the underlying
security = context.Securities[self.base.underlyingSymbol]
underlyingPrice = context.GetLastKnownPrice(security).Price
# Compute MaxLoss
maxLoss = self.computeOrderMaxLoss(contracts, sides)
# Get the Profit Target percentage is specified (default is 50%)
profitTargetPct = self.base.parameter("profitTarget", 0.5)
# Compute T-Reg margin based on the MaxLoss
TReg = min(0, orderMidPrice + maxLoss) * orderQuantity
portfolioMarginStress = self.context.portfolioMarginStress
if self.base.computeGreeks:
# Compute the projected P&L of the position following a % movement of the underlying up or down
portfolioMargin = min(
0,
self.fValue(underlyingPrice * (1-portfolioMarginStress), contracts, sides=sides, atTime=context.Time, openPremium=midPrice),
self.fValue(underlyingPrice * (1+portfolioMarginStress), contracts, sides=sides, atTime=context.Time, openPremium=midPrice)
) * orderQuantity
order = {
"strategyId": strategyId,
"expiry": expiry,
"orderMidPrice": orderMidPrice,
"limitOrderPrice": limitOrderPrice,
"bidAskSpread": bidAskSpread,
"orderQuantity": orderQuantity,
"maxOrderQuantity": maxOrderQuantity,
"targetPremium": targetPremium,
"strikes": strikes,
"sides": sides,
"sidesDesc": sidesDesc,
"contractSide": contractSide,
"contractSideDesc": contractSideDesc,
"contracts": contracts,
"contractExpiry": contractExpiry,
"creditStrategy": sell,
"maxLoss": maxLoss,
"expiryLastTradingDay": expiryLastTradingDay,
"expiryMarketCloseCutoffDttm": expiryMarketCloseCutoffDttm
}
# Create order details
# order = {"expiry": expiry
# , "expiryStr": expiry.strftime("%Y-%m-%d")
# , "expiryLastTradingDay": expiryLastTradingDay
# , "expiryMarketCloseCutoffDttm": expiryMarketCloseCutoffDttm
# , "strategyId": strategyId
# , "strategy": strategy
# , "sides": sides
# , "sidesDesc": sidesDesc
# , "contractExpiry": contractExpiry
# , "contractSide": contractSide
# , "contractSideDesc": contractSideDesc
# , "contractDictionary": contractDictionary
# , "strikes": strikes
# , "midPrices": midPrices
# , "delta": delta
# , "gamma": gamma
# , "vega": vega
# , "theta": theta
# , "rho": rho
# , "vomma": vomma
# , "elasticity": elasticity
# , "IV": IV
# , "contracts": contracts
# , "targetPremium": targetPremium
# , "maxOrderQuantity": maxOrderQuantity
# , "orderQuantity": orderQuantity
# , "creditStrategy": sell
# , "maxLoss": maxLoss
# , "TReg": TReg
# , "portfolioMargin": portfolioMargin
# , "open": {"orders": []
# , "fills": 0
# , "filled": False
# , "stalePrice": False
# , "orderMidPrice": orderMidPrice
# , "limitOrderPrice": limitOrderPrice
# , "qtyMidPrice": qtyMidPrice
# , "limitOrder": parameters["useLimitOrders"]
# , "limitOrderExpiryDttm": context.Time + parameters["limitOrderExpiration"]
# , "bidAskSpread": bidAskSpread
# , "fillPrice": 0.0
# }
# , "close": {"orders": []
# , "fills": 0
# , "filled": False
# , "stalePrice": False
# , "orderMidPrice": 0.0
# , "fillPrice": 0.0
# }
# }
# Determine the method used to calculate the profit target
profitTargetMethod = self.base.parameter("profitTargetMethod", "Premium").lower()
thetaProfitDays = self.base.parameter("thetaProfitDays", 0)
# Set a custom profit target unless we are using the default Premium based methodology
if profitTargetMethod != "premium":
if profitTargetMethod == "theta" and thetaProfitDays > 0:
# Calculate the P&L of the position at T+[thetaProfitDays]
thetaPnL = self.fValue(underlyingPrice, contracts, sides=sides, atTime=context.Time + timedelta(days=thetaProfitDays), openPremium=midPrice)
# Profit target is a percentage of the P&L calculated at T+[thetaProfitDays]
profitTargetAmt = profitTargetPct * abs(thetaPnL) * orderQuantity
elif profitTargetMethod == "treg":
# Profit target is a percentage of the TReg requirement
profitTargetAmt = profitTargetPct * abs(TReg) * orderQuantity
elif profitTargetMethod == "margin":
# Profit target is a percentage of the margin requirement
profitTargetAmt = profitTargetPct * abs(portfolioMargin) * orderQuantity
else:
pass
# Set the target profit for the position
order["targetProfit"] = profitTargetAmt
return order
def getNakedOrder(self, contracts, type, strike = None, delta = None, fromPrice = None, toPrice = None, sell = True):
if sell:
# Short option contract
sides = [-1]
strategy = f"Short {type.title()}"
else:
# Long option contract
sides = [1]
strategy = f"Long {type.title()}"
type = type.lower()
if type == "put":
# Get all Puts with a strike lower than the given strike and delta lower than the given delta
sorted_contracts = self.strategyBuilder.getPuts(contracts, toDelta = delta, toStrike = strike, fromPrice = fromPrice, toPrice = toPrice)
elif type == "call":
# Get all Calls with a strike higher than the given strike and delta lower than the given delta
sorted_contracts = self.strategyBuilder.getCalls(contracts, toDelta = delta, fromStrike = strike, fromPrice = fromPrice, toPrice = toPrice)
else:
self.logger.error(f"Input parameter type = {type} is invalid. Valid values: Put|Call.")
return
# Check if we got any contracts
if len(sorted_contracts):
# Create order details
order = self.getOrderDetails([sorted_contracts[0]], sides, strategy, sell)
# Return the order
return order
# Create order details for a Straddle order
def getStraddleOrder(self, contracts, strike = None, netDelta = None, sell = True):
if sell:
# Short Straddle
sides = [-1, -1]
strategy = "Short Straddle"
else:
# Long Straddle
sides = [1, 1]
strategy = "Long Straddle"
# Delta strike selection (in case the Iron Fly is not centered on the ATM strike)
delta = None
# Make sure the netDelta is less than 50
if netDelta != None and abs(netDelta) < 50:
delta = 50 + netDelta
if strike == None and delta == None:
# Standard Straddle: get the ATM contracts
legs = self.strategyBuilder.getATM(contracts)
else:
legs = []
# This is a Straddle centered at the given strike or Net Delta.
# Get the Put at the requested delta or strike
puts = self.strategyBuilder.getPuts(contracts, toDelta = delta, toStrike = strike)
if(len(puts) > 0):
put = puts[0]
# Get the Call at the same strike as the Put
calls = self.strategyBuilder.getCalls(contracts, fromStrike = put.Strike)
if(len(calls) > 0):
call = calls[0]
# Collect both legs
legs = [put, call]
# Create order details
order = self.getOrderDetails(legs, sides, strategy, sell)
# Return the order
return order
# Create order details for a Strangle order
def getStrangleOrder(self, contracts, callDelta = None, putDelta = None, callStrike = None, putStrike = None, sell = True):
if sell:
# Short Strangle
sides = [-1, -1]
strategy = "Short Strangle"
else:
# Long Strangle
sides = [1, 1]
strategy = "Long Strangle"
# Get all Puts with a strike lower than the given putStrike and delta lower than the given putDelta
puts = self.strategyBuilder.getPuts(contracts, toDelta = putDelta, toStrike = putStrike)
# Get all Calls with a strike higher than the given callStrike and delta lower than the given callDelta
calls = self.strategyBuilder.getCalls(contracts, toDelta = callDelta, fromStrike = callStrike)
# Get the two contracts
legs = []
if len(puts) > 0 and len(calls) > 0:
legs = [puts[0], calls[0]]
# Create order details
order = self.getOrderDetails(legs, sides, strategy, sell)
# Return the order
return order
def getSpreadOrder(self, contracts, type, strike = None, delta = None, wingSize = None, sell = True, fromPrice = None, toPrice = None, premiumOrder = "max"):
if sell:
# Credit Spread
sides = [-1, 1]
strategy = f"{type.title()} Credit Spread"
else:
# Debit Spread
sides = [1, -1]
strategy = f"{type.title()} Debit Spread"
# Get the legs of the spread
legs = self.strategyBuilder.getSpread(contracts, type, strike = strike, delta = delta, wingSize = wingSize, fromPrice = fromPrice, toPrice = toPrice, premiumOrder = premiumOrder)
self.logger.debug(f"getSpreadOrder -> legs: {legs}")
self.logger.debug(f"getSpreadOrder -> sides: {sides}")
self.logger.debug(f"getSpreadOrder -> strategy: {strategy}")
self.logger.debug(f"getSpreadOrder -> sell: {sell}")
# Exit if we couldn't get both legs of the spread
if len(legs) != 2:
return
# Create order details
order = self.getOrderDetails(legs, sides, strategy, sell)
# Return the order
return order
def getIronCondorOrder(self, contracts, callDelta = None, putDelta = None, callStrike = None, putStrike = None, callWingSize = None, putWingSize = None, sell = True):
if sell:
# Sell Iron Condor: [longPut, shortPut, shortCall, longCall]
sides = [1, -1, -1, 1]
strategy = "Iron Condor"
else:
# Buy Iron Condor: [shortPut, longPut, longCall, shortCall]
sides = [-1, 1, 1, -1]
strategy = "Reverse Iron Condor"
# Get the Put spread
puts = self.strategyBuilder.getSpread(contracts, "Put", strike = putStrike, delta = putDelta, wingSize = putWingSize, sortByStrike = True)
# Get the Call spread
calls = self.strategyBuilder.getSpread(contracts, "Call", strike = callStrike, delta = callDelta, wingSize = callWingSize)
# Collect all legs
legs = puts + calls
# Exit if we couldn't get all legs of the Iron Condor
if len(legs) != 4:
return
# Create order details
order = self.getOrderDetails(legs, sides, strategy, sell)
# Return the order
return order
def getIronFlyOrder(self, contracts, netDelta = None, strike = None, callWingSize = None, putWingSize = None, sell = True):
if sell:
# Sell Iron Fly: [longPut, shortPut, shortCall, longCall]
sides = [1, -1, -1, 1]
strategy = "Iron Fly"
else:
# Buy Iron Fly: [shortPut, longPut, longCall, shortCall]
sides = [-1, 1, 1, -1]
strategy = "Reverse Iron Fly"
# Delta strike selection (in case the Iron Fly is not centered on the ATM strike)
delta = None
# Make sure the netDelta is less than 50
if netDelta != None and abs(netDelta) < 50:
delta = 50 + netDelta
if strike == None and delta == None:
# Standard ATM Iron Fly
strike = self.strategyBuilder.getATMStrike(contracts)
# Get the Put spread
puts = self.strategyBuilder.getSpread(contracts, "Put", strike = strike, delta = delta, wingSize = putWingSize, sortByStrike = True)
# Get the Call spread with the same strike as the first leg of the Put spread
calls = self.strategyBuilder.getSpread(contracts, "Call", strike = puts[-1].Strike, wingSize = callWingSize)
# Collect all legs
legs = puts + calls
# Exit if we couldn't get all legs of the Iron Fly
if len(legs) != 4:
return
# Create order details
order = self.getOrderDetails(legs, sides, strategy, sell)
# Return the order
return order
def getButterflyOrder(self, contracts, type, netDelta = None, strike = None, leftWingSize = None, rightWingSize = None, sell = False):
# Make sure the wing sizes are set
leftWingSize = leftWingSize or rightWingSize or 1
rightWingSize = rightWingSize or leftWingSize or 1
if sell:
# Sell Butterfly: [short<Put|Call>, 2 long<Put|Call>, short<Put|Call>]
sides = [-1, 2, -1]
strategy = "Credit Butterfly"
else:
# Buy Butterfly: [long<Put|Call>, 2 short<Put|Call>, long<Put|Call>]
sides = [1, -2, 1]
strategy = "Debit Butterfly"
# Create a custom description for each side to uniquely identify the wings:
# Sell Butterfly: [leftShort<Put|Call>, 2 Long<Put|Call>, rightShort<Put|Call>]
# Buy Butterfly: [leftLong<Put|Call>, 2 Short<Put|Call>, rightLong<Put|Call>]
optionSides = {-1: "Short", 1: "Long"}
sidesDesc = list(map(lambda side, prefix: f"{prefix}{optionSides[np.sign(side)]}{type.title()}", sides, ["left", "", "right"]))
# Delta strike selection (in case the Butterfly is not centered on the ATM strike)
delta = None
# Make sure the netDelta is less than 50
if netDelta != None and abs(netDelta) < 50:
if type.lower() == "put":
# Use Put delta
delta = 50 + netDelta
else:
# Use Call delta
delta = 50 - netDelta
if strike == None and delta == None:
# Standard ATM Butterfly
strike = self.strategyBuilder.getATMStrike(contracts)
type = type.lower()
if type == "put":
# Get the Put spread (sorted by strike in ascending order)
putSpread = self.strategyBuilder.getSpread(contracts, "Put", strike = strike, delta = delta, wingSize = leftWingSize, sortByStrike = True)
# Exit if we couldn't get all legs of the Iron Fly
if len(putSpread) != 2:
return
# Get the middle strike (second entry in the list)
middleStrike = putSpread[1].Strike
# Find the right wing of the Butterfly (add a small offset to the fromStrike in order to avoid selecting the middle strike as a wing)
wings = self.strategyBuilder.getPuts(contracts, fromStrike = middleStrike + 0.1, toStrike = middleStrike + rightWingSize)
# Exit if we could not find the wing
if len(wings) == 0:
return
# Combine all the legs
legs = putSpread + wings[0]
elif type == "call":
# Get the Call spread (sorted by strike in ascending order)
callSpread = self.strategyBuilder.getSpread(contracts, "Call", strike = strike, delta = delta, wingSize = rightWingSize)
# Exit if we couldn't get all legs of the Iron Fly
if len(callSpread) != 2:
return
# Get the middle strike (first entry in the list)
middleStrike = callSpread[0].Strike
# Find the left wing of the Butterfly (add a small offset to the toStrike in order to avoid selecting the middle strike as a wing)
wings = self.strategyBuilder.getCalls(contracts, fromStrike = middleStrike - leftWingSize, toStrike = middleStrike - 0.1)
# Exit if we could not find the wing
if len(wings) == 0:
return
# Combine all the legs
legs = wings[0] + callSpread
else:
self.logger.error(f"Input parameter type = {type} is invalid. Valid values: Put|Call.")
return
# Exit if we couldn't get both legs of the spread
if len(legs) != 3:
return
# Create order details
order = self.getOrderDetails(legs, sides, strategy, sell = sell, sidesDesc = sidesDesc)
# Return the order
return order
def getCustomOrder(self, contracts, types, deltas = None, sides = None, sidesDesc = None, strategy = "Custom", sell = None):
# Make sure the Sides parameter has been specified
if not sides:
self.logger.error("Input parameter sides cannot be null. No order will be returned.")
return
# Make sure the Sides and Deltas parameters are of the same length
if not deltas or len(deltas) != len(sides):
self.logger.error(f"Input parameters deltas = {deltas} and sides = {sides} must have the same length. No order will be returned.")
return
# Convert types into a list if it is a string
if isinstance(types, str):
types = [types] * len(sides)
# Make sure the Sides and Types parameters are of the same length
if not types or len(types) != len(sides):
self.logger.error(f"Input parameters types = {types} and sides = {sides} must have the same length. No order will be returned.")
return
legs = []
midPrice = 0
for side, type, delta in zip(sides, types, deltas):
# Get all Puts with a strike lower than the given putStrike and delta lower than the given putDelta
deltaContracts = self.strategyBuilder.getContracts(contracts, type = type, toDelta = delta, reverse = type.lower() == "put")
# Exit if we could not find the contract
if not deltaContracts:
return
# Append the contract to the list of legs
legs = legs + [deltaContracts[0]]
# Update the mid-price
midPrice -= self.contractUtils.midPrice(deltaContracts[0]) * side
# Automatically determine if this is a credit or debit strategy (unless specified)
if sell is None:
sell = midPrice > 0
# Create order details
order = self.getOrderDetails(legs, sides, strategy, sell = sell, sidesDesc = sidesDesc)
# Return the order
return order
# region imports
from AlgorithmImports import *
# endregion
from Tools import Logger, ContractUtils, BSM
"""
This is an Order builder class. It will get the proper contracts i need to create the order per parameters.
"""
class OrderBuilder:
# \param[in] context is a reference to the QCAlgorithm instance. The following attributes are used from the context:
# - slippage: (Optional) controls how the mid-price of an order is adjusted to include slippage.
# - targetPremium: (Optional) used to determine how many contracts to buy/sell.
# - maxOrderQuantity: (Optional) Caps the number of contracts that are bought/sold (Default: 1).
# If targetPremium == None -> This is the number of contracts bought/sold.
# If targetPremium != None -> The order is executed only if the number of contracts required
# to reach the target credit/debit does not exceed the maxOrderQuantity
def __init__(self, context):
# Set the context (QCAlgorithm object)
self.context = context
# Initialize the BSM pricing model
self.bsm = BSM(context)
# Set the logger
self.logger = Logger(context, className=type(self).__name__, logLevel=context.logLevel)
# Initialize the contract utils
self.contractUtils = ContractUtils(context)
# Returns True/False based on whether the option contract is of the specified type (Call/Put)
def optionTypeFilter(self, contract, type = None):
if type is None:
return True
type = type.lower()
if type == "put":
return contract.Right == OptionRight.Put
elif type == "call":
return contract.Right == OptionRight.Call
else:
return True
# Return the ATM contracts (Put/Call or both)
def getATM(self, contracts, type = None):
# Initialize result
atm_contracts = []
# Sort the contracts based on how close they are to the current price of the underlying.
# Filter them by the selected contract type (Put/Call or both)
sorted_contracts = sorted([contract
for contract in contracts
if self.optionTypeFilter(contract, type)
]
, key = lambda x: abs(x.Strike - self.contractUtils.getUnderlyingLastPrice(x))
, reverse = False
)
# Check if any contracts were returned after the filtering
if len(sorted_contracts) > 0:
if type == None or type.lower() == "both":
# Select the first two contracts (one Put and one Call)
Ncontracts = min(len(sorted_contracts), 2)
else:
# Select the first contract (either Put or Call, based on the type specified)
Ncontracts = 1
# Extract the selected contracts
atm_contracts = sorted_contracts[0:Ncontracts]
# Return result
return atm_contracts
def getATMStrike(self, contracts):
ATMStrike = None
# Get the ATM contracts
atm_contracts = self.getATM(contracts)
# Check if any contracts were found
if len(atm_contracts) > 0:
# Get the Strike of the first contract
ATMStrike = atm_contracts[0].Strike
# Return result
return ATMStrike
# Returns the Strike of the contract with the closest Delta
# Assumptions:
# - Input list contracts must be sorted by ascending strike
# - All contracts in the list must be of the same type (Call|Put)
def getDeltaContract(self, contracts, delta = None):
# Skip processing if the option type or Delta has not been specified
if delta == None or not contracts:
return
leftIdx = 0
rightIdx = len(contracts)-1
# Compute the Greeks for the contracts at the extremes
self.bsm.setGreeks([contracts[leftIdx], contracts[rightIdx]])
# #######################################################
# Check if the requested Delta is outside of the range
# #######################################################
if contracts[rightIdx].Right == OptionRight.Call:
# Check if the furthest OTM Call has a Delta higher than the requested Delta
if abs(contracts[rightIdx].BSMGreeks.Delta) > delta/100.0:
# The requested delta is outside the boundary, return the strike of the furthest OTM Call
return contracts[rightIdx]
# Check if the furthest ITM Call has a Delta lower than the requested Delta
elif abs(contracts[leftIdx].BSMGreeks.Delta) < delta/100.0:
# The requested delta is outside the boundary, return the strike of the furthest ITM Call
return contracts[leftIdx]
else:
# Check if the furthest OTM Put has a Delta higher than the requested Delta
if abs(contracts[leftIdx].BSMGreeks.Delta) > delta/100.0:
# The requested delta is outside the boundary, return the strike of the furthest OTM Put
return contracts[leftIdx]
# Check if the furthest ITM Put has a Delta lower than the requested Delta
elif abs(contracts[rightIdx].BSMGreeks.Delta) < delta/100.0:
# The requested delta is outside the boundary, return the strike of the furthest ITM Put
return contracts[rightIdx]
# The requested Delta is inside the range, use the Bisection method to find the contract with the closest Delta
while (rightIdx-leftIdx) > 1:
# Get the middle point
middleIdx = round((leftIdx + rightIdx)/2.0)
middleContract = contracts[middleIdx]
# Compute the greeks for the contract in the middle
self.bsm.setGreeks(middleContract)
contractDelta = contracts[middleIdx].BSMGreeks.Delta
# Determine which side we need to continue the search
if(abs(contractDelta) > delta/100.0):
if middleContract.Right == OptionRight.Call:
# The requested Call Delta is on the right side
leftIdx = middleIdx
else:
# The requested Put Delta is on the left side
rightIdx = middleIdx
else:
if middleContract.Right == OptionRight.Call:
# The requested Call Delta is on the left side
rightIdx = middleIdx
else:
# The requested Put Delta is on the right side
leftIdx = middleIdx
# At this point where should only be two contracts remaining: choose the contract with the closest Delta
deltaContract = sorted([contracts[leftIdx], contracts[rightIdx]]
, key = lambda x: abs(abs(x.BSMGreeks.Delta) - delta/100.0)
, reverse = False
)[0]
return deltaContract
def getDeltaStrike(self, contracts, delta = None):
deltaStrike = None
# Get the contract with the closest Delta
deltaContract = self.getDeltaContract(contracts, delta = delta)
# Check if we got any contract
if deltaContract != None:
# Get the strike
deltaStrike = deltaContract.Strike
# Return the strike
return deltaStrike
def getFromDeltaStrike(self, contracts, delta = None, default = None):
fromDeltaStrike = default
# Get the call with the closest Delta
deltaContract = self.getDeltaContract(contracts, delta = delta)
# Check if we found the contract
if deltaContract:
if abs(deltaContract.BSMGreeks.Delta) >= delta/100.0:
# The contract is in the required range. Get the Strike
fromDeltaStrike = deltaContract.Strike
else:
# Calculate the offset: +0.01 in case of Puts, -0.01 in case of Calls
offset = 0.01 * (2*int(deltaContract.Right == OptionRight.Put)-1)
# The contract is outside of the required range. Get the Strike and add (Put) or subtract (Call) a small offset so we can filter for contracts above/below this strike
fromDeltaStrike = deltaContract.Strike + offset
return fromDeltaStrike
def getToDeltaStrike(self, contracts, delta = None, default = None):
toDeltaStrike = default
# Get the put with the closest Delta
deltaContract = self.getDeltaContract(contracts, delta = delta)
# Check if we found the contract
if deltaContract:
if abs(deltaContract.BSMGreeks.Delta) <= delta/100.0:
# The contract is in the required range. Get the Strike
toDeltaStrike = deltaContract.Strike
else:
# Calculate the offset: +0.01 in case of Calls, -0.01 in case of Puts
offset = 0.01 * (2*int(deltaContract.Right == OptionRight.Call)-1)
# The contract is outside of the required range. Get the Strike and add (Call) or subtract (Put) a small offset so we can filter for contracts above/below this strike
toDeltaStrike = deltaContract.Strike + offset
return toDeltaStrike
def getPutFromDeltaStrike(self, contracts, delta = None):
return self.getFromDeltaStrike(contracts, delta = delta, default = 0.0)
def getCallFromDeltaStrike(self, contracts, delta = None):
return self.getFromDeltaStrike(contracts, delta = delta, default = float('Inf'))
def getPutToDeltaStrike(self, contracts, delta = None):
return self.getToDeltaStrike(contracts, delta = delta, default = float('Inf'))
def getCallToDeltaStrike(self, contracts, delta = None):
return self.getToDeltaStrike(contracts, delta = delta, default = 0)
def getContracts(self, contracts, type = None, fromDelta = None, toDelta = None, fromStrike = None, toStrike = None, fromPrice = None, toPrice = None, reverse = False):
# Make sure all constraints are set
fromStrike = fromStrike or 0
fromPrice = fromPrice or 0
toStrike = toStrike or float('inf')
toPrice = toPrice or float('inf')
# Get the Put contracts, sorted by ascending strike. Apply the Strike/Price constraints
puts = []
if type == None or type.lower() == "put":
puts = sorted([contract
for contract in contracts
if self.optionTypeFilter(contract, "Put")
# Strike constraint
and (fromStrike <= contract.Strike <= toStrike)
# The option contract is tradable
and self.contractUtils.getSecurity(contract).IsTradable
# Option price constraint (based on the mid-price)
and (fromPrice <= self.contractUtils.midPrice(contract) <= toPrice)
]
, key = lambda x: x.Strike
, reverse = False
)
# Get the Call contracts, sorted by ascending strike. Apply the Strike/Price constraints
calls = []
if type == None or type.lower() == "call":
calls = sorted([contract
for contract in contracts
if self.optionTypeFilter(contract, "Call")
# Strike constraint
and (fromStrike <= contract.Strike <= toStrike)
# The option contract is tradable
and self.contractUtils.getSecurity(contract).IsTradable
# Option price constraint (based on the mid-price)
and (fromPrice <= self.contractUtils.midPrice(contract) <= toPrice)
]
, key = lambda x: x.Strike
, reverse = False
)
deltaFilteredPuts = puts
deltaFilteredCalls = calls
# Check if we need to filter by Delta
if (fromDelta or toDelta):
# Find the strike range for the Puts based on the From/To Delta
putFromDeltaStrike = self.getPutFromDeltaStrike(puts, delta = fromDelta)
putToDeltaStrike = self.getPutToDeltaStrike(puts, delta = toDelta)
# Filter the Puts based on the delta-strike range
deltaFilteredPuts = [contract for contract in puts
if putFromDeltaStrike <= contract.Strike <= putToDeltaStrike
]
# Find the strike range for the Calls based on the From/To Delta
callFromDeltaStrike = self.getCallFromDeltaStrike(calls, delta = fromDelta)
callToDeltaStrike = self.getCallToDeltaStrike(calls, delta = toDelta)
# Filter the Puts based on the delta-strike range. For the calls, the Delta decreases with increasing strike, so the order of the filter is inverted
deltaFilteredCalls = [contract for contract in calls
if callToDeltaStrike <= contract.Strike <= callFromDeltaStrike
]
# Combine the lists and Sort the contracts by their strike in the specified order.
result = sorted(deltaFilteredPuts + deltaFilteredCalls
, key = lambda x: x.Strike
, reverse = reverse
)
# Return result
return result
def getPuts(self, contracts, fromDelta = None, toDelta = None, fromStrike = None, toStrike = None, fromPrice = None, toPrice = None):
# Sort the Put contracts by their strike in reverse order. Filter them by the specified criteria (Delta/Strike/Price constrains)
return self.getContracts(contracts
, type = "Put"
, fromDelta = fromDelta
, toDelta = toDelta
, fromStrike = fromStrike
, toStrike = toStrike
, fromPrice = fromPrice
, toPrice = toPrice
, reverse = True
)
def getCalls(self, contracts, fromDelta = None, toDelta = None, fromStrike = None, toStrike = None, fromPrice = None, toPrice = None):
# Sort the Call contracts by their strike in ascending order. Filter them by the specified criteria (Delta/Strike/Price constrains)
return self.getContracts(contracts
, type = "Call"
, fromDelta = fromDelta
, toDelta = toDelta
, fromStrike = fromStrike
, toStrike = toStrike
, fromPrice = fromPrice
, toPrice = toPrice
, reverse = False
)
# Get the wing contract at the requested distance
# Assumptions:
# - The input contracts are sorted by increasing distance from the ATM (ascending order for Calls, descending order for Puts)
# - The first contract in the list is assumed to be one of the legs of the spread, and it is used to determine the distance for the wing
def getWing(self, contracts, wingSize = None):
# Make sure the wingSize is specified
wingSize = wingSize or 0
# Initialize output
wingContract = None
if len(contracts) > 1 and wingSize > 0:
# Get the short strike
firstLegStrike = contracts[0].Strike
# keep track of the wing size based on the long contract being selected
currentWings = 0
# Loop through all contracts
for contract in contracts[1:]:
# Select the long contract as long as it is within the specified wing size
if abs(contract.Strike - firstLegStrike) <= wingSize:
currentWings = abs(contract.Strike - firstLegStrike)
wingContract = contract
else:
# We have exceeded the wing size, check if the distance to the requested wing size is closer than the contract previously selected
if (abs(contract.Strike - firstLegStrike) - wingSize < wingSize - currentWings):
wingContract = contract
break
### Loop through all contracts
### if wingSize > 0
return wingContract
# Get Spread contracts (Put or Call)
def getSpread(self, contracts, type, strike = None, delta = None, wingSize = None, sortByStrike = False, fromPrice = None, toPrice = None, premiumOrder = 'max'):
# Type is a required parameter
if type == None:
self.logger.error(f"Input parameter type = {type} is invalid. Valid values: 'Put'|'Call'")
return
type = type.lower()
if type == "put":
# Get all Puts with a strike lower than the given strike and delta lower than the given delta
sorted_contracts = self.getPuts(contracts, toDelta = delta, toStrike = strike)
elif type == "call":
# Get all Calls with a strike higher than the given strike and delta lower than the given delta
sorted_contracts = self.getCalls(contracts, toDelta = delta, fromStrike = strike)
else:
self.logger.error(f"Input parameter type = {type} is invalid. Valid values: 'Put'|'Call'")
return
# Initialize the result and the best premium
best_spread = []
best_premium = -float('inf') if premiumOrder == 'max' else float('inf')
self.logger.debug(f"wingSize: {wingSize}, premiumOrder: {premiumOrder}, fromPrice: {fromPrice}, toPrice: {toPrice}, sortByStrike: {sortByStrike}, strike: {strike}")
if strike is not None:
wing = self.getWing(sorted_contracts, wingSize = wingSize)
self.logger.debug(f"STRIKE: wing: {wing}")
# Check if we have any contracts
if(len(sorted_contracts) > 0):
# Add the first leg
best_spread.append(sorted_contracts[0])
if wing != None:
# Add the wing
best_spread.append(wing)
else:
# Iterate over sorted contracts
for i in range(len(sorted_contracts) - 1):
# Get the wing
wing = self.getWing(sorted_contracts[i:], wingSize = wingSize)
self.logger.debug(f"NO STRIKE: wing: {wing}")
if wing is not None:
# Calculate the net premium
net_premium = abs(self.contractUtils.midPrice(sorted_contracts[i]) - self.contractUtils.midPrice(wing))
self.logger.debug(f"fromPrice: {fromPrice} <= net_premium: {net_premium} <= toPrice: {toPrice}")
# Check if the net premium is within the specified price range
if fromPrice <= net_premium <= toPrice:
# Check if this spread has a better premium
if (premiumOrder == 'max' and net_premium > best_premium) or (premiumOrder == 'min' and net_premium < best_premium):
best_spread = [sorted_contracts[i], wing]
best_premium = net_premium
# By default, the legs of a spread are sorted based on their distance from the ATM strike.
# - For Call spreads, they are already sorted by increasing strike
# - For Put spreads, they are sorted by decreasing strike
# In some cases it might be more convenient to return the legs ordered by their strike (i.e. in case of Iron Condors/Flys)
if sortByStrike:
best_spread = sorted(best_spread, key = lambda x: x.Strike, reverse = False)
return best_spread
# Get Put Spread contracts
def getPutSpread(self, contracts, strike = None, delta = None, wingSize = None, sortByStrike = False):
return self.getSpread(contracts, "Put", strike = strike, delta = delta, wingSize = wingSize, sortByStrike = sortByStrike)
# Get Put Spread contracts
def getCallSpread(self, contracts, strike = None, delta = None, wingSize = None, sortByStrike = True):
return self.getSpread(contracts, "Call", strike = strike, delta = delta, wingSize = wingSize, sortByStrike = sortByStrike)
#region imports
from AlgorithmImports import *
#endregion
from Tools import BSM, Logger
class Scanner:
def __init__(self, context, base):
self.context = context
self.base = base
# Initialize the BSM pricing model
self.bsm = BSM(context)
# Dictionary to keep track of all the available expiration dates at any given date
self.expiryList = {}
# Set the logger
self.logger = Logger(context, className = type(self).__name__, logLevel = context.logLevel)
def Call(self, data) -> [Dict, str]:
# Start the timer
self.context.executionTimer.start('Alpha.Utils.Scanner -> Call')
self.logger.trace(f'{self.base.name} -> Call -> start')
if self.isMarketClosed():
self.logger.trace(" -> Market is closed.")
return None, None
self.logger.debug(f'Market not closed')
if not self.isWithinScheduledTimeWindow():
self.logger.trace(" -> Not within scheduled time window.")
return None, None
self.logger.debug(f'Within scheduled time window')
if self.hasReachedMaxActivePositions():
self.logger.trace(" -> Already reached max active positions.")
return None, None
self.logger.trace(f'Not max active positions')
# Get the option chain
chain = self.base.dataHandler.getOptionContracts(data)
self.logger.trace(f'Number of contracts in chain: {len(chain) if chain else 0}')
# Exit if we got no chains
if chain is None:
self.logger.debug(" -> No chains inside currentSlice!")
return None, None
self.logger.trace('We have chains inside currentSlice')
self.syncExpiryList(chain)
self.logger.debug(f'Expiry List: {self.expiryList}')
# Exit if we haven't found any Expiration cycles to process
if not self.expiryList:
self.logger.trace(" -> No expirylist.")
return None, None
self.logger.debug(f'We have expirylist {self.expiryList}')
# Run the strategy
filteredChain, lastClosedOrderTag = self.Filter(chain)
self.logger.trace(f'Filtered Chain Count: {len(filteredChain) if filteredChain else 0}')
self.logger.debug(f'Last Closed Order Tag: {lastClosedOrderTag}')
# Stop the timer
self.context.executionTimer.stop('Alpha.Utils.Scanner -> Call')
return filteredChain, lastClosedOrderTag
# Filter the contracts to buy and sell based on the defined AlphaModel/Strategy
def Filter(self, chain):
# Start the timer
self.context.executionTimer.start("Alpha.Utils.Scanner -> Filter")
# Get the context
context = self.context
self.logger.debug(f'Context: {context}')
# DTE range
dte = self.base.dte
dteWindow = self.base.dteWindow
# Controls whether to select the furthest or the earliest expiry date
useFurthestExpiry = self.base.useFurthestExpiry
# Controls whether to enable dynamic selection of the expiry date
dynamicDTESelection = self.base.dynamicDTESelection
# Controls whether to allow multiple entries for the same expiry date
allowMultipleEntriesPerExpiry = self.base.allowMultipleEntriesPerExpiry
self.logger.debug(f'Allow Multiple Entries Per Expiry: {allowMultipleEntriesPerExpiry}')
# Set the DTE range (make sure values are not negative)
minDte = max(0, dte - dteWindow)
maxDte = max(0, dte)
self.logger.debug(f'Min DTE: {minDte}')
self.logger.debug(f'Max DTE: {maxDte}')
# Get the minimum time distance between consecutive trades
minimumTradeScheduleDistance = self.base.parameter("minimumTradeScheduleDistance", timedelta(hours=0))
# Make sure the minimum required amount of time has passed since the last trade was opened
if (self.context.lastOpenedDttm is not None and context.Time < (self.context.lastOpenedDttm + minimumTradeScheduleDistance)):
return None, None
self.logger.debug(f'Min Trade Schedule Distance: {minimumTradeScheduleDistance}')
# Check if the expiryList was specified as an input
if self.expiryList is None:
# List of expiry dates, sorted in reverse order
self.expiryList = sorted(set([
contract.Expiry for contract in chain
if minDte <= (contract.Expiry.date() - context.Time.date()).days <= maxDte
]), reverse=True)
self.logger.debug(f'Expiry List: {self.expiryList}')
# Log the list of expiration dates found in the chain
self.logger.debug(f"Expiration dates in the chain: {len(self.expiryList)}")
for expiry in self.expiryList:
self.logger.debug(f" -> {expiry}")
self.logger.debug(f'Expiry List: {self.expiryList}')
# Exit if we haven't found any Expiration cycles to process
if not self.expiryList:
# Stop the timer
self.context.executionTimer.stop()
return None, None
self.logger.debug('No expirylist')
# Get the DTE of the last closed position
lastClosedDte = None
lastClosedOrderTag = None
if self.context.recentlyClosedDTE:
while (self.context.recentlyClosedDTE):
# Pop the oldest entry in the list (FIFO)
lastClosedTradeInfo = self.context.recentlyClosedDTE.pop(0)
if lastClosedTradeInfo["closeDte"] >= minDte:
lastClosedDte = lastClosedTradeInfo["closeDte"]
lastClosedOrderTag = lastClosedTradeInfo["orderTag"]
# We got a good entry, get out of the loop
break
self.logger.debug(f'Last Closed DTE: {lastClosedDte}')
self.logger.debug(f'Last Closed Order Tag: {lastClosedOrderTag}')
# Check if we need to do dynamic DTE selection
if dynamicDTESelection and lastClosedDte is not None:
# Get the expiration with the nearest DTE as that of the last closed position
expiry = sorted(self.expiryList,
key=lambda expiry: abs((expiry.date(
) - context.Time.date()).days - lastClosedDte),
reverse=False)[0]
else:
# Determine the index used to select the expiry date:
# useFurthestExpiry = True -> expiryListIndex = 0 (takes the first entry -> furthest expiry date since the expiry list is sorted in reverse order)
# useFurthestExpiry = False -> expiryListIndex = -1 (takes the last entry -> earliest expiry date since the expiry list is sorted in reverse order)
expiryListIndex = int(useFurthestExpiry) - 1
# Get the expiry date
expiry = list(self.expiryList.get(self.context.Time.date()))[expiryListIndex]
# expiry = list(self.expiryList.keys())[expiryListIndex]
self.logger.debug(f'Expiry: {expiry}')
# Convert the date to a string
expiryStr = expiry.strftime("%Y-%m-%d")
filteredChain = None
openPositionsExpiries = [self.context.allPositions[orderId].expiryStr for orderId in self.context.openPositions.values()]
# Proceed if we have not already opened a position on the given expiration (unless we are allowed to open multiple positions on the same expiry date)
if (allowMultipleEntriesPerExpiry or expiryStr not in openPositionsExpiries):
# Filter the contracts in the chain, keep only the ones expiring on the given date
filteredChain = self.filterByExpiry(chain, expiry=expiry)
self.logger.debug(f'Number of items in Filtered Chain: {len(filteredChain) if filteredChain else 0}')
# Stop the timer
self.context.executionTimer.stop("Alpha.Utils.Scanner -> Filter")
return filteredChain, lastClosedOrderTag
def isMarketClosed(self) -> bool:
# Exit if the algorithm is warming up or the market is closed
return self.context.IsWarmingUp or not self.context.IsMarketOpen(self.base.underlyingSymbol)
def isWithinScheduledTimeWindow(self) -> bool:
# Compute the schedule start datetime
scheduleStartDttm = datetime.combine(self.context.Time.date(), self.base.scheduleStartTime)
self.logger.debug(f'Schedule Start Datetime: {scheduleStartDttm}')
# Exit if we have not reached the schedule start datetime
if self.context.Time < scheduleStartDttm:
self.logger.debug('Current time is before the schedule start datetime')
return False
# Check if we have a schedule stop datetime
if self.base.scheduleStopTime is not None:
# Compute the schedule stop datetime
scheduleStopDttm = datetime.combine(self.context.Time.date(), self.base.scheduleStopTime)
self.logger.debug(f'Schedule Stop Datetime: {scheduleStopDttm}')
# Exit if we have exceeded the stop datetime
if self.context.Time > scheduleStopDttm:
self.logger.debug('Current time is after the schedule stop datetime')
return False
minutesSinceScheduleStart = round((self.context.Time - scheduleStartDttm).seconds / 60)
self.logger.debug(f'Minutes Since Schedule Start: {minutesSinceScheduleStart}')
scheduleFrequencyMinutes = round(self.base.scheduleFrequency.seconds / 60)
self.logger.debug(f'Schedule Frequency Minutes: {scheduleFrequencyMinutes}')
# Exit if we are not at the right scheduled interval
isWithinWindow = minutesSinceScheduleStart % scheduleFrequencyMinutes == 0
self.logger.debug(f'Is Within Scheduled Time Window: {isWithinWindow}')
return isWithinWindow
def hasReachedMaxActivePositions(self) -> bool:
# Filter openPositions and workingOrders by strategyTag
openPositionsByStrategy = {tag: pos for tag, pos in self.context.openPositions.items() if self.context.allPositions[pos].strategyTag == self.base.nameTag}
workingOrdersByStrategy = {tag: order for tag, order in self.context.workingOrders.items() if order.strategyTag == self.base.nameTag}
# Do not open any new positions if we have reached the maximum for this strategy
return (len(openPositionsByStrategy) + len(workingOrdersByStrategy)) >= self.base.maxActivePositions
def syncExpiryList(self, chain):
# The list of expiry dates will change once a day (at most). See if we have already processed this list for the current date
if self.context.Time.date() in self.expiryList:
# Get the expiryList from the dictionary
expiry = self.expiryList.get(self.context.Time.date())
else:
# Start the timer
self.context.executionTimer.start("Alpha.Utils.Scanner -> syncExpiryList")
# Set the DTE range (make sure values are not negative)
minDte = max(0, self.base.dte - self.base.dteWindow)
maxDte = max(0, self.base.dte)
# Get the list of expiry dates, sorted in reverse order
expiry = sorted(
set(
[contract.Expiry for contract in chain if minDte <= (contract.Expiry.date() - self.context.Time.date()).days <= maxDte]
),
reverse=True
)
# Only add the list to the dictionary if we found at least one expiry date
if expiry:
# Add the list to the dictionary
self.expiryList[self.context.Time.date()] = expiry
else:
self.logger.debug(f"No expiry dates found in the chain! {self.context.Time.strftime('%Y-%m-%d %H:%M')}')}}")
# Stop the timer
self.context.executionTimer.stop("Alpha.Utils.Scanner -> syncExpiryList")
def filterByExpiry(self, chain, expiry=None, computeGreeks=False):
# Start the timer
self.context.executionTimer.start("Alpha.Utils.Scanner -> filterByExpiry")
# Check if the expiry date has been specified
if expiry is not None:
# Filter contracts based on the requested expiry date
filteredChain = [
contract for contract in chain if contract.Expiry.date() == expiry.date()
]
else:
# No filtering
filteredChain = chain
# Check if we need to compute the Greeks for every single contract (this is expensive!)
# By default, the Greeks are only calculated while searching for the strike with the
# requested delta, so there should be no need to set computeGreeks = True
if computeGreeks:
self.bsm.setGreeks(filteredChain)
# Stop the timer
self.context.executionTimer.stop("Alpha.Utils.Scanner -> filterByExpiry")
# Return the filtered contracts
return filteredChain
#region imports
from AlgorithmImports import *
#endregion
class Stats:
def __init__(self):
self._stats = {}
def __setattr__(self, key, value):
if key == '_stats':
super().__setattr__(key, value)
else:
self._stats[key] = value
def __getattr__(self, key):
return self._stats.get(key, None)
def __delattr__(self, key):
if key in self._stats:
del self._stats[key]
else:
raise AttributeError(f"No such attribute: {key}")
#region imports from AlgorithmImports import * #endregion # Your New Python File from .Scanner import Scanner from .Order import Order from .OrderBuilder import OrderBuilder from .Stats import Stats
#region imports from AlgorithmImports import * #endregion # Your New Python File from .FPLModel import FPLModel from .SPXic import SPXic from .CCModel import CCModel from .SPXButterfly import SPXButterfly from .SPXCondor import SPXCondor from .IBS import IBS
#region imports
from AlgorithmImports import *
from collections import deque
from scipy import stats
from numpy import mean, array
#endregion
# Indicator from https://www.satyland.com/atrlevels by Saty
# Use like this:
#
# self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol
# self.ATRLevels = ATRLevels("ATRLevels", length = 14)
# algorithm.RegisterIndicator(self.ticker, self.ATRLevels, Resolution.Daily)
# self.algorithm.WarmUpIndicator(self.ticker, self.ATRLevels, Resolution.Daily)
# // Set the appropriate timeframe based on trading mode
# timeframe_func() =>
# timeframe = "D"
# if trading_type == day_trading
# timeframe := "D"
# else if trading_type == multiday_trading
# timeframe := "W"
# else if trading_type == swing_trading
# timeframe := "M"
# else if trading_type == position_trading
# timeframe := "3M"
# else
# timeframe := "D"
class ATRLevels(PythonIndicator):
TriggerPercentage = 0.236
MiddlePercentage = 0.618
def __init__(self, name, length = 14):
# default indicator definition
super().__init__()
self.Name = name
self.Value = 0
self.Time = datetime.min
# set automatic warmup period + 1 day
self.WarmUpPeriod = length + 1
self.length = length
self.ATR = AverageTrueRange(self.length)
# Holds 2 values the current close and the previous day/period close.
self.PreviousCloseQueue = deque(maxlen=2)
# Indicator to hold the period close, high, low, open
self.PeriodHigh = Identity('PeriodHigh')
self.PeriodLow = Identity('PeriodLow')
self.PeriodOpen = Identity('PeriodOpen')
@property
def IsReady(self) -> bool:
return self.ATR.IsReady
def Update(self, input) -> bool:
# update all the indicators with the new data
dataPoint = IndicatorDataPoint(input.Symbol, input.EndTime, input.Close)
bar = TradeBar(input.Time, input.Symbol, input.Open, input.High, input.Low, input.Close, input.Volume)
## Update SMA with data time and volume
# symbolSMAv.Update(tuple.Index, tuple.volume)
# symbolRSI.Update(tuple.Index, tuple.close)
# symbolADX.Update(bar)
# symbolATR.Update(bar)
# symbolSMA.Update(tuple.Index, tuple.close)
self.ATR.Update(bar)
self.PreviousCloseQueue.appendleft(dataPoint)
self.PeriodHigh.Update(input.Time, input.High)
self.PeriodLow.Update(input.Time, input.Low)
self.PeriodOpen.Update(input.Time, input.Open)
if self.ATR.IsReady and len(self.PreviousCloseQueue) == 2:
self.Time = input.Time
self.Value = self.PreviousClose().Value
return self.IsReady
# Returns the previous close value of the period.
# @return [Float]
def PreviousClose(self):
if len(self.PreviousCloseQueue) == 1: return None
return self.PreviousCloseQueue[0]
# Bear level method. This is represented usually as a yellow line right under the close line.
# @return [Float]
def LowerTrigger(self):
return self.PreviousClose().Value - (self.TriggerPercentage * self.ATR.Current.Value) # biggest value 1ATR
# Lower Midrange level. This is under the lowerTrigger (yellow line) and above the -1ATR line(lowerATR)
# @return [Float]
def LowerMiddle(self):
return self.PreviousClose().Value - (self.MiddlePercentage * self.ATR.Current.Value)
# Lower -1ATR level.
# @return [Float]
def LowerATR(self):
return self.PreviousClose().Value - self.ATR.Current.Value
# Lower Extension level.
# @return [Float]
def LowerExtension(self):
return self.LowerATR() - (self.TriggerPercentage * self.ATR.Current.Value)
# Lower Midrange Extension level.
# @return [Float]
def LowerMiddleExtension(self):
return self.LowerATR() - (self.MiddlePercentage * self.ATR.Current.Value)
# Lower -2ATR level.
# @return [Float]
def Lower2ATR(self):
return self.LowerATR() - self.ATR.Current.Value
# Lower -2ATR Extension level.
# @return [Float]
def Lower2ATRExtension(self):
return self.Lower2ATR() - (self.TriggerPercentage * self.ATR.Current.Value)
# Lower -2ATR Midrange Extension level.
# @return [Float]
def Lower2ATRMiddleExtension(self):
return self.Lower2ATR() - (self.MiddlePercentage * self.ATR.Current.Value)
# Lower -3ATR level.
# @return [Float]
def Lower3ATR(self):
return self.Lower2ATR() - self.ATR.Current.Value
def BearLevels(self):
return [
self.LowerTrigger(),
self.LowerMiddle(),
self.LowerATR(),
self.LowerExtension(),
self.LowerMiddleExtension(),
self.Lower2ATR(),
self.Lower2ATRExtension(),
self.Lower2ATRMiddleExtension(),
self.Lower3ATR()
]
# Bull level method. This is represented usually as a blue line right over the close line.
# @return [Float]
def UpperTrigger(self):
return self.PreviousClose().Value + (self.TriggerPercentage * self.ATR.Current.Value) # biggest value 1ATR
# Upper Midrange level.
# @return [Float]
def UpperMiddle(self):
return self.PreviousClose().Value + (self.MiddlePercentage * self.ATR.Current.Value)
# Upper 1ATR level.
# @return [Float]
def UpperATR(self):
return self.PreviousClose().Value + self.ATR.Current.Value
# Upper Extension level.
# @return [Float]
def UpperExtension(self):
return self.UpperATR() + (self.TriggerPercentage * self.ATR.Current.Value)
# Upper Midrange Extension level.
# @return [Float]
def UpperMiddleExtension(self):
return self.UpperATR() + (self.MiddlePercentage * self.ATR.Current.Value)
# Upper 2ATR level.
def Upper2ATR(self):
return self.UpperATR() + self.ATR.Current.Value
# Upper 2ATR Extension level.
# @return [Float]
def Upper2ATRExtension(self):
return self.Upper2ATR() + (self.TriggerPercentage * self.ATR.Current.Value)
# Upper 2ATR Midrange Extension level.
# @return [Float]
def Upper2ATRMiddleExtension(self):
return self.Upper2ATR() + (self.MiddlePercentage * self.ATR.Current.Value)
# Upper 3ATR level.
# @return [Float]
def Upper3ATR(self):
return self.Upper2ATR() + self.ATR.Current.Value
def BullLevels(self):
return [
self.UpperTrigger(),
self.UpperMiddle(),
self.UpperATR(),
self.UpperExtension(),
self.UpperMiddleExtension(),
self.Upper2ATR(),
self.Upper2ATRExtension(),
self.Upper2ATRMiddleExtension(),
self.Upper3ATR()
]
def NextLevel(self, LevelNumber, bull = False, bear = False):
dayOpen = self.PreviousClose().Value
allLevels = [dayOpen] + self.BearLevels() + self.BullLevels()
allLevels = sorted(allLevels, key = lambda x: x, reverse = False)
bearLs = sorted(filter(lambda x: x <= dayOpen, allLevels), reverse = True)
bullLs = list(filter(lambda x: x >= dayOpen, allLevels))
if bull:
return bullLs[LevelNumber]
if bear:
return bearLs[LevelNumber]
return None
def Range(self):
return self.PeriodHigh.Current.Value - self.PeriodLow.Current.Value
def PercentOfAtr(self):
return (self.Range() / self.ATR.Current.Value) * 100
def Warmup(self, history):
for index, row in history.iterrows():
self.Update(row)
# Method to return a string with the bull and bear levels.
# @return [String]
def ToString(self):
return "Bull Levels: [{}]; Bear Levels: [{}]".format(self.BullLevels(), self.BearLevels())
#region imports from AlgorithmImports import * from .ATRLevels import ATRLevels #endregion # Your New Python File
#region imports
from AlgorithmImports import *
#endregion
import math
from datetime import datetime, timedelta
"""
The GoogleSheetsData class reads data from the Google Sheets CSV link directly during live mode. In backtesting mode, you can use a
static CSV file saved in the local directory with the same format as the Google Sheets file.
The format should be like this:
datetime,type,put_strike,call_strike,minimum_premium
2023-12-23 14:00:00,Iron Condor,300,350,0.50
2023-12-24 14:00:00,Bear Call Spread,0,360,0.60
2023-12-25 14:00:00,Bull Put Spread,310,0,0.70
Replace the google_sheet_csv_link variable in the GetSource method with your actual Google Sheets CSV link.
Example for alpha model:
class MyAlphaModel(AlphaModel):
def Update(self, algorithm, data):
if not data.ContainsKey('SPY_TradeInstructions'):
return []
trade_instructions = data['SPY_TradeInstructions']
if trade_instructions is None:
return []
# Check if the current time is past the instructed time
if algorithm.Time < trade_instructions.Time:
return []
# Use the trade_instructions data to generate insights
type = trade_instructions.Type
call_strike = trade_instructions.CallStrike
put_strike = trade_instructions.PutStrike
minimum_premium = trade_instructions.MinimumPremium
insights = []
if type == "Iron Condor":
insights.extend(self.GenerateIronCondorInsights(algorithm, call_strike, put_strike, minimum_premium))
elif type == "Bear Call Spread":
insights.extend(self.GenerateBearCallSpreadInsights(algorithm, call_strike, minimum_premium))
elif type == "Bull Put Spread":
insights.extend(self.GenerateBullPutSpreadInsights(algorithm, put_strike, minimum_premium))
return insights
"""
class GoogleSheetsData(PythonData):
def GetSource(self, config, date, isLiveMode):
google_sheet_csv_link = 'https://docs.google.com/spreadsheets/d/e/2PACX-1vS9oNUoYqY-u0WnLuJRCb8pSuQKcLStK8RaTfs5Cm9j6iiYNpx82iJuAc3D32zytXA4EiosfxjWKyJp/pub?gid=509927026&single=true&output=csv'
if isLiveMode:
return SubscriptionDataSource(google_sheet_csv_link, SubscriptionTransportMedium.Streaming)
else:
return SubscriptionDataSource(google_sheet_csv_link, SubscriptionTransportMedium.RemoteFile)
# if isLiveMode:
# # Replace the link below with your Google Sheets CSV link
# google_sheet_csv_link = 'https://docs.google.com/spreadsheets/d/e/2PACX-1vS9oNUoYqY-u0WnLuJRCb8pSuQKcLStK8RaTfs5Cm9j6iiYNpx82iJuAc3D32zytXA4EiosfxjWKyJp/pub?gid=509927026&single=true&output=csv'
# return SubscriptionDataSource(google_sheet_csv_link, SubscriptionTransportMedium.RemoteFile)
# # In backtesting, you can use a static CSV file saved in the local directory
# return SubscriptionDataSource("trade_instructions.csv", SubscriptionTransportMedium.LocalFile)
def Reader(self, config, line, date, isLiveMode):
if not line.strip():
return None
columns = line.split(',')
if columns[0] == 'datetime':
return None
trade = GoogleSheetsData()
trade.Symbol = config.Symbol
trade.Value = float(columns[2]) or float(columns[3])
# Parse the datetime and adjust the timezone
trade_time = datetime.strptime(columns[0], "%Y-%m-%d %H:%M:%S") - timedelta(hours=7)
# Round up the minute to the nearest 5 minutes
minute = 5 * math.ceil(trade_time.minute / 5)
# If the minute is 60, set it to 0 and add 1 hour
if minute == 60:
trade_time = trade_time.replace(minute=0, hour=trade_time.hour+1)
else:
trade_time = trade_time.replace(minute=minute)
trade.Time = trade_time
# trade.EndTime = trade.Time + timedelta(hours=4)
trade["Type"] = columns[1]
trade["PutStrike"] = float(columns[2])
trade["CallStrike"] = float(columns[3])
trade["MinimumPremium"] = float(columns[4])
return trade
#region imports
from AlgorithmImports import *
#endregion
from .Base import Base
class AutoExecutionModel(Base):
def __init__(self, context):
# Call the Base class __init__ method
super().__init__(context)from AlgorithmImports import *
from Tools import ContractUtils, Logger
from Execution.Utils import MarketOrderHandler, LimitOrderHandler, LimitOrderHandlerWithCombo
"""
"""
class Base(ExecutionModel):
DEFAULT_PARAMETERS = {
# Retry decrease/increase percentage. Each time we try and get a fill we are going to decrease the limit price
# by this percentage.
"retryChangePct": 1.0,
# Minimum price percentage accepted as limit price. If the limit price set is 0.5 and this value is 0.8 then
# the minimum price accepted will be 0.4
"minPricePct": 0.7,
# The limit order price initial adjustmnet. This will add some leeway to the limit order price so we can try and get
# some more favorable price for the user than the algo set price. So if we set this to 0.1 (10%) and our limit price
# is 0.5 then we will try and fill the order at 0.55 first.
"orderAdjustmentPct": -0.2,
# The increment we are going to use to adjust the limit price. This is used to
# properly adjust the price for SPX options. If the limit price is 0.5 and this
# value is 0.01 then we are going to try and fill the order at 0.51, 0.52, 0.53, etc.
"adjustmentIncrement": None, # 0.01,
# Speed of fill. Option taken from https://optionalpha.com/blog/smartpricing-released.
# Can be: "Normal", "Fast", "Patient"
# "Normal" will retry every 3 minutes, "Fast" every 1 minute, "Patient" every 5 minutes.
"speedOfFill": "Fast",
# maxRetries is the maximum number of retries we are going to do to try
# and get a fill. This is calculated based on the speedOfFill and this
# value is just for reference.
"maxRetries": 10,
}
def __init__(self, context):
self.context = context
# Calculate maxRetries based on speedOfFill
speedOfFill = self.parameter("speedOfFill")
if speedOfFill == "Patient":
self.maxRetries = 7
elif speedOfFill == "Normal":
self.maxRetries = 5
elif speedOfFill == "Fast":
self.maxRetries = 3
else:
raise ValueError("Invalid speedOfFill value")
self.targetsCollection = PortfolioTargetCollection()
self.contractUtils = ContractUtils(context)
# Set the logger
self.logger = Logger(context, className=type(self).__name__, logLevel=context.logLevel)
self.marketOrderHandler = MarketOrderHandler(context, self)
self.limitOrderHandler = LimitOrderHandler(context, self)
# self.limitOrderHandler = LimitOrderHandlerWithCombo(context, self)
self.logger.debug(f"{self.__class__.__name__} -> __init__")
# Gets or sets the maximum spread compare to current price in percentage.
# self.acceptingSpreadPercent = Math.Abs(acceptingSpreadPercent)
# self.executionTimeThreshold = timedelta(minutes=10)
# self.openExecutedOrders = {}
self.context.structure.AddConfiguration(parent=self, **self.getMergedParameters())
@classmethod
def getMergedParameters(cls):
# Merge the DEFAULT_PARAMETERS from both classes
return {**cls.DEFAULT_PARAMETERS, **getattr(cls, "PARAMETERS", {})}
@classmethod
def parameter(cls, key, default=None):
return cls.getMergedParameters().get(key, default)
def Execute(self, algorithm, targets):
self.context.executionTimer.start('Execution.Base -> Execute')
# Use this section to check if a target is in the workingOrder dict
self.targetsCollection.AddRange(targets)
self.logger.debug(f"{self.__class__.__name__} -> Execute -> targets: {targets}")
self.logger.debug(f"{self.__class__.__name__} -> Execute -> targets count: {len(targets)}")
self.logger.debug(f"{self.__class__.__name__} -> Execute -> workingOrders: {self.context.workingOrders}")
self.logger.debug(f"{self.__class__.__name__} -> Execute -> allPositions: {self.context.allPositions}")
# Check if the workingOrders are still OK to execute
self.context.structure.checkOpenPositions()
self.logger.debug(f"{self.__class__.__name__} -> Execute -> checkOpenPositions")
for order in list(self.context.workingOrders.values()):
position = self.context.allPositions[order.orderId]
useLimitOrders = order.useLimitOrder
useMarketOrders = not useLimitOrders
self.logger.debug(f"Processing order: {order.orderId}")
self.logger.debug(f"Order details: {order}")
self.logger.debug(f"Position details: {position}")
self.logger.debug(f"Use Limit Orders: {useLimitOrders}")
self.logger.debug(f"Use Market Orders: {useMarketOrders}")
if useMarketOrders:
self.marketOrderHandler.call(position, order)
elif useLimitOrders:
self.limitOrderHandler.call(position, order)
# if not self.targetsCollection.IsEmpty:
# for target in targets:
# order = Helper().findIn(
# self.context.workingOrders.values(),
# lambda v: any(t == target for t in v.targets)
# )
# orders[order.orderId] = order
# for order in orders.values():
# position = self.context.allPositions[order.orderId]
# useLimitOrders = order.useLimitOrder
# useMarketOrders = not useLimitOrders
# if useMarketOrders:
# self.executeMarketOrder(position, order)
# elif useLimitOrders:
# self.executeLimitOrder(position, order)
self.targetsCollection.ClearFulfilled(algorithm)
# Update the charts after execution
self.context.charting.updateCharts()
self.context.executionTimer.stop('Execution.Base -> Execute')
#region imports
from AlgorithmImports import *
#endregion
from .Base import Base
class SPXExecutionModel(Base):
PARAMETERS = {
# Retry decrease/increase percentage. Each time we try and get a fill we are going to decrease the limit price
# by this percentage.
"retryChangePct": -0.05,
# Minimum price percentage accepted as limit price. If the limit price set is 0.5 and this value is 0.8 then
# the minimum price accepted will be 0.4
"minPricePct": 0.5,
# The limit order price initial adjustmnet. This will add some leeway to the limit order price so we can try and get
# some more favorable price for the user than the algo set price. So if we set this to 0.1 (10%) and our limit price
# is 0.5 then we will try and fill the order at 0.55 first.
"orderAdjustmentPct": -0.5,
# The increment we are going to use to adjust the limit price. This is used to
# properly adjust the price for SPX options. If the limit price is 0.5 and this
# value is 0.01 then we are going to try and fill the order at 0.51, 0.52, 0.53, etc.
"adjustmentIncrement": 0.05,
# Speed of fill. Option taken from https://optionalpha.com/blog/smartpricing-released.
# Can be: "Normal", "Fast", "Patient"
# "Normal" will retry every 3 minutes, "Fast" every 1 minute, "Patient" every 5 minutes.
"speedOfFill": "Fast",
# maxRetries is the maximum number of retries we are going to do to try
# and get a fill. This is calculated based on the speedOfFill and this
# value is just for reference.
"maxRetries": 10,
}
def __init__(self, context):
# Call the Base class __init__ method
super().__init__(context)from AlgorithmImports import *
"""
Started discussion on this here: https://chat.openai.com/chat/b5be32bf-850a-44ba-80fc-44f79a7df763
Use like this in main.py file:
percent_of_spread = 0.5
timeout = timedelta(minutes=2)
self.SetExecution(SmartPricingExecutionModel(percent_of_spread, timeout))
"""
class SmartPricingExecutionModel(ExecutionModel):
def __init__(self, percent_of_spread, timeout):
self.percent_of_spread = percent_of_spread
self.timeout = timeout
self.order_tickets = dict()
def Execute(self, algorithm, targets):
for target in targets:
symbol = target.Symbol
quantity = target.Quantity
# If an order already exists for the symbol, skip
if symbol in self.order_tickets:
continue
# Get the bid-ask spread and apply the user-defined percentage
security = algorithm.Securities[symbol]
if security.BidPrice != 0 and security.AskPrice != 0:
spread = security.AskPrice - security.BidPrice
adjusted_spread = spread * self.percent_of_spread
if quantity > 0:
limit_price = security.BidPrice + adjusted_spread
else:
limit_price = security.AskPrice - adjusted_spread
# Submit the limit order with the calculated price
ticket = algorithm.LimitOrder(symbol, quantity, limit_price)
self.order_tickets[symbol] = ticket
# Set the order expiration
expiration = algorithm.UtcTime + self.timeout
# ticket.Update(new UpdateOrderFields { TimeInForce = TimeInForce.GoodTilDate(expiration) })
def OnOrderEvent(self, algorithm, order_event):
if order_event.Status.IsClosed():
order = algorithm.Transactions.GetOrderById(order_event.OrderId)
symbol = order.Symbol
if symbol in self.order_tickets:
del self.order_tickets[symbol]
#region imports
from AlgorithmImports import *
#endregion
from Tools import ContractUtils, Logger, Underlying
# Your New Python File
class LimitOrderHandler:
def __init__(self, context, base):
self.context = context
self.contractUtils = ContractUtils(context)
self.base = base
# Set the logger
self.logger = Logger(context, className=type(self).__name__, logLevel=context.logLevel)
def call(self, position, order):
# Start the timer
self.context.executionTimer.start()
# Get the context
context = self.context
# Get the Limit order details
# Get the order type: open|close
orderType = order.orderType
# This updates prices and stats for the order
position.updateOrderStats(context, orderType)
# This updates the stats for the position
position.updateStats(context, orderType)
execOrder = position[f"{orderType}Order"]
ticket = None
orderTransactionIds = execOrder.transactionIds
self.logger.debug(f"orderTransactionIds: {orderTransactionIds}")
self.logger.debug(f"order.lastRetry: {order.lastRetry}")
self.logger.debug(f"self.sinceLastRetry(context, order, timedelta(minutes = 1)): {self.sinceLastRetry(context, order, timedelta(minutes = 1))}")
# Exit if we are not at the right scheduled interval
if orderTransactionIds and (order.lastRetry is None or self.sinceLastRetry(context, order, timedelta(minutes = 1))):
"""
IMPORTANT!!:
Why do we cancel?
If we update the ticket with the new price then we risk execution while updating the price of the rest of the combo order causing discrepancies.
"""
for id in orderTransactionIds:
ticket = context.Transactions.GetOrderTicket(id)
if ticket:
ticket.Cancel('Cancelled trade and trying with new prices')
# store when we last canceled/retried and check with current time if like 2-3 minutes passed before we retry again.
self.makeLimitOrder(position, order, retry = True)
# NOTE: If combo limit orders will execute limit orders instead of market orders then let's use this method.
# self.updateComboLimitOrder(position, orderTransactionIds)
elif not orderTransactionIds:
self.makeLimitOrder(position, order)
# Stop the timer
self.context.executionTimer.stop()
def makeLimitOrder(self, position, order, retry = False):
context = self.context
# Get the Limit order details
# Get the order type: open|close
orderType = order.orderType
limitOrderPrice = self.limitOrderPrice(order)
execOrder = position[f"{orderType}Order"]
# Keep track of the midPrices of this order for faster debugging
execOrder.priceProgressList.append(round(execOrder.midPrice, 2))
orderTag = position.orderTag
# Get the contracts
contracts = [v.contract for v in position.legs]
# Get the order quantity
orderQuantity = position.orderQuantity
# Sign of the order: open -> 1 (use orderSide as is), close -> -1 (reverse the orderSide)
orderSign = 2 * int(orderType == "open") - 1
# Get the order sides
orderSides = np.array([c.contractSide for c in position.legs])
# Define the legs of the combo order
legs = []
isComboOrder = len(contracts) > 1
# Log the parameters used to validate the order
self.logger.debug(f"Executing Limit Order to {orderType} the position:")
self.logger.debug(f" - orderType: {orderType}")
self.logger.debug(f" - orderTag: {orderTag}")
self.logger.debug(f" - underlyingPrice: {Underlying(context, position.underlyingSymbol()).Price()}")
self.logger.debug(f" - strikes: {[c.Strike for c in contracts]}")
self.logger.debug(f" - orderQuantity: {orderQuantity}")
self.logger.debug(f" - midPrice: {execOrder.midPrice} (limitOrderPrice: {limitOrderPrice})")
self.logger.debug(f" - bidAskSpread: {execOrder.bidAskSpread}")
# Calculate the adjustment value based on the difference between the limit price and the total midPrice
# TODO: this might have to be changed if we start buying options instead of selling for premium.
if orderType == "close":
adjustmentValue = self.calculateAdjustmentValueBought(
execOrder=execOrder,
limitOrderPrice=limitOrderPrice,
retries=order.fillRetries,
nrContracts=len(contracts)
)
else:
adjustmentValue = self.calculateAdjustmentValueSold(
execOrder=execOrder,
limitOrderPrice=limitOrderPrice,
retries=order.fillRetries,
nrContracts=len(contracts)
)
# IMPORTANT!! Because ComboLimitOrder right now still executes market orders we should not use it. We need to use ComboLegLimitOrder and that will work.
for n, contract in enumerate(contracts):
# Set the order side: -1 -> Sell, +1 -> Buy
orderSide = orderSign * orderSides[n]
if orderSide != 0:
newLimitPrice = self.contractUtils.midPrice(contract) + adjustmentValue if orderSide == -1 else self.contractUtils.midPrice(contract) - adjustmentValue
# round the price or we get an error like:
# Adjust the limit price to meet brokerage precision requirements
increment = self.base.adjustmentIncrement if self.base.adjustmentIncrement is not None else 0.05
newLimitPrice = round(newLimitPrice / increment) * increment
newLimitPrice = round(newLimitPrice, 1) # Ensure the price is rounded to two decimal places
newLimitPrice = max(newLimitPrice, increment) # make sure the price is never 0. At least the increment.
self.logger.info(f"{orderType.upper()} {orderQuantity} {orderTag}, {contract.Symbol}, newLimitPrice: {newLimitPrice}")
if isComboOrder:
legs.append(Leg.Create(contract.Symbol, orderSide, newLimitPrice))
else:
newTicket = context.LimitOrder(contract.Symbol, orderQuantity, newLimitPrice, tag=orderTag)
execOrder.transactionIds = [newTicket.OrderId]
log_message = f"{orderType.upper()} {orderQuantity} {orderTag}, "
log_message += f"{[c.Strike for c in contracts]} @ Mid: {round(execOrder.midPrice, 2)}, "
log_message += f"NewLimit: {round(sum([l.OrderPrice * l.Quantity for l in legs]), 2)}, "
log_message += f"Limit: {round(limitOrderPrice, 2)}, "
log_message += f"DTTM: {execOrder.limitOrderExpiryDttm}, "
log_message += f"Spread: ${round(execOrder.bidAskSpread, 2)}, "
log_message += f"Bid & Ask: {[(round(self.contractUtils.bidPrice(c), 2), round(self.contractUtils.askPrice(c),2)) for c in contracts]}, "
log_message += f"Volume: {[self.contractUtils.volume(c) for c in contracts]}, "
log_message += f"OpenInterest: {[self.contractUtils.openInterest(c) for c in contracts]}"
if orderType.lower() == 'close':
log_message += f", Reason: {position.closeReason}"
# To limit logs just log every 25 minutes
self.logger.info(log_message)
### for contract in contracts
if isComboOrder:
# Execute by using a multi leg order if we have multiple sides.
newTicket = context.ComboLegLimitOrder(legs, orderQuantity, tag=orderTag)
execOrder.transactionIds = [t.OrderId for t in newTicket]
# Store the last retry on this order. This is not ideal but the only way to handle combo limit orders on QC as the comboLimitOrder and all the others
# as soon as you update one leg it will execute and mess it up.
if retry:
order.lastRetry = context.Time
order.fillRetries += 1 # increment the number of fill tries
def limitOrderPrice(self, order):
orderType = order.orderType
limitOrderPrice = order.limitOrderPrice
# Just use a default limit price that is supposed to be the smallest prossible.
# The limit order price of 0 can happen if the trade is worthless.
if limitOrderPrice == 0 and orderType == 'close':
limitOrderPrice = 0.05
return limitOrderPrice
def sinceLastRetry(self, context, order, frequency = timedelta(minutes = 3)):
if order.lastRetry is None: return True
timeSinceLastRetry = context.Time - order.lastRetry
minutesSinceLastRetry = timedelta(minutes = round(timeSinceLastRetry.seconds / 60))
return minutesSinceLastRetry % frequency == timedelta(minutes=0)
def calculateAdjustmentValueSold(self, execOrder, limitOrderPrice, retries=0, nrContracts=1):
if self.base.orderAdjustmentPct is None and self.base.adjustmentIncrement is None:
raise ValueError("orderAdjustmentPct or adjustmentIncrement must be set in the parameters")
# Adjust the limitOrderPrice
limitOrderPrice += self.base.orderAdjustmentPct * limitOrderPrice # Increase the price by orderAdjustmentPct
min_price = self.base.minPricePct * limitOrderPrice # Minimum allowed price is % of limitOrderPrice
# Calculate the range and step
if self.base.adjustmentIncrement is None:
# Calculate the step based on the bidAskSpread and the number of retries
step = execOrder.bidAskSpread / self.base.maxRetries
else:
step = self.base.adjustmentIncrement
# Start with the preferred price
target_price = execOrder.midPrice + step
# If we have retries, adjust the target price accordingly
if retries > 0:
target_price -= retries * step
# Ensure the target price does not fall below the minimum limit
if target_price < min_price:
target_price = min_price
# Round the target price to the nearest multiple of adjustmentIncrement
target_price = round(target_price / step) * step
# Calculate the adjustment value
adjustment_value = (target_price - execOrder.midPrice) / nrContracts
return adjustment_value
def calculateAdjustmentValueBought(self, execOrder, limitOrderPrice, retries=0, nrContracts=1):
if self.base.orderAdjustmentPct is None and self.base.adjustmentIncrement is None:
raise ValueError("orderAdjustmentPct or adjustmentIncrement must be set in the parameters")
# Adjust the limitOrderPrice
limitOrderPrice += self.base.orderAdjustmentPct * limitOrderPrice # Increase the price by orderAdjustmentPct
increment = self.base.retryChangePct * limitOrderPrice # Increment value for each retry
max_price = self.base.minPricePct * limitOrderPrice # Maximum allowed price is % of limitOrderPrice
# Start with the preferred price
target_price = max_price
# If we have retries, increment the target price accordingly
if retries > 0:
target_price += retries * increment
# Ensure the target price does not exceed the maximum limit
if target_price > limitOrderPrice:
target_price = limitOrderPrice
# Calculate the range and step
if self.base.adjustmentIncrement is None:
# Calculate the step based on the bidAskSpread and the number of retries
step = execOrder.bidAskSpread / self.base.maxRetries
else:
step = self.base.adjustmentIncrement
# Round the target price to the nearest multiple of adjustmentIncrement
target_price = round(target_price / step) * step
# Calculate the adjustment value
adjustment_value = (target_price - execOrder.midPrice) / nrContracts
return adjustment_value
"""
def updateComboLimitOrder(self, position, orderTransactionIds):
context = self.context
for id in orderTransactionIds:
ticket = context.Transactions.GetOrderTicket(id)
# store when we last canceled/retried and check with current time if like 2-3 minutes passed before we retry again.
leg = next((leg for leg in position.legs if ticket.Symbol == leg.symbol), None) contract = leg.contract
# To update the limit price of the combo order, you only need to update the limit price of one of the leg orders.
# The Update method returns an OrderResponse to signal the success or failure of the update request.
if ticket and ticket.Status is not OrderStatus.Filled:
newLimitPrice = self.contractUtils.midPrice(contract) + 0.1 if leg.isSold else self.contractUtils.midPrice(contract) - 0.1
update_settings = UpdateOrderFields()
update_settings.LimitPrice = newLimitPrice
response = ticket.Update(update_settings)
# Check if the update was successful
if response.IsSuccess:
self.logger.debug(f"Order updated successfully for {ticket.Symbol}")
"""
#region imports
from AlgorithmImports import *
#endregion
from Tools import ContractUtils, Logger, Underlying
# Your New Python File
class LimitOrderHandlerWithCombo:
def __init__(self, context, base):
self.context = context
self.contractUtils = ContractUtils(context)
self.base = base
# Set the logger
self.logger = Logger(context, className=type(self).__name__, logLevel=context.logLevel)
def call(self, position, order):
# Start the timer
self.context.executionTimer.start()
# Get the context
context = self.context
# Get the Limit order details
# Get the order type: open|close
orderType = order.orderType
# This updates prices and stats for the order
position.updateOrderStats(context, orderType)
# This updates the stats for the position
position.updateStats(context, orderType)
execOrder = position[f"{orderType}Order"]
ticket = None
orderTransactionIds = execOrder.transactionIds
self.logger.debug(f"orderTransactionIds: {orderTransactionIds}")
self.logger.debug(f"order.lastRetry: {order.lastRetry}")
self.logger.debug(f"self.sinceLastRetry(context, order, timedelta(minutes = 1)): {self.sinceLastRetry(context, order, timedelta(minutes = 1))}")
# Exit if we are not at the right scheduled interval
if orderTransactionIds and (order.lastRetry is None or self.sinceLastRetry(context, order, timedelta(minutes = 1))):
self.updateComboLimitOrder(position, order, orderTransactionIds)
elif not orderTransactionIds:
self.makeLimitOrder(position, order)
# Stop the timer
self.context.executionTimer.stop()
def makeLimitOrder(self, position, order, retry = False):
context = self.context
orderType = order.orderType
limitOrderPrice = self.limitOrderPrice(order)
execOrder = position[f"{orderType}Order"]
# Keep track of the midPrices of this order for faster debugging
execOrder.priceProgressList.append(round(execOrder.midPrice, 2))
orderTag = position.orderTag
# Get the contracts
contracts = [v.contract for v in position.legs]
# Get the order quantity
orderQuantity = position.orderQuantity
# Sign of the order: open -> 1 (use orderSide as is), close -> -1 (reverse the orderSide)
orderSign = 2 * int(orderType == "open") - 1
# Get the order sides
orderSides = np.array([c.contractSide for c in position.legs])
# Define the legs of the combo order
legs = []
for n, contract in enumerate(contracts):
# Set the order side: -1 -> Sell, +1 -> Buy
orderSide = orderSign * orderSides[n]
if orderSide != 0:
legs.append(Leg.Create(contract.Symbol, orderSide))
# Calculate the new limit price
newLimitPrice = self.calculateNewLimitPrice(position, execOrder, limitOrderPrice, order.fillRetries, len(contracts), orderType)
# Log the parameters used to validate the order
self.logOrderDetails(position, order)
# Execute the combo limit order
newTicket = context.ComboLimitOrder(legs, orderQuantity, newLimitPrice, tag=orderTag)
execOrder.transactionIds = [t.OrderId for t in newTicket]
# Log the order execution
self.logOrderExecution(position, order, newLimitPrice)
# Update order information if it's a retry
if retry:
order.lastRetry = context.Time
order.fillRetries += 1
def updateComboLimitOrder(self, position, order, orderTransactionIds):
context = self.context
orderType = order.orderType
execOrder = position[f"{orderType}Order"]
# Calculate the new limit price
limitOrderPrice = self.limitOrderPrice(order)
newLimitPrice = self.calculateNewLimitPrice(position, execOrder, limitOrderPrice, order.fillRetries, len(position.legs), orderType)
# Get the first order ticket (we only need to update one for the combo order)
ticket = context.Transactions.GetOrderTicket(orderTransactionIds[0])
if ticket and ticket.Status != OrderStatus.Filled:
update_settings = UpdateOrderFields()
update_settings.LimitPrice = newLimitPrice
response = ticket.Update(update_settings)
if response.IsSuccess:
self.logger.debug(f"Combo order updated successfully. New limit price: {newLimitPrice}")
else:
self.logger.warning(f"Failed to update combo order: {response.ErrorCode}")
# Log the update
self.logOrderExecution(position, order, newLimitPrice, action="UPDATED")
# Update order information
order.lastRetry = context.Time
order.fillRetries += 1 # increment the number of fill tries
def calculateNewLimitPrice(self, position, execOrder, limitOrderPrice, retries, nrContracts, orderType):
if orderType == "close":
adjustmentValue = self.calculateAdjustmentValueBought(
execOrder=execOrder,
limitOrderPrice=limitOrderPrice,
retries=retries,
nrContracts=nrContracts
)
else:
adjustmentValue = self.calculateAdjustmentValueSold(
execOrder=execOrder,
limitOrderPrice=limitOrderPrice,
retries=retries,
nrContracts=nrContracts
)
# Determine if it's a credit or debit strategy
isCredit = position.isCreditStrategy
if isCredit:
# For credit strategies, we want to receive at least this much (negative value)
newLimitPrice = -(abs(execOrder.midPrice) - adjustmentValue) if orderType == "open" else -(abs(execOrder.midPrice) + adjustmentValue)
else:
# For debit strategies, we're willing to pay up to this much (positive value)
newLimitPrice = execOrder.midPrice + adjustmentValue if orderType == "open" else execOrder.midPrice - adjustmentValue
# Adjust the limit price to meet brokerage precision requirements
increment = self.base.adjustmentIncrement if self.base.adjustmentIncrement is not None else 0.05
newLimitPrice = round(newLimitPrice / increment) * increment
newLimitPrice = round(newLimitPrice, 2) # Ensure the price is rounded to two decimal places
# Ensure the price is never 0 and maintains the correct sign
if isCredit:
newLimitPrice = min(newLimitPrice, -increment)
else:
newLimitPrice = max(newLimitPrice, increment)
return newLimitPrice
def logOrderDetails(self, position, order):
orderType = order.orderType
execOrder = position[f"{orderType}Order"]
contracts = [v.contract for v in position.legs]
self.logger.debug(f"Executing Limit Order to {orderType} the position:")
self.logger.debug(f" - orderType: {orderType}")
self.logger.debug(f" - orderTag: {position.orderTag}")
self.logger.debug(f" - underlyingPrice: {Underlying(self.context, position.underlyingSymbol()).Price()}")
self.logger.debug(f" - strikes: {[c.Strike for c in contracts]}")
self.logger.debug(f" - orderQuantity: {position.orderQuantity}")
self.logger.debug(f" - midPrice: {execOrder.midPrice} (limitOrderPrice: {self.limitOrderPrice(order)})")
self.logger.debug(f" - bidAskSpread: {execOrder.bidAskSpread}")
def logOrderExecution(self, position, order, newLimitPrice, action=None):
orderType = order.orderType
execOrder = position[f"{orderType}Order"]
contracts = [v.contract for v in position.legs]
action = action or orderType.upper()
log_message = f"{action} {position.orderQuantity} {position.orderTag}, "
log_message += f"{[c.Strike for c in contracts]} @ Mid: {round(execOrder.midPrice, 2)}, "
log_message += f"NewLimit: {round(newLimitPrice, 2)}, "
log_message += f"Limit: {round(self.limitOrderPrice(order), 2)}, "
log_message += f"DTTM: {execOrder.limitOrderExpiryDttm}, "
log_message += f"Spread: ${round(execOrder.bidAskSpread, 2)}, "
log_message += f"Bid & Ask: {[(round(self.contractUtils.bidPrice(c), 2), round(self.contractUtils.askPrice(c),2)) for c in contracts]}, "
log_message += f"Volume: {[self.contractUtils.volume(c) for c in contracts]}, "
log_message += f"OpenInterest: {[self.contractUtils.openInterest(c) for c in contracts]}"
if orderType.lower() == 'close':
log_message += f", Reason: {position.closeReason}"
# To limit logs just log every 25 minutes
self.logger.info(log_message)
def limitOrderPrice(self, order):
orderType = order.orderType
limitOrderPrice = order.limitOrderPrice
# Just use a default limit price that is supposed to be the smallest prossible.
# The limit order price of 0 can happen if the trade is worthless.
if limitOrderPrice == 0 and orderType == 'close':
limitOrderPrice = 0.05
return limitOrderPrice
def sinceLastRetry(self, context, order, frequency = timedelta(minutes = 3)):
if order.lastRetry is None: return True
timeSinceLastRetry = context.Time - order.lastRetry
minutesSinceLastRetry = timedelta(minutes = round(timeSinceLastRetry.seconds / 60))
return minutesSinceLastRetry % frequency == timedelta(minutes=0)
def calculateAdjustmentValueSold(self, execOrder, limitOrderPrice, retries=0, nrContracts=1):
if self.base.orderAdjustmentPct is None and self.base.adjustmentIncrement is None:
raise ValueError("orderAdjustmentPct or adjustmentIncrement must be set in the parameters")
# Adjust the limitOrderPrice
limitOrderPrice += self.base.orderAdjustmentPct * limitOrderPrice # Increase the price by orderAdjustmentPct
min_price = self.base.minPricePct * limitOrderPrice # Minimum allowed price is % of limitOrderPrice
# Calculate the range and step
if self.base.adjustmentIncrement is None:
# Calculate the step based on the bidAskSpread and the number of retries
step = execOrder.bidAskSpread / self.base.maxRetries
else:
step = self.base.adjustmentIncrement
step = max(step, 0.01) # Ensure the step is at least 0.01
# Start with the preferred price
target_price = execOrder.midPrice + step
# If we have retries, adjust the target price accordingly
if retries > 0:
target_price -= retries * step
# Ensure the target price does not fall below the minimum limit
if target_price < min_price:
target_price = min_price
# Round the target price to the nearest multiple of adjustmentIncrement
target_price = round(target_price / step) * step
# Calculate the adjustment value
adjustment_value = (target_price - execOrder.midPrice) / nrContracts
return adjustment_value
def calculateAdjustmentValueBought(self, execOrder, limitOrderPrice, retries=0, nrContracts=1):
if self.base.orderAdjustmentPct is None and self.base.adjustmentIncrement is None:
raise ValueError("orderAdjustmentPct or adjustmentIncrement must be set in the parameters")
# Adjust the limitOrderPrice
limitOrderPrice += self.base.orderAdjustmentPct * limitOrderPrice # Increase the price by orderAdjustmentPct
increment = self.base.retryChangePct * limitOrderPrice # Increment value for each retry
max_price = self.base.minPricePct * limitOrderPrice # Maximum allowed price is % of limitOrderPrice
# Start with the preferred price
target_price = max_price
# If we have retries, increment the target price accordingly
if retries > 0:
target_price += retries * increment
# Ensure the target price does not exceed the maximum limit
if target_price > limitOrderPrice:
target_price = limitOrderPrice
# Calculate the range and step
if self.base.adjustmentIncrement is None:
# Calculate the step based on the bidAskSpread and the number of retries
step = execOrder.bidAskSpread / self.base.maxRetries
else:
step = self.base.adjustmentIncrement
# Round the target price to the nearest multiple of adjustmentIncrement
target_price = round(target_price / step) * step
# Calculate the adjustment value
adjustment_value = (target_price - execOrder.midPrice) / nrContracts
return adjustment_value
#region imports
from AlgorithmImports import *
#endregion
from Tools import ContractUtils, Logger, Underlying
# Your New Python File
class MarketOrderHandler:
def __init__(self, context, base):
self.context = context
self.base = base
self.contractUtils = ContractUtils(context)
# Set the logger
self.logger = Logger(context, className=type(self).__name__, logLevel=context.logLevel)
def call(self, position, order):
# Start the timer
self.context.executionTimer.start()
# Get the context
context = self.context
orderTag = position.orderTag
orderQuantity = position.orderQuantity
orderType = order.orderType
contracts = [v.contract for v in position.legs]
orderSides = [v.contractSide for v in position.legs]
bidAskSpread = sum(list(map(self.contractUtils.bidAskSpread, contracts)))
midPrice = sum(side * self.contractUtils.midPrice(contract) for side, contract in zip(orderSides, contracts))
underlying = Underlying(context, position.underlyingSymbol())
orderSign = 2 * int(orderType == "open") - 1
execOrder = position[f"{orderType}Order"]
execOrder.midPrice = midPrice
# Check if the order already has transaction IDs
orderTransactionIds = execOrder.transactionIds
if orderTransactionIds:
self.logger.debug(f"Market order already placed. Waiting for execution. Transaction IDs: {orderTransactionIds}")
return
# This updates prices and stats for the order
position.updateOrderStats(context, orderType)
# This updates the stats for the position
position.updateStats(context, orderType)
# Keep track of the midPrices of this order for faster debugging
execOrder.priceProgressList.append(round(midPrice, 2))
isComboOrder = len(position.legs) > 1
legs = []
# Loop through all contracts
for contract in position.legs:
# Get the order side
orderSide = contract.contractSide * orderSign
# Get the order quantity
quantity = contract.quantity
# Get the contract symbol
symbol = contract.symbol
# Get the contract object
security = context.Securities[symbol]
# get the target
target = next(t for t in order.targets if t.Symbol == symbol)
# calculate remaining quantity to be ordered
# quantity = OrderSizing.GetUnorderedQuantity(context, target, security)
self.logger.debug(f"{orderType} contract {symbol}:")
self.logger.debug(f" - orderSide: {orderSide}")
self.logger.debug(f" - quantity: {quantity}")
self.logger.debug(f" - orderTag: {orderTag}")
if orderSide != 0:
if isComboOrder:
# If we are doing market orders, we need to create the legs of the combo order
legs.append(Leg.Create(symbol, orderSide))
else:
# Send the Market order (asynchronous = True -> does not block the execution in case of partial fills)
context.MarketOrder(
symbol,
orderSide * quantity,
asynchronous=True,
tag=orderTag
)
### Loop through all contracts
# Log the parameters used to validate the order
log_message = f"{orderType.upper()} {orderQuantity} {orderTag}, "
log_message += f"{[c.Strike for c in contracts]} @ Mid: {round(midPrice, 2)}"
if orderType.lower() == 'close':
log_message += f", Reason: {position.closeReason}"
self.logger.info(log_message)
self.logger.debug(f"Executing Market Order to {orderType} the position:")
self.logger.debug(f" - orderType: {orderType}")
self.logger.debug(f" - orderTag: {orderTag}")
self.logger.debug(f" - underlyingPrice: {underlying.Price()}")
self.logger.debug(f" - strikes: {[c.Strike for c in contracts]}")
self.logger.debug(f" - orderQuantity: {orderQuantity}")
self.logger.debug(f" - midPrice: {midPrice}")
self.logger.debug(f" - bidAskSpread: {bidAskSpread}")
# Execute only if we have multiple legs (sides) per order and no existing transaction IDs
if (
len(legs) > 0
and not orderTransactionIds
# Validate the bid-ask spread to make sure it's not too wide
and not (position.strategyParam("validateBidAskSpread") and abs(bidAskSpread) > position.strategyParam("bidAskSpreadRatio")*abs(midPrice))
):
order_result = context.ComboMarketOrder(
legs,
orderQuantity,
asynchronous=True,
tag=orderTag
)
execOrder.transactionIds = [t.OrderId for t in order_result]
# Stop the timer
self.context.executionTimer.stop()#region imports from AlgorithmImports import * #endregion from .LimitOrderHandler import LimitOrderHandler from .LimitOrderHandlerWithCombo import LimitOrderHandlerWithCombo from .MarketOrderHandler import MarketOrderHandler
#region imports from AlgorithmImports import * #endregion # Your New Python File from .AutoExecutionModel import AutoExecutionModel from .SmartPricingExecutionModel import SmartPricingExecutionModel from .SPXExecutionModel import SPXExecutionModel
#region imports
from AlgorithmImports import *
#endregion
class AlwaysBuyingPowerModel(BuyingPowerModel):
def __init__(self, context):
super().__init__()
self.context = context
def HasSufficientBuyingPowerForOrder(self, parameters):
# custom behavior: this model will assume that there is always enough buying power
hasSufficientBuyingPowerForOrderResult = HasSufficientBuyingPowerForOrderResult(True)
self.context.logger.debug(f"CustomBuyingPowerModel: {hasSufficientBuyingPowerForOrderResult.IsSufficient}")
return hasSufficientBuyingPowerForOrderResult
#region imports
from AlgorithmImports import *
#endregion
import numpy as np
# Custom Fill model based on Beta distribution:
# - Orders are filled based on a Beta distribution skewed towards the mid-price with Sigma = bidAskSpread/6 (-> 99% fills within the bid-ask spread)
class BetaFillModel(ImmediateFillModel):
# Initialize Random Number generator with a fixed seed (for replicability)
random = np.random.RandomState(1234)
def __init__(self, context):
self.context = context
def MarketFill(self, asset, order):
# Start the timer
self.context.executionTimer.start()
# Get the random number generator
random = BetaFillModel.random
# Compute the Bid-Ask spread
bidAskSpread = abs(asset.AskPrice - asset.BidPrice)
# Compute the Mid-Price
midPrice = 0.5 * (asset.AskPrice + asset.BidPrice)
# Call the parent method
fill = super().MarketFill(asset, order)
# Setting the parameters of the Beta distribution:
# - The shape parameters (alpha and beta) are chosen such that the fill is "reasonably close" to the mid-price about 96% of the times
# - How close -> The fill price is within 15% of half the bid-Ask spread
if order.Direction == OrderDirection.Sell:
# Beta distribution in the range [Bid-Price, Mid-Price], skewed towards the Mid-Price
# - Fill price is within the range [Mid-Price - 0.15*bidAskSpread/2, Mid-Price] with about 96% probability
offset = asset.BidPrice
alpha = 20
beta = 1
else:
# Beta distribution in the range [Mid-Price, Ask-Price], skewed towards the Mid-Price
# - Fill price is within the range [Mid-Price, Mid-Price + 0.15*bidAskSpread/2] with about 96% probability
offset = midPrice
alpha = 1
beta = 20
# Range (width) of the Beta distribution
range = bidAskSpread / 2.0
# Compute the new fillPrice (centered around the midPrice)
fillPrice = round(offset + range * random.beta(alpha, beta), 2)
# Update the FillPrice attribute
fill.FillPrice = fillPrice
# Stop the timer
self.context.executionTimer.stop()
# Return the fill
return fill
#region imports
from AlgorithmImports import *
#endregion
import re
import numpy as np
from Tools import Logger, Helper
"""
Details about order types:
/// New order pre-submission to the order processor (0)
New = 0,
/// Order submitted to the market (1)
Submitted = 1,
/// Partially filled, In Market Order (2)
PartiallyFilled = 2,
/// Completed, Filled, In Market Order (3)
Filled = 3,
/// Order cancelled before it was filled (5)
Canceled = 5,
/// No Order State Yet (6)
None = 6,
/// Order invalidated before it hit the market (e.g. insufficient capital) (7)
Invalid = 7,
/// Order waiting for confirmation of cancellation (6)
CancelPending = 8,
/// Order update submitted to the market (9)
UpdateSubmitted = 9
"""
class HandleOrderEvents:
def __init__(self, context, orderEvent):
self.context = context
self.orderEvent = orderEvent
self.logger = Logger(self.context, className=type(self.context).__name__, logLevel=self.context.logLevel)
# section: handle order events from main.py
def Call(self):
# Get the context
context = self.context
orderEvent = self.orderEvent
# Start the timer
context.executionTimer.start()
# Process only Fill events
if not (orderEvent.Status == OrderStatus.Filled or orderEvent.Status == OrderStatus.PartiallyFilled):
return
if(orderEvent.IsAssignment):
# TODO: Liquidate the assigned position.
# Eventually figure out which open position it belongs to and close that position.
return
# Get the orderEvent id
orderEventId = orderEvent.OrderId
# Retrieve the order associated to this events
order = context.Transactions.GetOrderById(orderEventId)
# Get the order tag. Remove any warning text that might have been added in case of Fills at Stale Price
orderTag = re.sub(" - Warning.*", "", order.Tag)
# TODO: Additionally check for OTM Underlying order.Tag that would mean it expired worthless.
# if orderEvent.FillPrice == 0.0:
# position = next((position for position in context.allPositions if any(leg.symbol == orderEvent.Symbol for leg in position.legs)), None)
# context.workingOrders.pop(position.orderTag)
# Get the working order (if available)
workingOrder = context.workingOrders.get(orderTag)
# Exit if this order tag is not in the list of open orders.
if workingOrder == None:
return
# Get the position from the openPositions
openPosition = context.openPositions.get(orderTag)
if openPosition is None:
return
# Retrieved the book position (this it the full entry inside allPositions that will be converted into a CSV record)
# bookPosition = context.allPositions[orderId]
bookPosition = context.allPositions[openPosition]
contractInfo = Helper().findIn(
bookPosition.legs,
lambda c: c.symbol == orderEvent.Symbol
)
# Exit if we couldn't find the contract info.
if contractInfo == None:
return
# Get the order id and expiryStr value for the contract
orderId = bookPosition.orderId # contractInfo["orderId"]
positionKey = bookPosition.orderTag # contractInfo["positionKey"]
expiryStr = contractInfo.expiry # contractInfo["expiryStr"]
orderType = workingOrder.orderType # contractInfo["orderType"]
# Log the order event
self.logger.debug(f" -> Processing order id {orderId} (orderTag: {orderTag} - orderType: {orderType} - Expiry: {expiryStr})")
# Get the contract associated to this order event
contract = contractInfo.contract # openPosition["contractDictionary"][orderEvent.Symbol]
# Get the description associated with this contract
contractDesc = contractInfo.key # openPosition["contractSideDesc"][orderEvent.Symbol]
# Get the quantity used to open the position
positionQuantity = bookPosition.orderQuantity # openPosition["orderQuantity"]
# Get the side of each leg (-n -> Short, +n -> Long)
contractSides = np.array([c.contractSide for c in bookPosition.legs]) # np.array(openPosition["sides"])
# Leg Quantity
legQuantity = abs(bookPosition.contractSide[orderEvent.Symbol])
# Total legs quantity in the whole position
Nlegs = sum(abs(contractSides))
# get the position order block
execOrder = bookPosition[f"{orderType}Order"]
# Check if the contract was filled at a stale price (Warnings in the orderTag)
if re.search(" - Warning.*", order.Tag):
self.logger.warning(order.Tag)
execOrder.stalePrice = True
bookPosition[f"{orderType}StalePrice"] = True
# Add the order to the list of openPositions orders (only if this is the first time the order is filled - in case of partial fills)
# if contractInfo["fills"] == 0:
# openPosition[f"{orderType}Order"]["orders"].append(order)
# Update the number of filled contracts associated with this order
workingOrder.fills += abs(orderEvent.FillQuantity)
# Remove this order entry from the self.workingOrders[orderTag] dictionary if it has been fully filled
# if workingOrder.fills == legQuantity * positionQuantity:
# removedOrder = context.workingOrders.pop(orderTag)
# # Update the stats of the given contract inside the bookPosition (reverse the sign of the FillQuantity: Sell -> credit, Buy -> debit)
# bookPosition.updateContractStats(openPosition, contract, orderType = orderType, fillPrice = - np.sign(orderEvent.FillQuantity) * orderEvent.FillPrice)
# Update the counter of positions that have been filled
execOrder.fills += abs(orderEvent.FillQuantity)
execOrder.fillPrice -= np.sign(orderEvent.FillQuantity) * orderEvent.FillPrice
# Get the total amount of the transaction
transactionAmt = orderEvent.FillQuantity * orderEvent.FillPrice * 100
# Check if this is a fill order for an entry position
if orderType == "open":
# Update the openPremium field to include the current transaction (use "-=" to reverse the side of the transaction: Short -> credit, Long -> debit)
bookPosition.openPremium -= transactionAmt
else: # This is an order for the exit position
# Update the closePremium field to include the current transaction (use "-=" to reverse the side of the transaction: Sell -> credit, Buy -> debit)
bookPosition.closePremium -= transactionAmt
# Check if all legs have been filled
if execOrder.fills == Nlegs*positionQuantity:
execOrder.filled = True
bookPosition.updateOrderStats(context, orderType)
# Remove the working order now that it has been filled
context.workingOrders.pop(orderTag)
# Set the time when the full order was filled
bookPosition[orderType + "FilledDttm"] = context.Time
# Record the order mid price
bookPosition[orderType + "OrderMidPrice"] = execOrder.midPrice
# All of this for the logger.info
orderTypeUpper = orderType.upper()
premium = round(bookPosition[f'{orderType}Premium'], 2)
fillPrice = round(execOrder.fillPrice, 2)
message = f" >>> {orderTypeUpper}: {orderTag}, Premium: ${premium} @ ${fillPrice}"
if orderTypeUpper == "CLOSE":
PnL = round(bookPosition.PnL, 2)
percentage = round(bookPosition.PnL / bookPosition.openPremium * 100, 2)
message += f"; P&L: ${PnL} ({percentage}%)"
self.logger.info(message)
self.logger.info(f"Working order progress of prices: {execOrder.priceProgressList}")
self.logger.info(f"Position progress of prices: {bookPosition.priceProgressList}")
self.logger.debug(f"The {orderType} event happened:")
self.logger.debug(f" - orderType: {orderType}")
self.logger.debug(f" - orderTag: {orderTag}")
self.logger.debug(f" - premium: ${bookPosition[f'{orderType}Premium']}")
self.logger.debug(f" - {orderType} price: ${round(execOrder.fillPrice, 2)}")
context.charting.plotTrade(bookPosition, orderType)
if orderType == "open":
# Trigger an update of the charts
context.statsUpdated = True
# Marks the date/time of the most recenlty opened position
context.lastOpenedDttm = context.Time
# Store the credit received (needed to determine the stop loss): value is per share (divided by 100)
execOrder.premium = bookPosition.openPremium / 100
# Check if the entire position has been closed
if orderType == "close" and bookPosition.openOrder.filled and bookPosition.closeOrder.filled:
# Compute P&L for the position
positionPnL = bookPosition.openPremium + bookPosition.closePremium
# Store the PnL for the position
bookPosition.PnL = positionPnL
# Now we can remove the position from the self.openPositions dictionary
context.openPositions.pop(orderTag)
# Compute the DTE at the time of closing the position
closeDte = (contract.Expiry.date() - context.Time.date()).days
# Collect closing trade info
closeTradeInfo = {"orderTag": orderTag, "closeDte": closeDte}
# Add this trade info to the FIFO list
context.recentlyClosedDTE.append(closeTradeInfo)
# ###########################
# Collect Performance metrics
# ###########################
context.charting.updateStats(bookPosition)
# Stop the timer
context.executionTimer.stop()
# ENDsection: handle order events from main.py
#region imports
from AlgorithmImports import *
#endregion
# Custom class: fills orders at the mid-price
class MidPriceFillModel(ImmediateFillModel):
def __init__(self, context):
self.context = context
def MarketFill(self, asset, order):
# Start the timer
self.context.executionTimer.start()
# Call the parent method
fill = super().MarketFill(asset, order)
# Compute the new fillPrice (at the mid-price)
fillPrice = round(0.5 * (asset.AskPrice + asset.BidPrice), 2)
# Update the FillPrice attribute
fill.FillPrice = fillPrice
# Stop the timer
self.context.executionTimer.stop()
# Return the fill
return fill
#region imports
from AlgorithmImports import *
#endregion
from Tools import Timer, Logger, DataHandler, Underlying, Charting
from Initialization import AlwaysBuyingPowerModel, BetaFillModel, TastyWorksFeeModel
"""
This class is used to setup the base structure of the algorithm in the main.py file.
It is used to setup the logger, the timer, the brokerage model, the security initializer, the
option chain filter function and the benchmark.
It is also used to schedule an event to get the underlying price at market open.
The class has chainable methods for Setup and AddUnderlying.
How to use it:
1. Import the class
2. Create an instance of the class in the Initialize method of the algorithm
3. Call the AddUnderlying method to add the underlying and the option chain to the algorithm
Example:
from Initialization import SetupBaseStructure
class Algorithm(QCAlgorithm):
def Initialize(self):
# Set the algorithm base variables and structures
self.structure = SetupBaseStructure(self)
self.structure.Setup()
# Add the alpha model and that will add the underlying and the option chain to the
# algorithm
self.SetAlpha(AlphaModel(self))
class AlphaModel:
def __init__(self, context):
# Store the context as a class variable
self.context = context
# Add the underlying and the option chain to the algorithm
self.context.structure.AddUnderlying(self, "SPX")
"""
class SetupBaseStructure:
# Default parameters
DEFAULT_PARAMETERS = {
"creditStrategy": True,
# -----------------------------
# THESE BELOW ARE GENERAL PARAMETERS
"backtestMarketCloseCutoffTime": time(15, 45, 0),
# Controls whether to include Cancelled orders (Limit orders that didn't fill) in the final output
"includeCancelledOrders": True,
# Risk Free Rate for the Black-Scholes-Merton model
"riskFreeRate": 0.001,
# Upside/Downside stress applied to the underlying to calculate the portfolio margin requirement of the position
"portfolioMarginStress": 0.12,
# Controls the memory (in minutes) of EMA process. The exponential decay
# is computed such that the contribution of each value decays by 95%
# after <emaMemory> minutes (i.e. decay^emaMemory = 0.05)
"emaMemory": 200,
}
# Initialize the algorithm
# The context is the class that contains all the variables that are shared across the different classes
def __init__(self, context):
# Store the context as a class variable
self.context = context
def Setup(self):
self.context.positions = {}
# Set the logger
self.context.logger = Logger(self.context, className=type(self.context).__name__, logLevel=self.context.logLevel)
# Set the timer to monitor the execution performance
self.context.executionTimer = Timer(self.context)
self.context.logger.debug(f'{self.__class__.__name__} -> Setup')
# Set brokerage model and margin account
self.context.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
# override security position group model
self.context.Portfolio.SetPositions(SecurityPositionGroupModel.Null)
# Set requested data resolution
self.context.universe_settings.resolution = self.context.timeResolution
# Keep track of the option contract subscriptions
self.context.optionContractsSubscriptions = []
# Set Security Initializer
self.context.SetSecurityInitializer(self.CompleteSecurityInitializer)
# Initialize the dictionary to keep track of all positions
self.context.allPositions = {}
# Dictionary to keep track of all open positions
self.context.openPositions = {}
# Create dictionary to keep track of all the working orders. It stores orderTags
self.context.workingOrders = {}
# Create FIFO list to keep track of all the recently closed positions (needed for the Dynamic DTE selection)
self.context.recentlyClosedDTE = []
# Keep track of when was the last position opened
self.context.lastOpenedDttm = None
# Keep track of all strategies instances. We mainly need this to filter through them in case
# we want to call some general method.
self.context.strategies = []
# Array to keep track of consolidators
self.context.consolidators = {}
# Dictionary to keep track of all leg details across time
self.positionTracking = {}
# Assign the DEFAULT_PARAMETERS
self.AddConfiguration(**SetupBaseStructure.DEFAULT_PARAMETERS)
self.SetBacktestCutOffTime()
# Set charting
self.context.charting = Charting(
self.context,
openPositions=False,
Stats=False,
PnL=False,
WinLossStats=False,
Performance=True,
LossDetails=False,
totalSecurities=False,
Trades=True
)
return self
# Called every time a security (Option or Equity/Index) is initialized
def CompleteSecurityInitializer(self, security: Security) -> None:
'''Initialize the security with raw prices'''
self.context.logger.debug(f"{self.__class__.__name__} -> CompleteSecurityInitializer -> Security: {security}")
# Disable buying power on the security: https://www.quantconnect.com/docs/v2/writing-algorithms/live-trading/trading-and-orders#10-Disable-Buying-Power
security.set_buying_power_model(BuyingPowerModel.NULL)
if self.context.LiveMode:
return
self.context.executionTimer.start()
security.SetDataNormalizationMode(DataNormalizationMode.Raw)
security.SetMarketPrice(self.context.GetLastKnownPrice(security))
# security.SetBuyingPowerModel(AlwaysBuyingPowerModel(self.context))
# override margin requirements
# security.SetBuyingPowerModel(ConstantBuyingPowerModel(1))
if security.Type == SecurityType.Equity:
# This is for stocks
security.VolatilityModel = StandardDeviationOfReturnsVolatilityModel(30)
history = self.context.History(security.Symbol, 31, Resolution.Daily)
if history.empty or 'close' not in history.columns:
self.context.executionTimer.stop()
return
for time, row in history.loc[security.Symbol].iterrows():
trade_bar = TradeBar(time, security.Symbol, row.open, row.high, row.low, row.close, row.volume)
security.VolatilityModel.Update(security, trade_bar)
elif security.Type in [SecurityType.Option, SecurityType.IndexOption]:
# This is for options.
security.SetFillModel(BetaFillModel(self.context))
# security.SetFillModel(MidPriceFillModel(self))
security.SetFeeModel(TastyWorksFeeModel())
security.PriceModel = OptionPriceModels.CrankNicolsonFD()
# security.set_option_assignment_model(NullOptionAssignmentModel())
if security.Type == SecurityType.IndexOption:
# disable option assignment. This is important for SPX but we disable for all for now.
security.SetOptionAssignmentModel(NullOptionAssignmentModel())
self.context.executionTimer.stop()
def ClearSecurity(self, security: Security) -> None:
"""
Remove any additional data or settings associated with the security.
"""
# Remove the security from the optionContractsSubscriptions dictionary
if security.Symbol in self.context.optionContractsSubscriptions:
self.context.optionContractsSubscriptions.remove(security.Symbol)
# Remove the security from the algorithm
self.context.RemoveSecurity(security.Symbol)
def SetBacktestCutOffTime(self) -> None:
# Determine what is the last trading day of the backtest
self.context.endOfBacktestCutoffDttm = None
if hasattr(self.context, "EndDate") and self.context.EndDate is not None:
self.context.endOfBacktestCutoffDttm = datetime.combine(self.context.lastTradingDay(self.context.EndDate), self.context.backtestMarketCloseCutoffTime)
def AddConfiguration(self, parent=None, **kwargs) -> None:
"""
Dynamically add attributes to the self.context object.
:param parent: Parent object to which the attributes will be added.
:param kwargs: Keyword arguments containing attribute names and their values.
"""
parent = parent or self.context
for attr_name, attr_value in kwargs.items():
setattr(parent, attr_name, attr_value)
# Add the underlying and the option chain to the algorithm. We define the number of strikes left and right,
# the dte and the dte window. These parameters are used in the option chain filter function.
# @param ticker [string]
def AddUnderlying(self, strategy, ticker):
self.context.strategies.append(strategy)
# Store the algorithm base variables
strategy.ticker = ticker
self.context.logger.debug(f"{self.__class__.__name__} -> AddUnderlying -> Ticker: {ticker}")
# Add the underlying and the option chain to the algorithm
strategy.dataHandler = DataHandler(self.context, ticker, strategy)
underlying = strategy.dataHandler.AddUnderlying(self.context.timeResolution)
# Set data normalization mode to Raw
underlying.SetDataNormalizationMode(DataNormalizationMode.Raw)
self.context.logger.debug(f"{self.__class__.__name__} -> AddUnderlying -> Underlying: {underlying}")
# Keep track of the option contract subscriptions
self.context.optionContractsSubscriptions = []
# Store the symbol for the option and the underlying
strategy.underlyingSymbol = underlying.Symbol
# REGION FOR USING SLICE INSTEAD OF PROVIDER
if strategy.useSlice:
option = strategy.dataHandler.AddOptionsChain(underlying, self.context.timeResolution)
# Set the option chain filter function
option.SetFilter(strategy.dataHandler.SetOptionFilter)
self.context.logger.debug(f"{self.__class__.__name__} -> AddUnderlying -> Option: {option}")
strategy.optionSymbol = option.Symbol
else:
strategy.optionSymbol = None
# Set the benchmark.
self.context.SetBenchmark(underlying.Symbol)
self.context.logger.debug(f"{self.__class__.__name__} -> AddUnderlying -> Benchmark: {self.context.Benchmark}")
# Creating a 5-minute consolidator.
# self.AddConsolidators(strategy.underlyingSymbol, 5)
# !IMPORTANT
# ! this schedule needs to happen only once on initialization. That means the method AddUnderlying
# ! needs to be called only once either in the main.py file or in the AlphaModel class.
self.context.Schedule.On(
self.context.DateRules.EveryDay(strategy.underlyingSymbol),
self.context.TimeRules.AfterMarketOpen(strategy.underlyingSymbol, minutesAfterOpen=1),
self.MarketOpenStructure
)
return self
def AddConsolidators(self, symbol, minutes=5):
consolidator = TradeBarConsolidator(timedelta(minutes=minutes))
# Subscribe to the DataConsolidated event
consolidator.DataConsolidated += self.onDataConsolidated
self.context.SubscriptionManager.AddConsolidator(symbol, consolidator)
self.context.consolidators[symbol] = consolidator
def onDataConsolidated(self, sender, bar):
for strategy in self.context.strategies:
# We don't have the underlying added yet, so we can't get the price.
if strategy.underlyingSymbol == None:
return
strategy.dataConsolidated(sender, bar)
self.context.charting.updateUnderlying(bar)
# NOTE: this is not needed anymore as we have another method in alpha that handles it.
def MarketOpenStructure(self):
"""
The MarketOpenStructure method is part of the SetupBaseStructure class, which is used to
set up the base structure of the algorithm in the main.py file. This specific method is
designed to be called at market open every day to update the price of the underlying
security. It first checks if the underlying symbol has been added to the context, and if
not, it returns without performing any action. If the underlying symbol is available, it
creates an instance of the Underlying class using the context and the symbol. Finally,
it updates the underlying price at the market open by calling the Price() method on the
Underlying instance.
Example:
Schedule the MarketOpenStructure method to be called at market open
self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.AfterMarketOpen(self.strategy.underlyingSymbol, 0), base_structure.MarketOpenStructure)
Other methods, like OnData, can now access the updated underlying price using self.context.underlyingPriceAtOpen
"""
for strategy in self.context.strategies:
# We don't have the underlying added yet, so we can't get the price.
if strategy.underlyingSymbol == None:
return
underlying = Underlying(self.context, strategy.underlyingSymbol)
strategy.underlyingPriceAtOpen = underlying.Price()
# This just clears the workingOrders that are supposed to be expired or unfilled. It can happen when an order is not filled
# for it to stay in check until next day. This will clear that out. Similar method to the monitor one.
def checkOpenPositions(self):
self.context.executionTimer.start()
# Iterate over all option contracts and remove the expired ones from the
for symbol, security in self.context.Securities.items():
# Check if the security is an option
if security.Type == SecurityType.Option and security.HasData:
# Check if the option has expired
if security.Expiry.date() < self.context.Time.date():
self.context.logger.debug(f" >>> EXPIRED SECURITY-----> Removing expired {security.Expiry.date()} option contract {security.Symbol} from the algorithm.")
# Remove the expired option contract
self.ClearSecurity(security)
# Remove the expired positions from the openPositions dictionary. These are positions that expired
# worthless or were closed before expiration.
for orderTag, orderId in list(self.context.openPositions.items()):
position = self.context.allPositions[orderId]
# Check if we need to cancel the order
if any(self.context.Time > leg.expiry for leg in position.legs):
# Remove this position from the list of open positions
self.context.charting.updateStats(position)
self.context.logger.debug(f" >>> EXPIRED POSITION-----> Removing expired position {orderTag} from the algorithm.")
self.context.openPositions.pop(orderTag)
# Remove the expired positions from the workingOrders dictionary. These are positions that expired
# without being filled completely.
for order in list(self.context.workingOrders.values()):
position = self.context.allPositions[order.orderId]
orderTag = position.orderTag
orderId = position.orderId
orderType = order.orderType
execOrder = position[f"{orderType}Order"]
# Check if we need to cancel the order
if self.context.Time > execOrder.limitOrderExpiryDttm or any(self.context.Time > leg.expiry for leg in position.legs):
self.context.logger.debug(f" >>> EXPIRED ORDER-----> Removing expired order {orderTag} from the algorithm.")
# Remove this position from the list of open positions
if orderTag in self.context.openPositions:
self.context.openPositions.pop(orderTag)
# Remove the cancelled position from the final output unless we are required to include it
if not self.context.includeCancelledOrders:
self.context.allPositions.pop(orderId)
# Remove the order from the self.context.workingOrders dictionary
if orderTag in self.context.workingOrders:
self.context.workingOrders.pop(orderTag)
# Mark the order as being cancelled
position.cancelOrder(self.context, orderType=orderType, message=f"order execution expiration or legs expired")
self.context.executionTimer.stop()
#region imports
from AlgorithmImports import *
#endregion
class TastyWorksFeeModel:
def GetOrderFee(self, parameters):
optionFee = min(10, parameters.Order.AbsoluteQuantity * 0.5)
transactionFee = parameters.Order.AbsoluteQuantity * 0.14
return OrderFee(CashAmount(optionFee + transactionFee, 'USD'))
#region imports from AlgorithmImports import * from .AlwaysBuyingPowerModel import AlwaysBuyingPowerModel from .BetaFillModel import BetaFillModel from .MidPriceFillModel import MidPriceFillModel from .TastyWorksFeeModel import TastyWorksFeeModel from .SetupBaseStructure import SetupBaseStructure from .HandleOrderEvents import HandleOrderEvents #endregion
#region imports
from AlgorithmImports import *
#endregion
from Initialization import SetupBaseStructure
from Strategy import WorkingOrder
from Tools import Underlying
class Base(RiskManagementModel):
DEFAULT_PARAMETERS = {
# The frequency (in minutes) with which each position is managed
"managePositionFrequency": 1,
# Profit Target Factor (Multiplier of the premium received/paid when the position was opened)
"profitTarget": 0.8,
# Stop Loss Multiplier, expressed as a function of the profit target (rather than the credit received)
# The position is closed (Market Order) if:
# Position P&L < -abs(openPremium) * stopLossMultiplier
# where:
# - openPremium is the premium received (positive) in case of credit strategies
# - openPremium is the premium paid (negative) in case of debit strategies
#
# Credit Strategies (i.e. $2 credit):
# - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $1 profit)
# - stopLossMultiplier = 2 * profitTarget (i.e. -abs(openPremium) * stopLossMultiplier = -abs(2) * 2 * 0.5 = -2 --> stop if P&L < -2$)
# Debit Strategies (i.e. $4 debit):
# - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $2 profit)
# - stopLossMultiplier < 1 (You can't lose more than the debit paid. i.e. stopLossMultiplier = 0.6 --> stop if P&L < -2.4$)
# self.stopLossMultiplier = 3 * self.profitTarget
# self.stopLossMultiplier = 0.6
"stopLossMultiplier": 1.9,
# Ensures that the Stop Loss does not exceed the theoretical loss. (Set to False for Credit Calendars)
"capStopLoss": True,
}
def __init__(self, context):
self.context = context
self.context.structure.AddConfiguration(parent=self, **self.getMergedParameters())
self.context.logger.debug(f"{self.__class__.__name__} -> __init__")
@classmethod
def getMergedParameters(cls):
# Merge the DEFAULT_PARAMETERS from both classes
return {**cls.DEFAULT_PARAMETERS, **getattr(cls, "PARAMETERS", {})}
@classmethod
def parameter(cls, key, default=None):
return cls.getMergedParameters().get(key, default)
# @param algorithm [QCAlgorithm] The algorithm argument that the methods receive is an instance of the base QCAlgorithm class, not your subclass of it.
# @param targets [List[PortfolioTarget]] The list of targets to be ordered
def ManageRisk(self, algorithm: QCAlgorithm, targets: List[PortfolioTarget]) -> List[PortfolioTarget]:
# Start the timer
self.context.executionTimer.start('Monitor.Base -> ManageRisk')
# We are basically ignoring the current portfolio targets to be assessed for risk
# and building our own based on the current open positions
targets = []
self.context.logger.debug(f"{self.__class__.__name__} -> ManageRisk -> start")
managePositionFrequency = max(self.managePositionFrequency, 1)
# Continue the processing only if we are at the specified schedule
if self.context.Time.minute % managePositionFrequency != 0:
return []
# Method to allow child classes access to the manageRisk method before any changes are made
self.preManageRisk()
self.context.logger.debug(f"{self.__class__.__name__} -> ManageRisk -> preManageRisk")
# Loop through all open positions
for orderTag, orderId in list(self.context.openPositions.items()):
# Skip this contract if in the meantime it has been removed by the onOrderEvent
if orderTag not in self.context.openPositions:
continue
self.context.logger.debug(f"{self.__class__.__name__} -> ManageRisk -> looping through open positions")
# Get the book position
bookPosition = self.context.allPositions[orderId]
# Get the order id
orderId = bookPosition.orderId
# Get the order tag
orderTag = bookPosition.orderTag
self.context.logger.debug(f"{self.__class__.__name__} -> ManageRisk -> looping through open positions -> orderTag: {orderTag}, orderId: {orderId}")
# Check if this is a fully filled position
if bookPosition.openOrder.filled is False:
continue
# Possible Scenarios:
# - Credit Strategy:
# -> openPremium > 0
# -> profitTarget <= 1
# -> stopLossMultiplier >= 1
# -> maxLoss = Depending on the strategy
# - Debit Strategy:
# -> openPremium < 0
# -> profitTarget >= 0
# -> stopLossMultiplier <= 1
# -> maxLoss = openPremium
# Get the current value of the position
bookPosition.getPositionValue(self.context)
# Extract the positionPnL (per share)
positionPnL = bookPosition.positionPnL
# Exit if the positionPnL is not available (bid-ask spread is too wide)
if positionPnL is None:
return []
bookPosition.updatePnLRange(self.context.Time.date(), positionPnL)
self.context.logger.debug(f"{self.__class__.__name__} -> ManageRisk -> looping through open positions -> orderTag: {orderTag}, orderId: {orderId} -> bookPosition: {bookPosition}")
# Special method to monitor the position and handle custom actions on it.
self.monitorPosition(bookPosition)
# Initialize the closeReason
closeReason = []
# Check if we've hit the stop loss threshold
stopLossFlg = self.checkStopLoss(bookPosition)
if stopLossFlg:
closeReason.append("Stop Loss trigger")
profitTargetFlg = self.checkProfitTarget(bookPosition)
if profitTargetFlg:
closeReason.append("Profit target")
# Check if we've hit the Dit threshold
hardDitStopFlg, softDitStopFlg = self.checkDitThreshold(bookPosition)
if hardDitStopFlg:
closeReason.append("Hard Dit cutoff")
elif softDitStopFlg:
closeReason.append("Soft Dit cutoff")
# Check if we've hit the Dte threshold
hardDteStopFlg, softDteStopFlg = self.checkDteThreshold(bookPosition)
if hardDteStopFlg:
closeReason.append("Hard Dte cutoff")
elif softDteStopFlg:
closeReason.append("Soft Dte cutoff")
# Check if this is the last trading day before expiration and we have reached the cutoff time
expiryCutoffFlg = self.checkMarketCloseCutoffDttm(bookPosition)
if expiryCutoffFlg:
closeReason.append("Expiration date cutoff")
# Check if this is the last trading day before expiration and we have reached the cutoff time
endOfBacktestCutoffFlg = self.checkEndOfBacktest()
if endOfBacktestCutoffFlg:
closeReason.append("End of backtest cutoff")
# Set the stopLossFlg = True to force a Market Order
stopLossFlg = True
# Check any custom condition from the strategy to determine closure.
shouldCloseFlg, customReasons = self.shouldClose(bookPosition)
if shouldCloseFlg:
closeReason.append(customReasons or "It should close from child")
# A custom method to handle
self.context.logger.debug(f"{self.__class__.__name__} -> ManageRisk -> looping through open positions -> orderTag: {orderTag}, orderId: {orderId} -> shouldCloseFlg: {shouldCloseFlg}, customReasons: {customReasons}")
# Update the stats of each contract
# TODO: add back this section
# if self.strategyParam("includeLegDetails") and self.context.Time.minute % self.strategyParam("legDatailsUpdateFrequency") == 0:
# for contract in position["contracts"]:
# self.updateContractStats(bookPosition, position, contract)
# if self.strategyParam("trackLegDetails"):
# underlyingPrice = self.context.GetLastKnownPrice(self.context.Securities[self.context.underlyingSymbol]).Price
# self.context.positionTracking[orderId][self.context.Time][f"{self.name}.underlyingPrice"] = underlyingPrice
# self.context.positionTracking[orderId][self.context.Time][f"{self.name}.PnL"] = positionPnL
# Check if we need to close the position
if (
profitTargetFlg # We hit the profit target
or stopLossFlg # We hit the stop loss (making sure we don't exceed the max loss in case of spreads)
or hardDteStopFlg # The position must be closed when reaching the DTE threshold (hard stop)
or softDteStopFlg # Soft DTE stop: close as soon as it is profitable
or hardDitStopFlg # The position must be closed when reaching the DIT threshold (hard stop)
or softDitStopFlg # Soft DIT stop: close as soon as it is profitable
or expiryCutoffFlg # This is the last trading day before expiration, we have reached the cutoff time
or endOfBacktestCutoffFlg # This is the last trading day before the end of the backtest -> Liquidate all positions
or shouldCloseFlg # This will be the flag that is defined by the child classes of monitor
):
# Close the position
targets = self.closePosition(bookPosition, closeReason, stopLossFlg=stopLossFlg)
# Stop the timer
self.context.executionTimer.stop('Monitor.Base -> ManageRisk')
return targets
"""
Method to allow child classes access to the manageRisk method before any changes are made
"""
def preManageRisk(self):
pass
"""
Special method to monitor the position and handle custom actions on it.
These actions can be:
- add a working order to open a hedge position to defend the current one
- add a working order to increase the size of the position to improve avg price
"""
def monitorPosition(self, position):
pass
"""
Another special method that should be ovewritten by child classes.
This method can look for indicators or other decisions to close the position.
"""
def shouldClose(self, position):
pass
def checkMarketCloseCutoffDttm(self, position):
if position.strategyParam('marketCloseCutoffTime') != None:
return self.context.Time >= position.expiryMarketCloseCutoffDttm(self.context)
else:
return False
def checkStopLoss(self, position):
# Get the Stop Loss multiplier
stopLossMultiplier = self.stopLossMultiplier
capStopLoss = self.capStopLoss
# Get the amount of credit received to open the position
openPremium = position.openOrder.premium
# Get the quantity used to open the position
positionQuantity = position.orderQuantity
# Maximum Loss (pre-computed at the time of creating the order)
maxLoss = position.openOrder.maxLoss * positionQuantity
if capStopLoss:
# Add the premium to compute the net loss
netMaxLoss = maxLoss + openPremium
else:
netMaxLoss = float("-Inf")
stopLoss = None
# Check if we are using a stop loss
if stopLossMultiplier is not None:
# Set the stop loss amount
stopLoss = -abs(openPremium) * stopLossMultiplier
# Extract the positionPnL (per share)
positionPnL = position.positionPnL
# Tolerance level, e.g., 0.05 for 5%
tolerance = 0.05
# Check if we've hit the stop loss threshold or are within the tolerance range
stopLossFlg = False
if stopLoss is not None and (netMaxLoss <= positionPnL <= stopLoss or netMaxLoss <= positionPnL <= stopLoss * (1 + tolerance)):
stopLossFlg = True
# Keep track of the midPrices of this order for faster debugging
position.priceProgressList.append(round(position.orderMidPrice, 2))
return stopLossFlg
def checkProfitTarget(self, position):
# Get the amount of credit received to open the position
openPremium = position.openOrder.premium
# Extract the positionPnL (per share)
positionPnL = position.positionPnL
# Get the target profit amount (if it has been set at the time of creating the order)
targetProfit = position.targetProfit
# Set the target profit amount if the above step returned no value
if targetProfit is None and self.profitTarget is not None:
targetProfit = abs(openPremium) * self.profitTarget
# Tolerance level, e.g., 0.05 for 5%
tolerance = 0.05
# Check if we hit the profit target or are within the tolerance range
profitTargetFlg = False
if targetProfit is not None and (positionPnL >= targetProfit or positionPnL >= targetProfit * (1 - tolerance)):
profitTargetFlg = True
return profitTargetFlg
def checkDitThreshold(self, position):
# Get the book position
bookPosition = self.context.allPositions[position.orderId]
# How many days has this position been in trade for
currentDit = (self.context.Time.date() - bookPosition.openFilledDttm.date()).days
hardDitStopFlg = False
softDitStopFlg = False
# Extract the positionPnL (per share)
positionPnL = position.positionPnL
# Check for DTE stop
if (
position.strategyParam("ditThreshold") is not None # The ditThreshold has been specified
and position.strategyParam("dte") > position.strategyParam("ditThreshold") # We are using the ditThreshold only if the open DTE was larger than the threshold
and currentDit >= position.strategyParam("ditThreshold") # We have reached the DTE threshold
):
# Check if this is a hard DTE cutoff
if (
position.strategyParam("forceDitThreshold") is True
or (position.strategyParam("hardDitThreshold") is not None and currentDit >= position.strategyParam("hardDitThreshold"))
):
hardDitStopFlg = True
# closeReason = closeReason or "Hard DIT cutoff"
# Check if this is a soft DTE cutoff
elif positionPnL >= 0:
softDitStopFlg = True
# closeReason = closeReason or "Soft DIT cutoff"
return hardDitStopFlg, softDitStopFlg
def checkDteThreshold(self, position):
hardDteStopFlg = False
softDteStopFlg = False
# Extract the positionPnL (per share)
positionPnL = position.positionPnL
# How many days to expiration are left for this position
currentDte = (position.expiry.date() - self.context.Time.date()).days
# Check for DTE stop
if (
position.strategyParam("dteThreshold") is not None # The dteThreshold has been specified
and position.strategyParam("dte") > position.strategyParam("dteThreshold") # We are using the dteThreshold only if the open DTE was larger than the threshold
and currentDte <= position.strategyParam("dteThreshold") # We have reached the DTE threshold
):
# Check if this is a hard DTE cutoff
if position.strategyParam("forceDteThreshold") is True:
hardDteStopFlg = True
# closeReason = closeReason or "Hard DTE cutoff"
# Check if this is a soft DTE cutoff
elif positionPnL >= 0:
softDteStopFlg = True
# closeReason = closeReason or "Soft DTE cutoff"
return hardDteStopFlg, softDteStopFlg
def checkEndOfBacktest(self):
if self.context.endOfBacktestCutoffDttm is not None and self.context.Time >= self.context.endOfBacktestCutoffDttm:
return True
return False
def closePosition(self, position, closeReason, stopLossFlg=False):
# Start the timer
self.context.executionTimer.start()
targets = []
# Get the context
context = self.context
# Get the strategy parameters
# parameters = self.parameters
# Get Order Id and expiration
orderId = position.orderId
# expiryStr = position.expiryStr
orderTag = position.orderTag
orderMidPrice = position.orderMidPrice
limitOrderPrice = position.limitOrderPrice
bidAskSpread = position.bidAskSpread
# Get the details currently open position
openPosition = context.openPositions[orderTag]
# Get the book position
bookPosition = context.allPositions[orderId]
# Get the last trading day before expiration
expiryLastTradingDay = bookPosition.expiryLastTradingDay(context)
# Get the date/time threshold by which the position must be closed (on the last trading day before expiration)
expiryMarketCloseCutoffDttm = None
if bookPosition.strategyParam("marketCloseCutoffTime") != None:
expiryMarketCloseCutoffDttm = bookPosition.expiryMarketCloseCutoffDttm(context)
# Get the contracts and their side
contracts = [l.contract for l in bookPosition.legs]
contractSide = bookPosition.contractSide
# Set the expiration threshold at 15:40 of the expiration date (but no later than the market close cut-off time).
expirationThreshold = None
if expiryMarketCloseCutoffDttm != None:
expirationThreshold = min(expiryLastTradingDay + timedelta(hours=15, minutes=40), expiryMarketCloseCutoffDttm + bookPosition.strategyParam("limitOrderExpiration"))
# Set the expiration date for the Limit order. Make sure it does not exceed the expiration threshold
limitOrderExpiryDttm = min(context.Time + bookPosition.strategyParam("limitOrderExpiration"), expirationThreshold)
else:
limitOrderExpiryDttm = min(context.Time + bookPosition.strategyParam("limitOrderExpiration"), expiryLastTradingDay + timedelta(hours=15, minutes=40))
# Determine if we are going to use a Limit Order
useLimitOrders = (
# Check if we are supposed to use Limit orders as a default
bookPosition.strategyParam("useLimitOrders")
# Make sure there is enough time left to expiration.
# Once we cross the expiration threshold (10 minutes from market close on the expiration day) we are going to submit a Market order
and (expirationThreshold is None or context.Time <= expirationThreshold)
# It's not a stop loss (stop losses are executed through a Market order)
and not stopLossFlg
)
# Determine if we are going to use a Market Order
useMarketOrders = not useLimitOrders
# Get the price of the underlying at the time of closing the position
priceAtClose = None
if context.Securities.ContainsKey(bookPosition.underlyingSymbol()):
if context.Securities[bookPosition.underlyingSymbol()] is not None:
priceAtClose = context.Securities[bookPosition.underlyingSymbol()].Close
else:
self.context.logger.warning("priceAtClose is None")
# Set the midPrice for the order to close
bookPosition.closeOrder.orderMidPrice = orderMidPrice
# Set the Limit order expiration.
bookPosition.closeOrder.limitOrderExpiryDttm = limitOrderExpiryDttm
# Set the timestamp when the closing order is created
bookPosition.closeDttm = context.Time
# Set the date when the closing order is created
bookPosition.closeDt = context.Time.strftime("%Y-%m-%d")
# Set the price of the underlying at the time of submitting the order to close
bookPosition.underlyingPriceAtOrderClose = priceAtClose
# Set the price of the underlying at the time of submitting the order to close:
# - This is the same as underlyingPriceAtOrderClose in case of Market Orders
# - In case of Limit orders, this is the actual price of the underlying at the time when the Limit Order was triggered (price is updated later by the manageLimitOrders method)
bookPosition.underlyingPriceAtClose = priceAtClose
# Set the mid-price of the position at the time of closing
bookPosition.closeOrderMidPrice = orderMidPrice
bookPosition.closeOrderMidPriceMin = orderMidPrice
bookPosition.closeOrderMidPriceMax = orderMidPrice
# Set the Limit Order price of the position at the time of closing
bookPosition.closeOrderLimitPrice = limitOrderPrice
bookPosition.closeOrder.limitOrderPrice = limitOrderPrice
# Set the close DTE
bookPosition.closeDTE = (bookPosition.expiry.date() - context.Time.date()).days
# Set the Days in Trade
bookPosition.DIT = (context.Time.date() - bookPosition.openFilledDttm.date()).days
# Set the close reason
bookPosition.closeReason = closeReason
if useMarketOrders:
# Log the parameters used to validate the order
self.context.logger.debug("Executing Market Order to close the position:")
self.context.logger.debug(f" - orderTag: {orderTag}")
self.context.logger.debug(f" - strikes: {[c.Strike for c in contracts]}")
self.context.logger.debug(f" - orderQuantity: {bookPosition.orderQuantity}")
self.context.logger.debug(f" - midPrice: {orderMidPrice}")
self.context.logger.debug(f" - bidAskSpread: {bidAskSpread}")
self.context.logger.debug(f" - closeReason: {closeReason}")
# Store the Bid-Ask spread at the time of executing the order
bookPosition["closeOrderBidAskSpread"] = bidAskSpread
# legs = []
# isComboOrder = len(contracts) > 1
if useMarketOrders:
position.limitOrder = False
elif useLimitOrders:
position.limitOrder = True
for leg in position.legs:
# Extract order parameters
symbol = leg.symbol
orderSide = leg.orderSide
# orderQuantity = leg.orderQuantity
# TODO: I'm not sure about this order side check here
if orderSide != 0:
targets.append(PortfolioTarget(symbol, orderSide))
# Submit the close orders
context.workingOrders[orderTag] = WorkingOrder(
targets=targets,
orderId=orderId,
useLimitOrder=useLimitOrders,
limitOrderPrice=limitOrderPrice,
orderType="close",
fills=0
)
# Stop the timer
context.executionTimer.stop()
return targets
# Optional: Be notified when securities change
def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
pass
#region imports
from AlgorithmImports import *
#endregion
from .Base import Base
from CustomIndicators import ATRLevels
from Tools import Underlying
from Strategy import WorkingOrder
class CCMonitor(Base):
DEFAULT_PARAMETERS = {
# The frequency (in minutes) with which each position is managed
"managePositionFrequency": 5,
# Profit Target Factor (Multiplier of the premium received/paid when the position was opened)
"profitTarget": 1.5,
# Stop Loss Multiplier, expressed as a function of the profit target (rather than the credit received)
# The position is closed (Market Order) if:
# Position P&L < -abs(openPremium) * stopLossMultiplier
# where:
# - openPremium is the premium received (positive) in case of credit strategies
# - openPremium is the premium paid (negative) in case of debit strategies
#
# Credit Strategies (i.e. $2 credit):
# - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $1 profit)
# - stopLossMultiplier = 2 * profitTarget (i.e. -abs(openPremium) * stopLossMultiplier = -abs(2) * 2 * 0.5 = -2 --> stop if P&L < -2$)
# Debit Strategies (i.e. $4 debit):
# - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $2 profit)
# - stopLossMultiplier < 1 (You can't lose more than the debit paid. i.e. stopLossMultiplier = 0.6 --> stop if P&L < -2.4$)
# self.stopLossMultiplier = 3 * self.profitTarget
# self.stopLossMultiplier = 0.6
"stopLossMultiplier": None,
# Ensures that the Stop Loss does not exceed the theoretical loss. (Set to False for Credit Calendars)
"capStopLoss": True,
}
def __init__(self, context):
# Call the Base class __init__ method
super().__init__(context)
self.fiveMinuteITM = {}
self.HODLOD = {}
self.triggerHODLOD = {}
# The dictionary of consolidators
self.consolidators = dict()
# self.ATRLevels = ATRLevels("ATRLevels", length = 14)
# EMAs for the 8, 21 and 34 periods
self.EMAs = {8: {}, 21: {}, 34: {}}
# self.stdDevs = {}
# Add a dictionary to keep track of whether the position reached 50% profit
self.reachedHalfProfit = {}
def monitorPosition(self, position):
pass
def shouldClose(self, position):
return False, None
#region imports
from AlgorithmImports import *
#endregion
from .Base import Base
from CustomIndicators import ATRLevels
from Tools import Underlying
from Strategy import WorkingOrder
class FPLMonitorModel(Base):
DEFAULT_PARAMETERS = {
# The frequency (in minutes) with which each position is managed
"managePositionFrequency": 1,
# Profit Target Factor (Multiplier of the premium received/paid when the position was opened)
"profitTarget": 0.5,
# Stop Loss Multiplier, expressed as a function of the profit target (rather than the credit received)
# The position is closed (Market Order) if:
# Position P&L < -abs(openPremium) * stopLossMultiplier
# where:
# - openPremium is the premium received (positive) in case of credit strategies
# - openPremium is the premium paid (negative) in case of debit strategies
#
# Credit Strategies (i.e. $2 credit):
# - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $1 profit)
# - stopLossMultiplier = 2 * profitTarget (i.e. -abs(openPremium) * stopLossMultiplier = -abs(2) * 2 * 0.5 = -2 --> stop if P&L < -2$)
# Debit Strategies (i.e. $4 debit):
# - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $2 profit)
# - stopLossMultiplier < 1 (You can't lose more than the debit paid. i.e. stopLossMultiplier = 0.6 --> stop if P&L < -2.4$)
# self.stopLossMultiplier = 3 * self.profitTarget
# self.stopLossMultiplier = 0.6
"stopLossMultiplier": 2,
# Ensures that the Stop Loss does not exceed the theoretical loss. (Set to False for Credit Calendars)
"capStopLoss": True,
}
def __init__(self, context):
# Call the Base class __init__ method
super().__init__(context)
self.fiveMinuteITM = {}
self.HODLOD = {}
self.triggerHODLOD = {}
# The dictionary of consolidators
self.consolidators = dict()
self.ATRLevels = ATRLevels("ATRLevels", length = 14)
# EMAs for the 8, 21 and 34 periods
self.EMAs = {8: {}, 21: {}, 34: {}}
# self.stdDevs = {}
# Add a dictionary to keep track of whether the position reached 50% profit
self.reachedHalfProfit = {}
def monitorPosition(self, position):
"""
TODO:
# These can be complementar
- check if 2x(1.0) premium was reached and increase position quantity
- check if 2.5x(1.5) premium was reached and increase position
"""
symbol = position.underlyingSymbol()
underlying = Underlying(self.context, position.underlyingSymbol())
# Check if any price in the priceProgressList reached 50% profit
if any(abs(price*100) / abs(position.openOrder.fillPrice*100) <= 0.5 for price in position.priceProgressList):
self.reachedHalfProfit[position.orderTag] = True
# Check if the price of the position reaches 1.0 premium (adding a buffer so we can try and get a fill at 1.0)
if any(price / abs(position.openOrder.fillPrice) >= 0.9 for price in position.priceProgressList):
# Increase the quantity by 50%
new_quantity = position.Quantity * 1.5
orderTag = position.orderTag
orderId = position.orderId
self.context.workingOrders[orderTag] = WorkingOrder(
orderId=orderId,
useLimitOrder=True,
orderType="update",
limitOrderPrice=1.0,
fills=0,
quantity=new_quantity
)
bar = underlying.Security().GetLastData()
stats = position.strategy.stats
if bar is not None:
high = bar.High
low = bar.Low
for period, emas in self.EMAs.items():
if symbol in emas:
ema = emas[symbol]
if ema.IsReady:
if low <= ema.Current.Value <= high:
# The price has touched the EMA
stats.touchedEMAs[symbol] = True
def shouldClose(self, position):
"""
TODO:
- check if ATR indicator has been breached and exit
- check if half premium was reached and close 50%-80% of position (not really possible now as we have a True/False return)
"""
score = 0
reason = ""
stats = position.strategy.stats
# Assign a score of 3 if 5m ITM threshold is met
if position.orderTag in self.fiveMinuteITM and self.fiveMinuteITM[position.orderTag]:
score += 3
reason = "5m ITM"
# Assign a score of 1 if HOD/LOD breach occurs
if position.orderTag in self.HODLOD and self.HODLOD[position.orderTag] and position.underlyingSymbol() in stats.touchedEMAs and stats.touchedEMAs[position.underlyingSymbol()]:
score += 1
reason = "HOD/LOD"
# Assign a score of 2 if the position reached 50% profit and is now at break-even or slight loss
if position.orderTag in self.reachedHalfProfit and self.reachedHalfProfit[position.orderTag] and position.positionPnL <= 0:
score += 2
reason = "Reached 50% profit"
# Return True if the total score is 3 or more
if score >= 3:
return True, reason
return False, ""
def preManageRisk(self):
# Check if it's time to plot and return if it's not the 1 hour mark
if self.context.Time.minute % 60 != 0:
return
# Plot ATR Levels on the "Underlying Price" chart
for i, level in enumerate(self.ATRLevels.BullLevels()[:3]):
self.context.Plot("Underlying Price", f"Bull Level {i+1}", level)
for i, level in enumerate(self.ATRLevels.BearLevels()[:3]):
self.context.Plot("Underlying Price", f"Bear Level {i+1}", level)
# Loop through all open positions
for _orderTag, orderId in list(self.context.openPositions.items()):
# Get the book position
bookPosition = self.context.allPositions[orderId]
# TODO:
# if price is 1.0x premium received, increase position quantity
# if price is 1.5x premium received, increase position quantity
return super().preManageRisk()
def on5MinuteData(self, sender: object, consolidated_bar: TradeBar) -> None:
"""
On a new 5m bar we check if we should close the position.
"""
# pass
for _orderTag, orderId in list(self.context.openPositions.items()):
# Get the book position
bookPosition = self.context.allPositions[orderId]
# if bookPosition.strategyId == "IronCondor":
# continue
# self.handleHODLOD(bookPosition, consolidated_bar)
self.handleFiveMinuteITM(bookPosition, consolidated_bar)
def on15MinuteData(self, sender: object, consolidated_bar: TradeBar) -> None:
for _orderTag, orderId in list(self.context.openPositions.items()):
# Get the book position
bookPosition = self.context.allPositions[orderId]
if bookPosition.strategyId == "IronCondor":
continue
self.handleHODLOD(bookPosition, consolidated_bar)
# pass
def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
super().OnSecuritiesChanged(algorithm, changes)
for security in changes.AddedSecurities:
if security.Type != SecurityType.Equity and security.Type != SecurityType.Index:
continue
self.context.logger.info(f"Adding consolidator for {security.Symbol}")
self.consolidators[security.Symbol] = []
# Creating a 5-minute consolidator.
consolidator5m = TradeBarConsolidator(timedelta(minutes=5))
consolidator5m.DataConsolidated += self.on5MinuteData
self.context.SubscriptionManager.AddConsolidator(security.Symbol, consolidator5m)
self.consolidators[security.Symbol].append(consolidator5m)
# Creating a 15-minute consolidator.
consolidator15m = TradeBarConsolidator(timedelta(minutes=15))
consolidator15m.DataConsolidated += self.on15MinuteData
self.context.SubscriptionManager.AddConsolidator(security.Symbol, consolidator15m)
self.consolidators[security.Symbol].append(consolidator15m)
# Creating the Daily ATRLevels indicator
self.context.RegisterIndicator(security.Symbol, self.ATRLevels, Resolution.Daily)
self.context.WarmUpIndicator(security.Symbol, self.ATRLevels, Resolution.Daily)
# Creating the EMAs
for period in self.EMAs.keys():
ema = ExponentialMovingAverage(period)
self.EMAs[period][security.Symbol] = ema
self.context.RegisterIndicator(security.Symbol, ema, consolidator15m)
# Creating the Standard Deviation indicator
# self.stdDevs[security.Symbol] = StandardDeviation(20)
# self.context.RegisterIndicator(security.Symbol, self.stdDevs[security.Symbol], consolidator5m)
# NOTE: commented out as for some reason in the middle of the backtest SPX is removed from the universe???!
# for security in changes.RemovedSecurities:
# if security.Type != SecurityType.Equity and security.Type != SecurityType.Index:
# continue
# if security.Symbol not in self.consolidators:
# continue
# self.context.logger.info(f"Removing consolidator for {security.Symbol}")
# consolidator = self.consolidators.pop(security.Symbol)
# self.context.SubscriptionManager.RemoveConsolidator(security.Symbol, consolidator)
# consolidator.DataConsolidated -= self.onFiveMinuteData
def handleHODLOD(self, bookPosition, consolidated_bar):
stats = bookPosition.strategy.stats
# Get the high/low of the day before the update
highOfDay = stats.highOfTheDay
lowOfDay = stats.lowOfTheDay
# currentDay = self.context.Time.date()
if bookPosition.orderTag not in self.triggerHODLOD:
self.triggerHODLOD[bookPosition.orderTag] = RollingWindow[bool](2) # basically wait 25 minutes before triggering
if bookPosition.strategyId == 'CallCreditSpread' and consolidated_bar.Close > highOfDay:
self.triggerHODLOD[bookPosition.orderTag].Add(True)
elif bookPosition.strategyId == "PutCreditSpread" and consolidated_bar.Close < lowOfDay:
self.triggerHODLOD[bookPosition.orderTag].Add(True)
if bookPosition.orderTag in self.triggerHODLOD:
# Check if all values are True and the RollingWindow is full
if all(self.triggerHODLOD[bookPosition.orderTag]) and self.triggerHODLOD[bookPosition.orderTag].IsReady:
self.HODLOD[bookPosition.orderTag] = True
def handleFiveMinuteITM(self, bookPosition, consolidated_bar):
soldLeg = [leg for leg in bookPosition.legs if leg.isSold][0]
# Check if we should close the position
if bookPosition.strategyId == 'CallCreditSpread' and consolidated_bar.Close > soldLeg.strike:
self.fiveMinuteITM[bookPosition.orderTag] = True
elif bookPosition.strategyId == "PutCreditSpread" and consolidated_bar.Close < soldLeg.strike:
self.fiveMinuteITM[bookPosition.orderTag] = True
#region imports
from AlgorithmImports import *
#endregion
from .Base import Base
class HedgeRiskManagementModel(Base):
def __init__(self, context):
# Call the Base class __init__ method
super().__init__(context)
# region imports
from AlgorithmImports import *
# endregion
from .Base import Base
from CustomIndicators import ATRLevels
from Tools import Underlying
from Strategy import WorkingOrder
class IBSMonitor(Base):
DEFAULT_PARAMETERS = {
# The frequency (in minutes) with which each position is managed
"managePositionFrequency": 5,
# Profit Target Factor (Multiplier of the premium received/paid when the position was opened)
"profitTarget": 1.2,
# Stop Loss Multiplier, expressed as a function of the profit target (rather than the credit received)
# The position is closed (Market Order) if:
# Position P&L < -abs(openPremium) * stopLossMultiplier
# where:
# - openPremium is the premium received (positive) in case of credit strategies
# - openPremium is the premium paid (negative) in case of debit strategies
#
# Credit Strategies (i.e. $2 credit):
# - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $1 profit)
# - stopLossMultiplier = 2 * profitTarget (i.e. -abs(openPremium) * stopLossMultiplier = -abs(2) * 2 * 0.5 = -2 --> stop if P&L < -2$)
# Debit Strategies (i.e. $4 debit):
# - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $2 profit)
# - stopLossMultiplier < 1 (You can't lose more than the debit paid. i.e. stopLossMultiplier = 0.6 --> stop if P&L < -2.4$)
# self.stopLossMultiplier = 3 * self.profitTarget
# self.stopLossMultiplier = 0.6
"stopLossMultiplier": 2,
# Ensures that the Stop Loss does not exceed the theoretical loss. (Set to False for Credit Calendars)
"capStopLoss": True,
}
def __init__(self, context):
# Call the Base class __init__ method
super().__init__(context)
self.fiveMinuteITM = {}
self.HODLOD = {}
self.triggerHODLOD = {}
# The dictionary of consolidators
self.consolidators = dict()
# self.ATRLevels = ATRLevels("ATRLevels", length = 14)
# EMAs for the 8, 21 and 34 periods
self.EMAs = {8: {}, 21: {}, 34: {}}
# self.stdDevs = {}
# Add a dictionary to keep track of whether the position reached 50% profit
self.reachedHalfProfit = {}
def monitorPosition(self, position):
pass
def shouldClose(self, position):
return False, None
#region imports
from AlgorithmImports import *
#endregion
from .Base import Base
class NoStopLossModel(Base):
DEFAULT_PARAMETERS = {
# The frequency (in minutes) with which each position is managed
"managePositionFrequency": 1,
# Profit Target Factor (Multiplier of the premium received/paid when the position was opened)
"profitTarget": 0.9,
# Stop Loss Multiplier, expressed as a function of the profit target (rather than the credit received)
# The position is closed (Market Order) if:
# Position P&L < -abs(openPremium) * stopLossMultiplier
# where:
# - openPremium is the premium received (positive) in case of credit strategies
# - openPremium is the premium paid (negative) in case of debit strategies
#
# Credit Strategies (i.e. $2 credit):
# - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $1 profit)
# - stopLossMultiplier = 2 * profitTarget (i.e. -abs(openPremium) * stopLossMultiplier = -abs(2) * 2 * 0.5 = -2 --> stop if P&L < -2$)
# Debit Strategies (i.e. $4 debit):
# - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $2 profit)
# - stopLossMultiplier < 1 (You can't lose more than the debit paid. i.e. stopLossMultiplier = 0.6 --> stop if P&L < -2.4$)
# self.stopLossMultiplier = 3 * self.profitTarget
# self.stopLossMultiplier = 0.6
"stopLossMultiplier": None,
# Ensures that the Stop Loss does not exceed the theoretical loss. (Set to False for Credit Calendars)
"capStopLoss": True,
}
def __init__(self, context):
# Call the Base class __init__ method
super().__init__(context)
#region imports
from AlgorithmImports import *
#endregion
from .Base import Base
from CustomIndicators import ATRLevels
from Tools import Underlying
from Strategy import WorkingOrder
class SPXButterflyMonitor(Base):
DEFAULT_PARAMETERS = {
# The frequency (in minutes) with which each position is managed
"managePositionFrequency": 5,
# Profit Target Factor (Multiplier of the premium received/paid when the position was opened)
"profitTarget": 1,
# Stop Loss Multiplier, expressed as a function of the profit target (rather than the credit received)
# The position is closed (Market Order) if:
# Position P&L < -abs(openPremium) * stopLossMultiplier
# where:
# - openPremium is the premium received (positive) in case of credit strategies
# - openPremium is the premium paid (negative) in case of debit strategies
#
# Credit Strategies (i.e. $2 credit):
# - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $1 profit)
# - stopLossMultiplier = 2 * profitTarget (i.e. -abs(openPremium) * stopLossMultiplier = -abs(2) * 2 * 0.5 = -2 --> stop if P&L < -2$)
# Debit Strategies (i.e. $4 debit):
# - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $2 profit)
# - stopLossMultiplier < 1 (You can't lose more than the debit paid. i.e. stopLossMultiplier = 0.6 --> stop if P&L < -2.4$)
# self.stopLossMultiplier = 3 * self.profitTarget
# self.stopLossMultiplier = 0.6
"stopLossMultiplier": 1.9,
# Ensures that the Stop Loss does not exceed the theoretical loss. (Set to False for Credit Calendars)
"capStopLoss": True,
}
def __init__(self, context):
# Call the Base class __init__ method
super().__init__(context)
self.fiveMinuteITM = {}
self.HODLOD = {}
self.triggerHODLOD = {}
# The dictionary of consolidators
self.consolidators = dict()
# self.ATRLevels = ATRLevels("ATRLevels", length = 14)
# EMAs for the 8, 21 and 34 periods
self.EMAs = {8: {}, 21: {}, 34: {}}
# self.stdDevs = {}
# Add a dictionary to keep track of whether the position reached 50% profit
self.reachedHalfProfit = {}
def monitorPosition(self, position):
pass
def shouldClose(self, position):
"""
TODO:
- check if ATR indicator has been breached and exit
- check if half premium was reached and close 50%-80% of position (not really possible now as we have a True/False return)
"""
score = 0
reason = ""
stats = position.strategy.stats
# Assign a score of 3 if 5m ITM threshold is met
# if position.orderTag in self.fiveMinuteITM and self.fiveMinuteITM[position.orderTag]:
# score += 3
# reason = "5m ITM"
# Assign a score of 1 if HOD/LOD breach occurs
# if position.orderTag in self.HODLOD and self.HODLOD[position.orderTag] and position.underlyingSymbol() in stats.touchedEMAs and stats.touchedEMAs[position.underlyingSymbol()]:
# score += 1
# reason = "HOD/LOD"
# Assign a score of 2 if the position reached 50% profit and is now at break-even or slight loss
# if position.orderTag in self.reachedHalfProfit and self.reachedHalfProfit[position.orderTag] and position.positionPnL <= 0:
# score += 2
# reason = "Reached 50% profit"
# Return True if the total score is 3 or more
# if score >= 3:
# return True, reason
return False, ""
#region imports
from AlgorithmImports import *
#endregion
from .Base import Base
from CustomIndicators import ATRLevels
from Tools import Underlying
from Strategy import WorkingOrder
class SPXCondorMonitor(Base):
DEFAULT_PARAMETERS = {
# The frequency (in minutes) with which each position is managed
"managePositionFrequency": 5,
# Profit Target Factor (Multiplier of the premium received/paid when the position was opened)
"profitTarget": 1.2,
# Stop Loss Multiplier, expressed as a function of the profit target (rather than the credit received)
# The position is closed (Market Order) if:
# Position P&L < -abs(openPremium) * stopLossMultiplier
# where:
# - openPremium is the premium received (positive) in case of credit strategies
# - openPremium is the premium paid (negative) in case of debit strategies
#
# Credit Strategies (i.e. $2 credit):
# - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $1 profit)
# - stopLossMultiplier = 2 * profitTarget (i.e. -abs(openPremium) * stopLossMultiplier = -abs(2) * 2 * 0.5 = -2 --> stop if P&L < -2$)
# Debit Strategies (i.e. $4 debit):
# - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $2 profit)
# - stopLossMultiplier < 1 (You can't lose more than the debit paid. i.e. stopLossMultiplier = 0.6 --> stop if P&L < -2.4$)
# self.stopLossMultiplier = 3 * self.profitTarget
# self.stopLossMultiplier = 0.6
"stopLossMultiplier": 2,
# Ensures that the Stop Loss does not exceed the theoretical loss. (Set to False for Credit Calendars)
"capStopLoss": True,
}
def __init__(self, context):
# Call the Base class __init__ method
super().__init__(context)
self.fiveMinuteITM = {}
self.HODLOD = {}
self.triggerHODLOD = {}
# The dictionary of consolidators
self.consolidators = dict()
# self.ATRLevels = ATRLevels("ATRLevels", length = 14)
# EMAs for the 8, 21 and 34 periods
self.EMAs = {8: {}, 21: {}, 34: {}}
# self.stdDevs = {}
# Add a dictionary to keep track of whether the position reached 50% profit
self.reachedHalfProfit = {}
def monitorPosition(self, position):
pass
def shouldClose(self, position):
return False, None
#region imports
from AlgorithmImports import *
#endregion
from .Base import Base
from CustomIndicators import ATRLevels
from Tools import Underlying
from Strategy import WorkingOrder
class SPXicMonitor(Base):
DEFAULT_PARAMETERS = {
# The frequency (in minutes) with which each position is managed
"managePositionFrequency": 1,
# Profit Target Factor (Multiplier of the premium received/paid when the position was opened)
"profitTarget": 1.5,
# Stop Loss Multiplier, expressed as a function of the profit target (rather than the credit received)
# The position is closed (Market Order) if:
# Position P&L < -abs(openPremium) * stopLossMultiplier
# where:
# - openPremium is the premium received (positive) in case of credit strategies
# - openPremium is the premium paid (negative) in case of debit strategies
#
# Credit Strategies (i.e. $2 credit):
# - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $1 profit)
# - stopLossMultiplier = 2 * profitTarget (i.e. -abs(openPremium) * stopLossMultiplier = -abs(2) * 2 * 0.5 = -2 --> stop if P&L < -2$)
# Debit Strategies (i.e. $4 debit):
# - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $2 profit)
# - stopLossMultiplier < 1 (You can't lose more than the debit paid. i.e. stopLossMultiplier = 0.6 --> stop if P&L < -2.4$)
# self.stopLossMultiplier = 3 * self.profitTarget
# self.stopLossMultiplier = 0.6
"stopLossMultiplier": 1.2,
# Ensures that the Stop Loss does not exceed the theoretical loss. (Set to False for Credit Calendars)
"capStopLoss": True,
}
def __init__(self, context):
# Call the Base class __init__ method
super().__init__(context)
self.fiveMinuteITM = {}
self.HODLOD = {}
self.triggerHODLOD = {}
# The dictionary of consolidators
self.consolidators = dict()
# self.ATRLevels = ATRLevels("ATRLevels", length = 14)
# EMAs for the 8, 21 and 34 periods
self.EMAs = {8: {}, 21: {}, 34: {}}
# self.stdDevs = {}
# Add a dictionary to keep track of whether the position reached 50% profit
self.reachedHalfProfit = {}
def monitorPosition(self, position):
pass
def shouldClose(self, position):
return False, None
#region imports
from AlgorithmImports import *
#endregion
from .Base import Base
class StopLossModel(Base):
DEFAULT_PARAMETERS = {
# The frequency (in minutes) with which each position is managed
"managePositionFrequency": 1,
# Profit Target Factor (Multiplier of the premium received/paid when the position was opened)
"profitTarget": 0.7,
# Stop Loss Multiplier, expressed as a function of the profit target (rather than the credit received)
# The position is closed (Market Order) if:
# Position P&L < -abs(openPremium) * stopLossMultiplier
# where:
# - openPremium is the premium received (positive) in case of credit strategies
# - openPremium is the premium paid (negative) in case of debit strategies
#
# Credit Strategies (i.e. $2 credit):
# - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $1 profit)
# - stopLossMultiplier = 2 * profitTarget (i.e. -abs(openPremium) * stopLossMultiplier = -abs(2) * 2 * 0.5 = -2 --> stop if P&L < -2$)
# Debit Strategies (i.e. $4 debit):
# - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $2 profit)
# - stopLossMultiplier < 1 (You can't lose more than the debit paid. i.e. stopLossMultiplier = 0.6 --> stop if P&L < -2.4$)
# self.stopLossMultiplier = 3 * self.profitTarget
# self.stopLossMultiplier = 0.6
"stopLossMultiplier": 2.0,
# Ensures that the Stop Loss does not exceed the theoretical loss. (Set to False for Credit Calendars)
"capStopLoss": True,
}
def __init__(self, context):
# Call the Base class __init__ method
super().__init__(context)
#region imports from AlgorithmImports import * #endregion # Your New Python File from .HedgeRiskManagementModel import HedgeRiskManagementModel from .NoStopLossModel import NoStopLossModel from .StopLossModel import StopLossModel from .FPLMonitorModel import FPLMonitorModel from .SPXicMonitor import SPXicMonitor from .CCMonitor import CCMonitor from .SPXButterflyMonitor import SPXButterflyMonitor from .SPXCondorMonitor import SPXCondorMonitor from .IBSMonitor import IBSMonitor
#region imports
from AlgorithmImports import *
#endregion
from Tools import Helper
# https://www.quantconnect.com/docs/v2/writing-algorithms/algorithm-framework/portfolio-construction/key-concepts
# Portfolio construction scaffolding class; basic method args.
class Base(PortfolioConstructionModel):
def __init__(self, context):
self.context = context
self.context.logger.debug(f"{self.__class__.__name__} -> __init__")
# Create list of PortfolioTarget objects from Insights
def CreateTargets(self, algorithm: QCAlgorithm, insights: List[Insight]) -> List[PortfolioTarget]:
# super().CreateTargets(algorithm, insights)
targets = []
for insight in insights:
self.context.logger.debug(f'Insight: {insight.Id}')
# Let's find the order that this insight belongs to
order = Helper().findIn(
self.context.workingOrders.values(),
lambda v: any(i.Id == insight.Id for i in v.insights))
position = self.context.allPositions[order.orderId]
target = PortfolioTarget(insight.Symbol, insight.Direction * position.orderQuantity)
self.context.logger.debug(f'Target: {target.Symbol} {target.Quantity}')
order.targets.append(target)
targets.append(target)
return targets
# Determines if the portfolio should rebalance based on the provided rebalancing func
# def IsRebalanceDue(self, insights: List[Insight], algorithmUtc: datetime) -> bool:
# return True
# # Determines the target percent for each insight
# def DetermineTargetPercent(self, activeInsights: List[Insight]) -> Dict[Insight, float]:
# return {}
# # Gets the target insights to calculate a portfolio target percent for, they will be piped to DetermineTargetPercent()
# def GetTargetInsights(self) -> List[Insight]:
# return []
# # Determine if the portfolio construction model should create a target for this insight
# def ShouldCreateTargetForInsight(self, insight: Insight) -> bool:
# return True
# OPTIONAL: Security change details
# def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
# # Security additions and removals are pushed here.
# # This can be used for setting up algorithm state.
# # changes.AddedSecurities:
# # changes.RemovedSecurities:
# pass
#region imports
from AlgorithmImports import *
#endregion
from .Base import Base
class OptionsPortfolioConstruction(Base):
def __init__(self, context):
# Call the Base class __init__ method
super().__init__(context)
#region imports
from AlgorithmImports import *
#endregion
# https://www.quantconnect.com/docs/v2/writing-algorithms/algorithm-framework/portfolio-construction/key-concepts
# Portfolio construction scaffolding class; basic method args.
class OptionsPortfolioConstructionModel(PortfolioConstructionModel):
def __init__(self, context):
pass
# Create list of PortfolioTarget objects from Insights
def CreateTargets(self, algorithm: QCAlgorithm, insights: List[Insight]) -> List[PortfolioTarget]:
return []
# Determines if the portfolio should rebalance based on the provided rebalancing func
def IsRebalanceDue(self, insights: List[Insight], algorithmUtc: datetime) -> bool:
return True
# Determines the target percent for each insight
def DetermineTargetPercent(self, activeInsights: List[Insight]) -> Dict[Insight, float]:
return {}
# Gets the target insights to calculate a portfolio target percent for, they will be piped to DetermineTargetPercent()
def GetTargetInsights(self) -> List[Insight]:
return []
# Determine if the portfolio construction model should create a target for this insight
def ShouldCreateTargetForInsight(self, insight: Insight) -> bool:
return True
# OPTIONAL: Security change details
def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
# Security additions and removals are pushed here.
# This can be used for setting up algorithm state.
# changes.AddedSecurities:
# changes.RemovedSecurities:
pass
#region imports from AlgorithmImports import * #endregion # Your New Python File from .OptionsPortfolioConstruction import OptionsPortfolioConstruction
#region imports
from AlgorithmImports import *
#endregion
import matplotlib.pyplot as plt
import mplfinance
import numpy as np
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()
# Your New Python File
class Charting:
def __init__(self, data, symbol = None):
self.data = data
self.symbol = symbol
def plot(self):
mplfinance.plot(self.data,
type='candle',
style='charles',
title=f'{self.symbol.Value if self.symbol else "General"} OHLC',
ylabel='Price ($)',
figratio=(15, 10))#region imports from AlgorithmImports import * #endregion # Your New Python File from .Charting import Charting
#region imports
from AlgorithmImports import *
#endregion
import dataclasses
from dataclasses import dataclass, field
from operator import attrgetter
from typing import Dict, List, Optional
from Tools import ContractUtils
import importlib
from Tools import Helper, ContractUtils, Logger, Underlying
"""
Use it like this:
position_key = "some_key" # Replace with an appropriate key
position_data = Position(orderId="12345", orderTag="SPX_Put", Strategy="CreditPutSpread", StrategyTag="CPS", expiryStr="20220107", openDttm="2022-01-07 09:30:00", openDt="2022-01-07", openDTE=0, targetPremium=500, orderQuantity=1, maxOrderQuantity=5, openOrderMidPrice=10.0, openOrderMidPriceMin=9.0, openOrderMidPriceMax=11.0, openOrderBidAskSpread=1.0, openOrderLimitPrice=10.0, underlyingPriceAtOpen=4500.0)
# Create Leg objects for sold and bought options
sold_put_leg = Leg(leg_type="SoldPut", option_symbol="SPXW220107P4500", quantity=-1, strike=4500, expiry="20220107")
bought_put_leg = Leg(leg_type="BoughtPut", option_symbol="SPXW220107P4490", quantity=1, strike=4490, expiry="20220107")
# Add the Leg objects to the Position's legs attribute
position_data.legs.extend([sold_put_leg, bought_put_leg])
# Add the Position to the self.positions dictionary
self.positions[position_key] = position_data
"""
@dataclass
class _ParentBase:
# With the __getitem__ and __setitem__ methods here we are transforming the
# dataclass into a regular dict. This method is to allow getting fields using ["field"]
def __getitem__(self, key):
return super().__getattribute__(key)
def __setitem__(self, key, value):
return super().__setattr__(key, value)
r"""Skip default fields in :func:`~dataclasses.dataclass`
:func:`object representation <repr()>`.
Notes
-----
Credit: Pietro Oldrati, 2022-05-08, Unilicense
https://stackoverflow.com/a/72161437/1396928
"""
def __repr__(self):
"""Omit default fields in object representation."""
nodef_f_vals = (
(f.name, attrgetter(f.name)(self))
for f in dataclasses.fields(self)
if attrgetter(f.name)(self) != f.default
)
nodef_f_repr = ", ".join(f"{name}={value}" for name, value in nodef_f_vals)
return f"{self.__class__.__name__}({nodef_f_repr})"
# recursive method that checks the fields of each dataclass and calls asdict if we have another dataclass referenced
# otherwise it just builds a dictionary and assigns the values and keys.
def asdict(self):
result = {}
for f in dataclasses.fields(self):
fieldValue = attrgetter(f.name)(self)
if isinstance(fieldValue, dict):
result[f.name] = {}
for k, v in fieldValue.items():
if hasattr(type(v), "__dataclass_fields__"):
result[f.name][k] = v.asdict()
else:
result[f.name][k] = v
elif hasattr(type(fieldValue), "__dataclass_fields__"):
result[f.name] = fieldValue.asdict()
else:
if fieldValue != f.default: result[f.name] = fieldValue
return result
@dataclass
class WorkingOrder(_ParentBase):
positionKey: str = ""
insights: List[Insight] = field(default_factory=list)
targets: List[PortfolioTarget] = field(default_factory=list)
orderId: str = ""
strategy: str = "" # Ex: FPLModel actual class
strategyTag: str = "" # Ex: FPLModel
orderType: str = ""
fills: int = 0
useLimitOrder: bool = True
limitOrderPrice: float = 0.0
lastRetry: Optional[datetime.date] = None
fillRetries: int = 0 # number retries to get a fill
@dataclass
class Leg(_ParentBase):
key: str = ""
expiry: Optional[datetime.date] = None
contractSide: int = 0 # TODO: this one i think would be the one to use instead of self.contractSide
symbol: str = ""
quantity: int = 0
strike: float = 0.0
contract: OptionContract = None
# attributes used for order placement
# orderSide: int # TODO: also this i'm not sure what it brings as i can use contractSide.
# orderQuantity: int
# limitPrice: float
@property
def isCall(self):
return self.contract.Right == OptionRight.Call
@property
def isPut(self):
return self.contract.Right == OptionRight.Put
@property
def isSold(self):
return self.contractSide == -1
@property
def isBought(self):
return self.contractSide == 1
@dataclass
class OrderType(_ParentBase):
premium: float = 0.0
fills: int = 0
limitOrderExpiryDttm: str = ""
limitOrderPrice: float = 0.0
bidAskSpread: float = 0.0
midPrice: float = 0.0
midPriceMin: float = 0.0
midPriceMax: float = 0.0
limitPrice: float = 0.0
fillPrice: float = 0.0
openPremium: float = 0.0
stalePrice: bool = False
filled: bool = False
maxLoss: float = 0.0
transactionIds: List[int] = field(default_factory=list)
priceProgressList: List[float] = field(default_factory=list)
@dataclass
class Position(_ParentBase):
"""
The position class should have a structure to hold data and attributes that define it's functionality. Like what the target premium should be or what the slippage should be.
"""
# These are structural attributes that never change.
orderId: str = "" # Ex: 1
orderTag: str = "" # Ex: PutCreditSpread-1
strategy: str = "" # Ex: FPLModel actual class
strategyTag: str = "" # Ex: FPLModel
strategyId: str = "" # Ex: PutCreditSpread, IronCondor
expiryStr: str = ""
expiry: Optional[datetime.date] = None
linkedOrderTag: str = ""
targetPremium: float = 0.0
orderQuantity: int = 0
maxOrderQuantity: int = 0
targetProfit: Optional[float] = None
legs: List[Leg] = field(default_factory=list)
contractSide: Dict[str, int] = field(default_factory=dict)
# These are attributes that change based on the position's lifecycle.
# The first set of attributes are set when the position is opened.
# Attributes that hold data about the order type
openOrder: OrderType = field(default_factory=OrderType)
closeOrder: OrderType = field(default_factory=OrderType)
# Open attributes that will be set when the position is opened.
openDttm: str = ""
openDt: str = ""
openDTE: int = 0
openOrderMidPrice: float = 0.0
openOrderMidPriceMin: float = 0.0
openOrderMidPriceMax: float = 0.0
openOrderBidAskSpread: float = 0.0
openOrderLimitPrice: float = 0.0
openPremium: float = 0.0
underlyingPriceAtOpen: float = 0.0
openFilledDttm: float = 0.0
openStalePrice: bool = False
# Attributes that hold the current state of the position
orderMidPrice: float = 0.0
limitOrderPrice: float = 0.0
bidAskSpread: float = 0.0
positionPnL: float = 0.0
# Close attributes that will be set when the position is closed.
closeDttm: str = ""
closeDt: str = ""
closeDTE: float = float("NaN")
closeOrderMidPrice: float = 0.0
closeOrderMidPriceMin: float = 0.0
closeOrderMidPriceMax: float = 0.0
closeOrderBidAskSpread: float = float("NaN")
closeOrderLimitPrice: float = 0.0
closePremium: float = 0.0
underlyingPriceAtClose: float = float("NaN")
underlyingPriceAtOrderClose: float = float("NaN")
DIT: int = 0 # days in trade
closeStalePrice: bool = False
closeReason: List[str] = field(default_factory=list, init=False)
# Other attributes that will hold the P&L and other stats.
PnL: float = 0.0
PnLMin: float = 0.0
PnLMax: float = 0.0
PnLMinDIT: float = 0.0
PnLMaxDIT: float = 0.0
# Attributes that determine the status of the position.
orderCancelled: bool = False
filled: bool = False
limitOrder: bool = False # True if we want the order to be a limit order when it is placed.
priceProgressList: List[float] = field(default_factory=list)
def underlyingSymbol(self):
if not self.legs:
raise ValueError(f"Missing legs/contracts")
contracts = [v.symbol for v in self.legs]
return contracts[0].Underlying
def strategyModule(self):
try:
strategy_module = importlib.import_module(f'Alpha.{self.strategy.name}')
strategy_class = getattr(strategy_module, self.strategy.name)
return strategy_class
except (ImportError, AttributeError):
raise ValueError(f"Unknown strategy: {self.strategy}")
def strategyParam(self, parameter_name):
"""
// Create a Position instance
pos = Position(
orderId="123",
orderTag="ABC",
strategy="TestAlphaModel",
strategyTag="XYZ",
expiryStr="2023-12-31"
)
// Get targetProfit parameter from the position's strategy
print(pos.strategyParam('targetProfit')) // 0.5
"""
return self.strategyModule().parameter(parameter_name)
@property
def isCreditStrategy(self):
return self.strategyId in ["PutCreditSpread", "CallCreditSpread", "IronCondor", "IronFly", "CreditButterfly", "ShortStrangle", "ShortStraddle", "ShortCall", "ShortPut"]
@property
def isDebitStrategy(self):
return self.strategyId in ["DebitButterfly", "ReverseIronFly", "ReverseIronCondor", "CallDebitSpread", "PutDebitSpread", "LongStrangle", "LongStraddle", "LongCall", "LongPut"]
# Slippage used to set Limit orders
def getPositionValue(self, context):
# Start the timer
context.executionTimer.start()
contractUtils = ContractUtils(context)
# Get the amount of credit received to open the position
openPremium = self.openOrder.premium
orderQuantity = self.orderQuantity
slippage = self.strategyParam("slippage")
# Loop through all legs of the open position
orderMidPrice = 0.0
limitOrderPrice = 0.0
bidAskSpread = 0.0
for leg in self.legs:
contract = leg.contract
# Reverse the original contract side
orderSide = -self.contractSide[leg.symbol]
# Compute the Bid-Ask spread
bidAskSpread += contractUtils.bidAskSpread(contract)
# Get the latest mid-price
midPrice = contractUtils.midPrice(contract)
# Adjusted mid-price (including slippage)
adjustedMidPrice = midPrice + orderSide * slippage
# Total order mid-price
orderMidPrice -= orderSide * midPrice
# Total Limit order mid-price (including slippage)
limitOrderPrice -= orderSide * adjustedMidPrice
# Add the parameters needed to place a Market/Limit order if needed
leg.orderSide = orderSide
leg.orderQuantity = orderQuantity
leg.limitPrice = adjustedMidPrice
# Check if the mid-price is positive: avoid closing the position if the Bid-Ask spread is too wide (more than 25% of the credit received)
positionPnL = openPremium + orderMidPrice * orderQuantity
if self.strategyParam("validateBidAskSpread") and bidAskSpread > self.strategyParam("bidAskSpreadRatio") * openPremium:
context.logger.trace(f"The Bid-Ask spread is too wide. Open Premium: {openPremium}, Mid-Price: {orderMidPrice}, Bid-Ask Spread: {bidAskSpread}")
positionPnL = None
# Store the full mid-price of the position
self.orderMidPrice = orderMidPrice
# Store the Limit Order mid-price of the position (including slippage)
self.limitOrderPrice = limitOrderPrice
# Store the full bid-ask spread of the position
self.bidAskSpread = bidAskSpread
# Store the position PnL
self.positionPnL = positionPnL
# Stop the timer
context.executionTimer.stop()
def updateStats(self, context, orderType):
underlying = Underlying(context, self.underlyingSymbol())
# If we do use combo orders then we might not need to do this check as it has the midPrice in there.
# Store the price of the underlying at the time of submitting the Market Order
self[f"underlyingPriceAt{orderType.title()}"] = underlying.Close()
def updateOrderStats(self, context, orderType):
# Start the timer
context.executionTimer.start()
# leg = next((leg for leg in self.legs if contract.Symbol == leg.symbol), None)
# Get the side of the contract at the time of opening: -1 -> Short +1 -> Long
# contractSide = leg.contractSide
contractUtils = ContractUtils(context)
# Get the contracts
contracts = [v.contract for v in self.legs]
# Get the slippage
slippage = self.strategyParam("slippage") or 0.0
# Sign of the order: open -> 1 (use orderSide as is), close -> -1 (reverse the orderSide)
orderSign = 2*int(orderType == "open")-1
# Sign of the transaction: open -> -1, close -> +1
transactionSign = -orderSign
# Get the mid price of each contract
prices = np.array(list(map(contractUtils.midPrice, contracts)))
# Get the order sides
orderSides = np.array([c.contractSide for c in self.legs])
# Total slippage
totalSlippage = sum(abs(orderSides)) * slippage
# Compute the total order price (including slippage)
# This calculates the sum of contracts midPrice so the midPrice difference between contracts.
midPrice = transactionSign * sum(orderSides * prices) - totalSlippage
# Compute Bid-Ask spread
bidAskSpread = sum(list(map(contractUtils.bidAskSpread, contracts)))
# Store the Open/Close Fill Price (if specified)
closeFillPrice = self.closeOrder.fillPrice
order = self[f"{orderType}Order"]
# Keep track of the Limit order mid-price range
order.midPriceMin = min(order.midPriceMin, midPrice)
order.midPriceMax = max(order.midPriceMax, midPrice)
order.midPrice = midPrice
order.bidAskSpread = bidAskSpread
# Exit if we don't need to include the details
# if not self.strategyParam("includeLegDetails") or context.Time.minute % self.strategyParam("legDatailsUpdateFrequency") != 0:
# return
# # Get the EMA memory factor
# emaMemory = self.strategyParam("emaMemory")
# # Compute the decay such that the contribution of each new value drops to 5% after emaMemory iterations
# emaDecay = 0.05**(1.0/emaMemory)
# # Update the counter (used for the average)
# bookPosition["statsUpdateCount"] += 1
# statsUpdateCount = bookPosition["statsUpdateCount"]
# # Compute the Greeks (retrieve it as a dictionary)
# greeks = self.bsm.computeGreeks(contract).__dict__
# # Add the midPrice and PnL values to the greeks dictionary to generalize the processing loop
# greeks["midPrice"] = midPrice
# # List of variables for which we are going to update the stats
# #vars = ["midPrice", "Delta", "Gamma", "Vega", "Theta", "Rho", "Vomma", "Elasticity", "IV"]
# vars = [var.title() for var in self.strategyParam("greeksIncluded")] + ["midPrice", "IV"]
# Get the fill price at the open
openFillPrice = self.openOrder.fillPrice
# Check if the fill price is set
if not math.isnan(openFillPrice):
# Compute the PnL of position. openPremium will be positive for credit and closePremium will be negative so we just add them together.
self.PnL = self.openPremium + self.closePremium
# Add the PnL to the list of variables for which we want to update the stats
# vars.append("PnL")
# greeks["PnL"] = PnL
# for var in vars:
# # Set the name of the field to be updated
# fieldName = f"{fieldPrefix}.{var}"
# strategyLeg = positionStrategyLeg[var]
# # Get the latest value from the dictionary
# fieldValue = greeks[var]
# # Special case for the PnL
# if var == "PnL" and statsUpdateCount == 2:
# # Initialize the EMA for the PnL
# strategyLeg.EMA = fieldValue
# # Update the Min field
# strategyLeg.Min = min(strategyLeg.Min, fieldValue)
# # Update the Max field
# strategyLeg.Max = max(strategyLeg.Max, fieldValue)
# # Update the Close field (this is the most recent value of the greek)
# strategyLeg.Close = fieldValue
# # Update the EMA field (IMPORTANT: this must be done before we update the Avg field!)
# strategyLeg.EMA = emaDecay * strategyLeg.EMA + (1-emaDecay)*fieldValue
# # Update the Avg field
# strategyLeg.Avg = (strategyLeg.Avg*(statsUpdateCount-1) + fieldValue)/statsUpdateCount
# if self.strategyParam("trackLegDetails") and var == "IV":
# if context.Time not in context.positionTracking[self.orderId]:
# context.positionTracking[self.orderId][context.Time] = {"orderId": self.orderId
# , "Time": context.Time
# }
# context.positionTracking[self.orderId][context.Time][fieldName] = fieldValue
# Stop the timer
context.executionTimer.stop()
def updatePnLRange(self, currentDate, positionPnL):
# How many days has this position been in trade for
# currentDit = (self.context.Time.date() - bookPosition.openFilledDttm.date()).days
currentDit = (currentDate - self.openFilledDttm.date()).days
# Keep track of the P&L range throughout the life of the position (mark the DIT of when the Min/Max PnL occurs)
if 100 * positionPnL < self.PnLMax:
self.PnLMinDIT = currentDit
self.PnLMin = min(self.PnLMin, 100 * positionPnL)
if 100 * positionPnL > self.PnLMax:
self.PnLMaxDIT = currentDit
self.PnLMax = max(self.PnLMax, 100 * positionPnL)
def expiryLastTradingDay(self, context):
# Get the last trading day for the given expiration date (in case it falls on a holiday)
return context.lastTradingDay(self.expiry)
def expiryMarketCloseCutoffDttm(self, context):
# Set the date/time threshold by which the position must be closed (on the last trading day before expiration)
return datetime.combine(self.expiryLastTradingDay(context), self.strategyParam("marketCloseCutoffTime"))
def cancelOrder(self, context, orderType = 'open', message = ''):
self.orderCancelled = True
execOrder = self[f"{orderType}Order"]
orderTransactionIds = execOrder.transactionIds
context.logger.info(f" >>> CANCEL-----> {orderType} order with message: {message}")
context.logger.debug("Expired or the limit order was not filled in the allocated time.")
context.logger.info(f"Cancel {self.orderTag} & Progress of prices: {execOrder.priceProgressList}")
context.logger.info(f"Position progress of prices: {self.priceProgressList}")
context.charting.updateStats(self)
for id in orderTransactionIds:
context.logger.info(f"Canceling order: {id}")
ticket = context.Transactions.GetOrderTicket(id)
if ticket:
ticket.Cancel()
#region imports from AlgorithmImports import * #endregion from .Position import Position, Leg, OrderType, WorkingOrder
#region imports
from AlgorithmImports import *
#endregion
import pytest
from unittest.mock import patch, call
import pandas as pd
@pytest.fixture
def logger(mock_algorithm, mocked_logger):
return mocked_logger(mock_algorithm, className="TestClass", logLevel=3)
def test_logger_initialization(mock_algorithm, mocked_logger):
logger = mocked_logger(mock_algorithm, className="TestClass", logLevel=3)
assert logger.context == mock_algorithm
assert logger.className == "TestClass"
assert logger.logLevel == 3
def test_log_method(logger, mock_algorithm):
with patch('sys._getframe') as mock_frame:
mock_frame.return_value.f_code.co_name = 'test_function'
logger.Log("Test message", trsh=2)
mock_algorithm.Log.assert_called_once_with(" INFO -> TestClass.test_function: Test message")
@pytest.mark.parametrize("method,expected_prefix", [
("error", "ERROR"),
("warning", "WARNING"),
("info", "INFO"),
("debug", "DEBUG"),
("trace", "TRACE")
])
def test_log_levels(logger, mock_algorithm, method, expected_prefix):
with patch('sys._getframe') as mock_frame:
mock_frame.return_value.f_code.co_name = 'test_function'
getattr(logger, method)("Test message")
mock_algorithm.Log.assert_called_once_with(f" {expected_prefix} -> TestClass.test_function: Test message")
def test_log_level_filtering(mock_algorithm, mocked_logger):
logger = mocked_logger(mock_algorithm, className="TestClass", logLevel=2)
with patch('sys._getframe') as mock_frame:
mock_frame.return_value.f_code.co_name = 'test_function'
logger.error("Error message")
logger.warning("Warning message")
logger.info("Info message")
logger.debug("Debug message")
logger.trace("Trace message")
assert mock_algorithm.Log.call_count == 3
mock_algorithm.Log.assert_has_calls([
call(" ERROR -> TestClass.test_function: Error message"),
call(" WARNING -> TestClass.test_function: Warning message"),
call(" INFO -> TestClass.test_function: Info message")
])
def test_dataframe_logging(logger, mock_algorithm):
test_data = [
{'name': 'Alice', 'age': 30},
{'name': 'Bob', 'age': 25}
]
expected_output = "\n name age\nAlice 30\n Bob 25"
with patch('sys._getframe') as mock_frame:
mock_frame.return_value.f_code.co_name = 'test_function'
with patch('pandas.DataFrame.to_string', return_value=expected_output):
logger.dataframe(test_data)
mock_algorithm.Log.assert_called_once_with(f" INFO -> TestClass.test_function: {expected_output}")
def test_dataframe_logging_empty_data(logger, mock_algorithm):
test_data = []
logger.dataframe(test_data)
mock_algorithm.Log.assert_not_called()
def test_dataframe_logging_dict_input(logger, mock_algorithm):
test_data = {'name': ['Alice', 'Bob'], 'age': [30, 25]}
expected_output = "\n name age\nAlice 30\n Bob 25"
with patch('sys._getframe') as mock_frame:
mock_frame.return_value.f_code.co_name = 'test_function'
with patch('pandas.DataFrame.to_string', return_value=expected_output):
logger.dataframe(test_data)
mock_algorithm.Log.assert_called_once_with(f" INFO -> TestClass.test_function: {expected_output}")#region imports
from AlgorithmImports import *
#endregion
import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../')))
import pytest
from unittest.mock import patch, MagicMock, call
import time
def test_timer_initialization(mock_algorithm, mocked_timer):
timer = mocked_timer(mock_algorithm)
assert timer.context == mock_algorithm
assert timer.performance == {}
def test_timer_start(mock_algorithm, mocked_timer):
timer = mocked_timer(mock_algorithm)
with patch('time.perf_counter', return_value=100.0):
timer.start('test_method')
assert 'test_method' in timer.performance
assert timer.performance['test_method']['startTime'] == 100.0
def test_timer_stop(mock_algorithm, mocked_timer):
timer = mocked_timer(mock_algorithm)
timer.performance['test_method'] = timer.performanceTemplate.copy()
timer.performance['test_method']['startTime'] = 100.0
with patch('time.perf_counter', return_value=150.0):
timer.stop('test_method')
performance = timer.performance['test_method']
assert performance['calls'] == 1
assert performance['elapsedLast'] == 50.0
assert performance['elapsedMin'] == 50.0
assert performance['elapsedMax'] == 50.0
assert performance['elapsedTotal'] == 50.0
assert performance['elapsedMean'] == 50.0
def test_timer_show_stats(mock_algorithm, mocked_timer):
timer = mocked_timer(mock_algorithm)
timer.performance['method1'] = {
'calls': 2,
'elapsedMin': 10.0,
'elapsedMean': 15.0,
'elapsedMax': 20.0,
'elapsedTotal': 30.0,
'elapsedLast': 15.0,
'startTime': None
}
timer.performance['method2'] = {
'calls': 1,
'elapsedMin': 5.0,
'elapsedMean': 5.0,
'elapsedMax': 5.0,
'elapsedTotal': 5.0,
'elapsedLast': 5.0,
'startTime': None
}
timer.showStats()
# Check that Log method was called with the correct arguments
expected_calls = [
call("Execution Stats (method1):"),
call(" --> calls:2"),
call(" --> elapsedMin:0:00:10"),
call(" --> elapsedMean:0:00:15"),
call(" --> elapsedMax:0:00:20"),
call(" --> elapsedTotal:0:00:30"),
call(" --> elapsedLast:0:00:15"),
call("Execution Stats (method2):"),
call(" --> calls:1"),
call(" --> elapsedMin:0:00:05"),
call(" --> elapsedMean:0:00:05"),
call(" --> elapsedMax:0:00:05"),
call(" --> elapsedTotal:0:00:05"),
call(" --> elapsedLast:0:00:05"),
call("Summary:"),
call(" --> elapsedTotal: 0:00:35")
]
mock_algorithm.Log.assert_has_calls(expected_calls, any_order=True)
def test_timer_multiple_methods(mock_algorithm, mocked_timer):
timer = mocked_timer(mock_algorithm)
with patch('time.perf_counter') as mock_time:
mock_time.side_effect = [100.0, 150.0, 200.0, 300.0]
timer.start('method1')
timer.stop('method1')
timer.start('method2')
timer.stop('method2')
assert 'method1' in timer.performance
assert 'method2' in timer.performance
assert timer.performance['method1']['elapsedTotal'] == 50.0
assert timer.performance['method2']['elapsedTotal'] == 100.0#region imports from AlgorithmImports import * #endregion # Your New Python File
#region imports from AlgorithmImports import * #endregion # Your New Python File
#region imports
from AlgorithmImports import *
#endregion
import pytest
from unittest.mock import MagicMock, patch
@pytest.fixture
def mock_resolution():
return MagicMock(Minute="Minute", Hour="Hour", Daily="Daily")
@pytest.fixture
def mock_algorithm_imports(mock_resolution):
mock_imports = MagicMock()
mock_imports.Resolution = mock_resolution
return mock_imports
@pytest.fixture(autouse=True)
def patch_algorithm_imports(mock_algorithm_imports):
with patch.dict('sys.modules', {'AlgorithmImports': mock_algorithm_imports}):
yield mock_algorithm_imports
@pytest.fixture
def mock_algorithm():
return MagicMock()
@pytest.fixture
def mock_qc_data():
return MagicMock()
@pytest.fixture
def mock_symbol():
return MagicMock()
@pytest.fixture
def mock_resolution_class():
class MockResolution:
Minute = "Minute"
Hour = "Hour"
Daily = "Daily"
return MockResolution
@pytest.fixture
def mocked_timer(patch_algorithm_imports):
from Tools.Timer import Timer
return Timer
@pytest.fixture
def mocked_logger(patch_algorithm_imports):
from Tools.Logger import Logger
return Logger
#region imports
from AlgorithmImports import *
#endregion
########################################################################################
# #
# Licensed under the Apache License, Version 2.0 (the "License"); #
# you may not use this file except in compliance with the License. #
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 #
# #
# Unless required by applicable law or agreed to in writing, software #
# distributed under the License is distributed on an "AS IS" BASIS, #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
# See the License for the specific language governing permissions and #
# limitations under the License. #
# #
# Copyright [2021] [Rocco Claudio Cannizzaro] #
# #
########################################################################################
import numpy as np
from math import *
from scipy import optimize
from scipy.stats import norm
from Tools import Logger, ContractUtils
class BSM:
def __init__(self, context, tradingDays = 365.0):
# Set the context
self.context = context
# Set the logger
self.logger = Logger(context, className = type(self).__name__, logLevel = context.logLevel)
# Initialize the contract utils
self.contractUtils = ContractUtils(context)
# Set the IR
self.riskFreeRate = context.riskFreeRate
# Set the number of trading days
self.tradingDays = tradingDays
def isITM(self, contract, spotPrice = None):
# Get the current price of the underlying unless otherwise specified
if spotPrice == None:
spotPrice = self.contractUtils.getUnderlyingLastPrice(contract)
if contract.Right == OptionRight.Call:
# A Call option is in the money if the underlying price is above the strike price
return contract.Strike < spotPrice
else:
# A Put option is in the money if the underlying price is below the strike price
return spotPrice < contract.Strike
def bsmD1(self, contract, sigma, tau = None, ir = None, spotPrice = None, atTime = None):
# Get the DTE as a fraction of a year
if tau == None:
tau = self.optionTau(contract, atTime = atTime)
# Use the risk free rate unless otherwise specified
if ir == None:
ir = self.riskFreeRate
# Get the current price of the underlying unless otherwise specified
if spotPrice == None:
spotPrice = self.contractUtils.getUnderlyingLastPrice(contract)
# Strike price
strikePrice = contract.Strike
# Check edge cases:
# - The contract is expired -> tau = 0
# - The IV could not be computed (deep ITM or far OTM options) -> sigma = 0
if tau == 0 or sigma == 0:
# Set the sign based on whether it is a Call (+1) or a Put (-1)
sign = 2*int(contract.Right == OptionRight.Call)-1
if(self.isITM(contract, spotPrice = spotPrice)):
# Deep ITM options:
# - Call: d1 = Inf -> Delta = Norm.CDF(d1) = 1
# - Put: d1 = -Inf -> Delta = -Norm.CDF(-d1) = -1
d1 = sign * float('inf')
else:
# Far OTM options:
# - Call: d1 = -Inf -> Delta = Norm.CDF(d1) = 0
# - Put: d1 = Inf -> Delta = -Norm.CDF(-d1) = 0
d1 = sign * float('-inf')
else:
d1 = (np.log(spotPrice/strikePrice) + (ir + 0.5*sigma**2)*tau)/(sigma * np.sqrt(tau))
return d1
def bsmD2(self, contract, sigma, tau = None, d1 = None, ir = None, spotPrice = None, atTime = None):
# Get the DTE as a fraction of a year
if tau == None:
tau = self.optionTau(contract, atTime = atTime)
if d1 == None:
d1 = self.bsmD1(contract, sigma, tau = tau, ir = ir, spotPrice = spotPrice)
# Compute D2
d2 = d1 - sigma * np.sqrt(tau)
return d2
# Compute the DTE as a time fraction of the year
def optionTau(self, contract, atTime = None):
if atTime == None:
atTime = self.context.Time
# Get the expiration date and add 16 hours to the market close
expiryDttm = contract.Expiry + timedelta(hours = 16)
# Time until market close
timeDiff = expiryDttm - atTime
# Days to expiration: use the fraction of minutes until market close in case of 0-DTE (390 minutes = 6.5h -> from 9:30 to 16:00)
dte = max(0, timeDiff.days, timeDiff.seconds/(60.0*390.0))
# DTE as a fraction of a year
tau = dte/self.tradingDays
return tau
# Pricing of a European option based on the Black Scholes Merton model (without dividends)
def bsmPrice(self, contract, sigma, tau = None, ir = None, spotPrice = None, atTime = None):
# Get the DTE as a fraction of a year
if tau == None:
tau = self.optionTau(contract, atTime = atTime)
# Get the current price of the underlying unless otherwise specified
if spotPrice == None:
spotPrice = self.contractUtils.getUnderlyingLastPrice(contract)
# Compute D1
d1 = self.bsmD1(contract, sigma, tau = tau, ir = ir, spotPrice = spotPrice)
# Compute D2
d2 = self.bsmD2(contract, sigma, tau = tau, d1 = d1, ir = ir, spotPrice = spotPrice)
# X*e^(-r*tau)
Xert = contract.Strike * np.exp(-self.riskFreeRate*tau)
#Price the option
if contract.Right == OptionRight.Call:
# Call Option
theoreticalPrice = norm.cdf(d1)*spotPrice - norm.cdf(d2)*Xert
else:
# Put Option
theoreticalPrice = norm.cdf(-d2)*Xert - norm.cdf(-d1)*spotPrice
return theoreticalPrice
# Compute the Theta of an option
def bsmTheta(self, contract, sigma, tau = None, d1 = None, d2 = None, ir = None, spotPrice = None, atTime = None):
# Get the DTE as a fraction of a year
if tau == None:
tau = self.optionTau(contract, atTime = atTime)
# Get the current price of the underlying unless otherwise specified
if spotPrice == None:
spotPrice = self.contractUtils.getUnderlyingLastPrice(contract)
# Compute D1
if d1 == None:
d1 = self.bsmD1(contract, sigma, tau = tau, ir = ir, spotPrice = spotPrice)
# Compute D2
if d2 == None:
d2 = self.bsmD2(contract, sigma, tau = tau, d1 = d1, ir = ir, spotPrice = spotPrice)
# -S*N'(d1)*sigma/(2*sqrt(tau))
SNs = -(spotPrice * norm.pdf(d1) * sigma) / (2.0 * np.sqrt(tau))
# r*X*e^(-r*tau)
rXert = self.riskFreeRate * contract.Strike * np.exp(-self.riskFreeRate*tau)
# Compute Theta (divide by the number of trading days to get a daily Theta value)
if contract.Right == OptionRight.Call:
theta = (SNs - rXert * norm.cdf(d2))/self.tradingDays
else:
theta = (SNs + rXert * norm.cdf(-d2))/self.tradingDays
return theta
# Compute the Theta of an option
def bsmRho(self, contract, sigma, tau = None, d1 = None, d2 = None, ir = None, spotPrice = None, atTime = None):
# Get the DTE as a fraction of a year
if tau == None:
tau = self.optionTau(contract, atTime = atTime)
# Get the current price of the underlying unless otherwise specified
if spotPrice == None:
spotPrice = self.contractUtils.getUnderlyingLastPrice(contract)
# Compute D1
if d1 == None:
d1 = self.bsmD1(contract, sigma, tau = tau, ir = ir, spotPrice = spotPrice)
# Compute D2
if d2 == None:
d2 = self.bsmD2(contract, sigma, tau = tau, d1 = d1, ir = ir, spotPrice = spotPrice)
# tau*X*e^(-r*tau)
tXert = tau * self.riskFreeRate * contract.Strike * np.exp(-self.riskFreeRate*tau)
# Compute Theta
if contract.Right == OptionRight.Call:
rho = tXert * norm.cdf(d2)
else:
rho = -tXert * norm.cdf(-d2)
return rho
# Compute the Gamma of an option
def bsmGamma(self, contract, sigma, tau = None, d1 = None, ir = None, spotPrice = None, atTime = None):
# Get the DTE as a fraction of a year
if tau == None:
tau = self.optionTau(contract, atTime = atTime)
# Get the current price of the underlying unless otherwise specified
if spotPrice == None:
spotPrice = self.contractUtils.getUnderlyingLastPrice(contract)
# Compute D1
if d1 == None:
d1 = self.bsmD1(contract, sigma, tau = tau, ir = ir, spotPrice = spotPrice)
# Compute Gamma
if(sigma == 0 or tau == 0):
gamma = float('inf')
else:
gamma = norm.pdf(d1) / (spotPrice * sigma * np.sqrt(tau))
return gamma
# Compute the Vega of an option
def bsmVega(self, contract, sigma, tau = None, d1 = None, ir = None, spotPrice = None, atTime = None):
# Get the DTE as a fraction of a year
if tau == None:
tau = self.optionTau(contract, atTime = atTime)
# Get the current price of the underlying unless otherwise specified
if spotPrice == None:
spotPrice = self.contractUtils.getUnderlyingLastPrice(contract)
# Compute D1
if d1 == None:
d1 = self.bsmD1(contract, sigma, tau = tau, ir = ir, spotPrice = spotPrice)
# Compute Vega
vega = spotPrice * norm.pdf(d1) * np.sqrt(tau)
return vega
# Compute the Vomma of an option
def bsmVomma(self, contract, sigma, tau = None, d1 = None, d2 = None, ir = None, spotPrice = None, atTime = None):
# Get the DTE as a fraction of a year
if tau == None:
tau = self.optionTau(contract, atTime = atTime)
# Get the current price of the underlying unless otherwise specified
if spotPrice == None:
spotPrice = self.contractUtils.getUnderlyingLastPrice(contract)
# Compute D1
if d1 == None:
d1 = self.bsmD1(contract, sigma, tau = tau, ir = ir, spotPrice = spotPrice)
# Compute D2
if d2 == None:
d2 = self.bsmD2(contract, sigma, tau = tau, d1 = d1, ir = ir, spotPrice = spotPrice)
# Compute Vomma
if(sigma == 0):
vomma = float('inf')
else:
vomma = spotPrice * norm.pdf(d1) * np.sqrt(tau) * d1 * d2 / sigma
return vomma
# Compute Implied Volatility from the price of an option
def bsmIV(self, contract, tau = None, saveIt = False):
# Start the timer
self.context.executionTimer.start()
# Inner function used to compute the root
def f(sigma, contract, tau):
return self.bsmPrice(contract, sigma = sigma, tau = tau) - self.contractUtils.midPrice(contract)
# First order derivative (Vega)
def fprime(sigma, contract, tau):
return self.bsmVega(contract, sigma = sigma, tau = tau)
# Second order derivative (Vomma)
def fprime2(sigma, contract, tau):
return self.bsmVomma(contract, sigma = sigma, tau = tau)
# Initialize the IV to zero in case anything goes wrong
IV = 0
# Initialize the flag to mark whether we were able to find the root
converged = False
# Find the root -> Implied Volatility: Use Halley's method
try:
# Start the search at the lastest known value for the IV (if previously calculated)
x0 = 0.1
if hasattr(contract, "BSMImpliedVolatility"):
x0 = contract.BSMImpliedVolatility
sol = optimize.root_scalar(f, x0 = x0, args = (contract, tau), fprime = fprime, fprime2 = fprime2, method = 'halley', xtol = 1e-6)
# Get the convergence status
converged = sol.converged
# Set the IV if we found the root
if converged:
IV = sol.root
except:
pass
# Fallback method (Bisection) if Halley's optimization failed
if not converged:
# Find the root -> Implied Volatility
try:
sol = optimize.root_scalar(f, bracket = [0.0001, 2], args = (contract, tau), xtol = 1e-6)
# Get the convergence status
converged = sol.converged
# Set the IV if we found the root
if converged:
IV = sol.root
except:
pass
# Check if we need to save the IV as an attribute of the contract object
if saveIt:
contract.BSMImpliedVolatility = IV
# Stop the timer
self.context.executionTimer.stop()
# Return the result
return IV
# Compute the Delta of an option
def bsmDelta(self, contract, sigma, tau = None, d1 = None, ir = None, spotPrice = None, atTime = None):
if d1 == None:
if tau == None:
# Get the DTE as a fraction of a year
tau = self.optionTau(contract, atTime = atTime)
# Compute D1
d1 = self.bsmD1(contract, sigma, tau = tau, ir = ir, spotPrice = spotPrice)
### if (d1 == None)
# Compute option delta (rounded to 2 digits)
if contract.Right == OptionRight.Call:
delta = norm.cdf(d1)
else:
delta = -norm.cdf(-d1)
return delta
def computeGreeks(self, contract, sigma = None, ir = None, spotPrice = None, atTime = None, saveIt = False):
# Start the timer
self.context.executionTimer.start("Tools.BSMLibrary -> computeGreeks")
# Avoid recomputing the Greeks if we have already done it for this time bar
if hasattr(contract, "BSMGreeks") and contract.BSMGreeks.lastUpdated == self.context.Time:
return contract.BSMGreeks
# Get the DTE as a fraction of a year
tau = self.optionTau(contract, atTime = atTime)
if sigma == None:
# Compute Implied Volatility
sigma = self.bsmIV(contract, tau = tau, saveIt = saveIt)
### if (sigma == None)
# Get the current price of the underlying unless otherwise specified
if spotPrice == None:
spotPrice = self.contractUtils.getUnderlyingLastPrice(contract)
# Compute D1
d1 = self.bsmD1(contract, sigma, tau = tau, ir = ir, spotPrice = spotPrice)
# Compute D2
d2 = self.bsmD2(contract, sigma, tau = tau, d1 = d1, ir = ir, spotPrice = spotPrice)
# First order derivatives
delta = self.bsmDelta(contract, sigma = sigma, tau = tau, d1 = d1, ir = ir, spotPrice = spotPrice)
theta = self.bsmTheta(contract, sigma, tau = tau, d1 = d1, d2 = d2, ir = ir, spotPrice = spotPrice)
vega = self.bsmVega(contract, sigma, tau = tau, d1 = d1, ir = ir, spotPrice = spotPrice)
rho = self.bsmRho(contract, sigma, tau = tau, d1 = d1, d2 = d2, ir = ir, spotPrice = spotPrice)
# Second Order derivatives
gamma = self.bsmGamma(contract, sigma, tau = tau, d1 = d1, ir = ir, spotPrice = spotPrice)
vomma = self.bsmVomma(contract, sigma, tau = tau, d1 = d1, d2 = d2, ir = ir, spotPrice = spotPrice)
# Lambda (a.k.a. elasticity or leverage: the percentage change in option value per percentage change in the underlying price)
elasticity = delta * np.float64(spotPrice)/np.float64(self.contractUtils.midPrice(contract))
# Create a Greeks object
greeks = BSMGreeks(delta = delta
, gamma = gamma
, vega = vega
, theta = theta
, rho = rho
, vomma = vomma
, elasticity = elasticity
, IV = sigma
, lastUpdated = self.context.Time
)
# Check if we need to save the Greeks as an attribute of the contract object
if saveIt:
contract.BSMGreeks = greeks
# Stop the timer
self.context.executionTimer.stop("Tools.BSMLibrary -> computeGreeks")
return greeks
# Compute and store the Greeks for a list of contracts
def setGreeks(self, contracts, sigma = None, ir = None):
# Start the timer
self.context.executionTimer.start("Tools.BSMLibrary -> setGreeks")
if isinstance(contracts, list):
# Loop through all contracts
for contract in contracts:
# Get the current price of the underlying
spotPrice = self.contractUtils.getUnderlyingLastPrice(contract)
# Compute the Greeks for the contract
self.computeGreeks(contract, sigma = sigma, ir = ir, spotPrice = spotPrice, saveIt = True)
else:
# Get the current price of the underlying
spotPrice = self.contractUtils.getUnderlyingLastPrice(contracts)
# Compute the Greeks on a single contract
self.computeGreeks(contracts, sigma = sigma, ir = ir, spotPrice = spotPrice, saveIt = True)
# Log the contract details
self.logger.trace(f"Contract: {contracts.Symbol}")
self.logger.trace(f" -> Contract Mid-Price: {self.contractUtils.midPrice(contracts)}")
self.logger.trace(f" -> Spot: {spotPrice}")
self.logger.trace(f" -> Strike: {contracts.Strike}")
self.logger.trace(f" -> Type: {'Call' if contracts.Right == OptionRight.Call else 'Put'}")
self.logger.trace(f" -> IV: {contracts.BSMImpliedVolatility}")
self.logger.trace(f" -> Delta: {contracts.BSMGreeks.Delta}")
self.logger.trace(f" -> Gamma: {contracts.BSMGreeks.Gamma}")
self.logger.trace(f" -> Vega: {contracts.BSMGreeks.Vega}")
self.logger.trace(f" -> Theta: {contracts.BSMGreeks.Theta}")
self.logger.trace(f" -> Rho: {contracts.BSMGreeks.Rho}")
self.logger.trace(f" -> Vomma: {contracts.BSMGreeks.Vomma}")
self.logger.trace(f" -> Elasticity: {contracts.BSMGreeks.Elasticity}")
# Stop the timer
self.context.executionTimer.stop("Tools.BSMLibrary -> setGreeks")
return
class BSMGreeks:
def __init__(self, delta = None, gamma = None, vega = None, theta = None, rho = None, vomma = None, elasticity = None, IV = None, lastUpdated = None, precision = 5):
self.Delta = self.roundIt(delta, precision)
self.Gamma = self.roundIt(gamma, precision)
self.Vega = self.roundIt(vega, precision)
self.Theta = self.roundIt(theta, precision)
self.Rho = self.roundIt(rho, precision)
self.Vomma = self.roundIt(vomma, precision)
self.Elasticity = self.roundIt(elasticity, precision)
self.IV = self.roundIt(IV, precision)
self.lastUpdated = lastUpdated
def roundIt(self, value, precision = None):
if precision:
return round(value, precision)
else:
return value
#region imports
from AlgorithmImports import *
#endregion
from Tools import Underlying
class Charting:
def __init__(self, context, openPositions=True, Stats=True, PnL=True, WinLossStats=True, Performance=True, LossDetails=True, totalSecurities=False, Trades=True, Distribution=True):
self.context = context
self.resample = datetime.min
# QUANTCONNECT limitations in terms of charts
# Tier Max Series Max Data Points per Series
# Free 10 4,000
# Quant Researcher 10 8,000
# Team 25 16,000
# Trading Firm 25 32,000
# Institution 100 96,000
# Max datapoints set to 4000 (free), 8000 (researcher), 16000 (team) (the maximum allowed by QC)
self.resamplePeriod = (context.EndDate - context.StartDate) / 8_000
# Max number of series allowed
self.maxSeries = 10
self.charts = []
# Create an object to store all the stats
self.stats = CustomObject()
# Store the details about which charts will be plotted (there is a maximum of 10 series per backtest)
self.stats.plot = CustomObject()
self.stats.plot.openPositions = openPositions
self.stats.plot.Stats = Stats
self.stats.plot.PnL = PnL
self.stats.plot.WinLossStats = WinLossStats
self.stats.plot.Performance = Performance
self.stats.plot.LossDetails = LossDetails
self.stats.plot.totalSecurities = totalSecurities
self.stats.plot.Trades = Trades
self.stats.plot.Distribution = Distribution
# Initialize performance metrics
self.stats.won = 0
self.stats.lost = 0
self.stats.winRate = 0.0
self.stats.premiumCaptureRate = 0.0
self.stats.totalCredit = 0.0
self.stats.totalDebit = 0.0
self.stats.PnL = 0.0
self.stats.totalWinAmt = 0.0
self.stats.totalLossAmt = 0.0
self.stats.averageWinAmt = 0.0
self.stats.averageLossAmt = 0.0
self.stats.maxWin = 0.0
self.stats.maxLoss = 0.0
self.stats.testedCall = 0
self.stats.testedPut = 0
totalSecurities = Chart("Total Securities")
totalSecurities.AddSeries(Series('Total Securities', SeriesType.Line, 0))
# Setup Charts
if openPositions:
activePositionsPlot = Chart('Open Positions')
activePositionsPlot.AddSeries(Series('Open Positions', SeriesType.Line, ''))
self.charts.append(activePositionsPlot)
if Stats:
statsPlot = Chart('Stats')
statsPlot.AddSeries(Series('Won', SeriesType.Line, '', Color.Green))
statsPlot.AddSeries(Series('Lost', SeriesType.Line, '', Color.Red))
self.charts.append(statsPlot)
if PnL:
pnlPlot = Chart('Profit and Loss')
pnlPlot.AddSeries(Series('PnL', SeriesType.Line, ''))
self.charts.append(pnlPlot)
if WinLossStats:
winLossStatsPlot = Chart('Win and Loss Stats')
winLossStatsPlot.AddSeries(Series('Average Win', SeriesType.Line, '$', Color.Green))
winLossStatsPlot.AddSeries(Series('Average Loss', SeriesType.Line, '$', Color.Red))
self.charts.append(winLossStatsPlot)
if Performance:
performancePlot = Chart('Performance')
performancePlot.AddSeries(Series('Win Rate', SeriesType.Line, '%'))
performancePlot.AddSeries(Series('Premium Capture', SeriesType.Line, '%'))
self.charts.append(performancePlot)
# Loss Details chart. Only relevant in case of credit strategies
if LossDetails:
lossPlot = Chart('Loss Details')
lossPlot.AddSeries(Series('Short Put Tested', SeriesType.Line, ''))
lossPlot.AddSeries(Series('Short Call Tested', SeriesType.Line, ''))
self.charts.append(lossPlot)
if Trades:
tradesPlot = Chart('Trades')
tradesPlot.AddSeries(CandlestickSeries('UNDERLYING', '$'))
tradesPlot.AddSeries(Series("OPEN TRADE", SeriesType.Scatter, "", Color.Green, ScatterMarkerSymbol.Triangle))
tradesPlot.AddSeries(Series("CLOSE TRADE", SeriesType.Scatter, "", Color.Red, ScatterMarkerSymbol.TriangleDown))
self.charts.append(tradesPlot)
if Distribution:
distributionPlot = Chart('Distribution')
distributionPlot.AddSeries(Series('Distribution', SeriesType.Bar, ''))
self.charts.append(distributionPlot)
# Add the charts to the context
for chart in self.charts:
self.context.AddChart(chart)
# TODO: consider this for strategies.
# Call the chart initialization method of each strategy (give a chance to setup custom charts)
# for strategy in self.strategies:
# strategy.setupCharts()
# Add the first data point to the charts
self.updateCharts()
def updateUnderlying(self, bar):
# Add the latest data point to the underlying chart
# self.context.Plot("UNDERLYING", "UNDERLYING", bar)
self.context.Plot("Trades", "UNDERLYING", bar)
def updateCharts(self, symbol=None):
# Start the timer
self.context.executionTimer.start()
# TODO: consider this for strategies.
# Call the updateCharts method of each strategy (give a chance to update any custom charts)
# for strategy in self.strategies:
# strategy.updateCharts()
# Exit if there is nothing to update
if self.context.Time.time() >= time(15, 59, 0):
return
# self.context.logger.info(f"Time: {self.context.Time}, Resample: {self.resample}")
# In order to not exceed the maximum number of datapoints, we resample the charts.
if self.context.Time <= self.resample: return
self.resample = self.context.Time + self.resamplePeriod
plotInfo = self.stats.plot
if plotInfo.Trades:
# If symbol is defined then we print the symbol data on the chart
if symbol is not None:
underlying = Underlying(self.context, symbol)
self.context.Plot("Trades", "UNDERLYING", underlying.Security().GetLastData())
if plotInfo.totalSecurities:
self.context.Plot("Total Securities", "Total Securities", self.context.Securities.Count)
# Add the latest stats to the plots
if plotInfo.openPositions:
self.context.Plot("Open Positions", "Open Positions", self.context.openPositions.Count)
if plotInfo.Stats:
self.context.Plot("Stats", "Won", self.stats.won)
self.context.Plot("Stats", "Lost", self.stats.lost)
if plotInfo.PnL:
self.context.Plot("Profit and Loss", "PnL", self.stats.PnL)
if plotInfo.WinLossStats:
self.context.Plot("Win and Loss Stats", "Average Win", self.stats.averageWinAmt)
self.context.Plot("Win and Loss Stats", "Average Loss", self.stats.averageLossAmt)
if plotInfo.Performance:
self.context.Plot("Performance", "Win Rate", self.stats.winRate)
self.context.Plot("Performance", "Premium Capture", self.stats.premiumCaptureRate)
if plotInfo.LossDetails:
self.context.Plot("Loss Details", "Short Put Tested", self.stats.testedPut)
self.context.Plot("Loss Details", "Short Call Tested", self.stats.testedCall)
if plotInfo.Distribution:
self.context.Plot("Distribution", "Distribution", 0)
# Stop the timer
self.context.executionTimer.stop()
def plotTrade(self, trade, orderType):
# Start the timer
self.context.executionTimer.start()
# Add the trade to the chart
strikes = []
for leg in trade.legs:
if trade.isCreditStrategy:
if leg.isSold:
strikes.append(leg.strike)
else:
if leg.isBought:
strikes.append(leg.strike)
# self.context.logger.info(f"plotTrades!! : Strikes: {strikes}")
if orderType == "open":
for strike in strikes:
self.context.Plot("Trades", "OPEN TRADE", strike)
else:
for strike in strikes:
self.context.Plot("Trades", "CLOSE TRADE", strike)
# NOTE: this can not be made because there is a limit of 10 Series on all charts so it will fail!
# for strike in strikes:
# self.context.Plot("Trades", f"TRADE {strike}", strike)
# Stop the timer
self.context.executionTimer.stop()
def updateStats(self, closedPosition):
# Start the timer
self.context.executionTimer.start()
orderId = closedPosition.orderId
# Get the position P&L
positionPnL = closedPosition.PnL
# Get the price of the underlying at the time of closing the position
priceAtClose = closedPosition.underlyingPriceAtClose
if closedPosition.isCreditStrategy:
# Update total credit (the position was opened for a credit)
self.stats.totalCredit += closedPosition.openPremium
# Update total debit (the position was closed for a debit)
self.stats.totalDebit += closedPosition.closePremium
else:
# Update total credit (the position was closed for a credit)
self.stats.totalCredit += closedPosition.closePremium
# Update total debit (the position was opened for a debit)
self.stats.totalDebit += closedPosition.openPremium
# Update the total P&L
self.stats.PnL += positionPnL
# Update Win/Loss counters
if positionPnL > 0:
self.stats.won += 1
self.stats.totalWinAmt += positionPnL
self.stats.maxWin = max(self.stats.maxWin, positionPnL)
self.stats.averageWinAmt = self.stats.totalWinAmt / self.stats.won
else:
self.stats.lost += 1
self.stats.totalLossAmt += positionPnL
self.stats.maxLoss = min(self.stats.maxLoss, positionPnL)
self.stats.averageLossAmt = -self.stats.totalLossAmt / self.stats.lost
# Check if this is a Credit Strategy
if closedPosition.isCreditStrategy:
# Get the strikes for the sold contracts
sold_puts = [leg.strike for leg in closedPosition.legs if leg.isSold and leg.isPut]
sold_calls = [leg.strike for leg in closedPosition.legs if leg.isSold and leg.isCall]
if sold_puts and sold_calls:
# Get the short put and short call strikes
shortPutStrike = min(sold_puts)
shortCallStrike = max(sold_calls)
# Check if the short Put is in the money
if priceAtClose <= shortPutStrike:
self.stats.testedPut += 1
# Check if the short Call is in the money
elif priceAtClose >= shortCallStrike:
self.stats.testedCall += 1
# Check if the short Put is being tested
elif (priceAtClose-shortPutStrike) < (shortCallStrike - priceAtClose):
self.stats.testedPut += 1
# The short Call is being tested
else:
self.stats.testedCall += 1
# Update the Win Rate
if ((self.stats.won + self.stats.lost) > 0):
self.stats.winRate = 100*self.stats.won/(self.stats.won + self.stats.lost)
if self.stats.totalCredit > 0:
self.stats.premiumCaptureRate = 100*self.stats.PnL/self.stats.totalCredit
# Trigger an update of the charts
self.updateCharts()
self.plotTrade(closedPosition, "close")
# Stop the timer
self.context.executionTimer.stop()
# Dummy class useful to create empty objects
class CustomObject:
pass
#region imports
from AlgorithmImports import *
#endregion
from .Logger import Logger
class ContractUtils:
def __init__(self, context):
# Set the context
self.context = context
# Set the logger
self.logger = Logger(context, className=type(self).__name__, logLevel=context.logLevel)
def getUnderlyingPrice(self, symbol):
security = self.context.Securities[symbol]
return self.context.GetLastKnownPrice(security).Price
def getUnderlyingLastPrice(self, contract):
# Get the context
context = self.context
# Get the object from the Securities dictionary if available (pull the latest price), else use the contract object itself
if contract.UnderlyingSymbol in context.Securities:
security = context.Securities[contract.UnderlyingSymbol]
# Check if we have found the security
if security is not None:
# Get the last known price of the security
return context.GetLastKnownPrice(security).Price
else:
# Get the UnderlyingLastPrice attribute of the contract
return contract.UnderlyingLastPrice
def getSecurity(self, contract):
# Get the Securities object
Securities = self.context.Securities
# Check if we can extract the Symbol attribute
if hasattr(contract, "Symbol") and contract.Symbol in Securities:
# Get the security from the Securities dictionary if available (pull the latest price), else use the contract object itself
security = Securities[contract.Symbol]
else:
# Use the contract itself
security = contract
return security
# Returns the mid-price of an option contract
def midPrice(self, contract):
security = self.getSecurity(contract)
return 0.5 * (security.BidPrice + security.AskPrice)
def volume(self, contract):
security = self.getSecurity(contract)
return security.Volume
def openInterest(self, contract):
security = self.getSecurity(contract)
return security.OpenInterest
def delta(self, contract):
security = self.getSecurity(contract)
return security.Delta
def gamma(self, contract):
security = self.getSecurity(contract)
return security.Gamma
def theta(self, contract):
security = self.getSecurity(contract)
return security.Theta
def vega(self, contract):
security = self.getSecurity(contract)
return security.Vega
def rho(self, contract):
security = self.getSecurity(contract)
return security.Rho
def bidPrice(self, contract):
security = self.getSecurity(contract)
return security.BidPrice
def askPrice(self, contract):
security = self.getSecurity(contract)
return security.AskPrice
def bidAskSpread(self, contract):
security = self.getSecurity(contract)
return abs(security.AskPrice - security.BidPrice)
#region imports
from AlgorithmImports import *
#endregion
from .Underlying import Underlying
from .ProviderOptionContract import ProviderOptionContract
import operator
class DataHandler:
# The supported cash indices by QC https://www.quantconnect.com/docs/v2/writing-algorithms/datasets/tickdata/us-cash-indices#05-Supported-Indices
# These need to be added using AddIndex instead of AddEquity
CashIndices = ['VIX','SPX','NDX']
def __init__(self, context, ticker, strategy):
self.ticker = ticker
self.context = context
self.strategy = strategy
# Method to add the ticker[String] data to the context.
# @param resolution [Resolution]
# @return [Symbol]
def AddUnderlying(self, resolution=Resolution.Minute):
if self.__CashTicker():
return self.context.AddIndex(self.ticker, resolution=resolution)
else:
return self.context.AddEquity(self.ticker, resolution=resolution)
# SECTION BELOW IS FOR ADDING THE OPTION CHAIN
# Method to add the option chain data to the context.
# @param resolution [Resolution]
def AddOptionsChain(self, underlying, resolution=Resolution.Minute):
if self.ticker == "SPX":
# Underlying is SPX. We'll load and use SPXW and ignore SPX options (these close in the AM)
return self.context.AddIndexOption(underlying.Symbol, "SPXW", resolution)
elif self.__CashTicker():
# Underlying is an index
return self.context.AddIndexOption(underlying.Symbol, resolution)
else:
# Underlying is an equity
return self.context.AddOption(underlying.Symbol, resolution)
# Should be called on an option object like this: option.SetFilter(self.OptionFilter)
# !This method is called every minute if the algorithm resolution is set to minute
def SetOptionFilter(self, universe):
# Start the timer
self.context.executionTimer.start('Tools.DataHandler -> SetOptionFilter')
self.context.logger.debug(f"SetOptionFilter -> universe: {universe}")
# Include Weekly contracts
# nStrikes contracts to each side of the ATM
# Contracts expiring in the range (DTE-5, DTE)
filteredUniverse = universe.Strikes(-self.strategy.nStrikesLeft, self.strategy.nStrikesRight)\
.Expiration(max(0, self.strategy.dte - self.strategy.dteWindow), max(0, self.strategy.dte))\
.IncludeWeeklys()
self.context.logger.debug(f"SetOptionFilter -> filteredUniverse: {filteredUniverse}")
# Stop the timer
self.context.executionTimer.stop('Tools.DataHandler -> SetOptionFilter')
return filteredUniverse
# SECTION BELOW HANDLES OPTION CHAIN PROVIDER METHODS
def optionChainProviderFilter(self, symbols, min_strike_rank, max_strike_rank, minDte, maxDte):
self.context.executionTimer.start('Tools.DataHandler -> optionChainProviderFilter')
self.context.logger.debug(f"optionChainProviderFilter -> symbols count: {len(symbols)}")
if len(symbols) == 0:
self.context.logger.warning("No symbols provided to optionChainProviderFilter")
return None
filteredSymbols = [symbol for symbol in symbols
if minDte <= (symbol.ID.Date.date() - self.context.Time.date()).days <= maxDte]
self.context.logger.debug(f"Filtered symbols count: {len(filteredSymbols)}")
self.context.logger.debug(f"Context Time: {self.context.Time.date()}")
unique_dates = set(symbol.ID.Date.date() for symbol in symbols)
self.context.logger.debug(f"Unique symbol dates: {unique_dates}")
self.context.logger.debug(f"optionChainProviderFilter -> filteredSymbols: {filteredSymbols}")
if not filteredSymbols:
self.context.logger.warning("No symbols left after date filtering")
return None
if not self.__CashTicker():
filteredSymbols = [x for x in filteredSymbols if self.context.Securities[x.ID.Symbol].IsTradable]
self.context.logger.debug(f"Tradable filtered symbols count: {len(filteredSymbols)}")
if not filteredSymbols:
self.context.logger.warning("No tradable symbols left after filtering")
return None
underlying = Underlying(self.context, self.strategy.underlyingSymbol)
underlyingLastPrice = underlying.Price()
self.context.logger.debug(f"Underlying last price: {underlyingLastPrice}")
if underlyingLastPrice is None:
self.context.logger.warning("Underlying price is None")
return None
try:
atm_strike = sorted(filteredSymbols, key=lambda x: abs(x.ID.StrikePrice - underlyingLastPrice))[0].ID.StrikePrice
except IndexError:
self.context.logger.error("Unable to find ATM strike. Check if filteredSymbols is empty or if strike prices are available.")
return None
self.context.logger.debug(f"ATM strike: {atm_strike}")
strike_list = sorted(set([i.ID.StrikePrice for i in filteredSymbols]))
atm_strike_rank = strike_list.index(atm_strike)
min_strike = strike_list[max(0, atm_strike_rank + min_strike_rank + 1)]
max_strike = strike_list[min(atm_strike_rank + max_strike_rank - 1, len(strike_list)-1)]
selectedSymbols = [symbol for symbol in filteredSymbols
if min_strike <= symbol.ID.StrikePrice <= max_strike]
self.context.logger.debug(f"Selected symbols count: {len(selectedSymbols)}")
contracts = []
for symbol in selectedSymbols:
self.AddOptionContracts([symbol], resolution=self.context.timeResolution)
contract = ProviderOptionContract(symbol, underlyingLastPrice, self.context)
contracts.append(contract)
self.context.executionTimer.stop('Tools.DataHandler -> optionChainProviderFilter')
return contracts
def getOptionContracts(self, slice=None):
self.context.executionTimer.start('Tools.DataHandler -> getOptionContracts')
contracts = None
minDte = max(0, self.strategy.dte - self.strategy.dteWindow)
maxDte = max(0, self.strategy.dte)
self.context.logger.debug(f"getOptionContracts -> minDte: {minDte}")
self.context.logger.debug(f"getOptionContracts -> maxDte: {maxDte}")
if slice:
for chain in slice.OptionChains:
if self.strategy.optionSymbol == None or chain.Key != self.strategy.optionSymbol:
continue
if chain.Value.Contracts.Count != 0:
contracts = [
contract for contract in chain.Value if minDte <= (contract.Expiry.date() - self.context.Time.date()).days <= maxDte
]
self.context.logger.debug(f"getOptionContracts -> number of contracts from slice: {len(contracts) if contracts else 0}")
if contracts is None:
symbols = self.context.OptionChainProvider.GetOptionContractList(self.ticker, self.context.Time)
contracts = self.optionChainProviderFilter(symbols, -self.strategy.nStrikesLeft, self.strategy.nStrikesRight, minDte, maxDte)
self.context.executionTimer.stop('Tools.DataHandler -> getOptionContracts')
return contracts
# Method to add option contracts data to the context.
# @param contracts [Array]
# @param resolution [Resolution]
# @return [Symbol]
def AddOptionContracts(self, contracts, resolution = Resolution.Minute):
# Add this contract to the data subscription so we can retrieve the Bid/Ask price
if self.__CashTicker():
for contract in contracts:
if contract not in self.context.optionContractsSubscriptions:
self.context.AddIndexOptionContract(contract, resolution)
self.context.optionContractsSubscriptions.append(contract)
else:
for contract in contracts:
if contract not in self.context.optionContractsSubscriptions:
self.context.AddOptionContract(contract, resolution)
self.context.optionContractsSubscriptions.append(contract)
# PRIVATE METHODS
# Internal method to determine if we are using a cashticker to add the data.
# @returns [Boolean]
def __CashTicker(self):
return self.ticker in self.CashIndices
#region imports
from AlgorithmImports import *
#endregion
class Helper:
def findIn(self, data, condition):
return next((v for v in data if condition(v)), None)
#region imports
from AlgorithmImports import *
#endregion
import sys
import pandas as pd
from collections import deque
class Logger:
def __init__(self, context, className=None, logLevel=0, buffer_size=100):
self.context = context
self.className = className
self.logLevel = logLevel
self.log_buffer = deque(maxlen=buffer_size)
self.current_pattern = []
self.pattern_count = 0
def Log(self, msg, trsh=0):
if self.logLevel < trsh:
return
className = f"{self.className}." if self.className else ""
prefix = ["ERROR", "WARNING", "INFO", "DEBUG", "TRACE"][min(trsh, 4)]
log_msg = f"{prefix} -> {className}{sys._getframe(2).f_code.co_name}: {msg}"
self.process_log(log_msg)
def process_log(self, log_msg):
if not self.current_pattern:
self.print_log(log_msg)
self.current_pattern.append(log_msg)
else:
pattern_index = self.find_pattern_start(log_msg)
if pattern_index == -1:
self.print_pattern()
self.print_log(log_msg)
self.current_pattern.append(log_msg)
else:
if pattern_index == 0:
self.pattern_count += 1
else:
self.print_pattern()
self.print_log("--- New log cycle starts ---")
self.current_pattern = self.current_pattern[pattern_index:]
self.pattern_count = 1
self.log_buffer.append(log_msg)
def find_pattern_start(self, log_msg):
for i in range(len(self.current_pattern)):
if log_msg == self.current_pattern[i]:
if self.is_pattern_repeating(i):
return i
return -1
def is_pattern_repeating(self, start_index):
pattern_length = len(self.current_pattern) - start_index
if len(self.log_buffer) < pattern_length:
return False
return list(self.log_buffer)[-pattern_length:] == self.current_pattern[start_index:]
def print_pattern(self):
if self.pattern_count > 1:
self.print_log(f"The following pattern repeated {self.pattern_count} times:")
for msg in self.current_pattern:
self.print_log(f" {msg}")
elif self.pattern_count == 1:
for msg in self.current_pattern:
self.print_log(msg)
self.pattern_count = 0
def print_log(self, msg):
self.context.Log(msg)
def error(self, msg):
self.Log(msg, trsh=0)
def warning(self, msg):
self.Log(msg, trsh=1)
def info(self, msg):
self.Log(msg, trsh=2)
def debug(self, msg):
self.Log(msg, trsh=3)
def trace(self, msg):
self.Log(msg, trsh=4)
def dataframe(self, data):
if isinstance(data, list):
columns = list(data[0].keys())
else:
columns = list(data.keys())
df = pd.DataFrame(data, columns=columns)
if df.shape[0] > 0:
self.info(f"\n{df.to_string(index=False)}")
def __del__(self):
self.print_pattern()
#region imports
from AlgorithmImports import *
#endregion
class Performance:
def __init__(self, context):
self.context = context
self.logger = self.context.logger
self.dailyTracking = datetime.now()
self.seenSymbols = set()
self.tradedSymbols = set()
self.chainSymbols = set()
self.tradedToday = False
self.tracking = {}
def endOfDay(self, symbol):
day_summary = {
"Time": (datetime.now() - self.dailyTracking).total_seconds(),
"Portfolio": len(self.context.Portfolio),
"Invested": sum(1 for kvp in self.context.Portfolio if kvp.Value.Invested),
"Seen": len(self.seenSymbols),
"Traded": len(self.tradedSymbols),
"Chains": len(self.chainSymbols)
}
# Convert Symbol instance to string
symbol_str = str(symbol)
# Ensure the date is in the tracking dictionary
date_key = self.context.Time.date()
if date_key not in self.tracking:
self.tracking[date_key] = {}
# Ensure the symbol is in the tracking dictionary
if symbol_str not in self.tracking[date_key]:
self.tracking[date_key][symbol_str] = {}
# Store the day summary
self.tracking[date_key][symbol_str] = day_summary
self.dailyTracking = datetime.now()
self.tradedToday = False
def OnOrderEvent(self, orderEvent):
if orderEvent.Status == OrderStatus.Filled or orderEvent.Status == OrderStatus.PartiallyFilled:
if orderEvent.Quantity > 0:
self.logger.trace(f"Filled {orderEvent.Symbol}")
self.tradedSymbols.add(orderEvent.Symbol)
self.tradedToday = True
else:
self.logger.trace(f"Unwound {orderEvent.Symbol}")
def OnUpdate(self, data):
if data.OptionChains:
for kvp in data.OptionChains:
chain = kvp.Value # Access the OptionChain from the KeyValuePair
self.chainSymbols.update([oc.Symbol for oc in chain])
if not self.tradedToday:
for optionContract in (contract for contract in chain if contract.Symbol not in self.tradedSymbols):
self.seenSymbols.add(optionContract.Symbol)
def show(self, csv=False):
if csv:
self.context.Log("Day,Symbol,Time,Portfolio,Invested,Seen,Traded,Chains")
for day in sorted(self.tracking.keys()):
for symbol, stats in self.tracking[day].items():
if csv:
self.context.Log(f"{day},{symbol},{stats['Time']},{stats['Portfolio']},{stats['Invested']},{stats['Seen']},{stats['Traded']},{stats['Chains']}")
else:
self.context.Log(f"{day} - {symbol}: {stats}")
# region imports
from AlgorithmImports import *
# endregion
from datetime import datetime
class ProviderOptionContract:
def __init__(self, symbol, underlying_price, context):
self.Symbol = symbol
self.Underlying = symbol.Underlying
self.UnderlyingSymbol = symbol.Underlying
self.ID = symbol.ID
self.UnderlyingLastPrice = underlying_price
self.security = context.Securities[symbol]
@property
def Expiry(self):
return self.ID.Date
@property
def Strike(self):
return self.ID.StrikePrice
@property
def Right(self):
return self.ID.OptionRight
@property
def BidPrice(self):
return self.security.BidPrice
@property
def AskPrice(self):
return self.security.AskPrice
@property
def LastPrice(self):
return self.security.Price
# Add any other properties or methods you commonly use from OptionContract#region imports
from AlgorithmImports import *
#endregion
from .Underlying import Underlying
import operator
class StrictDataHandler:
# The supported cash indices by QC https://www.quantconnect.com/docs/v2/writing-algorithms/datasets/tickdata/us-cash-indices#05-Supported-Indices
# These need to be added using AddIndex instead of AddEquity
CashIndices = ['VIX','SPX','NDX']
def __init__(self, context, ticker, strategy):
self.ticker = ticker
self.context = context
self.strategy = strategy
# Method to add the ticker[String] data to the context.
# @param resolution [Resolution]
# @return [Symbol]
def AddUnderlying(self, resolution=Resolution.Minute):
if self.__CashTicker():
return self.context.AddIndex(self.ticker, resolution=resolution)
else:
return self.context.AddEquity(self.ticker, resolution=resolution)
# SECTION BELOW IS FOR ADDING THE OPTION CHAIN
# Method to add the option chain data to the context.
# @param resolution [Resolution]
def AddOptionsChain(self, underlying, resolution=Resolution.Minute):
if self.ticker == "SPX":
# Underlying is SPX. We'll load and use SPXW and ignore SPX options (these close in the AM)
return self.context.AddIndexOption(underlying.Symbol, "SPXW", resolution)
elif self.__CashTicker():
# Underlying is an index
return self.context.AddIndexOption(underlying.Symbol, resolution)
else:
# Underlying is an equity
return self.context.AddOption(underlying.Symbol, resolution)
# Should be called on an option object like this: option.SetFilter(self.OptionFilter)
# !This method is called every minute if the algorithm resolution is set to minute
def SetOptionFilter(self, universe):
# Start the timer
self.context.executionTimer.start('Tools.DataHandler -> SetOptionFilter')
self.context.logger.debug(f"SetOptionFilter -> universe: {universe}")
# Include Weekly contracts
# nStrikes contracts to each side of the ATM
# Contracts expiring in the range (DTE-5, DTE)
filteredUniverse = universe.Strikes(-self.strategy.nStrikesLeft, self.strategy.nStrikesRight)\
.Expiration(max(0, self.strategy.dte - self.strategy.dteWindow), max(0, self.strategy.dte))\
.IncludeWeeklys()
self.context.logger.debug(f"SetOptionFilter -> filteredUniverse: {filteredUniverse}")
# Stop the timer
self.context.executionTimer.stop('Tools.DataHandler -> SetOptionFilter')
return filteredUniverse
# SECTION BELOW HANDLES OPTION CHAIN PROVIDER METHODS
def optionChainProviderFilter(self, symbols, min_strike_rank, max_strike_rank, minDte, maxDte):
self.context.executionTimer.start('Tools.DataHandler -> optionChainProviderFilter')
self.context.logger.debug(f"optionChainProviderFilter -> symbols count: {len(symbols)}")
# Check if we got any symbols to process
if len(symbols) == 0:
return None
# Filter the symbols based on the expiry range
filteredSymbols = [symbol for symbol in symbols
if minDte <= (symbol.ID.Date.date() - self.context.Time.date()).days <= maxDte
]
self.context.logger.debug(f"Context Time: {self.context.Time.date()}")
unique_dates = set(symbol.ID.Date.date() for symbol in symbols)
self.context.logger.debug(f"Unique symbol dates: {unique_dates}")
self.context.logger.debug(f"optionChainProviderFilter -> filteredSymbols: {filteredSymbols}")
# Exit if there are no symbols for the selected expiry range
if not filteredSymbols:
return None
# If this is not a cash ticker, filter out any non-tradable symbols.
# to escape the error `Backtest Handled Error: The security with symbol 'SPY 220216P00425000' is marked as non-tradable.`
if not self.__CashTicker():
filteredSymbols = [x for x in filteredSymbols if self.context.Securities[x.ID.Symbol].IsTradable]
underlying = Underlying(self.context, self.strategy.underlyingSymbol)
# Get the latest price of the underlying
underlyingLastPrice = underlying.Price()
# Find the ATM strike
atm_strike = sorted(filteredSymbols
,key = lambda x: abs(x.ID.StrikePrice - underlying.Price())
)[0].ID.StrikePrice
# Get the list of available strikes
strike_list = sorted(set([i.ID.StrikePrice for i in filteredSymbols]))
# Find the index of ATM strike in the sorted strike list
atm_strike_rank = strike_list.index(atm_strike)
# Get the Min and Max strike price based on the specified number of strikes
min_strike = strike_list[max(0, atm_strike_rank + min_strike_rank + 1)]
max_strike = strike_list[min(atm_strike_rank + max_strike_rank - 1, len(strike_list)-1)]
# Get the list of symbols within the selected strike range
selectedSymbols = [symbol for symbol in filteredSymbols
if min_strike <= symbol.ID.StrikePrice <= max_strike
]
self.context.logger.debug(f"optionChainProviderFilter -> selectedSymbols: {selectedSymbols}")
# Loop through all Symbols and create a list of OptionContract objects
contracts = []
for symbol in selectedSymbols:
# Create the OptionContract
contract = self.context.OptionContract(symbol, symbol.Underlying)
self.AddOptionContracts([contract], resolution = self.context.timeResolution)
# Set the BidPrice
contract.BidPrice = self.Securities[contract.Symbol].BidPrice
# Set the AskPrice
contract.AskPrice = self.Securities[contract.Symbol].AskPrice
# Set the UnderlyingLastPrice
contract.UnderlyingLastPrice = underlyingLastPrice
# Add this contract to the output list
contracts.append(contract)
self.context.executionTimer.stop('Tools.DataHandler -> optionChainProviderFilter')
# Return the list of contracts
return contracts
def getOptionContracts(self, slice):
# Start the timer
self.context.executionTimer.start('Tools.DataHandler -> getOptionContracts')
contracts = None
# Set the DTE range (make sure values are not negative)
minDte = max(0, self.strategy.dte - self.strategy.dteWindow)
maxDte = max(0, self.strategy.dte)
self.context.logger.debug(f"getOptionContracts -> minDte: {minDte}")
self.context.logger.debug(f"getOptionContracts -> maxDte: {maxDte}")
# Loop through all chains
for chain in slice.OptionChains:
# Look for the specified optionSymbol
if chain.Key != self.strategy.optionSymbol:
continue
# Make sure there are any contracts in this chain
if chain.Value.Contracts.Count != 0:
# Put the contracts into a list so we can cache the Greeks across multiple strategies
contracts = [
contract for contract in chain.Value if minDte <= (contract.Expiry.date() - self.context.Time.date()).days <= maxDte
]
self.context.logger.debug(f"getOptionContracts -> number of contracts: {len(contracts) if contracts else 0}")
# If no chains were found, use OptionChainProvider to see if we can find any contracts
# Only do this for short term expiration contracts (DTE < 3) where slice.OptionChains usually fails to retrieve any chains
# We don't want to do this all the times for performance reasons
if contracts == None and self.strategy.dte < 3:
# Get the list of available option Symbols
symbols = self.context.OptionChainProvider.GetOptionContractList(self.ticker, self.context.Time)
# Get the contracts
contracts = self.optionChainProviderFilter(symbols, -self.strategy.nStrikesLeft, self.strategy.nStrikesRight, minDte, maxDte)
# Stop the timer
self.context.executionTimer.stop('Tools.DataHandler -> getOptionContracts')
return contracts
# Method to add option contracts data to the context.
# @param contracts [Array]
# @param resolution [Resolution]
# @return [Symbol]
def AddOptionContracts(self, contracts, resolution = Resolution.Minute):
# Add this contract to the data subscription so we can retrieve the Bid/Ask price
if self.__CashTicker():
for contract in contracts:
if contract.Symbol not in self.context.optionContractsSubscriptions:
self.context.AddIndexOptionContract(contract, resolution)
else:
for contract in contracts:
if contract.Symbol not in self.context.optionContractsSubscriptions:
self.context.AddOptionContract(contract, resolution)
# PRIVATE METHODS
# Internal method to determine if we are using a cashticker to add the data.
# @returns [Boolean]
def __CashTicker(self):
return self.ticker in self.CashIndices
#region imports
from AlgorithmImports import *
#endregion
import time as timer
import math
from datetime import timedelta
class Timer:
performanceTemplate = {
"calls": 0.0,
"elapsedMin": float('Inf'),
"elapsedMean": None,
"elapsedMax": float('-Inf'),
"elapsedTotal": 0.0,
"elapsedLast": None,
"startTime": None,
}
def __init__(self, context):
self.context = context
self.performance = {}
def start(self, methodName=None):
# Get the name of the calling method
methodName = methodName or sys._getframe(1).f_code.co_name
# Get current performance stats
performance = self.performance.get(methodName, Timer.performanceTemplate.copy())
# Get the startTime
performance["startTime"] = timer.perf_counter()
# Save it back in the dictionary
self.performance[methodName] = performance
def stop(self, methodName=None):
# Get the name of the calling method
methodName = methodName or sys._getframe(1).f_code.co_name
# Get current performance stats
performance = self.performance.get(methodName)
# Compute the elapsed
elapsed = timer.perf_counter() - performance["startTime"]
# Update the stats
performance["calls"] += 1
performance["elapsedLast"] = elapsed
performance["elapsedMin"] = min(performance["elapsedMin"], elapsed)
performance["elapsedMax"] = max(performance["elapsedMax"], elapsed)
performance["elapsedTotal"] += elapsed
performance["elapsedMean"] = performance["elapsedTotal"]/performance["calls"]
def showStats(self, methodName=None):
methods = methodName or self.performance.keys()
total_elapsed = 0.0 # Initialize total elapsed time
for method in methods:
performance = self.performance.get(method)
if performance:
self.context.Log(f"Execution Stats ({method}):")
for key in performance:
if key != "startTime":
if key == "calls" or performance[key] == None:
value = performance[key]
elif math.isinf(performance[key]):
value = None
else:
value = timedelta(seconds=performance[key])
self.context.Log(f" --> {key}:{value}")
total_elapsed += performance.get("elapsedTotal", 0) # Accumulate elapsedTotal
else:
self.context.Log(f"There are no execution stats available for method {method}!")
# Print the total elapsed time over all methods
self.context.Log("Summary:")
self.context.Log(f" --> elapsedTotal: {timedelta(seconds=total_elapsed)}")
#region imports
from AlgorithmImports import *
#endregion
"""
Underlying class for the Options Strategy Framework.
This class is used to get the underlying price.
Example:
self.underlying = Underlying(self, self.underlyingSymbol)
self.underlyingPrice = self.underlying.Price()
"""
class Underlying:
def __init__(self, context, underlyingSymbol):
self.context = context
self.underlyingSymbol = underlyingSymbol
def Security(self):
return self.context.Securities[self.underlyingSymbol]
# Returns the underlying symbol current price.
def Price(self):
return self.Security().Price
def Close(self):
return self.Security().Close
#region imports from AlgorithmImports import * from .Timer import Timer from .Logger import Logger from .ContractUtils import ContractUtils from .DataHandler import DataHandler from .Underlying import Underlying from .BSMLibrary import BSM, BSMGreeks from .Helper import Helper from .Charting import Charting from .Performance import Performance from .ProviderOptionContract import ProviderOptionContract #endregion
# region imports
from AlgorithmImports import *
# endregion
import numpy as np
import pandas as pd
# The custom algo imports
from Execution import AutoExecutionModel, SmartPricingExecutionModel, SPXExecutionModel
from Monitor import HedgeRiskManagementModel, NoStopLossModel, StopLossModel, FPLMonitorModel, SPXicMonitor, CCMonitor, SPXButterflyMonitor, SPXCondorMonitor, IBSMonitor
from PortfolioConstruction import OptionsPortfolioConstruction
# The alpha models
from Alpha import FPLModel, CCModel, SPXic, SPXButterfly, SPXCondor, IBS
# The execution classes
from Initialization import SetupBaseStructure, HandleOrderEvents
from Tools import Performance
"""
Algorithm Structure Case v1:
1. We run the SetupBaseStructure.Setup() that will set the defaults for all the holders of data and base configuration
2. We have inside each AlphaModel a set of default parameters that will not be assigned to the context.
- This means that each AlphaModel (Strategy) will have their own configuration defined in each class.
- The AlphaModel will add the Underlying and options chains required
- The QC algo will call the AlphaModel#Update method every 1 minute (self.timeResolution)
- The Update method will call the AlphaModel#getOrder method
- The getOrder method should use self.order (Alpha.Utils.Order) methods to get the options
- The options returned will use the Alpha.Utils.Scanner and the Alpha.Utils.OrderBuilder classes
- The final returned method requred to be returned by getOrder method is the Order#getOrderDetails
- The Update method now in AlphaModel will use the getOrder method output to create Insights
"""
class CentralAlgorithm(QCAlgorithm):
def Initialize(self):
# WARNING!! If your are going to trade SPX 0DTE options then make sure you set the startDate after July 1st 2022.
# This is the start of the data we have.
self.SetStartDate(2024, 1, 3)
self.SetEndDate(2024, 2, 4)
# self.SetStartDate(2024, 4, 1)
# self.SetEndDate(2024, 4, 30)
# self.SetEndDate(2022, 9, 15)
# Warmup for some days
# self.SetWarmUp(timedelta(14))
# Logging level:
# -> 0 = ERROR
# -> 1 = WARNING
# -> 2 = INFO
# -> 3 = DEBUG
# -> 4 = TRACE (Attention!! This can consume your entire daily log limit)
self.logLevel = 3 if self.LiveMode else 2
# Set the initial account value
self.initialAccountValue = 100_000
self.SetCash(self.initialAccountValue)
# Time Resolution
self.timeResolution = Resolution.Minute
# Set Export method
self.CSVExport = False
# Should the trade log be displayed
self.showTradeLog = False
# Show the execution statistics
self.showExecutionStats = False
# Show the performance statistics
self.showPerformanceStats = False
# Set the algorithm base variables and structures
self.structure = SetupBaseStructure(self).Setup()
self.performance = Performance(self)
# Set the algorithm framework models
# self.SetAlpha(FPLModel(self))
# self.SetAlpha(SPXic(self))
# self.SetAlpha(CCModel(self))
# self.SetAlpha(SPXButterfly(self))
# self.SetAlpha(SPXCondor(self))
self.SetAlpha(IBS(self))
self.SetPortfolioConstruction(OptionsPortfolioConstruction(self))
# self.SetPortfolioConstruction(InsightWeightingPortfolioConstructionModel())
# self.SetExecution(SpreadExecutionModel())
self.SetExecution(SPXExecutionModel(self))
# self.SetExecution(AutoExecutionModel(self))
# self.SetExecution(SmartPricingExecutionModel(self))
# self.SetExecution(ImmediateExecutionModel())
# self.SetRiskManagement(NoStopLossModel(self))
# self.SetRiskManagement(StopLossModel(self))
# self.SetRiskManagement(FPLMonitorModel(self))
# self.SetRiskManagement(SPXicMonitor(self))
# self.SetRiskManagement(CCMonitor(self))
# self.SetRiskManagement(SPXButterflyMonitor(self))
# self.SetRiskManagement(SPXCondorMonitor(self))
self.SetRiskManagement(IBSMonitor(self))
# Initialize the security every time that a new one is added
def OnSecuritiesChanged(self, changes):
for security in changes.AddedSecurities:
self.structure.CompleteSecurityInitializer(security)
for security in changes.RemovedSecurities:
self.structure.ClearSecurity(security)
def OnEndOfDay(self, symbol):
self.structure.checkOpenPositions()
self.performance.endOfDay(symbol)
def OnOrderEvent(self, orderEvent):
# Start the timer
self.executionTimer.start()
# Log the order event
self.logger.debug(orderEvent)
self.performance.OnOrderEvent(orderEvent)
HandleOrderEvents(self, orderEvent).Call()
# Loop through all strategies
# for strategy in self.strategies:
# # Call the Strategy orderEvent handler
# strategy.handleOrderEvent(orderEvent)
# Stop the timer
self.executionTimer.stop()
def OnEndOfAlgorithm(self) -> None:
# Convert the dictionary into a Pandas Data Frame
# dfAllPositions = pd.DataFrame.from_dict(self.allPositions, orient = "index")
# Convert the dataclasses into Pandas Data Frame
dfAllPositions = pd.json_normalize(obj.asdict() for k,obj in self.allPositions.items())
if self.showExecutionStats:
self.Log("")
self.Log("---------------------------------")
self.Log(" Execution Statistics ")
self.Log("---------------------------------")
self.executionTimer.showStats()
self.Log("")
if self.showPerformanceStats:
self.Log("---------------------------------")
self.Log(" Performance Statistics ")
self.Log("---------------------------------")
self.performance.show()
self.Log("")
self.Log("")
if self.showTradeLog:
self.Log("---------------------------------")
self.Log(" Trade Log ")
self.Log("---------------------------------")
self.Log("")
if self.CSVExport:
# Print the csv header
self.Log(dfAllPositions.head(0).to_csv(index = False, header = True, line_terminator = " "))
# Print the data frame to the log in csv format (one row at a time to avoid QC truncation limitation)
for i in range(0, len(dfAllPositions.index)):
self.Log(dfAllPositions.iloc[[i]].to_csv(index = False, header = False, line_terminator = " "))
else:
self.Log(f"\n#{dfAllPositions.to_string()}")
self.Log("")
def lastTradingDay(self, expiry):
# Get the trading calendar
tradingCalendar = self.TradingCalendar
# Find the last trading day for the given expiration date
lastDay = list(tradingCalendar.GetDaysByType(TradingDayType.BusinessDay, expiry - timedelta(days = 20), expiry))[-1].Date
return lastDay