| Overall Statistics | |
|---|---|
| Total Orders | 389 |
| Average Win | 0.32% |
| Average Loss | -0.28% |
| Compounding Annual Return | 55.894% |
| Drawdown | 5.900% |
| Expectancy | 0.209 |
| Start Equity | 1000000000 |
| End Equity | 1118417299.61 |
| Net Profit | 11.842% |
| Sharpe Ratio | 2.144 |
| Sortino Ratio | 3.114 |
| Probabilistic Sharpe Ratio | 72.304% |
| Loss Rate | 44% |
| Win Rate | 56% |
| Profit-Loss Ratio | 1.16 |
| Alpha | 0.319 |
| Beta | -0.174 |
| Annual Standard Deviation | 0.162 |
| Annual Variance | 0.026 |
| Information Ratio | 1.956 |
| Tracking Error | 0.263 |
| Treynor Ratio | -2.005 |
| Total Fees | $3920940.54 |
| Estimated Strategy Capacity | $13000000.00 |
| Lowest Capacity Asset | XSD TFYQNA7D69UT |
| Portfolio Turnover | 89.40% |
# region imports
from AlgorithmImports import *
# endregion
from significant_correlations_for_lag import significant_correlations_for_lag
def get_etfs_with_significant_correlation(qb, fn, df_etf_industry, df_ret, df_fund_flow):
    """Collect significant flow -> future-return correlations over several lags.

    For each lag in (1, 2, 7, 14, 30) this calls
    ``significant_correlations_for_lag``, shows the current figure and displays
    the per-lag table. It then stacks the tables and saves the result.

    Args:
        qb: QuantBook instance; only its ``object_store`` is used here.
        fn: Object Store key the combined table is saved under, as CSV.
        df_etf_industry: ETF universe, passed through to the per-lag helper.
        df_ret: ETF returns, passed through to the per-lag helper.
        df_fund_flow: ETF fund flows, passed through to the per-lag helper.

    Returns:
        The concatenated per-lag tables, sorted by ``correlation``. Ties keep
        their (ticker, lag) order.
    """
    per_lag_tables = []
    for lag in [1, 2, 7, 14, 30]:
        df = significant_correlations_for_lag(lag, df_etf_industry, df_ret, df_fund_flow)
        plt.show()
        display(df)
        per_lag_tables.append(df)
    # The second sort must be stable. Otherwise the (ticker, lag) ordering
    # from the first sort is thrown away and that sort is pointless.
    df_etfs_significant_correlation = (
        pd.concat(per_lag_tables)
        .sort_values(by=['ticker', 'lag'])
        .sort_values(by='correlation', kind='stable')
    )
    # Named csv_text rather than ``str`` so the builtin is not shadowed.
    csv_text = df_etfs_significant_correlation.to_csv()
    qb.object_store.save(fn, csv_text)
    return df_etfs_significant_correlation
# region imports
from AlgorithmImports import *
# endregion
# Your New Python File
from tqdm import tqdm
import numpy as np
from scipy.stats import pearsonr
from plot_empirical_distribution import plot_empirical_distribution
def significant_correlations_for_lag(lag, df_etf_industry, df_ret, df_fund_flow):
    """Correlate each ETF's net daily flow with its return ``lag`` rows later.

    For every ticker in ``df_etf_industry.symbol``:

    * Returns (``TICKER``, ``DATE``, ``TRAILING_1D_RET``) and flows
      (``Ticker``, ``Date``, ``NetDailyFlow``) are inner-joined on date.
    * The joined rows are ordered by date.
    * The flow at row t is paired with the return at row t + ``lag``.
    * A Pearson correlation and its p-value are computed.

    A ticker is skipped when the pairs contain NaN or there are fewer than two
    pairs, because ``pearsonr`` needs at least two observations.

    Side effects: prints a summary line and plots the empirical distribution of
    the finite correlations, when there are at least two of them.

    Args:
        lag: Non-negative shift, in rows of the joined series, between flow and return.
        df_etf_industry: Frame whose ``symbol`` column lists the tickers to test.
        df_ret: Returns frame with ``TICKER``, ``DATE``, ``TRAILING_1D_RET``.
        df_fund_flow: Flows frame with ``Ticker``, ``Date``, ``NetDailyFlow``.

    Returns:
        DataFrame with columns ``ticker, lag, correlation, p-value``. It holds
        only rows with p < 0.05, sorted by correlation.
    """
    significance_level = 0.05
    tickers = []
    correlations = []
    p_values = []
    for ticker in tqdm(df_etf_industry.symbol):
        rets = df_ret[df_ret.TICKER == ticker]
        flows = df_fund_flow[df_fund_flow.Ticker == ticker]
        # The inner join keeps only the dates present in both frames. Sorting
        # makes the positional lag mean "later in time".
        flows_rets = pd.merge(rets, flows, left_on='DATE', right_on='Date').sort_values('DATE', kind='stable')
        n_pairs = len(flows_rets) - lag
        if n_pairs < 2:
            continue
        F = flows_rets.NetDailyFlow.values[:n_pairs]
        R = flows_rets.TRAILING_1D_RET.values[lag:]
        if np.any(np.isnan(F)) or np.any(np.isnan(R)):
            continue
        rho, p_value = pearsonr(F, R)
        tickers.append(ticker)
        correlations.append(rho)
        p_values.append(p_value)

    # Keep only the statistically significant correlations.
    significant_correlations = [(ticker, lag, rho, p)
                                for ticker, rho, p in zip(tickers, correlations, p_values)
                                if p < significance_level]

    print(f"Number of significant correlations (p < {significance_level}):", len(significant_correlations), 'out of', len(correlations))
    A = np.array([x for x in correlations if np.isfinite(x)])
    # A kernel density estimate cannot be built from fewer than two points.
    if A.size >= 2:
        plot_empirical_distribution(A, f'ETF correlation of 0D flow with {lag}D return', 'Correlation')
    df = pd.DataFrame(significant_correlations, columns=['ticker', 'lag', 'correlation', 'p-value']).sort_values('correlation')
    return df
