| Overall Statistics | |
|---|---|
| Total Trades | 12988 |
| Average Win | 0.22% |
| Average Loss | -0.21% |
| Compounding Annual Return | 5.655% |
| Drawdown | 41.000% |
| Expectancy | 0.060 |
| Net Profit | 89.737% |
| Sharpe Ratio | 0.362 |
| Probabilistic Sharpe Ratio | 0.595% |
| Loss Rate | 48% |
| Win Rate | 52% |
| Profit-Loss Ratio | 1.03 |
| Alpha | 0.063 |
| Beta | -0.022 |
| Annual Standard Deviation | 0.166 |
| Annual Variance | 0.027 |
| Information Ratio | -0.315 |
| Tracking Error | 0.228 |
| Treynor Ratio | -2.748 |
| Total Fees | $2103.39 |
| Estimated Strategy Capacity | $2200000.00 |
| Lowest Capacity Asset | CGC WURRV2N8PSH1 |
from QuantConnect.DataSource import *
import numpy as np
from enum import Enum
class BrainLanguageMetrics(QCAlgorithm):
    """Long/short equity algorithm ranked on Brain 10-K language metrics.

    Flow, as wired up in ``Initialize``:

    * ``Selection`` runs at every month start and raises ``rebalance_flag``;
      in months divisible by three it also raises ``selection_flag``.
    * ``CoarseSelectionFunction`` / ``FineSelectionFunction`` build the
      universe, subscribe each stock to ``BrainCompanyFilingLanguageMetrics10K``
      and compute the target share quantities (``traded_quantity``).
    * ``OnData`` stores fresh metric values and, when ``rebalance_flag`` is
      set, liquidates and submits the target quantities as market orders.
    """

    def Initialize(self):
        """Configure dates, cash, state containers, universe and schedule."""
        self.SetStartDate(2010, 1, 1)
        self.init_cash = 100000
        self.SetCash(self.init_cash)

        self.market = self.AddEquity('SPY', Resolution.Daily).Symbol
        self.mkt = []  # benchmark chart data (filled by PrintBenchmark)

        # Reference list kept from the original parameterised version:
        # metric id -> (ReportSentiment attribute name, signal optimism flag)
        #   1 : ('SentenceCount', True),
        #   2 : ('MeanSentenceLength', True),
        #   3 : ('Sentiment', True),
        #   4 : ('Uncertainty', False),
        #   5 : ('Litigious', False),
        #   6 : ('Constraining', False),
        #   7 : ('Interesting', True),
        #   8 : ('Readability', True),
        #   9 : ('LexicalRichness', True),
        #  10 : ('LexicalDensity', True),
        #  11 : ('SpecificDensity', True),
        #  12 : ('SPY', True),

        # Attribute names read from the ReportSentiment object.  Every name
        # listed here gets its own long/short book in FineSelectionFunction.
        self.metric_values = [
            # 'LexicalRichness',   # 9
            'LexicalDensity',      # 10
            # 'SpecificDensity',   # 11
        ]

        # Optimisation parameters (hard-coded; the GetParameter variants were
        # int(self.GetParameter("portfolio_size")) / ("universe_size")).
        self.portfolio_size_property: int = 10
        self.universe_size_property: int = 500

        self.traded_quantity = {}   # stock symbol -> signed target share count
        self.metric = {}            # stock symbol -> (timestamp, metric_1, ...) or None
        self.metric_symbols = {}    # stock symbol -> Brain dataset symbol
        self.price = {}             # stock symbol -> last AdjustedPrice seen in coarse
        self.recent_universe = []   # symbols returned by the last fine selection

        self.coarse_count = self.universe_size_property
        self.selection_flag = False
        self.rebalance_flag = False

        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
        self.Schedule.On(
            self.DateRules.MonthStart(self.market),
            self.TimeRules.AfterMarketOpen(self.market),
            self.Selection,
        )
        # Optional benchmark plot, disabled in the original:
        # self.Schedule.On(self.DateRules.EveryDay(self.market), self.TimeRules.AfterMarketOpen(self.market), self.PrintBenchmark)

    def PrintBenchmark(self):
        """Plot the benchmark scaled to the initial cash on 'Strategy Equity'.

        Only called if the commented-out schedule in ``Initialize`` is enabled.
        """
        mkt_price = self.History(self.market, 2, Resolution.Daily)['close'].unstack(level=0).iloc[-1]
        self.mkt.append(mkt_price)
        mkt_perf = self.init_cash * self.mkt[-1] / self.mkt[0]
        self.Plot('Strategy Equity', self.market, mkt_perf)

    def OnSecuritiesChanged(self, changes):
        """Set fee model and leverage on additions; drop metrics on removals."""
        for security in changes.AddedSecurities:
            security.SetFeeModel(CustomFeeModel(self))
            security.SetLeverage(10)

        # remove recently stored metric value
        for security in changes.RemovedSecurities:
            self.metric.pop(security.Symbol, None)

    def CoarseSelectionFunction(self, coarse):
        """Coarse universe filter.

        * Rebalance without re-selection: refresh stored prices for the
          current universe and return it unchanged.
        * Neither flag set: return ``Universe.Unchanged``.
        * Re-selection: pick stocks with fundamental data (all of them for
          universe size 3000, otherwise the top ``coarse_count`` by dollar
          volume), subscribe new ones to the Brain dataset and warm up their
          metric from history.
        """
        # return old universe if selection is not needed
        if self.rebalance_flag and not self.selection_flag:
            # Set lookup instead of scanning the list for every coarse row.
            tracked = set(self.recent_universe)
            for stock in coarse:
                symbol = stock.Symbol
                if symbol in tracked:
                    self.price[symbol] = stock.AdjustedPrice
            return self.recent_universe

        if not self.selection_flag:
            return Universe.Unchanged

        self.selection_flag = False

        with_fundamentals = [x for x in coarse if x.HasFundamentalData]
        if self.universe_size_property == 3000:
            # Size cut is applied later, by market cap, in fine selection.
            selected = with_fundamentals
        else:
            # Top n stocks by dollar volume.  Also used for sizes other than
            # 500/1000, which previously left `selected` undefined.
            selected = sorted(
                with_fundamentals,
                key=lambda x: x.DollarVolume,
                reverse=True,
            )[:self.coarse_count]

        for stock in selected:
            symbol = stock.Symbol
            self.price[symbol] = stock.AdjustedPrice

            if symbol in self.metric:
                continue

            # Placeholder until a filing is seen for this stock.
            self.metric[symbol] = None

            # subscribe to Brain Language Metrics data
            dataset_symbol = self.AddData(BrainCompanyFilingLanguageMetrics10K, symbol).Symbol

            # warmup Brain Language Metrics data
            history = self.History(dataset_symbol, 3 * 30, Resolution.Daily)
            if not history.empty:
                report = history['reportsentiment'].iloc[-1]
                metrics = [getattr(report, name) for name in self.metric_values]
                # Keep every configured metric so that index ms_i + 1 in
                # FineSelectionFunction is valid for all of them.
                self.metric[symbol] = (history.iloc[-1].reportdate, *metrics)

            # store metric symbol under stock symbol
            self.metric_symbols[symbol] = dataset_symbol

        # return stocks that have both a metric slot and a stored price
        return [
            x.Symbol for x in selected
            if x.Symbol in self.metric and x.Symbol in self.price
        ]

    def FineSelectionFunction(self, fine):
        """Fine universe filter; also computes the target share quantities.

        Keeps stocks with non-zero market cap listed on NYS/NAS/ASE, then for
        each configured metric ranks the stocks whose metric is at most 30
        days old and allocates an equal share of portfolio value to the top
        and bottom ``1 / portfolio_size_property`` slices (long / short).

        Returns the symbols that received a target quantity.
        """
        fine = [
            x for x in fine
            if x.MarketCap != 0
            and x.SecurityReference.ExchangeId in ("NYS", "NAS", "ASE")
        ]

        if self.universe_size_property == 3000:
            fine = sorted(fine, key=lambda x: x.MarketCap, reverse=True)[:self.coarse_count]

        self.recent_universe = [x.Symbol for x in fine]

        # Start from a clean slate: quantities are accumulated with `+=`, so
        # targets left over from a pass that was never traded would otherwise
        # be added on top of the new ones.
        self.traded_quantity.clear()

        metric_cnt = len(self.metric_values)
        for ms_i in range(metric_cnt):
            metric = {}
            for stock in fine:
                record = self.metric.get(stock.Symbol)
                if record is None or record[ms_i + 1] is None:
                    continue
                # Ignore values older than 30 days.
                if (self.Time - record[0]).days <= 30:
                    metric[stock.Symbol] = record[ms_i + 1]

            if len(metric) < self.portfolio_size_property:
                continue

            # sorting by metric, highest first
            sorted_by_metric = sorted(metric.items(), key=lambda x: x[1], reverse=True)
            percentile = int(len(sorted_by_metric) / self.portfolio_size_property)
            long_symbols = [x[0] for x in sorted_by_metric[:percentile]]
            short_symbols = [x[0] for x in sorted_by_metric[-percentile:]]

            # Each metric gets an equal share of the portfolio value per side.
            budget = self.Portfolio.TotalPortfolioValue / metric_cnt
            self._accumulate_quantities(long_symbols, budget, 1)
            self._accumulate_quantities(short_symbols, budget, -1)

        return list(self.traded_quantity)

    def _accumulate_quantities(self, symbols, budget, direction):
        """Add equal-weighted share counts for ``symbols`` to ``traded_quantity``.

        ``budget`` is split evenly across ``symbols``; ``direction`` is ``1``
        for long and ``-1`` for short.  Symbols without a positive stored
        price are skipped instead of raising.
        """
        count = len(symbols)
        for symbol in symbols:
            price = self.price.get(symbol)
            if not price:
                continue
            quantity = direction * int((budget / count) / price)
            self.traded_quantity[symbol] = self.traded_quantity.get(symbol, 0) + quantity

    def OnData(self, data):
        """Store new metric values and execute the monthly rebalance."""
        # update metric value for each stock
        for stock_symbol, metric_symbol in self.metric_symbols.items():
            # check if there are data for subscribed metric_symbol
            if metric_symbol in data and data[metric_symbol]:
                report = data[metric_symbol].ReportSentiment
                metrics = [getattr(report, name) for name in self.metric_values]
                # Same layout as the warm-up tuple: (timestamp, metric_1, ...).
                self.metric[stock_symbol] = (self.Time, *metrics)

        # monthly rebalance
        if not self.rebalance_flag:
            return
        self.rebalance_flag = False

        if self.universe_size_property == 3000:
            if self.Time.year in [2014, 2016] and self.Time.month == 6:
                self.Liquidate()
                # Discard the skipped targets so they cannot leak into a
                # later rebalance.
                self.traded_quantity.clear()
                return

        self.Liquidate()
        for symbol, q in self.traded_quantity.items():
            if q != 0:
                self.MarketOrder(symbol, q)

        self.traded_quantity.clear()

    def Selection(self):
        """Scheduled at month start: request a rebalance, and a new universe
        selection in months divisible by three."""
        # new universe selection every three months
        if self.Time.month % 3 == 0:
            self.selection_flag = True

        # rebalance once a month
        self.rebalance_flag = True
# Custom fee model
class CustomFeeModel(FeeModel):
    """Fee model that charges a fixed fraction of the order's notional value."""

    def GetOrderFee(self, parameters):
        """Return the order fee in USD.

        The fee is security price x absolute order quantity x 0.00005.
        """
        notional = parameters.Security.Price * parameters.Order.AbsoluteQuantity
        return OrderFee(CashAmount(notional * 0.00005, "USD"))
# Quandl short interest data.
class QuandlShortVolume(PythonQuandl):
    """Quandl data type whose value column is set to ``SHORTVOLUME``."""

    def __init__(self):
        # 'TOTALVOLUME' is also accessible as a value column.
        self.ValueColumnName = "SHORTVOLUME"