| Overall Statistics |
|
Total Orders 355 Average Win 0.85% Average Loss -0.30% Compounding Annual Return 4.911% Drawdown 9.300% Expectancy 0.090 Start Equity 500000 End Equity 530977.78 Net Profit 6.196% Sharpe Ratio -0.182 Sortino Ratio -0.22 Probabilistic Sharpe Ratio 21.150% Loss Rate 71% Win Rate 29% Profit-Loss Ratio 2.82 Alpha -0.043 Beta 0.423 Annual Standard Deviation 0.093 Annual Variance 0.009 Information Ratio -0.759 Tracking Error 0.102 Treynor Ratio -0.04 Total Fees $604.79 Estimated Strategy Capacity $4800000.00 Lowest Capacity Asset MELI TV0AIPE7984L Portfolio Turnover 3.89% |
from AlgorithmImports import *
from collections import OrderedDict
from data_classes import (
SymbolData,
MarketHours,
)
from utils import (
get_update_filter,
)
def initialize_algo_data(algorithm):
    """Create the algorithm's per-symbol containers and subscribe every ticker.

    Side effects on ``algorithm``:

    * resets ``Data``, ``Counter``, ``SymbolMarketHours``,
      ``symbol_ticker_map``, ``ticker_symbol_map``, ``splits_info_dict``,
      ``ticker_data_dict`` and ``order_que`` to empty containers;
    * installs a security initializer that seeds each security's market
      price from ``GetLastKnownPrice``;
    * merges the selected ``backtest_setting`` preset into
      ``strategy_setting``;
    * registers every strategy ticker missing from
      ``general_setting["tickers"]`` as an equity;
    * subscribes each configured ticker at minute resolution and builds its
      ``SymbolData``, counters and ``MarketHours`` entry;
    * stores the symbol of ``algorithm.ref_ticker`` in ``algorithm.ref_symbol``.

    :param algorithm: the running algorithm instance; must already expose
        ``general_setting``, ``consolidator_settings``, ``strategy_setting``,
        ``backtest_setting`` and ``ref_ticker``.
    :raises ValueError: if a ticker is configured with a type other than
        ``equity``, ``forex``, ``index`` or ``cfd``.
    """
    algorithm.Data = {}
    algorithm.Counter = {}
    algorithm.SymbolMarketHours = {}
    algorithm.symbol_ticker_map = {}
    algorithm.ticker_symbol_map = {}
    algorithm.splits_info_dict = {}
    algorithm.ticker_data_dict = {}
    algorithm.order_que = []

    algorithm.SetSecurityInitializer(lambda x: x.SetMarketPrice(algorithm.GetLastKnownPrice(x)))

    # Update Strategy Settings: overlay the preset chosen by "backtest_function".
    algorithm.strategy_setting.update(
        algorithm.backtest_setting[algorithm.strategy_setting["backtest_function"]]
    )

    # Strategy tickers that have no explicit configuration default to equities.
    tickers = algorithm.general_setting["tickers"]
    for ticker in algorithm.strategy_setting['tickers']:
        tickers.setdefault(ticker, {"type": "equity"})

    for ticker, ticker_config in tickers.items():
        asset_type = ticker_config["type"]
        if asset_type == "equity":
            security = algorithm.AddEquity(
                ticker,
                Resolution.Minute,
                dataNormalizationMode=DataNormalizationMode.Raw,
            )
        elif asset_type == "forex":
            security = algorithm.AddForex(
                ticker,
                Resolution.Minute,
                Market.Oanda,
            )
        elif asset_type == "index":
            security = algorithm.AddIndex(ticker, Resolution.Minute)
        elif asset_type == "cfd":
            security = algorithm.AddCfd(
                ticker,
                Resolution.Minute,
                Market.Oanda,
            )
        else:
            # Previously an unknown type silently reused the symbol of the
            # preceding ticker (or failed with NameError on the first one).
            raise ValueError(
                f"Unsupported asset type {asset_type!r} configured for ticker {ticker!r}"
            )
        symbol = security.Symbol

        # The reference symbol drives scheduling and market-hours checks; it
        # is captured whatever its asset type is.
        if ticker == algorithm.ref_ticker:
            algorithm.ref_symbol = symbol

        algorithm.Data[symbol] = SymbolData(
            algorithm,
            symbol,
            ticker,
            algorithm.general_setting,
            algorithm.consolidator_settings,
        )
        algorithm.Counter[symbol] = {
            "counter": 0,
            "last_order_counter": 0,
            "last_log_counter": 0,
        }
        algorithm.SymbolMarketHours[symbol] = MarketHours(algorithm, symbol)
        algorithm.symbol_ticker_map[symbol] = ticker
        algorithm.ticker_symbol_map[ticker] = symbol
def initialize_algo_scheduler(algorithm):
    """Register the algorithm's two scheduled events.

    * ``algorithm.SaveData`` runs at month end, 30 minutes after the
      reference symbol's market close.
    * ``algorithm.UpdateAlgo`` runs every minute on each day selected by
      ``DateRules.every_day`` for the reference symbol.

    :param algorithm: algorithm instance exposing ``Schedule``, ``DateRules``,
        ``TimeRules``, ``ref_symbol``, ``SaveData`` and ``UpdateAlgo``.
    """
    reference = algorithm.ref_symbol

    # Monthly persistence hook.
    month_end = algorithm.DateRules.MonthEnd(reference)
    after_close = algorithm.TimeRules.AfterMarketClose(reference, 30)
    algorithm.Schedule.On(month_end, after_close, algorithm.SaveData)

    # Minute-by-minute driver of the whole update pipeline.
    each_day = algorithm.DateRules.every_day(reference)
    each_minute = algorithm.TimeRules.every(timedelta(minutes=1))
    algorithm.Schedule.On(each_day, each_minute, algorithm.UpdateAlgo)
def _load_history_bars(algorithm, symbol):
    """Return warm-up history for ``symbol`` resampled to the base timeframe.

    The result is an ``OrderedDict`` keyed by the resampled bar label
    (right edge of each bucket). Every row is flagged ``is_warmup=True`` and
    carries a ``datetime`` equal to the label minus ``base_minutes``.
    An empty ``OrderedDict`` is returned when the history request is empty.
    """
    settings = algorithm.general_setting
    history = algorithm.History(
        TradeBar, symbol, timedelta(settings['warmup_days']), Resolution.Minute
    )
    # An empty frame has no `symbol` level to select, so `.loc` would raise.
    if history is None or history.empty:
        return OrderedDict()

    rule = settings['base_timeframe'].replace("m", "Min")
    bars = history.loc[symbol].resample(rule, closed='right', label='right').agg(
        {
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last',
            'volume': 'sum',
        }
    )
    # Buckets without any minute bar (nights, weekends) come out as NaN rows.
    bars = bars.dropna()
    bars['is_warmup'] = True
    bars['datetime'] = bars.index - timedelta(minutes=settings['base_minutes'])
    return OrderedDict(bars.to_dict(orient='index'))


def _latest_bar_record(consolidator, include_volume):
    """Build a stored-bar dict from the newest entry (index 0) of a consolidator."""
    record = {
        'datetime': consolidator.time[0],
        'open': consolidator.open[0],
        'high': consolidator.high[0],
        'low': consolidator.low[0],
        'close': consolidator.close[0],
    }
    if include_volume:
        record['volume'] = consolidator.volume[0]
    record['is_warmup'] = False
    return record


def process_data(algorithm, ticker, symbolData):
    """Keep ``algorithm.ticker_data_dict[ticker]`` in sync with the base bars.

    Does nothing while the algorithm is warming up, when the current minute
    is not listed in ``general_setting['valid_minutes']``, or when the base
    consolidator has produced no bar yet.

    Otherwise:

    1. The stored bars are (re)loaded from history when the ticker has no
       stored data yet, or when the daily update filter fires on a date later
       than the one recorded for the ticker in ``splits_info_dict`` (the
       entry is then removed).
    2. Once more than ``base_minutes`` have passed since the reference
       symbol's current open, the consolidator's newest bar is stored under
       the key "current time floored to the minute, minus one minute" unless
       that key already exists.
    3. The oldest rows are dropped until at most ``max_10m_bars`` remain.

    :param algorithm: the running algorithm instance.
    :param ticker: ticker string; must be a key of ``ticker_symbol_map``.
    :param symbolData: object whose ``consolidators`` mapping holds the
        consolidator for ``general_setting['base_timeframe']``.
    :return: ``None``; the function works by side effect.
    """
    if algorithm.IsWarmingUp:
        return
    settings = algorithm.general_setting
    if algorithm.Time.minute not in settings['valid_minutes']:
        return

    symbol = algorithm.ticker_symbol_map[ticker]
    consolidator = symbolData.consolidators[settings['base_timeframe']]
    if len(consolidator.time) == 0:
        return

    ref_hours = algorithm.SymbolMarketHours[algorithm.ref_symbol]

    to_reset = ticker not in algorithm.ticker_data_dict
    if get_update_filter(algorithm.Time, "1D", ref_hours, settings):
        split_date = algorithm.splits_info_dict.get(ticker)
        # Reload only on a day strictly after the recorded split warning.
        if split_date is not None and algorithm.Time.date() > split_date:
            to_reset = True
            del algorithm.splits_info_dict[ticker]

    if to_reset:
        algorithm.ticker_data_dict[ticker] = _load_history_bars(algorithm, symbol)
    bars = algorithm.ticker_data_dict[ticker]

    first_bar_done = ref_hours.CurrentOpen + timedelta(minutes=settings['base_minutes'])
    if algorithm.Time > first_bar_done:
        data_timestamp = pd.Timestamp(
            algorithm.Time.replace(second=0, microsecond=0) - timedelta(minutes=1)
        )
        if data_timestamp not in bars:
            has_volume = settings["tickers"][ticker]["type"] in ["equity"]
            bars[data_timestamp] = _latest_bar_record(consolidator, has_volume)

    # A history reload can exceed the cap by more than one row, so trim in a
    # loop rather than dropping a single row.
    while len(bars) > settings['max_10m_bars']:
        bars.popitem(last=False)
