| Overall Statistics |
|
Total Trades 55 Average Win 0.64% Average Loss -0.23% Compounding Annual Return 22.369% Drawdown 1.900% Expectancy 0.827 Net Profit 5.218% Sharpe Ratio 2.585 Probabilistic Sharpe Ratio 81.606% Loss Rate 52% Win Rate 48% Profit-Loss Ratio 2.80 Alpha 0 Beta 0 Annual Standard Deviation 0.059 Annual Variance 0.004 Information Ratio 2.585 Tracking Error 0.059 Treynor Ratio 0 Total Fees $2271.97 Estimated Strategy Capacity $32000000.00 Lowest Capacity Asset IFF R735QTJ8XC9X Portfolio Turnover 7.36% |
from AlgorithmImports import *

import numpy as np
import pandas as pd
from nltk.sentiment import SentimentIntensityAnalyzer
from scipy.optimize import linprog
from scipy.stats import norm
from sklearn.ensemble import GradientBoostingRegressor
class XGP(QCAlgorithm):
    def Initialize(self):
        """Set up backtest dates, cash, brokerage model, universe selection,
        training schedules and rebalance schedules."""
        # set up
        self.SetStartDate(2017, 1, 1)
        self.SetEndDate(2017, 3, 1)
        self.InitCash = 10000000
        self.SetCash(self.InitCash)
        # base history window (days) used when building training data
        self.lookback = 25
        # setting brokerage and reality modeling params
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        self.SetSecurityInitializer(lambda s : s.SetSlippageModel(VolumeShareSlippageModel()))
        # benchmarking against SPY
        self.MKT = self.AddEquity("SPY", Resolution.Daily).Symbol
        self.mkt = []
        # manually keeping track of securities
        self.securities = []
        # We're getting an error when this is removed
        self.weights = []
        self.trained = True
        # Requesting data
        self.AddUniverseSelection(FineFundamentalUniverseSelectionModel(self.SelectCoarse, self.SelectFine))
        self.UniverseSettings.Resolution = Resolution.Daily
        self.num_coarse_symbols = 40
        self.num_fine_symbols = 7 # slice [:7] keeps the top 7 symbols
        # Train immediately
        self.Train(self.classifier_training)
        # Train every Sunday at 4am or first day of month (because new universe)
        self.Train(self.DateRules.Every(DayOfWeek.Sunday), self.TimeRules.At(4, 0), self.classifier_training)
        self.Train(self.DateRules.MonthStart(daysOffset = 0), self.TimeRules.At(4, 0), self.classifier_training)
        #self.Train(self.DateRules.MonthStart(daysOffset = 10), self.TimeRules.At(4, 0), self.classifier_training)
        #self.Train(self.DateRules.MonthStart(daysOffset = 20), self.TimeRules.At(4, 0), self.classifier_training)
        # NOTE(review): self.actions is scheduled below but no actions method is
        # visible in this class version -- confirm it is defined at runtime.
        self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday),
                         self.TimeRules.AfterMarketOpen("SPY", 10),
                         self.actions)
        self.Schedule.On(self.DateRules.MonthStart(daysOffset = 0),
                         self.TimeRules.AfterMarketOpen("SPY", 10),
                         self.actions)
def SelectCoarse(self, coarse):
"""selecting CoarseFundamental objects based on criteria in paper"""
if self.Time.day != 1:
return Universe.Unchanged
selected = [c for c in coarse if c.HasFundamentalData and c.Price > 5]
sorted_by_dollar_volume = sorted(selected, key=lambda c: c.DollarVolume, reverse=True)
return [c.Symbol for c in sorted_by_dollar_volume[:self.num_coarse_symbols]]
def SelectFine(self, fine):
"""selecting FineFundamental objects based on our criteria"""
if self.Time.day != 1:
return Universe.Unchanged
selected = [f for f in fine if
f.ValuationRatios.PERatio < 100
and f.MarketCap > 300000000
and f.ValuationRatios.PEGRatio < 3
and f.OperationRatios.TotalDebtEquityRatio.Value < 2
and f.OperationRatios.CurrentRatio.Value > 1]
sorted_by_pe_ratio = sorted(selected, key=lambda f: f.ValuationRatios.PERatio, reverse=True)
return [f.Symbol for f in sorted_by_pe_ratio[:self.num_fine_symbols]]
def OnSecuritiesChanged(self, changes):
"""triggers when Universe changes as result of filtering"""
for security in changes.AddedSecurities:
self.Debug(f"{self.Time}: Added {security}")
for security in changes.RemovedSecurities:
self.Debug(f"{self.Time}: Removed {security}")
added = changes.AddedSecurities
removed = changes.RemovedSecurities
self.securities = list(set(self.securities).union(set(added)).difference(set(removed)))
    def OnData(self, data):
        # Per-bar trading is disabled in this version; rebalancing is meant to
        # happen from the scheduled callbacks instead.
        return
        #if self.trained:
        #    self.Liquidate() #Liquidate the whole portfolio
        #    self.make_predictions()
        #    self.LinOptProg()
        #    self.Debug(self.weights)
        #    a_securities = [s for s in self.securities]
        #    for (security, wt) in zip(a_securities, [i[0] for i in self.weights]):
        #        if wt != 0:
        #            self.Debug(f"{security.Symbol}: {wt}")
        #            self.SetHoldings(security.Symbol, wt)
        #    self.trained = False
def OnEndOfDay(self):
# code here plots benchmark against our portfolio performance on the equity chart
mkt_price = self.History(self.MKT, 2, Resolution.Daily)['close'].unstack(level= 0).iloc[-1]
self.mkt.append(mkt_price)
mkt_perf = self.InitCash * self.mkt[-1] / self.mkt[0]
self.Plot('Strategy Equity', self.MKT, mkt_perf)
    def OnOrderEvent(self, orderEvent):
        """logs the details of an order"""
        # submissions, fills and cancellations all flow through here
        self.Log(f'{orderEvent}')
# =====================================================================================================================
# begin custom functions
# =====================================================================================================================
def classifier_training(self):
self.return_mods = []
self.quantile_mods_lg = []
self.quantile_mods_st = []
#active_securities = [s.Symbol.Value for s in self.securities]
active_securities = [s.Symbol for s in self.securities]
self.Log(f"Training Started at {self.Time}")
for security in active_securities:
data = self.get_all_data([security], training=True, backtesting=False) # get tickers
try:
y_reg = data["return"]
X = data.drop(["direction", "return", "symbol"], axis = 1)
(ret, qut_lg, qut_st) = self.gb_returns(X, y_reg)
except:
ret = "NoModel"
qut_lg = "NoModel"
qut_st = "NoModel"
self.return_mods.append(ret)
self.quantile_mods_lg.append(qut_lg)
self.quantile_mods_st.append(qut_st)
self.trained = True
self.trained = True
def make_predictions(self):
self.returns = []
self.quantiles = []
act_securities = [s.Symbol for s in self.securities]
for i in range(len(act_securities)):
security = act_securities[i]
data = self.get_all_data([security], training=False)
data = data[data.index == data.index.max()]
prediction_data = data.drop(["direction", "return", "symbol"], axis = 1)
try:
r_pred = self.return_mods[i].predict(prediction_data)[0]
except:
r_pred = 0
if r_pred > 0:
q_pred = self.quantile_mods_lg[i].predict(prediction_data)[0]
elif r_pred < 0:
q_pred = self.quantile_mods_st[i].predict(prediction_data)[0]
else:
q_pred = 0
self.returns.append(r_pred)
self.quantiles.append(q_pred)
def gb_returns(self, X, y, security):
"""
Function to calculate expected returns and quantile loss
"""
mean_clf = GradientBoostingRegressor(n_estimators = 150,
loss = "squared_error",
criterion = "friedman_mse",
learning_rate = 0.05,
random_state = 585,
n_iter_no_change = 20)
mean_fit_out = mean_clf.fit(X,y)
data = self.get_all_data([security], training=False)
data = data[data.index == data.index.max()]
prediction_data = data.drop(["direction", "return", "symbol"], axis = 1)
direct = mean_fit_out.predict(prediction_data)
if direct >= 0:
quantile_clf = GradientBoostingRegressor(n_estimators = 150,
loss = "quantile",
alpha = 0.05,
n_iter_no_change = 20,
learning_rate = 0.05,
criterion = "friedman_mse",
random_state = 585)
elif direct < 0:
quantile_clf = GradientBoostingRegressor(n_estimators = 150,
loss = "quantile",
alpha = 0.95,
n_iter_no_change = 20,
learning_rate = 0.05,
criterion = "friedman_mse",
random_state = 585)
quantile_fit_out = quantile_clf.fit(X,y)
return (mean_fit_out, quantile_fit_out)
def LinOptProg(self):
"""
Convex optimization Function
"""
self.weights = []
self.returns = np.array(self.returns).reshape(-1,1)
self.quantiles = np.array(self.quantiles).reshape(-1,1)
dirs = np.array([1 if d > 0 else 0 if d == 0 else -1 for d in self.returns]).reshape(-1,1)
self.Debug(dirs)
bounds = [(0, min(0.6, 2.5 / len(self.returns))) if d == 1 else (max(-0.6, -2.5 / len(self.returns)), 0) for d in dirs]
A = np.array([-1*self.quantiles, dirs, -1*dirs]).squeeze()
b = np.array([0.02, 1, 0])
res = linprog(-1*self.returns, A_ub = A, b_ub = b, bounds = bounds)
if res.status == 0:
self.weights = res.x.reshape(-1,1)
else:
self.Log("Optimization failed")
# If optimization fails, give uniform weight 0 (buy nothing)
self.weights = dirs * (1/len(returns))
del self.returns
del self.quantiles
def bollinger_bands(self, data, window=20, num_std=2):
# Calculate the moving average
data['MA'] = data['close'].rolling(window=window).mean()
# Calculate the standard deviation
data['STD'] = data['close'].rolling(window=window).std()
# Calculate the Bollinger Bands
data['Upper_BB'] = data['MA'] + (data['STD'] * num_std)
data['Lower_BB'] = data['MA'] - (data['STD'] * num_std)
return data
def calculate_rsi(self,data, period=20):
# Calculate the daily price changes (gains and losses)
delta = data['close'].diff().dropna()
# Separate gains and losses into their own series
gains = delta.where(delta > 0, 0)
losses = -delta.where(delta < 0, 0)
# Calculate the average gain and average loss
avg_gain = gains.ewm(com=period - 1, min_periods=period).mean()
avg_loss = losses.ewm(com=period - 1, min_periods=period).mean()
# Calculate the Relative Strength (RS)
rs = avg_gain / avg_loss
# Calculate the Relative Strength Index (RSI)
rsi = 100 - (100 / (1 + rs))
data['rsi']=rsi
return data
    def get_all_data(self, tickers, historical=True, training=False, backtesting=True):
        """
        Gets historical price, indicator and Tiingo-news-sentiment data for
        training and prediction.

        Parameters:
        -----------
        tickers : list
            list of Symbols to retrieve data for
        historical : Bool, default True
            Flag to determine if we are training or backtesting; False if live trading
        training : Bool, default False
            If True, retrieves a longer training window. If performing
            predictions, False retrieves the most recent day of data.
        backtesting : Bool, default True
            Flag to determine if we are backtesting or training
        Return:
        -------
        full_data : pd.DataFrame
            one row per (symbol, bar) with indicator, lag and sentiment columns
        """
        # choose lookback windows depending on mode
        if historical:
            if backtesting:
                shift_factor = 30 # overshooting and select the maximum
                hist_lookback = 1 + shift_factor
                tiingo_lookback = 12 # in case of weekends?
            elif training:
                hist_lookback = self.lookback + 25
                tiingo_lookback = self.lookback * 1.5
            else:
                raise ValueError("Please train or backtest if using historical = True")
        else:
            shift_factor = 7 # needed so we can calculate lagged data
            hist_lookback = 1 + shift_factor
            tiingo_lookback = 1 + shift_factor # in case of weekends?
        full_data = pd.DataFrame()
        for symbol in tickers:
            # Get Price History
            history = self.History(symbol, hist_lookback)
            history = pd.DataFrame(history)
            # convert the historical data to a pandas DataFrame
            # 1 if the bar closed above its open, else 0
            history['direction'] = np.where(history['close'] > history['open'], 1, 0)
            # 5-bar percentage return is the regression target
            history['return']=history['close'].pct_change(periods=5)
            history = self.bollinger_bands(history)
            history = self.calculate_rsi(history)
            # Add relevant columns
            history['price_diff']=history["open"]-history["MA"]
            history['band_diff_up']=history["open"]-history["Upper_BB"]
            history['band_diff_lower']=history["open"]-history["Lower_BB"]
            # Add Tiingo Data
            data = self.AddData(TiingoNews, symbol).Symbol
            tiingo = self.History(data, int(tiingo_lookback), Resolution.Daily)
            # only compute sentiment when the news history actually has content
            if len(tiingo)!=0 and set(['description','publisheddate']).issubset(tiingo.columns):
                analyzer = SentimentIntensityAnalyzer()
                # VADER compound score per story, summed per publish day
                tiingo['polarity'] = tiingo['description'].dropna().apply(lambda x: analyzer.polarity_scores(x))
                tiingo = pd.concat([tiingo.drop(['polarity'], axis=1), tiingo['polarity'].apply(pd.Series)], axis=1)
                #tiingo['sentiment'] = tiingo['compound'].apply(lambda x: 'positive' if x >0 else 'neutral' if x==0 else 'negative')
                tiingo = tiingo[[ 'publisheddate', 'compound']]
                tiingo['publisheddate'] = pd.to_datetime(tiingo['publisheddate'],utc=True).dt.date
                tiingo = tiingo.groupby(by=[ 'publisheddate'], as_index=False).sum()
                tiingo.rename(columns={'publisheddate' : 'time'}, inplace=True)
                tiingo.set_index('time',inplace=True)
                # NOTE(review): joins a date-typed index onto history's
                # (possibly datetime) index -- confirm keys actually align
                history = history.join(tiingo)
            # build 4 lagged copies (t-1 .. t-4) of every column
            lags = range(1,5)
            history=history.assign(**{
                f'{col} (t-{lag})': history[col].shift(lag)
                for lag in lags
                for col in history
            }).dropna().drop(columns = ['close','high','low','volume'], errors='ignore')
            history['symbol'] = symbol.Value
            full_data=pd.concat([full_data, history])
        self.Log(full_data)
        return full_data
def value_at_risk(self, returns, weights, conf_level=0.05, num_days=30):
"""
Calculates the (absolute) value-at-risk of the portfolio.
---------------------------------------------------
Parameters
returns : pd.DataFrame
periodic returns
conf_level : float
confidence level. 0.05 by default
weights : np.array
portfolio weights
days : int
number of days the VaR is calculated over. 30 by default
"""
cov_matrix = returns.cov()
cov_matrix
avg_return = returns.mean()
portfolio_mean = avg_return.dot(weights)
portfolio_stdev = np.sqrt(weights.T.dot(cov_matrix).dot(weights))
cutoff = norm.ppf(conf_level, portfolio_mean, portfolio_stdev)
# n-Day VaR
VaR = cutoff * np.sqrt(num_days)
return VaR
def cvar(self, returns, stdev, conf_level=0.05):
"""
Calculates the portfolio CVaR
------------------------------------
Parameters
returns : pd.DataFrame
portfolio returns
stdev :
portfolio standard deviation
conf_level : float
confidence level
"""
CVaR = conf_level**-1 * norm.pdf(conf_level) * stdev - returns
return CVaR
from AlgorithmImports import *
import numpy as np
import pandas as pd
from nltk.sentiment import SentimentIntensityAnalyzer
from scipy.optimize import linprog
from sklearn.ensemble import GradientBoostingRegressor
class XGP(QCAlgorithm):
    def Initialize(self):
        """Set up dates, cash, brokerage model, universe selection and training
        schedules; trading itself runs from OnData, gated by self.no_trade."""
        # set up
        self.SetStartDate(2017, 1, 1)
        self.SetEndDate(2017, 2, 1)
        self.InitCash = 10000000
        self.SetCash(self.InitCash)
        # base history window (days) used when building training data
        self.lookback = 25
        # setting brokerage and reality modeling params
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        self.SetSecurityInitializer(lambda s : s.SetSlippageModel(VolumeShareSlippageModel()))
        # benchmarking against SPY
        self.MKT = self.AddEquity("SPY", Resolution.Daily).Symbol
        self.mkt = []
        # manually keeping track of securities
        self.securities = []
        # We're getting an error when this is removed
        self.weights = []
        # gate so OnData only rebalances once per day (re-armed in OnEndOfDay)
        self.no_trade = True
        # Requesting data
        # small number of coarse / fine symbols for testing, eventually increase
        # self.AddUniverseSelection(ScheduledUniverseSelectionModel(
        #     self.DateRules.Every(DayOfWeek.Sunday),
        #     self.TimeRules.At(1,0),
        #     FineFundamentalUniverseSelectionModel(self.SelectCoarse, self.SelectFine)))
        self.AddUniverseSelection(FineFundamentalUniverseSelectionModel(self.SelectCoarse, self.SelectFine))
        self.UniverseSettings.Resolution = Resolution.Daily
        self.num_coarse_symbols = 40
        self.num_fine_symbols = 4 # slice [:4] keeps the top 4 symbols
        # self._changes = None
        # Train immediately
        self.Train(self.classifier_training)
        # Train every day at 4am (weekly/monthly variants kept below, disabled)
        self.Train(self.DateRules.EveryDay(), self.TimeRules.At(4, 0), self.classifier_training)
        #self.Train(self.DateRules.Every(DayOfWeek.Sunday), self.TimeRules.At(4, 0), self.classifier_training)
        #self.Train(self.DateRules.MonthStart(daysOffset = 0), self.TimeRules.At(4, 0), self.classifier_training)
    def SelectCoarse(self, coarse):
        """selecting CoarseFundamental objects based on criteria in paper"""
        # universe only refreshes on the first calendar day of the month
        if self.Time.day != 1:
            return Universe.Unchanged
        selected = [c for c in coarse if c.HasFundamentalData and c.Price > 5]
        sorted_by_dollar_volume = sorted(selected, key=lambda c: c.DollarVolume, reverse=True)
        # filtering reduces run time, may want to choose a max number to select based on some criteria?
        return [c.Symbol for c in sorted_by_dollar_volume[:self.num_coarse_symbols]]
    def SelectFine(self, fine):
        """selecting FineFundamental objects based on our criteria"""
        # universe only refreshes on the first calendar day of the month
        if self.Time.day != 1:
            return Universe.Unchanged
        # value/quality screen: modest PE/PEG, mid-cap or larger, low leverage,
        # healthy current ratio
        selected = [f for f in fine if
                    f.ValuationRatios.PERatio < 100
                    and f.MarketCap > 300000000
                    and f.ValuationRatios.PEGRatio < 3
                    and f.OperationRatios.TotalDebtEquityRatio.Value < 2
                    and f.OperationRatios.CurrentRatio.Value > 1]
        # condition to select top n stocks (ranked by PE, descending)
        sorted_by_pe_ratio = sorted(selected, key=lambda f: f.ValuationRatios.PERatio, reverse=True)
        return [f.Symbol for f in sorted_by_pe_ratio[:self.num_fine_symbols]]
    def OnSecuritiesChanged(self, changes):
        """triggers when Universe changes as result of filtering"""
        for security in changes.AddedSecurities:
            self.Debug(f"{self.Time}: Added {security}")
        for security in changes.RemovedSecurities:
            self.Debug(f"{self.Time}: Removed {security}")
        added = changes.AddedSecurities
        removed = changes.RemovedSecurities
        # keep the manual security list in sync with the current universe
        self.securities = list(set(self.securities).union(set(added)).difference(set(removed)))
    def OnData(self, data):
        """Once per day (gated by self.no_trade): refresh predictions, solve for
        weights, then liquidate and re-enter each tracked security."""
        # self.Log(f"{self.weights}")
        # if we have no changes, do nothing
        # if self._changes is None:
        #     return
        # liquidate removed securities
        # for security in self._changes.RemovedSecurities:
        #     if security.Invested:
        #         self.Liquidate(security.Symbol)
        # we want 1/n allocation in each security in our universe
        # n = len(self._changes.AddedSecurities)
        # for security in self._changes.AddedSecurities:
        #     self.SetHoldings(security.Symbol, 0.7/n)
        # self._changes = None
        if self.no_trade:
            self.make_predictions()
            self.LinOptProg()
            self.Debug(self.weights)
            a_securities = [s for s in self.securities]
            # NOTE(review): assumes self.weights rows line up with the order of
            # self.securities -- confirm both are built from the same ordering.
            for (security, wt) in zip(a_securities, [i[0] for i in self.weights]):
                self.Liquidate(security.Symbol)
                if wt != 0:
                    self.Debug(f"{security.Symbol}: {wt}")
                    self.SetHoldings(security.Symbol, wt)
            self.no_trade = False
    def OnEndOfDay(self):
        # code here plots benchmark against our portfolio performance on the equity chart
        mkt_price = self.History(self.MKT, 2, Resolution.Daily)['close'].unstack(level= 0).iloc[-1]
        self.mkt.append(mkt_price)
        mkt_perf = self.InitCash * self.mkt[-1] / self.mkt[0]
        self.Plot('Strategy Equity', self.MKT, mkt_perf)
        # put stuff here to see if it's working in the algo
        #for i in self.securities:
        #self.Log(f"At end of day we have {i.Symbol.Value} in universe")
        # re-arm the once-per-day trading gate used by OnData
        self.no_trade = True
    def OnOrderEvent(self, orderEvent):
        """logs the details of an order"""
        # submissions, fills and cancellations all flow through here
        self.Log(f'{orderEvent}')
