| Overall Statistics |
|
Total Orders 193 Average Win 0.04% Average Loss -0.02% Compounding Annual Return 17.384% Drawdown 0.100% Expectancy 0.550 Start Equity 100000 End Equity 100142.82 Net Profit 0.143% Sharpe Ratio -1.25 Sortino Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 42% Win Rate 58% Profit-Loss Ratio 1.66 Alpha 0 Beta 0 Annual Standard Deviation 0.016 Annual Variance 0 Information Ratio 2.143 Tracking Error 0.016 Treynor Ratio 0 Total Fees $13.68 Estimated Strategy Capacity $0 Lowest Capacity Asset MNQ YVXOP65RE0HT Portfolio Turnover 286.07% Drawdown Recovery 0 |
from AlgorithmImports import *
class PivotPoints_HighLow:
    """Custom Pivot Points High/Low indicator.

    A bar is a high pivot when its high is strictly above the highs of the
    ``left_bars`` bars before it and the ``right_bars`` bars after it; low
    pivots mirror this on lows.  Once enough bars are buffered, pivots are
    recomputed from the rolling window on every update and stored
    latest-first.
    """

    # Upper bound on the pivots kept per side after each recalculation.
    MAX_PIVOTS = 10

    def __init__(self, left_bars, right_bars, lookback, algorithm):
        """Create the indicator.

        Args:
            left_bars: Number of bars before a candidate bar to compare against.
            right_bars: Number of confirming bars after a candidate bar.
            lookback: Requested rolling-window capacity, in bars.
            algorithm: Owning algorithm instance (kept for debug output).
        """
        self.algorithm = algorithm
        self.left_bars = left_bars
        self.right_bars = right_bars
        # Number of buffered bars required before the indicator is ready.
        self.lookback = self.left_bars + self.right_bars + 4
        # The window must be able to hold at least `self.lookback` bars.
        # Sizing it from the caller's `lookback` alone (e.g. 10/10 pivots with
        # a 20-bar window) caps `count` below the readiness threshold, so the
        # indicator would never become ready.
        self.bars = RollingWindow[TradeBar](max(lookback, self.lookback))
        self.high_pivots = []
        self.low_pivots = []
        self.IsReady = False
        # Store pivot locations for plotting
        self.pivot_times = []
        self.pivot_highs = []
        self.pivot_lows = []

    def Update(self, bar):
        """Add ``bar`` to the window and recompute pivots once ready."""
        self.bars.add(bar)
        if self.bars.count >= self.lookback:
            self.IsReady = True
            self._calculate_pivots()

    def _calculate_pivots(self):
        """Recompute pivots using left/right bar confirmation."""
        if self.bars.count < self.left_bars + self.right_bars + 1:
            return
        # The window is indexed most-recent-first; walk it backwards to get
        # the bars in chronological (oldest-first) order.
        bars_list = [self.bars[i] for i in range(self.bars.count - 1, -1, -1)]
        n = len(bars_list)
        self.high_pivots.clear()
        self.low_pivots.clear()
        for i in range(self.left_bars, n - self.right_bars):
            center_bar = bars_list[i]
            neighbours = (
                bars_list[i - self.left_bars:i]
                + bars_list[i + 1:i + 1 + self.right_bars]
            )
            if all(b.high < center_bar.high for b in neighbours):
                self.high_pivots.append({
                    'price': center_bar.high,
                    'time': center_bar.EndTime,
                    'index': i,
                })
            if all(b.low > center_bar.low for b in neighbours):
                self.low_pivots.append({
                    'price': center_bar.low,
                    'time': center_bar.EndTime,
                    'index': i,
                })
        # Latest pivot first, then keep only the most recent ones.
        self.high_pivots.sort(key=lambda x: x['time'], reverse=True)
        self.low_pivots.sort(key=lambda x: x['time'], reverse=True)
        self.high_pivots = self.high_pivots[:self.MAX_PIVOTS]
        self.low_pivots = self.low_pivots[:self.MAX_PIVOTS]

    def _recent_prices(self, pivots, count):
        """Return the first ``count`` pivot prices, zero-padded to ``count``."""
        wanted = max(count, 0)
        if not self.IsReady:
            return [0] * wanted
        prices = [pivot['price'] for pivot in pivots[:wanted]]
        return prices + [0] * (wanted - len(prices))

    def get_recent_high_pivots(self, count=3):
        """Return the ``count`` most recent high pivot prices (0-padded)."""
        return self._recent_prices(self.high_pivots, count)

    def get_recent_low_pivots(self, count=3):
        """Return the ``count`` most recent low pivot prices (0-padded)."""
        return self._recent_prices(self.low_pivots, count)

    @staticmethod
    def _distances(pivots, reference, count=3):
        """Return ``pivot price - reference`` for the first ``count`` pivots, 0-padded."""
        distances = [pivot['price'] - reference for pivot in pivots[:count]]
        return distances + [0] * (count - len(distances))

    def get_pivot_distances(self, current_high, current_low):
        """Return distances from the current bar to the three latest pivots.

        Returns:
            A ``(high_distances, low_distances)`` tuple of 3-element lists:
            high pivot prices minus ``current_high`` and low pivot prices
            minus ``current_low``, padded with 0 when fewer pivots exist.
        """
        high_distances = self._distances(self.high_pivots, current_high)
        low_distances = self._distances(self.low_pivots, current_low)
        return high_distances, low_distances
from AlgorithmImports import *

import json
import math
from datetime import datetime, time, timedelta

import joblib
import numpy as np
import xgboost as xgb
from sklearn.metrics import accuracy_score
from sklearn.metrics import accuracy_score, classification_report
from sklearn.model_selection import train_test_split

from indicators import PivotPoints_HighLow
from order_management import OrderManager
from utils import ConfigurationManager, AlertSystem, EconomicCalendar, RiskManager, FeatureCalculator
class ScalpingMLStrategy(QCAlgorithm):
def Initialize(self):
    """Configure the backtest, data subscriptions, indicators, state and schedules.

    Sets dates/cash/brokerage/time zone, builds the helper components,
    subscribes to the Micro Nasdaq future at tick resolution, creates the
    indicators and rolling windows, optionally loads models from the Object
    Store (otherwise requests a warm-up), and registers the scheduled tasks.
    """
    # Set start and end dates
    self.set_start_date(2025, 9, 8)
    self.set_end_date(2025, 9, 10)
    self.set_cash(100000)
    self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN)
    self.set_time_zone("Europe/Paris")

    # Initialize configuration manager
    self.debug("Initializing Configuration Manager...")
    self.config = ConfigurationManager()
    # Initialize Feature Manager
    self.feature_calculator = FeatureCalculator(self)
    # Initialize alert system
    self.debug("Initializing Alert System...")
    self.alerts = AlertSystem(self)
    # Initialize economic calendar
    self.debug("Initializing Economic Calendar")
    self.economic_calendar = EconomicCalendar()
    # Initialize Order Manager
    self.debug("Initializing Order Manager...")
    self.order_manager = OrderManager(self)

    # Add Micro Nasdaq futures
    ticker = Futures.Indices.MICRO_NASDAQ_100_E_MINI
    future = self.add_future(ticker, Resolution.TICK)
    future.SetFilter(0, 90)  # Contracts expiring within 0-90 days
    future.set_data_filter(CustomTickDataFilter(self))
    self.consolidator_by_symbol = {}
    self.calls = 0
    self.symb = None

    # Trading hours
    self.trading_start = time(
        self.config.get('trading_start_hour', 18),
        self.config.get('trading_start_minute', 0)
    )
    self.trading_end = time(
        self.config.get('trading_end_hour', 22),
        self.config.get('trading_end_minute', 0)
    )

    # Risk management parameters from config
    self.daily_loss_limit = self.config.get('daily_loss_limit', -500)
    self.daily_win_limit = self.config.get('daily_win_limit', 100)
    self.stop_loss = self.config.get('stop_loss', 12.5)
    self.take_profit = self.config.get('take_profit', 12.5)
    self.entry_offset = self.config.get('entry_offset', 2.5)

    # Daily PNL tracking
    self.daily_pnl = 0
    self.last_date = None
    self.trading_stopped = False
    self.start_of_day_total_profit = 0
    # UpdateDailyPNL reads this baseline; it must exist even if the first
    # bar arrives before the scheduled ResetDailyPNL has ever run.
    self.start_of_day_total_fees = 0

    # Data structures
    self.h500_bars = RollingWindow[TradeBar](1000)
    self.h5000_bars = RollingWindow[TradeBar](200)
    self.m1_bars = RollingWindow[TradeBar](1000)

    # Indicators
    atr_period = self.config.get('atr_period', 14)
    self.atr_14 = AverageTrueRange(atr_period)
    pivot_10_params = self.config.get('pivot_10_params', [10, 10, 20])
    pivot_5_params = self.config.get('pivot_5_params', [5, 5, 20])
    self.pivot_10_10_20_m1 = PivotPoints_HighLow(*pivot_10_params, self)
    self.pivot_5_5_20_m1 = PivotPoints_HighLow(*pivot_5_params, self)
    self.pivot_5_5_20_h5000 = PivotPoints_HighLow(*pivot_5_params, self)
    volume_period = self.config.get('volume_sma_period', 15)
    self.volume_sma_15 = SimpleMovingAverage(volume_period)

    # XGBoost ML models
    self.ml_model_1 = None
    self.ml_model_2 = None
    self.training_data = []
    self.prediction_buffer = RollingWindow(50)
    self.model_1_key = "model_buy_20251019_105614.pkl"
    self.model_2_key = "model_sell_20251019_105614.pkl"
    self.initial_training = False
    self.warm_up_needed = False

    # Try to load models from Object Store if configured
    if self.config.get('load_models_from_store', False):
        self.LoadModelsFromStore()
    else:
        self.warm_up_needed = True
    if self.warm_up_needed:
        self.set_warm_up(timedelta(days=7))

    # Performance tracking
    self.daily_stats = {}
    self.conflicting_predictions_count = 0
    self.buy_predictions_count = 0
    self.sell_predictions_count = 0
    self._last_prediction_1: bool = False
    self._last_prediction_2: bool = False
    self._last_validated_bar_time = None
    self._validation_failures = {}

    # Cumulative Delta tracking
    self.cumulative_delta = 0
    self.cumulative_delta_bars = RollingWindow[float](100)
    self.last_price = 0

    # Economic announcements - used as trade filter, not ML feature
    self.daily_economic_score = 0
    self.economic_filter_threshold = self.config.get('economic_filter_threshold', 7)

    # Charts
    self.debug("Setting up charts...")
    self.SetupCharts()

    # Schedule daily tasks - algorithm time zone is Europe/Paris
    self.schedule.on(
        self.date_rules.every_day(),
        self.time_rules.at(23, 0),  # 23:00
        self.EndOfDayTasks
    )
    self.schedule.on(
        self.date_rules.week_end(),  # week-end date rule
        self.time_rules.at(22, 0),  # 22:00
        self.TrainMLModels
    )
    # Schedule the daily reset ahead of the trading session
    self.schedule.on(
        self.date_rules.every_day(),
        self.time_rules.at(6, 0),  # 06:00
        self.ResetDailyPNL
    )

    # Initialize logging
    self.debug("ScalpingMLStrategy initialized with configuration:")
    self.debug(f"Daily limits: Loss ${self.daily_loss_limit}, Win ${self.daily_win_limit}")
    self.debug(f"Trading hours: {self.trading_start} - {self.trading_end} Paris time")
    self.debug(f"Economic News filter threshold: {self.economic_filter_threshold}")
    self.debug(f"Buy prediction threshold: {self.config.get('buy_prediction_threshold')}")
    self.debug(f"Sell prediction threshold: {self.config.get('sell_prediction_threshold')}")
    self.alerts.send_alert("Strategy initialized successfully", "INFO")
def LoadModelsFromStore(self):
    """Load the pre-trained buy/sell models from the Object Store.

    When both model keys exist the models are loaded with joblib, optional
    metadata is reported, ``initial_training`` is set and a 2-day warm-up is
    requested.  When they are missing, or anything fails, the strategy is
    flagged to train during warm-up instead.
    """
    try:
        store = self.object_store
        models_present = (
            store.contains_key(self.model_1_key)
            and store.contains_key(self.model_2_key)
        )
        if not models_present:
            missing_msg = "No pre-trained models found in Object Store. Will train during warm-up."
            self.debug(missing_msg)
            self.log(missing_msg)
            self.warm_up_needed = True
            self.initial_training = False
            return

        self.debug("Loading models from Object Store...")

        # Model 1 (Buy signals)
        buy_path = self.object_store.get_file_path(self.model_1_key)
        self.ml_model_1 = joblib.load(buy_path)
        self.debug(f"Loaded ML model 1 from object store at {buy_path}")

        # Model 2 (Sell signals)
        sell_path = self.object_store.get_file_path(self.model_2_key)
        self.ml_model_2 = joblib.load(sell_path)
        self.debug(f"Loaded ML model 2 from object store at {sell_path}")

        # Report model metadata when it was stored alongside the models
        if self.object_store.contains_key("model_metadata"):
            metadata = json.loads(self.object_store.read("model_metadata"))
            self.debug(f"Models loaded successfully. Trained on: {metadata.get('training_date')}")
            self.debug(f"Training samples: {metadata.get('training_samples')}")

        self.initial_training = True
        self.log("Models loaded successfully from Object Store")
        self.alerts.send_alert("Models loaded from Object Store", "INFO")
        self.set_warm_up(timedelta(days=2))
    except Exception as e:
        error_msg = f"Error loading models from Object Store: {str(e)}"
        self.error(error_msg)
        self.alerts.send_alert(error_msg, "ERROR")
        self.debug("Will train new models during warm-up.")
        self.warm_up_needed = True
        self.initial_training = False
