| Overall Statistics |
|
Total Orders 197 Average Win 0.50% Average Loss -0.27% Compounding Annual Return 81.225% Drawdown 10.100% Expectancy 1.215 Start Equity 10000000 End Equity 14120863.45 Net Profit 41.209% Sharpe Ratio 2.056 Sortino Ratio 3.037 Probabilistic Sharpe Ratio 72.093% Loss Rate 22% Win Rate 78% Profit-Loss Ratio 1.83 Alpha 0.586 Beta -0.123 Annual Standard Deviation 0.275 Annual Variance 0.075 Information Ratio 1.345 Tracking Error 0.295 Treynor Ratio -4.599 Total Fees $0.00 Estimated Strategy Capacity $29000000.00 Lowest Capacity Asset BTCUSD 2XR Portfolio Turnover 3.82% |
# region imports
from AlgorithmImports import *
from transformers import AutoTokenizer
from transformers import pipeline
import joblib
import talib
from helper_function import *
import pytz
import pandas as pd
# endregion
class MySlippageModel:
    """Custom slippage model: price-proportional, scaled by order size."""

    def GetSlippageApproximation(self, asset: Security, order: Order) -> float:
        """Return estimated slippage: 1 bp of price times log10(2 * |quantity|)."""
        size_factor = np.log10(2 * float(order.AbsoluteQuantity))
        return asset.Price * 0.0001 * size_factor
class FatGreenHorse(QCAlgorithm):
    """Main algo: a pre-trained random forest scores BTC/ETH daily from
    technical indicators plus FinBERT news sentiment; positions are sized
    with a capped, importance-weighted Kelly fraction."""

    def Initialize(self):
        """Configure the backtest window, instruments, data feeds, models and runtime state."""
        # INS in-sample
        self.SetStartDate(2022, 4, 10)
        self.SetEndDate(2023, 3, 31)
        # Out of Sample (OOS) 1
        # self.SetStartDate(2021, 5, 14)
        # self.SetEndDate(2021, 8, 10)
        # OOS 2
        # self.SetStartDate(2021, 9, 10)
        # self.SetEndDate(2021, 12, 10)
        self.SetCash(1000000)  # Setting initial Cash
        self.SetWarmUp(30)  # Warm up for 30 days
        # Adding instruments
        self.AddEquity("SPY", Resolution.Daily)
        self.btc_security = self.AddCrypto("BTCUSD", Resolution.Daily)
        self.btc_symbol = self.btc_security.symbol
        self.eth_security = self.AddCrypto("ETHUSD", Resolution.Daily)
        self.sol_security = self.AddCrypto("SOLUSD", Resolution.Daily)
        # "coin" = Coinbase equity ticker; it anchors the Tiingo news feed below
        self.symbol = self.AddEquity("coin", Resolution.Daily).Symbol
        # Slippage (uncomment for slippage)
        # self.btc_security.SetSlippageModel(MySlippageModel())
        # Adding data sources
        self.dataset_symbol = self.AddData(TiingoNews, self.symbol, Resolution.Daily).Symbol
        self.treas = self.add_data(USTreasuryYieldCurveRate, "USTYCR", Resolution.Daily).symbol
        self.vix = self.add_data(CBOE, "VIX", Resolution.Daily).symbol
        # setting up FINBERT
        bert_res = self.set_up_bert()
        self.pipe = bert_res[0]
        self.tokenizer = bert_res[1]
        self.signal_model = self.load_rf_model()
        # Initializing empty DF for Sentiment scores
        self.btc_sentiment_df = pd.DataFrame()
        self.coin_sentiment_df = pd.DataFrame()
        # One model prediction appended per processed trading day
        self.predicted_15dars = []
        # Initializing runtime flags
        self.first_run = True
        self.bought_btc = False
        self.shorted_btc = False
        self.old_portfolio_value = 0
        self.days_portfolio_decline = 0

    def set_up_bert(self) -> tuple:
        """Load FinBERT from the QC object store.

        Returns:
            tuple: (text-classification pipeline, tokenizer).
        """
        path = self.ObjectStore.GetFilePath("mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis")
        pipe = pipeline("text-classification", model=path)
        # Smoke-test the pipeline so a broken model surfaces in the log at startup
        self.debug(f'{pipe("bitcoin pushes to fresh record high after breaching $62,000 yesterday")}')
        tokenizer = AutoTokenizer.from_pretrained(path)
        return pipe, tokenizer

    def load_rf_model(self):
        """Load the pre-trained Random Forest signal model from the QC object store."""
        path = self.ObjectStore.GetFilePath("group_4_crypto_trading_with_sentiment_sprin_2024/random_forest_model.pkl")
        return joblib.load(path)

    def OnData(self, data: Slice):
        """Daily pipeline: indicators -> news sentiment -> RF prediction -> position sizing."""
        if self.IsWarmingUp:
            return
        # Risk management 1 - If value drops by more than 5% from previous OnData call, Liquidate
        if self.portfolio.TotalPortfolioValue < self.old_portfolio_value * 0.95:
            self.debug(f"Liquidating at: {self.time}")
            self.Liquidate()
        # Check if data contains BTCUSD info
        if data.ContainsKey("BTCUSD"):
            # Getting necessary historical BTC data for passing to our RF ML model
            df = self.History(self.btc_symbol, 37).droplevel(0)
            # Extracting technical indicators
            ti = technical_indicators(df)
            stoch_fastk = ti['STOCH_FASTK'].values[-1]
            stoch_fastd = ti['STOCH_FASTD'].values[-1]
            stoch_fast_d_rolling_10 = ti['STOCH_FASTD'].rolling(10).sum().values[-1]
            # AROONOSC appears in two indicator families, so ti['AROONOSC'] is
            # two columns wide; [-1][-1] takes the last row's last copy.
            aroonosc = ti['AROONOSC'].values[-1][-1]
            mfi = ti['MFI'].values[-1]
            roc = ti['ROC'].values[-1]
            rsi = ti['RSI'].values[-1]
            # NOTE(review): this "ROC" rolling feature is computed from the RSI
            # column, not ROC. Left as-is because the RF model was presumably
            # trained with this exact feature definition - confirm before changing.
            roc_rolling_10 = ti['RSI'].rolling(10).sum().values[-1]
            willr = ti['WILLR'].values[-1]
            mom_rolling_10 = ti['MOM'].rolling(10).sum().values[-1]
            natr = ti['NATR'].values[-1]
            mom = ti['MOM'].values[-1]
            cmo = ti['CMO'].values[-1]
            willr_rolling_10 = ti['WILLR'].rolling(10).sum().values[-1]
            macdhist = ti['MACDHIST'].values[-1]
            plus_di_rolling_10 = ti['PLUS_DI'].rolling(10).sum().values[-1]
            macdhist_rolling_10 = ti['MACDHIST'].rolling(10).sum().values[-1]
            plus_di = ti['PLUS_DI'].values[-1]
            stoch_fastk_rolling_10 = ti['STOCH_FASTK'].rolling(10).sum().values[-1]
            cci = ti['CCI'].values[-1]
            ultosc = ti['ULTOSC'].values[-1]
            minus_di = ti['MINUS_DI'].values[-1]
            ht_phasor_quad_rolling_10 = ti['HT_PHASOR_quadrature'].rolling(10).sum().values[-1]
            trange_rolling_10 = ti['TRANGE'].rolling(10).sum().values[-1]
            ht_dcphase = ti['HT_DCPHASE'].values[-1]
            # (Removed unused locals: ULTOSC_rolling_10, mfi_rolling_10, macd,
            # cci_rolling_10, and a duplicate STOCH_FASTD rolling sum - none
            # were fed to the model.)
            # Getting last 37 days of News data from TIINGO
            history_coin = process_coinbase(self.History(self.dataset_symbol, 37, Resolution.Daily).droplevel(0))
            bitcoin_news = history_coin[1]
            coinbase_news = history_coin[0]
            # Filtering new news data since the last day
            if not self.first_run:
                new_btc_news = bitcoin_news[bitcoin_news.index > self.last_day.replace(tzinfo=pytz.UTC)]
                new_coin_news = coinbase_news[coinbase_news.index > self.last_day.replace(tzinfo=pytz.UTC)]
                self.last_day = self.Time
            else:
                new_btc_news = bitcoin_news
                new_coin_news = coinbase_news
                self.last_day = self.Time
                self.first_run = False
            # Analyzing sentiment of new news data
            new_btc_news, new_coin_news = get_sentiment(self, new_btc_news, new_coin_news)
            # Concatenating new news data with existing data
            if new_btc_news.shape[0] > 0:
                self.btc_sentiment_df = pd.concat([self.btc_sentiment_df, new_btc_news], ignore_index=True)
            # BUGFIX: this guard previously checked new_btc_news, so coinbase
            # rows could be appended/skipped based on the wrong frame.
            if new_coin_news.shape[0] > 0:
                self.coin_sentiment_df = pd.concat([self.coin_sentiment_df, new_coin_news], ignore_index=True)
            # Calculating sentiment scores
            sentiment_scores = get_sentiment_scores(self.btc_sentiment_df, self.coin_sentiment_df)
            coinbase_neutral_count_rolling_10 = sentiment_scores[0]
            coinbase_positive_avg_score_rolling_10 = sentiment_scores[1]
            coinbase_positive_avg_score_rolling_30 = sentiment_scores[2]
            btc_total_news_score_rolling_30 = sentiment_scores[3]
            coinbase_total_news_score = sentiment_scores[4]
            # Feature row in the exact column order the RF model was trained on
            indicators_order = [aroonosc, roc_rolling_10, mom_rolling_10, roc, mfi, willr, rsi, cmo,
                                mom, natr, willr_rolling_10, macdhist, stoch_fast_d_rolling_10,
                                plus_di, plus_di_rolling_10, stoch_fastk_rolling_10, cci,
                                stoch_fastk, macdhist_rolling_10, btc_total_news_score_rolling_30,
                                ultosc, coinbase_positive_avg_score_rolling_10, stoch_fastd,
                                ht_phasor_quad_rolling_10, coinbase_total_news_score,
                                coinbase_neutral_count_rolling_10, minus_di, trange_rolling_10,
                                coinbase_positive_avg_score_rolling_30, ht_dcphase]
            df = pd.DataFrame([indicators_order], columns=['AROONOSC', 'ROC_rolling_10', 'MOM_rolling_10', 'ROC',
                                                           'MFI', 'WILLR', 'RSI', 'CMO', 'MOM', 'NATR',
                                                           'WILLR_rolling_10', 'MACDHIST', 'STOCH_FASTD_rolling_10',
                                                           'PLUS_DI', 'PLUS_DI_rolling_10', 'STOCH_FASTK_rolling_10',
                                                           'CCI', 'STOCH_FASTK', 'MACDHIST_rolling_10',
                                                           'btc_total_news_score_rolling_30', 'ULTOSC',
                                                           'coinbase_positive_avg_score_rolling_10', 'STOCH_FASTD',
                                                           'HT_PHASOR_quadrature_rolling_10', 'coinbase_total_news_score',
                                                           'coinbase_neutral_count_rolling_10', 'MINUS_DI', 'TRANGE_rolling_10',
                                                           'coinbase_positive_avg_score_rolling_30', 'HT_DCPHASE'])
            # using the above features to get our signal for tomorrow
            preds = self.signal_model.predict(df)
            self.predicted_15dars.append(preds[0])
            # Implementing main strategy based on predicted signal
            # BUGFIX: guard was `>= 0` (always true); `> 0` expresses the intent
            # and is equivalent here since a prediction was just appended.
            if len(self.predicted_15dars) > 0:
                last_1_day = self.predicted_15dars[-1]
                buy = 0
                sell = 0
                if last_1_day > 0.01:
                    buy = 1
                    sell = 0
                elif last_1_day < -0.005:
                    buy = 0
                    sell = 1
                # Getting importance of prediction from custom importance function
                importance = abs(apply_importance_function(last_1_day))
                importance += 0.4
                # Clamp importance into [0.51, 0.99]
                if importance < 0.51:
                    importance = 0.51
                elif importance > 1:
                    importance = 0.99
                # Getting bet size based on importance and modified kelly kriterion
                kelly_fraction = (1 * abs(importance - (1 - importance))) / 1
                if kelly_fraction > 0.8:
                    kelly_fraction = 0.8
                elif kelly_fraction < 0:
                    kelly_fraction = 0
                # Trading based on strategy: 80/20 BTC/ETH split of the Kelly bet
                if (buy == 1):
                    self.SetHoldings("BTCUSD", kelly_fraction * 0.8)
                    self.SetHoldings("ETHUSD", kelly_fraction * 0.2)
                    # self.SetHoldings("SOLUSD", kelly_fraction * 0.2)
                    self.bought_btc = True
                    self.shorted_btc = False
                elif (sell == 1):
                    # We take less risk when going short
                    self.SetHoldings("BTCUSD", -0.5 * kelly_fraction * 0.8)
                    self.SetHoldings("ETHUSD", -0.5 * kelly_fraction * 0.2)
                    # self.SetHoldings("SOLUSD", -0.5 * kelly_fraction * 0.2)
                    self.bought_btc = False
                    self.shorted_btc = True
                self.debug(f"time: {self.time}, pred: {preds[0]}, importance: {importance}, kelly_fraction: {kelly_fraction}")
        # Liquidate if portfolio value falls for nine consecutive days - Risk management 2
        if (self.Portfolio.TotalPortfolioValue < self.old_portfolio_value) and not self.first_run:
            self.days_portfolio_decline += 1
        else:
            self.days_portfolio_decline = 0
        if self.days_portfolio_decline >= 9:
            self.Liquidate()
        self.old_portfolio_value = self.Portfolio.TotalPortfolioValue
        current_date = self.Time
        # Check if it's the last day of the backtest
        if current_date.date() >= self.EndDate.date():
            self.Liquidate()  # Liquidate all positions
