| Overall Statistics |
|
Total Orders 360 Average Win 0.14% Average Loss -0.18% Compounding Annual Return 8.382% Drawdown 14.300% Expectancy 0.095 Start Equity 1000000 End Equity 1062517.93 Net Profit 6.252% Sharpe Ratio 0.085 Sortino Ratio 0.099 Probabilistic Sharpe Ratio 30.255% Loss Rate 39% Win Rate 61% Profit-Loss Ratio 0.79 Alpha -0.026 Beta 0.576 Annual Standard Deviation 0.125 Annual Variance 0.016 Information Ratio -0.512 Tracking Error 0.103 Treynor Ratio 0.018 Total Fees $1094.34 Estimated Strategy Capacity $22000000.00 Lowest Capacity Asset VICI WT7NDQSMCSBP Portfolio Turnover 6.92% Drawdown Recovery 115 |
import numpy as np
from scipy.cluster import hierarchy
import pandas as pd
from AlgorithmImports import *
class AssetWeightCalculator:
def __init__(self, algorithm: QCAlgorithm):
self.algorithm = algorithm
self.risk_free = self.algorithm.add_equity("BIL", Resolution.DAILY)
def coarse_selection(self, coarse):
"""
Available CoarseFundamental properties:
- symbol: Symbol object
- price: Current price
- volume: Daily volume
- dollar_volume: Daily dollar volume (price * volume)
- has_fundamental_data: Boolean indicating if fundamental data exists
- market_cap: Market cap (but only updated monthly)
- adjustment_factor: Stock split adjustment factor
"""
# First basic filtering
filtered = [x for x in coarse if (
x.price > 10 and # Price filter to avoid penny stocks
x.volume > 500000 and # Minimum daily volume for liquidity
x.has_fundamental_data and # Must have fundamental data
x.dollar_volume > 5000000 # Minimum $5M daily dollar volume
)]
# Sort by dollar volume (most liquid first)
sorted_by_volume = sorted(filtered,
key=lambda x: x.dollar_volume,
reverse=True)
# Take top 200 most liquid stocks
top_liquid = sorted_by_volume[:500]
# Loggin some statistics
if top_liquid:
self.algorithm.log("\nCoarse Selection Statistics:")
self.algorithm.log(f"Avg Price: ${np.mean([x.price for x in top_liquid]):.2f}")
self.algorithm.log(f"Avg Volume: {np.mean([x.volume for x in top_liquid]):,.0f}")
self.algorithm.log(f"Avg Dollar Volume: ${np.mean([x.dollar_volume for x in top_liquid]):,.2f}")
# Loggin top 5 most liquid stocks
self.algorithm.log("\nTop 5 Most Liquid Stocks:")
for stock in top_liquid[:5]:
self.algorithm.log(f"{stock.symbol}: ${stock.dollar_volume:,.2f} daily volume")
return [x.symbol for x in top_liquid]
def fine_selection(self, fine):
"""
Comprehensive long-term stock selection
"""
market_cap_filtered = [x for x in fine if x.market_cap is not None and x.market_cap > 10e9]
# Examine the fundamental data of the first few companies
for i, company in enumerate(market_cap_filtered[:5]):
pass
qualified_companies = []
for company in market_cap_filtered:
try:
financial_score = 0
growth_score = 0
quality_score = 0
value_score = 0
# Financial Strength
if (company.financial_statements is not None and
company.financial_statements.balance_sheet is not None):
current_assets = company.financial_statements.balance_sheet.current_assets.value
current_liabilities = company.financial_statements.balance_sheet.current_liabilities.value
total_debt = company.financial_statements.balance_sheet.total_debt.value
total_equity = company.financial_statements.balance_sheet.total_equity.value
current_ratio = current_assets / current_liabilities if current_liabilities != 0 else 0
# Modified debt-equity handling
if total_equity <= 0:
# Negative equity is a red flag
financial_score -= 1 # Penalty for negative equity
else:
debt_equity = total_debt / total_equity
if debt_equity < 1.5:
financial_score += 1
if current_ratio > 1.5:
financial_score += 1
if hasattr(company, 'valuation_ratios'):
# Debug valuation ratios for first few companies
if len(qualified_companies) < 5:
self.algorithm.debug(f"\nGrowth Metrics for {company.symbol.value}:")
self.algorithm.debug(f"First Year Est. EPS Growth: {getattr(company.valuation_ratios, 'first_year_estimated_eps_growth', 'Not Available')}")
self.algorithm.debug(f"Sustainable Growth Rate: {getattr(company.valuation_ratios, 'sustainable_growth_rate', 'Not Available')}")
# Growth checks with null handling
eps_growth = getattr(company.valuation_ratios, 'first_year_estimated_eps_growth', None)
sustainable_growth = getattr(company.valuation_ratios, 'sustainable_growth_rate', None)
if eps_growth is not None and not np.isnan(eps_growth) and eps_growth > 0.10:
growth_score += 1
if len(qualified_companies) < 5:
self.algorithm.debug(f"Added growth score for EPS growth: {eps_growth:.2%}")
if sustainable_growth is not None and not np.isnan(sustainable_growth) and sustainable_growth > 0.10:
growth_score += 1
if len(qualified_companies) < 5:
self.algorithm.debug(f"Added growth score for sustainable growth: {sustainable_growth:.2%}")
# Quality (Profitability and Efficiency)
if company.valuation_ratios.earning_yield > 0.06: # 6% earnings yield
quality_score += 1
if company.valuation_ratios.fcf_yield > 0.05: # 5% free cash flow yield
quality_score += 1
# Value
if company.valuation_ratios.pe_ratio is not None:
if (company.valuation_ratios.pe_ratio <
company.valuation_ratios.pe_ratio_5_year_average):
value_score += 1
if company.valuation_ratios.ev_to_ebitda < 12: # Common threshold
value_score += 1
total_score = (financial_score + growth_score + quality_score + value_score)
if total_score > 3:
qualified_companies.append((company, total_score))
except Exception as e:
self.algorithm.debug(f"Error processing company {company.symbol}: {str(e)}")
continue
self.algorithm.debug(f"Companies with scores > 3: {len(qualified_companies)}")
# Sort by total score
sorted_companies = sorted(qualified_companies,
key=lambda x: x[1],
reverse=True)
# Return symbols for top companies
filtered = [company.symbol for company, _ in sorted_companies]
return self.low_corr_assets(filtered)
def calculate_sharpe_ratio(self, symbol, period=756): # This is 3 yrs worth of trading days
"""
Calculates the sharpe
"""
try:
# If a KeyValuePair was recieved only take the symbol
if hasattr(symbol, "Key"):
symbol = symbol.Key
history = self.algorithm.history([symbol], period, Resolution.DAILY)
if history.empty:
self.algorithm.debug(f"No history for {symbol.value}")
return None
# Get risk-free rate
rf_history = self.algorithm.history(self.risk_free.symbol, 1, Resolution.DAILY)
risk_free_rate = rf_history['close'].iloc[-1]/100 if not rf_history.empty else 0.02 # Default to 2% if no data
# Sharpe ratio logic
returns = history['close'].pct_change().dropna()
excess_returns = returns - (risk_free_rate/252)
mean_excess_return = excess_returns.mean() * 252
std_dev = excess_returns.std() * np.sqrt(252)
return mean_excess_return / std_dev if std_dev != 0 else None
except Exception as e:
self.algorithm.debug(f"Error calculating Sharpe for {symbol.value}: {str(e)}")
return None
def low_corr_assets(self, symbols):
"""
Selects assets with low correlation using hierarchical clustering.
Returns a list of symbols sorted by their Sharpe ratios within clusters.
Parameters:
symbols: list of symbols to analyze
"""
try:
correlation_period = 252 * 3
all_returns = {}
# Fetch returns
for symbol in symbols:
history = self.algorithm.history([symbol], correlation_period, Resolution.DAILY)
if not history.empty:
close_prices = history.loc[symbol]['close']
returns = close_prices.pct_change().dropna()
if len(returns) > 0:
all_returns[symbol] = returns
if not all_returns:
return []
# Create DataFrame with proper alignment
returns_df = pd.DataFrame(all_returns)
# Remove any columns with all NaN values
returns_df = returns_df.dropna(axis=1, how='all')
# Fill any remaining NaN values with 0 or forward fill
returns_df = returns_df.fillna(method='ffill')
# Calculate correlation matrix
corr_matrix = returns_df.corr()
# Replace any remaining NaN values in correlation matrix with 0
corr_matrix = corr_matrix.fillna(0)
# Convert correlations to distances
distance_matrix = np.sqrt(2 * (1 - corr_matrix))
# Ensure distance matrix contains only finite values
if not np.all(np.isfinite(distance_matrix)):
self.algorithm.debug("Warning: Distance matrix contains non-finite values")
# Replace non-finite values with maximum finite value
max_finite = np.nanmax(distance_matrix[np.isfinite(distance_matrix)])
distance_matrix[~np.isfinite(distance_matrix)] = max_finite
# Convert to condensed form for linkage
condensed_dist = []
for i in range(len(distance_matrix)):
for j in range(i + 1, len(distance_matrix)):
dist = distance_matrix.iloc[i, j]
if np.isfinite(dist): # Only add finite distances
condensed_dist.append(dist)
else:
condensed_dist.append(0) # or some other appropriate value
# Verify we have valid data for clustering
if not condensed_dist:
self.algorithm.debug("No valid distances for clustering")
return list(symbols)[:30]
# Perform hierarchical clustering
linkage = hierarchy.linkage(condensed_dist, method='complete')
clusters = hierarchy.fcluster(linkage, t=0.5, criterion='distance')
# Select best assets from each cluster
selected_assets = []
cluster_ids = np.unique(clusters)
for cluster_id in cluster_ids:
cluster_mask = clusters == cluster_id
cluster_assets = returns_df.columns[cluster_mask]
# Calculate Sharpe ratios for this cluster
cluster_sharpes = {}
for asset in cluster_assets:
sharpe = self.calculate_sharpe_ratio(asset)
if sharpe is not None:
cluster_sharpes[asset] = sharpe
# Select asset with highest Sharpe from cluster
if cluster_sharpes:
best_asset = max(cluster_sharpes.items(), key=lambda x: x[1])[0]
selected_assets.append(best_asset)
# Take top 20 assets
final_assets = selected_assets[:30]
# Get correlation matrix for final selection
final_returns = returns_df[[symbol for symbol in final_assets]]
final_corr = final_returns.corr()
# Log correlation statistics
self.algorithm.debug("\nCorrelation Statistics for Final 30 Assets:")
# Average correlation
corr_values = final_corr.values[np.triu_indices_from(final_corr.values, k=1)]
avg_corr = np.mean(corr_values)
self.algorithm.debug(f"Average Correlation: {avg_corr:.3f}")
# Correlation range
min_corr = np.min(corr_values)
max_corr = np.max(corr_values)
self.algorithm.debug(f"Correlation Range: {min_corr:.3f} to {max_corr:.3f}")
# Most correlated pair
max_corr_idx = np.unravel_index(np.argmax(final_corr.values * (1 - np.eye(len(final_assets)))), final_corr.shape)
self.algorithm.debug(f"Most correlated pair: {final_assets[max_corr_idx[0]].value} - {final_assets[max_corr_idx[1]].value} ({final_corr.iloc[max_corr_idx]:.3f})")
# Least correlated pair
min_corr_idx = np.unravel_index(np.argmin(final_corr.values + np.eye(len(final_assets))), final_corr.shape)
self.algorithm.debug(f"Least correlated pair: {final_assets[min_corr_idx[0]].value} - {final_assets[min_corr_idx[1]].value} ({final_corr.iloc[min_corr_idx]:.3f})")
# Print full correlation matrix for final selection
self.algorithm.debug("\nFinal Correlation Matrix:")
for i in range(len(final_assets)):
row = [f"{final_corr.iloc[i,j]:.3f}" for j in range(len(final_assets))]
self.algorithm.debug(f"{final_assets[i].value}: {', '.join(row)}")
self.algorithm.debug(f"\nSelected {len(final_assets)} low-correlation assets")
return final_assets
except Exception as e:
self.algorithm.debug(f"Error in low_corr_assets: {str(e)}")
return list(symbols)[:30]
def normalize_scores(self, scores):
"""
The list of scores from the ranking method are
normalized using a z score so that an additive
operation may be used in WeightCombiner()
"""
values = np.array(list(scores.values()))
mean = np.mean(values)
std_dev = np.std(values)
if std_dev == 0:
# If no variation in scores, assign equal normalized scores
return {symbol: 0 for symbol in scores.keys()}
normalized_scores = {symbol: (score - mean) / std_dev for symbol, score in scores.items()}
return normalized_scores
from AlgorithmImports import *
class MACDSignalGenerator:
def __init__(self, algorithm: QCAlgorithm, symbols: list, cash_buffer: float = 0.05):
self.algorithm = algorithm
self.symbols = set(symbols)
self.cash_buffer = cash_buffer
self.macd_indicators = {} # {symbol: {variant: MACD}}
# Define MACD parameters for different variants
self.macd_variants = {
"slow": {"fast": 12, "slow": 26, "signal": 9},
"slow-med": {"fast": 9, "slow": 19, "signal": 5},
"med-fast": {"fast": 7, "slow": 15, "signal": 3},
"fast": {"fast": 5, "slow": 12, "signal": 2},
}
def remove_symbols(self, symbols: list):
"""
Removes MACD indicators for the specified symbols.
"""
for symbol in symbols:
if symbol in self.macd_indicators:
for variant, macd in self.macd_indicators[symbol].items():
self.algorithm.unregister_indicator(macd)
self.algorithm.debug(f"Unregistering {symbol.value} {variant} MACD indicator")
del self.macd_indicators[symbol]
# Remove from symbols set
self.symbols.discard(symbol)
# Liquidate position
if self.algorithm.portfolio.contains_key(symbol):
self.algorithm.liquidate(symbol)
def add_symbols(self, new_symbols):
"""
Add in the new symbols that are given by AssetWeightCalculator.
"""
# Convert to set for efficient operations
new_symbols = set(new_symbols)
# Only process truly new symbols
actually_new = new_symbols - self.symbols
if not actually_new:
return
# Get historical data for new symbols
history = self.algorithm.history([s for s in new_symbols],
35, # Longest MACD period needed
Resolution.HOUR)
for symbol in actually_new:
# Check if security has data
if not self.algorithm.securities[symbol].has_data:
self.algorithm.debug(f"Waiting for data: {symbol.value}")
continue
self.macd_indicators[symbol] = {}
# Check if no history
if symbol not in history.index.get_level_values(0):
self.algorithm.log(f"No History for adding")
continue
symbol_history = history.loc[symbol]
for variant, params in self.macd_variants.items():
macd = self.algorithm.macd(
symbol=symbol,
fast_period=params["fast"],
slow_period=params["slow"],
signal_period=params["signal"],
type=MovingAverageType.EXPONENTIAL,
resolution=Resolution.HOUR,
selector=Field.CLOSE
)
# Warm up MACD with historical data
for time, row in symbol_history.iterrows():
macd.update(time, row['close'])
self.macd_indicators[symbol][variant] = macd
self.algorithm.log(f"Adding macd: {symbol} and {variant}")
# Only add symbol after proper setup
self.symbols.add(symbol)
def calculate_position_sizes(self):
position_sizes = {}
max_position_limit = 0.1
# Check if we have any symbols to process
if not self.symbols or not self.macd_indicators:
self.algorithm.debug("No symbols available for position calculation")
return position_sizes
# Calculate base position size
max_position = (1 - self.cash_buffer) / (len(self.symbols) * len(self.macd_variants))
total_portfolio_allocation = 0 # Track total allocation
for symbol in self.macd_indicators:
position_sizes[symbol] = {}
symbol_total = 0 # Track total for this symbol
for variant, macd in self.macd_indicators[symbol].items():
if macd.is_ready:
security = self.algorithm.securities[symbol]
if not security.has_data or not security.is_tradable:
self.algorithm.debug(f"Security not ready: {symbol.value}")
continue
# Distance between fast and slow
distance = macd.fast.current.value - macd.slow.current.value
# Calculate initial position size
position_size = max_position * (distance / macd.slow.current.value) * 350 # Your scalar
# Ensure non-negative and within variant limit
position_size = max(0, min(position_size, max_position))
position_sizes[symbol][variant] = position_size
symbol_total += position_size
else:
position_sizes[symbol][variant] = 0
# If symbol total exceeds max limit, scale down proportionally
if symbol_total > max_position_limit:
scale_factor = max_position_limit / symbol_total
for variant in position_sizes[symbol]:
position_sizes[symbol][variant] *= scale_factor
symbol_total = max_position_limit
total_portfolio_allocation += symbol_total
# If total allocation exceeds 100%, scale everything down proportionally
if total_portfolio_allocation > 1:
scale_factor = 1 / total_portfolio_allocation
for symbol in position_sizes:
for variant in position_sizes[symbol]:
position_sizes[symbol][variant] *= scale_factor
# Log position sizes for verification
for symbol in position_sizes:
total_size = sum(position_sizes[symbol].values())
if total_size > max_position_limit:
self.algorithm.debug(f"WARNING: {symbol.value} position size {total_size:.3f} exceeds limit")
return position_sizesfrom AlgorithmImports import *
from ContinuousMACDSignalGenerator import MACDSignalGenerator
from AssetWeightCalculator import AssetWeightCalculator
class TestMACDInitializationAlgorithm(QCAlgorithm):
def Initialize(self):
"""
Things to add:
1. Warmup period for asset selector
2. Initialization with warmed up assets
"""
self.set_start_date(2012, 1, 1)
self.set_cash(1000000)
self.set_benchmark("SPY")
# Set maximum leverage to 1 (no leverage)
self.universe_settings.leverage = 1
self.bond_etf = self.add_equity("BIL", Resolution.HOUR)
self.spy = self.add_equity("SPY", Resolution.HOUR)
# Initialize tracking set for universe changes
self.current_symbols = set()
# Initialize the asset weight calculator
self.asset_calculator = AssetWeightCalculator(self)
# Add universe for coarse and fine selection
self.spy = self.add_equity("SPY", Resolution.HOUR)
self.add_universe(self.asset_calculator.coarse_selection, self.asset_calculator.fine_selection)
# Universe settings
self.universe_settings.Resolution = Resolution.HOUR
# Initialize MACD generator
self.macd_generator = MACDSignalGenerator(self, [])
# Scheduled ranking update
self.schedule.on(self.date_rules.week_start("SPY"),
self.time_rules.after_market_open("SPY", 1),
self.rank_and_update_symbols
)
# Schedule Monday 10:36 rebalancing
self.schedule.on(
self.date_rules.week_start("SPY"),
self.time_rules.after_market_open("SPY", 65),
self.rebalance_positions
)
def rank_and_update_symbols(self):
# Skip during warmup
if self.is_warming_up:
self.debug("Skipping rank_and_update during warmup")
return
# Log current state
owned_symbols = [symbol for symbol in self.portfolio.keys() if self.portfolio[symbol].quantity > 0]
self.debug(f"Currently owned symbols: {[s.value for s in owned_symbols]}")
# Get new universe, excluding tracking symbols and limiting to 20
excluded_symbols = {"BIL", "SPY"}
new_symbols = set(s.key for s in self.active_securities
if s.key.value not in excluded_symbols)
new_symbols = set(list(new_symbols)[:20])
self.debug(f"New universe symbols: {[s.value for s in new_symbols]}")
# Handle removals first
removed_symbols = self.current_symbols - new_symbols
if removed_symbols:
self.debug(f"Removing symbols: {[x.value for x in removed_symbols]}")
self.macd_generator.remove_symbols(list(removed_symbols))
# Double-check liquidation
for symbol in removed_symbols:
if self.portfolio.contains_key(symbol) and self.portfolio[symbol].invested:
self.liquidate(symbol)
# Then handle additions
added_symbols = new_symbols - self.current_symbols
if added_symbols:
self.debug(f"Adding symbols: {[x.value for x in added_symbols]}")
self.macd_generator.add_symbols(list(added_symbols))
# Update tracking set
self.current_symbols = new_symbols
# Verify final state
self.debug(f"Final universe size: {len(self.current_symbols)}")
self.debug(f"MACD symbols count: {len(self.macd_generator.symbols)}")
# Verify alignment
if self.current_symbols != self.macd_generator.symbols:
self.debug("WARNING: Universe and MACD symbols misaligned!")
self.debug(f"Universe only: {[s.value for s in self.current_symbols - self.macd_generator.symbols]}")
self.debug(f"MACD only: {[s.value for s in self.macd_generator.symbols - self.current_symbols]}")
def rebalance_positions(self):
"""Actual position rebalancing 64 mins after re-ranking"""
if self.is_warming_up:
return
# Get current positions that shouldn't be there
invalid_positions = [symbol for symbol in self.portfolio.keys()
if symbol not in self.macd_generator.symbols
and self.portfolio[symbol].invested
and symbol.Value not in ["BIL", "SPY"]] # This could end up being inefficient where liquidation occurs when loading up on size
# Liquidate invalid positions
for symbol in invalid_positions:
self.debug(f"Liquidating invalid position: {symbol.value}")
self.liquidate(symbol)
# Calculate new positions
position_sizes = self.macd_generator.calculate_position_sizes()
# Verify we're only trading valid symbols
for symbol in list(position_sizes.keys()):
if symbol not in self.macd_generator.symbols:
self.debug(f"WARNING: Position calculated for non-MACD symbol: {symbol.value}")
del position_sizes[symbol]
# Build target weights for all assets
target_weights = {}
total_equity_weight = 0
for symbol, variants in position_sizes.items():
security = self.securities[symbol]
if not security.has_data or not security.is_tradable:
continue
total_size = sum(variants.values())
if abs(total_size) > 0.001:
target_weights[symbol] = total_size
total_equity_weight += total_size
# Set bond weight as the remainder, but never negative
bond_weight = max(0, 1 - total_equity_weight)
target_weights[self.bond_etf.symbol] = bond_weight
# Normalize weights if total > 1.0
total_weight = sum(target_weights.values())
if total_weight > 1.0:
self.debug(f"Total target weight {total_weight} > 1.0, normalizing weights.")
for symbol in target_weights:
target_weights[symbol] /= total_weight
# Set all holdings in one pass
for symbol, weight in target_weights.items():
self.set_holdings(symbol, weight)
# Log the current portfolio
invested_value = self.portfolio.total_portfolio_value
self.debug(f"Portfolio value: {invested_value}")
self.debug(f"Bond weight: {bond_weight}")
# Final verification
actual_positions = [s for s in self.portfolio.keys()
if self.portfolio[s].invested
and s.value not in ["BIL", "SPY"]]
self.debug(f"Positions after rebalance: {len(actual_positions)}")
self.debug(f"Position symbols: {[s.value for s in actual_positions]}")
self.debug(f"Bond position set to {self.portfolio[self.bond_etf.symbol].quantity} shares of {self.bond_etf.symbol.value}")
self.debug(f"Cash remaining: {self.portfolio.cash}")
self.debug(f"Amount in equities: {self.portfolio.total_portfolio_value - self.portfolio.cash - self.portfolio[self.bond_etf.symbol].quantity * self.bond_etf.price}")