Custom Indicators


LEAN supports over 100 pre-built indicators, but a custom indicator is an indicator you define. It receives input, performs some calculation, and sets its output value. Custom indicators are helpful when you want to achieve any of the following results:

  • Use an indicator that LEAN doesn't currently implement
  • Combine built-in indicators beyond the logic of Indicator Extensions
  • Create your own unique indicator for trading

Define Indicators

In Python, custom indicators must subclass the PythonIndicator class. The indicator must have an update method and name, time, and value attributes. The update method must accept an IndicatorDataPoint, QuoteBar, or TradeBar and return a boolean that represents whether the indicator is ready. The time attribute represents the last time you updated the indicator and the value attribute represents the current indicator value. The following definition provides an example of a custom simple moving average indicator.

In C#, custom indicators subclass the IndicatorBase<IndicatorDataPoint>, BarIndicator, or TradeBarIndicator class, depending on the indicator type. The following definition provides an example of a custom simple moving average indicator that inherits the IndicatorBase<IndicatorDataPoint> class. To view examples of indicators that inherit the BarIndicator or TradeBarIndicator class, see the AverageTrueRange or VolumeWeightedAveragePriceIndicator implementation in the LEAN GitHub repository.

public class CustomSimpleMovingAverage : IndicatorBase<IndicatorDataPoint>, IIndicatorWarmUpPeriodProvider
{
    private RollingWindow<decimal> _window;
    public override bool IsReady => _window.IsReady;
    public int WarmUpPeriod => _window.Size;

    public CustomSimpleMovingAverage(string name, int period) : base(name)
    {
        _window = new RollingWindow<decimal>(period);
    }

    protected override decimal ComputeNextValue(IndicatorDataPoint input)
    {
        // Add the new value to the window before averaging.
        _window.Add(input.Value);
        return _window.Sum() / _window.Size;
    }

    public override void Reset()
    {
        _window.Reset();
        base.Reset();
    }
}

from collections import deque

class CustomSimpleMovingAverage(PythonIndicator):
    def __init__(self, name, period):
        self.name = name
        self.warm_up_period = period
        self.time = datetime.min
        self.value = 0
        self.queue = deque(maxlen=period)

    def update(self, input: BaseData) -> bool:
        # Add the new value to the queue before averaging.
        self.queue.appendleft(input.value)
        count = len(self.queue)
        self.time = input.time
        self.value = sum(self.queue) / count
        return count == self.queue.maxlen

The following definition provides an example of a custom money flow index indicator.

public class CustomMoneyFlowIndex : TradeBarIndicator, IIndicatorWarmUpPeriodProvider
{
    private decimal _previousTypicalPrice;
    private RollingWindow<decimal> _negativeMoneyFlow;
    private RollingWindow<decimal> _positiveMoneyFlow;
    public override bool IsReady => _positiveMoneyFlow.IsReady;
    public int WarmUpPeriod => _positiveMoneyFlow.Size;

    public CustomMoneyFlowIndex(string name, int period) : base(name)
    {
        _negativeMoneyFlow = new(period);
        _positiveMoneyFlow = new(period);
        _previousTypicalPrice = 0m;
    }

    protected override decimal ComputeNextValue(TradeBar input)
    {
        var typicalPrice = (input.High + input.Low + input.Close) / 3;
        var moneyFlow = typicalPrice * input.Volume;
        _negativeMoneyFlow.Add(typicalPrice < _previousTypicalPrice ? moneyFlow : 0);
        _positiveMoneyFlow.Add(typicalPrice > _previousTypicalPrice ? moneyFlow : 0);
        // Store the typical price (not the money flow) for the next comparison.
        _previousTypicalPrice = typicalPrice;
        var positiveMoneyFlowSum = _positiveMoneyFlow.Sum();
        var totalMoneyFlow = positiveMoneyFlowSum + _negativeMoneyFlow.Sum();
        return totalMoneyFlow == 0 ? 100m : 100m * positiveMoneyFlowSum / totalMoneyFlow;
    }

    public override void Reset()
    {
        _previousTypicalPrice = 0m;
        _negativeMoneyFlow.Reset();
        _positiveMoneyFlow.Reset();
        base.Reset();
    }
}

class CustomMoneyFlowIndex(PythonIndicator):
    def __init__(self, name, period):
        super().__init__()
        self.name = name
        self.value = 0
        self.previous_typical_price = 0
        self.negative_money_flow = deque(maxlen=period)
        self.positive_money_flow = deque(maxlen=period)
    def update(self, input):
        if not isinstance(input, TradeBar):
            raise TypeError('CustomMoneyFlowIndex.update: input must be a TradeBar')
        typical_price = (input.high + input.low + input.close) / 3
        money_flow = typical_price * input.volume
        # We need to avoid double rounding errors
        if abs(self.previous_typical_price / typical_price - 1) < 1e-10:
            self.previous_typical_price = typical_price
        self.negative_money_flow.appendleft(money_flow if typical_price < self.previous_typical_price else 0)
        self.positive_money_flow.appendleft(money_flow if typical_price > self.previous_typical_price else 0)
        self.previous_typical_price = typical_price
        positive_money_flow_sum = sum(self.positive_money_flow)        
        total_money_flow = positive_money_flow_sum + sum(self.negative_money_flow)
        self.value = 100
        if total_money_flow != 0:
            self.value *= positive_money_flow_sum / total_money_flow
        return len(self.positive_money_flow) == self.positive_money_flow.maxlen
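
To make the arithmetic of the money flow index above concrete, the following is a minimal plain-Python sketch that runs outside of LEAN. The money_flow_index function, the (high, low, close, volume) tuple layout, and the sample bars are assumptions made for illustration; they are not part of the LEAN API.

```python
from collections import deque

def money_flow_index(bars, period):
    """Return the money flow ratio (0 to 100) after feeding every bar.

    `bars` is a list of (high, low, close, volume) tuples. This layout is an
    assumption of the sketch, not a LEAN data type.
    """
    positive = deque(maxlen=period)
    negative = deque(maxlen=period)
    previous_typical_price = 0.0
    value = 100.0
    for high, low, close, volume in bars:
        typical_price = (high + low + close) / 3
        money_flow = typical_price * volume
        # Classify the bar's money flow by the direction of the typical price.
        positive.append(money_flow if typical_price > previous_typical_price else 0.0)
        negative.append(money_flow if typical_price < previous_typical_price else 0.0)
        previous_typical_price = typical_price
        total = sum(positive) + sum(negative)
        value = 100.0 if total == 0 else 100.0 * sum(positive) / total
    return value

# Made-up bars with high == low == close to keep the numbers easy to follow.
bars = [
    (10.0, 10.0, 10.0, 100),  # typical price 10 > 0: positive flow of 1000
    (12.0, 12.0, 12.0, 100),  # typical price rises to 12: positive flow of 1200
    (11.0, 11.0, 11.0, 200),  # typical price falls to 11: negative flow of 2200
]
mfi = money_flow_index(bars, period=3)
```

With these bars, the positive and negative flows both sum to 2200, so the ratio sits at the midpoint of the 0 to 100 range, which is the threshold the trading example later in this page compares against.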

Create Indicators

You must define a custom indicator before you can create an instance of it.

To create a custom indicator, call the indicator constructor.

private CustomSimpleMovingAverage _sma;

_sma = new CustomSimpleMovingAverage("My SMA", 10);
self.custom_sma = CustomSimpleMovingAverage("My SMA", 10)


Update Indicators

The process to update custom indicators is the same process you use to update manual indicators. For more information about updating indicators, see Manual Updates or Automatic Updates.

Warm Up Indicators

The process to warm up custom indicators is the same process you use to warm up manual indicators.
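
As a rough illustration of what warming up means for an indicator with a warm-up period, the following plain-Python sketch feeds the most recent historical values into an indicator until it reports that it is ready. The RollingMean class, the warm_up helper, and the sample prices are hypothetical stand-ins, not LEAN classes or methods; in an algorithm you would rely on the LEAN warm-up features instead.

```python
from collections import deque

class RollingMean:
    """Hypothetical stand-in for a custom indicator; not a LEAN class."""
    def __init__(self, period):
        self.warm_up_period = period
        self.value = 0.0
        self.queue = deque(maxlen=period)

    def update(self, price):
        self.queue.appendleft(price)
        self.value = sum(self.queue) / len(self.queue)
        # Ready once the window holds `period` samples.
        return len(self.queue) == self.queue.maxlen

def warm_up(indicator, history):
    """Feed the most recent `warm_up_period` values; return True when ready."""
    ready = False
    for price in history[-indicator.warm_up_period:]:
        ready = indicator.update(price)
    return ready

indicator = RollingMean(3)
history = [1.0, 2.0, 3.0, 4.0, 5.0]  # made-up prices, oldest first
is_ready = warm_up(indicator, history)
```

If the history holds fewer values than the warm-up period, the helper returns False and the indicator value should not be used for trading decisions yet.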


Examples

The following examples demonstrate some common practices for implementing custom indicators.

Example 1: Custom Money Flow Index

The following algorithm implements a custom Money Flow Index indicator. We estimate the supply-demand balance of SPY and trade using the average money flow direction.

public class CombiningIndicatorsAlgorithm : QCAlgorithm
{
    private Symbol _spy;
    private CustomMoneyFlowIndex _customMfi;

    public override void Initialize()
    {
        SetStartDate(2022, 1, 1);
        SetEndDate(2022, 6, 1);

        // Request SPY data to feed the indicators to generate trade signals and trade.
        _spy = AddEquity("SPY", dataNormalizationMode: DataNormalizationMode.Raw).Symbol;

        // Create a custom money flow index to generate a trade signal.
        _customMfi = new CustomMoneyFlowIndex(20);

        // Warm up for immediate usage of indicators.
        SetWarmUp(20, Resolution.Daily);
    }

    public override void OnData(Slice slice)
    {
        if (slice.Bars.TryGetValue(_spy, out var bar))
        {
            // Update the custom MFI with the updated trade bar to obtain the updated trade signal.
            _customMfi.Update(bar);

            // Buy if the positive money flow is above the negative, which indicates demand is greater than supply, driving up the price.
            if (_customMfi > 50)
            {
                SetHoldings(_spy, 1);
            }
            // Sell if the positive money flow is below the negative, which indicates demand is less than supply, driving down the price.
            else if (_customMfi < 50)
            {
                SetHoldings(_spy, -1);
            }
        }
    }

    public class CustomMoneyFlowIndex : TradeBarIndicator, IIndicatorWarmUpPeriodProvider
    {
        private decimal _previousTypicalPrice;
        private RollingWindow<decimal> _negativeMoneyFlow;
        private RollingWindow<decimal> _positiveMoneyFlow;
        public override bool IsReady => _positiveMoneyFlow.IsReady;
        public int WarmUpPeriod => _positiveMoneyFlow.Size;

        public CustomMoneyFlowIndex(int period) : base("CustomMFI")
        {
            _negativeMoneyFlow = new(period);
            _positiveMoneyFlow = new(period);
            _previousTypicalPrice = 0m;
        }

        protected override decimal ComputeNextValue(TradeBar input)
        {
            // Estimate the money flow by multiplying the typical (average) price by the volume.
            var typicalPrice = (input.High + input.Low + input.Close) / 3;
            var moneyFlow = typicalPrice * input.Volume;
            // Classify the money flow by the direction of the typical price.
            _negativeMoneyFlow.Add(typicalPrice < _previousTypicalPrice ? moneyFlow : 0);
            _positiveMoneyFlow.Add(typicalPrice > _previousTypicalPrice ? moneyFlow : 0);
            // Store the typical price (not the money flow) for the next comparison.
            _previousTypicalPrice = typicalPrice;
            // Add the period money flow to calculate the aggregated money flow.
            var positiveMoneyFlowSum = _positiveMoneyFlow.Sum();
            var totalMoneyFlow = positiveMoneyFlowSum + _negativeMoneyFlow.Sum();
            // Set the value to be the positive money flow ratio.
            return totalMoneyFlow == 0 ? 100m : 100m * positiveMoneyFlowSum / totalMoneyFlow;
        }

        public override void Reset()
        {
            _previousTypicalPrice = 0m;
            _negativeMoneyFlow.Reset();
            _positiveMoneyFlow.Reset();
            base.Reset();
        }
    }
}

from collections import deque 

class CustomIndicatorsAlgorithm(QCAlgorithm):
    def initialize(self) -> None:
        self.set_start_date(2022, 1, 1)
        self.set_end_date(2022, 6, 1)

        # Request daily SPY data to feed the indicators to generate trade signals and trade.
        self.spy = self.add_equity("SPY").symbol

        # Create a custom money flow index to generate a trade signal.
        self.custom_mfi = CustomMoneyFlowIndex(20)

        # Warm up for immediate usage of indicators.
        self.set_warm_up(20, Resolution.DAILY)

    def on_data(self, slice: Slice) -> None:
        bar = slice.bars.get(self.spy)
        if bar:
            # Update the custom MFI with the updated trade bar to obtain the updated trade signal.
            self.custom_mfi.update(bar)

            # Buy if the positive money flow is above the negative, indicating demand is greater than supply, driving up the price.
            if self.custom_mfi.current.value > 50:
                self.set_holdings(self.spy, 1)
            # Sell if the positive money flow is below the negative, indicating demand is less than supply, driving down the price.
            elif self.custom_mfi.current.value < 50:
                self.set_holdings(self.spy, -1)

class CustomMoneyFlowIndex(PythonIndicator):
    def __init__(self, period: int) -> None:
        self.value = 0
        self.previous_typical_price = 0
        self.negative_money_flow = deque(maxlen=period)
        self.positive_money_flow = deque(maxlen=period)
    def update(self, input: BaseData) -> bool:
        if not isinstance(input, TradeBar):
            raise TypeError('CustomMoneyFlowIndex.update: input must be a TradeBar')
        # Estimate the money flow by averaging the price multiplied by volume.
        typical_price = (input.high + input.low + input.close) / 3
        money_flow = typical_price * input.volume
        # We need to avoid double-rounding errors.
        if abs(self.previous_typical_price / typical_price - 1) < 1e-10:
            self.previous_typical_price = typical_price
        # Add the period money flow to calculate the aggregated money flow.
        self.negative_money_flow.appendleft(money_flow if typical_price < self.previous_typical_price else 0)
        self.positive_money_flow.appendleft(money_flow if typical_price > self.previous_typical_price else 0)
        self.previous_typical_price = typical_price
        positive_money_flow_sum = sum(self.positive_money_flow)        
        total_money_flow = positive_money_flow_sum + sum(self.negative_money_flow)
        # Set the value to be the positive money flow ratio.
        self.value = 100
        if total_money_flow != 0:
            self.value *= positive_money_flow_sum / total_money_flow
        # Set the is_ready property to receive the required bars to fill all windows.
        return len(self.positive_money_flow) == self.positive_money_flow.maxlen

Example 2: Multiple Symbol Custom Indicator

The following algorithm implements a custom cointegration price divergence indicator that involves 2 symbols: GOOGL and GOOG. It trades the arbitrage between the 2 cointegrated stocks, betting on price convergence after their spread deviates by more than 2 standard deviations.

using Accord.Statistics;
using MathNet.Numerics.LinearRegression;

public class CombiningIndicatorsAlgorithm : QCAlgorithm
{
    private Symbol _goog1, _goog2;

    public override void Initialize()
    {
        SetStartDate(2019, 1, 1);
        SetEndDate(2023, 1, 1);

        // Request 2 classes of Google stock data to feed the indicators and generate trade signals.
        var goog1 = AddEquity("GOOGL").Symbol;
        var goog2 = AddEquity("GOOG").Symbol;

        // Create a custom cointegration indicator to generate a trade signal.
        var cointegration = new Cointegration(this, goog1, goog2, 252, DateRules.MonthStart());
        // Add a handler to trade on updates.
        cointegration.Updated += OnUpdated;
        // Register the indicator to update automatically with daily data.
        RegisterIndicator(goog1, cointegration, Resolution.Daily);
        RegisterIndicator(goog2, cointegration, Resolution.Daily);

        // Warm up for immediate usage of indicators.
        WarmUpIndicator(goog1, cointegration, Resolution.Daily);
        WarmUpIndicator(goog2, cointegration, Resolution.Daily);
    }

    private void OnUpdated(object sender, IndicatorDataPoint point)
    {
        var indicator = (sender as Cointegration);
        if (indicator.IsReady)
        {
            var holding = Portfolio[indicator.Symbol1];

            // If the residual is lower than -2x SD, it means class A price is much higher than what it should be compared to class C.
            // We sell class A and buy class C to bet on their price convergence.
            if (point < -2m && !holding.IsShort)
            {
                SetHoldings(indicator.Symbol1, -0.5m);
                SetHoldings(indicator.Symbol2, 0.5m * indicator.Ratio);
            }
            // If the residual is higher than 2x SD, it means class A price is much lower than what it should be compared to class C.
            // We buy class A and sell class C to bet on their price convergence.
            else if (point > 2m && !holding.IsLong)
            {
                SetHoldings(indicator.Symbol1, 0.5m);
                SetHoldings(indicator.Symbol2, -0.5m * indicator.Ratio);
            }
            // Close positions if the prices have converged.
            else if ((holding.IsShort && point > 0m) || (holding.IsLong && point < 0m))
            {
                Liquidate();
            }
        }
    }
}

public class Cointegration : IndicatorBase<IndicatorDataPoint>, IIndicatorWarmUpPeriodProvider
{
    // The standard deviation of the residuals, such that the returned indicator value is relative to the SD.
    private decimal _residualSD = 1e10m;
    // Store the coefficient and intercept of the cointegrated series for calculating the spread of a new data point.
    private decimal[] _coefficients = new[] { 0m, 0m };
    private Dictionary<Symbol, RollingWindow<IndicatorDataPoint>> _windows = new();
    private readonly int _period;

    public Symbol Symbol1 { get; set; }

    public Symbol Symbol2 { get; set; }
    // The ratio of Symbol2 within the cointegration relationship compared to Symbol1.
    public decimal Ratio => _coefficients[0];
    public int WarmUpPeriod => _period;

    public override bool IsReady => _windows.All(kvp => kvp.Value.IsReady) && _coefficients.All(x => x != 0m);

    public Cointegration(QCAlgorithm algorithm, Symbol symbol1, Symbol symbol2, int period, IDateRule recalibratingDateRule)
        : base("Cointegration")
    {
        Symbol1 = symbol1;
        Symbol2 = symbol2;

        // Use rolling windows to save the price data for cointegration analysis.
        _windows[symbol1] = new(period);
        _windows[symbol2] = new(period);

        // Adjust the cointegration factor between the 2 classes' price series on the recalibration schedule.
        algorithm.Schedule.On(
            recalibratingDateRule,
            algorithm.TimeRules.At(23, 59),
            CalculateCointegration
        );

        _period = period;
    }

    public override bool Update(IBaseData input)
    {
        // Update the rolling windows for cointegration analysis.
        if (!_windows.TryGetValue(input.Symbol, out var window))
        {
            throw new ArgumentException($"{input.Symbol} is not part of the Cointegration relation.");
        }
        window.Add(new IndicatorDataPoint(input.Symbol, input.EndTime, input.Value));

        var nextResult = ValidateAndComputeNextValue((IndicatorDataPoint)input);
        if (nextResult.Status == IndicatorStatus.Success)
        {
            Current = new IndicatorDataPoint(input.EndTime, nextResult.Value);
        }

        return IsReady;
    }

    protected override IndicatorResult ValidateAndComputeNextValue(IndicatorDataPoint input)
    {
        // If no cointegration relationship is found, an invalid result will always be returned.
        if (_coefficients.All(x => x == 0m))
        {
            return new IndicatorResult(0m, IndicatorStatus.ValueNotReady);
        }
        return new IndicatorResult(ComputeNextValue(input));
    }

    protected override decimal ComputeNextValue(IndicatorDataPoint input)
    {
        // Calculate the updated cointegrated series spread only if all symbol data points are updated.
        if (_windows.All(kvp => kvp.Value[0].EndTime == input.EndTime))
        {
            return (_coefficients[0] * _windows[Symbol2][0].Value + _coefficients[1] - _windows[Symbol1][0].Value) / _residualSD;
        }
        return Current.Value;
    }

    private void CalculateCointegration()
    {
        // Lag direction is unimportant; it is just a sign flip in the linear regression, so we don't need to flip the window order.
        var y = _windows[Symbol1].Select(x => (double)x.Value).ToArray();
        var x = _windows[Symbol2].Select(x => (double)x.Value).ToArray();
        // A regression needs at least 2 data points per series.
        if (x.Length < 2 || y.Length < 2)
        {
            return;
        }

        // Perform Linear Regression on both price series to investigate their relationship.
        var regressionResult = SimpleRegression.Fit(x, y);
        var intercept = regressionResult.Item1;
        var slope = regressionResult.Item2;

        // Calculate the residuals series to check if it is stationary, meaning if the 2 price series move together.
        var residuals = new double[x.Length];
        for (int i = 0; i < x.Length; i++)
        {
            residuals[i] = y[i] - (intercept + slope * x[i]);
        }

        // Check if the residuals are stationary using the augmented Dickey-Fuller test.
        if (ADFTest(residuals, Convert.ToDouble(_period)))
        {
            // If cointegrated, update the positional sizing ratio and the spread threshold of the trade trigger.
            _coefficients = new[] { Convert.ToDecimal(slope), Convert.ToDecimal(intercept) };
            _residualSD = Convert.ToDecimal(Measures.StandardDeviation(residuals));
        }
        else
        {
            _coefficients = new[] { 0m, 0m };
            _residualSD = 1e10m;             // An arbitrarily large number that the class A price will never reach.
        }
    }

    private static bool ADFTest(double[] series, double N)
    {
        var n = series.Length;
        var lagged = new double[n - 1];
        var differences = new double[n - 1];

        // Fit linear regression for the residual series on unit root: ΔY_t = α + βY_{t-1} + ε_t.
        for (int i = 1; i < n; i++)
        {
            lagged[i - 1] = series[i - 1];
            differences[i - 1] = series[i] - series[i - 1];
        }

        var regressionResult = SimpleRegression.Fit(lagged, differences);
        var alpha = regressionResult.Item1;  // Intercept
        var beta = regressionResult.Item2;   // Coefficient of lagged term

        // Calculate the ADF statistic and check if the null hypothesis is rejected.
        var adfStatistic = beta / Measures.StandardError(differences);

        // Reject the null hypothesis that a unit root is present if the test statistic <= critical value.
        // This means no unit root exists for the difference series, and the residuals are stationary.
        var critical = -1.941d + -0.2686d / N + -3.365d / N / N + 31.223d / N / N / N;
        return adfStatistic <= critical;
    }
}

from sklearn.linear_model import LinearRegression
from statsmodels.tsa.stattools import adfuller

class CustomIndicatorsAlgorithm(QCAlgorithm):
    def initialize(self) -> None:
        self.set_start_date(2019, 1, 1)
        self.set_end_date(2023, 1, 1)

        # Request 2 classes of Google stock data to feed the indicators to generate trade signals and trade.
        goog1 = self.add_equity("GOOGL").symbol
        goog2 = self.add_equity("GOOG").symbol

        # Create a custom cointegration indicator to generate a trade signal.
        self.cointegration = Cointegration(self, goog1, goog2, 252, self.date_rules.month_start())
        # Add a handler to trade on updates.
        self.cointegration.updated += self.on_updated
        # Register the indicator to update automatically with daily data.
        self.register_indicator(goog1, self.cointegration, Resolution.DAILY)
        self.register_indicator(goog2, self.cointegration, Resolution.DAILY)

        # Warm up for immediate usage of indicators.
        self.warm_up_indicator(goog1, self.cointegration, Resolution.DAILY)
        self.warm_up_indicator(goog2, self.cointegration, Resolution.DAILY)

    def on_updated(self, sender: object, point: IndicatorDataPoint) -> None:
        if sender.is_ready:
            holding = self.portfolio[self.cointegration.symbol1]

            # If the residual is lower than -2x SD, it means class A price is much higher than what it should be compared to class C.
            # We sell class A and buy class C to bet on their price convergence.
            if point.value < -2 and not holding.is_short:
                self.set_holdings(self.cointegration.symbol1, -0.5)
                self.set_holdings(self.cointegration.symbol2, 0.5 * self.cointegration.ratio)
            # If the residual is higher than the threshold, it means class A price is much lower than what it should be compared to class C.
            # We buy class A and sell class C to bet on their price convergence.
            elif point.value > 2 and not holding.is_long:
                self.set_holdings(self.cointegration.symbol1, 0.5)
                self.set_holdings(self.cointegration.symbol2, -0.5 * self.cointegration.ratio)
            # Close positions if the prices have converged.
            elif (holding.is_short and point.value > 0) or (holding.is_long and point.value < 0):
                self.liquidate()

class Cointegration(PythonIndicator):
    # The standard deviation of the residuals is such that the returned indicator value is relative to the SD.
    _residual_sd = 1e10
    # Store the coefficient and intercept of the cointegrated series for calculating the spread of a new data point.
    _coefficients = [0, 0]
    _windows = {}

    @property
    def ratio(self) -> float:
        return self._coefficients[0]

    @property
    def is_ready(self) -> bool:
        return all(window.is_ready for window in self._windows.values()) and all(x != 0 for x in self._coefficients)

    def __init__(self, algorithm: QCAlgorithm, symbol1: Symbol, symbol2: Symbol, period: int, recalibrating_date_rule: IDateRule) -> None:
        self.symbol1 = symbol1
        self.symbol2 = symbol2
        self.value = 0

        # Use rolling windows to save the price data for cointegration analysis.
        self._windows[symbol1] = RollingWindow[IndicatorDataPoint](period)
        self._windows[symbol2] = RollingWindow[IndicatorDataPoint](period)

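        # Adjust the cointegration factor between the 2 classes' price series on the recalibration schedule.
        algorithm.schedule.on(
            recalibrating_date_rule,
            algorithm.time_rules.at(23, 59),
            self.calculate_cointegration
        )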

        self.warm_up_period = period
    def update(self, input: BaseData) -> bool:
        # Update the rolling windows for cointegration analysis.
        window = self._windows.get(input.symbol)
        if window is None:
            raise Exception(f"{input.symbol} is not part of the Cointegration relation.")
        window.add(IndicatorDataPoint(input.symbol, input.end_time, input.value))

        if not all(x == 0 for x in self._coefficients):
            # Calculate the updated cointegrated series spread only if all symbol data points are updated.
            if all(window[0].end_time == input.end_time for window in self._windows.values()):
                self.value = (self._coefficients[0] * self._windows[self.symbol2][0].value + self._coefficients[1] - self._windows[self.symbol1][0].value) / self._residual_sd

        return self.is_ready

    def calculate_cointegration(self) -> None:
        # Lag direction is unimportant; it is just a sign flip in the linear regression, so we don't need to flip the window order.
        y = np.array([x.value for x in self._windows[self.symbol1]]).reshape(-1, 1)
        x = np.array([x.value for x in self._windows[self.symbol2]]).reshape(-1, 1)

        # Perform Linear Regression on both price series to investigate their relationship.
        lr = LinearRegression().fit(x, y)
        slope = float(lr.coef_[0])
        intercept = float(lr.intercept_)

        # Calculate the residuals series to check if it is stationary, meaning if the 2 price series move together.
        residuals = y - (intercept + slope * x)

        # Check if the residuals are stationary using the augmented Dickey-Fuller test.
        # This means no unit root exists for the difference series, and the residuals are stationary.
        critical = -1.941 + -0.2686 / self.warm_up_period + -3.365 / self.warm_up_period**2 + 31.223 / self.warm_up_period**3
        adf_reject = adfuller(residuals)[0] <= -3.45
        if adf_reject:
            # If cointegrated, update the positional sizing ratio and the spread threshold of the trade trigger.
            self._coefficients = [slope, intercept]
            self._residual_sd = float(np.std(residuals))
        else:
            self._coefficients = [0, 0]
            self._residual_sd = 100000000          # An arbitrarily large number that the class A price will never reach.
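
The value that the Cointegration indicator in Example 2 reports is the regression spread divided by the standard deviation of the residuals, and the trade rules compare it against plus or minus 2. The following plain-Python sketch shows that calculation in isolation. The normalized_spread and signal functions, the action labels, and the regression numbers are made up for illustration and are not part of the LEAN API.

```python
def normalized_spread(price1, price2, slope, intercept, residual_sd):
    """Fitted price of symbol 1 minus its observed price, in units of residual SD."""
    return (slope * price2 + intercept - price1) / residual_sd

def signal(spread, threshold=2.0):
    """Map the normalized spread to a hypothetical action label."""
    if spread < -threshold:
        # Symbol 1 trades above its fitted value: bet on it falling back.
        return "short symbol1, long symbol2"
    if spread > threshold:
        # Symbol 1 trades below its fitted value: bet on it rising back.
        return "long symbol1, short symbol2"
    return "hold"

# Made-up regression output: price1 ~ 1.0 * price2 + 0.5 with a residual SD of 0.25.
spread = normalized_spread(price1=101.5, price2=100.0, slope=1.0, intercept=0.5, residual_sd=0.25)
action = signal(spread)
```

Here symbol 1 trades 1.0 above its fitted value, which is 4 residual standard deviations, so the sketch labels it as a short on symbol 1 paired with a long on symbol 2.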