from AlgorithmImports import *
from transformers import AutoTokenizer
from transformers import pipeline
import joblib
import talib
from helper_function import *
import pytz
import pandas as pd
# endregion
class BuyAndHoldBitcoinEthAlgorithm(QCAlgorithm):
    """Benchmark: buy BTC/ETH 80/20 once, hold, liquidate on the final day."""

    def Initialize(self):
        """Set the backtest window, starting cash and crypto universe."""
        # INS in-sample
        # self.SetStartDate(2022, 4, 10)
        # self.SetEndDate(2023,3,31)
        # Out of Sample (OOS) 1
        self.SetStartDate(2021, 5, 14)
        # self.SetEndDate(2021, 8, 10)
        # OOS 2
        # self.SetStartDate(2021, 9, 10)
        self.SetEndDate(2021, 12, 10)
        self.SetCash(10000000)
        # Add Bitcoin data
        self.AddCrypto("BTCUSD", Resolution.Daily)
        self.AddCrypto("ETHUSD", Resolution.Daily)
        # Flag to check if Bitcoin has been bought
        self.bought_btc = False

    def OnData(self, data):
        """Enter the 80/20 position once, then liquidate on the last backtest day."""
        if not self.bought_btc:
            # Buy Bitcoin & ETH 80/20
            self.SetHoldings("BTCUSD", 0.8)
            self.SetHoldings("ETHUSD", 0.2)
            self.bought_btc = True
        # Check if it's the last day of the backtest.
        # BUGFIX: `self.Time.date` was missing the call parentheses, so a bound
        # method was being compared to a date (TypeError at runtime).
        if self.Time.date() >= self.EndDate.date():
            self.Liquidate()  # Liquidate all positions
# # region imports
from AlgorithmImports import *
from QuantConnect.DataSource import *
class CoinAPIDataAlgorithm(QCAlgorithm):
    """Minimal CoinAPI demo: subscribe to Coinbase BTCUSD and log daily closes."""

    def Initialize(self) -> None:
        """Set up a decade-long window and one daily BTCUSD subscription."""
        self.SetStartDate(2014, 1, 1)
        self.SetEndDate(2024, 4, 20)
        self.SetCash(100000)
        # Coinbase accepts Cash account type only, AccountType.Margin will result in an exception.
        self.SetBrokerageModel(BrokerageName.Coinbase, AccountType.Cash)
        self.symbol = self.AddCrypto("BTCUSD", Resolution.Daily, Market.Coinbase).Symbol

    def OnData(self, slice: Slice) -> None:
        """Log the close of each BTCUSD trade bar that arrives."""
        if self.symbol not in slice.Bars:
            return
        trade_bar = slice.Bars[self.symbol]
        self.Debug(f"{self.symbol} close at {slice.Time}: {trade_bar.Close}")
        # Quote bars and ticks could be handled analogously via
        # slice.QuoteBars / slice.Ticks if those feeds were subscribed.
#region imports
from AlgorithmImports import *
from transformers import AutoTokenizer
from transformers import pipeline
import joblib
import talib
#endregion
def get_news_count_d(df, label) -> pd.DataFrame:
    """Count news rows per day carrying a given sentiment label.

    Args:
        df: frame with 'label', 'date_dt' and 'News' columns.
        label: sentiment label to filter on (e.g. 'positive').

    Returns:
        pd.DataFrame with columns ['date_dt', f'{label}_count'].
        (Return annotation fixed: this returns a DataFrame, not an int.)
    """
    news_count = (
        df[df["label"] == label].groupby(["date_dt"]).count()["News"].reset_index()
    )
    news_count.columns = ["date_dt", f"{label}_count"]
    return news_count
# function to get average sentiment score
def get_avg_sentiment_d(df, label) -> pd.DataFrame:
    """Average sentiment score per day for one label.

    Args:
        df: frame with 'label', 'date_dt' and 'score' columns.
        label: sentiment label to filter on (e.g. 'positive').

    Returns:
        pd.DataFrame with columns ['date_dt', f'{label}_avg_score'].
        (Return annotation fixed: this returns a DataFrame, not a float.)
    """
    avg_sentiment = (
        df[df["label"] == label].groupby(["date_dt"])["score"].mean().reset_index()
    )
    avg_sentiment.columns = ["date_dt", f"{label}_avg_score"]
    return avg_sentiment
# making a function to return setiement
def get_bert_sentiment(pipe, news_description) -> pd.Series:
    """Score one news text with the FinBERT pipeline.

    Returns a Series of [label, score, combined_label] where combined_label
    is +1 for 'positive', -1 for 'negative', 0 otherwise; bad pipeline
    output (empty result list) yields ['BAD Data', 0, 0].
    """
    label_to_int = {'positive': 1, 'negative': -1}
    try:
        first = pipe(news_description)[0]  # pipeline returns a list of dicts
        label = first['label']
        score = first['score']
        return pd.Series([label, score, label_to_int.get(label, 0)])
    except IndexError:
        # Pipeline produced no result for this text
        return pd.Series(['BAD Data', 0, 0])
def process_coinbase(df) -> tuple:
    """Clean Tiingo news rows and split into (coinbase, bitcoin) frames.

    Args:
        df: frame with at least 'articleid' and 'description' columns.

    Returns:
        tuple: (coinbase, bitcoin) where ``coinbase`` holds all cleaned,
        deduplicated, lower-cased news rows and ``bitcoin`` only those
        mentioning 'bitcoin' or 'btc'. (Return annotation fixed: this
        returns a 2-tuple, not a single DataFrame.)
    """
    coinbase = df.dropna(subset=['articleid', 'description'])
    # Drop placeholder descriptions (a single space)
    coinbase = coinbase[coinbase['description'] != ' ']
    # .copy() so the inplace dedupe below doesn't trigger SettingWithCopyWarning
    coinbase = coinbase[['articleid', 'description']].copy()
    # Dedupe BEFORE lower-casing: case-variant duplicates intentionally survive
    coinbase.drop_duplicates(subset=['articleid', 'description'], inplace=True)
    # turn all News to lowercase
    coinbase['description'] = coinbase['description'].str.lower()
    # only keep row if bitcoin or btc is in string
    bitcoin = coinbase[coinbase['description'].str.contains('bitcoin|btc')]
    return coinbase, bitcoin
def compute_momentum_indicators(df) -> pd.DataFrame:
    """Build the TA-Lib momentum-indicator set for an OHLCV frame."""
    # Raw numpy arrays for TA-Lib
    cl = df['close'].values
    hi = df['high'].values
    lo = df['low'].values
    op = df['open'].values
    vol = df['volume'].values
    # Multi-output indicators are unpacked first
    macd, macdsignal, macdhist = talib.MACD(cl, fastperiod=12, slowperiod=26, signalperiod=9)
    fastk, fastd = talib.STOCHF(hi, lo, cl, fastk_period=5, fastd_period=3, fastd_matype=0)
    stochrsi_k, stochrsi_d = talib.STOCHRSI(cl, timeperiod=14, fastk_period=5, fastd_period=3, fastd_matype=0)
    results = {
        'ADX': talib.ADX(hi, lo, cl, timeperiod=14),
        'APO': talib.APO(cl, fastperiod=12, slowperiod=26, matype=0),
        'AROONOSC': talib.AROONOSC(hi, lo, timeperiod=14),
        'BOP': talib.BOP(op, hi, lo, cl),
        'CCI': talib.CCI(hi, lo, cl, timeperiod=14),
        'CMO': talib.CMO(cl, timeperiod=14),
        'MACD': macd,
        'MACDSIGNAL': macdsignal,
        'MACDHIST': macdhist,
        'MFI': talib.MFI(hi, lo, cl, vol, timeperiod=14),
        'MOM': talib.MOM(cl, timeperiod=10),
        'RSI': talib.RSI(cl, timeperiod=14),
        'STOCH_FASTK': fastk,
        'STOCH_FASTD': fastd,
        'STOCHRSI_K': stochrsi_k,
        'STOCHRSI_D': stochrsi_d,
        'TRIX': talib.TRIX(cl, timeperiod=30),
        'ULTOSC': talib.ULTOSC(hi, lo, cl, timeperiod1=7, timeperiod2=14, timeperiod3=28),
        'WILLR': talib.WILLR(hi, lo, cl, timeperiod=14),
    }
    # Keep the caller's index so indicators align with the price rows
    return pd.DataFrame(results, index=df.index)
