Skip to content

Logcat

Stream, filter, and capture Android log output. Access it via `device.logcat`.

Stream Logs

async def stream_example(device):
    """Print every entry from an unfiltered logcat stream.

    Each entry is written to stdout as ``[level] tag: message``.
    """
    unfiltered = device.logcat.stream_async()
    async for record in unfiltered:
        line = f"[{record.level}] {record.tag}: {record.message}"
        print(line)

Filter Logs

from adbflow.utils.types import LogLevel

async def filter_examples(device):
    """Demonstrate each filter keyword accepted by ``stream_async``.

    The loops run one after another; every stream is consumed to its end
    before the next one is opened.
    """
    # By tag
    by_tag = device.logcat.stream_async(tags=["MyApp", "ActivityManager"])
    async for record in by_tag:
        print(record.message)

    # By log level
    by_level = device.logcat.stream_async(level=LogLevel.ERROR)
    async for record in by_level:
        print(f"{record.tag}: {record.message}")

    # By process ID
    by_pid = device.logcat.stream_async(pid=12345)
    async for record in by_pid:
        print(record.message)

    # By regular expression
    by_pattern = device.logcat.stream_async(pattern="Exception|Error")
    async for record in by_pattern:
        print(f"{record.tag}: {record.message}")

    # From a starting timestamp
    by_time = device.logcat.stream_async(since="2024-01-01 12:00:00.000")
    async for record in by_time:
        print(record.message)

    # Several filters in a single call
    combined = device.logcat.stream_async(tags=["MyApp"], level=LogLevel.WARNING)
    async for record in combined:
        print(record.message)

Capture to File

async def capture_examples(device):
    """Demonstrate the ways ``capture_async`` can bound a capture to a file.

    Three captures run in sequence: one limited by duration, one by entry
    count, and one that adds tag and level filters.
    """
    # Time-boxed capture (duration is in seconds)
    await device.logcat.capture_async(output_path="/local/logs.txt", duration=10.0)

    # Capture limited to a fixed number of entries
    await device.logcat.capture_async(output_path="/local/logs.txt", count=1000)

    # Filtered capture
    error_filters = {"tags": ["MyApp"], "level": LogLevel.ERROR}
    await device.logcat.capture_async(
        output_path="/local/errors.txt",
        **error_filters,
        duration=30.0,
    )

Clear Logs

async def clear_example(device):
    """Clear the device's log buffer via ``device.logcat``."""
    logcat = device.logcat
    await logcat.clear_async()

Crash Detection

Monitor for app crashes in the background:

async def crash_detect_example(device):
    """Monitor ``com.example.app`` for crashes while a test runs, then stop.

    A background monitor is started through
    ``device.logcat.crash_detect_async``; each detected crash is reported to
    ``on_crash``, which prints the crashing package and signal. The monitor
    is always shut down before this coroutine returns, even if the test body
    raises.
    """
    import asyncio
    import contextlib

    def on_crash(crash_info):
        print(f"CRASH: {crash_info.package}")
        print(f"Signal: {crash_info.signal}")

    # Start monitoring (returns an asyncio.Task)
    task = await device.logcat.crash_detect_async(
        callback=on_crash,
        packages=["com.example.app"],
    )

    try:
        pass  # ... run your test ...
    finally:
        # cancel() only *requests* cancellation; awaiting the task lets it
        # actually unwind, so no pending task is left behind on return.
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task

Tips

  • Log streams are async iterators — use `async for` to consume them.
  • Always limit streams with filters in production to avoid overwhelming output.
  • Use `crash_detect_async` during test runs to catch unexpected crashes, and cancel the returned task when the run ends.
  • `clear_async()` is useful before a test to start with a clean log buffer.