| Overall Statistics | Value |
|---|---|
| Total Orders | 624 |
| Average Win | 0.94% |
| Average Loss | -0.55% |
| Compounding Annual Return | 13.756% |
| Drawdown | 17.900% |
| Expectancy | 0.379 |
| Start Equity | 100000 |
| End Equity | 190491.10 |
| Net Profit | 90.491% |
| Sharpe Ratio | 0.503 |
| Sortino Ratio | 0.632 |
| Probabilistic Sharpe Ratio | 23.856% |
| Loss Rate | 49% |
| Win Rate | 51% |
| Profit-Loss Ratio | 1.70 |
| Alpha | 0.029 |
| Beta | 0.521 |
| Annual Standard Deviation | 0.131 |
| Annual Variance | 0.017 |
| Information Ratio | -0.044 |
| Tracking Error | 0.128 |
| Treynor Ratio | 0.126 |
| Total Fees | $983.54 |
| Estimated Strategy Capacity | $0 |
| Lowest Capacity Asset | EOG R735QTJ8XC9X |
| Portfolio Turnover | 3.39% |
| Drawdown Recovery | 579 |
from AlgorithmImports import *
class TrendPredictor:
    """Simple, dependency-free trend prediction proxy (MLFinLab-like intent).

    Given a sequence of daily closes (oldest -> newest), ``predict`` fits an
    OLS line to log(price) against the bar index. A positive slope implies an
    uptrend. The API is intentionally small so it can be swapped for a more
    sophisticated model later.

    NOTE:
        The algorithm now primarily uses an EMA-based trend filter for selection,
        but this class is kept as a pluggable component for future experimentation.
    """

    def __init__(self, lookback: int = 21) -> None:
        """Store the preferred window length, clamped to at least two bars."""
        self._lookback = max(2, int(lookback))

    @property
    def lookback(self) -> int:
        """Configured window length (``predict`` uses every close it is given)."""
        return self._lookback

    def predict(self, closes: list) -> tuple:
        """Score the trend of ``closes`` with a log-price regression slope.

        Args:
            closes: close prices ordered oldest -> newest.

        Returns:
            (trend_score, predicted_up) where ``trend_score`` is the slope
            multiplied by the number of points and ``predicted_up`` is True
            when the slope is positive. The neutral result ``(0.0, False)`` is
            returned for fewer than two points or when any price is None,
            non-finite (NaN/inf) or not strictly positive.
        """
        n = len(closes)
        if n < 2:
            return 0.0, False

        log_prices = []
        for p in closes:
            if p is None:
                return 0.0, False
            price = float(p)
            # NaN fails every ordering comparison, so `price <= 0` alone would
            # let NaN/inf through and poison the regression with NaN.
            if not math.isfinite(price) or price <= 0:
                return 0.0, False
            log_prices.append(math.log(price))

        # Closed-form OLS slope of log(price) on x = 0..n-1.
        x_mean = (n - 1) / 2.0
        y_mean = sum(log_prices) / float(n)
        num = 0.0
        den = 0.0
        for i, log_price in enumerate(log_prices):
            dx = float(i) - x_mean
            num += dx * (log_price - y_mean)
            den += dx * dx
        if den == 0.0:
            return 0.0, False

        slope = num / den
        # Scale by window length to make scores more separable.
        trend_score = slope * float(n)
        return float(trend_score), bool(slope > 0.0)
class FundamentalValueQualityTrendAlgorithm(QCAlgorithm):
"""Value/quality universe + trend filter + volatility targeting (monthly rebalance).
Strategy thesis
---------------
This strategy seeks relatively cheap, financially healthier, shareholder-friendly
U.S. stocks, then applies a lightweight trend classifier to avoid (some) value traps.
It now adds:
- EMA-based trend filter (more robust than raw slope alone)
- Inverse-volatility position sizing (volatility targeting)
- Simple portfolio drawdown guard
Universe Selection (Coarse + Fine)
----------------------------------
1) Coarse (daily):
- U.S. equities with fundamental data
- Price > $5
- Take top 1000 by dollar volume (liquidity filter)
2) Fine (daily):
Filter to a value/quality profile:
- P/E between 5 and 15 (inclusive)
- Debt-to-equity < 1
- Dividend yield > 1% (0.01)
- ROI > 10% (0.10)
Note: FineFundamental field names/availability can vary; the implementation uses
robust attribute access helpers that attempt multiple plausible paths.
3) Ranking:
From fine-filtered set, rank by P/E ascending (more value = lower P/E).
The algorithm keeps this working set for the trend step at rebalance.
Trend / ML Step
---------------
At rebalance time:
- Fetch last 21 trading days of daily close prices using History()
- Compute an EMA-based trend score:
* fast EMA (e.g., 10) vs slow EMA (e.g., 21)
* predicted_up is True when fast EMA > slow EMA
* trend_score = fast_ema - slow_ema (used for ranking)
Portfolio Construction / Rebalance
----------------------------------
- Target portfolio size: 15 stocks
- Primary selection: predicted_up == True (ranked by trend_score, highest first)
- Final ordering: sort selected by P/E ascending and keep 15.
- Position sizing: inverse-volatility weights (21-day log-return volatility).
- Monthly rebalance scheduled at MonthStart (anchored to SPY) after market open.
- Liquidate positions not in the current selection at each rebalance.
- Portfolio drawdown guard: if drawdown exceeds a threshold, scale exposure down.
Risks / assumptions
-------------------
- Fundamental fields can be missing; this can reduce the eligible universe.
- The trend proxy is still relatively simple and can whipsaw.
- Monthly turnover and transaction costs can impact live results.
"""
def initialize(self) -> None:
    """Configure the backtest window, universe, parameters, state and monthly schedule."""
    # --- Backtest window and starting capital ---
    self.set_start_date(2016, 1, 1)
    self.set_end_date(2026, 1, 1)
    self.set_cash(100000)

    # Universe members are subscribed at daily resolution (also used for history/trend).
    self.universe_settings.resolution = Resolution.DAILY

    # SPY subscription; its symbol anchors the date/time rules of the schedule below.
    self._spy = self.add_equity("SPY", Resolution.DAILY).symbol

    # --- Strategy parameters ---
    self._coarse_count = 1000  # symbols kept by the coarse liquidity filter
    self._portfolio_size = 15
    self._trend_fast_ema = 10
    self._trend_slow_ema = 21
    self._vol_lookback = 21
    self._max_drawdown_for_scaling = 0.10  # 10% portfolio DD guard
    self._trend_predictor = TrendPredictor(lookback=21)

    # --- Mutable state (safe defaults rather than None) ---
    self._latest_coarse_count = 0
    self._latest_fine_pass_count = 0
    self._latest_ml_pass_count = 0
    self._fine_working_set = []
    self._pe_by_symbol = {}
    self._last_selection = []
    # High-water mark used by the drawdown calculation
    self._equity_peak = self.portfolio.total_portfolio_value

    # Coarse + fine universe selection
    self.add_universe(self._coarse_selection_function, self._fine_selection_function)

    # Monthly rebalance: first trading day of the month, 30 minutes after the open
    self.schedule.on(
        self.date_rules.month_start(self._spy),
        self.time_rules.after_market_open(self._spy, 30),
        self._rebalance,
    )

    # Warm up so history requests are stable before the first rebalance
    self.set_warm_up(30, Resolution.DAILY)
# -----------------
# Universe selection
# -----------------
def _coarse_selection_function(self, coarse: list) -> list:
    """Keep the most liquid symbols that have fundamentals and trade above $5.

    Records the number of symbols kept in ``_latest_coarse_count`` and returns
    their symbols ordered by dollar volume, highest first.
    """
    eligible = [
        item
        for item in coarse
        if item.has_fundamental_data
        and item.price is not None
        and not (item.price <= 5)
    ]
    by_liquidity = sorted(eligible, key=lambda item: item.dollar_volume, reverse=True)
    kept = by_liquidity[: self._coarse_count]
    self._latest_coarse_count = len(kept)
    return [item.symbol for item in kept]
def _fine_selection_function(self, fine: list) -> list:
    """Apply the value/quality screen and keep the cheapest names by P/E.

    A security passes when all four metrics are finite and
    5 <= P/E <= 15, debt-to-equity < 1, dividend yield > 1% and ROI > 10%.
    Survivors are ordered by P/E ascending and truncated to
    ``_portfolio_size + 5``; the result is stored in ``_fine_working_set``
    (with P/E values in ``_pe_by_symbol``) for the rebalance step.
    """
    nan = float("nan")
    # Attribute paths tried in order for each metric: P/E, D/E, dividend yield, ROI.
    metric_paths = (
        [
            "ValuationRatios.PERatio",
            "ValuationRatios.PE",
            "PERatio",
            "PE",
        ],
        [
            "OperationRatios.DebtEquityRatio",
            "OperationRatios.TotalDebtEquityRatio",
            "FinancialStatements.balance_sheet.TotalDebtEquityRatio",
            "DebtEquityRatio",
        ],
        [
            "ValuationRatios.ForwardDividendYield",
            "ValuationRatios.DividendYield",
            "DividendYield",
        ],
        # ROI can be represented differently; attempt a few likely fields.
        [
            "OperationRatios.ROIC",
            "OperationRatios.roi",
            "OperationRatios.ReturnOnInvestment",
            "ROIC",
            "ROI",
        ],
    )

    survivors = []
    pe_lookup = {}
    for security in fine:
        pe, leverage, div_yield, roi = [
            self._get_float(security, paths, default=nan) for paths in metric_paths
        ]
        if not all(self._is_finite(v) for v in (pe, leverage, div_yield, roi)):
            continue
        in_profile = (
            5.0 <= pe <= 15.0
            and leverage < 1.0
            and div_yield > 0.01
            and roi > 0.10
        )
        if not in_profile:
            continue
        survivors.append(security)
        pe_lookup[security.symbol] = float(pe)

    # Lower P/E = more value; keep a few extra names for the trend step.
    cheapest = sorted(survivors, key=lambda s: pe_lookup.get(s.symbol, float("inf")))
    cheapest = cheapest[: self._portfolio_size + 5]

    self._fine_working_set = [s.symbol for s in cheapest]
    self._pe_by_symbol = pe_lookup
    self._latest_fine_pass_count = len(self._fine_working_set)
    return self._fine_working_set