def compute_extended_volume_indicators(df) -> pd.DataFrame:
    """Compute accumulation/distribution and on-balance-volume indicators."""
    # Float arrays for TA-Lib (volume especially must be float)
    hi = df['high'].astype(float).values
    lo = df['low'].astype(float).values
    cl = df['close'].astype(float).values
    vol = df['volume'].astype(float).values
    frame = pd.DataFrame(index=df.index)
    frame['AD'] = talib.AD(hi, lo, cl, vol)
    frame['ADOSC'] = talib.ADOSC(hi, lo, cl, vol, fastperiod=3, slowperiod=10)
    frame['OBV'] = talib.OBV(cl, vol)
    return frame
def compute_trend_indicators(df) -> pd.DataFrame:
    """Compute moving-average, directional-movement and other trend indicators."""
    hi = df['high'].astype(float).values
    lo = df['low'].astype(float).values
    cl = df['close'].astype(float).values
    # Multi-output calls unpacked up front
    mama_line, fama_line = talib.MAMA(cl, fastlimit=0.5, slowlimit=0.05)
    aroon_down, aroon_up = talib.AROON(hi, lo, timeperiod=14)
    results = {
        # Moving averages
        'SMA': talib.SMA(cl, timeperiod=14),
        'EMA': talib.EMA(cl, timeperiod=14),
        'WMA': talib.WMA(cl, timeperiod=14),
        'DEMA': talib.DEMA(cl, timeperiod=14),
        'TEMA': talib.TEMA(cl, timeperiod=14),
        'TRIMA': talib.TRIMA(cl, timeperiod=14),
        'KAMA': talib.KAMA(cl, timeperiod=14),
        'MAMA': mama_line,
        'FAMA': fama_line,
        'T3': talib.T3(cl, timeperiod=14, vfactor=0.7),
        # Directional movement
        'ADX': talib.ADX(hi, lo, cl, timeperiod=14),
        'PLUS_DI': talib.PLUS_DI(hi, lo, cl, timeperiod=14),
        'MINUS_DI': talib.MINUS_DI(hi, lo, cl, timeperiod=14),
        # Others
        'SAR': talib.SAR(hi, lo, acceleration=0.02, maximum=0.2),
        'AROON_DOWN': aroon_down,
        'AROON_UP': aroon_up,
        'AROONOSC': talib.AROONOSC(hi, lo, timeperiod=14),
    }
    return pd.DataFrame(results, index=df.index)
def compute_volatility_indicators(df) -> pd.DataFrame:
    """Compute Bollinger-band, ATR-family and Chandelier-exit volatility indicators."""
    hi = df['high'].astype(float).values
    lo = df['low'].astype(float).values
    cl = df['close'].astype(float).values
    bb_upper, bb_middle, bb_lower = talib.BBANDS(cl, timeperiod=20, nbdevup=2, nbdevdn=2, matype=0)
    # Chandelier exits (no direct TA-Lib function): 22-period extremes
    # offset by three times the 22-period ATR
    atr_22 = talib.ATR(hi, lo, cl, timeperiod=22)
    results = {
        'BBANDS_UPPER': bb_upper,
        'BBANDS_MIDDLE': bb_middle,
        'BBANDS_LOWER': bb_lower,
        'ATR': talib.ATR(hi, lo, cl, timeperiod=14),
        'NATR': talib.NATR(hi, lo, cl, timeperiod=14),
        'TRANGE': talib.TRANGE(hi, lo, cl),
        'CHANDELIER_EXIT_LONG': talib.MAX(hi, timeperiod=22) - (atr_22 * 3),
        'CHANDELIER_EXIT_SHORT': talib.MIN(lo, timeperiod=22) + (atr_22 * 3),
    }
    return pd.DataFrame(results, index=df.index)
def compute_price_transform_indicators(df) -> pd.DataFrame:
    """Compute price-transform indicators (weighted/typical/median/average price, ROC)."""
    op = df['open'].astype(float).values
    hi = df['high'].astype(float).values
    lo = df['low'].astype(float).values
    cl = df['close'].astype(float).values
    results = {
        'WCLPRICE': talib.WCLPRICE(hi, lo, cl),      # weighted close price
        'TYPPRICE': talib.TYPPRICE(hi, lo, cl),      # typical price
        'MEDPRICE': talib.MEDPRICE(hi, lo),          # median price
        'ROC': talib.ROC(cl, timeperiod=10),         # 10-period rate of change
        'AVGPRICE': talib.AVGPRICE(op, hi, lo, cl),  # average price
    }
    return pd.DataFrame(results, index=df.index)
def compute_cycle_indicators(df) -> pd.DataFrame:
    """Compute Hilbert-transform cycle indicators."""
    cl = df['close'].astype(float).values
    inphase, quadrature = talib.HT_PHASOR(cl)
    sine, leadsine = talib.HT_SINE(cl)
    results = {
        'HT_DCPERIOD': talib.HT_DCPERIOD(cl),    # dominant cycle period
        'HT_DCPHASE': talib.HT_DCPHASE(cl),      # dominant cycle phase
        'HT_PHASOR_inphase': inphase,            # phasor components
        'HT_PHASOR_quadrature': quadrature,
        'HT_SINE': sine,                         # sine wave
        'HT_LEADSINE': leadsine,
        'HT_TRENDMODE': talib.HT_TRENDMODE(cl),  # trend vs cycle mode
    }
    return pd.DataFrame(results, index=df.index)
def technical_indicators(df) -> pd.DataFrame:
    """Concatenate every indicator family into one wide DataFrame, column-wise."""
    families = (
        compute_momentum_indicators,
        compute_trend_indicators,
        compute_price_transform_indicators,
        compute_volatility_indicators,
        compute_cycle_indicators,
    )
    return pd.concat([family(df) for family in families], axis=1)
def get_sentiment(object_class, bitcoin_news, coinbase_news):
    """Annotate both news frames with FinBERT sentiment columns.

    ``object_class`` supplies ``.tokenizer`` and ``.pipe``. Empty frames
    are returned untouched.
    """
    def _annotate(news):
        """Tokenize/truncate, score with FinBERT, and add date helper columns."""
        news = news.rename(columns={'description': 'News'})
        # Truncate to the model's 512-token window, then decode back to text
        news['News'] = news['News'].apply(lambda x: object_class.tokenizer(x, truncation=True, max_length=512)['input_ids'])
        news['News'] = news['News'].apply(lambda x: object_class.tokenizer.decode(x))
        news[['label', 'score', 'combined_label']] = news['News'].apply(lambda x: get_bert_sentiment(object_class.pipe, x))
        news['Date'] = pd.to_datetime(news.index, format='mixed', utc=True)
        # Split out day and hour for later daily grouping
        news["date_dt"] = news['Date'].dt.date
        news["date_hr"] = news['Date'].dt.hour
        return news

    if len(bitcoin_news) != 0:
        bitcoin_news = _annotate(bitcoin_news)
    if len(coinbase_news) != 0:
        coinbase_news = _annotate(coinbase_news)
    return bitcoin_news, coinbase_news
def _daily_sentiment_table(news_df, prefix) -> pd.DataFrame:
    """Aggregate one annotated news frame into per-day sentiment columns.

    Builds per-label counts and average scores, derived totals and a
    signal column, and prefixes every column with ``prefix`` + '_'.
    (Extracted helper: the btc and coinbase halves of the original were
    duplicated line-for-line.)
    """
    merged = pd.merge(get_news_count_d(news_df, "positive"),
                      get_news_count_d(news_df, "negative"),
                      on="date_dt", how="outer")
    merged = pd.merge(merged, get_news_count_d(news_df, "neutral"), on="date_dt", how="outer")
    merged = pd.merge(merged, get_avg_sentiment_d(news_df, "positive"), on="date_dt", how="outer")
    merged = pd.merge(merged, get_avg_sentiment_d(news_df, "negative"), on="date_dt", how="outer")
    merged = pd.merge(merged, get_avg_sentiment_d(news_df, "neutral"), on="date_dt", how="outer")
    # Days missing a label get 0 counts/scores
    merged = merged.fillna(0)
    merged["total_news_count"] = (
        merged["positive_count"] + merged["negative_count"] + merged["neutral_count"]
    )
    # Net score: positive mass minus negative mass
    merged["total_news_score"] = (
        merged["positive_avg_score"] * merged["positive_count"]
    ) - (merged["negative_avg_score"] * merged["negative_count"])
    # NOTE(review): yields inf/NaN on days with zero polar news; the signal
    # column is not consumed downstream, so behavior is preserved as-is.
    merged["signal"] = (merged["positive_count"] - merged["negative_count"]) / (
        merged["positive_count"] + merged["negative_count"]
    )
    merged.columns = [f"{prefix}_{c}" for c in merged.columns]
    return merged


def get_sentiment_scores(bitcoin_news, coinbase_news):
    """Get latest-day sentiment scores for the model's five sentiment features.

    Returns:
        tuple: (coinbase_neutral_count_rolling_10,
                coinbase_positive_avg_score_rolling_10,
                coinbase_positive_avg_score_rolling_30,
                btc_total_news_score_rolling_30,
                coinbase_total_news_score)
    """
    merged_d = _daily_sentiment_table(bitcoin_news, "btc")
    merged_dc = _daily_sentiment_table(coinbase_news, "coinbase")
    # Rolling sums end at the most recent day; [-1] extracts today's value
    coinbase_neutral_count_rolling_10 = merged_dc['coinbase_neutral_count'].rolling(10).sum().values[-1]
    coinbase_positive_avg_score_rolling_10 = merged_dc['coinbase_positive_avg_score'].rolling(10).sum().values[-1]
    coinbase_positive_avg_score_rolling_30 = merged_dc['coinbase_positive_avg_score'].rolling(30).sum().values[-1]
    btc_total_news_score_rolling_30 = merged_d['btc_total_news_score'].rolling(30).sum().values[-1]
    coinbase_total_news_score = merged_dc['coinbase_total_news_score'].values[-1]
    return (coinbase_neutral_count_rolling_10, coinbase_positive_avg_score_rolling_10, coinbase_positive_avg_score_rolling_30,
            btc_total_news_score_rolling_30, coinbase_total_news_score)
def apply_importance_function(x):
    """Map a predicted return to an importance weight via a piecewise-linear curve.

    Saturates at -1 below -0.05 and at 1 above 0.05, with steeper slopes
    near zero on the negative side.
    """
    if x < -0.05:
        return -1
    if x < -0.004:
        return 16.433 * x - 0.1777
    if x < 0:
        return 60 * x
    if x < 0.01:
        return 40 * x
    if x <= 0.05:
        return 14.975 * x + 0.25
    return 1