from AlgorithmImports import *
## General Settings
# Module-level configuration shared by the data and strategy layers.
general_setting = {
    # Instruments to subscribe, keyed by ticker. "type" selects the
    # subscription call in initialize_algo_data: "equity", "forex", "index"
    # or "cfd". Strategy tickers missing here are added there as equities.
    "tickers": {
        "SPY": {"type": "equity"},
    },
    # Ticker whose symbol is used for scheduling and market-hours checks.
    "ref_ticker": "SPY",
    # Timeframes for which each SymbolData builds a DataConsolidator; every
    # entry needs a matching key in consolidator_settings.
    "consolidator_timeframes": ["10m"],
    # Timeframe of the bars stored by process_data, and its length in minutes.
    "base_timeframe": "10m",
    "base_minutes": 10,
    # process_data only runs when the current clock minute is in this list.
    "valid_minutes": [1, 11, 21, 31, 41, 51],
    # NOTE(review): not referenced in the code visible here — presumably a
    # minimum counter gap between orders; confirm in the strategy module.
    "order_counter_diff": 15,
    # Length, in days, of the history request made when stored bars are reloaded.
    "warmup_days": 365,
    # NOTE(review): not referenced in the code visible here — presumably the
    # bar window used for support/resistance computation; confirm.
    "max_bars_to_compute_sr": 250,
    # Upper bound on the number of bars process_data keeps per ticker.
    "max_10m_bars": 10000,
}
## Consolidator Settings
# One entry per timeframe listed in general_setting["consolidator_timeframes"].
consolidator_settings = {
    "10m": {
        # Bar length in minutes; DataConsolidator accepts 5, 10, 15, 30 or 60.
        "timeframe_minutes": 10,
        # Indicator names to attach to the consolidator (none at present).
        "indicators": [],
        # Number of most recent bars kept; also the count required before the
        # consolidator reports IsReady.
        "window": 100,
    },
}
## Allowed parameters
# ALLOWED_BACKTEST_FUNCTIONS = ['one_timeframe_original_strategy',
# 'original',
# 'original_with_daily_or_4h_confirmation_on_entry',
# 'original_with_upper_lower_SR_exits',
# 'original_with_upper_lower_SR_entries',
# 'original_with_SRPoint_exits_also',
# 'original_but_do_not_enter_if_4H_or_daily_SR_near',
# 'original_but_enter_if_4H_or_daily_SR_broken_and_hold_if_not']
# ALLOWED_TIMEFRAMES = ['1D', '4H', '2H', '1H', '30m', '10m']
## Strategy Settings
# Base strategy parameters. initialize_algo_data overlays the preset named by
# "backtest_function" (a key of backtest_setting) on top of this dict.
strategy_setting = {
    "backtest_function": "original",
    ## Other settings
    # NOTE(review): "sr_lookback" is not referenced in the code visible here.
    "sr_lookback": 5,
    # Stop distance as a percentage of price (SignalTracker divides by 100).
    "stoploss_pct": 15,
    # Percentage distance threshold used by SignalTracker's secondary-level
    # and break-entry checks.
    "sr_level_threshold_pct": 6,
    # Take-profit distance expressed as a multiple of the stop distance.
    "takeprofit_multiplier": 3,
    # Alternative universes kept for quick switching; exactly one "tickers"
    # entry should be active.
    #High Beta
    #"tickers": ['TSLA', 'BE', 'CVNA', 'ARM'],
    #"tickers": ['ACHR','AFRM','APA','APLD','AR','ARM','BE','CAVA','CLSK','COIN','CORZ','CVNA','DJT','HUT','IREN','MARA','MSTR','MTDR','PR','QS','QUBT','RIOT','SM','W'],
    #Moderate-High Beta
    "tickers": ['AMAT','AMD','APO','APP','ARM','ASML','BA','BN','BX','DASH','HCA','KKR','LRCX','MELI','NVDA','PH','PLTR','RCRUY','SHOP','SPOT','TSLA','BE'],
    #Low Beta
    #"tickers": ['AAPL','AMZN','ANET','AVGO','BABA','BAC','BMY','C','CMCSA','CSCO','DIS','GILD','GOOG','GOOGL','INTC','JPM','KO','META','MRK','MSFT','MU','NEE','NKE','ORCL','PDD','PFE','QQQ','SBUX','SCHW','SPY','T','TSM','UBER','VZ','WFC','WMT','XOM'],
    #"tickers": ['AMD', 'APP', 'ARM','LRCX', 'NVDA', 'PLTR', 'SHOP', 'SPOT'],
    # Fraction of total portfolio value allocated to each new position.
    "risk_pct": 0.05,
    # Direction switches read by SignalTracker.check_entry.
    "enter_long_trades": True,
    "enter_short_trades": True,
}
def _backtest_preset(timeframe_1, timeframe_2, timeframe_3, **overrides):
    """Build one backtest preset from the shared defaults.

    The three timeframes are mandatory; every other key starts from the
    value shared by most presets and can be replaced through ``overrides``.
    Key order is the same for every preset.

    :raises KeyError: if an override names a key that is not a preset option
        (guards against typos silently creating new options).
    """
    preset = {
        "one_timeframe_strategy": False,
        "timeframe_1": timeframe_1,
        "timeframe_2": timeframe_2,
        "timeframe_3": timeframe_3,
        "use_stop_loss": True,
        "use_take_profit": False,
        "use_trailing_stop": True,
        "use_multiple_higher_timeframe_for_entry": False,
        "entry_timeframe_ref": 'timeframe_3',
        "entry_style": 'point',
        "use_entry_distance_filter": False,
        "use_break_for_entry": False,
        "trailing_stoploss_timeframe_ref": 'timeframe_3',
        "early_exit_timeframe_ref": 'timeframe_2',
        "early_exit_support_ref": 'support_1',
        "early_exit_resistance_ref": 'resistance_1',
        "use_point_exit": False,
        "point_exit_timeframe_ref": 'timeframe_1',
        "use_multi_support_early_exit": False,
        "early_exit_price_ref_2": 'timeframe_3',
        "early_exit_timeframe_ref_2": 'timeframe_1',
        "early_exit_support_ref_2": 'support_1',
        "early_exit_resistance_ref_2": 'resistance_1',
    }
    unknown = set(overrides) - set(preset)
    if unknown:
        raise KeyError(f"Unknown backtest option(s): {sorted(unknown)}")
    preset.update(overrides)
    return preset


# Presets selectable through strategy_setting["backtest_function"]. Each entry
# lists only what differs from the defaults in _backtest_preset.
backtest_setting = {
    # Single-timeframe variant: every reference points at timeframe_1.
    # NOTE(review): "one_timeframe_strategy" is False here as well, exactly
    # as in the original table — confirm that this is intended.
    "one_timeframe_original_strategy": _backtest_preset(
        '4H', '', '',
        entry_timeframe_ref='timeframe_1',
        trailing_stoploss_timeframe_ref='timeframe_1',
        early_exit_timeframe_ref='timeframe_1',
    ),
    "original": _backtest_preset('1D', '4H', '2H'),
    "original_with_daily_or_4h_confirmation_on_entry": _backtest_preset(
        '1D', '4H', '2H',
        use_multiple_higher_timeframe_for_entry=True,
    ),
    "original_with_upper_lower_SR_exits": _backtest_preset(
        '4H', '2H', '1H',
        early_exit_support_ref='upper_support_1',
        early_exit_resistance_ref='lower_resistance_1',
    ),
    "original_with_upper_lower_SR_entries": _backtest_preset(
        '4H', '2H', '1H',
        entry_style='upper_lower_sr',
    ),
    "original_with_SRPoint_exits_also": _backtest_preset(
        '4H', '2H', '1H',
        use_point_exit=True,
    ),
    "original_but_do_not_enter_if_4H_or_daily_SR_near": _backtest_preset(
        '4H', '2H', '1H',
        use_entry_distance_filter=True,
    ),
    "original_but_enter_if_4H_or_daily_SR_broken_and_hold_if_not": _backtest_preset(
        '2H', '1H', '30m',
        use_entry_distance_filter=True,
        use_break_for_entry=True,
        use_multi_support_early_exit=True,
    ),
}
from AlgorithmImports import *
import numpy as np
from datetime import datetime, timedelta
from collections import deque
class SymbolData:
    """Per-symbol container holding one ``DataConsolidator`` per timeframe.

    The consolidator flavour follows the ticker's configured type:
    ``forex``/``cfd`` use quote bars, ``equity``/``index`` use trade bars.
    """

    def __init__(
        self,
        algorithm,
        symbol,
        ticker,
        general_setting,
        consolidator_settings,
    ):
        """
        :param algorithm: the running algorithm instance.
        :param symbol: symbol object of the subscribed security.
        :param ticker: ticker string; key into ``general_setting["tickers"]``.
        :param general_setting: shared settings dict (``tickers`` and
            ``consolidator_timeframes`` are read here).
        :param consolidator_settings: mapping of timeframe name to the
            settings dict handed to ``DataConsolidator``.
        :raises ValueError: if the ticker's type is not one of ``forex``,
            ``cfd``, ``equity`` or ``index``.
        """
        self.symbol = symbol
        self.ticker = ticker
        self.algorithm = algorithm
        self.general_setting = general_setting
        self.consolidator_settings = consolidator_settings

        ticker_type = general_setting["tickers"][self.ticker]["type"]
        if ticker_type in ["forex", "cfd"]:
            self.consolidator_type = "quote"
        elif ticker_type in ["equity", "index"]:
            self.consolidator_type = "trade"
        else:
            # Without this, the attribute stayed unset and the failure only
            # surfaced later as an AttributeError.
            raise ValueError(
                f"Unsupported asset type {ticker_type!r} configured for ticker {ticker!r}"
            )

        self.consolidators = {}
        for timeframe in self.general_setting["consolidator_timeframes"]:
            self.consolidators[timeframe] = DataConsolidator(
                algorithm,
                symbol,
                ticker,
                general_setting,
                consolidator_settings[timeframe],
                self.consolidator_type,
            )

    @property
    def IsReady(self):
        """True once every configured consolidator reports ``IsReady``."""
        return all(
            self.consolidators[_t].IsReady
            for _t in self.general_setting["consolidator_timeframes"]
        )
class DataConsolidator:
    """Rolling window of consolidated bars for one symbol and one timeframe.

    Bars are pushed with ``appendleft``, so index 0 of ``time``, ``open``,
    ``high``, ``low``, ``close`` (and ``volume`` for equities) is always the
    most recent bar. Each window keeps at most ``window`` entries.
    """

    # Bar lengths, in minutes, for which a consolidator is created.
    SUPPORTED_TIMEFRAME_MINUTES = (5, 10, 15, 30, 60)

    def __init__(
        self,
        algorithm,
        symbol,
        ticker,
        general_setting,
        consolidator_setting,
        consolidator_type,
    ):
        """
        :param algorithm: the running algorithm instance; its
            ``SubscriptionManager`` receives the new consolidator.
        :param symbol: symbol object of the subscribed security.
        :param ticker: ticker string; key into ``general_setting["tickers"]``.
        :param general_setting: shared settings dict.
        :param consolidator_setting: dict with ``window``,
            ``timeframe_minutes`` and ``indicators``.
        :param consolidator_type: ``"quote"`` or ``"trade"``.
        :raises ValueError: on an unknown ``consolidator_type`` or a
            ``timeframe_minutes`` outside ``SUPPORTED_TIMEFRAME_MINUTES``.
        """
        self.symbol = symbol
        self.ticker = ticker
        self.algorithm = algorithm
        self.general_setting = general_setting
        self.consolidator_setting = consolidator_setting
        self.consolidator_type = consolidator_type
        self.ticker_type = self.general_setting["tickers"][self.ticker]["type"]
        self.window_length = int(self.consolidator_setting["window"])

        self.time = deque(maxlen=self.window_length)
        self.open = deque(maxlen=self.window_length)
        self.high = deque(maxlen=self.window_length)
        self.low = deque(maxlen=self.window_length)
        self.close = deque(maxlen=self.window_length)
        # Volume is tracked for equities only.
        if self.ticker_type in ["equity"]:
            self.volume = deque(maxlen=self.window_length)
        self.BarCount = 0
        # Was never initialised, so any configured indicator failed on the
        # first assignment below.
        self.indicators = {}

        # Validate up front: an unsupported combination used to leave
        # `self.Con` unset and crash with an unrelated AttributeError.
        if self.consolidator_type not in ("quote", "trade"):
            raise ValueError(
                f"Unsupported consolidator type {self.consolidator_type!r} for ticker {ticker!r}"
            )
        minutes = self.consolidator_setting["timeframe_minutes"]
        if minutes not in self.SUPPORTED_TIMEFRAME_MINUTES:
            raise ValueError(
                f"Unsupported timeframe_minutes {minutes!r} for ticker {ticker!r}; "
                f"expected one of {self.SUPPORTED_TIMEFRAME_MINUTES}"
            )

        period = TimeSpan.FromMinutes(minutes)
        if self.consolidator_type == "quote":
            self.Con = QuoteBarConsolidator(period)
        else:
            self.Con = TradeBarConsolidator(period)
        self.Con.DataConsolidated += self.ConHandler
        algorithm.SubscriptionManager.AddConsolidator(symbol, self.Con)

        # NOTE(review): `get_indicator` is not defined on this class in the
        # code visible here; a non-empty "indicators" list will fail unless
        # it is provided elsewhere.
        for _indicator in self.consolidator_setting["indicators"]:
            self.indicators[_indicator] = self.get_indicator(symbol, _indicator)

    def ConHandler(self, sender, bar):
        """Store a newly consolidated bar at the front of every window."""
        self.BarCount += 1
        self.time.appendleft(bar.Time)
        self.open.appendleft(bar.Open)
        self.high.appendleft(bar.High)
        self.low.appendleft(bar.Low)
        self.close.appendleft(bar.Close)
        if self.ticker_type in ["equity"]:
            self.volume.appendleft(bar.Volume)

    @property
    def IsReady(self):
        """True once the close window holds ``window_length`` bars."""
        return len(self.close) == self.window_length
