| Overall Statistics |
|
Total Orders 388 Average Win 1.87% Average Loss -1.74% Compounding Annual Return 32.338% Drawdown 22.200% Expectancy 0.516 Start Equity 10000 End Equity 53799.83 Net Profit 437.998% Sharpe Ratio 1.13 Sortino Ratio 0.847 Probabilistic Sharpe Ratio 70.573% Loss Rate 27% Win Rate 73% Profit-Loss Ratio 1.07 Alpha 0.139 Beta 0.487 Annual Standard Deviation 0.178 Annual Variance 0.032 Information Ratio 0.402 Tracking Error 0.181 Treynor Ratio 0.413 Total Fees $259.92 Estimated Strategy Capacity $49000000.00 Lowest Capacity Asset MGC Z0A8S1BLJYXP Portfolio Turnover 19.29% Drawdown Recovery 197 |
from AlgorithmImports import *
import pandas_ta as ta
qb = QuantBook()
import matplotlib.pyplot as plt
plt.style.use("seaborn-v0_8")
"""
qb.add_crypto = BTCUSD
qb.add_forex = EURUSD
qb.add_equity = "XLK", "TLT", "SPY", "QQQ", "IWL", "GLD", "TLT", "VNQ"
Example:
symbol = qb.add_equity("QQQ").symbol
start_time = datetime(2020, 1, 1)
end_time = datetime(2025, 1, 1)
leverage = 1.0
mode = 1
stoploss_atr = None
takeprofit_atr = None
entry_condition = lambda df: (df['ibs'] < 0.1) & (df['rsi(14)'] < 55)
exit_condition = lambda df: df['close'] > df['high'].shift(1)
"""
""" Strategies:
Panic selling: (QQQ, SPY, XLRE, GLD, XLK,XLF,XLC,XLP, IEUR, STXE, DAX, XLV)
entry_condition = lambda df: (df['ibs'] < 0.1) & ((df['close'] > df['sma(20)']) | (df['close'] > df['sma(200)']))
exit_condition = lambda df: df['close'] > df['high'].shift(1)
IBS Strategy: (QQQ, SPY)
entry_condition = lambda df: df['ibs'] < 0.1
exit_condition = lambda df: df['ibs'] > 0.8
MACD Strategy: (QQQ,SPY,NQ)
entry_condition = lambda df: (df['macd_hist'] < df['macd_hist'].shift(1)) & (df['macd_hist'].shift(1) < df['macd_hist'].shift(2)) & (df['macd_hist'].shift(2) < df['macd_hist'].shift(3)) & (df['macd_hist'] < 0) & (df['close'] < df['close'].shift(1))
exit_condition = lambda df: df['close'] > df['high'].shift(1)
ADX strategy: (QQQ,SPY,GLD, DAX, IEUR)
entry_condition = lambda df: (df["adx(5)"] > 40) & (df['close'] < df['high'].shift(1)) & ((df['close'] > df['sma(20)']) | (df['close'] > df['sma(200)']))
exit_condition = lambda df: df['close'] > df['high'].shift(1)
Volatility Edges: (QQQ, STXE)
entry_condition = lambda df: (df['ibs'] < 0.6) & (df['close'] < (df["tp(10)"] - df['atr(10)'] * 1.5)) & (df["adx(10)"] > 20)
exit_condition = lambda df: df['close'] > df['high'].shift(1)
Turn Around Tuesday: (QQQ, SPY, XLRE, GLD, XLK, XLF, XLC, XLP, IEUR, STXE, DAX)
entry_condition = lambda df: (df['close'] < df['close'].shift(1)) & (df['close'].shift(1) < df['close'].shift(2)) & (df.index.weekday.isin([0]))
exit_condition = lambda df: df['close'] > df['high'].shift(1)
24H strategy: (SPY, IEUR)
entry_condition = lambda df: (df['close'] < df['close'].shift(1)) & (df['close'].shift(1) < df['close'].shift(2)) & (df.index.weekday.isin([0, 1]))
exit_condition = lambda df: df['entry'].shift(1) == 1  # used together with max_holding_days = 1
FUTURES:
symbol = qb.add_future("NQ").symbol
Combinations:
ADX + Panic selling
entry_condition = lambda df: ((df["adx(5)"] > 40) & (df['close'] < df['high'].shift(1)) & ((df['close'] > df['sma(20)']) | (df['close'] > df['sma(200)'])) | (df['ibs'] < 0.1) & ((df['close'] > df['sma(20)']) | (df['close'] > df['sma(200)'])))
"""
######################################################################################
# --- Backtest configuration -------------------------------------------------
# Instrument and date range of the research backtest.
symbol = qb.add_future("MNQ").symbol
start_time = datetime(2020, 1, 1)
end_time = datetime(2025, 1, 1)

# Optional risk controls; ``None`` disables the respective rule.
sl_thresh = None         # stop-loss as a return relative to entry price (compared with <=)
max_holding_days = None  # maximum number of bars a trade may stay open
cooldown_days = None     # bars to stay flat after a stop-loss exit


def entry_condition(df):
    """Return a boolean Series that is True on bars where a long entry is allowed.

    Requires IBS below 0.6, a close more than 1.5 * ATR(10) below the
    10-bar average typical price, and ADX(10) above 20.
    """
    weak_close = df['ibs'] < 0.6
    stretched_below_mean = df['close'] < (df["tp(10)"] - df['atr(10)'] * 1.5)
    trending = df["adx(10)"] > 20
    return weak_close & stretched_below_mean & trending


def exit_condition(df):
    """Return a boolean Series that is True when the close exceeds the prior bar's high."""
    return df['close'] > df['high'].shift(1)
######################################################################################
# Download daily history from QuantBook and index it by bar time only.
data = qb.history(symbol, start_time, end_time, Resolution.DAILY)
data = data.reset_index()
data.set_index('time', inplace=True)

# Daily log return; log returns add up, which the trade accounting relies on.
data["return"] = np.log(data["close"] / data["close"].shift(1))
data['entry'] = 0
data['exit'] = 0
data['position'] = 0
in_position = False

# --- Indicators --------------------------------------------------------------
# Internal Bar Strength. A bar with high == low has no range; use the neutral
# value 0.5 (the same convention as IndicatorManager._calculate_ibs) instead of
# producing NaN, which would make dropna() below silently discard the bar.
bar_range = data["high"] - data["low"]
data["ibs"] = ((data["close"] - data["low"]) / bar_range).where(bar_range != 0, 0.5)

data['rsi(14)'] = ta.rsi(data['close'], length=14, min_periods=1)
data['adx(10)'] = ta.adx(high=data['high'], low=data['low'], close=data['close'], length=10, min_periods=1)['ADX_10']
data['adx(5)'] = ta.adx(high=data['high'], low=data['low'], close=data['close'], length=5, min_periods=1)['ADX_5']
data['atr(10)'] = ta.atr(high=data['high'], low=data['low'], close=data['close'], length=10, min_periods=1)

# Typical price and its 10-bar simple average.
data["tp"] = (data["low"] + data["close"] + data["high"]) / 3
data["tp(10)"] = data["tp"].rolling(10).mean()

data["rsi(2)"] = ta.rsi(data['close'], length=2, min_periods=1)
data['sma(20)'] = ta.sma(data['close'], length=20, min_periods=1)
data['sma(3)'] = ta.sma(data['close'], length=3, min_periods=1)
data['sma(30)'] = ta.sma(data['close'], length=30, min_periods=1)
data['sma(200)'] = ta.sma(data['close'], length=200, min_periods=1)

macd = ta.macd(data['close'], fast=12, slow=26, signal=9)
data['macd_line'] = macd['MACD_12_26_9']     # main MACD line
data['macd_signal'] = macd['MACDs_12_26_9']  # signal line
data['macd_hist'] = macd['MACDh_12_26_9']    # histogram

data["W%R"] = ta.willr(data['high'], data['low'], data['close'], length=14, min_periods=1)

# Drop every row that still has a NaN in any column (indicator warm-up rows).
data = data.dropna()

# Buy-and-hold benchmark: cumulative product of returns over the remaining rows.
data["cumulative_return"] = data["return"].cumsum().apply(np.exp)
# === Position simulation and collection of every closed trade ===============
# Signals are evaluated once, vectorised; entries and exits both happen on the
# close of the bar whose signal fired.
data['entry_signal'] = entry_condition(data).astype(bool)
data['exit_signal'] = exit_condition(data).astype(bool)

n_bars = len(data)
entry_signals = data['entry_signal'].to_numpy()
exit_signals = data['exit_signal'].to_numpy()
closes = data['close'].to_numpy()
log_returns = data['return'].to_numpy()

# Positional arrays avoid per-row ``data.loc`` writes, which are slow and
# would touch several rows at once if the time index ever held duplicates.
position = np.zeros(n_bars, dtype=int)     # 1 while a trade is held at the bar's close
entry_flags = np.zeros(n_bars, dtype=int)  # 1 on the bar where a trade is opened
exit_flags = np.zeros(n_bars, dtype=int)   # 1 on the bar where a trade is closed

trades = []          # simple return of every closed trade, whatever the exit reason
in_trade = False
entry_i = None
entry_price = np.nan
last_sl_i = None     # bar index of the most recent stop-loss exit

for i in range(n_bars):
    if not in_trade:
        # After a stop-loss, stay flat for ``cooldown_days`` bars.
        cooling_down = (
            cooldown_days is not None
            and last_sl_i is not None
            and i - last_sl_i <= cooldown_days
        )
        if entry_signals[i] and not cooling_down:
            position[i] = 1
            entry_flags[i] = 1
            in_trade = True
            entry_i = i
            entry_price = closes[i]  # filled at the close
        continue

    # Stop-loss is checked on the close; ``sl_thresh`` is compared with <=,
    # so it is expected to be a negative fraction such as -0.05.
    sl_exit = sl_thresh is not None and closes[i] / entry_price - 1 <= sl_thresh
    # Time stop: the entry bar counts as day 0 of the holding period.
    time_exit = max_holding_days is not None and i - entry_i >= max_holding_days

    if exit_signals[i] or sl_exit or time_exit:
        exit_flags[i] = 1  # position stays 0 on the exit bar
        # Entry at close of entry_i -> first earned return is bar entry_i + 1;
        # exit at close of i -> last earned return is bar i.
        trade_log_return = log_returns[entry_i + 1:i + 1].sum()
        trades.append(np.exp(trade_log_return) - 1)
        in_trade = False
        if sl_exit:
            last_sl_i = i
    else:
        position[i] = 1

trades = np.array(trades)  # array form for the statistics computed later

data['position'] = position
# Mark the actual entry/exit bars (previously only the very last bar of the
# data set was flagged as an entry, by a statement left over after the loop).
data['entry'] = entry_flags
data['exit'] = exit_flags

