| Overall Statistics |
|
Total Orders 182 Average Win 1.19% Average Loss -2.28% Compounding Annual Return -9.391% Drawdown 32.400% Expectancy 0.003 Start Equity 1000.0 End Equity 905.84 Net Profit -9.416% Sharpe Ratio -0.584 Sortino Ratio -0.61 Probabilistic Sharpe Ratio 7.483% Loss Rate 34% Win Rate 66% Profit-Loss Ratio 0.52 Alpha -0.115 Beta -0.028 Annual Standard Deviation 0.19 Annual Variance 0.036 Information Ratio 0.075 Tracking Error 0.656 Treynor Ratio 4.023 Total Fees â‚®516.25 Estimated Strategy Capacity â‚®150000000000.00 Lowest Capacity Asset BTCUSDT 10B Portfolio Turnover 49.76% |
#region imports
from AlgorithmImports import *
import pandas as pd
import copy
import pickle
import calendar
import pytz
# import jsonpickle
#endregion
class ReportManager():
# ==========================================
# Constructor. Accepts algo Object
# ==========================================
def __init__(self, algo):
self.algo = algo
self.algo.OnEndOfAlgorithm = self.OnEndOfAlgorithm
self.projectID = "17556876"
# ==================================================
def OnEndOfAlgorithm(self):
closedTrades = self.algo.TradeBuilder.ClosedTrades
# create an empty dataframe
df = pd.DataFrame()
# loop through the list of Trade objects and add one row for each trade object to the dataframe
for trade in closedTrades:
# Monday is 0 and Sunday is 6
pctReturn = round( ((trade.ExitPrice - trade.EntryPrice) / trade.EntryPrice),2)
# Define the Eastern timezone
eastern = pytz.timezone('US/Eastern')
# Convert the original datetime to Eastern time
tradeEntryTime = trade.EntryTime.astimezone(eastern)
# dayOfWeek = tradeEntryTime.weekday()
dayOfWeek = tradeEntryTime.strftime('%A')
hourOfDay = tradeEntryTime.hour
epochExit = calendar.timegm(trade.ExitTime.timetuple())
genericObject = { 'symbol' : str(trade.Symbol),
'entryPrice': str(trade.EntryPrice),
'quantity' : str(trade.Quantity),
'entryPrice': str(trade.EntryPrice),
'exitPrice': str(trade.ExitPrice),
'direction' : str(trade.Direction),
'duration' : str(trade.Duration.total_seconds()),
'pctReturn' : str(pctReturn),
'mae' : str(trade.MAE),
'mfe' : str(trade.MFE),
'exitTime' : str(trade.ExitTime),
'epochExit' : str(epochExit),
'hourOfDay' : str(hourOfDay),
'dayOfWeek' : str(dayOfWeek)}
# # use the deepcopy function to clone the Trade object into a generic object
# generic_object = copy.deepcopy(trade)
# # convert the generic object to a dictionary
# dictObject = dict(generic_object)
# # create a TradeWithDict object using the properties of the Trade object
# trade_with_dict = TradeWithDict(**trade.__dict__)
# df = df.append(genericObject, ignore_index=True)''
# df = pd.concat([df, genericObject], ignore_index=True)
genericObject_df = pd.DataFrame.from_records([genericObject])
df = pd.concat([df, genericObject_df], ignore_index=True)
# JsonPickle Approach
# ---------------
# serialized = jsonpickle.encode(closedTrades)
# self.algo.ObjectStore.SaveBytes(f"{self.projectID}/CompletedTrades", serialized)
# storedObject = self.algo.ObjectStore.ReadBytes(f"{self.projectID}/CompletedTrades")
# deserialized = jsonpickle.decode(bytearray(storedObject))
# Pickle Approach
# ---------------
# serialized = pickle.dumps(closedTrades)
# self.algo.ObjectStore.SaveBytes(f"{self.projectID}/CompletedTrades", serialized)
# storedObject = self.algo.ObjectStore.ReadBytes(f"{self.projectID}/CompletedTrades")
# deserialized = pickle.loads(bytearray(storedObject))
# JSON Approach
# ------------
# serialized = json.dumps(df)
self.algo.ObjectStore.Save(f"{self.projectID}/CompletedTrades", df.to_json(date_unit='ns'))
retrieved = self.algo.ObjectStore.Read(f"{self.projectID}/CompletedTrades")
restored = pd.read_json(retrieved)
pass
'''
# Read back data from object store
restored_history = pd.read_json(qb.ObjectStore.Read("data"))
# Restore the indices
restored_history['time'] = pd.to_datetime(restored_history['time'])
restored_history['symbol'] = restored_history['symbol'].apply(lambda x: qb.Symbol(x))
restored_history.set_index(['symbol', 'time'], inplace=True)
restored_history
'''
# deserialized = json.loads(retrieved)
# pass
# self.algo.ObjectStore.SaveBytes('OS_signal_dens', pickle.dumps(self.signal_dens))
# https://www.quantconnect.com/forum/discussion/14176/object-store-upgrades-and-behavior-change/p1
# Add logic: only do this if we are NOT optimizing, and we are NOT live.
# self.algo.ObjectStore.SaveBytes(f"{self.projectID}/CompletedTrades", closedTrades)
# else:
# if self.algo.ObjectStore.ContainsKey(str(self.spy)):
# date = self.algo.ObjectStore.Read(str(self.spy))
# date = parser.parse(date)
# that = self.algo.ObjectStore.ReadBytes(f"{self.projectID}/CompletedTrades")
# so = 0
# class TradeWithDict(Trade):
# @property
# def __dict__(self):
# # get a dictionary that contains the properties and their values
# props = vars(self)
# return props
# region imports
from AlgorithmImports import *
# endregion
# Your New Python File
class TradeUtils():
def __init__(self, algo):
self.algo = algo
# Convenience method to liquidate with a message
def LiquidateWithMsg(self, symbol, exitReason):
pnl = round(100 * self.algo.Portfolio[symbol].UnrealizedProfitPercent,2)
biasText = 'Long' if (self.algo.Portfolio[symbol].IsLong) else 'Short'
winlossText = 'win' if pnl > 0 else 'loss'
orderNote = f"[{pnl}% {winlossText}] {exitReason} | Exiting {biasText} position"
# If Crypto, call LiquidateMarketOrder
if self.algo.Securities[symbol].Type == SecurityType.Crypto:
self.LiquidateMarketOrder(symbol=symbol, tag=orderNote)
else:
self.algo.liquidate(symbol, tag=orderNote)
## Liquidate via market order. Necessary for crypto
def LiquidateMarketOrder(self, symbol, tag):
crypto = self.algo.securities[symbol]
base_currency = crypto.base_currency
# Avoid negative amount after liquidate
quantity = max(crypto.holdings.quantity, base_currency.amount)
# Round down to observe the lot size
lot_size = crypto.symbol_properties.lot_size;
quantity = (round(quantity / lot_size) - 1) * lot_size
if self.is_valid_order_size(crypto, quantity):
# self.algo.debug(f"------------ [START] Market Order: liquidation start")
self.algo.debug(f" Liquidating: {quantity} units of {symbol.Value}")
self.algo.market_order(symbol, -quantity, tag=tag)
self.algo.debug(f"Market Order liquidation was Successful")
self.algo.debug(f" Leftover: {crypto.holdings.quantity} units of {symbol.Value}")
self.algo.debug(f"------------ [END] Market Order liquidation")
if( abs(crypto.holdings.quantity) > lot_size):
self.LiquidateMarketOrder(symbol, tag="reomving trailing coins")
else:
self.algo.debug(f"ERROR ERRROR ---- ")
self.algo.debug(f"ERROR ERRROR Invalid order size: {quantity}")
# Brokerages have different order size rules
# Binance considers the minimum volume (price x quantity):
def is_valid_order_size(self, crypto, quantity):
return abs(crypto.price * quantity) > crypto.symbol_properties.minimum_order_size
def HasHoldings(self, symbol):
if self.algo.Securities[symbol].Type == SecurityType.Crypto:
## TODO: Explore a better way to do this. Not clear how base_currency.amount should be used
min_lot_size = self.algo.securities[symbol].symbol_properties.lot_size
asset = self.algo.securities[symbol]
base_currency = asset.base_currency
# quantity = min(asset.holdings.quantity, base_currency.amount)
quantity = abs(asset.holdings.quantity)
# abs(self.securities[self.symbol].symbol_properties.lot_size - self.securities[self.symbol].holdings.quantity) > 0.000000001
return abs(quantity) >= self.algo.securities[symbol].symbol_properties.minimum_order_size
# return abs(quantity - min_lot_size) > min_lot_size
else:
return self.algo.Portfolio[symbol].Invested#region imports
from AlgorithmImports import *
#endregion
class TrailingStopHelper():
def __init__(self, algo, symbol, volaIndicator=None, trailStopCoeff=2, initialStopCoeff=1, activationCoeff=2):
# Track state and indicators
self.algo = algo
self.symbol = symbol
self.entryPrice = 0
self.ExitMessage = ""
self.systemActivated = False
self.volaIndicator = volaIndicator
self.volaIndicator.Updated += self.OnATRIndicatorUpdated
# Stop Loss States
self.trailStopCoeff = trailStopCoeff
self.initialStopCoeff = initialStopCoeff
self.activationCoeff = activationCoeff
self.ResetStopLosses()
@property
def lastPrice(self):
if (self.symbol in self.algo.Securities \
and self.algo.Securities[self.symbol] is not None):
return self.algo.Securities[self.symbol].Price
return 0
## -----------------------------------------------
def OnATRIndicatorUpdated(self, sender, updated):
self.PlotCharts()
# Trailing Stop Exit
# This method updates the trailing stop
# ============================================
def TrailingExitSignalFired(self):
if( not self.volaIndicator.IsReady ):
return False
# If trailing stop is NOT set, get last price, and set it
# --------------------------------------------------------
if( not self.stopsLossActivated ):
self.highestPrice = self.lastPrice
self.trailingStopLoss = self.lastPrice - (self.volaIndicator.Current.Value * self.trailStopCoeff)
self.stopsLossActivated = True
# Recursively call this function to check for stops
# again, now that the trailing stop has been activated
# and the stop loss value has been updated.
# --------------------------------------------------
return self.TrailingExitSignalFired()
# If trailing stop loss is activated, check if price closed below it.
# If it did, then exit. If not, update the trailing stop loss.
else:
if self.PriceIsBelowStopLoss():
return True
else:
# If price has gone up
if self.lastPrice > self.highestPrice:
# If price is above the trail activation price, update trailing stop
if self.lastPrice > self.activationPrice:
self.highestPrice = self.lastPrice
newTrailingStopLoss = self.highestPrice - (self.volaIndicator.Current.Value * self.trailStopCoeff)
self.trailingStopLoss = max (self.trailingStopLoss, newTrailingStopLoss, self.activationPrice)
# check again just in case price ends up below the new trailing stop level
if self.PriceIsBelowStopLoss():
return True
return False
## Check if price is below trailing stop loss or regular stop loss
## ---------------------------------------------------------------
def PriceIsBelowStopLoss(self):
if self.lastPrice > self.activationPrice:
if( self.lastPrice < self.trailingStopLoss ):
self.ExitMessage = "Trailing Stop Loss"
return True
else:
if( self.lastPrice < self.initialStopLoss ):
self.ExitMessage = "Initial Stop Loss"
return True
self.PlotCharts()
return False
## Logic to run immediately after a new position is opened.
## We track entry price and set initial stop loss values
## ---------------------------------------------------------
def Activate(self, entryPrice,initialStopValue=0, activationValue=0 ):
self.entryPrice = entryPrice
self.systemActivated = True
self.SetInitialStops(initialStopValue, activationValue)
return
## Set initial stop and activation level. Called after new position opened.
## ------------------------------------------------------------------------
def SetInitialStops(self, initialStopValue=0, activationValue=0):
## TODO: Use onOrderEvent to set this, because the actual price may be different
self.entryPrice = self.lastPrice
if initialStopValue == 0:
self.initialStopLoss = self.entryPrice - (self.volaIndicator.Current.Value * self.initialStopCoeff)
else:
self.initialStopLoss = initialStopValue
if activationValue == 0:
self.activationPrice = self.entryPrice + (self.volaIndicator.Current.Value * self.activationCoeff)
else:
self.activationPrice = activationValue
## Logic to run immediately after a position is closed
## Reset exit message, stop loss values.
## ---------------------------------------------------
def Deactivate(self):
self.PlotCharts()
self.ExitMessage = "No Exit Message"
self.systemActivated = False
self.ResetStopLosses()
## Reset stop losses
## -------------------------------------------------
def ResetStopLosses(self):
self.stopsLossActivated = False
self.initialStopLoss = 0
self.activationPrice = 0
self.trailingStopLoss = 0
## Plot Price, Stop losses & activation levels
## -------------------------------------------
def PlotCharts(self):
return
self.algo.Plot(f"{self.symbol} Trailing stop", "Price", self.lastPrice)
self.algo.Plot(f"{self.symbol} Trailing stop", "Initial Stop", self.initialStopLoss)
self.algo.Plot(f"{self.symbol} Trailing stop", "Acivation Pt", self.activationPrice)
self.algo.Plot(f"{self.symbol} Trailing stop", "TrailingStop", self.trailingStopLoss)
return from AlgorithmImports import *
from ReportManager import *
from TradeUtils import *
from TrailingStopHelper import *
from io import StringIO
import pandas as pd
import pytz
class CSVSignalImporter(QCAlgorithm):
def Initialize(self):
self.InitUtils()
self.InitParams()
self.InitBacktest()
self.ImportSignals()
self.InitIndicators()
## Initialize Utils
## ------------------------------------------------
def InitUtils(self):
self.TradeUtils = TradeUtils(self)
## Initialize config parameters, state tracking,etc.
## ------------------------------------------------
def InitParams(self):
# Data parameters
self.timeCol = "date_time"
self.timezone = pytz.timezone('UTC')
self.newTimeCol = "NewTime"
self.ticker = "BTCUSDT"
# Trading System parameters
self.slATRCoeff = int(self.get_parameter("slATRCoeff")) # 2 # ATR coefficient for stop loss
self.rrRatio = float(self.get_parameter("rrRatio")) # 2 # risk-reward ratio for take profit
self.sizeAcctPct = 1 # Pct of the cash available to use for trade
self.threshold = float(self.get_parameter("thresh")) # Prediction threshold
self.signal_exp = 30 # Signal expiry in seconds
self.hold_dur = 4320 # Duration (in minutes) for which to hold position
self.exitMethod = 2 # 0 exits after specified duration
# 1 exits after specified duration + Stop loss
# 2 exits after ATR TP/SL
# 3 exits after ATR Trailing Stop
# 4 exits after min holding period and ATR Trailing Stop
# 5 exits after ATR Trailing Stop BUT with ML vola initial levels
# Check trailing stop breach only when ATR updates (eg hourly) versus every minute.
self.checkTrailingStopOnATRUpdateOnly = False
self.useMLVola = (1 == int(self.get_parameter("useMLVola"))) # When meauring tp/sl, use vola from the ML features
# State Tracking
self.direction = None # Keep track of trade direction
self.entryMsg = ""
self.exitMsg = ""
self.signalTime = ""
self.mlVola = 0.0
## Init backtest properties / configs / managers
## ---------------------------------------------
def InitBacktest(self):
# Dates
self.SetStartDate(2022, 5, 1)
self.SetEndDate(2023, 5, 1)
# Data
self.SetAccountCurrency("USDT")
self.SetCash(1000)
self.SetBrokerageModel(BrokerageName.KRAKEN, AccountType.Margin)
self.symbol = self.AddCrypto(self.ticker, Resolution.Minute, Market.KRAKEN).Symbol
self.SetBenchmark(self.symbol)
# Custom reporting
self.ReportManager = ReportManager(self)
## Init indicators used for tracking volatility and trailing stops
## ---------------------------------------------------------------
def InitIndicators(self):
self.atr = self.ATR(self.symbol, 14, MovingAverageType.Simple, Resolution.Hour)
# Uncomment this if you want trailing stop exists to
if(self.checkTrailingStopOnATRUpdateOnly):
self.atr.updated += self.onTrailATRUpdateCheckForExits
trailStopCoeff = float(self.get_parameter("trailStopCoeff"))
initialStopCoeff = float(self.get_parameter("initialStopCoeff"))
activationCoeff = initialStopCoeff * self.rrRatio
self.trailingStop = TrailingStopHelper(self,self.symbol,self.atr,
trailStopCoeff,initialStopCoeff,activationCoeff)
## Logic for ingesting and parsing the CSV with model signals
## ----------------------------------------------------------
def ImportSignals(self):
furkan_jul_30_csv = self.Download("https://docs.google.com/spreadsheets/d/1nO1iHe9Eff4uLoUmwGj7SICph-rwS6tmEWNUz9Fj4lk/export?format=csv")
signals_csv = furkan_jul_30_csv
self.df = pd.read_csv(StringIO(signals_csv))
self.df[self.newTimeCol] = pd.to_datetime(self.df[self.timeCol], format='%Y-%m-%d %H:%M:%S')
self.df[self.newTimeCol] = self.df[self.newTimeCol].dt.to_pydatetime()
self.df[self.newTimeCol] = pd.to_datetime(self.df[self.newTimeCol]).dt.tz_localize('UTC')
# Strip rows that dont meet our prediction threshold
self.df = self.df[self.df["predicted_probabilities"].notna() & (self.df["predicted_probabilities"] >= self.threshold)]
# Sort df by date -- shouldnt be necessary though
self.df = self.df.sort_values(self.newTimeCol)
# Custom handling for Tolu's signal CSV
if 'Quantity' in self.df.columns:
# Remove rows with negative quantity (sell signals)
self.df = self.df[self.df['Quantity'] >= 0]
## System method, called every time a new bar comes in.
## Here we check for entry / exit signals, and clean up data
## ----------------------------------------------------------
def OnData(self, data: Slice):
# First, make sure we have valid data coming in
if (self.ticker in data ) and (data[self.ticker] is not None):
currTimeStr = self.Time.strftime("%Y-%m-%d %H:%M:%S")
current_time = self.Time.astimezone(self.timezone)
# If not invested, check for entry signals
if( not self.TradeUtils.HasHoldings(self.symbol)):
# If we have a signal, open position, set direction, set tops.
if self.EntrySignalFired():
second_diff = (current_time - self.signalTime).total_seconds()
signal_vola = round(self.mlVola * 100,2)
self.SetHoldings(self.symbol, self.sizeAcctPct, tag=f"Signal fired {second_diff} seconds ago). Signal vola: {signal_vola}% ")
self.direction = PortfolioBias.LONG
self.SetStopLossTakeProfit()
# If invested, check for exit signals
else:
if self.exitMethod == 3 or self.exitMethod == 4:
if not (self.checkTrailingStopOnATRUpdateOnly):
## If we're using trail stops, we might check here for exits (every minuite)
## otherwise, we only check when the trail stop updates (hourly)
self.ExecuteExitSignals()
else:
# If not using trailing stops, always check here for exits
self.ExecuteExitSignals()
# remove expired signals (records older than x minutes ago).
self.df = self.df[self.df[self.newTimeCol] >= (current_time - timedelta(seconds=self.signal_exp))]
# When trail ATR updates, check for exit
def onTrailATRUpdateCheckForExits(self,indicator,indicator_data):
if(self.atr.IsReady):
if( self.TradeUtils.HasHoldings(self.symbol)):
self.ExecuteExitSignals()
def ExecuteExitSignals(self):
if self.ExitSignalFired():
self.TradeUtils.LiquidateWithMsg(self.symbol, self.exitMsg)
# self.Liquidate(tag=self.exitMsg)
## Returns true if we found a recent valid entry signal (that has not expired).
## ----------------------------------------------------------------------------
def EntrySignalFired(self):
current_time = self.Time.astimezone(self.timezone)
# Get most recent prediction record that hasnt expired (ie: within last x minutes)
recent_df = self.df[self.df[self.newTimeCol] <= (current_time - timedelta(seconds=self.signal_exp))]
if not recent_df.empty:
for index, row in recent_df.iterrows():
rowTimeOne = (row[self.newTimeCol])
rowTime = pd.to_datetime(row[self.newTimeCol]).astimezone(self.timezone)
# rowTimeStr = rowTime.strftime("%Y-%m-%d %H:%M:%S")
# self.Debug(f"Entry Signal stamped {rowTimeStr} Fired On Day: {currTimeStr}")
# Delete
# rowTime = pd.to_datetime(row[self.newTimeCol])
# rowTime = rowTime.to_pydatetime()
# rowTime = rowTime.astimezone(self.timezone)
try:
self.mlVola = row['trgt']
except:
self.mlVola = 0
self.Debug(f"Volatility value not found for timestamp: {rowTime}")
self.Quit()
self.signalTime = rowTime
return True
return False
def SetStopLossTakeProfit(self):
vola = self.mlVola
self.EntryTime = self.Time
closePrice = self.CurrentSlice[self.ticker].Close
if (self.exitMethod == 1) or (self.exitMethod == 2) :
if( self.useMLVola ):
# Set take profit/stop loss - Using Volatiliy from ML features
self.takeProfit = closePrice + (closePrice * vola)
self.stopLoss = closePrice - (closePrice * vola / self.rrRatio)
else:
# Set take profit/stop loss - Using ATR-measured Volatiliy
self.stopLoss = closePrice - (self.slATRCoeff * self.atr.Current.Value)
self.takeProfit = closePrice + (self.slATRCoeff * self.atr.Current.Value * self.rrRatio)
elif(self.exitMethod == 3):
# Set Trailstop
self.trailingStop.Activate(closePrice)
elif(self.exitMethod == 5):
# Set Trailstop with vola
initialStopLoss = closePrice - (closePrice * vola / self.rrRatio)
activationLevel = closePrice + (closePrice * vola)
self.trailingStop.Activate(closePrice, initialStopLoss, activationLevel)
# Check if exit signal has fired, due to
# Take Profit, Stop Loss, or time duration
def ExitSignalFired(self):
# Exit based on time. ie: Held position for x minutes
if self.exitMethod == 0:
if self.HeldForMinimumDuration():
self.exitMsg = f"Exit After {self.hold_dur} minutes"
return True
# Exit based on time AND stop loss.
elif self.exitMethod == 1:
closePrice = self.CurrentSlice[self.ticker].Close
if self.HeldForMinimumDuration():
self.exitMsg = f"Exit After {self.hold_dur} minutes"
return True
elif closePrice <= self.stopLoss:
self.exitMsg = "Stop Loss Exit"
return True
# Take Profit / Stop loss exit
elif self.exitMethod == 2:
closePrice = self.CurrentSlice[self.ticker].Close
if closePrice >= self.takeProfit:
self.exitMsg = "Take Profit Exit"
return True
elif closePrice <= self.stopLoss:
self.exitMsg = "Stop Loss Exit"
return True
# Trailing stop exit
elif self.exitMethod == 3 or self.exitMethod == 5:
if self.trailingStop.TrailingExitSignalFired():
self.exitMsg = self.trailingStop.ExitMessage
return True
# Trailing stop exit after Min Hold Duration
elif self.exitMethod == 4:
if (self.HeldForMinimumDuration()):
if not self.trailingStop.systemActivated:
self.trailingStop.Activate(self.securities[self.symbol].price)
if self.trailingStop.TrailingExitSignalFired():
self.exitMsg = self.trailingStop.ExitMessage
return True
return False
def HeldForMinimumDuration(self):
return (self.Time - self.EntryTime) >= pd.Timedelta(minutes=self.hold_dur)
# quantity <self.algo.securities[symbol].symbol_properties.lot_size
# def HasHoldings(self, symbol):
# ## TODO: Explore a better way to do this. Not clear how base_currency.amount should be used
# crypto = self.securities[symbol]
# base_currency = crypto.base_currency
# quantity = min(crypto.holdings.quantity, base_currency.amount)
# return (quantity > 0)
# quantity = crypto.holdings.quantity
# return abs(crypto.price * quantity) > crypto.symbol_properties.minimum_order_size