Overall Statistics
Total Orders
0
Average Win
0%
Average Loss
0%
Compounding Annual Return
0%
Drawdown
0%
Expectancy
0
Start Equity
100000
End Equity
100000
Net Profit
0%
Sharpe Ratio
0
Sortino Ratio
0
Probabilistic Sharpe Ratio
0%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0
Beta
0
Annual Standard Deviation
0
Annual Variance
0
Information Ratio
-0.675
Tracking Error
0.162
Treynor Ratio
0
Total Fees
$0.00
Estimated Strategy Capacity
$0
Lowest Capacity Asset
Portfolio Turnover
0%
Drawdown Recovery
0
# region imports
from AlgorithmImports import *
from datetime import timedelta
# endregion

class ETFConstituentUniverseAlgorithm(QCAlgorithm):
    """Follow the SPY ETF constituent universe and log its holdings.

    The algorithm places no orders of its own. It subscribes to the SPY
    constituents, keeps the most recent holdings snapshot in
    ``latest_constituents``, logs the largest positions, liquidates anything
    that leaves the universe, and once per calendar month logs a summary of
    the historical constituent data.
    """

    def initialize(self) -> None:
        """Configure dates, cash, the SPY constituent universe and the daily log."""
        self.set_start_date(2018, 1, 1)
        self.set_end_date(2025, 11, 30)
        self.set_cash(100_000)

        # Important for large universes
        self.universe_settings.asynchronous = True

        # Create the state the callbacks read before anything can invoke them.
        self.latest_constituents = []
        # (year, month) of the last historical summary; None until the first run.
        self._last_history_month = None

        # Keep the Universe object: constituent history is requested from the
        # universe itself, not from the SPY equity symbol.
        self.spy_universe = self.AddUniverse(
            self.Universe.ETF("SPY", Market.USA, self.UniverseSettings, self.etf_constituents_filter)
        )

        # Optional: schedule a daily log of current holdings
        self.Schedule.On(self.DateRules.EveryDay(),
                         self.TimeRules.AfterMarketOpen("SPY", 30),
                         self.log_current_constituents)

    @staticmethod
    def _weight_or_zero(constituent) -> float:
        """Return the constituent's weight, treating an unset weight as 0."""
        weight = constituent.Weight
        return 0.0 if weight is None else float(weight)

    @staticmethod
    def _format_number(value, spec: str) -> str:
        """Format ``value`` with ``spec``, or return "n/a" when it is unset."""
        return "n/a" if value is None else format(value, spec)

    def etf_constituents_filter(self, constituents: List[ETFConstituentData]) -> List[Symbol]:
        """Universe filter: remember the latest holdings and select all of them.

        Args:
            constituents: The SPY holdings supplied by the universe.

        Returns:
            The symbol of every constituent, so the whole ETF is subscribed.
        """
        # Materialise once: the engine hands over an enumerable that is not
        # guaranteed to support slicing or repeated iteration. Rebinding a
        # fresh list also gives readers a consistent snapshot.
        holdings = list(constituents)
        self.latest_constituents = holdings

        # NOTE(review): with asynchronous selection enabled this filter may run
        # ahead of the algorithm clock, so self.Time might not be the data
        # date - confirm against the universe settings documentation.
        today = self.Time.date()
        fmt = self._format_number

        # Log only the 10 largest holdings to avoid spam
        for c in self.get_current_top_holdings(10):
            self.Debug(f"{today} | {c.Symbol} | Weight: {fmt(c.Weight, '.4f')} | "
                       f"Shares: {fmt(c.SharesHeld, ',.0f')} | Value: ${fmt(c.MarketValue, ',.0f')}")

        # Return all symbols to add to universe
        return [c.Symbol for c in holdings]

    def log_current_constituents(self) -> None:
        """Log the five largest holdings of the latest snapshot, if there is one."""
        top5 = self.get_current_top_holdings(5)
        if not top5:
            return

        self.Log("=== Top 5 SPY holdings today ===")
        for c in top5:
            self.Log(f"{c.Symbol} - Weight: {self._weight_or_zero(c)*100:.2f}%")

    def on_securities_changed(self, changes: SecurityChanges) -> None:
        """Set leverage on added securities and liquidate removed ones.

        Args:
            changes: The securities added to and removed from the universe.
        """
        self.Log(f"Securities changed: {len(changes.AddedSecurities)} added, {len(changes.RemovedSecurities)} removed")

        # Example: Set up any new securities
        for security in changes.AddedSecurities:
            # Set leverage, fees, etc.
            security.SetLeverage(1.0)

        for security in changes.RemovedSecurities:
            # Liquidate removed securities
            if security.Invested:
                self.Liquidate(security.Symbol)

    # --------------------- HISTORICAL DATA EXAMPLE ---------------------
    def on_data(self, data: Slice) -> None:
        """Run the historical example once per calendar month.

        The example runs on the first slice received in each new month. A
        plain ``day == 1`` test would fire on every slice of that day and
        would skip any month whose first day delivers no data.

        Args:
            data: The current slice (not inspected here).
        """
        month_key = (self.Time.year, self.Time.month)
        if month_key == self._last_history_month:
            return

        self._last_history_month = month_key
        self.show_historical_example()

    def show_historical_example(self) -> None:
        """Log a summary of the last 30 daily SPY constituent snapshots.

        For each snapshot the constituent count and total weight are logged,
        plus AAPL's weight when AAPL is present. Any failure is logged rather
        than raised, because this is a best-effort demonstration.
        """
        try:
            if self.spy_universe is None:
                self.Log("No historical constituent data available yet.")
                return

            # Request history for the universe so each entry is a list of
            # constituents; a request for the SPY symbol returns price bars.
            history = self.History(self.spy_universe, 30, Resolution.DAILY)

            if history is None or history.empty:
                self.Log("No historical constituent data available yet.")
                return

            self.Log(f"--- Historical SPY constituents for last month ---")

            # Presumably a Series indexed by (universe symbol, time) whose
            # values are constituent lists - verify against the dataset docs.
            for (_, time), constituents in history.items():
                holdings = list(constituents)
                total_weight = sum(self._weight_or_zero(c) for c in holdings)
                self.Log(f"{time.date()} | Constituents: {len(holdings)} | Total weight: {total_weight:.4f}")

                # Example: Find AAPL weight
                aapl = next((c for c in holdings if c.Symbol.Value == "AAPL"), None)
                if aapl is not None:
                    self.Log(f"   AAPL weight: {self._weight_or_zero(aapl)*100:.3f}%")

        except Exception as e:
            self.Log(f"Error getting historical data: {e}")

    def get_current_top_holdings(self, top_n: int = 10) -> List[ETFConstituentData]:
        """Return the ``top_n`` largest holdings of the latest snapshot.

        Args:
            top_n: Maximum number of holdings to return.

        Returns:
            Holdings sorted by descending weight; empty when no snapshot has
            been stored. Constituents with an unset weight sort as weight 0.
        """
        if not self.latest_constituents:
            return []
        return sorted(self.latest_constituents, key=self._weight_or_zero, reverse=True)[:top_n]

    def get_sector_breakdown(self) -> Dict[str, float]:
        """Sum constituent weights per sector for the latest snapshot.

        Sector data is not available here, so every constituent is currently
        placed in the "Unknown" bucket.

        Returns:
            Mapping of sector name to total weight; empty when no snapshot
            has been stored.
        """
        sector_weights = {}
        if not self.latest_constituents:
            return sector_weights

        for constituent in self.latest_constituents:
            # Note: You might need additional data for sectors
            # This is a placeholder - you'd need to get sector data separately
            sector = "Unknown"  # Replace with actual sector data
            sector_weights[sector] = sector_weights.get(sector, 0) + self._weight_or_zero(constituent)

        return sector_weights