# Position that was open while the return of bar t accrued (held from t-1 to t).
data['position_at_open'] = data['position'].shift(1).fillna(0)
data["strategy_return"] = data["return"] * data["position_at_open"]
data["strategy_cumulative_return"] = data["strategy_return"].cumsum().apply(np.exp)
# ... (grafy zůstávají stejné)
# === Metrics (safe when there are no trades, no winners or no losers) =======
# NOTE: ``time`` is the number of bars here; the name shadows ``datetime.time``
# if that was star-imported, so do not call ``time(...)`` after this point.
time = len(data.index)
time_years = time / 252
count = (data['position'] == 1).sum()  # bars with an open position at the close
time_in_the_market = count / time if time > 0 else 0
total_trades = len(trades)  # includes stop-loss and time-stop exits
total_return = data["strategy_cumulative_return"].iloc[-1]

# Drawdown relative to the running equity peak.
running_peak = data["strategy_cumulative_return"].cummax()
data['drawdown'] = (data["strategy_cumulative_return"] - running_peak) / running_peak

# Drawdown duration: longest span between two consecutive equity highs; the
# final span runs to the last bar of the data.
drawdown_line = data['drawdown'].copy()
begin_drawdown = drawdown_line[drawdown_line == 0].index
end_drawdown = begin_drawdown[1:]
end_drawdown = end_drawdown.append(pd.DatetimeIndex([drawdown_line.index[-1]]))
periods = end_drawdown - begin_drawdown
max_drawdown_duration = periods.max().days if len(periods) > 0 else 0
max_drawdown = data['drawdown'].min()

cagr = total_return ** (1 / time_years) - 1 if time_years > 0 else 0
cagr_risk = cagr / time_in_the_market if time_in_the_market > 0 else 0
benchmark_total_return = data["cumulative_return"].iloc[-1]
benchmark_cagr = benchmark_total_return ** (1 / time_years) - 1 if time_years > 0 else 0
calmar_ratio = cagr / abs(max_drawdown) if max_drawdown != 0 else np.nan

# === Per-trade metrics =======================================================
if len(trades) > 0:
    winning_trades = trades[trades > 0]
    losing_trades = trades[trades < 0]

    gross_profit = winning_trades.sum()
    gross_loss = abs(losing_trades.sum())
    profit_factor = gross_profit / gross_loss if gross_loss != 0 else np.inf

    # The mean of an empty slice is NaN and emits a RuntimeWarning, so treat
    # "no winners" / "no losers" as an average of 0 instead.
    avg_win = winning_trades.mean() if winning_trades.size else 0.0
    avg_loss = abs(losing_trades.mean()) if losing_trades.size else 0.0
    payoff_ratio = avg_win / avg_loss if avg_loss != 0 else np.inf

    win_rate = (trades > 0).mean()
    # Kelly fraction W - (1 - W) / R; with no winning trades (R == 0) there
    # is no edge, so the fraction is 0 rather than a division by zero.
    if payoff_ratio > 0:
        kelly_fraction = win_rate - ((1 - win_rate) / payoff_ratio)
    else:
        kelly_fraction = 0.0
    kelly_fraction = float(np.clip(kelly_fraction, 0.0, 1.0))
else:
    profit_factor = payoff_ratio = avg_win = avg_loss = win_rate = kelly_fraction = np.nan

average_time_to_trade = count / total_trades if total_trades > 0 else 0

# === Sortino and Sharpe ======================================================
TRW = 0  # annual target / risk-free return used as the hurdle
excess_returns = data["strategy_return"] - (TRW / 252)
downside_returns = np.where(excess_returns < 0, excess_returns, 0)
downside_dev = np.sqrt(np.mean(downside_returns ** 2))
annual_return = data["strategy_return"].mean() * 252
annual_downside_dev = downside_dev * np.sqrt(252)
sortino_ratio = (annual_return - TRW) / annual_downside_dev if annual_downside_dev != 0 else np.nan
annual_std = data["strategy_return"].std() * np.sqrt(252)
risk_reward = (annual_return - TRW) / annual_std if annual_std != 0 else np.nan
# === Report: formatting that never fails on NaN / inf ========================
def _fmt_or_na(value, spec):
    """Format ``value`` with ``spec`` when it is finite, otherwise return "N/A"."""
    return format(value, spec) if np.isfinite(value) else "N/A"


payoff_str = _fmt_or_na(payoff_ratio, ".2f")
if np.isfinite(profit_factor):
    profit_factor_str = f"{profit_factor:.2f}"
elif profit_factor == np.inf:
    profit_factor_str = "Inf"
else:
    profit_factor_str = "N/A"
win_rate_str = _fmt_or_na(win_rate, ".2%")
calmar_str = _fmt_or_na(calmar_ratio, ".2f")
kelly_str = _fmt_or_na(kelly_fraction, ".2f")
sortino_str = _fmt_or_na(sortino_ratio, ".2f")
sharpe_str = _fmt_or_na(risk_reward, ".2f")

average_time_to_trade_str = "N/A" if total_trades <= 0 else f"{average_time_to_trade:.1f}"
max_drawdown_duration_str = "0" if not max_drawdown_duration > 0 else f"{max_drawdown_duration:.0f}"

# Labels for the optional risk controls (reused in the chart title).
sl_str = "None" if sl_thresh is None else f"{sl_thresh:.1%}"
maxhold_str = "None" if max_holding_days is None else f"{max_holding_days}d"
cooldown_str = "None" if cooldown_days is None else f"{cooldown_days}d"

report_lines = (
    f"[Strategy time in the market: {time_in_the_market:.2%}]",
    f"[Total Trades: {total_trades}] [Average duration of trade: {average_time_to_trade_str} Days]",
    f"[Payoff Ratio: {payoff_str}] [Profit Factor: {profit_factor_str}] [Win Rate: {win_rate_str}]",
    f"[CAGR of strategy: {cagr:.2%}] [Risk-adjusted CAGR: {cagr_risk:.2%}] [Benchmark CAGR: {benchmark_cagr:.2%}]",
    f"[Max Drawdown: {max_drawdown:.2%}] [Calmar Ratio: {calmar_str}] [Max DD Duration: {max_drawdown_duration_str} days]",
    f"[Kelly Criterion: {kelly_str}] [Sortino Ratio: {sortino_str}] [Sharpe Ratio: {sharpe_str}]",
)
for report_line in report_lines:
    print(report_line)
# --- Equity curves: benchmark (buy & hold) versus strategy -------------------
plt.figure(figsize=(24, 12))
plt.plot(data.index, data['cumulative_return'], label='Cumulative Benchmark Return', linewidth=2)
plt.plot(data.index, data['strategy_cumulative_return'], label='Cumulative Strategy Return', linewidth=2)
# The title is derived from the configured symbol; it used to be hard-coded to
# "QQQ Panic Selling" and therefore mislabelled every other backtest.
plt.title(f"Backtest | {symbol.value} | SL {sl_str} | MaxHold {maxhold_str} | Cool {cooldown_str}")
plt.xlabel('Time')
plt.ylabel('Cumulative Return')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

