Overall Statistics
Total Trades
13196
Average Win
0.41%
Average Loss
-0.33%
Compounding Annual Return
46.342%
Drawdown
24.800%
Expectancy
0.222
Net Profit
9537.727%
Sharpe Ratio
1.654
Sortino Ratio
1.965
Probabilistic Sharpe Ratio
98.947%
Loss Rate
45%
Win Rate
55%
Profit-Loss Ratio
1.24
Alpha
0.264
Beta
0.488
Annual Standard Deviation
0.186
Annual Variance
0.035
Information Ratio
1.168
Tracking Error
0.187
Treynor Ratio
0.629
Total Fees
$280552.51
Estimated Strategy Capacity
$2000.00
Lowest Capacity Asset
GRF TWTA0VJCB0PX
Portfolio Turnover
76.23%
from AlgorithmImports import *
from datetime import datetime
from pandas.tseries.offsets import BDay
from typing import Dict, List
import json

# Class to store dividend information
class DividendInfo():
    def __init__(
            self, 
            ticker: str, 
            ex_div_date: datetime,
            payday: datetime, 
            record_date: datetime,
            dividend_value: float,
            ann_dividend_value: float,
            announcement_date: datetime
        ):
        self.ticker = ticker
        self.ex_div_date = ex_div_date
        self.payday = payday
        self.record_date = record_date
        self.dividend_value = dividend_value
        self.ann_dividend_value = ann_dividend_value
        self.announcement_date = announcement_date

# Custom class for handling probability data from Dropbox
class DropboxData(PythonData):
    def GetSource(self, config, date, isLiveMode):
        url = "https://www.dropbox.com/scl/fi/72x0szo3y42e58qvvtxbv/predicted_prob_cv_spypredictcyclical.csv?rlkey=k3t6zgnehs96wtzjhyhjiakyp&dl=1"
        return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile, FileFormat.Csv)

    def Reader(self, config, line, date, isLiveMode):
        if not line.strip() or line.startswith('date,probpositive'):
            return None

        data = line.split(',')
        date = datetime.strptime(data[0], '%Y-%m-%d')

        dataObject = DropboxData()
        dataObject.Symbol = config.Symbol
        dataObject.Time = date
        dataObject.Value = float(data[1])
        return dataObject

class TradingDividendPaydate(QCAlgorithm):
    def Initialize(self):
        self.SetStartDate(2012, 1, 1)
        #self.SetEndDate(2021, 4, 15)
        self.SetCash(100000)

        symbol: Symbol = self.AddEquity('SPY', Resolution.Minute).Symbol
        self.AddData(DropboxData, "ProbabilityData", Resolution.Daily)

        # Store drip tickers
        csv_string_file: str = self.Download('data.quantpedia.com/backtesting_data/economic/drip_tickers.csv')
        lines: str = csv_string_file.split('\r\n')
        self.drip_tickers: List[str] = [x for x in lines[1:]]

        # Dividend data
        self.dividend_data = {}
        dividend_data: str = self.Download('data.quantpedia.com/backtesting_data/economic/dividend_dates.json')
        dividend_data_json: Dict[str] = json.loads(dividend_data)
        
        for obj in dividend_data_json:
            ex_div_date: datetime.date = datetime.strptime(obj['date'], "%Y-%m-%d").date()
            for stock_data in obj['stocks']:
                ticker: str = stock_data['ticker']
                payday: datetime.date = datetime.strptime(stock_data['PayDate'], '%m/%d/%Y').date()
                if payday not in self.dividend_data:
                    self.dividend_data[payday] = {}
                record_date: datetime.date = datetime.strptime(stock_data['RecordDate'], '%m/%d/%Y').date()
                dividend_value: float = stock_data['Div']
                ann_dividend_value: float = stock_data['AnnDiv']
                announcement_date: datetime.date = datetime.strptime(stock_data['AnnounceDate'], '%m/%d/%Y').date()
                self.dividend_data[payday][ticker] = DividendInfo(ticker, ex_div_date, payday, record_date, dividend_value, ann_dividend_value, announcement_date)

        self.active_universe: List[Symbol] = []
        self.selection_flag: bool = False
        self.UniverseSettings.Resolution = Resolution.Minute
        self.AddUniverse(self.FundamentalSelectionFunction)
        self.Schedule.On(self.DateRules.MonthEnd(symbol), self.TimeRules.AfterMarketOpen(symbol), self.Selection)
        self.Schedule.On(self.DateRules.EveryDay(symbol), self.TimeRules.BeforeMarketClose(symbol, 16), self.Rebalance)

    def OnSecuritiesChanged(self, changes: SecurityChanges) -> None:
        for security in changes.AddedSecurities:
            security.SetFeeModel(CustomFeeModel())

    def FundamentalSelectionFunction(self, fundamental: List[Fundamental]) -> List[Symbol]:
        if not self.selection_flag:
            return Universe.Unchanged
        
        self.selection_flag = False

        selection: List[Fundamental] = [x for x in fundamental if x.Symbol.Value in self.drip_tickers and x.MarketCap != 0 \
                                        and ((x.SecurityReference.ExchangeId == "NYS") or (x.SecurityReference.ExchangeId == "NAS") or (x.SecurityReference.ExchangeId == "ASE"))]

        sorted_by_market_cap = sorted(selection, key = lambda x: x.MarketCap, reverse = True)
        half = int(len(sorted_by_market_cap) / 3)
        self.active_universe = [x.Symbol for x in sorted_by_market_cap[-half:]]

        return self.active_universe
    
    def Rebalance(self) -> None:
        stocks_invested = [x.Key for x in self.Portfolio if x.Value.Invested]
        for symbol in stocks_invested:
            self.MarketOnCloseOrder(symbol, -self.Portfolio[symbol].Quantity)

        day_to_check = (self.Time.date() + BDay(1)).date()

        if day_to_check in self.dividend_data:
            payday_tickers = list(self.dividend_data[day_to_check].keys())

            long = []
            for symbol in self.active_universe:
                if symbol.Value in payday_tickers:
                    long.append(symbol) 
            
            if len(long) != 0:
                portfolio_value = (self.Portfolio.MarginRemaining * 1.2) / len(long)
                for symbol in long:
                    price = self.Securities[symbol].Price
                    if price != 0:
                        quantity = portfolio_value / price
                        self.MarketOnCloseOrder(symbol, quantity)

    def Selection(self):
        if self.Time.month % 3 == 0:
            self.selection_flag = True