# =====================================================================================================================
# begin custom functions
# =====================================================================================================================
    def classifier_training(self):
        """Fit a return model and a 5%-quantile model per active security.

        Results are appended to self.return_mods / self.quantile_mods in the
        same order as self.securities; a failed fit stores the "NoModel"
        sentinel string instead of a model.
        """
        # self.direction_mods = []
        self.return_mods = []
        self.quantile_mods = []
        # FIX THIS LATER, TEMPORARY ATTEMPT - Will
        active_securities = [s.Symbol.Value for s in self.securities]
        self.Debug(active_securities)
        self.Log(f"Training Started at {self.Time}")
        # self.Log(f"Length of Active Securities : {len(self.securities)}")
        self.dat = self.get_all_data(active_securities) # get tickers
        # self.Log(f"Shape of Training Data : {self.dat.shape}")
        for security in active_securities:
            # self.Log(f"Security: {security}")
            data = self.dat[self.dat["symbol"] == security]
            # self.Log(f"data shape : {data.shape}")
            try:
                # y_class = data["direction"]
                y_reg = data["return"]
                X = data.drop(["direction", "return", "symbol"], axis = 1)
                # Train the models
                # classf = self.gb_select(X, y_class)
                (ret, qut) = self.gb_returns(X, y_reg)
            # NOTE(review): bare except also swallows KeyboardInterrupt and
            # SystemExit -- consider narrowing to `except Exception`
            except: # If the training fails, return No Model
                # classf = 'NoModel'
                ret = "NoModel"
                qut = "NoModel"
            # self.direction_mods.append(classf)
            self.return_mods.append(ret)
            self.quantile_mods.append(qut)
        # self.Log(f"Training Ended at {self.Time}")
        # free the training frame; it can be large
        del self.dat
def make_predictions(self):
# self.directions = []
self.returns = []
self.quantiles = []
act_securities = [s.Symbol.Value for s in self.securities]
self.dat = self.will_get_all_data(act_securities, training=False)
for i in range(len(act_securities)):
security = act_securities[i]
data = self.dat[self.dat["symbol"] == security]
data = data[data.index == data.index.max()]
prediction_data = data.drop(["direction", "return", "symbol"], axis = 1)
try:
# c_pred = self.direction_mods[i].predict(prediction_data)
r_pred = self.return_mods[i].predict(prediction_data)
q_pred = self.quantile_mods[i].predict(prediction_data)
except:
r_pred = 0
q_pred = 0
# self.directions.append(c_pred)
self.returns.append(r_pred[0])
self.quantiles.append(q_pred[0])
del self.dat
del self.return_mods
del self.quantile_mods
# def gb_select(self, X, y):
# """
# Gradient Boosting implementation for classification
# """
# clf = GradientBoostingClassifier(n_estimators = 100,
# learning_rate = 0.05,
# max_depth = 6,
# random_state = 1693)
# clf_fit = clf.fit(X, y)
#
# return clf_fit
def gb_returns(self, X, y):
"""
Function to calculate expected returns and quantile loss
"""
mean_clf = GradientBoostingRegressor(n_estimators = 100,
loss = "squared_error",
criterion = "friedman_mse",
learning_rate = 0.05,
random_state = 1693,
n_iter_no_change = 20)
quantile_clf = GradientBoostingRegressor(n_estimators = 100,
loss = "quantile",
alpha = 0.05,
n_iter_no_change = 20,
learning_rate = 0.05,
criterion = "friedman_mse",
random_state = 1693)
quantile_fit_out = quantile_clf.fit(X,y)
mean_fit_out = mean_clf.fit(X,y)
return (mean_fit_out, quantile_fit_out)
def LinOptProg(self):
"""
Convex optimization Function
"""
#if len(self.returns) == 0:
# return
self.weights = []
self.returns = np.array(self.returns).reshape(-1,1)
self.quantiles = np.array(self.quantiles).reshape(-1,1)
dirs = np.array([1 if d > 0 else 0 if d == 0 else -1 for d in self.returns]).reshape(-1,1)
bounds = (0, min(0.6, 2 / len(self.returns)))
A = np.array([-1*np.multiply(dirs, self.quantiles), np.ones(len(dirs)).reshape(-1,1)]).squeeze()
b = np.array([0.02, 1])
res = linprog(-1*abs(self.returns), A_ub = A, b_ub = b, bounds = bounds)
if res.status == 0:
self.weights = res.x.reshape(-1,1)
else:
self.Log("Optimization failed")
# If optimization fails, give uniform weight 0 (buy nothing)
self.weights = dirs * (1/len(returns))
del self.returns
del self.quantiles
    def get_all_data(self,tickers):
        """
        Gets historical data for training from tickers argument,
        pass in tickers in active universe.

        Builds, per ticker: OHLC history, direction/return targets,
        RSI/Bollinger/SMA indicator columns, daily summed Tiingo news
        sentiment, and 4 lagged copies of every column. Returns the
        concatenated DataFrame (also stored on self.dat).
        """
        self.dat = pd.DataFrame()
        for ticker in tickers:
            # Create a Quant Book Object as in Research
            qb=QuantBook()
            symbol = qb.AddEquity(ticker,Resolution.Daily).Symbol
            # Get Price History
            history = self.History(symbol, self.lookback+6)
            # convert the historical data to a pandas DataFrame
            df = pd.DataFrame(history)
            # 1 if the bar closed above its open, else 0
            df['direction'] = np.where(df['close'] > df['open'], 1, 0)
            # 1-bar percentage return is the regression target
            df['return']=df['close'].pct_change()
            # Prepare indicator data
            start = self.Time-timedelta(days=self.lookback*2+20)
            end = self.Time
            rsi = qb.Indicator(RelativeStrengthIndex(20),symbol, Resolution.Daily,start=start,end=end)
            bbdf = qb.Indicator(BollingerBands(20, 2), symbol, Resolution.Daily,start=start,end=end)
            sma = qb.Indicator(SimpleMovingAverage(20), symbol, Resolution.Daily,start=start,end=end)
            # Add relevant columns
            final_df=df.droplevel(0).join(rsi).join(bbdf).join(sma)
            final_df['price_diff']=final_df["open"]-final_df["simplemovingaverage"]
            final_df['band_diff_up']=final_df["open"]-final_df["upperband"]
            final_df['band_diff_mid']=final_df["open"]-final_df["middleband"]
            final_df['band_diff_lower']=final_df["open"]-final_df["lowerband"]
            #Add tiingo data
            data = self.AddData(TiingoNews, symbol).Symbol
            tiingo = self.History(data, 2*self.lookback,Resolution.Daily)
            analyzer = SentimentIntensityAnalyzer()
            # VADER compound score per story, summed per publish day
            tiingo['polarity'] = tiingo['description'].dropna().apply(lambda x: analyzer.polarity_scores(x))
            tiingo = pd.concat(
                [tiingo.drop(['polarity'], axis=1),
                 tiingo['polarity'].apply(pd.Series)], axis=1)
            tiingo['sentiment'] = tiingo['compound'].apply(lambda x: 'positive' if x >0 else 'neutral' if x==0 else 'negative')
            tiingo = tiingo[[ 'publisheddate', 'compound']]
            tiingo['publisheddate'] = pd.to_datetime(tiingo['publisheddate'],utc=True).dt.date
            tiingo = tiingo.groupby(by=[ 'publisheddate'], as_index=False).sum()
            # NOTE(review): joins a date-typed index against final_df's index --
            # confirm the keys actually align (see the renamed 'time' variant
            # used elsewhere in this file).
            hist=final_df.join(tiingo.set_index('publisheddate'))
            # Create 5 days lag data
            # NOTE(review): lagged columns are built from final_df (pre-join),
            # so the tiingo 'compound' column gets no lagged copies here.
            lags = range(1, 5)
            final_df=hist.assign(**{
                f'{col} (t-{lag})': final_df[col].shift(lag)
                for lag in lags
                for col in final_df
            }).dropna().drop(columns = ['close','high','low','volume'])
            final_df['symbol'] = ticker
            self.dat=pd.concat([self.dat, final_df])
        self.Debug(self.dat.shape)
        return self.dat
def will_get_all_data(self, tickers, historical=True, training=False, backtesting=True):
"""
Gets historical data for training and prediction
Parameters:
-----------
tickers : list
list of tickers to retrieve data
training : Bool, default False
If True, retrieves training data, a 90-day period. If performing predictions,
False retrieves most recent day of data. For example, if called at 8 A.M., retrieves
the previous trading days' data.
backtesting : Bool, default True
Return:
-------
self.dat : pd.DataFrame
DataFrame containing data
"""
if historical:
if backtesting:
shift_factor = 25 # overshooting and select the maximum
hist_lookback = 1 + shift_factor
ind_lookback = 20 + shift_factor
tiingo_lookback = 1 + shift_factor # in case of weekends?
elif training:
hist_lookback = self.lookback + 6
ind_lookback = self.lookback * 2 + 20
tiingo_lookback = self.lookback * 2
else:
raise ValueError("Please train or backtest if using historical = True")
else:
shift_factor = 7 # needed so we can calculate lagged data
hist_lookback = 1 + shift_factor
ind_lookback = 20 + shift_factor
tiingo_lookback = 1 + shift_factor # in case of weekends?
self.dat = pd.DataFrame()
for ticker in tickers:
# Create a Quant Book Object as in Research
qb=QuantBook()
symbol = qb.AddEquity(ticker, Resolution.Daily).Symbol
# Get Price History
# fix this, don't need qb history in second one maybe?
# or maybe use delta in indicators?
if historical:
history = self.History(symbol, hist_lookback)
else:
history = qb.History(symbol, hist_lookback)
# convert the historical data to a pandas DataFrame
df = pd.DataFrame(history)
df['direction'] = np.where(df['close'] > df['open'], 1, 0)
df['return']=df['close'].pct_change()
# Prepare indicator data
if historical:
start = self.Time-timedelta(days=ind_lookback)
end = self.Time
rsi = qb.Indicator(RelativeStrengthIndex(20),symbol, Resolution.Daily, start=start,end=end)
bbdf = qb.Indicator(BollingerBands(20, 2), symbol, Resolution.Daily, start=start,end=end)
sma = qb.Indicator(SimpleMovingAverage(20), symbol, Resolution.Daily, start=start,end=end)
else:
rsi = qb.Indicator(RelativeStrengthIndex(20), symbol, ind_lookback + 1, Resolution.Daily)
bbdf = qb.Indicator(BollingerBands(20, 2), symbol, ind_lookback + 1, Resolution.Daily)
sma = qb.Indicator(SimpleMovingAverage(20), symbol, ind_lookback, Resolution.Daily)
# Add relevant columns
final_df=df.droplevel(0).join(rsi).join(bbdf).join(sma)
final_df['price_diff']=final_df["open"]-final_df["simplemovingaverage"]
final_df['band_diff_up']=final_df["open"]-final_df["upperband"]
final_df['band_diff_mid']=final_df["open"]-final_df["middleband"]
final_df['band_diff_lower']=final_df["open"]-final_df["lowerband"]
# Add tiingo data
if historical:
data = self.AddData(TiingoNews, symbol).Symbol
tiingo = self.History(data, tiingo_lookback, Resolution.Daily)
else:
data = qb.AddData(TiingoNews, symbol).Symbol
tiingo = qb.History(data, tiingo_lookback, Resolution.Daily)
analyzer = SentimentIntensityAnalyzer()
tiingo['polarity'] = tiingo['description'].dropna().apply(lambda x: analyzer.polarity_scores(x))
tiingo = pd.concat([tiingo.drop(['polarity'], axis=1), tiingo['polarity'].apply(pd.Series)], axis=1)
tiingo['sentiment'] = tiingo['compound'].apply(lambda x: 'positive' if x >0 else 'neutral' if x==0 else 'negative')
tiingo = tiingo[[ 'publisheddate', 'compound']]
tiingo['publisheddate'] = pd.to_datetime(tiingo['publisheddate'],utc=True).dt.date
tiingo = tiingo.groupby(by=[ 'publisheddate'], as_index=False).sum()
tiingo.rename(columns={'publisheddate' : 'time'}, inplace=True)
tiingo.set_index('time',inplace=True)
hist = final_df.join(tiingo)
#Create 5 days lag data
lags = range(1, 5)
final_df=hist.assign(**{
f'{col} (t-{lag})': final_df[col].shift(lag)
for lag in lags
for col in final_df
}).dropna().drop(columns = ['close','high','low','volume'], errors='ignore')
final_df['symbol'] = ticker
self.dat=pd.concat([self.dat, final_df])
return self.datfrom AlgorithmImports import *
import numpy as np
import pandas as pd
from nltk.sentiment import SentimentIntensityAnalyzer
from scipy.optimize import linprog
from sklearn.ensemble import GradientBoostingRegressor
from scipy.stats import norm
import math
class XGP(QCAlgorithm):
    def Initialize(self):
        """Set up dates, cash, risk threshold, universe selection, training and
        rebalance schedules."""
        # set up
        self.SetStartDate(2017, 1, 1)
        self.SetEndDate(2017, 6, 1)
        self.InitCash = 10000000
        self.SetCash(self.InitCash)
        # base history window (days) used when building training data
        self.lookback = 50
        # setting brokerage and reality modeling params
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        self.SetSecurityInitializer(lambda s : s.SetSlippageModel(VolumeShareSlippageModel()))
        # manually keeping track of securities
        self.securities = []
        # We're getting an error when this is removed
        self.weights = []
        self.trained = True
        # Risk threshold: maximum acceptable VaR, expressed as a return (-1.5%)
        self.risk_threshold = -0.015
        # Requesting data
        self.AddUniverseSelection(FineFundamentalUniverseSelectionModel(self.SelectCoarse, self.SelectFine))
        self.UniverseSettings.Resolution = Resolution.Daily
        self.num_coarse_symbols = 100
        self.num_fine_symbols = 10
        # Train immediately
        self.Train(self.classifier_training)
        # NOTE(review): this one-shot schedule is dated 2023-04-19 while the
        # backtest ends 2017-06-01, so it never fires here -- confirm intent.
        self.Schedule.On(self.DateRules.On(2023, 4, 19),
                         self.TimeRules.Now,
                         self.actions)
        # Train every Sunday at 4am or first day of month (because new universe)
        self.Train(self.DateRules.Every(DayOfWeek.Sunday), self.TimeRules.At(4, 0), self.classifier_training)
        self.Train(self.DateRules.MonthStart(daysOffset = 0), self.TimeRules.At(4, 0), self.classifier_training)
        self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday),
                         self.TimeRules.At(10, 0),
                         self.actions)
        self.Schedule.On(self.DateRules.MonthStart(daysOffset = 0),
                         self.TimeRules.At(10, 0),
                         self.actions)
def SelectCoarse(self, coarse):
"""selecting CoarseFundamental objects based on criteria in paper"""
if len(self.securities) == 0:
selected = [c for c in coarse if c.HasFundamentalData and c.Price > 5]
sorted_by_dollar_volume = sorted(selected, key=lambda c: c.DollarVolume, reverse=True)
return [c.Symbol for c in sorted_by_dollar_volume[:self.num_coarse_symbols]]
elif self.Time.day != 1:
return Universe.Unchanged
selected = [c for c in coarse if c.HasFundamentalData and c.Price > 5]
sorted_by_dollar_volume = sorted(selected, key=lambda c: c.DollarVolume, reverse=True)
return [c.Symbol for c in sorted_by_dollar_volume[:self.num_coarse_symbols]]
def SelectFine(self, fine):
"""selecting FineFundamental objects based on our criteria"""
if len(self.securities) == 0:
selected = [f for f in fine if
f.ValuationRatios.PERatio < 100
and f.MarketCap > 300000000
and f.ValuationRatios.PEGRatio < 3
and f.OperationRatios.TotalDebtEquityRatio.Value < 2
and f.OperationRatios.CurrentRatio.Value > 1]
sorted_by_pe_ratio = sorted(selected, key=lambda f: f.ValuationRatios.PERatio, reverse=True)
return [f.Symbol for f in sorted_by_pe_ratio[:self.num_fine_symbols]]
if self.Time.day != 1:
return Universe.Unchanged
selected = [f for f in fine if
f.ValuationRatios.PERatio < 100
and f.MarketCap > 300000000
and f.ValuationRatios.PEGRatio < 3
and f.OperationRatios.TotalDebtEquityRatio.Value < 2
and f.OperationRatios.CurrentRatio.Value > 1]
sorted_by_pe_ratio = sorted(selected, key=lambda f: f.ValuationRatios.PERatio, reverse=True)
return [f.Symbol for f in sorted_by_pe_ratio[:self.num_fine_symbols]]
    def OnSecuritiesChanged(self, changes):
        """triggers when Universe changes as result of filtering"""
        for security in changes.AddedSecurities:
            self.Debug(f"{self.Time}: Added {security}")
        for security in changes.RemovedSecurities:
            self.Debug(f"{self.Time}: Removed {security}")
        added = changes.AddedSecurities
        removed = changes.RemovedSecurities
        # keep the manual security list in sync with the current universe
        self.securities = list(set(self.securities).union(set(added)).difference(set(removed)))
    def OnData(self, data):
        # No per-bar logic: all trading happens in the scheduled actions()
        return
    def OnEndOfDay(self):
        # snapshot cumulative P&L and fees at the close so later logic could
        # derive daily deltas from them
        self.yesterday_total_profit = self.Portfolio.TotalProfit
        self.yesterday_total_fees = self.Portfolio.TotalFees
