| Overall Statistics |
|
Total Orders 411 Average Win 8.77% Average Loss -2.80% Compounding Annual Return 55.510% Drawdown 73.400% Expectancy 0.652 Start Equity 1000000.00 End Equity 14524956.74 Net Profit 1352.496% Sharpe Ratio 1.019 Sortino Ratio 0.97 Probabilistic Sharpe Ratio 35.283% Loss Rate 60% Win Rate 40% Profit-Loss Ratio 3.13 Alpha 0.46 Beta 0.271 Annual Standard Deviation 0.475 Annual Variance 0.225 Information Ratio 0.812 Tracking Error 0.489 Treynor Ratio 1.784 Total Fees $3456758.05 Estimated Strategy Capacity $280000.00 Lowest Capacity Asset ETHUSD 2XR Portfolio Turnover 12.23% Drawdown Recovery 1024 |
#
# Attribution-NonCommercial-NoDerivatives 4.0 International (CC BY-NC-ND 4.0)
# https://creativecommons.org/licenses/by-nc-nd/4.0/
# System.py -- Main API core.
#
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.5.0.0.10862. Copyright 2014 QuantConnect Corporation.
#
########################################################
# Developed by Anastasios N. Bikos. ©DeepConscius All Rights Reserved.
# # mpikos@live.com
# # Copyright ~ ~ ~ 08/01/2026
# # Version ~ ~ ~ 1.0
# # ~ ~ ~
########################################################
# region imports
from AlgorithmImports import *
# endregion
### Library classes are snippets of code/classes you can reuse between projects. They are
### added to projects on compile.
###
### To import this class use the following import with your values subbed in for the {} sections:
### from {libraryProjectName} import {libraryFileName}
###
### Example using your newly imported library from 'Library.py' like so:
###
### from {libraryProjectName} import Library
### x = Library.add(1,1)
### print(x)
###
import talib as ta
class QCPortfolioRiskManagement():
    """Liquidate a symbol when the latest equity change, scaled by leverage, is too large.

    The caller keeps an equity history (see ``CalcEqDrawDown``) and passes it
    to ``ControlDrawdownLevel`` on each check.
    """

    def __init__(self, maximum_drawdown_percent = 0.05):
        """Store the trigger threshold as a non-negative fraction (0.05 == 5%)."""
        self.maximum_drawdown_percent = +np.abs(maximum_drawdown_percent)
        self.account_leverage = 0

    def ControlDrawdownLevel(self, algorithm, dd_total_equity, targets):
        """Liquidate ``targets`` if the last equity step exceeds the threshold.

        Args:
            algorithm: object exposing ``Portfolio.TotalHoldingsValue``,
                ``Portfolio.TotalPortfolioValue``, ``debug`` and ``liquidate``.
            dd_total_equity: sequence of equity samples, oldest first.
            targets: symbol passed to ``algorithm.liquidate`` (wrapped in a list).

        Returns:
            None. Does nothing when equity is zero or fewer than two samples
            exist, because the ratios below would be undefined.
        """
        portfolio_value = algorithm.Portfolio.TotalPortfolioValue
        if portfolio_value == 0:
            # Leverage is holdings / equity; with no equity there is nothing to measure.
            return
        self.account_leverage = algorithm.Portfolio.TotalHoldingsValue / portfolio_value
        if len(dd_total_equity) < 2:
            # A step needs a previous sample to compare against.
            return
        latest = dd_total_equity[-1]
        previous = dd_total_equity[-2]
        if latest == 0 or self.account_leverage == 0:
            return
        # NOTE(review): the absolute change is used, so an equity *gain* of the
        # same size also counts as "drawdown" - confirm this is intended.
        diff = float(np.abs(latest - previous)) / float(latest)
        drawdown = float(diff) / float(self.account_leverage)
        if self.maximum_drawdown_percent < drawdown < 1.0:
            algorithm.debug('DCRiskManagementModel:: (Current)' + "\t" + 'Drawdown:' + "\t" + str(drawdown))
            algorithm.debug("DCRiskManagementModel:: Leverage control ahead!")
            algorithm.liquidate([targets])

    def CalcEqDrawDown(self, algorithm, dd_total_equity):
        """Return ``dd_total_equity`` with the current portfolio value appended."""
        return np.append(dd_total_equity, algorithm.Portfolio.TotalPortfolioValue)
class QCRSIPortfolioRiskManagement():
    """Cut an over-leveraged position back when RSI reads overbought."""

    def __init__(self, rsi_interval = 14):
        """Store the RSI look-back as a non-negative number."""
        self.rsi_interval = +np.abs(rsi_interval)

    def ControlRSI(self, algorithm, history_data, targets, min_border, max_border, rl_weight):
        """Return ``[min_border, max_border, rl_weight]``, adjusted if the RSI rule fires.

        The inputs are returned unchanged when ``history_data`` has no more
        than ``rsi_interval`` samples. Otherwise the latest non-NaN RSI is
        computed; if the symbol's holdings exceed 2.5x portfolio value and
        RSI is above 70, holdings are reset to +/-1.30 following the
        fast/slow comparison, borders are reset and ``rl_weight`` is reduced
        by 0.25 (wrapping to +1 below zero).
        """
        if not len(history_data) > self.rsi_interval:
            return [min_border, max_border, rl_weight]

        closes = np.array(history_data, float).ravel()
        rsi_series = ta.RSI(closes, timeperiod=self.rsi_interval)
        latest_rsi = rsi_series[~np.isnan(rsi_series)][-1]

        symbol = targets
        exposure = (algorithm.portfolio[symbol].holdings_value / algorithm.Portfolio.TotalPortfolioValue)
        if exposure > 2.5 and latest_rsi > 70:
            if algorithm.fast > algorithm.slow:
                algorithm.SetHoldings(symbol, +1.30)
            elif algorithm.fast < algorithm.slow:
                algorithm.SetHoldings(symbol, -1.30)
            min_border, max_border = -1, -0
            rl_weight -= 0.25
            if rl_weight < 0.0:
                rl_weight = +1
        return [min_border, max_border, rl_weight]
class QCDefaultPortfolioRiskManagement():
    """Take-profit, loss-control and margin checks used by the algorithm.

    Each method receives the algorithm object, which must expose
    ``Portfolio`` (``TotalPortfolioValue``, ``TotalUnrealizedProfit``,
    ``MarginRemaining``), ``liquidate``, ``SetHoldings`` and ``debug``.
    """

    def __init__(self, take_profit_factor = 0.125):
        """Store the take-profit fraction (non-negative) and reset internal state."""
        self.take_profit_factor = +np.abs(take_profit_factor)
        self.min_border = -1
        self.max_border = -0
        self.rl_weight = +1
        self.hourly_trade_flag_en = False
        self.exit_stop_loss_flag = False

    def TakeProfit(self, algorithm, targets):
        """Flatten everything once unrealized profit reaches the take-profit fraction.

        Returns:
            ``[min_border, max_border, rl_weight, exit_stop_loss_flag]`` as
            currently stored on the instance.
        """
        unrealized = algorithm.Portfolio.TotalUnrealizedProfit
        threshold = self.take_profit_factor * (algorithm.Portfolio.TotalPortfolioValue)
        if unrealized > 0 and unrealized >= threshold:
            algorithm.liquidate()
            algorithm.SetHoldings(targets, 0.00)
            algorithm.debug("DCRiskManagementModel:: Take Profit!")
            self.min_border = -1
            self.max_border = -0
            self.rl_weight = +1
            self.exit_stop_loss_flag = False
        return [self.min_border, self.max_border, self.rl_weight, self.exit_stop_loss_flag]

    def ControlLoss(self, algorithm, targets, total_equity, min_border, max_border, rl_weight):
        """Adjust borders/weight, or stop out, from the latest relative equity change.

        The current portfolio value is appended to a *local* copy of
        ``total_equity`` and compared with the previous sample:

        * loss of at most 25%: widen borders by 0.25, reduce ``rl_weight`` by
          0.25 (wrapping to +1 below zero), disable trading;
        * loss above 25%: liquidate, reset borders/weight, disable trading;
        * otherwise: enable trading.

        Returns:
            ``[min_border, max_border, rl_weight, hourly_trade_flag_en]``.
            Inputs are returned unchanged when there is no previous sample
            or the current equity is zero.
        """
        history = np.append(total_equity, algorithm.Portfolio.TotalPortfolioValue)
        # An empty history leaves a single sample, so there is no step to measure.
        if history.size >= 2 and history[-1] != 0:
            diff = float(history[-1] - history[-2]) / float(history[-1])
            if diff < 0:
                if np.abs(diff) <= 0.25:
                    algorithm.debug("DCRiskManagementModel:: Do Not Trade==> Not wise trading decision.")
                    min_border -= 0.25
                    max_border += 0.25
                    rl_weight -= 0.25
                    if rl_weight < 0.0:
                        rl_weight = +1
                else:
                    algorithm.liquidate()
                    algorithm.SetHoldings(targets, 0.00)
                    algorithm.debug("DCRiskManagementModel:: Stop Loss!")
                    min_border = -1
                    max_border = -0
                    rl_weight = +1
                self.hourly_trade_flag_en = False
            else:
                self.hourly_trade_flag_en = True
        return [min_border, max_border, rl_weight, self.hourly_trade_flag_en]

    def AdjustPortfolioManagement(self, algorithm, targets, exit_stop_loss_flag, trading_quantum, pred_bet, hist_bet):
        """Check free margin for the next trade and apply the hard stop-loss exit.

        Args:
            exit_stop_loss_flag: True if a previous call stopped the position out.
            trading_quantum: trade size; with ``pred_bet`` it gives the margin needed.
            pred_bet, hist_bet: predicted and historical bet values, compared
                together with ``algorithm.fast``/``algorithm.slow`` after a stop-out.

        Returns:
            ``[can_trade, exit_stop_loss_flag]``. ``can_trade`` is True when
            ``MarginRemaining`` exceeds ``trading_quantum * pred_bet``.
        """
        symbol = targets
        if exit_stop_loss_flag:
            if pred_bet < hist_bet and algorithm.fast < algorithm.slow:
                algorithm.SetHoldings(symbol, +0.5)
            elif pred_bet >= hist_bet and algorithm.fast > algorithm.slow:
                algorithm.SetHoldings(symbol, -0.5)

        # The previous code also computed an unused "weight" here by dividing by
        # TotalPortfolioValue, which raised ZeroDivisionError at zero equity.
        if algorithm.Portfolio.MarginRemaining > (trading_quantum * pred_bet):
            return [True, exit_stop_loss_flag]

        algorithm.debug("DCRiskManagementModel:: Do Not Trade==> Not enough free margin.")
        unrealized = algorithm.Portfolio.TotalUnrealizedProfit
        if unrealized < 0 and np.abs(unrealized) >= (0.125 * (algorithm.Portfolio.TotalPortfolioValue)):
            algorithm.liquidate()
            algorithm.SetHoldings(symbol, 0.00)
            exit_stop_loss_flag = True
            algorithm.debug("DCRiskManagementModel:: EXIT - STOP LOSS!")
        return [False, exit_stop_loss_flag]
