Overall Statistics
Total Orders
389
Average Win
0.32%
Average Loss
-0.28%
Compounding Annual Return
55.894%
Drawdown
5.900%
Expectancy
0.209
Start Equity
1000000000
End Equity
1118417299.61
Net Profit
11.842%
Sharpe Ratio
2.144
Sortino Ratio
3.114
Probabilistic Sharpe Ratio
72.304%
Loss Rate
44%
Win Rate
56%
Profit-Loss Ratio
1.16
Alpha
0.319
Beta
-0.174
Annual Standard Deviation
0.162
Annual Variance
0.026
Information Ratio
1.956
Tracking Error
0.263
Treynor Ratio
-2.005
Total Fees
$3920940.54
Estimated Strategy Capacity
$13000000.00
Lowest Capacity Asset
XSD TFYQNA7D69UT
Portfolio Turnover
89.40%
# region imports
from AlgorithmImports import *
# endregion

from significant_correlations_for_lag import significant_correlations_for_lag

def get_etfs_with_significant_correlation(qb, fn, df_etf_industry, df_ret, df_fund_flow):
    """Collect the significant flow/return correlations for several lags and persist them.

    For every lag in ``[1, 2, 7, 14, 30]`` this calls
    ``significant_correlations_for_lag``, shows the pending matplotlib figure
    and displays the per-lag table.  The per-lag tables are then concatenated,
    sorted, written as CSV text to the object store under ``fn`` and returned.

    Args:
        qb: Object exposing ``object_store.save(key, text)``.
        fn: Object-store key the CSV text is saved under.
        df_etf_industry: Passed through to ``significant_correlations_for_lag``.
        df_ret: Passed through to ``significant_correlations_for_lag``.
        df_fund_flow: Passed through to ``significant_correlations_for_lag``.

    Returns:
        The concatenated DataFrame of significant correlations.
    """
    per_lag_frames = []
    for lag in [1, 2, 7, 14, 30]:
        df = significant_correlations_for_lag(lag, df_etf_industry, df_ret, df_fund_flow)
        plt.show()
        display(df)
        per_lag_frames.append(df)
    # The final ordering is by correlation; the preceding (ticker, lag) sort is
    # kept from the original call chain.
    df_etfs_significant_correlation = (
        pd.concat(per_lag_frames)
        .sort_values(by=['ticker', 'lag'])
        .sort_values(by='correlation')
    )
    # Named ``csv_text`` (not ``str``) so the builtin is not shadowed.
    csv_text = df_etfs_significant_correlation.to_csv()
    qb.object_store.save(fn, csv_text)
    return df_etfs_significant_correlation
# region imports
from AlgorithmImports import *
# endregion

# Your New Python File
from tqdm import tqdm
import numpy as np
from scipy.stats import pearsonr
from plot_empirical_distribution import plot_empirical_distribution

def significant_correlations_for_lag(lag, df_etf_industry, df_ret, df_fund_flow):
    """Correlate each ETF's daily net flow with its return ``lag`` rows later.

    For every ticker in ``df_etf_industry.symbol`` the return rows
    (``df_ret``: ``TICKER``, ``DATE``, ``TRAILING_1D_RET``) are joined with the
    flow rows (``df_fund_flow``: ``Ticker``, ``Date``, ``NetDailyFlow``) on
    date.  The flow series is then paired with the return series shifted by
    ``lag`` rows and the Pearson correlation and its p-value are computed.
    Tickers with fewer than two usable pairs, or with NaN in either series,
    are skipped.

    Side effects: prints a summary line and plots the empirical distribution
    of all finite correlations via ``plot_empirical_distribution``.

    Args:
        lag: Non-negative number of rows the return series is shifted forward.
        df_etf_industry: DataFrame with a ``symbol`` column of tickers.
        df_ret: Return DataFrame (see columns above).
        df_fund_flow: Fund-flow DataFrame (see columns above).

    Returns:
        DataFrame with columns ``ticker``, ``lag``, ``correlation``,
        ``p-value`` holding only the correlations with p < 0.05, sorted by
        correlation.
    """
    significance_level = 0.05

    # Calculate correlations and p-values
    tickers = []
    correlations = []
    p_values = []
    for ticker in tqdm(df_etf_industry.symbol):
        rets = df_ret[df_ret.TICKER == ticker]
        flows = df_fund_flow[df_fund_flow.Ticker == ticker].reset_index(drop=True)
        flows = flows[flows.Date.isin(rets.DATE)]
        rets = rets[rets.DATE.isin(flows.Date)]
        flows_rets = pd.merge(rets, flows, left_on='DATE', right_on='Date')

        # Number of (flow, lagged return) pairs.  pearsonr needs at least two
        # observations, so shorter histories are skipped instead of raising.
        n_pairs = len(flows_rets) - lag
        if n_pairs < 2:
            continue
        # Slice by explicit length: ``values[0:-lag]`` would be empty for lag == 0.
        F = flows_rets.NetDailyFlow.values[:n_pairs]
        R = flows_rets.TRAILING_1D_RET.values[lag:]
        if np.any(np.isnan(F)) or np.any(np.isnan(R)):
            continue

        rho, p_value = pearsonr(F, R)
        tickers.append(ticker)
        correlations.append(rho)
        p_values.append(p_value)

    # Determine significance (done before ``correlations`` is sorted below so
    # the three lists are still aligned).
    significant_correlations = [
        (ticker, lag, rho, p)
        for ticker, rho, p in zip(tickers, correlations, p_values)
        if p < significance_level
    ]

    # Output results
    correlations.sort()
    print(f"Number of significant correlations (p < {significance_level}):", len(significant_correlations), 'out of', len(correlations))
    A = np.array([x for x in correlations if np.isfinite(x)])
    plot_empirical_distribution(A, f'ETF correlation of 0D flow with {lag}D return', 'Correlation')
    df = pd.DataFrame(significant_correlations, columns=['ticker', 'lag', 'correlation', 'p-value']).sort_values('correlation')
    return df
