I'm writing my algorithm in Python, and it doesn't complete a run when I execute it over a 15-year period. The data resolution is daily and I make investing decisions monthly, so the amount of computation that goes into the algorithm should not be unreasonable.

Over time, I find that the algorithm's memory footprint, as reported by the server stats, increases. For the first couple of years it sits at about 3 GB, then it starts hitting 6 GB around year 12, and then the run just fails. So I appear to have a memory or resource leak, and I'm not sure of the best way to go about tracking it down.
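The one approach I'm considering is diffing `tracemalloc` snapshots between rebalances. Here's a minimal standalone sketch of the idea (nothing QuantConnect-specific; the "leak" is just a stand-in allocation):

```python
import tracemalloc

tracemalloc.start()
before = tracemalloc.take_snapshot()

# Stand-in for one rebalance step: allocate something that "leaks".
leaked = [list(range(1000)) for _ in range(100)]

after = tracemalloc.take_snapshot()
tracemalloc.stop()

# Compare allocations between the two snapshots, biggest growth first.
stats = after.compare_to(before, "lineno")
for stat in stats[:3]:
    print(stat)
```

In the real algorithm I'd take a snapshot at each monthly rebalance and diff it against the previous one, so whichever line keeps growing shows up at the top.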

Possibly relevant: I keep track of price histories in pandas Series rather than in indicators. During coarse selection I fetch 3 months of close prices for ~2000 securities, and during alpha evaluation my universe is about 400 symbols with one year of price history. I try to ensure I am not making unnecessary history calls. Here's the code that manages price histories; it's light on documentation, but I hope you find it readable. I'm basically attempting a simple algorithm: price momentum over a low-variance universe.
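To show the core update pattern in isolation, this is what the manager does per symbol, with toy dates and prices and plain pandas (no QuantConnect objects):

```python
import pandas as pd

num_bars = 5  # plays the role of the manager's self._num_bars

# A frame shaped like QC's History output: (symbol, time) multi-index, "close" column.
idx = pd.MultiIndex.from_product(
    [["SPY", "TLT"], pd.date_range("2020-01-04", periods=3)],
    names=["symbol", "time"])
history_df = pd.DataFrame(
    {"close": [13.0, 14.0, 15.0, 23.0, 24.0, 25.0]}, index=idx)

# Pivot so each column holds one symbol's closes, as the manager does.
by_syms = history_df["close"].unstack(level=0)

# Existing history for SPY, fetched on a previous rebalance.
old_spy = pd.Series([10.0, 11.0, 12.0],
                    index=pd.date_range("2020-01-01", periods=3))

# Append the new SPY closes, sort by date, keep only the newest num_bars.
merged = pd.concat([old_spy, by_syms["SPY"]]).sort_index()[-num_bars:]
# merged now holds the five closes 11.0 .. 15.0; the oldest bar was dropped.
```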

Suggestions for improving performance will be much appreciated.

import pandas as pd

from QuantConnect import Resolution


class PriceHistoryManager:
    def __init__(self, num_bars):
        self._num_bars = num_bars
        self._close_histories = {}  # Symbol -> pd.Series of closes, newest last
        self._last_time = None      # algorithm time of the last update

    @property
    def close_histories(self):
        return self._close_histories

    def apply_fetched_close_histories(self, algorithm, history_df, updated_closes):
        # Pivot the (symbol, time) multi-index so each column is one symbol's closes.
        history_by_syms = history_df["close"].unstack(level=0)
        for s in history_by_syms.columns:
            s_new_hist = history_by_syms[s]
            s_sym = algorithm.Symbol(s)
            if s_sym in updated_closes:
                # Append the incremental bars and keep only the newest num_bars.
                s_old_hist = updated_closes[s_sym]
                updated_closes[s_sym] = pd.concat([s_old_hist, s_new_hist]).sort_index()[-self._num_bars:]
            else:
                updated_closes[s_sym] = s_new_hist.sort_index()

    def update_close_histories(self, algorithm, symbols):
        updated_closes = {}
        symbols_to_update = []
        new_symbols = []
        # Carry forward the history of every symbol we already track.
        for s in symbols:
            if s in self._close_histories:
                symbols_to_update.append(s)
                updated_closes[s] = self._close_histories[s]
            else:
                new_symbols.append(s)
        # For tracked symbols, fetch only the incremental history.
        if symbols_to_update:
            # Because we drop all history that isn't currently relevant, we are assured
            # that all the history we require spans from the last run to now.
            inc_history = algorithm.History(symbols_to_update, self._last_time, algorithm.Time, Resolution.Daily)
            self.apply_fetched_close_histories(algorithm, inc_history, updated_closes)
        # For new symbols, fetch the full num_bars of history.
        if new_symbols:
            full_history = algorithm.History(new_symbols, self._num_bars, Resolution.Daily)
            self.apply_fetched_close_histories(algorithm, full_history, updated_closes)
        self._close_histories = updated_closes
        self._last_time = algorithm.Time
        # Invariants: every requested symbol is tracked, capped at num_bars.
        assert len(self._close_histories) == len(symbols)
        for s in symbols:
            assert len(self._close_histories[s]) <= self._num_bars

    def split_at_ratio(self, ratio, symbols=None):
        assert 1.0 >= ratio >= 0.0
        return self.split_at(int(self._num_bars * ratio), symbols)

    def split_at(self, bars, symbols=None):
        # Returns (oldest `bars` bars, the newest remainder) per symbol.
        return self.tail(bars, symbols), self.head(self._num_bars - bars, symbols)

    def tail(self, bars, symbols=None):
        """Grabs the oldest bars."""
        assert bars >= 0
        symbols = symbols and set(symbols)
        return {sym: history[:bars] for sym, history in self._close_histories.items()
                if symbols is None or sym in symbols}

    def head(self, bars, symbols=None):
        """Grabs the newest bars."""
        assert bars >= 0
        symbols = symbols and set(symbols)  # set lookup, as in tail()
        return {sym: history[-bars:] for sym, history in self._close_histories.items()
                if symbols is None or sym in symbols}