# --- Strategy drawdown --------------------------------------------------------
plt.figure(figsize=(24, 4))
plt.plot(data.index, data['drawdown'], label='Drawdown', color='red', linewidth=2)
plt.title("Drawdown")
plt.xlabel('Time')
plt.ylabel('Drawdown')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
# region imports
from AlgorithmImports import *
# endregion
class ContractFiltration:
    """Filter futures chains and track one active contract per future.

    For every future added through :meth:`add_futures_contracts` the class
    keeps the currently selected contract in ``active_contracts`` and mirrors
    it into an attribute on the algorithm (named after ``variable_mapping``).
    """

    # Expiry window (days from the current slice date) used when a new active
    # contract is selected in update_active_contracts.
    MIN_DAYS_TO_EXPIRY = 60
    MAX_DAYS_TO_EXPIRY = 120

    def __init__(self, algorithm):
        """Create the filter.

        Args:
            algorithm: QCAlgorithm instance that owns the subscriptions.
        """
        self.algorithm = algorithm
        self.futures = {}            # canonical symbol -> future object
        self.active_contracts = {}   # canonical symbol -> active contract symbol (or None)
        self.variable_mapping = {}   # canonical symbol -> attribute name on the algorithm

    def add_futures_contracts(self, future_list, resolution=Resolution.MINUTE, variable_names=None):
        """Subscribe the given futures and register them for contract tracking.

        Args:
            future_list: Future tickers (e.g. [Futures.Indices.NASDAQ_100_E_MINI]).
            resolution: Data resolution of the subscriptions.
            variable_names: Optional attribute names, positionally matched to
                ``future_list``. Where missing, the ticker of the canonical
                symbol is used.
        """
        for i, future_ticker in enumerate(future_list):
            future = self.algorithm.add_future(future_ticker, resolution)
            future.set_filter(self._contract_filter)

            canonical = future.symbol
            self.futures[canonical] = future
            self.active_contracts[canonical] = None

            if variable_names and i < len(variable_names):
                var_name = variable_names[i]
            else:
                var_name = self._extract_ticker_name(canonical)

            # The attribute exists from the start, holding None until a
            # contract has been selected.
            self.variable_mapping[canonical] = var_name
            setattr(self.algorithm, var_name, None)

            if not self.algorithm.is_warming_up:
                self.algorithm.debug(f"Přidán futures: {canonical} -> self.{var_name}")

    def _extract_ticker_name(self, canonical_symbol):
        """Return the stripped, upper-cased ``value`` of the canonical symbol.

        Args:
            canonical_symbol: Canonical future symbol.

        Returns:
            Ticker string (presumably e.g. 'NQ', 'MGC', 'ES' - confirm the
            exact format returned by LEAN for canonical futures).
        """
        return str(canonical_symbol.value).strip().upper()

    def _contract_filter(self, future_filter_universe):
        """Universe filter: keep contracts expiring within 1 to 120 days.

        This window is wider than the 60-120 day window from which a new
        active contract is chosen in :meth:`update_active_contracts`.
        NOTE(review): presumably so that a contract that is still held stays
        subscribed once it has fewer than 60 days left - confirm.

        Args:
            future_filter_universe: Universe of contracts of one future.

        Returns:
            The filtered universe.
        """
        return future_filter_universe.expiration(1, self.MAX_DAYS_TO_EXPIRY)

    def get_active_contract(self, canonical_symbol):
        """Return the active contract symbol for ``canonical_symbol`` or None."""
        return self.active_contracts.get(canonical_symbol)

    def _publish_contract(self, canonical_symbol, contract_symbol):
        """Store the active contract and mirror it into the algorithm attribute.

        Returns:
            The attribute name that was updated, or None when the canonical
            symbol has no mapping.
        """
        self.active_contracts[canonical_symbol] = contract_symbol
        var_name = self.variable_mapping.get(canonical_symbol)
        if var_name is not None:
            setattr(self.algorithm, var_name, contract_symbol)
        return var_name

    def _find_invested_contract(self, canonical_symbol):
        """Return the first invested future holding matching the ticker, else None.

        NOTE(review): matching is a substring test of the ticker against
        ``str(symbol)``, so a ticker contained in another one (e.g. "ES" in
        "MES") would also match - verify against the traded universe.
        """
        ticker = self._extract_ticker_name(canonical_symbol)
        for symbol in self.algorithm.portfolio.keys():
            if (symbol.security_type == SecurityType.FUTURE
                    and ticker in str(symbol)
                    and self.algorithm.portfolio[symbol].invested):
                return symbol
        return None

    def update_active_contracts(self, data):
        """Refresh the active contract of every tracked future from a slice.

        While a position is open on the active contract the contract is NOT
        changed; it is only re-selected after the position has been closed,
        so that a trade is never split across two contracts.

        Args:
            data: Slice with the current data.
        """
        for canonical_symbol in self.futures:
            current_active = self.active_contracts[canonical_symbol]

            if current_active is None:
                # Nothing tracked yet (e.g. after a restart): adopt an already
                # open position. The algorithm attribute is updated as well,
                # so it agrees with active_contracts exactly as in the other
                # selection paths.
                current_active = self._find_invested_contract(canonical_symbol)
                if current_active is not None:
                    self._publish_contract(canonical_symbol, current_active)
                    self.algorithm.debug(f"RESTART: Synchronizován s pozicí {current_active}")

            if current_active is not None and self.algorithm.portfolio[current_active].invested:
                continue  # never roll while a position is open

            if not data.future_chains.contains_key(canonical_symbol):
                # No chain in this slice - try the subscribed securities.
                self._fallback_update_contract(canonical_symbol)
                continue

            chain = data.future_chains[canonical_symbol]
            if len(chain) == 0:
                continue

            today = data.time.date()
            valid_contracts = [
                c for c in chain
                if self.MIN_DAYS_TO_EXPIRY <= (c.expiry.date() - today).days <= self.MAX_DAYS_TO_EXPIRY
            ]
            if not valid_contracts:
                continue

            # Nearest expiry inside the window.
            nearest_contract = min(valid_contracts, key=lambda c: c.expiry)
            old_contract = self.active_contracts[canonical_symbol]
            if old_contract != nearest_contract.symbol:
                var_name = self._publish_contract(canonical_symbol, nearest_contract.symbol)
                if not self.algorithm.is_warming_up:
                    if var_name is not None:
                        self.algorithm.debug(f"Aktualizován self.{var_name} = {nearest_contract.symbol}")
                    self.algorithm.debug(f"Aktualizován aktivní kontrakt pro {canonical_symbol} Starý: {old_contract} Nový: {nearest_contract.symbol} Expirace: {nearest_contract.expiry}")
                    self.algorithm.debug(50 * "-")

    def _fallback_update_contract(self, canonical_symbol):
        """Pick an active contract from ``algorithm.securities`` when no chain is available.

        Does nothing when an active contract is already known.

        Args:
            canonical_symbol: Canonical future symbol.
        """
        if self.active_contracts[canonical_symbol] is not None:
            return

        ticker = self._extract_ticker_name(canonical_symbol)

        # (symbol, expiry-or-None) for every future security matching the ticker.
        matching_contracts = []
        for symbol in self.algorithm.securities.keys():
            if symbol.security_type == SecurityType.FUTURE and ticker in str(symbol):
                expiry = symbol.id.date if hasattr(symbol.id, 'date') else None
                matching_contracts.append((symbol, expiry))

        if not matching_contracts:
            return

        dated_contracts = [c for c in matching_contracts if c[1] is not None]
        if dated_contracts:
            # Earliest date first.
            selected = min(dated_contracts, key=lambda c: c[1])[0]
        else:
            # No date information at all: take the first match.
            selected = matching_contracts[0][0]

        var_name = self._publish_contract(canonical_symbol, selected)
        if not self.algorithm.is_warming_up:
            if var_name is not None:
                self.algorithm.debug(f"Fallback: Nastaven self.{var_name} = {selected}")
            self.algorithm.debug(f"Fallback: Nalezen kontrakt pro {canonical_symbol}: {selected}")

    def get_all_active_contracts(self):
        """Return ``{canonical_symbol: contract_symbol}`` for futures that have a contract."""
        return {k: v for k, v in self.active_contracts.items() if v is not None}

    def is_contract_active(self, symbol):
        """Return True when ``symbol`` is the active contract of any tracked future.

        ``None`` is never active, even while some futures have no contract yet
        (their placeholder value in ``active_contracts`` is None).
        """
        return symbol is not None and symbol in self.active_contracts.values()
# region imports
from AlgorithmImports import *
# endregion
class DataHandler:
    """Store daily history and build a running intraday "daily" bar per symbol."""

    # Upper bound on stored history bars per symbol once live bars are appended.
    MAX_HISTORY_BARS = 250

    def __init__(self, algorithm):
        """Create the handler.

        Args:
            algorithm: QCAlgorithm instance.
        """
        self.algorithm = algorithm
        self._daily_bars = {}           # symbol -> current (running) daily bar
        self._history_data = {}         # symbol -> list of completed daily bars, oldest first
        self._minute_consolidators = {} # symbol -> registered minute consolidator
        self._intraday_bars = {}        # symbol -> intraday bar being built
        self._last_bar_date = {}        # symbol -> date of the bar being built

    def subscribe_daily_data(self, canonical_symbol, history_days=199):
        """Load history for the canonical symbol and start building intraday bars.

        Args:
            canonical_symbol: Canonical (continuous) symbol to track.
            history_days: Number of daily history bars to request.
        """
        self._daily_bars[canonical_symbol] = None
        self._history_data[canonical_symbol] = []
        self._load_history(canonical_symbol, history_days)

        self._intraday_bars[canonical_symbol] = None
        self._last_bar_date[canonical_symbol] = None

        # One-minute bars of the canonical symbol feed the intraday daily bar.
        consolidator = TradeBarConsolidator(timedelta(minutes=1))
        consolidator.data_consolidated += lambda sender, bar: self._on_minute_bar_updated(canonical_symbol, bar)
        self.algorithm.subscription_manager.add_consolidator(canonical_symbol, consolidator)
        self._minute_consolidators[canonical_symbol] = consolidator

        if not self.algorithm.is_warming_up:
            self.algorithm.debug(f"Data handler: Subscribed minute data pro canonical symbol {canonical_symbol}")

    def _load_history(self, canonical_symbol, days):
        """Request ``days`` daily bars for the canonical symbol and store them.

        Best effort: any error is logged and leaves an empty history.

        Args:
            canonical_symbol: Canonical (continuous) symbol.
            days: Number of daily bars to request.
        """
        try:
            history = self.algorithm.history[TradeBar](canonical_symbol, days, Resolution.DAILY)
            # Materialise first so an empty result is recognised as "no data".
            bars = list(history) if history is not None else []
            self._history_data[canonical_symbol] = bars

            if not bars:
                if not self.algorithm.is_warming_up:
                    self.algorithm.debug(f"Data handler: Žádná historická data pro {canonical_symbol}")
                return

            if not self.algorithm.is_warming_up:
                self.algorithm.debug(f"Data handler: Načteno {len(self._history_data[canonical_symbol])} historických barů pro canonical {canonical_symbol}")
                last_bar = bars[-1]
                self.algorithm.debug(f" Poslední bar: Close={last_bar.close:.2f}, High={last_bar.high:.2f}, Low={last_bar.low:.2f}")
        except Exception as e:
            self.algorithm.debug(f"Data handler: Chyba při načítání historie pro {canonical_symbol}: {str(e)}")
            self._history_data[canonical_symbol] = []

    def _on_minute_bar_updated(self, canonical_symbol, minute_bar):
        """Fold a one-minute bar into the running intraday daily bar.

        Only bars whose ``time`` lies between 09:31 and 16:00 (inclusive) are
        used. The first accepted bar of a new date pushes the previous
        intraday bar into the history and starts a new one.

        Args:
            canonical_symbol: Canonical (continuous) symbol.
            minute_bar: Consolidated one-minute bar.
        """
        current_time = minute_bar.time.time()
        current_date = minute_bar.time.date()

        market_open = time(9, 31)
        market_close = time(16, 0)
        if current_time < market_open or current_time > market_close:
            return  # outside the tracked session

        if self._last_bar_date.get(canonical_symbol) != current_date:
            # New date: archive the finished bar, if there is one.
            yesterday_bar = self._intraday_bars.get(canonical_symbol)
            if yesterday_bar is not None:
                if not self.algorithm.is_warming_up:
                    self.algorithm.debug(f" Saving Yesterday BAR: Close={yesterday_bar.close:.2f}, High={yesterday_bar.high:.2f}, Low={yesterday_bar.low:.2f}")
                    self.algorithm.debug(50 * "-")

                history = self._history_data[canonical_symbol]
                history.append(yesterday_bar)
                # One bar is added at a time, so dropping the oldest one is
                # enough to keep a history that reached the cap from growing.
                if len(history) > self.MAX_HISTORY_BARS:
                    history.pop(0)

            # Start the new intraday bar from this minute bar.
            self._intraday_bars[canonical_symbol] = TradeBar(
                minute_bar.time,
                canonical_symbol,
                minute_bar.open,
                minute_bar.high,
                minute_bar.low,
                minute_bar.close,
                minute_bar.volume
            )
            self._last_bar_date[canonical_symbol] = current_date

            if not self.algorithm.is_warming_up:
                self.algorithm.debug(f"Data handler: Vytvořen nový intra-day bar pro {canonical_symbol}")
        else:
            # Same date: extend high/low, move close, accumulate volume.
            intraday_bar = self._intraday_bars.get(canonical_symbol)
            if intraday_bar is not None:
                if minute_bar.high > intraday_bar.high:
                    intraday_bar.high = minute_bar.high
                if minute_bar.low < intraday_bar.low:
                    intraday_bar.low = minute_bar.low
                intraday_bar.close = minute_bar.close
                intraday_bar.volume += minute_bar.volume
                self._intraday_bars[canonical_symbol] = intraday_bar

        # Expose the running bar as the "current daily bar".
        self._daily_bars[canonical_symbol] = self._intraday_bars[canonical_symbol]

    def get_yesterday_bar(self, canonical_symbol):
        """Return the most recent completed bar from the history, or None.

        Args:
            canonical_symbol: Symbol to look up.
        """
        history = self._history_data.get(canonical_symbol, [])
        if len(history) >= 1:
            yesterday_bar = history[-1]
            if not self.algorithm.is_warming_up:
                self.algorithm.debug(f"Data handler: Yesterday bar - High: {yesterday_bar.high:.2f}, Close: {yesterday_bar.close:.2f}")
            return yesterday_bar

        if not self.algorithm.is_warming_up:
            self.algorithm.debug(f"Data handler: Nedostatek dat pro yesterday bar. Dostupných barů: {len(history)}")
        return None

    def get_yesterday_close(self, canonical_symbol):
        """Return the close of the most recent completed bar, or None."""
        yesterday_bar = self.get_yesterday_bar(canonical_symbol)
        # Explicit None check: do not depend on the truthiness of a bar object.
        if yesterday_bar is not None:
            return yesterday_bar.close
        return None

    def get_day_before_yesterday_close(self, canonical_symbol):
        """Return the close of the second most recent completed bar, or None."""
        history = self._history_data.get(canonical_symbol, [])
        if len(history) >= 2:
            day_before_yesterday_bar = history[-2]
            if not self.algorithm.is_warming_up:
                self.algorithm.debug(f"Data handler: Day before yesterday close: {day_before_yesterday_bar.close:.2f}")
            return day_before_yesterday_bar.close

        if not self.algorithm.is_warming_up:
            self.algorithm.debug(f"Data handler: Nedostatek dat pro day before yesterday. Dostupných barů: {len(history)}")
        return None

    def get_current_daily_bar(self, canonical_symbol):
        """Return the running intraday daily bar, or None when not available."""
        return self._daily_bars.get(canonical_symbol)

    def get_history_data(self, canonical_symbol, bars=199):
        """Return a new list with at most the last ``bars`` completed bars.

        Args:
            canonical_symbol: Symbol to look up.
            bars: Maximum number of bars; zero or negative yields an empty list
                (a plain ``history[-0:]`` would return everything).

        Returns:
            New list of bars, oldest first; never the internal list itself.
        """
        if bars <= 0:
            return []
        history = self._history_data.get(canonical_symbol, [])
        return history[-bars:]

    def get_all_daily_data(self, canonical_symbol, *, include_current=False):
        """Return the stored completed daily bars, optionally plus the running bar.

        By default only the history is returned (the internal list, as
        before), i.e. WITHOUT the running intraday bar.
        NOTE(review): the previous docstring promised "history + current bar",
        and the default of 199 history bars suggests 199 + 1 = 200 bars was
        intended for a 200-bar average - confirm whether callers should pass
        ``include_current=True``.

        Args:
            canonical_symbol: Symbol to look up.
            include_current: When True, return a new list with the running
                bar appended (if one exists).
        """
        history = self._history_data.get(canonical_symbol, [])
        if not include_current:
            return history

        current_bar = self._daily_bars.get(canonical_symbol)
        combined = list(history)
        if current_bar is not None:
            combined.append(current_bar)
        return combined