# region imports
from AlgorithmImports import *
# endregion
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm, gaussian_kde
def plot_empirical_distribution(A, title, quantity):
    """Plot a density histogram of ``A`` with a Gaussian KDE and a fitted normal curve.

    The KDE and normal overlays are drawn only when ``A`` has at least two
    points and a non-zero range. ``gaussian_kde`` raises for empty or
    zero-variance samples, and a normal fit to such data is degenerate. The
    histogram itself is always drawn.

    Args:
        A: 1-D array-like of sample values.
        title: Figure title.
        quantity: x-axis label, i.e. the name of the plotted quantity.
    """
    A = np.asarray(A, dtype=float)
    plt.figure(figsize=(8, 6))
    # Only the bin edges are needed, to span the overlay curves.
    _, bins, _ = plt.hist(A, bins=30, density=True, alpha=0.5, color='g', label='Empirical PDF')
    if A.size >= 2 and np.ptp(A) > 0:
        x = np.linspace(min(bins), max(bins), 1000)
        kde = gaussian_kde(A)
        plt.plot(x, kde(x), color='blue', label='Gaussian KDE')
        mu, std = norm.fit(A)
        plt.plot(x, norm.pdf(x, mu, std), color='red', linewidth=2, label='Normal Distribution')
    plt.title(title)
    plt.xlabel(quantity)
    plt.ylabel('Density')
    plt.legend(loc='upper right')
    plt.show()
