| Overall Statistics |
|
Total Orders 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Start Equity 1000000 End Equity 1000000 Net Profit 0% Sharpe Ratio 0 Sortino Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio -0.448 Tracking Error 0.15 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset Portfolio Turnover 0% |
#region imports
from AlgorithmImports import *
from universe import MarksFundamentalUniverseSelectionModel
import time
#endregion
class SwingTradingAlgo(QCAlgorithm):
    """
    Swing trade based on technicals and fundamentals.

    Besides wiring up the universe and alpha models, the algorithm records how
    many securities are active each time the universe changes and, when the
    run finishes, logs the wall-clock runtime and saves a plot of that count.
    """

    def initialize(self):
        """Configure the backtest window, cash, universe model and alpha model."""
        # Wall-clock start, read back in on_end_of_algorithm to report runtime.
        self.algo_start_time = time.time()

        # Initialize backtest
        # (alternative start date: self.set_start_date(1998, 1, 1))
        self.set_start_date(2022, 1, 1)
        self.set_end_date(2024, 11, 13)
        self.set_cash(1_000_000)

        # Maps a calendar date to the number of active securities on that date.
        self.universe_symbol_count = {}

        self.universe_settings.resolution = Resolution.DAILY
        universe_model = MarksFundamentalUniverseSelectionModel(self)
        self.add_universe_selection(universe_model)

        alpha_model = ConstantAlphaModel(
            InsightType.PRICE, InsightDirection.UP, timedelta(1)
        )
        self.add_alpha(alpha_model)

    def on_date(self, slice):
        """
        Placeholder that does nothing.

        NOTE(review): the name may be a typo for ``on_data`` - confirm before
        adding logic here.
        """
        pass

    def on_securities_changed(self, changes: SecurityChanges) -> None:
        """
        Record the current number of active securities under today's date.

        A later call on the same date overwrites the earlier entry.

        Args:
            changes: Securities added to / removed from the universe (only
                referenced by the disabled debug lines below).
        """
        today = self.time.date()
        self.universe_symbol_count[today] = len(self.active_securities)
        # Debug aids (disabled):
        # for security in changes.removed_securities:
        #     self.debug(f"{self.time} - {security.symbol}")
        # if changes.added_securities or changes.removed_securities:
        #     self.debug(f"{self.time} - {len(self.active_securities)} symbols in universe.")

    def on_end_of_algorithm(self):
        """Log the total runtime and save a plot of the universe size over time."""
        runtime = time.time() - self.algo_start_time
        self.debug(f"Runtime: {runtime / 60:.2f} minutes")

        # Split the recorded history into x (dates) and y (counts) series.
        history = self.universe_symbol_count
        dates, counts = list(history), list(history.values())

        # NOTE(review): `plt` is not imported in this block; presumably it is
        # provided by the star import at the top of the file - verify.
        plt.figure(figsize=(10, 6))
        plt.plot(dates, counts, label="Number of Symbols in Universe")
        plt.title("Universe Symbol Count Over Time")
        plt.xlabel("Date")
        plt.ylabel("Number of Symbols")
        plt.legend()
        plt.grid(True)
        plt.savefig("UniverseSymbolCount.png")  # Save the plot as an image
        self.debug("Plot saved as UniverseSymbolCount.png")