# Fractal dynamics helpers
#
# Attribution-NonCommercial-NoDerivatives 4.0 International (CC BY-NC-ND 4.0)
# https://creativecommons.org/licenses/by-nc-nd/4.0/
# System.py -- Main API core.
#
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.5.0.0.10862. Copyright 2014 QuantConnect Corporation.
#
########################################################
# Developed by Anastasios N. Bikos. ©DeepConscius All Rights Reserved.
# # mpikos@live.com
# # Copyright ~ ~ ~ 08/01/2026
# # Version ~ ~ ~ 1.0
# # ~ ~ ~
########################################################
# region imports
from AlgorithmImports import *
# endregion
import numpy as np
def hurst_dfa(prices: np.ndarray, min_window: int = 8, max_window: int = 64) -> float:
    """
    Estimate the Hurst exponent of a price series with first-order
    Detrended Fluctuation Analysis (DFA).

    The DFA profile is built from log-*returns*. Integrating the price levels
    themselves (as the previous version did) measures an exponent of about
    H + 1, which the final clamp turned into 0.95 for any realistic series.

    Args:
        prices: 1-D (or flattenable) array of prices; non-finite values are dropped
            and values are floored at 1e-12 before taking logs.
        min_window: smallest segment length (floored at 4).
        max_window: largest segment length (capped at a quarter of the series).

    Returns:
        H clamped to [0.05, 0.95]; 0.5 when there is too little data or the
        fit cannot be made. H>0.5 trending/persistent, H<0.5
        mean-reverting/anti-persistent.
    """
    x = np.asarray(prices, dtype=float).ravel()
    x = x[np.isfinite(x)]
    if x.size < max(max_window, 32):
        return 0.5

    # Log-returns are the increments; their running sum is the DFA profile.
    log_returns = np.diff(np.log(np.maximum(x, 1e-12)))
    profile = np.cumsum(log_returns - np.mean(log_returns))
    n = profile.size

    # window sizes (log-spaced)
    max_window = int(min(max_window, n // 4))
    min_window = int(max(4, min_window))
    if max_window <= min_window:
        return 0.5
    windows = np.unique(
        np.floor(np.logspace(np.log10(min_window), np.log10(max_window), 8)).astype(int)
    )

    scales = []
    fluctuations = []
    for w in windows:
        if w < 4:
            continue
        m = n // w
        if m < 2:
            continue
        segments = profile[:m * w].reshape(m, w)
        t = np.arange(w, dtype=float)
        # One least-squares line per segment, fitted in a single call
        # (polyfit accepts a 2-D y with one data set per column).
        slope, intercept = np.polyfit(t, segments.T, 1)
        trend = np.outer(slope, t) + intercept[:, None]
        # Segments have equal length, so the overall mean equals the mean of
        # the per-segment mean squared residuals.
        rms = np.sqrt(np.mean((segments - trend) ** 2))
        if np.isfinite(rms) and rms > 0:
            scales.append(w)
            fluctuations.append(rms)

    if len(scales) < 3:
        return 0.5

    # slope in log-log
    H = np.polyfit(np.log(scales), np.log(fluctuations), 1)[0]
    if not np.isfinite(H):
        return 0.5
    # clamp to a sane range
    return float(np.clip(H, 0.05, 0.95))
def higuchi_fd(prices: np.ndarray, kmax: int = 10) -> float:
    """
    Higuchi fractal dimension (FD) estimate for a 1D time series.

    For each lag k the curve length is
    ``L_m(k) = (sum|x[m+ik] - x[m+(i-1)k]| * (N-1) / (steps*k)) / k``
    averaged over offsets m, and FD is minus the log-log slope of L(k)
    against k. The previous version omitted the final ``/ k`` and normalised
    by the number of points instead of the number of steps, so it estimated
    about FD - 1 and the clamp returned 1.05 for essentially every input.

    Args:
        prices: 1-D (or flattenable) series; non-finite values are dropped.
        kmax: largest lag (kept within [2, N // 4]).

    Returns:
        FD clamped to [1.05, 1.95]; 1.5 when there are fewer than 32 samples
        or the fit cannot be made. Higher -> rougher/noisier series.
    """
    x = np.asarray(prices, dtype=float).ravel()
    x = x[np.isfinite(x)]
    n = x.size
    if n < 32:
        return 1.5

    kmax = int(max(2, min(kmax, n // 4)))
    lags = []
    lengths = []
    for k in range(1, kmax + 1):
        per_offset = []
        for m in range(k):
            sub = x[m::k]
            steps = sub.size - 1
            if steps < 1:
                continue
            path = np.sum(np.abs(np.diff(sub)))
            # (n - 1) / (steps * k) rescales the sub-sampled path to the full
            # series; the extra / k expresses it per unit of lag.
            per_offset.append(path * (n - 1) / (steps * k) / k)
        if not per_offset:
            continue
        Lk = float(np.mean(per_offset))
        if np.isfinite(Lk) and Lk > 0:
            lags.append(k)
            lengths.append(Lk)

    if len(lags) < 3:
        return 1.5

    # slope is -FD
    fd = -np.polyfit(np.log(lags), np.log(lengths), 1)[0]
    if not np.isfinite(fd):
        return 1.5
    return float(np.clip(fd, 1.05, 1.95))
def fractal_regime(hurst: float) -> str:
    """
    Map a Hurst exponent to a regime label.

    Returns "unknown" for None or a non-finite value, "trend" at 0.56 and
    above, "mean_revert" at 0.44 and below, and "noise" in between.
    """
    usable = hurst is not None and np.isfinite(hurst)
    if not usable:
        return "unknown"
    if hurst >= 0.56:
        return "trend"
    return "mean_revert" if hurst <= 0.44 else "noise"
#
# Attribution-NonCommercial-NoDerivatives 4.0 International (CC BY-NC-ND 4.0)
# https://creativecommons.org/licenses/by-nc-nd/4.0/
# System.py -- Main API core.
#
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.5.0.0.10862. Copyright 2014 QuantConnect Corporation.
#
########################################################
# Developed by Anastasios N. Bikos. ©DeepConscius All Rights Reserved.
# # mpikos@live.com
# # Copyright ~ ~ ~ 08/01/2026
# # Version ~ ~ ~ 1.5
# # ~ ~ ~
########################################################
from System import *
from QuantConnect.Data import *
from QuantConnect.Algorithm import *
from QuantConnect.Indicators import *
from QuantConnect.Securities import *
from QuantConnect.Data.Consolidators import *
from QuantConnect.Orders import OrderStatus
from QuantConnect import *
from QuantConnect.Orders import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import *
from QuantConnect.Data.Market import TradeBar
from QuantConnect.DataSource import *
from QuantConnect.Data.UniverseSelection import *
from AlgorithmImports import *
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")
import math
import random
import pandas as pd
import numpy as np
from datetime import timedelta, datetime
from datetime import timedelta
from collections import deque
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import talib as ta
from fractal import hurst_dfa, higuchi_fd, fractal_regime
from statistics import mode
from itertools import groupby
import xgboost as xgb
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.model_selection import KFold
from sklearn.model_selection import cross_val_score
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC, LinearSVC, NuSVC, SVR, NuSVR, OneClassSVM
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn import preprocessing
from collections import Counter
from itertools import groupby
from collections import defaultdict
import sympy
from sympy import *
from sympy.utilities.lambdify import lambdify, implemented_function
from sympy.parsing.sympy_parser import parse_expr
from Library import QCDefaultPortfolioRiskManagement, QCPortfolioRiskManagement, QCRSIPortfolioRiskManagement
class MarketDynamics(QCAlgorithm):
# Class-level state. Every `global NAME` below makes the following assignment
# bind a *module-level* variable, so this state is shared by all instances and
# is mutated from methods through their own `global` declarations.
# Only FastEmaPeriod and SlowEmaPeriod are real class attributes.

# --- Scheduling / trade-allowance parameters ---
# X is multiplied with U to form schedule intervals in minutes in Initialize.
global X
X = 60
global xn
xn = int(float(60)/float(X))
global daily_trades_allowance_quant
daily_trades_allowance_quant = 0
global minute_trades_allowance_quant_thresh
minute_trades_allowance_quant_thresh = 1
global daily_trades_allowance_quant_thresh
daily_trades_allowance_quant_thresh = minute_trades_allowance_quant_thresh *24 *xn
global minute_trades_allowance_quant
minute_trades_allowance_quant = 0
global daily_trade_flag_en
daily_trade_flag_en = False
global hourly_trade_flag_en
hourly_trade_flag_en = False
# --- Bet / equity tracking ---
global bet_arr
bet_arr = [0, 0]
global hist_arr
hist_arr = [0, 0]
global start_equity_amount
start_equity_amount = 1000000
global total_equity
total_equity = [start_equity_amount]
# min_border / max_border are sorted and used as the MinMaxScaler feature range
# in ReSchedulePolicy and are updated by the risk-management wrappers.
global min_border
min_border = -1
global max_border
max_border = -0
global margins_border
margins_border = [[-0, -1], [+0, +1]]
global trading_quantum
trading_quantum = 0.01
# `scaler` is declared global here but not assigned in this section.
global scaler
global arr1
arr1 = [-1,-1]
global arr2
arr2 = [-1,-1]
global dataset
dataset = [-1]
global stopPriceBuy
stopPriceBuy = -1
global closeness_factor
closeness_factor = 0.05
global normalized
normalized = []
global comp_price
comp_price = []
global projected_price
projected_price = []
global __quantity
__quantity = 0
# --- Price history buffers ---
global day_close_price_arr
day_close_price_arr = [-1, -1]
global high_price_arr
high_price_arr = [-1, -1]
global low_price_arr
low_price_arr = [-1, -1]
global volume_arr
volume_arr = [-1, -1]
global open_price_arr
open_price_arr = [-1, -1]
global close_price_arr
close_price_arr = [-1, -1]
global datetime_arr
datetime_arr = []
global loss_factor
loss_factor = 0.0125
global rsi_interval
rsi_interval = 14
# U seeds new_interval below and, multiplied by X, sets schedule intervals.
global U
U = 24
global Z
Z = 1
global W
W = 12
# Handles for scheduled events; 1 and 2 are assigned in Initialize.
global scheduled_event_1
scheduled_event_1 = None
global scheduled_event_2
scheduled_event_2 = None
global scheduled_event_3
scheduled_event_3 = None
global scheduled_event_4
scheduled_event_4 = None
global symbol_minimum_trade_order_size
symbol_minimum_trade_order_size = 0.5
global verbage
verbage = "None"
global t_indicator_arr
t_indicator_arr = []
global delta_prediction_arr
delta_prediction_arr = []
# `real` is declared global here but not assigned in this section.
global real
global last_trade_order
last_trade_order = "Null"
global a0_arr
a0_arr = []
global offset
offset = 1000
global sign
sign = +1
global sign_
sign_ = +1
global _sign
_sign = +1
# --- Weekly and numbered per-series buffers (suffixes 1-4) ---
global weekly_stockprices_arr
weekly_stockprices_arr = [[], [], [], []]
global weekly_stockprices_arr1
weekly_stockprices_arr1 = []
global weekly_stockprices_arr2
weekly_stockprices_arr2 = []
global weekly_stockprices_arr3
weekly_stockprices_arr3 = []
global weekly_stockprices_arr4
weekly_stockprices_arr4 = []
global high_price_arr1
high_price_arr1 = []
global high_price_arr2
high_price_arr2 = []
global high_price_arr3
high_price_arr3 = []
global high_price_arr4
high_price_arr4 = []
global low_price_arr1
low_price_arr1 = []
global low_price_arr2
low_price_arr2 = []
global low_price_arr3
low_price_arr3 = []
global low_price_arr4
low_price_arr4 = []
global volume_arr1
volume_arr1 = []
global volume_arr2
volume_arr2 = []
global volume_arr3
volume_arr3 = []
global volume_arr4
volume_arr4 = []
global open_price_arr1
open_price_arr1 = []
global open_price_arr2
open_price_arr2 = []
global open_price_arr3
open_price_arr3 = []
global open_price_arr4
open_price_arr4 = []
global close_price_arr1
close_price_arr1 = []
global close_price_arr2
close_price_arr2 = []
global close_price_arr3
close_price_arr3 = []
global close_price_arr4
close_price_arr4 = []
global day_close_price_arr1
day_close_price_arr1 = []
global day_close_price_arr2
day_close_price_arr2 = []
global day_close_price_arr3
day_close_price_arr3 = []
global day_close_price_arr4
day_close_price_arr4 = []
# ETHUSD_a0 is appended to weekly_stockprices_arr1 by GetWeekData.
global ETHUSD_a0
ETHUSD_a0 = 0
global ethusd_a0
ethusd_a0 = 0
global dotusd_a0
dotusd_a0 = 0
# --- Drawdown tracking (used by ControlDrawdownLevel) ---
global DD_arr
DD_arr = [0.0, 0.0]
global total_equity_
total_equity_ = [start_equity_amount]
global out_DD
out_DD = [-1, -1]
global sign_inp_arr
sign_inp_arr = []
global rsi_intra_daily
rsi_intra_daily = 50
global exp_factor
exp_factor = 1.0
global enable_live_mode
enable_live_mode = True
# Passed to QCDefaultPortfolioRiskManagement in Initialize.
global take_profit_factor
take_profit_factor = 0.30
global mom_array
mom_array = [[], [], [], []]
global rsi_intra_daily_array
rsi_intra_daily_array = [[], [], [], []]
global exposure_risk_factor
exposure_risk_factor = 0.1
global dayscounter
dayscounter = 0
# --- Re-scheduling state (used by ChangeHourFlag / ReSchedulePolicy) ---
global shorts_occurences_arr
shorts_occurences_arr = []
global longs_occurences_arr
longs_occurences_arr = []
global new_interval
new_interval = U
global o_direction
o_direction = 0
# `close` is wrapped in a Series by ControlDrawdownLevel.
global close
close = 0.0
global prediction_price_hpc_XGB_arr
prediction_price_hpc_XGB_arr = [0.0]
global prediction_mom_value_arr
prediction_mom_value_arr = [0.0]
global exp_ma_arr
exp_ma_arr = 24 *[0.0]
global rsi
rsi = -1
global ctr_c
global ctr_w
ctr_c = 0
ctr_w = 0
global episode_list
episode_list = []
global sign_arr
sign_arr = []
global exit_stop_loss_flag
exit_stop_loss_flag = False
# Seeded with one sample; CalcEqDrawDown appends the portfolio value to it.
global dd_total_equity
dd_total_equity = [0]
global rl_weight
rl_weight = +1
# Passed to the RSI risk manager by ControlRSI.
global history_hf
history_hf = [0]
# Class attributes; FastEmaPeriod is passed to timedelta() for insight periods.
FastEmaPeriod = 12
SlowEmaPeriod = 26
global target_dir
target_dir = 0
def main_job(self, pos):
    """Flatten `pos` to a float64 vector and return `self.lagrangepoly` of it."""
    flat_values = np.ravel(np.array(pos, dtype='float64'))
    return self.lagrangepoly(flat_values)
# Module-level sympy building blocks used by lagrangepoly:
#   x        - the polynomial's symbol
#   zeropoly - the expression x - x (additive starting value)
#   onepoly  - zeropoly + 1 (multiplicative starting value)
global x
global zeropoly
global onepoly
x = sympy.symbols('x')
zeropoly = x - x
onepoly = zeropoly + 1
# Module-level `result`; lagrangepoly assigns its own local of the same name.
global result
result = zeropoly
def create_dataset(self, dataset, look_back=1):
    """Build supervised (window, next value) pairs from column 0 of `dataset`.

    Sample i is ``dataset[i:i + look_back, 0]`` with target
    ``dataset[i + look_back, 0]``; ``len(dataset) - look_back - 1`` samples
    are produced (none when that count is not positive).

    Returns:
        Tuple ``(X, y)`` of NumPy arrays.
    """
    sample_count = len(dataset) - look_back - 1
    windows = [dataset[start:start + look_back, 0] for start in range(sample_count)]
    targets = [dataset[start + look_back, 0] for start in range(sample_count)]
    return np.array(windows), np.array(targets)
def scale(self, X, x_min, x_max):
    """Min-max scale `X` along axis 0 into ``[x_min, x_max]``.

    Columns with zero range map to ``x_min`` (their divisor is replaced by 1).
    1-D input is supported as well: its range is a NumPy scalar, which does
    not allow the item assignment the array path uses, so it is handled
    separately.

    Args:
        X: array-like exposing ``min``/``max`` with an ``axis`` argument.
        x_min, x_max: bounds of the target range.

    Returns:
        The scaled values, same shape as `X`.
    """
    lowest = X.min(axis=0)
    nom = (X - lowest) * (x_max - x_min)
    denom = X.max(axis=0) - lowest
    if np.ndim(denom) == 0:
        denom = denom if denom != 0 else 1
    else:
        denom[denom == 0] = 1
    return x_min + nom / denom
def LSTM(self, dataset, scaler_factor):
    """Fit a small Keras LSTM on `dataset` and return its last test prediction.

    Args:
        dataset: NumPy array; reshaped to a single column before use.
        scaler_factor: object with ``inverse_transform``, applied to the
            predictions and targets after fitting.

    Returns:
        One-element list holding the last inverse-transformed test prediction.

    NOTE(review): in the visible caller (ReSchedulePolicy) the scaler is
    fitted but `dataset` is passed unscaled, while the outputs here are
    inverse-transformed - confirm the intended scaling.
    """
    # NOTE(review): seeds NumPy only; no TensorFlow/Keras seed is set in this block.
    np.random.seed(7)
    dataset = dataset.reshape(-1, 1)
    # split into train and test sets
    train_size = int(len(dataset) * 0.67)
    test_size = len(dataset) - train_size  # not used below
    train, test = dataset[0:train_size,:], dataset[train_size:len(dataset),:]
    # reshape into X=t and Y=t+1
    look_back = 10
    trainX, trainY = self.create_dataset(train, look_back)
    testX, testY = self.create_dataset(test, look_back)
    # reshape input to be [samples, time steps, features]
    trainX = np.reshape(trainX, (trainX.shape[0], 1, trainX.shape[1]))
    testX = np.reshape(testX, (testX.shape[0], 1, testX.shape[1]))
    # create and fit the LSTM network
    # Inside this method `LSTM` resolves to the module-level Keras layer, not this method.
    model = Sequential()
    model.add(LSTM(36, input_shape=(1, look_back)))
    model.add(Dense(24))
    model.add(Dense(12))
    model.add(Dense(1))
    # NOTE(review): binary_crossentropy is used although the output layer is a
    # plain Dense(1) fitted to numeric targets - confirm the loss choice.
    model.compile(loss='binary_crossentropy', optimizer='adam')
    model.fit(trainX, trainY, epochs=100, batch_size=1000, verbose=0)
    # make predictions
    trainPredict = model.predict(trainX)
    testPredict = model.predict(testX)
    # invert predictions
    trainPredict = scaler_factor.inverse_transform(trainPredict)
    trainY = scaler_factor.inverse_transform([trainY])
    testPredict = scaler_factor.inverse_transform(testPredict)
    testY = scaler_factor.inverse_transform([testY])
    # calculate root mean squared error
    trainScore = math.sqrt(mean_squared_error(trainY[0], trainPredict[:,0]))
    print('Train Score: %.2f RMSE' % (trainScore))
    testScore = math.sqrt(mean_squared_error(testY[0], testPredict[:,0]))
    print('Test Score: %.2f RMSE' % (testScore))
    return [testPredict[-1, -1]]
# --- helper: convert { -1,0,+1 } -> float and compute a vote score
def _vote_score(self, tot, n1, n2):
# weights: tot carries more information, then n1, n2
return (0.10 * float(tot) + 0.45 * float(n1) + 0.45 * float(n2))
def _clip(self, x, lo, hi):
return max(lo, min(hi, x))
# 2) RSI signals -> produce a signed bias in [-1, +1]
# In mean-revert: RSI<30 => bullish, RSI>70 => bearish (contrarian)
# In trend: RSI<30 => bearish, RSI>70 => bullish (confirmatory / “weakness continues”)
def rsi_bias(self, r):
    """Return +1.0 when `r` is below 30, -1.0 when above 70, otherwise 0.0."""
    return +1.0 if r < 30 else (-1.0 if r > 70 else 0.0)
def lagrangepoly(self, yseq, xseq=None):
    """Return the expanded Lagrange interpolating polynomial through the given points.

    `yseq` holds the ordinates; `xseq` the abscissae, defaulting to
    1, 2, ..., len(yseq). Both sequences must have the same length.
    """
    if xseq is None:
        xseq = list(range(1, len(yseq) + 1))
    assert len(yseq) == len(xseq)

    total = zeropoly
    for node_idx, (x_node, y_node) in enumerate(zip(xseq, yseq)):
        # Basis polynomial: equals 1 at x_node and 0 at every other node.
        basis = onepoly
        for other_idx, x_other in enumerate(xseq):
            if other_idx == node_idx:
                continue
            basis *= (x - x_other) / (x_node - x_other)
        total += y_node * basis
    return sympy.expand(total)
def get_last_invested_unrealized_profit(self) -> float:
    """Return the portfolio's total unrealised profit, or 0.0 when nothing is invested."""
    if not self.portfolio.invested:
        return 0.0
    return self.portfolio.total_unrealised_profit
def TakeProfit(self):
    """Close the ETHUSD position once unrealized profit reaches 2.5 x ATR x |quantity|.

    Returns immediately when no quantity is held. Previously a flat position
    gave a threshold of 0 and a profit of 0.0, so ``0 >= 0`` fired the exit:
    Down insights were emitted and "Take Profit!" was logged with nothing to
    close.
    """
    position_size = abs(self.Portfolio[self.symbol].Quantity)
    if position_size == 0:
        return

    last_unrealized_profit = self.get_last_invested_unrealized_profit()
    threshold = 2.5 * self.atr.Current.Value * position_size
    # Written as "not >=" so a NaN profit or threshold never triggers the exit.
    if not (last_unrealized_profit >= threshold):
        return

    for ticker in self.tickers:
        self.EmitInsights(
            Insight.Price(ticker, timedelta(self.FastEmaPeriod), InsightDirection.Down)
        )
    self.Liquidate("ETHUSD")
    self.SetHoldings("ETHUSD", 0.0)
    self.Debug("Take Profit!")
def ControlDrawdownLevel(self):
    """Record equity and liquidate ETHUSD when the leverage-scaled equity step exceeds 10%.

    Appends the current portfolio value to the module-level `total_equity_`,
    then returns early if `self.equity_curve` is empty.
    """
    global total_equity_
    global DD_arr
    global close
    global U
    global X
    total_equity_ = np.append(total_equity_, self.Portfolio.TotalPortfolioValue)
    # NOTE(review): the value returned by `_append` is discarded. If it returns
    # a new Series (as pandas' Series._append does), `self.equity_curve` never
    # grows and the `else: return` below always runs - confirm.
    self.equity_curve._append(pd.Series(close))
    if not self.equity_curve.empty:
        #self.Plot('Drawdown Percent', 'Current', self.current_drawdown_percent(self.equity_curve.to_numpy()))
        current_dd_percent = self.current_drawdown_percent(self.equity_curve.to_numpy())
        max_dd_percent = self.max_drawdown_percent(self.equity_curve.to_numpy())  # not used below
        current_dd = current_dd_percent / 100
        #self.Debug('Drawdown Percent' + "\t" + '(Current):' + "\t" + str(current_dd))
    else:
        return
    self.account_leverage = self.Portfolio.TotalHoldingsValue / self.Portfolio.TotalPortfolioValue
    if total_equity_[-1] != 0 and self.account_leverage != 0:
        # Absolute relative change between the last two equity samples, divided by leverage.
        diff = float(np.abs(total_equity_[-1] - total_equity_[-2])) / float(total_equity_[-1])
        drawdown = float(diff) / float(self.account_leverage)
        # mean_drawdawn is computed but not used below.
        if np.isclose([drawdown], [current_dd]):
            mean_drawdawn = np.mean([drawdown, current_dd])
        else:
            mean_drawdawn = current_dd
        DD_arr = np.append(DD_arr, self.truncate(drawdown, 4))
        if DD_arr[-1] != 0.0:
            # DD_delta is computed but only referenced by the disabled condition below.
            DD_delta = float(np.abs(DD_arr[-1] - DD_arr[-2])) / float(DD_arr[-1])
            '''if DD_delta >= 1.25:'''
            if drawdown > 0.10:
                self.Debug('Drawdown' + "\t" + '(Current):' + "\t" + str(drawdown))
                self.Debug("Leverage control ahead!")
                for ticker in self.tickers:
                    self.EmitInsights(
                        Insight.Price(ticker, timedelta(self.FastEmaPeriod), InsightDirection.Down)
                    )
                self.Liquidate("ETHUSD")
                self.SetHoldings("ETHUSD", 0.00)
def current_drawdown_percent(self, equity_curve):
    """Return how far the last value sits below the curve's maximum, in percent."""
    peak = max(equity_curve)
    latest = equity_curve[-1]
    shortfall = peak - latest
    return shortfall / peak * 100
def max_drawdown_percent(self, equity_curve):
    """Return the largest peak-to-trough decline of a NumPy equity curve, in percent.

    Returns NaN when the deepest trough is the first element (no earlier peak).
    """
    running_peak = np.maximum.accumulate(equity_curve)
    trough_idx = np.argmax(running_peak - equity_curve)
    before_trough = equity_curve[:trough_idx]
    if before_trough.size == 0:
        return np.nan
    peak_idx = np.argmax(before_trough)
    ratio = equity_curve[trough_idx] / equity_curve[peak_idx]
    return abs(ratio - 1) * 100
def truncate(self, number, decimals=0):
    """Cut `number` off (towards zero) after `decimals` decimal places.

    Raises:
        TypeError: if `decimals` is not an int.
        ValueError: if `decimals` is negative.
    """
    if not isinstance(decimals, int):
        raise TypeError("decimal places must be an integer.")
    if decimals < 0:
        raise ValueError("decimal places has to be 0 or more.")
    if decimals == 0:
        return math.trunc(number)
    scale_factor = 10.0 ** decimals
    return math.trunc(number * scale_factor) / scale_factor
def AdaptEquityInvestement(self):
    """Raise the module-level `trading_quantum` by 0.001 and log the margin in use."""
    global trading_quantum
    trading_quantum = trading_quantum + 0.001
    self.Debug("Month change!")
    margin_in_use = self.Portfolio.TotalMarginUsed
    self.Debug("Portfolio invested cash (current) :" + str(margin_in_use))
def GetWeekData(self):
    """Append the module-level `ETHUSD_a0` to the weekly series and rebuild the weekly list."""
    global weekly_stockprices_arr, weekly_stockprices_arr1, ETHUSD_a0
    extended_series = np.append(weekly_stockprices_arr1, ETHUSD_a0)
    weekly_stockprices_arr1 = extended_series
    weekly_stockprices_arr = [extended_series]
def TakeProfit1(self):
    """Run the risk manager's take-profit check and store its result in module globals."""
    global min_border, max_border, rl_weight, exit_stop_loss_flag
    outcome = self.obj1.TakeProfit(self, self.symbol)
    min_border, max_border, rl_weight, exit_stop_loss_flag = outcome
def CalcEqDrawDown(self):
    """Replace the module-level `dd_total_equity` with the history extended by `self.obj2`."""
    global dd_total_equity
    extended_history = self.obj2.CalcEqDrawDown(self, dd_total_equity)
    dd_total_equity = extended_history
def ControlEqDrawdownLevel(self):
    """Hand the module-level equity history to `self.obj2` for its drawdown check."""
    global dd_total_equity
    equity_history = dd_total_equity
    self.obj2.ControlDrawdownLevel(self, equity_history, self.symbol)
def ControlLoss(self):
    """Run the risk manager's loss control and store its result in module globals."""
    global total_equity, min_border, max_border, rl_weight, hourly_trade_flag_en
    outcome = self.obj1.ControlLoss(
        self, self.symbol, total_equity, min_border, max_border, rl_weight
    )
    min_border, max_border, rl_weight, hourly_trade_flag_en = outcome
def ControlRSI(self):
    """Run the RSI risk manager on `history_hf` and store its result in module globals."""
    global history_hf, min_border, max_border, rl_weight
    outcome = self.obj3.ControlRSI(
        self, history_hf, self.symbol, min_border, max_border, rl_weight
    )
    min_border, max_border, rl_weight = outcome
def GetClose(self):
    """Liquidate the portfolio, set ETHUSD holdings to zero and log the action."""
    ticker = "ETHUSD"
    self.Liquidate()
    self.SetHoldings(ticker, 0)
    self.Log("Liquidating..")
def Initialize(self):
    """Set up the algorithm: brokerage, cash, the ETHUSD subscription, indicators,
    risk-management helpers, scheduled events, consolidators and instance state."""
    global X
    global U
    global Z
    global W
    global scheduled_event_1
    global scheduled_event_2
    global scheduled_event_3
    global scheduled_event_4
    global take_profit_factor
    self.SetBrokerageModel(BrokerageName.ALPHA_STREAMS)
    # Starts empty; see ControlDrawdownLevel for how it is used.
    self.equity_curve = pd.Series()
    self.SetStartDate(2020, 1, 1)
    self.SetCash(start_equity_amount)
    self.tickers = ["ETHUSD"]
    self.StartingCapital = self.Portfolio.Cash
    self.leverage_targets = 10.0
    self.leverage_targets_default = 1.0
    # Hourly ETHUSD subscription at the leverage configured above.
    self.symbol = self.AddCrypto(self.tickers[0], Resolution.HOUR, fillForward=True, leverage=self.leverage_targets).Symbol
    self.targets = "ETHUSD"
    self.closeWindow1 = RollingWindow[float](1440)
    # --- Fractal dynamics configuration (regime detection & position sizing) ---
    # NOTE: These do NOT guarantee any ROI; they help adapt the strategy to trending vs mean-reverting regimes.
    self.fractal_window = 512 # lookback (bars) for fractal metrics
    self.fractal_kmax = 10 # Higuchi k_max
    # NOTE(review): an earlier comment described this cap as "150% notional",
    # but the value is 10.0 - confirm which is intended.
    self.max_abs_holdings = 10.0 # cap target holdings
    self.fractal_hurst = 0.50
    self.fractal_fd = 1.50
    self.fractal_regime = "unknown" # {"trend","mean_revert","noise","unknown"}
    self.fractal_ready = False
    self.entradeflag = True
    # Daily indicators; fast/slow are compared by the risk-management classes.
    self.fast = self.EMA(self.symbol, 30, Resolution.DAILY)
    self.slow = self.EMA(self.symbol, 60, Resolution.DAILY)
    self.atr = self.ATR(self.symbol, 14, MovingAverageType.SIMPLE, Resolution.DAILY)
    self.sma_arr = []
    self.period = 1
    self.ma = 2
    self.donchian = self.DCH(self.symbol, self.period, self.period, Resolution.DAILY)
    self.sma = self.SMA(self.symbol, self.ma, Resolution.DAILY)
    # Risk-management helpers. Each is built with defaults and then has
    # __init__ called again explicitly with the configured value.
    self.obj1 = QCDefaultPortfolioRiskManagement()
    self.obj1.__init__(take_profit_factor)
    self.obj2 = QCPortfolioRiskManagement()
    self.obj2.__init__(0.05)
    self.obj3 = QCRSIPortfolioRiskManagement()
    self.obj3.__init__(14)
    # --- Scheduled events; U * X is the interval in minutes ---
    scheduled_event_1 = self.Schedule.On(self.DateRules.EveryDay(), \
        self.TimeRules.Every(timedelta(minutes=(U *X))), \
        self.ChangeHourFlag)
    self.Schedule.On(self.DateRules.WeekStart("ETHUSD"), \
        self.TimeRules.At(0, 0, 0), \
        self.GetWeekData)
    scheduled_event_2 = self.Schedule.On(self.DateRules.EveryDay(), \
        self.TimeRules.Every(timedelta(minutes=(U *X))), \
        self.ControlDrawdownLevel)
    # Disabled: the string literal below keeps the old TakeProfit schedule as dead text.
    '''
scheduled_event_3 = self.Schedule.On(self.DateRules.EveryDay(), \
self.TimeRules.Every(timedelta(minutes=(U *X))), \
self.TakeProfit)
'''
    self.Schedule.On(self.DateRules.EveryDay(), \
        self.TimeRules.Every(timedelta(minutes=(15))), \
        self.TakeProfit1)
    # Disabled: ControlLoss and ControlRSI schedules kept as dead text.
    '''
self.Schedule.On(self.DateRules.EveryDay(), \
self.TimeRules.Every(timedelta(minutes=(U *X))), \
self.ControlLoss)
self.Schedule.On(self.DateRules.EveryDay(), \
self.TimeRules.Every(timedelta(minutes=(15))), \
self.ControlRSI)
'''
    self.Schedule.On(self.DateRules.EveryDay(), \
        self.TimeRules.Every(timedelta(minutes=(U *X))), \
        self.CalcEqDrawDown)
    self.Schedule.On(self.DateRules.EveryDay(), \
        self.TimeRules.Every(timedelta(minutes=(U *X))), \
        self.ControlEqDrawdownLevel)
    self.Schedule.On(self.DateRules.MonthStart("ETHUSD"), \
        self.TimeRules.At(0, 0, 0), \
        self.AdaptEquityInvestement)
    self.Schedule.On(self.DateRules.EveryDay(),
        self.TimeRules.At(9, 31),
        Action(self.One))
    # NOTE(review): stored on the instance, while events 1 and 2 are stored in
    # module globals; the global scheduled_event_4 declared above stays unassigned here.
    self.scheduled_event_4 = self.Schedule.On(self.DateRules.EveryDay(), \
        self.TimeRules.Every(timedelta(hours=(1))), \
        self.ReSchedulePolicy)
    self.SetWarmup(1440)
    self.warmup_period = 1440
    # --- Consolidators feeding the rolling windows below ---
    consolidator1 = TradeBarConsolidator(timedelta(1))
    consolidator1.DataConsolidated += self.OnDailyData
    self.SubscriptionManager.AddConsolidator("ETHUSD", consolidator1)
    # NOTE(review): constructed with the integer 60 although the handler is named
    # OnMinuteData - confirm whether 60 is a bar count or a period.
    consolidatorm1 = TradeBarConsolidator(60)
    consolidatorm1.DataConsolidated += self.OnMinuteData
    self.SubscriptionManager.AddConsolidator("ETHUSD", consolidatorm1)
    self.daily1 = RollingWindow[TradeBar](2)
    self.minute1 = RollingWindow[TradeBar](2)
    self.window1 = RollingWindow[TradeBar](2)
    self._newHour = False
    self._newDay = False
    global daily_trade_flag_en
    daily_trade_flag_en = False
    global hourly_trade_flag_en
    hourly_trade_flag_en = False
    # --- Instance state used by the trading logic ---
    self.dayswarmup = 60
    self._period = (24 *1 *365)
    self.conf_swing_level = 0.0
    self.mom_arr = [[]]
    self.mom_state = 0
    self.mom_bear_count = 0
    self.mom_bull_count = 0
    self.mom_confirm = 3
    self.dataframe = [[]]
    self.trainsignaldatabase = [[]]
    self.deltapricesdatabase = [[]]
    self.trainsignaldatabase_resched = []
    self.entry_func_flag = False
    self.entry_func_flag1 = False
    # Counters compared against new_interval in ChangeHourFlag / ReSchedulePolicy.
    self.hour_ctr = 0
    self.hour_ctr1 = 0
    self.confidence = 0
    self.in_stock = 0
    self.last_trade_time = self.Time
    self.last_trade_time1 = {}
    self.position_entry_time = {}
    self.position_best_spread = {}
    self.position_entry_prices = {}
    self.position_max_net_pnl = {}
    self.trade_cooldown_hours = 6
    self.min_trade_interval = timedelta(minutes=90)
    self.fractal_state = {"regime": "noise"}
    self.eth_prices = []
    self.is_bull_market = True
    # Map each ticker to the single subscribed symbol.
    self.symbols = {}
    for ticker in self.tickers:
        try:
            self.symbols[ticker] = (self.symbol)
        except Exception as e:
            self.log(f"Could not add {ticker}: {str(e)}. Skipping.")
    self.MAX_PORTFOLIO_ALLOCATION = 0.10
    self.MAX_POSITION_SIZE_USD = 1000.0
def One(self):
    """Read the latest values from the rolling windows once all three are ready.

    The values are only bound to locals; nothing is stored or returned.
    """
    windows_ready = self.window1.IsReady and self.daily1.IsReady and self.minute1.IsReady
    if not windows_ready:
        return
    current_close = self.window1[0].Close
    previous_daily_close = self.daily1[1].Close
    previous_bar = self.minute1[1]
    previous_bar_close = previous_bar.Close
    previous_bar_open = self.minute1[1].Open
def OnDailyData(self, sender, bar):
    """Consolidator callback: push the consolidated `bar` into `self.daily1`."""
    daily_window = self.daily1
    daily_window.Add(bar)
def OnMinuteData(self, sender, bar):
    """Consolidator callback: push the consolidated `bar` into `self.minute1`."""
    intraday_window = self.minute1
    intraday_window.Add(bar)
def OnEndOfDay(self):
    """Set the `_newDay` flag on the instance."""
    setattr(self, "_newDay", True)
def ChangeHourFlag(self):
    """Count scheduled ticks; when the count equals `new_interval`, reset it,
    raise the entry flag and close out via `GetClose`."""
    global new_interval
    interval_elapsed = self.hour_ctr == new_interval
    if interval_elapsed:
        self._newHour = False
        self.hour_ctr = 0
        self.entry_func_flag = True
        self.GetClose()
    self.hour_ctr = self.hour_ctr + 1
def ReSchedulePolicy(self):
# Scheduled callback that (1) gates itself on a call counter, (2) derives a
# new scheduling interval from the run-lengths of the 0/1 signal history via
# self.LSTM, (3) places a directional order through _set_fractal_holdings and
# (4) re-registers itself on the scheduler with the new interval.
# NOTE(review): indentation is lost in this view; the nesting described in
# the comments below is the most plausible reading, not a verified one.
global U
global X
global min_border
global max_border
global shorts_occurences_arr
global longs_occurences_arr
global new_interval
global o_direction
global target_dir
# Gate: once hour_ctr1 reaches new_interval the entry flag is raised and the
# counter is reset; the counter is incremented on the way through.
if self.hour_ctr1 == new_interval:
self.entry_func_flag = True
self.hour_ctr1 = 0
self.hour_ctr1 += 1
self._newHour = True
# Nothing below runs until the entry flag has been raised.
if self.entry_func_flag == False:
return
self._newHour = False
signaltraindata = self.trainsignaldatabase_resched
# n: how many of the largest run-lengths are kept for the forecaster.
n = 100
if len(signaltraindata) >= (self.warmup_period - 1):
# d maps each distinct signal value to the lengths of its consecutive runs.
d = defaultdict(list)
for k, v in groupby(signaltraindata):
d[k].append(len([*v]))
# Run-lengths of value 0 feed the "shorts" array, value 1 the "longs" array.
# Both arrays are module globals and keep growing across calls.
for iindex in range(len(d[0][:])):
shorts_occurences_arr = np.append(shorts_occurences_arr, d[0][iindex])
for iindex in range(len(d[1][:])):
longs_occurences_arr = np.append(longs_occurences_arr, d[1][iindex])
# Ascending sort + tail slice keeps the n largest run-lengths.
shorts_occurences_arr_l = np.sort(shorts_occurences_arr)
shorts_occurences_arr_l = shorts_occurences_arr_l[-n:]
if len(shorts_occurences_arr_l) >= (n - 1):
# feature_range is (min_border, max_border) sorted ascending.
scaler = MinMaxScaler(feature_range=tuple(np.sort([min_border, max_border]).reshape(1, -1)[0]))
scaler = scaler.fit(shorts_occurences_arr_l.reshape(-1, 1))
short_momentum_future_duration = int(self.LSTM(shorts_occurences_arr_l, scaler)[0])
new_interval1 = np.abs(int(short_momentum_future_duration))
else:
# Not enough run-length samples yet: reset direction and bail out.
o_direction = 0
new_interval1 = 0
return
longs_occurences_arr_l = np.sort(longs_occurences_arr)
longs_occurences_arr_l = longs_occurences_arr_l[-n:]
if len(longs_occurences_arr_l) >= (n - 1):
scaler = MinMaxScaler(feature_range=tuple(np.sort([min_border, max_border]).reshape(1, -1)[0]))
scaler = scaler.fit(longs_occurences_arr_l.reshape(-1, 1))
long_momentum_future_duration = int(self.LSTM(longs_occurences_arr_l, scaler)[0])
new_interval2 = np.abs(int(long_momentum_future_duration))
else:
o_direction = 0
new_interval2 = 0
return
# The larger forecast wins and sets both the global interval and direction.
# NOTE(review): the forecast from the 0-runs ("shorts") selects o_direction
# +1, which is later used as the long side -- confirm this mapping is intended.
if new_interval1 >= new_interval2:
new_interval = new_interval1
o_direction = +1
else:
new_interval = new_interval2
o_direction = -1
# NOTE(review): `new_interval <= 0` is implied by `new_interval < 10`, and both
# branches of this if/else return. If this `else` pairs with the `if` directly
# above, every statement after it in this method is unreachable; if it pairs
# with the warm-up check instead, the method only stops for intervals below 10
# or an incomplete warm-up. Verify against the original indentation.
if new_interval <= 0 or new_interval < 10:
return
else:
return
self.Debug("Current (re-Scheduled) trade date:" + "\t" + str(self.Time))
# NOTE(review): the message reports minutes, but the re-registration below
# passes new_interval as timedelta(days=...) -- confirm the intended unit.
self.Debug("Next Run (After):" + "\t" + str(new_interval) + "\t" + "minute(s)")
if o_direction == +1:
# self.confidence (0..1) is mapped linearly onto a 0..100 score.
confidence_score = float(np.round(np.interp(self.confidence, [+0.0, +1.0], [0, 100])))
if confidence_score > 85.0:
# NOTE(review): _regime_boost() can return 1.15, but _set_fractal_holdings
# clips its strength argument to [0, 1].
self._set_fractal_holdings(+1, self._regime_boost())
# order_direction / order_confidence / weight_interval are only used for
# the debug line in the visible code.
order_direction = "buy"
order_confidence = str(confidence_score)
weight_interval = str(new_interval)
self.Debug("Buy Long order" + "\t" + "(Confidence lvl::" + "\t" + str(order_confidence) + ")")
else:
self._set_fractal_holdings(+1, 1.0)
self.last_trade_time = self.Time
elif o_direction == -1:
confidence_score = float(np.round(np.interp(self.confidence, [+0.0, +1.0], [0, 100])))
if confidence_score > 85.0:
self._set_fractal_holdings(-1, self._regime_boost())
order_direction = "sell"
order_confidence = str(confidence_score)
weight_interval = str(new_interval)
self.Debug("Sell Short order" + "\t" + "(Confidence lvl::" + "\t" + str(order_confidence) + ")")
else:
self._set_fractal_holdings(-1, 1.0)
self.last_trade_time = self.Time
# Replace the existing scheduled event with one using the new interval.
self.Schedule.Remove(self.scheduled_event_4)
self.scheduled_event_4 = self.Schedule.On(self.DateRules.EveryDay(), \
self.TimeRules.Every(timedelta(days=(int(new_interval)))), \
self.ReSchedulePolicy)
# Lower the entry flag again until the counter catches up with new_interval.
if self.hour_ctr1 < new_interval:
self.entry_func_flag = False
def _update_momentum_regime(self, mom_value: float):
    """Update the bull/bear momentum state, requiring N consecutive confirmations.

    ``mom_value`` is coerced to float; values that cannot be converted or are
    not finite are ignored.  A negative reading extends the bear streak and
    resets the bull streak (and vice versa); a zero reading decays both
    streaks by one, floored at zero.  ``mom_state`` flips to -1 / +1 only once
    the matching streak reaches ``mom_confirm``; otherwise it keeps its value.
    """
    try:
        reading = float(mom_value)
    except Exception:
        return
    if not np.isfinite(reading):
        return

    if reading == 0:
        # Flat momentum: let both streaks fade instead of resetting them.
        self.mom_bear_count = max(0, self.mom_bear_count - 1)
        self.mom_bull_count = max(0, self.mom_bull_count - 1)
    elif reading < 0:
        self.mom_bear_count += 1
        self.mom_bull_count = 0
    else:
        self.mom_bull_count += 1
        self.mom_bear_count = 0

    # Bear confirmation is checked first, as in the streak bookkeeping above.
    needed = self.mom_confirm
    if self.mom_bear_count >= needed:
        self.mom_state = -1
    elif self.mom_bull_count >= needed:
        self.mom_state = +1
# Diagnostics
#self.Plot("Regime", "MOM", float(v))
#self.Plot("Regime", "MOM_State", float(self.mom_state))
# 2) RSI signals -> produce a signed bias in [-1, +1]
# In mean-revert: RSI<30 => bullish, RSI>70 => bearish (contrarian)
# In trend: RSI<30 => bearish, RSI>70 => bullish (confirmatory / “weakness continues”)
def _update_market_regime1(self, data):
    """Detect bull/bear market from ETHUSD momentum over a rolling price window.

    Appends the latest ETHUSD price to ``self.eth_prices``, keeps only the
    most recent 1080 entries, and once the window is full sets
    ``self.is_bull_market`` to whether the newest price is more than 5% above
    the oldest one.  A regime flip is written to the algorithm log.

    Args:
        data: slice-like object exposing ``contains_key(symbol)`` and
            ``data[symbol]``; the entry must expose ``close`` or ``price``.

    Returns:
        None.  The method exits early (leaving the regime untouched) when the
        symbol is unknown, the slice has no entry for it, the window is not
        full yet, or the oldest price in the window is not positive.
    """
    # 1080 bars == 45 days only if this is fed hourly bars -- TODO confirm the
    # resolution of the data reaching OnData.
    lookback_bars = 1080
    bull_threshold = 0.05

    eth_symbol = self.symbols.get("ETHUSD")
    if eth_symbol is None:
        return
    if not data.contains_key(eth_symbol):
        return
    bar = data[eth_symbol]
    if bar is None:
        return

    eth_price = bar.close if hasattr(bar, 'close') else bar.price
    self.eth_prices.append(eth_price)

    # Trim in one step; also recovers if the list ever grew past the window.
    if len(self.eth_prices) > lookback_bars:
        del self.eth_prices[:-lookback_bars]
    if len(self.eth_prices) < lookback_bars:
        return

    base_price = self.eth_prices[0]
    if base_price <= 0:
        # A zero (or negative) reference price makes the ratio undefined;
        # keep the previous regime rather than raising ZeroDivisionError.
        return

    # Momentum: newest price relative to the oldest price in the window.
    momentum = (self.eth_prices[-1] - base_price) / base_price

    # Bull market: price up more than the threshold; otherwise bear.
    was_bull = self.is_bull_market
    self.is_bull_market = momentum > bull_threshold
    if was_bull != self.is_bull_market:
        regime = "BULL" if self.is_bull_market else "BEAR"
        self.log(f"*** REGIME CHANGE: Now in {regime} MARKET (45d momentum: {momentum*100:.1f}%) ***")
# ---------------- Fractal dynamics helpers ----------------
def _update_fractal_state(self, price_series: np.ndarray):
    """Update rolling fractal metrics on a 1D array of prices (chronological).

    Computes the Hurst exponent, Higuchi fractal dimension and regime label
    over the last ``self.fractal_window`` finite prices and stores them in
    ``fractal_hurst``, ``fractal_fd`` and ``fractal_regime``.  The label is
    also published into ``self.fractal_state["regime"]``, which is the value
    ``OnData`` reads when choosing signal weights; previously that entry was
    never updated and stayed at its initial "noise".

    Args:
        price_series: chronological prices; ``None`` or fewer than
            ``fractal_window`` finite values means "not ready".

    Returns:
        None.  ``fractal_ready`` is True only after a successful computation.
        When not ready, ``fractal_state["regime"]`` falls back to "noise";
        on an exception ``fractal_regime`` is additionally set to "unknown".
    """
    ready = False
    try:
        window = int(self.fractal_window)
        if price_series is not None and len(price_series) >= window:
            # Defensive: remove NaNs/Infs before measuring the window.
            s = np.asarray(price_series, dtype=float)
            s = s[np.isfinite(s)]
            if len(s) >= window:
                # Use last window only
                s = s[-window:]
                self.fractal_hurst = float(hurst_dfa(s))
                self.fractal_fd = float(higuchi_fd(s, kmax=int(self.fractal_kmax)))
                self.fractal_regime = str(fractal_regime(self.fractal_hurst))
                ready = True
    except Exception:
        # Never break trading loop due to diagnostics
        self.fractal_regime = "unknown"
    self.fractal_ready = ready

    # Keep the dict consumed by OnData in step with the attributes above.
    state = getattr(self, "fractal_state", None)
    if not isinstance(state, dict):
        state = {}
        self.fractal_state = state
    state["regime"] = self.fractal_regime if ready else "noise"
def _regime_boost(self) -> float:
    """Translate the current fractal regime into an exposure multiplier.

    Returns 0.75 while ``fractal_ready`` is missing or falsy, and also for any
    regime label other than "trend" (1.15), "mean_revert" (0.95) or
    "noise" (0.65).
    """
    fallback = 0.75
    if not getattr(self, "fractal_ready", False):
        return fallback

    regime = getattr(self, "fractal_regime", "unknown")
    boosts = (("trend", 1.15), ("mean_revert", 0.95), ("noise", 0.65))
    for label, multiplier in boosts:
        if regime == label:
            return multiplier
    return fallback
def _set_fractal_holdings(self, direction: int, base_strength: float):
"""Safe wrapper around SetHoldings with inverse-volatility sizing.
direction: +1 long, -1 short
base_strength: 0..1 scales the target exposure (confidence)
"""
# NOTE(review): indentation is lost in this view; the nesting of the
# SetHoldings branches and of the last_trade_time1 update near the end
# cannot be confirmed from here.
try:
# Bail out silently until the ATR indicator exists and is warmed up.
if not hasattr(self, "atr") or self.atr is None or not self.atr.IsReady:
return
price = float(self.Securities[self.symbol].Price)
if price <= 0:
return
atr_val = float(self.atr.Current.Value)
if atr_val <= 0:
return
# Inverse-volatility position sizing (de-risk when ATR spikes)
vol = atr_val / price # fraction of price
risk_budget = 0.015 # tune 0.5%..2.0%
size = risk_budget / max(vol, 1e-4)
# Clamp to avoid oversized targets
size = min(size, 0.30)
# base_strength is clipped to [0, 1], so callers passing more (e.g. 1.15)
# are capped at 1.0.
strength = self._clip(float(base_strength), 0.0, 1.0)
target = float(direction) * float(size) * float(strength)
# Global cap (SetHoldings target fraction)
target = max(-float(self.max_abs_holdings), min(float(self.max_abs_holdings), target))
# NOTE(review): `target` (and therefore `direction` and the ATR sizing above)
# is not referenced again in the visible code; the orders below are sized
# from `qty * strength` instead -- confirm this is intended.
# self.data is the slice stored by OnData.
data = self.data
# NOTE(review): the loop target is `self.symbol`, so every iteration rebinds
# the instance attribute that the rest of the class reads.
for ticker, (self.symbol) in self.symbols.items():
if not (data.contains_key(self.symbol)):
continue
bar_a = data[self.symbol]
if bar_a is None:
continue
price_a = bar_a.close if hasattr(bar_a, 'close') else bar_a.price
if price_a <= 0:
continue
# spread: difference between the two ends of the close window.
spread = self.closeWindow1[-1] - self.closeWindow1[0]
# spread_pct and gross_profit are computed but not used below.
spread_pct = abs(spread) / price_a * 100 if price_a > 0 else 0
portfolio_value = self.portfolio.total_portfolio_value
# Allocation is the smaller of the portfolio-fraction cap and the USD cap.
max_allocation = min(self.MAX_PORTFOLIO_ALLOCATION * portfolio_value, self.MAX_POSITION_SIZE_USD)
size_usd = max_allocation
# qty is a unit count (USD / price).
qty = size_usd / price_a
gross_profit = abs(spread) * qty
# Skip tickers that already carry a position.
current_qty_a = self.portfolio[self.symbol].quantity
if abs(current_qty_a) > 0.0001:
continue
# Per-ticker cooldown measured in hours since the last recorded trade.
if ticker in self.last_trade_time1:
hours_since = (self.time - self.last_trade_time1[ticker]).total_seconds() / 3600
if hours_since < self.trade_cooldown_hours:
continue
# NOTE(review): SetHoldings is documented as taking a target portfolio
# fraction, but `qty * strength` is a unit count -- verify against the
# QuantConnect API before relying on the resulting position size.
# security_ethusd is assigned in each branch but never read.
if not self.is_bull_market and self.mom_state == -1:
if spread < 0:
security_ethusd = self.Securities[self.symbol]
buying_power = self.Portfolio.MarginRemaining
if buying_power > 0:
self.SetHoldings(self.symbol, +(qty *strength))
else:
security_ethusd = self.Securities[self.symbol]
buying_power = self.Portfolio.MarginRemaining
if buying_power > 0:
self.SetHoldings(self.symbol, -(qty *strength))
elif self.is_bull_market and self.mom_state == +1:
if spread > 0:
security_ethusd = self.Securities[self.symbol]
buying_power = self.Portfolio.MarginRemaining
if buying_power > 0:
self.SetHoldings(self.symbol, -(qty *strength))
else:
security_ethusd = self.Securities[self.symbol]
buying_power = self.Portfolio.MarginRemaining
if buying_power > 0:
self.SetHoldings(self.symbol, +(qty *strength))
# Re-read the position; if still flat and spread is negative, go long.
current_qty_a = self.portfolio[self.symbol].quantity
if abs(current_qty_a) < 0.0001:
if spread < 0:
security_ethusd = self.Securities[self.symbol]
buying_power = self.Portfolio.MarginRemaining
if buying_power > 0:
self.SetHoldings(self.symbol, +(qty *strength))
# Record the trade time used by the per-ticker cooldown above.
self.last_trade_time1[ticker] = self.time
except Exception:
# fall back to flat if anything goes wrong
# NOTE(review): this also flattens the position on programming errors
# (e.g. a missing attribute), not only on market/data problems.
self.SetHoldings(self.symbol, 0.0)
def OnData(self, data):
global daily_trades_allowance_quant
global daily_trades_allowance_quant_thresh
global minute_trades_allowance_quant
global minute_trades_allowance_quant_thresh
global daily_trade_flag_en
global hourly_trade_flag_en
global bet_arr
global hist_arr
global arr1
global arr2
global min_border
global max_border
global scaler
global margins_border
global dataset
global normalized
global projected_price
global rsi_interval
global __quantity
global total_equity
global day_close_price_arr
global high_price_arr1
global low_price_arr1
global volume_arr1
global datetime_arr
global open_price_arr1
global close_price_arr1
global day_close_price_arr1
global verbage
global DD_arr
global t_indicator_arr
global delta_prediction_arr
global real
global last_trade_order
global a0_arr
global ETHUSD_a0
global offset
global sign
global sign_
global weekly_stockprices_arr
global out_DD
global sign_inp_arr
global rsi_intra_daily
global exp_factor
global no_ctr
global episode_list
global sign_arr
global _sign
global mom_array
global rsi_intra_daily_array
global exposure_risk_factor
global dayscounter
global enable_live_mode
global close
global prediction_price_hpc_XGB_arr
global prediction_mom_value_arr
global exp_ma_arr
global rsi
global x
global zeropoly
global onepoly
global ctr_c
global ctr_w
global result
global trading_quantum
global exit_stop_loss_flag
global history_hf
global rl_weight
global target_dir
self.data = data
for ticker in self.tickers:
if data.ContainsKey(ticker):
if ticker == "ETHUSD":
self.closeWindow1.Add(data["ETHUSD"].Close)
if not self.closeWindow1.IsReady: return
for ticker in self.tickers:
if data.Bars.ContainsKey(ticker):
if ticker == "ETHUSD":
self.window1.Add(data.Bars["ETHUSD"])
if not (self.window1.IsReady and self.daily1.IsReady): return
for ticker in self.tickers:
if data[ticker] is None : return
if not self.donchian.IsReady: return
# Update regime detection
self._update_market_regime1(data)
np.random.seed(7)
if self.closeWindow1[0] >= self.closeWindow1[1]:
signal = 1
else:
signal = 0
self.trainsignaldatabase_resched = np.append(self.trainsignaldatabase_resched, signal)
if self._newDay == True:
daily_trades_allowance_quant = 0
daily_trade_flag_en = True
self._newDay = False
dayscounter += 1
if dayscounter >= self.dayswarmup:
enable_live_mode = True
exposure_risk_factor += 0.1
self.entradeflag = False
if exposure_risk_factor > 4.0:
exposure_risk_factor = 4.0
if len(self.trainsignaldatabase_resched) > self._period:
'''self.trainsignaldatabase_resched = self.trainsignaldatabase_resched[-self.warmup_period:]'''
self.trainsignaldatabase_resched = []
self.Debug("Day change!")
if self._newHour == True:
minute_trades_allowance_quant = 0
hourly_trade_flag_en = True
self._newHour = False
open_1 = np.array(self.daily1[1].Open, float).ravel()
open_price_arr1 = np.append(open_price_arr1, open_1)
open_price_arr1 = open_price_arr1[-self.warmup_period:]
close_1 = np.array(self.daily1[1].Close, float).ravel()
close_price_arr1 = np.append(close_price_arr1, close_1)
close_price_arr1 = close_price_arr1[-self.warmup_period:]
high_1 = self.daily1[1].High
high_price_arr1 = np.append(high_price_arr1, high_1)
high_price_arr1 = high_price_arr1[-self.warmup_period:]
low_1 = self.daily1[1].Low
low_price_arr1 = np.append(low_price_arr1, low_1)
low_price_arr1 = low_price_arr1[-self.warmup_period:]
yesterdayc1 = self.daily1[1].Close
day_close_price_arr1 = np.append(day_close_price_arr1, yesterdayc1)
day_close_price_arr1 = day_close_price_arr1[-self.warmup_period:]
volume1 = self.daily1[1].Volume
volume_arr1 = np.append(volume_arr1, volume1)
volume_arr1 = volume_arr1[-self.warmup_period:]
self.Debug("(Virtual) Hour change!")
else:
return
day_close_price_arr = [day_close_price_arr1]
open_price_arr = [open_price_arr1]
high_price_arr = [high_price_arr1]
low_price_arr = [low_price_arr1]
close_price_arr = [close_price_arr1]
volume_arr = [volume_arr1]
self.Debug("Current trade date:" + "\t" + str(self.Time))
self.Log(str(self.Portfolio.TotalPortfolioValue))
self.closeWindow = [self.closeWindow1]
datetime_arr = np.append(datetime_arr, self.Time.strftime('%Y-%m-%d'))
ticker_idx = 0
for ticker in self.tickers:
tmp_arr_values = []
tmp_arr_values1 = []
tmp_arr_values2 = []
for index in range(1439, 0, -1):
tmp_arr_values = np.append(tmp_arr_values, self.closeWindow[ticker_idx][index])
delta = (self.closeWindow[ticker_idx][(index - 1)] - self.closeWindow[ticker_idx][index])
tmp_arr_values1 = np.append(tmp_arr_values1, delta)
if self.closeWindow[ticker_idx][index] <= self.closeWindow[ticker_idx][(index - 1)]:
signal = 1
else:
signal = 0
tmp_arr_values2 = np.append(tmp_arr_values2, signal)
self.dataframe[ticker_idx] = tmp_arr_values
self.deltapricesdatabase[ticker_idx] = tmp_arr_values1
self.trainsignaldatabase[ticker_idx] = tmp_arr_values2
self.hist = self.dataframe[ticker_idx]
historical_train_data = self.hist
# Get historical equity curves.
history_hf = self.history([self.Symbol("ETHUSD")], timedelta(365), Resolution.DAILY)['close'].unstack(0)
a0_arr = np.append(a0_arr, np.mean(historical_train_data))
stockprices = historical_train_data
upperband, middleband, lowerband = ta.BBANDS(np.array(stockprices).ravel()*100000, timeperiod=90, nbdevup=2, nbdevdn=2,matype=0)
upVal = upperband[-1]/100000
middleVal = middleband[-1]/100000
lowVal = lowerband[-1]/100000
a = np.array([upVal, middleVal, lowVal])
a0 = self.closeWindow[ticker_idx][0]
projected_price = a.flat[np.abs(a - a0).argmin()]
projected_price = projected_price[np.logical_not(np.isnan(projected_price))]
if len(day_close_price_arr) >= 14:
real1 = ta.AD(np.array(high_price_arr[ticker_idx][-14:], float).ravel(), np.array(low_price_arr[ticker_idx][-14:], float).ravel(), np.array(day_close_price_arr[ticker_idx][-14:], float).ravel(), np.array(volume_arr[ticker_idx][-14:], float).ravel())
real1 = np.mean(real1[np.logical_not(np.isnan(real1))])
else:
real1 = -1.0
if len(day_close_price_arr) >= 14:
real2 = ta.MFI(high_price_arr[ticker_idx], low_price_arr[ticker_idx], close_price_arr[ticker_idx], volume_arr[ticker_idx], timeperiod=14)
real2 = np.mean(real2[np.logical_not(np.isnan(real2))])
else:
real2 = -1
arr1 = np.append(arr1, self.truncate(a0, 4))
bet_arr = np.append(bet_arr, projected_price)
arr2 = np.append(arr2, self.truncate(np.abs(bet_arr[-1]), 4))
close = arr1[-1]
stopPriceBuy = close * .99
stopPriceSell = close * 1.01
if len(stockprices) > rsi_interval:
_data_ = np.array(stockprices, float)
rsi = ta.RSI(_data_.ravel(), timeperiod=rsi_interval)
rsi = rsi[np.logical_not(np.isnan(rsi))][-1]
else:
return
if len(weekly_stockprices_arr[ticker_idx]) > rsi_interval:
_data_ = np.array(weekly_stockprices_arr[ticker_idx], float)
rsi_intra_daily = ta.RSI(_data_.ravel(), timeperiod=rsi_interval)
rsi_intra_daily = rsi_intra_daily[np.logical_not(np.isnan(rsi_intra_daily))][-1]
else:
rsi_intra_daily = 50
rsi_intra_daily_array[ticker_idx] = np.append(rsi_intra_daily_array[ticker_idx], rsi_intra_daily)
# Fractal regime update (uses same close series we use for intra-day RSI)
if len(stockprices) >= int(getattr(self, "fractal_window", 512)):
_fractal_prices = np.array(stockprices[-int(self.fractal_window):], dtype=float)
self._update_fractal_state(
np.array(_fractal_prices)[-self.fractal_window:]
)
real = ta.MOM(np.asarray(day_close_price_arr1), timeperiod=10)
real = np.mean(real[np.logical_not(np.isnan(real))])
self.mom_arr = np.append(self.mom_arr, real)
mom_array[ticker_idx] = np.append(mom_array[ticker_idx], self.mom_arr[-1])
# Momentum regime update (bear => allow shorts more easily)
self._update_momentum_regime(float(real))
File_data = stockprices[-int(self.fractal_window):]
input = []
ctr = False
for index in range(1, len(File_data) - 1, 1):
if index % 2 == 0:
ctr = True
else:
ctr = False
if ctr == False:
if File_data[index+1] > File_data[index]:
input = np.append(input, +1)
elif File_data[index+1] == File_data[index]:
input = np.append(input, 0)
elif File_data[index+1] < File_data[index]:
input = np.append(input, -1)
else:
if File_data[index+1] > File_data[index-1]:
input = np.append(input, +1)
elif File_data[index+1] == File_data[index-1]:
input = np.append(input, 0)
elif File_data[index+1] < File_data[index-1]:
input = np.append(input, -1)
#self.Debug(str(input))
predictor = input[1::2]
evaluator = input[1::3]
ctr = 0
for index in range(0, len(input), 3):
result_ = zeropoly
for pos in range(index, index+3):
result += self.main_job(input[pos:pos+3])
result_ += self.main_job(input[pos:pos+3])
f = sympy.simplify(result)
f_ = sympy.simplify(result_)
#self.Debug(str(f))
# evaluating the expression {f}
prediction_f = np.sign(f_.evalf(subs={x:predictor[ctr]}))
#self.Debug(str(prediction_f))
if prediction_f == evaluator[ctr]:
#self.Debug("CORRECT!")
ctr_c += 1
else:
#self.Debug("wrong...")
ctr_w += 1
ctr += 1
#self.Debug(str(ctr_c) + "\t" + str(ctr_w))
output = []
ctr = False
for index in range(1439, (1439 - 3), -1):
if index % 2 == 0:
ctr = True
else:
ctr = False
if ctr == False:
if self.closeWindow[ticker_idx][index] < self.closeWindow[ticker_idx][(index - 1)]:
output = np.append(output, +1)
elif self.closeWindow[ticker_idx][index] == self.closeWindow[ticker_idx][(index - 1)]:
output = np.append(output, 0)
elif self.closeWindow[ticker_idx][index] > self.closeWindow[ticker_idx][(index - 1)]:
output = np.append(output, -1)
else:
if self.closeWindow[ticker_idx][index] < self.closeWindow[ticker_idx][(index - 2)]:
output = np.append(output, +1)
elif self.closeWindow[ticker_idx][index] == self.closeWindow[ticker_idx][(index - 2)]:
output = np.append(output, 0)
elif self.closeWindow[ticker_idx][index] > self.closeWindow[ticker_idx][(index - 2)]:
output = np.append(output, -1)
for index in range(0, len(output)):
for pos in range(index, index+3):
result += self.main_job(output[pos:pos+3])
f = sympy.simplify(result)
#self.Debug(str(f))
prediction_f_next1 = np.sign(f.evalf(subs={x:output[0]}))
prediction_f_next2 = np.sign(f_.evalf(subs={x:output[0]}))
prediction_f_next_tot = np.sign(np.mean([prediction_f_next1, prediction_f_next2]))
[exit_flag, exit_stop_loss_flag] = self.obj1.AdjustPortfolioManagement(self, self.symbol, exit_stop_loss_flag, trading_quantum, arr2[-1], close)
if exit_flag == False:
return
if hourly_trade_flag_en == True and exit_stop_loss_flag == False:
## PREDICT PRICE MODE ##
#self.Liquidate(self.symbol)
#self.SetHoldings(self.symbol, 0.0)
# --- inside OnData after you have rsi, rsi_intra_daily, prediction_f_next_tot/1/2,
# and you already updated fractal regime (self.fractal_state['regime'])
regime = self.fractal_state.get("regime", "noise") # "trend" | "mean_revert" | "noise"
# 1) Prediction “direction” score in [-1, +1]
pred_score = self._vote_score(prediction_f_next_tot, prediction_f_next1, prediction_f_next2)
pred_score = self._clip(pred_score, -1.0, +1.0)
confidence = abs(pred_score)
self.confidence = confidence
if confidence < 0.45:
return
daily_bias = self.rsi_bias(rsi)
intra_bias = self.rsi_bias(rsi_intra_daily)
if regime == "trend":
# flip contrarian RSI logic to trend-follow (optional but often improves in persistent trends)
daily_bias *= -1.0
intra_bias *= -1.0
# 3) Combine signals (single final score)
# Weight predictions more than RSI in noise/trend; allow RSI more influence in mean-revert
if regime == "mean_revert":
w_pred, w_rsi_d, w_rsi_i = 0.55, 0.30, 0.15
elif regime == "trend":
w_pred, w_rsi_d, w_rsi_i = 0.70, 0.20, 0.10
else: # noise
w_pred, w_rsi_d, w_rsi_i = 0.80, 0.15, 0.05
score = (w_pred * pred_score) + (w_rsi_d * daily_bias) + (w_rsi_i * intra_bias)
score = self._clip(score, -1.0, +1.0)
# 4) Optional: conflict penalty (avoid taking trades when predictions and RSI disagree hard)
conflict = abs(pred_score - (0.5 * daily_bias + 0.5 * intra_bias))
if conflict > 1.25:
score *= 0.35 # dampen aggressiveness
# 5) Convert score -> target exposure
# Deadband reduces churn around 0
deadband = 0.35 if regime == "noise" else 0.20
if abs(score) < deadband:
target_dir = 0
target_mag = 0.0
else:
target_dir = +1 if score > 0 else -1
# map |score| from [deadband..1] -> [min_size..1]
min_size = 0.25 if regime != "noise" else 0.15
target_mag = min_size + (1.0 - min_size) * ((abs(score) - deadband) / (1.0 - deadband))
target_mag = self._clip(target_mag, 0.0, 1.0)
# 6) Cooldown / hysteresis (prevents flip-flopping)
# Initialize once in Initialize():
# self.last_trade_time = self.Time
# self.min_trade_interval = timedelta(minutes=30)
if hasattr(self, "last_trade_time") and (self.Time - self.last_trade_time) < self.min_trade_interval:
# only allow reducing risk during cooldown
current_qty = self.Portfolio[self.symbol].Quantity
if current_qty != 0 and target_dir == 0:
self.SetHoldings(self.symbol, 0.0)
return
# 7) Execute only one final action
if target_dir == 0:
self.SetHoldings(self.symbol, 0.0)
else:
self._set_fractal_holdings(target_dir, target_mag)
self.last_trade_time = self.Time