def OnSecuritiesChanged(self, changes):
    """Track the active futures contract and keep consolidators in sync.

    For every removed security that has registered consolidators, each
    handler is detached and the consolidator is unregistered.  Clean-up stays
    best-effort, but failures are reported through ``self.debug`` instead of
    being silently discarded.  For every added future, the symbol becomes the
    active one and its consolidators are created.

    Args:
        changes: The security changes object passed in by the engine.
    """
    # Maps each consolidator key to the handler that was attached to it.
    handlers = {
        'h500': self.OnH500TickConsolidated,
        'h5000': self.OnH5000TickConsolidated,
        'm1': self.OnM1DataConsolidated,
    }

    # --- Remove consolidators for expired/removed contracts ---
    for removed in changes.RemovedSecurities:
        symbol = removed.Symbol
        info = self.consolidator_by_symbol.pop(symbol, None)
        if info is None:
            continue
        for key, handler in handlers.items():
            consolidator = info.get(key)
            if consolidator is None:
                continue
            try:
                consolidator.DataConsolidated -= handler
            except Exception as err:
                self.debug(f"Could not detach {key} handler for {symbol}: {err}")
            try:
                self.subscription_manager.remove_consolidator(symbol, consolidator)
            except Exception as err:
                self.debug(f"Could not remove {key} consolidator for {symbol}: {err}")
        self.log(f"Removed consolidators for {symbol}")

    # --- Add consolidators for newly added futures contract(s) ---
    for added in changes.AddedSecurities:
        if added.Symbol.SecurityType == SecurityType.FUTURE:
            self.symb = added.Symbol
            self.log(f"Added future {self.symb} – creating consolidators")
            self._add_tick_consolidators_for_symbol(self.symb)
def _add_tick_consolidators_for_symbol(self, symbol):
if symbol in self.consolidator_by_symbol:
return
h500_count = self.config.get('h500_trade_count', 500)
h5000_count = self.config.get('h5000_trade_count', 5000)
self.log(f"Registering consolidators for {symbol}: "
f"H500={h500_count}, H5000={h5000_count}, M1=1-minute")
# --- tick-based consolidators ---
h500_con = TickConsolidator(h500_count)
h5000_con = TickConsolidator(h5000_count)
m1_con = TickConsolidator(timedelta(minutes=1))
h500_con.data_consolidated += self.OnH500TickConsolidated
h5000_con.data_consolidated += self.OnH5000TickConsolidated
m1_con.data_consolidated += self.OnM1DataConsolidated
self.subscription_manager.add_consolidator(symbol, h500_con)
self.subscription_manager.add_consolidator(symbol, h5000_con)
self.subscription_manager.add_consolidator(symbol, m1_con)
self.consolidator_by_symbol[symbol] = {
'h500': h500_con,
'h5000': h5000_con,
'm1': m1_con
}
def _validate_bar(self, bar, source="Unknown"):
"""
Comprehensive bar validation to catch bad data early in the pipeline.
Args:
bar: The bar object to validate
source: String identifier for logging where validation failed
Returns:
bool: True if bar is valid, False otherwise
"""
# Check if bar exists
if bar is None:
self.error(f"[{source}] Received None bar")
return False
# Check if bar has required attributes
required_attrs = ['Open', 'High', 'Low', 'Close', 'Volume']
for attr in required_attrs:
if not hasattr(bar, attr):
self.error(f"[{source}] Bar missing attribute: {attr}")
return False
# Extract OHLC values
try:
o = float(bar.Open)
h = float(bar.High)
l = float(bar.Low)
c = float(bar.Close)
v = float(bar.Volume)
except (ValueError, TypeError) as e:
self.error(f"[{source}] Cannot convert bar values to float: {e}")
return False
# Check for zero or negative prices
if o <= 0 or h <= 0 or l <= 0 or c <= 0:
self.error(f"[{source}] Invalid price values: O={o}, H={h}, L={l}, C={c}")
return False
# Check for negative volume
if v < 0:
self.error(f"[{source}] Negative volume: {v}")
return False
# Check OHLC relationships
if h < max(o, c) or l > min(o, c):
self.error(f"[{source}] Invalid OHLC relationship: O={o}, H={h}, L={l}, C={c}")
return False
if h < l:
self.error(f"[{source}] High < Low: H={h}, L={l}")
return False
# Check for reasonable price ranges (outlier detection)
if self.symb is not None and hasattr(self.securities[self.symb], 'Price'):
current_price = self.securities[self.symb].Price
if current_price > 0:
# Check if any OHLC value is more than 10% away from current price
max_deviation = 0.10 # 10%
prices_to_check = [o, h, l, c]
for price in prices_to_check:
deviation = abs(price - current_price) / current_price
if deviation > max_deviation:
self.error(f"[{source}] Price outlier detected: {price} vs current {current_price} (deviation: {deviation:.2%})")
return False
# Check for extreme price ranges within the bar
bar_range = h - l
bar_mid = (h + l) / 2
if bar_mid > 0:
range_pct = bar_range / bar_mid
if range_pct > 0.05: # Bar range > 5% of mid price
self.debug(f"[{source}] Warning: Large bar range detected: {range_pct:.2%} of mid price")
# Update last validated time
if hasattr(bar, 'EndTime') or hasattr(bar, 'time'):
self._last_validated_bar_time = getattr(bar, 'EndTime', getattr(bar, 'time', None))
return True
def on_data(self, data):
    """Handle a new data slice.

    Skipped entirely while warming up or once trading is stopped for the
    day.  Otherwise refreshes today's economic impact score (used as a trade
    filter, not an ML feature) and, for every futures chain, makes sure the
    contract with the highest open interest has consolidators registered.
    """
    if self.is_warming_up or self.trading_stopped:
        return

    today = self.time.date()
    self.daily_economic_score = self.economic_calendar.get_impact_score(today)

    for chain in data.FutureChains:
        candidates = list(chain.Value)
        if candidates:
            # Missing open interest counts as 0 when ranking contracts
            most_liquid = max(candidates, key=lambda c: c.OpenInterest or 0)
            self._add_tick_consolidators_for_symbol(most_liquid.Symbol)
def OnTickConsolidated(self, sender, consolidated):
    """Log a consolidated bar and forward it to the H500 bar handler.

    The bar's fields are read defensively with ``getattr`` so a partially
    populated object is still logged.  Any exception raised while logging or
    forwarding is reported through ``self.error``.

    Args:
        sender: The consolidator that emitted the bar.
        consolidated: The consolidated bar.
    """
    self.calls += 1
    try:
        symbol = getattr(consolidated, "Symbol", None)
        # "EndTime" is the bar-close attribute; the previous "Endtime"
        # spelling never matched, so the start time was always logged.
        end_time = getattr(consolidated, "EndTime", getattr(consolidated, "time", None))
        openp = getattr(consolidated, "Open", None)
        highp = getattr(consolidated, "High", None)
        lowp = getattr(consolidated, "Low", None)
        closep = getattr(consolidated, "Close", None)
        vol = getattr(consolidated, "Volume", None)
        self.debug(f"OnTickConsolidated called #{self.calls} for {symbol} at {end_time} O:{openp} H:{highp} L:{lowp} C:{closep} V:{vol}")
        self.OnH500DataConsolidated(sender, consolidated)
    except Exception as e:
        self.error(f"Error in OnTickConsolidated: {str(e)}")
def ProcessTickData(self, tick):
    """Feed one tick into the cumulative-delta tracker and record its price."""
    # Cumulative delta first: it compares the tick against the previous price
    self.UpdateCumulativeDelta(tick)
    traded_price = tick.Price
    self.last_price = traded_price
def OnM1DataConsolidated(self, sender, bar):
    """Handle a completed M1 bar.

    Bars that fail validation are dropped.  Valid bars are stored, pushed
    into both M1 pivot indicators and the volume SMA, and their volume is
    plotted (on the "Market Data" chart only once the SMA is ready).
    """
    if not self._validate_bar(bar, "M1"):
        return

    self.m1_bars.add(bar)

    # Update M1 indicators
    for pivot_indicator in (self.pivot_10_10_20_m1, self.pivot_5_5_20_m1):
        pivot_indicator.Update(bar)
    self.volume_sma_15.Update(IndicatorDataPoint(bar.time, bar.Volume))

    # Plot M1 data
    self.plot("M1 Data", "Volume", bar.Volume)
    if self.volume_sma_15.IsReady:
        self.plot("Market Data", "Volume", bar.Volume)
def OnH500TickConsolidated(self, sender, consolidated):
    """Receive a bar from the 500-tick consolidator.

    Valid bars bump the call counter and are forwarded to
    ``OnH500DataConsolidated``; invalid bars are ignored.  Any exception is
    reported through ``self.error``.
    """
    try:
        if self._validate_bar(consolidated, "H500"):
            self.calls += 1
            self.OnH500DataConsolidated(sender, consolidated)
    except Exception as exc:
        self.error(f"Error in OnH500TickConsolidated: {exc}")
def OnH5000TickConsolidated(self, sender, consolidated):
    """Receive a bar from the 5000-tick consolidator.

    Valid bars are forwarded to ``OnH5000DataConsolidated``; invalid bars
    are ignored.  Any exception is reported through ``self.error``.
    """
    try:
        if self._validate_bar(consolidated, "H5000"):
            self.OnH5000DataConsolidated(sender, consolidated)
    except Exception as exc:
        self.error(f"Error in OnH5000TickConsolidated: {exc}")
def OnH500DataConsolidated(self, sender, bar):
    """Handle a completed H500 bar: collect features, manage risk, trade.

    Steps, in order: store the bar and update the ATR once 50 bars exist;
    compute features and store them as training data (also during warm-up);
    stop here while warming up; update daily PNL and let the order manager
    maintain orders; stop outside trading hours; run the first model
    training when enough samples exist; plot the bar and the latest pivots;
    finally, if the economic filter passes and both models exist, act on the
    model predictions.  When both models fire, the conflict is counted and
    the BUY branch takes precedence.

    Args:
        sender: The consolidator that emitted the bar.
        bar: The completed H500 bar.
    """
    self.h500_bars.add(bar)
    if self.h500_bars.count >= 50:
        self.atr_14.Update(bar)

    # Always store training data (even during warmup)
    variables = self.CalculateVariables(bar)
    if variables is None:
        return
    self.StoreTrainingData(variables, bar)

    # During warmup, don't trade - just collect data
    if self.is_warming_up:
        return

    # Update daily PNL and check limits
    self.UpdateDailyPNL()

    # Manage orders using OrderManager
    self.order_manager.manage_orders(max_positions=self.config.get('max_positions', 1))

    # Check trading hours
    if not self.IsTradingHours():
        self.debug("Trading stopped, outside Paris Trading hours")
        return

    # Run initial training
    if not self.initial_training:
        if len(self.training_data) >= self.config.get('min_training_samples', 1000):
            self.debug("Running first XGboost Model training...")
            self.TrainMLModels()
            self.initial_training = True
        else:
            self.debug(f"Not enough samples for initial training. Current: {len(self.training_data)}, Required: {self.config.get('min_training_samples')}, Try increasing warm up time.")

    if self.h500_bars.count < 50:
        return

    # Plot H500 OHLC data
    self.plot("H500 + PPHL(10,10,20)", "H500 Candles", bar)
    self.plot("H500 + PPHL(5,5,20)", "H500 Candles", bar)

    # Plot the latest pivots.  The pivot lists are kept latest-first, so the
    # most recent pivot is element 0 (element -1 is the oldest retained one).
    pivot_charts = (
        ("H500 + PPHL(10,10,20)", "10", self.pivot_10_10_20_m1),
        ("H500 + PPHL(5,5,20)", "5", self.pivot_5_5_20_m1),
    )
    for chart, suffix, indicator in pivot_charts:
        if not indicator.IsReady:
            continue
        if indicator.high_pivots:
            latest_high = indicator.high_pivots[0]
            self.plot(chart, f"Pivot_High_{suffix}", float(latest_high['price']))
        if indicator.low_pivots:
            latest_low = indicator.low_pivots[0]
            self.plot(chart, f"Pivot_Low_{suffix}", float(latest_low['price']))

    # Check economic filter before making predictions
    economic_filter_passed = self.CheckEconomicFilter()

    # Make predictions only if economic filter passes
    if economic_filter_passed and self.ml_model_1 is not None and self.ml_model_2 is not None:
        prediction_1 = self.MakePrediction1(variables)
        prediction_2 = self.MakePrediction2(variables)

        # Count conflicts vs. non-conflicts
        if prediction_1 and prediction_2:
            self.conflicting_predictions_count += 1
            self.debug(f"Conflict detected: both BUY and SELL predicted. "
                       f"Total conflicts: {self.conflicting_predictions_count} out of {len(self.training_data)} bars. [{self.time.date()}]")
        elif prediction_1:
            self.buy_predictions_count += 1
        elif prediction_2:
            self.sell_predictions_count += 1

        # --- Execute Trades ---
        if prediction_1:
            self.debug("Handling BUY prediction!")
            self.HandleBuySignal(bar)
        elif prediction_2:
            self.debug("Handling SELL prediction!")
            self.HandleSellSignal(bar)
    elif not economic_filter_passed:
        self.log(f"Trading blocked by economic filter. Score: {self.daily_economic_score}")
        self.plot("Economic Filter", "Blocked", 1)
