| Overall Statistics | |
| --- | --- |
| Total Orders | 10334 |
| Average Win | 0.05% |
| Average Loss | -0.05% |
| Compounding Annual Return | -2.849% |
| Drawdown | 16.600% |
| Expectancy | -0.056 |
| Start Equity | 100000 |
| End Equity | 86539.23 |
| Net Profit | -13.461% |
| Sharpe Ratio | -1.557 |
| Sortino Ratio | -1.89 |
| Probabilistic Sharpe Ratio | 0.002% |
| Loss Rate | 54% |
| Win Rate | 46% |
| Profit-Loss Ratio | 1.06 |
| Alpha | -0.047 |
| Beta | -0.142 |
| Annual Standard Deviation | 0.037 |
| Annual Variance | 0.001 |
| Information Ratio | -0.772 |
| Tracking Error | 0.164 |
| Treynor Ratio | 0.403 |
| Total Fees | $10366.27 |
| Estimated Strategy Capacity | $180000000.00 |
| Lowest Capacity Asset | SST V2245V5VOQQT |
| Portfolio Turnover | 11.59% |
| Drawdown Recovery | 3 |

```python
# head_and_shoulders.py
from AlgorithmImports import *
from scipy.stats import norm, uniform


class HeadAndShouldersPattern:

    def __init__(self, sequence, window_size, max_lookback, step_size):
        self._sequence = np.array(sequence)
        self._window_size = window_size
        self._max_lookback = max_lookback
        self._step_size = step_size
        # Create pattern references.
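        # Each reference is built from seven noisy pivots: troughs v1-v4 and
        # peaks p1 (left shoulder), p2 (head, forced above both shoulders by
        # construction), and p3 (right shoulder), with three interpolated noisy
        # points between consecutive pivots: 25 steps per series, 100 series.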
        if not hasattr(HeadAndShouldersPattern, "ref"):
            np.random.seed(1)
            ref_count = 100
            v1 = np.array([0] * ref_count) + 0.02 * norm.rvs(size=(ref_count, ))
            p1 = np.array([1] * ref_count) + 0.2 * norm.rvs(size=(ref_count, ))
            v2 = v1 + 0.2 * norm.rvs(size=(ref_count, ))
            v3 = v1 + 0.2 * norm.rvs(size=(ref_count, ))
            p3 = p1 + 0.02 * norm.rvs(size=(ref_count, ))
            p2 = 1.5 * np.maximum(p1, p3) + abs(uniform.rvs(size=(ref_count, )))
            v4 = v1 + 0.02 * norm.rvs(size=(ref_count, ))
            ref = np.array([
                v1,
                (v1*.75 + p1*.25) + 0.2 * norm.rvs(size=(ref_count, )),
                (v1 + p1)/2 + 0.2 * norm.rvs(size=(ref_count, )),
                (v1*.25 + p1*.75) + 0.2 * norm.rvs(size=(ref_count, )),
                p1,
                (v2*.25 + p1*.75) + 0.2 * norm.rvs(size=(ref_count, )),
                (v2 + p1)/2 + 0.2 * norm.rvs(size=(ref_count, )),
                (v2*.75 + p1*.25) + 0.2 * norm.rvs(size=(ref_count, )),
                v2,
                (v2*.75 + p2*.25) + 0.2 * norm.rvs(size=(ref_count, )),
                (v2 + p2)/2 + 0.2 * norm.rvs(size=(ref_count, )),
                (v2*.25 + p2*.75) + 0.2 * norm.rvs(size=(ref_count, )),
                p2,
                (v3*.25 + p2*.75) + 0.2 * norm.rvs(size=(ref_count, )),
                (v3 + p2)/2 + 0.2 * norm.rvs(size=(ref_count, )),
                (v3*.75 + p2*.25) + 0.2 * norm.rvs(size=(ref_count, )),
                v3,
                (v3*.75 + p3*.25) + 0.2 * norm.rvs(size=(ref_count, )),
                (v3 + p3)/2 + 0.2 * norm.rvs(size=(ref_count, )),
                (v3*.25 + p3*.75) + 0.2 * norm.rvs(size=(ref_count, )),
                p3,
                (v4*.25 + p3*.75) + 0.2 * norm.rvs(size=(ref_count, )),
                (v4 + p3)/2 + 0.2 * norm.rvs(size=(ref_count, )),
                (v4*.75 + p3*.25) + 0.2 * norm.rvs(size=(ref_count, )),
                v4
            ])
            HeadAndShouldersPattern.ref = ((ref - ref.mean(axis=1, keepdims=True)) / ref.std(axis=1, keepdims=True)).T
        # Warm up the factor values.
        self._rows = HeadAndShouldersPattern.ref.shape[0]
        self._scan()

    def update(self, price):
        # Update the trailing window.
        self._sequence = np.append(self._sequence, price)[-self._max_lookback:]
        # Update the factor values.
        self._scan()

    def _scan(self):
        self.corr = 0
        self.similarity_score = 0
        # Select varying lengths of trailing windows.
        for i in range(self._window_size, self._max_lookback, self._step_size):
            # Check if there is enough history to fill the trailing window.
            if len(self._sequence[-i:]) != i:
                break
            # Select the trailing data and downsample it to window_size points.
            sub_sequence = self._downsample(self._sequence[-i:], self._window_size)
            # Normalize the data in the trailing window.
            sub_sequence_std = np.std(sub_sequence)
            if sub_sequence_std == 0:
                continue
            norm_sub_sequence = (sub_sequence - np.mean(sub_sequence)) / sub_sequence_std
            # Evaluate the pattern presence: calculate a correlation and a
            # similarity score against each reference pattern.
            corr_scores = np.empty(self._rows)
            similarity_scores = np.empty(self._rows)
            for j in range(self._rows):
                score, similarity = self._matching(norm_sub_sequence, HeadAndShouldersPattern.ref[j, :])
                corr_scores[j] = score
                similarity_scores[j] = similarity
            # Aggregate the results to produce a single value for each factor.
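            # For each window length, take the mean score across all reference
            # patterns, then keep the maximum of those means across window lengths.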
            self.corr = max(self.corr, np.mean(corr_scores))
            self.similarity_score = max(self.similarity_score, np.mean(similarity_scores))

    def _downsample(self, data, target_length):
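        # Block-average down to target_length points. The block size is
        # len(data) // target_length, so any samples beyond the first
        # factor * target_length are dropped.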
        factor = len(data) // target_length
        return np.array([np.mean(data[i*factor:(i+1)*factor]) for i in range(target_length)])

    def _matching(self, sub_sequence, ref_pattern):
        correlation = np.corrcoef(sub_sequence, ref_pattern)[0, 1]
        similarity = np.sum((sub_sequence - ref_pattern) ** 2)
        return correlation, similarity
```
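
The detector can be exercised on its own. Below is a minimal sketch of the intended call pattern, assuming a LEAN research environment (so the `from AlgorithmImports import *` inside `head_and_shoulders.py` resolves) and a hypothetical random-walk price series:

```python
import numpy as np
from head_and_shoulders import HeadAndShouldersPattern

# Hypothetical daily closes; any 1-D chronological price sequence works.
closes = 100 + np.cumsum(np.random.normal(0, 1, size=50))

detector = HeadAndShouldersPattern(closes, window_size=25, max_lookback=50, step_size=10)
detector.update(closes[-1] * 1.01)  # append the latest close and rescan
print(detector.corr, detector.similarity_score)
```

The algorithm below wires the detector into daily universe selection and shorts each new pattern match: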
```python
# region imports
from AlgorithmImports import *
from head_and_shoulders import HeadAndShouldersPattern
# endregion

class HeadAndShouldersPatternDetectionAlgorithm(QCAlgorithm):

    _trades = []
    _new_symbols = []
    _pattern_by_symbol = {}

    def initialize(self):
        self.set_start_date(self.end_date - timedelta(5*365))
        self.set_cash(100_000)
        self.settings.seed_initial_prices = True
        # Define the pattern detection settings.
        self._window_size = self.get_parameter("window_size", 25)
        self._max_lookback = self.get_parameter("max_lookback", 50)
        # The lookback steps from window_size up in step_size increments
        # (window_size, window_size+step_size, ...), stopping before max_lookback.
        self._step_size = self.get_parameter("step_size", 10)
        # Define the universe settings.
        self._liquidity_filter_size = self.get_parameter("liquidity_filter_size", 500)
        self._universe_size = self.get_parameter("universe_size", 5)
        # Define the trade settings.
        self._hold_duration = timedelta(self.get_parameter("hold_duration", 3))
        # Add a universe of US Equities.
        self.universe_settings.resolution = Resolution.DAILY
        self.add_universe(self._select_assets)

    def _select_assets(self, fundamentals) -> List[Symbol]:
        filtered = sorted(fundamentals, key=lambda f: f.dollar_volume, reverse=True)[:self._liquidity_filter_size]
        # Create pattern detection objects for new securities.
        filtered_symbols = [f.symbol for f in filtered]
        new_symbols = [symbol for symbol in filtered_symbols if symbol not in self._pattern_by_symbol]
        if new_symbols:
            history = self.history(new_symbols, self._max_lookback, Resolution.DAILY)
            for symbol in new_symbols:
                self._pattern_by_symbol[symbol] = HeadAndShouldersPattern(
                    history.loc[symbol]['close'].values if symbol in history.index else np.array([]),
                    self._window_size,
                    self._max_lookback,
                    self._step_size
                )
        # Remove pattern detection objects for securities that drop out of the liquidity filter.
        delisted_symbols = [symbol for symbol in self._pattern_by_symbol.keys() if symbol not in filtered_symbols]
        for symbol in delisted_symbols:
            self._pattern_by_symbol.pop(symbol)
        # Scan for new patterns.
        for f in filtered:
            if f.symbol not in new_symbols:
                self._pattern_by_symbol[f.symbol].update(f.adjusted_price)
        # Select symbols with high correlation and low squared-distance
        # similarity scores against the reference patterns.
        # Step 1: Sort symbols by correlation in descending order.
        reverse_sorted_by_corr = [symbol for symbol, _ in sorted(self._pattern_by_symbol.items(), key=lambda x: x[1].corr, reverse=True)]
        # Step 2: Sort symbols by distance in ascending order.
        sorted_by_distance = [symbol for symbol, _ in sorted(self._pattern_by_symbol.items(), key=lambda x: x[1].similarity_score)]
        # Step 3: Add the ranks of each factor together.
        rank_by_symbol = {symbol: reverse_sorted_by_corr.index(symbol) + sorted_by_distance.index(symbol) for symbol in self._pattern_by_symbol.keys()}
        # Step 4: Select the symbols with the best combined rank across both factors.
        return [symbol for symbol, _ in sorted(rank_by_symbol.items(), key=lambda x: x[1])[:self._universe_size]]

    def on_data(self, data: Slice):
        # Short every stock when it first enters the universe (when the pattern is first detected).
        for symbol in self._new_symbols:
            self._trades.append(TimeBasedTrade(self, self.securities[symbol], self._hold_duration))
        self._new_symbols = []
        # Scan for exits.
        closed_trades = []
        for i, trade in enumerate(self._trades):
            trade.scan(self)
            if trade.closed:
                closed_trades.append(i)
        # Delete closed trades, iterating in reverse so the indices stay valid.
        for i in closed_trades[::-1]:
            del self._trades[i]

    def on_securities_changed(self, changes):
        for security in changes.added_securities:
            if security.symbol not in self._new_symbols:
                self._new_symbols.append(security.symbol)
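
# TimeBasedTrade wraps one short position: it sizes the order to roughly
# $2,000 of notional, enters with a market order, and scan() exits with an
# offsetting market order once hold_duration has elapsed.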
class TimeBasedTrade:

    def __init__(self, algorithm, security, hold_duration):
        self._security = security
        # Determine the position size; skip the trade if there is no price yet
        # or the price is too high for a whole share of $2,000 notional.
        if self._security.price == 0:
            self.closed = True
            return
        self._quantity = -int(2_000 / self._security.price)
        if self._quantity == 0:
            self.closed = True
            return
        # Enter the trade.
        algorithm.market_order(self._security, self._quantity)
        self.closed = False
        # Set the trade exit date.
        self._exit_date = algorithm.time + hold_duration

    def scan(self, algorithm):
        # Perform a simple time-based exit.
        if not self.closed and self._exit_date <= algorithm.time:
            if self._security.is_tradable:
                algorithm.market_order(self._security, -self._quantity)
            self.closed = True
```
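
Note how `_select_assets` combines the two factors: rather than mixing raw correlation and distance values on different scales, it adds the two rank positions and keeps the symbols with the lowest totals. A standalone sketch of that step, using hypothetical factor values for three symbols:

```python
# Hypothetical factor values for three symbols.
corr = {"A": 0.9, "B": 0.7, "C": 0.8}       # higher is better
distance = {"A": 1.0, "B": 4.0, "C": 2.0}   # lower is better

by_corr = sorted(corr, key=corr.get, reverse=True)   # ['A', 'C', 'B']
by_distance = sorted(distance, key=distance.get)     # ['A', 'C', 'B']

# Combined rank: position 0 is best on a factor; lower totals win.
combined = {s: by_corr.index(s) + by_distance.index(s) for s in corr}
print(sorted(combined, key=combined.get)[:2])  # ['A', 'C']
```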