# region imports
from AlgorithmImports import *
from universe import PiotroskiScoreUniverseSelectionModel
# endregion
class PiotroskiScoreAlgorithm(QCAlgorithm):
    '''Long-only algorithm trading the assets picked by the Piotroski F-Score universe model.'''

    def initialize(self):
        '''Configure cash, backtest period, universe, alpha, portfolio construction and execution.'''
        # Backtest capital and period.
        self.set_cash(10_000_000)
        self.set_start_date(2022, 10, 1)
        self.set_end_date(2025, 10, 1)

        # One shared date rule (first trading day of each month, keyed on SPY)
        # drives both universe selection and portfolio rebalancing.
        spy = Symbol.create('SPY', SecurityType.EQUITY, Market.USA)
        monthly_rule = self.date_rules.month_start(spy)

        # Universe settings.
        self.universe_settings.schedule.on(monthly_rule)
        self.universe_settings.resolution = Resolution.HOUR
        self.settings.rebalance_portfolio_on_insight_changes = False

        # Universe model; score threshold and universe size are algorithm
        # parameters with defaults of 7 and 100.
        score_threshold = self.get_parameter("score_threshold", 7)
        universe_size = self.get_parameter('universe_size', 100)
        universe_model = PiotroskiScoreUniverseSelectionModel(
            self, score_threshold, universe_size
        )
        self.add_universe_selection(universe_model)

        # Long-only: emit a constant "up" price insight for every selected asset.
        insight_period = timedelta(30)
        self.add_alpha(
            ConstantAlphaModel(InsightType.PRICE, InsightDirection.UP, insight_period)
        )

        # Weight sectors equally, rebalancing on the monthly rule.
        self.set_portfolio_construction(
            SectorWeightingPortfolioConstructionModel(monthly_rule)
        )

        # Avoid illiquid assets: at most a 1% spread is allowed before execution.
        self.set_execution(SpreadExecutionModel(0.01))

        # Warm up for 750 days before the start date.
        self.set_warm_up(timedelta(750))