# region imports
from AlgorithmImports import *
# endregion
class IndicatorManager:
"""Třída pro správu indikátorů."""
def __init__(self, algorithm, data_handler):
"""Inicializace indicator manageru.
Args:
algorithm: Instance QCAlgorithm
data_handler: Instance DataHandler pro přístup k datům
"""
self.algorithm = algorithm
self.data_handler = data_handler
self._indicators = {} # Dictionary pro všechny indikátory
def _calculate_sma(self, data, period, price_type='close'):
if len(data) < period:
return None
last_bars = data[-period:]
if price_type == 'close':
values = [bar.close for bar in last_bars]
elif price_type == 'high':
values = [bar.high for bar in last_bars]
elif price_type == 'low':
values = [bar.low for bar in last_bars]
else:
return None
return sum(values) / len(values)
def _calculate_atr(self, data, period):
if len(data) < period + 1:
return None
true_ranges = []
for i in range(len(data) - period, len(data)):
if i == 0:
tr = data[i].high - data[i].low
else:
high_low = data[i].high - data[i].low
high_close = abs(data[i].high - data[i-1].close)
low_close = abs(data[i].low - data[i-1].close)
tr = max(high_low, high_close, low_close)
true_ranges.append(tr)
return sum(true_ranges) / len(true_ranges)
def _calculate_adx(self, data, period):
if len(data) < period * 2:
return None
plus_dm = []
minus_dm = []
tr_list = []
for i in range(1, len(data)):
high_diff = data[i].high - data[i-1].high
low_diff = data[i-1].low - data[i].low
plus_dm.append(high_diff if high_diff > low_diff and high_diff > 0 else 0)
minus_dm.append(low_diff if low_diff > high_diff and low_diff > 0 else 0)
high_low = data[i].high - data[i].low
high_close = abs(data[i].high - data[i-1].close)
low_close = abs(data[i].low - data[i-1].close)
tr_list.append(max(high_low, high_close, low_close))
if len(plus_dm) < period or len(tr_list) < period:
return None
plus_di = sum(plus_dm[-period:]) / sum(tr_list[-period:]) * 100 if sum(tr_list[-period:]) > 0 else 0
minus_di = sum(minus_dm[-period:]) / sum(tr_list[-period:]) * 100 if sum(tr_list[-period:]) > 0 else 0
dx = abs(plus_di - minus_di) / (plus_di + minus_di) * 100 if (plus_di + minus_di) > 0 else 0
return dx
def _calculate_ibs(self, bar):
if bar is None:
return None
range_value = bar.high - bar.low
if range_value == 0:
return 0.5
ibs = (bar.close - bar.low) / range_value
return ibs
def _calculate_rsi(self, data, period):
if len(data) < period + 1:
return None
gains = []
losses = []
for i in range(len(data) - period, len(data)):
if i == 0:
continue
change = data[i].close - data[i-1].close
if change > 0:
gains.append(change)
losses.append(0)
else:
gains.append(0)
losses.append(abs(change))
if len(gains) == 0:
return None
avg_gain = sum(gains) / len(gains)
avg_loss = sum(losses) / len(losses)
if avg_loss == 0:
return 100
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
return rsi
def calculate_all_indicators(self, canonical_symbol):
all_data = self.data_handler.get_all_daily_data(canonical_symbol)
current_bar = self.data_handler.get_current_daily_bar(canonical_symbol)
indicators = {
'sma_190': None,
'sma_200': None,
'sma_10': None,
'sma_low_10': None,
'sma_high_10': None,
'sma_close_10': None,
'tp_10': None,
'atr_10': None,
'adx_10': None,
'ibs': None,
'rsi_5': None
}
if len(all_data) < 10:
return indicators
# SMA 190 and 200
if len(all_data) >= 190:
indicators['sma_190'] = self._calculate_sma(all_data, 190, 'close')
if len(all_data) >= 200:
indicators['sma_200'] = self._calculate_sma(all_data, 200, 'close')
# SMA 10
indicators['sma_10'] = self._calculate_sma(all_data, 10, 'close')
indicators['sma_low_10'] = self._calculate_sma(all_data, 10, 'low')
indicators['sma_high_10'] = self._calculate_sma(all_data, 10, 'high')
indicators['sma_close_10'] = self._calculate_sma(all_data, 10, 'close')
# TP10 = (SMA.low(10) + SMA.high(10) + SMA.close(10)) / 3
if all([indicators['sma_low_10'], indicators['sma_high_10'], indicators['sma_close_10']]):
indicators['tp_10'] = (indicators['sma_low_10'] + indicators['sma_high_10'] + indicators['sma_close_10']) / 3
# ATR(10)
indicators['atr_10'] = self._calculate_atr(all_data, 10)
# ADX(10)
indicators['adx_10'] = self._calculate_adx(all_data, 10)
# IBS - na aktuálním baru nebo posledním dostupném
bar_for_ibs = current_bar if current_bar else (all_data[-1] if all_data else None)
indicators['ibs'] = self._calculate_ibs(bar_for_ibs)
# RSI(5)
if len(all_data) >= 6:
indicators['rsi_5'] = self._calculate_rsi(all_data, 5)
# Uložení indikátorů
self._indicators[canonical_symbol] = indicators
return indicators
def get_indicators(self, canonical_symbol):
"""Získání naposledy vypočítaných indikátorů.
Args:
canonical_symbol: Symbol
Returns:
Dictionary indikátorů nebo None
"""
return self._indicators.get(canonical_symbol)
def update_all_indicators(self):
    """Recalculate the indicators of every symbol tracked by the data handler.

    Returns:
        Dictionary mapping each canonical symbol to its freshly
        calculated indicator dictionary.
    """
    tracked_symbols = self.data_handler._daily_bars.keys()
    return {
        symbol: self.calculate_all_indicators(symbol)
        for symbol in tracked_symbols
    }
