Overall Statistics
# region imports
from AlgorithmImports import *
from universe import PiotroskiScoreUniverseSelectionModel
# endregion


class PiotroskiScoreAlgorithm(QCAlgorithm):

    def initialize(self):
        # Algorithm cash and period setup
        self.set_cash(10_000_000)
        self.set_start_date(2022, 10, 1)
        self.set_end_date(2025, 10, 1)
        
        # Configure settings to rebalance monthly.
        rebalance_date = self.date_rules.month_start(Symbol.create('SPY', SecurityType.EQUITY, Market.USA))
        
        # Universe settings
        self.universe_settings.schedule.on(rebalance_date)
        self.universe_settings.resolution = Resolution.HOUR
        self.settings.rebalance_portfolio_on_insight_changes = False
        self.add_universe_selection(PiotroskiScoreUniverseSelectionModel(
            self,
            self.get_parameter("score_threshold", 7),
            self.get_parameter('universe_size', 100)
        ))

        # Long only strategy on selected assets
        self.add_alpha(ConstantAlphaModel(InsightType.PRICE, InsightDirection.UP, timedelta(30)))
        # Weight sectors equally
        self.set_portfolio_construction(SectorWeightingPortfolioConstructionModel(rebalance_date))
        # Avoid illiquid assets. Maximum 1% spread allowed before execution 
        self.set_execution(SpreadExecutionModel(0.01))
        self.set_warm_up(timedelta(750))
# region imports
from AlgorithmImports import *
# endregion


