| Overall Statistics |
|
Total Trades 1778 Average Win 0.42% Average Loss -0.19% Compounding Annual Return 9.734% Drawdown 13.000% Expectancy 0.019 Net Profit 1.986% Sharpe Ratio 0.472 Probabilistic Sharpe Ratio 39.811% Loss Rate 68% Win Rate 32% Profit-Loss Ratio 2.19 Alpha 0.61 Beta -0.483 Annual Standard Deviation 0.423 Annual Variance 0.179 Information Ratio -1.404 Tracking Error 0.463 Treynor Ratio -0.414 Total Fees $14352.15 |
import numpy as np
import pandas as pd
import scipy
from sklearn import preprocessing
from sklearn.metrics import roc_auc_score
from sklearn.dummy import DummyClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.pipeline import FeatureUnion, make_pipeline
from sklearn.impute import SimpleImputer, MissingIndicator
class MagicConch(QCAlgorithm):
def Initialize(self):
    """Set up dates, cash, the leveraged-ETF universe, indicators, and schedules."""
    self.SetStartDate(2019, 1, 2) # Set Start Date
    self.SetEndDate(2020, 3, 31) # Set End Date
    self.SetCash(100000) # Set Strategy Cash
    # Leveraged / inverse ETF universe; each ticker is both a feature source
    # and a tradable asset.
    self.etf_tickers = [
        "EDC",
        "EDZ",
        "TYD",
        "TYO",
        "WTID",
        "WTIU",
        "SPXL",
        "SPXS",
        "TQQQ",
        "SQQQ",
        "TECL",
        "TECS",
        "DUST",
        "NUGT",
        "TVIX",
    ]
    # Assets the algorithm may hold (currently the full universe).
    self.trading_whitelist = self.etf_tickers
    # Rebalance every N OnData calls (minute bars -> every 15 minutes).
    self.trading_interval = 15
    # Indicator lookback windows, in minutes.
    self.history_intervals = [
        2, 5, 10, 20, 30, 45, 60
    ]
    # Trailing days of features required/kept for model training.
    self.classifier_days = 5
    # Hold-out fraction (unused; see the commented train_test_split in BuildModels).
    self.classifier_split = 0.2
    # Minimum buy-minus-sell score for an asset to receive portfolio weight.
    self.classifier_min_confidence = 0.3
    # Forward price-move threshold defining a positive buy/sell label.
    self.label_thresholds = 0.02
    # Forward-looking label horizons, in minutes.
    self.label_window_sizes = [
        15, 30, 45, 60
    ]
    # Per-horizon blend weights (0.53 halved per step).
    self.label_window_weights = [
        0.53, 0.53 * 0.5, 0.53 * 0.25, 0.53 * 0.125
    ]
    self.symbols = {
        ticker: self.AddEquity(ticker, Resolution.Minute).Symbol
        for ticker in self.etf_tickers
    }
    # Bar fields recorded as "current" features and fed to the indicators.
    self.field_getters = [
        "Average",
        "Close",
        # "High",
        # "Low",
        # "Median",
        # "Open",
        # "SevenBar",
        # "Typical",
        "Volume",
        # "Weighted"
    ]
    # SMA and APO per (ticker, lookback, field), keyed by feature name.
    self.indicators = {}
    for ticker in self.etf_tickers:
        for duration in self.history_intervals:
            for field in self.field_getters:
                feature_prefix = f"{duration}_{ticker}_{field.lower()}_"
                self.indicators[feature_prefix + "sma"] = self.SMA(ticker, duration, Resolution.Minute, getattr(Field, field))
                self.indicators[feature_prefix + "apo"] = self.APO(ticker, duration, duration * 2, MovingAverageType.Simple, Resolution.Minute, getattr(Field, field))
    self.SetWarmUp(60, Resolution.Minute)
    # SPY drives the benchmark and the daily scheduling clock.
    self.AddEquity("SPY", Resolution.Minute)
    self.SetBenchmark("SPY")
    self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.AfterMarketOpen("SPY", -30), self.PreMarketOpen)
    self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.BeforeMarketClose("SPY", 17), self.PreMarketClose)
    self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.BeforeMarketClose("SPY", -30), self.PostMarketClose)
    # Intraday state (reset each morning by PreMarketOpen).
    self.tick = 0
    self.trading_end = False
    self.today_features = []
    self.daily_features = []
    self.models = {}
def KillSwitch(self):
    """Stop trading for the rest of the day and close every open position."""
    self.trading_end = True
    self.Liquidate()
def PreMarketOpen(self):
    """Reset per-day state and, once enough history exists, refit the models."""
    # Fresh intraday state for the new session.
    self.tick, self.trading_end, self.today_features = 0, False, []
    enough_history = len(self.daily_features) >= self.classifier_days
    if enough_history:
        self.models = self.BuildModels()
def PreMarketClose(self):
    """Shortly before the close, liquidate unless the kill switch already fired."""
    if self.trading_end:
        return
    self.KillSwitch()
