# Batch dataset runner
The `BatchEvaluationRunner` delegates span collection and evaluation entirely to the service via the `StartBatchEvaluation` and `GetBatchEvaluation` APIs. After invoking your agent for each scenario, the runner submits a batch job and polls until it completes, returning aggregate results.
Use the batch runner when you need aggregate scores across many sessions without managing span collection yourself: it suits baseline measurement, large datasets, and pre/post comparison.
## How it works
The runner processes scenarios in four phases:

1. **Invoke**: All scenarios run concurrently using a thread pool. Each scenario gets a unique session ID, and turns within a scenario execute sequentially to maintain conversation context (a conceptual sketch follows this list).
2. **Wait**: A configurable ingestion delay (default: 180 seconds) allows CloudWatch to ingest the telemetry data. This delay is paid once, not per scenario.
3. **Submit**: The runner calls `StartBatchEvaluation` with the CloudWatch log group, session IDs from the invocation phase, evaluator IDs, and ground truth from the dataset.
4. **Poll**: The runner polls `GetBatchEvaluation` until the job reaches a terminal state and returns the aggregate results.
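The invoke phase is conceptually similar to the sketch below. Everything in it is illustrative rather than SDK code: `scenarios`, `invoke_agent`, and `run_scenario` are hypothetical stand-ins.

```python
import uuid
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins: two scenarios, each a list of turns, and a
# stub in place of the real agent call.
scenarios = [["Hi", "What did I just say?"], ["Book a flight"]]

def invoke_agent(turn: str, session_id: str) -> None:
    print(f"[{session_id[:8]}] {turn}")  # stub for the real agent call

def run_scenario(turns: list) -> None:
    session_id = uuid.uuid4().hex        # unique session ID per scenario
    for turn in turns:                   # turns run sequentially, in order
        invoke_agent(turn, session_id)

# Scenarios fan out across a thread pool and run concurrently.
with ThreadPoolExecutor() as pool:
    list(pool.map(run_scenario, scenarios))
```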
## Agent invoker
The runner requires an agent invoker, a callable that invokes your agent for a single turn. The invoker is framework-agnostic: you can call your agent via boto3 `invoke_agent_runtime`, a direct function call, an HTTP request, or any other method.
```python
import json

import boto3

from bedrock_agentcore.evaluation import AgentInvokerInput, AgentInvokerOutput

REGION = "<region-code>"
AGENT_ARN = "arn:aws:bedrock-agentcore:<region-code>:<account-id>:runtime/<agent-id>"
LOG_GROUP = "/aws/bedrock-agentcore/runtimes/<agent-id>-DEFAULT"
SERVICE_NAME = "<agent-id>.DEFAULT"

agentcore_client = boto3.client("bedrock-agentcore", region_name=REGION)


def agent_invoker(invoker_input: AgentInvokerInput) -> AgentInvokerOutput:
    # Normalize the turn input to the JSON payload the runtime expects.
    payload = invoker_input.payload
    if isinstance(payload, str):
        payload = json.dumps({"prompt": payload}).encode()
    elif isinstance(payload, dict):
        payload = json.dumps(payload).encode()

    print(f"[{invoker_input.session_id}] > sending payload: {payload.decode()}")
    # Reuse the runner-provided session ID so all turns in a scenario
    # share one conversation.
    response = agentcore_client.invoke_agent_runtime(
        agentRuntimeArn=AGENT_ARN,
        runtimeSessionId=invoker_input.session_id,
        payload=payload,
    )
    response_body = response["response"].read()
    print(f"[{invoker_input.session_id}] < received response: {response_body.decode()}")
    return AgentInvokerOutput(agent_output=json.loads(response_body))
```
The invoker receives an `AgentInvokerInput` and returns an `AgentInvokerOutput`:

| Field | Type | Description |
|---|---|---|
| `payload` | `str` or `dict` | The turn input from the dataset. |
| `session_id` | `str` | Stable across all turns in a scenario. Pass this to your agent to maintain conversation context. |
| `agent_output` | `dict` | The agent’s response. |
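Because the invoker is just a callable, it does not have to go through `invoke_agent_runtime`. As a hedged sketch, a direct-function-call invoker might look like this, where `my_local_agent` is a hypothetical entry point you supply:

```python
from bedrock_agentcore.evaluation import AgentInvokerInput, AgentInvokerOutput

def my_local_agent(prompt: str, session_id: str) -> dict:
    # Hypothetical stand-in for your framework's single-turn call.
    return {"result": f"echo: {prompt}"}

def local_invoker(invoker_input: AgentInvokerInput) -> AgentInvokerOutput:
    prompt = invoker_input.payload
    if isinstance(prompt, dict):  # dataset turns may arrive as dicts
        prompt = prompt.get("prompt", str(prompt))
    output = my_local_agent(prompt, invoker_input.session_id)
    return AgentInvokerOutput(agent_output=output)
```

Whatever the invocation path, the batch job still reads spans from the CloudWatch log groups you configure, so your agent’s telemetry must land there.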
## Example
The following example loads a dataset from a JSON file and runs the batch evaluation. For the dataset format, see Dataset schema.
```python
from bedrock_agentcore.evaluation import (
    BatchEvaluationRunner,
    BatchEvaluationRunConfig,
    BatchEvaluatorConfig,
    CloudWatchDataSourceConfig,
    FileDatasetProvider,
)

# Load dataset (see Dataset schema for format)
dataset = FileDatasetProvider("dataset.json").get_dataset()

# Configure the batch evaluation
config = BatchEvaluationRunConfig(
    batch_evaluation_name="dataset-batch-eval",
    evaluator_config=BatchEvaluatorConfig(
        evaluator_ids=[
            "Builtin.GoalSuccessRate",
            "Builtin.Correctness",
            "Builtin.TrajectoryExactOrderMatch",
            "Builtin.Helpfulness",
        ],
    ),
    data_source=CloudWatchDataSourceConfig(
        service_names=[SERVICE_NAME],
        log_group_names=[LOG_GROUP],
        ingestion_delay_seconds=180,
    ),
    polling_timeout_seconds=1800,
    polling_interval_seconds=30,
)

# Run
runner = BatchEvaluationRunner(region=REGION)
result = runner.run_dataset_evaluation(
    agent_invoker=agent_invoker,
    dataset=dataset,
    config=config,
)

# Display aggregate results
print(f"Status: {result.status}")
print(f"Batch evaluation ID: {result.batch_evaluation_id}")
if result.evaluation_results:
    er = result.evaluation_results
    print(f"Sessions completed: {er.number_of_sessions_completed}")
    print(f"Sessions failed: {er.number_of_sessions_failed}")
    print(f"Total sessions: {er.total_number_of_sessions}")
    for summary in er.evaluator_summaries or []:
        avg = summary.statistics.average_score if summary.statistics else None
        print(f"  {summary.evaluator_id}: avg={avg}")
```
## Fetching per-session detail
The aggregate results show averages across all sessions. To see per-session, per-evaluator scores, fetch the evaluation events from CloudWatch:
```python
if result.output_data_config:
    events = runner.fetch_evaluation_events(result)
    print(f"\nEvaluation events: {len(events)}")
    for ev in events:
        attrs = ev.get("attributes", {})
        print(f"  session:   {attrs.get('session.id', '')[:40]}")
        print(f"  evaluator: {attrs.get('gen_ai.evaluation.name')}")
        print(f"  score:     {attrs.get('gen_ai.evaluation.score.value')}")
        print(f"  label:     {attrs.get('gen_ai.evaluation.score.label')}")
        print()
```
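If you want a per-session rollup rather than a flat printout, a small grouping pass over the same `events` list works. The attribute keys are the ones shown above; the overall event shape is otherwise an assumption.

```python
from collections import defaultdict

# Roll the flat event list up into {session: {evaluator: score}}.
scores_by_session = defaultdict(dict)
for ev in events:
    attrs = ev.get("attributes", {})
    session = attrs.get("session.id", "unknown")
    evaluator = attrs.get("gen_ai.evaluation.name")
    scores_by_session[session][evaluator] = attrs.get("gen_ai.evaluation.score.value")

for session, scores in scores_by_session.items():
    print(session[:40], scores)
```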
## Configuration reference
```python
BatchEvaluationRunConfig(
    batch_evaluation_name="my-batch-eval",      # Job name
    evaluator_config=BatchEvaluatorConfig(
        evaluator_ids=["Builtin.GoalSuccessRate"],
    ),
    data_source=CloudWatchDataSourceConfig(
        service_names=["MyAgent.DEFAULT"],      # Exactly 1 service name
        log_group_names=[LOG_GROUP],            # 1-5 log group names
        ingestion_delay_seconds=180,            # Wait for CW ingestion (default: 180)
    ),
    polling_timeout_seconds=1800,               # Max wait for job completion (default: 1800)
    polling_interval_seconds=30,                # Poll interval (default: 30)
    simulation_config=None,                     # Set SimulationConfig for simulated scenarios
)
```
| Field | Default | Description |
|---|---|---|
| `batch_evaluation_name` | — | Name for the batch evaluation job. |
| `evaluator_ids` | — | List of evaluator IDs (built-in or custom). |
| `service_names` | — | Service name identifying your agent’s traces in CloudWatch. Exactly one entry. |
| `log_group_names` | — | CloudWatch log group names where agent telemetry is stored. One to five entries. |
| `ingestion_delay_seconds` | 180 | Seconds to wait after invocation for CloudWatch to ingest spans. |
| `polling_timeout_seconds` | 1800 | Maximum seconds to wait for the batch job to complete. |
| `polling_interval_seconds` | 30 | Seconds between poll requests. |
| `simulation_config` | None | Configuration for simulated scenarios. Set a `SimulationConfig` to run simulated scenarios. |
## Result structure
The runner returns a `BatchEvaluationResult`:
```
BatchEvaluationResult
├── batch_evaluation_id: str
├── batch_evaluation_arn: str
├── batch_evaluation_name: str
├── status: str
├── created_at: datetime
├── evaluation_results: Optional[BatchEvaluationSummary]
│   ├── number_of_sessions_completed: int
│   ├── number_of_sessions_in_progress: int
│   ├── number_of_sessions_failed: int
│   ├── number_of_sessions_ignored: int
│   ├── total_number_of_sessions: int
│   └── evaluator_summaries: List
│       ├── evaluator_id: str
│       ├── statistics.average_score: float
│       ├── total_evaluated: int
│       └── total_failed: int
├── error_details: Optional[List[str]]
├── agent_invocation_failures: List[FailedScenario]
└── output_data_config: Optional[CloudWatchOutputDataConfig]
    ├── log_group_name: str
    └── log_stream_name: str
```
- `agent_invocation_failures` lists scenarios where the agent invocation failed before the batch job was submitted. These sessions are not included in the batch evaluation.
- `output_data_config` points to the CloudWatch log stream where per-session detail is written. Use `runner.fetch_evaluation_events(result)` to read it.
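A quick way to surface both failure channels after a run is to walk these fields directly. Printing `FailedScenario` objects whole is a sketch; their exact attributes are not documented here.

```python
# Scenarios that never made it into the batch job.
for failed in result.agent_invocation_failures:
    print(f"Invocation failed: {failed}")

# Errors reported by the batch job itself (error_details is Optional).
for detail in result.error_details or []:
    print(f"Job error: {detail}")
```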
## Error handling
- Scenario invocation failures are recorded as `FailedScenario` but do not block the batch job; only successful sessions are submitted.
- If all scenarios fail, the runner raises `ValueError` before calling the API.
- Polling timeout: the runner raises `TimeoutError` if the job exceeds `polling_timeout_seconds`.
- Job failure: the runner raises `RuntimeError` if the batch evaluation status is `FAILED` or `STOPPED`.
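Taken together, these surface as a small set of exceptions you can catch around the run. A minimal sketch, assuming the `runner`, `agent_invoker`, `dataset`, and `config` objects from the example above:

```python
try:
    result = runner.run_dataset_evaluation(
        agent_invoker=agent_invoker,
        dataset=dataset,
        config=config,
    )
except ValueError as exc:
    print(f"No sessions to evaluate: {exc}")    # every invocation failed
except TimeoutError as exc:
    print(f"Gave up polling: {exc}")            # exceeded polling_timeout_seconds
except RuntimeError as exc:
    print(f"Batch job did not succeed: {exc}")  # status FAILED or STOPPED
```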