Overall Statistics
Total Orders
9001
Average Win
0.67%
Average Loss
-0.62%
Compounding Annual Return
-18.651%
Drawdown
84.900%
Expectancy
-0.059
Start Equity
100000
End Equity
15226.05
Net Profit
-84.774%
Sharpe Ratio
-0.577
Sortino Ratio
-0.383
Probabilistic Sharpe Ratio
0.000%
Loss Rate
55%
Win Rate
45%
Profit-Loss Ratio
1.10
Alpha
-0.151
Beta
0.185
Annual Standard Deviation
0.234
Annual Variance
0.055
Information Ratio
-0.836
Tracking Error
0.263
Treynor Ratio
-0.729
Total Fees
$18442.70
Estimated Strategy Capacity
$170000000.00
Lowest Capacity Asset
ES YTG30NVEFCW1
Portfolio Turnover
998.17%
Drawdown Recovery
13
# region imports
from AlgorithmImports import *
from collections import defaultdict, deque
# endregion

class IntradayMomentumES(QCAlgorithm):
    """Intraday momentum strategy on E-mini S&P 500 (ES) futures.

    Each 30-minute bar of the front-month contract is compared with the
    recent history of absolute open-to-close moves recorded for the same
    time-of-day slot. When the current move is larger than the configured
    percentile of that history, one contract is bought (close above open) or
    sold short (close below open), and the position is liquidated 30 minutes
    later from ``on_data``.
    """

    def initialize(self) -> None:
        """Configure the backtest, subscribe to ES futures and reset state."""
        self.set_start_date(2017, 1, 1)
        self.set_cash(100000)

        # Warm up for 120 calendar days to start building the slot history.
        # Trading still waits until a slot holds `_min_history` samples.
        self.set_warm_up(timedelta(days=120))

        # Add ES futures with RAW data (no normalization)
        self._es = self.add_future(Futures.Indices.SP_500_E_MINI,
                                   Resolution.MINUTE,
                                   data_normalization_mode=DataNormalizationMode.RAW,
                                   extended_market_hours=True)
        self._es.set_filter(0, 90)

        # Store the canonical symbol
        self._canonical = self._es.symbol

        # Settings
        self._percentile_threshold = 0.95
        self._min_history = 120
        self._bar_period = timedelta(minutes=30)

        # Rolling |Close - Open| samples per time slot.
        # Key: slot start as "HH:MM" (e.g. "09:30"); value: a deque bounded to
        # `_min_history` items, so the oldest sample is dropped automatically.
        self._slot_history = defaultdict(
            lambda: deque(maxlen=self._min_history)
        )

        # Open-position bookkeeping: the contract held and when to exit it.
        self._exit_time = None
        self._current_contract = None

        # Track consolidators per contract symbol
        self._consolidators = {}

    def on_securities_changed(self, changes: SecurityChanges) -> None:
        """Attach/detach the 30-minute consolidator as contracts come and go.

        Args:
            changes: Securities added to and removed from the algorithm.
        """
        for added in changes.added_securities:
            symbol = added.symbol
            if symbol.security_type != SecurityType.FUTURE or symbol.is_canonical():
                continue

            # Only the contract currently mapped by the continuous future
            # (the front month) gets a consolidator.
            # NOTE(review): the check runs only at the moment a contract is
            # added; confirm that a contract which becomes the mapped one
            # after it was added is still picked up.
            if self.securities[self._canonical].mapped != symbol:
                continue

            consolidator = TradeBarConsolidator(timedelta(minutes=30))
            consolidator.data_consolidated += self._on_thirty_min_bar
            self.subscription_manager.add_consolidator(symbol, consolidator)
            self._consolidators[symbol] = consolidator

        for removed in changes.removed_securities:
            consolidator = self._consolidators.pop(removed.symbol, None)
            if consolidator is not None:
                self.subscription_manager.remove_consolidator(removed.symbol, consolidator)

    def _signal_threshold(self, slot_deltas) -> float:
        """Return the configured percentile of a slot's absolute deltas.

        Args:
            slot_deltas: Non-empty collection of historical |Close - Open|
                values for one time slot.

        Returns:
            The element at position ``int(n * percentile)`` of the sorted
            samples, clamped to the last element so a percentile of 1.0 does
            not index past the end.
        """
        ordered = sorted(slot_deltas)
        index = min(int(len(ordered) * self._percentile_threshold), len(ordered) - 1)
        return ordered[index]

    def _on_thirty_min_bar(self, sender, bar: TradeBar) -> None:
        """Evaluate a finished 30-minute bar and record it in the slot history.

        Args:
            sender: The consolidator that produced the bar (unused).
            bar: The consolidated 30-minute trade bar.
        """
        # Slot key is the bar's start time, e.g. "09:30" for the 09:30-10:00 bar.
        slot_key = bar.time.strftime("%H:%M")

        # Signed delta gives the direction, its magnitude is the signal strength.
        price_delta = bar.close - bar.open
        abs_price_delta = abs(price_delta)

        slot_deltas = self._slot_history[slot_key]

        # Require a full window of samples for this slot before trading.
        if len(slot_deltas) >= self._min_history and not self.is_warming_up:
            # Percentile (0.95 as configured) of the historical absolute deltas.
            threshold = self._signal_threshold(slot_deltas)

            # Plot threshold and current delta
            self.plot("Momentum Signals", "Threshold", threshold)
            self.plot("Momentum Signals", "Abs Delta", abs_price_delta)

            # Enter only on an outsized move and only when flat.
            if abs_price_delta > threshold and self._current_contract is None:
                # Close > Open = LONG, Close < Open = SHORT
                direction = 1 if price_delta > 0 else -1

                self._current_contract = bar.symbol
                tag = f"Slot:{slot_key}|Delta:{abs_price_delta:.2f}|Threshold:{threshold:.2f}|Dir:{direction}"
                self.market_order(bar.symbol, direction, tag=tag)

                # Exit one bar (30 min) from now. The deadline is derived from
                # `self.time` because `on_data` compares it against `self.time`;
                # using the bar's own timestamp would mix two clocks whenever
                # the bar's time zone differs from the algorithm's.
                self._exit_time = self.time + self._bar_period

        # The current bar joins the history only after it has been evaluated,
        # so it is never compared against itself. The deque's maxlen evicts
        # the oldest sample once the window is full.
        slot_deltas.append(abs_price_delta)

    def on_data(self, data: Slice) -> None:
        """Liquidate the open position once its exit time has been reached.

        Args:
            data: The current data slice (unused; only the clock matters).
        """
        if self._exit_time is None or self.time < self._exit_time:
            return

        contract = self._current_contract
        if contract is not None and self.portfolio[contract].invested:
            self.liquidate(contract)

        # Clear the bookkeeping even if nothing was held, so new entries are allowed.
        self._exit_time = None
        self._current_contract = None