# Overall Statistics
# https://quantpedia.com/Screener/Details/12
# https://www.quantconnect.com/tutorials/strategy-library/pairs-trading-with-stocks

import numpy as np
import pandas as pd
from scipy import stats
from math import floor
from datetime import timedelta
from collections import deque
import itertools as it
from decimal import Decimal

# ! 1. Should we use a rolling window to continuously update the model and the deviations?
# ! 2. Should we trade more than 20 stocks at a time?
# ! 3. Should we replace Pearson's correlation with another measure to find the most correlated stock pairs?
# ! 4. Should we expand to the **whole** universe instead of only looking at the stocks in the S&P 500?


class PairsTradingAlgorithm(QCAlgorithm):
    '''Distance-approach pairs trading on a fixed basket of tickers.

    On every sixth monthly rebalance, the four pairs with the smallest sum of
    squared deviations between their rebased price series are selected.
    A pair is opened as a dollar-matched long/short position when its raw
    price spread leaves the band ``mean +/- threshold * std``. The pair is
    closed once the spread is back inside that band.
    '''

    def Initialize(self):
        '''Configure the backtest, subscribe to the tickers and seed the price history.'''
        self.SetStartDate(2018,1,5)
        self.SetEndDate(2021,7,1)
        self.SetCash(100000)

        self.SetUniverseSelection(QC500UniverseSelectionModel())
        tickers = [
            'XLK', 'QQQ', 'BANC', 'BBVA', 'BBD', 'BCH', 'BLX', 'BSBR', 'BSAC', 'SAN',
            'CIB', 'BXS', 'BAC', 'BOH', 'BMO', 'BK', 'BNS', 'BKU', 'BBT','NBHC', 'OFG',
            'BFR', 'CM', 'COF', 'C', 'VLY', 'WFC', 'WAL', 'WBK','RBS', 'SHG', 'STT', 'STL',
            'SCNB', 'SMFG', 'STI',
            # 'DKT', 'DB', 'EVER', 'KB', 'KEY', , 'MTB', 'BMA', 'MFCB', 'MSL', 'MTU', 'MFG',
            # 'PVTD', 'PB', 'PFS', 'RF', 'RY', 'RBS', 'SHG', 'STT', 'STL', 'SCNB', 'SMFG', 'STI',
            # 'SNV', 'TCB', 'TD', 'USB', 'UBS', 'VLY', 'WFC', 'WAL', 'WBK', 'WF', 'YDKN', 'ZBK'
        ]
        self.threshold = 2              # entry band, in standard deviations of the spread
        self.symbols = [self.AddEquity(ticker, Resolution.Daily).Symbol for ticker in tickers]

        self.pairs = {}
        self.formation_period = 252     # number of daily closes kept per symbol

        # Seed one deque of closes per symbol. Symbols without a full formation
        # period of history are excluded from pairing.
        self.history_price = {}
        self.symbols_has_his = self.symbols.copy()
        for symbol in self.symbols:
            hist = self.History([symbol], self.formation_period+1, Resolution.Daily)
            if hist.empty:
                self.symbols_has_his.remove(symbol)
                continue
            closes = deque(maxlen=self.formation_period)
            for bar in hist.loc[str(symbol)].itertuples():
                closes.append(float(bar.close))
            if len(closes) < self.formation_period:
                self.symbols_has_his.remove(symbol)
            else:
                self.history_price[str(symbol)] = closes

        self.symbol_pairs = list(it.combinations(self.symbols_has_his, 2))
        # Add the benchmark
        self.AddEquity("SPY", Resolution.Daily)
        self.Schedule.On(self.DateRules.MonthStart("SPY"), self.TimeRules.AfterMarketOpen("SPY"), self.Rebalance)
        self.count = 0
        self.sorted_pairs = None


    def OnData(self, data):
        '''Record today's closes, then open or close positions for the selected pairs.'''
        # Update the price series every day
        for symbol in self.symbols_has_his:
            key = str(symbol)
            if data.Bars.ContainsKey(symbol) and key in self.history_price:
                self.history_price[key].append(float(data[symbol].Close))

        if self.sorted_pairs is None:
            return

        for sym_a, sym_b in self.sorted_pairs:
            # Raw price spread over the rolling formation window.
            spread = np.array(self.history_price[str(sym_a)]) - np.array(self.history_price[str(sym_b)])
            mean = np.mean(spread)
            std = np.std(spread)
            invested_a = self.Portfolio[sym_a].Invested
            invested_b = self.Portfolio[sym_b].Invested
            flat = not invested_a and not invested_b

            if spread[-1] > mean + self.threshold * std:
                # A is expensive relative to B: short A, long B.
                if flat:
                    self._open_pair(sym_a, sym_b, long_a=False)
            elif spread[-1] < mean - self.threshold * std:
                # A is cheap relative to B: long A, short B.
                if flat:
                    self._open_pair(sym_a, sym_b, long_a=True)
            elif invested_a and invested_b:
                # The spread has reverted inside the band: close both legs.
                self.Liquidate(sym_a)
                self.Liquidate(sym_b)


    def _open_pair(self, sym_a, sym_b, long_a):
        '''Open a dollar-matched pair position.

        Leg A is sized by CalculateOrderQuantity(A, 0.2). Leg B receives
        floor(quantity_A * price_A / price_B) shares on the opposite side,
        so both legs carry roughly the same notional value.
        '''
        price_b = self.Portfolio[sym_b].Price
        if price_b <= 0:
            return  # B has no valid price yet; the hedge leg cannot be sized
        ratio = self.Portfolio[sym_a].Price / price_b
        quantity_a = int(self.CalculateOrderQuantity(sym_a, 0.2))
        quantity_b = floor(ratio * quantity_a)
        if long_a:
            self.Sell(sym_b, quantity_b)
            self.Buy(sym_a, quantity_a)
        else:
            self.Sell(sym_a, quantity_a)
            self.Buy(sym_b, quantity_b)


    def Rebalance(self):
        '''Run monthly; on every sixth call, reselect the four closest pairs.'''
        if self.count % 6 == 0:
            distances = {
                pair: Pair(
                    pair[0], pair[1],
                    self.history_price[str(pair[0])],
                    self.history_price[str(pair[1])],
                ).distance()
                for pair in self.symbol_pairs
            }
            # Sort once, after all distances are known (smallest distance first).
            if distances:
                self.sorted_pairs = sorted(distances, key=distances.get)[:4]
        self.count += 1

class Pair:
    '''Two symbols plus their price histories, scored by rebased-price distance.'''

    def __init__(self, symbol_a, symbol_b, price_a, price_b):
        self.symbol_a, self.symbol_b = symbol_a, symbol_b
        self.price_a, self.price_b = price_a, price_b

    def distance(self):
        '''Return the sum of squared gaps between the two series.

        Each series is first rebased so that its first element equals 1.
        '''
        rebased_a = np.array(self.price_a) / self.price_a[0]
        rebased_b = np.array(self.price_b) / self.price_b[0]
        gap = rebased_a - rebased_b
        return sum(np.square(gap))
import numpy as np
import pandas as pd

class DistanceApproachHighVariancePair:
    '''Candidate pair that records the distance between two price series and the spread variance.

    Attributes:
        distance: sum of squared differences between ``price_a`` and ``price_b``.
        var: sample variance (n - 1 denominator) of the spread ``price_a - price_b``.
        hist_dev: sample standard deviation of that spread.
    '''
    threshold = None

    def __init__(
        self,
        symbol_a:str, symbol_b:str,
        price_a, price_b,
        threshold=1.5
        ):
        self.symbol_a, self.symbol_b = symbol_a, symbol_b
        self.price_a = np.array(price_a)
        self.price_b = np.array(price_b)
        self.threshold = threshold
        self.distance = self._distance()
        self.var = self._variance()
        self.hist_dev = self._dev()

    def _centred_spread_energy(self):
        '''Return (sum of squared deviations of the spread from its mean, number of points).'''
        gap = self.price_a - self.price_b
        centred = gap - np.mean(gap)
        return np.sum(np.square(centred)), len(gap)

    def _distance(self):
        # Sum of squared differences; the series are presumably normalised by the caller.
        gap = self.price_a - self.price_b
        return np.sum(np.square(gap))

    def _variance(self):
        energy, count = self._centred_spread_energy()
        return energy / (count - 1)

    def _dev(self):
        energy, count = self._centred_spread_energy()
        return np.sqrt(energy / (count - 1))
from AlgorithmImports import *
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel
from itertools import groupby
from math import ceil

class QC500UniverseSelectionModel(FundamentalUniverseSelectionModel):
    '''QC500 universe selection model for framework algorithms.

    Selects up to ``numOfUniverse`` liquid U.S. stocks, spread proportionally
    across industries. A new selection is made at most once per calendar month.
    For details: https://github.com/QuantConnect/Lean/pull/1663'''

    def __init__(self, filterFineData = True, universeSettings = None, numOfUniverse = 500):
        '''Create the model.

        Args:
            filterFineData: forwarded to FundamentalUniverseSelectionModel.
            universeSettings: forwarded to FundamentalUniverseSelectionModel.
            numOfUniverse: maximum number of symbols returned by fine selection.
        '''
        super().__init__(filterFineData, universeSettings)
        self.lastMonth = -1                      # month of the last successful fine selection
        self.dollarVolumeBySymbol = {}           # coarse-stage Symbol -> DollarVolume
        self.numberOfSymbolsCoarse = 1000
        self.numberOfSymbolsFine = numOfUniverse

    def SelectCoarse(self, algorithm, coarse):
        '''Coarse stage of QC500 selection.

        Keeps the ``numberOfSymbolsCoarse`` highest dollar-volume stocks that have
        fundamental data and a positive previous-day price and volume.
        Returns Universe.Unchanged if a selection already happened this month,
        or if nothing qualifies (selection is then retried on the next trading day).'''
        if algorithm.Time.month == self.lastMonth:
            return Universe.Unchanged

        liquid = [c for c in coarse if c.HasFundamentalData and c.Volume > 0 and c.Price > 0]
        liquid.sort(key=lambda c: c.DollarVolume, reverse=True)
        top = liquid[:self.numberOfSymbolsCoarse]

        self.dollarVolumeBySymbol = {c.Symbol: c.DollarVolume for c in top}

        # Nothing qualified: keep the current universe. lastMonth is untouched,
        # so selection is attempted again on the next trading day.
        if not self.dollarVolumeBySymbol:
            return Universe.Unchanged

        return list(self.dollarVolumeBySymbol.keys())

    def SelectFine(self, algorithm, fine):
        '''Fine stage of QC500 selection.

        Filters: headquartered in the USA, primary listing on NYSE or NASDAQ,
        more than 180 days since IPO, and market cap above 500 million.
        The survivors are grouped by industry template code. Each group keeps its
        top dollar-volume names in proportion to the group's size, and the union
        is capped at ``numberOfSymbolsFine``, ordered by dollar volume.'''
        def passes_filters(f):
            company = f.CompanyReference
            return (company.CountryId == "USA"
                    and company.PrimaryExchangeID in ["NYS","NAS"]
                    and (algorithm.Time - f.SecurityReference.IPODate).days > 180
                    and f.MarketCap > 5e8)

        def industry(f):
            return f.CompanyReference.IndustryTemplateCode

        def dollar_volume(f):
            return self.dollarVolumeBySymbol[f.Symbol]

        # groupby below needs its input pre-sorted by the same key.
        candidates = sorted((f for f in fine if passes_filters(f)), key=industry)

        # Nothing qualified: keep the current universe and retry on the next trading day.
        if not candidates:
            return Universe.Unchanged

        # Mark this month as done only after every QC500 check has passed.
        self.lastMonth = algorithm.Time.month

        share = self.numberOfSymbolsFine / len(candidates)
        chosen = []
        for _, members in groupby(candidates, key=industry):
            ranked = sorted(members, key=dollar_volume, reverse=True)
            chosen.extend(ranked[:ceil(len(ranked) * share)])

        chosen.sort(key=dollar_volume, reverse=True)
        return [f.Symbol for f in chosen[:self.numberOfSymbolsFine]]
from AlgorithmImports import *
import itertools as it
import numpy as np
import pandas as pd
from enum import Enum
from sklearn import linear_model


