import pandas as pd
import numpy as np
import cvxpy as cv
class OptimisationPortfolioConstructionModel():
    """Turn alpha scores into target weights under a turnover budget.

    The current holdings of the algorithm are combined with the alpha scores
    and handed to ``Optimiser``. When the problem is not solved to
    ``'optimal'`` at the configured turnover, the turnover limit is relaxed in
    steps of 0.01 up to 1.0.
    """

    def __init__(self, turnover, max_wt, longshort):
        """Store the optimisation settings.

        Args:
            turnover: Turnover limit used on every rebalance except the
                initial one (which uses 1).
            max_wt: Upper bound on a single weight, forwarded to ``Optimiser``.
            longshort: Forwarded to ``Optimiser`` to choose between its
                long/short and long-only constraint sets.
        """
        self.turnover = turnover
        self.max_wt = max_wt
        self.longshort = longshort

    def GenerateOptimalPortfolio(self, algorithm, alpha_df):
        """Return optimised weights for the scores in ``alpha_df['alpha_score']``.

        Args:
            algorithm: Object exposing ``Portfolio`` and ``Log``.
            alpha_df: DataFrame indexed by symbol string with an
                ``'alpha_score'`` column.

        Returns:
            pandas Series of target weights indexed by symbol string.
        """
        alphas = alpha_df['alpha_score']
        return self.Optimise(algorithm, self.AddZeroHoldings(algorithm, alphas))

    def AddZeroHoldings(self, algorithm, portfolio):
        """Return a copy of ``portfolio`` with a 0 entry for every held symbol it lacks.

        The input Series is left untouched so that the caller's DataFrame
        column is never modified as a side effect.
        """
        extended = portfolio.copy()
        known = set(extended.index)
        for holding in algorithm.Portfolio.Values:
            symbol = str(holding.Symbol)
            if holding.Invested and symbol not in known:
                known.add(symbol)
                extended.loc[symbol] = 0
        return extended

    def Optimise(self, algorithm, alphas):
        """Solve for target weights, relaxing turnover until a solution is optimal.

        Args:
            algorithm: Object exposing ``Portfolio`` and ``Log``.
            alphas: Series of alpha scores indexed by symbol string.

        Returns:
            Series of weights from the first attempt whose status is
            ``'optimal'``. If no attempt succeeds, the result of the last
            attempt is returned (``Optimiser.optimise`` decides what that is).
            An empty Series is returned when there is nothing to optimise.
        """
        invested_securities = [
            security for security in algorithm.Portfolio.Values if security.Invested
        ]
        self.initial_rebalance = not invested_securities
        if self.initial_rebalance:
            algorithm.Log('Initial portfolio rebalance')
            turnover = 1
        else:
            turnover = self.turnover

        initial_portfolio = self._build_initial_portfolio(
            algorithm, alphas, invested_securities)
        if initial_portfolio.empty:
            # No holdings and no alpha scores: nothing for the solver to do.
            return pd.Series(dtype=float)

        # round() avoids float truncation (0.29 * 100 == 28.999...), and the
        # clamp guarantees at least one attempt even if turnover > 1.
        first_step = min(max(int(round(turnover * 100)), 0), 100)
        for step in range(first_step, 101):
            to = step / 100
            optimiser = Optimiser(
                initial_portfolio,
                turnover=to,
                max_wt=self.max_wt,
                longshort=self.longshort,
            )
            optimal_portfolio, optimisation_status = optimiser.optimise()
            if optimisation_status == 'optimal':
                break
            algorithm.Log(f'Optimisation with {to} turnover not feasible: {optimisation_status}')
        return optimal_portfolio

    def _build_initial_portfolio(self, algorithm, alphas, invested_securities):
        """Build the symbol-indexed frame of current weights and alpha scores.

        Held securities come first (weight = HoldingsValue divided by the
        portfolio's TotalHoldingsValue), followed by symbols that only appear
        in ``alphas`` with weight 0. Symbols without an alpha score get 0.
        """
        # NOTE(review): assumes TotalHoldingsValue is non-zero whenever at
        # least one security is invested - confirm against the LEAN API.
        total_holdings = algorithm.Portfolio.TotalHoldingsValue
        held_weights = {
            str(security.Symbol): security.HoldingsValue / total_holdings
            for security in invested_securities
        }
        symbols = list(held_weights)
        symbols.extend(symbol for symbol in alphas.index if symbol not in held_weights)
        return pd.DataFrame(
            {
                'weight': [held_weights.get(symbol, 0.0) for symbol in symbols],
                # Look alphas up by symbol string: that is how they are indexed.
                'alpha': [alphas.get(symbol, 0) for symbol in symbols],
            },
            index=pd.Index(symbols, name='symbol'),
        )