# region imports
from AlgorithmImports import *
# endregion
class PiotroskiScore:
    '''Compute the nine-signal Piotroski F-Score from two yearly factor snapshots.

    Each ``get_*_score`` method returns a 0/1 integer signal by default. With
    ``classify=False`` it returns the underlying raw measure instead. The
    quoted passages and page numbers in the method docstrings come from the
    Piotroski paper that defines the score.

    The ``factors`` / ``old_factors`` / ``current_factors`` arguments are
    objects exposing the attributes read below (``roa``,
    ``operating_cash_flow``, ``total_assets``,
    ``long_term_debt_equity_ratio``, ``current_ratio``,
    ``ordinary_shares_number``, ``gross_margin``, ``assets_turnover``).
    A falsy value (``None`` or ``0``) is treated as "datum missing".
    '''

    def get_score(self, old_factors, current_factors):
        '''Return the total F-Score (0-9): the sum of the nine binary signals.

        Args:
            old_factors: Factor snapshot of the prior fiscal year.
            current_factors: Factor snapshot of the current fiscal year.
        '''
        return (
            self.get_r_o_a_score(current_factors)                           # ROA
            + self.get_operating_cash_flow_score(current_factors)           # CFO
            + self.get_r_o_a_change_score(old_factors, current_factors)     # ΔROA
            + self.get_accruals_score(current_factors)                      # ACCRUAL
            + self.get_leverage_score(old_factors, current_factors)         # ΔLEVER
            + self.get_liquidity_score(old_factors, current_factors)        # ΔLIQUID
            + self.get_share_issued_score(old_factors, current_factors)     # EQ_OFFER
            + self.get_gross_margin_score(old_factors, current_factors)     # ΔMARGIN
            + self.get_asset_turnover_score(old_factors, current_factors)   # ΔTURN
        )

    def get_r_o_a_score(self, factors, classify=True):
        '''Get the Profitability - Return on Assets sub-score of Piotroski F-Score

        "I define ROA ... as net income before extraordinary items ..., scaled by
        beginning of the year total assets." (p. 7)
        "Net income before extraordinary items for the fiscal year preceding
        portfolio formation scaled by total assets at the beginning of year t."
        (p. 13)

        Returns:
            1 if the ROA datum exists and is positive, else 0. With
            ``classify=False``, the raw ROA value.
        '''
        # Nearest ROA as current year data
        roa = factors.roa
        if not classify:
            return roa
        # bool() guards against int(None): a missing datum scores 0 instead
        # of raising TypeError.
        return int(bool(roa) and roa > 0)

    def get_operating_cash_flow_score(self, factors, classify=True):
        '''Get the Profitability - Operating Cash Flow sub-score of Piotroski F-Score

        "I define ... CFO as ... cash flow from operations ..., scaled by
        beginning of the year total assets." (p. 7)
        "Cash flow from operations scaled by total assets at the beginning
        of year t". (p. 13)

        Returns:
            1 if the operating cash flow datum exists and is positive, else 0.
            With ``classify=False``, the raw operating cash flow value.
        '''
        # Nearest Operating Cash Flow as current year data
        operating_cashflow = factors.operating_cash_flow
        if not classify:
            return operating_cashflow
        # bool() guards against int(None) when the datum is missing.
        return int(bool(operating_cashflow) and operating_cashflow > 0)

    def get_r_o_a_change_score(self, old_factors, current_factors, classify=True):
        '''Get the Profitability - Change in Return on Assets sub-score of Piotroski F-Score

        "I define ΔROA as the current year's ROA less the prior year's ROA. If ΔROA > 0,
        the indicator variable F_ΔROA equals one, zero otherwise." (p. 7)
        "Change in annual ROA for the year preceding portfolio formation. ΔROA is
        calculated as ROA for year t less the firm's ROA for year t-1." (p. 13)

        Returns:
            1 if ROA increased, else 0. With ``classify=False``, the raw
            change. 0 in either mode when a datum is missing.
        '''
        roa = current_factors.roa
        old_roa = old_factors.roa
        # If current or previous year's ROA data does not exist, return 0 score
        if not roa or not old_roa:
            return 0
        if not classify:
            return roa - old_roa
        # 1 score if change in ROA positive, else 0 score
        return int(roa > old_roa)

    def get_accruals_score(self, factors, classify=True):
        '''Get the Profitability - Accruals sub-score of Piotroski F-Score

        "I define the variable ACCRUAL as current year's net income before
        extraordinary items less cash flow from operations, scaled by
        beginning of the year total assets. The indicator variable
        F_ACCRUAL equals one if CFO > ROA, zero otherwise." (p. 7)
        "Net income before extraordinary items less cash flow from
        operations, scaled by total assets at the beginning of year t." (p. 13)

        Returns:
            1 if operating cash flow, total assets and ROA all exist and
            operating cash flow / total assets > ROA, else 0. With
            ``classify=False``, the raw difference
            ``operating cash flow / total assets - ROA``, or ``None`` when
            total assets is missing.
        '''
        # Nearest Operating Cash Flow, Total Assets, ROA as current year data
        operating_cashflow = factors.operating_cash_flow
        total_assets = factors.total_assets
        roa = factors.roa
        if not classify:
            return operating_cashflow / total_assets - roa if total_assets else None
        # Any missing datum scores 0 (also avoids dividing by zero/None).
        if not (operating_cashflow and total_assets and roa):
            return 0
        return int(operating_cashflow / total_assets > roa)

    def get_leverage_score(self, old_factors, current_factors, classify=True):
        '''Get the Leverage, Liquidity and Source of Funds - Change in Leverage sub-score of Piotroski F-Score

        "I measure ΔLEVER as the historical change in the ratio of total
        long-term debt to average total assets, and view an increase (decrease)
        in financial leverage as a negative (positive) signal.... I define the
        indicator variable F_ΔLEVER to equal one (zero) if the firm's leverage
        ratio fell (rose) in the year preceding portfolio formation." (p. 8)
        "Change in the firm's debt-to-assets ratio between the end of year t
        and year t-1. The debt-to-asset ratio is defined as the firm's total
        long-term debt (including the portion of long-term debt classified
        as current) scaled by average total assets." (p. 13)

        Note: this implementation compares the long-term debt to *equity*
        ratio of the two snapshots.

        Returns:
            1 if the ratio fell, else 0. With ``classify=False``, the raw
            change. 0 in either mode when a datum is missing.
        '''
        long_term_debt_ratio = current_factors.long_term_debt_equity_ratio
        old_long_term_debt_ratio = old_factors.long_term_debt_equity_ratio
        # If current or previous year's ratio does not exist, return 0 score
        if not long_term_debt_ratio or not old_long_term_debt_ratio:
            return 0
        if not classify:
            return long_term_debt_ratio - old_long_term_debt_ratio
        # 1 score if long term debt ratio is lower in the current year, else 0 score
        return int(long_term_debt_ratio < old_long_term_debt_ratio)

    def get_liquidity_score(self, old_factors, current_factors, classify=True):
        '''Get the Leverage, Liquidity and Source of Funds - Change in Liquidity sub-score of Piotroski F-Score

        "The variable ΔLIQUID measures the historical change in the firm's
        current ratio between the current and prior year, where I define the
        current ratio as the ratio of current assets to current liabilities
        at fiscal year-end. I assume that an improvement in liquidity (i.e.,
        ΔLIQUID > 0) is a good signal about the firm's ability to service
        current debt obligations. The indicator variable F_ΔLIQUID equals
        one if the firm's liquidity improved, zero otherwise." (p. 8)
        "Change in the firm's current ratio between the end of year t and
        year t-1. Current ratio is defined as total current assets divided
        by total current liabilities." (p. 13)

        Returns:
            1 if the current ratio rose, else 0. With ``classify=False``, the
            raw change. 0 in either mode when a datum is missing.
        '''
        current_ratio = current_factors.current_ratio
        old_current_ratio = old_factors.current_ratio
        # If current or previous year's current ratio data does not exist, return 0 score
        if not current_ratio or not old_current_ratio:
            return 0
        if not classify:
            return current_ratio - old_current_ratio
        # 1 score if current ratio is higher in the current year, else 0 score
        return int(current_ratio > old_current_ratio)

    def get_share_issued_score(self, old_factors, current_factors, classify=True):
        '''Get the Leverage, Liquidity and Source of Funds - Equity Offering sub-score of Piotroski F-Score

        "I define the indicator variable EQ_OFFER to equal one if the firm
        did not issue common equity in the year preceding portfolio formation,
        zero otherwise." (p. 8)

        Returns:
            1 if the number of ordinary shares did not increase, else 0. With
            ``classify=False``, the raw change in share count. 0 in either
            mode when a datum is missing.
        '''
        current_shares = current_factors.ordinary_shares_number
        old_shares = old_factors.ordinary_shares_number
        # If current or previous year's issued shares data does not exist, return 0 score
        if not current_shares or not old_shares:
            return 0
        if not classify:
            return current_shares - old_shares
        # 1 score if shares issued did not increase in the current year, else 0 score
        return int(current_shares <= old_shares)

    def get_gross_margin_score(self, old_factors, current_factors, classify=True):
        '''Get the Operating Efficiency - Change in Gross Margin sub-score of Piotroski F-Score

        "I define ΔMARGIN as the firm's current gross margin ratio (gross
        margin scaled by total sales) less the prior year's gross margin
        ratio.... The indicator variable F_ΔMARGIN equals one if ΔMARGIN
        is positive, zero otherwise." (p. 8)
        "Gross margin (net sales less cost of good sold) for the year
        preceding portfolio formation, scaled by net sales for the year,
        less the firm's gross margin (scaled by net sales) from year t-1."
        (p. 13)

        Returns:
            1 if gross margin rose, else 0. With ``classify=False``, the raw
            change. 0 in either mode when a datum is missing.
        '''
        gross_margin = current_factors.gross_margin
        old_gross_margin = old_factors.gross_margin
        # If current or previous year's gross margin data does not exist, return 0 score
        if not gross_margin or not old_gross_margin:
            return 0
        if not classify:
            return gross_margin - old_gross_margin
        # 1 score if gross margin is higher in the current year, else 0 score
        return int(gross_margin > old_gross_margin)

    def get_asset_turnover_score(self, old_factors, current_factors, classify=True):
        '''Get the Operating Efficiency - Change in Asset Turnover sub-score of Piotroski F-Score

        "I define ΔTURN as the firm's current year asset turnover ratio
        (total sales scaled by beginning of the year total assets) less
        the prior year's asset turnover ratio.... The indicator variable
        F_ΔTURN equals one if ΔTURN is positive, zero otherwise." (p. 8)
        "Change in the firm's asset turnover ratio between the end of
        year t and year t-1. The asset turnover ratio is defined as net
        sales scaled by average total assets for the year" (p. 13)

        Returns:
            1 if asset turnover rose, else 0. With ``classify=False``, the
            raw change. 0 in either mode when a datum is missing.
        '''
        asset_turnover = current_factors.assets_turnover
        old_asset_turnover = old_factors.assets_turnover
        # If current or previous year's asset turnover data does not exist, return 0 score
        if not asset_turnover or not old_asset_turnover:
            return 0
        if not classify:
            return asset_turnover - old_asset_turnover
        # 1 score if asset turnover is higher in the current year, else 0 score
        return int(asset_turnover > old_asset_turnover)

