| Overall Statistics | |
|---|---|
| Total Trades | 663 |
| Average Win | 1.90% |
| Average Loss | -0.97% |
| Compounding Annual Return | -3.380% |
| Drawdown | 64.800% |
| Expectancy | -0.113 |
| Net Profit | -31.875% |
| Sharpe Ratio | -0.197 |
| Probabilistic Sharpe Ratio | 0.001% |
| Loss Rate | 70% |
| Win Rate | 30% |
| Profit-Loss Ratio | 1.95 |
| Alpha | -0.021 |
| Beta | -0.011 |
| Annual Standard Deviation | 0.111 |
| Annual Variance | 0.012 |
| Information Ratio | -0.767 |
| Tracking Error | 0.193 |
| Treynor Ratio | 1.964 |
| Total Fees | $676.82 |
| Estimated Strategy Capacity | $26.00 |
import numpy as np
from heap import *
import requests
import statistics as stat
from collections import deque
import copy
from itertools import groupby
from math import ceil
class Piotroski(QCAlgorithm):
    """Long/short equity algorithm ranked by Piotroski F-score (optionally G-score).

    When ``uni_flag`` is set (every month if ``MONTHLY``, otherwise at the end
    of March/June/September/December) the universe is rebuilt and candidates
    are stored in ``longheap`` / ``shortheap``. ``TakePositions`` then pops the
    best candidates and ``Rebalance`` assigns conviction-based weights.
    ``OnData`` places the orders, enforces a trailing stop-loss and refills
    vacated slots from the heaps.
    """

    #########
    # SETUP #
    ####################################################################################################################################
    # PARAMETERS
    LEVERAGE = 1
    SPLIT = 0  # % long
    STOP_LOSS = 0.07
    CASH_TO_POSITIONS = 1e3
    MAX_CONCENTRATION = 0.2  # want at least 5 positions: 0.2
    G_SCORE_LONG = False
    G_SCORE_SHORT = False
    QUARTILE_PTC = 0.99  # default: 0.2
    MIN_VOLUME = 0
    MIN_SHARE = 5.00  # 5: no penny stocks
    MAX_SHARE = 250  # <= cash_to_positions
    COARSE_LIMIT = 100000  # >=13k tickers on Tiingo
    FINE_LIMIT = 100000  # <= coarse_limit
    MIN_AGE = 365*2  # 365*2: filter ipo movement
    # FILTERS
    G_LONG_MIN = 5  # default: 5
    F_LONG_MIN = 8  # default: 8
    SHORT_MAX = 2  # default: 2
    AZ_SAFE = 2.6  # default: 2.6
    AZ_DISTRESS = 1.1  # default: 1.1
    MKTCAP_MIN = 1e8  # >= 25 million
    MKTCAP_MAX = 1e11  # filter megacaps
    G_QUARTERS = 30  # min statistically significant
    MONTHLY = False  # False = Quarterly Rebalancing
    # DYNAMIC WEIGHTING
    # Outer key: best (LONG) / worst (SHORT) score currently held on that side.
    # Inner mapping: raw score -> conviction points used to derive weights.
    LONG_CONV = {
        9: {9: 5, 8: 4, 7: 3, 6: 2, 5: 1},
        8: {8: 4, 7: 3, 6: 2, 5: 1},
        7: {7: 4, 6: 2, 5: 1},
        6: {6: 2, 5: 1},
        5: {5: 1}
    }
    SHORT_CONV = {
        0: {0: 4, 1: 3, 2: 2, 3: 1},
        1: {1: 4, 2: 2, 3: 1},
        2: {2: 4, 3: 1},
        3: {3: 1}
    }

    def Initialize(self):
        """Set the backtest window, cash, universe selection and schedules."""
        self.SetStartDate(2010, 1, 1)  # Set Start Date - After 1/1/2000
        self.SetEndDate(2021, 2, 28)  # Set End Date
        self.SetCash(5e3)  # Set Strategy Cash (5k portfolio)
        self.AddEquity("SPY", Resolution.Daily)
        self.uni_flag = True
        self.certify_and_place_flag = False
        self.take_positions_flag = False
        self._size_positions()
        self.tiingo = Tiingo()
        # Responsible for holding all trailing data & doing calculations
        self.gsParent = GScoreParent(self.G_QUARTERS, self.MONTHLY)
        self.TrailData = {}
        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverse(self.select_coarse, self.select_fine)
        self.account_leverage = self.Portfolio.TotalHoldingsValue / self.Portfolio.TotalPortfolioValue
        self.SetWarmup(1)
        # SetFlag is registered before FullLiquidate so that the liquidation
        # sees the freshly raised uni_flag on the same month-end.
        self.Schedule.On(self.DateRules.MonthEnd(), self.TimeRules.AfterMarketOpen('SPY'), self.SetFlag)
        self.Schedule.On(self.DateRules.MonthEnd(), self.TimeRules.AfterMarketOpen('SPY'), self.FullLiquidate)
        self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.AfterMarketOpen('SPY'), self.TakePositions)
        self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.AfterMarketOpen('SPY'), self.PlotLev)
        # self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerageModel)
        self.SetBenchmark('SPY')

    def _size_positions(self):
        """Derive the number of long/short slots from the available cash."""
        self.active_positions = (self.Portfolio.Cash) // self.CASH_TO_POSITIONS
        self.long_positions = int(self.active_positions * self.SPLIT)
        self.short_positions = int(self.active_positions * (1 - self.SPLIT))

    #################
    # POSITION CODE #
    ####################################################################################################################################
    def _pop_best(self, heap_name, target, label):
        """Pop up to ``target`` stocks from the heap stored in ``heap_name``.

        Tries ``target`` first and lowers it by one after every failure, so
        the largest batch the heap can supply is returned. ``label`` is the
        prefix ('Long'/'Short') used in the debug messages.
        """
        while target > 0:
            try:
                # Attribute lookup stays inside the try: the heap may not exist yet.
                return getattr(self, heap_name).popn(target)
            except Exception:
                self.Debug(label + ' heap does not have ' + str(target) + ' stocks')
                target -= 1
        self.Debug(label + ' heap has 0 stocks - returning empty list')
        return []

    def GetShortPositions(self):
        """Return up to ``short_positions`` stocks popped from the short heap."""
        return self._pop_best('shortheap', self.short_positions, 'Short')

    def GetLongPositions(self):
        """Return up to ``long_positions`` stocks popped from the long heap."""
        return self._pop_best('longheap', self.long_positions, 'Long')

    def FullLiquidate(self):
        """Close everything and forget all positions when a new universe is due."""
        if self.uni_flag:
            self.Debug('FULL LIQUIDATION')
            self.Liquidate()
            self.TrailData = {}
            self.longs = []
            self.shorts = []

    def ClearPositions(self):
        """Liquidate every invested holding that is no longer a long or short."""
        self.Debug('LIQUIDATING')
        wanted = [t.getTicker() for t in self.longs] + [t.getTicker() for t in self.shorts]
        for holding in self.Portfolio.Values:
            if holding.Invested and holding.Symbol not in wanted:
                self.Liquidate(holding.Symbol)
                self.DeleteKey(holding.Symbol)

    # Selects stocks to invest from stocks stored in heaps
    # Conditions on whether the heaps are ready and if it is the correct time to
    # invest
    def TakePositions(self):
        """Open a fresh set of positions once the heaps are ready."""
        if not self.take_positions_flag:
            return
        if self.Time.month in [1, 4, 7, 10] or self.MONTHLY:
            self._size_positions()
            self.longs = self.GetLongPositions()
            self.num_longs = len(self.longs)
            self.shorts = self.GetShortPositions()
            self.num_shorts = len(self.shorts)
            self.Rebalance()
            # Only cleared here, so a universe built in a non-trading month
            # stays pending until the next trading month.
            self.take_positions_flag = False

    # Manages max position concentration by splitting excess concentration over
    # other longs/shorts
    def CapConcentrations(self):
        """Cap each weight at MAX_CONCENTRATION, then rescale sides to SPLIT."""
        cap = self.MAX_CONCENTRATION
        for stocks in (self.longs, self.shorts):
            over = 0  # weight trimmed from capped names, still to be handed out
            for i, stock in enumerate(stocks):
                weight = stock.getWeight()
                if weight > cap:
                    over += weight - cap
                    stock.setWeight(cap)
                    continue
                # Spread the outstanding excess evenly over the remaining names.
                additional = over / (len(stocks) - i)
                if weight + additional < cap:
                    new_weight = weight + additional
                else:
                    new_weight = cap
                    additional = cap - weight
                stock.setWeight(new_weight)
                over -= additional
        # Balances concentrations and scales down disproportionately large
        # investment if current concentrations do not satisfy the SPLIT variable
        if self.SPLIT == 1:
            long_prop = sum(t.getWeight() for t in self.longs) / self.SPLIT
            short_prop = 1
        elif self.SPLIT == 0:
            short_prop = sum(t.getWeight() for t in self.shorts) / (1 - self.SPLIT)
            long_prop = 1
        else:
            long_prop = sum(t.getWeight() for t in self.longs) / self.SPLIT
            short_prop = sum(t.getWeight() for t in self.shorts) / (1 - self.SPLIT)
        min_prop = min(long_prop, short_prop)
        for t in self.longs:
            t.setWeight(t.getWeight() * min_prop / long_prop)
        for t in self.shorts:
            t.setWeight(t.getWeight() * min_prop / short_prop)

    # Accounts for the desired split
    def BalanceConcentrations(self):
        """Scale each side by its SPLIT share; drop a side whose share is zero."""
        for t in self.longs:
            t.setWeight(t.getWeight() * self.SPLIT)
        for t in self.shorts:
            t.setWeight(t.getWeight() * (1 - self.SPLIT))
        if self.SPLIT == 1:
            self.shorts = []
        elif self.SPLIT == 0:
            self.longs = []

    # Finds desired position concentrations for all stocks to invest in
    # Sets the certify_and_place_flag to True, which motivates the stocks to be
    # invested in
    def Rebalance(self):
        """Compute target weights for the current longs and shorts."""
        if not (hasattr(self, 'longs') and hasattr(self, 'shorts')):
            return
        self.ClearPositions()
        # The conversion table is chosen by the best long / worst short score.
        if len(self.longs) > 0:
            long_max = max(t.getFscore() for t in self.longs)
        if len(self.shorts) > 0:
            short_min = min(t.getFscore() for t in self.shorts)
        longsum = 0
        for stock in self.longs:
            stock.setConvFscore(self.LONG_CONV[long_max][stock.getFscore()])
            longsum += stock.getConvFscore()
        shortsum = 0
        for stock in self.shorts:
            stock.setConvFscore(self.SHORT_CONV[short_min][stock.getFscore()])
            shortsum += stock.getConvFscore()
        self.Debug('REBALANCING')
        if self.G_SCORE_LONG or self.G_SCORE_SHORT:
            self.Debug('position | stock | f/gscore | converted | weight')
        else:
            self.Debug('position | stock | fscore | converted | weight')
        for t in self.longs:
            t.setWeight(t.getConvFscore() / longsum)
        for t in self.shorts:
            t.setWeight(t.getConvFscore() / shortsum)
        self.BalanceConcentrations()
        self.CapConcentrations()
        for t in self.longs:
            self.Log(f"LONG | {t.getTicker()} | {t.getFscore()} | {t.getConvFscore()} | {t.getWeight():.2f}")
        for t in self.shorts:
            # Short targets are expressed as negative weights.
            t.setWeight(-1 * t.getWeight())
            self.Log(f"SHORT | {t.getTicker()} | {t.getFscore()} | {t.getConvFscore()} | {t.getWeight():.2f}")
        self.Log('\n')
        self.certify_and_place_flag = True

    def DeleteKey(self, ticker):
        """Forget the trailing high/low recorded for ``ticker``, if any."""
        if ticker in self.TrailData:
            del self.TrailData[ticker]

    def _drop_position(self, stock, is_long):
        """Remove ``stock`` from its side, update the count, clear its trail."""
        if is_long:
            self.longs.remove(stock)
            self.num_longs -= 1
        else:
            self.shorts.remove(stock)
            self.num_shorts -= 1
        self.DeleteKey(stock.getTicker())

    def OnData(self, data):
        '''OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.
        Arguments:
            data: Slice object keyed by symbol containing the stock data
        '''
        # Verification process:
        # (1) Check if a stock is being delisted
        # (2) Check if there is data for a stock
        # (3) Check if a stock needs to be stop-lossed
        # (4) Repopulate stocks if necessary
        if self.Transactions.GetOpenOrders():
            # self.Log('OPEN ORDERS')
            return
        if not (hasattr(self, 'longs') and hasattr(self, 'shorts')):
            return
        need_to_rebalance = False
        # Every loop below walks a snapshot (list(...)) because positions are
        # removed during the walk; iterating the live list would skip the
        # element that follows each removal.
        # (1) Checks if any stocks are delisted and removes them if so
        for stock in list(self.longs):
            if data.Delistings.ContainsKey(stock.getTicker()):
                self._drop_position(stock, is_long=True)
                need_to_rebalance = True
        for stock in list(self.shorts):
            if data.Delistings.ContainsKey(stock.getTicker()):
                self._drop_position(stock, is_long=False)
                need_to_rebalance = True
        # (2) If rebalance has run, then stock weights are prepared. Stocks
        # should however still be certified as tradeable before setting holdings
        # Certification uses the data slices of OnData.
        if self.certify_and_place_flag:
            for stock in list(self.longs):
                if not data.ContainsKey(stock.getTicker()):
                    self._drop_position(stock, is_long=True)
                    need_to_rebalance = True
            for stock in list(self.shorts):
                if not data.ContainsKey(stock.getTicker()):
                    self._drop_position(stock, is_long=False)
                    need_to_rebalance = True
            self.SetHoldings([PortfolioTarget(t.getTicker(), t.getWeight()) for t in self.longs])
            self.SetHoldings([PortfolioTarget(t.getTicker(), t.getWeight()) for t in self.shorts])
            self.certify_and_place_flag = False
        # (3) Checks if longs/shorts in portfolio need to be stop-lossed
        for stock in list(self.longs):
            symbol = stock.getTicker()
            current_price = self.Portfolio[symbol].Price
            # Trail the highest price seen since the position was tracked.
            self.TrailData[symbol] = max(self.TrailData[symbol], current_price) if symbol in self.TrailData else current_price
            if current_price < self.TrailData[symbol] * (1 - self.STOP_LOSS):
                self.Debug('stop-lossing long stock: ' + str(symbol) + ' | High at: ' + "{:.2f}".format(self.TrailData[symbol]) +
                           ' | Current price: ' + "{:.2f}".format(current_price) + ' | Bought at: ' + "{:.2f}".format(self.Portfolio[symbol].AveragePrice))
                self._drop_position(stock, is_long=True)
                need_to_rebalance = True
        for stock in list(self.shorts):
            symbol = stock.getTicker()
            current_price = self.Portfolio[symbol].Price
            # Trail the lowest price seen since the position was tracked.
            self.TrailData[symbol] = min(self.TrailData[symbol], current_price) if symbol in self.TrailData else current_price
            if current_price > self.TrailData[symbol] * (1 + self.STOP_LOSS):
                self.Debug('stop-lossing short stock: ' + str(symbol) + ' | Low at: ' + "{:.2f}".format(self.TrailData[symbol]) +
                           ' | Current price: ' + "{:.2f}".format(current_price) + ' | Bought at: ' + "{:.2f}".format(self.Portfolio[symbol].AveragePrice))
                self._drop_position(stock, is_long=False)
                need_to_rebalance = True
        # (4) Repopulates longs/shorts with replacements if any stock was stop-
        # lossed or eliminated because data was not available
        if need_to_rebalance:
            while self.num_longs < self.long_positions:
                try:
                    stock = self.longheap.pop()
                    self.Debug('adding long stock: ' + str(stock.getTicker()) + ' | fscore: ' + str(stock.getFscore()))
                    self.longs.append(stock)
                    self.num_longs += 1
                except Exception:
                    self.Debug('No long stock left to replace stop loss | Number of long stocks: ' + str(self.num_longs))
                    break
            while self.num_shorts < self.short_positions:
                try:
                    stock = self.shortheap.pop()
                    self.Debug('adding short stock: ' + str(stock.getTicker()) + ' | fscore: ' + str(stock.getFscore()))
                    self.shorts.append(stock)
                    self.num_shorts += 1
                except Exception:
                    self.Debug('No short stock left to replace stop loss | Number of short stocks: ' + str(self.num_shorts))
                    break
            self.Rebalance()

    def PlotLev(self):
        """Record and plot holdings value as a fraction of portfolio value."""
        self.account_leverage = self.Portfolio.TotalHoldingsValue / self.Portfolio.TotalPortfolioValue
        self.Plot('Leverage', self.account_leverage)

    #################
    # UNIVERSE CODE #
    ####################################################################################################################################
    # uni_flag is used to determine whether it is time to generate a new batch
    # of stocks to invest in
    def SetFlag(self):
        """Request a new universe monthly, or in months 3/6/9/12 otherwise."""
        if self.MONTHLY or self.Time.month in [12, 3, 6, 9]:
            self.uni_flag = True

    def select_coarse(self, coarse):
        """Coarse filter: fundamentals present, price/volume bounds, top dollar volume."""
        if not self.uni_flag:
            return Universe.Unchanged
        self.prices = {x.Symbol: x.Price for x in coarse if x.HasFundamentalData}
        liquid = sorted(
            [x for x in coarse
             if x.HasFundamentalData
             and x.Volume > self.MIN_VOLUME
             and x.Price > self.MIN_SHARE
             and x.Price < self.MAX_SHARE],
            key=lambda x: x.DollarVolume,
            reverse=True,
        )[:self.COARSE_LIMIT]
        self.dv_by_symbol = {x.Symbol: x.DollarVolume for x in liquid}
        if len(self.dv_by_symbol) == 0:
            return Universe.Unchanged
        return list(self.dv_by_symbol.keys())

    def _passes_company_screen(self, x):
        """Return True for seasoned US NYSE/NASDAQ names outside excluded sectors."""
        return (
            x.CompanyReference.CountryId == "USA"
            and x.CompanyReference.PrimaryExchangeID in ["NYS", "NAS"]
            and (self.Time - x.SecurityReference.IPODate).days > self.MIN_AGE
            and x.AssetClassification.MorningstarSectorCode != MorningstarSectorCode.FinancialServices
            and x.AssetClassification.MorningstarSectorCode != MorningstarSectorCode.Utilities
            and x.AssetClassification.MorningstarSectorCode != MorningstarSectorCode.Healthcare
            and x.AssetClassification.MorningstarIndustryGroupCode != MorningstarIndustryGroupCode.MetalsAndMining
            and self.MKTCAP_MIN <= x.MarketCap
            and x.MarketCap <= self.MKTCAP_MAX
            and not x.SecurityReference.IsDepositaryReceipt
        )

    @staticmethod
    def _has_fscore_fundamentals(x):
        """Return True when every input of the F-score and Altman-Z is non-zero."""
        return bool(
            x.FinancialStatements.IncomeStatement.NetIncome.TwelveMonths
            and x.FinancialStatements.CashFlowStatement.CashFlowFromContinuingOperatingActivities.TwelveMonths
            and x.OperationRatios.ROA.ThreeMonths and x.OperationRatios.ROA.OneYear
            and x.FinancialStatements.BalanceSheet.ShareIssued.ThreeMonths
            and x.FinancialStatements.BalanceSheet.ShareIssued.TwelveMonths
            and x.OperationRatios.GrossMargin.ThreeMonths and x.OperationRatios.GrossMargin.OneYear
            and x.OperationRatios.LongTermDebtEquityRatio.ThreeMonths
            and x.OperationRatios.LongTermDebtEquityRatio.OneYear
            and x.OperationRatios.CurrentRatio.ThreeMonths and x.OperationRatios.CurrentRatio.OneYear
            and x.OperationRatios.AssetsTurnover.ThreeMonths and x.OperationRatios.AssetsTurnover.OneYear
            and x.ValuationRatios.PBRatio
            and x.FinancialStatements.BalanceSheet.TotalAssets.TwelveMonths
            and x.FinancialStatements.BalanceSheet.TotalLiabilitiesAsReported.TwelveMonths
            and x.FinancialStatements.BalanceSheet.WorkingCapital.TwelveMonths
            and x.FinancialStatements.BalanceSheet.RetainedEarnings.TwelveMonths
            and x.FinancialStatements.BalanceSheet.TotalEquity.TwelveMonths
            and x.FinancialStatements.IncomeStatement.EBIT.TwelveMonths
            and x.EarningReports.BasicAverageShares.TwelveMonths
        )

    @staticmethod
    def _has_gscore_fundamentals(x):
        """Return True when the extra inputs needed by the G-score are non-zero."""
        return bool(
            x.FinancialStatements.CashFlowStatement.OperatingCashFlow.TwelveMonths
            and x.FinancialStatements.IncomeStatement.ResearchAndDevelopment.TwelveMonths
            and x.FinancialStatements.CashFlowStatement.CapExReported.TwelveMonths
            and x.FinancialStatements.IncomeStatement.SellingGeneralAndAdministration.TwelveMonths
        )

    @staticmethod
    def _altman_z(x):
        """Compute the Altman Z value of a fine-fundamental object via ZScore."""
        sheet = x.FinancialStatements.BalanceSheet
        return ZScore(sheet.TotalAssets.TwelveMonths,
                      sheet.TotalLiabilitiesAsReported.TwelveMonths,
                      sheet.TotalEquity.TwelveMonths,
                      sheet.WorkingCapital.ThreeMonths,
                      sheet.RetainedEarnings.TwelveMonths,
                      x.FinancialStatements.IncomeStatement.EBIT.TwelveMonths).ObjectiveScore()

    def _f_score(self, x):
        """Return the Tiingo F-score, or the locally computed one as fallback."""
        fs = self.tiingo.GetFScore(x.Symbol)
        if fs is None:
            fs = FScore(x.FinancialStatements.IncomeStatement.NetIncome.TwelveMonths,
                        x.FinancialStatements.CashFlowStatement.CashFlowFromContinuingOperatingActivities.TwelveMonths,
                        x.OperationRatios.ROA.ThreeMonths, x.OperationRatios.ROA.OneYear,
                        x.FinancialStatements.BalanceSheet.ShareIssued.ThreeMonths,
                        x.FinancialStatements.BalanceSheet.ShareIssued.TwelveMonths,
                        x.OperationRatios.GrossMargin.ThreeMonths, x.OperationRatios.GrossMargin.OneYear,
                        x.OperationRatios.LongTermDebtEquityRatio.ThreeMonths,
                        x.OperationRatios.LongTermDebtEquityRatio.OneYear,
                        x.OperationRatios.CurrentRatio.ThreeMonths, x.OperationRatios.CurrentRatio.OneYear,
                        x.OperationRatios.AssetsTurnover.ThreeMonths,
                        x.OperationRatios.AssetsTurnover.OneYear).ObjectiveScore()
        return fs

    def _rank_side(self, pool, scorer, qualifies, position_cls):
        """Score every security in ``pool`` and keep those that qualify.

        Returns ``(securities, heap_items)`` where ``heap_items`` are
        ``position_cls`` instances (Long/Short) ready to be heapified.
        """
        picked, heap_items = [], []
        for x in pool:
            az = self._altman_z(x)
            score = scorer(x)
            if score is not None and qualifies(score, az):
                picked.append(x)
                heap_items.append(position_cls(x.Symbol, score, x.MarketCap, az))
        return picked, heap_items

    def select_fine(self, fine):
        """Fine filter: screen companies, score them and build both heaps.

        Returns the symbols of all long and short candidates, or
        ``Universe.Unchanged`` when no company passes the first screen.
        """
        by_industry = sorted(
            [x for x in fine if self._passes_company_screen(x)],
            key=lambda x: x.CompanyReference.IndustryTemplateCode,
        )
        count = len(by_industry)
        if count == 0:
            return Universe.Unchanged
        self.last_month = self.Time.month
        # Keep the same fraction of the most liquid names from every industry.
        percent = self.FINE_LIMIT / count
        most_liquid = []
        for _, group in groupby(by_industry, lambda x: x.CompanyReference.IndustryTemplateCode):
            ranked = sorted(group, key=lambda x: self.dv_by_symbol[x.Symbol], reverse=True)
            most_liquid.extend(ranked[:ceil(len(ranked) * percent)])
        most_liquid = sorted(most_liquid, key=lambda x: self.dv_by_symbol[x.Symbol], reverse=True)
        final_securities = most_liquid[:self.FINE_LIMIT]
        # Market cap strictly inside (MKTCAP_MIN, MKTCAP_MAX). The sector,
        # mining and depositary-receipt exclusions were already applied by
        # _passes_company_screen, so they are not repeated here.
        candidates = [x for x in final_securities if self.MKTCAP_MIN < x.MarketCap < self.MKTCAP_MAX]
        if self.G_SCORE_LONG or self.G_SCORE_SHORT:
            # g-score, f-score, altman-z
            filtered_fine = [x for x in candidates
                             if self._has_fscore_fundamentals(x) and self._has_gscore_fundamentals(x)]
            self.gsParent.examine(filtered_fine)
        else:
            # f-score, altman-z
            filtered_fine = [x for x in candidates if self._has_fscore_fundamentals(x)]
        sorted_by_mb = sorted(filtered_fine, key=lambda x: x.ValuationRatios.PBRatio, reverse=True)
        total = len(sorted_by_mb)
        # G-scores use the head of the P/B ranking, F-scores use the tail.
        top_slice = sorted_by_mb[:int(total * self.QUARTILE_PTC)]
        bottom_slice = sorted_by_mb[int(total * (1 - self.QUARTILE_PTC)):]

        # Both sides may score the same security; remember the F-score so the
        # Tiingo request is issued only once per symbol per selection.
        f_cache = {}

        def cached_f_score(x):
            if x.Symbol not in f_cache:
                f_cache[x.Symbol] = self._f_score(x)
            return f_cache[x.Symbol]

        long_min = self.G_LONG_MIN if self.G_SCORE_LONG else self.F_LONG_MIN
        longs, longheap = self._rank_side(
            top_slice if self.G_SCORE_LONG else bottom_slice,
            self.gsParent.compute_g_score if self.G_SCORE_LONG else cached_f_score,
            lambda score, az: score >= long_min and az > self.AZ_SAFE,
            Long,
        )
        shorts, shortheap = self._rank_side(
            top_slice if self.G_SCORE_SHORT else bottom_slice,
            self.gsParent.compute_g_score if self.G_SCORE_SHORT else cached_f_score,
            lambda score, az: score <= self.SHORT_MAX and az < self.AZ_DISTRESS,
            Short,
        )
        self.Debug("------NEW UNIVERSE------")
        self.Debug('total filtered stocks before quantiles: ' + str(len(filtered_fine)))
        self.Debug('number of stocks in top quartile: ' + str(len(top_slice)))
        self.Debug('number of stocks in bottom quartile: ' + str(len(bottom_slice)))
        if self.G_SCORE_LONG:
            self.Debug('long g-scores-heap: ' + str(len(longheap)))
            self.Debug('period: ' + str(self.gsParent.periods))
        else:
            self.Debug('long f-scores-heap: ' + str(len(longheap)))
        if self.G_SCORE_SHORT:
            self.Debug('short g-scores-heap: ' + str(len(shortheap)))
            self.Debug('period: ' + str(self.gsParent.periods))
        else:
            self.Debug('short f-scores-heap: ' + str(len(shortheap)))
        self.longheap = Heap(longheap)
        self.shortheap = Heap(shortheap)
        self.uni_flag = False
        self.take_positions_flag = True
        self.Debug(len(longheap))
        self.Debug(len(shortheap))
        return [t.Symbol for t in longs] + [t.Symbol for t in shorts]
