Overall Statistics
Total Trades
890
Average Win
4.41%
Average Loss
-4.75%
Compounding Annual Return
85.617%
Drawdown
52.600%
Expectancy
0.130
Net Profit
1041.121%
Sharpe Ratio
1.342
Sortino Ratio
1.4
Probabilistic Sharpe Ratio
49.070%
Loss Rate
41%
Win Rate
59%
Profit-Loss Ratio
0.93
Alpha
0
Beta
0
Annual Standard Deviation
0.629
Annual Variance
0.396
Information Ratio
1.372
Tracking Error
0.629
Treynor Ratio
0
Total Fees
$190753.35
Estimated Strategy Capacity
$44000000.00
Lowest Capacity Asset
MES YEBKSYL2454X
Portfolio Turnover
400.43%
# region imports
from datetime import timedelta
from AlgorithmImports import *
# endregion


CASH = 100_000
LEVERAGE = 0.5                                      # maximum allowed position leverage. For Cfd maximum leverage is 50
START_DATE = (2020, 1, 1)
END_DATE = (2024, 3, 5)
RESOLUTION = Resolution.Minute
TIMEFRAME = timedelta(minutes=30)

INDEX_TICKER = "SPX"                                # SPX | NDX
TRADE_TICKER = "MES"                                # SPY/QQQ - ETF, SPX500USD/NAS100USD  - Cfd, MES/MNQ - Future
TICKER_TYPE = SecurityType.Future                   # Equity/Cfd/Future
ENABLE_FEES = True                                  # emulate broker fees?

# session
USE_SESSION = False                                  # enable/disable session limits for entries
SESSION_START = (9, 30)                             # trading session start, (H, M)
SESSION_END = (15, 30)                              # trading session end, (H, M)
CLOSE_POSITION_ON_SESSION_END = True                # if True, existing position will be closed at end of session
REOPEN_PROFITABLE_POSITION_ON_NEXT_SESSION = True   # if True, profitable position will be reopened at start of session

# entry parameters
HALF_TREND_AMPLITUDE = 2
HALF_TREND_CHANNEL_DEVIATION = 2
HALT_TREND_ATR_PERIOD = 100
EMA_PERIOD = 200

# risk management
RISK = 4                # risk per trade, percent
ATR_PERIOD = 15         # ATR parameters for stop loss
ATR_MULTIPLIER = 2
TARGET1 = 1
TARGET2 = 2
TRAILING_STOP_DISTANCE = 0.05     # trailing stop distance in %. Starts after breaking even. Set to 0 to disable


class BB_HalfTrend(QCAlgorithm):
    def Initialize(self):
        self.SetCash(CASH)
        self.SetStartDate(*START_DATE)
        if END_DATE:
            self.SetEndDate(*END_DATE)
        self.UniverseSettings.Resolution = RESOLUTION
        self.SetSecurityInitializer(
            MySecurityInitializer(self.BrokerageModel, FuncSecuritySeeder(self.GetLastKnownPrices)))

        if TICKER_TYPE == SecurityType.Cfd:
            self.SetBrokerageModel(OandaBrokerageModel(AccountType.Margin))
        else:
            self.SetBrokerageModel(InteractiveBrokersBrokerageModel(AccountType.Margin))

        index = self.AddIndex(INDEX_TICKER, RESOLUTION, Market.USA)
        if TICKER_TYPE == SecurityType.Future:
            security = self.AddFuture(TRADE_TICKER, RESOLUTION, dataMappingMode=DataMappingMode.LastTradingDay,
                                      dataNormalizationMode=DataNormalizationMode.Raw)
            security.SetFilter(0, 182)
            self.symbolData = FutureData(self, index, security)
        else:
            security = self.AddSecurity(TICKER_TYPE, TRADE_TICKER, RESOLUTION)
            self.symbolData = SymbolData(self, index, security)

        if USE_SESSION:
            self.Schedule.On(self.DateRules.EveryDay(security.Symbol), self.TimeRules.At(*SESSION_START), self.SessionStart)
            self.Schedule.On(self.DateRules.EveryDay(security.Symbol), self.TimeRules.At(*SESSION_END), self.SessionEnd)

        self.tradeEnabled = True

        self.chart = Chart("Signals")
        self.chart.AddSeries(Series("Close", SeriesType.Line, "$", Color.Brown))
        self.chart.AddSeries(Series("HalfTrend", SeriesType.Line, "$", Color.Blue))
        self.chart.AddSeries(Series("EMA", SeriesType.Line, "$", Color.Purple))
        self.chart.AddSeries(Series("HalfTrend Buy", SeriesType.Scatter, "$", Color.Green,
                                    ScatterMarkerSymbol.Triangle))
        self.chart.AddSeries(Series("HalfTrend Sell", SeriesType.Scatter, "$", Color.Red,
                                    ScatterMarkerSymbol.TriangleDown))
        self.AddChart(self.chart)

    def SessionStart(self):
        if REOPEN_PROFITABLE_POSITION_ON_NEXT_SESSION:
            self.tradeEnabled = True
            self.symbolData.RestorePositions()

    def SessionEnd(self):
        if CLOSE_POSITION_ON_SESSION_END:
            self.tradeEnabled = False
            self.symbolData.BeforeLiquidation()
            self.Liquidate(tag="End of session liquidation")

    def OnData(self, slice: Slice) -> None:
        self.symbolData.OnData(slice)

    def OnOrderEvent(self, orderEvent):
        self.symbolData.OnOrderEvent(orderEvent)


