| Overall Statistics |
|
Total Orders 2340 Average Win 0.11% Average Loss -0.07% Compounding Annual Return 20.686% Drawdown 17.200% Expectancy 1.099 Start Equity 1000000 End Equity 2562943.50 Net Profit 156.294% Sharpe Ratio 0.837 Sortino Ratio 1.012 Probabilistic Sharpe Ratio 46.925% Loss Rate 19% Win Rate 81% Profit-Loss Ratio 1.59 Alpha 0.01 Beta 0.895 Annual Standard Deviation 0.14 Annual Variance 0.02 Information Ratio -0.044 Tracking Error 0.049 Treynor Ratio 0.131 Total Fees $2620.56 Estimated Strategy Capacity $25000000.00 Lowest Capacity Asset NOB R735QTJ8XC9X Portfolio Turnover 0.66% |
# region imports
from AlgorithmImports import *
from universe import TopologicalGraphUniverseSelectionModel
# endregion
# Seed NumPy's global random number generator with a fixed value when this module is loaded.
# NOTE(review): presumably intended to make backtests repeatable — confirm which code draws from the global RNG.
np.random.seed(0)
class TopologicalPortfolio(QCAlgorithm):
    """Daily-rebalanced portfolio weighted by a topological clustering of SPY constituents.

    The clusters come from ``TopologicalGraphUniverseSelectionModel``; this class
    only turns the nested cluster structure into portfolio weights and submits
    the corresponding targets once per trading day.
    """

    def initialize(self) -> None:
        """Configure the backtest window, the universe model and the rebalance schedule."""
        self.set_start_date(2020, 3, 25)
        self.set_end_date(2025, 3, 25)
        self.set_cash(1000000)

        # We would like to compare with SPY for the correlation and risk-adjusted return.
        spy = self.add_equity("SPY").symbol
        self.set_benchmark(spy)

        # Lookback window to construct and analyze the topological structure.
        history_lookback = self.get_parameter("history_lookback", 150)
        # Period (passed to the universe model) after which the topological complex is rebuilt.
        recalibrate_period = self.get_parameter("recalibrate_period", 125)

        # Construct a portfolio with SPY constituents: keep the 200 constituents
        # with the largest non-zero weight, in descending weight order.
        self.universe_model = TopologicalGraphUniverseSelectionModel(
            spy,
            history_lookback,
            recalibrate_period,
            lambda u: [x.symbol for x in sorted(
                [x for x in u if x.weight],
                key=lambda x: x.weight,
                reverse=True
            )[:200]]
        )
        self.add_universe_selection(self.universe_model)

        # Set a scheduled event to rebalance the portfolio daily.
        self.schedule.on(self.date_rules.every_day(spy), self.time_rules.at(9, 31), self.rebalance)

        # Set the warm up to warm up the universe selection.
        self.set_warm_up(timedelta(365))

    def rebalance(self) -> None:
        """Rebalance to the cluster-derived weights; no-op until clusters are available."""
        # The scheduled event also fires while warming up, but orders cannot be
        # placed then, so skip instead of submitting requests that get rejected.
        if self.is_warming_up:
            return

        clustered_symbols = self.universe_model.clustered_symbols
        if not clustered_symbols:
            return

        # Obtain the weights invested in each constituent.
        weights = self.weight_distribution(clustered_symbols)
        # Rebalance by designated weights, closing positions that are not targeted.
        targets = [PortfolioTarget(symbol, weight) for symbol, weight in weights.items()]
        self.set_holdings(targets, liquidate_existing_holdings=True)

    def weight_distribution(self, clustered_symbols):
        """Turn a nested list of symbols into normalized portfolio weights.

        Every non-list item found at nesting depth ``level`` (the outermost list
        is level 1) inside a list of ``n`` items contributes
        ``(1 / n) / 2 ** (level - 1)``. Contributions of an item that appears
        several times are summed, and the result is scaled to sum to 1.
        Symbols that are absent from ``clustered_symbols`` receive no weight.

        Args:
            clustered_symbols: Arbitrarily nested ``list`` structure whose leaves
                are the symbols to invest in.

        Returns:
            ``pd.Series`` mapping each leaf to its weight; empty when the
            structure contains no leaves.
        """
        weights = {}

        def assign_weights(nested_list, level=1):
            num_elements = len(nested_list)
            if num_elements == 0:
                return
            # Equal split among siblings, halved for every extra nesting level.
            leaf_weight = (1 / num_elements) / (2 ** (level - 1))
            for item in nested_list:
                if isinstance(item, list):
                    assign_weights(item, level + 1)
                else:
                    weights[item] = weights.get(item, 0) + leaf_weight

        # Calculate the overall weights.
        assign_weights(clustered_symbols)

        total = sum(weights.values())
        if not total:
            # Nothing was collected: return an empty series instead of dividing by zero.
            return pd.Series(weights, dtype=float)
        return pd.Series(weights) / total
