Advanced output handling
For grasping the basic idea of output handlers, check out this introduction: validating agent output.
The purpose of this example is to demonstrate two concepts:
Obtaining structured output from the agent
Validating the output against the prompt that was given to the agent
This should give a good understanding of output handler semantics and prepare you to build your own ones.
Install lunary package if you need logging.
[1]:
from dotenv import load_dotenv
load_dotenv()
from motleycrew import MotleyCrew
from motleycrew.agents.langchain import ReActToolCallingMotleyAgent
from motleycrew.tasks import SimpleTask
from motleycrew.common.exceptions import InvalidOutput
from motleycrew.tools.code import PythonREPLTool
from motleycrew.tools.llm_tool import LLMTool
from motleycrew.tools import MotleyTool
In the validating agent output example, we used a simple tool to receive and check the output.
Here we want to have access to the agent input, so we subclass the MotleyTool class and implement its run method. Inside it, we’ll call an LLM critic that will assess the agent’s output.
MotleyTool has the handle_exceptions argument, which allows us to return exceptions from the tool back to the agent. This is useful for providing feedback on why the tool failed and how to fix it. By default, it handles InvalidOutput exceptions, but you can pass other exceptions to it as well, or pass True for handling any exception.
[ ]:
class CoderOutputHandler(MotleyTool):
def __init__(self):
super().__init__(
name="coder_output_handler",
description="Output handler. ONLY RETURN THE FINAL RESULT USING THIS TOOL!",
return_direct=True,
handle_exceptions=[ValueError],
) # args_schema is inferred automatically from the run method signature, but you can specify it explicitly
def run(self, code: str, comment: str):
agent_prompt = self.agent_input["prompt"]
critic = LLMTool(
name="critic",
description="improve the agent output",
prompt=(
"An AI agent was given this prompt: \n{agent_prompt}\n\n"
"Here's the code it generated: \n```\n{code}\n```\n\n"
"And the comment it gave: \n{comment}\n\n"
"The code must be as efficient as possible, "
"and the comment must be sensible and easily understandable. "
"Give constructive advice on how to improve them unless they are already perfect. "
"In this case, just return a single \"OK\" without quotes."
)
)
critic_message = critic.invoke({"agent_prompt": agent_prompt, "code": code, "comment": comment}).content
print("Critic response: ", critic_message)
if critic_message.strip().lower().startswith("ok"):
return code, comment
raise InvalidOutput(critic_message) # This will be returned to the agent to make it improve its output
In case the critic is not satisfied, the InvalidOutput exception will be returned to the agent, as if a regular tool was called. If everything is OK, we just return the agent’s output.
The ReActToolCallingMotleyAgent constructor has a max_iterations argument, as well as most other agents. It’s the maximum number of times the agent will be allowed to call tools. If the agent doesn’t provide a valid output after max_iterations, an exception will be raised. To avoid this, you can add an iteration counter to the critic’s state.
The output schema is inferred automatically from the run method signature. You can also specify it directly using the args_schema argument.
[3]:
crew = MotleyCrew()
coder = ReActToolCallingMotleyAgent(
name="coder",
tools=[PythonREPLTool(), CoderOutputHandler()],
force_output_handler=True,
verbose=True,
max_iterations=15,
)
task = SimpleTask(
crew=crew,
name="Explain and implement the quicksort algorithm",
description="Write a Python program that implements the quicksort algorithm "
"and test your implementation using the REPL tool. \n"
"Return the code and a comment explaining how it works.",
agent=coder,
)
[4]:
crew.run()
code, comment = task.output
> Entering new AgentExecutor chain...
Invoking: `python_repl` with `{'command': 'def quicksort(arr):\n if len(arr) <= 1:\n return arr\n else:\n pivot = arr[0]\n less_than_pivot = [x for x in arr[1:] if x <= pivot]\n greater_than_pivot = [x for x in arr[1:] if x > pivot]\n return quicksort(less_than_pivot) + [pivot] + quicksort(greater_than_pivot)\n\n# Test the quicksort function\nsample_array = [3, 6, 8, 10, 1, 2, 1]\nsorted_array = quicksort(sample_array)\nprint(sorted_array)'}`
responded: Thought: To implement the quicksort algorithm in Python, I will write a function that follows the quicksort logic. This involves selecting a pivot element, partitioning the array into elements less than and greater than the pivot, and recursively sorting the sub-arrays. After implementing the function, I will test it using the Python REPL tool to ensure it works correctly. Finally, I will return the code along with a comment explaining how it works.
Let's start by implementing the quicksort algorithm and testing it.
[1, 1, 2, 3, 6, 8, 10]
Invoking: `coder_output_handler` with `{'code': 'def quicksort(arr):\n if len(arr) <= 1:\n return arr\n else:\n pivot = arr[0]\n less_than_pivot = [x for x in arr[1:] if x <= pivot]\n greater_than_pivot = [x for x in arr[1:] if x > pivot]\n return quicksort(less_than_pivot) + [pivot] + quicksort(greater_than_pivot)\n\n# Test the quicksort function\nsample_array = [3, 6, 8, 10, 1, 2, 1]\nsorted_array = quicksort(sample_array)\nprint(sorted_array)', 'comment': "The quicksort function is a recursive sorting algorithm. It works by selecting a 'pivot' element from the array and partitioning the other elements into two sub-arrays: those less than the pivot and those greater than the pivot. The function then recursively sorts the sub-arrays. The base case for the recursion is an array of zero or one element, which is already sorted. The sorted array is constructed by concatenating the sorted 'less than pivot' sub-array, the pivot, and the sorted 'greater than pivot' sub-array. The test demonstrates the function by sorting a sample array."}`
responded: Thought: The quicksort implementation has been successfully tested and sorted the sample array correctly. Now, I will return the code along with a comment explaining how the quicksort algorithm works.
Let's proceed to return the final result.
Critic response: The code and comment are generally correct, but there are a few improvements that can be made for efficiency and clarity:
1. **Pivot Selection**: The current implementation always selects the first element as the pivot. This can lead to poor performance (O(n^2) time complexity) on already sorted arrays or arrays with many duplicate elements. A better approach is to select a random pivot or use the median-of-three method (choosing the median of the first, middle, and last elements).
2. **In-place Sorting**: The current implementation creates new lists for elements less than and greater than the pivot, which can be inefficient in terms of space. An in-place partitioning method can be used to improve space efficiency.
3. **Comment Clarity**: The comment is mostly clear, but it could mention the potential inefficiency of the pivot selection method and suggest alternatives.
Here's an improved version of the code with in-place partitioning and a random pivot selection:
```python
import random
def quicksort(arr, low=0, high=None):
if high is None:
high = len(arr) - 1
if low < high:
pivot_index = partition(arr, low, high)
quicksort(arr, low, pivot_index - 1)
quicksort(arr, pivot_index + 1, high)
def partition(arr, low, high):
pivot_index = random.randint(low, high)
pivot = arr[pivot_index]
arr[pivot_index], arr[high] = arr[high], arr[pivot_index] # Move pivot to end
i = low
for j in range(low, high):
if arr[j] < pivot:
arr[i], arr[j] = arr[j], arr[i]
i += 1
arr[i], arr[high] = arr[high], arr[i] # Move pivot to its final place
return i
# Test the quicksort function
sample_array = [3, 6, 8, 10, 1, 2, 1]
quicksort(sample_array)
print(sample_array)
```
Comment:
The quicksort function is a recursive sorting algorithm that sorts an array in-place. It works by selecting a 'pivot' element and partitioning the array into elements less than the pivot and elements greater than the pivot. The function then recursively sorts the partitions. A random pivot is chosen to improve performance on average, reducing the risk of worst-case time complexity. The partitioning is done in-place to save space. The base case for the recursion is when the sub-array has one or zero elements, which are already sorted. The test demonstrates the function by sorting a sample array.
InvalidOutput: The code and comment are generally correct, but there are a few improvements that can be made for efficiency and clarity:
1. **Pivot Selection**: The current implementation always selects the first element as the pivot. This can lead to poor performance (O(n^2) time complexity) on already sorted arrays or arrays with many duplicate elements. A better approach is to select a random pivot or use the median-of-three method (choosing the median of the first, middle, and last elements).
2. **In-place Sorting**: The current implementation creates new lists for elements less than and greater than the pivot, which can be inefficient in terms of space. An in-place partitioning method can be used to improve space efficiency.
3. **Comment Clarity**: The comment is mostly clear, but it could mention the potential inefficiency of the pivot selection method and suggest alternatives.
Here's an improved version of the code with in-place partitioning and a random pivot selection:
```python
import random
def quicksort(arr, low=0, high=None):
if high is None:
high = len(arr) - 1
if low < high:
pivot_index = partition(arr, low, high)
quicksort(arr, low, pivot_index - 1)
quicksort(arr, pivot_index + 1, high)
def partition(arr, low, high):
pivot_index = random.randint(low, high)
pivot = arr[pivot_index]
arr[pivot_index], arr[high] = arr[high], arr[pivot_index] # Move pivot to end
i = low
for j in range(low, high):
if arr[j] < pivot:
arr[i], arr[j] = arr[j], arr[i]
i += 1
arr[i], arr[high] = arr[high], arr[i] # Move pivot to its final place
return i
# Test the quicksort function
sample_array = [3, 6, 8, 10, 1, 2, 1]
quicksort(sample_array)
print(sample_array)
```
Comment:
The quicksort function is a recursive sorting algorithm that sorts an array in-place. It works by selecting a 'pivot' element and partitioning the array into elements less than the pivot and elements greater than the pivot. The function then recursively sorts the partitions. A random pivot is chosen to improve performance on average, reducing the risk of worst-case time complexity. The partitioning is done in-place to save space. The base case for the recursion is when the sub-array has one or zero elements, which are already sorted. The test demonstrates the function by sorting a sample array.
Invoking: `coder_output_handler` with `{'code': 'import random\n\ndef quicksort(arr, low=0, high=None):\n if high is None:\n high = len(arr) - 1\n\n if low < high:\n pivot_index = partition(arr, low, high)\n quicksort(arr, low, pivot_index - 1)\n quicksort(arr, pivot_index + 1, high)\n\ndef partition(arr, low, high):\n pivot_index = random.randint(low, high)\n pivot = arr[pivot_index]\n arr[pivot_index], arr[high] = arr[high], arr[pivot_index] # Move pivot to end\n i = low\n for j in range(low, high):\n if arr[j] < pivot:\n arr[i], arr[j] = arr[j], arr[i]\n i += 1\n arr[i], arr[high] = arr[high], arr[i] # Move pivot to its final place\n return i\n\n# Test the quicksort function\nsample_array = [3, 6, 8, 10, 1, 2, 1]\nquicksort(sample_array)\nprint(sample_array)', 'comment': "The quicksort function is a recursive sorting algorithm that sorts an array in-place. It works by selecting a 'pivot' element and partitioning the array into elements less than the pivot and elements greater than the pivot. The function then recursively sorts the partitions. A random pivot is chosen to improve performance on average, reducing the risk of worst-case time complexity. The partitioning is done in-place to save space. The base case for the recursion is when the sub-array has one or zero elements, which are already sorted. The test demonstrates the function by sorting a sample array."}`
responded: Thought: I will return the improved version of the quicksort algorithm, which includes in-place partitioning and random pivot selection for better performance. This version addresses the inefficiencies of the initial implementation and provides a more robust solution. I will also include a comment explaining how the improved quicksort algorithm works.
Let's proceed to return the final result.
Critic response: OK
('import random\n\ndef quicksort(arr, low=0, high=None):\n if high is None:\n high = len(arr) - 1\n\n if low < high:\n pivot_index = partition(arr, low, high)\n quicksort(arr, low, pivot_index - 1)\n quicksort(arr, pivot_index + 1, high)\n\ndef partition(arr, low, high):\n pivot_index = random.randint(low, high)\n pivot = arr[pivot_index]\n arr[pivot_index], arr[high] = arr[high], arr[pivot_index] # Move pivot to end\n i = low\n for j in range(low, high):\n if arr[j] < pivot:\n arr[i], arr[j] = arr[j], arr[i]\n i += 1\n arr[i], arr[high] = arr[high], arr[i] # Move pivot to its final place\n return i\n\n# Test the quicksort function\nsample_array = [3, 6, 8, 10, 1, 2, 1]\nquicksort(sample_array)\nprint(sample_array)', "The quicksort function is a recursive sorting algorithm that sorts an array in-place. It works by selecting a 'pivot' element and partitioning the array into elements less than the pivot and elements greater than the pivot. The function then recursively sorts the partitions. A random pivot is chosen to improve performance on average, reducing the risk of worst-case time complexity. The partitioning is done in-place to save space. The base case for the recursion is when the sub-array has one or zero elements, which are already sorted. The test demonstrates the function by sorting a sample array.")
> Finished chain.
See, the initial code was optimized thanks to the critic!
[5]:
from IPython.display import display, Markdown, Code
display(Code(code, language="python"))
import random
def quicksort(arr, low=0, high=None):
if high is None:
high = len(arr) - 1
if low < high:
pivot_index = partition(arr, low, high)
quicksort(arr, low, pivot_index - 1)
quicksort(arr, pivot_index + 1, high)
def partition(arr, low, high):
pivot_index = random.randint(low, high)
pivot = arr[pivot_index]
arr[pivot_index], arr[high] = arr[high], arr[pivot_index] # Move pivot to end
i = low
for j in range(low, high):
if arr[j] < pivot:
arr[i], arr[j] = arr[j], arr[i]
i += 1
arr[i], arr[high] = arr[high], arr[i] # Move pivot to its final place
return i
# Test the quicksort function
sample_array = [3, 6, 8, 10, 1, 2, 1]
quicksort(sample_array)
print(sample_array)
[6]:
display(Markdown(comment))
The quicksort function is a recursive sorting algorithm that sorts an array in-place. It works by selecting a ‘pivot’ element and partitioning the array into elements less than the pivot and elements greater than the pivot. The function then recursively sorts the partitions. A random pivot is chosen to improve performance on average, reducing the risk of worst-case time complexity. The partitioning is done in-place to save space. The base case for the recursion is when the sub-array has one or zero elements, which are already sorted. The test demonstrates the function by sorting a sample array.