| Overall Statistics |
|
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio 0 Tracking Error 0 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset |
# region imports
from AlgorithmImports import *
# endregion
class FredDffTest(QCAlgorithm):
def Initialize(self):
self.first = True
self.start_date = datetime.strptime(self.GetParameter('start_date'), '%Y-%m-%d')
self.SetStartDate(self.start_date.year, self.start_date.month, self.start_date.day)
self.SetCash(float(self.GetParameter('initial_cash')))
self.fred_api_key = self.GetParameter('fred_api_key')
# Works fine but obviously doesn't call via OnData, just loads a dictionary
#self.dff1 = DailyFedFundRates(self, self.start_date).Read()
#self.Log(self.dff1)
# Never calls the DailyFedFundRates2.Reader method? Why not?
# I also need to be able to pass in an api key and a start date. How?
self.dff2 = self.AddData(DailyFedFundRates2, "DailyFedFundRates2", Resolution.Daily).Symbol
self.SetBenchmark(self.dff2)
# I don't want to call History() - I want my data to appear in the OnData method, right?
customDataHistory = self.History(DailyFedFundRates2, "DailyFedFundRates2", timedelta(365))
self.Log(customDataHistory)
def OnData(self, data: Slice):
self.Log(data)
return
######################################################################################
# The DFF observations look like this:
# {
# "count" : 1,
# "file_type" : "json",
# "limit" : 100000,
# "observation_end" : "9999-12-31",
# "observation_start" : "2022-09-01",
# "observations" : [
# {
# "date" : "2022-09-01",
# "realtime_end" : "2022-09-06",
# "realtime_start" : "2022-09-06",
# "value" : "2.33"
# }
# ],
# "offset" : 0,
# "order_by" : "observation_date",
# "output_type" : 1,
# "realtime_end" : "2022-09-06",
# "realtime_start" : "2022-09-06",
# "sort_order" : "asc",
# "units" : "lin"
# }
######################################################################################
# Proof that the outside data is available.
class DailyFedFundRates():
def __init__(self, algorithm: QCAlgorithm, start_date):
self.Algorithm = algorithm
formatted_start_date = start_date.strftime('%Y-%m-%d')
url = f"https://api.stlouisfed.org/fred/series/observations?series_id=DFF&api_key={self.Algorithm.fred_api_key}&file_type=json&observation_start={formatted_start_date}"
self.Content = self.Algorithm.Download(url)
def Read(self):
if not (self.Content.strip()):
raise Exception(f'Cannot read FRED DFF data. Aborting.' )
datapoints = {}
jsobj = json.loads(self.Content)
for item in jsobj["observations"]:
try:
# datapoints[ item["date"] ] = {'date': item["date"], 'value': item["value"]}
datapoints[ item["date"] ] = float(item["value"])
except ValueError:
pass
if (len(datapoints) == 0):
raise Exception(f'Cannot parse FRED DFF data No valid observations found. Aborting.')
return datapoints
# Doesn't work.
# - Never calls the Reader method, even when supplied with a valid key. Why?
# - need to find a way to pass in the API key and not hard code it in the GetSource call.
# - need to find a way to pass in the start date and not hard code it in the GetSource call.
#
class DailyFedFundRates2(PythonData):
def GetSource(self, config, date, isLiveMode):
source = f"https://api.stlouisfed.org/fred/series/observations?series_id=DFF&api_key=REDACTED&file_type=json&observation_start=2022-01-01"
return SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile, FileFormat.UnfoldingCollection)
def Reader(self, config, line, date, isLiveMode):
if not (self.line.strip()):
raise Exception(f'Cannot read FRED DFF data. Aborting.' )
datapoints = []
jsobj = json.loads(self.Content)
for item in jsobj["observations"]:
try:
index = DailyFedFundRates2()
index.Symbol = config.symbol
index.Time = datetime.strptime( item["Date"], "%Y-%m-%d")
index.EndTime = index.Time + timedelta(days=1)
index.Value = float(item["value"])
datapoints.append(index)
except ValueError:
pass
if (len(datapoints) == 0):
raise Exception(f'Cannot parse FRED DFF data No valid observations found. Aborting.')
# I'm guessing I can't return a list but it doesn't matter since we never get here anyway.
return datapoints