Overall Statistics
Total Orders
0
Average Win
0%
Average Loss
0%
Compounding Annual Return
0%
Drawdown
0%
Expectancy
0
Start Equity
1000000
End Equity
1000000
Net Profit
0%
Sharpe Ratio
0
Sortino Ratio
0
Probabilistic Sharpe Ratio
0%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0
Beta
0
Annual Standard Deviation
0
Annual Variance
0
Information Ratio
-0.448
Tracking Error
0.15
Treynor Ratio
0
Total Fees
$0.00
Estimated Strategy Capacity
$0
Lowest Capacity Asset
Portfolio Turnover
0%
#region imports
from AlgorithmImports import *
from universe import MarksFundamentalUniverseSelectionModel
import time
#endregion

class SwingTradingAlgo(QCAlgorithm):
    '''
    Swing trade based on technicals and fundamentals.
    '''
    def initialize(self):
        self.algo_start_time = time.time()

        # Intialize backtest
        # self.set_start_date(1998, 1, 1)
        self.set_start_date(2022, 1, 1)
        self.set_end_date(2024, 11, 13)
        self.set_cash(1_000_000)

        self.universe_symbol_count = {}

        self.universe_settings.resolution = Resolution.DAILY
        self.add_universe_selection(MarksFundamentalUniverseSelectionModel(self))

        self.add_alpha(ConstantAlphaModel(InsightType.PRICE, InsightDirection.UP, timedelta(1)))

    def on_date(self, slice):
        pass

    def on_securities_changed(self, changes: SecurityChanges) -> None:
        # Track the count of symbols in the universe each day
        self.universe_symbol_count[self.time.date()] = len(self.active_securities)
        # for security in changes.removed_securities:
        #     self.debug(f"{self.time} - {security.symbol}")
        # if changes.added_securities or changes.removed_securities:
        #     self.debug(f"{self.time} - {len(self.active_securities)} symbols in universe.")

    def on_end_of_algorithm(self):
        runtime = time.time() - self.algo_start_time
        self.debug(f"Runtime: {runtime / 60:.2f} minutes")

        # Plot the number of symbols in the universe over time
        dates = list(self.universe_symbol_count.keys())
        counts = list(self.universe_symbol_count.values())
        
        plt.figure(figsize=(10, 6))
        plt.plot(dates, counts, label="Number of Symbols in Universe")
        plt.title("Universe Symbol Count Over Time")
        plt.xlabel("Date")
        plt.ylabel("Number of Symbols")
        plt.legend()
        plt.grid(True)
        plt.savefig("UniverseSymbolCount.png")  # Save the plot as an image
        self.debug("Plot saved as UniverseSymbolCount.png")

    # def on_delistings(self, delistings):
    #     ''' Delete EPS history for delisted stocks for memory efficiency '''
    #     for symbol, delisting in delistings.items():
    #         if delisting.Type == DelistingType.Delisted:
    #             if symbol in self.eps_data:
    #                 del self.eps_data[symbol]
# from AlgorithmImports import *
# from collections import deque
# import numpy as np
# import time

# class MarksFundamentals(QCAlgorithm):
#     def initialize(self):
#         self.algo_start_time = time.time()

#         # Intialize backtest
#         # self.set_start_date(1998, 1, 1)
#         self.set_start_date(2023, 1, 1)
#         # self.set_end_date(2024, 11, 13)
#         self.set_cash(100000)

#         # Initialize dictionary to store fundamentals data for US equities
#         self.eps_data = {}
#         self.pe_data = {}

#         # Track the last known EPS value for AAPL
#         self.last_known_eps_entry = None  # Stores the last (file_date, basic_eps) tuple for comparison

#         # Initialize US equities universe
#         self.universe_settings.Resolution = Resolution.Daily
#         self.add_universe(self.universe_filter)
        
#     def universe_filter(self, fundamental):
#         '''
#         1. EPS
        # 2. PE ratio
        # 3. Daily dollar volume > $4 million
        # 4. Price > $5
#         '''


    # def eps_pe_filter(self, eps, pe, symbol):
    #     '''
    #     Check if stock meets fundamental filter requirements.
    #     '''
    #     # Extract both file_date and EPS values for each quarter
    #     (q1_date, q1_eps), (q2_date, q2_eps), (q3_date, q3_eps), (q4_date, q4_eps), (q5_date, q5_eps), (q6_date, q6_eps), (q7_date, q7_eps), (q8_date, q8_eps) = eps[7], eps[6], eps[5], eps[4], eps[3], eps[2], eps[1], eps[0]

    #     # Ensure all quarters have positive earnings
    #     positive_earnings = all(q > 0 for q in [q1_eps, q2_eps, q3_eps, q4_eps, q5_eps, q6_eps, q7_eps, q8_eps])

    #     return positive_earnings

        # Calculate the year-over-year growth for each of the last 4 quarters
        # q1_annual_growth = q1_eps / q5_eps
        # q2_annual_growth = q2_eps / q6_eps
        # q3_annual_growth = q3_eps / q7_eps
        # q4_annual_growth = q4_eps / q8_eps

        # continuous_eps_growth = q1_annual_growth > q2_annual_growth > q3_annual_growth > q4_annual_growth


# def filter_eps_trends(eps):
#     """
#     1. The eps annual growth percentages have increased over the past 4 quarters
#     """
#     q1_annual_growth = eps[7] / eps[0]
#     q2_annual_growth = eps[6] / eps[1]
#     q3_annual_growth = eps[5] / eps[2]
#     q4_annual_growth = eps[4] / eps[3]

