strands.multiagent.swarm
Swarm Multi-Agent Pattern Implementation.
This module provides a collaborative agent orchestration system where agents work together as a team to solve complex tasks, with shared context and autonomous coordination.
Key Features:
- Self-organizing agent teams with shared working memory
- Tool-based coordination
- Autonomous agent collaboration without central control
- Dynamic task distribution based on agent capabilities
- Collective intelligence through shared context
- Human input via user interrupts raised in BeforeNodeCallEvent hooks and agent nodes
SwarmNode
Section titled “SwarmNode”@dataclassclass SwarmNode()Defined in: src/strands/multiagent/swarm.py:63
Represents a node (e.g. Agent) in the swarm.
__post_init__
Section titled “__post_init__”def __post_init__() -> NoneDefined in: src/strands/multiagent/swarm.py:72
Capture initial executor state after initialization.
__hash__
Section titled “__hash__”def __hash__() -> intDefined in: src/strands/multiagent/swarm.py:78
Return hash for SwarmNode based on node_id.
__eq__
Section titled “__eq__”def __eq__(other: Any) -> boolDefined in: src/strands/multiagent/swarm.py:82
Return equality for SwarmNode based on node_id.
__str__
Section titled “__str__”def __str__() -> strDefined in: src/strands/multiagent/swarm.py:88
Return string representation of SwarmNode.
__repr__
Section titled “__repr__”def __repr__() -> strDefined in: src/strands/multiagent/swarm.py:92
Return detailed representation of SwarmNode.
reset_executor_state
Section titled “reset_executor_state”def reset_executor_state() -> NoneDefined in: src/strands/multiagent/swarm.py:96
Reset SwarmNode executor state to initial state when swarm was created.
If Swarm is resuming from an interrupt, we reset the executor state from the interrupt context.
SharedContext
Section titled “SharedContext”@dataclassclass SharedContext()Defined in: src/strands/multiagent/swarm.py:113
Shared context between swarm nodes.
add_context
Section titled “add_context”def add_context(node: SwarmNode, key: str, value: Any) -> NoneDefined in: src/strands/multiagent/swarm.py:118
Add context.
SwarmState
Section titled “SwarmState”@dataclassclass SwarmState()Defined in: src/strands/multiagent/swarm.py:162
Current state of swarm execution.
current_node
Section titled “current_node”The agent currently executing
The original task from the user that is being executed
completion_status
Section titled “completion_status”Current swarm execution status
shared_context
Section titled “shared_context”Context shared between agents
node_history
Section titled “node_history”Complete history of agents that have executed
start_time
Section titled “start_time”When swarm execution began
results
Section titled “results”Results from each agent execution
execution_time
Section titled “execution_time”Total execution time in milliseconds
handoff_node
Section titled “handoff_node”The agent to execute next
handoff_message
Section titled “handoff_message”Message passed during agent handoff
should_continue
Section titled “should_continue”def should_continue( *, max_handoffs: int, max_iterations: int, execution_timeout: float, repetitive_handoff_detection_window: int, repetitive_handoff_min_unique_agents: int) -> tuple[bool, str]Defined in: src/strands/multiagent/swarm.py:180
Check if the swarm should continue.
Returns: (should_continue, reason)
SwarmResult
Section titled “SwarmResult”@dataclassclass SwarmResult(MultiAgentResult)Defined in: src/strands/multiagent/swarm.py:223
Result from swarm execution - extends MultiAgentResult with swarm-specific details.
class Swarm(MultiAgentBase)Defined in: src/strands/multiagent/swarm.py:229
Self-organizing collaborative agent teams with shared working memory.
__init__
Section titled “__init__”def __init__( nodes: list[Agent], *, entry_point: Agent | None = None, max_handoffs: int = 20, max_iterations: int = 20, execution_timeout: float = 900.0, node_timeout: float = 300.0, repetitive_handoff_detection_window: int = 0, repetitive_handoff_min_unique_agents: int = 0, session_manager: SessionManager | None = None, hooks: list[HookProvider] | None = None, id: str = _DEFAULT_SWARM_ID, trace_attributes: Mapping[str, AttributeValue] | None = None) -> NoneDefined in: src/strands/multiagent/swarm.py:232
Initialize Swarm with agents and configuration.
Arguments:
id- Unique swarm id (default: “default_swarm”)nodes- List of nodes (e.g. Agent) to include in the swarmentry_point- Agent to start with. If None, uses the first agent (default: None)max_handoffs- Maximum handoffs to agents and users (default: 20)max_iterations- Maximum node executions within the swarm (default: 20)execution_timeout- Total execution timeout in seconds (default: 900.0)node_timeout- Individual node timeout in seconds (default: 300.0)repetitive_handoff_detection_window- Number of recent nodes to check for repetitive handoffs Disabled by default (default: 0)repetitive_handoff_min_unique_agents- Minimum unique agents required in recent sequence Disabled by default (default: 0)session_manager- Session manager for persisting graph state and execution history (default: None)hooks- List of hook providers for monitoring and extending graph execution behavior (default: None)trace_attributes- Custom trace attributes to apply to the agent’s trace span (default: None)
__call__
Section titled “__call__”def __call__(task: MultiAgentInput, invocation_state: dict[str, Any] | None = None, **kwargs: Any) -> SwarmResultDefined in: src/strands/multiagent/swarm.py:303
Invoke the swarm synchronously.
Arguments:
task- The task to executeinvocation_state- Additional state/context passed to underlying agents. Defaults to None to avoid mutable default argument issues.**kwargs- Keyword arguments allowing backward compatible future changes.
invoke_async
Section titled “invoke_async”async def invoke_async(task: MultiAgentInput, invocation_state: dict[str, Any] | None = None, **kwargs: Any) -> SwarmResultDefined in: src/strands/multiagent/swarm.py:318
Invoke the swarm asynchronously.
This method uses stream_async internally and consumes all events until completion, following the same pattern as the Agent class.
Arguments:
task- The task to executeinvocation_state- Additional state/context passed to underlying agents. Defaults to None to avoid mutable default argument issues.**kwargs- Keyword arguments allowing backward compatible future changes.
stream_async
Section titled “stream_async”async def stream_async(task: MultiAgentInput, invocation_state: dict[str, Any] | None = None, **kwargs: Any) -> AsyncIterator[dict[str, Any]]Defined in: src/strands/multiagent/swarm.py:342
Stream events during swarm execution.
Arguments:
task- The task to executeinvocation_state- Additional state/context passed to underlying agents. Defaults to None to avoid mutable default argument issues.**kwargs- Keyword arguments allowing backward compatible future changes.
Yields:
Dictionary events during swarm execution, such as:
- multi_agent_node_start: When a node begins execution
- multi_agent_node_stream: Forwarded agent events with node context
- multi_agent_handoff: When control is handed off between agents
- multi_agent_node_stop: When a node stops execution
- result: Final swarm result
serialize_state
Section titled “serialize_state”def serialize_state() -> dict[str, Any]Defined in: src/strands/multiagent/swarm.py:949
Serialize the current swarm state to a dictionary.
deserialize_state
Section titled “deserialize_state”def deserialize_state(payload: dict[str, Any]) -> NoneDefined in: src/strands/multiagent/swarm.py:979
Restore swarm state from a session dict and prepare for execution.
This method handles two scenarios:
- If the persisted status is COMPLETED, FAILED resets all nodes and graph state to allow re-execution from the beginning.
- Otherwise, restores the persisted state and prepares to resume execution from the next ready nodes.
Arguments:
payload- Dictionary containing persisted state data including status, completed nodes, results, and next nodes to execute.