Overall Statistics Total Trades 29 Average Win 6.43% Average Loss -0.96% Compounding Annual Return 12.087% Drawdown 33.800% Expectancy 5.068 Net Profit 493.389% Sharpe Ratio 0.811 Probabilistic Sharpe Ratio 16.329% Loss Rate 21% Win Rate 79% Profit-Loss Ratio 6.72 Alpha 0.113 Beta -0.07 Annual Standard Deviation 0.131 Annual Variance 0.017 Information Ratio 0.082 Tracking Error 0.229 Treynor Ratio -1.518 Total Fees $256.69
import numpy as np
from scipy.optimize import minimize
import statsmodels.api as sm

def MonthDiff(d1, d2):
    """Return the number of calendar months from ``d2`` to ``d1``.

    Only the ``year`` and ``month`` attributes are used, so the day of the
    month is ignored. The result is negative when ``d1`` lies in an earlier
    month than ``d2``.
    """
    return (d1.year - d2.year) * 12 + d1.month - d2.month

def Return(values):
    """Return the simple return between the first and last element.

    Computed as ``(last - first) / first``. ``values`` must be an indexable,
    non-empty sequence; a zero first element raises ``ZeroDivisionError``
    for plain Python numbers.
    """
    first, last = values[0], values[-1]
    return (last - first) / first

def Volatility(values):
    """Return the standard deviation of the period-over-period returns.

    ``values`` is converted to a NumPy array, simple returns between
    consecutive elements are computed, and ``np.std`` (population standard
    deviation, ``ddof=0``) of those returns is returned.
    """
    prices = np.array(values)
    # Simple return of each element relative to its predecessor.
    period_returns = (prices[1:] - prices[:-1]) / prices[:-1]
    return np.std(period_returns)

def MultipleLinearRegresion(x, y):
    """Fit an ordinary least squares model of ``y`` on ``x``.

    ``x`` is converted to an array and transposed before being used as the
    regressor matrix, so it is presumably passed as one sequence per
    explanatory variable (verify against callers).

    No constant column is added here, so the model has no intercept unless
    ``x`` already contains one.

    Returns the fitted statsmodels results object.
    """
    regressors = np.array(x).T
    return sm.OLS(endog=y, exog=regressors).fit()

# Custom fee model
class CustomFeeModel(FeeModel):
    """Fee model that charges a fixed fraction of the traded notional."""

    def GetOrderFee(self, parameters):
        """Return the order fee in USD.

        The fee is the security price times the absolute order quantity
        times 0.00005.
        """
        notional = parameters.Security.Price * parameters.Order.AbsoluteQuantity
        fee = notional * 0.00005
        return OrderFee(CashAmount(fee, "USD"))

# Quandl free data
class QuandlFutures(PythonQuandl):
    """Quandl data type whose value column name is set to "settle"."""

    def __init__(self):
        self.ValueColumnName = "settle"

# Quandl "value" data
class QuandlValue(PythonQuandl):
    """Quandl data type whose value column name is set to 'Value'."""

    def __init__(self):
        self.ValueColumnName = 'Value'

# Quandl short interest data.
class QuandlFINRA_ShortVolume(PythonQuandl):
    """Quandl data type whose value column name is set to 'SHORTVOLUME'."""

    def __init__(self):
        self.ValueColumnName = 'SHORTVOLUME'    # 'TOTALVOLUME' is also accessible

# NOTE: IMPORTANT: Data order must be ascending (datewise).
def GetSource(self, config, date, isLiveMode):
    """Return the remote CSV source of the ``futures/cot`` file for a symbol.

    The file name is ``<config.Symbol.Value>.PRN``. ``date`` and
    ``isLiveMode`` are not used.

    NOTE(review): the ``class`` statement this method belongs to is not
    present above it in this file -- restore it before use.
    """
    url = "data.quantpedia.com/backtesting_data/futures/cot/{0}.PRN".format(config.Symbol.Value)
    return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile, FileFormat.Csv)