#     consistent_eps_growth = q1_annual_growth > q2_annual_growth > q3_annual_growth > q4_annual_growth
#     return consistent_eps_growth

# # Apply the filter across all symbols
# selected_symbols = []
# for symbol, group in quarters_df.groupby('symbol'):
#     eps_series = group['basic_eps'].values
#     # Ensure 8 quarters of data
#     if len(eps_series) != 8:
#         self.debug(f"Error: unknown number of quarters returned for {symbol}: {len(eps_series)}")
#     # EPS trend filter
#     if filter_eps_trends(eps_series):
#         selected_symbols.append(symbol)
#region imports
from AlgorithmImports import *
#endregion

class MarksFundamentalUniverseSelectionModel(FundamentalUniverseSelectionModel):
    """
    This universe model refreshes daily to contain US equities filtered by EPS trends and PE ratios.
    """
    def __init__(self, algorithm):
        self.algo = algorithm
        self.algo.debug("Initialized universe model.")
        self.quarterly_eps = {}           # Contains RollingWindow objects of EPS for every stock
        self.quarterly_file_date = {}    # Contains RollingWindow objects of filedates for every stock

        super().__init__(self.select)

    def select(self, fundamental):
        """
        1. Eight quarters of positive earnings

        Input:
            - fundamental: List of Fundamental objects
        """

        # Initial filter for stocks with available fundamental data
        filtered_stocks = [x for x in fundamental if not np.isnan(x.earning_reports.basic_eps.three_months) and not np.isnan(x.valuation_ratios.pe_ratio)]
        # self.algorithm.debug(f"Initial filter: {len(filtered_stocks)}")

        selected_by_fundamentals = self.select_fundamentals(filtered_stocks)
        # self.algorithm.debug(f"Fundamental filter: {len(selected_by_fundamentals)}")

        # if len(selected_by_fundamentals) > 0:
        #     self.algo.debug(f"{self.algo.time} - {len(selected_by_fundamentals)} filtered symbols")

        return selected_by_fundamentals

    def select_fundamentals(self, fundamental):
        """
        1. EPS
        2. PE ratio
        3. Daily dollar volume > $4 million
        4. Price > $5
        """
        selected_by_fundamentals = []
        
        for stock in fundamental:
            symbol = stock.symbol
            file_date = str(stock.earning_reports.file_date.three_months.date())
            basic_eps = stock.earning_reports.basic_eps.three_months
            pe_ratio  = stock.valuation_ratios.pe_ratio
            date = stock.end_time

            # Add symbol to the dict if it does not exist
            if symbol not in self.quarterly_eps:
                self.quarterly_eps[symbol] = RollingWindow[float](8)
                self.quarterly_file_date[symbol] = RollingWindow[str](8)

            # Store new quarterly EPS data
            if self.new_quarterly_eps(symbol, basic_eps, file_date):
                self.quarterly_eps[symbol].add(basic_eps)
                self.quarterly_file_date[symbol].add(file_date)

            # Apply fundamental filter once there is enough data
            if self.quarterly_eps[symbol].is_ready:
                if self.eps_filter(symbol, date):
                    selected_by_fundamentals.append(symbol)

        return selected_by_fundamentals

    def eps_filter(self, symbol, date):
        """
        EPS Filters:
        1. Eight quarters of positive earnings
        2. Four quarters of increasing EPS annual growth
        """
        eps_data = self.quarterly_eps[symbol]
        q1_eps, q2_eps, q3_eps, q4_eps = eps_data[0], eps_data[1], eps_data[2], eps_data[3]
        q5_eps, q6_eps, q7_eps, q8_eps = eps_data[4], eps_data[5], eps_data[6], eps_data[7]

        date_data = self.quarterly_file_date[symbol]
        q1_date, q2_date, q3_date, q4_date, q5_date, q6_date, q7_date, q8_date = date_data[0], date_data[1], date_data[2], date_data[3], date_data[4], date_data[5], date_data[6], date_data[7]

        # Check that all EPS values are positive
        positive_earnings = all(eps > 0 for eps in [q1_eps, q2_eps, q3_eps, q4_eps, q5_eps, q6_eps, q7_eps, q8_eps])

        # consistent_growth = 
        if positive_earnings:
            q1_annual_growth = q1_eps / q5_eps
            q2_annual_growth = q2_eps / q6_eps
            q3_annual_growth = q3_eps / q7_eps
            q4_annual_growth = q4_eps / q8_eps

            exponential_growth = q1_annual_growth >= 2 and q2_annual_growth >= 2
        else:
            exponential_growth = False

        # try:
        # except:
        #     self.algo.debug([q1_eps, q2_eps, q3_eps, q4_eps, q5_eps, q6_eps, q7_eps, q8_eps])
        #     self.algo.quit()

        # consistent_growth = q1_annual_growth > q3_annual_growth > q4_annual_growth and q2_annual_growth > q3_annual_growth > q4_annual_growth

        # if positive_earnings and exponential_growth:
        #     # Print the symbol and its EPS values
        #     eps_values = [q1_eps, q2_eps, q3_eps, q4_eps, q5_eps, q6_eps, q7_eps, q8_eps]
        #     date_values = [q1_date, q2_date, q3_date, q4_date, q5_date, q6_date, q7_date, q8_date]
        #     both_values = [[date, eps] for date, eps in zip(date_values, eps_values)]

        #     self.algo.debug(f"{date} - Symbol: {symbol}")
        #     self.algo.debug(f"{both_values}")
        
        return positive_earnings and exponential_growth

    def new_quarterly_eps(self, symbol, eps, file_date):
        return self.quarterly_eps[symbol][0] != eps or self.quarterly_file_date[symbol][0] != file_date