class Optimiser:
    """cvxpy problem that maximises alpha-weighted exposure under weight limits."""

    def __init__(self, initial_portfolio, turnover, max_wt, longshort=True):
        """Capture the starting weights, alphas and constraint parameters.

        Args:
            initial_portfolio: DataFrame indexed by symbol with ``'weight'``
                and ``'alpha'`` columns.
            turnover: Turnover limit; the constraint allows the summed
                absolute weight change to reach twice this value.
            max_wt: Upper bound on each weight (and, negated, the lower bound
                in long/short mode).
            longshort: True for net exposure 0 with weights in
                ``[-max_wt, max_wt]``; False for net exposure 1 with weights
                in ``[0, max_wt]``.
        """
        self.symbols = np.array(initial_portfolio.index)
        # Force float arrays: columns built row by row can end up as object dtype.
        self.init_wt = np.array(initial_portfolio['weight'], dtype=float)
        self.opt_wt = cv.Variable(self.init_wt.shape)
        self.alpha = np.array(initial_portfolio['alpha'], dtype=float)
        self.longshort = longshort
        self.turnover = turnover
        self.max_wt = max_wt
        if self.longshort:
            self.min_wt = -self.max_wt
            self.net_exposure = 0
        else:
            self.min_wt = 0
            self.net_exposure = 1
        self.gross_exposure = 1

    def optimise(self):
        """Solve the problem and return ``(weights, status)``.

        Returns:
            A tuple of a Series of weights rounded to 3 decimals (indexed by
            symbol) and the cvxpy status string. When the status is not
            ``'optimal'`` the weights are the rounded initial weights.
        """
        # Element-wise product made explicit; `*` between a vector variable
        # and an array is deprecated in cvxpy.
        objective = cv.Maximize(cv.sum(cv.multiply(self.opt_wt, self.alpha)))
        optimisation = cv.Problem(objective, self.get_constraints())
        optimisation.solve()
        status = optimisation.status
        solved = status == 'optimal' and self.opt_wt.value is not None
        weights = self.opt_wt.value if solved else self.init_wt
        optimal_portfolio = pd.Series(np.round(weights, 3), index=self.symbols)
        return optimal_portfolio, status

    def get_constraints(self):
        """Return the list of cvxpy constraints for the current settings."""
        min_wt = self.opt_wt >= self.min_wt
        max_wt = self.opt_wt <= self.max_wt
        # Summed absolute weight change may not exceed twice the turnover limit.
        turnover = cv.sum(cv.abs(self.opt_wt - self.init_wt)) <= self.turnover * 2
        net_exposure = cv.sum(self.opt_wt) == self.net_exposure
        gross_exposure = cv.sum(cv.abs(self.opt_wt)) <= self.gross_exposure
        return [min_wt, max_wt, turnover, net_exposure, gross_exposure]
