"""
Overall Statistics

| Statistic                   | Value            |
| --------------------------- | ---------------- |
| Total Orders                | 540              |
| Average Win                 | 2.25%            |
| Average Loss                | -1.18%           |
| Compounding Annual Return   | 46.460%          |
| Drawdown                    | 41.700%          |
| Expectancy                  | 0.779            |
| Start Equity                | 10000            |
| End Equity                  | 101277.35        |
| Net Profit                  | 912.774%         |
| Sharpe Ratio                | 1.25             |
| Sortino Ratio               | 1.465            |
| Probabilistic Sharpe Ratio  | 71.246%          |
| Loss Rate                   | 39%              |
| Win Rate                    | 61%              |
| Profit-Loss Ratio           | 1.90             |
| Alpha                       | 0.231            |
| Beta                        | 0.92             |
| Annual Standard Deviation   | 0.248            |
| Annual Variance             | 0.062            |
| Information Ratio           | 1.168            |
| Tracking Error              | 0.192            |
| Treynor Ratio               | 0.337            |
| Total Fees                  | $564.11          |
| Estimated Strategy Capacity | $0               |
| Lowest Capacity Asset       | AMD R735QTJ8XC9X |
| Portfolio Turnover          | 3.84%            |
| Drawdown Recovery           | 494              |
"""
# region imports
from AlgorithmImports import *
from sklearn.cluster import KMeans
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix
import numpy as np
import pandas as pd
import json
# endregion
class ClusteringWithDefensiveAlgorithm(QCAlgorithm):
    """Monthly K-Means stock selection with a GLD/SHY defensive fallback.

    When warm-up finishes, a K-Means model is fitted on indicator-derived
    features of the tracked stocks and every cluster is scored by the mean
    total return of its members over the training history. At each monthly
    rebalance, stocks whose 50-day SMA is above their 200-day SMA are assigned
    to a cluster; those landing in the best-scoring cluster are held at equal
    weight. If no stock qualifies and SPY is in a downtrend, the portfolio
    moves into GLD or SHY instead.

    Every position closed at a rebalance is appended to ``trade_log`` so that
    classification metrics can be computed and written to the ObjectStore.
    """

    def Initialize(self):
        """Set dates and cash, subscribe to data, build indicators, schedule events."""
        self.SetStartDate(2020, 1, 1)
        self.SetCash(10000)

        # ML metrics tracking.
        self.trade_log = []  # one dict per closed trade
        self.last_rebalance_positions = {}  # ticker -> prediction/probability/entry_price

        # The Magnificent 7 + AMD.
        self.symbols = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'NVDA', 'TSLA', 'META', 'AMD']

        # Defensive assets.
        self.defensive_symbols = {
            'GLD': self.AddEquity('GLD', Resolution.Daily).Symbol,
            'SHY': self.AddEquity('SHY', Resolution.Daily).Symbol,
        }

        # SPY is the market benchmark used for the downtrend test.
        self.spy = self.AddEquity('SPY', Resolution.Daily).Symbol

        # Per-stock subscriptions and indicators.
        self.equity_symbols = {}
        self.indicators = {}
        for ticker in self.symbols:
            symbol = self.AddEquity(ticker, Resolution.Daily).Symbol
            self.equity_symbols[ticker] = symbol
            self.indicators[ticker] = {
                'RSI': self.RSI(symbol, 14),
                'MACD': self.MACD(symbol, 12, 26, 9, MovingAverageType.Exponential),
                'ATR': self.ATR(symbol, 14),
                'BB': self.BB(symbol, 20, 2, MovingAverageType.Simple),
                'SMA50': self.SMA(symbol, 50),
                'SMA200': self.SMA(symbol, 200),
            }

        self.market_indicators = {
            'SMA50': self.SMA(self.spy, 50),
            'SMA200': self.SMA(self.spy, 200),
            'RSI': self.RSI(self.spy, 14),
        }

        self.defensive_indicators = {}
        for name, symbol in self.defensive_symbols.items():
            self.defensive_indicators[name] = {
                'SMA20': self.SMA(symbol, 20),
                'SMA50': self.SMA(symbol, 50),
            }

        self.lookback = 252
        self.n_clusters = 3
        self.kmeans = None
        self.cluster_performance = {}  # cluster label -> list of member total returns
        self.defensive_performance = {}  # 'GLD'/'SHY' -> {'sharpe': value}
        self.trained = False

        # NOTE(review): 250 calendar days is fewer than 200 daily bars, so the
        # 200-day SMAs may not be ready when warm-up ends -- confirm intended.
        self.SetWarmUp(timedelta(days=250))

        # Monthly rebalance, 30 minutes after the SPY open.
        self.Schedule.On(self.DateRules.MonthStart(),
                         self.TimeRules.AfterMarketOpen('SPY', 30),
                         self.Rebalance)

        # Save metrics weekly so they persist even if OnEndOfAlgorithm doesn't fire.
        self.Schedule.On(self.DateRules.WeekStart(),
                         self.TimeRules.At(9, 0),
                         self.SaveMetrics)

    def OnWarmupFinished(self):
        """Fit the clustering model as soon as warm-up completes."""
        self.TrainModel()

    def CalculateIndicatorFeatures(self, ticker, history_data):
        """Build the seven-element feature vector for one stock from price history.

        Args:
            ticker: Ticker the history belongs to (used only in log messages).
            history_data: DataFrame with 'close', 'high' and 'low' columns.

        Returns:
            ``[rsi, macd_hist, atr, bb_width, bb_position, golden_cross,
            ma_spread]`` where the first five are 20-bar averages, or ``None``
            when fewer than 250 rows are available or the computation fails.
        """
        try:
            close_prices = history_data['close']
            high_prices = history_data['high']
            low_prices = history_data['low']
            if len(close_prices) < 250:
                return None

            # RSI from 14-bar simple averages of gains and losses.
            delta = close_prices.diff()
            gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
            loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
            loss = loss.replace(0, 0.0001)  # avoid division by zero
            rs = gain / loss
            rsi = 100 - (100 / (1 + rs))

            # MACD histogram (12/26/9 EMAs).
            exp1 = close_prices.ewm(span=12, adjust=False).mean()
            exp2 = close_prices.ewm(span=26, adjust=False).mean()
            macd_line = exp1 - exp2
            signal_line = macd_line.ewm(span=9, adjust=False).mean()
            macd_histogram = macd_line - signal_line

            # ATR as the 14-bar mean of the true range.
            previous_close = close_prices.shift()
            high_low = high_prices - low_prices
            high_close = np.abs(high_prices - previous_close)
            low_close = np.abs(low_prices - previous_close)
            ranges = pd.concat([high_low, high_close, low_close], axis=1)
            true_range = ranges.max(axis=1)
            atr = true_range.rolling(14).mean()

            # Bollinger Bands (20 bars, 2 standard deviations).
            sma_20 = close_prices.rolling(20).mean()
            std_20 = close_prices.rolling(20).std()
            bb_upper = sma_20 + (std_20 * 2)
            bb_lower = sma_20 - (std_20 * 2)
            bb_width = (bb_upper - bb_lower) / sma_20.replace(0, 1)
            bb_range = (bb_upper - bb_lower).replace(0, 1)
            bb_position = (close_prices - bb_lower) / bb_range

            sma_50 = close_prices.rolling(50).mean()
            sma_200 = close_prices.rolling(200).mean()
            last_sma_50 = sma_50.iloc[-1]
            last_sma_200 = sma_200.iloc[-1]
            mean_close_20 = close_prices.iloc[-20:].mean()

            return [
                rsi.iloc[-20:].mean() / 100,
                macd_histogram.iloc[-20:].mean() / mean_close_20,
                atr.iloc[-20:].mean() / mean_close_20,
                bb_width.iloc[-20:].mean(),
                bb_position.iloc[-20:].mean(),
                1 if last_sma_50 > last_sma_200 else 0,
                (last_sma_50 - last_sma_200) / last_sma_200 if last_sma_200 > 0 else 0,
            ]
        except Exception as e:
            # Best effort: a stock whose features cannot be built is skipped.
            self.Debug(f"Feature calculation failed for {ticker}: {str(e)}")
            return None

    def TrainDefensiveAssets(self, history):
        """Score GLD and SHY by mean/std of daily returns on SPY-below-SMA200 days.

        Args:
            history: Multi-symbol history DataFrame indexed by symbol first.

        Results are stored in ``defensive_performance``; on any failure the
        previous contents are left untouched.
        """
        try:
            spy_close = history.loc[self.spy]['close']
            market_down_mask = spy_close < spy_close.rolling(200).mean()

            scores = {}
            for name, symbol in self.defensive_symbols.items():
                returns = history.loc[symbol]['close'].pct_change()
                # Align the SPY mask to this asset's dates; boolean indexing
                # with a differently-indexed mask would raise.
                mask = market_down_mask.reindex(returns.index, fill_value=False)
                down_returns = returns[mask]
                volatility = down_returns.std()
                scores[name] = {
                    'sharpe': down_returns.mean() / volatility if volatility > 0 else 0
                }
            # Publish all scores together so a partial failure stores nothing.
            self.defensive_performance.update(scores)
        except Exception as e:
            self.Debug(f"Defensive asset training failed: {str(e)}")

    def TrainModel(self):
        """Fit K-Means on historical features and score each cluster.

        Does nothing if the model is already trained. On success sets
        ``kmeans``, ``cluster_performance`` and ``trained``; on failure leaves
        ``trained`` False so a later call can retry.
        """
        if self.trained:
            return

        try:
            all_symbols = (list(self.equity_symbols.values())
                           + [self.spy]
                           + list(self.defensive_symbols.values()))
            history = self.History(all_symbols, 2520, Resolution.Daily)
        except Exception as e:
            self.Debug(f"History request failed: {str(e)}")
            return
        if history.empty:
            return

        self.TrainDefensiveAssets(history)

        features_list = []
        valid_symbols = []
        available = history.index.get_level_values(0)
        for ticker in self.symbols:
            symbol = self.equity_symbols[ticker]
            try:
                if symbol not in available:
                    continue
                stock_data = history.loc[symbol]
                if len(stock_data) < 250:
                    continue
                features = self.CalculateIndicatorFeatures(ticker, stock_data)
                if features is None or any(np.isnan(features)) or any(np.isinf(features)):
                    continue
                features_list.append(features)
                valid_symbols.append(ticker)
            except Exception as e:
                self.Debug(f"Skipping {ticker} during training: {str(e)}")
                continue

        # Clustering needs at least two samples.
        if len(features_list) < 2:
            return

        X = np.array(features_list)
        n_clusters_actual = min(self.n_clusters, len(valid_symbols))
        self.kmeans = KMeans(n_clusters=n_clusters_actual, random_state=42, n_init=10)
        cluster_labels = self.kmeans.fit_predict(X)

        # Score each cluster by its members' total return over the history.
        performance = {}
        for ticker, cluster in zip(valid_symbols, cluster_labels):
            symbol = self.equity_symbols[ticker]
            try:
                closes = history.loc[symbol]['close']
                total_return = (closes.iloc[-1] / closes.iloc[0]) - 1
                performance.setdefault(cluster, []).append(total_return)
            except Exception as e:
                self.Debug(f"Could not score {ticker}: {str(e)}")
                continue
        self.cluster_performance = performance
        self.trained = True

    def IsMarketInDowntrend(self):
        """Return True when SPY is below its SMA200 or SPY's SMA50 is below its SMA200.

        Returns False while either moving average is not ready.
        """
        sma50_indicator = self.market_indicators['SMA50']
        sma200_indicator = self.market_indicators['SMA200']
        if not (sma50_indicator.IsReady and sma200_indicator.IsReady):
            return False
        spy_price = self.Securities[self.spy].Price
        sma50 = sma50_indicator.Current.Value
        sma200 = sma200_indicator.Current.Value
        return spy_price < sma200 or sma50 < sma200

    def _defensive_momentum(self, name):
        """Return +1/-1 for SMA20 above/not above SMA50, or 0 if either is not ready."""
        fast = self.defensive_indicators[name]['SMA20']
        slow = self.defensive_indicators[name]['SMA50']
        if not (fast.IsReady and slow.IsReady):
            return 0
        return 1 if fast.Current.Value > slow.Current.Value else -1

    def SelectDefensiveAsset(self):
        """Choose 'GLD' or 'SHY' by down-market score plus half the momentum sign.

        Returns 'SHY' when no defensive scores have been trained, and on ties.
        """
        if len(self.defensive_performance) == 0:
            return 'SHY'
        gld_sharpe = self.defensive_performance.get('GLD', {}).get('sharpe', 0)
        shy_sharpe = self.defensive_performance.get('SHY', {}).get('sharpe', 0)
        gld_score = gld_sharpe + (self._defensive_momentum('GLD') * 0.5)
        shy_score = shy_sharpe + (self._defensive_momentum('SHY') * 0.5)
        return 'GLD' if gld_score > shy_score else 'SHY'

    def CheckGoldenCross(self, ticker):
        """Return True when the stock's SMA50 is above its SMA200 (False if not ready)."""
        sma50 = self.indicators[ticker]['SMA50']
        sma200 = self.indicators[ticker]['SMA200']
        if not sma50.IsReady or not sma200.IsReady:
            return False
        return sma50.Current.Value > sma200.Current.Value

    def GetCurrentFeatures(self, ticker):
        """Build the live seven-element feature vector from the stock's indicators.

        Returns:
            ``[rsi, macd_hist, atr, bb_width, bb_position, golden_cross,
            ma_spread]``, or ``None`` when any indicator is not ready, a value
            is NaN/infinite, or reading the indicators fails.
        """
        ind = self.indicators[ticker]
        required = ('RSI', 'MACD', 'ATR', 'BB', 'SMA50', 'SMA200')
        if not all(ind[key].IsReady for key in required):
            return None
        try:
            current_price = self.Securities[self.equity_symbols[ticker]].Price
            has_price = current_price > 0

            rsi = ind['RSI'].Current.Value / 100
            # MACD histogram and ATR are expressed relative to price.
            macd_histogram = ind['MACD'].Histogram.Current.Value / current_price if has_price else 0
            atr = ind['ATR'].Current.Value / current_price if has_price else 0

            bb = ind['BB']
            upper = bb.UpperBand.Current.Value
            lower = bb.LowerBand.Current.Value
            middle = bb.MiddleBand.Current.Value
            bb_range = upper - lower
            bb_width = bb_range / middle if middle > 0 else 0
            bb_position = (current_price - lower) / bb_range if bb_range > 0 else 0.5

            golden_cross = 1 if self.CheckGoldenCross(ticker) else 0
            sma50 = ind['SMA50'].Current.Value
            sma200 = ind['SMA200'].Current.Value
            ma_spread = (sma50 - sma200) / sma200 if sma200 > 0 else 0

            features = [rsi, macd_histogram, atr, bb_width, bb_position, golden_cross, ma_spread]
            if any(np.isnan(features)) or any(np.isinf(features)):
                return None
            return features
        except Exception as e:
            self.Debug(f"Live feature calculation failed for {ticker}: {str(e)}")
            return None

    def _record_closed_trades(self, liquidate):
        """Log the outcome of each still-invested position from the last rebalance.

        Args:
            liquidate: When True, also liquidate each invested position.
        """
        # NOTE(review): only invested tickers are logged, so entries stored
        # with prediction 0 never reach trade_log -- confirm this is intended.
        for ticker, entry_data in self.last_rebalance_positions.items():
            symbol = self.equity_symbols[ticker]
            if not self.Portfolio[symbol].Invested:
                continue
            entry_price = entry_data['entry_price']
            # A zero entry price has no defined return; skip the log entry
            # instead of raising ZeroDivisionError.
            if entry_price > 0:
                exit_price = self.Securities[symbol].Price
                profit = (exit_price - entry_price) / entry_price
                self.trade_log.append({
                    'symbol': ticker,
                    'prediction': entry_data['prediction'],
                    'probability': entry_data['probability'],
                    'actual': 1 if profit > 0 else 0,
                    'profit': profit,
                })
            if liquidate:
                self.Liquidate(symbol)

    def Rebalance(self):
        """Close last month's positions, pick this month's stocks, and trade.

        Stocks passing the golden-cross filter are assigned to a cluster;
        those in the best-scoring cluster are bought at equal weight. If none
        fall in that cluster, every filtered stock is bought. With no
        candidates at all, the portfolio holds a defensive asset when the
        market is in a downtrend and is fully liquidated otherwise.
        """
        if self.IsWarmingUp:
            return
        # Retry training so one failed attempt at warm-up end does not disable
        # trading for the rest of the run.
        if not self.trained:
            self.TrainModel()
        if not self.trained or self.kmeans is None:
            return

        # Close previous month's positions and record outcomes.
        self._record_closed_trades(liquidate=True)
        self.last_rebalance_positions = {}

        market_down = self.IsMarketInDowntrend()

        current_features = []
        valid_symbols = []
        for ticker in self.symbols:
            if not self.CheckGoldenCross(ticker):
                continue
            features = self.GetCurrentFeatures(ticker)
            if features is None:
                continue
            current_features.append(features)
            valid_symbols.append(ticker)

        selected_stocks = []
        if current_features:
            X_current = np.array(current_features)
            cluster_predictions = self.kmeans.predict(X_current)
            if self.cluster_performance:
                best_cluster = max(self.cluster_performance,
                                   key=lambda c: np.mean(self.cluster_performance[c]))
            else:
                best_cluster = 0
            distances = self.kmeans.transform(X_current)

            for i, ticker in enumerate(valid_symbols):
                cluster = cluster_predictions[i]
                in_best_cluster = cluster == best_cluster
                if in_best_cluster:
                    selected_stocks.append(ticker)
                # Store for the next rebalance. The pseudo-probability shrinks
                # with distance to the assigned centroid; float() keeps it
                # JSON-serializable.
                self.last_rebalance_positions[ticker] = {
                    'prediction': 1 if in_best_cluster else 0,
                    'probability': float(1.0 / (1.0 + distances[i][cluster])),
                    'entry_price': self.Securities[self.equity_symbols[ticker]].Price,
                }

            if not selected_stocks:
                # Nothing in the best cluster: buy every filtered stock and
                # record all of them as positive predictions.
                selected_stocks = list(valid_symbols)
                for entry in self.last_rebalance_positions.values():
                    entry['prediction'] = 1

        # Execute trades.
        if market_down and not selected_stocks:
            # Match holdings through the stored Symbol objects; a Symbol's
            # ticker text may not equal the requested ticker (e.g. renames).
            for symbol in self.equity_symbols.values():
                if self.Portfolio[symbol].Invested:
                    self.Liquidate(symbol)
            defensive_asset = self.SelectDefensiveAsset()
            # Exit the other defensive asset first so the 100% target below
            # does not over-allocate the portfolio.
            for name, symbol in self.defensive_symbols.items():
                if name != defensive_asset and self.Portfolio[symbol].Invested:
                    self.Liquidate(symbol)
            self.SetHoldings(self.defensive_symbols[defensive_asset], 1.0)
        elif selected_stocks:
            for symbol in self.defensive_symbols.values():
                if self.Portfolio[symbol].Invested:
                    self.Liquidate(symbol)
            weight = 1.0 / len(selected_stocks)
            for ticker in selected_stocks:
                self.SetHoldings(self.equity_symbols[ticker], weight)
        else:
            self.Liquidate()

    def _compute_metrics(self):
        """Return the classification-metrics dict for the current ``trade_log``.

        Callers must ensure ``trade_log`` is not empty.
        """
        y_true = [t['actual'] for t in self.trade_log]
        y_pred = [t['prediction'] for t in self.trade_log]
        y_prob = [t['probability'] for t in self.trade_log]

        accuracy = accuracy_score(y_true, y_pred)
        precision = precision_score(y_true, y_pred, zero_division=0)
        recall = recall_score(y_true, y_pred, zero_division=0)
        f1 = f1_score(y_true, y_pred, zero_division=0)
        try:
            roc_auc = roc_auc_score(y_true, y_prob)
        except ValueError:
            # Undefined when y_true contains a single class.
            roc_auc = 0.5
        if np.isnan(roc_auc):
            roc_auc = 0.5  # NaN would also produce invalid JSON

        # Fixing the label set keeps the matrix 2x2 even when only one class
        # occurs, so the four counts are always meaningful.
        tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()

        return {
            'model': 'K-Means',
            'n': len(self.trade_log),
            'accuracy': float(accuracy),
            'precision': float(precision),
            'recall': float(recall),
            'f1': float(f1),
            'roc_auc': float(roc_auc),
            'tp': int(tp), 'fp': int(fp), 'fn': int(fn), 'tn': int(tn),
            'trades': self.trade_log,
        }

    def SaveMetrics(self):
        """Save metrics periodically to ensure they're captured."""
        if len(self.trade_log) == 0:
            return
        try:
            metrics_data = self._compute_metrics()
            self.ObjectStore.Save("clustering_metrics.json", json.dumps(metrics_data))
        except Exception as e:
            self.Debug(f"Error saving metrics: {str(e)}")

    def OnEndOfAlgorithm(self):
        """Record still-open positions, then log and save the final metrics."""
        # Record any remaining positions without liquidating them.
        self._record_closed_trades(liquidate=False)

        if len(self.trade_log) == 0:
            self.Debug("No trades recorded")
            return

        try:
            metrics_data = self._compute_metrics()
        except Exception as e:
            self.Debug(f"Error computing metrics: {str(e)}")
            return

        # LOG METRICS
        self.Debug("="*50)
        self.Debug("K-MEANS ML METRICS")
        self.Debug(f"N={metrics_data['n']}")
        self.Debug(f"Acc={metrics_data['accuracy']:.4f} "
                   f"Prec={metrics_data['precision']:.4f} "
                   f"Rec={metrics_data['recall']:.4f}")
        self.Debug(f"F1={metrics_data['f1']:.4f} AUC={metrics_data['roc_auc']:.4f}")
        self.Debug(f"TP={metrics_data['tp']} FP={metrics_data['fp']} "
                   f"FN={metrics_data['fn']} TN={metrics_data['tn']}")
        self.Debug("="*50)

        # SAVE TO OBJECTSTORE
        try:
            self.ObjectStore.Save("clustering_metrics.json", json.dumps(metrics_data))
            self.Debug("Metrics saved to ObjectStore")
        except Exception as e:
            self.Debug(f"Error saving: {str(e)}")

    def OnData(self, data):
        """No per-bar logic; all trading happens in the scheduled Rebalance."""
        pass