motleycrew.tasks.task

Classes

Task(name, task_unit_class[, crew, ...])

Base class for describing tasks.

TaskNode(*, name[, done])

Node representing a task in the graph.

class motleycrew.tasks.task.TaskNode(*, name: str, done: bool = False)

Bases: MotleyGraphNode

Node representing a task in the graph.

name

Name of the task.

Type:

str

done

Whether the task is done.

Type:

bool

name: str
done: bool
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'done': FieldInfo(annotation=bool, required=False, default=False), 'name': FieldInfo(annotation=str, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

class motleycrew.tasks.task.Task(name: str, task_unit_class: Type[TaskUnitType], crew: MotleyCrew | None = None, allow_async_units: bool = False)

Bases: ABC, Generic[TaskUnitType]

Base class for describing tasks.

This class is abstract and must be subclassed to implement the task logic.

NODE_CLASS

Class for representing task nodes, can be overridden.

Type:

Type[motleycrew.tasks.task.TaskNodeType]

TASK_IS_UPSTREAM_LABEL

Label for indicating upstream tasks, can be overridden.

NODE_CLASS

alias of TaskNode

TASK_IS_UPSTREAM_LABEL = 'task_is_upstream'
__init__(name: str, task_unit_class: Type[TaskUnitType], crew: MotleyCrew | None = None, allow_async_units: bool = False)

Initialize the task.

Parameters:
  • name – Name of the task.

  • task_unit_class – Class for representing task units.

  • crew – Crew to which the task belongs. If not provided, the task should be registered with a crew later.

  • allow_async_units – Whether the task allows asynchronous units. Default is False. If True, the task may be queried for the next unit even if it has other units in progress.

prepare_graph_store()

Prepare the graph store for storing tasks and their units.

property graph_store: MotleyGraphStore

The graph store where the task is stored.

This is an alias for the graph store of the crew that the task belongs to.

set_upstream(task: Task) Task

Set a task as an upstream task for the current task.

This means that the current task will not be queried for task units until the upstream task is marked as done.

Parameters:

task – Upstream task.

get_units(status: str | None = None) List[TaskUnitType]

Get the units of the task that are already inserted in the graph.

This method should be used for fetching the existing task units.

Parameters:

status – Status of the task units to filter by.

Returns:

List of task units.

get_upstream_tasks() List[Task]

Get the upstream tasks of the current task.

Returns:

List of upstream tasks.

get_downstream_tasks() List[Task]

Get the downstream tasks of the current task.

Returns:

List of downstream tasks.

set_done(value: bool = True)

Set the done status of the task.

Parameters:

value – Value to set the done status to.

on_unit_dispatch(unit: TaskUnitType) None

Method that is called by the crew when a unit of the task is dispatched.

Should be implemented by the subclass if needed.

Parameters:

unit – Task unit that is dispatched.

on_unit_completion(unit: TaskUnitType) None

Method that is called by the crew when a unit of the task is completed.

Should be implemented by the subclass if needed.

Parameters:

unit – Task unit that is completed.

abstract get_next_unit() TaskUnitType | None

Get the next unit of the task to run. Must be implemented by the subclass.

This method is called in the crew’s main loop repeatedly while the task is not done and there are units in progress.

Note that returning a unit does not guarantee that it will be dispatched. Because of this, any state changes are strongly discouraged in this method. If you need to perform some actions when the unit is dispatched or completed, you should implement the on_unit_dispatch and/or on_unit_completion methods.

If you need to find which units already exist in order to generate the next one, you can use the get_units method.

Returns:

Next unit to run, or None if there are no units to run at the moment.

abstract get_worker(tools: List[MotleyTool] | None) Runnable

Get the worker that will run the task units.

This method is called by the crew when a unit of the task is dispatched. The unit will be converted to a dictionary and passed to the worker’s invoke method.

Typically, the worker is an agent, but it can be any object that implements the Langchain Runnable interface.

Parameters:

tools – Tools to be used by the worker.

Returns:

Worker that will run the task units.