class MarketHours:
    """Regular-session open/close times for one symbol.

    Attributes, all refreshed by :meth:`Update` and truncated to the minute:

    * ``CurrentOpen`` - next regular market open at or after midnight of the
      algorithm's current day;
    * ``CurrentClose`` - next regular market close after ``CurrentOpen``;
    * ``NextOpen`` - next regular market open after ``CurrentClose``;
    * ``isopen`` - whether the regular session is open at the algorithm's
      current time.
    """

    def __init__(self, algorithm, symbol):
        """
        :param algorithm: algorithm instance exposing ``Securities`` and ``Time``.
        :param symbol: symbol whose exchange hours are tracked.
        """
        self.symbol = symbol
        self.hours = algorithm.Securities[self.symbol].Exchange.Hours
        # Construction and refresh share one code path.
        self.Update(algorithm)

    @staticmethod
    def _on_the_minute(moment):
        """Drop seconds and microseconds from ``moment``."""
        return moment.replace(second=0, microsecond=0)

    def Update(self, algorithm):
        """Recompute all session attributes from ``algorithm.Time``."""
        Time_000_Day = algorithm.Time.replace(hour=0, minute=0, second=0, microsecond=0)
        self.CurrentOpen = self._on_the_minute(
            self.hours.GetNextMarketOpen(Time_000_Day, extendedMarketHours=False)
        )
        self.CurrentClose = self._on_the_minute(
            self.hours.GetNextMarketClose(self.CurrentOpen, extendedMarketHours=False)
        )
        self.NextOpen = self._on_the_minute(
            self.hours.GetNextMarketOpen(self.CurrentClose, extendedMarketHours=False)
        )
        self.isopen = self.hours.IsOpen(algorithm.Time, extendedMarketHours=False)
from AlgorithmImports import *
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from collections import deque
import pickle
from config import (
general_setting,
consolidator_settings,
strategy_setting,
backtest_setting,
)
from algo_functions import (
initialize_algo_data,
initialize_algo_scheduler,
process_data,
)
from strategy_functions import (
initialize_strategy,
update_strategy,
update_signal_tracker,
)
from utils import (
get_update_filter,
get_lowest_timeframe,
)
class AlgoSystem(QCAlgorithm):
    """Algorithm entry point: wires settings, data, scheduling and orders.

    ``Initialize`` configures the backtest and delegates set-up to the helper
    modules; ``UpdateAlgo`` is the scheduled per-minute driver; ``OnData``
    records prices, counters and split warnings and flushes queued orders.
    """

    def Initialize(self):
        """Configure the backtest and build all algorithm state."""
        # self.SetStartDate(2010, 1, 1)
        # self.SetStartDate(2020, 1, 1)
        # self.SetStartDate(2023, 1, 1)
        self.SetStartDate(2024, 1, 1)
        self.SetTimeZone(TimeZones.NewYork)
        self.SetCash(500000)
        self.SetWarmUp(int(3 * 20 * 6.5 * 60), Resolution.Minute)
        #security.set_leverage(3) # Use 3x leverage
        self.SetBrokerageModel(BrokerageName.CHARLES_SCHWAB, AccountType.Margin)

        # Settings Initialization
        self.general_setting = general_setting
        self.consolidator_settings = consolidator_settings
        self.strategy_setting = strategy_setting
        self.backtest_setting = backtest_setting
        self.ref_ticker = general_setting['ref_ticker']
        self.ref_symbol = None

        # Algo Initialization
        initialize_algo_data(self)
        initialize_algo_scheduler(self)
        initialize_strategy(self)

    def SaveData(self):
        """Scheduled month-end hook; persistence is currently disabled."""
        pass
        # self.ObjectStore.SaveBytes(
        #     f"TEST_DATA_4",
        #     pickle.dumps(self.ticker_data_dict)
        # )

    def UpdateAlgo(self):
        """Scheduled driver: refresh stored bars and run the strategy.

        Work is limited to the window from just after the reference symbol's
        current open up to one minute past its current close.
        """
        ref_hours = self.SymbolMarketHours[self.ref_symbol]
        ref_hours.Update(self)

        ## Only Process Data during Core Trading hours
        # if self.Time.time() > datetime(2000, 1, 1, 16, 1).time():
        #     return
        # if self.Time.time() <= datetime(2000, 1, 1, 9, 30).time():
        #     return
        minute_now = self.Time.replace(second=0, microsecond=0)
        past_session = minute_now > (ref_hours.CurrentClose + timedelta(minutes=1))
        if past_session:
            return
        before_session = minute_now <= ref_hours.CurrentOpen
        if before_session:
            return

        for symbol, symbolData in self.Data.items():
            if symbolData.IsReady:
                ticker = self.symbol_ticker_map[symbol]
                process_data(self, ticker, symbolData)
                lowest_timeframe = get_lowest_timeframe(self.strategy_setting)
                if get_update_filter(self.Time, lowest_timeframe, ref_hours, self.general_setting):
                    update_strategy(self, ticker)

        # Signal tracking runs strictly before the close.
        if minute_now < ref_hours.CurrentClose:
            update_signal_tracker(self)

    def OnData(self, data):
        """Record prices, counters and split warnings, then send queued orders."""
        for symbol, symbolData in self.Data.items():
            has_data = data.ContainsKey(symbol) and data[symbol] is not None
            if not (has_data and symbolData.IsReady):
                continue

            ticker = self.symbol_ticker_map[symbol]
            # NOTE(review): `price_dict` is not created in this class —
            # presumably set up by initialize_strategy; confirm.
            self.price_dict[ticker] = data[symbol].Price
            self.Counter[symbol]["counter"] += 1

            if not data.Splits.ContainsKey(symbol):
                continue
            split = data.Splits[symbol]
            if split.Type != SplitType.Warning:
                continue
            # self.Debug(f"At {self.Time}, symbol is going to split in the text trade day by a split factor of {split.SplitFactor}")
            # Remember the first warning date only; process_data reloads
            # history on a later day.
            if ticker not in self.splits_info_dict:
                self.splits_info_dict[ticker] = self.Time.date()

        if self.order_que:
            for order_dict in self.order_que:
                self.MarketOrder(order_dict['symbol'], order_dict['quantity'])
            self.order_que = []
from AlgorithmImports import *
from collections import deque
import math
import numpy as np
import scipy
from scipy.stats import norm
import talib as ta
import statsmodels.api as sm
class QuantityTracker:
    """Running position quantity for a single ticker/symbol pair."""

    def __init__(self, ticker, symbol):
        """Start tracking ``ticker``/``symbol`` with a quantity of zero."""
        self.ticker, self.symbol = ticker, symbol
        self.quantity = 0
class SignalTracker:
    """Trade state machine for one symbol: entry, stops, exits, break entries.

    The tracker does not place orders. It records the intended position
    (``direction``, ``quantity``, stop/target levels) and answers whether to
    enter or exit at a given price.
    """

    def __init__(
        self,
        algorithm,
        symbol,
        ticker,
        strategy_setting,
    ):
        """
        :param algorithm: algorithm instance; ``Time`` and
            ``Portfolio.TotalPortfolioValue`` are read on entry.
        :param symbol: symbol object of the traded security.
        :param ticker: ticker string of the traded security.
        :param strategy_setting: strategy settings dict (merged preset).
        """
        self.algorithm = algorithm
        self.symbol = symbol
        self.ticker = ticker
        self.strategy_setting = strategy_setting
        self.enter_long_trades = self.strategy_setting["enter_long_trades"]
        self.enter_short_trades = self.strategy_setting["enter_short_trades"]
        self.reset()
        # Survive reset(): they describe the most recent entry, open or not.
        self.last_entry_date = None
        self.last_entry_datetime = None

    def reset(self):
        """Clear all per-trade and pending-break state."""
        self.in_trade = False
        self.direction = 0
        self.quantity = 0
        self.entryDatetime = None
        self.stop_price = None
        self.entry_price = None
        self.stop_size = None
        self.target_price = None
        self.target_size = None
        self.target_range = None
        self.trailing_price = None
        self.min_price = None
        self.max_price = None
        self.exit_type = None
        self.start_trailing = False
        self.break_direction = 0
        self.price_to_break = None

    def check_entry(self, signal_direction, price):
        """Open a tracked position if the signal and settings allow it.

        :param signal_direction: positive for long, negative for short,
            zero for no signal.
        :param price: entry price.
        :return: True when a position with non-zero quantity was opened.
            False when already in a trade, the signal is zero or its
            direction is disabled, the price is missing or not positive, or
            the computed quantity is zero (state is reset in that last case).
        """
        if self.in_trade:
            return False
        if signal_direction == 0:
            return False

        if self.enter_long_trades and (signal_direction > 0):
            direction = 1
        elif self.enter_short_trades and (signal_direction < 0):
            direction = -1
        else:
            return False

        # A missing or non-positive price cannot be sized (division by price).
        if price is None or price <= 0:
            return False

        quantity = int(math.floor(
            self.algorithm.Portfolio.TotalPortfolioValue / price * self.strategy_setting['risk_pct']
        )) * direction
        if quantity == 0:
            self.reset()
            return False

        stop_fraction = self.strategy_setting['stoploss_pct'] / 100
        self.in_trade = True
        self.direction = direction
        self.entryDatetime = self.algorithm.Time
        # Long stop sits below the entry, short stop above it.
        self.stop_price = price * (1 - direction * stop_fraction)
        self.entry_price = price
        self.stop_size = direction * (self.entry_price - self.stop_price)
        self.target_price = self.entry_price + direction * (
            self.strategy_setting['takeprofit_multiplier'] * self.stop_size
        )
        self.trailing_price = self.entry_price
        self.target_size = direction * (self.target_price - self.entry_price)
        self.target_range = self.target_size / self.entry_price
        self.min_price = self.entry_price
        self.max_price = self.entry_price
        self.quantity = quantity
        self.last_entry_date = self.algorithm.Time.date()
        self.last_entry_datetime = self.algorithm.Time
        return True

    def check_exit(self, price):
        """Update the price extremes and test the stop-loss / take-profit.

        :return: True (with ``exit_type`` set to ``'StopLoss'`` or
            ``'TakeProfit'``) when an enabled level is hit; False otherwise
            or when not in a trade.
        """
        if not self.in_trade:
            return False

        if price > self.max_price:
            self.max_price = price
        if price < self.min_price:
            self.min_price = price

        use_stop = self.strategy_setting["use_stop_loss"]
        use_target = self.strategy_setting["use_take_profit"]
        if self.direction > 0:
            stop_hit = price <= self.stop_price
            target_hit = price >= self.target_price
        elif self.direction < 0:
            stop_hit = price >= self.stop_price
            target_hit = price <= self.target_price
        else:
            return False

        # The stop is evaluated first and takes precedence.
        if use_stop and stop_hit:
            self.exit_type = 'StopLoss'
            return True
        if use_target and target_hit:
            self.exit_type = 'TakeProfit'
            return True
        return False

    def update_stoploss(self, high_price, low_price):
        """Ratchet the trailing stop from the latest high (long) or low (short).

        The stop only moves in the trade's favour. Does nothing when
        trailing stops are disabled or no trade is open.
        """
        if not self.strategy_setting["use_trailing_stop"]:
            return
        if not self.in_trade:
            return

        stop_fraction = self.strategy_setting['stoploss_pct'] / 100
        if self.direction > 0:
            new_stop_price = high_price * (1 - stop_fraction)
            if new_stop_price > self.stop_price:
                self.stop_price = new_stop_price
        elif self.direction < 0:
            new_stop_price = low_price * (1 + stop_fraction)
            if new_stop_price < self.stop_price:
                self.stop_price = new_stop_price

    def _secondary_level_holds(self, price_2, level_2):
        """Tell whether the secondary level is close enough to keep the trade.

        The distance from ``price_2`` to ``level_2`` is measured in percent of
        ``price_2``, in the direction that protects the open trade; the level
        "holds" when that distance is in ``[0, sr_level_threshold_pct)``.
        Missing or zero inputs cannot be measured and count as not holding.
        """
        if price_2 is None or level_2 is None or price_2 == 0:
            return False
        if self.direction > 0:
            gap = price_2 - level_2
        else:
            gap = level_2 - price_2
        distance_pct = 100 * gap / price_2
        return 0 <= distance_pct < self.strategy_setting['sr_level_threshold_pct']

    def check_early_exit(
        self,
        price, support_price, resistance_price,
        support_point=0,
        resist_point=0,
        price_2=None,
        support_price_2=None,
        resistance_price_2=None,
    ):
        """Test the support/resistance based early exits.

        Long trades exit with ``'Support_Hit'`` when ``price`` falls below a
        positive ``support_price``; short trades exit with
        ``'Resistance_Hit'`` when ``price`` rises above a positive
        ``resistance_price``. With ``use_multi_support_early_exit`` enabled
        the exit is skipped while the secondary level is still within the
        configured threshold. With it disabled, and the primary level not
        broken, a positive opposing point (``resist_point`` for longs,
        ``support_point`` for shorts) exits with ``'Resist_Point'`` /
        ``'Support_Point'``.

        :return: True when an exit is triggered (``exit_type`` is set).
        """
        if not self.in_trade:
            return False

        multi = self.strategy_setting['use_multi_support_early_exit']
        if self.direction > 0:
            level_broken = (support_price > 0) and (price < support_price)
            level_2, broken_label = support_price_2, 'Support_Hit'
            opposing_point, point_label = resist_point, 'Resist_Point'
        elif self.direction < 0:
            level_broken = (resistance_price > 0) and (price > resistance_price)
            level_2, broken_label = resistance_price_2, 'Resistance_Hit'
            opposing_point, point_label = support_point, 'Support_Point'
        else:
            return False

        if level_broken:
            if multi and self._secondary_level_holds(price_2, level_2):
                return False
            self.exit_type = broken_label
            return True
        # Point exits are only considered in single-level mode.
        if not multi and opposing_point > 0:
            self.exit_type = point_label
            return True
        return False

    def update_break_direction(
        self,
        break_direction, price_to_break,
    ):
        """Arm a pending break entry (ignored in a trade or when disabled)."""
        if self.in_trade:
            return
        if not self.strategy_setting['use_break_for_entry']:
            return
        if break_direction == 0:
            return
        self.break_direction = break_direction
        self.price_to_break = price_to_break

    def get_break_signal_direction(self, price):
        """Resolve a pending break entry against the current price.

        :return: ``1`` / ``-1`` when price has crossed ``price_to_break`` in
            the armed direction, else ``0``. The pending break is discarded
            once price is at least ``sr_level_threshold_pct`` percent away
            from the level without having crossed it.
        """
        break_signal_direction = 0
        if self.in_trade:
            return break_signal_direction
        if not self.strategy_setting['use_break_for_entry']:
            return break_signal_direction
        if self.break_direction == 0:
            return break_signal_direction

        threshold = self.strategy_setting['sr_level_threshold_pct']
        if self.break_direction > 0:
            price_distance = 100 * (self.price_to_break - price) / price
            if price > self.price_to_break:
                break_signal_direction = 1
            elif price_distance < threshold:
                pass  # still close enough: keep waiting for the break
            else:
                self.break_direction = 0
                self.price_to_break = None
        elif self.break_direction < 0:
            price_distance = 100 * (price - self.price_to_break) / price
            if price < self.price_to_break:
                break_signal_direction = -1
            elif price_distance < threshold:
                pass  # still close enough: keep waiting for the break
            else:
                self.break_direction = 0
                self.price_to_break = None
        return break_signal_direction
