Patterns for orchestrating complex, reliable, and scalable multi-agent workflows
These patterns go beyond single-agent logic, providing blueprints for building robust systems of collaborating LLM agents, tools, and services. Mastering these will help you design AI solutions that are modular, resilient, and enterprise-ready.
A linear sequence of processing stages where the output of one stage is the input to the next. Each stage performs a specific transformation or analysis on the data. This pattern is particularly relevant for LLM agents as it directly parallels "Prompt Chaining": a fundamental way to structure multi-step LLM tasks.
Example LLM workflow: Define Goal → Search Web → Synthesize Findings → Draft Report → Review & Edit
from langchain_community.llms import PipelineAI
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser

# LLM used at every stage (swap PipelineAI for any LangChain-compatible model)
llm = PipelineAI(pipeline_key="public/gpt-j:base")
# 1. Define Goal
goal_prompt = PromptTemplate.from_template("""Define the main goal for this research task:\n{input}""")
# 2. Search Web (illustrative: a bare LLM cannot browse; a real pipeline
#    would call a search tool at this stage)
search_prompt = PromptTemplate.from_template("""Search the web for information about: {goal}""")
# 3. Synthesize Findings
synthesize_prompt = PromptTemplate.from_template("""Summarize key findings from the search:\n{search_results}""")
# 4. Draft Report
draft_prompt = PromptTemplate.from_template("""Draft a report based on the findings:\n{summary}""")
# 5. Review & Edit
review_prompt = PromptTemplate.from_template("""Review and edit the report for clarity and completeness:\n{draft}""")
# Chain the steps; each stage's string output is mapped to the
# variable name the next prompt expects
pipeline = (
    goal_prompt | llm | StrOutputParser()
    | (lambda goal: {"goal": goal})
    | search_prompt | llm | StrOutputParser()
    | (lambda results: {"search_results": results})
    | synthesize_prompt | llm | StrOutputParser()
    | (lambda summary: {"summary": summary})
    | draft_prompt | llm | StrOutputParser()
    | (lambda draft: {"draft": draft})
    | review_prompt | llm | StrOutputParser()
)
# Run the pipeline
input_text = "How can AI improve supply chain efficiency?"
result = pipeline.invoke({"input": input_text})
print(result)
This pattern features a single, central manager agent that directs the workflow by decomposing the overall task into subtasks and delegating them to specialized worker agents. Each worker agent is responsible for a specific aspect of the process. The manager coordinates the sequencing, collects outputs from workers, and consolidates the results for the user.
# agent_base.py
class Agent:
    def __init__(self, name):
        self.name = name

    def handle_task(self, task):
        raise NotImplementedError("Each agent must implement handle_task.")

# flight_booker.py
from agent_base import Agent

class FlightBookerAgent(Agent):
    def handle_task(self, task):
        if task.get("type") == "book_flight":
            destination = task.get("data", {}).get("destination", "Unknown")
            return f"Flight booked to {destination} (Air France 345)"
        return "FlightBooker: Task not recognized"

# hotel_booker.py
from agent_base import Agent

class HotelBookerAgent(Agent):
    def handle_task(self, task):
        if task.get("type") == "book_hotel":
            destination = task.get("data", {}).get("destination", "Unknown")
            return f"Hotel booked in {destination} (Hilton Garden Inn)"
        return "HotelBooker: Task not recognized"

# travel_manager.py
from flight_booker import FlightBookerAgent
from hotel_booker import HotelBookerAgent

class TravelManagerAgent:
    def __init__(self):
        self.flight_agent = FlightBookerAgent("FlightBooker")
        self.hotel_agent = HotelBookerAgent("HotelBooker")

    def handle_travel_request(self, destination):
        print(f"[Manager] Received travel request to {destination}")
        # Decompose the task
        flight_task = {"type": "book_flight", "data": {"destination": destination}}
        hotel_task = {"type": "book_hotel", "data": {"destination": destination}}
        # Delegate to workers
        print("[Manager] Delegating flight booking...")
        flight_result = self.flight_agent.handle_task(flight_task)
        print("[Manager] Delegating hotel booking...")
        hotel_result = self.hotel_agent.handle_task(hotel_task)
        # Combine results
        return {
            "flight": flight_result,
            "hotel": hotel_result,
            "summary": f"Trip to {destination} booked: ✈️ + 🏨",
        }

# main.py
from travel_manager import TravelManagerAgent

if __name__ == "__main__":
    manager = TravelManagerAgent()
    result = manager.handle_travel_request("Paris")
    print("\n✅ Final Result:")
    print(result["summary"])
    print(result["flight"])
    print(result["hotel"])
[Manager] Received travel request to Paris
[Manager] Delegating flight booking...
[Manager] Delegating hotel booking...

✅ Final Result:
Trip to Paris booked: ✈️ + 🏨
Flight booked to Paris (Air France 345)
Hotel booked in Paris (Hilton Garden Inn)
A decentralized workflow pattern where agents interact directly with each other without central coordination, following predefined interaction patterns. This enables more decentralized, event-driven architectures where agents react to events and messages.
Example: A "customer support" agent creates a "ticket" event, which a "technical agent" then picks up without being directly commanded by the first agent.
# Simple in-process event bus: agents publish events and subscribe to them
class EventBus:
    def __init__(self):
        self.subscribers = {}

    def subscribe(self, event_type, handler):
        self.subscribers.setdefault(event_type, []).append(handler)

    def emit(self, event_type, event):
        for handler in self.subscribers.get(event_type, []):
            handler(event)

# Agent implementations (typically, each could wrap an LLM prompt/tool API)
class CustomerSupportAgent:
    def __init__(self, bus):
        self.bus = bus

    def handle_user_issue(self, issue):
        print(f"[Support] User reports: {issue}")
        event = {"ticket_id": 1, "issue": issue}
        self.bus.emit("ticket_created", event)

class TechnicalAgent:
    def __init__(self, bus):
        self.bus = bus
        self.bus.subscribe("ticket_created", self.fix_issue)

    def fix_issue(self, event):
        print(f"[Tech] Working on ticket #{event['ticket_id']}: {event['issue']}")
        # Would use an LLM here for troubleshooting steps!
        self.bus.emit("ticket_resolved", event)

class NotificationAgent:
    def __init__(self, bus):
        self.bus = bus
        self.bus.subscribe("ticket_resolved", self.notify_user)

    def notify_user(self, event):
        print(f"[Notification] Ticket #{event['ticket_id']} resolved.")

# Set up system
bus = EventBus()
support = CustomerSupportAgent(bus)
tech = TechnicalAgent(bus)
notifier = NotificationAgent(bus)

# Simulate a support request
support.handle_user_issue("Cannot access account.")
[Support] User reports: Cannot access account.
[Tech] Working on ticket #1: Cannot access account.
[Notification] Ticket #1 resolved.
The Saga pattern is a robust workflow strategy for managing distributed, multi-step processes. It breaks down a large transaction into a series of local steps, each with a compensating action to recover from failures. This ensures that even if part of a workflow fails, the system can gracefully revert to a consistent state—crucial for LLM agent workflows that span multiple services or actions.
# book_flight, book_hotel, cancel_flight and HotelFullException are
# assumed service calls/exceptions, not shown here
def book_trip(destination):
    try:
        flight = book_flight(destination)  # Step 1
        try:
            hotel = book_hotel(destination)  # Step 2
            return {"flight": flight, "hotel": hotel}
        except HotelFullException:
            cancel_flight(flight)  # Compensating action
            raise
    except Exception:
        # Handle global failure (log, alert, ...), then propagate
        raise
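Nested try/except blocks stop scaling after two or three steps. A more general sketch (all names here are illustrative, not from any library) runs each step, records its compensating action, and unwinds the stack in reverse order on failure:

```python
class SagaError(Exception):
    pass

def run_saga(steps):
    """Run (action, compensate) pairs in order; on failure, undo completed steps in reverse."""
    compensations = []  # undo actions for steps that have already succeeded
    results = []
    for action, compensate in steps:
        try:
            results.append(action())
        except Exception as exc:
            for undo in reversed(compensations):
                undo()  # best-effort rollback, newest step first
            raise SagaError(f"saga aborted: {exc}") from exc
        compensations.append(compensate)
    return results

# Demo: the hotel step fails, so the flight booking is rolled back
log = []

def book_flight():
    log.append("flight booked")
    return "AF345"

def cancel_flight():
    log.append("flight cancelled")

def book_hotel():
    raise RuntimeError("hotel full")

try:
    run_saga([(book_flight, cancel_flight), (book_hotel, lambda: None)])
except SagaError:
    pass

print(log)  # ['flight booked', 'flight cancelled']
```

Because compensations run newest-first, the system is rolled back in the exact reverse of the order it was built up, which is the core guarantee the Saga pattern relies on.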
The Command Query Responsibility Segregation (CQRS) pattern strengthens knowledge-management agents by splitting the system into two distinct flows: one for learning (the write/command side) and one for answering (the read/query side). This separation lets each side be optimized for its own needs: fast, scalable retrieval for users, and slower but reliable, consistency-focused knowledge updates in the background.
| Operation Side  | Purpose           | Characteristics                      |
|-----------------|-------------------|--------------------------------------|
| Command (Write) | Learn & Update    | Slower, complex, consistency-focused |
| Query (Read)    | Answer & Retrieve | Fast, scalable, optimized for search |
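A minimal sketch of the split (class and method names are illustrative): the command side owns the source-of-truth documents and keeps a precomputed keyword index in sync, while the query side only ever reads that index and never touches the write model.

```python
class KnowledgeBase:
    """CQRS sketch: a consistency-focused write path and a fast read path."""

    def __init__(self):
        self._documents = {}  # write model: source of truth
        self._index = {}      # read model: keyword -> set of doc ids

    # --- Command (write) side: may be slow; keeps the read model consistent ---
    def learn(self, doc_id, text):
        self._documents[doc_id] = text
        for word in set(text.lower().split()):
            self._index.setdefault(word, set()).add(doc_id)

    # --- Query (read) side: touches only the precomputed index ---
    def answer(self, keyword):
        return sorted(self._index.get(keyword.lower(), set()))

kb = KnowledgeBase()
kb.learn("doc1", "AI improves supply chain efficiency")
kb.learn("doc2", "Supply chain visibility with sensors")
print(kb.answer("supply"))  # ['doc1', 'doc2']
```

In a production system the two sides would typically be separate services with separate storage (for example, a document database behind `learn` and a vector or full-text index behind `answer`), synchronized through events rather than a shared in-memory dict.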