# -------------
# Rebalance step
# -------------
def _rebalance(self) -> None:
    """Monthly rebalance: trend-filter the fine set, size by inverse vol, trade.

    Steps: refresh the equity peak and drawdown, pull daily history for the
    fine working set, score each symbol with the EMA trend filter, keep
    up-trending names (top N by score, then ordered by P/E), liquidate
    holdings outside the selection, and set inverse-volatility weights
    (halved while drawdown exceeds ``_max_drawdown_for_scaling``).
    """
    if self.is_warming_up:
        return

    # Refresh the high-water mark before measuring drawdown against it
    self._equity_peak = max(self._equity_peak, self.portfolio.total_portfolio_value)
    current_dd = self._portfolio_drawdown()

    # Candidates from the most recent fine selection
    universe = list(self._fine_working_set)
    if not universe:
        self.debug("Rebalance: no fine candidates; skipping.")
        return

    # Step 4: trend prediction + volatility via History()
    bars_needed = max(self._trend_slow_ema, self._vol_lookback)
    history = self.history(universe, bars_needed, Resolution.DAILY)
    if history.empty:
        self.debug("Rebalance: History returned empty; skipping.")
        return

    trend_scores = {}
    vol_by_symbol = {}
    uptrending = []
    # History is expected as a pandas frame indexed by (symbol, time)
    for symbol in universe:
        try:
            if symbol not in history.index.get_level_values(0):
                continue
            frame = history.loc[symbol]
            if "close" not in frame.columns:
                continue
            closes = [float(v) for v in frame["close"].dropna().values.tolist()]
            if len(closes) < bars_needed:
                continue
            # EMA-based trend filter
            score, is_up = self._ema_trend(closes, self._trend_fast_ema, self._trend_slow_ema)
            trend_scores[symbol] = float(score)
            # Volatility estimate (for inverse-vol weighting)
            vol_by_symbol[symbol] = self._calculate_volatility(closes[-self._vol_lookback:])
            if is_up:
                uptrending.append(symbol)
        except Exception:
            # Defensive: if any symbol's history parsing fails, skip it
            continue
    self._latest_ml_pass_count = len(uptrending)

    # Step 5: top N up-trending names by trend score (highest first)
    limit = self._portfolio_size
    strongest = sorted(
        uptrending, key=lambda s: trend_scores.get(s, float("-inf")), reverse=True
    )[:limit]
    # Step 6: final ordering by P/E ascending, still capped at N
    selected = sorted(strongest, key=lambda s: self._pe_by_symbol.get(s, float("inf")))[:limit]

    # Debug logs (counts + selected symbols)
    self.debug(
        f"Universe funnel: coarse={self._latest_coarse_count}, "
        f"fine_pass={self._latest_fine_pass_count}, "
        f"ml_pass={self._latest_ml_pass_count}, "
        f"selected={len(selected)}, "
        f"drawdown={current_dd:.2%}"
    )
    selected_str = ",".join(
        [f"{s.value}(PE={self._pe_by_symbol.get(s, float('nan')):.2f})" for s in selected]
    )
    self.debug("Selected symbols: " + selected_str)

    # Step 7: liquidate positions that are no longer selected
    keep = set(selected)
    for symbol, holding in self.portfolio.items():
        if holding.invested and symbol not in keep:
            self.liquidate(symbol, "Removed from selection")

    if not selected:
        self._last_selection = []
        return

    # Step 8: inverse-volatility sizing over symbols with a usable volatility
    inverse_vol = {}
    for symbol in selected:
        vol = vol_by_symbol.get(symbol, float("inf"))
        if vol is not None and vol > 0 and math.isfinite(vol):
            inverse_vol[symbol] = 1.0 / vol

    if inverse_vol:
        total_inverse_vol = sum(inverse_vol.values())
        weights = {s: iv / total_inverse_vol for s, iv in inverse_vol.items()}
    else:
        # Fallback to equal-weight if no volatility could be computed
        equal_weight = 1.0 / float(len(selected))
        weights = {s: equal_weight for s in selected}

    # Step 9: drawdown guard (halve exposure while drawdown is above the limit)
    scale = 1.0
    if current_dd > self._max_drawdown_for_scaling:
        self.debug(
            f"Drawdown {current_dd:.2%} exceeds {self._max_drawdown_for_scaling:.2%}, scaling exposure by 0.5"
        )
        scale = 0.5

    # Step 10: place targets for tradable, positively weighted symbols
    for symbol in selected:
        if symbol not in self.securities:
            continue
        if not self.securities[symbol].is_tradable:
            continue
        target_weight = weights.get(symbol, 0.0) * scale
        if target_weight <= 0:
            continue
        self.set_holdings(symbol, target_weight)

    self._last_selection = selected
# ----------------
# Helper functions
# ----------------
def _get_float(self, root: object, paths: list, default: float) -> float:
    """Return the first attribute path on ``root`` that yields a number.

    Each dot-separated path is resolved with ``_try_get_attr_path`` and
    converted with ``_coerce_to_float``; paths that are missing or
    non-numeric are skipped.

    Args:
        root: object to traverse (e.g., FineFundamental)
        paths: list of dot-separated attribute paths to try, in order
        default: value returned (as float) when no path yields a number

    Returns:
        float (or default)
    """
    for candidate in paths:
        numeric = self._coerce_to_float(self._try_get_attr_path(root, candidate))
        if numeric is not None:
            return float(numeric)
    return float(default)
def _try_get_attr_path(self, root: object, path: str) -> object:
    """Follow a dot-separated attribute path from ``root``.

    Returns the final attribute value, or None as soon as a step is None or
    lacks the next attribute.
    """
    node = root
    for name in path.split("."):
        reachable = node is not None and hasattr(node, name)
        if not reachable:
            return None
        node = getattr(node, name)
    return node
def _coerce_to_float(self, value: object) -> float:
    """Convert ``value`` to float, unwrapping a ``.Value`` property if present.

    Returns None (not a float) when ``value`` is None or cannot be converted.
    """
    if value is None:
        return None
    # Boxed values carry their numeric payload in `.Value`
    boxed = hasattr(value, "Value")
    try:
        return float(getattr(value, "Value") if boxed else value)
    except Exception:
        return None
def _is_finite(self, x: float) -> bool:
    """Return True when ``x`` converts to a finite float (not None/NaN/inf)."""
    if x is None:
        return False
    try:
        return math.isfinite(float(x))
    except Exception:
        return False
def _ema_trend(self, closes: list, fast: int, slow: int) -> tuple:
    """EMA-based trend proxy.

    Args:
        closes: list of close prices (oldest -> newest)
        fast: fast EMA span
        slow: slow EMA span

    Returns:
        (trend_score, predicted_up) as builtin ``float`` and ``bool``:
        trend_score = fast_ema - slow_ema (higher = stronger uptrend)
        predicted_up = True if fast_ema > slow_ema
        The neutral result ``(0.0, False)`` is returned when fewer than
        ``slow`` closes are supplied or either EMA is not finite.
    """
    if len(closes) < slow:
        return 0.0, False

    series = pd.Series(closes)
    # Convert to builtin floats so the comparison below yields a builtin bool
    # rather than numpy.bool_.
    fast_ema = float(series.ewm(span=fast).mean().iloc[-1])
    slow_ema = float(series.ewm(span=slow).mean().iloc[-1])
    if not (math.isfinite(fast_ema) and math.isfinite(slow_ema)):
        # A NaN score would make the ranking sort order undefined
        return 0.0, False

    return fast_ema - slow_ema, fast_ema > slow_ema
def _calculate_volatility(self, closes: list) -> float:
    """Standard deviation (``np.std``) of consecutive log returns of ``closes``.

    Pairs containing a non-positive price are skipped. Returns ``inf`` when
    fewer than two closes or fewer than two usable returns are available.
    """
    unusable = float("inf")
    if len(closes) < 2:
        return unusable

    log_returns = [
        math.log(current / previous)
        for previous, current in zip(closes, closes[1:])
        if not (previous <= 0 or current <= 0)
    ]
    if len(log_returns) < 2:
        return unusable
    return float(np.std(log_returns))
def _portfolio_drawdown(self) -> float:
    """Fractional drop of current equity below ``_equity_peak`` (never negative).

    Returns 0.0 when the recorded peak is not positive.
    """
    peak = self._equity_peak
    if peak <= 0:
        return 0.0
    shortfall = peak - self.portfolio.total_portfolio_value
    return max(0.0, float(shortfall / peak))
from AlgorithmImports import *
class FundamentalValueSentiment30DayRebalanceAlgorithm(QCAlgorithm):
"""Fundamental value + TiingoNews sentiment strategy with 30-day rebalancing.
Strategy overview
-----------------
This algorithm combines a fundamentals-based universe with a news-sentiment overlay:
1) Universe selection (US equities)
- Coarse: selects liquid equities (top 1000 by dollar volume), with fundamental data,
and price > $5.
- Fine: filters for a value/quality profile to select top 20:
- P/E between 5 and 15
- Debt-to-equity < 1
- Dividend yield > 1%
- ROI > 10%
2) News sentiment (TiingoNews)
- For each selected underlying, subscribes to `TiingoNews` using `add_data`.
- Each incoming news item is converted into a sentiment score using FinBERT
(ProsusAI/finbert) when available.
- If FinBERT can't be loaded or inference fails, the algorithm falls back to:
a) numeric sentiment/polarity/score fields (if present on the TiingoNews item)
b) a naive keyword-based text sentiment model (last resort)
3) Sentiment aggregation
- Maintains an exponentially-weighted moving average (EWMA) sentiment score
per underlying symbol:
ewma_t = alpha * score_t + (1 - alpha) * ewma_{t-1}
4) Portfolio construction / rebalance
- Roughly every 30 days:
- ranks the current fine-selected list by EWMA sentiment
- takes the top subset and produces weights using the existing weighting logic
(allocation behavior is unchanged)
- calls `set_holdings(targets)` to rebalance
5) Risk rules (Disabled)
- Uses recorded entry prices (captured on filled orders) to apply:
- take-profit at +50% from entry
- stop-loss at -10% from entry
Warmup handling
--------------
QuantConnect warmup is intended for building indicators/state, not trading.
Universe selection can still add/remove securities during warmup, so this algorithm
queues any symbols removed during warmup and processes liquidation/subscription cleanup
in `on_warmup_finished()`.
"""
def initialize(self) -> None:
    """Initialize state, schedules, universe selection, and sentiment model."""
    # --- Backtest window and capital ---
    self.set_start_date(2021, 1, 1)
    self.set_end_date(2025, 1, 1)
    self.set_cash(100000)

    # --- Universe data settings ---
    self.universe_settings.resolution = Resolution.DAILY
    self.universe_settings.data_normalization_mode = DataNormalizationMode.ADJUSTED

    # SPY is used only as a scheduling anchor (market hours/time rules).
    self._spy = self.add_equity("SPY", Resolution.MINUTE).symbol

    # --- Universe / rebalance bookkeeping ---
    self._coarse_count = 0
    self._fine_count = 0
    self._selected_symbols: List[Symbol] = []
    self._pending_rebalance = False
    self._last_rebalance_time = datetime(1900, 1, 1)

    # --- Trade / risk bookkeeping ---
    self._entry_price_by_symbol: dict = {}
    # Symbols removed during warmup, queued for later handling
    self._pending_liquidations = set()

    # --- TiingoNews subscription maps ---
    # underlying -> news_symbol (the custom data Symbol returned by add_data)
    self._news_symbol_by_underlying: dict = {}
    # news_symbol -> underlying
    self._underlying_by_news_symbol: dict = {}

    # --- Sentiment state (EWMA) ---
    self._sentiment_alpha = 0.2
    self._sentiment_ewma_by_symbol: dict = {}

    # --- FinBERT configuration/state ---
    # `transformers` / model files may not be available in the project; the
    # implementation is defensive so the strategy still runs without FinBERT.
    self._use_finbert = True
    self._finbert = {}
    self._finbert_ready = False
    # Compute controls: truncate long articles to bound inference, and cache
    # results to avoid re-scoring the same news item.
    self._finbert_max_chars = 1500
    self._finbert_cache_max_size = 2000
    self._finbert_cache_by_key: dict = {}

    # Warmup is used to build state (e.g., sentiment EWMA) before trading starts.
    self.set_warm_up(timedelta(days=60))

    # Initialize FinBERT early; if it fails, we automatically fall back.
    self._initialize_finbert()

    # Universe selection
    self.add_universe(self._coarse_selection, self._fine_selection)

    # Daily schedule: rebalance check at open+30m, risk check at open+45m
    every_trading_day = self.date_rules.every_day(self._spy)
    self.schedule.on(
        every_trading_day,
        self.time_rules.after_market_open(self._spy, 30),
        self._rebalance_if_due,
    )
    self.schedule.on(
        self.date_rules.every_day(self._spy),
        self.time_rules.after_market_open(self._spy, 45),
        self._daily_risk_check,
    )