def OnOrderEvent(self, orderEvent):
    """Logs the details of every order event."""
    message = f'{orderEvent}'
    self.Log(message)
# =====================================================================================================================
# begin custom functions
# =====================================================================================================================
# def actions(self):
# self.Liquidate() #Liquidate the whole portfolio
# self.make_predictions()
# self.LinOptProg()
# a_securities = [s for s in self.securities]
# for (security, wt) in zip(a_securities, [i[0] for i in self.weights]):
# if wt != 0:
# self.SetHoldings(security.Symbol, wt)
def actions(self):
    """Weekly/monthly rebalance with a VaR overlay.

    Liquidates the book, refreshes predictions and LP weights, then either
    (a) enters scaled-down positions via market orders when the estimated
    1-day parametric VaR breaches self.risk_threshold, or (b) sets holdings
    at the optimized weights and records entry prices.
    """
    # Imported locally: this version's module header does not import math.
    import math

    self.Liquidate()  # flatten the whole portfolio before re-entering
    self.make_predictions()
    self.LinOptProg()
    lookback = 30  # days of history used for the VaR estimate
    active_securities = [s.Symbol for s in self.securities]
    position_resize = 0.6  # fraction of target size used when VaR breaches
    # risk management
    if len(self.weights) > 0:
        # Build a (date x symbol) close frame and compute daily returns.
        history = self.History(active_securities, timedelta(days=lookback), resolution=Resolution.Daily)
        history = history['close'].unstack(level=0)
        history.columns = active_securities
        returns = history.pct_change()
        w = np.array([i[0] for i in self.weights])
        VaR = self.value_at_risk(returns, w)  # 1-day parametric VaR of target portfolio
        self.Debug(f"VaR={VaR}")
        # position sizing
        if VaR <= self.risk_threshold:  # expected 1-day loss exceeds our limit
            self.Debug(f"estimated risk {VaR} exceeds threshold")
            for (security, wt) in zip(active_securities, [i[0] for i in self.weights]):
                quantity = self.CalculateOrderQuantity(security, wt)
                # ceil rounds negative (short) quantities toward zero
                reduced_quantity = math.ceil(quantity * position_resize)
                if reduced_quantity != 0:
                    self.Debug(f"VaR limit reached; expected loss is {VaR}. Reducing position size of "
                               f"{security} from {quantity} to {reduced_quantity}")
                    self.MarketOrder(security, reduced_quantity)
        else:
            # Risk acceptable: take full target weights.
            a_securities = [s for s in self.securities]
            for (security, wt) in zip(a_securities, [i[0] for i in self.weights]):
                if wt != 0:
                    self.SetHoldings(security.Symbol, wt)
                    self.prices_at_order[security.Symbol] = self.Securities[security.Symbol].Price
def classifier_training(self):
    """(Re)fit per-security return and quantile models.

    For every tracked security, pulls training data and fits a mean
    regressor plus long/short quantile regressors; securities whose data
    cannot be fit are marked "NoModel" so prediction falls back to 0.
    """
    self.return_mods = []
    self.quantile_mods_lg = []
    self.quantile_mods_st = []
    active_securities = [s.Symbol for s in self.securities]
    self.Log(f"Training Started at {self.Time}")
    for security in active_securities:
        data = self.get_all_data([security], training=True, backtesting=False)  # get tickers
        try:
            y_reg = data["return"]
            X = data.drop(["direction", "return", "symbol"], axis=1)
            (ret, qut_lg, qut_st) = self.gb_returns(X, y_reg)
        except Exception:  # narrowed from bare except; keep best-effort fallback
            ret = "NoModel"
            qut_lg = "NoModel"
            qut_st = "NoModel"
        self.return_mods.append(ret)
        self.quantile_mods_lg.append(qut_lg)
        self.quantile_mods_st.append(qut_st)
    self.trained = True
def make_predictions(self):
    """Predict next-period return and tail quantile for each security.

    Falls back to a neutral prediction (0) for securities whose model is
    the "NoModel" placeholder or whose prediction fails.
    """
    self.returns = []
    self.quantiles = []
    act_securities = [s.Symbol for s in self.securities]
    for i, security in enumerate(act_securities):
        data = self.get_all_data([security], training=False)
        data = data[data.index == data.index.max()]  # most recent row only
        prediction_data = data.drop(["direction", "return", "symbol"], axis=1)
        try:
            r_pred = self.return_mods[i].predict(prediction_data)[0]
        except Exception:  # "NoModel" placeholder or predict failure -> neutral
            r_pred = 0
        # Pick the tail quantile matching the predicted direction.
        if r_pred > 0:
            q_pred = self.quantile_mods_lg[i].predict(prediction_data)[0]
        elif r_pred < 0:
            q_pred = self.quantile_mods_st[i].predict(prediction_data)[0]
        else:
            q_pred = 0
        self.returns.append(r_pred)
        self.quantiles.append(q_pred)
    self.Debug(self.returns)
def gb_returns(self, X, y):
    """Fit mean and tail-quantile gradient-boosting regressors on (X, y).

    Returns (mean_model, long_quantile_model, short_quantile_model); the
    quantile models target the 5th and 95th percentiles respectively.
    """
    shared_params = dict(n_estimators=150,
                         learning_rate=0.05,
                         criterion="friedman_mse",
                         random_state=1693,
                         n_iter_no_change=15)
    mean_model = GradientBoostingRegressor(loss="squared_error", **shared_params).fit(X, y)
    lower_tail_model = GradientBoostingRegressor(loss="quantile", alpha=0.05, **shared_params).fit(X, y)
    upper_tail_model = GradientBoostingRegressor(loss="quantile", alpha=0.95, **shared_params).fit(X, y)
    return (mean_model, lower_tail_model, upper_tail_model)
def LinOptProg(self):
    """
    Convex optimization Function

    Solves a linear program maximizing total predicted return subject to:
      - the portfolio's predicted tail (quantile) loss bounded by 0.01,
      - net directional exposure between 0 and 1,
      - per-name bounds that cap longs/shorts and force each weight's sign
        to match its predicted return direction.
    Stores the resulting weights as a column vector in self.weights.
    """
    self.weights = []
    # Column vectors of predicted returns and predicted tail quantiles.
    self.returns = np.array(self.returns).reshape(-1,1)
    self.quantiles = np.array(self.quantiles).reshape(-1,1)
    # +1 long / 0 flat / -1 short, from the sign of each predicted return.
    dirs = np.array([1 if d > 0 else 0 if d == 0 else -1 for d in self.returns]).reshape(-1,1)
    # Longs capped at min(0.6, 3/n); shorts at max(-0.6, -1.5/n).
    bounds = [(0, min(0.6, 3 / len(self.returns))) if d == 1 else (max(-0.6, -1.5 / len(self.returns)), 0) for d in dirs]
    # Rows of A: tail-loss constraint, max net exposure, min net exposure.
    A = np.array([-1*self.quantiles, dirs, -1*dirs]).squeeze()
    b = np.array([0.01, 1, 0])
    # linprog minimizes, so negate the returns to maximize expected return.
    res = linprog(-1*self.returns, A_ub = A, b_ub = b, bounds = bounds)
    if res.status == 0:
        self.weights = res.x.reshape(-1,1)
    else:
        self.Log("Optimization failed")
        # Fallback: equal-magnitude 1/n weights in each predicted direction.
        # NOTE(review): the earlier comment said "uniform weight 0 (buy
        # nothing)" but this assigns NON-zero weights — confirm intent.
        self.weights = dirs * (1/len(self.returns))
    # Predictions are consumed; rebuilt on the next make_predictions call.
    del self.returns
    del self.quantiles
def bollinger_bands(self, data, window=20, num_std=2):
    """Append MA/STD/Upper_BB/Lower_BB Bollinger columns to `data` in place."""
    rolling_close = data['close'].rolling(window=window)
    data['MA'] = rolling_close.mean()
    data['STD'] = rolling_close.std()
    band_width = data['STD'] * num_std
    data['Upper_BB'] = data['MA'] + band_width
    data['Lower_BB'] = data['MA'] - band_width
    return data
def calculate_rsi(self, data, period=20):
    """Append an EWM-smoothed RSI column ('rsi') to `data` in place."""
    # Close-to-close changes; the first row has no prior close.
    change = data['close'].diff().dropna()
    up_moves = change.clip(lower=0)
    down_moves = (-change).clip(lower=0)
    # Exponentially weighted means approximate Wilder's smoothing.
    mean_gain = up_moves.ewm(com=period - 1, min_periods=period).mean()
    mean_loss = down_moves.ewm(com=period - 1, min_periods=period).mean()
    relative_strength = mean_gain / mean_loss
    data['rsi'] = 100 - (100 / (1 + relative_strength))
    return data
def get_all_data(self, tickers, historical=True, training=False, backtesting=True):
    """
    Gets historical data for training and prediction

    Parameters:
    -----------
    tickers : list
        list of tickers to retrieve data
    historical : Bool, default True
        Flag to determine if we are training or backtesting; False if live trading
    training : Bool, default False
        If True, retrieves training data, a 90-day period. If performing predictions,
        False retrieves most recent day of data. For example, if called at 8 A.M., retrieves
        the previous trading days' data.
    backtesting : Bool, default True
        Flag to determine if we are backtesting or training
    Return:
    -------
    self.dat : pd.DataFrame
        DataFrame containing data
    """
    # NOTE(review): `backtesting` takes precedence over `training` here, so
    # training calls must pass backtesting=False (classifier_training does).
    if historical:
        if backtesting:
            shift_factor = 30 # overshooting and select the maximum
            hist_lookback = 1 + shift_factor
            tiingo_lookback = 12 # in case of weekends?
        elif training:
            hist_lookback = self.lookback + 25
            tiingo_lookback = self.lookback * 1.5
        else:
            raise ValueError("Please train or backtest if using historical = True")
    else:
        shift_factor = 7 # needed so we can calculate lagged data
        hist_lookback = 1 + shift_factor
        tiingo_lookback = 1 + shift_factor # in case of weekends?
    full_data = pd.DataFrame()
    for symbol in tickers:
        # Get Price History
        history = self.History(symbol, hist_lookback)
        history = pd.DataFrame(history)
        # convert the historical data to a pandas DataFrame
        # 1 when the bar closed above its open, else 0
        history['direction'] = np.where(history['close'] > history['open'], 1, 0)
        # trailing 5-bar close-to-close return; used as the regression target
        history['return']=history['close'].pct_change(periods=5)
        history = self.bollinger_bands(history)
        history = self.calculate_rsi(history)
        # Add relevant columns: distance of the open from the bands/MA
        history['price_diff']=history["open"]-history["MA"]
        history['band_diff_up']=history["open"]-history["Upper_BB"]
        history['band_diff_lower']=history["open"]-history["Lower_BB"]
        # Add Tiingo Data (news sentiment)
        data = self.AddData(TiingoNews, symbol).Symbol
        tiingo = self.History(data, int(tiingo_lookback), Resolution.Daily)
        if len(tiingo)!=0 and set(['description','publisheddate']).issubset(tiingo.columns):
            # VADER sentiment per article description, summed per publish day
            analyzer = SentimentIntensityAnalyzer()
            tiingo['polarity'] = tiingo['description'].dropna().apply(lambda x: analyzer.polarity_scores(x))
            tiingo = pd.concat([tiingo.drop(['polarity'], axis=1), tiingo['polarity'].apply(pd.Series)], axis=1)
            #tiingo['sentiment'] = tiingo['compound'].apply(lambda x: 'positive' if x >0 else 'neutral' if x==0 else 'negative')
            tiingo = tiingo[[ 'publisheddate', 'compound']]
            tiingo['publisheddate'] = pd.to_datetime(tiingo['publisheddate'],utc=True).dt.date
            tiingo = tiingo.groupby(by=[ 'publisheddate'], as_index=False).sum()
            tiingo.rename(columns={'publisheddate' : 'time'}, inplace=True)
            tiingo.set_index('time',inplace=True)
            history = history.join(tiingo)
        # Add t-1..t-4 lags of every column, then drop incomplete rows and
        # raw OHLCV columns that should not be used as same-day features.
        lags = range(1,5)
        history=history.assign(**{
            f'{col} (t-{lag})': history[col].shift(lag)
            for lag in lags
            for col in history
        }).dropna().drop(columns = ['close','high','low','volume'], errors='ignore')
        history['symbol'] = symbol.Value
        full_data=pd.concat([full_data, history])
    # NOTE(review): logging the entire DataFrame on every call is noisy/slow.
    self.Log(full_data)
    return full_data
def get_daily_realized_pnl(self):
    """Return today's realized P&L net of fees, relative to yesterday's snapshot."""
    profit_delta = self.Portfolio.TotalProfit - self.yesterday_total_profit
    fee_delta = self.Portfolio.TotalFees - self.yesterday_total_fees
    return profit_delta - fee_delta
def value_at_risk(self, returns, weights, conf_level=0.05, num_days=1):
    """
    Calculates the parametric (variance-covariance) value-at-risk of the
    portfolio: the conf_level quantile of the assumed normal return
    distribution, scaled by sqrt(num_days).
    ---------------------------------------------------
    Parameters
    returns : pd.DataFrame
        periodic returns
    conf_level : float
        confidence level. 0.05 by default
    weights : np.array
        portfolio weights
    num_days : int
        length of the period the VaR is calculated over
    """
    # Bug fix: this version's module header never imports `norm`, so the
    # original raised NameError at runtime. Import locally from SciPy.
    from scipy.stats import norm

    cov_matrix = returns.cov()
    avg_return = returns.mean()
    portfolio_mean = avg_return.dot(weights)
    portfolio_stdev = np.sqrt(weights.T.dot(cov_matrix).dot(weights))
    # conf_level-quantile of the fitted normal distribution (1-period)
    cutoff = norm.ppf(conf_level, portfolio_mean, portfolio_stdev)
    # n-Day VaR via square-root-of-time scaling
    VaR = cutoff * np.sqrt(num_days)
    return VaR
def cvar(self, returns, weights, conf_level=0.05):
    """
    Calculates the portfolio CVaR from the per-asset VaR vector.
    ------------------------------------
    Parameters
    returns : pd.DataFrame
        portfolio returns
    weights : np.array
        portfolio weights
    conf_level : float
        confidence level (forwarded to value_at_risk)
    """
    # Bug fix: value_at_risk is a method — the original called it as a bare
    # name (NameError). Also forward conf_level instead of dropping it.
    VaR = self.value_at_risk(returns, weights, conf_level)
    return VaR.mean()
