| Overall Statistics | |
| --- | --- |
| Total Orders | 315 |
| Average Win | 3.76% |
| Average Loss | -3.18% |
| Compounding Annual Return | 17.829% |
| Drawdown | 51.000% |
| Expectancy | 0.237 |
| Start Equity | 100000 |
| End Equity | 263863.63 |
| Net Profit | 163.864% |
| Sharpe Ratio | 0.526 |
| Sortino Ratio | 0.539 |
| Probabilistic Sharpe Ratio | 13.684% |
| Loss Rate | 43% |
| Win Rate | 57% |
| Profit-Loss Ratio | 1.18 |
| Alpha | 0.087 |
| Beta | 0.341 |
| Annual Standard Deviation | 0.223 |
| Annual Variance | 0.05 |
| Information Ratio | 0.118 |
| Tracking Error | 0.243 |
| Treynor Ratio | 0.343 |
| Total Fees | $6642.43 |
| Estimated Strategy Capacity | $0 |
| Lowest Capacity Asset | EWY RUMXNBHKSXGL |
| Portfolio Turnover | 13.58% |
| Drawdown Recovery | 1301 |
| Avg. Loss% Per Loser | -3.50% |
| Avg. Win% Per Winner | 3.98% |
| Max Win% | 23.37% |
| Max Loss% | -17.44% |
| *Profit Ratio | 1.43 |
from AlgorithmImports import *
from reversion import *
from regime import *
from datetime import datetime, timedelta
class ConstantNoBenchmarkAlphaModel(AlphaModel):
''' Provides an implementation of IAlphaModel that always returns the same insight for each security'''
def __init__(self, algo, type, direction, period, magnitude = None, confidence = None, weight = None):
'''Initializes a new instance of the ConstantNoBenchmarkAlphaModel class
Args:
type: The type of insight
direction: The direction of the insight
period: The period over which the insight will come to fruition
magnitude: The predicted change in magnitude as a +- percentage
confidence: The confidence in the insight
weight: The portfolio weight of the insights'''
self.algo = algo
self.type = type
self.direction = direction
self.period = period
self.magnitude = magnitude
self.confidence = confidence
self.weight = weight
self.securities = []
self.insights_time_by_symbol = {}
self.Name = '{}({},{},{}'.format(self.__class__.__name__, type, direction, strfdelta(period))
if magnitude is not None:
self.Name += ',{}'.format(magnitude)
if confidence is not None:
self.Name += ',{}'.format(confidence)
self.Name += ')'
def update(self, algorithm, data):
''' Creates a constant insight for each security as specified via the constructor
Args:
algorithm: The algorithm instance
data: The new data available
Returns:
The new insights generated'''
insights = []
is_holding = 0
if not self.algo.securities[self.algo._symbol].exchange.hours.is_open(self.algo.time + timedelta(minutes=5), extended_market_hours=False):
return []
for security in self.securities:
if security.symbol.value == self.algo.etf:
df_train = self.algo.history(self.algo._symbol, datetime(2018, 1, 1), datetime(2019, 12, 31), Resolution.DAILY)
df_etf = self.algo.history(self.algo._symbol, 100, Resolution.DAILY)
for holding in self.algo.portfolio.values():
if holding is None:
continue
symbol = holding.symbol
qty = holding.quantity
invested = holding.invested
if symbol.value == self.algo.etf and qty > 0 and invested:
is_holding = 1
arima = SimplifiedARIMAStrategy(self.algo, is_holding)
arima_signal = arima.generate_signals(df_etf.tail(50)['close'].values)
wk_signal = features_wkclustering_regime(self.algo, df_train, df_etf, 20)
if is_holding == 1 and arima_signal == 0 and self.algo.time.hour == 10:
insights.append(Insight(security.symbol, self.period, self.type, InsightDirection.FLAT, self.magnitude, self.confidence, weight = self.weight))
elif (is_holding == 1 or (arima_signal == 1 and wk_signal == 0)) and self.algo.time.hour == 10:
active_insights = self.algo.insights.get_active_insights(self.algo.utc_time)
self.algo.insights.cancel(active_insights)
insights = []
insights.append(Insight(security.symbol, self.period, self.type, self.direction, self.magnitude, self.confidence, weight = self.weight))
return insights
for security in self.securities:
history = self.algo.history(security.symbol, 100, Resolution.DAILY)
if security.symbol == self.algo._symbol or security.symbol.value == self.algo.etf:
continue
else:
if security.price < history.tail(21)['close'].max() * 0.88:
insights.append(Insight(security.symbol, self.period, self.type, InsightDirection.FLAT, self.magnitude, self.confidence, weight = self.weight))
elif security.price != 0 and self.should_emit_insight(algorithm.utc_time, security.symbol) and not is_holding:
insights.append(Insight(security.symbol, self.period, self.type, self.direction, self.magnitude, self.confidence, weight = self.weight))
return insights
def on_securities_changed(self, algorithm, changes):
''' Event fired each time we add/remove securities from the data feed
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm'''
for added in changes.added_securities:
self.securities.append(added)
# this will allow the insight to be re-sent when the security re-joins the universe
for removed in changes.removed_securities:
if removed in self.securities:
self.securities.remove(removed)
if removed.symbol in self.insights_time_by_symbol:
self.insights_time_by_symbol.pop(removed.symbol)
def should_emit_insight(self, utc_time, symbol):
if symbol.is_canonical():
# canonical futures & options are not tradable
return False
generated_time_utc = self.insights_time_by_symbol.get(symbol)
if generated_time_utc is not None:
# we previously emitted an insight for this symbol, check its period to see
# if we should emit another insight
if utc_time - generated_time_utc < self.period:
return False
# we either haven't emitted an insight for this symbol or the previous
# insight's period has expired, so emit a new insight now for this symbol
self.insights_time_by_symbol[symbol] = utc_time
return True
def strfdelta(tdelta):
d = tdelta.days
h, rem = divmod(tdelta.seconds, 3600)
m, s = divmod(rem, 60)
return "{}.{:02d}:{:02d}:{:02d}".format(d,h,m,s)# region imports
import numpy as np
import pandas as pd
import yfinance as yf
from datetime import datetime, timedelta
from scipy.spatial.distance import squareform
from scipy.cluster.hierarchy import linkage, dendrogram, fcluster
from scipy.sparse.csgraph import minimum_spanning_tree
from sklearn.metrics import silhouette_score, davies_bouldin_score, adjusted_rand_score
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Tuple, Any
import warnings
# endregion
class MSTClustering:
"""Implement Minimum Spanning Tree clustering."""
def __init__(self, distance_matrix: pd.DataFrame):
self.distance_matrix = distance_matrix
self.tickers = distance_matrix.index.tolist()
def build_mst(self) -> Tuple[Any, Dict]:
"""
STEP 2.1 & 2.2: Build MST using Kruskal's algorithm.
Returns: (mst_sparse_matrix, edge_dict)
"""
print("\n" + "=" * 80)
print("PHASE 2: MINIMUM SPANNING TREE (MST) CLUSTERING")
print("=" * 80)
print(f"\nSTEP 2.1-2.2: Building MST with Kruskal's algorithm...")
# Convert to numpy array
dist_array = self.distance_matrix.values
# Compute MST
mst = minimum_spanning_tree(dist_array)
# Extract edges
mst_full = mst.toarray()
edges = []
for i in range(len(mst_full)):
for j in range(i+1, len(mst_full)):
if mst_full[i, j] > 0:
edges.append((i, j, mst_full[i, j]))
elif mst_full[j, i] > 0:
edges.append((i, j, mst_full[j, i]))
print(f"✓ MST constructed")
print(f" Number of edges: {len(edges)}")
print(f" Number of nodes: {len(self.tickers)}")
print(f" Total MST weight: {sum([e[2] for e in edges]):.3f}")
return mst, edges
def extract_clusters(self, edges: List[Tuple], n_clusters: int = 30) -> Dict[str, int]:
"""
STEP 2.3: Extract clusters from MST by removing longest edges.
Args:
edges: List of (node1, node2, weight) tuples
n_clusters: Target number of clusters
Returns: Dictionary mapping ticker to cluster_id
"""
print(f"\nSTEP 2.3: Extracting {n_clusters} clusters from MST...")
# Sort edges by weight (ascending)
sorted_edges = sorted(edges, key=lambda x: x[2])
# Keep only the shortest edges to create n_clusters
n_edges_to_keep = len(self.tickers) - n_clusters
kept_edges = sorted_edges[:n_edges_to_keep]
# Build adjacency list
from collections import defaultdict
adj_list = defaultdict(list)
for i, j, w in kept_edges:
adj_list[i].append(j)
adj_list[j].append(i)
# Find connected components using DFS
visited = [False] * len(self.tickers)
cluster_assignments = {}
cluster_id = 0
def dfs(node, cluster_id):
visited[node] = True
cluster_assignments[self.tickers[node]] = cluster_id
for neighbor in adj_list[node]:
if not visited[neighbor]:
dfs(neighbor, cluster_id)
for i in range(len(self.tickers)):
if not visited[i]:
dfs(i, cluster_id)
cluster_id += 1
print(f"✓ MST clusters extracted: {cluster_id} clusters")
# Print cluster sizes
cluster_sizes = pd.Series(cluster_assignments.values()).value_counts().sort_index()
print(f" Cluster size distribution:")
print(f" Mean: {cluster_sizes.mean():.1f}")
print(f" Median: {cluster_sizes.median():.1f}")
print(f" Max: {cluster_sizes.max()}")
print(f" Min: {cluster_sizes.min()}")
return cluster_assignments
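# Illustrative usage sketch for MSTClustering (not part of the strategy): the tickers and the
# 4x4 distance matrix below are made-up values, used only to show the build_mst -> extract_clusters flow.
def _mst_clustering_example():
    tickers = ['AAA', 'BBB', 'CCC', 'DDD']  # hypothetical tickers
    dist = pd.DataFrame([[0.0, 0.2, 0.9, 0.8],
                         [0.2, 0.0, 0.7, 0.9],
                         [0.9, 0.7, 0.0, 0.1],
                         [0.8, 0.9, 0.1, 0.0]], index=tickers, columns=tickers)
    mst = MSTClustering(dist)
    _, edges = mst.build_mst()
    # Cutting the longest MST edges leaves two connected components -> two clusters
    return mst.extract_clusters(edges, n_clusters=2)  # {'AAA': 0, 'BBB': 0, 'CCC': 1, 'DDD': 1}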
class HierarchicalClustering:
"""Implement Hierarchical Clustering with Gap Index optimization."""
def __init__(self, distance_matrix: pd.DataFrame):
self.distance_matrix = distance_matrix
self.tickers = distance_matrix.index.tolist()
np.random.seed(42)
def build_dendrogram(self) -> Any:
"""
STEP 3.1: Build hierarchical clustering dendrogram using Ward's method.
Returns: Linkage matrix
"""
# Convert distance matrix to condensed form
dist_condensed = squareform(self.distance_matrix.values, checks=False)
# Perform hierarchical clustering
linkage_matrix = linkage(dist_condensed, method='ward')
return linkage_matrix
def calculate_gap_index(self, algo, linkage_matrix: Any, min_clusters: int = 4,
max_clusters: int = 67,
n_references: int = 10) -> Tuple[int, List[float]]:
"""
STEP 3.2: Calculate Gap Index to find optimal number of clusters.
Args:
linkage_matrix: Linkage matrix from hierarchical clustering
min_clusters: Minimum number of clusters to test
max_clusters: Maximum number of clusters to test
n_references: Number of random reference datasets
Returns: (optimal_k, gap_values)
"""
n_samples = len(self.tickers)
gap_values = []
# Original distance matrix
dist_matrix = self.distance_matrix.values
for k in range(min_clusters, min(max_clusters + 1, n_samples)):
# Get cluster assignments for k clusters
clusters = fcluster(linkage_matrix, k, criterion='maxclust')
# Calculate within-cluster dispersion for real data
W_k = self._calculate_dispersion(dist_matrix, clusters)
# Calculate within-cluster dispersion for random data
W_k_random_list = []
for _ in range(n_references):
# Generate random distance matrix with same structure
random_dist = np.random.uniform(
dist_matrix.min(),
dist_matrix.max(),
dist_matrix.shape
)
random_dist = (random_dist + random_dist.T) / 2 # Make symmetric
np.fill_diagonal(random_dist, 0)
# Cluster random data
random_condensed = squareform(random_dist)
random_linkage = linkage(random_condensed, method='ward')
random_clusters = fcluster(random_linkage, k, criterion='maxclust')
W_k_random = self._calculate_dispersion(random_dist, random_clusters)
W_k_random_list.append(W_k_random)
# Gap statistic
W_k_random_mean = np.mean(W_k_random_list)
gap = np.log(W_k_random_mean) - np.log(W_k)
gap_values.append(gap)
#if k % 10 == 0 or k == 2:
# print(f" k={k:2d}: Gap = {gap:.4f}")
# Find optimal k (maximum gap)
optimal_k = gap_values.index(max(gap_values)) + min_clusters # offset by min_clusters, the first k tested
#algo.log.debug(f"\n✓ Optimal number of clusters: {optimal_k}")
#print(f" Maximum Gap Index: {max(gap_values):.4f}")
return optimal_k, gap_values
def _calculate_dispersion(self, dist_matrix: np.ndarray, clusters: np.ndarray) -> float:
"""Calculate within-cluster dispersion."""
total_dispersion = 0.0
unique_clusters = np.unique(clusters)
for cluster_id in unique_clusters:
cluster_indices = np.where(clusters == cluster_id)[0]
if len(cluster_indices) > 1:
cluster_distances = dist_matrix[np.ix_(cluster_indices, cluster_indices)]
total_dispersion += np.sum(cluster_distances ** 2) / (2 * len(cluster_indices))
return total_dispersion
def extract_clusters(self, linkage_matrix: Any, n_clusters: int) -> Dict[str, int]:
"""
STEP 3.3: Extract clusters at optimal cut.
Args:
linkage_matrix: Linkage matrix from hierarchical clustering
n_clusters: Number of clusters to extract
Returns: Dictionary mapping ticker to cluster_id
"""
print(f"\nSTEP 3.3: Extracting {n_clusters} clusters from dendrogram...")
# Cut dendrogram at optimal height
cluster_labels = fcluster(linkage_matrix, n_clusters, criterion='maxclust')
# Create mapping
cluster_assignments = {ticker: int(label - 1) # Convert to 0-indexed
for ticker, label in zip(self.tickers, cluster_labels)}
# Print cluster sizes
cluster_sizes = pd.Series(cluster_assignments.values()).value_counts().sort_index()
return cluster_assignments
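# Illustrative usage sketch for HierarchicalClustering (not part of the strategy). The Gap Index
# computed above compares within-cluster dispersion on the real distance matrix against the mean
# dispersion of uniform random reference matrices,
#     Gap(k) = log(mean_b W_k_random(b)) - log(W_k),
# and the k with the largest gap is taken as the optimal cluster count. The distance matrix below
# reuses the made-up values from _mst_clustering_example().
def _hierarchical_clustering_example(algo=None):
    tickers = ['AAA', 'BBB', 'CCC', 'DDD']
    dist = pd.DataFrame([[0.0, 0.2, 0.9, 0.8],
                         [0.2, 0.0, 0.7, 0.9],
                         [0.9, 0.7, 0.0, 0.1],
                         [0.8, 0.9, 0.1, 0.0]], index=tickers, columns=tickers)
    hc = HierarchicalClustering(dist)
    linkage_matrix = hc.build_dendrogram()
    # With only four assets the Gap search is restricted to k = 2..3
    optimal_k, _ = hc.calculate_gap_index(algo, linkage_matrix, min_clusters=2, max_clusters=3)
    return hc.extract_clusters(linkage_matrix, optimal_k)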
class ClusterConsensus:
"""Compare and merge clustering results."""
@staticmethod
def compare_methods(mst_clusters: Dict[str, int],
hc_clusters: Dict[str, int]) -> float:
"""
STEP 4.1: Compare MST and Hierarchical Clustering results using ARI.
Args:
mst_clusters: MST cluster assignments
hc_clusters: Hierarchical cluster assignments
Returns: Adjusted Rand Index (ARI)
"""
print("\n" + "=" * 80)
print("PHASE 4: CROSS-VALIDATION & CONSENSUS")
print("=" * 80)
print(f"\nSTEP 4.1: Comparing MST and Hierarchical Clustering...")
# Ensure same order
tickers = sorted(mst_clusters.keys())
mst_labels = [mst_clusters[t] for t in tickers]
hc_labels = [hc_clusters[t] for t in tickers]
# Calculate ARI
ari = adjusted_rand_score(mst_labels, hc_labels)
print(f"✓ Adjusted Rand Index (ARI): {ari:.4f}")
if ari >= 0.7:
print(f" → Strong agreement ✓ (trust the clustering)")
elif ari >= 0.4:
print(f" → Moderate agreement ⚠ (review carefully)")
else:
print(f" → Weak agreement ✗ (use hierarchical clustering)")
return ari
@staticmethod
def merge_clusters(mst_clusters: Dict[str, int],
hc_clusters: Dict[str, int],
ari: float) -> Dict[str, int]:
"""
STEP 4.2: Merge clustering methods based on agreement.
Args:
mst_clusters: MST cluster assignments
hc_clusters: Hierarchical cluster assignments
ari: Adjusted Rand Index
Returns: Final consensus cluster assignments
"""
print(f"\nSTEP 4.2: Merging cluster assignments...")
if ari < 0.7:
print(f" Using Hierarchical Clustering results (ARI < 0.7)")
final_clusters = hc_clusters.copy()
else:
print(f" Creating consensus from both methods (ARI >= 0.7)")
final_clusters = {}
for ticker in mst_clusters.keys():
# If methods agree, use that assignment
# If they disagree, use hierarchical clustering (more stable)
if mst_clusters[ticker] == hc_clusters[ticker]:
final_clusters[ticker] = hc_clusters[ticker]
else:
final_clusters[ticker] = hc_clusters[ticker]
n_clusters = len(set(final_clusters.values()))
print(f"✓ Final consensus clusters: {n_clusters}")
return final_clusters
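# Illustrative usage sketch for ClusterConsensus (not part of the strategy): the two assignment
# dictionaries are made up, compared with the Adjusted Rand Index and then merged.
def _cluster_consensus_example():
    mst_clusters = {'AAA': 0, 'BBB': 0, 'CCC': 1, 'DDD': 1}  # hypothetical MST result
    hc_clusters = {'AAA': 0, 'BBB': 0, 'CCC': 1, 'DDD': 0}   # hypothetical HC result
    ari = ClusterConsensus.compare_methods(mst_clusters, hc_clusters)
    # With ARI below 0.7 the hierarchical assignments are used as-is
    return ClusterConsensus.merge_clusters(mst_clusters, hc_clusters, ari)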
class QualityValidation:
"""Validate clustering quality."""
def __init__(self, distance_matrix: pd.DataFrame,
correlation_matrix: pd.DataFrame,
cluster_assignments: Dict[str, int]):
self.distance_matrix = distance_matrix
self.correlation_matrix = correlation_matrix
self.cluster_assignments = cluster_assignments
self.tickers = list(cluster_assignments.keys())
def validate_all(self) -> Dict[str, float]:
"""
STEP 5.1 & 5.2: Perform all quality validation checks.
Returns: Dictionary of validation metrics
"""
print("\n" + "=" * 80)
print("PHASE 5: QUALITY CHECKS")
print("=" * 80)
metrics = {}
# Prepare data for sklearn metrics
cluster_labels = [self.cluster_assignments[t] for t in self.tickers]
dist_array = self.distance_matrix.loc[self.tickers, self.tickers].values
# Metric 1: Silhouette Score
print(f"\nCalculating Silhouette Score...")
silhouette = silhouette_score(dist_array, cluster_labels, metric='precomputed')
metrics['silhouette_score'] = silhouette
print(f"✓ Silhouette Score: {silhouette:.4f} (target: > 0.4)")
if silhouette > 0.4:
print(f" → PASS ✓")
else:
print(f" → FAIL ✗")
# Metric 2: Davies-Bouldin Index
print(f"\nCalculating Davies-Bouldin Index...")
# Need to convert to feature space for DB index
from sklearn.manifold import MDS
mds = MDS(n_components=10, dissimilarity='precomputed', random_state=42)
coords = mds.fit_transform(dist_array)
db_index = davies_bouldin_score(coords, cluster_labels)
metrics['davies_bouldin_index'] = db_index
print(f"✓ Davies-Bouldin Index: {db_index:.4f} (target: < 1.5)")
if db_index < 1.5:
print(f" → PASS ✓")
else:
print(f" → FAIL ✗")
# Metric 3: Mean Intra-Cluster Correlation
print(f"\nCalculating Mean Intra-Cluster Correlation...")
intra_corr = self._calculate_intra_cluster_correlation()
metrics['intra_cluster_corr'] = intra_corr
print(f"✓ Mean Intra-Cluster Correlation: {intra_corr:.4f} (target: > 0.6)")
if intra_corr > 0.6:
print(f" → PASS ✓")
else:
print(f" → FAIL ✗")
# Metric 4: Mean Inter-Cluster Correlation
print(f"\nCalculating Mean Inter-Cluster Correlation...")
inter_corr = self._calculate_inter_cluster_correlation()
metrics['inter_cluster_corr'] = inter_corr
print(f"✓ Mean Inter-Cluster Correlation: {inter_corr:.4f} (target: < 0.3)")
if inter_corr < 0.3:
print(f" → PASS ✓")
else:
print(f" → FAIL ✗")
# Summary
passed = sum([
silhouette > 0.4,
db_index < 1.5,
intra_corr > 0.6,
inter_corr < 0.3
])
print(f"\n" + "=" * 80)
print(f"QUALITY SUMMARY: {passed}/4 metrics passed")
print("=" * 80)
if passed == 4:
metrics['quality_level'] = 'EXCELLENT'
print(f"✓ EXCELLENT clustering - All targets met!")
elif passed >= 2:
metrics['quality_level'] = 'GOOD'
print(f"✓ GOOD clustering - {passed}/4 targets met")
else:
metrics['quality_level'] = 'POOR'
print(f"✗ POOR clustering - Only {passed}/4 targets met")
return metrics
def _calculate_intra_cluster_correlation(self) -> float:
"""Calculate mean correlation within clusters."""
corr_matrix = self.correlation_matrix.loc[self.tickers, self.tickers]
intra_corrs = []
for cluster_id in set(self.cluster_assignments.values()):
cluster_tickers = [t for t, c in self.cluster_assignments.items() if c == cluster_id]
if len(cluster_tickers) > 1:
cluster_corr = corr_matrix.loc[cluster_tickers, cluster_tickers]
# Get upper triangle (excluding diagonal)
mask = np.triu(np.ones_like(cluster_corr, dtype=bool), k=1)
intra_corrs.extend(cluster_corr.values[mask])
return np.mean(intra_corrs) if intra_corrs else 0.0
def _calculate_inter_cluster_correlation(self) -> float:
"""Calculate mean correlation between clusters."""
corr_matrix = self.correlation_matrix.loc[self.tickers, self.tickers]
inter_corrs = []
clusters = set(self.cluster_assignments.values())
for c1 in clusters:
for c2 in clusters:
if c1 < c2: # Avoid double counting
tickers_c1 = [t for t, c in self.cluster_assignments.items() if c == c1]
tickers_c2 = [t for t, c in self.cluster_assignments.items() if c == c2]
if tickers_c1 and tickers_c2:
cross_corr = corr_matrix.loc[tickers_c1, tickers_c2]
inter_corrs.extend(cross_corr.values.flatten())
return np.mean(inter_corrs) if inter_corrs else 0.0
def plot_quasi_diagonal(self, save_path: str = None):
"""
STEP 5.1: Plot quasi-diagonalized correlation matrix.
Args:
save_path: Path to save the plot
"""
print(f"\nGenerating quasi-diagonalized correlation matrix plot...")
# Sort tickers by cluster
sorted_tickers = sorted(self.tickers, key=lambda t: self.cluster_assignments[t])
# Reorder correlation matrix
corr_reordered = self.correlation_matrix.loc[sorted_tickers, sorted_tickers]
# Create plot
plt.figure(figsize=(14, 12))
sns.heatmap(corr_reordered, cmap='RdBu_r', center=0,
vmin=-1, vmax=1, square=True,
xticklabels=False, yticklabels=False,
cbar_kws={'label': 'Correlation'})
plt.title('Quasi-Diagonalized Correlation Matrix\n(Reordered by Cluster)',
fontsize=14, fontweight='bold')
plt.xlabel('ETFs (grouped by cluster)')
plt.ylabel('ETFs (grouped by cluster)')
plt.show()
plt.close()
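# Illustrative usage sketch for QualityValidation (not part of the strategy). The correlations are
# made up, and deriving distances as sqrt(2 * (1 - rho)) is an assumption about how the upstream
# pipeline builds its distance matrix.
def _quality_validation_example():
    tickers = ['AAA', 'BBB', 'CCC', 'DDD']
    corr = pd.DataFrame([[1.0, 0.9, 0.1, 0.0],
                         [0.9, 1.0, 0.0, 0.1],
                         [0.1, 0.0, 1.0, 0.8],
                         [0.0, 0.1, 0.8, 1.0]], index=tickers, columns=tickers)
    dist = np.sqrt(2.0 * (1.0 - corr))
    clusters = {'AAA': 0, 'BBB': 0, 'CCC': 1, 'DDD': 1}
    qv = QualityValidation(dist, corr, clusters)
    return qv.validate_all()  # silhouette, Davies-Bouldin, intra-/inter-cluster correlations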
class BestInClassSelector:
"""Select best performing ETF from each cluster."""
def __init__(self, cluster_assignments: Dict[str, int], momentum_scores):
self.cluster_assignments = cluster_assignments
self.momentum_scores = momentum_scores
def select_best_in_class(self, algo) -> List[str]:
    """
    STEP 6.1: Select best performer from each cluster.
    Args:
        algo: The algorithm instance (available for optional debug logging)
    Returns: List containing the top-ranked ticker from each cluster
    """
print(f"\nSTEP 6.1: Selecting best-in-class from each cluster...")
clusters = []
for cluster_id in set(self.cluster_assignments.values()):
# Get all tickers in this cluster
cluster_tickers = [t for t, c in self.cluster_assignments.items()
if c == cluster_id and t in self.momentum_scores]
#algo.log.debug(f"[BestInClassSelector] Selecting best in cluster {cluster_id} --> {[x.value for x in cluster_tickers]}")
if cluster_tickers:
sorted_tickers = sorted(
cluster_tickers,
key=lambda t: self.momentum_scores[t],
reverse=True
)
top_tickers = sorted_tickers[:1]
clusters.extend(top_tickers)
return clusters
def document_redundancy(self) -> Dict[str, Any]:
"""
STEP 6.2: Document redundancy removed.
Returns: Dictionary with redundancy statistics
"""
print(f"\nSTEP 6.2: Documenting redundancy removed...")
cluster_stats = {}
total_redundancies = 0
for cluster_id in set(self.cluster_assignments.values()):
cluster_tickers = [t for t, c in self.cluster_assignments.items() if c == cluster_id]
cluster_size = len(cluster_tickers)
redundancies = cluster_size - 1 # All except the leader
cluster_stats[cluster_id] = {
'size': cluster_size,
'redundancies_removed': redundancies,
'tickers': cluster_tickers
}
total_redundancies += redundancies
initial_count = len(self.cluster_assignments)
final_count = len(set(self.cluster_assignments.values()))
efficiency = total_redundancies / initial_count if initial_count > 0 else 0
print(f"✓ Redundancy analysis complete")
print(f" Initial ETFs: {initial_count}")
print(f" Final ETFs (cluster leaders): {final_count}")
print(f" Total redundancies removed: {total_redundancies}")
print(f" Efficiency: {efficiency:.1%}")
return {
'initial_count': initial_count,
'final_count': final_count,
'total_redundancies': total_redundancies,
'efficiency': efficiency,
'cluster_stats': cluster_stats
}
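# Illustrative usage sketch for BestInClassSelector (not part of the strategy): cluster assignments
# and momentum scores are made up; the top-ranked ticker of each cluster is carried forward.
def _best_in_class_example(algo=None):
    clusters = {'AAA': 0, 'BBB': 0, 'CCC': 1, 'DDD': 1}
    momentum = {'AAA': 0.12, 'BBB': 0.07, 'CCC': -0.03, 'DDD': 0.02}
    selector = BestInClassSelector(clusters, momentum)
    leaders = selector.select_best_in_class(algo)  # ['AAA', 'DDD'] for this toy input
    stats = selector.document_redundancy()         # 2 redundant ETFs removed, 2 leaders kept
    return leaders, stats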
class OutputGenerator:
"""Generate final outputs and reports."""
@staticmethod
def generate_report(
n_etfs: int,
mst_clusters: Dict[str, int],
hc_clusters: Dict[str, int],
final_clusters: Dict[str, int],
ari: float,
validation_metrics: Dict[str, float],
redundancy_stats: Dict[str, Any],
cluster_leaders: Dict[int, str]
) -> str:
"""
STEP 7.1 & 7.2: Generate comprehensive report.
Returns: Report as string
"""
print("\n" + "=" * 80)
print("PHASE 7: OUTPUT AND REPORTING")
print("=" * 80)
n_mst_clusters = len(set(mst_clusters.values()))
n_hc_clusters = len(set(hc_clusters.values()))
n_final_clusters = len(set(final_clusters.values()))
report = f"""
{'=' * 80}
STAGE 2 CLUSTERING REPORT
{'=' * 80}
Input: {n_etfs} ETFs from Stage 1 (momentum filtered)
MST Method Results:
- Clusters identified: {n_mst_clusters}
- Method: Kruskal's Minimum Spanning Tree
Hierarchical Clustering Results:
- Optimal clusters (Gap Index): {n_hc_clusters}
- Silhouette Score: {validation_metrics['silhouette_score']:.4f} {'✓' if validation_metrics['silhouette_score'] > 0.4 else '✗'}
- Davies-Bouldin Index: {validation_metrics['davies_bouldin_index']:.4f} {'✓' if validation_metrics['davies_bouldin_index'] < 1.5 else '✗'}
Consensus Results:
- Method Agreement (ARI): {ari:.4f} {'✓ Strong' if ari >= 0.7 else '⚠ Moderate' if ari >= 0.4 else '✗ Weak'}
- Final clusters: {n_final_clusters}
- Confidence level: {'HIGH' if ari >= 0.7 else 'MEDIUM' if ari >= 0.4 else 'LOW'}
Cluster Quality:
- Mean Intra-Cluster Corr: {validation_metrics['intra_cluster_corr']:.4f} {'✓' if validation_metrics['intra_cluster_corr'] > 0.6 else '✗'} (target >0.6)
- Mean Inter-Cluster Corr: {validation_metrics['inter_cluster_corr']:.4f} {'✓' if validation_metrics['inter_cluster_corr'] < 0.3 else '✗'} (target <0.3)
- Overall Quality: {validation_metrics['quality_level']}
Redundancy Elimination:
- Total redundancies removed: {redundancy_stats['total_redundancies']}
- ETFs carried forward to Stage 3: {redundancy_stats['final_count']}
- Efficiency: {redundancy_stats['efficiency']:.1%}
Examples of Cluster Leaders:
"""
# Add cluster leaders
for i, (cluster_id, ticker) in enumerate(sorted(cluster_leaders.items()), 1):
cluster_size = redundancy_stats['cluster_stats'][cluster_id]['size']
report += f" {i:2d}. Cluster {cluster_id:2d}: {ticker} (eliminated {cluster_size-1} redundant ETFs)\n"
report += f"\n{'=' * 80}\n"
report += f"READY FOR STAGE 3: Greedy Correlation Algorithm\n"
report += f"{'=' * 80}\n"
print(report)
return report
'''
Usage:
def Initialize(self):
self.log = Log(self)
# code xxxxxx
self.log.log("---->1")
'''
from AlgorithmImports import *
import time
class Log():
def __init__(self, algo):
self.timer = round(time.time() * 1000)
self.algo = algo
self.maxLine = 600
self.count = 0
self.debug(f"Live mode={self.algo.live_mode}.....Log Initialized")
def log(self, message):
self.algo.Log(f"[LOG] {message}")
def info(self, message):
now = round(time.time() * 1000)
timer = (now - self.timer) / 1000
self.timer = now
if (self.algo.Time <= self.algo.Time.replace(hour=9, minute=35)):
self.algo.Log(f"[INFO] {message}")
def debug(self, message):
if (self.count < self.maxLine or self.algo.live_mode):
self.algo.Log(f"[DEUBG] {message}")
self.count += 1
def live(self, message):
if self.algo.live_mode:
self.algo.Log(f"[DEUBG] {message}")
# region imports
from AlgorithmImports import *
from security_initializer import CustomSecurityInitializer
from universe import QuantMomentumSelectionModel
from datetime import timedelta
from logger import Log
from utils import Utils
from alpha.ConstantNoBenchmarkAlphaModel import ConstantNoBenchmarkAlphaModel
from portfolio.VolTargetPortfolioConstructionModel import VolTargetPortfolioConstructionModel
# endregion
'''
Scope: ETF sector rotation using momentum-based picks tested across multiple timeframes
'''
# lean project-create --language python "SEC04"
# lean cloud backtest "SEC04" --push --open
class SEC04(QCAlgorithm):
def initialize(self):
self.set_start_date(2020, 1, 1)
#self.set_end_date(2022, 11, 30)
#self.set_end_date(2020, 12, 31)
#self.set_start_date(2021, 8, 1)
#self.set_end_date(2022, 2, 28)
self.init_cash = 100000
self.set_cash(self.init_cash) # Set Strategy Cash
self._symbol = self.add_equity("SPY", Resolution.HOUR).Symbol
self.etf = "VOO"
self.logger = Log(self)
self.utils = Utils(self, self._symbol)
### Parameters ###
lookback = self.get_parameter("lookback", 50)
period = self.get_parameter("period", 21)
target_vol = self.get_parameter("target_vol", 0.4)
self.model = {}
### Reality Modeling ###
# Interactive Broker Brokerage fees and margin
self.settings.free_portfolio_value_percentage = 0.05
self.settings.rebalance_portfolio_on_insight_changes = True
self.settings.minimum_order_margin_portfolio_percentage = 0.05
self.set_security_initializer(CustomSecurityInitializer(InteractiveBrokersBrokerageModel(AccountType.CASH), FuncSecuritySeeder(self.get_last_known_prices)))
self.universe_settings.resolution = Resolution.DAILY
self.universe_settings.schedule.on(self.date_rules.week_start())
#self.universe_settings.schedule.on(self.date_rules.month_start())
self.add_universe_selection(QuantMomentumSelectionModel(self, lookback))
self.add_alpha(ConstantNoBenchmarkAlphaModel(self, InsightType.PRICE, InsightDirection.UP, timedelta(1)))
self.set_portfolio_construction(VolTargetPortfolioConstructionModel(self, target_vol = target_vol, period = period))
#self.set_portfolio_construction(EqualWeightingPortfolioConstructionModel())
self.set_execution(ImmediateExecutionModel())
self.schedule.on(self.date_rules.every_day(), self.time_rules.before_market_close(self._symbol, 0), self.utils.plot)
self.figi = {}
def on_end_of_algorithm(self):
self.utils.stats()
from AlgorithmImports import *
class LimitedEqualWeightingPortfolioConstructionModel(PortfolioConstructionModel):
'''Provides an implementation of IPortfolioConstructionModel that gives equal weighting to all securities.
The target percent holdings of each security is 1/N where N is the number of securities.
For insights of direction InsightDirection.UP, long targets are returned and
for insights of direction InsightDirection.DOWN, short targets are returned.'''
def __init__(self, algo, limited_percent = 0.1, rebalance = Resolution.DAILY, portfolio_bias = PortfolioBias.LONG_SHORT):
'''Initialize a new instance of LimitedEqualWeightingPortfolioConstructionModel
Args:
rebalance: Rebalancing parameter. If it is a timedelta, date rules or Resolution, it will be converted into a function.
If None will be ignored.
The function returns the next expected rebalance time for a given algorithm UTC DateTime.
The function returns null if unknown, in which case the function will be called again in the
next loop. Returning current time will trigger rebalance.
limited_percent: Maximum target weight assigned to any single security
portfolio_bias: Specifies the bias of the portfolio (Short, Long/Short, Long)'''
super().__init__()
self.portfolio_bias = portfolio_bias
self.limited_percent = limited_percent
self.algo = algo
# If the argument is an instance of Resolution or Timedelta
# Redefine rebalancing_func
rebalancing_func = rebalance
if isinstance(rebalance, Resolution):
rebalance = Extensions.to_time_span(rebalance)
if isinstance(rebalance, timedelta):
rebalancing_func = lambda dt: dt + rebalance
if rebalancing_func:
self.set_rebalancing_func(rebalancing_func)
def determine_target_percent(self, active_insights):
'''Will determine the target percent for each insight
Args:
active_insights: The active insights to generate a target for'''
result = {}
# give equal weighting to each security
count = sum(x.direction != InsightDirection.FLAT and self.respect_portfolio_bias(x) for x in active_insights)
percent = min(self.limited_percent, 0 if count == 0 else 0.98 / count)
has_active_insight: bool = any (
insight.symbol == self.algo._symbol for insight in active_insights
)
if has_active_insight:
count -= 1
#self.algo.log.debug(f"[LimitedEqualWeightingPortfolioConstructionModel] Active insights {[insight.symbol.value for insight in active_insights]} ")
for insight in active_insights:
if insight.symbol == self.algo._symbol:
result[insight] = 0
else:
result[insight] = (insight.direction if self.respect_portfolio_bias(insight) else InsightDirection.FLAT) * percent
return result
def respect_portfolio_bias(self, insight):
'''Method that will determine if a given insight respects the portfolio bias
Args:
insight: The insight to create a target for
'''
return self.portfolio_bias == PortfolioBias.LONG_SHORT or insight.direction == self.portfolio_bias
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from AlgorithmImports import *
from Portfolio.RiskParityPortfolioOptimizer import RiskParityPortfolioOptimizer
import numpy as np
### <summary>
### Risk Parity Portfolio Construction Model
### </summary>
### <remarks>Spinu, F. (2013). An algorithm for computing risk parity weights. Available at SSRN 2297383.
### Available at https://papers.ssrn.com/sol3/Papers.cfm?abstract_id=2297383</remarks>
class RiskParityVolTargetPortfolioConstructionModel(PortfolioConstructionModel):
def __init__(self, algo,
rebalance = Resolution.DAILY,
portfolio_bias = PortfolioBias.LONG_SHORT,
lookback = 1,
period = 252, vol_period = 10, target_vol = 0.25,
resolution = Resolution.DAILY,
optimizer = None):
"""Initialize the model
Args:
rebalance: Rebalancing parameter. If it is a timedelta, date rules or Resolution, it will be converted into a function.
If None will be ignored.
The function returns the next expected rebalance time for a given algorithm UTC DateTime.
The function returns null if unknown, in which case the function will be called again in the
next loop. Returning current time will trigger rebalance.
portfolio_bias: Specifies the bias of the portfolio (Short, Long/Short, Long)
lookback(int): Historical return lookback period
period(int): The time interval of history price to calculate the weight
resolution: The resolution of the history price
optimizer(class): Method used to compute the portfolio weights"""
super().__init__()
if portfolio_bias == PortfolioBias.SHORT:
raise ArgumentException("Long position must be allowed in RiskParityPortfolioConstructionModel.")
self.lookback = lookback
self.period = period
self.resolution = resolution
self.sign = lambda x: -1 if x < 0 else (1 if x > 0 else 0)
self.algo = algo
self.vol_period = vol_period
self.target_vol = target_vol
self.optimizer = RiskParityPortfolioOptimizer() if optimizer is None else optimizer
self._symbol_data_by_symbol = {}
# If the argument is an instance of Resolution or Timedelta
# Redefine rebalancing_func
rebalancing_func = rebalance
if isinstance(rebalance, int):
rebalance = Extensions.to_time_span(rebalance)
if isinstance(rebalance, timedelta):
rebalancing_func = lambda dt: dt + rebalance
if rebalancing_func:
self.set_rebalancing_func(rebalancing_func)
def determine_target_percent(self, active_insights):
"""Will determine the target percent for each insight
Args:
active_insights: list of active insights
Returns:
dictionary of insight and respective target weight
"""
targets = {}
# If we have no insights just return an empty target list
if len(active_insights) == 0:
return targets
symbols = [insight.symbol for insight in active_insights]
# Create a dictionary keyed by the symbols in the insights with an pandas.series as value to create a data frame
returns = { str(symbol) : data.return_ for symbol, data in self._symbol_data_by_symbol.items() if symbol in symbols }
returns = pd.DataFrame(returns)
# The portfolio optimizer finds the optimal weights for the given data
weights = self.optimizer.optimize(returns)
weights = pd.Series(weights, index = returns.columns)
for insight in active_insights:
ticker = str(insight.symbol)
history = self.algo.history(insight.symbol, self.vol_period, Resolution.DAILY)
if history.empty:
    continue  # skip symbols with no recent history instead of aborting all targets
closes = history['close'].values
log_returns = np.diff(np.log(closes))
std = float(np.std(log_returns, ddof=1))
realized_vol = math.sqrt(252) * std if std > 0 else 0.0
pos_pct = self.target_vol / realized_vol if realized_vol > 0 else 0.0
pos_pct = round(max(0.0, min(1, pos_pct)), 4)
targets[insight] = weights[ticker] * pos_pct
return targets
def on_securities_changed(self, algorithm, changes):
'''Event fired each time we add/remove securities from the data feed
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm'''
# clean up data for removed securities
super().on_securities_changed(algorithm, changes)
for removed in changes.removed_securities:
symbol_data = self._symbol_data_by_symbol.pop(removed.symbol, None)
if symbol_data is not None:
    symbol_data.reset()
    algorithm.unregister_indicator(symbol_data.roc)
# initialize data for added securities
symbols = [ x.symbol for x in changes.added_securities ]
history = algorithm.history(symbols, self.lookback * self.period, self.resolution)
if history.empty: return
tickers = history.index.levels[0]
for ticker in tickers:
symbol = SymbolCache.get_symbol(ticker)
if symbol not in self._symbol_data_by_symbol:
symbol_data = self.RiskParitySymbolData(symbol, self.lookback, self.period)
symbol_data.warm_up_indicators(history.loc[ticker])
self._symbol_data_by_symbol[symbol] = symbol_data
algorithm.register_indicator(symbol, symbol_data.roc, self.resolution)
class RiskParitySymbolData:
'''Contains data specific to a symbol required by this model'''
def __init__(self, symbol, lookback, period):
self._symbol = symbol
self.roc = RateOfChange(f'{symbol}.roc({lookback})', lookback)
self.roc.updated += self.on_rate_of_change_updated
self.window = RollingWindow(period)
def reset(self):
self.roc.updated -= self.on_rate_of_change_updated
self.roc.reset()
self.window.reset()
def warm_up_indicators(self, history):
for tuple in history.itertuples():
self.roc.update(tuple.Index, tuple.close)
def on_rate_of_change_updated(self, roc, value):
if roc.is_ready:
self.window.add(value)
def add(self, time, value):
item = IndicatorDataPoint(self._symbol, time, value)
self.window.add(item)
@property
def return_(self):
return pd.Series(
data = [x.value for x in self.window],
index = [x.end_time for x in self.window])
@property
def is_ready(self):
return self.window.is_ready
def __str__(self, **kwargs):
return '{}: {:.2%}'.format(self.roc.name, self.window[0])
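# Worked example of the volatility-target overlay applied in determine_target_percent above
# (illustrative numbers, not taken from the backtest): with a daily log-return standard deviation
# of 0.01, the annualized realized vol is sqrt(252) * 0.01 ~= 0.159. With target_vol = 0.25 the raw
# position fraction is 0.25 / 0.159 ~= 1.57, which is then clipped to the [0, 1] range, so the
# risk-parity weight is scaled by 1.0 in this case; a more volatile asset would be scaled down.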
from AlgorithmImports import *
import numpy as np
from datetime import timedelta
import math
class VolTargetPortfolioConstructionModel(PortfolioConstructionModel):
'''Provides an implementation of IPortfolioConstructionModel that sizes each position from its
recent realized volatility: every non-FLAT insight gets a raw weight of target_vol / realized_vol,
the weights are then normalized so the portfolio is close to fully invested, and insights with
direction InsightDirection.FLAT (or on the benchmark symbol) receive a zero target.'''
def __init__(self, algo, period=3, target_vol=0.3, rebalance = Resolution.DAILY, portfolio_bias = PortfolioBias.LONG_SHORT):
'''Initialize a new instance of VolTargetPortfolioConstructionModel
Args:
rebalance: Rebalancing parameter. If it is a timedelta, date rules or Resolution, it will be converted into a function.
If None will be ignored.
The function returns the next expected rebalance time for a given algorithm UTC DateTime.
The function returns null if unknown, in which case the function will be called again in the
next loop. Returning current time will trigger rebalance.
portfolio_bias: Specifies the bias of the portfolio (Short, Long/Short, Long)'''
super().__init__()
self.portfolio_bias = portfolio_bias
self.period = period
self.algo = algo
self.target_vol = target_vol
# If the argument is an instance of Resolution or Timedelta
# Redefine rebalancing_func
rebalancing_func = rebalance
if isinstance(rebalance, Resolution):
rebalance = Extensions.to_time_span(rebalance)
if isinstance(rebalance, timedelta):
rebalancing_func = lambda dt: dt + rebalance
if rebalancing_func:
self.set_rebalancing_func(rebalancing_func)
def determine_target_percent(self, active_insights):
'''Will determine the target percent for each insight
Args:
active_insights: The active insights to generate a target for'''
targets = {}
weights = {}
total = 0.0
count = 0
# If we have no insights just return an empty target list
if len(active_insights) == 0:
return targets
# give equal weighting to each security
for insight in active_insights:
ticker = str(insight.symbol)
if insight.direction == InsightDirection.FLAT:
weights[ticker] = 0.0
else:
history = self.algo.history(insight.symbol, self.period, Resolution.DAILY)
if history.empty:
    weights[ticker] = 0.0  # no usable history: give this insight zero weight instead of aborting
    continue
closes = history['close'].values
log_returns = np.diff(np.log(closes))
std = float(np.std(log_returns, ddof=1))
realized_vol = math.sqrt(252) * std if std > 0 else 0.0
pos_pct = self.target_vol / realized_vol if realized_vol > 0 else 0.0
if (insight.symbol == self.algo._symbol):
pos_pct = 0.0
else:
pos_pct = round(max(0.0, pos_pct), 4)
weights[ticker] = pos_pct
total += pos_pct
count += 1
if total > 0:
factor = 0.98 / total
else:
factor = 0.0
for insight in active_insights:
ticker = str(insight.symbol)
if insight.direction == InsightDirection.FLAT:
targets[insight] = 0.0
else:
targets[insight] = weights[ticker] * factor
return targets
def respect_portfolio_bias(self, insight):
'''Method that will determine if a given insight respects the portfolio bias
Args:
insight: The insight to create a target for
'''
return self.portfolio_bias == PortfolioBias.LONG_SHORT or insight.direction == self.portfolio_bias
from AlgorithmImports import *
from sklearn.mixture import GaussianMixture
from scipy import stats
from sklearn.cluster import KMeans
from scipy.stats import wasserstein_distance
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
import warnings
warnings.filterwarnings('ignore')
def load_crisis_dates():
return [
('2018-12-12', '2018-12-24'),
('2019-05-03', '2019-06-03')
]
def load_bull_dates():
return [
('2018-12-26', '2019-02-05'),
('2019-03-08', '2019-03-21')
]
'''
def load_crisis_dates():
return [
('2020-02-19', '2020-03-23'), # COVID-19 Crash
('2022-01-03', '2022-03-14'), # 2022 Bear Market
('2022-03-29', '2022-06-16'), # 2022 Bear Market
('2022-08-16', '2023-01-05'), # 2022 Bear Market
]
'''
def rolling_vol_regime(df, lookback):
df['returns'] = df['close'].pct_change()
df['rolling_vol_20d'] = df['returns'].rolling(window=lookback).std() * np.sqrt(252)
df['Signal'] = np.where(df['rolling_vol_20d'] <= 0.35, 1, 0)
df = df.reset_index()
signal = df.loc[df.groupby("symbol")["time"].idxmax()]['Signal'].values[0]
return signal
def cpd_regime(df, lookback):
vol_quantile=0.7
dd_quantile=0.3
smooth_win=5
smooth_pct=0.6
df['Returns'] = df['close'].pct_change()
df['Volatility'] = df['Returns'].rolling(window=lookback*2).std()
df['Drawdown'] = (df['close'] / df['close'].cummax()) - 1
vol_thresh = df['Volatility'].quantile(vol_quantile)
dd_thresh = df['Drawdown'].quantile(dd_quantile)
df['CPD_Bear'] = ((df['Volatility'] > vol_thresh) | (df['Drawdown'] < dd_thresh)).astype(int)
df['CPD_Bear_Smooth'] = df['CPD_Bear'].rolling(window=smooth_win, min_periods=1).mean()
df['CPD_Signal'] = (df['CPD_Bear_Smooth'] >= smooth_pct).astype(int)
df['Signal'] = 1 - df['CPD_Signal'].shift(1).fillna(0)
df = df.reset_index()
signal = df.loc[df.groupby("symbol")["time"].idxmax()]['Signal'].values[0]
return signal
def hmm_strategy(df, lookback, n_components=2):
df['Returns'] = df['close'].pct_change()
df['Rolling_Mean'] = df['Returns'].rolling(window=lookback).mean()
df['Rolling_Std'] = df['Returns'].rolling(window=lookback).std()
df['Rolling_Skew'] = df['Returns'].rolling(window=lookback).skew()
features = df[['Rolling_Mean', 'Rolling_Std', 'Rolling_Skew']].dropna().values
model = GaussianMixture(n_components=n_components, covariance_type='full', random_state=42)
states = model.fit_predict(features)
df['HMM_State'] = np.nan
df.loc[df[['Rolling_Mean', 'Rolling_Std', 'Rolling_Skew']].dropna().index, 'HMM_State'] = states
state_stats = df.groupby('HMM_State').agg({'Rolling_Std': 'mean'})
bear_state = state_stats['Rolling_Std'].idxmax()
df['HMM_Bear'] = (df['HMM_State'] == bear_state).astype(int)
df['Signal'] = 1 - df['HMM_Bear'].shift(1).fillna(0)
df = df.reset_index()
signal = df.loc[df.groupby("symbol")["time"].idxmax()]['Signal'].values[0]
return signal
def stats_hmm_regime(df, lookback):
df = prepare_features(df, lookback)
# Store mean and std for key indicators
m3_vol_mean_crisis = 0.021387468345054863
m3_vol_std_crisis = 0.0066984480130070105
m3_momentum_mean_crisis = -0.03200106346901794
m3_drawdown_mean_crisis = -0.07457071751896066
# Calculate likelihood scores
vol_score = np.abs(df['volatility'] - m3_vol_mean_crisis) / m3_vol_std_crisis
momentum_score = (df['momentum'] < m3_momentum_mean_crisis).astype(float)
drawdown_score = (df['drawdown'] < m3_drawdown_mean_crisis).astype(float)
# Combined score
combined_score = vol_score + momentum_score + drawdown_score
# Threshold for regime classification
df['Signal'] = (combined_score <= 1.5).astype(int)
df = df.reset_index()
signal = df.loc[df.groupby("symbol")["time"].idxmax()]['Signal'].values[0]
return signal
def features_clustering_regime(algo, df_train, df, lookback):
if not ("m2_crisis_cluster" in algo.model and "m2_kmeans" in algo.model):
df_train = df_train.reset_index()
df_train['time'] = pd.to_datetime(df_train['time'])
df_train['Date'] = df_train['time'].dt.strftime('%Y-%m-%d')
df_train = prepare_features(df_train, lookback)
# Select features for clustering
feature_cols = ['volatility', 'momentum', 'drawdown', 'skewness', 'trend_strength']
features_train = df_train[feature_cols].copy()
features_train = features_train.fillna(method='bfill').fillna(method='ffill')
crisis_dates = load_crisis_dates()
if crisis_dates is not None:
# Calibration mode
crisis_mask = pd.Series(False, index=df_train.index)
for start, end in crisis_dates:
mask = (df_train['Date'] >= start) & (df_train['Date'] <= end)
crisis_mask = crisis_mask | mask
# Normalize features
scaler = StandardScaler()
features_train_scaled = scaler.fit_transform(features_train)
m2_scaler = scaler
# Cluster into 3 regimes: normal, stressed, crisis
kmeans = KMeans(n_clusters=3, random_state=42, n_init=10)
clusters = kmeans.fit_predict(features_train_scaled)
# Identify which cluster corresponds to crisis
cluster_crisis_ratio = []
for i in range(3):
cluster_mask = clusters == i
ratio = crisis_mask[cluster_mask].sum() / cluster_mask.sum()
cluster_crisis_ratio.append(ratio)
# Map clusters to regimes (crisis cluster = 1)
m2_crisis_cluster = np.argmax(cluster_crisis_ratio)
m2_kmeans = kmeans
algo.model['m2_crisis_cluster'] = m2_crisis_cluster
algo.model['m2_kmeans'] = m2_kmeans
algo.model['m2_scaler'] = m2_scaler
else:
m2_crisis_cluster = algo.model['m2_crisis_cluster']
m2_kmeans = algo.model['m2_kmeans']
m2_scaler = algo.model['m2_scaler']
# Prediction
df = df.reset_index()
df['time'] = pd.to_datetime(df['time'])
df['Date'] = df['time'].dt.strftime('%Y-%m-%d')
df = prepare_features(df, lookback)
# Select features for clustering
feature_cols = ['volatility', 'momentum', 'drawdown', 'skewness', 'trend_strength']
features = df[feature_cols].copy()
features = features.fillna(method='bfill').fillna(method='ffill')
features_scaled = m2_scaler.transform(features)
clusters = m2_kmeans.predict(features_scaled)
df['Signal'] = (clusters != m2_crisis_cluster).astype(int)
signal = df.loc[df.groupby("symbol")["time"].idxmax()]['Signal'].values[0]
return signal
def features_bull_clustering_regime(algo, df_train, df, lookback):
if not ("m3_crisis_cluster" in algo.model and "m3_kmeans" in algo.model):
df_train = df_train.reset_index()
df_train['time'] = pd.to_datetime(df_train['time'])
df_train['Date'] = df_train['time'].dt.strftime('%Y-%m-%d')
df_train = prepare_features(df_train, lookback)
# Select features for clustering
feature_cols = ['volatility', 'momentum', 'drawdown', 'skewness', 'trend_strength']
features_train = df_train[feature_cols].copy()
features_train = features_train.fillna(method='bfill').fillna(method='ffill')
crisis_dates = load_bull_dates()
if crisis_dates is not None:
# Calibration mode
crisis_mask = pd.Series(False, index=df_train.index)
for start, end in crisis_dates:
mask = (df_train['Date'] >= start) & (df_train['Date'] <= end)
crisis_mask = crisis_mask | mask
# Normalize features
scaler = StandardScaler()
features_train_scaled = scaler.fit_transform(features_train)
m2_scaler = scaler
# Cluster into 3 regimes: normal, stressed, crisis
kmeans = KMeans(n_clusters=3, random_state=42, n_init=10)
clusters = kmeans.fit_predict(features_train_scaled)
# Identify which cluster corresponds to crisis
cluster_crisis_ratio = []
for i in range(3):
cluster_mask = clusters == i
ratio = crisis_mask[cluster_mask].sum() / cluster_mask.sum()
cluster_crisis_ratio.append(ratio)
# Map clusters to regimes (crisis cluster = 1)
m2_crisis_cluster = np.argmax(cluster_crisis_ratio)
m2_kmeans = kmeans
algo.model['m3_crisis_cluster'] = m2_crisis_cluster
algo.model['m3_kmeans'] = m2_kmeans
algo.model['m3_scaler'] = m2_scaler
else:
m2_crisis_cluster = algo.model['m3_crisis_cluster']
m2_kmeans = algo.model['m3_kmeans']
m2_scaler = algo.model['m3_scaler']
# Prediction
df = df.reset_index()
df['time'] = pd.to_datetime(df['time'])
df['Date'] = df['time'].dt.strftime('%Y-%m-%d')
df = prepare_features(df, lookback)
# Select features for clustering
feature_cols = ['volatility', 'momentum', 'drawdown', 'skewness', 'trend_strength']
features = df[feature_cols].copy()
features = features.fillna(method='bfill').fillna(method='ffill')
features_scaled = m2_scaler.transform(features)
clusters = m2_kmeans.predict(features_scaled)
df['Signal'] = (clusters == m2_crisis_cluster).astype(int)
signal = df.loc[df.groupby("symbol")["time"].idxmax()]['Signal'].values[0]
return signal
def mbr_regime(df, lookback):
df = prepare_features(df, lookback)
momentum_threshold = -2.6468268
df['MBR_Regime'] = np.where(df['close'].diff(10) > momentum_threshold, 1, 0)
df = df.reset_index()
signal = df.loc[df.groupby("symbol")["time"].idxmax()]['MBR_Regime'].values[0]
return signal
def features_wkclustering_regime(algo, df_train, df, lookback):
if not ("wk_risky_cluster" in algo.model and "wk_segments_train" in algo.model and "wk_cluster_labels" in algo.model):
df_train = df_train.reset_index()
df_train['time'] = pd.to_datetime(df_train['time'])
# Calculate returns for each symbol
df_train['Returns'] = df_train.groupby('symbol')['close'].pct_change()
# Get returns as a continuous series (assuming single symbol or merged data)
returns_train = df_train['Returns'].dropna().values
# Create overlapping windows of return distributions
step_size = max(1, lookback // 6) # Slide by ~1/6 of window
segments_train = []
segment_indices = []
for start in range(0, len(returns_train) - lookback, step_size):
segment = returns_train[start:start + lookback]
segments_train.append(segment)
segment_indices.append(start + lookback // 2)
# Compute pairwise Wasserstein distances
n_segments = len(segments_train)
dist_matrix = np.zeros((n_segments, n_segments))
for i in range(n_segments):
for j in range(i + 1, n_segments):
d = wasserstein_distance(segments_train[i], segments_train[j])
dist_matrix[i, j] = dist_matrix[j, i] = d
# Cluster into 2 regimes: normal vs stressed (or 3 if needed)
n_clusters = 2 # Can adjust to 3 for normal/stressed/crisis
kmeans_wasserstein = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
cluster_labels = kmeans_wasserstein.fit_predict(dist_matrix)
# Determine which cluster represents "risky" regime
# Risky regime typically has higher volatility
cluster_volatilities = []
for c in range(n_clusters):
cluster_mask = cluster_labels == c
cluster_segments = [segments_train[i] for i in range(len(segments_train)) if cluster_mask[i]]
avg_vol = np.mean([np.std(seg) for seg in cluster_segments])
cluster_volatilities.append(avg_vol)
risky_cluster = np.argmax(cluster_volatilities) # Higher volatility = risky
algo.model['wk_risky_cluster'] = risky_cluster
algo.model['wk_segments_train'] = segments_train
algo.model['wk_cluster_labels'] = cluster_labels
else:
risky_cluster = algo.model['wk_risky_cluster']
segments_train = algo.model['wk_segments_train']
cluster_labels = algo.model['wk_cluster_labels']
# PREDICTION PHASE
df = df.reset_index()
df['time'] = pd.to_datetime(df['time'])
df['Returns'] = df.groupby('symbol')['close'].pct_change()
# Get recent window of returns for prediction
returns_pred = df['Returns'].dropna().values
# Take the most recent window for comparison
if len(returns_pred) >= lookback:
recent_segment = returns_pred[-lookback:]
else:
recent_segment = returns_pred # Use all available if less than window
# Compute distances from recent segment to all training segments
distances_to_train = np.array([
wasserstein_distance(recent_segment, seg)
for seg in segments_train
])
# Find k-nearest training segments and vote on cluster
k = min(5, len(segments_train)) # Use 5 nearest neighbors
nearest_indices = np.argsort(distances_to_train)[:k]
nearest_clusters = cluster_labels[nearest_indices]
predicted_cluster = np.bincount(nearest_clusters).argmax()
# Signal: 1 if normal regime, 0 if risky regime
signal = 1 if predicted_cluster != risky_cluster else 0
return signal
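# Illustrative call pattern for the regime helpers (the alpha model in ConstantNoBenchmarkAlphaModel
# does the equivalent with QuantConnect history frames). Both frames are expected to be daily history
# DataFrames with a (symbol, time) index and a 'close' column; the 20-bar window is just an example value.
#
#   df_train = algo.history(algo._symbol, datetime(2018, 1, 1), datetime(2019, 12, 31), Resolution.DAILY)
#   df_recent = algo.history(algo._symbol, 100, Resolution.DAILY)
#   signal = features_wkclustering_regime(algo, df_train, df_recent, 20)  # 1 = normal, 0 = risky regime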
def prepare_features(df, lookback_window):
"""Prepare features for regime detection"""
df = df.copy()
#df['Date'] = pd.to_datetime(df['time'])
#df = df.sort_values('Date').reset_index(drop=True)
# Returns
df['returns'] = df['close'].pct_change()
# Volatility (rolling std)
df['volatility'] = df['returns'].rolling(window=lookback_window).std()
# Realized volatility (alternative measure)
df['realized_vol'] = df['returns'].abs().rolling(window=lookback_window).mean()
# Rolling mean
df['rolling_mean'] = df['close'].rolling(window=lookback_window).mean()
# Price momentum
df['momentum'] = df['close'].pct_change(lookback_window)
# Rolling max drawdown
rolling_max = df['close'].rolling(window=lookback_window).max()
df['drawdown'] = (df['close'] - rolling_max) / rolling_max
# Skewness of returns
df['skewness'] = df['returns'].rolling(window=lookback_window).skew()
# Kurtosis of returns
df['kurtosis'] = df['returns'].rolling(window=lookback_window).kurt()
# VIX-like measure (rolling volatility of volatility)
df['vol_of_vol'] = df['volatility'].rolling(window=10).std()
# Trend strength
df['trend_strength'] = (df['close'] - df['rolling_mean']) / df['rolling_mean']
# Consecutive down days
df['down_days'] = (df['returns'] < 0).rolling(window=5).sum()
# Volume proxies (using price volatility as proxy since we don't have volume)
df['price_impact'] = df['returns'].abs() * df['volatility']
return df
import numpy as np
import pandas as pd
import warnings
warnings.filterwarnings('ignore')
class SimplifiedARIMAStrategy:
def __init__(self, algo, holding):
self.alpha = 0.1
self.algo = algo
self.holding = holding
def generate_signals(self, window_prices):
    position = self.holding
    # Exponentially weighted forecast of the price level: the most recent bar gets the largest weight
    weights = np.exp(-self.alpha * np.arange(len(window_prices))[::-1])
    weights /= weights.sum()
    forecast = np.sum(window_prices * weights)
    # Realized volatility of simple returns over the window
    returns = np.diff(window_prices) / window_prices[:-1]
    volatility = np.std(returns)
    current_price = window_prices[-1]
    # Deviation of the last price from the smoothed forecast, scaled by return volatility
    z_score = (current_price - forecast) / volatility if volatility > 0 else 0
if window_prices[-1] < window_prices[0]:
return 0
if position == 0:
if z_score < -2:
return 1
elif position == 1:
if z_score > 0:
return 0
else:
return 1
return 0
# region imports
from AlgorithmImports import *
# endregion
class CustomSecurityInitializer(BrokerageModelSecurityInitializer):
def __init__(self, brokerage_model: IBrokerageModel, security_seeder: ISecuritySeeder) -> None:
super().__init__(brokerage_model, security_seeder)
def initialize(self, security: Security) -> None:
super().initialize(security)
security.set_slippage_model(VolumeShareSlippageModel())
security.set_settlement_model(ImmediateSettlementModel())
security.set_leverage(1.0)
security.set_buying_power_model(CashBuyingPowerModel())
security.set_fee_model(InteractiveBrokersFeeModel())
security.set_margin_model(SecurityMarginModel.NULL)
# region imports
from AlgorithmImports import *
from cluster import *
import numpy as np
import pandas as pd
import math
from scipy import stats
from dateutil.relativedelta import relativedelta
# endregion
class QuantMomentumSelectionModel(FundamentalUniverseSelectionModel):
def __init__(self, algo, lookback):
self.algo = algo
self.lookback = lookback
self.etfs = [
'JNUG','IBLC','BITQ','DAPP','AAAU','AADR','AAXJ','ACES','ACIO','ACSI','ACWI','ACWV','ACWX','ADME','AESR','AFK','AFLG','AFMC','AFSM','AGG','AGGY','AGZD','AIA','AIEQ','AIRR','ALFA','ALTS','ALTY','AMOM','AMU','AMUB','ANGL','AOA','AOK','AOM','AOR','ARCM','ARGT','ARKQ','ARKW','ASEA','ASET','ASHR','ASHS','ATMP','AUSF','AVDE','AVDV','AVEM','AVUS','AVUV','BAB','BAPR','BAR','BATT','BAUG','BBAX','BBC','BBCA','BBEU','BBH','BBIN','BBJP','BBP','BBRE','BCD','BCI','BDCZ','BDEC','BDRY','BFIT','BFOR','BIBL','BIL','BIV','BIZD','BJAN','BJK','BJUL','BJUN','BKF','BKLN','BLCN','BLES','BLOK','BNO','BNOV','BOCT','BOSS','BOTZ','BOUT','BRF','BRZU','BSDE','BSEP','BTAL','BTEC','BUG','BUL','BUY','BWX','BWZ','CALF','CARZ','CATH','CCOR','CDC','CDL','CEFS','CEMB','CEW','CEZ','CFA','CFO','CGW','CHIQ','CIBR','CID','CIL','CLIX','CLOU','CMBS','CMDY','CN','CNBS','CNRG','CNXT','CNYA','COM','COMB','COMT','COPX','CORN','CORP','COW','COWZ','CPER','CPI','CQQQ','CRAK','CRBN','CSA','CSB','CSD','CSM','CUT','CVY','CWB','CWI','CWS','CXSE','CYB','CZA','DALI','DAUG','DAX','DBA','DBAW','DBB','DBC','DBE','DBEF','DBEM','DBEU','DBEZ','DBJP','DBMF','DBO','DBP','DBV','DDIV','DDLS','DDWM','DEEF','DEF','DEM','DES','DEUS','DEW','DFE','DFJ','DFNL','DGRE','DGRO','DGRS','DGRW','DGS','DGT','DGZ','DHS','DIA','DIAL','DIM','DINT','DIVB','DIVO','DJCI','DJD','DJP','DLN','DLS','DMRE','DNL','DNOV','DOG','DOL','DON','DOO','DRIV','DRN','DRSK','DSI','DSTL','DTD','DTEC','DTH','DURA','DUSA','DVLU','DVOL','DVP','DVY','DVYA','DVYE','DWAS','DWAW','DWM','DWMF','DWSH','DWUS','DWX','DXJ','DYNF','EAGG','EASG','EBIZ','ECNS','ECON','ECOW','EDEN','EDIV','EDOG','EDOW','EDV','EELV','EEM','EEMA','EEMO','EEMS','EEMV','EEMX','EES','EET','EFA','EFAD','EFAS','EFAV','EFAX','EFG','EFNL','EFV','EFZ','EIDO','EINC','EIRL','EIS','EJUL','ELD','EMB','EMDV','EMGF','GFOF','EMHY','EMIF','EMLC','EMLP','EMMF','EMNT','EMQQ','EMTL','EMTY','EMXC','ENFR','ENOR','ENZL','EPHE','EPI','EPOL','EPP','EPRF','EPS','EPU','EQAL','EQL','EQWL','ERM','ESGD','ESGE','ESGU','ESGV','ESML','ESPO','EUDG','EUDV','EUFN','EUM','EUSA','SATO','EVX','EWA','EWC','EWD','EWG','EWH','EWI','EWJ','EWJV','EWK','EWL','EWM','EWN','EWO','EWP','EWQ','EWS','EWT','EWU','EWUS','EWW','EWX','EWY','EWZ','EWZS','EXI','EXT','EYLD','EZA','EZM','EZU','FAAR','FAB','FAD','FALN','FAN','FAUG','FBT','FCA','FCAL','FCEF','FCG','FCOM','FCPI','FCTR','FCVT','FDD','FDEM','FDEV','FDHY','FDIS','FDL','FDLO','FDM','FDMO','FDN','FDNI','FDRR','FDT','FDVV','FEM','FEMB','FEMS','FENY','FEP','FEUZ','FEX','FEZ','FFIU','FFR','FFTY','FGD','FGM','FHLC','FID','FIDI','FIDU','FILL','FINX','FISR','FITE','FIVA','FIW','FIXD','FJP','FKU','FLAU','FLAX','FLBL','FLBR','FLCA','FLCH','FLCO','FLDR','FLEE','FLGB','FLGR','FLHY','FLIA','FLIN','FLJH','FLJP','FLKR','FLLA','FLMB','FLMI','FLMX','FLN','FLQL','FLQM','FLQS','FLRN','FLRT','FLSA','FLSP','FLSW','FLTB','FLTR','FLTW','FM','FMAT','FMB','FMF','FMHI','FMK','FNCL','FNDA','FNDB','FNDC','FNDE','FNDF','FNDX','FNGS','FNK','FNOV','FNX','FNY','FPA','FPE','FPEI','FPX','FPXE','FPXI','FQAL','FRDM','FREL','FRI','FSMB','FSMD','FSTA','FSZ','FTA','FTAG','FTC','FTCS','FTEC','FTGC','FTHI','FTLS','FTRI','FTSD','FTSL','FTSM','FTXG','FTXH','FTXL','FTXN','FTXO','FUD','FUTY','FV','FVAL','FVC','FVD','FVL','FXA','FXB','FXC','FXD','FXE','FXF','FXG','FXH','FXI','FXL','FXN','FXO','FXR','FXU','FXY','FXZ','FYC','FYLD','FYT','FYX','GAA','GAL','GAMR','GBDV','GBF','GBIL','GCC','GDMA','GDX','GDXJ','GEM','GFIN','GHYB','GHYG','GIGB','GII','GLD','GLDM','GLTR','GMF','GMOM','GNOM','GNR','GOAU','GOEX','GQRE','GREK','GRID','GRN','GSEU','GSEW','GSG','GSIE','GSJY','GSLC
','GSP','GSSC','GTIP','GUNR','GURU','GUSH','GVAL','GVI','GVIP','GWX','GXC','GYLD','HACK','HAIL','HAP','HAUZ','HAWX','HDEF','HDG','HDMV','HDV','HEDJ','HEEM','HEFA','HERD','HERO','HEWJ','HEZU','HFXI','HIBL','HIPS','HLAL','HMOP','HNDL','HOMZ','HSCZ','HSPX','HTAB','HTEC','HTRB','HTUS','HUSV','HYD','HYDB','HYDW','HYEM','HYG','HYGH','HYGV','HYHG','HYLB','HYLD','HYLS','HYMB','HYUP','HYXU','HYZD','IAI','IAK','IAT','IAU','IBB','IBDQ','IBDR','IBDS','IBDT','IBDU','IBHE','IBMN','IBMO','IBMP','IBMQ','IBND','IBUY','ICF','ICLN','ICOW','IDEV','IDHQ','IDIV','IDLV','IDMO','IDNA','IDOG','IDRV','IDU','IDV','IDX','IEDI','IEF','IEFA','IEI','IEMG','IEO','IETC','IEUR','IEUS','IEV','IEZ','IFGL','IFRA','IFV','IG','IGBH','IGEB','IGF','IGHG','IGIB','IGLB','IGM','IGN','IGOV','IGSB','IGV','IHAK','IHDG','IHE','IHF','IHI','IIGD','IJH','IJJ','IJK','IJR','IJS','IJT','IJUL','ILF','ILTB','IMOM','IMTB','IMTM','INCO','INDA','INDL','INDS','INDY','INFR','INKM','INTF','IOO','IPAC','IPAY','IPKW','IPO','IPOS','IQDF','IQDG','IQDY','IQLT','IQSI','IQSU','ISCF','ISHG','ISMD','ISRA','ISTB','ITA','ITB','ITEQ','ITM','ITOT','IUS','IUSG','IUSV','IVAL','IVE','IVLU','IVOG','IVOL','IVOO','IVOV','IVV','IVW','IWB','IWC','IWD','IWF','IWL','IWM','IWN','IWO','IWP','IWR','IWS','IWV','IWX','IWY','IXC','IXG','IXJ','IXN','IXP','IXUS','IYC','IYE','IYF','IYG','IYH','IYJ','IYK','IYM','IYR','IYT','IYW','IYY','IYZ','IZRL','JETS','JHEM','JHMD','JHML','JHMM','JHSC','JMBS','JMIN','JMOM','JMUB','JOYY','JPEM','JPIN','JPMB','JPME','JPN','JPSE','JPUS','JPXN','JQUA','JSMD','JSML','JUST','JVAL','JXI','KARS','KBA','KBE','KBWB','KBWD','KBWP','KBWR','KBWY','KCE','KEMQ','KEMX','KGRN','KIE','KNG','KOCT','KOIN','KOMP','KORP','KRE','KRMA','KSA','KURE','KWEB','KXI','LDSF','LDUR','LEGR','LEMB','LFEQ','LGH','LGLV','LGOV','LIT','LKOR','LMBS','LOUP','LQD','LQDH','LQDI','LRGE','LRGF','LSAF','LTPZ','LVHD','LVHI','MAGA','MAGS','MBB','MBSD','MCHI','MDIV','MDY','MDYG','MDYV','MEAR','MFDX','MFEM','MFUS','MGC','MGK','MGV','MILN','MINT','MJ','MLN','MLPB','MLPX','MMIN','MMIT','MMTM','MNA','MOAT','MOM','MOO','MORT','MOTI','MOTO','MRGR','MUNI','MUST','MVIN','MXI','MYY','NACP','NANR','NERD','NETL','NFLT','NFRA','NFTY','NIB','NLR','NOCT','NORW','NTSX','NUAG','NUDM','NUEM','NUGT','NUHY','NULC','NULG','NULV','NUMG','NUMV','NURE','NUSA','NUSC','NXTG','OBOR','OCIO','OEF','OGIG','OIH','OIL','OILK','OLD','OMFL','OMFS','ONEO','ONEQ','ONEV','ONEY','ONLN','ORG','OSCV','OUNZ','OUSA','OUSM','OVF','OVL','OVM','OVS','PALL','PAPR','PAUG','PAWZ','PBD','PBE','PBJ','PBP','PBTP','PBUS','PBW','PCEF','PCY','PDBC','PDEC','PDN','PDP','PEJ','PEK','PEX','PEXL','PEY','PEZ','PFF','PFFA','PFFD','PFFR','PFI','PFIG','PFLD','PFM','PFXF','PGF','PGHY','PGJ','PGM','PGX','PHB','PHDG','PHO','PICB','PICK','PID','PIE','PIN','PIO','PIZ','PJAN','PJP','PJUL','PJUN','PKB','PKW','PLW','PNOV','PNQI','POCT','PPA','PPH','PPLN','PPLT','PPTY','PREF','PRF','PRFZ','PRN','PRNT','PSCC','PSCD','PSCF','PSCH','PSCI','PSCT','PSCU','PSEP','PSET','PSI','PSK','PSL','PSM','PSP','PSQ','PSR','PTEU','PTF','PTH','PTIN','PTLC','PTMC','PTNQ','PUI','PVAL','PVI','PWB','PWC','PWS','PWV','PWZ','PXE','PXF','PXH','PXI','PYZ','PZA','PZT','QABA','QAI','QARP','QAT','QCLN','QDEF','QDF','QDIV','QED','QEFA','QEMM','QGRO','QINT','QLC','QLTA','QLV','QLVD','QLVE','QMOM','QQEW','QQH','QQQ','QQQE','QQXT','QRFT','QTEC','QTUM','QUAL','QUS','QVAL','QWLD','QYLD','RAAX','RAFE','RALS','RDVY','RECS','REET','REGL','REK','REMX','RESI','REVS','REZ','RFCI','RFDA','RFDI','RFEM','RFEU','RFFC','RFG','RFV','RIGS','RINF','RING','RISE','RLY','RNEM','ROAM','ROBO','ROBT','RODM','ROKT','ROSC
','ROUS','RPAR','RPG','RPV','RSP','RSX','RTH','RVNU','RWJ','RWK','RWL','RWM','RWO','RWR','RWX','RXI','RYLD','RYU','RZG','RZV','SBB','SBIO','SBM','SCHA','SCHB','SCHC','SCHD','SCHE','SCHF','SCHG','SCHH','SCHI','SCHJ','SCHK','SCHM','SCHO','SCHP','SCHQ','SCHR','SCHV','SCHX','SCJ','SCZ','SDCI','SDEM','SDG','SDIV','SDOG','SDVY','SDY','SECT','SEF','SEIX','SFY','SFYF','SFYX','SGDJ','SGDM','SGG','SGOL','SH','SHAG','SHE','SHM','SHY','SHYD','SHYL','SIL','SILJ','SIMS','SIVR','SIZE','SJB','SJNK','SKOR','SKYY','SLQD','SLV','SLVP','SLX','SLYG','SLYV','SMB','SMCP','SMDV','SMH','SMIN','SMLF','SMLV','SMMD','SMMU','SMMV','SMOG','SNPE','SNSR','SOCL','SOXX','SOYB','SPAB','SPBO','SPDV','SPDW','SPEM','SPEU','SPFF','SPGM','SPGP','SPHB','SPHD','SPHQ','SPHY','SPIP','SPLB','SPLG','SPLV','SPMB','SPMD','SPMO','SPMV','SPSB','SPSK','SPSM','SPTI','SPTL','SPTM','SPTS','SPUS','SPVM','SPVU','SPXE','SPXN','SPXT','SPXV','SPY','SPYD','SPYG','SPYV','SPYX','SQLV','SRET','SRLN','SRVR','SSPY','STOT','STPZ','SUB','SUSA','SUSB','SUSC','SUSL','SVXY','SWAN','SYV','SZNE','TAGS','TAIL','TAN','TAO','TAXF','TBF','TBLU','TBX','TDIV','TDTF','TDTT','TDV','TFI','TFLO','THD','TILT','TIPX','TIPZ','TLH','TLT','TLTD','TLTE','TMDV','TMFC','TOK','TOKE','TOLZ','TOTL','TPHD','TPIF','TPLC','TPSC','TPYP','TRTY','TUR','UAE','UBOT','UCIB','UCON','UDN','UEVM','UFO','UGA','UITB','UIVM','ULVM','UNG','URA','URNM','URTH','USCI','USFR','USIG','USL','USMC','USMF','USMV','USO','USOI','USRT','USSG','USTB','USVM','UTES','UTRN','UUP','VALQ','VAMO','VAW','VB','VBK','VBR','VCIT','VCR','VDC','VDE','VEA','VEGA','VEGI','VEGN','VEU','VFH','VFMF','VFMO','VFMV','VFQY','VFVA','VGIT','VGK','VGLT','VGSH','VGT','VHT','VIDI','VIG','VIGI','VIOG','VIOO','VIOV','VIS','VIXM','VIXY','VLU','VMBS','VNLA','VNM','VNQ','VNQI','VO','VOE','VONE','VONG','VONV','VOO','VOOG','VOOV','VOT','VOX','VPC','VPL','VPU','VRAI','VRIG','VRP','VSDA','VSGX','VSL','VSMV','VSS','VT','VTC','VTHR','VTI','VTIP','VTV','VTWG','VTWO','VTWV','VUG','VUSE','VV','VWO','VWOB','VXF','VXUS','VXX','VXZ','VYM','VYMI','WBIY','WCLD','WDIV','WEBL','WIL','WIP','WLDR','WOMN','WOOD','WPS','WTMF','XAR','XBI','XCEM','XDIV','XHB','XHE','XHS','XITK','XLB','XLC','XLE','XLF','XLG','XLI','XLK','XLP','XLRE','XLSR','XLU','XLV','XLY','XME','XMHQ','XMLV','XMMO','XMPT','XMVM','XNTK','XPH','XRLV','XRT','XSD','XSHD','XSHQ','XSLV','XSMO','XSOE','XSVM','XSW','XT','XTL','XTN','YLD','YOLO','YXI','YYY','ZIG','ZROZ','EJAN','IJAN','KJAN','NJAN','RDOG','GXG','PLTM','IBTE','AFIF','XES','AMZA','MLPA','GLIN','GLDI','AMLP','JPIB','HYXF','TDSA','BTEK','TGIF','LIV','DOCT','INDF','VPN','MOON','DEFN','FEVR','BNE','QQD','QPT','OVT','EPRE','PVAL','CRUZ','UNL','SMI','CYA','HHH','INNO','WTAI','BKES','RJA','IUSA','CWC','CSH','WEAT','DAM','PSCE','NBCC','CANE','WEED','UAV','ORFN','SESG','EMCA','SLVO','INC','TGN','INTL','DIP','PBL','ENAV','BIGT','CETF','TUNE','PXJ','KEM','ALUM','INOV','SOF','CHAI','PSH','ISPY','RTRE','AUGM','LIAB','GLBL','TRSY','INFO','SCY','EDGE','EGLE','FLXN','XDIV','BITK','ESK'
]
self.scores = {}
def create_universes(self, algorithm: QCAlgorithm) -> list[Universe]:
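# Build a single USA fundamental universe whose symbol selection is delegated to self.select below.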
return [FundamentalUniverseFactory(Market.USA, algorithm.universe_settings,
lambda fundamental: self.select(algorithm, fundamental))]
def days_from_mid_month(self, date=None):
"""
Calculate days from the 15th of the reference month.
- If day >= 15: counts from 15th of current month
- If day < 15: counts from 15th of previous month
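Example: for 2024-03-20 this returns 5 (anchored to Mar 15); for 2024-03-10 it returns 24 (anchored to Feb 15).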
"""
if date is None:
date = datetime.now()
if date.day >= 15:
mid_month = datetime(date.year, date.month, 15)
else:
prev_month = date - relativedelta(months=1)
mid_month = datetime(prev_month.year, prev_month.month, 15)
return (date - mid_month).days
def select(self, algo: QCAlgorithm, coarse: list[Fundamental]) -> list[Symbol]:
distance_matrix = None
filtered = [x.symbol for x in coarse if not x.has_fundamental_data and x.price > 10 and x.dollar_volume > 500_000 and x.market_cap == 0 and x.symbol.value in self.etfs]
revert_etf = [x.symbol for x in coarse if x.symbol.value == "VOO"]
final = revert_etf
#self.algo.logger.debug(f"[universe] Step 1 - {len(filtered)} stock(s) scanned")
current_date = self.algo.time
if current_date.day < 15:
minus_date = current_date.day - 1
else:
minus_date = current_date.day - 15
#minus_date = self.days_from_mid_month(current_date)
#minus_date = current_date.day - 1
#minus_date = 0
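# Request ~500 calendar days of daily history ending minus_date days ago (i.e. anchored near the 1st or 15th of the month).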
data = self.algo.history(filtered, self.algo.time - timedelta(minus_date) - timedelta(500), self.algo.time - timedelta(minus_date), Resolution.DAILY)
#data = self.algo.history(filtered, 500, Resolution.DAILY)
if data.empty: return final
raw = data.copy()
# ═══════════════════════════════════════════════════════════════════════
# PHASE 1: MOMENTUM FILTERING
# ═══════════════════════════════════════════════════════════════════════
#momentum_symbols = self.momentum_filter(data)
top_symbols, momentum_symbols = self.momentum_filter(data)
final = list(dict.fromkeys(final + top_symbols))
return final
def data_preparation(self, data):
prices = pd.DataFrame(index=data.index)
data = data.reset_index()
data['Date'] = pd.to_datetime(data['time']).dt.date
data = data.drop(columns=['time'])
prices = data.pivot(index='Date', columns='symbol', values='close')
log_returns = np.log(prices / prices.shift(1))
log_returns = log_returns.iloc[1:]
non_null_ratio = log_returns.notna().sum() / len(log_returns)
valid_etfs = non_null_ratio[non_null_ratio >= 1].index
cleaned_returns = log_returns[valid_etfs].round(6)
cleaned_returns = cleaned_returns.T.drop_duplicates().T
correlation_matrix = cleaned_returns.corr(method='pearson')
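# Correlation distance d = sqrt(0.5 * (1 - rho)): identical series map to 0, perfectly anti-correlated series to 1.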
distance_matrix = np.sqrt(0.5 * (1 - correlation_matrix))
return correlation_matrix, distance_matrix, cleaned_returns
def momentum_filter(self, history: pd.DataFrame):
# Work on a copy to avoid SettingWithCopy pitfalls
history = history.copy()
# ----- per‑symbol lagged prices -----
g = history.groupby(level='symbol')
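# Lag convention in trading days: 10d = 10 bars, 1m ~ 21, 2m ~ 42, 3m ~ 63, 6m ~ 126, 9m ~ 189, 12m ~ 252.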
history['price_10d'] = g['close'].shift(10)
history['price_1m'] = g['close'].shift(21)
history['price_2m'] = g['close'].shift(42)
history['price_3m'] = g['close'].shift(63)
history['price_6m'] = g['close'].shift(126)
history['price_9m'] = g['close'].shift(189)
history['price_12m'] = g['close'].shift(252)
history['10d_returns'] = (history['close'] - history['price_10d']) / history['price_10d']
history['1m_returns'] = (history['close'] - history['price_1m']) / history['price_1m']
history['2m_returns'] = (history['close'] - history['price_2m']) / history['price_2m']
history['3m_returns'] = (history['close'] - history['price_3m']) / history['price_3m']
history['6m_returns'] = (history['close'] - history['price_6m']) / history['price_6m']
history['9m_returns'] = (history['close'] - history['price_9m']) / history['price_9m']
history['12m_returns'] = (history['close'] - history['price_12m']) / history['price_12m']
history['ema_10'] = g['close'].transform(lambda x: x.ewm(span=10, adjust=False).mean())
history['ema_20'] = g['close'].transform(lambda x: x.ewm(span=20, adjust=False).mean())
history['ema_50'] = g['close'].transform(lambda x: x.ewm(span=50, adjust=False).mean())
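# Uptrend flag: 1 when the EMAs are stacked bullishly (close > EMA10 > EMA20 > EMA50), else 0; summed over a 21-day window below.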
history['uptrend'] = np.where(
(history['close'] > history['ema_10']) &
(history['ema_10'] > history['ema_20']) & (history['ema_20'] > history['ema_50']),
1,
0
)
history['uptrend_rate_21d'] = g['uptrend'].transform(lambda x: x.rolling(window=21, min_periods=1).sum())
# ----- per-symbol 52-week (252-day) high ratio -----
history['52w_high_lag'] = history['close'] / g['close'].transform(lambda x: x.rolling(window=252).max())
# ----- momentum score -----
history['momentum_score'] = history['12m_returns']
rets = g['close'].transform(lambda x: x.pct_change())
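# Sortino-style score: 12-month momentum divided by a 42-day downside-deviation proxy (std of daily returns with gains zeroed out).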
history['sortino_score'] = history['momentum_score'] / rets.where(rets < 0, 0).groupby(level='symbol').transform(lambda x: x.rolling(42).std())
top = history
# ----- take latest bar per symbol -----
top = top.reset_index()
top = top.loc[top.groupby("symbol")["time"].idxmax()]
# ----- filters -----
#top = top[(top['1m_returns'] > 0.01) & (top['2m_returns'] > 0.02) | (top['3m_returns'] > 0.03)]
#top = top[top['52w_high_lag'] > 0.9]
r_squared_dict = {}
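# Trend-quality filter: R^2 of a linear regression of price on the day index over the last 63 bars; higher values indicate a smoother trend.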
for symbol, group_data in history.groupby(level='symbol'):
symbol_data = group_data.tail(63)
prices = symbol_data['close'].values
returns = np.diff(prices) / prices[:-1] * 100
days = np.arange(len(prices))
slope, intercept, r_value, p_value, std_err = stats.linregress(days, prices)
r_squared = r_value ** 2
r_squared_dict[symbol] = round(r_squared,4)
top['r_squared'] = top['symbol'].map(r_squared_dict)
top = top.sort_values('r_squared', ascending=False).head(int(len(top) * 0.9))
top = top[top['momentum_score'] >= top['momentum_score'].quantile(0.9)]
top = top[top['uptrend_rate_21d'] >= top['uptrend_rate_21d'].quantile(0.9)]
# ----- refresh scores dict -----
self.scores = {}
score_map = (
top.set_index('symbol')['sortino_score']
.round(4)
.to_dict()
)
self.scores.update(score_map)
picked_symbols = top['symbol'].tolist()
top_symbols = top.nlargest(1, 'sortino_score')['symbol'].tolist()
return top_symbols, picked_symbols
def avg_correlation(self, returns, lookback):
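# Mean pairwise correlation over the last `lookback` rows, averaged over the upper triangle only (diagonal excluded).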
corr = returns.tail(lookback).corr()
arr = corr.to_numpy()
n = arr.shape[0]
tri = np.triu_indices(n=n, k=1)
upper_vals = arr[tri]
return float(np.nanmean(upper_vals)) if upper_vals.size > 0 else float("nan")
from AlgorithmImports import *
from Newtonsoft.Json import JsonConvert
import System
import psutil
class Utils():
def __init__(self, algo, ticker):
self.algo = algo
self.ticker = ticker
self.mkt = []
self.insights_key = f"{self.algo.project_id}/Live_{self.algo.live_mode}_insights"
self.algo.set_benchmark(ticker)
self._initial_portfolio_value = self.algo.init_cash
self._initial_benchmark_price = 0
self._portfolio_high_watermark = 0
self.init_chart()
def init_chart(self):
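# Custom four-panel chart: strategy vs. benchmark equity ($), drawdown (%), allocation (%), and holdings count.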
chart_name = "Strategy Performance"
chart = Chart(chart_name)
strategy_series = Series("Strategy", SeriesType.LINE, 0, "$")
strategy_series.color = Color.ORANGE
chart.add_series(strategy_series)
benchmark_series = Series("Benchmark", SeriesType.LINE, 0, "$")
benchmark_series.color = Color.LIGHT_GRAY
chart.add_series(benchmark_series)
drawdown_series = Series("Drawdown", SeriesType.LINE, 1, "%")
drawdown_series.color = Color.INDIAN_RED
chart.add_series(drawdown_series)
allocation_series = Series("Allocation", SeriesType.LINE, 2, "%")
allocation_series.color = Color.CORNFLOWER_BLUE
chart.add_series(allocation_series)
holding_series = Series("Holdings", SeriesType.LINE, 3, "")
holding_series.color = Color.YELLOW_GREEN
chart.add_series(holding_series)
self.algo.add_chart(chart)
def plot(self):
if self.algo.live_mode or self.algo.is_warming_up:
return
# Capture initial reference values
if self._initial_portfolio_value == 0.0:
self._initial_portfolio_value = float(self.algo.portfolio.total_portfolio_value)
benchmark_price = float(self.algo.securities[self.algo._symbol].price)
if self._initial_benchmark_price == 0.0 and benchmark_price > 0.0:
self._initial_benchmark_price = benchmark_price
# Ensure both initial values are set (also guards the divisions below)
if self._initial_portfolio_value == 0.0 or self._initial_benchmark_price == 0.0:
return
# Current values
current_portfolio_value = float(self.algo.portfolio.total_portfolio_value)
# Normalize (start at 1.0)
normalized_portfolio = current_portfolio_value / self._initial_portfolio_value
normalized_benchmark = benchmark_price / self._initial_benchmark_price
current_value = self.algo.portfolio.total_portfolio_value
if current_value > self._portfolio_high_watermark:
self._portfolio_high_watermark = current_value
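# Drawdown: percentage decline from the running high-watermark (0 at a new high, negative otherwise).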
drawdown = 0.0
if self._portfolio_high_watermark != 0.0:
drawdown = (current_value - self._portfolio_high_watermark) / self._portfolio_high_watermark * 100.0
holding_count = 0
for symbol in list(self.algo.securities.keys()):
if symbol is None:
continue
holding = self.algo.portfolio[symbol]
if holding is None or not holding.invested:
continue
holding_count += 1
chart_name = "Strategy Performance"
self.algo.plot(chart_name, "Drawdown", drawdown)
self.algo.plot(chart_name, "Strategy", normalized_portfolio*self.algo.init_cash)
self.algo.plot(chart_name, "Benchmark", normalized_benchmark*self.algo.init_cash)
self.algo.plot(chart_name, "Allocation", round(self.algo.portfolio.total_holdings_value / self.algo.portfolio.total_portfolio_value,2)*100)
self.algo.plot(chart_name, "Holdings", holding_count)
self.algo.plot('Strategy Equity', self.ticker, normalized_benchmark*self.algo.init_cash)
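# Percentage-change helper: (new - old) / old. The str() round-trip is presumably there to coerce non-float numeric types (e.g. .NET decimals).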
@staticmethod
def pctc(no1, no2):
return (float(str(no2)) - float(str(no1))) / float(str(no1))
def stats(self):
df = None
trades = self.algo.trade_builder.closed_trades
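# Build a per-trade DataFrame from LEAN's TradeBuilder closed trades (entry/exit prices, P&L and per-trade return).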
for trade in trades:
data = {
'symbol': trade.symbol,
'time': trade.entry_time,
'entry_price': trade.entry_price,
'exit_price': trade.exit_price,
'pnl': trade.profit_loss,
'pnl_pct': (trade.exit_price - trade.entry_price)/trade.entry_price,
}
df = pd.concat([pd.DataFrame(data=data, index=[0]), df])
if df is not None:
profit = df.query('pnl >= 0')['pnl'].sum()
loss = df.query('pnl < 0')['pnl'].sum()
avgWinPercentPerWin = "{0:.2%}".format(df.query('pnl >= 0')['pnl_pct'].mean())
avgLostPercentPerLost = "{0:.2%}".format(df.query('pnl < 0')['pnl_pct'].mean())
maxLost = "{0:.2%}".format(df.query('pnl < 0')['pnl_pct'].min())
maxWin = "{0:.2%}".format(df.query('pnl > 0')['pnl_pct'].max())
self.algo.set_summary_statistic("*Profit Ratio", round(profit / abs(loss),2))
self.algo.set_summary_statistic("Avg. Win% Per Winner", avgWinPercentPerWin)
self.algo.set_summary_statistic("Avg. Lost% Per Losser", avgLostPercentPerLost)
self.algo.set_summary_statistic("Max Loss%", maxLost)
self.algo.set_summary_statistic("Max Win%", maxWin)
def read_insight(self):
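# In live mode, restore previously persisted insights from the Object Store so insight state survives a redeployment.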
if self.algo.object_store.contains_key(self.insights_key) and self.algo.live_mode:
insights = self.algo.object_store.read_json[System.Collections.Generic.List[Insight]](self.insights_key)
self.algo.log.debug(f"Read {len(insights)} insight(s) from the Object Store")
self.algo.insights.add_range(insights)
#self.algo.object_store.delete(self.insights_key)
def store_insight(self):
if self.algo.live_mode:
insights = self.algo.insights.get_insights(lambda x: x.is_active(self.algo.utc_time))
# If we want to save all insights (expired and active), we can use
# insights = self.insights.get_insights(lambda x: True)
self.algo.log.debug(f"Save {len(insights)} insight(s) to the Object Store.")
content = ','.join([JsonConvert.SerializeObject(x) for x in insights])
self.algo.object_store.save(self.insights_key, f'[{content}]')
def trace_memory(self, name):
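# psutil.virtual_memory() tuple: index 2 is the percent of RAM in use, index 3 is bytes used.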
self.algo.log.debug(f"[{name}] RAM memory % used: {psutil.virtual_memory()[2]} / RAM Used (GB): {round(psutil.virtual_memory()[3]/1000000000,2)}")