def OnH5000DataConsolidated(self, sender, bar):
    """Store a completed H5000 bar and feed it to the H5000 pivot indicator."""
    completed_bar = bar
    self.h5000_bars.add(completed_bar)
    self.pivot_5_5_20_h5000.Update(completed_bar)
def CalculateVariables(self, h500_bar):
    """Return the feature calculator's variables for ``h500_bar``."""
    calculator = self.feature_calculator
    return calculator.calculate_variables(h500_bar)
def CheckEconomicFilter(self):
    """Return True when today's economic score is below the blocking threshold."""
    blocked = self.daily_economic_score >= self.economic_filter_threshold
    return not blocked
def MakePrediction1(self, variables):
    """Return the BUY signal from model 1 for the given feature variables.

    The positive-class probability is plotted and the signal fires when it
    is *below* the fixed 0.57 threshold (deliberately inverted: entries
    anticipate algo fakeouts).  Missing features default to 0.

    Args:
        variables: Mapping of feature name to value.

    Returns:
        bool: True to signal a buy; False otherwise, including when the
        model call fails (the failure is reported through ``self.error``).
    """
    feature_names = (
        'upper_wick_H500', 'lower_wick_H500', 'atr_14',
        'PPHL_L_10_M1_1',
        'PPHL_H_10_M1_1', 'PPHL_H_10_M1_2',
        'PPHL_H_5_M1_2', 'PPHL_H_5_M1_3',
        'upper_wick_DC', 'lower_wick_DC', 'ratio_volume',
        'upper_wick_H5000', 'lower_wick_H5000',
    )
    features = [variables.get(name, 0) for name in feature_names]
    try:
        prob = float(self.ml_model_1.predict_proba([features])[0][1])
        self.plot("Raw Probabilities Buy", "Model1_prob", prob)
        threshold = 0.57  # enter only if below this (anticipating algo fakeouts)
        return prob < threshold
    except Exception as err:
        # Only ordinary errors are absorbed; KeyboardInterrupt/SystemExit
        # must keep propagating.
        self.error(f"MakePrediction1 failed: {err}")
        return False
def MakePrediction2(self, variables):
    """Return the SELL signal from model 2 for the given feature variables.

    The positive-class probability is plotted and the signal fires when it
    exceeds the configured ``sell_prediction_threshold`` (default 0.62).
    Missing features default to 0.

    Args:
        variables: Mapping of feature name to value.

    Returns:
        bool: True to signal a sell; False otherwise, including when the
        model call fails (the failure is reported through ``self.error``).
    """
    feature_names = (
        'upper_wick_H500', 'lower_wick_H500', 'atr_14',
        'PPHL_L_10_M1_1', 'PPHL_L_10_M1_2', 'PPHL_L_10_M1_3',
        'PPHL_L_5_M1_1',
        'PPHL_H_10_M1_1', 'PPHL_H_10_M1_2',
        'PPHL_H_5_M1_1', 'PPHL_H_5_M1_2',
        'upper_wick_DC', 'lower_wick_DC', 'ratio_volume',
        'upper_wick_H5000', 'lower_wick_H5000',
    )
    features = [variables.get(name, 0) for name in feature_names]
    try:
        prob = float(self.ml_model_2.predict_proba([features])[0][1])
        self.plot("Raw Probabilities Sell", "Model2_prob", prob)
        prediction_threshold = self.config.get('sell_prediction_threshold', 0.62)
        return prob > prediction_threshold
    except Exception as err:
        # Only ordinary errors are absorbed; KeyboardInterrupt/SystemExit
        # must keep propagating.
        self.error(f"MakePrediction2 failed: {err}")
        return False
def HandleBuySignal(self, bar):
    """Act on a buy prediction by placing a breakout buy-stop order.

    Plots the prediction, then (if the order manager allows a new order)
    places a buy stop one tick above the signal bar's high, rounded to the
    0.25 tick grid, with stop-loss and take-profit offsets taken from the
    strategy settings.
    """
    if self.trading_stopped:
        self.debug("Trading Stopped.")
        return

    for chart in ("H500 + PPHL(10,10,20)", "H500 + PPHL(5,5,20)"):
        self.plot(chart, "Prediction1", bar.Close)
    self.plot("Predictions + Orders", "Prediction1", 1)

    # Let the OrderManager decide whether another order may be placed
    if not self.order_manager.can_place_order(self.symb):
        return

    # --- BREAKOUT ENTRY LOGIC ---
    tick_size = 0.25
    raw_entry = bar.high + tick_size
    # Snap the entry to the nearest tick
    entry_price = round(raw_entry / tick_size) * tick_size
    sl_price = entry_price - self.stop_loss
    tp_price = entry_price + self.take_profit

    self.log(f"Placing BREAKOUT BUY for {self.symb} at {entry_price}")
    self.order_manager.place_buy_stop_order(self.symb, entry_price, sl_price, tp_price)
def HandleSellSignal(self, bar):
    """Act on a sell prediction by placing a breakout sell-stop order.

    Plots the prediction, then (if the order manager allows a new order)
    places a sell stop one tick below the signal bar's low, rounded to the
    0.25 tick grid, with stop-loss and take-profit offsets taken from the
    strategy settings.
    """
    if self.trading_stopped:
        self.debug("Trading Stopped.")
        return

    for chart in ("H500 + PPHL(10,10,20)", "H500 + PPHL(5,5,20)"):
        self.plot(chart, "Prediction2", bar.Close)
    self.plot("Predictions + Orders", "Prediction2", 1)

    # Let the OrderManager decide whether another order may be placed
    if not self.order_manager.can_place_order(self.symb):
        return

    # --- BREAKOUT ENTRY LOGIC ---
    tick_size = 0.25
    raw_entry = bar.low - tick_size
    # Snap the entry to the nearest tick
    entry_price = round(raw_entry / tick_size) * tick_size
    sl_price = entry_price + self.stop_loss
    tp_price = entry_price - self.take_profit

    self.log(f"Placing BREAKOUT SELL for {self.symb} at {entry_price}")
    self.order_manager.place_sell_stop_order(self.symb, entry_price, sl_price, tp_price)
def ResetDailyPNL(self):
    """Snapshot portfolio totals and clear the daily PNL / stop flags."""
    portfolio = self.portfolio
    # Baselines that UpdateDailyPNL subtracts from the running totals
    self.start_of_day_total_profit = portfolio.total_profit
    self.start_of_day_total_fees = portfolio.total_fees
    self.daily_pnl = 0
    self.trading_stopped = False
    self.last_date = self.time.date()
    self.log(f"Daily PNL and trading flags reset. Date: {self.last_date} (profit={self.start_of_day_total_profit}, fees={self.start_of_day_total_fees})")
def UpdateDailyPNL(self):
    """Recompute today's fee-adjusted PNL, plot it and enforce daily limits.

    Daily PNL is the change in total profit minus the change in total fees
    since the start-of-day snapshot.  The "Performance Metrics" series is
    plotted at most once per minute.  Reaching either the loss or the win
    limit stops trading for the day and sends one alert.
    """
    profit_today = self.portfolio.total_profit - self.start_of_day_total_profit
    fees_today = self.portfolio.total_fees - self.start_of_day_total_fees
    self.daily_pnl = profit_today - fees_today
    self.plot("PNL", "Daily Net Profit (Fee Adj)", self.daily_pnl)
    self.plot("PNL", "Daily Fees", fees_today)

    # --- THROTTLE plotting ---
    now = self.time
    plot_due = not hasattr(self, "_last_pnl_plot_time")
    if not plot_due:
        plot_due = now - self._last_pnl_plot_time >= timedelta(minutes=1)
    if plot_due:
        self.plot("Performance Metrics", "Daily_PNL", self.daily_pnl)
        self._last_pnl_plot_time = now

    # --- CHECK LIMITS ---
    limit_hit = (self.daily_pnl <= self.daily_loss_limit
                 or self.daily_pnl >= self.daily_win_limit)
    if limit_hit and not self.trading_stopped:
        self.trading_stopped = True
        self.log(f"Trading stopped for the day. Daily PNL: {self.daily_pnl:.2f}")
        self.alerts.send_alert(
            f"Daily limit reached. PNL: ${self.daily_pnl:.2f}", "RISK"
        )
def IsTradingHours(self):
    """Return True when the algorithm time is inside the trading window.

    The window is ``[trading_start, trading_end]``, inclusive on both ends.
    A window whose start is later than its end is treated as crossing
    midnight (e.g. 22:00-02:00), which a plain ``start <= now <= end``
    comparison can never satisfy.  The first call logs the window, and the
    result is logged again at most once per hour.
    """
    current_time = self.time.time()
    if not hasattr(self, '_last_tz_check'):
        self._last_tz_check = self.time
        self.debug(f"Trading hours check - Current time: {self.time} ({self.time_zone})")
        self.debug(f"Trading window: {self.trading_start} - {self.trading_end}")

    window_start, window_end = self.trading_start, self.trading_end
    if window_start <= window_end:
        is_trading = window_start <= current_time <= window_end
    else:
        # Overnight window: active after the start OR before the end
        is_trading = current_time >= window_start or current_time <= window_end

    # Throttled logging
    if hasattr(self, '_last_tz_log_time'):
        if (self.time - self._last_tz_log_time).total_seconds() >= 3600:
            self.debug(f"Trading hours check: {current_time} - Trading: {is_trading}")
            self._last_tz_log_time = self.time
    else:
        self._last_tz_log_time = self.time
    return is_trading
def UpdateCumulativeDelta(self, tick):
    """Update the running cumulative delta from one tick.

    An uptick adds the tick's ``Quantity`` (1 when absent) and a downtick
    subtracts it; nothing changes until a previous price is known.  The
    running value is pushed to the rolling window, plotted whenever the
    algorithm clock's second is a multiple of 30, and the tick price becomes
    the new reference price.
    """
    reference_price = self.last_price
    if reference_price > 0:
        if tick.Price > reference_price:
            direction = 1
        elif tick.Price < reference_price:
            direction = -1
        else:
            direction = 0
        if direction:
            size = getattr(tick, 'Quantity', 1)
            self.cumulative_delta += direction * size

    self.cumulative_delta_bars.add(self.cumulative_delta)

    on_half_minute = self.time.second % 30 == 0
    if on_half_minute:
        self.plot("Market Data", "Cumulative_Delta", self.cumulative_delta)

    self.last_price = tick.Price
def StoreTrainingData(self, variables, h500_bar):
    """Append a training record and label the record one horizon back.

    Each call stores the features with unset targets.  Once more than
    ``time_horizon_bars`` records exist, the record that many bars back is
    labelled from the bars that followed it: ``target_1`` is 1 when the buy
    profit target is touched before the buy stop, ``target_2`` is 1 when the
    sell profit target is touched before the sell stop; otherwise 0.
    Barriers are expressed in ticks of 0.25 around the record's close.
    """
    if variables is None:
        return

    self.training_data.append({
        'variables': variables.copy(),
        'bar': h500_bar,
        'time': self.time,
        'target_1': None,
        'target_2': None
    })

    horizon = self.config.get('time_horizon_bars', 10)
    total = len(self.training_data)
    if total <= horizon:
        return

    pending = self.training_data[-(horizon + 1)]
    if pending['target_1'] is not None:
        return

    # Bars that came after the pending record, oldest first
    following_bars = [record['bar'] for record in self.training_data[total - horizon:]]
    entry_price = pending['bar'].Close
    profit_ticks = self.config.get('profit_take_ticks', 12)
    stop_ticks = self.config.get('stop_loss_ticks', 6)
    buy_profit_target = entry_price + (profit_ticks * 0.25)
    buy_stop_loss = entry_price - (stop_ticks * 0.25)
    sell_profit_target = entry_price - (profit_ticks * 0.25)
    sell_stop_loss = entry_price + (stop_ticks * 0.25)

    # Long side: the target is checked before the stop within each bar
    buy_label = 0
    for later_bar in following_bars:
        if later_bar.high >= buy_profit_target:
            buy_label = 1
            break
        if later_bar.low <= buy_stop_loss:
            break

    # Short side: mirror image of the long side
    sell_label = 0
    for later_bar in following_bars:
        if later_bar.low <= sell_profit_target:
            sell_label = 1
            break
        if later_bar.high >= sell_stop_loss:
            break

    pending['target_1'] = buy_label
    pending['target_2'] = sell_label
