| Overall Statistics |
|
Total Trades 91 Average Win 0.02% Average Loss -0.02% Compounding Annual Return -0.184% Drawdown 0.300% Expectancy -0.235 Net Profit -0.195% Sharpe Ratio -0.894 Probabilistic Sharpe Ratio 3.020% Loss Rate 60% Win Rate 40% Profit-Loss Ratio 0.93 Alpha -0.002 Beta -0 Annual Standard Deviation 0.002 Annual Variance 0 Information Ratio -1.873 Tracking Error 0.147 Treynor Ratio 5.788 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset BAC.MyCustomData 2S |
# Standard library imports
from AlgorithmImports import *
import datetime as DT
from dateutil.parser import parse
# Import from files
from notes_and_inputs import *
################################################################################
class MyCustomData(PythonData):
    """
    Custom price data read from a remote CSV file, one link per ticker.

    Each usable CSV row becomes one data point whose Time is built from the
    first two columns and whose Value is the open price.

    REFs:
    https://www.quantconnect.com/forum/discussion/4079/python-best-practise-for-using-consolidator-on-custom-data/p1
    """

    def GetSource(self, config, date, isLiveMode):
        """Return the remote CSV source registered for config.Symbol."""
        # CSV_FILES keys must be all caps because QC converts tickers to all
        # caps. .get() with a '' fallback is required because this is also
        # called on initialization without a valid symbol.
        ticker = config.Symbol.Value
        return SubscriptionDataSource(
            CSV_FILES.get(ticker, ''), SubscriptionTransportMedium.RemoteFile)

    def Reader(self, config, line, date, isLiveMode):
        """
        Convert one CSV line into a MyCustomData point.

        The line is split on commas and read by position:
        date, time, open, high, low, close, volume
        e.g. 2/1/2018,10:30:00,13.59,13.78,13.41,13.67,19817603

        Returns None for blank lines and for lines whose first character is
        not a digit (such as a header row).
        """
        if not line.strip() or not line[0].isdigit():
            return None
        fields = line.split(',')
        point = MyCustomData()
        point.Symbol = config.Symbol
        # Date and time live in separate columns; merge them into a single
        # DT.datetime object
        point.Time = DT.datetime.combine(
            parse(fields[0]), parse(fields[1]).time())
        # The open price is the value used for filling positions
        open_price = float(fields[2])
        point.Value = open_price
        point["Open"] = open_price
        # Remaining price/volume columns follow the open, in this order
        for column, field_name in enumerate(
                ("High", "Low", "Close", "Volume"), start=3):
            point[field_name] = float(fields[column])
        return point


# Standard library imports
from AlgorithmImports import *
import datetime as DT
# Import from files
from notes_and_inputs import *
from symbol_data import *
from custom_data import *
################################################################################
class EquitiesStrategyAlgorithm(QCAlgorithm):
    """
    Algorithm entry point.

    Configures the backtest, copies the user inputs onto the algorithm,
    subscribes every ticker in EQUITIES (QC or custom data) and routes order
    fills to the matching SymbolData object.
    """

    def Initialize(self):
        """Initialize algorithm."""
        # Set backtest details
        self.set_backtest_details()
        # Initialize algo parameters
        self.initialize_parameters()
        # Add all desired instruments to the algo
        self.add_instruments()
        # Warm up is currently disabled (was: self.SetWarmUp(WARMUP_DAYS))