# region imports
from AlgorithmImports import *
# endregion
class PiotroskiFactors:
    '''Snapshot of the fundamental values read by the Piotroski F-Score calculation.'''

    def __init__(self, f):
        '''Copy the required ratios and statement values out of fundamental object `f`.'''
        ratios = f.operation_ratios
        statements = f.financial_statements
        balance_sheet = statements.balance_sheet

        # One-year operation ratios.
        self.roa = ratios.roa.one_year
        self.long_term_debt_equity_ratio = ratios.long_term_debt_equity_ratio.one_year
        self.current_ratio = ratios.current_ratio.one_year
        self.gross_margin = ratios.gross_margin.one_year
        self.assets_turnover = ratios.assets_turnover.one_year

        # Twelve-month financial statement values.
        self.operating_cash_flow = (
            statements.cash_flow_statement
            .cash_flow_from_continuing_operating_activities.twelve_months
        )
        self.ordinary_shares_number = balance_sheet.ordinary_shares_number.twelve_months
        self.total_assets = balance_sheet.total_assets.twelve_months

    @staticmethod
    def are_available(f):
        '''Return True only when none of the values read by `__init__` is NaN on `f`.'''
        ratios = f.operation_ratios
        statements = f.financial_statements
        # "No value is NaN" written as "not (any value is NaN)"; `or`
        # short-circuits on the first NaN found.
        return not (
            np.isnan(ratios.roa.one_year)
            or np.isnan(statements.cash_flow_statement.cash_flow_from_continuing_operating_activities.twelve_months)
            or np.isnan(ratios.long_term_debt_equity_ratio.one_year)
            or np.isnan(ratios.current_ratio.one_year)
            or np.isnan(statements.balance_sheet.ordinary_shares_number.twelve_months)
            or np.isnan(ratios.gross_margin.one_year)
            or np.isnan(ratios.assets_turnover.one_year)
            or np.isnan(statements.balance_sheet.total_assets.twelve_months)
        )

