| Overall Statistics | |
| --- | --- |
| Total Orders | 0 |
| Average Win | 0% |
| Average Loss | 0% |
| Compounding Annual Return | 0% |
| Drawdown | 0% |
| Expectancy | 0 |
| Start Equity | 100000 |
| End Equity | 100000 |
| Net Profit | 0% |
| Sharpe Ratio | 0 |
| Sortino Ratio | 0 |
| Probabilistic Sharpe Ratio | 0% |
| Loss Rate | 0% |
| Win Rate | 0% |
| Profit-Loss Ratio | 0 |
| Alpha | 0 |
| Beta | 0 |
| Annual Standard Deviation | 0 |
| Annual Variance | 0 |
| Information Ratio | -0.616 |
| Tracking Error | 0.149 |
| Treynor Ratio | 0 |
| Total Fees | $0.00 |
| Estimated Strategy Capacity | $0 |
| Lowest Capacity Asset | |
| Portfolio Turnover | 0% |
from QuantConnect.DataSource import *
from AlgorithmImports import *
from etf_universe import ETFUniverse
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px
import statsmodels.api as sm
from scipy.stats.mstats import winsorize
from sklearn.linear_model import LinearRegression, LogisticRegression, ElasticNetCV, LassoCV, RidgeCV
from sklearn.metrics import mean_squared_error
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.decomposition import PCA
from math import sqrt
from IPython.display import display, HTML, IFrame
from scipy.stats import rankdata, ttest_ind, jarque_bera, mannwhitneyu, pearsonr, spearmanr, chisquare, shapiro
import numbers
from pytz import timezone
from factor import *
class DatasetAnalyzer:
"""
A class to analyze datasets listed on the QC data market.
"""
def __init__(self, dataset, dataset_tickers, universe, factors, sparse_data,
dataset_start_date, in_sample_end_date, out_of_sample_end_date,
label_function=None, return_prediction_period=1, marker_size=3):
"""
Retrieves historical price data for the universe securities and historical data for the factors under analysis.
The first 5 rows of the raw dataset history DataFrame are displayed, then the value functions for each of the factors
are applied.
Input:
- dataset
Class type of the dataset to analyze
- dataset_tickers
Either a ManualUniverse or ETFUniverse object that matches the `universe` (if the dataset is linked),
otherwise a list of tickers of dataset links (for example: ['REG'] for Regalytics)
- universe
A ManualUniverse or ETFUniverse object containing list of tickers to use when analyzing the relationship
between the dataset and security returns.
- factors
A list of Factors to analyze within the dataset
- sparse_data
Boolean to represent if the `dataset` is sparse.
- dataset_start_date
Start date of the dataset. Retrievable from the dataset listing.
- in_sample_end_date
Date to mark the end of the in-sample period.
- out_of_sample_end_date
Date to mark the end of the out-of-sample period.
- label_function
Optional function that builds the target labels from the raw price history; when None, forward returns over `return_prediction_period` days are used
- return_prediction_period
Number of days positions would be held for (the target variable)
- marker_size
Size of markers in plots
"""
self.qb = QuantBook()
self.dataset = dataset
self.factors = factors
self.sparse_data = sparse_data
self.dataset_start_date = dataset_start_date
self.in_sample_end_date = in_sample_end_date
self.out_of_sample_end_date = out_of_sample_end_date
self.return_prediction_period = return_prediction_period
self.linked_dataset = universe == dataset_tickers
self.transformed_dataset_history = pd.DataFrame()
self.hypothesis_test_p_value = {}
self.marker_size = marker_size
# Subscribe to universe of securities
symbols, self.security_timezone = universe.get_symbols(self.qb)
# Request historical price data
self.raw_history = self.qb.History(symbols, self.dataset_start_date, self.out_of_sample_end_date)
self.price_history = self.raw_history.open.unstack(level=0)
self.price_history = self.price_history.groupby(self.price_history.index.date).last()
self.price_history.index = [datetime.combine(date, datetime.min.time()) for date in self.price_history.index]
# Only consider securities with historical data
self.security_symbols = [x for x in symbols if x in self.price_history.columns]
self.dataset_symbols = []
# Subscribe to dataset
links = self.security_symbols if self.linked_dataset else dataset_tickers
for link in links:
dataset_subscription = self.qb.AddData(dataset, link)
self.dataset_timezone = dataset_subscription.Exchange.TimeZone
self.dataset_symbols.append(dataset_subscription.Symbol)
# Create labels
if label_function is None:
self.labels = self.price_history.pct_change(self.return_prediction_period).shift(-self.return_prediction_period).iloc[:-self.return_prediction_period]
else:
self.labels = label_function(self.raw_history)
# Calculate historical returns
self.return_history = self.price_history.pct_change(self.return_prediction_period).shift(-self.return_prediction_period).iloc[:-self.return_prediction_period]
# Request historical dataset data
dataset_str = str(self.dataset).split("'")[-2].split(".")[-1]
try:
dataset_hist = []
for symbol in self.dataset_symbols:
dataset_hist.append(self.qb.History(symbol, self.dataset_start_date, self.out_of_sample_end_date))
self.dataset_history = pd.concat(dataset_hist)
except Exception as e:
print(f"You don't have a subscription for {dataset_str}. Add it to your organization on the Pricing page.")
return
if self.dataset_history.empty:
print(f'No historical data was available for the {dataset_str} dataset.')
return
factor_names = [factor.name for factor in factors]
# Reformat DataFrame
self.dataset_history = self._process_nested_dataset(
self.dataset_history[~self.dataset_history.index.duplicated(keep="last")], factor_names)
try:
self.dataset_history = self.dataset_history.groupby(self.dataset_history.index.date).last()
except:
self.dataset_history = self.dataset_history.groupby(pd.to_datetime(self.dataset_history.index, utc=True).date).last()
self.dataset_history.index = [datetime.combine(date, datetime.min.time()) for date in self.dataset_history.index]
# Show the raw data
display(self.dataset_history.dropna(how='all').head().dropna(axis=1, how='all'))
self.transformed_dataset_history = pd.DataFrame()
# Transform raw factor values using the value function defined by the client
for factor in factors:
if factor.value_function is None:
df = self._default_value_function(self.dataset_history[factor.name], self.dataset_history.index, self.dataset_timezone, self.security_timezone)
else:
df = factor.value_function(self.dataset_history[factor.name], self.dataset_history.index, self.dataset_timezone, self.security_timezone)
df.columns = pd.MultiIndex.from_tuples([(factor.name, col) for col in df.columns])
self.transformed_dataset_history = pd.concat([self.transformed_dataset_history, df], axis=1)
# Timestamp of adjusted factor values should be when the factor value was acted upon
# factor_value_raw_timestamp => target_return_period
# -Sunday 12am => Monday open to Tuesday open
# -Monday 12am => Monday open to Tuesday open (timestamp: Tuesday 12am)
# -Tuesday 12am => Tuesday open to Wednesday open (timestamp: Wednesday 12am)
# -Wednesday 12am => Wednesday open to Thursday open (timestamp: Thursday 12am)
# -Thursday 12am => Thursday open to Friday open (timestamp: Friday 12am)
# -Friday 12am => Friday open to Monday open (timestamp: Saturday 12am)
# -Saturday 12am => Monday open to Tuesday open (timestamp: Tuesday 12am)
# In this ^ case, adjusted factor timestamps should match the timestamp of the bar where the trade was opened
## Remove securities that don't have data for the dataset
indices_to_remove = []
for i, dataset_symbol in enumerate(self.dataset_symbols):
if not all([dataset_symbol in self.transformed_dataset_history[factor.name].columns for factor in factors]):
indices_to_remove.append(i)
for factor in factors:
if dataset_symbol in self.transformed_dataset_history[factor.name].columns:
self.transformed_dataset_history.drop(pd.MultiIndex.from_tuples([(factor.name, dataset_symbol)]), axis=1, inplace=True)
for index in indices_to_remove[::-1]:
del self.dataset_symbols[index]
del self.security_symbols[index]
self.price_history = self.price_history.loc[:, self.security_symbols]
self.return_sub_history = self.return_history.reindex(self.transformed_dataset_history.index)
# Use a scatter plot if the factor values are sparse, otherwise use a line chart
self.dataset_plotting_mode = 'markers' if self.sparse_data else 'lines'
# Align all timestamps
self.price_history = self.price_history.reindex(self.return_sub_history.index)
def _process_nested_dataset(self, df, factor_names):
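"""
A helper method that flattens a history DataFrame whose cells hold nested data objects into
one numeric column per (factor, symbol) pair. If every cell is already numeric, the requested
factor columns are simply unstacked by symbol.
"""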
def _is_dataframe_all_numbers(df):
try:
df.apply(pd.to_numeric)
return True
except:
return False
if not _is_dataframe_all_numbers(df):
df = df.unstack(level=0).swaplevel(axis=1)
symbols = [col[0] for col in df.columns]
new_df = pd.DataFrame()
for factor in factor_names:
row = {}
for j in range(len(symbols)):
row[symbols[j]] = []
for i in range(df.shape[0]):
if isinstance(df.iloc[i, j], List):
df.iloc[i, j] = df.iloc[i, j][-1]
try:
if pd.isnull(df.iloc[i, j]):
row[symbols[j]].append(df.iloc[i, j])
else:
row[symbols[j]].append(eval(f"df.iloc[i, j].{factor}"))
except:
row[symbols[j]].append(np.nan)
new_row = pd.DataFrame(row)
new_df = pd.concat([new_df, new_row], axis=1)
new_df.columns = pd.MultiIndex.from_tuples([(symbol, factor) for factor in factor_names for symbol in symbols])
new_df.index = df.index
return new_df.swaplevel(axis=1)
return df[factor_names].unstack(level=0)
def _default_value_function(self, df, index, dataset_timezone, security_timezone):
"""
This function transforms the dataset's raw data into a numerical value.
The timestamps of the factor values returned from this method should match
the timestamps of the bar where the trade was opened in response to the factor value.
Input:
- df
DataFrame of factor values for each security in the universe
- index
The timestamps of when the security traded
- dataset_timezone
Timezone of the dataset
- security_timezone
Timezone of the security
Returns a DataFrame of adjusted numerical factor values.
"""
# Match timezones
if dataset_timezone != security_timezone:
match_timezones_func = lambda time: time.replace(tzinfo=timezone(str(dataset_timezone))).astimezone(timezone(str(security_timezone)))
df.index = df.index.map(match_timezones_func)
result_df = pd.DataFrame(columns=df.columns)
# Normalize the index to tz-naive timestamps (so we can compare the indices in the snippet that comes after)
if isinstance(df.index, pd.Index):
new_index = pd.to_datetime(df.index, utc=True).tz_convert(None)
df.index = new_index
# Move dataset index forward by 1 day (since we open our trade on the day after we receive the factor value)
df.index = df.index + timedelta(1)
# Move forward index of other_dataset_history if its index elements don't align with `this_dataset_index`
for i in df.index:
adjusted_index_options = index[index >= i]
if len(adjusted_index_options) == 0:
continue
adjusted_index = adjusted_index_options[0]
row = df.loc[i]
row.name = adjusted_index
result_df.loc[adjusted_index] = row
# Drop duplicate indices
result_df = result_df[~result_df.index.duplicated(keep='last')]
# Align factor values with this_dataset_index
result_df = result_df.reindex(index)
# Drop rows that have only NaN values
result_df = result_df.dropna(axis=0, how='all')
return result_df
def plot_data_shape(self, num_securities=10, y_axis_title='', subplot_title_extension=''):
"""
Displays a time series plot for each factor using the values returned from the value function.
For linked datasets, the first `num_securities` are selected to have their factor values plotted.
Input:
- num_securities
Number of securities to plot factor values for. (Used for linked datasets)
- y_axis_title
Y axis title of each subplot
- subplot_title_extension
A string to add onto the end of the factor names to make them more understandable
"""
# Create Plotly figure
titles = []
for factor in self.factors:
title = factor.printable_name
if subplot_title_extension != '':
title += f' {subplot_title_extension}'
titles.append(title)
fig = make_subplots(rows=len(self.factors), cols=1, shared_xaxes=False, vertical_spacing=0.15, subplot_titles=tuple(titles))
current_row = 1
for factor in self.factors:
dataset_symbols = []
if self.linked_dataset:
for equity_symbol_index, security_symbol in enumerate(self.security_symbols[:num_securities]):
dataset_symbols.append(self.dataset_symbols[equity_symbol_index])
else:
dataset_symbols.append(self.dataset_symbols[0])
for dataset_symbol in dataset_symbols:
factor_values = self.transformed_dataset_history[factor.name][dataset_symbol]
fig.append_trace(go.Scatter(x=factor_values.index, y = factor_values, mode = self.dataset_plotting_mode, marker=dict(size=self.marker_size), name=str(dataset_symbol)),
row=current_row, col=1)
current_row += 1
fig.update_layout(title_text=f"Factor Values Over Time",
margin=dict(l=0, r=0, b=0),
showlegend=False,
height = (current_row-1) * 300)
for i, factor in enumerate(self.factors):
fig['layout'][f'yaxis{i+1}']['title']= y_axis_title
fig['layout'][f'xaxis{i+1}']['range'] = [self.transformed_dataset_history.index[0], self.transformed_dataset_history.index[-1]]
# Show the plot
fig.show()
def _convert_to_legend_format(self, value):
"""
A helper method to write well-formatted values in statistical plot legends and annotations.
Input:
- value
The value to be rounded or put into scientific notation
"""
rounded = round(value, 5)
if rounded != 0:
return str(rounded)
return "{:e}".format(value) # Scientific notation
def measure_significance(self):
"""
Displays R-squared, adjusted R-squared, t-test p-values, and F-test p-values.
Each statistic is selected when appropriate, depending on the universe size and the number of factors.
"""
adj_r_squares = []
f_pvalues = []
r_squares_by_factor = {factor: [] for factor in self.factors}
t_pvalues_by_factor = {factor: [] for factor in self.factors}
# For each security, gather the regression results
for equity_symbol_index, security_symbol in enumerate(self.security_symbols):
dataset_symbol = self.dataset_symbols[equity_symbol_index if self.linked_dataset else 0]
# x-axis = factor values
factor_values = self.transformed_dataset_history.iloc[:, self.transformed_dataset_history.columns.get_level_values(1)==dataset_symbol]
# y-axis = returns
labels = self.labels[security_symbol]
# Align time stamps (in case there are NaN values)
intersect_index = sorted(list(set(labels.dropna().index).intersection(set(factor_values.dropna().index))))
factor_values = factor_values.reindex(intersect_index)
labels = labels.reindex(intersect_index)
# Fit the model using each factor individually
for i, factor in enumerate(self.factors):
if factor_values[factor_values.columns[i]].empty:
r_squares_by_factor[factor].append(0)
t_pvalues_by_factor[factor].append(1)
continue
x = sm.add_constant(factor_values[factor_values.columns[i]].values, has_constant='add')
temp_df = pd.DataFrame(x)
temp_df["labels"] = labels.values.reshape(-1, 1)
temp_df = temp_df.replace([np.inf, -np.inf], np.nan).dropna()
if temp_df.empty:
r_squares_by_factor[factor].append(0)
t_pvalues_by_factor[factor].append(1)
continue
model = sm.OLS(temp_df.iloc[:, -1], temp_df.iloc[:, :-1])
results = model.fit()
r_squares_by_factor[factor].append(results.rsquared)
t_pvalues_by_factor[factor].append(results.pvalues[1])
if len(self.factors) > 1:
# Fit the model using all the factors
if factor_values.empty:
adj_r_squares.append(0)
f_pvalues.append(1)
continue
x = sm.add_constant(factor_values.values, has_constant='add')
temp_df = pd.DataFrame(x)
temp_df["labels"] = labels.values.reshape(-1, 1)
temp_df = temp_df.replace([np.inf, -np.inf], np.nan).dropna()
if temp_df.empty:
# Append scalars so they compare cleanly with rsquared_adj / f_pvalue below
adj_r_squares.append(0)
f_pvalues.append(1)
continue
model = sm.OLS(temp_df.iloc[:, -1], temp_df.iloc[:, :-1])
results = model.fit()
adj_r_squares.append(results.rsquared_adj)
f_pvalues.append(results.f_pvalue)
max_x = 0 if len(self.factors) == 1 else max(adj_r_squares)
for r_squares in r_squares_by_factor.values():
max_x = max(max_x, max(r_squares))
if len(self.factors) > 1:
# Plot the results
fig = go.Figure()
fig.add_trace(go.Scatter(x=adj_r_squares, y=f_pvalues, mode='markers', showlegend=False, marker=dict(symbol='circle', opacity=0.7, color='white', size=8, line=dict(width=1), )))
fig.add_trace(go.Histogram2d(x=adj_r_squares, y=f_pvalues, colorscale='YlGnBu', xbins=dict(start=0., end=1., size=0.05), autobinx=False, ybins=dict(start=0., end=1., size=0.05)))
fig.update_layout(
title=f'All Factors',
margin=dict(l=0, r=0, b=0),
xaxis = dict(range = [-0.05, max_x + 0.05], title="Adjusted R<sup>2</sup>"),
yaxis = dict(range = [-0.05, 1.05], title='F-Statistic P-Value'),
height=500,
width=500,
hovermode='closest',
)
fig.show()
for factor in self.factors:
fig = go.Figure()
r_squares = r_squares_by_factor[factor]
t_pvalues = t_pvalues_by_factor[factor]
fig.add_trace(go.Scatter(x=r_squares, y=t_pvalues, mode='markers', showlegend=False, marker=dict(symbol='circle', opacity=0.7, color='white', size=8, line=dict(width=1), )))
fig.add_trace(go.Histogram2d(x=r_squares, y=t_pvalues, colorscale='YlGnBu', xbins=dict(start=0., end=1., size=0.05), autobinx=False, ybins=dict(start=0., end=1., size=0.05)))
fig.update_layout(
title=f'{factor.printable_name} Factor',
margin=dict(l=0, r=0, b=0),
xaxis = dict(range = [-0.05, max_x + 0.05], title="R<sup>2</sup>"),
yaxis = dict(range = [-0.05, 1.05], title='T-Test P-Value'),
height=500,
width=500,
hovermode='closest',
)
fig.show()
def calculate_statistics(self, winsorize_limits=(0.01, 0.01)):
"""
Displays a DataFrame of the following statistics: mean, std dev, skewness, kurtosis, & normality test P-value.
Input:
- winsorize_limits
Limits to exclude the top x% and bottom y% of outliers from the calculations.
"""
statistic_df = pd.DataFrame()
for factor in self.factors:
# Gather factor values for all the securities
all_factor_values = pd.Series(self.transformed_dataset_history[factor.name].values.flatten()).dropna()
# Remove outliers
all_factor_values = pd.Series(winsorize(all_factor_values, limits=winsorize_limits))
statistic_df.loc['Mean', factor.printable_name] = all_factor_values.mean()
statistic_df.loc['Standard deviation', factor.printable_name] = all_factor_values.std()
statistic_df.loc['Skewness', factor.printable_name] = all_factor_values.skew()
statistic_df.loc['Kurtosis', factor.printable_name] = all_factor_values.kurt()
if factor.data_type == 'continuous':
if len(all_factor_values) > 2000:
p_value = jarque_bera(all_factor_values).pvalue
else:
p_value = shapiro(all_factor_values).pvalue
else:
p_value = 'N/A'
statistic_df.loc['Normality test P-value', factor.printable_name] = p_value
statistic_df.index.names = ['Universe Statistic']
display(statistic_df)
def _update_correlation_results(self, results, df, factor, dataset_class_str, other_dataset_factor):
"""
A helper method to update correlation calculation results.
Input:
- results
The DataFrame containing all of the correlation results
- df
A DataFrame containing data from two datasets that needs the correlation calculated
- factor
The first factor we're calculating the correlation on
- dataset_class_str
The name of the dataset the second factor is from
- other_dataset_factor
The second factor we're calculating the correlation on
Returns the correlation results DataFrame
"""
# Calculate correlation
corr = df['this_dataset'].corr(df['other_dataset'])
hypothesis_test_p_value = self._get_p_value(df['this_dataset'], df['other_dataset'], factor.data_type, other_dataset_factor.data_type)
corr = '{:,.4f}'.format(corr)
hypothesis_test_p_value = '{:,.4f}'.format(hypothesis_test_p_value)
results.loc[f"{dataset_class_str}.{other_dataset_factor.factor_name}", f'{factor.printable_name}'] = f"{corr} ({hypothesis_test_p_value})"
return results
def calculate_factor_correlations(self, other_dataset_factor_by_class):
"""
Displays DataFrames that show
- The correlation between each pair of factors in the `dataset`
- The correlation between each factor in the `dataset` and the factors of the datasets
in the `other_dataset_factor_by_class` dictionary.
Input:
- other_dataset_factor_by_class
A dictionary (key: dataset class, value: a list of OtherDatasetFactor objects) that contains
the factors of other datasets we want to calculate the correlation with
"""
results = pd.DataFrame()
# Factor correlation within the main dataset
for factor_1 in self.factors:
for factor_2 in self.factors:
df = pd.DataFrame({'this_dataset': self.transformed_dataset_history[factor_1.name].values.flatten('F'),
'other_dataset': self.transformed_dataset_history[factor_2.name].values.flatten('F')}).dropna(axis=0, how='any')
# Calculate correlation
corr = df['this_dataset'].corr(df['other_dataset'])
hypothesis_test_p_value = self._get_p_value(df['this_dataset'], df['other_dataset'], factor_1.data_type, factor_2.data_type)
corr = '{:,.4f}'.format(corr)
hypothesis_test_p_value = '{:,.4f}'.format(hypothesis_test_p_value)
results.loc[f"{factor_2.printable_name}", f'{factor_1.printable_name}'] = f"{corr} ({hypothesis_test_p_value})"
# Calculate correlation of the factors with factors from other datasets
# For linked securities, select the columns of the securities that are present in both datasets
# For unlinked securities, duplicate the columns
# To get one correlation value, move all of the columns in the DataFrames into one column before using `corr`
for dataset_class in other_dataset_factor_by_class:
dataset_class_str = str(dataset_class).split("'")[-2].split(".")[-1]
if dataset_class not in other_dataset_factor_by_class:
print(f"{dataset_class_str} was not provided in the `other_dataset_factor_by_class` dictionary. To use {dataset_class_str}, add it to the `other_dataset_factor_by_class` dictionary.")
continue
# Select one of the factors from the other datasets
for other_dataset_factor in other_dataset_factor_by_class[dataset_class]:
# Get start and end dates of correlation period
start_date = self.transformed_dataset_history.index[0]
end_date = self.transformed_dataset_history.index[-1]
if other_dataset_factor.link == SecurityType.Equity:
# Gather `other_dataset_factor` data for each security in the universe
other_dataset_symbols = []
other_dataset_timezone = None
for i, symbol in enumerate(self.security_symbols):
# Subscribe to the other dataset factor
other_dataset_subscription = self.qb.AddData(dataset_class, symbol)
other_dataset_timezone = other_dataset_subscription.Exchange.TimeZone
other_dataset_symbol = other_dataset_subscription.Symbol
other_dataset_symbols.append(other_dataset_symbol)
# Get historical data for the `other_dataset_factor`
try:
other_dataset_history = self.qb.History(other_dataset_symbols, start_date, end_date)
except:
print(f'You don\'t have a subscription to the {dataset_class_str} dataset. Add it to your organization on the Pricing page.')
continue
if other_dataset_history.empty:
print(f'No historical data was available for the {dataset_class_str} dataset.')
continue
other_dataset_history = other_dataset_history[other_dataset_factor.factor_name]
other_dataset_history = other_dataset_history[~other_dataset_history.index.duplicated(keep='last')].unstack(level=0)
for factor in self.factors:
# Get this dataset history
this_dataset_history = self.transformed_dataset_history[factor.name]
# Apply value function
other_dataset_history = other_dataset_factor.value_function(other_dataset_history, this_dataset_history.index, other_dataset_timezone, self.dataset_timezone)
# Align indices (in case timestamps were removed by the value function)
intersect_index = sorted(list(set(this_dataset_history.index).intersection(set(other_dataset_history.index))))
aligned_this_dataset_history = this_dataset_history.loc[intersect_index]
aligned_other_dataset_history = other_dataset_history.loc[intersect_index]
if self.linked_dataset:
# Get a lists of symbols for securities that are in both datasets
selected_this_dataset_symbols = []
selected_other_dataset_symbols = []
for i, other_dataset_symbol in enumerate(other_dataset_symbols):
if other_dataset_symbol not in aligned_other_dataset_history.columns:
continue
if self.dataset_symbols[i] not in aligned_this_dataset_history.columns:
continue
selected_this_dataset_symbols.append(self.dataset_symbols[i])
selected_other_dataset_symbols.append(other_dataset_symbol)
# Make a DataFrame of both histories so we can align the timestamps and drop rows with nan values
data = {
'this_dataset': aligned_this_dataset_history[selected_this_dataset_symbols].values.flatten('F'),
'other_dataset': aligned_other_dataset_history[selected_other_dataset_symbols].values.flatten('F')
}
df = pd.DataFrame(data).dropna(axis=0, how='any')
else:
# Get a lists of symbols for securities that are in the 'other' dataset
selected_other_dataset_symbols = []
for i, other_dataset_symbol in enumerate(other_dataset_symbols):
if other_dataset_symbol not in aligned_other_dataset_history.columns:
continue
selected_other_dataset_symbols.append(other_dataset_symbol)
for i in range(1, len(selected_other_dataset_symbols)):
aligned_this_dataset_history[f"column_{i}"] = aligned_this_dataset_history[aligned_this_dataset_history.columns[0]]
# Make a DataFrame of both histories so we can align the timestamps and drop rows with nan values
df = pd.DataFrame({'this_dataset': aligned_this_dataset_history.values.flatten('F'),
'other_dataset': aligned_other_dataset_history[selected_other_dataset_symbols].values.flatten('F')}).dropna(axis=0, how='any')
results = self._update_correlation_results(results, df, factor, dataset_class_str, other_dataset_factor)
else: # In this case, the dataset isn't linked to a security
# Subscribe to the other dataset
other_dataset_subscription = self.qb.AddData(dataset_class, other_dataset_factor.link)
other_dataset_symbol = other_dataset_subscription.Symbol
other_dataset_timezone = other_dataset_subscription.Exchange.TimeZone
# Gather historical data of the other dataset
try:
other_dataset_history = self.qb.History(other_dataset_symbol, start_date, end_date)
except:
print(f'You don\'t have a subscription to the {dataset_class_str} dataset. Add it to your organization on the Pricing page.')
continue
if other_dataset_history.empty:
print(f'No historical data was available for the {dataset_class_str} dataset.')
continue
other_dataset_history = other_dataset_history.loc[other_dataset_symbol][[other_dataset_factor.factor_name]]
for factor in self.factors:
# Get this dataset history
this_dataset_history = self.transformed_dataset_history[factor.name]
# Apply value function
other_dataset_history = other_dataset_factor.value_function(other_dataset_history, this_dataset_history.index, other_dataset_timezone, self.dataset_timezone)
# Align indices (in case timestamps were removed by the value function)
intersect_index = sorted(list(set(this_dataset_history.index).intersection(set(other_dataset_history.index))))
aligned_this_dataset_history = this_dataset_history.reindex(intersect_index)
aligned_other_dataset_history = other_dataset_history.reindex(intersect_index)
# Make duplicate columns of `aligned_other_dataset_history` so the number of columns matches `aligned_this_dataset_history`
for i in range(1, len(aligned_this_dataset_history.columns)):
aligned_other_dataset_history[f"column_{i}"] = aligned_other_dataset_history[aligned_other_dataset_history.columns[0]]
# Make a DataFrame of both histories so we can align the timestamps and drop rows with NaN values
df = pd.DataFrame({'this_dataset': aligned_this_dataset_history.values.flatten('F'),
'other_dataset': aligned_other_dataset_history.values.flatten('F')}).dropna(axis=0, how='any')
results = self._update_correlation_results(results, df, factor, dataset_class_str, other_dataset_factor)
results.columns = pd.MultiIndex.from_tuples([('Factor Correlation Coefficient (P-Value)', col) for col in results.columns])
display(results)
def _get_p_value(self, dataset_a, dataset_b, data_a_type, data_b_type, significance=0.05):
"""
Gets the p-value of two lists, considering the "type" of each list (continuous or discrete).
Input:
- dataset_a
First list of values.
- dataset_b
Second list of values.
- data_a_type
Type of `dataset_a` ('continuous' or 'discrete').
- data_b_type
Type of `dataset_b` ('continuous' or 'discrete').
- significance
Level of significance to use for the normality test.
Returns the p-value that results after applying the correct statistical test.
"""
np.seterr(divide='ignore')
if (isinstance(dataset_a, pd.Series) or isinstance(dataset_a, pd.DataFrame)) \
and (isinstance(dataset_b, pd.Series) or isinstance(dataset_b, pd.DataFrame)):
merge = pd.concat([dataset_a, dataset_b], axis=1).replace([np.inf, -np.inf], np.nan).dropna(axis=0)
dataset_a = merge.iloc[:, 0]
dataset_b = merge.iloc[:, 1]
else:
merge = np.concatenate([dataset_a.reshape(-1, 1), dataset_b.reshape(-1, 1)], axis=1)
merge = merge[np.isfinite(merge).all(axis=1), :]
dataset_a = merge[:, 0]
dataset_b = merge[:, 1]
# The Jarque-Bera null hypothesis is that the data are normally distributed
if jarque_bera(dataset_b).pvalue >= significance: # Normality not rejected
return pearsonr(dataset_a, dataset_b)[1] # Null hypothesis: there is no linear relationship between the datasets
return spearmanr(dataset_a, dataset_b)[1] # Null hypothesis: the two sets of data are uncorrelated
def calculate_factor_importance(self, standardize=True):
"""
Displays a box plot showing the relative importance of each factor in determining future returns for
the securities in the universe, and a histogram showing the R-squared score of each regularized model.
Input:
- standardize
A boolean to represent if the data should be standardized before fitting the regularized models
"""
if len(self.factors) == 1:
print("Factor importance analysis is only available when analyzing multiple factors.")
return
models = [LassoCV, RidgeCV, ElasticNetCV]
titles = [str(model.__name__) for model in models]
fig = make_subplots(rows=len(models), cols=1, subplot_titles=tuple(titles), vertical_spacing=0.1, shared_yaxes='all')
current_row = 1
results_by_model = {}
scores_by_model = {}
# Generate factor importance plots
for model in models:
results_by_model[model] = np.ndarray(shape=(0,len(self.factors)))
scores_by_model[model] = pd.DataFrame(columns=["Score"])
for symbol_index, security_symbol in enumerate(self.security_symbols):
dataset_symbol = self.dataset_symbols[symbol_index if self.linked_dataset else 0]
factor_importance_pct, factor_names, model_score = self._get_factor_importance_pct(dataset_symbol, security_symbol, model, standardize)
if factor_importance_pct is None:
continue
# Append row to results ndarray
results_by_model[model] = np.vstack([results_by_model[model], factor_importance_pct])
scores_by_model[model].loc[security_symbol] = np.array(model_score)
for i, factor in enumerate(self.factors):
fig.append_trace(go.Box(y=results_by_model[model][:, i], name=factor.printable_name, line_width=6, showlegend=False), row=current_row, col=1)
current_row += 1
# Update layout
for i in range(1, len(models)+1):
fig['layout'][f'yaxis{i}']['title']='Importance (%)'
fig.update_layout(title_text=f"Distribution of Explanatory Power of Each Factor for Each Security in the Universe",
yaxis_range=[0,1],
showlegend=False,
margin=dict(l=0, r=0, b=0),
height = (current_row-1) * 250)
fig.show()
# Generate the accuracy (variance explained) score plot
fig = go.Figure()
annotation_text = ""
for i, model in enumerate(models):
fig.add_trace(go.Histogram(
x=scores_by_model[model].values.flatten(),
name=str(model.__name__)))
scores = scores_by_model[model].values
minimum = self._convert_to_legend_format(scores.min())
maximum = self._convert_to_legend_format(scores.max())
mean = self._convert_to_legend_format(scores.mean())
std = self._convert_to_legend_format(scores.std())
if i > 0:
annotation_text += "<br>"
annotation_text += f"{str(model.__name__)}<br>-Minimum: {minimum}<br>-Maximum: {maximum}<br>-Mean: {mean}<br>-Standard deviation: {std}<br>"
fig.update_layout(title=f"Model Accuracy<br><span style='font-size: 12px'>Distribution of R<sup>2</sup> Values From Applying Each Model to Each Security</span>",
legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01, bgcolor='rgba(255,255,255,0.5)'),
margin=dict(l=0, r=0, b=0),
xaxis_title="R<sup>2</sup>",
yaxis_title='Count',
height = 500,
bargap=0.1,
annotations=[
go.layout.Annotation(
text=annotation_text,
align='left',
showarrow=False,
xref='paper',
yref='paper',
x=1,
y=1,
bordercolor='black',
borderwidth=1,
bgcolor='white',
opacity=0.75
)]
)
fig.show()
def _get_factor_importance_pct(self, dataset_symbol, security_symbol, model, standardize):
"""
A helper method to calculate how much influence each factor has on the target values
Input:
- dataset_symbol
The symbol of the dataset to use
- security_symbol
The symbol of the security used to fit the model
- model
The regression model to use
- standardize
A boolean to represent if the data should be standardized before fitting the model
Returns the percentage of influence each factor has on target values, the associated factor names, and the accuracy score of the model.
"""
data = pd.DataFrame()
for factor in self.factors:
data[factor.name] = self.transformed_dataset_history[factor.name][dataset_symbol]
# Drop rows with NaN values
data = data.dropna(axis=0, how='any')
factor_names = data.columns
# Standardize data
if standardize:
if 0 in data.std().values:
return None, None, None
data = (data - data.mean()) / data.std()
# Drop columns that are just NaNs
data.dropna(axis=1, how='all', inplace=True)
if data.shape[1] <= 1:
return None, None, None
return_ = self.return_history[security_symbol].reindex(data.index)
data = pd.concat([data, return_], axis=1).dropna()
model_ = model()
model_.fit(data.iloc[:, :-1], data.iloc[:, -1])
coef = model_.coef_
if np.sum(abs(coef)) == 0:
factor_importance_pct = abs(coef)
else:
factor_importance_pct = abs(coef)/np.sum(abs(coef))
model_score = model_.score(data.iloc[:, :-1], data.iloc[:, -1])
return factor_importance_pct, factor_names, model_score
def run_ml_models(self, regression_models, classifier_models, negative_return_label=-1, positive_return_label=1):
"""
Trains machine learning models to predict the magnitude/direction of the next period's return given the factor values of the current day.
Input:
- regression_models
List of scikit-learn regression model instances to use.
- classifier_models
List of scikit-learn classification model instances to use.
- negative_return_label
Label to use when there was a negative daily return
(-1 means the model will take 100% short exposure when it predicts a down day)
- positive_return_label
Label to use when there was a positive daily return
(1 means the model will take 100% long exposure when it predicts an up day)
Displays the results of the models, including equity curves, accuracy, and exposure.
"""
all_returns_by_model = {}
all_predictions_by_model = {}
scores_by_model = {}
computed_symbols_by_model = {}
models = regression_models + classifier_models
for model_idx, model in enumerate(models):
is_regression_model = model_idx < len(regression_models)
scores_by_model[model] = {'in-sample': np.array([]), 'out-of-sample': np.array([])}
computed_symbols_by_model[model] = []
all_returns_by_model[model] = pd.DataFrame()
all_predictions_by_model[model] = pd.DataFrame()
for equity_symbol_index, security_symbol in enumerate(self.security_symbols):
# Gather factor values
dataset_symbol = self.dataset_symbols[equity_symbol_index if self.linked_dataset else 0]
factor_values = pd.DataFrame()
for factor in self.factors:
factor_values[factor.name] = self.transformed_dataset_history[factor.name][dataset_symbol]
# Gather labels
label = self.labels[security_symbol].copy()
if not is_regression_model:
label[label <= 0] = negative_return_label
label[label > 0] = positive_return_label
# Align time stamps (in case there are NaN values)
intersect_index = sorted(list(set(label.dropna().index).intersection(set(factor_values.dropna().index))))
factor_values = factor_values.reindex(intersect_index)
label = label.reindex(intersect_index)
x = factor_values.loc[:self.in_sample_end_date.date()]
if x.empty: continue
y = label.loc[label.index <= self.in_sample_end_date]
if y.empty: continue
# Fit model to in-sample data
model.fit(x, y)
# Run model on in-sample data
in_sample_predictions = pd.Series(model.predict(x), index=y.index)
if is_regression_model:
# Convert to binary predictions and labels
in_sample_predictions.loc[in_sample_predictions <= 0] = negative_return_label
in_sample_predictions.loc[in_sample_predictions > 0] = positive_return_label
y.loc[y <= 0] = negative_return_label
y.loc[y > 0] = positive_return_label
in_sample_score = (in_sample_predictions == y).mean()
else:
in_sample_score = model.score(x, y)
# Gather out-of-sample-data
x = factor_values.loc[factor_values.index > self.in_sample_end_date]
if x.empty: continue
y = label.loc[label.index > self.in_sample_end_date]
if y.empty: continue
# Run model on out of sample data
out_of_sample_predictions = pd.Series(model.predict(x), index=y.index)
out_of_sample_predictions.loc[out_of_sample_predictions <= 0] = negative_return_label
out_of_sample_predictions.loc[out_of_sample_predictions > 0] = positive_return_label
if is_regression_model:
y.loc[y <= 0] = negative_return_label
y.loc[y > 0] = positive_return_label
out_of_sample_score = (out_of_sample_predictions == y).mean()
else:
out_of_sample_score = model.score(x, y)
# Align predictions with security price
security_price_history = self.price_history[security_symbol].dropna()
in_sample_price_index = security_price_history[security_price_history.index <= self.in_sample_end_date].index
out_of_sample_price_index = security_price_history[security_price_history.index > self.in_sample_end_date].index
in_sample_predictions = in_sample_predictions.reindex(in_sample_price_index)
out_of_sample_predictions = out_of_sample_predictions.reindex(out_of_sample_price_index)
# Fill forward predictions
if self.return_prediction_period > 1:
in_sample_predictions = in_sample_predictions.fillna(method='ffill', limit=self.return_prediction_period - 1)
out_of_sample_predictions = out_of_sample_predictions.fillna(method='ffill', limit=self.return_prediction_period - 1)
in_and_out_sample_predictions = pd.concat([in_sample_predictions, out_of_sample_predictions])
all_predictions_by_model[model] = pd.concat([all_predictions_by_model[model], in_and_out_sample_predictions], axis=1, sort=False)
# Calculate prediction returns
security_return_history = security_price_history.pct_change(1).shift(-1).dropna()
model_returns = security_return_history * in_and_out_sample_predictions.fillna(0)
model_returns.name = security_symbol
all_returns_by_model[model] = pd.concat([all_returns_by_model[model], model_returns], axis=1, sort=False)
# Save results
scores_by_model[model]['in-sample'] = np.append(scores_by_model[model]['in-sample'], in_sample_score)
scores_by_model[model]['out-of-sample'] = np.append(scores_by_model[model]['out-of-sample'], out_of_sample_score)
computed_symbols_by_model[model].append(security_symbol)
# Plot equity curves
fig = go.Figure()
# -- Benchmark (Universe) returns
universe_returns = (self.price_history[self.security_symbols].pct_change() + 1).mean(axis=1)
universe_returns.iloc[0] = 1
universe_returns = universe_returns.cumprod()
bottom = min(universe_returns)
top = max(universe_returns)
# -- Model returns
for model in models:
model_returns = all_returns_by_model[model].reindex(universe_returns.index)
model_returns = (model_returns + 1).mean(axis=1).cumprod().shift(1)
model_returns.iloc[0] = 1
fig.add_trace(go.Scatter(x=model_returns.index, y = model_returns, name=f"{type(model).__name__} Model"))
bottom = min(bottom, min(model_returns))
top = max(top, max(model_returns))
fig.add_trace(go.Scatter(x=universe_returns.index, y = universe_returns, name='Benchmark'))
fig.add_shape(type="line",
x0=self.in_sample_end_date, y0=bottom, x1=self.in_sample_end_date, y1=top,
line=dict(color="Orange", width=2, dash="dot")
)
fig.add_trace(go.Scatter(
x=[self.in_sample_end_date], y=[top],
text=["In-sample "],
mode="text",
textposition="top left",
showlegend=False
))
fig.add_trace(go.Scatter(
x=[self.in_sample_end_date], y=[top],
text=[" Out-of-sample"],
mode="text",
textposition="top right",
showlegend=False
))
fig.update_layout(title_text=f"Daily Equity Curves<br><span style='font-size: 12px'>Equity Curves of Buying the Universe Constituents and Following the Model Predictions</span>",
margin=dict(l=0, r=0, b=0),
legend=dict(
yanchor="top",
y=0.99,
xanchor="left",
x=0.01
))
fig.update_xaxes(title="Date", range=[universe_returns.index[0], universe_returns.index[-1]])
fig.show()
# Plot daily prediction accuracy
fig = go.Figure()
for model_idx, model in enumerate(models):
all_predictions_by_model[model].columns = list(range(len(computed_symbols_by_model[model])))
all_correct_predictions = pd.DataFrame()
for i, column in enumerate(computed_symbols_by_model[model]):
predictions = all_predictions_by_model[model][i].dropna()[:-1]
returns = self.price_history[column].pct_change(1).shift(-1).reindex(predictions.index)
signed_returns = np.sign(returns)
signed_returns.loc[signed_returns == 0] = -1
correct_predictions = pd.Series(predictions.values == signed_returns.values, index=returns.index, name=column)
all_correct_predictions = pd.concat([all_correct_predictions, correct_predictions], axis=1, sort=False)
accuracy_per_day = all_correct_predictions.mean(axis=1)
fig.add_trace(go.Scatter(x=accuracy_per_day.index, y = accuracy_per_day.values, mode='markers', marker=dict(size=self.marker_size), name=type(model).__name__))
title_text = f"Daily Prediction Accuracy<br><span style='font-size: 12px'>"
title_text += f"The Proportion of Models That Had a Correct Prediction for Each Day"
fig.update_layout(legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01, bgcolor='rgba(255,255,255,0.5)'), height=300)
title_text += "</span>"
fig.update_layout(title_text=title_text, margin=dict(l=0, r=0, b=0), yaxis_range=[-0.02,1.02])
fig.update_xaxes(range=[universe_returns.index[0], universe_returns.index[-1]])
fig.update_layout(legend=dict(yanchor="top",
y=0.99,
xanchor="left",
x=0.01,
bgcolor='rgba(255,255,255,0.5)'))
fig.update_xaxes(title="Date")
fig.add_shape(type="line",
x0=self.in_sample_end_date, y0=0, x1=self.in_sample_end_date, y1=1,
line=dict(color="Orange", width=2, dash="dot")
)
fig.add_trace(go.Scatter(
x=[self.in_sample_end_date], y=[0.9],
text=["In-sample "],
mode="text",
textposition="top left",
showlegend=False
))
fig.add_trace(go.Scatter(
x=[self.in_sample_end_date], y=[0.9],
text=[" Out-of-sample"],
mode="text",
textposition="top right",
showlegend=False
))
fig.show()
## Plot Long-Short Exposure
fig = go.Figure()
for model in models:
all_predictions = all_predictions_by_model[model].reindex(universe_returns.index).mean(axis=1).fillna(0)
fig.add_trace(go.Scatter(x=all_predictions.index, y = all_predictions.values, name=type(model).__name__))
fig.update_layout(title_text=f"Daily Mean Security Exposure<br><span style='font-size: 12px'>Mean Long-Short Exposure Across All of the Universe Constituents When Following the Model Predictions</span>",
margin=dict(l=0, r=0, b=0),
height=300,
yaxis_range=[-1.02,1.02],
legend=dict(
yanchor="top",
y=0.99,
xanchor="left",
x=0.01,
bgcolor='rgba(255,255,255,0.5)'
))
fig.add_shape(type="line",
x0=self.in_sample_end_date, y0=-1, x1=self.in_sample_end_date, y1=1,
line=dict(color="Orange", width=2, dash="dot")
)
fig.add_trace(go.Scatter(
x=[self.in_sample_end_date], y=[0.8],
text=["In-sample "],
mode="text",
textposition="top left",
showlegend=False
))
fig.add_trace(go.Scatter(
x=[self.in_sample_end_date], y=[0.8],
text=[" Out-of-sample"],
mode="text",
textposition="top right",
showlegend=False
))
fig.update_xaxes(title="Date", range=[universe_returns.index[0], universe_returns.index[-1]])
fig.show()
# Display model accuracy scores
for period in ['in-sample', 'out-of-sample']:
capitalized_period = 'In-Sample' if period == 'in-sample' else 'Out-of-Sample'
# Generate the accuracy (variance explained) score plot
fig = go.Figure()
output_text = ""
for i, model in enumerate(models):
scores = scores_by_model[model][period]
fig.add_trace(go.Histogram(x=scores.flatten(),name=type(model).__name__, xbins=dict(
start=0.,
end=1.,
size=0.05),
autobinx=False
))
fig.update_layout(title=f"Daily Model Accuracy {capitalized_period}<br><span style='font-size: 12px'>Distribution of {capitalized_period} Accuracy Values From Applying Each Model to Each Security</span>",
legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01, bgcolor='rgba(255,255,255,0.5)'),
margin=dict(l=0, r=0, b=0),
xaxis_title="Accuracy",
yaxis_title='Count',
height = 250,
bargap=0.1,
)
fig.update_xaxes(range=[0, 1])
fig.show()
# Display model comparison DataFrame
result_df = pd.DataFrame(columns=pd.MultiIndex.from_tuples([('In-Sample Accuracy Distribution','Mean'), ('In-Sample Accuracy Distribution','Standard Deviation'),
('Out-of-Sample Accuracy Distribution','Mean'), ('Out-of-Sample Accuracy Distribution','Standard Deviation')]))
for model in models:
model_name = type(model).__name__
for period in ['in-sample', 'out-of-sample']:
capitalized_period = 'In-Sample' if period == 'in-sample' else 'Out-of-Sample'
score_mean = scores_by_model[model][period].mean()
score_std_dev = scores_by_model[model][period].std()
result_df.loc[model_name, (capitalized_period + ' Accuracy Distribution', 'Mean')] = '{:,.4f}'.format(score_mean)
result_df.loc[model_name, (capitalized_period + ' Accuracy Distribution', 'Standard Deviation')] = '{:,.4f}'.format(score_std_dev)
display(result_df)
def _get_factor_rankings(self):
"""
Gets the factor rankings for each security.
If there is more than one factor, each security is ranked on its mean factor value across all the factors.
"""
if len(self.factors) == 1:
daily_ranks = self.transformed_dataset_history.copy().rank(axis=1, method='first')
daily_ranks.columns = self.security_symbols
else:
daily_ranks = pd.DataFrame()
for equity_symbol_index, security_symbol in enumerate(self.security_symbols):
dataset_symbol = self.dataset_symbols[equity_symbol_index if self.linked_dataset else 0]
daily_ranks[security_symbol] = self.transformed_dataset_history.iloc[:, self.transformed_dataset_history.columns.get_level_values(1)==dataset_symbol].mean(axis=1)
daily_ranks = daily_ranks.rank(axis=1, method='first')
return daily_ranks
def run_ranking_algorithm(self, quantiles):
"""
Breaks the universe into quantiles based on the factor ranking, then produces equity curve plots for each quantile.
A long-short portfolio formed from the first and last quantiles is also presented.
If there is more than one factor, each security is ranked on its mean factor value across all the factors.
Input:
- quantiles
Number of quantiles to break the universe into
"""
# Plot benchmark
fig = go.Figure()
daily_returns = self.price_history.pct_change().shift(-1)
daily_returns.columns = self.security_symbols
benchmark_equity_curve = (daily_returns.mean(axis=1) + 1).cumprod()
fig.add_trace(go.Scatter(x=benchmark_equity_curve.index, y = benchmark_equity_curve.values, name='Benchmark'))
# Plot each quantile
for selected_quantile in range(1, quantiles+1):
daily_ranks = self._get_factor_rankings()
num_ranks_per_day = daily_ranks.max(axis=1)
def rank_to_exposure(row):
if isinstance(row.name, np.datetime64): # The first row is called 2 times when using `apply`
return row
num_securities = num_ranks_per_day.loc[row.name]
securities_per_quantile = int(num_securities / quantiles)
long_start = securities_per_quantile * (selected_quantile-1)
long_end = securities_per_quantile * selected_quantile
# Keep only the ranks in (long_start, long_end] for this quantile
row[(row <= long_start) | (row > long_end)] = np.nan
row[~row.isna()] = 1
return row
exposures = daily_ranks.apply(rank_to_exposure, axis=1)
portfolio_returns = exposures * daily_returns
portfolio_equity_curve = (portfolio_returns.mean(axis=1) + 1).cumprod()
fig.add_trace(go.Scatter(x=portfolio_equity_curve.index, y = portfolio_equity_curve.values, name=f'Q{selected_quantile}'))
# Plot Qn-Q1
daily_ranks = self._get_factor_rankings()
num_ranks_per_day = daily_ranks.max(axis=1)
def rank_to_exposure(row):
if isinstance(row.name, np.datetime64): # The first row is called 2 times when using `apply`
return row
num_securities = num_ranks_per_day.loc[row.name]
short_threshold = int(num_securities / quantiles)
long_threshold = num_securities - short_threshold + 1
row[(row > short_threshold) & (row < long_threshold)] = np.nan
row[row <= short_threshold] = -1
row[row >= long_threshold] = 1
return row
exposures = daily_ranks.apply(rank_to_exposure, axis=1)
portfolio_returns = exposures * daily_returns
portfolio_equity_curve = (portfolio_returns.mean(axis=1) + 1).cumprod()
fig.add_trace(go.Scatter(x=portfolio_equity_curve.index, y = portfolio_equity_curve.values, name=f'Q{quantiles}-Q1'))
# Update figure layout
fig.update_layout(title_text=f"Daily Equity Curves<br><span style='font-size: 12px'>Equity Curves of Buying the Universe Constituents and Forming a Portfolio Based on the Factor Rankings</span>",
margin=dict(l=0, r=0, b=0),
legend=dict(
yanchor="top",
y=0.99,
xanchor="left",
x=0.01
))
fig.update_xaxes(title="Date", range=[portfolio_equity_curve.index[0], portfolio_equity_curve.index[-1]])
fig.show()
def show_video(url_code):
display(IFrame(src=f"https://www.youtube.com/embed/{url_code}?rel=0&controls=0&showinfo=0", width="560", height="315"))
# region imports
from AlgorithmImports import *
# endregion
class ETFUniverse:
"""
A class to create a universe of equities from the constituents of an ETF
"""
def __init__(self, etf_ticker, universe_date):
"""
Input:
- etf_ticker
Ticker of the ETF
- universe_date
The date to gather the constituents of the ETF
"""
self.etf_ticker = etf_ticker
self.universe_date = universe_date
def get_symbols(self, qb):
"""
Subscribes to the universe constituents and returns a list of symbols and their timezone
Input:
- qb
The QuantBook instance inside the DatasetAnalyzer
Returns a list of symbols and their timezone
"""
etf_symbols = self._get_etf_constituents(qb, self.etf_ticker, self.universe_date)
security_timezone = None
security_symbols = []
# Subscribe to the universe price data
for symbol in etf_symbols:
security = qb.AddSecurity(symbol, Resolution.Daily)
security_timezone = security.Exchange.TimeZone
security_symbols.append(symbol)
return security_symbols, security_timezone
def _get_etf_constituents(self, qb, etf_ticker, date):
"""
A helper method to retrieve the ETF constituents on a given date
Input:
- qb
The QuantBook instance inside the DatasetAnalyzer
- etf_ticker
Ticker of the ETF
- universe_date
The date to gather the constituents of the ETF
Returns a list of symbols
"""
date_str = date.strftime("%Y%m%d")
filename = f"/data/equity/usa/universes/etf/{etf_ticker.lower()}/{date_str}.csv"
try:
df = pd.read_csv(filename)
except:
print(f'Error: The ETF universe file {filename} does not exist')
return
security_ids = df[df.columns[1]].values
symbols = [qb.Symbol(security_id) for security_id in security_ids]
return symbols
from AlgorithmImports import *
from pytz import timezone
class Factor:
"""
A class to define factors from the dataset under analysis
"""
def __init__(self, name, printable_name, data_type, value_function):
"""
Input:
- name
Name of the factor as represented in the dataframe column of a history request
- printable_name
The name of the factor to be used when mentioning in plots and tables
- data_type
The type of data ('discrete' or 'continuous')
- value_function
User-defined value function to translate the raw factor values
"""
self.name = name
self.printable_name = printable_name
self.data_type = data_type
self.value_function = value_function
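# Example (illustrative): a continuous factor read from a 'sentiment' column of the dataset
# history, relying on DatasetAnalyzer's default value function by passing None.
#   sentiment_factor = Factor('sentiment', 'Sentiment Score', 'continuous', None)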
class OtherDatasetFactor:
"""
A class to define factors from other datasets (for the inter-dataset correlation analysis)
"""
def __init__(self, factor_name, data_type, link, sparse, value_function):
"""
Input:
- factor_name
Name of the factor as represented in the dataframe column of a history request
- data_type
The type of data ('discrete' or 'continuous')
- link
The link target (SecurityType.Equity if the dataset is linked to securities; otherwise the dataset ticker, for example 'REG' for the unlinked Regalytics dataset)
- sparse
Boolean to represent if the dataset is sparse
- value_function
User-defined value function to translate the raw factor values
"""
self.factor_name = factor_name
self.data_type = data_type
self.link = link
self.sparse = sparse
self.value_function = value_function
class DemoCorrelationDatasets:
def __init__(self):
self.other_dataset_factor_by_class = {
QuiverWallStreetBets: [OtherDatasetFactor('rank', 'discrete', SecurityType.Equity, True, self.value_function),
OtherDatasetFactor('sentiment', 'continuous', SecurityType.Equity, True, self.value_function)],
QuiverQuantTwitterFollowers: [OtherDatasetFactor('followers', 'discrete', SecurityType.Equity, True, self.value_function)],
USTreasuryYieldCurveRate: [OtherDatasetFactor('onemonth', 'continuous', "USTYCR", False, self.value_function)]
}
def value_function(self, other_dataset_history, this_dataset_index, other_dataset_timezone, this_dataset_timezone):
"""
This function transforms the dataset's raw data into a numerical value and aligns
it with the timestamps of the securities in the universe.
In this case, we just return the raw values since all the factors are processed data.
"""
# Match timezones
if other_dataset_timezone != this_dataset_timezone:
match_timezones_func = lambda time: time.replace(tzinfo=timezone(str(other_dataset_timezone))).astimezone(timezone(str(this_dataset_timezone)))
other_dataset_history.index = other_dataset_history.index.map(match_timezones_func)
result_df = pd.DataFrame(columns=other_dataset_history.columns)
# Normalize the index to tz-naive timestamps (so we can compare the indices in the snippet that comes after)
if isinstance(other_dataset_history.index, pd.Index):
new_index = pd.to_datetime(other_dataset_history.index, utc=True).tz_convert(None)
other_dataset_history.index = new_index
# Move dataset index forward by 1 day (since we open our trade on the day after we receive the factor value)
other_dataset_history.index = other_dataset_history.index + timedelta(1)
# Move forward index of other_dataset_history if its index elements don't align with `this_dataset_index`
for i in other_dataset_history.index:
adjusted_index_options = this_dataset_index[this_dataset_index >= i]
if len(adjusted_index_options) == 0:
continue
adjusted_index = adjusted_index_options[0]
row = other_dataset_history.loc[i]
row.name = adjusted_index
result_df.loc[adjusted_index] = row
# Drop duplicate indices
result_df = result_df[~result_df.index.duplicated(keep='last')]
# Align factor values with this_dataset_index
result_df = result_df.reindex(this_dataset_index)
# Drop rows and columns that have only NaN values
result_df = result_df.dropna(axis=0, how='all').dropna(axis=1, how='all')
return result_df
#region imports
from AlgorithmImports import *
#endregion
class UglyBrownMule(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2021, 3, 21) # Set Start Date
self.SetCash(100000) # Set Strategy Cash
# self.AddEquity("SPY", Resolution.Minute)
def OnData(self, data):
'''OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.
Arguments:
data: Slice object keyed by symbol containing the stock data
'''
# if not self.Portfolio.Invested:
# self.SetHoldings("SPY", 1)