def TrainMLModels(self):
    """Train the buy and sell XGBoost classifiers on the labelled records.

    Returns early (with a log line) when there are too few records or too
    few fully labelled ones.  At most the 50,000 most recent labelled
    records are used.  Class imbalance is handled with ``scale_pos_weight``
    and each model is reported on a stratified 20% hold-out.  The new models
    replace the current ones only after both have been fitted, so a failed
    training run leaves the previous models in place.  Any exception is
    logged, not raised.

    NOTE(review): the hold-out is a shuffled split of time-ordered bars, so
    the validation scores may be optimistic - confirm this is acceptable.
    """
    min_samples = self.config.get('min_training_samples', 1000)
    if len(self.training_data) < min_samples:
        self.log(f"Insufficient training data: {len(self.training_data)} < {min_samples}")
        return
    try:
        complete_records = [
            r for r in self.training_data
            if r['target_1'] is not None and r['target_2'] is not None
        ]
        if len(complete_records) < min_samples:
            self.log(f"Insufficient complete records: {len(complete_records)}")
            return

        # Cap training data at 50,000 samples
        max_samples = 50000
        if len(complete_records) > max_samples:
            complete_records = complete_records[-max_samples:]
            self.debug(f"Training data capped at {max_samples} samples")
            self.log(f"Training data capped at {max_samples} samples")

        feature_names_pred1 = [
            'upper_wick_H500', 'lower_wick_H500', 'atr_14', 'PPHL_L_10_M1_1',
            'PPHL_H_10_M1_1', 'PPHL_H_10_M1_2', 'PPHL_H_5_M1_2', 'PPHL_H_5_M1_3',
            'upper_wick_DC', 'lower_wick_DC', 'ratio_volume',
            'upper_wick_H5000', 'lower_wick_H5000'
        ]
        feature_names_pred2 = [
            'upper_wick_H500', 'lower_wick_H500', 'atr_14', 'PPHL_L_10_M1_1',
            'PPHL_L_10_M1_2', 'PPHL_L_10_M1_3', 'PPHL_L_5_M1_1', 'PPHL_H_10_M1_1',
            'PPHL_H_10_M1_2', 'PPHL_H_5_M1_1', 'PPHL_H_5_M1_2', 'upper_wick_DC',
            'lower_wick_DC', 'ratio_volume', 'upper_wick_H5000', 'lower_wick_H5000'
        ]
        features_1 = [[rec['variables'].get(key, 0) for key in feature_names_pred1] for rec in complete_records]
        targets_1 = [rec['target_1'] for rec in complete_records]
        features_2 = [[rec['variables'].get(key, 0) for key in feature_names_pred2] for rec in complete_records]
        targets_2 = [rec['target_2'] for rec in complete_records]

        # Handle class imbalance.  minlength=2 keeps index 1 valid even when
        # no positive label exists.
        counts_1 = np.bincount(targets_1, minlength=2)
        scale_pos_weight_1 = counts_1[0] / counts_1[1] if counts_1[1] > 0 else 1
        counts_2 = np.bincount(targets_2, minlength=2)
        scale_pos_weight_2 = counts_2[0] / counts_2[1] if counts_2[1] > 0 else 1
        self.log(f"Buy model imbalance ratio (0s/1s): {scale_pos_weight_1:.2f}")
        self.log(f"Sell model imbalance ratio (0s/1s): {scale_pos_weight_2:.2f}")

        # Split data
        X1_train, X1_test, y1_train, y1_test = train_test_split(
            features_1, targets_1, test_size=0.2, random_state=42, stratify=targets_1)
        X2_train, X2_test, y2_train, y2_test = train_test_split(
            features_2, targets_2, test_size=0.2, random_state=42, stratify=targets_2)
        self.debug(f"Training models with {len(X1_train)} samples, validating with {len(X1_test)} samples.")

        # Train Model 1 (Buy signals).  The row-sampling parameter is spelled
        # `subsample`; `sub_sample` is not a recognised parameter.
        model_1 = xgb.XGBClassifier(
            n_estimators=250,
            max_depth=2,
            learning_rate=0.01,
            subsample=0.8,
            scale_pos_weight=scale_pos_weight_1,
            random_state=42,
            gamma=0.0648,
            eval_metric='logloss'
        )
        model_1.fit(X1_train, y1_train)

        # Train Model 2 (Sell signals)
        model_2 = xgb.XGBClassifier(
            n_estimators=55,
            max_depth=5,
            learning_rate=0.01,
            scale_pos_weight=scale_pos_weight_2,
            random_state=42,
            subsample=0.869,
            gamma=0.333,
            eval_metric='logloss'
        )
        model_2.fit(X2_train, y2_train)

        # Publish the models only once both are fitted
        self.ml_model_1 = model_1
        self.ml_model_2 = model_2

        # Evaluate on test set
        report_1 = classification_report(y1_test, model_1.predict(X1_test))
        report_2 = classification_report(y2_test, model_2.predict(X2_test))
        for emit in (self.log, self.debug):
            emit("--- Buy Model Validation Report ---")
            emit(report_1)
            emit("--- Sell Model Validation Report ---")
            emit(report_2)
    except Exception as e:
        self.log(f"Error training ML models: {e}")
def on_order_event(self, orderEvent):
    """React to an order status change reported by the engine.

    Every event is echoed to the debug log first. After that:

    * CANCELED - the order id is dropped from all three tracking dicts on
      ``self.order_manager``.
    * INVALID  - the message is logged, reported as an error and, when
      ``send_error_alerts`` is enabled, sent as an alert.
    * FILLED   - the fill is plotted, then handled either as an entry fill
      (protective OCO orders are requested and the entry is journaled) or as
      an OCO leg fill (the sibling leg is cancelled and the exit is
      journaled). A fill that is in neither tracking dict is only logged.

    Any other status is ignored after the initial debug line.

    Args:
        orderEvent: The order event delivered by the engine.
    """
    order_id = orderEvent.order_id
    status = orderEvent.Status
    manager = self.order_manager

    # Log every order event for debugging.
    self.debug(
        f"[ORDER EVENT] ID={order_id}, Status={status}, "
        f"FillQty={orderEvent.FillQuantity}, FillPrice={orderEvent.FillPrice}"
    )

    if status == OrderStatus.CANCELED:
        self.log(f"Order {order_id} canceled: {orderEvent.Message}")
        # Forget the canceled order wherever it is tracked.
        if order_id in manager.active_orders:
            self.debug(f"[CANCELED] Removing order {order_id} from active_orders")
            del manager.active_orders[order_id]
        manager.order_times.pop(order_id, None)
        if order_id in manager.oco_orders:
            self.debug(f"[CANCELED] Removing order {order_id} from oco_orders")
            del manager.oco_orders[order_id]
        return

    if status == OrderStatus.INVALID:
        error_msg = f"Invalid order {order_id}: {orderEvent.Message}"
        self.log(error_msg)
        self.error(error_msg)
        if self.config.get('send_error_alerts', True):
            self.alerts.send_alert(error_msg, "ERROR")
        return

    if status != OrderStatus.FILLED:
        return

    fill_price = orderEvent.FillPrice
    self.debug(f"Order {order_id} filled: {orderEvent.FillQuantity} @ {fill_price}")
    self.plot("Predictions + Orders", "ExecutedOrder", fill_price)

    if order_id in manager.active_orders:
        # Entry order: it may still need its stop-loss / take-profit pair.
        order_info = manager.active_orders[order_id]
        needs_oco = order_info.get('needs_oco')
        self.debug(f"Needs OCO? {needs_oco}")
        if not needs_oco:
            self.debug(f"[ENTRY FILL] Order {order_id} doesn't need OCO (already managed)")
            return

        self.debug(f"[ENTRY FILL] Order {order_id} is an entry order - placing OCO orders")
        self.PlaceOCOOrders(orderEvent, order_info)
        self.LogTrade(orderEvent, order_info)

        # From here on the position is tracked through the OCO records.
        del manager.active_orders[order_id]
        manager.order_times.pop(order_id, None)

        self.plot("Predictions + Orders", "Executed_Order", 1)
        if self.config.get('send_trade_alerts', True):
            alert_msg = f"Position opened: {order_info['type']} @ {fill_price}"
            self.alerts.send_alert(alert_msg, "TRADE")
        return

    if order_id not in manager.oco_orders:
        self.debug(f"[UNKNOWN FILL] Order {order_id} not found in tracking dicts")
        return

    # Exit leg (stop loss or take profit) of an OCO pair.
    self.debug(f"[OCO FILL] Order {order_id} is an OCO order (SL or TP)")
    oco_info = manager.oco_orders[order_id]
    if oco_info.get('is_stop_loss', False):
        order_type = "Stop Loss"
    else:
        order_type = "Take Profit"

    # Cancel the sibling leg unless it is already closed.
    opposite_id = oco_info['opposite_order_id']
    ticket = self.transactions.get_order_ticket(opposite_id)
    if ticket and ticket.status not in [OrderStatus.FILLED, OrderStatus.CANCELED]:
        self.debug(f"[OCO FILL] Canceling opposite order {opposite_id}")
        ticket.cancel("OCO counterpart filled")

    # Both legs leave the tracking dict together.
    manager.oco_orders.pop(opposite_id, None)
    manager.oco_orders.pop(order_id, None)

    alert_msg = f"{order_type} executed @ {fill_price}"
    self.alerts.send_alert(alert_msg, "TRADE")
    self.debug(alert_msg)
    self.log(alert_msg)

    # Sanity check: the exit should have flattened the position.
    current_qty = self.portfolio[self.symb].quantity
    if current_qty == 0:
        self.debug("[OCO FILL] Position now flat after exit.")
    else:
        self.debug(f"[OCO FILL] WARNING: Position not flat! Qty={current_qty}")

    # Journal the exit together with its realised P&L.
    manager.trade_journal.append({
        'time': self.time,
        'type': 'EXIT',
        'reason': order_type,
        'fill_price': fill_price,
        'quantity': orderEvent.FillQuantity,
        'pnl': self.CalculateExitPNL(orderEvent, oco_info),
    })
def PlaceOCOOrders(self, filled_order_event, order_info):
    """Request stop-loss and take-profit orders for a filled entry.

    This is a thin wrapper: ``self.stop_loss`` and ``self.take_profit`` are
    forwarded unchanged to ``self.order_manager.place_oco_orders`` together
    with ``self.symb``; no prices are computed here.

    Args:
        filled_order_event: The fill event of the entry order.
        order_info: The tracking record stored for that entry order.

    Returns:
        Whatever ``place_oco_orders`` returns, so callers can tell whether
        the protective orders were actually placed (previously the result
        was discarded and ``None`` was always returned).
    """
    return self.order_manager.place_oco_orders(
        self.symb,
        filled_order_event,
        order_info,
        self.stop_loss,
        self.take_profit,
    )
def LogTrade(self, order_event, order_info):
    """Append an entry record for a filled order to the trade journal.

    The record's ``pnl`` is always written as 0; this method never updates
    it afterwards.

    Args:
        order_event: Fill event supplying ``FillPrice`` and ``FillQuantity``.
        order_info: Tracking record supplying ``type`` and, optionally,
            ``entry_price``.
    """
    record = dict(
        time=self.time,
        type=order_info['type'],
        entry_price=order_info.get('entry_price'),
        fill_price=order_event.FillPrice,
        quantity=order_event.FillQuantity,
        pnl=0,  # placeholder value for entry records
    )
    self.order_manager.trade_journal.append(record)
def CalculateExitPNL(self, order_event, oco_info):
    """Return the price-difference P&L of an exit fill.

    The result is ``price difference * |fill quantity|``; no contract
    multiplier or fee is applied here.

    Args:
        order_event: Exit fill supplying ``FillPrice`` and ``FillQuantity``.
        oco_info: OCO record; ``entry_price`` defaults to 0 when absent.

    Returns:
        The P&L, positive when the exit was favourable.
    """
    entry = oco_info.get('entry_price', 0)
    exit_fill = order_event.FillPrice
    size = abs(order_event.FillQuantity)

    # A negative exit quantity means the exit sold, i.e. the trade was long.
    if order_event.FillQuantity < 0:
        return (exit_fill - entry) * size
    return (entry - exit_fill) * size
def EndOfDayTasks(self):
    """Flatten the book and report at the end of the trading day.

    Does nothing while the algorithm is warming up. Otherwise, in order:
    cancels open orders, liquidates, clears the order manager's tracking
    dicts, logs the daily statistics, optionally sends the daily summary
    (``send_daily_summary`` config flag) and writes a completion log line.
    """
    if self.is_warming_up:
        return

    # Pending orders first, then the position itself.
    self.transactions.cancel_open_orders()
    self.liquidate()

    # Nothing is being tracked any more once the book is flat.
    manager = self.order_manager
    for tracker in (manager.active_orders, manager.order_times, manager.oco_orders):
        tracker.clear()

    self.logDailyStats()
    if self.config.get('send_daily_summary', True):
        self.SendDailySummary()

    self.log("End of day liquidation completed")