class PairStatus(Enum):
    '''Position state of a pair.

    Tracking it prevents signal spamming and helps detect bounces.'''
    Short = -1  # keep the short-spread position
    Flat = 0    # hold nothing / close the position
    Long = 1    # keep the long-spread position

class DistanceApproachPearsonCorrelationPair:
    '''Two price series scored by the Pearson correlation of their log returns.

    The score is stored in ``distance`` so this class exposes the same attribute
    as the distance-based pair classes. Note, however, that a HIGHER value means
    a MORE similar pair. Prices must be strictly positive, since they are logged.
    '''
    threshold = None

    def __init__(
        self,
        symbol_a:str, symbol_b:str,
        price_a, price_b,
        threshold=1.5
        ):
        self.symbol_a, self.symbol_b = symbol_a, symbol_b
        self.price_a = np.array(price_a)
        self.price_b = np.array(price_b)
        self.threshold = threshold
        self.distance = self._pearson_coef()

    def _pearson_coef(self):
        # Off-diagonal entry of the 2x2 correlation matrix of the log returns.
        log_ret_a = np.diff(np.log(self.price_a), axis=0).flatten()
        log_ret_b = np.diff(np.log(self.price_b), axis=0).flatten()
        matrix = np.corrcoef(log_ret_a, log_ret_b)
        return matrix[0][1]

