| Overall Statistics |
|
Total Trades 21616 Average Win 0.00% Average Loss 0.00% Compounding Annual Return -0.291% Drawdown 1.100% Expectancy -0.132 Net Profit -0.581% Sharpe Ratio -0.452 Probabilistic Sharpe Ratio 0.645% Loss Rate 47% Win Rate 53% Profit-Loss Ratio 0.63 Alpha -0.004 Beta 0.023 Annual Standard Deviation 0.004 Annual Variance 0 Information Ratio -0.508 Tracking Error 0.148 Treynor Ratio -0.089 Total Fees $46474.40 Estimated Strategy Capacity $0 Lowest Capacity Asset ES XKGCMV4QK9VL |
#region imports
from AlgorithmImports import *
# from QuantConnect.Api import Api
from datetime import datetime, timedelta, timezone
import numpy as np
import pandas as pd
import warnings
import math
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from mpl_toolkits.mplot3d import Axes3D
from matplotlib.colors import LightSource
from mpl_toolkits.mplot3d.art3d import Poly3DCollection
import matplotlib.tri as mtri
from matplotlib import cm
import matplotlib.dates as dates
import dateutil
import random
from random import randrange
warnings.filterwarnings('ignore')
#endregion
# Default QuantConnect project ID; EntryModel.__init__ falls back to it when projectId is None.
PROJECT_ID = 12797570
# Backtest IDs kept for reference:
# one day algo is aa856e5f51a79fd3f57aacdbccb30202
# 2k backtest is 6aabfd389d5ce3d779512e4412e16bc9
class EntryModel:
def __init__(self, api: Api, projectId: int = 12797570, backtestId: str = "6aabfd389d5ce3d779512e4412e16bc9"):
    """
    Initializes an instance of EntryModel for a given Project ID.

    Reads the backtest and its orders through the API, stores one row per
    order in `self.trades_df` (entry times converted from UTC to naive
    US/Eastern), subscribes to the traded future on `self.qb` and keeps a
    reference price in `self.lastPrice`.

    Args:
        api (Api): The instance of QuantConnect.Api
        projectId (int): The ID of the project the backtest is associated with.
            When None, the module-level PROJECT_ID is used.
        backtestId (str): The ID of the backtest to run the entry model analysis for.
            When None, the most recently created completed backtest is used.

    Raises:
        IndexError: If no completed backtest exists for the project (only when
            backtestId is None) or if the backtest contains no orders.
    """
    self.qb = QuantBook()
    self.api = api

    if projectId is None:
        projectId = PROJECT_ID
    if projectId is None:
        print("Please pass a Project ID or assign it to the variable PROJECT_ID in entry_analyzer.py.")
        return
    self.projectId = projectId

    if backtestId is None:
        completed = [b for b in api.ListBacktests(projectId).Backtests if b.Completed]
        if not completed:
            # Previously surfaced as a bare "list index out of range".
            raise IndexError(f"No completed backtest found for Project ID {projectId}.")
        backtestId = max(completed, key=lambda b: b.Created).BacktestId
    self.backtestId = backtestId

    self.backtest = api.ReadBacktest(projectId, backtestId)
    self.backtest_orders = FetchBacktestOrders(api, projectId, backtestId)

    order_rows = [(order.Symbol.Canonical.Value, order.Time, order.Price, order.Quantity)
                  for order in self.backtest_orders]
    if not order_rows:
        raise IndexError(f"Backtest {backtestId} contains no orders to analyze.")
    self.trades_df = pd.DataFrame(order_rows,
                                  columns=['symbol', 'entry_time', 'entry_price', 'quantity'])

    # Plain column assignment replaces the column (and its dtype); `.loc[:, col] = ...`
    # writes in place on recent pandas versions and may keep the old dtype.
    self.trades_df['entry_time'] = pd.to_datetime(self.trades_df.entry_time, utc=True) \
        .dt.tz_convert('US/Eastern') \
        .dt.tz_localize(None)

    qb = self.qb
    # The ticker of the first order decides which future is subscribed.
    canonical_ticker = self.trades_df.symbol.iloc[0].lstrip('/')
    self.future = qb.AddFuture(canonical_ticker,
                               Resolution.Tick,
                               dataNormalizationMode=DataNormalizationMode.BackwardsRatio,
                               dataMappingMode=DataMappingMode.OpenInterest,
                               contractDepthOffset=0)
    qb.SetStartDate(qb.Time - timedelta(1))
    self.future.SetMarketPrice(qb.GetLastKnownPrice(self.future))

    self.lastPrice = self.future.Price
    if self.future.Price == 0:
        # No last known price: fall back to the last tick of the final minute
        # before the close that follows the previous trading day.
        prev_trading_day = self.future.Exchange.Hours.GetPreviousTradingDay(qb.Time - timedelta(1))
        _end = self.future.Exchange.Hours.GetNextMarketClose(prev_trading_day, False)
        _start = _end - timedelta(minutes=1)
        self.lastPrice = qb.History(Tick, self.future.Symbol, _start, _end).lastprice.dropna().iloc[-1]

    print(f"Entry Model initialized for backtest '{self.backtest.Name}' requested at {self.backtest.Created} with Algorithm ID: {self.backtest.BacktestId}.")