def logDailyStats(self):
    """Store and log trade statistics for ``self.last_date``.

    The trade journal holds two kinds of records: entry records, which are
    written with ``pnl`` fixed at 0, and ``'EXIT'`` records, which carry the
    realised P&L. Only exit records describe a completed trade, so wins,
    losses, trade count and win rate are computed from them alone. Counting
    entry records as well (as this method used to) reports every round trip
    as two trades, one of them a "loss", and halves the win rate.

    Nothing is stored or logged when ``self.last_date`` is ``None`` or the
    journal has no record for that date.
    """
    if self.last_date is None:
        return

    day_records = [
        t for t in self.order_manager.trade_journal
        if t['time'].date() == self.last_date
    ]
    if not day_records:
        return

    # Completed trades only; entry records have no meaningful P&L.
    closed_pnls = [t.get('pnl', 0) for t in day_records if t.get('type') == 'EXIT']
    closed_count = len(closed_pnls)
    winning_trades = sum(1 for pnl in closed_pnls if pnl > 0)
    losing_trades = closed_count - winning_trades
    total_pnl = sum(closed_pnls)

    self.daily_stats[self.last_date] = {
        'date': self.last_date,
        'total_trades': closed_count,
        'winning_trades': winning_trades,
        'losing_trades': losing_trades,
        'total_pnl': total_pnl,
        # A day can have entries but no journaled exit yet.
        'win_rate': winning_trades / closed_count if closed_count else 0,
    }
    self.log(
        f"Daily Stats - Trades: {closed_count}, "
        f"Wins: {winning_trades}, Losses: {losing_trades}, PNL: {total_pnl:.2f}"
    )
def SendDailySummary(self):
    """Send a ``DAILY_SUMMARY`` alert describing today's closed trades.

    "Today" is ``self.time.date()``. No alert is sent when the journal has
    no record for today. Trade count, winners, P&L and win rate are taken
    from ``'EXIT'`` records only: entry records are journaled with ``pnl``
    0 and would otherwise double the trade count and halve the win rate.
    """
    today = self.time.date()
    todays_records = [
        t for t in self.order_manager.trade_journal
        if t['time'].date() == today
    ]
    if not todays_records:
        return

    closed_pnls = [t.get('pnl', 0) for t in todays_records if t.get('type') == 'EXIT']
    closed_count = len(closed_pnls)
    total_pnl = sum(closed_pnls)
    winning_trades = sum(1 for pnl in closed_pnls if pnl > 0)
    # Entries without a journaled exit must not cause a division by zero.
    win_rate = winning_trades / closed_count if closed_count else 0.0

    summary = "\n".join([
        f"Daily Summary for {today}:",
        f"Total Trades: {closed_count}",
        f"Winning Trades: {winning_trades}",
        f"Total PNL: ${total_pnl:.2f}",
        f"Win Rate: {win_rate:.1%}",
        f"Economic Score: {self.daily_economic_score}",
    ])
    self.alerts.send_alert(summary, "DAILY_SUMMARY")
def SetupCharts(self):
    """Create and register the six custom charts used by the strategy.

    Registered, in order: the two pivot charts ("H500 + PPHL(10,10,20)" and
    "H500 + PPHL(5,5,20)"), "Predictions + Orders", "Performance Metrics",
    "Market Data" and "Economic Filter". The four pivot line series are
    also kept on ``self`` as ``pivot_high_10_series``,
    ``pivot_low_10_series``, ``pivot_high_5_series`` and
    ``pivot_low_5_series``.
    """

    def prediction_markers():
        # Each chart gets its own pair of marker series.
        return (
            Series("Prediction1", SeriesType.SCATTER, "$", Color.GREEN, ScatterMarkerSymbol.TRIANGLE),
            Series("Prediction2", SeriesType.SCATTER, "$", Color.RED, ScatterMarkerSymbol.TRIANGLE_DOWN),
        )

    # Charts 1 and 2: candles + pivot lines + prediction markers.
    pivot_layouts = (
        ("H500 + PPHL(10,10,20)", "PPHL(10,10,20)",
         "Pivot_High_10", "Pivot_Low_10",
         "pivot_high_10_series", "pivot_low_10_series"),
        ("H500 + PPHL(5,5,20)", "PPHL(5,5,20)",
         "Pivot_High_5", "Pivot_Low_5",
         "pivot_high_5_series", "pivot_low_5_series"),
    )
    for title, candle_name, high_name, low_name, high_attr, low_attr in pivot_layouts:
        pivot_chart = Chart(title)
        pivot_chart.add_series(CandlestickSeries("H500 Candles", 0, "$"))
        pivot_chart.add_series(Series(candle_name, SeriesType.CANDLE, 0))

        high_line = Series(high_name, SeriesType.LINE, "$")
        high_line.color = Color.BLUE
        low_line = Series(low_name, SeriesType.LINE, "$")
        low_line.color = Color.RED
        setattr(self, high_attr, high_line)
        setattr(self, low_attr, low_line)
        pivot_chart.add_series(high_line)
        pivot_chart.add_series(low_line)

        for marker in prediction_markers():
            pivot_chart.add_series(marker)
        self.add_chart(pivot_chart)

    # Chart 3: predictions + executed orders.
    orders_chart = Chart("Predictions + Orders")
    for marker in prediction_markers():
        orders_chart.add_series(marker)
    orders_chart.add_series(
        Series("ExecutedOrder", SeriesType.SCATTER, "$", Color.BLUE, ScatterMarkerSymbol.CIRCLE)
    )
    self.add_chart(orders_chart)

    # Charts 4-6: plain coloured series, described as (title, series specs).
    simple_layouts = (
        ("Performance Metrics", (
            ("Daily_PNL", SeriesType.LINE, Color.GREEN),
            ("Cumulative_PNL", SeriesType.LINE, Color.BLUE),
        )),
        ("Market Data", (
            ("Volume", SeriesType.LINE, Color.PURPLE),
            ("ATR", SeriesType.LINE, Color.ORANGE),
            ("Cumulative_Delta", SeriesType.LINE, Color.CYAN),
        )),
        ("Economic Filter", (
            ("Blocked", SeriesType.SCATTER, Color.RED),
        )),
    )
    for title, series_specs in simple_layouts:
        simple_chart = Chart(title)
        for series_name, series_type, series_color in series_specs:
            coloured = Series(series_name, series_type)
            coloured.color = series_color
            simple_chart.add_series(coloured)
        self.add_chart(simple_chart)
def on_end_of_algorithm(self):
    """Log completion and send the final summary as a ``FINAL`` alert.

    The summary lists the run length in days, the prediction counters, the
    number of journal records, the final portfolio value and the economic
    filter threshold.
    """
    self.log("=== ALGORITHM COMPLETED ===")

    runtime_days = (self.end_date - self.start_date).days
    report_lines = [
        "Algorithm Completed:",
        f"Total Runtime: {runtime_days} days",
        f"Total Conflicting Predictions: {self.conflicting_predictions_count}",
        f"Total Non Conflicting BUY signals: {self.buy_predictions_count}",
        f"Total Non Conflicting SELL signals: {self.sell_predictions_count}",
        f"Total Trades: {len(self.order_manager.trade_journal)}",
        f"Final portfolio Value: ${self.portfolio.total_portfolio_value:.2f}",
        f"Economic Filter Usage: Threshold {self.economic_filter_threshold}",
    ]
    self.alerts.send_alert("\n".join(report_lines), "FINAL")
class CustomTickDataFilter(SecurityDataFilter):
    """Drop outlier ticks before they reach the main logic.

    Active check:
        Price filter - a tick is rejected when its ``last_price`` deviates
        from the last accepted price for the symbol by more than
        ``price_threshold`` (a fraction, e.g. 0.03 = 3%).

    Disabled check:
        Volume filter - rejecting ticks larger than ``volume_multiplier``
        times the average size of the last ``volume_lookback`` accepted
        ticks. The rolling size window is still maintained so the check can
        be switched back on, but no tick is currently rejected because of
        its size.

    Non-``Tick`` data always passes.
    """

    def __init__(self,
                 algorithm: QCAlgorithm,
                 price_threshold: float = 0.03,    # 3.0% price deviation
                 volume_multiplier: float = 10.0,  # 10x average tick volume
                 volume_lookback: int = 100) -> None:
        """Store the thresholds and create empty per-symbol state.

        Args:
            algorithm: Algorithm used for debug output and timestamps.
            price_threshold: Maximum accepted relative price deviation.
            volume_multiplier: Multiplier for the (disabled) volume check.
            volume_lookback: Number of accepted tick sizes kept per symbol.
        """
        self._algorithm = algorithm
        self._price_threshold = price_threshold
        self._volume_multiplier = volume_multiplier
        self._volume_lookback = volume_lookback
        # Last accepted trade price per symbol.
        self._last_price: dict[Symbol, float] = {}
        # Sizes of the most recent accepted ticks per symbol (oldest first).
        self._volume_windows: dict[Symbol, list[int]] = {}

    def filter(self, security: Security, data: BaseData) -> bool:
        """Return ``True`` to keep ``data``, ``False`` to discard it.

        Args:
            security: The security the data point belongs to.
            data: The incoming data point; only ``Tick`` objects are tested.
        """
        if not isinstance(data, Tick):
            return True  # only Tick objects are filtered

        symbol = security.symbol
        tick: Tick = data

        # 1. Price outlier filter.
        # Reference is the last accepted price; on the first tick fall back
        # to the security's price, then to the tick itself.
        ref_price = self._last_price.get(symbol)
        if ref_price is None or ref_price <= 0:
            ref_price = security.price if security.price > 0 else tick.last_price

        if ref_price > 0:
            deviation = abs(tick.last_price - ref_price) / ref_price
            if deviation > self._price_threshold:
                # NOTE(review): a rejected tick never moves the reference
                # price, so a genuine gap larger than the threshold would
                # keep being rejected - confirm this is acceptable.
                self._algorithm.debug(
                    f"{self._algorithm.time} | {symbol}: price outlier "
                    f"{tick.last_price:.2f} ({deviation:.2%}) removed"
                )
                return False

        # 2. Volume outlier filter - intentionally disabled. Intended rule:
        #    avg_vol = sum(window) / len(window)
        #    reject if avg_vol > 0 and
        #              tick.quantity > self._volume_multiplier * avg_vol
        window = self._volume_windows.setdefault(symbol, [])

        # 3. Accepted ticks advance the per-symbol state.
        window.append(int(tick.quantity))
        if len(window) > self._volume_lookback:
            window.pop(0)
        self._last_price[symbol] = tick.last_price
        return True


from AlgorithmImports import *
from datetime import timedelta
class OrderManager:
    """Order placement and bookkeeping for the trading strategy."""

    def __init__(self, algorithm):
        """Create an order manager with empty tracking state.

        Args:
            algorithm: The main QCAlgorithm instance; stored as ``algo``.
        """
        self.algo = algorithm
        # Pending entry orders and their submission times.
        self.active_orders = dict()
        self.order_times = dict()
        # Protective OCO (One-Cancels-Other) legs.
        self.oco_orders = dict()
        # Entry / exit records.
        self.trade_journal = list()
def can_place_order(self, symbol):
    """Tell whether a new entry order may be submitted.

    An entry is allowed only when the portfolio holds no position in
    ``symbol``, no OCO legs are being tracked and no entry order is pending.

    Args:
        symbol: The trading symbol.

    Returns:
        bool: True if an order can be placed, False otherwise.
    """
    held = abs(self.algo.portfolio[symbol].quantity)
    if held != 0:
        return False
    # Tracked OCO legs mean a position is being managed; tracked entries
    # mean an order is still waiting to fill.
    return not self.oco_orders and not self.active_orders
def place_buy_order(self, symbol, entry_price, sl_price, tp_price):
    """Submit a one-unit limit buy and start tracking it as an entry.

    Args:
        symbol: The trading symbol.
        entry_price: Limit price of the entry order.
        sl_price: Stop loss price stored with the order.
        tp_price: Take profit price stored with the order.

    Returns:
        The order ticket, or None when no ticket was returned.
    """
    ticket = self.algo.limit_order(symbol, 1, entry_price)
    if not ticket:
        return None

    ticket_id = ticket.order_id
    self.active_orders[ticket_id] = dict(
        type='BUY',
        entry_price=entry_price,
        sl_price=sl_price,
        tp_price=tp_price,
        time=self.algo.time,
        needs_oco=True,  # protective orders are attached once this fills
    )
    self.algo.log(f"Buy limit order submitted (ID: {ticket_id}) at {entry_price}, SL: {sl_price}, TP: {tp_price}")
    self.algo.debug(f"Buy limit order submitted (ID: {ticket_id})")
    self.algo.plot("Predictions + Orders", "Buy_Signal", 1)
    return ticket
def place_sell_order(self, symbol, entry_price, sl_price, tp_price):
    """Submit a one-unit limit sell and start tracking it as an entry.

    Args:
        symbol: The trading symbol.
        entry_price: Limit price of the entry order.
        sl_price: Stop loss price stored with the order.
        tp_price: Take profit price stored with the order.

    Returns:
        The order ticket, or None when no ticket was returned.
    """
    ticket = self.algo.limit_order(symbol, -1, entry_price)
    if not ticket:
        return None

    ticket_id = ticket.order_id
    self.active_orders[ticket_id] = dict(
        type='SELL',
        entry_price=entry_price,
        sl_price=sl_price,
        tp_price=tp_price,
        time=self.algo.time,
        needs_oco=True,  # protective orders are attached once this fills
    )
    self.algo.log(f"Sell limit order submitted (ID: {ticket_id}) at {entry_price}, SL: {sl_price}, TP: {tp_price}")
    self.algo.debug(f"Sell limit order submitted (ID: {ticket_id})")
    self.algo.plot("Predictions + Orders", "Sell_Signal", 1)
    return ticket
