| Overall Statistics |
|
Total Orders 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Start Equity 100000 End Equity 100000 Net Profit 0% Sharpe Ratio 0 Sortino Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio 0 Tracking Error 0 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset Portfolio Turnover 0% |
from numpy import average as npAverage
from numpy import nan as npNaN
from numpy import log as npLog
from numpy import power as npPower
from numpy import sqrt as npSqrt
from numpy import zeros_like as npZeroslike
from numba import njit, jit
def zero_crossing(z):
    """Classify the most recent sign change in a numeric sequence.

    Exact zeros are dropped first, so a value that only touches zero is not
    treated as a crossing.

    Args:
        z: One-dimensional sequence of numbers (NumPy array, pandas Series or
            plain list). It is converted with ``np.asarray`` so every index
            used below is positional rather than label based.

    Returns:
        * ``1`` or ``-1`` (NumPy scalar): sign of the first non-zero value
          after the last sign change, when at least one change exists.
        * ``2`` or ``-2``: no sign change and the last non-zero value is
          positive / negative.
        * ``0``: nothing is left after dropping zeros (empty or all-zero
          input), or the last value is NaN.
    """
    values = np.asarray(z)
    values = values[values != 0]

    # Without this guard values[-1] below raises IndexError for empty or
    # all-zero input.
    if values.size == 0:
        return 0

    # Positions i where sign(values[i]) != sign(values[i + 1]).
    crossings = np.where(np.diff(np.sign(values)))[0]
    if crossings.size:
        return np.sign(values[crossings[-1] + 1])

    last = values[-1]
    if last > 0:
        return 2
    if last < 0:
        return -2
    # Only reachable when the last value is NaN (neither > 0 nor < 0).
    return 0
@njit
def jma(close, length=None, phase=None, **kwargs):
    """Jurik Moving Average (JMA).

    Mark Jurik's Moving Average (JMA) attempts to eliminate noise to see the "true"
    underlying activity. It has extremely low lag, is very smooth and is responsive
    to market gaps.

    Sources:
        https://c.mql5.com/forextsd/forum/164/jurik_1.pdf
        https://www.prorealcode.com/prorealtime-indicators/jurik-volatility-bands/

    Calculation:
        Default Inputs:
            length=7, phase=0

    Args:
        close: Array of prices. The body uses ``close[0]``, ``close.shape[0]``
            and ``zeros_like(close)``; the callers in this file pass
            ``Series.values``.
        length (int): Period of calculation. A falsy or non-positive value
            falls back to 7.
        phase (float): How heavy/light the average is. The derived phase
            ratio saturates at 0.5 for phase < -100 and at 2.5 for
            phase > 100. Default: 0

    Kwargs:
        Accepted by the signature but never read in the body.
        NOTE(review): numba documents explicit ``**kwargs`` as unsupported in
        nopython mode - confirm this signature compiles under ``@njit``.

    Returns:
        An array created with ``zeros_like(close)`` holding the smoothed
        values; its first ``length - 1`` entries are set to NaN. Returns
        ``None`` when ``close`` is ``None``.
    """
    # Validate Arguments
    _length = int(length) if length and length > 0 else 7
    phase = float(phase) if phase and phase != 0 else 0
    if close is None: return
    # Define base variables (the local name `jma` shadows this function inside the body)
    jma = npZeroslike(close)
    volty = npZeroslike(close)
    v_sum = npZeroslike(close)
    # ub / lb record the band values per step; they are filled but not returned.
    ub = npZeroslike(close)
    lb = npZeroslike(close)
    kv = det0 = det1 = ma2 = 0.0
    # Seed the output, the first-stage average and both bands with the first price.
    jma[0] = ma1 = uBand = lBand = close[0]
    # Static variables
    sum_length = 10
    # From here on `length` is the half-period; `_length` keeps the integer period.
    length = 0.5 * (_length - 1)
    # Phase ratio: 1.5 + phase / 100, saturated to [0.5, 2.5] outside [-100, 100].
    pr = 0.5 if phase < -100 else 2.5 if phase > 100 else 1.5 + phase * 0.01
    length1 = max((npLog(npSqrt(length)) / npLog(2.0)) + 2.0, 0)
    pow1 = max(length1 - 2.0, 0.5)
    length2 = length1 * npSqrt(length)
    bet = length2 / (length2 + 1)
    beta = 0.45 * (_length - 1) / (0.45 * (_length - 1) + 2.0)
    m = close.shape[0]
    for i in range(1, m):
        price = close[i]
        # Price volatility: larger absolute distance to either band,
        # or 0 when both distances are equal.
        del1 = price - uBand
        del2 = price - lBand
        volty[i] = max(abs(del1),abs(del2)) if abs(del1)!=abs(del2) else 0
        # Relative price volatility factor
        # v_sum is updated incrementally using the volatility from sum_length steps back.
        v_sum[i] = v_sum[i - 1] + (volty[i] - volty[max(i - sum_length, 0)]) / sum_length
        # Mean of v_sum over at most the last 66 entries (i - 65 .. i).
        avg_volty = npAverage(v_sum[max(i - 65, 0):i + 1])
        d_volty = 0 if avg_volty ==0 else volty[i] / avg_volty
        # Clamp the relative volatility to [1.0, length1 ** (1 / pow1)].
        r_volty = max(1.0, min(npPower(length1, 1 / pow1), d_volty))
        # Jurik volatility bands
        pow2 = npPower(r_volty, pow1)
        kv = npPower(bet, npSqrt(pow2))
        # A band snaps to the price when the price moves beyond it,
        # otherwise it moves toward the price by the factor kv.
        uBand = price if (del1 > 0) else price - (kv * del1)
        lBand = price if (del2 < 0) else price - (kv * del2)
        ub[i] = uBand
        lb[i] = lBand
        # Jurik Dynamic Factor (`power` is the same expression as `pow2` above)
        power = npPower(r_volty, pow1)
        alpha = npPower(beta, power)
        # 1st stage - preliminary smoothing by adaptive EMA
        ma1 = ((1 - alpha) * price) + (alpha * ma1)
        # 2nd stage - one more preliminary smoothing by Kalman filter
        det0 = ((price - ma1) * (1 - beta)) + (beta * det0)
        ma2 = ma1 + pr * det0
        # 3rd stage - final smoothing by unique Jurik adaptive filter
        det1 = ((ma2 - jma[i - 1]) * (1 - alpha) * (1 - alpha)) + (alpha * alpha * det1)
        jma[i] = jma[i-1] + det1
    # Blank out the warm-up region: the first _length - 1 values become NaN.
    jma[0:_length - 1] = npNaN
    ub[0:_length - 1] = npNaN
    lb[0:_length - 1] = npNaN
    return jma