def plot_avg_returns(self,
                     ignore_fees: bool = False,
                     fwd_period: timedelta = timedelta(minutes=30),
                     resolution: Resolution = Resolution.Second,
                     ignore_overnight_returns: bool = True,
                     figscale: float = 1.0) -> None:
    """
    Visualizes the average forward returns for the given entry model.

    For every order in `self.trades_df` the price history following the entry
    is requested, converted to per-bar returns and averaged across trades,
    separately for orders with quantity >= 0 ("Long") and quantity < 0
    ("Short"). Each panel shows the compounded mean return and a band of
    +/- 2 smoothed standard deviations. Short paths are NOT sign-flipped.

    Args:
        ignore_fees (bool): Specifies whether fees should be ignored when computing returns. Defaults to `False`.
        fwd_period (timedelta): Specifies the forward-looking period to compute the returns for. Default is `timedelta(minutes=30)`.
        resolution (Resolution): The data resolution in use when computing returns. Only
            `Resolution.Tick` and `Resolution.Second` are handled. Defaults to `Resolution.Second`.
        ignore_overnight_returns (bool): Specifies whether the first data point of each day should be
            zeroed and returns clipped (to avoid outliers from gaps). Defaults to `True`.
        figscale (float): The scaling factor of the figure to plot. Default is 1.0.

    Returns:
        None
    """
    # `==` instead of `is`: identity of enum values is not guaranteed.
    is_tick = resolution == Resolution.Tick
    is_second = resolution == Resolution.Second
    if not (is_tick or is_second):
        # Previously this left `df` unbound inside the download loop.
        print("Only Resolution.Tick and Resolution.Second are supported.")
        return

    qb = self.qb
    future = self.future
    trades_df = self.trades_df.copy()
    if is_second:
        # Truncate entries to whole seconds and move them one second forward.
        trades_df['entry_time'] = pd.to_datetime(trades_df.entry_time).dt.floor('s') + timedelta(seconds=1)

    min_pct_change = future.SymbolProperties.MinimumPriceVariation / self.lastPrice
    shift = timedelta(seconds=1) if is_second else timedelta(seconds=0)

    # Download one window per trade; a window starts no earlier than the end
    # of the data already downloaded for the previous window.
    frames = []
    prev_end = datetime.min
    for entry_time in trades_df.entry_time:
        entry = entry_time.to_pydatetime().replace(microsecond=0)
        end = entry + timedelta(seconds=1) + fwd_period
        start = max(entry - shift, prev_end)
        if abs((end - start).total_seconds()) <= 1:
            continue
        if is_tick:
            ticks = qb.History[Tick](future.Symbol, start, end, Resolution.Tick)
            df = pd.DataFrame([(future.Symbol.Value, tick.Time, tick.Price)
                               for tick in ticks
                               if not tick.Suspicious and tick.TickType == TickType.Trade],
                              columns=['symbol', 'time', 'close'])
            if df.empty:
                continue
            df['time'] = pd.to_datetime(df.time)
            df = df.set_index(['symbol', 'time'])
        else:
            df = qb.History(future.Symbol, start, end, resolution, extendedMarket=False)
            if df.empty:
                continue
            df = df.droplevel(0).loc[:, ['close']].rename(lambda x: future.Symbol.Value, level=0)
        frames.append(df)
        prev_end = min(end, df.droplevel(0).index[-1])

    if not frames:
        print("No historical data found for the specified period.")
        return
    history = pd.concat(frames, axis=0, sort=True)
    history = history.loc[~history.index.duplicated(keep='first')]
    if history.empty:
        print("No historical data found for the specified period.")
        return

    returns = history.groupby(level=0).close.pct_change().fillna(0)
    if ignore_overnight_returns:
        # NOTE(review): the file's indentation was lost; the clip is assumed to
        # belong to this branch (identical behavior for the default `True`).
        first_of_day = returns.groupby(returns.index.get_level_values(1).date).head(1)
        returns.loc[first_of_day.index] = 0
        returns.clip(lower=-min_pct_change, upper=min_pct_change, inplace=True)

    fees_pct = 0
    if not ignore_fees:
        scaling_factor = max(qb.Securities[self.future.Symbol].SymbolProperties.ContractMultiplier, 1)
        fees_amount = GetOrderFeeAmount(qb, self.backtest_orders[0])
        fees_pct = fees_amount / trades_df.entry_price.mean() / scaling_factor

    if not trades_df.empty:
        print('entry price', trades_df.entry_price.iloc[0])

    paths_long = []
    paths_short = []
    returns_by_symbol = {}  # avoid repeating the MultiIndex lookup per trade
    for trade in trades_df.itertuples():
        if trade.symbol not in returns_by_symbol:
            returns_by_symbol[trade.symbol] = returns.loc[trade.symbol]
        path = returns_by_symbol[trade.symbol].loc[trade.entry_time: trade.entry_time + fwd_period]
        if path.empty:
            continue
        # Short paths are kept with their original sign.
        target = paths_long if trade.quantity >= 0 else paths_short
        target.append(path.reset_index(drop=True))

    # Bound for the first averaged return. Computed once so both directions use
    # the same value (it used to be divided a second time for the short side).
    mean_clip = min_pct_change / future.SymbolProperties.ContractMultiplier

    def _summarize(paths):
        """Return (compounded mean return net of fees, smoothed stdev) or None when fewer than two paths."""
        if len(paths) <= 1:
            return None
        paths_df = pd.concat(paths, axis=1, ignore_index=True).fillna(0)
        zero_row = pd.DataFrame(np.zeros((1, paths_df.columns.size)), columns=paths_df.columns)
        # Row 0 is an all-zero row, so every curve starts at the entry itself.
        paths_df = pd.concat([zero_row, paths_df], ignore_index=True, axis=0)
        mean = paths_df.mean(axis=1)
        # Taken from the trade columns only; the mean column is no longer
        # counted as an extra sample.
        stdev = paths_df.std(axis=1)
        if ignore_overnight_returns:
            # NOTE(review): assumed to belong to this branch (indentation lost).
            # Written on the Series itself so the assignment actually sticks.
            mean.iloc[1] = mean.clip(lower=-mean_clip, upper=mean_clip).iloc[1]
        mean_returns = mean.add(1).cumprod() - fees_pct - 1
        smoothed_stdev = stdev.iloc[1:].rolling(100).mean().bfill()
        return mean_returns, smoothed_stdev

    summaries = (_summarize(paths_long), _summarize(paths_short))
    titles = ('Trade Direction: Long', 'Trade Direction: Short')

    fig, ax = plt.subplots(ncols=1, nrows=2, figsize=(15 * figscale, 10 * figscale), sharex=False)
    formatter = matplotlib.ticker.FuncFormatter(format_func)
    for axis, title, summary in zip(ax, titles, summaries):
        if summary is not None:
            mean_returns, smoothed_stdev = summary
            mean_returns.plot(ax=axis, color='b', lw=2)
            mean_returns.add(2 * smoothed_stdev).plot(ax=axis, alpha=.5, color='y', lw=1, ls=':')
            mean_returns.sub(2 * smoothed_stdev).plot(ax=axis, alpha=.5, color='y', lw=1, ls=':')
        axis.set_title(title)
        axis.set_xlabel('Time after entry (mm:ss)')
        axis.set_ylabel('Average return (%)')
        axis.xaxis.set_major_formatter(formatter)
        axis.xaxis.set_major_locator(matplotlib.ticker.MultipleLocator(base=60))
    fig.subplots_adjust(hspace=.5)
    plt.show()