from AlgorithmImports import *
import numpy as np
import pandas as pd
from nltk.sentiment import SentimentIntensityAnalyzer
from scipy.optimize import linprog
from sklearn.ensemble import GradientBoostingRegressor
from scipy.stats import norm
import math
class XGP(QCAlgorithm):
    """Gradient-boosted long/short equity algorithm.

    Per security, fits a mean-return regressor plus 5%/95% quantile
    regressors, converts the predictions into portfolio weights with a
    linear program, and overlays a 1-day parametric VaR check that shrinks
    position sizes when the risk limit is breached.
    """

    def Initialize(self):
        # backtest window and starting capital
        self.SetStartDate(2022, 12, 23)
        self.SetEndDate(2023, 3, 24)
        self.InitCash = 10000000
        self.SetCash(self.InitCash)
        self.lookback = 50  # base lookback (days) for training data
        # brokerage and reality modeling params
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        self.SetSecurityInitializer(lambda s: s.SetSlippageModel(VolumeShareSlippageModel()))
        # manually keeping track of securities in the current universe
        self.securities = []
        # weights must exist before the first scheduled actions() call
        self.weights = []
        self.trained = True
        self.risk_threshold = -0.015  # 1-day VaR limit, in return space
        # self.SetWarmup(timedelta(days=50))
        self.prices_at_order = {}
        self.yesterday_total_profit = 0
        self.yesterday_total_fees = 0
        # universe selection
        self.AddUniverseSelection(FineFundamentalUniverseSelectionModel(self.SelectCoarse, self.SelectFine))
        self.UniverseSettings.Resolution = Resolution.Daily
        self.num_coarse_symbols = 100
        self.num_fine_symbols = 10
        # Train immediately
        self.Train(self.classifier_training)
        # Train every Sunday at 4am or first day of month (because new universe)
        self.Train(self.DateRules.Every(DayOfWeek.Sunday), self.TimeRules.At(4, 0), self.classifier_training)
        self.Train(self.DateRules.MonthStart(daysOffset=0), self.TimeRules.At(4, 0), self.classifier_training)
        # rebalance every Monday and at each month start
        self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday),
                         self.TimeRules.At(10, 0),
                         self.actions)
        self.Schedule.On(self.DateRules.MonthStart(daysOffset=0),
                         self.TimeRules.At(10, 0),
                         self.actions)

    def SelectCoarse(self, coarse):
        """selecting CoarseFundamental objects based on criteria in paper"""
        # Rebuild on the backtest start date (Dec 23) or on the 1st of a month;
        # otherwise keep the current universe. (Deduplicated branches.)
        is_start_date = self.Time.day == 23 and self.Time.month == 12
        if not is_start_date and self.Time.day != 1:
            return Universe.Unchanged
        selected = [c for c in coarse if c.HasFundamentalData and c.Price > 5]
        sorted_by_dollar_volume = sorted(selected, key=lambda c: c.DollarVolume, reverse=True)
        return [c.Symbol for c in sorted_by_dollar_volume[:self.num_coarse_symbols]]

    def SelectFine(self, fine):
        """selecting FineFundamental objects based on our criteria"""
        is_start_date = self.Time.day == 23 and self.Time.month == 12
        if not is_start_date and self.Time.day != 1:
            return Universe.Unchanged
        # value/quality screens: sane P/E and PEG, mid-cap+, low leverage
        selected = [f for f in fine if
                    f.ValuationRatios.PERatio < 100
                    and f.MarketCap > 300000000
                    and f.ValuationRatios.PEGRatio < 3
                    and f.OperationRatios.TotalDebtEquityRatio.Value < 2
                    and f.OperationRatios.CurrentRatio.Value > 1]
        sorted_by_pe_ratio = sorted(selected, key=lambda f: f.ValuationRatios.PERatio, reverse=True)
        return [f.Symbol for f in sorted_by_pe_ratio[:self.num_fine_symbols]]

    def OnSecuritiesChanged(self, changes):
        """triggers when Universe changes as result of filtering"""
        for security in changes.AddedSecurities:
            self.Debug(f"{self.Time}: Added {security}")
        for security in changes.RemovedSecurities:
            self.Debug(f"{self.Time}: Removed {security}")
        added = changes.AddedSecurities
        removed = changes.RemovedSecurities
        self.securities = list(set(self.securities).union(set(added)).difference(set(removed)))

    def OnData(self, data):
        # all trading is event-scheduled; nothing to do per bar
        return

    def OnEndOfDay(self):
        # snapshot cumulative P&L/fees so get_daily_realized_pnl can difference them
        self.yesterday_total_profit = self.Portfolio.TotalProfit
        self.yesterday_total_fees = self.Portfolio.TotalFees

    def OnOrderEvent(self, orderEvent):
        """logs the details of an order"""
        self.Log(f'{orderEvent}')

    # ==================================================================================================================
    # begin custom functions
    # ==================================================================================================================

    def actions(self):
        """Rebalance with a VaR overlay (see class docstring)."""
        self.Liquidate()  # liquidate the whole portfolio before re-entering
        self.make_predictions()
        self.LinOptProg()
        lookback = 30  # days of history used for the VaR estimate
        active_securities = [s.Symbol for s in self.securities]
        # risk management
        if len(self.weights) > 0:
            history = self.History(active_securities, timedelta(days=lookback), resolution=Resolution.Daily)
            history = history['close'].unstack(level=0)
            history.columns = active_securities
            returns = history.pct_change()
            w = np.array([i[0] for i in self.weights])
            VaR = self.value_at_risk(returns, w)  # 1-day parametric VaR
            self.Debug(f"VaR={VaR}")
            # position sizing (removed unused max_loss_dollars/reduction_size)
            if VaR <= self.risk_threshold:  # expected 1-day loss exceeds the limit
                self.Debug(f"estimated risk {VaR} exceeds threshold")
                for (security, wt) in zip(active_securities, [i[0] for i in self.weights]):
                    quantity = self.CalculateOrderQuantity(security, wt)
                    # enter at 60% of target size; ceil rounds shorts toward zero
                    reduced_quantity = math.ceil(quantity * 0.6)
                    if reduced_quantity != 0:
                        self.Debug(f"VaR limit reached; expected loss is {VaR}. Reducing position size of "
                                   f"{security} from {quantity} to {reduced_quantity}")
                        self.MarketOrder(security, reduced_quantity)
            else:
                a_securities = [s for s in self.securities]
                for (security, wt) in zip(a_securities, [i[0] for i in self.weights]):
                    if wt != 0:
                        self.SetHoldings(security.Symbol, wt)
                        self.prices_at_order[security.Symbol] = self.Securities[security.Symbol].Price

    def classifier_training(self):
        """(Re)fit mean and tail-quantile models for every tracked security."""
        self.return_mods = []
        self.quantile_mods_lg = []
        self.quantile_mods_st = []
        active_securities = [s.Symbol for s in self.securities]
        self.Log(f"Training Started at {self.Time}")
        for security in active_securities:
            data = self.get_all_data([security], training=True, backtesting=False)  # get tickers
            try:
                y_reg = data["return"]
                X = data.drop(["direction", "return", "symbol"], axis=1)
                (ret, qut_lg, qut_st) = self.gb_returns(X, y_reg)
            except Exception:  # narrowed from bare except; keep best-effort fallback
                ret = "NoModel"
                qut_lg = "NoModel"
                qut_st = "NoModel"
            self.return_mods.append(ret)
            self.quantile_mods_lg.append(qut_lg)
            self.quantile_mods_st.append(qut_st)
        self.trained = True

    def make_predictions(self):
        """Predict next-period return and tail quantile for each security."""
        self.returns = []
        self.quantiles = []
        act_securities = [s.Symbol for s in self.securities]
        for i in range(len(act_securities)):
            security = act_securities[i]
            data = self.get_all_data([security], training=False)
            data = data[data.index == data.index.max()]  # latest row only
            prediction_data = data.drop(["direction", "return", "symbol"], axis=1)
            try:
                r_pred = self.return_mods[i].predict(prediction_data)[0]
            except Exception:  # "NoModel" placeholder or predict failure -> neutral
                r_pred = 0
            if r_pred > 0:
                q_pred = self.quantile_mods_lg[i].predict(prediction_data)[0]
            elif r_pred < 0:
                q_pred = self.quantile_mods_st[i].predict(prediction_data)[0]
            else:
                q_pred = 0
            self.returns.append(r_pred)
            self.quantiles.append(q_pred)
        self.Debug(self.returns)

    def gb_returns(self, X, y):
        """
        Fit mean and 5%/95% quantile gradient-boosting regressors on (X, y).
        """
        mean_clf = GradientBoostingRegressor(n_estimators=150,
                                             loss="squared_error",
                                             criterion="friedman_mse",
                                             learning_rate=0.05,
                                             random_state=1693,
                                             n_iter_no_change=15)
        mean_fit_out = mean_clf.fit(X, y)
        quantile_clf_lg = GradientBoostingRegressor(n_estimators=150,
                                                    loss="quantile",
                                                    alpha=0.05,
                                                    n_iter_no_change=15,
                                                    learning_rate=0.05,
                                                    criterion="friedman_mse",
                                                    random_state=1693)
        quantile_clf_st = GradientBoostingRegressor(n_estimators=150,
                                                    loss="quantile",
                                                    alpha=0.95,
                                                    n_iter_no_change=15,
                                                    learning_rate=0.05,
                                                    criterion="friedman_mse",
                                                    random_state=1693)
        quantile_fit_lg = quantile_clf_lg.fit(X, y)
        quantile_fit_st = quantile_clf_st.fit(X, y)
        return (mean_fit_out, quantile_fit_lg, quantile_fit_st)

    def LinOptProg(self):
        """
        Convex optimization Function: maximize predicted return subject to a
        tail-loss constraint, 0..1 net directional exposure, and per-name
        sign/size bounds. Stores weights (column vector) in self.weights.
        """
        self.weights = []
        self.returns = np.array(self.returns).reshape(-1, 1)
        self.quantiles = np.array(self.quantiles).reshape(-1, 1)
        # +1 long / 0 flat / -1 short from predicted return sign
        dirs = np.array([1 if d > 0 else 0 if d == 0 else -1 for d in self.returns]).reshape(-1, 1)
        # longs capped at min(0.6, 3/n), shorts at max(-0.6, -1.5/n)
        bounds = [(0, min(0.6, 3 / len(self.returns))) if d == 1 else (max(-0.6, -1.5 / len(self.returns)), 0) for d in dirs]
        A = np.array([-1*self.quantiles, dirs, -1*dirs]).squeeze()
        b = np.array([0.01, 1, 0])
        # linprog minimizes, so negate the returns to maximize expected return
        res = linprog(-1*self.returns, A_ub=A, b_ub=b, bounds=bounds)
        if res.status == 0:
            self.weights = res.x.reshape(-1, 1)
        else:
            self.Log("Optimization failed")
            # fallback: equal-magnitude 1/n weights in each predicted direction
            # (the previous comment claiming "weight 0" was inaccurate)
            self.weights = dirs * (1/len(self.returns))
        del self.returns
        del self.quantiles

    def bollinger_bands(self, data, window=20, num_std=2):
        """Append MA/STD/Upper_BB/Lower_BB columns to `data` in place."""
        data['MA'] = data['close'].rolling(window=window).mean()
        data['STD'] = data['close'].rolling(window=window).std()
        data['Upper_BB'] = data['MA'] + (data['STD'] * num_std)
        data['Lower_BB'] = data['MA'] - (data['STD'] * num_std)
        return data

    def calculate_rsi(self, data, period=20):
        """Append an EWM-smoothed RSI column ('rsi') to `data` in place."""
        delta = data['close'].diff().dropna()
        gains = delta.where(delta > 0, 0)
        losses = -delta.where(delta < 0, 0)
        avg_gain = gains.ewm(com=period - 1, min_periods=period).mean()
        avg_loss = losses.ewm(com=period - 1, min_periods=period).mean()
        rs = avg_gain / avg_loss
        rsi = 100 - (100 / (1 + rs))
        data['rsi'] = rsi
        return data

    def get_all_data(self, tickers, historical=True, training=False, backtesting=True):
        """
        Gets historical price features plus Tiingo news sentiment for
        training and prediction.

        Parameters:
        -----------
        tickers : list
            list of tickers to retrieve data
        historical : Bool, default True
            Flag to determine if we are training or backtesting; False if live trading
        training : Bool, default False
            If True, retrieves a longer training window; if False, recent data only
        backtesting : Bool, default True
            Flag to determine if we are backtesting or training
        Return:
        -------
        pd.DataFrame
            one row per date with lagged features plus 'direction',
            'return' (trailing 5-bar return, the training target) and 'symbol'
        """
        if historical:
            if backtesting:
                shift_factor = 30  # overshooting and select the maximum
                hist_lookback = 1 + shift_factor
                tiingo_lookback = 12  # in case of weekends
            elif training:
                hist_lookback = self.lookback + 25
                tiingo_lookback = self.lookback * 1.5
            else:
                raise ValueError("Please train or backtest if using historical = True")
        else:
            shift_factor = 7  # needed so we can calculate lagged data
            hist_lookback = 1 + shift_factor
            tiingo_lookback = 1 + shift_factor  # in case of weekends
        full_data = pd.DataFrame()
        for symbol in tickers:
            # price history and technical features
            history = self.History(symbol, hist_lookback)
            history = pd.DataFrame(history)
            history['direction'] = np.where(history['close'] > history['open'], 1, 0)
            history['return'] = history['close'].pct_change(periods=5)
            history = self.bollinger_bands(history)
            history = self.calculate_rsi(history)
            history['price_diff'] = history["open"] - history["MA"]
            history['band_diff_up'] = history["open"] - history["Upper_BB"]
            history['band_diff_lower'] = history["open"] - history["Lower_BB"]
            # Tiingo news sentiment: VADER compound score summed per day
            data = self.AddData(TiingoNews, symbol).Symbol
            tiingo = self.History(data, int(tiingo_lookback), Resolution.Daily)
            if len(tiingo) != 0 and set(['description', 'publisheddate']).issubset(tiingo.columns):
                analyzer = SentimentIntensityAnalyzer()
                tiingo['polarity'] = tiingo['description'].dropna().apply(lambda x: analyzer.polarity_scores(x))
                tiingo = pd.concat([tiingo.drop(['polarity'], axis=1), tiingo['polarity'].apply(pd.Series)], axis=1)
                tiingo = tiingo[['publisheddate', 'compound']]
                tiingo['publisheddate'] = pd.to_datetime(tiingo['publisheddate'], utc=True).dt.date
                tiingo = tiingo.groupby(by=['publisheddate'], as_index=False).sum()
                tiingo.rename(columns={'publisheddate': 'time'}, inplace=True)
                tiingo.set_index('time', inplace=True)
                history = history.join(tiingo)
            # add t-1..t-4 lags of every column, then drop incomplete rows
            lags = range(1, 5)
            history = history.assign(**{
                f'{col} (t-{lag})': history[col].shift(lag)
                for lag in lags
                for col in history
            }).dropna().drop(columns=['close', 'high', 'low', 'volume'], errors='ignore')
            history['symbol'] = symbol.Value
            full_data = pd.concat([full_data, history])
        return full_data

    def get_daily_realized_pnl(self):
        """Return today's realized P&L net of fees, vs yesterday's snapshot."""
        daily_gross_profit = self.Portfolio.TotalProfit - self.yesterday_total_profit
        daily_fees = self.Portfolio.TotalFees - self.yesterday_total_fees
        return daily_gross_profit - daily_fees

    def value_at_risk(self, returns, weights, conf_level=0.05, num_days=1):
        """
        Calculates the parametric (variance-covariance) value-at-risk of the
        portfolio: the conf_level quantile of the fitted normal distribution,
        scaled by sqrt(num_days).
        ---------------------------------------------------
        Parameters
        returns : pd.DataFrame
            periodic returns
        conf_level : float
            confidence level. 0.05 by default
        weights : np.array
            portfolio weights
        num_days : int
            length of the period the VaR is calculated over
        """
        cov_matrix = returns.cov()
        avg_return = returns.mean()
        portfolio_mean = avg_return.dot(weights)
        portfolio_stdev = np.sqrt(weights.T.dot(cov_matrix).dot(weights))
        cutoff = norm.ppf(conf_level, portfolio_mean, portfolio_stdev)
        # n-Day VaR via square-root-of-time scaling
        VaR = cutoff * np.sqrt(num_days)
        return VaR

    def cvar(self, returns, weights, conf_level=0.05):
        """
        Calculates the portfolio CVaR from the VaR vector.
        ------------------------------------
        Parameters
        returns : pd.DataFrame
            portfolio returns
        weights : np.array
            portfolio weights
        conf_level : float
            confidence level (forwarded to value_at_risk)
        """
        # bug fix: was a bare `value_at_risk(...)` call (NameError) — it is a
        # method; also forward conf_level instead of silently dropping it
        VaR = self.value_at_risk(returns, weights, conf_level)
        return VaR.mean()
from AlgorithmImports import *
import numpy as np
import pandas as pd
from nltk.sentiment import SentimentIntensityAnalyzer
from scipy.optimize import linprog
from sklearn.ensemble import GradientBoostingRegressor
"""
Parameters to consider:
-----------------------
number of fine / coarse
boosting hyperparameters
fundamental criteria (changing them did worse)
training / buying frequency
lookback period (careful with periods needed for bb / rsi)
To Do:
------
maybe remove mkt benchmark
remove unused comments
"""
class XGP(QCAlgorithm):
def Initialize(self):
    # set up: backtest window and starting capital
    self.SetStartDate(2017, 1, 1)
    self.SetEndDate(2017, 6, 1)
    self.InitCash = 10000000
    self.SetCash(self.InitCash)
    # base lookback (days) used by get_all_data when building training data
    self.lookback = 25
    # setting brokerage and reality modeling params
    self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
    self.SetSecurityInitializer(lambda s : s.SetSlippageModel(VolumeShareSlippageModel()))
    # benchmarking against SPY
    self.MKT = self.AddEquity("SPY", Resolution.Daily).Symbol
    self.mkt = []  # daily SPY closes appended by OnEndOfDay for the benchmark plot
    # manually keeping track of securities
    self.securities = []
    # weights must exist before the first scheduled actions() call
    self.weights = []
    self.trained = True
    # Requesting data
    self.AddUniverseSelection(FineFundamentalUniverseSelectionModel(self.SelectCoarse, self.SelectFine))
    self.UniverseSettings.Resolution = Resolution.Daily
    self.num_coarse_symbols = 40
    self.num_fine_symbols = 7  # slice [:7] keeps 7 symbols (earlier comment claiming 8 was wrong)
    # Train immediately
    self.Train(self.classifier_training)
    # Train every Sunday at 4am or first day of month (because new universe)
    self.Train(self.DateRules.Every(DayOfWeek.Sunday), self.TimeRules.At(4, 0), self.classifier_training)
    self.Train(self.DateRules.MonthStart(daysOffset = 0), self.TimeRules.At(4, 0), self.classifier_training)
    #self.Train(self.DateRules.MonthStart(daysOffset = 10), self.TimeRules.At(4, 0), self.classifier_training)
    #self.Train(self.DateRules.MonthStart(daysOffset = 20), self.TimeRules.At(4, 0), self.classifier_training)
    # Rebalance every Monday and at month start, 10 minutes after SPY's open
    self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday),
                     self.TimeRules.AfterMarketOpen("SPY", 10),
                     self.actions)
    self.Schedule.On(self.DateRules.MonthStart(daysOffset = 0),
                     self.TimeRules.AfterMarketOpen("SPY", 10),
                     self.actions)
def SelectCoarse(self, coarse):
    """Monthly coarse filter: liquid stocks above $5 with fundamental data."""
    # The universe is only rebuilt on the first calendar day of the month.
    if self.Time.day != 1:
        return Universe.Unchanged
    candidates = (c for c in coarse if c.HasFundamentalData and c.Price > 5)
    ranked = sorted(candidates, key=lambda c: c.DollarVolume, reverse=True)
    return [c.Symbol for c in ranked[:self.num_coarse_symbols]]
def SelectFine(self, fine):
    """Monthly fine filter on valuation and balance-sheet quality."""
    # Only rebuilt on the first calendar day of the month.
    if self.Time.day != 1:
        return Universe.Unchanged

    def passes(f):
        ratios = f.ValuationRatios
        ops = f.OperationRatios
        return (ratios.PERatio < 100
                and f.MarketCap > 300000000
                and ratios.PEGRatio < 3
                and ops.TotalDebtEquityRatio.Value < 2
                and ops.CurrentRatio.Value > 1)

    # Highest P/E first; keep the top num_fine_symbols.
    ranked = sorted((f for f in fine if passes(f)),
                    key=lambda f: f.ValuationRatios.PERatio, reverse=True)
    return [f.Symbol for f in ranked[:self.num_fine_symbols]]
def OnSecuritiesChanged(self, changes):
    """Universe-change hook: log deltas and update the tracked security list."""
    added = changes.AddedSecurities
    removed = changes.RemovedSecurities
    for security in added:
        self.Debug(f"{self.Time}: Added {security}")
    for security in removed:
        self.Debug(f"{self.Time}: Removed {security}")
    # set algebra: keep (current ∪ added) \ removed; order is irrelevant
    self.securities = list((set(self.securities) | set(added)) - set(removed))
def OnData(self, data):
    """Intentionally empty — rebalancing happens via scheduled events."""
    # Put CVaR and VaR here
    return
def OnEndOfDay(self):
    # code here plots benchmark against our portfolio performance on the equity chart
    # last daily SPY close (History returns 2 bars; take the most recent)
    mkt_price = self.History(self.MKT, 2, Resolution.Daily)['close'].unstack(level= 0).iloc[-1]
    self.mkt.append(mkt_price)
    # scale the benchmark to starting cash so it overlays the equity curve
    mkt_perf = self.InitCash * self.mkt[-1] / self.mkt[0]
    self.Plot('Strategy Equity', self.MKT, mkt_perf)
def OnOrderEvent(self, orderEvent):
    """Records every order event in the algorithm log."""
    self.Log(f'{orderEvent}')
# =====================================================================================================================
# begin custom functions
# =====================================================================================================================
def actions(self):
    """Rebalance: liquidate, refresh predictions/weights, re-enter positions."""
    self.Liquidate()  # flatten everything before re-entering
    self.make_predictions()
    self.LinOptProg()
    holdings = [s for s in self.securities]
    target_weights = [row[0] for row in self.weights]
    for security, weight in zip(holdings, target_weights):
        if weight != 0:
            self.SetHoldings(security.Symbol, weight)
def classifier_training(self):
    """(Re)fit per-security return and quantile models.

    For every tracked security, pulls training data and fits a mean
    regressor plus long/short quantile regressors; securities whose data
    cannot be fit are marked "NoModel" so prediction falls back to 0.
    """
    self.return_mods = []
    self.quantile_mods_lg = []
    self.quantile_mods_st = []
    active_securities = [s.Symbol for s in self.securities]
    self.Log(f"Training Started at {self.Time}")
    for security in active_securities:
        data = self.get_all_data([security], training=True, backtesting=False)  # get tickers
        try:
            y_reg = data["return"]
            X = data.drop(["direction", "return", "symbol"], axis=1)
            (ret, qut_lg, qut_st) = self.gb_returns(X, y_reg)
        except Exception:  # narrowed from bare except; keep best-effort fallback
            ret = "NoModel"
            qut_lg = "NoModel"
            qut_st = "NoModel"
        self.return_mods.append(ret)
        self.quantile_mods_lg.append(qut_lg)
        self.quantile_mods_st.append(qut_st)
    self.trained = True
