#!/usr/bin/env python3
"""speedtest-hd — a CrystalDiskMark-style storage benchmark for Linux, on fio.

This is the Python successor to ``speedtest-hd.sh``. It measures storage the
way CrystalDiskMark does, and adds a dedicated **SLOG / sync-write latency
profile** for diagnosing ZFS ZIL performance (NFS / iSCSI / VM sync workloads).

Three profiles
--------------
* **cdm** — the four CrystalDiskMark default tests (``SEQ1M Q8T1``,
  ``SEQ1M Q1T1``, ``RND4K Q32T16``, ``RND4K Q1T1``), each measured for read
  *and* write, reported in MB/s and IOPS.
* **slog** — synchronous 4K random writes swept across thread counts,
  reporting IOPS, MB/s and p50/p99 *commit latency*. This is the load a ZFS
  SLOG sees.
* **dd** — a dependency-free fallback when ``fio`` isn't installed.

Why fio JSON?
-------------
Every measurement asks fio for ``--output-format=json`` and parses it with the
standard library. That's the whole reason this is Python and not bash: robust,
unit-safe parsing of bandwidth / IOPS / latency percentiles with no fragile
text scraping.

See README.md for the full case study (diagnosing a "slow" Optane SLOG that
turned out to be CPU power management, not the disk).
"""

from __future__ import annotations

import argparse
import glob
import json
import os
import shlex
import shutil
import statistics
import subprocess
import sys
import time
from dataclasses import dataclass, field
from typing import Callable, Optional, Sequence

# --------------------------------------------------------------------------- #
# Color / styling (standard library only -- raw ANSI escape codes)
# --------------------------------------------------------------------------- #

# SGR escape sequences. We stick to the basic 16-color set so output respects
# the user's terminal theme instead of hard-coding RGB.
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
ITALIC = "\033[3m"
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
WHITE = "\033[37m"
BRIGHT_CYAN = "\033[96m"


def supports_color(stream) -> bool:
    """Decide per-stream whether to emit ANSI codes.

    Honors the de-facto ``NO_COLOR`` / ``FORCE_COLOR`` conventions and
    otherwise colors only when the stream is an interactive terminal.
    """
    if os.environ.get("NO_COLOR"):
        return False
    if os.environ.get("FORCE_COLOR"):
        return True
    if os.environ.get("TERM") == "dumb":
        return False
    return bool(getattr(stream, "isatty", None)) and stream.isatty()


class Painter:
    """Wraps text in SGR codes, or returns it untouched when color is off."""

    def __init__(self, enabled: bool):
        self.enabled = enabled

    def paint(self, text: str, *codes: str) -> str:
        """Return ``text`` wrapped in ``codes`` + RESET, or as-is if disabled."""
        if not self.enabled or not codes:
            return text
        return "".join(codes) + text + RESET


# stdout carries results (banner + tables); stderr carries progress + verbose
# fio dumps. Each gets its own enable flag so piping one but not the other works.
OUT = Painter(supports_color(sys.stdout))
ERR = Painter(supports_color(sys.stderr))


# --------------------------------------------------------------------------- #
# Constants
# --------------------------------------------------------------------------- #

# Preference order for the IO engine. io_uring is the modern, lowest-overhead
# Linux engine; libaio is older (and only truly async with O_DIRECT); posixaio
# and sync are the portable fallbacks (e.g. some NFS mounts).
ENGINE_CANDIDATES = ("io_uring", "libaio", "posixaio", "sync")