def place_buy_stop_order(self,
                         symbol,
                         stop_price,
                         sl_price,
                         tp_price,
                         quantity: int = 1,
                         use_stop_limit: bool = False,
                         limit_offset: float = 0.0):
    """Submit a buy-stop entry (buy once price rises to ``stop_price``).

    The order is refused when ``can_place_order`` denies it or when
    ``quantity`` is not positive.

    Args:
        symbol: Symbol to trade.
        stop_price: Price at which the stop triggers (entry).
        sl_price: Stop loss price stored for the later OCO.
        tp_price: Take profit price stored for the later OCO.
        quantity: Number of contracts/shares to buy (positive integer).
        use_stop_limit: Place a stop-limit instead of a stop-market order.
        limit_offset: With ``use_stop_limit``, the limit price is
            ``stop_price + limit_offset``.

    Returns:
        OrderTicket or None.
    """
    # Respect the overall order-placement policy.
    if not self.can_place_order(symbol):
        self.algo.debug(f"Cannot place buy stop for {symbol} — placement blocked by can_place_order()")
        return None

    # A buy must carry a positive size.
    if quantity <= 0:
        self.algo.error("place_buy_stop_order: quantity must be positive for a buy")
        return None

    if not use_stop_limit:
        ticket = self.algo.stop_market_order(symbol, quantity, stop_price)
    else:
        ticket = self.algo.stop_limit_order(symbol, quantity, stop_price, stop_price + limit_offset)

    if not ticket:
        self.algo.error(f"Failed to submit buy stop order for {symbol} at {stop_price}")
        return None

    ticket_id = ticket.order_id
    self.active_orders[ticket_id] = dict(
        type='BUY_STOP',
        entry_price=stop_price,
        sl_price=sl_price,
        tp_price=tp_price,
        time=self.algo.time,
        needs_oco=True,
        quantity=quantity,
    )
    self.algo.log(f"Buy stop order submitted (ID: {ticket_id}) at stop {stop_price}, SL: {sl_price}, TP: {tp_price}")
    self.algo.debug(f"Buy stop order submitted (ID: {ticket_id})")
    self.algo.plot("Predictions + Orders", "Buy_Signal", 1)
    return ticket
def place_sell_stop_order(self,
                          symbol,
                          stop_price,
                          sl_price,
                          tp_price,
                          quantity: int = 1,
                          use_stop_limit: bool = False,
                          limit_offset: float = 0.0):
    """Submit a sell-stop entry (sell/short once price falls to ``stop_price``).

    The order is refused when ``can_place_order`` denies it or when
    ``quantity`` is not positive.

    Args:
        symbol: Symbol to trade.
        stop_price: Price at which the stop triggers (entry).
        sl_price: Stop loss price stored for the later OCO.
        tp_price: Take profit price stored for the later OCO.
        quantity: Number of contracts/shares to short (positive integer;
            it is submitted as a negative quantity).
        use_stop_limit: Place a stop-limit instead of a stop-market order.
        limit_offset: With ``use_stop_limit``, the limit price is
            ``stop_price - limit_offset``.

    Returns:
        OrderTicket or None.
    """
    # Respect the overall order-placement policy.
    if not self.can_place_order(symbol):
        self.algo.debug(f"Cannot place sell stop for {symbol} — placement blocked by can_place_order()")
        return None

    if quantity <= 0:
        self.algo.error("place_sell_stop_order: quantity must be positive (will be submitted as negative to short/sell)")
        return None

    # Sells / shorts are submitted with a negative size.
    signed_qty = -abs(quantity)

    if not use_stop_limit:
        ticket = self.algo.stop_market_order(symbol, signed_qty, stop_price)
    else:
        # The limit of a sell stop-limit sits below the stop by the offset.
        ticket = self.algo.stop_limit_order(symbol, signed_qty, stop_price, stop_price - limit_offset)

    if not ticket:
        self.algo.error(f"Failed to submit sell stop order for {symbol} at {stop_price}")
        return None

    ticket_id = ticket.order_id
    self.active_orders[ticket_id] = dict(
        type='SELL_STOP',
        entry_price=stop_price,
        sl_price=sl_price,
        tp_price=tp_price,
        time=self.algo.time,
        needs_oco=True,
        quantity=signed_qty,
    )
    self.algo.log(f"Sell stop order submitted (ID: {ticket_id}) at stop {stop_price}, SL: {sl_price}, TP: {tp_price}")
    self.algo.debug(f"Sell stop order submitted (ID: {ticket_id})")
    self.algo.plot("Predictions + Orders", "Sell_Signal", 1)
    return ticket
def place_oco_orders(self, symbol, filled_order_event, order_info, stop_loss, take_profit):
    """Place the stop-loss / take-profit pair protecting a filled entry.

    Prices are derived from the actual fill price. Entry types beginning
    with ``'BUY'`` (``'BUY'``, ``'BUY_STOP'``) are long entries: the stop
    goes below the fill and the target above it. Every other type is
    treated as a short entry. Comparing against the literal ``'BUY'`` only
    (as this method used to) sent ``'BUY_STOP'`` entries down the short
    path, which put the sell-stop above and the sell-limit below the fill.

    Both legs are registered in ``self.oco_orders`` under their own order
    ids, each pointing at the other.

    Args:
        symbol: The trading symbol.
        filled_order_event: The filled entry order event.
        order_info: Tracking record of the entry order (``type`` is read).
        stop_loss: Stop loss distance from the fill price.
        take_profit: Take profit distance from the fill price.

    Returns:
        bool: True when both legs were placed and registered; False when
        validation failed, a ticket was missing or an exception occurred
        (exceptions are reported through ``algo.error`` and not re-raised).
    """
    try:
        # The exit trades the opposite side of the entry fill.
        quantity = -filled_order_event.FillQuantity
        fill_price = filled_order_event.FillPrice
        entry_type = order_info['type']
        self.algo.debug(f"[PlaceOCOOrders] Entry: type={entry_type}, fill={fill_price}, qty={quantity}")

        # SL/TP are anchored on the actual fill price.
        if str(entry_type).startswith('BUY'):
            sl_price = fill_price - stop_loss
            tp_price = fill_price + take_profit
        else:  # SELL / SELL_STOP
            sl_price = fill_price + stop_loss
            tp_price = fill_price - take_profit
        self.algo.debug(f"[PlaceOCOOrders] Calculated SL={sl_price}, TP={tp_price}")

        if not self._validate_oco_prices(entry_type, fill_price, sl_price, tp_price):
            return False

        self.algo.debug(f"[PlaceOCOOrders] Placing stop market order: qty={quantity}, stop={sl_price}")
        sl_ticket = self.algo.stop_market_order(symbol, quantity, sl_price)

        self.algo.debug(f"[PlaceOCOOrders] Placing limit order: qty={quantity}, limit={tp_price}")
        tp_ticket = self.algo.limit_order(symbol, quantity, tp_price)

        if not (sl_ticket and tp_ticket):
            error_msg = f"Failed to place OCO orders: sl_ticket={sl_ticket}, tp_ticket={tp_ticket}"
            self.algo.error(error_msg)
            return False

        self.algo.debug(f"[PlaceOCOOrders] Both orders placed: SL_ID={sl_ticket.order_id}, TP_ID={tp_ticket.order_id}")
        # Each leg records its sibling so either fill can cancel the other.
        self.oco_orders[sl_ticket.order_id] = {
            'opposite_order_id': tp_ticket.order_id,
            'is_stop_loss': True,
            'entry_order_id': filled_order_event.order_id,
            'entry_price': fill_price
        }
        self.oco_orders[tp_ticket.order_id] = {
            'opposite_order_id': sl_ticket.order_id,
            'is_stop_loss': False,
            'entry_order_id': filled_order_event.order_id,
            'entry_price': fill_price
        }
        success_msg = f"OCO orders placed: Entry={fill_price:.2f}, SL={sl_price:.2f}, TP={tp_price:.2f}"
        self.algo.log(success_msg)
        self.algo.debug(success_msg)
        return True
    except Exception as e:
        # Best effort by design: a failure here must not abort event handling.
        error_msg = f"Exception in PlaceOCOOrders: {str(e)}"
        self.algo.error(error_msg)
        import traceback
        self.algo.debug(traceback.format_exc())
        return False


def _validate_oco_prices(self, order_type, fill_price, sl_price, tp_price):
    """Check that the stop and target sit on the correct sides of the fill.

    Types beginning with ``'BUY'`` are validated as long entries (stop
    below, target above); all other types as short entries (stop above,
    target below). The first violation is reported through ``algo.error``.

    Args:
        order_type: Entry type, e.g. 'BUY', 'SELL', 'BUY_STOP', 'SELL_STOP'.
        fill_price: Entry fill price.
        sl_price: Stop loss price.
        tp_price: Take profit price.

    Returns:
        bool: True if valid, False otherwise.
    """
    if str(order_type).startswith('BUY'):
        side = 'BUY'
        sl_ok = sl_price < fill_price
        tp_ok = tp_price > fill_price
    else:
        side = 'SELL'
        sl_ok = sl_price > fill_price
        tp_ok = tp_price < fill_price

    if not sl_ok:
        self.algo.error(f"Invalid SL for {side}: Fill={fill_price}, SL={sl_price}")
        return False
    if not tp_ok:
        self.algo.error(f"Invalid TP for {side}: Fill={fill_price}, TP={tp_price}")
        return False
    return True
def manage_orders(self, max_positions=1):
    """Run the periodic order housekeeping.

    Expired entry orders are cancelled first, then the OCO pairs are
    reconciled.

    Args:
        max_positions: Maximum number of allowed positions. Accepted for
            interface compatibility; this method does not read it.
    """
    housekeeping_steps = (self.cancel_expired_orders, self.manage_oco_orders)
    for step in housekeeping_steps:
        step()
def cancel_expired_orders(self, timeout_seconds=30):
    """Cancel and stop tracking entry orders older than ``timeout_seconds``.

    An order's age is measured from the ``time`` stored in its tracking
    record to ``self.algo.time``; a record without ``time`` never expires.

    A cancel request is sent for every expired order whose ticket is not
    already closed (FILLED, CANCELED or INVALID). Previously only SUBMITTED
    and PARTIALLY_FILLED orders were cancelled while *every* expired order
    was removed from tracking, so an order in any other open state stayed
    live at the broker with nothing tracking it.

    Args:
        timeout_seconds: Maximum age in seconds (default: 30).
    """
    current_time = self.algo.time

    # Collect first: the tracking dict is modified below.
    expired_orders = []
    for order_id, info in self.active_orders.items():
        age_seconds = (current_time - info.get('time', current_time)).total_seconds()
        if age_seconds > timeout_seconds:
            self.algo.debug(f"Order {order_id} expired (age {age_seconds:.1f}s)")
            expired_orders.append(order_id)

    closed_states = (OrderStatus.FILLED, OrderStatus.CANCELED, OrderStatus.INVALID)
    for order_id in expired_orders:
        ticket = self.algo.transactions.get_order_ticket(order_id)
        if ticket and ticket.status not in closed_states:
            ticket.cancel("Timeout expired")
        # Clean up tracking (the cancel event may already have done so).
        self.active_orders.pop(order_id, None)
        self.order_times.pop(order_id, None)
def manage_oco_orders(self):
    """Reconcile OCO pairs: when one leg has filled, cancel and drop both.

    For every tracked OCO leg whose ticket reports FILLED, the sibling leg
    is cancelled through ``algo.transactions.cancel_order`` and both legs
    are removed from ``self.oco_orders``.

    The scan walks a snapshot of the tracking dict. Removing the sibling
    while iterating the live dict (as this method used to) raises
    ``RuntimeError: dictionary changed size during iteration`` as soon as
    the first filled leg is found.
    """
    for order_id, oco_info in list(self.oco_orders.items()):
        if order_id not in self.oco_orders:
            # Already removed as the sibling of a leg handled earlier.
            continue

        order_ticket = self.algo.transactions.get_order_ticket(order_id)
        if not order_ticket or order_ticket.status != OrderStatus.FILLED:
            continue

        # One leg filled: the other must not stay live.
        opposite_order_id = oco_info.get('opposite_order_id')
        if opposite_order_id:
            self.algo.transactions.cancel_order(opposite_order_id)
            self.oco_orders.pop(opposite_order_id, None)
        self.oco_orders.pop(order_id, None)
def log_trade(self, order_event, order_info):
    """Append an entry record for a filled order to the trade journal.

    The record's ``pnl`` is always written as 0; this method never updates
    it afterwards.

    Args:
        order_event: Fill event supplying ``FillPrice`` and ``FillQuantity``.
        order_info: Tracking record supplying ``type`` and, optionally,
            ``entry_price``.
    """
    record = dict(
        time=self.algo.time,
        type=order_info['type'],
        entry_price=order_info.get('entry_price'),
        fill_price=order_event.FillPrice,
        quantity=order_event.FillQuantity,
        pnl=0,  # placeholder value for entry records
    )
    self.trade_journal.append(record)
