| Overall Statistics | |
| --- | --- |
| Total Orders | 2905 |
| Average Win | 0.06% |
| Average Loss | -0.05% |
| Compounding Annual Return | 0.364% |
| Drawdown | 5.900% |
| Expectancy | 0.010 |
| Start Equity | 100000 |
| End Equity | 100516.13 |
| Net Profit | 0.516% |
| Sharpe Ratio | -0.658 |
| Sortino Ratio | -0.829 |
| Probabilistic Sharpe Ratio | 9.569% |
| Loss Rate | 53% |
| Win Rate | 47% |
| Profit-Loss Ratio | 1.13 |
| Alpha | -0.031 |
| Beta | -0.113 |
| Annual Standard Deviation | 0.037 |
| Annual Variance | 0.001 |
| Information Ratio | 0.167 |
| Tracking Error | 0.205 |
| Treynor Ratio | 0.216 |
| Total Fees | $2913.05 |
| Estimated Strategy Capacity | $25000000.00 |
| Lowest Capacity Asset | ALL R735QTJ8XC9X |
| Portfolio Turnover | 10.51% |
from AlgorithmImports import *
from technical_pattern import TechnicalPattern
from scipy.stats import norm, uniform
class HeadAndShouldersPattern(TechnicalPattern):
def __init__(self, sequence, window_size, max_lookback, step_size):
self.sequence = np.array(sequence)
self.window_size = window_size
self.max_lookback = max_lookback
self.step_size = step_size
# Create pattern references
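# v1-v4 are the four troughs and p1-p3 the three peaks of an idealized
# head-and-shoulders shape: p1 and p3 are the shoulders, p2 is the head
# (scaled above both shoulders), and every anchor point is jittered with
# Gaussian noise so that ref_count noisy variants of the pattern are produced.
# The reference set is generated once and cached on the class.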
if not hasattr(HeadAndShouldersPattern, "ref"):
np.random.seed(1)
ref_count = 100
v1 = np.array([0] * ref_count) + 0.02 * norm.rvs(size=(ref_count, ))
p1 = np.array([1] * ref_count) + 0.2 * norm.rvs(size=(ref_count, ))
v2 = v1 + 0.2 * norm.rvs(size=(ref_count, ))
v3 = v1 + 0.2 * norm.rvs(size=(ref_count, ))
p3 = p1 + 0.02 * norm.rvs(size=(ref_count, ))
p2 = 1.5 * np.maximum(p1, p3) + abs(uniform.rvs(size=(ref_count, )))
v4 = v1 + 0.02 * norm.rvs(size=(ref_count, ))
ref = np.array([
v1,
(v1*.75+p1*.25) + 0.2 * norm.rvs(size=(ref_count, )),
(v1+p1)/2 + 0.2 * norm.rvs(size=(ref_count, )),
(v1*.25+p1*.75) + 0.2 * norm.rvs(size=(ref_count, )),
p1,
(v2*.25+p1*.75) + 0.2 * norm.rvs(size=(ref_count, )),
(v2+p1)/2 + 0.2 * norm.rvs(size=(ref_count, )),
(v2*.75+p1*.25) + 0.2 * norm.rvs(size=(ref_count, )),
v2,
(v2*.75+p2*.25) + 0.2 * norm.rvs(size=(ref_count, )),
(v2+p2)/2 + 0.2 * norm.rvs(size=(ref_count, )),
(v2*.25+p2*.75) + 0.2 * norm.rvs(size=(ref_count, )),
p2,
(v3*.25+p2*.75) + 0.2 * norm.rvs(size=(ref_count, )),
(v3+p2)/2 + 0.2 * norm.rvs(size=(ref_count, )),
(v3*.75+p2*.25) + 0.2 * norm.rvs(size=(ref_count, )),
v3,
(v3*.75+p3*.25) + 0.2 * norm.rvs(size=(ref_count, )),
(v3+p3)/2 + 0.2 * norm.rvs(size=(ref_count, )),
(v3*.25+p3*.75) + 0.2 * norm.rvs(size=(ref_count, )),
p3,
(v4*.25+p3*.75) + 0.2 * norm.rvs(size=(ref_count, )),
(v4+p3)/2 + 0.2 * norm.rvs(size=(ref_count, )),
(v4*.75+p3*.25) + 0.2 * norm.rvs(size=(ref_count, )),
v4
])
HeadAndShouldersPattern.ref = ((ref - ref.mean(axis=1, keepdims=True)) / ref.std(axis=1, keepdims=True)).T
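# ref is z-scored and transposed into a (ref_count x 25) matrix:
# one 25-point reference pattern per row.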
# Warm up the factor values
self.rows = HeadAndShouldersPattern.ref.shape[0]
self.scan()
def update(self, price):
# Update the trailing window
self.sequence = np.append(self.sequence, price)[-self.max_lookback:]
# Update the factor values
self.scan()
def scan(self):
# Reset the factor values before re-evaluating all trailing window lengths
self.corr = 0
self.similarity_score = 0
# Select varying lengths of trailing windows
for i in range(self.window_size, self.max_lookback, self.step_size):
# Check if enough history to fill the trailing window
if len(self.sequence[-i:]) != i:
break
# Select the trailing data and downsample it to window_size (25 by default) data points
sub_sequence = self.downsample(self.sequence[-i:], self.window_size)
# Normalize the data in the trailing window
sub_sequence_std = np.std(sub_sequence)
if sub_sequence_std == 0:
continue
norm_sub_sequence = (sub_sequence - np.mean(sub_sequence)) / sub_sequence_std
# Evaluate the pattern presence
# Calculate correlation and similarity scores for each reference pattern
corr_scores = np.empty(self.rows)
similarity_scores = np.empty(self.rows)
for j in range(self.rows):
score, similarity = self.matching(norm_sub_sequence, HeadAndShouldersPattern.ref[j, :])
corr_scores[j] = score
similarity_scores[j] = similarity
# Aggregate the results to produce a single value for each factor
self.corr = max(self.corr, np.mean(corr_scores))
self.similarity_score = max(self.similarity_score, np.mean(similarity_scores))
def downsample(self, data, target_length):
# Chunk-mean downsampling: average consecutive blocks of len(data) // target_length
# points; any remainder at the end of the window is dropped.
factor = len(data) // target_length
return [np.mean(data[i*factor:(i+1)*factor]) for i in range(target_length)]
def matching(self, sub_sequence, ref_pattern):
# Pearson correlation measures shape agreement; the sum of squared errors
# serves as the (lower-is-better) similarity distance.
correlation = np.corrcoef(sub_sequence, ref_pattern)[0, 1]
similarity = np.sum((sub_sequence - ref_pattern) ** 2)
return correlation, similarity
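Before moving on to the algorithm that consumes this class (it is imported below from `head_and_shoulders`), here is a minimal sketch of driving the detector directly. The price path and parameter values are made up for illustration, and running it outside of QuantConnect's LEAN environment assumes the module's `AlgorithmImports` import resolves (or is stubbed out):

import numpy as np
from head_and_shoulders import HeadAndShouldersPattern

# Hypothetical price path, for illustration only
np.random.seed(0)
prices = 100 + np.cumsum(np.random.normal(0, 1, size=50))

# Warm up on the history, then feed one new close per bar
pattern = HeadAndShouldersPattern(prices, window_size=25, max_lookback=50, step_size=10)
pattern.update(prices[-1] + 0.5)

# Higher corr and lower similarity_score indicate a closer match to the references
print(pattern.corr, pattern.similarity_score)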
# region imports
from AlgorithmImports import *
from head_and_shoulders import HeadAndShouldersPattern
# endregion
class HeadAndShouldersPatternDetectionAlgorithm(QCAlgorithm):
trades = []
new_symbols = []
pattern_by_symbol = {}
def initialize(self):
self.set_start_date(2022, 1, 1)
self.set_end_date(2023, 6, 1)
self.set_cash(100000)
# Pattern detection settings
self.window_size = self.get_parameter("window_size", 25)
self.max_lookback = self.get_parameter("max_lookback", 50)
self.step_size = self.get_parameter("step_size", 10) # lookback grows from window_size in steps of step_size, up to (but excluding) max_lookback
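# e.g., with the defaults (window_size=25, step_size=10, max_lookback=50),
# scan() evaluates trailing windows of 25, 35, and 45 days.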
# Universe settings
self.coarse_size = self.get_parameter("coarse_size", 500)
self.universe_size = self.get_parameter("universe_size", 5)
# Trade settings
self.hold_duration = timedelta(self.get_parameter("hold_duration", 3))
# Define universe
self.universe_settings.resolution = Resolution.DAILY
self.add_universe(self.coarse_filter_function)
def coarse_filter_function(self, coarse: List[CoarseFundamental]) -> List[Symbol]:
coarse = sorted(coarse, key=lambda x: x.dollar_volume, reverse=True)[:self.coarse_size]
# Create pattern detection objects for new securities
coarse_symbols = [c.symbol for c in coarse]
new_symbols = [symbol for symbol in coarse_symbols if symbol not in self.pattern_by_symbol]
if new_symbols:
history = self.history(new_symbols, self.max_lookback, Resolution.DAILY)
for symbol in new_symbols:
self.pattern_by_symbol[symbol] = HeadAndShouldersPattern(
history.loc[symbol]['close'].values if symbol in history.index else np.array([]),
self.window_size,
self.max_lookback,
self.step_size
)
# Remove pattern detection objects for securities that dropped out of the coarse selection
delisted_symbols = [symbol for symbol in self.pattern_by_symbol.keys() if symbol not in coarse_symbols]
for symbol in delisted_symbols:
self.pattern_by_symbol.pop(symbol)
# Scan for new patterns
for c in coarse:
if c.symbol not in new_symbols:
self.pattern_by_symbol[c.symbol].update(c.adjusted_price)
# Select symbols with high correlation and low distance (similarity score) to the reference patterns
# Step 1: Sort symbols by correlation in descending order
reverse_sorted_by_corr = [symbol for symbol, _ in sorted(self.pattern_by_symbol.items(), key=lambda x: x[1].corr, reverse=True)]
# Step 2: Sort symbols by similarity score (distance) in ascending order
sorted_by_dtw = [symbol for symbol, _ in sorted(self.pattern_by_symbol.items(), key=lambda x: x[1].similarity_score, reverse=False)]
# Step 3: Add the ranks of each factor together
rank_by_symbol = {symbol: reverse_sorted_by_corr.index(symbol)+sorted_by_dtw.index(symbol) for symbol in self.pattern_by_symbol.keys()}
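# e.g., a symbol at index 1 in the correlation ranking and index 4 in the
# distance ranking gets a combined rank of 5; lower is better.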
# Step 4: Select the symbols with the best combined rank across both factors
return [ symbol for symbol, _ in sorted(rank_by_symbol.items(), key=lambda x: x[1])[:self.universe_size] ]
def on_data(self, data: Slice):
# Short every stock when it first enters the universe (when we first detect the pattern)
for symbol in self.new_symbols:
self.trades.append(Trade(self, symbol, self.hold_duration))
self.new_symbols = []
# Scan for exits
closed_trades = []
for i, trade in enumerate(self.trades):
trade.scan(self)
if trade.closed:
closed_trades.append(i)
# Delete closed trades
for i in closed_trades[::-1]:
del self.trades[i]
def on_securities_changed(self, changes):
for security in changes.added_securities:
if security.symbol not in self.new_symbols:
self.new_symbols.append(security.symbol)
class Trade:
def __init__(self, algorithm, symbol, hold_duration):
self.symbol = symbol
# Determine position size
self.quantity = -int(2_000 / algorithm.securities[symbol].price)
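# Short a fixed ~$2,000 notional per signal; int() truncation rounds the
# quantity to 0 for stocks priced above $2,000, in which case the trade is skipped.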
if self.quantity == 0:
self.closed = True
return
# Enter trade
algorithm.market_order(symbol, self.quantity)
self.closed = False
# Set variable for exit logic
self.exit_date = algorithm.time + hold_duration
def scan(self, algorithm):
# Simple time-based exit
if not self.closed and self.exit_date <= algorithm.time:
algorithm.market_order(self.symbol, -self.quantity)
self.closed = True
from AlgorithmImports import *
from scipy.signal import savgol_filter
from tslearn.metrics import dtw_path
class TechnicalPattern:
def matching(self, series: np.ndarray, ref: np.ndarray) -> tuple:
# Smooth the series to reduce noise; the Savitzky-Golay filter suppresses
# noise while preserving sharp changes. (Note: a window of 3 with polyorder 2
# fits each triple of points exactly, so this setting is effectively a
# pass-through; a larger window would smooth more aggressively.)
series_ = savgol_filter(series, 3, 2)
# Normalize so the distance computation is scale-invariant
series_ = (series_ - series_.mean()) / series_.std()
# Compute the DTW path and the similarity score (DTW distance)
path, similarity = dtw_path(ref, series_)
# Align the two series along the DTW path
series_ = np.array([series_[x[1]] for x in path])
ref = np.array([ref[x[0]] for x in path])
# Score how similar the aligned series are; the correlation coefficient is used
# here. Alternatives include the SWZ algorithm (Savin et al., 2007) or Fourier
# component matching.
score = np.corrcoef(series_, ref)[0, 1]
return score, similarity
def downsample(self, values, num_points):
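# Reduce `values` to `num_points` samples: keep the first and last values
# verbatim and summarize the interior by averaging roughly equal-length
# segments; values are repeated first so that segment boundaries fall more
# evenly. Assumes len(values) >= num_points.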
if num_points == len(values):
return values
adj_values = []
duplicates = int(2 * len(values) / num_points)
if duplicates > 0:
for x in values:
for i in range(duplicates):
adj_values.append(x)
else:
adj_values = values
num_steps = num_points - 2
step_size = int(len(adj_values) / num_steps)
smoothed_data = [adj_values[0]]
for i in range(num_steps):
start_idx = i * step_size
end_idx = len(adj_values) - 1 if i == num_steps-1 else (i+1)*step_size - 1
segment = np.array(adj_values[start_idx:end_idx+1])
avg = sum(segment) / len(segment)
smoothed_data.append(avg)
smoothed_data.append(adj_values[-1])
return smoothed_data
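Finally, the base-class DTW matcher can be sanity-checked in isolation. This is a minimal sketch on synthetic series (illustrative data only); it assumes `tslearn`, `scipy`, and `numpy` are installed and that the module's `AlgorithmImports` import resolves:

import numpy as np
from technical_pattern import TechnicalPattern

# An idealized single-peak reference, z-scored to match the matcher's convention
ref = np.array([0, 1, 2, 3, 4, 5, 4, 3, 2, 1, 0], dtype=float)
ref = (ref - ref.mean()) / ref.std()

# A noisy, shifted, and rescaled copy of the same shape (hypothetical data)
rng = np.random.default_rng(1)
series = ref * 3 + 10 + rng.normal(0, 0.1, size=ref.size)

score, distance = TechnicalPattern().matching(series, ref)
print(score, distance)  # correlation near 1, DTW distance near 0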