class SymbolData:
    def __init__(self, algo: BB_HalfTrend, index: Security, security: Security):
        self.algo = algo
        self.index = index
        self.symbol = security.Symbol
        self.lastProfit = None
        self.lastPosition = None
        self.lastDate = None
        self.orderManager = OrderManager(algo, security.Symbol)
        self.consolidator = algo.Consolidate(index.Symbol, TIMEFRAME, self.OnBar)

        self.ema = ExponentialMovingAverage(EMA_PERIOD)
        history = algo.History(index.Symbol, timeframeToBars(TIMEFRAME, RESOLUTION) * EMA_PERIOD, RESOLUTION)
        for i, row in history.loc[index.Symbol].iterrows():
            self.ema.Update(i, row.close)
        self.halfTrend = HalfTrendIndicator(algo, index.Symbol)
        self.halfTrend.WarmUp()
        self.WarmupSecurity()

    def OnOrderEvent(self, orderEvent: OrderEvent):
        if self.orderManager is not None:
            self.orderManager.OnOrderEvent(orderEvent)

    def WarmupSecurity(self):
        self.atr = AverageTrueRange(ATR_PERIOD)
        history = self.algo.History(self.symbol, timeframeToBars(TIMEFRAME, RESOLUTION) * ATR_PERIOD, RESOLUTION)
        for i, row in history.loc[self.symbol].iterrows():
            bar = TradeBar(i, self.symbol, row.open, row.high, row.low, row.close, 0.0)
            self.atr.Update(bar)

    @property
    def IsReady(self):
        return self.atr.IsReady and self.ema.IsReady and self.halfTrend.IsReady

    def GetBuySignal(self, price):
        if not self.IsReady:
            return False

        return self.halfTrend.BuySignal and price > self.ema.Current.Value

    def GetSellSignal(self, price):
        if not self.IsReady:
            return False

        return self.halfTrend.SellSignal and price < self.ema.Current.Value

    def CalculateQuantity(self, symbol: Symbol):
        max_qty = int(self.algo.CalculateOrderQuantity(symbol, LEVERAGE))
        self.orderManager.riskDistance = self.atr.Current.Value * ATR_MULTIPLIER
        var = self.algo.Portfolio.TotalPortfolioValue * RISK / 100.
        qty = int(var / self.orderManager.riskDistance)
        if qty > max_qty:
            self.algo.Log(f"Limited quantity to {max_qty} due to margin requirements")
            qty = max_qty
        return qty

    def OpenPosition(self, direction, qty=None, tag=""):
        if qty is None:
            qty = self.CalculateQuantity(self.symbol)
        if qty > 0:
            if self.algo.Portfolio[self.symbol].Invested:
                qty += self.algo.Portfolio[self.symbol].AbsoluteQuantity
            self.algo.MarketOrder(self.symbol, direction*qty, tag=f"Entry {tag}")

    def BeforeLiquidation(self):
        if self.algo.Portfolio[self.symbol].UnrealizedProfit > 0:
            self.algo.Log(f"Saving position on session break for {self.symbol}")
            self.lastProfit = self.algo.Portfolio[self.symbol].UnrealizedProfit
            self.orderManager.SavePosition()

    def RestorePositions(self):
        if self.lastProfit is not None:
            self.algo.Log(f"Restoring position for {self.symbol}")
            self.orderManager.RestorePosition()
        self.lastProfit = None

    def OnData(self, slice: Slice):
        if self.algo.IsWarmingUp:
            return

        if slice.Time.date() != self.lastDate:
            self.lastDate = slice.Time.date()
            if self.ema.IsReady:
                self.algo.Plot("Signals", "EMA", self.ema.Current.Value)
            if self.atr.IsReady:
                self.algo.Plot("ATR", "ATR", self.atr.Current.Value)

        if not self.algo.tradeEnabled:
            return

        if not slice.Bars.ContainsKey(self.index.Symbol):
            return

        if self.algo.Portfolio[self.symbol].Invested:
            return

        if self.GetBuySignal(slice.Bars[self.index.Symbol].Close):
            self.OpenPosition(1)
        if self.GetSellSignal(slice.Bars[self.index.Symbol].Close):
            self.OpenPosition(-1)

    def OnBar(self, bar: TradeBar):
        self.atr.Update(bar)
        self.ema.Update(bar.EndTime, bar.Close)
        self.algo.Plot("Index", self.index.Symbol.Value, bar.Open, bar.High, bar.Low, bar.Close)