# File example.
# DATE   OPEN     HIGH        LOW       CLOSE     VOLUME   OI
# ----   ----     ----        ---       -----     ------   --
# DATE   LARGE    SPECULATOR  COMMERCIAL HEDGER   SMALL TRADER
#        LONG     SHORT       LONG      SHORT     LONG     SHORT
def Reader(self, config, line, date, isLiveMode):
    """Parse one comma-separated line of the COT file into a data point.

    Returns ``None`` for lines that are empty or do not start with a digit
    (header and separator rows). Otherwise returns a new data object with
    the four named position columns, an ``open_interest`` field equal to
    the sum of columns 1-6, and ``Value`` set to column 1.
    """
    if not line or not line[0].isdigit():
        return None

    # The original body used `data` without ever creating it (NameError).
    # A fresh instance of whatever class this method is attached to is used.
    # NOTE(review): confirm against the (missing) enclosing class.
    data = type(self)()
    data.Symbol = config.Symbol

    fields = line.split(',')

    # The timestamp is the file date shifted forward by one day.
    data.Time = datetime.strptime(fields[0], "%Y%m%d") + timedelta(days=1)

    # Columns 1-6 hold the long/short position counts.
    positions = [int(fields[i]) for i in range(1, 7)]

    data['LARGE_SPECULATOR_LONG'] = positions[0]
    data['LARGE_SPECULATOR_SHORT'] = positions[1]
    data['COMMERCIAL_HEDGER_LONG'] = positions[2]
    data['COMMERCIAL_HEDGER_SHORT'] = positions[3]
    data['open_interest'] = sum(positions)
    data.Value = positions[0]

    return data

# Quantpedia index data.
# NOTE: IMPORTANT: Data order must be ascending (datewise)
class QuantpediaIndices(PythonData):
    """Custom data type reading ``<symbol>.csv`` from the Quantpedia ``index`` directory."""

    def GetSource(self, config, date, isLiveMode):
        """Return the remote CSV source for ``config.Symbol.Value``."""
        url = "data.quantpedia.com/backtesting_data/index/{0}.csv".format(config.Symbol.Value)
        return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile, FileFormat.Csv)

    def Reader(self, config, line, date, isLiveMode):
        """Parse one ``date,value`` line into a data point.

        Returns ``None`` for lines that are empty or do not start with a
        digit (e.g. the header row).
        """
        # Guard against empty lines as well: line[0] would raise IndexError.
        if not line or not line[0].isdigit():
            return None

        data = QuantpediaIndices()
        data.Symbol = config.Symbol

        fields = line.split(',')

        # The timestamp is the file date shifted forward by one day.
        data.Time = datetime.strptime(fields[0], "%Y-%m-%d") + timedelta(days=1)
        close = float(fields[1])
        data['close'] = close
        data.Value = close

        return data

# Quantpedia bond yield data.
# NOTE: IMPORTANT: Data order must be ascending (datewise)
class QuantpediaBondYield(PythonData):
    """Custom data type reading ``<symbol>.csv`` from the Quantpedia ``bond_yield`` directory."""

    def GetSource(self, config, date, isLiveMode):
        """Return the remote CSV source for ``config.Symbol.Value``."""
        url = "data.quantpedia.com/backtesting_data/bond_yield/{0}.csv".format(config.Symbol.Value)
        return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile, FileFormat.Csv)

    def Reader(self, config, line, date, isLiveMode):
        """Parse one ``date,yield`` line into a data point.

        Returns ``None`` for lines that are empty or do not start with a
        digit (e.g. the header row).
        """
        # Guard against empty lines as well: line[0] would raise IndexError.
        if not line or not line[0].isdigit():
            return None

        data = QuantpediaBondYield()
        data.Symbol = config.Symbol

        fields = line.split(',')

        # The timestamp is the file date shifted forward by one day.
        data.Time = datetime.strptime(fields[0], "%Y-%m-%d") + timedelta(days=1)
        bond_yield = float(fields[1])
        data['yield'] = bond_yield
        data.Value = bond_yield

        return data

# Quantpedia data.
# NOTE: IMPORTANT: Data order must be ascending (datewise)
class QuantpediaFutures(PythonData):
    """Custom data type reading ``<symbol>.csv`` from the Quantpedia ``futures`` directory."""

    def GetSource(self, config, date, isLiveMode):
        """Return the remote CSV source for ``config.Symbol.Value``."""
        url = "data.quantpedia.com/backtesting_data/futures/{0}.csv".format(config.Symbol.Value)
        return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile, FileFormat.Csv)

    def Reader(self, config, line, date, isLiveMode):
        """Parse one semicolon-separated line into a data point.

        Column 0 is a ``dd.mm.YYYY`` date, column 1 becomes ``Value`` and
        column 2 is stored as ``'spliced'``. Returns ``None`` for lines that
        are empty or do not start with a digit (e.g. the header row).
        """
        # Guard against empty lines as well: line[0] would raise IndexError.
        if not line or not line[0].isdigit():
            return None

        data = QuantpediaFutures()
        data.Symbol = config.Symbol

        fields = line.split(';')

        # The timestamp is the file date shifted forward by one day.
        data.Time = datetime.strptime(fields[0], "%d.%m.%Y") + timedelta(days=1)
        data['spliced'] = float(fields[2])
        data.Value = float(fields[1])

        return data

