以下の記事を見ながら、Strands AgentsのマルチエージェントをAgentCoreランタイムで動かすハンズオンをやってみます。
Qiita


Strands & AgentCoreハンズオン! MCPマルチエージェントをAWSに簡単デプロイ - Qiita
この記事は人力で書きました。 このイベント用の手順書ですが、本記事を読めば1時間ぐらいで誰でも試せます! Claude Codeなど、AIエージェントを「使う」人はかなり増えて...
目次
環境構築、IAMユーザーの作成
以下を参照
あわせて読みたい


【AWS】Strands Agentsのハンズオンをやってみた
以下の記事を見ながら、Strands Agentsのハンズオンをやってみます。 https://qiita.com/minorun365/items/6d4fae4f7dacbc1e1971 引用元: Bedrock AgentCore & Str…
構成イメージ

Bedrock AgentCore 上で動くアプリ。
監督者エージェント(オーケストレーター)が次の2つをツールとして呼び出す構成。
- AWSマスター(MCP over HTTP)
- APIマスター(MCP over stdio)
それぞれのサブエージェントは Strands Agent と MCP クライアントを組み合わせて動きます。
マルチエージェントの開発
ディレクトリ構成

コード
3_advanced/dokcer/multiagent.py
import os, asyncio
from strands import Agent, tool
from strands.tools.mcp import MCPClient
from mcp import stdio_client, StdioServerParameters
from mcp.client.streamable_http import streamablehttp_client
from bedrock_agentcore.runtime import BedrockAgentCoreApp
# =============================================================================
# サブエージェントのストリーミング処理
# =============================================================================
async def send_event(queue, message, stage, tool_name=None):
"""サブエージェントのステータスを送信"""
if not queue:
return
progress = {"message": message, "stage": stage}
if tool_name:
progress["tool_name"] = tool_name
await queue.put({"event": {"subAgentProgress": progress}})
async def merge_streams(stream, queue):
"""親子エージェントのストリームを統合"""
create_task = asyncio.create_task
main = create_task(anext(stream, None))
sub = create_task(queue.get())
waiting = {main, sub}
# チャンクの到着を待機
while waiting:
ready_chunks, waiting = await asyncio.wait(
waiting, return_when=asyncio.FIRST_COMPLETED
)
for ready_chunk in ready_chunks:
# 監督者エージェントのチャンクを処理
if ready_chunk == main:
event = ready_chunk.result()
if event is not None:
yield event
main = create_task(anext(stream, None))
waiting.add(main)
else:
main = None
# サブエージェントのチャンクを処理
elif ready_chunk == sub:
try:
sub_event = ready_chunk.result()
yield sub_event
sub = create_task(queue.get())
waiting.add(sub)
except Exception:
sub = None
if main is None and queue.empty():
break
# =============================================================================
# サブエージェントの呼び出し
# =============================================================================
async def _extract(queue, agent, event, state):
"""ストリーミングから内容を抽出"""
if isinstance(event, str):
state["text"] += event
if queue:
delta = {"delta": {"text": event}}
await queue.put(
{"event": {"contentBlockDelta": delta}}
)
elif isinstance(event, dict) and "event" in event:
event_data = event["event"]
# ツール使用を検出
if "contentBlockStart" in event_data:
block = event_data["contentBlockStart"]
start_data = block.get("start", {})
if "toolUse" in start_data:
tool_use = start_data["toolUse"]
tool = tool_use.get("name", "unknown")
await send_event(queue,
f"「{agent}」がツール「{tool}」を実行中",
"tool_use", tool
)
# テキスト増分を処理
if "contentBlockDelta" in event_data:
block = event_data["contentBlockDelta"]
delta = block.get("delta", {})
if "text" in delta:
state["text"] += delta["text"]
if queue:
await queue.put(event)
async def invoke_agent(agent, query, mcp, create_agent, queue):
"""サブエージェントを呼び出し"""
state = {"text": ""}
await send_event(
queue, f"サブエージェント「{agent}」が呼び出されました", "start"
)
try:
# MCPクライアントを起動しながら、エージェントを呼び出し
with mcp:
agent_obj = create_agent()
async for event in agent_obj.stream_async(query):
await _extract(queue, agent, event, state)
await send_event(
queue, f"「{agent}」が対応を完了しました", "complete"
)
return state["text"]
except Exception:
return f"{agent}エージェントの処理に失敗しました"
# =============================================================================
# サブエージェント1: AWSマスター
# =============================================================================
class AwsMasterState:
def __init__(self):
self.client = None
self.queue = None
_aws_state = AwsMasterState()
def setup_aws_master(queue):
"""新規キューを受け取り、MCPクライアントを準備"""
_aws_state.queue = queue
if queue and not _aws_state.client:
try:
_aws_state.client = MCPClient(
lambda: streamablehttp_client(
"https://knowledge-mcp.global.api.aws"
)
)
except Exception:
_aws_state.client = None
def _create_aws_agent():
"""AWS知識参照エージェントを作成"""
if not _aws_state.client:
return None
return Agent(
model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
tools=_aws_state.client.list_tools_sync(),
system_prompt="語尾は「〜ゾイ。」にしてください。検索・参照は手短にね。"
)
@tool
async def aws_master(query):
"""AWSマスターエージェント"""
if not _aws_state.client:
return "MCPクライアントが利用不可です"
return await invoke_agent(
"AWSマスター", query, _aws_state.client,
_create_aws_agent, _aws_state.queue
)
# =============================================================================
# サブエージェント2: APIマスター
# =============================================================================
class ApiMasterState:
def __init__(self):
self.client = None
self.queue = None
_api_state = ApiMasterState()
def setup_api_master(queue):
"""新規キューを受け取り、MCPクライアントを準備"""
_api_state.queue = queue
if queue and not _api_state.client:
try:
_api_state.client = MCPClient(
lambda: stdio_client(StdioServerParameters(
command="uvx", args=["awslabs.aws-api-mcp-server"],
env=os.environ.copy()
))
)
except Exception:
_api_state.client = None
def _create_api_agent():
"""API操作エージェントを作成"""
if not _api_state.client:
return None
return Agent(
model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
tools=_api_state.client.list_tools_sync(),
system_prompt="ギャル風の口調で応対してください。"
)
@tool
async def api_master(query):
"""APIマスターエージェント"""
if not _api_state.client:
return "MCPクライアントが利用不可です"
return await invoke_agent(
"APIマスター", query, _api_state.client,
_create_api_agent, _api_state.queue
)
# =============================================================================
# メイン処理
# =============================================================================
def _create_orchestrator():
"""監督者エージェントを作成"""
return Agent(
model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
tools=[aws_master, api_master],
system_prompt="""2体のサブエージェントを使って日本語で応対して。
1. AWSマスター:AWSドキュメントなどを参照できます。
2. APIマスター:AWSアカウントをAPIで操作できます。
語尾は「〜だキュウ。」にしてください。"""
)
# アプリケーションを初期化
app = BedrockAgentCoreApp()
orchestrator = _create_orchestrator()
@app.entrypoint
async def invoke(payload):
"""呼び出し処理の開始地点"""
prompt = payload.get("input", {}).get("prompt", "")
# サブエージェント用のキューを初期化
queue = asyncio.Queue()
setup_aws_master(queue)
setup_api_master(queue)
try:
# 監督者エージェントを呼び出し、ストリームを統合
stream = orchestrator.stream_async(prompt)
async for event in merge_streams(stream, queue):
yield event
finally:
# キューをクリーンアップ
setup_aws_master(None)
setup_api_master(None)
# AgentCoreランタイムを起動
if __name__ == "__main__":
app.run()
AWSにデプロイ
requirements.txtを作成
strands-agents
bedrock-agentcore
uvAgentCoreランタイムへデプロイ
# .env の内容をターミナルの環境変数に設定
export $(cat /workspaces/strands-agentcore/.env | grep -v ^# | xargs)
# AgentCoreのスターターツールキットをインストール
pip install bedrock-agentcore-starter-toolkit
# デプロイ準備
agentcore configure --entrypoint multiagent.pyRequirement already satisfied: bedrock-agentcore-starter-toolkit in /usr/local/python/3.12.1/lib/python3.12/site-packages (0.1.32)
Requirement already satisfied: autopep8>=2.3.2 in /usr/local/python/3.12.1/lib/python3.12/site-packages (from bedrock-agentcore-starter-toolkit) (2.3.2)
Requirement already satisfied: bedrock-agentcore>=1.0.3 in /usr/local/python/3.12.1/lib/python3.12/site-packages (from bedrock-agentcore-starter-toolkit) (1.0.5)
Requirement already satisfied: boto3>=1.40.65 in /usr/local/python/3.12.1/lib/python3.12/site-packages (from bedrock-agentcore-starter-toolkit) (1.40.66)
~(略)~
Memory Configuration
Tip: Use --disable-memory flag to skip memory entirely
✅ MemoryManager initialized for region: ap-northeast-1
Existing memory resources found:
1. jawsug1109_mem-gmaIQy6R50
ID: jawsug1109_mem-gmaIQy6R50
Options:
• Enter a number to use existing memory
• Press Enter to create new memory
• Type 's' to skip memory setup
Your choice: 1
✓ Using existing memory: jawsug1109_mem-gmaIQy6R50
Using existing memory resource: jawsug1109_mem-gmaIQy6R50
Network mode: PUBLIC
Setting 'multiagent' as default agent「MemoryManager」は既存のものを利用したく番号を指定しましたが、確認事項は基本的にEnterで進めました。
デプロイを実施。
agentcore launch Deployment completed successfully - Agent: arn:aws:bedrock-agentcore:xxxxxxxxxxxx(略)
╭───────────────────────────────────────────────────── Deployment Success ──────────────────────────────────────────────────────╮
│ Agent Details: │
│ Agent Name: multiagent │
│ Agent ARN: arn:aws:bedrock-agentcore:xxxxxxxxxxxx(略) │
│ Deployment Type: Direct Code Deploy │
│ │
│ 📦 Code package deployed to Bedrock AgentCore │
│ │
│ Next Steps: │
│ agentcore status │
│ agentcore invoke '{"prompt": "Hello"}' │
│ │
│ 📋 CloudWatch Logs: │
│ /aws/bedrock-agentcore/runtimes/multiagent-xxxxxxxxxxxx(略) --log-stream-name-prefix "2025/11/24/[runtime-logs" │
│ /aws/bedrock-agentcore/runtimes/multiagent-xxxxxxxxxxxx(略) --log-stream-names "otel-rt-logs" │
│ │
│ 🔍 GenAI Observability Dashboard: │
│ https://console.aws.amazon.com/cloudwatch/home?region=xxxxxxxxxxxxxxxxxxxxxxxxxx/agent-core │
│ │
│ ⏱️ Note: Observability data may take up to 10 minutes to appear after first launch │
│ │
│ 💡 Tail logs with: │
│ aws logs tail /aws/bedrock-agentcore/runtimes/xxxxxxxxxxxx(略) --log-stream-name-prefix │
│ "2025/11/24/[runtime-logs" --follow │
│ aws logs tail /aws/bedrock-agentcore/runtimes/xxxxxxxxxxxx(略) --log-stream-name-prefix │
│ "2025/11/24/[runtime-logs" --since 1h │
╰───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
デプロイ成功。
ターミナルに出力されたAgent ARNを控えておきます。
動作確認
.envファイルにAgent ARNを記載
先程控えたARNを以下のように記載
# AgentCore設定
AGENT_RUNTIME_ARN=arn:aws:bedrock-agentcore:xxxxxxxxxxxx(略) IAMロールの権限追加
- マネコンで、[Amazon Bedrock AgentCore] > [エージェントランタイム] >
multiagentをクリック - [Versions]から最新バージョンをクリック > [IAM service role]をクリック
- [許可を追加] > [ポリシーをアタッチ] をクリック
- [AWS管理 – ジョブ機能] で絞り込み、以下をチェックして[許可を追加]
BillingReadOnlyAccess

フロントエンドを準備
3_advanced/frontend.py
from dotenv import load_dotenv
import os, json, uuid, asyncio, boto3
import streamlit as st
load_dotenv(override=True)
# =============================================================================
# ストリーミング処理
# =============================================================================
def create_state():
"""新しい状態を作成"""
return {
"containers": [],
"current_status": None,
"current_text": None,
"final_response": ""
}
def think(container, state):
"""思考開始を表示"""
with container:
thinking_status = st.empty()
thinking_status.status("思考中", state="running")
state["containers"].append((thinking_status, "思考中"))
def change_status(event, container, state):
"""サブエージェントのステータスを更新"""
progress_info = event["subAgentProgress"]
message = progress_info.get("message")
stage = progress_info.get("stage", "processing")
# 前のステータスを完了状態に
if state["current_status"]:
status, old_message = state["current_status"]
status.status(old_message, state="complete")
# 新しいステータス表示
with container:
new_status_box = st.empty()
if stage == "complete":
display_state = "complete"
else:
display_state = "running"
new_status_box.status(message, state=display_state)
status_info = (new_status_box, message)
state["containers"].append(status_info)
state["current_status"] = status_info
state["current_text"] = None
state["final_response"] = ""
def stream_text(event, container, state):
"""テキストをストリーミング表示"""
delta = event["contentBlockDelta"]["delta"]
if "text" not in delta:
return
# テキスト出力開始時にステータスを完了に
if state["current_text"] is None:
if state["containers"]:
status, first_message = state["containers"][0]
if "思考中" in first_message:
status.status("思考中", state="complete")
if state["current_status"]:
status, message = state["current_status"]
status.status(message, state="complete")
# テキスト処理
text = delta["text"]
state["final_response"] += text
# テキストコンテナ更新
if state["current_text"] is None:
with container:
state["current_text"] = st.empty()
if state["current_text"]:
response = state["final_response"]
state["current_text"].markdown(response)
def finish(state):
"""表示の終了処理"""
if state["current_text"]:
response = state["final_response"]
state["current_text"].markdown(response)
for status, message in state["containers"]:
status.status(message, state="complete")
# =============================================================================
# サブエージェント呼び出し
# =============================================================================
def extract_stream(data, container, state):
"""ストリーミングから内容を抽出"""
if not isinstance(data, dict):
return
event = data.get("event", {})
if "subAgentProgress" in event:
change_status(event, container, state)
elif "contentBlockDelta" in event:
stream_text(event, container, state)
elif "error" in data:
error_msg = data.get("error", "Unknown error")
error_type = data.get("error_type", "Unknown")
st.error(f"AgentCoreエラー: {error_msg}")
state["final_response"] = f"エラー: {error_msg}"
async def invoke_agent(prompt, container, agent_core):
"""エージェントを呼び出し"""
state = create_state()
session_id = f"session_{str(uuid.uuid4())}"
think(container, state)
payload = json.dumps({
"input": {"prompt": prompt, "session_id": session_id}
}).encode()
try:
agent_response = agent_core.invoke_agent_runtime(
agentRuntimeArn=os.getenv("AGENT_RUNTIME_ARN"),
runtimeSessionId=session_id,
payload=payload,
qualifier="DEFAULT"
)
for line in agent_response["response"].iter_lines():
decoded = line.decode("utf-8")
if not line or not decoded.startswith("data: "):
continue
try:
data = json.loads(decoded[6:])
extract_stream(data, container, state)
except json.JSONDecodeError:
continue
finish(state)
return state["final_response"]
except Exception as e:
st.error(f"エラーが発生しました: {e}")
return ""
# =============================================================================
# メイン画面
# =============================================================================
# タイトル表示
st.title("アマQ Unlimited")
st.write("AWSドキュメントや、あなたのアカウントを調査し放題!")
# セッションを初期化
if 'messages' not in st.session_state:
st.session_state.messages = []
# メッセージ履歴を表示
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
# AgentCore APIクライアントを初期化
agent_core = boto3.client('bedrock-agentcore')
# ユーザー入力を表示
if prompt := st.chat_input("メッセージを入力してね"):
with st.chat_message("user"):
st.markdown(prompt)
st.session_state.messages.append(
{"role": "user", "content": prompt}
)
# エージェントの応答を表示
with st.chat_message("assistant"):
container = st.container()
try:
response = asyncio.run(
invoke_agent(prompt, container, agent_core)
)
if response:
st.session_state.messages.append(
{"role": "assistant", "content": response}
)
except Exception as e:
st.error(f"エラーが発生しました: {e}")
以下のコマンドで起動します。
$ cd /workspaces/strands-agentcore/3_advanced/
$ pip install streamlit==1.46.1
$ streamlit run frontend.pyhttp://localhost:8501をクリックするとブラウザにエージェントが起動します。
エージェントに質問してみます。

残念ながらエラーに。。。モデル名を見直してみます。
(追って、後述します)