def plot_random_avg_returns(self,
                            ignore_fees: bool = False,
                            fwd_period: timedelta = timedelta(minutes=30),
                            resolution: Resolution = Resolution.Second,
                            ignore_overnight_returns: bool = True,
                            figscale: float = 1.0,
                            sample_count: int = 5000) -> None:
    """
    Visualizes the average forward returns of randomly timed entries.

    Random timestamps are drawn between 09:30 and 15:59:59 on business days
    within the date range spanned by the backtest's orders, keeping only
    those that leave `fwd_period` before 16:00 and for which a one-second
    bar exists. The sampled entries have quantity 0 and are averaged exactly
    like real trades; the same curve is drawn on both panels.

    Args:
        ignore_fees (bool): Specifies whether fees should be ignored when computing returns. Defaults to `False`.
        fwd_period (timedelta): Specifies the forward-looking period to compute the returns for. Default is `timedelta(minutes=30)`.
        resolution (Resolution): The data resolution in use when computing returns. Only
            `Resolution.Tick` and `Resolution.Second` are handled. Defaults to `Resolution.Second`.
        ignore_overnight_returns (bool): Specifies whether the first data point of each day should be
            zeroed and returns clipped (to avoid outliers from gaps). Defaults to `True`.
        figscale (float): The scaling factor of the figure to plot. Default is 1.0.
        sample_count (int): Number of random entries to draw. Default is 5000.

    Returns:
        None
    """
    # `==` instead of `is`: identity of enum values is not guaranteed.
    is_tick = resolution == Resolution.Tick
    is_second = resolution == Resolution.Second
    if not (is_tick or is_second):
        # Previously this left `df` unbound inside the download loop.
        print("Only Resolution.Tick and Resolution.Second are supported.")
        return
    if self.trades_df.empty:
        print("No historical data found for the specified period.")
        return

    # Reuse the instance's QuantBook and future instead of creating a second
    # QuantBook with a hard-coded "ES" subscription.
    qb = self.qb
    future = self.future

    # Midnight of the first and last trading day. Timestamp arithmetic replaces
    # strptime with '%f', which failed on timestamps without fractional seconds.
    entry_times = pd.to_datetime(self.trades_df.entry_time)
    first_day = entry_times.min().normalize().to_pydatetime()
    last_day = entry_times.max().normalize().to_pydatetime()
    day_span = (last_day - first_day).days

    market_close_minute = 16 * 60
    fwd_minutes = int(fwd_period.total_seconds()) / 60

    print('datetime start', datetime.now())
    sampled = []
    attempts = 0
    max_attempts = 20 * max(sample_count, 1)  # guards against looping forever when no data exists
    while len(sampled) < sample_count and attempts < max_attempts:
        attempts += 1
        # `day_span + 1` includes the last day and supports single-day backtests.
        random_day = randrange(day_span + 1)
        random_hour = random.randint(9, 15)
        random_minute = random.randint(30, 59) if random_hour == 9 else random.randint(0, 59)
        random_second = random.randint(0, 59)
        random_milli = random.randint(0, 999)
        stamp = first_day + timedelta(days=random_day, hours=random_hour, minutes=random_minute,
                                      seconds=random_second, milliseconds=random_milli)
        minute_of_day = stamp.hour * 60 + stamp.minute
        if not np.is_busday(stamp.strftime("%Y-%m-%d")):
            continue
        if minute_of_day > market_close_minute - fwd_minutes:
            continue
        bar = qb.History(future.Symbol, stamp, stamp + timedelta(seconds=1), Resolution.Second)
        if bar.empty:
            continue
        price = bar.close.iloc[0]
        if price == 0:
            print('error, price is 0!:', stamp)
        sampled.append((future.Symbol.Canonical.Value, stamp, price, 0))
    print('datetime end', datetime.now())

    if len(sampled) < sample_count:
        print(f"Only {len(sampled)} of {sample_count} random entries could be sampled.")
    if not sampled:
        print("No historical data found for the specified period.")
        return

    # Built in one go (DataFrame.append was removed in pandas 2).
    trades_df = pd.DataFrame(sampled, columns=['symbol', 'entry_time', 'entry_price', 'quantity'])
    # The sampled timestamps are already naive session times; they are no longer
    # re-interpreted as UTC, which moved every entry several hours away from
    # the moment its price was sampled.
    trades_df['entry_time'] = pd.to_datetime(trades_df.entry_time)
    if is_second:
        # Truncate entries to whole seconds and move them one second forward.
        trades_df['entry_time'] = trades_df.entry_time.dt.floor('s') + timedelta(seconds=1)

    min_pct_change = future.SymbolProperties.MinimumPriceVariation / self.lastPrice
    shift = timedelta(seconds=1) if is_second else timedelta(seconds=0)

    # Download one window per entry; a window starts no earlier than the end
    # of the data already downloaded for the previous window.
    frames = []
    prev_end = datetime.min
    for entry_time in trades_df.entry_time:
        entry = entry_time.to_pydatetime().replace(microsecond=0)
        end = entry + timedelta(seconds=1) + fwd_period
        start = max(entry - shift, prev_end)
        if abs((end - start).total_seconds()) <= 1:
            continue
        if is_tick:
            ticks = qb.History[Tick](future.Symbol, start, end, Resolution.Tick)
            df = pd.DataFrame([(future.Symbol.Value, tick.Time, tick.Price)
                               for tick in ticks
                               if not tick.Suspicious and tick.TickType == TickType.Trade],
                              columns=['symbol', 'time', 'close'])
            if df.empty:
                continue
            df['time'] = pd.to_datetime(df.time)
            df = df.set_index(['symbol', 'time'])
        else:
            df = qb.History(future.Symbol, start, end, resolution, extendedMarket=False)
            if df.empty:
                continue
            df = df.droplevel(0).loc[:, ['close']].rename(lambda x: future.Symbol.Value, level=0)
        frames.append(df)
        prev_end = min(end, df.droplevel(0).index[-1])

    if not frames:
        print("No historical data found for the specified period.")
        return
    history = pd.concat(frames, axis=0, sort=True)
    history = history.loc[~history.index.duplicated(keep='first')]
    if history.empty:
        print("No historical data found for the specified period.")
        return

    returns = history.groupby(level=0).close.pct_change().fillna(0)
    if ignore_overnight_returns:
        # NOTE(review): the file's indentation was lost; the clip is assumed to
        # belong to this branch (identical behavior for the default `True`).
        first_of_day = returns.groupby(returns.index.get_level_values(1).date).head(1)
        returns.loc[first_of_day.index] = 0
        returns.clip(lower=-min_pct_change, upper=min_pct_change, inplace=True)

    fees_pct = 0
    if not ignore_fees:
        scaling_factor = max(qb.Securities[self.future.Symbol].SymbolProperties.ContractMultiplier, 1)
        fees_amount = GetOrderFeeAmount(qb, self.backtest_orders[0])
        fees_pct = fees_amount / trades_df.entry_price.mean() / scaling_factor

    print('entry price', trades_df.entry_price.iloc[0])

    paths = []
    returns_by_symbol = {}  # avoid repeating the MultiIndex lookup per entry
    for trade in trades_df.itertuples():
        if trade.symbol not in returns_by_symbol:
            returns_by_symbol[trade.symbol] = returns.loc[trade.symbol]
        path = returns_by_symbol[trade.symbol].loc[trade.entry_time: trade.entry_time + fwd_period]
        if not path.empty:
            paths.append(path.reset_index(drop=True))

    # Random entries have no direction: one baseline curve serves both panels.
    baseline = None
    if len(paths) > 1:
        paths_df = pd.concat(paths, axis=1, ignore_index=True).fillna(0)
        zero_row = pd.DataFrame(np.zeros((1, paths_df.columns.size)), columns=paths_df.columns)
        # Row 0 is an all-zero row, so the curve starts at the entry itself.
        paths_df = pd.concat([zero_row, paths_df], ignore_index=True, axis=0)
        mean = paths_df.mean(axis=1)
        if ignore_overnight_returns:
            # NOTE(review): assumed to belong to this branch (indentation lost).
            # One division only, so both panels use the same bound.
            mean_clip = min_pct_change / future.SymbolProperties.ContractMultiplier
            mean.iloc[1] = mean.clip(lower=-mean_clip, upper=mean_clip).iloc[1]
        baseline = mean.add(1).cumprod() - fees_pct - 1

    fig, ax = plt.subplots(ncols=1, nrows=2, figsize=(15 * figscale, 10 * figscale), sharex=False)
    formatter = matplotlib.ticker.FuncFormatter(format_func)
    for axis, title in zip(ax, ('Trade Direction: Long', 'Trade Direction: Short')):
        if baseline is not None:
            baseline.plot(ax=axis, color='b', lw=2)
        axis.set_title(title)
        axis.set_xlabel('Time after entry (mm:ss)')
        axis.set_ylabel('Average return (%)')
        axis.xaxis.set_major_formatter(formatter)
        axis.xaxis.set_major_locator(matplotlib.ticker.MultipleLocator(base=60))
        axis.ticklabel_format(axis='y', style='plain')
    fig.subplots_adjust(hspace=.5)
    plt.show()