# region imports
from AlgorithmImports import *
from FlowStrat import FlowStrat, stop_percentage_of
from CalmarAlgorithm import CalmarAlgorithm
from io import StringIO
from fund_flow import FundFlow
# endregion
class ETFGFundFlowsAlgorithm(CalmarAlgorithm):
    """Trade large-NAV ETFs using ETFG daily fund-flow data.

    Daily universe selection reads the custom ``FundFlow`` feed. It keeps ETFs
    that are listed in the pre-computed flow/return correlation file
    (``dfrho_ericson.csv``), have a non-zero correlation for the day and have
    a positive NAV. The active ``FlowStrat`` then picks up to 10 of them.
    Rebalancing is scheduled 10 minutes before the SPY market open.
    """

    def __init__(self):
        super().__init__()

    def initialize(self):
        # Symbol -> OrderTicket of the stop most recently placed by set_stop_percentage.
        self._stop_tickets = {}
        self.df_flow_ret_corr = None
        self.get_flow_ret_corr()
        # Alternative: FlowStrat.TOP10_NAV_LS_FLOWS_1DFWDRHO_INSIGHT_STOP5
        self.strategy = FlowStrat.TOP10_NAV_LS_FLOWS_1DFWDRHO_INSIGHT
        # historical_cutoff for universe calibration was 20220701, so don't trade before that to avoid future information
        in_sample = True
        if in_sample:
            self.set_start_date(2022, 7, 1)
            self.set_end_date(2022, 10, 1)
        else:  # Out of sample period
            self.set_start_date(2024, 6, 1)
            self.set_end_date(2024, 9, 1)
        self.starting_cash = 1_000_000_000
        self.set_cash(self.starting_cash)
        self.set_warm_up(timedelta(1))
        self.set_security_initializer(BrokerageModelSecurityInitializer(
            self.brokerage_model, FuncSecuritySeeder(self.get_last_known_prices)))
        self.universe_settings.resolution = Resolution.DAILY
        self._universe = self.add_universe(FundFlow, 'FundFlow', Resolution.DAILY, self._select_assets)
        spy = Symbol.create('SPY', SecurityType.EQUITY, Market.USA)
        self.schedule.on(
            self.date_rules.every_day(spy),
            self.time_rules.before_market_open(spy, 10),
            self._rebalance)
        self.get_etf_industry()
        # A set gives O(1) membership checks during daily universe selection.
        self.restrict_to = set(self.df_flow_ret_corr.Ticker.unique())
        self.set_strategy_name()
        self.set_stop_percentage(self.strategy)

    def set_strategy_name(self):
        """Name the algorithm '<strategy label> <YYYYMMDD start>-<YYYYMMDD end>'."""
        ss = str(self.strategy)
        sd = str(self.start_date)[0:10].replace('-', '')
        se = str(self.end_date)[0:10].replace('-', '')
        self.set_name(f"{ss} {sd}-{se}")

    def _select_assets(self, data):
        """Universe selection over the day's FundFlow records.

        Sets ``self.correlations`` (ticker -> CorrRF for today) and
        ``self.etfs`` (the chosen FundFlow records). The rebalance methods
        read both. Returns the chosen symbols.
        """
        current_date = self.time.date()
        self.correlations = {etf.symbol.value: self.get_correlation(etf.symbol.value, current_date) for etf in data}
        candidates = [etf for etf in data
                      if etf.symbol.value in self.restrict_to
                      and self.correlations[etf.symbol.value] != 0
                      and etf.nav > 0]
        self.etfs = sorted(candidates, key=lambda etf: etf.nav)
        if self.strategy in (FlowStrat.TOP10_NAV_LONG_FLOWS_NEG, FlowStrat.TOP10_NAV_LONG_FLOWS_NEG_STOP5):
            self.etfs = [etf for etf in self.etfs if etf.fund_flow < 0]
        elif self.strategy in (FlowStrat.TOP10_NAV_LONG_FLOWS_POS_STOP5, FlowStrat.TOP10_NAV_LONG_FLOWS_POS):
            self.etfs = [etf for etf in self.etfs if etf.fund_flow > 0]
        elif self.strategy in (FlowStrat.TOP10_NAV_LS_FLOWS_POSNEG_STOP5,
                               FlowStrat.TOP10_NAV_LS_FLOWS_NEGPOS_STOP5,
                               FlowStrat.TOP10_NAV_LS_FLOWS_1DFWDRHO_INSIGHT,
                               FlowStrat.TOP10_NAV_LS_FLOWS_1DFWDRHO_INSIGHT_STOP5):
            # Take the 5 largest-NAV inflow ETFs and the 5 largest-NAV outflow ETFs.
            top_5_positive_flow = [etf for etf in self.etfs if etf.fund_flow > 0][-5:]
            top_5_negative_flow = [etf for etf in self.etfs if etf.fund_flow < 0][-5:]
            self.etfs = top_5_positive_flow + top_5_negative_flow
        self.etfs = self.etfs[-10:]
        return [etf.symbol for etf in self.etfs]

    def _rebalance(self):
        """Dispatch to the active strategy's rebalance; the default is an equal 90% long book."""
        if not self._universe.selected:
            return
        symbols = [symbol for symbol in self._universe.selected if self.securities[symbol].price]
        if not symbols:
            return
        if self.strategy in (FlowStrat.TOP10_NAV_LS_FLOWS_POSNEG_STOP5, FlowStrat.TOP10_NAV_LS_FLOWS_NEGPOS_STOP5):
            return self._rebalance_TOP10_NAV_LS_FLOWS_POSNEG_STOP5()
        if self.strategy in (FlowStrat.TOP10_NAV_LS_FLOWS_1DFWDRHO_INSIGHT,
                             FlowStrat.TOP10_NAV_LS_FLOWS_1DFWDRHO_INSIGHT_STOP5):
            return self._rebalance_TOP10_NAV_LS_FLOWS_1DFWDRHO_INSIGHT()
        weight = 0.9 / len(symbols)
        # Submit every target in one call. Calling set_holdings(symbol, w, True)
        # per symbol would liquidate the previously targeted symbols on each call.
        self.set_holdings([PortfolioTarget(symbol, weight) for symbol in symbols],
                          liquidate_existing_holdings=True)

    def _rebalance_TOP10_NAV_LS_FLOWS_POSNEG_STOP5(self):
        """Long one flow side and short the other, each side equal-weighted to 90%.

        POSNEG is long inflows and short outflows; NEGPOS is the reverse. An
        empty side is skipped, where the old code divided by zero. If both
        sides are empty, holdings are left unchanged.
        """
        inflows = [etf for etf in self.etfs if etf.fund_flow > 0]
        outflows = [etf for etf in self.etfs if etf.fund_flow < 0]
        if self.strategy == FlowStrat.TOP10_NAV_LS_FLOWS_POSNEG_STOP5:
            longs, shorts = inflows, outflows
        else:
            longs, shorts = outflows, inflows
        targets = []
        if longs:
            weight = 0.9 / len(longs)
            targets.extend(PortfolioTarget(etf.symbol, weight) for etf in longs)
        if shorts:
            weight = 0.9 / len(shorts)
            targets.extend(PortfolioTarget(etf.symbol, -weight) for etf in shorts)
        if targets:
            self.set_holdings(targets, liquidate_existing_holdings=True)

    def _rebalance_TOP10_NAV_LS_FLOWS_1DFWDRHO_INSIGHT(self):
        """Go long or short according to sign(flow) * sign(rho), weighted by |rho|.

        Long weights sum to +1 and short weights sum to -1. ETFs whose
        correlation is NaN are skipped, because one NaN would make its side's
        weight sum NaN and drop that whole side. With no targets, holdings
        are left unchanged.
        """
        inflows = [etf for etf in self.etfs if etf.fund_flow > 0]
        outflows = [etf for etf in self.etfs if etf.fund_flow < 0]
        longs = []
        shorts = []
        for etf in inflows:
            rho = self.correlations[etf.symbol.value]
            if pd.isna(rho):
                continue
            if rho > 0:  # flow > 0 predicts a positive return
                longs.append((rho, etf))
            else:  # flow > 0 predicts a negative return
                shorts.append((-rho, etf))
        for etf in outflows:
            rho = self.correlations[etf.symbol.value]
            if pd.isna(rho):
                continue
            if rho > 0:  # flow < 0 predicts a negative return
                shorts.append((rho, etf))
            else:  # flow < 0 predicts a positive return
                longs.append((-rho, etf))
        targets = []
        for side, sign in ((longs, 1), (shorts, -1)):
            total_rho = sum(rho for rho, _ in side)
            if total_rho > 0:
                targets.extend(PortfolioTarget(etf.symbol, sign * rho / total_rho) for rho, etf in side)
        if targets:  # if no information just coast
            self.set_holdings(targets, liquidate_existing_holdings=True)

    def on_data(self, data):
        super().on_data(data)
        self.set_stop_percentage(data)

    def set_stop_percentage(self, data):
        """Refresh protective stops when the strategy uses a stop loss.

        ``data`` is unused: initialize() passes the strategy and on_data()
        passes a Slice. On each call, every previously placed stop is
        cancelled. This includes stops on positions that have since been
        closed, which would otherwise open new positions. A new stop is then
        placed ``stop_loss_percentage`` away from the current price for each
        invested security. The old code added a fresh stop every bar without
        cancelling the previous ones.
        """
        self.stop_loss_percentage = stop_percentage_of(self.strategy)
        if self.stop_loss_percentage is None:
            return
        for symbol, security in self.securities.items():
            self._cancel_stop(symbol)
            if not security.invested:
                continue
            quantity = security.holdings.quantity
            direction = -1. if quantity > 0 else 1.
            stop_price = security.price * (1 + direction * self.stop_loss_percentage)
            self._stop_tickets[symbol] = self.stop_market_order(symbol, -quantity, stop_price)

    def _cancel_stop(self, symbol):
        """Forget the stop ticket tracked for ``symbol``; cancel it if it is still working."""
        ticket = self._stop_tickets.pop(symbol, None)
        if ticket is not None and ticket.status not in (OrderStatus.FILLED, OrderStatus.CANCELED, OrderStatus.INVALID):
            ticket.cancel()

    def get_etf_industry(self):
        """Load the ETF industry table from the Object Store into ``self.df_etf_industry``."""
        fn = 'ericson_df_etf_industry.csv'
        S = self.object_store.read(fn)
        self.df_etf_industry = pd.read_csv(StringIO(S))

    def get_flow_ret_corr(self):
        """Load, once, the per-date flow/return correlation table ``dfrho_ericson.csv``.

        ``Date`` is parsed to ``datetime.date``. A (Date, Ticker) -> CorrRF
        dictionary is built that keeps the first row per key, matching the
        old row scan. get_correlation can then answer in O(1) instead of
        scanning the frame for every ETF every day.
        """
        if self.df_flow_ret_corr is not None:
            return self.df_flow_ret_corr
        S = self.object_store.read('dfrho_ericson.csv')
        df = pd.read_csv(StringIO(S))
        df['Date'] = df.Date.apply(lambda raw: datetime.strptime(str(raw), '%Y-%m-%d').date())
        self.df_flow_ret_corr = df
        first_rows = df.drop_duplicates(subset=['Date', 'Ticker'], keep='first')
        self._corr_lookup = dict(zip(zip(first_rows.Date, first_rows.Ticker), first_rows.CorrRF))
        self.debug(f"RHO goes to {str(self.df_flow_ret_corr.iloc[-1].Date)}")
        return self.df_flow_ret_corr

    def get_correlation(self, sym, current_date):
        """Return CorrRF for ``sym`` on ``current_date``, or 0 when the table has no such row."""
        return self._corr_lookup.get((current_date, sym), 0)