class FutureData(SymbolData):
    def __init__(self, algo: BB_HalfTrend, index: Security, future: Future):
        super().__init__(algo, index, future)
        self.future = future
        self.orderManager = None
        self.contract = None

    def WarmupSecurity(self):
        self.atr = AverageTrueRange(ATR_PERIOD)
        history = self.algo.History(self.symbol, timeframeToBars(TIMEFRAME, RESOLUTION) * ATR_PERIOD, RESOLUTION)
        for i, row in history.droplevel([0,1]).iterrows():
            bar = TradeBar(i, self.symbol, row.open, row.high, row.low, row.close, 0.0)
            self.atr.Update(bar)

    def OpenPosition(self, direction, qty=None, tag=""):
        contract = self.algo.Securities[self.future.Mapped]
        if not self.algo.Portfolio[contract.Symbol].Invested or self.orderManager is None:
            self.orderManager = OrderManager(self.algo, contract.Symbol)
        if qty is None:
            qty = self.CalculateQuantity(contract.Symbol)
        if qty > 0:
            if self.algo.Portfolio[contract.Symbol].Invested:
                qty += self.algo.Portfolio[contract.Symbol].AbsoluteQuantity

            self.contract = contract
            self.algo.MarketOrder(contract.Symbol, direction*qty, tag=f"Entry {tag}")

    def OnData(self, slice: Slice):
        if self.algo.IsWarmingUp:
            return

        if slice.Time.date() != self.lastDate:
            self.lastDate = slice.Time.date()
            if self.ema.IsReady:
                self.algo.Plot("Signals", "EMA", self.ema.Current.Value)
            if self.atr.IsReady:
                self.algo.Plot("ATR", "ATR", self.atr.Current.Value)

        # rollover
        for symbol, changed_event in slice.SymbolChangedEvents.items():
            if symbol != self.symbol:
                continue
            old_symbol = changed_event.OldSymbol
            new_symbol = changed_event.NewSymbol
            self.algo.Log(f"Rollover for {symbol} - changed at {self.algo.Time}: "
                          f"{old_symbol} exp {self.algo.Securities[old_symbol].Expiry} -> "
                          f"{new_symbol} exp {self.algo.Securities[new_symbol].Expiry}")
            if self.contract is not None and self.algo.Portfolio[self.contract.Symbol].Invested:
                self.orderManager.SavePosition()
                self.algo.Liquidate(self.contract.Symbol, tag="Rollover")
                self.contract = self.algo.Securities[new_symbol]
                self.orderManager.symbol = self.contract.Symbol
                self.orderManager.RestorePosition()

        if self.contract and self.orderManager:
            self.orderManager.OnPrice(self.algo.Securities[self.contract.Symbol].Close)

        if not self.algo.tradeEnabled:
            return

        if not slice.Bars.ContainsKey(self.symbol) or not slice.Bars.ContainsKey(self.index.Symbol):
            return

        if self.contract and self.algo.Portfolio[self.contract.Symbol].Invested:
            return

        if self.GetBuySignal(slice.Bars[self.index.Symbol].Close):
            self.OpenPosition(1)
        if self.GetSellSignal(slice.Bars[self.index.Symbol].Close):
            self.OpenPosition(-1)


