Flow¶
adbflow.flow.engine.Flow
Multi-step workflow engine with conditions, error handling, and retry logic.
Properties¶
| Property | Type | Description |
|---|---|---|
| `steps` | `list[Step]` | Registered steps |
Methods¶
| Method | Parameters | Returns | Description |
|---|---|---|---|
| `add_step` | `name, action, condition=None, on_error=ErrorStrategy.STOP, retries=0` | `Flow` | Add a step (chainable) |
| `run_async` | (none) | `FlowContext` | Execute the flow |
Parameters for add_step¶
- `name`: step identifier
- `action`: `async def(ctx: FlowContext) -> Any`
- `condition`: optional `async def(ctx: FlowContext) -> bool`. The step runs only if this returns `True`
- `on_error`: `ErrorStrategy.STOP` (default) or `ErrorStrategy.SKIP`
- `retries`: number of retry attempts on failure
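The way these parameters interact can be sketched with a small, self-contained model. This is not adbflow's implementation: `ErrorStrategy`, `Ctx`, and `run_step` below are hypothetical stand-ins, and two behaviours are assumptions rather than documented facts: `retries=N` is taken to mean one initial attempt plus up to `N` more, and `STOP` is modelled as halting the remaining steps (the real engine may raise instead).

```python
import asyncio
from dataclasses import dataclass, field
from enum import Enum


class ErrorStrategy(Enum):  # stand-in for adbflow's enum
    STOP = "stop"
    SKIP = "skip"


@dataclass
class Ctx:  # stand-in for FlowContext, without a device
    variables: dict = field(default_factory=dict)
    step_results: dict = field(default_factory=dict)


async def run_step(ctx, name, action, condition=None,
                   on_error=ErrorStrategy.STOP, retries=0):
    """Run one step; return True if the flow should continue."""
    if condition is not None and not await condition(ctx):
        return True  # guard returned False: skip this step, keep going
    for attempt in range(retries + 1):  # assumed: 1 initial try + `retries`
        try:
            ctx.step_results[name] = await action(ctx)
            return True
        except Exception:
            if attempt < retries:
                continue  # attempts remain, so try again
            # Attempts exhausted: SKIP continues the flow, STOP halts it.
            return on_error is ErrorStrategy.SKIP


async def demo():
    ctx = Ctx()
    calls = {"flaky": 0}

    async def flaky(c):
        calls["flaky"] += 1
        if calls["flaky"] < 3:
            raise RuntimeError("transient failure")
        return "ok"

    async def always_fails(c):
        raise RuntimeError("permanent failure")

    async def never(c):
        return False

    async def noop(c):
        return "ran"

    steps = [
        ("flaky", flaky, None, ErrorStrategy.STOP, 2),
        ("guarded", noop, never, ErrorStrategy.STOP, 0),
        ("broken", always_fails, None, ErrorStrategy.SKIP, 1),
        ("last", noop, None, ErrorStrategy.STOP, 0),
    ]
    for name, action, condition, on_error, retries in steps:
        if not await run_step(ctx, name, action, condition, on_error, retries):
            break
    return ctx, calls


if __name__ == "__main__":
    ctx, calls = asyncio.run(demo())
    print(ctx.step_results)
```

In this model the `flaky` step succeeds on its third attempt, `guarded` never runs because its condition returns `False`, and `broken` fails twice but does not stop the flow because it uses `SKIP`.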
FlowContext¶
adbflow.flow.step.FlowContext
Shared context passed to every step.
Fields¶
| Field | Type | Description |
|---|---|---|
| `device` | `Device` | The target device |
| `variables` | `dict[str, Any]` | Shared variables between steps |
| `step_results` | `dict[str, Any]` | Return value of each completed step |
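A rough illustration of how two steps might exchange data through the context follows. `FakeContext` is a hypothetical stand-in that mirrors the three fields above, the step functions are invented for the example, and keying `step_results` by step name is an assumption; the reference only states that it holds each completed step's return value.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeContext:  # hypothetical stand-in for FlowContext
    device: Any = None
    variables: dict[str, Any] = field(default_factory=dict)
    step_results: dict[str, Any] = field(default_factory=dict)


async def read_version(ctx):
    # The return value is stored in step_results (assumed key: step name).
    return "1.4.2"


async def decide_upgrade(ctx):
    # A later step reads an earlier step's return value...
    major = int(ctx.step_results["read_version"].split(".")[0])
    # ...and publishes a flag for subsequent steps through variables.
    ctx.variables["needs_upgrade"] = major < 2
    return major


async def main():
    ctx = FakeContext()
    # Minimal driver: run steps in order and record each return value.
    for name, action in [("read_version", read_version),
                         ("decide_upgrade", decide_upgrade)]:
        ctx.step_results[name] = await action(ctx)
    return ctx


if __name__ == "__main__":
    ctx = asyncio.run(main())
    print(ctx.variables)
    print(ctx.step_results)
```

Here `variables` carries values a step sets explicitly, while `step_results` is filled in by the driver from whatever each step returns.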
Step¶
adbflow.flow.step.Step
Fields¶
| Field | Type | Description |
|---|---|---|
| `name` | `str` | Step identifier |
| `action` | `Callable[[FlowContext], Coroutine]` | The async action |
| `condition` | `Callable \| None` | Optional guard condition |
| `on_error` | `ErrorStrategy` | Error handling strategy |
| `retries` | `int` | Retry count |
Example¶
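```python
from adbflow.flow import Flow
from adbflow.utils.types import ErrorStrategy

# `device` is an already-connected Device. `Selector` must also be imported;
# its import path is not shown in this reference.
flow = Flow(device)

async def open_app(ctx):
    await ctx.device.apps.start_async("com.example.app")

async def check_login(ctx):
    # Guard: run the login step only when the dashboard is not yet visible.
    return not await ctx.device.ui.exists_async(Selector().text("Dashboard"))

async def login(ctx):
    # ... login logic ...
    ctx.variables["logged_in"] = True

flow.add_step("open", open_app)
flow.add_step("login", login, condition=check_login, retries=2)

# Must be awaited from inside a coroutine (or an async REPL).
result = await flow.run_async()
print(result.variables)
```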