【AWS】Strands Agentsで作ったマルチエージェントをAgentCoreランタイムで動かす

※2025/11/25 更新: 【3. マルチエージェントの開発】以降を追記しました。

以下の記事を見ながら、Strands AgentsのマルチエージェントをAgentCoreランタイムで動かすハンズオンをやってみます。

Qiita
Strands & AgentCoreハンズオン! MCPマルチエージェントをAWSに簡単デプロイ - Qiita この記事は人力で書きました。 このイベント用の手順書ですが、本記事を読めば1時間ぐらいで誰でも試せます! Claude Codeなど、AIエージェントを「使う」人はかなり増えて...
目次

環境構築、IAMユーザーの作成

以下を参照

環境構築
IAMユーザーの作成

構成イメージ

AgentCoreランタイムに載せたマルチエージェント構成図

Bedrock AgentCore 上で動くアプリ。

監督者エージェント(オーケストレーター)が次の2つをツールとして呼び出す構成。

  • AWSマスター(MCP over HTTP)
  • APIマスター(MCP over stdio)

それぞれのサブエージェントは Strands Agent と MCP クライアントを組み合わせて動きます。

マルチエージェントの開発

ディレクトリ構成

ディレクトリ構成

コード

3_advanced/dokcer/multiagent.py
import os, asyncio
from strands import Agent, tool
from strands.tools.mcp import MCPClient
from mcp import stdio_client, StdioServerParameters
from mcp.client.streamable_http import streamablehttp_client
from bedrock_agentcore.runtime import BedrockAgentCoreApp

# =============================================================================
# サブエージェントのストリーミング処理
# =============================================================================

async def send_event(queue, message, stage, tool_name=None):
    """サブエージェントのステータスを送信"""
    if not queue:
        return
    
    progress = {"message": message, "stage": stage}
    if tool_name:
        progress["tool_name"] = tool_name
    await queue.put({"event": {"subAgentProgress": progress}})

async def merge_streams(stream, queue):
    """親子エージェントのストリームを統合"""
    create_task = asyncio.create_task
    main = create_task(anext(stream, None))
    sub = create_task(queue.get())
    waiting = {main, sub}
    
    # チャンクの到着を待機
    while waiting:
        ready_chunks, waiting = await asyncio.wait(
            waiting, return_when=asyncio.FIRST_COMPLETED
        )
        for ready_chunk in ready_chunks:
            # 監督者エージェントのチャンクを処理
            if ready_chunk == main:
                event = ready_chunk.result()
                if event is not None:
                    yield event
                    main = create_task(anext(stream, None))
                    waiting.add(main)
                else:
                    main = None
            
            # サブエージェントのチャンクを処理
            elif ready_chunk == sub:
                try:
                    sub_event = ready_chunk.result()
                    yield sub_event
                    sub = create_task(queue.get())
                    waiting.add(sub)
                except Exception:
                    sub = None
        
        if main is None and queue.empty():
            break

# =============================================================================
# サブエージェントの呼び出し
# =============================================================================

async def _extract(queue, agent, event, state):
    """ストリーミングから内容を抽出"""
    if isinstance(event, str):
        state["text"] += event
        if queue:
            delta = {"delta": {"text": event}}
            await queue.put(
                {"event": {"contentBlockDelta": delta}}
            )
    elif isinstance(event, dict) and "event" in event:
        event_data = event["event"]
        
        # ツール使用を検出
        if "contentBlockStart" in event_data:
            block = event_data["contentBlockStart"]
            start_data = block.get("start", {})
            if "toolUse" in start_data:
                tool_use = start_data["toolUse"]
                tool = tool_use.get("name", "unknown")
                await send_event(queue, 
                    f"「{agent}」がツール「{tool}」を実行中", 
                    "tool_use", tool
                )
        
        # テキスト増分を処理
        if "contentBlockDelta" in event_data:
            block = event_data["contentBlockDelta"]
            delta = block.get("delta", {})
            if "text" in delta:
                state["text"] += delta["text"]
        
        if queue:
            await queue.put(event)

async def invoke_agent(agent, query, mcp, create_agent, queue):
    """サブエージェントを呼び出し"""
    state = {"text": ""}
    await send_event(
        queue, f"サブエージェント「{agent}」が呼び出されました", "start"
    )
    
    try:
        # MCPクライアントを起動しながら、エージェントを呼び出し
        with mcp:
            agent_obj = create_agent()
            async for event in agent_obj.stream_async(query):
                await _extract(queue, agent, event, state)
        
        await send_event(
            queue, f"「{agent}」が対応を完了しました", "complete"
        )
        return state["text"]
    
    except Exception:
        return f"{agent}エージェントの処理に失敗しました"

