| Overall Statistics |
|
Total Orders 19 Average Win 0.04% Average Loss 0.00% Compounding Annual Return 6.888% Drawdown 0.700% Expectancy 4.752 Start Equity 1000000 End Equity 1003419.85 Net Profit 0.342% Sharpe Ratio -0.204 Sortino Ratio -0.273 Probabilistic Sharpe Ratio 57.749% Loss Rate 43% Win Rate 57% Profit-Loss Ratio 9.07 Alpha 0.022 Beta 0.071 Annual Standard Deviation 0.028 Annual Variance 0.001 Information Ratio 3.375 Tracking Error 0.112 Treynor Ratio -0.08 Total Fees $0.00 Estimated Strategy Capacity $240000000.00 Lowest Capacity Asset BA R735QTJ8XC9X Portfolio Turnover 3.96% Drawdown Recovery 4 |
###############################################################################
# Standard library imports
import datetime as DT
import pandas as pd
import pytz
from io import StringIO
# QuantConnect specific imports
from AlgorithmImports import *
# Import from files
from notes_and_inputs import *
from object_store import *
from symbol_data import SymbolData
################################################################################
class GoogleSheetAutoTrader(QCAlgorithm):
def initialize(self):
"""Initialize algorithm."""
# Set backtest details
self.set_backtest_details()
# Add strategy variables
self.add_strategy_variables()
# Add instrument data to the algo
self.add_instrument_data()
# schedule functions
self.schedule_functions()
# Log info for live trading
if self.live_mode:
self.my_log(f"QCAlgorithm initialized {VERSION}")
#-------------------------------------------------------------------------------
def set_backtest_details(self):
"""Set the backtest details."""
self.set_start_date(START_DT.year, START_DT.month, START_DT.day)
if END_DATE:
self.set_end_date(END_DT.year, END_DT.month, END_DT.day)
self.set_cash(CASH)
self.set_time_zone(TIMEZONE)
# Setup trading framework
# Transaction and submit/execution rules from brokerage model
'''https://www.quantconnect.com/docs/v2/writing-algorithms/
reality-modeling/brokerages/supported-models'''
# account types: AccountType.MARGIN, AccountType.CASH
self.set_brokerage_model(
BrokerageName.TRADE_STATION,
AccountType.MARGIN
)
# Configure all universe securities
# This sets the data normalization mode to raw
# ALEX: set_ overrides the brokerage model, use add_
self.add_security_initializer(self.custom_security_initializer)
#-------------------------------------------------------------------------------
def add_strategy_variables(self):
"""Create required strategy variables."""
# Keep track of SymbolData class instances and Symbols
self.symbol_data = {}
self.symbol_objects = {}
#-------------------------------------------------------------------------------
def add_instrument_data(self):
"""Add instrument data to the algo."""
# Set data resolution
if DATA_RESOLUTION == 'SECOND':
self.resolution = Resolution.SECOND
else:
self.resolution = Resolution.MINUTE
# Add data for the benchmark
self.bm = self.add_equity(BENCHMARK, self.resolution).symbol
self.get_exchange_info()
# Read the current state of the Google Sheet for stocks to trade
self.read_google_sheet()
# Restore the algo state
restore_state_from_object_store(self)
#-------------------------------------------------------------------------------
def get_exchange_info(self):
"""Get the securities exchange info."""
# Get the SecurityExchangeHours Class object for the symbol
self.exchange_hours = self.securities[self.bm].exchange.hours
# Create a datetime I know the market was open for the full day
dt = DT.datetime(2021, 1, 4)
# Get the next open datetime from the SecurityExchangeHours Class
mkt_open_dt = self.exchange_hours.get_next_market_open(dt, False)
# Save the typical (regualar session) market open and close times
self.mkt_open = mkt_open_dt.time()
mkt_close_dt = self.exchange_hours.get_next_market_close(dt, False)
self.mkt_close = mkt_close_dt.time()
# Get the exchange timezone
self.mkt_tz = pytz.timezone(str(self.exchange_hours.time_zone))
# Create pytz timezone objects for the exchange tz and local tz
exchange_tz = self.mkt_tz
local_tz = pytz.timezone(TIMEZONE)
# Get the difference in the timezones
# REF: http://pytz.sourceforge.net/#tzinfo-api
# for pytz timezone.utcoffset() method
# 3600 seconds/hour
exchange_utc_offset_hrs = int(exchange_tz.utcoffset(dt).seconds/3600)
local_utc_offset_hrs = int(local_tz.utcoffset(dt).seconds/3600)
self.offset_hrs = exchange_utc_offset_hrs-local_utc_offset_hrs
# NOTE: offset hours are very helpful if you want to schedule functions
# around market open/close times
# Get the market close time for the local time zone
self.mkt_close_local_tz = \
(mkt_close_dt-DT.timedelta(hours=self.offset_hrs)).time()
#-------------------------------------------------------------------------------
def schedule_functions(self):
"""Scheduling the functions required by the algo."""
# For live trading, have a heartbeat function run every 10 minutes
if self.live_mode:
self.schedule.on(
self.date_rules.every_day(self.bm),
self.time_rules.every(DT.timedelta(minutes=10)),
self.heartbeat
)
# Save state to Object Store at end of day (5 minutes after the close)
self.schedule.on(
self.date_rules.every_day(self.bm),
self.time_rules.before_market_close(self.bm, -5),
self.save_state
)
#-------------------------------------------------------------------------------
def read_google_sheet(self):
"""Read and parse the Google Sheet with trading strategy parameters."""
try:
# Download the CSV data from Google Sheets
csv = self.download(GOOGLE_SHEET_URL)
# Check if CSV data is empty or invalid
if not csv or len(csv.strip()) == 0:
if self.live_mode:
self.my_log("Google Sheet returned empty data - skipping update")
return
else:
raise ValueError("Google Sheet returned empty data")
df = pd.read_csv(StringIO(csv))
# Check if dataframe is empty
if df.empty:
if self.live_mode:
self.my_log("Google Sheet contains no data rows - skipping update")
return
else:
raise ValueError("Google Sheet contains no data rows")
except Exception as e:
# Check if live trading
if self.live_mode:
self.my_log(f"Error reading Google Sheet: {str(e)}")
return
# For backtesting, raise error
else:
raise ValueError(f"Error reading Google Sheet: {str(e)}")
# Loop through the contents one row at a time
for index, row in df.iterrows():
ticker = None
try:
# Get the trade information
ticker = str(row['Symbol']).strip().upper()
# Skip if NAN
if ticker == 'NAN' or ticker == '':
continue
# Skip if invalid direction
direction = str(row['Type']).strip().upper()
if direction not in ['LONG', 'SHORT']:
self.my_log(f"{ticker} invalid type: {direction}")
continue
shares = int(row['Shares'])
price = float(row['Price'])
entry_time_str = str(row['TimeOfEntry'])
entry_time_hr = int(entry_time_str.split(":")[0])
entry_time_min = int(entry_time_str.split(":")[1])
entry_time = DT.time(entry_time_hr, entry_time_min)
if entry_time <= DT.time(9,30):
self.my_log(f"{ticker} invalid entry time <= 9:30: {entry_time}")
continue
price_trigger_str = str(row['PriceTrigger']).upper()
if price_trigger_str == 'TRUE':
use_price_trigger = True
elif price_trigger_str == 'FALSE':
use_price_trigger = False
else:
self.my_log(
f"{ticker} invalid price_trigger: {price_trigger_str}"
)
continue
time_trigger_str = str(row['TimeTrigger']).upper()
if time_trigger_str == 'TRUE':
use_time_trigger = True
elif time_trigger_str == 'FALSE':
use_time_trigger = False
else:
self.my_log(
f"{ticker} invalid time_trigger: {time_trigger_str}"
)
continue
# Verify exactly one is True
sum_triggers = int(use_time_trigger) + int(use_price_trigger)
if sum_triggers != 1:
self.my_log(
f"{ticker} invalid triggers (exactly 1 must be True)"
)
continue
pt1_pct = float(row['PT1%'])/100
pt1_sell_pct = float(row['PT1Sell%'])/100
pt2_pct = float(row['PT2%'])/100
pt2_sell_pct = float(row['PT2Sell%'])/100
ts3_pct = float(row['TS3'])/100
ts3_sell_pct = float(row['TS3%'])/100
ten_day_ema_str = str(row['10DEMA4']).upper()
if ten_day_ema_str == 'TRUE':
use_ten_day_ema = True
elif ten_day_ema_str == 'FALSE':
use_ten_day_ema = False
else:
self.my_log(f"{ticker} invalid 10DEMA4: {ten_day_ema_str}")
continue
pt4_sell_pct = float(row['PT4Sell%'])/100
ten_period_ema_str = str(row['DTSell10EMA5Min']).upper()
if ten_period_ema_str == 'TRUE':
use_ten_period_ema = True
elif ten_period_ema_str == 'FALSE':
use_ten_period_ema = False
else:
self.my_log(
f"{ticker} invalid DTSell10EMA5Min: {ten_period_ema_str}"
)
continue
pt5_sell_pct = float(row['PT5Sell%'])/100
vol_multiplier = float(row['Vol>21Day'])
sl_pct = float(row['StopLoss%'])/100
if use_price_trigger:
within_pct = float(row['Within %'])/100
else:
within_pct = 0 # n/a
# Package parameters into a dictionary
params = {
'ticker': ticker,
'direction': direction,
'shares': shares,
'price': price,
'entry_time': entry_time,
'use_price_trigger': use_price_trigger,
'use_time_trigger': use_time_trigger,
'within_pct': within_pct,
'pt1_pct': pt1_pct,
'pt1_sell_pct': pt1_sell_pct,
'pt2_pct': pt2_pct,
'pt2_sell_pct': pt2_sell_pct,
'ts3_pct': ts3_pct,
'ts3_sell_pct': ts3_sell_pct,
'use_ten_day_ema': use_ten_day_ema,
'pt4_sell_pct': pt4_sell_pct,
'use_ten_period_ema': use_ten_period_ema,
'pt5_sell_pct': pt5_sell_pct,
'vol_multiplier': vol_multiplier,
'sl_pct': sl_pct
}
# Check if ticker already has a SymbolData instance
if ticker in self.symbol_data:
sd = self.symbol_data[ticker]
# If in position, ignore updates
if sd.in_position:
continue
# Check if parameters have changed
if sd.parameters_changed(params):
self.my_log(f"{ticker} parameters updated")
sd.update_parameters(params)
else:
# Create new SymbolData instance
# First add equity if not already added
if ticker not in self.symbol_objects:
symbol_obj = self.add_equity(ticker, self.resolution).symbol
self.symbol_objects[ticker] = symbol_obj
else:
symbol_obj = self.symbol_objects[ticker]
# Create SymbolData instance with parameters
self.symbol_data[ticker] = SymbolData(self, symbol_obj, params)
self.my_log(f"Added new symbol: {ticker}")
except Exception as e:
# Check if live trading
if self.live_mode:
if ticker:
self.my_log(
f"Error reading Google Sheet inputs for {ticker}: {str(e)}"
)
else:
self.my_log(
f"Error reading Google Sheet row {index}: {str(e)}"
)
# For backtesting, raise error
else:
if ticker:
raise ValueError(
f"Error reading Google Sheet inputs for {ticker}: {str(e)}"
)
else:
raise ValueError(
f"Error reading Google Sheet row {index}: {str(e)}"
)
#-------------------------------------------------------------------------------
def on_data(self, data):
"""Main trading logic - called on each data slice."""
# Loop through all symbol_data instances
for ticker, sd in self.symbol_data.items():
try:
# Only check exit conditions (entries handled in on_1min_bar)
sd.check_exit_conditions(data)
except Exception as e:
self.my_log(f"Error processing {ticker}: {str(e)}")
#-------------------------------------------------------------------------------
def on_order_event(self, order_event):
"""Handle order events and route to appropriate SymbolData instance."""
# Get the symbol from the order event
symbol = order_event.symbol
ticker = symbol.value
# Route to the appropriate SymbolData instance
if ticker in self.symbol_data:
self.symbol_data[ticker].on_order_event(order_event)
# Save state after order fills in live trading
if self.live_mode and order_event.status == OrderStatus.FILLED:
save_state_to_object_store(self)
#-------------------------------------------------------------------------------
def heartbeat(self):
"""Heartbeat function for algorithm used in live trading."""
# Check for Google Sheet changes
self.read_google_sheet()
#-------------------------------------------------------------------------------
def save_state(self):
"""Save algorithm state to Object Store."""
save_state_to_object_store(self)
#-------------------------------------------------------------------------------
def my_log(self, message):
"""Add algo time to log."""
self.log(f'{self.time}: {message}')
#-------------------------------------------------------------------------------
def custom_security_initializer(self, security: Security) -> None:
# Set data normalization to Raw
security.set_data_normalization_mode(DataNormalizationMode.RAW)
# Set the margin model
# security.margin_model = PatternDayTradingMarginModel()
# Overwrite the security buying power
# security.set_buying_power_model(BuyingPowerModel.NULL)
# security.set_buying_power_model(SecurityMarginModel(4))
# Set fee model
# security.set_fee_model(ConstantFeeModel(0))
# security.set_fee_model(CustomFeeModel())
#-------------------------------------------------------------------------------
def on_splits(self, splits):
"""Built-in event handler for split events."""
# Loop through the splits
for symbol, split in splits.items():
try:
# Verify this is not a warning
if split.type == 1:
# Get the split factor
split = split.split_factor
# Catch the appropriate symbol_data instance
sd = self.symbol_data.get(symbol)
if sd:
# Log message when desired
if PRINT_SPLITS or self.live_mode:
self.my_log(
f"New {symbol.value} split: {split}. "
f"Updating {symbol.value}'s indicators."
)
# Adjust the indicators by the adjustment factor
sd.adjust_indicators(split, is_split=True)
except:
self.my_log(f'on_splits() error for {symbol.value}')
#-------------------------------------------------------------------------------
def on_dividends(self, dividends):
"""Built-in event handler for dividend events."""
# Loop through the dividends
for symbol, dividend in dividends.items():
try:
# Get the dividend distribution amount
dividend = dividend.distribution
# Get last 2 daily prices
hist = self.history([symbol], 2, Resolution.DAILY)
if hist.empty or len(hist) < 2:
continue
price = hist.iloc[-1]['close'] # [-1] for last
previous_close = hist.iloc[0]['close'] # [0] for first
# Calculate the dividend adjustment factor
af = (previous_close-dividend)/previous_close
# Catch the appropriate symbol_data instance
sd = self.symbol_data.get(symbol)
if sd:
# Log message when desired
if PRINT_DIVIDENDS or self.live_mode:
self.my_log(
f"New {symbol.value} dividend={dividend}. "
f"Close={price}, previous close={previous_close}, "
f"so dividend adjustment factor={af}. "
f"Updating {symbol.value}'s indicators."
)
# Adjust the previous bars by the dividend adjustment factor
sd.adjust_indicators(af, is_split=False)
except:
self.my_log(f'on_dividends() error for {symbol.value}')
#-------------------------------------------------------------------------------
# from AlgorithmImports import *
"""
Google Sheet AutoTrader (Swing) Strategy
Version 1.0.1
Platform: QuantConnect
By: Aaron Eller
r.aaron.eller@gmail.com
Revision Notes:
1.0.0 (11/17/2025) - Initial.
1.0.1 (11/20/2020) - Updated object_store.py functions to handle dates
as strings for json serialization.
- Updated read_google_sheet() to handle unexpected
error with no return from the csv download.
References:
-QC (Lean) Class List
https://lean-api-docs.netlify.app/annotated.html
"""
# Standard library imports
import datetime as DT
###############################################################################
# Backtest inputs
START_DATE = "11-01-2025" # must be in "MM-DD-YYYY" format
END_DATE = "" # must be in "MM-DD-YYYY" format or None
CASH = 1_000_000
#-------------------------------------------------------------------------------
# Strategy Inputs
# Enter the URL to access the Google Sheet
GOOGLE_SHEET_URL = "https://docs.google.com/spreadsheets/d/e/2PACX-1vRRRcTvi0Liq6VoaCe3TYr1fFazLOfuDr_rhq5s5YI-hIDvKCGacDT0RaeAejxbTfFzWCUAFhyRhc5Q/pub?gid=0&single=true&output=csv"
# Enter the desired data resolution - either 'SECOND' or 'MINUTE'
# Entries will be checked every minute during the regular session.
# Exits will be checked based on this resolution.
DATA_RESOLUTION = 'SECOND'
# Set benchmark - used for scheduling functions
BENCHMARK = 'SPY'
#-------------------------------------------------------------------------------
# LOGGING INPUTS
PRINT_SPLITS = True # print split info
PRINT_DIVIDENDS = True # print dividend info
PRINT_SIGNALS = True # print new signal information
PRINT_ORDERS = True # print new orders
################################################################################
############################ END OF ALL USER INPUTS ############################
################################################################################
# VALIDATE USER INPUTS - DO NOT CHANGE BELOW!!!
#-------------------------------------------------------------------------------
VERSION = "Version 1.0.0"
TIMEZONE = "US/Eastern" # e.g. "US/Eastern", "US/Central", "US/Pacific"
# Verify DATA_RESOLUTION
DATA_RESOLUTION = DATA_RESOLUTION.upper()
if DATA_RESOLUTION not in ['SECOND', 'MINUTE']:
raise ValueError(f"DATA_RESOLUTION must be 'SECOND' or 'MINUTE'!")
#-------------------------------------------------------------------------------
# Verify start date
try:
START_DT = DT.datetime.strptime(START_DATE, '%m-%d-%Y')
except:
raise ValueError(
f"Invalid START_DATE format ({START_DATE}). Must be in MM-DD-YYYY "
"format."
)
# Verify end date
try:
if END_DATE:
END_DT = DT.datetime.strptime(END_DATE, '%m-%d-%Y')
except:
raise ValueError(
f"Invalid END_DATE format ({END_DATE}). Must be in MM-DD-YYYY "
"format or set to None to run to date."
)# Standard library imports
import json
import datetime as DT
# QuantConnect specific imports
from AlgorithmImports import *
################################################################################
# Object Store State Management
################################################################################
OBJECT_STORE_KEY = "algo_state"
def make_json_serializable(obj):
"""
Convert objects to JSON-serializable format.
Handles datetime.time objects and other non-serializable types.
"""
import datetime as DT
if isinstance(obj, DT.time):
# Convert time to string in HH:MM:SS format
return obj.strftime('%H:%M:%S')
elif isinstance(obj, DT.datetime):
return obj.isoformat()
elif isinstance(obj, dict):
return {k: make_json_serializable(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [make_json_serializable(item) for item in obj]
else:
return obj
def restore_params_from_json(params):
"""
Convert params from JSON format back to Python objects.
Specifically handles entry_time string -> time object conversion.
"""
import datetime as DT
# Make a copy to avoid modifying original
restored_params = params.copy()
# Convert entry_time string back to time object
if 'entry_time' in restored_params and isinstance(restored_params['entry_time'], str):
time_str = restored_params['entry_time']
# Parse HH:MM:SS format
parts = time_str.split(':')
restored_params['entry_time'] = DT.time(
int(parts[0]),
int(parts[1]),
int(parts[2]) if len(parts) > 2 else 0
)
return restored_params
def save_state_to_object_store(algo):
"""
Save the current state of all positions to the Object Store.
Call this at the end of each trading day or when positions change.
"""
if not algo.live_mode:
return # Only save state in live trading
state = {
'last_updated': algo.time.isoformat(),
'symbols': {}
}
# Loop through all symbol_data instances
for ticker, sd in algo.symbol_data.items():
# Only save state for symbols that are in position
if sd.in_position:
symbol_state = {
# Trade parameters
'direction': sd.direction,
'trade_type': sd.trade_type,
'shares': sd.shares,
# Position status
'cost_basis': sd.cost_basis,
'current_shares': sd.current_shares,
'in_position': sd.in_position,
# Stop loss
'stop_price': sd.stop_price,
'sell_stop_pct': sd.sell_stop_pct,
# Best prices for trailing stop
'highest_price': sd.highest_price,
'lowest_price': sd.lowest_price,
# Profit target status
'pt1_filled': sd.pt1_filled,
'pt2_filled': sd.pt2_filled,
'ts3_active': sd.ts3_active,
'pt4_filled': sd.pt4_filled,
'pt5_filled': sd.pt5_filled,
# Pending order flags
'pt1_pending': sd.pt1_pending,
'pt2_pending': sd.pt2_pending,
'ts3_pending': sd.ts3_pending,
'pt4_pending': sd.pt4_pending,
'pt5_pending': sd.pt5_pending,
'stop_loss_pending': sd.stop_loss_pending,
# Strategy parameters (needed to recreate SymbolData)
# Convert to JSON-serializable format
'params': make_json_serializable(sd.params)
}
state['symbols'][ticker] = symbol_state
# Save to Object Store as JSON
try:
state_json = json.dumps(state, indent=2)
algo.object_store.save(OBJECT_STORE_KEY, state_json)
algo.log(f"State saved to Object Store: {len(state['symbols'])} positions")
except Exception as e:
algo.log(f"Error saving state to Object Store: {str(e)}")
def restore_state_from_object_store(algo):
"""
Restore position state from Object Store after algo restart.
Call this during initialize() after setting up symbol_data instances.
"""
if not algo.live_mode:
return # Only restore state in live trading
# Check if state exists in Object Store
if not algo.object_store.contains_key(OBJECT_STORE_KEY):
algo.log("No saved state found in Object Store")
return
try:
# Load state from Object Store
state_json = algo.object_store.read(OBJECT_STORE_KEY)
state = json.loads(state_json)
last_updated = state.get('last_updated')
algo.log(f"Restoring state from {last_updated}")
# Restore state for each symbol
for ticker, symbol_state in state['symbols'].items():
# Get or create SymbolData instance
if ticker not in algo.symbol_data:
# Create new SymbolData instance with saved parameters
if ticker not in algo.symbol_objects:
symbol_obj = algo.add_equity(ticker, algo.resolution).symbol
algo.symbol_objects[ticker] = symbol_obj
else:
symbol_obj = algo.symbol_objects[ticker]
# Import SymbolData here to avoid circular import
from symbol_data import SymbolData
# Restore params from JSON format (convert time strings back to time objects)
restored_params = restore_params_from_json(symbol_state['params'])
algo.symbol_data[ticker] = SymbolData(algo, symbol_obj, restored_params)
sd = algo.symbol_data[ticker]
# Restore position status
sd.in_position = symbol_state['in_position']
sd.cost_basis = symbol_state['cost_basis']
sd.current_shares = symbol_state['current_shares']
# Restore stop loss
sd.stop_price = symbol_state['stop_price']
# Restore best prices
sd.highest_price = symbol_state['highest_price']
sd.lowest_price = symbol_state['lowest_price']
# Restore profit target status
sd.pt1_filled = symbol_state['pt1_filled']
sd.pt2_filled = symbol_state['pt2_filled']
sd.ts3_active = symbol_state['ts3_active']
sd.pt4_filled = symbol_state['pt4_filled']
sd.pt5_filled = symbol_state['pt5_filled']
# Restore pending flags
sd.pt1_pending = symbol_state['pt1_pending']
sd.pt2_pending = symbol_state['pt2_pending']
sd.ts3_pending = symbol_state['ts3_pending']
sd.pt4_pending = symbol_state['pt4_pending']
sd.pt5_pending = symbol_state['pt5_pending']
sd.stop_loss_pending = symbol_state['stop_loss_pending']
algo.log(
f"Restored {ticker}: {symbol_state['current_shares']} shares @ "
f"{symbol_state['cost_basis']:.2f}, stop @ {symbol_state['stop_price']:.2f}"
)
algo.log(f"State restored: {len(state['symbols'])} positions")
except Exception as e:
algo.log(f"Error restoring state from Object Store: {str(e)}")
def clear_state_from_object_store(algo):
"""
Clear saved state from Object Store.
Useful for testing or when you want to start fresh.
"""
if algo.object_store.contains_key(OBJECT_STORE_KEY):
algo.object_store.delete(OBJECT_STORE_KEY)
algo.log("State cleared from Object Store")# Standard library imports
import datetime as DT
import pandas as pd
# QuantConnect specific imports
from AlgorithmImports import *
# Import from files
from notes_and_inputs import *
################################################################################
class SymbolData(object):
"""
Class to store data and strategy for a specific symbol from Google Sheet.
"""
def __init__(self, algo, symbol_object, params):
"""Initialize SymbolData object with Google Sheet parameters.
Args:
algo: Reference to the QCAlgorithm instance
symbol_object: The Symbol object for this ticker
params: Dictionary containing the strategy parameters from Google Sheet
"""
# Save a reference to the QCAlgorithm class
self.algo = algo
# Save the .Symbol object and the ticker symbol
self.symbol_object = symbol_object
self.ticker = symbol_object.value
# Parse and store strategy parameters from Google Sheet
self.parse_parameters(params)
# Add strategy variables
self.set_strategy_variables()
# Add the bars and indicators required
self.add_bars()
self.add_indicators()
# Log info for live trading
if self.algo.live_mode:
self.algo.my_log(
f"SymbolData instance created for {self.ticker}, "
f"Type={self.trade_type}, Shares={self.shares}, "
f"indicators ready = {self.indicators_ready}"
)
#-------------------------------------------------------------------------------
def parse_parameters(self, params):
"""Parse strategy parameters from params dictionary."""
# Store the entire params dict for easy comparison later
self.params = params.copy()
# Basic trade parameters
self.trade_type = params['direction'] # Already uppercase LONG/SHORT
self.shares = params['shares']
# Entry parameters - Price-based
self.entry_price = params['price'] if params['use_price_trigger'] else None
self.within_pct = params['within_pct']
# Entry parameters - Time-based
self.entry_time = params['entry_time'] if params['use_time_trigger'] else None
# Track which trigger to use
self.use_price_trigger = params['use_price_trigger']
self.use_time_trigger = params['use_time_trigger']
# Profit targets
self.pt1_pct = params['pt1_pct']
self.pt1_sell_pct = params['pt1_sell_pct']
self.pt2_pct = params['pt2_pct']
self.pt2_sell_pct = params['pt2_sell_pct']
# Trailing stop (TS3)
self.ts3_pct = params['ts3_pct']
self.ts3_sell_pct = params['ts3_sell_pct']
# 10D EMA exit
self.use_10d_ema = params['use_ten_day_ema']
self.pt4_sell_pct = params['pt4_sell_pct']
# 5-min 10 EMA exit (day trading)
self.use_5min_ema = params['use_ten_period_ema']
self.pt5_sell_pct = params['pt5_sell_pct']
# Volume filter
self.vol_multiplier = params['vol_multiplier']
# Hard stop loss
self.sell_stop_pct = params['sl_pct']
# Determine trade direction
self.direction = 1 if self.trade_type == 'LONG' else -1
#-------------------------------------------------------------------------------
def parameters_changed(self, new_params):
"""Check if parameters have changed compared to new_params dictionary."""
# Simple comparison - just check if the dicts are different
return self.params != new_params
#-------------------------------------------------------------------------------
def update_parameters(self, new_params):
"""Update parameters without disposing of the instance."""
# Update all parameters
self.parse_parameters(new_params)
# Reset entry triggers since strategy changed
self.entry_triggered = False
self.entry_time_triggered = False
#-------------------------------------------------------------------------------
def set_strategy_variables(self):
"""Set strategy specific variables."""
self.reset_trade_variables()
self.volume_qualified = False
#-------------------------------------------------------------------------------
def reset_trade_variables(self):
"""Reset trade specific variables."""
# Keep track of orders
self.entry_order = None
self.stop_price = None
# Track position status
self.in_position = False
self.cost_basis = None
self.current_shares = 0
# Track best prices for trailing stop
self.highest_price = None # For longs
self.lowest_price = None # For shorts
# Reset profit target status
self.pt1_filled = False
self.pt2_filled = False
self.ts3_active = False
self.pt4_filled = False
self.pt5_filled = False
# Reset pending order flags (prevent multiple executions)
self.pt1_pending = False
self.pt2_pending = False
self.ts3_pending = False
self.pt4_pending = False
self.pt5_pending = False
self.stop_loss_pending = False
# Reset entry triggers
self.entry_triggered = False
self.entry_time_triggered = False
#-------------------------------------------------------------------------------
def add_bars(self):
"""Add bars and consolidators required."""
# Daily bar consolidator for 10D EMA
if self.use_10d_ema:
self.calendar_initialized = False
daily_consolidator = TradeBarConsolidator(self.daily_calendar)
daily_consolidator.data_consolidated += self.on_daily_bar
self.algo.subscription_manager.add_consolidator(
self.symbol_object, daily_consolidator
)
self.daily_consolidator = daily_consolidator
else:
self.daily_consolidator = None
# 5-minute bar consolidator for 5-min 10 EMA
if self.use_5min_ema:
min5_consolidator = TradeBarConsolidator(DT.timedelta(minutes=5))
min5_consolidator.data_consolidated += self.on_5min_bar
self.algo.subscription_manager.add_consolidator(
self.symbol_object, min5_consolidator
)
self.min5_consolidator = min5_consolidator
else:
self.min5_consolidator = None
# 1-minute bar consolidator for volume tracking
if self.vol_multiplier > 0:
min1_consolidator = TradeBarConsolidator(DT.timedelta(minutes=1))
min1_consolidator.data_consolidated += self.on_1min_bar
self.algo.subscription_manager.add_consolidator(
self.symbol_object, min1_consolidator
)
self.min1_consolidator = min1_consolidator
else:
self.min1_consolidator = None
#-------------------------------------------------------------------------------
def daily_calendar(self, dt):
"""
Set up daily consolidator calendar info for the US equity market.
This should return a start datetime object that is timezone unaware
with a valid date/time for the desired securities' exchange's time zone.
"""
# Need to handle case where algo initializes and this function is called
# for the first time.
if not self.calendar_initialized:
# Since this doesn't matter, we'll pass dt as start and one day
# as the timedelta until end_dt
start_dt = dt
end_dt = start_dt + DT.timedelta(1)
self.calendar_initialized = True
return CalendarInfo(start_dt, end_dt-start_dt)
# Create a datetime.datetime object to represent the market open for the
# **EXCHANGE** timezone
start = dt.replace(
hour=self.algo.mkt_open.hour,
minute=self.algo.mkt_open.minute,
second=0,
microsecond=0
)
# Get today's end time from the SecurityExchangeHours Class object
end = self.algo.exchange_hours.get_next_market_close(start, False)
# Catch when start is after the passed dt
# QC now throws an error in this case
if start > dt:
# To handle the QC error, pass period for no data
# Set the end to be the next desired start
end = start
# And set start to dt to avoid QC throwing error
start = dt
# This will result in the next dt being the desired start time
# Return the start datetime and the consolidation period
return CalendarInfo(start, end-start)
#-------------------------------------------------------------------------------
def add_indicators(self):
"""Add indicators required."""
self.indicators = []
# 10-period EMA on daily timeframe
if self.use_10d_ema:
self.ema_10d = ExponentialMovingAverage(10)
self.indicators.append(self.ema_10d)
else:
self.ema_10d = None
self.bar_window_daily = RollingWindow[TradeBar](20)
# 10-period EMA on 5-minute timeframe
min_5min_bars = [20]
if self.use_5min_ema:
self.ema_5min = ExponentialMovingAverage(10)
self.indicators.append(self.ema_5min)
else:
self.ema_5min = None
self.bar_window_5min = RollingWindow[TradeBar](20)
# Volume tracking:
# dictionary of RollingWindows for cumulative volume at each minute
if self.vol_multiplier > 0:
# Dictionary:
# minute_of_day -> RollingWindow of cumulative volumes (21 days)
self.cumulative_volume_by_minute = {}
# Track today's cumulative volume
self.today_cumulative_volume = 0
# Reset flag at market open
self.volume_reset_today = False
else:
self.cumulative_volume_by_minute = None
# Warm up indicators
self.warmup_indicators()
# Check if indicators are ready
self.indicators_ready = all(ind.is_ready for ind in self.indicators) if self.indicators else True
#-------------------------------------------------------------------------------
def warmup_indicators(self):
"""Warm up indicators with historical data."""
# Determine warmup period needed
self.warming_up = True
warmup_days = 0
if self.use_10d_ema or self.vol_multiplier > 0:
warmup_days = 40 # 21 bars ~ 31 calendar days
else:
warmup_days = 10
# Get historical minute data
minute_bars = self.algo.history[TradeBar](
self.symbol_object,
DT.timedelta(days=warmup_days),
Resolution.MINUTE
)
# Loop through the minute bars and update the indicators
for bar in minute_bars:
# Pass directly to the minute bar event handler (no consolidating)
self.on_1min_bar(None, bar)
# Pass to the 5-minute and 1-day consolidators
if self.min5_consolidator:
self.min5_consolidator.update(bar)
if self.daily_consolidator:
self.daily_consolidator.update(bar)
self.warming_up = False
#-------------------------------------------------------------------------------
def on_daily_bar(self, sender, bar):
"""Handle consolidated daily bar data."""
if self.ema_10d:
self.ema_10d.update(bar.end_time, bar.close)
self.bar_window_daily.add(bar)
#-------------------------------------------------------------------------------
def on_5min_bar(self, sender, bar):
"""Handle consolidated 5-minute bar data."""
if self.ema_5min:
self.ema_5min.update(bar.end_time, bar.close)
self.bar_window_5min.add(bar)
#-------------------------------------------------------------------------------
def on_1min_bar(self, sender, bar):
"""Handle consolidated 1-minute bar data for volume tracking."""
if self.cumulative_volume_by_minute is None:
return
# Reset cumulative volume at market open (9:30 AM)
current_time = bar.end_time.time()
market_open = DT.time(9, 30)
if current_time == DT.time(9, 31) and not self.volume_reset_today:
self.today_cumulative_volume = 0
self.volume_reset_today = True
# Add this bar's volume to today's cumulative
self.today_cumulative_volume += bar.volume
# Get minutes since market open (9:30 AM = minute 0)
minutes_since_open = (current_time.hour - 9) * 60 + (current_time.minute - 30)
# Only track during market hours (9:30 AM to 4:00 PM)
if 0 <= minutes_since_open <= 390: # 390 minutes in trading day
# Create RollingWindow for this minute if it doesn't exist
if minutes_since_open not in self.cumulative_volume_by_minute:
self.cumulative_volume_by_minute[minutes_since_open] = RollingWindow[float](21)
# Update the rolling window with today's cumulative volume at this minute
self.cumulative_volume_by_minute[minutes_since_open].add(self.today_cumulative_volume)
# Reset flag at end of day
if current_time >= DT.time(16, 0):
self.volume_reset_today = False
elif not self.warming_up:
# Check for entry signals after volume is updated
self.check_and_execute_entry(current_time)
#-------------------------------------------------------------------------------
def check_and_execute_entry(self, current_time):
"""Check entry conditions and execute if met. Called from on_1min_bar.
Args:
current_time: The current bar end time
"""
# Skip if already in position or entry already triggered
if self.in_position or self.entry_triggered:
return
# Handle time-based entry
if self.use_time_trigger and self.entry_time is not None:
# Check if current time matches entry time
if current_time != self.entry_time:
return # Not the right time yet, skip all checks
# It's the right time - now check volume if required
if self.vol_multiplier > 0:
vol_okay, vol_msg = self.check_volume_condition()
if not vol_okay:
if PRINT_SIGNALS or self.algo.live_mode:
self.algo.my_log(
f"{self.ticker} time entry BLOCKED. {vol_msg}"
)
return
# Volume passed (or not required), execute entry
self.entry_triggered = True
if PRINT_SIGNALS or self.algo.live_mode:
vol_msg = vol_msg if self.vol_multiplier > 0 else ""
self.algo.my_log(
f"{self.ticker} ENTRY SIGNAL VALID: time trigger met at "
f"{current_time} {vol_msg}"
)
self.execute_entry()
return
# Handle price-based entry
if self.use_price_trigger:
# Get current price
current_price = self.algo.securities[self.symbol_object].price
if current_price == 0:
return
# Check volume condition if required
if self.vol_multiplier > 0:
vol_okay, vol_msg = self.check_volume_condition()
if not vol_okay:
return
# Check price trigger
if self.check_price_trigger(current_price):
self.entry_triggered = True
if PRINT_SIGNALS or self.algo.live_mode:
vol_msg = f'and {vol_msg}' if self.vol_multiplier > 0 else ""
self.algo.my_log(
f"{self.ticker} ENTRY SIGNAL VALID: price trigger met {vol_msg}"
)
# Execute entry
self.execute_entry()
#-------------------------------------------------------------------------------
def check_price_trigger(self, current_price):
"""Check if price trigger conditions are met."""
if self.entry_price is None:
return False
if self.direction == 1: # Long
# Price must be at or above entry price
# But not more than Within% above
max_price = self.entry_price * (1 + self.within_pct)
if self.entry_price <= current_price <= max_price:
if PRINT_SIGNALS or self.algo.live_mode:
self.algo.my_log(
f"{self.ticker} LONG price trigger: current={current_price:.2f}, "
f"entry={self.entry_price:.2f}, max={max_price:.2f} "
f"(within {self.within_pct:.2%})"
)
return True
else: # Short
# Price must be at or below entry price
# But not more than Within% below
min_price = self.entry_price * (1 - self.within_pct)
if min_price <= current_price <= self.entry_price:
if PRINT_SIGNALS or self.algo.live_mode:
self.algo.my_log(
f"{self.ticker} SHORT price trigger: current={current_price:.2f}, "
f"entry={self.entry_price:.2f}, min={min_price:.2f} "
f"(within {self.within_pct:.2%})"
)
return True
return False
#-------------------------------------------------------------------------------
def check_volume_condition(self):
"""Check if volume condition is met (cumulative volume vs 21-day average)."""
if self.vol_multiplier <= 0 or self.cumulative_volume_by_minute is None:
msg = "no volume filter used"
return True, msg # No volume filter
current_time = self.algo.time.time()
# Get minutes since market open (9:30 AM)
minutes_since_open = (current_time.hour - 9) * 60 + (current_time.minute - 30)
# Check if we have data for this minute
if minutes_since_open not in self.cumulative_volume_by_minute:
msg = "minutes_since_open not in cumulative_volume_by_minute"
return False, msg # No historical data yet
window = self.cumulative_volume_by_minute[minutes_since_open]
# Need at least some historical data
if not window.is_ready or window.count == 0:
msg = "window is not ready"
return False, msg
# Calculate average cumulative volume at this time over past 21 days
avg_cumulative_volume = sum(window) / window.count
# Calculate required volume and actual multiplier
required_volume = avg_cumulative_volume * self.vol_multiplier
if avg_cumulative_volume > 0:
actual_multiplier = self.today_cumulative_volume / avg_cumulative_volume
else:
actual_multiplier = 0
# Check if today's cumulative volume meets threshold
if self.today_cumulative_volume >= required_volume:
msg = (
f"{self.ticker} volume condition MET: "
f"current={self.today_cumulative_volume:,.0f}, "
f"21-day avg={avg_cumulative_volume:,.0f}, "
f"required={required_volume:,.0f} "
f"(multiplier: actual={actual_multiplier:.2f}x vs "
f"required={self.vol_multiplier:.2f}x)"
)
return True, msg
else:
msg = (
f"{self.ticker} volume condition NOT MET: "
f"current={self.today_cumulative_volume:,.0f}, "
f"21-day avg={avg_cumulative_volume:,.0f}, "
f"required={required_volume:,.0f} "
f"(multiplier: actual={actual_multiplier:.2f}x vs "
f"required={self.vol_multiplier:.2f}x)"
)
return False, msg
#-------------------------------------------------------------------------------
def execute_entry(self):
"""Execute entry order."""
if self.entry_order is not None:
return # Entry order already pending
# Calculate quantity based on direction
quantity = self.shares if self.direction == 1 else -self.shares
# Place market order
self.entry_order = self.algo.market_order(
self.symbol_object,
quantity,
tag=f"{self.trade_type} entry"
)
if PRINT_ORDERS or self.algo.live_mode:
self.algo.my_log(
f"{self.ticker} {self.trade_type} entry order placed for "
f"{quantity} shares"
)
#-------------------------------------------------------------------------------
def check_exit_conditions(self, data):
"""Check all exit conditions and execute if triggered."""
if not self.in_position or self.current_shares == 0:
return
if self.symbol_object not in data.bars:
return
bar = data.bars[self.symbol_object]
current_price = bar.close
# Update best price for trailing stop
self.update_best_price(current_price)
# Check hard stop loss first (most critical)
if self.stop_price is not None and self.sell_stop_pct > 0 and not self.stop_loss_pending:
if self.check_stop_loss(current_price):
if PRINT_SIGNALS or self.algo.live_mode:
self.algo.my_log(
f"{self.ticker} Hard stop loss triggered: current={current_price:.2f}, "
f"stop={self.stop_price:.2f}, cost_basis={self.cost_basis:.2f}"
)
self.execute_stop_loss_exit()
self.stop_loss_pending = True
return # Exit immediately - no other conditions matter
# Check profit targets in order
if not self.pt1_filled and not self.pt1_pending and self.pt1_pct > 0:
if self.check_profit_target(current_price, self.pt1_pct):
target_price = self.cost_basis * (1 + self.pt1_pct) if self.direction == 1 else self.cost_basis * (1 - self.pt1_pct)
if PRINT_SIGNALS or self.algo.live_mode:
self.algo.my_log(
f"{self.ticker} PT1 triggered: current={current_price:.2f}, "
f"target={target_price:.2f}, cost_basis={self.cost_basis:.2f}, "
f"profit={(current_price-self.cost_basis)/self.cost_basis:.2%}"
)
self.execute_profit_target_exit(1, self.pt1_sell_pct)
self.pt1_pending = True
if not self.pt2_filled and not self.pt2_pending and self.pt2_pct > 0:
if self.check_profit_target(current_price, self.pt2_pct):
target_price = self.cost_basis * (1 + self.pt2_pct) if self.direction == 1 else self.cost_basis * (1 - self.pt2_pct)
if PRINT_SIGNALS or self.algo.live_mode:
self.algo.my_log(
f"{self.ticker} PT2 triggered: current={current_price:.2f}, "
f"target={target_price:.2f}, cost_basis={self.cost_basis:.2f}, "
f"profit={(current_price-self.cost_basis)/self.cost_basis:.2%}"
)
self.execute_profit_target_exit(2, self.pt2_sell_pct)
self.pt2_pending = True
# Activate trailing stop after PT2
self.ts3_active = True
# Check trailing stop (only after PT2)
if self.ts3_active and not self.ts3_pending and self.ts3_pct > 0:
if self.check_trailing_stop(current_price):
if self.direction == 1:
trailing_price = self.highest_price * (1 - self.ts3_pct)
if PRINT_SIGNALS or self.algo.live_mode:
self.algo.my_log(
f"{self.ticker} TS3 trailing stop triggered: current={current_price:.2f}, "
f"trailing_price={trailing_price:.2f}, highest={self.highest_price:.2f}, "
f"trail_pct={self.ts3_pct:.2%}"
)
else:
trailing_price = self.lowest_price * (1 + self.ts3_pct)
if PRINT_SIGNALS or self.algo.live_mode:
self.algo.my_log(
f"{self.ticker} TS3 trailing stop triggered: current={current_price:.2f}, "
f"trailing_price={trailing_price:.2f}, lowest={self.lowest_price:.2f}, "
f"trail_pct={self.ts3_pct:.2%}"
)
self.execute_trailing_stop_exit()
self.ts3_pending = True
# Check 10D EMA exit
if not self.pt4_filled and not self.pt4_pending and self.use_10d_ema and self.ema_10d and self.ema_10d.is_ready:
ema_value = self.ema_10d.current.value
if self.check_ema_exit(current_price, ema_value):
if PRINT_SIGNALS or self.algo.live_mode:
direction_text = "below" if self.direction == 1 else "above"
self.algo.my_log(
f"{self.ticker} 10D EMA exit triggered: current={current_price:.2f} {direction_text} "
f"EMA={ema_value:.2f}"
)
self.execute_ema_exit(4, self.pt4_sell_pct, "10D EMA")
self.pt4_pending = True
# Check 5-min EMA exit
if not self.pt5_filled and not self.pt5_pending and self.use_5min_ema and self.ema_5min and self.ema_5min.is_ready:
ema_value = self.ema_5min.current.value
if self.check_ema_exit(current_price, ema_value):
if PRINT_SIGNALS or self.algo.live_mode:
direction_text = "below" if self.direction == 1 else "above"
self.algo.my_log(
f"{self.ticker} 5min EMA exit triggered: current={current_price:.2f} {direction_text} "
f"EMA={ema_value:.2f}"
)
self.execute_ema_exit(5, self.pt5_sell_pct, "5min EMA")
self.pt5_pending = True
#-------------------------------------------------------------------------------
def update_best_price(self, current_price):
"""Update the best price achieved for trailing stop."""
if self.direction == 1: # Long
if self.highest_price is None or current_price > self.highest_price:
self.highest_price = current_price
else: # Short
if self.lowest_price is None or current_price < self.lowest_price:
self.lowest_price = current_price
#-------------------------------------------------------------------------------
def check_profit_target(self, current_price, target_pct):
"""Check if profit target percentage is reached."""
if self.cost_basis is None:
return False
if self.direction == 1: # Long
target_price = self.cost_basis * (1 + target_pct)
return current_price >= target_price
else: # Short
target_price = self.cost_basis * (1 - target_pct)
return current_price <= target_price
#-------------------------------------------------------------------------------
def check_trailing_stop(self, current_price):
"""Check if trailing stop is triggered."""
if self.direction == 1: # Long
if self.highest_price is None:
return False
trailing_price = self.highest_price * (1 - self.ts3_pct)
return current_price <= trailing_price
else: # Short
if self.lowest_price is None:
return False
trailing_price = self.lowest_price * (1 + self.ts3_pct)
return current_price >= trailing_price
#-------------------------------------------------------------------------------
def check_ema_exit(self, current_price, ema_value):
"""Check if EMA exit condition is met."""
if self.direction == 1: # Long
return current_price < ema_value
else: # Short
return current_price > ema_value
#-------------------------------------------------------------------------------
def check_stop_loss(self, current_price):
"""Check if hard stop loss is triggered."""
if self.stop_price is None:
return False
if self.direction == 1: # Long
return current_price <= self.stop_price
else: # Short
return current_price >= self.stop_price
#-------------------------------------------------------------------------------
def execute_profit_target_exit(self, pt_number, sell_pct):
"""Execute profit target exit."""
shares_to_sell = int(self.current_shares * sell_pct)
if shares_to_sell == 0:
return
quantity = -shares_to_sell if self.direction == 1 else shares_to_sell
# Log BEFORE submitting order (variables may reset on fill)
if PRINT_ORDERS or self.algo.live_mode:
self.algo.my_log(
f"{self.ticker} PT{pt_number} exit: selling {abs(quantity)} shares "
f"({sell_pct:.0%} of position)"
)
self.algo.market_order(
self.symbol_object,
quantity,
tag=f"PT{pt_number} exit"
)
#-------------------------------------------------------------------------------
def execute_trailing_stop_exit(self):
"""Execute trailing stop exit."""
shares_to_sell = int(self.current_shares * self.ts3_sell_pct)
if shares_to_sell == 0:
return
quantity = -shares_to_sell if self.direction == 1 else shares_to_sell
# Log BEFORE submitting order (variables may reset on fill)
if PRINT_ORDERS or self.algo.live_mode:
self.algo.my_log(
f"{self.ticker} Trailing Stop TS3 triggered: selling {abs(quantity)} shares"
)
self.algo.market_order(
self.symbol_object,
quantity,
tag="TS3 trailing stop exit"
)
#-------------------------------------------------------------------------------
def execute_ema_exit(self, pt_number, sell_pct, ema_type):
"""Execute EMA-based exit."""
shares_to_sell = int(self.current_shares * sell_pct)
if shares_to_sell == 0:
return
quantity = -shares_to_sell if self.direction == 1 else shares_to_sell
# Log BEFORE submitting order (variables may reset on fill)
if PRINT_ORDERS or self.algo.live_mode:
self.algo.my_log(
f"{self.ticker} {ema_type} exit: selling {abs(quantity)} shares "
f"({sell_pct:.0%} of position)"
)
self.algo.market_order(
self.symbol_object,
quantity,
tag=f"PT{pt_number} {ema_type} exit"
)
#-------------------------------------------------------------------------------
def execute_stop_loss_exit(self):
"""Execute hard stop loss - exit entire position."""
if self.current_shares == 0:
return
# Exit 100% of position
quantity = -self.current_shares if self.direction == 1 else self.current_shares
# Log BEFORE submitting order (variables may reset on fill)
if PRINT_ORDERS or self.algo.live_mode:
self.algo.my_log(
f"{self.ticker} Hard stop loss triggered: selling {abs(quantity)} shares "
f"(100% of position) at stop price {self.stop_price:.2f}"
)
self.algo.market_order(
self.symbol_object,
quantity,
tag="Hard stop loss exit"
)
#-------------------------------------------------------------------------------
def on_order_event(self, order_event):
"""Handle order events."""
order = self.algo.transactions.get_order_by_id(order_event.order_id)
if order_event.status != OrderStatus.FILLED:
return
symbol = order.symbol
tag = order.tag
qty = int(order.quantity)
fill_price = order_event.fill_price
if PRINT_ORDERS or self.algo.live_mode:
self.algo.my_log(
f"{symbol.value} {tag}: {qty} shares filled @ {fill_price:.2f}"
)
# Handle entry order fill
if 'entry' in tag.lower():
self.entry_order = None
self.in_position = True
self.cost_basis = fill_price
self.current_shares = abs(qty)
self.highest_price = fill_price if self.direction == 1 else None
self.lowest_price = fill_price if self.direction == -1 else None
# Set stop price for manual tracking
if self.sell_stop_pct > 0:
if self.direction == 1: # Long
self.stop_price = round(self.cost_basis * (1 - self.sell_stop_pct), 2)
else: # Short
self.stop_price = round(self.cost_basis * (1 + self.sell_stop_pct), 2)
if PRINT_ORDERS or self.algo.live_mode:
self.algo.my_log(
f"{self.ticker} Stop loss set at {self.stop_price:.2f}"
)
# Handle exit order fill
elif 'exit' in tag.lower():
self.current_shares -= abs(qty)
# Clear pending flag and set filled flag based on exit type
if 'pt1' in tag.lower():
self.pt1_pending = False
self.pt1_filled = True
elif 'pt2' in tag.lower():
self.pt2_pending = False
self.pt2_filled = True
elif 'ts3' in tag.lower() or 'trailing' in tag.lower():
self.ts3_pending = False
# Don't set filled flag for TS3 - it can trigger again if price moves up then down
elif 'pt4' in tag.lower() or '10d ema' in tag.lower():
self.pt4_pending = False
self.pt4_filled = True
elif 'pt5' in tag.lower() or '5min ema' in tag.lower():
self.pt5_pending = False
self.pt5_filled = True
elif 'stop loss' in tag.lower():
self.stop_loss_pending = False
# Stop loss exits 100%, so position will be fully closed
# If fully exited, reset
if self.current_shares <= 0:
self.in_position = False
self.reset_trade_variables()
#-------------------------------------------------------------------------------
def dispose(self):
"""Clean up consolidators."""
if self.daily_consolidator:
self.daily_consolidator.data_consolidated -= self.on_daily_bar
self.algo.subscription_manager.remove_consolidator(
self.symbol_object, self.daily_consolidator
)
if self.min5_consolidator:
self.min5_consolidator.data_consolidated -= self.on_5min_bar
self.algo.subscription_manager.remove_consolidator(
self.symbol_object, self.min5_consolidator
)
if self.min1_consolidator:
self.min1_consolidator.data_consolidated -= self.on_1min_bar
self.algo.subscription_manager.remove_consolidator(
self.symbol_object, self.min1_consolidator
)
#-------------------------------------------------------------------------------
def adjust_indicators(self, adjustment, is_split=False):
"""Adjust all indicators for splits or dividends."""
# Handle daily bars
if len(list(self.bar_window_daily)) > 0 and self.ema_10d:
# Get a list of the current bars
bars_daily = list(self.bar_window_daily)
# Current order is newest to oldest (default for rolling window)
# Reverse the list to be oldest to newest
bars_daily.reverse()
# Reset all indicators
self.ema_10d.reset()
self.bar_window_daily.reset()
# Loop through the bars from oldest to newest
for bar in bars_daily:
# Adjust the bar by the adjustment factor
bar.open *= adjustment
bar.high *= adjustment
bar.low *= adjustment
bar.close *= adjustment
# Update volume on split adjustment
if is_split:
vol_adjustment = 1.0/adjustment
bar.volume *= vol_adjustment
# Use the bar to update the indicators
self.ema_10d.update(bar.end_time, bar.close)
self.bar_window_daily.add(bar)
# Handle 5min bars
if len(list(self.bar_window_5min)) > 0 and self.ema_5min:
# Get a list of the current bars
bars_5min = list(self.bar_window_5min)
# Current order is newest to oldest (default for rolling window)
# Reverse the list to be oldest to newest
bars_5min.reverse()
# Reset all indicators
self.ema_5min.reset()
self.bar_window_5min.reset()
# Loop through the bars from oldest to newest
for bar in bars_5min:
# Adjust the bar by the adjustment factor
bar.open *= adjustment
bar.high *= adjustment
bar.low *= adjustment
bar.close *= adjustment
# Update volume on split adjustment
if is_split:
vol_adjustment = 1.0/adjustment
bar.volume *= vol_adjustment
# Use the bar to update the indicators
self.ema_5min.update(bar.end_time, bar.close)
self.bar_window_5min.add(bar)
# Handle all of the volumes
if self.vol_multiplier > 0 and is_split:
vol_adjustment = 1.0/adjustment
# Loop through each minute (key) of dict
for minute in self.cumulative_volume_by_minute.keys():
# Get a list of the current volumes
volumes = list(self.cumulative_volume_by_minute[minute])
# Reverse the list to be oldest to newest
volumes.reverse()
# Reset all indicators
self.cumulative_volume_by_minute[minute].reset()
# Loop through the volumes from oldest to newest
for vol in volumes:
# Update volume on split adjustment
vol *= vol_adjustment
# Use the updated vol to update the indicator
self.cumulative_volume_by_minute[minute].add(vol)