# NOTE: IMPORTANT: Data order must be ascending (datewise).
def GetSource(self, config, date, isLiveMode):
    """Return the remote CSV source of the ``futures/cot`` file for a symbol.

    The file name is ``<config.Symbol.Value>.PRN``. ``date`` and
    ``isLiveMode`` are not used.

    NOTE(review): the ``class`` statement this method belongs to is not
    present above it in this file -- restore it before use.
    """
    url = "data.quantpedia.com/backtesting_data/futures/cot/{0}.PRN".format(config.Symbol.Value)
    return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile, FileFormat.Csv)

# File example.
# DATE   OPEN     HIGH        LOW       CLOSE     VOLUME   OI
# ----   ----     ----        ---       -----     ------   --
# DATE   LARGE    SPECULATOR  COMMERCIAL HEDGER   SMALL TRADER
#        LONG     SHORT       LONG      SHORT     LONG     SHORT
def Reader(self, config, line, date, isLiveMode):
    """Parse one comma-separated line of the COT file into a data point.

    Returns ``None`` for lines that are empty or do not start with a digit
    (header and separator rows). Otherwise returns a new data object with
    the four named position columns and ``Value`` set to column 1.
    """
    if not line or not line[0].isdigit():
        return None

    # The original body used `data` without ever creating it (NameError).
    # A fresh instance of whatever class this method is attached to is used.
    # NOTE(review): confirm against the (missing) enclosing class.
    data = type(self)()
    data.Symbol = config.Symbol

    fields = line.split(',')

    # The timestamp is the file date shifted forward by one day.
    data.Time = datetime.strptime(fields[0], "%Y%m%d") + timedelta(days=1)

    large_speculator_long = int(fields[1])
    data['LARGE_SPECULATOR_LONG'] = large_speculator_long
    data['LARGE_SPECULATOR_SHORT'] = int(fields[2])
    data['COMMERCIAL_HEDGER_LONG'] = int(fields[3])
    data['COMMERCIAL_HEDGER_SHORT'] = int(fields[4])

    data.Value = large_speculator_long

    return data

# NOTE: Manager for new trades. It's represented by certain count of equally weighted brackets for long and short positions.
# If there's a place for new trade, it will be managed for time of holding period.
def __init__(self, algorithm, long_size, short_size, holding_period):
    """Set up an empty trade manager.

    Args:
        algorithm: Algorithm object used to execute orders.
        long_size: Number of long slots.
        short_size: Number of short slots.
        holding_period: Days each managed position is held.

    NOTE(review): the ``class`` statement this constructor belongs to is not
    present above it in this file -- restore it before use.
    """
    self.algorithm = algorithm
    self.holding_period = holding_period

    # Slot capacity per side.
    self.long_size = long_size
    self.short_size = short_size

    # Slots currently in use per side.
    self.long_len = 0
    self.short_len = 0

    # ManagedSymbol objects currently under management.
    self.symbols = []

def Add(self, symbol, long_flag):
    """Open an equally weighted position in ``symbol`` if a slot is free.

    A long position gets weight ``1 / long_size`` and a short position
    ``-1 / short_size``. When every slot on the requested side is taken,
    nothing happens.

    NOTE(review): the ``def`` line of this method was missing from the file.
    The name ``Add`` and the parameters were reconstructed from the
    variables the body uses -- confirm against callers. The original
    ``else:`` branches for the "no free slot" case were empty (a syntax
    error) and have been dropped; restore their bodies if they did something.
    """
    managed_symbol = ManagedSymbol(symbol, self.holding_period, long_flag)

    if long_flag:
        # If there's a place for it.
        if self.long_len < self.long_size:
            self.symbols.append(managed_symbol)
            self.algorithm.SetHoldings(symbol, 1 / self.long_size)
            self.long_len += 1
    else:
        # If there's a place for it.
        if self.short_len < self.short_size:
            self.symbols.append(managed_symbol)
            self.algorithm.SetHoldings(symbol, - 1 / self.short_size)
            self.short_len += 1
# Decrement holding period and liquidate symbols.
def TryLiquidate(self):
    """Age every managed position by one day and close the expired ones.

    Each position's ``days_to_liquidate`` is decremented. Positions whose
    counter has reached zero are liquidated, their slot is released and
    they are removed from ``self.symbols``.
    """
    still_open = []
    for position in self.symbols:
        position.days_to_liquidate -= 1

        # `<= 0` rather than `== 0`: a position created with a non-positive
        # holding period would otherwise never be closed and would occupy
        # its slot forever.
        if position.days_to_liquidate > 0:
            still_open.append(position)
            continue

        self.algorithm.Liquidate(position.symbol)
        if position.long_flag:
            self.long_len -= 1
        else:
            self.short_len -= 1

    # Replace the contents in place so other references to the list stay valid.
    self.symbols[:] = still_open

