| Overall Statistics |
|
Total Orders 680 Average Win 1.12% Average Loss -0.76% Compounding Annual Return 21.148% Drawdown 22.300% Expectancy 0.338 Start Equity 100000 End Equity 261223.11 Net Profit 161.223% Sharpe Ratio 0.625 Sortino Ratio 0.758 Probabilistic Sharpe Ratio 24.351% Loss Rate 46% Win Rate 54% Profit-Loss Ratio 1.48 Alpha 0.061 Beta 0.963 Annual Standard Deviation 0.208 Annual Variance 0.043 Information Ratio 0.374 Tracking Error 0.157 Treynor Ratio 0.135 Total Fees $1139.48 Estimated Strategy Capacity $0 Lowest Capacity Asset TROW R735QTJ8XC9X Portfolio Turnover 3.86% Drawdown Recovery 349 |
from AlgorithmImports import *
class FundamentalValueSentiment30DayRebalanceAlgorithm(QCAlgorithm):
    """Fundamental value + TiingoNews sentiment strategy with monthly rebalancing.
    Validated configuration (Run 8 — best backtest):
    ───────────────────────────────────────────────────────────────────────
    Backtest 2021–2025: Sharpe 0.496 | Return 16.2% | Drawdown 22.2%
    Out-of-sample 2018–2021: Sharpe 0.395 | Return 11.5% | Drawdown 37.1%
    Alpha positive in both periods.
    Live trading changes vs backtest:
    ───────────────────────────────────────────────────────────────────────
    - No fixed start/end dates (live_mode auto-detected)
    - FinBERT enabled (disabled in backtest to avoid 50hr runtime)
    - Extended logging via self.log() for rebalance, risk exits, DD guard
    - Brokerage model set to Interactive Brokers Margin
    Key parameters:
    ───────────────────────────────────────────────────────────────────────
    STOP_LOSS = 0.15 (15% trailing stop)
    TAKE_PROFIT = 0.50 (50% take profit)
    MIN_HOLD_DAYS = 10 (min days before stop checked)
    DD_GUARD_THRESHOLD = 0.10 (10% monthly drawdown fires circuit breaker)
    DD_GUARD_SCALE = 0.25 (positions scaled to 25% of current weight when guard fires)
    MAX_POSITIONS = 10
    SENTIMENT_ENTRY_FLOOR = 0.0 (no negative-sentiment entries)
    MAX_POSITION_WEIGHT = 0.20 (20% max per position)
    MOMENTUM_LOOKBACK = 63 (3-month momentum filter)
    ───────────────────────────────────────────────────────────────────────
    """
    # ── Sentiment constants ────────────────────────────────────────────────
    MIN_NEWS_COUNT = 3              # hits required before a score is "trusted"
    SENTIMENT_ALPHA = 0.2           # EWMA blend weight for new article scores
    DECAY_FACTOR = 1.0 - SENTIMENT_ALPHA  # 0.80 — daily fade toward zero
    # ── Risk constants ─────────────────────────────────────────────────────
    TAKE_PROFIT = 0.50  # exit at +50% from entry
    STOP_LOSS = 0.15    # exit at -15% from entry
    # Minimum holding period — prevents thrashing around stop-loss level
    # e.g. CTSH bought and stopped out repeatedly in consecutive months
    MIN_HOLD_DAYS = 10  # stop-loss not checked for first 10 days after entry
    # Portfolio-level drawdown guard
    DD_GUARD_THRESHOLD = 0.10  # 10% drawdown from monthly peak triggers guard
    DD_GUARD_SCALE = 0.25      # scale positions to 25% of current weight when triggered
    # ── Sharpe improvements ────────────────────────────────────────────────
    # Sentiment entry floor — only enter positions with non-negative sentiment.
    # Applied only when the symbol has enough news hits to be trusted.
    # Avoids buying names like SNA=-0.24, BLK=-0.09 seen in logs.
    SENTIMENT_ENTRY_FLOOR = 0.0  # must have EWMA sentiment >= 0 to be entered
    # Volatility-adjusted weighting cap — no single position > this weight
    MAX_POSITION_WEIGHT = 0.20  # cap any single position at 20% of portfolio
    # ── Portfolio construction ─────────────────────────────────────────────
    MAX_POSITIONS = 10    # concentrate in top 10 — matches typical tradeable count
    MIN_HISTORY_DAYS = 5  # symbol must have been in universe this many days
                          # before eligible for rebalance (fixes tradeable gap)
    # ── Momentum filter ────────────────────────────────────────────────────
    MOMENTUM_LOOKBACK = 63  # ~3 months of trading days
    # Minimum 3-month return required to pass momentum filter.
    MOMENTUM_MIN_RETURN = 0.0  # price must be above where it was 63 days ago
    # ── Queue / cache limits ───────────────────────────────────────────────
    QUEUE_MAX_SIZE = 500    # max queued news items before dropping oldest
    SCORE_CACHE_MAX = 2000  # max dedup entries in the text-hash score cache