# =============================================================================
# サブエージェント1: AWSマスター
# =============================================================================

class AwsMasterState:
    def __init__(self):
        self.client = None
        self.queue = None

_aws_state = AwsMasterState()

def setup_aws_master(queue):
    """新規キューを受け取り、MCPクライアントを準備"""
    _aws_state.queue = queue
    if queue and not _aws_state.client:
        try:
            _aws_state.client = MCPClient(
                lambda: streamablehttp_client(
                    "https://knowledge-mcp.global.api.aws"
                )
            )
        except Exception:
            _aws_state.client = None

def _create_aws_agent():
    """AWS知識参照エージェントを作成"""
    if not _aws_state.client:
        return None
    return Agent(
        model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
        tools=_aws_state.client.list_tools_sync(),
        system_prompt="語尾は「〜ゾイ。」にしてください。検索・参照は手短にね。"
    )

@tool
async def aws_master(query):
    """AWSマスターエージェント"""
    if not _aws_state.client:
        return "MCPクライアントが利用不可です"
    return await invoke_agent(
        "AWSマスター", query, _aws_state.client,
        _create_aws_agent, _aws_state.queue
    )

# =============================================================================
# サブエージェント2: APIマスター
# =============================================================================

class ApiMasterState:
    def __init__(self):
        self.client = None
        self.queue = None

_api_state = ApiMasterState()

def setup_api_master(queue):
    """新規キューを受け取り、MCPクライアントを準備"""
    _api_state.queue = queue
    if queue and not _api_state.client:
        try:
            _api_state.client = MCPClient(
                lambda: stdio_client(StdioServerParameters(
                    command="uvx", args=["awslabs.aws-api-mcp-server"],
                    env=os.environ.copy()
                ))
            )
        except Exception:
            _api_state.client = None

def _create_api_agent():
    """API操作エージェントを作成"""
    if not _api_state.client:
        return None
    return Agent(
        model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
        tools=_api_state.client.list_tools_sync(),
        system_prompt="ギャル風の口調で応対してください。"
    )

@tool
async def api_master(query):
    """APIマスターエージェント"""
    if not _api_state.client:
        return "MCPクライアントが利用不可です"
    return await invoke_agent(
        "APIマスター", query, _api_state.client,
        _create_api_agent, _api_state.queue
    )

# =============================================================================
# メイン処理
# =============================================================================

def _create_orchestrator():
    """監督者エージェントを作成"""
    return Agent(
        model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
        tools=[aws_master, api_master],
        system_prompt="""2体のサブエージェントを使って日本語で応対して。
1. AWSマスター:AWSドキュメントなどを参照できます。
2. APIマスター:AWSアカウントをAPIで操作できます。
語尾は「〜だキュウ。」にしてください。"""
    )

# アプリケーションを初期化
app = BedrockAgentCoreApp()
orchestrator = _create_orchestrator()

@app.entrypoint
async def invoke(payload):
    """呼び出し処理の開始地点"""
    prompt = payload.get("input", {}).get("prompt", "")
    
    # サブエージェント用のキューを初期化
    queue = asyncio.Queue()
    setup_aws_master(queue)
    setup_api_master(queue)
    
    try:
        # 監督者エージェントを呼び出し、ストリームを統合
        stream = orchestrator.stream_async(prompt)
        async for event in merge_streams(stream, queue):
            yield event
            
    finally:
        # キューをクリーンアップ
        setup_aws_master(None)
        setup_api_master(None)

# AgentCoreランタイムを起動
if __name__ == "__main__":
    app.run()

AWSにデプロイ

requirements.txtを作成

strands-agents
bedrock-agentcore
uv

AgentCoreランタイムへデプロイ

