Overall Statistics
Total Orders
4662
Average Win
0.87%
Average Loss
-0.55%
Compounding Annual Return
-0.436%
Drawdown
26.000%
Expectancy
0.000
Start Equity
100000.0
End Equity
97836.17
Net Profit
-2.164%
Sharpe Ratio
-0.315
Sortino Ratio
-0.296
Probabilistic Sharpe Ratio
0.370%
Loss Rate
61%
Win Rate
39%
Profit-Loss Ratio
1.56
Alpha
-0.027
Beta
0.006
Annual Standard Deviation
0.084
Annual Variance
0.007
Information Ratio
-0.592
Tracking Error
0.192
Treynor Ratio
-4.332
Total Fees
$0.00
Estimated Strategy Capacity
$320000.00
Lowest Capacity Asset
BTCUSD E3
Portfolio Turnover
26.95%
Drawdown Recovery
203
from AlgorithmImports import *

class OrderBlock12H1HStrategy(QCAlgorithm):
    def Initialize(self):
        """Set up strategy state, bar histories, the backtest window and data feeds.

        Subscribes to hourly BTCUSD on the Bitfinex market and registers a
        12-hour trade-bar consolidator whose output is delivered to
        ``OnDataConsolidated_12h``.
        """
        # --- Strategy state: everything is unset until structure is detected ---
        self.prim_trend = None          # "bull" or "bear"
        self.prim_bos = None
        self.prim_OB = None             # (low, high, direction)
        self.pending_entry = None       # (side, entry, stop, tp)
        self.last_position_dir = None
        self.open_stop = None
        self.open_tp = None

        # --- Rolling bar histories for structure recognition ---
        self.h12_bars = RollingWindow[TradeBar](60)    # 12H structure (~30 days)
        self.h1_bars = RollingWindow[TradeBar](500)    # 1H confirmation logic

        # --- Backtest window and starting capital ---
        self.SetStartDate(2020, 1, 1)
        self.SetEndDate(2025, 1, 1)
        self.SetCash(100000)

        # --- Data: hourly bars plus a 12H consolidation of the same symbol ---
        btc = self.AddCrypto("BTCUSD", Resolution.Hour, Market.Bitfinex)
        self.symbol = btc.Symbol

        twelve_hour = TradeBarConsolidator(timedelta(hours=12))
        self.consolidator_12h = twelve_hour
        self.SubscriptionManager.AddConsolidator(self.symbol, twelve_hour)
        twelve_hour.DataConsolidated += self.OnDataConsolidated_12h

    def OnDataConsolidated_12h(self, sender, bar):
        """Store a finished 12H bar and refresh the primary trend, BOS bar and order block.

        Nothing is recomputed until the 12H window is full. ``prim_trend`` is
        overwritten on every ready bar; ``prim_bos`` and ``prim_OB`` are only
        replaced when ``Get12HMarketStructure`` reports a break index, so the
        previous values persist otherwise.

        Args:
            sender: Event source; not used.
            bar: The consolidated 12H bar to append to ``h12_bars``.
        """
        self.h12_bars.Add(bar)
        if not self.h12_bars.IsReady:
            return

        trend, break_index = self.Get12HMarketStructure()
        self.prim_trend = trend
        if break_index is None:
            return

        # Reverse the window so indices line up with the oldest→newest order
        # that Get12HMarketStructure used when it produced break_index.
        chronological = list(self.h12_bars)[::-1]
        self.prim_bos = chronological[break_index]
        self.prim_OB = self.Find12HOrderBlock(break_index, trend)

    def OnData(self, data: Slice):
        if data.Bars.ContainsKey(self.symbol):
            self.h1_bars.Add(data.Bars[self.symbol])

        if not (self.h12_bars.IsReady and self.prim_OB and self.prim_bos):
            return

        price = self.Securities[self.symbol].Price
        position = self.Portfolio[self.symbol].Invested
        OB_lo, OB_hi, direction = self.prim_OB
        in_prim_OB = OB_lo <= price <= OB_hi

        # ENTRY LOGIC: 1H confirmation inside 12H OB zone
        if in_prim_OB and self.h1_bars.IsReady and not position and not self.pending_entry:
            choch_idx = self.FindCHoCHon1H(direction)
            if choch_idx is not None:
                entry_OB = self.Find1HOrderBlock(choch_idx, direction)
                if entry_OB is not None:
                    entry, stop, tp, side = self.GetOrderParams(entry_OB, self.prim_OB, direction)
                    if entry is not None and stop is not None and tp is not None:
                        self.pending_entry = (side, entry, stop, tp)
                        self.Debug(f"Signal: {side} {entry}, stop {stop}, tp {tp}")

        # PLACE LIMIT ORDER
        if not position and self.pending_entry:
            # Always cancel old open orders before trying new one
            self.Transactions.CancelOpenOrders(self.symbol)
            side, entry, stop, tp = self.pending_entry
            quantity = self.CalculateOrderQuantity(entry, stop, side)
            if quantity < 0.0001:
                self.Debug(f"Order skipped: too small qty={quantity}")
                self.pending_entry = None
                return
            self.Debug(f"Placing {side} order for {quantity} BTC at {entry} | Cash={self.Portfolio.Cash}")
            if side == "long":
                self.LimitOrder(self.symbol, quantity, entry)
            else:
                self.LimitOrder(self.symbol, -quantity, entry)
            self.last_position_dir = side
            self.open_stop = stop
            self.open_tp = tp
            self.pending_entry = None

        # TRADE MANAGEMENT
        if position:
            side = self.last_position_dir
            stop = self.open_stop
            tp = self.open_tp
            if stop is None or tp is None:
                return
            if side == "long" and (price <= stop or price >= tp):
                self.Liquidate(self.symbol)
                self.Debug(f"Exit long at {price} [Stop:{stop} TP:{tp}]")
                self.open_stop = None
                self.open_tp = None
            elif side == "short" and (price >= stop or price <= tp):
                self.Liquidate(self.symbol)
                self.Debug(f"Exit short at {price} [Stop:{stop} TP:{tp}]")
                self.open_stop = None
                self.open_tp = None

    # === Market Structure and Order Block Methods ===

    def Get12HMarketStructure(self):
        """Classify the 12H trend from the two most recent swing highs and lows.

        Returns:
            ``("bull", idx)`` when the latest swing high is above the previous
            one, where ``idx`` is that swing's position in the oldest→newest
            bar list; ``("bear", idx)`` when the latest swing low is below the
            previous one; ``(None, None)`` when neither holds or when fewer
            than two swing highs or two swing lows exist. The bullish test is
            evaluated first.
        """
        chronological = list(self.h12_bars)[::-1]  # oldest→newest
        highs = self.FindSwings(chronological, True, 2)
        lows = self.FindSwings(chronological, False, 2)

        if min(len(highs), len(lows)) < 2:
            return None, None

        (prev_high, _), (last_high, last_high_idx) = highs[-2], highs[-1]
        if last_high > prev_high:
            return "bull", last_high_idx

        (prev_low, _), (last_low, last_low_idx) = lows[-2], lows[-1]
        if last_low < prev_low:
            return "bear", last_low_idx

        return None, None

    def Find12HOrderBlock(self, bos_idx, trend):
        """Locate the 12H order block: the last opposite-coloured candle before ``bos_idx``.

        Walks backwards from the bar just before ``bos_idx`` (indices refer to
        the oldest→newest bar list). For a "bull" trend the first candle that
        closed below its open is used; for "bear", the first that closed above
        its open.

        Args:
            bos_idx: Index of the structure-break bar in the oldest→newest list.
            trend: "bull" or "bear"; any other value yields ``None``.

        Returns:
            ``(low, high, trend_label)`` of the matching candle, or ``None``
            when ``bos_idx`` is 0 or no matching candle precedes it.
        """
        chronological = list(self.h12_bars)[::-1]
        if bos_idx == 0:
            return None
        if trend == "bull":
            label, seek_bearish = "bull", True
        elif trend == "bear":
            label, seek_bearish = "bear", False
        else:
            return None

        for pos in reversed(range(bos_idx)):
            candle = chronological[pos]
            if seek_bearish:
                opposing = candle.Close < candle.Open
            else:
                opposing = candle.Close > candle.Open
            if opposing:
                return (candle.Low, candle.High, label)
        return None

    def FindCHoCHon1H(self, trend):
        """Find a 1H swing level that the latest close has broken in the trend direction.

        For a "bull" trend, swing highs are scanned and a break means the most
        recent close is above the swing; for any other trend value, swing lows
        are scanned and a break means the most recent close is below the swing.
        The newest swing is skipped; the remaining ones are checked from most
        recent to oldest and the first broken one wins.

        Args:
            trend: "bull" selects the bullish check; anything else is treated
                as bearish.

        Returns:
            The index (in the oldest→newest 1H bar list) of the broken swing,
            or ``None`` when fewer than 10 bars are stored or nothing is broken.
        """
        chronological = list(self.h1_bars)[::-1]
        if len(chronological) < 10:
            return None

        bullish = trend == "bull"
        pivots = self.FindSwings(chronological, bullish, 2)
        last_close = chronological[-1].Close

        # pivots[:-1] drops the most recent swing before walking backwards.
        for level, position in reversed(pivots[:-1]):
            broken = last_close > level if bullish else last_close < level
            if broken:
                return position
        return None

    def Find1HOrderBlock(self, chhoch_idx, trend):
        """Locate the 1H entry order block preceding the change-of-character bar.

        Scans backwards from the bar just before ``chhoch_idx`` (indices refer
        to the oldest→newest 1H bar list). For a "bull" trend the first candle
        that closed below its open is chosen; for any other trend value, the
        first candle that closed above its open.

        Args:
            chhoch_idx: Index of the broken swing in the oldest→newest list.
            trend: "bull" for the bullish search; anything else is bearish.

        Returns:
            ``(low, high)`` of the matching candle, or ``None`` when
            ``chhoch_idx`` is 0 or no matching candle precedes it.
        """
        chronological = list(self.h1_bars)[::-1]
        if chhoch_idx == 0:
            return None

        seek_bearish = trend == "bull"
        for pos in reversed(range(chhoch_idx)):
            candle = chronological[pos]
            if seek_bearish:
                opposing = candle.Close < candle.Open
            else:
                opposing = candle.Close > candle.Open
            if opposing:
                return (candle.Low, candle.High)
        return None

    def GetOrderParams(self, entry_OB, prim_OB, trend):
        """Derive entry, stop and target prices from the 1H and 12H order blocks.

        Args:
            entry_OB: ``(low, high)`` of the 1H order block; a falsy value
                yields an all-``None`` result.
            prim_OB: The 12H order block; index 0 is its low and index 1 its high.
            trend: "bull" produces a long setup; anything else a short setup.

        Returns:
            ``(entry, stop, tp, side)``. Longs enter at the 1H block high with
            the stop at its low and the target at the 12H block high; shorts
            mirror this (enter at the 1H low, stop at the 1H high, target at
            the 12H low). Note the order differs from ``pending_entry``, which
            stores ``side`` first.
        """
        if not entry_OB:
            return (None, None, None, None)

        block_low, block_high = entry_OB[0], entry_OB[1]
        if trend == "bull":
            return (block_high, block_low, prim_OB[1], "long")
        return (block_low, block_high, prim_OB[0], "short")

    def CalculateOrderQuantity(self, entry, stop, side):
        """Size a position so that hitting the stop loses about 1% of equity.

        The risk-based size is ``0.01 * equity / |entry - stop|``. It is capped
        so the position's notional value at ``entry`` does not exceed 50% of
        total portfolio value (equity, not free cash), then rounded to five
        decimals.

        NOTE(review): QCAlgorithm also exposes a built-in
        ``CalculateOrderQuantity(symbol, target)``; this method reuses the name
        with a different signature — confirm nothing relies on the built-in.

        Args:
            entry: Intended entry price.
            stop: Stop-loss price.
            side: "long" or "short"; currently unused because the size is
                symmetric, kept for the caller's signature.

        Returns:
            A non-negative quantity, or 0 when the stop distance is zero or
            the entry price is not positive.
        """
        stop_distance = abs(entry - stop)
        # Both divisions below need a non-zero denominator; a non-positive
        # entry price can never produce a valid size either.
        if stop_distance == 0 or entry <= 0:
            return 0

        equity = self.Portfolio.TotalPortfolioValue
        risk_budget = equity * 0.01                 # 1% of equity, in account currency
        max_qty_by_equity = equity / entry * 0.50   # notional cap: 50% of equity

        quantity = min(risk_budget / stop_distance, max_qty_by_equity)
        return max(0, round(quantity, 5))

    def FindSwings(self, bars, high, left_right):
        """Return the swing highs or swing lows found in ``bars``.

        A bar is a swing when its price equals the extreme of the window made
        of itself plus ``left_right`` bars on each side. Ties count, so equal
        neighbouring extremes are each reported. The first and last
        ``left_right`` bars are never candidates.

        Args:
            bars: Sequence of bars exposing ``High`` and ``Low``.
            high: Truthy to look for swing highs (``High``/max), falsy for
                swing lows (``Low``/min).
            left_right: Number of bars required on each side of a candidate.

        Returns:
            A list of ``(price, index)`` tuples in ascending index order.
        """
        if high:
            prices = [b.High for b in bars]
            extreme = max
        else:
            prices = [b.Low for b in bars]
            extreme = min

        span = left_right
        return [
            (prices[k], k)
            for k in range(span, len(prices) - span)
            if prices[k] == extreme(prices[k - span:k + span + 1])
        ]