Overall Statistics
Total Orders
1071
Average Win
0.31%
Average Loss
-0.25%
Compounding Annual Return
-3.637%
Drawdown
20.200%
Expectancy
-0.095
Start Equity
1000000
End Equity
948982.14
Net Profit
-5.102%
Sharpe Ratio
-0.481
Sortino Ratio
-0.512
Probabilistic Sharpe Ratio
5.706%
Loss Rate
59%
Win Rate
41%
Profit-Loss Ratio
1.22
Alpha
-0.109
Beta
0.509
Annual Standard Deviation
0.145
Annual Variance
0.021
Information Ratio
-1.023
Tracking Error
0.144
Treynor Ratio
-0.137
Total Fees
$4093.84
Estimated Strategy Capacity
$3500000.00
Lowest Capacity Asset
EFX R735QTJ8XC9X
Portfolio Turnover
12.74%
Drawdown Recovery
252
from AlgorithmImports import *
import numpy as np

class SP500MomentumFFTStrategy(QCAlgorithm):
    """
    FFT x 50MA Momentum Strategy
    
    Uses Fast Fourier Transform to identify cyclic patterns
    Combined with 50-day MA for trend confirmation
    Rides high momentum stocks with dynamic position sizing
    """
    
    def initialize(self):
        self.set_start_date(2024, 1, 1)
        self.set_end_date(2025, 6,1 )
        self.set_cash(1000000)
        self.set_benchmark("SPY")
        
        # Add warm-up for indicators
        self.set_warm_up(100, Resolution.DAILY)  # Need 100 days for FFT analysis
        
        # Universe
        self.spy_symbol = self.add_equity("SPY", Resolution.DAILY).symbol
        self.add_universe(self.universe.etf(self.spy_symbol, self.universe_settings))
        
        # Core parameters
        self.lookback_period = 100  # Days for FFT analysis
        self.ma_period = 50  # 50-day moving average
        self.top_momentum_count = 15  # Number of highest momentum stocks to hold
        self.rebalance_days = 5  # Rebalance every 5 days
        self.momentum_threshold = 0.05  # Minimum 5% momentum to enter
        self.stop_loss = -0.08  # 8% stop loss
        self.take_profit = 0.25  # Take profit at 25%
        
        # FFT parameters
        self.fft_top_frequencies = 3  # Look at top 3 frequency components
        self.min_fft_power = 0.1  # Minimum power threshold for valid cycle
        
        # Tracking
        self.universe_symbols = []
        self.entry_prices = {}
        self.momentum_scores = {}
        self.last_rebalance = datetime.min
        
        # Schedule regular rebalancing
        self.schedule.on(
            self.date_rules.every_day("SPY"),
            self.time_rules.at(10, 0),
            self.rebalance_portfolio
        )
        
        # Daily position management
        self.schedule.on(
            self.date_rules.every_day("SPY"),
            self.time_rules.at(15, 30),
            self.manage_positions
        )
        
        self.debug("FFT Momentum Strategy Initialized")
    
    def on_securities_changed(self, changes):
        """Handle universe changes"""
        for removed in changes.removed_securities:
            if removed.symbol in self.universe_symbols:
                self.universe_symbols.remove(removed.symbol)
        
        for added in changes.added_securities:
            if added.symbol != self.spy_symbol:
                self.universe_symbols.append(added.symbol)
    
    def calculate_fft_momentum(self, symbol):
        """
        Calculate momentum score using FFT analysis
        Returns a score based on cyclic pattern strength and trend alignment
        """
        try:
            # Get historical prices
            history = self.history(symbol, self.lookback_period, Resolution.DAILY)
            if len(history) < self.lookback_period:
                return None
            
            prices = history["close"].values
            
            # Detrend the data for FFT
            detrended = prices - np.linspace(prices[0], prices[-1], len(prices))
            
            # Apply FFT
            fft_result = np.fft.fft(detrended)
            frequencies = np.fft.fftfreq(len(detrended))
            power_spectrum = np.abs(fft_result) ** 2
            
            # Get dominant frequencies (excluding DC component)
            positive_freq_idx = frequencies > 0
            positive_freqs = frequencies[positive_freq_idx]
            positive_power = power_spectrum[positive_freq_idx]
            
            # Find top frequency components
            top_indices = np.argsort(positive_power)[-self.fft_top_frequencies:]
            dominant_freqs = positive_freqs[top_indices]
            dominant_powers = positive_power[top_indices]
            
            # Calculate cycle strength (normalized power of dominant frequencies)
            total_power = np.sum(positive_power)
            if total_power > 0:
                cycle_strength = np.sum(dominant_powers) / total_power
            else:
                return None
            
            # Skip if no strong cyclic pattern
            if cycle_strength < self.min_fft_power:
                return None
            
            # Calculate 50-day MA
            ma_50 = np.mean(prices[-self.ma_period:])
            current_price = prices[-1]
            
            # Price position relative to MA (trend confirmation)
            ma_position = (current_price - ma_50) / ma_50
            
            # Calculate short-term momentum
            momentum_5d = (prices[-1] / prices[-6] - 1) if len(prices) > 5 else 0
            momentum_20d = (prices[-1] / prices[-21] - 1) if len(prices) > 20 else 0
            
            # Combine factors for final score
            # Higher score = stronger cyclic pattern + above MA + positive momentum
            score = 0
            
            # Cyclic strength component (40% weight)
            score += cycle_strength * 0.4
            
            # Trend component (30% weight) - must be above MA
            if ma_position > 0:
                score += min(ma_position * 2, 0.3)  # Cap at 0.3
            else:
                return None  # Skip stocks below 50MA
            
            # Momentum component (30% weight)
            combined_momentum = (momentum_5d * 0.6 + momentum_20d * 0.4)
            if combined_momentum > 0:
                score += min(combined_momentum * 2, 0.3)  # Cap at 0.3
            else:
                return None  # Skip negative momentum stocks
            
            # Calculate expected cycle period for position sizing
            if len(dominant_freqs) > 0:
                primary_period = 1 / dominant_freqs[-1]  # Strongest frequency
                self.debug(f"{self.get_ticker(symbol)}: Cycle period ~{primary_period:.1f} days, Score: {score:.3f}")
            
            return score
            
        except Exception as e:
            return None
    
    def rebalance_portfolio(self):
        """Rebalance based on FFT momentum scores"""
        
        # Skip during warm-up
        if self.is_warming_up:
            return
        
        # Only rebalance every N days
        if (self.time - self.last_rebalance).days < self.rebalance_days:
            return
        
        self.debug(f"=== REBALANCING {self.time.date()} ===")
        
        # Calculate FFT momentum for all stocks
        self.momentum_scores = {}
        
        for symbol in self.universe_symbols[:200]:  # Limit to top 200 for speed
            try:
                # Skip if already processing too many
                if len(self.momentum_scores) > 50:
                    break
                
                # Skip financial companies (optional)
                fund = self.securities[symbol].fundamentals
                if fund.has_fundamental_data:
                    code = fund.company_reference.industry_template_code
                    if code and len(code) > 0 and code[0] in ['B', 'I', 'F', 'R']:
                        continue
                
                # Calculate FFT momentum score
                score = self.calculate_fft_momentum(symbol)
                if score is not None and score > 0:
                    self.momentum_scores[symbol] = score
                    
            except:
                continue
        
        # Need minimum stocks
        if len(self.momentum_scores) < 5:
            self.debug(f"Insufficient momentum stocks: {len(self.momentum_scores)}")
            self.last_rebalance = self.time
            return
        
        # Select top momentum stocks
        sorted_scores = sorted(self.momentum_scores.items(), key=lambda x: x[1], reverse=True)
        top_momentum = [s for s, score in sorted_scores[:self.top_momentum_count] if score > 0]
        
        self.debug(f"Top momentum stocks: {len(top_momentum)}")
        for symbol, score in sorted_scores[:5]:
            self.debug(f"  {self.get_ticker(symbol)}: {score:.3f}")
        
        # Sell positions not in top momentum (unless still winning)
        for symbol in list(self.portfolio.keys()):
            if symbol == self.spy_symbol:
                continue
                
            if self.portfolio[symbol].invested and symbol not in top_momentum:
                # Check if still has momentum
                if symbol in self.entry_prices:
                    entry = self.entry_prices[symbol]
                    current = self.securities[symbol].price
                    gain = (current - entry) / entry
                    
                    # Keep if still strongly positive and not at stop/target
                    if 0.05 < gain < self.take_profit:
                        self.debug(f"Holding momentum winner: {self.get_ticker(symbol)} +{gain:.1%}")
                        continue
                
                # Exit position
                self.liquidate(symbol)
                self.debug(f"Exited: {self.get_ticker(symbol)}")
                if symbol in self.entry_prices:
                    del self.entry_prices[symbol]
        
        # Enter new positions with dynamic sizing based on score
        total_score = sum(self.momentum_scores[s] for s in top_momentum if s in self.momentum_scores)
        
        for symbol in top_momentum:
            if not self.portfolio[symbol].invested:
                try:
                    # Dynamic position sizing based on relative score
                    score = self.momentum_scores.get(symbol, 0)
                    if score > 0 and total_score > 0:
                        weight = (score / total_score) * 0.95  # Use 95% of capital
                        weight = min(weight, 0.15)  # Max 15% per position
                        
                        self.set_holdings(symbol, weight)
                        self.entry_prices[symbol] = self.securities[symbol].price
                        self.debug(f"Entered: {self.get_ticker(symbol)} @ {weight:.1%} allocation")
                except:
                    continue
        
        positions = len([s for s in self.portfolio.keys() if self.portfolio[s].invested and s != self.spy_symbol])
        self.debug(f"Portfolio: {positions} positions")
        self.last_rebalance = self.time
    
    def manage_positions(self):
        """Daily position management - stops and targets"""
        
        if self.is_warming_up:
            return
        
        for symbol in list(self.portfolio.keys()):
            if not self.portfolio[symbol].invested or symbol == self.spy_symbol:
                continue
            
            if symbol in self.entry_prices:
                entry = self.entry_prices[symbol]
                current = self.securities[symbol].price
                gain = (current - entry) / entry
                
                # Stop loss
                if gain < self.stop_loss:
                    self.liquidate(symbol)
                    self.debug(f"Stop loss: {self.get_ticker(symbol)} at {gain:.1%}")
                    del self.entry_prices[symbol]
                
                # Take profit
                elif gain > self.take_profit:
                    self.liquidate(symbol)
                    self.debug(f"Take profit: {self.get_ticker(symbol)} at {gain:.1%}")
                    del self.entry_prices[symbol]
                
                # Trailing stop for big winners
                elif gain > 0.15:
                    # Implement trailing stop at 10% from peak
                    history = self.history(symbol, 20, Resolution.DAILY)
                    if len(history) > 0:
                        recent_high = max(history["high"])
                        drawdown = (recent_high - current) / recent_high
                        
                        if drawdown > 0.10:
                            self.liquidate(symbol)
                            self.debug(f"Trailing stop: {self.get_ticker(symbol)} at {gain:.1%}")
                            del self.entry_prices[symbol]
    
    def get_ticker(self, symbol):
        """Get ticker string"""
        return str(symbol).split(' ')[0]
    
    def on_data(self, data):
        """Required but unused"""
        pass