| Overall Statistics |
|
Total Trades 533 Average Win 0.36% Average Loss -0.15% Compounding Annual Return 3.072% Drawdown 8.400% Expectancy 0.201 Net Profit 8.877% Sharpe Ratio 0.687 Probabilistic Sharpe Ratio 25.770% Loss Rate 65% Win Rate 35% Profit-Loss Ratio 2.46 Alpha 0 Beta 0 Annual Standard Deviation 0.032 Annual Variance 0.001 Information Ratio 0.687 Tracking Error 0.032 Treynor Ratio 0 Total Fees $2595.97 Estimated Strategy Capacity $5800000000.00 Lowest Capacity Asset ZC XUBT0M6O6LNP |
# Standard library imports
from AlgorithmImports import *
import datetime as DT
from dateutil.parser import parse
# Import from files
from notes_and_inputs import *
################################################################################
class MyCustomData(PythonData):
    """
    Custom bar data read from a remote CSV file.

    REFs:
    https://www.quantconnect.com/forum/discussion/4079/python-best-practise-for-using-consolidator-on-custom-data/p1
    """

    def GetSource(self, config, date, isLiveMode):
        """Return the remote-file data source registered for the symbol."""
        # CSV_FILES is keyed by the symbol value. dict.get() is required
        # because this is also called on initialization without a valid
        # symbol, in which case an empty path is used.
        # NOTE: keys must be all caps, because QC converts symbols to caps.
        return SubscriptionDataSource(
            CSV_FILES.get(config.Symbol.Value, ''),
            SubscriptionTransportMedium.RemoteFile
        )

    def Reader(self, config, line, date, isLiveMode):
        """
        Parse one line of the CSV file into a MyCustomData bar.

        The line is split on commas into:
            date, time, open, high, low, close, volume
        e.g. 2/1/2018,10:30:00,13.59,13.78,13.41,13.67,19817603

        Returns None for blank lines and for lines whose first character is
        not a digit. No exception is caught here, so a malformed row raises.
        """
        bar = MyCustomData()
        bar.Symbol = config.Symbol
        # Ignore empty lines and anything not starting with a digit
        if not line.strip() or not line[0].isdigit():
            return None
        fields = line.split(',')
        # Merge the separate date and time columns into one datetime
        bar_date = parse(fields[0])
        bar_time = parse(fields[1]).time()
        bar.Time = DT.datetime.combine(bar_date, bar_time)
        # The open price is the value used for filling positions
        bar.Value = float(fields[2])
        # Price/volume columns start at index 2, in this order
        columns = ("Open", "High", "Low", "Close", "Volume")
        for index, column in enumerate(columns, start=2):
            bar[column] = float(fields[index])
        return bar
# Standard library imports
from AlgorithmImports import *
import datetime as DT
# Import from files
from notes_and_inputs import *
from symbol_data import *
from custom_data import *
################################################################################
class EquitiesStrategyAlgorithm(QCAlgorithm):
    """
    Algorithm that trades every root symbol listed in FUTURES.

    One SymbolData object is created per root symbol and stored in
    self.symbol_data. Orders that cannot be placed yet because the contract
    has no data are queued in self.orders and retried from OnData().
    """

    def Initialize(self):
        """Initialize algorithm."""
        # Set backtest details
        self.set_backtest_details()
        # Initialize algo parameters
        self.initialize_parameters()
        # Add all desired instruments to the algo
        self.add_instruments()
        self.SetWarmUp(DT.timedelta(days=305))

#-------------------------------------------------------------------------------
    def set_backtest_details(self):
        """Set the backtest details."""
        # Set the start and end date (if applicable)
        self.SetStartDate(
            BACKTEST_START_DT.year,
            BACKTEST_START_DT.month,
            BACKTEST_START_DT.day
        )
        if END_DATE:
            self.SetEndDate(END_DT.year, END_DT.month, END_DT.day)
        # Set the starting cash amount
        self.SetCash(CASH)
        # Set the timezone for algo logs
        self.SetTimeZone(TIMEZONE)
        # Setup trading framework
        # Transaction and submit/execution rules will use IB models
        # Cannot use the built-in models with the custom data
        if DATA_SOURCE == 'QC':
            self.SetBrokerageModel(
                BrokerageName.InteractiveBrokersBrokerage,
                AccountType.Margin
            )
        # Will override the desired reality methods with this function
        # Configure all algorithm securities
        self.SetSecurityInitializer(self.custom_security_initializer)

#-------------------------------------------------------------------------------
    def initialize_parameters(self):
        """Read all algo input parameters and set up all others required."""
        # Read user inputs
        self.min_p1 = MIN_P1
        self.min_p2 = MIN_P2
        self.level_a = LEVEL_A
        self.level_b = LEVEL_B
        self.level_c = LEVEL_C
        self.stop_ix = STOP_IX
        self.length = LENGTH
        self.stop_ix_r = STOP_IX_R
        self.stop_dollar = STOP_DOLLAR
        self.atr_length_r = ATR_LENGTH_R
        self.risk_pct = RISK_PCT
        self.lookback = LOOKBACK
        # Create an empty list to hold order info
        # This will hold (contract, qty, tag) tuples for the desired orders
        # when data was not available at the time an order was initially
        # desired.
        self.orders = []

#-------------------------------------------------------------------------------
    def add_instruments(self):
        """
        Add desired instrument data to the algo.

        For 'QC' data a continuous future is added per root symbol; otherwise
        the root is added as MyCustomData. Either way a SymbolData object is
        stored in self.symbol_data under the root symbol.
        """
        # Create a dictionary to hold all symbol data objects
        self.symbol_data = {}
        use_qc_data = DATA_SOURCE == 'QC'
        if use_qc_data:
            # These settings do not depend on the root symbol, so resolve
            # them once instead of on every loop iteration.
            mapping_modes = {
                'LastTradingDay': DataMappingMode.LastTradingDay,
                'FirstDayMonth': DataMappingMode.FirstDayMonth,
                'OpenInterest': DataMappingMode.OpenInterest,
            }
            price_scales = {
                'BackwardsPanamaCanal':
                    DataNormalizationMode.BackwardsPanamaCanal,
                'BackwardsRatio': DataNormalizationMode.BackwardsRatio,
                'Raw': DataNormalizationMode.Raw,
            }
            data_resolutions = {
                'SECOND': Resolution.Second,
                'MINUTE': Resolution.Minute,
                'HOUR': Resolution.Hour,
            }
            # A KeyError here means an input was not one of the allowed values
            contract_mapping = mapping_modes[CONTINUOUS_MAPPING]
            price_scaling = price_scales[CONTINUOUS_PRICE_SCALE]
            # Will be referenced in SymbolData class.
            self.resolution = data_resolutions[DATA_RESOLUTION]
            # Contract filter window, in days until expiry
            filter_days = int(10*ROLL_DAYS_BEFORE_EXPIRY)
        # Loop through all futures
        for root in FUTURES:
            if use_qc_data:
                # Add future data and save link to the continuous future
                f = self.AddFuture(
                    root,
                    extendedMarketHours=True, # defaults to False
                    dataNormalizationMode=price_scaling,
                    dataMappingMode=contract_mapping,
                    contractDepthOffset=0
                )
                # Set future contract filter
                f.SetFilter(DT.timedelta(0), DT.timedelta(filter_days))
            # Otherwise 'CUSTOM' data is used
            else:
                f = self.AddData(MyCustomData, root)
            # Create symbol data object for the root symbol
            self.symbol_data[root] = SymbolData(self, root, f)

#-------------------------------------------------------------------------------
    def custom_security_initializer(self, security):
        """
        Define models to be used for securities as they are added to the
        algorithm's universe.
        """
        # Define the data normalization mode
        security.SetDataNormalizationMode(DataNormalizationMode.Adjusted)

#-------------------------------------------------------------------------------
    def OnData(self, data):
        """Built-in event handler for new data."""
        # Process any orders still waiting in the queue
        if self.orders:
            self.ProcessOrders()