def initialize(self) -> None:
# ── Live / Backtest mode ───────────────────────────────────────────
# Dates only used in backtesting — ignored in live trading
if not self.live_mode:
self.set_start_date(2021, 1, 1)
self.set_end_date(2026, 1, 1)
else:
self.set_start_date(self.end_date - timedelta(5*365))
self.set_benchmark("SPY")
self.set_cash(100_000)
# ── Brokerage ─────────────────────────────────────────────────────
self.set_brokerage_model(
BrokerageName.INTERACTIVE_BROKERS_BROKERAGE,
AccountType.MARGIN
)
self.universe_settings.resolution = Resolution.DAILY
self.universe_settings.data_normalization_mode = DataNormalizationMode.ADJUSTED
self.universe_settings.fill_data_before_start = True
self._spy = self.add_equity("SPY", Resolution.MINUTE).symbol
# Use ImmediateFillModel — avoids ask-price lookup errors on daily bars
# (daily OHLC data has no bid/ask spread, so quote-based fill models fail)
self.set_security_initializer(
lambda s: s.set_fill_model(ImmediateFillModel())
)
# Refresh universe monthly — fundamental ratios update quarterly so
# daily refresh wastes compute and causes unnecessary position churn.
self.universe_settings.schedule.on(
self.date_rules.month_start(self._spy)
)
# Universe state
self._selected_symbols: List[Symbol] = []
self._coarse_count = 0
self._fine_count = 0
# Trade / risk state
self._entry_price_by_symbol: dict = {}
self._position_entry_date: dict = {} # Symbol -> datetime of first fill
# Portfolio drawdown guard state
self._monthly_peak_value = 0.0 # portfolio value at start of each month
self._dd_guard_active = False # True when guard has fired this month
# Tracks when each symbol was added to the universe.
# Used to enforce MIN_HISTORY_DAYS before a symbol is rebalance-eligible.
self._symbol_added_date: dict = {} # Symbol -> datetime
# Momentum indicators — ROC(63) per symbol, created in OnSecuritiesChanged
self._momentum: dict = {} # Symbol -> RateOfChange indicator
# TiingoNews subscription maps
self._news_symbol_by_underlying: dict = {}
self._underlying_by_news_symbol: dict = {}
# Sentiment state
self._sentiment_ewma_by_symbol: dict = {}
self._sentiment_hit_count: dict = {}
self._sentiment_alpha = self.SENTIMENT_ALPHA
# Warmup-safe removal queue
self._pending_liquidations = set()
# Async news queue + dedup score cache
self._news_queue: list = []
self._score_cache: dict = {}
# Local FinBERT
# Backtest: disabled (200ms/article makes backtests take 50+ hours)
# Live: enabled — real-time FinBERT via async queue
self._use_local_finbert = self.live_mode
self._finbert = {}
self._finbert_ready = False
self._finbert_max_chars = 1500
# Warmup — covers momentum lookback (63 trading days ≈ 93 calendar days)
# Pre-warm scores 10 days of TiingoNews history on warmup completion
self.set_warm_up(timedelta(days=self.MOMENTUM_LOOKBACK + 30))
self._initialize_local_finbert()
self.add_universe(self._fundamental_selection)
# Scheduled jobs — order matters: decay → queue → rebalance → risk
self.schedule.on(
self.date_rules.every_day(self._spy),
self.time_rules.after_market_open(self._spy, 10),
self._decay_sentiment,
)
# Process news queue every 30 min — sufficient for monthly rebalance,
# reduces scheduler overhead significantly in backtesting.
self.schedule.on(
self.date_rules.every_day(self._spy),
self.time_rules.every(timedelta(minutes=30)),
self._process_news_queue,
)
# Rebalance on the first trading day of each month, 30 min after open
self.schedule.on(
self.date_rules.month_start(self._spy),
self.time_rules.after_market_open(self._spy, 30),
self._rebalance_if_due,
)
self.schedule.on(
self.date_rules.every_day(self._spy),
self.time_rules.after_market_open(self._spy, 45),
self._daily_risk_check,
)
# Portfolio drawdown guard — runs after risk check
self.schedule.on(
self.date_rules.every_day(self._spy),
self.time_rules.after_market_open(self._spy, 60),
self._portfolio_drawdown_guard,
)
# ══════════════════════════════════════════════════════════════════════════
# UNIVERSE SELECTION
# ══════════════════════════════════════════════════════════════════════════
def _fundamental_selection(self, fundamental: List[Fundamental]) -> List[Symbol]:
"""Single-pass coarse + fine filter (v2.5 Fundamental API)."""
filtered = [
f for f in fundamental
if f.has_fundamental_data
and f.price is not None
and float(f.price) > 5
and f.dollar_volume > 10_000_000
]
top1000 = sorted(filtered, key=lambda f: f.dollar_volume, reverse=True)[:1000]
self._coarse_count = len(top1000)
selected = []
for f in top1000:
pe = self._get_float(f, [
"valuation_ratios.pe_ratio",
"valuation_ratios.peratio",
"valuation_ratios.price_earnings_ratio",
])
dte = self._get_float(f, [
"operation_ratios.total_debt_equity_ratio",
"operation_ratios.debt_to_equity",
"operation_ratios.debttoequity",
])
div_yield = self._get_float(f, [
"valuation_ratios.trailing_dividend_yield",
"valuation_ratios.dividend_yield",
"valuation_ratios.dividendyield",
])
roi = self._get_float(f, [
"operation_ratios.roi",
"operation_ratios.return_on_investment",
"operation_ratios.returnoninvesment",
"profitability_ratios.roi",
"profitability_ratios.return_on_investment",
"profitability_ratios.return_on_invested_capital",
"operation_ratios.roic",
"profitability_ratios.roic",
])
if not all(self._is_finite_number(v) for v in [pe, dte, div_yield, roi]):
continue
if pe < 5 or pe > 18: continue
if dte >= 1.0: continue
if div_yield <= 0.01: continue
if roi <= 0.12: continue
selected.append((f.symbol, float(roi)))
selected_sorted = sorted(selected, key=lambda x: x[1], reverse=True)
symbols = [x[0] for x in selected_sorted[:20]]
self._fine_count = len(selected_sorted)
if set(symbols) != set(self._selected_symbols):
self._selected_symbols = symbols
return symbols
# ══════════════════════════════════════════════════════════════════════════
# SECURITIES CHANGED
# ══════════════════════════════════════════════════════════════════════════
def on_securities_changed(self, changes: SecurityChanges) -> None:
for security in changes.removed_securities:
symbol = security.symbol
if self.is_warming_up:
self._pending_liquidations.add(symbol)
self._entry_price_by_symbol.pop(symbol, None)
self._sentiment_ewma_by_symbol.pop(symbol, None)
self._sentiment_hit_count.pop(symbol, None)
self._momentum.pop(symbol, None)
self._symbol_added_date.pop(symbol, None)
continue
if self.portfolio[symbol].invested:
self.liquidate(symbol)
self._entry_price_by_symbol.pop(symbol, None)
self._sentiment_ewma_by_symbol.pop(symbol, None)
self._sentiment_hit_count.pop(symbol, None)
self._momentum.pop(symbol, None)
self._symbol_added_date.pop(symbol, None)
self._remove_tiingo_news_subscription(symbol)
for security in changes.added_securities:
symbol = security.symbol
# Record when this symbol entered the universe
if symbol not in self._symbol_added_date:
self._symbol_added_date[symbol] = self.time
# Create 3-month momentum indicator for each new symbol
if symbol not in self._momentum:
self._momentum[symbol] = self.ROC(
symbol, self.MOMENTUM_LOOKBACK, Resolution.DAILY
)
self._ensure_tiingo_news_subscription(symbol)
# ══════════════════════════════════════════════════════════════════════════
# ON DATA — queue only, no scoring
# ══════════════════════════════════════════════════════════════════════════
def on_data(self, slice: Slice) -> None:
"""Queue TiingoNews items for async scoring. Never blocks here."""
if slice is None:
return
try:
news_by_symbol = slice.get(TiingoNews)
except Exception:
news_by_symbol = None
if news_by_symbol is None:
return
for kvp in news_by_symbol:
try:
news_symbol = kvp.key
item = kvp.value
except Exception:
continue
if news_symbol is None or item is None:
continue
if self._underlying_by_news_symbol.get(news_symbol) is None:
continue
if len(self._news_queue) >= self.QUEUE_MAX_SIZE:
self._news_queue.pop(0) # drop oldest
self._news_queue.append((news_symbol, item))
# ══════════════════════════════════════════════════════════════════════════
    # FIX 7: ASYNC NEWS QUEUE PROCESSOR (every 30 min)
