Coupled MIMO finite-Hankel reduction benchmark¶
Tutorial goal
Measure finite block-Hankel reduction cost and repeated state-space simulation speedup on coupled MIMO systems.
Note
New to the terminology? See the lattice DSP concept map and the causality/data-use guide for how online, offline, block, and MIMO examples should be read.
Context¶
The MIMO reducer returns state-space matrices rather than scalar filter coefficients. This
benchmark therefore measures the repeated cost of simulating the full and reduced MIMO
systems on batched multichannel input signals. It uses the compiled
mimo_state_space_process_batch kernel when available, so the measured processing time
reflects the current C++ state-space runtime rather than a pure Python loop.
The table deliberately separates three concepts: processing speedup, one-shot end-to-end
speedup including a single reduction, and amortized end-to-end speedup after reusing the
reduced model for --reuse-count additional batches. This keeps the benchmark scope explicit: the
reduction can have excellent repeated-runtime speedups while still needing enough reuse to
pay back preprocessing.
This is still the reference block-Hankel/ERA-style baseline. It is not a matrix AAK/Nehari solver; it is the finite block-Hankel reference point for comparison with matrix optimal-reduction methods.
Key idea and equations¶
The benchmark reports processing speedup
one-shot end-to-end speedup including one reduction,
and amortized end-to-end speedup across K reused batches,
How to read the result¶
Look for stable reduced state matrices, decreasing Markov/output error with order, high processing speedup, and amortized end-to-end speedup above one when the workload reuses the reduced model enough times.
Run command¶
python benchmarks/mimo_hankel_reduction_speedup.py --full-orders 8 16 --reduced-orders 2 4 6 8 --inputs 3 --outputs 3 --batch 8 --samples 6000 --repeats 2 --reuse-count 50 --n-threads 1 --n-markov 256 --block-rows 32 --block-cols 32 --output docs/benchmarks/generated/_artifacts/mimo_hankel_reduction_speedup/mimo-hankel-reduction-speedup.json
Visual and data readout¶
When the benchmark gallery is built with results, this page embeds PNG summaries generated from the same JSON/CSV artifacts. The raw data stay available below as downloads so exact numbers remain reproducible without making the public page read like console output.
Source code¶
1"""Benchmark finite block-Hankel reduction on coupled MIMO systems.
2
3The reducer returns state-space models. This benchmark therefore measures the
4one-time reduction cost and the repeated cost of simulating the full and reduced
5state-space systems on batched MIMO input signals.
6"""
7
8from __future__ import annotations
9
10import argparse
11import json
12import math
13import platform
14import statistics
15import time
16from pathlib import Path
17
18import numpy as np
19
20import lattice_dsp as ld
21
22
23def state_spectral_radius(A) -> float:
24 A = np.asarray(A, dtype=float)
25 if A.size == 0:
26 return 0.0
27 return float(np.max(np.abs(np.linalg.eigvals(A))))
28
29
30def coupled_state_space(order: int, outputs: int, inputs: int, seed: int):
31 rng = np.random.default_rng(seed)
32 q, _ = np.linalg.qr(rng.normal(size=(order, order)))
33 radii = np.linspace(0.90, 0.18, order)
34 signs = np.where(np.arange(order) % 2 == 0, 1.0, -1.0)
35 A = q @ np.diag(signs * radii) @ q.T
36 B = 0.28 * rng.normal(size=(order, inputs))
37 C = 0.28 * rng.normal(size=(outputs, order))
38 D = 0.025 * rng.normal(size=(outputs, inputs))
39 if inputs == outputs:
40 D += 0.035 * (np.ones((outputs, inputs)) - np.eye(outputs))
41 return A, B, C, D
42
43
44def state_space_process_python(A, B, C, D, x):
45 A = np.asarray(A, dtype=float)
46 B = np.asarray(B, dtype=float)
47 C = np.asarray(C, dtype=float)
48 D = np.asarray(D, dtype=float)
49 x = np.asarray(x, dtype=float)
50 batch, samples, _ = x.shape
51 n_outputs = D.shape[0]
52 n_state = A.shape[0]
53 state = np.zeros((batch, n_state), dtype=float)
54 y = np.zeros((batch, samples, n_outputs), dtype=float)
55 for n in range(samples):
56 xn = x[:, n, :]
57 y[:, n, :] = state @ C.T + xn @ D.T
58 if n_state:
59 state = state @ A.T + xn @ B.T
60 return y
61
62
63def state_space_process(A, B, C, D, x, n_threads: int = 0):
64 """Use the compiled state-space processor when available.
65
66 The Python fallback keeps the benchmark source readable when somebody runs it
67 against an older local extension before rebuilding.
68 """
69
70 compiled = getattr(ld, "mimo_state_space_process_batch", None)
71 if compiled is None:
72 return state_space_process_python(A, B, C, D, x)
73 return compiled(A, B, C, D, x, n_threads=n_threads)
74
75
76def median_time(fn, repeats: int):
77 values = []
78 result = None
79 for _ in range(repeats):
80 t0 = time.perf_counter()
81 result = fn()
82 values.append(time.perf_counter() - t0)
83 return statistics.median(values), result
84
85
86def rel_mse(reference, estimate) -> float:
87 reference = np.asarray(reference, dtype=float)
88 estimate = np.asarray(estimate, dtype=float)
89 return float(np.sum((reference - estimate) ** 2) / (np.sum(reference**2) + 1e-30))
90
91
92def snr_db_from_rel_mse(value: float) -> float:
93 return 10.0 * math.log10(1.0 / max(value, 1e-300))
94
95
96def break_even_samples_per_batch(reduction_s: float, full_s: float, reduced_s: float, samples: int):
97 full_per_sample = full_s / samples
98 reduced_per_sample = reduced_s / samples
99 saved = full_per_sample - reduced_per_sample
100 if saved <= 0.0:
101 return None
102 return reduction_s / saved
103
104
105def run(args):
106 rng = np.random.default_rng(args.seed)
107 rows = []
108 n_threads = getattr(args, "n_threads", 0)
109 reuse_count = max(1, int(getattr(args, "reuse_count", 1)))
110
111 for full_order in args.full_orders:
112 A, B, C, D = coupled_state_space(
113 full_order, args.outputs, args.inputs, args.seed + full_order
114 )
115 markov = ld.mimo_state_space_markov_response(A, B, C, D, args.n_markov)
116 x = rng.normal(size=(args.batch, args.samples, args.inputs))
117
118 full_time, y_full = median_time(
119 lambda A=A, B=B, C=C, D=D, x=x: state_space_process(A, B, C, D, x, n_threads),
120 args.repeats,
121 )
122
123 for reduced_order in args.reduced_orders:
124 if reduced_order > min(args.block_rows * args.outputs, args.block_cols * args.inputs):
125 continue
126 if reduced_order > full_order:
127 continue
128
129 t0 = time.perf_counter()
130 try:
131 reduction = ld.finite_hankel_reduce_mimo(
132 markov,
133 reduced_order=reduced_order,
134 block_rows=args.block_rows,
135 block_cols=args.block_cols,
136 )
137 reduce_s = time.perf_counter() - t0
138 except ValueError as exc:
139 rows.append(
140 {
141 "full_order": full_order,
142 "reduced_order": reduced_order,
143 "stable": False,
144 "error": str(exc),
145 }
146 )
147 continue
148
149 red_time, y_red = median_time(
150 lambda r=reduction, x=x: state_space_process(
151 r["A"], r["B"], r["C"], r["D"], x, n_threads
152 ),
153 args.repeats,
154 )
155 output_rel = rel_mse(y_full, y_red)
156 approx_markov = ld.mimo_state_space_markov_response(
157 reduction["A"], reduction["B"], reduction["C"], reduction["D"], args.n_markov
158 )
159 markov_rel = rel_mse(markov, approx_markov)
160 filter_speedup = full_time / red_time if red_time > 0 else float("inf")
161 one_shot_end2end = (
162 full_time / (reduce_s + red_time) if reduce_s + red_time > 0 else float("inf")
163 )
164 amortized_denominator = reduce_s + reuse_count * red_time
165 amortized_end2end = (
166 (reuse_count * full_time) / amortized_denominator
167 if amortized_denominator > 0
168 else float("inf")
169 )
170 break_even = break_even_samples_per_batch(reduce_s, full_time, red_time, args.samples)
171
172 rows.append(
173 {
174 "full_order": full_order,
175 "reduced_order": reduced_order,
176 "stable": bool(reduction["stable"]),
177 "full_state_radius": state_spectral_radius(A),
178 "reduced_state_radius": state_spectral_radius(reduction["A"]),
179 "retained_hankel_energy": float(reduction["retained_hankel_energy"]),
180 "relative_markov_error": markov_rel,
181 "relative_output_error": output_rel,
182 "output_snr_db": snr_db_from_rel_mse(output_rel),
183 "reduction_time_s": reduce_s,
184 "full_process_median_s": full_time,
185 "reduced_process_median_s": red_time,
186 "process_speedup": filter_speedup,
187 "one_shot_end_to_end_speedup": one_shot_end2end,
188 "amortized_end_to_end_speedup": amortized_end2end,
189 "reuse_count": reuse_count,
190 "break_even_samples_per_batch": break_even,
191 }
192 )
193
194 metadata = {
195 "python": platform.python_version(),
196 "platform": platform.platform(),
197 "has_openmp": bool(getattr(ld, "HAS_OPENMP", False)),
198 "inputs": args.inputs,
199 "outputs": args.outputs,
200 "batch": args.batch,
201 "samples": args.samples,
202 "repeats": args.repeats,
203 "reuse_count": reuse_count,
204 "n_threads": n_threads,
205 "state_space_backend": "compiled"
206 if getattr(ld, "mimo_state_space_process_batch", None) is not None
207 else "python",
208 "n_markov": args.n_markov,
209 "block_rows": args.block_rows,
210 "block_cols": args.block_cols,
211 "seed": args.seed,
212 "description": "Coupled MIMO finite block-Hankel reduction benchmark. Reduction is a preprocessing cost; processing speedup is measured by batched state-space simulation, and amortized end-to-end speedup assumes the reduced model is reused for reuse_count batches.",
213 }
214 return {"metadata": metadata, "rows": rows}
215
216
217def print_table(result):
218 print(json.dumps(result["metadata"], indent=2))
219 print()
220 print(
221 f"{'full':>5s} {'red':>5s} {'stable':>7s} {'reduce_s':>10s} {'proc_x':>9s} "
222 f"{'one_x':>8s} {'reuse_x':>9s} {'SNR':>8s} {'markov_err':>12s} {'radius':>8s} {'break_even':>12s}"
223 )
224 print("-" * 110)
225 for row in result["rows"]:
226 if "error" in row:
227 print(
228 f"{row['full_order']:5d} {row['reduced_order']:5d} {'False':>7s} "
229 f"{'n/a':>10s} {'n/a':>9s} {'n/a':>8s} {'n/a':>9s} "
230 f"{'n/a':>8s} {'n/a':>12s} {'n/a':>8s} {'n/a':>12s}"
231 )
232 continue
233 be = row["break_even_samples_per_batch"]
234 be_text = "n/a" if be is None else f"{be:.0f}"
235 print(
236 f"{row['full_order']:5d} {row['reduced_order']:5d} {str(row['stable']):>7s} "
237 f"{row['reduction_time_s']:10.4f} {row['process_speedup']:9.2f} "
238 f"{row['one_shot_end_to_end_speedup']:8.2f} {row['amortized_end_to_end_speedup']:9.2f} "
239 f"{row['output_snr_db']:8.2f} {row['relative_markov_error']:12.3e} "
240 f"{row['reduced_state_radius']:8.4f} {be_text:>12s}"
241 )
242
243
244def main():
245 parser = argparse.ArgumentParser(description=__doc__)
246 parser.add_argument("--full-orders", type=int, nargs="+", default=[8, 16, 32])
247 parser.add_argument("--reduced-orders", type=int, nargs="+", default=[2, 4, 6, 8, 12])
248 parser.add_argument("--inputs", type=int, default=3)
249 parser.add_argument("--outputs", type=int, default=3)
250 parser.add_argument("--batch", type=int, default=16)
251 parser.add_argument("--samples", type=int, default=20000)
252 parser.add_argument("--repeats", type=int, default=3)
253 parser.add_argument(
254 "--reuse-count",
255 type=int,
256 default=1,
257 help="number of additional batches over which to amortize the one-time reduction cost",
258 )
259 parser.add_argument(
260 "--n-threads",
261 type=int,
262 default=1,
263 help="thread count for the compiled state-space processor; use 0 for the OpenMP default when available",
264 )
265 parser.add_argument("--n-markov", type=int, default=512)
266 parser.add_argument("--block-rows", type=int, default=48)
267 parser.add_argument("--block-cols", type=int, default=48)
268 parser.add_argument("--seed", type=int, default=911)
269 parser.add_argument("--output", default="reports/mimo-hankel-reduction-speedup.json")
270 args = parser.parse_args()
271
272 result = run(args)
273 print_table(result)
274
275 output = Path(args.output)
276 output.parent.mkdir(parents=True, exist_ok=True)
277 output.write_text(json.dumps(result, indent=2), encoding="utf-8")
278 print(f"\nWrote {output}")
279
280
281if __name__ == "__main__":
282 main()