#-------------------------------------------------------------------------------
    def ProcessOrders(self):
        """Process any orders in the queue once data is available."""
        # Loop through a copy because entries are removed while iterating
        for order_info in self.orders[:]:
            contract, qty, tag = order_info
            # Leave the order queued until the contract has data
            if not self.Securities[contract].HasData:
                continue
            self.Log(
                f"Algo now has data for {contract}. Placing {tag} order "
                f"for {qty} contracts."
            )
            # Place the desired MarketOrder
            self.MarketOrder(contract, qty, tag=tag)
            # Remove from list
            self.orders.remove(order_info)

#-------------------------------------------------------------------------------
    def OnOrderEvent(self, orderEvent):
        """
        Built-in event handler for orders.

        Only filled orders are handled. A fill is treated as a stop fill when
        its id matches the symbol's active stop order, as an exit when the
        holdings quantity is zero, and otherwise as an entry, in which case
        the fill price is saved as the cost basis and a stop order is placed.
        """
        # Skip if not filled
        if orderEvent.Status != OrderStatus.Filled:
            return
        # Get the order details
        order = self.Transactions.GetOrderById(orderEvent.OrderId)
        order_qty = order.Quantity
        # Get current holdings qty for the order's symbol
        qty = self.Portfolio[order.Symbol].Quantity
        # Get the current order's average fill price
        avg_fill = round(orderEvent.FillPrice, 4)
        # Get the symbol_data object for the root symbol
        if DATA_SOURCE == 'QC':
            symbol = str(order.Symbol)
            symbol_data = self.symbol_data[order.Symbol.ID.Symbol]
        else:
            # Remove the .MyCustomData
            symbol = str(order.Symbol).split(".")[0]
            symbol_data = self.symbol_data[symbol]
        # Check for filled stop order. Compare against the actual stop order
        # rather than a placeholder id so "no stop order" can never match.
        stop_order = symbol_data.stop_order
        if stop_order and orderEvent.OrderId == stop_order.OrderId:
            # Filled stop loss order - set to None
            symbol_data.stop_order = None
            if PRINT_ORDERS:
                self.Log(
                    f"{symbol} filled stop order for {order_qty} shares @ "
                    f"{avg_fill}"
                )
            return
        # Check for a market exit order
        if qty == 0:
            if PRINT_ORDERS:
                self.Log(
                    f"{symbol} filled exit order for {order_qty} shares @ "
                    f"{avg_fill}"
                )
            return
        # Otherwise qty not 0, so entry order
        # Save the fill price as the cost basis for the position
        symbol_data.cost_basis = avg_fill
        if PRINT_ORDERS:
            if qty > 0:
                self.Log(
                    f"{symbol} filled long entry order for {order_qty} "
                    f"shares @ {avg_fill}, new qty = {qty}"
                )
            elif qty < 0:
                self.Log(
                    f"{symbol} filled short entry order for {order_qty} "
                    f"shares @ {avg_fill}, new qty = {qty}"
                )
        # Immediately place new stop market order
        symbol_data.PlaceStopOrder(qty)
#region imports
from AlgorithmImports import *
#endregion
"""
Matt Custom Strategy
Version 1.0.7 for Futures
Platform: QuantConnect
By: Aaron Eller
For: Matt Blonc
www.excelintrading.com
aaron@excelintrading.com
Revision Notes:
1.0.0 (09/08/2021) - Initial
1.0.1 (09/10/2021) - Added 'CUSTOM_TIMES' BAR option.
1.0.2 (09/17/2021) - Added DATA_SOURCE input and custom data logic.
- Modified symbol_data.set_indicators() to handle custom
data consolidators.
- Modified set_backtest_details() and OnOrderEvent() to
handle custom data symbols.
- Modified to work with futures.
1.0.3 (09/22/2021) - Updated PlaceStopOrder() to use atr_r.
- Updated up/down count and up/down support/resistance
logic.
1.0.4 (09/23/2021) - Added logic to prevent reversal entry and exit on same
bar that results in doubling the position size.
1.0.5 (09/24/2021) - Updated to work with custom future's data.
1.0.6 (12/29/2022) - Added CONTINUOUS_MAPPING and CONTINUOUS_PRICE_SCALE.
- Updated SymbolData.CustomCalendar().
- Replaced SymbolData.DailyUSEquityCalendar() with
DailyFutureCalendar.
- Added OPEN_TIMES and CLOSE_TIMES inputs necessary when
custom data is used.
1.0.7 (01/06/2023) - Changed 'QC' data to subscribe to the active contract
to be traded.
- Changed contract roll time check to be handled in
SymbolData class via scheduled function rather than
in QCAlgorithm.OnData().
- Added DATA_RESOLUTION input and logic.
- Added ROLL_TO_NEW_CONTRACT input and logic. This also
added QCAlgorithm.orders list and logic.
- Updated logic to ignore orders for zero quantity to
avoid errors.
References:
-Continuous Futures
https://www.quantconnect.com/forum/discussion/12644/
continuous-futures-support/p1
-QC (Lean) Class List
https://lean-api-docs.netlify.app/annotated.html
-Creating a Custom Indicator
https://www.quantconnect.com/forum/discussion/3383/custom-indicator-in-python-algorithm/p1
"""
################################################################################
import datetime as DT
# USER INPUTS
# Backtest window, MM-DD-YYYY format
START_DATE = "01-01-2019"
# Set END_DATE to None to run to the current date (previously "12-31-2021")
END_DATE = "10-21-2021"
# Starting portfolio cash amount
CASH = 400000
# Timezone for algo logs, e.g. "US/Eastern", "US/Central", "US/Pacific"
TIMEZONE = "US/Central"

# Root symbols of the futures to trade. Be sure to use ALL CAPS.
FUTURES = ['ZC', 'ZS']

# Contract multipliers. QC must be told these if CUSTOM data is used.
MULTIPLIERS = {
    'CL': 1000,
    # 'ZF': ?
}

# Number of days until expiry to trade the next contract
ROLL_DAYS_BEFORE_EXPIRY = 10

#-------------------------------------------------------------------------------
# CONTINUOUS FUTURES INPUTS

# Data resolution for the futures fed to the algorithm.
# Must be "SECOND", "MINUTE", or "HOUR" for futures.
# NOTE: 'SECOND' is very slow so 'MINUTE' is optimal for faster testing
DATA_RESOLUTION = 'HOUR'

# Continuous futures' contract mapping, i.e. when contracts are rolled.
# Must be one of the following:
#   "LastTradingDay": The contract maps on the previous day of expiration of
#       the front month.
#   "FirstDayMonth": The contract maps on the first date of the delivery month
#       of the front month. If the contract expires prior to this date, then
#       it rolls on the contract's last trading date instead.
#   "OpenInterest": The contract maps when the back month contract has a
#       higher traded volume than the current front month.
CONTINUOUS_MAPPING = 'OpenInterest'

# Continuous futures' price scaling.
# Must be one of the following:
#   "BackwardsPanamaCanal": Eliminates price jumps between two consecutive
#       contracts, adding a factor based on the difference of their prices.
#       Last contract is the true one, factor 0.
#   "BackwardsRatio": Eliminates price jumps between two consecutive
#       contracts, multiplying the prices by their ratio. Last contract is the
#       true one, factor 1.
#   "Raw": No price adjustment is made.
CONTINUOUS_PRICE_SCALE = 'BackwardsPanamaCanal'

# When the algo switches to a new contract, should it immediately take the
# same position in the new contract that it had in the old one?
ROLL_TO_NEW_CONTRACT = True

#-------------------------------------------------------------------------------
# BAR TO TRACK FOR EACH FUTURE

# Use QC data or custom data: must be 'QC' or 'CUSTOM'
DATA_SOURCE = 'QC'

# Market open/close times. QC must be told these if CUSTOM data is used.
OPEN_TIMES = {'CL': DT.time(18, 0)}
CLOSE_TIMES = {'CL': DT.time(17, 0)}

#-------------------------------------------------------------------------------
# The following inputs are used for 'QC' data:

# Use the following format: '5 min', '1 hr', '1 day'
# Or set BAR = 'CUSTOM_TIMES' to use CUSTOM_TIMES below
BAR = '1 hr'

# Custom intraday bars as 'HHMM-HHMM' start/stop times.
# Only used when BAR above is set to be 'CUSTOM_TIMES'.
CUSTOM_TIMES = [
    '1800-2200',  # 4hrs at market open
    '2200-0200',  # 4hrs
    '0200-0600',  # 4hrs
    '0600-1000',  # 4hrs
    '1000-1400',  # 4hrs
    '1400-1700',  # 3hrs to end at market close
]