# .env の内容をターミナルの環境変数に設定
export $(cat /workspaces/strands-agentcore/.env | grep -v ^# | xargs)

# AgentCoreのスターターツールキットをインストール
pip install bedrock-agentcore-starter-toolkit

# デプロイ準備
agentcore configure --entrypoint multiagent.py
Requirement already satisfied: bedrock-agentcore-starter-toolkit in /usr/local/python/3.12.1/lib/python3.12/site-packages (0.1.32)
Requirement already satisfied: autopep8>=2.3.2 in /usr/local/python/3.12.1/lib/python3.12/site-packages (from bedrock-agentcore-starter-toolkit) (2.3.2)
Requirement already satisfied: bedrock-agentcore>=1.0.3 in /usr/local/python/3.12.1/lib/python3.12/site-packages (from bedrock-agentcore-starter-toolkit) (1.0.5)
Requirement already satisfied: boto3>=1.40.65 in /usr/local/python/3.12.1/lib/python3.12/site-packages (from bedrock-agentcore-starter-toolkit) (1.40.66)

~(略)~

Memory Configuration
Tip: Use --disable-memory flag to skip memory entirely

✅ MemoryManager initialized for region: ap-northeast-1
Existing memory resources found:
  1. jawsug1109_mem-gmaIQy6R50
     ID: jawsug1109_mem-gmaIQy6R50

Options:
  • Enter a number to use existing memory
  • Press Enter to create new memory
  • Type 's' to skip memory setup
Your choice: 1
✓ Using existing memory: jawsug1109_mem-gmaIQy6R50
Using existing memory resource: jawsug1109_mem-gmaIQy6R50
Network mode: PUBLIC
Setting 'multiagent' as default agent

「MemoryManager」は既存のものを利用したく番号を指定しましたが、確認事項は基本的にEnterで進めました。

デプロイを実施。

agentcore launch
 Deployment completed successfully - Agent: arn:aws:bedrock-agentcore:xxxxxxxxxxxx(略)
╭───────────────────────────────────────────────────── Deployment Success ──────────────────────────────────────────────────────╮
│ Agent Details:                                                                                                                │
│ Agent Name: multiagent                                                                                                        │
│ Agent ARN: arn:aws:bedrock-agentcore:xxxxxxxxxxxx(略)                                                                        │
│ Deployment Type: Direct Code Deploy                                                                                           │
│                                                                                                                               │
│ 📦 Code package deployed to Bedrock AgentCore                                                                                 │
│                                                                                                                               │
│ Next Steps:                                                                                                                   │
│    agentcore status                                                                                                           │
│    agentcore invoke '{"prompt": "Hello"}'                                                                                     │
│                                                                                                                               │
│ 📋 CloudWatch Logs:                                                                                                          │
│    /aws/bedrock-agentcore/runtimes/multiagent-xxxxxxxxxxxx(略) --log-stream-name-prefix "2025/11/24/[runtime-logs"           │
│    /aws/bedrock-agentcore/runtimes/multiagent-xxxxxxxxxxxx(略) --log-stream-names "otel-rt-logs"                             │
│                                                                                                                               │
│ 🔍 GenAI Observability Dashboard:                                                                                            │
│    https://console.aws.amazon.com/cloudwatch/home?region=xxxxxxxxxxxxxxxxxxxxxxxxxx/agent-core                                │
│                                                                                                                               │
│ ⏱️  Note: Observability data may take up to 10 minutes to appear after first launch                                          │
│                                                                                                                               │
│ 💡 Tail logs with:                                                                                                            │
│    aws logs tail /aws/bedrock-agentcore/runtimes/xxxxxxxxxxxx(略) --log-stream-name-prefix                                   │
│ "2025/11/24/[runtime-logs" --follow                                                                                           │
│    aws logs tail /aws/bedrock-agentcore/runtimes/xxxxxxxxxxxx(略) --log-stream-name-prefix                                   │
│ "2025/11/24/[runtime-logs" --since 1h                                                                                         │
╰───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯

デプロイ成功。

ターミナルに出力されたAgent ARNを控えておきます。

動作確認

.envファイルにAgent ARNを記載

先程控えたARNを以下のように記載

# AgentCore設定
AGENT_RUNTIME_ARN=arn:aws:bedrock-agentcore:xxxxxxxxxxxx(略)  

IAMロールの権限追加

  • マネコンで、[Amazon Bedrock AgentCore] > [エージェントランタイム] > multiagent をクリック
  • [Versions]から最新バージョンをクリック > [IAM service role]をクリック
  • [許可を追加] > [ポリシーをアタッチ] をクリック
  • [AWS管理 – ジョブ機能] で絞り込み、以下をチェックして[許可を追加]
    • Billing
    • ReadOnlyAccess
付与したIAMポリシー

フロントエンドを準備

3_advanced/frontend.py
from dotenv import load_dotenv
import os, json, uuid, asyncio, boto3
import streamlit as st

load_dotenv(override=True)

# =============================================================================
# ストリーミング処理
# =============================================================================

def create_state():
    """新しい状態を作成"""
    return {
        "containers": [],
        "current_status": None,
        "current_text": None,
        "final_response": ""
    }

def think(container, state):
    """思考開始を表示"""
    with container:
        thinking_status = st.empty()
        thinking_status.status("思考中", state="running")
    state["containers"].append((thinking_status, "思考中"))

def change_status(event, container, state):
    """サブエージェントのステータスを更新"""
    progress_info = event["subAgentProgress"]
    message = progress_info.get("message")
    stage = progress_info.get("stage", "processing")
    
    # 前のステータスを完了状態に
    if state["current_status"]:
        status, old_message = state["current_status"]
        status.status(old_message, state="complete")
    
    # 新しいステータス表示
    with container:
        new_status_box = st.empty()
        if stage == "complete":
            display_state = "complete"
        else:
            display_state = "running"
        new_status_box.status(message, state=display_state)
    
    status_info = (new_status_box, message)
    state["containers"].append(status_info)
    state["current_status"] = status_info
    state["current_text"] = None
    state["final_response"] = ""

def stream_text(event, container, state):
    """テキストをストリーミング表示"""
    delta = event["contentBlockDelta"]["delta"]
    if "text" not in delta:
        return
    
    # テキスト出力開始時にステータスを完了に
    if state["current_text"] is None:
        if state["containers"]:
            status, first_message = state["containers"][0]
            if "思考中" in first_message:
                status.status("思考中", state="complete")
        if state["current_status"]:
            status, message = state["current_status"]
            status.status(message, state="complete")
    
    # テキスト処理
    text = delta["text"]
    state["final_response"] += text
    
    # テキストコンテナ更新
    if state["current_text"] is None:
        with container:
            state["current_text"] = st.empty()
    if state["current_text"]:
        response = state["final_response"]
        state["current_text"].markdown(response)

def finish(state):
    """表示の終了処理"""
    if state["current_text"]:
        response = state["final_response"]
        state["current_text"].markdown(response)
    for status, message in state["containers"]:
        status.status(message, state="complete")

# =============================================================================
# サブエージェント呼び出し
# =============================================================================

def extract_stream(data, container, state):
    """ストリーミングから内容を抽出"""
    if not isinstance(data, dict):
        return

    event = data.get("event", {})    
    if "subAgentProgress" in event:
        change_status(event, container, state)
    elif "contentBlockDelta" in event:
        stream_text(event, container, state)
    elif "error" in data:
        error_msg = data.get("error", "Unknown error")
        error_type = data.get("error_type", "Unknown")
        st.error(f"AgentCoreエラー: {error_msg}")
        state["final_response"] = f"エラー: {error_msg}"

async def invoke_agent(prompt, container, agent_core):
    """エージェントを呼び出し"""
    state = create_state()
    session_id = f"session_{str(uuid.uuid4())}"
    think(container, state)
    
    payload = json.dumps({
        "input": {"prompt": prompt, "session_id": session_id}
    }).encode()
    
    try:
        agent_response = agent_core.invoke_agent_runtime(
            agentRuntimeArn=os.getenv("AGENT_RUNTIME_ARN"),
            runtimeSessionId=session_id,
            payload=payload,
            qualifier="DEFAULT"
        )
        for line in agent_response["response"].iter_lines():
            decoded = line.decode("utf-8")
            if not line or not decoded.startswith("data: "):
                continue
            try:
                data = json.loads(decoded[6:])
                extract_stream(data, container, state)
            except json.JSONDecodeError:
                continue
        
        finish(state)
        return state["final_response"]
    
    except Exception as e:
        st.error(f"エラーが発生しました: {e}")
        return ""

# =============================================================================
# メイン画面
# =============================================================================

# タイトル表示
st.title("アマQ Unlimited")
st.write("AWSドキュメントや、あなたのアカウントを調査し放題!")

# セッションを初期化
if 'messages' not in st.session_state:
    st.session_state.messages = []

# メッセージ履歴を表示
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

# AgentCore APIクライアントを初期化
agent_core = boto3.client('bedrock-agentcore')

# ユーザー入力を表示
if prompt := st.chat_input("メッセージを入力してね"):
    with st.chat_message("user"):
        st.markdown(prompt)
    st.session_state.messages.append(
        {"role": "user", "content": prompt}
    )
    
    # エージェントの応答を表示
    with st.chat_message("assistant"):
        container = st.container()
        try:
            response = asyncio.run(
                invoke_agent(prompt, container, agent_core)
            )
            if response:
                st.session_state.messages.append(
                    {"role": "assistant", "content": response}
                )
        except Exception as e:
            st.error(f"エラーが発生しました: {e}")

以下のコマンドで起動します。

$ cd /workspaces/strands-agentcore/3_advanced/
$ pip install streamlit==1.46.1
$ streamlit run frontend.py

http://localhost:8501をクリックするとブラウザにエージェントが起動します。

エージェントに質問してみます。

streamlitエラー

残念ながらエラーに。。。モデル名を見直してみます。

(追って、後述します)

目次