from AlgorithmImports import *
from transformers import AutoTokenizer
from transformers import pipeline
import joblib
import talib
from helper_function import *
import pytz
import pandas as pd
# endregion
class MySlippageModel:
    """Slippage model proportional to price and to the log of order size."""

    def GetSlippageApproximation(self, asset: Security, order: Order) -> float:
        """Estimate slippage as price * 1bp * log10(2 * absolute quantity)."""
        quantity = float(order.AbsoluteQuantity)
        return asset.Price * 0.0001 * np.log10(2 * quantity)
class FatGreenHorse(QCAlgorithm):
"""Main Algo Class"""
    def Initialize(self):
        """Configure the OOS-1 backtest: window, cash, instruments, data feeds, models and runtime state."""
        # INS in-sample
        # self.SetStartDate(2022, 4, 10)
        # self.SetEndDate(2023,3,31)
        # Out of Sample (OOS) 1
        self.SetStartDate(2021, 5, 14)
        # self.SetEndDate(2021, 8, 10)
        # OOS 2
        # self.SetStartDate(2021, 9, 10)
        self.SetEndDate(2021, 12, 10)
        self.SetCash(10000000) # Setting initial Cash
        self.SetWarmUp(30) # Warm up for 30 days
        # Adding instruments
        self.AddEquity("SPY", Resolution.Daily)
        self.btc_security = self.AddCrypto("BTCUSD", Resolution.Daily)
        self.btc_symbol = self.btc_security.symbol
        self.eth_security = self.AddCrypto("ETHUSD", Resolution.Daily)
        self.sol_security = self.AddCrypto("SOLUSD", Resolution.Daily)
        # "coin" = Coinbase equity ticker; anchors the Tiingo news subscription below
        self.symbol = self.AddEquity("coin", Resolution.Daily).Symbol
        # Slippage model is ACTIVE in this OOS copy (commented out in the in-sample variant)
        self.btc_security.SetSlippageModel(MySlippageModel())
        # Adding data sources
        self.dataset_symbol = self.AddData(TiingoNews, self.symbol, Resolution.Daily).Symbol
        self.treas = self.add_data(USTreasuryYieldCurveRate, "USTYCR", Resolution.Daily).symbol
        self.vix = self.add_data(CBOE, "VIX", Resolution.Daily).symbol
        # setting up FINBERT
        bert_res = self.set_up_bert()
        self.pipe = bert_res[0]
        self.tokenizer = bert_res[1]
        self.signal_model = self.load_rf_model()
        # Initializing empty DF for Sentiment scores
        self.btc_sentiment_df = pd.DataFrame()
        self.coin_sentiment_df = pd.DataFrame()
        # One model prediction appended per processed trading day
        self.predicted_15dars = []
        # Initializing runtime flags
        self.first_run = True
        self.bought_btc = False
        self.shorted_btc = False
        self.old_portfolio_value = 0
        self.days_portfolio_decline = 0
def set_up_bert(self) -> tuple:
"""Loads FINBERT from QC object store"""
path = self.ObjectStore.GetFilePath("mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis")
pipe = pipeline("text-classification", model=path)
self.debug(f'{pipe("bitcoin pushes to fresh record high after breaching $62,000 yesterday")}')
tokenizer = AutoTokenizer.from_pretrained(path)
return pipe, tokenizer
def load_rf_model(self):
"""Loads trained Random Forest Model from QC object store"""
path = self.ObjectStore.GetFilePath("group_4_crypto_trading_with_sentiment_sprin_2024/random_forest_model.pkl")
return joblib.load(path)
def OnData(self, data: Slice):
    """Daily handler: build TA + sentiment features, predict, size and trade.

    Per bar: (1) single-bar drawdown risk check, (2) TA-Lib features from 37
    days of BTC bars, (3) FinBERT sentiment on news that arrived since the
    previous call, (4) Random-Forest prediction mapped to long/short BTC+ETH
    weights via a capped Kelly fraction, (5) consecutive-decline and
    end-of-backtest liquidation.
    """
    if self.IsWarmingUp:
        return
    # Risk management 1 - If value drops by more than 5% from previous OnData call, Liquidate
    # NOTE(review): execution continues after liquidating, so positions may be
    # re-entered further down in this same call - confirm that is intended.
    if self.portfolio.TotalPortfolioValue < self.old_portfolio_value * 0.95:
        self.debug(f"Liquidating at: {self.time}")
        self.Liquidate()
    # Check if data contains BTCUSD info
    if data.ContainsKey("BTCUSD"):
        # 37 daily bars of BTC history for the feature pipeline.
        df = self.History(self.btc_symbol, 37).droplevel(0)
        # Extracting technical indicators
        ti = technical_indicators(df)
        stoch_fastk = ti['STOCH_FASTK'].values[-1]
        stoch_fastd = ti['STOCH_FASTD'].values[-1]
        stoch_fast_d_rolling_10 = ti['STOCH_FASTD'].rolling(10).sum().values[-1]
        # 'AROONOSC' appears twice in ti (momentum and trend families both emit
        # it), so values[-1] is a row of duplicates; [-1] picks the last one.
        aroonosc = ti['AROONOSC'].values[-1][-1]
        mfi = ti['MFI'].values[-1]
        roc = ti['ROC'].values[-1]
        rsi = ti['RSI'].values[-1]
        # NOTE(review): named ROC_rolling_10 downstream but computed from RSI -
        # kept as-is because the model was trained with this feature.
        roc_rolling_10 = ti['RSI'].rolling(10).sum().values[-1]
        willr = ti['WILLR'].values[-1]
        mom_rolling_10 = ti['MOM'].rolling(10).sum().values[-1]
        natr = ti['NATR'].values[-1]
        mom = ti['MOM'].values[-1]
        cmo = ti['CMO'].values[-1]
        willr_rolling_10 = ti['WILLR'].rolling(10).sum().values[-1]
        macdhist = ti['MACDHIST'].values[-1]
        plus_di_rolling_10 = ti['PLUS_DI'].rolling(10).sum().values[-1]
        macdhist_rolling_10 = ti['MACDHIST'].rolling(10).sum().values[-1]
        plus_di = ti['PLUS_DI'].values[-1]
        stoch_fastk_rolling_10 = ti['STOCH_FASTK'].rolling(10).sum().values[-1]
        cci = ti['CCI'].values[-1]
        ultosc = ti['ULTOSC'].values[-1]
        minus_di = ti['MINUS_DI'].values[-1]
        ht_phasor_quad_rolling_10 = ti['HT_PHASOR_quadrature'].rolling(10).sum().values[-1]
        trange_rolling_10 = ti['TRANGE'].rolling(10).sum().values[-1]
        ht_dcphase = ti['HT_DCPHASE'].values[-1]
        # (Cleanup: rolling STOCH_FASTD/ULTOSC/MFI/CCI duplicates and raw MACD
        # were extracted but never fed to the model; those lines are removed.)
        # Getting last 37 days of News data from TIINGO
        history_coin = process_coinbase(self.History(self.dataset_symbol, 37, Resolution.Daily).droplevel(0))
        bitcoin_news = history_coin[1]
        coinbase_news = history_coin[0]
        # Keep only the articles that arrived since the previous call.
        if not self.first_run:
            new_btc_news = bitcoin_news[bitcoin_news.index > self.last_day.replace(tzinfo=pytz.UTC)]
            new_coin_news = coinbase_news[coinbase_news.index > self.last_day.replace(tzinfo=pytz.UTC)]
            self.last_day = self.Time
        else:
            new_btc_news = bitcoin_news
            new_coin_news = coinbase_news
            self.last_day = self.Time
            self.first_run = False
        # Analyzing sentiment of new news data
        new_btc_news, new_coin_news = get_sentiment(self, new_btc_news, new_coin_news)
        # Concatenating new news data with existing data
        if new_btc_news.shape[0] > 0:
            self.btc_sentiment_df = pd.concat([self.btc_sentiment_df, new_btc_news], ignore_index=True)
        # FIX: this guard previously re-checked new_btc_news, so Coinbase
        # sentiment was appended (or skipped) based on the *bitcoin* frame.
        if new_coin_news.shape[0] > 0:
            self.coin_sentiment_df = pd.concat([self.coin_sentiment_df, new_coin_news], ignore_index=True)
        # Calculating sentiment scores
        sentiment_scores = get_sentiment_scores(self.btc_sentiment_df, self.coin_sentiment_df)
        coinbase_neutral_count_rolling_10 = sentiment_scores[0]
        coinbase_positive_avg_score_rolling_10 = sentiment_scores[1]
        coinbase_positive_avg_score_rolling_30 = sentiment_scores[2]
        btc_total_news_score_rolling_30 = sentiment_scores[3]
        coinbase_total_news_score = sentiment_scores[4]
        # Feature vector in the exact column order the model was trained on.
        indicators_order = [aroonosc, roc_rolling_10, mom_rolling_10, roc, mfi, willr, rsi, cmo,
                            mom, natr, willr_rolling_10, macdhist, stoch_fast_d_rolling_10,
                            plus_di, plus_di_rolling_10, stoch_fastk_rolling_10, cci,
                            stoch_fastk, macdhist_rolling_10, btc_total_news_score_rolling_30,
                            ultosc, coinbase_positive_avg_score_rolling_10, stoch_fastd,
                            ht_phasor_quad_rolling_10, coinbase_total_news_score,
                            coinbase_neutral_count_rolling_10, minus_di, trange_rolling_10,
                            coinbase_positive_avg_score_rolling_30, ht_dcphase]
        df = pd.DataFrame([indicators_order], columns=['AROONOSC', 'ROC_rolling_10', 'MOM_rolling_10', 'ROC',
                                                       'MFI', 'WILLR', 'RSI', 'CMO', 'MOM', 'NATR',
                                                       'WILLR_rolling_10', 'MACDHIST', 'STOCH_FASTD_rolling_10',
                                                       'PLUS_DI', 'PLUS_DI_rolling_10', 'STOCH_FASTK_rolling_10',
                                                       'CCI', 'STOCH_FASTK', 'MACDHIST_rolling_10',
                                                       'btc_total_news_score_rolling_30', 'ULTOSC',
                                                       'coinbase_positive_avg_score_rolling_10', 'STOCH_FASTD',
                                                       'HT_PHASOR_quadrature_rolling_10', 'coinbase_total_news_score',
                                                       'coinbase_neutral_count_rolling_10', 'MINUS_DI', 'TRANGE_rolling_10',
                                                       'coinbase_positive_avg_score_rolling_30', 'HT_DCPHASE'])
        # using the above features to get our signal for tomorrow
        preds = self.signal_model.predict(df)
        self.predicted_15dars.append(preds[0])
        # Implementing main strategy based on predicted signal.
        # FIX: the guard was `>= 0`, which is always true; `> 0` expresses the
        # intent (a prediction was just appended, so behavior is unchanged).
        if len(self.predicted_15dars) > 0:
            last_1_day = self.predicted_15dars[-1]
            buy = 0
            sell = 0
            # Asymmetric thresholds: long above +1%, short below -0.5%.
            if last_1_day > 0.01:
                buy = 1
                sell = 0
            elif last_1_day < -0.005:
                buy = 0
                sell = 1
            # Getting importance of prediction from custom importance function
            importance = abs(apply_importance_function(last_1_day))
            importance += 0.4
            # Clamp into (0.5, 1) so the Kelly edge below stays positive.
            if importance < 0.51:
                importance = 0.51
            elif importance > 1:
                importance = 0.99
            # Modified Kelly criterion: edge = p - (1 - p), capped at 0.8.
            kelly_fraction = (1 * abs(importance - (1 - importance))) / 1
            if kelly_fraction > 0.8:
                kelly_fraction = 0.8
            elif kelly_fraction < 0:
                kelly_fraction = 0
            # Trading based on strategy: 80/20 split between BTC and ETH.
            if (buy == 1):
                self.SetHoldings("BTCUSD", kelly_fraction * 0.8)
                self.SetHoldings("ETHUSD", kelly_fraction * 0.2)
                # self.SetHoldings("SOLUSD", kelly_fraction * 0.2)
                self.bought_btc = True
                self.shorted_btc = False
            elif (sell == 1):
                # We take less risk when going short
                self.SetHoldings("BTCUSD", -0.5 * kelly_fraction * 0.8)
                self.SetHoldings("ETHUSD", -0.5 * kelly_fraction * 0.2)
                # self.SetHoldings("SOLUSD", -0.5 * kelly_fraction * 0.2)
                self.bought_btc = False
                self.shorted_btc = True
            self.debug(f"time: {self.time}, pred: {preds[0]}, importance: {importance}, kelly_fraction: {kelly_fraction}")
    # Liquidate if portfolio value falls for nine consecutive days - Risk management 2
    if (self.Portfolio.TotalPortfolioValue < self.old_portfolio_value) and not self.first_run:
        self.days_portfolio_decline += 1
    else:
        self.days_portfolio_decline = 0
    if self.days_portfolio_decline >= 9:
        self.Liquidate()
    self.old_portfolio_value = self.Portfolio.TotalPortfolioValue
    current_date = self.Time
    # Check if it's the last day of the backtest
    if current_date.date() >= self.EndDate.date():
        self.Liquidate()  # Liquidate all positions
