Overall Statistics
Total Orders
10334
Average Win
0.05%
Average Loss
-0.05%
Compounding Annual Return
-2.849%
Drawdown
16.600%
Expectancy
-0.056
Start Equity
100000
End Equity
86539.23
Net Profit
-13.461%
Sharpe Ratio
-1.557
Sortino Ratio
-1.89
Probabilistic Sharpe Ratio
0.002%
Loss Rate
54%
Win Rate
46%
Profit-Loss Ratio
1.06
Alpha
-0.047
Beta
-0.142
Annual Standard Deviation
0.037
Annual Variance
0.001
Information Ratio
-0.772
Tracking Error
0.164
Treynor Ratio
0.403
Total Fees
$10366.27
Estimated Strategy Capacity
$180000000.00
Lowest Capacity Asset
SST V2245V5VOQQT
Portfolio Turnover
11.59%
Drawdown Recovery
3
from AlgorithmImports import *
from scipy.stats import norm, uniform


class HeadAndShouldersPattern:

    def __init__(self, sequence, window_size, max_lookback, step_size):
        self._sequence = np.array(sequence)
        self._window_size = window_size
        self._max_lookback = max_lookback
        self._step_size = step_size
        
        # Create pattern references
        if not hasattr(HeadAndShouldersPattern, "ref"):
            np.random.seed(1)
            ref_count = 100
            v1 = np.array([0] * ref_count) + 0.02 * norm.rvs(size=(ref_count, ))
            p1 = np.array([1] * ref_count) + 0.2 * norm.rvs(size=(ref_count, ))
            v2 = v1 + 0.2 * norm.rvs(size=(ref_count, ))
            v3 = v1 + 0.2 * norm.rvs(size=(ref_count, ))
            p3 = p1 + 0.02 * norm.rvs(size=(ref_count, ))
            p2 = 1.5 * np.maximum(p1, p3) + abs(uniform.rvs(size=(ref_count, )))
            v4 = v1 + 0.02 * norm.rvs(size=(ref_count, ))
            ref = np.array([
                v1, 
                (v1*.75+p1*.25) + 0.2 * norm.rvs(size=(ref_count, )), 
                (v1+p1)/2 + 0.2 * norm.rvs(size=(ref_count, )), 
                (v1*.25+p1*.75) + 0.2 * norm.rvs(size=(ref_count, )), 
                p1, 
                (v2*.25+p1*.75) + 0.2 * norm.rvs(size=(ref_count, )), 
                (v2+p1)/2 + 0.2 * norm.rvs(size=(ref_count, )), 
                (v2*.75+p1*.25) + 0.2 * norm.rvs(size=(ref_count, )), 
                v2, 
                (v2*.75+p2*.25) + 0.2 * norm.rvs(size=(ref_count, )), 
                (v2+p2)/2 + 0.2 * norm.rvs(size=(ref_count, )), 
                (v2*.25+p2*.75) + 0.2 * norm.rvs(size=(ref_count, )), 
                p2, 
                (v3*.25+p2*.75) + 0.2 * norm.rvs(size=(ref_count, )), 
                (v3+p2)/2 + 0.2 * norm.rvs(size=(ref_count, )), 
                (v3*.75+p2*.25) + 0.2 * norm.rvs(size=(ref_count, )), 
                v3, 
                (v3*.75+p3*.25) + 0.2 * norm.rvs(size=(ref_count, )), 
                (v3+p3)/2 + 0.2 * norm.rvs(size=(ref_count, )), 
                (v3*.25+p3*.75) + 0.2 * norm.rvs(size=(ref_count, )), 
                p3, 
                (v4*.25+p3*.75) + 0.2 * norm.rvs(size=(ref_count, )), 
                (v4+p3)/2 + 0.2 * norm.rvs(size=(ref_count, )), 
                (v4*.75+p3*.25) + 0.2 * norm.rvs(size=(ref_count, )), 
                v4
            ])
            HeadAndShouldersPattern.ref = ((ref - ref.mean(axis=1, keepdims=True)) / ref.std(axis=1, keepdims=True)).T

        # Warm up the factor values
        self._rows = HeadAndShouldersPattern.ref.shape[0]
        self._scan()

    
    def update(self, price):
        # Update the trailing window
        self._sequence = np.append(self._sequence, price)[-self._max_lookback:]
        
        # Update the factor values
        self._scan()

    def _scan(self):
        self.corr = 0
        self.similarity_score = 0
        
        # Select varying lengths of trailing windows
        for i in range(self._window_size, self._max_lookback, self._step_size):
            # Check if enough history to fill the trailing window
            if len(self._sequence[-i:]) != i:
                break 

            # Select the trailing data and downsample it to 25 data points
            sub_sequence = self._downsample(self._sequence[-i:], self._window_size)
            
            # Normalize the data in the trailing window
            sub_sequence_std = np.std(sub_sequence)
            if sub_sequence_std == 0:
                continue
            norm_sub_sequence = (sub_sequence - np.mean(sub_sequence)) / sub_sequence_std
            
            # Evaluate the pattern presence
            # Calculate correlation and similarity scores for each reference pattern
            corr_scores = np.ndarray(shape=(self._rows))
            similarity_scores = np.ndarray(shape=(self._rows))
            for j in range(self._rows):
                score, similarity = self._matching(norm_sub_sequence, HeadAndShouldersPattern.ref[j, :])
                corr_scores[j] = score
                similarity_scores[j] = similarity

            # Aggregate the results to produce a single value for each factor
            self.corr = max(self.corr, np.mean(corr_scores))
            self.similarity_score = max(self.similarity_score, np.mean(similarity_scores))

    def _downsample(self, data, target_length):
        factor = len(data) // target_length
        return [np.mean(data[i*factor:(i+1)*factor]) for i in range(target_length)]

    def _matching(self, sub_sequence, ref_pattern):
        correlation = np.corrcoef(sub_sequence, ref_pattern)[0, 1]
        similarity = np.sum((sub_sequence - ref_pattern) ** 2)
        return correlation, similarity
