motleycrew.tasks

Tasks and task units.

Modules

simple

task

task_unit

class motleycrew.tasks.Task(name: str, task_unit_class: Type[TaskUnitType], crew: MotleyCrew | None = None, allow_async_units: bool = False)

Bases: ABC, Generic[TaskUnitType]

Base class for describing tasks.

This class is abstract and must be subclassed to implement the task logic.

NODE_CLASS

Class for representing task nodes; can be overridden.

Type:

Type[motleycrew.tasks.task.TaskNodeType]

TASK_IS_UPSTREAM_LABEL

Label for indicating upstream tasks; can be overridden.

NODE_CLASS

alias of TaskNode

TASK_IS_UPSTREAM_LABEL = 'task_is_upstream'

__init__(name: str, task_unit_class: Type[TaskUnitType], crew: MotleyCrew | None = None, allow_async_units: bool = False)

Initialize the task.

Parameters:
  • name – Name of the task.

  • task_unit_class – Class for representing task units.

  • crew – Crew to which the task belongs. If not provided, the task should be registered with a crew later.

  • allow_async_units – Whether the task allows asynchronous units. Default is False. If True, the task may be queried for the next unit even if it has other units in progress.

prepare_graph_store()

Prepare the graph store for storing tasks and their units.

property graph_store: MotleyGraphStore

The graph store where the task is stored.

This is an alias for the graph store of the crew that the task belongs to.

set_upstream(task: Task) → Task

Set a task as an upstream task for the current task.

This means that the current task will not be queried for task units until the upstream task is marked as done.

Parameters:

task – Upstream task.
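The gating rule described for set_upstream can be illustrated with a minimal sketch. It does not import motleycrew: StubTask and queryable_tasks are hypothetical stand-ins, and returning the current task from set_upstream (to allow chaining) is an assumption based on the signature's return type.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class StubTask:
    """Hypothetical stand-in for a task; not the motleycrew class."""

    name: str
    done: bool = False
    upstream: List["StubTask"] = field(default_factory=list)

    def set_upstream(self, task: "StubTask") -> "StubTask":
        # Record the dependency; returning self is an assumption that
        # allows chained calls.
        self.upstream.append(task)
        return self


def queryable_tasks(tasks: List[StubTask]) -> List[str]:
    """Names of unfinished tasks whose upstream tasks are all done."""
    return [
        t.name
        for t in tasks
        if not t.done and all(u.done for u in t.upstream)
    ]


research = StubTask("research")
write = StubTask("write").set_upstream(research)

before = queryable_tasks([research, write])  # only "research" is eligible
research.done = True
after = queryable_tasks([research, write])  # now "write" is eligible
```

In this sketch, "write" is skipped until "research" is marked as done, which mirrors the behaviour described above.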

get_units(status: str | None = None) → List[TaskUnitType]

Get the units of the task that are already inserted in the graph.

This method should be used for fetching the existing task units.

Parameters:

status – Status of the task units to filter by.

Returns:

List of task units.

get_upstream_tasks() → List[Task]

Get the upstream tasks of the current task.

Returns:

List of upstream tasks.

get_downstream_tasks() → List[Task]

Get the downstream tasks of the current task.

Returns:

List of downstream tasks.

set_done(value: bool = True)

Set the done status of the task.

Parameters:

value – Value to set the done status to.

on_unit_dispatch(unit: TaskUnitType) → None

Method that is called by the crew when a unit of the task is dispatched.

Should be implemented by the subclass if needed.

Parameters:

unit – Task unit that is dispatched.

on_unit_completion(unit: TaskUnitType) → None

Method that is called by the crew when a unit of the task is completed.

Should be implemented by the subclass if needed.

Parameters:

unit – Task unit that is completed.

abstract get_next_unit() → TaskUnitType | None

Get the next unit of the task to run. Must be implemented by the subclass.

This method is called in the crew’s main loop repeatedly while the task is not done and there are units in progress.

Note that returning a unit does not guarantee that it will be dispatched. Because of this, any state changes are strongly discouraged in this method. If you need to perform some actions when the unit is dispatched or completed, you should implement the on_unit_dispatch and/or on_unit_completion methods.

If you need to find which units already exist in order to generate the next one, you can use the get_units method.

Returns:

Next unit to run, or None if there are no units to run at the moment.
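The advice to keep get_next_unit free of state changes can be sketched as follows. BatchTask is a hypothetical stand-in that does not subclass the real Task, units are plain strings, and a Python list takes the place of the graph store that get_units would normally query.

```python
from typing import List, Optional


class BatchTask:
    """Hypothetical stand-in that yields one unit per input item."""

    def __init__(self, items: List[str]) -> None:
        self.items = items
        self.dispatched: List[str] = []  # mutated only in on_unit_dispatch
        self.completed: List[str] = []  # mutated only in on_unit_completion

    def get_next_unit(self) -> Optional[str]:
        # Pure lookup: derive the next unit from existing state, change nothing.
        for item in self.items:
            if item not in self.dispatched:
                return item
        return None

    def on_unit_dispatch(self, unit: str) -> None:
        self.dispatched.append(unit)

    def on_unit_completion(self, unit: str) -> None:
        self.completed.append(unit)


task = BatchTask(["a", "b"])

# Asking twice without a dispatch yields the same unit, because nothing changed.
first = task.get_next_unit()
second = task.get_next_unit()

# Only the dispatch hook advances the task.
task.on_unit_dispatch(first)
third = task.get_next_unit()
```

Because all bookkeeping lives in the hooks, a unit that is returned but never dispatched is simply offered again on the next query.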

abstract get_worker(tools: List[MotleyTool] | None) → Runnable

Get the worker that will run the task units.

This method is called by the crew when a unit of the task is dispatched. The unit will be converted to a dictionary and passed to the worker’s invoke method.

Typically, the worker is an agent, but it can be any object that implements the LangChain Runnable interface.

Parameters:

tools – Tools to be used by the worker.

Returns:

Worker that will run the task units.
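The dispatch flow described above (unit converted to a dictionary, then passed to the worker's invoke method) can be sketched without any dependencies. StubUnit, EchoWorker, and dispatch are hypothetical names; a real worker would be an agent or another LangChain Runnable, and the real conversion is done by the unit's as_dict method rather than dataclasses.asdict.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict


@dataclass
class StubUnit:
    """Hypothetical unit carrying the worker's input."""

    name: str
    prompt: str


class EchoWorker:
    """Hypothetical worker exposing an invoke() method that takes a dict."""

    def invoke(self, payload: Dict[str, Any]) -> str:
        return f"[{payload['name']}] {payload['prompt'].upper()}"


def dispatch(unit: StubUnit, worker: EchoWorker) -> str:
    # Mirror the described flow: unit -> dictionary -> worker.invoke(...)
    return worker.invoke(asdict(unit))


result = dispatch(StubUnit(name="greet", prompt="say hello"), EchoWorker())
```

The worker never sees the unit object itself, only its dictionary form, so every input the worker needs has to be a field of the unit.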

class motleycrew.tasks.SimpleTask(crew: MotleyCrew, description: str, name: str | None = None, agent: MotleyAgentAbstractParent | None = None, tools: Sequence[MotleyTool] | None = None, additional_params: dict[str, Any] | None = None, prompt_template_with_upstreams: PromptTemplate = PromptTemplate(input_variables=['description', 'upstream_results'], input_types={}, partial_variables={}, template='{description}\n\nYou must use the results of these upstream tasks:\n\n{upstream_results}\n'))

Bases: Task

Simple task class.

A simple task consists of a description and an agent that can execute the task. It produces a single task unit with a prompt based on the description and the results of upstream tasks.

The task is considered done when the task unit is completed.

__init__(crew: MotleyCrew, description: str, name: str | None = None, agent: MotleyAgentAbstractParent | None = None, tools: Sequence[MotleyTool] | None = None, additional_params: dict[str, Any] | None = None, prompt_template_with_upstreams: PromptTemplate = PromptTemplate(input_variables=['description', 'upstream_results'], input_types={}, partial_variables={}, template='{description}\n\nYou must use the results of these upstream tasks:\n\n{upstream_results}\n'))

Initialize the simple task.

Parameters:
  • crew – Crew to which the task belongs.

  • description – Description of the task.

  • name – Name of the task (will be used as the name of the task unit).

  • agent – Agent to execute the task.

  • tools – Tools to use for the task.

  • additional_params – Additional parameters for the task.

  • prompt_template_with_upstreams – Prompt template to use for generating the prompt if the task has upstream dependencies. Otherwise, just the description is used. The template must have input variables ‘description’ and ‘upstream_results’.
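As a rough illustration of how the default template is filled in, the sketch below uses plain str.format instead of a LangChain PromptTemplate. The template text is copied from the signature above; build_prompt is a hypothetical helper, and joining upstream results with blank lines is an assumption.

```python
from typing import List

# Default template text, copied from the SimpleTask signature.
TEMPLATE = (
    "{description}\n\n"
    "You must use the results of these upstream tasks:\n\n"
    "{upstream_results}\n"
)


def build_prompt(description: str, upstream_results: List[str]) -> str:
    """Hypothetical helper: plain description, or the filled-in template."""
    if not upstream_results:
        # No upstream dependencies: the description alone is the prompt.
        return description
    return TEMPLATE.format(
        description=description,
        # Assumption: results are separated by blank lines.
        upstream_results="\n\n".join(upstream_results),
    )


plain = build_prompt("Summarize the report.", [])
with_upstreams = build_prompt("Summarize the report.", ["Report text: ..."])
```

A custom template passed as prompt_template_with_upstreams would need the same two placeholders, description and upstream_results.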

on_unit_completion(unit: SimpleTaskUnit) → None

Handle completion of the task unit.

Sets the task as done and stores the output of the task unit.

Parameters:

unit – Task unit that has completed.

get_next_unit() → SimpleTaskUnit | None

Get the next task unit to run.

If all upstream tasks are done, returns a task unit with the prompt based on the description and the results of the upstream tasks. Otherwise, returns None (the task is not ready to run yet).

Returns:

Task unit to run if the task is ready, None otherwise.

get_worker(tools: List[MotleyTool] | None) → MotleyAgentAbstractParent

Get the worker for the task.

If the task is associated with a crew and an agent, returns the agent. Otherwise, raises an exception.

Parameters:

tools – Additional tools to add to the agent.

Returns:

Agent to run the task unit.

class motleycrew.tasks.TaskUnit(*, status: str = 'pending', output: Any | None = None)

Bases: MotleyGraphNode, ABC

Base class for describing task units. A task unit should contain all the input data for the worker (usually an agent). When a task unit is dispatched by the crew, it is converted to a dictionary and passed to the worker’s invoke() method.

status

Status of the task unit.

Type:

str

output

Output of the task unit.

Type:

Optional[Any]

status: str

output: Any | None

property pending

Whether the task unit is pending.

property running

Whether the task unit is running.

property done

Whether the task unit is done.

set_pending()

Set the task unit status to pending.

set_running()

Set the task unit status to running.

set_done()

Set the task unit status to done.
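The status properties and setters above suggest a simple lifecycle, sketched here with a plain dataclass. StubTaskUnit is a hypothetical stand-in for the Pydantic-based TaskUnit; the 'pending' default comes from the class signature, while the 'running' and 'done' status strings are assumptions inferred from the property names.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class StubTaskUnit:
    """Hypothetical stand-in mirroring the status API listed above."""

    status: str = "pending"
    output: Optional[Any] = None

    @property
    def pending(self) -> bool:
        return self.status == "pending"

    @property
    def running(self) -> bool:
        return self.status == "running"  # assumed status string

    @property
    def done(self) -> bool:
        return self.status == "done"  # assumed status string

    def set_running(self) -> None:
        self.status = "running"

    def set_done(self) -> None:
        self.status = "done"


unit = StubTaskUnit()
states = [unit.status]

unit.set_running()
states.append(unit.status)

unit.output = "result"
unit.set_done()
states.append(unit.status)
```

Here the unit moves from pending to running to done, with the output stored before the final transition.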

as_dict()

Represent the task unit as a dictionary for passing to the invoke() methods of runnables.

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[Dict[str, FieldInfo]] = {'output': FieldInfo(annotation=Union[Any, NoneType], required=False, default=None), 'status': FieldInfo(annotation=str, required=False, default='pending')}

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.