#-------------------------------------------------------------------------------
# The following inputs are used for 'CUSTOM' data

# Links to the csv custom data, one per root symbol listed in FUTURES.
# Note that the link must automatically download the data as a csv file.
# If using dropbox remember to add the &dl=1 to trigger a download.
CSV_FILES = {
    'CL': 'https://www.dropbox.com/s/oe1zop5jsmy3uij/QCL%23C.csv?dl=1',
}

# Period of the custom data.
# 'days', 'minutes', 'hours' are all possible arguments for DT.timedelta
CUSTOM_DATA_PERIOD = DT.timedelta(minutes=60)

#-------------------------------------------------------------------------------
# INDICATOR INPUTS
MIN_P1 = 3
MIN_P2 = 8
LEVEL_A = 0.7
LEVEL_B = 0.8
LEVEL_C = 1.7
STOP_IX = 6
LENGTH = 40
LOOKBACK = 305  # default setting for MaxBarsBack in Multicharts

# POSITION SIZING INPUTS
# Alternative settings: STOP_IX_R = 2, ATR_LENGTH_R = 160
STOP_IX_R = 6
ATR_LENGTH_R = 40
RISK_PCT = 0.0014
STOP_DOLLAR = 1400

#-------------------------------------------------------------------------------
# Turn on/off logs
PRINT_CX_UPDATES = True  # print logs for updating futures' contracts
PRINT_SIGNALS = True     # print entry/exit signals
PRINT_ORDERS = True      # print order details
################################################################################
############################ END OF ALL USER INPUTS ############################
################################################################################
################################################################################
# VALIDATE USER INPUTS - DO NOT CHANGE BELOW!!!
# Verify start date
# Only parsing failures are converted to the friendly message; the original
# error is chained so the cause stays visible.
try:
    START_DT = DT.datetime.strptime(START_DATE, '%m-%d-%Y')
except (TypeError, ValueError) as err:
    raise ValueError(
        "Invalid START_DATE format ({}). Must be in MM-DD-YYYY "
        "format.".format(START_DATE)
    ) from err
# Verify end date
# END_DT stays None when END_DATE is not set (run to the current date)
END_DT = None
if END_DATE:
    try:
        END_DT = DT.datetime.strptime(END_DATE, '%m-%d-%Y')
    except (TypeError, ValueError) as err:
        raise ValueError(
            "Invalid END_DATE format ({}). Must be in MM-DD-YYYY "
            "format or set to None to run to date.".format(END_DATE)
        ) from err
#-------------------------------------------------------------------------------
# Verify BAR
# Sets BAR_UNIT ('min' or 'day') plus either BAR_TIMES (custom times) or
# BAR_INT (fixed bar size; hour bars are converted to minutes).
# First check if 'CUSTOM_TIMES' is used
if BAR == 'CUSTOM_TIMES':
    # Custom bars are always tracked in minutes. Set this before the loop so
    # it is defined even when CUSTOM_TIMES is empty.
    BAR_UNIT = 'min'
    # (start time, end time) tuples, one per CUSTOM_TIMES entry
    BAR_TIMES = []
    for time_str in CUSTOM_TIMES:
        try:
            # Entries use the 'HHMM-HHMM' format
            start_str = time_str.split('-')[0]
            end_str = time_str.split('-')[1]
            time_start = DT.time(
                hour=int(start_str[:2]), minute=int(start_str[-2:]),
                second=0, microsecond=0)
            time_end = DT.time(
                hour=int(end_str[:2]), minute=int(end_str[-2:]),
                second=0, microsecond=0)
        except (AttributeError, IndexError, TypeError, ValueError) as err:
            raise ValueError(
                "Invalid CUSTOM_TIMES entry: {}".format(time_str)) from err
        BAR_TIMES.append((time_start, time_end))
else:
    # Only the parsing is guarded, so the specific messages raised below are
    # no longer replaced by the generic format error.
    try:
        # Get the bar integer and bar unit
        BAR_INT = int(BAR.split(" ")[0])
        BAR_UNIT = BAR.split(" ")[1].lower()
    except (AttributeError, IndexError, TypeError, ValueError) as err:
        raise ValueError(
            "Invalid BAR: {}. Use '5 min', '1 hr', or '1 day' format".format(
                BAR)) from err
    # Verify bar unit is valid
    if BAR_UNIT not in ['min', 'hr', 'day']:
        raise ValueError(
            "Invalid BAR ({}). Unit must be 'min', 'hr', or 'day'.".format(
                BAR))
    # A zero or negative bar size cannot be consolidated
    if BAR_INT < 1:
        raise ValueError(
            "Invalid BAR ({}). The bar size must be a positive "
            "integer.".format(BAR))
    if BAR_UNIT == 'hr':
        # Convert hr bars to the equivalent number of minutes
        BAR_INT *= 60
        BAR_UNIT = 'min'
    # Only allow '1 day' bars
    elif BAR_UNIT == 'day' and BAR_INT != 1:
        raise ValueError(
            "Invalid BAR ({}). Multiple 'day' bars are not allowed.".format(
                BAR))
# Verify data source
if DATA_SOURCE == 'QC':
    pass
elif DATA_SOURCE == 'CUSTOM':
    # Make sure all futures have a data source
    for future in FUTURES:
        if future not in CSV_FILES:
            # Report the offending root symbol (the loop variable)
            raise ValueError("Using 'CUSTOM' data and no CSV_FILES link defined"
                             " for {}.".format(future))
else:
    raise ValueError("Invalid DATA_SOURCE ({}). Must be 'QC' or "
                     "'CUSTOM'.".format(DATA_SOURCE))
#-------------------------------------------------------------------------------
# Allowed values for the continuous futures inputs
resolutions = ['SECOND', 'MINUTE', 'HOUR']
maps = ["LastTradingDay", "FirstDayMonth", "OpenInterest"]
scales = ["BackwardsPanamaCanal", "BackwardsRatio", "Raw"]
# Verify DATA_RESOLUTION input (compared in upper case)
DATA_RESOLUTION = DATA_RESOLUTION.upper()
if not (DATA_RESOLUTION in resolutions):
    raise ValueError(
        f"Invalid DATA_RESOLUTION ({DATA_RESOLUTION}). Must be: {resolutions}"
    )
#-------------------------------------------------------------------------------
# Verify CONTINUOUS_MAPPING input
if not (CONTINUOUS_MAPPING in maps):
    raise ValueError(
        f"Invalid CONTINUOUS_MAPPING ({CONTINUOUS_MAPPING}). Must be: {maps}"
    )
#-------------------------------------------------------------------------------
# Verify CONTINUOUS_PRICE_SCALE input
if not (CONTINUOUS_PRICE_SCALE in scales):
    raise ValueError(
        f"Invalid CONTINUOUS_PRICE_SCALE ({CONTINUOUS_PRICE_SCALE}). Must be: "
        f"{scales}"
    )
#-------------------------------------------------------------------------------
# Minimum number of bars required to warm up the indicators: the longer of
# the two ATR lengths
WARMUP_DAYS = int(1*max(ATR_LENGTH_R, LENGTH))
# Number of calendar days the backtest starts ahead of START_DT.
# Fixed at 1; the market-day conversion (approximately 252 market days per
# 365 calendar days) is not applied:
# CALENDAR_DAYS = int(WARMUP_DAYS*(365/252))
CALENDAR_DAYS = 1
_early_start = DT.timedelta(days=CALENDAR_DAYS)
BACKTEST_START_DT = START_DT - _early_start
#region imports
from AlgorithmImports import *
#endregion
# Standard library imports
import datetime as DT
import math
import numpy as np
import pytz
import random
# Import from files
from notes_and_inputs import *
################################################################################
class SymbolData(object):
"""Class to store data for a specific security symbol."""
def __init__(self, algo, root, continuous_contract):
    """
    Initialize SymbolData object.

    Stores the algorithm, the root symbol and the continuous contract, then
    runs the set-up steps in a fixed order.
    """
    self.algo = algo
    self.symbol = root
    self.continuous_contract = continuous_contract
    # Both attributes reference the continuous contract's Symbol
    contract_symbol = continuous_contract.Symbol
    self.symbol_object = contract_symbol
    self.security = contract_symbol
    # Set-up steps: strategy variables, exchange info, indicators, schedules
    setup_steps = (
        self.add_strategy_variables,
        self.get_exchange_info,
        self.add_indicators,
        self.schedule_functions,
    )
    for step in setup_steps:
        step()