# Binary unit multipliers (fio treats "1g" as 1 GiB, so we match that).
_SIZE_UNITS = {"k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4}

# fio job name for the throwaway capability probe. fio names the files it lays
# out "<jobname>.<jobnum>.<filenum>", so a distinctive name keeps the probe's
# cleanup from ever matching a user's own files in the target directory.
_PROBE_JOB = "speedtest-hd-probe"


@dataclass(frozen=True)
class CdmTest:
    """One CrystalDiskMark test. ``seq`` picks sequential vs random patterns."""

    label: str
    bs: str
    iodepth: int
    numjobs: int
    seq: bool

    @property
    def read_rw(self) -> str:
        """fio ``--rw`` value for the read half of this test."""
        return "read" if self.seq else "randread"

    @property
    def write_rw(self) -> str:
        """fio ``--rw`` value for the write half of this test."""
        return "write" if self.seq else "randwrite"


# The four CrystalDiskMark default tests, as data, in CrystalDiskMark's own
# display order. Q = queue depth (iodepth), T = threads (numjobs).
CDM_TESTS: tuple[CdmTest, ...] = (
    CdmTest("SEQ1M Q8T1", bs="1m", iodepth=8, numjobs=1, seq=True),
    CdmTest("SEQ1M Q1T1", bs="1m", iodepth=1, numjobs=1, seq=True),
    CdmTest("RND4K Q32T16", bs="4k", iodepth=32, numjobs=16, seq=False),
    CdmTest("RND4K Q1T1", bs="4k", iodepth=1, numjobs=1, seq=False),
)

# Thread counts for the SLOG sweep. T1 is the headline single-stream latency;
# the higher counts show how the SLOG scales as concurrent sync writers pile on.
SLOG_THREADS: tuple[int, ...] = (1, 4, 8, 16)


# --------------------------------------------------------------------------- #
# Configuration
# --------------------------------------------------------------------------- #

@dataclass
class Config:
    """Resolved run settings.

    ``engine`` and ``direct`` may start unset and get filled in by
    :func:`detect_io_settings`.
    """

    path: str
    mode: Optional[str]  # "cdm" | "slog" | "dd" | None (auto)
    engine: Optional[str]  # None => auto-detect
    direct: Optional[bool]  # None => auto-detect
    runtime: int
    size: str
    verbose: bool
    assume_yes: bool
    benchfile: str = field(default="")

    @property
    def direct_flag(self) -> int:
        """fio's --direct takes 0/1; default to 0 until detection runs."""
        return 1 if self.direct else 0


# --------------------------------------------------------------------------- #
# Small helpers
# --------------------------------------------------------------------------- #

def log(message: str) -> None:
    """Progress/diagnostic line -> stderr, so stdout stays pure results."""
    print(message, file=sys.stderr, flush=True)


def step(message: str) -> None:
    """A single colored progress line ('measuring ...') -> stderr."""
    log(f" {ERR.paint('▶', BOLD, CYAN)} {ERR.paint(message, DIM)}")


def warn(message: str) -> None:
    """A yellow warning line -> stderr (failures that would otherwise be silent)."""
    log(ERR.paint(f" warning: {message}", YELLOW))


def vsection(title: str) -> None:
    """A clear, ruled header for one --verbose fio section -> stderr."""
    width = 78
    head = f"─── {title} "
    head += "─" * max(3, width - len(head))
    log("")
    log(ERR.paint(head, BOLD, CYAN))


def parse_size_bytes(size: str) -> int:
    """Turn an fio-style size ("4k", "1g", "512m", "2048") into bytes.

    Raises:
        ValueError: if ``size`` is not a number with an optional k/m/g/t suffix.
    """
    s = size.strip().lower()
    if s and s[-1] in _SIZE_UNITS:
        return int(float(s[:-1]) * _SIZE_UNITS[s[-1]])
    return int(s)


def _sudo_prefix() -> list[str]:
    """Return ``["sudo"]`` when elevation is both needed and available.

    Already-root processes and systems without ``sudo`` (e.g. minimal
    containers) get an empty prefix, so the command runs directly instead of
    failing to spawn.
    """
    geteuid = getattr(os, "geteuid", None)
    if geteuid is not None and geteuid() == 0:
        return []
    return ["sudo"] if shutil.which("sudo") else []


def render_table(
    headers: Sequence[str],
    rows: Sequence[Sequence[str]],
    aligns: Optional[Sequence[str]] = None,
    col_styles: Optional[Sequence[Optional[str]]] = None,
) -> str:
    """Render a bordered ASCII table whose columns auto-size to their content.

    ``aligns`` is a per-column "<" (left) or ">" (right); by default the first
    column is left-aligned (the label) and the rest are right-aligned (numbers).
    ``col_styles`` is an optional per-column ANSI style applied to body cells
    (header is always bold, borders dim). Widths are computed on the *plain*
    text and color is applied after padding, so escape codes never skew layout.
    Rows shorter than ``headers`` are padded with empty cells.
    """
    ncols = len(headers)
    if aligns is None:
        aligns = ["<"] + [">"] * (ncols - 1)
    if col_styles is None:
        col_styles = [None] * ncols

    def normalize(cells: Sequence[str]) -> list[str]:
        # Stringify once and pad ragged rows so indexing below can't fail.
        texts = [str(cell) for cell in cells[:ncols]]
        return texts + [""] * (ncols - len(texts))

    head_cells = normalize(headers)
    body = [normalize(r) for r in rows]

    # Widest cell (header or any row) sets each column's width.
    widths = [
        max([len(head_cells[c])] + [len(r[c]) for r in body])
        for c in range(ncols)
    ]

    bar = OUT.paint("|", DIM)
    sep = OUT.paint(" | ", DIM)

    def rule() -> str:
        return OUT.paint("+" + "+".join("-" * (w + 2) for w in widths) + "+", DIM)

    def line(cells: Sequence[str], *, header: bool = False) -> str:
        parts = []
        for c in range(ncols):
            padded = f"{cells[c]:{aligns[c]}{widths[c]}}"
            if header:
                parts.append(OUT.paint(padded, BOLD))
            elif col_styles[c]:
                parts.append(OUT.paint(padded, col_styles[c]))
            else:
                parts.append(padded)
        return f"{bar} " + sep.join(parts) + f" {bar}"

    out = [rule(), line(head_cells, header=True), rule()]
    out += [line(r) for r in body]
    out.append(rule())
    return "\n".join(out)


# --------------------------------------------------------------------------- #
# fio: results, invocation, parsing
# --------------------------------------------------------------------------- #

@dataclass(frozen=True)
class FioResult:
    """Parsed metrics for one direction (read or write) of an fio run.

    Latencies are completion latency (clat) in microseconds; for synchronous
    writes that is effectively the durable-commit latency.
    """

    bw_mbps: float  # decimal MB/s, matching CrystalDiskMark's SI figure
    iops: float
    p50_us: float
    p99_us: float
    mean_us: float

    @classmethod
    def zero(cls) -> "FioResult":
        """All-zero result, used when a run produced nothing parseable."""
        return cls(0.0, 0.0, 0.0, 0.0, 0.0)


def _direction_of(rw: str) -> str:
    """fio reports 'read' and 'write' sub-objects; pick the active one."""
    return "read" if "read" in rw else "write"


def _load_fio_json(text: str) -> dict:
    """Parse fio's JSON report, tolerating stray text around the JSON object.

    fio can emit notes/warnings on stdout ahead of the report; when the whole
    text isn't valid JSON we retry on the span from the first ``{`` to the
    last ``}``.

    Raises:
        ValueError: if no JSON object can be decoded (``json.JSONDecodeError``
            is a ``ValueError`` subclass).
    """
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        start, end = text.find("{"), text.rfind("}")
        if start == -1 or end < start:
            raise
        return json.loads(text[start:end + 1])


def _aggregate(jobs: list[dict], direction: str) -> FioResult:
    """Combine per-job fio JSON stats into a single :class:`FioResult`.

    We deliberately run fio *without* --group_reporting and aggregate
    ourselves, which sidesteps fio's group-merge quirks: sum throughput,
    average the median latency, and take the worst-case tail (p99) across jobs.
    Jobs that don't report a given latency figure are left out of that figure
    rather than counted as zero.
    """
    sections = [job.get(direction, {}) for job in jobs]

    bw_mbps = sum(s.get("bw_bytes", 0) for s in sections) / 1e6
    iops = sum(s.get("iops", 0.0) for s in sections)

    def percentiles(key: str) -> list[float]:
        found = (
            s.get("clat_ns", {}).get("percentile", {}).get(key) for s in sections
        )
        return [v for v in found if v is not None]

    p50_ns = percentiles("50.000000")
    p99_ns = percentiles("99.000000")
    mean_ns = [
        m for m in (s.get("clat_ns", {}).get("mean") for s in sections)
        if m is not None
    ]

    return FioResult(
        bw_mbps=bw_mbps,
        iops=iops,
        p50_us=(statistics.mean(p50_ns) / 1000.0) if p50_ns else 0.0,  # avg of medians
        p99_us=(max(p99_ns) / 1000.0) if p99_ns else 0.0,  # worst tail
        mean_us=(statistics.mean(mean_ns) / 1000.0) if mean_ns else 0.0,
    )


def run_fio(
    cfg: Config,
    *,
    rw: str,
    bs: str,
    iodepth: int,
    numjobs: int,
    engine: Optional[str] = None,
    direct: Optional[bool] = None,
    sync: bool = False,
    end_fsync: bool = False,
) -> FioResult:
    """Run a single fio job against the shared bench file and parse its JSON.

    ``engine``/``direct`` default to the detected config values; the SLOG path
    overrides them (psync + buffered + O_SYNC). ``end_fsync`` flushes the
    device cache at the end of a write run so cached writes can't inflate the
    result.

    Returns:
        The aggregated metrics, or :meth:`FioResult.zero` (with a warning on
        stderr) when fio could not be started or produced no usable report.
    """
    engine = engine if engine is not None else cfg.engine
    direct = direct if direct is not None else cfg.direct
    fio_cmd = [
        "fio",
        "--name=speedtest",
        f"--filename={cfg.benchfile}",
        f"--ioengine={engine}",
        f"--direct={1 if direct else 0}",
        f"--rw={rw}",
        f"--bs={bs}",
        f"--size={cfg.size}",
        f"--numjobs={numjobs}",
        f"--iodepth={iodepth}",
        "--time_based",
        f"--runtime={cfg.runtime}",
        "--output-format=json",
    ]
    if sync:
        fio_cmd.append("--sync=1")  # O_SYNC: every write is a durable commit
    if end_fsync:
        fio_cmd.append("--end_fsync=1")  # flush device cache once at the end

    try:
        proc = subprocess.run(
            _sudo_prefix() + fio_cmd, capture_output=True, text=True
        )
    except OSError as exc:
        warn(f"could not start fio for {rw}: {exc}")
        return FioResult.zero()

    if cfg.verbose:
        sync_tag = " sync=1" if sync else ""
        vsection(f"fio · {rw} bs={bs} iodepth={iodepth} numjobs={numjobs}{sync_tag}")
        # Shell-quote each argument so the echoed command can be pasted back
        # into a shell even when the path contains spaces.
        echoed = " ".join(shlex.quote(arg) for arg in fio_cmd)
        log(" " + ERR.paint("$ " + echoed, DIM, ITALIC))
        log(ERR.paint(proc.stdout.strip() or "(no stdout)", DIM))
        if proc.stderr.strip():
            log(ERR.paint(proc.stderr.strip(), YELLOW))

    try:
        data = _load_fio_json(proc.stdout)
        return _aggregate(data["jobs"], _direction_of(rw))
    except (KeyError, TypeError, ValueError, AttributeError):
        # fio failed or produced no parseable JSON; report zeros rather than
        # crashing the whole run, but say so -- a silent row of zeros looks
        # like a measurement.
        detail = ""
        err_lines = proc.stderr.strip().splitlines()
        if err_lines and not cfg.verbose:
            detail = f": {err_lines[-1]}"
        warn(
            f"fio {rw} bs={bs} gave no usable result "
            f"(exit {proc.returncode}){detail}; reporting zeros"
        )
        return FioResult.zero()


def fio_probe(path: str, engine: str, direct: bool) -> bool:
    """Throwaway 1s fio job to see if an (engine, O_DIRECT) combo works here.

    This is how we stay accurate *and* portable across ext4/xfs/btrfs/ZFS/NFS
    without the caller needing to know what each filesystem supports.

    Only files created by the probe itself are removed afterwards; anything
    that was already in ``path`` is left untouched.
    """
    # glob.escape: the directory name may itself contain glob metacharacters.
    pattern = os.path.join(glob.escape(path), _PROBE_JOB + ".*")
    preexisting = set(glob.glob(pattern))
    cmd = _sudo_prefix() + [
        "fio",
        f"--name={_PROBE_JOB}",
        f"--directory={path}",
        f"--ioengine={engine}",
        "--rw=write",
        "--bs=4k",
        "--size=1m",
        f"--direct={1 if direct else 0}",
        "--time_based",
        "--runtime=1",
    ]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True)
        works = proc.returncode == 0
    except OSError:
        works = False  # fio could not be started at all
    finally:
        for leftover in glob.glob(pattern):
            if leftover in preexisting:
                continue
            try:
                os.remove(leftover)
            except OSError:
                pass  # best effort: a stray 1 MiB probe file is harmless
    return works


