Overall Statistics
Total Trades
1354
Average Win
0.66%
Average Loss
-0.90%
Compounding Annual Return
93.321%
Drawdown
61.000%
Expectancy
0.200
Net Profit
136.642%
Sharpe Ratio
1.436
Probabilistic Sharpe Ratio
47.168%
Loss Rate
31%
Win Rate
69%
Profit-Loss Ratio
0.73
Alpha
1.302
Beta
-1.159
Annual Standard Deviation
0.819
Annual Variance
0.671
Information Ratio
1.257
Tracking Error
0.85
Treynor Ratio
-1.016
Total Fees
$0.00
Estimated Strategy Capacity
$860000.00
Lowest Capacity Asset
EURGBP 8G
class CustomSlope:
    def __init__(self, algorithm, name, period, symbol, atr):
        self.Name = name
        self.Time = datetime.min
        self.IsReady = False
        self.multiplier = 3
        self.symbol = symbol

        self._atr = atr

        self._final_upperband = RollingWindow[float](2)
        self._final_lowerband = RollingWindow[float](2)
        self._supertrend = RollingWindow[float](2)
        self._close = RollingWindow[float](3)

        warmUpData = self.algorithm.History(symbol, 20, Resolution.Daily)
        if not warmUpData.empty:
            for bar in warmUpData.loc[symbol, :].itertuples():
                tradebar = TradeBar(bar.Index, symbol, bar.open, bar.high, bar.low, bar.close, bar.volume)
                self._close.Add(bar.close)

                if not self._atr.IsReady:
                    continue

                _basic_upperband = ((bar.high + bar.low)/2 + self.multiplier * self._atr.Current.Value)
                _basic_lowerband = ((bar.high + bar.low)/2 - self.multiplier * self._atr.Current.Value)

                if self._final_lowerband.IsReady and self._final_upperband.IsReady:
                    if (_basic_upperband < self._final_upperband[0]) or (self._close[1] > self._final_upperband[0]):
                        self._final_upperband.Add(_basic_upperband)
                    else:
                        self._final_upperband.Add(self._final_upperband[0])

                    if (_basic_lowerband > self._final_lowerband[0]) or (self._close[1] < self._final_lowerband[0]):
                        self._final_lowerband.Add(_basic_lowerband)
                    else:
                        self._final_lowerband.Add(self._final_lowerband[0])
                else:
                    self._final_upperband.Add(0)
                    self._final_lowerband.Add(0)

                if self._supertrend.IsReady:
                    if (self._supertrend[0] == self._final_upperband[1]) and (self._close[0] < self._final_upperband[0]):
                        self._supertrend.Add(self._final_upperband[0])
                    if (self._supertrend[0] == self._final_upperband[1]) and (self._close[0] > self._final_upperband[0]):
                        self._supertrend.Add(self._final_lowerband[0])
                    if (self._supertrend[0] == self._final_lowerband[1]) and (self._close[0] > self._final_lowerband[0]):
                        self._supertrend.Add(self._final_lowerband[0])
                    if (self._supertrend[0] == self._final_lowerband[1]) and (self._close[0] < self._final_lowerband[0]):
                        self._supertrend.Add(self._final_upperband[0])
                else:
                    self._supertrend.Add(0)
        else:
            self.algorithm.Debug(f'{str(symbol)} warmUpData is empty')

    def __repr__(self):
        return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(self.Name, self.IsReady, self.Time, self.Value)

    def Update(self, high, low, close):
        # Need to do this at the end, so that
        self._close.Add(close)

        self.IsReady = self._supertrend.IsReady

        if self.IsReady:
            # Update rolling windows
            _basic_upperband = ((high + low)/2 + self.multiplier * self._atr.Current.Value)
            _basic_lowerband = ((high + low)/2 - self.multiplier * self._atr.Current.Value)

            if self._final_lowerband.IsReady and self._final_upperband.IsReady:
                if (_basic_upperband < self._final_upperband[0]) or (self._close[1] > self._final_upperband[0]):
                    self._final_upperband.Add(_basic_upperband)
                else:
                    self._final_upperband.Add(self._final_upperband[0])
                if (_basic_lowerband > self._final_lowerband[0]) or (self._close[1] < self._final_lowerband[0]):
                    self._final_lowerband.Add(_basic_lowerband)
                else:
                    self._final_lowerband.Add(self._final_lowerband[0])

            if self._supertrend.IsReady:
                if (self._supertrend[0] == self._final_upperband[1]) and (self._close[0] < self._final_upperband[0]):
                    self._supertrend.Add(self._final_upperband[0])
                if (self._supertrend[0] == self._final_upperband[1]) and (self._close[0] > self._final_upperband[0]):
                    self._supertrend.Add(self._final_lowerband[0])
                if (self._supertrend[0] == self._final_lowerband[1]) and (self._close[0] > self._final_lowerband[0]):
                    self._supertrend.Add(self._final_lowerband[0])
                if (self._supertrend[0] == self._final_lowerband[1]) and (self._close[0] < self._final_lowerband[0]):
                    self._supertrend.Add(self._final_upperband[0])
            else:
                self._supertrend.Add(0)

        return self.IsReady

    @property
    def IsReady(self):
        if self._close.IsReady and self._final_lowerband.IsReady and self._final_upperband.IsReady:
            return True
        return False

    def Update2(self, time, value):
        self.queue.appendleft(value)
        count = len(self.queue)
        self.Time = time

        self.IsReady = count == self.queue.maxlen

        #### start here the indicator calulation
        if self.IsReady:
            y = np.log(self.queue)
            x = [range(len(y))]
            slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
            self.Slope = slope * 10000 # value is very small an will display 0 if not multiplyed
            self.Intercept = intercept
            self.R_value = r_value * 10
            self.P_value = p_value
            self.Std_err = std_err
        #### finish the custom indicator

        return self.IsReady
