# Overall Statistics
# Total Trades
# Average Win
# Average Loss
# Compounding Annual Return
# Net Profit
# Sharpe Ratio
# Probabilistic Sharpe Ratio
# Loss Rate
# Win Rate
# Profit-Loss Ratio
# Annual Standard Deviation
# Annual Variance
# Information Ratio
# Tracking Error
# Treynor Ratio
# Total Fees
# Estimated Strategy Capacity
# Lowest Capacity Asset
# Portfolio Turnover
# https://quantpedia.com/strategies/post-earnings-announcement-effect/
# The investment universe consists of all stocks from NYSE, AMEX, and NASDAQ except financial and utility firms and stocks with prices less than $5.
# Two factors are used: EAR (Earnings Announcement Return) and SUE (Standardized Unexpected Earnings). SUE is constructed by dividing the earnings 
# surprise (calculated as actual earnings minus expected earnings; expected earnings are computed using a seasonal random walk model with drift)
# by the standard deviation of earnings surprises. EAR is the abnormal return for firms recorded over a three-day window centered on the last 
# announcement date, in excess of the return of a portfolio of firms with similar risk exposures.
# Stocks are sorted into quintiles based on the EAR and SUE. To avoid look-ahead bias, data from the previous quarter are used to sort stocks. 
# Stocks are weighted equally in each quintile. The investor goes long stocks from the intersection of top SUE and EAR quintiles and goes short
# stocks from the intersection of the bottom SUE and EAR quintiles the second day after the actual earnings announcement and holds the portfolio 
# one quarter (or 60 working days). The portfolio is rebalanced every quarter.
# QC Implementation:
#   - Universe consists of stocks, with earnings data from https://www.nasdaq.com/market-activity/earnings available.
#   - At least 4 years of seasonal earnings data is required to calculate earnings surprise.
#   - At least 4 years of earnings surprise values are required for SUE calculation.
#   - Only long leg is traded since research paper claims that major part of strategy performance is produced by long leg only.

#region imports
from AlgorithmImports import *
import numpy as np
from collections import deque
from pandas.tseries.offsets import BDay
from dateutil.relativedelta import relativedelta

