Overall Statistics
Total Trades
6
Average Win
0%
Average Loss
0%
Compounding Annual Return
382.326%
Drawdown
10.500%
Expectancy
0
Net Profit
11.860%
Sharpe Ratio
7.423
Probabilistic Sharpe Ratio
77.139%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0.713
Beta
2.341
Annual Standard Deviation
0.387
Annual Variance
0.15
Information Ratio
5.444
Tracking Error
0.358
Treynor Ratio
1.228
Total Fees
$26.76
from System.Collections.Generic import List
from QuantConnect.Data.UniverseSelection import *
import operator
from math import ceil,floor
from scipy import stats
import numpy as np
from datetime import timedelta

class Piotroski(QCAlgorithm):
    '''Monthly strategy combining a Piotroski F-Score quality screen with an alpha ranking.

    Universe selection keeps liquid stocks that have fundamental data, filters
    them by F-Score (see ``FScore``) and ranks the survivors by normalized P/E.
    On the first trading day of each month the algorithm buys the symbols with
    the highest regression intercept (alpha) against SPY; on the last trading
    day of the month everything is liquidated.
    '''

    def Initialize(self):
        '''Set backtest parameters, register the universe and schedule events.'''
        # Backtesting parameters
        self.SetStartDate(2020, 7, 30)
        # self.SetEndDate(2012, 1, 1)
        self.SetCash(50000)

        # Universe settings
        self.benchmark = Symbol.Create("SPY", SecurityType.Equity, Market.USA)
        self.UniverseSettings.Resolution = Resolution.Minute
        self.UniverseSettings.DataNormalizationMode = DataNormalizationMode.Raw
        self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
        self.topScoreSymbolsCoarse = 10000   # max symbols passed from coarse to fine
        self.topScoreSymbolsFine   = 100     # max symbols kept after the fine filter

        # Schedule settings: liquidate 10 minutes before the close on the last
        # trading day of the month, rebalance at the open of the first one.
        self.AddEquity("SPY", Resolution.Minute)
        self.SetBenchmark("SPY")
        self.Schedule.On(self.DateRules.MonthEnd("SPY"), self.TimeRules.BeforeMarketClose("SPY", 10), self.Liquidate)
        self.Schedule.On(self.DateRules.MonthStart("SPY"), self.TimeRules.AfterMarketOpen("SPY"), Action(self.Rebalance))
        self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.AfterMarketOpen("SPY", 2), Action(self.Daily))

        # Other settings
        self.month   = -1        # month of the last completed fine selection
        self.symbols = []        # symbols chosen by the last fine selection
        self.changes = []
        self.qualityStocks = []  # fine objects that passed the F-Score threshold
        self.lookback = 15       # number of hourly bars used for the regression
        self.initiated = -1      # flipped to 1 by the first Daily() call

    def CoarseSelectionFunction(self, coarse):
        '''Return the most liquid symbols with fundamental data and price above 5.

        The selection is only recomputed when the month has changed since the
        last fine selection; otherwise the previously selected symbols are
        returned unchanged.
        '''
        if self.month == self.Time.month:
            return self.symbols

        candidates = [x for x in coarse
                      if x.HasFundamentalData
                      and x.Price > 5]
        byDollarVolume = sorted(candidates, key=lambda x: x.DollarVolume, reverse=True)
        return [x.Symbol for x in byDollarVolume[:self.topScoreSymbolsCoarse]]

    def FineSelectionFunction(self, fine):
        '''Keep stocks with an F-Score above 6, ranked by lowest normalized P/E.

        Runs once per month (tracked through ``self.month``); on other calls the
        cached ``self.symbols`` list is returned.
        '''
        if self.month == self.Time.month:
            return self.symbols
        self.month = self.Time.month

        # Retrieve all stocks that have every fundamental field we need
        filteredFine = [x for x in fine if self._HasRequiredFundamentals(x)]

        # Using the FScore class, retrieve the stocks that score 7 or higher
        sortedByFScore = [x for x in filteredFine
                          if self._FScoreFor(x).ObjectiveScore() > 6]
        self.qualityStocks = sortedByFScore

        # The Piotroski score ranks quality; value is added by preferring the
        # lowest normalized P/E ratios among the quality stocks.
        sortedByNormalizedPE = sorted(sortedByFScore,
                                      key=lambda x: x.ValuationRatios.NormalizedPERatio)
        topFine = sortedByNormalizedPE[:self.topScoreSymbolsFine]

        self.symbols = [i.Symbol for i in topFine]
        return self.symbols

    def _HasRequiredFundamentals(self, x):
        '''Return True when every field used for scoring is truthy and the payout ratio is positive.'''
        return bool(x.FinancialStatements.IncomeStatement.NetIncome.TwelveMonths
                    and x.FinancialStatements.CashFlowStatement.CashFlowFromContinuingOperatingActivities.TwelveMonths
                    and x.OperationRatios.ROA.ThreeMonths
                    and x.OperationRatios.ROA.OneYear
                    and x.FinancialStatements.BalanceSheet.ShareIssued.ThreeMonths
                    and x.FinancialStatements.BalanceSheet.ShareIssued.TwelveMonths
                    and x.OperationRatios.GrossMargin.ThreeMonths
                    and x.OperationRatios.GrossMargin.OneYear
                    and x.OperationRatios.LongTermDebtEquityRatio.ThreeMonths
                    and x.OperationRatios.LongTermDebtEquityRatio.OneYear
                    and x.OperationRatios.CurrentRatio.ThreeMonths
                    and x.OperationRatios.CurrentRatio.OneYear
                    and x.OperationRatios.AssetsTurnover.ThreeMonths
                    and x.OperationRatios.AssetsTurnover.OneYear
                    and x.ValuationRatios.NormalizedPERatio
                    and x.EarningReports.BasicAverageShares.ThreeMonths
                    and x.EarningReports.BasicEPS.TwelveMonths
                    and x.ValuationRatios.PayoutRatio > 0)

    def _FScoreFor(self, x):
        '''Build the FScore calculator from one fine-fundamental object.'''
        return FScore(x.FinancialStatements.IncomeStatement.NetIncome.TwelveMonths,
                      x.FinancialStatements.CashFlowStatement.CashFlowFromContinuingOperatingActivities.TwelveMonths,
                      x.OperationRatios.ROA.ThreeMonths,
                      x.OperationRatios.ROA.OneYear,
                      x.FinancialStatements.BalanceSheet.ShareIssued.ThreeMonths,
                      x.FinancialStatements.BalanceSheet.ShareIssued.TwelveMonths,
                      x.OperationRatios.GrossMargin.ThreeMonths,
                      x.OperationRatios.GrossMargin.OneYear,
                      x.OperationRatios.LongTermDebtEquityRatio.ThreeMonths,
                      x.OperationRatios.LongTermDebtEquityRatio.OneYear,
                      x.OperationRatios.CurrentRatio.ThreeMonths,
                      x.OperationRatios.CurrentRatio.OneYear,
                      x.OperationRatios.AssetsTurnover.ThreeMonths,
                      x.OperationRatios.AssetsTurnover.OneYear)

    def Daily(self):
        '''Daily hook; currently only marks the algorithm as initiated.'''
        if self.initiated < 0:
            # self.Rebalance()
            self.initiated = 1

    def Rebalance(self):
        '''Buy the symbols selected by SelectSymbols using the remaining margin.'''
        # Fetch the historical data to perform the linear regression
        history = self.History(
            self.symbols + [self.benchmark],
            self.lookback,
            Resolution.Hour)

        # An empty result has no 'close' column, so bail out before touching it.
        if history.empty or 'close' not in history.columns:
            self.Log("No history available for rebalance")
            return

        selected = self.SelectSymbols(history.close.unstack(level=0))

        # Invest in the selected symbols.
        # NOTE(review): the budget per order is a fixed third of the remaining
        # margin while up to 20 symbols can be selected, so orders beyond the
        # first few will presumably exceed the available buying power --
        # confirm whether per-symbol sizing (1 / len(selected)) is intended.
        buyingPower = (self.Portfolio.MarginRemaining / 3) * .995
        if buyingPower <= 0:
            self.Log("Insufficient Buying Power: " + str(self.Portfolio.MarginRemaining))
            return

        for symbol in selected:
            price = self.Securities[symbol].Price
            if price > 0:
                self.MarketOrder(symbol, buyingPower / price)

    def SelectSymbols(self, history):
        '''Select symbols with the highest intercept/alpha to the benchmark.

        Each symbol's returns are regressed on the benchmark's returns
        (``returns = beta * benchmark + alpha``) and the intercept is kept.

        history: DataFrame of close prices with one column per symbol.
        Returns up to 20 symbols ordered by descending alpha; an empty list
        when the benchmark has too little data for a regression.
        '''
        if self.benchmark not in history:
            return []

        # Get the benchmark returns
        benchmark = history[self.benchmark].pct_change().dropna()

        # Two parameters (slope and intercept) need at least two observations.
        if len(benchmark) < 2:
            return []

        # Design matrix [benchmark return, 1]; identical for every symbol.
        design = np.vstack([benchmark, np.ones(len(benchmark))]).T

        # Conduct a linear regression for each symbol and save the intercept/alpha
        alphas = dict()
        for symbol in self.symbols:
            if symbol not in history:
                continue

            # Get the security returns; skip symbols with gaps in their data
            returns = history[symbol].pct_change().dropna()
            if len(returns) != len(benchmark):
                continue

            # Least-squares fit; rcond=None selects NumPy's current default cutoff
            coefficients = np.linalg.lstsq(design, returns, rcond=None)[0]
            alphas[symbol] = coefficients[1]

        # Select symbols with the highest intercept/alpha to the benchmark
        ranked = sorted(alphas.items(), key=lambda x: x[1], reverse=True)[:20]
        return [x[0] for x in ranked]
         