from AlgorithmImports import *
# import data
from contract_filtration import ContractFiltration
from data_handling import DataHandler
from indicators import IndicatorManager
from trading_logic import TradingLogic
from risk_managment import RiskManager
class AT_01(QCAlgorithm):
    """Scheduled futures algorithm wiring together contract filtering,
    daily data handling, indicators, risk management and trading logic."""

    def initialize(self):
        """Configure the algorithm, build the helper modules and register
        all scheduled events."""
        self.set_start_date(2020, 1, 1)
        self.set_end_date(2026, 1, 1)
        self.set_cash(10000)
        self.set_benchmark("QQQ")
        self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.CASH)
        self.set_security_initializer(lambda sec: sec.set_fee_model(InteractiveBrokersFeeModel()))
        self.settings.seed_initial_prices = True
        resolution_data = Resolution.MINUTE
        resolution_history = Resolution.DAILY  # not referenced again in this method

        # WARNING: do NOT liquidate the algorithm at any cost!
        # Part of the profit has already been lost twice that way.

        # Selection of futures chain contracts:
        # put a future into self.future_list and the code automatically
        # (daily at the open) creates an attribute self.[TICKER] holding the
        # selected contract, for example:
        #   self.MNQ - active contract for Micro Nasdaq 100 E-mini
        #   self.MGC - active contract for Micro Gold
        self.future_list = [
            Futures.Indices.MICRO_NASDAQ_100_E_MINI,
            Futures.Metals.MICRO_GOLD,
            Futures.Indices.MICRO_SP_500_E_MINI,
        ]
        self.contract_filter = ContractFiltration(self)
        self.contract_filter.add_futures_contracts(self.future_list, resolution_data)

        # Data handler for daily data.
        self.data_handler = DataHandler(self)

        # Subscribe daily data for the CANONICAL symbols (continuous contracts).
        # All data (historical and current) comes from the canonical symbols so
        # that today's close of one contract is never compared with
        # yesterday's high of another.
        for canonical_symbol in self.contract_filter.futures.keys():
            self.data_handler.subscribe_daily_data(canonical_symbol, history_days=250)
            self.debug(f"Subscribed daily data for: {canonical_symbol}")

        # TLT subscription, used by MGC strategy 1.
        self.tlt = self.add_equity("TLT", Resolution.DAILY).symbol

        # Indicator manager, risk manager and trading logic.
        self.indicator_manager = IndicatorManager(self, self.data_handler)
        self.risk_manager = RiskManager(self)
        self.trading_logic = TradingLogic(
            self, self.data_handler, self.indicator_manager, self.risk_manager
        )
        self.set_warm_up(timedelta(days=300))

        # --- Scheduled events ---
        schedule = self.schedule
        dates = self.date_rules
        times = self.time_rules

        schedule.on(dates.every_day(), times.after_market_open("SPY", 0), self._update_active_contracts)
        schedule.on(dates.every_day(), times.after_market_open("SPY", 1), self._execute_mes_entry)

        # Reports on the hour from 10:00 to 16:00, plus two shortly before the close.
        for hour in range(10, 17):
            schedule.on(dates.every_day(), times.at(hour, 0), self._hourly_logging)
        schedule.on(dates.every_day(), times.before_market_close("SPY", 5), self._hourly_logging)
        schedule.on(dates.every_day(), times.before_market_close("SPY", 10), self._hourly_logging)

        # Check stop conditions every hour (at half past).
        for hour in range(10, 17):
            schedule.on(dates.every_day(), times.at(hour, 30), self._check_risk_stops)

        schedule.on(dates.every_day(), times.before_market_close("SPY", 5), self._execute_trading)
        schedule.on(dates.every_day(), times.before_market_close("SPY", 5), self._execute_mes_exit)

        # State
        self._last_data_slice = None

        # DATA FLOW ARCHITECTURE (author's design notes):
        # 1. Canonical symbols (MNQ, MGC) are the source for:
        #    - historical data
        #    - all indicators (SMA, ATR, ADX, IBS, TP)
        #    - yesterday high/low/close (from intraday bars 9:31-16:00)
        #    - intraday bars are updated every minute during the trading day
        # 2. Concrete contracts (active_contract) are the source for:
        #    - current LIVE prices for trading signals
        #    - trade execution (set_holdings, liquidate)
        # 3. Advantages:
        #    - consistent indicators (from the continuous contract)
        #    - current prices without stale price warnings (from the live contract)
        #    - indicators are refreshed continuously (every hour)
        # TODO: risk management / sizing module; in initialization add risk
        # measures such as SL, max days hold and re-entry days.

    def on_data(self, data):
        """Remember the latest slice and refresh contracts when chain data is present."""
        self._last_data_slice = data
        has_chains = data.future_chains.count > 0
        if has_chains:
            self.contract_filter.update_active_contracts(data)

    def _refresh_active_contracts(self):
        """Update contracts from the last slice, then run the fallback for any
        canonical symbol that still has no active contract."""
        if self._last_data_slice:
            self.contract_filter.update_active_contracts(self._last_data_slice)
        contract_filter = self.contract_filter
        for canonical_symbol in contract_filter.futures.keys():
            if contract_filter.get_active_contract(canonical_symbol) is None:
                contract_filter._fallback_update_contract(canonical_symbol)

    def _update_active_contracts(self):
        """Daily contract update based on the last received data slice."""
        last_slice = self._last_data_slice
        if last_slice:
            self.contract_filter.update_active_contracts(last_slice)

    def _update_indicators(self):
        """Recalculate all indicators and log every non-None value."""
        if not self.is_warming_up:
            self.debug("=== Hodinová aktualizace indikátorů ===")
        refreshed = self.indicator_manager.update_all_indicators()
        for symbol, indicators in refreshed.items():
            if self.is_warming_up:
                continue
            self.debug(f"Indikátory pro {symbol}:")
            for name, value in indicators.items():
                if value is not None:
                    self.debug(f" {name}: {value:.4f}")

    def _hourly_logging(self):
        """Refresh indicators and log prices and indicators for every symbol."""
        if not self.is_warming_up:
            self.debug(f"=== Hodinový Report {self.time} ===")

        def fmt(value):
            # Numbers are shown with two decimals, anything else as 'N/A'.
            if isinstance(value, (int, float)):
                return f"{value:.2f}"
            return 'N/A'

        self.indicator_manager.update_all_indicators()

        for canonical_symbol in self.data_handler._daily_bars.keys():
            indicators = self.indicator_manager.get_indicators(canonical_symbol)
            active_contract = self.contract_filter.get_active_contract(canonical_symbol)

            # Current price of the active contract, when it is subscribed.
            current_price = None
            if active_contract and self.securities.contains_key(active_contract):
                current_price = self.securities[active_contract].price

            if active_contract and current_price and not self.is_warming_up:
                self.debug(f"Symbol: {canonical_symbol} | Aktivní kontrakt: {active_contract} | Aktuální Close: {current_price:.2f}")

            if indicators:
                shown = {
                    key: fmt(indicators.get(key, 'N/A'))
                    for key in ('sma_190', 'sma_10', 'tp_10', 'atr_10', 'adx_10', 'ibs')
                }
                sma_190, sma_10 = shown['sma_190'], shown['sma_10']
                tp_10, atr_10 = shown['tp_10'], shown['atr_10']
                adx_10, ibs = shown['adx_10'], shown['ibs']
                if not self.is_warming_up:
                    self.debug(f" SMA(190): {sma_190} | SMA(10): {sma_10} | TP(10): {tp_10} | ATR(10): {atr_10}")
                    self.debug(f" ADX(10): {adx_10} | IBS: {ibs}")

            if not self.is_warming_up:
                self.debug(50 * "-")

    def _execute_trading(self):
        """Run the main trading logic; scheduled shortly before the market close."""
        self.debug(f"=== Spouštím trading logiku před close {self.time} ===")
        self._refresh_active_contracts()
        # Indicators are refreshed right before trading.
        self.indicator_manager.update_all_indicators()
        self.trading_logic.execute_trading_logic()

    def _execute_mes_entry(self):
        """Run the MES entry logic; scheduled shortly after the market open."""
        self.debug(f"=== Spouštím MES entry logiku po open {self.time} ===")
        self._refresh_active_contracts()
        self.indicator_manager.update_all_indicators()
        self.trading_logic.execute_mes_entry_logic()

    def _execute_mes_exit(self):
        """Run the MES exit logic; scheduled shortly before the market close."""
        self.debug(f"=== Spouštím MES exit logiku před close {self.time} ===")
        self.trading_logic.execute_mes_exit_logic()

    def _check_risk_stops(self):
        """Ask the risk manager for positions to exit and liquidate them."""
        to_exit = self.risk_manager.check_stop_conditions()
        if not to_exit:
            return
        self.debug(f"=== Risk Manager liquidating {len(to_exit)} positions ===")
        self.risk_manager.liquidate_positions(to_exit)

    def on_order_event(self, order_event):
        """Log filled, canceled and invalid orders."""
        status = order_event.status
        if status == OrderStatus.FILLED:
            self.debug(f"Order Filled: {order_event.symbol} - {order_event.fill_quantity} @ ${order_event.fill_price}")
        elif status == OrderStatus.CANCELED:
            self.debug(f"Order Canceled: {order_event.symbol}")
        elif status == OrderStatus.INVALID:
            self.debug(f"Order Invalid: {order_event.symbol} - {order_event.message}")

    def on_brokerage_message(self, message):
        """Write every brokerage message to both the log and the debug output."""
        text = f"BROKERAGE MESSAGE: {message}"
        self.log(text)
        self.debug(text)