class DistanceApproachPearsonCorrelationAlphaModel(AlphaModel):
    '''Pairs alpha driven by Pearson correlation and return divergence.

    Every call: append the latest daily close to a RollingWindow per tracked symbol.
    First call of each calendar month:
      1. pair every symbol with its (up to ``max_peers``) most correlated peers,
         using the correlation of log returns of raw closes;
      2. regress the symbol's daily log returns on the equal-weight log return
         of that peer portfolio to get ``beta``;
      3. score ``divergence = beta * peer_return - own_return`` on the latest bar;
      4. emit Up insights for the ``capacity`` lowest scores, Down insights for
         the ``capacity`` highest scores, and Flat insights for symbols that
         dropped out of both books.
    '''

    def __init__(
        self,
        capacity = 20,
        resolution = Resolution.Daily
    ):
        self.resolution = resolution
        self.symbol_pairs = []
        self.formation_period = 22 * 12   # number of daily closes kept per symbol
        self.threshold = 1.5
        self.historical_data = dict()     # Symbol -> RollingWindow[float] of closes ([0] = newest)
        self.invested_long_list = list()
        self.invested_short_list = list()
        self._changes = None
        self.capacity = capacity
        self.lastChange = None
        self.max_peers = 50               # size of each symbol's peer (benchmark) portfolio

    def _create_pairs(self, symbols):
        # Creating the pairs
        return list(it.combinations(symbols, 2))

    def Update(self, algorithm, data):
        '''Refresh price history on every call; rebuild signals once per calendar month.

        Returns:
            list of Insight objects. The list is empty except on the first call of each month.
        '''
        algorithm.Debug(f'Time is {algorithm.Time}')
        insights = []

        # Consume pending universe changes on every call, not only on the monthly
        # pass. Otherwise newly added symbols would be skipped by the daily price
        # update until the month changes.
        freshly_loaded = self._apply_security_changes(algorithm)
        self._append_latest_prices(algorithm, freshly_loaded)

        # We update data daily, but turn it into signals monthly
        if self.lastChange == algorithm.Time.month:
            return insights
        self.lastChange = algorithm.Time.month

        # Convert each window to an array once, rather than once per pair.
        prices = {symbol: self._window_to_array(window) for symbol, window in self.historical_data.items()}
        self.symbol_pairs = self._create_pairs(prices.keys())

        peer_table = self._build_peer_table(prices)
        divergence = self._compute_divergence(peer_table) if peer_table else {}
        return self._create_insights(algorithm, divergence)

    def _apply_security_changes(self, algorithm):
        '''Load history for added securities and drop the windows of removed, non-invested ones.

        Returns:
            set of symbols whose RollingWindow was created in this call. Their
            history is already current, so the daily append must skip them.
        '''
        changes, self._changes = self._changes, None
        freshly_loaded = set()
        if changes is None:
            return freshly_loaded

        for added in changes.AddedSecurities:
            symbol = added.Symbol
            if symbol in self.historical_data:
                continue
            hist = algorithm.History([symbol], self.formation_period, Resolution.Daily)
            if hist.empty:
                algorithm.Debug('ERROR: Hist is empty')
                continue
            window = RollingWindow[float](self.formation_period)
            for bar in hist.loc[symbol, :].itertuples():
                window.Add(bar.close)
            # Only track symbols with a complete formation period of history.
            if window.IsReady:
                self.historical_data[symbol] = window
                freshly_loaded.add(symbol)

        # Windows of still-invested symbols are kept so their prices keep updating.
        invested = {holding.Key for holding in algorithm.Portfolio if holding.Value.Invested}
        for removed in changes.RemovedSecurities:
            if removed.Symbol not in invested:
                self.historical_data.pop(removed.Symbol, None)
        return freshly_loaded

    def _append_latest_prices(self, algorithm, skip):
        '''Append the most recent daily close to every tracked window that is not in ``skip``.

        History is queried directly instead of using the ``data`` slice, because
        symbols that left the universe may still need updating.
        '''
        for symbol, window in self.historical_data.items():
            if symbol in skip:
                continue
            hist = algorithm.History([symbol], 5, Resolution.Daily)
            # History occasionally comes back without a 'close' column. In that
            # case, repeat the last known close so every window still advances one bar.
            if not hist.empty and 'close' in hist.columns:
                window.Add(float(hist['close'].iloc[-1]))
            else:
                window.Add(window[0])

    @staticmethod
    def _window_to_array(window):
        '''Copy a RollingWindow's values into a numpy array, in the window's own order.'''
        return np.array([value for value in window])

    def _build_peer_table(self, prices):
        '''Map each symbol to its (up to ``max_peers``) most correlated peers.

        The correlation is taken on log returns of RAW closes. Min-max-normalised
        prices cannot be used here: their minimum is 0 and log(0) is -inf, which
        would turn every coefficient into NaN. Pairs with a non-finite
        coefficient are skipped.
        '''
        scored = []
        for sym_a, sym_b in self.symbol_pairs:
            pair = DistanceApproachPearsonCorrelationPair(sym_a, sym_b, prices[sym_a], prices[sym_b])
            if np.isfinite(pair.distance):
                scored.append(pair)
        # Highest correlation first ('distance' holds the correlation coefficient).
        scored.sort(key=lambda pair: pair.distance, reverse=True)

        peer_table = dict()
        for pair in scored:
            for own, other in ((pair.symbol_a, pair.symbol_b), (pair.symbol_b, pair.symbol_a)):
                peers = peer_table.setdefault(own, [])
                if len(peers) < self.max_peers:
                    peers.append(other)
        return peer_table

    def _compute_divergence(self, peer_table):
        '''Return {symbol: beta * peer_return - own_return} for the latest bar.

        beta is the slope of an OLS regression (with intercept) of the symbol's
        daily log returns on the equal-weight log return of its peer portfolio.
        Symbols whose return history contains non-finite values are skipped.
        '''
        self.price_dataframe = pd.concat(
            [self._dataframelize(self.historical_data[symbol], symbol) for symbol in self.historical_data],
            axis=1
        )
        # Log returns for every column at once; row 0 is NaN (no previous close).
        log_returns = np.log(self.price_dataframe.pct_change() + 1)

        divergence = dict()
        for stock, peers in peer_table.items():
            stock_rtn = log_returns[stock]
            # Equal weight to construct this benchmark portfolio return
            benchmark_rtn = log_returns[peers].sum(axis=1) / len(peers)

            # A stock's return, Lret, deviates from its pairs portfolio return, Cret:
            # RetDiff = beta * (Cret - Rf) - (Lret - Rf)
            # Drop the FIRST row, which has no return.
            cret = benchmark_rtn.to_numpy()[1:].reshape(-1, 1)
            lret = stock_rtn.to_numpy()[1:]
            if not (np.all(np.isfinite(cret)) and np.all(np.isfinite(lret))):
                continue
            regr = linear_model.LinearRegression()
            regr.fit(cret, lret)
            beta = regr.coef_[0]
            divergence[stock] = beta * benchmark_rtn.iloc[-1] - stock_rtn.iloc[-1]
        return divergence

    def _create_insights(self, algorithm, divergence):
        '''Turn divergence scores into Flat/Down/Up insights and update the held books.

        NOTE(review): the lowest-divergence symbols go Up and the highest go Down.
        In the RetDiff formulation, a stock lagging its peers has a HIGH RetDiff.
        Confirm whether the intended trade is reversal (long high RetDiff) or the
        current direction.
        '''
        ranked = sorted(divergence, key=divergence.get)
        # Keep the long and short books disjoint when few symbols are scored.
        book_size = min(self.capacity, len(ranked) // 2)
        new_long_list = ranked[:book_size]
        new_short_list = ranked[len(ranked) - book_size:]

        # Flatten only symbols that left BOTH books. Symbols switching side
        # just receive their new direction.
        still_held = set(new_long_list) | set(new_short_list)
        previously_held = dict.fromkeys(self.invested_long_list + self.invested_short_list)
        flat_list = [s for s in previously_held if s not in still_held]
        self.invested_long_list = new_long_list
        self.invested_short_list = new_short_list

        insight_expiry = Expiry.EndOfDay(algorithm.Time)
        insights = []
        for symbols, direction in (
            (flat_list, InsightDirection.Flat),
            (self.invested_short_list, InsightDirection.Down),
            (self.invested_long_list, InsightDirection.Up),
        ):
            for symbol in symbols:
                insights.append(
                    Insight.Price(
                        str(symbol),
                        insight_expiry,
                        direction,
                        None, None, None, 0.05
                    )
                )
        return insights

    def OnSecuritiesChanged(self, algorithm, changes):
        # Stored and consumed by the next Update call.
        self._changes = changes

    def _normalize_data(self, series):
        '''Min-max scale a RollingWindow's values using the range of bars [1:].

        Bar [0] is excluded from the range, so it may fall outside [0, 1]. The
        scaled minimum is exactly 0, so the result must not be passed to log().
        '''
        arr = np.array([x for x in series.GetEnumerator()])
        lo = arr[1:].min()
        hi = arr[1:].max()
        return (arr - lo) / (hi - lo)

    def _dataframelize(self, price_rolling_window, symbol):
        '''Return a one-column DataFrame of the window's values in reversed window order (oldest first).'''
        values = [n for n in price_rolling_window]
        values.reverse()
        return pd.DataFrame(values, columns=[symbol])
from AlgorithmImports import *
import itertools as it
import numpy as np
from enum import Enum

# * 1. Variants of how to rank the distance + high variance pair
# * 1.1. Should we use IR factor score
# * 1.2. or use linear regression?

class PairStatus(Enum):
    '''State of a traded pair.

    Tracking it prevents repeated signals and aids bounce detection.'''
    Short = -1  # remain short the spread
    Flat = 0    # no position / close out
    Long = 1    # remain long the spread

class DistanceApproachHighVariancePair:
    '''Candidate pair scored by distance, spread variance and current spread.

    Attributes:
        distance: sum of squared differences between ``price_a`` and ``price_b``.
        var: sample variance (n - 1 denominator) of the spread ``price_a - price_b``.
        hist_dev: sample standard deviation of that spread.
        current_spread: ``price_a[0] - price_b[0]``. In the alpha model these
            arrays come from RollingWindows, whose index 0 is the newest bar.
    '''
    threshold = None

    def __init__(
        self,
        symbol_a:str, symbol_b:str,
        price_a, price_b,
        threshold=1.5
        ):
        self.symbol_a, self.symbol_b = symbol_a, symbol_b
        self.price_a = np.array(price_a)
        self.price_b = np.array(price_b)
        self.threshold = threshold
        self.distance = self._distance()
        self.var = self._variance()
        self.hist_dev = self._dev()
        self.current_spread = self._spread()

    def _squared_deviation_total(self):
        '''Return (sum of squared deviations of the spread from its mean, number of points).'''
        gap = self.price_a - self.price_b
        centred = gap - np.mean(gap)
        return np.sum(np.square(centred)), len(gap)

    def _distance(self):
        # Sum of squared differences between the (normalised) series.
        gap = self.price_a - self.price_b
        return np.sum(np.square(gap))

    def _variance(self):
        total, count = self._squared_deviation_total()
        return total / (count - 1)

    def _dev(self):
        total, count = self._squared_deviation_total()
        return np.sqrt(total / (count - 1))

    def _spread(self):
        return self.price_a[0] - self.price_b[0]

class DistanceApproachHighVarianceAlphaModel(AlphaModel):
    def __init__(
        self,
        capacity = 20,
        resolution = Resolution.Daily
    ):
        '''Set up the configuration and tracking state for the distance/high-variance pairs alpha.

        Args:
            capacity: maximum number of pairs held at once (checked against
                ``len(self.invested_pairs)`` before opening a new pair).
            resolution: data resolution, stored on the instance.
        '''
        # Configuration
        self.capacity = capacity
        self.resolution = resolution
        self.formation_period = 22 * 12      # bars of daily history loaded per symbol
        self.threshold = 1.5                 # entry band, in multiples of the spread's historical std-dev
        self.max_pair_percentage = 0.3       # fraction of closest pairs (by distance) kept as candidates
        # Runtime state
        self.symbol_pairs = []
        self.historical_data = {}
        self.invested_pairs = {}
        self._changes = None

    def _create_pairs(self, symbols):
        '''Return every unordered 2-combination of ``symbols``, in itertools order, as a list.'''
        return [combo for combo in it.combinations(symbols, 2)]

    def Update(self, algorithm, data):
        # * Example of launching the update on the exact date time
        # if algorithm.Time.hour == 9 and algorithm.Time.minute == 31:
        #     return [Insight.Price("SPY", timedelta(minutes = 20), InsightDirection.Up, None, None, None, 0.5)]
        # return []

        algorithm.Debug(f'Time is {algorithm.Time}')

        insights = []

        if self._changes is not None:
            for added in self._changes.AddedSecurities:
                # * Dealing with new added securities, need to
                # * 1. Create the RollingWindow
                # * 2. Add the data into the RollingWindow (Up to today)

                if added.Symbol not in self.historical_data.keys():
                    hist = algorithm.History(
                        [added.Symbol],
                        self.formation_period,
                        Resolution.Daily
                    )
                    if not hist.empty:
                        self.historical_data[added.Symbol] = RollingWindow[float](self.formation_period)
                        for bar in hist.loc[added.Symbol, :].itertuples():
                            self.historical_data[added.Symbol].Add(bar.close)
                        if not self.historical_data[added.Symbol].IsReady:
                            del self.historical_data[added.Symbol]
                        # algorithm.Debug(f'Historical data of {added.Symbol} at {algorithm.Time} - {hist["close"][-1]}: {self.historical_data[added.Symbol][0]}')
                    else:
                        algorithm.Debug('ERROR: Hist is empty')

            for removed in self._changes.RemovedSecurities:
                # * Dealing with securities removed, need to
                # * 1. Check if the removed securities are in the invested list
                # * 2. If yes, keep add the latest data into the RollingWindow
                # * 3. If not, then we remove this RollingWindow
                invested_symbol_list = [s.Key for s in algorithm.Portfolio if s.Value.Invested]
                if removed.Symbol not in invested_symbol_list and removed.Symbol in self.historical_data.keys():
                    # algorithm.Debug(f'Remove {removed.Symbol} from historical_data')
                    del self.historical_data[removed.Symbol]

        for s in self.historical_data.keys():
            # * Dealing with already existed securities, need to
            # * 1. Add the latest data into the RollingWindow
            # ! Don't use the 'data' as sometimes we have securities not in the universe need to be processed.
            if self._changes is not None and s in [sec.Symbol for sec in self._changes.AddedSecurities]:
                continue

            hist = algorithm.History(
                [s],
                5,
                Resolution.Daily
            )

            # Somehow there are 'close' missing in the returned history.
            # Therefore adding this logic to prevent this scenario crash
            if 'close' in hist.columns:
                self.historical_data[s].Add(hist['close'][-1])
            else:
                self.historical_data[s].Add(self.historical_data[s][0])

            # algorithm.Debug(f'{str(s)} - Current price {hist["close"][-1]}')
            # algorithm.Debug(self.historical_data[s][0])
            # algorithm.Debug(self.historical_data[s][1])
            # algorithm.Debug(self.historical_data[s][2])

        self.symbol_pairs = self._create_pairs(self.historical_data.keys())
        # algorithm.Debug(f'Length of pairs: {len(self.symbol_pairs)}')

        # Calculate distance and then sort by distance
        distances = {}
        for i in self.symbol_pairs:
            distances[i] = DistanceApproachHighVariancePair(
                i[0],
                i[1],
                self._normalize_data(self.historical_data[i[0]]),
                self._normalize_data(self.historical_data[i[1]])
            )
            # algorithm.Debug(f'Spread of {str(i[0])} - {str(i[1])}: {distances[i].current_spread}')

        sorted_pairs = {k:v for k, v in sorted(
            distances.items(),
            key = lambda x: (x[1].distance)
        )[:int(len(distances) * self.max_pair_percentage)]}

        sorted_pairs = {k:v for k, v in sorted(
            sorted_pairs.items(),
            key = lambda x: x[1].var,
            reverse = True
        )}

        # Cleaning the pair and Create insights
        insightExpiry = Expiry.EndOfDay(algorithm.Time)
        for k, v in sorted_pairs.items():

            invested_stock_symbols = {paired_key[0] for paired_key in self.invested_pairs.keys()} | {paired_key[1] for paired_key in self.invested_pairs.keys()}

            # Taking care of the delisting scenario
            if data.Delistings.ContainsKey(str(k[0])) or data.Delistings.ContainsKey(str(k[1])):
                self.invested_pairs[k] = PairStatus.Flat
                insights.append(
                    Insight.Price(
                        str(k[0]),
                        insightExpiry,
                        InsightDirection.Flat,
                        None, None, None, 0.05    # Weight
                    )
                )
                insights.append(
                    Insight.Price(
                        str(k[1]),
                        insightExpiry,
                        InsightDirection.Flat,
                        None, None, None, 0.05    # Weight
                    ),
                )
                # algorithm.Debug(f'[{str(k[0])}-{str(k[1])}]: Long->Close ({len(insights)})')
                del self.invested_pairs[k]
                del self.historical_data[k[0]]
                del self.historical_data[k[1]]
                continue

            if k[0] not in invested_stock_symbols and k[1] not in invested_stock_symbols:
                if len(self.invested_pairs) >= self.capacity:
                    # algorithm.Debug(f'Invested_pairs length = {len(self.invested_pairs)}')
                    continue
                if v.current_spread >= self.threshold * v.hist_dev:
                    self.invested_pairs[k] = PairStatus.Short

                    insights.append(
                        Insight.Price(
                            str(k[0]),
                            # timedelta(days=1),
                            insightExpiry,
                            InsightDirection.Down,
                            None, None, None, 0.05    # Weight
                        )
                    )
                    insights.append(
                        Insight.Price(
                            str(k[1]),
                            insightExpiry,
                            InsightDirection.Up,
                            None, None, None, 0.05    # Weight
                        )
                    )
                    # algorithm.Debug(f'[{str(k[0])}-{str(k[1])}]: 0->Short ({len(insights)})')
                elif v.current_spread <= -(self.threshold * v.hist_dev):
                    self.invested_pairs[k] = PairStatus.Long
                    insights.append(
                        Insight.Price(
                            str(k[0]),
                            insightExpiry,
                            InsightDirection.Up,
                            None, None, None, 0.05    # Weight
                        )
                    )
                    insights.append(
                        Insight.Price(
                            str(k[1]),
                            insightExpiry,
                            InsightDirection.Down,
                            None, None, None, 0.05    # Weight
                        )
                    )
                    # algorithm.Debug(f'[{str(k[0])}-{str(k[1])}]: 0->Long ({len(insights)})')
                else:
                    pass
            else:
                if k not in self.invested_pairs.keys():
                    continue

                if self.invested_pairs[k] == PairStatus.Long:
                    if v.current_spread < 0:
                        # self.invested_pairs[k] = PairStatus.Long
                        # insights.append(
                        #     Insight.Price(
                        #         str(k[0]),
                        #         insightExpiry,
                        #         InsightDirection.Up,
                        #         None, None, None, 0.05    # Weight
                        #     )
                        # )
                        # insights.append(
                        #     Insight.Price(
                        #         str(k[1]),
                        #         insightExpiry,
                        #         InsightDirection.Down,
                        #         None, None, None, 0.05    # Weight
                        #     ),
                        # )
                        # algorithm.Debug(f'[{str(k[0])}-{str(k[1])}]: Long->Long ({len(insights)})')
                        pass
                    # elif v.current_spread >= self.threshold * v.hist_dev:
                    #     self.invested_pairs[k] = PairStatus.Short
                    #     insights.append(
                    #         Insight.Price(
                    #             str(k[0]),
                    #             insightExpiry,
                    #             InsightDirection.Down,
                    #             None, None, None, 0.05    # Weight
                    #         )
                    #     )
                    #     insights.append(
                    #         Insight.Price(
                    #             str(k[1]),
                    #             insightExpiry,
                    #             InsightDirection.Up,
                    #             None, None, None, 0.05    # Weight
                    #         ),
                    #     )
                    #     # algorithm.Debug(f'[{str(k[0])}-{str(k[1])}]: Long->Short ({len(insights)})')
                    else:
                        self.invested_pairs[k] = PairStatus.Flat
                        insights.append(
                            Insight.Price(
                                str(k[0]),
                                insightExpiry,
                                InsightDirection.Flat,
                                None, None, None, 0.05    # Weight
                            )
                        )
                        insights.append(
                            Insight.Price(
                                str(k[1]),
                                insightExpiry,
                                InsightDirection.Flat,
                                None, None, None, 0.05    # Weight
                            ),
                        )
                        # algorithm.Debug(f'[{str(k[0])}-{str(k[1])}]: Long->Close ({len(insights)})')
                        del self.invested_pairs[k]
                        del self.historical_data[k[0]]
                        del self.historical_data[k[1]]
                elif self.invested_pairs[k] == PairStatus.Short:
                    if v.current_spread > 0:
                        # self.invested_pairs[k] = PairStatus.Short
                        # insights.append(
                        #     Insight.Price(
                        #         str(k[0]),
                        #         insightExpiry,
                        #         InsightDirection.Down,
                        #         None, None, None, 0.05    # Weight
                        #     )
                        # )
                        # insights.append(
                        #     Insight.Price(
                        #         str(k[1]),
                        #         insightExpiry,
                        #         InsightDirection.Up,
                        #         None, None, None, 0.05    # Weight
                        #     ),
                        # )
                        # algorithm.Debug(f'[{str(k[0])}-{str(k[1])}]: Short->Short ({len(insights)})')
                        pass
                    # elif v.current_spread <= -(self.threshold * v.hist_dev):
                    #     self.invested_pairs[k] = PairStatus.Long
                    #     insights.append(
                    #         Insight.Price(
                    #             str(k[0]),
                    #             insightExpiry,
                    #             InsightDirection.Up,
                    #             None, None, None, 0.05    # Weight
                    #         )
                    #     )
                    #     insights.append(
                    #         Insight.Price(
                    #             str(k[1]),
                    #             insightExpiry,
                    #             InsightDirection.Down,
                    #             None, None, None, 0.05    # Weight
                    #         ),
                    #     )
                    #     # algorithm.Debug(f'[{str(k[0])}-{str(k[1])}]: Short->Long ({len(insights)})')
                    else:
                        self.invested_pairs[k] = PairStatus.Flat
                        insights.append(
                            Insight.Price(
                                str(k[0]),
                                insightExpiry,
                                InsightDirection.Flat,
                                None, None, None, 0.05    # Weight
                            )
                        )
                        insights.append(
                            Insight.Price(
                                str(k[1]),
                                insightExpiry,
                                InsightDirection.Flat,
                                None, None, None, 0.05    # Weight
                            ),
                        )
                        # algorithm.Debug(f'[{str(k[0])}-{str(k[1])}]: Short->Close ({len(insights)})')
                        del self.invested_pairs[k]
                        del self.historical_data[k[0]]
                        del self.historical_data[k[1]]

            # algorithm.Debug(f'{str(k[0])}/{str(k[1])}({v.distance}): {v.current_spread} - {self.threshold * v.hist_dev}')

        # Reset the changes
        self._changes = None

        # algorithm.Debug(f'Sent out {len(insights)} on {algorithm.Time}')
        return insights

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Stash the universe changes so the next Update call can process them.

        Only the latest SecurityChanges object is remembered; Update resets
        the stored value to None once it has run.
        '''
        # `algorithm` is part of the framework signature but is not needed here.
        self._changes = changes

    def _normalize_data(self, series):
        # Here we only need data from [1:] days, and [0] is only needed when we calculate the spread
        arr = np.array([x for x in series.GetEnumerator()])
        max = arr[1:].max()
        min = arr[1:].min()
        return (arr - min) / (max - min)
from AlgorithmImports import *
import itertools as it
import numpy as np
from enum import Enum

# * We never flip a pair directly from Long to Short or from Short to Long.
# * Instead, the existing positions are closed; any new position has to come from a later entry signal.

class PairStatus(Enum):
    '''Position state of a traded pair.

    Remembering the state stops the alpha model from re-emitting the same
    signal every bar and lets it recognise when an open pair should be closed.
    '''
    # Short the first symbol of the pair, long the second.
    Short = -1
    # No position held / position being closed.
    Flat = 0
    # Long the first symbol of the pair, short the second.
    Long = 1

class DistanceApproachPair:
    '''Distance-approach statistics for one pair of (normalized) price series.

    Both series are indexed the same way: element 0 is the observation being
    traded and elements 1: make up the formation window.

    Attributes:
        threshold: entry threshold passed by the caller (stored, not used here)
        symbol_a, symbol_b: identifiers of the two legs
        price_a, price_b: the two series as numpy arrays
        distance: sum of squared differences over the formation window
        hist_dev: sample standard deviation (n - 1 denominator) of the
            formation-window spread
        current_spread: price_a[0] - price_b[0]
    '''

    def __init__(self, symbol_a, symbol_b, price_a, price_b, threshold=1.5):
        self.threshold = threshold
        self.symbol_a = symbol_a
        self.symbol_b = symbol_b
        # Stored as numpy arrays so list input supports element-wise arithmetic.
        self.price_a, self.price_b = np.array(price_a), np.array(price_b)
        # Each statistic reads the arrays assigned above.
        self.distance = self._distance()
        self.hist_dev = self._dev()
        self.current_spread = self._spread()

    def _formation_gap(self):
        '''Element-wise spread over the formation window (every element but index 0).'''
        return self.price_a[1:] - self.price_b[1:]

    def _distance(self):
        '''Sum of squared deviations between the two series over the formation window.'''
        gap = self._formation_gap()
        return np.sum(gap ** 2)

    def _dev(self):
        '''Sample standard deviation of the formation-window spread.'''
        gap = self._formation_gap()
        centered = gap - gap.mean()
        return np.sqrt((centered ** 2).sum() / (len(gap) - 1))

    def _spread(self):
        '''Spread between the two series at index 0.'''
        return self.price_a[0] - self.price_b[0]

class BasicDistanceApproachAlphaModel(AlphaModel):
    '''Distance-approach pairs-trading alpha model.

    On every Update it:
      1. keeps a RollingWindow of daily closes (newest at index 0) for each
         tracked symbol, seeding newly added universe members from History;
      2. closes any open pair with a leg listed in the slice's Delistings;
      3. min-max normalizes every window over its older bars, forms all
         symbol pairs and ranks them by distance (sum of squared differences
         of the normalized series, excluding the newest bar);
      4. walks the ranked pairs: an open Long pair is closed once its spread
         is no longer < 0, an open Short pair once it is no longer > 0; a new
         pair is opened Short (leg 0 down / leg 1 up) when today's spread is
         >= threshold * hist_dev, or Long (leg 0 up / leg 1 down) when it is
         <= -threshold * hist_dev.
    A symbol belongs to at most one open pair and at most ``capacity`` pairs
    are open at once. When a pair is closed both legs' windows are discarded.
    '''

    def __init__(
        self,
        capacity = 10,
        resolution = Resolution.Daily
    ):
        self.resolution = resolution
        self.symbol_pairs = []
        # Formation window length in bars: 22 * 12.
        self.formation_period = 22 * 12
        # Entry trigger, in multiples of the pair's historical spread deviation.
        self.threshold = 1.5
        # Symbol -> RollingWindow[float] of daily closes (index 0 = newest).
        self.historical_data = dict()
        # (Symbol, Symbol) -> PairStatus for pairs that currently hold positions.
        self.invested_pairs = dict()
        # Latest SecurityChanges not yet consumed by Update.
        self._changes = None
        # Maximum number of simultaneously open pairs.
        self.capacity = capacity

    def _create_pairs(self, symbols):
        '''Return every unordered pair of the given symbols.'''
        return list(it.combinations(symbols, 2))

    def Update(self, algorithm, data):
        '''Refresh the price windows and return this bar's entry/exit insights.

        Args:
            algorithm: the running algorithm (History, Portfolio, Time, Debug)
            data: the current Slice; only ``data.Delistings`` is read
        Returns:
            list of Insight objects, two per pair opened or closed
        '''
        algorithm.Debug(f'Time is {algorithm.Time}')

        insights = []
        insightExpiry = Expiry.EndOfDay(algorithm.Time)

        seeded = set()
        if self._changes is not None:
            seeded = self._seed_added_securities(algorithm, self._changes.AddedSecurities)
            self._drop_removed_securities(algorithm, self._changes.RemovedSecurities)
            # Consume the changes so they are processed only once.
            self._changes = None

        self._append_latest_prices(algorithm, seeded)
        insights.extend(self._close_delisted_pairs(data, insightExpiry))
        insights.extend(self._trade_ranked_pairs(self._rank_pairs(), insightExpiry))
        return insights

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Remember the universe changes; they are consumed by the next Update.'''
        self._changes = changes

    def _normalize_data(self, series):
        '''Min-max scale a window using the range of its older bars.

        Element 0 (the newest bar) is scaled with the same min/max but is
        excluded when computing them, so it may fall outside [0, 1]. When all
        older bars are equal there is no range, and an all-zero array is
        returned instead of dividing by zero (NaN/inf would make the distance
        ranking ill-defined).
        '''
        arr = np.array([x for x in series.GetEnumerator()], dtype=float)
        formation = arr[1:]
        low = formation.min()
        span = formation.max() - low
        if span == 0:
            return np.zeros_like(arr)
        return (arr - low) / span

    def _seed_added_securities(self, algorithm, added_securities):
        '''Load a full formation window for each newly added, untracked symbol.

        Returns the set of symbols seeded here. Their windows were just filled
        from History, so they must not get another price appended this Update.
        Symbols whose history cannot fill the window are not tracked.
        '''
        seeded = set()
        for added in added_securities:
            symbol = added.Symbol
            if symbol in self.historical_data:
                continue
            hist = algorithm.History([symbol], self.formation_period, Resolution.Daily)
            if hist.empty:
                algorithm.Debug('ERROR: Hist is empty')
                continue
            window = RollingWindow[float](self.formation_period)
            for bar in hist.loc[symbol, :].itertuples():
                window.Add(bar.close)
            if window.IsReady:
                self.historical_data[symbol] = window
                seeded.add(symbol)
        return seeded

    def _drop_removed_securities(self, algorithm, removed_securities):
        '''Stop tracking removed symbols unless a holding or open pair still uses them.'''
        keep = {holding.Key for holding in algorithm.Portfolio if holding.Value.Invested}
        # Legs of open pairs are kept even if their entry orders have not filled
        # yet; without a window the pair could never be evaluated for an exit.
        keep |= self._invested_symbols()
        for removed in removed_securities:
            if removed.Symbol not in keep:
                self.historical_data.pop(removed.Symbol, None)

    def _append_latest_prices(self, algorithm, skip):
        '''Append the latest daily close to every tracked window not in ``skip``.

        History is used instead of the Slice because symbols kept only for an
        open position may no longer be in the universe. When History returns
        no close, the previous close is repeated so the window still advances.
        '''
        for symbol, window in self.historical_data.items():
            if symbol in skip:
                continue
            hist = algorithm.History([symbol], 5, Resolution.Daily)
            if not hist.empty and 'close' in hist.columns:
                window.Add(hist['close'].iloc[-1])
            else:
                window.Add(window[0])

    def _close_delisted_pairs(self, data, insightExpiry):
        '''Flatten open pairs with a delisted leg and stop tracking delisted symbols.'''
        insights = []
        delisted = [s for s in self.historical_data if data.Delistings.ContainsKey(s)]
        for symbol in delisted:
            pair = next((p for p in self.invested_pairs if symbol in p), None)
            if pair is not None:
                insights.extend(self._close_pair(pair, insightExpiry))
            # No-op if closing the pair already discarded this window.
            self.historical_data.pop(symbol, None)
        return insights

    def _rank_pairs(self):
        '''Return (pair, DistanceApproachPair) tuples sorted by ascending distance.'''
        # Normalize each window once rather than once per pair it belongs to.
        normalized = {s: self._normalize_data(w) for s, w in self.historical_data.items()}
        self.symbol_pairs = self._create_pairs(normalized.keys())
        scored = [
            (pair, DistanceApproachPair(pair[0], pair[1], normalized[pair[0]], normalized[pair[1]]))
            for pair in self.symbol_pairs
        ]
        # sorted() is stable: equal distances keep combination order.
        return sorted(scored, key=lambda item: item[1].distance)

    def _trade_ranked_pairs(self, ranked, insightExpiry):
        '''Exit open pairs and open new ones, visiting pairs best-ranked first.'''
        insights = []
        invested_symbols = self._invested_symbols()
        # Legs of pairs closed during this pass. Their windows were discarded,
        # so a pair opened on them now could never be evaluated (or closed) later.
        discarded = set()

        for pair, stats in ranked:
            if pair[0] in discarded or pair[1] in discarded:
                continue

            if pair in self.invested_pairs:
                if self._should_exit(self.invested_pairs[pair], stats.current_spread):
                    insights.extend(self._close_pair(pair, insightExpiry))
                    invested_symbols.difference_update(pair)
                    discarded.update(pair)
                continue

            # A symbol may belong to at most one open pair.
            if pair[0] in invested_symbols or pair[1] in invested_symbols:
                continue
            if len(self.invested_pairs) >= self.capacity:
                continue

            bound = self.threshold * stats.hist_dev
            if stats.current_spread >= bound:
                status = PairStatus.Short
                directions = (InsightDirection.Down, InsightDirection.Up)
            elif stats.current_spread <= -bound:
                status = PairStatus.Long
                directions = (InsightDirection.Up, InsightDirection.Down)
            else:
                continue

            self.invested_pairs[pair] = status
            invested_symbols.update(pair)
            insights.extend(self._pair_insights(pair, insightExpiry, *directions))
        return insights

    @staticmethod
    def _should_exit(status, spread):
        '''True when an open pair's spread has reached or crossed zero.'''
        # Written as negations so a NaN spread also triggers an exit.
        if status == PairStatus.Long:
            return not spread < 0
        if status == PairStatus.Short:
            return not spread > 0
        return False

    def _close_pair(self, pair, insightExpiry):
        '''Flatten both legs of an open pair, forget it and discard both windows.'''
        del self.invested_pairs[pair]
        # NOTE(review): discarding the windows means these symbols are not
        # considered again until the universe re-adds them; confirm intended.
        for symbol in pair:
            self.historical_data.pop(symbol, None)
        return self._pair_insights(pair, insightExpiry, InsightDirection.Flat, InsightDirection.Flat)

    @staticmethod
    def _pair_insights(pair, insightExpiry, direction_a, direction_b):
        '''Build the two price insights (weight 0.05 each) for the legs of ``pair``.'''
        return [
            Insight.Price(str(pair[0]), insightExpiry, direction_a, None, None, None, 0.05),
            Insight.Price(str(pair[1]), insightExpiry, direction_b, None, None, None, 0.05),
        ]

    def _invested_symbols(self):
        '''Set of every symbol that is a leg of an open pair.'''
        return {symbol for pair in self.invested_pairs for symbol in pair}
# from clr import AddReference
# AddReference("QuantConnect.Common")
# AddReference("QuantConnect.Algorithm.Framework")

from QuantConnect import Resolution, Extensions
from QuantConnect.Algorithm.Framework.Alphas import *
from QuantConnect.Algorithm.Framework.Portfolio import *
from itertools import groupby
from datetime import datetime, timedelta
from pytz import utc
# Earliest timezone-aware UTC datetime (year 1, Jan 1, midnight); used as the
# initial "never" value for expiry and rebalancing times.
UTCMIN = datetime(1, 1, 1, tzinfo=utc)

class CustomEqualWeightingPortfolioConstructionModel1(PortfolioConstructionModel):

    '''
    Description:
        Provide a custom implementation of IPortfolioConstructionModel that gives equal weighting to all active securities
    Details:
        - The target percent holdings of each security is 1/N where N is the number of securities with active Up/Down insights
        - For InsightDirection.Up, long targets are returned
        - For InsightDirection.Down, short targets are returned
        - For InsightDirection.Flat, closing position targets are returned
        - Securities removed from the universe and expired insights are NOT flattened by this model;
          positions are only closed through explicit Flat insights
    '''

    def __init__(self, rebalancingParam = False):

        '''
        Description:
            Initialize a new instance of CustomEqualWeightingPortfolioConstructionModel
        Args:
            rebalancingParam: Integer indicating the number of days for rebalancing (default set to False, no rebalance)
                - Independent of this parameter, the portfolio will be rebalanced when a security is added/removed/changed direction
        '''

        self.insightCollection = InsightCollection()
        self.removedSymbols = []
        self.nextExpiryTime = UTCMIN
        self.rebalancingTime = UTCMIN

        # if the rebalancing parameter is not False but a positive integer
        # convert rebalancingParam to timedelta and create rebalancingFunc
        if rebalancingParam > 0:
            self.rebalancing = True
            rebalancingParam = timedelta(days = rebalancingParam)
            self.rebalancingFunc = lambda dt: dt + rebalancingParam
        else:
            self.rebalancing = rebalancingParam

    def CreateTargets(self, algorithm, insights):

        '''
        Description:
            Create portfolio targets from the specified insights
        Args:
            algorithm: The algorithm instance
            insights: The insights to create portfolio targets from
        Returns:
            An enumerable of portfolio targets to be sent to the execution model
        '''

        targets = []

        # Skip work when nothing new arrived and no insight has expired.
        # NOTE(review): removedSymbols is always a list here (the universe-deselection
        # flattening that used to reset it to None is disabled), so this early exit
        # never fires and targets are re-evaluated on every call.
        if (len(insights) == 0 and algorithm.UtcTime <= self.nextExpiryTime and self.removedSymbols is None):
            return targets

        # add the new insights to our insight collection
        for insight in insights:
            self.insightCollection.Add(insight)

        # insights that haven't expired, reduced to the newest one per symbol
        activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime)
        lastActiveInsights = self._latest_insight_per_symbol(activeInsights)

        # determine target percent for the given insights (check function DetermineTargetPercent for details)
        percents = self.DetermineTargetPercent(lastActiveInsights)

        # check if we actually want to create new targets for the securities (check function ShouldCreateTargets for details)
        if self.ShouldCreateTargets(algorithm, lastActiveInsights):
            for insight in lastActiveInsights:
                target = PortfolioTarget.Percent(algorithm, insight.Symbol, percents[insight])
                if target is not None:
                    targets.append(target)

            # update rebalancing time
            if self.rebalancing:
                self.rebalancingTime = self.rebalancingFunc(algorithm.UtcTime)

        # here we update the next expiry date in the insight collection
        self.nextExpiryTime = self.insightCollection.GetNextExpiryTime()
        if self.nextExpiryTime is None:
            self.nextExpiryTime = UTCMIN

        return targets

    @staticmethod
    def _latest_insight_per_symbol(insights):
        '''
        Description:
            Return the most recently generated insight for each symbol, in first-seen symbol order
        Details:
            - Unlike itertools.groupby, does not require the insights to be pre-grouped by symbol,
              so a symbol can never yield more than one insight
            - Ties on GeneratedTimeUtc resolve to the insight seen last (same as a stable sort's [-1])
        '''
        latest = {}
        for insight in insights:
            current = latest.get(insight.Symbol)
            if current is None or not insight.GeneratedTimeUtc < current.GeneratedTimeUtc:
                latest[insight.Symbol] = insight
        return list(latest.values())

    def DetermineTargetPercent(self, lastActiveInsights):

        '''
        Description:
            Determine the target percent from each insight
        Args:
            lastActiveInsights: The active insights to generate a target from
        '''

        result = {}

        # give equal weighting to each security
        count = sum(x.Direction != InsightDirection.Flat for x in lastActiveInsights)
        percent = 0 if count == 0 else 1.0 / count

        for insight in lastActiveInsights:
            result[insight] = insight.Direction * percent

        return result

    def ShouldCreateTargets(self, algorithm, lastActiveInsights):

        '''
        Description:
            Determine whether we should rebalance the portfolio to keep equal weighting when:
                - It is time to rebalance regardless
                - We want to include some new security in the portfolio
                - We want to modify the direction of some existing security
        Args:
            lastActiveInsights: The last active insights to check
        '''

        # it is time to rebalance
        if self.rebalancing and algorithm.UtcTime >= self.rebalancingTime:
            return True

        for insight in lastActiveInsights:
            holding = algorithm.Portfolio[insight.Symbol]
            # if there is an insight for a new security that's not invested, then rebalance
            if not holding.Invested and insight.Direction != InsightDirection.Flat:
                return True
            # if there is an insight to close a long position, then rebalance
            if holding.IsLong and insight.Direction != InsightDirection.Up:
                return True
            # if there is an insight to close a short position, then rebalance
            if holding.IsShort and insight.Direction != InsightDirection.Down:
                return True

        return False

    def OnSecuritiesChanged(self, algorithm, changes):

        '''
        Description:
            Event fired each time the we add/remove securities from the data feed
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm
        '''

        # get removed symbol and invalidate them in the insight collection
        self.removedSymbols = [x.Symbol for x in changes.RemovedSecurities]
        self.insightCollection.Clear(self.removedSymbols)