def make_predictions(self):
    """Predict next-period return and tail quantile for each security.

    Falls back to a neutral prediction (0) for securities whose model is
    the "NoModel" placeholder or whose prediction fails.
    """
    self.returns = []
    self.quantiles = []
    act_securities = [s.Symbol for s in self.securities]
    for i, security in enumerate(act_securities):
        data = self.get_all_data([security], training=False)
        data = data[data.index == data.index.max()]  # most recent row only
        prediction_data = data.drop(["direction", "return", "symbol"], axis=1)
        try:
            r_pred = self.return_mods[i].predict(prediction_data)[0]
        except Exception:  # "NoModel" placeholder or predict failure -> neutral
            r_pred = 0
        # Use the tail quantile matching the predicted direction.
        if r_pred > 0:
            q_pred = self.quantile_mods_lg[i].predict(prediction_data)[0]
        elif r_pred < 0:
            q_pred = self.quantile_mods_st[i].predict(prediction_data)[0]
        else:
            q_pred = 0
        self.returns.append(r_pred)
        self.quantiles.append(q_pred)
    self.Debug(self.returns)
def gb_returns(self, X, y):
    """Fit mean and tail-quantile gradient-boosting regressors on (X, y).

    Returns (mean_model, long_quantile_model, short_quantile_model); the
    quantile models target the 5th and 95th percentiles respectively.
    """
    base_kwargs = dict(n_estimators=150,
                       learning_rate=0.05,
                       criterion="friedman_mse",
                       random_state=1693,
                       n_iter_no_change=15)
    mean_fit = GradientBoostingRegressor(loss="squared_error", **base_kwargs).fit(X, y)
    lower_fit = GradientBoostingRegressor(loss="quantile", alpha=0.05, **base_kwargs).fit(X, y)
    upper_fit = GradientBoostingRegressor(loss="quantile", alpha=0.95, **base_kwargs).fit(X, y)
    return (mean_fit, lower_fit, upper_fit)
def LinOptProg(self):
    """
    Convex optimization Function

    Solves a linear program maximizing total predicted return subject to:
      - the portfolio's predicted tail (quantile) loss bounded by 0.01,
      - net directional exposure between 0 and 1,
      - per-name bounds that cap longs/shorts and force each weight's sign
        to match its predicted return direction.
    Stores the resulting weights as a column vector in self.weights.
    """
    self.weights = []
    # Column vectors of predicted returns and predicted tail quantiles.
    self.returns = np.array(self.returns).reshape(-1,1)
    self.quantiles = np.array(self.quantiles).reshape(-1,1)
    # +1 long / 0 flat / -1 short, from the sign of each predicted return.
    dirs = np.array([1 if d > 0 else 0 if d == 0 else -1 for d in self.returns]).reshape(-1,1)
    # Longs capped at min(0.6, 3/n); shorts at max(-0.6, -1.5/n).
    bounds = [(0, min(0.6, 3 / len(self.returns))) if d == 1 else (max(-0.6, -1.5 / len(self.returns)), 0) for d in dirs]
    # Rows of A: tail-loss constraint, max net exposure, min net exposure.
    A = np.array([-1*self.quantiles, dirs, -1*dirs]).squeeze()
    b = np.array([0.01, 1, 0])
    # linprog minimizes, so negate the returns to maximize expected return.
    res = linprog(-1*self.returns, A_ub = A, b_ub = b, bounds = bounds)
    if res.status == 0:
        self.weights = res.x.reshape(-1,1)
    else:
        self.Log("Optimization failed")
        # Fallback: equal-magnitude 1/n weights in each predicted direction.
        # NOTE(review): the earlier comment said "uniform weight 0 (buy
        # nothing)" but this assigns NON-zero weights — confirm intent.
        self.weights = dirs * (1/len(self.returns))
    # Predictions are consumed; rebuilt on the next make_predictions call.
    del self.returns
    del self.quantiles
def bollinger_bands(self, data, window=20, num_std=2):
    """Append rolling Bollinger columns (MA, STD, Upper_BB, Lower_BB) to *data* in place and return it."""
    rolling_close = data['close'].rolling(window=window)
    data['MA'] = rolling_close.mean()
    data['STD'] = rolling_close.std()
    band_width = data['STD'] * num_std
    data['Upper_BB'] = data['MA'] + band_width
    data['Lower_BB'] = data['MA'] - band_width
    return data
def calculate_rsi(self, data, period=20):
    """Append an RSI column ('rsi') to *data* and return it."""
    # Day-over-day close changes; the leading NaN from diff() is dropped.
    change = data['close'].diff().dropna()
    up_moves = change.where(change > 0, 0)
    down_moves = -change.where(change < 0, 0)
    # Exponentially weighted means (com = period-1) approximate Wilder smoothing.
    mean_gain = up_moves.ewm(com=period - 1, min_periods=period).mean()
    mean_loss = down_moves.ewm(com=period - 1, min_periods=period).mean()
    relative_strength = mean_gain / mean_loss
    data['rsi'] = 100 - (100 / (1 + relative_strength))
    return data
def get_all_data(self, tickers, historical=True, training=False, backtesting=True):
    """
    Gets historical price, technical-indicator, and Tiingo news-sentiment
    data for training and prediction.
    Parameters:
    -----------
    tickers : list
        list of Symbols to retrieve data for
    historical : Bool, default True
        Flag to determine if we are training or backtesting; False if live trading
    training : Bool, default False
        If True, retrieves a training window of data. If performing predictions,
        False retrieves the most recent day of data. For example, if called at
        8 A.M., retrieves the previous trading days' data.
    backtesting : Bool, default True
        Flag to determine if we are backtesting or training
    Return:
    -------
    full_data : pd.DataFrame
        one row per bar with raw features, Bollinger/RSI columns, summed news
        sentiment, and 1-4 period lagged copies of every column
    """
    # Choose lookback windows for price history and news history.
    if historical:
        if backtesting:
            shift_factor = 30 # overshooting and select the maximum
            hist_lookback = 1 + shift_factor
            tiingo_lookback = 12 # in case of weekends?
        elif training:
            # NOTE(review): backtesting defaults to True, so this branch runs
            # only when callers pass backtesting=False explicitly (as
            # classifier_training does).
            hist_lookback = self.lookback + 25
            tiingo_lookback = self.lookback * 1.5
        else:
            raise ValueError("Please train or backtest if using historical = True")
    else:
        shift_factor = 7 # needed so we can calculate lagged data
        hist_lookback = 1 + shift_factor
        tiingo_lookback = 1 + shift_factor # in case of weekends?
    full_data = pd.DataFrame()
    for symbol in tickers:
        # Get Price History
        history = self.History(symbol, hist_lookback)
        history = pd.DataFrame(history)
        # Intraday direction label: 1 if the bar closed above its open.
        history['direction'] = np.where(history['close'] > history['open'], 1, 0)
        # 5-period percentage return (regression target).
        history['return']=history['close'].pct_change(periods=5)
        history = self.bollinger_bands(history)
        history = self.calculate_rsi(history)
        # Distances of the open from the moving average and each band.
        history['price_diff']=history["open"]-history["MA"]
        history['band_diff_up']=history["open"]-history["Upper_BB"]
        history['band_diff_lower']=history["open"]-history["Lower_BB"]
        # Add Tiingo Data (also subscribes to the news feed for this symbol).
        data = self.AddData(TiingoNews, symbol).Symbol
        tiingo = self.History(data, int(tiingo_lookback), Resolution.Daily)
        if len(tiingo)!=0 and set(['description','publisheddate']).issubset(tiingo.columns):
            # Score each article description with VADER, keep only the
            # compound polarity, and sum it per publication date.
            analyzer = SentimentIntensityAnalyzer()
            tiingo['polarity'] = tiingo['description'].dropna().apply(lambda x: analyzer.polarity_scores(x))
            tiingo = pd.concat([tiingo.drop(['polarity'], axis=1), tiingo['polarity'].apply(pd.Series)], axis=1)
            tiingo = tiingo[[ 'publisheddate', 'compound']]
            tiingo['publisheddate'] = pd.to_datetime(tiingo['publisheddate'],utc=True).dt.date
            tiingo = tiingo.groupby(by=[ 'publisheddate'], as_index=False).sum()
            tiingo.rename(columns={'publisheddate' : 'time'}, inplace=True)
            tiingo.set_index('time',inplace=True)
            # NOTE(review): `history` keeps the index returned by History()
            # while `tiingo` is indexed by plain dates -- confirm this join
            # actually aligns rows instead of yielding all-NaN sentiment.
            history = history.join(tiingo)
        # Append 1-4 period lagged copies of every column, drop rows made
        # incomplete by the shifts, then drop the raw OHLCV columns.
        lags = range(1,5)
        history=history.assign(**{
            f'{col} (t-{lag})': history[col].shift(lag)
            for lag in lags
            for col in history
        }).dropna().drop(columns = ['close','high','low','volume'], errors='ignore')
        history['symbol'] = symbol.Value
        full_data=pd.concat([full_data, history])
    self.Log(full_data)
    return full_data
def value_at_risk(self, returns, weights, conf_level=0.05, num_days=30):
    """
    Calculates the (absolute) parametric value-at-risk of the portfolio.
    ---------------------------------------------------
    Parameters
    returns : pd.DataFrame
        periodic returns, one column per asset
    weights : np.array
        portfolio weights
    conf_level : float
        confidence level. 0.05 by default
    num_days : int
        number of days the VaR is calculated over. 30 by default
    """
    cov_matrix = returns.cov()
    avg_return = returns.mean()
    portfolio_mean = avg_return.dot(weights)
    portfolio_stdev = np.sqrt(weights.T.dot(cov_matrix).dot(weights))
    # conf_level-quantile of the assumed normal periodic P&L distribution.
    # (A dead bare `cov_matrix` expression statement was removed here.)
    cutoff = norm.ppf(conf_level, portfolio_mean, portfolio_stdev)
    # Scale the 1-period VaR to the horizon (square-root-of-time rule).
    VaR = cutoff * np.sqrt(num_days)
    return VaR
def cvar(self, returns, stdev, conf_level=0.05):
    """
    Calculates the portfolio CVaR (Gaussian expected shortfall).
    ------------------------------------
    Parameters
    returns : pd.DataFrame
        portfolio returns
    stdev :
        portfolio standard deviation
    conf_level : float
        confidence level
    """
    # Parametric (normal) expected shortfall:
    #   ES = phi(Phi^{-1}(alpha)) / alpha * sigma - mu
    # Bug fix: the original evaluated the density at alpha itself
    # (norm.pdf(conf_level)) instead of at the alpha-quantile, which
    # mis-scales the tail average.
    CVaR = conf_level**-1 * norm.pdf(norm.ppf(conf_level)) * stdev - returns
    return CVaR
from AlgorithmImports import *
import numpy as np
import pandas as pd
from nltk.sentiment import SentimentIntensityAnalyzer
from scipy.optimize import linprog
from sklearn.ensemble import GradientBoostingRegressor
from scipy.stats import norm
import math
"""
Parameters to consider:
-----------------------
number of fine / coarse
boosting hyperparameters
fundamental criteria (changing them did worse)
training / buying frequency
lookback period (careful with periods needed for bb / rsi)
To Do:
------
maybe remove mkt benchmark
remove unused comments
"""
class XGP(QCAlgorithm):
def Initialize(self):
    """Set dates/cash/brokerage, subscribe data, and schedule model training and rebalancing."""
    # set up
    self.SetStartDate(2017, 1, 1)
    self.SetEndDate(2017, 6, 1)
    self.InitCash = 10000000
    self.SetCash(self.InitCash)
    # History window (periods) used when building training data.
    self.lookback = 25
    # setting brokerage and reality modeling params
    self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
    self.SetSecurityInitializer(lambda s : s.SetSlippageModel(VolumeShareSlippageModel()))
    # benchmarking against SPY
    self.MKT = self.AddEquity("SPY", Resolution.Daily).Symbol
    self.mkt = []
    # manually keeping track of securities
    self.securities = []
    # We're getting an error when this is removed
    self.weights = []
    self.trained = True
    # Value-at-Risk limit: maximum risk we are willing to take on any one trade,
    # as a percentage loss of the investment capital
    self.var_limit = -0.015
    # Requesting data
    self.AddUniverseSelection(FineFundamentalUniverseSelectionModel(self.SelectCoarse, self.SelectFine))
    self.UniverseSettings.Resolution = Resolution.Daily
    self.num_coarse_symbols = 40
    self.num_fine_symbols = 7 # NOTE(review): the [:7] slice keeps 7 symbols (the old "keeps 8" comment was mistaken)
    # Train immediately
    self.Train(self.classifier_training)
    # Train every Sunday at 4am or first day of month (because new universe)
    self.Train(self.DateRules.Every(DayOfWeek.Sunday), self.TimeRules.At(4, 0), self.classifier_training)
    self.Train(self.DateRules.MonthStart(daysOffset = 0), self.TimeRules.At(4, 0), self.classifier_training)
    #self.Train(self.DateRules.MonthStart(daysOffset = 10), self.TimeRules.At(4, 0), self.classifier_training)
    #self.Train(self.DateRules.MonthStart(daysOffset = 20), self.TimeRules.At(4, 0), self.classifier_training)
    # Rebalance every Monday and on month start, 10 minutes after the open.
    self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday),
                     self.TimeRules.AfterMarketOpen("SPY", 10),
                     self.actions)
    self.Schedule.On(self.DateRules.MonthStart(daysOffset = 0),
                     self.TimeRules.AfterMarketOpen("SPY", 10),
                     self.actions)
def SelectCoarse(self, coarse):
    """selecting CoarseFundamental objects based on criteria in paper"""
    # Only refresh the universe on the first calendar day of each month.
    if self.Time.day != 1:
        return Universe.Unchanged
    liquid = sorted((c for c in coarse if c.HasFundamentalData and c.Price > 5),
                    key=lambda c: c.DollarVolume,
                    reverse=True)
    return [c.Symbol for c in liquid[:self.num_coarse_symbols]]
def SelectFine(self, fine):
    """selecting FineFundamental objects based on our criteria"""
    if self.Time.day != 1:
        return Universe.Unchanged
    def _passes(f):
        # Value/quality screen on earnings multiples and balance-sheet health.
        return (f.ValuationRatios.PERatio < 100
                and f.MarketCap > 300000000
                and f.ValuationRatios.PEGRatio < 3
                and f.OperationRatios.TotalDebtEquityRatio.Value < 2
                and f.OperationRatios.CurrentRatio.Value > 1)
    ranked = sorted(filter(_passes, fine),
                    key=lambda f: f.ValuationRatios.PERatio,
                    reverse=True)
    return [f.Symbol for f in ranked[:self.num_fine_symbols]]
def OnSecuritiesChanged(self, changes):
    """triggers when Universe changes as result of filtering"""
    for sec in changes.AddedSecurities:
        self.Debug(f"{self.Time}: Added {sec}")
    for sec in changes.RemovedSecurities:
        self.Debug(f"{self.Time}: Removed {sec}")
    # Maintain the manual roster: add new members, drop removed ones.
    roster = set(self.securities)
    roster |= set(changes.AddedSecurities)
    roster -= set(changes.RemovedSecurities)
    self.securities = list(roster)
def OnData(self, data):
    """Per-bar risk check: estimate portfolio VaR from trailing returns and react when it breaches the limit."""
    # risk management (value-at-risk)
    lookback = 30
    reduction_size = 0.8 # by how much to reduce the portion size when VaR limit is exceeded
    active_securities = [s.Symbol for s in self.securities]
    if len(self.weights) > 0:
        history = self.History(active_securities, timedelta(days=lookback), resolution=Resolution.Daily)
        history = history['close'].unstack(level=0)
        # NOTE(review): this relabelling assumes every symbol returned history;
        # if any symbol is missing, columns are silently misaligned -- confirm.
        history.columns = active_securities
        returns = history.pct_change()
        w = np.array([i[0] for i in self.weights])
        VaR = self.value_at_risk(returns, w)
        # VaR is expressed as a negative quantity, so this is when the var_limit is reached
        if VaR <= self.var_limit:
            self.Debug(f"VaR limit reached; expected loss is {VaR}")
            for (security, wt) in zip(self.securities, [i[0] for i in self.weights]):
                quantity = self.CalculateOrderQuantity(security.Symbol, wt)
                reduced_quantity = math.ceil(quantity * reduction_size)
                if reduced_quantity != 0:
                    # NOTE(review): this submits +80% of the *target* quantity on
                    # top of existing holdings, which increases exposure rather
                    # than reducing it -- confirm the intended de-risking logic.
                    self.MarketOrder(security.Symbol, reduced_quantity)
def OnEndOfDay(self):
    """Plot the SPY benchmark, scaled to starting cash, on the equity chart."""
    closes = self.History(self.MKT, 2, Resolution.Daily)['close'].unstack(level= 0)
    self.mkt.append(closes.iloc[-1])
    # Rebase the benchmark series so it starts at the initial cash level.
    benchmark_value = self.InitCash * self.mkt[-1] / self.mkt[0]
    self.Plot('Strategy Equity', self.MKT, benchmark_value)
def OnOrderEvent(self, orderEvent):
    """Logs every order lifecycle event (submitted, filled, canceled, etc.)."""
    self.Log(f'{orderEvent}')
# =====================================================================================================================
# begin custom functions
# =====================================================================================================================
def actions(self):
    """Scheduled rebalance: liquidate, refresh predictions, re-optimize, re-enter."""
    self.Liquidate() # start each rebalance from an all-cash book
    self.make_predictions()
    self.LinOptProg()
    roster = list(self.securities)
    for security, weight in zip(roster, (w[0] for w in self.weights)):
        if weight != 0:
            self.SetHoldings(security.Symbol, weight)
def classifier_training(self):
    """
    Fit per-security mean-return and quantile models on recent history.

    Populates self.return_mods / self.quantile_mods_lg / self.quantile_mods_st,
    index-aligned with self.securities; a security whose data or fit fails gets
    the "NoModel" sentinel (its later predictions fall back to 0).
    """
    self.return_mods = []
    self.quantile_mods_lg = []
    self.quantile_mods_st = []
    active_securities = [s.Symbol for s in self.securities]
    self.Log(f"Training Started at {self.Time}")
    for security in active_securities:
        data = self.get_all_data([security], training=True, backtesting=False) # get tickers
        try:
            y_reg = data["return"]
            X = data.drop(["direction", "return", "symbol"], axis = 1)
            (ret, qut_lg, qut_st) = self.gb_returns(X, y_reg)
        except Exception as e:
            # Bug fix: was a bare `except:` that also swallowed
            # KeyboardInterrupt/SystemExit and hid the failure reason.
            self.Log(f"Training failed for {security}: {e}")
            ret = "NoModel"
            qut_lg = "NoModel"
            qut_st = "NoModel"
        self.return_mods.append(ret)
        self.quantile_mods_lg.append(qut_lg)
        self.quantile_mods_st.append(qut_st)
    self.trained = True