# region imports
from AlgorithmImports import *
from datetime import datetime, timedelta
# endregion
class RiskManager:
    """Risk management module for position sizing, stop losses, and max days in trade."""

    def __init__(self, algorithm):
        """Initialize risk manager.

        Args:
            algorithm: Instance QCAlgorithm
        """
        self.algorithm = algorithm

        # Risk parameters
        self.min_exposure_per_trade = 1.0  # Minimum exposure per trade in dollars
        self.use_stop_loss = True  # Enable/disable stop loss
        self.stop_loss_percentage = 0.05  # 5% stop loss
        self.use_max_days_in_trade = True  # Enable/disable max days stop
        self.max_days_in_trade = 10  # Maximum days to hold a position

        # Position tracking
        self._position_entry_prices = {}  # {symbol: entry_price}
        self._position_entry_dates = {}  # {symbol: entry_date}
        self._position_directions = {}  # {symbol: 1 for long, -1 for short}

        self.algorithm.debug("Risk Manager initialized")
        self.algorithm.debug(f"Stop Loss: {self.use_stop_loss} ({self.stop_loss_percentage*100}%)")
        self.algorithm.debug(f"Max Days in Trade: {self.use_max_days_in_trade} ({self.max_days_in_trade} days)")

    def calculate_position_size(self, symbol, target_allocation):
        """Calculate position size ensuring minimum exposure.

        Args:
            symbol: Symbol to trade
            target_allocation: Target allocation as percentage (-1.0 to 1.0)

        Returns:
            Number of contracts to trade (negative for shorts). 0 when the
            symbol is unknown, its price is 0, or the value of one contract
            is not positive.
        """
        algorithm = self.algorithm
        if not algorithm.securities.contains_key(symbol):
            algorithm.debug(f"Risk Manager: Symbol {symbol} not in securities")
            return 0

        security = algorithm.securities[symbol]
        current_price = security.price
        if current_price == 0:
            algorithm.debug(f"Risk Manager: Price is 0 for {symbol}")
            return 0

        # Contract multiplier, defaulting to 1 when no symbol properties exist.
        if security.symbol_properties:
            contract_multiplier = security.symbol_properties.contract_multiplier
        else:
            contract_multiplier = 1

        contract_value = current_price * contract_multiplier
        if contract_value <= 0:
            # A non-positive contract value cannot be sized; dividing by it
            # would raise ZeroDivisionError or produce a meaningless count.
            algorithm.debug(f"Risk Manager: Contract value is not positive for {symbol}")
            return 0

        # Target value based on allocation.
        portfolio_value = algorithm.portfolio.total_portfolio_value
        target_value = abs(target_allocation) * portfolio_value
        contracts_needed = target_value / contract_value

        if contracts_needed < 1:
            # Less than one contract requested: trade at least one contract,
            # or more when a single contract is below the minimum exposure.
            if contract_value >= self.min_exposure_per_trade:
                contracts_needed = 1
            else:
                contracts_needed = max(1, int(self.min_exposure_per_trade / contract_value) + 1)
        else:
            contracts_needed = int(contracts_needed)

        # Apply direction (negative for shorts)
        if target_allocation < 0:
            contracts_needed = -contracts_needed

        algorithm.debug(f"Risk Manager: {symbol}")
        algorithm.debug(f" Target Allocation: {target_allocation*100:.2f}%")
        algorithm.debug(f" Price: ${current_price:.2f}")
        algorithm.debug(f" Contract Multiplier: {contract_multiplier}")
        algorithm.debug(f" Contract Value: ${contract_value:.2f}")
        algorithm.debug(f" Contracts: {contracts_needed}")
        algorithm.debug(f" Total Exposure: ${abs(contracts_needed * contract_value):.2f}")
        return int(contracts_needed)

    def set_holdings_with_risk_management(self, symbol, target_allocation):
        """Set holdings with risk management (position sizing, stop loss tracking).

        Args:
            symbol: Symbol to trade
            target_allocation: Target allocation as percentage (-1.0 to 1.0)
        """
        algorithm = self.algorithm
        quantity = self.calculate_position_size(symbol, target_allocation)
        if quantity == 0:
            algorithm.debug(f"Risk Manager: No position for {symbol} (quantity = 0)")
            return

        # NOTE(review): the order is sized by set_holdings from the allocation;
        # the computed quantity is only used as a gate and for the direction.
        # Confirm this is intended.
        algorithm.set_holdings(symbol, target_allocation)

        # Track entry for stop loss and max days, only when a position exists.
        if not algorithm.portfolio[symbol].invested:
            return

        current_price = algorithm.securities[symbol].price
        self._position_entry_prices[symbol] = current_price
        self._position_entry_dates[symbol] = algorithm.time
        self._position_directions[symbol] = 1 if quantity > 0 else -1

        algorithm.debug(f"Risk Manager: Position opened for {symbol}")
        algorithm.debug(f" Entry Price: ${current_price:.2f}")
        algorithm.debug(f" Entry Date: {algorithm.time}")
        algorithm.debug(f" Direction: {'LONG' if quantity > 0 else 'SHORT'}")

    def check_stop_conditions(self):
        """Check all open positions for stop loss and max days conditions.

        Tracking entries of positions that are no longer invested are removed.

        Returns:
            List of symbols that should be liquidated
        """
        symbols_to_liquidate = []
        # Iterate over a copy because cleanup mutates the tracking dicts.
        for symbol in list(self._position_entry_prices.keys()):
            if not self.algorithm.portfolio[symbol].invested:
                self._cleanup_position(symbol)
                continue

            exit_reason = ""
            if self.use_stop_loss and self._check_stop_loss(symbol):
                exit_reason = "Stop Loss"
            elif self.use_max_days_in_trade and self._check_max_days_in_trade(symbol):
                exit_reason = "Max Days in Trade"

            if exit_reason:
                self.algorithm.debug(f"*** RISK MANAGER EXIT: {symbol} - {exit_reason} ***")
                symbols_to_liquidate.append(symbol)
        return symbols_to_liquidate

    def _check_stop_loss(self, symbol):
        """Check if stop loss is triggered for a position.

        Args:
            symbol: Symbol to check

        Returns:
            True if stop loss triggered. False when the symbol is not tracked
            or when the entry or current price is not positive.
        """
        if symbol not in self._position_entry_prices:
            return False

        entry_price = self._position_entry_prices[symbol]
        current_price = self.algorithm.securities[symbol].price
        # A price of 0 means "no price" elsewhere in this class; it must not
        # be read as a 100% loss, and a 0 entry price would divide by zero.
        if entry_price <= 0 or current_price <= 0:
            return False

        direction = self._position_directions.get(symbol, 1)
        if direction == 1:  # Long position
            pnl_percentage = (current_price - entry_price) / entry_price
        else:  # Short position
            pnl_percentage = (entry_price - current_price) / entry_price

        if pnl_percentage <= -self.stop_loss_percentage:
            self.algorithm.debug(f"Stop Loss triggered for {symbol}:")
            self.algorithm.debug(f" Entry: ${entry_price:.2f}")
            self.algorithm.debug(f" Current: ${current_price:.2f}")
            self.algorithm.debug(f" Loss: {pnl_percentage*100:.2f}%")
            return True
        return False

    def _check_max_days_in_trade(self, symbol):
        """Check if max days in trade is exceeded.

        Args:
            symbol: Symbol to check

        Returns:
            True if max days exceeded
        """
        if symbol not in self._position_entry_dates:
            return False

        entry_date = self._position_entry_dates[symbol]
        days_held = (self.algorithm.time - entry_date).days
        if days_held >= self.max_days_in_trade:
            self.algorithm.debug(f"Max Days in Trade exceeded for {symbol}:")
            self.algorithm.debug(f" Entry Date: {entry_date}")
            self.algorithm.debug(f" Days Held: {days_held}")
            return True
        return False

    def _cleanup_position(self, symbol):
        """Clean up tracking for a closed position.

        Args:
            symbol: Symbol to clean up
        """
        self._position_entry_prices.pop(symbol, None)
        self._position_entry_dates.pop(symbol, None)
        self._position_directions.pop(symbol, None)

    def liquidate_positions(self, symbols):
        """Liquidate positions and clean up tracking.

        Args:
            symbols: List of symbols to liquidate
        """
        for symbol in symbols:
            self.algorithm.liquidate(symbol)
            self._cleanup_position(symbol)

    def get_position_info(self, symbol):
        """Get information about an open position.

        Args:
            symbol: Symbol to get info for

        Returns:
            Dictionary with position info or None
        """
        if symbol not in self._position_entry_prices:
            return None

        entry_price = self._position_entry_prices[symbol]
        entry_date = self._position_entry_dates[symbol]
        direction = self._position_directions[symbol]
        current_price = self.algorithm.securities[symbol].price
        days_held = (self.algorithm.time - entry_date).days

        if direction == 1:
            pnl_percentage = (current_price - entry_price) / entry_price
        else:
            pnl_percentage = (entry_price - current_price) / entry_price

        return {
            'entry_price': entry_price,
            'entry_date': entry_date,
            'current_price': current_price,
            'direction': 'LONG' if direction == 1 else 'SHORT',
            'days_held': days_held,
            'pnl_percentage': pnl_percentage * 100,
        }
# region imports
from AlgorithmImports import *
# endregion
class TradingLogic:
"""Třída pro obchodní logiku."""
def __init__(self, algorithm, data_handler, indicator_manager, risk_manager):
"""Inicializace trading logic.
Args:
algorithm: Instance QCAlgorithm
data_handler: Instance DataHandler
indicator_manager: Instance IndicatorManager
risk_manager: Instance RiskManager
"""
self.algorithm = algorithm
self.data_handler = data_handler
self.indicator_manager = indicator_manager
self.risk_manager = risk_manager
self._order_q = 1
self._order_qmgc = 2
def execute_trading_logic(self):
    """Main trading logic for MNQ and MGC; called before the market close.

    Does nothing while the algorithm is warming up. MNQ and MGC are
    processed independently: a missing canonical symbol or active contract
    for one instrument is logged and skipped without preventing the other
    instrument from being evaluated. For each instrument the sell check runs
    before the buy check.
    """
    algorithm = self.algorithm
    if algorithm.is_warming_up:
        return

    # (ticker, contract resolver, sell check, buy check,
    #  message when the canonical symbol is missing,
    #  message when the active contract is missing)
    instruments = (
        (
            'MNQ',
            self._get_active_contract,
            self._check_sell_signal,
            self._check_buy_signal,
            "Trading Logic: MNQ canonical symbol nenalezen",
            "Trading Logic: Aktivní MNQ kontrakt nenalezen ani jedním způsobem",
        ),
        (
            'MGC',
            self._get_active_contract_mgc,
            self._check_sell_signal_mgc,
            self._check_buy_signal_mgc,
            "Trading Logic: MGC canonical symbol nenalezen",
            "Trading Logic: Aktivní MGC kontrakt nenalezen",
        ),
    )

    for ticker, resolve_contract, check_sell, check_buy, no_canonical, no_contract in instruments:
        # First tracked canonical symbol whose string form contains the ticker.
        canonical = next(
            (c for c in self.data_handler._daily_bars.keys() if ticker in str(c)),
            None,
        )
        if canonical is None:
            algorithm.debug(no_canonical)
            continue

        active_contract = resolve_contract(canonical)
        if active_contract is None:
            algorithm.debug(no_contract)
            continue

        # Sell first, then buy.
        check_sell(canonical, active_contract)
        check_buy(canonical, active_contract)
def _get_active_contract(self, canonical_symbol):
"""Získání aktivního kontraktu pomocí různých metod.
Args:
canonical_symbol: Canonical symbol
Returns:
Symbol aktivního kontraktu nebo None
"""
#self.algorithm.debug(f"Hledám aktivní kontrakt pro {canonical_symbol}")
# Metoda 1: Z contract filteru
active = self.algorithm.contract_filter.get_active_contract(canonical_symbol)
if active:
#self.algorithm.debug(f"✓ Kontrakt nalezen z contract_filter: {active}")
return active
else:
self.algorithm.debug("Contract_filter nevrátil žádný kontrakt")
# Metoda 2: Z invested holdings
self.algorithm.debug(f"Kontroluji holdings, celkem položek: {len(self.algorithm.portfolio)}")
for kvp in self.algorithm.portfolio:
security = kvp.value
if security.symbol.security_type == SecurityType.FUTURE:
#self.algorithm.debug(f" Našel futures security: {security.symbol}, invested={security.invested}")
if 'MNQ' in str(security.symbol) and security.invested:
#self.algorithm.debug(f"✓ Kontrakt nalezen z holdings: {security.symbol}")
return security.symbol
# Metoda 3: Ze subscribed securities
#self.algorithm.debug(f"Kontroluji securities, celkem: {len(self.algorithm.securities)}")
mnq_contracts = []
for symbol in self.algorithm.securities.keys():
if symbol.security_type == SecurityType.FUTURE:
#self.algorithm.debug(f" Našel futures: {symbol}")
if 'MNQ' in str(symbol):
mnq_contracts.append(symbol)
#self.algorithm.debug(f" -> Je to MNQ kontrakt")
if mnq_contracts:
# Vybrat první dostupný
selected = mnq_contracts[0]
#self.algorithm.debug(f"✓ Kontrakt nalezen ze securities: {selected}")
return selected
self.algorithm.debug("✗ Žádný MNQ kontrakt nebyl nalezen žádnou metodou")
return None
def _get_yesterday_bar_from_contract(self, contract_symbol):
    """Return the last daily bar from a 2-bar history request for a contract.

    Args:
        contract_symbol: Symbol of the concrete contract.

    Returns:
        The last bar of the history result, or None when the history is
        empty or any error occurs (the error is logged, not raised).
    """
    try:
        # Request 2 daily bars; the last one is treated as yesterday's bar.
        history = self.algorithm.history[TradeBar](contract_symbol, 2, Resolution.DAILY)
        # Materialise the result exactly once instead of re-enumerating it
        # for the length check and again for indexing.
        bars = list(history) if history else []
        if not bars:
            self.algorithm.debug(f"Žádná historie pro kontrakt {contract_symbol}")
            return None
        yesterday_bar = bars[-1]
        self.algorithm.debug(f"Yesterday bar z kontraktu {contract_symbol}: High={yesterday_bar.high:.2f}, Close={yesterday_bar.close:.2f}")
        return yesterday_bar
    except Exception as e:
        # Best effort: a failed history request must not stop the algorithm.
        self.algorithm.debug(f"Chyba při načítání yesterday bar z {contract_symbol}: {str(e)}")
        return None