def _coarse_selection(self, coarse: List[CoarseFundamental]) -> List[Symbol]:
    """Coarse universe: top 1000 by dollar volume among stocks with fundamentals and price > 5."""
    liquid = []
    for item in coarse:
        if not item.has_fundamental_data:
            continue
        if item.price is None or not (float(item.price) > 5):
            continue
        liquid.append(item)

    liquid.sort(key=lambda item: item.dollar_volume, reverse=True)
    top = liquid[:1000]
    self._coarse_count = len(top)
    return [item.symbol for item in top]
def _fine_selection(self, fine: List[FineFundamental]) -> List[Symbol]:
    """Fine universe: value/quality screen; flags a rebalance when the set changes.

    A security passes when P/E, debt-to-equity, dividend yield and ROI are all
    finite and 5 <= P/E <= 15, D/E < 1, yield > 1%, ROI > 10%. Up to 20
    survivors are returned; ``_fine_count`` records how many passed.
    """
    # Attribute paths tried in order for each metric.
    pe_paths = [
        "valuation_ratios.pe_ratio",
        "valuation_ratios.peratio",
        "valuation_ratios.price_earnings_ratio",
    ]
    dte_paths = [
        "operation_ratios.total_debt_equity_ratio",
        "operation_ratios.debt_to_equity",
        "operation_ratios.debttoequity",
    ]
    yield_paths = [
        "valuation_ratios.trailing_dividend_yield",
        "valuation_ratios.dividend_yield",
        "valuation_ratios.dividendyield",
    ]
    roi_paths = [
        "operation_ratios.roi",
        "operation_ratios.return_on_investment",
        "operation_ratios.returnoninvesment",
        "profitability_ratios.roi",
        "profitability_ratios.return_on_investment",
        "profitability_ratios.return_on_invested_capital",
        "operation_ratios.roic",
        "profitability_ratios.roic",
    ]

    passing = []
    for security in fine:
        pe = self._get_float(security, pe_paths)
        dte = self._get_float(security, dte_paths)
        div_yield = self._get_float(security, yield_paths)
        roi = self._get_float(security, roi_paths)

        if not all(self._is_finite_number(m) for m in (pe, dte, div_yield, roi)):
            continue
        if not (5 <= pe <= 15) or dte >= 1.0 or div_yield <= 0.01 or roi <= 0.10:
            continue
        passing.append((security.symbol, float(pe)))

    # NOTE(review): survivors are ranked by P/E *descending*, so the top 20 are
    # the most expensive names inside the 5-15 band - confirm this is intended.
    ranked = sorted(passing, key=lambda pair: pair[1], reverse=True)
    symbols = [symbol for symbol, _ in ranked[:20]]
    self._fine_count = len(ranked)

    if set(symbols) != set(self._selected_symbols):
        self._selected_symbols = symbols
        self._pending_rebalance = True
    return symbols
def on_securities_changed(self, changes: SecurityChanges) -> None:
    """Handle universe membership changes and TiingoNews subscriptions.

    Removed securities:
        - During warmup no trading action is taken; the symbol is queued in
          ``_pending_liquidations`` (the class docstring says the queue is
          processed in ``on_warmup_finished()``) and its entry-price and
          sentiment state is dropped.
        - Otherwise any open position is liquidated, per-symbol state is
          dropped and the TiingoNews subscription is removed.

    Added securities:
        - The symbol is taken off the pending-removal queue, so a symbol that
          was removed and re-added during warmup is not treated as removed
          afterwards, and a TiingoNews subscription is ensured.
    """
    for security in changes.removed_securities:
        symbol = security.symbol
        if self.is_warming_up:
            self._pending_liquidations.add(symbol)
            self._entry_price_by_symbol.pop(symbol, None)
            self._sentiment_ewma_by_symbol.pop(symbol, None)
            continue
        if self.portfolio[symbol].invested:
            self.liquidate(symbol)
        self._entry_price_by_symbol.pop(symbol, None)
        self._sentiment_ewma_by_symbol.pop(symbol, None)
        self._remove_tiingo_news_subscription(symbol)

    for security in changes.added_securities:
        symbol = security.symbol
        # A re-added symbol is a live universe member again; a stale queue
        # entry would otherwise mark it for removal after warmup.
        self._pending_liquidations.discard(symbol)
        self._ensure_tiingo_news_subscription(symbol)
def on_data(self, slice: Slice) -> None:
    """Consume TiingoNews and update EWMA sentiment.

    For each news entry in the slice:
        1) Map the news symbol to its underlying equity symbol (skip if unknown).
        2) Score it with ``_try_get_news_sentiment_score``; if that yields
           None, fall back to ``_compute_naive_text_sentiment``.
        3) Feed the score into the underlying's EWMA sentiment.
    """
    if slice is None:
        return

    try:
        articles = slice.get(TiingoNews)
    except Exception:
        articles = None
    if articles is None:
        return

    for entry in articles:
        try:
            news_symbol, item = entry.key, entry.value
        except Exception:
            continue
        if news_symbol is None or item is None:
            continue

        underlying = self._underlying_by_news_symbol.get(news_symbol, None)
        if underlying is None:
            continue

        score = self._try_get_news_sentiment_score(item)
        if score is None:
            score = self._compute_naive_text_sentiment(item)
        if score is not None:
            self._update_sentiment(underlying, float(score))
def on_order_event(self, order_event: OrderEvent) -> None:
    """Capture entry price on fills to support the risk checks.

    On a filled order: if the position is now flat the stored entry price is
    dropped; otherwise the fill price is recorded when no entry price exists
    yet for the symbol and the price is a positive finite number.
    """
    if order_event is None or order_event.status != OrderStatus.FILLED:
        return
    symbol = order_event.symbol
    if symbol is None or not self.securities.contains_key(symbol):
        return

    holding = self.portfolio[symbol]
    if not holding.invested:
        self._entry_price_by_symbol.pop(symbol, None)
        return

    if symbol in self._entry_price_by_symbol:
        # Keep the first recorded entry; later fills do not overwrite it
        return
    fill_price = float(order_event.fill_price)
    if self._is_finite_number(fill_price) and fill_price > 0:
        self._entry_price_by_symbol[symbol] = fill_price
def _rebalance_if_due(self) -> None:
    """Rebalance approximately every 30 days using sentiment ranking.

    Skips while warming up, when fewer than 30 days have passed since the last
    rebalance, or when there is nothing to rank. Otherwise the fine-selected
    symbols are ranked by sentiment, the top 15 are weighted by
    ``_build_weighted_targets`` and submitted as one batch. Holdings that are
    not among the targets are liquidated in the same call.
    """
    if self.is_warming_up:
        return
    if (self.time - self._last_rebalance_time) < timedelta(days=30):
        return
    if len(self._selected_symbols) == 0:
        return

    ranked = self._rank_by_sentiment(self._selected_symbols)
    if len(ranked) == 0:
        return
    ranked_f = ranked[:15]

    targets = self._build_weighted_targets(ranked_f)
    if len(targets) == 0:
        return

    # Second argument is liquidate_existing_holdings. The target weights sum to
    # 1.0, so a position that dropped out of the top 15 while staying in the
    # 20-name universe must be closed or total exposure would exceed 100%.
    self.set_holdings(targets, True)
    self._last_rebalance_time = self.time
    self._pending_rebalance = False

    preview = ",".join([str(x.value) for x in ranked_f])
    self.debug(
        f"Rebalance {self.time.date()} | coarse={self._coarse_count} fine_pass={self._fine_count} "
        f"selected={len(self._selected_symbols)} | ranked_first={preview}"
    )
def _rank_by_sentiment(self, symbols: List[Symbol]) -> List[Symbol]:
    """Sort symbols by current EWMA sentiment score (descending).

    Symbols with equal scores keep their input order. Returns an empty list
    for None or empty input.
    """
    if symbols is None or len(symbols) == 0:
        return []
    scored = [(float(self._get_current_sentiment(s)), s) for s in symbols]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [symbol for _, symbol in scored]
def _build_weighted_targets(self, ranked: List[Symbol]) -> List[PortfolioTarget]:
    """Build portfolio targets using the existing weighting logic (unchanged).

    The top quarter of ``ranked`` (rounded up, at least one symbol) shares 50%
    of the allocation equally and the remainder shares the other 50%; when
    there is no remainder every symbol gets an equal share. Weights are
    normalized to sum to 1 before targets are created.
    """
    n = len(ranked)
    if n <= 0:
        return []

    top_n = max(1, min(int(math.ceil(0.25 * float(n))), n))
    rest_n = n - top_n

    if rest_n <= 0:
        per_position = [1.0 / float(top_n)] * n
    else:
        per_position = [0.5 / float(top_n)] * top_n + [0.5 / float(rest_n)] * rest_n
    weights_by_symbol: dict = dict(zip(ranked, per_position))

    total = sum(weights_by_symbol.values())
    if self._is_finite_number(total) and float(total) > 0:
        weights_by_symbol = {
            symbol: float(weight) / float(total)
            for symbol, weight in weights_by_symbol.items()
        }

    return [PortfolioTarget(symbol, float(weight)) for symbol, weight in weights_by_symbol.items()]
def _daily_risk_check(self) -> None:
    """Daily take-profit/stop-loss check based on recorded entry price.

    Entries for flat positions or with an invalid entry price are dropped.
    When the price reaches +50% or -10% from entry, the event is only logged:
    the liquidation step is currently disabled.
    """
    if self.is_warming_up:
        return

    for symbol in tuple(self._entry_price_by_symbol):
        entry_price = self._entry_price_by_symbol.get(symbol, 0.0)
        stale = (
            not self.portfolio[symbol].invested
            or not self._is_finite_number(entry_price)
            or float(entry_price) <= 0
        )
        if stale:
            self._entry_price_by_symbol.pop(symbol, None)
            continue

        price = float(self.securities[symbol].price)
        if not self._is_finite_number(price) or float(price) <= 0:
            continue

        take_profit_level = 1.5 * float(entry_price)
        stop_loss_level = 0.9 * float(entry_price)
        if price >= take_profit_level or price <= stop_loss_level:
            # Liquidating here (and dropping the entry price) is disabled; log only.
            self.debug(f"Risk Triggred for Info {symbol} | Entry Price {entry_price} | Current price {price}")
def _initialize_finbert(self) -> None:
    """Initialize FinBERT sentiment model.

    Resets the FinBERT state, then (unless disabled via ``_use_finbert``)
    builds a HuggingFace ``transformers.pipeline`` for the ProsusAI FinBERT
    model. Any failure (missing package, missing model artifacts, ...) is
    logged and leaves FinBERT marked as not ready so fallbacks are used.
    """
    self._finbert_ready = False
    self._finbert = {}

    enabled = bool(getattr(self, "_use_finbert", False))
    if not enabled:
        self.debug("FinBERT disabled by configuration")
        return

    try:
        # Imported lazily because the package may not be installed
        from transformers import pipeline  # type: ignore

        self._finbert = {
            "pipeline": pipeline(
                task="sentiment-analysis",
                model="ProsusAI/finbert",
                tokenizer="ProsusAI/finbert",
                truncation=True,
            )
        }
        self._finbert_ready = True
        self.debug("FinBERT enabled successfully")
    except Exception as exc:
        self._finbert_ready = False
        self._finbert = {}
        self.debug(f"FinBERT not available; using fallback sentiment. Error: {exc}")