# def on_delistings(self, delistings):
# ''' Delete EPS history for delisted stocks for memory efficiency '''
# for symbol, delisting in delistings.items():
# if delisting.Type == DelistingType.Delisted:
# if symbol in self.eps_data:
# del self.eps_data[symbol]
# from AlgorithmImports import *
# from collections import deque
# import numpy as np
# import time
# class MarksFundamentals(QCAlgorithm):
# def initialize(self):
# self.algo_start_time = time.time()
# # Initialize backtest
# # self.set_start_date(1998, 1, 1)
# self.set_start_date(2023, 1, 1)
# # self.set_end_date(2024, 11, 13)
# self.set_cash(100000)
# # Initialize dictionary to store fundamentals data for US equities
# self.eps_data = {}
# self.pe_data = {}
# # Track the last known EPS value for AAPL
# self.last_known_eps_entry = None # Stores the last (file_date, basic_eps) tuple for comparison
# # Initialize US equities universe
# self.universe_settings.Resolution = Resolution.Daily
# self.add_universe(self.universe_filter)
# def universe_filter(self, fundamental):
# '''
# 1. EPS
# 2. PE ratio
# 3. Daily dollar volume > $4 million
# 4. Price > $5
# '''
# def eps_pe_filter(self, eps, pe, symbol):
# '''
# Check if stock meets fundamental filter requirements.
# '''
# # Extract both file_date and EPS values for each quarter
# (q1_date, q1_eps), (q2_date, q2_eps), (q3_date, q3_eps), (q4_date, q4_eps), (q5_date, q5_eps), (q6_date, q6_eps), (q7_date, q7_eps), (q8_date, q8_eps) = eps[7], eps[6], eps[5], eps[4], eps[3], eps[2], eps[1], eps[0]
# # Ensure all quarters have positive earnings
# positive_earnings = all(q > 0 for q in [q1_eps, q2_eps, q3_eps, q4_eps, q5_eps, q6_eps, q7_eps, q8_eps])
# return positive_earnings
# Calculate the year-over-year growth for each of the last 4 quarters
# q1_annual_growth = q1_eps / q5_eps
# q2_annual_growth = q2_eps / q6_eps
# q3_annual_growth = q3_eps / q7_eps
# q4_annual_growth = q4_eps / q8_eps
# continuous_eps_growth = q1_annual_growth > q2_annual_growth > q3_annual_growth > q4_annual_growth
# def filter_eps_trends(eps):
# """
# 1. The eps annual growth percentages have increased over the past 4 quarters
# """
# q1_annual_growth = eps[7] / eps[0]
# q2_annual_growth = eps[6] / eps[1]
# q3_annual_growth = eps[5] / eps[2]
# q4_annual_growth = eps[4] / eps[3]
# consistent_eps_growth = q1_annual_growth > q2_annual_growth > q3_annual_growth > q4_annual_growth
# return consistent_eps_growth
# # Apply the filter across all symbols
# selected_symbols = []
# for symbol, group in quarters_df.groupby('symbol'):
# eps_series = group['basic_eps'].values
# # Ensure 8 quarters of data
# if len(eps_series) != 8:
# self.debug(f"Error: unknown number of quarters returned for {symbol}: {len(eps_series)}")
# # EPS trend filter
# if filter_eps_trends(eps_series):
# selected_symbols.append(symbol)
#region imports
from AlgorithmImports import *
#endregion
class MarksFundamentalUniverseSelectionModel(FundamentalUniverseSelectionModel):
    """
    Universe model that selects US equities by their quarterly EPS history.

    On every selection call, each stock that reports a quarterly basic EPS and
    a PE ratio has its latest (EPS, filing date) pair stored whenever either
    value differs from the most recently stored pair. Once
    ``QUARTERS_TRACKED`` entries exist for a symbol, it is selected when all
    stored EPS values are positive and the two newest entries are each at
    least ``MIN_ANNUAL_GROWTH`` times the entry stored ``QUARTERS_PER_YEAR``
    positions earlier.
    """

    # Number of quarterly entries kept per symbol.
    QUARTERS_TRACKED = 8
    # Offset between an entry and the one it is compared against.
    QUARTERS_PER_YEAR = 4
    # Minimum growth multiple required of each of the two newest entries.
    MIN_ANNUAL_GROWTH = 2

    def __init__(self, algorithm):
        """
        Args:
            algorithm: Algorithm instance that owns this model; used here for
                debug logging.
        """
        self.algo = algorithm
        self.algo.debug("Initialized universe model.")
        self.quarterly_eps = {}  # symbol -> RollingWindow of quarterly EPS values
        self.quarterly_file_date = {}  # symbol -> RollingWindow of filing-date strings
        super().__init__(self.select)

    def select(self, fundamental):
        """
        Selection callback registered with the base class.

        Args:
            fundamental: Iterable of Fundamental objects.

        Returns:
            List of symbols that pass the EPS filter.
        """
        # Only stocks with both a quarterly EPS and a PE ratio are tracked.
        with_data = [
            x for x in fundamental
            if not np.isnan(x.earning_reports.basic_eps.three_months)
            and not np.isnan(x.valuation_ratios.pe_ratio)
        ]
        return self.select_fundamentals(with_data)

    def select_fundamentals(self, fundamental):
        """
        Update the stored EPS history of each stock and apply the EPS filter.

        Only the EPS filter is implemented here; no PE-ratio, dollar-volume or
        price threshold is applied by this method.

        Args:
            fundamental: Iterable of Fundamental objects (already restricted
                to stocks with EPS and PE data by ``select``).

        Returns:
            List of symbols whose history is full and passes ``eps_filter``.
        """
        selected_by_fundamentals = []
        for stock in fundamental:
            symbol = stock.symbol
            file_date = str(stock.earning_reports.file_date.three_months.date())
            basic_eps = stock.earning_reports.basic_eps.three_months

            # Add symbol to the dicts if it does not exist
            if symbol not in self.quarterly_eps:
                self.quarterly_eps[symbol] = RollingWindow[float](self.QUARTERS_TRACKED)
                self.quarterly_file_date[symbol] = RollingWindow[str](self.QUARTERS_TRACKED)

            # Store new quarterly EPS data
            if self.new_quarterly_eps(symbol, basic_eps, file_date):
                self.quarterly_eps[symbol].add(basic_eps)
                self.quarterly_file_date[symbol].add(file_date)

            # Apply fundamental filter once there is enough data
            if self.quarterly_eps[symbol].is_ready and self.eps_filter(symbol, stock.end_time):
                selected_by_fundamentals.append(symbol)
        return selected_by_fundamentals

    def eps_filter(self, symbol, date):
        """
        Check a symbol's stored EPS history against the growth criteria.

        EPS filters:
          1. All ``QUARTERS_TRACKED`` stored EPS values are positive.
          2. Each of the two newest values is at least ``MIN_ANNUAL_GROWTH``
             times the value ``QUARTERS_PER_YEAR`` entries older.

        Args:
            symbol: Key into ``self.quarterly_eps``.
            date: Unused; kept so existing callers keep working.

        Returns:
            True when both criteria hold, otherwise False.
        """
        window = self.quarterly_eps[symbol]
        # Index 0 is the most recently added entry.
        eps = [window[i] for i in range(self.QUARTERS_TRACKED)]

        if not all(value > 0 for value in eps):
            return False

        # Division is safe: every value was just checked to be > 0.
        year = self.QUARTERS_PER_YEAR
        newest_growth = eps[0] / eps[year]
        previous_growth = eps[1] / eps[1 + year]
        return (
            newest_growth >= self.MIN_ANNUAL_GROWTH
            and previous_growth >= self.MIN_ANNUAL_GROWTH
        )

    def new_quarterly_eps(self, symbol, eps, file_date):
        """
        Tell whether (eps, file_date) differs from the newest stored entry.

        Args:
            symbol: Key into the per-symbol rolling windows.
            eps: Latest reported quarterly basic EPS.
            file_date: Filing date of that report, as a string.

        Returns:
            True when nothing is stored yet for the symbol, or when either
            value differs from the most recently stored one.
        """
        eps_window = self.quarterly_eps[symbol]
        # An empty window has no previous entry to compare against, so the
        # first observation is always new. Checking explicitly avoids relying
        # on what indexing an empty RollingWindow yields.
        if eps_window.count == 0:
            return True
        return eps_window[0] != eps or self.quarterly_file_date[symbol][0] != file_date