from SP500Universe import QC500UniverseSelectionModel
from BasicDistanceApproachAlphaModel import BasicDistanceApproachAlphaModel
from DistanceApproachZeroCrossingAlphaModel import DistanceApproachZeroCrossingAlphaModel
from DistanceApproachHighVarianceAlphaModel import DistanceApproachHighVarianceAlphaModel
from DistanceApproachPearsonCorrelationAlphaModel import DistanceApproachPearsonCorrelationAlphaModel
from DistanceApproachPearsonCorrelationAlphaModel2 import DistanceApproachPearsonCorrelationAlphaModel2
from CustomEqualWeightingPortfolioConstructionModel1 import CustomEqualWeightingPortfolioConstructionModel1
from CustomEqualWeightingPortfolioConstructionModel2 import CustomEqualWeightingPortfolioConstructionModel2, ConstructionMethod

class PairsTradingAlgorithm(QCAlgorithm):
    '''
    Backtest driver: wires a universe selection model, a pairs-trading alpha model
    and an equal-weighting portfolio construction model together, and charts cash,
    portfolio value and the number of held positions once per day.
    '''

    def Initialize(self):
        # Official test window
        self.SetStartDate(2019,1,5)
        self.SetEndDate(2021,9,16)
        # Debug test window
        # self.SetStartDate(2021,1,6)
        # self.SetEndDate(2021,4,23)
        self.SetCash(200000)
        # Capacity handed to both the alpha model and the portfolio construction model
        self.capacity = 20
        # Date of the last OnEndOfDay plot (see OnEndOfDay for why this is needed)
        self._last_plot_date = None

        # * Not implemented
        # SetDataNormalizationMode(???)

        self.SetUniverseSelection(QC500UniverseSelectionModel(numOfUniverse=500))
        self.UniverseSettings.Resolution = Resolution.Daily

        # Alternative alpha models (swap in one at a time):
        # self.AddAlpha(BasicDistanceApproachAlphaModel(capacity=self.capacity))
        # self.AddAlpha(DistanceApproachHighVarianceAlphaModel(capacity=self.capacity))
        # self.AddAlpha(DistanceApproachPearsonCorrelationAlphaModel(capacity=self.capacity))
        # self.AddAlpha(DistanceApproachPearsonCorrelationAlphaModel2(capacity=self.capacity))  # TODO Working on it
        self.AddAlpha(DistanceApproachZeroCrossingAlphaModel(capacity=self.capacity))

        # Alternative portfolio construction models:
        # self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel())
        # self.SetPortfolioConstruction(CustomEqualWeightingPortfolioConstructionModel1())
        self.SetPortfolioConstruction(
            CustomEqualWeightingPortfolioConstructionModel2(
                capacity=self.capacity,
                method=ConstructionMethod.MARKET_NEUTRAL
                # method=ConstructionMethod.LONG_ONLY
            )
        )

        # * Not implemented
        # self.SetExecution(ImmediateExecutionModel())

    def OnData(self, data):
        '''All trading decisions come from the framework models; nothing to do here.'''
        pass

    ###################################################################
    # Log the end of day prices and plot the diagram
    def OnEndOfDay(self, symbol):
        '''
        Plot available cash, total portfolio value and the number of held positions.

        The symbol-taking OnEndOfDay overload is invoked once per security (per the
        LEAN documentation). With a universe of hundreds of names that would push
        hundreds of identical points per chart per day. The date guard keeps it to
        one point per chart per day.
        '''
        today = self.Time.date()
        if today == self._last_plot_date:
            return
        self._last_plot_date = today

        self.Plot('AvailCash', 'Cash', self.Portfolio.Cash)
        self.Plot('AvailCash', 'Portfolio', self.Portfolio.TotalPortfolioValue)
        held_positions = sum(1 for kvp in self.Portfolio if kvp.Value.Invested)
        self.Plot('HeldPositions', "Positions", held_positions)