def _finbert_sentiment_score(self, text: str):
    """Convert FinBERT output into a numeric score in [-1, +1].

    The text is stripped and truncated to ``_finbert_max_chars`` before
    inference. The label is matched by substring:
        - contains "pos" -> +confidence
        - contains "neg" -> -confidence
        - contains "neu" -> 0.0

    Returns:
        float: in [-1, +1] when successful
        None: if FinBERT is disabled/not ready, the text is empty, the
            output has an unexpected shape or label, or inference fails
    """
    if not (bool(self._use_finbert) and bool(self._finbert_ready)):
        return None
    if text is None:
        return None
    snippet = str(text).strip()
    if not snippet:
        return None

    # Bound inference cost; an unusable limit means "no truncation"
    try:
        limit = int(getattr(self, "_finbert_max_chars", 0))
    except Exception:
        limit = 0
    if limit > 0:
        snippet = snippet[:limit]

    sign_by_marker = (("pos", 1.0), ("neg", -1.0), ("neu", 0.0))
    try:
        pipe = self._finbert.get("pipeline", None)
        if pipe is None:
            return None

        outcome = pipe(snippet)
        if isinstance(outcome, list):
            # Pipelines usually return a list with one dict per input
            outcome = outcome[0] if outcome else None
        if not isinstance(outcome, dict):
            return None

        label = str(outcome.get("label", "")).strip().lower()
        raw_confidence = outcome.get("score", None)
        if raw_confidence is None:
            return None
        conf = float(raw_confidence)
        if not self._is_finite_number(conf):
            return None
        conf = max(0.0, min(1.0, conf))

        for marker, sign in sign_by_marker:
            if marker in label:
                return float(sign * conf)
        return None
    except Exception:
        return None
def _try_get_news_sentiment_score(self, news_item: TiingoNews):
    """Compute sentiment score for a TiingoNews item (FinBERT-first with caching).

    Steps:
        1) Build combined text from title/headline/description/summary.
        2) If FinBERT is enabled and ready, score the text. Results (including
           failed inferences, stored as None) are cached by
           (end_time, symbol); the oldest entry is evicted once the cache
           exceeds ``_finbert_cache_max_size``.
        3) If FinBERT produced no score - whether freshly computed or cached -
           fall back to numeric sentiment fields on the item, then to its
           ``properties`` dict.

    Returns:
        float or None
    """
    if news_item is None:
        return None

    combined_text = ""
    for attr_name in ("title", "Title", "headline", "Headline", "description", "Description", "summary", "Summary"):
        if not hasattr(news_item, attr_name):
            continue
        try:
            part = getattr(news_item, attr_name)
            if part is not None:
                combined_text = (combined_text + " " + str(part)).strip()
        except Exception:
            continue

    # Only touch the cache when FinBERT can actually run; otherwise it would
    # just fill up with None entries.
    if bool(self._use_finbert) and bool(self._finbert_ready) and len(combined_text) > 0:
        try:
            cache = self._finbert_cache_by_key
            cache_key = (getattr(news_item, "end_time", None), getattr(news_item, "symbol", None))
            if cache_key in cache:
                finbert_score = cache[cache_key]
            else:
                finbert_score = self._finbert_sentiment_score(combined_text)
                if finbert_score is not None and self._is_finite_number(finbert_score):
                    finbert_score = float(finbert_score)
                else:
                    finbert_score = None
                cache[cache_key] = finbert_score
                if len(cache) > int(self._finbert_cache_max_size):
                    # dicts preserve insertion order, so the first key is the oldest
                    cache.pop(next(iter(cache)), None)
            if finbert_score is not None:
                return float(finbert_score)
            # A failed inference (fresh or cached) falls through to the fallbacks.
        except Exception:
            # Best-effort: any cache/inference problem falls through to the fallbacks.
            pass

    # Fallback (a): numeric sentiment-like attributes on the item
    candidate_attr_names = (
        "Sentiment", "sentiment",
        "Polarity", "polarity",
        "Score", "score",
        "SentimentScore", "sentimentscore",
        "sentiment_score",
    )
    for name in candidate_attr_names:
        if not hasattr(news_item, name):
            continue
        try:
            v = getattr(news_item, name)
            if hasattr(v, "value"):
                v = getattr(v, "value")
            fv = float(v)
            if self._is_finite_number(fv):
                return float(fv)
        except Exception:
            continue

    # Fallback (b): numeric sentiment-like keys in a `properties` dict
    if hasattr(news_item, "properties"):
        try:
            props = getattr(news_item, "properties")
            if isinstance(props, dict):
                for k in (
                    "sentiment", "Sentiment",
                    "polarity", "Polarity",
                    "score", "Score",
                    "sentimentScore", "SentimentScore",
                    "sentiment_score",
                ):
                    if k not in props:
                        continue
                    try:
                        fv = float(props[k])
                        if self._is_finite_number(fv):
                            return float(fv)
                    except Exception:
                        continue
        except Exception:
            pass

    return None
def _compute_naive_text_sentiment(self, news_item: TiingoNews):
    """Fallback sentiment model using a small positive/negative keyword list.

    Builds text from the item's title/headline/description/summary, lowercases
    it, splits it on spaces after replacing common punctuation and newlines,
    and returns (pos - neg) / max(1, pos + neg). Returns None when there is no
    item, no text or no tokens.
    """
    if news_item is None:
        return None

    text = ""
    for attr_name in ("title", "Title", "headline", "Headline", "description", "Description", "summary", "Summary"):
        if not hasattr(news_item, attr_name):
            continue
        try:
            part = getattr(news_item, attr_name)
            if part is not None:
                text = (text + " " + str(part)).strip()
        except Exception:
            continue
    if text is None or not str(text).strip():
        return None

    positive_words = {
        "beat", "beats", "beating", "growth", "surge", "surges", "surged", "up",
        "upgrade", "upgrades", "upgraded", "strong", "bull", "bullish", "profit",
        "profits", "record", "raises", "raise", "raised", "outperform", "buy",
        "positive", "win", "wins", "winning",
    }
    negative_words = {
        "miss", "misses", "missed", "down", "downgrade", "downgrades", "downgraded",
        "weak", "bear", "bearish", "loss", "losses", "lawsuit", "investigation",
        "falls", "fall", "fell", "cut", "cuts", "cutting", "underperform", "sell",
        "negative", "warning", "warns", "warn",
    }

    try:
        # Map line breaks, tabs and punctuation to spaces in a single pass
        separators = "\n\r\t,.:;!?()[]{}"
        to_space = str.maketrans(separators, " " * len(separators))
        normalized = str(text).lower().translate(to_space)
        tokens = [token for token in normalized.split(" ") if token]
    except Exception:
        return None
    if not tokens:
        return None

    pos = sum(1 for token in tokens if token in positive_words)
    neg = sum(1 for token in tokens if token in negative_words)
    score = float(pos - neg) / float(max(1, pos + neg))
    if not self._is_finite_number(score):
        return None
    return float(score)
def _update_sentiment(self, symbol: Symbol, score: float) -> None:
    """Update EWMA sentiment for a symbol.

    The first finite score seeds the EWMA; later scores are blended as
    ``alpha * score + (1 - alpha) * previous``. None symbols and non-finite
    scores are ignored.
    """
    if symbol is None or not self._is_finite_number(score):
        return

    previous = self._sentiment_ewma_by_symbol.get(symbol, float("nan"))
    if self._is_finite_number(previous):
        alpha = float(self._sentiment_alpha)
        updated = alpha * float(score) + (1.0 - alpha) * float(previous)
    else:
        updated = float(score)
    self._sentiment_ewma_by_symbol[symbol] = updated
def _get_current_sentiment(self, symbol: Symbol) -> float:
    """Return the stored EWMA sentiment for ``symbol``, or ``-inf`` if missing.

    A None symbol, an unknown symbol or a non-finite stored value all count
    as "not available". Every call emits one debug message describing the
    outcome.
    """
    stored = None
    if symbol is not None:
        stored = self._sentiment_ewma_by_symbol.get(symbol, float("nan"))

    if symbol is None or not self._is_finite_number(stored):
        self.debug(f"{symbol} Sentiment score not available")
        return float("-inf")

    current = float(stored)
    self.debug(f"{symbol} Sentiment score: {current}")
    return current
def _ensure_tiingo_news_subscription(self, underlying: Symbol) -> None:
    """Add a TiingoNews feed for ``underlying`` unless one is already tracked.

    Does nothing for a None symbol or for a symbol that already has an entry
    in ``self._news_symbol_by_underlying``. Otherwise the feed is added and
    both lookup tables (underlying -> news symbol and the reverse) are
    updated.
    """
    already_handled = underlying is None or underlying in self._news_symbol_by_underlying
    if already_handled:
        return

    subscription = self.add_data(TiingoNews, underlying)
    feed_symbol = subscription.symbol
    self._news_symbol_by_underlying[underlying] = feed_symbol
    self._underlying_by_news_symbol[feed_symbol] = underlying
def _remove_tiingo_news_subscription(self, underlying: Symbol) -> None:
    """Drop the TiingoNews feed tracked for ``underlying``, if there is one.

    Both lookup tables are cleaned first; the actual ``remove_security`` call
    is best-effort and any exception it raises is deliberately ignored.
    """
    if underlying is None:
        return

    feed_symbol = self._news_symbol_by_underlying.pop(underlying, None)
    if feed_symbol is not None:
        self._underlying_by_news_symbol.pop(feed_symbol, None)
        try:
            self.remove_security(feed_symbol)
        except Exception:
            # Best-effort cleanup: bookkeeping is already consistent.
            pass
@staticmethod
def _is_finite_number(x) -> bool:
    """Return True when ``x`` converts to a finite float.

    None and booleans are rejected outright; anything ``float()`` cannot
    convert, as well as NaN and +/-inf, yields False.
    """
    if x is None or isinstance(x, bool):
        return False
    try:
        as_float = float(x)
    except Exception:
        return False
    return math.isfinite(as_float)
@staticmethod
def _get_float(obj: object, attr_paths: List[str]):
    """Return the first of ``attr_paths`` that resolves to a float, else None.

    Each path is a dot-separated attribute chain walked from ``obj``. A path
    is abandoned when a hop is missing or None. If the resolved object has a
    ``value`` attribute, that attribute is converted instead of the object
    itself. Paths whose final value cannot be converted are skipped.
    """
    for dotted in attr_paths:
        node = obj
        for name in dotted.split("."):
            if node is None or not hasattr(node, name):
                node = None
                break
            node = getattr(node, name)
        if node is None:
            continue

        # Unwrap boxed values that expose the number through ``.value``.
        raw = getattr(node, "value") if hasattr(node, "value") else node
        try:
            return float(raw)
        except Exception:
            continue
    return None
def _to_ratio(self, value):
    """Normalise a percentage-or-ratio input to a ratio.

    Values greater than 1.0 are treated as percentages and divided by 100;
    values at or below 1.0 are returned unchanged (as float). Non-finite or
    non-numeric input yields None.
    """
    if self._is_finite_number(value):
        number = float(value)
        if number > 1.0:
            return number / 100.0
        return number
    return None