from AlgorithmImports import *
from transformers import AutoTokenizer
from transformers import pipeline
import joblib
import talib
# endregion
def get_news_count_d(df, label):
    """Count, per day, the news rows carrying the given sentiment label.

    Returns a frame with columns ['date_dt', f'{label}_count'].
    """
    labelled = df.loc[df["label"] == label]
    counts = labelled.groupby(["date_dt"])["description"].count().reset_index()
    counts.columns = ["date_dt", f"{label}_count"]
    return counts
# function to get average sentiment score
def get_avg_sentiment_d(df, label):
    """Mean sentiment score per day for rows with the given label.

    Returns a frame with columns ['date_dt', f'{label}_avg_score'].
    """
    labelled = df.loc[df["label"] == label]
    daily_mean = labelled.groupby(["date_dt"])["score"].mean().reset_index()
    daily_mean.columns = ["date_dt", f"{label}_avg_score"]
    return daily_mean
# making a function to return setiement
def get_bert_sentiment(pipe, news_description):
    """Score one news description with the sentiment pipeline.

    Returns a pd.Series of [label, score, combined_label] where
    combined_label maps 'positive' -> 1, 'negative' -> -1, anything else -> 0.
    When the pipeline yields no result (IndexError), returns
    ['BAD Data', 0, 0] so the row can be filtered downstream.
    """
    try:
        first = pipe(news_description)[0]
        label = first['label']
        score = first['score']
    except IndexError:
        return pd.Series(['BAD Data', 0, 0])
    combined_label = {'positive': 1, 'negative': -1}.get(label, 0)
    return pd.Series([label, score, combined_label])
def process_coinbase(df) -> tuple:
    """Clean Tiingo news rows and split out bitcoin-specific headlines.

    Fix: the return annotation claimed a single DataFrame but the function
    returns a 2-tuple; an explicit .copy() also avoids chained-assignment
    warnings from the in-place operations on a filtered slice.

    Args:
        df: raw Tiingo news frame with at least 'articleid' and 'description'.

    Returns:
        tuple: (coinbase, bitcoin) - the cleaned frame of all news, and the
        subset whose lower-cased description mentions 'bitcoin' or 'btc'.
    """
    coinbase = df
    # Drop the null values
    coinbase = coinbase.dropna(subset=['articleid', 'description'])
    # Drop if News is ' '
    coinbase = coinbase[coinbase['description'] != ' ']
    coinbase = coinbase[['articleid', 'description']].copy()
    coinbase.drop_duplicates(subset=['articleid', 'description'], inplace=True)
    # turn all News to lowercase
    coinbase['description'] = coinbase['description'].str.lower()
    # only keep row if bitcoin or btc is in string
    bitcoin = coinbase[coinbase['description'].str.contains('bitcoin|btc')]
    return coinbase, bitcoin
def compute_momentum_indicators(df):
    """Momentum-family TA-Lib indicators for an OHLCV frame.

    Returns a DataFrame aligned to df.index, one column per indicator.
    """
    o = df['open'].values
    h = df['high'].values
    l = df['low'].values
    c = df['close'].values
    v = df['volume'].values

    out = {}
    out['ADX'] = talib.ADX(h, l, c, timeperiod=14)
    out['APO'] = talib.APO(c, fastperiod=12, slowperiod=26, matype=0)
    out['AROONOSC'] = talib.AROONOSC(h, l, timeperiod=14)
    out['BOP'] = talib.BOP(o, h, l, c)
    out['CCI'] = talib.CCI(h, l, c, timeperiod=14)
    out['CMO'] = talib.CMO(c, timeperiod=14)
    out['MACD'], out['MACDSIGNAL'], out['MACDHIST'] = talib.MACD(c, fastperiod=12, slowperiod=26, signalperiod=9)
    out['MFI'] = talib.MFI(h, l, c, v, timeperiod=14)
    out['MOM'] = talib.MOM(c, timeperiod=10)
    out['RSI'] = talib.RSI(c, timeperiod=14)
    out['STOCH_FASTK'], out['STOCH_FASTD'] = talib.STOCHF(h, l, c, fastk_period=5, fastd_period=3, fastd_matype=0)
    out['STOCHRSI_K'], out['STOCHRSI_D'] = talib.STOCHRSI(c, timeperiod=14, fastk_period=5, fastd_period=3, fastd_matype=0)
    out['TRIX'] = talib.TRIX(c, timeperiod=30)
    out['ULTOSC'] = talib.ULTOSC(h, l, c, timeperiod1=7, timeperiod2=14, timeperiod3=28)
    out['WILLR'] = talib.WILLR(h, l, c, timeperiod=14)
    return pd.DataFrame(out, index=df.index)
def compute_extended_volume_indicators(df):
    """Volume-based TA-Lib indicators (AD, ADOSC, OBV) aligned to df.index."""
    h = df['high'].astype(float).values
    l = df['low'].astype(float).values
    c = df['close'].astype(float).values
    v = df['volume'].astype(float).values  # float volume is required by TA-Lib

    out = {
        'AD': talib.AD(h, l, c, v),
        'ADOSC': talib.ADOSC(h, l, c, v, fastperiod=3, slowperiod=10),
        'OBV': talib.OBV(c, v),
    }
    return pd.DataFrame(out, index=df.index)
def compute_trend_indicators(df):
    """Moving-average and directional-movement TA-Lib indicators.

    Returns a DataFrame aligned to df.index, one column per indicator.
    """
    h = df['high'].astype(float).values
    l = df['low'].astype(float).values
    c = df['close'].astype(float).values

    out = {}
    # Moving-average family, all over close with the same lookback.
    for name, ma in (('SMA', talib.SMA), ('EMA', talib.EMA), ('WMA', talib.WMA),
                     ('DEMA', talib.DEMA), ('TEMA', talib.TEMA),
                     ('TRIMA', talib.TRIMA), ('KAMA', talib.KAMA)):
        out[name] = ma(c, timeperiod=14)
    # MAMA returns both the adaptive MA and its follower.
    out['MAMA'], out['FAMA'] = talib.MAMA(c, fastlimit=0.5, slowlimit=0.05)
    out['T3'] = talib.T3(c, timeperiod=14, vfactor=0.7)
    # Directional Movement Indicators
    out['ADX'] = talib.ADX(h, l, c, timeperiod=14)
    out['PLUS_DI'] = talib.PLUS_DI(h, l, c, timeperiod=14)
    out['MINUS_DI'] = talib.MINUS_DI(h, l, c, timeperiod=14)
    # Others
    out['SAR'] = talib.SAR(h, l, acceleration=0.02, maximum=0.2)
    out['AROON_DOWN'], out['AROON_UP'] = talib.AROON(h, l, timeperiod=14)
    out['AROONOSC'] = talib.AROONOSC(h, l, timeperiod=14)
    return pd.DataFrame(out, index=df.index)
def compute_volatility_indicators(df):
    """Volatility TA-Lib indicators plus a hand-rolled Chandelier Exit."""
    h = df['high'].astype(float).values
    l = df['low'].astype(float).values
    c = df['close'].astype(float).values

    out = {}
    out['BBANDS_UPPER'], out['BBANDS_MIDDLE'], out['BBANDS_LOWER'] = talib.BBANDS(
        c, timeperiod=20, nbdevup=2, nbdevdn=2, matype=0)
    out['ATR'] = talib.ATR(h, l, c, timeperiod=14)
    out['NATR'] = talib.NATR(h, l, c, timeperiod=14)
    out['TRANGE'] = talib.TRANGE(h, l, c)
    # Chandelier Exit is not a TA-Lib builtin: 22-day extreme +/- 3 * ATR(22).
    atr_22 = talib.ATR(h, l, c, timeperiod=22)
    out['CHANDELIER_EXIT_LONG'] = talib.MAX(h, timeperiod=22) - (atr_22 * 3)
    out['CHANDELIER_EXIT_SHORT'] = talib.MIN(l, timeperiod=22) + (atr_22 * 3)
    return pd.DataFrame(out, index=df.index)
def compute_price_transform_indicators(df):
    """Price-transform TA-Lib indicators (plus ROC) aligned to df.index."""
    o = df['open'].astype(float).values
    h = df['high'].astype(float).values
    l = df['low'].astype(float).values
    c = df['close'].astype(float).values

    out = {
        'WCLPRICE': talib.WCLPRICE(h, l, c),   # weighted close
        'TYPPRICE': talib.TYPPRICE(h, l, c),   # typical price
        'MEDPRICE': talib.MEDPRICE(h, l),      # median price
        'ROC': talib.ROC(c, timeperiod=10),    # rate of change
        'AVGPRICE': talib.AVGPRICE(o, h, l, c),
    }
    return pd.DataFrame(out, index=df.index)
