Overall Statistics
Total Orders
660
Average Win
0.83%
Average Loss
-0.36%
Compounding Annual Return
27.643%
Drawdown
38.000%
Expectancy
0.992
Start Equity
100000
End Equity
339058.66
Net Profit
239.059%
Sharpe Ratio
0.727
Sortino Ratio
0.859
Probabilistic Sharpe Ratio
29.426%
Loss Rate
39%
Win Rate
61%
Profit-Loss Ratio
2.29
Alpha
0.078
Beta
1.57
Annual Standard Deviation
0.242
Annual Variance
0.059
Information Ratio
0.902
Tracking Error
0.126
Treynor Ratio
0.112
Total Fees
$674.16
Estimated Strategy Capacity
$520000000.00
Lowest Capacity Asset
WMT R735QTJ8XC9X
Portfolio Turnover
1.65%
Drawdown Recovery
594
# region imports
from AlgorithmImports import *
from scipy.optimize import minimize
from itertools import product
# endregion

##############################################
# - Researched and Implemented by: Ethan Huang
# - Mentored by: Rudy Osuna 
# - Triton Quantitative Trading @ UC San Diego 
##############################################

class VirtualSkyBlueCormorant(QCAlgorithm):

    def initialize(self):
        self.set_start_date(self.end_date - timedelta(5*365))
        self.set_cash(100_000)
        self.settings.seed_initial_prices = True   
        self.universe_settings.asynchronous = True
        self.universe_settings.resolution = Resolution.DAILY
        self._universe = self.add_universe(self._select)
        self.averages = {}
        self._window_size = 100
        self._position_size = 0.05
        self.set_warm_up(250)

    def on_warmup_finished(self):
        time_rule = self.time_rules.at(8, 0)
        # Rebalance the portfolio weekly after warmup.
        self.schedule.on(self.date_rules.week_start("SPY"), time_rule, self._rebalance)
        # Rebalance the portfolio today too.
        if self.live_mode:
            self._rebalance()
        else:
            self.schedule.on(self.date_rules.today, time_rule, self._rebalance)

    def _select(self, fundamental):
        for f in fundamental:
            if f.symbol not in self.averages:
                self.averages[f.symbol] = SymbolData(f.symbol)
            self.averages[f.symbol].update(f.end_time, f.adjusted_price)
        return [f.symbol for f in sorted(
            [f for f in fundamental if f.has_fundamental_data and f.price > 10],
            key=lambda f: f.dollar_volume
        )[-100:]]

    def on_securities_changed(self, changes):
        [self.liquidate(s.symbol) for s in changes.removed_securities if s.invested]
        for s in changes.added_securities:
            s.session.size = self._window_size

    def _fit_lppls(self, prices, t, weights):
        """Fit LPPLS model using weighted least squares. Requires numpy/scipy for mathematical optimization."""
        if np.any(prices <= 0):
            return None
        log_prices = np.log(prices)
        # Grid search over parameter space to find global optimum.
        best_res = None
        best_err = float('inf')
        # Bounds: tc_rel (days to crash), m (power law), w (oscillation).
        bounds = ((0.1, 80.0), (0.1, 0.9), (4.0, 15.0))
        # Simplified triple-nested loop using itertools.
        for tc_g, m_g, w_g in product([3.0, 7.0, 15.0, 20.0, 50.0], [0.3, 0.5, 0.6], [6.0, 8.0, 10.0, 12.0]):
            res = minimize(self._lppls_error, x0=[tc_g, m_g, w_g], args=(t, weights, log_prices),
                           method='L-BFGS-B', bounds=bounds, options={'ftol': 1e-5})
            if res.success and res.fun < best_err and np.isfinite(res.fun):
                best_err = res.fun
                best_res = res
        if best_res is None:
            return None
        # Reconstruct the best fit.
        tc_rel, m, w = best_res.x
        X = self._lppls_design_matrix(t, tc_rel, m, w)
        W_diag = np.diag(weights)
        # Solve weighted least squares: (X^T W X)^-1 X^T W y.
        beta = np.linalg.lstsq(X.T @ W_diag @ X, X.T @ W_diag @ log_prices, rcond=None)[0]
        if len(beta) != 4:
            return None
        a, b, c1, c2 = beta
        # Convert oscillation components to amplitude and phase.
        c = np.sqrt(c1**2 + c2**2)
        phi = np.arctan2(c2, c1)
        # Calculate R-squared goodness of fit metric.
        resid = log_prices - (X @ beta)
        weighted_rss = np.sum(weights * (resid ** 2))
        ss_tot = np.sum(weights * (log_prices - np.sum(weights * log_prices)) ** 2)
        r2 = 1.0 - (weighted_rss / (ss_tot + 1e-12))
        return [tc_rel, m, w, a, b, c, phi], True, {'r2': r2}

    def _lppls_design_matrix(self, t, tc_rel, m, w):
        """Build the LPPLS design matrix X from nonlinear parameters."""
        # Critical time is current time plus days to crash.
        tc = t[-1] + tc_rel
        # Time to critical point (can be negative for past crashes).
        dt = np.abs(tc - t)
        dt[dt == 0] = 1e-6
        # Power-law term with oscillation (cosine and sine harmonics).
        f = dt ** m
        g = f * np.cos(w * np.log(dt))
        h = f * np.sin(w * np.log(dt))
        # Design matrix: [1, power, oscillation_cos, oscillation_sin].
        return np.vstack([np.ones_like(t), f, g, h]).T

    def _lppls_error(self, params, t, weights, log_prices):
        """Weighted residual error for a given set of LPPLS nonlinear parameters."""
        tc_rel, m, w = params
        X = self._lppls_design_matrix(t, tc_rel, m, w)
        W_diag = np.diag(weights)
        XtW = X.T @ W_diag
        XtWX = XtW @ X
        # Reject ill-conditioned matrices to avoid numerical instability.
        if np.linalg.cond(XtWX) > 1e10:
            return float('inf')
        beta = np.linalg.lstsq(XtWX, XtW @ log_prices, rcond=None)[0]
        resid = log_prices - (X @ beta)
        return np.sum(weights * (resid ** 2))

    def _rebalance(self):
        for symbol in self._universe.selected:
            security = self.securities[symbol]
            avg = self.averages.get(symbol)
            if self.is_warming_up or not (security.session.is_ready and avg is not None and avg.is_ready):
                continue
            closes = np.array([bar.close for bar in security.session][::-1])
            closes = closes[(closes > 0) & ~np.isnan(closes)]
            if len(closes) < int(self._window_size * 0.5):
                continue
            trend_is_up = avg.is_uptrend
            is_high_regime = avg.is_high_regime
            # Prepare LPPLS fit inputs.
            t = np.arange(len(closes), dtype=float)
            # Exponential weights emphasize recent data (0 = oldest, 1 = newest).
            weights = np.exp(np.linspace(0.0, 1.0, len(t)))
            weights /= np.sum(weights)
            # Run LPPLS fit.
            fit_output = self._fit_lppls(closes, t, weights)
            if fit_output is None:
                continue
            params, success, metrics = fit_output
            if not success:
                continue
            tc_rel, m, w, a, b, c, phi = params
            days_to_crash = float(tc_rel)
            r2 = metrics.get('r2', -1.0)
            # Power-law exponent and oscillation frequency constraints.
            is_valid_physics = (0.35 < m < 0.65) and (7.0 < w < 12.0)
            # Negative log-linear term indicates exponential growth (bubble).
            bubble_condition = b < 0
            # Oscillation amplitude must be smaller than log-linear term.
            osc_valid = c < abs(b)
            is_imminent = (0.0 < days_to_crash < 30.0)
            good_fit = (r2 > 0.7)
            # Execute trading logic based on LPPLS signal.
            if is_valid_physics and bubble_condition and osc_valid and is_imminent and good_fit:
                if security.holdings.is_long:
                    self.liquidate(security)
                self.set_holdings(security, -self._position_size)
            # Trend-following when no crash signal.
            else:
                if security.holdings.is_short:
                    self.liquidate(security)
                elif not security.holdings.invested and trend_is_up and is_high_regime:
                    self.set_holdings(security, self._position_size)


class SymbolData:

    def __init__(self, symbol):
        self.symbol = symbol
        # 1% above slow EMA indicates uptrend.
        self.tolerance = 1.01
        # Fast MA: 50-period, Slow MA: 200-period.
        self.fast = ExponentialMovingAverage(50)
        self.slow = ExponentialMovingAverage(200)
        self.is_uptrend = False
        self.is_high_regime = False
        self.is_ready = False
        self.scale = 0

    def update(self, time, value):
        if self.fast.update(time, value) and self.slow.update(time, value):
            fast = self.fast.current.value
            slow = self.slow.current.value
            # Moving average crossover: detect when fast MA crosses above slow MA.
            self.is_uptrend = fast > slow * self.tolerance
            self.is_high_regime = value > slow * 1.05
            self.is_ready = True
            self.scale = (fast - slow) / ((fast + slow) / 2.0) if self.is_uptrend else 0