#-------------------------------------------------------------------------------
    def set_backtest_details(self):
        """Set the backtest dates, cash, time zone and security models."""
        # Set the start and end date (if applicable)
        self.SetStartDate(
            BACKTEST_START_DT.year,
            BACKTEST_START_DT.month,
            BACKTEST_START_DT.day)
        if END_DATE:
            self.SetEndDate(END_DT.year, END_DT.month, END_DT.day)
        # Set the starting cash amount
        self.SetCash(CASH)
        # Set the timezone for algo logs
        self.SetTimeZone(TIMEZONE)
        # Setup trading framework
        # Transaction and submit/execution rules will use IB models
        # Cannot use the built-in models with the custom data
        if DATA_SOURCE == 'QC':
            self.SetBrokerageModel(
                BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        # Configure all algorithm securities
        # NOTE(review): the original listing lost its indentation, so it is
        # not certain whether this call was also meant to run only for 'QC'
        # data - confirm against the deployed project.
        self.SetSecurityInitializer(self.custom_security_initializer)

#-------------------------------------------------------------------------------
    def initialize_parameters(self):
        """Read all algo input parameters and set up all others required."""
        # Indicator inputs
        self.min_p1 = MIN_P1
        self.min_p2 = MIN_P2
        self.level_a = LEVEL_A
        self.level_b = LEVEL_B
        self.level_c = LEVEL_C
        self.stop_ix = STOP_IX
        self.length = LENGTH
        # Position sizing inputs
        self.stop_ix_r = STOP_IX_R
        self.stop_dollar = STOP_DOLLAR
        self.atr_length_r = ATR_LENGTH_R
        self.risk_pct = RISK_PCT
        self.lookback = LOOKBACK

#-------------------------------------------------------------------------------
    def add_instruments(self):
        """Add desired instrument data to the algo."""
        # Dictionary of SymbolData objects keyed by the ticker in EQUITIES
        self.symbol_data = {}
        for equity in EQUITIES:
            if DATA_SOURCE == 'QC':
                # Add price data to the algo and save QC symbol object
                equity_symbol = self.AddEquity(equity, Resolution.Minute).Symbol
            else:
                # Otherwise 'CUSTOM' data is used
                equity_symbol = self.AddData(MyCustomData, equity).Symbol
            # Create symbol data object for the equity
            self.symbol_data[equity] = SymbolData(self, equity, equity_symbol)

#-------------------------------------------------------------------------------
    def custom_security_initializer(self, security):
        """
        Define models to be used for securities as they are added to the
        algorithm's universe.
        """
        # Define the data normalization mode
        security.SetDataNormalizationMode(DataNormalizationMode.Adjusted)

#-------------------------------------------------------------------------------
    def _symbol_data_for(self, security_symbol):
        """Return the SymbolData tracking security_symbol, or None."""
        for symbol_data in self.symbol_data.values():
            if symbol_data.security == security_symbol:
                return symbol_data
        return None

#-------------------------------------------------------------------------------
    def OnOrderEvent(self, orderEvent):
        """
        Built-in event handler for orders.

        Only completely filled orders are handled. A fill is classified as
        a stop fill (its id matches the tracked stop order), an exit fill
        (position is now flat) or an entry fill (position is non-zero). After
        an entry fill the fill price is saved as the cost basis and a new stop
        order is placed immediately.
        """
        # Skip if not filled
        if orderEvent.Status != OrderStatus.Filled:
            return
        # Get the order details
        order = self.Transactions.GetOrderById(orderEvent.OrderId)
        order_qty = order.Quantity
        # Match on the Symbol object rather than parsing str(order.Symbol):
        # splitting the string on "." breaks for tickers that contain a dot
        # and depends on how custom data symbols are rendered as text.
        symbol_data = self._symbol_data_for(order.Symbol)
        if symbol_data is None:
            self.Log(f"Ignoring fill for untracked symbol {order.Symbol}")
            return
        # Ticker as listed in EQUITIES, used for log messages
        symbol = symbol_data.symbol
        # Get current qty
        qty = self.Portfolio[order.Symbol].Quantity
        # Get the current order's average fill price
        avg_fill = round(orderEvent.FillPrice, 4)
        # Get active stop order id (0 when no stop order is tracked)
        if symbol_data.stop_order:
            stop_order_id = symbol_data.stop_order.OrderId
        else:
            stop_order_id = 0
        # Check for filled stop order
        if orderEvent.OrderId == stop_order_id:
            # Filled stop loss order - set to None
            symbol_data.stop_order = None
            if PRINT_ORDERS:
                self.Log(f"{symbol} filled stop order for {order_qty} shares @ "
                         f"{avg_fill}")
            return
        # Check for a market exit order
        if qty == 0:
            if PRINT_ORDERS:
                self.Log(f"{symbol} filled exit order for {order_qty} shares @ "
                         f"{avg_fill}")
            return
        # Otherwise qty not 0, so entry order
        # Save the fill price as the cost basis for the position
        symbol_data.cost_basis = avg_fill
        if PRINT_ORDERS:
            if qty > 0:
                self.Log(f"{symbol} filled long entry order for "
                         f"{order_qty} shares @ {avg_fill}, new qty = {qty}")
            elif qty < 0:
                self.Log(f"{symbol} filled short entry order for "
                         f"{order_qty} shares @ {avg_fill}, new qty = {qty}")
        # Immediately place new stop market order
        symbol_data.PlaceStopOrder(qty)


"""
Matt Custom Strategy
Version 1.0.3
Platform: QuantConnect
By: Aaron Eller
For: Matt Blonc
www.excelintrading.com
aaron@excelintrading.com
Revision Notes:
1.0.0 (09/08/2021) - Initial
1.0.1 (09/10/2021) - Added 'CUSTOM_TIMES' BAR option.
1.0.2 (09/17/2021) - Added DATA_SOURCE input and custom data logic.
                   - Modified symbol_data.set_indicators() to handle custom
                     data consolidators.
                   - Modified set_backtest_details() and OnOrderEvent() to
                     handle custom data symbols.
1.0.3 (09/22/2021) - Updated PlaceStopOrder() to use atr_r.
                   - Updated up/down count and up/down support/resistance
                     logic.
References:
-Creating a Custom Indicator
https://www.quantconnect.com/forum/discussion/3383/custom-indicator-in-python-algorithm/p1
"""
################################################################################
import datetime as DT
# USER INPUTS
# Backtest details
START_DATE = "09-01-2020"  # MM-DD-YYYY format
# MM-DD-YYYY format, e.g. "12-31-2021" (or None to run to the current date)
END_DATE = None
CASH = 400000  # starting portfolio cash amount
TIMEZONE = "US/Eastern"  # e.g. "US/Eastern", "US/Central", "US/Pacific"
# Set the equities to trade
# Be sure to use ALL CAPS
EQUITIES = ['BAC']  # , 'T', 'AA']
#-------------------------------------------------------------------------------
# BAR TO TRACK FOR EACH EQUITY
# Tell algorithm to use QC data or custom data
DATA_SOURCE = 'CUSTOM'  # must be 'QC' or 'CUSTOM'
#-------------------------------------------------------------------------------
# The following inputs are used for 'QC' data:
# Use the following format: '5 min', '1 hr', '1 day'
# Or to use CUSTOM_TIMES below, set BAR = 'CUSTOM_TIMES'
BAR = 'CUSTOM_TIMES'
# Define the custom intraday minutely bars to track
# List specific bar start/stop times
# Only used when BAR above is set to be 'CUSTOM_TIMES'
CUSTOM_TIMES = [
    '0930-1030',
    '1030-1130',
    '1130-1230',
    '1230-1330',
    '1330-1430',
    '1430-1530',
    '1530-1600',
]
#-------------------------------------------------------------------------------
# The following inputs are used for 'CUSTOM' data
# Create a dictionary to hold all links to csv custom data
CSV_FILES = {}
# Add link for all EQUITIES listed above
# Note that the link must automatically download the data as a csv file
# If using dropbox remember to add the &dl=1 to trigger a download
CSV_FILES['BAC'] = 'https://www.dropbox.com/s/thit8qxyb0pz4qi/BAC%2060%20Minutes.csv?dl=1'
# Set period for custom data
# 'days', 'minutes', 'hours' are all possible arguments for DT.timedelta
CUSTOM_DATA_PERIOD = DT.timedelta(minutes=60)
#-------------------------------------------------------------------------------
# INDICATOR INPUTS
MIN_P1 = 3
MIN_P2 = 8
LEVEL_A = .7
LEVEL_B = .8
LEVEL_C = 1.7
STOP_IX = 6
LENGTH = 40
LOOKBACK = 34  # default setting for MaxBarsBack in TradeStation = 50
# POSITION SIZING INPUTS
STOP_IX_R = 2
ATR_LENGTH_R = 160
STOP_DOLLAR = 1100
RISK_PCT = 0.0025
#-------------------------------------------------------------------------------
# Turn on/off logs
PRINT_SIGNALS = True  # print entry/exit signals
PRINT_ORDERS = True  # print order details
################################################################################
############################ END OF ALL USER INPUTS ############################
################################################################################
################################################################################
# VALIDATE USER INPUTS - DO NOT CHANGE BELOW!!!
# Verify start date
try:
    START_DT = DT.datetime.strptime(START_DATE, '%m-%d-%Y')
except (TypeError, ValueError) as err:
    raise ValueError("Invalid START_DATE format ({}). Must be in MM-DD-YYYY "
                     "format.".format(START_DATE)) from err
# Verify end date
# END_DT stays None when no END_DATE is given, so the name is always defined
END_DT = None
if END_DATE:
    try:
        END_DT = DT.datetime.strptime(END_DATE, '%m-%d-%Y')
    except (TypeError, ValueError) as err:
        raise ValueError(
            "Invalid END_DATE format ({}). Must be in MM-DD-YYYY "
            "format or set to None to run to date.".format(END_DATE)) from err
#-------------------------------------------------------------------------------
# Verify BAR
# First check if 'CUSTOM_TIMES' is used
if BAR == 'CUSTOM_TIMES':
    # Create empty list of (start time, end time) tuples to fill
    BAR_TIMES = []
    # Custom bars are tracked in minutes. Set this before the loop so that
    # BAR_UNIT is defined even when CUSTOM_TIMES is empty.
    BAR_UNIT = 'min'
    # Loop through CUSTOM_TIMES
    for time_str in CUSTOM_TIMES:
        try:
            # Get bar start hour and minutes
            start_str = time_str.split('-')[0]
            start_hr = int(start_str[:2])
            start_min = int(start_str[-2:])
            # Get bar end hour and minutes
            end_str = time_str.split('-')[1]
            end_hr = int(end_str[:2])
            end_min = int(end_str[-2:])
            # Create a datetime.time object for start and end times
            time_start = DT.time(
                hour=start_hr, minute=start_min, second=0, microsecond=0)
            time_end = DT.time(
                hour=end_hr, minute=end_min, second=0, microsecond=0)
        except (AttributeError, IndexError, TypeError, ValueError) as err:
            raise ValueError(
                "Invalid CUSTOM_TIMES entry: {}".format(time_str)) from err
        # Add (start time, end time) tuple to BAR_TIMES list
        BAR_TIMES.append((time_start, time_end))
else:
    # Only the parsing is guarded, so the specific unit errors raised below
    # reach the user instead of being replaced by the generic format message
    try:
        # Get the bar integer and bar unit
        BAR_INT = int(BAR.split(" ")[0])
        BAR_UNIT = BAR.split(" ")[1].lower()
    except (AttributeError, IndexError, TypeError, ValueError) as err:
        raise ValueError(
            "Invalid BAR: {}. Use '5 min', '1 hr', or '1 day' format".format(
                BAR)) from err
    # Verify bar unit is valid
    if BAR_UNIT not in ['min', 'hr', 'day']:
        raise ValueError(
            "Invalid BAR ({}). Unit must be 'min', 'hr', or 'day'.".format(
                BAR))
    # Check for hourly bar unit
    elif BAR_UNIT == 'hr':
        # Convert hr bar int to be minutes
        BAR_INT *= 60
        # Change the BAR_UNIT to now be 'min'
        BAR_UNIT = 'min'
    # Only allow '1 day' bars
    elif BAR_UNIT == 'day' and BAR_INT != 1:
        raise ValueError(
            "Invalid BAR ({}). Multiple 'day' bars are not allowed.".format(
                BAR))
# Verify data source
if DATA_SOURCE == 'QC':
    pass
elif DATA_SOURCE == 'CUSTOM':
    # Make sure all equities have a data source
    for equity in EQUITIES:
        if equity not in CSV_FILES:
            raise ValueError("Using 'CUSTOM' data and no CSV_FILES link defined"
                             " for {}.".format(equity))
else:
    raise ValueError("Invalid DATA_SOURCE ({}). Must be 'QC' or "
                     "'CUSTOM'.".format(DATA_SOURCE))
#-------------------------------------------------------------------------------
# Calculate the minimum number of bars required to warm up the indicators
WARMUP_DAYS = int(1*max(ATR_LENGTH_R, LENGTH))
# Set the start date based on the desired days to warm up the algo
# approximately 252 market days per 365 calendar days
# (disabled alternative: CALENDAR_DAYS = int(WARMUP_DAYS*(365/252)))
CALENDAR_DAYS = 1
BACKTEST_START_DT = START_DT - DT.timedelta(days=CALENDAR_DAYS)


# Standard library imports
import datetime as DT
import math
import numpy as np
import pytz
import random
# Import from files
from notes_and_inputs import *
################################################################################
class SymbolData(object):
"""Class to store data for a specific security symbol."""
def __init__(self, algo, symbol, security_symbol):
    """
    Initialize SymbolData object.

    algo: the algorithm instance that owns this object
    symbol: the ticker, stored as self.symbol
    security_symbol: the security's symbol object, stored as self.security
    """
    self.algo, self.symbol, self.security = algo, symbol, security_symbol
    # Indicators are set up last because set_indicators() reads self.algo
    self.set_indicators()
#-------------------------------------------------------------------------------
def set_indicators(self):
    """
    Set up the security's indicators to be updated on the desired BAR.

    Creates the two ATR indicators, registers a consolidator whose bars are
    delivered to OnDataConsolidated, and resets every state variable used by
    the strategy logic to its starting value.
    """
    algo = self.algo
    # Both ATRs use simple (not exponential) smoothing
    # REF: https://lean-api-docs.netlify.app/MovingAverageTypeExtensions_8cs_source.html
    self.atr = AverageTrueRange(algo.length, MovingAverageType.Simple)
    self.atr_r = AverageTrueRange(algo.atr_length_r, MovingAverageType.Simple)

    if DATA_SOURCE == 'QC':
        # QC data: consolidate trade bars on the configured BAR
        if BAR_UNIT == 'min':
            if BAR == 'CUSTOM_TIMES':
                bar_period = self.CustomCalendar
            else:
                bar_period = timedelta(minutes=BAR_INT)
        elif BAR_UNIT == 'day':
            bar_period = self.DailyUSEquityCalendar
        consolidator = TradeBarConsolidator(bar_period)
        subscription_key = self.security
    else:
        # Custom data: let the algorithm resolve a consolidator for the
        # custom data period
        consolidator = algo.ResolveConsolidator(
            self.symbol, CUSTOM_DATA_PERIOD)
        subscription_key = self.symbol
    # Event handler to be called on each new consolidated bar
    consolidator.DataConsolidated += self.OnDataConsolidated
    # Link the consolidator with our security and add it to the manager
    algo.SubscriptionManager.AddConsolidator(subscription_key, consolidator)

    # Keep a rolling window of the last `lookback` bars
    self.lookback = algo.lookback
    self.bars = RollingWindow[TradeBar](self.lookback)
    # Link to the active stop loss order (None when there is none)
    self.stop_order = None
    # Actual fill price of the open position; used for the stop loss instead
    # of the assumed fill price at the bar's open
    self.cost_basis = None

    # Market position and exit levels
    self.mp = 0
    self.up_s_exit = self.up_f_exit = -99999
    self.dn_s_exit = self.dn_f_exit = 99999
    # Signals, index values and bar counts
    self.up_signal = self.dn_signal = 0
    self.up_fix = self.dn_fix = 0
    self.up_ct = self.dn_ct = 1
    # Support / resistance
    self.up_res = self.dn_res = 99999
    self.up_sup = self.dn_sup = -99999
    # P1 setup state
    self.p1_up_ready = self.p1_dn_ready = False
    self.up_f = -99999
    self.dn_f = 99999
    self.p1_up_f = 99999
    self.p1_dn_f = -99999
    self.p1_up_e = self.p1_dn_e = 0
    self.p1_up_noise_a = self.p1_dn_noise_a = 0
    self.p1_up_noise_b = self.p1_dn_noise_b = 0
    self.p1_up_noise_c = self.p1_dn_noise_c = 0
    self.p1_up_entry = 99999
    self.p1_dn_entry = -99999
    # P2 setup state
    self.reversal = 0
    self.p2_up_ready = self.p2_dn_ready = False
    self.p2_up_e = self.p2_dn_e = 0
    self.p2_up_noise_a = self.p2_dn_noise_a = 0
    self.p2_up_noise_b = self.p2_dn_noise_b = 0
    self.p2_up_noise_c = self.p2_dn_noise_c = 0
    # Active swing state
    self.up_swing = self.dn_swing = 0
    self.up_e = self.dn_e = 0
    self.up_noise_a = self.dn_noise_a = 0
    self.up_noise_b = self.dn_noise_b = 0
    self.up_noise_c = self.dn_noise_c = 0
    self.up_entry = 99999
    self.dn_entry = -99999
    # Trailing bands: the "p" levels start high, the "m" levels start low
    self.up_ap = self.dn_ap = 99999
    self.up_am = self.dn_am = -99999
    self.up_bp = self.dn_bp = 99999
    self.up_bm = self.dn_bm = -99999
    self.up_cp = self.dn_cp = 99999
    self.up_cm = self.dn_cm = -99999
    # Desired position size
    self.contracts = 0
#-------------------------------------------------------------------------------
def OnDataConsolidated(self, sender, bar):
"""
Event handler for desired custom bars.

Updates both ATRs with the new bar, recomputes the strategy state
(market position, index, setups, entries, re-entries, trailing and exit
levels) and submits entry/exit market orders.

sender: the event source (not referenced in this handler)
bar: the new consolidated bar; its Symbol, High and Low are read here

Until both ATRs are ready and the algorithm time has reached START_DT the
bar is only added to the rolling window.
"""
# NOTE(review): this listing carries no indentation, so the nesting of the
# conditionals below (in particular sections 3 and 8) cannot be confirmed
# from this text - check against the deployed file before restructuring.
# Get the symbol of the new custom bar
# NOTE(review): `symbol` is not referenced again in this handler
symbol = str(bar.Symbol)
# Get previous atr value before updating it
# NOTE(review): `previous_atr` is not referenced again in this handler
try:
previous_atr = self.atr.Current.Value
except:
# Exception happens on the very first bar fed to the algo
previous_atr = None
# Update the 2 ATRs
self.atr.Update(bar)
self.atr_r.Update(bar)
# Do not continue if an atr is not ready or not to backtest start date
if not self.atr.IsReady or not self.atr_r.IsReady \
or self.algo.Time < START_DT:
# Add bar to rolling window and return
self.bars.Add(bar)
return
# Check for a previous bar to compare to
if len(list(self.bars)) > 0:
# Get last bar for reference
# Most recent bar is at beginning of the rolling window, so use [0]
last_bar = self.bars[0]
# Get previous values before updating them
# These are all of the TradeStation variables with [1] reference
previous_mp = self.mp
previous_reversal = self.reversal
previous_p1_up_ready = self.p1_up_ready
previous_p1_dn_ready = self.p1_dn_ready
previous_up_ap = self.up_ap
previous_dn_am = self.dn_am
previous_up_swing = self.up_swing
previous_dn_swing = self.dn_swing
previous_up_cm = self.up_cm
previous_dn_cp = self.dn_cp
previous_p1_up_entry = self.p1_up_entry
previous_p1_dn_entry = self.p1_dn_entry
previous_p1_up_f = self.p1_up_f
previous_p1_dn_f = self.p1_dn_f
previous_p1_up_e = self.p1_up_e
previous_p1_dn_e = self.p1_dn_e
previous_p1_up_noise_a = self.p1_up_noise_a
previous_p1_up_noise_b = self.p1_up_noise_b
previous_p1_up_noise_c = self.p1_up_noise_c
previous_p1_dn_noise_a = self.p1_dn_noise_a
previous_p1_dn_noise_b = self.p1_dn_noise_b
previous_p1_dn_noise_c = self.p1_dn_noise_c
# 1. Market Position
# Exit
# up_s_exit, up_f_exit, dn_s_exit, and dn_f_exit have not changed
# yet, so not saving the "previous values"
if previous_mp == 1 and bar.Low <= self.up_s_exit:
self.mp = 0
# Log message when desired
if PRINT_SIGNALS:
self.algo.Log("Previous MP=1, Low <= UpSExit, so now MP=0")
if previous_mp == 1 and last_bar.Low < self.up_f_exit:
self.mp = 0
if PRINT_SIGNALS:
self.algo.Log("Previous MP=1, Previous Low < UpFExit, so "
"now MP=0")
if previous_mp == -1 and bar.High >= self.dn_s_exit:
self.mp = 0
# Log message when desired
if PRINT_SIGNALS:
self.algo.Log("Previous MP=-1, High >= DnSExit, so now MP=0")
if previous_mp == -1 and last_bar.High > self.dn_f_exit:
self.mp = 0
if PRINT_SIGNALS:
self.algo.Log("Previous MP=-1, Previous High > DnFExit, so "
"now MP=0")
# Entry
# up_signal and dn_signal have not changed yet, so not saving the
# "previous values"
if previous_mp < 1 and self.up_signal > 0:
self.mp = 1
if PRINT_SIGNALS:
self.algo.Log("Previous MP<1, Previous UpSignal, so now "
"MP=1")
if previous_mp > -1 and self.dn_signal < 0:
self.mp = -1
if PRINT_SIGNALS:
self.algo.Log("Previous MP>-1, Previous DnSignal, so now "
"MP=-1")
# 2. Index
# up_fix / dn_fix are reset every bar and only recomputed below
atr = self.atr.Current.Value
self.up_fix = 0
self.dn_fix = 0
# Up: check for new higher high
if bar.High > last_bar.High:
self.up_ct = 1
# Loop through a list of historical bars
historical_bars = list(self.bars)
# NOTE: rolling window to list has bars in newest to oldest order
# Loop through a list of historical bars
# Counting stops at the first older bar with a higher high, or when
# up_ct reaches the lookback
for historical_bar in historical_bars:
if (bar.High >= historical_bar.High) \
and (self.up_ct < self.lookback):
# Increment up count
self.up_ct += 1
else:
break
# Check if up count is only 1
if self.up_ct == 1:
bars = [bar]
else:
# Get last up_ct bars-1
bars = historical_bars[:self.up_ct-1]
# And add the current bar
bars.append(bar)
# Get up support and resistance
# self.up_res = max([bar.High for bar in bars])
self.up_res = bar.High
self.up_sup = min([bar.Low for bar in bars])
# Update up fix
self.up_fix = (self.up_res-self.up_sup)/atr
# Down: check for a new lower low
if bar.Low < last_bar.Low:
self.dn_ct = 1
# Loop through a list of historical bars
historical_bars = list(self.bars)
# NOTE: rolling window to list has bars in newest to oldest order
# Loop through a list of historical bars
# Counting stops at the first older bar with a lower low, or when
# dn_ct reaches the lookback
for historical_bar in historical_bars:
if (bar.Low <= historical_bar.Low) \
and (self.dn_ct < self.lookback):
# Increment down count
self.dn_ct += 1
else:
break
# Check if down count is only 1
if self.dn_ct == 1:
bars = [bar]
else:
# Get last dn_ct bars-1
# NOTE(review): the up branch slices [:self.up_ct-1] but this slices
# [:self.dn_ct], which takes one more bar than the comment above
# describes - confirm which window is intended.
bars = historical_bars[:self.dn_ct]
# And add the current bar
bars.append(bar)
# Get down support and resistance
self.dn_res = max([bar.High for bar in bars])
# self.dn_sup = min([bar.Low for bar in bars])
self.dn_sup = bar.Low
# Update dn fix
self.dn_fix = (self.dn_res-self.dn_sup)/atr
# 3. Setup
# Up P1
if self.dn_ct >= self.lookback:
self.p1_up_ready = False
else:
if self.dn_fix >= self.algo.min_p1:
if self.up_f <= self.dn_res:
self.p1_up_ready = True
self.p1_up_f = self.dn_res
self.p1_up_e = (self.dn_res+self.dn_sup)*0.50
dist = self.dn_res-self.dn_sup
self.p1_up_noise_a = dist * self.algo.level_a
self.p1_up_noise_b = dist * self.algo.level_b
self.p1_up_noise_c = dist * self.algo.level_c
self.p1_up_entry = self.p1_up_e \
+ self.p1_up_noise_a*0.50
# Down P1
if self.up_ct >= self.lookback:
self.p1_dn_ready = False
else:
if self.up_fix >= self.algo.min_p1:
if self.dn_f >= self.up_sup:
self.p1_dn_ready = True
self.p1_dn_f = self.up_sup
self.p1_dn_e = (self.up_res+self.up_sup)*0.50
dist = self.up_res-self.up_sup
self.p1_dn_noise_a = dist * self.algo.level_a
self.p1_dn_noise_b = dist * self.algo.level_b
self.p1_dn_noise_c = dist * self.algo.level_c
self.p1_dn_entry = self.p1_dn_e \
- self.p1_dn_noise_a*0.50
# Up P2
# NOTE(review): self.reversal is only read back through previous_reversal
# in this section and the Down P2 section; whether the `mp` check below is
# nested under a reversal check cannot be told from this listing.
if self.up_fix >= self.algo.min_p2:
if previous_reversal == 0:
self.reversal = 1
if previous_reversal == -1:
self.reversal = 1
if self.mp < 1:
self.p2_up_ready = True
self.p2_up_e = bar.High - self.algo.min_p2*atr*0.50
self.p2_up_noise_a = \
self.algo.min_p2*atr*self.algo.level_a
self.p2_up_noise_b = \
self.algo.min_p2*atr*self.algo.level_b
self.p2_up_noise_c = \
self.algo.min_p2*atr*self.algo.level_c
# Down P2
if self.dn_fix >= self.algo.min_p2:
if previous_reversal == 0:
self.reversal = -1
if previous_reversal == 1:
self.reversal = -1
if self.mp > -1:
self.p2_dn_ready = True
self.p2_dn_e = bar.Low + self.algo.min_p2*atr*0.50
self.p2_dn_noise_a = \
self.algo.min_p2*atr*self.algo.level_a
self.p2_dn_noise_b = \
self.algo.min_p2*atr*self.algo.level_b
self.p2_dn_noise_c = \
self.algo.min_p2*atr*self.algo.level_c
# 4. Entry
# Signals are reset every bar; P1 entries use the values saved at the top
# of this handler, P2 entries use the values set in section 3 above
self.up_signal = 0
self.dn_signal = 0
# Up P1
if previous_p1_up_ready and bar.High >= previous_p1_up_entry:
self.up_swing = 1
self.up_signal = 1
self.up_f = previous_p1_up_f
self.up_e = previous_p1_up_e
self.up_noise_a = previous_p1_up_noise_a
self.up_noise_b = previous_p1_up_noise_b
self.up_noise_c = previous_p1_up_noise_c
self.up_entry = previous_p1_up_entry
self.up_ap = self.up_e + self.up_noise_a*0.50
self.up_bp = self.up_e + self.up_noise_b*0.50
self.up_cp = self.up_e + self.up_noise_c*0.50
self.up_am = self.up_e - self.up_noise_a*0.50
self.up_bm = self.up_e - self.up_noise_b*0.50
self.up_cm = self.up_e - self.up_noise_c*0.50
self.p1_up_ready = False
self.p2_up_ready = False
# Down P1
if previous_p1_dn_ready and bar.Low <= previous_p1_dn_entry:
self.dn_swing = -1
self.dn_signal = -1
self.dn_f = previous_p1_dn_f
self.dn_e = previous_p1_dn_e
self.dn_noise_a = previous_p1_dn_noise_a
self.dn_noise_b = previous_p1_dn_noise_b
self.dn_noise_c = previous_p1_dn_noise_c
self.dn_entry = previous_p1_dn_entry
self.dn_ap = self.dn_e + self.dn_noise_a*0.50
self.dn_bp = self.dn_e + self.dn_noise_b*0.50
self.dn_cp = self.dn_e + self.dn_noise_c*0.50
self.dn_am = self.dn_e - self.dn_noise_a*0.50
self.dn_bm = self.dn_e - self.dn_noise_b*0.50
self.dn_cm = self.dn_e - self.dn_noise_c*0.50
self.p1_dn_ready = False
self.p2_dn_ready = False
# Up P2
if self.p2_up_ready:
self.up_swing = 2
self.up_signal = 2
self.up_f = -99999
self.up_e = self.p2_up_e
self.up_noise_a = self.p2_up_noise_a
self.up_noise_b = self.p2_up_noise_b
self.up_noise_c = self.p2_up_noise_c
self.up_entry = bar.High
self.up_ap = self.up_e + self.up_noise_a*0.50
self.up_bp = self.up_e + self.up_noise_b*0.50
self.up_cp = self.up_e + self.up_noise_c*0.50
self.up_am = self.up_e - self.up_noise_a*0.50
self.up_bm = self.up_e - self.up_noise_b*0.50
self.up_cm = self.up_e - self.up_noise_c*0.50
self.p1_up_ready = False
self.p2_up_ready = False
# Down P2
if self.p2_dn_ready:
self.dn_swing = -2
self.dn_signal = -2
self.dn_f = 99999
self.dn_e = self.p2_dn_e
self.dn_noise_a = self.p2_dn_noise_a
self.dn_noise_b = self.p2_dn_noise_b
self.dn_noise_c = self.p2_dn_noise_c
self.dn_entry = bar.Low
self.dn_ap = self.dn_e + self.dn_noise_a*0.50
self.dn_bp = self.dn_e + self.dn_noise_b*0.50
self.dn_cp = self.dn_e + self.dn_noise_c*0.50
self.dn_am = self.dn_e - self.dn_noise_a*0.50
self.dn_bm = self.dn_e - self.dn_noise_b*0.50
self.dn_cm = self.dn_e - self.dn_noise_c*0.50
self.p1_dn_ready = False
self.p2_dn_ready = False
# 5. Re-entry
# Up
if self.up_signal == 0 and self.mp < 1 and previous_mp < 1:
if self.up_swing > 0 and bar.High >= previous_up_ap:
self.up_signal = 3
# Down
if self.dn_signal == 0 and self.mp > -1 and previous_mp > -1:
if self.dn_swing < 0 and bar.Low <= previous_dn_am:
self.dn_signal = -3
# 6. End
# Up
if self.up_signal == 0 and previous_up_swing > 0:
if (self.dn_fix >= min(self.algo.min_p1, self.algo.min_p2)) \
and (bar.Low < previous_up_cm):
self.up_swing = 0
self.up_f = -99999
# Down
if self.dn_signal == 0 and previous_dn_swing < 0:
if (self.up_fix >= min(self.algo.min_p1, self.algo.min_p2)) \
and (bar.High > previous_dn_cp):
self.dn_swing = 0
self.dn_f = 99999
# 7. Trailing
# Up
if self.up_swing > 0:
self.up_ap = max(self.up_ap, bar.High)
self.up_bp = max(self.up_bp, bar.High)
self.up_cp = max(self.up_cp, bar.High)
self.up_am = self.up_ap-self.up_noise_a
self.up_bm = self.up_bp-self.up_noise_b
self.up_cm = self.up_cp-self.up_noise_c
# Down
if self.dn_swing < 0:
self.dn_am = min(self.dn_am, bar.Low)
self.dn_bm = min(self.dn_bm, bar.Low)
self.dn_cm = min(self.dn_cm, bar.Low)
self.dn_ap = self.dn_am+self.dn_noise_a
self.dn_bp = self.dn_bm+self.dn_noise_b
self.dn_cp = self.dn_cm+self.dn_noise_c
# 8. Exit
# Up
if self.up_swing > 0:
if self.up_entry >= self.up_bm:
self.up_f_exit = min(self.up_entry, self.up_bm)
else:
self.up_f_exit = max(self.up_entry, self.up_cm)
# Down
if self.dn_swing < 0:
if self.dn_entry <= self.dn_bp:
self.dn_f_exit = max(self.dn_entry, self.dn_bp)
else:
self.dn_f_exit = min(self.dn_entry, self.dn_cp)
# 9. Stop Exit
# Instead of delaying the stop exit like TradeStation logic does,
# the stop loss order is placed immediately after an entry order is
# filled
# 10. Mixed signals
# Simultaneous up and down signals cancel each other
if self.up_signal > 0 and self.dn_signal < 0:
self.up_signal = 0
self.dn_signal = 0
# 11. Entry/Re-entry/Exit
# Position size is read once here, before any order below is submitted
current_shares = int(self.algo.Portfolio[self.security].Quantity)
if self.mp < 1 and self.up_signal >= 1:
# Get the signal
if self.up_signal == 1:
signal = "L1"
elif self.up_signal == 2:
signal = "L2"
elif self.up_signal == 3:
signal = "L3"
# Get desired position size
self.GetPositionSize()
# Get order qty
desired_shares = self.contracts
order_qty = desired_shares-current_shares
# Log message when desired
if PRINT_SIGNALS:
self.algo.Log(
f"{self.symbol} {signal} Entry Signal: "
f"Have {current_shares} shares, want {desired_shares}, "
f"so order shares = {order_qty}"
)
# Cancel previous stop order (if one)
self.CancelStopOrder()
# Place market buy order
self.algo.MarketOrder(self.security, order_qty)
elif self.mp > -1 and self.dn_signal <= -1:
# Get the signal
if self.dn_signal == -1:
signal = "S1"
elif self.dn_signal == -2:
signal = "S2"
elif self.dn_signal == -3:
signal = "S3"
# Get desired position size
self.GetPositionSize()
# Get order qty
desired_shares = -self.contracts
order_qty = desired_shares-current_shares
# Log message when desired
if PRINT_SIGNALS:
self.algo.Log(
f"{self.symbol} {signal} Entry Signal: "
f"Have {current_shares} shares, want {desired_shares}, "
f"so order shares = {order_qty}"
)
# Cancel previous stop order (if one)
self.CancelStopOrder()
# Place market sell order
self.algo.MarketOrder(self.security, order_qty)
# Exit
# NOTE(review): this is a separate `if`, not part of the entry chain above,
# and it reuses the current_shares value read before any entry order was
# sent - confirm an exit cannot be submitted on top of a reversal order
# placed on the same bar.
# Check for active long position
if self.mp == 1 and current_shares > 0:
# NOTE: stop loss order is placed immediately after an entry
# order is filled
# Check for exit signal
if bar.Low < self.up_f_exit:
# Log message when desired
if PRINT_SIGNALS:
self.algo.Log(
f"{self.symbol} LXF Exit Signal: "
f"Sell {current_shares} shares"
)
self.ExitPosition(current_shares)
# Check for active short position
elif self.mp == -1 and current_shares < 0:
# NOTE: stop loss order is placed immediately after an entry
# order is filled
# Check for exit signal
if bar.High > self.dn_f_exit:
# Log message when desired
if PRINT_SIGNALS:
self.algo.Log(
f"{self.symbol} SXF Exit Signal: "
f"Buy to cover {current_shares} shares"
)
self.ExitPosition(current_shares)
# Add bar to rolling window
self.bars.Add(bar)
#-------------------------------------------------------------------------------
def ExitPosition(self, current_shares):
    """
    Immediately exit an active position.

    current_shares: signed size of the open position; the opposite quantity
    is sent as a market order after the open stop order is cancelled.
    """
    # The stop order must go first so it is not left working
    self.CancelStopOrder()
    offsetting_qty = -current_shares
    self.algo.MarketOrder(self.security, offsetting_qty)
#-------------------------------------------------------------------------------
def CancelStopOrder(self):
    """Cancel the open stop order, if there is one, and clear the link."""
    ticket = self.stop_order
    if not ticket:
        # Nothing to cancel
        return
    if PRINT_ORDERS:
        self.algo.Log(f"Cancelling {self.symbol} stop order.")
    ticket.Cancel()
    # Set back to None so the cancelled order is not referenced again
    self.stop_order = None
#-------------------------------------------------------------------------------
def PlaceStopOrder(self, current_shares):
"""Place the stop loss order."""
# Get the previous completee bar's atr
atr = self.atr_r.Current.Value
# Check for long position
if current_shares > 0:
# Use up stop
stop = self.up_s_exit = \
round(self.cost_basis-self.algo.stop_ix*atr,2)
position = 'long'
elif current_shares < 0:
# Use down stop
stop = self.dn_s_exit = \
round(self.cost_basis+self.algo.stop_ix*atr,2)
position = 'short'
else:
# Don't continue if no position
return
# Place stop market order for -current shares at the desired stop price
self.stop_order = \
self.algo.StopMarketOrder(self.security, -current_shares, stop)
# Log message when desired
if PRINT_ORDERS:
self.algo.Log(
f"{self.symbol} {position} stop order for {-current_shares}"
f" shares placed at {stop}"
)
#-------------------------------------------------------------------------------
def GetPositionSize(self):
"""Get the desired position size."""
atr_r = self.atr_r.Current.Value
# risk = self.algo.risk_pct*self.algo.Portfolio.TotalPortfolioValue
risk = self.algo.stop_dollar
# NOTE: big point value is 1 for equities
n_rough = risk/(self.algo.stop_ix_r*atr_r)
nr = int(n_rough)
# self.contracts = round(nr, 0)
self.contracts = 100
#-------------------------------------------------------------------------------
def DailyUSEquityCalendar(self, dt):
    """
    Set up daily consolidator calendar info for the US equity market.

    Returns a CalendarInfo whose start is 9:30 on dt's date and whose period
    runs to 16:00 on that date. The start should be a timezone unaware
    datetime with a valid date/time for the security's exchange time zone.

    Useful Refs:
    datetime.replace() method:
    https://docs.python.org/3/library/datetime.html#datetime.datetime.replace
    """
    # When the algo initializes, this function is first called with the
    # datetime at which the backtest was requested. That is the only time dt
    # has a timezone attached, and the result does not matter, so just
    # return dt with a one day period.
    if dt.tzinfo:
        one_day_later = dt + timedelta(1)
        return CalendarInfo(dt, one_day_later-dt)
    # Regular session: 9:30am to 16:00 ET
    session_open = dt.replace(hour=9, minute=30, second=0, microsecond=0)
    session_close = dt.replace(hour=16, minute=0, second=0, microsecond=0)
    # return start datetime and period
    return CalendarInfo(session_open, session_close-session_open)
#-------------------------------------------------------------------------------
def CustomCalendar(self, dt):
    """
    Set up custom consolidator calendar info.

    Picks the first BAR_TIMES entry whose start time is at or after dt's
    time of day and returns a CalendarInfo with that bar's start and period
    on dt's date. If no entry qualifies, the first bar of the next calendar
    day is used. The start should be a timezone unaware datetime with a
    valid date/time for the security's exchange time zone.

    Useful Refs:
    datetime.replace() method:
    https://docs.python.org/3/library/datetime.html#datetime.datetime.replace
    """
    # When the algo initializes, this function is first called with the
    # datetime at which the backtest was requested. That is the only time dt
    # has a timezone attached, and the result does not matter, so just
    # return dt with a one day period.
    if dt.tzinfo:
        return CalendarInfo(dt, (dt + DT.timedelta(1)) - dt)
    start = None
    end = None
    for bar_start, bar_end in BAR_TIMES:
        # Check for dt time at or before the bar's start time
        if dt.time() <= bar_start:
            # Seconds and microseconds are zeroed so the bar boundaries sit
            # exactly on the configured times, as in the next-day branch
            start = dt.replace(
                hour=bar_start.hour, minute=bar_start.minute, second=0,
                microsecond=0)
            end = dt.replace(
                hour=bar_end.hour, minute=bar_end.minute, second=0,
                microsecond=0)
            break
    if start is None or end is None:
        # dt is after the last bar start: use the next day's first bar
        first_start, first_end = BAR_TIMES[0]
        start = (dt + DT.timedelta(1)).replace(
            hour=first_start.hour, minute=first_start.minute, second=0,
            microsecond=0)
        end = start.replace(
            hour=first_end.hour, minute=first_end.minute, second=0,
            microsecond=0)
    # return start datetime and period
    return CalendarInfo(start, end-start)