from AlgorithmImports import *
from QuantConnect.DataSource import *
from QuantConnect.Data.UniverseSelection import *
from datetime import datetime
class ADASIGNAL(PythonData):
def GetSource(self, config, date, isLiveMode):
url = "https://www.dropbox.com/scl/fi/w45r2sp7wgc0qvzzq4ucy/predictions_ADA.csv?rlkey=ekqccoysvh1glmd0ab3gmyenk&dl=1"
return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)
def Reader(self, config, line, date, isLiveMode):
if not (line.strip()):
return None
try:
data = line.split(",")
token = ADASIGNAL()
token.Symbol = config.Symbol
token.Time = datetime.strptime(data[0], "%Y-%m-%d %H:%M:%S")+timedelta(minutes=1)
token.Value = 0 # Required by QuantConnect. Set to 0 or a relevant value.
token["Signal"] = float(data[1]) # Use the custom data format to store the signal
return token
except ValueError:
# Handle errors in parsing
return None
class BTCSIGNAL(PythonData):
def GetSource(self, config, date, isLiveMode):
url = "https://www.dropbox.com/scl/fi/3kec4nhbjtdhqdt9tchql/predictions_btc.csv?rlkey=sxx6cwttgqb0vmdncarr3mbzd&dl=1"
return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)
def Reader(self, config, line, date, isLiveMode):
if not (line.strip()):
return None
try:
data = line.split(",")
token = BTCSIGNAL()
token.Symbol = config.Symbol
token.Time = datetime.strptime(data[0], "%Y-%m-%d %H:%M:%S")+timedelta(minutes=1)
token.Value = 0 # Required by QuantConnect. Set to 0 or a relevant value.
token["Signal"] = float(data[1]) # Use the custom data format to store the signal
return token
except ValueError:
# Handle errors in parsing
return None
class DOGESIGNAL(PythonData):
def GetSource(self, config, date, isLiveMode):
url = "https://www.dropbox.com/scl/fi/r5y7yriyq1h2t0dkd5ven/predictions_doge.csv?rlkey=31vp2oaej5hm9dlag131dv8t0&dl=1"
return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)
def Reader(self, config, line, date, isLiveMode):
if not (line.strip()):
return None
try:
data = line.split(",")
token = DOGESIGNAL()
token.Symbol = config.Symbol
token.Time = datetime.strptime(data[0], "%Y-%m-%d %H:%M:%S")+timedelta(minutes=1)
token.Value = 0 # Required by QuantConnect. Set to 0 or a relevant value.
token["Signal"] = float(data[1]) # Use the custom data format to store the signal
return token
except ValueError:
# Handle errors in parsing
return None
class DOTSIGNAL(PythonData):
def GetSource(self, config, date, isLiveMode):
url = "https://www.dropbox.com/scl/fi/zkfw3o4lm6fuuisjla9tc/predictions_dot.csv?rlkey=v7e39jbtzklmz0v2g7gkn5bmb&dl=1"
return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)
def Reader(self, config, line, date, isLiveMode):
if not (line.strip()):
return None
try:
data = line.split(",")
token = DOTSIGNAL()
token.Symbol = config.Symbol
token.Time = datetime.strptime(data[0], "%Y-%m-%d %H:%M:%S")+timedelta(minutes=1)
token.Value = 0 # Required by QuantConnect. Set to 0 or a relevant value.
token["Signal"] = float(data[1]) # Use the custom data format to store the signal
return token
except ValueError:
# Handle errors in parsing
return None
class ETHSIGNAL(PythonData):
def GetSource(self, config, date, isLiveMode):
url = "https://www.dropbox.com/scl/fi/t7okki70nmxtcjljh2syu/predictions_eth.csv?rlkey=6cmpiuh0p9wrjro42xpqpa8sn&dl=1"
return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)
def Reader(self, config, line, date, isLiveMode):
if not (line.strip()):
return None
try:
data = line.split(",")
token = ETHSIGNAL()
token.Symbol = config.Symbol
token.Time = datetime.strptime(data[0], "%Y-%m-%d %H:%M:%S")+timedelta(minutes=1)
token.Value = 0 # Required by QuantConnect. Set to 0 or a relevant value.
token["Signal"] = float(data[1]) # Use the custom data format to store the signal
return token
except ValueError:
# Handle errors in parsing
return None
class XRPSIGNAL(PythonData):
def GetSource(self, config, date, isLiveMode):
url = "https://www.dropbox.com/scl/fi/e8rjjlnbwsnuco3k1m9d5/predictions_xrp.csv?rlkey=podg4gvx5s70x3ydk8patwdwu&dl=1"
return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)
def Reader(self, config, line, date, isLiveMode):
if not (line.strip()):
return None
try:
data = line.split(",")
token = XRPSIGNAL()
token.Symbol = config.Symbol
token.Time = datetime.strptime(data[0], "%Y-%m-%d %H:%M:%S")+timedelta(minutes=1)
token.Value = 0 # Required by QuantConnect. Set to 0 or a relevant value.
token["Signal"] = float(data[1]) # Use the custom data format to store the signal
return token
except ValueError:
# Handle errors in parsing
return None
class Algorirthm1(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2021, 1, 1)
self.SetEndDate(2023, 12, 31)
balance = 1000000 # 1M
self.SetCash("USDT", balance)
self.SetBrokerageModel(BrokerageName.Binance, AccountType.Cash)
#ONLY HERE
self.ADA = self.AddCrypto("ADAUSDT", Resolution.Minute).Symbol
self.BTC = self.AddCrypto("BTCUSDT", Resolution.Minute).Symbol
self.DOGE = self.AddCrypto("DOGEUSDT", Resolution.Minute).Symbol
self.DOT = self.AddCrypto("DOTUSDT", Resolution.Minute).Symbol
self.ETH = self.AddCrypto("ETHUSDT", Resolution.Minute).Symbol
self.XRP = self.AddCrypto("XRPUSDT", Resolution.Minute).Symbol
self.ADASIGNAL = self.AddData(ADASIGNAL, "ADASIG", Resolution.Minute).Symbol
self.BTCSIGNAL = self.AddData(BTCSIGNAL, "BTCSIG", Resolution.Minute).Symbol
self.DOGESIGNAL = self.AddData(DOGESIGNAL, "DOGESIG", Resolution.Minute).Symbol
self.DOTSIGNAL = self.AddData(DOTSIGNAL, "DOTSIG", Resolution.Minute).Symbol
self.ETHSIGNAL = self.AddData(ETHSIGNAL, "ETHSIG", Resolution.Minute).Symbol
self.XRPSIGNAL = self.AddData(XRPSIGNAL, "XRPSIG", Resolution.Minute).Symbol
self.cryptos = [self.AddCrypto(token, Resolution.MINUTE).Symbol for token in ["BTCUSDT", "ETHUSDT", "XRPUSDT", "ADAUSDT", "DOTUSDT", "DOGEUSDT"]]
self.symbols = [self.AddData(CoinGecko, crypto).Symbol for crypto in ["BTC", "ETH", "XRP", "ADA", "DOT", "DOGE"]]
self.windows = {}
for symbol in ["BTC", "ETH", "XRP", "ADA", "DOT", "DOGE"]:
self.windows[symbol] = RollingWindow[CoinGecko](2)
self.SetBenchmark(self.BTC)
# Loop through the cryptocurrencies and retrieve their market caps
self.adaentry = 0
self.adanextEntryTime = self.Time
self.btcentry = 0
self.btcnextEntryTime = self.Time
self.dogeentry = 0
self.dogenextEntryTime = self.Time
self.dotentry = 0
self.dotnextEntryTime = self.Time
self.ethentry = 0
self.ethnextEntryTime = self.Time
self.xrpentry = 0
self.xrpnextEntryTime = self.Time
self.tp = 0.01
self.sl = self.tp*2
self.adainvested = False
self.btcinvested = False
self.dogeinvested = False
self.dotinvested = False
self.ethinvested = False
self.xrpinvested = False
def OnData(self, data) -> None:
ada_weight = 1/6
if self.ADASIGNAL in data and self.ADA in data:
price = data[self.ADA].Close
signal = data[self.ADASIGNAL].Signal
if self.Time >= self.adanextEntryTime and signal == 1.0:
self.Transactions.CancelOpenOrders()
self.SetHoldings(self.ADA, ada_weight)
self.adaentry = price
self.Log(f"ADA buy @ {price}")
self.adainvested = True
elif price >= self.adaentry*(1+self.tp) or signal == 0.0 or price <= self.adaentry*(1-self.sl):
self.liquidate_crypto(self.ADA)
if(signal == 1.0):
self.SetHoldings(self.ADA, ada_weight)
self.adaentry = price
self.adanextEntryTime = self.Time + timedelta(minutes=1)
self.Log(f"ADA sell @ {price}")
self.adainvested=False
else:
return
btc_weight = 1/6
if self.BTCSIGNAL in data and self.BTC in data:
price = data[self.BTC].Close
signal = data[self.BTCSIGNAL].Signal
if self.Time >= self.btcnextEntryTime and signal == 1.0:
self.Transactions.CancelOpenOrders()
self.SetHoldings(self.BTC, btc_weight)
self.btcentry = price
self.Log(f"BTC buy @ {price}")
self.btcinvested = True
elif price >= self.btcentry*(1+self.tp) or signal == 0.0 or price <= self.btcentry*(1-self.sl):
self.liquidate_crypto(self.BTC)
if(signal == 1.0):
self.SetHoldings(self.BTC, btc_weight)
self.btcentry = price
self.btcnextEntryTime = self.Time + timedelta(minutes=1)
self.Log(f"BTC sell @ {price}")
self.btcinvested=False
else:
return
doge_weight = 1/6
if self.DOGESIGNAL in data and self.DOGE in data:
price = data[self.DOGE].Close
signal = data[self.DOGESIGNAL].Signal
if self.Time >= self.dogenextEntryTime and signal == 1.0:
self.Transactions.CancelOpenOrders()
self.SetHoldings(self.DOGE, doge_weight)
self.dogeentry = price
self.Log(f"DOGE buy @ {price}")
self.dogeinvested = True
elif price >= self.dogeentry*(1+self.tp) or signal == 0.0 or price <= self.dogeentry*(1-self.sl):
self.liquidate_crypto(self.DOGE)
if(signal == 1.0):
self.SetHoldings(self.DOGE, doge_weight)
self.dogeentry = price
self.dogenextEntryTime = self.Time + timedelta(minutes=1)
self.Log(f"DOGE sell @ {price}")
self.dogeinvested=False
else:
return
dot_weight = 1/6
if self.DOTSIGNAL in data and self.DOT in data:
price = data[self.DOT].Close
signal = data[self.DOTSIGNAL].Signal
if self.Time >= self.dotnextEntryTime and signal == 1.0:
self.Transactions.CancelOpenOrders()
self.SetHoldings(self.DOT, dot_weight)
self.dotentry = price
self.Log(f"DOT buy @ {price}")
self.dotinvested = True
elif price >= self.dotentry*(1+self.tp) or signal == 0.0 or price <= self.dotentry*(1-self.sl):
self.liquidate_crypto(self.DOT)
if(signal == 1.0):
self.SetHoldings(self.DOT, dot_weight)
self.dotentry = price
self.dotnextEntryTime = self.Time + timedelta(minutes=1)
self.Log(f"DOT sell @ {price}")
self.dotinvested=False
else:
return
eth_weight = 1/6
if self.ETHSIGNAL in data and self.ETH in data:
price = data[self.ETH].Close
signal = data[self.ETHSIGNAL].Signal
if self.Time >= self.ethnextEntryTime and signal == 1.0:
self.Transactions.CancelOpenOrders()
self.SetHoldings(self.ETH, eth_weight)
self.ethentry = price
self.Log(f"ETH buy @ {price}")
self.ethinvested = True
elif price >= self.ethentry*(1+self.tp) or signal == 0.0 or price <= self.ethentry*(1-self.sl):
self.liquidate_crypto(self.ETH)
if(signal == 1.0):
self.SetHoldings(self.ETH, eth_weight)
self.ethentry = price
self.ethnextEntryTime = self.Time + timedelta(minutes=1)
self.Log(f"ETH sell @ {price}")
self.ethinvested=False
else:
return
xrp_weight = 1/6
if self.XRPSIGNAL in data and self.XRP in data:
price = data[self.XRP].Close
signal = data[self.XRPSIGNAL].Signal
if self.Time >= self.xrpnextEntryTime and signal == 1.0:
self.Transactions.CancelOpenOrders()
self.SetHoldings(self.XRP, xrp_weight)
self.xrpentry = price
self.Log(f"XRP buy @ {price}")
self.xrpinvested = True
elif price >= self.xrpentry*(1+self.tp) or signal == 0.0 or price <= self.xrpentry*(1-self.sl):
self.liquidate_crypto(self.XRP)
if(signal == 1.0):
self.SetHoldings(self.XRP, xrp_weight)
self.xrpentry = price
self.xrpnextEntryTime = self.Time + timedelta(minutes=1)
self.Log(f"XRP sell @ {price}")
self.xrpinvested=False
else:
return
def liquidate_crypto(self, symbol):
crypto = self.Securities[symbol]
base_currency = crypto.BaseCurrency
quantity = min(crypto.Holdings.Quantity, base_currency.Amount)
lot_size = crypto.SymbolProperties.LotSize;
quantity = (round(quantity / lot_size) - 1) * lot_size
if self.is_valid_order_size(crypto, quantity):
self.MarketOrder(symbol, -quantity)
def set_crypto_holdings(self, symbol, percentage):
usdTotal = self.Portfolio.CashBook["USDT"].Amount
usdReserved = sum(x.Quantity * x.LimitPrice for x
in [x for x in self.Transactions.GetOpenOrders()
if x.Direction == OrderDirection.Buy
and x.Type == OrderType.Limit])
usdAvailable = usdTotal - usdReserved
self.Debug("usdAvailable: {}".format(usdAvailable))
limitPrice = round(self.Securities[symbol].Price, 2)
quantity = min(5,usdAvailable * percentage / limitPrice)
self.LimitOrder(symbol, quantity, limitPrice)
def is_valid_order_size(self, crypto, quantity):
return abs(crypto.Price * quantity) > crypto.SymbolProperties.MinimumOrderSize