#-------------------------------------------------------------------------------
def add_strategy_variables(self):
    """Add the variables that hold the contract details."""
    # Nothing is known about the traded contract yet
    self.consolidator = self.contract = self.min_tick = None
    custom_data = DATA_SOURCE == 'CUSTOM'
    if custom_data:
        # Custom data takes its multiplier from the user inputs
        self.multiplier = MULTIPLIERS[self.symbol]
    # With 'QC' data the specs cannot be read yet, because there is no
    # access to an actual contract upon initialization.
    self.have_contract_specs = custom_data
#-------------------------------------------------------------------------------
def get_exchange_info(self):
    """
    Get the security's exchange info.

    Saves the exchange hours object, the typical market open/close times
    (exchange time and TIMEZONE time), the period length in minutes, the
    exchange timezone and the hour offset between the two timezones.
    """
    # Get the SecurityExchangeHours Class object for the symbol
    hours = self.algo.Securities[self.security].Exchange.Hours
    self.exchange_hours = hours
    # A datetime the market is known to be open (a Monday)
    known_open = DT.datetime(2021, 1, 4)
    if DATA_SOURCE == 'CUSTOM':
        # Build the open/close datetimes from the user inputs
        open_time = OPEN_TIMES[self.symbol]
        open_dt = DT.datetime(
            2021, 1, 4, hour=open_time.hour, minute=open_time.minute
        )
        close_time = CLOSE_TIMES[self.symbol]
        # A close that is not after the open falls on the next calendar day
        close_day = 4 if close_time > open_time else 5
        close_dt = DT.datetime(
            2021, 1, close_day,
            hour=close_time.hour, minute=close_time.minute
        )
    else:
        # Typical open: first open after a datetime the market is known to
        # be closed (a Saturday)
        open_dt = hours.GetNextMarketOpen(
            DT.datetime(2021, 1, 2), extendedMarket=True)
        # Typical close: first close after the known open datetime
        close_dt = hours.GetNextMarketClose(known_open, extendedMarket=True)
    # Reference datetime used for the timezone offsets below
    dt = known_open
    self.mkt_open = open_dt.time()
    self.mkt_close = close_dt.time()
    # Check if there is a trading halt that needs to be considered
    if DATA_SOURCE == 'QC':
        next_open_dt = hours.GetNextMarketOpen(close_dt, extendedMarket=True)
        if self.mkt_open != next_open_dt.time():
            # This open time is continuing after a trading halt, so use the
            # next period's close as the main close time
            close_dt = hours.GetNextMarketClose(
                next_open_dt, extendedMarket=True)
            self.mkt_close = close_dt.time()
    # Get the typical period in minutes
    self.period_minutes = (close_dt-open_dt).seconds/60
    # Create pytz timezone objects for the exchange tz and local tz
    self.mkt_tz = pytz.timezone(str(hours.TimeZone))
    local_tz = pytz.timezone(TIMEZONE)
    # Get the difference in the timezones, in whole hours
    # REF: http://pytz.sourceforge.net/#tzinfo-api
    seconds_per_hour = 3600
    exchange_utc_offset_hrs = int(
        self.mkt_tz.utcoffset(dt).seconds/seconds_per_hour)
    local_utc_offset_hrs = int(
        local_tz.utcoffset(dt).seconds/seconds_per_hour)
    self.offset_hrs = exchange_utc_offset_hrs-local_utc_offset_hrs
    # NOTE: offset hours are very helpful if you want to schedule functions
    # around market open/close times
    # Get the market open and close times for the local time zone
    tz_shift = DT.timedelta(hours=self.offset_hrs)
    self.mkt_open_local_tz = (open_dt-tz_shift).time()
    self.mkt_close_local_tz = (close_dt-tz_shift).time()
#-------------------------------------------------------------------------------
def add_indicators(self):
    """
    Set up the security's indicators to be updated on the desired BAR.

    Also creates the rolling window of bars and resets every strategy state
    variable to its starting value.
    """
    # Both ATRs use simple smoothing (MovingAverageType.Exponential is the
    # alternative)
    # REF: https://lean-api-docs.netlify.app/MovingAverageTypeExtensions_8cs_source.html
    smoothing = MovingAverageType.Simple
    self.atr = AverageTrueRange(self.algo.length, smoothing)
    self.atr_r = AverageTrueRange(self.algo.atr_length_r, smoothing)
    # Keep a rolling window of the last lookback bars
    self.lookback = self.algo.lookback
    self.bars = RollingWindow[TradeBar](self.lookback)
    # Keep a link to the active stop loss order
    self.stop_order = None
    # Active position's cost basis. The actual fill price is used for the
    # stop loss instead of the assumed fill price at the bar's open.
    self.cost_basis = None
    # Starting values for the strategy state.
    # NOTE(review): +/-99999 presumably act as "not set" placeholders; confirm
    # against the strategy logic.
    self.mp = 0
    self.up_s_exit = self.up_f_exit = -99999
    self.dn_s_exit = self.dn_f_exit = 99999
    self.up_signal = self.dn_signal = 0
    self.up_fix = self.dn_fix = 0
    self.up_ct = self.dn_ct = 1
    self.up_res = self.dn_res = 99999
    self.up_sup = self.dn_sup = -99999
    # p1 state
    self.p1_up_ready = self.p1_dn_ready = False
    self.up_f = -99999
    self.dn_f = 99999
    self.p1_up_f = 99999
    self.p1_dn_f = -99999
    self.p1_up_e = self.p1_dn_e = 0
    self.p1_up_noise_a = self.p1_dn_noise_a = 0
    self.p1_up_noise_b = self.p1_dn_noise_b = 0
    self.p1_up_noise_c = self.p1_dn_noise_c = 0
    self.p1_up_entry = 99999
    self.p1_dn_entry = -99999
    self.reversal = 0
    # p2 state
    self.p2_up_ready = self.p2_dn_ready = False
    self.p2_up_e = self.p2_dn_e = 0
    self.p2_up_noise_a = self.p2_dn_noise_a = 0
    self.p2_up_noise_b = self.p2_dn_noise_b = 0
    self.p2_up_noise_c = self.p2_dn_noise_c = 0
    # Swing, noise and entry state
    self.up_swing = self.dn_swing = 0
    self.up_e = self.dn_e = 0
    self.up_noise_a = self.dn_noise_a = 0
    self.up_noise_b = self.dn_noise_b = 0
    self.up_noise_c = self.dn_noise_c = 0
    self.up_entry = 99999
    self.dn_entry = -99999
    # a/b/c plus ("p") and minus ("m") levels
    self.up_ap = self.dn_ap = 99999
    self.up_am = self.dn_am = -99999
    self.up_bp = self.dn_bp = 99999
    self.up_bm = self.dn_bm = -99999
    self.up_cp = self.dn_cp = 99999
    self.up_cm = self.dn_cm = -99999
    self.contracts = 0
    self.counter = 0
#-------------------------------------------------------------------------------
def schedule_functions(self):
    """
    Schedule required functions for the class.

    'QC' data: schedules contract_roll_check() every day, 1 minute after the
    local market open. Otherwise ('CUSTOM' data): attaches a consolidator
    for CUSTOM_DATA_PERIOD that feeds OnDataConsolidated().
    """
    if DATA_SOURCE != 'QC':
        # 'CUSTOM' data: create a consolidator based on the custom data
        custom_consolidator = self.algo.ResolveConsolidator(
            self.symbol, CUSTOM_DATA_PERIOD)
        # Event handler to be called on each new consolidated bar
        custom_consolidator.DataConsolidated += self.OnDataConsolidated
        # Link the consolidator with the symbol and add it to the manager
        self.algo.SubscriptionManager.AddConsolidator(
            self.symbol, custom_consolidator)
        return
    # Rather than checking for data.SymbolChangedEvents in OnData (slow),
    # check for a contract roll once per day.
    # Today's date is only a carrier for adding 1 minute to the open time.
    open_today = DT.datetime.combine(
        DT.datetime.today(), self.mkt_open_local_tz)
    roll_time = (open_today + DT.timedelta(minutes=1)).time()
    self.algo.Schedule.On(
        self.algo.DateRules.EveryDay(self.symbol_object),
        self.algo.TimeRules.At(roll_time.hour, roll_time.minute),
        self.contract_roll_check
    )
