| Overall Statistics |
|
Total Trades 54041 Average Win 0.03% Average Loss -0.01% Compounding Annual Return 24.771% Drawdown 13.600% Expectancy 0.277 Net Profit 204.419% Sharpe Ratio 1.497 Probabilistic Sharpe Ratio 81.571% Loss Rate 57% Win Rate 43% Profit-Loss Ratio 2.00 Alpha 0 Beta 0 Annual Standard Deviation 0.141 Annual Variance 0.02 Information Ratio 1.497 Tracking Error 0.141 Treynor Ratio 0 Total Fees $215289.56 |
BrokerageSupportedSecurities = {
"AlpacaBrokerageModel": [SecurityType.Equity],
"AlphaStreamsBrokerageModel": [SecurityType.Equity, SecurityType.Forex, SecurityType.Crypto, SecurityType.Future, SecurityType.Option, SecurityType.Cfd, SecurityType.Base],
"BitfinexBrokerageModel": [SecurityType.Crypto],
"DefaultBrokerageModel": [SecurityType.Equity, SecurityType.Forex, SecurityType.Crypto, SecurityType.Future, SecurityType.Option, SecurityType.Cfd, SecurityType.Base],
"FxcmBrokerageModel": [SecurityType.Forex, SecurityType.Cfd],
"GDAXBrokerageModel": [SecurityType.Crypto],
"InteractiveBrokersBrokerageModel": [SecurityType.Equity, SecurityType.Forex, SecurityType.Future, SecurityType.Option],
"OandaBrokerageModel": [SecurityType.Forex, SecurityType.Cfd],
"TradierBrokerageModel": [SecurityType.Equity]
}
def BrokerageErrorMessage(symbol, brokerage):
ErrorMessage = {
"AlpacaBrokerageModel": f"Alpaca Brokerage doesn't support trading {symbol.Value}.",
"AlphaStreamsBrokerageModel": f"Alpha Streams Brokerage doesn't support data for {symbol.Value}.",
"BitfinexBrokerageModel": f"Bitfinex Brokerage doesn't support trading {symbol.Value}.",
"DefaultBrokerageModel": f"Default Brokerage doesn't support data for {symbol.Value}.",
"FxcmBrokerageModel": f"FXCM Brokerage doesn't support trading {symbol.Value}.",
"GDAXBrokerageModel": f"GDAX Brokerage doesn't support trading {symbol.Value}.",
"InteractiveBrokersBrokerageModel": f"Interactive Brokers doesn't support trading {symbol.Value}.",
"OandaBrokerageModel": f"Oanda Brokerage doesn't support trading {symbol.Value}.",
"TradierBrokerageModel": f"Tradier Brokerage doesn't support trading {symbol.Value}."
}
return ErrorMessage[brokerage]from AlphaStreamsSocket import *
from AlphaStreamsAlphaModel import *
from AlphaStream.AlphaStreamClient import *
class AlphaStreamsRunnerAlgorithm(QCAlgorithm):
''' Basic template QC Algorithm to backtest or trade live using Alpha insights '''
def Initialize(self):
# Set the brokerage model and account settings for Financial Advisor accounts
self.SetBrokerageModel(AlphaStreamsBrokerageModel())
# self.SetBrokerageModel(InteractiveBrokersBrokerageModel())
# self.UniverseSettings.Leverage = 2.2
# self.DefaultOrderProperties = InteractiveBrokersOrderProperties()
# self.DefaultOrderProperties.Account = ""
# self.DefaultOrderProperties.FaGroup = ""
# self.DefaultOrderProperties.FaMethod = ""
# self.DefaultOrderProperties.FaPercentage = ""
# self.DefaultOrderProperties.FaProfile = ""
# Set AlphaStream ID and API token
clientId = f'{self.GetParameter("clientId")}'
clientToken = f'{self.GetParameter("clientToken")}'
client = AlphaStreamClient(clientId, clientToken)
self.client = client
# # Create Alpha model(s)
self.alphaIds = ["175bdd1036af614cfdc74dba9", "065fa592191da79dda29c8e37", "6d59e4088ef09a0aa316349dc", "083015f9d0b166d78ca7a71f7"]
# # Build a dictionary containing the credentials necessary to stream Insights live
RMQUserName = f'{self.GetParameter("RMQUserName")}'
RMQPassword = f'{self.GetParameter("RMQPassword")}'
RMQHostName = f'{self.GetParameter("RMQHostName")}'
RMQVirtualHost = f'{self.GetParameter("RMQVirtualHost")}'
RMQExchange = f'{self.GetParameter("RMQExchange")}'
streamClientInformation = {'UserName':RMQUserName, 'Password':RMQPassword, 'HostName':RMQHostName,
'VirtualHost':RMQVirtualHost, 'ExchangeName':RMQExchange, 'Port':5672}
self.alphaModels = {id: AlphaStreamsAlphaModel(self, id, client) for id in self.alphaIds}
# Add the alpha model(s) -- comma-separated arguments
models = list(self.alphaModels.values())
self.AddAlpha(CompositeAlphaModel(models[0], models[1], models[2], models[3]))
# Set Start Date and End Date based on Alpha models
# self.SetStartDate(min([x.StartDate for x in self.alphaModels.values()])) # Set Start Date
self.SetStartDate(2015, 4, 1)
self.SetEndDate(max([x.EndDate for x in self.alphaModels.values()])) # Set End Date
self.SetCash(1000000) # Set Strategy Cash
# Initialize security prices using most recent historical data
self.SetSecurityInitializer(self.HistoricalSecurityInitializer)
# Use null benchmark to avoid brokerage/data conflicts
self.SetBenchmark(lambda x: 0)
# Set the portfolio construction model to turn Insights into Portfolio Targets
self.SetPortfolioConstruction(InsightWeightingPortfolioConstructionModel(lambda time: None))
# Set the execution model to turn Portfolio Targets into orders
self.SetExecution(ImmediateExecutionModel())
# Set Universe Selection
self.SetUniverseSelection(CoarseFundamentalUniverseSelectionModel(self.CoarseSelectionFunction))
# If trading live, stream the Insights
if self.LiveMode:
for id, model in self.alphaModels.items():
model.EnsureState(client)
self.socket = AlphaStreamsSocket(self, client, streamClientInformation, self.alphaIds)
self.first = True
def CoarseSelectionFunction(self, coarse):
if self.first:
symbols = []
for model in list(self.alphaModels.values()):
symbols += model.symbols
symbols = list(set(symbols))
self.first = False
return symbols
else:
return Universe.Unchanged
def HistoricalSecurityInitializer(self, security):
if security.IsCustomData():
return
bar = self.GetLastKnownPrice(security)
security.SetMarketPrice(bar)
def OnEndOfAlgorithm(self):
if self.LiveMode:
for i in self.alphaIds:
try:
self.client.Unsubscribe(i)
self.Log(f'Unsubscribed from {i}')
except:
self.Log(f'Unable to unsubscribe from {i}. Please check your Institution page to check and manage your subscriptions.')
def OnOrderEvent(self, orderEvent):
order = self.Transactions.GetOrderById(orderEvent.OrderId)
if orderEvent.Status == OrderStatus.Filled:
self.Log("{0}: {1}: {2}".format(self.Time, order.Type, orderEvent))class UnsubscribeRequest(object):
""" Send subscription stop request for an Alpha """
def __init__(self, alphaId):
self.Id = str(alphaId)
self.Endpoint = "alpha/" + self.Id + "/unsubscribe"
def GetPayload(self):
payload = {
"id" : self.Id
}
return payloadclass SubscribeRequest(object):
""" Send subscription request for an Alpha """
def __init__(self, alphaId, exclusive=False):
self.Id = str(alphaId)
self.Endpoint = "alpha/" + self.Id + "/subscribe"
self.Exclusive = exclusive
def GetPayload(self):
payload = {
"id" : self.Id,
"exclusive" : self.Exclusive
}
return payloadfrom RabbitMQ.Client import *
from RabbitMQ.Client.Events import *
from System import *
from System import String, Object
from System.Text import *
from System.Collections.Generic import Dictionary
from QuantConnect import *
from QuantConnect.Data.Custom import *
from AlphaStream.AlphaStreamInsight import *
import json
from datetime import datetime
class AlphaStreamsSocket:
'''
Class to create and run threads for each Alpha being subscribed to. It creates threads,
opens connections to the streaming Insights, and passes them on to the Alpha Model
for each Alpha ID.
'''
def __init__(self, algorithm, client, streamClientInformation, alphaIds):
'''
Args:
algorithm: QC Algorithm allows for logging and access to algorithm class
client: Alpha Stream Client instance
streamClientInformation: dictionary holding credentials necessary to establish the connection
'''
self.algorithm = algorithm
# Added null data source to ensure Update() method fires every second
self.algorithm.AddData(NullData, 'NullData', Resolution.Second)
for alphaId in alphaIds:
try:
client.Subscribe(alphaId)
self.algorithm.Log(f'Subscribing to {alphaId}')
except:
client.Unsubscribe(alphaId)
client.Subscribe(alphaId)
self.algorithm.Log(f'{datetime.now()} :: Creating RMQ factory')
# Setting factory properties
factory = ConnectionFactory()
factory.HostName = streamClientInformation.get('HostName', None)
factory.Port = streamClientInformation.get('Port', None)
factory.UserName = streamClientInformation.get('UserName', None)
factory.Password = streamClientInformation.get('Password', None)
factory.VirtualHost = streamClientInformation.get('VirtualHost', None)
factory.AutomaticRecoveryEnabled = True
factory.RequestedConnectionTimeout = 5000
connection = factory.CreateConnection()
connection.ConnectionBlocked += self.OnConnectionBlocked
connection.ConnectionShutdown += self.OnConnectionShutdown
connection.ConnectionUnblocked += self.OnConnectionUnblocked
connection.CallbackException += self.OnCallbackException
# Open channel
self.algorithm.Log(f'{datetime.now()} :: Opening RMQ channel')
channel = connection.CreateModel()
# Create consumer to receive messages
self.algorithm.Log(f'{datetime.now()} :: Creating RMQ consumer')
consumer = EventingBasicConsumer(channel)
consumer.Received += self.ConsumerOnReceived
dict1 = Dictionary[String, Object]()
dict1["x-message-ttl"] = 60000
self.algorithm.Log(f'{datetime.now()} :: Creating RMQ Queue')
channel.QueueDeclare("AlphaStreamsRunner", False, False, True, dict1)
channel.QueueBind("AlphaStreamsRunner", streamClientInformation.get("ExchangeName", None), f'*', dict1)
channel.BasicConsume("AlphaStreamsRunner", True, "", False, False, dict1, consumer)
self.algorithm.Log(f'{datetime.now()} :: RMQ connection established')
def OnConnectionBlocked(self, sender, args):
self.algorithm.Log(f"RMQHelper.OnConnectionBlocked(): Connection is blocked: {args.Reason}")
def OnConnectionShutdown(self, sender, args):
self.algorithm.Log(f"RMQHelper.OnConnectionShutdown(): Connection is shutdown: {args.Reason}")
def OnConnectionUnblocked(self, sender, args):
self.algorithm.Log(f"RMQHelper.OnConnectionUnblocked(): Connection is unblocked: {args.Reason}")
def OnCallbackException(self, sender, args):
self.algorithm.Log(f"RMQHelper.OnCallbackException(): Callback exception: {args.Reason}")
def ConsumerOnReceived(self, sender, e):
''' Consumes and processes the messages received via the channel '''
try:
stringDictionary = Encoding.UTF8.GetString(e.Body)
packet = json.loads(stringDictionary)
self.algorithm.Log(f'{datetime.now()} :: Routing Key: {e.get_RoutingKey()} :: Exchange: {e.get_Exchange()} :: Consumer tag: {e.get_ConsumerTag()} :: Delivery tag: {e.get_DeliveryTag()}')
messageType = packet.get("eType", None)
alphaId = packet.get("alpha-id", None)
model = self.algorithm.alphaModels.get(alphaId, None)
if model is None:
# raise Exception (f'Message received from different Alpha: {alphaId}. Check Queue bindings, shutting down algorithm.')
return
if messageType == "AlphaResult":
insights = packet.get("insights", [])
for insight in insights:
self.algorithm.Log(f'{self.algorithm.Time} :: {alphaId} received Insight. Converting to framework Insight')
asi = AlphaStreamInsight(insight)
model.Listener(model.AlphaInsightToFrameworkInsight(asi, alphaId)) # Send streamed Insights back into Alpha model
elif messageType == "AlphaHeartbeat":
self.algorithm.Log(f'Model: {model.Id}')
algorithmId = packet.get("algorithm-id", None)
machineTime = packet.get("machine-time", None)
self.algorithm.Log(f'Heartbeat :: alphaId: {model.Id} -- algo ID: {algorithmId} :: {machineTime}\n')
else:
raise Exception(f"Invalid type: {messageType}")
except Exception as err:
self.algorithm.Log(f"Failed parsing deliver event: {err}")import threading
from itertools import groupby
from datetime import timedelta, datetime
from BrokerageSupportedSecurities import *
class AlphaStreamsAlphaModel(AlphaModel):
'''
Alpha Model that backtests and streams live Insights. Backtest Insights are collected in batch
and then iterated over. Live Insights are streamed and emitted in real-time.
Arguments:
algorithm: QCAlgorithm that is being run
alphaId: ID of the Alpha being tested
client: AlphaStreamsClient to fetch Insights
'''
def __init__(self, algorithm, alphaId, client):
self.StartDate = datetime(1900, 1, 1)
self.EndDate = datetime(2050, 1, 1)
self.Id = alphaId
self.lock = threading.Lock()
self.liveInsightCollection = []
self.backtestInsightCollection = {}
self.backtestInsightIndex = 0
self.algorithm = algorithm
self.supportedSecurities = BrokerageSupportedSecurities[str(algorithm.BrokerageModel)[24:]]
self.dataResolution = {}
self.canExecute = []
self.symbols = []
if not algorithm.LiveMode:
insights = []
hasData = True
start = 0
# Fetch all Insights in the Alpha's backtest
while hasData:
responseInsights = client.GetAlphaInsights(alphaId, start) # Fetch alpha Insights (backtest and live)
insights += [self.AlphaInsightToFrameworkInsight(x, alphaId) for x in responseInsights if
x.Source in ['in sample', 'live trading']]
hasData = len(responseInsights)
start += 100
# Raise exception if there are no Insights
if len(insights) == 0:
# raise Exception(f'No insights from alpha {alphaId}')
algorithm.Log(f'No insights from alpha {alphaId}')
self.backtestInsightKeys = []
self.StartDate = algorithm.Time
self.EndDate = algorithm.Time
self.dataResolution = Resolution.Minute
return
# Group insights by time created
self.backtestInsightCollection = {k: list(g) for k, g in
groupby(sorted(insights, key=lambda x: x.GeneratedTimeUtc),
key=lambda x: x.GeneratedTimeUtc)}
self.backtestInsightKeys = list(self.backtestInsightCollection.keys()) # Create list of dictionary keys
self.StartDate = self.backtestInsightKeys[0] # Get date of first Insight
self.EndDate = self.backtestInsightKeys[-1] + list(self.backtestInsightCollection.values())[-1][ 0].Period # Get date of last Insight
self.dataResolution = {x.Symbol: Resolution.Minute if x.GeneratedTimeUtc.second == 0 else Resolution.Second for x in list(set([item for sublist in list(self.backtestInsightCollection.values()) for item in sublist]))} # List of data resolution for each symbol
self.symbols = [x.Symbol for x in insights]
def Update(self, algorithm, data):
''' Updates this alpha model with the latest data from the algorithm.
This is called each time the algorithm receives data for subscribed securities
Args:
algorithm: The algorithm instance
data: The new data available
Returns:
The insights
'''
insights = []
# Fetch Insights to be emitted
if algorithm.LiveMode:
# Lock thread to modify insight collection
self.lock.acquire()
insights = [self.liveInsightCollection.pop(self.liveInsightCollection.index(x)) for x in self.liveInsightCollection if (x.GeneratedTimeUtc <= algorithm.UtcTime.replace(tzinfo=None)) and (algorithm.ActiveSecurities.ContainsKey(x.Symbol))]
self.lock.release()
if len(self.backtestInsightKeys) == 0:
return []
else:
if self.backtestInsightIndex == len(self.backtestInsightKeys):
return []
algoTime = algorithm.UtcTime.replace(tzinfo=None)
while algoTime >= self.backtestInsightKeys[self.backtestInsightIndex]:
if (self.backtestInsightCollection[self.backtestInsightKeys[self.backtestInsightIndex]] is None) or (insights is None):
x = 5
insights += self.backtestInsightCollection[self.backtestInsightKeys[self.backtestInsightIndex]]
if self.backtestInsightIndex is None:
x = 5
self.backtestInsightIndex += 1
if self.backtestInsightIndex == len(self.backtestInsightKeys):
break
for i in insights:
if i.CloseTimeUtc <= algoTime:
insights.pop(insights.index(i))
algorithm.Log(f'{algorithm.Time} :: In {self.Id} Update(), emitting Insight: {i.ToString()}')
return insights
def Listener(self, insight):
''' Called in the thread when messages are received via Rabbit MQ. It checks if
data needs to be added for the new Insight and then adds the Insight to the
Insight collection.
Args:
insight: Insight streamed from the live Alpha
'''
# Check that the security type is supported by the brokerage model, else kill algorithm
self.EnsureExecution(insight.Symbol)
# Add data for the Insight Symbol if necessary
self.EnsureData(insight.Symbol)
# Lock thread to modify insight collection
self.lock.acquire()
self.liveInsightCollection += [insight]
self.lock.release()
self.algorithm.Log(f'{self.algorithm.Time} :: In {self.Id} Listener(), adding Insight: {insight.ToString()}')
def EnsureState(self, client):
''' Called in QCAlgorithm Initialize() for each Alpha model. Checks to see if there are any Insights that are
currently live and creates Insights to mirror them.
Args:
client: AlphaStreamClient instance
'''
insights = []
hasData = True
alpha = client.GetAlphaById(self.Id)
start = alpha.InSampleInsights + alpha.LiveTradingInsights + alpha.OutOfSampleInsights - 100
# Fetch most recent Insights to see if there are any live trading Insights that haven't expired yet
while hasData:
responseInsights = client.GetAlphaInsights(self.Id, start)[::-1]
# In case our initial "start" value is too large and won't fetch any Insights
if len(responseInsights) < 1:
if start < 0:
# Raise exception if we haven't found any Insights at all
raise Exception(f"No Insights found for {self.Id} while trying to ensure state")
start -= 100
continue
# Filter for Insights whose CloseTime is > algorithm time
liveInsights = [self.AlphaInsightToFrameworkInsight(x, self.Id) for x in responseInsights if (x.Source == 'live trading') and (self.algorithm.UtcTime.replace(tzinfo=None) < x.CloseTime)]
insights += liveInsights
hasData = len(liveInsights)
start -= 100
if len(insights) > 0:
# Data check for all Insights
for insight in insights:
# Check that the security type is supported by the brokerage model
self.EnsureExecution(insight.Symbol)
# Add data for the Insight Symbol if necessary
self.EnsureData(insight.Symbol)
# Lock thread and modify insight collection
self.lock.acquire()
self.liveInsightCollection += insights
self.lock.release()
self.algorithm.Log(f'{self.algorithm.Time} :: In {self.Id} Alpha Model, adding currently live insights')
def EnsureExecution(self, symbol):
''' Called from Listener() to see if the security type of the Insight can be traded on the selected brokerage
Args:
insight: Framework Insight being streamed in
Returns: True if brokerage supports the security type, false otherwise
'''
if symbol in self.canExecute:
return
if symbol.SecurityType not in self.supportedSecurities:
self.algorithm.Quit(f'{BrokerageErrorMessage(symbol, str(self.algorithm.BrokerageModel)[24:])}')
else:
self.canExecute += [symbol]
def EnsureData(self, symbol):
''' Called from Listener method to see if data needs to be added for new Insights
Args:
symbol: Symbol of the asset underlying the Insight
'''
if not self.algorithm.Securities.ContainsKey(symbol):
symbolResolution = Resolution.Second
if not self.algorithm.LiveMode:
symbolResolution = self.dataResolution[symbol]
self.algorithm.AddSecurity(symbol.SecurityType, symbol.Value, symbolResolution, symbol.ID.Market, True, 0, False)
# Converts AlphaStream Insight types to QC Insight types
def AlphaInsightToFrameworkInsight(self, alphaInsight, alphaId = None):
''' Converts Alpha Stream Insights to QC Algorithm Framework Insights, which is the format
required.
Args:
alphaInsight: Alpha Streams Insight (live or backtest)
Returns:
insight: QC Algorithm Framework Insight
'''
if alphaInsight.Direction.lower() == 'up':
direction = InsightDirection.Up
elif alphaInsight.Direction.lower() == 'down':
direction = InsightDirection.Down
else:
direction = InsightDirection.Flat
## Try using insight ticker, not symbol
## plan B -- coarse universe is created to use symbols, not tickers
symbol = self.algorithm.Symbol(alphaInsight.Symbol)
insight = Insight(symbol, timedelta(seconds=alphaInsight.Period), InsightType.Price,
direction, alphaInsight.Magnitude, alphaInsight.Confidence,
alphaId if alphaId is not None else alphaInsight.SourceModel,
alphaInsight.Weight)
insight.GeneratedTimeUtc = alphaInsight.CreatedTime if alphaInsight.CreatedTime is not None else alphaInsight.GeneratedTimeUtc
insight.CloseTimeUtc = insight.GeneratedTimeUtc + insight.Period
self.algorithm.Log(f'{alphaInsight.Symbol} :: AS Insight ID {alphaInsight.Id} ---> Framework Insight ID {insight.Id} :: Alpha ID: {self.Id}')
return insight# Your New Python File
import json
import requests
import hashlib
import time
import base64
import os
import pandas as pd
from datetime import datetime
from AlphaStreamInsight import *
from GetAlphaInsightsRequest import *
from SubscribeRequest import *
from UnsubscribeRequest import *
from GetAlphaByIdRequest import *
from AlphaStreamsAlpha import *
class AlphaStreamClient(object):
"""Alpha Streams Client is the REST executor and client """
def __init__(self, *args, **kwargs):
self.__clientId = str(kwargs.pop('clientId', args[0]))
self.__token = str(kwargs.pop('token', args[1]))
self.__url = 'https://www.quantconnect.com/api/v2/'
def Execute(self, request, debug=False):
""" Execute an authenticated request to the Alpha Streams API """
# Create authenticated timestamped token.
timestamp = str(int(time.time()))
# Attach timestamp to token for increasing token randomness
timeStampedToken = self.__token + ':' + timestamp
# Hash token for transport
apiToken = hashlib.sha256(timeStampedToken.encode('utf-8')).hexdigest()
# Attach in headers for basic authentication.
authentication = "{}:{}".format(self.__clientId, apiToken)
base64string = base64.b64encode(authentication.encode('utf-8'))
headers = {
'Authorization': 'Basic %s' % base64string.decode('ascii'),
'Timestamp': timestamp
}
# URL endpoint specified in request
url = self.__url + request.Endpoint
# Encode the request in parameters of URL. Most of API is GET.
result = requests.get(url, params=request.GetPayload(), headers=headers)
if debug:
print(result.url)
self.PrettyPrint(result)
# Convert to object for parsing.
try:
json = result.json()
except Exception as err:
messages = []
messages.append(
'API returned a result which cannot be parsed into JSON. Please inspect the raw result below:')
messages.append(result.text)
json = {'success': False, 'messages': messages}
if type(json) is not list:
if ('success' in json.keys()) and ('messages' in json.keys()):
if json['success'] is False:
raise Exception(
'There was an exception processing your request: {}'.format(", ".join(json["messages"]), json))
elif ('success' in json.keys()):
if json['success'] is False:
raise Exception(
'There was an exception processing your request: {}'.format(json))
else:
raise Exception(
'There was an exception processing your request: {}'.format(json))
return json
def GetAlphaById(self, alphaId):
""" Request details about a specific alpha """
request = GetAlphaByIdRequest(alphaId)
result = self.Execute(request)
return AlphaStreamsAlpha(result)
def GetAuthorById(self, authorId):
""" Get information about a specific author """
request = GetAuthorByIdRequest(authorId)
result = self.Execute(request)
return Author(result)
def GetAlphaInsights(self, alphaId, start=0):
""" Get the insights for a specific alpha """
request = GetAlphaInsightsRequest(alphaId, start)
result = self.Execute(request)
insights = []
for i in result:
insights.append(AlphaStreamInsight(i))
return insights
def GetAlphaQuotePrices(self, alphaId, start=0):
""" Get the prices for a specific alpha """
request = GetAlphaPricesRequest(alphaId, start)
result = self.Execute(request)
prices = []
for i in result:
prices.append(Price(i))
return prices
def GetAlphaErrors(self, alphaId, start=0):
""" Get the errors for a specific alpha """
request = GetAlphaErrorsRequest(alphaId, start)
result = self.Execute(request)
errors = []
for i in result:
errors.append(RuntimeError(i))
return errors
def GetAlphaEquityCurve(self, alphaId, date_format = 'date'):
""" Get the pandas DataFrame with the equity curve for a specific alpha """
request = GetAlphaEquityCurveRequest(alphaId, date_format)
result = self.Execute(request)
for i in result:
if isinstance(i[0], int):
i[0] = datetime.utcfromtimestamp(i[0])
else:
i[0] = datetime.strptime(i[0], "%d/%m/%Y %H:%M:%S")
return pd.DataFrame.from_records(result, index=['time'], columns=['time', 'equity', 'sample'])
def GetAlphaList(self):
""" Get list of all available alpha Ids """
request = GetAlphaListRequest()
return self.Execute(request)
def SearchAlphas(self, *args, **kwargs):
""" Applying the search criteria supplied; find matching alphas and return an array of alpha objects """
criteria = SearchAlphasRequest(kwargs=kwargs)
result = self.Execute(criteria)
alphas = []
for a in result:
alphas.append(Alpha(a))
return alphas
def SearchAuthors(self, *args, **kwargs):
""" Applying the search criteria supplied; find matching authors and return an array of author objects """
criteria = SearchAuthorsRequest(kwargs=kwargs)
result = self.Execute(criteria)
authors = []
for ath in result:
authors.append(Author(ath))
return authors
def Subscribe(self, alphaId):
""" Subscribe to an alpha """
request = SubscribeRequest(alphaId)
result = self.Execute(request)
return result['success']
def Unsubscribe(self, alphaId):
""" Unsubscribe from an alpha """
request = UnsubscribeRequest(alphaId)
result = self.Execute(request)
return result['success']
def CreateConversation(self, alphaId, email, subject, message, cc = ''):
""" Create a conversation thread. """
request = CreateConversationRequest(alphaId, email, subject, message, cc)
result = self.Execute(request)
if result['success']:
return 'Conversation thread was successfully created.'
else:
return os.linesep.join(result['messages'])
def ReadConversation(self, alphaId):
""" Read a conversation thread to confirm receipt and return list of Conversation objects. """
request = CreateReadRequest(alphaId)
result = self.Execute(request)
conversations = [Conversation(i) for i in result]
return conversations
def CreateBid(self, *args, **kwargs):
""" Create a bid price request.
Args:
alphaId: Unique id hash of an Alpha published to the marketplace.
exclusive: Bid for the exclusive price (optional if shared is defined).
shared: Bid for the shared price (optional if exclusive is defined).
good_until: Expiration time of the bid."""
request = CreateBidPriceRequest(*args, **kwargs)
result = self.Execute(request)
if result['success']:
return 'Bid price was successfully created.'
else:
return os.linesep.join(result['messages'])
def PrettyPrint(self, result):
""" Print out a nice formatted version of the request """
print ('')
try:
parsed = json.loads(result.text)
print (json.dumps(parsed, indent=4, sort_keys=True))
except Exception as err:
print ('Fall back error (text print)')
print ('')
print (result.text)
print ('')from datetime import datetime
class AlphaStreamInsight:
"""Individual prediction/insight generated by an Alpha in the QuantConnect Alpha Streams market"""
def __init__(self, json):
self.Id = json['id']
self.Type = json.get('type', None)
self.Direction = json.get('direction', None)
self.Period = json.get('period', None)
self.CreatedTime = datetime.utcfromtimestamp(json['created-time']) if 'created-time' in json else None
self.GeneratedTimeUtc = datetime.utcfromtimestamp(json['generated-time']) if 'generated-time' in json else None
self.CloseTime = datetime.utcfromtimestamp(json['close-time']) if 'close-time' in json else None
self.Magnitude = json.get('magnitude', None)
self.Confidence = json.get('confidence', None)
self.SourceModel = json.get('source-model', None)
self.Group = json.get('group', None)
self.Source = json.get('source', None) # In sample
self.ReferenceValue = json.get('reference-value', None)
self.EstimatedValue = json.get('estimated-value', None)
self.Symbol = json.get('symbol', None)
self.Ticker = json.get('ticker', None)
self.Invalid = json.get('invalid', None)
self.ScoreFinal = json.get('score-final', False)
self.Weight = json.get('weight', None)
def __repr__(self):
return f'{self.CreatedTime} Alpha {self.Source} {self.Type} insight for {self.Ticker:<10} going {self.Direction} over the next {self.Period}s'class GetAlphaInsightsRequest:
""" Fetch an alpha insights, starting from `start` for a maximum of 1000 insights """
def __init__(self, alphaId, start = 0):
self.Id = alphaId
self.Start = start
self.Endpoint = "alpha/{}/insights".format(alphaId)
def GetPayload(self):
payload = {
"id" : self.Id,
"start" : self.Start
}
return payloadclass GetAlphaByIdRequest(object):
""" Request a specific alpha with a matching Alpha Id """
def __init__(self, alphaId):
self.Id = str(alphaId)
self.Endpoint = "alpha/" + self.Id
def GetPayload(self):
payload = {
"id" : self.Id
}
return payloadclass AlphaStreamsAlpha(object):
"""Algorithm alpha model from the Alpha Streams marketplace."""
def __init__(self, json):
self.Id = json['id']
self.AssetClasses = json.get('asset-classes', None)
self.Accuracy = json.get('accuracy', None)
self.AnalysesPerformed = json.get('analyses-performed', None)
self.AuthorTrading = json.get('author-trading', False)
self.Description = json.get('description', '')
self.EstimatedDepth = json.get('estimated-depth', None)
self.ExclusiveAvailable = json.get('exclusive-available', None)
self.ExclusiveSubscriptionFee = json.get('exclusive-subscription-fee', None)
self.EstimatedEffort = json.get('estimated-effort', None)
self.ListedTime = datetime.utcfromtimestamp(json['listed-time']) if 'listed-time' in json else None
self.Name = json.get('name', None)
self.Uniqueness = json.get('uniqueness', None)
self.SharpeRatio = json.get('sharpe-ratio', None)
self.SharedSubscriptionFee = json.get('subscription-fee', None)
self.Version = json.get('version', None)
self.Status = json.get('status', None)
self.InSampleInsights = json.get('in-sample-insights', None)
self.LiveTradingInsights = json.get('live-trading-insights', None)
self.OutOfSampleInsights = json.get('out-of-sample-insights', None)
self.Tags = json.get('tags', [])
self.Parameters = json.get('parameters', None)
self.OutOfSampleDtwDistance = json.get('out-of-sample-dtw-distance', None)
self.OutOfSampleReturnsCorrelation = json.get('out-of-sample-returns-correlation', None)
self.Trial = json.get('trial', 0)
def __repr__(self):
return f'''
Alpha Id: {self.Id}
Project: {self.Project.Name}
Sharpe Ratio: {self.SharpeRatio}
Uniqueness: {self.Uniqueness}
Exclusive Available: {self.ExclusiveAvailable}
Listed: {self.ListedTime}
Status: {self.Status}'''