class HalfTrendIndicator:
    def __init__(self, algo: BB_HalfTrend, symbol: Symbol):
        self.algo = algo
        self.symbol = symbol
        self.consolidator = algo.Consolidate(symbol, TIMEFRAME, self.OnBar)
        self.atr = AverageTrueRange(HALT_TREND_ATR_PERIOD)
        self.highPrice = Maximum(HALF_TREND_AMPLITUDE)
        self.lowPrice = Minimum(HALF_TREND_AMPLITUDE)
        self.highma = SimpleMovingAverage(HALF_TREND_AMPLITUDE)
        self.lowma = SimpleMovingAverage(HALF_TREND_AMPLITUDE)
        self.trend = 0
        self.nextTrend = 0
        self.maxLowPrice = 0
        self.minHighPrice = float("inf")
        self.up = 0
        self.down = 0
        self.atrHigh = 0.
        self.atrLow = 0.
        self.arrowUp = None
        self.arrowDown = None
        self.bar_1 = None
        self.ht = None
        self.WarmUp()

    def WarmUp(self):
        warmupLength = max(3 * HALT_TREND_ATR_PERIOD, HALF_TREND_AMPLITUDE)
        history = self.algo.History(self.symbol, timeframeToBars(TIMEFRAME, RESOLUTION) * warmupLength, RESOLUTION)
        for i, row in history.loc[self.symbol].iterrows():
            bar = TradeBar(i, self.symbol, row.open, row.high, row.low, row.close, 0.0)
            self.consolidator.Update(bar)

    @property
    def IsReady(self):
        return self.atr.IsReady and self.highPrice.IsReady

    def OnBar(self, bar: TradeBar):
        self.atr.Update(bar)
        self.highPrice.Update(bar.EndTime, bar.High)
        self.lowPrice.Update(bar.EndTime, bar.Low)
        self.highma.Update(bar.EndTime, bar.High)
        self.lowma.Update(bar.EndTime, bar.Low)

        if self.bar_1 is None:
            self.bar_1 = bar

        trend_1 = self.trend
        up_1 = self.up
        down_1 = self.down

        atr2 = self.atr.Current.Value / 2
        dev = HALF_TREND_CHANNEL_DEVIATION * atr2
        if self.nextTrend == 1:
            self.maxLowPrice = max(self.lowPrice.Current.Value, self.maxLowPrice)

            if self.highma.Current.Value < self.maxLowPrice and bar.Close < self.bar_1.Low:
                self.trend = 1
                self.nextTrend = 0
                self.minHighPrice = self.highPrice.Current.Value
        else:
            self.minHighPrice = min(self.highPrice.Current.Value, self.minHighPrice)

            if self.lowma.Current.Value > self.minHighPrice and bar.Close > self.bar_1.High:
                self.trend = 0
                self.nextTrend = 1
                self.maxLowPrice = self.lowPrice.Current.Value

        if self.trend == 0:
            if trend_1 != 0:
                self.up = down_1
                self.arrowUp = self.up - atr2
            else:
                self.up = max(self.maxLowPrice, up_1)
            self.atrHigh = self.up + dev
            self.atrLow = self.up - dev
        else:
            if trend_1 != 1:
                self.down = up_1
                self.arrowDown = self.down + atr2
            else:
                self.down = min(self.minHighPrice, down_1)
            self.atrHigh = self.down + dev
            self.atrLow = self.down - dev

        self.ht = self.up if self.trend == 0 else self.down
        self.buySignal = self.arrowUp is not None and self.trend == 0 and trend_1 == 1
        self.sellSignal = self.arrowDown is not None and self.trend == 1 and trend_1 == 0

        self.bar_1 = bar

        if not self.algo.IsWarmingUp:
            if self.ht != 0:
                self.algo.Plot("Signals", "HalfTrend", self.ht)
            self.algo.Plot("Signals", "Close", bar.Close)

            if self.buySignal:
                self.algo.Log("HalfTrend Buy Signal")
                self.algo.Plot("Signals", "HalfTrend Buy", bar.Low)

            if self.sellSignal:
                self.algo.Log("HalfTrend Sell Signal")
                self.algo.Plot("Signals", "HalfTrend Sell", bar.High)

    @property
    def HT(self):
        return self.ht

    @property
    def BuySignal(self):
        return self.buySignal

    @property
    def SellSignal(self):
        return self.sellSignal