#-------------------------------------------------------------------------------
def contract_roll_check(self):
    """
    Check if the current contract needs to be rolled.

    When the mapped contract has changed, switches to it, closes any open
    position in the old contract and, if ROLL_TO_NEW_CONTRACT is set, opens
    the same quantity in the new contract (queued if it has no data yet).
    This is only called if we are using 'QC' data.
    """
    # No current contract yet (start): just set it
    if not self.contract:
        self.update_current_contract()
        return
    # Nothing to do unless the mapped contract has changed
    contract_changed = self.contract != self.continuous_contract.Mapped
    if not contract_changed:
        return
    new_contract = self.continuous_contract.Mapped
    old_contract = self.contract
    # Read the qty before updating, while the old contract is still current
    qty = self.current_qty
    self.algo.Log(
        f"{self.symbol} contract to trade changing from {old_contract}"
        f" to {new_contract}"
    )
    # Update the current contract
    self.update_current_contract(new_contract)
    # No open position in the old contract means no orders are needed
    if qty == 0:
        return
    # Place order to exit the old contract
    self.algo.Log(f"Closing {qty} of {old_contract}")
    self.algo.MarketOrder(old_contract, -qty, tag='roll exit')
    if not ROLL_TO_NEW_CONTRACT:
        return
    # Take the same position in the new contract
    self.algo.Log(f"Opening {qty} of {new_contract}")
    tag = 'roll entry'
    if self.algo.Securities[new_contract].HasData:
        self.algo.MarketOrder(new_contract, qty, tag=tag)
    else:
        # No data yet: queue the order to be placed asap once data arrives
        self.algo.orders.append((new_contract, qty, tag))
#-------------------------------------------------------------------------------
def update_current_contract(self, new_contract=None):
    """
    Update the current contract to the new desired contract.

    Falls back to the continuous contract's mapped contract when
    new_contract is not passed, and does nothing if there is none.
    This is only called if we are using 'QC' data.
    """
    if not new_contract:
        new_contract = self.continuous_contract.Mapped
        if not new_contract:
            return
    # Remove the previous trade bar consolidator (if one).
    # self.contract has not been updated yet, so it is still the old contract.
    if self.consolidator:
        self.algo.SubscriptionManager.RemoveConsolidator(
            self.contract, self.consolidator
        )
    # Get the contract specs the first time through
    if not self.have_contract_specs:
        self.get_contract_specs()
        self.have_contract_specs = True
    # We need to subscribe to the new contract's data!
    self.algo.AddFutureContract(
        new_contract,
        self.algo.resolution,
        extendedMarketHours=True, # defaults to False
    )
    # Pick what defines the desired bar: a calendar function or a fixed period
    self.future_calendar_initialized = False
    if BAR_UNIT == 'min':
        if BAR == 'CUSTOM_TIMES':
            bar_definition = self.CustomCalendar
        else:
            bar_definition = timedelta(minutes=BAR_INT)
    elif BAR_UNIT == 'day':
        bar_definition = self.DailyFutureCalendar
    new_consolidator = TradeBarConsolidator(bar_definition)
    # Event handler to be called on each new consolidated bar
    new_consolidator.DataConsolidated += self.OnDataConsolidated
    # Link the consolidator with the contract and add it to the manager
    self.algo.SubscriptionManager.AddConsolidator(
        new_contract, new_consolidator)
    # Save the consolidator and contract for future reference
    self.consolidator = new_consolidator
    self.contract = new_contract
#-------------------------------------------------------------------------------
def get_contract_specs(self):
    """
    Get the future's contract specs.

    Reads the SymbolProperties of the currently mapped contract and saves
    the min tick, multiplier, description, tick value and point value.
    Does nothing when there is no mapped contract.
    This is only called if we are using 'QC' data.
    """
    mapped_contract = self.continuous_contract.Mapped
    if not mapped_contract:
        return
    # Get the SymbolProperties of the current mapped contract
    specs = self.algo.Securities[mapped_contract].SymbolProperties
    # Get and save the contract specs
    self.min_tick = specs.MinimumPriceVariation
    self.multiplier = specs.ContractMultiplier
    self.description = specs.Description
    # Value of one tick, and of one full point
    self.tick_value = self.min_tick*self.multiplier
    self.point_value = self.tick_value/self.min_tick
    self.algo.Log(
        f"get_contract_specs() for {self.symbol}, min tick="
        f"{self.min_tick}, multiplier={self.multiplier}, description="
        f"{self.description}, tick value={self.tick_value}, point value="
        f"{self.point_value}"
    )
