| Overall Statistics | Value |
|---|---|
| Total Orders | 2628 |
| Average Win | 0.04% |
| Average Loss | -0.03% |
| Compounding Annual Return | -0.301% |
| Drawdown | 2.300% |
| Expectancy | -0.034 |
| Start Equity | 1000000 |
| End Equity | 985051.27 |
| Net Profit | -1.495% |
| Sharpe Ratio | -4.526 |
| Sortino Ratio | -3.412 |
| Probabilistic Sharpe Ratio | 0.048% |
| Loss Rate | 55% |
| Win Rate | 45% |
| Profit-Loss Ratio | 1.14 |
| Alpha | -0.04 |
| Beta | 0.002 |
| Annual Standard Deviation | 0.009 |
| Annual Variance | 0 |
| Information Ratio | -0.777 |
| Tracking Error | 0.141 |
| Treynor Ratio | -20.23 |
| Total Fees | $8328.56 |
| Estimated Strategy Capacity | $9700000.00 |
| Lowest Capacity Asset | NFLX SEWJWLJNHZDX |
| Portfolio Turnover | 14.53% |
| Drawdown Recovery | 1403 |
import pytz
from AlgorithmImports import *
from news_return_generator import IntradayNewsReturnGenerator
class TiingoNewsImpactAlphaModel(AlphaModel):
    """Alpha model that emits insights from the expected price impact of Tiingo news.

    For every security added to the universe the model subscribes to the
    TiingoNews dataset, trains an `IntradayNewsReturnGenerator` on the trailing
    `_LOOKBACK_PERIOD` of news and prices, and warms up Bollinger Bands over the
    model's most recent predictions. During trading, an insight is emitted when
    the aggregated expected return of the current news falls outside those bands.
    """

    # Duration of each insight; also passed to the return generator as its prediction interval.
    _PREDICTION_INTERVAL = timedelta(minutes=30)
    # Amount of trailing news and price history used for each training run.
    _LOOKBACK_PERIOD = timedelta(days=30)
    # Band width (in standard deviations) and period of the Bollinger Bands
    # that are applied to the stream of predicted returns.
    _STANDARD_DEVS = 6
    _STANDARD_DEV_PERIOD = 30

    def __init__(self, algorithm):
        """Store the algorithm and schedule the recurring training session.

        Args:
            algorithm: The QCAlgorithm instance that owns this alpha model.
        """
        self._algorithm = algorithm
        self._securities = []
        # Schedule model training sessions (start of each month, at 8:00).
        algorithm.train(algorithm.date_rules.month_start(), algorithm.time_rules.at(8, 0), self._train_models)

    def on_securities_changed(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
        """Attach per-security state for added securities and tear it down for removed ones."""
        for security in changes.added_securities:
            security.dataset_symbol = algorithm.add_data(TiingoNews, security).Symbol
            security.news_return_generator = IntradayNewsReturnGenerator(
                self._PREDICTION_INTERVAL, self._aggregate_expected_returns
            )
            security.predictions_bb = BollingerBands(self._STANDARD_DEV_PERIOD, self._STANDARD_DEVS)
            # Bind `security` as a default argument so the lambda does not pick up
            # a later value of the loop variable.
            algorithm.train(lambda security=security: self._train_model(security))
            self._securities.append(security)

        for security in changes.removed_securities:
            algorithm.remove_security(security.dataset_symbol)
            self._securities.remove(security)

    def _train_models(self):
        """Retrain the news model of every security currently tracked."""
        for security in self._securities:
            self._train_model(security)

    def _train_model(self, security):
        """Train the security's news return generator and warm up its prediction bands.

        Args:
            security: A tracked security carrying `dataset_symbol`,
                `news_return_generator` and `predictions_bb` attributes.
        """
        # These lookups do not depend on the article, so resolve them once.
        exchange_hours = security.exchange.hours
        asset_time_zone = pytz.timezone(exchange_hours.time_zone.to_string())

        # Get and structure news history
        news_history = self._algorithm.history[TiingoNews](security.dataset_symbol, self._LOOKBACK_PERIOD)
        filtered_news_history = []
        words_collection = []
        for article in news_history:
            # Convert article timestamp (GMT) into the asset time zone
            timestamp = article.time.astimezone(asset_time_zone).replace(tzinfo=None)

            # Skip articles that were released outside of regular trading hours.
            # If the model is trained with articles released outside of RTH, the
            # simulated entry price and exit price can be the same (the market
            # opening price).
            if not exchange_hours.is_open(timestamp + timedelta(minutes=1), extended_market_hours=False):
                continue

            filtered_news_history.append(article)
            words_collection.append((timestamp, self._get_words(article)))

        # Get security history
        security_history = self._algorithm.history(security, self._LOOKBACK_PERIOD)
        if not security_history.empty:
            security_history = security_history.loc[security.symbol]['close']

        # Train news return generator
        security.news_return_generator.update_word_sentiments(words_collection, security_history)

        # Warm up STD of predictions using the most recent articles
        for article in filtered_news_history[-security.predictions_bb.warm_up_period:]:
            expected_return = self._get_expected_return(security, article)
            if expected_return is None:
                continue  # No prediction is available for this article
            security.predictions_bb.update(article.time, expected_return)

    def update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]:
        """Emit insights for securities whose current news predicts an outlier return.

        Args:
            algorithm: The running algorithm.
            data: The current data slice; only its TiingoNews entries are used.

        Returns:
            One price insight of `_PREDICTION_INTERVAL` duration per security that
            passes `_should_trade`.
        """
        # Get expected returns of each security, given the current news
        expected_returns_by_security = {}
        for dataset_symbol, article in data.get(TiingoNews).items():
            for asset_symbol in article.symbols:
                if asset_symbol not in algorithm.securities:
                    continue
                security = algorithm.securities[asset_symbol]
                if security not in self._securities:
                    continue  # Articles can mention assets that aren't in the universe

                is_open = security.exchange.hours.is_open(
                    algorithm.time + timedelta(minutes=1), extended_market_hours=False
                )
                if not is_open:
                    # Only trade during regular hours, otherwise market orders get
                    # converted to MOO and cause margin issues
                    continue

                expected_return = self._get_expected_return(security, article)
                if expected_return is None:
                    continue
                # The dict is keyed by security, so accumulate under that same key.
                # Testing membership with the Symbol instead would reset the list
                # for every article and keep only the last prediction.
                expected_returns_by_security.setdefault(security, []).append(expected_return)

        insights = []
        for security, expected_returns in expected_returns_by_security.items():
            expected_return = self._aggregate_expected_returns(expected_returns)
            if self._should_trade(security, expected_return):
                direction = InsightDirection.UP if expected_return > 0 else InsightDirection.DOWN
                insights.append(Insight.price(security, self._PREDICTION_INTERVAL, direction))
        return insights

    def _aggregate_expected_returns(self, expected_returns):
        """Return the arithmetic mean of a non-empty sequence of returns."""
        return sum(expected_returns) / len(expected_returns)

    def _get_words(self, article):
        """Return the article's title and description as one space-separated string.

        The separator keeps the last word of the title from being fused with the
        first word of the description; missing (None or empty) parts are skipped.
        """
        return " ".join(part for part in (article.title, article.description) if part)

    def _get_expected_return(self, security, article):
        """Return the generator's expected return for the article, or None if it has none."""
        return security.news_return_generator.get_expected_return(self._get_words(article))

    def _should_trade(self, security, expected_return):
        """Return True when the prediction is non-zero and lies outside the ready Bollinger Bands."""
        if expected_return in [None, 0]:
            return False
        bands = security.predictions_bb
        if not bands.is_ready:
            return False
        return not (bands.lower_band.current.value <= expected_return <= bands.upper_band.current.value)
