Overall Statistics
Total Orders
1384
Average Win
0.68%
Average Loss
-0.56%
Compounding Annual Return
27.833%
Drawdown
32.900%
Expectancy
0.682
Start Equity
1000000
End Equity
9131981.96
Net Profit
813.198%
Sharpe Ratio
1.039
Sortino Ratio
1.093
Probabilistic Sharpe Ratio
60.125%
Loss Rate
24%
Win Rate
76%
Profit-Loss Ratio
1.22
Alpha
0.096
Beta
0.962
Annual Standard Deviation
0.172
Annual Variance
0.03
Information Ratio
0.966
Tracking Error
0.096
Treynor Ratio
0.186
Total Fees
$17952.67
Estimated Strategy Capacity
$230000000.00
Lowest Capacity Asset
LOW R735QTJ8XC9X
Portfolio Turnover
2.39%
Drawdown Recovery
448
from AlgorithmImports import *
from plot import SignalDiagnosticsState


class TextualStabilityAlphaModel(AlphaModel):
    
    def __init__(self, algorithm, signal_model, plotter, spy, rank_fraction, minimum_selected_count, target_gross_exposure):
        # Store the framework collaborators and strategy parameters.
        self._algorithm = algorithm
        self._signal_model = signal_model
        self._plotter = plotter
        self._spy = spy
        self._rank_fraction = rank_fraction
        self._minimum_selected_count = minimum_selected_count
        self._target_gross_exposure = target_gross_exposure
        self._security_by_ticker = {}
        self._insights = []
        self._current_target_symbols = []
        # Add a Scheduled Event to emit monthly insights on the first SPY trading day reached each month.
        algorithm.schedule.on(
            algorithm.date_rules.month_start(self._spy), 
            algorithm.time_rules.after_market_open(self._spy, 30), 
            self._create_insights
        )

    def update(self, _algorithm, _data):
        insights = self._insights
        self._insights = []
        return insights

    def on_securities_changed(self, _algorithm, changes):
        # Track added equity securities by filing ticker and install the original fee model.
        for security in changes.added_securities:
            if security.symbol == self._spy or security.symbol.security_type != SecurityType.EQUITY:
                continue
            self._security_by_ticker[str(security.symbol.value).upper().replace("-", ".")] = security
        # Remove securities that leave the manual universe from active lookup state.
        for security in changes.removed_securities:
            ticker = str(security.symbol.value).upper().replace("-", ".")
            if self._security_by_ticker.get(ticker) == security:
                del self._security_by_ticker[ticker]
            if security.symbol in self._current_target_symbols:
                self._current_target_symbols.remove(security.symbol)

    def _create_insights(self):
        # Select tradable active filings using the original point-in-time ranking rule.
        selection = self._signal_model.select_active_filings(self._algorithm.time.date(), self._security_by_ticker, self._rank_fraction, self._minimum_selected_count)
        if selection.active_count < self._minimum_selected_count or not selection.has_selected_securities():
            self._queue_flat_insights(selection)
            return
        # Emit equal-weight long insights for selected securities and flat insights for dropped holdings.
        selected_symbols = [security.symbol for security in selection.change_by_security]
        equal_weight = self._target_gross_exposure / selection.selected_count()
        insights = []
        for security in selection.change_by_security:
            insights.append(Insight.price(security.symbol, timedelta(days=45), InsightDirection.UP, weight=equal_weight))
        for symbol in sorted(self._current_target_symbols, key=lambda x: x.value):
            if symbol in selected_symbols:
                continue
            insights.append(Insight.price(symbol, timedelta(days=45), InsightDirection.FLAT, weight=0))
        self._insights = insights
        self._current_target_symbols = selected_symbols
        self._plotter.plot_selection(SignalDiagnosticsState(selection.active_count, selection.selected_count(), selection.average_change(), selection.selection_cutoff(), equal_weight * selection.selected_count()))

    def _queue_flat_insights(self, selection):
        # Emit flat insights for previously selected holdings when no selected security is tradable.
        insights = []
        for symbol in sorted(self._current_target_symbols, key=lambda x: x.value):
            insights.append(Insight.price(symbol, timedelta(days=45), InsightDirection.FLAT, weight=0))
        self._insights = insights
        self._current_target_symbols = []
        self._plotter.plot_selection(SignalDiagnosticsState(selection.active_count, selection.selected_count(), selection.average_change(), selection.selection_cutoff(), 0.0))
# region imports
from AlgorithmImports import *
from alpha import TextualStabilityAlphaModel
from models import FilingSignalModel
from plot import SignalDiagnosticsPlotter
# endregion


class TimingAwareTextualStabilityAlgorithm(QCAlgorithm):
    def initialize(self):
        self.set_start_date(2016, 1, 1)
        self.set_end_date(2024, 12, 31)
        self.set_cash(1_000_000)
        self.universe_settings.resolution = Resolution.DAILY
        spy = self.add_equity("SPY", Resolution.DAILY)
        # Load the point-in-time filing signal model from the public CSV.
        self._signal_model = FilingSignalModel(self.download("https://raw.githubusercontent.com/EmilySun0920/Event-Signal/main/qc_event_signals_21_120.csv"))
        self.add_universe_selection(ManualUniverseSelectionModel(
            [Symbol.create(ticker, SecurityType.EQUITY, Market.USA) for ticker in self._signal_model.tickers]
            )
        )
        # Add the framework alpha model with the original ranking and exposure parameters.
        self.add_alpha(TextualStabilityAlphaModel(self, self._signal_model, SignalDiagnosticsPlotter(self), spy, 0.30, 3, 1.00))
        self.set_portfolio_construction(InsightWeightingPortfolioConstructionModel(lambda time: None, PortfolioBias.LONG))