#-------------------------------------------------------------------------------
def OnDataConsolidated(self, sender, bar):
    """
    Event handler for desired custom bars.

    Feeds the new bar to both ATR indicators. Once both are ready, the
    algorithm time has reached START_DT and at least one earlier bar is
    stored, it runs the numbered signal steps below (market position,
    fractal index, setup, entry, re-entry, end, trailing, exit, mixed
    signals) and places or closes positions accordingly. The bar is
    always appended to the rolling window before returning, except when
    the bar is skipped outright (wrong contract or zero volume).

    sender is the object raising the event; it is not used.
    bar is the newly consolidated bar.
    """
    # Skip if 'QC' data and the bar is not for our current contract
    if DATA_SOURCE == 'QC' and (bar.Symbol != self.contract):
        return
    # Ignore bar's with no volume -> can be invalid around holidays
    if bar.Volume == 0:
        return
    # Update the 2 ATRs
    self.atr.Update(bar)
    self.atr_r.Update(bar)
    # Do not continue if an atr is not ready or not to backtest start date
    if not self.atr.IsReady or not self.atr_r.IsReady \
            or self.algo.Time < START_DT:
        # Add bar to rolling window and return
        self.bars.Add(bar)
        return
    # Snapshot of the stored bars, taken before the current bar is added
    # NOTE: rolling window to list has bars in newest to oldest order
    historical_bars = list(self.bars)
    # Check for a previous bar to compare to
    if historical_bars:
        # Get last bar for reference
        # Most recent bar is at beginning of the rolling window, so use [0]
        last_bar = self.bars[0]
        # Get previous values before updating them
        # These are all of the TradeStation variables with [1] reference
        previous_mp = self.mp
        previous_reversal = self.reversal
        previous_p1_up_ready = self.p1_up_ready
        previous_p1_dn_ready = self.p1_dn_ready
        previous_up_ap = self.up_ap
        previous_dn_am = self.dn_am
        previous_up_swing = self.up_swing
        previous_dn_swing = self.dn_swing
        previous_up_cm = self.up_cm
        previous_up_bm = self.up_bm
        previous_dn_cp = self.dn_cp
        # Down-side band (was read from self.up_bp, a copy/paste slip that
        # made the down swing "End" check use the long-side band)
        previous_dn_bp = self.dn_bp
        previous_p1_up_entry = self.p1_up_entry
        previous_p1_dn_entry = self.p1_dn_entry
        previous_p1_up_f = self.p1_up_f
        previous_p1_dn_f = self.p1_dn_f
        previous_p1_up_e = self.p1_up_e
        previous_p1_dn_e = self.p1_dn_e
        previous_p1_up_noise_a = self.p1_up_noise_a
        previous_p1_up_noise_b = self.p1_up_noise_b
        previous_p1_up_noise_c = self.p1_up_noise_c
        previous_p1_dn_noise_a = self.p1_dn_noise_a
        previous_p1_dn_noise_b = self.p1_dn_noise_b
        previous_p1_dn_noise_c = self.p1_dn_noise_c

        # 1. Market Position
        # Exit
        # up_s_exit, up_f_exit, dn_s_exit, and dn_f_exit have not changed
        # yet, so not saving the "previous values"
        if previous_mp == 1 and last_bar.Low < self.up_f_exit:
            self.mp = 0
            if PRINT_SIGNALS:
                self.algo.Log(f"{bar.Time} Previous MP=1, Previous Low < UpFExit, so "
                              "now MP=0")
        if previous_mp == -1 and last_bar.High > self.dn_f_exit:
            self.mp = 0
            if PRINT_SIGNALS:
                self.algo.Log(f"{bar.Time} Previous MP=-1, Previous High > DnFExit, so "
                              "now MP=0")
        ##EL CODE NOT CONVERTED
        # if (RRRatio<>0) then begin
        #     if (MP[1]=1 and High[1]>=UpTExit[1]) then MP=0;
        #     if (MP[1]=-1 and Low[1]<=DnTExit[1]) then MP=0;
        # end;
        # Entry
        # up_signal and dn_signal have not changed yet, so not saving the
        # "previous values"
        if previous_mp < 1 and self.up_signal > 0:
            self.mp = 1
            if PRINT_SIGNALS:
                self.algo.Log(f"{bar.Time} Previous MP<1, Previous UpSignal, so now "
                              "MP=1")
        if previous_mp > -1 and self.dn_signal < 0:
            self.mp = -1
            if PRINT_SIGNALS:
                self.algo.Log(f"{bar.Time} Previous MP>-1, Previous DnSignal, so now "
                              "MP=-1")
        # Stop
        # up_s_exit and dn_s_exit have not changed yet, so not saving the
        # "previous values"
        if self.mp == 1 and bar.Low <= self.up_s_exit:
            self.mp = 0
            if PRINT_SIGNALS:
                self.algo.Log(f"{bar.Time} current MP=1, previous Low < UpSExit, so now "
                              "MP=0")
        if self.mp == -1 and bar.High >= self.dn_s_exit:
            self.mp = 0
            if PRINT_SIGNALS:
                self.algo.Log(f"{bar.Time} current MP=-1, previous High < DnSExit, so now "
                              "MP=0")

        # 2. Fractal Index
        # Use the ATR rounded to 2 decimals unless that rounds to zero
        if round(self.atr.Current.Value, 2) != 0:
            atr = round(self.atr.Current.Value, 2)
        else:
            atr = self.atr.Current.Value
        self.up_fix = 0
        self.dn_fix = 0
        # Up FX: check for a new higher high
        if bar.High > last_bar.High:
            self.up_ct = 1
            # Count consecutive stored bars (newest first) whose high the
            # current bar reaches, capped at the lookback
            for historical_bar in historical_bars:
                if (bar.High >= historical_bar.High) \
                        and (self.up_ct < self.lookback):
                    # Increment up count
                    self.up_ct += 1
                else:
                    break
            # Up support: lowest low over the up_ct most recent stored bars
            # NOTE(review): self.bars does not contain the current bar yet,
            # so this scan excludes the current bar's low - confirm that
            # matches the TradeStation reference
            self.up_sup = 99999999
            for counter in range(0, self.up_ct, 1):
                if self.bars[counter].Low < self.up_sup:
                    self.up_sup = self.bars[counter].Low
            # Up resistance is the current high
            self.up_res = bar.High
            # Update up fix
            self.up_fix = (self.up_res-self.up_sup)/atr
        # Down FX: check for a new lower low
        if bar.Low < last_bar.Low:
            self.dn_ct = 1
            # Count consecutive stored bars (newest first) whose low the
            # current bar reaches, capped at the lookback
            for historical_bar in historical_bars:
                if (bar.Low <= historical_bar.Low) \
                        and (self.dn_ct < self.lookback):
                    # Increment down count
                    self.dn_ct += 1
                else:
                    break
            # Down resistance: highest high over the dn_ct most recent
            # stored bars (same NOTE(review) as the up side applies)
            self.dn_res = -99999999
            for counter in range(0, self.dn_ct, 1):
                if self.bars[counter].High > self.dn_res:
                    self.dn_res = self.bars[counter].High
            # Down support is the current low
            self.dn_sup = bar.Low
            # Update dn fix
            self.dn_fix = (self.dn_res-self.dn_sup)/atr

        # 3. Setup
        # NOTE(review): the nesting in this step was reconstructed from a
        # copy of the file that had lost its indentation - confirm against
        # the TradeStation reference
        # Up P1
        if self.dn_ct >= self.lookback:
            self.p1_up_ready = False
        else:
            if self.dn_fix >= self.algo.min_p1:
                if self.up_f <= self.dn_res:
                    self.p1_up_ready = True
                    self.p1_up_f = self.dn_res
                    self.p1_up_e = (self.dn_res+self.dn_sup)*0.50
                    dist = self.dn_res-self.dn_sup
                    self.p1_up_noise_a = dist * self.algo.level_a
                    self.p1_up_noise_b = dist * self.algo.level_b
                    self.p1_up_noise_c = dist * self.algo.level_c
                    self.p1_up_entry = self.p1_up_e \
                        + self.p1_up_noise_a*0.50
        # Down P1
        if self.up_ct >= self.lookback:
            self.p1_dn_ready = False
        else:
            if self.up_fix >= self.algo.min_p1:
                if self.dn_f >= self.up_sup:
                    self.p1_dn_ready = True
                    self.p1_dn_f = self.up_sup
                    self.p1_dn_e = (self.up_res+self.up_sup)*0.50
                    dist = self.up_res-self.up_sup
                    self.p1_dn_noise_a = dist * self.algo.level_a
                    self.p1_dn_noise_b = dist * self.algo.level_b
                    self.p1_dn_noise_c = dist * self.algo.level_c
                    self.p1_dn_entry = self.p1_dn_e \
                        - self.p1_dn_noise_a*0.50
        # Up P2
        if self.up_fix >= self.algo.min_p2:
            if previous_reversal == 0:
                self.reversal = 1
            if previous_reversal == -1:
                self.reversal = 1
                if self.mp < 1:
                    self.p2_up_ready = True
                    self.p2_up_e = bar.High - self.algo.min_p2*atr*0.50
                    self.p2_up_noise_a = \
                        self.algo.min_p2*atr*self.algo.level_a
                    self.p2_up_noise_b = \
                        self.algo.min_p2*atr*self.algo.level_b
                    self.p2_up_noise_c = \
                        self.algo.min_p2*atr*self.algo.level_c
        # Down P2
        if self.dn_fix >= self.algo.min_p2:
            if previous_reversal == 0:
                self.reversal = -1
            if previous_reversal == 1:
                self.reversal = -1
                if self.mp > -1:
                    self.p2_dn_ready = True
                    self.p2_dn_e = bar.Low + self.algo.min_p2*atr*0.50
                    self.p2_dn_noise_a = \
                        self.algo.min_p2*atr*self.algo.level_a
                    self.p2_dn_noise_b = \
                        self.algo.min_p2*atr*self.algo.level_b
                    self.p2_dn_noise_c = \
                        self.algo.min_p2*atr*self.algo.level_c

        # 4. Entry
        self.up_signal = 0
        self.dn_signal = 0
        # Up P1
        if previous_p1_up_ready and bar.High >= previous_p1_up_entry:
            self.up_swing = 1
            self.up_signal = 1
            self.up_f = previous_p1_up_f
            self.up_e = previous_p1_up_e
            self.up_noise_a = previous_p1_up_noise_a
            self.up_noise_b = previous_p1_up_noise_b
            self.up_noise_c = previous_p1_up_noise_c
            self.up_entry = previous_p1_up_entry
            self.up_ap = self.up_e + self.up_noise_a*0.50
            self.up_bp = self.up_e + self.up_noise_b*0.50
            self.up_cp = self.up_e + self.up_noise_c*0.50
            self.up_am = self.up_e - self.up_noise_a*0.50
            self.up_bm = self.up_e - self.up_noise_b*0.50
            self.up_cm = self.up_e - self.up_noise_c*0.50
            self.p1_up_ready = False
            self.p2_up_ready = False
        # Down P1
        if previous_p1_dn_ready and bar.Low <= previous_p1_dn_entry:
            self.dn_swing = -1
            self.dn_signal = -1
            self.dn_f = previous_p1_dn_f
            self.dn_e = previous_p1_dn_e
            self.dn_noise_a = previous_p1_dn_noise_a
            self.dn_noise_b = previous_p1_dn_noise_b
            self.dn_noise_c = previous_p1_dn_noise_c
            self.dn_entry = previous_p1_dn_entry
            self.dn_ap = self.dn_e + self.dn_noise_a*0.50
            self.dn_bp = self.dn_e + self.dn_noise_b*0.50
            self.dn_cp = self.dn_e + self.dn_noise_c*0.50
            self.dn_am = self.dn_e - self.dn_noise_a*0.50
            self.dn_bm = self.dn_e - self.dn_noise_b*0.50
            self.dn_cm = self.dn_e - self.dn_noise_c*0.50
            self.p1_dn_ready = False
            self.p2_dn_ready = False
        # Up P2
        if self.p2_up_ready:
            self.up_swing = 2
            self.up_signal = 2
            self.up_f = -99999
            self.up_e = self.p2_up_e
            self.up_noise_a = self.p2_up_noise_a
            self.up_noise_b = self.p2_up_noise_b
            self.up_noise_c = self.p2_up_noise_c
            self.up_entry = bar.High
            self.up_ap = self.up_e + self.up_noise_a*0.50
            self.up_bp = self.up_e + self.up_noise_b*0.50
            self.up_cp = self.up_e + self.up_noise_c*0.50
            self.up_am = self.up_e - self.up_noise_a*0.50
            self.up_bm = self.up_e - self.up_noise_b*0.50
            self.up_cm = self.up_e - self.up_noise_c*0.50
            self.p1_up_ready = False
            self.p2_up_ready = False
        # Down P2
        if self.p2_dn_ready:
            self.dn_swing = -2
            self.dn_signal = -2
            self.dn_f = 99999
            self.dn_e = self.p2_dn_e
            self.dn_noise_a = self.p2_dn_noise_a
            self.dn_noise_b = self.p2_dn_noise_b
            self.dn_noise_c = self.p2_dn_noise_c
            self.dn_entry = bar.Low
            self.dn_ap = self.dn_e + self.dn_noise_a*0.50
            self.dn_bp = self.dn_e + self.dn_noise_b*0.50
            self.dn_cp = self.dn_e + self.dn_noise_c*0.50
            self.dn_am = self.dn_e - self.dn_noise_a*0.50
            self.dn_bm = self.dn_e - self.dn_noise_b*0.50
            self.dn_cm = self.dn_e - self.dn_noise_c*0.50
            self.p1_dn_ready = False
            self.p2_dn_ready = False

        # 5. Re-entry
        # Up
        if self.up_signal == 0 and self.mp < 1 and previous_mp < 1:
            if self.up_swing > 0 and bar.High >= previous_up_ap:
                self.up_signal = 3
        # Down
        if self.dn_signal == 0 and self.mp > -1 and previous_mp > -1:
            if self.dn_swing < 0 and bar.Low <= previous_dn_am:
                self.dn_signal = -3

        # 6. End
        # Up
        if self.up_signal == 0 and previous_up_swing > 0:
            if (self.dn_fix >= min(self.algo.min_p1, self.algo.min_p2)) \
                    and (bar.Low < min(previous_up_bm, previous_up_cm)):
                self.up_swing = 0
                self.up_f = -99999
        # Down
        if self.dn_signal == 0 and previous_dn_swing < 0:
            if (self.up_fix >= min(self.algo.min_p1, self.algo.min_p2)) \
                    and (bar.High > max(previous_dn_bp, previous_dn_cp)):
                self.dn_swing = 0
                self.dn_f = 99999

        # 7. Trailing
        # Up
        if self.up_swing > 0:
            self.up_ap = max(self.up_ap, bar.High)
            self.up_bp = max(self.up_bp, bar.High)
            self.up_cp = max(self.up_cp, bar.High)
            self.up_am = self.up_ap-self.up_noise_a
            self.up_bm = self.up_bp-self.up_noise_b
            self.up_cm = self.up_cp-self.up_noise_c
        # Down
        if self.dn_swing < 0:
            self.dn_am = min(self.dn_am, bar.Low)
            self.dn_bm = min(self.dn_bm, bar.Low)
            self.dn_cm = min(self.dn_cm, bar.Low)
            self.dn_ap = self.dn_am+self.dn_noise_a
            self.dn_bp = self.dn_bm+self.dn_noise_b
            self.dn_cp = self.dn_cm+self.dn_noise_c

        # 8. Exit
        # Up
        if self.up_swing > 0:
            if self.up_entry >= self.up_bm:
                self.up_f_exit = min(self.up_entry, self.up_bm)
            else:
                self.up_f_exit = max(self.up_entry, self.up_cm)
        # Down
        if self.dn_swing < 0:
            if self.dn_entry <= self.dn_bp:
                self.dn_f_exit = max(self.dn_entry, self.dn_bp)
            else:
                self.dn_f_exit = min(self.dn_entry, self.dn_cp)

        # 9. Stop Exit
        # Instead of delaying the stop exit like TradeStation logic does,
        # the stop loss order is placed immediately after an entry order is
        # filled

        # 10. Mixed signals
        if self.up_signal > 0 and self.dn_signal < 0:
            self.up_signal = 0
            self.dn_signal = 0

        # 11. Entry/Re-entry/Exit
        current_shares = self.current_qty
        entry = False
        if self.mp < 1 and self.up_signal >= 1:
            # Get the signal
            if self.up_signal == 1:
                signal = "L1"
            elif self.up_signal == 2:
                signal = "L2"
            elif self.up_signal == 3:
                signal = "L3"
            # Get desired position size
            self.GetPositionSize()
            # Get order qty
            desired_shares = self.contracts
            order_qty = desired_shares-current_shares
            # Log message when desired
            if PRINT_SIGNALS:
                self.algo.Log(
                    f"{self.symbol} {signal} Entry Signal: "
                    f"Have {current_shares} cxs, want {desired_shares}, "
                    f"so order cxs = {order_qty}"
                )
            # Cancel previous stop order (if one)
            self.CancelStopOrder()
            # Place market buy order if order qty not 0
            if order_qty != 0:
                if DATA_SOURCE == 'QC':
                    self.algo.MarketOrder(self.contract, order_qty)
                else:
                    self.algo.MarketOrder(self.security, order_qty)
                entry = True
        elif self.mp > -1 and self.dn_signal <= -1:
            # Get the signal
            if self.dn_signal == -1:
                signal = "S1"
            elif self.dn_signal == -2:
                signal = "S2"
            elif self.dn_signal == -3:
                signal = "S3"
            # Get desired position size
            self.GetPositionSize()
            # Get order qty
            desired_shares = -self.contracts
            order_qty = desired_shares-current_shares
            # Log message when desired
            if PRINT_SIGNALS:
                self.algo.Log(
                    f"{self.symbol} {signal} Entry Signal: "
                    f"Have {current_shares} shares, want {desired_shares}, "
                    f"so order shares = {order_qty}"
                )
            # Cancel previous stop order (if one)
            self.CancelStopOrder()
            # Place market sell order if order qty not 0
            if order_qty != 0:
                if DATA_SOURCE == 'QC':
                    self.algo.MarketOrder(self.contract, order_qty)
                else:
                    self.algo.MarketOrder(self.security, order_qty)
                entry = True
        # Exit
        if not entry:
            # Check for active long position
            if self.mp == 1 and current_shares > 0:
                # NOTE: stop loss order is placed immediately after an entry
                # order is filled
                # Check for exit signal
                if bar.Low < self.up_f_exit:
                    # Log message when desired
                    if PRINT_SIGNALS:
                        self.algo.Log(
                            f"{self.symbol} LXF Exit Signal: "
                            f"Sell {current_shares} shares"
                        )
                    self.ExitPosition(current_shares)
            # Check for active short position
            elif self.mp == -1 and current_shares < 0:
                # NOTE: stop loss order is placed immediately after an entry
                # order is filled
                # Check for exit signal
                if bar.High > self.dn_f_exit:
                    # Log message when desired
                    if PRINT_SIGNALS:
                        self.algo.Log(
                            f"{self.symbol} SXF Exit Signal: "
                            f"Buy to cover {current_shares} shares"
                        )
                    self.ExitPosition(current_shares)
    # Add bar to rolling window
    self.bars.Add(bar)