# region imports
from AlgorithmImports import *
from piotroski_score import PiotroskiScore
from pitroski_factors import PiotroskiFactors
# endregion
class SymbolData:
    '''Per-asset state: the two latest yearly factor snapshots and the F-Score built from them.'''

    # Scorer shared by all instances.
    _piotroski_score = PiotroskiScore()

    def __init__(self, fundamental):
        '''Start with no score and feed the first fundamental snapshot.'''
        self.is_ready = False
        self.score = None
        self.period_ending_date = datetime.min
        # Holds the two most recent yearly snapshots.
        self._factors = RollingWindow(2)
        self.update(fundamental)

    def update(self, fundamental):
        '''Consume a fundamental snapshot and return whether a score is available.

        A snapshot with a new twelve-month period ending date is stored in the
        window. A snapshot for the already-stored period (re)computes the
        score once the window is full and more than 150 days have passed
        since that period ended.
        '''
        latest_period_end = fundamental.financial_statements.period_ending_date.twelve_months

        # A new 10K has been released: store its factors and wait.
        if latest_period_end != self.period_ending_date:
            # If the two snapshots aren't in consecutive years, start the window over.
            # NOTE(review): `score` / `is_ready` from before the reset are kept
            # as-is here - confirm that carrying the old score is intended.
            if latest_period_end.year - self.period_ending_date.year > 1:
                self._factors.reset()
            self._factors.add(PiotroskiFactors(fundamental))
            self.period_ending_date = latest_period_end
            return self.is_ready

        # Same fiscal year as before: score it once 5 months have elapsed
        # since the fiscal year ended.
        if self._factors.is_ready:
            reporting_lag_days = 5 * 30
            elapsed = fundamental.end_time - self.period_ending_date
            if elapsed.days > reporting_lag_days:
                # Iterating the window yields the newer snapshot before the
                # older one (the original reversed the list to get old, current).
                current_factors, old_factors = list(self._factors)
                self.score = self._piotroski_score.get_score(old_factors, current_factors)
                self.is_ready = True

        return self.is_ready