class PostEarningsAnnouncementEffect(QCAlgorithm):

    def Initialize(self):
        """Configure the backtest, load the earnings dataset and register the universe.

        Downloads a JSON document of earnings announcements and indexes every
        non-empty EPS value as ``self.eps_by_ticker[ticker][date]``. It also
        creates the containers shared by the selection functions, subscribes
        to SPY as the market proxy and schedules ``Selection`` at every month
        start.
        """
        self.SetStartDate(2010, 1, 1)

        # Earnings-surprise history keyed by symbol.
        self.earnings_surprise = {}
        # Minimum number of same-month EPS observations from earlier years.
        self.min_seasonal_eps_period = 4
        # Minimum number of stored surprises before SUE is computed.
        self.min_surprise_period = 4
        # Symbols currently selected for the long leg.
        self.long = []
        # SUE and EAR history for previous quarter used for statistics.
        self.sue_ear_history_previous = []
        self.sue_ear_history_actual = []
        # EPS data keyed by tickers, which are keyed by dates
        self.eps_by_ticker = {}
        # daily price data: symbol -> deque of (date, price) tuples
        self.price_data_with_date = {}
        self.price_period = 63

        self.market = self.AddEquity('SPY', Resolution.Daily).Symbol
        self.price_data_with_date[self.market] = deque(maxlen=self.price_period)

        # parse earnings dataset
        # Date of the first record in the file (None until the first record is read).
        self.first_date = None
        earnings_data: str = self.Download('data.quantpedia.com/backtesting_data/economic/earnings_dates_eps.json')
        earnings_data_json: list[dict] = json.loads(earnings_data)
        for record in earnings_data_json:
            announcement_date = datetime.strptime(record['date'], "%Y-%m-%d").date()
            if self.first_date is None:
                self.first_date = announcement_date

            for stock_data in record['stocks']:
                # Entries without an EPS value cannot be converted to float; skip them.
                if stock_data['eps'] == '':
                    continue

                ticker: str = stock_data['ticker']
                # store EPS value keyed by date, which is keyed by ticker
                self.eps_by_ticker.setdefault(ticker, {})[announcement_date] = float(stock_data['eps'])

        # Month counter used by Selection; starting at 12 makes the first
        # scheduled call hit the "every third month" branch.
        self.month = 12
        self.selection_flag = False
        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
        self.Schedule.On(self.DateRules.MonthStart(self.market), self.TimeRules.AfterMarketOpen(self.market), self.Selection)

    def OnSecuritiesChanged(self, changes):
        """React to universe changes.

        Added securities get the file's ``CustomFeeModel``; removed securities
        have their earnings-surprise history dropped.

        Args:
            changes: Object exposing ``AddedSecurities`` and
                ``RemovedSecurities`` iterables of securities.
        """
        # NOTE(review): the body of this loop was empty (a syntax error).
        # Applying CustomFeeModel is the assumed intent because that class is
        # defined in this file and not used anywhere else - confirm.
        for security in changes.AddedSecurities:
            security.SetFeeModel(CustomFeeModel())

        # remove earnings surprise data so it remains consecutive
        for security in changes.RemovedSecurities:
            self.earnings_surprise.pop(security.Symbol, None)

    def CoarseSelectionFunction(self, coarse):
        """Maintain rolling price windows and, on selection days, pick the coarse universe.

        On every call the adjusted price of each already-tracked symbol is
        appended to its window. When ``self.selection_flag`` is set, the flag
        is consumed, symbols whose ticker has earnings data are selected, and
        windows for newly seen symbols are warmed up from history.

        Args:
            coarse: Iterable of coarse objects exposing ``Symbol`` and
                ``AdjustedPrice``.

        Returns:
            ``Universe.Unchanged`` when no selection was requested or the
            market window is not full yet; otherwise the selected symbols
            whose price window is full.
        """
        # update daily price data
        today = self.Time.date()
        for stock in coarse:
            symbol = stock.Symbol
            if symbol in self.price_data_with_date:
                self.price_data_with_date[symbol].append((today, stock.AdjustedPrice))

        if not self.selection_flag:
            return Universe.Unchanged
        self.selection_flag = False

        # filter only symbols, which have earnings data from csv
        selected = [x.Symbol for x in coarse if x.Symbol.Value in self.eps_by_ticker]

        # warmup price data
        for symbol in selected:
            # Already tracked symbols are kept current by the loop above.
            if symbol in self.price_data_with_date:
                continue

            self.price_data_with_date[symbol] = deque(maxlen=self.price_period)
            history = self.History(symbol, self.price_period, Resolution.Daily)
            if history.empty:
                self.Log(f"Not enough data for {symbol} yet.")
                # Nothing to index; the window fills from the daily updates.
                continue

            closes = history.loc[symbol].close
            # Series.items() replaces iteritems(), which pandas 2.0 removed.
            for time, close in closes.items():
                self.price_data_with_date[symbol].append((time.date(), close))

        # market price data is not ready yet
        market_window = self.price_data_with_date[self.market]
        if len(market_window) != market_window.maxlen:
            return Universe.Unchanged

        return [x for x in selected if len(self.price_data_with_date[x]) == self.price_data_with_date[x].maxlen]

    def FineSelectionFunction(self, fine):
        """Compute SUE and EAR per stock and return the long-leg symbols.

        For every stock with an EPS announcement dated within the last three
        months (strictly before the current date):

        * expected EPS = most recent same-month EPS from earlier years plus
          the average year-over-year change of those same-month values;
        * earnings surprise = announced EPS - expected EPS;
        * SUE = surprise / standard deviation of the stock's earlier surprises;
        * EAR = stock return minus market return, measured between the first
          and last stored prices dated from two business days before to one
          business day after the announcement.

        Surprises are stored per announcement date, so an announcement that
        stays inside the three-month window over several monthly selections
        is recorded once and is excluded from its own standard deviation.

        Args:
            fine: Iterable of fine objects exposing ``Symbol``.

        Returns:
            ``self.long``. The list is replaced only when both this call's
            SUE/EAR map and ``self.sue_ear_history_previous`` are non-empty;
            otherwise the previously stored list is returned unchanged.
        """
        # SUE and EAR data
        sue_ear = {}
        current_date = self.Time.date()
        prev_three_months = current_date - relativedelta(months=3)
        market_prices = self.price_data_with_date[self.market]
        # Prices expected between BDay(-2) and BDay(+1), both inclusive.
        window_length = 4

        for stock in fine:
            symbol = stock.Symbol
            eps_by_date = self.eps_by_ticker.get(symbol.Value)
            if not eps_by_date:
                continue

            # latest announcement inside the previous three months window
            recent_dates = [d for d in eps_by_date if prev_three_months <= d < current_date]
            if not recent_dates:
                continue
            last_earnings_date = max(recent_dates)
            actual_earnings = eps_by_date[last_earnings_date]

            # seasonal earnings for previous years: same calendar month, strictly
            # earlier, sorted so the result does not rely on dict insertion order
            seasonal_eps_data = sorted(
                (d, eps) for d, eps in eps_by_date.items()
                if d < last_earnings_date and d.month == last_earnings_date.month
            )
            if len(seasonal_eps_data) < self.min_seasonal_eps_period:
                continue

            # make sure we have a consecutive seasonal data. Same months with one year difference
            years = [d.year for d, _ in seasonal_eps_data]
            if any(later - earlier != 1 for earlier, later in zip(years, years[1:])):
                continue

            # earnings surprise from a seasonal random walk with drift
            seasonal_eps = [eps for _, eps in seasonal_eps_data]
            drift = np.average(np.diff(seasonal_eps))
            expected_earnings = seasonal_eps[-1] + drift
            earnings_surprise = actual_earnings - expected_earnings

            # Record the surprise (it was never stored before, so SUE could
            # never become available) while keeping it out of its own std.
            surprises = self.earnings_surprise.setdefault(symbol, {})
            prior_surprises = [value for d, value in surprises.items() if d != last_earnings_date]
            surprises[last_earnings_date] = earnings_surprise

            # surprise data is not ready yet
            if len(prior_surprises) < self.min_surprise_period:
                continue
            earnings_surprise_std = np.std(prior_surprises)
            # A zero (or NaN) deviation would produce an infinite/NaN SUE.
            if not earnings_surprise_std > 0:
                continue
            sue = earnings_surprise / earnings_surprise_std

            # EAR calculation. date +/- BDay yields a pandas Timestamp, which
            # pandas 2 refuses to order against datetime.date, so convert back.
            min_day = (last_earnings_date - BDay(2)).date()
            max_day = (last_earnings_date + BDay(1)).date()
            stock_closes = [price for day, price in self.price_data_with_date[symbol] if min_day <= day <= max_day]
            market_closes = [price for day, price in market_prices if min_day <= day <= max_day]
            if len(stock_closes) != window_length or len(market_closes) != window_length:
                continue
            if stock_closes[0] == 0 or market_closes[0] == 0:
                continue

            stock_return = stock_closes[-1] / stock_closes[0] - 1
            # Market return must come from the market's prices; using the
            # stock's own prices made EAR identically zero.
            market_return = market_closes[-1] / market_closes[0] - 1
            ear = stock_return - market_return

            sue_ear[symbol] = (sue, ear)
            # store pair in this period's history
            self.sue_ear_history_actual.append((sue, ear))

        # wait until we have history data for previous three months.
        if sue_ear and self.sue_ear_history_previous:
            # Quintile breakpoints come from the previous period's values.
            sue_values = [pair[0] for pair in self.sue_ear_history_previous]
            ear_values = [pair[1] for pair in self.sue_ear_history_previous]
            top_sue_quintile = np.percentile(sue_values, 80)
            top_ear_quintile = np.percentile(ear_values, 80)
            # Only the long leg is built: intersection of top SUE and top EAR.
            self.long = [
                symbol for symbol, (sue, ear) in sue_ear.items()
                if sue >= top_sue_quintile and ear >= top_ear_quintile
            ]

        return self.long

    def OnData(self, data):
        """Trade execution: exit positions outside the long list, equal-weight the rest.

        Args:
            data: Slice-like mapping for the current bar; a symbol is traded
                only when ``symbol in data`` and ``data[symbol]`` is truthy.
        """
        # Close every open position that is no longer selected. The body of
        # this branch was empty (a syntax error); liquidating is what the
        # condition implies.
        invested = [x.Key for x in self.Portfolio if x.Value.Invested]
        for symbol in invested:
            if symbol not in self.long:
                self.Liquidate(symbol)

        long_count = len(self.long)
        if long_count == 0:
            return

        # Equal weight across the long leg.
        weight = 1 / long_count
        for symbol in self.long:
            if symbol in data and data[symbol]:
                self.SetHoldings(symbol, weight)


    def Selection(self):
        """Scheduled callback: request a new selection and rotate the SUE/EAR history.

        Sets ``self.selection_flag`` on every call. When the month counter is
        a multiple of three, the pairs collected so far become
        ``self.sue_ear_history_previous`` and collection restarts with an
        empty list. The counter then advances, wrapping from 12 back to 1.
        """
        self.selection_flag = True

        # store new EAR and SUE values every three months
        if self.month % 3 == 0:
            # Hand the collected pairs over and start a fresh list. Assigning
            # without resetting left both attributes bound to the same list,
            # so later appends also changed the "previous" history and it
            # never covered just the previous period.
            self.sue_ear_history_previous = self.sue_ear_history_actual
            self.sue_ear_history_actual = []

        self.month += 1
        if self.month > 12:
            self.month = 1

# Custom fee model
class CustomFeeModel(FeeModel):
    """Fee model that charges a fixed fraction of each order's value."""

    def GetOrderFee(self, parameters):
        """Return the fee for the order described by ``parameters``.

        The fee is security price times absolute order quantity times
        0.00005, expressed as a USD cash amount.
        """
        order_value = parameters.Security.Price * parameters.Order.AbsoluteQuantity
        return OrderFee(CashAmount(order_value * 0.00005, "USD"))