Event-driven orchestration for AI systems: new ways and old
Data processing tools like Kafka, Spark, and Flink have long been used to build workflows. Today, as people create complex AI systems, they unsurprisingly often find themselves needing orchestration frameworks. But is the field really so specific that it needs new tools for solving the same problems? After all, LLMs are just APIs, and the fundamental patterns of workflow architecture remain the same.
A widespread data processing paradigm is event-driven architecture. The purpose of this example is to demonstrate how you can use tried-and-tested tools like Kafka and Faust to build AI workflows. This approach also brings you the reliability and scalability of Kafka, easing production deployment.
We’ll walk through the basic concepts of the newly released LlamaIndex Workflows functionality, and compare them to what you get with Faust.
Here we are using the well-maintained fork of Faust: https://github.com/faust-streaming/faust
[ ]:
%pip install faust-streaming llama-index -U
The first example is a simple workflow with two steps. Note how the steps are wired together: each step declares the event type it consumes in its signature and the event type it produces in its return annotation, and the workflow routes events accordingly:
[1]:
from llama_index.core.workflow import Event, StartEvent, StopEvent, Workflow, step
# `pip install llama-index-llms-openai` if you don't already have it
from llama_index.llms.openai import OpenAI
class JokeEvent(Event):
    joke: str


class JokeFlow(Workflow):
    llm = OpenAI()

    @step
    async def generate_joke(self, ev: StartEvent) -> JokeEvent:
        topic = ev.topic
        prompt = f"Write your best joke about {topic}."
        response = await self.llm.acomplete(prompt)
        return JokeEvent(joke=str(response))

    @step
    async def critique_joke(self, ev: JokeEvent) -> StopEvent:
        joke = ev.joke
        prompt = f"Give a thorough analysis and critique of the following joke: {joke}"
        response = await self.llm.acomplete(prompt)
        return StopEvent(result=str(response))


w = JokeFlow(timeout=60, verbose=False)
result = await w.run(topic="pirates")
print(str(result))
Analysis:
This joke plays on the pun of "fish and ships" sounding like "fish and chips," a popular dish at seafood restaurants. The joke also incorporates the pirate theme by referencing ships, which is a common element in pirate lore.
Critique:
Overall, this joke is light-hearted and playful, making it suitable for a general audience. The use of wordplay adds a clever twist to the punchline, making it more engaging for the listener. However, the joke may be considered somewhat predictable as the setup leads directly to the pun, leaving little room for surprise or unexpected humor. Additionally, the joke relies heavily on the pun itself, which may not appeal to all audiences. Overall, while the joke is amusing and well-crafted, it may not be considered particularly original or innovative.
Let’s try to do the same thing with Faust.
Instead of different event classes, we’ll use channels to pass messages between our workflow steps. In this example, we only need to pass single strings, so we’ll use a single container type for that. You can of course use more complex types, including custom classes, dataclasses, etc., as sketched below.
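If you do want richer messages, a Faust record can carry several typed fields with defaults. A minimal sketch (the JokeRequest name and its fields are just an illustration, not used below):

import faust

class JokeRequest(faust.Record):
    topic: str
    audience: str = "general"  # records support default values
    max_words: int = 50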
Faust is stream-based, meaning it will keep processing messages sent to its streams in an async loop until the app shuts down. Concurrency also comes out of the box.
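For example, an agent can be scaled to several concurrent actor instances with the concurrency parameter (a sketch; my_channel is a placeholder). Note that with concurrency > 1, the processing order of records is no longer guaranteed:

@app.agent(my_channel, concurrency=4)  # run 4 instances of this agent concurrently
async def process(stream):
    async for record in stream:
        ...  # incoming records are distributed among the instances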
The catch with Faust is that it requires a Kafka broker (its only external dependency). Still, Kafka is quite easy to set up (see https://github.com/wurstmeister/kafka-docker), and your organization is probably already using it for something else.
If you’re just playing around, you can execute the following cell to start a Kafka broker in a Docker container.
[2]:
!docker run -d --name kafka-zookeeper \
-p 2181:2181 -p 9092:9092 \
-e ADVERTISED_HOST=127.0.0.1 -e NUM_PARTITIONS=10 \
-e ZOOKEEPER_HOST=127.0.0.1 \
johnnypark/kafka-zookeeper
import time
time.sleep(30) # Wait for the broker to start
55d2b52b9a9ccc72fdfec3122223789b598c10d6009f8660063914434915d338
[2]:
import asyncio

import faust
from llama_index.llms.openai import OpenAI

app = faust.App('joke-flow', broker='kafka://localhost:9092', web_enabled=False)


class StringRecord(faust.Record):
    content: str


# We create channels for passing messages between our workflow steps, just like we did with the events before.
# Start and end channels will be used as entry and exit points, respectively.
start_channel = app.channel(value_type=StringRecord)
joke_channel = app.channel(value_type=StringRecord)
end_channel = app.channel(value_type=StringRecord)

llm = OpenAI()
results = asyncio.Queue()  # For collecting results (in practice, you'd use a database or something)


@app.agent(start_channel, sink=[joke_channel])  # Specify the input and output channels
async def generate_joke(requests):
    async for request in requests:  # Use an async loop to process incoming messages
        prompt = f"Write your best joke about {request.content}."
        response = await llm.acomplete(prompt)
        yield StringRecord(content=str(response))


@app.agent(joke_channel, sink=[end_channel])  # We can even set the sink directly to another agent, like [handle_end]
async def critique_joke(jokes):
    async for joke in jokes:
        prompt = f"Give a thorough analysis and critique of the following joke: {joke.content}"
        response = await llm.acomplete(prompt)
        yield StringRecord(content=str(response))


@app.agent(end_channel)
async def handle_end(events):
    async for event in events:
        await results.put(event.content)  # Put the result in the queue
To have more control over which channels to use, you can send messages explicitly without specifying the sink parameter:

@app.agent(start_channel)
async def generate_joke(requests):
    async for request in requests:
        ...
        await joke_channel.send(value=StringRecord(content=str(response)))
By the way, you can also use Kafka topics in place of channels. Unlike plain channels, which live in memory inside a single worker, topics are backed by Kafka, so they are persistent and shared between workers:
topic = app.topic('my-topic', value_type=...)

@app.agent(topic)
async def process_stream(stream):
    async for record in stream:
        ...
Let’s run the example:
[3]:
await app.start()
await start_channel.send(value=StringRecord(content="pirates"))
result = await results.get() # Get the first and only result from the queue
await app.stop()
print(result)
Analysis:
This joke plays on the pun of "fish and ships" sounding like "fish and chips," a popular dish at seafood restaurants. The joke also incorporates the pirate theme by referencing ships, which is a common element in pirate lore. The humor comes from the unexpected twist on a familiar phrase and the clever wordplay.
Critique:
Overall, this joke is light-hearted and fun, making it suitable for a wide audience. The use of wordplay is clever and adds an element of surprise to the punchline. However, the joke may be considered somewhat predictable as the setup hints at the punchline with the mention of ships. Additionally, the humor may not be particularly sophisticated or nuanced, which could limit its appeal to some audiences. Overall, while this joke is entertaining and likely to elicit a chuckle, it may not be memorable or particularly original in the long run.
Using context
A useful feature of LlamaIndex Workflows is the ability to pass context between steps. Let’s see how you can achieve the same effect with Faust.
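For reference, here’s roughly what this looks like on the LlamaIndex side (a sketch: MiddleEvent is a placeholder event type, and the exact Context API may vary between versions):

from llama_index.core.workflow import Context

class MiddleEvent(Event):
    pass

class ContextFlow(Workflow):
    @step
    async def first_step(self, ctx: Context, ev: StartEvent) -> MiddleEvent:
        await ctx.set("user_query", ev.query)  # store a value in the shared context
        return MiddleEvent()

    @step
    async def second_step(self, ctx: Context, ev: MiddleEvent) -> StopEvent:
        user_query = await ctx.get("user_query")  # read it back in a later step
        return StopEvent(result=user_query)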
In Faust, the simplest way is to pass the context around in a record:
class Context(faust.Record):
    user_id: str
    query: str
    ...

class RecordWithContext(faust.Record):
    ...
    context: Context

@app.agent(input_channel)  # input_channel is a placeholder here
async def process_stream(stream):
    async for record in stream:
        ...
        # Pass the context to the next agent
        await next_stream.send(value=RecordWithContext(..., context=record.context))
What about persisting state? We can use Faust’s tables for that. Tables are distributed key-value stores that can be used as regular Python dictionaries.
class RecordWithContext(faust.Record):
    ...
    context_id: str

context_table = app.Table('context', default=dict)

@app.agent(input_channel)  # input_channel is a placeholder here
async def process_stream(stream):
    async for record in stream:
        # Indexing the table returns the default (an empty dict) for missing keys
        context = context_table[record.context_id]
        ...
        context["new_key"] = "new_value"
        context_table[record.context_id] = context
        await next_stream.send(value=record)
Refer to the Faust user guide to learn about these and other more advanced Faust capabilities, like timers, web views, statistics, etc.
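For instance, a periodic task is just a decorated coroutine. A minimal sketch (the function name, interval, and body are placeholders):

@app.timer(interval=60.0)  # Faust calls this coroutine every 60 seconds while the app runs
async def cleanup_stale_contexts():
    ...  # e.g. evict old entries from context_table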
Waiting for multiple events
LlamaIndex Context provides a handy collect_events method to wait for multiple events. We can replicate the same behavior with Faust by using a table to store the incoming records and then checking whether all the required ones have arrived. This gives you even more flexibility in how you join the events and trigger the next step.
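For reference, this is roughly how collect_events is used in LlamaIndex Workflows (a sketch; EventA and EventB are placeholder event types):

@step
async def join_step(self, ctx: Context, ev: EventA | EventB) -> StopEvent | None:
    # Buffers events in the context; returns None until one of each expected type has arrived
    events = ctx.collect_events(ev, [EventA, EventB])
    if events is None:
        return None
    event_a, event_b = events
    ...  # do something with both events
    return StopEvent(result="...")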
Here’s a simple example of the Faust equivalent:
# Hypothetical record types for this example; `id` serves as the join key
class AppleRecord(faust.Record):
    id: str  # ... plus whatever other fields you need

class OrangeRecord(faust.Record):
    id: str

# The channels for the records we're waiting for
apples_channel = app.channel(value_type=AppleRecord)
oranges_channel = app.channel(value_type=OrangeRecord)

# Let's create tables to buffer the records we've received
apples_buffer = app.Table('apples_buffer', default=lambda: None)
oranges_buffer = app.Table('oranges_buffer', default=lambda: None)


async def try_process_fruits(context_id):
    apples = apples_buffer.get(context_id)
    oranges = oranges_buffer.get(context_id)
    if apples is not None and oranges is not None:
        # Do something with apples and oranges
        ...
        del apples_buffer[context_id]
        del oranges_buffer[context_id]


@app.agent(apples_channel)
async def process_first_event(stream):
    async for record in stream:
        apples_buffer[record.id] = record
        await try_process_fruits(record.id)


@app.agent(oranges_channel)
async def process_second_event(stream):
    async for record in stream:
        oranges_buffer[record.id] = record
        await try_process_fruits(record.id)
Constructing simple workflows in a concise manner using FaustWorkflow
We’ve created a wrapper around Faust that makes it easy to construct simple workflows in a LlamaIndex-Workflows-like manner, powered by event classes and type annotations. It also allows you to visualize the workflow.
This is just a proof of concept for now, done mostly for fun and as an illustration. If you like the idea and have suggestions for improvement, please let us know!
Here’s how you can implement the joke workflow from the previous section using FaustWorkflow:
[4]:
from motleycrew.applications.faust_workflow import FaustWorkflow, Event, step


class StartEvent(Event):
    topic: str

class JokeEvent(Event):
    joke: str

class StopEvent(Event):
    result: str


class JokeFlow(FaustWorkflow):
    result_event_type = StopEvent

    llm = OpenAI()

    @step
    async def generate_joke(self, ev: StartEvent) -> JokeEvent:
        prompt = f"Write your best joke about {ev.topic}."
        response = await self.llm.acomplete(prompt)
        return JokeEvent(joke=str(response))

    @step
    async def critique_joke(self, ev: JokeEvent) -> StopEvent:
        prompt = f"Give a thorough analysis and critique of the following joke: {ev.joke}"
        response = await self.llm.acomplete(prompt)
        return StopEvent(result=str(response))
[5]:
app = faust.App("faust-joke-workflow-demo", broker="kafka://localhost:9092", web_enabled=False)
joke_workflow = JokeFlow(app=app, timeout=120)
result = await joke_workflow.run(StartEvent(topic="pirates"))
print(result.result)
Analysis:
This joke plays on the pun of "fish and ships" sounding like "fish and chips," a popular dish at seafood restaurants. The joke also incorporates the pirate theme by mentioning a pirate going to a seafood restaurant, which adds an element of humor.
Critique:
Overall, this joke is light-hearted and easy to understand, making it suitable for a wide audience. The use of wordplay is clever and adds a fun twist to the punchline. However, the joke may be considered somewhat predictable as the setup leads directly to the pun, leaving little room for surprise or unexpected humor. Additionally, the joke relies heavily on the pun itself, which may not appeal to everyone's sense of humor. Overall, while this joke is amusing and well-crafted, it may not be particularly memorable or original.
[6]:
from motleycrew.applications.faust_workflow import draw_faust_workflow
draw_faust_workflow(joke_workflow)
faust_workflow.html