Overall Statistics
Total Trades
585
Average Win
0.71%
Average Loss
-0.88%
Compounding Annual Return
0.083%
Drawdown
39.500%
Expectancy
0.005
Net Profit
0.955%
Sharpe Ratio
0.059
Probabilistic Sharpe Ratio
0.021%
Loss Rate
44%
Win Rate
56%
Profit-Loss Ratio
0.81
Alpha
0.01
Beta
-0.033
Annual Standard Deviation
0.104
Annual Variance
0.011
Information Ratio
-0.661
Tracking Error
0.191
Treynor Ratio
-0.184
Total Fees
$201.03
Estimated Strategy Capacity
$23000000.00
Lowest Capacity Asset
GPS R735QTJ8XC9X
import numpy as np
from scipy.optimize import minimize
from math import sqrt

sp100_stocks = ['AAPL','MSFT','AMZN','FB','BRKB','GOOGL','GOOG','JPM','JNJ','V','PG','XOM','UNH','BAC','MA','T','DIS','INTC','HD','VZ','MRK','PFE','CVX','KO','CMCSA','CSCO','PEP','WFC','C','BA','ADBE','WMT','CRM','MCD','MDT','BMY','ABT','NVDA','NFLX','AMGN','PM','PYPL','TMO','COST','ABBV','ACN','HON','NKE','UNP','UTX','NEE','IBM','TXN','AVGO','LLY','ORCL','LIN','SBUX','AMT','LMT','GE','MMM','DHR','QCOM','CVS','MO','LOW','FIS','AXP','BKNG','UPS','GILD','CHTR','CAT','MDLZ','GS','USB','CI','ANTM','BDX','TJX','ADP','TFC','CME','SPGI','COP','INTU','ISRG','CB','SO','D','FISV','PNC','DUK','SYK','ZTS','MS','RTN','AGN','BLK']

def MonthDiff(d1, d2):
    return (d1.year - d2.year) * 12 + d1.month - d2.month

def Return(values):
    return (values[-1] - values[0]) / values[0]
    
def Volatility(values):
    values = np.array(values)
    returns = (values[1:] - values[:-1]) / values[:-1]
    return np.std(returns) * sqrt(len(values))

# Custom fee model
class CustomFeeModel(FeeModel):
    def GetOrderFee(self, parameters):
        fee = parameters.Security.Price * parameters.Order.AbsoluteQuantity * 0.00005
        return OrderFee(CashAmount(fee, "USD"))

# Quandl free data
class QuandlFutures(PythonQuandl):
    def __init__(self):
        self.ValueColumnName = "settle"

# Quandl short interest data.
class QuandlFINRA_ShortVolume(PythonQuandl):
    def __init__(self):
        self.ValueColumnName = 'SHORTVOLUME'    # also 'TOTALVOLUME' is accesible

# Quantpedia data
# NOTE: IMPORTANT: Data order must be ascending (datewise)
class QuantpediaFutures(PythonData):
    def GetSource(self, config, date, isLiveMode):
        return SubscriptionDataSource("data.quantpedia.com/backtesting_data/futures/{0}.csv".format(config.Symbol.Value), SubscriptionTransportMedium.RemoteFile, FileFormat.Csv)

    def Reader(self, config, line, date, isLiveMode):
        data = QuantpediaFutures()
        data.Symbol = config.Symbol
        
        if not line[0].isdigit(): return None
        split = line.split(';')
        
        data.Time = datetime.strptime(split[0], "%d.%m.%Y") + timedelta(days=1)
        data['settle'] = float(split[1])
        data.Value = float(split[1])

        return data
        
