| Overall Statistics |
|
Total Trades 7 Average Win 1.60% Average Loss -43.21% Compounding Annual Return 0% Drawdown 116.500% Expectancy -0.309 Net Profit -116.531% Sharpe Ratio -0.568 Sortino Ratio -0.14 Probabilistic Sharpe Ratio 0.000% Loss Rate 33% Win Rate 67% Profit-Loss Ratio 0.04 Alpha 0 Beta 0 Annual Standard Deviation 1.644 Annual Variance 2.704 Information Ratio -0.564 Tracking Error 1.644 Treynor Ratio 0 Total Fees $1248.25 Estimated Strategy Capacity $2000.00 Lowest Capacity Asset AMD 3025CPPRRW1C6|AMD R735QTJ8XC9X Portfolio Turnover -1.38% |
# region imports
from AlgorithmImports import *
# endregion
class CreativeBlackMule(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2015, 1, 1)
self.SetEndDate(2015, 12, 1)
self.Balance = 100000
tickers = ["SPY", "QQQ", "IWM", "NVDA", "AAPL", "AMD", "AMZN", "MSFT"]
self.symbols = [self.AddEquity(ticker, Resolution.Daily).Symbol for ticker in tickers]
self.SetWarmUp(timedelta(100))
#self.rocp = {symbol: self.ROCP(symbol, 30) for symbol in symbols}
self.EnableAutomaticIndicatorWarmUp = True
self.indicator = {symbol: self.DCH(symbol, 90, 90) for symbol in self.symbols}
self.SetSecurityInitializer(self.customSecurityInitializer)
self.DCH_previous_Up = {symbol: None for symbol in self.symbols}
self.DCH_previous_Down = {symbol: None for symbol in self.symbols}
self.DCH_previous_Middle = {symbol: None for symbol in self.symbols}
self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Cash)
self.vix = self.AddData(CBOE, "VIX").Symbol
self.IVlvl = 0.5
self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.AfterMarketOpen("SPY", 60), self.AfterMarketOpen)
self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.AfterMarketOpen("SPY", 30), self.VIX)
def OnData(self, data: Slice):
if self.IsWarmingUp: return
for symbol in self.symbols:
if symbol in data.Bars:
bar = data.Bars[symbol]
if self.indicator[symbol].IsReady:
if self.DCH_previous_Up[symbol] is not None and self.DCH_previous_Down[symbol] is not None:
if ((bar.Close - self.DCH_previous_Up[symbol]) / (bar.Close) > 0.025) and self.rank > self.IVlvl:
security = self.Securities[symbol]
self.buyoptions(security)
if ((bar.Close - self.DCH_previous_Down[symbol]) / (bar.Close) < -0.01) and self.rank > self.IVlvl:
security = self.Securities[symbol]
self.buyputs(security)
#if bar.Close <= self.DCH_previous_Down[symbol]:
#if self.Portfolio[symbol].IsLong:
#self.SetHoldings(symbol, 0)
#self.SetHoldings(symbol, -0.1)
#if not self.Portfolio[symbol].IsShort:
#self.SetHoldings(symbol, -0.1)
#self.Debug("Shorting")
#if not self.can_short and self.Portfolio[symbol].IsLong:
#self.SetHoldings(symbol, 0)
self.DCH_previous_Up[symbol] = self.indicator[symbol].UpperBand.Current.Value
self.DCH_previous_Down[symbol] = self.indicator[symbol].LowerBand.Current.Value
self.DCH_previous_Middle[symbol] = self.indicator[symbol].Current.Value
# Plot indicator and prices
#self.Plot(f"{symbol}", "Donchian Channel High", self.DCH_previous_Up[symbol])
#self.Plot(f"{symbol}", "High Price", bar.High)
#self.Plot(f"{symbol}", "Close Price", bar.Close)
#self.Plot(f"{symbol}", "Low Price", bar.Low)
#self.Plot(f"{symbol}", "Donchian Channel Low", self.DCH_previous_Down[symbol])
#self.DCH_previous_Up[symbol] = self.indicator[symbol].UpperBand.Current.Value
#self.DCH_previous_Down[symbol] = self.indicator[symbol].LowerBand.Current.Value
#for symbol, symbols in slice.Bars.items():
#if symbol_data not in data.Bars:
#return
#bar = data.Bars[symbol_data]
#if self.DCH_previous_Up is not None and self.DCH_previous_Down is not None:
#if bar.Close > self.DCH_previous_Up and not self.Portfolio[self.symbols].IsLong:
#self.SetHoldings(self.symbols, 1)
#if bar.Close <= self.DCH_previous_Down:
#if self.can_short and not self.Portfolio[self.symbols].IsShort:
#self.SetHoldings(self.symbols, -1)
#if not self.can_short and self.Portfolio[self.symbols].IsLong:
#self.SetHoldings(self.symbols, 0)
#self.DCH_previous_Up = self.indicator.UpperBand.Current.Value
#self.DCH_previous_Down = self.indicator.LowerBand.Current.Value
def AfterMarketOpen(self):
option_invested = [x.Key for x in self.Portfolio if x.Value.Invested and x.Value.Type==SecurityType.Option]
if option_invested:
#self.SetRuntimeStatistic("Total Profit", (f"$ {round(self.Portfolio.TotalPortfolioValue - 10000)}"))
for i, symbol in enumerate(option_invested):
if (self.Time + timedelta(10) > option_invested[i].ID.Date):
self.Liquidate(option_invested[i], tag = f"Expiration - Profit/Loss @ {str(self.Portfolio[option_invested[i]].LastTradeProfit)}")
self.Debug (f"| {self.Time} [+]--- Liquidate Call @ {str(self.Portfolio[option_invested[i]].Symbol.Value)} || Stock @ {str(self.Securities[option_invested[i].Underlying.Value].Price)}|| Profit/Loss @ {str(self.Portfolio[option_invested[i]].LastTradeProfit)}")
self.Log (f"| {self.Time} [-]--- REASON: || <{(10)} DTE | {(option_invested[i].ID.Date - self.Time).days} DTE")
def customSecurityInitializer(self, security):
bar = self.GetLastKnownPrice(security)
security.SetMarketPrice(bar)
def buyoptions(self, security):
security.SetDataNormalizationMode(DataNormalizationMode.Raw)
# Filter for out of the money call options and expiring at least 60 days
option_chain = self.OptionChainProvider.GetOptionContractList(security.Symbol, self.Time)
call_options = [option for option in option_chain if option.ID.OptionRight == 0]
otm_call_options = sorted([option for option in call_options if option.ID.StrikePrice > security.Close and (option.ID.Date - self.Time).days >= 30])
if len(otm_call_options) > 0:
self.call = otm_call_options[0]
#subscribe to the option contract, so that we can buy it
option_buy = self.AddOptionContract(self.call, Resolution.Minute)
if option_buy.AskPrice == 0:
self.Debug("No prices")
return
self.SetHoldings(option_buy.Symbol, 0.1)
#self.MarketOrder(option_buy.Symbol, 1)
self.Debug(f" {self.Time} -- -- Bought call option {option_buy} for {option_buy.AskPrice} // Underlying stock @ {option_buy.Underlying.Close} // Limit {option_buy.AskPrice * 1.3} ")
def buyputs(self, security):
security.SetDataNormalizationMode(DataNormalizationMode.Raw)
option_chain = self.OptionChainProvider.GetOptionContractList(security.Symbol, self.Time)
put_options = [option for option in option_chain if option.ID.OptionRight == 1]
# Filter for out of the money call options and expiring at least 60 days
otm_put_options = sorted([option for option in put_options if (option.ID.StrikePrice < security.Close) and (option.ID.Date - self.Time).days >= 30 and (option.ID.Date - self.Time).days < 40])
if len(otm_put_options) > 0:
self.put = otm_put_options[-1]
#subscribe to the option contract, so that we can buy it
option_buyput = self.AddOptionContract(self.put, Resolution.Minute)
if option_buyput.AskPrice == 0:
self.Debug("No prices")
return
self.SetHoldings(option_buyput.Symbol, 0.1)
#self.MarketOrder(option_buyput.Symbol, 1)
self.Debug(f" {self.Time} -- -- Bought put option {option_buyput} for {option_buyput.AskPrice} // Underlying stock @ {option_buyput.Underlying.Close} // Limit {option_buyput.AskPrice * 1.3} ")
def OnOrderEvent(self, orderEvent):
order = self.Transactions.GetOrderById(orderEvent.OrderId)
if order.Status == OrderStatus.Filled and order.Direction == OrderDirection.Buy and order.SecurityType == SecurityType.Option and order.Symbol.ID.OptionRight == OptionRight.Call:
quantity = math.floor((self.Portfolio[order.Symbol].Quantity) / 2)
fill_price = self.Portfolio[order.Symbol].AveragePrice
limit_price = fill_price * 1.3
self.LimitOrder(order.Symbol, -quantity, limit_price * 1.3)
if order.Status == OrderStatus.Filled and order.Direction == OrderDirection.Buy and order.SecurityType == SecurityType.Option and order.Symbol.ID.OptionRight == OptionRight.Put:
quantity = math.floor((self.Portfolio[order.Symbol].Quantity) / 2)
fill_price = self.Portfolio[order.Symbol].AveragePrice
limit_price = fill_price * 1.3
self.LimitOrder(order.Symbol, -quantity, limit_price * 1.3)
if order.Status == OrderStatus.Filled and (order.Type == OrderType.Limit):
options_quantity = self.Portfolio[order.Symbol].Quantity
if options_quantity > 0:
fill_price = self.Portfolio[order.Symbol].AveragePrice
limit_price2 = fill_price * 1.5
self.LimitOrder(order.Symbol, -options_quantity, limit_price2)
def VIX(self):
VIXhistory = self.History(CBOE, self.vix, 90, Resolution.Daily)
# (Current - Min) / (Max - Min)
self.rank = ((self.Securities[self.vix].Price - min(VIXhistory["low"])) / (max(VIXhistory["high"]) - min(VIXhistory["low"])))