# region imports
from AlgorithmImports import *
import io
# endregion


class FilingSelection:
    def __init__(self, active_count, change_by_security):
        self.active_count = active_count
        self.change_by_security = change_by_security

    def selected_count(self):
        # Count the tradable securities that survived selection.
        return len(self.change_by_security)

    def average_change(self):
        if not self.change_by_security:
            return 0.0
        # Average the selected textual change values for diagnostics.
        return float(np.mean(list(self.change_by_security.values())))

    def selection_cutoff(self):
        if not self.change_by_security:
            return 0.0
        # Use the largest selected change value as the cutoff diagnostic.
        return float(np.max(list(self.change_by_security.values())))

    def has_selected_securities(self):
        return self.selected_count() > 0


class FilingSignalModel:
    _required_columns = ["ticker", "filing_date", "signal_available_date", "entry_date", "exit_date", "similarity", "change", "log_change"]

    def __init__(self, filing_signal_csv):
        # Parse valid point-in-time filing events and cache their tickers.
        self.events = self._parse_events(filing_signal_csv)
        self.tickers = sorted(self.events["ticker"].drop_duplicates().tolist())

    def _parse_events(self, filing_signal_csv):
        # Load the filing signal CSV into a typed event table.
        events = pd.read_csv(io.StringIO(filing_signal_csv))
        missing_columns = [column for column in self._required_columns if column not in events.columns]
        # Convert filing timing columns to timestamps.
        for column in ["filing_date", "signal_available_date", "entry_date", "exit_date"]:
            events[column] = pd.to_datetime(events[column], errors="coerce")
        # Convert signal columns to numeric values.
        for column in ["similarity", "change", "log_change"]:
            events[column] = pd.to_numeric(events[column], errors="coerce")
        events["ticker"] = events["ticker"].astype(str).str.strip().str.upper().str.replace("-", ".", regex=False)
        events = events.dropna(subset=self._required_columns).copy()
        events = events.loc[(events["filing_date"] <= events["signal_available_date"]) & (events["signal_available_date"] <= events["entry_date"]) & (events["entry_date"] < events["exit_date"])].sort_values(["entry_date", "ticker"]).reset_index(drop=True)
        return events

    def select_active_filings(self, current_date, security_by_ticker, rank_fraction, minimum_selected_count):
        current_date = pd.Timestamp(current_date)
        # Select filing events whose entry and exit windows contain the current date.
        active_events = self.events[(self.events["entry_date"] <= current_date) & (self.events["exit_date"] >= current_date)].copy()
        # Keep the latest active filing per ticker when duplicate filings overlap.
        if not active_events.empty:
            active_events = active_events.sort_values(["ticker", "filing_date"]).drop_duplicates(subset=["ticker"], keep="last").reset_index(drop=True)
        active_events = active_events[active_events["ticker"].isin(security_by_ticker)].copy()
        # Return an empty selection when too few active filings exist.
        if len(active_events) < minimum_selected_count:
            return FilingSelection(len(active_events), {})
        # Rank active firms by lowest textual change and ticker tie-breaker.
        active_events = active_events.sort_values(["change", "ticker"]).reset_index(drop=True)
        selected_count = math.floor(len(active_events) * rank_fraction)
        selected_count = max(selected_count, minimum_selected_count)
        selected_count = min(selected_count, len(active_events))
        change_by_security = {}
        for event in active_events.head(selected_count).itertuples(index=False):
            security = security_by_ticker[event.ticker]
            if not security.has_data or not security.is_tradable:
                continue
            change_by_security[security] = float(event.change)
        return FilingSelection(len(active_events), change_by_security)
# region imports
from AlgorithmImports import *
# endregion


class SignalDiagnosticsState:
    def __init__(self, active_count, selected_count, average_selected_change, selection_cutoff, gross_target_exposure):
        self.active_count = active_count
        self.selected_count = selected_count
        self.average_selected_change = average_selected_change
        self.selection_cutoff = selection_cutoff
        self.gross_target_exposure = gross_target_exposure


class SignalDiagnosticsPlotter:
    def __init__(self, algorithm):
        # Store the algorithm so plotting remains isolated from alpha logic.
        self._algorithm = algorithm

    def _plot(self, chart_name, series):
        # Plot each named series on the requested chart.
        for name, value in series:
            self._algorithm.plot(chart_name, name, value)

    def plot_selection(self, state):
        # Plot monthly filing availability and portfolio construction diagnostics.
        self._plot("Signal Availability", [("Active Events", state.active_count), ("Selected Firms", state.selected_count)])
        self._plot("Signal Construction", [("Average Selected Change", state.average_selected_change), ("Selection Cutoff", state.selection_cutoff), ("Gross Target Exposure", state.gross_target_exposure)])