# NOTE: Manager for new trades. It's represented by certain count of equally weighted brackets for long and short positions.
# If there's a place for new trade, it will be managed for time of holding period.
class TradeManager():
    def __init__(self, algorithm, long_size, short_size, holding_period):
        self.algorithm = algorithm  # algorithm to execute orders in.
        
        self.long_size = long_size
        self.short_size = short_size
        
        self.long_len = 0
        self.short_len = 0
    
        # Arrays of ManagedSymbols
        self.symbols = []
        
        self.holding_period = holding_period    # Days of holding.
    
    # Add stock symbol object
    def Add(self, symbol, long_flag):
        # Open new long trade.
        managed_symbol = ManagedSymbol(symbol, self.holding_period, long_flag)
        
        if long_flag:
            # If there's a place for it.
            if self.long_len < self.long_size:
                self.symbols.append(managed_symbol)
                self.algorithm.SetHoldings(symbol, 1 / self.long_size)
                self.long_len += 1
            else:
                self.algorithm.Log("There's not place for additional trade.")

        # Open new short trade.
        else:
            # If there's a place for it.
            if self.short_len < self.short_size:
                self.symbols.append(managed_symbol)
                self.algorithm.SetHoldings(symbol, - 1 / self.short_size)
                self.short_len += 1
            else:
                self.algorithm.Log("There's not place for additional trade.")
   
    # Decrement holding period and liquidate symbols.
    def TryLiquidate(self):
        symbols_to_delete = []
        for managed_symbol in self.symbols:
            managed_symbol.days_to_liquidate -= 1
            
            # Liquidate.
            if managed_symbol.days_to_liquidate == 0:
                symbols_to_delete.append(managed_symbol)
                self.algorithm.Liquidate(managed_symbol.symbol)
                
                if managed_symbol.long_flag: self.long_len -= 1
                else: self.short_len -= 1

        # Remove symbols from management.
        for managed_symbol in symbols_to_delete:
            self.symbols.remove(managed_symbol)
    
    def LiquidateTicker(self, ticker):
        symbol_to_delete = None
        for managed_symbol in self.symbols:
            if managed_symbol.symbol.Value == ticker:
                self.algorithm.Liquidate(managed_symbol.symbol)
                symbol_to_delete = managed_symbol
                if managed_symbol.long_flag: self.long_len -= 1
                else: self.short_len -= 1
                
                break
        
        if symbol_to_delete: self.symbols.remove(symbol_to_delete)
        else: self.algorithm.Debug("Ticker is not held in portfolio!")
    
class ManagedSymbol():
    def __init__(self, symbol, days_to_liquidate, long_flag):
        self.symbol = symbol
        self.days_to_liquidate = days_to_liquidate
        self.long_flag = long_flag
        
class PortfolioOptimization(object):
    def __init__(self, df_return, risk_free_rate, num_assets):
        self.daily_return = df_return
        self.risk_free_rate = risk_free_rate
        self.n = num_assets # numbers of risk assets in portfolio
        self.target_vol = 0.05

    def annual_port_return(self, weights):
        # calculate the annual return of portfolio
        return np.sum(self.daily_return.mean() * weights) * 252

    def annual_port_vol(self, weights):
        # calculate the annual volatility of portfolio
        return np.sqrt(np.dot(weights.T, np.dot(self.daily_return.cov() * 252, weights)))

    def min_func(self, weights):
        # method 1: maximize sharp ratio
        return - self.annual_port_return(weights) / self.annual_port_vol(weights)
        
        # method 2: maximize the return with target volatility
        #return - self.annual_port_return(weights) / self.target_vol

    def opt_portfolio(self):
        # maximize the sharpe ratio to find the optimal weights
        cons = ({'type': 'eq', 'fun': lambda x: np.sum(x) - 1})
        bnds = tuple((0, 1) for x in range(2)) + tuple((0, 0.25) for x in range(self.n - 2))
        opt = minimize(self.min_func,                               # object function
                       np.array(self.n * [1. / self.n]),            # initial value
                       method='SLSQP',                              # optimization method
                       bounds=bnds,                                 # bounds for variables 
                       constraints=cons)                            # constraint conditions
                      
        opt_weights = opt['x']
 
        return opt_weights
