Python autoinstrumentation library for AWS Bedrock calls made using boto3 (sync) and aioboto3 (async).
This package implements OpenInference tracing for invoke_model, invoke_agent and converse calls made using the bedrock-runtime and bedrock-agent-runtime clients from both boto3 (sync) and aioboto3 (async).
[!NOTE]
The Converse API was introduced in botocore v1.34.116. Please use v1.34.116 or above to utilize converse.
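As a minimal sketch of the version requirement above, the check below compares a botocore version string against 1.34.116. The names `MIN_CONVERSE_VERSION`, `parse_version`, and `supports_converse` are hypothetical helpers for illustration, not part of this package or of botocore.

```python
import re

# Minimum botocore release that ships the Converse API, per the note above.
MIN_CONVERSE_VERSION = (1, 34, 116)


def parse_version(version: str) -> tuple:
    """Turn a version string such as '1.34.116' into a tuple of three ints."""
    numbers = []
    for piece in version.split(".")[:3]:
        # Keep only the leading digits so a suffix like '116rc1' parses as 116.
        match = re.match(r"\d+", piece)
        numbers.append(int(match.group()) if match else 0)
    # Pad short versions such as '1.35' to three components.
    while len(numbers) < 3:
        numbers.append(0)
    return tuple(numbers)


def supports_converse(version: str) -> bool:
    """Return True if the given botocore version is at least 1.34.116."""
    return parse_version(version) >= MIN_CONVERSE_VERSION
```

In practice the installed version could be passed in as `supports_converse(botocore.__version__)`.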
Find the list of Bedrock-supported models and their IDs here. Future testing is planned for additional models.
| Model | Supported Methods |
|---|---|
| Anthropic Claude 2.0 | converse, invoke |
| Anthropic Claude 2.1 | converse, invoke |
| Anthropic Claude 3 Sonnet 1.0 | converse |
| Anthropic Claude 3.5 Sonnet | converse |
| Anthropic Claude 3 Haiku | converse |
| Meta Llama 3 8b Instruct | converse |
| Meta Llama 3 70b Instruct | converse |
| Mistral AI Mistral 7B Instruct | converse |
| Mistral AI Mixtral 8X7B Instruct | converse |
| Mistral AI Mistral Large | converse |
| Mistral AI Mistral Small | converse |
pip install openinference-instrumentation-bedrock
To instrument async Bedrock calls made via aioboto3, install aioboto3 in addition to this package:
pip install openinference-instrumentation-bedrock aioboto3
[!IMPORTANT]
OpenInference for AWS Bedrock supports both invoke_model and converse. For models that use the Messages API, such as Anthropic Claude 3 and Anthropic Claude 3.5, use the Converse API instead.
In a notebook environment (Jupyter, Colab, etc.), install openinference-instrumentation-bedrock, arize-phoenix and boto3.
You can test out this quickstart guide in Google Colab!
pip install openinference-instrumentation-bedrock arize-phoenix boto3
For async usage with aioboto3:
pip install openinference-instrumentation-bedrock arize-phoenix aioboto3
Ensure that boto3 is configured with AWS credentials.
The tracing setup below is shared for both sync (boto3) and async (aioboto3) usage.
import json
import time
from urllib.parse import urljoin
import boto3
import phoenix as px
from openinference.instrumentation.bedrock import BedrockInstrumentor
from opentelemetry import trace as trace_api
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
Next, we’ll start a phoenix server and set it as a collector.
px.launch_app()
session_url = px.active_session().url
phoenix_otlp_endpoint = urljoin(session_url, "v1/traces")
phoenix_exporter = OTLPSpanExporter(endpoint=phoenix_otlp_endpoint)
tracer_provider = trace_sdk.TracerProvider()
tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter=phoenix_exporter))
trace_api.set_tracer_provider(tracer_provider=tracer_provider)
BedrockInstrumentor().instrument()
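One detail of the endpoint construction above is worth checking: `urljoin` drops the last path segment of the base URL when that URL does not end in a slash. The sketch below assumes the session URL might be served behind a proxy path (as can happen in hosted notebooks); `otlp_traces_endpoint` is a hypothetical helper, not a Phoenix API.

```python
from urllib.parse import urljoin


def otlp_traces_endpoint(session_url: str) -> str:
    """Build the traces endpoint while preserving any path in session_url."""
    # Adding a trailing slash stops urljoin from replacing the last path segment.
    base = session_url if session_url.endswith("/") else session_url + "/"
    return urljoin(base, "v1/traces")


# Without the trailing slash, the final segment ("6006") is replaced.
lossy = urljoin("https://example.com/proxy/6006", "v1/traces")
# With the helper, the full proxy path is kept.
safe = otlp_traces_endpoint("https://example.com/proxy/6006")
```

For a plain local URL such as `http://localhost:6006/` both approaches give the same result, so the helper only matters when the URL carries a path.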
Now, all calls to invoke_model are instrumented and can be viewed in the phoenix UI.
session = boto3.session.Session()
client = session.client("bedrock-runtime")
prompt = b'{"prompt": "Human: Hello there, how are you? Assistant:", "max_tokens_to_sample": 1024}'
response = client.invoke_model(modelId="anthropic.claude-v2", body=prompt)
response_body = json.loads(response.get("body").read())
print(response_body["completion"])
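The request body above is written as a raw bytes literal. A minimal sketch of building it with `json.dumps` instead is shown below, which avoids hand-escaping quotes in the prompt. It assumes Anthropic's text-completion prompt convention of `\n\nHuman:` and `\n\nAssistant:` turn markers; `build_claude_v2_body` is a hypothetical helper name.

```python
import json


def build_claude_v2_body(user_text: str, max_tokens: int = 1024) -> bytes:
    """Serialize an invoke_model body for a Claude 2 text-completion request."""
    # Assumption: the model expects Human/Assistant turn markers on their own lines.
    prompt = f"\n\nHuman: {user_text}\n\nAssistant:"
    payload = {"prompt": prompt, "max_tokens_to_sample": max_tokens}
    return json.dumps(payload).encode("utf-8")


body = build_claude_v2_body("Hello there, how are you?")
```

The resulting `body` could then be passed as `client.invoke_model(modelId="anthropic.claude-v2", body=body)`.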
Alternatively, all calls to converse are instrumented and can be viewed in the phoenix UI.
session = boto3.session.Session()
client = session.client("bedrock-runtime")
message1 = {
    "role": "user",
    "content": [{"text": "Create a list of 3 pop songs."}]
}
message2 = {
    "role": "user",
    "content": [{"text": "Make sure the songs are by artists from the United Kingdom."}]
}
messages = []
messages.append(message1)
response = client.converse(
    modelId="anthropic.claude-3-5-sonnet-20240620-v1:0",
    messages=messages
)
out = response["output"]["message"]
messages.append(out)
print(out.get("content")[-1].get("text"))
messages.append(message2)
response = client.converse(
    modelId="anthropic.claude-v2:1",
    messages=messages
)
out = response['output']['message']
print(out.get("content")[-1].get("text"))
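The multi-turn pattern above (send, append the assistant reply, append the next user turn) can be factored into small helpers. This is a sketch that operates on plain dictionaries shaped like the Converse request and response used above; `user_message`, `last_text`, and `next_turn` are hypothetical names, and the response here is a hand-written stand-in rather than real model output.

```python
def user_message(text: str) -> dict:
    """Build a user turn in the shape the converse call above expects."""
    return {"role": "user", "content": [{"text": text}]}


def last_text(response: dict):
    """Return the text of the last content block that carries text, if any."""
    content = response["output"]["message"]["content"]
    for block in reversed(content):
        if "text" in block:
            return block["text"]
    return None


def next_turn(messages: list, response: dict, follow_up: str) -> list:
    """Return a new history with the assistant reply and the next user turn added."""
    return messages + [response["output"]["message"], user_message(follow_up)]


# Hand-written stand-in for a converse response, for illustration only.
fake_response = {
    "output": {"message": {"role": "assistant", "content": [{"text": "1. Song A"}]}}
}
history = [user_message("Create a list of 3 pop songs.")]
history = next_turn(history, fake_response, "Make sure the songs are by artists from the United Kingdom.")
```

Returning a new list instead of mutating `messages` keeps earlier histories reusable if a call needs to be retried.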
All calls to invoke_agent are instrumented and can be viewed in the phoenix UI. You can enable agent traces by passing the enableTrace=True argument.
session = boto3.session.Session()
client = session.client("bedrock-agent-runtime")
agent_id = '<AgentId>'
agent_alias_id = '<AgentAliasId>'
session_id = f"default-session1_{int(time.time())}"
attributes = dict(
    inputText="When is a good time to visit the Taj Mahal?",
    agentId=agent_id,
    agentAliasId=agent_alias_id,
    sessionId=session_id,
    enableTrace=True
)
response = client.invoke_agent(**attributes)
for idx, event in enumerate(response['completion']):
    if 'chunk' in event:
        chunk_data = event['chunk']
        if 'bytes' in chunk_data:
            output_text = chunk_data['bytes'].decode('utf8')
            print(output_text)
    elif 'trace' in event:
        print(event['trace'])
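The loop above prints each event as it arrives. If the full answer is needed as one string, the same branching can collect results instead. The sketch below works on any iterable of event dictionaries with the `chunk` and `trace` keys used above; `collect_agent_output` is a hypothetical helper and the events are hand-written stand-ins.

```python
def collect_agent_output(events):
    """Gather decoded chunk text and trace events from an invoke_agent stream."""
    text_parts, traces = [], []
    for event in events:
        if "chunk" in event:
            chunk_data = event["chunk"]
            # Some chunks may carry no bytes; skip those.
            if "bytes" in chunk_data:
                text_parts.append(chunk_data["bytes"].decode("utf8"))
        elif "trace" in event:
            traces.append(event["trace"])
    return "".join(text_parts), traces


# Hand-written stand-in events, for illustration only.
sample_events = [
    {"chunk": {"bytes": b"Visit between "}},
    {"trace": {"step": "example"}},
    {"chunk": {"bytes": b"October and March."}},
    {"chunk": {}},
]
answer, traces = collect_agent_output(sample_events)
```

With a real response, the call would be `collect_agent_output(response['completion'])`.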
OpenInference AWS Bedrock instrumentation also supports async Bedrock calls using aioboto3.
import aioboto3
import asyncio
async def main():
    session = aioboto3.session.Session(region_name="us-east-1")
    async with session.client(
        "bedrock-runtime",
        aws_access_key_id="test",
        aws_secret_access_key="test",
    ) as client:
        response = await client.converse(
            modelId="anthropic.claude-3-haiku-20240307-v1:0",
            messages=[
                {
                    "role": "user",
                    "content": [{"text": "What is the sum of numbers from 1 to 10?"}],
                }
            ],
        )
        print(response["output"]["message"]["content"][-1]["text"])

asyncio.run(main())
All async calls to invoke_agent are instrumented and can be viewed in the phoenix UI. You can enable agent traces by passing the enableTrace=True argument.
import aioboto3
import asyncio
import time
async def main():
    session = aioboto3.session.Session(region_name="us-east-1")
    agent_id = '<AgentId>'
    agent_alias_id = '<AgentAliasId>'
    session_id = f"default-session1_{int(time.time())}"
    attributes = dict(
        inputText="When is a good time to visit the Taj Mahal?",
        agentId=agent_id,
        agentAliasId=agent_alias_id,
        sessionId=session_id,
        enableTrace=True
    )
    # invoke_agent belongs to the bedrock-agent-runtime client, not bedrock-runtime.
    async with session.client(
        "bedrock-agent-runtime",
        aws_access_key_id="test",
        aws_secret_access_key="test",
    ) as client:
        response = await client.invoke_agent(**attributes)
        # The async event stream must be consumed with `async for`.
        async for event in response['completion']:
            if 'chunk' in event:
                chunk_data = event['chunk']
                if 'bytes' in chunk_data:
                    output_text = chunk_data['bytes'].decode('utf8')
                    print(output_text)
            elif 'trace' in event:
                print(event['trace'])

asyncio.run(main())
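To try the event-handling logic without AWS credentials or a deployed agent, the stream can be replaced by an async generator. This is a sketch under that assumption: `fake_completion` is a hypothetical stand-in for `response['completion']`, and `collect_agent_output_async` and `demo` are illustrative names.

```python
import asyncio


async def collect_agent_output_async(events):
    """Gather decoded chunk text and trace events from an async event stream."""
    text_parts, traces = [], []
    async for event in events:
        if "chunk" in event:
            chunk_data = event["chunk"]
            if "bytes" in chunk_data:
                text_parts.append(chunk_data["bytes"].decode("utf8"))
        elif "trace" in event:
            traces.append(event["trace"])
    return "".join(text_parts), traces


async def fake_completion():
    """Hypothetical stand-in for response['completion']; yields hand-written events."""
    yield {"chunk": {"bytes": b"Visit between "}}
    yield {"trace": {"step": "example"}}
    yield {"chunk": {"bytes": b"October and March."}}


async def demo():
    """Run the collector against the stand-in stream."""
    return await collect_agent_output_async(fake_completion())
```

Calling `asyncio.run(demo())` returns the joined text and the list of trace events; with a real client, `response['completion']` would take the place of `fake_completion()`.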