# ══════════════════════════════════════════════════════════════════════════
def _process_news_queue(self) -> None:
"""Score all queued news and update EWMA. Runs every 5 min via scheduler.
Skipped during warmup — on_warmup_finished handles historical scoring in bulk.
"""
if self.is_warming_up:
self._news_queue.clear() # discard warmup items — covered by pre-warm
return
if not self._news_queue:
return
snapshot = list(self._news_queue)
self._news_queue = [] # clear immediately so on_data can keep filling
scored = 0
for news_symbol, item in snapshot:
underlying = self._underlying_by_news_symbol.get(news_symbol)
if underlying is None:
continue
text = self._extract_text(item)
if not text:
continue
text_hash = hash(text)
if text_hash in self._score_cache:
# Cache hit — no inference needed
score = self._score_cache[text_hash]
else:
# Score in priority order: FinBERT → keyword fallback
score = self._finbert_sentiment_score(text)
if score is None:
score = self._compute_naive_text_sentiment(item)
# Cache result (None cached too — avoids retrying bad items)
if len(self._score_cache) >= self.SCORE_CACHE_MAX:
try:
self._score_cache.pop(next(iter(self._score_cache)))
except Exception:
pass
self._score_cache[text_hash] = score
if score is not None and self._is_finite_number(score):
self._update_sentiment(underlying, float(score))
scored += 1
if scored > 0 and not self.live_mode:
pass # suppress queue logs in backtest to avoid rate limiting
elif scored > 0:
self.debug(f"Queue processed: {len(snapshot)} items, {scored} scored")
# ══════════════════════════════════════════════════════════════════════════
# TEXT EXTRACTION
# ══════════════════════════════════════════════════════════════════════════
def _extract_text(self, news_item) -> str:
parts = []
for attr in ["title", "Title", "headline", "Headline",
"description", "Description", "summary", "Summary"]:
if hasattr(news_item, attr):
try:
val = getattr(news_item, attr)
if val:
parts.append(str(val).strip())
except Exception:
continue
return " ".join(parts).strip()
# ══════════════════════════════════════════════════════════════════════════
# ORDER EVENTS
# ══════════════════════════════════════════════════════════════════════════
def on_order_event(self, order_event: OrderEvent) -> None:
if order_event is None or order_event.status != OrderStatus.FILLED:
return
symbol = order_event.symbol
if symbol is None or not self.securities.contains_key(symbol):
return
holding = self.portfolio[symbol]
if holding.invested and symbol not in self._entry_price_by_symbol:
fill_price = float(order_event.fill_price)
if self._is_finite_number(fill_price) and fill_price > 0:
self._entry_price_by_symbol[symbol] = fill_price
self._position_entry_date[symbol] = self.time # record entry date
if not holding.invested:
self._entry_price_by_symbol.pop(symbol, None)
self._position_entry_date.pop(symbol, None)
# ══════════════════════════════════════════════════════════════════════════
# SCHEDULED JOBS
# ══════════════════════════════════════════════════════════════════════════
def _rebalance_if_due(self) -> None:
"""Monthly rebalance — fired by month_start scheduler and on universe changes."""
if self.is_warming_up:
return
if not self._selected_symbols:
self.debug("Rebalance skipped — no symbols in universe yet")
return
ranked = self._rank_by_sentiment(self._selected_symbols)
if not ranked:
return
# Filter to symbols with valid price, min history, positive momentum,
# and non-negative sentiment (entry floor)
momentum_excluded = 0
history_excluded = 0
sentiment_excluded = 0
tradeable = []
for s in ranked:
if s not in self.securities:
continue
sec = self.securities[s]
if not sec.has_data or sec.price <= 0 or not sec.is_tradable:
continue
# History filter
added = self._symbol_added_date.get(s)
if added is not None:
days_in_universe = (self.time - added).days
if days_in_universe < self.MIN_HISTORY_DAYS:
history_excluded += 1
continue
# Momentum filter
roc = self._momentum.get(s)
if roc is not None and roc.is_ready:
if float(roc.current.value) < self.MOMENTUM_MIN_RETURN:
momentum_excluded += 1
continue
# Sentiment entry floor — skip trusted symbols with negative sentiment
# Untrusted symbols (insufficient news) pass through to avoid over-exclusion
hits = self._sentiment_hit_count.get(s, 0)
if hits >= self.MIN_NEWS_COUNT:
sentiment = self._get_current_sentiment(s)
if sentiment < self.SENTIMENT_ENTRY_FLOOR:
sentiment_excluded += 1
continue
tradeable.append(s)
if not tradeable:
self.debug("Rebalance skipped — no symbols passed filters")
return
if self.live_mode:
no_data = len(ranked) - len(tradeable) - momentum_excluded - history_excluded - sentiment_excluded
if no_data > 0:
self.debug(f"Rebalance: {no_data} skipped (no price data)")
if history_excluded > 0:
self.debug(f"Rebalance: {history_excluded} skipped (< {self.MIN_HISTORY_DAYS} days)")
if momentum_excluded > 0:
self.debug(f"Rebalance: {momentum_excluded} excluded (momentum filter)")
if sentiment_excluded > 0:
self.debug(f"Rebalance: {sentiment_excluded} excluded (negative sentiment)")
# Cap at MAX_POSITIONS — concentrate in highest conviction names
ranked_f = tradeable[:self.MAX_POSITIONS]
targets = self._build_weighted_targets(ranked_f)
if not targets:
return
self.set_holdings(targets)
# Reset monthly peak and guard at each rebalance — fresh month, fresh baseline
self._monthly_peak_value = self.portfolio.total_portfolio_value
self._dd_guard_active = False
self._last_rebalance_time = self.time
self._pending_rebalance = False
preview = ",".join([x.value for x in ranked_f[:5]])
scores_summary = " | ".join([
f"{s.value}={self._get_current_sentiment(s):.2f}"
for s in ranked_f[:5]
])
rebalance_msg = (
f"Rebalance {self.time.date()} | "
f"coarse={self._coarse_count} fine={self._fine_count} "
f"selected={len(self._selected_symbols)} "
f"hist_excl={history_excluded} mom_excl={momentum_excluded} "
f"sent_excl={sentiment_excluded} positions={len(ranked_f)} | "
f"top5={preview} | scores={scores_summary}"
)
self.debug(rebalance_msg)
# Live mode — log full portfolio after rebalance
if self.live_mode:
self.log(f"[REBALANCE] {rebalance_msg}")
for s in ranked_f:
weight = sum(
t.quantity for t in targets
if t.symbol == s
)
self.log(
f" {s.value}: sentiment={self._get_current_sentiment(s):.3f} "
f"hits={self._sentiment_hit_count.get(s, 0)} "
f"finbert={'yes' if self._finbert_ready else 'keyword'}"
)
    def _daily_risk_check(self) -> None:
        """Per-position take-profit / stop-loss check, run daily after open.

        Walks every tracked entry price; exits a position once it is
        +TAKE_PROFIT or -STOP_LOSS away from entry, but only after the
        MIN_HOLD_DAYS holding period has elapsed. Stale or invalid tracking
        entries are pruned as they are encountered.
        """
        if self.is_warming_up:
            return
        # Iterate over a snapshot — entries are popped while looping
        for symbol in list(self._entry_price_by_symbol.keys()):
            # Prune tracking for positions that were closed elsewhere
            if not self.portfolio[symbol].invested:
                self._entry_price_by_symbol.pop(symbol, None)
                self._position_entry_date.pop(symbol, None)
                continue
            entry = self._entry_price_by_symbol.get(symbol, 0.0)
            if not self._is_finite_number(entry) or float(entry) <= 0:
                self._entry_price_by_symbol.pop(symbol, None)
                self._position_entry_date.pop(symbol, None)
                continue
            # Minimum holding period — the `continue` skips BOTH exits for the
            # first MIN_HOLD_DAYS, i.e. it also delays the take-profit check,
            # not just the stop-loss.
            entry_date = self._position_entry_date.get(symbol)
            if entry_date is not None:
                days_held = (self.time - entry_date).days
                if days_held < self.MIN_HOLD_DAYS:
                    continue
            price = float(self.securities[symbol].price)
            if not self._is_finite_number(price) or price <= 0:
                continue
            hit_tp = price >= (1.0 + self.TAKE_PROFIT) * float(entry)
            hit_sl = price <= (1.0 - self.STOP_LOSS) * float(entry)
            if hit_tp or hit_sl:
                reason = "take-profit" if hit_tp else "stop-loss"
                exit_msg = f"Risk exit [{reason}] {symbol} | entry={entry:.2f} now={price:.2f}"
                self.debug(exit_msg)
                if self.live_mode:
                    self.log(f"[RISK EXIT] {exit_msg}")
                self.liquidate(symbol)
                self._entry_price_by_symbol.pop(symbol, None)
                self._position_entry_date.pop(symbol, None)
    def _portfolio_drawdown_guard(self) -> None:
        """
        Portfolio-level circuit breaker.
        If the portfolio drops more than DD_GUARD_THRESHOLD (10%) from the
        monthly peak, scale all invested positions down to DD_GUARD_SCALE (25%)
        of their current weight. Guard stays active until next monthly rebalance,
        which resets the peak and restores full position sizing.
        This directly addresses cluster stop-outs (e.g. April 2025, Sep 2022)
        where multiple positions hit -15% simultaneously due to macro shocks.
        The guard fires before individual stops are triggered, cutting exposure
        while there is still capital to protect.
        """
        if self.is_warming_up:
            return
        equity = self.portfolio.total_portfolio_value
        # Initialise peak on first call
        if self._monthly_peak_value <= 0:
            self._monthly_peak_value = equity
            return
        # Update peak if portfolio is at a new high
        if equity > self._monthly_peak_value:
            self._monthly_peak_value = equity
            return
        # Guard already active this month — don't fire again
        if self._dd_guard_active:
            return
        # Calculate drawdown from monthly peak
        drawdown = (self._monthly_peak_value - equity) / self._monthly_peak_value
        if drawdown < self.DD_GUARD_THRESHOLD:
            return
        # Guard fires — scale down all positions by DD_GUARD_SCALE
        self._dd_guard_active = True
        guard_msg = (
            f"DD Guard triggered: drawdown={drawdown:.1%} from peak "
            f"${self._monthly_peak_value:,.0f} | current=${equity:,.0f} | "
            f"scaling positions to {self.DD_GUARD_SCALE:.0%}"
        )
        self.debug(guard_msg)
        if self.live_mode:
            self.log(f"[DD GUARD] {guard_msg}")
        targets = []
        for symbol, holding in self.portfolio.items():
            if not holding.invested:
                continue
            # Target weight = current weight × scale (reduces, never exits)
            current_weight = holding.holdings_value / equity
            scaled_weight = current_weight * self.DD_GUARD_SCALE
            targets.append(PortfolioTarget(symbol, scaled_weight))
        if targets:
            self.set_holdings(targets)