class FScore(object):
    """Nine-signal fundamental score built from current and prior-period figures."""

    def __init__(self, netincome, operating_cashflow, roa_current,
                 roa_past, issued_current, issued_past, grossm_current, grossm_past,
                 longterm_current, longterm_past, curratio_current, curratio_past,
                 assetturn_current, assetturn_past):
        """Store the raw inputs; no validation or conversion is performed."""
        self.netincome = netincome
        self.operating_cashflow = operating_cashflow
        self.roa_current, self.roa_past = roa_current, roa_past
        self.issued_current, self.issued_past = issued_current, issued_past
        self.grossm_current, self.grossm_past = grossm_current, grossm_past
        self.longterm_current, self.longterm_past = longterm_current, longterm_past
        self.curratio_current, self.curratio_past = curratio_current, curratio_past
        self.assetturn_current, self.assetturn_past = assetturn_current, assetturn_past

    def ObjectiveScore(self):
        """Return the number of signals that hold (0 to 9)."""
        signals = (
            self.netincome > 0,
            self.operating_cashflow > 0,
            self.roa_current > self.roa_past,
            # NOTE(review): compares a cash-flow amount with an ROA ratio;
            # confirm whether the cash flow was meant to be scaled by assets.
            self.operating_cashflow > self.roa_current,
            self.longterm_current <= self.longterm_past,
            self.curratio_current >= self.curratio_past,
            self.issued_current <= self.issued_past,
            self.grossm_current >= self.grossm_past,
            self.assetturn_current >= self.assetturn_past,
        )
        fscore = 0
        for passed in signals:
            fscore += np.where(passed, 1, 0)
        return fscore