def detect_io_settings(cfg: Config) -> None:
    """Fill in ``cfg.engine`` and ``cfg.direct`` if the user didn't force them.

    O_DIRECT bypasses the OS page cache so we measure the device, not RAM. Not
    every filesystem supports it (older OpenZFS, some NFS), so we probe and
    fall back to buffered rather than erroring out.
    """
    if cfg.engine is None:
        for engine in ENGINE_CANDIDATES:
            if fio_probe(cfg.path, engine, direct=False):
                cfg.engine = engine
                break
        else:
            cfg.engine = "sync"  # nothing probed OK; use the most portable one
    if cfg.direct is None:
        cfg.direct = fio_probe(cfg.path, cfg.engine, direct=True)


# --------------------------------------------------------------------------- #
# Profiles
# --------------------------------------------------------------------------- #

def _meta_line(text: str) -> str:
    """Color the 'Label :' prefix of a banner metadata line, leave value as-is.

    The value may already contain its own ANSI codes (e.g. a red O_DIRECT
    warning); since these lines aren't width-aligned that's harmless. Lines
    that start with whitespace are continuations of the previous entry and are
    emitted unstyled even if they happen to contain a colon.
    """
    key, sep, val = text.partition(":")
    if not sep or text[:1].isspace():
        return " " + text
    return " " + OUT.paint(key + ":", BOLD, BRIGHT_CYAN) + val