def _decay_sentiment(self) -> None:
"""Fade all EWMA scores toward zero daily.
Skipped during warmup — scores built in bulk by on_warmup_finished.
"""
if self.is_warming_up:
return
for symbol in list(self._sentiment_ewma_by_symbol.keys()):
v = self._sentiment_ewma_by_symbol.get(symbol)
if self._is_finite_number(v):
self._sentiment_ewma_by_symbol[symbol] = float(v) * self.DECAY_FACTOR
# ══════════════════════════════════════════════════════════════════════════
# SENTIMENT RANKING & WEIGHTING
# ══════════════════════════════════════════════════════════════════════════
def _rank_by_sentiment(self, symbols: List[Symbol]) -> List[Symbol]:
if not symbols:
return []
trusted, untrusted = [], []
for s in symbols:
score = float(self._get_current_sentiment(s))
hits = self._sentiment_hit_count.get(s, 0)
(trusted if hits >= self.MIN_NEWS_COUNT else untrusted).append((s, score))
trusted.sort(key=lambda x: x[1], reverse=True)
untrusted.sort(key=lambda x: x[1], reverse=True)
return [x[0] for x in trusted + untrusted]
def _build_weighted_targets(self, ranked: List[Symbol]) -> List[PortfolioTarget]:
"""
Build portfolio targets with three improvements for Sharpe:
1. Sentiment-tilted weighting — top 25% by sentiment get 60% of capital,
bottom 75% share 40%. (was 50/50 — increases signal contribution)
2. Per-position weight cap at MAX_POSITION_WEIGHT (20%) — prevents
inverse-vol weighting over-concentrating in a single low-vol name.
3. Renormalise after capping so weights always sum to 1.0.
"""
n = len(ranked)
if n <= 0:
return []
top_n = max(1, min(int(math.ceil(0.25 * n)), n))
rest_n = n - top_n
weights: dict = {}
if rest_n <= 0:
w = 1.0 / n
for s in ranked:
weights[s] = w
else:
# Tilted 60/40 split (was 50/50) — rewards top sentiment names more
w_top = 0.60 / top_n
w_rest = 0.40 / rest_n
for i, s in enumerate(ranked):
weights[s] = w_top if i < top_n else w_rest
# Apply per-position cap
capped = False
for s in weights:
if weights[s] > self.MAX_POSITION_WEIGHT:
weights[s] = self.MAX_POSITION_WEIGHT
capped = True
# Renormalise so weights sum to 1.0
total = sum(weights.values())
if self._is_finite_number(total) and total > 0:
for s in weights:
weights[s] /= total
return [PortfolioTarget(s, float(w)) for s, w in weights.items()]
# ══════════════════════════════════════════════════════════════════════════
# SENTIMENT HELPERS
# ══════════════════════════════════════════════════════════════════════════
def _update_sentiment(self, symbol: Symbol, score: float) -> None:
if symbol is None or not self._is_finite_number(score):
return
prev = self._sentiment_ewma_by_symbol.get(symbol, float("nan"))
if not self._is_finite_number(prev):
self._sentiment_ewma_by_symbol[symbol] = float(score)
else:
a = self._sentiment_alpha
self._sentiment_ewma_by_symbol[symbol] = (
a * float(score) + (1.0 - a) * float(prev)
)
self._sentiment_hit_count[symbol] = self._sentiment_hit_count.get(symbol, 0) + 1
def _get_current_sentiment(self, symbol: Symbol) -> float:
if symbol is None:
return 0.0
v = self._sentiment_ewma_by_symbol.get(symbol, float("nan"))
if not self._is_finite_number(v):
return 0.0
return float(v)
# ══════════════════════════════════════════════════════════════════════════
# LOCAL FINBERT (fallback)
# ══════════════════════════════════════════════════════════════════════════
    def _initialize_local_finbert(self) -> None:
        """Load FinBERT for live trading only — skipped entirely in backtesting.

        Sets ``_finbert_ready`` / ``_finbert`` as side effects; never raises —
        any load/inference failure just leaves the keyword fallback active.
        """
        self._finbert_ready = False
        self._finbert = {}
        # Skip in backtesting — model download (~440MB) blocks initialization
        # and inference at 200ms/article makes backtests take 50+ hours.
        if not self.live_mode:
            self.debug("FinBERT skipped — backtest mode, using keyword model")
            return
        if not self._use_local_finbert:
            self.debug("FinBERT disabled by configuration")
            return
        try:
            # Imported lazily — transformers may not be installed in backtest envs
            from transformers import pipeline  # type: ignore
            pipe = pipeline(
                task="sentiment-analysis",
                model="ProsusAI/finbert",
                tokenizer="ProsusAI/finbert",
                truncation=True,
            )
            # Smoke-test the pipeline once before trusting it
            test = pipe("earnings beat expectations")
            if not test:
                raise RuntimeError("Empty test inference")
            self._finbert = {"pipeline": pipe}
            self._finbert_ready = True
            self.debug(f"Local FinBERT ready (test: {test[0].get('label')})")
        except Exception as exc:
            # Degrade gracefully to the keyword model
            self.debug(f"Local FinBERT unavailable: {exc}")
