The SequentialWorkflow orchestrates multiple agents in a linear chain, where each agent processes the output from the previous agent. This creates a pipeline where tasks flow through a series of specialized agents.
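The hand-off described above can be illustrated without the library. This is a minimal sketch in plain Python, not the swarms implementation: `run_pipeline` and the three stage functions are hypothetical names, and each stage is an ordinary function standing in for an LLM-backed agent.

```python
from typing import Callable, List

# Hypothetical stand-in for an agent: takes text, returns text.
Stage = Callable[[str], str]


def run_pipeline(stages: List[Stage], task: str) -> str:
    """Feed the task to the first stage, then pass each output to the next."""
    current = task
    for stage in stages:
        current = stage(current)
    return current


def research(text: str) -> str:
    return f"notes({text})"


def write(text: str) -> str:
    return f"draft({text})"


def edit(text: str) -> str:
    return f"final({text})"


result = run_pipeline([research, write, edit], "AI in healthcare")
print(result)  # final(draft(notes(AI in healthcare)))
```

The nesting in the printed string shows the order of processing: the last stage only ever sees what the stage before it produced.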
When to Use
- Step-by-step processes: Tasks with clear sequential dependencies
- Data transformation pipelines: Progressive refinement of outputs
- Multi-stage analysis: Research → Analysis → Writing → Review
- Quality improvement: Multiple rounds of refinement
Key Features
- Automatic flow construction from agent list
- Support for multiple execution loops
- Shared memory between agents (optional)
- Team awareness capabilities
- Conversation history tracking
- Autosave functionality
Basic Example
from swarms import Agent, SequentialWorkflow

# Create specialized agents
researcher = Agent(
    agent_name="Researcher",
    system_prompt="Research the given topic and provide detailed information.",
    model_name="gpt-5.4",
    max_loops=1,
)

writer = Agent(
    agent_name="Writer",
    system_prompt="Transform research into an engaging article.",
    model_name="gpt-5.4",
    max_loops=1,
)

editor = Agent(
    agent_name="Editor",
    system_prompt="Edit and polish the article for publication.",
    model_name="gpt-5.4",
    max_loops=1,
)

# Create workflow: agents run in list order
workflow = SequentialWorkflow(
    agents=[researcher, writer, editor],
    max_loops=1,
)

# Execute
result = workflow.run("The impact of AI on healthcare")
print(result)
Advanced Configuration
With Team Awareness
workflow = SequentialWorkflow(
    name="ContentPipeline",
    description="Research, write, and edit content",
    agents=[researcher, writer, editor],
    max_loops=1,
    team_awareness=True,  # Agents know about each other
    multi_agent_collab_prompt=True,  # Add collaboration instructions
    output_type="dict",
    autosave=True,
    verbose=True,
)
With Shared Memory
from swarms.memory.langtrace_memory import LangTrace

# Initialize shared memory
shared_memory = LangTrace()

workflow = SequentialWorkflow(
    agents=[researcher, writer, editor],
    shared_memory_system=shared_memory,
    max_loops=1,
)
Key Parameters
- name (str, default: "SequentialWorkflow"): Identifier for the workflow instance
- agents: List of agents to execute sequentially
- max_loops: Number of times to execute the complete workflow
- output_type: Format for workflow output (dict, str, list, etc.)
- team_awareness: Enable agents to know about team structure and flow
- multi_agent_collab_prompt: Add collaboration instructions to agent prompts
- autosave: Automatically save conversation history
Methods
run()
Execute the workflow with a task.
result = workflow.run(
    task="Analyze quantum computing trends",
    img=None,  # Optional image input
)
run_batched()
Process multiple tasks sequentially.
tasks = [
    "AI in healthcare",
    "AI in finance",
    "AI in education",
]
results = workflow.run_batched(tasks)
run_async()
Execute workflow asynchronously.
import asyncio

async def main():
    result = await workflow.run_async("Task description")
    return result

result = asyncio.run(main())
run_concurrent()
Run multiple tasks concurrently.
tasks = ["Task 1", "Task 2", "Task 3"]
results = workflow.run_concurrent(tasks)
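The source does not say how run_concurrent() schedules its work. As a rough mental model only, the sketch below runs one independent pipeline call per task in a thread pool. `run_tasks_concurrently` and `fake_pipeline` are hypothetical names, and `fake_pipeline` stands in for a call such as `workflow.run(task)`.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def run_tasks_concurrently(
    pipeline: Callable[[str], str], tasks: List[str]
) -> List[str]:
    """Run one pipeline call per task in a thread pool; results keep task order."""
    with ThreadPoolExecutor(max_workers=max(1, len(tasks))) as pool:
        return list(pool.map(pipeline, tasks))


def fake_pipeline(task: str) -> str:
    # Hypothetical stand-in for workflow.run(task).
    return task.upper()


results = run_tasks_concurrently(fake_pipeline, ["Task 1", "Task 2", "Task 3"])
print(results)  # ['TASK 1', 'TASK 2', 'TASK 3']
```

Each task still passes through the full agent chain in order; only the tasks themselves overlap in time.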
run_stream() / arun_stream()
Stream tokens from each agent in pipeline order, in real time. Each agent’s tokens are yielded the moment the LLM produces them; once an agent finishes, its full output is handed off to the next agent — same hand-off as run(), just streamed.
# Sync generator
for token in workflow.run_stream("Analyse NVDA"):
    print(token, end="", flush=True)

# Async generator
import asyncio

async def main():
    async for token in workflow.arun_stream("Analyse NVDA"):
        print(token, end="", flush=True)

asyncio.run(main())
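The streamed hand-off can be pictured with plain generators. This is a sketch under stated assumptions, not the library's code: `stream_pipeline`, `shout`, and `count_words` are hypothetical, and each stage is a generator function standing in for a streaming agent.

```python
from typing import Callable, Iterator, List

# Hypothetical stand-in for a streaming agent: takes text, yields tokens.
StreamingStage = Callable[[str], Iterator[str]]


def stream_pipeline(stages: List[StreamingStage], task: str) -> Iterator[str]:
    """Yield each stage's tokens as produced; pass the joined output onward."""
    current = task
    for stage in stages:
        chunks = []
        for token in stage(current):
            chunks.append(token)
            yield token  # the caller sees the token immediately
        current = "".join(chunks)  # full output becomes the next stage's input


def shout(text: str) -> Iterator[str]:
    for word in text.split():
        yield word.upper() + " "


def count_words(text: str) -> Iterator[str]:
    yield str(len(text.split()))
    yield " words"


tokens = list(stream_pipeline([shout, count_words], "analyse this stock"))
print(tokens)  # ['ANALYSE ', 'THIS ', 'STOCK ', '3', ' words']
```

The second stage starts only after the first has finished, because it needs the complete output as its input. Streaming changes when the caller sees tokens, not the order in which agents run.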
Structured events: with_events=True
By default both methods yield plain token strings. Pass with_events=True to receive structured event dicts instead — useful when you want to render per-agent panels, attribute tokens to their emitting agent, or know exactly when each agent starts and finishes.
import asyncio

async def main(task):
    # async for must run inside a coroutine
    async for evt in workflow.arun_stream(task, with_events=True):
        if evt["type"] == "agent_start":
            print(f"--- {evt['agent']} starting ---")
        elif evt["type"] == "token":
            print(evt["token"], end="", flush=True)
        elif evt["type"] == "agent_end":
            print(f"\n--- {evt['agent']} finished ({len(evt['output'])} chars) ---")

asyncio.run(main("Analyse NVDA"))
Three event types are emitted:
| Type | Fields | When |
|---|---|---|
| agent_start | agent | Right before an agent begins streaming |
| token | agent, token | For every token emitted by the agent |
| agent_end | agent, output | After the agent has finished, carries the full output |
max_loops > 1 and drift_detection are not applied in streaming mode. Use run() if you need those.
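To show how the event fields might be used for per-agent panels, here is a small sketch that groups token events by agent. The event list is hand-written to match the shapes in the table above, not captured from a real run, and `collect_by_agent` is a hypothetical helper.

```python
from collections import defaultdict
from typing import Dict, Iterable, List


def collect_by_agent(events: Iterable[dict]) -> Dict[str, str]:
    """Concatenate the token events emitted by each agent."""
    buffers: Dict[str, List[str]] = defaultdict(list)
    for evt in events:
        if evt["type"] == "token":
            buffers[evt["agent"]].append(evt["token"])
    return {agent: "".join(parts) for agent, parts in buffers.items()}


# Hand-written sample events (assumed shapes: type, agent, token/output).
sample_events = [
    {"type": "agent_start", "agent": "Researcher"},
    {"type": "token", "agent": "Researcher", "token": "Chips "},
    {"type": "token", "agent": "Researcher", "token": "matter."},
    {"type": "agent_end", "agent": "Researcher", "output": "Chips matter."},
    {"type": "agent_start", "agent": "Writer"},
    {"type": "token", "agent": "Writer", "token": "Done."},
    {"type": "agent_end", "agent": "Writer", "output": "Done."},
]

panels = collect_by_agent(sample_events)
print(panels)  # {'Researcher': 'Chips matter.', 'Writer': 'Done.'}
```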
Use Cases
Content Creation Pipeline
# Research → Write → Edit → Publish
researcher = Agent(agent_name="Researcher", ...)
writer = Agent(agent_name="Writer", ...)
editor = Agent(agent_name="Editor", ...)
publisher = Agent(agent_name="Publisher", ...)
pipeline = SequentialWorkflow(
    agents=[researcher, writer, editor, publisher]
)
article = pipeline.run("Latest developments in renewable energy")
Data Analysis Pipeline
# Collect → Clean → Analyze → Visualize
collector = Agent(agent_name="DataCollector", ...)
cleaner = Agent(agent_name="DataCleaner", ...)
analyst = Agent(agent_name="Analyst", ...)
visualizer = Agent(agent_name="Visualizer", ...)
analysis_pipeline = SequentialWorkflow(
    agents=[collector, cleaner, analyst, visualizer]
)
result = analysis_pipeline.run("Q4 sales data")
Code Review Pipeline
# Generate → Test → Review → Document
coder = Agent(agent_name="Coder", ...)
tester = Agent(agent_name="Tester", ...)
reviewer = Agent(agent_name="Reviewer", ...)
documenter = Agent(agent_name="Documenter", ...)
code_pipeline = SequentialWorkflow(
    agents=[coder, tester, reviewer, documenter]
)
output = code_pipeline.run("Create a binary search function")
Best Practices
Tip: Keep each agent focused on a single responsibility for cleaner workflows
- Clear Agent Roles: Each agent should have a distinct, well-defined purpose
- Appropriate Ordering: Place agents in logical sequence (research before writing)
- Output Formatting: Ensure each agent’s output format matches the next agent’s expected input
- Error Handling: Monitor for failures in early stages to prevent cascade issues
- Performance: Consider workflow length - too many agents can slow execution
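The error-handling advice above can be sketched as a fail-fast wrapper that stops at the first bad stage instead of feeding a broken output downstream. Everything here is hypothetical and independent of swarms: `StageError`, `run_checked`, and the lambda stages are illustrative names, and treating empty output as a failure is one possible policy.

```python
from typing import Callable, List, Tuple

# Hypothetical (name, function) pair standing in for an agent.
Stage = Tuple[str, Callable[[str], str]]


class StageError(RuntimeError):
    """Raised when a named stage fails or returns empty output."""


def run_checked(stages: List[Stage], task: str) -> str:
    """Run stages in order, stopping at the first failure or empty output."""
    current = task
    for name, fn in stages:
        try:
            current = fn(current)
        except Exception as exc:
            raise StageError(f"stage {name!r} failed: {exc}") from exc
        if not current.strip():
            raise StageError(f"stage {name!r} returned empty output")
    return current


stages = [
    ("Researcher", lambda t: f"notes on {t}"),
    ("Writer", lambda t: ""),  # simulated bad stage
    ("Editor", lambda t: f"edited {t}"),
]

message = ""
try:
    run_checked(stages, "solar power")
except StageError as err:
    message = str(err)
print(message)  # stage 'Writer' returned empty output
```

Naming the failing stage in the error makes it clear where the chain broke, and the Editor stage never runs on empty input.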
Advanced Features
Custom Flow Generation
The workflow automatically generates a flow string:
workflow = SequentialWorkflow(
    agents=[agent1, agent2, agent3]
)

# Automatically generates: "agent1 -> agent2 -> agent3"
print(workflow.flow)
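A flow string of this shape can be built by joining agent names in list order. The helpers below are a hypothetical sketch of that idea, not the library's internal code.

```python
from typing import List


def build_flow(agent_names: List[str]) -> str:
    """Join agent names in list order with ' -> ' separators."""
    return " -> ".join(agent_names)


def parse_flow(flow: str) -> List[str]:
    """Split a flow string back into agent names, ignoring extra spaces."""
    return [name.strip() for name in flow.split("->") if name.strip()]


flow = build_flow(["agent1", "agent2", "agent3"])
print(flow)  # agent1 -> agent2 -> agent3
```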
Conversation Tracking
Access complete conversation history:
result = workflow.run("Task")
# Access conversation through internal agent_rearrange
history = workflow.agent_rearrange.conversation.conversation_history
On initialization, the workflow validates that the agents list is not empty and that max_loops is greater than 0.
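A check of this kind could look like the sketch below. The function names, the error messages, and the use of ValueError are assumptions for illustration; the source does not state which exception the library raises.

```python
from typing import Sequence


def validate_workflow_args(agents: Sequence[object], max_loops: int) -> None:
    """Reject an empty agent list or a non-positive loop count."""
    if not agents:
        raise ValueError("agents must contain at least one agent")
    if max_loops <= 0:
        raise ValueError("max_loops must be greater than 0")


def is_valid(agents: Sequence[object], max_loops: int) -> bool:
    """Return True when the arguments pass validation."""
    try:
        validate_workflow_args(agents, max_loops)
    except ValueError:
        return False
    return True


print(is_valid(["researcher"], 1))  # True
print(is_valid([], 1))              # False
print(is_valid(["researcher"], 0))  # False
```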