from AlgorithmImports import *
import numpy as np
# Constants
# NOTE(review): none of these constants is referenced in the part of the file
# visible here; the groupings below are inferred from the names and values and
# should be confirmed against the VSA / support-resistance code that uses them.

# Look-back lengths (presumably expressed in bars).
STABILITY_PERIOD = 36
LOOK_BACK_PERIOD = 5
PIVOT_LOOK_BACK = 3
PCT_DIFFERENCE = 0
ShortTermPeriod = 5
MediumTermPeriod = 21
LongTermPeriod = 89
# Fractions in [0, 1] — presumably the position of the close within the bar's range.
VeryLowCloseLevel = 0.10
DownCloseLevel = 0.33
UpCloseLevel = 0.67
VeryHighCloseLevel = 0.90
# These values equal the standard-normal quantiles for 0.75, 0.90, 0.95 and
# 0.99 — presumably z-score thresholds for classifying deviations.
NormalDev = 0.67449
AboveNormalDev = 1.28155
LargeDev = 1.64485
ExtremeDev = 2.32635
SignalPosition = 0.35
# Presumably the minimum price increment of the instrument.
TickSize = 0.01
def get_VSA_data(data):
    """
    Derive the VSA variables from the price data and hand them back
    together with the extracted price series.
    :param data: the input price data
    :return: a 12-tuple ``(open, close, high, low, volume, date_time,
        clear_weakness_confirmed, clear_weakness, weakness,
        clear_strength_confirmed, clear_strength, strength)``
    """
    (open_px, close_px, high_px, low_px,
     volume_series, timestamps) = extract_prices_and_volume(data)

    # VSA variables are computed from close/high/low/volume only.
    (weak_confirmed, weak_clear, weak,
     strong_confirmed, strong_clear, strong) = calculate_VSA_variables(
        close_px, high_px, low_px, volume_series)

    return (
        open_px, close_px, high_px, low_px, volume_series, timestamps,
        weak_confirmed, weak_clear, weak,
        strong_confirmed, strong_clear, strong,
    )
def get_SR_and_VSA_data(data):
    """
    Derive both the support/resistance and the VSA variables from the price
    data and hand them back together with the extracted price series.
    :param data: the input price data
    :return: a 20-tuple: the six price/volume/time series, then the eight
        support/resistance series, then the six VSA series
    """
    (open_px, close_px, high_px, low_px,
     volume_series, timestamps) = extract_prices_and_volume(data)

    # Support/resistance variables
    (res_1, res_2, sup_1, sup_2, upper_sup_1,
     lower_res_1, resist_pt, support_pt) = calculate_SR_data(
        open_px, close_px, high_px, low_px)

    # VSA variables
    (weak_confirmed, weak_clear, weak,
     strong_confirmed, strong_clear, strong) = calculate_VSA_variables(
        close_px, high_px, low_px, volume_series)

    return (
        open_px, close_px, high_px, low_px, volume_series, timestamps,
        res_1, res_2, sup_1, sup_2, upper_sup_1,
        lower_res_1, resist_pt, support_pt,
        weak_confirmed, weak_clear, weak,
        strong_confirmed, strong_clear, strong,
    )
def UB_cov(x1, x2, N=10):
    """Rolling covariance of ``x1`` and ``x2`` with an ``N / (N - 1)`` correction.

    For every index ``i >= N`` the value is computed over the ``N`` samples
    ending at ``i`` (inclusive). Indices below ``N`` are left at zero.

    :param x1: first numpy array.
    :param x2: second numpy array, at least as long as ``x1``.
    :param N: window length.
    :return: numpy array with the same size as ``x1``.
    """
    total = x1.size
    rolling = np.zeros(total)
    for end in range(N, total):
        span = slice(end - N + 1, end + 1)
        left, right = x1[span], x2[span]
        # E[xy] - E[x]E[y], rescaled from the population estimate.
        population_cov = np.mean(left * right) - np.mean(left) * np.mean(right)
        rolling[end] = N / (N - 1) * population_cov
    return rolling
# Number of successive swing levels remembered per side (active level + 29 older ones).
_SR_LEVEL_DEPTH = 30
# Bar cap of the window used for the fallback extreme when no level qualifies.
_SR_FALLBACK_LOOKBACK = 144


def _swing_points(series, find_peaks):
    """Locate strict five-bar swing extremes in ``series``.

    Bar ``i`` is a peak when ``series[i-2] < series[i-1] < series[i]`` and
    ``series[i] > series[i+1] > series[i+2]``; troughs use the mirrored
    inequalities. The first and last two bars can never qualify.

    Returns:
        ``(points, flags)``: ``points`` holds the series value at each swing
        bar and NaN elsewhere; ``flags`` holds 1.0 at swing bars, else 0.0.
    """
    points = np.full(series.size, np.nan)
    flags = np.zeros(series.size)
    for i in range(2, series.size - 2):
        if find_peaks:
            is_swing = series[i - 2] < series[i - 1] < series[i] > series[i + 1] > series[i + 2]
        else:
            is_swing = series[i - 2] > series[i - 1] > series[i] < series[i + 1] < series[i + 2]
        if is_swing:
            points[i] = series[i]
            flags[i] = 1
    return points, flags


def _forward_fill(points, seed):
    """Carry the latest non-NaN entry of ``points`` forward; bar 0 is ``seed[0]``."""
    filled = np.copy(seed)
    for i in range(1, filled.size):
        filled[i] = filled[i - 1] if np.isnan(points[i]) else points[i]
    return filled


def _level_history(current, depth):
    """Return ``depth`` arrays: the active level followed by the older ones.

    Entry 0 is a copy of ``current``. Entry ``k`` takes the value entry
    ``k - 1`` had on the previous bar whenever ``current`` changes, and is
    carried forward otherwise; bar 0 of every entry equals ``current[0]``.
    """
    history = [np.copy(current)]
    for _ in range(depth - 1):
        newer = history[-1]
        older = np.copy(current)
        for i in range(1, older.size):
            older[i] = newer[i - 1] if current[i] != current[i - 1] else older[i - 1]
        history.append(older)
    return history


def _first_matching_level(matches, levels):
    """Per bar (column), pick the first level (row) flagged in ``matches``.

    Returns:
        ``(picked, found)``; ``picked`` is only meaningful where ``found``
        is True (argmax yields row 0 when no row matches).
    """
    found = matches.any(axis=0)
    first_row = matches.argmax(axis=0)
    picked = levels[first_row, np.arange(levels.shape[1])]
    return picked, found


def _fallback_extreme(series, bar, reducer):
    """Reduce the recent window of ``series`` ending at ``bar`` with ``reducer``.

    The window spans the last ``_SR_FALLBACK_LOOKBACK`` bars once that many
    exist, and ``series[1:bar + 1]`` before that.
    """
    lookback = _SR_FALLBACK_LOOKBACK if bar >= _SR_FALLBACK_LOOKBACK else int(bar)
    # Clamp the start so bar 0 yields a one-bar window instead of an empty
    # slice, which np.amin / np.amax reject.
    start = min(bar - lookback + 1, bar)
    return reducer(series[start:bar + 1])


def _previous_distinct(values):
    """Value ``values`` held before its most recent change (0 until the first change)."""
    saved = np.zeros(values.size)
    for i in range(1, values.size):
        saved[i] = values[i - 1] if values[i] != values[i - 1] else saved[i - 1]
    return saved