# region imports
from AlgorithmImports import *
# endregion

import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm, gaussian_kde

def plot_empirical_distribution(A, title, quantity):
    """Plot a density histogram of ``A`` with a Gaussian KDE and a fitted normal curve.

    Args:
        A: Sample values to plot.
        title: Figure title.
        quantity: Label for the x axis.

    The figure is shown with ``plt.show()``; nothing is returned.
    """
    plt.figure(figsize=(8, 6))

    # Density-normalised histogram; only the bin edges are needed afterwards.
    _, edges, _ = plt.hist(A, bins=30, density=True, alpha=0.5, color='g', label='Empirical PDF')

    # Kernel density estimate, evaluated across the histogram's x range.
    density = gaussian_kde(A)
    grid = np.linspace(min(edges), max(edges), 1000)
    plt.plot(grid, density(grid), color='blue', label='Gaussian KDE')

    # Normal curve using the parameters returned by ``norm.fit``.
    mean, sigma = norm.fit(A)
    plt.plot(grid, norm.pdf(grid, mean, sigma), color='red', linewidth=2, label='Normal Distribution')

    # Axis labelling and legend.
    plt.title(title)
    plt.xlabel(quantity)
    plt.ylabel('Density')
    plt.legend(loc='upper right')

    plt.show()
# region imports
from AlgorithmImports import *
from FlowStrat import FlowStrat, stop_percentage_of
from CalmarAlgorithm import CalmarAlgorithm
from io import StringIO
from fund_flow import FundFlow
# endregion