def LiquidateTicker(self, ticker):
    """Close the first managed position whose symbol value equals ``ticker``.

    The position is liquidated, its slot is released and it is removed from
    ``self.symbols``. If no managed position matches, a debug message is
    emitted instead.
    """
    for index, position in enumerate(self.symbols):
        if position.symbol.Value != ticker:
            continue

        self.algorithm.Liquidate(position.symbol)
        if position.long_flag:
            self.long_len -= 1
        else:
            self.short_len -= 1

        # Only the first match is handled, so deleting during iteration is
        # safe: the loop is left immediately afterwards.
        del self.symbols[index]
        return

    self.algorithm.Debug("Ticker is not held in portfolio!")

class ManagedSymbol():
    """Record of one managed position: symbol, remaining days and side."""

    def __init__(self, symbol, days_to_liquidate, long_flag):
        self.symbol = symbol
        # Remaining days before the position is to be liquidated.
        self.days_to_liquidate = days_to_liquidate
        # True for a long position, False for a short one.
        self.long_flag = long_flag

class PortfolioOptimization(object):
    """Long-only weight optimizer that maximizes annualized return over volatility.

    Args:
        df_return: Table of daily returns supporting ``.mean()`` and
            ``.cov()`` (e.g. a pandas DataFrame), one column per asset.
        risk_free_rate: Stored on the instance; not used by the objective.
        num_assets: Number of assets (columns) to allocate across.
    """

    def __init__(self, df_return, risk_free_rate, num_assets):
        self.daily_return = df_return
        self.risk_free_rate = risk_free_rate
        self.n = num_assets  # number of risky assets in the portfolio
        self.target_vol = 0.05  # only referenced by the alternative objective noted in min_func

    def annual_port_return(self, weights):
        """Return the annualized portfolio return (252 trading days)."""
        return np.sum(self.daily_return.mean() * weights) * 252

    def annual_port_vol(self, weights):
        """Return the annualized portfolio volatility (252 trading days)."""
        annual_cov = self.daily_return.cov() * 252
        return np.sqrt(np.dot(weights.T, np.dot(annual_cov, weights)))

    def min_func(self, weights):
        """Objective to minimize: negative annual return over annual volatility.

        NOTE(review): the risk-free rate is not subtracted from the return,
        so this is a Sharpe ratio with a zero risk-free rate.
        """
        # Method 1: maximize the Sharpe ratio.
        return - self.annual_port_return(weights) / self.annual_port_vol(weights)

        # Method 2 (alternative, disabled): maximize the return with target volatility,
        # i.e. return - self.annual_port_return(weights) / self.target_vol

    def opt_portfolio(self):
        """Return the optimal weights found by SLSQP.

        Weights sum to 1. The first two assets are bounded to [0, 1] and
        every further asset to [0, 0.25].
        """
        constraints = {'type': 'eq', 'fun': lambda w: np.sum(w) - 1}

        # One bound per asset. The original always emitted two wide bounds,
        # which made the bounds longer than the weight vector when n < 2.
        wide = min(2, self.n)
        bounds = ((0, 1),) * wide + ((0, 0.25),) * (self.n - wide)

        # Start from the equally weighted portfolio.
        start = np.array(self.n * [1. / self.n])

        result = minimize(self.min_func,
                          start,
                          method='SLSQP',
                          bounds=bounds,
                          constraints=constraints)

        return result['x']
# https://quantpedia.com/strategies/fed-model/
#
# Each month, the investor conducts a one-month predictive regression (using all available data up to that date) predicting excess stock market
# returns using the yield gap as an independent variable. The “Yield gap” is calculated as YG = EY − y, with earnings yield EY ≡ ln(1 + E/P)
# and y = ln(1 + Y) is the log 10-year Treasury bond yield. Then, the strategy allocates 100% in the risky asset if the forecasted excess
# returns are positive, and otherwise, it invests 100% in the risk-free rate.

from collections import deque
import fk_tools
import numpy as np
from scipy import stats

class ReversalYieldChangeFactor(QCAlgorithm):