def calculate_exit_pnl(self, order_event, oco_info):
    """Return the price-difference P&L of an exit fill.

    The result is ``price difference * |fill quantity|``; no contract
    multiplier or fee is applied here.

    Args:
        order_event: The exit order event (``FillPrice``, ``FillQuantity``).
        oco_info: OCO record; ``entry_price`` defaults to 0 when absent.

    Returns:
        float: The calculated PNL, positive when the exit was favourable.
    """
    entry = oco_info.get('entry_price', 0)
    exit_fill = order_event.FillPrice
    size = abs(order_event.FillQuantity)

    # A negative exit quantity means the exit sold, i.e. the trade was long.
    if order_event.FillQuantity < 0:
        return (exit_fill - entry) * size
    return (entry - exit_fill) * size
def end_of_day_cleanup(self):
    """Clear all order tracking at end of day and log that it was done.

    Empties ``active_orders``, ``order_times`` and ``oco_orders``; the
    trade journal is left untouched.
    """
    self.active_orders.clear()
    self.order_times.clear()
    self.oco_orders.clear()
    self.algo.log("Order tracking cleared for end of day")


# This import was fused onto the end of the log call above, which made the
# line a syntax error; it is restored on its own line.
from AlgorithmImports import *
from datetime import datetime, timedelta
import json
import numpy as np
from typing import Dict, List, Optional
from datetime import date
class ConfigurationManager:
"""Manage strategy configuration with defaults and overrides"""
def __init__(self):
self.config = {
# Trading hours
'trading_start_hour': 18, # 6 PM Paris time
'trading_start_minute': 0,
'trading_end_hour': 22, # 10 PM Paris time
'trading_end_minute': 0,
# Risk management
'daily_loss_limit': -500,
'daily_win_limit': 100,
'stop_loss': 12.5,
'take_profit': 12.5,
'entry_offset': 2.5,
'max_positions': 1,
# Consolidator settings
'h500_trade_count': 500,
'h5000_trade_count': 5000,
# Indicator parameters
'atr_period': 14,
'pivot_10_params': [10, 10, 20], # left_bars, right_bars, lookback
'pivot_5_params': [5, 5, 20], # left_bars, right_bars, lookback
'volume_sma_period': 15,
# ML model settings
'min_training_samples': 50000,
'buy_prediction_threshold': 0.6480,
'sell_prediction_threshold': 0.5533,
"load_models_from_store": True,
# Triple-Barrier method for ML training target calculations
'profit_take_ticks': 12.0,
'stop_loss_ticks': 6.0,
'time_horizon_bars': 10,
# Economic filter
'economic_filter_threshold': 7, # Impact score threshold for blocking trades
# Alert settings
'send_trade_alerts': True,
'send_error_alerts': True,
'send_daily_summary': True,
'alert_webhook_url': '', # Optional webhook for external alerts
# Performance tracking
'log_feature_importance': True,
'export_results_daily': True,
}
def get(self, key, default=None):
"""Get configuration value with optional default"""
return self.config.get(key, default)
def set(self, key, value):
"""Set configuration value"""
self.config[key] = value
def update_from_dict(self, config_dict):
"""Update configuration from dictionary"""
self.config.update(config_dict)
def load_from_json(self, json_string):
"""Load configuration from JSON string"""
try:
config_dict = json.loads(json_string)
self.update_from_dict(config_dict)
return True
except Exception as e:
print(f"Error loading configuration: {str(e)}")
return False
def to_json(self):
"""Export configuration as JSON string"""
return json.dumps(self.config, indent=2)
class AlertSystem:
    """Record alerts, log them and forward them to Telegram."""

    def __init__(self, algorithm):
        """Create an alert system bound to ``algorithm`` (may be ``None``).

        Args:
            algorithm: Algorithm used for logging, timestamps and Telegram
                notifications. With ``None`` alerts are only recorded.
        """
        self.algorithm = algorithm
        # Every alert passed to send_alert, oldest first.
        self.alerts = []
        self.alert_levels = {
            'INFO': 0,
            'TRADE': 1,
            'RISK': 2,
            'ERROR': 3,
            'TRAINING': 1,
            'DAILY_SUMMARY': 1,
            'FINAL': 2
        }
        # State of the duplicate suppression.
        self._last_alert = None
        self._last_alert_time = None
        self._cooldown_seconds = 30  # don't resend the same alert within 30s

    def _now(self):
        """Return the algorithm's time, or wall-clock time without one."""
        return self.algorithm.time if self.algorithm else datetime.now()

    def send_alert(self, message: str, level: str = "INFO", use_backup=False):
        """Record an alert, log it and send it to Telegram.

        The alert is always appended to ``self.alerts`` and logged. The
        Telegram message is skipped when the same text was successfully sent
        less than ``_cooldown_seconds`` earlier. Elapsed time is measured on
        the same clock that stamps the alert (``_now``), so the cooldown
        follows algorithm time in backtests.

        Fixes relative to the previous implementation: ``datetime.now`` was
        referenced without being called, so a repeated message raised
        ``TypeError`` in the cooldown check and the no-algorithm path could
        not format its timestamp; and ``self.alerts`` was never populated,
        leaving the ``get_*`` accessors permanently empty.

        :param message: The message text
        :param level: One of the keys of ``alert_levels`` (free-form text
            is accepted and stored as given)
        :param use_backup: Accepted for interface compatibility; not used
        """
        # SECURITY: this credential is committed to source control. It is
        # kept so existing deployments keep working, but it should be
        # rotated and loaded from configuration/secrets instead.
        bot_token = "8409848732:AAHgRZ4TFcbdv68V5McNANmpknkT47R9RUU"

        timestamp = self._now()
        self.alerts.append({
            'timestamp': timestamp,
            'level': level,
            'message': message
        })

        if self.algorithm:
            self.algorithm.log(f"[{level}] {message}")

        # Deduplicate & rate-limit.
        if (
            self._last_alert == message
            and self._last_alert_time is not None
            and (timestamp - self._last_alert_time).total_seconds()
            < self._cooldown_seconds
        ):
            if self.algorithm:
                self.algorithm.debug("Skipped duplicate alert within cooldown.")
            return

        if not bot_token:
            if self.algorithm:
                self.algorithm.debug("Telegram bot token not configured.")
            return

        if not self.algorithm:
            # No notification channel is available without an algorithm.
            return

        try:
            self.algorithm.notify.telegram(id="-3091412874", message=message, token=bot_token)
            self._last_alert = message
            self._last_alert_time = timestamp
        except Exception as e:
            # Alerting is best effort; a delivery failure must not stop trading.
            self.algorithm.debug(f"Failed to send Telegram alert: {e}")

    def get_alerts_by_level(self, level):
        """Get all recorded alerts of a specific level."""
        return [a for a in self.alerts if a['level'] == level]

    def get_recent_alerts(self, count=10):
        """Get the ``count`` most recent alerts (empty list for count <= 0)."""
        if count <= 0:
            # alerts[-0:] would return the whole list.
            return []
        return self.alerts[-count:]

    def clear_alerts(self):
        """Clear all stored alerts."""
        self.alerts = []
class EconomicCalendar:
    """Track economic announcements and their impact on trading.

    Scores are looked up by calendar day. Every public method accepts either
    a ``datetime.date`` or a ``datetime.datetime``; datetimes are reduced to
    their date before any lookup, so a timestamp on a listed day finds that
    day's score.
    """

    def __init__(self):
        """Set up the event catalogue, the listed dates and the score cache."""
        # Economic events with their impact scores (1-10 scale)
        self.events = {
            'NFP': {'impact': 10, 'day_of_week': 4, 'week_of_month': 1},  # First Friday
            'FOMC': {'impact': 10, 'frequency': 'monthly'},
            'CPI': {'impact': 9, 'frequency': 'monthly'},
            'GDP': {'impact': 8, 'frequency': 'quarterly'},
            'Retail_Sales': {'impact': 7, 'frequency': 'monthly'},
            'PMI': {'impact': 6, 'frequency': 'monthly'},
            'Consumer_Confidence': {'impact': 5, 'frequency': 'monthly'},
            'Housing_Starts': {'impact': 4, 'frequency': 'monthly'},
            'Jobless_Claims': {'impact': 3, 'frequency': 'weekly', 'day_of_week': 3}  # Thursday
        }
        # Pre-defined high-impact dates (can be loaded from external source)
        self.high_impact_dates = self._load_high_impact_dates()
        # Cache for daily scores, keyed by datetime.date
        self.score_cache = {}

    def _load_high_impact_dates(self):
        """Return the hand-maintained ``{date: impact score}`` table."""
        # This would normally load from an external data source.
        # NOTE(review): dates and labels are entered manually - verify them
        # against an official release calendar.
        return {
            date(2025, 10, 5): 9,    # NFP
            date(2025, 10, 11): 8,   # CPI
            date(2025, 10, 31): 10,  # FOMC
            date(2025, 11, 2): 9,    # NFP
            date(2025, 11, 13): 8,   # CPI
            date(2025, 12, 8): 9,    # NFP
            date(2025, 12, 12): 8,   # CPI
            date(2025, 12, 20): 10,  # FOMC
        }

    def get_impact_score(self, date_value):
        """Get the economic impact score for a specific day.

        Listed high-impact dates take precedence; otherwise the score comes
        from the regular weekly/monthly schedule. Results are cached per day.

        Args:
            date_value: A ``datetime.date`` or ``datetime.datetime``.

        Returns:
            The impact score (0 when nothing is scheduled).

        Raises:
            TypeError: If ``date_value`` is neither a date nor a datetime.
        """
        # datetime is a subclass of date, so it must be tested first; left
        # as a datetime it would never equal the date keys used below.
        if isinstance(date_value, datetime):
            date_obj = date_value.date()
        elif isinstance(date_value, date):
            date_obj = date_value
        else:
            raise TypeError(f"Unsupported type for date: {type(date_value)}")

        if date_obj in self.score_cache:
            return self.score_cache[date_obj]

        if date_obj in self.high_impact_dates:
            score = self.high_impact_dates[date_obj]
        else:
            score = self._calculate_regular_score(date_obj)

        self.score_cache[date_obj] = score
        return score

    def _calculate_regular_score(self, date):
        """Calculate the impact score from the regular economic schedule.

        Only two recurring events are modelled: weekly jobless claims
        (Thursday) and NFP (a Friday falling on day 1-7 of the month).
        """
        score = 0
        weekday = date.weekday()
        # Weekly event: Thursday - Jobless Claims
        if weekday == 3:
            score = max(score, self.events['Jobless_Claims']['impact'])
        # NFP: first Friday of the month
        if weekday == 4 and 1 <= date.day <= 7:
            score = max(score, self.events['NFP']['impact'])
        # Other regular events are not modelled here.
        return score

    def is_high_impact_day(self, date, threshold=7):
        """Check whether the day's impact score reaches ``threshold``."""
        return self.get_impact_score(date) >= threshold

    def get_upcoming_events(self, current_date, days_ahead=7):
        """List the days from ``current_date`` onward scoring 5 or more.

        Args:
            current_date: First day to check (date or datetime).
            days_ahead: Number of consecutive days to check, including
                ``current_date``.

        Returns:
            A list of ``{'date': ..., 'impact_score': ...}`` dicts in
            chronological order.
        """
        upcoming = []
        for offset in range(days_ahead):
            check_date = current_date + timedelta(days=offset)
            score = self.get_impact_score(check_date)
            if score >= 5:  # Medium impact or higher
                upcoming.append({
                    'date': check_date,
                    'impact_score': score
                })
        return upcoming

    def add_custom_event(self, date, impact_score):
        """Register (or override) the impact score of a specific day.

        Args:
            date: A ``datetime.date``, a ``datetime.datetime`` or any object
                exposing a ``date()`` method.
            impact_score: Score to store for that day.
        """
        # Store under a plain date so lookups by get_impact_score match.
        # (The previous check was inverted and left datetimes un-normalised.)
        if isinstance(date, datetime) or hasattr(date, 'date'):
            date = date.date()
        self.high_impact_dates[date] = impact_score
        # Drop any stale cached score for this day.
        self.score_cache.pop(date, None)