class PiotroskiScore:

    def get_score(self, old_factors, current_factors):
        return (
            self.get_r_o_a_score(current_factors)   # ROA
            + self.get_operating_cash_flow_score(current_factors)  # CFO
            + self.get_r_o_a_change_score(old_factors, current_factors)  # ΔROA
            + self.get_accruals_score(current_factors)  # ACCRUAL
            + self.get_leverage_score(old_factors, current_factors)  # ΔLEVER
            + self.get_liquidity_score(old_factors, current_factors)  # ΔLIQUID
            + self.get_share_issued_score(old_factors, current_factors)  # EQ_OFFER
            + self.get_gross_margin_score(old_factors, current_factors)  # ΔMARGIN
            + self.get_asset_turnover_score(old_factors, current_factors)  # ΔTURN
        )

    def get_r_o_a_score(self, factors, classify=True):
        '''Get the Profitability - Return of Asset sub-score of Piotroski F-Score
        
        "I define ROA ... as net income before extraordinary items ..., scaled by 
        beginning of the year total assets." (p. 7)

        "Net income before extraordinary items for the fiscal year preceding 
        portfolio formation scaled by total assets at the beginning of year t."
        (p. 13)
        '''
        # Nearest ROA as current year data
        roa = factors.roa
        if not classify:
            return roa
        # 1 score if ROA datum exists and positive, else 0
        return int(roa and roa > 0)

    def get_operating_cash_flow_score(self, factors, classify=True):
        '''Get the Profitability - Operating Cash Flow sub-score of Piotroski F-Score
        
        "I define ... CRO as ... cash flow from operations ..., scaled by 
        beginning of the year total assets." (p. 7)

        "Cash flow from operations scaled by total assets at the beginning 
        of year t". (p. 13)
        '''
        # Nearest Operating Cash Flow as current year data
        operating_cashflow = factors.operating_cash_flow
        if not classify:
            return operating_cashflow
        # 1 score if operating cash flow datum exists and positive, else 0
        return int(operating_cashflow and operating_cashflow > 0)

    def get_r_o_a_change_score(self, old_factors, current_factors, classify=True):
        '''Get the Profitability - Change in Return of Assets sub-score of Piotroski F-Score
        
        "I define ΔROA as the current year's ROA less the prior year's ROA. If ΔROA > 0,
        the indicator variable F_ΔROA equals one, zero otherwise." (p. 7)

        "Change in annual ROA for the year preceding portfolio formation. ΔROA is
        calculated as ROA for year t less the firm's ROA for year t-1." (p. 13)
        '''
        # if current or previous year's ROA data does not exist, return 0 score
        roa = current_factors.roa
        old_roa = old_factors.roa
        if not roa or not old_roa:
            return 0
        if not classify:
            return roa - old_roa
        # 1 score if change in ROA positive, else 0 score
        return int(roa > old_roa)

    def get_accruals_score(self, factors, classify=True):
        '''Get the Profitability - Accruals sub-score of Piotroski F-Score
        
        "I define the variable ACCRUAL as current year's net income before 
        extraordinary items less cash flow from operations, scaled by 
        beginning of the year total assets. The indicator variable 
        F_ACCRUAL equals one if CFO  ROA, zero otherwise." (p. 7)

        "Net income before extraordinary items less cash flow from 
        operations, scaled by total assets at the beginning of year t." (p. 13)
        '''
        # Nearest Operating Cash Flow, Total Assets, ROA as current year data
        operating_cashflow = factors.operating_cash_flow
        total_assets = factors.total_assets
        roa = factors.roa
        if not classify:
            return operating_cashflow / total_assets - roa if total_assets else None
        # 1 score if operating cash flow, total assets and ROA exists, and operating cash flow / total assets > ROA, else 0
        if int(operating_cashflow and total_assets and roa and operating_cashflow / total_assets > roa):
            a = 1
        return int(operating_cashflow and total_assets and roa and operating_cashflow / total_assets > roa)

    def get_leverage_score(self, old_factors, current_factors, classify=True):
        '''Get the Leverage, Liquidity and Source of Funds - Change in Leverage sub-score of Piotroski F-Score
        
        "I measure ΔLEVER as the historical change in the ratio of total 
        long-term debt to average total assets, and view an increase (decrease)
        in financial leverage as a negative (positive) signal.... I define the
        indicator variable F_ΔLEVER to equal one (zero) if the firm's leverage 
        ratio fell (rose) in the year preceding portfolio formation." (p. 8)


        "Change in the firm's debt-to-assets ratio between the end of year t 
        and year t-1. The debt-to-asset ratio is defined as the firm's total
        long-term debt (including the portion of long-term debt classified 
        as current) scaled by average total assets." (p. 13)
        '''
        # if current or previous year's long term debt to equity ratio data does not exist, return 0 score
        long_term_debt_ratio = current_factors.long_term_debt_equity_ratio
        old_long_term_debt_ratio = old_factors.long_term_debt_equity_ratio
        if not long_term_debt_ratio or not old_long_term_debt_ratio:
            return 0
        if not classify:
            return long_term_debt_ratio - old_long_term_debt_ratio
        # 1 score if long term debt ratio is lower in the current year, else 0 score
        return int(long_term_debt_ratio < old_long_term_debt_ratio)

    def get_liquidity_score(self, old_factors, current_factors, classify=True):
        '''Get the Liquidity score
        
        "The variable ΔLIQUID measures the historical change in the firm's 
        current ratio between the current and prior year, where I define the 
        current ratio as the ratio of current assets to current liabilities 
        at fiscal year-end. I assume that an improvement in liquidity (i.e., 
        ΔLIQUID > 0) is a good signal about the firm's ability to service 
        current debt obligations. The indicator variable F_ΔLIQUID equals 
        one if the firm's liquidity improved, zero otherwise." (p. 8)

        "Change in the firm's current ratio between the end of year t and 
        year t-1. Current ratio is defined as total current assets divided 
        by total current liabilities." (p. 13)
        '''
        # if current or previous year's current ratio data does not exist, return 0 score
        current_ratio = current_factors.current_ratio
        old_current_ratio = old_factors.current_ratio
        if not current_ratio or not old_current_ratio:
            return 0
        if not classify:
            return current_ratio - old_current_ratio
        # 1 score if current ratio is higher in the current year, else 0 score
        return int(current_ratio > old_current_ratio)

    def get_share_issued_score(self, old_factors, current_factors, classify=True):
        '''Get the share issued score
        
        "I define the indicator variable EQ_OFFER to equal one if the firm 
        did not issue common equity in the year preceding portfolio formation, 
        zero otherwise." (p. 8)
        '''
        # if current or previous year's issued shares data does not exist, return 0 score
        current_shares = current_factors.ordinary_shares_number
        old_shares = old_factors.ordinary_shares_number
        if not current_shares or not old_shares:
            return 0
        if not classify:
            return current_shares - old_shares
        # 1 score if shares issued did not increase in the current year, else 0 score
        return int(current_shares <= old_shares)

    def get_gross_margin_score(self, old_factors, current_factors, classify=True):
        '''Get the gross margin score
        
        "I define ΔMARGIN as the firm's current gross margin ratio (gross 
        margin scaled by total sales) less the prior year's gross margin 
        ratio.... The indicator variable F_ΔMARGIN equals one if ΔMARGIN 
        is positive, zero otherwise." (p. 8)

        "Gross margin (net sales less cost of good sold) for the year 
        preceding portfolio formation, scaled by net sales for the year, 
        less the firm's gross margin (scaled by net sales) from year t-1." 
        (p. 13)
        '''
        # if current or previous year's gross margin data does not exist, return 0 score
        gross_margin = current_factors.gross_margin
        old_gross_margin = old_factors.gross_margin
        if not gross_margin or not old_gross_margin:
            return 0
        if not classify:
            return gross_margin - old_gross_margin
        # 1 score if gross margin is higher in the current year, else 0 score
        return int(gross_margin > old_gross_margin)

    def get_asset_turnover_score(self, old_factors, current_factors, classify=True):
        '''Get the asset turnover score
        
        "I define ΔTURN as the firm's current year asset turnover ratio 
        (total sales scaled by beginning of the year total assets) less
        the prior year's asset turnover ratio.... The indicator variable
        F_ΔTURN equals one if ΔTURN is positive, zero otherwise." (p. 8)

        "Change in the firm's asset turnover ratio between the end of 
        year t and year t-1. The asset turnover ratio is defined as net 
        sales scaled by average total assets for the year" (p. 13)
        '''
        # if current or previous year's asset turnover data does not exist, return 0 score
        asset_turnover = current_factors.assets_turnover
        old_asset_turnover = old_factors.assets_turnover
        if not asset_turnover or not old_asset_turnover:
            return 0
        if not classify:
            return asset_turnover - old_asset_turnover
        # 1 score if asset turnover is higher in the current year, else 0 score
        return int(asset_turnover > old_asset_turnover)