def on_warmup_finished(self) -> None:
    """Flush the removals that were queued while the algorithm was warming up.

    For every queued symbol: liquidate it if the portfolio is invested, forget
    its entry price and sentiment state, and drop its news subscription. The
    queue is emptied afterwards. Nothing happens when the queue is empty.
    """
    queued = list(self._pending_liquidations)
    if not queued:
        return

    for symbol in queued:
        if self.portfolio[symbol].invested:
            self.liquidate(symbol)
        for state in (self._entry_price_by_symbol, self._sentiment_ewma_by_symbol):
            state.pop(symbol, None)
        self._remove_tiingo_news_subscription(symbol)

    self._pending_liquidations.clear()
from AlgorithmImports import *
class FundamentalValueUniverse30DayRebalanceAlgorithm(QCAlgorithm):
    """Value universe held equal-weight, rebalanced at most every 30 days.

    Universe selection (runs daily)
        Coarse: securities with fundamental data, price > 5, top 1000 by
        dollar volume.
        Fine: P/E in [0, 15], total debt-to-equity < 1, dividend yield > 0.01
        and ROI > 0.10. Securities with a missing or non-finite ratio are
        skipped. Survivors are sorted by P/E descending and the first 15 are
        kept.

    Rebalance
        Checked every day 30 minutes after the open. It executes only when
        the selected set changed since the last rebalance AND at least 30
        calendar days have elapsed. Targets are equal-weight.

    Daily risk rule
        35 minutes after the open, each tracked position is compared with the
        fill price recorded when it was opened and is liquidated when
        price >= 1.5 * entry or price <= 0.90 * entry.
    """

    def initialize(self) -> None:
        """Configure dates, cash, universe, state and the two daily schedules."""
        self.set_start_date(2024, 1, 1)
        self.set_end_date(2025, 1, 1)
        self.set_cash(100000)
        self.universe_settings.resolution = Resolution.DAILY
        self.universe_settings.data_normalization_mode = DataNormalizationMode.ADJUSTED

        # SPY minute subscription anchors the market-open based schedules.
        self._spy = self.add_equity("SPY", Resolution.MINUTE).symbol

        self._selected_symbols = []
        self._pending_rebalance = False
        # Far in the past so the 30-day gate never blocks the first rebalance.
        self._last_rebalance_time = datetime(1900, 1, 1)
        self._coarse_count = 0
        self._fine_count = 0
        # Entry fill price per symbol, consumed by the daily risk check.
        self._entry_price_by_symbol = {}

        self.set_warm_up(timedelta(days=60))
        self.add_universe(self._coarse_selection, self._fine_selection)

        # Rebalance check: 30 minutes after the open.
        self.schedule.on(
            self.date_rules.every_day(self._spy),
            self.time_rules.after_market_open(self._spy, 30),
            self._rebalance_if_due,
        )
        # Risk check: 35 minutes after the open.
        self.schedule.on(
            self.date_rules.every_day(self._spy),
            self.time_rules.after_market_open(self._spy, 35),
            self._daily_risk_check,
        )

    def _coarse_selection(self, coarse: List[CoarseFundamental]) -> List[Symbol]:
        """Return the 1000 most liquid symbols with fundamentals and price > 5."""
        filtered = [
            x
            for x in coarse
            if x.has_fundamental_data and x.price is not None and float(x.price) > 5
        ]
        top = sorted(filtered, key=lambda x: x.dollar_volume, reverse=True)[:1000]
        self._coarse_count = len(top)
        return [x.symbol for x in top]

    def _fine_selection(self, fine: List[FineFundamental]) -> List[Symbol]:
        """Apply the value filters and return the top 15 symbols by P/E (descending).

        Also records the number of securities passing the filters and raises
        the pending-rebalance flag whenever the selected set differs from the
        previous one.
        """
        selected = []
        for f in fine:
            pe = self._get_float(
                f,
                [
                    "valuation_ratios.pe_ratio",
                    "valuation_ratios.peratio",
                    "valuation_ratios.price_earnings_ratio",
                ],
            )
            dte = self._get_float(
                f,
                [
                    "operation_ratios.total_debt_equity_ratio",
                    "operation_ratios.debt_to_equity",
                    "operation_ratios.debttoequity",
                ],
            )
            div_yield = self._get_float(
                f,
                [
                    "valuation_ratios.trailing_dividend_yield",
                    "valuation_ratios.dividend_yield",
                    "valuation_ratios.dividendyield",
                ],
            )
            roi = self._get_float(
                f,
                [
                    "operation_ratios.roi",
                    "operation_ratios.return_on_investment",
                    "operation_ratios.returnoninvesment",
                    "profitability_ratios.roi",
                    "profitability_ratios.return_on_investment",
                    "profitability_ratios.return_on_invested_capital",
                    "operation_ratios.roic",
                    "profitability_ratios.roic",
                ],
            )
            if (
                not self._is_finite_number(pe)
                or not self._is_finite_number(dte)
                or not self._is_finite_number(div_yield)
                or not self._is_finite_number(roi)
            ):
                continue

            # Step 1: value/quality filters.
            if pe < 0 or pe > 15:
                continue
            if dte >= 1:
                continue
            if div_yield <= 0.01:
                continue
            if roi <= 0.10:
                continue
            selected.append((f.symbol, float(pe)))

        # Step 2: sort by P/E descending.
        selected_sorted = sorted(selected, key=lambda x: x[1], reverse=True)
        # Step 3: keep the top 15.
        symbols = [x[0] for x in selected_sorted[:15]]
        self._fine_count = len(selected_sorted)

        if set(symbols) != set(self._selected_symbols):
            self._selected_symbols = symbols
            self._pending_rebalance = True
        return symbols

    def on_securities_changed(self, changes: SecurityChanges) -> None:
        """Liquidate securities that left the universe and forget their entry price."""
        for security in changes.removed_securities:
            symbol = security.symbol
            if self.portfolio[symbol].invested:
                self.liquidate(symbol)
            self._entry_price_by_symbol.pop(symbol, None)

    def on_order_event(self, order_event: OrderEvent) -> None:
        """Record the entry price on the fill that opens a position.

        Only filled events for known securities are considered. The entry
        price is set once (the first fill seen while invested with no recorded
        entry) and cleared as soon as the position is flat.
        """
        if order_event is None:
            return
        if order_event.status != OrderStatus.FILLED:
            return
        if order_event.symbol is None:
            return
        symbol = order_event.symbol
        if not self.securities.contains_key(symbol):
            return

        holding = self.portfolio[symbol]
        if holding.invested and symbol not in self._entry_price_by_symbol:
            fill_price = float(order_event.fill_price)
            if self._is_finite_number(fill_price) and fill_price > 0:
                self._entry_price_by_symbol[symbol] = fill_price
        if not holding.invested:
            self._entry_price_by_symbol.pop(symbol, None)

    def _rebalance_if_due(self) -> None:
        """Equal-weight the selected symbols when a rebalance is pending and due."""
        if self.is_warming_up:
            return
        if not self._pending_rebalance:
            return
        if (self.time - self._last_rebalance_time) < timedelta(days=30):
            return
        if len(self._selected_symbols) == 0:
            return

        weight = 1.0 / float(len(self._selected_symbols))
        targets = [PortfolioTarget(symbol, weight) for symbol in self._selected_symbols]
        self.set_holdings(targets)
        self._last_rebalance_time = self.time
        self._pending_rebalance = False

        preview = ",".join([str(x.value) for x in self._selected_symbols[:10]])
        self.debug(
            f"Rebalance {self.time.date()} | coarse={self._coarse_count} fine_pass={self._fine_count} "
            f"selected={len(self._selected_symbols)} | first={preview}"
        )

    def _daily_risk_check(self) -> None:
        """Liquidate positions that hit the +50% take-profit or -10% stop-loss.

        Entries for positions that are no longer invested, or whose recorded
        entry price is unusable, are dropped. Symbols without a valid current
        price are left untouched.
        """
        if self.is_warming_up:
            return
        for symbol in list(self._entry_price_by_symbol.keys()):
            if not self.portfolio[symbol].invested:
                self._entry_price_by_symbol.pop(symbol, None)
                continue
            entry_price = self._entry_price_by_symbol.get(symbol, 0.0)
            if not self._is_finite_number(entry_price) or float(entry_price) <= 0:
                self._entry_price_by_symbol.pop(symbol, None)
                continue
            price = float(self.securities[symbol].price)
            if not self._is_finite_number(price) or price <= 0:
                continue
            if price >= 1.5 * float(entry_price) or price <= 0.90 * float(entry_price):
                self.liquidate(symbol)
                self._entry_price_by_symbol.pop(symbol, None)

    @staticmethod
    def _is_finite_number(x) -> bool:
        """Return True when ``x`` is not None/bool and converts to a finite float."""
        if x is None:
            return False
        if isinstance(x, bool):
            return False
        try:
            return math.isfinite(float(x))
        except Exception:
            return False

    @staticmethod
    def _get_float(obj: object, attr_paths: List[str]):
        """Return the first dotted attribute path on ``obj`` that yields a float.

        A path is skipped when any hop is missing or None, or when the final
        value (or its ``.value`` attribute, if present) cannot be converted.
        Returns None when no path succeeds.
        """
        for path in attr_paths:
            current = obj
            ok = True
            for part in path.split("."):
                if current is None or not hasattr(current, part):
                    ok = False
                    break
                current = getattr(current, part)
            if not ok or current is None:
                continue
            if hasattr(current, "value"):
                current = getattr(current, "value")
            try:
                return float(current)
            except Exception:
                continue
        return None


# Header of the next concatenated source file; it was fused onto the
# ``return None`` line above.
from AlgorithmImports import *
class TrendPredictor:
    """Simple, dependency-free trend prediction proxy (MLFinLab-like intent).

    Given a sequence of daily closes (oldest -> newest), ``predict`` fits an
    ordinary-least-squares line to log(price) against the bar index and
    reports the slope. A positive slope is read as an uptrend. The API is
    intentionally small so it can be swapped for a richer model later.
    """

    def __init__(self, lookback: int = 21) -> None:
        """Store the lookback window length, clamped to a minimum of 2."""
        self._lookback = max(2, int(lookback))

    @property
    def lookback(self) -> int:
        """Number of closes callers are expected to supply to ``predict``."""
        return self._lookback

    def predict(self, closes: list) -> tuple:
        """Score the trend of ``closes``.

        Args:
            closes: prices ordered oldest -> newest.

        Returns:
            ``(trend_score, predicted_up)`` where ``trend_score`` is the
            log-price regression slope multiplied by the number of points and
            ``predicted_up`` is True when the slope is positive. Returns
            ``(0.0, False)`` when fewer than two prices are given or when any
            price is None, non-finite (NaN/inf) or not strictly positive.
        """
        n = len(closes)
        if n < 2:
            return 0.0, False

        log_prices = []
        for p in closes:
            if p is None:
                return 0.0, False
            price = float(p)
            # NaN compares False against 0 and inf has an infinite log, so
            # both must be rejected explicitly or the score becomes NaN.
            if not math.isfinite(price) or price <= 0:
                return 0.0, False
            log_prices.append(math.log(price))

        x_mean = (n - 1) / 2.0
        y_mean = sum(log_prices) / float(n)
        num = 0.0
        den = 0.0
        for i, log_price in enumerate(log_prices):
            dx = float(i) - x_mean
            num += dx * (log_price - y_mean)
            den += dx * dx
        if den == 0.0:
            return 0.0, False

        slope = num / den
        # Scale by window length to make scores more separable.
        trend_score = slope * float(n)
        predicted_up = slope > 0.0
        return float(trend_score), bool(predicted_up)