#-------------------------------------------------------------------------------
def ExitPosition(self, current_shares):
    """
    Immediately exit an active position.

    Cancels any open stop order, then sends a market order for the
    opposite of current_shares. The order goes to the current contract
    for 'QC' data and to self.security otherwise. self.mp is left
    untouched here.
    """
    self.CancelStopOrder()
    # Pick the tradable symbol for the active data source
    target = self.contract if DATA_SOURCE == 'QC' else self.security
    # Flatten with a market order of the opposite size
    self.algo.MarketOrder(target, -current_shares)
#-------------------------------------------------------------------------------
def CancelStopOrder(self):
    """
    Cancel the open stop order.

    Does nothing when no stop order is being tracked. Otherwise the
    order is cancelled and the stored reference is reset to None.
    """
    if not self.stop_order:
        return
    # Log message when desired
    if PRINT_ORDERS:
        self.algo.Log(f"Cancelling {self.symbol} stop order.")
    self.stop_order.Cancel()
    # Forget the ticket so it is not cancelled twice
    self.stop_order = None
#-------------------------------------------------------------------------------
def PlaceStopOrder(self, current_shares):
    """
    Place the stop loss order.

    The stop sits self.algo.stop_ix ATRs (taken from self.atr_r) away
    from self.cost_basis, rounded to 2 decimals: below it for a long
    position, above it for a short one. The price is also stored on
    up_s_exit / dn_s_exit. Returns without placing anything when
    current_shares is 0.
    """
    # Current value of the sizing ATR
    atr = self.atr_r.Current.Value
    if current_shares > 0:
        # Long: stop below the cost basis
        position = 'long'
        self.up_s_exit = round(self.cost_basis-self.algo.stop_ix*atr,2)
        stop = self.up_s_exit
    elif current_shares < 0:
        # Short: stop above the cost basis
        position = 'short'
        self.dn_s_exit = round(self.cost_basis+self.algo.stop_ix*atr,2)
        stop = self.dn_s_exit
    else:
        # Don't continue if no position
        return
    # Pick the tradable symbol for the active data source
    symbol = self.contract if DATA_SOURCE == 'QC' else self.security
    # Stop market order that closes the whole position at the stop price
    self.stop_order = self.algo.StopMarketOrder(
        symbol, -current_shares, stop
    )
    # Log message when desired
    if PRINT_ORDERS:
        self.algo.Log(
            f"{self.symbol} {position} stop order for {-current_shares}"
            f" shares placed at {stop}"
        )