# region imports
from AlgorithmImports import *
# endregion
class FundFlow(PythonData):
    """Custom data: ETFG daily fund-flow records, one CSV file per day in the Object Store.

    Each line is split on commas and empty fields are dropped, so fields are
    read by position: [0] the bar date (``%Y-%m-%d``), [2] ticker, [3] shares
    outstanding, [4] NAV, [5] net daily flow. Lines are skipped when:

    * fewer than six non-empty fields remain,
    * the ticker contains a space, or
    * the generated security identifier dates before 1998.
    """

    def get_source(self, config, date, is_live):
        daily_file = f"etfg_fundflow_us/{date.year}/{date.strftime('%Y%m%d')}_fundflow_v2.csv"
        return SubscriptionDataSource(daily_file, SubscriptionTransportMedium.OBJECT_STORE, FileFormat.CSV)

    def reader(self, config, line, date, is_live):
        fields = list(filter(None, line.split(',')))
        if len(fields) < 6:
            return None
        ticker = fields[2]
        # Check if QC Cloud has this asset: tickers with spaces are skipped, as
        # are those whose identifier dates before 1998.
        if ' ' in ticker:
            return None
        sid = SecurityIdentifier.generate_equity(ticker, Market.USA, True, None, date)
        if sid.date.year < 1998:
            return None
        record = FundFlow()
        record.symbol = Symbol(sid, ticker)
        record.shares_outstanding = float(fields[3])
        record.nav = float(fields[4])
        record.fund_flow = float(fields[5])
        record.time = datetime.strptime(fields[0], '%Y-%m-%d')
        record.end_time = record.time + timedelta(1)
        return record
# region imports
from AlgorithmImports import *
# endregion
from list_csv_files_in_object_store import list_csv_files_in_object_store
from io import StringIO
def get_df_fund_flows(qb, symbols, fn):
    """Load all daily ETFG fund-flow CSVs, keep ``symbols``, and save the result.

    Every ``.csv`` key under ``etfg_fundflow_us/`` is read without a header
    and given the columns below. Each file is filtered to ``symbols`` before
    concatenation. The result is saved under ``fn``; the old code ignored
    ``fn`` and always wrote ``df_fund_flow_ericson.csv``.

    Args:
        qb: QuantBook instance; its ``object_store`` is read and written.
        symbols: Iterable of tickers to keep.
        fn: Object Store key for the saved CSV.

    Returns:
        DataFrame with columns ETFGDate, Date, Ticker, FundSharesOutstanding,
        NAV and NetDailyFlow.

    Raises:
        ValueError: if no fund-flow CSV files exist under ``etfg_fundflow_us/``.
    """
    columns = ['ETFGDate', 'Date', 'Ticker', 'FundSharesOutstanding', 'NAV', 'NetDailyFlow']
    wanted = set(symbols)
    keys = list_csv_files_in_object_store(qb, 'etfg_fundflow_us/')
    if not keys:
        raise ValueError("No fund-flow CSV files found under 'etfg_fundflow_us/' in the Object Store")
    flow_dfs = []
    for key in keys:
        df = pd.read_csv(StringIO(qb.object_store.read(key)), header=None)
        df.columns = columns
        # Filtering per file keeps only the requested tickers in memory.
        flow_dfs.append(df[df.Ticker.isin(wanted)])
    df_fund_flow = pd.concat(flow_dfs)
    qb.object_store.save(fn, df_fund_flow.to_csv())
    return df_fund_flow
# region imports
from AlgorithmImports import *
from collections import defaultdict
# endregion
class CalmarAlgorithm(QCAlgorithm):
    """QCAlgorithm base that records equity per data slice and reports a Calmar ratio at the end.

    Subclasses must set ``self.starting_cash``, which is used as the starting
    capital. Their ``on_data`` must call ``super().on_data(data)`` so that
    the equity curve is recorded.
    """

    def __init__(self):
        super().__init__()
        self.debug("Calmar Init!!")
        self.opening_prices = {}
        # Total portfolio value keyed by algorithm time, filled by on_data.
        self.equity_over_time = defaultdict(float)

    def on_data(self, data):
        self.equity_over_time[self.time] = self.portfolio.total_portfolio_value

    def on_end_of_algorithm(self):
        """Cancel open orders, liquidate, and publish the Calmar ratio as a runtime statistic."""
        self.transactions.cancel_open_orders()
        self.debug("All open orders cancelled.")
        self.liquidate()
        self.debug("All positions have been liquidated.")
        annualized_return = self.calculate_annualized_return()
        max_drawdown = self.calculate_max_drawdown()
        calmar_ratio = annualized_return / abs(max_drawdown)
        self.set_runtime_statistic("Calmar Ratio", round(calmar_ratio, 2))
        self.debug(f'Calmar Ratio: {calmar_ratio}')

    def calculate_annualized_return(self) -> float:
        """Compound annual growth from ``starting_cash`` to the current portfolio value.

        For a zero-length backtest the plain total return is returned, because
        annualizing would divide by zero.
        """
        total_growth = self.portfolio.total_portfolio_value / self.starting_cash
        years = (self.end_date - self.start_date).days / 365.25
        if years <= 0:
            return total_growth - 1
        return total_growth ** (1 / years) - 1

    def calculate_max_drawdown(self) -> float:
        """Largest peak-to-trough fraction of the recorded equity curve.

        The result is floored at 0.001 so the Calmar ratio never divides by
        zero. Points after a non-positive peak are ignored, since a drawdown
        relative to such a peak is undefined.
        """
        peak = -float('inf')
        max_drawdown = 0.001
        for value in self.equity_over_time.values():
            peak = max(peak, value)
            if peak <= 0:
                continue
            max_drawdown = max(max_drawdown, (peak - value) / peak)
        return max_drawdown