def _get_yesterday_close_from_contract(self, contract_symbol):
"""Získání včerejšího close z konkrétního kontraktu.
Args:
contract_symbol: Symbol konkrétního kontraktu
Returns:
float včerejšího close nebo None
"""
yesterday_bar = self._get_yesterday_bar_from_contract(contract_symbol)
if yesterday_bar:
return yesterday_bar.close
return None
def _get_day_before_yesterday_close_from_contract(self, contract_symbol):
    """Return the close of the second-to-last bar of a 3-bar daily history.

    Args:
        contract_symbol: Symbol of the concrete contract.

    Returns:
        Close of the second-to-last bar, or None when fewer than two bars
        are returned or any error occurs (the error is logged, not raised).
    """
    try:
        history = self.algorithm.history[TradeBar](contract_symbol, 3, Resolution.DAILY)
        # Materialise the result exactly once instead of re-enumerating it
        # for the length check and again for indexing.
        bars = list(history) if history else []
        if len(bars) < 2:
            self.algorithm.debug(f"Nedostatek historie pro kontrakt {contract_symbol}")
            return None
        day_before_yesterday_bar = bars[-2]
        self.algorithm.debug(f"Day before yesterday close z kontraktu {contract_symbol}: Close={day_before_yesterday_bar.close:.2f}")
        return day_before_yesterday_bar.close
    except Exception as e:
        # Best effort: a failed history request must not stop the algorithm.
        self.algorithm.debug(f"Chyba při načítání day before yesterday close z {contract_symbol}: {str(e)}")
        return None
def _check_buy_signal(self, canonical_symbol, active_contract):
"""Kontrola buy signálu.
Args:
canonical_symbol: Canonical symbol
active_contract: Aktivní kontrakt symbol (pouze pro exekuci)
"""
# Získání indikátorů Z CANONICAL
indicators = self.indicator_manager.get_indicators(canonical_symbol)
if indicators is None:
self.algorithm.debug("Trading Logic: Indikátory nejsou k dispozici")
return
tp_10 = indicators.get('tp_10')
atr_10 = indicators.get('atr_10')
adx_10 = indicators.get('adx_10')
ibs = indicators.get('ibs')
sma_190 = indicators.get('sma_190')
sma_10 = indicators.get('sma_10')
yesterday_close = self.data_handler.get_yesterday_close(canonical_symbol)
day_before_yesterday_close = self.data_handler.get_day_before_yesterday_close(canonical_symbol)
not_invested = self.algorithm.portfolio[active_contract].invested
# Kontrola, zda máme všechny potřebné indikátory
if any(x is None for x in [tp_10, atr_10, adx_10, ibs]):
self.algorithm.debug("Trading Logic: Chybí některé indikátory pro buy signal")
return
# Aktuální cena Z CANONICAL SYMBOLU
# Všechna data z jednoho zdroje = konzistentní porovnání
if not self.algorithm.securities.contains_key(canonical_symbol):
self.algorithm.debug(f"Trading Logic: Security {canonical_symbol} není v securities")
return
current_price = self.algorithm.securities[canonical_symbol].price
if current_price == 0:
self.algorithm.debug(f"Trading Logic: Cena pro {canonical_symbol} je 0")
return
# Výpočet threshold z indikátorů (také z canonical symbolu)
buy_threshold = tp_10 - (1.5 * atr_10)
""" BUY LOGIC CODE """
volatility_edges = (current_price < buy_threshold and adx_10 > 20 and ibs < 0.6 and not not_invested)
turn_around_tuesday = (current_price < yesterday_close and yesterday_close < day_before_yesterday_close and self.algorithm.time.weekday() == 0 and not not_invested)
panic_selling = (ibs < 0.1 and (((current_price > sma_190) or (sma_190 == None)) or (current_price > sma_10)) and not not_invested)
# Kontrola podmínek
if volatility_edges or turn_around_tuesday or panic_selling:
self.algorithm.debug(f"*** BUY SIGNAL ***")
self.algorithm.debug(50 * "-")
# Nákup - použijeme risk manager pro správnou velikost pozice
self.algorithm.order(active_contract, self._order_q)
else:
self.algorithm.debug("BUY SIGNAL NOT TIGERRED")
self.algorithm.debug(50 * "-")
def _check_sell_signal(self, canonical_symbol, active_contract):
"""Kontrola sell signálu.
Sell podmínky:
- Jsme v pozici
- current price > yesterday high
- OBA z CANONICAL symbolu (konzistence)
Args:
canonical_symbol: Canonical symbol
active_contract: Aktivní kontrakt symbol (pouze pro exekuci)
"""
# Pokud nejsme v pozici, nemáme co prodávat
if not self.algorithm.portfolio[active_contract].invested:
return
# JSME V POZICI - načítáme yesterday high Z CANONICAL SYMBOLU
# Stejný zdroj jako všechna ostatní data = konzistence
yesterday_bar = self.data_handler.get_yesterday_bar(canonical_symbol)
if yesterday_bar is None:
self.algorithm.debug(f"Trading Logic: Yesterday bar pro canonical {canonical_symbol} není dostupný")
return
yesterday_high = yesterday_bar.high
# Aktuální cena Z CANONICAL SYMBOLU (stejný zdroj jako yesterday high)
if not self.algorithm.securities.contains_key(canonical_symbol):
self.algorithm.debug(f"Trading Logic: {canonical_symbol} není v securities")
return
current_price = self.algorithm.securities[canonical_symbol].price
if current_price == 0:
self.algorithm.debug(f"Trading Logic: Cena pro {canonical_symbol} je 0")
return
self.algorithm.debug(f"Sell Signal Check:")
self.algorithm.debug(f" Kontrakt v pozici: {active_contract}")
self.algorithm.debug(f" Current Price (from canonical {canonical_symbol}): {current_price:.2f}")
self.algorithm.debug(f" Yesterday High (from canonical {canonical_symbol}): {yesterday_high:.2f}")
# Kontrola sell podmínky
if current_price > yesterday_high:
self.algorithm.debug(f"*** SELL SIGNAL! ***")
self.algorithm.debug(f" Všechna data z canonical: {canonical_symbol}")
self.algorithm.debug(f" Obchod vykonán na kontraktu: {active_contract}")
self.algorithm.liquidate(active_contract)
else:
self.algorithm.debug("Sell podmínky nejsou splněny")
def _get_active_contract_mgc(self, canonical_symbol):
"""Získání aktivního kontraktu pro MGC.
Args:
canonical_symbol: Canonical symbol
Returns:
Symbol aktivního kontraktu nebo None
"""
# Metoda 1: Z contract filteru
active = self.algorithm.contract_filter.get_active_contract(canonical_symbol)
if active:
return active
# Metoda 2: Z invested holdings
for kvp in self.algorithm.portfolio:
security = kvp.value
if security.symbol.security_type == SecurityType.FUTURE:
if 'MGC' in str(security.symbol) and security.invested:
return security.symbol
# Metoda 3: Ze subscribed securities
mgc_contracts = []
for symbol in self.algorithm.securities.keys():
if symbol.security_type == SecurityType.FUTURE and 'MGC' in str(symbol):
mgc_contracts.append(symbol)
if mgc_contracts:
return mgc_contracts[0]
return None
def _check_buy_signal_mgc(self, canonical_symbol, active_contract):
"""Kontrola buy signálu pro MGC - dvě strategie.
Strategy 1: 3-day high + TLT close > yesterday TLT close + NOT Thursday
Strategy 2: Intraday high > yesterday 10-day high + IBS < 0.15
Args:
canonical_symbol: Canonical symbol
active_contract: Aktivní kontrakt symbol
"""
# Check not invested
if self.algorithm.portfolio[active_contract].invested:
return
# Get indicators
indicators = self.indicator_manager.get_indicators(canonical_symbol)
if indicators is None:
self.algorithm.debug("MGC: Indikátory nejsou k dispozici")
return
ibs = indicators.get('ibs')
sma_high_10 = indicators.get('sma_high_10')
# Get current price
if not self.algorithm.securities.contains_key(canonical_symbol):
return
current_price = self.algorithm.securities[canonical_symbol].price
if current_price == 0:
return
# Get yesterday bar
yesterday_bar = self.data_handler.get_yesterday_bar(canonical_symbol)
if yesterday_bar is None:
return
# Get 3-day history for strategy 1
history = self.data_handler.get_history_data(canonical_symbol, 3)
if len(history) < 3:
self.algorithm.debug("MGC: Nedostatek historických dat (< 3 dny)")
return
# === STRATEGY 1: 3-day high + TLT + NOT Thursday ===
three_day_high = max([bar.close for bar in history[-3:]])
is_three_day_high = (current_price >= three_day_high)
# TLT condition
tlt_yesterday_close = None
tlt_current_close = None
if self.algorithm.securities.contains_key(self.algorithm.tlt):
tlt_current_close = self.algorithm.securities[self.algorithm.tlt].close
tlt_history = self.algorithm.history[TradeBar](self.algorithm.tlt, 2, Resolution.DAILY)
if tlt_history and len(list(tlt_history)) >= 1:
tlt_yesterday_close = list(tlt_history)[-1].close
tlt_condition = (tlt_current_close and tlt_yesterday_close and tlt_current_close > tlt_yesterday_close)
is_not_thursday = (self.algorithm.time.weekday() != 3)
strategy_1 = (is_three_day_high and tlt_condition and is_not_thursday)
# === STRATEGY 2: Intraday high > yesterday 10-day high + IBS < 0.15 ===
current_daily_bar = self.data_handler.get_current_daily_bar(canonical_symbol)
if current_daily_bar is None:
self.algorithm.debug("MGC: Current daily bar není dostupný")
return
intraday_high = current_daily_bar.high
# Yesterday 10-day high = sma_high_10 from yesterday
# We need to get yesterday's value - use yesterday_bar.high as approximation
# Better: calculate 10-day high ending yesterday
history_11_days = self.data_handler.get_history_data(canonical_symbol, 11)
if len(history_11_days) >= 11:
yesterday_10day_high = max([bar.high for bar in history_11_days[-11:-1]])
else:
yesterday_10day_high = sma_high_10 # fallback
strategy_2 = (intraday_high > yesterday_10day_high and ibs < 0.15)
# Execute buy if any strategy triggers
if strategy_1:
self.algorithm.debug(f"*** MGC BUY SIGNAL (Strategy 1) ***")
self.algorithm.debug(f"3-day high: {is_three_day_high}, TLT: {tlt_condition}, NOT Thu: {is_not_thursday}")
self.algorithm.order(active_contract, self._order_qmgc)
elif strategy_2:
self.algorithm.debug(f"*** MGC BUY SIGNAL (Strategy 2) ***")
self.algorithm.debug(f"Intraday high: {intraday_high:.2f} > Yesterday 10-day high: {yesterday_10day_high:.2f}, IBS: {ibs:.4f}")
self.algorithm.order(active_contract, self._order_qmgc)
else:
self.algorithm.debug("MGC: BUY SIGNAL NOT TRIGGERED")
def _check_sell_signal_mgc(self, canonical_symbol, active_contract):
"""Kontrola sell signálu pro MGC.
Sell: close > yesterday high
Args:
canonical_symbol: Canonical symbol
active_contract: Aktivní kontrakt symbol
"""
# Pokud nejsme v pozici, nemáme co prodávat
if not self.algorithm.portfolio[active_contract].invested:
return
# Get yesterday bar
yesterday_bar = self.data_handler.get_yesterday_bar(canonical_symbol)
if yesterday_bar is None:
return
yesterday_high = yesterday_bar.high
# Get current price
if not self.algorithm.securities.contains_key(canonical_symbol):
return
current_price = self.algorithm.securities[canonical_symbol].price
if current_price == 0:
return
self.algorithm.debug(f"MGC Sell Check: Current={current_price:.2f}, Yesterday High={yesterday_high:.2f}")
# Sell condition
if current_price > yesterday_high:
self.algorithm.debug(f"*** MGC SELL SIGNAL! ***")
self.algorithm.liquidate(active_contract)
else:
self.algorithm.debug("MGC: Sell podmínky nejsou splněny")
def execute_mes_entry_logic(self):
    """Run the MES entry workflow; meant to be called after the market open.

    Looks for the MES canonical symbol first among the data handler's daily
    bars and then among the contract filter's futures, resolves the active
    contract for it and passes both to the short-entry check. Each step is
    reported through ``algorithm.debug``.
    """
    algo = self.algorithm

    def is_mes(symbol):
        # The symbol text must contain MES and none of the other roots.
        text = str(symbol)
        return 'MES' in text and not any(tag in text for tag in ('NQ', 'MGC', 'MNQ'))

    algo.debug("=== MES ENTRY LOGIC STARTED ===")
    daily_bars = self.data_handler._daily_bars
    algo.debug(f"Available canonical symbols: {list(daily_bars.keys())}")

    # First source: canonical symbols that already have daily bars.
    mes_canonical = None
    for candidate in daily_bars.keys():
        algo.debug(f"Checking canonical: {candidate}")
        if is_mes(candidate):
            mes_canonical = candidate
            algo.debug(f"Found MES canonical: {mes_canonical}")
            break

    # Second source: the futures registered in the contract filter.
    if mes_canonical is None:
        algo.debug("MES: Canonical symbol nenalezen - checking contract_filter")
        filter_futures = algo.contract_filter.futures
        algo.debug(f"Contract filter futures: {list(filter_futures.keys())}")
        mes_canonical = next((c for c in filter_futures.keys() if is_mes(c)), None)
        if mes_canonical is None:
            algo.debug("MES: Still not found - skipping")
            return
        algo.debug(f"Found MES/ES in contract_filter: {mes_canonical}")

    # Resolve the contract that would actually be traded.
    contract = self._get_active_contract_mes(mes_canonical)
    if contract is None:
        algo.debug("MES: Aktivní kontrakt nenalezen")
        return

    self._check_mes_short_entry(mes_canonical, contract)