def _finbert_sentiment_score(self, text: str):
if not self._finbert_ready or not text:
return None
try:
pipe = self._finbert.get("pipeline")
if pipe is None:
return None
result = pipe(str(text).strip()[:self._finbert_max_chars])
if isinstance(result, list) and result:
result = result[0]
if not isinstance(result, dict):
return None
label = str(result.get("label", "")).lower()
conf = max(0.0, min(1.0, float(result.get("score", 0.0))))
if "pos" in label: return conf
if "neg" in label: return -conf
if "neu" in label: return 0.0
return None
except Exception:
return None
# ══════════════════════════════════════════════════════════════════════════
# ENHANCED KEYWORD SENTIMENT FALLBACK
#
# Design decisions (per user config):
# - Mixed / All sectors → broad keyword coverage across earnings,
# M&A, legal, guidance, dividends, credit, operations, products
# - Ambiguous words → treated as negative (conservative)
# - Macro keywords → excluded (company-specific only)
#
# Scoring model:
# Three-tier weighted system instead of simple +1/-1 counting:
# STRONG positive = +1.5 | negative = -1.5
# NORMAL positive = +1.0 | negative = -1.0
# WEAK positive = +0.5 | negative = -0.5
# Ambiguous words contribute -0.3 (conservative treatment).
# Negation handling: "not", "no", "never", "didn't", "won't" etc.
# before a positive word flips it to negative weight, and vice versa.
# Final score is clamped to [-1, +1].
# ══════════════════════════════════════════════════════════════════════════
# ── Keyword dictionaries (class-level, built once) ─────────────────────
# Strong positive signals — high confidence, high impact
_KW_STRONG_POS: frozenset = frozenset({
# Earnings beats
"beat", "beats", "beating", "blowout", "smashed", "crushed", "topped",
"exceeded", "surpassed", "outperformed",
# Revenue / profit strength
"record", "record-breaking", "all-time-high", "explosive", "blockbuster",
"landmark", "milestone",
# Guidance raises
"raised", "raises", "raise", "lifted", "increased", "boosted", "upped",
"reiterated",
# M&A / deals
"acquisition", "acquired", "merger", "buyout", "takeover", "deal",
"partnership", "collaboration", "alliance", "joint-venture",
# Shareholder returns
"dividend", "dividends", "buyback", "repurchase", "special-dividend",
"distribution",
# Ratings / upgrades
"upgrade", "upgraded", "upgrades", "overweight", "outperform",
"strong-buy", "initiates",
# Product / pipeline wins
"approved", "approval", "launched", "breakthrough", "patent",
"clearance", "fda-approval", "authorized",
})
# Normal positive signals — solid but not exceptional
_KW_NORMAL_POS: frozenset = frozenset({
# Earnings / financials
"profit", "profits", "profitable", "earnings", "revenue", "growth",
"grew", "grow", "growing", "gains", "gain", "positive", "strong",
"solid", "robust", "healthy", "improved", "improvement", "improving",
"momentum", "expansion", "expanding",
# Operations
"efficient", "efficiency", "streamlined", "optimized", "margin",
"margins", "cash-flow", "cashflow", "synergies", "synergy",
# Market position
"market-share", "competitive", "dominance", "leading", "leader",
"innovative", "innovation", "differentiated",
# Guidance / outlook
"guidance", "outlook", "forecast", "confident", "confidence",
"optimistic", "opportunity", "opportunities",
# Capital structure
"debt-free", "investment-grade", "upgraded-credit", "liquidity",
"well-capitalized",
# General
"win", "wins", "winning", "success", "successful", "deliver",
"delivered", "delivering", "momentum", "buy", "bullish", "bull",
})
# Weak positive signals — mildly encouraging
_KW_WEAK_POS: frozenset = frozenset({
"stable", "steady", "maintained", "maintains", "in-line", "inline",
"met", "meets", "meeting", "matched", "matching", "resilient",
"recovery", "recovering", "stabilizing", "stabilized", "bottomed",
"rebound", "rebounding", "bouncing", "normalizing", "normalize",
"gradual", "gradually", "progress", "progressing",
})
# Strong negative signals — high severity
_KW_STRONG_NEG: frozenset = frozenset({
# Earnings misses
"miss", "misses", "missed", "missed-estimates", "shortfall",
"disappointed", "disappoints", "disappointing", "dismal",
# Legal / regulatory
"lawsuit", "sued", "litigation", "indicted", "fraud", "scandal",
"investigation", "probe", "subpoena", "regulatory-action", "fine",
"fined", "penalty", "penalties", "violation", "violations",
"criminal", "charges", "charged",
# Ratings / downgrades
"downgrade", "downgraded", "downgrades", "underweight", "underperform",
"sell", "strong-sell", "avoid",
# Severe operational / financial distress
"bankruptcy", "bankrupt", "insolvent", "default", "defaulted",
"restructuring", "chapter-11", "liquidation", "seized",
"receivership", "collapse", "collapsed", "imploded",
# Guidance cuts
"cut", "cuts", "cutting", "slashed", "slashing", "slashed-guidance",
"reduced", "reduces", "lowered", "withdrew", "withdrawn",
"suspended", "suspends", "suspending",
# Layoffs / restructuring
"layoffs", "layoff", "fired", "termination", "terminated",
"mass-layoff", "job-cuts", "redundancies",
})
# Normal negative signals
_KW_NORMAL_NEG: frozenset = frozenset({
# Earnings / financials
"loss", "losses", "losing", "deficit", "write-down", "writedown",
"write-off", "writeoff", "impairment", "charge", "charges",
"negative", "weak", "weakness", "softness", "soft", "sluggish",
"slowdown", "slowing", "declined", "declines", "declining",
"decreased", "decrease", "fell", "fall", "falls", "falling",
"down", "drop", "drops", "dropped", "lower", "lowered",
# Guidance concerns
"warned", "warns", "warning", "cautious", "caution",
"headwinds", "headwind", "pressure", "pressured", "pressures",
"challenged", "challenges", "difficult", "difficulties",
# Market / competitive
"lost", "losing", "market-share-loss", "competition", "competitive-pressure",
"disrupted", "disruption", "obsolete",
# Capital structure
"dilution", "diluted", "debt", "leverage", "overleveraged",
"downgraded-credit", "junk", "high-yield-risk",
# Ratings
"bear", "bearish",
})
# Weak negative signals — mildly concerning
_KW_WEAK_NEG: frozenset = frozenset({
"below", "missed-slightly", "slightly-below", "modestly-below",
"modest-decline", "slight-decline", "marginal-decline",
"uncertainty", "uncertain", "unclear", "remains-unclear",
"mixed", "uneven", "inconsistent", "volatile", "volatility",
"delayed", "delay", "delays", "postponed", "postponement",
"slower", "slowed", "muted", "subdued", "tepid", "lackluster",
})
# Ambiguous words → treated as negative (conservative, per user config)
_KW_AMBIGUOUS: frozenset = frozenset({
"volatile", "volatility", "cautious", "caution", "mixed",
"uncertain", "uncertainty", "unclear", "challenging", "complex",
"complicated", "evolving", "fluid", "dynamic", "transitioning",
"transition", "restructure", "restructuring", "transforming",
"transformation", "pivoting", "pivot",
})
# Negation words — flip the polarity of the next sentiment word
_KW_NEGATIONS: frozenset = frozenset({
"not", "no", "never", "neither", "nor", "without", "lack",
"lacking", "lacks", "failed", "fails", "unable", "unlikely",
"didn't", "doesn't", "don't", "won't", "wasn't", "weren't",
"isn't", "aren't", "hasn't", "haven't", "couldn't", "wouldn't",
"shouldn't", "cannot", "cant",
})
# Macro / market-wide words to EXCLUDE (company-specific only per config)
_KW_MACRO_EXCLUDE: frozenset = frozenset({
"fed", "federal-reserve", "fomc", "interest-rate", "interest-rates",
"inflation", "cpi", "ppi", "gdp", "unemployment", "jobs-report",
"nonfarm", "payrolls", "treasury", "yield-curve", "quantitative",
"tightening", "tapering", "rate-hike", "rate-cut", "basis-points",
"recession", "economic", "economy", "macro", "geopolitical",
"tariff", "tariffs", "trade-war", "sanctions", "opec",
})
def _compute_naive_text_sentiment(self, news_item) -> Optional[float]:
    """
    Enhanced weighted keyword sentiment scorer.

    Signal model:
    - Three signal tiers (strong/normal/weak) with weights 1.5/1.0/0.5
    - Negation handling: flips polarity within a 3-token look-back window
    - Ambiguous words scored as -0.3 (conservative per config)
    - Macro/market-wide headlines (>= 3 macro keyword hits) return 0.0
    - Compound keywords: trigrams and bigrams are matched longest-first
      (hyphen-joined, e.g. "market-share-loss"); tokens consumed by a
      matched phrase are not re-scored as unigrams. (Previously the
      n-grams were built but only used for the macro filter, so compound
      entries in the keyword sets could never score.)
    - Score normalised by total signal weight, then clamped to [-1, +1]
    - Minimum token threshold avoids scoring near-empty headlines

    Returns a score in [-1.0, +1.0], 0.0 for neutral or macro-dominated
    text, or None when text is missing, too short, or tokenisation fails.
    """
    text = self._extract_text(news_item)
    if not text:
        return None
    try:
        # Delete apostrophes so contractions ("didn't" -> "didnt") survive
        # tokenisation and can match the negation list (previously the
        # apostrophe became a space, splitting "didn't" into "didn"/"t");
        # every other punctuation character becomes a token separator.
        cleaned = str(text).lower().replace("'", "").replace("\u2019", "")
        cleaned = cleaned.translate(
            str.maketrans({ch: " " for ch in "\n\r\t,.;:!?()[]{}\""}))
        # Normalise hyphens so "strong-buy" tokenises as one unit
        tokens = [t.strip("-") for t in cleaned.split() if t.strip("-")]
    except Exception:
        return None
    if len(tokens) < 3:
        return None
    # Build bigram + trigram tokens for compound phrases
    bigrams = ["-".join(tokens[i:i + 2]) for i in range(len(tokens) - 1)]
    trigrams = ["-".join(tokens[i:i + 3]) for i in range(len(tokens) - 2)]
    # Macro filter — if headline is predominantly macro, return neutral
    macro_hits = sum(1 for t in tokens + bigrams + trigrams
                     if t in self._KW_MACRO_EXCLUDE)
    if macro_hits >= 3:
        return 0.0
    # Negation words are compared apostrophe-free to match the tokens.
    negation_words = {w.replace("'", "") for w in self._KW_NEGATIONS}
    negation_indices = {i for i, t in enumerate(tokens) if t in negation_words}

    def _is_negated(idx: int) -> bool:
        """True if any negation falls within 3 tokens before idx."""
        return any(idx - 3 <= ni < idx for ni in negation_indices)

    # Tier sets with (weight, polarity); checked in this order, before
    # the ambiguous set — matching the original elif chain.
    tiers = (
        (self._KW_STRONG_POS, 1.5, 1.0),
        (self._KW_NORMAL_POS, 1.0, 1.0),
        (self._KW_WEAK_POS, 0.5, 1.0),
        (self._KW_STRONG_NEG, 1.5, -1.0),
        (self._KW_NORMAL_NEG, 1.0, -1.0),
        (self._KW_WEAK_NEG, 0.5, -1.0),
    )

    def _lookup(term: str):
        """Return (weight, polarity) for a tier keyword hit, else None."""
        for keyword_set, weight, polarity in tiers:
            if term in keyword_set:
                return weight, polarity
        return None

    total_score = 0.0
    total_weight = 0.0
    consumed: set = set()
    # Longest-match-first so each token contributes to at most one signal.
    for span_len in (3, 2, 1):
        for start in range(len(tokens) - span_len + 1):
            span = range(start, start + span_len)
            if any(j in consumed for j in span):
                continue
            term = "-".join(tokens[start:start + span_len])
            hit = _lookup(term)
            if hit is None:
                # Ambiguous single words → conservative negative treatment
                # (negation deliberately does not flip these).
                if span_len == 1 and term in self._KW_AMBIGUOUS:
                    total_score -= 0.3
                    total_weight += 0.3
                continue
            weight, polarity = hit
            # Flip polarity if preceded by a negation within 3 tokens
            if _is_negated(start):
                polarity = -polarity
            total_score += weight * polarity
            total_weight += weight
            consumed.update(span)
    if total_weight == 0.0:
        return 0.0
    # Normalise and clamp to [-1, +1]
    score = max(-1.0, min(1.0, total_score / total_weight))
    return float(score) if self._is_finite_number(score) else None