# region imports
from AlgorithmImports import *
# endregion
def list_csv_files_in_object_store(qb, folder: str) -> list[str]:
    """Return the Object Store keys that start with ``folder`` and end in ``.csv``.

    The return annotation was previously ``None``, although a list is returned.

    Args:
        qb: Object exposing ``object_store.keys``, e.g. a QuantBook or an algorithm.
        folder: Key prefix to match, e.g. ``'etfg_fundflow_us/'``.

    Returns:
        The matching keys, in the order ``object_store.keys`` yields them.
    """
    return [key for key in qb.object_store.keys
            if key.startswith(folder) and key.endswith('.csv')]
# region imports
from AlgorithmImports import *
# endregion
from tqdm import tqdm
def aggregate_flows_and_returns_at_sector_level(etf_sector, df_ret, df_fund_flow, df_terms_correlated=None):
    """Roll ETF-level flows and 2-day returns up to sector level, per date.

    Each ETF's shares outstanding and net flow are scaled by its weight in a
    sector. Then, for each (sector, date):

    * ``Flow`` is the sum of the scaled net flows.
    * ``Ret`` is the average of ``Ret2D`` weighted by scaled shares
      outstanding x NAV.

    Args:
        etf_sector: Mapping ticker -> {sector: weight}.
        df_ret: Returns with exactly four columns, read positionally as Date,
            Ticker, Ret2D and Close. The caller's frame is not modified; the
            old code renamed its columns in place.
        df_fund_flow: Flows with Date, Ticker, FundSharesOutstanding, NAV and
            NetDailyFlow.
        df_terms_correlated: Frame whose ``symbol`` column lists the tickers
            to include. Defaults to every ticker in ``etf_sector``; the old
            code referenced this name without defining it and raised NameError.

    Returns:
        DataFrame with columns Date, Sector, Flow, Ret. Rows are ordered by
        sector, then by each date's first appearance within that sector.
    """
    ret_columns = ['Date', 'Ticker', 'Ret2D', 'Close']
    df_ret_flow = pd.merge(df_fund_flow, df_ret.set_axis(ret_columns, axis=1), on=['Date', 'Ticker'])
    tickers = list(etf_sector) if df_terms_correlated is None else df_terms_correlated.symbol
    columns = ['Date', 'Sector', 'FundSharesOutstanding', 'NAV', 'NetDailyFlow', 'Ret2D', 'Close']

    pieces = []
    for ticker in tqdm(tickers):
        # Filter once per ticker, not once per (ticker, sector).
        flows = df_ret_flow[df_ret_flow.Ticker == ticker]
        for sector, factor in etf_sector[ticker].items():
            pieces.append(flows.assign(
                Sector=sector,
                FundSharesOutstanding=flows.FundSharesOutstanding * factor,
                NetDailyFlow=flows.NetDailyFlow * factor,
            )[columns])
    if not pieces:
        return pd.DataFrame([], columns=['Date', 'Sector', 'Flow', 'Ret'])
    df_sector_ret_flow = pd.concat(pieces, ignore_index=True)

    # Per-row weight = sharenav / (sector-date total); the weighted returns are summed per group.
    sharenav = df_sector_ret_flow.FundSharesOutstanding * df_sector_ret_flow.NAV
    total_sharenav = sharenav.groupby([df_sector_ret_flow.Sector, df_sector_ret_flow.Date]).transform('sum')
    weighted = df_sector_ret_flow.assign(Ret=df_sector_ret_flow.Ret2D * sharenav / total_sharenav)
    df_sector_flow = (weighted
                      .groupby(['Sector', 'Date'], sort=False)
                      .agg(Flow=('NetDailyFlow', 'sum'), Ret=('Ret', 'sum'))
                      .reset_index()
                      .sort_values('Sector', kind='stable'))
    return df_sector_flow[['Date', 'Sector', 'Flow', 'Ret']].reset_index(drop=True)
