Coupled MIMO finite-Hankel reduction benchmark

Tutorial goal

Measure finite block-Hankel reduction cost and repeated state-space simulation speedup on coupled MIMO systems.

Note

New to the terminology? See the lattice DSP concept map and the causality/data-use guide for how online, offline, block, and MIMO examples should be read.

Context

The MIMO reducer returns state-space matrices rather than scalar filter coefficients. This benchmark therefore measures the repeated cost of simulating the full and reduced MIMO systems on batched multichannel input signals. It uses the compiled mimo_state_space_process_batch kernel when available, so the measured processing time reflects the current C++ state-space runtime rather than a pure Python loop.

The table deliberately separates three concepts: processing speedup, one-shot end-to-end speedup including a single reduction, and amortized end-to-end speedup after reusing the reduced model for --reuse-count additional batches. This keeps the benchmark scope explicit: the reduction can have excellent repeated-runtime speedups while still needing enough reuse to pay back preprocessing.

This is still the reference block-Hankel/ERA-style baseline. It is not a matrix AAK/Nehari solver; it is the finite block-Hankel reference point for comparison with matrix optimal-reduction methods.

Key idea and equations

The benchmark reports processing speedup

\[S_{process}=\frac{t_{full}}{t_{reduced}},\]

one-shot end-to-end speedup including one reduction,

\[S_{one-shot}=\frac{t_{full}}{t_{reduce}+t_{reduced}},\]

and amortized end-to-end speedup across K reused batches,

\[S_{amortized}=\frac{K t_{full}}{t_{reduce}+K t_{reduced}}.\]

How to read the result

Look for stable reduced state matrices, decreasing Markov/output error with order, high processing speedup, and amortized end-to-end speedup above one when the workload reuses the reduced model enough times.

Run command

python benchmarks/mimo_hankel_reduction_speedup.py --full-orders 8 16 --reduced-orders 2 4 6 8 --inputs 3 --outputs 3 --batch 8 --samples 6000 --repeats 2 --reuse-count 50 --n-threads 1 --n-markov 256 --block-rows 32 --block-cols 32 --output docs/benchmarks/generated/_artifacts/mimo_hankel_reduction_speedup/mimo-hankel-reduction-speedup.json

Visual and data readout

When the benchmark gallery is built with results, this page embeds PNG summaries generated from the same JSON/CSV artifacts. The raw data stay available below as downloads so exact numbers remain reproducible without making the public page read like console output.

Source code

  1"""Benchmark finite block-Hankel reduction on coupled MIMO systems.
  2
  3The reducer returns state-space models.  This benchmark therefore measures the
  4one-time reduction cost and the repeated cost of simulating the full and reduced
  5state-space systems on batched MIMO input signals.
  6"""
  7
  8from __future__ import annotations
  9
 10import argparse
 11import json
 12import math
 13import platform
 14import statistics
 15import time
 16from pathlib import Path
 17
 18import numpy as np
 19
 20import lattice_dsp as ld
 21
 22
 23def state_spectral_radius(A) -> float:
 24    A = np.asarray(A, dtype=float)
 25    if A.size == 0:
 26        return 0.0
 27    return float(np.max(np.abs(np.linalg.eigvals(A))))
 28
 29
 30def coupled_state_space(order: int, outputs: int, inputs: int, seed: int):
 31    rng = np.random.default_rng(seed)
 32    q, _ = np.linalg.qr(rng.normal(size=(order, order)))
 33    radii = np.linspace(0.90, 0.18, order)
 34    signs = np.where(np.arange(order) % 2 == 0, 1.0, -1.0)
 35    A = q @ np.diag(signs * radii) @ q.T
 36    B = 0.28 * rng.normal(size=(order, inputs))
 37    C = 0.28 * rng.normal(size=(outputs, order))
 38    D = 0.025 * rng.normal(size=(outputs, inputs))
 39    if inputs == outputs:
 40        D += 0.035 * (np.ones((outputs, inputs)) - np.eye(outputs))
 41    return A, B, C, D
 42
 43
 44def state_space_process_python(A, B, C, D, x):
 45    A = np.asarray(A, dtype=float)
 46    B = np.asarray(B, dtype=float)
 47    C = np.asarray(C, dtype=float)
 48    D = np.asarray(D, dtype=float)
 49    x = np.asarray(x, dtype=float)
 50    batch, samples, _ = x.shape
 51    n_outputs = D.shape[0]
 52    n_state = A.shape[0]
 53    state = np.zeros((batch, n_state), dtype=float)
 54    y = np.zeros((batch, samples, n_outputs), dtype=float)
 55    for n in range(samples):
 56        xn = x[:, n, :]
 57        y[:, n, :] = state @ C.T + xn @ D.T
 58        if n_state:
 59            state = state @ A.T + xn @ B.T
 60    return y
 61
 62
 63def state_space_process(A, B, C, D, x, n_threads: int = 0):
 64    """Use the compiled state-space processor when available.
 65
 66    The Python fallback keeps the benchmark source readable when somebody runs it
 67    against an older local extension before rebuilding.
 68    """
 69
 70    compiled = getattr(ld, "mimo_state_space_process_batch", None)
 71    if compiled is None:
 72        return state_space_process_python(A, B, C, D, x)
 73    return compiled(A, B, C, D, x, n_threads=n_threads)
 74
 75
 76def median_time(fn, repeats: int):
 77    values = []
 78    result = None
 79    for _ in range(repeats):
 80        t0 = time.perf_counter()
 81        result = fn()
 82        values.append(time.perf_counter() - t0)
 83    return statistics.median(values), result
 84
 85
 86def rel_mse(reference, estimate) -> float:
 87    reference = np.asarray(reference, dtype=float)
 88    estimate = np.asarray(estimate, dtype=float)
 89    return float(np.sum((reference - estimate) ** 2) / (np.sum(reference**2) + 1e-30))
 90
 91
 92def snr_db_from_rel_mse(value: float) -> float:
 93    return 10.0 * math.log10(1.0 / max(value, 1e-300))
 94
 95
 96def break_even_samples_per_batch(reduction_s: float, full_s: float, reduced_s: float, samples: int):
 97    full_per_sample = full_s / samples
 98    reduced_per_sample = reduced_s / samples
 99    saved = full_per_sample - reduced_per_sample