class Tiingo(object):
    """Minimal client for the Tiingo fundamentals statements endpoint."""

    # SECURITY(review): this API token is committed in source. It should be
    # rotated and loaded from configuration instead; it is left in place here
    # only so that existing deployments keep working.
    header = {
        'Content-Type': 'application/json',
        'Authorization' : 'Token 77e682c2cf272e1c63699775fde80fce4ecbd698'
    }
    # Seconds to wait for Tiingo before giving up on a single request.
    REQUEST_TIMEOUT = 10

    def GetFScore(self, ticker):
        """Return the ``piotroskiFScore`` value Tiingo reports for ``ticker``.

        Returns ``None`` when the request fails or times out, the status is
        not 200, the body is not the expected JSON list, or the first
        statement has no ``piotroskiFScore`` entry in its overview.
        """
        url = f"https://api.tiingo.com/tiingo/fundamentals/{ticker}/statements"
        try:
            response = requests.get(url, headers=self.header, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException:
            # Network trouble must not abort universe selection.
            return None
        if response.status_code != 200:
            return None
        try:
            statements = response.json()
        except ValueError:
            return None
        # Error payloads may be a dict rather than a list of statements.
        if not isinstance(statements, list) or len(statements) == 0:
            return None
        latest = statements[0]
        if 'statementData' not in latest or 'overview' not in latest['statementData']:
            return None
        for code_pair in latest['statementData']['overview']:
            if code_pair.get('dataCode') == 'piotroskiFScore':
                return code_pair.get('value')
        return None
class ZScore(object):
    """Four-factor Altman-style Z value from balance-sheet and income figures.

    The weights (6.56, 3.26, 6.72, 1.05) appear to be those of Altman's
    Z''-score variant -- confirm against the intended model.
    """

    def __init__(self, totalassets, totalliabilities, equity, workingcapital, retainedearnings, ebit):
        """Store every input converted to ``float``."""
        raw_inputs = (
            ("totalassets", totalassets),
            ("totalliabilities", totalliabilities),
            ("workingcapital", workingcapital),
            ("retainedearnings", retainedearnings),
            ("ebit", ebit),
            ("equity", equity),
        )
        for attribute, raw in raw_inputs:
            setattr(self, attribute, float(raw))

    def ObjectiveScore(self):
        """Return the weighted sum of the four ratios.

        Raises ``ZeroDivisionError`` when total assets or total liabilities
        are zero.
        """
        assets = self.totalassets
        working_capital_term = 6.56 * (self.workingcapital / assets)
        retained_earnings_term = 3.26 * (self.retainedearnings / assets)
        ebit_term = 6.72 * (self.ebit / assets)
        equity_term = 1.05 * (self.equity / self.totalliabilities)
        return working_capital_term + retained_earnings_term + ebit_term + equity_term
class GScoreParent(object):
    """Keeps trailing ROA history per sector and derives G-scores from it.

    ``examine`` is fed the filtered universe once per selection; after
    ``desiredPeriods`` calls it publishes per-sector ratio tables in
    ``self.ratios``, which ``compute_g_score`` reads.
    """

    # Signals that earn a point for being BELOW the sector median. ROA
    # variance belongs here: short histories receive a huge dummy variance in
    # ``examine`` precisely so that they do not earn the point.
    _LOWER_IS_BETTER = frozenset({"varroa"})

    def __init__(self, desiredQuarters, monthly):
        """Create empty holders.

        ``desiredQuarters`` is the history length in quarters; with
        ``monthly`` sampling three observations per quarter are kept.
        """
        self.industries = set()
        self.gROAs = {}
        self.ratios = {}
        self.periods = 0
        self.desiredPeriods = desiredQuarters if not monthly else desiredQuarters * 3
        self.monthly = monthly

    def examine(self, filtered):
        """Record this period's ROA and, once enough periods exist, rebuild ratios."""
        for security in filtered:
            industry = security.AssetClassification.MorningstarSectorCode
            # Create the per-sector holder the first time a sector is seen.
            if industry not in self.industries:
                self.industries.add(industry)
                self.gROAs[industry] = {}
            symbol = security.Symbol
            if symbol not in self.gROAs[industry]:
                self.gROAs[industry][symbol] = deque(maxlen=self.desiredPeriods)
            self.gROAs[industry][symbol].append(security.OperationRatios.ROA.TwelveMonths)
        self.periods += 1
        if self.periods < self.desiredPeriods:
            return
        for industry in self.industries:
            self.ratios[industry] = {}
            members = [x for x in filtered if x.AssetClassification.MorningstarSectorCode == industry]
            variances = {}
            for symbol, history in self.gROAs[industry].items():
                if len(history) == history.maxlen:
                    if self.monthly:
                        # Use one observation per quarter.
                        sample = [history[i * 3] for i in range(len(history) // 3)]
                    else:
                        sample = history
                    variances[symbol] = stat.variance(sample)
                else:
                    variances[symbol] = 1000000  # dummy value
            if len(variances) < 2 or len(members) == 0:
                continue
            variance_median = stat.median(variances.values())
            # Map Symbols to fundamental ratios and compute each ratio's median.
            # ROA 1-year
            roa = {x.Symbol: x.OperationRatios.ROA.OneYear for x in members}
            # Cash Flow ROA
            cfroa = {x.Symbol: (
                x.FinancialStatements.CashFlowStatement.OperatingCashFlow.TwelveMonths
                / x.FinancialStatements.BalanceSheet.TotalAssets.TwelveMonths
            ) for x in members}
            # R&D to MktCap
            rd = {x.Symbol: (
                x.FinancialStatements.IncomeStatement.ResearchAndDevelopment.TwelveMonths / x.MarketCap
            ) for x in members}
            # CapEx to MktCap
            capex = {x.Symbol: (
                x.FinancialStatements.CashFlowStatement.CapExReported.TwelveMonths / x.MarketCap
            ) for x in members}
            # Advertising to MktCap (SG&A is used as the proxy)
            ad = {x.Symbol: (
                x.FinancialStatements.IncomeStatement.SellingGeneralAndAdministration.TwelveMonths / x.MarketCap
            ) for x in members}
            self.ratios[industry] = {"varroa": (variances, variance_median),
                                     "roa": (roa, stat.median(roa.values())),
                                     "cfroa": (cfroa, stat.median(cfroa.values())),
                                     "rd": (rd, stat.median(rd.values())),
                                     "capex": (capex, stat.median(capex.values())),
                                     "ad": (ad, stat.median(ad.values()))}

    def compute_g_score(self, security):
        """Return the G-score of ``security``, or ``None`` if it cannot be scored.

        One point when cash-flow ROA exceeds ROA, plus one point per ratio on
        the favourable side of its sector median (below the median for ROA
        variance, above it for every other ratio).
        """
        industry = security.AssetClassification.MorningstarSectorCode
        symbol = security.Symbol
        sector_ratios = self.ratios.get(industry)
        if not sector_ratios or "varroa" not in sector_ratios or symbol not in sector_ratios["varroa"][0]:
            return None
        g_score = 0
        # A symbol can have ROA history without being in the current period's
        # ratio maps, so look these up defensively.
        cash_flow_roa = sector_ratios["cfroa"][0].get(symbol)
        reported_roa = sector_ratios["roa"][0].get(symbol)
        if cash_flow_roa is not None and reported_roa is not None and cash_flow_roa > reported_roa:
            g_score += 1
        for name, (values, median) in sector_ratios.items():
            if symbol not in values:
                continue
            if name in self._LOWER_IS_BETTER:
                favourable = values[symbol] < median
            else:
                favourable = values[symbol] > median
            if favourable:
                g_score += 1
        return g_score


# This import was fused onto the preceding ``return g_score`` line. It is
# presumably the first line of the separate ``heap`` module that the top of
# this file imports from; the Heap class below requires it.
import heapq
class Stock:
    """A scored candidate position together with its sizing state."""

    def __init__(self, ticker, fscore, mkcap, az):
        """Store the identifiers and scores; weight and converted score start at -1."""
        self.ticker, self.fscore = ticker, fscore
        self.mkcap, self.az = mkcap, az
        # -1 marks "not assigned yet" for both sizing fields.
        self.weight = self.conv_fscore = -1

    # --- accessors -------------------------------------------------------
    def getTicker(self):
        """Return the ticker given at construction."""
        return self.ticker

    def getFscore(self):
        """Return the raw score given at construction."""
        return self.fscore

    def getWeight(self):
        """Return the current target weight."""
        return self.weight

    def getConvFscore(self):
        """Return the converted (conviction) score."""
        return self.conv_fscore

    # --- mutators --------------------------------------------------------
    def setWeight(self, w):
        """Replace the target weight."""
        self.weight = w

    def setConvFscore(self, n):
        """Replace the converted (conviction) score."""
        self.conv_fscore = n

    def __str__(self):
        """Render as ``(ticker, fscore)``."""
        return f"({self.ticker!s}, {self.fscore!s})"
class Long(Stock):
    """Stock ordered so that the most attractive long sorts first in a min-heap."""

    def __init__(self, ticker, fscore, mkcap, az):
        """Forward all fields to ``Stock``."""
        super().__init__(ticker, fscore, mkcap, az)

    def __lt__(self, other):
        """Rank higher fscore first, then higher az, then higher mkcap.

        Full ties are broken by the string form of the ticker (greater first).
        """
        for field in ("fscore", "az", "mkcap"):
            mine, theirs = getattr(self, field), getattr(other, field)
            if mine > theirs:
                return True
            if mine < theirs:
                return False
        return str(self.ticker) > str(other.ticker)
class Short(Stock):
    """Stock ordered so that the most attractive short sorts first in a min-heap."""

    def __init__(self, ticker, fscore, mkcap, az):
        """Forward all fields to ``Stock``."""
        super().__init__(ticker, fscore, mkcap, az)

    def __lt__(self, other):
        """Rank lower fscore first, then lower az, then lower mkcap.

        Full ties are broken by the string form of the ticker (smaller first).
        """
        for field in ("fscore", "az", "mkcap"):
            mine, theirs = getattr(self, field), getattr(other, field)
            if mine < theirs:
                return True
            if mine > theirs:
                return False
        return str(self.ticker) < str(other.ticker)
class Heap:
    """Min-heap wrapper around ``heapq`` that only accepts one item type."""

    def __init__(self, tickers=None):
        """Build a heap from ``tickers`` (optional).

        The input is copied, so the caller's list is neither reordered by the
        heapify step nor shrunk by later pops.
        """
        if not tickers:
            self.tickers = []
            self.ticker_type = None
        else:
            self.ticker_type = type(tickers[0])
            self.tickers = list(tickers)
            heapq.heapify(self.tickers)

    def to_list(self):
        """Return the internal list (heap order, not sorted order)."""
        return self.tickers

    def push(self, item):
        """Add ``item``; raise ``Exception`` if its type differs from the heap's."""
        if self.ticker_type and type(item) is not self.ticker_type:
            raise Exception('Cannot add different types into same heap')
        if not self.ticker_type:
            # First item fixes the type the heap accepts.
            self.ticker_type = type(item)
        heapq.heappush(self.tickers, item)

    def pop(self):
        """Remove and return the smallest item; raise ``Exception`` if empty."""
        if self.size() == 0:
            raise Exception('Cannot pop from empty heap')
        if self.size() == 1:
            # The heap is about to become empty, so any type may follow.
            self.ticker_type = None
        return heapq.heappop(self.tickers)

    def pushpop(self, item):
        """Push ``item`` and then pop the smallest item."""
        self.push(item)
        return self.pop()

    def popn(self, n):
        """Pop the ``n`` smallest items in order.

        Raises ``Exception`` without popping anything when ``n`` exceeds the
        heap size.
        """
        if n > self.size():
            raise Exception('Cannot pop more elements than heap size')
        return [self.pop() for _ in range(n)]

    def size(self):
        """Return the number of stored items."""
        return len(self.tickers)

    def __str__(self):
        """Render the item type followed by ``(ticker, fscore)`` pairs."""
        if self.size() == 0:
            return 'Empty heap'
        return str(self.ticker_type.__name__) + ': ' + ' '.join('(' + str(x.ticker) + ', ' + str(x.fscore) + ')' for x in self.tickers)