Skip to content

Flow Engine

The flow engine lets you build multi-step automation workflows with conditions, error handling, and retry logic.

Basic Flow

from adbflow.flow import Flow, FlowContext
from adbflow.utils.types import ErrorStrategy

async def basic_flow(device):
    flow = Flow(device)

    async def open_app(ctx: FlowContext):
        await ctx.device.apps.start_async("com.example.app")

    async def login(ctx: FlowContext):
        username = await ctx.device.ui.wait_for_async(
            Selector().resource_id("com.example:id/username")
        )
        await username.tap_async()
        await username.text_input_async("user@test.com")

        password = await ctx.device.ui.find_async(
            Selector().resource_id("com.example:id/password")
        )
        await password.tap_async()
        await password.text_input_async("password123")

        await ctx.device.ui.find_async(
            Selector().text("Login").clickable()
        ).tap_async()

    async def verify(ctx: FlowContext):
        await ctx.device.wait_for_text_async("Dashboard", timeout=10.0)

    flow.add_step("open_app", open_app)
    flow.add_step("login", login)
    flow.add_step("verify", verify)

    result = await flow.run_async()

Conditional Steps

Skip steps based on runtime conditions:

async def conditional_flow(device):
    flow = Flow(device)

    async def check_logged_in(ctx: FlowContext) -> bool:
        return await ctx.device.ui.exists_async(
            Selector().text("Dashboard")
        )

    async def login(ctx: FlowContext):
        # ... login logic ...
        pass

    async def navigate(ctx: FlowContext):
        # ... navigate to feature ...
        pass

    # Login step only runs if not already logged in
    flow.add_step("login", login, condition=check_logged_in)
    flow.add_step("navigate", navigate)

    await flow.run_async()

Error Strategies

Control what happens when a step fails:

async def error_handling_flow(device):
    flow = Flow(device)

    async def risky_step(ctx: FlowContext):
        # This might fail
        await ctx.device.ui.wait_for_async(
            Selector().text("Unstable Element"),
            timeout=3.0
        )

    async def cleanup(ctx: FlowContext):
        await ctx.device.gestures.key_async(KeyCode.HOME)

    # Stop the entire flow on failure (default)
    flow.add_step("critical", risky_step, on_error=ErrorStrategy.STOP)

    # Skip this step on failure, continue with next
    flow.add_step("optional", risky_step, on_error=ErrorStrategy.SKIP)

    # Retry up to 3 times before failing
    flow.add_step("flaky", risky_step, on_error=ErrorStrategy.STOP, retries=3)

    flow.add_step("cleanup", cleanup)

    await flow.run_async()

Shared Variables

Steps share data through FlowContext.variables:

async def shared_state_flow(device):
    flow = Flow(device)

    async def get_username(ctx: FlowContext):
        element = await ctx.device.ui.find_async(
            Selector().resource_id("com.example:id/username_display")
        )
        ctx.variables["username"] = element.get_text()

    async def use_username(ctx: FlowContext):
        name = ctx.variables["username"]
        print(f"Logged in as: {name}")

    flow.add_step("get_username", get_username)
    flow.add_step("use_username", use_username)

    result = await flow.run_async()
    print(result.variables)        # shared variables
    print(result.step_results)     # per-step results

Tips

  • Use ErrorStrategy.SKIP for non-critical steps (e.g., dismissing optional dialogs).
  • Use retries for steps that may fail due to timing (e.g., waiting for network responses).
  • Keep steps small and focused — each step should do one thing.
  • Use FlowContext.variables to pass data between steps instead of closures.