def PostMarketClose(self):
    """Archive today's feature rows and trim the rolling training window.

    Appends today's features (as a DataFrame) to ``self.daily_features`` and
    drops the oldest day once more than ``classifier_days`` frames are kept.
    """
    # datetime.date(self.Time) relied on an ambient `datetime` class import
    # that this file never performs; the instance method is equivalent.
    ds = str(self.Time.date())
    today_df = pd.DataFrame(self.today_features)
    self.daily_features.append(today_df)
    self.Debug(f"On {ds}, training data shape: {today_df.shape}")
    # Guard: OnData may never have fired today (e.g. no data), in which case
    # indexing [-1] would raise IndexError.
    if self.today_features:
        self.Debug(f"On {ds}, last logged feature: {self.today_features[-1]}")
    # Keep only the most recent classifier_days daily frames.
    while len(self.daily_features) > self.classifier_days:
        self.daily_features.pop(0)
def ExtractFeatures(self, data):
    """Build one flat feature dict from the current data slice.

    Records the UTC timestamp, seconds since midnight, the current value of
    every (ticker, field) pair, and every registered indicator; NaN stands in
    when a ticker is absent from the slice or an indicator is not ready.
    The row is also appended to ``self.today_features``.
    """
    now = self.UtcTime
    day_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    row = {
        "timestamp": now,
        "secs_since_midnight": (now - day_start).seconds,
    }
    for ticker in self.etf_tickers:
        bar = data[ticker] if data.ContainsKey(ticker) else None
        for field in self.field_getters:
            key = f"{ticker}_{field.lower()}_current"
            row[key] = getattr(Field, field)(bar) if bar is not None else np.nan
    row.update(
        {
            name: indicator.Current.Value if indicator.IsReady else np.nan
            for name, indicator in self.indicators.items()
        }
    )
    self.today_features.append(row)
    return row
def BuildLabels(self, data):
    """Attach buy/sell classification labels to a day of feature rows.

    For every (asset, side, horizon) combination the label is True when the
    *maximum* close over the next ``window_size`` minutes moves more than
    ``label_thresholds`` up (buy) or down (sell) relative to the current close.

    NOTE(review): both sides derive from the rolling *max* — the sell label
    fires only when even the best future price is below the threshold. If a
    rolling *min* was intended for the sell side, confirm and adjust.

    Returns the frame, indexed by minute-floored timestamp, with one label
    column per combination added (a constant 0.0 when price data is missing).
    """
    data["timestamp"] = pd.to_datetime(data["timestamp"], utc=True)
    data["timestamp"] = data["timestamp"].dt.floor("Min")
    data = data.set_index("timestamp")
    for asset_name in self.etf_tickers:
        close_column = f"{asset_name}_close_current"
        for side in ["buy", "sell"]:
            for window_size in self.label_window_sizes:
                label_column = f"{asset_name}_{side}_{window_size}m_label"
                if close_column not in data:
                    # No price data for this asset today: constant "no signal".
                    data[label_column] = 0.0
                    continue
                price = data[close_column]
                # .nearest() replaces Resampler.fillna(method="nearest"),
                # which was deprecated and removed in modern pandas.
                per_minute = price.resample(pd.Timedelta(minutes=1)).nearest()
                # Best close over the *next* window_size minutes.
                future_max = (
                    per_minute.rolling(window_size).max().shift(-window_size)
                )
                future_max = future_max[price.index]
                # NaNs (window running past end of day) become "no move".
                change = (future_max / price - 1).fillna(0.0)
                if side == "buy":
                    label = change > self.label_thresholds
                else:
                    label = change < -self.label_thresholds
                data[label_column] = label
    return data