#-------------------------------------------------------------------------------
def GetPositionSize(self):
    """
    Get the desired position size.

    Stores in self.contracts the whole number of contracts whose stop
    risk (self.algo.stop_ix_r * ATR * self.multiplier) fits in a fixed
    budget of 1500. The ATR from self.atr_r is rounded to 2 decimals
    unless that rounds to zero, in which case the unrounded value is
    used. When the risk per contract is exactly zero (ATR, stop multiple
    or multiplier is 0) self.contracts is set to 0 instead of raising
    ZeroDivisionError.
    """
    # atr_r = self.atr_r.Current.Value
    # # risk = self.algo.risk_pct*self.algo.Portfolio.TotalPortfolioValue
    # risk = self.algo.stop_dollar
    # n_rough = risk/(self.algo.stop_ix_r*atr_r*self.multiplier)
    # nr = int(n_rough)
    # self.contracts = round(nr, 0)
    raw_atr_r = self.atr_r.Current.Value
    atr_r = round(raw_atr_r, 2)
    if atr_r == 0:
        # Rounding wiped out a tiny ATR; fall back to the raw value
        atr_r = raw_atr_r
    sizing = 1500
    risk_per_contract = self.algo.stop_ix_r*atr_r*self.multiplier
    if risk_per_contract == 0:
        # No measurable risk per contract -> cannot size, trade nothing
        self.contracts = 0
        return
    # Round to 2 decimals first so float noise just below a whole number
    # does not truncate one contract too low
    self.contracts = math.trunc(round(sizing/risk_per_contract, 2))
#-------------------------------------------------------------------------------
def DailyFutureCalendar(self, dt):
    """
    Set up daily consolidator calendar info for the US future market.

    This should return a start datetime object that is timezone unaware
    with a valid date/time for the desired securities' exchange's time zone.

    The very first call returns a one day period starting at dt. Later
    calls return the period that opens at self.mkt_open on dt's date and
    lasts self.period_minutes minutes, or, when that open is still ahead
    of dt, a filler period running from dt up to the open.

    Useful Refs:
    datetime.replace() method:
    https://docs.python.org/3/library/datetime.html#datetime.datetime.replace
    """
    # Need to handle case where algo initializes and this function is called
    # for the first time: the values do not matter, so hand back one day
    # starting at dt
    if not self.future_calendar_initialized:
        self.future_calendar_initialized = True
        return CalendarInfo(dt, DT.timedelta(1))
    # Market open on dt's date, in the **EXCHANGE** timezone
    session_open = dt.replace(
        hour=self.mkt_open.hour,
        minute=self.mkt_open.minute,
        second=0,
        microsecond=0,
    )
    # Typical length of the daily period
    session_length = DT.timedelta(minutes=self.period_minutes)
    # QC throws an error when the returned start is after the passed dt,
    # so return a period from dt up to the open instead; this results in
    # the next dt being the desired start time
    if session_open > dt:
        return CalendarInfo(dt, session_open-dt)
    # Return the start datetime and the consolidation period
    return CalendarInfo(session_open, session_length)
#-------------------------------------------------------------------------------
def CustomCalendar(self, dt):
    """
    Set up custom consolidator calendar info.

    This should return a start datetime object that is timezone unaware
    with a valid date/time for the desired securities' exchange's time zone.

    The very first call returns a one day period starting at dt. Later
    calls pick the first (start, end) pair in BAR_TIMES whose end time is
    after dt's time of day, falling back to the first pair when none
    qualifies. Bar boundaries are snapped to whole minutes (seconds and
    microseconds zeroed), matching DailyFutureCalendar, so a dt carrying
    seconds cannot shift the bar off its configured times.

    Useful Refs:
    datetime.replace() method:
    https://docs.python.org/3/library/datetime.html#datetime.datetime.replace
    """
    # Need to handle case where algo initializes and this function is called
    # for the first time.
    if not self.future_calendar_initialized:
        # Since this doesn't matter, we'll pass dt as start and one day
        # as the period
        self.future_calendar_initialized = True
        return CalendarInfo(dt, DT.timedelta(1))

    def on_dt_date(clock):
        # dt's date at the given time of day, on an exact minute boundary
        return dt.replace(
            hour=clock.hour, minute=clock.minute, second=0, microsecond=0
        )

    start = None
    end = None
    # Loop through all BAR_TIMES
    for tup in BAR_TIMES:
        # Check for dt time before the end time
        if dt.time() < tup[1]:
            start = on_dt_date(tup[0])
            end = on_dt_date(tup[1])
            break
    # dt is past every bar's end time: use the first bar's times
    if start is None or end is None:
        start = on_dt_date(BAR_TIMES[0][0])
        end = on_dt_date(BAR_TIMES[0][1])
    # Make sure that the end is after the start
    # If not, add a day to the end
    if end < start:
        end = end + DT.timedelta(1)
    # Catch when start is after the passed dt
    # QC now throws an error in this case
    if start > dt:
        # To handle the QC error, pass period for no data
        # Set the end to be the next desired start
        end = start
        # And set start to dt to avoid QC throwing error
        start = dt
        # This will result in the next dt being the desired start time
    # return start datetime and period
    return CalendarInfo(start, end-start)
#-------------------------------------------------------------------------------
@property
def current_qty(self):
    """
    Return the current contract quantity held in the portfolio.

    Returns 0 while no current contract is set. Otherwise returns the
    portfolio quantity, as an int, of the current contract for 'QC' data
    or of self.security for any other data source.
    """
    # No contract selected yet -> nothing is held
    if not self.contract:
        return 0
    # Pick the holding that matches the active data source
    held = self.contract if DATA_SOURCE == 'QC' else self.security
    return int(self.algo.Portfolio[held].Quantity)