| Overall Statistics | |
|---|---|
| Total Trades | 0 |
| Average Win | 0% |
| Average Loss | 0% |
| Compounding Annual Return | 0% |
| Drawdown | 0% |
| Expectancy | 0 |
| Net Profit | 0% |
| Sharpe Ratio | 0 |
| Probabilistic Sharpe Ratio | 0% |
| Loss Rate | 0% |
| Win Rate | 0% |
| Profit-Loss Ratio | 0 |
| Alpha | 0 |
| Beta | 0 |
| Annual Standard Deviation | 0 |
| Annual Variance | 0 |
| Information Ratio | 3.413 |
| Tracking Error | 0.191 |
| Treynor Ratio | 0 |
| Total Fees | $0.00 |
| Estimated Strategy Capacity | $0 |
| Lowest Capacity Asset | |
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from AlgorithmImports import *
from QuantConnect.Logging import *
from enum import Enum
class RsiAlphaModel(AlphaModel):
    '''Uses Wilder's RSI to create insights.

    Using default settings, a cross over below 30 or above 70 will trigger a new insight.'''

    Name = "RsiAlphaModel"

    def __init__(self,
                 period = 14,
                 resolution = Resolution.Minute):
        '''Initializes a new instance of the RsiAlphaModel class
        Args:
            period: The RSI indicator period
            resolution: The resolution used for the RSI indicator and the warm-up history request'''
        self.period = period
        self.resolution = resolution
        # each insight stays valid for `period` bars of the chosen resolution
        self.insightPeriod = Time.Multiply(Extensions.ToTimeSpan(resolution), period)
        self.symbolDataBySymbol = {}

        resolutionString = Extensions.GetEnumString(resolution, Resolution)
        self.Name = '{}({},{})'.format(self.__class__.__name__, period, resolutionString)

        # No algorithm instance is available at construction time and this model
        # defines no Log method of its own, so write to the engine trace log instead.
        Log.Trace('Alpha: initialize()')

    def Update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]:
        '''Updates this alpha model with the latest data from the algorithm.
        This is called each time the algorithm receives data for subscribed securities
        Args:
            algorithm: The algorithm instance
            data: The new data available
        Returns:
            The new insights generated'''
        # log through the algorithm passed in; the model never stores an algorithm reference
        algorithm.Log('Alpha: update()')

        insights = []
        for symbol, symbolData in self.symbolDataBySymbol.items():
            rsi = symbolData.RSI
            previous_state = symbolData.State
            state = self.GetState(rsi, previous_state)

            # only emit on a state change, and only once the indicator is ready
            if state != previous_state and rsi.IsReady:
                if state == State.TrippedLow:
                    insights.append(Insight.Price(symbol, self.insightPeriod, InsightDirection.Up))
                if state == State.TrippedHigh:
                    insights.append(Insight.Price(symbol, self.insightPeriod, InsightDirection.Down))

            symbolData.State = state

        return insights

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Cleans out old security data and initializes the RSI for any newly added securities.
        Event fired each time we add/remove securities from the data feed
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm'''
        # clean up data for removed securities
        symbols = [x.Symbol for x in changes.RemovedSecurities]
        if symbols:
            for subscription in algorithm.SubscriptionManager.Subscriptions:
                if subscription.Symbol in symbols:
                    self.symbolDataBySymbol.pop(subscription.Symbol, None)
                    subscription.Consolidators.Clear()

        # initialize data for added securities
        addedSymbols = [x.Symbol for x in changes.AddedSecurities if x.Symbol not in self.symbolDataBySymbol]
        if not addedSymbols:
            return

        history = algorithm.History(addedSymbols, self.period, self.resolution)

        for symbol in addedSymbols:
            rsi = algorithm.RSI(symbol, self.period, MovingAverageType.Wilders, self.resolution)

            if not history.empty:
                ticker = SymbolCache.GetTicker(symbol)

                if ticker not in history.index.levels[0]:
                    Log.Trace(f'RsiAlphaModel.OnSecuritiesChanged: {ticker} not found in history data frame.')
                    # symbols without warm-up history are not tracked by this model
                    continue

                # warm the indicator up with the historical closes
                for bar in history.loc[ticker].itertuples():
                    rsi.Update(bar.Index, bar.close)

            self.symbolDataBySymbol[symbol] = SymbolData(symbol, rsi)
            algorithm.Log(f'Alpha: OSC Sym,RSI = {symbol},{rsi}')

    def GetState(self, rsi, previous):
        '''Determines the new state. This is cross-over detection logic with a
        5-point margin: above 70 trips high, below 30 trips low, and a tripped state
        only returns to Middle once the RSI moves past 35 (from low) or 65 (from high).
        Args:
            rsi: The RSI indicator whose current value is evaluated
            previous: The state recorded on the previous update
        Returns:
            The new state, or the previous one when no threshold was crossed'''
        if rsi.Current.Value > 70:
            return State.TrippedHigh
        if rsi.Current.Value < 30:
            return State.TrippedLow
        if previous == State.TrippedLow:
            if rsi.Current.Value > 35:
                return State.Middle
        if previous == State.TrippedHigh:
            if rsi.Current.Value < 65:
                return State.Middle

        return previous
class SymbolData:
    '''Per-symbol record used by the alpha model: the symbol, its RSI indicator and the last recorded state'''

    def __init__(self, symbol, rsi):
        '''Stores the symbol and its RSI indicator
        Args:
            symbol: The symbol this record belongs to
            rsi: The RSI indicator registered for the symbol'''
        # a freshly created record always starts in the Middle state
        self.State = State.Middle
        self.RSI = rsi
        self.Symbol = symbol
class State(Enum):
    '''RSI state recorded per symbol. Keeping the previous state avoids signal spamming and helps bounce detection.'''

    # lower threshold was crossed
    TrippedLow = 0
    # neutral state, also the starting state
    Middle = 1
    # upper threshold was crossed
    TrippedHigh = 2
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from AlgorithmImports import *
class SpreadExecutionModel(ExecutionModel):
    '''Execution model that only sends market orders while the bid/ask spread is tight.
    Note this execution model will not work using Resolution.Daily since Exchange.ExchangeOpen will be false, suggested resolution is Minute
    '''

    def __init__(self, acceptingSpreadPercent=0.005):
        '''Creates the model
        Args:
            acceptingSpreadPercent: Largest accepted spread relative to the current price; the sign is discarded'''
        # maximum spread, compared to the current price, at which orders are still submitted
        self.acceptingSpreadPercent = Math.Abs(acceptingSpreadPercent)
        self.targetsCollection = PortfolioTargetCollection()

    def Execute(self, algorithm, targets):
        '''Submits a market order for every target with an outstanding quantity whose spread is acceptable.
        Args:
            algorithm: The algorithm instance
            targets: The portfolio targets'''
        pending = self.targetsCollection

        # merge the incoming targets into the full set being tracked
        pending.AddRange(targets)

        # OrderByMarginImpact and ClearFulfilled are expensive, skip them when nothing is tracked
        if pending.Count <= 0:
            return

        for target in pending.OrderByMarginImpact(algorithm):
            # quantity still to be ordered for this target
            quantity = OrderSizing.GetUnorderedQuantity(algorithm, target)
            if quantity == 0:
                continue

            if self.SpreadIsFavorable(algorithm.Securities[target.Symbol]):
                algorithm.MarketOrder(target.Symbol, quantity)

        pending.ClearFulfilled(algorithm)

    def SpreadIsFavorable(self, security):
        '''Tells whether the security's spread is within the accepted range.
        Args:
            security: The security whose exchange hours and quotes are checked
        Returns:
            True when the exchange is open, all prices are positive and the relative spread is within the limit'''
        # outside exchange hours spreads can be extreme, so never trade then
        if not security.Exchange.ExchangeOpen:
            return False

        # positive prices guard against division by zero and a negative spread percentage
        if not (security.Price > 0 and security.AskPrice > 0 and security.BidPrice > 0):
            return False

        spread = security.AskPrice - security.BidPrice
        return spread / security.Price <= self.acceptingSpreadPercent
#region imports
from AlgorithmImports import *
from clr import AddReference
from clr import GetClrType as typeof
AddReference("System.Core")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
from System import *
from QuantConnect import *
from QuantConnect.Algorithm import QCAlgorithm
from QuantConnect import Symbol
from QuantConnect.Data.UniverseSelection import *
from QuantConnect.Securities.Option import OptionPriceModels
from datetime import datetime, timedelta
from io import StringIO
import pandas as pd
import math
from Selection.UniverseSelectionModel import UniverseSelectionModel
#endregion
class CustomSecurityInitializer(BrokerageModelSecurityInitializer):
    '''Security initializer that configures equities and options before delegating to the brokerage-model initializer.'''

    def __init__(self, brokerage_model, security_seeder, algorithm):
        '''Keeps a reference to the algorithm and derives the look-back length from it
        Args:
            brokerage_model: Passed through to the base initializer
            security_seeder: Passed through to the base initializer
            algorithm: The algorithm instance; its `period` attribute is read here'''
        super().__init__(brokerage_model, security_seeder)
        self.algorithm = algorithm
        # one bar more than the algorithm's own period
        self.period = algorithm.period + 1

    def Initialize(self, security):
        '''Configures the given security and logs its prices
        Args:
            security: The security to initialize'''
        algo = self.algorithm
        kind = security.Type

        # custom data (Base) securities are left untouched
        if kind == SecurityType.Base:
            return

        if kind == SecurityType.Equity:
            security.SetDataNormalizationMode(DataNormalizationMode.Raw)
            security.VolatilityModel = StandardDeviationOfReturnsVolatilityModel(self.period)

            # feed daily history into the volatility model
            daily_bars = algo.History(security.Symbol, self.period, Resolution.Daily)
            for key, bar in daily_bars.iterrows():
                # second element of the index key is used as the bar time
                warmup_bar = TradeBar(key[1], security.Symbol, bar.open, bar.high, bar.low, bar.close, bar.volume)
                security.VolatilityModel.Update(security, warmup_bar)
        elif kind == SecurityType.Option:
            security.PriceModel = OptionPriceModels.CrankNicolsonFD()
            algo.security = security

        # The superclass method will get the last known prices
        super().Initialize(security)

        # Report the prices that were set; the detailed line is only written in live mode
        message = f'{self.algorithm.Time} :: Security Initializer :: Symbol: {security.Symbol.Value} | Price: {security.Price} | Ask: {security.AskPrice} | Bid: {security.BidPrice}'
        if algo.LiveMode:
            algo.Log(message)
        algo.Log(f'{self.algorithm.Time}| CSI: {security} | {security.Type}')
#alertedOptions went here
#OnSecuritiesChanged went here
class OptionAlerts(PythonData):
    '''Custom data type for option alerts read from a semicolon-separated remote file.

    Each data row becomes one OptionAlerts instance whose Symbol is the alerted option contract.'''

    def __init__(self):
        self.count = 0
        self.last_date = datetime.min

    def GetSource(self, config, date, isLiveMode):
        '''Returns the remote file to read; live mode and backtests use different files.
        Args:
            config: Subscription configuration (unused)
            date: Date of the requested data (unused)
            isLiveMode: True when running live
        Returns:
            A SubscriptionDataSource pointing at the remote file'''
        url = "https://files.mailparser.io/d/lztyzgbf?dl=1" if isLiveMode else \
            "https://files.mailparser.io/d/ofsoxaee?dl=1"
        return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)

    def Reader(self, config, line, date, isLiveMode):
        '''Parses one line of the source file into an alert.
        Args:
            config: Subscription configuration (unused)
            line: Raw line of the source file
            date: Date of the requested data
            isLiveMode: True when running live
        Returns:
            An OptionAlerts instance, or None for the header row, a blank line,
            or a row whose option right is neither "put" nor "call"'''
        is_data_row = bool(line.strip()) and not line.startswith('"Message ID"')

        # Counter bookkeeping: always done in live mode; in a backtest only
        # rows that carry no data reach it, as data rows return straight from parsing.
        if isLiveMode or not is_data_row:
            if self.last_date != date:
                # reset our counter for the new day
                self.last_date = date
                self.count = 0

        if not is_data_row:
            return None

        if isLiveMode:
            return self._parse_alert(line, 1, '"%Y-%m-%d %H:%M:%S"')
        # the backtest file keeps the timestamp in a different column and date format
        return self._parse_alert(line, 11, '"%m-%d-%Y %H:%M:%S"')

    def _parse_alert(self, line, time_column, time_format):
        '''Builds an alert from one data row.
        Args:
            line: Raw semicolon-separated data row
            time_column: Index of the column holding the alert timestamp
            time_format: strptime format of that timestamp (includes the surrounding quotes)
        Returns:
            The populated OptionAlerts instance, or None when the option right is unknown'''
        fields = line.split(';')

        alert = OptionAlerts()
        alert.Time = datetime.strptime(fields[time_column], time_format)
        alert.EndTime = alert.Time + timedelta(minutes=1)
        alert["ID"] = str(fields[0])
        alert["Size"] = int(fields[3])
        alert["Ticker"] = str(fields[4])
        alert["Strike"] = float(fields[5])
        alert["Right"] = str(fields[6])
        alert["Expiry"] = datetime.strptime(fields[7], "%Y-%m-%d")
        alert["TradePrice"] = float(fields[8])
        alert["Premium"] = float(fields[9])
        alert["Side"] = str(fields[10])

        # numeric codes handed to Symbol.CreateOption
        # NOTE(review): presumably OptionRight.Call == 0 / OptionRight.Put == 1 - confirm
        optionRight = {"call": 0, "put": 1}.get(alert["Right"])
        if optionRight is None:
            # an unrecognised right previously left optionRight unbound and crashed; skip the row instead
            return None

        underlying = Symbol.Create(alert["Ticker"], SecurityType.Equity, Market.USA)
        # NOTE(review): the literal 0 is presumably OptionStyle.American - confirm.
        # The premium is not encoded in the symbol (an alias was considered for it).
        alert.Symbol = Symbol.CreateOption(underlying, Market.USA, 0, optionRight, alert["Strike"], alert["Expiry"])
        return alert
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from AlgorithmImports import *
from Portfolio.EqualWeightingPortfolioConstructionModel import *
class AccumulativeInsightPortfolioConstructionModel(EqualWeightingPortfolioConstructionModel):
    '''Implementation of IPortfolioConstructionModel that allocates a fixed percent of the
    account to each insight (3% by default) and accumulates it per symbol.
    Up insights produce long targets, Down insights produce short targets.
    By default, no rebalancing shall be done.
    Rules:
        1. On active Up insight, increase position size by percent
        2. On active Down insight, decrease position size by percent
        3. On active Flat insight, move by percent towards 0
        4. On expired insight, and no other active insight, emits a 0 target'''

    def __init__(self, rebalance = None, portfolioBias = PortfolioBias.LongShort, percent = 0.03):
        '''Initialize a new instance of AccumulativeInsightPortfolioConstructionModel
        Args:
            rebalance: Rebalancing parameter. If it is a timedelta, date rules or Resolution, it will be converted into a function.
                       If None will be ignored.
                       The function returns the next expected rebalance time for a given algorithm UTC DateTime.
                       The function returns null if unknown, in which case the function will be called again in the
                       next loop. Returning current time will trigger rebalance.
            portfolioBias: Specifies the bias of the portfolio (Short, Long/Short, Long)
            percent: percent of portfolio to allocate to each position'''
        super().__init__(rebalance)
        # the sign of `percent` is discarded
        self.percent = abs(percent)
        self.portfolioBias = portfolioBias
        # signum: -1 for negative, 1 for positive, 0 otherwise
        self.sign = lambda x: (x > 0) - (x < 0)

    def DetermineTargetPercent(self, activeInsights):
        '''Computes the accumulated target percent for each insight
        Args:
            activeInsights: The active insights to generate a target for
        Returns:
            A dictionary mapping each given insight to its symbol's accumulated percent'''
        accumulated = {}

        # replay the active insights from oldest to newest
        chronological = sorted(
            self.InsightCollection.GetActiveInsights(self.currentUtcTime),
            key=lambda item: item.GeneratedTimeUtc)

        for insight in chronological:
            symbol = insight.Symbol
            running = accumulated.get(symbol, 0)

            if insight.Direction == InsightDirection.Flat:
                if abs(running) < self.percent:
                    # a full step would overshoot zero, so land exactly on it
                    running = 0
                elif running > 0:
                    running -= self.percent
                else:
                    running += self.percent

            running += self.percent * insight.Direction

            # zero out anything that violates a one-sided portfolio bias
            biased = self.portfolioBias != PortfolioBias.LongShort
            if biased and self.sign(running) != self.portfolioBias:
                running = 0

            accumulated[symbol] = running

        return {insight: accumulated[insight.Symbol] for insight in activeInsights}

    def CreateTargets(self, algorithm, insights):
        '''Records the current UTC time, then lets the base model build the targets
        Args:
            algorithm: The algorithm instance
            insights: The insights to create portfolio targets from
        Returns:
            An enumerable of portfolio targets to be sent to the execution model'''
        # DetermineTargetPercent reads this when asking for the active insights
        self.currentUtcTime = algorithm.UtcTime
        return super().CreateTargets(algorithm, insights)
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from AlgorithmImports import *
class MaximumDrawdownPercentPerSecurity(RiskManagementModel):
    '''Implementation of IRiskManagementModel that caps the drawdown of each holding at a given percentage'''

    def __init__(self, maximumDrawdownPercent = 0.1):
        '''Creates the model
        Args:
            maximumDrawdownPercent: The maximum percentage drawdown allowed for any single security holding'''
        # stored as a negative number so it compares directly against unrealized profit percent
        self.maximumDrawdownPercent = -abs(maximumDrawdownPercent)

    def ManageRisk(self, algorithm, targets):
        '''Builds liquidation targets for holdings whose loss exceeds the limit
        Args:
            algorithm: The algorithm instance
            targets: The current portfolio targets (not read; a fresh list is returned)
        Returns:
            A list with one zero-quantity PortfolioTarget per invested security past the limit'''
        limit = self.maximumDrawdownPercent
        liquidations = []

        for entry in algorithm.Securities:
            holding = entry.Value
            if holding.Invested and holding.Holdings.UnrealizedProfitPercent < limit:
                # target of 0 liquidates the position
                liquidations.append(PortfolioTarget(holding.Symbol, 0))

        return liquidations
