Overall Statistics |
Total Trades 7212 Average Win 0.33% Average Loss -0.31% Compounding Annual Return -0.063% Drawdown 66.700% Expectancy 0.073 Net Profit -1.128% Sharpe Ratio 0.08 Probabilistic Sharpe Ratio 0.000% Loss Rate 48% Win Rate 52% Profit-Loss Ratio 1.07 Alpha 0.041 Beta -0.373 Annual Standard Deviation 0.166 Annual Variance 0.027 Information Ratio -0.232 Tracking Error 0.269 Treynor Ratio -0.036 Total Fees $13780.61 Estimated Strategy Capacity $2200000.00 Lowest Capacity Asset ALB R735QTJ8XC9X |
# region imports from AlgorithmImports import * import numpy as np # endregion class HyperActiveGreenFish(QCAlgorithm): def Initialize(self): qb = self qb.SetStartDate(2005,1,1) qb.SetEndDate(2022,12,1) qb.SetCash(100000) qb.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Cash) self.SetBenchmark("SPY") # self.SetBenchmark(self.AddEquity("SPY").Symbol) self.resolution = Resolution.Daily # self.rebalanceTime = datetime.min self.activeStocks = set() # to keep track of stocks self.universe = {} self.free_positions = 10 self.UniverseSettings.Resolution = self.resolution self.UniverseSettings.DataNormalizationMode = DataNormalizationMode.SplitAdjusted self.AddUniverse(self.CoarseFilter) #, self.FineFilter) # self.SetWarmup(10, self.resolution) # days = daysSMA def CoarseFilter(self, coarse): self.coarseListReturn = [] self.coarseNames = [] self.coarseInstances = [] # self.coarseListReturn.clear() sortedByDollarVolume = sorted(coarse, key = lambda x: x.DollarVolume, reverse=True) filteredByVolume = [c for c in sortedByDollarVolume if c.DollarVolume > 100000000] coarseUniverse = [c for c in filteredByVolume if c.Price > 15] for c in coarseUniverse: symbol = c.Symbol ticker = symbol.Value if symbol in self.ActiveSecurities and self.Portfolio[symbol].IsShort: self.coarseListReturn = self.coarseListReturn elif symbol in self.ActiveSecurities and symbol in self.universe and self.universe[symbol].orderStatus == OrderStatus.Submitted: self.coarseListReturn = self.coarseListReturn else: #Create a new instance of SelectionData and save to universe[symbol] # First Call history to get tradebars and dataframe history_tb = self.History[TradeBar](symbol, 10, Resolution.Daily) history_df = self.History(symbol, 100, Resolution.Daily) # stockObject = SelectionData(history_tb, history_df, c.Price) # self.universe[ticker] = stockObject # stock = self.universe[ticker] if 'close' in history_df.columns and len(history_df) > 10 and not history_df.isnull().values.any(): self.universe[symbol] = SelectionData(None, history_df, c.Price) # stock = self.universe[symbol] # checking 10-day average Dollar Volume # volume = self.universe[symbol].average_volume(history_tb, c.Price) # if volume > 25 : #5. Check ATR and closingprices, and if ok append the symbol to selected list. atr10 = self.universe[symbol].atr_10(history_tb, c.Price) if self.universe[symbol].atrIsReady and atr10 > 2.5: # check if price increased last 2 days higher = self.universe[symbol].each_day_higher(history_df) if higher: # check percent change prev 3 days # if self.universe[symbol].percent_up_prev_days(history_df, 3) > 6: # check RSI rsi = self.universe[symbol].rsi_TV(history_df, 3) if rsi > 92: self.coarseListReturn.append(symbol) # self.coarseNames.append(symbol.Value) # self.coarseInstances.append(self.universe[symbol]) self.coarseListReturn = sorted(self.coarseListReturn, key = lambda x: self.universe[x].rsi_atEntry, reverse=True) # self.Log('COARSE: RSI: 92 - Volume avg: 8 - ATR: 2.5 - higher: 2 days - up 3 days:NaN%') # self.Log(f'{self.Time}') # self.Log(f'-UniverseStocks amount: {len(self.coarseListReturn)} UniverseStocks: {self.coarseNames}') self.coarseListReturn = self.coarseListReturn[:10] return self.coarseListReturn def OnSecuritiesChanged(self, changes): # self.Log(f'Date: {self.Time}. (in OnSecuritiesChanged)') # securities taken out of universe: for x in changes.RemovedSecurities: symbol = x.Symbol if symbol in self.activeStocks: self.activeStocks.remove(symbol) # self.Log(f'{symbol.Value} Removed from Universe') # securities added to universe: for x in changes.AddedSecurities: symbol = x.Symbol self.activeStocks.add(symbol) # self.Log(f'{symbol.Value} Added to Universe') def OnData(self, data): # self.Log(f'{self.Time} - DAY START') # return if warming up: # if self.IsWarmingUp: # return for symbol in self.activeStocks: if symbol not in data: return if self.activeStocks == [] and not self.Portfolio.Invested: return self.maxAllowedPositions = 10 self.invested = [ x.Symbol.Value for x in self.Portfolio.Values if x.Invested ] self.activePositions = len(self.invested) self.free_positions = self.maxAllowedPositions - self.activePositions self.investedSymbols = [ x.Symbol for x in self.Portfolio.Values if x.Invested ] # self.investedSymbols2 = [x for x in self.Portfolio if x.Invested] # COVERING SHORT for symbol in self.investedSymbols: if self.Portfolio[symbol].IsShort: # portfolioHoldings = list(self.Portfolio.keys()) # for symbol in portfolioHoldings: # stock = self.universe[symbol] # logging new day in tradingdays since entry self.universe[symbol].tradingdays_since_entry() # getting current price self.currentPrice = self.Securities[symbol].Price # checking status self.profitSinceEntry = 100 * self.Portfolio[symbol].UnrealizedProfitPercent # profitSoFar = self.universe[symbol].profit_since_entry(self.universe[symbol].entryPrice, currentPrice) # exit on days if self.universe[symbol].tradingDaysSinceEntry >= 4: self.Liquidate(symbol) self.Log(f'4-Days-sell: {symbol}') # exit on profit 10% elif self.profitSinceEntry > 4: self.Liquidate(symbol) self.Log(f'Profit-sell above 4%: {symbol}') # Stop-loss elif self.currentPrice > (self.universe[symbol].entryPrice + (3*self.universe[symbol].atr10_atEntry)): self.Liquidate(symbol) self.Log(f'Stop-loss 3ATR: {symbol}') # self.Log(f'RSI:92 // 4-Days-sell // Profit-sell above 4% // Stop-loss 3ATR') # calculating positionsize: if self.free_positions <= 0: self.cashPerPosition = 0 elif self.free_positions > 0: # self.cashPerPosition = 0.88 * (self.Portfolio.MarginRemaining / self.free_positions) self.cashPerPosition = 0.9 * (self.Portfolio.TotalPortfolioValue / self.maxAllowedPositions) for symbol in self.activeStocks: # GOING SHORT if (self.Portfolio[symbol].IsShort == False) and (self.free_positions > 0) and (self.Portfolio.MarginRemaining > 3000): # checking how many positions I already chave # invested = [x.Key for x in self.Portfolio if x.Value.Invested] # self.Log(f'{self.activePositions} already active positions invested') # self.Log(f'Active positions: {self.invested}') # self.Log(f'Free positions: {self.free_positions}') # self.Log(f'Cash pr. position: {cashPerPosition}') # getting current price symbol.price = self.Securities[symbol].Price if symbol.price == 0 or symbol.price == None: return # calculate number of stocks to buy amountToBuy = abs(self.cashPerPosition / symbol.price) # # stock = self.universe[symbol] # if self.universe[symbol].rsi_atEntry == None: # return self.orderTicket = self.MarketOrder(symbol, -amountToBuy) # # save ticket to class # self.universe[symbol].order_ticket(self.orderTicket) # #log entry-price # self.universe[symbol].entry_price(symbol.price) #initialising profit-max-log self.universe[symbol].max_profit(0) # self.Log(f'Buy-Order sent: {symbol.Value} Cost: {cashPerPosition}') # self.Log(f'RSI: {self.universe[symbol].rsi_atEntry}, ATR: {self.universe[symbol].atr10_atEntry_Percent}') # END of DAY def OnOrderEvent(self, orderEvent): symbol = orderEvent.Symbol # stock = self.universe[symbol] order_id = orderEvent.OrderId # ticket = self.Transactions.GetOrderTicket(order_id) if orderEvent.Status == OrderStatus.Submitted: # update order status self.universe[symbol].order_status(orderEvent.Status) # self.universe[symbol].order_date_submitted(self.Time) elif orderEvent.Status == OrderStatus.Filled: if orderEvent.Direction == OrderDirection.Sell: self.universe[symbol].order_status(orderEvent.Status) self.universe[symbol].entry_date(self.Time) # logging entry price self.universe[symbol].entry_price(orderEvent.FillPrice) # set stop-order self.universe[symbol].entry_quantity(orderEvent.FillQuantity) # # Update the order tag # updateSettings = UpdateOrderFields() # # updateSettings.Tag = "OnOrderEvent: Sell Filled " # response = ticket.Update(updateSettings) # self.Log(f'OrderEvent (Short Filled): {symbol.Value}') if symbol in self.activeStocks: self.activeStocks.remove(symbol) self.free_positions -= 1 elif orderEvent.Direction == OrderDirection.Buy: # self.universe[symbol].exit_date(self.Time) # # logging exit price # self.universe[symbol].exit_price(orderEvent.FillPrice) # open_orders = self.Transactions.GetOpenOrders(symbol) # if not len(open_orders) == 0: # self.Transactions.CancelOpenOrders(symbol) # Update the order tag # updateSettings = UpdateOrderFields() # # updateSettings.Tag = "OnOrderEvent: Buy Filled" # response = ticket.Update(updateSettings) # remove from dictionary # if symbol in self.activeStocks: # self.activeStocks.remove(symbol) self.universe.pop(symbol) self.free_positions += 1 # self.Log(f'OrderEvent (ShortCover Filled): {symbol.Value} (removed from universe(dict)') # elif orderEvent.Status == OrderStatus.Canceled: # # self.universe[symbol].order_status(orderEvent.Status) # # self.Log(f'Order Canceled: {symbol.Value}') # pass # class SelectionData(object): class SelectionData(): def __init__(self, history_tb, history_df, currentPrice): # rsi_period = 3 self.orderStatus = None self.entryPrice = None self.rsi_atEntry = None self.atr10_atEntry = None self.atr10_atEntry_Percent = None self.atrIsReady = False self.atr10 = AverageTrueRange(10, movingAverageType=MovingAverageType.Exponential) self.orderTickets = [] self.tradingDaysSinceEntry = 0 # def __str__(self): # return f"EntryPrice:" def order_status(self, orderStatus): self.orderStatus = orderStatus def atr_10(self, history_tb, currentPrice): if currentPrice == 0: return 0 if history_tb is not None: for trade_bar in history_tb: self.atr10.Update(trade_bar) self.atrIsReady = self.atr10.IsReady if self.atrIsReady: self.atr10_atEntry = self.atr10.Current.Value self.atr10_atEntry_Percent = (100 * self.atr10_atEntry) / currentPrice return self.atr10_atEntry_Percent return 0 def average_volume(self, history_tb, currentPrice): if history_tb is not None: volume = 0 period = 0 for bar in history_tb: volume = volume + bar.Volume period += 1 if period > 0: self.volumeDollar10DayAverage = ((currentPrice * volume) / (period)) / 1000000 return self.volumeDollar10DayAverage else: return 0 return 0 def each_day_higher(self, history_df): if history_df is not None: close = history_df["close"] if len(close) > 5: if close[-1] > close[-2] > close[-3]: # > close[-4] > close[-5]: self.eachDayHigher = True return self.eachDayHigher else: self.eachDayHigher = False return self.eachDayHigher else: return None return None def percent_up_prev_days(self, history_df, period): if history_df is not None: close = history_df["close"] if len(close) >= period: lookBack = 0 - period - 1 self.percentUpPrevDays = (100 * (close[-1] - close[lookBack])) / close[lookBack] return self.percentUpPrevDays return 0 def rsi_TV(self, history_df, rsi_period): if history_df is not None: delta = history_df["close"].diff() up = delta.copy() up[up < 0] = 0 up = pd.Series.ewm(up, alpha=1/rsi_period).mean() down = delta.copy() down[down > 0] = 0 down *= -1 down = pd.Series.ewm(down, alpha=1/rsi_period).mean() rsi = np.where(up == 0, 0, np.where(down == 0, 100, 100 - (100 / (1 + up / down)))) self.rsi_atEntry = rsi[-1] return self.rsi_atEntry return def tradingdays_since_entry(self): self.tradingDaysSinceEntry += 1 return def order_date_submitted(self, submittedDate): self.submittedDate = submittedDate return def entry_date(self, entryDate): self.entryDate = entryDate return def exit_date(self, exitDate): self.exitDate = exitDate return def max_profit(self, maxProfit): self.maxProfit = maxProfit return def entry_price(self, entryPrice): self.entryPrice = entryPrice return def exit_price(self, exitPrice): self.exitPrice = exitPrice return def order_ticket(self, orderTicket): self.orderTickets.append(orderTicket) return def entry_quantity(self, entryQuantity): self.entryQuantity = entryQuantity return def profit_since_entry(self, entryPrice, currentPrice): self.profitSoFar = (-100 * (currentPrice - entryPrice)) / entryPrice return self.profitSoFar