def BuildModels(self):
    """Train one classifier per (asset, horizon, side) on the rolling window.

    Concatenates the labelled daily frames, fits a shared imputer +
    missing-indicator + standard-scaler preprocessor, then for each label
    column fits a sample-weighted LogisticRegression — or a constant
    DummyClassifier when the day's labels are single-class.

    Returns a nested dict:
    ``models[asset][window_size][side] -> {"features", "preprocessor",
    "predictor", "ds"}``.
    """
    # self.Time.date() replaces datetime.date(self.Time), which depended on
    # an ambient `datetime` class import this file never performs.
    ds = str(self.Time.date())
    asset_names = self.etf_tickers
    # DataFrame.append was removed in pandas 2.0; concat is the supported path.
    train_df = pd.concat(
        [self.BuildLabels(day_df) for day_df in self.daily_features],
        ignore_index=True,
    )
    self.Log(f"Model on {ds}, training data shape: {train_df.shape}")
    label_names = [
        label_name for label_name in train_df.columns if "label" in label_name
    ]
    training_data = train_df.drop(label_names, axis=1)
    X_train_raw = training_data.values
    feature_transformer = FeatureUnion(
        transformer_list=[
            ("features", SimpleImputer(strategy="constant", fill_value=0.0)),
            ("indicators", MissingIndicator(features="all")),
        ]
    )
    preprocessor = make_pipeline(feature_transformer, preprocessing.StandardScaler())
    X_train = preprocessor.fit_transform(X_train_raw)
    models = {}
    for asset in asset_names:
        models[asset] = {}
        for window_size in self.label_window_sizes:
            models[asset][window_size] = {}
            for side in ["buy", "sell"]:
                label_column = f"{asset}_{side}_{window_size}m_label"
                if label_column in train_df.columns:
                    # Compare with == True/False (not astype) because the
                    # column may hold the float 0.0 placeholder rather than
                    # booleans (see BuildLabels).
                    pos_y_train = (
                        train_df[label_column].values == True
                    ).astype(np.float64)
                    neg_y_train = (
                        train_df[label_column].values == False
                    ).astype(np.float64)
                else:
                    example_nums, _ = X_train.shape
                    pos_y_train = np.zeros([example_nums])
                    neg_y_train = np.zeros([example_nums])
                if pos_y_train.sum() > 0 and neg_y_train.sum() > 0:
                    clf = LogisticRegression(max_iter=1200000)
                    # Up-weight positives so both classes contribute equally.
                    sampling_weight = (
                        neg_y_train.sum() / pos_y_train.sum()
                    ) * pos_y_train
                    sampling_weight += neg_y_train
                else:
                    # Single-class labels: fall back to a constant predictor.
                    clf_output = 1 if pos_y_train.sum() > 0 else 0
                    pos_y_train = np.zeros_like(pos_y_train, dtype=np.float64)
                    # Force one positive so class 1 exists and predict_proba
                    # keeps a column for it.
                    pos_y_train[-1] = 1.0
                    # strategy must be a keyword argument in modern sklearn.
                    clf = DummyClassifier(strategy="constant", constant=clf_output)
                    sampling_weight = np.ones_like(pos_y_train, dtype=np.float64)
                clf = clf.fit(X_train, pos_y_train, sample_weight=sampling_weight)
                models[asset][window_size][side] = {
                    "features": list(training_data.columns),
                    "preprocessor": preprocessor,
                    "predictor": clf,
                    "ds": ds,
                }
    return models
def Predict(self, model, features_map):
    """Score one feature row: probability of the positive class under *model*."""
    # Order the raw values exactly as the model was trained on them.
    raw = np.array([features_map[name] for name in model["features"]])
    batch = raw.reshape(1, -1)  # single-example batch
    transformed = model["preprocessor"].transform(batch)
    # Column 1 of predict_proba is the positive (label == 1) class.
    return model["predictor"].predict_proba(transformed)[0, 1]
def CalculateDistribution(self, features_map):
    """Convert per-asset model scores into target portfolio weights.

    Each asset's confidence is the horizon-weighted sum of its buy
    probabilities minus its sell probabilities. Assets at or below
    ``classifier_min_confidence`` are pushed toward zero weight via a -10
    logit penalty before a softmax; when every asset is below threshold the
    whole book goes flat (all-zero weights).

    Returns a list of weights aligned with ``self.trading_whitelist``.

    (The unused ``ds``/``current_time`` locals were removed; the old
    ``datetime.date`` call there also depended on a missing import.)
    """
    asset_names = self.trading_whitelist
    models = self.models
    results = {asset: {"buy": 0.0, "sell": 0.0} for asset in asset_names}
    for window_size, window_weight in zip(
        self.label_window_sizes,
        self.label_window_weights,
    ):
        for asset in asset_names:
            results[asset]["buy"] += (
                self.Predict(models[asset][window_size]["buy"], features_map)
                * window_weight
            )
            results[asset]["sell"] += (
                self.Predict(models[asset][window_size]["sell"], features_map)
                * window_weight
            )
    asset_confidence = np.array(
        [results[asset]["buy"] - results[asset]["sell"] for asset in asset_names]
    )
    asset_liquidate = asset_confidence <= self.classifier_min_confidence
    if asset_liquidate.all():
        # Nothing clears the confidence bar: go entirely to cash.
        return np.zeros_like(asset_liquidate, dtype=np.float64).tolist()
    # Softmax over confidences; sub-threshold assets are penalized by -10
    # logits so they receive numerically negligible weight.
    logits = asset_confidence - 10 * asset_liquidate
    exp_logits = np.exp(logits)
    return (exp_logits / exp_logits.sum()).tolist()
def OnData(self, data):
    """Per-bar hook: log features every bar; rebalance every trading_interval bars."""
    self.tick += 1
    asset_features = self.ExtractFeatures(data)
    should_trade = (
        not self.trading_end
        and bool(self.models)
        and self.tick % self.trading_interval == 0
    )
    if not should_trade:
        return
    asset_distribution = self.CalculateDistribution(asset_features)
    for asset_name, percent in zip(self.trading_whitelist, asset_distribution):
        self.Log(f"{asset_name:>20}: {percent:.4f}")
        self.SetHoldings(asset_name, percent)