# region imports
from AlphaModelRSI import *
from ExecutionModelSpread import *
from PortfolioModelAccumulative import *
from RiskModelMaxDrawdownPercent import *
from OptionAlertsAndInitializer import *
from AlgorithmImports import *
# endregion
'''NOTES:
Alerts enter universe
TASKS:
[] Start on Alpha
[] Subscribe to underlying at hourly resolution'''
class leanBWcciRSI(QCAlgorithm):
    '''Algorithm whose universe is driven by the OptionAlerts custom data feed.'''

    def Initialize(self):
        '''Sets up dates, cash, brokerage, the security initializer, the alert universe and the framework models.'''
        # backtest window and starting capital
        self.SetStartDate(2022, 2, 10)
        self.SetEndDate(2022, 2, 20)
        self.SetCash(100000)
        self.SetBrokerageModel(BrokerageName.TradierBrokerage, AccountType.Cash)

        # shared look-back length, read by the security initializer
        self.period = 30

        # initializer that configures every security as it is added
        initializer = CustomSecurityInitializer(
            self.BrokerageModel,
            FuncSecuritySeeder(self.GetLastKnownPrices),
            self)
        self.SetSecurityInitializer(initializer)

        # custom universe fed by the alert data, plus a plain subscription to the same data
        self.UniverseSettings.Resolution = Resolution.Minute
        self.AddUniverse(OptionAlerts, "universe-option-alerts", Resolution.Minute, self.alertedOptions)
        self.AddData(OptionAlerts, "option-alerts", Resolution.Minute)

        # active alpha and risk modules
        self.AddAlpha(RsiAlphaModel())
        self.AddRiskManagement(MaximumDrawdownPercentPerSecurity())

        # portfolio construction and execution are deliberately the null models
        self.SetPortfolioConstruction(NullPortfolioConstructionModel())
        self.SetExecution(NullExecutionModel())

    def alertedOptions(self, data):
        '''Universe selector: returns every alerted option symbol followed by the distinct underlyings.
        Args:
            data: The alerts delivered for the universe
        Returns:
            A list of option symbols plus their de-duplicated underlying symbols'''
        options = []
        underlyings = set()
        for alert in data:
            options.append(alert.Symbol)
            underlyings.add(alert.Symbol.Underlying)
        return options + list(underlyings)

    def OnSecuritiesChanged(self, changes):
        '''Logs universe changes and initializes each added security.
        Args:
            changes: The security additions and removals'''
        for security in changes.RemovedSecurities:
            self.Log(f'{self.Time}| Removed {security} from universe')

        for security in changes.AddedSecurities:
            self.SecurityInitializer.Initialize(security)

            kind = security.Symbol.SecurityType
            if kind == SecurityType.Equity:
                # remember the ticker of the most recently added equity
                self.underlying = security.Symbol.Value
                self.Log(f'{self.Time}| Equity added to Universe: {self.underlying}')
            elif kind == SecurityType.Option:
                # remember the most recently added option
                self.security = security
                self.Log(f'{self.Time}| Option added to Universe: {security}')

    def OnData(self, data: Slice):
        '''No per-slice handling is done here.'''
        pass