Overall Statistics
Total Orders
2169
Average Win
0.36%
Average Loss
-0.42%
Compounding Annual Return
2.358%
Drawdown
18.200%
Expectancy
0.033
Start Equity
100000
End Equity
112365.45
Net Profit
12.365%
Sharpe Ratio
-0.163
Sortino Ratio
-0.188
Probabilistic Sharpe Ratio
1.618%
Loss Rate
44%
Win Rate
56%
Profit-Loss Ratio
0.86
Alpha
-0.015
Beta
-0.031
Annual Standard Deviation
0.104
Annual Variance
0.011
Information Ratio
-0.433
Tracking Error
0.178
Treynor Ratio
0.543
Total Fees
$2395.06
Estimated Strategy Capacity
$500000000.00
Lowest Capacity Asset
NFLX SEWJWLJNHZDX
Portfolio Turnover
9.47%
Drawdown Recovery
285
# region imports
from AlgorithmImports import * 
from portfolio_optimization import PortfolioOptimization
from symbol_data import SymbolData
# endregion


class BrainMLRankPortfolio(QCAlgorithm):
    """Weekly long/short allocation over five tech stocks using Brain ML stock rankings.

    For every stock a ``SymbolData`` instance holds a PCA + SVM classifier fed
    with the 2-, 3- and 5-day Brain ranking series. On the first trading day of
    each week the classifier output decides whether the stock joins the long or
    the short basket; each basket is sized by ``PortfolioOptimization`` and the
    resulting weights are emitted as insights for
    ``InsightWeightingPortfolioConstructionModel``.
    """

    def initialize(self):
        """Configure dates, cash, data subscriptions, models and the weekly schedule."""
        self.set_start_date(self.end_date - timedelta(5*365))
        self.set_cash(100_000)
        # Look-back (in daily bars) used for scoring and for the price window.
        self._num_days = 252
        # Look-back (in daily bars) used when a model has to be (re)fitted.
        self._training_days = 5 * 252
        # Required edge over 50% accuracy; passed to every SymbolData.
        self._edge = 0.05
        # The session window must hold the look-back plus two extra entries;
        # index 0 is skipped when the window is read in _insight_generator.
        session_size = self._num_days + 2
        tickers = ["META", "AAPL", "AMZN", "NFLX", "GOOGL"]
        self._faang = []
        for ticker in tickers:
            security = self.add_equity(ticker, Resolution.DAILY)
            # Attach Brain ML ranking data feeds for 2, 3, and 5-day horizons.
            r2 = self.add_data(BrainStockRanking2Day, security).symbol
            r3 = self.add_data(BrainStockRanking3Day, security).symbol
            r5 = self.add_data(BrainStockRanking5Day, security).symbol
            security.brain_stock_rank = [r2, r3, r5]
            # Per-symbol PCA + SVM model state.
            security.symbol_data = SymbolData(security.symbol, self._edge)
            # Tradeable side ("long", "short", "both" or None); set on (re)training.
            security.side = None
            # Size the session window, then warm it up with historical trade bars.
            security.session.size = session_size
            for bar in self.history[TradeBar](security, session_size):
                security.session.update(bar)
            self._faang.append(security)
        # SPY provides the benchmark series and the fallback instrument for an empty basket.
        self._spy = self.add_equity("SPY", Resolution.DAILY)
        self.set_portfolio_construction(
            InsightWeightingPortfolioConstructionModel(lambda time: Expiry.END_OF_WEEK(time))
        )
        self._portfolio_opt = PortfolioOptimization(0, 1)
        # Generate insights on the first trading day of each week at 08:00.
        self.schedule.on(
            self.date_rules.week_start("SPY"),
            self.time_rules.at(8, 0),
            self._insight_generator
        )

    def _insight_generator(self):
        """Score every stock, build the long and short baskets and emit weekly insights."""
        benchmark_hist = self.history(self._spy, self._num_days, Resolution.DAILY)
        if benchmark_hist.empty:
            return
        benchmark = benchmark_hist["close"].unstack("symbol")
        long_price_by_symbol = {}
        short_price_by_symbol = {}
        for security in self._faang:
            symbol_data = security.symbol_data
            ranking_hist = self.history(security.brain_stock_rank, self._num_days, Resolution.DAILY)
            # Index 0 is the current in-progress bar; completed sessions start at
            # index 1. Reverse so the resulting series runs oldest -> newest.
            completed = list(security.session)[1:self._num_days + 1][::-1]
            if ranking_hist.empty or not completed:
                continue
            ranking_df = ranking_hist["rank"].unstack("symbol")
            price_series = pd.DataFrame(
                [b.close for b in completed],
                index=pd.to_datetime([b.end_time for b in completed]),
                columns=[security.symbol]
            )
            if symbol_data.check_model(ranking_df, price_series, benchmark):
                # Model is missing or below the required accuracy: refit it. Skip
                # the symbol this week when the training data is unavailable.
                if not self._refit_model(security, ranking_df, price_series, benchmark):
                    continue
            predict = symbol_data.prediction(ranking_df.iloc[-1].values.reshape(1, -1))
            if predict == 1 and security.side in ("long", "both"):
                long_price_by_symbol[security.symbol] = price_series
            elif predict == 0 and security.side in ("short", "both"):
                short_price_by_symbol[security.symbol] = price_series
        long_df, long_weights, long_beta = self._allocate_side(long_price_by_symbol, benchmark)
        short_df, short_weights, short_beta = self._allocate_side(
            short_price_by_symbol, benchmark, negate_prices=True
        )
        # NOTE(review): when both baskets are empty this emits an UP and a DOWN
        # insight on the benchmark in the same call - confirm that is intended.
        insights = []
        # Emit long insights weighted by the short side's beta, and vice versa.
        for n, col in enumerate(long_df.columns):
            insights.append(
                Insight.price(col, Expiry.END_OF_WEEK, InsightDirection.UP, weight=short_beta * long_weights[n])
            )
        for n, col in enumerate(short_df.columns):
            insights.append(
                Insight.price(col, Expiry.END_OF_WEEK, InsightDirection.DOWN, weight=long_beta * short_weights[n])
            )
        self.emit_insights(insights)

    def _refit_model(self, security, ranking_df, price_series, benchmark):
        """Refit the symbol's model on the long training window and refresh its side.

        Returns True when the model was refitted, False when any of the three
        training history requests came back empty (nothing is changed then).
        """
        rank_hist = self.history(security.brain_stock_rank, self._training_days, Resolution.DAILY)
        price_hist = self.history(security.symbol, self._training_days, Resolution.DAILY)
        bench_hist = self.history(self._spy, self._training_days, Resolution.DAILY)
        # Same guard as the scoring requests: do not index into an empty frame.
        if rank_hist.empty or price_hist.empty or bench_hist.empty:
            return False
        symbol_data = security.symbol_data
        symbol_data.update_model(
            rank_hist["rank"].unstack("symbol"),
            price_hist["close"].unstack("symbol"),
            bench_hist["close"].unstack("symbol")
        )
        # The tradeable side is evaluated on the shorter scoring window.
        security.side = symbol_data.check_bias(ranking_df, price_series, benchmark)
        return True

    def _allocate_side(self, price_by_symbol, benchmark, negate_prices=False):
        """Return ``(price_frame, weights, beta)`` for one basket.

        An empty basket falls back to the benchmark frame with weight 1 and beta 1.
        A single-asset basket gets weight 1; larger baskets are sized by the
        portfolio optimizer.
        """
        if not price_by_symbol:
            return benchmark, [1.], 1
        prices = pd.concat(price_by_symbol.values(), axis=1)
        if prices.shape[1] > 1:
            # NOTE(review): the short side passes negated prices; percentage
            # returns are unchanged by a sign flip, so this looks like a no-op.
            sizing_input = -prices if negate_prices else prices
            weights = self._portfolio_opt.calculate_position_size(sizing_input)
        else:
            weights = [1.]
        beta = self._portfolio_opt.calculate_beta(prices, weights, benchmark)
        return prices, weights, beta