def banner(title: str, cfg: Config, extra: Sequence[str] = ()) -> None:
    """Print the boxed title, the target path, and any ``extra`` metadata lines."""
    full = f"speedtest-hd · {title}"
    pad = 2  # spaces on each side of the title inside the box
    inner = len(full) + 2 * pad  # border width must equal the content row width
    gap = " " * pad
    print()
    print(OUT.paint("╭" + "─" * inner + "╮", BOLD, CYAN))
    print(
        OUT.paint("│", BOLD, CYAN)
        + gap
        + OUT.paint(full, BOLD, WHITE)
        + gap
        + OUT.paint("│", BOLD, CYAN)
    )
    print(OUT.paint("╰" + "─" * inner + "╯", BOLD, CYAN))
    print(_meta_line(f"Target : {OUT.paint(cfg.path, CYAN)}"))
    for line_ in extra:
        print(_meta_line(line_))
    print()


def cdm_profile(cfg: Config) -> None:
    """The CrystalDiskMark-style profile: 4 tests x (read, write), MB/s + IOPS."""
    detect_io_settings(cfg)

    if cfg.direct:
        direct_label = OUT.paint("enabled (device)", GREEN)
    else:
        direct_label = OUT.paint(
            "DISABLED (buffered -- may reflect RAM cache!)", BOLD, RED
        )
    total_runs = 2 * len(CDM_TESTS)  # one read + one write run per test
    banner(
        "CrystalDiskMark-style storage benchmark",
        cfg,
        extra=[
            f"Engine : {cfg.engine} O_DIRECT: {direct_label}",
            f"Profile : size={cfg.size.upper()} runtime={cfg.runtime}s/run "
            f"({total_runs} runs)",
        ],
    )

    rows: list[list[str]] = []
    try:
        for test in CDM_TESTS:
            step(f"measuring {test.label.strip()} ...")
            r = run_fio(cfg, rw=test.read_rw, bs=test.bs,
                        iodepth=test.iodepth, numjobs=test.numjobs)
            w = run_fio(cfg, rw=test.write_rw, bs=test.bs,
                        iodepth=test.iodepth, numjobs=test.numjobs,
                        end_fsync=True)
            rows.append([
                test.label,
                f"{r.bw_mbps:.2f}",
                f"{w.bw_mbps:.2f}",
                f"{r.iops:.0f}",
                f"{w.iops:.0f}",
            ])
    finally:
        # Also runs on Ctrl-C, so an aborted run doesn't strand the bench file.
        _cleanup(cfg)

    print()
    print(render_table(
        ["Test", "Read (MB/s)", "Write (MB/s)", "Read (IOPS)", "Write (IOPS)"],
        rows,
        col_styles=[CYAN, GREEN, YELLOW, GREEN, YELLOW],
    ))
    print()