class FundamentalValueQualityTrendAlgorithm(QCAlgorithm):
    """Value/quality universe with a regression-trend filter, rebalanced monthly.

    Universe selection (runs daily)
    -------------------------------
    1) Coarse: U.S. equities with fundamental data, price > $5, top 1000 by
       dollar volume.
    2) Fine: keep securities whose ratios are all finite and satisfy
       - P/E between 5 and 15 (inclusive)
       - debt-to-equity < 1
       - dividend yield > 0.01
       - ROI > 0.10
       Field names are probed through several candidate attribute paths.
    3) Ranking: survivors are sorted by P/E descending and the first
       ``portfolio_size + 5`` become the working set for the trend step.

    Rebalance (month start, 30 minutes after the open, anchored to SPY)
    -------------------------------------------------------------------
    - Request the last ``lookback`` (21) daily closes of the working set.
    - Score each symbol with ``TrendPredictor``; symbols with fewer closes
      than the lookback are skipped.
    - Keep only symbols predicted up, best trend score first, at most 15.
      There is no back-fill when fewer than 15 qualify.
    - Liquidate every holding that is not in the selection.
    - Weight the selection by inverse log-return volatility, falling back to
      equal weight when no volatility could be computed.
    - When the drawdown from peak equity (peak is updated at rebalance time)
      exceeds 10%, all target weights are halved.

    Risks / assumptions
    -------------------
    - Fundamental fields can be missing; this can shrink the eligible universe.
    - The trend proxy is simplistic and can whipsaw; it is meant to be a
      replaceable component rather than a production-grade model.
    - Monthly turnover and transaction costs can impact live results.
    """

    def initialize(self) -> None:
        """Configure dates, cash, universe, parameters, state and the schedule."""
        # Five-year window ending on the configured end date.
        self.set_end_date(2026, 1, 1)
        self.set_start_date(self.end_date - timedelta(5 * 365))
        self.set_cash(100000)

        # Daily resolution for universe and history/trend computations.
        self.universe_settings.resolution = Resolution.DAILY
        # Scheduling anchor.
        self._spy = self.add_equity("SPY", Resolution.DAILY).symbol

        # Universe selection (coarse + fine); coarse keeps this many symbols.
        self._coarse_count = 1000
        self.add_universe(self._coarse_selection_function, self._fine_selection_function)

        # Strategy parameters.
        self._portfolio_size = 15
        self._trend_predictor = TrendPredictor(lookback=21)

        # Equity peak for drawdown calculations and the drawdown guard level.
        self._equity_peak = self.portfolio.total_portfolio_value
        self._max_drawdown_for_scaling = 0.10

        # Trend / volatility parameters (the EMA spans feed ``_ema_trend``).
        self._trend_fast_ema = 10
        self._trend_slow_ema = 21
        self._vol_lookback = 21

        # State, initialised with safe defaults rather than None.
        self._latest_coarse_count = 0
        self._latest_fine_pass_count = 0
        self._latest_ml_pass_count = 0
        self._fine_working_set = []
        self._pe_by_symbol = {}
        self._last_selection = []

        # Monthly rebalance at month start.
        self.schedule.on(
            self.date_rules.month_start(self._spy),
            self.time_rules.after_market_open(self._spy, 30),
            self._rebalance,
        )
        # Warm-up for stable history access.
        self.set_warm_up(30, Resolution.DAILY)

    # ------------------
    # Universe selection
    # ------------------
    def _coarse_selection_function(self, coarse: list) -> list:
        """Return the most liquid symbols that have fundamentals and price > 5."""
        filtered = []
        for x in coarse:
            if not x.has_fundamental_data:
                continue
            if x.price is None or x.price <= 5:
                continue
            filtered.append(x)
        filtered.sort(key=lambda c: c.dollar_volume, reverse=True)
        selected = filtered[: self._coarse_count]
        self._latest_coarse_count = len(selected)
        return [c.symbol for c in selected]

    def _fine_selection_function(self, fine: list) -> list:
        """Apply the value/quality filters and return the P/E-ranked working set."""
        passed = []
        pe_by_symbol = {}
        for f in fine:
            pe = self._get_float(
                f,
                [
                    "ValuationRatios.PERatio",
                    "ValuationRatios.PE",
                    "PERatio",
                    "PE",
                ],
                default=float("nan"),
            )
            debt_to_equity = self._get_float(
                f,
                [
                    "OperationRatios.DebtEquityRatio",
                    "OperationRatios.TotalDebtEquityRatio",
                    "FinancialStatements.balance_sheet.TotalDebtEquityRatio",
                    "DebtEquityRatio",
                ],
                default=float("nan"),
            )
            dividend_yield = self._get_float(
                f,
                [
                    "ValuationRatios.ForwardDividendYield",
                    "ValuationRatios.DividendYield",
                    "DividendYield",
                ],
                default=float("nan"),
            )
            # ROI can be represented differently; try a few likely fields.
            roi = self._get_float(
                f,
                [
                    "OperationRatios.ROIC",
                    "OperationRatios.roi",
                    "OperationRatios.ReturnOnInvestment",
                    "ROIC",
                    "ROI",
                ],
                default=float("nan"),
            )
            if (
                not self._is_finite(pe)
                or not self._is_finite(debt_to_equity)
                or not self._is_finite(dividend_yield)
                or not self._is_finite(roi)
            ):
                continue

            if pe < 5.0 or pe > 15.0:
                continue
            if debt_to_equity >= 1.0:
                continue
            if dividend_yield <= 0.01:
                continue
            if roi <= 0.10:
                continue
            passed.append(f)
            pe_by_symbol[f.symbol] = float(pe)

        # Rank by P/E descending and keep a slightly larger working set than
        # the portfolio size for the trend step.
        passed.sort(key=lambda ff: pe_by_symbol.get(ff.symbol, float("inf")), reverse=True)
        passed = passed[: self._portfolio_size + 5]
        self._fine_working_set = [ff.symbol for ff in passed]
        self._pe_by_symbol = pe_by_symbol
        self._latest_fine_pass_count = len(self._fine_working_set)
        return self._fine_working_set

    # --------------
    # Rebalance step
    # --------------
    def _rebalance(self) -> None:
        """Select up-trending candidates and set inverse-volatility holdings."""
        if self.is_warming_up:
            return

        # Update the equity peak and compute the current drawdown.
        self._equity_peak = max(self._equity_peak, self.portfolio.total_portfolio_value)
        current_dd = self._portfolio_drawdown()

        candidates = list(self._fine_working_set)
        if len(candidates) == 0:
            self.debug("Rebalance: no fine candidates; skipping.")
            return

        history = self.history(candidates, self._trend_predictor.lookback, Resolution.DAILY)
        if history.empty:
            self.debug("Rebalance: History returned empty; skipping.")
            return

        scores, vols, ml_pass = self._score_candidates(candidates, history)
        self._latest_ml_pass_count = len(ml_pass)

        # Best trend score first, capped at the portfolio size, then ordered
        # by P/E descending.
        selected = sorted(ml_pass, key=lambda s: scores.get(s, float("-inf")), reverse=True)
        selected = selected[: self._portfolio_size]
        selected.sort(key=lambda s: self._pe_by_symbol.get(s, float("inf")), reverse=True)
        selected = selected[: self._portfolio_size]

        self.debug(
            f"Universe funnel: coarse={self._latest_coarse_count}, "
            f"fine_pass={self._latest_fine_pass_count}, "
            f"ml_pass={self._latest_ml_pass_count}, "
            f"selected={len(selected)}"
        )
        selected_str = ",".join(
            [f"{s.value}(PE={self._pe_by_symbol.get(s, float('nan')):.2f})" for s in selected]
        )
        self.debug("Selected symbols: " + selected_str)

        # Liquidate positions that are not part of the selection.
        selected_set = set(selected)
        for symbol, holding in self.portfolio.items():
            if holding.invested and symbol not in selected_set:
                self.liquidate(symbol, "Removed from selection")

        if len(selected) == 0:
            self._last_selection = []
            return

        weights = self._inverse_volatility_weights(selected, vols)

        # Portfolio drawdown guard: halve exposure when drawdown is high.
        scale = 1.0
        if current_dd > self._max_drawdown_for_scaling:
            self.debug(
                f"Drawdown {current_dd:.2%} exceeds {self._max_drawdown_for_scaling:.2%}, scaling exposure by 0.5"
            )
            scale = 0.5

        for symbol in selected:
            if symbol not in self.securities:
                continue
            if not self.securities[symbol].is_tradable:
                continue
            target_weight = weights.get(symbol, 0.0) * scale
            if target_weight <= 0:
                continue
            self.set_holdings(symbol, target_weight)

        self._last_selection = selected

    def _score_candidates(self, candidates: list, history) -> tuple:
        """Return ``(scores, vols, predicted_up)`` for candidates with enough history.

        ``history`` is indexed per symbol on its first index level and must
        carry a ``close`` column. A symbol whose data is missing, too short
        or fails to parse is skipped.
        """
        scores = {}
        vols = {}
        predicted_up = []
        lookback = self._trend_predictor.lookback
        for symbol in candidates:
            try:
                if symbol not in history.index.get_level_values(0):
                    continue
                sym_hist = history.loc[symbol]
                if "close" not in sym_hist.columns:
                    continue
                closes_series = sym_hist["close"].dropna()
                closes = [float(x) for x in closes_series.values.tolist()]
                if len(closes) < lookback:
                    continue
                score, up = self._trend_predictor.predict(closes)
                scores[symbol] = float(score)
                # Volatility estimate used for inverse-volatility weighting.
                vols[symbol] = self._calculate_volatility(closes[-self._vol_lookback:])
                if up:
                    predicted_up.append(symbol)
            except Exception:
                # Defensive: one symbol's bad history must not abort the rebalance.
                continue
        return scores, vols, predicted_up

    def _inverse_volatility_weights(self, selected: list, vols: dict) -> dict:
        """Return weights proportional to 1/vol; equal weights if none is usable.

        Symbols whose volatility is missing, non-positive or non-finite get no
        entry when at least one other symbol has a usable volatility.
        """
        inv_vols = {}
        for symbol in selected:
            vol = vols.get(symbol, float("inf"))
            if vol is None or vol <= 0 or not math.isfinite(vol):
                continue
            inv_vols[symbol] = 1.0 / vol

        if not inv_vols:
            weight = 1.0 / float(len(selected))
            return {s: weight for s in selected}

        total_inv_vol = sum(inv_vols.values())
        return {s: inv_vols[s] / total_inv_vol for s in inv_vols.keys()}

    # ----------------
    # Helper functions
    # ----------------
    def _get_float(self, root: object, paths: list, default: float) -> float:
        """Return the first path on ``root`` that coerces to float, else ``default``.

        Tolerates missing nested fields, non-numeric values and objects that
        expose their number through a ``Value`` attribute.

        Args:
            root: object to traverse (e.g. a fine-fundamental record).
            paths: dot-separated attribute paths to try, in order.
            default: fallback value returned (as float) when no path works.
        """
        for path in paths:
            value = self._try_get_attr_path(root, path)
            numeric = self._coerce_to_float(value)
            if numeric is None:
                continue
            return float(numeric)
        return float(default)

    def _try_get_attr_path(self, root: object, path: str) -> object:
        """Walk a dotted attribute path; return None if any hop is missing or None."""
        current = root
        for part in path.split("."):
            if current is None or not hasattr(current, part):
                return None
            current = getattr(current, part)
        return current

    def _coerce_to_float(self, value: object) -> float:
        """Convert ``value`` (or its ``Value`` attribute) to float; None on failure."""
        if value is None:
            return None
        if hasattr(value, "Value"):
            try:
                return float(getattr(value, "Value"))
            except Exception:
                return None
        try:
            return float(value)
        except Exception:
            return None

    def _is_finite(self, x: float) -> bool:
        """Return True when ``x`` is not None and converts to a finite float."""
        try:
            return x is not None and math.isfinite(float(x))
        except Exception:
            return False

    def _calculate_volatility(self, closes: list) -> float:
        """Return the standard deviation of log returns over ``closes``.

        Pairs containing a non-positive price are ignored. Returns ``inf``
        when fewer than two closes or fewer than two usable returns exist.
        """
        if len(closes) < 2:
            return float("inf")
        returns = []
        for i in range(1, len(closes)):
            if closes[i - 1] <= 0 or closes[i] <= 0:
                continue
            returns.append(math.log(closes[i] / closes[i - 1]))
        if len(returns) < 2:
            return float("inf")
        return float(np.std(returns))

    def _portfolio_drawdown(self) -> float:
        """Return the current drawdown from peak equity as a fraction >= 0."""
        if self._equity_peak <= 0:
            return 0.0
        equity = self.portfolio.total_portfolio_value
        dd = (self._equity_peak - equity) / self._equity_peak
        return max(0.0, float(dd))

    def _ema_trend(self, closes: list, fast: int, slow: int) -> tuple:
        """EMA-based trend proxy (not called by the rebalance in this class).

        Args:
            closes: close prices, oldest -> newest.
            fast: fast EMA span.
            slow: slow EMA span.

        Returns:
            ``(trend_score, predicted_up)`` where ``trend_score`` is
            ``fast_ema - slow_ema`` and ``predicted_up`` is True when the fast
            EMA is above the slow EMA. ``(0.0, False)`` when fewer than
            ``slow`` closes are given.
        """
        if len(closes) < slow:
            return 0.0, False
        series = pd.Series(closes)
        fast_ema = series.ewm(span=fast).mean().iloc[-1]
        slow_ema = series.ewm(span=slow).mean().iloc[-1]
        trend_score = float(fast_ema - slow_ema)
        # Plain bool rather than a numpy bool, matching TrendPredictor.predict.
        predicted_up = bool(fast_ema > slow_ema)
        return trend_score, predicted_up


