Overall Statistics
Total Orders
715
Average Win
0.74%
Average Loss
-0.45%
Compounding Annual Return
20.897%
Drawdown
51.700%
Expectancy
0.530
Start Equity
100000
End Equity
258351.37
Net Profit
158.351%
Sharpe Ratio
0.503
Sortino Ratio
0.587
Probabilistic Sharpe Ratio
12.521%
Loss Rate
42%
Win Rate
58%
Profit-Loss Ratio
1.65
Alpha
0.033
Beta
1.993
Annual Standard Deviation
0.303
Annual Variance
0.092
Information Ratio
0.513
Tracking Error
0.18
Treynor Ratio
0.076
Total Fees
$724.84
Estimated Strategy Capacity
$2100000000.00
Lowest Capacity Asset
MA TIX2XDPLFR6T
Portfolio Turnover
1.93%
Drawdown Recovery
835
############################################
# Researched and Implemented by: Ethan Huang
# Mentored by: Rudy Osuna 
# Triton Quantitative Trading @ UC San Diego 
############################################
# region imports
from AlgorithmImports import *
from scipy.optimize import minimize
from itertools import product
# endregion


class VirtualSkyBlueCormorant(QCAlgorithm):

    def initialize(self):
        self.set_start_date(self.end_date - timedelta(5*365))
        self.set_cash(100_000)
        self.settings.seed_initial_prices = True
        self.universe_settings.resolution = Resolution.DAILY
        self.universe_settings.schedule.on(self.date_rules.week_start())
        self._universe = self.add_universe(self._select)
        self._window_size = 100
        self._position_size = 0.045
        self.set_warm_up(250)

    def on_warmup_finished(self):
        time_rule = self.time_rules.at(8, 0)
        # Rebalance the portfolio weekly after warmup.
        self.schedule.on(self.date_rules.week_start("SPY"), time_rule, self._rebalance)
        # Rebalance the portfolio today too.
        if self.live_mode:
            self._rebalance()
        else:
            self.schedule.on(self.date_rules.today, time_rule, self._rebalance)

    def _select(self, fundamental):
        # Select the top 100 most liquid stocks by dollar volume with price above $10.
        return [f.symbol for f in sorted(
            [f for f in fundamental if f.has_fundamental_data and f.price > 10],
            key=lambda f: f.dollar_volume
        )[-100:]]

    def on_securities_changed(self, changes):
        # Once a security leaves the universe deregister the EMA indicators.
        for security in changes.removed_securities:
            if security.invested:
                self.liquidate(security.symbol)
            self.deregister_indicator(security.fast_ema)
            self.deregister_indicator(security.slow_ema)
        # Initialize EMA indicators for newly added securities.
        for security in changes.added_securities:
            security.session.size = self._window_size
            security.fast_ema = self.ema(security.symbol, 50, Resolution.DAILY)
            security.slow_ema = self.ema(security.symbol, 200, Resolution.DAILY)
            self.warm_up_indicator(security.symbol, security.fast_ema, Resolution.DAILY)
            self.warm_up_indicator(security.symbol, security.slow_ema, Resolution.DAILY)

    def _fit_lppls(self, prices, t, weights):
        """Fit LPPLS model using weighted least squares."""
        if np.any(prices <= 0):
            return None
        log_prices = np.log(prices)
        # Grid search over parameter space to find global optimum.
        best_res = None
        best_err = float('inf')
        # Bounds: tc_rel (days to crash), m (power law), w (oscillation).
        bounds = ((0.1, 80.0), (0.1, 0.9), (4.0, 15.0))
        # Simplified triple-nested loop using itertools.
        for tc_g, m_g, w_g in product([3.0, 7.0, 15.0, 20.0, 50.0], [0.3, 0.5, 0.6], [6.0, 8.0, 10.0, 12.0]):
            res = minimize(self._lppls_error, x0=[tc_g, m_g, w_g], args=(t, weights, log_prices),
                           method='L-BFGS-B', bounds=bounds, options={'ftol': 1e-5})
            if res.success and res.fun < best_err and np.isfinite(res.fun):
                best_err = res.fun
                best_res = res
        if best_res is None:
            return None
        # Reconstruct the best fit.
        tc_rel, m, w = best_res.x
        X = self._lppls_design_matrix(t, tc_rel, m, w)
        W_diag = np.diag(weights)
        # Solve weighted least squares: (X^T W X)^-1 X^T W y.
        beta = np.linalg.lstsq(X.T @ W_diag @ X, X.T @ W_diag @ log_prices, rcond=None)[0]
        if len(beta) != 4:
            return None
        a, b, c1, c2 = beta
        # Convert oscillation components to amplitude and phase.
        c = np.sqrt(c1**2 + c2**2)
        phi = np.arctan2(c2, c1)
        # Calculate R-squared goodness of fit metric.
        resid = log_prices - (X @ beta)
        weighted_rss = np.sum(weights * (resid ** 2))
        ss_tot = np.sum(weights * (log_prices - np.sum(weights * log_prices)) ** 2)
        r2 = 1.0 - (weighted_rss / (ss_tot + 1e-12))
        return [tc_rel, m, w, a, b, c, phi], True, {'r2': r2}

    def _lppls_design_matrix(self, t, tc_rel, m, w):
        """Build the LPPLS design matrix X from nonlinear parameters."""
        # Critical time is current time plus days to crash.
        tc = t[-1] + tc_rel
        # Time to critical point (can be negative for past crashes).
        dt = np.abs(tc - t)
        dt[dt == 0] = 1e-6
        # Power-law term with oscillation (cosine and sine harmonics).
        f = dt ** m
        g = f * np.cos(w * np.log(dt))
        h = f * np.sin(w * np.log(dt))
        # Design matrix: [1, power, oscillation_cos, oscillation_sin].
        return np.vstack([np.ones_like(t), f, g, h]).T

    def _lppls_error(self, params, t, weights, log_prices):
        """Weighted residual error for a given set of LPPLS nonlinear parameters."""
        tc_rel, m, w = params
        X = self._lppls_design_matrix(t, tc_rel, m, w)
        W_diag = np.diag(weights)
        XtW = X.T @ W_diag
        XtWX = XtW @ X
        # Reject ill-conditioned matrices to avoid numerical instability.
        if np.linalg.cond(XtWX) > 1e10:
            return float('inf')
        beta = np.linalg.lstsq(XtWX, XtW @ log_prices, rcond=None)[0]
        resid = log_prices - (X @ beta)
        return np.sum(weights * (resid ** 2))

    def _rebalance(self):
        for symbol in self._universe.selected:
            security = self.securities[symbol]
            if self.is_warming_up or not (security.session.is_ready and security.fast_ema.is_ready and security.slow_ema.is_ready):
                continue
            closes = np.array([bar.close for bar in security.session][::-1])
            closes = closes[(closes > 0) & ~np.isnan(closes)]
            if len(closes) < int(self._window_size * 0.5):
                continue
            fast = security.fast_ema.current.value
            slow = security.slow_ema.current.value
            tolerance = 1.01
            trend_is_up = fast > slow * tolerance
            is_high_regime = security.price > slow * 1.05
            # Prepare LPPLS fit inputs.
            t = np.arange(len(closes), dtype=float)
            # Exponential weights emphasize recent data (0 = oldest, 1 = newest).
            weights = np.exp(np.linspace(0.0, 1.0, len(t)))
            weights /= np.sum(weights)
            # Run LPPLS fit.
            fit_output = self._fit_lppls(closes, t, weights)
            if fit_output is None:
                continue
            params, success, metrics = fit_output
            if not success:
                continue
            tc_rel, m, w, a, b, c, phi = params
            days_to_crash = float(tc_rel)
            r2 = metrics.get('r2', -1.0)
            # Power-law exponent and oscillation frequency constraints.
            is_valid_physics = (0.35 < m < 0.65) and (7.0 < w < 12.0)
            # Negative log-linear term indicates exponential growth (bubble).
            bubble_condition = b < 0
            # Oscillation amplitude must be smaller than log-linear term.
            osc_valid = c < abs(b)
            is_imminent = (0.0 < days_to_crash < 30.0)
            good_fit = (r2 > 0.7)
            # Execute trading logic based on LPPLS signal.
            if is_valid_physics and bubble_condition and osc_valid and is_imminent and good_fit:
                self.set_holdings(security, -self._position_size)
            # Trend-following when no crash signal.
            else:
                if security.holdings.is_short:
                    self.liquidate(security)
                elif not security.holdings.invested and trend_is_up and is_high_regime:
                    self.set_holdings(security, self._position_size)