def slog_profile(cfg: Config) -> None:
    """SLOG / sync-write latency profile.

    Forces synchronous 4K random writes (O_SYNC) via the portable psync
    engine, so every write is a ZIL commit -- exercising a ZFS SLOG exactly the
    way a sync=always dataset (NFS/iSCSI/VM) does, regardless of the dataset's
    own sync property. Engine/direct detection is only for the banner here.
    """
    detect_io_settings(cfg)
    banner(
        "SLOG / sync-write latency profile",
        cfg,
        extra=[
            "Method : fio randwrite bs=4k --sync=1 (O_SYNC), psync engine",
            f"Profile : runtime={cfg.runtime}s/run size={cfg.size.upper()}",
            "Note : every write is a synchronous ZIL commit -- the load your",
            " SLOG actually sees. Watch it live in another shell with:",
            " zpool iostat -vl 1",
        ],
    )

    # Measure everything first (progress -> stderr), then draw the whole table,
    # so the "measuring..." lines can't interleave into the middle of it.
    rows: list[list[str]] = []
    try:
        for threads in SLOG_THREADS:
            step(f"measuring 4K sync randwrite T{threads} ...")
            res = run_fio(
                cfg,
                rw="randwrite",
                bs="4k",
                iodepth=1,
                numjobs=threads,
                engine="psync",
                direct=False,
                sync=True,
            )
            rows.append([
                f"4K sync T{threads}",
                f"{res.iops:.0f}",
                f"{res.bw_mbps:.2f}",
                f"{res.p50_us:.1f}",
                f"{res.p99_us:.1f}",
            ])
    finally:
        # Also runs on Ctrl-C, so an aborted run doesn't strand the bench file.
        _cleanup(cfg)

    print()
    print(render_table(
        ["Test", "IOPS", "MB/s", "p50 lat(us)", "p99 lat(us)"],
        rows,
        col_styles=[CYAN, GREEN, GREEN, YELLOW, MAGENTA],
    ))
    print()
    print(OUT.paint(" Healthy Optane SLOG (eg P1600X) single-stream (T1) target:", BOLD))
    print(OUT.paint(" ~15-25k IOPS, p50 latency ~40-65us.", GREEN)
          + OUT.paint(" Much higher latency usually means", DIM))
    print(OUT.paint(" CPU C-states / PCIe ASPM / BIOS power profile (eg Dell DAPC) throttling.", DIM))
    print()


