| Overall Statistics |
|
Total Trades 795 Average Win 0.03% Average Loss -0.03% Compounding Annual Return -0.764% Drawdown 1.600% Expectancy -0.112 Net Profit -1.523% Sharpe Ratio -0.659 Probabilistic Sharpe Ratio 0.312% Loss Rate 56% Win Rate 44% Profit-Loss Ratio 1.02 Alpha -0.005 Beta 0.003 Annual Standard Deviation 0.008 Annual Variance 0 Information Ratio -0.228 Tracking Error 0.161 Treynor Ratio -1.539 Total Fees $2536.80 Estimated Strategy Capacity $5300000.00 Lowest Capacity Asset NVDA RHM8UTD8DT2D |
from AlgorithmImports import *
from datetime import timedelta
from symbol_data import SymbolData
class TiingoNewsImpactAlphaModel(AlphaModel):
PREDICTION_INTERVAL = timedelta(minutes=30)
def __init__(self, algorithm):
self.algorithm = algorithm
self.symbol_data_by_symbol = {}
# Schedule model training sessions
algorithm.Train(algorithm.DateRules.MonthStart(), algorithm.TimeRules.At(7, 0), self.train_models)
def train_models(self):
for symbol, symbol_data in self.symbol_data_by_symbol.items():
symbol_data.train_model()
def Update(self, algorithm: QCAlgorithm, slice: Slice) -> List[Insight]:
insights = []
# Get expected returns of each Symbol, given the current news
expected_returns_by_symbol = {}
for dataset_symbol, article in slice.Get(TiingoNews).items():
for asset_symbol in article.Symbols:
if asset_symbol not in self.symbol_data_by_symbol:
continue # Articles can mention assets that aren't in the universe
is_open = algorithm.Securities[asset_symbol].Exchange.Hours.IsOpen(algorithm.Time + timedelta(minutes=1), extendedMarket=False)
if not is_open:
continue # Only trade during regular hours, otherwise market orders get converted to MOO and causes margin issues
if asset_symbol not in expected_returns_by_symbol:
expected_returns_by_symbol[asset_symbol] = []
expected_return = self.symbol_data_by_symbol[asset_symbol].get_expected_return(article)
if expected_return is not None:
expected_returns_by_symbol[asset_symbol].append(expected_return)
expected_return_by_symbol = {
asset_symbol: self.aggregate_expected_returns(expected_returns)
for asset_symbol, expected_returns in expected_returns_by_symbol.items()
if len(expected_returns) > 0
}
for asset_symbol, expected_return in expected_return_by_symbol.items():
if self.symbol_data_by_symbol[asset_symbol].should_trade(expected_return):
direction = InsightDirection.Up if expected_return > 0 else InsightDirection.Down
insights.append(Insight.Price(asset_symbol, self.PREDICTION_INTERVAL, direction))
return insights
def aggregate_expected_returns(self, expected_returns):
return sum(expected_returns)/len(expected_returns)
def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
for security in changes.AddedSecurities:
self.symbol_data_by_symbol[security.Symbol] = SymbolData(algorithm, security.Symbol, self.PREDICTION_INTERVAL, self.aggregate_expected_returns)
for security in changes.RemovedSecurities:
if security.Symbol in self.symbol_data_by_symbol:
symbol_data = self.symbol_data_by_symbol.pop(security.Symbol, None)
if symbol_data:
symbol_data.dispose()
# region imports
from AlgorithmImports import *
from universe import QQQETFUniverseSelectionModel
from alpha import TiingoNewsImpactAlphaModel
from portfolio import PartitionedPortfolioConstructionModel
# endregion
class QQQConstituentsNewsImpactAlgorithm(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2021, 1, 1)
self.SetEndDate(2023, 1, 1)
self.SetCash(1_000_000)
self.AddUniverseSelection(QQQETFUniverseSelectionModel(self.UniverseSettings))
self.AddAlpha(TiingoNewsImpactAlphaModel(self))
self.SetPortfolioConstruction(PartitionedPortfolioConstructionModel(self, 10))
self.AddRiskManagement(NullRiskManagementModel())
self.SetExecution(ImmediateExecutionModel())
#region imports
from AlgorithmImports import *
#endregion
#region imports
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import numpy as np
#endregion
class IntradayNewsReturnGenerator:
STOP_WORDS = set(stopwords.words('english'))
PUNCTUATION = "!()-[]{};:'\",<>./?@#%^&*_~"
def __init__(self, prediction_interval, aggregation_method):
self.PREDICTION_INTERVAL = prediction_interval
self.aggregate_future_returns = aggregation_method
self.future_return_by_word = {}
def update_word_sentiments(self, words_collection, security_history):
self.future_return_by_word = {}
if len(words_collection) == 0 or security_history.empty:
return
future_returns_by_word = {}
for time, words in words_collection:
# Get entry price
start_time = np.datetime64(time)
entry_prices = security_history.loc[security_history.index >= start_time]
if entry_prices.empty:
continue
entry_price = entry_prices.iloc[0]
# Get exit price
end_time = np.datetime64(time + self.PREDICTION_INTERVAL)
exit_prices = security_history.loc[security_history.index >= end_time]
if exit_prices.empty:
continue
exit_price = exit_prices.iloc[0]
# Calculate trade return
future_return = (exit_price - entry_price) / entry_price
# Save simulated trade return for each word
filtered_words = self.filter_words(words)
for word in filtered_words:
if word not in future_returns_by_word:
future_returns_by_word[word] = []
future_returns_by_word[word].append(future_return)
# Aggregate future returns for each word
self.future_return_by_word = {
word: self.aggregate_future_returns(future_returns)
for word, future_returns in future_returns_by_word.items()
}
def filter_words(self, words):
word_tokens = word_tokenize(words)
return list(set([w.lower() for w in word_tokens if w.lower() not in self.STOP_WORDS and w not in self.PUNCTUATION]))
def get_expected_return(self, words):
if len(self.future_return_by_word) == 0:
return None
filtered_words = self.filter_words(words)
future_returns = []
for word in filtered_words:
if word in self.future_return_by_word:
future_returns.append(self.future_return_by_word[word])
if len(future_returns) == 0:
return None
return self.aggregate_future_returns(future_returns)
#region imports
from AlgorithmImports import *
#endregion
class PartitionedPortfolioConstructionModel(PortfolioConstructionModel):
def __init__(self, algorithm, num_partitions):
self.algorithm = algorithm
self.NUM_PARTITIONS = num_partitions
# REQUIRED: Will determine the target percent for each insight
def DetermineTargetPercent(self, activeInsights: List[Insight]) -> Dict[Insight, float]:
target_pct_by_insight = {}
# Sort insights by time they were emmited
insights_sorted_by_time = sorted(activeInsights, key=lambda x: x.GeneratedTimeUtc)
# Find target securities and group insights by Symbol
target_symbols = []
insight_by_symbol = {}
for insight in insights_sorted_by_time:
insight_by_symbol[insight.Symbol] = insight
# Liquidate securities that are removed from the universe
if insight.Symbol in self.removed_symbols:
continue
if len(target_symbols) < self.NUM_PARTITIONS:
target_symbols.append(insight.Symbol)
occupied_portfolio_value = 0
occupied_partitions = 0
# Get last insight emmited for each target Symbol
for symbol, insight in insight_by_symbol.items():
# Only invest in Symbols in `target_symbols`
if symbol not in target_symbols:
target_pct_by_insight[insight] = 0
else:
security_holding = self.algorithm.Portfolio[symbol]
# If we're invested in the security in the proper direction, do nothing
if security_holding.IsShort and insight.Direction == InsightDirection.Down \
or security_holding.IsLong and insight.Direction == InsightDirection.Up:
occupied_portfolio_value += security_holding.AbsoluteHoldingsValue
occupied_partitions += 1
continue
# If currently invested and there but the insight direction has changed,
# change portfolio weight of security and reset set partition size
if security_holding.IsShort and insight.Direction == InsightDirection.Up \
or security_holding.IsLong and insight.Direction == InsightDirection.Down:
target_pct_by_insight[insight] = int(insight.Direction)
# If not currently invested, set portfolio weight of security with partition size
if not security_holding.Invested:
target_pct_by_insight[insight] = int(insight.Direction)
# Scale down target percentages to respect partitions (account for liquidations from insight expiry + universe removals)
total_portfolio_value = self.algorithm.Portfolio.TotalPortfolioValue
free_portfolio_pct = (total_portfolio_value - occupied_portfolio_value) / total_portfolio_value
vacant_partitions = self.NUM_PARTITIONS - occupied_partitions
scaling_factor = free_portfolio_pct / vacant_partitions if vacant_partitions != 0 else 0
for insight, target_pct in target_pct_by_insight.items():
target_pct_by_insight[insight] = target_pct * scaling_factor
return target_pct_by_insight
# Determines if the portfolio should be rebalanced base on the provided rebalancing func
def IsRebalanceDue(self, insights: List[Insight], algorithmUtc: datetime) -> bool:
# Rebalance when any of the following cases are true:
# Case 1: A security we're invested in was removed from the universe
# Case 2: The latest insight for a Symbol we're invested in has expired
# Case 3: The insight direction for a security we're invested in has changed
# Case 4: There is an insight for a security we're not currently invested in AND there is an available parition in the portfolio
last_active_insights = self.GetTargetInsights() # Warning: This assumes that all insights have the same duration
insight_symbols = [insight.Symbol for insight in last_active_insights]
num_investments = 0
for symbol, security_holding in self.algorithm.Portfolio.items():
if not security_holding.Invested:
continue
num_investments += 1
# Case 1: A security we're invested in was removed from the universe
# Case 2: The latest insight for a Symbol we're invested in has expired
if symbol not in insight_symbols:
return True
for insight in last_active_insights:
security_holding = self.algorithm.Portfolio[insight.Symbol]
# Case 3: The insight direction for a security we're invested in has changed
if security_holding.IsShort and insight.Direction == InsightDirection.Up \
or security_holding.IsLong and insight.Direction == InsightDirection.Down:
return True
# Case 4: There is an insight for a security we're not currently invested in AND there is an available parition in the portfolio
if not security_holding.Invested and num_investments < self.NUM_PARTITIONS:
return True
return False
def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
super().OnSecuritiesChanged(algorithm, changes)
self.removed_symbols = []
for security in changes.RemovedSecurities:
self.removed_symbols.append(security.Symbol)
if not self.InsightCollection.ContainsKey(security.Symbol):
continue
for insight in self.InsightCollection[security.Symbol]:
self.InsightCollection.Remove(insight)
from AlgorithmImports import *
from datetime import timedelta
import numpy as np
import pytz
from news_return_generator import IntradayNewsReturnGenerator
class SymbolData:
LOOKBACK_PERIOD = timedelta(days=30)
STANDARD_DEVS = 6
STANDARD_DEV_PERIOD = 30
def __init__(self, algorithm, symbol, prediction_interval, aggregation_method):
self.algorithm = algorithm
self.symbol = symbol
self.dataset_symbol = algorithm.AddData(TiingoNews, symbol).Symbol
self.news_return_generator = IntradayNewsReturnGenerator(prediction_interval, aggregation_method)
self.predictions_bb = BollingerBands(self.STANDARD_DEV_PERIOD, self.STANDARD_DEVS)
algorithm.Train(self.train_model)
def train_model(self):
# Get and structure news history
news_history = self.algorithm.History[TiingoNews](self.dataset_symbol, self.LOOKBACK_PERIOD)
filtered_news_history = []
words_collection = []
for article in news_history:
# Convert article timestamp (GMT) into the asset time zone
timestamp = article.Time.astimezone(pytz.timezone(self.algorithm.Securities[self.symbol].Exchange.Hours.TimeZone.ToString())).replace(tzinfo=None)
# Skip articles that were released outside of regular trading hours
is_open = self.algorithm.Securities[self.symbol].Exchange.Hours.IsOpen(timestamp + timedelta(minutes=1), extendedMarket=False)
if not is_open:
continue # If you train the model with articles that we released outside of RTH, then the simulated entry price and exit price can be the same (the market opening price)
filtered_news_history.append(article)
words_collection.append( (timestamp, self.get_words(article)) )
# Get security history
security_history = self.algorithm.History(self.symbol, self.LOOKBACK_PERIOD)
if not security_history.empty:
security_history = security_history.loc[self.symbol]['close']
# Train news return generator
self.news_return_generator.update_word_sentiments(words_collection, security_history)
# Warm up STD of predictions using the most recent articles
for article in filtered_news_history[-self.predictions_bb.WarmUpPeriod:]:
expected_return = self.get_expected_return(article)
if expected_return is None:
continue # The expected return can be None if the article has no title (data issue)
self.predictions_bb.Update(article.Time, expected_return)
def get_expected_return(self, article):
return self.news_return_generator.get_expected_return(self.get_words(article))
def get_words(self, article):
return article.Title + article.Description
def dispose(self):
self.algorithm.RemoveSecurity(self.dataset_symbol)
def should_trade(self, expected_return):
if expected_return in [None, 0]:
return False
return self.predictions_bb.IsReady \
and (expected_return > self.predictions_bb.UpperBand.Current.Value \
or expected_return < self.predictions_bb.LowerBand.Current.Value)from AlgorithmImports import *
class QQQETFUniverseSelectionModel(ETFConstituentsUniverseSelectionModel):
def __init__(self, universe_settings: UniverseSettings = None) -> None:
symbol = Symbol.Create("QQQ", SecurityType.Equity, Market.USA)
super().__init__(symbol, universe_settings, self.ETFConstituentsFilter)
def ETFConstituentsFilter(self, constituents: List[ETFConstituentData]) -> List[Symbol]:
selected = sorted([c for c in constituents if c.Weight],
key=lambda c: c.Weight, reverse=True)[:10]
return [c.Symbol for c in selected]