from AlgorithmImports import *
from typing import List
class LowVolatilityFactor(QCAlgorithm):
def initialize(self):
self.set_start_date(2020, 1, 1)
self.set_cash(1000000)
self.set_benchmark("SPY")
self.universe_settings.resolution = Resolution.DAILY
self._spy = self.add_equity("SPY", Resolution.DAILY).symbol
self._selection_data_by_symbol = {}
self._universe = self.add_universe(self._select_fine)
self.settings.seed_initial_prices = True
self.set_warm_up(61, Resolution.DAILY)
self.schedule.on(
self.date_rules.month_start(self._spy),
self.time_rules.after_market_open(self._spy, 5),
self._rebalance
)
def _select_fine(self, fine: List[Fundamental]) -> List[Symbol]:
top_500 = sorted(fine, key=lambda f: f.market_cap, reverse=True)[:500]
ready_stocks = [
f for f in top_500
if self._selection_data_by_symbol.setdefault(f.symbol, SelectionData(self, f)).update(f)
]
for symbol in self._selection_data_by_symbol.keys() - {f.symbol for f in top_500}:
del self._selection_data_by_symbol[symbol]
if self.is_warming_up:
return []
vol_by_symbol = {
f.symbol: self._selection_data_by_symbol[f.symbol].volatility
for f in ready_stocks
}
vol_by_symbol = {s: v for s, v in vol_by_symbol.items() if v is not None and v > 0}
if len(vol_by_symbol) == 0:
return []
sorted_symbols = sorted(vol_by_symbol.items(), key=lambda x: x[1])
selected = [s for s, _ in sorted_symbols[:30]]
return selected
def _rebalance(self):
selected = list(self._universe.selected)
if len(selected) == 0:
return
weight = 1.0 / len(selected)
targets = [PortfolioTarget(symbol, weight) for symbol in selected]
self.set_holdings(targets, liquidate_existing_holdings=True)
class SelectionData:
def __init__(self, algorithm, f):
self._algorithm = algorithm
self._price_scale_factor = f.price_scale_factor
self._prev_price = float(f.price)
self._std = StandardDeviation(60)
def update(self, f):
price = float(f.price)
if f.price_scale_factor != self._price_scale_factor:
self._price_scale_factor = f.price_scale_factor
self._std.reset()
history = self._algorithm.history[TradeBar](
f.symbol,
61,
Resolution.DAILY,
data_normalization_mode=DataNormalizationMode.SCALED_RAW
)
if len(history) >= 2:
prev = float(history[0].close)
for bar in history[1:]:
curr = float(bar.close)
ret = curr / prev - 1.0
self._std.update(bar.end_time, ret)
prev = curr
self._prev_price = prev
ret = price / prev - 1.0
self._std.update(f.end_time, ret)
self._prev_price = price
else:
self._prev_price = price
return self._std.is_ready
ret = price / self._prev_price - 1.0
self._std.update(f.end_time, ret)
self._prev_price = price
return self._std.is_ready
@property
def volatility(self):
if not self._std.is_ready:
return None
return float(self._std.current.value)