def compute_cycle_indicators(df):
    """Hilbert-Transform cycle TA-Lib indicators over close, aligned to df.index."""
    c = df['close'].astype(float).values

    out = {}
    out['HT_DCPERIOD'] = talib.HT_DCPERIOD(c)
    out['HT_DCPHASE'] = talib.HT_DCPHASE(c)
    out['HT_PHASOR_inphase'], out['HT_PHASOR_quadrature'] = talib.HT_PHASOR(c)
    out['HT_SINE'], out['HT_LEADSINE'] = talib.HT_SINE(c)
    out['HT_TRENDMODE'] = talib.HT_TRENDMODE(c)
    return pd.DataFrame(out, index=df.index)
def technical_indicators(df):
    """Column-wise concatenation of every indicator family for one OHLCV frame.

    NOTE: some columns (e.g. ADX, AROONOSC) are produced by more than one
    family, so the result contains duplicate column names; callers rely on
    that when indexing (e.g. ti['AROONOSC'].values[-1][-1]).
    """
    families = [
        compute_momentum_indicators(df),
        compute_trend_indicators(df),
        compute_price_transform_indicators(df),
        compute_volatility_indicators(df),
        compute_cycle_indicators(df),
    ]
    return pd.concat(families, axis=1)
class FatGreenHorse(QCAlgorithm):
    """Research variant of the trading algorithm with incremental news caching.

    NOTE(review): this class shares its name with the production algorithm
    defined earlier; if both live in the same module the later definition
    shadows the earlier one - confirm which copy should be active.
    """

    def Initialize(self):
        """Set dates/cash, subscribe to data feeds, and load the ML artifacts."""
        self.SetStartDate(2021, 5, 16)
        self.SetCash(100000)
        self.SetWarmUp(30)  # Warm up for 30 days
        self.is_warmup_complete = False
        self.AddEquity("SPY", Resolution.Daily)
        self.btc_symbol = self.AddCrypto("BTCUSD", Resolution.Daily).symbol
        self.symbol = self.AddEquity("coin", Resolution.Daily).Symbol
        self.dataset_symbol = self.AddData(TiingoNews, self.symbol, Resolution.Daily).Symbol
        self.treas = self.add_data(USTreasuryYieldCurveRate, "USTYCR", Resolution.Daily).symbol
        self.vix = self.add_data(CBOE, "VIX", Resolution.Daily).symbol
        bert_res = self.set_up_bert()
        self.pipe = bert_res[0]
        self.tokenizer = bert_res[1]
        self.signal_model = self.load_rf_model()
        # Lazy caches: raw history / split news frames / sentiment-scored news.
        self.cached_coinbase_data = None
        self.cached_bitcoin_news = None
        self.cached_coinbase_news = None
        self.cached_bitcoin_news_sent = None
        self.cached_coinbase_news_sent = None

    def set_up_bert(self):
        """Load the FinBERT pipeline and tokenizer from the QC object store."""
        path = self.ObjectStore.GetFilePath("mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis")
        pipe = pipeline("text-classification", model=path)
        # Smoke-test the pipeline so a successful load shows in the debug log.
        self.debug(f'{pipe("bitcoin pushes to fresh record high after breaching $62,000 yesterday")}')
        tokenizer = AutoTokenizer.from_pretrained(path)
        return pipe, tokenizer

    def load_rf_model(self):
        """Deserialize the trained Random Forest model from the QC object store."""
        path = self.ObjectStore.GetFilePath("group_4_crypto_trading_with_sentiment_sprin_2024/random_forest_model.pkl")
        return joblib.load(path)

    def get_latest_data(self):
        """Return (bitcoin_news, coinbase_news), fetching only unseen rows.

        First call pulls 37 days of Tiingo history; later calls fetch one day
        and append whatever is newer than the cached frame's last timestamp.
        """
        # Check if cached data is available
        if self.cached_coinbase_data is None:
            # Fetch and process latest data
            coinbase_data = self.History(self.dataset_symbol, 37, Resolution.Daily).droplevel(0)
            self.cached_coinbase_news, self.cached_bitcoin_news = process_coinbase(coinbase_data)
            self.cached_coinbase_data = coinbase_data
        else:
            # Fetch only new data since the last update
            latest_data = self.History(self.dataset_symbol, 1, Resolution.Daily).droplevel(0)
            if latest_data.index[-1] != self.cached_coinbase_data.index[-1]:
                new_data = latest_data.loc[latest_data.index > self.cached_coinbase_data.index[-1]]
                if not new_data.empty:
                    # Process new data and update cache
                    cached_coinbase_news_new, cached_bitcoin_news_new = process_coinbase(new_data)
                    self.cached_bitcoin_news = pd.concat([self.cached_bitcoin_news, cached_bitcoin_news_new])
                    self.cached_coinbase_news = pd.concat([self.cached_coinbase_news, cached_coinbase_news_new])
                    self.cached_coinbase_data = pd.concat([self.cached_coinbase_data, new_data])
        return self.cached_bitcoin_news, self.cached_coinbase_news

    def process_data(self, bitcoin_news, coinbase_news):
        """Tokenize, truncate, sentiment-score and date-stamp both news frames.

        NOTE: mutates the frames in place (adds label/score/date columns) and
        also returns them for convenience.
        """
        # Tokenize then decode so every description fits the model's 512-token limit.
        bitcoin_news['description'] = bitcoin_news['description'].apply(lambda x: self.tokenizer(x, truncation=True, max_length=512)['input_ids'])
        bitcoin_news['description'] = bitcoin_news['description'].apply(lambda x: self.tokenizer.decode(x))
        coinbase_news['description'] = coinbase_news['description'].apply(lambda x: self.tokenizer(x, truncation=True, max_length=512)['input_ids'])
        coinbase_news['description'] = coinbase_news['description'].apply(lambda x: self.tokenizer.decode(x))
        # Get sentiment analysis
        bitcoin_news[['label', 'score', 'combined_label']] = bitcoin_news['description'].apply(lambda x: get_bert_sentiment(self.pipe, x))
        coinbase_news[['label', 'score', 'combined_label']] = coinbase_news['description'].apply(lambda x: get_bert_sentiment(self.pipe, x))
        # Extract date information
        bitcoin_news['Date'] = pd.to_datetime(bitcoin_news.index, format='mixed', utc=True)
        coinbase_news['Date'] = pd.to_datetime(coinbase_news.index, format='mixed', utc=True)
        bitcoin_news["date_dt"] = bitcoin_news['Date'].dt.date
        bitcoin_news["date_hr"] = bitcoin_news['Date'].dt.hour
        coinbase_news["date_dt"] = coinbase_news['Date'].dt.date
        coinbase_news["date_hr"] = coinbase_news['Date'].dt.hour
        return bitcoin_news, coinbase_news

    def compute_metrics(self, bitcoin_news, coinbase_news):
        """Aggregate per-day counts/scores into the btc and coinbase frames.

        Fix: the original concatenated six 2-column frames (12 columns) and
        then assigned only 6 column names, which raises on any call; the
        date_dt key is now moved to the index before concatenation.
        """
        # Get news count and average sentiment scores
        btc_news_counts = get_news_count_d(bitcoin_news, "positive"), get_news_count_d(bitcoin_news, "negative"), get_news_count_d(bitcoin_news, "neutral")
        btc_avg_sentiments = get_avg_sentiment_d(bitcoin_news, "positive"), get_avg_sentiment_d(bitcoin_news, "negative"), get_avg_sentiment_d(bitcoin_news, "neutral")
        coinbase_news_counts = get_news_count_d(coinbase_news, "positive"), get_news_count_d(coinbase_news, "negative"), get_news_count_d(coinbase_news, "neutral")
        coinbase_avg_sentiments = get_avg_sentiment_d(coinbase_news, "positive"), get_avg_sentiment_d(coinbase_news, "negative"), get_avg_sentiment_d(coinbase_news, "neutral")
        # Merge dataframes on the daily key
        merged_d = pd.concat([f.set_index("date_dt") for f in (*btc_news_counts, *btc_avg_sentiments)], axis=1)
        merged_d.columns = ['btc_positive_count', 'btc_negative_count', 'btc_neutral_count', 'btc_positive_avg_score', 'btc_negative_avg_score', 'btc_neutral_avg_score']
        merged_d.fillna(0, inplace=True)
        merged_d["btc_total_news_count"] = merged_d["btc_positive_count"] + merged_d["btc_negative_count"] + merged_d["btc_neutral_count"]
        merged_d["btc_total_news_score"] = merged_d["btc_positive_avg_score"] * merged_d["btc_positive_count"] - merged_d["btc_negative_avg_score"] * merged_d["btc_negative_count"]
        merged_d["btc_signal"] = (merged_d["btc_positive_count"] - merged_d["btc_negative_count"]) / (merged_d["btc_positive_count"] + merged_d["btc_negative_count"])
        merged_dc = pd.concat([f.set_index("date_dt") for f in (*coinbase_news_counts, *coinbase_avg_sentiments)], axis=1)
        merged_dc.columns = ['coinbase_positive_count', 'coinbase_negative_count', 'coinbase_neutral_count', 'coinbase_positive_avg_score', 'coinbase_negative_avg_score', 'coinbase_neutral_avg_score']
        merged_dc.fillna(0, inplace=True)
        merged_dc["coinbase_total_news_count"] = merged_dc["coinbase_positive_count"] + merged_dc["coinbase_negative_count"] + merged_dc["coinbase_neutral_count"]
        merged_dc["coinbase_total_news_score"] = merged_dc["coinbase_positive_avg_score"] * merged_dc["coinbase_positive_count"] - merged_dc["coinbase_negative_avg_score"] * merged_dc["coinbase_negative_count"]
        merged_dc["coinbase_signal"] = (merged_dc["coinbase_positive_count"] - merged_dc["coinbase_negative_count"]) / (merged_dc["coinbase_positive_count"] + merged_dc["coinbase_negative_count"])
        return merged_d, merged_dc

    def compute_rolling_sums(self, merged_d, merged_dc):
        """Return the five rolling sentiment aggregates used as model features.

        Fix: the original `return` statement was truncated mid-identifier
        (`coinbase_positive_avg`), which would raise NameError on any call;
        it now returns all five computed values.
        """
        coinbase_negative_count_rolling_10 = merged_dc['coinbase_negative_count'].rolling(10).sum().iloc[-1]
        coinbase_positive_avg_score_rolling_10 = merged_dc['coinbase_positive_avg_score'].rolling(10).sum().iloc[-1]
        coinbase_positive_avg_score_rolling_30 = merged_dc['coinbase_positive_avg_score'].rolling(30).sum().iloc[-1]
        btc_total_news_score_rolling_30 = merged_d['btc_total_news_score'].rolling(30).sum().iloc[-1]
        coinbase_total_news_score = merged_dc['coinbase_total_news_score'].iloc[-1]
        return (coinbase_negative_count_rolling_10,
                coinbase_positive_avg_score_rolling_10,
                coinbase_positive_avg_score_rolling_30,
                btc_total_news_score_rolling_30,
                coinbase_total_news_score)

    def OnData(self, data: Slice):
        """Daily handler: build TA + cached-sentiment features and predict.

        NOTE(review): this research variant computes a prediction but never
        places orders - confirm trading was intentionally left out.
        """
        if self.IsWarmingUp:
            return
        if not self.is_warmup_complete:
            self.last_warmup_date = self.Time  # Stores the last warmup date
            self.Debug(f"Warmup completed on: {self.last_warmup_date}")
            self.is_warmup_complete = True
            # Seed the rolling price window once warm-up finishes.
            self.all_data = self.History(self.btc_symbol, 37).droplevel(0)
        if data.ContainsKey("BTCUSD"):
            # Append the newest daily bar to the cached price history.
            df = self.History(self.btc_symbol, 1).droplevel(0)
            self.all_data = pd.concat([self.all_data, df])
            df = self.all_data.copy()
            ti = technical_indicators(df)
            stoch_fastk = ti['STOCH_FASTK'].values[-1]
            stoch_fastd = ti['STOCH_FASTD'].values[-1]
            stoch_fast_d_rolling_10 = ti['STOCH_FASTD'].rolling(10).sum().values[-1]
            # 'AROONOSC' is emitted by two indicator families, so values[-1]
            # is a row of duplicates; [-1] selects the last one.
            aroonosc = ti['AROONOSC'].values[-1][-1]
            mfi = ti['MFI'].values[-1]
            roc = ti['ROC'].values[-1]
            rsi = ti['RSI'].values[-1]
            # NOTE(review): named ROC_rolling_10 downstream but computed from
            # RSI - kept as-is because the model was trained this way.
            roc_rolling_10 = ti['RSI'].rolling(10).sum().values[-1]
            willr = ti['WILLR'].values[-1]
            mom_rolling_10 = ti['MOM'].rolling(10).sum().values[-1]
            natr = ti['NATR'].values[-1]
            mom = ti['MOM'].values[-1]
            cmo = ti['CMO'].values[-1]
            willr_rolling_10 = ti['WILLR'].rolling(10).sum().values[-1]
            macdhist = ti['MACDHIST'].values[-1]
            stoch_fastd_rolling_10 = ti['STOCH_FASTD'].rolling(10).sum().values[-1]
            plus_di_rolling_10 = ti['PLUS_DI'].rolling(10).sum().values[-1]
            macdhist_rolling_10 = ti['MACDHIST'].rolling(10).sum().values[-1]
            plus_di = ti['PLUS_DI'].values[-1]
            stoch_fastk_rolling_10 = ti['STOCH_FASTK'].rolling(10).sum().values[-1]
            cci = ti['CCI'].values[-1]
            ULTOSC_rolling_10 = ti['ULTOSC'].rolling(10).sum().values[-1]
            ultosc = ti['ULTOSC'].values[-1]
            minus_di = ti['MINUS_DI'].values[-1]
            mfi_rolling_10 = ti['MFI'].rolling(10).sum().values[-1]
            macd = ti['MACD'].values[-1]
            cci_rolling_10 = ti['CCI'].rolling(10).sum().values[-1]
            ht_phasor_quad_rolling_10 = ti['HT_PHASOR_quadrature'].rolling(10).sum().values[-1]
            bitcoin_news, coinbase_news = self.get_latest_data()
            if (not isinstance(self.cached_coinbase_news_sent, pd.DataFrame)) or (not isinstance(self.cached_bitcoin_news_sent, pd.DataFrame)):
                # First pass: score everything and seed the sentiment caches.
                bitcoin_news, coinbase_news = self.process_data(bitcoin_news, coinbase_news)
                self.cached_coinbase_news_sent = coinbase_news
                self.cached_bitcoin_news_sent = bitcoin_news
            else:
                # Fix: the original compared against the max timestamp of the
                # *incoming* frames themselves, so the "new" subset was always
                # empty; compare against the already-scored caches instead.
                latest_timestamp = max(self.cached_coinbase_news_sent.index.max(),
                                       self.cached_bitcoin_news_sent.index.max())
                new_bitcoin_news = bitcoin_news[bitcoin_news.index > latest_timestamp]
                new_coinbase_news = coinbase_news[coinbase_news.index > latest_timestamp]
                # Process only the new data
                new_bitcoin_news, new_coinbase_news = self.process_data(new_bitcoin_news, new_coinbase_news)
                # Update cache with the processed new data
                self.cached_coinbase_news_sent = pd.concat([self.cached_coinbase_news_sent, new_coinbase_news])
                self.cached_bitcoin_news_sent = pd.concat([self.cached_bitcoin_news_sent, new_bitcoin_news])
            bitcoin_news = self.cached_bitcoin_news_sent
            coinbase_news = self.cached_coinbase_news_sent
            # Get the news count for each label
            pn = get_news_count_d(bitcoin_news, "positive")
            nn = get_news_count_d(bitcoin_news, "negative")
            neun = get_news_count_d(bitcoin_news, "neutral")
            # Get the average sentiment score for each label
            pn_avg = get_avg_sentiment_d(bitcoin_news, "positive")
            nn_avg = get_avg_sentiment_d(bitcoin_news, "negative")
            neun_avg = get_avg_sentiment_d(bitcoin_news, "neutral")
            # Merge the per-day frames on date
            merged_d = pd.merge(pn, nn, on="date_dt", how="outer")
            merged_d = pd.merge(merged_d, neun, on="date_dt", how="outer")
            merged_d = pd.merge(merged_d, pn_avg, on="date_dt", how="outer")
            merged_d = pd.merge(merged_d, nn_avg, on="date_dt", how="outer")
            merged_d = pd.merge(merged_d, neun_avg, on="date_dt", how="outer")
            # Fill NaN values with 0
            merged_d = merged_d.fillna(0)
            # total news count
            merged_d["total_news_count"] = (
                merged_d["positive_count"] + merged_d["negative_count"] + merged_d["neutral_count"]
            )
            merged_d["total_news_score"] = (
                merged_d["positive_avg_score"] * merged_d["positive_count"]
            ) - (merged_d["negative_avg_score"] * merged_d["negative_count"])
            merged_d["signal"] = (merged_d["positive_count"] - merged_d["negative_count"]) / (
                merged_d["positive_count"] + merged_d["negative_count"]
            )
            merged_d.columns = [f"btc_{c}" for c in merged_d.columns]
            # Same aggregation for the coinbase-wide news frame
            pn = get_news_count_d(coinbase_news, "positive")
            nn = get_news_count_d(coinbase_news, "negative")
            neun = get_news_count_d(coinbase_news, "neutral")
            pn_avg = get_avg_sentiment_d(coinbase_news, "positive")
            nn_avg = get_avg_sentiment_d(coinbase_news, "negative")
            neun_avg = get_avg_sentiment_d(coinbase_news, "neutral")
            merged_dc = pd.merge(pn, nn, on="date_dt", how="outer")
            merged_dc = pd.merge(merged_dc, neun, on="date_dt", how="outer")
            merged_dc = pd.merge(merged_dc, pn_avg, on="date_dt", how="outer")
            merged_dc = pd.merge(merged_dc, nn_avg, on="date_dt", how="outer")
            merged_dc = pd.merge(merged_dc, neun_avg, on="date_dt", how="outer")
            merged_dc = merged_dc.fillna(0)
            merged_dc["total_news_count"] = (
                merged_dc["positive_count"]
                + merged_dc["negative_count"]
                + merged_dc["neutral_count"]
            )
            merged_dc["total_news_score"] = (
                merged_dc["positive_avg_score"] * merged_dc["positive_count"]
            ) - (merged_dc["negative_avg_score"] * merged_dc["negative_count"])
            merged_dc["signal"] = (merged_dc["positive_count"] - merged_dc["negative_count"]) / (
                merged_dc["positive_count"] + merged_dc["negative_count"]
            )
            merged_dc.columns = [f"coinbase_{c}" for c in merged_dc.columns]
            coinbase_negative_count_rolling_10 = merged_dc['coinbase_negative_count'].rolling(10).sum().values[-1]
            coinbase_positive_avg_score_rolling_10 = merged_dc['coinbase_positive_avg_score'].rolling(10).sum().values[-1]
            coinbase_positive_avg_score_rolling_30 = merged_dc['coinbase_positive_avg_score'].rolling(30).sum().values[-1]
            btc_total_news_score_rolling_30 = merged_d['btc_total_news_score'].rolling(30).sum().values[-1]
            coinbase_total_news_score = merged_dc['coinbase_total_news_score'].values[-1]
            # Feature vector in the exact column order the model was trained on.
            # NOTE(review): the 'STOCH_FASTK_rolling_10' column is fed
            # stoch_fast_d_rolling_10 (a STOCH_FASTD sum) - kept as-is since
            # the model was presumably trained with this mapping; verify.
            indicators_order = [aroonosc, roc_rolling_10, mfi, mom_rolling_10, roc, cmo, rsi, mom,
                                willr, natr, stoch_fast_d_rolling_10, willr_rolling_10, plus_di, macdhist,
                                plus_di_rolling_10, cci, coinbase_negative_count_rolling_10, macdhist_rolling_10,
                                coinbase_positive_avg_score_rolling_10, stoch_fastk, minus_di, ultosc,
                                stoch_fast_d_rolling_10, coinbase_positive_avg_score_rolling_30,
                                btc_total_news_score_rolling_30, ULTOSC_rolling_10, coinbase_total_news_score,
                                stoch_fastd, cci_rolling_10, ht_phasor_quad_rolling_10]
            df = pd.DataFrame([indicators_order], columns=['AROONOSC', 'ROC_rolling_10', 'MFI', 'MOM_rolling_10', 'ROC', 'CMO',
                                                           'RSI', 'MOM', 'WILLR', 'NATR', 'STOCH_FASTD_rolling_10',
                                                           'WILLR_rolling_10', 'PLUS_DI', 'MACDHIST', 'PLUS_DI_rolling_10', 'CCI',
                                                           'coinbase_negative_count_rolling_10', 'MACDHIST_rolling_10',
                                                           'coinbase_positive_avg_score_rolling_10', 'STOCH_FASTK', 'MINUS_DI',
                                                           'ULTOSC', 'STOCH_FASTK_rolling_10',
                                                           'coinbase_positive_avg_score_rolling_30',
                                                           'btc_total_news_score_rolling_30', 'ULTOSC_rolling_10',
                                                           'coinbase_total_news_score', 'STOCH_FASTD', 'CCI_rolling_10',
                                                           'HT_PHASOR_quadrature_rolling_10'])
            preds = self.signal_model.predict(df)
            # (Cleanup: removed stray `print(21)` debug statement.)
        for dataset_symbol, article in data.Get(TiingoNews).items():
            # self.Debug(f"|| {article.ArticleID} || {article.Description}")
            pass
