Builtin Nodes
Nodes are the basic building blocks of Workflows.
Collections
Group
- class pyapp_flow.Group(*nodes_: Callable[[WorkflowContext], Any], log_level: str | int | None = None)
Group of nodes.
Useful for creating composable blocks that don’t require variable scope.
Nodes
- class pyapp_flow.Nodes(*nodes_: Callable[[WorkflowContext], Any], log_level: str | int | None = None)
A series of nodes to be executed on call.
Modify context variables
SetVar
- class pyapp_flow.SetVar(**values: Any | Callable[[WorkflowContext], Any])
Set context variables to the specified values.
- Parameters:
values – Key/Value pairs or Key/Callable pairs to be applied to the context.
SetVar(
    title="Hyperion",
    published=datetime.date(1996, 11, 1),
    updated=lambda context: datetime.datetime.now(),
)
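The difference between Key/Value and Key/Callable pairs can be sketched without the library. This is a minimal illustration, not pyapp_flow's implementation: a plain dict stands in for the workflow context, and `apply_values` is a hypothetical helper name.

```python
import datetime
from typing import Any, Callable, Dict, Union

# Assumption: a plain dict stands in for pyapp_flow.WorkflowContext.
State = Dict[str, Any]


def apply_values(state: State, **values: Union[Any, Callable[[State], Any]]) -> None:
    """Store plain values as-is; call callables with the state and store the result."""
    for key, value in values.items():
        state[key] = value(state) if callable(value) else value


state: State = {}
apply_values(
    state,
    title="Hyperion",
    published=datetime.date(1996, 11, 1),
    updated=lambda ctx: datetime.datetime.now(),  # resolved when the node runs
)
```

The callable form is useful when a value can only be known at execution time, such as a timestamp.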
SetGlobalVar
- class pyapp_flow.SetGlobalVar(**values: Any | Callable[[WorkflowContext], Any])
Set global context variables to the specified values.
- Parameters:
values – Key/Value pairs or Key/Callable pairs to be applied to the context.
SetGlobalVar(
    title="Hyperion",
    published=datetime.date(1996, 11, 1),
    updated=lambda context: datetime.datetime.now(),
)
Append
- class pyapp_flow.Append(target_var: str, message: str)
Append a message to a list.
The message is formatted using pyapp_flow.WorkflowContext.format() before being appended to the target_var.
If the target_var does not exist a new list will be created.
- Parameters:
target_var – Name of the context variable with the list
message – The message to be formatted and added to the list.
Append("messages", "Unable to complete book")
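The format-then-append behaviour, including creation of a missing list, might look like the sketch below. It assumes `str.format` as a stand-in for `WorkflowContext.format()` and a plain dict as the context; `append_message` is a hypothetical name.

```python
from typing import Any, Dict


def append_message(state: Dict[str, Any], target_var: str, message: str) -> None:
    """Format the message against the state, then append it to a list variable."""
    # Assumption: str.format stands in for pyapp_flow.WorkflowContext.format().
    formatted = message.format(**state)
    # setdefault creates the list on first use, mirroring the documented behaviour.
    state.setdefault(target_var, []).append(formatted)


state: Dict[str, Any] = {"title": "Hyperion"}
append_message(state, "messages", "Unable to complete book {title}")
append_message(state, "messages", "Giving up")
```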
CaptureErrors
- class pyapp_flow.CaptureErrors(target_var: str, try_all: bool = True, *, except_types: type | Sequence[type] | None = None)
Capture and store any exceptions raised by node(s).
Errors are captured into the specified variable.
If the target_var does not exist a new list will be created.
- Parameters:
target_var – Name of the context variable with the list
try_all – Call every node even if a previous node raised an exception.
(
    CaptureErrors("errors", try_all=True)
    .nodes(
        ...  # Node(s) to try
    )
)
- branches() -> dict[str, Sequence[Navigable]] | None
Branches from an object in the workflow node tree.
- property name
Name of the node.
- nodes(*nodes: Callable[[WorkflowContext], Any]) -> Self
Add additional nodes.
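The effect of try_all can be sketched in plain Python. This is an illustration under assumptions, not the library's code: a dict stands in for the context, `capture_errors` is a hypothetical helper, and stopping at the first error when try_all is false is inferred from the parameter description.

```python
from typing import Any, Callable, Dict, Sequence

State = Dict[str, Any]  # assumption: stand-in for the workflow context


def capture_errors(
    state: State,
    target_var: str,
    nodes: Sequence[Callable[[State], Any]],
    try_all: bool = True,
) -> None:
    """Run each node, storing raised exceptions in a list variable."""
    errors = state.setdefault(target_var, [])  # created if it does not exist
    for node in nodes:
        try:
            node(state)
        except Exception as ex:
            errors.append(ex)
            if not try_all:
                break  # assumed behaviour: stop after the first failure


def fails(state: State) -> None:
    raise ValueError("bad input")


def works(state: State) -> None:
    state["done"] = True


state_all: State = {}
capture_errors(state_all, "errors", [fails, works])

state_first: State = {}
capture_errors(state_first, "errors", [fails, works], try_all=False)
```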
Provide feedback
LogMessage
- class pyapp_flow.LogMessage(message: str, *, level: int = 20)
Print a message to the log with an optional level.
The message is formatted using pyapp_flow.WorkflowContext.format() before being appended to the log.
- Parameters:
message – The message to be formatted and added to the log.
level – Log level to use for log message; a constant from the logging module (the default, 20, is logging.INFO).
LogMessage("Failed to complete task", level=logging.ERROR)
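The level argument maps directly onto the standard logging constants. The sketch below uses only the standard library; the logger name, `ListHandler` and `log_formatted` are hypothetical, and `str.format` is assumed as a stand-in for `WorkflowContext.format()`.

```python
import logging
from typing import Any, Dict, List

logger = logging.getLogger("workflow_sketch")  # hypothetical logger name
logger.setLevel(logging.DEBUG)
logger.propagate = False


class ListHandler(logging.Handler):
    """Collect log records in memory so they can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.records: List[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


handler = ListHandler()
logger.addHandler(handler)


def log_formatted(state: Dict[str, Any], message: str, *, level: int = logging.INFO) -> None:
    """Format the message against the state and log it at the given level."""
    logger.log(level, message.format(**state))


log_formatted({"task": "import"}, "Started {task}")
log_formatted({"task": "import"}, "Failed to complete {task}", level=logging.ERROR)
```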
Branching
Conditional
- class pyapp_flow.Conditional(condition: str | Callable[[WorkflowContext], bool])
Branch a workflow based on a condition, analogous with an if statement.
- Parameters:
condition – A condition can be either a context variable that can be interpreted as a bool (using Python rules) or a callable that accepts a pyapp_flow.WorkflowContext and returns a bool.
# With context variable
(
    If("is_successful")
    .true(LogMessage("Process successful :)"))
    .false(LogMessage("Process failed :("))
)

# With Lambda
(
    If(lambda context: len(context.state.errors) == 0)
    .true(LogMessage("Process successful :)"))
    .false(LogMessage("Process failed :("))
)
- branches() -> dict[str, Sequence[Navigable]] | None
Branches from an object in the workflow node tree.
- false(*nodes: Callable[[WorkflowContext], Any]) -> Self
Nodes to use for the false branch.
- property name
Name of the node.
- true(*nodes: Callable[[WorkflowContext], Any]) -> Self
Nodes to use for the true branch.
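How the two condition forms resolve to a branch can be sketched as follows. This is a stand-alone illustration, not the library's code: a dict replaces the context, `resolve_condition`, `run_conditional` and `record` are hypothetical names, and missing variables are not handled.

```python
from typing import Any, Callable, Dict, Sequence, Union

State = Dict[str, Any]  # assumption: stand-in for the workflow context
Node = Callable[[State], Any]


def resolve_condition(state: State, condition: Union[str, Callable[[State], bool]]) -> bool:
    """A string names a variable read with Python truthiness; a callable is called."""
    if isinstance(condition, str):
        return bool(state[condition])
    return bool(condition(state))


def run_conditional(
    state: State,
    condition: Union[str, Callable[[State], bool]],
    true_nodes: Sequence[Node] = (),
    false_nodes: Sequence[Node] = (),
) -> None:
    """Run the nodes of whichever branch the condition selects."""
    branch = true_nodes if resolve_condition(state, condition) else false_nodes
    for node in branch:
        node(state)


def record(text: str) -> Node:
    """Build a node that appends text to the 'log' variable."""
    return lambda state: state["log"].append(text)


state: State = {"is_successful": True, "errors": ["boom"], "log": []}
run_conditional(state, "is_successful", [record("Process successful :)")], [record("Process failed :(")])
run_conditional(state, lambda s: len(s["errors"]) == 0, [record("No errors")], [record("Has errors")])
```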
Switch
- class pyapp_flow.Switch(condition: str | Callable[[WorkflowContext], Hashable])
Branch a workflow into one of multiple subprocesses, analogous to a switch statement found in many languages or, in Python, a dict lookup with a default fallback.
- Parameters:
condition – A condition can be either a context variable that provides a hashable object or a callable that accepts a pyapp_flow.WorkflowContext and returns a hashable object.
(
    Switch("status")
    .case("Active", ...)  # Active branch node(s)
    .case("Starting", ...)  # Starting branch node(s)
    .default(...)  # Optional default fallback branch node(s)
)
- branches() -> dict[str, Sequence[Navigable]] | None
Branches from an object in the workflow node tree.
- case(key: Hashable, *nodes: Callable[[WorkflowContext], Any]) -> Self
Key used to match branch and the nodes that make up the branch.
- default(*nodes: Callable[[WorkflowContext], Any]) -> Self
If a case key is not matched use these nodes as the default branch.
- property name
Name of the node.
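The dict-lookup analogy can be made concrete with a short sketch. It assumes a plain dict as the context; `run_switch` and `set_action` are hypothetical helpers and do not reflect how the library stores its branches.

```python
from typing import Any, Callable, Dict, Hashable, Sequence, Union

State = Dict[str, Any]  # assumption: stand-in for the workflow context
Node = Callable[[State], Any]


def run_switch(
    state: State,
    condition: Union[str, Callable[[State], Hashable]],
    cases: Dict[Hashable, Sequence[Node]],
    default: Sequence[Node] = (),
) -> None:
    """Look up the branch for the resolved key, falling back to the default."""
    key = state[condition] if isinstance(condition, str) else condition(state)
    for node in cases.get(key, default):
        node(state)


def set_action(action: str) -> Node:
    """Build a node that records which branch ran."""
    return lambda state: state.update(action=action)


cases = {
    "Active": [set_action("serve")],
    "Starting": [set_action("wait")],
}

active: State = {"status": "Active"}
run_switch(active, "status", cases, default=[set_action("ignore")])

stopped: State = {"status": "Stopped"}  # no matching case, so the default runs
run_switch(stopped, "status", cases, default=[set_action("ignore")])
```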
Feature Flag
Iteration
ForEach
- class pyapp_flow.ForEach(target_vars: str | Sequence[str], in_var: str, *, loop_label: str | None = None)
For each loop to iterate through a set of values.
Call a set of nodes on each value. Analogous to a for loop, this node iterates through a sequence and calls each of the child nodes.
All nodes within a for-each loop are in a nested context scope.
Values can be un-packed into multiple context variables using Python iterable unpacking rules.
- Parameters:
target_vars – Singular or multiple variables to unpack value into. This value can be either a single string, a comma separated list of strings or a sequence of strings.
in_var – Context variable containing a sequence of values to be iterated over.
# With a single target variable
(
    ForEach("message", in_var="messages")
    .loop(log_message("- {message}"))
)

# With multiple target variables
(
    ForEach("name, age", in_var="students")
    .loop(log_message("- {name} is {age} years old."))
)
- branches() -> dict[str, Sequence[Navigable]] | None
Branches from an object in the workflow node tree.
- loop(*nodes: Callable[[WorkflowContext], Any]) -> Self
Nodes to call on each iteration of the foreach block.
- property name
Name of the node.
- property target_vars_name
Display value for target vars.
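Unpacking into several target variables inside a nested scope could be modelled as below. This is a rough sketch under assumptions: the nested scope is approximated by a shallow dict copy per iteration, and `for_each` and `describe` are hypothetical names rather than library functions.

```python
from typing import Any, Callable, Dict, List, Sequence, Union

State = Dict[str, Any]  # assumption: stand-in for the workflow context
Node = Callable[[State], Any]


def for_each(
    state: State,
    target_vars: Union[str, Sequence[str]],
    in_var: str,
    nodes: Sequence[Node],
) -> None:
    """Bind each value to the target variable(s) in a fresh scope and run the nodes."""
    if isinstance(target_vars, str):
        names: List[str] = [name.strip() for name in target_vars.split(",")]
    else:
        names = list(target_vars)

    for value in state[in_var]:
        scope = dict(state)  # shallow copy: loop variables do not leak out
        if len(names) == 1:
            scope[names[0]] = value
        else:
            unpacked = tuple(value)
            if len(unpacked) != len(names):
                raise ValueError(f"expected {len(names)} values, got {len(unpacked)}")
            scope.update(zip(names, unpacked))
        for node in nodes:
            node(scope)


def describe(scope: State) -> None:
    # The list object is shared with the outer state, so appends remain visible.
    scope["lines"].append("- {name} is {age} years old.".format(**scope))


state: State = {"students": [("Ada", 36), ("Alan", 41)], "lines": []}
for_each(state, "name, age", "students", [describe])
```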
Error Handling
TryExcept
- class pyapp_flow.TryExcept(*nodes: Callable[[WorkflowContext], Any])
Try a set of nodes and catch any exceptions.
(
    TryExcept(
        resolve_state_a,
        resolve_state_b,
    )
    .except_on(RuntimeError, fallback_state)
)
- and_finally(*nodes) -> Self
Nodes that are always called even if an exception is raised.
This is analogous to a finally block.
- branches() -> dict[str, Sequence[Navigable]] | None
Branches from an object in the workflow node tree.
- except_on(exception: type[Exception], *nodes: Callable[[WorkflowContext], Any]) -> Self
Nodes to call if the exception is raised.
- property name: str
Name of the node.
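The relationship between the tried nodes, except_on and and_finally mirrors a Python try/except/finally statement. The sketch below is an analogy only: `try_except` is a hypothetical helper, a dict stands in for the context, and re-raising unmatched exceptions is an assumption.

```python
from typing import Any, Callable, Dict, Sequence, Type

State = Dict[str, Any]  # assumption: stand-in for the workflow context
Node = Callable[[State], Any]


def try_except(
    state: State,
    nodes: Sequence[Node],
    handlers: Dict[Type[Exception], Sequence[Node]],
    finally_nodes: Sequence[Node] = (),
) -> None:
    """Run nodes; on a matching exception run its handler nodes; always run finally nodes."""
    try:
        for node in nodes:
            node(state)
    except Exception as ex:
        for exc_type, handler_nodes in handlers.items():
            if isinstance(ex, exc_type):
                for node in handler_nodes:
                    node(state)
                break
        else:
            raise  # assumed: exceptions without a handler propagate
    finally:
        for node in finally_nodes:
            node(state)


def resolve_state_a(state: State) -> None:
    raise RuntimeError("cannot resolve")


def fallback_state(state: State) -> None:
    state["state"] = "fallback"


def cleanup(state: State) -> None:
    state["cleaned"] = True


handled: State = {}
try_except(handled, [resolve_state_a], {RuntimeError: [fallback_state]}, [cleanup])
```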
TryUntil
- class pyapp_flow.TryUntil(except_types: type[Exception] | Sequence[type[Exception]] = pyapp_flow.errors.StepFailedError)
Try a set of nodes until one of them does not raise an exception.
The default behaviour is to catch a StepFailedError exception.
- Parameters:
except_types – Exception type(s) to catch; defaults to StepFailedError.
(
    TryUntil()
    .nodes(
        resolve_state_a,
        resolve_state_b,
    )
    .default(
        fallback_state,
    )
)
- branches() -> dict[str, Sequence[Navigable]] | None
Branches from an object in the workflow node tree.
- default(node: Callable[[WorkflowContext], Any]) -> Self
Node to call if all nodes raise an exception.
- property name: str
Name of the node.
- nodes(*nodes: Callable[[WorkflowContext], Any]) -> Self
Nodes to try.
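The first-success semantics can be sketched in a few lines. This is an illustration under assumptions, not the library's implementation: the `StepFailedError` class here is a local stand-in for `pyapp_flow.errors.StepFailedError`, `try_until` is a hypothetical helper, and a dict replaces the context.

```python
from typing import Any, Callable, Dict, Optional, Sequence, Type, Union

State = Dict[str, Any]  # assumption: stand-in for the workflow context
Node = Callable[[State], Any]


class StepFailedError(Exception):
    """Local stand-in for pyapp_flow.errors.StepFailedError."""


def try_until(
    state: State,
    nodes: Sequence[Node],
    default: Optional[Node] = None,
    except_types: Union[Type[Exception], Sequence[Type[Exception]]] = StepFailedError,
) -> bool:
    """Run nodes in order until one succeeds; call the default if all of them fail."""
    # An except clause needs a type or a tuple of types, not an arbitrary sequence.
    caught = except_types if isinstance(except_types, type) else tuple(except_types)
    for node in nodes:
        try:
            node(state)
        except caught:
            continue  # try the next node
        return True  # first node that does not raise ends the search
    if default is not None:
        default(state)
    return False


def resolve_state_a(state: State) -> None:
    raise StepFailedError("a failed")


def resolve_state_b(state: State) -> None:
    state["state"] = "b"


def fallback_state(state: State) -> None:
    state["state"] = "fallback"


first: State = {}
first_ok = try_until(first, [resolve_state_a, resolve_state_b], fallback_state)

second: State = {}
second_ok = try_until(second, [resolve_state_a], fallback_state)
```

Exceptions outside except_types are not caught in this sketch and would propagate to the caller.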