def calculate_SR_data(open_price, close_price, high, low):
    """Derive support and resistance levels from OHLC arrays.

    Steps, all evaluated bar by bar over arrays of equal length:

    1. Find five-bar swing highs/lows and discard any that has another swing
       (high or low) within two bars on either side.
    2. Forward-fill the surviving swings into "current" support/resistance
       and keep the ``_SR_LEVEL_DEPTH`` most recent distinct levels per side.
    3. Compute pivot levels from the previous ``PIVOT_LOOK_BACK`` bars and
       the current open.
    4. Tier 1: the first level (newest swing level first, then pivot 1-3)
       that is at or beyond the rolling ``LOOK_BACK_PERIOD`` low/high; if
       none qualifies, the extreme of a recent window is used. Tier 2: the
       first level strictly beyond tier 1 moved by ``PCT_DIFFERENCE``; if
       none qualifies, that moved value itself.
    5. Hold the published levels until the previous close leaves the
       ``[support_2, resistance_2]`` band or they have been held for more
       than ``STABILITY_PERIOD`` bars, then refresh them from tiers 1 and 2.

    Args:
        open_price: 1-D numpy array of bar opens.
        close_price: 1-D numpy array of bar closes.
        high: 1-D numpy array of bar highs.
        low: 1-D numpy array of bar lows.

    Returns:
        Tuple of eight arrays, each with ``high.size`` entries:
        ``(resistance_1, resistance_2, support_1, support_2,
        upper_support_1, lower_resistance_1, ResistPoint, SupportPoint)``.
        ``upper_support_1`` / ``lower_resistance_1`` are the values
        ``support_1`` / ``resistance_1`` held before their latest change.
        ``ResistPoint`` / ``SupportPoint`` hold the filtered swing prices,
        with 0 on non-swing bars.
    """
    n = high.size

    # --- 1. Swing points, dropping those crowded by a neighbouring swing ---
    raw_resist, peak_flags = _swing_points(high, find_peaks=True)
    raw_support, trough_flags = _swing_points(low, find_peaks=False)
    swing_flags = peak_flags + trough_flags
    crowded = np.zeros(n)
    for i in range(2, n - 2):
        crowded[i] = swing_flags[i - 2] + swing_flags[i - 1] + swing_flags[i + 1] + swing_flags[i + 2]
    resist_points = np.where(crowded > 0, np.nan, raw_resist)
    support_points = np.where(crowded > 0, np.nan, raw_support)

    # These are ResistPoint and SupportPoint in the CSV
    ResistPoint = np.where(np.isnan(resist_points), 0.0, resist_points)
    SupportPoint = np.where(np.isnan(support_points), 0.0, support_points)

    # --- 2. Current levels and their history ---
    sp_now = _forward_fill(support_points, low)
    rp_now = _forward_fill(resist_points, high)
    support_levels = _level_history(sp_now, _SR_LEVEL_DEPTH)
    resist_levels = _level_history(rp_now, _SR_LEVEL_DEPTH)

    # --- 3. Pivots from the previous PIVOT_LOOK_BACK bars (current bar excluded) ---
    pp = np.zeros(n)
    pivot_r1 = np.zeros(n)
    pivot_r2 = np.zeros(n)
    pivot_r3 = np.zeros(n)
    pivot_s1 = np.zeros(n)
    pivot_s2 = np.zeros(n)
    pivot_s3 = np.zeros(n)
    for i in range(PIVOT_LOOK_BACK, n):
        window_high = np.amax(high[i - PIVOT_LOOK_BACK:i])
        window_low = np.amin(low[i - PIVOT_LOOK_BACK:i])
        pp[i] = (window_high + window_low + 2 * open_price[i]) / 4
        pivot_r1[i] = 2 * pp[i] - window_low
        pivot_s1[i] = 2 * pp[i] - window_high
        pivot_r2[i] = pp[i] + pivot_r1[i] - pivot_s1[i]
        pivot_s2[i] = pp[i] + pivot_s1[i] - pivot_r1[i]
        pivot_r3[i] = pivot_r2[i] + pivot_r1[i] - pp[i]
        pivot_s3[i] = pivot_s2[i] + pivot_s1[i] - pp[i]

    # Row order is the search priority: newest swing level first, pivots last.
    support_stack = np.vstack(support_levels + [pivot_s1, pivot_s2, pivot_s3])
    resist_stack = np.vstack(resist_levels + [pivot_r1, pivot_r2, pivot_r3])

    # --- 4. Reference extremes and the two tiers per side ---
    ref_low = np.zeros(n)
    ref_high = np.zeros(n)
    for i in range(LOOK_BACK_PERIOD - 1, n):
        ref_low[i] = np.amin(low[i - LOOK_BACK_PERIOD + 1:i + 1])
        ref_high[i] = np.amax(high[i - LOOK_BACK_PERIOD + 1:i + 1])

    picked, found = _first_matching_level(ref_low >= support_stack, support_stack)
    next_lower_support_1 = np.where(found, picked, 0.0)
    for i in np.flatnonzero(~found):
        next_lower_support_1[i] = _fallback_extreme(low, i, np.amin)

    ref_below = next_lower_support_1 * (1 - PCT_DIFFERENCE)
    picked, found = _first_matching_level(ref_below > support_stack, support_stack)
    next_lower_support_2 = np.where(found, picked, ref_below)

    picked, found = _first_matching_level(ref_high <= resist_stack, resist_stack)
    next_higher_resist_1 = np.where(found, picked, 0.0)
    for i in np.flatnonzero(~found):
        next_higher_resist_1[i] = _fallback_extreme(high, i, np.amax)

    ref_above = next_higher_resist_1 * (1 + PCT_DIFFERENCE)
    picked, found = _first_matching_level(ref_above < resist_stack, resist_stack)
    next_higher_resist_2 = np.where(found, picked, ref_above)

    # --- 5. Published levels with the stability hold ---
    support_high = np.zeros(n)
    support_low = np.zeros(n)
    resist_low = np.zeros(n)
    resist_high = np.zeros(n)
    bars_held = np.zeros(n)
    for i in range(n):
        if i == 0:
            support_high[i] = support_low[i] = sp_now[i]
            resist_low[i] = resist_high[i] = rp_now[i]
            continue
        previous_close = close_price[i - 1]
        broke_out = previous_close < support_low[i - 1] or previous_close > resist_high[i - 1]
        if broke_out or bars_held[i - 1] > STABILITY_PERIOD:
            # Refresh; bars_held[i] stays 0, restarting the hold count.
            support_high[i] = next_lower_support_1[i]
            support_low[i] = next_lower_support_2[i]
            resist_low[i] = next_higher_resist_1[i]
            resist_high[i] = next_higher_resist_2[i]
        else:
            support_high[i] = support_high[i - 1]
            support_low[i] = support_low[i - 1]
            resist_low[i] = resist_low[i - 1]
            resist_high[i] = resist_high[i - 1]
            bars_held[i] = bars_held[i - 1] + 1

    upper_support_1 = _previous_distinct(support_high)
    lower_resistance_1 = _previous_distinct(resist_low)
    return (resist_low, resist_high, support_high, support_low,
            upper_support_1, lower_resistance_1, ResistPoint, SupportPoint)
def _rolling_log_stats(log_values, period):
    """Rolling mean and sample standard deviation of ``log_values``.

    Entries before the first full window of ``period`` bars stay at 0.
    """
    mu = np.zeros(log_values.size)
    sigma = np.zeros(log_values.size)
    for i in range(period - 1, log_values.size):
        window = log_values[i - period + 1:i + 1]
        mu[i] = np.mean(window)
        # np.std divides by n; rescale to the (n - 1) sample figure.
        sigma[i] = np.std(window) * np.sqrt(period / (period - 1))
    return mu, sigma


def _lognormal_rank(values, ln_mu, ln_sigma):
    """Rank each value from -4 to 4 against its rolling log-normal bands.

    Edges are tested in ascending rank order and the last one a value
    exceeds becomes its rank. Edge -4 is the fixed floor ``TickSize / 2``;
    edges -3..4 are ``exp(mu + k * sigma)`` for the signed ``ExtremeDev``,
    ``LargeDev``, ``AboveNormalDev`` and ``NormalDev`` settings.

    Bars whose ``sigma`` is 0 (this includes the warm-up, where ``mu`` and
    ``sigma`` have not been computed yet) keep rank 0, because the bands
    collapse to a single point there.
    """
    deviations = (-ExtremeDev, -LargeDev, -AboveNormalDev, -NormalDev,
                  NormalDev, AboveNormalDev, LargeDev, ExtremeDev)
    ranks = np.zeros(values.size)
    has_spread = ln_sigma != 0
    # exp(mu + sigma * ((-mu + ln(TickSize / 2)) / sigma)) reduces to TickSize / 2.
    ranks[has_spread & (values > TickSize / 2)] = -4
    for rank, deviation in zip(range(-3, 5), deviations):
        edge = np.e ** (ln_mu + ln_sigma * deviation)
        ranks[has_spread & (values > edge)] = rank
    return ranks


def _compare_to_previous(values, compare):
    """Boolean array of ``compare(values[i], values[i - 1])``; False at bar 0."""
    result = np.zeros(values.size, dtype=bool)
    result[1:] = compare(values[1:], values[:-1])
    return result


def _trend_slope(bar_number, close, period):
    """Rolling covariance(bar, close) / variance(bar), with NaN/inf sanitised by nan_to_num."""
    return np.nan_to_num(
        UB_cov(bar_number, close, N=period) / UB_cov(bar_number, bar_number, N=period)
    )