def dmx(high, low, close, length=10, phase=-100):
    """Directional-movement oscillator built from JMA-smoothed components.

    The true range and the positive / negative directional moves are each
    smoothed with ``jma`` and combined as
    ``100 * (dip - din) / (dip + din)``, where ``dip`` and ``din`` are the
    smoothed moves expressed as a percentage of the smoothed true range.

    Args:
        high: pandas Series of bar highs.
        low: pandas Series of bar lows.
        close: pandas Series of bar closes.
        length: Period handed to ``jma``; also the number of leading values
            forced to 0 in every smoothed array and in the result.
        phase: Phase handed to ``jma``.

    Returns:
        The array produced by the arithmetic on the ``jma`` outputs, with the
        first ``length`` entries set to 0. Entries where the denominator is
        zero are left as produced by the division.
    """

    def _smooth(raw):
        # Run the JMA filter, then zero its warm-up region.
        filtered = jma(raw, length=length, phase=phase)
        filtered[0:length] = 0
        return filtered

    # True range measured against the previous close (first bar back-filled).
    prev_close = close.shift(1).bfill()
    range_top = np.maximum(prev_close.values, high.values)
    range_bottom = np.minimum(prev_close.values, low.values)
    smoothed_range = _smooth(range_top - range_bottom)

    # Raw directional moves: only the dominant, strictly positive move counts.
    rise = high - high.shift(1)
    fall = low.shift(1) - low
    plus_move = ((rise > fall) & (rise > 0)) * rise
    minus_move = ((fall > rise) & (fall > 0)) * fall

    smoothed_plus = _smooth(plus_move.fillna(0).values)
    smoothed_minus = _smooth(minus_move.fillna(0).values)

    plus_index = 100 * smoothed_plus / smoothed_range
    minus_index = 100 * smoothed_minus / smoothed_range

    result = 100 * (plus_index - minus_index)/(plus_index + minus_index)
    result[0:length] = 0
    # An optional second JMA pass over the result (length=8, phase=-50) is
    # intentionally not applied.
    return result