# region imports
from AlgorithmImports import *
# endregion
from tqdm import tqdm
def aggregate_flows_and_returns_at_country_level(etf_country, df_ret, df_fund_flow, df_terms_correlated):
    """Roll ETF-level flows and 2-day returns up to country level, per date.

    Each ETF's shares outstanding and net flow are scaled by its weight in a
    country. Then, for each (country, date):

    * ``Flow`` is the sum of the scaled net flows.
    * ``Ret`` is the average of ``Ret2D`` weighted by scaled shares
      outstanding x NAV.

    Args:
        etf_country: Mapping ticker -> {country: weight}.
        df_ret: Returns with exactly four columns, read positionally as Date,
            Ticker, Ret2D and Close. The caller's frame is not modified; the
            old code renamed its columns in place.
        df_fund_flow: Flows with Date, Ticker, FundSharesOutstanding, NAV and
            NetDailyFlow.
        df_terms_correlated: Frame whose ``symbol`` column lists the tickers to include.

    Returns:
        DataFrame with columns Date, Country, Flow, Ret. Rows are ordered by
        country, then by each date's first appearance within that country.
    """
    ret_columns = ['Date', 'Ticker', 'Ret2D', 'Close']
    df_ret_flow = pd.merge(df_fund_flow, df_ret.set_axis(ret_columns, axis=1), on=['Date', 'Ticker'])
    columns = ['Date', 'Country', 'FundSharesOutstanding', 'NAV', 'NetDailyFlow', 'Ret2D', 'Close']

    pieces = []
    for ticker in tqdm(df_terms_correlated.symbol):
        # Filter once per ticker, not once per (ticker, country).
        flows = df_ret_flow[df_ret_flow.Ticker == ticker]
        for country, factor in etf_country[ticker].items():
            pieces.append(flows.assign(
                Country=country,
                FundSharesOutstanding=flows.FundSharesOutstanding * factor,
                NetDailyFlow=flows.NetDailyFlow * factor,
            )[columns])
    if not pieces:
        return pd.DataFrame([], columns=['Date', 'Country', 'Flow', 'Ret'])
    df_country_ret_flow = pd.concat(pieces, ignore_index=True)

    # Per-row weight = sharenav / (country-date total); the weighted returns are summed per group.
    sharenav = df_country_ret_flow.FundSharesOutstanding * df_country_ret_flow.NAV
    total_sharenav = sharenav.groupby([df_country_ret_flow.Country, df_country_ret_flow.Date]).transform('sum')
    weighted = df_country_ret_flow.assign(Ret=df_country_ret_flow.Ret2D * sharenav / total_sharenav)
    df_country_flow = (weighted
                       .groupby(['Country', 'Date'], sort=False)
                       .agg(Flow=('NetDailyFlow', 'sum'), Ret=('Ret', 'sum'))
                       .reset_index()
                       .sort_values('Country', kind='stable'))
    return df_country_flow[['Date', 'Country', 'Flow', 'Ret']].reset_index(drop=True)
# region imports
from AlgorithmImports import *
# endregion
def get_df_ret_for_universe(qb, symbols, fn, years=(2022, 2023, 2024)):
    """Load yearly ETF return files, keep ``symbols``, and save the result.

    Reads the pipe-separated ``etf_returns_<year>.txt`` file for each year in
    ``years`` and concatenates them in that order. Each file must have a
    header that includes ``TICKER``. The rows are filtered to ``symbols`` and
    saved as CSV under ``fn``.

    Args:
        qb: QuantBook instance; its ``object_store`` is read and written.
        symbols: Tickers to keep.
        fn: Object Store key for the saved CSV.
        years: Years whose return files are loaded. Defaults to the
            originally hard-coded 2022-2024.

    Returns:
        The concatenated, filtered returns DataFrame, keeping each file's row index.
    """
    from io import StringIO  # this module does not import StringIO at top level

    frames = [pd.read_csv(StringIO(qb.object_store.read(f'etf_returns_{year}.txt')), sep='|')
              for year in years]
    df_ret = pd.concat(frames)
    df_ret = df_ret[df_ret.TICKER.isin(symbols)]
    qb.object_store.save(fn, df_ret.to_csv())
    return df_ret
# region imports
from AlgorithmImports import *
# endregion
def calculate_corr(df, window, F, R):
    """Rolling one-step-ahead correlation between column ``F`` and column ``R``.

    For each row ``i >= window``, the correlation is taken between ``F`` at
    rows ``i-window .. i-1`` and ``R`` at rows ``i-window+1 .. i``. Each F
    value is therefore paired with the next row's R.

    Pairs are matched by position. The old code correlated the two shifted
    slices directly, and ``Series.corr`` aligns on index labels, so each F was
    paired with R from its own row and the lag was silently lost.

    The first ``window`` rows, capped at ``len(df)``, and any non-finite
    correlation are stored as 0.

    Args:
        df: Frame holding columns ``F`` and ``R``. It is modified in place:
            a ``CorrRF`` column is added.
        window: Number of pairs per correlation.
        F: Name of the leading column (e.g. flow).
        R: Name of the lagging column (e.g. return).

    Returns:
        ``df`` itself, with the ``CorrRF`` column added.
    """
    n_rows = len(df)
    f_values = df[F].to_numpy(dtype=float)
    r_values = df[R].to_numpy(dtype=float)
    # min() keeps the column length equal to len(df) when df is shorter than the window.
    corr_values = [0] * min(window, n_rows)
    for i in range(window, n_rows):
        # Fresh Series with a default RangeIndex pair the slices positionally.
        corr = pd.Series(f_values[i - window:i]).corr(pd.Series(r_values[i - window + 1:i + 1]))
        corr_values.append(corr if np.isfinite(corr) else 0)
    df['CorrRF'] = corr_values
    return df
# region imports
from AlgorithmImports import *
# endregion
def split_dataframe_by_ticker(df):
    """Return one sub-DataFrame per distinct ``Ticker`` value.

    Groups come out in ``groupby``'s default order (sorted by ticker). Each
    sub-frame keeps its original index labels.
    """
    by_ticker = df.groupby('Ticker')
    return list(map(lambda key_and_frame: key_and_frame[1], by_ticker))