from AlgorithmImports import *
import itertools as it
import numpy as np
import pandas as pd
from enum import Enum
from sklearn import linear_model


class PairStatus(Enum):
    '''
    Trading state remembered for each pair. Keeping it around stops the same
    signal from being re-sent on every update and helps detect bounces.
    '''
    Short = -1  # keep the short position
    Flat = 0    # close the position
    Long = 1    # keep the long position

class DistanceApproachPearsonCorrelationPair:
    '''
    Pair of price series scored by the Pearson correlation of their one-step
    log returns.

    Attributes:
        symbol_a, symbol_b: identifiers of the two legs
        price_a, price_b: the input prices as numpy arrays; they should be strictly
            positive because their logarithm is taken
        threshold: stored as given; not used inside this class
        distance: correlation coefficient in [-1, 1] (NaN when undefined, e.g. a
            constant series)
    '''
    threshold = None

    def __init__(
        self,
        symbol_a:str, symbol_b:str,
        price_a, price_b,
        threshold=1.5
        ):
        self.symbol_a, self.symbol_b = symbol_a, symbol_b
        self.price_a = np.array(price_a)
        self.price_b = np.array(price_b)
        self.threshold = threshold
        self.distance = self._pearson_coef()

    def _pearson_coef(self):
        # Differences along the first axis, flattened so (n, 1) inputs also work
        log_returns = [
            np.diff(np.log(prices), axis=0).ravel()
            for prices in (self.price_a, self.price_b)
        ]
        correlation_matrix = np.corrcoef(*log_returns)
        # Off-diagonal entry = correlation between the two return series
        return correlation_matrix[0, 1]