def dd_profile(cfg: Config) -> None:
    """Dependency-free fallback when fio isn't installed.

    Far cruder than fio: a single sequential stream, and the cached-read
    figure will reflect RAM. Use it only as a rough sanity check. A dd run
    that fails is reported as 0.00 MB/s (with a warning) rather than as the
    misleadingly high rate of an instantly-exiting process.

    Raises:
        ValueError: if ``cfg.size`` cannot be parsed by :func:`parse_size_bytes`.
    """
    count_mib = max(1, parse_size_bytes(cfg.size) // (1024**2))
    nbytes = count_mib * 1024**2
    banner(
        "basic dd benchmark (fio not installed)",
        cfg,
        extra=[f"Profile : {count_mib} MiB sequential stream"],
    )

    def timed_dd(args: list[str]) -> float:
        start = time.monotonic()
        try:
            proc = subprocess.run(args, capture_output=True, text=True)
        except OSError as exc:
            warn(f"could not start dd: {exc}")
            return 0.0
        elapsed = time.monotonic() - start
        if proc.returncode != 0:
            err_lines = proc.stderr.strip().splitlines()
            reason = err_lines[-1] if err_lines else f"exit {proc.returncode}"
            warn(f"dd failed ({reason}); reporting 0")
            return 0.0
        return (nbytes / elapsed / 1e6) if elapsed > 0 else 0.0  # decimal MB/s

    read_cmd = [
        "dd", f"if={cfg.benchfile}", "of=/dev/null", "bs=1M", f"count={count_mib}",
    ]
    try:
        step("measuring sequential write ...")
        write_mbps = timed_dd([
            "dd", "if=/dev/zero", f"of={cfg.benchfile}", "bs=1M",
            f"count={count_mib}", "conv=fdatasync,notrunc",
        ])

        step("dropping caches for uncached read ...")
        try:
            dropped = subprocess.run(
                _sudo_prefix() + ["sh", "-c", "echo 3 > /proc/sys/vm/drop_caches"],
                capture_output=True, text=True,
            ).returncode == 0
        except OSError:
            dropped = False
        if not dropped:
            warn("could not drop the page cache; 'Uncached read' may reflect RAM")

        step("measuring uncached read ...")
        uncached_mbps = timed_dd(list(read_cmd))

        step("measuring cached read ...")
        cached_mbps = timed_dd(list(read_cmd))
    finally:
        # Also runs on Ctrl-C, so an aborted run doesn't strand the bench file.
        _cleanup(cfg)

    print()
    print(render_table(
        ["Test", "MB/s"],
        [
            ["Sequential write", f"{write_mbps:.2f}"],
            ["Uncached read", f"{uncached_mbps:.2f}"],
            ["Cached read (RAM)", f"{cached_mbps:.2f}"],
        ],
        col_styles=[CYAN, GREEN],
    ))
    print()


def _cleanup(cfg: Config) -> None:
    """Remove the shared bench file, warning if it exists but can't be deleted."""
    try:
        os.remove(cfg.benchfile)
    except FileNotFoundError:
        pass  # nothing was written (or it is already gone)
    except OSError as exc:
        # The file can be large; tell the user it is still there.
        warn(f"could not remove '{cfg.benchfile}': {exc.strerror or exc}")


# --------------------------------------------------------------------------- #
# CLI
# --------------------------------------------------------------------------- #

def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser (positional path, mode and tuning flags)."""
    p = argparse.ArgumentParser(
        prog="speedtest-hd.py",
        description="CrystalDiskMark-style storage benchmark built on fio.",
        epilog=(
            "Examples:\n"
            " speedtest-hd.py .\n"
            " speedtest-hd.py /mnt/nvmepool --runtime=10 --size=4g\n"
            " speedtest-hd.py /mnt/nfsshare --buffered\n"
            " speedtest-hd.py /mnt/nvme-ultra-r10/vm-root --slog --runtime=30\n"
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    p.add_argument("path", help="directory/mount to benchmark ('.' for cwd)")

    mode = p.add_mutually_exclusive_group()
    mode.add_argument("--fio", action="store_const", const="cdm", dest="mode",
                      help="force the fio CrystalDiskMark-style profile")
    mode.add_argument("--dd", action="store_const", const="dd", dest="mode",
                      help="force the basic dd fallback test")
    mode.add_argument("--slog", action="store_const", const="slog", dest="mode",
                      help="SLOG / sync-write latency profile (ZFS ZIL)")

    direct = p.add_mutually_exclusive_group()
    direct.add_argument("--direct", action="store_const", const=True, dest="direct",
                        help="force O_DIRECT (bypass page cache)")
    direct.add_argument("--buffered", action="store_const", const=False, dest="direct",
                        help="force buffered IO (e.g. if O_DIRECT unsupported)")

    p.add_argument("--engine", choices=ENGINE_CANDIDATES,
                   help="force a specific IO engine (default: auto-detect)")
    p.add_argument("--runtime", type=int, default=5, metavar="SEC",
                   help="seconds per run (default: 5, like CrystalDiskMark)")
    p.add_argument("--size", default="1g", metavar="SIZE",
                   help="test file size (default: 1g)")
    p.add_argument("--verbose", action="store_true",
                   help="also print the full fio output for every run")
    p.add_argument("-y", "--yes", action="store_true", dest="assume_yes",
                   help="skip the confirmation prompt")
    p.set_defaults(mode=None, direct=None)
    return p


def confirm(cfg: Config) -> None:
    """Guard prompt -- we're about to write a multi-GB file to the target.

    Exits the process with status 0 unless the answer starts with "y". A closed
    or exhausted stdin counts as "no" instead of raising ``EOFError``.
    """
    if cfg.assume_yes:
        return
    print(OUT.paint("NOTICE:", BOLD, YELLOW)
          + f" {cfg.size.upper()} free space on "
          + OUT.paint(f"'{cfg.path}'", CYAN)
          + " is required to perform the benchmark.")
    try:
        answer = input(f"Are you ready to start a storage benchmark against "
                       f"'{cfg.path}' ? ")
    except EOFError:
        answer = ""  # non-interactive without --yes: decline rather than crash
    if not answer.strip().lower().startswith("y"):
        print(OUT.paint("Ok, cancelled!", YELLOW))
        sys.exit(0)
    print(OUT.paint("Great! Starting benchmark now!", BOLD, GREEN))


def main(argv: Optional[Sequence[str]] = None) -> int:
    """CLI entry point.

    Returns:
        0 on success, 1 when the target path or fio is unavailable, 2 when an
        argument value is invalid.
    """
    args = build_parser().parse_args(argv)

    path = os.getcwd() if args.path == "." else args.path
    if not os.path.exists(path):
        print(f"Path {path} does not exist", file=sys.stderr)
        return 1
    if not os.path.isdir(path):
        # The bench file is created *inside* the target, so it must be a directory.
        print(f"Path {path} is not a directory", file=sys.stderr)
        return 1
    if args.runtime <= 0:
        print(ERR.paint("ERROR:", BOLD, RED)
              + " --runtime must be a positive number of seconds.",
              file=sys.stderr)
        return 2

    cfg = Config(
        path=path,
        mode=args.mode,
        engine=args.engine,
        direct=args.direct,
        runtime=args.runtime,
        size=args.size,
        verbose=args.verbose,
        assume_yes=args.assume_yes,
        benchfile=os.path.join(path, "speedtest-hd.bench"),
    )

    have_fio = shutil.which("fio") is not None

    # Resolve the mode: explicit flag wins; otherwise fio if available, else dd.
    mode = cfg.mode
    if mode in ("cdm", "slog") and not have_fio:
        print(ERR.paint("ERROR:", BOLD, RED)
              + " --fio/--slog require fio (apt install fio / pacman -S fio).",
              file=sys.stderr)
        return 1
    if mode is None:
        if have_fio:
            mode = "cdm"
        else:
            print(OUT.paint("\nfio is not installed -- falling back to basic dd test.", YELLOW))
            print(OUT.paint("Install fio for the full CrystalDiskMark-style benchmark.", DIM))
            mode = "dd"

    if mode == "dd":
        # Only the dd path parses the size itself (fio modes hand the string to
        # fio, which accepts more spellings), so validate it only here -- and
        # before prompting, instead of crashing with a traceback afterwards.
        try:
            size_ok = parse_size_bytes(cfg.size) > 0
        except ValueError:
            size_ok = False
        if not size_ok:
            print(ERR.paint("ERROR:", BOLD, RED)
                  + f" invalid --size '{cfg.size}' (expected e.g. 512m, 1g, 2048).",
                  file=sys.stderr)
            return 2

    confirm(cfg)

    # Dictionary dispatch: map each mode name to its handler function (the values
    # are the functions themselves -- no parentheses), look up the one for `mode`,
    # then call it with (cfg). Equivalent to an if/elif chain over `mode`. `mode`
    # is always one of these three keys by now, so the lookup can't KeyError.
    {"cdm": cdm_profile, "slog": slog_profile, "dd": dd_profile}[mode](cfg)
    return 0


if __name__ == "__main__":
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        print(ERR.paint("\nInterrupted.", YELLOW), file=sys.stderr)
        sys.exit(130)