# region imports
from AlgorithmImports import *
# endregion


class PiotroskiFactors:

    def __init__(self, f):
        self.roa = f.operation_ratios.roa.one_year
        self.operating_cash_flow = f.financial_statements.cash_flow_statement.cash_flow_from_continuing_operating_activities.twelve_months
        self.long_term_debt_equity_ratio = f.operation_ratios.long_term_debt_equity_ratio.one_year
        self.current_ratio = f.operation_ratios.current_ratio.one_year
        self.ordinary_shares_number = f.financial_statements.balance_sheet.ordinary_shares_number.twelve_months
        self.gross_margin = f.operation_ratios.gross_margin.one_year
        self.assets_turnover = f.operation_ratios.assets_turnover.one_year
        self.total_assets = f.financial_statements.balance_sheet.total_assets.twelve_months

    @staticmethod
    def are_available(f):
        return (
            not np.isnan(f.operation_ratios.roa.one_year) and
            not np.isnan(f.financial_statements.cash_flow_statement.cash_flow_from_continuing_operating_activities.twelve_months) and
            not np.isnan(f.operation_ratios.long_term_debt_equity_ratio.one_year) and
            not np.isnan(f.operation_ratios.current_ratio.one_year) and
            not np.isnan(f.financial_statements.balance_sheet.ordinary_shares_number.twelve_months) and
            not np.isnan(f.operation_ratios.gross_margin.one_year) and 
            not np.isnan(f.operation_ratios.assets_turnover.one_year) and
            not np.isnan(f.financial_statements.balance_sheet.total_assets.twelve_months)
        )