def calculate_VSA_variables(close, high, low, volume):
    """Classify bars by volume, range, close position and trend, and emit signal markers.

    Each bar's volume and range are ranked against rolling log-normal bands
    (``MediumTermPeriod`` window), its close is ranked by position inside the
    bar's range, and short/medium-term trend slopes are taken from rolling
    regressions of close on bar number. Those classifications are combined
    into weakness and strength conditions. A bar that meets a condition gets
    a marker price: ``high + offset`` for weakness, ``low - offset`` for
    strength, where ``offset`` is ``SignalPosition`` times the geometric mean
    of the last five bar ranges.

    Args:
        close: 1-D numpy array of bar closes.
        high: 1-D numpy array of bar highs.
        low: 1-D numpy array of bar lows.
        volume: 1-D numpy array of bar volumes.

    Returns:
        Tuple of six float arrays with ``close.size`` entries, 0 where the
        condition is not met:
        ``(clear_weakness_confirmed, clear_weakness, weakness,
        clear_strength_confirmed, clear_strength, strength)``.
        The two ``*_confirmed`` arrays depend on the following bar and are
        therefore always 0 on the last bar.
    """
    n = close.size
    hl2 = (high + low) / 2

    # Range and volume, floored at TickSize so their logs are defined.
    R = np.maximum(np.asarray(high - low, dtype=float), TickSize)
    V = np.maximum(np.asarray(volume, dtype=float), TickSize)
    log_R = np.log(R)
    log_V = np.log(V)

    # Set signal locations: geometric mean of the last five ranges.
    bar_height = np.zeros(n)
    for i in range(4, n):
        bar_height[i] = np.e ** np.mean(log_R[i - 4:i + 1])
    signal_loc_1 = bar_height * SignalPosition
    above_bar = high + signal_loc_1
    below_bar = low - signal_loc_1

    # Volume and range distribution, then their ranks.
    V_LNmu, V_LNsigma = _rolling_log_stats(log_V, MediumTermPeriod)
    R_LNmu, R_LNsigma = _rolling_log_stats(log_R, MediumTermPeriod)
    V_Rank = _lognormal_rank(V, V_LNmu, V_LNsigma)
    R_Rank = _lognormal_rank(R, R_LNmu, R_LNsigma)

    # Classify bars. Everything here is False/0 on bar 0, which has no
    # previous bar to compare with.
    has_prev = np.zeros(n, dtype=bool)
    has_prev[1:] = True
    up_bar = _compare_to_previous(close, np.greater)
    dn_bar = _compare_to_previous(close, np.less)
    higher_high = _compare_to_previous(high, np.greater)
    lower_high = _compare_to_previous(high, np.less)
    higher_low = _compare_to_previous(low, np.greater)
    lower_low = _compare_to_previous(low, np.less)

    close_position = (close - low) / R
    up_close = has_prev & (close_position > UpCloseLevel)
    dn_close = has_prev & (close_position < DownCloseLevel)
    vh_close = up_close & (close_position >= VeryHighCloseLevel) & (R > TickSize)
    vl_close = dn_close & (close_position <= VeryLowCloseLevel) & (R > TickSize)
    # np.select honours list order, so the "very" variants take precedence.
    c_rank = np.select([vh_close, up_close, vl_close, dn_close], [2, 1, -2, -1], default=0)

    # Classify trends.
    bar_number = np.arange(n)
    st_trend = _trend_slope(bar_number, close, ShortTermPeriod)
    mt_trend = _trend_slope(bar_number, close, MediumTermPeriod)
    st_up, st_down = st_trend > 0, st_trend < 0
    mt_up, mt_down = mt_trend > 0, mt_trend < 0

    # Indicators of weakness.
    is_buying_climax = (up_bar & (c_rank < 0) & (V_Rank > 2) & (R_Rank > 2)
                        & st_up & mt_up & higher_high)
    is_up_thrust = (c_rank < 0) & (V_Rank > 0) & (R_Rank > 0) & st_up & mt_up & higher_high
    prev_up_on_volume = np.zeros(n, dtype=bool)
    prev_up_on_volume[1:] = up_bar[:-1] & (V_Rank[:-1] > 0)
    prev_up_thrust = np.zeros(n, dtype=bool)
    prev_up_thrust[1:] = is_up_thrust[:-1]
    is_false_up_thrust = (prev_up_on_volume & dn_bar & (c_rank < 0) & (R_Rank < 1)
                          & st_up & higher_high & ~prev_up_thrust)
    is_weakness_confirmed = dn_bar | dn_close | (up_bar & (V_Rank < 0) & lower_high)
    is_supply = dn_bar & (c_rank < 0) & (V_Rank > 0) & st_up
    is_supply_swamping_demand = (up_bar & (close <= hl2) & (V_Rank > 0) & st_up & mt_up
                                 & ~is_buying_climax & ~is_up_thrust)

    # Indicators of strength.
    is_selling_climax = (dn_bar & (c_rank > 0) & (V_Rank > 2) & (R_Rank > 2)
                         & st_down & mt_down & lower_low)
    is_reverse_up_thrust = ((c_rank > 0) & (V_Rank > 0) & (R_Rank > 0)
                            & st_down & mt_down & lower_low)
    is_testing_for_supply = (c_rank > 0) & (V_Rank < 0) & st_down & lower_low
    is_strength_confirmed = up_bar | up_close | (dn_bar & (V_Rank < 0) & higher_low)
    is_demand = up_bar & (c_rank > 0) & (V_Rank > 0) & st_down
    is_stopping_volume = (dn_bar & (close >= hl2) & (V_Rank > 0) & st_down & mt_down
                          & ~is_selling_climax & ~is_reverse_up_thrust)

    # "Confirmed" markers look one bar ahead, so the last bar never qualifies.
    has_next = np.zeros(n, dtype=bool)
    has_next[:-1] = True
    weakness_unconfirmed_next = np.zeros(n, dtype=bool)
    weakness_unconfirmed_next[:-1] = ~is_weakness_confirmed[1:]
    strength_unconfirmed_next = np.zeros(n, dtype=bool)
    strength_unconfirmed_next[:-1] = ~is_strength_confirmed[1:]

    # Final variables.
    clear_weakness_confirmed = np.where(
        has_next & (is_buying_climax
                    | ((is_up_thrust | is_false_up_thrust) & weakness_unconfirmed_next)),
        above_bar, 0.0)
    clear_weakness = np.where(is_buying_climax | is_up_thrust | is_false_up_thrust, above_bar, 0.0)
    weakness = np.where(is_supply | is_supply_swamping_demand, above_bar, 0.0)
    clear_strength_confirmed = np.where(
        has_next & (is_selling_climax
                    | ((is_reverse_up_thrust | is_testing_for_supply) & strength_unconfirmed_next)),
        below_bar, 0.0)
    clear_strength = np.where(
        is_selling_climax | is_reverse_up_thrust | is_testing_for_supply, below_bar, 0.0)
    strength = np.where(is_demand | is_stopping_volume, below_bar, 0.0)

    return clear_weakness_confirmed, clear_weakness, weakness, clear_strength_confirmed, clear_strength, strength
from AlgorithmImports import *
import pickle
from signal_classes import (
SignalTracker,
)
from utils import (
get_update_filter,
ten_min_data_to_higher_timeframe_df,
get_high_timeframe_direction,
)
from sr_functions import (
calculate_SR_data,
)
def initialize_strategy(algorithm):
    """Attach empty per-ticker strategy state to ``algorithm`` and seed it.

    Five dictionaries keyed by ticker are created on ``algorithm``
    (``signal_tracker_dict``, ``sr_data_dict``, ``signal_direction_dict``,
    ``price_dict``, ``has_new_signal_bar``). For every ticker listed in
    ``algorithm.strategy_setting['tickers']`` a ``SignalTracker`` is built
    and the remaining entries get their start-up values: an empty dict,
    direction 0, no price, and no pending signal bar.
    """
    settings = algorithm.strategy_setting
    trackers = algorithm.signal_tracker_dict = {}
    sr_data = algorithm.sr_data_dict = {}
    directions = algorithm.signal_direction_dict = {}
    prices = algorithm.price_dict = {}
    new_bar_flags = algorithm.has_new_signal_bar = {}

    for ticker in settings['tickers']:
        symbol = algorithm.ticker_symbol_map[ticker]
        # The tracker is built first, before the other per-ticker entries exist.
        trackers[ticker] = SignalTracker(algorithm, symbol, ticker, settings)
        sr_data[ticker] = {}
        directions[ticker] = 0
        prices[ticker] = None
        new_bar_flags[ticker] = False
def _refresh_sr_data(algorithm, ticker, timeframe_ref, ten_min_data):
    """Rebuild one timeframe's support/resistance snapshot if its update filter fires.

    When ``get_update_filter`` returns a falsy value nothing is touched.
    Otherwise ``algorithm.sr_data_dict[ticker][timeframe_ref]`` is replaced
    by a fresh dict holding the eight ``calculate_SR_data`` outputs plus the
    ``open``/``high``/``low``/``close`` arrays they were computed from.
    """
    settings = algorithm.strategy_setting
    is_due = get_update_filter(algorithm.Time, settings[timeframe_ref], algorithm.SymbolMarketHours[algorithm.ref_symbol], algorithm.general_setting)
    if not is_due:
        return

    # The slot is reset before computing, so a failure below leaves it empty.
    snapshot = algorithm.sr_data_dict[ticker][timeframe_ref] = {}
    resampled = ten_min_data_to_higher_timeframe_df(algorithm.Time, ten_min_data, settings[timeframe_ref])
    bars = resampled.tail(algorithm.general_setting['max_bars_to_compute_sr'])
    (
        snapshot['resistance_1'], snapshot['resistance_2'],
        snapshot['support_1'], snapshot['support_2'],
        snapshot['upper_support_1'], snapshot['lower_resistance_1'],
        snapshot['ResistPoint'], snapshot['SupportPoint'],
    ) = calculate_SR_data(
        bars['open'].values, bars['close'].values,
        bars['high'].values, bars['low'].values,
    )
    for column in ('open', 'high', 'low', 'close'):
        snapshot[column] = bars[column].values


def update_strategy(algorithm, ticker):
    """Refresh S/R data for ``ticker`` and recompute its entry signal direction.

    Returns immediately when the ticker is not part of the strategy, has no
    bar data yet, or the current time is at/after the reference symbol's
    ``CurrentClose``. Otherwise the function:

    * flags a new signal bar for the ticker,
    * refreshes ``timeframe_1`` and, unless ``one_timeframe_strategy`` is
      set, ``timeframe_2`` and ``timeframe_3`` (each only when its update
      filter fires),
    * resets the signal direction to 0, evaluates ``get_entry_filters`` and
      forwards the break direction/price to the ticker's signal tracker,
    * sets the direction to 1 (long) or -1 (short). In multi-timeframe mode
      the entry must agree with the sign returned by
      ``get_high_timeframe_direction`` for ``timeframe_1``, falling back to
      ``timeframe_2`` when that is 0 and
      ``use_multiple_higher_timeframe_for_entry`` is set.
    """
    settings = algorithm.strategy_setting
    if ticker not in settings['tickers'] or ticker not in algorithm.ticker_data_dict:
        return
    if algorithm.Time >= algorithm.SymbolMarketHours[algorithm.ref_symbol].CurrentClose:
        return

    algorithm.has_new_signal_bar[ticker] = True
    ten_min_data = list(algorithm.ticker_data_dict[ticker].values())

    _refresh_sr_data(algorithm, ticker, 'timeframe_1', ten_min_data)
    if not settings['one_timeframe_strategy']:
        for timeframe_ref in ('timeframe_2', 'timeframe_3'):
            _refresh_sr_data(algorithm, ticker, timeframe_ref, ten_min_data)

    algorithm.signal_direction_dict[ticker] = 0
    is_valid_long_entry, is_valid_short_entry, break_direction, price_to_break = get_entry_filters(
        algorithm, ticker, settings['entry_timeframe_ref'], settings['entry_style'],
        use_entry_distance_filter=settings['use_entry_distance_filter'],
        use_break_for_entry=settings['use_break_for_entry'],
    )
    algorithm.signal_tracker_dict[ticker].update_break_direction(break_direction, price_to_break)

    if settings['one_timeframe_strategy']:
        # Long takes precedence when both entries are valid.
        if is_valid_long_entry:
            algorithm.signal_direction_dict[ticker] = 1
        elif is_valid_short_entry:
            algorithm.signal_direction_dict[ticker] = -1
        return

    if not (is_valid_long_entry or is_valid_short_entry):
        return

    high_timeframe_direction = get_high_timeframe_direction(algorithm, "timeframe_1", ticker)
    if settings['use_multiple_higher_timeframe_for_entry'] and (high_timeframe_direction == 0):
        high_timeframe_direction = get_high_timeframe_direction(algorithm, "timeframe_2", ticker)

    if is_valid_long_entry and (high_timeframe_direction > 0):
        algorithm.signal_direction_dict[ticker] = 1
    elif is_valid_short_entry and (high_timeframe_direction < 0):
        algorithm.signal_direction_dict[ticker] = -1