# region imports
from AlgorithmImports import *
from io import StringIO
from etf_industry import ETFIndustry
# endregion
def get_tickers_in_scope(qb, fn):
    """
    Build the equity-ETF universe for the fund-flow study and save it to the Object Store.

    * Get ETF industry data (ETFG records from 2022-01-02 to 2022-01-04).
    * Keep equity ETFs that report average daily trading volume, options
      availability and a put/call ratio.
    * Exclude the bottom 10% by AUM (only rows strictly above the 10th-percentile
      value are kept). This narrows the focus to more significant and impactful funds.
    * Exclude levered ETFs (any non-empty ``levered_amount``). Thematic ETFs are
      not filtered out here.

    The AUM cut-off is computed on a sorted copy. The old code sorted the
    column's underlying array in place, which could reorder AUM values across
    rows before the filter was applied.

    Args:
        qb: QuantBook used for ``add_data``/``history`` and the Object Store.
        fn: Object Store key the resulting table is saved under, as CSV.

    Returns:
        The filtered DataFrame sorted by ascending AUM. ``reset_index`` is called
        after the sort, so the pre-sort row labels remain in an ``index`` column.
    """
    keep_columns = ["symbol", "aum", "avg_daily_trading_volume", "bid_ask_spread", "category", "currency_exposure",
                    "description", "development_class", "focus", "geographic_exposure", "industry_group_exposure",
                    "is_etn", "levered_amount", "management_fee", "maturity_exposure", "net_expenses", "num_holdings",
                    "options_volume", "other_expenses", "primary_benchmark", "put_call_ratio", "put_volume", "region",
                    "sector_exposure", "short_interest", "subindustry_exposure", "total_expenses"]
    etf_industry = qb.add_data(ETFIndustry, 'ETFIndustry').symbol
    df_etf_industry = qb.history(etf_industry, datetime(2022, 1, 2), datetime(2022, 1, 4), Resolution.DAILY)
    df_etf_industry = df_etf_industry[df_etf_industry.asset_class == 'Equity'].reset_index()
    has_required_data = (df_etf_industry.avg_daily_trading_volume.notna()
                         & df_etf_industry.options_available.notna()
                         & df_etf_industry.put_call_ratio.notna())
    df_etf_industry = df_etf_industry.loc[has_required_data, keep_columns]

    ### Exclude bottom 10% by AUM
    if len(df_etf_industry):
        sorted_aum = df_etf_industry['aum'].sort_values().to_numpy()  # independent copy
        cutoff = int(len(sorted_aum) * .1)
        pct10 = sorted_aum[cutoff]
        plt.plot(sorted_aum[cutoff:])
        df_etf_industry = df_etf_industry[df_etf_industry.aum > pct10]
    df_etf_industry = df_etf_industry.sort_values(by='aum').reset_index()

    ### Exclude levered ETFs
    df_etf_industry = df_etf_industry[df_etf_industry.levered_amount.isnull()]
    syms = df_etf_industry.to_csv()
    qb.object_store.save(fn, syms)
    return df_etf_industry
# region imports
from AlgorithmImports import *
# endregion
class ETFReturn(PythonData):
    """Custom data: ETF daily returns from the yearly ``etf_returns_<year>.txt`` files in the Object Store.

    Lines are pipe-separated, with empty fields dropped: date (``%Y%m%d``),
    ticker, trailing 1-day return, and close (stored as ``value``).
    Lines are skipped when:

    * they are empty or do not start with a digit (e.g. the header),
    * they have fewer than four fields, or
    * the generated security identifier dates before 1998.
    """

    def get_source(self, config: SubscriptionDataConfig, date: datetime, is_live: bool):
        return SubscriptionDataSource(
            f"etf_returns_{date.year}.txt",
            SubscriptionTransportMedium.OBJECT_STORE,
            FileFormat.CSV
        )

    def reader(self, config: SubscriptionDataConfig, line: str, date: datetime, is_live: bool):
        # Check for an empty line before indexing line[0]; the old code raised IndexError here.
        if not line or not line[0].isdigit():
            return None
        data = [x for x in line.split('|') if x]
        if len(data) < 4:
            return None
        # Check if QC Cloud has this asset.
        security_id = SecurityIdentifier.generate_equity(data[1], Market.USA, True, None, date)
        if security_id.date.year < 1998:
            return None
        etf = ETFReturn()
        etf.symbol = Symbol(security_id, data[1])
        etf.trailing_1d_return = float(data[2])
        etf.value = float(data[3])
        etf.end_time = datetime.strptime(data[0], '%Y%m%d')
        return etf
