Overall Statistics
Total Orders
448
Average Win
3.14%
Average Loss
-1.90%
Compounding Annual Return
21.328%
Drawdown
21.900%
Expectancy
0.536
Start Equity
1000000
End Equity
8465263.43
Net Profit
746.526%
Sharpe Ratio
0.89
Sortino Ratio
0.974
Probabilistic Sharpe Ratio
48.052%
Loss Rate
42%
Win Rate
58%
Profit-Loss Ratio
1.65
Alpha
0.068
Beta
0.788
Annual Standard Deviation
0.146
Annual Variance
0.021
Information Ratio
0.551
Tracking Error
0.094
Treynor Ratio
0.165
Total Fees
$20636.85
Estimated Strategy Capacity
$1500000.00
Lowest Capacity Asset
SPY R735QTJ8XC9X
Portfolio Turnover
10.89%
Drawdown Recovery
299
#region imports
from AlgorithmImports import *
#endregion

import numpy as np
import pywt
from sklearn.svm import SVR


def _partition_array(arr, size=None, splits=None):
    """
    Build (window, next value) training pairs from a 1D series.

    arr: 1D array-like, converted to a float array
    size: window length; one window starts at every index (stride 1)
    splits: alternative to size; the window length becomes
        len(arr) // splits and windows are taken back-to-back
    returns: (X, y) float arrays, where X holds the windows and y the value
        that immediately follows each window. When splits yields a window
        length <= 0 the result is an empty (0, 0) X and an empty (0,) y.
    raises: ValueError unless exactly one of size / splits is given
    """
    series = np.asarray(arr, dtype=float)

    # Exactly one of the two modes must be selected.
    if (size is None) == (splits is None):
        raise ValueError("Size XOR Splits should not be None")

    if size is not None:
        # Sliding windows: advance one element at a time.
        starts = range(len(series) - size)
    else:
        # Non-overlapping windows: advance one full window at a time.
        size = len(series) // splits
        if size <= 0:
            return np.empty((0, 0)), np.empty((0,))
        starts = range(0, len(series) - size, size)

    windows = [series[start:start + size] for start in starts]
    targets = [series[start + size] for start in starts]

    return np.asarray(windows, dtype=float), np.asarray(targets, dtype=float)


def _svm_forecast(data, sample_size=20):
    """
    Predict the next value of a 1D series with a support vector regressor.

    data: 1D array-like, converted to a float array
    sample_size: number of trailing values used as one feature row
    returns: the predicted next value as a float; when fewer than 50
        training windows are available the last observed value is returned
        unchanged instead of fitting a model
    """
    series = np.asarray(data, dtype=float)
    features, targets = _partition_array(series, size=sample_size)

    # Not enough windows to train on: fall back to the last observation.
    if len(features) < 50:
        return float(series[-1])

    regressor = SVR(C=1.0, epsilon=0.001, gamma="scale").fit(features, targets)

    # The most recent `sample_size` values form the single row to predict from.
    latest_window = series[np.newaxis, -sample_size:]
    prediction = regressor.predict(latest_window)
    return float(prediction[0])


def forecast(data):
    """
    Forecast the next value of a 1D series with a wavelet + SVR model.

    The series is decomposed with a "sym4" discrete wavelet transform (at
    most 6 levels), the detail bands are soft-thresholded, every band is
    advanced by one step using _svm_forecast, and the bands are recombined.

    data: 1D array-like series. The function does not care about units:
        pass prices to get a price forecast, or returns to get a return
        forecast.
    returns: forecasted next value of the series, in the units of `data`
    """
    data = np.asarray(data, dtype=float)

    w = pywt.Wavelet("sym4")

    # Detail coefficients smaller than this fraction of the band's largest
    # absolute coefficient are shrunk towards zero.
    threshold = 0.0040
    level = min(6, pywt.dwt_max_level(len(data), w.dec_len))
    coeffs = pywt.wavedec(data, w, level=level)

    for i, band in enumerate(coeffs):
        # Index 0 is the approximation band; only detail bands are denoised.
        if i > 0:
            scale = np.max(np.abs(band)) + 1e-12  # epsilon avoids a zero scale
            band = pywt.threshold(band, threshold * scale, mode="soft")

        f = _svm_forecast(band, sample_size=20)

        # Shift the band one step left and append its predicted coefficient.
        band = np.roll(band, -1)
        band[-1] = f
        coeffs[i] = band

    rec = pywt.waverec(coeffs, w)

    # waverec returns one extra trailing sample when len(data) is odd, so
    # index the position of the last input sample instead of using rec[-1].
    return float(rec[len(data) - 1])
#region imports
from AlgorithmImports import *
#endregion

import numpy as np
import SVMWavelet as svmw


