| Overall Statistics |
|
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio -3.413 Tracking Error 0.154 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset |
# region imports
from AlgorithmImports import *
# endregion
class CalmMagentaAlbatross(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2022, 8, 1) # Set Start Date
self.SetEndDate(2022, 8, 5)
self.SetCash(100000) # Set Strategy Cash
self.SetTimeZone('UTC')
self.custom_symbol = self.AddData(CustomData, "BTCUSDTC", Resolution.Minute, timeZone=TimeZones.Utc).Symbol
self.symbol = self.AddCrypto('BTCUSDT', Resolution.Minute, market=Market.Binance).Symbol
self.debug_df = pd.DataFrame(columns=['Custom Data Close', 'QC AddCrypto Close'])
global algo
algo = self
def OnData(self, data: Slice):
#if self.Time.hour == 0 and self.Time.minute == 0:
# self.Log("{}: {}".format(self.Time, data['BTCUSDTC'].Close))
if (self.Time.hour == 0 and self.Time.minute == 0) or (self.Time.hour == 23 and self.Time.minute == 59):
# Adding Custom Data and QC Data Class data.
custom_close = data["BTCUSDTC"].Close if data.ContainsKey("BTCUSDTC") else np.nan
qc_close = data.Bars[self.symbol].Close if data.ContainsKey(self.symbol) else np.nan
self.debug_df.loc[self.Time] = [custom_close, qc_close]
def OnEndOfAlgorithm(self):
self.Log("Time: Custom Data Close, QC AddCrypto Close")
for i, row in self.debug_df.iterrows():
self.Log("{}: {}, {}".format(row.name, row['Custom Data Close'], row['QC AddCrypto Close']))
class CustomData(PythonData):
""" Custom Data Class. """
def GetSource(self,
config: SubscriptionDataConfig,
date: datetime,
isLive: bool) -> SubscriptionDataSource:
file_date = date.strftime('%Y%m%d')
file_path = "crypto/binance/minute/{}/{}_trade.zip#{}_{}_minute_trade.csv".format(
'btcusdt', file_date,
file_date, 'btcusdt'
)
source = os.path.join(Globals.DataFolder, file_path)
return SubscriptionDataSource(source, SubscriptionTransportMedium.LocalFile, FileFormat.Csv)
def Reader(self,
config: SubscriptionDataConfig,
line: str,
date: datetime,
isLive: bool) -> BaseData:
if not (line.strip() and line[0].isdigit()):
return None
data = CustomData()
data.Symbol = config.Symbol
try:
# Example File Format:
# Timestamp (milliseconds) Open High Low Close
# 60000 7792.9 7799.9 7722.65 7748.7
data_line = line.split(',')
data_time = timedelta(milliseconds=int(data_line[0]))
data.Period = timedelta(minutes=1)
# Both Time and EndTime end up being the EndTime value
data.Time = date + data_time
data.EndTime = date + data_time + data.Period
data.Value = float(data_line[4])
data['Open'] = float(data_line[1])
data['High'] = float(data_line[2])
data['Low'] = float(data_line[3])
data['Close'] = float(data_line[4])
data['Volume'] = float(data_line[5])
if data.EndTime.hour == 0 and data.EndTime.minute == 0:
algo.Log("Time of data: {}".format(
timedelta(milliseconds=int(line.split(',')[0]))))
algo.Log("Data.Time, Data.EndTime, Close, OHLV: {}".format((
data.Time, data.EndTime, data["Close"], data["Open"],
data["High"], data["Low"], data["Volume"])))
except Exception as e:
return None
return data
def DataTimeZone(self):
'''
Select the time zone of your data here.
Defaults to New York timezone if no implementation is provided.
'''
return TimeZones.Utc
def DefaultResolution(self):
'''
Sets the default resolution of this data source.
'''
return Resolution.Minute