def plot_3d_avg_returns(self,
                        ignore_fees: bool = False,
                        fwd_period: timedelta = timedelta(minutes=10),
                        resolution: Resolution = Resolution.Second,
                        ignore_overnight_returns: bool = True,
                        figscale: float = 1.3) -> None:
    """
    Visualizes the average forward returns as a surface over time of day.

    For every trade with a full forward path the compounded return is
    computed (sign-flipped for quantity < 0). Each trade's path is then
    replaced by the mean of all same-direction paths whose entry time of day
    lies within +/- 15 minutes, and plotted as a 3D surface (seconds after
    entry x entry time x return). Four 2D panels show the number of entries
    around each minute and the final value of the averaged paths.

    Args:
        ignore_fees (bool): Accepted for signature parity with the other plots; fees are not applied here.
        fwd_period (timedelta): Specifies the forward-looking period to compute the returns for. Default is `timedelta(minutes=10)`.
        resolution (Resolution): The data resolution in use when computing returns. Only
            `Resolution.Tick` and `Resolution.Second` are handled. Defaults to `Resolution.Second`.
        ignore_overnight_returns (bool): Specifies whether the first data point of each day should be
            zeroed and returns clipped (to avoid outliers from gaps). Defaults to `True`.
        figscale (float): Currently has no effect; the figure size is fixed. Default is 1.3.

    Returns:
        None
    """
    # `==` instead of `is`: identity of enum values is not guaranteed.
    is_tick = resolution == Resolution.Tick
    is_second = resolution == Resolution.Second
    if not (is_tick or is_second):
        # Previously this left `df` unbound inside the download loop.
        print("Only Resolution.Tick and Resolution.Second are supported.")
        return
    fpsec = int(fwd_period.total_seconds())
    if fpsec < 1:
        print("fwd_period must be at least one second.")
        return

    qb = self.qb
    future = self.future
    trades_df = self.trades_df.copy()
    if is_second:
        # Truncate entries to whole seconds and move them one second forward.
        trades_df['entry_time'] = pd.to_datetime(trades_df.entry_time).dt.floor('s') + timedelta(seconds=1)

    min_pct_change = future.SymbolProperties.MinimumPriceVariation / self.lastPrice
    shift = timedelta(seconds=1) if is_second else timedelta(seconds=0)

    # Download one window per trade; a window starts no earlier than the end
    # of the data already downloaded for the previous window.
    frames = []
    prev_end = datetime.min
    for entry_time in trades_df.entry_time:
        entry = entry_time.to_pydatetime().replace(microsecond=0)
        end = entry + timedelta(seconds=1) + fwd_period
        start = max(entry - shift, prev_end)
        if abs((end - start).total_seconds()) <= 1:
            continue
        if is_tick:
            ticks = qb.History[Tick](future.Symbol, start, end, Resolution.Tick)
            df = pd.DataFrame([(future.Symbol.Value, tick.Time, tick.Price)
                               for tick in ticks
                               if not tick.Suspicious and tick.TickType == TickType.Trade],
                              columns=['symbol', 'time', 'close'])
            if df.empty:
                continue
            df['time'] = pd.to_datetime(df.time)
            df = df.set_index(['symbol', 'time'])
        else:
            df = qb.History(future.Symbol, start, end, resolution, extendedMarket=False)
            if df.empty:
                continue
            df = df.droplevel(0).loc[:, ['close']].rename(lambda x: future.Symbol.Value, level=0)
        frames.append(df)
        prev_end = min(end, df.droplevel(0).index[-1])

    if not frames:
        print("No historical data found for the specified period.")
        return
    history = pd.concat(frames, axis=0, sort=True)
    history = history.loc[~history.index.duplicated(keep='first')]
    if history.empty:
        print("No historical data found for the specified period.")
        return

    returns = history.groupby(level=0).close.pct_change().fillna(0)
    if ignore_overnight_returns:
        # NOTE(review): the file's indentation was lost; the clip is assumed to
        # belong to this branch (identical behavior for the default `True`).
        first_of_day = returns.groupby(returns.index.get_level_values(1).date).head(1)
        returns.loc[first_of_day.index] = 0
        returns.clip(lower=-min_pct_change, upper=min_pct_change, inplace=True)

    window = 900  # half-width (seconds of day) of the averaging neighbourhood
    # Entries must leave a full forward period before 16:00
    # (minute 950 for the 10 minute default, as before).
    latest_entry_minute = 16 * 60 - fpsec / 60

    long_paths, long_times = [], []
    short_paths, short_times = [], []
    too_short = 0
    returns_by_symbol = {}  # avoid repeating the MultiIndex lookup per trade
    for trade in trades_df.itertuples():
        if trade.symbol not in returns_by_symbol:
            returns_by_symbol[trade.symbol] = returns.loc[trade.symbol]
        entry = trade.entry_time
        path = returns_by_symbol[trade.symbol].loc[entry: entry + fwd_period]
        if path.empty:
            continue
        if entry.hour * 60 + entry.minute >= latest_entry_minute:
            continue
        # Require a full path so every row has exactly `fpsec` points
        # (was a fixed 600, which only matched the default fwd_period).
        if len(path) < fpsec:
            too_short += 1
            continue
        cumulative = (path.add(1).cumprod() - 1).values[:fpsec]  # does not include fees
        second_of_day = entry.hour * 3600 + entry.minute * 60 + entry.second
        if trade.quantity >= 0:
            long_paths.append(cumulative)
            long_times.append(second_of_day)
        else:
            short_paths.append(-cumulative)
            short_times.append(second_of_day)

    print('count of long trades', len(long_paths))
    print('count of short trades', len(short_paths))
    print(f'count of paths with {fpsec}', len(long_paths) + len(short_paths))
    print(f'count of paths < {fpsec}', too_short)

    def _build_surface(paths, times):
        """Average each path with its +/- `window` time-of-day neighbours; None when there are no paths."""
        if not paths:
            return None
        values = np.asarray(paths, dtype=float)  # one row per trade, fpsec columns
        times = np.asarray(times, dtype='int64')
        # Prefix sums over the time-sorted rows give every neighbourhood mean
        # in O(n) rows; each trade counts exactly once, also when several
        # trades share the same second of day.
        order = np.argsort(times, kind='stable')
        sorted_times = times[order]
        prefix = np.vstack([np.zeros((1, values.shape[1])), np.cumsum(values[order], axis=0)])
        lo = np.searchsorted(sorted_times, times - window, side='left')
        hi = np.searchsorted(sorted_times, times + window, side='right')
        averaged = (prefix[hi] - prefix[lo]) / (hi - lo)[:, None]
        # Final value of every averaged path, ordered by entry time then value.
        final_order = np.lexsort((averaged[:, -1], times))
        return {
            'x': np.tile(np.arange(values.shape[1]), len(times)),
            'entry': np.repeat(times, values.shape[1]),
            'z': averaged.ravel(),
            'times': times,
            'final_time': times[final_order],
            'final_value': averaged[final_order, -1],
        }

    long_surface = _build_surface(long_paths, long_times)
    short_surface = _build_surface(short_paths, short_times)
    surfaces = [s for s in (long_surface, short_surface) if s is not None]
    if not surfaces:
        print(f"No trades with at least {fpsec} data points of forward history were found.")
        return
    maxbb = max(np.abs(s['z']).max() for s in surfaces)

    # Number of entries within +/- half_window seconds of every minute
    # between 09:30 (34200 s) and 16:00 (57600 s).
    minute_grid = np.arange(34200, 57600, 60, dtype='int64')

    def _entry_counts(surface, half_window):
        if surface is None:
            return [0] * len(minute_grid)
        times = surface['times']
        return [int(((times < m + half_window) & (times > m - half_window)).sum()) for m in minute_grid]

    def HMSFormatter(value, loc):
        h = value // 3600
        m = (value - h * 3600) // 60
        s = value % 60
        return "%02d:%02d:%02d" % (h, m, s)

    hms = matplotlib.ticker.FuncFormatter(HMSFormatter)
    major_ticks = np.arange(34200, 57600, 3600)

    fig = plt.figure(figsize=plt.figaspect(0.5) * 3)
    ax1 = fig.add_subplot(2, 2, 1, projection='3d')
    ax2 = fig.add_subplot(2, 2, 2, projection='3d')
    ax3 = fig.add_subplot(4, 2, 5)
    ax4 = fig.add_subplot(4, 2, 6)
    ax5 = fig.add_subplot(4, 2, 7)
    ax6 = fig.add_subplot(4, 2, 8)

    def _style_time_axis(axis, title):
        axis.set_title(title)
        axis.set_xlim([34200, 57600])
        axis.set_xticks(major_ticks)
        axis.xaxis.set_major_formatter(hms)

    ax3.plot(minute_grid, _entry_counts(long_surface, 300), color='tab:purple')
    _style_time_axis(ax3, 'Long Entries Count 300')
    ax4.plot(minute_grid, _entry_counts(short_surface, 60), color='tab:purple')
    _style_time_axis(ax4, 'Short Entries Count 60')

    for axis, title, surface in ((ax5, 'Long Final Value', long_surface),
                                 (ax6, 'Short Final Value', short_surface)):
        if surface is not None:
            axis.plot(surface['final_time'], surface['final_value'], color='tab:purple')
        _style_time_axis(axis, title)
        axis.set_ylim([maxbb * -1, maxbb])

    for axis, title, surface in ((ax1, 'Trade Direction: Long', long_surface),
                                 (ax2, 'Trade Direction: Short', short_surface)):
        if surface is not None:
            # Entry time goes on the y axis so the chart looks cleaner.
            axis.plot_trisurf(surface['x'], surface['entry'], surface['z'], cmap=cm.plasma, edgecolor='none')
        axis.view_init(elev=41., azim=360)
        axis.set_box_aspect(aspect=(1, 3, 1))
        axis.set_title(title)
        axis.set_xlabel('Seconds after entry')
        axis.set_ylabel('Entry Time', labelpad=20)
        axis.set_zlabel('Percent Gain', labelpad=20)
        axis.set_yticks(major_ticks)
        axis.set_zlim([maxbb * -1, maxbb])
        axis.yaxis.set_major_formatter(hms)
    # Only the short panel has its x axis inverted.
    ax2.invert_xaxis()
    plt.show()