class MySecurityInitializer(BrokerageModelSecurityInitializer):
    def __init__(self, brokerage_model: IBrokerageModel, security_seeder: ISecuritySeeder) -> None:
        super().__init__(brokerage_model, security_seeder)

    def Initialize(self, security: Security) -> None:
        super().Initialize(security)
        if not ENABLE_FEES:
            security.FeeModel = ConstantFeeModel(0.0, "USD")


class OrderManager:
    def __init__(self, algo: BB_HalfTrend, symbol: Symbol):
        self.algo = algo
        self.symbol = symbol
        self.SLOrder = None
        self.TP1Order = None
        self.TP2Order = None
        self.entryPrice = None
        self.riskDistance = None
        self.lastPosition = None
        self.lastStopPrice = None
        self.lastStopQty = None
        self.lastTP1Price = None
        self.lastTP1Qty = None
        self.lastTP2Price = None
        self.lastTP2Qty = None

    def SavePosition(self):
        self.lastPosition = self.algo.Portfolio[self.symbol].Quantity
        if self.SLOrder:
            order = self.algo.Transactions.GetOrderById(self.SLOrder.OrderId)
            self.lastStopPrice = order.StopPrice
            self.lastStopQty = order.Quantity
        if self.TP1Order:
            order = self.algo.Transactions.GetOrderById(self.TP1Order.OrderId)
            self.lastTP1Price = order.LimitPrice
            self.lastTP1Qty = order.Quantity
        if self.TP2Order:
            order = self.algo.Transactions.GetOrderById(self.TP2Order.OrderId)
            self.lastTP2Price = order.LimitPrice
            self.lastTP2Qty = order.Quantity

    def RestorePosition(self):
        if not bool(self.lastPosition):
            return

        currentPrice = self.algo.Securities[self.symbol].Close
        direction = 1 if self.lastPosition > 0 else -1
        if self.lastStopPrice and currentPrice * direction <= self.lastStopPrice * direction:
            self.algo.Log("Stop price reached, did not recover position!")
        elif self.lastTP1Price and currentPrice * direction >= self.lastTP1Price * direction:
            self.algo.Log("Take Profit #1 reached, did not recover position!")
        elif self.lastTP2Price and currentPrice * direction >= self.lastTP2Price * direction:
            self.algo.Log("Take Profit #2 reached, did not recover position!")
        else:
            self.algo.MarketOrder(self.symbol, self.lastPosition, tag="Position Restored")
            if self.lastStopPrice is not None:
                self.SLOrder = self.algo.StopMarketOrder(self.symbol, self.lastStopQty, stopPrice=self.lastStopPrice,
                                                         tag="Stop Loss")
            if self.lastTP1Price:
                self.TP1Order = self.algo.LimitOrder(self.symbol, self.lastTP1Qty, limitPrice=self.lastTP1Price,
                                                     tag="Take Profit #1")
            if self.lastTP2Price:
                self.TP2Order = self.algo.LimitOrder(self.symbol, self.lastTP2Qty, limitPrice=self.lastTP2Price,
                                                     tag="Take Profit #2")
        self.lastPosition = None
        self.lastStopPrice = None
        self.lastStopQty = None
        self.lastTP1Price = None
        self.lastTP1Qty = None
        self.lastTP2Price = None
        self.lastTP2Qty = None

    def OnPrice(self, price: float):
        if not TRAILING_STOP_DISTANCE:
            return

        if not self.SLOrder:
            return

        order = self.algo.Transactions.GetOrderById(self.SLOrder.OrderId)
        stopPrice = order.StopPrice
        direction = 1 if order.Quantity < 0 else -1

        if stopPrice * direction >= self.entryPrice * direction:
            newStopPrice = price * (1 - direction * TRAILING_STOP_DISTANCE)
            if newStopPrice * direction > stopPrice * direction:
                security = self.algo.Securities[self.symbol]
                self.SLOrder.UpdateStopPrice(r(security, newStopPrice))

    def OnOrderEvent(self, orderEvent: OrderEvent):
        if orderEvent.Status == OrderStatus.Invalid:
            self.algo.Log(f"Invalid order: {orderEvent}")

        if orderEvent.Status not in [OrderStatus.Filled, OrderStatus.Canceled]:
            return

        if orderEvent.Status == OrderStatus.Canceled:
            if self.SLOrder is not None and orderEvent.OrderId == self.SLOrder.OrderId:
                self.SLOrder = None
            if self.TP1Order is not None and orderEvent.OrderId == self.TP1Order.OrderId:
                self.TP1Order = None
            if self.TP2Order is not None and orderEvent.OrderId == self.TP2Order.OrderId:
                self.TP2Order = None

        if orderEvent.Status == OrderStatus.Filled:
            order = self.algo.Transactions.GetOrderById(orderEvent.OrderId)

            # entry filled
            if "Entry" in order.Tag:
                self.entryPrice = orderEvent.FillPrice
                direction = 1 if orderEvent.Quantity > 0 else -1
                qty = -order.Quantity
                security = self.algo.Securities[self.symbol]
                stopPrice = r(security, self.entryPrice - direction * self.riskDistance)
                tp1Price = r(security, self.entryPrice + direction * self.riskDistance * TARGET1)
                self.SLOrder = self.algo.StopMarketOrder(self.symbol, qty, stopPrice=stopPrice, tag="Stop Loss")
                self.TP1Order = self.algo.LimitOrder(self.symbol, qty // 2, limitPrice=tp1Price, tag="Take Profit #1")

            # stop loss filled
            if self.SLOrder is not None and orderEvent.OrderId == self.SLOrder.OrderId:
                self.SLOrder = None
                if self.TP1Order is not None:
                    self.TP1Order.Cancel()
                    self.TP1Order = None
                if self.TP2Order is not None:
                    self.TP2Order.Cancel()
                    self.TP2Order = None
                if self.algo.Portfolio[self.symbol].Invested:
                    self.algo.Log(f"Liquidating remaining position of {self.symbol}")
                    self.algo.Liquidate(self.symbol)

            # take profit #1 filled
            if self.TP1Order is not None and orderEvent.OrderId == self.TP1Order.OrderId:
                self.TP1Order = None
                security = self.algo.Securities[self.symbol]
                qty = -self.algo.Portfolio[self.symbol].Quantity
                direction = 1 if qty < 0 else -1
                if self.SLOrder is not None:
                    self.SLOrder.UpdateStopPrice(r(security, self.entryPrice))
                    self.SLOrder.UpdateQuantity(qty)
                limitPrice = r(security, self.entryPrice + direction * self.riskDistance * TARGET2)
                self.TP2Order = self.algo.LimitOrder(self.symbol, qty, limitPrice=limitPrice, tag="Take Profit #2")

            # take profit #2 filled
            if self.TP2Order is not None and orderEvent.OrderId == self.TP2Order.OrderId:
                self.TP2Order = None
                if self.SLOrder is not None:
                    self.SLOrder.Cancel()
                    self.SLOrder = None
                if self.algo.Portfolio[self.symbol].Invested:
                    self.algo.Log(f"Liquidating remaining position of {self.symbol}")
                    self.algo.Liquidate(self.symbol)


def timeframeToBars(timeframe, resolution):
    if resolution == Resolution.Second:
        td = timedelta(seconds=1)
    elif resolution == Resolution.Minute:
        td = timedelta(minutes=1)
    elif resolution == Resolution.Hour:
        td = timedelta(minutes=60)
    elif resolution == Resolution.Daily:
        td = timedelta(days=1)
    else:
        raise Exception("Tick resolution is not supported")

    return int(timeframe / td)


def r(security: Security, value: float):
    mpv = security.SymbolProperties.MinimumPriceVariation
    return round(int(value / mpv) * mpv, 6)