# region imports
from AlgorithmImports import * 
from Portfolio.RiskParityPortfolioOptimizer import RiskParityPortfolioOptimizer # type: ignore
# endregion


class PortfolioOptimization:
    """Position sizing and beta estimation built on ``RiskParityPortfolioOptimizer``."""

    def __init__(self, min_weight=0., max_weight=1.):
        """Create the underlying optimizer.

        Both arguments are forwarded positionally to
        ``RiskParityPortfolioOptimizer`` (presumably per-asset weight bounds -
        confirm against the optimizer's documentation).
        """
        self._optimizer = RiskParityPortfolioOptimizer(min_weight, max_weight)

    def calculate_position_size(self, asset_df):
        """Return the optimizer's weights for the simple returns of ``asset_df``.

        Args:
            asset_df: Price frame, one column per asset.
        """
        returns = asset_df.pct_change().dropna()
        return self._optimizer.optimize(returns)

    def calculate_beta(self, asset_df, weights, benchmark):
        """Return the beta of the weighted asset portfolio against the benchmark.

        Args:
            asset_df: Price DataFrame, one column per asset.
            weights: One weight per column of ``asset_df``.
            benchmark: Benchmark prices, either a Series or a DataFrame whose
                first column is used.

        Returns:
            ``cov(portfolio, benchmark) / var(benchmark)`` as a float, or 1.0
            when fewer than two aligned rows exist or the result is undefined
            (zero or NaN benchmark variance, NaN covariance).
        """
        # A one-column frame would make var() return a Series, which cannot be
        # used in a boolean test; reduce the benchmark to one dimension first.
        if isinstance(benchmark, pd.DataFrame):
            if benchmark.shape[1] == 0:
                return 1.0
            benchmark = benchmark.iloc[:, 0]
        # Align asset prices and benchmark to a shared index before computing beta.
        aligned_assets, aligned_bench = asset_df.align(benchmark, join='inner', axis=0)
        if len(aligned_assets) < 2:
            return 1.0
        # The first row of pct_change() is always NaN, so drop it.
        asset_returns = aligned_assets.pct_change().iloc[1:]
        bench_returns = aligned_bench.pct_change().iloc[1:]
        bench_var = float(bench_returns.var())
        if bench_var == 0 or np.isnan(bench_var):
            return 1.0
        weight_vector = np.asarray(weights, dtype=float).reshape(-1)
        portfolio_returns = asset_returns.to_numpy(dtype=float) @ weight_vector
        cov = np.cov(portfolio_returns, bench_returns.to_numpy(dtype=float))[0, 1]
        beta = float(cov / bench_var)
        return beta if not np.isnan(beta) else 1.0