def plot_3d_avg_nomean_returns(self,
                               ignore_fees: bool = False,
                               fwd_period: timedelta = timedelta(minutes=10),
                               resolution: Resolution = Resolution.Second,
                               ignore_overnight_returns: bool = True,
                               figscale: float = 1.3) -> None:
    """
    Visualizes window-averaged forward returns as 3D surfaces keyed by entry time of day.

    For every qualifying trade the cumulative forward-return path is averaged with the
    paths of all same-direction trades whose entry time of day lies within +/- 30 minutes.
    The averaged paths are drawn as one surface per direction, together with entry-count
    and final-value line charts.

    Args:
        ignore_fees (bool): Accepted for interface compatibility. Fees are NOT applied to the plotted returns.
        fwd_period (timedelta): Specifies the forward-looking period to compute the returns for. Default is `timedelta(minutes=10)`.
        resolution (Resolution): The data resolution in use when computing returns (`Resolution.Tick` or `Resolution.Second`). Defaults to `Resolution.Second`.
        ignore_overnight_returns (bool): Specifies whether the first data point of each date should be ignored (to avoid outliers from gaps). Defaults to `True`.
        figscale (float): Accepted for interface compatibility. The figure size is fixed and this value is not applied.
    Returns:
        None
    Raises:
        ValueError: If `resolution` is neither `Resolution.Tick` nor `Resolution.Second`.
    """
    window_seconds = 1800                      # half-width of the entry-time averaging window
    session_open, session_close = 34200, 57600  # 09:30:00 and 16:00:00 as seconds of day
    last_entry_minute = 950                    # entries at or after 15:50 are ignored

    if resolution != Resolution.Tick and resolution != Resolution.Second:
        # The history branches below only build a frame for these two resolutions.
        raise ValueError("plot_3d_avg_nomean_returns supports Resolution.Tick and Resolution.Second only.")
    use_seconds = resolution == Resolution.Second

    trades_df = self.trades_df.copy()
    qb = self.qb
    if use_seconds:
        trades_df.loc[:, 'entry_time'] = trades_df.entry_time.astype('datetime64[s]') + timedelta(seconds=1)
    future = self.future
    min_pct_change = future.SymbolProperties.MinimumPriceVariation / self.lastPrice
    shift = timedelta(seconds=1) if use_seconds else timedelta(seconds=0)
    intervals = [(entry_time.to_pydatetime().replace(microsecond=0) - shift,
                  entry_time.to_pydatetime().replace(microsecond=0) + timedelta(seconds=1) + fwd_period)
                 for entry_time in trades_df.entry_time]

    # Request history per trade window, skipping the part already covered by the previous request.
    df_list = []
    prev_end = datetime.min
    for start, end in intervals:
        start = max(start, prev_end)
        if abs((end - start).total_seconds()) <= 1:
            continue
        if use_seconds:
            df = qb.History(future.Symbol, start, end, resolution, extendedMarket=False)
            if df.empty:
                continue
            df = df.droplevel(0).loc[:, ['close']].rename(lambda x: future.Symbol.Value, level=0)
        else:
            ticks = qb.History[Tick](future.Symbol, start, end, Resolution.Tick)
            df = pd.DataFrame([(future.Symbol.Value, tick.Time, tick.Price)
                               for tick in ticks
                               if not tick.Suspicious and tick.TickType == TickType.Trade],
                              columns=['symbol', 'time', 'close'])
            if df.empty:
                continue
            df.loc[:, 'time'] = pd.to_datetime(df.time)
            df = df.set_index(['symbol', 'time'])
        df_list.append(df)
        prev_end = min(end, df.droplevel(0).index[-1])

    if not df_list:
        print(f"No historical data found for the specified period.")
        return
    history = pd.concat(df_list, axis=0, sort=True)
    history = history.loc[~history.index.duplicated(keep='first')]
    if history.empty:
        print(f"No historical data found for the specified period.")
        return

    returns = history.groupby(level=0).close.pct_change().fillna(0)
    if ignore_overnight_returns:
        first_of_day = returns.groupby(returns.index.get_level_values(1).date).head(1)
        returns.loc[first_of_day.index] = 0
    # Every single-step return is limited to one minimum price variation in either direction.
    returns.clip(lower=-min_pct_change, upper=min_pct_change, inplace=True)

    # Number of points kept per path; every kept path is truncated to exactly this length.
    fpsec = int(fwd_period.total_seconds())

    long_paths, long_secs = [], []
    short_paths, short_secs = [], []
    shortpath = 0
    for trade in trades_df.itertuples():
        path = returns.loc[trade.symbol].loc[trade.entry_time: trade.entry_time + fwd_period]
        if path.empty:
            continue
        entered = trade.entry_time
        if entered.hour * 60 + entered.minute >= last_entry_minute:
            continue
        if len(path) < fpsec:
            shortpath += 1
            continue
        second_of_day = entered.hour * 3600 + entered.minute * 60 + entered.second
        cumulative = path.add(1).cumprod() - 1  # does not include fees
        if trade.quantity >= 0:
            long_paths.append(cumulative.values.tolist()[:fpsec])
            long_secs.append(second_of_day)
        else:
            # Short returns are the mirrored price path.
            short_paths.append(cumulative.mul(-1).values.tolist()[:fpsec])
            short_secs.append(second_of_day)

    longcount, shortcount = len(long_paths), len(short_paths)
    print('count of long trades', longcount)
    print('count of short trades', shortcount)
    print('count of paths with 600', longcount + shortcount)
    print('count of paths < 600', shortpath)
    if longcount == 0 or shortcount == 0:
        print("Not enough qualifying long and short trades to plot.")
        return

    def window_means(paths, secs):
        # Mean path over all rows whose entry second lies within +/- window_seconds of each
        # row's own entry second. Every row is counted exactly once per window; prefix sums
        # over the time-sorted rows keep this linear in the number of trades.
        order = np.argsort(secs, kind='stable')
        sorted_secs = secs[order]
        prefix = np.concatenate([np.zeros((1, paths.shape[1])), np.cumsum(paths[order], axis=0)])
        lo = np.searchsorted(sorted_secs, secs - window_seconds, side='left')
        hi = np.searchsorted(sorted_secs, secs + window_seconds, side='right')
        return (prefix[hi] - prefix[lo]) / (hi - lo).astype(float)[:, None]

    def final_values(surface, secs):
        # Last point of every averaged path, ordered by entry second (then by value).
        order = np.lexsort((surface[:, -1], secs))
        return secs[order], surface[order, -1]

    long_secs = np.array(long_secs)
    short_secs = np.array(short_secs)
    long_surface = window_means(np.array(long_paths, dtype=float), long_secs)
    short_surface = window_means(np.array(short_paths, dtype=float), short_secs)

    steps = np.arange(fpsec)
    aa, bb, cc = np.tile(steps, longcount), long_surface.ravel(), np.repeat(long_secs, fpsec)
    aa2, bb2, cc2 = np.tile(steps, shortcount), short_surface.ravel(), np.repeat(short_secs, fpsec)
    maxbb = max(bb.max(), bb2.max(), bb.min() * -1, bb2.min() * -1)

    # Entries per one-minute mark: longs within +/- 300 s, shorts within +/- 60 s.
    minute_marks = np.arange(session_open, session_close, 60, dtype='int64')
    long_counts = [np.count_nonzero(np.abs(long_secs - mark) < 300) for mark in minute_marks]
    short_counts = [np.count_nonzero(np.abs(short_secs - mark) < 60) for mark in minute_marks]

    cs, bs = final_values(long_surface, long_secs)
    cs2, bs2 = final_values(short_surface, short_secs)

    def HMSFormatter(value, loc):
        h = value // 3600
        m = (value - h * 3600) // 60
        s = value % 60
        return "%02d:%02d:%02d" % (h, m, s)

    major_ticks = np.arange(session_open, session_close, 3600)

    def style_time_axis(ax, title, symmetric_y):
        ax.set_title(title)
        ax.set_xlim([session_open, session_close])
        ax.set_xticks(major_ticks)
        ax.xaxis.set_major_formatter(matplotlib.ticker.FuncFormatter(HMSFormatter))
        if symmetric_y:
            ax.set_ylim([maxbb * -1, maxbb])

    def draw_surface(ax, x, y, z, title):
        # x = seconds after entry, y = entry second of day, z = averaged return.
        ax.plot_trisurf(x, y, z, cmap=cm.plasma, edgecolor='none')
        ax.view_init(elev=41., azim=360)
        ax.set_box_aspect(aspect=(1, 3, 1))
        ax.set_title(title)
        ax.set_xlabel('Seconds after entry')
        ax.set_ylabel('Entry Time', labelpad=20)
        ax.set_zlabel('Percent Gain', labelpad=20)
        ax.set_yticks(major_ticks)
        ax.set_zlim([maxbb * -1, maxbb])
        ax.yaxis.set_major_formatter(matplotlib.ticker.FuncFormatter(HMSFormatter))

    fig = plt.figure(figsize=plt.figaspect(0.5) * 3)
    ax1 = fig.add_subplot(2, 2, 1, projection='3d')
    ax2 = fig.add_subplot(2, 2, 2, projection='3d')
    ax3 = fig.add_subplot(4, 2, 5)
    ax4 = fig.add_subplot(4, 2, 6)
    ax5 = fig.add_subplot(4, 2, 7)
    ax6 = fig.add_subplot(4, 2, 8)

    ax3.plot(minute_marks, long_counts, color='tab:purple')
    ax4.plot(minute_marks, short_counts, color='tab:purple')
    ax5.plot(cs, bs, color='tab:purple')
    ax6.plot(cs2, bs2, color='tab:purple')
    style_time_axis(ax3, 'Long Entries Count 300', False)
    style_time_axis(ax4, 'Short Entries Count 60', False)
    style_time_axis(ax5, 'Long Final Value', True)
    style_time_axis(ax6, 'Short Final Value', True)

    draw_surface(ax1, aa, cc, bb, 'Trade Direction: Long')
    draw_surface(ax2, aa2, cc2, bb2, 'Trade Direction: Short')
    ax2.invert_xaxis()  # only the short surface is drawn with a reversed x axis
    plt.show()