import pandas as pd
from datetime import datetime, timedelta
import re
from enum import Enum


class ReferenceType(Enum):
    AVERAGE = 1
    PREVIOUS_CLOSE = 2

class VolatilityType(Enum):
    STANDARD_DEVIATION = 1
    HIGH_LOW = 2    # This trigger very little orders, ineffective.
    ATR = 3

class PositionAmountType(Enum):
    AVERAGE = 1
    INCREMENTAL = 2
    DOUBLEUP = 3


class FXMomentumAlgorithm(QCAlgorithm):
    def Initialize(self):
        self.SetStartDate(2020, 12, 28)
        # self.SetStartDate(2017, 12, 28)
        self.SetEndDate(datetime.now())
        self.SetCash(100000)
        self.resolution = Resolution.Minute
        self._grid_number = 10
        self._leverage_ratio = 50
        self._reference_price = None
        self._std = None
        self._stop_loss = None
        self._cash_preserve_ratio = 0.99
        self._cash_per_position = None

        self.IS_READY = False
        self.REFERENCE_PRICE_METHOD = ReferenceType.PREVIOUS_CLOSE
        self.POSITION_AMOUNT_METHOD = PositionAmountType.AVERAGE
        self.VOLATILITY_METHOD = VolatilityType.STANDARD_DEVIATION
        self.VOLATILITY_MULTIPLIER = 2
        self.STOP_LOSS_MODE = False
        self.STOP_LOSS_RESCALE = False
        self.STOP_LOSS_MULTIPLIER = 1.2 # 10% more on upper side and 10% less on lower side

        # self.pair = 'GBPCHF'
        # self.pair = 'AUDCAD'
        # self.pair = 'GBPCAD'
        # self.pair = 'USDJPY'
        # self.pair = 'AUDJPY'
        self.pair = 'EURGBP'
        # self.pair = 'GBPUSD'

        self.forex = self.AddForex(
            self.pair,
            self.resolution,
            Market.Oanda,
            True,
            self._leverage_ratio
        )
        # self.Debug(f'Our Leverage: {self.forex.MarginModel.GetLeverage(self.forex)}')

        ################################################
        # Adding Schedule event
        self.Schedule.On(
            self.DateRules.MonthStart(self.pair),
            self.TimeRules.AfterMarketOpen(self.pair, 0),
            self.openMarket
        )
        self.Schedule.On(
            self.DateRules.MonthEnd(self.pair),
            self.TimeRules.BeforeMarketClose(self.pair, 5),
            self.closeMarket
        )

    def OnData(self, data):
        if not self.IS_READY or self.pair not in data:
            return

        if not self.STOP_LOSS_MODE:
            return

        # Stop loss
        if data[self.pair].Close < (self._reference_price - self._stop_loss) or data[self.pair].Close > (self._reference_price + self._stop_loss):
            # self.Debug(f'{self.Time}: Break out and clean the open orders')
            # Liquidate all positions and close all open orders
            self.Liquidate(self.pair)
            if self.STOP_LOSS_RESCALE:
                self.openMarket()

    def setParameters(self):
        history = self.History([self.pair], 93, Resolution.Daily)
        history = history.droplevel(0)

        # Set up reference price
        if self.REFERENCE_PRICE_METHOD == ReferenceType.PREVIOUS_CLOSE:
            # Get close of previous day
            self._reference_price = self.History(
                [self.pair],
                2880,
                Resolution.Minute
            ).droplevel(0).close[-1]
        elif self.REFERENCE_PRICE_METHOD == ReferenceType.AVERAGE:
            # Daily moving average
            self.Debug('Average mean')
            self._reference_price = history.close[-20:].mean()

        # Cash amount per position
        if self.POSITION_AMOUNT_METHOD == PositionAmountType.AVERAGE:
            # Leveraged Equal weighted
            self._cash_per_position = (self.Portfolio.MarginRemaining * self._leverage_ratio * self._cash_preserve_ratio) / (self._grid_number * 2)
        elif self.POSITION_AMOUNT_METHOD == PositionAmountType.INCREMENTAL:
            # Leveraged incremental weighted
            self._cash_per_position = (self.Portfolio.MarginRemaining * self._leverage_ratio * self._cash_preserve_ratio) / (sum(range(1, self._grid_number+1)) * 2)
        elif self.POSITION_AMOUNT_METHOD == PositionAmountType.DOUBLEUP:
            # Leveraged double up weighted
            self._cash_per_position = (self.Portfolio.MarginRemaining * self._leverage_ratio * self._cash_preserve_ratio) / ((2**self._grid_number - 1) * 2)

        # Get volatility of previous day
        if self.VOLATILITY_METHOD == VolatilityType.STANDARD_DEVIATION:
            # The 2 * Standardization = 95%
            self._std = history.close.std() * self.VOLATILITY_MULTIPLIER
        elif self.VOLATILITY_METHOD == VolatilityType.HIGH_LOW:
            # The high and low of the previous market open day
            self._std = abs(history.high.max() - history.low.min())
        elif self.VOLATILITY_METHOD == VolatilityType.ATR:
            # ATR
            self._std = ta.ATR(history.high, history.low, history.close, timeperiod=60)[-1]

        self._stop_loss = self._std * self.STOP_LOSS_MULTIPLIER


    def openMarket(self):
        # self.Debug(f'{self.Time} Market open. Order number ({len(self.Transactions.GetOpenOrders(self.pair))})')
        self.setParameters()

        # Cancel all open orders before creating new orders
        self.Liquidate(self.pair)

        # Set up the grid limit order
        
        for i in range(1, self._grid_number + 1):
            qty = int(self._cash_per_position * self.get_qty_multiplier(i) / (self._reference_price - self._std / self._grid_number * i))
            order = self.LimitOrder(
                self.pair,
                qty,
                round(self._reference_price - self._std / self._grid_number * i, 5),
                f'Long{i}'
            )
            qty = int(self._cash_per_position * self.get_qty_multiplier(i) / (self._reference_price + self._std / self._grid_number * i))
            order = self.LimitOrder(
                self.pair,
                -qty,
                round(self._reference_price + self._std / self._grid_number * i, 5),
                f'Short{i}'
            )
        self.IS_READY = True

    def closeMarket(self):
        # self.Debug(f'{self.Time} Market close')

        # Liquidate all positions by the end of the day
        self.Liquidate(self.pair)

        # self.Debug(f'{self.Time} Market close. Order number ({len(self.Transactions.GetOpenOrders(self.pair))})')

    def OnOrderEvent(self, orderEvent):
        order = self.Transactions.GetOrderById(orderEvent.OrderId)
        if orderEvent.Status == OrderStatus.Filled:
            # self.Debug(
            #     "{0}: {1} ({2})".format(
            #         self.Time,
            #         orderEvent,
            #         order.Tag
            #     )
            # )

            match = re.match(r'(.*)(\d+)(.*)$', order.Tag)

            if not match:
                return

            if match.group(1) == 'Long':
                i = int(match.group(2))
                if match.group(3) == '-Liquidate':
                    qty = int(self._cash_per_position * self.get_qty_multiplier(i) / (self._reference_price - self._std / self._grid_number * i))
                    self.LimitOrder(
                        self.pair,
                        qty,
                        round(self._reference_price - self._std / self._grid_number * i, 5),
                        f'Long{match.group(2)}'
                    )
                else:
                    if (i - 1) == 0:
                        self.LimitOrder(
                            self.pair,
                            -abs(order.Quantity),
                            round(self._reference_price, 5),
                            f'Long{match.group(2)}-Liquidate'
                        )
                    elif i > 1:
                        self.LimitOrder(
                            self.pair,
                            -abs(order.Quantity),
                            round(self._reference_price - self._std / self._grid_number * (i - 1), 5),
                            f'Long{match.group(2)}-Liquidate'
                        )
            elif match.group(1) == 'Short':
                i = int(match.group(2))
                if match.group(3) == '-Liquidate':
                    qty = int(self._cash_per_position * self.get_qty_multiplier(i) / (self._reference_price + self._std / self._grid_number * i))
                    self.LimitOrder(
                        self.pair,
                        -qty,
                        round(self._reference_price + self._std / self._grid_number * i, 5),
                        f'Short{match.group(2)}'
                    )
                    openOrders = self.Transactions.GetOpenOrders(self.pair)
                    for order in openOrders:
                        if order.Tag in [f'Short{match.group(2)}-Liquidate', f'Short{match.group(2)}-Stoploss']:
                            self.Transactions.CancelOrder(order.Id)
                else:
                    if (i - 1) == 0:
                        self.LimitOrder(
                            self.pair,
                            abs(order.Quantity),
                            round(self._reference_price, 5),
                            f'Short{match.group(2)}-Liquidate'
                        )
                    else:
                        self.LimitOrder(
                            self.pair,
                            abs(order.Quantity),
                            round(self._reference_price + self._std / self._grid_number * (i - 1), 5),
                            f'Short{match.group(2)}-Liquidate'
                        )
        self.IS_READY = False

    def get_qty_multiplier(self, grid_number):
        if self.POSITION_AMOUNT_METHOD == PositionAmountType.AVERAGE:
            # Even weighted
            return 1
        elif self.POSITION_AMOUNT_METHOD == PositionAmountType.INCREMENTAL:
            # Incremental
            return grid_number
        elif self.POSITION_AMOUNT_METHOD == PositionAmountType.DOUBLEUP:
            # Double up weighted
            if not isinstance(grid_number, (int)):
                grid_number = int(grid_number)
            return 2**(grid_number - 1)