Overall Statistics
Total Orders
1488
Average Win
1.72%
Average Loss
-0.27%
Compounding Annual Return
73.960%
Drawdown
28.800%
Expectancy
0.872
Start Equity
100000
End Equity
539916.77
Net Profit
439.917%
Sharpe Ratio
1.54
Sortino Ratio
2.011
Probabilistic Sharpe Ratio
78.822%
Loss Rate
74%
Win Rate
26%
Profit-Loss Ratio
6.33
Alpha
0.373
Beta
1.032
Annual Standard Deviation
0.315
Annual Variance
0.099
Information Ratio
1.31
Tracking Error
0.287
Treynor Ratio
0.471
Total Fees
$7125.64
Estimated Strategy Capacity
$0
Lowest Capacity Asset
COIN XNNJXSXHIHYD
Portfolio Turnover
6.54%
Drawdown Recovery
189
# region imports
from AlgorithmImports import *
from scipy.optimize import minimize
import numpy as np
import pandas as pd
import json
import datetime
# endregion

class VirtualSkyBlueCormorant(QCAlgorithm):

    def initialize(self):
        self.set_start_date(2023, 1, 1)
        self.set_cash(100000)
        self.settings.seed_initial_prices = True
        # 20 stocks - 17 volatile, 3 stable
        self.tickers = [
            "NVDA", "SMCI", "PLTR", "AI",   
            "IONQ", "RKLB", "DNA", "ASTS",  
            "TSLA", "PLUG", "QS", "LCID",   
            "MSTR", "COIN", "MARA",         
            "GME", "CVNA",                 
            "KO", "WM", "JNJ"              
        ]
        
        self.symbols = []
        for ticker in self.tickers:
            sym = self.add_equity(ticker, Resolution.DAILY).symbol
            self.symbols.append(sym)
            
        # 2. LPPLS Configuration
        self.window_size = 252
        self.run_every_n_days = 1
        self._last_run_day = None

        # 3. Schedule the Scan
        self.schedule.on(
            self.date_rules.every_day("SPY"), 
            self.time_rules.after_market_open("SPY", 30),
            self.ScanForBubbles
        )
        
        # 4. Storage for Research Data
        # We track the returns of the whole portfolio vs the benchmark to calculate DM later
        self.daily_returns = {} 
        self.previous_value = 100000 

    def ScanForBubbles(self):
        # 1. Time Control
        today = self.time.date()
        if self._last_run_day is not None and (today - self._last_run_day).days < self.run_every_n_days:
            return
        self._last_run_day = today

        # 2. Loop through every stock in our list
        for symbol in self.symbols:
            self.RunLPPLSOnSymbol(symbol)

    def RunLPPLSOnSymbol(self, symbol):
        # Data Extraction
        history = self.history(symbol, self.window_size, Resolution.DAILY)
        if history is None or history.empty: return

        try:
            if isinstance(history, pd.DataFrame):
                closes = history['close'].values
            else:
                closes = np.array(history)
        except: return

        # Clean Data
        closes = np.array([p for p in closes if (p is not None) and (not np.isnan(p)) and (p > 0)])
        if len(closes) < int(self.window_size * 0.5): return

        prices = closes
        t = np.arange(len(prices), dtype=float)
        current_price = prices[-1]

        # --- INDICATORS ---
        # Trend Filter
        sma_short = np.mean(prices[-30:]) if len(prices) >= 30 else np.mean(prices)
        trend_is_up = current_price > sma_short
        
        # Regime Filter (Z-Score > 2 is safer than 5%)
        # But keeping your 5% logic for consistency with your previous test
        long_term_mean = np.mean(prices)
        is_high_regime = current_price > (long_term_mean * 1.05)

        # --- LPPLS FIT ---
        weights = np.exp(np.linspace(0.0, 1.0, len(t)))
        weights /= np.sum(weights)

        try:
            fit_output = self.fit_lppls(prices, t, weights)
            if fit_output is None: return

            params, success, metrics = fit_output
            if not success: return

            tc_rel, m, w, a, b, c, phi = params
            days_to_crash = float(tc_rel)
            
            predicted_date = self.time + datetime.timedelta(days=days_to_crash)
            predicted_str = predicted_date.strftime("%Y-%m-%d")
            
            r2 = metrics.get('r2', -1.0)
            if r2 > 0.4: 
                self.Log(f"{symbol.Value} PREDICTED CRASH: {predicted_str} (in {days_to_crash:.1f} days) | R2: {r2:.2f}")

            # --- FILTERS ---
            is_valid_physics = (0.3 < m < 0.7) and (6.0 < w < 13.0)
            is_imminent = (0.0 < days_to_crash < 21.0)
            good_fit = (r2 > 0.6)

            # --- EXECUTION ---
            
            # SIGNAL: CRASH IS COMING
            if is_valid_physics and is_imminent and good_fit and is_high_regime:
                # Debug only on actionable signals
                self.debug(f"!!! SIGNAL {symbol.Value} !!! Crash: {predicted_str}")
                
                if self.portfolio[symbol].is_long:
                    self.liquidate(symbol)
                
                # Enter Short (small position per stock)
                self.set_holdings(symbol, -0.05)

            # NO SIGNAL: TREND FOLLOW
            else:
                if self.portfolio[symbol].is_short:
                    if trend_is_up: 
                        self.liquidate(symbol)
                
                elif not self.portfolio[symbol].invested and trend_is_up:
                    self.set_holdings(symbol, 0.05)
                
                elif self.portfolio[symbol].is_long and not trend_is_up:
                    self.liquidate(symbol)

        except Exception as e:
            return

    # --- MATH ENGINE ---
    def fit_lppls(self, prices, t, weights):
        prices = np.array(prices, dtype=float)
        if np.any(prices <= 0): return None
        log_prices = np.log(prices)
        
        def matrix_error_func(params):
            tc_rel, m, w = params
            tc = t[-1] + tc_rel
            dt = np.abs(tc - t)
            dt[dt == 0] = 1e-6

            f = dt ** m
            g = f * np.cos(w * np.log(dt))
            h = f * np.sin(w * np.log(dt))
            
            ones = np.ones_like(t)
            X = np.vstack([ones, f, g, h]).T
            W_diag = np.diag(weights)
            
            try:
                XtW = X.T @ W_diag
                beta = np.linalg.solve(XtW @ X, XtW @ log_prices)
            except np.linalg.LinAlgError:
                return np.inf

            preds = X @ beta
            resid = log_prices - preds
            weighted_rss = np.sum(weights * (resid ** 2))
            return weighted_rss

        # Grid Search
        best_res = None
        best_err = np.inf
        
        tc_guesses = [10.0, 30.0, 60.0]
        m_guesses  = [0.3, 0.5, 0.7]
        w_guesses  = [6.0, 9.0, 12.0]
        bounds = ((0.1, 100.0), (0.1, 0.9), (4.0, 15.0))

        for tc_g in tc_guesses:
            for m_g in m_guesses:
                for w_g in w_guesses:
                    try:
                        res = minimize(
                            matrix_error_func, 
                            x0=[tc_g, m_g, w_g], 
                            method='L-BFGS-B', 
                            bounds=bounds,
                            options={'ftol': 1e-9}
                        )
                        if res.success and res.fun < best_err:
                            best_err = res.fun
                            best_res = res
                    except: continue

        if best_res is None: return None

        # Reconstruct
        tc_rel, m, w = best_res.x
        tc = t[-1] + tc_rel
        dt = np.abs(tc - t)
        dt[dt == 0] = 1e-6
        
        f = dt ** m
        g = f * np.cos(w * np.log(dt))
        h = f * np.sin(w * np.log(dt))
        
        ones = np.ones_like(t)
        X = np.vstack([ones, f, g, h]).T
        W_diag = np.diag(weights)
        
        try:
            XtW = X.T @ W_diag
            beta = np.linalg.solve(XtW @ X, XtW @ log_prices)
            A, B, C1, C2 = beta
        except:
            return None

        C = np.sqrt(C1**2 + C2**2)
        phi = np.arctan2(C2, C1) 

        # R2 Metric
        preds = X @ beta
        resid = log_prices - preds
        weighted_rss = np.sum(weights * (resid ** 2))
        
        mean_lp = np.sum(weights * log_prices)
        ss_tot = np.sum(weights * (log_prices - mean_lp) ** 2)
        r2 = 1.0 - (weighted_rss / (ss_tot + 1e-12))

        return [tc_rel, m, w, A, B, C, phi], True, {'r2': r2}