class ShortTheEdge(QCAlgorithm):
    """
    Daily SPY strategy that is long by default and flips short on a strongly
    negative forecast.

    Each scheduled rebalance forecasts the next close from the rolling window
    of closes, converts it to a volatility-scaled "edge", and targets
    `short_weight` when the edge is below -`short_trigger_k` (and, when the
    regime filter is on, the price is below the HMA indicator); otherwise it
    targets `long_weight`.
    """

    def Initialize(self):
        """Set up the backtest, strategy parameters, indicator and schedule."""
        # Strategy parameters (plain attributes, no engine interaction).
        self.period = 248
        self.long_weight = 0.9975
        self.short_weight = -0.9975
        self.short_trigger_k = 1.6
        self.use_bear_regime_filter = True
        self.debug_daily = False

        self.SetStartDate(2015, 1, 1)
        self.SetCash(1_000_000)

        self.SetBrokerageModel(AlphaStreamsBrokerageModel())

        # Two risk models combined: an unrealized-profit cap and a trailing
        # stop. NOTE(review): thresholds are presumably fractions
        # (13.25% / 8.35%) - confirm against the LEAN docs.
        profit_cap = MaximumUnrealizedProfitPercentPerSecurity(0.1325)
        trailing_stop = TrailingStopRiskManagementModel(0.0835)
        self.SetRiskManagement(
            CompositeRiskManagementModel(profit_cap, trailing_stop)
        )

        self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol

        # NOTE: the attribute is named hma200 but the indicator period is 185.
        self.hma200 = self.HMA(self.spy, 185, Resolution.Daily)

        # Rolling store of the last `period` closes, seeded lazily in OnData.
        self.window = RollingWindow[float](self.period)
        self._seeded = False

        self.Schedule.On(
            self.DateRules.EveryDay(self.spy),
            self.TimeRules.AfterMarketOpen(self.spy, 5),
            self.Rebalance,
        )

        self.SetWarmup(self.period, Resolution.Daily)

    def OnData(self, data):
        """Seed the close window once, then append each new SPY bar's close."""
        if not self._seeded:
            self._seed_window_from_history()

        if data.Bars.ContainsKey(self.spy):
            bar = data.Bars[self.spy]
            self.window.Add(float(bar.Close))

    def _seed_window_from_history(self):
        """Fill the close window from a history request and mark it seeded."""
        history = self.History(self.spy, self.period, Resolution.Daily)
        if not history.empty:
            # Select this symbol's rows; if that lookup fails for any reason,
            # use the frame as returned.
            try:
                frame = history.loc[self.spy]
            except Exception:
                frame = history
            for _, row in frame.iterrows():
                self.window.Add(float(row["close"]))
        self._seeded = True

    def Rebalance(self):
        """Compute the forecast edge and move SPY to the target weight."""
        if self.IsWarmingUp or self.window.Count < self.period:
            return

        security = self.Securities[self.spy]
        price_ready = security.IsTradable and security.Price > 0
        if not price_ready:
            if self.debug_daily:
                self.Debug(f"{self.Time} SKIP: SPY not tradable / price not ready (Price={security.Price})")
            return

        # Reverse the window so the last element is treated as the most
        # recent close (presumably the window yields newest first - confirm).
        prices = np.array(list(self.window))[::-1]
        last_close = float(prices[-1])

        log_returns = np.diff(np.log(prices))
        if len(log_returns) < 60:
            return
        # Volatility of the last 60 log returns; epsilon guards the division.
        volatility = float(np.std(log_returns[-60:])) + 1e-12

        predicted_close = float(svmw.forecast(prices))
        expected_return = (predicted_close / last_close) - 1.0
        edge = expected_return / volatility

        # With the regime filter on, shorting requires a ready HMA and a
        # price below it; with the filter off, shorting is always allowed.
        if self.use_bear_regime_filter:
            shorting_allowed = self.hma200.IsReady and \
                               security.Price < float(self.hma200.Current.Value)
        else:
            shorting_allowed = True

        go_short = shorting_allowed and edge < -self.short_trigger_k
        target_weight = self.short_weight if go_short else self.long_weight

        if self.debug_daily:
            self.Debug(
                f"{self.Time} edge={edge:.2f} sigma={volatility:.5f} "
                f"allow_short={shorting_allowed} k={self.short_trigger_k} target={target_weight:.3f} price={security.Price:.2f}"
            )

        holding = self.Portfolio[self.spy]
        equity = max(self.Portfolio.TotalPortfolioValue, 1)
        current_weight = holding.HoldingsValue / equity
        # Skip the trade when already within 2 percentage points of target.
        if abs(current_weight - target_weight) < 0.02:
            return

        quantity = holding.Quantity
        long_to_short = quantity > 0 and target_weight < 0
        short_to_long = quantity < 0 and target_weight > 0
        if long_to_short or short_to_long:
            # Close the existing position before taking the opposite side.
            self.Liquidate(self.spy)

        self.SetHoldings(self.spy, target_weight)