| Overall Statistics |
|
Total Trades 313 Average Win 0.01% Average Loss -0.01% Compounding Annual Return 52.949% Drawdown 1.100% Expectancy -0.190 Net Profit 7.736% Sharpe Ratio 6.211 Probabilistic Sharpe Ratio 99.277% Loss Rate 62% Win Rate 38% Profit-Loss Ratio 1.13 Alpha 0.091 Beta 0.983 Annual Standard Deviation 0.075 Annual Variance 0.006 Information Ratio 1.774 Tracking Error 0.048 Treynor Ratio 0.474 Total Fees $319.86 Estimated Strategy Capacity $13000000.00 Lowest Capacity Asset ORCL R735QTJ8XC9X |
from datetime import timedelta
from QuantConnect.Data.UniverseSelection import *
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel
from Portfolio.EqualWeightingPortfolioConstructionModel import EqualWeightingPortfolioConstructionModel
class SectorBalancedPortfolioConstruction(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2016, 12, 28)
self.SetEndDate(2017, 3, 1)
self.SetCash(100000)
self.UniverseSettings.Resolution = Resolution.Hour
self.SetUniverseSelection(MyUniverseSelectionModel())
self.SetAlpha(ConstantAlphaModel(InsightType.Price, InsightDirection.Up, timedelta(1), 0.025, None))
self.SetPortfolioConstruction(MySectorWeightingPortfolioConstructionModel(Resolution.Daily))
self.SetExecution(ImmediateExecutionModel())
class MyUniverseSelectionModel(FundamentalUniverseSelectionModel):
def __init__(self):
super().__init__(True, None)
def SelectCoarse(self, algorithm, coarse):
filtered = [x for x in coarse if x.HasFundamentalData and x.Price > 0]
sortedByDollarVolume = sorted(filtered, key=lambda x: x.DollarVolume, reverse=True)
return [x.Symbol for x in sortedByDollarVolume][:100]
def SelectFine(self, algorithm, fine):
filtered = [f for f in fine if f.AssetClassification.MorningstarSectorCode == MorningstarSectorCode.Technology]
self.technology = sorted(filtered, key=lambda f: f.MarketCap, reverse=True)[:3]
filtered = [f for f in fine if f.AssetClassification.MorningstarSectorCode == MorningstarSectorCode.FinancialServices]
self.financialServices = sorted(filtered, key=lambda f: f.MarketCap, reverse=True)[:2]
filtered = [f for f in fine if f.AssetClassification.MorningstarSectorCode == MorningstarSectorCode.ConsumerDefensive]
self.consumerDefensive = sorted(filtered, key=lambda f: f.MarketCap, reverse=True)[:1]
return [x.Symbol for x in self.technology + self.financialServices + self.consumerDefensive]
class MySectorWeightingPortfolioConstructionModel(EqualWeightingPortfolioConstructionModel):
def __init__(self, rebalance = Resolution.Daily):
super().__init__(rebalance)
self.symbolBySectorCode = dict()
self.result = dict()
def DetermineTargetPercent(self, activeInsights):
#1. Set the self.sectorBuyingPower before by dividing one by the length of self.symbolBySectorCode
self.sectorBuyingPower = 1/len(self.symbolBySectorCode)
for sector, symbols in self.symbolBySectorCode.items():
#2. Search for the active insights in this sector. Save the variable self.insightsInSector
self.insightsInSector = [insight for insight in activeInsights if insight.Symbol in symbols]
#3. Divide the self.sectorBuyingPower by the length of self.insightsInSector to calculate the variable percent
# The percent is the weight we'll assign the direction of the insight
self.percent = self.sectorBuyingPower / len(self.insightsInSector)
#4. For each insight in self.insightsInSector, assign each insight an allocation.
# The allocation is calculated by multiplying the insight direction by the self.percent
for insight in self.insightsInSector:
self.result[insight] = insight.Direction * self.percent
return self.result
def OnSecuritiesChanged(self, algorithm, changes):
for security in changes.AddedSecurities:
sectorCode = security.Fundamentals.AssetClassification.MorningstarSectorCode
if sectorCode not in self.symbolBySectorCode:
self.symbolBySectorCode[sectorCode] = list()
self.symbolBySectorCode[sectorCode].append(security.Symbol)
for security in changes.RemovedSecurities:
sectorCode = security.Fundamentals.AssetClassification.MorningstarSectorCode
if sectorCode in self.symbolBySectorCode:
symbol = security.Symbol
if symbol in self.symbolBySectorCode[sectorCode]:
self.symbolBySectorCode[sectorCode].remove(symbol)
super().OnSecuritiesChanged(algorithm, changes)
from AlgorithmImports import *
from QuantConnect.Data.Custom.Tiingo import *
from datetime import datetime, timedelta
import numpy as np
class TiingoNewsSentimentAlgorithm(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2016, 11, 1)
self.SetEndDate(2017, 3, 1)
symbols = [Symbol.Create("AAPL", SecurityType.Equity, Market.USA),
Symbol.Create("NKE", SecurityType.Equity, Market.USA)]
self.SetUniverseSelection(ManualUniverseSelectionModel(symbols))
self.SetAlpha(NewsSentimentAlphaModel())
self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel())
self.SetExecution(ImmediateExecutionModel())
self.SetRiskManagement(NullRiskManagementModel())
class NewsData():
def __init__(self, symbol):
self.Symbol = symbol
self.Window = RollingWindow[float](100)
class NewsSentimentAlphaModel(AlphaModel):
def __init__(self):
self.newsData = {}
self.wordScores = {
"bad": -0.5, "good": 0.5, "negative": -0.5,
"great": 0.5, "growth": 0.5, "fail": -0.5,
"failed": -0.5, "success": 0.5, "nailed": 0.5,
"beat": 0.5, "missed": -0.5, "profitable": 0.5,
"beneficial": 0.5, "right": 0.5, "positive": 0.5,
"large":0.5, "attractive": 0.5, "sound": 0.5,
"excellent": 0.5, "wrong": -0.5, "unproductive": -0.5,
"lose": -0.5, "missing": -0.5, "mishandled": -0.5,
"un_lucrative": -0.5, "up": 0.5, "down": -0.5,
"unproductive": -0.5, "poor": -0.5, "wrong": -0.5,
"worthwhile": 0.5, "lucrative": 0.5, "solid": 0.5
}
def Update(self, algorithm, data):
insights = []
news = data.Get(TiingoNews)
for article in news.Values:
words = article.Description.lower().split(" ")
score = sum([self.wordScores[word] for word in words
if word in self.wordScores])
#1. Get the underlying symbol and save to the variable symbol
symbol = article.Symbol.Underlying
#2. Add scores to the rolling window associated with its newsData symbol
self.newsData[symbol].Window.Add(score)
#3. Sum the rolling window scores, save to sentiment
# If sentiment aggregate score for the time period is greater than 5, emit an up insight
sentiment = sum(self.newsData[symbol].Window)
if sentiment > 5:
insights.append(Insight.Price(symbol, timedelta(1), InsightDirection.Up))
return insights
def OnSecuritiesChanged(self, algorithm, changes):
for security in changes.AddedSecurities:
symbol = security.Symbol
newsAsset = algorithm.AddData(TiingoNews, symbol)
self.newsData[symbol] = NewsData(newsAsset.Symbol)
for security in changes.RemovedSecurities:
newsData = self.newsData.pop(security.Symbol, None)
if newsData is not None:
algorithm.RemoveSecurity(newsData.Symbol)
from datetime import timedelta
from QuantConnect.Data.UniverseSelection import *
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel
from Portfolio.EqualWeightingPortfolioConstructionModel import EqualWeightingPortfolioConstructionModel
class SectorBalancedPortfolioConstruction(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2016, 12, 28)
self.SetEndDate(2017, 3, 1)
self.SetCash(100000)
self.UniverseSettings.Resolution = Resolution.Hour
self.SetUniverseSelection(MyUniverseSelectionModel())
self.SetAlpha(ConstantAlphaModel(InsightType.Price, InsightDirection.Up, timedelta(1), 0.025, None))
self.SetPortfolioConstruction(MySectorWeightingPortfolioConstructionModel(Resolution.Daily))
self.SetExecution(ImmediateExecutionModel())
class MyUniverseSelectionModel(FundamentalUniverseSelectionModel):
def __init__(self):
super().__init__(True, None)
def SelectCoarse(self, algorithm, coarse):
filtered = [x for x in coarse if x.HasFundamentalData and x.Price > 0]
sortedByDollarVolume = sorted(filtered, key=lambda x: x.DollarVolume, reverse=True)
return [x.Symbol for x in sortedByDollarVolume][:100]
def SelectFine(self, algorithm, fine):
filtered = [f for f in fine if f.AssetClassification.MorningstarSectorCode == MorningstarSectorCode.Technology]
self.technology = sorted(filtered, key=lambda f: f.MarketCap, reverse=True)[:3]
filtered = [f for f in fine if f.AssetClassification.MorningstarSectorCode == MorningstarSectorCode.FinancialServices]
self.financialServices = sorted(filtered, key=lambda f: f.MarketCap, reverse=True)[:2]
filtered = [f for f in fine if f.AssetClassification.MorningstarSectorCode == MorningstarSectorCode.ConsumerDefensive]
self.consumerDefensive = sorted(filtered, key=lambda f: f.MarketCap, reverse=True)[:1]
return [x.Symbol for x in self.technology + self.financialServices + self.consumerDefensive]
class MySectorWeightingPortfolioConstructionModel(EqualWeightingPortfolioConstructionModel):
def __init__(self, rebalance = Resolution.Daily):
super().__init__(rebalance)
self.symbolBySectorCode = dict()
self.result = dict()
def DetermineTargetPercent(self, activeInsights):
#1. Set the self.sectorBuyingPower before by dividing one by the length of self.symbolBySectorCode
self.sectorBuyingPower = 1/len(self.symbolBySectorCode)
for sector, symbols in self.symbolBySectorCode.items():
#2. Search for the active insights in this sector. Save the variable self.insightsInSector
self.insightsInSector = [insight for insight in activeInsights if insight.Symbol in symbols]
#3. Divide the self.sectorBuyingPower by the length of self.insightsInSector to calculate the variable percent
# The percent is the weight we'll assign the direction of the insight
self.percent = self.sectorBuyingPower / len(self.insightsInSector)
#4. For each insight in self.insightsInSector, assign each insight an allocation.
# The allocation is calculated by multiplying the insight direction by the self.percent
for insight in self.insightsInSector:
self.result[insight] = insight.Direction * self.percent
return self.result
def OnSecuritiesChanged(self, algorithm, changes):
for security in changes.AddedSecurities:
sectorCode = security.Fundamentals.AssetClassification.MorningstarSectorCode
if sectorCode not in self.symbolBySectorCode:
self.symbolBySectorCode[sectorCode] = list()
self.symbolBySectorCode[sectorCode].append(security.Symbol)
for security in changes.RemovedSecurities:
sectorCode = security.Fundamentals.AssetClassification.MorningstarSectorCode
if sectorCode in self.symbolBySectorCode:
symbol = security.Symbol
if symbol in self.symbolBySectorCode[sectorCode]:
self.symbolBySectorCode[sectorCode].remove(symbol)
super().OnSecuritiesChanged(algorithm, changes)
from datetime import timedelta
from AlgorithmImports import *
"""
[The QC Algorithm Framework is the foundation for building a robust and flexible investment strategy.
The framework architecture makes it simple to reuse code and provides the scaffolding for good algorithm design.]
* Universe Selection Model
* Alpha Model
* Portfolio Construction Model
* Execution model
"""
class SMAPairsTrading(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2018, 7, 1)
self.SetEndDate(2019, 3, 31)
self.SetCash(100000)
symbols = [Symbol.Create("PEP", SecurityType.Equity, Market.USA), Symbol.Create("KO", SecurityType.Equity, Market.USA)]
self.AddUniverseSelection(ManualUniverseSelectionModel(symbols))
self.UniverseSettings.Resolution = Resolution.Hour
self.UniverseSettings.DataNormalizationMode = DataNormalizationMode.Raw
self.AddAlpha(PairsTradingAlphaModel())
self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel())
self.SetExecution(ImmediateExecutionModel())
def OnEndOfDay(self, symbol):
self.Log("Taking a position of " + str(self.Portfolio[symbol].Quantity) + " units of symbol " + str(symbol))
class PairsTradingAlphaModel(AlphaModel):
def __init__(self):
self.pair = [ ]
self.spreadMean = SimpleMovingAverage(500)
self.spreadStd = StandardDeviation(500)
self.period = timedelta(hours=2)
def Update(self, algorithm, data):
spread = self.pair[1].Price - self.pair[0].Price
self.spreadMean.Update(algorithm.Time, spread)
self.spreadStd.Update(algorithm.Time, spread)
upperthreshold = self.spreadMean.Current.Value + self.spreadStd.Current.Value
lowerthreshold = self.spreadMean.Current.Value - self.spreadStd.Current.Value
if spread > upperthreshold:
return Insight.Group(
[
Insight.Price(self.pair[0].Symbol, self.period, InsightDirection.Up),
Insight.Price(self.pair[1].Symbol, self.period, InsightDirection.Down)
])
if spread < lowerthreshold:
return Insight.Group(
[
Insight.Price(self.pair[0].Symbol, self.period, InsightDirection.Down),
Insight.Price(self.pair[1].Symbol, self.period, InsightDirection.Up)
])
return []
def OnSecuritiesChanged(self, algorithm, changes):
self.pair = [x for x in changes.AddedSecurities]
#1. Call for 500 bars of history data for each symbol in the pair and save to the variable history
history = algorithm.History([x.Symbol for x in self.pair], 500)
#2. Unstack the Pandas data frame to reduce it to the history close price
history = history.close.unstack(level=0)
#3. Iterate through the history tuple and update the mean and standard deviation with historical data
for tuple in history.itertuples():
self.spreadMean.Update(tuple[0], tuple[2]-tuple[1])
self.spreadStd.Update(tuple[0], tuple[2]-tuple[1])
from datetime import timedelta
from AlgorithmImports import *
"""
[The QC Algorithm Framework is the foundation for building a robust and flexible investment strategy.
The framework architecture makes it simple to reuse code and provides the scaffolding for good algorithm design.]
* Universe Selection Model
* Alpha Model
* Portfolio Construction Model
* Execution model
"""
class MOMAlphaModel(AlphaModel):
def __init__(self):
self.mom = []
def OnSecuritiesChanged(self, algorithm, changes):
for security in changes.AddedSecurities:
symbol = security.Symbol
self.mom.append({"symbol":symbol, "indicator":algorithm.MOM(symbol, 14, Resolution.Daily)})
def Update(self, algorithm, data):
ordered = sorted(self.mom, key=lambda kv: kv["indicator"].Current.Value, reverse=True)
return Insight.Group([Insight.Price(ordered[0]['symbol'], timedelta(1), InsightDirection.Up), Insight.Price(ordered[1]['symbol'], timedelta(1), InsightDirection.Flat) ])
class FrameworkAlgorithm(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2013, 10, 1)
self.SetEndDate(2013, 12, 1)
self.SetCash(100000)
symbols = [Symbol.Create("SPY", SecurityType.Equity, Market.USA), Symbol.Create("BND", SecurityType.Equity, Market.USA) ]
self.UniverseSettings.Resolution = Resolution.Daily
self.SetUniverseSelection(ManualUniverseSelectionModel(symbols))
self.SetAlpha(MOMAlphaModel())
self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel())
self.SetRiskManagement(MaximumDrawdownPercentPerSecurity(0.02))
self.SetExecution(ImmediateExecutionModel())
from AlgorithmImports import *
class EMAMomentumUniverse(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2019, 1, 7)
self.SetEndDate(2019, 4, 1)
self.SetCash(100000)
self.UniverseSettings.Resolution = Resolution.Daily
self.AddUniverse(self.CoarseSelectionFunction)
self.averages = { }
def CoarseSelectionFunction(self, universe):
selected = []
universe = sorted(universe, key=lambda c: c.DollarVolume, reverse=True)
universe = [c for c in universe if c.Price > 10][:100]
for coarse in universe:
symbol = coarse.Symbol
if symbol not in self.averages:
# 1. Call history to get an array of 200 days of history data
history = self.History(symbol, 200, Resolution.Daily)
#2. Adjust SelectionData to pass in the history result
self.averages[symbol] = SelectionData(history)
self.averages[symbol].update(self.Time, coarse.AdjustedPrice)
if self.averages[symbol].is_ready() and self.averages[symbol].fast > self.averages[symbol].slow:
selected.append(symbol)
return selected[:10]
def OnSecuritiesChanged(self, changes):
for security in changes.RemovedSecurities:
self.Liquidate(security.Symbol)
for security in changes.AddedSecurities:
self.SetHoldings(security.Symbol, 0.10)
class SelectionData():
#3. Update the constructor to accept a history array
def __init__(self, history):
self.slow = ExponentialMovingAverage(200)
self.fast = ExponentialMovingAverage(50)
#4. Loop over the history data and update the indicators
for bar in history.itertuples():
self.fast.Update(bar.Index[1], bar.close)
self.slow.Update(bar.Index[1], bar.close)
def is_ready(self):
return self.slow.IsReady and self.fast.IsReady
def update(self, time, price):
self.fast.Update(time, price)
self.slow.Update(time, price)
from AlgorithmImports import *
""""
Buy and Hold with a Trailing Stop Setting up a Stop Market Order
"""
class BootCampTask(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2018, 12, 1) # Set Start Date
self.SetEndDate(2019, 4, 1) # Set End Date
self.SetCash(100000) # Set Strategy Cash
#1. Subscribe to SPY in raw mode
spy = self.AddEquity("SPY", Resolution.Daily)
spy.SetDataNormalizationMode(DataNormalizationMode.Raw)
def OnData(self, data):
if not self.Portfolio.Invested:
#2. Create market order to buy 500 units of SPY
self.MarketOrder("SPY", 500)
#3. Create a stop market order to sell 500 units at 90% of the SPY current price
self.StopMarketOrder("SPY", -500, 0.90*self.Securities["SPY"].Close)from AlgorithmImports import *
from datetime import timedelta
from QuantConnect.Data.UniverseSelection import *
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel
class LiquidValueStocks(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2017, 5, 15)
self.SetEndDate(2017, 7, 15)
self.SetCash(100000)
self.UniverseSettings.Resolution = Resolution.Hour
self.AddUniverseSelection(LiquidValueUniverseSelectionModel())
self.AddAlpha(LongShortEYAlphaModel())
self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel())
self.SetRiskManagement(MaximumDrawdownPercentPerSecurity(0.02))
self.SetExecution(ImmediateExecutionModel())
class LiquidValueUniverseSelectionModel(FundamentalUniverseSelectionModel):
def __init__(self):
super().__init__(True, None)
self.lastMonth = -1
def SelectCoarse(self, algorithm, coarse):
if self.lastMonth == algorithm.Time.month:
return Universe.Unchanged
self.lastMonth = algorithm.Time.month
sortedByDollarVolume = sorted([x for x in coarse if x.HasFundamentalData],
key=lambda x: x.DollarVolume, reverse=True)
return [x.Symbol for x in sortedByDollarVolume[:100]]
def SelectFine(self, algorithm, fine):
sortedByYields = sorted(fine, key=lambda f: f.ValuationRatios.EarningYield, reverse=True)
universe = sortedByYields[:10] + sortedByYields[-10:]
return [f.Symbol for f in universe]
# Define the LongShortAlphaModel class
class LongShortEYAlphaModel(AlphaModel):
def __init__(self):
self.lastMonth = -1
def Update(self, algorithm, data):
insights = []
if self.lastMonth == algorithm.Time.month:
return insights
self.lastMonth = algorithm.Time.month
# For loop to emit insights with insight directions
# based on whether earnings yield is greater or less than zero once a month
for security in algorithm.ActiveSecurities.Values:
direction = 1 if security.Fundamentals.ValuationRatios.EarningYield > 0 else -1
insights.append(Insight.Price(security.Symbol, timedelta(28), direction))
return insightsfrom datetime import timedelta
from AlgorithmImports import *
class P03(QCAlgorithm):
''''
[Opening Range Breakout]
Opening range breakout uses a defined period of time to set a price-range,
and trades on leaving that range. To achieve this we will start by consolidating the first 30 minutes of data.
Close < Bar(30m).low, SHORT
Close > Bar(30m).high, LOONG
Liquidate @ 13:30
'''
openingBar = None
def Initialize(self):
self.SetStartDate(2018, 7, 10) # Set Start Date
self.SetEndDate(2019, 6, 30) # Set End Date
self.SetCash(100000) # Set Strategy Cash
# Subscribe to TSLA with Minute Resolution
self.AddEquity("TSLA", Resolution.Minute)
# Create our consolidator with a timedelta of 30 min
self.Consolidate("TSLA", timedelta(minutes=30), self.OnDataConsolidated)
# Create a scheduled event triggered at 13:30 calling the ClosePositions function
self.Schedule.On(self.DateRules.EveryDay("TSLA"), self.TimeRules.At(13,30), self.ClosePositions)
def OnData(self, data):
if self.Portfolio.Invested or self.openingBar==None:
pass
else:
if data["TSLA"].Close < self.openingBar.Low:
self.SetHoldings("TSLA", -1)
if data["TSLA"].Close > self.openingBar.High:
self.SetHoldings("TSLA", 1)
#2. Create a function OnDataConsolidator which saves the currentBar as bar
def OnDataConsolidated(self, bar):
# Check the time, we only want to work with the first 30min after Market Open
# Save one bar as openingBar
if bar.Time.hour == 9 and bar.Time.minute == 30:
self.openingBar = bar
def ClosePositions(self):
self.openingBar = None
self.Liquidate("TSLA")from datetime import datetime
from AlgorithmImports import *
class P01(QCAlgorithm):
# Order ticket for our stop order, Datetime when stop order was last hit
stopMarketTicket = None
stopMarketOrderFillTime = datetime.min
highestSPYPrice = 0
def Initialize(self):
self.SetStartDate(2010, 12, 1)
self.SetEndDate(2020, 12, 10)
self.SetCash(100000)
spy = self.AddEquity("SPY", Resolution.Daily)
spy.SetDataNormalizationMode(DataNormalizationMode.Raw)
def OnData(self, data):
# self.Log(self.Securities["SPY"].Close)
# Plot the current SPY price to "Data Chart" on series "Asset Price"
self.Plot("Data Chart", "Asset Price", data["SPY"].Close)
# at least hold for 15 days
if (self.Time - self.stopMarketOrderFillTime).days < 15:
return
if not self.Portfolio.Invested:
self.MarketOrder("SPY", 500)
# 当市场价格下降到【买入价格】的90%时,以市场价格卖出
self.stopMarketTicket = self.StopMarketOrder("SPY", -500, round(0.9 * self.Securities["SPY"].Close, 2))
else:
# Plot the moving stop price on "Data Chart" with "Stop Price" series name
self.Plot("Data Chart", "Stop Price", self.stopMarketTicket.Get(OrderField.StopPrice))
# Check if the SPY price is higher that highestSPYPrice.
if self.Securities["SPY"].Close > self.highestSPYPrice:
#2. Save the new high to highestSPYPrice; then update the stop price to 90% of highestSPYPrice
self.highestSPYPrice = self.Securities["SPY"].Close
updateFields = UpdateOrderFields()
updateFields.StopPrice = round(self.highestSPYPrice * 0.9, 2)
self.stopMarketTicket.Update(updateFields)
#3. Print the new stop price with Debug()
self.Debug("SPY: " + str(self.highestSPYPrice) + " Stop: " + str(updateFields.StopPrice))
def OnOrderEvent(self, orderEvent):
if orderEvent.Status != OrderStatus.Filled:
return
if self.stopMarketTicket is not None and self.stopMarketTicket.OrderId == orderEvent.OrderId:
self.stopMarketOrderFillTime = self.Timefrom AlgorithmImports import *
''''
[Setting Up a Coarse Universe Filter]
1. Sort descending by daily dollar volume
2. Select only Symbols with a price of more than $10 per share
3. Return the 8 most liquid Symbols from the filteredByPrice list
'''
class P04(QCAlgorithm):
filteredByPrice = None
def Initialize(self):
self.SetStartDate(2019, 1, 11)
self.SetEndDate(2019, 7, 1)
self.SetCash(100000)
# Add a Universe model using Coarse Fundamental Data and set the filter function
self.AddUniverse(self.CoarseSelectionFilter)
self.UniverseSettings.Resolution = Resolution.Daily
self.UniverseSettings.Leverage = 2
def CoarseSelectionFilter(self, coarse):
# Sort descending by daily dollar volume
sortedByDollarVolume = sorted(coarse, key=lambda c: c.DollarVolume, reverse=True)
# Select only Symbols with a price of more than $10 per share
self.filteredByPrice = [c.Symbol for c in sortedByDollarVolume if c.Price > 10]
# Return the 8 most liquid Symbols from the filteredByPrice list
self.filteredByPrice = self.filteredByPrice[:8]
return self.filteredByPrice
# Monitoring Universe Changes
def OnSecuritiesChanged(self, changes):
# Save securities changed as self.changes
self.changes = changes
# Log the changes in the function
self.Log(f"OnSecuritiesChanged({self.Time}):: {changes}")
# Liquidate removed securities
for security in changes.RemovedSecurities:
if security.Invested:
self.Liquidate(security.Symbol)
# We want 10% allocation in each security in our universe
for security in changes.AddedSecurities:
self.SetHoldings(security.Symbol, 0.1)from AlgorithmImports import *
""""
Fading The Gap
The difference between the close price of the previous day
and the opening price of the current day is referred to as a gap.
Fading the gap is a strategy that monitors for a large gap down and buys stock assuming it will rebound.
"""
class FadingTheGap(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2017, 1, 1)
self.SetEndDate(2021, 7, 1)
self.SetCash(100000)
self.tsla = self.AddEquity ("TSLA", Resolution.Minute).Symbol
# Create a scheduled event to run every day at "0 minutes" before
# TSLA market close that calls the method ClosingBar
self.Schedule.On(self.DateRules.EveryDay(),
self.TimeRules.BeforeMarketClose(self.tsla, 0),
self.ClosingBar)
# Create a scheduled event to run every day at 1 minute after
# TSLA market open that calls the method OpeningBar
self.Schedule.On(self.DateRules.EveryDay(),
self.TimeRules.AfterMarketOpen(self.tsla, 1),
self.OpeningBar)
# Create a scheduled event to run every day at 45 minutes after
# TSLA market open that calls the method ClosePositions
self.Schedule.On(self.DateRules.EveryDay(),
self.TimeRules.AfterMarketOpen(self.tsla, 45),
self.ClosePositions)
# Save a RollingWindow with type TradeBar and length of 2 as self.window
self.window = RollingWindow[TradeBar](2)
# calculating TSLA volatility for a 60 minutes
self.volatility = StandardDeviation(self.tsla, 60)
def ClosingBar(self):
self.window.Add(self.CurrentSlice[self.tsla])
def OpeningBar(self):
if "TSLA"in self.CurrentSlice.Bars:
self.window.Add(self.CurrentSlice[self.tsla])
if not self.window.IsReady or not self.volatility.IsReady:
return
# Calculate the change in over night price
delta = self.window[0].Value - self.window[1].Value
self.deviations = delta / self.volatility.Current.Value
# SetHoldings() to 100% TSLA if deviations is less than -3 standard deviations (i.e. when we are 99% confident this gap is an anomaly).
if self.deviations < -3 :
self.SetHoldings(self.tsla, 1)
self.Debug("Open: " + str(self.window[0].Value) + "LastClose: " + str(self.window[1].Value))
# Liquidate our position to limit our exposure and keep our holding period short
def ClosePositions(self):
self.Liquidate()
def OnData(self, data):
# self.Debug("Call Event: " + str(self.CurrentSlice[self.tsla].Price))
if data[self.tsla] is not None:
self.volatility.Update(self.Time, data[self.tsla].Close)
from AlgorithmImports import *
''''
[Momentum-Based Tactical Allocation]
Momentum Percentage Indicator: computes the n-period percent change in the security
If SPY has more upward momentum than BND, then we liquidate our holdings in BND and
allocate 100% of our equity to SPY
'''
class P02(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2007, 8, 1) # Set Start Date
self.SetEndDate(2010, 8, 1) # Set End Date
#1. Subscribe to SPY -- S&P 500 Index ETF -- using daily resolution
self.spy = self.AddEquity("SPY", Resolution.Daily)
#2. Subscribe to BND -- Vanguard Total Bond Market ETF -- using daily resolution
self.bnd = self.AddEquity("BND", Resolution.Daily)
#3. Set strategy cash to $3000
self.SetCash(3000)
# Add 50-day Momentum Percent indicator:
# Momentum Percentage Indicator: computes the n-period percent change in the security
self.spyMomentum = self.MOMP("SPY", 50, Resolution.Daily)
self.bondMomentum = self.MOMP("BND", 50, Resolution.Daily)
# Set Benchmark
self.SetBenchmark("SPY")
self.SetWarmUp(50)
def OnData(self, data):
if self.IsWarmingUp:
return
if not self.Time.weekday() == 1:
return
# If SPY has more upward momentum than BND, then we liquidate our holdings in BND and allocate 100% of our equity to SPY
if self.spyMomentum.Current.Value > self.bondMomentum.Current.Value:
self.Liquidate("BND")
self.SetHoldings("SPY", 1) # Allocate 100% to SPY
else:
self.Liquidate("SPY")
self.SetHoldings("BND", 1)