def make_predictions(self):
    """
    Predict the next-period return (and matching tail quantile) per security.

    Fills self.returns / self.quantiles, index-aligned with self.securities;
    a security without a usable model contributes (0, 0).
    """
    self.returns = []
    self.quantiles = []
    act_securities = [s.Symbol for s in self.securities]
    for i in range(len(act_securities)):
        security = act_securities[i]
        data = self.get_all_data([security], training=False)
        # Keep only the most recent row for prediction.
        data = data[data.index == data.index.max()]
        prediction_data = data.drop(["direction", "return", "symbol"], axis = 1)
        try:
            r_pred = self.return_mods[i].predict(prediction_data)[0]
        except Exception:
            # Bug fix: was a bare `except:`. "NoModel" sentinels and any
            # predict-time failures fall back to a 0 prediction.
            r_pred = 0
        if r_pred > 0:
            # Long candidate: pair with the lower-tail (alpha=0.05) model.
            q_pred = self.quantile_mods_lg[i].predict(prediction_data)[0]
        elif r_pred < 0:
            # Short candidate: pair with the upper-tail (alpha=0.95) model.
            q_pred = self.quantile_mods_st[i].predict(prediction_data)[0]
        else:
            q_pred = 0
        self.returns.append(r_pred)
        self.quantiles.append(q_pred)
    self.Debug(self.returns)
def gb_returns(self, X, y):
    """
    Fit one mean-return model and two tail-quantile models on (X, y).

    Returns a tuple (mean_model, lower_quantile_model, upper_quantile_model);
    the quantile models are fit at alpha = 0.05 and alpha = 0.95 respectively.
    """
    # Hyperparameters shared by all three boosters.
    shared = dict(n_estimators=150,
                  learning_rate=0.05,
                  criterion="friedman_mse",
                  random_state=1693,
                  n_iter_no_change=15)
    mean_model = GradientBoostingRegressor(loss="squared_error", **shared)
    lower_model = GradientBoostingRegressor(loss="quantile", alpha=0.05, **shared)
    upper_model = GradientBoostingRegressor(loss="quantile", alpha=0.95, **shared)
    # Fit in the same order as the original (mean, lower, upper).
    return (mean_model.fit(X, y),
            lower_model.fit(X, y),
            upper_model.fit(X, y))
def LinOptProg(self):
    """
    Linear-program portfolio construction.

    Maximizes predicted return subject to a quantile-weighted tail-loss cap
    (0.01), total directional exposure between 0 and 1, and per-name bounds
    of min(0.6, 3/n) for longs and max(-0.6, -1.5/n) for shorts.
    Reads self.returns / self.quantiles (set by make_predictions) and stores
    the result in self.weights as an (n, 1) column vector.
    """
    self.weights = []
    self.returns = np.array(self.returns).reshape(-1,1)
    self.quantiles = np.array(self.quantiles).reshape(-1,1)
    # Predicted trade direction per name: +1 long, -1 short, 0 flat.
    dirs = np.array([1 if d > 0 else 0 if d == 0 else -1 for d in self.returns]).reshape(-1,1)
    bounds = [(0, min(0.6, 3 / len(self.returns))) if d == 1 else (max(-0.6, -1.5 / len(self.returns)), 0) for d in dirs]
    # Inequality rows: tail-loss cap, net exposure <= 1, net exposure >= 0.
    A = np.array([-1*self.quantiles, dirs, -1*dirs]).squeeze()
    b = np.array([0.01, 1, 0])
    # linprog minimizes, so negate expected returns to maximize them.
    res = linprog(-1*self.returns, A_ub = A, b_ub = b, bounds = bounds)
    if res.status == 0:
        self.weights = res.x.reshape(-1,1)
    else:
        self.Log("Optimization failed")
        # Fallback: equal-magnitude 1/n weights in each name's predicted
        # direction (flat names get 0). NOTE(review): the original comment
        # here claimed "uniform weight 0 (buy nothing)", which does not match
        # what this line computes.
        self.weights = dirs * (1/len(self.returns))
    # Free the prediction buffers so the next cycle starts clean.
    del self.returns
    del self.quantiles
def bollinger_bands(self, data, window=20, num_std=2):
    """Append rolling Bollinger columns (MA, STD, Upper_BB, Lower_BB) to *data* in place and return it."""
    rolling_close = data['close'].rolling(window=window)
    data['MA'] = rolling_close.mean()
    data['STD'] = rolling_close.std()
    band_width = data['STD'] * num_std
    data['Upper_BB'] = data['MA'] + band_width
    data['Lower_BB'] = data['MA'] - band_width
    return data
def calculate_rsi(self, data, period=20):
    """Append an RSI column ('rsi') to *data* and return it."""
    # Day-over-day close changes; the leading NaN from diff() is dropped.
    change = data['close'].diff().dropna()
    up_moves = change.where(change > 0, 0)
    down_moves = -change.where(change < 0, 0)
    # Exponentially weighted means (com = period-1) approximate Wilder smoothing.
    mean_gain = up_moves.ewm(com=period - 1, min_periods=period).mean()
    mean_loss = down_moves.ewm(com=period - 1, min_periods=period).mean()
    relative_strength = mean_gain / mean_loss
    data['rsi'] = 100 - (100 / (1 + relative_strength))
    return data
def get_all_data(self, tickers, historical=True, training=False, backtesting=True):
    """
    Gets historical price, technical-indicator, and Tiingo news-sentiment
    data for training and prediction.
    Parameters:
    -----------
    tickers : list
        list of Symbols to retrieve data for
    historical : Bool, default True
        Flag to determine if we are training or backtesting; False if live trading
    training : Bool, default False
        If True, retrieves a training window of data. If performing predictions,
        False retrieves the most recent day of data. For example, if called at
        8 A.M., retrieves the previous trading days' data.
    backtesting : Bool, default True
        Flag to determine if we are backtesting or training
    Return:
    -------
    full_data : pd.DataFrame
        one row per bar with raw features, Bollinger/RSI columns, summed news
        sentiment, and 1-4 period lagged copies of every column
    """
    # Choose lookback windows for price history and news history.
    if historical:
        if backtesting:
            shift_factor = 30 # overshooting and select the maximum
            hist_lookback = 1 + shift_factor
            tiingo_lookback = 12 # in case of weekends?
        elif training:
            # NOTE(review): backtesting defaults to True, so this branch runs
            # only when callers pass backtesting=False explicitly (as
            # classifier_training does).
            hist_lookback = self.lookback + 25
            tiingo_lookback = self.lookback * 1.5
        else:
            raise ValueError("Please train or backtest if using historical = True")
    else:
        shift_factor = 7 # needed so we can calculate lagged data
        hist_lookback = 1 + shift_factor
        tiingo_lookback = 1 + shift_factor # in case of weekends?
    full_data = pd.DataFrame()
    for symbol in tickers:
        # Get Price History
        history = self.History(symbol, hist_lookback)
        history = pd.DataFrame(history)
        # Intraday direction label: 1 if the bar closed above its open.
        history['direction'] = np.where(history['close'] > history['open'], 1, 0)
        # 5-period percentage return (regression target).
        history['return']=history['close'].pct_change(periods=5)
        history = self.bollinger_bands(history)
        history = self.calculate_rsi(history)
        # Distances of the open from the moving average and each band.
        history['price_diff']=history["open"]-history["MA"]
        history['band_diff_up']=history["open"]-history["Upper_BB"]
        history['band_diff_lower']=history["open"]-history["Lower_BB"]
        # Add Tiingo Data (also subscribes to the news feed for this symbol).
        data = self.AddData(TiingoNews, symbol).Symbol
        tiingo = self.History(data, int(tiingo_lookback), Resolution.Daily)
        if len(tiingo)!=0 and set(['description','publisheddate']).issubset(tiingo.columns):
            # Score each article description with VADER, keep only the
            # compound polarity, and sum it per publication date.
            analyzer = SentimentIntensityAnalyzer()
            tiingo['polarity'] = tiingo['description'].dropna().apply(lambda x: analyzer.polarity_scores(x))
            tiingo = pd.concat([tiingo.drop(['polarity'], axis=1), tiingo['polarity'].apply(pd.Series)], axis=1)
            tiingo = tiingo[[ 'publisheddate', 'compound']]
            tiingo['publisheddate'] = pd.to_datetime(tiingo['publisheddate'],utc=True).dt.date
            tiingo = tiingo.groupby(by=[ 'publisheddate'], as_index=False).sum()
            tiingo.rename(columns={'publisheddate' : 'time'}, inplace=True)
            tiingo.set_index('time',inplace=True)
            # NOTE(review): `history` keeps the index returned by History()
            # while `tiingo` is indexed by plain dates -- confirm this join
            # actually aligns rows instead of yielding all-NaN sentiment.
            history = history.join(tiingo)
        # Append 1-4 period lagged copies of every column, drop rows made
        # incomplete by the shifts, then drop the raw OHLCV columns.
        lags = range(1,5)
        history=history.assign(**{
            f'{col} (t-{lag})': history[col].shift(lag)
            for lag in lags
            for col in history
        }).dropna().drop(columns = ['close','high','low','volume'], errors='ignore')
        history['symbol'] = symbol.Value
        full_data=pd.concat([full_data, history])
    self.Log(full_data)
    return full_data
def value_at_risk(self, returns, weights, conf_level=0.05, num_days=1):
    """
    Parametric (variance-covariance) portfolio value-at-risk.
    ---------------------------------------------------
    Parameters
    returns : pd.DataFrame
        periodic returns, one column per asset
    conf_level : float
        confidence level. 0.05 by default
    weights : np.array
        portfolio weights
    num_days : int
        length of the period the VaR is calculated over
    """
    portfolio_mean = returns.mean().dot(weights)
    portfolio_variance = weights.T.dot(returns.cov()).dot(weights)
    portfolio_stdev = np.sqrt(portfolio_variance)
    # Normal-quantile cutoff of the periodic P&L distribution,
    # scaled to the horizon with the square-root-of-time rule.
    one_period_var = norm.ppf(conf_level, portfolio_mean, portfolio_stdev)
    return one_period_var * np.sqrt(num_days)
def cvar(self, returns, weights, conf_level=0.05):
    """
    Approximates portfolio CVaR as the mean of the portfolio VaR estimate.
    ------------------------------------
    Parameters
    returns : pd.DataFrame
        portfolio returns
    weights : np.array
        portfolio weights
    conf_level : float
        confidence level (currently not forwarded; value_at_risk uses its
        own 0.05 default -- kept to preserve the original call behaviour)
    """
    # Bug fix: `value_at_risk` is an instance method; the original called it
    # as a bare name (`value_at_risk(...)`), which raised NameError at runtime.
    VaR = self.value_at_risk(returns, weights)
    return VaR.mean()
from AlgorithmImports import *
import numpy as np
import pandas as pd
from nltk.sentiment import SentimentIntensityAnalyzer
from scipy.optimize import linprog
from sklearn.ensemble import GradientBoostingRegressor
from scipy.stats import norm
import math
class XGP(QCAlgorithm):
def Initialize(self):
    """Set dates/cash/brokerage, subscribe data, and schedule model training and rebalancing."""
    # set up
    self.SetStartDate(2017, 1, 1)
    self.SetEndDate(2017, 6, 1)
    self.InitCash = 10000000
    self.SetCash(self.InitCash)
    # History window (periods) used when building training data.
    self.lookback = 50
    # setting brokerage and reality modeling params
    self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
    self.SetSecurityInitializer(lambda s : s.SetSlippageModel(VolumeShareSlippageModel()))
    # manually keeping track of securities
    self.securities = []
    # We're getting an error when this is removed
    self.weights = []
    self.trained = True
    # Value-at-Risk limit: maximum risk we are willing to take on any one trade,
    # as a percentage loss of the investment capital
    self.var_limit = -0.015
    # End-of-day snapshots used by get_daily_realized_pnl().
    self.yesterday_total_profit = 0
    self.yesterday_total_fees = 0
    # Requesting data
    self.AddUniverseSelection(FineFundamentalUniverseSelectionModel(self.SelectCoarse, self.SelectFine))
    self.UniverseSettings.Resolution = Resolution.Daily
    self.num_coarse_symbols = 100
    self.num_fine_symbols = 10
    # Train immediately
    self.Train(self.classifier_training)
    # Train every Sunday at 4am or first day of month (because new universe)
    self.Train(self.DateRules.Every(DayOfWeek.Sunday), self.TimeRules.At(4, 0), self.classifier_training)
    self.Train(self.DateRules.MonthStart(daysOffset = 0), self.TimeRules.At(4, 0), self.classifier_training)
    # Rebalance every Monday and on month start at 10:00.
    self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday),
                     self.TimeRules.At(10, 0),
                     self.actions)
    self.Schedule.On(self.DateRules.MonthStart(daysOffset = 0),
                     self.TimeRules.At(10, 0),
                     self.actions)
def SelectCoarse(self, coarse):
    """selecting CoarseFundamental objects based on criteria in paper"""
    # Only refresh the universe on the first calendar day of each month.
    if self.Time.day != 1:
        return Universe.Unchanged
    liquid = sorted((c for c in coarse if c.HasFundamentalData and c.Price > 5),
                    key=lambda c: c.DollarVolume,
                    reverse=True)
    return [c.Symbol for c in liquid[:self.num_coarse_symbols]]
def SelectFine(self, fine):
    """selecting FineFundamental objects based on our criteria"""
    if self.Time.day != 1:
        return Universe.Unchanged
    def _passes(f):
        # Value/quality screen on earnings multiples and balance-sheet health.
        return (f.ValuationRatios.PERatio < 100
                and f.MarketCap > 300000000
                and f.ValuationRatios.PEGRatio < 3
                and f.OperationRatios.TotalDebtEquityRatio.Value < 2
                and f.OperationRatios.CurrentRatio.Value > 1)
    ranked = sorted(filter(_passes, fine),
                    key=lambda f: f.ValuationRatios.PERatio,
                    reverse=True)
    return [f.Symbol for f in ranked[:self.num_fine_symbols]]
def OnSecuritiesChanged(self, changes):
    """triggers when Universe changes as result of filtering"""
    for sec in changes.AddedSecurities:
        self.Debug(f"{self.Time}: Added {sec}")
    for sec in changes.RemovedSecurities:
        self.Debug(f"{self.Time}: Removed {sec}")
    # Maintain the manual roster: add new members, drop removed ones.
    roster = set(self.securities)
    roster |= set(changes.AddedSecurities)
    roster -= set(changes.RemovedSecurities)
    self.securities = list(roster)
def OnData(self, data):
    """No per-bar trading logic; rebalancing runs from the scheduled `actions` events."""
    return
def OnEndOfDay(self):
    # Snapshot cumulative profit and fees so get_daily_realized_pnl() can
    # compute the next day's realized P&L as a delta. (The previous comment
    # about plotting a benchmark was stale -- nothing is plotted here.)
    self.yesterday_total_profit = self.Portfolio.TotalProfit
    self.yesterday_total_fees = self.Portfolio.TotalFees
def OnOrderEvent(self, orderEvent):
    """Logs every order lifecycle event (submitted, filled, canceled, etc.)."""
    self.Log(f'{orderEvent}')
# =====================================================================================================================
# begin custom functions
# =====================================================================================================================
def actions(self):
    """
    Scheduled rebalance: liquidate, refresh predictions, re-optimize, then
    either re-enter full positions or, if yesterday's realized P&L breached
    the portfolio VaR estimate, enter reduced-size positions.
    """
    self.Liquidate() #Liquidate the whole portfolio
    self.make_predictions()
    self.LinOptProg()
    a_securities = [s for s in self.securities]
    pnl = self.get_daily_realized_pnl()
    # risk management (value-at-risk)
    lookback = 30
    reduction_size = 0.8 # by how much to reduce the portion size when VaR limit is exceeded
    active_securities = [s.Symbol for s in self.securities]
    if len(self.weights) > 0:
        history = self.History(active_securities, timedelta(days=lookback), resolution=Resolution.Daily)
        history = history['close'].unstack(level=0)
        # NOTE(review): assumes every symbol returned history; a missing
        # symbol would silently misalign these columns -- confirm.
        history.columns = active_securities
        returns = history.pct_change()
        w = np.array([i[0] for i in self.weights])
        self.var_limit = self.value_at_risk(returns, w)
        # VaR is expressed as a negative quantity, so this is when the var_limit is reached
        if pnl <= self.var_limit:
            # Bug fix: the original logged an undefined name `VaR` here, which
            # raised NameError whenever this branch was taken.
            self.Debug(f"VaR limit ({self.var_limit}) reached; realized loss is {pnl}")
            for (security, wt) in zip(self.securities, [i[0] for i in self.weights]):
                quantity = self.CalculateOrderQuantity(security.Symbol, wt)
                # Enter at a reduced (80%) position size after a VaR breach.
                reduced_quantity = math.ceil(quantity * reduction_size)
                if reduced_quantity != 0:
                    self.MarketOrder(security.Symbol, reduced_quantity)
        else:
            for (security, wt) in zip(a_securities, [i[0] for i in self.weights]):
                if wt != 0:
                    self.SetHoldings(security.Symbol, wt)
def classifier_training(self):
    """
    Fit per-security mean-return and quantile models on recent history.

    Populates self.return_mods / self.quantile_mods_lg / self.quantile_mods_st,
    index-aligned with self.securities; a security whose data or fit fails gets
    the "NoModel" sentinel (its later predictions fall back to 0).
    """
    self.return_mods = []
    self.quantile_mods_lg = []
    self.quantile_mods_st = []
    active_securities = [s.Symbol for s in self.securities]
    self.Log(f"Training Started at {self.Time}")
    for security in active_securities:
        data = self.get_all_data([security], training=True, backtesting=False) # get tickers
        try:
            y_reg = data["return"]
            X = data.drop(["direction", "return", "symbol"], axis = 1)
            (ret, qut_lg, qut_st) = self.gb_returns(X, y_reg)
        except Exception as e:
            # Bug fix: was a bare `except:` that also swallowed
            # KeyboardInterrupt/SystemExit and hid the failure reason.
            self.Log(f"Training failed for {security}: {e}")
            ret = "NoModel"
            qut_lg = "NoModel"
            qut_st = "NoModel"
        self.return_mods.append(ret)
        self.quantile_mods_lg.append(qut_lg)
        self.quantile_mods_st.append(qut_st)
    self.trained = True
def make_predictions(self):
    """
    Predict the next-period return (and matching tail quantile) per security.

    Fills self.returns / self.quantiles, index-aligned with self.securities;
    a security without a usable model contributes (0, 0).
    """
    self.returns = []
    self.quantiles = []
    act_securities = [s.Symbol for s in self.securities]
    for i in range(len(act_securities)):
        security = act_securities[i]
        data = self.get_all_data([security], training=False)
        # Keep only the most recent row for prediction.
        data = data[data.index == data.index.max()]
        prediction_data = data.drop(["direction", "return", "symbol"], axis = 1)
        try:
            r_pred = self.return_mods[i].predict(prediction_data)[0]
        except Exception:
            # Bug fix: was a bare `except:`. "NoModel" sentinels and any
            # predict-time failures fall back to a 0 prediction.
            r_pred = 0
        if r_pred > 0:
            # Long candidate: pair with the lower-tail (alpha=0.05) model.
            q_pred = self.quantile_mods_lg[i].predict(prediction_data)[0]
        elif r_pred < 0:
            # Short candidate: pair with the upper-tail (alpha=0.95) model.
            q_pred = self.quantile_mods_st[i].predict(prediction_data)[0]
        else:
            q_pred = 0
        self.returns.append(r_pred)
        self.quantiles.append(q_pred)
    self.Debug(self.returns)