# Header of the next concatenated source file; it was fused onto the final
# ``return`` line above.
from AlgorithmImports import *
class FundamentalValueSentiment30DayRebalanceAlgorithm(QCAlgorithm):
"""A value + news-sentiment strategy with a fixed 30-calendar-day rebalance cycle.
Strategy flow:
1) Coarse universe (daily):
- Has fundamental data
- Price > 5
- Top 1000 by dollar volume
2) Fine universe (daily) simplified filters ONLY:
- P/E in [0, 15]
- Total debt to equity ratio < 1
- Dividend yield > 1% (normalized via _to_ratio)
- ROI > 10% (normalized via _to_ratio)
Skip non-finite values. After filtering, sort by P/E descending and select top 20.
3) Sentiment (TiingoNews):
- Subscribe to TiingoNews per selected symbol.
- Update EWMA sentiment (alpha=0.2) on each news item.
4) Rebalance:
- Time-based every 30 calendar days, checked daily after market open.
- Independent of universe changes.
5) Portfolio weights:
- Rank selected symbols by current sentiment descending (missing -> -inf).
- Take top 10 stocks
- Allocate 50% to top 25% (ceil) and 50% to the rest equally.
- Normalize weights to sum to 1.
6) Daily risk rule: [Disabled]
- Track entry price via OnOrderEvent.
- After market open: TP at +50% or SL at -10% vs entry triggers liquidation.
"""
def initialize(self) -> None:
    """Configure the backtest window, data, state and the two daily schedules."""
    # Backtest window and starting capital.
    self.set_start_date(2025, 1, 1)
    self.set_end_date(2025, 10, 1)
    self.set_cash(100000)

    # Universe members are daily, adjusted-price subscriptions.
    settings = self.universe_settings
    settings.resolution = Resolution.DAILY
    settings.data_normalization_mode = DataNormalizationMode.ADJUSTED

    # Minute SPY subscription anchors the market-open based schedules.
    spy_security = self.add_equity("SPY", Resolution.MINUTE)
    self._spy = spy_security.symbol

    # Selection / rebalance bookkeeping.
    self._selected_symbols: List[Symbol] = []
    self._pending_rebalance = False
    self._last_rebalance_time = datetime(1900, 1, 1)
    self._coarse_count = 0
    self._fine_count = 0

    # Entry fill price per symbol, used by the daily take-profit/stop-loss check.
    self._entry_price_by_symbol: dict = {}

    # News feed lookups (both directions) and the EWMA sentiment state.
    self._news_symbol_by_underlying: dict = {}
    self._underlying_by_news_symbol: dict = {}
    self._sentiment_ewma_by_symbol: dict = {}
    self._sentiment_alpha = 0.2

    # Symbols removed while warming up; processed once warm-up ends.
    self._pending_liquidations = set()

    self.set_warm_up(timedelta(days=60))
    self.add_universe(self._coarse_selection, self._fine_selection)

    # The rebalance check runs daily but only acts once 30 calendar days have passed.
    self.schedule.on(
        self.date_rules.every_day(self._spy),
        self.time_rules.after_market_open(self._spy, 30),
        self._rebalance_if_due,
    )

    # The risk check runs daily, independent of rebalance timing.
    self.schedule.on(
        self.date_rules.every_day(self._spy),
        self.time_rules.after_market_open(self._spy, 45),
        self._daily_risk_check,
    )
def _coarse_selection(self, coarse: List[CoarseFundamental]) -> List[Symbol]:
    """Pick the coarse universe.

    Keeps entries that have fundamental data and a price above 5, orders them
    by dollar volume (largest first) and returns the symbols of the first
    1000. The number returned is stored in ``self._coarse_count``.
    """
    eligible = []
    for item in coarse:
        if not item.has_fundamental_data:
            continue
        if item.price is None:
            continue
        if float(item.price) > 5:
            eligible.append(item)

    eligible.sort(key=lambda entry: entry.dollar_volume, reverse=True)
    most_liquid = eligible[:1000]
    self._coarse_count = len(most_liquid)
    return [entry.symbol for entry in most_liquid]
def _fine_selection(self, fine: List[FineFundamental]) -> List[Symbol]:
    """Pick the fine universe.

    A security passes when all four ratios are finite and P/E is in [0, 15],
    debt-to-equity < 1, dividend yield > 0.01 and ROI > 0.10. Survivors are
    ordered by P/E descending and the first 20 symbols are returned. The
    survivor count is stored in ``self._fine_count``. When the returned set
    differs from ``self._selected_symbols`` the selection is replaced and the
    pending-rebalance flag is raised.
    """
    pe_paths = [
        "valuation_ratios.pe_ratio",
        "valuation_ratios.peratio",
        "valuation_ratios.price_earnings_ratio",
    ]
    dte_paths = [
        "operation_ratios.total_debt_equity_ratio",
        "operation_ratios.debt_to_equity",
        "operation_ratios.debttoequity",
    ]
    yield_paths = [
        "valuation_ratios.trailing_dividend_yield",
        "valuation_ratios.dividend_yield",
        "valuation_ratios.dividendyield",
    ]
    roi_paths = [
        "operation_ratios.roi",
        "operation_ratios.return_on_investment",
        "operation_ratios.returnoninvesment",
        "profitability_ratios.roi",
        "profitability_ratios.return_on_investment",
        "profitability_ratios.return_on_invested_capital",
        "operation_ratios.roic",
        "profitability_ratios.roic",
    ]

    survivors = []
    for item in fine:
        pe = self._get_float(item, pe_paths)
        dte = self._get_float(item, dte_paths)
        div_yield = self._get_float(item, yield_paths)
        roi = self._get_float(item, roi_paths)

        # Skip anything with a missing or non-finite ratio.
        if not all(self._is_finite_number(v) for v in (pe, dte, div_yield, roi)):
            continue

        passes = 0 <= pe <= 15 and dte < 1.0 and div_yield > 0.01 and roi > 0.10
        if passes:
            survivors.append((item.symbol, float(pe)))

    # Highest P/E first, then keep the first 20.
    survivors.sort(key=lambda pair: pair[1], reverse=True)
    symbols = [symbol for symbol, _ in survivors[:20]]
    self._fine_count = len(survivors)

    if set(symbols) != set(self._selected_symbols):
        self._selected_symbols = symbols
        self._pending_rebalance = True
    return symbols
def on_securities_changed(self, changes: SecurityChanges) -> None:
    """React to securities entering or leaving the algorithm.

    Removed securities:
        Entry-price and sentiment state are always cleared. During warm-up
        the symbol is only queued in ``self._pending_liquidations``; otherwise
        it is liquidated (if invested) and its news subscription is removed.

    Added securities:
        Any stale entry in ``self._pending_liquidations`` is discarded, so a
        symbol that left and re-entered during warm-up is not torn down later
        by the deferred cleanup. A news subscription is then ensured for the
        symbol.
    """
    for security in changes.removed_securities:
        symbol = security.symbol
        if self.is_warming_up:
            # No trading during warm-up: defer liquidation and unsubscription.
            self._pending_liquidations.add(symbol)
            self._entry_price_by_symbol.pop(symbol, None)
            self._sentiment_ewma_by_symbol.pop(symbol, None)
            continue
        if self.portfolio[symbol].invested:
            self.liquidate(symbol)
        self._entry_price_by_symbol.pop(symbol, None)
        self._sentiment_ewma_by_symbol.pop(symbol, None)
        self._remove_tiingo_news_subscription(symbol)

    for security in changes.added_securities:
        symbol = security.symbol
        # The symbol is live again; a queued removal no longer applies.
        self._pending_liquidations.discard(symbol)
        # NOTE(review): assumes news feeds this class added can be reported
        # here as added securities - confirm against LEAN. Such symbols are
        # feeds themselves and must not get a feed of their own.
        if symbol in self._underlying_by_news_symbol:
            continue
        self._ensure_tiingo_news_subscription(symbol)
def on_data(self, slice: Slice) -> None:
    """Feed every TiingoNews item in ``slice`` into the EWMA sentiment state.

    Items are mapped back to their underlying through
    ``self._underlying_by_news_symbol``; unknown feeds are ignored. The score
    comes from the item's own sentiment fields when available, otherwise from
    the keyword-based fallback. Items with no score are skipped.
    """
    if slice is None:
        return

    try:
        news_by_symbol = slice.get(TiingoNews)
    except Exception:
        news_by_symbol = None
    if news_by_symbol is None:
        return

    for entry in news_by_symbol:
        try:
            feed_symbol = entry.key
            article = entry.value
        except Exception:
            continue
        if feed_symbol is None or article is None:
            continue

        underlying = self._underlying_by_news_symbol.get(feed_symbol, None)
        if underlying is None:
            continue

        score = self._try_get_news_sentiment_score(article)
        if score is None:
            # Fall back to the keyword-based estimate.
            score = self._compute_naive_text_sentiment(article)
        if score is not None:
            self._update_sentiment(underlying, float(score))