def plot_histogram(self, fwd_period: timedelta = timedelta(minutes=5),
                   ignore_overnight_returns: bool = True,
                   figscale: float = 1.0) -> None:
    """
    Visualizes the chance to hit a percent away from entry price.

    Args:
        fwd_period (timedelta): Specifies the forward-looking period to compute the returns for. Default is `timedelta(minutes=5)`.
        ignore_overnight_returns (bool): Specifies whether the first data point of each date should be ignored (to avoid outliers from gaps). Defaults to `True`.
        figscale (float): The scaling factor of the figure to plot. Default is 1.0.
    Returns:
        None
    """
    trades_df = self.trades_df.copy()
    qb = self.qb
    future = self.future
    trades_df.loc[:, 'exit_time'] = trades_df.entry_time + fwd_period
    intervals = [*map(tuple, trades_df.loc[:, ['entry_time', 'exit_time']]
                      .to_numpy().astype('datetime64[ms]').astype(datetime).tolist())]

    # Request trade ticks per trade window, skipping the part already covered by the previous request.
    df_list = []
    prev_end = datetime.min
    for start, end in intervals:
        start = start.replace(microsecond=0) - timedelta(seconds=1)
        end = end.replace(microsecond=0) + timedelta(seconds=1)
        start = max(start, prev_end)
        if abs((end - start).total_seconds()) <= 1:
            continue
        ticks = qb.History[Tick](future.Symbol, start, end, Resolution.Tick)
        df = pd.DataFrame([(future.Symbol.Value, tick.Time, tick.Price)
                           for tick in ticks
                           if not tick.Suspicious and tick.TickType == TickType.Trade],
                          columns=['symbol', 'time', 'price'])
        if df.empty:
            continue
        df.loc[:, 'time'] = pd.to_datetime(df.time)
        df.set_index(['symbol', 'time'], inplace=True)
        df_list.append(df)
        prev_end = min(end, df.droplevel(0).index[-1].to_pydatetime().replace(microsecond=0) + timedelta(seconds=1))

    if not df_list:
        # pd.concat raises on an empty list, so stop here with a readable message instead.
        print(f"No historical data found for the specified period.")
        return
    data = pd.concat(df_list, axis=0, sort=True)
    data = data.loc[~data.index.duplicated(keep='first')]
    prices = data.loc[future.Symbol.Value]

    # Per trade: (lowest, highest) cumulative move, rounded toward zero to 0.05% steps.
    price_ranges_long = []
    price_ranges_short = []
    for trade in trades_df.itertuples():
        window = prices.loc[trade.entry_time:trade.exit_time]
        if window.empty:
            continue
        step_returns = window.pct_change().fillna(0)
        if ignore_overnight_returns:
            # Zero the gap itself, before compounding, so it cannot leak into later points.
            first_of_day = step_returns.groupby(step_returns.index.date).head(1)
            step_returns.loc[first_of_day.index] = 0
        path = step_returns.add(1).cumprod() - 1
        price_range_pct = (min(math.ceil(path.min().iloc[0] * 2e3) / 2e3, 0),
                           max(math.floor(path.max().iloc[0] * 2e3) / 2e3, 0))
        if trade.quantity >= 0:
            price_ranges_long.append(price_range_pct)
        else:
            price_ranges_short.append(price_range_pct)

    bins = np.linspace(-.01, .01, 41)
    bin_width = bins[1] - bins[0]

    def cumulative_hits(price_ranges):
        # Counts the lows and highs per bin, then accumulates from both extremes toward zero.
        clipped = pd.DataFrame(price_ranges, columns=['low', 'high']).clip(lower=bins[0], upper=bins[-1])
        flat = clipped.to_numpy(dtype=float).ravel()
        # Values are multiples of the bin width, so rounding gives the bin position
        # without relying on exact float equality.
        positions = np.rint((flat - bins[0]) / bin_width).astype(int)
        counts = pd.Series(bins[positions]).value_counts().sort_index().reindex(bins).fillna(0)
        at_or_above = counts.index >= 0
        at_or_below = counts.index <= 0
        counts.loc[at_or_above] = counts.loc[at_or_above].sort_index(ascending=False).cumsum()
        counts.loc[at_or_below] = counts.loc[at_or_below].cumsum()
        return counts

    counts_long = cumulative_hits(price_ranges_long)
    print('ascending cumsum', counts_long.loc[counts_long.index <= 0])
    counts_long /= counts_long.loc[0]
    counts_short = cumulative_hits(price_ranges_short)
    counts_short /= counts_short.loc[0]

    major_ticks = np.linspace(0, 1, 21)
    figsize_x, figsize_y = (figscale * 15, figscale * 10)
    fig, (ax1, ax2) = plt.subplots(nrows=2, ncols=1, figsize=(figsize_x, figsize_y))
    fig.suptitle(f'Chance to hit % away', fontsize=16)
    counts_long.plot.barh(ax=ax1, title='Trade Direction: Long', color='tab:purple', width=1, edgecolor='k', lw=.5)
    counts_short.plot.barh(ax=ax2, title='Trade Direction: Short', color='tab:purple', width=1, edgecolor='k', lw=.5)

    # One label per bar, formatted as a percentage of the entry price.
    yticklabels = [f"{level:.3%}" for level in bins]
    for ax in (ax1, ax2):
        ax.set_xticks(major_ticks)
        ax.set_yticklabels(yticklabels)
        ax.tick_params(axis='y', labelsize=6)
        ax.set_axisbelow(True)
        ax.xaxis.grid(lw=.1, color='grey')
    fig.subplots_adjust(hspace=.3)
    plt.show()