class FactorUniverseSelectionModel():
    """Coarse/fine universe filters that keep both tails of a CashReturn ranking."""

    def __init__(self, algorithm, coarse_count=1000, tail_count=50):
        """Store the owning algorithm and the selection sizes.

        Args:
            algorithm: Object whose ``securities`` attribute receives the
                selected fine universe.
            coarse_count: Number of securities kept by the dollar-volume filter.
            tail_count: Number of securities taken from each end of the
                CashReturn ranking.
        """
        self.algorithm = algorithm
        self.coarse_count = coarse_count
        self.tail_count = tail_count

    def SelectCoarse(self, coarse):
        """Return the symbols that pass the price / dollar-volume filter."""
        universe = self.FilterDollarPriceVolume(coarse)
        return [c.Symbol for c in universe]

    def SelectFine(self, fine):
        """Apply the sector and factor filters and publish the result.

        The selected fine objects are stored on ``algorithm.securities`` and
        their symbols are returned.
        """
        universe = self.FilterFactor(self.FilterFinancials(fine))
        self.algorithm.securities = universe
        return [f.Symbol for f in universe]

    def FilterDollarPriceVolume(self, coarse):
        """Keep the highest dollar-volume names priced above 1 with fundamental data."""
        eligible = [c for c in coarse if c.Price > 1 and c.HasFundamentalData]
        eligible.sort(key=lambda c: c.DollarVolume, reverse=True)
        return eligible[:self.coarse_count]

    def FilterFinancials(self, fine):
        """Drop securities whose Morningstar sector is FinancialServices."""
        return [
            f for f in fine
            if f.AssetClassification.MorningstarSectorCode != MorningstarSectorCode.FinancialServices
        ]

    def FilterFactor(self, fine):
        """Return the top and bottom ``tail_count`` securities by CashReturn.

        When fewer than ``2 * tail_count`` securities are available the two
        tails would overlap, so the whole ranking is returned once instead of
        listing the same security twice.
        """
        # NOTE(review): if CashReturn can be NaN the sort order is undefined -
        # confirm whether such values need to be filtered upstream.
        ranked = sorted(fine, key=lambda f: f.ValuationRatios.CashReturn, reverse=True)
        tail = self.tail_count
        if len(ranked) <= 2 * tail:
            return ranked
        # Explicit start index: ranked[-0:] would be the whole list.
        return ranked[:tail] + ranked[len(ranked) - tail:]
class Execution():
    """Translate target weights into liquidation and SetHoldings calls."""

    def __init__(self, liq_tol):
        """Store the liquidation tolerance.

        Args:
            liq_tol: Targets whose absolute weight is below this value are
                liquidated instead of being held.
        """
        self.liq_tol = liq_tol

    def ExecutePortfolio(self, algorithm, portfolio):
        """Liquidate sub-tolerance targets, then set holdings for the rest.

        Args:
            algorithm: Object exposing ``Securities``, ``Liquidate`` and
                ``SetHoldings``.
            portfolio: Series of target weights indexed by security.
        """
        below_tolerance = portfolio.abs() < self.liq_tol
        self.LiquidateSecurities(algorithm, portfolio[below_tolerance].index)
        self.SetPortfolioHoldings(algorithm, portfolio[~below_tolerance])

    def LiquidateSecurities(self, algorithm, securities):
        """Liquidate each listed security that is currently invested."""
        for security in securities:
            if algorithm.Securities[security].Invested:
                algorithm.Liquidate(security)

    def SetPortfolioHoldings(self, algorithm, portfolio):
        """Call ``SetHoldings`` for every (security, weight) pair."""
        # Series.items() works on every pandas version; iteritems() was
        # removed in pandas 2.0.
        for security, weight in portfolio.items():
            algorithm.SetHoldings(security, weight)
import pandas as pd
def normalise(series, equal_ls=True):
    """Scale ``series`` so that its absolute values sum to 1.

    Args:
        series: pandas Series of raw scores. It is not modified.
        equal_ls: When True the mean is subtracted first, so the positive and
            negative parts of the result balance out.

    Returns:
        A new Series with the same index. If the absolute sum is 0 (for
        example when all scores are equal) zeros are returned rather than
        dividing by zero.
    """
    # Build a new Series instead of using `-=`, which would overwrite the
    # caller's data in place.
    centred = series - series.mean() if equal_ls else series
    gross = centred.abs().sum()
    if gross == 0:
        return centred * 0.0
    return centred / gross