class FScore(object):
    '''Piotroski F-Score calculator.

    Holds the fundamental figures for one company and scores them against
    nine binary criteria covering profitability, leverage/liquidity/source of
    funds, and operating efficiency. Each satisfied criterion adds one point.
    '''

    def __init__(self,
                 netincome,
                 operating_cashflow,
                 roa_current,
                 roa_past,
                 issued_current,
                 issued_past,
                 grossm_current,
                 grossm_past,
                 longterm_current,
                 longterm_past,
                 curratio_current,
                 curratio_past,
                 assetturn_current,
                 assetturn_past):
        '''Store the inputs; ``*_current`` values are compared with ``*_past`` ones.

        netincome / operating_cashflow: net income and operating cash flow,
            expected to cover the same period so they can be compared.
        roa_*: return on assets.
        issued_*: shares issued.
        grossm_*: gross margin.
        longterm_*: long-term debt ratio.
        curratio_*: current ratio.
        assetturn_*: asset turnover.
        '''
        self.netincome = netincome
        self.operating_cashflow = operating_cashflow
        self.roa_current = roa_current
        self.roa_past = roa_past
        self.issued_current = issued_current
        self.issued_past = issued_past
        self.grossm_current = grossm_current
        self.grossm_past = grossm_past
        self.longterm_current = longterm_current
        self.longterm_past = longterm_past
        self.curratio_current = curratio_current
        self.curratio_past = curratio_past
        self.assetturn_current = assetturn_current
        self.assetturn_past = assetturn_past

    def ObjectiveScore(self):
        '''Return the F-Score: the number of satisfied criteria, from 0 to 9.'''
        criteria = (
            # Profitability criteria
            self.netincome > 0,                             # Positive net income
            self.operating_cashflow > 0,                    # Positive operating cash flow
            self.roa_current > self.roa_past,               # Return on assets improved
            # Quality of earnings: operating cash flow must exceed net income
            # (not ROA, which is a ratio and not comparable to a cash amount).
            self.operating_cashflow > self.netincome,

            # Leverage, liquidity, and source of funds criteria
            self.longterm_current <= self.longterm_past,    # Long-term debt ratio did not rise (decreased leverage)
            self.curratio_current >= self.curratio_past,    # Current ratio did not fall (more liquidity)
            self.issued_current <= self.issued_past,        # No new shares were issued

            # Operating efficiency criteria
            self.grossm_current >= self.grossm_past,        # Gross margin did not fall
            self.assetturn_current >= self.assetturn_past,  # Asset turnover did not fall
        )

        fscore = 0
        for satisfied in criteria:
            fscore += np.where(satisfied, 1, 0)
        return fscore