def FetchBacktestOrders(api: Api, projectId: int, backtestId: str) -> list:
    '''Collects every order of a backtest by paging through QuantConnect.Api.

    Orders are requested in consecutive index ranges of 100; paging stops after the
    first page that comes back with fewer than 100 orders.

    Args:
        api (Api): The instance of QuantConnect.Api that is automatically created on start of the research environment
        projectId (int): The ID of the project the backtest is associated with
        backtestId (str): The ID of the backtest to fetch the orders for
    Returns:
        backtestOrders (list): The list of orders
    '''
    page_size = 100
    collected = []
    offset = 0
    while True:
        page = api.ReadBacktestOrders(projectId, backtestId, offset, offset + page_size)
        collected.extend(page)
        if len(page) < page_size:
            # A short (or empty) page means there is nothing left to read.
            return collected
        offset += page_size
def GetOrderFeeAmount(qb: QuantBook, order: Order) -> float:
    '''Returns the fee the security's fee model charges for a one-contract market order.

    Only the symbol of `order` is used: a fresh market order with quantity 1 is built
    for that symbol and priced with the fee model of the matching security.

    Args:
        qb (QuantBook): The QuantBook instance
        order (Order): The order whose symbol the fee is computed for
    Returns:
        fee_amount (float): The cash amount of the order fee
    '''
    symbol = order.Symbol
    # Reuse the security when QuantBook already holds it; otherwise add the contract first.
    security = qb.Securities[symbol] if symbol in qb.Securities.Keys else qb.AddFutureContract(symbol)
    fee_model = security.FeeModel
    request = SubmitOrderRequest(OrderType.Market, SecurityType.Future, symbol, 1, 0, 0, 0, qb.Time, "", None)
    probe_order = Order.CreateOrder(request)
    return fee_model.GetOrderFee(OrderFeeParameters(security, probe_order)).Value.Amount