# ══════════════════════════════════════════════════════════════════════════
# TIINGO NEWS SUBSCRIPTIONS
# ══════════════════════════════════════════════════════════════════════════
def _ensure_tiingo_news_subscription(self, underlying: Symbol) -> None:
    """Subscribe *underlying* to TiingoNews custom data (idempotent).

    A None underlying, or one that already has a news subscription, is
    a no-op. Both direction maps (underlying -> news symbol and news
    symbol -> underlying) are kept in sync.
    """
    if underlying is None:
        return
    if underlying in self._news_symbol_by_underlying:
        return  # already subscribed
    security = self.add_data(TiingoNews, underlying)
    self._news_symbol_by_underlying[underlying] = security.symbol
    self._underlying_by_news_symbol[security.symbol] = underlying
def _remove_tiingo_news_subscription(self, underlying: Symbol) -> None:
    """Tear down the TiingoNews subscription for *underlying*, if any.

    Removes both direction-map entries first, then asks LEAN to remove
    the security. The removal call is best-effort: any failure from the
    engine is deliberately swallowed.
    """
    if underlying is None:
        return
    symbol_to_drop = self._news_symbol_by_underlying.pop(underlying, None)
    if symbol_to_drop is None:
        return  # never subscribed (or already removed)
    self._underlying_by_news_symbol.pop(symbol_to_drop, None)
    try:
        self.remove_security(symbol_to_drop)
    except Exception:
        pass  # best-effort removal; engine may refuse mid-processing