# Walk every ticker in strategy_setting['tickers'], ask that ticker's signal tracker whether to
# enter or exit at the latest price, log the decision and append the resulting order dictionaries
# ({"symbol": ..., "quantity": ...}) to algorithm.order_que.
# NOTE(review): leading indentation was lost in this copy of the file, so the nesting of the
# statements below cannot be read from the text alone - restore it from the original source
# before relying on which branch a given statement belongs to.
def update_signal_tracker(algorithm):
for ticker in algorithm.strategy_setting['tickers']:
# Tickers without a price yet are skipped.
if algorithm.price_dict[ticker] is None:
continue
symbol = algorithm.ticker_symbol_map[ticker]
# Offer the stored signal direction and the latest price to the tracker.
has_enter_trade = algorithm.signal_tracker_dict[ticker].check_entry(algorithm.signal_direction_dict[ticker], algorithm.price_dict[ticker])
has_exit_trade = False
if has_enter_trade:
algorithm.Log(f"{str(algorithm.Time)}: {ticker}: ENTRY: Price: {algorithm.price_dict[ticker]:,.2f}: Quantity: {algorithm.signal_tracker_dict[ticker].quantity}")
# Entry orders are queued with the tracker's quantity.
order_dict = {"symbol": symbol, "quantity": algorithm.signal_tracker_dict[ticker].quantity}
algorithm.order_que += [order_dict]
else:
# No entry was taken: check for an exit at the latest price instead.
to_exit = algorithm.signal_tracker_dict[ticker].check_exit(algorithm.price_dict[ticker])
if to_exit:
algorithm.Log(f"{str(algorithm.Time)}: {ticker}: EXIT: {algorithm.signal_tracker_dict[ticker].exit_type}: Price: {algorithm.price_dict[ticker]:,.2f}")
# Exit orders use the negated tracker quantity; the tracker is reset afterwards.
order_dict = {"symbol": symbol, "quantity": -algorithm.signal_tracker_dict[ticker].quantity}
algorithm.order_que += [order_dict]
algorithm.signal_tracker_dict[ticker].reset()
has_exit_trade = True
# Clear the stored signal direction for this ticker.
algorithm.signal_direction_dict[ticker] = 0
# The new-signal-bar flag is cleared as soon as it is observed.
if algorithm.has_new_signal_bar[ticker]:
algorithm.has_new_signal_bar[ticker] = False
if not has_exit_trade:
# Derive a break signal from the latest close of the entry timeframe and try an entry on it.
break_signal_direction = algorithm.signal_tracker_dict[ticker].get_break_signal_direction(algorithm.sr_data_dict[ticker][algorithm.strategy_setting['entry_timeframe_ref']]['close'][-1])
has_enter_trade = algorithm.signal_tracker_dict[ticker].check_entry(break_signal_direction, algorithm.price_dict[ticker])
if has_enter_trade:
algorithm.Log(f"{str(algorithm.Time)}: {ticker}: BREAK ENTRY: Price: {algorithm.price_dict[ticker]:,.2f}: Quantity: {algorithm.signal_tracker_dict[ticker].quantity}")
order_dict = {"symbol": symbol, "quantity": algorithm.signal_tracker_dict[ticker].quantity}
algorithm.order_que += [order_dict]
# Feed the latest high and low of the trailing-stop timeframe to the tracker's stop-loss update.
algorithm.signal_tracker_dict[ticker].update_stoploss(
algorithm.sr_data_dict[ticker][algorithm.strategy_setting['trailing_stoploss_timeframe_ref']]['high'][-1],
algorithm.sr_data_dict[ticker][algorithm.strategy_setting['trailing_stoploss_timeframe_ref']]['low'][-1],
)
# Early-exit check. Every variant passes the latest close, support and resistance of the
# early-exit timeframe; the first two variants add extra keyword arguments.
if algorithm.strategy_setting['use_point_exit']:
# Point-exit variant: also passes the second-to-last SupportPoint / ResistPoint of the point-exit timeframe.
to_exit = algorithm.signal_tracker_dict[ticker].check_early_exit(
algorithm.sr_data_dict[ticker][algorithm.strategy_setting['early_exit_timeframe_ref']]['close'][-1],
algorithm.sr_data_dict[ticker][algorithm.strategy_setting['early_exit_timeframe_ref']][algorithm.strategy_setting['early_exit_support_ref']][-1],
algorithm.sr_data_dict[ticker][algorithm.strategy_setting['early_exit_timeframe_ref']][algorithm.strategy_setting['early_exit_resistance_ref']][-1],
support_point=algorithm.sr_data_dict[ticker][algorithm.strategy_setting['point_exit_timeframe_ref']]['SupportPoint'][-2],
resist_point=algorithm.sr_data_dict[ticker][algorithm.strategy_setting['point_exit_timeframe_ref']]['ResistPoint'][-2],
)
elif algorithm.strategy_setting['use_multi_support_early_exit']:
# Multi-support variant: also passes a second close / support / resistance triple.
# NOTE(review): price_2 is looked up under 'early_exit_price_ref_2' while the two levels use
# 'early_exit_timeframe_ref_2' - confirm the differing setting keys are intended.
to_exit = algorithm.signal_tracker_dict[ticker].check_early_exit(
algorithm.sr_data_dict[ticker][algorithm.strategy_setting['early_exit_timeframe_ref']]['close'][-1],
algorithm.sr_data_dict[ticker][algorithm.strategy_setting['early_exit_timeframe_ref']][algorithm.strategy_setting['early_exit_support_ref']][-1],
algorithm.sr_data_dict[ticker][algorithm.strategy_setting['early_exit_timeframe_ref']][algorithm.strategy_setting['early_exit_resistance_ref']][-1],
price_2=algorithm.sr_data_dict[ticker][algorithm.strategy_setting['early_exit_price_ref_2']]['close'][-1],
support_price_2=algorithm.sr_data_dict[ticker][algorithm.strategy_setting['early_exit_timeframe_ref_2']][algorithm.strategy_setting['early_exit_support_ref_2']][-1],
resistance_price_2=algorithm.sr_data_dict[ticker][algorithm.strategy_setting['early_exit_timeframe_ref_2']][algorithm.strategy_setting['early_exit_resistance_ref_2']][-1],
)
else:
# Plain variant: only the three positional values.
to_exit = algorithm.signal_tracker_dict[ticker].check_early_exit(
algorithm.sr_data_dict[ticker][algorithm.strategy_setting['early_exit_timeframe_ref']]['close'][-1],
algorithm.sr_data_dict[ticker][algorithm.strategy_setting['early_exit_timeframe_ref']][algorithm.strategy_setting['early_exit_support_ref']][-1],
algorithm.sr_data_dict[ticker][algorithm.strategy_setting['early_exit_timeframe_ref']][algorithm.strategy_setting['early_exit_resistance_ref']][-1],
)
if to_exit:
algorithm.Log(f"{str(algorithm.Time)}: {ticker}: EARLY EXIT: {algorithm.signal_tracker_dict[ticker].exit_type}: Price: {algorithm.price_dict[ticker]:,.2f}")
# Same exit handling as above: queue the negated quantity, then reset the tracker.
order_dict = {"symbol": symbol, "quantity": -algorithm.signal_tracker_dict[ticker].quantity}
algorithm.order_que += [order_dict]
algorithm.signal_tracker_dict[ticker].reset()
# Decide whether the stored support/resistance data for `ticker` allows a long or a short entry.
# Returns the tuple (is_valid_long_entry, is_valid_short_entry, break_direction, price_to_break):
#   - the two flags start False and at most one of them is set by the entry-style check;
#   - break_direction starts at 0 and is set to 1 / -1 only inside the use_break_for_entry logic;
#   - price_to_break starts as None and is set together with break_direction.
# NOTE(review): leading indentation was lost in this copy of the file, so which `if` each `else`
# below pairs with cannot be read from the text alone - restore it from the original source.
def get_entry_filters(
algorithm, ticker, timeframe_ref, entry_style,
use_entry_distance_filter=False,
use_break_for_entry=False,
):
break_direction = 0
price_to_break = None
is_valid_long_entry = False
is_valid_short_entry = False
# 'point' style: a positive SupportPoint three bars back flags a long entry,
# otherwise a positive ResistPoint three bars back flags a short entry.
if entry_style in ['point']:
if algorithm.sr_data_dict[ticker][timeframe_ref]['SupportPoint'][-3] > 0:
is_valid_long_entry = True
elif algorithm.sr_data_dict[ticker][timeframe_ref]['ResistPoint'][-3] > 0:
is_valid_short_entry = True
# 'upper_lower_sr' style: compare the second-to-last close with the second-to-last
# lower_resistance_1 (long) or upper_support_1 (short). Any other entry_style sets neither flag.
elif entry_style in ['upper_lower_sr']:
if algorithm.sr_data_dict[ticker][timeframe_ref]['close'][-2] > algorithm.sr_data_dict[ticker][timeframe_ref]['lower_resistance_1'][-2]:
is_valid_long_entry = True
elif algorithm.sr_data_dict[ticker][timeframe_ref]['close'][-2] < algorithm.sr_data_dict[ticker][timeframe_ref]['upper_support_1'][-2]:
is_valid_short_entry = True
if use_entry_distance_filter:
if is_valid_long_entry:
# Distance from the latest close on timeframe_ref up to resistance_1 of timeframe_1 / timeframe_2,
# expressed as a percentage of that close (negative when the close is above the level).
resist_distance_timeframe_1 = 100 * (algorithm.sr_data_dict[ticker]['timeframe_1']['resistance_1'][-1] - algorithm.sr_data_dict[ticker][timeframe_ref]['close'][-1]) / algorithm.sr_data_dict[ticker][timeframe_ref]['close'][-1]
resist_distance_timeframe_2 = 100 * (algorithm.sr_data_dict[ticker]['timeframe_2']['resistance_1'][-1] - algorithm.sr_data_dict[ticker][timeframe_ref]['close'][-1]) / algorithm.sr_data_dict[ticker][timeframe_ref]['close'][-1]
# The flag is cleared here and only set again by the checks that follow.
is_valid_long_entry = False
# A level passes when its distance is negative or larger than sr_level_threshold_pct.
if (resist_distance_timeframe_1 < 0) or (resist_distance_timeframe_1 > algorithm.strategy_setting['sr_level_threshold_pct']):
if (resist_distance_timeframe_2 < 0) or (resist_distance_timeframe_2 > algorithm.strategy_setting['sr_level_threshold_pct']):
if use_break_for_entry:
if (resist_distance_timeframe_1 < 0) or (resist_distance_timeframe_2 < 0):
is_valid_long_entry = True
else:
# Record an upward break request and the resistance level to break.
break_direction = 1
if resist_distance_timeframe_1 > algorithm.strategy_setting['sr_level_threshold_pct']:
price_to_break = algorithm.sr_data_dict[ticker]['timeframe_1']['resistance_1'][-1]
else:
price_to_break = algorithm.sr_data_dict[ticker]['timeframe_2']['resistance_1'][-1]
# Debug guard between the ##-## markers: logs "!!!" when no break price was chosen.
##-##
if price_to_break is None:
algorithm.Log("!!!")
##-##
else:
is_valid_long_entry = True
elif is_valid_short_entry:
# Mirror of the long case: distance from the latest close down to support_1 of
# timeframe_1 / timeframe_2, as a percentage of that close.
support_distance_timeframe_1 = 100 * (algorithm.sr_data_dict[ticker][timeframe_ref]['close'][-1] - algorithm.sr_data_dict[ticker]['timeframe_1']['support_1'][-1]) / algorithm.sr_data_dict[ticker][timeframe_ref]['close'][-1]
support_distance_timeframe_2 = 100 * (algorithm.sr_data_dict[ticker][timeframe_ref]['close'][-1] - algorithm.sr_data_dict[ticker]['timeframe_2']['support_1'][-1]) / algorithm.sr_data_dict[ticker][timeframe_ref]['close'][-1]
# The flag is cleared here and only set again by the checks that follow.
is_valid_short_entry = False
if (support_distance_timeframe_1 < 0) or (support_distance_timeframe_1 > algorithm.strategy_setting['sr_level_threshold_pct']):
if (support_distance_timeframe_2 < 0) or (support_distance_timeframe_2 > algorithm.strategy_setting['sr_level_threshold_pct']):
if use_break_for_entry:
if (support_distance_timeframe_1 < 0) or (support_distance_timeframe_2 < 0):
is_valid_short_entry = True
else:
# Record a downward break request and the support level to break.
break_direction = -1
if support_distance_timeframe_1 > algorithm.strategy_setting['sr_level_threshold_pct']:
price_to_break = algorithm.sr_data_dict[ticker]['timeframe_1']['support_1'][-1]
else:
price_to_break = algorithm.sr_data_dict[ticker]['timeframe_2']['support_1'][-1]
# Debug guard between the ##-## markers: logs "!!!" when no break price was chosen.
##-##
if price_to_break is None:
algorithm.Log("!!!")
##-##
else:
is_valid_short_entry = True
return (is_valid_long_entry, is_valid_short_entry, break_direction, price_to_break)
#region imports
from AlgorithmImports import *
#endregion
import scipy
import math
def get_position_size(price, riskPct, algorithm, symbol):
    """
    Return the whole number of units of ``symbol`` whose cost is ``riskPct`` of the portfolio.

    The cost of one unit is ``price * LotSize * ConversionRate``, read from
    ``algorithm.Securities[symbol]``. The portfolio value comes from
    ``algorithm.Portfolio.TotalPortfolioValue``. The result is rounded down.

    Returns 0 when the cost of one unit is zero (for example a zero price) instead of
    raising ZeroDivisionError.
    """
    security = algorithm.Securities[symbol]
    lot_size = security.SymbolProperties.LotSize
    conversion_rate = security.QuoteCurrency.ConversionRate
    unit_cost = price * lot_size * conversion_rate
    if unit_cost == 0:
        # No meaningful size can be computed without a unit cost.
        return 0
    balance = algorithm.Portfolio.TotalPortfolioValue
    return int(math.floor(balance / unit_cost * riskPct))
