Builtin Nodes

Nodes are the basic building blocks of Workflows

Collections

Group

class pyapp_flow.Group(*nodes_: Callable[[WorkflowContext], Any], log_level: str | int | None = None)

Group of nodes.

Useful for creating composable blocks, that don’t require variable scope.

Nodes

class pyapp_flow.Nodes(*nodes_: Callable[[WorkflowContext], Any], log_level: str | int | None = None)

A series of nodes to be executed on call.

Modify context variables

SetVar

class pyapp_flow.SetVar(**values: Any | Callable[[WorkflowContext], Any])

Set context variable to specified values

Parameters:

values – Key/Value pairs or Key/Callable pairs to be applied to the context.

SetVar(
    title="Hyperion",
    published=datetime.date(1996, 11, 1),
    updated=lambda context: datetime.datetime.now()
)

SetGlobalVar

class pyapp_flow.SetGlobalVar(**values: Any | Callable[[WorkflowContext], Any])

Set global context variable to specified values

Parameters:

values – Key/Value pairs or Key/Callable pairs to be applied to the context.

SetGlobalVar(
    title="Hyperion",
    published=datetime.date(1996, 11, 1),
    updated=lambda context: datetime.datetime.now()
)

Append

class pyapp_flow.Append(target_var: str, message: str)

Append a message to a list.

The message is formatted using pyapp_flow.WorkflowContext.format() before being appended the target_var.

If the target_var does not exist a new list will be created.

Parameters:
  • target_var – Name of the context variable with the list

  • message – The message to be formatted and added to the list.

Append("messages", "Unable to complete book")

CaptureErrors

class pyapp_flow.CaptureErrors(target_var: str, try_all: bool = True, *, except_types: type | Sequence[type] | None = None)

Capture and store any exceptions raised by node(s).

Errors are captured into the specified variable.

If the target_var does not exist a new list will be created.

Parameters:
  • target_var – Name of the context variable with the list

  • try_all – Call every node even if a previous node raised an exception.

(
    CaptureErrors("errors", try_all=True)
    .nodes(
        ...  # Node(s) to try
    )
)
branches() dict[str, Sequence[Navigable]] | None

Branches from an object in the workflow node tree.

property name

Name of the node.

nodes(*nodes: Callable[[WorkflowContext], Any]) Self

Add additional nodes.

Provide feedback

LogMessage

class pyapp_flow.LogMessage(message: str, *, level: int = 20)

Print a message to log with optional level.

The message is formatted using pyapp_flow.WorkflowContext.format() before being appended to the log.

Parameters:
  • message – The message to be formatted and added to the list.

  • level – Log level to use for log message; a constant from the logging module.

LogMessage("Failed to complete task", level=logging.ERROR)

Branching

Conditional

class pyapp_flow.Conditional(condition: str | Callable[[WorkflowContext], bool])

Branch a workflow based on a condition, analogous with an if statement.

Parameters:

condition – A condition can be either a context variable that can be interpreted as a bool (using Python rules) or a callable that accepts a pyapp_flow.WorkflowContext and returns a bool.

# With context variable
(
    If("is_successful")
    .true(LogMessage("Process successful :)"))
    .false(LogMessage("Process failed :("))
)

# With Lambda
(
    If(lambda context: len(context.state.errors) == 0)
    .true(LogMessage("Process successful :)"))
    .false(LogMessage("Process failed :("))
)
branches() dict[str, Sequence[Navigable]] | None

Branches from an object in the workflow node tree.

false(*nodes: Callable[[WorkflowContext], Any]) Self

Nodes to use for the false branch.

property name

Name of the node.

true(*nodes: Callable[[WorkflowContext], Any]) Self

Nodes to use for the true branch.

Switch

class pyapp_flow.Switch(condition: str | Callable[[WorkflowContext], Hashable])

Branch a workflow into one of multiple subprocesses, analogous with a switch statement found in many languages or with Python a dict lookup with a default fallback.

Parameters:

condition – A condition can be either a context variable that that provides a hashable object or a callable that accepts a pyapp_flow.WorkflowContext and returns a hashable object.

(
    Switch("status")
    .case("Active", ...)  # Active branch node(s)
    .case("Starting", ...)  # Starting branch node(s)
    .default(...)  # Optional default fallback branch node(s)
)
branches() dict[str, Sequence[Navigable]] | None

Branches from an object in the workflow node tree.

case(key: Hashable, *nodes: Callable[[WorkflowContext], Any]) Self

Key used to match branch and the nodes that make up the branch.

default(*nodes: Callable[[WorkflowContext], Any]) Self

If a case key is not matched use these nodes as the default branch.

property name

Name of the node.

Feature Flag

Iteration

ForEach

class pyapp_flow.ForEach(target_vars: str | Sequence[str], in_var: str, *, loop_label: str | None = None)

For each loop to iterate through a set of values.

Call a set of nodes on each value; analogous with a for loop this node will iterate through a sequence and call each of the child nodes.

All nodes within a for-each loop are in a nested context scope.

Values can be un-packed into multiple context variables using Python iterable unpacking rules.

Parameters:
  • target_vars – Singular or multiple variables to unpack value into. This value can be either a single string, a comma separated list of strings or a sequence of strings.

  • in_var – Context variable containing a sequence of values to be iterated over.

# With a single target variable
(
    ForEach("message", in_var="messages")
    .loop(log_message("- {message}"))
)

# With multiple target variables
(
    ForEach("name, age", in_var="students")
    .loop(log_message("- {name} is {age} years old."))
)
branches() dict[str, Sequence[Navigable]] | None

Branches from an object in the workflow node tree.

loop(*nodes: Callable[[WorkflowContext], Any]) Self

Nodes to call on each iteration of the foreach block.

property name

Name of the node.

property target_vars_name

Display value for target vars.

Error Handling

TryExcept

class pyapp_flow.TryExcept(*nodes: Callable[[WorkflowContext], Any])

Try a set of nodes and catch any exceptions.

(
    TryExcept(
        resolve_state_a,
        resolve_state_b,
    )
    .except_on(RuntimeError, fallback_state)
)
and_finally(*nodes) Self

Nodes that are always called even if an exception is raised.

This is analogous to a finally block.

branches() dict[str, Sequence[Navigable]] | None

Branches from an object in the workflow node tree.

except_on(exception: type[Exception], *nodes: Callable[[WorkflowContext], Any]) Self

Nodes to call if the exception is raised.

property name: str

Name of the node.

TryUntil

class pyapp_flow.TryUntil(except_types: type[Exception] | ~collections.abc.Sequence[type[Exception]] = <class 'pyapp_flow.errors.StepFailedError'>)

Try a set of nodes until one of them does not raise an exception.

The default behaviour is to catch a StepFailedError exception.

Parameters:

except_types – Exception type(s) to catch; defaults to StepFailedError.

(
    TryUntil()
    .nodes(
        resolve_state_a,
        resolve_state_b,
    )
    .default(
        fallback_state,
    )
)
branches() dict[str, Sequence[Navigable]] | None

Branches from an object in the workflow node tree.

default(node: Callable[[WorkflowContext], Any]) Self

Nodes to call if all nodes raise an exception.

property name: str

Name of the node.

nodes(*nodes: Callable[[WorkflowContext], Any]) Self

Nodes to try.