def on_order_event(self, order_event: OrderEvent) -> None:
    """Maintain the per-symbol entry price from filled orders.

    Only filled events for securities known to the algorithm are handled.
    While the position is invested and no entry is recorded, the fill price
    (if finite and positive) becomes the entry price. Once the position is
    flat the recorded entry is removed.
    """
    if order_event is None or order_event.status != OrderStatus.FILLED:
        return

    symbol = order_event.symbol
    if symbol is None or not self.securities.contains_key(symbol):
        return

    holding = self.portfolio[symbol]
    entries = self._entry_price_by_symbol

    if holding.invested and symbol not in entries:
        fill_price = float(order_event.fill_price)
        if self._is_finite_number(fill_price) and fill_price > 0:
            entries[symbol] = fill_price

    if not holding.invested:
        entries.pop(symbol, None)
def _rebalance_if_due(self) -> None:
    """Rebalance into the top-sentiment names once 30 calendar days have elapsed.

    Runs from the daily schedule and is independent of universe changes.
    Nothing happens during warm-up, before the 30-day interval has passed, or
    when there is no selection, ranking or target list.

    The selected symbols are ranked by sentiment and the first 10 are
    weighted by ``_build_weighted_targets``. Existing holdings outside that
    target list are liquidated as part of the same call; otherwise a symbol
    that stays in the universe but drops out of the top 10 would be held
    indefinitely on top of the fully allocated new targets.
    """
    if self.is_warming_up:
        return
    if (self.time - self._last_rebalance_time) < timedelta(days=30):
        return
    if len(self._selected_symbols) == 0:
        return

    ranked = self._rank_by_sentiment(self._selected_symbols)
    if len(ranked) == 0:
        return

    # Keep the 10 best-ranked symbols.
    ranked_f = ranked[:10]
    targets = self._build_weighted_targets(ranked_f)
    if len(targets) == 0:
        return

    # Second argument: liquidate existing holdings that are not targeted.
    self.set_holdings(targets, True)
    self._last_rebalance_time = self.time
    self._pending_rebalance = False

    preview = ",".join([str(x.value) for x in ranked_f[:16]])
    self.debug(
        f"Rebalance {self.time.date()} | coarse={self._coarse_count} fine_pass={self._fine_count} "
        f"selected={len(self._selected_symbols)} | ranked_first={preview}"
    )
def _rank_by_sentiment(self, symbols: List[Symbol]) -> List[Symbol]:
    """Order ``symbols`` by their current sentiment score, best first.

    Scores come from ``_get_current_sentiment``. Ties keep their input order.
    None or an empty input yields an empty list.
    """
    if symbols is None or len(symbols) == 0:
        return []

    scored = []
    for symbol in symbols:
        scored.append((symbol, float(self._get_current_sentiment(symbol))))
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return [symbol for symbol, _ in scored]
def _build_weighted_targets(self, ranked: List[Symbol]) -> List[PortfolioTarget]:
    """Turn a ranked symbol list into portfolio targets.

    The leading ``ceil(25%)`` of the list (at least one symbol) shares half
    of the capital equally and the remainder shares the other half equally.
    When there is no remainder the leading group takes everything. Weights
    are finally rescaled to sum to 1 when their total is finite and positive.
    An empty input yields an empty list.
    """
    count = len(ranked)
    if count <= 0:
        return []

    head_count = max(1, min(int(math.ceil(0.25 * float(count))), count))
    tail_count = count - head_count

    if tail_count <= 0:
        share = 1.0 / float(head_count)
        weights_by_symbol: dict = {symbol: share for symbol in ranked}
    else:
        head_share = 0.5 / float(head_count)
        tail_share = 0.5 / float(tail_count)
        weights_by_symbol = {
            symbol: (head_share if position < head_count else tail_share)
            for position, symbol in enumerate(ranked)
        }

    total = sum(weights_by_symbol.values())
    if self._is_finite_number(total) and float(total) > 0:
        weights_by_symbol = {
            symbol: float(weight) / float(total)
            for symbol, weight in weights_by_symbol.items()
        }

    return [PortfolioTarget(symbol, float(weight)) for symbol, weight in weights_by_symbol.items()]
def _daily_risk_check(self) -> None:
"""Daily take-profit / stop-loss check after market open.
TP: price >= 1.5 * entry
SL: price <= 0.9 * entry
"""
if self.is_warming_up:
return
for symbol in list(self._entry_price_by_symbol.keys()):
if not self.portfolio[symbol].invested:
self._entry_price_by_symbol.pop(symbol, None)
continue
entry_price = self._entry_price_by_symbol.get(symbol, 0.0)
if not self._is_finite_number(entry_price) or float(entry_price) <= 0:
self._entry_price_by_symbol.pop(symbol, None)
continue
price = float(self.securities[symbol].price)
if not self._is_finite_number(price) or float(price) <= 0:
continue
if price >= 1.5 * float(entry_price) or price <= 0.9 * float(entry_price):
self.liquidate(symbol)
self._entry_price_by_symbol.pop(symbol, None)
self.debug(f"Risk Triggred for Info {symbol} | Entry Price {entry_price} | Current price {price}")
def _try_get_news_sentiment_score(self, news_item: TiingoNews):
"""Try extracting a sentiment score from TiingoNews item properties/fields."""
if news_item is None:
return None
candidate_attr_names = [
"Sentiment", "sentiment",
"Polarity", "polarity",
"Score", "score",
"SentimentScore", "sentimentscore",
"sentiment_score",
]
for name in candidate_attr_names:
if hasattr(news_item, name):
try:
v = getattr(news_item, name)
if hasattr(v, "value"):
v = getattr(v, "value")
fv = float(v)
if self._is_finite_number(fv):
return float(fv)
except Exception:
continue
if hasattr(news_item, "properties"):
try:
props = getattr(news_item, "properties")
if isinstance(props, dict):
for k in [
"sentiment", "Sentiment",
"polarity", "Polarity",
"score", "Score",
"sentimentScore", "SentimentScore",
"sentiment_score",
]:
if k in props:
try:
fv = float(props[k])
if self._is_finite_number(fv):
return float(fv)
except Exception:
continue
except Exception:
pass
return None
def _compute_naive_text_sentiment(self, news_item: TiingoNews):
"""Fallback sentiment: a tiny keyword-based score from the news text."""
if news_item is None:
return None
text = ""
for attr_name in ["title", "Title", "headline", "Headline", "description", "Description", "summary", "Summary"]:
if hasattr(news_item, attr_name):
try:
part = getattr(news_item, attr_name)
if part is None:
continue
text = (text + " " + str(part)).strip()
except Exception:
continue
if text is None or len(str(text).strip()) == 0:
return None
positive_words = {
"beat", "beats", "beating", "growth", "surge", "surges", "surged", "up",
"upgrade", "upgrades", "upgraded", "strong", "bull", "bullish", "profit",
"profits", "record", "raises", "raise", "raised", "outperform", "buy",
"positive", "win", "wins", "winning",
}
negative_words = {
"miss", "misses", "missed", "down", "downgrade", "downgrades", "downgraded",
"weak", "bear", "bearish", "loss", "losses", "lawsuit", "investigation",
"falls", "fall", "fell", "cut", "cuts", "cutting", "underperform", "sell",
"negative", "warning", "warns", "warn",
}
try:
cleaned = str(text).lower().replace("\n", " ").replace("\r", " ").replace("\t", " ")
for ch in [",", ".", ":", ";", "!", "?", "(", ")", "[", "]", "{", "}"]:
cleaned = cleaned.replace(ch, " ")
tokens = [t for t in cleaned.split(" ") if len(t) > 0]
except Exception:
return None
if len(tokens) == 0:
return None
pos = 0
neg = 0
for t in tokens:
if t in positive_words:
pos += 1
if t in negative_words:
neg += 1
denom = max(1, pos + neg)
score = float(pos - neg) / float(denom)
if not self._is_finite_number(score):
return None
return float(score)
def _update_sentiment(self, symbol: Symbol, score: float) -> None:
"""Update EWMA sentiment score for a symbol."""
if symbol is None:
return
if not self._is_finite_number(score):
return
prev = self._sentiment_ewma_by_symbol.get(symbol, float("nan"))
if not self._is_finite_number(prev):
self._sentiment_ewma_by_symbol[symbol] = float(score)
return
a = float(self._sentiment_alpha)
self._sentiment_ewma_by_symbol[symbol] = a * float(score) + (1.0 - a) * float(prev)
def _get_current_sentiment(self, symbol: Symbol) -> float:
"""Get current EWMA sentiment for a symbol; missing -> -inf."""
if symbol is None:
self.debug(f"{symbol} Sentiment score not available")
return float("-inf")
v = self._sentiment_ewma_by_symbol.get(symbol, float("nan"))
if not self._is_finite_number(v):
self.debug(f"{symbol} Sentiment score not available")
return float("-inf")
self.debug(f"{symbol} Sentiment score: {float(v)}")
return float(v)
def _ensure_tiingo_news_subscription(self, underlying: Symbol) -> None:
"""Subscribe to TiingoNews for an underlying symbol (idempotent)."""
if underlying is None:
return
if underlying in self._news_symbol_by_underlying:
return
news_symbol = self.add_data(TiingoNews, underlying).symbol
self._news_symbol_by_underlying[underlying] = news_symbol
self._underlying_by_news_symbol[news_symbol] = underlying
def _remove_tiingo_news_subscription(self, underlying: Symbol) -> None:
"""Remove TiingoNews subscription for an underlying symbol if present."""
if underlying is None:
return
news_symbol = self._news_symbol_by_underlying.pop(underlying, None)
if news_symbol is None:
return
self._underlying_by_news_symbol.pop(news_symbol, None)
try:
self.remove_security(news_symbol)
except Exception:
pass
@staticmethod
def _is_finite_number(x) -> bool:
"""True if x can be cast to a finite float (and is not boolean)."""
if x is None:
return False
if isinstance(x, bool):
return False
try:
return math.isfinite(float(x))
except Exception:
return False
@staticmethod
def _get_float(obj: object, attr_paths: List[str]):
"""Try reading a numeric attribute via multiple dot-separated paths."""
for path in attr_paths:
current = obj
ok = True
for part in path.split("."):
if current is None or not hasattr(current, part):
ok = False
break
current = getattr(current, part)
if not ok or current is None:
continue
if hasattr(current, "value"):
current = getattr(current, "value")
try:
return float(current)
except Exception:
continue
return None
def _to_ratio(self, value):
"""Normalize either a percent (e.g., 12.3) or a ratio (e.g., 0.123) to ratio."""
if not self._is_finite_number(value):
return None
v = float(value)
return v / 100.0 if v > 1.0 else v
# Handler that works through the symbols queued in self._pending_liquidations
# and then empties that collection. Presumably this is the QCAlgorithm
# end-of-warm-up callback of the same name -- confirm.
# NOTE(review): indentation was lost in this copy of the file, so it cannot be
# read off the text whether the three cleanup calls after `liquidate` run for
# every queued symbol or only for invested ones -- confirm against the
# original before re-indenting.
def on_warmup_finished(self) -> None:
# Nothing queued: return without touching any state.
if len(self._pending_liquidations) == 0:
return
# Iterate over a list copy of the queued symbols.
for symbol in list(self._pending_liquidations):
# Close the position when the portfolio reports it as invested.
if self.portfolio[symbol].invested:
self.liquidate(symbol)
# Forget the recorded entry price and EWMA sentiment, and drop the news feed.
self._entry_price_by_symbol.pop(symbol, None)
self._sentiment_ewma_by_symbol.pop(symbol, None)
self._remove_tiingo_news_subscription(symbol)
self._pending_liquidations.clear()