| Overall Statistics |
|
Total Orders 58 Average Win 0.44% Average Loss -0.33% Compounding Annual Return 5.982% Drawdown 2.100% Expectancy 1.018 Start Equity 100000 End Equity 113816.00 Net Profit 13.816% Sharpe Ratio -0.634 Sortino Ratio -0.783 Probabilistic Sharpe Ratio 94.170% Loss Rate 13% Win Rate 87% Profit-Loss Ratio 1.32 Alpha -0.016 Beta 0.034 Annual Standard Deviation 0.022 Annual Variance 0 Information Ratio -0.636 Tracking Error 0.13 Treynor Ratio -0.409 Total Fees $0.00 Estimated Strategy Capacity $6500000.00 Lowest Capacity Asset E R735QTJ8XC9X Portfolio Turnover 0.18% Drawdown Recovery 142 |
from AlgorithmImports import *
import pandas as pd
import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
import warnings
# Ignore sklearn convergence warnings for clean console logs
warnings.filterwarnings("ignore", category=UserWarning)
# ==============================================================================
# 1. MARCENKO-PASTUR COVARIANCE DENOISING
# ==============================================================================
class CovarianceDenoising:
@staticmethod
def denoise_covariance(cov, T, N):
"""
Applies Analytical Marcenko-Pastur Denoising (Constant Residual Eigenvalue)
T = Number of observations (days of history)
N = Number of variables (stocks)
"""
# 1. Covariance to Correlation
std = np.sqrt(np.diag(cov))
corr = cov.values / np.outer(std, std)
corr[corr < -1], corr[corr > 1] = -1, 1 # Clip bounds for floating point dust
# 2. Eigen Decomposition
e_vals, e_vecs = np.linalg.eigh(corr)
indices = e_vals.argsort()[::-1]
e_vals, e_vecs = e_vals[indices], e_vecs[:, indices]
# 3. Marcenko-Pastur Maximum Eigenvalue
q = T / float(N)
e_max = (1 + np.sqrt(1. / q)) ** 2
# 4. Find number of signal factors
n_facts = e_vals[e_vals > e_max].shape[0]
# 5. Denoise by constant residual eigenvalue
if n_facts > 0 and n_facts < N:
e_vals_ = np.copy(e_vals)
e_vals_[n_facts:] = e_vals_[n_facts:].sum() / float(N - n_facts)
corr_denoised = np.dot(e_vecs, e_vals_ * e_vecs.T)
d = np.diag(corr_denoised)
corr_denoised = corr_denoised / np.sqrt(np.outer(d, d))
else:
corr_denoised = corr
# 6. Correlation back to Covariance
cov_denoised = corr_denoised * np.outer(std, std)
return pd.DataFrame(cov_denoised, index=cov.index, columns=cov.columns)
# ==============================================================================
# 2. NESTED CLUSTERED OPTIMIZATION (NCO) + MEAN VARIANCE OPTIMIZATION (MVO)
# ==============================================================================
class NestedClusteredOptimisation:
def __init__(self, max_clusters=10):
self.max_clusters = max_clusters
self.weights = None
def _get_optimal_clusters(self, corr):
"""Uses Silhouette Scores to find the optimal K-Means cluster count"""
max_k = min(self.max_clusters, corr.shape[0] - 1)
if max_k < 2:
return {0: corr.columns.tolist()}
best_k, best_score = 2, -1
for k in range(2, max_k + 1):
kmeans = KMeans(n_clusters=k, n_init=10, random_state=42)
kmeans.fit(corr)
sil_score = silhouette_score(corr, kmeans.labels_)
if sil_score > best_score:
best_score = sil_score
best_k = k
kmeans = KMeans(n_clusters=best_k, n_init=10, random_state=42)
kmeans.fit(corr)
clusters = {i: corr.columns[np.where(kmeans.labels_ == i)[0]].tolist() for i in range(best_k)}
return clusters
def _opt_port(self, cov):
"""Calculates Markowitz Minimum-Variance (MVO) weights for a given covariance matrix"""
inv_cov = np.linalg.pinv(cov.values)
ones = np.ones(len(inv_cov))
w = np.dot(inv_cov, ones)
# Normalize weights so they sum to 1
w /= np.sum(w)
return pd.Series(w, index=cov.columns)
@staticmethod
def _cov2corr(cov):
"""Converts Covariance to Correlation"""
std = np.sqrt(np.diag(cov))
corr = cov / np.outer(std, std)
corr[corr < -1], corr[corr > 1] = -1, 1
return corr
def allocate(self, covariance):
"""Executes the NCO + MVO Pipeline"""
corr = self._cov2corr(covariance)
clusters = self._get_optimal_clusters(corr)
# Step 1: Intra-cluster MVO weights
intra_weights = pd.DataFrame(0.0, index=covariance.index, columns=clusters.keys())
for i, cluster in clusters.items():
cov_cluster = covariance.loc[cluster, cluster]
intra_weights.loc[cluster, i] = self._opt_port(cov_cluster)
# Step 2: Inter-cluster MVO weights
cov_inter = intra_weights.T.dot(covariance).dot(intra_weights)
inter_weights = self._opt_port(cov_inter)
# Step 3: Final weight = Intra * Inter
self.weights = intra_weights.mul(inter_weights, axis=1).sum(axis=1)
return self.weights
# ==============================================================================
# 3. QUANTCONNECT ALGORITHM: 100-STOCK NCO-MVO (QUARTERLY + SMART STOP)
# ==============================================================================
class UltimateFactorNCO(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2024, 1, 1)
self.SetCash(100000)
self.UniverseSettings.Resolution = Resolution.Minute
self.AddUniverse(self.FundamentalSelection)
self.spy = self.AddEquity("SPY", Resolution.Minute).Symbol
# Target exact 100 stock portfolio
self.final_count = 100
self.candidates = []
# Execution Variables
self.current_quarter = 0
self.pending_weights = {}
self.pending_liquidations = []
self.weight_buffer = 0.02
# Smart Stop Loss Variables
self.trailing_stop_pct = 0.15
self.high_water_marks = {}
self.stop_loss_blacklist = set()
self.SetWarmUp(126) # Match warmup to our new history lookback
self.SetBrokerageModel(BrokerageName.ALPACA)
# QUARTERLY Rebalance Schedule
self.Schedule.On(self.DateRules.MonthStart(self.spy),
self.TimeRules.AfterMarketOpen(self.spy, 30),
self.QueueTrades)
def FundamentalSelection(self, fundamental):
"""Builds a strict 100-stock Value universe: 60 Large, 30 Mid, 10 Small every Quarter"""
current_q = (self.Time.month - 1) // 3 + 1
if current_q == self.current_quarter:
return Universe.Unchanged
self.current_quarter = current_q
self.stop_loss_blacklist.clear()
# 1. Base Filter (Ex-Financials/Real Estate)
filtered = [f for f in fundamental if f.HasFundamentalData
and f.Price > 5
and f.MarketCap > 1e8
and f.ValuationRatios.PBRatio > 0
and f.AssetClassification.MorningstarSectorCode != MorningstarSectorCode.FinancialServices
and f.AssetClassification.MorningstarSectorCode != MorningstarSectorCode.RealEstate]
if len(filtered) < 1000: return Universe.Unchanged
# 2. Sort by size to define strata
sorted_by_size = sorted(filtered, key=lambda x: x.MarketCap, reverse=True)
large_caps = sorted_by_size[:200]
mid_caps = sorted_by_size[200:500]
small_caps = sorted_by_size[500:1000]
def get_value_stocks(bucket, count):
sorted_bucket = sorted(bucket, key=lambda x: 1 / x.ValuationRatios.PBRatio, reverse=True)
return [x.Symbol for x in sorted_bucket[:count]]
# 3. Extract exact 60/30/10 ratio for 100 stocks
large_value_symbols = get_value_stocks(large_caps, 60)
mid_value_symbols = get_value_stocks(mid_caps, 30)
small_value_symbols = get_value_stocks(small_caps, 10)
self.candidates = large_value_symbols + mid_value_symbols + small_value_symbols
self.Debug(f"Q{self.current_quarter} Universe Generated: 100 Stratified Value Targets")
return self.candidates
def QueueTrades(self):
if self.IsWarmingUp or not self.candidates: return
# Enforce Quarterly Execution
if self.Time.month not in [1, 4, 7, 10]:
return
# Risk-Off Check
spy_history = self.History(self.spy, 200, Resolution.Daily)
if not spy_history.empty:
spy_current = spy_history['close'].iloc[-1]
spy_sma = spy_history['close'].mean()
if spy_current < spy_sma:
self.Liquidate()
self.pending_weights.clear()
self.Debug("Market Risk-Off: Liquidating to Cash")
return
active_candidates = [c for c in self.candidates if c not in self.stop_loss_blacklist]
# Fetch 126 days of history (approx 6 months) to support 100 stocks
history = self.History(active_candidates, 126, Resolution.Daily)
if history.empty: return
prices = history['close'].unstack(level=0).ffill().dropna(axis=1)
if prices.empty: return
mom_scores = (prices.iloc[-1] / prices.iloc[0]) - 1
top_symbols = mom_scores.sort_values(ascending=False).head(self.final_count).index.tolist()
target_prices = prices[top_symbols]
target_prices.index = pd.to_datetime(target_prices.index)
try:
# 1. Base Math Data
target_returns = target_prices.pct_change().dropna(how='all')
raw_cov = target_returns.cov()
# 2. MARCENKO-PASTUR DENOISING
T = len(target_returns)
N = len(target_returns.columns)
denoised_cov = CovarianceDenoising.denoise_covariance(raw_cov, T, N)
# 3. Execute NCO + MVO Pipeline
nco = NestedClusteredOptimisation(max_clusters=10)
nco_weights = nco.allocate(covariance=denoised_cov)
# 4. Queue Liquidations
current_holdings = [x.Key for x in self.Portfolio if x.Value.Invested]
self.pending_liquidations = [sym for sym in current_holdings if sym not in nco_weights.index]
# 5. Apply Weight Buffer
self.pending_weights.clear()
for symbol, target_weight in nco_weights.items():
if target_weight <= 0.0001: continue
current_weight = 0
if self.Portfolio.ContainsKey(symbol) and self.Portfolio[symbol].Invested:
current_weight = self.Portfolio[symbol].HoldingsValue / self.Portfolio.TotalPortfolioValue
weight_delta = abs(target_weight - current_weight)
if weight_delta >= self.weight_buffer:
self.pending_weights[symbol] = round(target_weight, 4)
self.ExecutePendingTrades()
except Exception as e:
self.Debug(f"NCO Execution Error: {e}")
def OnData(self, data):
"""Intraday Smart Stop Loss & Fallback Execution"""
for symbol in list(self.Portfolio.Keys):
if not self.Portfolio[symbol].Invested:
if symbol in self.high_water_marks:
del self.high_water_marks[symbol]
continue
price = self.Securities[symbol].Price
if price == 0: continue
hwm = self.high_water_marks.get(symbol, price)
if price > hwm:
self.high_water_marks[symbol] = price
elif price < hwm * (1 - self.trailing_stop_pct):
self.SetHoldings(symbol, 0)
self.stop_loss_blacklist.add(symbol)
del self.high_water_marks[symbol]
self.Debug(f"Stop Loss: Liquidated {symbol.Value} at {price}. Blacklisted until next quarter.")
if not self.pending_weights and not self.pending_liquidations: return
if self.Time.minute % 10 == 0:
self.ExecutePendingTrades()
def ExecutePendingTrades(self):
completed_liquidations = []
for symbol in self.pending_liquidations:
if self.Securities.ContainsKey(symbol) and self.Securities[symbol].Price > 0:
self.SetHoldings(symbol, 0)
completed_liquidations.append(symbol)
for symbol in completed_liquidations:
self.pending_liquidations.remove(symbol)
completed_allocations = []
for symbol, weight in self.pending_weights.items():
if self.Securities.ContainsKey(symbol) and self.Securities[symbol].Price > 0:
self.SetHoldings(symbol, weight)
completed_allocations.append(symbol)
for symbol in completed_allocations:
del self.pending_weights[symbol]from AlgorithmImports import *
import pandas as pd
import numpy as np
from scipy.cluster.hierarchy import dendrogram, linkage
from scipy.spatial.distance import squareform
from sklearn.covariance import OAS
# ==============================================================================
# MARCOS LÓPEZ DE PRADO HRP CLASSES (with Shrinkage & Modifications)
# ==============================================================================
class HierarchicalRiskParity:
def __init__(self):
self.weights = list()
self.seriated_correlations = None
self.seriated_distances = None
self.ordered_indices = None
self.clusters = None
@staticmethod
def _tree_clustering(correlation, method='single'):
distances = np.sqrt((1 - correlation).round(5) / 2)
clusters = linkage(squareform(distances.values), method=method)
return distances, clusters
def _quasi_diagnalization(self, num_assets, curr_index):
if curr_index < num_assets:
return [curr_index]
left = int(self.clusters[curr_index - num_assets, 0])
right = int(self.clusters[curr_index - num_assets, 1])
return (self._quasi_diagnalization(num_assets, left) + self._quasi_diagnalization(num_assets, right))
def _get_seriated_matrix(self, assets, distances, correlations):
ordering = assets[self.ordered_indices]
seriated_distances = distances.loc[ordering, ordering]
seriated_correlations = correlations.loc[ordering, ordering]
return seriated_distances, seriated_correlations
def _recursive_bisection(self, covariances, assets):
self.weights = pd.Series(1.0, index=self.ordered_indices)
clustered_alphas = [self.ordered_indices]
while clustered_alphas:
clustered_alphas = [cluster[start:end]
for cluster in clustered_alphas
for start, end in ((0, len(cluster) // 2), (len(cluster) // 2, len(cluster)))
if len(cluster) > 1]
for subcluster in range(0, len(clustered_alphas), 2):
left_cluster = clustered_alphas[subcluster]
right_cluster = clustered_alphas[subcluster + 1]
left_subcovar = covariances.iloc[left_cluster, left_cluster]
inv_diag = 1 / np.diag(left_subcovar.values)
parity_w = inv_diag * (1 / np.sum(inv_diag))
left_cluster_var = np.dot(parity_w, np.dot(left_subcovar, parity_w))
right_subcovar = covariances.iloc[right_cluster, right_cluster]
inv_diag = 1 / np.diag(right_subcovar.values)
parity_w = inv_diag * (1 / np.sum(inv_diag))
right_cluster_var = np.dot(parity_w, np.dot(right_subcovar, parity_w))
alloc_factor = 1 - left_cluster_var / (left_cluster_var + right_cluster_var)
self.weights[left_cluster] *= alloc_factor
self.weights[right_cluster] *= 1 - alloc_factor
self.weights.index = assets[self.ordered_indices]
self.weights = pd.DataFrame(self.weights).T
@staticmethod
def _calculate_returns(asset_prices, resample_by):
if resample_by is not None:
asset_prices = asset_prices.resample(resample_by).last()
asset_returns = asset_prices.pct_change()
asset_returns = asset_returns.dropna(how='all')
return asset_returns
@staticmethod
def _shrink_covariance(covariance):
oas = OAS()
oas.fit(covariance)
shrinked_covariance = oas.covariance_
return pd.DataFrame(shrinked_covariance, index=covariance.columns, columns=covariance.columns)
@staticmethod
def _cov2corr(covariance):
d_matrix = np.zeros_like(covariance)
diagnoal_sqrt = np.sqrt(np.diag(covariance))
np.fill_diagonal(d_matrix, diagnoal_sqrt)
d_inv = np.linalg.inv(d_matrix)
corr = np.dot(np.dot(d_inv, covariance), d_inv)
corr = pd.DataFrame(corr, index=covariance.columns, columns=covariance.columns)
return corr
class HierarchicalRiskParityModified(HierarchicalRiskParity):
def allocate(self, asset_prices, covariance, resample_by='B', use_shrinkage=False):
if not isinstance(asset_prices, pd.DataFrame):
raise ValueError("Asset prices matrix must be a dataframe")
if not isinstance(asset_prices.index, pd.DatetimeIndex):
raise ValueError("Asset prices dataframe must be indexed by date.")
asset_returns = self._calculate_returns(asset_prices, resample_by=resample_by)
num_assets = asset_returns.shape[1]
assets = asset_returns.columns
cov = pd.DataFrame(covariance, columns=assets, index=assets)
if use_shrinkage:
cov = self._shrink_covariance(covariance=cov)
corr = self._cov2corr(covariance=cov)
distances, self.clusters = self._tree_clustering(correlation=corr)
self.ordered_indices = self._quasi_diagnalization(num_assets, 2 * num_assets - 2)
self.seriated_distances, self.seriated_correlations = self._get_seriated_matrix(assets=assets, distances=distances, correlations=corr)
self._recursive_bisection(covariances=cov, assets=assets)
# ==============================================================================
# CUSTOM SECURITY INITIALIZER (For Slippage & Brokerage Models)
# ==============================================================================
class CustomSecurityInitializer(BrokerageModelSecurityInitializer):
def __init__(self, brokerage_model, security_seeder):
super().__init__(brokerage_model, security_seeder)
def Initialize(self, security):
# 1. Apply the default Alpaca brokerage models (fees, margin, etc.)
super().Initialize(security)
# 2. Apply our custom 0.1% slippage penalty on top
security.SetSlippageModel(ConstantSlippageModel(0.001))
# ==============================================================================
# QUANTCONNECT ALGORITHM
# ==============================================================================
class UltimateFactorHRP(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2021, 1, 1)
self.SetCash(100000)
# 1. Environment & Universe
self.UniverseSettings.Resolution = Resolution.Minute
self.AddUniverse(self.FundamentalSelection)
self.spy = self.AddEquity("SPY", Resolution.Minute).Symbol
# 2. Strategy Variables
self.max_candidates = 50
self.final_count = 15
self.candidates = []
self.weight_buffer = 0.02 # 2% weight delta buffer
# 3. State Variables
self.next_universe_time = self.Time
self.pending_weights = {}
self.pending_liquidations = []
self.SetWarmUp(60)
# 4. Brokerage & Custom Security Initializer
self.SetBrokerageModel(BrokerageName.ALPACA)
self.SetSecurityInitializer(CustomSecurityInitializer(self.BrokerageModel, SecuritySeeder.Null))
# 5. Scheduling (Daily Rebalance)
self.Schedule.On(self.DateRules.EveryDay(self.spy),
self.TimeRules.AfterMarketOpen(self.spy, 30),
self.QueueTrades)
def FundamentalSelection(self, fundamental):
"""Locks universe updates to a weekly cycle"""
if self.Time < self.next_universe_time:
return Universe.Unchanged
self.next_universe_time = self.Time + timedelta(days=7)
filtered = [f for f in fundamental if f.HasFundamentalData and f.Price > 5 and f.MarketCap > 1e8]
sorted_by_cap = sorted(filtered, key=lambda x: x.MarketCap, reverse=True)
self.candidates = [x.Symbol for x in sorted_by_cap[:self.max_candidates]]
return self.candidates
def QueueTrades(self):
"""Calculates daily targets and pushes them into the execution queues"""
if self.IsWarmingUp or not self.candidates: return
# Risk-Off Check
spy_history = self.History(self.spy, 200, Resolution.Daily)
if not spy_history.empty:
spy_current = spy_history['close'].iloc[-1]
spy_sma = spy_history['close'].mean()
if spy_current < spy_sma:
self.Liquidate()
self.pending_weights.clear()
self.Debug("Market Risk-Off: Liquidating to Cash")
return
# Fetch candidate history
history = self.History(self.candidates, 60, Resolution.Daily)
if history.empty: return
prices = history['close'].unstack(level=0).ffill().dropna(axis=1)
if prices.empty or len(prices.columns) < self.final_count: return
# Target Top 15 Momentum Stocks
mom_scores = (prices.iloc[-1] / prices.iloc[0]) - 1
top_symbols = mom_scores.sort_values(ascending=False).head(self.final_count).index.tolist()
target_prices = prices[top_symbols]
target_prices.index = pd.to_datetime(target_prices.index)
try:
# HRP Math & Allocation
target_returns = target_prices.pct_change().dropna(how='all')
target_cov = target_returns.cov()
hrp = HierarchicalRiskParityModified()
hrp.allocate(asset_prices=target_prices, covariance=target_cov, resample_by='B', use_shrinkage=True)
hrp_weights_series = hrp.weights.iloc[0]
# Queue Liquidations
current_holdings = [x.Key for x in self.Portfolio if x.Value.Invested]
self.pending_liquidations = [sym for sym in current_holdings if sym not in hrp_weights_series.index]
# Apply Weight Delta Filter
self.pending_weights.clear()
for symbol, target_weight in hrp_weights_series.items():
current_weight = 0
if self.Portfolio.ContainsKey(symbol) and self.Portfolio[symbol].Invested:
current_weight = self.Portfolio[symbol].HoldingsValue / self.Portfolio.TotalPortfolioValue
weight_delta = abs(target_weight - current_weight)
if weight_delta >= self.weight_buffer:
self.pending_weights[symbol] = target_weight
# Trigger first attempt
self.ExecutePendingTrades()
except Exception as e:
self.Debug(f"Trade Execution Error: {e}")
def OnData(self, data):
"""Fallback loop: Retries stuck trades every 10 minutes"""
if not self.pending_weights and not self.pending_liquidations: return
if self.Time.minute % 10 == 0:
self.ExecutePendingTrades()
def ExecutePendingTrades(self):
"""Safely processes the queues only if valid live price data exists"""
completed_liquidations = []
for symbol in self.pending_liquidations:
if self.Securities.ContainsKey(symbol) and self.Securities[symbol].Price > 0:
self.SetHoldings(symbol, 0)
completed_liquidations.append(symbol)
for symbol in completed_liquidations:
self.pending_liquidations.remove(symbol)
completed_allocations = []
for symbol, weight in self.pending_weights.items():
if self.Securities.ContainsKey(symbol) and self.Securities[symbol].Price > 0:
self.SetHoldings(symbol, weight)
completed_allocations.append(symbol)
for symbol in completed_allocations:
del self.pending_weights[symbol]from AlgorithmImports import *
import pandas as pd
import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
import warnings
# Ignore sklearn convergence warnings for clean console logs
warnings.filterwarnings("ignore", category=UserWarning)
# ==============================================================================
# 1. MARCENKO-PASTUR COVARIANCE DENOISING
# ==============================================================================
class CovarianceDenoising:
@staticmethod
def denoise_covariance(cov, T, N):
"""
Applies Analytical Marcenko-Pastur Denoising (Constant Residual Eigenvalue)
T = Number of observations (days of history)
N = Number of variables (stocks)
"""
# 1. Covariance to Correlation
std = np.sqrt(np.diag(cov))
corr = cov.values / np.outer(std, std)
corr[corr < -1], corr[corr > 1] = -1, 1 # Clip bounds for floating point dust
# 2. Eigen Decomposition
e_vals, e_vecs = np.linalg.eigh(corr)
# Sort descending
indices = e_vals.argsort()[::-1]
e_vals, e_vecs = e_vals[indices], e_vecs[:, indices]
# 3. Marcenko-Pastur Maximum Eigenvalue (Analytical Boundary)
q = T / float(N)
e_max = (1 + np.sqrt(1. / q)) ** 2
# 4. Find number of signal factors (Eigenvalues > e_max)
n_facts = e_vals[e_vals > e_max].shape[0]
# 5. Denoise by constant residual eigenvalue
if n_facts > 0 and n_facts < N:
e_vals_ = np.copy(e_vals)
# Average the noise eigenvalues and assign that constant value to all of them
e_vals_[n_facts:] = e_vals_[n_facts:].sum() / float(N - n_facts)
corr_denoised = np.dot(e_vecs, e_vals_ * e_vecs.T)
# Rescale diagonals to 1
d = np.diag(corr_denoised)
corr_denoised = corr_denoised / np.sqrt(np.outer(d, d))
else:
corr_denoised = corr
# 6. Correlation back to Covariance
cov_denoised = corr_denoised * np.outer(std, std)
return pd.DataFrame(cov_denoised, index=cov.index, columns=cov.columns)
# ==============================================================================
# 2. NESTED CLUSTERED OPTIMIZATION (NCO)
# ==============================================================================
class NestedClusteredOptimisation:
def __init__(self, max_clusters=10):
self.max_clusters = max_clusters
self.weights = None
def _get_optimal_clusters(self, corr):
"""Uses Silhouette Scores to find the optimal K-Means cluster count dynamically"""
max_k = min(self.max_clusters, corr.shape[0] - 1)
if max_k < 2:
return {0: corr.columns.tolist()}
best_k, best_score = 2, -1
for k in range(2, max_k + 1):
kmeans = KMeans(n_clusters=k, n_init=10, random_state=42)
kmeans.fit(corr)
sil_score = silhouette_score(corr, kmeans.labels_)
if sil_score > best_score:
best_score = sil_score
best_k = k
kmeans = KMeans(n_clusters=best_k, n_init=10, random_state=42)
kmeans.fit(corr)
clusters = {i: corr.columns[np.where(kmeans.labels_ == i)[0]].tolist() for i in range(best_k)}
return clusters
def _opt_port(self, cov):
"""Calculates Inverse Variance weights for a given covariance matrix"""
ivp = 1.0 / np.diag(cov)
ivp /= ivp.sum()
return pd.Series(ivp, index=cov.columns)
@staticmethod
def _cov2corr(cov):
"""Converts Covariance to Correlation"""
std = np.sqrt(np.diag(cov))
corr = cov / np.outer(std, std)
corr[corr < -1], corr[corr > 1] = -1, 1
return corr
def allocate(self, covariance):
"""Executes the NCO Pipeline"""
corr = self._cov2corr(covariance)
clusters = self._get_optimal_clusters(corr)
# Step 1: Intra-cluster weights
intra_weights = pd.DataFrame(0.0, index=covariance.index, columns=clusters.keys())
for i, cluster in clusters.items():
cov_cluster = covariance.loc[cluster, cluster]
intra_weights.loc[cluster, i] = self._opt_port(cov_cluster)
# Step 2: Inter-cluster weights
cov_inter = intra_weights.T.dot(covariance).dot(intra_weights)
inter_weights = self._opt_port(cov_inter)
# Step 3: Final weight = Intra * Inter
self.weights = intra_weights.mul(inter_weights, axis=1).sum(axis=1)
return self.weights
# ==============================================================================
# 3. QUANTCONNECT ALGORITHM: DENOISED NCO (QUARTERLY + SMART STOP)
# ==============================================================================
class UltimateFactorNCO(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2021, 1, 1)
self.SetCash(100000)
self.UniverseSettings.Resolution = Resolution.Minute
self.AddUniverse(self.FundamentalSelection)
self.spy = self.AddEquity("SPY", Resolution.Minute).Symbol
# Target exact 60 stock portfolio
self.final_count = 60
self.candidates = []
# Execution Variables
self.current_quarter = 0
self.pending_weights = {}
self.pending_liquidations = []
self.weight_buffer = 0.02
# Smart Stop Loss Variables
self.trailing_stop_pct = 0.15
self.high_water_marks = {}
self.stop_loss_blacklist = set()
self.SetWarmUp(60)
self.SetBrokerageModel(BrokerageName.ALPACA)
# QUARTERLY Rebalance Schedule
self.Schedule.On(self.DateRules.MonthStart(self.spy),
self.TimeRules.AfterMarketOpen(self.spy, 30),
self.QueueTrades)
def FundamentalSelection(self, fundamental):
"""Builds a strict 60-stock Value universe: 36 Large, 18 Mid, 6 Small every Quarter"""
current_q = (self.Time.month - 1) // 3 + 1
if current_q == self.current_quarter:
return Universe.Unchanged
self.current_quarter = current_q
self.stop_loss_blacklist.clear()
# 1. Base Filter (Ex-Financials/Real Estate)
filtered = [f for f in fundamental if f.HasFundamentalData
and f.Price > 5
and f.MarketCap > 1e8
and f.ValuationRatios.PBRatio > 0
and f.AssetClassification.MorningstarSectorCode != MorningstarSectorCode.FinancialServices
and f.AssetClassification.MorningstarSectorCode != MorningstarSectorCode.RealEstate]
if len(filtered) < 1000: return Universe.Unchanged
# 2. Sort by size to define strata
sorted_by_size = sorted(filtered, key=lambda x: x.MarketCap, reverse=True)
large_caps = sorted_by_size[:200]
mid_caps = sorted_by_size[200:500]
small_caps = sorted_by_size[500:1000]
def get_value_stocks(bucket, count):
sorted_bucket = sorted(bucket, key=lambda x: 1 / x.ValuationRatios.PBRatio, reverse=True)
return [x.Symbol for x in sorted_bucket[:count]]
# 3. Extract exact 60/30/10 ratio
large_value_symbols = get_value_stocks(large_caps, 36)
mid_value_symbols = get_value_stocks(mid_caps, 18)
small_value_symbols = get_value_stocks(small_caps, 6)
self.candidates = large_value_symbols + mid_value_symbols + small_value_symbols
self.Debug(f"Q{self.current_quarter} Universe Generated: 60 Stratified Value Targets")
return self.candidates
def QueueTrades(self):
if self.IsWarmingUp or not self.candidates: return
# Enforce Quarterly Execution
if self.Time.month not in [1, 4, 7, 10]:
return
# Risk-Off Check
spy_history = self.History(self.spy, 200, Resolution.Daily)
if not spy_history.empty:
spy_current = spy_history['close'].iloc[-1]
spy_sma = spy_history['close'].mean()
if spy_current < spy_sma:
self.Liquidate()
self.pending_weights.clear()
self.Debug("Market Risk-Off: Liquidating to Cash")
return
active_candidates = [c for c in self.candidates if c not in self.stop_loss_blacklist]
# Fetch 60 days of history
history = self.History(active_candidates, 60, Resolution.Daily)
if history.empty: return
prices = history['close'].unstack(level=0).ffill().dropna(axis=1)
if prices.empty: return
mom_scores = (prices.iloc[-1] / prices.iloc[0]) - 1
top_symbols = mom_scores.sort_values(ascending=False).head(self.final_count).index.tolist()
target_prices = prices[top_symbols]
target_prices.index = pd.to_datetime(target_prices.index)
try:
# 1. Base Math Data
target_returns = target_prices.pct_change().dropna(how='all')
raw_cov = target_returns.cov()
# 2. MARCENKO-PASTUR DENOISING
T = len(target_returns)
N = len(target_returns.columns)
denoised_cov = CovarianceDenoising.denoise_covariance(raw_cov, T, N)
# 3. Execute Native NCO Pipeline
nco = NestedClusteredOptimisation(max_clusters=10)
nco_weights = nco.allocate(covariance=denoised_cov)
# 4. Queue Liquidations
current_holdings = [x.Key for x in self.Portfolio if x.Value.Invested]
self.pending_liquidations = [sym for sym in current_holdings if sym not in nco_weights.index]
# 5. Apply Weight Buffer
self.pending_weights.clear()
for symbol, target_weight in nco_weights.items():
if target_weight <= 0.0001: continue
current_weight = 0
if self.Portfolio.ContainsKey(symbol) and self.Portfolio[symbol].Invested:
current_weight = self.Portfolio[symbol].HoldingsValue / self.Portfolio.TotalPortfolioValue
weight_delta = abs(target_weight - current_weight)
if weight_delta >= self.weight_buffer:
self.pending_weights[symbol] = round(target_weight, 4)
self.ExecutePendingTrades()
except Exception as e:
self.Debug(f"NCO Execution Error: {e}")
def OnData(self, data):
"""Intraday Smart Stop Loss & Fallback Execution"""
for symbol in list(self.Portfolio.Keys):
if not self.Portfolio[symbol].Invested:
if symbol in self.high_water_marks:
del self.high_water_marks[symbol]
continue
price = self.Securities[symbol].Price
if price == 0: continue
hwm = self.high_water_marks.get(symbol, price)
if price > hwm:
self.high_water_marks[symbol] = price
elif price < hwm * (1 - self.trailing_stop_pct):
self.SetHoldings(symbol, 0)
self.stop_loss_blacklist.add(symbol)
del self.high_water_marks[symbol]
self.Debug(f"Stop Loss: Liquidated {symbol.Value} at {price}. Blacklisted until next quarter.")
if not self.pending_weights and not self.pending_liquidations: return
if self.Time.minute % 10 == 0:
self.ExecutePendingTrades()
def ExecutePendingTrades(self):
completed_liquidations = []
for symbol in self.pending_liquidations:
if self.Securities.ContainsKey(symbol) and self.Securities[symbol].Price > 0:
self.SetHoldings(symbol, 0)
completed_liquidations.append(symbol)
for symbol in completed_liquidations:
self.pending_liquidations.remove(symbol)
completed_allocations = []
for symbol, weight in self.pending_weights.items():
if self.Securities.ContainsKey(symbol) and self.Securities[symbol].Price > 0:
self.SetHoldings(symbol, weight)
completed_allocations.append(symbol)
for symbol in completed_allocations:
del self.pending_weights[symbol]