| Overall Statistics |
|
Total Trades 216 Average Win 2.08% Average Loss -2.31% Compounding Annual Return 24.014% Drawdown 31.900% Expectancy 0.322 Net Profit 113.457% Sharpe Ratio 1.107 Probabilistic Sharpe Ratio 50.163% Loss Rate 30% Win Rate 70% Profit-Loss Ratio 0.90 Alpha 0.205 Beta 0.3 Annual Standard Deviation 0.196 Annual Variance 0.038 Information Ratio 0.802 Tracking Error 0.221 Treynor Ratio 0.721 Total Fees $1831.59 |
from QuantConnect.Data.UniverseSelection import *
import math
import numpy as np
import pandas as pd
import scipy as sp
# import statsmodels.api as sm
class FundamentalFactorAlgorithm(QCAlgorithm):
    """Monthly fundamental-rank and momentum stock picker with an SPY trend filter.

    Universe selection is refreshed once per month (and once at start-up):
    the coarse filter keeps the 1000 most liquid stocks that have fundamental
    data and trade above $5; the fine filter ranks them on ROIC, long-term
    debt/equity and FCF yield and keeps ``num_screener`` names.

    On each month start at 10:00 ``rebalance`` compares the SPY price with the
    mean of its last 200 and last 50 daily closes. When SPY is below both
    means the portfolio moves entirely into TLT; otherwise ``num_stocks``
    names are chosen from the screened universe by their formation-period
    return and held at equal weights summing to 0.99.
    """

    def Initialize(self):
        """Configure the backtest window, universe, schedules and state flags."""
        self.SetStartDate(2002, 4, 4)    # Set Start Date
        self.SetEndDate(2005, 10, 10)    # Set End Date
        self.SetCash(50000)              # Set Strategy Cash
        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
        self.spy = self.AddEquity("SPY", Resolution.Hour).Symbol
        self.holding_months = 1
        self.num_screener = 40
        self.num_stocks = 4
        self.formation_days = 126
        self.lowmom = False
        self.month_count = self.holding_months
        # Sort-direction switch read by calc_return. rebalance only assigns it
        # in some market regimes, so give it a defined value up front.
        self.state = False
        self.Schedule.On(self.DateRules.MonthStart("SPY"), self.TimeRules.At(0, 0), Action(self.monthly_rebalance))
        self.Schedule.On(self.DateRules.MonthStart("SPY"), self.TimeRules.At(10, 0), Action(self.rebalance))
        # rebalance the universe selection once a month
        self.rebalence_flag = 0
        # make sure to run the universe selection at the start of the algorithm
        # even if it's not the month start
        self.first_month_trade_flag = 1
        self.trade_flag = 0
        self.symbols = None

    def CoarseSelectionFunction(self, coarse):
        """Return the 1000 highest dollar-volume symbols on refresh days.

        On any other day the previously selected ``self.symbols`` is returned
        unchanged.
        """
        if not (self.rebalence_flag or self.first_month_trade_flag):
            return self.symbols

        # drop stocks which have no fundamental data or have too low prices
        selected = [x for x in coarse if x.HasFundamentalData and float(x.Price) > 5]
        # rank the stocks by dollar volume
        filtered = sorted(selected, key=lambda x: x.DollarVolume, reverse=True)
        return [x.Symbol for x in filtered[:1000]]

    def FineSelectionFunction(self, fine):
        """Score the coarse survivors on three factors and keep ``num_screener``.

        Side effects: stores ``self.sorted_stock``, ``self.sorted_symbol`` and
        ``self.symbols``, clears both refresh flags and sets ``trade_flag``.
        On non-refresh days the stored ``self.symbols`` is returned as is.
        """
        if not (self.rebalence_flag or self.first_month_trade_flag):
            return self.symbols

        # Materialise once so the fallback filter below sees the full input
        # even if the first pass raised part-way through.
        fine = list(fine)
        try:
            # shares * (EPS * P/E) is used as a market-cap estimate (> 2e9).
            filtered_fine = [
                x for x in fine
                if (x.ValuationRatios.EVToEBITDA > 0)
                and (x.EarningReports.BasicAverageShares.ThreeMonths > 0)
                and x.EarningReports.BasicAverageShares.ThreeMonths
                * (x.EarningReports.BasicEPS.TwelveMonths * x.ValuationRatios.PERatio) > 2e9
            ]
        except Exception:
            # Fall back to the filter without the market-cap condition.
            filtered_fine = [
                x for x in fine
                if (x.ValuationRatios.EVToEBITDA > 0)
                and (x.EarningReports.BasicAverageShares.ThreeMonths > 0)
            ]

        sortedByfactor1 = sorted(filtered_fine, key=lambda x: x.OperationRatios.ROIC.Value, reverse=True)
        sortedByfactor2 = sorted(filtered_fine, key=lambda x: x.OperationRatios.LongTermDebtEquityRatio.Value, reverse=True)
        sortedByfactor3 = sorted(filtered_fine, key=lambda x: x.ValuationRatios.FCFYield, reverse=True)

        # Position lookups built once instead of calling list.index per stock.
        rank_in_factor2 = self._first_positions(sortedByfactor2)
        rank_in_factor3 = self._first_positions(sortedByfactor3)

        # assign a score to each stock, you can also change the rule of scoring here.
        stock_dict = {}
        for rank1, ele in enumerate(sortedByfactor1):
            stock_dict[ele] = rank1 * 0.4 + rank_in_factor2[ele] * 0.2 + rank_in_factor3[ele] * 0.4

        # Sort by score, largest first. Each rank is a position in a
        # descending factor sort, so this puts the largest rank sums on top.
        # NOTE(review): confirm descending score order is the intended pick.
        self.sorted_stock = sorted(stock_dict.items(), key=lambda d: d[1], reverse=True)
        self.sorted_symbol = [stock for stock, _ in self.sorted_stock]
        top = self.sorted_symbol[:self.num_screener]
        self.symbols = [x.Symbol for x in top]

        self.rebalence_flag = 0
        self.first_month_trade_flag = 0
        self.trade_flag = 1
        return self.symbols

    @staticmethod
    def _first_positions(items):
        """Map each item to the index of its first occurrence in ``items``."""
        positions = {}
        for index, item in enumerate(items):
            positions.setdefault(item, index)
        return positions

    def OnData(self, data):
        """No per-bar logic; all trading happens in the scheduled ``rebalance``."""
        pass

    def monthly_rebalance(self):
        """Flag the next universe selection pass as a refresh."""
        self.rebalence_flag = 1

    def rebalance(self):
        """Move to TLT when SPY is below both means, otherwise hold the picks.

        The picks are the first ``num_stocks`` rows returned by
        ``calc_return``; every other holding except SPY is closed.
        """
        spy_hist = self.History([self.spy], 200, Resolution.Daily).loc[str(self.spy)]['close']
        long_mean = spy_hist.mean()
        # The last 50 daily closes are the tail of the 200-bar request, so a
        # second history call is unnecessary.
        short_mean = spy_hist.tail(50).mean()
        spy_price = self.Securities[self.spy].Price

        if spy_price < long_mean and spy_price < short_mean:
            # Close everything except TLT. Calling Liquidate() with no symbol
            # here would also flatten TLT on every pass and force a re-buy.
            for symbol in list(self.Portfolio.Keys):
                if symbol.Value != "TLT":
                    self.Liquidate(symbol)
            self.AddEquity("TLT")
            self.SetHoldings("TLT", 1)
            return

        if spy_price < long_mean and spy_price > short_mean:
            self.state = True
        elif spy_price > long_mean:
            self.state = False

        if self.symbols is None:
            return

        chosen_df = self.calc_return(self.symbols).iloc[:self.num_stocks]
        chosen_tickers = set(chosen_df.index)

        self.existing_pos = 0
        for symbol in list(self.Portfolio.Keys):
            if symbol.Value == 'SPY':
                continue
            if symbol.Value in chosen_tickers:
                self.existing_pos += 1
            else:
                self.SetHoldings(symbol, 0)

        if chosen_df.empty:
            # Nothing to buy; also avoids dividing by zero below.
            return

        weight = 0.99 / len(chosen_df)
        for symbol in chosen_df.index:
            self.AddEquity(symbol)
            self.SetHoldings(symbol, weight)

    def calc_return(self, stocks):
        """Return a one-column ``return`` frame for ``stocks``, sorted for picking.

        For each symbol present in both history requests, the return is
        measured from the first daily close of the ``formation_days`` window
        to the first close of the last 10 minute bars. Rows are indexed by
        ``symbol.Value``. Sorting is ascending when ``self.state`` is true,
        otherwise ``ascending=self.lowmom``. An empty frame is returned when
        no symbol has usable history. Also stores the price lists in
        ``self.price``.
        """
        hist = self.History(stocks, self.formation_days, Resolution.Daily)
        current = self.History(stocks, 10, Resolution.Minute)

        self.price = {}
        # An empty history frame has no symbol level to look up.
        if not hist.empty and not current.empty:
            hist_symbols = hist.index.levels[0]
            current_symbols = current.index.levels[0]
            for symbol in stocks:
                key = str(symbol)
                if key in hist_symbols and key in current_symbols:
                    closes = list(hist.loc[key]['close'])
                    closes.append(current.loc[key]['close'].iloc[0])
                    self.price[symbol.Value] = closes

        ret = {
            ticker: (closes[-1] - closes[0]) / closes[0]
            for ticker, closes in self.price.items()
        }
        # Built with an explicit column so an empty result is still valid.
        df_ret = pd.DataFrame({'return': pd.Series(ret, dtype=float)})

        ascending = True if self.state else self.lowmom
        return df_ret.sort_values(by=['return'], ascending=ascending)