# region imports
from AlgorithmImports import *
from universe import QQQETFUniverseSelectionModel
from alpha import TiingoNewsImpactAlphaModel
from portfolio import PartitionedPortfolioConstructionModel
# endregion
class QQQConstituentsNewsImpactAlgorithm(QCAlgorithm):
    """Framework algorithm wiring the QQQ universe, news alpha and partitioned portfolio models."""

    def initialize(self):
        """Set the backtest window and starting cash, then register the framework models."""
        # Start five 365-day years before the configured end date.
        backtest_length = timedelta(days=5 * 365)
        self.set_start_date(self.end_date - backtest_length)
        self.set_cash(1_000_000)

        universe_model = QQQETFUniverseSelectionModel(self.universe_settings)
        self.add_universe_selection(universe_model)

        alpha_model = TiingoNewsImpactAlphaModel(self)
        self.add_alpha(alpha_model)

        # Split the portfolio into 10 partitions.
        portfolio_model = PartitionedPortfolioConstructionModel(self, 10)
        self.set_portfolio_construction(portfolio_model)

        risk_model = NullRiskManagementModel()
        self.add_risk_management(risk_model)

        execution_model = ImmediateExecutionModel()
        self.set_execution(execution_model)
#region imports
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from AlgorithmImports import *
#endregion
class IntradayNewsReturnGenerator:
    """Maps words found in news articles to the return that followed past articles.

    Training simulates, for every article, a trade entered at the first price at
    or after the article time and exited at the first price at or after the
    article time plus the prediction interval. Each distinct word of the article
    is credited with that return, and the returns collected per word are reduced
    to one value with the supplied aggregation method.
    """

    _STOP_WORDS = set(stopwords.words('english'))
    _PUNCTUATION = "!()-[]{};:'\",<>./?@#%^&*_~"

    def __init__(self, prediction_interval, aggregation_method):
        """
        Args:
            prediction_interval: timedelta between simulated entry and exit.
            aggregation_method: Callable reducing a non-empty list of returns to
                a single number.
        """
        self._PREDICTION_INTERVAL = prediction_interval
        self._aggregate_future_returns = aggregation_method
        self._future_return_by_word = {}

    def update_word_sentiments(self, words_collection, security_history):
        """Rebuild the word -> aggregated future return table from scratch.

        Args:
            words_collection: Iterable of `(time, words)` pairs, where `words` is
                the raw text of one article.
            security_history: Time-indexed price series supporting `.empty`,
                `.index`, `.loc` and `.iloc`.
        """
        # Drop the previous model first; if there is nothing to train on, the
        # generator stays empty and get_expected_return() returns None.
        self._future_return_by_word = {}
        if len(words_collection) == 0 or security_history.empty:
            return

        price_times = security_history.index
        future_returns_by_word = {}
        for time, words in words_collection:
            # Get entry price
            start_time = np.datetime64(time)
            entry_prices = security_history.loc[price_times >= start_time]
            if entry_prices.empty:
                continue
            entry_price = entry_prices.iloc[0]

            # Get exit price
            end_time = np.datetime64(time + self._PREDICTION_INTERVAL)
            exit_prices = security_history.loc[price_times >= end_time]
            if exit_prices.empty:
                continue
            exit_price = exit_prices.iloc[0]

            # Calculate trade return
            future_return = (exit_price - entry_price) / entry_price

            # Save simulated trade return for each word
            for word in self._filter_words(words):
                future_returns_by_word.setdefault(word, []).append(future_return)

        # Aggregate future returns for each word
        self._future_return_by_word = {
            word: self._aggregate_future_returns(future_returns)
            for word, future_returns in future_returns_by_word.items()
        }

    def _filter_words(self, words):
        """Return the distinct lower-cased tokens of `words`, without stop words or punctuation.

        A token is treated as punctuation when every one of its characters is in
        `_PUNCTUATION`. Testing the whole token for substring membership in that
        string would let multi-character punctuation tokens such as "--" or "..."
        through as words.
        """
        unique_words = set()
        for token in word_tokenize(words):
            if all(char in self._PUNCTUATION for char in token):
                continue
            lowered = token.lower()
            if lowered not in self._STOP_WORDS:
                unique_words.add(lowered)
        return list(unique_words)

    def get_expected_return(self, words):
        """Return the aggregated learned return of the known words in `words`.

        Returns:
            None when the model is untrained or none of the words is known;
            otherwise the aggregation of the per-word returns.
        """
        if len(self._future_return_by_word) == 0:
            return None

        future_returns = [
            self._future_return_by_word[word]
            for word in self._filter_words(words)
            if word in self._future_return_by_word
        ]
        if len(future_returns) == 0:
            return None
        return self._aggregate_future_returns(future_returns)