class ValueAlphaModel():
    """Alpha model that scores securities by their ``ValuationRatios.CashReturn``."""

    def __init__(self):
        pass

    def GenerateAlphaScores(self, algorithm, securities):
        """Return a symbol-indexed frame of raw CashReturn and normalised scores.

        Args:
            algorithm: Unused; kept for a uniform model interface.
            securities: Iterable of objects exposing ``Symbol`` and
                ``ValuationRatios.CashReturn``.

        Returns:
            DataFrame indexed by ``'symbol'`` with columns ``'fcf_y'`` and
            ``'alpha_score'``. An empty frame with the same columns is
            returned when ``securities`` is empty.
        """
        records = [
            {
                'symbol': str(security.Symbol),
                'fcf_y': security.ValuationRatios.CashReturn,
            }
            for security in securities
        ]
        if not records:
            # from_records([]) has no 'symbol' column, so set_index would raise.
            return pd.DataFrame(
                columns=['fcf_y', 'alpha_score'],
                index=pd.Index([], name='symbol'),
                dtype=float,
            )
        fcf_y = pd.DataFrame.from_records(records).set_index('symbol')
        # Pass a copy so the raw 'fcf_y' column cannot be altered by the
        # normalisation step.
        fcf_y['alpha_score'] = normalise(fcf_y['fcf_y'].copy(), True)
        return fcf_y
from universe_selection import FactorUniverseSelectionModel
from alpha_model import ValueAlphaModel
from portfolio_construction import OptimisationPortfolioConstructionModel
from execution import Execution
from charting import InitCharts, PlotPerformanceChart, PlotPosConcentrationChart, PlotStockCountChart, PlotExposureChart
class TradingBot(QCAlgorithm):
    """Algorithm wiring universe selection, alpha, optimisation, execution and charts."""

    def Initialize(self):
        """Configure the backtest, build the models and schedule the recurring jobs."""
        self.SetStartDate(2003, 1, 1)
        self.SetCash(100000)
        # Data resolution
        self.UniverseSettings.Resolution = Resolution.Minute
        # Universe selection model; it writes its selection to self.securities
        self.securities = []
        self.CustomUniverseSelectionModel = FactorUniverseSelectionModel(self)
        self.AddUniverse(self.CustomUniverseSelectionModel.SelectCoarse, self.CustomUniverseSelectionModel.SelectFine)
        # Alpha model
        self.CustomAlphaModel = ValueAlphaModel()
        # Portfolio construction model
        self.CustomPortfolioConstructionModel = OptimisationPortfolioConstructionModel(turnover=0.05, max_wt=0.05, longshort=True)
        # Execution model
        self.CustomExecution = Execution(liq_tol=0.005)
        # Add SPY for trading days data
        self.AddEquity('SPY', Resolution.Daily)
        # Schedule rebalancing
        self.Schedule.On(self.DateRules.EveryDay('SPY'), self.TimeRules.At(13, 0), Action(self.RebalancePortfolio))
        # Init charting
        InitCharts(self)
        # Schedule charting
        self.Schedule.On(self.DateRules.Every(DayOfWeek.Friday), self.TimeRules.BeforeMarketClose('SPY', 0), Action(self.PlotCharts))

    def OnData(self, data):
        """No per-bar logic; all work happens in the scheduled callbacks."""
        pass

    def RebalancePortfolio(self):
        """Score the current universe, optimise the weights and execute them."""
        if not self.securities:
            # Universe selection has not produced anything yet; the alpha
            # model cannot score an empty list, so skip this rebalance.
            return
        alpha_df = self.CustomAlphaModel.GenerateAlphaScores(self, self.securities)
        portfolio = self.CustomPortfolioConstructionModel.GenerateOptimalPortfolio(self, alpha_df)
        self.CustomExecution.ExecutePortfolio(self, portfolio)

    def PlotCharts(self):
        """Update all custom charts."""
        PlotPerformanceChart(self)
        PlotPosConcentrationChart(self)
        PlotStockCountChart(self)
        PlotExposureChart(self)