# Custom fee model
class CustomFeeModel(FeeModel):
    def GetOrderFee(self, parameters):
        fee = parameters.Security.Price * parameters.Order.AbsoluteQuantity * 0.00005
        return OrderFee(CashAmount(fee, "USD"))
from AlgorithmImports import *
from datetime import datetime
import json
from pandas.tseries.offsets import BDay

# Class to store dividend information
class DividendInfo:
    def __init__(self, ticker, ex_div_date, payday, record_date, dividend_value, ann_dividend_value, announcement_date):
        self.ticker = ticker
        self.ex_div_date = ex_div_date
        self.payday = payday
        self.record_date = record_date
        self.dividend_value = dividend_value
        self.ann_dividend_value = ann_dividend_value
        self.announcement_date = announcement_date

# Custom class for handling probability data from Dropbox
class DropboxData(PythonData):
    def GetSource(self, config, date, isLiveMode):
        url = "https://www.dropbox.com/scl/fi/72x0szo3y42e58qvvtxbv/predicted_prob_cv_spypredictcyclical.csv?rlkey=k3t6zgnehs96wtzjhyhjiakyp&dl=1"
        return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile, FileFormat.Csv)

    def Reader(self, config, line, date, isLiveMode):
        if not line.strip() or line.startswith('date,probpositive'):
            return None

        data = line.split(',')
        date = datetime.strptime(data[0], '%Y-%m-%d')

        dataObject = DropboxData()
        dataObject.Symbol = config.Symbol
        dataObject.Time = date
        dataObject.Value = float(data[1])
        return dataObject