# region imports
from AlgorithmImports import * 
from pandas.tseries.holiday import USFederalHolidayCalendar
from sklearn.decomposition import PCA
from sklearn.metrics import confusion_matrix
from sklearn.svm import SVC
# endregion
# Seed NumPy's global random generator with a fixed value at import time.
# NOTE(review): presumably for reproducible model fitting - confirm it is still needed.
np.random.seed(0)


class SignalProcessor:
    """Builds weekly features/labels and fits, scores and queries a PCA + SVM classifier."""

    def _prepare_weekly_data(self, ranking_df, price_series, benchmark):
        """Turn daily rankings and prices into weekly features and 0/1 labels.

        Args:
            ranking_df: Daily ranking frame (time index, one column per feature).
            price_series: Daily price frame with a single asset column.
            benchmark: Daily price frame with a single benchmark column.

        Returns:
            ``(features, labels)``: the weekly ranking rows, and a float Series
            that is 1 where the asset's next-week log return exceeded the
            benchmark's and 0 otherwise. Rows with any missing value are dropped.
        """
        bd = pd.offsets.CustomBusinessDay(calendar=USFederalHolidayCalendar())

        def to_weekly(frame):
            # First observation of every Monday-ending week, with the label
            # moved forward one business day to align with trading dates.
            weekly = frame.resample("W-MON").first()
            weekly.index = weekly.index + bd
            return weekly

        def forward_log_return(weekly):
            # Week-over-week log return, shifted so each row holds the return
            # of the following week.
            return np.log(weekly / weekly.shift(1)).shift(-1)

        def with_date_index(frame):
            # Plain dates give the three frames a common index to join on.
            frame.index = pd.to_datetime(frame.index, utc=True).date
            return frame

        weekly_rank = with_date_index(to_weekly(ranking_df))
        asset_return = with_date_index(forward_log_return(to_weekly(price_series)))
        bench_return = with_date_index(forward_log_return(to_weekly(benchmark)))
        df = pd.concat([weekly_rank, asset_return, bench_return], axis=1).dropna()
        # Label weeks where the asset outperformed the benchmark as 1, underperformed as 0.
        active_return = df.iloc[:, -2].subtract(df.iloc[:, -1], axis=0)
        labels = (active_return > 0).astype(float)
        return df.iloc[:, :-2], labels

    def model_fitting(self, ranking_df, price_series, benchmark):
        """Fit and return a ``(pca, svm)`` pair on the weekly data."""
        features, labels = self._prepare_weekly_data(ranking_df, price_series, benchmark)
        # Dimensionality reduction via PCA to mitigate overfitting.
        pca = PCA(n_components='mle')
        reduced = pd.DataFrame(pca.fit_transform(features.fillna(0)))
        svm = SVC(kernel='rbf')
        svm.fit(reduced, labels.fillna(0))
        return pca, svm

    def model_prediction(self, ranking_data, pca, svm):
        """Return the prediction for the coming week as an int: 1 = up, 0 = down.

        Args:
            ranking_data: 2-D array-like with a single row of ranking features.
            pca: Fitted object exposing ``transform``.
            svm: Fitted object exposing ``predict``.
        """
        # Zero-fill missing rankings BEFORE the projection, as model_score and
        # model_bias do; filling afterwards cannot help because the projection
        # itself would already have seen the NaN.
        features = pd.DataFrame(ranking_data).fillna(0)
        reduced = pd.DataFrame(pca.transform(features.to_numpy()))
        # predict() returns a one-element array; take the element explicitly
        # rather than relying on implicit array-to-scalar conversion.
        return int(np.asarray(svm.predict(reduced)).ravel()[0])

    def model_score(self, ranking_df, price_series, benchmark, pca, svm):
        """Return the classifier's accuracy on the weekly data."""
        features, labels = self._prepare_weekly_data(ranking_df, price_series, benchmark)
        reduced = pd.DataFrame(pca.transform(features.fillna(0)))
        return svm.score(reduced, labels.fillna(0))

    def model_bias(self, ranking_df, price_series, benchmark, pca, svm, edge):
        """Return the side(s) on which the classifier has sufficient edge.

        Returns:
            ``"both"``, ``"long"``, ``"short"`` or ``None``. ``None`` is also
            returned when the confusion matrix has only one class.
        """
        features, labels = self._prepare_weekly_data(ranking_df, price_series, benchmark)
        reduced = pd.DataFrame(pca.transform(features.fillna(0)))
        predicted = svm.predict(reduced)
        matrix = confusion_matrix(labels.fillna(0), predicted)
        if matrix.shape[0] <= 1:
            return None
        # Compute sensitivity (recall on positives) and specificity (recall on negatives).
        # A class with no actual samples scores 0 instead of dividing by zero;
        # either way it cannot exceed the threshold.
        actual_negative = matrix[0, :].sum()
        actual_positive = matrix[1, :].sum()
        sensitivity = matrix[1, 1] / actual_positive if actual_positive else 0.0
        specificity = matrix[0, 0] / actual_negative if actual_negative else 0.0
        threshold = 0.5 + edge / 2
        # Return which side the model has sufficient edge on.
        if sensitivity > threshold:
            return "both" if specificity > threshold else "long"
        return "short" if specificity > threshold else None