class ETFGFundFlowsAlgorithm(CalmarAlgorithm):
    """Daily ETF strategy that selects and weights ETFs from fund-flow data.

    The universe is built from ``FundFlow`` rows: ETFs are ranked by NAV,
    filtered by the sign of their net flow according to ``self.strategy`` and
    (for the "1DFWDRHO" strategies) weighted by a pre-computed flow/return
    correlation read from the object store (``dfrho_ericson.csv``).
    """

    def __init__(self):
        super().__init__()

    def initialize(self):
        """Configure dates, cash, universe, schedule and the active strategy."""
        self.df_flow_ret_corr = None
        self.get_flow_ret_corr()
        # Alternative: FlowStrat.TOP10_NAV_LS_FLOWS_1DFWDRHO_INSIGHT_STOP5
        self.strategy = FlowStrat.TOP10_NAV_LS_FLOWS_1DFWDRHO_INSIGHT
        # historical_cutoff for universe calibration was 20220701, so don't trade before that to avoid future information

        in_sample = True
        if in_sample:                        # In-sample period
            self.set_start_date(2022, 7, 1)
            self.set_end_date(2022, 10, 1)
        else:
            self.set_start_date(2024, 6, 1)  # Out of sample period
            self.set_end_date(2024, 9, 1)

        self.starting_cash = 1_000_000_000
        self.set_cash(self.starting_cash)
        self.set_warm_up(timedelta(1))
        self.set_security_initializer(BrokerageModelSecurityInitializer(self.brokerage_model,
                    FuncSecuritySeeder(self.get_last_known_prices)))
        self.universe_settings.resolution = Resolution.DAILY
        self._universe = self.add_universe(FundFlow, 'FundFlow', Resolution.DAILY, self._select_assets)
        spy = Symbol.create('SPY', SecurityType.EQUITY, Market.USA)
        self.schedule.on(
            self.date_rules.every_day(spy),
            self.time_rules.before_market_open(spy, 10), self._rebalance)
        self.get_etf_industry()
        # Only tickers that have a correlation history are eligible.
        self.restrict_to = self.df_flow_ret_corr.Ticker.unique()
        self.set_strategy_name()
        # The argument is unused by set_stop_percentage; this call only
        # initialises ``self.stop_loss_percentage`` before any data arrives.
        self.set_stop_percentage(None)

    def set_strategy_name(self):
        """Name the backtest "<strategy> <YYYYMMDD start>-<YYYYMMDD end>"."""
        ss = str(self.strategy)
        sd = str(self.start_date)[0:10].replace('-','')
        se = str(self.end_date)[0:10].replace('-','')
        self.set_name(f"{ss} {sd}-{se}")

    def _select_assets(self, data):
        """Universe selector: return the symbols of the (at most 10) chosen ETFs.

        Also stores the per-ticker correlations for the current date in
        ``self.correlations`` and the chosen ``FundFlow`` rows in ``self.etfs``.
        """
        current_date = self.Time.date()
        self.correlations = {etf.symbol.value: self.get_correlation(etf.symbol.value, current_date) for etf in data}
        # Eligible ETFs, sorted by ascending NAV so that the tail holds the largest.
        self.etfs = list(sorted([etf for etf in data
                                 if etf.symbol.value in self.restrict_to \
                                    and self.correlations[etf.symbol.value] != 0 \
                                    and etf.nav > 0], key=lambda etf: etf.nav))
        if self.strategy in [FlowStrat.TOP10_NAV_LONG_FLOWS_NEG, FlowStrat.TOP10_NAV_LONG_FLOWS_NEG_STOP5]:
            self.etfs = [etf for etf in self.etfs if etf.fund_flow < 0]
        elif self.strategy in [FlowStrat.TOP10_NAV_LONG_FLOWS_POS_STOP5, FlowStrat.TOP10_NAV_LONG_FLOWS_POS]:
            self.etfs = [etf for etf in self.etfs if etf.fund_flow > 0]
        elif self.strategy in [FlowStrat.TOP10_NAV_LS_FLOWS_POSNEG_STOP5,
                               FlowStrat.TOP10_NAV_LS_FLOWS_NEGPOS_STOP5,
                               FlowStrat.TOP10_NAV_LS_FLOWS_1DFWDRHO_INSIGHT,
                               FlowStrat.TOP10_NAV_LS_FLOWS_1DFWDRHO_INSIGHT_STOP5]:
            # Five highest-NAV ETFs with inflows plus five with outflows.
            top_5_positive_flow = [etf for etf in self.etfs if etf.fund_flow > 0][-5:]
            top_5_negative_flow = [etf for etf in self.etfs if etf.fund_flow < 0][-5:]
            self.etfs = top_5_positive_flow + top_5_negative_flow
        self.etfs = self.etfs[-10:]
        symbols = [etf.symbol for etf in self.etfs]
        return symbols

    def _rebalance(self):
        """Scheduled rebalance; dispatches on ``self.strategy``."""
        if not self._universe.selected:
            return
        symbols = [symbol for symbol in self._universe.selected if self.securities[symbol].price]
        if not symbols:
            return
        if self.strategy in [FlowStrat.TOP10_NAV_LS_FLOWS_POSNEG_STOP5, FlowStrat.TOP10_NAV_LS_FLOWS_NEGPOS_STOP5]:
            return self._rebalance_TOP10_NAV_LS_FLOWS_POSNEG_STOP5()
        elif self.strategy in [FlowStrat.TOP10_NAV_LS_FLOWS_1DFWDRHO_INSIGHT,
                               FlowStrat.TOP10_NAV_LS_FLOWS_1DFWDRHO_INSIGHT_STOP5]:
            return self._rebalance_TOP10_NAV_LS_FLOWS_1DFWDRHO_INSIGHT()
        # Long-only strategies: equal weight, 90% invested.  All targets are
        # submitted in one call; calling set_holdings per symbol with
        # liquidation enabled would liquidate the other target symbols too.
        weight = 0.9 / len(symbols)
        targets = [PortfolioTarget(symbol, weight) for symbol in symbols]
        self.set_holdings(targets, liquidate_existing_holdings=True)

    def _rebalance_TOP10_NAV_LS_FLOWS_POSNEG_STOP5(self):
        """Long/short rebalance by flow sign (sides swapped for the NEGPOS strategy)."""
        top_5_positive_flow = [etf for etf in self.etfs if etf.fund_flow > 0]
        top_5_negative_flow = [etf for etf in self.etfs if etf.fund_flow < 0]
        if self.strategy == FlowStrat.TOP10_NAV_LS_FLOWS_POSNEG_STOP5:
            pos, neg = top_5_positive_flow, top_5_negative_flow
        else:
            neg, pos = top_5_positive_flow, top_5_negative_flow
        targets = []
        # Each side is sized independently; an empty side is skipped instead
        # of dividing by zero.
        if pos:
            weight = 0.9 / len(pos)
            targets.extend(PortfolioTarget(etf.symbol, weight) for etf in pos)
        if neg:
            weight = 0.9 / len(neg)
            targets.extend(PortfolioTarget(etf.symbol, -weight) for etf in neg)
        if targets:
            # Single call so that one target does not liquidate another.
            self.set_holdings(targets, liquidate_existing_holdings=True)

    def _rebalance_TOP10_NAV_LS_FLOWS_1DFWDRHO_INSIGHT(self):
        """Long/short rebalance weighted by the flow/return correlation.

        An ETF goes long when flow and correlation have the same sign and
        short otherwise.  Within each side the weights are proportional to
        ``|rho|`` (long weights sum to 1, short weights to -1).
        """
        top_5_positive_flow = [etf for etf in self.etfs if etf.fund_flow > 0]
        top_5_negative_flow = [etf for etf in self.etfs if etf.fund_flow < 0]
        longs = []
        shorts = []
        for etf in top_5_positive_flow:
            rho = self.correlations[etf.symbol.value]
            if rho > 0: # and flow > 0
                longs.append((rho, etf))
            else: # rho < 0 and flow > 0
                shorts.append((-rho, etf))
        for etf in top_5_negative_flow:
            rho = self.correlations[etf.symbol.value]
            if rho > 0: # and flow < 0
                shorts.append((rho, etf))
            else: # rho < 0 and flow < 0
                longs.append((-rho, etf))
        targets = []
        sum_long_rho = sum([rho for rho,etf in longs])
        if sum_long_rho > 0:
            for rho, etf in longs:
                weight = rho/sum_long_rho
                targets.append(PortfolioTarget(etf.symbol, weight))
        sum_short_rho = sum([rho for rho,etf in shorts])
        if sum_short_rho > 0:
            for rho, etf in shorts:
                weight = -rho/sum_short_rho
                targets.append(PortfolioTarget(etf.symbol, weight))
        if targets:  # if no information just coast
            self.set_holdings(targets, liquidate_existing_holdings=True)

    def on_data(self, data):
        """Record equity (base class) and refresh the stop orders."""
        super().on_data(data)
        self.set_stop_percentage(data)

    def set_stop_percentage(self, data):
        """Place a stop-market order for every open position, if the strategy uses stops.

        ``data`` is accepted for call-site compatibility and is not used.
        """
        self.stop_loss_percentage = stop_percentage_of(self.strategy)
        if self.stop_loss_percentage is None:
            return
        for symbol, security in self.securities.items():
            if not security.invested:
                continue
            # This runs on every on_data call, so the previous stop for the
            # symbol is cancelled first; otherwise one extra full-size stop
            # would pile up per call and several could fill together.
            # NOTE(review): assumes no other working orders exist for the
            # symbol when data arrives -- confirm for intraday resolutions.
            self.transactions.cancel_open_orders(symbol)
            # Longs stop below the current price, shorts above it.
            pm = -1. if security.holdings.quantity > 0 else 1.
            stop_price = security.price * (1 + pm * self.stop_loss_percentage)
            self.stop_market_order(symbol, -security.holdings.quantity, stop_price)

    def get_etf_industry(self):
        """Load ``ericson_df_etf_industry.csv`` from the object store into ``self.df_etf_industry``."""
        fn = 'ericson_df_etf_industry.csv'
        S = self.object_store.read(fn)
        self.df_etf_industry = pd.read_csv(StringIO(S))

    def get_flow_ret_corr(self):
        """Return the flow/return correlation table, loading it on first use.

        The table is read from ``dfrho_ericson.csv`` and its ``Date`` column is
        converted from ``YYYY-MM-DD`` text to ``date`` objects.
        """
        if self.df_flow_ret_corr is not None:
            return self.df_flow_ret_corr
        S = self.object_store.read('dfrho_ericson.csv')
        self.df_flow_ret_corr = pd.read_csv(StringIO(S))
        self.df_flow_ret_corr.Date = self.df_flow_ret_corr.Date.apply(lambda text: datetime.strptime(str(text), '%Y-%m-%d').date())
        self.debug(f"RHO goes to {str(self.df_flow_ret_corr.iloc[-1].Date)}")
        return self.df_flow_ret_corr

    def get_correlation(self, sym, current_date):
        """Return ``CorrRF`` for ``sym`` on ``current_date``, or 0 when no row matches."""
        table = self.df_flow_ret_corr
        rho = table[(table.Date == current_date) & (table.Ticker == sym)]
        if len(rho):
            rho = rho.iloc[0].CorrRF
        else:
            rho = 0
        return rho
