February 19, 2024
We provide one-click investor event access with real-time speech-to-text transcription, live audio playback controls, search, price tracking, alerts, in-transcript annotation, and automated AI insights spanning sentiment, topic extraction, and, most recently, interrogation.
In this article, we demonstrate the potential of Aiera-powered chat. We’ll integrate Aiera’s data API with OpenAI’s new assistants API, combining Aiera’s leading coverage universe (60,000+ events per year across 13,000+ equities) with OpenAI’s latest GPT-4 Turbo model, gpt-4-0125-preview. Our GPT-4 chat will collect earnings transcripts from Aiera and allow users to interrogate them. You can play around with an Aiera-powered GPT via OpenAI’s marketplace or here: https://chat.openai.com/g/g-FTB0Gd6uU-aiera.
To build our application, we’ll use Python with Streamlit to prototype a browser app and OpenAI’s assistants API to manage GPT-4 chats. To complete this tutorial, you will need an API key and org ID from OpenAI, available at https://platform.openai.com/api-keys and https://platform.openai.com/account/organization respectively. Additionally, you’ll need an API key distributed by Aiera. Contact sales@aiera.com for more information.
OpenAI’s assistants API simplifies the process of building chat by facilitating integration with external knowledge bases and miscellaneous tool calling. When appropriate, message processing is interrupted, giving the execution thread the opportunity to work before resuming operation. Examples of built-in tools include retrieval, for fetching document data, and the code interpreter, which can execute code to do useful things, such as generating visualizations of data:
You can check the chat itself out here (I love this shareable chats feature): https://chat.openai.com/share/e/0d3c43dc-552e-42dc-8da0-92f61e6e6684
For the purposes of this tutorial, we’ll use the assistant objects defined in OpenAI’s beta Python SDK, documented here. The full spec for OpenAI’s Python SDK is documented here. We’ll describe these objects later in step 2.
export OPENAI_API_KEY={your key}
export OPENAI_ORG_ID={your org id}
export AIERA_API_KEY={your aiera api key}
We’ll use these when establishing connections later on.
Create the assistant and define the tools required to fetch transcript data from Aiera. A notebook describing this process is available at https://github.com/aiera-inc/aiera-assistant/blob/main/AieraAssistant.ipynb.
import os
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
OPENAI_ORG_ID = os.environ["OPENAI_ORG_ID"]
AIERA_API_KEY = os.environ["AIERA_API_KEY"]
Our assistant will rely on two custom actions:
1. get_events
2. upload_event_transcripts
The get_events action will signal our code to query events. Likewise, the upload_event_transcripts action signals that our code should upload files containing the event transcript to OpenAI for subsequent retrieval. Actions infer parameters from the chat context. For example, if I ask, “What did Mark Zuckerberg say about gen AI on the 1Q 2023 call?”, the model passes arguments:
{
    "bloomberg_ticker": "FB:US",
    "start_date": "2022-11-01",
    "end_date": "2024-01-29",
    "call_type": "earnings"
}
Programmatically, we define these actions below in JSON. The parameters describe a small subset of the endpoints available via Aiera’s REST offerings.
get_events = {
    "name": "get_events",
    "description": "Retrieves events that match the parameters provided.",
    "parameters": {
        "type": "object",
        "properties": {
            "modified_since": {
                "type": "string",
                "description": "Returns events modified since this date."
            },
            "bloomberg_ticker": {
                "type": "string",
                "description": "Filter events to 1+ bloomberg tickers (comma-separated) including the country code."
            },
            "event_type": {
                "type": "string",
                "enum": [
                    "earnings",
                    "earnings_release",
                    "presentation",
                    "investor_meeting",
                    "special_situation"
                ],
                "description": "Filter by earnings, earnings_release, presentation, investor_meeting, special_situation (comma-separated)"
            },
            "start_date": {
                "type": "string",
                "description": "Fetch events on or after this date (defaults to 2 weeks ago)"
            },
            "end_date": {
                "type": "string",
                "description": "Fetch events on or before this date (defaults to 2 weeks from now)"
            }
        },
        "required": [
            "event_type",
            "start_date"
        ]
    }
}
upload_event_transcripts = {
    "name": "upload_event_transcripts",
    "description": "A function that takes in a list of event_ids and uploads the event transcripts as .json files.",
    "parameters": {
        "type": "object",
        "properties": {
            "event_ids": {
                "type": "array",
                "description": "The event_ids for which to fetch transcripts",
                "items": {
                    "type": "integer"
                }
            }
        },
        "required": [
            "event_ids"
        ]
    }
}
The model uses the schema descriptions to inform its inferred parameters.
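To sanity-check what the model infers, a minimal hand-rolled validator can verify required keys and enum membership against a schema like the one above. This is an illustrative helper, not part of Aiera’s or OpenAI’s SDKs, and the trimmed schema below only mirrors a few fields from get_events:

```python
import json

def validate_args(schema: dict, raw_arguments: str) -> list:
    """Return a list of problems found in a tool call's JSON arguments."""
    args = json.loads(raw_arguments)
    params = schema["parameters"]
    problems = []
    # every required key must be present
    for key in params.get("required", []):
        if key not in args:
            problems.append(f"missing required parameter: {key}")
    # every provided key must exist in the schema and honor any enum
    for key, value in args.items():
        prop = params["properties"].get(key)
        if prop is None:
            problems.append(f"unknown parameter: {key}")
        elif "enum" in prop and value not in prop["enum"]:
            problems.append(f"invalid value for {key}: {value}")
    return problems

# hypothetical schema fragment mirroring get_events above
get_events = {
    "name": "get_events",
    "parameters": {
        "type": "object",
        "properties": {
            "bloomberg_ticker": {"type": "string"},
            "event_type": {"type": "string", "enum": ["earnings", "presentation"]},
            "start_date": {"type": "string"},
            "end_date": {"type": "string"},
        },
        "required": ["event_type", "start_date"],
    },
}

inferred = '{"bloomberg_ticker": "FB:US", "event_type": "earnings", "start_date": "2022-11-01"}'
print(validate_args(get_events, inferred))  # → []
```

A check like this can catch a hallucinated parameter before it reaches your API code.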
Now let’s describe how the assistant should behave by providing it with instructions. Experimenting with different instructions allows you to vary chat behavior. The instructions below are only an example, intended to limit the chat’s functionality to a small set of tasks.
instructions= """You are Aiera, an expert in analyzing earnings call \
transcripts with a focus on accuracy and detail. Your role is to analyze \
earnings call events for specific companies.
Instructions for collecting events:
1. When a user requests analysis, like 'Compare KPIs across Microsoft's 2022 \
earnings calls', use the get_events operation to retrieve relevant 'earnings' \
events. Infer the Bloomberg ticker with country code from the user's context. \
Set the start_date to November of the previous year and the end_date to today \
to ensure coverage of the fiscal period. When the user asks about the most \
recent event, always ensure you use the event occurring closest to today's \
date by calling the get_events operation.
2. Apply a stringent filter on the event's fiscal_year to ensure only events \
matching the fiscal year provided by the user are included when uploading \
transcripts.
3. Ask clarifying questions to determine which event to use in the case of \
ambiguity and collect new events if applicable.
4. Upload event content using the upload_event_transcripts operation.
Instructions for analysis:
Present your analysis, emphasizing key insights and metrics from the earnings \
calls.
After each task, offer further in-depth exploration or different event \
analysis, and suggest contacting sales@aiera.com for more information.
Instructions for additional questions:
Ask clarifying questions to determine which event to use in the case of \
ambiguity and collect new events if applicable.
When suggesting analysis, propose that the user explores topics for the event, \
build a SWOT analysis for the company, or summarize the event.
"""
Now we create our OpenAI assistant. The assistant ID returned will be used by our application to create chat threads. You can check out your assistants in OpenAI’s API console at https://platform.openai.com/assistants. Assistants can also be created directly in the UI; here we do the same with the Python SDK:
from openai import OpenAI

client = OpenAI(
    organization = OPENAI_ORG_ID,
    api_key = OPENAI_API_KEY
)
assistant = client.beta.assistants.create(
    name="Aiera Assistant",
    instructions=instructions,
    model="gpt-4-1106-preview",
    tools=[
        {"type": "retrieval"},  # for file retrieval
        {"type": "function", "function": get_events},
        {"type": "function", "function": upload_event_transcripts}
    ]
)

print(assistant.id)
print(assistant.id)
Add the ID of the assistant to the environment:
export OPENAI_ASSISTANT_ID={your_assistant_id}
We’ve created our assistant; now let’s build tools for managing the assistant’s messaging and functions. To do so, we’ll make use of three objects defined in OpenAI’s SDK: the thread, run, and message.
Pydantic can be used to initialize these settings, as in the GitHub project here.
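The actual settings classes live in the GitHub project; as a minimal stand-in, a stdlib dataclass sketch works too. The field names here are assumed from how the settings are used below, and the base URL is a placeholder:

```python
import os
from dataclasses import dataclass, field

@dataclass
class OpenAISettings:
    api_key: str = field(default_factory=lambda: os.environ.get("OPENAI_API_KEY", ""))
    org_id: str = field(default_factory=lambda: os.environ.get("OPENAI_ORG_ID", ""))
    assistant_id: str = field(default_factory=lambda: os.environ.get("OPENAI_ASSISTANT_ID", ""))
    persist_files: bool = False  # whether uploaded files survive shutdown

@dataclass
class AieraSettings:
    api_key: str = field(default_factory=lambda: os.environ.get("AIERA_API_KEY", ""))
    base_url: str = "https://premium.aiera.com/api"  # placeholder base URL

openai_settings = OpenAISettings()
aiera_settings = AieraSettings()
```

Reading from the environment with defaults keeps secrets out of source control while letting tests construct settings explicitly.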
# aiera_assistant/assistant.py
# imports, many to be used later
import time
import re
from typing import List
import json
import os
import logging
import requests
from openai import OpenAI
from openai.types.beta.threads import ThreadMessage
from aiera_assistant.config import AieraSettings, OpenAISettings
from aiera_assistant.__init__ import ROOT_DIR

logger = logging.getLogger(__name__)
class AieraAssistant:
    def __init__(self, openai_settings: OpenAISettings, aiera_settings: AieraSettings):
        # settings for connecting to Aiera resources
        self.aiera_settings = aiera_settings

        # connect to client
        self.client = OpenAI(
            organization = openai_settings.org_id,
            api_key = openai_settings.api_key
        )

        # id of the assistant
        self.assistant_id = openai_settings.assistant_id

        # load the assistant
        self.assistant = self.client.beta.assistants.retrieve(self.assistant_id)

        # create a thread
        self.thread = self.client.beta.threads.create()

        # whether we'll persist the files uploaded to openai
        # or remove after execution
        self.persist_files = openai_settings.persist_files

        # track all files uploaded to openai
        self._file_ids = []

        # track file ids currently being used
        self._current_file_ids = []
Now, let’s implement functions to communicate with OpenAI. The submit_message function creates a message with the role user and passes the relevant file IDs to the message.
def submit_message(self, message_content: str):
    self.client.beta.threads.messages.create(
        thread_id = self.thread.id,
        role = "user",
        content = message_content,
        file_ids = self._current_file_ids
    )
Once a message has been created during a run, the assistant endpoint processes the chat using the assistant’s metadata and current active thread. Below, _wait_for_run_event handles polling until OpenAI has completed running. A run finishes with a status of completed, failed, requires_action, in_progress, expired, cancelling, or cancelled.
When our model returns requires_action, we run the associated function by looking at the tool.function.name returned by the run, and its arguments in tool.function.arguments. Using these values, we call the corresponding functions defined in our own code to return data to the conversation thread.
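This dispatch step can be sketched as a simple name-to-callable map. The handlers below are hypothetical stand-ins for the Aiera-backed methods implemented later in this article:

```python
import json

# hypothetical handlers standing in for the real Aiera-backed methods
def get_events(**kwargs) -> str:
    return json.dumps({"events": [], "filters": kwargs})

def upload_event_transcripts(event_ids: list) -> list:
    return [f"file-{eid}" for eid in event_ids]

TOOL_HANDLERS = {
    "get_events": get_events,
    "upload_event_transcripts": upload_event_transcripts,
}

def dispatch(function_name: str, raw_arguments: str):
    """Parse the model's JSON arguments and invoke the matching handler."""
    handler = TOOL_HANDLERS.get(function_name)
    if handler is None:
        raise ValueError(f"unknown tool: {function_name}")
    kwargs = json.loads(raw_arguments) if raw_arguments else {}
    return handler(**kwargs)

print(dispatch("upload_event_transcripts", '{"event_ids": [123, 456]}'))
```

A map like this keeps the run-processing loop free of long if/elif chains as you add more tools.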
At present (1/29/24), files behave slightly differently because of some quirks in the way OpenAI handles file upload and retrieval. There are a couple of documented challenges with actions and files: https://community.openai.com/t/actions-and-files-both-out-and-in/596696. In particular, we were unable to return a file as the result of a function call and use it in the same message thread, so the chat was unable to answer questions about the event. I’m optimistic solutions will arrive in the coming months, but for the time being I’m using a workaround for uploading files for use with retrieval:
def process_messages(self) -> List[dict]:
    run = self.client.beta.threads.runs.create(
        thread_id = self.thread.id,
        assistant_id = self.assistant.id
    )

    # poll openai
    run = self._wait_for_run_event(run)

    while run.status == 'requires_action':
        # the run describes which tool to call and with what arguments
        tools_to_call = run.required_action.submit_tool_outputs.tool_calls

        # iterate over tools
        for each_tool in tools_to_call:
            tool_call_id = each_tool.id
            function_name = each_tool.function.name
            function_arg = each_tool.function.arguments

            # load the function arguments the model has inferred
            if function_arg is not None:
                function_arg = json.loads(function_arg)

            # handle get_events calls
            if function_name == "get_events":
                found_events = self.get_events(**function_arg)

                # Submit events
                run = self.client.beta.threads.runs.submit_tool_outputs(
                    thread_id = self.thread.id,
                    run_id = run.id,
                    tool_outputs = [{"tool_call_id": tool_call_id, "output": found_events}]
                )

                # poll openai
                run = self._wait_for_run_event(run)

            elif function_name == "upload_event_transcripts":
                file_ids = self.upload_event_transcripts(**function_arg)
                self._current_file_ids = file_ids

                # uploads require cancel because of file handling
                self.client.beta.threads.runs.cancel(
                    thread_id = self.thread.id,
                    run_id = run.id
                )

                # Attempt update of files
                self.client.beta.threads.messages.create(
                    thread_id = self.thread.id,
                    role = "user",
                    content = "",
                    file_ids = file_ids
                )
                return self.process_messages()

    if run.status == "completed":
        logger.debug("Completed run.")
        messages = self.client.beta.threads.messages.list(
            thread_id = self.thread.id
        )
        formatted_messages = self._format_openai_messages(messages)
        logger.debug("Current messages:\n%s", json.dumps(formatted_messages))
        return formatted_messages
    else:
        logger.error("Something went wrong. Run status: %s", run.status)
def _wait_for_run_event(self, run):
    i = 0
    # poll until the run reaches a terminal or actionable status
    while run.status not in ["completed", "failed", "requires_action", "expired", "cancelled"]:
        if i > 0:
            time.sleep(10)
        run = self.client.beta.threads.runs.retrieve(
            thread_id = self.thread.id,
            run_id = run.id
        )
        i += 1
    return run
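Polling at a fixed 10-second interval works, but a capped exponential backoff is gentler on rate limits. Here is a sketch with an injectable fetch function, a hypothetical helper demonstrated with a stub in place of a live API call:

```python
import time

TERMINAL_STATUSES = {"completed", "failed", "requires_action", "expired", "cancelled"}

def wait_for_terminal(fetch_run, base_delay=1.0, max_delay=10.0):
    """Poll fetch_run() until it reports a terminal status, backing off between calls."""
    delay = base_delay
    run = fetch_run()
    while run["status"] not in TERMINAL_STATUSES:
        time.sleep(delay)
        delay = min(delay * 2, max_delay)  # capped exponential backoff
        run = fetch_run()
    return run

# stub fetcher simulating a run that finishes on the third poll
statuses = iter(["queued", "in_progress", "completed"])
run = wait_for_terminal(lambda: {"status": next(statuses)}, base_delay=0.01)
print(run["status"])  # → completed
```

Injecting the fetcher also makes the polling logic trivially unit-testable, which the inline version above is not.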
Next, we implement the functions to be used in conjunction with the tool calls.
def get_events(self, modified_since: str = None,
               bloomberg_ticker: str = None,
               event_type: str = None,
               start_date: str = None,
               end_date: str = None,
               ):
    param_strings = []
    for param, item in {"modified_since": modified_since,
                        "bloomberg_ticker": bloomberg_ticker,
                        "event_type": event_type,
                        "start_date": start_date,
                        "end_date": end_date,
                        }.items():
        if item is not None:
            param_strings.append(f"{param}={item}")

    # format string for use with aiera
    param_string = "&".join(param_strings)
    url = f"{self.aiera_settings.base_url}/events?{param_string}"
    matches = requests.get(url,
                           headers={"X-API-Key": self.aiera_settings.api_key})
    content = json.dumps(matches.json())
    return content
def upload_event_transcripts(self, event_ids: list):
    file_ids = []
    for event_id in event_ids:
        event = requests.get(f"{self.aiera_settings.base_url}/events/{event_id}?transcripts=true",
                             headers={"X-API-Key": self.aiera_settings.api_key})
        event_data = event.json()
        transcripts = [event_item["transcript"] for event_item in event_data["transcripts"]]

        # remove transcripts items
        del event_data["transcripts"]
        event_data["transcript"] = "\n".join(transcripts)

        filename = f'{event_id}.json'
        file_id = self.upload_transcript_file(filename, json.dumps(event_data))
        file_ids.append(file_id)

    return file_ids
def upload_transcript_file(self, filename, transcript, sleep=5):
    # utility function for single uploads
    # create temporary local file
    with open(filename, "w") as f:
        f.write(transcript)

    # upload a file with an assistants purpose
    try:
        file = self.client.files.create(
            file = open(filename, "rb"),
            purpose = "assistants"
        )
        self._file_ids.append(file.id)
    except Exception as e:
        logger.error(str(e))
        os.remove(filename)
        raise

    # optional param to give openai time to index
    if sleep:
        time.sleep(sleep)

    # remove local file
    os.remove(filename)
    return file.id
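As a design note, the manual query-string join in get_events above can be replaced with the standard library's urlencode, which also handles URL escaping. A minimal sketch with a hypothetical helper and a placeholder base URL:

```python
from urllib.parse import urlencode

def build_events_url(base_url: str, **filters) -> str:
    """Build the /events query URL, skipping parameters left as None."""
    params = {k: v for k, v in filters.items() if v is not None}
    return f"{base_url}/events?{urlencode(params)}"

url = build_events_url(
    "https://premium.aiera.com/api",  # placeholder base URL
    bloomberg_ticker="FB:US",
    event_type="earnings",
    start_date="2022-11-01",
    end_date=None,  # dropped from the query string
)
print(url)
```

urlencode matters once parameters can contain characters like `&` or spaces, which the plain f-string join would pass through unescaped.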
Let’s add a couple of other utility functions to begin the conversation, close the conversation, and format the messages for use with Streamlit.
def begin_conversation(self):
    self.submit_message("hello")
    return self.process_messages()

def close_chat(self):
    self.client.beta.threads.delete(self.thread.id)

def _format_openai_messages(self, messages):
    new_messages = []
    for message in messages:
        if isinstance(message, ThreadMessage):
            content = message.content[0].text.value

            # add escape so $ doesn't render like math
            content = content.replace(" $", " \\$")
            new_message = {"role": message.role, "content": content}
            new_messages.append(new_message)
    return new_messages
And finally, let’s create a destructor to handle deleting files from OpenAI at application shutdown.
def __del__(self):
    if self._file_ids:
        if not self.persist_files:
            # remove each file
            for file_id in list(set(self._file_ids)):
                res = self.client.files.delete(
                    file_id = file_id
                )
Now we can use Streamlit to assemble our application and define the runtime logic for invoking our assistant. We assign an assistant instance to our streamlit state, and use that object for communicating with Aiera and OpenAI. In a new file, use the following snippet to define the UI:
# aiera_assistant/main.py
import re
import streamlit as st
from streamlit_chat import message
from aiera_assistant.config import openai_settings, aiera_settings
from aiera_assistant.assistant import AieraAssistant
def main():
    # Setting page title and header
    st.set_page_config(page_title="Aiera")
    st.markdown("<h1 style='text-align: center;'>Aiera Assistant</h1>", unsafe_allow_html=True)

    # initialize the assistant
    if 'assistant' not in st.session_state:
        st.session_state['assistant'] = AieraAssistant(
            openai_settings = openai_settings,
            aiera_settings = aiera_settings
        )

    # Initialise session state variables
    if 'generated' not in st.session_state:
        st.session_state['generated'] = st.session_state['assistant'].begin_conversation()

    # container for chat history
    response_container = st.container()

    # container for text input
    container = st.container()

    # set up user input form
    with container:
        with st.form(key='user_input_form', clear_on_submit=True):
            user_input = st.text_area("You:", key='input', height=100)
            submit_button = st.form_submit_button(label='Send')

        # if user has submitted input, submit and process messages
        if submit_button and user_input:
            with st.spinner(text='Processing...'):
                st.session_state['generated'].append({"role": "user", "content": user_input})

                # trigger assistant processing
                st.session_state['assistant'].submit_message(user_input)
                messages = st.session_state['assistant'].process_messages()

                # update messages
                st.session_state['generated'] = [mess for mess in messages]

    # format new messages
    if st.session_state['generated']:
        with response_container:
            # iterate over and differentiate user / chatbot messages
            for i, mess in enumerate(reversed(st.session_state['generated'])):
                if mess["role"] == 'user':
                    message(mess["content"], is_user=True, key=str(i) + '_user')
                else:
                    content = mess["content"]

                    # filter out annotation placeholders
                    if "【" in content:
                        content = re.sub(r'【(.*?)】', '', content)

                    with st.chat_message('Aiera'):
                        st.write(content)

if __name__ == "__main__":
    main()
The above file can be used to run the application with:
streamlit run aiera_assistant/main.py
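The annotation filter in main.py strips OpenAI’s retrieval citation markers (e.g. 【12†source】) before rendering. A quick standalone check of that regex:

```python
import re

def strip_annotations(content: str) -> str:
    """Remove OpenAI retrieval citation markers like 【12†source】."""
    if "【" in content:
        content = re.sub(r'【(.*?)】', '', content)
    return content

print(strip_annotations("Revenue grew 12% year over year.【6†source】"))
# → Revenue grew 12% year over year.
```

The non-greedy `(.*?)` matters here: with a greedy match, a message containing two markers would lose all the text between them.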
In closing, we’ve built a chat application that can be used to locally test custom Aiera-powered AI.
If you’d like to learn more about Aiera’s wide data offerings, please contact sales@aiera.com.