# ══════════════════════════════════════════════════════════════════════════
# WARMUP FINISHED
# ══════════════════════════════════════════════════════════════════════════
# Cap articles per symbol during pre-warm to prevent OOM on QC cloud nodes.
# At ~200ms per FinBERT call: 5 articles x 20 symbols = ~20 seconds, safe.
PREWARM_MAX_ARTICLES_PER_SYMBOL: int = 5
# Set True if backtest still crashes — skips FinBERT in pre-warm entirely,
# uses keyword model only (instant, zero RAM overhead).
PREWARM_KEYWORD_ONLY: bool = False
def on_warmup_finished(self) -> None:
    """
    Runs once when the 10-day warmup ends.
    1. Liquidate positions queued for removal during warmup.
    2. Pre-warm the sentiment EWMA from 10 days of TiingoNews history,
       capped at PREWARM_MAX_ARTICLES_PER_SYMBOL per symbol to prevent
       OOM on QC cloud nodes.
    """
    self._flush_pending_liquidations()
    self._prewarm_sentiment()

def _flush_pending_liquidations(self) -> None:
    """Liquidate symbols queued during warmup and drop their tracking state.

    Per-symbol state (entry price, sentiment EWMA/hit count, momentum,
    added date, news subscription) is cleaned for every queued symbol,
    whether or not a position is actually held.
    """
    for symbol in list(self._pending_liquidations):
        if self.portfolio[symbol].invested:
            self.liquidate(symbol)
        self._entry_price_by_symbol.pop(symbol, None)
        self._sentiment_ewma_by_symbol.pop(symbol, None)
        self._sentiment_hit_count.pop(symbol, None)
        self._momentum.pop(symbol, None)
        self._symbol_added_date.pop(symbol, None)
        self._remove_tiingo_news_subscription(symbol)
    self._pending_liquidations.clear()

