| Overall Statistics |
|
Total Trades 6 Average Win 0% Average Loss 0% Compounding Annual Return 382.326% Drawdown 10.500% Expectancy 0 Net Profit 11.860% Sharpe Ratio 7.423 Probabilistic Sharpe Ratio 77.139% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0.713 Beta 2.341 Annual Standard Deviation 0.387 Annual Variance 0.15 Information Ratio 5.444 Tracking Error 0.358 Treynor Ratio 1.228 Total Fees $26.76 |
from System.Collections.Generic import List
from QuantConnect.Data.UniverseSelection import *
import operator
from math import ceil,floor
from scipy import stats
import numpy as np
from datetime import timedelta
class Piotroski(QCAlgorithm):
    """Monthly strategy: Piotroski F-Score screen followed by an alpha ranking.

    Once per calendar month the universe is rebuilt from the most liquid
    stocks priced above $5, narrowed to those with an F-Score above 6 and the
    lowest normalized P/E. At each month start the survivors are ranked by
    their regression intercept (alpha) against SPY and bought; everything is
    liquidated shortly before the close on the last trading day of the month.
    """

    def Initialize(self):
        """Configure backtest parameters, universe selection and schedules."""
        # Backtesting parameters
        self.SetStartDate(2020, 7, 30)
        self.SetCash(50000)

        # Universe settings
        self.benchmark = Symbol.Create("SPY", SecurityType.Equity, Market.USA)
        self.UniverseSettings.Resolution = Resolution.Minute
        self.UniverseSettings.DataNormalizationMode = DataNormalizationMode.Raw
        self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
        self.topScoreSymbolsCoarse = 10000
        self.topScoreSymbolsFine = 100

        # Schedule settings
        self.AddEquity("SPY", Resolution.Minute)
        self.SetBenchmark("SPY")
        self.Schedule.On(
            self.DateRules.MonthEnd("SPY"),
            self.TimeRules.BeforeMarketClose("SPY", 10),
            self.Liquidate)
        # Callbacks are passed as plain callables, like Liquidate above, so the
        # class does not depend on System.Action being imported.
        self.Schedule.On(
            self.DateRules.MonthStart("SPY"),
            self.TimeRules.AfterMarketOpen("SPY"),
            self.Rebalance)
        self.Schedule.On(
            self.DateRules.EveryDay("SPY"),
            self.TimeRules.AfterMarketOpen("SPY", 2),
            self.Daily)

        # Other settings
        self.month = -1          # month of the last universe refresh
        self.symbols = []        # symbols returned by the last fine selection
        self.changes = []
        self.qualityStocks = []  # fine objects that passed the F-Score screen
        self.lookback = 15       # number of hourly bars used for the regression
        self.initiated = -1

    def CoarseSelectionFunction(self, coarse):
        """Return the most liquid symbols with fundamentals and price above 5.

        The selection is recomputed only when the calendar month differs from
        the month of the last fine selection; otherwise the current symbol
        list is returned unchanged.
        """
        if self.month == self.Time.month:
            return self.symbols

        candidates = [x for x in coarse if x.HasFundamentalData and x.Price > 5]
        candidates.sort(key=lambda x: x.DollarVolume, reverse=True)
        return [x.Symbol for x in candidates[:self.topScoreSymbolsCoarse]]

    def FineSelectionFunction(self, fine):
        """Keep stocks with an F-Score above 6, then the lowest normalized P/E.

        Updates ``self.month``, ``self.qualityStocks`` and ``self.symbols``
        when a new month starts; otherwise returns ``self.symbols`` as is.
        """
        if self.month == self.Time.month:
            return self.symbols
        self.month = self.Time.month

        # Retain only stocks for which every input of the score is populated.
        filteredFine = [x for x in fine if self._has_required_fundamentals(x)]

        # Use the FScore class to keep the stocks scoring above 6.
        sortedByFScore = [x for x in filteredFine
                          if self._fscore(x).ObjectiveScore() > 6]
        self.qualityStocks = sortedByFScore

        # The Piotroski score ranks quality; value is judged separately by
        # taking the lowest normalized P/E ratios.
        sortedByNormalizedPE = sorted(
            sortedByFScore, key=lambda x: x.ValuationRatios.NormalizedPERatio)
        topFine = sortedByNormalizedPE[:self.topScoreSymbolsFine]
        self.symbols = [x.Symbol for x in topFine]
        return self.symbols

    @staticmethod
    def _has_required_fundamentals(x):
        """Return True when every fundamental field used downstream is truthy
        and the payout ratio is positive."""
        statements = x.FinancialStatements
        ratios = x.OperationRatios
        return bool(
            statements.IncomeStatement.NetIncome.TwelveMonths
            and statements.CashFlowStatement.CashFlowFromContinuingOperatingActivities.TwelveMonths
            and ratios.ROA.ThreeMonths
            and ratios.ROA.OneYear
            and statements.BalanceSheet.ShareIssued.ThreeMonths
            and statements.BalanceSheet.ShareIssued.TwelveMonths
            and ratios.GrossMargin.ThreeMonths
            and ratios.GrossMargin.OneYear
            and ratios.LongTermDebtEquityRatio.ThreeMonths
            and ratios.LongTermDebtEquityRatio.OneYear
            and ratios.CurrentRatio.ThreeMonths
            and ratios.CurrentRatio.OneYear
            and ratios.AssetsTurnover.ThreeMonths
            and ratios.AssetsTurnover.OneYear
            and x.ValuationRatios.NormalizedPERatio
            and x.EarningReports.BasicAverageShares.ThreeMonths
            and x.EarningReports.BasicEPS.TwelveMonths
            and x.ValuationRatios.PayoutRatio > 0)

    @staticmethod
    def _fscore(x):
        """Build the FScore calculator from one fine-fundamental object."""
        statements = x.FinancialStatements
        ratios = x.OperationRatios
        return FScore(
            statements.IncomeStatement.NetIncome.TwelveMonths,
            statements.CashFlowStatement.CashFlowFromContinuingOperatingActivities.TwelveMonths,
            ratios.ROA.ThreeMonths,
            ratios.ROA.OneYear,
            statements.BalanceSheet.ShareIssued.ThreeMonths,
            statements.BalanceSheet.ShareIssued.TwelveMonths,
            ratios.GrossMargin.ThreeMonths,
            ratios.GrossMargin.OneYear,
            ratios.LongTermDebtEquityRatio.ThreeMonths,
            ratios.LongTermDebtEquityRatio.OneYear,
            ratios.CurrentRatio.ThreeMonths,
            ratios.CurrentRatio.OneYear,
            ratios.AssetsTurnover.ThreeMonths,
            ratios.AssetsTurnover.OneYear)

    def Daily(self):
        """Flip the ``initiated`` flag the first time the daily event fires."""
        if self.initiated < 0:
            self.initiated = 1

    def Rebalance(self):
        """Rank the current universe by alpha and place market orders."""
        # Fetch the historical data to perform the linear regression.
        history = self.History(
            self.symbols + [self.benchmark],
            self.lookback,
            Resolution.Hour)
        # An empty frame has no 'close' column; bail out instead of raising.
        if history.empty or "close" not in history.columns:
            self.Log("Rebalance skipped: no history returned")
            return
        closes = history.close.unstack(level=0)
        symbols = self.SelectSymbols(closes)

        # NOTE(review): each order is sized at one third of the remaining
        # margin regardless of how many symbols were selected, so later orders
        # may be rejected for insufficient buying power - confirm intent.
        buyingPower = (self.Portfolio.MarginRemaining / 3) * .995
        if buyingPower <= 0:
            self.Log("Insufficient Buying Power: " + str(self.Portfolio.MarginRemaining))
            return

        for symbol in symbols:
            if not self.Securities.ContainsKey(symbol):
                continue
            price = self.Securities[symbol].Price
            if price > 0:
                self.MarketOrder(symbol, buyingPower / price)

    def SelectSymbols(self, history):
        """Select up to 20 symbols with the highest alpha to the benchmark.

        ``history`` is a frame of close prices with one column per symbol.
        Each symbol's returns are regressed on the benchmark's returns
        (``returns = beta * benchmark + alpha``) and the intercept is kept.
        Symbols missing from ``history`` or whose return series length
        differs from the benchmark's are skipped.
        """
        if self.benchmark not in history:
            return []

        # Get the benchmark returns.
        benchmark = history[self.benchmark].pct_change().dropna()
        if len(benchmark) == 0:
            return []
        # Design matrix [benchmark, 1]; the second coefficient is the intercept.
        design = np.vstack([benchmark, np.ones(len(benchmark))]).T

        alphas = {}
        for symbol in self.symbols:
            if symbol not in history:
                continue
            returns = history[symbol].pct_change().dropna()
            if len(returns) != len(benchmark):
                continue
            # rcond=None selects NumPy's current default cutoff explicitly and
            # avoids the FutureWarning raised when it is omitted.
            coefficients = np.linalg.lstsq(design, returns.values, rcond=None)[0]
            alphas[symbol] = coefficients[1]

        # Select symbols with the highest intercept/alpha to the benchmark.
        ranked = sorted(alphas.items(), key=lambda item: item[1], reverse=True)
        return [symbol for symbol, _ in ranked[:20]]