# region imports
from AlgorithmImports import *
# endregion


class FundFlow(PythonData):
    """Custom data type that parses daily ETF fund-flow CSV rows from the object store."""

    def get_source(self, config, date, is_live):
        """Point at the object-store CSV that holds the fund-flow rows for ``date``."""
        key = f"etfg_fundflow_us/{date.year}/{date.strftime('%Y%m%d')}_fundflow_v2.csv"
        return SubscriptionDataSource(
            key,
            SubscriptionTransportMedium.OBJECT_STORE,
            FileFormat.CSV
        )

    def reader(self, config, line, date, is_live):
        """Parse one CSV line into a ``FundFlow`` instance, or ``None`` to skip it.

        Empty fields are dropped before indexing, so a row with any missing
        value ends up with fewer than six fields and is skipped.
        """
        fields = [field for field in line.split(',') if field]
        if len(fields) < 6:
            return None

        ticker = fields[2]
        # Skip tickers QC Cloud is not expected to know: those containing a
        # space, or whose generated identifier dates before 1998.
        if ' ' in ticker:
            return None
        security_id = SecurityIdentifier.generate_equity(ticker, Market.USA, True, None, date)
        if security_id.date.year < 1998:
            return None

        row = FundFlow()
        row.symbol = Symbol(security_id, ticker)
        row.shares_outstanding = float(fields[3])
        row.nav = float(fields[4])
        row.fund_flow = float(fields[5])
        # The row covers one day starting at the date in the first field.
        row.time = datetime.strptime(fields[0], '%Y-%m-%d')
        row.end_time = row.time + timedelta(1)
        return row
# region imports
from AlgorithmImports import *
# endregion

from list_csv_files_in_object_store import list_csv_files_in_object_store
from io import StringIO

def get_df_fund_flows(qb, symbols, fn):
    """Load every fund-flow CSV under ``etfg_fundflow_us/`` and keep the given tickers.

    Args:
        qb: Object exposing ``object_store.read`` / ``object_store.save``.
        symbols: Tickers to keep (matched against the ``Ticker`` column).
        fn: Object-store key the filtered table is saved under.  A falsy value
            falls back to ``'df_fund_flow_ericson.csv'``, the key that was
            previously always used.

    Returns:
        The concatenated, filtered fund-flow DataFrame.

    Raises:
        ValueError: From ``pd.concat`` when no CSV file is found.
    """
    columns = ['ETFGDate', 'Date', 'Ticker', 'FundSharesOutstanding', 'NAV', 'NetDailyFlow']
    flow_dfs = []
    # The loop variable is deliberately not called ``fn``: reusing that name
    # used to overwrite the caller's save key.
    for key in list_csv_files_in_object_store(qb, 'etfg_fundflow_us/'):
        flows = qb.object_store.read(key)
        df = pd.read_csv(StringIO(flows), header=None)
        df.columns = columns
        flow_dfs.append(df)
    df_fund_flow = pd.concat(flow_dfs)
    df_fund_flow = df_fund_flow[df_fund_flow.Ticker.isin(symbols)]
    save_key = fn or 'df_fund_flow_ericson.csv'
    qb.object_store.save(save_key, df_fund_flow.to_csv())
    return df_fund_flow
# region imports
from AlgorithmImports import *
from collections import defaultdict
# endregion

class CalmarAlgorithm(QCAlgorithm):
    """Base algorithm that tracks equity over time and reports a Calmar ratio at the end.

    Subclasses are expected to set ``self.starting_cash`` (read by
    ``calculate_annualized_return``).
    """

    def __init__(self):
        super().__init__()
        self.debug("Calmar Init!!")
        self.opening_prices = {}
        # Maps algorithm time -> total portfolio value at that time.
        self.equity_over_time = defaultdict(float)

    def on_data(self, data):
        """Record the current total portfolio value under the current time."""
        self.equity_over_time[self.time] = self.portfolio.total_portfolio_value

    def on_end_of_algorithm(self):
        """Flatten the book, then publish the Calmar ratio as a runtime statistic."""
        # Open orders are cancelled first, then every position is closed.
        self.transactions.cancel_open_orders()
        self.debug("All open orders cancelled.")
        self.liquidate()
        self.debug("All positions have been liquidated.")

        # Calmar ratio = annualized return / |maximum drawdown|.
        yearly_return = self.calculate_annualized_return()
        worst_drawdown = self.calculate_max_drawdown()
        calmar_ratio = yearly_return / abs(worst_drawdown)
        self.set_runtime_statistic("Calmar Ratio", round(calmar_ratio, 2))
        self.debug(f'Calmar Ratio: {calmar_ratio}')

    def calculate_annualized_return(self) -> float:
        """Return the compound annual growth rate between start and end date."""
        growth_factor = self.portfolio.total_portfolio_value / self.starting_cash
        elapsed_years = (self.end_date - self.start_date).days / 365.25
        return growth_factor ** (1 / elapsed_years) - 1

    def calculate_max_drawdown(self) -> float:
        """Return the largest peak-to-trough drop as a fraction, never below 0.001."""
        running_peak = -float('inf')
        # The 0.001 floor keeps the Calmar division away from zero.
        worst = 0.001
        for equity in self.equity_over_time.values():
            running_peak = max(running_peak, equity)
            worst = max(worst, (running_peak - equity) / running_peak)
        return worst
