# region imports
from AlgorithmImports import *
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
# endregion
class LazyPricesStrategy(QCAlgorithm):
    """Long/short equity strategy ranked by the text similarity of consecutive 10-K filings.

    Each month the algorithm compares every universe member's most recent 10-K
    text with its previous one (TF-IDF + cosine similarity), goes long the
    `assets_per_side` symbols with the highest similarity and short the
    `assets_per_side` symbols with the lowest similarity, equally weighted.
    """

    def initialize(self):
        """Configure the backtest window, cash, and the liquidity-based universe."""
        self.set_start_date(2022, 1, 1)
        self.set_end_date(2024, 1, 1)
        self.set_cash(1_000_000)
        self.settings.seed_initial_prices = True
        # Number of symbols held on each of the long and short sides.
        self.assets_per_side = 5
        self.universe_settings.resolution = Resolution.DAILY
        self._universe = self.add_universe(self._select_most_liquid)

    def _select_most_liquid(self, fundamental):
        """Return the symbols of the 25 highest dollar-volume assets that have fundamental data.

        The result is ordered by ascending dollar volume.
        """
        with_data = [f for f in fundamental if f.has_fundamental_data]
        most_liquid = sorted(with_data, key=lambda f: f.dollar_volume)[-25:]
        return [f.symbol for f in most_liquid]

    def on_warmup_finished(self):
        """Schedule the monthly rebalance and trigger one for the current day."""
        # Configure training and prediction schedule.
        time_rule = self.time_rules.at(8, 0)
        self.schedule.on(self.date_rules.month_start('SPY'), time_rule, self._rebalance)
        # Rebalance today too.
        if self.live_mode:
            self._rebalance()
        else:
            self.schedule.on(self.date_rules.today, time_rule, self._rebalance)

    def on_securities_changed(self, changes):
        """Attach a 10-K data feed and fresh per-security filing state to each added security."""
        # Subscribe to custom 10-K data feeds for newly added universe equities.
        for security in changes.added_securities:
            security.report = self.add_data(SECReport10K, security.symbol, Resolution.DAILY)
            security.last_filing_time = None
            security.prev_filing_text = None
            security.last_similarity = None

    def _process_filing(self, report, prev_filing_text, last_filing_time):
        """Clean one 10-K filing and score it against the previously accepted filing.

        Args:
            report: A filing data point; its `end_time` and
                `report.documents[*].text` are read.
            prev_filing_text: Cleaned text of the previously accepted filing,
                or None/empty if there is none yet.
            last_filing_time: `end_time` of the previously accepted filing, or None.

        Returns:
            A `(similarity, filing_text, filing_time)` tuple. `similarity` is
            the TF-IDF cosine similarity between the previous and the new
            cleaned text, or None when the filing is stale, unusable, or is
            the first accepted filing. `filing_text` / `filing_time` are the
            new filing's values when it was accepted, otherwise the inputs
            are returned unchanged.
        """
        unchanged = (None, prev_filing_text, last_filing_time)

        # Skip if filing is stale.
        if last_filing_time is not None and report.end_time <= last_filing_time:
            return unchanged

        # Extract and clean document text.
        docs = [doc.text for doc in report.report.documents if doc.text and len(doc.text) > 50]
        if not docs:
            return unchanged
        raw_text = " ".join(docs)
        if len(raw_text) <= 200:
            return unchanged

        # Filter lines by content quality: exclude metadata, require minimum length and text ratio.
        excluded = ("table of contents", "xbrl", "exhibit", "signature", "signatures",
                    "html", "xml", "excel", ".xls", ".pdf")
        # Keyed by line text, so a line repeated in the filing is kept only once.
        ratio_by_line = {}
        for line in raw_text.splitlines():
            line = line.strip()
            if len(line) < 30:
                continue
            lower = line.lower()
            if any(marker in lower for marker in excluded):
                continue
            ratio_by_line[line] = sum(c.isdigit() for c in line) / len(line)
        if not ratio_by_line:
            return unchanged

        # Drop digit-heavy lines (e.g. table rows) above the data-driven cutoff.
        cutoff = self._digit_cutoff(np.array(list(ratio_by_line.values())))
        clean_text = " ".join(line for line, ratio in ratio_by_line.items() if ratio <= cutoff)
        if len(clean_text) <= 200:
            return unchanged

        # First usable filing: store it as the baseline, nothing to compare against yet.
        if not prev_filing_text:
            return None, clean_text, report.end_time

        vectorizer = TfidfVectorizer(stop_words="english")
        mat = vectorizer.fit_transform([prev_filing_text, clean_text])
        sim = float(cosine_similarity(mat[0:1], mat[1:2])[0][0])
        return sim, clean_text, report.end_time

    def _digit_cutoff(self, ratios):
        """Return an Otsu threshold separating low and high digit ratios.

        Args:
            ratios: Array of per-line digit ratios, each in [0, 1].

        Returns:
            The center of the histogram bin (50 bins over [0, 1]) that
            maximizes the between-class variance. Returns 1.0 (keep every
            line) when `ratios` is empty or all values fall into a single
            bin, because no split exists in that case.
        """
        # Otsu valley between prose (~0) and table-row (high) digit ratios.
        counts, edges = np.histogram(ratios, bins=50, range=(0.0, 1.0))
        total = counts.sum()
        if total == 0:
            return 1.0
        centers = (edges[:-1] + edges[1:]) / 2
        p = counts / total
        omega = np.cumsum(p)          # cumulative class-0 weight per candidate split
        mu = np.cumsum(p * centers)   # cumulative class-0 first moment per candidate split
        denom = omega * (1 - omega)
        # Only evaluate splits that leave mass on both sides; this avoids 0/0 and x/0.
        splittable = denom > 0
        between = np.zeros_like(denom)
        between[splittable] = (mu[-1] * omega[splittable] - mu[splittable]) ** 2 / denom[splittable]
        if not between.any():
            # All ratios share one bin: there is nothing to separate.
            return 1.0
        return float(centers[between.argmax()])

    def _rebalance(self):
        """Refresh filing similarities and rebalance into the long/short portfolio.

        Does nothing while warming up or before the universe has a selection.
        Skips trading when fewer than `2 * assets_per_side` symbols have a
        similarity score.
        """
        if self.is_warming_up or not self._universe.selected:
            return
        # Pull new filings and compute similarity scores for this rebalance cycle.
        similarity_by_symbol = {}
        for symbol in self._universe.selected:
            security = self.securities[symbol]
            for report in self.history[SECReport10K](security.report.symbol, timedelta(days=500), Resolution.DAILY):
                similarity, security.prev_filing_text, security.last_filing_time = self._process_filing(
                    report, security.prev_filing_text, security.last_filing_time
                )
                # Stale or unusable filings yield None; keep the last computed
                # score instead of erasing it, otherwise every rebalance after
                # the first would lose the signal until a new 10-K arrives.
                if similarity is not None:
                    security.last_similarity = similarity
            if security.last_similarity is None:
                self.log(f"{self.time.date()} | {symbol.value} | no prior 10-K to compare, skipping")
                continue
            similarity_by_symbol[symbol] = security.last_similarity
        # Require sufficient signals for both long and short sides.
        if len(similarity_by_symbol) < 2 * self.assets_per_side:
            self.debug(f"{self.time.date()} | {len(similarity_by_symbol)} signals < {2 * self.assets_per_side}. Skipping.")
            return
        # Ascending similarity: most-changed filings first, least-changed last.
        ranked = sorted(similarity_by_symbol, key=similarity_by_symbol.get)
        short_symbols = ranked[:self.assets_per_side]
        long_symbols = ranked[-self.assets_per_side:]
        # Half of the portfolio on each side, equally weighted.
        w = 0.5 / self.assets_per_side
        targets = [PortfolioTarget(s, w) for s in long_symbols] + [PortfolioTarget(s, -w) for s in short_symbols]
        self.set_holdings(targets, True)
        self.debug(
            f"{self.time.date()} | Signals={len(similarity_by_symbol)} | "
            f"Long={[s.value for s in long_symbols]} | "
            f"Short={[s.value for s in short_symbols]}"
        )
        self.plot("Strategy", "Signal Count", len(similarity_by_symbol))