class DistanceApproachPearsonCorrelationAlphaModel2(AlphaModel):
    '''
    Description:
        Monthly long/short alpha built on each stock's return divergence from an
        equal-weighted portfolio of its most correlated peers.

        On the first Update call of every calendar month it:
            1. Rebuilds a window of `formation_period` daily closes for every tracked
               symbol (current universe members plus previously tracked symbols that
               are still held).
            2. Picks, for every stock, the `num_of_peers` stocks whose daily log
               returns are most correlated with its own; these form its peer
               (benchmark) portfolio.
            3. Regresses the stock's log returns on the peer portfolio's log returns
               (with intercept) to get beta. The stock is scored with the EWM (span
               `ema_span`) of beta * peer_return - stock_return.
            4. Emits Up insights for the `capacity` lowest scores and Down insights
               for the `capacity` highest scores. Previously held names that are in
               neither new book get Flat insights.
    Args:
        capacity: maximum number of stocks on each side of the book
        resolution: stored on the instance; history requests always use daily bars
        num_of_peers: size of each stock's peer (benchmark) portfolio
    '''

    def __init__(
        self,
        capacity = 20,
        resolution = Resolution.Daily,
        num_of_peers = 50
    ):
        self.resolution = resolution
        self.symbol_pairs = []
        self.formation_period = 22 * 12
        self.threshold = 1.5  # not used by this model
        self.num_of_peers = num_of_peers
        self.ema_span = 22
        self.historical_data = dict()
        self.invested_long_list = list()
        self.invested_short_list = list()
        self.capacity = capacity
        self.lastChange = None
        self.price_dataframe = None
        # Symbols currently in the universe, maintained by OnSecuritiesChanged.
        # Accumulating here (instead of keeping only the latest `changes` object)
        # means no addition/removal is lost between two monthly rebalances.
        self._universe = set()

    def _create_pairs(self, symbols):
        '''All unordered pairs of the given symbols.'''
        return list(it.combinations(symbols, 2))

    def Update(self, algorithm, data):
        '''
        Description:
            Rebalance once per calendar month; every other call returns no insights.
        Args:
            algorithm: The algorithm instance
            data: The current data slice (unused). Prices come from History requests
                so that held symbols that left the universe can still be priced.
        Returns:
            A list of insights: Flat first, then Down, then Up
        '''
        algorithm.Debug(f'Time is {algorithm.Time}')
        insights = []
        if self.lastChange == algorithm.Time.month:
            return insights
        self.lastChange = algorithm.Time.month

        self._refresh_historical_data(algorithm)
        divergence = self._compute_divergence()

        # Rank from lowest to highest divergence.
        # NOTE(review): the RetDiff formula comes from the pairs-trading reversal
        # literature, which typically buys HIGH-RetDiff stocks and sells LOW-RetDiff
        # ones. This model longs the lowest scores; confirm the direction is intended.
        ranked = sorted(divergence, key=divergence.get)
        # Cap each side at half of the candidates so the two books never overlap
        book_size = max(0, min(self.capacity, len(ranked) // 2))
        new_long_list = ranked[:book_size]
        new_short_list = ranked[len(ranked) - book_size:]

        # Only flatten names that are in neither new book. A name that switches side
        # gets just its new directional insight, not a conflicting Flat one as well.
        keep = set(new_long_list) | set(new_short_list)
        previously_held = dict.fromkeys(self.invested_long_list + self.invested_short_list)
        flat_list = [s for s in previously_held if s not in keep]
        self.invested_long_list = new_long_list
        self.invested_short_list = new_short_list

        # NOTE(review): insights expire at the end of today but are only re-issued at
        # the next monthly rebalance; confirm the portfolio construction model keeps
        # positions open after expiry.
        insightExpiry = Expiry.EndOfDay(algorithm.Time)
        insights.extend(self._price_insight(s, insightExpiry, InsightDirection.Flat) for s in flat_list)
        insights.extend(self._price_insight(s, insightExpiry, InsightDirection.Down) for s in self.invested_short_list)
        insights.extend(self._price_insight(s, insightExpiry, InsightDirection.Up) for s in self.invested_long_list)
        return insights

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Record universe additions/removals; they are applied at the next rebalance.'''
        for security in changes.AddedSecurities:
            self._universe.add(security.Symbol)
        for security in changes.RemovedSecurities:
            self._universe.discard(security.Symbol)

    def _refresh_historical_data(self, algorithm):
        '''
        Rebuild self.historical_data with a fresh daily-close window per tracked symbol.

        Tracked symbols are the universe members plus previously tracked symbols that
        are still invested, so open positions keep being scored after leaving the
        universe. Rebuilding, rather than appending the latest close, keeps every
        window a contiguous run of daily closes even though this model only runs
        once a month. Symbols without a full, strictly positive window are dropped.
        '''
        invested = {kvp.Key for kvp in algorithm.Portfolio if kvp.Value.Invested}
        tracked = self._universe | {s for s in self.historical_data if s in invested}
        refreshed = {}
        # Sorted for a deterministic pair order
        for symbol in sorted(tracked, key=str):
            window = self._load_window(algorithm, symbol)
            if window is not None:
                refreshed[symbol] = window
        self.historical_data = refreshed

    def _load_window(self, algorithm, symbol):
        '''Return a full RollingWindow of daily closes for `symbol`, or None.'''
        hist = algorithm.History([symbol], self.formation_period, Resolution.Daily)
        if hist.empty or 'close' not in hist.columns:
            algorithm.Debug(f'ERROR: Hist is empty for {symbol}')
            return None
        closes = [float(bar.close) for bar in hist.loc[symbol, :].itertuples()]
        # Log returns need a complete window of finite, strictly positive prices
        if len(closes) < self.formation_period or not all(np.isfinite(c) and c > 0 for c in closes):
            return None
        window = RollingWindow[float](self.formation_period)
        for close in closes[-self.formation_period:]:
            window.Add(close)
        return window

    def _compute_divergence(self):
        '''
        Return {symbol: EWM of beta * Cret - Lret} for every symbol with peers.

        Also refreshes self.symbol_pairs and self.price_dataframe.
        '''
        symbols = list(self.historical_data.keys())
        self.symbol_pairs = self._create_pairs(symbols)
        if len(symbols) < 2:
            # Not enough symbols to form a single pair
            self.price_dataframe = None
            return {}

        self.price_dataframe = pd.concat(
            [self._dataframelize(self.historical_data[s], s) for s in symbols],
            axis=1
        )
        # log(p_t / p_{t-1}); the first row has no predecessor and is dropped
        log_returns = np.log(self.price_dataframe).diff().iloc[1:]

        divergence = {}
        for stock, peers in self._select_peers().items():
            # Lret: the stock's return; Cret: equal-weighted peer portfolio return
            stock_rtn = log_returns[stock].to_numpy()
            benchmark_rtn = log_returns[peers].mean(axis=1).to_numpy()
            # OLS slope of stock_rtn on benchmark_rtn (with intercept)
            centered = benchmark_rtn - benchmark_rtn.mean()
            variance = centered @ centered
            if variance == 0:
                continue
            beta = (centered @ (stock_rtn - stock_rtn.mean())) / variance
            # RetDiff = beta * (Cret - Rf) - (Lret - Rf), with Rf omitted
            ret_diff = pd.Series(beta * benchmark_rtn - stock_rtn)
            divergence[stock] = ret_diff.ewm(span=self.ema_span, adjust=False).mean().iloc[-1]
        return divergence

    def _select_peers(self):
        '''
        Map each symbol to its `num_of_peers` most correlated symbols (Pearson
        correlation of log returns of raw prices), most correlated first.
        '''
        prices = {s: self.price_dataframe[s].to_numpy() for s in self.price_dataframe.columns}
        scored = (
            DistanceApproachPearsonCorrelationPair(a, b, prices[a], prices[b])
            for a, b in self.symbol_pairs
        )
        # Highest correlation first. Undefined (NaN) correlations, e.g. from a flat
        # price series, are dropped so they cannot poison the sort.
        ranked_pairs = sorted(
            (pair for pair in scored if np.isfinite(pair.distance)),
            key=lambda pair: pair.distance,
            reverse=True
        )
        peers = {}
        for pair in ranked_pairs:
            for stock, peer in ((pair.symbol_a, pair.symbol_b), (pair.symbol_b, pair.symbol_a)):
                group = peers.setdefault(stock, [])
                if len(group) < self.num_of_peers:
                    group.append(peer)
        return {stock: group for stock, group in peers.items() if group}

    @staticmethod
    def _price_insight(symbol, expiry, direction):
        '''Price insight with this model's fixed weight of 0.05.'''
        return Insight.Price(symbol, expiry, direction, None, None, None, 0.05)

    def _normalize_data(self, series):
        '''
        Min-max scale a RollingWindow using the range of its elements [1:].

        Element [0] is scaled with the same range but excluded from it. A flat
        range returns the offsets from the minimum instead of dividing by zero.
        Not used by Update.
        '''
        arr = np.array([x for x in series.GetEnumerator()])
        lo = arr[1:].min()
        hi = arr[1:].max()
        span = hi - lo
        if span == 0:
            return arr - lo
        return (arr - lo) / span

    def _dataframelize(self, price_rolling_window, symbol):
        '''One-column DataFrame (column = symbol) of the window in chronological order.'''
        # The window iterates newest-first, hence the reversal
        values = list(price_rolling_window)[::-1]
        return pd.DataFrame(values, columns=[symbol])
# from clr import AddReference
# AddReference("QuantConnect.Common")
# AddReference("QuantConnect.Algorithm.Framework")

from QuantConnect import Resolution, Extensions
from QuantConnect.Algorithm.Framework.Alphas import *
from QuantConnect.Algorithm.Framework.Portfolio import *
from itertools import groupby
from datetime import datetime, timedelta
from pytz import utc
UTCMIN = datetime.min.replace(tzinfo=utc)

class CustomEqualWeightingPortfolioConstructionModel(PortfolioConstructionModel):

    '''
    Description:
        Provide a custom implementation of IPortfolioConstructionModel that gives equal weighting to all active securities
    Details:
        - The target percent holdings of each security is 1/N where N is the number of securities with active Up/Down insights
        - For InsightDirection.Up, long targets are returned
        - For InsightDirection.Down, short targets are returned
        - For InsightDirection.Flat, closing position targets are returned
    '''

    def __init__(self, rebalancingParam = False):

        '''
        Description:
            Initialize a new instance of CustomEqualWeightingPortfolioConstructionModel
        Args:
            rebalancingParam: Number of days between forced rebalances. False (default),
                None, 0 or a negative value disables periodic rebalancing.
                - Independent of this parameter, the portfolio will be rebalanced when a security is added/removed/changed direction
        '''

        self.insightCollection = InsightCollection()
        self.removedSymbols = []
        self.nextExpiryTime = UTCMIN
        self.rebalancingTime = UTCMIN

        if rebalancingParam and rebalancingParam > 0:
            self.rebalancing = True
            rebalancingPeriod = timedelta(days = rebalancingParam)
            self.rebalancingFunc = lambda dt: dt + rebalancingPeriod
        else:
            # Store a real bool. A negative int is truthy and would otherwise enable
            # rebalancing without a rebalancingFunc.
            self.rebalancing = False

    def CreateTargets(self, algorithm, insights):

        '''
        Description:
            Create portfolio targets from the specified insights
        Args:
            algorithm: The algorithm instance
            insights: The insights to create portfolio targets from
        Returns:
            An enumerable of portfolio targets to be sent to the execution model
        '''

        targets = []

        # Skip all work when there are no new insights, no insight has expired, no
        # symbol was removed from the universe and no periodic rebalance is due.
        # removedSymbols is None after processing and a (possibly empty) list after
        # OnSecuritiesChanged; both "None" and "[]" mean there are no removals.
        rebalanceDue = self.rebalancing and algorithm.UtcTime >= self.rebalancingTime
        if (len(insights) == 0 and algorithm.UtcTime <= self.nextExpiryTime
                and not self.removedSymbols and not rebalanceDue):
            return targets

        # here we get the new insights and add them to our insight collection
        for insight in insights:
            self.insightCollection.Add(insight)

        # create flatten target for each security that was removed from the universe
        if self.removedSymbols:
            targets.extend(PortfolioTarget(symbol, 0) for symbol in self.removedSymbols)
        self.removedSymbols = None

        # get insight that haven't expired of each symbol that is still in the universe
        activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime)

        # Latest generated active insight per symbol. A dict is used instead of
        # itertools.groupby, which only merges *consecutive* items and could emit
        # several insights for one symbol, skewing the 1/N weights.
        latestBySymbol = {}
        for insight in activeInsights:
            current = latestBySymbol.get(insight.Symbol)
            # `not a < b` keeps the later of equally timed insights, like sorted(...)[-1]
            if current is None or not insight.GeneratedTimeUtc < current.GeneratedTimeUtc:
                latestBySymbol[insight.Symbol] = insight
        lastActiveInsights = list(latestBySymbol.values())

        # determine target percent for the given insights (check function DetermineTargetPercent for details)
        percents = self.DetermineTargetPercent(lastActiveInsights)

        errorSymbols = set()
        # check if we actually want to create new targets for the securities (check function ShouldCreateTargets for details)
        if self.ShouldCreateTargets(algorithm, lastActiveInsights):
            for insight in lastActiveInsights:
                target = PortfolioTarget.Percent(algorithm, insight.Symbol, percents[insight])
                if target is not None:
                    targets.append(target)
                else:
                    errorSymbols.add(insight.Symbol)

            # update rebalancing time
            if self.rebalancing:
                self.rebalancingTime = self.rebalancingFunc(algorithm.UtcTime)

        # get expired insights and create flatten targets for each symbol;
        # dict.fromkeys de-duplicates symbols while keeping first-seen order
        expiredInsights = self.insightCollection.RemoveExpiredInsights(algorithm.UtcTime)
        for symbol in dict.fromkeys(insight.Symbol for insight in expiredInsights):
            if not self.insightCollection.HasActiveInsights(symbol, algorithm.UtcTime) and symbol not in errorSymbols:
                targets.append(PortfolioTarget(symbol, 0))

        # here we update the next expiry date in the insight collection
        self.nextExpiryTime = self.insightCollection.GetNextExpiryTime()
        if self.nextExpiryTime is None:
            self.nextExpiryTime = UTCMIN

        return targets

    def DetermineTargetPercent(self, lastActiveInsights):

        '''
        Description:
            Determine the target percent from each insight
        Args:
            lastActiveInsights: The active insights to generate a target from
        Returns:
            Dict mapping each insight to Direction * 1/N, where N counts the
            non-Flat insights (0 when there are none)
        '''

        result = {}

        # give equal weighting to each security
        count = sum(x.Direction != InsightDirection.Flat for x in lastActiveInsights)
        percent = 0 if count == 0 else 1.0 / count

        for insight in lastActiveInsights:
            result[insight] = insight.Direction * percent

        return result

    def ShouldCreateTargets(self, algorithm, lastActiveInsights):

        '''
        Description:
            Determine whether we should rebalance the portfolio to keep equal weighting when:
                - It is time to rebalance regardless
                - We want to include some new security in the portfolio
                - We want to modify the direction of some existing security
        Args:
            lastActiveInsights: The last active insights to check
        '''

        # it is time to rebalance
        if self.rebalancing and algorithm.UtcTime >= self.rebalancingTime:
            return True

        for insight in lastActiveInsights:
            holding = algorithm.Portfolio[insight.Symbol]
            # if there is an insight for a new security that's not invested, then rebalance
            if not holding.Invested and insight.Direction != InsightDirection.Flat:
                return True
            # if there is an insight to close a long position, then rebalance
            if holding.IsLong and insight.Direction != InsightDirection.Up:
                return True
            # if there is an insight to close a short position, then rebalance
            if holding.IsShort and insight.Direction != InsightDirection.Down:
                return True

        return False

    def OnSecuritiesChanged(self, algorithm, changes):

        '''
        Description:
            Event fired each time we add/remove securities from the data feed
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm
        '''

        # get removed symbol and invalidate them in the insight collection
        self.removedSymbols = [x.Symbol for x in changes.RemovedSecurities]
        self.insightCollection.Clear(self.removedSymbols)
from AlgorithmImports import *
import itertools as it
import numpy as np
from enum import Enum

# * 1. Variants of how to rank the distance + high variance pair
# * 1.1. Should we use IR factor score
# * 1.2. or use linear regression?

class PairStatus(Enum):
    '''
    Trading state remembered for each pair. Keeping it around stops the same
    signal from being re-sent on every update and helps detect bounces.
    '''
    Short = -1  # keep the short position
    Flat = 0    # close the position
    Long = 1    # keep the long position

class DistanceApproachZeroCrossingPair:
    '''
    Pair of (normalised) price series with the statistics the zero-crossing
    distance approach ranks on. The spread is price_a - price_b.

    Attributes:
        distance: sum of squared spread values
        num_of_crossing: number of consecutive spread values with strictly
            opposite signs (a value of exactly 0 never counts as a crossing)
        hist_dev: sample standard deviation (ddof=1) of the spread
        current_spread: spread at index 0 of the inputs
        threshold: stored as given; not used inside this class
    '''
    threshold = None

    def __init__(
        self,
        symbol_a:str, symbol_b:str,
        price_a, price_b,
        threshold=1.5
        ):
        self.symbol_a, self.symbol_b = symbol_a, symbol_b
        self.price_a = np.array(price_a)
        self.price_b = np.array(price_b)
        self.threshold = threshold
        self.distance = self._distance()
        self.num_of_crossing = self._zeroCrossing()
        self.hist_dev = self._dev()
        self.current_spread = self._spread()

    def _distance(self):
        gap = self.price_a - self.price_b
        return np.sum(gap * gap)

    def _zeroCrossing(self):
        gap = self.price_a - self.price_b
        # Neighbouring values with opposite signs have a negative product
        neighbour_products = gap[:-1] * gap[1:]
        return int(np.count_nonzero(neighbour_products < 0))

    def _dev(self):
        return np.std(self.price_a - self.price_b, ddof=1)

    def _spread(self):
        return self.price_a[0] - self.price_b[0]

class DistanceApproachZeroCrossingAlphaModel(AlphaModel):
    def __init__(
        self,
        capacity = 20,
        resolution = Resolution.Daily
    ):
        self.resolution = resolution
        self.symbol_pairs = []
        self.formation_period = 22 * 12
        self.threshold = 1.5
        self.max_pair_percentage = 0.3
        self.historical_data = dict()
        self.invested_pairs = dict()
        self._changes = None
        self.capacity = capacity

        # resolutionString = Extensions.GetEnumString(resolution, Resolution)
        # self.Name = '{}({},{},{})'.format(self.__class__.__name__, fastPeriod, slowPeriod, resolutionString)

    def _create_pairs(self, symbols):
        # Creating the pairs
        return list(it.combinations(symbols, 2))

    def Update(self, algorithm, data):
        # * Example of launching the update on the exact date time
        # if algorithm.Time.hour == 9 and algorithm.Time.minute == 31:
        #     return [Insight.Price("SPY", timedelta(minutes = 20), InsightDirection.Up, None, None, None, 0.5)]
        # return []

        algorithm.Debug(f'Time is {algorithm.Time}')

        insights = []

        if self._changes is not None:
            for added in self._changes.AddedSecurities:
                # * Dealing with new added securities, need to
                # * 1. Create the RollingWindow
                # * 2. Add the data into the RollingWindow (Up to today)

                if added.Symbol not in self.historical_data.keys():
                    hist = algorithm.History(
                        [added.Symbol],
                        self.formation_period,
                        Resolution.Daily
                    )
                    if not hist.empty:
                        self.historical_data[added.Symbol] = RollingWindow[float](self.formation_period)
                        for bar in hist.loc[added.Symbol, :].itertuples():
                            self.historical_data[added.Symbol].Add(bar.close)
                        if not self.historical_data[added.Symbol].IsReady:
                            del self.historical_data[added.Symbol]
                        # algorithm.Debug(f'Historical data of {added.Symbol} at {algorithm.Time} - {hist["close"][-1]}: {self.historical_data[added.Symbol][0]}')
                    else:
                        algorithm.Debug('ERROR: Hist is empty')

            for removed in self._changes.RemovedSecurities:
                # * Dealing with securities removed, need to
                # * 1. Check if the removed securities are in the invested list
                # * 2. If yes, keep add the latest data into the RollingWindow
                # * 3. If not, then we remove this RollingWindow
                invested_symbol_list = [s.Key for s in algorithm.Portfolio if s.Value.Invested]
                if removed.Symbol not in invested_symbol_list and removed.Symbol in self.historical_data.keys():
                    # algorithm.Debug(f'Remove {removed.Symbol} from historical_data')
                    del self.historical_data[removed.Symbol]

        for s in self.historical_data.keys():
            # * Dealing with already existed securities, need to
            # * 1. Add the latest data into the RollingWindow
            # ! Don't use the 'data' as sometimes we have securities not in the universe need to be processed.
            if self._changes is not None and s in [sec.Symbol for sec in self._changes.AddedSecurities]:
                continue

            hist = algorithm.History(
                [s],
                5,
                Resolution.Daily
            )

            # Somehow there are 'close' missing in the returned history.
            # Therefore adding this logic to prevent this scenario crash
            if 'close' in hist.columns:
                self.historical_data[s].Add(hist['close'][-1])
            else:
                self.historical_data[s].Add(self.historical_data[s][0])

            # algorithm.Debug(f'{str(s)} - Current price {hist["close"][-1]}')
            # algorithm.Debug(self.historical_data[s][0])
            # algorithm.Debug(self.historical_data[s][1])
            # algorithm.Debug(self.historical_data[s][2])

        self.symbol_pairs = self._create_pairs(self.historical_data.keys())
        # algorithm.Debug(f'Length of pairs: {len(self.symbol_pairs)}')

        # Calculate distance and then sort by distance
        distances = {}
        for i in self.symbol_pairs:
            distances[i] = DistanceApproachZeroCrossingPair(
                i[0],
                i[1],
                self._normalize_data(self.historical_data[i[0]]),
                self._normalize_data(self.historical_data[i[1]])
            )
            # algorithm.Debug(f'Spread of {str(i[0])} - {str(i[1])}: {distances[i].current_spread}')

        sorted_pairs = {k:v for k, v in sorted(
            distances.items(),
            key = lambda x: (x[1].distance)
        )[:int(len(distances) * self.max_pair_percentage)]}

        sorted_pairs = {k:v for k, v in sorted(
            sorted_pairs.items(),
            key = lambda x: x[1].num_of_crossing,
            reverse = True
        )}

        # Cleaning the pair and Create insights
        insightExpiry = Expiry.EndOfDay(algorithm.Time)
        for k, v in sorted_pairs.items():

            invested_stock_symbols = {paired_key[0] for paired_key in self.invested_pairs.keys()} | {paired_key[1] for paired_key in self.invested_pairs.keys()}

            # Taking care of the delisting scenario
            if data.Delistings.ContainsKey(str(k[0])) or data.Delistings.ContainsKey(str(k[1])):
                self.invested_pairs[k] = PairStatus.Flat
                insights.append(
                    Insight.Price(
                        str(k[0]),
                        insightExpiry,
                        InsightDirection.Flat,
                        None, None, None, 0.05    # Weight
                    )
                )
                insights.append(
                    Insight.Price(
                        str(k[1]),
                        insightExpiry,
                        InsightDirection.Flat,
                        None, None, None, 0.05    # Weight
                    ),
                )
                # algorithm.Debug(f'[{str(k[0])}-{str(k[1])}]: Long->Close ({len(insights)})')
                del self.invested_pairs[k]
                del self.historical_data[k[0]]
                del self.historical_data[k[1]]
                continue

            if k[0] not in invested_stock_symbols and k[1] not in invested_stock_symbols:
                if len(self.invested_pairs) >= self.capacity:
                    # algorithm.Debug(f'Invested_pairs length = {len(self.invested_pairs)}')
                    continue
                if v.current_spread >= self.threshold * v.hist_dev:
                    self.invested_pairs[k] = PairStatus.Short

                    insights.append(
                        Insight.Price(
                            str(k[0]),
                            # timedelta(days=1),
                            insightExpiry,
                            InsightDirection.Down,
                            None, None, None, 0.05    # Weight
                        )
                    )
                    insights.append(
                        Insight.Price(
                            str(k[1]),
                            insightExpiry,
                            InsightDirection.Up,
                            None, None, None, 0.05    # Weight
                        )
                    )
                    # algorithm.Debug(f'[{str(k[0])}-{str(k[1])}]: 0->Short ({len(insights)})')
                elif v.current_spread <= -(self.threshold * v.hist_dev):
                    self.invested_pairs[k] = PairStatus.Long
                    insights.append(
                        Insight.Price(
                            str(k[0]),
                            insightExpiry,
                            InsightDirection.Up,
                            None, None, None, 0.05    # Weight
                        )
                    )
                    insights.append(
                        Insight.Price(
                            str(k[1]),
                            insightExpiry,
                            InsightDirection.Down,
                            None, None, None, 0.05    # Weight
                        )
                    )
                    # algorithm.Debug(f'[{str(k[0])}-{str(k[1])}]: 0->Long ({len(insights)})')
                else:
                    pass
            else:
                if k not in self.invested_pairs.keys():
                    continue

                if self.invested_pairs[k] == PairStatus.Long:
                    if v.current_spread < 0:
                        # self.invested_pairs[k] = PairStatus.Long
                        # insights.append(
                        #     Insight.Price(
                        #         str(k[0]),
                        #         insightExpiry,
                        #         InsightDirection.Up,
                        #         None, None, None, 0.05    # Weight
                        #     )
                        # )
                        # insights.append(
                        #     Insight.Price(
                        #         str(k[1]),
                        #         insightExpiry,
                        #         InsightDirection.Down,
                        #         None, None, None, 0.05    # Weight
                        #     ),
                        # )
                        # algorithm.Debug(f'[{str(k[0])}-{str(k[1])}]: Long->Long ({len(insights)})')
                        pass
                    # elif v.current_spread >= self.threshold * v.hist_dev:
                    #     self.invested_pairs[k] = PairStatus.Short
                    #     insights.append(
                    #         Insight.Price(
                    #             str(k[0]),
                    #             insightExpiry,
                    #             InsightDirection.Down,
                    #             None, None, None, 0.05    # Weight
                    #         )
                    #     )
                    #     insights.append(
                    #         Insight.Price(
                    #             str(k[1]),
                    #             insightExpiry,
                    #             InsightDirection.Up,
                    #             None, None, None, 0.05    # Weight
                    #         ),
                    #     )
                    #     # algorithm.Debug(f'[{str(k[0])}-{str(k[1])}]: Long->Short ({len(insights)})')
                    else:
                        self.invested_pairs[k] = PairStatus.Flat
                        insights.append(
                            Insight.Price(
                                str(k[0]),
                                insightExpiry,
                                InsightDirection.Flat,
                                None, None, None, 0.05    # Weight
                            )
                        )
                        insights.append(
                            Insight.Price(
                                str(k[1]),
                                insightExpiry,
                                InsightDirection.Flat,
                                None, None, None, 0.05    # Weight
                            ),
                        )
                        # algorithm.Debug(f'[{str(k[0])}-{str(k[1])}]: Long->Close ({len(insights)})')
                        del self.invested_pairs[k]
                        del self.historical_data[k[0]]
                        del self.historical_data[k[1]]
                elif self.invested_pairs[k] == PairStatus.Short:
                    if v.current_spread > 0:
                        # self.invested_pairs[k] = PairStatus.Short
                        # insights.append(
                        #     Insight.Price(
                        #         str(k[0]),
                        #         insightExpiry,
                        #         InsightDirection.Down,
                        #         None, None, None, 0.05    # Weight
                        #     )
                        # )
                        # insights.append(
                        #     Insight.Price(
                        #         str(k[1]),
                        #         insightExpiry,
                        #         InsightDirection.Up,
                        #         None, None, None, 0.05    # Weight
                        #     ),
                        # )
                        # algorithm.Debug(f'[{str(k[0])}-{str(k[1])}]: Short->Short ({len(insights)})')
                        pass
                    # elif v.current_spread <= -(self.threshold * v.hist_dev):
                    #     self.invested_pairs[k] = PairStatus.Long
                    #     insights.append(
                    #         Insight.Price(
                    #             str(k[0]),
                    #             insightExpiry,
                    #             InsightDirection.Up,
                    #             None, None, None, 0.05    # Weight
                    #         )
                    #     )
                    #     insights.append(
                    #         Insight.Price(
                    #             str(k[1]),
                    #             insightExpiry,
                    #             InsightDirection.Down,
                    #             None, None, None, 0.05    # Weight
                    #         ),
                    #     )
                    #     # algorithm.Debug(f'[{str(k[0])}-{str(k[1])}]: Short->Long ({len(insights)})')
                    else:
                        self.invested_pairs[k] = PairStatus.Flat
                        insights.append(
                            Insight.Price(
                                str(k[0]),
                                insightExpiry,
                                InsightDirection.Flat,
                                None, None, None, 0.05    # Weight
                            )
                        )
                        insights.append(
                            Insight.Price(
                                str(k[1]),
                                insightExpiry,
                                InsightDirection.Flat,
                                None, None, None, 0.05    # Weight
                            ),
                        )
                        # algorithm.Debug(f'[{str(k[0])}-{str(k[1])}]: Short->Close ({len(insights)})')
                        del self.invested_pairs[k]
                        del self.historical_data[k[0]]
                        del self.historical_data[k[1]]

            # algorithm.Debug(f'{str(k[0])}/{str(k[1])}({v.distance}): {v.current_spread} - {self.threshold * v.hist_dev}')

        # Reset the changes
        self._changes = None

        # algorithm.Debug(f'Sent out {len(insights)} on {algorithm.Time}')
        return insights

    def OnSecuritiesChanged(self, algorithm, changes):
        '''
        Remember the most recent universe changes.

        The value is cleared back to None at the end of each
        insight-generation pass.
        '''
        self._changes = changes

    def _normalize_data(self, series):
        # Here we only need data from [1:] days, and [0] is only needed when we calculate the spread
        arr = np.array([x for x in series.GetEnumerator()])
        max = arr[1:].max()
        min = arr[1:].min()
        return (arr - min) / (max - min)
# from clr import AddReference
# AddReference("QuantConnect.Common")
# AddReference("QuantConnect.Algorithm.Framework")

from QuantConnect import Resolution, Extensions
from QuantConnect.Algorithm.Framework.Alphas import *
from QuantConnect.Algorithm.Framework.Portfolio import *
from itertools import groupby
from datetime import datetime, timedelta
from pytz import utc
# Earliest representable timezone-aware (UTC) datetime. Used as the
# "no pending expiry" sentinel for the portfolio model's nextExpiryTime.
UTCMIN = datetime(1, 1, 1, tzinfo=utc)
from enum import Enum

class ConstructionMethod(Enum):
    '''
    Sizing/direction policy for the custom portfolio construction model.

    MARKET_NEUTRAL: both long (Up) and short (Down) positions may be opened.
    LONG_ONLY:      Down insights are never used to open a new position.
    '''

    MARKET_NEUTRAL = 1
    LONG_ONLY = 2

class CustomEqualWeightingPortfolioConstructionModel2(PortfolioConstructionModel):

    '''
    Description:
        Custom IPortfolioConstructionModel that gives every traded security the
        same fixed weight, independent of how many insights are currently active.
    Details:
        - Each non-flat target is 1/(2 * capacity) of the portfolio in
          MARKET_NEUTRAL mode and 1/capacity in LONG_ONLY mode
        - For InsightDirection.Up, long targets are returned
        - For InsightDirection.Down, short targets are returned (MARKET_NEUTRAL);
          in LONG_ONLY mode a Down insight never opens a short and can only
          close an existing long
        - For InsightDirection.Flat, closing position targets are returned
        - A target is only emitted when the current holding does not already
          match the insight's direction (see ShouldCreateTargets)
    '''

    def __init__(
        self,
        capacity=10,
        method=ConstructionMethod.MARKET_NEUTRAL,
    ):

        '''
        Description:
            Initialize a new instance of CustomEqualWeightingPortfolioConstructionModel2
        Args:
            capacity: Fixed divisor used for position sizing (presumably the
                maximum number of simultaneously open pairs -- confirm with the
                alpha model). Each position is 1/(2 * capacity) of the portfolio
                in MARKET_NEUTRAL mode and 1/capacity in LONG_ONLY mode.
            method: A ConstructionMethod selecting market-neutral or long-only sizing.
        '''

        self.insightCollection = InsightCollection()
        # Stays empty: OnSecuritiesChanged is a no-op in this model.
        self.removedSymbols = []
        # UTCMIN means "no pending expiry", so CreateTargets' early exit is skipped.
        self.nextExpiryTime = UTCMIN
        self.construction_method = method
        self.capacity = capacity

    def CreateTargets(self, algorithm, insights):

        '''
        Description:
            Create portfolio targets from the specified insights
        Args:
            algorithm: The algorithm instance
            insights: The insights to create portfolio targets from
        Returns:
            A list of portfolio targets to be sent to the execution model
        '''

        targets = []

        # Early exit: nothing can have changed if there are no new insights, no
        # active insight has expired since the last call, and no symbols were
        # removed. `removedSymbols` is a list, so test it for emptiness. The
        # former `is None` test could never be true, which made this guard dead code.
        if (len(insights) == 0
                and algorithm.UtcTime <= self.nextExpiryTime
                and not self.removedSymbols):
            return targets

        # Add the new insights to our insight collection
        for insight in insights:
            self.insightCollection.Add(insight)

        # Insights that haven't expired yet
        activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime)

        # Only the most recently generated active insight of each symbol counts
        lastActiveInsights = self._latest_insight_per_symbol(activeInsights)

        percents = self.DetermineTargetPercent(lastActiveInsights, self.capacity)

        for insight in lastActiveInsights:
            if not self.ShouldCreateTargets(algorithm, insight):
                continue
            target = PortfolioTarget.Percent(algorithm, insight.Symbol, percents[insight])
            # PortfolioTarget.Percent may return None (no target could be built); skip it
            if target is not None:
                targets.append(target)

        # Record when the next active insight expires, so the early exit above
        # knows when a recomputation is required
        self.nextExpiryTime = self.insightCollection.GetNextExpiryTime()
        if self.nextExpiryTime is None:
            self.nextExpiryTime = UTCMIN

        return targets

    @staticmethod
    def _latest_insight_per_symbol(activeInsights):
        '''
        Return the most recently generated insight of each symbol, in first-seen symbol order.

        itertools.groupby only merges *consecutive* equal keys, so a dict is used
        to guarantee exactly one insight per symbol regardless of how the insight
        collection orders its results.
        '''
        latest = {}
        for insight in activeInsights:
            current = latest.get(insight.Symbol)
            # `>=` keeps the later insight on timestamp ties (like a stable sort's last element)
            if current is None or insight.GeneratedTimeUtc >= current.GeneratedTimeUtc:
                latest[insight.Symbol] = insight
        return list(latest.values())

    def DetermineTargetPercent(self, lastActiveInsights, capacity):

        '''
        Description:
            Determine the target percent from each insight
        Args:
            lastActiveInsights: The active insights to generate a target from
            capacity: Fixed position-count divisor (see __init__)
        Returns:
            dict mapping each insight to its signed target percent
        Raises:
            ValueError: if construction_method is not a known ConstructionMethod
        '''

        # A fixed divisor (capacity) is used deliberately instead of the number of
        # active insights, per the original author's note on this method.
        if self.construction_method == ConstructionMethod.MARKET_NEUTRAL:
            count = capacity * 2  # presumably two legs per pair
        elif self.construction_method == ConstructionMethod.LONG_ONLY:
            count = capacity
        else:
            raise ValueError(f'Unsupported construction method: {self.construction_method!r}')
        percent = 0 if count == 0 else 1.0 / count

        long_only = self.construction_method == ConstructionMethod.LONG_ONLY
        result = {}
        for insight in lastActiveInsights:
            if long_only and insight.Direction == InsightDirection.Down:
                # Long-only never goes short: a Down signal can at most flatten a long
                result[insight] = 0
            else:
                result[insight] = insight.Direction * percent

        return result

    def ShouldCreateTargets(self, algorithm, insight):
        '''
        Description:
            Decide whether an insight requires a new portfolio target, i.e. whether
            the current holding differs from what the insight asks for.
        Args:
            algorithm: The algorithm instance
            insight: The latest active insight for a symbol
        Returns:
            True to emit a target for the insight's symbol, False otherwise
        '''
        holding = algorithm.Portfolio[insight.Symbol]
        direction = insight.Direction

        # Not invested: open a position for any non-flat insight, except that
        # long-only mode never opens a short
        if not holding.Invested and direction != InsightDirection.Flat:
            return not (self.construction_method == ConstructionMethod.LONG_ONLY
                        and direction == InsightDirection.Down)
        # Long position and the insight is no longer Up: close or reverse it
        if holding.IsLong and direction != InsightDirection.Up:
            return True
        # Short position and the insight is no longer Down: close or reverse it
        if holding.IsShort and direction != InsightDirection.Down:
            return True

        return False

    def OnSecuritiesChanged(self, algorithm, changes):
        '''
        Intentionally a no-op. Removed securities are neither recorded in
        `removedSymbols` nor cleared from the insight collection.
        '''
        # TODO(review): if removed securities should be invalidated, record them in
        # self.removedSymbols here, clear them from self.insightCollection, and emit
        # flatten targets for them (then reset the list) in CreateTargets.