# region imports
from AlgorithmImports import *
# endregion

def list_csv_files_in_object_store(qb, folder: str) -> list[str]:
    """Return the object-store keys that start with ``folder`` and end in ``.csv``.

    Args:
        qb: Object exposing ``object_store.keys`` (an iterable of key strings).
        folder: Key prefix to match, e.g. ``'etfg_fundflow_us/'``.

    Returns:
        The matching keys, in the order the object store yields them.
    """
    return [
        key
        for key in qb.object_store.keys
        if key.startswith(folder) and key.endswith('.csv')
    ]
# region imports
from AlgorithmImports import *
# endregion

from tqdm import tqdm

def aggregate_flows_and_returns_at_sector_level(etf_sector, df_ret, df_fund_flow, df_terms_correlated=None):
    """Aggregate ETF flows and returns into one (Date, Sector) series.

    Each ETF's shares outstanding and net daily flow are scaled by the ETF's
    exposure factor for a sector.  Per sector and date the scaled flows are
    summed and the returns are averaged with weights
    ``FundSharesOutstanding * NAV``.

    Args:
        etf_sector: Mapping ``ticker -> {sector: factor}``.
        df_ret: Return table with four columns.  NOTE: its columns are renamed
            in place to ``Date, Ticker, Ret2D, Close``.
        df_fund_flow: Flow table with ``Date``, ``Ticker``,
            ``FundSharesOutstanding``, ``NAV`` and ``NetDailyFlow`` columns.
        df_terms_correlated: Optional table whose ``symbol`` column lists the
            tickers to aggregate.  The function previously read this name
            without defining it; when omitted, every ticker in ``etf_sector``
            is used.

    Returns:
        DataFrame with columns ``Date``, ``Sector``, ``Flow``, ``Ret``.
    """
    df_ret.columns = ['Date', 'Ticker', 'Ret2D', 'Close']
    df_ret_flow = pd.merge(df_fund_flow, df_ret, on = ['Date', 'Ticker'])
    sectors = []
    for key in etf_sector:
        sectors.extend([sector for sector in etf_sector[key]])
    sectors = list(set(sectors))
    sectors.sort()

    tickers = list(etf_sector) if df_terms_correlated is None else df_terms_correlated.symbol

    df_sector_ret_flow = []
    for ticker in tqdm(tickers):
        # The ticker's rows do not depend on the sector, so filter once.
        flows = df_ret_flow[df_ret_flow.Ticker == ticker]
        for sector, factor in etf_sector[ticker].items():
            for i, row in flows.iterrows():
                df_sector_ret_flow.append((row.Date,
                                            sector,
                                            row.FundSharesOutstanding * factor,
                                            row.NAV,
                                            row.NetDailyFlow * factor,
                                            row.Ret2D,
                                            row.Close))

    df_sector_ret_flow = pd.DataFrame(df_sector_ret_flow,
            columns = ['Date', 'Sector', 'FundSharesOutstanding', 'NAV', 'NetDailyFlow', 'Ret2D', 'Close'])
    rows = []
    for sector in tqdm(sectors):
        df = df_sector_ret_flow[df_sector_ret_flow.Sector == sector].copy()
        dates = df.Date.unique()
        for date in dates:
            df1 = df[df.Date == date].copy()
            # Weight each ETF's return by its (scaled) shares * NAV.
            df1['sharenav'] = df1['FundSharesOutstanding'] * df1.NAV
            sumsharenav = df1['sharenav'].sum()
            df1['Ret'] = df1.Ret2D * df1['sharenav'] / sumsharenav
            df1 = df1[['Date', 'Sector', 'NetDailyFlow', 'Ret']]
            df2 = df1.groupby(['Date', 'Sector']).agg({
                    'NetDailyFlow': 'sum',
                    'Ret': 'sum'}).reset_index()
            rows.extend(df2.values.tolist())
    df_sector_flow = pd.DataFrame(rows, columns = ['Date', 'Sector', 'Flow', 'Ret'])
    return df_sector_flow
# region imports
from AlgorithmImports import *
# endregion

from tqdm import tqdm