# region imports
from AlgorithmImports import *
from head_and_shoulders import HeadAndShouldersPattern
# endregion

class HeadAndShouldersPatternDetectionAlgorithm(QCAlgorithm):

    _trades = []
    _new_symbols = []
    _pattern_by_symbol = {}

    def initialize(self):
        self.set_start_date(self.end_date - timedelta(5*365))
        self.set_cash(100_000)
        self.settings.seed_initial_prices = True
        # Define the pattern detection settings.
        self._window_size = self.get_parameter("window_size", 25)
        self._max_lookback = self.get_parameter("max_lookback", 50)
        self._step_size = self.get_parameter("step_size", 10) # lookback jumps from window_size -> window_size+step_size -> window_size+2*step_size ... -> max_lookback
        # Define the universe settings.
        self._liquidity_filter_size = self.get_parameter("liquidity_filter_size", 500)
        self._universe_size = self.get_parameter("universe_size", 5)
        # Define the trade settings.
        self._hold_duration = timedelta(self.get_parameter("hold_duration", 3))
        # Add a universe of US Equities.
        self.universe_settings.resolution = Resolution.DAILY
        self.add_universe(self._select_assets)

    def _select_assets(self, fundamentals) -> List[Symbol]:
        filtered = sorted(fundamentals, key=lambda f: f.dollar_volume, reverse=True)[:self._liquidity_filter_size]
        # Create pattern detection objects for new securities.
        filtered_symbols = [f.symbol for f in filtered]
        new_symbols = [symbol for symbol in filtered_symbols if symbol not in self._pattern_by_symbol]
        if new_symbols:
            history = self.history(new_symbols, self._max_lookback, Resolution.DAILY)
            for symbol in new_symbols:
                self._pattern_by_symbol[symbol] = HeadAndShouldersPattern(
                    history.loc[symbol]['close'].values if symbol in history.index else np.array([]), 
                    self._window_size, 
                    self._max_lookback, 
                    self._step_size
                )
        # Remove pattern detection objects for delisted securities.
        delisted_symbols = [symbol for symbol in self._pattern_by_symbol.keys() if symbol not in filtered_symbols]
        for symbol in delisted_symbols:
            self._pattern_by_symbol.pop(symbol)
        # Scan for new patterns.
        for f in filtered:
            if f.symbol not in new_symbols:
                self._pattern_by_symbol[f.symbol].update(f.adjusted_price)
        # Select symbols with high correlation and low DTW distance to the reference patterns.
        #  Step 1: Sort symbols by correlation in descending order.
        reverse_sorted_by_corr = [symbol for symbol, _ in sorted(self._pattern_by_symbol.items(), key=lambda x: x[1].corr, reverse=True)]
        #  Step 2: Sort symbols by DTW distance in ascending order.
        sorted_by_dtw = [symbol for symbol, _ in sorted(self._pattern_by_symbol.items(), key=lambda x: x[1].similarity_score, reverse=False)]
        #  Step 3: Add the ranks of each factor together.
        rank_by_symbol = {symbol: reverse_sorted_by_corr.index(symbol)+sorted_by_dtw.index(symbol) for symbol in self._pattern_by_symbol.keys()}
        #  Step 4: Select the symbols with the best combined rank across both factors.
        return [symbol for symbol, _ in sorted(rank_by_symbol.items(), key=lambda x: x[1])[:self._universe_size]]

    def on_data(self, data: Slice):
        # Short every stock when it first enters the universe (when we first detect the pattern).
        for symbol in self._new_symbols:
            self._trades.append(TimeBasedTrade(self, self.securities[symbol], self._hold_duration))
        self._new_symbols = []
        # Scan for exits.
        closed_trades = []
        for i, trade in enumerate(self._trades):
            trade.scan(self)
            if trade.closed:
                closed_trades.append(i)
        # Delete closed trades.
        for i in closed_trades[::-1]:
            del self._trades[i]

    def on_securities_changed(self, changes):
        for security in changes.added_securities:
            if security.symbol not in self._new_symbols:
                self._new_symbols.append(security.symbol)


class TimeBasedTrade:

    def __init__(self, algorithm, security, hold_duration):
        self._security = security
        # Determine the position size.
        self._quantity = -int(2_000 / self._security.price)
        if self._quantity == 0:
            self.closed = True
            return
        # Enter the trade.
        algorithm.market_order(self._security, self._quantity)
        self.closed = False
        # Set the trade exit date.
        self._exit_date = algorithm.time + hold_duration

    def scan(self, algorithm):
        # Perform a simple time-based exit.
        if not self.closed and self._exit_date <= algorithm.time:
            if self._security.is_tradable:
                algorithm.market_order(self._security, -self._quantity)    
            self.closed = True