#region imports
from AlgorithmImports import *
#endregion
class PartitionedPortfolioConstructionModel(PortfolioConstructionModel):
    """Portfolio construction model that splits the portfolio into a fixed number of partitions.

    At most `_NUM_PARTITIONS` symbols are targeted at once. Positions that are
    already held in the direction of their latest insight are left untouched;
    new or reversed positions share the remaining free portfolio value equally
    across the vacant partitions.
    """

    def __init__(self, algorithm, _NUM_PARTITIONS):
        """
        Args:
            algorithm: The QCAlgorithm instance whose portfolio is managed.
            _NUM_PARTITIONS: Maximum number of symbols to hold at once.
        """
        self._algorithm = algorithm
        self._NUM_PARTITIONS = _NUM_PARTITIONS
        # Symbols removed by the most recent universe change. Created here so
        # determine_target_percent() is safe before on_securities_changed() runs.
        self.removed_symbols = []

    # REQUIRED: Will determine the target percent for each insight
    def determine_target_percent(self, active_insights: List[Insight]):
        """Map each insight that needs an order to its signed target portfolio weight.

        Args:
            active_insights: The insights to build targets for.

        Returns:
            Dict of insight -> target weight. Insights for positions that are
            already held in the right direction are omitted.
        """
        target_pct_by_insight = {}

        # Sort insights by time they were emitted
        insights_sorted_by_time = sorted(active_insights, key=lambda x: x.generated_time_utc)

        # Find target securities and group insights by Symbol
        target_symbols = []
        insight_by_symbol = {}
        for insight in insights_sorted_by_time:
            # Later insights overwrite earlier ones, leaving the latest per Symbol
            insight_by_symbol[insight.symbol] = insight

            # Liquidate securities that are removed from the universe
            if insight.symbol in self.removed_symbols:
                continue
            # A Symbol occupies a single partition no matter how many insights it has
            if insight.symbol in target_symbols:
                continue
            if len(target_symbols) < self._NUM_PARTITIONS:
                target_symbols.append(insight.symbol)

        occupied_portfolio_value = 0
        occupied_partitions = 0

        # Get last insight emitted for each target Symbol
        for symbol, insight in insight_by_symbol.items():
            # Only invest in Symbols in `target_symbols`
            if symbol not in target_symbols:
                target_pct_by_insight[insight] = 0
                continue

            security_holding = self._algorithm.portfolio[symbol]
            going_up = insight.direction == InsightDirection.UP
            going_down = insight.direction == InsightDirection.DOWN

            # If we're invested in the security in the proper direction, do nothing
            if (security_holding.is_short and going_down or
                    security_holding.is_long and going_up):
                occupied_portfolio_value += security_holding.absolute_holdings_value
                occupied_partitions += 1
                continue

            # If currently invested but the insight direction has changed,
            # change portfolio weight of security and reset the partition size
            if (security_holding.is_short and going_up or
                    security_holding.is_long and going_down):
                target_pct_by_insight[insight] = int(insight.direction)

            # If not currently invested, set portfolio weight of security with partition size
            if not security_holding.invested:
                target_pct_by_insight[insight] = int(insight.direction)

        # Scale down target percentages to respect partitions
        # (account for liquidations from insight expiry + universe removals)
        total_portfolio_value = self._algorithm.portfolio.total_portfolio_value
        free_portfolio_pct = (total_portfolio_value - occupied_portfolio_value) / total_portfolio_value
        vacant_partitions = self._NUM_PARTITIONS - occupied_partitions
        scaling_factor = free_portfolio_pct / vacant_partitions if vacant_partitions != 0 else 0
        for insight, target_pct in target_pct_by_insight.items():
            target_pct_by_insight[insight] = target_pct * scaling_factor

        return target_pct_by_insight

    # Determines if the portfolio should be rebalanced
    def is_rebalance_due(self, insights: List[Insight], algorithm_utc: datetime) -> bool:
        """Return True when the current holdings no longer match the latest insights.

        Rebalance when any of the following cases are true:
          Case 1: A security we're invested in was removed from the universe
          Case 2: The latest insight for a Symbol we're invested in has expired
          Case 3: The insight direction for a security we're invested in has changed
          Case 4: There is an insight for a security we're not currently invested in
                  AND there is an available partition in the portfolio
        """
        last_active_insights = self.get_target_insights()  # Warning: This assumes that all insights have the same duration
        insight_symbols = {insight.symbol for insight in last_active_insights}

        num_investments = 0
        for symbol, security_holding in self._algorithm.portfolio.items():
            if not security_holding.invested:
                continue
            num_investments += 1
            # Case 1: A security we're invested in was removed from the universe
            # Case 2: The latest insight for a Symbol we're invested in has expired
            if symbol not in insight_symbols:
                return True

        for insight in last_active_insights:
            security_holding = self._algorithm.portfolio[insight.symbol]
            # Case 3: The insight direction for a security we're invested in has changed
            if (security_holding.is_short and insight.direction == InsightDirection.UP or
                    security_holding.is_long and insight.direction == InsightDirection.DOWN):
                return True

            # Case 4: There is an insight for a security we're not currently invested in
            # AND there is an available partition in the portfolio
            if not security_holding.invested and num_investments < self._NUM_PARTITIONS:
                return True

        return False

    def on_securities_changed(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
        """Record the removed Symbols and delete their insights from the algorithm."""
        super().on_securities_changed(algorithm, changes)
        self.removed_symbols = []
        for security in changes.removed_securities:
            self.removed_symbols.append(security.symbol)
            if not algorithm.insights.contains_key(security.symbol):
                continue
            # Iterate over a copy so removals cannot disturb the iteration
            for insight in list(algorithm.insights[security.symbol]):
                algorithm.insights.remove(insight)
from AlgorithmImports import *
class QQQETFUniverseSelectionModel(ETFConstituentsUniverseSelectionModel):
    """Universe model that selects the ten highest-weighted constituents of the QQQ ETF."""

    def __init__(self, universe_settings: UniverseSettings = None) -> None:
        """Create the QQQ symbol and register the constituents filter with the base model."""
        etf_symbol = Symbol.create("QQQ", SecurityType.EQUITY, Market.USA)
        super().__init__(etf_symbol, universe_settings, self._etf_constituents_filter)

    def _etf_constituents_filter(self, constituents: List[ETFConstituentData]) -> List[Symbol]:
        """Return the symbols of the 10 constituents with the largest weight.

        Constituents with a falsy weight (None or 0) are ignored.
        """
        weighted = (constituent for constituent in constituents if constituent.weight)
        ranked = sorted(weighted, key=lambda constituent: constituent.weight, reverse=True)
        return [constituent.symbol for constituent in ranked[:10]]