motleycrew.tasks
Tasks and task units.
Modules
- class motleycrew.tasks.Task(name: str, task_unit_class: Type[TaskUnitType], crew: MotleyCrew | None = None, allow_async_units: bool = False)
Bases: ABC, Generic[TaskUnitType]
Base class for describing tasks.
This class is abstract and must be subclassed to implement the task logic.
- NODE_CLASS
Class for representing task nodes. Can be overridden.
- Type:
Type[motleycrew.tasks.task.TaskNodeType]
- TASK_IS_UPSTREAM_LABEL
Label for indicating upstream tasks. Can be overridden.
- TASK_IS_UPSTREAM_LABEL = 'task_is_upstream'
- __init__(name: str, task_unit_class: Type[TaskUnitType], crew: MotleyCrew | None = None, allow_async_units: bool = False)
Initialize the task.
- Parameters:
name – Name of the task.
task_unit_class – Class for representing task units.
crew – Crew to which the task belongs. If not provided, the task should be registered with a crew later.
allow_async_units – Whether the task allows asynchronous units. Default is False. If True, the task may be queried for the next unit even if it has other units in progress.
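The effect of allow_async_units can be summarized in a short rule. The sketch below is illustrative only: may_query is a hypothetical helper, not part of motleycrew, and it only restates the behavior described for the parameter above.

```python
def may_query(units_in_progress: int, allow_async_units: bool) -> bool:
    """Hypothetical helper: should the task be asked for its next unit?"""
    # A task that already has units in progress is queried again only
    # if it allows asynchronous units.
    return units_in_progress == 0 or allow_async_units


print(may_query(units_in_progress=1, allow_async_units=False))
print(may_query(units_in_progress=1, allow_async_units=True))
```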
- prepare_graph_store()
Prepare the graph store for storing tasks and their units.
- property graph_store: MotleyGraphStore
The graph store where the task is stored.
This is an alias for the graph store of the crew that the task belongs to.
- set_upstream(task: Task) → Task
Set a task as an upstream task for the current task.
This means that the current task will not be queried for task units until the upstream task is marked as done.
- Parameters:
task – Upstream task.
- get_units(status: str | None = None) → List[TaskUnitType]
Get the units of the task that are already inserted in the graph.
This method should be used for fetching the existing task units.
- Parameters:
status – Status of the task units to filter by.
- Returns:
List of task units.
- get_upstream_tasks() → List[Task]
Get the upstream tasks of the current task.
- Returns:
List of upstream tasks.
- get_downstream_tasks() → List[Task]
Get the downstream tasks of the current task.
- Returns:
List of downstream tasks.
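The upstream and downstream relationships can be illustrated with a small stand-alone model. This is a sketch, not motleycrew's implementation: ToyTask is a hypothetical stand-in that keeps dependencies in plain Python lists instead of a graph store, ready is an invented helper expressing the rule that a task is not queried until its upstream tasks are done, and returning the current task from set_upstream is an assumption about what the documented Task return value refers to.

```python
from __future__ import annotations


class ToyTask:
    """Hypothetical stand-in for a task; keeps dependencies in memory."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.done = False
        self._upstream: list[ToyTask] = []
        self._downstream: list[ToyTask] = []

    def set_upstream(self, task: ToyTask) -> ToyTask:
        # Record the dependency in both directions. Returning self is an
        # assumption made here so that calls can be chained.
        self._upstream.append(task)
        task._downstream.append(self)
        return self

    def get_upstream_tasks(self) -> list[ToyTask]:
        return list(self._upstream)

    def get_downstream_tasks(self) -> list[ToyTask]:
        return list(self._downstream)

    def set_done(self, value: bool = True) -> None:
        self.done = value

    def ready(self) -> bool:
        # Invented helper: a task may be queried for units only once
        # every upstream task has been marked as done.
        return all(t.done for t in self._upstream)


research = ToyTask("research")
write = ToyTask("write").set_upstream(research)

ready_before = write.ready()  # upstream task not done yet
research.set_done()
ready_after = write.ready()   # upstream task is now done
print(ready_before, ready_after)
```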
- set_done(value: bool = True)
Set the done status of the task.
- Parameters:
value – Value to set the done status to.
- on_unit_dispatch(unit: TaskUnitType) → None
Method that is called by the crew when a unit of the task is dispatched.
Should be implemented by the subclass if needed.
- Parameters:
unit – Task unit that is dispatched.
- on_unit_completion(unit: TaskUnitType) → None
Method that is called by the crew when a unit of the task is completed.
Should be implemented by the subclass if needed.
- Parameters:
unit – Task unit that is completed.
- abstract get_next_unit() → TaskUnitType | None
Get the next unit of the task to run. Must be implemented by the subclass.
This method is called in the crew’s main loop repeatedly while the task is not done and there are units in progress.
Note that returning a unit does not guarantee that it will be dispatched. Because of this, any state changes are strongly discouraged in this method. If you need to perform some actions when the unit is dispatched or completed, you should implement the on_unit_dispatch and/or on_unit_completion methods.
If you need to find which units already exist in order to generate the next one, you can use the get_units method.
- Returns:
Next unit to run, or None if there are no units to run at the moment.
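The division of labor between get_next_unit and the dispatch/completion hooks can be sketched with a stand-alone model. Everything below is hypothetical: ToyUnit and CountingTask do not subclass motleycrew classes, units are held in a plain list rather than in the graph store, the status strings other than the documented default 'pending' are assumptions, and the loop at the end is only a toy stand-in for the crew's main loop.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ToyUnit:
    """Hypothetical task unit holding the input for one piece of work."""
    index: int
    status: str = "pending"


class CountingTask:
    """Hypothetical task that produces `total` units, one at a time."""

    def __init__(self, total: int) -> None:
        self.total = total
        self.units: list[ToyUnit] = []  # stands in for units stored in the graph
        self.done = False

    def get_units(self, status: str | None = None) -> list[ToyUnit]:
        return [u for u in self.units if status is None or u.status == status]

    def get_next_unit(self) -> ToyUnit | None:
        # Read-only: derive the next unit from existing units, change nothing.
        existing = len(self.get_units())
        return ToyUnit(index=existing) if existing < self.total else None

    def on_unit_dispatch(self, unit: ToyUnit) -> None:
        # State changes belong here, because the unit really was dispatched.
        unit.status = "running"
        self.units.append(unit)

    def on_unit_completion(self, unit: ToyUnit) -> None:
        unit.status = "done"
        self.done = len(self.get_units("done")) == self.total


task = CountingTask(total=2)

# Asking twice without dispatching yields the same proposal.
first, second = task.get_next_unit(), task.get_next_unit()
print(first == second)

# Toy stand-in for the crew's loop: propose, dispatch, complete.
while not task.done:
    unit = task.get_next_unit()
    if unit is None:
        break
    task.on_unit_dispatch(unit)
    task.on_unit_completion(unit)

print([u.index for u in task.units], task.done)
```

Because get_next_unit only reads existing state, a proposed unit that is never dispatched leaves no trace, which is the property the note above asks for.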
- abstract get_worker(tools: List[MotleyTool] | None) → Runnable
Get the worker that will run the task units.
This method is called by the crew when a unit of the task is dispatched. The unit will be converted to a dictionary and passed to the worker’s invoke method.
Typically, the worker is an agent, but it can be any object that implements the LangChain Runnable interface.
- Parameters:
tools – Tools to be used by the worker.
- Returns:
Worker that will run the task units.
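The hand-off from unit to worker can be sketched as follows. This is a minimal illustration under stated assumptions: ToyUnit, EchoWorker and dispatch are hypothetical names, and EchoWorker only mimics the invoke method of a Runnable rather than implementing the LangChain interface.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class ToyUnit:
    """Hypothetical task unit: carries all the input the worker needs."""
    prompt: str

    def as_dict(self) -> dict[str, Any]:
        return asdict(self)


class EchoWorker:
    """Hypothetical worker exposing an invoke method, like a Runnable."""

    def invoke(self, input: dict[str, Any]) -> str:
        return f"handled: {input['prompt']}"


def dispatch(unit: ToyUnit, worker: EchoWorker) -> str:
    # The unit is converted to a dictionary and passed to invoke().
    return worker.invoke(unit.as_dict())


result = dispatch(ToyUnit(prompt="Summarize the report"), EchoWorker())
print(result)
```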
- class motleycrew.tasks.SimpleTask(crew: MotleyCrew, description: str, name: str | None = None, agent: MotleyAgentAbstractParent | None = None, tools: Sequence[MotleyTool] | None = None, additional_params: dict[str, Any] | None = None, prompt_template_with_upstreams: PromptTemplate = PromptTemplate(input_variables=['description', 'upstream_results'], template='{description}\n\nYou must use the results of these upstream tasks:\n\n{upstream_results}\n'))
Bases: Task
Simple task class.
A simple task consists of a description and an agent that can execute the task. It produces a single task unit with a prompt based on the description and the results of upstream tasks.
The task is considered done when the task unit is completed.
- __init__(crew: MotleyCrew, description: str, name: str | None = None, agent: MotleyAgentAbstractParent | None = None, tools: Sequence[MotleyTool] | None = None, additional_params: dict[str, Any] | None = None, prompt_template_with_upstreams: PromptTemplate = PromptTemplate(input_variables=['description', 'upstream_results'], template='{description}\n\nYou must use the results of these upstream tasks:\n\n{upstream_results}\n'))
Initialize the simple task.
- Parameters:
crew – Crew to which the task belongs.
description – Description of the task.
name – Name of the task (will be used as the name of the task unit).
agent – Agent to execute the task.
tools – Tools to use for the task.
additional_params – Additional parameters for the task.
prompt_template_with_upstreams – Prompt template to use for generating the prompt if the task has upstream dependencies. Otherwise, just the description is used. The template must have input variables ‘description’ and ‘upstream_results’.
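How the default template combines the description with upstream results can be shown with plain string formatting. This is a sketch under assumptions: build_prompt is a hypothetical helper, str.format stands in for the PromptTemplate object, and joining upstream outputs with blank lines is an assumption, not documented behavior. Only the template text itself is taken from the signature above.

```python
from __future__ import annotations

# Default template text, copied from the signature above.
TEMPLATE = (
    "{description}\n\n"
    "You must use the results of these upstream tasks:\n\n"
    "{upstream_results}\n"
)


def build_prompt(description: str, upstream_outputs: list[str]) -> str:
    """Hypothetical helper mirroring the documented prompt rule."""
    # Without upstream dependencies, just the description is used.
    if not upstream_outputs:
        return description
    # Assumption: upstream outputs are joined with blank lines.
    upstream_results = "\n\n".join(upstream_outputs)
    return TEMPLATE.format(
        description=description, upstream_results=upstream_results
    )


print(build_prompt("Write a summary.", []))
print(build_prompt("Write a summary.", ["Finding A", "Finding B"]))
```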
- on_unit_completion(unit: SimpleTaskUnit) → None
Handle completion of the task unit.
Sets the task as done and stores the output of the task unit.
- Parameters:
unit – Task unit that has completed.
- get_next_unit() → SimpleTaskUnit | None
Get the next task unit to run.
If all upstream tasks are done, returns a task unit with the prompt based on the description and the results of the upstream tasks. Otherwise, returns None (the task is not ready to run yet).
- Returns:
Task unit to run if the task is ready, None otherwise.
- get_worker(tools: List[MotleyTool] | None) → MotleyAgentAbstractParent
Get the worker for the task.
If the task is associated with a crew and an agent, returns the agent. Otherwise, raises an exception.
- Parameters:
tools – Additional tools to add to the agent.
- Returns:
Agent to run the task unit.
- class motleycrew.tasks.TaskUnit(*, status: str = 'pending', output: Any | None = None)
Bases: MotleyGraphNode, ABC
Base class for describing task units. A task unit should contain all the input data for the worker (usually an agent). When a task unit is dispatched by the crew, it is converted to a dictionary and passed to the worker’s invoke() method.
- status
Status of the task unit.
- Type:
str
- output
Output of the task unit.
- Type:
Optional[Any]
- status: str
- output: Any | None
- property pending
Whether the task unit is pending.
- property running
Whether the task unit is running.
- property done
Whether the task unit is done.
- set_pending()
Set the task unit status to pending.
- set_running()
Set the task unit status to running.
- set_done()
Set the task unit status to done.
- as_dict()
Represent the task unit as a dictionary for passing to the invoke() methods of runnables.
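The status lifecycle of a task unit can be modeled in a few lines. ToyTaskUnit is a hypothetical stand-in built on a dataclass, not the Pydantic-based TaskUnit. The default status 'pending' comes from the signature above, while the 'running' and 'done' strings are assumptions made for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class ToyTaskUnit:
    """Hypothetical stand-in for a task unit with a status and an output."""
    prompt: str
    status: str = "pending"  # documented default
    output: Any | None = None

    @property
    def pending(self) -> bool:
        return self.status == "pending"

    @property
    def running(self) -> bool:
        return self.status == "running"  # assumed status string

    @property
    def done(self) -> bool:
        return self.status == "done"  # assumed status string

    def set_running(self) -> None:
        self.status = "running"

    def set_done(self) -> None:
        self.status = "done"


unit = ToyTaskUnit(prompt="Translate the text")
started_pending = unit.pending

unit.set_running()
was_running = unit.running

unit.output = "Translated text"
unit.set_done()
print(started_pending, was_running, unit.done, unit.output)
```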
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model. Should be a dictionary conforming to pydantic.config.ConfigDict.
- model_fields: ClassVar[Dict[str, FieldInfo]] = {'output': FieldInfo(annotation=Union[Any, NoneType], required=False, default=None), 'status': FieldInfo(annotation=str, required=False, default='pending')}
Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.
This replaces Model.__fields__ from Pydantic V1.