Flow

adbflow.flow.engine.Flow

Multi-step workflow engine with conditions, error handling, and retry logic.

Properties

| Property | Type | Description |
| --- | --- | --- |
| steps | list[Step] | Registered steps |

Methods

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| add_step | name, action, condition=None, on_error=ErrorStrategy.STOP, retries=0 | Flow | Add a step (chainable) |
| run_async | | FlowContext | Execute the flow |

Parameters for add_step

  • name — step identifier
  • action — async def(ctx: FlowContext) -> Any
  • condition — optional async def(ctx: FlowContext) -> bool. Step runs only if this returns True
  • on_error — ErrorStrategy.STOP (default) or ErrorStrategy.SKIP
  • retries — number of retry attempts on failure
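The interaction between condition, on_error, and retries can be sketched with a small stand-alone runner. This is not adbflow's implementation: `run_step` and the local `ErrorStrategy` enum are hypothetical stand-ins, and the sketch assumes that retries=N allows N extra attempts after the first failure, that SKIP swallows the final error, and that STOP re-raises it.

```python
import asyncio
from enum import Enum
from typing import Any, Awaitable, Callable, Optional


class ErrorStrategy(Enum):  # local stand-in, not adbflow's enum
    STOP = "stop"
    SKIP = "skip"


async def run_step(
    ctx: dict,
    action: Callable[[dict], Awaitable[Any]],
    condition: Optional[Callable[[dict], Awaitable[bool]]] = None,
    on_error: ErrorStrategy = ErrorStrategy.STOP,
    retries: int = 0,
) -> Any:
    """Run one step; return its result, or None if it was skipped."""
    # Guard: do not run the action when the condition returns False.
    if condition is not None and not await condition(ctx):
        return None
    # Assumption: retries=N means N extra attempts after the first failure.
    for attempt in range(retries + 1):
        try:
            return await action(ctx)
        except Exception:
            if attempt < retries:
                continue  # attempts remain, try again
            if on_error is ErrorStrategy.SKIP:
                return None  # swallow the error so the flow can continue
            raise  # STOP: propagate the error and abort


async def demo() -> dict:
    ctx = {"calls": 0}

    async def flaky(c: dict) -> str:
        # Fails twice, then succeeds on the third call.
        c["calls"] += 1
        if c["calls"] < 3:
            raise RuntimeError("transient failure")
        return "ok"

    async def never(c: dict) -> bool:
        return False

    async def always_fails(c: dict) -> None:
        raise RuntimeError("permanent failure")

    result = await run_step(ctx, flaky, retries=2)
    guarded = await run_step(ctx, flaky, condition=never)
    skipped = await run_step(ctx, always_fails, on_error=ErrorStrategy.SKIP)
    return {"result": result, "guarded": guarded, "skipped": skipped, "calls": ctx["calls"]}


outcome = asyncio.run(demo())
print(outcome)
```

In this sketch a skipped step and a guarded-out step both yield None; whether the real engine distinguishes the two cases is not stated in this reference.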

FlowContext

adbflow.flow.step.FlowContext

Shared context passed to every step.

Fields

| Field | Type | Description |
| --- | --- | --- |
| device | Device | The target device |
| variables | dict[str, Any] | Shared variables between steps |
| step_results | dict[str, Any] | Return value of each completed step |
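As an illustration of how steps might share data through the context, here is a minimal offline sketch. The `FlowContext` dataclass below is a stand-in that only mirrors the documented fields, the step names are hypothetical, and keying step_results by step name is an assumption rather than confirmed behaviour.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FlowContext:  # stand-in mirroring the documented fields, not adbflow's class
    device: Any
    variables: dict[str, Any] = field(default_factory=dict)
    step_results: dict[str, Any] = field(default_factory=dict)


async def read_version(ctx: FlowContext) -> str:
    # A real step would query ctx.device; a fixed value keeps the sketch offline.
    return "14"


async def record_version(ctx: FlowContext) -> None:
    # Assumption: step_results is keyed by step name.
    ctx.variables["android_version"] = ctx.step_results["read_version"]


async def demo() -> FlowContext:
    ctx = FlowContext(device=None)
    steps = [("read_version", read_version), ("record_version", record_version)]
    for name, action in steps:
        # Store each return value so that later steps can read it.
        ctx.step_results[name] = await action(ctx)
    return ctx


ctx = asyncio.run(demo())
print(ctx.variables)
```

Under these assumptions, return values suit data produced by a single step, while variables suit state that several steps read or update.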

Step

adbflow.flow.step.Step

Fields

| Field | Type | Description |
| --- | --- | --- |
| name | str | Step identifier |
| action | Callable[[FlowContext], Coroutine] | The async action |
| condition | Callable \| None | Optional guard condition |
| on_error | ErrorStrategy | Error handling strategy |
| retries | int | Retry count |
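The fields above map naturally onto a dataclass. The following sketch is a guess at the shape, not the library's source: `Step`, `ErrorStrategy`, and `MiniFlow` are local stand-ins, and `MiniFlow.add_step` only shows how returning the flow itself makes registration chainable.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Coroutine, Optional


class ErrorStrategy(Enum):  # local stand-in, not adbflow's enum
    STOP = "stop"
    SKIP = "skip"


@dataclass
class Step:  # stand-in mirroring the documented fields and defaults
    name: str
    action: Callable[[Any], Coroutine[Any, Any, Any]]
    condition: Optional[Callable[[Any], Coroutine[Any, Any, bool]]] = None
    on_error: ErrorStrategy = ErrorStrategy.STOP
    retries: int = 0


@dataclass
class MiniFlow:  # hypothetical container, only here to show chaining
    steps: list[Step] = field(default_factory=list)

    def add_step(
        self,
        name: str,
        action: Callable[[Any], Coroutine[Any, Any, Any]],
        condition: Optional[Callable[[Any], Coroutine[Any, Any, bool]]] = None,
        on_error: ErrorStrategy = ErrorStrategy.STOP,
        retries: int = 0,
    ) -> "MiniFlow":
        self.steps.append(Step(name, action, condition, on_error, retries))
        return self  # returning self is what makes the calls chainable


async def noop(ctx: Any) -> None:
    return None


flow = MiniFlow().add_step("open", noop).add_step("login", noop, retries=2)
names = [step.name for step in flow.steps]
print(names)
```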

Example

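from adbflow.flow import Flow
from adbflow.utils.types import ErrorStrategy  # only needed when passing on_error explicitly
# Selector is used below; import it from the adbflow module that provides it (path not shown here).

# `device` is assumed to be an already-connected Device instance.
flow = Flow(device)

async def open_app(ctx):
    await ctx.device.apps.start_async("com.example.app")

async def check_login(ctx):
    # Guard: log in only when the dashboard is not already visible.
    return not await ctx.device.ui.exists_async(Selector().text("Dashboard"))

async def login(ctx):
    # ... login logic ...
    ctx.variables["logged_in"] = True

flow.add_step("open", open_app)
flow.add_step("login", login, condition=check_login, retries=2)

# Run inside an async function or an async REPL; run_async returns the FlowContext.
result = await flow.run_async()
print(result.variables)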