# region imports
from AlgorithmImports import *
# endregion
import csv
class ETFIndustry(PythonData):
    """Custom data: ETFG daily industry/reference records, one CSV file per day in the Object Store.

    ``reader`` parses each line with ``csv``, so quoted fields may contain
    commas. It maps the columns by position onto attributes of one
    ETFIndustry object. Empty fields become ``np.nan``, except
    ``fiscal_year_end`` and ``listing_exchange``, which become ``None``.
    """

    def get_source(self, config: SubscriptionDataConfig, date: datetime, is_live: bool):
        return SubscriptionDataSource(
            f"etfg_industry_us/{date.year}/{date.strftime('%Y%m%d')}_industries_v2.csv",
            SubscriptionTransportMedium.OBJECT_STORE,
            FileFormat.CSV
        )

    def reader(self, config: SubscriptionDataConfig, line: str, date: datetime, is_live: bool):
        rows = list(csv.reader(line.splitlines()))
        # An empty line yields no row; the old code indexed [0] and raised IndexError.
        if not rows or not rows[0]:
            return None
        data = rows[0]
        # Check if QC Cloud has this asset.
        if ' ' in data[2]:
            return None
        security_id = SecurityIdentifier.generate_equity(data[2], Market.USA, True, None, date)
        if security_id.date.year < 1998:
            return None
        industry = ETFIndustry()
        industry.time = datetime.strptime(data[0], '%Y-%m-%d')
        industry.end_time = industry.time + timedelta(1)
        industry.symbol = Symbol(security_id, data[2])
        industry.issuer = data[3] if data[3] else np.nan
        industry.description = data[4] if data[4] else np.nan
        industry.inception_date = datetime.strptime(data[5], '%Y-%m-%d') if data[5] else np.nan
        industry.primary_benchmark = data[6] if data[6] else np.nan
        industry.tax_classification = data[7] if data[7] else np.nan
        industry.is_etn = self._parse_bool(data[8]) if data[8] else np.nan
        industry.aum = float(data[9]) if data[9] else np.nan
        industry.avg_daily_trading_volume = float(data[10]) if data[10] else np.nan
        industry.asset_class = data[11] if data[11] else np.nan
        industry.category = data[12] if data[12] else np.nan
        industry.focus = data[13] if data[13] else np.nan
        industry.development_class = data[14] if data[14] else np.nan
        industry.region = data[15] if data[15] else np.nan
        industry.is_levered = self._parse_bool(data[16]) if data[16] else np.nan
        industry.levered_amount = float(data[17]) if data[17] else np.nan
        industry.is_active = self._parse_bool(data[18]) if data[18] else np.nan
        industry.administrator = data[19] if data[19] else np.nan
        industry.advisor = data[20] if data[20] else np.nan
        industry.custodian = data[21] if data[21] else np.nan
        industry.distributor = data[22] if data[22] else np.nan
        industry.portfolio_manager = data[23] if data[23] else np.nan
        industry.subadvisor = data[24] if data[24] else np.nan
        industry.transfer_agent = data[25] if data[25] else np.nan
        industry.trustee = data[26] if data[26] else np.nan
        industry.futures_commission_merchant = data[27] if data[27] else np.nan
        industry.fiscal_year_end = data[28] if data[28] else None
        industry.distribution_frequency = data[29] if data[29] else np.nan
        industry.listing_exchange = data[30] if data[30] else None
        industry.creation_unit_size = float(data[31]) if data[31] else np.nan
        industry.creation_fee = data[32] if data[32] else np.nan
        industry.geographic_exposure = self._parse_dictionary(data[33]) if data[33] else np.nan
        industry.currency_exposure = str(self._parse_dictionary(data[34])) if data[34] else np.nan
        industry.sector_exposure = self._parse_dictionary(data[35]) if data[35] else np.nan
        industry.industry_group_exposure = self._parse_dictionary(data[36]) if data[36] else np.nan
        industry.industry_exposure = self._parse_dictionary(data[37]) if data[37] else np.nan
        industry.subindustry_exposure = self._parse_dictionary(data[38]) if data[38] else np.nan
        industry.coupon_exposure = self._parse_dictionary(data[39]) if data[39] else np.nan
        industry.maturity_exposure = self._parse_dictionary(data[40]) if data[40] else np.nan
        industry.options_available = self._parse_bool(data[41]) if data[41] else np.nan
        industry.options_volume = float(data[42]) if data[42] else np.nan
        industry.short_interest = float(data[43]) if data[43] else np.nan
        industry.put_call_ratio = float(data[44]) if data[44] else np.nan
        industry.num_holdings = float(data[45]) if data[45] else np.nan
        industry.discount_premium = float(data[46]) if data[46] else np.nan
        industry.bid_ask_spread = float(data[47]) if data[47] else np.nan
        industry.put_volume = float(data[48]) if data[48] else np.nan
        industry.call_volume = float(data[49]) if data[49] else np.nan
        industry.management_fee = float(data[50]) if data[50] else np.nan
        industry.other_expenses = float(data[51]) if data[51] else np.nan
        industry.total_expenses = float(data[52]) if data[52] else np.nan
        industry.fee_waivers = float(data[53]) if data[53] else np.nan
        industry.net_expenses = float(data[54]) if data[54] else np.nan
        industry.lead_market_maker = data[55] if data[55] else np.nan
        return industry

    def _parse_bool(self, text):
        """Parse a non-empty flag field.

        ``bool(text)`` is True for every non-empty string, including 'False'
        and '0'. This method returns False for common false spellings. Any
        other non-empty text is still treated as True, as before.
        """
        return text.strip().lower() not in ('false', 'f', '0', 'n', 'no')

    def _parse_dictionary(self, data):
        """Parse ``'key=value;key=value'`` into ``{key: float(value)}``.

        Empty segments, e.g. from a trailing ';', are skipped. Only the first
        '=' in a segment separates the key from the value.
        """
        pairs = (segment.split('=', 1) for segment in data.split(';') if segment)
        return {k: float(v) for k, v in pairs}
# region imports
from AlgorithmImports import *
# endregion
# region imports
from enum import StrEnum
# endregion
class FlowStrat(StrEnum):
    """Trading strategies selectable by the fund-flow algorithm.

    Each member's value is a human-readable label. Because this is a
    ``StrEnum``, ``str(member)`` yields that label.
    Members whose names end in ``_STOP5`` are the variants for which
    ``stop_percentage_of`` returns a 0.05 stop loss; all others get ``None``.
    """
    # Long-only variants.
    TOP10_NAV_LONG = 'Highest NAV Top 10 Long'
    TOP10_NAV_LONG_FLOWS_NEG = 'Highest NAV Flows Out Top 10 Long'
    TOP10_NAV_LONG_FLOWS_POS = 'Highest NAV Flows In Top 10 Long'
    TOP10_NAV_LONG_FLOWS_NEG_STOP5 = 'Highest NAV Flows Out Top 10 Long 5% stop loss'
    TOP10_NAV_LONG_FLOWS_POS_STOP5 = 'Highest NAV Flows In Top 10 Long 5% stop loss'
    # Long/short variants: one side from inflows, the other from outflows.
    TOP10_NAV_LS_FLOWS_POSNEG_STOP5 = 'Highest NAV Top 5 Flows In Long Flows Out Short 5% stop loss'
    TOP10_NAV_LS_FLOWS_NEGPOS_STOP5 = 'Highest NAV Top 5 Flows In Short Flows Out Long 5% stop loss'
    # Long/short variants whose direction and size come from the flow/return correlation.
    TOP10_NAV_LS_FLOWS_1DFWDRHO_INSIGHT = 'Highest NAV Top 5 Flow Predicts Returns'
    TOP10_NAV_LS_FLOWS_1DFWDRHO_INSIGHT_STOP5 = 'Highest NAV Top 5 Flow Predicts Returns 5% stop loss'
def stop_percentage_of(strat):
    """Return the stop-loss fraction for ``strat``.

    The result is 0.05 for the stop-loss (``_STOP5``) variants and ``None``
    for every other strategy.
    """
    stop_loss_strategies = (
        FlowStrat.TOP10_NAV_LONG_FLOWS_NEG_STOP5,
        FlowStrat.TOP10_NAV_LONG_FLOWS_POS_STOP5,
        FlowStrat.TOP10_NAV_LS_FLOWS_POSNEG_STOP5,
        FlowStrat.TOP10_NAV_LS_FLOWS_NEGPOS_STOP5,
        FlowStrat.TOP10_NAV_LS_FLOWS_1DFWDRHO_INSIGHT_STOP5,
    )
    return 0.05 if strat in stop_loss_strategies else None