def Initialize(self):
    """Configure the backtest and schedule the monthly rebalance.

    Sets the start date and cash, creates the containers for the monthly
    market and yield-gap observations, requests a warm-up of 12 * 21 bars
    and schedules ``Rebalance`` at the start of every month after the
    market opens.

    NOTE(review): no data subscriptions are visible here for
    ``self.market``, ``self.cash``, ``self.bond_yield`` or
    ``self.sp_earnings_yield``, and ``self.risk_free_rate`` (read by
    ``OnData``) is never assigned. Those lines appear to be missing from
    this file and must be restored before the algorithm can run.
    """
    self.SetStartDate(2005, 1, 1)
    self.SetCash(100000)

    # Monthly price data and yield gap data.
    self.data = {}

    self.period = 12 * 21
    self.SetWarmUp(self.period)

    self.market = 'SPY'
    self.data[self.market] = deque()

    self.cash = 'SHY'

    # Risk free rate.
    # TODO(review): assign self.risk_free_rate and subscribe to its data here.

    # 10Y bond yield symbol.
    self.bond_yield = 'US10YT'

    # SP500 earnings yield data.
    self.sp_earnings_yield = 'MULTPL/SP500_EARNINGS_YIELD_MONTH'

    self.data['yield_gap'] = deque()

    # Month of the last recorded observation; -1 means none yet.
    self.last_month = -1
    self.Schedule.On(self.DateRules.MonthStart(self.market), self.TimeRules.AfterMarketOpen(self.market), self.Rebalance)

def OnData(self, data):
    """Record one market observation and one yield-gap observation per month.

    Only the first call in each calendar month does any work, which also
    lets the warm-up period pre-fill the history. When a price is zero, the
    previous observation (if any) is repeated instead.

    NOTE(review): the yield gap is computed as ``ln(EY) - ln(Y)``, while the
    strategy description at the top of the file defines it as
    ``ln(1 + E/P) - ln(1 + Y)`` -- confirm which one is intended.
    """
    # Update only on month change. It allows us to prefetch data with WarmUp period.
    current_month = self.Time.month
    if current_month == self.last_month:
        return
    self.last_month = current_month

    securities = self.Securities

    # Update market price data.
    if securities.ContainsKey(self.market) and securities.ContainsKey(self.risk_free_rate):
        market_price = securities[self.market].Price
        rf_rate = securities[self.risk_free_rate].Price
        market_history = self.data[self.market]

        if market_price != 0 and rf_rate != 0:
            market_history.append((market_price, rf_rate))
        elif market_history:
            # Repeat the previous observation when a price is 0.
            market_history.append(market_history[-1])

    # Update the yield gap.
    if securities.ContainsKey(self.bond_yield) and securities.ContainsKey(self.sp_earnings_yield):
        bond_yield = securities[self.bond_yield].Price
        sp_ey = securities[self.sp_earnings_yield].Price
        gap_history = self.data['yield_gap']

        if bond_yield != 0 and sp_ey != 0:
            gap_history.append(np.log(sp_ey) - np.log(bond_yield))
        elif gap_history:
            # Repeat the previous observation when a price is 0.
            gap_history.append(gap_history[-1])

def Rebalance(self):
# Monthly callback scheduled in Initialize. Regresses market returns on the
# lagged yield gap and computes a one-step-ahead forecast Y from the latest
# yield gap.
# NOTE(review): nothing in the visible part of this method uses Y, so the
# method appears to be cut off here -- the allocation step is not shown.
# Ensure minimum data points to calculate regression.
min_count = 6
if len(self.data[self.market]) >= min_count and len(self.data['yield_gap']) >= min_count:
# Each stored market observation is a (price, risk-free rate) tuple.
market_closes = np.array([x[0] for x in self.data[self.market]])
market_returns = (market_closes[1:] - market_closes[:-1]) / market_closes[:-1]
rf_rates = np.array([x[1] for x in self.data[self.market]][1:])
# NOTE(review): excess_returns is computed but never used; the regression
# below is fitted on market_returns instead. The strategy description talks
# about predicting excess returns -- confirm which series is intended.
excess_returns = market_returns - rf_rates

yield_gaps = [x for x in self.data['yield_gap']]

# Linear regression calc.
# Y = α + (β ∗ X)
# intercept = alpha
# slope = beta
# NOTE(review): linregress needs inputs of equal length. Here x has
# len(yield_gaps) - 1 points and y has len(market data) - 2 points, so the
# call only works when the yield-gap history is exactly one entry shorter
# than the market history -- verify the intended alignment.
beta, alpha, r_value, p_value, std_err = stats.linregress(yield_gaps[:-1], market_returns[1:])
X = yield_gaps[-1]

# Predicted market return.
Y = alpha + (beta * X)