#!/usr/bin/env python3
"""speedtest-hd — a CrystalDiskMark-style storage benchmark for Linux, on fio.

This is the Python successor to ``speedtest-hd.sh``. It measures storage the way
CrystalDiskMark does, and adds a dedicated **SLOG / sync-write latency profile**
for diagnosing ZFS ZIL performance (NFS / iSCSI / VM sync workloads).

Three profiles
--------------
* **cdm**  — the four CrystalDiskMark default tests (``SEQ1M Q8T1``, ``SEQ1M
  Q1T1``, ``RND4K Q32T16``, ``RND4K Q1T1``), each measured for read *and*
  write, reported in MB/s and IOPS.
* **slog** — synchronous 4K random writes swept across thread counts, reporting
  IOPS, MB/s and p50/p99 *commit latency*. This is the load a ZFS SLOG sees.
* **dd**   — a dependency-free fallback when ``fio`` isn't installed.

Why fio JSON?
-------------
Every measurement asks fio for ``--output-format=json`` and parses it with the
standard library. That's the whole reason this is Python and not bash: robust,
unit-safe parsing of bandwidth / IOPS / latency percentiles with no fragile
text scraping.

See README.md for the full case study (diagnosing a "slow" Optane SLOG that
turned out to be CPU power management, not the disk).
"""

from __future__ import annotations

import argparse
import glob
import json
import os
import shutil
import statistics
import subprocess
import sys
import time
from dataclasses import dataclass, field
from typing import Callable, Optional, Sequence

# --------------------------------------------------------------------------- #
# Color / styling (standard library only -- raw ANSI escape codes)
# --------------------------------------------------------------------------- #

# SGR escape sequences. We stick to the basic 16-color set so output respects
# the user's terminal theme instead of hard-coding RGB.
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
ITALIC = "\033[3m"
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
WHITE = "\033[37m"
BRIGHT_CYAN = "\033[96m"


def supports_color(stream) -> bool:
    """Decide per-stream whether to emit ANSI codes.

    Honors the de-facto ``NO_COLOR`` / ``FORCE_COLOR`` conventions and otherwise
    colors only when the stream is an interactive terminal.
    """
    if os.environ.get("NO_COLOR"):
        return False
    if os.environ.get("FORCE_COLOR"):
        return True
    if os.environ.get("TERM") == "dumb":
        return False
    return bool(getattr(stream, "isatty", None)) and stream.isatty()


class Painter:
    """Wraps text in SGR codes, or returns it untouched when color is off."""

    def __init__(self, enabled: bool):
        self.enabled = enabled

    def paint(self, text: str, *codes: str) -> str:
        if not self.enabled or not codes:
            return text
        return "".join(codes) + text + RESET


# stdout carries results (banner + tables); stderr carries progress + verbose
# fio dumps. Each gets its own enable flag so piping one but not the other works.
OUT = Painter(supports_color(sys.stdout))
ERR = Painter(supports_color(sys.stderr))


# --------------------------------------------------------------------------- #
# Constants
# --------------------------------------------------------------------------- #

# Preference order for the IO engine. io_uring is the modern, lowest-overhead
# Linux engine; libaio is older (and only truly async with O_DIRECT); posixaio
# and sync are the portable fallbacks (e.g. some NFS mounts).
ENGINE_CANDIDATES = ("io_uring", "libaio", "posixaio", "sync")