def timeframe_to_minutes(timeframe):
    """
    Return the length of ``timeframe`` in minutes.

    Known timeframes are '10m', '30m', '1H', '2H' and '4H'; any other value raises KeyError.
    """
    minutes_by_timeframe = {
        '10m': 10,
        '30m': 30,
        '1H': 60,
        '2H': 120,
        '4H': 240,
    }
    return minutes_by_timeframe[timeframe]
def get_update_filter(cur_datetime, timeframe, market_hour, general_setting):
    """
    Tell whether ``cur_datetime`` (truncated to the minute) is an update minute for ``timeframe``.

    '1D', '2H' and '4H' are matched against fixed minute offsets from ``market_hour.CurrentOpen``.
    '10m', '30m' and '1H' are matched against the minute-of-hour only. Any other timeframe
    yields False. ``general_setting`` is accepted but not used.
    """
    now = cur_datetime.replace(second=0, microsecond=0)

    # Minutes after the session open at which each open-anchored timeframe updates.
    offsets_from_open = {
        "1D": (1,),
        "2H": (1, 121, 241, 361),
        "4H": (1, 241),
    }
    if timeframe in offsets_from_open:
        return any(
            now == (market_hour.CurrentOpen + timedelta(minutes=offset))
            for offset in offsets_from_open[timeframe]
        )

    # Minute-of-hour values at which each intraday timeframe updates.
    update_minutes = {
        '10m': (1, 11, 21, 31, 41, 51),
        '30m': (1, 31),
        '1H': (31,),
    }
    return now.minute in update_minutes.get(timeframe, ())
def get_lowest_timeframe(strategy_settings):
    """
    Return the lowest timeframe configured in ``strategy_settings``.

    That is 'timeframe_1' when 'one_timeframe_strategy' is truthy, otherwise 'timeframe_3'.
    """
    setting_key = 'timeframe_1' if strategy_settings['one_timeframe_strategy'] else 'timeframe_3'
    return strategy_settings[setting_key]
def ten_min_data_to_higher_timeframe(ten_min_data: List[Dict[str, Any]], timeframe: str) -> List[Dict[str, Any]]:
    """
    Convert the ten-minute data to the higher timeframe named by ``timeframe``.

    The last character of ``timeframe`` selects the consolidation:
      - 'D': daily candles.
      - 'H': hourly candles; everything before the 'H' is the number of hours ('1H', '2H', '4H', ...).
      - 'm': '30m' is consolidated to thirty-minute candles; any other minute timeframe
        (e.g. '10m') returns ``ten_min_data`` unchanged.
    Any other suffix, including an empty string, raises ValueError.
    """
    # Slicing (instead of indexing) keeps an empty string on the ValueError path below.
    unit = timeframe[-1:]
    if unit == 'D':
        return ten_min_to_daily_data(ten_min_data)
    if unit == 'H':
        # Parse the whole numeric prefix so multi-digit hour counts are not truncated to one digit.
        return ten_min_to_hourly_data(ten_min_data, num_hours=int(timeframe[:-1]))
    if unit == 'm':
        if timeframe[:-1] == '30':
            return ten_min_to_thirty_min_data(ten_min_data)
        return ten_min_data
    raise ValueError(f'Cannot convert ten minute data, timeframe {timeframe} not understood')
def ten_min_to_daily_data(ten_min_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Convert the ten-minute data to daily data. Both the input and output are a list of price data dictionaries.
    The daily open price is the open of the first 10-min candle of that date in the input.
    The daily close price is the close of the last 10-min candle of that date in the input.
    The daily high is the highest high of that date's 10-min candles.
    The daily low is the lowest low of that date's 10-min candles.
    The daily volume is the sum of that date's volumes, and 'datetime' is the calendar date.
    Days are returned in ascending date order; an empty input yields an empty list.
    NOTE: for some reason there is not a 10-min candle in a given day with a close price that matches the daily close
    when the daily data is pulled directly from Schwab.
    """
    # Group the candles by calendar date in a single pass, keeping input order within each day.
    candles_by_date = {}
    for candle in ten_min_data:
        candles_by_date.setdefault(candle['datetime'].date(), []).append(candle)

    daily_data = []
    for day in sorted(candles_by_date):
        day_candles = candles_by_date[day]
        daily_data.append({
            'open': day_candles[0]['open'],
            'high': float(np.amax([c['high'] for c in day_candles])),
            'low': float(np.amin([c['low'] for c in day_candles])),
            'close': day_candles[-1]['close'],
            'volume': int(np.sum([c['volume'] for c in day_candles])),
            'datetime': day,
        })
    return daily_data
def ten_min_to_hourly_data(ten_min_data: List[Dict[str, Any]], num_hours: int = 4) -> List[Dict[str, Any]]:
    """
    Convert the ten-minute data to hourly data. The type of hourly candle (1-hour, 2-hours, 4-hours) is
    specified by num_hours.

    Candles never span two calendar dates: each date's ten-minute candles are cut, in input order,
    into consecutive groups of ``num_hours * 6``. The last group of a date may be shorter.
    Each output candle takes the open and 'datetime' of its first ten-minute candle, the close of
    its last one, the max high, the min low and the summed volume.
    Dates are processed in ascending order; an empty input yields an empty list.

    Raises ValueError when num_hours is not positive.
    """
    if num_hours <= 0:
        raise ValueError(f'num_hours must be a positive integer, got {num_hours}')
    candles_per_group = num_hours * 6

    # Group the candles by calendar date in a single pass, keeping input order within each day.
    candles_by_date = {}
    for candle in ten_min_data:
        candles_by_date.setdefault(candle['datetime'].date(), []).append(candle)

    hourly_data = []
    for day in sorted(candles_by_date):
        day_candles = candles_by_date[day]
        # Slicing clamps at the end of the list, so the final (possibly short) group needs no special case.
        for start in range(0, len(day_candles), candles_per_group):
            group = day_candles[start:start + candles_per_group]
            hourly_data.append({
                'open': group[0]['open'],
                'high': float(np.amax([c['high'] for c in group])),
                'low': float(np.amin([c['low'] for c in group])),
                'close': group[-1]['close'],
                'volume': int(np.sum([c['volume'] for c in group])),
                'datetime': group[0]['datetime'],
            })
    return hourly_data
def ten_min_to_thirty_min_data(ten_min_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Convert the ten-minute data to 30 minute data.

    Candles never span two calendar dates: each date's ten-minute candles are cut, in input order,
    into consecutive groups of three. The last group of a date may be shorter.
    Each output candle takes the open and 'datetime' of its first ten-minute candle, the close of
    its last one, the max high, the min low and the summed volume.
    Dates are processed in ascending order; an empty input yields an empty list.
    """
    candles_per_group = 3

    # Group the candles by calendar date in a single pass, keeping input order within each day.
    candles_by_date = {}
    for candle in ten_min_data:
        candles_by_date.setdefault(candle['datetime'].date(), []).append(candle)

    thirty_min_data = []
    for day in sorted(candles_by_date):
        day_candles = candles_by_date[day]
        # Slicing clamps at the end of the list, so the final (possibly short) group needs no special case.
        for start in range(0, len(day_candles), candles_per_group):
            group = day_candles[start:start + candles_per_group]
            thirty_min_data.append({
                'open': group[0]['open'],
                'high': float(np.amax([c['high'] for c in group])),
                'low': float(np.amin([c['low'] for c in group])),
                'close': group[-1]['close'],
                'volume': int(np.sum([c['volume'] for c in group])),
                'datetime': group[0]['datetime'],
            })
    return thirty_min_data
# def ten_min_data_to_higher_timeframe_df(ten_min_data, timeframe):
# confirmed_datetime = ten_min_data[-1]['datetime'] + timedelta(minutes=10)
# timeframe_data = ten_min_data_to_higher_timeframe(ten_min_data, timeframe)
# data_df = pd.DataFrame(timeframe_data)
# if timeframe == '1D':
# data_df = data_df[data_df['datetime'] < confirmed_datetime.date()]
# else:
# closetime = data_df['datetime'] + timedelta(minutes=timeframe_to_minutes(timeframe))
# data_df = data_df[closetime <= confirmed_datetime]
# return data_df
def ten_min_data_to_higher_timeframe_df(cur_datetime, ten_min_data, timeframe):
    """
    Consolidate the ten-minute data to ``timeframe`` and return only the candles that are
    complete as of ``cur_datetime``, as a DataFrame.

    For '1D', a candle is kept when its date is strictly before ``cur_datetime.date()``.
    For every other timeframe, a candle is kept when its start 'datetime' plus the timeframe
    length (see timeframe_to_minutes) is not after ``cur_datetime``.

    When the consolidation produces no candles, an empty DataFrame with the candle columns is
    returned instead of failing on the missing 'datetime' column.
    """
    timeframe_data = ten_min_data_to_higher_timeframe(ten_min_data, timeframe)
    if len(timeframe_data) == 0:
        # pd.DataFrame([]) has no columns, so the 'datetime' lookups below would raise KeyError.
        return pd.DataFrame(columns=['open', 'high', 'low', 'close', 'volume', 'datetime'])

    data_df = pd.DataFrame(timeframe_data)
    if timeframe == '1D':
        is_complete = data_df['datetime'] < cur_datetime.date()
    else:
        candle_close = data_df['datetime'] + timedelta(minutes=timeframe_to_minutes(timeframe))
        is_complete = candle_close <= cur_datetime
    return data_df[is_complete]
def get_high_timeframe_direction(algorithm, timeframe_ref, ticker):
    """
    Return the direction given by the most recent support/resistance point of a timeframe.

    The 'SupportPoint' and 'ResistPoint' series stored under
    ``algorithm.sr_data_dict[ticker][timeframe_ref]`` are scanned from the newest bar backwards,
    for at most ``strategy_setting['sr_lookback']`` bars. The first bar with a positive value decides:
      - -1 when its ResistPoint is positive (this wins if both are positive on the same bar),
      -  1 when only its SupportPoint is positive.
    Returns 0 when no positive point is found within the lookback.

    The lookback is clamped to the available history, so short series return 0 instead of
    raising IndexError.
    """
    sr_data = algorithm.sr_data_dict[ticker][timeframe_ref]
    support_points = sr_data['SupportPoint']
    resist_points = sr_data['ResistPoint']
    lookback = int(min(
        algorithm.strategy_setting['sr_lookback'],
        len(support_points),
        len(resist_points),
    ))
    for bars_back in range(1, lookback + 1):
        # Resistance is tested first because it takes precedence when both are set on one bar.
        if resist_points[-bars_back] > 0:
            return -1
        if support_points[-bars_back] > 0:
            return 1
    return 0