class FScore(object):
    """Piotroski F-Score calculator over a fixed set of fundamental inputs.

    Every ``*_current`` argument is compared with its ``*_past`` counterpart;
    ``netincome`` and ``operating_cashflow`` are used on their own and against
    each other.
    """

    def __init__(self,
                 netincome,
                 operating_cashflow,
                 roa_current,
                 roa_past,
                 issued_current,
                 issued_past,
                 grossm_current,
                 grossm_past,
                 longterm_current,
                 longterm_past,
                 curratio_current,
                 curratio_past,
                 assetturn_current,
                 assetturn_past):
        """Store the raw inputs; no validation or computation happens here."""
        self.netincome = netincome
        self.operating_cashflow = operating_cashflow
        self.roa_current = roa_current
        self.roa_past = roa_past
        self.issued_current = issued_current
        self.issued_past = issued_past
        self.grossm_current = grossm_current
        self.grossm_past = grossm_past
        self.longterm_current = longterm_current
        self.longterm_past = longterm_past
        self.curratio_current = curratio_current
        self.curratio_past = curratio_past
        self.assetturn_current = assetturn_current
        self.assetturn_past = assetturn_past

    def ObjectiveScore(self):
        """Return the number of satisfied criteria as an int from 0 to 9.

        The score is broken down into profitability; leverage, liquidity and
        source of funds; and operating efficiency. Each satisfied criterion
        adds one point.
        """
        criteria = (
            # Profitability criteria
            self.netincome > 0,                       # positive net income
            self.operating_cashflow > 0,              # positive operating cash flow
            self.roa_current > self.roa_past,         # return on assets improved
            # Quality of earnings: operating cash flow exceeds net income.
            # Compared with net income, not ROA, which is a ratio rather than
            # a currency amount.
            self.operating_cashflow > self.netincome,
            # Leverage, liquidity and source of funds criteria
            self.longterm_current <= self.longterm_past,  # leverage did not rise
            self.curratio_current >= self.curratio_past,  # liquidity did not fall
            self.issued_current <= self.issued_past,      # no new shares issued
            # Operating efficiency criteria
            self.grossm_current >= self.grossm_past,        # gross margin held or rose
            self.assetturn_current >= self.assetturn_past,  # asset turnover held or rose
        )
        return sum(1 for satisfied in criteria if satisfied)