from AlgorithmImports import *
from datetime import timedelta
class InsiderClusterBuying(QCAlgorithm):
    """Buy symbols on which several distinct insiders recently reported buys.

    Each day the universe callback logs insider purchases per symbol. The
    scheduled rebalance then:

    1. exits positions held for at least ``_hold_days`` weekdays,
    2. drops logged purchases older than ``_lookback_days``,
    3. ranks symbols with at least ``_min_buyers`` distinct buyers by their
       earliest retained purchase, and
    4. fills the free slots (up to ``_max_positions``) at equal weight.
    """

    class InsiderSelectionData:
        """Log of insider purchases observed for a single symbol."""

        def __init__(self):
            # (observation time, buyer name) pairs in arrival order.
            self.buyer_records = []

        def update(self, time, buyer_name, shares):
            """Record one purchase row.

            Rows with a missing/empty buyer name or a missing, zero or
            negative share count are ignored.

            Args:
                time: Timestamp to store with the record.
                buyer_name: Name of the insider.
                shares: Number of shares on the row.
            """
            if shares and shares > 0 and buyer_name:
                self.buyer_records.append((time, buyer_name))

        def count_distinct_buyers(self, cutoff):
            """Return how many unique buyer names were recorded at or after ``cutoff``."""
            return len({name for t, name in self.buyer_records if t >= cutoff})

    def initialize(self):
        """Configure the backtest window, strategy parameters, universe and schedule."""
        self.set_start_date(self.end_date - timedelta(5 * 365))
        self.set_cash(100000)
        self.settings.seed_initial_prices = True
        self.universe_settings.resolution = Resolution.DAILY

        # Strategy parameters.
        self._max_positions = 25      # maximum number of simultaneous holdings
        self._hold_days = 60          # weekdays to hold before exiting
        self._lookback_days = 14      # window for counting distinct buyers
        self._min_buyers = 3          # distinct buyers needed to qualify
        # Symbols whose return over the trailing 365 calendar days is at or
        # below this value are skipped (the name says "ytd", but the
        # comparison in _has_fallen_too_far uses a trailing-year window).
        self._skip_ytd_decline = -0.30
        # A symbol stays in the universe while it has a record this recent.
        self._retention_days = 90

        self._insider_data = {}       # symbol -> InsiderSelectionData
        self._active = {}             # symbol -> entry time of the open position

        self._universe = self.add_universe(QuiverInsiderTradingUniverse, self._select_assets)
        self.schedule.on(
            self.date_rules.every_day("SPY"),
            self.time_rules.after_market_open("SPY", 1),
            self._rebalance
        )

    def _select_assets(self, data):
        """Universe callback: log incoming insider rows and return tracked symbols.

        Args:
            data: Iterable of universe rows; each row exposes ``symbol`` and,
                when available, ``name`` and ``shares``.

        Returns:
            List of symbols that still have at least one logged purchase
            within the last ``_retention_days`` days. Symbols without one
            are removed from ``_insider_data``.
        """
        for row in data:
            tracker = self._insider_data.get(row.symbol)
            if tracker is None:
                tracker = self._insider_data[row.symbol] = self.InsiderSelectionData()
            # Records are stamped with the algorithm clock, not a date taken
            # from the row itself.
            tracker.update(self.time, getattr(row, "name", None), getattr(row, "shares", None))

        recent_cutoff = self.time - timedelta(days=self._retention_days)
        active = []
        # Iterate over a snapshot because stale entries are deleted in the loop.
        for sym, tracker in list(self._insider_data.items()):
            if any(t >= recent_cutoff for t, _ in tracker.buyer_records):
                active.append(sym)
            else:
                del self._insider_data[sym]
        return active

    def _rebalance(self):
        """Scheduled handler: exit aged positions, then open new cluster-buy positions."""
        self._exit_finished_positions()

        cutoff = self.time - timedelta(days=self._lookback_days)
        self._prune_buyer_records(cutoff)

        selected = self._universe.selected
        if selected is None:
            # No universe selection has produced a result yet.
            return
        selected_symbols = list(selected)
        selected_lookup = set(selected_symbols)

        candidates = self._find_candidates(selected_symbols, cutoff)
        # Earliest retained purchase first; list.sort is stable, so ties keep
        # the universe's iteration order.
        candidates.sort(key=self._first_signal_time)

        invested_count = sum(1 for s in self.portfolio.keys() if self.portfolio[s].invested)
        # Clamp at zero: a negative slice bound would select from the end of
        # the list instead of selecting nothing.
        slots = max(0, self._max_positions - invested_count)
        new_entries = candidates[:slots]
        if not new_entries:
            return

        # Existing positions are re-targeted to the same weight, but only
        # while they are still part of the current universe selection.
        target_symbols = [s for s in self._active if s in selected_lookup] + new_entries
        weight = 1.0 / self._max_positions
        targets = [PortfolioTarget(s, weight) for s in target_symbols]
        self.set_holdings(targets, liquidate_existing_holdings=False)

        for sym in new_entries:
            self._active[sym] = self.time

    def _exit_finished_positions(self):
        """Liquidate positions past the holding period and forget ones no longer held."""
        for sym, entry_time in list(self._active.items()):
            invested = sym in self.portfolio and self.portfolio[sym].invested
            if invested and self._count_trading_days(entry_time, self.time) < self._hold_days:
                continue
            # Either the holding period elapsed or the position is gone.
            del self._active[sym]
            if invested:
                self.liquidate(sym)

    def _prune_buyer_records(self, cutoff):
        """Drop every logged purchase older than ``cutoff``."""
        for tracker in self._insider_data.values():
            tracker.buyer_records = [(t, n) for t, n in tracker.buyer_records if t >= cutoff]

    def _find_candidates(self, symbols, cutoff):
        """Return the symbols in ``symbols`` that qualify for a new position.

        A symbol qualifies when it is not already held via ``_active``, has at
        least ``_min_buyers`` distinct buyers since ``cutoff``, and has not
        declined past ``_skip_ytd_decline``.
        """
        candidates = []
        for sym in symbols:
            # Cheap checks first so history is only requested when needed.
            if sym in self._active:
                continue
            tracker = self._insider_data.get(sym)
            if tracker is None or tracker.count_distinct_buyers(cutoff) < self._min_buyers:
                continue
            if self._has_fallen_too_far(sym):
                continue
            candidates.append(sym)
        return candidates

    def _has_fallen_too_far(self, sym):
        """Return True when the trailing 365-day return is at or below ``_skip_ytd_decline``.

        Symbols with fewer than two history rows, or with a non-positive or
        NaN first close, are not rejected.
        """
        history = self.history(sym, timedelta(days=365), Resolution.DAILY)
        if history.empty or len(history) <= 1:
            return False
        closes = history["close"]
        start_price = closes.iloc[0]
        if not start_price > 0:
            # Avoid dividing by zero / NaN; treat the return as unknown.
            return False
        trailing_return = (closes.iloc[-1] - start_price) / start_price
        return trailing_return <= self._skip_ytd_decline

    def _first_signal_time(self, sym):
        """Return the earliest retained purchase time for ``sym`` (now if none)."""
        tracker = self._insider_data.get(sym)
        if tracker is None or not tracker.buyer_records:
            return self.time
        return min(t for t, _ in tracker.buyer_records)

    def _count_trading_days(self, start, end):
        """Count weekdays after ``start``'s date up to and including ``end``'s date.

        Only Saturdays and Sundays are excluded; exchange holidays are
        counted as trading days.

        Args:
            start: Datetime of the starting moment (its date is excluded).
            end: Datetime of the ending moment (its date is included).

        Returns:
            Number of Monday-Friday dates in the range, or 0 when ``start``
            is not before ``end``.
        """
        if start >= end:
            return 0
        first = start.date() + timedelta(days=1)
        span = (end.date() - first).days + 1
        return sum(
            1
            for offset in range(span)
            if (first + timedelta(days=offset)).weekday() < 5
        )