def aggregate_flows_and_returns_at_country_level(etf_country, df_ret, df_fund_flow, df_terms_correlated):
    """Aggregate ETF flows and returns into one (Date, Country) series.

    Each ETF's shares outstanding and net daily flow are scaled by the ETF's
    exposure factor for a country.  Per country and date the scaled flows are
    summed and the returns are averaged with weights
    ``FundSharesOutstanding * NAV``.

    Args:
        etf_country: Mapping ``ticker -> {country: factor}``.
        df_ret: Return table with four columns; its columns are renamed in
            place to ``Date, Ticker, Ret2D, Close``.
        df_fund_flow: Flow table with ``Date``, ``Ticker``,
            ``FundSharesOutstanding``, ``NAV`` and ``NetDailyFlow`` columns.
        df_terms_correlated: Table whose ``symbol`` column lists the tickers
            to aggregate.

    Returns:
        DataFrame with columns ``Date``, ``Country``, ``Flow``, ``Ret``.
    """
    df_ret.columns = ['Date', 'Ticker', 'Ret2D', 'Close']
    merged = pd.merge(df_fund_flow, df_ret, on=['Date', 'Ticker'])

    # Sorted, de-duplicated names of every country in any ETF's exposure map.
    country_names = set()
    for key in etf_country:
        country_names.update(etf_country[key])
    countries = sorted(country_names)

    # One record per (ETF row, country) with the exposure factor applied.
    records = []
    for ticker in tqdm(df_terms_correlated.symbol):
        for country, factor in etf_country[ticker].items():
            ticker_rows = merged[merged.Ticker == ticker]
            records.extend(
                (row.Date,
                 country,
                 row.FundSharesOutstanding * factor,
                 row.NAV,
                 row.NetDailyFlow * factor,
                 row.Ret2D,
                 row.Close)
                for _, row in ticker_rows.iterrows()
            )
    per_country = pd.DataFrame(
        records,
        columns=['Date', 'Country', 'FundSharesOutstanding', 'NAV', 'NetDailyFlow', 'Ret2D', 'Close'])

    rows = []
    for country in tqdm(countries):
        country_frame = per_country[per_country.Country == country].copy()
        for date in country_frame.Date.unique():
            day = country_frame[country_frame.Date == date].copy()
            # Weight each ETF's return by its (scaled) shares * NAV.
            day['sharenav'] = day['FundSharesOutstanding'] * day.NAV
            total_sharenav = day['sharenav'].sum()
            day['Ret'] = day.Ret2D * day['sharenav'] / total_sharenav
            summary = (
                day[['Date', 'Country', 'NetDailyFlow', 'Ret']]
                .groupby(['Date', 'Country'])
                .agg({'NetDailyFlow': 'sum', 'Ret': 'sum'})
                .reset_index()
            )
            rows.extend(summary.values.tolist())
    return pd.DataFrame(rows, columns=['Date', 'Country', 'Flow', 'Ret'])
# region imports
from AlgorithmImports import *
# endregion

def get_df_ret_for_universe(qb, symbols, fn, years=(2022, 2023, 2024)):
    """Load the yearly ETF return files, keep the given tickers and save the result.

    Reads ``etf_returns_<year>.txt`` (pipe-separated, with a header row) from
    the object store for each year, concatenates them, filters on the
    ``TICKER`` column and stores the CSV text under ``fn``.

    Args:
        qb: Object exposing ``object_store.read`` / ``object_store.save``.
        symbols: Tickers to keep.
        fn: Object-store key the filtered table is saved under.
        years: Years whose return files are read, in order.

    Returns:
        The filtered return DataFrame.
    """
    # Imported locally: StringIO is not imported explicitly alongside this function.
    from io import StringIO

    yearly_frames = []
    for year in years:
        text = qb.object_store.read(f'etf_returns_{year}.txt')
        yearly_frames.append(pd.read_csv(StringIO(text), sep='|'))
    df_ret = pd.concat(yearly_frames)
    df_ret = df_ret[df_ret.TICKER.isin(symbols)]
    qb.object_store.save(fn, df_ret.to_csv())
    return df_ret
# region imports
from AlgorithmImports import *
# endregion

def calculate_corr(df, window, F, R):
    """Add a rolling one-step-ahead correlation column ``CorrRF`` to ``df``.

    For row ``i >= window`` the value is the correlation between column ``F``
    over rows ``i-window .. i-1`` and column ``R`` over rows
    ``i-window+1 .. i`` (each ``F`` value is paired with the ``R`` value one
    row later).  The first ``window`` rows, and any non-finite correlation,
    are set to 0.

    Args:
        df: DataFrame, modified in place.
        window: Number of (F, R) pairs per correlation.
        F: Name of the leading column.
        R: Name of the lagging column.

    Returns:
        ``df`` with the ``CorrRF`` column added.
    """
    lead = df[F].to_numpy()
    lagged = df[R].to_numpy()
    n_rows = len(df)

    # Zero padding for rows without a full window; capped at the frame length
    # so short frames do not cause a length mismatch on assignment.
    corr_values = [0] * min(window, n_rows)
    for i in range(window, n_rows):
        # Fresh Series get a 0..window-1 index, so the pairing is positional.
        # Calling .corr on slices of the original columns would align them on
        # their index labels and cancel the one-row shift.
        x = pd.Series(lead[i - window:i])
        y = pd.Series(lagged[i - window + 1:i + 1])
        corr = x.corr(y)
        corr = corr if np.isfinite(corr) else 0
        corr_values.append(corr)
    df['CorrRF'] = corr_values
    return df
# region imports
from AlgorithmImports import *
# endregion

def split_dataframe_by_ticker(df):
    """Return one sub-frame per distinct ``Ticker`` value, in ``groupby`` order."""
    frames = []
    for _ticker, frame in df.groupby('Ticker'):
        frames.append(frame)
    return frames
# region imports
from AlgorithmImports import *
from io import StringIO 
from etf_industry import ETFIndustry
# endregion