def gb_returns(self, X, y):
    """
    Fit one mean-return model and two tail-quantile models on (X, y).

    Returns a tuple (mean_model, lower_quantile_model, upper_quantile_model);
    the quantile models are fit at alpha = 0.05 and alpha = 0.95 respectively.
    """
    # Hyperparameters shared by all three boosters.
    shared = dict(n_estimators=150,
                  learning_rate=0.05,
                  criterion="friedman_mse",
                  random_state=1693,
                  n_iter_no_change=15)
    mean_model = GradientBoostingRegressor(loss="squared_error", **shared)
    lower_model = GradientBoostingRegressor(loss="quantile", alpha=0.05, **shared)
    upper_model = GradientBoostingRegressor(loss="quantile", alpha=0.95, **shared)
    # Fit in the same order as the original (mean, lower, upper).
    return (mean_model.fit(X, y),
            lower_model.fit(X, y),
            upper_model.fit(X, y))
def LinOptProg(self):
    """
    Linear-program portfolio construction.

    Maximizes predicted return subject to a quantile-weighted tail-loss cap
    (0.01), total directional exposure between 0 and 1, and per-name bounds
    of min(0.6, 3/n) for longs and max(-0.6, -1.5/n) for shorts.
    Reads self.returns / self.quantiles (set by make_predictions) and stores
    the result in self.weights as an (n, 1) column vector.
    """
    self.weights = []
    self.returns = np.array(self.returns).reshape(-1,1)
    self.quantiles = np.array(self.quantiles).reshape(-1,1)
    # Predicted trade direction per name: +1 long, -1 short, 0 flat.
    dirs = np.array([1 if d > 0 else 0 if d == 0 else -1 for d in self.returns]).reshape(-1,1)
    bounds = [(0, min(0.6, 3 / len(self.returns))) if d == 1 else (max(-0.6, -1.5 / len(self.returns)), 0) for d in dirs]
    # Inequality rows: tail-loss cap, net exposure <= 1, net exposure >= 0.
    A = np.array([-1*self.quantiles, dirs, -1*dirs]).squeeze()
    b = np.array([0.01, 1, 0])
    # linprog minimizes, so negate expected returns to maximize them.
    res = linprog(-1*self.returns, A_ub = A, b_ub = b, bounds = bounds)
    if res.status == 0:
        self.weights = res.x.reshape(-1,1)
    else:
        self.Log("Optimization failed")
        # Fallback: equal-magnitude 1/n weights in each name's predicted
        # direction (flat names get 0). NOTE(review): the original comment
        # here claimed "uniform weight 0 (buy nothing)", which does not match
        # what this line computes.
        self.weights = dirs * (1/len(self.returns))
    # Free the prediction buffers so the next cycle starts clean.
    del self.returns
    del self.quantiles
def bollinger_bands(self, data, window=20, num_std=2):
    """Append rolling Bollinger columns (MA, STD, Upper_BB, Lower_BB) to *data* in place and return it."""
    rolling_close = data['close'].rolling(window=window)
    data['MA'] = rolling_close.mean()
    data['STD'] = rolling_close.std()
    band_width = data['STD'] * num_std
    data['Upper_BB'] = data['MA'] + band_width
    data['Lower_BB'] = data['MA'] - band_width
    return data
def calculate_rsi(self, data, period=20):
    """Append an RSI column ('rsi') to *data* and return it."""
    # Day-over-day close changes; the leading NaN from diff() is dropped.
    change = data['close'].diff().dropna()
    up_moves = change.where(change > 0, 0)
    down_moves = -change.where(change < 0, 0)
    # Exponentially weighted means (com = period-1) approximate Wilder smoothing.
    mean_gain = up_moves.ewm(com=period - 1, min_periods=period).mean()
    mean_loss = down_moves.ewm(com=period - 1, min_periods=period).mean()
    relative_strength = mean_gain / mean_loss
    data['rsi'] = 100 - (100 / (1 + relative_strength))
    return data
def get_all_data(self, tickers, historical=True, training=False, backtesting=True):
    """
    Gets historical price, technical-indicator, and Tiingo news-sentiment
    data for training and prediction.
    Parameters:
    -----------
    tickers : list
        list of Symbols to retrieve data for
    historical : Bool, default True
        Flag to determine if we are training or backtesting; False if live trading
    training : Bool, default False
        If True, retrieves a training window of data. If performing predictions,
        False retrieves the most recent day of data. For example, if called at
        8 A.M., retrieves the previous trading days' data.
    backtesting : Bool, default True
        Flag to determine if we are backtesting or training
    Return:
    -------
    full_data : pd.DataFrame
        one row per bar with raw features, Bollinger/RSI columns, summed news
        sentiment, and 1-4 period lagged copies of every column
    """
    # Choose lookback windows for price history and news history.
    if historical:
        if backtesting:
            shift_factor = 30 # overshooting and select the maximum
            hist_lookback = 1 + shift_factor
            tiingo_lookback = 12 # in case of weekends?
        elif training:
            # NOTE(review): backtesting defaults to True, so this branch runs
            # only when callers pass backtesting=False explicitly (as
            # classifier_training does).
            hist_lookback = self.lookback + 25
            tiingo_lookback = self.lookback * 1.5
        else:
            raise ValueError("Please train or backtest if using historical = True")
    else:
        shift_factor = 7 # needed so we can calculate lagged data
        hist_lookback = 1 + shift_factor
        tiingo_lookback = 1 + shift_factor # in case of weekends?
    full_data = pd.DataFrame()
    for symbol in tickers:
        # Get Price History
        history = self.History(symbol, hist_lookback)
        history = pd.DataFrame(history)
        # Intraday direction label: 1 if the bar closed above its open.
        history['direction'] = np.where(history['close'] > history['open'], 1, 0)
        # 5-period percentage return (regression target).
        history['return']=history['close'].pct_change(periods=5)
        history = self.bollinger_bands(history)
        history = self.calculate_rsi(history)
        # Distances of the open from the moving average and each band.
        history['price_diff']=history["open"]-history["MA"]
        history['band_diff_up']=history["open"]-history["Upper_BB"]
        history['band_diff_lower']=history["open"]-history["Lower_BB"]
        # Add Tiingo Data (also subscribes to the news feed for this symbol).
        data = self.AddData(TiingoNews, symbol).Symbol
        tiingo = self.History(data, int(tiingo_lookback), Resolution.Daily)
        if len(tiingo)!=0 and set(['description','publisheddate']).issubset(tiingo.columns):
            # Score each article description with VADER, keep only the
            # compound polarity, and sum it per publication date.
            analyzer = SentimentIntensityAnalyzer()
            tiingo['polarity'] = tiingo['description'].dropna().apply(lambda x: analyzer.polarity_scores(x))
            tiingo = pd.concat([tiingo.drop(['polarity'], axis=1), tiingo['polarity'].apply(pd.Series)], axis=1)
            tiingo = tiingo[[ 'publisheddate', 'compound']]
            tiingo['publisheddate'] = pd.to_datetime(tiingo['publisheddate'],utc=True).dt.date
            tiingo = tiingo.groupby(by=[ 'publisheddate'], as_index=False).sum()
            tiingo.rename(columns={'publisheddate' : 'time'}, inplace=True)
            tiingo.set_index('time',inplace=True)
            # NOTE(review): `history` keeps the index returned by History()
            # while `tiingo` is indexed by plain dates -- confirm this join
            # actually aligns rows instead of yielding all-NaN sentiment.
            history = history.join(tiingo)
        # Append 1-4 period lagged copies of every column, drop rows made
        # incomplete by the shifts, then drop the raw OHLCV columns.
        lags = range(1,5)
        history=history.assign(**{
            f'{col} (t-{lag})': history[col].shift(lag)
            for lag in lags
            for col in history
        }).dropna().drop(columns = ['close','high','low','volume'], errors='ignore')
        history['symbol'] = symbol.Value
        full_data=pd.concat([full_data, history])
    self.Log(full_data)
    return full_data
def get_daily_realized_pnl(self):
    """Realized P&L since the last end-of-day snapshot: gross profit delta net of new fees."""
    gross_delta = self.Portfolio.TotalProfit - self.yesterday_total_profit
    fees_delta = self.Portfolio.TotalFees - self.yesterday_total_fees
    return gross_delta - fees_delta
def value_at_risk(self, returns, weights, conf_level=0.05, num_days=1):
    """
    Parametric (variance-covariance) portfolio value-at-risk.
    ---------------------------------------------------
    Parameters
    returns : pd.DataFrame
        periodic returns, one column per asset
    conf_level : float
        confidence level. 0.05 by default
    weights : np.array
        portfolio weights
    num_days : int
        length of the period the VaR is calculated over
    """
    portfolio_mean = returns.mean().dot(weights)
    portfolio_variance = weights.T.dot(returns.cov()).dot(weights)
    portfolio_stdev = np.sqrt(portfolio_variance)
    # Normal-quantile cutoff of the periodic P&L distribution,
    # scaled to the horizon with the square-root-of-time rule.
    one_period_var = norm.ppf(conf_level, portfolio_mean, portfolio_stdev)
    return one_period_var * np.sqrt(num_days)
def cvar(self, returns, weights, conf_level=0.05):
    """
    Approximates portfolio CVaR as the mean of the portfolio VaR estimate.
    ------------------------------------
    Parameters
    returns : pd.DataFrame
        portfolio returns
    weights : np.array
        portfolio weights
    conf_level : float
        confidence level (currently not forwarded; value_at_risk uses its
        own 0.05 default -- kept to preserve the original call behaviour)
    """
    # Bug fix: `value_at_risk` is an instance method; the original called it
    # as a bare name (`value_at_risk(...)`), which raised NameError at runtime.
    VaR = self.value_at_risk(returns, weights)
    return VaR.mean()
#region imports
from AlgorithmImports import *
#endregion
# Your New Python File
class FocusedGreenDonkey(QCAlgorithm):
    """Buys 100 shares of sub-$10 stocks making a new 252-day high with RSI > 80."""

    def Initialize(self):
        self.SetStartDate(2019, 10, 12)
        self.SetCash(100000)
        self.AddUniverse(self.MyCoarseFilterFunction)
        self.symbolDataBySymbol = {}

    def MyCoarseFilterFunction(self, coarse):
        # Start tracking any sub-$10 name we have not seen before.
        cheap = [c for c in coarse if c.Price < 10]
        new_symbols = [c.Symbol for c in cheap if c.Symbol not in self.symbolDataBySymbol]
        history = self.History(new_symbols, 252, Resolution.Daily)
        if not history.empty:
            closes = history.close.unstack(0)
            for sym in new_symbols:
                # NOTE(review): membership is tested with str(sym) but the
                # column is read with sym itself — preserved from the original;
                # confirm the unstacked columns satisfy both lookups.
                if str(sym) not in closes:
                    continue
                series = closes[sym].dropna()
                if not series.empty:
                    self.symbolDataBySymbol[sym] = SymbolData(self, sym, series)
        # Feed today's bar into every tracked symbol's indicators.
        for c in coarse:
            tracked = self.symbolDataBySymbol.get(c.Symbol)
            if tracked is not None:
                tracked.Update(c.EndTime, c.AdjustedPrice)
        # Select names at a fresh 252-day high that are also overbought.
        return [sym for sym, data in self.symbolDataBySymbol.items()
                if data.HasNewMax and data.rsi.Current.Value > 80]

    def OnData(self, data):
        pass

    def OnSecuritiesChanged(self, changes):
        # Enter additions with a fixed 100-share market order; exit removals.
        for sec in changes.AddedSecurities:
            self.MarketOrder(sec.Symbol, 100)
        for sec in changes.RemovedSecurities:
            self.Liquidate(sec.Symbol)
class SymbolData:
    """Per-symbol indicator state: 252-day maximum, 10-period RSI, and a 2-slot window of recent maxima."""

    def __init__(self, algorithm, symbol, history):
        self.symbol = symbol
        self.max = Maximum(252)
        self.rsi = RelativeStrengthIndex(10, MovingAverageType.Simple)
        self.maxWindow = RollingWindow[IndicatorDataPoint](2)
        self.max.Updated += self.OnMax
        # Warm the indicators with the historical close series.
        # Bug fix: Series.iteritems() was removed in pandas 2.0; items() is the
        # equivalent and also exists in older pandas versions.
        for time, close in history.items():
            self.Update(time, close)

    def OnMax(self, sender, updated):
        # Record each new 252-day maximum once the indicator has a full window.
        if self.max.IsReady:
            self.maxWindow.Add(updated)

    def Update(self, time, close):
        self.max.Update(time, close)
        self.rsi.Update(time, close)

    @property
    def HasNewMax(self):
        # True when the latest 252-day maximum exceeds the previous one.
        # NOTE(review): this compares IndicatorDataPoint objects directly;
        # LEAN's IndicatorDataPoint is comparable by value — confirm.
        if self.maxWindow.IsReady:
            return self.maxWindow[0] > self.maxWindow[1]
        else:
            return False
# Bug fix: the next file was concatenated directly onto the `return False`
# line above with no newline, producing a syntax error; its import belongs on
# its own line.
from AlgorithmImports import *
class USCoarseUniverseConstituentsDataAlgorithm(QCAlgorithm):
    """Equal-weights the top N stocks by dollar volume from the coarse universe."""

    _number_of_symbols = 3
    _changes = None

    def Initialize(self):
        self.SetStartDate(2021, 1, 1)
        self.SetEndDate(2021, 7, 1)
        self.SetCash(100000)
        # Requesting data
        self.AddUniverse(self.CoarseSelectionFunction)

    def CoarseSelectionFunction(self, coarse):
        # Keep the N most liquid names.
        sortedByDollarVolume = sorted(coarse, key=lambda x: x.DollarVolume, reverse=True)
        return [ x.Symbol for x in sortedByDollarVolume[:self._number_of_symbols] ]

    def OnData(self, data):
        # if we have no changes, do nothing
        if self._changes is None: return
        # liquidate removed securities
        for security in self._changes.RemovedSecurities:
            if security.Invested:
                self.Liquidate(security.Symbol)
        # we want 1/N allocation in each security in our universe
        for security in self._changes.AddedSecurities:
            self.SetHoldings(security.Symbol, 1 / self._number_of_symbols)
        self._changes = None
        # NOTE(review): this logs only the last `security` left bound by the
        # loops above (as a one-element set) — likely meant to log every
        # addition inside the loop; confirm intent before moving it.
        self.Log({security.Symbol})

    def OnSecuritiesChanged(self, changes):
        self._changes = changes
        for security in changes.AddedSecurities:
            # Historical data
            history = self.History(security.Symbol, 7, Resolution.Daily)
            self.Debug(f"We got {len(history)} from our history request for {security.Symbol}")
# Bug fix: the next file was concatenated directly onto the Debug line above
# with no newline, producing a syntax error; its import belongs on its own
# line.
from QuantConnect.Data.Custom.Tiingo import *
from AlgorithmImports import *
# ref: https://www.quantconnect.com/forum/discussion/6638/new-data-source-tiingo-news-data/p1
class TiingoNLPDemonstration(QCAlgorithm):
    """Trades AAPL on a hand-rolled bag-of-words sentiment score of Tiingo news descriptions."""

    def Initialize(self):
        # Word -> sentiment score lookup used to score article descriptions.
        self.wordSentiment = {
            "bad": -0.5, "good": 0.5,
            "negative": -0.5, "great": 0.5,
            "growth": 0.5, "fail": -0.5,
            "failed": -0.5, "success": 0.5, "nailed": 0.5,
            "beat": 0.5, "missed": -0.5,
        }
        self.SetStartDate(2022, 1, 1)
        self.SetCash(8000)
        aapl = self.AddEquity("AAPL", Resolution.Hour).Symbol
        self.aaplCustom = self.AddData(TiingoNews, aapl).Symbol

    def OnData(self, data):
        if not data.ContainsKey(self.aaplCustom):
            return
        news = data[self.aaplCustom]
        # Score = sum of sentiment of scored words present in the description.
        # NOTE(review): set intersection counts each word at most once per
        # article regardless of how often it appears — confirm that is intended.
        descriptionWords = news.Description.lower().split(" ")
        intersection = set(self.wordSentiment.keys()).intersection(descriptionWords)
        sentimentSum = sum([self.wordSentiment[i] for i in intersection])
        # Use the (possibly negative) score directly as the target weight.
        self.SetHoldings(self.aaplCustom.Underlying, sentimentSum)
# Bug fix: the next file was concatenated directly onto the SetHoldings line
# above with no newline, producing a syntax error; its import belongs on its
# own line.
from AlgorithmImports import *
import pandas as pd
class Algorithm(QCAlgorithm):
    # Bug fix: the original declared `class Algorithm(QCAlgorithm()):`, which
    # *instantiates* the base class at class-creation time and uses the
    # instance as a base — a TypeError. Inherit from the class itself.

    def Initialize(self):
        pass

    def value_at_risk(self, returns, weights):
        """
        Return a 30-element list of n-day dollar VaR estimates (n = 1..30),
        scaled by the square-root-of-time rule from the 1-day parametric VaR
        at 95% confidence.

        Need to modify -- check if returns are approximately normally
        distributed or needs modifications on this assumption.
        ---------------------------------------------------
        Parameters
        returns: pd.DataFrame
            periodic returns
        weights: np.array
            portfolio weights
        """
        # `norm` is not imported anywhere in this file segment; import locally
        # so this method does not raise a NameError.
        from scipy.stats import norm
        cov_matrix = returns.cov()
        avg_return = returns.mean()
        portfolio_mean = avg_return.dot(weights)
        portfolio_stdev = np.sqrt(weights.T.dot(cov_matrix).dot(weights))
        # NOTE(review): self.InitCash is never assigned in Initialize() above —
        # this raises AttributeError unless it is set elsewhere; confirm.
        mean_investment = (1+portfolio_mean) * self.InitCash
        stdev_investment = self.InitCash * portfolio_stdev
        # confidence interval (95%)
        conf = 0.05
        cutoff = norm.ppf(conf, mean_investment, stdev_investment)
        var_1d1 = self.InitCash - cutoff
        # Calculate n Day VaR (the dead bare `cov_matrix` expression from the
        # original was removed).
        var_array = []
        num_days = 30
        for x in range(1, num_days+1):
            var_array.append(np.round(var_1d1 * np.sqrt(x),2))
        return var_array
from AlgorithmImports import *
import numpy as np
import pandas as pd
from nltk.sentiment import SentimentIntensityAnalyzer
from scipy.optimize import linprog
from sklearn.ensemble import GradientBoostingRegressor
from scipy.stats import norm
import math
class XGP(QCAlgorithm):
def Initialize(self):
    """Set dates/cash/brokerage, wire up the fundamental universe, and schedule model training and trading."""
    # set up
    self.SetStartDate(2017, 1, 1)
    self.SetEndDate(2017, 6, 1)
    self.InitCash = 10000000
    self.SetCash(self.InitCash)
    # Training-history window (days) consumed by get_all_data().
    self.lookback = 50
    # setting brokerage and reality modeling params
    self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
    self.SetSecurityInitializer(lambda s : s.SetSlippageModel(VolumeShareSlippageModel()))
    # manually keeping track of securities
    self.securities = []
    # We're getting an error when this is removed
    self.weights = []
    self.trained = True
    # Daily portfolio VaR limit, expressed as a return (-1.5%); used by actions().
    self.risk_threshold = -0.015
    # self.SetWarmup(timedelta(days=50))
    # Price recorded at order time, keyed by symbol (used for risk sizing).
    self.prices_at_order = {}
    # Previous-day totals consumed by get_daily_realized_pnl().
    self.yesterday_total_profit = 0
    self.yesterday_total_fees = 0
    # Requesting data
    self.AddUniverseSelection(FineFundamentalUniverseSelectionModel(self.SelectCoarse, self.SelectFine))
    self.UniverseSettings.Resolution = Resolution.Daily
    self.num_coarse_symbols = 100
    self.num_fine_symbols = 10
    # Train immediately
    self.Train(self.classifier_training)
    # One-off trade on 2023-04-19 10:00 — outside the 2017 backtest window, so
    # it never fires in this configuration; presumably left over from testing.
    self.Schedule.On(self.DateRules.On(2023, 4, 19),
        self.TimeRules.At(10,0),
        self.actions)
    # Train every Sunday at 4am or first day of month (because new universe)
    self.Train(self.DateRules.Every(DayOfWeek.Sunday), self.TimeRules.At(4, 0), self.classifier_training)
    self.Train(self.DateRules.MonthStart(daysOffset = 0), self.TimeRules.At(4, 0), self.classifier_training)
    # Rebalance every Monday and at each month start at 10:00.
    self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday),
        self.TimeRules.At(10, 0),
        self.actions)
    self.Schedule.On(self.DateRules.MonthStart(daysOffset = 0),
        self.TimeRules.At(10, 0),
        self.actions)