def InitCharts(algorithm):
    """Register the custom charts and their line series on ``algorithm``.

    Each chart is created, populated with its series in the listed order and
    then added to the algorithm before the next chart is built.
    """
    chart_layout = (
        ('Performance Breakdown', ('Total Fees', 'Total Gross Profit')),
        ('Position Concentration', (
            'Largest Long Position',
            'Largest Short Position',
            'Smallest Long Position',
            'Smallest Short Position',
        )),
        ('Stock Count', ('Long', 'Short')),
        ('Exposure/Leverage', ('Gross', 'Net')),
    )
    for chart_name, series_names in chart_layout:
        chart = Chart(chart_name)
        for series_name in series_names:
            chart.AddSeries(Series(series_name, SeriesType.Line, 0))
        algorithm.AddChart(chart)
def PlotPerformanceChart(algorithm):
    """Plot the portfolio's total fees and total profit on 'Performance Breakdown'."""
    chart_name = 'Performance Breakdown'
    portfolio = algorithm.Portfolio
    algorithm.Plot(chart_name, 'Total Fees', portfolio.TotalFees)
    algorithm.Plot(chart_name, 'Total Gross Profit', portfolio.TotalProfit)
def PlotPosConcentrationChart(algorithm):
    """Plot the largest and smallest long/short positions as a share of holdings.

    Each value is the position's ``AbsoluteHoldingsValue`` divided by the
    portfolio's ``TotalHoldingsValue``. A side with no positions, or a
    portfolio whose total holdings value is 0, is plotted as 0 instead of a
    placeholder sentinel or a division error.
    """
    long_values = []
    short_values = []
    for _, holding in algorithm.Portfolio.items():
        if not holding.Invested:
            continue
        if holding.IsLong:
            long_values.append(holding.AbsoluteHoldingsValue)
        elif holding.IsShort:
            short_values.append(holding.AbsoluteHoldingsValue)

    total_holdings = algorithm.Portfolio.TotalHoldingsValue

    def share(value):
        # Guard the division: nothing held means nothing to scale against.
        return value / total_holdings if total_holdings else 0

    points = (
        ('Largest Long Position', max(long_values, default=0)),
        ('Largest Short Position', max(short_values, default=0)),
        ('Smallest Long Position', min(long_values, default=0)),
        ('Smallest Short Position', min(short_values, default=0)),
    )
    for series_name, value in points:
        algorithm.Plot('Position Concentration', series_name, share(value))
def PlotStockCountChart(algorithm):
    """Plot how many invested positions are long and how many are short.

    Only the ``Invested``, ``IsLong`` and ``IsShort`` flags of each holding
    are read; position values play no part in the count.
    """
    long_count = 0
    short_count = 0
    for _, holding in algorithm.Portfolio.items():
        if not holding.Invested:
            continue
        if holding.IsLong:
            long_count += 1
        elif holding.IsShort:
            short_count += 1
    algorithm.Plot('Stock Count', 'Long', long_count)
    algorithm.Plot('Stock Count', 'Short', short_count)
def PlotExposureChart(algorithm):
    """Plot gross and net exposure relative to ``TotalPortfolioValue``.

    Gross is (long + short) and net is (long - short), where each side is the
    sum of ``AbsoluteHoldingsValue`` over the invested positions on that side.
    """
    exposure = {'long': 0, 'short': 0}
    for _, holding in algorithm.Portfolio.items():
        if not holding.Invested:
            continue
        position_value = holding.AbsoluteHoldingsValue
        if holding.IsLong:
            exposure['long'] += position_value
        elif holding.IsShort:
            exposure['short'] += position_value

    equity = algorithm.Portfolio.TotalPortfolioValue
    gross_ratio = (exposure['long'] + exposure['short']) / equity
    net_ratio = (exposure['long'] - exposure['short']) / equity
    chart_name = 'Exposure/Leverage'
    algorithm.Plot(chart_name, 'Gross', gross_ratio)
    algorithm.Plot(chart_name, 'Net', net_ratio)