#region imports
from AlgorithmImports import *
#endregion
import talib
def compute_momentum_indicators(df):
    """Momentum-family TA-Lib indicators for an OHLCV frame.

    Returns a DataFrame aligned to df.index, one column per indicator.
    """
    o = df['open'].values
    h = df['high'].values
    l = df['low'].values
    c = df['close'].values
    v = df['volume'].values

    out = {}
    out['ADX'] = talib.ADX(h, l, c, timeperiod=14)
    out['APO'] = talib.APO(c, fastperiod=12, slowperiod=26, matype=0)
    out['AROONOSC'] = talib.AROONOSC(h, l, timeperiod=14)
    out['BOP'] = talib.BOP(o, h, l, c)
    out['CCI'] = talib.CCI(h, l, c, timeperiod=14)
    out['CMO'] = talib.CMO(c, timeperiod=14)
    out['MACD'], out['MACDSIGNAL'], out['MACDHIST'] = talib.MACD(c, fastperiod=12, slowperiod=26, signalperiod=9)
    out['MFI'] = talib.MFI(h, l, c, v, timeperiod=14)
    out['MOM'] = talib.MOM(c, timeperiod=10)
    out['RSI'] = talib.RSI(c, timeperiod=14)
    out['STOCH_FASTK'], out['STOCH_FASTD'] = talib.STOCHF(h, l, c, fastk_period=5, fastd_period=3, fastd_matype=0)
    out['STOCHRSI_K'], out['STOCHRSI_D'] = talib.STOCHRSI(c, timeperiod=14, fastk_period=5, fastd_period=3, fastd_matype=0)
    out['TRIX'] = talib.TRIX(c, timeperiod=30)
    out['ULTOSC'] = talib.ULTOSC(h, l, c, timeperiod1=7, timeperiod2=14, timeperiod3=28)
    out['WILLR'] = talib.WILLR(h, l, c, timeperiod=14)
    return pd.DataFrame(out, index=df.index)