def format_func(x: float, pos) -> str:
    """Formats a number of seconds as `M:SS`, dropping whole hours.

    The `(value, position)` signature matches what `matplotlib.ticker.FuncFormatter` calls.

    Args:
        x (float): The value in seconds
        pos: The tick position (unused)
    Returns:
        label (str): Minutes within the hour and zero-padded seconds, e.g. `"2:05"`
    """
    minutes = int((x % 3600) // 60)
    seconds = int(x % 60)
    return "{:d}:{:02d}".format(minutes, seconds)
#region imports
from datetime import timedelta
from AlgorithmImports import *
#endregion
class RetrospectiveTanButterfly(QCAlgorithm):
    """Trades one E-mini S&P 500 contract on rolling buy/sell tick-volume imbalance.

    Trade ticks are split into buy and sell volume using the last seen bid/ask and
    accumulated in five rolling buckets. When the bucket totals differ by at least
    700 a one-contract market order is sent on the mapped contract, and that side is
    disarmed until the re-arm condition in `OnData` is met.
    """

    def Initialize(self):
        """Sets the backtest window and cash, subscribes to the continuous future and creates its SymbolData."""
        self.SetStartDate(2020, 9, 17)  # Set Start Date
        self.SetEndDate(2022, 9, 17)
        self.SetCash(1000000000)  # Set Strategy Cash
        self.symbolData = {}
        self.canLong = True
        self.canShort = True
        self.contract = self.AddFuture(Futures.Indices.SP500EMini , Resolution.Tick, extendedMarketHours = False, dataNormalizationMode = DataNormalizationMode.BackwardsRatio, dataMappingMode = DataMappingMode.OpenInterest, contractDepthOffset = 0)
        symbol = self.contract.Symbol
        tracked = SymbolData()
        tracked.bidPrice = self.Securities[symbol].BidPrice
        tracked.askPrice = self.Securities[symbol].AskPrice
        self.symbolData[symbol] = tracked

    @staticmethod
    def _rolling_buy_delta(symbolData):
        """Returns the five-bucket buy volume minus the five-bucket sell volume."""
        bought = (symbolData.buyRollingVolume1 + symbolData.buyRollingVolume2 + symbolData.buyRollingVolume3
                  + symbolData.buyRollingVolume4 + symbolData.buyRollingVolume5)
        sold = (symbolData.sellRollingVolume1 + symbolData.sellRollingVolume2 + symbolData.sellRollingVolume3
                + symbolData.sellRollingVolume4 + symbolData.sellRollingVolume5)
        return bought - sold

    @staticmethod
    def _roll_buckets(symbolData):
        """Shifts every rolling bucket one slot older (dropping bucket 5) and empties bucket 1."""
        (symbolData.buyRollingVolume5, symbolData.buyRollingVolume4, symbolData.buyRollingVolume3,
         symbolData.buyRollingVolume2, symbolData.buyRollingVolume1) = (
            symbolData.buyRollingVolume4, symbolData.buyRollingVolume3, symbolData.buyRollingVolume2,
            symbolData.buyRollingVolume1, 0)
        (symbolData.sellRollingVolume5, symbolData.sellRollingVolume4, symbolData.sellRollingVolume3,
         symbolData.sellRollingVolume2, symbolData.sellRollingVolume1) = (
            symbolData.sellRollingVolume4, symbolData.sellRollingVolume3, symbolData.sellRollingVolume2,
            symbolData.sellRollingVolume1, 0)

    @staticmethod
    def _record_tick(symbolData, tick):
        """Updates the last bid/ask from a quote tick, or adds a trade tick to buy/sell volume."""
        if tick.TickType == TickType.Quote:
            # A zero quote side keeps the previously seen price.
            symbolData.bidPrice = tick.BidPrice if tick.BidPrice != 0 else symbolData.bidPrice
            symbolData.askPrice = tick.AskPrice if tick.AskPrice != 0 else symbolData.askPrice
        elif tick.TickType == TickType.Trade:
            # NOTE(review): a trade priced nearer the ask than the bid is booked as SELL volume
            # here; aggressor-side classification usually goes the other way - confirm intent.
            if tick.Price - symbolData.bidPrice > symbolData.askPrice - tick.Price:
                symbolData.sellVolume += tick.Quantity
                symbolData.sellRollingVolume1 += tick.Quantity
            else:
                symbolData.buyVolume += tick.Quantity
                symbolData.buyRollingVolume1 += tick.Quantity

    def OnData(self, data):
        """Updates the volume buckets from the slice's ticks and sends entry orders on imbalance."""
        for symbol, symbolData in self.symbolData.items():
            if not data.Ticks.ContainsKey(symbol):
                continue

            # NOTE(review): this runs on every slice stamped :00 or :30, so several slices inside
            # the same second would roll the buckets more than once - confirm this is intended.
            if self.Time.second in (0, 30):
                self._roll_buckets(symbolData)

            for tick in data.Ticks[symbol]:
                self._record_tick(symbolData, tick)

            buy_delta = self._rolling_buy_delta(symbolData)
            sell_delta = -buy_delta

            # Re-arm. NOTE(review): because of the elif, shorts are only re-armed while
            # buy_delta >= 5; kept as written - confirm the asymmetry is intended.
            if buy_delta < 5:
                self.canLong = True
            elif sell_delta < 5:
                self.canShort = True

            if buy_delta >= 700 and self.canLong:
                self.Log(f"volume buy Delta: {buy_delta}")
                self.canLong = False
                self.MarketOrder(self.contract.Mapped, 1)
            elif sell_delta >= 700 and self.canShort:
                self.Log(f"volume sell Delta: {sell_delta}")
                self.canShort = False
                self.MarketOrder(self.contract.Mapped, -1)

    def OnEndOfDay(self, symbol):
        """Reports the day's buy and sell volume for a tracked symbol and resets the daily totals."""
        symbolData = self.symbolData.get(symbol)
        if symbolData is None:
            # Only symbols registered in Initialize carry volume data.
            return
        self.Debug(f"{symbol.Value}'s buy volume is {symbolData.buyVolume} and sell volume is {symbolData.sellVolume} for today")
        self.Log(f"{symbol.Value}'s buy volume is {symbolData.buyVolume} and sell volume is {symbolData.sellVolume} for today")
        symbolData.ClearDay()
class SymbolData:
    """Per-symbol volume bookkeeping: daily totals, five rolling buckets and the last bid/ask."""

    def __init__(self):
        # Daily totals, reset by ClearDay().
        self.buyVolume = 0
        self.sellVolume = 0
        # Rolling buckets; bucket 1 is the one currently being filled.
        self.buyRollingVolume1 = 0
        self.sellRollingVolume1 = 0
        self.buyRollingVolume2 = 0
        self.sellRollingVolume2 = 0
        self.buyRollingVolume3 = 0
        self.sellRollingVolume3 = 0
        self.buyRollingVolume4 = 0
        self.sellRollingVolume4 = 0
        self.buyRollingVolume5 = 0
        self.sellRollingVolume5 = 0
        # Intra totals, reset by ClearIntra(); created here so they can be read before the first reset.
        self.buyIntraVolume = 0
        self.sellIntraVolume = 0
        self.bidPrice = 0
        self.askPrice = 0
        self.canShort = True
        self.canLong = True

    @property
    def buyRollingVolume(self):
        """Current sum of the five buy buckets (computed on access, never stale)."""
        return (self.buyRollingVolume1 + self.buyRollingVolume2 + self.buyRollingVolume3
                + self.buyRollingVolume4 + self.buyRollingVolume5)

    @property
    def sellRollingVolume(self):
        """Current sum of the five sell buckets (computed on access, never stale)."""
        return (self.sellRollingVolume1 + self.sellRollingVolume2 + self.sellRollingVolume3
                + self.sellRollingVolume4 + self.sellRollingVolume5)

    def ClearDay(self):
        """Resets the daily buy and sell totals."""
        self.buyVolume = 0
        self.sellVolume = 0

    def ClearIntra(self):
        """Resets the intra buy and sell totals."""
        self.buyIntraVolume = 0
        self.sellIntraVolume = 0