def _prewarm_sentiment(self) -> None:
    """Seed the sentiment EWMA from recent TiingoNews history.

    For each subscribed news symbol: pull 10 daily bars of history, score
    up to PREWARM_MAX_ARTICLES_PER_SYMBOL of the most recent articles and
    feed each valid score into _update_sentiment. Errors are logged per
    symbol and never abort the pre-warm of the remaining symbols.
    """
    scorer = ("keyword-only"
              if (self.PREWARM_KEYWORD_ONLY or not self.live_mode)
              else "FinBERT+keyword")
    self.debug(f"Sentiment pre-warm starting [{scorer}] ...")
    total_articles = 0
    total_scored = 0
    for underlying, news_symbol in list(self._news_symbol_by_underlying.items()):
        try:
            history = self.history(TiingoNews, news_symbol, 10, Resolution.DAILY)
            if history is None or history.empty:
                continue
            # Take only the N most recent articles per symbol (OOM guard)
            recent = list(history.iterrows())[-self.PREWARM_MAX_ARTICLES_PER_SYMBOL:]
            for _, row in recent:
                text = self._prewarm_article_text(row)
                if not text:
                    continue
                total_articles += 1
                score = self._score_prewarm_article(text, row)
                if score is not None and self._is_finite_number(score):
                    self._update_sentiment(underlying, float(score))
                    total_scored += 1
        except Exception as e:
            self.debug(f"Pre-warm error [{underlying.value}]: {e}")
            continue
    trusted = sum(
        1 for c in self._sentiment_hit_count.values()
        if c >= self.MIN_NEWS_COUNT
    )
    self.debug(
        f"Sentiment pre-warm complete | "
        f"articles={total_articles} scored={total_scored} | "
        f"symbols={len(self._sentiment_hit_count)} | "
        f"trusted (>={self.MIN_NEWS_COUNT} hits)={trusted}"
    )

@staticmethod
def _prewarm_article_text(row) -> str:
    """Concatenate title/description/summary fields of one history row.

    Checks both lower- and capitalised column names; returns "" when the
    row has no usable text.
    """
    parts = []
    for col in ("title", "description", "summary",
                "Title", "Description", "Summary"):
        val = row.get(col, "")
        if val and str(val).strip():
            parts.append(str(val).strip())
    return " ".join(parts).strip()

def _score_prewarm_article(self, text: str, row):
    """Score one article's text, consulting/updating the bounded cache.

    FinBERT runs only in live mode with PREWARM_KEYWORD_ONLY False;
    otherwise — or whenever FinBERT returns None — the keyword model is
    the fallback. Returns a float score or None.
    """
    text_hash = hash(text)  # per-process cache key; never persisted
    if text_hash in self._score_cache:
        return self._score_cache[text_hash]
    if self.PREWARM_KEYWORD_ONLY or not self.live_mode:
        score = None  # FinBERT skipped in backtest — keyword model only
    else:
        score = self._finbert_sentiment_score(text)
    if score is None:
        # Keyword fallback: the naive scorer expects an object exposing
        # title/description/summary attributes.
        from types import SimpleNamespace
        shim = SimpleNamespace(
            title=row.get("title", ""),
            description=row.get("description", ""),
            summary=row.get("summary", ""),
        )
        score = self._compute_naive_text_sentiment(shim)
    # FIFO eviction keeps the cache bounded at SCORE_CACHE_MAX entries
    # (dicts preserve insertion order, so next(iter(...)) is the oldest).
    if len(self._score_cache) >= self.SCORE_CACHE_MAX:
        try:
            self._score_cache.pop(next(iter(self._score_cache)))
        except Exception:
            pass
    self._score_cache[text_hash] = score
    return score
# ══════════════════════════════════════════════════════════════════════════
# STATIC UTILITIES
# ══════════════════════════════════════════════════════════════════════════
@staticmethod
def _is_finite_number(x) -> bool:
if x is None or isinstance(x, bool):
return False
try:
return math.isfinite(float(x))
except Exception:
return False
@staticmethod
def _get_float(obj, attr_paths: List[str]):
for path in attr_paths:
current = obj
ok = True
for part in path.split("."):
if current is None or not hasattr(current, part):
ok = False
break
current = getattr(current, part)
if not ok or current is None:
continue
if hasattr(current, "value"):
current = current.value
try:
return float(current)
except Exception:
continue
return None
def _to_ratio(self, value):
if not self._is_finite_number(value):
return None
v = float(value)
return v / 100.0 if v > 1.0 else v