# Trading algorithm main class
class TradingDividendPaydate(QCAlgorithm):
    def Initialize(self):
        self.SetStartDate(2023, 11, 1)  # Start Date
        self.SetCash(100000)           # Set Strategy Cash

        # Define a symbol for market hours reference (e.g., SPY for U.S. equity market)
        self.spy = self.AddEquity('SPY', Resolution.Minute).Symbol
        self.AddData(DropboxData, "ProbabilityData", Resolution.Daily)

        # Store drip tickers
        csv_string_file = self.Download('https://data.quantpedia.com/backtesting_data/economic/drip_tickers.csv')
        lines = csv_string_file.split('\r\n')
        self.drip_tickers = [x for x in lines[1:]]
        self.Debug(f"Drip tickers: {self.drip_tickers}")

        self.active_universe = []
        
        self.UniverseSettings.Resolution = Resolution.Minute
        self.AddUniverse(self.FundamentalSelectionFunction)
        self.Selection()
        self.Schedule.On(self.DateRules.MonthStart("SPY"), self.TimeRules.AfterMarketOpen("SPY", 10), self.Selection)
        self.Schedule.On(self.DateRules.EveryDay(self.spy), self.TimeRules.BeforeMarketClose(self.spy, 16), self.Rebalance)

        # Schedule the daily data fetching function
        self.api_key = "QYbTvNezBansXL1jzYp53dsOGtwYjjBH"  # Replace with your actual API key
        self.dividend_data = {}
        self.Schedule.On(self.DateRules.EveryDay(self.spy), self.TimeRules.AfterMarketOpen(self.spy), self.FetchDailyDividendData)

    def FetchDailyDividendData(self):
        # Determine the next business day for the pay date
        next_business_day = self.Time + BDay(1)
        formatted_date = next_business_day.strftime('%Y-%m-%d')

        # Fetch and parse Polygon API data for the next business day
        polygon_data = self.fetch_and_parse_polygon_data(self.api_key, formatted_date)
        self.dividend_data = self.create_dividend_info(polygon_data)

    def fetch_and_parse_polygon_data(self, api_key, pay_date):
        url = f"https://api.polygon.io/v3/reference/dividends?pay_date={pay_date}&apiKey={api_key}&limit=1000&all_pages=true"
        json_response = self.Download(url)

        try:
            data = json.loads(json_response)
            return data
        except json.JSONDecodeError as e:
            self.Debug(f"JSON parsing error: {e}")
            # Handle the error (e.g., return an empty list or a default value)
            return []

    def create_dividend_info(self, polygon_data):
        # Check if polygon_data contains 'results' and it's a list
        if 'results' not in polygon_data or not isinstance(polygon_data['results'], list):
            self.Debug(f"Invalid or unexpected data format in polygon_data: {polygon_data}")
            return {}

        dividend_info_dict = {}
        for item in polygon_data['results']:
            try:
                ticker = item['ticker']
                ex_div_date = datetime.strptime(item['ex_dividend_date'], "%Y-%m-%d").date()
                payday = datetime.strptime(item['pay_date'], '%Y-%m-%d').date()
                record_date = datetime.strptime(item['record_date'], '%Y-%m-%d').date()
                dividend_value = item['cash_amount']
                ann_dividend_value = None  # Assuming annual dividend value is not available
                announcement_date = None  # Assuming announcement date is not available

                # Create DividendInfo object and add it to the dictionary
                dividend_info = DividendInfo(ticker, ex_div_date, payday, record_date, dividend_value, ann_dividend_value, announcement_date)
                if payday not in dividend_info_dict:
                    dividend_info_dict[payday] = {}
                dividend_info_dict[payday][ticker] = dividend_info
            except KeyError as e:
                self.Debug(f"Key error in item: {item}, error: {e}")
            except ValueError as e:
                self.Debug(f"Value error in item: {item}, error: {e}")

        return dividend_info_dict

    def OnSecuritiesChanged(self, changes):
        for security in changes.AddedSecurities:
            security.SetFeeModel(CustomFeeModel())

    def FundamentalSelectionFunction(self, fundamental):
        self.Debug("Universe Selection Function called")
        if not self.selection_flag:
            return Universe.Unchanged
        
        self.selection_flag = False

        selection = [x for x in fundamental if x.Symbol.Value in self.drip_tickers and x.MarketCap != 0 \
                    and ((x.SecurityReference.ExchangeId == "NYS") or (x.SecurityReference.ExchangeId == "NAS") or (x.SecurityReference.ExchangeId == "ASE"))]

        sorted_by_market_cap = sorted(selection, key=lambda x: x.MarketCap, reverse=True)
        half = int(len(sorted_by_market_cap) / 2)
        self.active_universe = [x.Symbol for x in sorted_by_market_cap[-half:]]

        # Log each symbol in the active universe for debugging
        #for symbol in self.active_universe:
            #self.Debug(f"Selected Symbol: {symbol.Value}")

        return self.active_universe

    def Rebalance(self):
        stocks_invested = [x.Key for x in self.Portfolio if x.Value.Invested]
        for symbol in stocks_invested:
            self.MarketOnCloseOrder(symbol, -self.Portfolio[symbol].Quantity)

        day_to_check = (self.Time.date() + BDay(1)).date()

        if day_to_check in self.dividend_data:
            payday_tickers = list(self.dividend_data[day_to_check].keys())

            long = []
            for symbol in self.active_universe:
                if symbol.Value in payday_tickers:
                    long.append(symbol)

            if len(long) != 0:
                portfolio_value = (self.Portfolio.MarginRemaining * 1.2) / len(long)
                for symbol in long:
                    price = self.Securities[symbol].Price
                    if price != 0:
                        quantity = portfolio_value / price
                        self.MarketOnCloseOrder(symbol, quantity)

    def Selection(self):
        self.selection_flag = True

# Custom fee model
class CustomFeeModel(FeeModel):
    def GetOrderFee(self, parameters):
        fee = parameters.Security.Price * parameters.Order.AbsoluteQuantity * 0.00005
        return OrderFee(CashAmount(fee, "USD"))