motleycrew.tasks.task
Classes
|
Base class for describing tasks. |
|
Node representing a task in the graph. |
- class motleycrew.tasks.task.TaskNode(*, name: str, done: bool = False)
Bases:
MotleyGraphNodeNode representing a task in the graph.
- name
Name of the task.
- Type:
str
- done
Whether the task is done.
- Type:
bool
- name: str
- done: bool
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class motleycrew.tasks.task.Task(name: str, task_unit_class: Type[TaskUnitType], crew: MotleyCrew | None = None, allow_async_units: bool = False)
Bases:
ABC,Generic[TaskUnitType]Base class for describing tasks.
This class is abstract and must be subclassed to implement the task logic.
- NODE_CLASS
Class for representing task nodes, can be overridden.
- Type:
Type[motleycrew.tasks.task.TaskNodeType]
- TASK_IS_UPSTREAM_LABEL
Label for indicating upstream tasks, can be overridden.
- TASK_IS_UPSTREAM_LABEL = 'task_is_upstream'
- __init__(name: str, task_unit_class: Type[TaskUnitType], crew: MotleyCrew | None = None, allow_async_units: bool = False)
Initialize the task.
- Parameters:
name – Name of the task.
task_unit_class – Class for representing task units.
crew – Crew to which the task belongs. If not provided, the task should be registered with a crew later.
allow_async_units – Whether the task allows asynchronous units. Default is False. If True, the task may be queried for the next unit even if it has other units in progress.
- prepare_graph_store()
Prepare the graph store for storing tasks and their units.
- property graph_store: MotleyGraphStore
The graph store where the task is stored.
This is an alias for the graph store of the crew that the task belongs to.
- set_upstream(task: Task) Task
Set a task as an upstream task for the current task.
This means that the current task will not be queried for task units until the upstream task is marked as done.
- Parameters:
task – Upstream task.
- get_units(status: str | None = None) List[TaskUnitType]
Get the units of the task that are already inserted in the graph.
This method should be used for fetching the existing task units.
- Parameters:
status – Status of the task units to filter by.
- Returns:
List of task units.
- get_upstream_tasks() List[Task]
Get the upstream tasks of the current task.
- Returns:
List of upstream tasks.
- get_downstream_tasks() List[Task]
Get the downstream tasks of the current task.
- Returns:
List of downstream tasks.
- set_done(value: bool = True)
Set the done status of the task.
- Parameters:
value – Value to set the done status to.
- on_unit_dispatch(unit: TaskUnitType) None
Method that is called by the crew when a unit of the task is dispatched.
Should be implemented by the subclass if needed.
- Parameters:
unit – Task unit that is dispatched.
- on_unit_completion(unit: TaskUnitType) None
Method that is called by the crew when a unit of the task is completed.
Should be implemented by the subclass if needed.
- Parameters:
unit – Task unit that is completed.
- abstract get_next_unit() TaskUnitType | None
Get the next unit of the task to run. Must be implemented by the subclass.
This method is called in the crew’s main loop repeatedly while the task is not done and there are units in progress.
Note that returning a unit does not guarantee that it will be dispatched. Because of this, any state changes are strongly discouraged in this method. If you need to perform some actions when the unit is dispatched or completed, you should implement the
on_unit_dispatchand/oron_unit_completionmethods.If you need to find which units already exist in order to generate the next one, you can use the
get_unitsmethod.- Returns:
Next unit to run, or None if there are no units to run at the moment.
- abstract get_worker(tools: List[MotleyTool] | None) Runnable
Get the worker that will run the task units.
This method is called by the crew when a unit of the task is dispatched. The unit will be converted to a dictionary and passed to the worker’s
invokemethod.Typically, the worker is an agent, but it can be any object that implements the Langchain Runnable interface.
- Parameters:
tools – Tools to be used by the worker.
- Returns:
Worker that will run the task units.