| Overall Statistics |
|
Total Orders 9452 Average Win 1.66% Average Loss -0.73% Compounding Annual Return 460.450% Drawdown 97.900% Expectancy 0.338 Start Equity 1000000.00 End Equity 48524547299.20 Net Profit 4852354.730% Sharpe Ratio 4.64 Sortino Ratio 14.559 Probabilistic Sharpe Ratio 100.000% Loss Rate 59% Win Rate 41% Profit-Loss Ratio 2.26 Alpha 2.82 Beta -0.084 Annual Standard Deviation 0.606 Annual Variance 0.368 Information Ratio 4.321 Tracking Error 0.634 Treynor Ratio -33.683 Total Fees $66295430210.89 Estimated Strategy Capacity $120000.00 Lowest Capacity Asset ETHUSD E3 Portfolio Turnover 410.13% Drawdown Recovery 1636 |
#
# Attribution-NonCommercial-NoDerivatives 4.0 International (CC BY-NC-ND 4.0)
# https://creativecommons.org/licenses/by-nc-nd/4.0/
# System.py -- Main API core.
#
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.5.0.0.10862. Copyright 2014 QuantConnect Corporation.
#
########################################################
# Developed by Anastasios N. Bikos. ©DeepConscius All Rights Reserved.
# # mpikos@live.com
# # Copyright ~ ~ ~ 04/04/2026
# # Version ~ ~ ~ 4.0
# # ~ ~ ~
########################################################
# region imports
from AlgorithmImports import *
# endregion
### Library classes are snippets of code/classes you can reuse between projects. They are
### added to projects on compile.
###
### To import this class use the following import with your values subbed in for the {} sections:
### from {libraryProjectName} import {libraryFileName}
###
### Example using your newly imported library from 'Library.py' like so:
###
### from {libraryProjectName} import Library
### x = Library.add(1,1)
### print(x)
###
import talib as ta
class QCPortfolioRiskManagement():
# init method or constructor
def __init__(self, maximum_drawdown_percent = 0.05):
self.maximum_drawdown_percent = +np.abs(maximum_drawdown_percent)
self.account_leverage = 0
def ControlDrawdownLevel(self, algorithm, dd_total_equity, targets):
self.account_leverage = algorithm.Portfolio.TotalHoldingsValue / algorithm.Portfolio.TotalPortfolioValue
if dd_total_equity[-1] != 0 and self.account_leverage != 0:
diff = float(np.abs(dd_total_equity[-1] - dd_total_equity[-2])) / float(dd_total_equity[-1])
drawdown = float(diff) / float(self.account_leverage)
if drawdown < 1.0 and drawdown > self.maximum_drawdown_percent:
algorithm.debug('DCRiskManagementModel:: (Current)' + "\t" + 'Drawdown:' + "\t" + str(drawdown))
algorithm.debug("DCRiskManagementModel:: Leverage control ahead!")
symbol = targets
algorithm.liquidate([symbol])
algorithm.SetHoldings(symbol, 0.00)
def CalcEqDrawDown(self, algorithm, dd_total_equity):
dd_total_equity = np.append(dd_total_equity, algorithm.Portfolio.TotalPortfolioValue)
return dd_total_equity
class QCRSIPortfolioRiskManagement():
# init method or constructor
def __init__(self, rsi_interval = 14):
self.rsi_interval = +np.abs(rsi_interval)
def ControlRSI(self, algorithm, history_data, targets, min_border, max_border, rl_weight):
if len(history_data) > self.rsi_interval:
_data_ = np.array(history_data, float)
rsi = ta.RSI(_data_.ravel(), timeperiod=self.rsi_interval)
rsi = rsi[np.logical_not(np.isnan(rsi))][-1]
else:
return [min_border, max_border, rl_weight]
symbol = targets
curr_holding_leverage = (algorithm.portfolio[symbol].holdings_value / algorithm.Portfolio.TotalPortfolioValue)
if curr_holding_leverage > 2.5:
if rsi > 70:
if algorithm.fast > algorithm.slow:
algorithm.SetHoldings(symbol, +1.30)
elif algorithm.fast < algorithm.slow:
algorithm.SetHoldings(symbol, -1.30)
min_border = -1
max_border = -0
rl_weight -= 0.25
if rl_weight < 0.0:
rl_weight = +1
return [min_border, max_border, rl_weight]
class QCDefaultPortfolioRiskManagement():
# init method or constructor
def __init__(self, take_profit_factor = 0.125):
self.take_profit_factor = +np.abs(take_profit_factor)
self.min_border = -1
self.max_border = -0
self.rl_weight = +1
self.hourly_trade_flag_en = False
self.exit_stop_loss_flag = False
def TakeProfit(self, algorithm, targets):
if algorithm.Portfolio.TotalUnrealizedProfit > 0 and algorithm.Portfolio.TotalUnrealizedProfit >= (self.take_profit_factor *(algorithm.Portfolio.TotalPortfolioValue)):
symbol = targets
algorithm.liquidate()
algorithm.SetHoldings(symbol, 0.00)
algorithm.debug("DCRiskManagementModel:: Take Profit!")
self.min_border = -1
self.max_border = -0
self.rl_weight = +1
self.exit_stop_loss_flag = False
return [self.min_border, self.max_border, self.rl_weight, self.exit_stop_loss_flag]
def ControlLoss(self, algorithm, targets, total_equity, min_border, max_border, rl_weight):
symbol = targets
total_equity = np.append(total_equity, algorithm.Portfolio.TotalPortfolioValue)
if total_equity[-1] != 0:
diff = float(total_equity[-1] - total_equity[-2])/float(total_equity[-1])
if diff < 0 and np.abs(diff) <= 0.25:
algorithm.debug("DCRiskManagementModel:: Do Not Trade==> Not wise trading decision.")
min_border -= 0.25
max_border += 0.25
rl_weight -= 0.25
if rl_weight < 0.0:
rl_weight = +1
self.hourly_trade_flag_en = False
elif diff < 0 and np.abs(diff) > 0.25:
algorithm.liquidate()
algorithm.SetHoldings(symbol, 0.00)
algorithm.debug("DCRiskManagementModel:: Stop Loss!")
min_border = -1
max_border = -0
rl_weight = +1
self.hourly_trade_flag_en = False
else:
self.hourly_trade_flag_en = True
return [min_border, max_border, rl_weight, self.hourly_trade_flag_en]
def AdjustPortfolioManagement(self, algorithm, targets, exit_stop_loss_flag, trading_quantum, pred_bet, hist_bet):
symbol = targets
if exit_stop_loss_flag == True:
if pred_bet < hist_bet and algorithm.fast < algorithm.slow:
algorithm.SetHoldings(symbol, +0.5)
elif pred_bet >= hist_bet and algorithm.fast > algorithm.slow:
algorithm.SetHoldings(symbol, -0.5)
# Calculate the fee adjusted quantity of shares with given buying power
if algorithm.Portfolio.MarginRemaining > (trading_quantum *pred_bet):
quantity = trading_quantum
weight = float(quantity * pred_bet) / float(algorithm.Portfolio.TotalPortfolioValue)
return [True, exit_stop_loss_flag]
else:
algorithm.debug("DCRiskManagementModel:: Do Not Trade==> Not enough free margin.")
if algorithm.Portfolio.TotalUnrealizedProfit < 0 and np.abs(algorithm.Portfolio.TotalUnrealizedProfit) >= (0.125 *(algorithm.Portfolio.TotalPortfolioValue)):
algorithm.liquidate()
algorithm.SetHoldings(symbol, 0.00)
exit_stop_loss_flag = True
algorithm.debug("DCRiskManagementModel:: EXIT - STOP LOSS!")
return [False, exit_stop_loss_flag]# # Attribution-NonCommercial-NoDerivatives 4.0 International (CC BY-NC-ND 4.0) # https://creativecommons.org/licenses/by-nc-nd/4.0/ # System.py -- Main API core. # # QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. # Lean Algorithmic Trading Engine v2.5.0.0.10862. Copyright 2014 QuantConnect Corporation. # ######################################################## # Developed by Anastasios N. Bikos. ©DeepConscius All Rights Reserved. # # mpikos@live.com # # Copyright ~ ~ ~ 04/04/2026 # # Version ~ ~ ~ 4.0 # # ~ ~ ~ ######################################################## # region imports from AlgorithmImports import * # endregion # # Fractal dynamics helpers
#
# Attribution-NonCommercial-NoDerivatives 4.0 International (CC BY-NC-ND 4.0)
# https://creativecommons.org/licenses/by-nc-nd/4.0/
# System.py -- Main API core.
#
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.5.0.0.10862. Copyright 2014 QuantConnect Corporation.
#
########################################################
# Developed by Anastasios N. Bikos. ©DeepConscius All Rights Reserved.
# # mpikos@live.com
# # Copyright ~ ~ ~ 04/04/2026
# # Version ~ ~ ~ 4.0
# # ~ ~ ~
########################################################
# region imports
from AlgorithmImports import *
# endregion
# ------------------------------------------------------------------------------
# CORE PARAMETERS
# ------------------------------------------------------------------------------
NP_TRIGGER = 0.00000001 # Net profit threshold for execution
ASSET_TYPE = 'crypto' # Options: 'crypto'
# Trading Pairs Configuration
# Format: List of ticker symbols to trade
# The system will look for these pairs on both Exchange1 and Exchange2
TRADING_PAIRS = [
"BTCUSD",
"ETHUSD",
]
# ------------------------------------------------------------------------------
# EXCHANGE/BROKER FEE STRUCTURES (Reference Only)
# ------------------------------------------------------------------------------
# NOTE: Actual fees used come from EXCHANGES config below.
# These are reference values to help you configure exchanges.
# Crypto Exchange Fees (basis points, 1 bps = 0.01%)
# Using MAKER fees for limit orders, not taker fees
CRYPTO_FEES_BPS = {
'KRAKEN': 8, # 0.08% maker fee with volume
'BITFINEX': 10, # 0.10%
'Default': 8
}
# ------------------------------------------------------------------------------
# NETWORK/TRANSACTION COSTS
# ------------------------------------------------------------------------------
SLIPPAGE_BPS = 2 # Base slippage assumption (0.02%)
NETWORK_FEE_ESTIMATE_USD = 0.10 # Generic network fee
MAX_SLIPPAGE_BPS = 20 # Maximum acceptable slippage
# Solana-Specific (for crypto arbitrage on Solana-based DEXs)
JUPITER_FEE_BPS = 10 # Jupiter aggregator fee
OKX_FEE_BPS = 10 # OKX exchange fee
JITO_TIP_LAMPORTS = 10000 # Jito MEV tip (reduced)
SOL_GAS_ESTIMATE_LAMPORTS = 2000 # Transaction gas (reduced)
# ------------------------------------------------------------------------------
# RISK MANAGEMENT DEFAULTS
# ------------------------------------------------------------------------------
MAX_DRAWDOWN_PERCENT = 0.05 # 5% max drawdown
MAX_PORTFOLIO_ALLOCATION = 0.75 # 75% max per trade
MAX_POSITION_SIZE_USD = 1000000.0
MIN_TRADE_SIZE_USD = 100.0
STOP_LOSS_PERCENT = 0.01 # 1% stop loss
TAKE_PROFIT_PERCENT = 0.50 # 50% take profit
# ------------------------------------------------------------------------------
# EXECUTION PARAMETERS
# ------------------------------------------------------------------------------
TRADE_TIMEOUT_SECONDS = 30 # Max time to fill order
MAX_TRADES_PER_HOUR = 100
MAX_OPEN_POSITIONS = 100
VOLATILITY_LOOKBACK_PERIODS = 20
# ------------------------------------------------------------------------------
# STATISTICAL PARAMETERS
# ------------------------------------------------------------------------------
Z_SCORE_THRESHOLD_MIN = 1.5 # Minimum Z-score for signal
Z_SCORE_THRESHOLD_MAX = 3.0 # Maximum Z-score threshold
SPREAD_LOOKBACK_PERIODS = 252 # Number of periods for spread analysis
# ------------------------------------------------------------------------------
# EXCHANGE CONFIGURATIONS (fully generic - change to any exchanges)
# ------------------------------------------------------------------------------
# Supported Markets for crypto: KRAKEN, COINBASE, BINANCE, BITFINEX, GEMINI, etc.
EXCHANGES = {
'Exchange1': {
'name': 'Exchange A', # Descriptive name for logging
'market': 'KRAKEN', # QuantConnect Market enum value
'asset_type': 'crypto', # 'crypto', 'equity', 'forex', 'future'
'fee_bps': 8 # Fee in basis points (8 = 0.08% maker)
},
'Exchange2': {
'name': 'Exchange B',
'market': 'BITFINEX',
'asset_type': 'crypto',
'fee_bps': 10 # 0.10% maker fee
}
}
# Example configurations for other exchange types:
#
# BINANCE vs BITFINEX (Crypto):
# EXCHANGES = {
# 'Exchange1': {'name': 'Binance', 'market': 'BINANCE', 'asset_type': 'crypto', 'fee_bps': 10},
# 'Exchange2': {'name': 'Bitfinex', 'market': 'BITFINEX', 'asset_type': 'crypto', 'fee_bps': 20}
# }
# ------------------------------------------------------------------------------
# LOGGING & MONITORING
# ------------------------------------------------------------------------------
LOG_LEVEL = 'INFO' # Options: DEBUG, INFO, WARNING, ERROR
ENABLE_DETAILED_LOGGING = True
LOG_TRADE_DETAILS = True
LOG_RISK_METRICS = True
#
# Attribution-NonCommercial-NoDerivatives 4.0 International (CC BY-NC-ND 4.0)
# https://creativecommons.org/licenses/by-nc-nd/4.0/
# System.py -- Main API core.
#
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.5.0.0.10862. Copyright 2014 QuantConnect Corporation.
#
########################################################
# Developed by Anastasios N. Bikos. ©DeepConscius All Rights Reserved.
# # mpikos@live.com
# # Copyright ~ ~ ~ 04/04/2026
# # Version ~ ~ ~ 4.0
# # ~ ~ ~
########################################################
# region imports
from AlgorithmImports import *
# endregion
import numpy as np
def hurst_dfa(prices: np.ndarray, min_window: int = 8, max_window: int = 64) -> float:
"""
Estimate Hurst exponent using a light-weight Detrended Fluctuation Analysis (DFA).
Returns H in ~[0,1]. Interpretation:
H>0.5 trending/persistent, H<0.5 mean-reverting/anti-persistent.
"""
x = np.asarray(prices, dtype=float).ravel()
if x.size < max(max_window, 32):
return 0.5
x = x[np.isfinite(x)]
if x.size < max(max_window, 32):
return 0.5
# Work on log-prices for scale invariance
x = np.log(np.maximum(x, 1e-12))
y = np.cumsum(x - np.mean(x))
# window sizes (log-spaced)
max_window = int(min(max_window, x.size // 4))
min_window = int(max(4, min_window))
if max_window <= min_window:
return 0.5
ws = np.unique(np.floor(np.logspace(np.log10(min_window), np.log10(max_window), 8)).astype(int))
F = []
n = y.size
for w in ws:
if w < 4:
continue
m = n // w
if m < 2:
continue
yy = y[:m*w].reshape(m, w)
t = np.arange(w, dtype=float)
# detrend each segment with linear fit
rms = 0.0
for seg in yy:
p = np.polyfit(t, seg, 1)
trend = p[0]*t + p[1]
rms += np.mean((seg - trend)**2)
rms = np.sqrt(rms / m)
if np.isfinite(rms) and rms > 0:
F.append((w, rms))
if len(F) < 3:
return 0.5
wv = np.log([a for a, _ in F])
fv = np.log([b for _, b in F])
# slope in log-log
H = np.polyfit(wv, fv, 1)[0]
# clamp to a sane range
return float(np.clip(H, 0.05, 0.95))
def higuchi_fd(prices: np.ndarray, kmax: int = 10) -> float:
"""
Higuchi fractal dimension (FD) estimate for a 1D time series.
FD in (1,2). Higher -> rougher/noisier series.
"""
x = np.asarray(prices, dtype=float).ravel()
x = x[np.isfinite(x)]
n = x.size
if n < 32:
return 1.5
kmax = int(max(2, min(kmax, n // 4)))
L = []
k_vals = range(1, kmax + 1)
for k in k_vals:
Lk = 0.0
for m in range(k):
idx = np.arange(m, n, k)
if idx.size < 2:
continue
diff = np.abs(np.diff(x[idx]))
Lm = (np.sum(diff) * (n - 1)) / (idx.size * k)
Lk += Lm
Lk /= k
if np.isfinite(Lk) and Lk > 0:
L.append((k, Lk))
if len(L) < 3:
return 1.5
kv = np.log([a for a, _ in L])
lv = np.log([b for _, b in L])
# slope is -FD
fd = -np.polyfit(kv, lv, 1)[0]
return float(np.clip(fd, 1.05, 1.95))
def fractal_regime(hurst: float) -> str:
"""
Simple regime classifier based on Hurst exponent.
"""
if hurst is None or not np.isfinite(hurst):
return "unknown"
if hurst >= 0.56:
return "trend"
if hurst <= 0.44:
return "mean_revert"
return "noise"
#
# Attribution-NonCommercial-NoDerivatives 4.0 International (CC BY-NC-ND 4.0)
# https://creativecommons.org/licenses/by-nc-nd/4.0/
# System.py -- Main API core.
#
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.5.0.0.10862. Copyright 2014 QuantConnect Corporation.
#
########################################################
# Developed by Anastasios N. Bikos. ©DeepConscius All Rights Reserved.
# # mpikos@live.com
# # Copyright ~ ~ ~ 04/04/2026
# # Version ~ ~ ~ 4.0
# # ~ ~ ~
########################################################
# region imports
from AlgorithmImports import *
# endregion
from config import NP_TRIGGER
class ProfitResult:
def __init__(self, gross_profit, friction, net_profit_weighted, triggers_execution):
self.gross_profit = gross_profit
self.friction = friction
self.net_profit_weighted = net_profit_weighted
self.triggers_execution = triggers_execution
class FrictionProfitAnalyzer:
def analyze_opportunity(self, vector_id, gross_profit, volume_usd):
friction = volume_usd * 0.0016
net_profit = gross_profit - friction
triggers = net_profit > NP_TRIGGER
return ProfitResult(gross_profit, friction, net_profit, triggers)
class FrictionEvaluator:
pass
#
# Attribution-NonCommercial-NoDerivatives 4.0 International (CC BY-NC-ND 4.0)
# https://creativecommons.org/licenses/by-nc-nd/4.0/
# System.py -- Main API core.
#
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.5.0.0.10862. Copyright 2014 QuantConnect Corporation.
#
########################################################
# Developed by Anastasios N. Bikos. ©DeepConscius All Rights Reserved.
# # mpikos@live.com
# # Copyright ~ ~ ~ 04/04/2026
# # Version ~ ~ ~ 4.0
# # ~ ~ ~
########################################################
from System import *
from QuantConnect.Data import *
from QuantConnect.Algorithm import *
from QuantConnect.Indicators import *
from QuantConnect.Securities import *
from QuantConnect.Data.Consolidators import *
from QuantConnect.Orders import OrderStatus
from QuantConnect import *
from QuantConnect.Orders import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import *
from QuantConnect.Data.Market import TradeBar
from QuantConnect.DataSource import *
from QuantConnect.Data.UniverseSelection import *
from AlgorithmImports import *
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")
import math
import random
import pandas as pd
import numpy as np
from datetime import timedelta, datetime
from datetime import timedelta
from collections import deque
import talib as ta
from fractal import hurst_dfa, higuchi_fd, fractal_regime
from order_book import SetCustomHoldings
from statistics import mode
from itertools import groupby
from collections import Counter
import sympy
from sympy import *
from sympy.utilities.lambdify import lambdify, implemented_function
from sympy.parsing.sympy_parser import parse_expr
from friction_simple import FrictionProfitAnalyzer
import config
from Library import QCDefaultPortfolioRiskManagement, QCPortfolioRiskManagement, QCRSIPortfolioRiskManagement
class ArbitrageLogic:
def __init__(self):
self.MIN_SPREAD_BPS = config.Z_SCORE_THRESHOLD_MIN
self.MIN_SPREAD_PERCENT = config.NP_TRIGGER * 100
def calculate_arbitrage_opportunity(self, pair: str, spread: float, price_a: float):
"""Calculate if arbitrage opportunity meets thresholds"""
spread_magnitude = abs(spread)
spread_pct = spread_magnitude / price_a if price_a > 0 else 0
if spread_pct < self.MIN_SPREAD_PERCENT / 100:
return None
spread_bps = spread_pct * 10000
if spread_bps < self.MIN_SPREAD_BPS:
return None
return spread_magnitude
class MarketDynamics(QCAlgorithm):
def _check_exits(self, data):
"""Check and execute exits for existing positions"""
for ticker, (sym_a, sym_b) in self.symbols.items():
current_qty_a = self.portfolio[sym_a].quantity
current_qty_b = self.portfolio[sym_b].quantity
# Skip if no position
if abs(current_qty_a) < 0.0001 and abs(current_qty_b) < 0.0001:
continue
# Get current prices
if not (data.contains_key(sym_a) and data.contains_key(sym_b)):
continue
bar_a = data[sym_a]
bar_b = data[sym_b]
if bar_a is None or bar_b is None:
continue
price_a = bar_a.close if hasattr(bar_a, 'close') else bar_a.price
price_b = bar_b.close if hasattr(bar_b, 'close') else bar_b.price
if price_a <= 0 or price_b <= 0:
continue
# Calculate current spread
current_spread = price_b - price_a
current_spread_pct = abs(current_spread) / price_a * 100
# Update best spread seen (lowest spread = highest profit)
if ticker not in self.position_best_spread:
self.position_best_spread[ticker] = current_spread_pct
else:
self.position_best_spread[ticker] = min(self.position_best_spread[ticker], current_spread_pct)
# Exit conditions
should_exit = False
exit_reason = ""
# Get entry prices to calculate realized P&L
if ticker not in self.position_entry_prices:
continue
entry_price_a, entry_price_b, entry_qty = self.position_entry_prices[ticker]
# Calculate realized P&L for this position
# Need to determine which direction we traded
# If qty_a > 0, we bought A and sold B (betting spread would narrow)
# P&L = (price_a - entry_price_a) * qty_a + (entry_price_b - price_b) * qty_b
# Determine trade direction from current positions
if current_qty_a > 0: # Long A, Short B
# Profit when: A goes up OR B goes down OR spread narrows
pnl_leg_a = (price_a - entry_price_a) * abs(current_qty_a)
pnl_leg_b = (entry_price_b - price_b) * abs(current_qty_b)
else: # Short A, Long B
# Profit when: A goes down OR B goes up OR spread narrows
pnl_leg_a = (entry_price_a - price_a) * abs(current_qty_a)
pnl_leg_b = (price_b - entry_price_b) * abs(current_qty_b)
# Gross profit from both legs (can be negative!)
gross_pnl = pnl_leg_a + pnl_leg_b
# Calculate fees for closing (entry fees already paid)
position_value = abs(entry_qty) * price_a
exit_fees = self.profit_analyzer.analyze_opportunity(
f"{ticker}_exit_fees",
gross_pnl,
position_value
).friction
# Net P&L = gross profit - exit fees (entry fees already incurred)
net_pnl = gross_pnl - exit_fees
# Update max net P&L seen
if ticker not in self.position_max_net_pnl:
self.position_max_net_pnl[ticker] = net_pnl
else:
self.position_max_net_pnl[ticker] = max(self.position_max_net_pnl[ticker], net_pnl)
max_net_pnl = self.position_max_net_pnl[ticker]
# 1. Trailing profit exit: Exit when net P&L drops 20% from max
if max_net_pnl > 0: # Only apply if we've been profitable
profit_drawdown_pct = (max_net_pnl - net_pnl) / max_net_pnl * 100
if profit_drawdown_pct > 20: # Lost 20% of max profit
should_exit = True
exit_reason = f"Trailing stop (max ${max_net_pnl:.2f}, current ${net_pnl:.2f}, dropped {profit_drawdown_pct:.1f}%)"
# 2. Time-based exit: Force exit after holding period
if ticker in self.position_entry_time:
hours_held = (self.time - self.position_entry_time[ticker]).total_seconds() / 3600
if hours_held >= self.trade_cooldown_hours:
should_exit = True
exit_reason = f"Time limit (held {hours_held:.1f}h, max ${max_net_pnl:.2f}, current ${net_pnl:.2f})"
if should_exit:
# Close both legs
self.liquidate(sym_a)
self.liquidate(sym_b)
self.log(f"EXIT: {ticker} - {exit_reason} | Final spread: {current_spread_pct:.2f}%")
# Clean up tracking
if ticker in self.position_entry_time:
del self.position_entry_time[ticker]
if ticker in self.position_best_spread:
del self.position_best_spread[ticker]
if ticker in self.position_entry_prices:
del self.position_entry_prices[ticker]
if ticker in self.position_max_net_pnl:
del self.position_max_net_pnl[ticker]
def _update_market_regime(self, data):
"""Detect bull/bear market based on BTC 45-day momentum"""
btc_symbol = self.symbols.get("BTCUSDT")
if btc_symbol is None:
return
sym_a, sym_b = btc_symbol
if not data.contains_key(sym_a):
return
bar = data[sym_a]
if bar is None:
return
btc_price = bar.close if hasattr(bar, 'close') else bar.price
self.btc_prices.append(btc_price)
# Keep last 45 days (1080 hourly bars = 45 days * 24 hours)
if len(self.btc_prices) > 1080:
self.btc_prices.pop(0)
# Need at least 1080 bars (45 days) to determine regime
if len(self.btc_prices) < 1080:
return
# Calculate momentum: current vs 45 days ago
momentum = (self.btc_prices[-1] - self.btc_prices[0]) / self.btc_prices[0]
# Bull market: price up > 5% over 45 days
# Bear market: price down or flat
was_bull = self.is_bull_market
self.is_bull_market = momentum > 0.05
if was_bull != self.is_bull_market:
regime = "BULL" if self.is_bull_market else "BEAR"
self.log(f"*** REGIME CHANGE: Now in {regime} MARKET (45d momentum: {momentum*100:.1f}%) ***")
global X
X = 60
global xn
xn = int(float(60)/float(X))
global daily_trades_allowance_quant
daily_trades_allowance_quant = 0
global minute_trades_allowance_quant_thresh
minute_trades_allowance_quant_thresh = 1
global daily_trades_allowance_quant_thresh
daily_trades_allowance_quant_thresh = minute_trades_allowance_quant_thresh *24 *xn
global minute_trades_allowance_quant
minute_trades_allowance_quant = 0
global daily_trade_flag_en
daily_trade_flag_en = False
global hourly_trade_flag_en
hourly_trade_flag_en = False
global bet_arr
bet_arr = [0, 0]
global hist_arr
hist_arr = [0, 0]
global start_equity_amount
start_equity_amount = 1000000
global total_equity
total_equity = [start_equity_amount]
global min_border
min_border = -1
global max_border
max_border = -0
global margins_border
margins_border = [[-0, -1], [+0, +1]]
global trading_quantum
trading_quantum = 0.01
global scaler
global arr1
arr1 = [-1,-1]
global arr2
arr2 = [-1,-1]
global dataset
dataset = [-1]
global stopPriceBuy
stopPriceBuy = -1
global closeness_factor
closeness_factor = 0.05
global normalized
normalized = []
global comp_price
comp_price = []
global projected_price
projected_price = []
global __quantity
__quantity = 0
global day_close_price_arr
day_close_price_arr = [-1, -1]
global high_price_arr
high_price_arr = [-1, -1]
global low_price_arr
low_price_arr = [-1, -1]
global volume_arr
volume_arr = [-1, -1]
global open_price_arr
open_price_arr = [-1, -1]
global close_price_arr
close_price_arr = [-1, -1]
global datetime_arr
datetime_arr = []
global loss_factor
loss_factor = 0.0125
global rsi_interval
rsi_interval = 14
global U
U = 6
global Z
Z = 1
global W
W = 1
global V
V = 3
global scheduled_event_1
scheduled_event_1 = None
global scheduled_event_2
scheduled_event_2 = None
global scheduled_event_3
scheduled_event_3 = None
global scheduled_event_4
scheduled_event_4 = None
global symbol_minimum_trade_order_size
symbol_minimum_trade_order_size = 0.5
global verbage
verbage = "None"
global t_indicator_arr
t_indicator_arr = []
global delta_prediction_arr
delta_prediction_arr = []
global real
global last_trade_order
last_trade_order = "Null"
global a0_arr
a0_arr = []
global offset
offset = 1000
global sign
sign = +1
global sign_
sign_ = +1
global _sign
_sign = +1
global weekly_stockprices_arr
weekly_stockprices_arr = [[], [], [], []]
global weekly_stockprices_arr1
weekly_stockprices_arr1 = []
global weekly_stockprices_arr2
weekly_stockprices_arr2 = []
global weekly_stockprices_arr3
weekly_stockprices_arr3 = []
global weekly_stockprices_arr4
weekly_stockprices_arr4 = []
global high_price_arr1
high_price_arr1 = []
global high_price_arr2
high_price_arr2 = []
global high_price_arr3
high_price_arr3 = []
global high_price_arr4
high_price_arr4 = []
global low_price_arr1
low_price_arr1 = []
global low_price_arr2
low_price_arr2 = []
global low_price_arr3
low_price_arr3 = []
global low_price_arr4
low_price_arr4 = []
global volume_arr1
volume_arr1 = []
global volume_arr2
volume_arr2 = []
global volume_arr3
volume_arr3 = []
global volume_arr4
volume_arr4 = []
global open_price_arr1
open_price_arr1 = []
global open_price_arr2
open_price_arr2 = []
global open_price_arr3
open_price_arr3 = []
global open_price_arr4
open_price_arr4 = []
global close_price_arr1
close_price_arr1 = []
global close_price_arr2
close_price_arr2 = []
global close_price_arr3
close_price_arr3 = []
global close_price_arr4
close_price_arr4 = []
global day_close_price_arr1
day_close_price_arr1 = []
global day_close_price_arr2
day_close_price_arr2 = []
global day_close_price_arr3
day_close_price_arr3 = []
global day_close_price_arr4
day_close_price_arr4 = []
global ETHUSD_a0
ETHUSD_a0 = 0
global ethusd_a0
ethusd_a0 = 0
global dotusd_a0
dotusd_a0 = 0
global DD_arr
DD_arr = [0.0, 0.0]
global total_equity_
total_equity_ = [start_equity_amount]
global out_DD
out_DD = [-1, -1]
global sign_inp_arr
sign_inp_arr = []
global rsi_intra_daily
rsi_intra_daily = 50
global exp_factor
exp_factor = 1.0
global enable_live_mode
enable_live_mode = True
global take_profit_factor
take_profit_factor = 0.30
global mom_array
mom_array = [[], [], [], []]
global rsi_intra_daily_array
rsi_intra_daily_array = [[0], [0], [0], [0]]
global shorts_occurences_arr
shorts_occurences_arr = []
global longs_occurences_arr
longs_occurences_arr = []
global new_interval
new_interval = U
global o_direction
o_direction = 0
global close
close = 0.0
global prediction_price_hpc_XGB_arr
prediction_price_hpc_XGB_arr = [0.0]
global prediction_mom_value_arr
prediction_mom_value_arr = [0.0]
global exp_ma_arr
exp_ma_arr = 24 *[0.0]
global rsi
rsi = -1
global ctr_c
global ctr_w
ctr_c = 0
ctr_w = 0
global episode_list
episode_list = []
global sign_arr
sign_arr = []
global exit_stop_loss_flag
exit_stop_loss_flag = False
global dd_total_equity
dd_total_equity = [0]
global rl_weight
rl_weight = +1
global history_hf
history_hf = [0]
FastEmaPeriod = 12
SlowEmaPeriod = 26
def main_job(self, pos):
inp = np.array(pos, dtype='float64').ravel()
P = self.lagrangepoly(inp)
#print(P)
return P
global x
global zeropoly
global onepoly
x = sympy.symbols('x')
zeropoly = x - x
onepoly = zeropoly + 1
global result
result = zeropoly
# --- helper: convert { -1,0,+1 } -> float and compute a vote score
def _vote_score(self, tot, n1, n2):
# weights: tot carries more information, then n1, n2
return (0.10 * float(tot) + 0.45 * float(n1) + 0.45 * float(n2))
def _clip(self, x, lo, hi):
return max(lo, min(hi, x))
# 2) RSI signals -> produce a signed bias in [-1, +1]
# In mean-revert: RSI<30 => bullish, RSI>70 => bearish (contrarian)
# In trend: RSI<30 => bearish, RSI>70 => bullish (confirmatory / “weakness continues”)
def rsi_bias(self, r):
"""Map RSI to a smooth contrarian bias in [-1,+1].
+1 => bullish (oversold), -1 => bearish (overbought), 0 => neutral.
Uses a linear ramp between 30..70 to avoid rare triggers.
"""
try:
r = float(r)
except Exception:
return 0.0
# linear ramp: 30 -> +1, 50 -> 0, 70 -> -1
bias = (50.0 - r) / 20.0
return self._clip(bias, -1.0, +1.0)
def lagrangepoly(self, yseq, xseq=None):
"""Build a Lagrange polynomial from a sequence of `y` values.
If no sequence of `x`s is given, use x = 1, 2, ..."""
if xseq is None:
xseq = list(range(1, len(yseq) + 1))
assert len(yseq) == len(xseq)
result = zeropoly
for j, (xj, yj) in enumerate(zip(xseq, yseq)):
# Build the j'th base polynomial
polyj = onepoly
for m, xm in enumerate(xseq):
if m != j:
polyj *= (x - xm) / (xj - xm)
# Add in the j'th polynomial
result += yj * polyj
return sympy.expand(result)
def get_last_invested_unrealized_profit(self) -> float:
last_unrealized_profit = 0.0
if self.portfolio.invested:
last_unrealized_profit = self.portfolio.total_unrealised_profit
return last_unrealized_profit
def TakeProfit(self):
global take_profit_factor
last_unrealized_profit = self.get_last_invested_unrealized_profit()
if last_unrealized_profit >= 2.5 * self.atr.Current.Value * abs(self.Portfolio[self.symbol].Quantity):
for ticker in self.tickers:
self.EmitInsights(
Insight.Price(ticker, timedelta(self.FastEmaPeriod), InsightDirection.Down)
)
self.Liquidate("ETHUSD")
self.SetHoldings("ETHUSD", 0.0)
self.Debug("Take Profit!")
def ControlDrawdownLevel(self):
global total_equity_
global DD_arr
global close
global U
global X
total_equity_ = np.append(total_equity_, self.Portfolio.TotalPortfolioValue)
self.equity_curve._append(pd.Series(close))
if not self.equity_curve.empty:
#self.Plot('Drawdown Percent', 'Current', self.current_drawdown_percent(self.equity_curve.to_numpy()))
current_dd_percent = self.current_drawdown_percent(self.equity_curve.to_numpy())
max_dd_percent = self.max_drawdown_percent(self.equity_curve.to_numpy())
current_dd = current_dd_percent / 100
#self.Debug('Drawdown Percent' + "\t" + '(Current):' + "\t" + str(current_dd))
else:
return
self.account_leverage = self.Portfolio.TotalHoldingsValue / self.Portfolio.TotalPortfolioValue
if total_equity_[-1] != 0 and self.account_leverage != 0:
diff = float(np.abs(total_equity_[-1] - total_equity_[-2])) / float(total_equity_[-1])
drawdown = float(diff) / float(self.account_leverage)
if np.isclose([drawdown], [current_dd]):
mean_drawdawn = np.mean([drawdown, current_dd])
else:
mean_drawdawn = current_dd
DD_arr = np.append(DD_arr, self.truncate(drawdown, 4))
if DD_arr[-1] != 0.0:
DD_delta = float(np.abs(DD_arr[-1] - DD_arr[-2])) / float(DD_arr[-1])
'''if DD_delta >= 1.25:'''
if drawdown > 0.10:
self.Debug('Drawdown' + "\t" + '(Current):' + "\t" + str(drawdown))
self.Debug("Leverage control ahead!")
for ticker in self.tickers:
self.EmitInsights(
Insight.Price(ticker, timedelta(self.FastEmaPeriod), InsightDirection.Down)
)
self.Liquidate("ETHUSD")
self.SetHoldings("ETHUSD", 0.00)
def current_drawdown_percent(self, equity_curve):
hwm = max(equity_curve)
current = equity_curve[-1]
return (hwm - current) / hwm * 100
def max_drawdown_percent(self, equity_curve):
i = np.argmax(np.maximum.accumulate(equity_curve) - equity_curve)
if equity_curve[:i].size == 0:
return np.nan
j = np.argmax(equity_curve[:i])
return abs((equity_curve[i]/equity_curve[j]) - 1) * 100
def exp_MA(self, arr):
# Program to calculate exponential
# moving average using formula
x=0.5 # smoothening factor
i = 1
# Initialize an empty list to
# store exponential moving averages
moving_averages = []
# Insert first exponential average in the list
moving_averages.append(arr[0])
# Loop through the array elements
while i < len(arr):
# Calculate the exponential
# average by using the formula
window_average = round((x*arr[i])+
(1-x)*moving_averages[-1], 2)
# Store the cumulative average
# of current window in moving average list
moving_averages.append(window_average)
# Shift window to right by one position
i += 1
print(moving_averages)
return moving_averages
def HFT_daily(self):
global prediction_mom_value_arr
global exp_ma_arr
global rsi
if len(self.mom_arr) > 1:
self.mom_arr[np.isnan(self.mom_arr)] = 0
prediction_mom_value = self.mom_arr[-24:]
prediction_mom_value_arr = np.append(prediction_mom_value_arr, prediction_mom_value)
exp_ma = np.mean(self.exp_MA(prediction_mom_value_arr)[-24:])
exp_ma_arr = np.append(exp_ma_arr, exp_ma)
if exp_ma_arr[-1] >= prediction_mom_value_arr[-1] and self.mom_arr[-1] >= 0:
if rsi < 30:
self.SetHoldings(self.symbol, 0.0)
self._set_fractal_holdings(-1, 0.35)
elif rsi > 90:
self.SetHoldings(self.symbol, 0.0)
self._set_fractal_holdings(+1, 0.35)
def truncate(self, number, decimals=0):
if not isinstance(decimals, int):
raise TypeError("decimal places must be an integer.")
elif decimals < 0:
raise ValueError("decimal places has to be 0 or more.")
elif decimals == 0:
return math.trunc(number)
factor = 10.0 ** decimals
return math.trunc(number * factor) / factor
def AdaptEquityInvestement(self):
global trading_quantum
trading_quantum += 0.001
self.Debug("Month change!")
self.Debug("Portfolio invested cash (current) :" + str(self.Portfolio.TotalMarginUsed))
def GetWeekData(self):
global weekly_stockprices_arr
global weekly_stockprices_arr1
global ETHUSD_a0
weekly_stockprices_arr1 = np.append(weekly_stockprices_arr1, ETHUSD_a0)
weekly_stockprices_arr = [weekly_stockprices_arr1]
def TakeProfit1(self):
global min_border
global max_border
global rl_weight
global exit_stop_loss_flag
[min_border, max_border, rl_weight, exit_stop_loss_flag] = self.obj1.TakeProfit(self, self.symbol)
return exit_stop_loss_flag
def CalcEqDrawDown(self):
global dd_total_equity
dd_total_equity = self.obj2.CalcEqDrawDown(self, dd_total_equity)
def ControlEqDrawdownLevel(self):
global dd_total_equity
self.obj2.ControlDrawdownLevel(self, dd_total_equity, self.symbol)
def ControlLoss(self):
global total_equity
global min_border
global max_border
global rl_weight
global hourly_trade_flag_en
[min_border, max_border, rl_weight, hourly_trade_flag_en] = self.obj1.ControlLoss(self, self.symbol, total_equity, min_border, max_border, rl_weight)
def ControlRSI(self):
global history_hf
global min_border
global max_border
global rl_weight
[min_border, max_border, rl_weight] = self.obj3.ControlRSI(self, history_hf, self.symbol, min_border, max_border, rl_weight)
def Initialize(self):
global X
global U
global Z
global W
global V
global scheduled_event_1
global scheduled_event_2
global scheduled_event_3
global scheduled_event_4
global take_profit_factor
# --- Shorting support (robust across QC python runtimes) ---
try:
self.SetBrokerageModel(BrokerageName.ALPHA_STREAMS, AccountType.Margin)
except Exception as e:
self.SetBrokerageModel(BrokerageName.ALPHA_STREAMS)
self.Debug(f"Brokerage model set without AccountType.Margin (fallback). Reason: {e}")
self.equity_curve = pd.Series()
self.SetStartDate(2020, 1, 1)
self.SetCash(start_equity_amount)
self.tickers = ["ETHUSD"]
self.StartingCapital = self.Portfolio.Cash
self.leverage_targets = 10.0
self.leverage_targets_clip = 10.0
self.leverage_targets_default = 1.0
self.symbol = self.AddCrypto(self.tickers[0], Resolution.HOUR, Market.KRAKEN, fillForward=True, leverage=self.leverage_targets).Symbol
# Ensure leverage/margin model is applied to allow short exposure when supported
try:
self.Securities[self.symbol].SetLeverage(self.leverage_targets)
except Exception as e:
self.Debug(f"SetLeverage failed/ignored: {e}")
try:
# Some environments require an explicit margin buying power model for shorts
self.Securities[self.symbol].SetBuyingPowerModel(SecurityMarginModel(self.leverage_targets))
except Exception:
pass
self.targets = "ETHUSD"
self.closeWindow1 = RollingWindow[float](1440)
# --- Fractal dynamics configuration (regime detection & position sizing) ---
# NOTE: These do NOT guarantee any ROI; they help adapt the strategy to trending vs mean-reverting regimes.
self.fractal_window = 512 # lookback (bars) for fractal metrics
self.fractal_kmax = 10 # Higuchi k_max
self.max_abs_holdings = 0.75 # cap target holdings (e.g., 150% notional)
self.fractal_hurst = 0.50
self.fractal_fd = 1.50
self.fractal_regime = "unknown" # {"trend","mean_revert","noise","unknown"}
self.fractal_ready = False
self.mom_state = 0
self.entradeflag = True
self.fast = self.EMA(self.symbol, 30, Resolution.HOUR)
self.slow = self.EMA(self.symbol, 60, Resolution.HOUR)
self.atr = self.ATR(self.symbol, 14, MovingAverageType.SIMPLE, Resolution.HOUR)
self.sma_arr = []
self.period = 1
self.ma = 2
self.donchian = self.DCH(self.symbol, self.period, self.period, Resolution.HOUR)
self.sma = self.SMA(self.symbol, self.ma, Resolution.HOUR)
self.obj1 = QCDefaultPortfolioRiskManagement()
self.obj1.__init__(take_profit_factor)
self.obj2 = QCPortfolioRiskManagement()
self.obj2.__init__(0.05)
self.obj3 = QCRSIPortfolioRiskManagement()
self.obj3.__init__(14)
scheduled_event_1 = self.Schedule.On(self.DateRules.EveryDay(), \
self.TimeRules.Every(timedelta(minutes=(U *X))), \
self.ChangeHourFlag)
self.Schedule.On(self.DateRules.WeekStart("ETHUSD"), \
self.TimeRules.At(0, 0, 0), \
self.GetWeekData)
scheduled_event_2 = self.Schedule.On(self.DateRules.EveryDay(), \
self.TimeRules.Every(timedelta(minutes=(V *X))), \
self.ControlDrawdownLevel)
'''
scheduled_event_3 = self.Schedule.On(self.DateRules.EveryDay(), \
self.TimeRules.Every(timedelta(minutes=(U *X))), \
self.TakeProfit)
'''
'''
scheduled_event_4 = self.Schedule.On(self.DateRules.EveryDay(), \
self.TimeRules.Every(timedelta(minutes=(W *X))), \
self.HFT_daily)
'''
self.Schedule.On(self.DateRules.EveryDay(), \
self.TimeRules.Every(timedelta(minutes=(15))), \
self.TakeProfit1)
self.Schedule.On(self.DateRules.EveryDay(), \
self.TimeRules.Every(timedelta(minutes=(Z *X))), \
self.ControlLoss)
#self.Schedule.On(self.DateRules.EveryDay(), \
# self.TimeRules.Every(timedelta(minutes=(15))), \
# self.ControlRSI)
self.Schedule.On(self.DateRules.EveryDay(), \
self.TimeRules.Every(timedelta(minutes=(V *X))), \
self.CalcEqDrawDown)
self.Schedule.On(self.DateRules.EveryDay(), \
self.TimeRules.Every(timedelta(minutes=(V *X))), \
self.ControlEqDrawdownLevel)
self.Schedule.On(self.DateRules.MonthStart("ETHUSD"), \
self.TimeRules.At(0, 0, 0), \
self.AdaptEquityInvestement)
self.Schedule.On(self.DateRules.EveryDay(),
self.TimeRules.At(9, 31),
Action(self.One))
self.SetWarmup(1440)
self.warmup_period = 1440
consolidator1 = TradeBarConsolidator(timedelta(1))
consolidator1.DataConsolidated += self.OnDailyData
self.SubscriptionManager.AddConsolidator("ETHUSD", consolidator1)
consolidatorm1 = TradeBarConsolidator(60)
consolidatorm1.DataConsolidated += self.OnMinuteData
self.SubscriptionManager.AddConsolidator("ETHUSD", consolidatorm1)
self.daily1 = RollingWindow[TradeBar](2)
self.minute1 = RollingWindow[TradeBar](2)
self.window1 = RollingWindow[TradeBar](2)
self._newHour = False
self._newDay = False
global daily_trade_flag_en
daily_trade_flag_en = False
global hourly_trade_flag_en
hourly_trade_flag_en = False
self.conf_swing_level = 0.0
self.mom_arr = [[]]
self.dataframe = [[]]
self.trainsignaldatabase = [[]]
self.deltapricesdatabase = [[]]
self.in_stock = 0
self.min_trade_interval = timedelta(minutes=10)
# --- Momentum regime tracking (short-bias confirmation) ---
self.mom_state = 0
self.mom_bear_count = 0
self.mom_bull_count = 0
self.mom_confirm = 3
self.last_signal_score = 0.0
self.last_pred_score = 0.0
self.last_trade_dir = 0
self.min_trade_interval = timedelta(minutes=10)
self.fractal_state = {"regime": "noise"}
self.DaysCounter = 0
self.y = 1
self.m = 1
self.monthly_data = []
self.rsi_inter_daily_array = []
# --- Extra Logic ---
self.last_trade_time = self.Time
self.last_trade_time1 = {}
self.position_entry_time = {}
self.position_best_spread = {} # Track best spread seen for each position
self.position_entry_prices = {} # Track entry prices for P&L calculation
self.position_max_net_pnl = {} # Track maximum net P&L seen for trailing stop
# Regime detection: Track BTC price momentum
self.btc_prices = [] # Store last 45 days of BTC prices (1080 hourly bars)
self.is_bull_market = True # Start assuming bull market
# Load configuration
self.tickers = config.TRADING_PAIRS
exchange1_config = config.EXCHANGES['Exchange1']
exchange2_config = config.EXCHANGES['Exchange2']
self.exchange_a = getattr(Market, exchange1_config['market'])
self.exchange_b = getattr(Market, exchange2_config['market'])
self.exchange_a_fee_bps = exchange1_config['fee_bps']
self.exchange_b_fee_bps = exchange2_config['fee_bps']
self.symbols = {}
for ticker in self.tickers:
try:
sym_a = self.add_crypto(ticker, Resolution.HOUR, self.exchange_a, leverage=self.leverage_targets).symbol
sym_b = self.add_crypto(ticker, Resolution.HOUR, self.exchange_b, leverage=self.leverage_targets).symbol
self.symbols[ticker] = (sym_a, sym_b)
except Exception as e:
self.log(f"Could not add {ticker}: {str(e)}. Skipping.")
self.profit_analyzer = FrictionProfitAnalyzer()
self.logic = ArbitrageLogic()
self.max_position_size_usd = config.MAX_POSITION_SIZE_USD
self.min_trade_size_usd = config.MIN_TRADE_SIZE_USD
self.max_open_positions = config.MAX_OPEN_POSITIONS
self.trade_cooldown_hours = 6
self.UniverseSettings.Leverage = self.leverage_targets
self.log(f"Initialized Simplified Arbitrage")
self.log(f"Exchange A: {exchange1_config['name']} ({exchange1_config['market']}) - Fee: {self.exchange_a_fee_bps}bps")
self.log(f"Exchange B: {exchange2_config['name']} ({exchange2_config['market']}) - Fee: {self.exchange_b_fee_bps}bps")
self.log(f"Trading pairs: {', '.join(self.symbols.keys())}")
self.log(f"Min spread percent: {self.logic.MIN_SPREAD_PERCENT}%")
self.log(f"Min spread (bps): {self.logic.MIN_SPREAD_BPS}")
self.log(f"Max position size: ${self.max_position_size_usd:,.0f}")
self.log(f"Min trade size: ${self.min_trade_size_usd:,.0f}")
def One(self):
if not (self.window1.IsReady and self.daily1.IsReady and self.minute1.IsReady): return
currBar1 = self.window1[0].Close
yesterdayc1 = self.daily1[1].Close
minuteBarC1 = self.minute1[1].Close
minuteBar01 = self.minute1[1].Open
def OnDailyData(self, sender, bar):
self.daily1.Add(bar)
def OnMinuteData(self, sender, bar):
self.minute1.Add(bar)
def ChangeHourFlag(self):
self._newHour = True
def OnEndOfDay(self):
self._newDay = True
# ---------------- Fractal dynamics helpers ----------------
def _update_fractal_state(self, price_series: np.ndarray):
"""Update rolling fractal metrics on a 1D array of prices (chronological)."""
try:
if price_series is None or len(price_series) < int(self.fractal_window):
self.fractal_ready = False
return
# Defensive: remove NaNs/Infs
s = np.asarray(price_series, dtype=float)
s = s[np.isfinite(s)]
if len(s) < int(self.fractal_window):
self.fractal_ready = False
return
# Use last window only
s = s[-int(self.fractal_window):]
self.fractal_hurst = float(hurst_dfa(s))
self.fractal_fd = float(higuchi_fd(s, kmax=int(self.fractal_kmax)))
self.fractal_regime = str(fractal_regime(self.fractal_hurst))
self.fractal_ready = True
except Exception as e:
# Never break trading loop due to diagnostics
self.fractal_ready = False
self.fractal_regime = "unknown"
def _set_fractal_holdings(self, data, direction: int, base_strength: float):
"""Safe wrapper around SetHoldings with inverse-volatility sizing.
direction: +1 long, -1 short
base_strength: 0..1 scales the target exposure (confidence)
"""
try:
if not hasattr(self, "atr") or self.atr is None or not self.atr.IsReady:
return
price = float(self.Securities[self.symbol].Price)
if price <= 0:
return
atr_val = float(self.atr.Current.Value)
if atr_val <= 0:
return
# Inverse-volatility position sizing (de-risk when ATR spikes)
vol = atr_val / price # fraction of price
risk_budget = 0.015 # tune 0.5%..2.0%
size = risk_budget / max(vol, 1e-4)
# Clamp to avoid oversized targets
size = min(size, 0.30)
strength = self._clip(float(base_strength), 0.0, 1.0)
target = float(direction) * float(size) * float(strength)
strength *= self.leverage_targets_clip
target_stregth = self.leverage_targets_clip
# Global cap (SetHoldings target fraction)
target = max(-float(self.max_abs_holdings), min(float(self.max_abs_holdings), target))
for ticker, (sym_a, sym_b) in self.symbols.items():
if not (data.contains_key(sym_a) and data.contains_key(sym_b)):
continue
bar_a = data[sym_a]
bar_b = data[sym_b]
if bar_a is None or bar_b is None:
continue
price_a = bar_a.close if hasattr(bar_a, 'close') else bar_a.price
price_b = bar_b.close if hasattr(bar_b, 'close') else bar_b.price
if price_a <= 0 or price_b <= 0:
continue
spread = price_b - price_a
spread_pct = abs(spread) / price_a * 100 if price_a > 0 else 0
self.log(f"{ticker}: Spread={spread_pct:.4f}% | A=${price_a:.2f} B=${price_b:.2f}")
spread_opportunity = self.logic.calculate_arbitrage_opportunity(ticker, spread, price_a)
if spread_opportunity is None:
continue
portfolio_value = self.portfolio.total_portfolio_value
max_allocation = min(config.MAX_PORTFOLIO_ALLOCATION * portfolio_value, self.max_position_size_usd)
size_usd = max_allocation
qty = size_usd / price_a
gross_profit = abs(spread) * qty
profit_result = self.profit_analyzer.analyze_opportunity(
f"{ticker}_arb",
gross_profit,
size_usd
)
if profit_result.net_profit_weighted <= 0:
self.log(f"{ticker}: REJECTED - Net profit ${profit_result.net_profit_weighted:.2f} (Gross=${gross_profit:.2f} Fees=${profit_result.friction:.2f})")
continue
if not profit_result.triggers_execution:
self.log(f"{ticker}: REJECTED - Below trigger threshold")
continue
current_qty_a = self.portfolio[sym_a].quantity
current_qty_b = self.portfolio[sym_b].quantity
if abs(current_qty_a) > 0.0001 or abs(current_qty_b) > 0.0001:
self.log(f"{ticker}: SKIP - Already holding position")
continue
if ticker in self.last_trade_time1:
hours_since = (self.time - self.last_trade_time1[ticker]).total_seconds() / 3600
if hours_since < self.trade_cooldown_hours:
continue
# BULL MARKET: Bet on spread convergence (original strategy)
# BEAR MARKET: FLIP - Bet on spread widening
if self.is_bull_market:
# Normal: buy low exchange, sell high exchange
if spread > 0:
security_sym_a = self.Securities[sym_a]
buying_power_sym_a = self.Portfolio.MarginRemaining
if buying_power_sym_a > 0:
SetCustomHoldings(self, sym_a, +target_stregth) # Buy A (cheaper)
security_sym_b = self.Securities[sym_b]
buying_power_sym_b = self.Portfolio.MarginRemaining
if buying_power_sym_b > 0:
SetCustomHoldings(self, sym_b, -target_stregth) # Sell B (expensive)
else:
security_sym_b = self.Securities[sym_b]
buying_power_sym_b = self.Portfolio.MarginRemaining
if buying_power_sym_b > 0:
SetCustomHoldings(self, sym_b, +target_stregth) # Buy B (cheaper)
security_sym_a = self.Securities[sym_a]
buying_power_sym_a = self.Portfolio.MarginRemaining
if buying_power_sym_a > 0:
SetCustomHoldings(self, sym_a, -target_stregth) # Sell A (expensive)
else:
# BEAR MARKET: FLIP - buy expensive, sell cheap (bet spread widens)
if spread > 0:
security_sym_a = self.Securities[sym_a]
buying_power_sym_a = self.Portfolio.MarginRemaining
if buying_power_sym_a > 0:
SetCustomHoldings(self, sym_a, -target_stregth) # Sell A (cheaper)
security_sym_b = self.Securities[sym_b]
buying_power_sym_b = self.Portfolio.MarginRemaining
if buying_power_sym_b > 0:
SetCustomHoldings(self, sym_b, +target_stregth) # Buy B (expensive)
else:
security_sym_b = self.Securities[sym_b]
buying_power_sym_b = self.Portfolio.MarginRemaining
if buying_power_sym_b > 0:
SetCustomHoldings(self, sym_b, -target_stregth) # Sell B (cheaper)
security_sym_a = self.Securities[sym_a]
buying_power_sym_a = self.Portfolio.MarginRemaining
if buying_power_sym_a > 0:
SetCustomHoldings(self, sym_a, +target_stregth) # Buy A (expensive)
self.last_trade_time1[ticker] = self.time
self.position_entry_time[ticker] = self.time
self.position_best_spread[ticker] = spread_pct # Initialize best spread
self.position_entry_prices[ticker] = (price_a, price_b, qty) # Store entry prices and qty
self.position_max_net_pnl[ticker] = 0 # Initialize max net P&L
self.log(f"TRADE EXECUTED: {ticker} Net=${profit_result.net_profit_weighted:.2f} Gross=${gross_profit:.2f} Fees=${profit_result.friction:.2f} Spread={spread_pct:.3f}% Qty={qty:.6f}")
except Exception:
# fall back to flat if anything goes wrong
self.SetHoldings(self.symbol, 0.0)
def OnOrderEvent(self, orderEvent: OrderEvent):
if orderEvent.Status == OrderStatus.Invalid:
self.last_reject_time[orderEvent.Symbol] = self.Time
if orderEvent.Status == OrderStatus.Canceled:
# sometimes buying power rejections show as canceled in certain models
self.last_reject_time[orderEvent.Symbol] = self.Time
def OnData(self, data):
global daily_trades_allowance_quant
global daily_trades_allowance_quant_thresh
global minute_trades_allowance_quant
global minute_trades_allowance_quant_thresh
global daily_trade_flag_en
global hourly_trade_flag_en
global bet_arr
global hist_arr
global arr1
global arr2
global min_border
global max_border
global scaler
global margins_border
global dataset
global normalized
global projected_price
global rsi_interval
global __quantity
global total_equity
global day_close_price_arr
global high_price_arr1
global low_price_arr1
global volume_arr1
global datetime_arr
global open_price_arr1
global close_price_arr1
global day_close_price_arr1
global verbage
global DD_arr
global t_indicator_arr
global delta_prediction_arr
global real
global last_trade_order
global a0_arr
global ETHUSD_a0
global offset
global sign
global sign_
global weekly_stockprices_arr
global out_DD
global sign_inp_arr
global rsi_intra_daily
global exp_factor
global no_ctr
global episode_list
global sign_arr
global _sign
global mom_array
global rsi_intra_daily_array
global dayscounter
global enable_live_mode
global close
global prediction_price_hpc_XGB_arr
global prediction_mom_value_arr
global exp_ma_arr
global rsi
global x
global zeropoly
global onepoly
global ctr_c
global ctr_w
global result
global trading_quantum
global exit_stop_loss_flag
global history_hf
global rl_weight
global order_status_flag
for ticker in self.tickers:
if data.ContainsKey(ticker):
if ticker == "ETHUSD":
self.closeWindow1.Add(data["ETHUSD"].Close)
if not self.closeWindow1.IsReady: return
for ticker in self.tickers:
if data.Bars.ContainsKey(ticker):
if ticker == "ETHUSD":
self.window1.Add(data.Bars["ETHUSD"])
if not (self.window1.IsReady and self.daily1.IsReady): return
for ticker in self.tickers:
if data[ticker] is None : return
if not self.donchian.IsReady: return
if self.is_warming_up:
return
# Update regime detection
self._update_market_regime(data)
# First, check for exits on existing positions
self._check_exits(data)
np.random.seed(7)
if self._newDay == True:
daily_trades_allowance_quant = 0
daily_trade_flag_en = True
self._newDay = False
ETHUSD_a0 = self.closeWindow1[0]
self.DaysCounter += 1
if self.DaysCounter == (self.m *30):
self.monthly_data.append(self.closeWindow1[0])
self.m += 1
if self.DaysCounter == (self.y *365):
weekly_stockprices_arr = [[], [], [], []]
day_close_price_arr1 = []
self.y += 1
self.Debug("Day change!")
if self._newHour == True:
minute_trades_allowance_quant = 0
hourly_trade_flag_en = True
self._newHour = False
open_1 = np.array(self.daily1[1].Open, float).ravel()
open_price_arr1 = np.append(open_price_arr1, open_1)
open_price_arr1 = open_price_arr1[-self.warmup_period:]
close_1 = np.array(self.daily1[1].Close, float).ravel()
close_price_arr1 = np.append(close_price_arr1, close_1)
close_price_arr1 = close_price_arr1[-self.warmup_period:]
high_1 = self.daily1[1].High
high_price_arr1 = np.append(high_price_arr1, high_1)
high_price_arr1 = high_price_arr1[-self.warmup_period:]
low_1 = self.daily1[1].Low
low_price_arr1 = np.append(low_price_arr1, low_1)
low_price_arr1 = low_price_arr1[-self.warmup_period:]
yesterdayc1 = self.daily1[1].Close
day_close_price_arr1 = np.append(day_close_price_arr1, yesterdayc1)
day_close_price_arr1 = day_close_price_arr1[-self.warmup_period:]
volume1 = self.daily1[1].Volume
volume_arr1 = np.append(volume_arr1, volume1)
volume_arr1 = volume_arr1[-self.warmup_period:]
self.Debug("(Virtual) Hour change!")
else:
return
day_close_price_arr = [day_close_price_arr1]
open_price_arr = [open_price_arr1]
high_price_arr = [high_price_arr1]
low_price_arr = [low_price_arr1]
close_price_arr = [close_price_arr1]
volume_arr = [volume_arr1]
self.Debug("Current trade date:" + "\t" + str(self.Time))
self.Log(str(self.Portfolio.TotalPortfolioValue))
self.closeWindow = [self.closeWindow1]
datetime_arr = np.append(datetime_arr, self.Time.strftime('%Y-%m-%d'))
ticker_idx = 0
for ticker in self.tickers:
tmp_arr_values = []
tmp_arr_values1 = []
tmp_arr_values2 = []
for index in range(1439, 0, -1):
tmp_arr_values = np.append(tmp_arr_values, self.closeWindow[ticker_idx][index])
delta = (self.closeWindow[ticker_idx][(index - 1)] - self.closeWindow[ticker_idx][index])
tmp_arr_values1 = np.append(tmp_arr_values1, delta)
if self.closeWindow[ticker_idx][index] <= self.closeWindow[ticker_idx][(index - 1)]:
signal = 1
else:
signal = 0
tmp_arr_values2 = np.append(tmp_arr_values2, signal)
self.dataframe[ticker_idx] = tmp_arr_values
self.deltapricesdatabase[ticker_idx] = tmp_arr_values1
self.trainsignaldatabase[ticker_idx] = tmp_arr_values2
self.hist = self.dataframe[ticker_idx]
historical_train_data = self.hist
# Get historical equity curves.
history_hf = self.history([self.Symbol("ETHUSD")], timedelta(365), Resolution.Daily)['close'].unstack(0)
a0_arr = np.append(a0_arr, np.mean(historical_train_data))
stockprices = historical_train_data
upperband, middleband, lowerband = ta.BBANDS(np.array(stockprices).ravel()*100000, timeperiod=90, nbdevup=2, nbdevdn=2,matype=0)
upVal = upperband[-1]/100000
middleVal = middleband[-1]/100000
lowVal = lowerband[-1]/100000
a = np.array([upVal, middleVal, lowVal])
a0 = self.closeWindow[ticker_idx][0]
projected_price = a.flat[np.abs(a - a0).argmin()]
projected_price = projected_price[np.logical_not(np.isnan(projected_price))]
if len(day_close_price_arr) >= 14:
real1 = ta.AD(np.array(high_price_arr[ticker_idx][-14:], float).ravel(), np.array(low_price_arr[ticker_idx][-14:], float).ravel(), np.array(day_close_price_arr[ticker_idx][-14:], float).ravel(), np.array(volume_arr[ticker_idx][-14:], float).ravel())
real1 = np.mean(real1[np.logical_not(np.isnan(real1))])
else:
real1 = -1.0
if len(day_close_price_arr) >= 14:
real2 = ta.MFI(high_price_arr[ticker_idx], low_price_arr[ticker_idx], close_price_arr[ticker_idx], volume_arr[ticker_idx], timeperiod=14)
real2 = np.mean(real2[np.logical_not(np.isnan(real2))])
else:
real2 = -1
arr1 = np.append(arr1, self.truncate(a0, 4))
bet_arr = np.append(bet_arr, projected_price)
arr2 = np.append(arr2, self.truncate(np.abs(bet_arr[-1]), 4))
close = arr1[-1]
stopPriceBuy = close * .99
stopPriceSell = close * 1.01
if len(stockprices) > rsi_interval:
_data_ = np.array(stockprices, float)
rsi = ta.RSI(_data_.ravel(), timeperiod=rsi_interval)
rsi = rsi[np.logical_not(np.isnan(rsi))][-1]
else:
return
self.rsi_inter_daily_array.append(rsi)
if len(weekly_stockprices_arr[ticker_idx]) > rsi_interval:
_data__ = np.array(weekly_stockprices_arr[ticker_idx], float)
rsi_intra_daily = ta.RSI(_data__.ravel(), timeperiod=rsi_interval)
rsi_intra_daily = rsi_intra_daily[np.logical_not(np.isnan(rsi_intra_daily))][-1]
else:
rsi_intra_daily = 50
rsi_intra_daily_array[ticker_idx] = np.append(rsi_intra_daily_array[ticker_idx], rsi_intra_daily)
if len(self.monthly_data) > rsi_interval:
_data___ = np.array(self.monthly_data, float)
rsi_intra_monthly = ta.RSI(_data___.ravel(), timeperiod=rsi_interval)
rsi_intra_monthly = rsi_intra_monthly[np.logical_not(np.isnan(rsi_intra_monthly))][-1]
else:
rsi_intra_monthly = 50
# Fractal regime update (uses same close series we use for intra-day RSI)
if len(stockprices) >= int(getattr(self, "fractal_window", 512)):
_fractal_prices = np.array(stockprices[-int(self.fractal_window):], dtype=float)
self._update_fractal_state(
np.array(_fractal_prices)[-self.fractal_window:]
)
real = ta.MOM(day_close_price_arr1, timeperiod=10)
real = np.mean(real[np.logical_not(np.isnan(real))])
self.mom_arr = np.append(self.mom_arr, real)
mom_array[ticker_idx] = np.append(mom_array[ticker_idx], self.mom_arr[-1])
File_data = stockprices[-int(self.fractal_window):]
input = []
ctr = False
for index in range(1, len(File_data) - 1, 1):
if index % 2 == 0:
ctr = True
else:
ctr = False
if ctr == False:
if File_data[index+1] > File_data[index]:
input = np.append(input, +1)
elif File_data[index+1] == File_data[index]:
input = np.append(input, 0)
elif File_data[index+1] < File_data[index]:
input = np.append(input, -1)
else:
if File_data[index+1] > File_data[index-1]:
input = np.append(input, +1)
elif File_data[index+1] == File_data[index-1]:
input = np.append(input, 0)
elif File_data[index+1] < File_data[index-1]:
input = np.append(input, -1)
#self.Debug(str(input))
predictor = input[1::2]
evaluator = input[1::3]
ctr = 0
for index in range(0, len(input), 3):
result_ = zeropoly
for pos in range(index, index+3):
result += self.main_job(input[pos:pos+3])
result_ += self.main_job(input[pos:pos+3])
f = sympy.simplify(result)
f_ = sympy.simplify(result_)
#self.Debug(str(f))
# evaluating the expression {f}
prediction_f = np.sign(f_.evalf(subs={x:predictor[ctr]}))
#self.Debug(str(prediction_f))
if prediction_f == evaluator[ctr]:
#self.Debug("CORRECT!")
ctr_c += 1
else:
#self.Debug("wrong...")
ctr_w += 1
ctr += 1
#self.Debug(str(ctr_c) + "\t" + str(ctr_w))
output = []
ctr = False
for index in range(1439, (1439 - 3), -1):
if index % 2 == 0:
ctr = True
else:
ctr = False
if ctr == False:
if self.closeWindow[ticker_idx][index] < self.closeWindow[ticker_idx][(index - 1)]:
output = np.append(output, +1)
elif self.closeWindow[ticker_idx][index] == self.closeWindow[ticker_idx][(index - 1)]:
output = np.append(output, 0)
elif self.closeWindow[ticker_idx][index] > self.closeWindow[ticker_idx][(index - 1)]:
output = np.append(output, -1)
else:
if self.closeWindow[ticker_idx][index] < self.closeWindow[ticker_idx][(index - 2)]:
output = np.append(output, +1)
elif self.closeWindow[ticker_idx][index] == self.closeWindow[ticker_idx][(index - 2)]:
output = np.append(output, 0)
elif self.closeWindow[ticker_idx][index] > self.closeWindow[ticker_idx][(index - 2)]:
output = np.append(output, -1)
for index in range(0, len(output)):
for pos in range(index, index+3):
result += self.main_job(output[pos:pos+3])
f = sympy.simplify(result)
#self.Debug(str(f))
prediction_f_next1 = np.sign(f.evalf(subs={x:output[0]}))
prediction_f_next2 = np.sign(f_.evalf(subs={x:output[0]}))
prediction_f_next_tot = np.sign(np.mean([prediction_f_next1, prediction_f_next2]))
[exit_flag, exit_stop_loss_flag] = self.obj1.AdjustPortfolioManagement(self, self.symbol, exit_stop_loss_flag, trading_quantum, arr2[-1], close)
if exit_flag == False:
return
if hourly_trade_flag_en == True and exit_stop_loss_flag == False:
## PREDICT PRICE MODE ##
# --- inside OnData after you have rsi, rsi_intra_daily, prediction_f_next_tot/1/2,
# and you already updated fractal regime (self.fractal_state['regime'])
# --- Unified scoring + execution (higher trade frequency, lower drawdown) ---
# 1) Prediction “direction” score in [-1, +1]
pred_score = self._vote_score(prediction_f_next_tot, prediction_f_next1, prediction_f_next2)
pred_score = self._clip(pred_score, -1.0, +1.0)
# Do not starve the strategy: allow weaker signals through
if abs(pred_score) < 0.10:
return
# 2) Smooth RSI biases (contrarian by default)
daily_bias = self.rsi_bias(rsi)
intra_bias = self.rsi_bias(rsi_intra_daily)
# 3) Combine signals into a single score
w_pred, w_rsi_d, w_rsi_i = 0.05, 0.85, 0.10
score = (w_pred * pred_score) + (w_rsi_d * daily_bias) + (w_rsi_i * intra_bias)
score = self._clip(score, -1.0, +1.0)
current_qty = self.Portfolio[self.symbol].Quantity
if current_qty > 0:
if score < -0.15 or rsi > 65:
self.SetHoldings(self.symbol, 0.0)
self.last_trade_time = self.Time
return
elif current_qty < 0:
if score > 0.15 or rsi < 35:
self.SetHoldings(self.symbol, 0.0)
self.last_trade_time = self.Time
return
# 5) Convert score -> target direction + magnitude
deadband = 0.12
if abs(score) < deadband:
target_dir = 0
target_mag = 0.0
else:
target_dir = +1 if score > 0 else -1
# map |score| from [deadband..1] -> [min_size..1]
min_size = 0.20
target_mag = min_size + (1.0 - min_size) * ((abs(score) - deadband) / (1.0 - deadband))
target_mag = self._clip(target_mag, 0.0, 1.0)
# 6) Cooldown / hysteresis
if hasattr(self, "last_trade_time") and (self.Time - self.last_trade_time) < self.min_trade_interval:
current_dir = 0 if current_qty == 0 else (1 if current_qty > 0 else -1)
# Allow going flat anytime
if current_qty != 0 and target_dir == 0:
self.SetHoldings(self.symbol, 0.0)
self.last_trade_time = self.Time
return
# Allow reversal only if signal is strong enough
if current_dir != 0 and target_dir != 0 and current_dir != target_dir and abs(score) >= 0.55:
self._set_fractal_holdings(data, target_dir, target_mag)
self.last_trade_time = self.Time
return
# 7) Execute action
if target_dir == 0:
self.SetHoldings(self.symbol, 0.0)
else:
self._set_fractal_holdings(data, target_dir, target_mag)
self.last_trade_time = self.Time
# self.min_trade_interval = timedelta(minutes=30)
if hasattr(self, "last_trade_time") and (self.Time - self.last_trade_time) < self.min_trade_interval:
# During cooldown: allow exit or strong reversal, block small flips
current_qty = self.Portfolio[self.symbol].Quantity
current_dir = 0 if current_qty == 0 else (1 if current_qty > 0 else -1)
# Always allow going flat
if current_qty != 0 and target_dir == 0:
self.SetHoldings(self.symbol, 0.0)
self.last_trade_time = self.Time
# Allow reversal only if signal is strong enough
elif current_dir != 0 and target_dir != 0 and current_dir != target_dir and abs(score) >= 0.75:
self._set_fractal_holdings(data, target_dir, target_mag)
self.last_trade_time = self.Time
return
# 7) Execute only one final action
if target_dir == 0:
self.SetHoldings(self.symbol, 0.0)
else:
self._set_fractal_holdings(data, target_dir, target_mag)
self.last_trade_time = self.Time#
# Attribution-NonCommercial-NoDerivatives 4.0 International (CC BY-NC-ND 4.0)
# https://creativecommons.org/licenses/by-nc-nd/4.0/
# System.py -- Main API core.
#
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.5.0.0.10862. Copyright 2014 QuantConnect Corporation.
#
########################################################
# Developed by Anastasios N. Bikos. ©DeepConscius All Rights Reserved.
# # mpikos@live.com
# # Copyright ~ ~ ~ 04/04/2026
# # Version ~ ~ ~ 4.0
# # ~ ~ ~
########################################################
# region imports
from AlgorithmImports import *
# endregion
# Your New Python File
def SetCustomHoldings(self, ticker, target_stregth):
if self.Portfolio.MarginRemaining < 50:
return
if self.Transactions.GetOpenOrders(ticker):
return
# place orders here
SafeMarketOrder(self, ticker, target_stregth, tag="rebalance")
def SafeMarketOrder(self, symbol, target_percent, tag="rebalance"):
# 1) avoid stacking orders (keep your edited guard style)
if self.Transactions.GetOpenOrders(symbol):
return None
sec = self.Securities[symbol]
if not sec.HasData or sec.Price <= 0:
return None
# 2) (optional) don’t trade during warmup
if self.IsWarmingUp:
return None
# 3) hard cap the target exposure (adjust to taste)
# for cash-like crypto, keep <= 1.0; for margin, you can allow >1 but be careful
max_abs_target = 1.0
target_percent = max(-max_abs_target, min(max_abs_target, float(target_percent)))
# 4) get the *order quantity* that matches the target % and respects buying power
qty = self.CalculateOrderQuantity(symbol, target_percent) # QC helper
if qty is None or abs(qty) < 1e-8:
return None
# 5) place the order (delta quantity)
return self.MarketOrder(symbol, qty, tag=tag)#
# Attribution-NonCommercial-NoDerivatives 4.0 International (CC BY-NC-ND 4.0)
# https://creativecommons.org/licenses/by-nc-nd/4.0/
# System.py -- Main API core.
#
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.5.0.0.10862. Copyright 2014 QuantConnect Corporation.
#
########################################################
# Developed by Anastasios N. Bikos. ©DeepConscius All Rights Reserved.
# # mpikos@live.com
# # Copyright ~ ~ ~ 04/04/2026
# # Version ~ ~ ~ 4.0
# # ~ ~ ~
########################################################
# region imports
from AlgorithmImports import *
# endregion
class RiskManager:
def __init__(self, algorithm):
self.algorithm = algorithm
def update_positions(self, data):
pass
def check_drawdown_limits(self):
return True, "OK"
def check_rate_limits(self):
return True, "OK"
def check_portfolio_limits(self, size_usd):
return True, "OK"
def calculate_position_size(self, gross_profit, price):
return gross_profit * 1000
def create_position_tracker(self, symbol, qty, price):
pass
def record_trade(self, vector_id, profit):
pass