# region imports
from AlgorithmImports import *

from piotroski_score import PiotroskiScore
from pitroski_factors import PiotroskiFactors
# endregion


class SymbolData:

    _piotroski_score = PiotroskiScore()

    def __init__(self, fundamental):
        self.is_ready = False
        self.score = None
        self.period_ending_date = datetime.min
        self._factors = RollingWindow(2)
        self.update(fundamental)
    
    def update(self, fundamental):
        period_ending_date = fundamental.financial_statements.period_ending_date.twelve_months
        # If a new 10K hasn't been released yet...
        if self.period_ending_date == period_ending_date:
            # If it's been 5 months since the previous fiscal year, rebalance this asset.
            if (self._factors.is_ready and 
                (fundamental.end_time - self.period_ending_date).days > 5*30):
                (old_factors, current_factors) = list(self._factors)[::-1]
                self.score = self._piotroski_score.get_score(old_factors, current_factors)
                self.is_ready = True
        # When a new 10K is released, add the fundamental data to the RollingWindow.
        else:
            # If these two fundamental snap-shots aren't in consecutive years, reset the RollingWindow.
            if period_ending_date.year - self.period_ending_date.year > 1:
                self._factors.reset()
            self._factors.add(PiotroskiFactors(fundamental))
            self.period_ending_date = period_ending_date
        return self.is_ready
# region imports
from AlgorithmImports import *

from symbol_data import SymbolData
from pitroski_factors import PiotroskiFactors
# endregion


# TODO: Add the fundamental issue we found here to the docs.

class PiotroskiScoreUniverseSelectionModel(FundamentalUniverseSelectionModel):

    def __init__(self, algorithm, threshold, universe_size=100):
        super().__init__(self._select_assets)
        self._algorithm = algorithm
        self._threshold = threshold
        self._universe_size = universe_size
        self._symbol_data_by_symbol = {}

    def _select_assets(self, fundamentals):
        # We only want stocks with fundamental data and price > $1
        filtered = [
            f for f in fundamentals
            if (f.has_fundamental_data and 
                f.price > 1 and 
                f.dollar_volume > 100_000 and 
                not np.isnan(f.valuation_ratios.pb_ratio))
        ]

        # Select the 20% of firms with the greatest Book-to-Market ratio.
        filtered = sorted(
            filtered, key=lambda f: 1 / f.valuation_ratios.pb_ratio
        )[-int(0.2*len(filtered)):]

        # Select the firms that have the fundamental data we need.
        filtered = [f for f in filtered if PiotroskiFactors.are_available(f)]
        
        # Calculate the f-scores of all the firms that passed the preceding filters.
        f_scores = {}
        for f in filtered:
            if f.symbol not in self._symbol_data_by_symbol:
                self._symbol_data_by_symbol[f.symbol] = SymbolData(f)
            else:
                symbol_data = self._symbol_data_by_symbol[f.symbol]
                if symbol_data.update(f):
                    f_scores[f.symbol] = symbol_data.score
        
        # Wait until we have sufficient history.
        if self._algorithm.is_warming_up:
            return []

        # Modified:
        # Select stocks with the highest F-Score, and take the top 100:
        top_symbols = [
            symbol for symbol, score in sorted(
                f_scores.items(), key=lambda x: x[1], reverse=True
            ) if score >= self._threshold
        ][:self._universe_size]

        self._algorithm.plot('F-Scores', 'Total', len(f_scores))
        self._algorithm.plot('F-Scores', 'Above Thresold', len(top_symbols))

        return top_symbols 

        # Original Paper:
        # Select ALL stocks over the threshold.
        #return [symbol for symbol, fscore in f_scores.items() if fscore >= self._threshold]