def rsx(close, length=14, phase=0):
    """Relative-strength style oscillator using ``jma`` as the smoother.

    Bar-to-bar changes of ``close`` are split into gains and losses, each
    series is smoothed with ``jma``, and the result is
    ``100 * gains / (gains + losses)``.

    Args:
        close: pandas Series of closing prices (``diff`` is called on it).
        length: Period handed to ``jma``; also the number of leading result
            values forced to 0.
        phase: Phase handed to ``jma``.

    Returns:
        The array produced by the arithmetic on the ``jma`` outputs, with the
        first ``length`` entries set to 0.
    """
    change = close.diff()
    falling = change < 0

    # Capture the loss magnitudes and the zero template before `change`
    # itself is modified in place below.
    loss_size = -change[falling]
    losses = change * 0
    gains = change

    gains[falling] = 0
    losses[falling] = loss_size

    smoothed_gain = jma(gains.fillna(0).values, length=length, phase=phase)
    smoothed_loss = jma(losses.fillna(0).values, length=length, phase=phase)

    strength = 100 * smoothed_gain/(smoothed_gain + smoothed_loss)
    strength[0:length] = 0
    # An optional second JMA pass over the result (length=8, phase=-50) is
    # intentionally not applied.
    return strength
def rsx_win(x):
    """Clamp ``x`` to the closed interval [0, 100].

    Values below 0 give ``0``, values above 100 give ``100``; anything else
    (including a value that compares false both ways, such as NaN) is
    returned unchanged.
    """
    floor, ceiling = 0, 100
    return floor if x < floor else ceiling if x > ceiling else x
# region imports
from AlgorithmImports import *
from analyse import *
from datetime import datetime
# endregion
class NqTrend(QCAlgorithm):
    """Record minute bars of the NASDAQ-100 E-mini future into ``self.df``.

    No orders are placed. Every bar whose ``end_time`` lies in the recording
    window is appended as one row (time, open, high, low, close, volume), and
    the collected frame is printed when the algorithm ends.
    """

    def initialize(self):
        """Set start date and cash, subscribe to the future, create the buffer."""
        self.set_start_date(2025, 6, 5)
        self.set_cash(100000)
        # Number of on_data calls; reported by OnEndOfAlgorithm.
        self.i = 0
        self.future = self.add_future(
            Futures.Indices.NASDAQ_100_E_MINI,
            Resolution.MINUTE,
            extended_market_hours=True,
            data_normalization_mode=DataNormalizationMode.BACKWARDS_RATIO,
            data_mapping_mode=DataMappingMode.LAST_TRADING_DAY,
            contract_depth_offset=0,
        )
        self.future.set_filter(lambda future_filter_universe: future_filter_universe.front_month())
        # One row per recorded bar.
        self.df = pd.DataFrame()

    def OnEndOfAlgorithm(self):
        """Print the on_data call count and the recorded bars."""
        self.debug(self.i)
        self.debug(self.df)

    def on_data(self, data: Slice):
        """Count the slice and record its bar when it falls in the window.

        Slices that carry no bar for ``self.future.symbol`` are only counted.
        """
        self.i = self.i + 1
        bar = data.bars.get(self.future.symbol)
        if not bar:
            return
        if self._in_recording_window(bar.end_time):
            self._record_bar(bar)
        # TODO: the planned session phases are not implemented yet:
        #   10:00-11:55 and 12:16-15:55  generate signals
        #   11:56-12:00 and 15:56-15:59  close out on stop conditions
        #   12:01-12:15                  mandatory close-out
        #   after 15:59                  clear buffers
        # When adding them, test the narrow windows before any broad hour
        # range; a broad `10 <= hour <= 16` check placed first makes every
        # later, narrower branch unreachable.

    @staticmethod
    def _in_recording_window(end_time):
        """Return True for bar end times from 08:31 through 16:59."""
        if end_time.hour == 8:
            return end_time.minute >= 31
        return 9 <= end_time.hour <= 16

    def _record_bar(self, bar):
        """Append one bar to ``self.df`` as a new row with a fresh 0..n-1 index."""
        row = pd.DataFrame(
            {
                'time': bar.end_time,
                'open': bar.open,
                'high': bar.high,
                'low': bar.low,
                'close': bar.close,
                'volume': bar.volume,
            },
            index=[1],
        )
        # NOTE: concat copies the whole frame on every bar; acceptable for a
        # short backtest, but collect rows in a list if the run gets long.
        self.df = pd.concat([self.df, row], ignore_index=True)