def compute_extended_volume_indicators(df):
    """Volume-based TA-Lib indicators (AD, ADOSC, OBV) aligned to df.index."""
    h = df['high'].astype(float).values
    l = df['low'].astype(float).values
    c = df['close'].astype(float).values
    v = df['volume'].astype(float).values  # float volume is required by TA-Lib

    out = {
        'AD': talib.AD(h, l, c, v),
        'ADOSC': talib.ADOSC(h, l, c, v, fastperiod=3, slowperiod=10),
        'OBV': talib.OBV(c, v),
    }
    return pd.DataFrame(out, index=df.index)
def compute_trend_indicators(df):
    """Compute TA-Lib trend indicators (moving averages, DMI, SAR, Aroon).

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain 'high', 'low' and 'close' columns.

    Returns
    -------
    pandas.DataFrame
        One column per indicator, aligned to ``df.index``.
    """
    # TA-Lib requires float64 ndarrays, so cast each column defensively.
    high = df['high'].astype(float).values
    low = df['low'].astype(float).values
    close = df['close'].astype(float).values
    # NOTE: the original extracted 'open' and 'volume' as well; neither is
    # used by any indicator below (MAMA takes only close/fastlimit/slowlimit),
    # so both dead locals and the misleading "Necessary for MAMA" comment
    # have been removed.

    results = {}

    # --- Moving averages (all on close, 14-period unless noted) ---
    results['SMA'] = talib.SMA(close, timeperiod=14)
    results['EMA'] = talib.EMA(close, timeperiod=14)
    results['WMA'] = talib.WMA(close, timeperiod=14)
    results['DEMA'] = talib.DEMA(close, timeperiod=14)
    results['TEMA'] = talib.TEMA(close, timeperiod=14)
    results['TRIMA'] = talib.TRIMA(close, timeperiod=14)
    results['KAMA'] = talib.KAMA(close, timeperiod=14)
    # MESA Adaptive Moving Average returns both MAMA and its follower FAMA.
    mama, fama = talib.MAMA(close, fastlimit=0.5, slowlimit=0.05)
    results['MAMA'] = mama
    results['FAMA'] = fama
    results['T3'] = talib.T3(close, timeperiod=14, vfactor=0.7)

    # --- Directional movement indicators ---
    results['ADX'] = talib.ADX(high, low, close, timeperiod=14)
    results['PLUS_DI'] = talib.PLUS_DI(high, low, close, timeperiod=14)
    results['MINUS_DI'] = talib.MINUS_DI(high, low, close, timeperiod=14)

    # --- Others ---
    results['SAR'] = talib.SAR(high, low, acceleration=0.02, maximum=0.2)
    results['AROON_DOWN'], results['AROON_UP'] = talib.AROON(high, low, timeperiod=14)
    results['AROONOSC'] = talib.AROONOSC(high, low, timeperiod=14)
    # results['VI_PLUS'], results['VI_MINUS'] = talib.VI(high, low, close, timeperiod=14)

    # Build the frame directly on df.index so rows stay aligned with the input.
    return pd.DataFrame(results, index=df.index)
def compute_volatility_indicators(df):
    """Compute TA-Lib volatility indicators plus a custom Chandelier Exit.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain 'high', 'low' and 'close' columns.

    Returns
    -------
    pandas.DataFrame
        One column per indicator, aligned to ``df.index``.
    """
    # TA-Lib requires float64 ndarrays.
    hi_arr = df['high'].astype(float).values
    lo_arr = df['low'].astype(float).values
    cl_arr = df['close'].astype(float).values

    out = {}

    # Bollinger Bands: 20-period SMA with 2-sigma envelopes.
    upper, middle, lower = talib.BBANDS(cl_arr, timeperiod=20, nbdevup=2, nbdevdn=2, matype=0)
    out['BBANDS_UPPER'] = upper
    out['BBANDS_MIDDLE'] = middle
    out['BBANDS_LOWER'] = lower

    # Average True Range family.
    out['ATR'] = talib.ATR(hi_arr, lo_arr, cl_arr, timeperiod=14)
    out['NATR'] = talib.NATR(hi_arr, lo_arr, cl_arr, timeperiod=14)
    out['TRANGE'] = talib.TRANGE(hi_arr, lo_arr, cl_arr)

    # Chandelier Exit (no direct TA-Lib function): 22-period extremes
    # offset by 3x the 22-period ATR.
    atr_window = talib.ATR(hi_arr, lo_arr, cl_arr, timeperiod=22)
    out['CHANDELIER_EXIT_LONG'] = talib.MAX(hi_arr, timeperiod=22) - 3 * atr_window
    out['CHANDELIER_EXIT_SHORT'] = talib.MIN(lo_arr, timeperiod=22) + 3 * atr_window

    return pd.DataFrame(out, index=df.index)
def compute_price_transform_indicators(df):
    """Compute TA-Lib price-transform indicators (WCLPRICE, TYPPRICE, etc.).

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain 'open', 'high', 'low' and 'close' columns.

    Returns
    -------
    pandas.DataFrame
        One column per indicator, aligned to ``df.index``.
    """
    # TA-Lib requires float64 ndarrays.
    op_arr = df['open'].astype(float).values
    hi_arr = df['high'].astype(float).values
    lo_arr = df['low'].astype(float).values
    cl_arr = df['close'].astype(float).values

    out = {
        # Weighted close: (H + L + 2C) / 4
        'WCLPRICE': talib.WCLPRICE(hi_arr, lo_arr, cl_arr),
        # Typical price: (H + L + C) / 3
        'TYPPRICE': talib.TYPPRICE(hi_arr, lo_arr, cl_arr),
        # Median price: (H + L) / 2
        'MEDPRICE': talib.MEDPRICE(hi_arr, lo_arr),
        # 10-period rate of change of the close
        'ROC': talib.ROC(cl_arr, timeperiod=10),
        # Average of O, H, L, C
        'AVGPRICE': talib.AVGPRICE(op_arr, hi_arr, lo_arr, cl_arr),
    }

    return pd.DataFrame(out, index=df.index)
def compute_cycle_indicators(df):
    """Compute TA-Lib Hilbert Transform cycle indicators from the close series.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a 'close' column.

    Returns
    -------
    pandas.DataFrame
        One column per indicator, aligned to ``df.index``.
    """
    # TA-Lib requires a float64 ndarray.
    closes = df['close'].astype(float).values

    out = {}
    # Dominant cycle period and phase.
    out['HT_DCPERIOD'] = talib.HT_DCPERIOD(closes)
    out['HT_DCPHASE'] = talib.HT_DCPHASE(closes)
    # Phasor components (in-phase / quadrature).
    inphase, quadrature = talib.HT_PHASOR(closes)
    out['HT_PHASOR_inphase'] = inphase
    out['HT_PHASOR_quadrature'] = quadrature
    # Sine wave and its lead.
    sine, leadsine = talib.HT_SINE(closes)
    out['HT_SINE'] = sine
    out['HT_LEADSINE'] = leadsine
    # Trend-vs-cycle mode flag.
    out['HT_TRENDMODE'] = talib.HT_TRENDMODE(closes)

    return pd.DataFrame(out, index=df.index)
def technical_indicators(df):
    """Assemble the full feature matrix by concatenating all indicator groups.

    Parameters
    ----------
    df : pandas.DataFrame
        OHLCV DataFrame consumed by the individual indicator functions.

    Returns
    -------
    pandas.DataFrame
        All indicator columns side by side, aligned to ``df.index``.
    """
    # Order matters only for column ordering; each group is index-aligned.
    # NOTE(review): compute_extended_volume_indicators is not included here —
    # presumably intentional, but worth confirming.
    feature_builders = (
        compute_momentum_indicators,
        compute_trend_indicators,
        compute_price_transform_indicators,
        compute_volatility_indicators,
        compute_cycle_indicators,
    )
    return pd.concat([build(df) for build in feature_builders], axis=1)
# region imports
from AlgorithmImports import *
from QuantConnect.DataSource import *
class TiingoNewsDataAlgorithm(QCAlgorithm):
    """Demo algorithm: subscribe to Tiingo news for COIN and log each article."""

    def Initialize(self) -> None:
        """Configure the backtest window, starting cash, and data subscriptions."""
        self.SetStartDate(2014, 1, 1)
        self.SetEndDate(2024, 3, 20)
        self.SetCash(100000)
        # Daily equity bars for COIN, plus the linked Tiingo news feed.
        self.symbol = self.AddEquity("coin", Resolution.Daily).Symbol
        self.dataset_symbol = self.AddData(TiingoNews, self.symbol, Resolution.Daily).Symbol

    def OnData(self, slice: Slice) -> None:
        """Log the ID and description of every Tiingo news article in this slice."""
        news_items = slice.Get(TiingoNews)
        for _, article in news_items.items():
            self.Debug(f"|| {article.ArticleID} || {article.Description}")