# region imports
from AlgorithmImports import *
from Selection.ETFConstituentsUniverseSelectionModel import ETFConstituentsUniverseSelectionModel
import kmapper as km
from sklearn.cluster import DBSCAN
from sklearn.decomposition import PCA
from umap import UMAP
# endregion
# Seed NumPy's global random number generator with a fixed value when this module is loaded.
# NOTE(review): PCA and UMAP below receive their own random_state, so confirm what still depends on this global seed.
np.random.seed(0)
class TopologicalGraphUniverseSelectionModel(ETFConstituentsUniverseSelectionModel):
    """ETF-constituents universe model that also clusters the selected constituents.

    A Mapper graph is built from the daily log returns of the selected symbols;
    its connected components are exposed through ``clustered_symbols``.

    Attributes:
        lookback_window: Number of daily bars requested to build the graph.
        recalibration_period: Delay between two graph rebuilds, either a number
            of days or a ``timedelta``; ``None`` disables rebuilding.
        clustered_symbols: ``None`` until the first successful build, then a list
            of connected components, each a list of per-node symbol lists.
    """

    def __init__(
        self,
        etf_symbol: Symbol,
        lookback_window: int = 250,
        recalibration_period: "int | timedelta | None" = None,
        universe_filter_func: Callable[[list[ETFConstituentUniverse]], list[Symbol]] = None,
    ) -> None:
        """Store the clustering settings and initialize the underlying ETF universe model.

        Args:
            etf_symbol: Symbol of the ETF whose constituents form the universe.
            lookback_window: Number of daily bars used to build the graph.
            recalibration_period: Days (or ``timedelta``) between graph rebuilds;
                ``None`` builds the graph only once.
            universe_filter_func: Constituent filter forwarded to the base model.
        """
        self._symbol = etf_symbol
        self.lookback_window = lookback_window
        self.recalibration_period = recalibration_period
        self.clustered_symbols = None
        super().__init__(etf_symbol, None, universe_filter_func)

    def create_universes(self, algorithm: QCAlgorithm) -> list[Universe]:
        """Create the base universes and schedule the first graph construction.

        The ETF must already be present in ``algorithm.securities`` because its
        exchange hours are used to locate the next market open.
        """
        universe_list = super().create_universes(algorithm)
        # Initial warm up of the universe.
        # NOTE(review): the ``False`` flag presumably excludes extended market hours — confirm against the LEAN API.
        next_open = algorithm.securities[self._symbol].exchange.hours.get_next_market_open(algorithm.time, False)
        algorithm.schedule.on(
            algorithm.date_rules.on([next_open]),
            algorithm.time_rules.at(9, 31),
            lambda: self.get_graph_symbols(algorithm)
        )
        return universe_list

    def get_graph_symbols(self, algorithm: QCAlgorithm) -> None:
        """Rebuild the clusters and schedule the next rebuild.

        When no symbols are available the previous ``clustered_symbols`` value is
        kept. The next rebuild is scheduled ``recalibration_period`` after the
        current algorithm time; nothing is scheduled when the period is ``None``.
        """
        # Construct simplicial complex.
        graph, symbol_list = self.construct_simplicial_complex(algorithm, self.lookback_window)
        if len(symbol_list) > 0:
            self.clustered_symbols = self.clustering_symbols(graph, symbol_list)

        period = self.recalibration_period
        if period is None:
            return
        if not isinstance(period, timedelta):
            # A plain number is interpreted as a number of days.
            period = timedelta(period)

        # Set schedule event to reconstruct the topological structure.
        algorithm.schedule.on(
            algorithm.date_rules.on([algorithm.time + period]),
            algorithm.time_rules.at(0, 1),
            lambda: self.get_graph_symbols(algorithm)
        )

    def construct_simplicial_complex(self, algorithm: QCAlgorithm, lookback_window: int) -> tuple[dict[str, object], list[Symbol]]:
        """Build the Mapper graph of the currently selected symbols.

        Args:
            algorithm: Algorithm instance used to request price history.
            lookback_window: Number of daily bars to request.

        Returns:
            ``(graph, symbols)`` where ``graph`` is the KeplerMapper graph and
            ``symbols`` is the column index of the close-price frame, whose
            positions match the row positions fed to the mapper.
            ``({}, [])`` when there is no selection, no history or no return data.
        """
        selected = self.universe.selected
        if not selected:
            return {}, []

        # Obtain historical data to construct a graph of stock relationship.
        history = algorithm.history(selected, lookback_window, Resolution.DAILY)
        if history.empty:
            # No bars returned: there is nothing to unstack or cluster.
            return {}, []
        # Move the first index level into the columns and keep the close prices.
        prices = history.unstack(0).close

        # Calculate daily log return; rows containing any missing value are dropped.
        # Then, transpose the data since we're relating stocks.
        log_returns = np.log(prices / prices.shift(1)).dropna().T
        if log_returns.empty:
            return {}, []

        # Initialize the mapper algorithm.
        mapper = km.KeplerMapper()
        # Build the lens by chaining two projections, PCA then UMAP (1 component).
        # PCA: since it can retain the most variance while denoising, as well as fast.
        # UMAP: handles non-linear relationships well and preserves both local and global structures.
        # MDS and Isomap are not included due to their potential sensitivity to noise and outliers in financial data.
        projected_data = mapper.fit_transform(
            log_returns,
            projection=[PCA(n_components=0.8, random_state=1), UMAP(n_components=1, random_state=1, n_jobs=-1)]
        )
        # Cluster the data with DBSCAN since it is better in handling noise.
        # We are interested in the correlation distance to cluster and form a portfolio.
        graph = mapper.map(projected_data, log_returns, clusterer=DBSCAN(metric='correlation', n_jobs=-1))
        return graph, prices.columns

    def clustering_symbols(self, graph: dict[str, object], symbol_list: list[Symbol]) -> list[list[object]]:
        """Group the graph nodes into connected components and map them to symbols.

        Links are treated as undirected, so two nodes end up in the same
        component whenever a chain of links joins them, whichever end of a link
        was seen first. Every node belongs to exactly one component; nodes
        without links form single-node components.

        Args:
            graph: Mapping with ``'nodes'`` (node id -> list of integer positions
                into ``symbol_list``) and ``'links'`` (node id -> list of linked
                node ids).
            symbol_list: Sequence of symbols addressed by the node members.

        Returns:
            One list per connected component, each containing one list of symbols
            per node of that component.
        """
        nodes = graph['nodes']

        # Undirected adjacency between known nodes.
        neighbours = {node_id: set() for node_id in nodes}
        for source, targets in graph['links'].items():
            if source not in neighbours:
                continue
            for target in targets:
                if target in neighbours:
                    neighbours[source].add(target)
                    neighbours[target].add(source)

        # Each connected structure as a giant cluster (depth-first traversal).
        components = []
        visited = set()
        for start in nodes:
            if start in visited:
                continue
            visited.add(start)
            component = []
            stack = [start]
            while stack:
                node_id = stack.pop()
                component.append(node_id)
                # Sorted so the node order inside a component is reproducible.
                for other in sorted(neighbours[node_id]):
                    if other not in visited:
                        visited.add(other)
                        stack.append(other)
            components.append(component)

        # Convert the node into symbol.
        return [
            [[symbol_list[position] for position in nodes[node_id]] for node_id in component]
            for component in components
        ]