| Overall Statistics |
|
Total Orders 5641 Average Win 0.11% Average Loss -0.09% Compounding Annual Return 2.991% Drawdown 24.400% Expectancy 0.057 Start Equity 1000000 End Equity 1158829.41 Net Profit 15.883% Sharpe Ratio -0.07 Sortino Ratio -0.087 Probabilistic Sharpe Ratio 1.778% Loss Rate 53% Win Rate 47% Profit-Loss Ratio 1.26 Alpha -0.009 Beta -0.01 Annual Standard Deviation 0.132 Annual Variance 0.017 Information Ratio -0.344 Tracking Error 0.194 Treynor Ratio 0.953 Total Fees $57266.06 Estimated Strategy Capacity $580000.00 Lowest Capacity Asset TBF UF9WRZG9YA1X Portfolio Turnover 21.09% Drawdown Recovery 1002 |
# region imports
from AlgorithmImports import *
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestRegressor
from regression_function import fit_regression_model
# endregion
class VerticalParticleShield(QCAlgorithm):
def initialize(self):
self.set_start_date(self.end_date - timedelta(5 * 365))
self.set_cash(1_000_000)
self.set_benchmark("SPY")
self.set_portfolio_construction(EqualWeightingPortfolioConstructionModel())
# Seed prices so treasury ETFs are tradeable immediately on universe entry.
self.settings.seed_initial_prices = True
# Treasury long ETFs and their inverse counterparts used for insight generation.
self._long_etfs = [
self.add_equity(symbol) for symbol in
["IEF", "SHY", "TLT", "IEI", "TLH", "BIL", "SPTL", "TMF", "SCHO", "SCHR", "SPTS", "GOVT"]
]
self._inverse_etfs = [
self.add_equity(symbol) for symbol in ["SHV", "TBT", "TBF", "TMV"]
]
# Add the yield curve data.
self._yield_curve = self.add_data(USTreasuryYieldCurveRate, "USTYCR", Resolution.DAILY).symbol
# Add a warm-up period so the algorithm immediately trades on deployment.
self.set_warm_up(timedelta(10))
def on_warmup_finished(self):
# Add a Scheduled Event to rebalance the portfolio each week.
time_rule = self.time_rules.after_market_open('SPY', 5)
self.schedule.on(self.date_rules.week_start('SPY'), time_rule, self._run_regression)
# Rebalance today too.
if self.live_mode:
self._run_regression()
else:
self.schedule.on(self.date_rules.today, time_rule, self._run_regression)
def _run_regression(self):
qb = self
# Get history.
history = qb.history(self._yield_curve, 100)
if history.empty:
return
# Get prices and returns.
bonds = history.loc[self._yield_curve].pct_change().replace([np.inf, -np.inf], np.nan).ffill().bfill().fillna(0)
#### Prepare data set -- feature names, training set, and testing set.
training = bonds.iloc[:-1].copy()
testing = bonds.iloc[-1:].copy()
# Find number of components to explain > 95% of variance of treasury prices.
pca = PCA(n_components=0.95)
# Fit the PCA model to our training data.
pca.fit(training)
# Initialize the regression model selected in the research notebook.
model = RandomForestRegressor(random_state=0, n_estimators=100)
# Fit the regression model and return predictions.
results = fit_regression_model(pca, model, training, testing)
# Find out if the prediction is up or down relative to current price.
going_up = results.mean(axis=1).values[0] > 0
up_group = [s for s in (self._long_etfs if going_up else self._inverse_etfs) if s.price]
flat_group = [s for s in (self._inverse_etfs if going_up else self._long_etfs) if s.price]
insights = [Insight.price(s, timedelta(days=7), InsightDirection.UP) for s in up_group]
insights += [Insight.price(s, timedelta(days=1), InsightDirection.FLAT) for s in flat_group]
self.emit_insights(insights)# region imports
import statsmodels.api as sm
from sklearn.model_selection import GridSearchCV
from AlgorithmImports import *
# endregion
def fit_regression_model(pca, model, training, testing, alpha=False):
estimators = []
y_predicted = []
y_actual = []
# Project the training data onto the principal components.
x_projected = pca.transform(training)
# Fit a lagged model for each principal component.
for i in range(pca.n_components_):
# Lag x by one period relative to y so x predicts the next y.
X = x_projected[:-1, i]
y = x_projected[1:, i]
y_actual.append(y)
X = sm.add_constant(X)
if alpha:
grid_search = GridSearchCV(model, {"alpha": np.arange(10)})
grid_search.fit(X, y)
est = model.fit(X, y)
estimators.append(est)
y_predicted.append(model.predict(X))
y_predicted = np.array(y_predicted).transpose()
y_actual = np.array(y_actual).transpose()
# Transform predictions back into the original feature space.
y_actual_orig = pca.inverse_transform(y_actual)
y_predicted_orig = pca.inverse_transform(y_predicted)
train_sse = np.sum((y_predicted_orig - y_actual_orig) ** 2)
# Project the test observation and predict each component's next value.
testing_proj = pca.transform(testing)
testing_prediction = []
for i in range(pca.n_components_):
row = np.array([[1, testing_proj[0, i]]])
testing_prediction.append(model.predict(row)[0])
predictions = pca.inverse_transform(testing_prediction)
return pd.DataFrame({"Predicted": predictions}, index=testing.columns)