# https://quantpedia.com/strategies/post-earnings-announcement-effect/
#
# The investment universe consists of all stocks from NYSE, AMEX, and NASDAQ except financial and utility firms and stocks with prices less than $5.
# Two factors are used: EAR (Earnings Announcement Return) and SUE (Standardized Unexpected Earnings). SUE is constructed by dividing the earnings 
# surprise (calculated as actual earnings minus expected earnings; expected earnings are computed using a seasonal random walk model with drift)
# by the standard deviation of earnings surprises. EAR is the abnormal return for firms recorded over a three-day window centered on the last 
# announcement date, in excess of the return of a portfolio of firms with similar risk exposures.
# Stocks are sorted into quintiles based on the EAR and SUE. To avoid look-ahead bias, data from the previous quarter are used to sort stocks. 
# Stocks are weighted equally in each quintile. The investor goes long stocks from the intersection of top SUE and EAR quintiles and goes short
# stocks from the intersection of the bottom SUE and EAR quintiles the second day after the actual earnings announcement and holds the portfolio 
# one quarter (or 60 working days). The portfolio is rebalanced every quarter.

import fk_tools
import numpy as np
from collections import deque
from pandas.tseries.offsets import BDay
from dateutil.relativedelta import relativedelta

class PostEarningsAnnouncementEffect(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2010, 1, 1)
        self.SetCash(100000)

        self.period = 13
        self.eps_data = {} # EPS quarterly data
        
        self.coarse_count = 500

        # 4 prices around 3 earnings days.
        self.ear_period = 4
        self.symbol = self.AddEquity('SPY', Resolution.Daily).Symbol
        
        # Surprise data count needed to count standard deviation.
        self.surprise_period = 4
        self.earnings_surprise = {}
        
        self.long = []
        self.short = []
        
        # This month's selected stocks.
        self.last_fine = []     
        
        # SUE and EAR history for previous quarter used for statistics.
        self.sue_ear_history_previous = deque()
        self.sue_ear_history_actual = deque()
        
        # Equally weighted brackets for traded symbols.
        self.trade_manager = fk_tools.TradeManager(self, 15, 15, 60)
        
        self.month = 12
        self.selection_flag = False
        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
        
        self.Schedule.On(self.DateRules.MonthStart(self.symbol), self.TimeRules.AfterMarketOpen(self.symbol), self.Selection)
        self.Schedule.On(self.DateRules.EveryDay(self.symbol), self.TimeRules.BeforeMarketClose(self.symbol), self.DayClose)

    def OnSecuritiesChanged(self, changes):
        for security in changes.AddedSecurities:
            symbol = security.Symbol
            
            security.SetFeeModel(fk_tools.CustomFeeModel(self))
            security.SetLeverage(5)
    
    def CoarseSelectionFunction(self, coarse):
        # At the begining of the month pick whole new set of stocks.
        if self.selection_flag: 
            selected = sorted([x for x in coarse if x.HasFundamentalData and x.Price > 5 and x.Market == 'usa'],
                key=lambda x: x.DollarVolume, reverse=True)
        
            return [x.Symbol for x in selected[:self.coarse_count]]
        
        # During the month, filter just already picked stocks.
        else:
            return self.last_fine

    def FineSelectionFunction(self, fine):
        if self.selection_flag:
            self.last_fine = [x.Symbol for x in fine if (x.EarningReports.BasicEPS.ThreeMonths != 0)]
            self.selection_flag = False
        
        fine_symbols = [x.Symbol for x in fine]
        filtered_fine = [x for x in fine if x.EarningReports.FileDate.year != 1 and (self.Time.date() == (x.EarningReports.FileDate + BDay(1)).date())]      
        
        # SUE and EAR data.
        sue_ear = {}
        
        market_return = 0
        if len(filtered_fine) != 0:
            # EAR calc.
            history = self.History(self.symbol, self.ear_period, Resolution.Daily)
            if len(history) == self.ear_period and 'close' in history:
                market_hist = history['close']
                market_return = fk_tools.Return(market_hist)
        
        for stock in filtered_fine:
            symbol = stock.Symbol

            # Store eps data.
            if symbol not in self.eps_data:
                self.eps_data[symbol] = deque(maxlen = self.period)
            data = (stock.EarningReports.FileDate.date(), stock.EarningReports.BasicEPS.ThreeMonths)
            # NOTE: Handles duplicate values. QC fine contains duplicated stocks in some cases.
            if data not in self.eps_data[symbol]:
                self.eps_data[symbol].append(data)
                
            if len(self.eps_data[symbol]) == self.eps_data[symbol].maxlen:
                recent_eps_data = self.eps_data[symbol][-1]
                
                year_range = range(self.Time.year - 3, self.Time.year)
                
                last_month_date = recent_eps_data[0] + relativedelta(months = -1)
                next_month_date = recent_eps_data[0] + relativedelta(months = 1)
                month_range = [last_month_date.month, recent_eps_data[0].month, next_month_date.month]

                # Earnings with todays month number 4 years back.
                seasonal_eps_data = [x for x in self.eps_data[symbol] if x[0].month in month_range and x[0].year in year_range]
                if len(seasonal_eps_data) != 3: continue
                
                # Make sure we have a consecutive seasonal data. Same months with one year difference.
                year_diff = np.diff([x[0].year for x in seasonal_eps_data])
                if all(x == 1 for x in year_diff):
                    seasonal_eps = [x[1] for x in seasonal_eps_data]
                    diff_values = np.diff(seasonal_eps)
                    drift = np.average(diff_values)
                    
                    # SUE calculation.
                    last_earnings = seasonal_eps[-1]
                    expected_earnings = last_earnings + drift
                    actual_earnings = recent_eps_data[1]

                    # Store sue value with earnigns date.
                    earnings_surprise = actual_earnings - expected_earnings

                    if symbol not in self.earnings_surprise:
                        self.earnings_surprise[symbol] = deque()
                    else:
                        # Surprise data is ready.
                        if len(self.earnings_surprise[symbol]) >= self.surprise_period:
                            earnings_surprise_std = np.std(self.earnings_surprise[symbol])
                            sue = earnings_surprise / earnings_surprise_std
                    
                            # EAR calc.
                            stock_hist = self.History(symbol, self.ear_period, Resolution.Daily)
                            if len(stock_hist) == self.ear_period and 'close' in stock_hist and market_return != 0:
                                stock_return = fk_tools.Return(stock_hist['close'])
                                ear = stock_return - market_return
    
                                sue_ear[symbol] = (sue, ear)
                                
                                # Store pair in this month's history.
                                self.sue_ear_history_actual.append((sue, ear))

                    self.earnings_surprise[symbol].append(earnings_surprise)
                
        if len(sue_ear) != 0: 
            # Wait until we have history data for previous three months.
            if len(self.sue_ear_history_previous) != 0:
                # Sort by SUE and EAR.
                sue_values = [x[0] for x in self.sue_ear_history_previous]
                ear_values = [x[1] for x in self.sue_ear_history_previous]
                
                top_sue_quintile  = np.percentile(sue_values, 80)
                bottom_sue_quintile = np.percentile(sue_values, 20)
            
                top_ear_quintile = np.percentile(ear_values, 80)
                bottom_ear_quintile = np.percentile(ear_values, 20)
                
                self.long = [x[0] for x in sue_ear.items() if x[1][0] >= top_sue_quintile and x[1][1] >= top_ear_quintile]
                self.short = [x[0] for x in sue_ear.items() if x[1][0] <= bottom_sue_quintile and x[1][1] <= bottom_ear_quintile]
            
        return self.long + self.short

    def DayClose(self):
        # Open new trades.
        for symbol in self.long:
            self.trade_manager.Add(symbol, True)
        for symbol in self.short:
            self.trade_manager.Add(symbol, False)
        
        self.trade_manager.TryLiquidate()
        
        self.long.clear()
        self.short.clear()
        
    def Selection(self):
        self.selection_flag = True
        
        # Every three months.    
        if self.month % 3 == 0:
            # Save previous month history.
            self.sue_ear_history_previous = [x for x in self.sue_ear_history_actual]
            self.sue_ear_history_actual.clear()

        self.month += 1
        if self.month > 12:
            self.month = 1