def get_tickers_in_scope(qb, fn):
    """
    Build the table of ETFs in scope and save it to the object store under ``fn``.

    * Get ETF industry data (history of the ``ETFIndustry`` custom data from
      2022-01-02 to 2022-01-04), restricted to the 'Equity' asset class.
    * Keep only rows where average daily trading volume, options availability
      and put/call ratio are present.
    * Exclude smaller, less relevant ETFs: rows whose AUM is not above the
      value at the 10% position of the sorted AUM values.
    * Exclude levered ETFs (rows with a ``levered_amount``).

    Returns the resulting DataFrame, sorted by ``aum``.
    """
    etf_industry = qb.add_data(ETFIndustry, 'ETFIndustry').symbol
    df_etf_industry = qb.history(etf_industry, datetime(2022, 1, 2), datetime(2022, 1, 4), Resolution.DAILY)
    df_etf_industry = df_etf_industry[df_etf_industry.asset_class == 'Equity'].reset_index()

    # Rows must carry a value in each of these columns.
    for required in ("avg_daily_trading_volume", "options_available", "put_call_ratio"):
        df_etf_industry = df_etf_industry[df_etf_industry[required].notna()]

    # Columns kept for the saved table.
    df_etf_industry = df_etf_industry[["symbol","aum","avg_daily_trading_volume","bid_ask_spread","category","currency_exposure","description",
        "development_class","focus","geographic_exposure","industry_group_exposure","is_etn","levered_amount","management_fee",
        "maturity_exposure","net_expenses","num_holdings","options_volume","other_expenses","primary_benchmark","put_call_ratio",
        "put_volume","region","sector_exposure","short_interest","subindustry_exposure","total_expenses"]]

    ### Exclude bottom 10% by NAV
    # sort_values returns a new Series, so the frame's own ``aum`` column is
    # left untouched (an in-place sort of ``.values`` can reorder the column
    # underneath the frame and detach AUM values from their symbols).
    aum = df_etf_industry['aum'].sort_values().to_numpy()
    if len(aum):
        cutoff = int(len(aum) * .1)
        pct10 = aum[cutoff]
        plt.plot(aum[cutoff:])
        df_etf_industry = df_etf_industry[df_etf_industry.aum > pct10]
    df_etf_industry = df_etf_industry.sort_values(by='aum').reset_index()
    ### Exclude thematic or highly leveraged ETFs
    df_etf_industry = df_etf_industry[df_etf_industry.levered_amount.isnull()]
    syms = df_etf_industry.to_csv()
    qb.object_store.save(fn, syms)
    return df_etf_industry
# region imports
from AlgorithmImports import *
# endregion


class ETFReturn(PythonData):
    """Custom data type that parses pipe-separated ETF return rows from the object store."""

    def get_source(self, config: SubscriptionDataConfig, date: datetime, is_live: bool):
        """Point at the yearly return file ``etf_returns_<year>.txt`` for ``date``."""
        return SubscriptionDataSource(
            f"etf_returns_{date.year}.txt",
            SubscriptionTransportMedium.OBJECT_STORE,
            FileFormat.CSV
        )

    def reader(self, config: SubscriptionDataConfig, line: str, date: datetime, is_live: bool):
        """Parse one line into an ``ETFReturn``, or return ``None`` to skip it.

        Lines that are empty or do not start with a digit (such as a header
        row) and lines with fewer than four non-empty fields are skipped.
        """
        # ``not line`` guards the ``line[0]`` lookup against empty lines.
        if not line or not line[0].isdigit():
            return None
        data = [x for x in line.split('|') if x]
        if len(data) < 4:
            return None
        # Check if QC Cloud has this asset.
        security_id = SecurityIdentifier.generate_equity(data[1], Market.USA, True, None, date)
        if security_id.date.year < 1998:
            return None
        etf = ETFReturn()
        etf.symbol = Symbol(security_id, data[1])
        etf.trailing_1d_return = float(data[2])
        etf.value = float(data[3])
        etf.end_time = datetime.strptime(data[0], '%Y%m%d')
        return etf
# region imports
from AlgorithmImports import *
# endregion

import csv