# region imports
from AlgorithmImports import * 
from signal_processor import SignalProcessor
# endregion


class SymbolData:
    """Per-symbol holder of a PCA + SVM model pair; all computation is delegated to SignalProcessor."""

    def __init__(self, symbol, edge):
        """Store the symbol and edge; no model exists until ``update_model`` runs."""
        self._signal_processor = SignalProcessor()
        self.symbol, self.edge = symbol, edge
        self._pca = self._svm = None

    def check_model(self, ranking_df, price_series, benchmark):
        """Return True when the model must be (re)fitted.

        That is the case when no model has been fitted yet, or when its score
        on the supplied data is below ``0.5 + edge / 2``.
        """
        model_missing = self._pca is None or self._svm is None
        if model_missing:
            return True
        accuracy = self._signal_processor.model_score(
            ranking_df, price_series, benchmark, self._pca, self._svm
        )
        required = 0.5 + self.edge / 2
        return accuracy < required

    def update_model(self, ranking_df, price_series, benchmark):
        """Refit the model pair on the supplied training data and keep it."""
        fitted = self._signal_processor.model_fitting(ranking_df, price_series, benchmark)
        self._pca, self._svm = fitted

    def check_bias(self, ranking_df, price_series, benchmark):
        """Return the tradeable side reported by ``SignalProcessor.model_bias``."""
        processor = self._signal_processor
        return processor.model_bias(
            ranking_df, price_series, benchmark, self._pca, self._svm, self.edge
        )

    def prediction(self, ranking_data):
        """Return the current model's prediction for ``ranking_data``."""
        processor = self._signal_processor
        return processor.model_prediction(ranking_data, self._pca, self._svm)