# Binary unit multipliers (fio treats "1g" as 1 GiB, so we match that).
_SIZE_UNITS = {"k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4}


@dataclass(frozen=True)
class CdmTest:
    """One CrystalDiskMark test. ``seq`` picks sequential vs random patterns."""

    label: str
    bs: str
    iodepth: int
    numjobs: int
    seq: bool

    @property
    def read_rw(self) -> str:
        return "read" if self.seq else "randread"

    @property
    def write_rw(self) -> str:
        return "write" if self.seq else "randwrite"


# The four CrystalDiskMark default tests, as data, in CrystalDiskMark's own
# display order. Q = queue depth (iodepth), T = threads (numjobs).
CDM_TESTS: tuple[CdmTest, ...] = (
    CdmTest("SEQ1M  Q8T1",   bs="1m", iodepth=8,  numjobs=1,  seq=True),
    CdmTest("SEQ1M  Q1T1",   bs="1m", iodepth=1,  numjobs=1,  seq=True),
    CdmTest("RND4K  Q32T16", bs="4k", iodepth=32, numjobs=16, seq=False),
    CdmTest("RND4K  Q1T1",   bs="4k", iodepth=1,  numjobs=1,  seq=False),
)

# Thread counts for the SLOG sweep. T1 is the headline single-stream latency;
# the higher counts show how the SLOG scales as concurrent sync writers pile on.
SLOG_THREADS: tuple[int, ...] = (1, 4, 8, 16)


# --------------------------------------------------------------------------- #
# Configuration
# --------------------------------------------------------------------------- #


@dataclass
class Config:
    """Resolved run settings. ``engine`` and ``direct`` may start unset and get
    filled in by :func:`detect_io_settings`."""

    path: str
    mode: Optional[str]            # "cdm" | "slog" | "dd" | None (auto)
    engine: Optional[str]          # None => auto-detect
    direct: Optional[bool]         # None => auto-detect
    runtime: int
    size: str
    verbose: bool
    assume_yes: bool
    benchfile: str = field(default="")

    @property
    def direct_flag(self) -> int:
        """fio's --direct takes 0/1; default to 0 until detection runs."""
        return 1 if self.direct else 0


# --------------------------------------------------------------------------- #
# Small helpers
# --------------------------------------------------------------------------- #


def log(message: str) -> None:
    """Progress/diagnostic line -> stderr, so stdout stays pure results."""
    print(message, file=sys.stderr, flush=True)


def step(message: str) -> None:
    """A single colored progress line ('measuring ...') -> stderr."""
    log(f"  {ERR.paint('▶', BOLD, CYAN)} {ERR.paint(message, DIM)}")


def vsection(title: str) -> None:
    """A clear, ruled header for one --verbose fio section -> stderr."""
    width = 78
    head = f"─── {title} "
    head += "─" * max(3, width - len(head))
    log("")
    log(ERR.paint(head, BOLD, CYAN))


def parse_size_bytes(size: str) -> int:
    """Turn an fio-style size ("4k", "1g", "512m", "2048") into bytes."""
    s = size.strip().lower()
    if s and s[-1] in _SIZE_UNITS:
        return int(float(s[:-1]) * _SIZE_UNITS[s[-1]])
    return int(s)


def render_table(
    headers: Sequence[str],
    rows: Sequence[Sequence[str]],
    aligns: Optional[Sequence[str]] = None,
    col_styles: Optional[Sequence[Optional[str]]] = None,
) -> str:
    """Render a bordered ASCII table whose columns auto-size to their content.

    ``aligns`` is a per-column "<" (left) or ">" (right); by default the first
    column is left-aligned (the label) and the rest are right-aligned (numbers).
    ``col_styles`` is an optional per-column ANSI style applied to body cells
    (header is always bold, borders dim). Widths are computed on the *plain*
    text and color is applied after padding, so escape codes never skew layout.
    """
    ncols = len(headers)
    if aligns is None:
        aligns = ["<"] + [">"] * (ncols - 1)
    if col_styles is None:
        col_styles = [None] * ncols

    # Widest cell (header or any row) sets each column's width.
    widths = [
        max([len(str(headers[c]))] + [len(str(r[c])) for r in rows])
        for c in range(ncols)
    ]

    bar = OUT.paint("|", DIM)
    sep = OUT.paint(" | ", DIM)

    def rule() -> str:
        return OUT.paint("+" + "+".join("-" * (w + 2) for w in widths) + "+", DIM)

    def line(cells: Sequence[str], *, header: bool = False) -> str:
        parts = []
        for c in range(ncols):
            padded = f"{str(cells[c]):{aligns[c]}{widths[c]}}"
            if header:
                parts.append(OUT.paint(padded, BOLD))
            elif col_styles[c]:
                parts.append(OUT.paint(padded, col_styles[c]))
            else:
                parts.append(padded)
        return f"{bar} " + sep.join(parts) + f" {bar}"

    out = [rule(), line(headers, header=True), rule()]
    out += [line(r) for r in rows]
    out.append(rule())
    return "\n".join(out)


# --------------------------------------------------------------------------- #
# fio: results, invocation, parsing
# --------------------------------------------------------------------------- #


@dataclass(frozen=True)
class FioResult:
    """Parsed metrics for one direction (read or write) of an fio run.

    Latencies are completion latency (clat) in microseconds; for synchronous
    writes that is effectively the durable-commit latency.
    """

    bw_mbps: float        # decimal MB/s, matching CrystalDiskMark's SI figure
    iops: float
    p50_us: float
    p99_us: float
    mean_us: float

    @classmethod
    def zero(cls) -> "FioResult":
        return cls(0.0, 0.0, 0.0, 0.0, 0.0)


def _direction_of(rw: str) -> str:
    """fio reports 'read' and 'write' sub-objects; pick the active one."""
    return "read" if "read" in rw else "write"


def _aggregate(jobs: list[dict], direction: str) -> FioResult:
    """Combine per-job fio JSON stats into a single :class:`FioResult`.

    We deliberately run fio *without* --group_reporting and aggregate ourselves,
    which sidesteps fio's group-merge quirks: sum throughput, average the median
    latency, and take the worst-case tail (p99) across jobs.
    """

    def section(job: dict) -> dict:
        return job.get(direction, {})

    bw_mbps = sum(section(j).get("bw_bytes", 0) for j in jobs) / 1e6
    iops = sum(section(j).get("iops", 0.0) for j in jobs)

    def percentiles(key: str) -> list[float]:
        vals = []
        for j in jobs:
            pct = section(j).get("clat_ns", {}).get("percentile", {})
            v = pct.get(key)
            if v is not None:
                vals.append(v)
        return vals

    p50_ns = percentiles("50.000000")
    p99_ns = percentiles("99.000000")
    mean_ns = [section(j).get("clat_ns", {}).get("mean", 0.0) for j in jobs]

    return FioResult(
        bw_mbps=bw_mbps,
        iops=iops,
        p50_us=(statistics.mean(p50_ns) / 1000.0) if p50_ns else 0.0,   # avg of medians
        p99_us=(max(p99_ns) / 1000.0) if p99_ns else 0.0,               # worst tail
        mean_us=(statistics.mean(mean_ns) / 1000.0) if mean_ns else 0.0,
    )


def run_fio(
    cfg: Config,
    *,
    rw: str,
    bs: str,
    iodepth: int,
    numjobs: int,
    engine: Optional[str] = None,
    direct: Optional[bool] = None,
    sync: bool = False,
    end_fsync: bool = False,
) -> FioResult:
    """Run a single fio job against the shared bench file and parse its JSON.

    ``engine``/``direct`` default to the detected config values; the SLOG path
    overrides them (psync + buffered + O_SYNC). ``end_fsync`` flushes the device
    cache at the end of a write run so cached writes can't inflate the result.
    """
    engine = engine if engine is not None else cfg.engine
    direct = direct if direct is not None else cfg.direct

    cmd = [
        "sudo", "fio",
        "--name=speedtest",
        f"--filename={cfg.benchfile}",
        f"--ioengine={engine}",
        f"--direct={1 if direct else 0}",
        f"--rw={rw}",
        f"--bs={bs}",
        f"--size={cfg.size}",
        f"--numjobs={numjobs}",
        f"--iodepth={iodepth}",
        "--time_based",
        f"--runtime={cfg.runtime}",
        "--output-format=json",
    ]
    if sync:
        cmd.append("--sync=1")        # O_SYNC: every write is a durable commit
    if end_fsync:
        cmd.append("--end_fsync=1")   # flush device cache once at the end

    proc = subprocess.run(cmd, capture_output=True, text=True)

    if cfg.verbose:
        sync_tag = "  sync=1" if sync else ""
        vsection(f"fio · {rw}  bs={bs}  iodepth={iodepth}  numjobs={numjobs}{sync_tag}")
        log("  " + ERR.paint("$ " + " ".join(cmd[1:]), DIM, ITALIC))
        log(ERR.paint(proc.stdout.strip() or "(no stdout)", DIM))
        if proc.stderr.strip():
            log(ERR.paint(proc.stderr.strip(), YELLOW))

    try:
        data = json.loads(proc.stdout)
        return _aggregate(data["jobs"], _direction_of(rw))
    except (json.JSONDecodeError, KeyError, ValueError):
        # fio failed or produced no parseable JSON; report zeros rather than
        # crashing the whole run (verbose mode above shows what went wrong).
        return FioResult.zero()


def fio_probe(path: str, engine: str, direct: bool) -> bool:
    """Throwaway 1s fio job to see if an (engine, O_DIRECT) combo works here.

    This is how we stay accurate *and* portable across ext4/xfs/btrfs/ZFS/NFS
    without the caller needing to know what each filesystem supports.
    """
    cmd = [
        "sudo", "fio", "--name=probe",
        f"--directory={path}",
        f"--ioengine={engine}",
        "--rw=write", "--bs=4k", "--size=1m",
        f"--direct={1 if direct else 0}",
        "--time_based", "--runtime=1",
    ]
    proc = subprocess.run(cmd, capture_output=True, text=True)
    for leftover in glob.glob(os.path.join(path, "probe*")):
        try:
            os.remove(leftover)
        except OSError:
            pass
    return proc.returncode == 0


def detect_io_settings(cfg: Config) -> None:
    """Fill in ``cfg.engine`` and ``cfg.direct`` if the user didn't force them.

    O_DIRECT bypasses the OS page cache so we measure the device, not RAM. Not
    every filesystem supports it (older OpenZFS, some NFS), so we probe and fall
    back to buffered rather than erroring out.
    """
    if cfg.engine is None:
        for engine in ENGINE_CANDIDATES:
            if fio_probe(cfg.path, engine, direct=False):
                cfg.engine = engine
                break
        else:
            cfg.engine = "sync"

    if cfg.direct is None:
        cfg.direct = fio_probe(cfg.path, cfg.engine, direct=True)


# --------------------------------------------------------------------------- #
# Profiles
# --------------------------------------------------------------------------- #


def _meta_line(text: str) -> str:
    """Color the 'Label :' prefix of a banner metadata line, leave value as-is.

    The value may already contain its own ANSI codes (e.g. a red O_DIRECT
    warning); since these lines aren't width-aligned that's harmless.
    """
    key, sep, val = text.partition(":")
    if not sep:
        return "  " + text
    return "  " + OUT.paint(key + ":", BOLD, BRIGHT_CYAN) + val


def banner(title: str, cfg: Config, extra: Sequence[str] = ()) -> None:
    full = f"speedtest-hd  ·  {title}"
    inner = len(full) + 4

    print()
    print(OUT.paint("╭" + "─" * inner + "╮", BOLD, CYAN))
    print(
        OUT.paint("│", BOLD, CYAN)
        + "  " + OUT.paint(full, BOLD, WHITE) + "  "
        + OUT.paint("│", BOLD, CYAN)
    )
    print(OUT.paint("╰" + "─" * inner + "╯", BOLD, CYAN))
    print(_meta_line(f"Target  : {OUT.paint(cfg.path, CYAN)}"))
    for line_ in extra:
        print(_meta_line(line_))
    print()


def cdm_profile(cfg: Config) -> None:
    """The CrystalDiskMark-style profile: 4 tests x (read, write), MB/s + IOPS."""
    detect_io_settings(cfg)

    if cfg.direct:
        direct_label = OUT.paint("enabled (device)", GREEN)
    else:
        direct_label = OUT.paint(
            "DISABLED (buffered -- may reflect RAM cache!)", BOLD, RED
        )

    banner(
        "CrystalDiskMark-style storage benchmark",
        cfg,
        extra=[
            f"Engine  : {cfg.engine}    O_DIRECT: {direct_label}",
            f"Profile : size={cfg.size.upper()}  runtime={cfg.runtime}s/run  (8 runs)",
        ],
    )

    rows: list[list[str]] = []
    for test in CDM_TESTS:
        step(f"measuring {test.label.strip()} ...")
        r = run_fio(cfg, rw=test.read_rw, bs=test.bs,
                    iodepth=test.iodepth, numjobs=test.numjobs)
        w = run_fio(cfg, rw=test.write_rw, bs=test.bs,
                    iodepth=test.iodepth, numjobs=test.numjobs, end_fsync=True)
        rows.append([
            test.label,
            f"{r.bw_mbps:.2f}", f"{w.bw_mbps:.2f}",
            f"{r.iops:.0f}", f"{w.iops:.0f}",
        ])

    _cleanup(cfg)

    print()
    print(render_table(
        ["Test", "Read (MB/s)", "Write (MB/s)", "Read (IOPS)", "Write (IOPS)"],
        rows,
        col_styles=[CYAN, GREEN, YELLOW, GREEN, YELLOW],
    ))
    print()


def slog_profile(cfg: Config) -> None:
    """SLOG / sync-write latency profile.

    Forces synchronous 4K random writes (O_SYNC) via the portable psync engine,
    so every write is a ZIL commit -- exercising a ZFS SLOG exactly the way a
    sync=always dataset (NFS/iSCSI/VM) does, regardless of the dataset's own
    sync property. Engine/direct detection is only for the banner here.
    """
    detect_io_settings(cfg)

    banner(
        "SLOG / sync-write latency profile",
        cfg,
        extra=[
            "Method  : fio randwrite bs=4k --sync=1 (O_SYNC), psync engine",
            f"Profile : runtime={cfg.runtime}s/run  size={cfg.size.upper()}",
            "Note    : every write is a synchronous ZIL commit -- the load your",
            "          SLOG actually sees. Watch it live in another shell with:",
            "            zpool iostat -vl <pool> 1",
        ],
    )

    # Measure everything first (progress -> stderr), then draw the whole table,
    # so the "measuring..." lines can't interleave into the middle of it.
    rows: list[list[str]] = []
    for threads in SLOG_THREADS:
        step(f"measuring 4K sync randwrite T{threads} ...")
        res = run_fio(
            cfg, rw="randwrite", bs="4k", iodepth=1, numjobs=threads,
            engine="psync", direct=False, sync=True,
        )
        rows.append([
            f"4K sync T{threads}",
            f"{res.iops:.0f}", f"{res.bw_mbps:.2f}",
            f"{res.p50_us:.1f}", f"{res.p99_us:.1f}",
        ])

    _cleanup(cfg)

    print()
    print(render_table(
        ["Test", "IOPS", "MB/s", "p50 lat(us)", "p99 lat(us)"],
        rows,
        col_styles=[CYAN, GREEN, GREEN, YELLOW, MAGENTA],
    ))
    print()
    print(OUT.paint("  Healthy Optane SLOG (eg P1600X) single-stream (T1) target:", BOLD))
    print(OUT.paint("    ~15-25k IOPS, p50 latency ~40-65us.", GREEN)
          + OUT.paint(" Much higher latency usually means", DIM))
    print(OUT.paint("    CPU C-states / PCIe ASPM / BIOS power profile (eg Dell DAPC) throttling.", DIM))
    print()


def dd_profile(cfg: Config) -> None:
    """Dependency-free fallback when fio isn't installed.

    Far cruder than fio: a single sequential stream, and the cached-read figure
    will reflect RAM. Use it only as a rough sanity check.
    """
    count_mib = max(1, parse_size_bytes(cfg.size) // (1024**2))
    nbytes = count_mib * 1024**2

    banner(
        "basic dd benchmark (fio not installed)",
        cfg,
        extra=[f"Profile : {count_mib} MiB sequential stream"],
    )

    def timed_dd(args: list[str]) -> float:
        start = time.monotonic()
        subprocess.run(args, capture_output=True, text=True)
        elapsed = time.monotonic() - start
        return (nbytes / elapsed / 1e6) if elapsed > 0 else 0.0  # decimal MB/s

    step("measuring sequential write ...")
    write_mbps = timed_dd([
        "dd", "if=/dev/zero", f"of={cfg.benchfile}",
        "bs=1M", f"count={count_mib}", "conv=fdatasync,notrunc",
    ])

    step("dropping caches for uncached read ...")
    subprocess.run(["sudo", "sh", "-c", "echo 3 > /proc/sys/vm/drop_caches"],
                   capture_output=True, text=True)

    step("measuring uncached read ...")
    uncached_mbps = timed_dd([
        "dd", f"if={cfg.benchfile}", "of=/dev/null", "bs=1M", f"count={count_mib}",
    ])

    step("measuring cached read ...")
    cached_mbps = timed_dd([
        "dd", f"if={cfg.benchfile}", "of=/dev/null", "bs=1M", f"count={count_mib}",
    ])

    _cleanup(cfg)

    print()
    print(render_table(
        ["Test", "MB/s"],
        [
            ["Sequential write", f"{write_mbps:.2f}"],
            ["Uncached read", f"{uncached_mbps:.2f}"],
            ["Cached read (RAM)", f"{cached_mbps:.2f}"],
        ],
        col_styles=[CYAN, GREEN],
    ))
    print()


def _cleanup(cfg: Config) -> None:
    """Remove the shared bench file."""
    try:
        os.remove(cfg.benchfile)
    except OSError:
        pass


# --------------------------------------------------------------------------- #
# CLI
# --------------------------------------------------------------------------- #


def build_parser() -> argparse.ArgumentParser:
    p = argparse.ArgumentParser(
        prog="speedtest-hd.py",
        description="CrystalDiskMark-style storage benchmark built on fio.",
        epilog=(
            "Examples:\n"
            "  speedtest-hd.py .\n"
            "  speedtest-hd.py /mnt/nvmepool --runtime=10 --size=4g\n"
            "  speedtest-hd.py /mnt/nfsshare --buffered\n"
            "  speedtest-hd.py /mnt/nvme-ultra-r10/vm-root --slog --runtime=30\n"
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    p.add_argument("path", help="directory/mount to benchmark ('.' for cwd)")

    mode = p.add_mutually_exclusive_group()
    mode.add_argument("--fio", action="store_const", const="cdm", dest="mode",
                      help="force the fio CrystalDiskMark-style profile")
    mode.add_argument("--dd", action="store_const", const="dd", dest="mode",
                      help="force the basic dd fallback test")
    mode.add_argument("--slog", action="store_const", const="slog", dest="mode",
                      help="SLOG / sync-write latency profile (ZFS ZIL)")

    direct = p.add_mutually_exclusive_group()
    direct.add_argument("--direct", action="store_const", const=True, dest="direct",
                        help="force O_DIRECT (bypass page cache)")
    direct.add_argument("--buffered", action="store_const", const=False, dest="direct",
                        help="force buffered IO (e.g. if O_DIRECT unsupported)")

    p.add_argument("--engine", choices=ENGINE_CANDIDATES,
                   help="force a specific IO engine (default: auto-detect)")
    p.add_argument("--runtime", type=int, default=5, metavar="SEC",
                   help="seconds per run (default: 5, like CrystalDiskMark)")
    p.add_argument("--size", default="1g", metavar="SIZE",
                   help="test file size (default: 1g)")
    p.add_argument("--verbose", action="store_true",
                   help="also print the full fio output for every run")
    p.add_argument("-y", "--yes", action="store_true", dest="assume_yes",
                   help="skip the confirmation prompt")

    p.set_defaults(mode=None, direct=None)
    return p


def confirm(cfg: Config) -> None:
    """Guard prompt -- we're about to write a multi-GB file to the target."""
    if cfg.assume_yes:
        return
    print(OUT.paint("NOTICE:", BOLD, YELLOW)
          + f" {cfg.size.upper()} free space on "
          + OUT.paint(f"'{cfg.path}'", CYAN)
          + " is required to perform the benchmark.")
    answer = input(f"Are you ready to start a storage benchmark against "
                   f"'{cfg.path}' ? ")
    if not answer.strip().lower().startswith("y"):
        print(OUT.paint("Ok, cancelled!", YELLOW))
        sys.exit(0)
    print(OUT.paint("Great! Starting benchmark now!", BOLD, GREEN))


def main(argv: Optional[Sequence[str]] = None) -> int:
    args = build_parser().parse_args(argv)

    path = os.getcwd() if args.path == "." else args.path
    if not os.path.exists(path):
        print(f"Path {path} does not exist", file=sys.stderr)
        return 1

    cfg = Config(
        path=path,
        mode=args.mode,
        engine=args.engine,
        direct=args.direct,
        runtime=args.runtime,
        size=args.size,
        verbose=args.verbose,
        assume_yes=args.assume_yes,
        benchfile=os.path.join(path, "speedtest-hd.bench"),
    )

    have_fio = shutil.which("fio") is not None

    # Resolve the mode: explicit flag wins; otherwise fio if available, else dd.
    mode = cfg.mode
    if mode in ("cdm", "slog") and not have_fio:
        print(ERR.paint("ERROR:", BOLD, RED)
              + " --fio/--slog require fio (apt install fio / pacman -S fio).",
              file=sys.stderr)
        return 1
    if mode is None:
        if have_fio:
            mode = "cdm"
        else:
            print(OUT.paint("\nfio is not installed -- falling back to basic dd test.",
                            YELLOW))
            print(OUT.paint("Install fio for the full CrystalDiskMark-style benchmark.",
                            DIM))
            mode = "dd"

    confirm(cfg)

    # Dictionary dispatch: map each mode name to its handler function (the values
    # are the functions themselves -- no parentheses), look up the one for `mode`,
    # then call it with (cfg). Equivalent to an if/elif chain over `mode`. `mode`
    # is always one of these three keys by now, so the lookup can't KeyError.
    {"cdm": cdm_profile, "slog": slog_profile, "dd": dd_profile}[mode](cfg)
    return 0


if __name__ == "__main__":
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        print(ERR.paint("\nInterrupted.", YELLOW), file=sys.stderr)
        sys.exit(130)