class RiskManager:
    """Risk management utilities driven by recent trade outcomes.

    Tracks win/loss streaks and the number of trades taken today, and
    derives position sizes and risk scaling factors from them.

    Args:
        config: Mapping read through ``.get``; the keys consulted are
            ``'max_positions'`` (default 1) and ``'max_daily_trades'``
            (default 10).
    """

    def __init__(self, config):
        self.config = config
        self.position_sizes = []
        self.consecutive_losses = 0
        self.consecutive_wins = 0
        self.daily_trades = 0
        self.last_trade_time = None

    def calculate_position_size(self, account_value, volatility, risk_per_trade=0.01):
        """Size a position with a fixed-fractional risk budget.

        Args:
            account_value: Current account equity.
            volatility: Per-contract risk estimate in account currency.
                When it is not positive, one contract is assumed.
            risk_per_trade: Fraction of the account risked per trade.

        Returns:
            Whole number of contracts: ``account_value * risk_per_trade /
            volatility`` truncated toward zero, capped at
            ``config['max_positions']`` and never negative.
        """
        max_risk = account_value * risk_per_trade

        if volatility > 0:
            position_size = max_risk / volatility
        else:
            position_size = 1  # Default to 1 contract

        max_contracts = self.config.get('max_positions', 1)
        position_size = min(position_size, max_contracts)
        # A negative equity or risk fraction must not yield a negative size.
        return max(0, int(position_size))

    def should_reduce_risk(self):
        """Return True after 3+ straight losses or once the daily trade cap is hit."""
        if self.consecutive_losses >= 3:
            return True

        max_daily_trades = self.config.get('max_daily_trades', 10)
        return self.daily_trades >= max_daily_trades

    def update_trade_result(self, is_win):
        """Record one closed trade: extend one streak, reset the other."""
        if is_win:
            self.consecutive_wins += 1
            self.consecutive_losses = 0
        else:
            self.consecutive_losses += 1
            self.consecutive_wins = 0
        self.daily_trades += 1

    def reset_daily_counters(self):
        """Reset the per-day trade counter (streaks are kept)."""
        self.daily_trades = 0

    def get_risk_adjustment_factor(self):
        """Return a multiplier for the base risk, clamped to [0.25, 2.0].

        * 3 or more consecutive losses -> x0.25
        * exactly 2 consecutive losses -> x0.5
        * 3 or more consecutive wins   -> x1.25
        """
        factor = 1.0

        # Test the stricter threshold first; checking ``>= 2`` first would
        # make the ``>= 3`` branch unreachable.
        if self.consecutive_losses >= 3:
            factor *= 0.25
        elif self.consecutive_losses >= 2:
            factor *= 0.5

        # Increase after wins (but conservatively)
        if self.consecutive_wins >= 3:
            factor *= 1.25

        return min(max(factor, 0.25), 2.0)  # Keep between 0.25x and 2x
# class TradeBar:
# """Custom TradeBar class for consolidator compatibility"""
# def __init__(self, time, symbol, open_price, high, low, close, volume):
# self.Time = time
# self.Symbol = symbol
# self.Open = open_price
# self.High = high
# self.Low = low
# self.Close = close
# self.Volume = volume
# self.Value = close # For compatibility with some indicators
# def __str__(self):
# return f"TradeBar({self.Time}, O:{self.Open:.2f}, H:{self.High:.2f}, L:{self.Low:.2f}, C:{self.Close:.2f}, V:{self.Volume})"
# class IndicatorDataPoint:
# """Data point for indicators"""
# def __init__(self, time, value):
# self.Time = time
# self.Value = value
class FeatureCalculator:
    """Build the per-bar feature dictionary for the algorithm.

    ``calculate_variables`` produces:
      * upper/lower candle wicks of the supplied H500 bar and of the
        latest H5000 bar,
      * the current ATR(14) value,
      * distances between the first three pivot highs/lows of each pivot
        indicator and a reference bar of the matching timeframe
        (M1, H5000, otherwise the supplied H500 bar),
      * cumulative-delta "wick" values and the M1 volume ratio.

    Bars and indicators are read under both the PascalCase and the
    snake_case attribute spellings (``IsReady``/``is_ready``,
    ``Current.Value``/``current.value``, ``High``/``high``, ...).
    """

    NORMALIZE_BY_ATR = False  # set True to divide distances by atr_14 (recommended for ML)

    def __init__(self, algorithm):
        self.algorithm = algorithm

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _first_attr(obj, *names):
        """Return the first non-None attribute of ``obj`` among ``names``, else None."""
        for name in names:
            value = getattr(obj, name, None)
            if value is not None:
                return value
        return None

    @staticmethod
    def _is_ready(indicator):
        """Return True when the indicator reports ready under either spelling."""
        return bool(getattr(indicator, "IsReady", False)
                    or getattr(indicator, "is_ready", False))

    @classmethod
    def _indicator_value(cls, indicator):
        """Return the indicator's current value as a float, or None if unreadable."""
        if indicator is None:
            return None
        current = cls._first_attr(indicator, "Current", "current")
        if current is None:
            return None
        try:
            return float(cls._first_attr(current, "Value", "value"))
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _pivot_prices(pivots):
        """Return the first three pivot prices as floats, padded with 0.0.

        Dict entries are read from ``"price"`` (falling back to ``"value"``,
        then 0); any other entry is passed to ``float`` directly.
        """
        prices = []
        for entry in (pivots or [])[:3]:
            if isinstance(entry, dict):
                prices.append(float(entry.get("price", entry.get("value", 0))))
            else:
                prices.append(float(entry))
        prices.extend([0.0] * (3 - len(prices)))
        return prices

    @classmethod
    def _wicks(cls, bar):
        """Return ``(upper_wick, lower_wick)`` of an OHLC bar."""
        open_price = cls._first_attr(bar, "Open", "open")
        close_price = cls._first_attr(bar, "Close", "close")
        high = cls._first_attr(bar, "High", "high")
        low = cls._first_attr(bar, "Low", "low")
        upper = high - max(open_price, close_price)
        lower = min(open_price, close_price) - low
        return upper, lower

    def _cumulative_delta_pair(self):
        """Return ``(current, stored)`` cumulative delta as floats, or None.

        ``current`` is ``algorithm.cumulative_delta``; ``stored`` is entry 0
        of ``algorithm.cumulative_delta_bars`` (its ``Value``/``value`` when
        present, otherwise the entry itself). None is returned when fewer
        than two entries are stored or either value is not numeric.
        """
        window = self.algorithm.cumulative_delta_bars
        if window.count < 2:
            return None
        entry = window[0]
        stored = self._first_attr(entry, "Value", "value")
        if stored is None:
            stored = entry
        current = getattr(self.algorithm, "cumulative_delta", None)
        try:
            return float(current), float(stored)
        except (TypeError, ValueError, OverflowError):
            return None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def calculate_variables(self, h500_bar):
        """Compute the full feature dict for the given H500 bar.

        Returns:
            ``None`` while warming up (fewer than 50 H500 bars, 50 M1 bars
            or 10 H5000 bars, or ATR(14) missing / not ready); otherwise a
            dict mapping feature name to numeric value.
        """
        algo = self.algorithm
        atr = getattr(algo, "atr_14", None)
        if (algo.h500_bars.count < 50
                or algo.m1_bars.count < 50
                or algo.h5000_bars.count < 10
                or not atr
                or not self._is_ready(atr)):
            return None

        features = {}
        features['upper_wick_H500'], features['lower_wick_H500'] = self._wicks(h500_bar)

        # ATR is readable as .Current.Value or .current.value; 0.0 if neither.
        atr_val = self._indicator_value(atr)
        features['atr_14'] = atr_val if atr_val is not None else 0.0

        # Pivot distance groups, merged in a fixed order.
        pivot_sources = (
            (algo.pivot_10_10_20_m1, "10_M1"),
            (algo.pivot_5_5_20_m1, "5_M1"),
            (algo.pivot_5_5_20_h5000, "5_H5000"),
        )
        for indicator, suffix in pivot_sources:
            features.update(self._calc_pivot_distances(indicator, h500_bar, suffix=suffix))

        # DC and volume features
        features['upper_wick_DC'] = self.calculate_upper_wick_dc()
        features['lower_wick_DC'] = self.calculate_lower_wick_dc()
        features['ratio_volume'] = self.calculate_volume_ratio()

        h5000_bar = algo.h5000_bars[0]
        features['upper_wick_H5000'], features['lower_wick_H5000'] = self._wicks(h5000_bar)
        return features

    def _calc_pivot_distances(self, pivot_indicator, h500_bar, suffix):
        """Return pivot-to-reference distances for one pivot indicator.

        Args:
            pivot_indicator: Object exposing ``high_pivots`` / ``low_pivots``
                lists and a readiness flag; may be None.
            h500_bar: Reference bar used when ``suffix`` names neither the
                M1 nor the H5000 timeframe.
            suffix: Label embedded in the output keys; also selects the
                reference bar ("M1" -> latest M1 bar, "H5000" -> latest
                H5000 bar).

        Returns:
            Dict with keys ``PPHL_H_{suffix}_1..3`` and
            ``PPHL_L_{suffix}_1..3``. Each value is ``pivot - reference``
            (high pivots against the reference high, low pivots against the
            reference low), divided by ATR(14) when ``NORMALIZE_BY_ATR`` is
            set and the ATR is readable. Missing pivots, a missing or
            not-ready indicator, or a missing reference bar give 0.0.
        """
        zero_dict = {f"PPHL_H_{suffix}_{i}": 0.0 for i in range(1, 4)}
        zero_dict.update({f"PPHL_L_{suffix}_{i}": 0.0 for i in range(1, 4)})

        if pivot_indicator is None or not self._is_ready(pivot_indicator):
            return zero_dict

        # NOTE(review): the first three entries are taken on the assumption
        # that the indicator keeps its pivot lists most-recent-first —
        # confirm against the indicator implementation.
        high_pivots = self._pivot_prices(getattr(pivot_indicator, "high_pivots", None))
        low_pivots = self._pivot_prices(getattr(pivot_indicator, "low_pivots", None))

        # Reference bar from the same timeframe as the pivots.
        algo = self.algorithm
        if "M1" in suffix:
            ref_bar = algo.m1_bars[0] if algo.m1_bars.count > 0 else None
        elif "H5000" in suffix:
            ref_bar = algo.h5000_bars[0] if algo.h5000_bars.count > 0 else None
        else:
            ref_bar = h500_bar
        if ref_bar is None:
            return zero_dict

        ref_high = self._first_attr(ref_bar, "High", "high")
        ref_low = self._first_attr(ref_bar, "Low", "low")
        if ref_high is None or ref_low is None:
            return zero_dict

        # The divisor does not depend on the pivot, so resolve it once.
        # It stays None (raw distances) when normalisation is off or the
        # ATR cannot be read; the floor avoids dividing by ~0.
        divisor = None
        if self.NORMALIZE_BY_ATR:
            atr_val = self._indicator_value(getattr(algo, "atr_14", None))
            if atr_val is not None:
                divisor = max(1e-6, atr_val)

        def distance(pivot, reference):
            # 0.0 is the padding marker for "no pivot"; keep it as 0.0.
            if pivot == 0.0:
                return 0.0
            raw = pivot - reference
            return raw if divisor is None else raw / divisor

        distances = {}
        for rank, pivot in enumerate(high_pivots, start=1):
            distances[f"PPHL_H_{suffix}_{rank}"] = distance(pivot, ref_high)
        for rank, pivot in enumerate(low_pivots, start=1):
            distances[f"PPHL_L_{suffix}_{rank}"] = distance(pivot, ref_low)
        return distances

    def get_high_pivot_points(self, pivot_indicator):
        """Return the first three high-pivot prices, padded with 0.0.

        Backwards-compatibility helper; ``_calc_pivot_distances`` does not
        call it.
        """
        if not pivot_indicator:
            return [0.0, 0.0, 0.0]
        return self._pivot_prices(getattr(pivot_indicator, "high_pivots", None))

    def get_low_pivot_points(self, pivot_indicator):
        """Return the first three low-pivot prices, padded with 0.0."""
        if not pivot_indicator:
            return [0.0, 0.0, 0.0]
        return self._pivot_prices(getattr(pivot_indicator, "low_pivots", None))

    def calculate_upper_wick_dc(self):
        """Return ``current - max(current, stored)`` cumulative delta.

        The result is never positive: 0.0 when the current value is the
        larger one, otherwise the (negative) gap to the stored value.
        Returns 0.0 when the inputs are unavailable.
        """
        pair = self._cumulative_delta_pair()
        if pair is None:
            return 0.0
        cur_val, prev_val = pair
        return cur_val - max(cur_val, prev_val)

    def calculate_lower_wick_dc(self):
        """Return ``current - min(current, stored)`` cumulative delta.

        The result is never negative: 0.0 when the current value is the
        smaller one, otherwise the gap to the stored value. Returns 0.0
        when the inputs are unavailable.
        """
        pair = self._cumulative_delta_pair()
        if pair is None:
            return 0.0
        cur_val, prev_val = pair
        return cur_val - min(cur_val, prev_val)

    def calculate_volume_ratio(self):
        """Return latest M1 volume divided by the 15-period volume SMA.

        Returns the neutral value 1.0 when fewer than 16 M1 bars exist, the
        SMA is missing or not ready, a value cannot be read, or the average
        is not positive.
        """
        algo = self.algorithm
        sma = getattr(algo, "volume_sma_15", None)
        if algo.m1_bars.count < 16 or not sma or not self._is_ready(sma):
            return 1.0

        cur_vol = self._first_attr(algo.m1_bars[0], "Volume", "volume")
        avg_vol = self._indicator_value(sma)
        # ``not avg_vol > 0`` also rejects NaN averages.
        if cur_vol is None or avg_vol is None or not avg_vol > 0:
            return 1.0
        return float(cur_vol) / avg_vol