def SelectCoarse(self, coarse):
    """Select up to num_coarse_symbols liquid stocks (price > $5, has fundamentals).

    The universe is rebuilt on the very first call (nothing tracked yet) and
    thereafter only on the first day of each month; otherwise it is left
    unchanged.

    Parameters
    ----------
    coarse : list[CoarseFundamental]

    Returns
    -------
    list[Symbol] or Universe.Unchanged
    """
    # Refactor: the original duplicated the identical filter/sort in both
    # branches; collapsing the early-out into one guard preserves behavior.
    if self.securities and self.Time.day != 1:
        return Universe.Unchanged
    selected = [c for c in coarse if c.HasFundamentalData and c.Price > 5]
    by_dollar_volume = sorted(selected, key=lambda c: c.DollarVolume, reverse=True)
    return [c.Symbol for c in by_dollar_volume[:self.num_coarse_symbols]]
def SelectFine(self, fine):
    """Select up to num_fine_symbols quality stocks, highest P/E first.

    Filters: P/E < 100, market cap > $300M, PEG < 3, debt/equity < 2 and
    current ratio > 1. Like SelectCoarse, the universe is rebuilt on the
    first call and then only on the first day of each month.

    Parameters
    ----------
    fine : list[FineFundamental]

    Returns
    -------
    list[Symbol] or Universe.Unchanged
    """
    # Refactor: the original duplicated the identical filter/sort in both
    # branches; collapsing the early-out into one guard preserves behavior.
    if self.securities and self.Time.day != 1:
        return Universe.Unchanged
    selected = [f for f in fine if
                f.ValuationRatios.PERatio < 100
                and f.MarketCap > 300000000
                and f.ValuationRatios.PEGRatio < 3
                and f.OperationRatios.TotalDebtEquityRatio.Value < 2
                and f.OperationRatios.CurrentRatio.Value > 1]
    by_pe_ratio = sorted(selected, key=lambda f: f.ValuationRatios.PERatio, reverse=True)
    return [f.Symbol for f in by_pe_ratio[:self.num_fine_symbols]]
def OnSecuritiesChanged(self, changes):
    """Log universe churn and refresh the manually-tracked security list."""
    for sec in changes.AddedSecurities:
        self.Debug(f"{self.Time}: Added {sec}")
    for sec in changes.RemovedSecurities:
        self.Debug(f"{self.Time}: Removed {sec}")
    # tracked = (tracked + added) - removed; set-based, so order is arbitrary.
    tracked = set(self.securities)
    tracked |= set(changes.AddedSecurities)
    tracked -= set(changes.RemovedSecurities)
    self.securities = list(tracked)
def OnData(self, data):
    """Intentionally a no-op: all trading happens in the scheduled actions()."""
    pass
def OnEndOfDay(self):
    """Snapshot cumulative profit/fees so tomorrow's realized PnL can be computed as a delta."""
    portfolio = self.Portfolio
    self.yesterday_total_profit = portfolio.TotalProfit
    self.yesterday_total_fees = portfolio.TotalFees
def OnOrderEvent(self, orderEvent):
    """Log every order event (submissions, fills, cancels) for the audit trail."""
    message = f'{orderEvent}'
    self.Log(message)
# =====================================================================================================================
# begin custom functions
# =====================================================================================================================
def actions(self):
    """Scheduled rebalance: liquidate, re-predict, optimize weights, then
    either invest per weights or — if estimated portfolio VaR breaches the
    risk threshold — stay out of the market (see NOTE below)."""
    self.Liquidate() #Liquidate the whole portfolio
    self.make_predictions()
    self.LinOptProg()
    lookback = 30
    active_securities = [s.Symbol for s in self.securities]
    # risk management
    if len(self.weights) > 0:
        history = self.History(active_securities, timedelta(days=lookback), resolution=Resolution.Daily)
        history = history['close'].unstack(level=0)
        history.columns = active_securities
        returns = history.pct_change()
        # self.weights is an (n, 1) column vector from LinOptProg; flatten it.
        w = np.array([i[0] for i in self.weights])
        VaR = self.value_at_risk(returns, w) # calculation of value-at-risk limit
        self.Debug(f"VaR={VaR}")
        # position sizing
        max_loss_dollars = self.InitCash * self.risk_threshold # maximum loss in dollars we are willing to have in one trade
        # self.Debug(f"max_loss_dollars={max_loss_dollars}")
        if VaR <= self.risk_threshold: # if estimated loss in the next day is greater than our maximum risk threshold
            # NOTE(review): this branch only *logs* reduced position sizes —
            # all order placement below is commented out, so a VaR breach
            # leaves the portfolio fully liquidated. Confirm this
            # "stand aside" behavior is intended.
            self.Debug(f"estimated risk {VaR} exceeds threshold")
            reduction_size = self.risk_threshold - VaR
            for (security, wt) in zip(active_securities, [i[0] for i in self.weights]):
                # # price_diff = how much this security's price has dropped now
                # # compared to the price at the time of order
                # price_diff = self.prices_at_order[security] - self.Securities[security].Price
                # self.Debug(f"price_diff: {price_diff}")
                # if wt < 0: # for short positions
                # price_diff = price_diff * -1 # so that price_diff is always negative
                # num_shares_to_order = max_loss_dollars * wt / price_diff
                # # make order within the risk limit
                # self.MarketOrder(security, num_shares_to_order)
                quantity = self.CalculateOrderQuantity(security, wt)
                reduced_quantity = math.ceil(quantity * (1-reduction_size))
                if reduced_quantity != 0:
                    self.Debug(f"VaR limit reached; expected loss is {VaR}. Reducing position size of \
{security} from {quantity} to {reduced_quantity}")
        else:
            # Risk acceptable: take every non-zero optimized weight and record
            # the entry price for later risk bookkeeping.
            a_securities = [s for s in self.securities]
            for (security, wt) in zip(a_securities, [i[0] for i in self.weights]):
                if wt != 0:
                    self.SetHoldings(security.Symbol, wt)
                    self.prices_at_order[security.Symbol] = self.Securities[security.Symbol].Price
def classifier_training(self):
    """Fit per-security mean and quantile GBM models from historical features.

    Populates self.return_mods / self.quantile_mods_lg / self.quantile_mods_st,
    one entry per tracked security (index-aligned with self.securities); the
    string sentinel "NoModel" marks securities whose training failed.
    """
    self.return_mods = []
    self.quantile_mods_lg = []
    self.quantile_mods_st = []
    #active_securities = [s.Symbol.Value for s in self.securities]
    active_securities = [s.Symbol for s in self.securities]
    self.Log(f"Training Started at {self.Time}")
    for security in active_securities:
        data = self.get_all_data([security], training=True, backtesting=False) # get tickers
        try:
            y_reg = data["return"]
            X = data.drop(["direction", "return", "symbol"], axis = 1)
            (ret, qut_lg, qut_st) = self.gb_returns(X, y_reg)
        # Bug fix: a bare `except:` also swallows SystemExit/KeyboardInterrupt;
        # catch Exception so genuine interrupts still propagate.
        except Exception:
            ret = "NoModel"
            qut_lg = "NoModel"
            qut_st = "NoModel"
        self.return_mods.append(ret)
        self.quantile_mods_lg.append(qut_lg)
        self.quantile_mods_st.append(qut_st)
    self.trained = True
def make_predictions(self):
    """Predict next-period return and a matching tail quantile per security.

    Fills self.returns and self.quantiles, index-aligned with self.securities.
    Securities whose model is missing (the "NoModel" sentinel) or whose
    prediction fails get a 0 return and a 0 quantile.
    """
    self.returns = []
    self.quantiles = []
    act_securities = [s.Symbol for s in self.securities]
    for i in range(len(act_securities)):
        security = act_securities[i]
        data = self.get_all_data([security], training=False)
        # Keep only the most recent observation for prediction.
        data = data[data.index == data.index.max()]
        prediction_data = data.drop(["direction", "return", "symbol"], axis = 1)
        try:
            r_pred = self.return_mods[i].predict(prediction_data)[0]
        # Bug fix: bare `except:` narrowed — the "NoModel" sentinel has no
        # .predict (AttributeError), which is deliberately mapped to 0.
        except Exception:
            r_pred = 0
        if r_pred > 0:
            # Long candidate: use the lower-tail (alpha=0.05) quantile model.
            q_pred = self.quantile_mods_lg[i].predict(prediction_data)[0]
        elif r_pred < 0:
            # Short candidate: use the upper-tail (alpha=0.95) quantile model.
            q_pred = self.quantile_mods_st[i].predict(prediction_data)[0]
        else:
            q_pred = 0
        self.returns.append(r_pred)
        self.quantiles.append(q_pred)
    self.Debug(self.returns)
def gb_returns(self, X, y):
    """
    Fit three gradient-boosting models on features X / target y: a
    squared-error mean-return model plus two quantile models (alpha=0.05
    for the long tail, alpha=0.95 for the short tail).

    Returns (mean_model, long_quantile_model, short_quantile_model),
    each already fitted.
    """
    # Hyperparameters shared by all three models.
    shared = dict(n_estimators = 150,
                  learning_rate = 0.05,
                  criterion = "friedman_mse",
                  random_state = 1693,
                  n_iter_no_change = 15)
    mean_model = GradientBoostingRegressor(loss = "squared_error", **shared).fit(X, y)
    lower_tail_model = GradientBoostingRegressor(loss = "quantile", alpha = 0.05, **shared).fit(X, y)
    upper_tail_model = GradientBoostingRegressor(loss = "quantile", alpha = 0.95, **shared).fit(X, y)
    return (mean_model, lower_tail_model, upper_tail_model)
def LinOptProg(self):
    """
    Solve for portfolio weights by linear programming: maximize predicted
    return subject to a tail-loss (quantile) constraint, net exposure in
    [0, 1], and signed per-name bounds. Stores an (n, 1) column vector in
    self.weights and consumes (deletes) self.returns / self.quantiles.
    """
    self.weights = []
    self.returns = np.array(self.returns).reshape(-1, 1)
    self.quantiles = np.array(self.quantiles).reshape(-1, 1)
    n = len(self.returns)
    # Trade direction per name: +1 long, -1 short, 0 flat.
    dirs = np.array([1 if r > 0 else 0 if r == 0 else -1 for r in self.returns]).reshape(-1, 1)
    long_cap = min(0.6, 3 / n)
    short_cap = max(-0.6, -1.5 / n)
    bounds = [(0, long_cap) if d == 1 else (short_cap, 0) for d in dirs]
    # Constraint rows: tail-loss budget, net exposure <= 1, net exposure >= 0.
    A = np.array([-1 * self.quantiles, dirs, -1 * dirs]).squeeze()
    b = np.array([0.01, 1, 0])
    # Maximize expected return == minimize its negation.
    res = linprog(-1 * self.returns, A_ub=A, b_ub=b, bounds=bounds)
    if res.status == 0:
        self.weights = res.x.reshape(-1, 1)
    else:
        self.Log("Optimization failed")
        # If optimization fails, give uniform weight 0 (buy nothing)
        # NOTE(review): despite the comment above, this assigns +/- 1/n per
        # signed name rather than zeros — preserved as-is; confirm intent.
        self.weights = dirs * (1 / n)
    del self.returns
    del self.quantiles
def bollinger_bands(self, data, window=20, num_std=2):
    """
    Append Bollinger Band columns (MA, STD, Upper_BB, Lower_BB) to `data`
    in place — rolling mean/std of the 'close' column plus/minus num_std
    standard deviations — and return the same DataFrame.
    """
    rolling = data['close'].rolling(window=window)
    data['MA'] = rolling.mean()
    data['STD'] = rolling.std()
    band_width = data['STD'] * num_std
    data['Upper_BB'] = data['MA'] + band_width
    data['Lower_BB'] = data['MA'] - band_width
    return data
def calculate_rsi(self, data, period=20):
    """
    Append an 'rsi' column (exponentially-smoothed RSI of 'close') to
    `data` in place and return the same DataFrame.
    """
    # Daily price changes; the first (NaN) diff is dropped.
    delta = data['close'].diff().dropna()
    ups = delta.where(delta > 0, 0)
    downs = -delta.where(delta < 0, 0)
    # EWMA with com = period-1 approximates Wilder's smoothing.
    avg_up = ups.ewm(com=period - 1, min_periods=period).mean()
    avg_down = downs.ewm(com=period - 1, min_periods=period).mean()
    strength = avg_up / avg_down
    # RSI = 100 - 100 / (1 + RS); NaN until `period` observations exist.
    data['rsi'] = 100 - (100 / (1 + strength))
    return data
def get_all_data(self, tickers, historical=True, training=False, backtesting=True):
    """
    Gets historical data for training and prediction
    Parameters:
    -----------
    tickers : list
        list of tickers to retrieve data
    historical : Bool, default True
        Flag to determine if we are training or backtesting; False if live trading
    training : Bool, default False
        If True, retrieves training data, a 90-day period. If performing predictions,
        False retrieves most recent day of data. For example, if called at 8 A.M., retrieves
        the previous trading days' data.
    backtesting : Bool, default True
        Flag to determine if we are backtesting or training
    Return:
    -------
    self.dat : pd.DataFrame
        DataFrame containing data
    """
    # NOTE(review): with lookback = 50 the training window is 75 bars, not the
    # 90 days the docstring claims — confirm which is intended.
    if historical:
        if backtesting:
            shift_factor = 30 # overshooting and select the maximum
            hist_lookback = 1 + shift_factor
            tiingo_lookback = 12 # in case of weekends?
        elif training:
            hist_lookback = self.lookback + 25
            tiingo_lookback = self.lookback * 1.5
        else:
            raise ValueError("Please train or backtest if using historical = True")
    else:
        shift_factor = 7 # needed so we can calculate lagged data
        hist_lookback = 1 + shift_factor
        tiingo_lookback = 1 + shift_factor # in case of weekends?
    full_data = pd.DataFrame()
    for symbol in tickers:
        # Get Price History
        history = self.History(symbol, hist_lookback)
        history = pd.DataFrame(history)
        # convert the historical data to a pandas DataFrame
        # Label: 1 when the bar closed above its open, else 0.
        history['direction'] = np.where(history['close'] > history['open'], 1, 0)
        # 5-period percent change — the regression target.
        history['return']=history['close'].pct_change(periods=5)
        history = self.bollinger_bands(history)
        history = self.calculate_rsi(history)
        # Add relevant columns
        history['price_diff']=history["open"]-history["MA"]
        history['band_diff_up']=history["open"]-history["Upper_BB"]
        history['band_diff_lower']=history["open"]-history["Lower_BB"]
        # Add Tiingo Data
        # NOTE(review): AddData is called on every invocation for every symbol;
        # confirm repeated subscription is harmless in LEAN.
        data = self.AddData(TiingoNews, symbol).Symbol
        tiingo = self.History(data, int(tiingo_lookback), Resolution.Daily)
        if len(tiingo)!=0 and set(['description','publisheddate']).issubset(tiingo.columns):
            analyzer = SentimentIntensityAnalyzer()
            # VADER sentiment per article description; expand the score dict
            # into columns and keep only the daily-summed compound score.
            tiingo['polarity'] = tiingo['description'].dropna().apply(lambda x: analyzer.polarity_scores(x))
            tiingo = pd.concat([tiingo.drop(['polarity'], axis=1), tiingo['polarity'].apply(pd.Series)], axis=1)
            #tiingo['sentiment'] = tiingo['compound'].apply(lambda x: 'positive' if x >0 else 'neutral' if x==0 else 'negative')
            tiingo = tiingo[[ 'publisheddate', 'compound']]
            tiingo['publisheddate'] = pd.to_datetime(tiingo['publisheddate'],utc=True).dt.date
            tiingo = tiingo.groupby(by=[ 'publisheddate'], as_index=False).sum()
            tiingo.rename(columns={'publisheddate' : 'time'}, inplace=True)
            tiingo.set_index('time',inplace=True)
            history = history.join(tiingo)
        # Add lagged copies (t-1 .. t-4) of every column as features, drop rows
        # made incomplete by shifting, and drop the raw price columns.
        lags = range(1,5)
        history=history.assign(**{
            f'{col} (t-{lag})': history[col].shift(lag)
            for lag in lags
            for col in history
        }).dropna().drop(columns = ['close','high','low','volume'], errors='ignore')
        history['symbol'] = symbol.Value
        full_data=pd.concat([full_data, history])
    self.Log(full_data)
    return full_data
def get_daily_realized_pnl(self):
    """Realized PnL for the current day: change in gross profit minus change in fees since the last end-of-day snapshot."""
    profit_delta = self.Portfolio.TotalProfit - self.yesterday_total_profit
    fee_delta = self.Portfolio.TotalFees - self.yesterday_total_fees
    return profit_delta - fee_delta
def value_at_risk(self, returns, weights, conf_level=0.05, num_days=1):
    """
    Parametric (Gaussian) portfolio value-at-risk, expressed as a return.
    ---------------------------------------------------
    Parameters
    returns : pd.DataFrame
        periodic returns
    conf_level : float
        confidence level. 0.05 by default
    weights : np.array
        portfolio weights
    num_days : int
        length of the period the VaR is calculated over
    """
    covariance = returns.cov()
    mean_vector = returns.mean()
    port_mu = mean_vector.dot(weights)
    port_sigma = np.sqrt(weights.T.dot(covariance).dot(weights))
    # Lower-tail cutoff of the fitted normal distribution.
    one_day_var = norm.ppf(conf_level, port_mu, port_sigma)
    # Scale to the horizon with the square-root-of-time rule.
    return one_day_var * np.sqrt(num_days)
def cvar(self, returns, weights, conf_level=0.05):
    """
    Calculates the portfolio CVaR
    ------------------------------------
    Parameters
    returns : pd.DataFrame
        portfolio returns
    weights : np.array
        portfolio weights
    conf_level : float
        confidence level (forwarded to value_at_risk)
    """
    # Bug fix: the original called the bare name `value_at_risk` — not defined
    # at module scope (NameError at runtime); it is a method of this class.
    # Also forward conf_level instead of silently dropping it.
    VaR = self.value_at_risk(returns, weights, conf_level)
    # NOTE(review): the mean of a (scalar) VaR is not the textbook
    # expected-shortfall definition of CVaR — confirm this approximation is
    # intended.
    return VaR.mean()