# region imports
from AlgorithmImports import *
from symbol_data import SymbolData
from pitroski_factors import PiotroskiFactors
# endregion
# TODO: Add the fundamental issue we found here to the docs.
class PiotroskiScoreUniverseSelectionModel(FundamentalUniverseSelectionModel):
    '''Universe model selecting high book-to-market firms with the highest Piotroski F-Scores.'''

    def __init__(self, algorithm, threshold, universe_size=100):
        '''Create the model.

        Args:
            algorithm: Algorithm instance; used for its warm-up flag and plotting.
            threshold: Minimum F-Score a firm needs to be selected.
            universe_size: Maximum number of symbols returned per selection.
        '''
        super().__init__(self._select_assets)
        self._algorithm = algorithm
        self._threshold = threshold
        self._universe_size = universe_size
        # Per-symbol scoring state, kept across selections.
        self._symbol_data_by_symbol = {}

    def _select_assets(self, fundamentals):
        '''Return the symbols to trade for this selection.

        Steps: basic liquidity/price filter, keep the 20% of firms with the
        greatest book-to-market ratio, drop firms lacking the needed
        fundamental data, update each firm's F-Score state, then return up to
        `universe_size` symbols whose score is at least `threshold`, highest
        scores first. Returns an empty list while the algorithm is warming up.
        '''
        # We only want stocks with fundamental data, price > $1, some dollar
        # volume and a usable P/B ratio. A zero P/B ratio is excluded because
        # its reciprocal (book-to-market) is undefined and would raise
        # ZeroDivisionError in the sort below.
        candidates = [
            f for f in fundamentals
            if (f.has_fundamental_data and
                f.price > 1 and
                f.dollar_volume > 100_000 and
                not np.isnan(f.valuation_ratios.pb_ratio) and
                f.valuation_ratios.pb_ratio != 0)
        ]

        # Select the 20% of firms with the greatest Book-to-Market ratio.
        # Slice from an explicit start index: a negative-count slice would
        # turn into [-0:] (the WHOLE list) when fewer than 5 firms remain.
        top_count = int(0.2 * len(candidates))
        by_book_to_market = sorted(
            candidates, key=lambda f: 1 / f.valuation_ratios.pb_ratio
        )
        value_firms = by_book_to_market[len(by_book_to_market) - top_count:]

        # Select the firms that have the fundamental data we need.
        value_firms = [f for f in value_firms if PiotroskiFactors.are_available(f)]

        # Calculate the f-scores of all the firms that passed the preceding filters.
        f_scores = {}
        for f in value_firms:
            symbol_data = self._symbol_data_by_symbol.get(f.symbol)
            if symbol_data is None:
                # First sighting: the constructor consumes this snapshot.
                self._symbol_data_by_symbol[f.symbol] = SymbolData(f)
                continue
            if symbol_data.update(f):
                f_scores[f.symbol] = symbol_data.score

        # Wait until we have sufficient history.
        if self._algorithm.is_warming_up:
            return []

        # Modified from the paper:
        # Select stocks with the highest F-Score, capped at the universe size.
        ranked = sorted(f_scores.items(), key=lambda x: x[1], reverse=True)
        top_symbols = [
            symbol for symbol, score in ranked if score >= self._threshold
        ][:self._universe_size]

        self._algorithm.plot('F-Scores', 'Total', len(f_scores))
        self._algorithm.plot('F-Scores', 'Above Thresold', len(top_symbols))
        return top_symbols
        # Original Paper:
        # Select ALL stocks over the threshold.
        #return [symbol for symbol, fscore in f_scores.items() if fscore >= self._threshold]