def execute_mes_exit_logic(self):
    """Run the MES exit workflow; meant to be called before the market close.

    Finds the MES canonical symbol (daily bars first, contract filter as a
    fallback), resolves its active contract and liquidates that contract if
    the portfolio is invested in it. Returns silently when any lookup fails.
    """
    def is_mes(symbol):
        # The symbol text must contain MES and none of the other roots.
        text = str(symbol)
        return 'MES' in text and not any(tag in text for tag in ('NQ', 'MGC', 'MNQ'))

    # Preferred source: canonical symbols that already have daily bars.
    mes_canonical = next(
        (c for c in self.data_handler._daily_bars.keys() if is_mes(c)),
        None,
    )
    # Fallback source, consulted only when the first lookup found nothing.
    if mes_canonical is None:
        mes_canonical = next(
            (c for c in self.algorithm.contract_filter.futures.keys() if is_mes(c)),
            None,
        )
    if mes_canonical is None:
        return

    contract = self._get_active_contract_mes(mes_canonical)
    if contract is None:
        return

    # Flatten whatever is held in the active contract.
    if self.algorithm.portfolio[contract].invested:
        self.algorithm.debug("*** MES EXIT at CLOSE ***")
        self.algorithm.liquidate(contract)
def _get_active_contract_mes(self, canonical_symbol):
    """Resolve the MES contract to trade, trying three sources in order.

    1. The contract filter's active contract for ``canonical_symbol``.
    2. An invested MES futures holding in the portfolio.
    3. The first MES futures symbol found among the algorithm's securities.

    Args:
        canonical_symbol: Canonical symbol passed to the contract filter.

    Returns:
        The symbol of the chosen contract, or None when no source yields one.
    """
    algo = self.algorithm

    def is_mes_future(symbol):
        # A futures symbol whose text contains MES and none of the other roots.
        if not symbol.security_type == SecurityType.FUTURE:
            return False
        text = str(symbol)
        return 'MES' in text and not any(tag in text for tag in ('NQ', 'MGC', 'MNQ'))

    # Source 1: the contract filter.
    from_filter = algo.contract_filter.get_active_contract(canonical_symbol)
    if from_filter:
        return from_filter

    # Source 2: a holding that is currently invested.
    for entry in algo.portfolio:
        holding = entry.value
        if is_mes_future(holding.symbol) and holding.invested:
            return holding.symbol

    # Source 3: first matching symbol in iteration order of the securities.
    return next((s for s in algo.securities.keys() if is_mes_future(s)), None)
def _check_mes_short_entry(self, canonical_symbol, active_contract):
    """Check the MES short-entry signal after the market open and place the order.

    A short order of ``self._order_q`` contracts is sent for
    ``active_contract`` when all four conditions hold:
        1. Today's open is at least 1.5% above yesterday's close.
        2. The ``rsi_5`` indicator value is below 50.
        3. Yesterday's volume is lower than the volume of the bar before it.
        4. Yesterday's close is below the ``sma_200`` indicator value.

    NOTE(review): the previous docstring and the "Condition 1" log line
    advertised a 0.75% gap while the code compared against 1.5. The executed
    threshold is kept unchanged and the texts now report it; confirm which
    value the strategy is meant to use.

    Args:
        canonical_symbol: Canonical symbol used for the open price, the bar
            history and the indicator lookup.
        active_contract: Symbol of the contract that is checked for an
            existing position and that receives the order.
    """
    algo = self.algorithm
    # Minimum opening gap, in percent of yesterday's close.
    min_gap_pct = 1.5

    # Never stack a second entry on top of an open position.
    if algo.portfolio[active_contract].invested:
        algo.debug("MES: Již v pozici, přeskakuji entry")
        return

    # Today's open is read from the canonical symbol's security.
    if not algo.securities.contains_key(canonical_symbol):
        algo.debug("MES: Canonical symbol není v securities")
        return
    current_open = algo.securities[canonical_symbol].open
    if current_open == 0:
        algo.debug("MES: Open cena je 0")
        return

    # Yesterday's bar supplies the reference close and volume.
    yesterday_bar = self.data_handler.get_yesterday_bar(canonical_symbol)
    if yesterday_bar is None:
        algo.debug("MES: Yesterday bar není dostupný")
        return
    yesterday_close = yesterday_bar.close
    yesterday_volume = yesterday_bar.volume
    if yesterday_close == 0:
        # A zero close would make the gap percentage divide by zero.
        algo.debug("MES: Yesterday close je 0")
        return

    # history[-2] is used as the bar before yesterday's.
    # NOTE(review): assumes the last history element is yesterday's bar - confirm.
    history = self.data_handler.get_history_data(canonical_symbol, 3)
    if history is None or len(history) < 2:
        algo.debug("MES: Nedostatek historických dat")
        return
    day_before_yesterday_volume = history[-2].volume

    indicators = self.indicator_manager.get_indicators(canonical_symbol)
    if indicators is None:
        algo.debug("MES: Indikátory nejsou k dispozici")
        return
    rsi_5 = indicators.get('rsi_5')
    sma_200 = indicators.get('sma_200')
    if rsi_5 is None or sma_200 is None:
        algo.debug("MES: Chybí RSI nebo SMA_200")
        return

    open_gap_pct = ((current_open - yesterday_close) / yesterday_close) * 100
    condition_1 = open_gap_pct >= min_gap_pct
    condition_2 = rsi_5 < 50
    condition_3 = yesterday_volume < day_before_yesterday_volume
    condition_4 = yesterday_close < sma_200

    # Detailed diagnostics are suppressed while the algorithm warms up.
    if not algo.is_warming_up:
        algo.debug("=== MES SHORT ENTRY CHECK ===")
        algo.debug(f"Current Open: {current_open:.2f}")
        algo.debug(f"Yesterday Close: {yesterday_close:.2f}")
        algo.debug(f"Open Gap %: {open_gap_pct:.2f}%")
        # The label is built from the threshold so it cannot drift from the code.
        algo.debug(f"Condition 1 (Open >= +{min_gap_pct:.2f}%): {condition_1}")
        algo.debug(f"RSI(5): {rsi_5:.2f}, Condition 2 (RSI < 50): {condition_2}")
        algo.debug(f"Yesterday Volume: {yesterday_volume:.0f}, Day Before: {day_before_yesterday_volume:.0f}")
        algo.debug(f"Condition 3 (Vol declining): {condition_3}")
        algo.debug(f"SMA(200): {sma_200:.2f}, Condition 4 (Close < SMA200): {condition_4}")

    if condition_1 and condition_2 and condition_3 and condition_4:
        algo.debug("*** MES SHORT ENTRY SIGNAL ***")
        algo.debug(50 * "-")
        # Negative quantity opens the short; the size is the fixed
        # self._order_q configured elsewhere.
        algo.order(active_contract, -self._order_q)
    else:
        algo.debug("MES: Entry conditions not met")
        algo.debug(50 * "-")