100    if saved <= 0.0:
101        return None
102    return reduction_s / saved
103
104
105def run(args):
106    rng = np.random.default_rng(args.seed)
107    rows = []
108    n_threads = getattr(args, "n_threads", 0)
109    reuse_count = max(1, int(getattr(args, "reuse_count", 1)))
110
111    for full_order in args.full_orders:
112        A, B, C, D = coupled_state_space(
113            full_order, args.outputs, args.inputs, args.seed + full_order
114        )
115        markov = ld.mimo_state_space_markov_response(A, B, C, D, args.n_markov)
116        x = rng.normal(size=(args.batch, args.samples, args.inputs))
117
118        full_time, y_full = median_time(
119            lambda A=A, B=B, C=C, D=D, x=x: state_space_process(A, B, C, D, x, n_threads),
120            args.repeats,
121        )
122
123        for reduced_order in args.reduced_orders:
124            if reduced_order > min(args.block_rows * args.outputs, args.block_cols * args.inputs):
125                continue
126            if reduced_order > full_order:
127                continue
128
129            t0 = time.perf_counter()
130            try:
131                reduction = ld.finite_hankel_reduce_mimo(
132                    markov,
133                    reduced_order=reduced_order,
134                    block_rows=args.block_rows,
135                    block_cols=args.block_cols,
136                )
137                reduce_s = time.perf_counter() - t0
138            except ValueError as exc:
139                rows.append(
140                    {
141                        "full_order": full_order,
142                        "reduced_order": reduced_order,
143                        "stable": False,
144                        "error": str(exc),
145                    }
146                )
147                continue
148
149            red_time, y_red = median_time(
150                lambda r=reduction, x=x: state_space_process(
151                    r["A"], r["B"], r["C"], r["D"], x, n_threads
152                ),
153                args.repeats,
154            )
155            output_rel = rel_mse(y_full, y_red)
156            approx_markov = ld.mimo_state_space_markov_response(
157                reduction["A"], reduction["B"], reduction["C"], reduction["D"], args.n_markov
158            )
159            markov_rel = rel_mse(markov, approx_markov)
160            filter_speedup = full_time / red_time if red_time > 0 else float("inf")
161            one_shot_end2end = (
162                full_time / (reduce_s + red_time) if reduce_s + red_time > 0 else float("inf")
163            )
164            amortized_denominator = reduce_s + reuse_count * red_time
165            amortized_end2end = (
166                (reuse_count * full_time) / amortized_denominator
167                if amortized_denominator > 0
168                else float("inf")
169            )
170            break_even = break_even_samples_per_batch(reduce_s, full_time, red_time, args.samples)
171
172            rows.append(
173                {
174                    "full_order": full_order,
175                    "reduced_order": reduced_order,
176                    "stable": bool(reduction["stable"]),
177                    "full_state_radius": state_spectral_radius(A),
178                    "reduced_state_radius": state_spectral_radius(reduction["A"]),
179                    "retained_hankel_energy": float(reduction["retained_hankel_energy"]),
180                    "relative_markov_error": markov_rel,
181                    "relative_output_error": output_rel,
182                    "output_snr_db": snr_db_from_rel_mse(output_rel),
183                    "reduction_time_s": reduce_s,
184                    "full_process_median_s": full_time,
185                    "reduced_process_median_s": red_time,
186                    "process_speedup": filter_speedup,
187                    "one_shot_end_to_end_speedup": one_shot_end2end,
188                    "amortized_end_to_end_speedup": amortized_end2end,
189                    "reuse_count": reuse_count,
190                    "break_even_samples_per_batch": break_even,
191                }
192            )
193
194    metadata = {
195        "python": platform.python_version(),
196        "platform": platform.platform(),
197        "has_openmp": bool(getattr(ld, "HAS_OPENMP", False)),
198        "inputs": args.inputs,
199        "outputs": args.outputs,
200        "batch": args.batch,
201        "samples": args.samples,
202        "repeats": args.repeats,
203        "reuse_count": reuse_count,
204        "n_threads": n_threads,
205        "state_space_backend": "compiled"
206        if getattr(ld, "mimo_state_space_process_batch", None) is not None
207        else "python",
208        "n_markov": args.n_markov,
209        "block_rows": args.block_rows,
210        "block_cols": args.block_cols,
211        "seed": args.seed,
212        "description": "Coupled MIMO finite block-Hankel reduction benchmark. Reduction is a preprocessing cost; processing speedup is measured by batched state-space simulation, and amortized end-to-end speedup assumes the reduced model is reused for reuse_count batches.",
213    }
214    return {"metadata": metadata, "rows": rows}
215
216
217def print_table(result):
218    print(json.dumps(result["metadata"], indent=2))
219    print()
220    print(
221        f"{'full':>5s} {'red':>5s} {'stable':>7s} {'reduce_s':>10s} {'proc_x':>9s} "
222        f"{'one_x':>8s} {'reuse_x':>9s} {'SNR':>8s} {'markov_err':>12s} {'radius':>8s} {'break_even':>12s}"
223    )
224    print("-" * 110)
225    for row in result["rows"]:
226        if "error" in row:
227            print(
228                f"{row['full_order']:5d} {row['reduced_order']:5d} {'False':>7s} "
229                f"{'n/a':>10s} {'n/a':>9s} {'n/a':>8s} {'n/a':>9s} "
230                f"{'n/a':>8s} {'n/a':>12s} {'n/a':>8s} {'n/a':>12s}"
231            )
232            continue
233        be = row["break_even_samples_per_batch"]
234        be_text = "n/a" if be is None else f"{be:.0f}"
235        print(
236            f"{row['full_order']:5d} {row['reduced_order']:5d} {str(row['stable']):>7s} "
237            f"{row['reduction_time_s']:10.4f} {row['process_speedup']:9.2f} "
238            f"{row['one_shot_end_to_end_speedup']:8.2f} {row['amortized_end_to_end_speedup']:9.2f} "
239            f"{row['output_snr_db']:8.2f} {row['relative_markov_error']:12.3e} "
240            f"{row['reduced_state_radius']:8.4f} {be_text:>12s}"
241        )
242
243
244def main():
245    parser = argparse.ArgumentParser(description=__doc__)
246    parser.add_argument("--full-orders", type=int, nargs="+", default=[8, 16, 32])
247    parser.add_argument("--reduced-orders", type=int, nargs="+", default=[2, 4, 6, 8, 12])
248    parser.add_argument("--inputs", type=int, default=3)
249    parser.add_argument("--outputs", type=int, default=3)
250    parser.add_argument("--batch", type=int, default=16)
251    parser.add_argument("--samples", type=int, default=20000)
252    parser.add_argument("--repeats", type=int, default=3)
253    parser.add_argument(
254        "--reuse-count",
255        type=int,
256        default=1,
257        help="number of additional batches over which to amortize the one-time reduction cost",
258    )
259    parser.add_argument(
260        "--n-threads",
261        type=int,
262        default=1,
263        help="thread count for the compiled state-space processor; use 0 for the OpenMP default when available",
264    )
265    parser.add_argument("--n-markov", type=int, default=512)
266    parser.add_argument("--block-rows", type=int, default=48)
267    parser.add_argument("--block-cols", type=int, default=48)
268    parser.add_argument("--seed", type=int, default=911)
269    parser.add_argument("--output", default="reports/mimo-hankel-reduction-speedup.json")
270    args = parser.parse_args()
271
272    result = run(args)
273    print_table(result)
274
275    output = Path(args.output)
276    output.parent.mkdir(parents=True, exist_ok=True)
277    output.write_text(json.dumps(result, indent=2), encoding="utf-8")
278    print(f"\nWrote {output}")
279
280
281if __name__ == "__main__":
282    main()