Overview

The SequentialWorkflow class runs a list of agents (or callables) in a fixed order. Each step receives the task, along with any optional data, from the step before it, so the output of one agent becomes the input of the next.
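As a rough mental model of that hand-off, the chain can be sketched with plain functions. This is not the library's implementation: `run_sequence`, `research`, and `write` are hypothetical names used only for illustration, and real agents exchange richer conversation state than a single string.

```python
from typing import Callable, List


def run_sequence(steps: List[Callable[[str], str]], task: str) -> str:
    """Pass `task` through each step in order; each output feeds the next step."""
    current = task
    for step in steps:
        current = step(current)
    return current


# Hypothetical stand-ins for agents: plain functions from str to str.
def research(task: str) -> str:
    return f"notes on [{task}]"


def write(notes: str) -> str:
    return f"draft from {notes}"


result = run_sequence([research, write], "solar power")
print(result)
```

The order of the list is the order of execution, which is the same contract the agents argument follows.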

Key Features

  • Sequential Execution: Agents execute one after another in a defined order
  • Synchronous and Asynchronous Support: Both blocking and non-blocking execution modes
  • Batched Processing: Execute multiple tasks through the same workflow
  • Concurrent Task Processing: Run multiple tasks concurrently using thread pools
  • Team Awareness: Agents can be aware of their position in the team structure
  • Auto-save Support: Automatically save conversation history to workspace
  • Flexible Output Types: Support for various output formats (dict, list, string)

Installation

pip install -U swarms

Class Definition

class SequentialWorkflow:
    def __init__(
        self,
        id: str = "sequential_workflow",
        name: str = "SequentialWorkflow",
        description: str = "Sequential Workflow, where agents are executed in a sequence.",
        agents: List[Union[Agent, Callable]] = None,
        max_loops: int = 1,
        output_type: OutputType = "dict",
        shared_memory_system: callable = None,
        multi_agent_collab_prompt: bool = False,
        team_awareness: bool = False,
        autosave: bool = True,
        verbose: bool = False,
        drift_detection: bool = False,
        drift_threshold: float = 0.75,
        drift_model: str = "claude-sonnet-4-5",
        *args,
        **kwargs,
    )

Parameters

id
str
default:"sequential_workflow"
Unique identifier for the workflow instance
name
str
default:"SequentialWorkflow"
Human-readable name for the workflow
description
str
default:"Sequential Workflow, where agents are executed in a sequence."
Description of the workflow’s purpose
agents
List[Union[Agent, Callable]]
required
List of agents or callables to execute in sequence. Cannot be None or empty.
max_loops
int
default:"1"
Maximum number of times to execute the workflow. Must be greater than 0.
output_type
OutputType
default:"dict"
Format of the output from the workflow. Options include “dict”, “list”, “str”, etc.
shared_memory_system
callable
default:"None"
Optional callable for managing shared memory between agents
multi_agent_collab_prompt
bool
default:"False"
If True, appends a collaborative prompt to each agent’s system prompt
team_awareness
bool
default:"False"
Whether agents are aware of the team structure and their position
autosave
bool
default:"True"
Whether to enable autosaving of conversation history to workspace
verbose
bool
default:"False"
Whether to enable verbose logging
drift_detection
bool
default:"False"
If True, a judge agent scores the final output’s semantic alignment with the original task after the pipeline completes. If the score falls below drift_threshold, the pipeline reruns and the cycle repeats until the score meets the threshold. If the judge output cannot be parsed, drift checking is skipped and the last result is returned as-is.
drift_threshold
float
default:"0.75"
Minimum alignment score (0.0–1.0) to consider the output acceptable. A warning is logged when the score falls below this value, and the pipeline reruns. Used only when drift_detection=True.
drift_model
str
default:"claude-sonnet-4-5"
Model used by the drift detection judge agent. Used only when drift_detection=True.
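The drift-detection cycle described for drift_detection and drift_threshold can be pictured as a score-and-rerun loop. The sketch below is an illustration under stated assumptions, not the library's code: `run_with_drift_check`, `toy_pipeline`, and `toy_judge` are hypothetical, the judge is a simple word-overlap function instead of an LLM, and `max_attempts` is a safety cap added for the example rather than a documented parameter.

```python
from typing import Callable, List, Optional


def run_with_drift_check(
    pipeline: Callable[[str], str],
    judge: Callable[[str, str], Optional[float]],
    task: str,
    drift_threshold: float = 0.75,
    max_attempts: int = 3,  # assumption: safety cap, not a documented parameter
) -> str:
    """Rerun `pipeline` until the judge's score reaches `drift_threshold`."""
    output = pipeline(task)
    for _ in range(max_attempts - 1):
        score = judge(task, output)
        if score is None:
            # Unparsable judge output: skip the check and keep the last result.
            break
        if score >= drift_threshold:
            break
        output = pipeline(task)  # score too low: rerun the whole pipeline
    return output


calls: List[str] = []


def toy_pipeline(task: str) -> str:
    """Drifts off-topic on the first call, stays on-topic afterwards."""
    calls.append(task)
    return "unrelated text" if len(calls) == 1 else f"report about {task}"


def toy_judge(task: str, output: str) -> Optional[float]:
    """Score = fraction of task words that appear in the output (0.0 to 1.0)."""
    words = task.lower().split()
    hits = sum(1 for word in words if word in output.lower())
    return hits / len(words)


final = run_with_drift_check(toy_pipeline, toy_judge, "solar power")
print(final, len(calls))
```

Here the first output scores 0.0, which is below the 0.75 threshold, so the pipeline runs a second time and the on-topic result is returned.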

Methods

run(task, img=None, imgs=None, *args, **kwargs)

Runs a task through the agents in their configured order. If drift_detection is enabled, a judge agent scores the final output's semantic alignment with the original task after the pipeline completes. If the score falls below drift_threshold, the pipeline reruns, and the cycle repeats until the score meets the threshold.
task
str
required
The task for the agents to execute
img
Optional[str]
default:"None"
An optional image input forwarded to every agent that supports it
imgs
Optional[List[str]]
default:"None"
Optional list of images. Accepted for API compatibility; only img is currently forwarded to the underlying flow.
result
str | dict | list
The final result, formatted per output_type

run_batched(tasks)

Executes a batch of tasks through the agents sequentially.
tasks
List[str]
required
A list of tasks for the agents to execute. Must be a non-empty list of strings.
results
List[str]
A list of final results after processing through all agents

run_async(task)

Executes the specified task through the agents asynchronously.
task
str
required
The task for the agents to execute. Must be a non-empty string.
result
str
The final result after processing through all agents
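One common way to expose a blocking call to async code is to run it on a worker thread. The sketch below shows that pattern with the standard library's `asyncio.to_thread` (Python 3.9+). It is an assumption about how such a wrapper could look, not a description of how run_async is implemented, and `blocking_run` and `run_async_sketch` are hypothetical names.

```python
import asyncio
import time
from typing import List


def blocking_run(task: str) -> str:
    """Stand-in for a synchronous workflow run."""
    time.sleep(0.01)
    return f"done: {task}"


async def run_async_sketch(task: str) -> str:
    """Validate the task, then run the blocking call off the event loop."""
    if not isinstance(task, str) or not task:
        raise ValueError("task must be a non-empty string")
    return await asyncio.to_thread(blocking_run, task)


async def main() -> List[str]:
    # gather() returns results in the order the awaitables were passed in.
    return await asyncio.gather(
        run_async_sketch("first"),
        run_async_sketch("second"),
    )


results = asyncio.run(main())
print(results)
```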

run_concurrent(tasks)

Executes a batch of tasks through the agents concurrently using ThreadPoolExecutor.
tasks
List[str]
required
A list of tasks for the agents to execute. Must be a non-empty list of strings.
results
List[str]
A list of final results after processing through all agents
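The thread-pool approach can be sketched with `concurrent.futures` directly. This is a minimal illustration, not the library's code: `run_concurrent_sketch` and `fake_run` are hypothetical, and `max_workers=4` is an arbitrary choice. The sketch uses `Executor.map`, which returns results in input order. Whether run_concurrent itself preserves order is not stated here, so check before relying on it.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def run_concurrent_sketch(
    run: Callable[[str], str],
    tasks: List[str],
    max_workers: int = 4,  # assumption: arbitrary pool size for the example
) -> List[str]:
    """Run `run` over every task on a thread pool, keeping input order."""
    if not tasks or not all(isinstance(t, str) for t in tasks):
        raise ValueError("tasks must be a non-empty list of strings")
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(run, tasks))


def fake_run(task: str) -> str:
    """Stand-in for a full workflow run on one task."""
    return task.upper()


results = run_concurrent_sketch(fake_run, ["quantum", "blockchain", "ml"])
print(results)
```

Threads help most when each run spends its time waiting on network calls, which is typical of LLM-backed agents.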

run_stream(task, img=None, with_events=False, **kwargs)

Stream tokens from each agent in pipeline order, in real time. Each agent’s tokens are yielded the moment the LLM produces them; once an agent finishes, its full output is handed off to the next agent — the same hand-off as run(), just streamed. Delegates to AgentRearrange.run_stream internally.
task
str
required
Initial task passed to the first agent in the pipeline.
img
Optional[str]
default:"None"
Optional image input forwarded to every agent.
with_events
bool
default:"False"
When False, yield plain token strings. When True, yield structured event dicts (agent_start, token, agent_end) — useful for per-agent UI panels or attributing each token to its emitting agent.
generator
Iterator[str | dict]
Sync generator. Yields plain token strings by default, or event dicts when with_events=True.
for token in workflow.run_stream("Analyse NVDA"):
    print(token, end="", flush=True)

arun_stream(task, img=None, with_events=False, **kwargs)

Async generator version of run_stream. Drop-in for any async caller (FastAPI’s StreamingResponse, async background workers, etc.). Same parameter and yield semantics as the sync version.
async_generator
AsyncIterator[str | dict]
Async generator. Yields plain token strings by default, or event dicts when with_events=True.
# Must run inside an async function (for example, a FastAPI handler).
async for token in workflow.arun_stream("Analyse NVDA"):
    print(token, end="", flush=True)

Structured events

When called with with_events=True, both methods yield three event types:
| Type | Fields | When emitted |
| --- | --- | --- |
| agent_start | agent | Right before an agent begins streaming |
| token | agent, token | For every token the agent emits |
| agent_end | agent, output | After the agent finishes; carries the full output |
# Must run inside an async function; `task` is the task string defined by the caller.
async for evt in workflow.arun_stream(task, with_events=True):
    if evt["type"] == "agent_start":
        print(f"--- {evt['agent']} starting ---")
    elif evt["type"] == "token":
        print(evt["token"], end="", flush=True)
    elif evt["type"] == "agent_end":
        print(f"\n--- {evt['agent']} finished ({len(evt['output'])} chars) ---")
Note: streaming mode does not apply max_loops > 1 or drift_detection. Use run() if you need either.
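To experiment with the event shapes without calling a model, the protocol can be imitated with an ordinary generator. The sketch below is a stand-in, not the library's streaming code: `stream_pipeline`, `upper_tokens`, and `exclaim` are hypothetical, and each "agent" is just a function that yields tokens. Only the event types and fields (type, agent, token, output) are taken from the description above.

```python
from typing import Callable, Dict, Iterator, List, Tuple, Union

Stage = Tuple[str, Callable[[str], Iterator[str]]]
Event = Dict[str, str]


def stream_pipeline(
    stages: List[Stage], task: str, with_events: bool = False
) -> Iterator[Union[str, Event]]:
    """Yield tokens stage by stage; each stage's full output feeds the next."""
    current = task
    for name, stream_fn in stages:
        if with_events:
            yield {"type": "agent_start", "agent": name}
        parts: List[str] = []
        for token in stream_fn(current):
            parts.append(token)
            if with_events:
                yield {"type": "token", "agent": name, "token": token}
            else:
                yield token
        current = "".join(parts)  # hand the full output to the next stage
        if with_events:
            yield {"type": "agent_end", "agent": name, "output": current}


def upper_tokens(text: str) -> Iterator[str]:
    for word in text.split():
        yield word.upper() + " "


def exclaim(text: str) -> Iterator[str]:
    for word in text.split():
        yield word + "! "


stages: List[Stage] = [("Shouter", upper_tokens), ("Exclaimer", exclaim)]
tokens = list(stream_pipeline(stages, "hello world"))
events = list(stream_pipeline(stages, "hello world", with_events=True))
print(tokens)
print([event["type"] for event in events])
```

A consumer written against these fake events, such as the loop shown earlier that branches on the type field, should need no changes when pointed at the real stream, provided the event fields match the table.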

sequential_flow()

Constructs a string representation of the agent execution order.
flow
str
A string showing the order of agent execution (e.g., “AgentA -> AgentB -> AgentC”)
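The format of that string can be reproduced with a simple join. This is a sketch under assumptions, not the method's actual code: `FakeAgent`, `step_name`, and `build_flow` are hypothetical, and falling back to a callable's `__name__` is a guess at how plain callables might be labelled.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass
class FakeAgent:
    """Minimal stand-in exposing only the agent_name attribute."""
    agent_name: str


def step_name(step: Any) -> str:
    """Prefer agent_name; fall back to a callable's __name__ (assumption)."""
    name = getattr(step, "agent_name", None)
    if name:
        return name
    return getattr(step, "__name__", type(step).__name__)


def build_flow(steps: List[Any]) -> str:
    """Join step names with ' -> ' in execution order."""
    return " -> ".join(step_name(step) for step in steps)


def summarize(text: str) -> str:
    return text[:20]


flow = build_flow([FakeAgent("AgentA"), FakeAgent("AgentB"), summarize])
print(flow)
```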

reliability_check()

Validates the workflow configuration and prepares agents for execution. Raises:
  • ValueError: If the agents list is None or empty, or if max_loops is set to 0
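The documented failure conditions amount to two guard clauses. The sketch below mirrors only those conditions: `check_workflow_config` and `is_valid` are hypothetical names, the error messages are invented for the example, and the real method also prepares agents for execution, which is not modelled here.

```python
from typing import Any, Optional, Sequence


def check_workflow_config(agents: Optional[Sequence[Any]], max_loops: int) -> None:
    """Raise ValueError for the two documented misconfigurations."""
    if agents is None or len(agents) == 0:
        raise ValueError("agents must be a non-empty list")
    if max_loops == 0:
        raise ValueError("max_loops cannot be 0")


def is_valid(agents: Optional[Sequence[Any]], max_loops: int) -> bool:
    """Convenience wrapper that reports validity instead of raising."""
    try:
        check_workflow_config(agents, max_loops)
        return True
    except ValueError:
        return False


print(is_valid([], 1), is_valid([str.upper], 0), is_valid([str.upper], 1))
```

Validating before the first run means a misconfigured workflow fails immediately instead of partway through a pipeline.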

Attributes

| Attribute | Type | Description |
| --- | --- | --- |
| id | str | Unique identifier for the workflow instance |
| name | str | Human-readable name for the workflow |
| description | str | Description of the workflow's purpose |
| agents | List[Union[Agent, Callable]] | List of agents or callables to execute in sequence |
| max_loops | int | Maximum number of times to execute the workflow |
| output_type | OutputType | Format of the output from the workflow |
| shared_memory_system | callable | Optional shared memory helper |
| multi_agent_collab_prompt | bool | Whether the collaboration prompt is appended to each agent |
| team_awareness | bool | Whether team-awareness is enabled on the underlying AgentRearrange |
| autosave | bool | Whether conversation history is autosaved to disk |
| verbose | bool | Whether verbose logging is on |
| drift_threshold | float | Minimum alignment score required when drift_detection=True |
| drift_agent | Optional[Agent] | Judge agent used for drift detection; None when disabled |
| swarm_workspace_dir | Optional[str] | Workspace directory used for autosave; populated by _setup_autosave() |
| flow | str | String representation of the agent execution order |
| agent_rearrange | AgentRearrange | Internal helper for managing agent execution |

Usage Examples

Basic Sequential Workflow

from swarms import Agent, SequentialWorkflow

# Create specialized agents. Each agent selects its model through model_name,
# so no separate OpenAI-specific client object is needed for a Claude model.
researcher = Agent(
    agent_name="Researcher",
    model_name="claude-sonnet-4-6",
    max_loops=1,
    system_prompt="You are a research specialist. Gather and analyze information."
)

writer = Agent(
    agent_name="Writer",
    model_name="claude-sonnet-4-6",
    max_loops=1,
    system_prompt="You are a content writer. Create engaging content based on research."
)

editor = Agent(
    agent_name="Editor",
    model_name="claude-sonnet-4-6",
    max_loops=1,
    system_prompt="You are an editor. Review and polish the content."
)

# Create sequential workflow
workflow = SequentialWorkflow(
    name="Content Creation Pipeline",
    description="Research -> Write -> Edit pipeline",
    agents=[researcher, writer, editor],
    max_loops=1,
    verbose=True
)

# Run the workflow
result = workflow.run("Create a blog post about artificial intelligence")
print(result)

Batched Task Processing

# Process multiple tasks through the same workflow
tasks = [
    "Analyze the impact of AI on healthcare",
    "Discuss the future of renewable energy",
    "Examine cybersecurity trends"
]

results = workflow.run_batched(tasks)

for task, result in zip(tasks, results):
    print(f"Task: {task}")
    print(f"Result: {result}")
    print("-" * 50)

Concurrent Task Execution

# Run multiple tasks concurrently (thread-based, so asyncio is not needed here)
tasks = [
    "Research quantum computing",
    "Research blockchain technology",
    "Research machine learning"
]

results = workflow.run_concurrent(tasks)
print(f"Processed {len(results)} tasks concurrently")

Async Execution

import asyncio

async def process_task():
    result = await workflow.run_async("Analyze climate change solutions")
    return result

# Run async
result = asyncio.run(process_task())
print(result)

With Team Awareness

# Create workflow with team awareness
workflow = SequentialWorkflow(
    name="Collaborative Pipeline",
    agents=[researcher, writer, editor],
    team_awareness=True,  # Agents know about each other
    multi_agent_collab_prompt=True,  # Add collaboration prompt
    verbose=True
)

result = workflow.run("Create a comprehensive market analysis")

Custom Output Types

# Different output formats
workflow_dict = SequentialWorkflow(
    agents=[researcher, writer],
    output_type="dict"  # Returns dict format
)

workflow_list = SequentialWorkflow(
    agents=[researcher, writer],
    output_type="list"  # Returns list format
)

workflow_str = SequentialWorkflow(
    agents=[researcher, writer],
    output_type="str"  # Returns string format
)

With Autosave

import os

# Set workspace directory for autosave
os.environ["WORKSPACE_DIR"] = "./my_workspace"

workflow = SequentialWorkflow(
    name="SavedWorkflow",
    agents=[researcher, writer, editor],
    autosave=True,  # Automatically save conversation history
    verbose=True
)

result = workflow.run("Generate a report")
# Conversation history saved to ./my_workspace/swarms/SequentialWorkflow/

Error Handling

try:
    workflow = SequentialWorkflow(
        agents=[researcher, writer],
        max_loops=1
    )
    result = workflow.run("Process this task")
except ValueError as e:
    print(f"Configuration error: {e}")
except Exception as e:
    print(f"Execution error: {e}")

Best Practices

  1. Agent Design: Ensure each agent has a clear, specific role in the sequence
  2. Error Handling: Use try-except blocks around workflow execution
  3. Verbose Mode: Enable verbose logging during development for debugging
  4. Autosave: Enable autosave for important workflows to preserve conversation history
  5. Task Clarity: Provide clear, specific tasks to get the best results
  6. Resource Management: Use run_concurrent for I/O-bound tasks to improve performance
  7. Team Awareness: Enable team awareness for complex workflows where context matters

Common Use Cases

  • Content Creation Pipelines: Research -> Write -> Edit workflows
  • Data Processing: Extract -> Transform -> Load (ETL) pipelines
  • Analysis Workflows: Collect -> Analyze -> Report sequences
  • Quality Assurance: Create -> Review -> Approve chains
  • Multi-Stage Reasoning: Break complex problems into sequential steps