class ETFIndustry(PythonData):
    """Custom data type that parses ETF industry/reference CSV rows from the object store.

    Each row is expected to carry at least 56 comma-separated fields; empty
    fields become ``np.nan`` (``None`` for ``fiscal_year_end`` and
    ``listing_exchange``).
    """

    def get_source(self, config: SubscriptionDataConfig, date: datetime, is_live: bool):
        """Point at the object-store CSV that holds the industry rows for ``date``."""
        return SubscriptionDataSource(
            f"etfg_industry_us/{date.year}/{date.strftime('%Y%m%d')}_industries_v2.csv",
            SubscriptionTransportMedium.OBJECT_STORE,
            FileFormat.CSV
        )

    def reader(self, config: SubscriptionDataConfig, line: str, date: datetime, is_live: bool):
        """Parse one CSV line into an ``ETFIndustry``, or return ``None`` to skip it."""
        # csv.reader is used (rather than str.split) so quoted fields that
        # contain commas stay intact.
        rows = list(csv.reader(line.splitlines()))
        if not rows:
            return None
        data = rows[0]
        # Fields up to index 55 are read below; shorter rows are skipped.
        if len(data) < 56:
            return None
        # Check if QC Cloud has this asset.
        if ' ' in data[2]:
            return None
        security_id = SecurityIdentifier.generate_equity(data[2], Market.USA, True, None, date)
        if security_id.date.year < 1998:
            return None
        industry = ETFIndustry()
        industry.time = datetime.strptime(data[0], '%Y-%m-%d')
        industry.end_time = industry.time + timedelta(1)
        industry.symbol = Symbol(security_id, data[2])

        industry.issuer = data[3] if data[3] else np.nan
        industry.description = data[4] if data[4] else np.nan
        industry.inception_date = datetime.strptime(data[5], '%Y-%m-%d') if data[5] else np.nan
        industry.primary_benchmark = data[6] if data[6] else np.nan
        industry.tax_classification = data[7] if data[7] else np.nan
        industry.is_etn = self._parse_bool(data[8]) if data[8] else np.nan
        industry.aum = float(data[9]) if data[9] else np.nan
        industry.avg_daily_trading_volume = float(data[10]) if data[10] else np.nan
        industry.asset_class = data[11] if data[11] else np.nan
        industry.category = data[12] if data[12] else np.nan
        industry.focus = data[13] if data[13] else np.nan
        industry.development_class = data[14] if data[14] else np.nan
        industry.region = data[15] if data[15] else np.nan
        industry.is_levered = self._parse_bool(data[16]) if data[16] else np.nan
        industry.levered_amount = float(data[17]) if data[17] else np.nan
        industry.is_active = self._parse_bool(data[18]) if data[18] else np.nan
        industry.administrator = data[19] if data[19] else np.nan
        industry.advisor = data[20] if data[20] else np.nan
        industry.custodian = data[21] if data[21] else np.nan
        industry.distributor = data[22] if data[22] else np.nan
        industry.portfolio_manager = data[23] if data[23] else np.nan
        industry.subadvisor = data[24] if data[24] else np.nan
        industry.transfer_agent = data[25] if data[25] else np.nan
        industry.trustee = data[26] if data[26] else np.nan
        industry.futures_commission_merchant = data[27] if data[27] else np.nan
        industry.fiscal_year_end = data[28] if data[28] else None
        industry.distribution_frequency = data[29] if data[29] else np.nan
        industry.listing_exchange = data[30] if data[30] else None
        industry.creation_unit_size = float(data[31]) if data[31] else np.nan
        industry.creation_fee = data[32] if data[32] else np.nan
        industry.geographic_exposure = self._parse_dictionary(data[33]) if data[33] else np.nan
        # Stored as the string form of the parsed dictionary.
        industry.currency_exposure = str(self._parse_dictionary(data[34])) if data[34] else np.nan
        industry.sector_exposure = self._parse_dictionary(data[35]) if data[35] else np.nan
        industry.industry_group_exposure = self._parse_dictionary(data[36]) if data[36] else np.nan
        industry.industry_exposure = self._parse_dictionary(data[37]) if data[37] else np.nan
        industry.subindustry_exposure = self._parse_dictionary(data[38]) if data[38] else np.nan
        industry.coupon_exposure = self._parse_dictionary(data[39]) if data[39] else np.nan
        industry.maturity_exposure = self._parse_dictionary(data[40]) if data[40] else np.nan
        industry.options_available = self._parse_bool(data[41]) if data[41] else np.nan
        industry.options_volume = float(data[42]) if data[42] else np.nan
        industry.short_interest = float(data[43]) if data[43] else np.nan
        industry.put_call_ratio = float(data[44]) if data[44] else np.nan
        industry.num_holdings = float(data[45]) if data[45] else np.nan
        industry.discount_premium = float(data[46]) if data[46] else np.nan
        industry.bid_ask_spread = float(data[47]) if data[47] else np.nan
        industry.put_volume = float(data[48]) if data[48] else np.nan
        industry.call_volume = float(data[49]) if data[49] else np.nan
        industry.management_fee = float(data[50]) if data[50] else np.nan
        industry.other_expenses = float(data[51]) if data[51] else np.nan
        industry.total_expenses = float(data[52]) if data[52] else np.nan
        industry.fee_waivers = float(data[53]) if data[53] else np.nan
        industry.net_expenses = float(data[54]) if data[54] else np.nan
        industry.lead_market_maker = data[55] if data[55] else np.nan

        return industry

    @staticmethod
    def _parse_bool(text):
        """Interpret a non-empty flag field as a boolean.

        ``bool(text)`` is True for every non-empty string, including
        ``"False"`` and ``"0"``, so the common spellings are decoded
        explicitly.  Numeric text is True when non-zero.  Any other non-empty
        token keeps the earlier "present means True" result.
        """
        token = text.strip().lower()
        if token in ('true', 't', 'yes', 'y'):
            return True
        if token in ('false', 'f', 'no', 'n'):
            return False
        try:
            return float(token) != 0
        except ValueError:
            return True

    def _parse_dictionary(self, data):
        """Parse ``"key=value;key=value"`` text into a ``{key: float}`` dict.

        Empty segments (for example after a trailing ``;``) are ignored.
        """
        return {k: float(v) for k, v in (pair.split('=') for pair in data.split(';') if pair)}
# region imports
from AlgorithmImports import *
# endregion
# region imports
from enum import StrEnum
# endregion

class FlowStrat(StrEnum):
    """Identifiers of the available fund-flow strategies.

    Each member's value is a human-readable description.  The members whose
    name ends in ``_STOP5`` are the ones ``stop_percentage_of`` maps to a
    0.05 stop percentage.
    """
    # Long-only variants.
    TOP10_NAV_LONG = 'Highest NAV Top 10 Long'
    TOP10_NAV_LONG_FLOWS_NEG = 'Highest NAV Flows Out Top 10 Long'
    TOP10_NAV_LONG_FLOWS_POS = 'Highest NAV Flows In Top 10 Long'
    TOP10_NAV_LONG_FLOWS_NEG_STOP5 = 'Highest NAV Flows Out Top 10 Long 5% stop loss'
    TOP10_NAV_LONG_FLOWS_POS_STOP5 = 'Highest NAV Flows In Top 10 Long 5% stop loss'
    # Long/short variants keyed on the sign of the flow.
    TOP10_NAV_LS_FLOWS_POSNEG_STOP5 = 'Highest NAV Top 5 Flows In Long Flows Out Short 5% stop loss'
    TOP10_NAV_LS_FLOWS_NEGPOS_STOP5 = 'Highest NAV Top 5 Flows In Short Flows Out Long 5% stop loss'
    # Long/short variants described as "Flow Predicts Returns".
    TOP10_NAV_LS_FLOWS_1DFWDRHO_INSIGHT = 'Highest NAV Top 5 Flow Predicts Returns'
    TOP10_NAV_LS_FLOWS_1DFWDRHO_INSIGHT_STOP5 = 'Highest NAV Top 5 Flow Predicts Returns 5% stop loss'

def stop_percentage_of(strat):
    """Return the stop-loss fraction for ``strat``: 0.05 for the stop-loss strategies, otherwise ``None``."""
    strategies_with_stop = (
        FlowStrat.TOP10_NAV_LONG_FLOWS_NEG_STOP5,
        FlowStrat.TOP10_NAV_LONG_FLOWS_POS_STOP5,
        FlowStrat.TOP10_NAV_LS_FLOWS_POSNEG_STOP5,
        FlowStrat.TOP10_NAV_LS_FLOWS_NEGPOS_STOP5,
        FlowStrat.TOP10_NAV_LS_FLOWS_1DFWDRHO_INSIGHT_STOP5,
    )
    return 0.05 if strat in strategies_with_stop else None