Experimental MIMO matrix-lattice realization sweep¶
Tutorial goal
Sweep reduced and lattice orders for the experimental all-pass/polar matrix-lattice scaffold.
Note
New to the terminology? See the lattice DSP concept map and the causality/data-use guide for how online, offline, block, and MIMO examples should be read.
Context¶
The experimental state-space to matrix-lattice helper returns an all-pass scaffold, not a full dynamic gain realization. This benchmark runs that helper over a small grid of reduced MIMO orders and lattice orders, then reports polar-factor error, raw state-response error, static-gain-compensated error, gain conditioning, unitarity, and a diagnostic classification.
This makes the current matrix-lattice realization story measurable: good all-pass fits, mostly static-gain mismatch, and poor lattice-scaffold fits are separated explicitly.
Key idea and equations¶
Static gain compensation fits
The improvement ratio is
How to read the result¶
Look for unitary lattice responses, stable reduced states, lower static-gain-compensated error than raw state-response error, and classifications that explain whether the mismatch is all-pass, static-gain, or scaffold-driven.
Run command¶
python benchmarks/experimental_mimo_matrix_lattice_realization_sweep.py --full-order 12 --reduced-orders 4 6 --lattice-orders 2 4 --channels 3 --n-markov 128 --n-freq 128 --block-rows 20 --block-cols 20 --candidate-gains 0.20 0.40 0.60 0.80 --static-gain-iterations 10 --repeats 1 --n-threads 1 --output docs/benchmarks/generated/_artifacts/experimental_mimo_matrix_lattice_realization_sweep/experimental-mimo-matrix-lattice-realization-sweep.json
Visual and data readout¶
When the benchmark gallery is built with results, this page embeds PNG summaries generated from the same JSON/CSV artifacts. The raw data stay available below as downloads so exact numbers remain reproducible without making the public page read like console output.
Source code¶
1"""Benchmark experimental MIMO state-space to matrix-lattice realization diagnostics."""
2
3from __future__ import annotations
4
5import argparse
6import csv
7import json
8import platform
9import statistics
10import sys
11import time
12from pathlib import Path
13from collections.abc import Iterable
14
15# When this benchmark is executed as ``python benchmarks/<script>.py``,
16# Python puts ``benchmarks/`` on sys.path rather than the repository root.
17# Prefer the checked-out source tree over any older installed lattice-dsp.
18_REPO_ROOT = Path(__file__).resolve().parents[1]
19if str(_REPO_ROOT) not in sys.path:
20 sys.path.insert(0, str(_REPO_ROOT))
21
22import numpy as np # noqa: E402
23
24import lattice_dsp as ld # noqa: E402
25
26
27def coupled_state_space(
28 order: int, channels: int, seed: int
29) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
30 """Create a deterministic stable square coupled MIMO state-space model."""
31
32 if order <= 0:
33 raise ValueError("order must be positive")
34 if channels <= 0:
35 raise ValueError("channels must be positive")
36 rng = np.random.default_rng(seed)
37 q, _ = np.linalg.qr(rng.normal(size=(order, order)))
38 radii = np.linspace(0.30, 0.90, order)
39 A = q @ np.diag(radii) @ np.linalg.inv(q)
40 mix_in = rng.normal(size=(order, channels))
41 mix_out = rng.normal(size=(channels, order))
42 B = 0.35 * mix_in / np.sqrt(order)
43 C = 0.35 * mix_out / np.sqrt(order)
44 direct = rng.normal(size=(channels, channels))
45 D = 0.08 * direct / max(np.linalg.norm(direct, ord=2), 1e-12)
46 return A.astype(float), B.astype(float), C.astype(float), D.astype(float)
47
48
49def state_spectral_radius(A: np.ndarray) -> float:
50 eigvals = np.linalg.eigvals(np.asarray(A, dtype=float))
51 return float(np.max(np.abs(eigvals))) if eigvals.size else 0.0
52
53
54def median_runtime(fn, repeats: int) -> tuple[float, object]:
55 times: list[float] = []
56 value: object = None
57 for _ in range(repeats):
58 start = time.perf_counter()
59 value = fn()
60 times.append(time.perf_counter() - start)
61 return float(statistics.median(times)), value
62
63
64def _float(value: object) -> float:
65 return float(value) # central place for mypy/readability in row construction
66
67
68def run_case(
69 *,
70 full_order: int,
71 reduced_order: int,
72 lattice_order: int,
73 channels: int,
74 n_markov: int,
75 n_freq: int,
76 block_rows: int,
77 block_cols: int,
78 candidate_gains: Iterable[float],
79 static_gain_iterations: int,
80 repeats: int,
81 n_threads: int,
82 seed: int,
83) -> dict[str, object]:
84 """Run one coupled-MIMO realization diagnostic case."""
85
86 A, B, C, D = coupled_state_space(full_order, channels, seed)
87 markov = ld.mimo_state_space_markov_response(A, B, C, D, n_markov)
88
89 reduce_s, reduced_obj = median_runtime(
90 lambda: ld.finite_hankel_reduce_mimo(
91 markov,
92 reduced_order=reduced_order,
93 block_rows=block_rows,
94 block_cols=block_cols,
95 ),
96 repeats,
97 )
98 reduced = dict(reduced_obj) # type: ignore[arg-type]
99
100 def realize() -> dict[str, object]:
101 return ld.experimental_mimo_state_space_to_matrix_lattice(
102 reduced["A"],
103 reduced["B"],
104 reduced["C"],
105 reduced["D"],
106 order=lattice_order,
107 n_markov=n_markov,
108 n_freq=n_freq,
109 candidate_gains=tuple(candidate_gains),
110 fit_static_gains=True,
111 static_gain_mode="both",
112 static_gain_iterations=static_gain_iterations,
113 n_threads=n_threads,
114 )
115
116 realize_s, fit_obj = median_runtime(realize, repeats)
117 fit = dict(fit_obj) # type: ignore[arg-type]
118 raw_error = _float(fit["state_response_relative_error"])
119 compensated_error = _float(fit["static_gain_relative_error"])
120 improvement = raw_error / max(compensated_error, 1e-30)
121
122 return {
123 "full_order": int(full_order),
124 "reduced_order": int(reduced_order),
125 "lattice_order": int(lattice_order),
126 "channels": int(channels),
127 "reduce_s": reduce_s,
128 "realize_s": realize_s,
129 "reduced_state_radius": state_spectral_radius(np.asarray(reduced["A"], dtype=float)),
130 "retained_hankel_energy": _float(reduced["retained_hankel_energy"]),
131 "selected_gain": _float(fit["selected_gain"]),
132 "polar_factor_relative_error": _float(fit["polar_factor_relative_error"]),
133 "state_response_relative_error": raw_error,
134 "static_gain_relative_error": compensated_error,
135 "static_gain_improvement": improvement,
136 "static_gain_left_condition": _float(fit["static_gain_left_condition"]),
137 "static_gain_right_condition": _float(fit["static_gain_right_condition"]),
138 "unitarity_error": _float(fit["unitarity_error"]),
139 "max_reflection_singular_value": _float(fit["max_reflection_singular_value"]),
140 "target_gain_condition_span": _float(fit["target_gain_condition_span"]),
141 "diagnostic_classification": str(fit["diagnostic_classification"]),
142 }
143
144
145def parse_args() -> argparse.Namespace:
146 parser = argparse.ArgumentParser(description=__doc__)
147 parser.add_argument("--full-order", type=int, default=12)
148 parser.add_argument("--reduced-orders", type=int, nargs="+", default=[4, 6, 8])
149 parser.add_argument("--lattice-orders", type=int, nargs="+", default=[2, 4, 6])
150 parser.add_argument("--channels", type=int, default=3)
151 parser.add_argument("--n-markov", type=int, default=192)
152 parser.add_argument("--n-freq", type=int, default=192)
153 parser.add_argument("--block-rows", type=int, default=28)
154 parser.add_argument("--block-cols", type=int, default=28)
155 parser.add_argument(
156 "--candidate-gains",
157 type=float,
158 nargs="+",
159 default=[0.15, 0.25, 0.35, 0.45, 0.55, 0.70, 0.85],
160 )
161 parser.add_argument("--static-gain-iterations", type=int, default=20)
162 parser.add_argument("--repeats", type=int, default=1)
163 parser.add_argument("--n-threads", type=int, default=1)
164 parser.add_argument("--seed", type=int, default=901)
165 parser.add_argument(
166 "--output",
167 type=Path,
168 default=Path("reports/experimental-mimo-matrix-lattice-realization-sweep.json"),
169 )
170 parser.add_argument("--csv-output", type=Path, default=None)
171 return parser.parse_args()
172
173
174def _validate_args(args: argparse.Namespace) -> None:
175 if args.full_order <= 0:
176 raise SystemExit("--full-order must be positive")
177 if args.channels <= 0:
178 raise SystemExit("--channels must be positive")
179 if args.n_markov <= 8:
180 raise SystemExit("--n-markov must be larger than 8")
181 if args.n_freq < 8:
182 raise SystemExit("--n-freq must be at least 8")
183 if args.block_rows <= 0 or args.block_cols <= 0:
184 raise SystemExit("--block-rows and --block-cols must be positive")
185 if args.repeats <= 0:
186 raise SystemExit("--repeats must be positive")
187 if args.static_gain_iterations <= 0:
188 raise SystemExit("--static-gain-iterations must be positive")
189 if not args.candidate_gains or any(g < 0.0 or not np.isfinite(g) for g in args.candidate_gains):
190 raise SystemExit("--candidate-gains must be finite nonnegative values")
191 if any(r <= 0 for r in args.reduced_orders):
192 raise SystemExit("--reduced-orders must be positive")
193 if any(o < 0 for o in args.lattice_orders):
194 raise SystemExit("--lattice-orders must be nonnegative")
195
196
197def main() -> None:
198 args = parse_args()
199 _validate_args(args)
200
201 rows: list[dict[str, object]] = []
202 for reduced_order in args.reduced_orders:
203 for lattice_order in args.lattice_orders:
204 rows.append(
205 run_case(
206 full_order=args.full_order,
207 reduced_order=reduced_order,
208 lattice_order=lattice_order,
209 channels=args.channels,
210 n_markov=args.n_markov,
211 n_freq=args.n_freq,
212 block_rows=args.block_rows,
213 block_cols=args.block_cols,
214 candidate_gains=args.candidate_gains,
215 static_gain_iterations=args.static_gain_iterations,
216 repeats=args.repeats,
217 n_threads=args.n_threads,
218 seed=args.seed + 100 * reduced_order + lattice_order,
219 )
220 )
221
222 payload = {
223 "python": platform.python_version(),
224 "platform": platform.platform(),
225 "has_openmp": bool(getattr(ld, "HAS_OPENMP", False)),
226 "full_order": args.full_order,
227 "channels": args.channels,
228 "n_markov": args.n_markov,
229 "n_freq": args.n_freq,
230 "block_rows": args.block_rows,
231 "block_cols": args.block_cols,
232 "candidate_gains": list(args.candidate_gains),
233 "static_gain_iterations": args.static_gain_iterations,
234 "repeats": args.repeats,
235 "n_threads": args.n_threads,
236 "description": (
237 "Experimental MIMO state-space to matrix-lattice realization diagnostic sweep. "
238 "The fitted lattice is an all-pass/polar scaffold; static gain compensation is reported "
239 "separately and this is not an exact matrix AAK/Nehari solver."
240 ),
241 "results": rows,
242 }
243
244 args.output.parent.mkdir(parents=True, exist_ok=True)
245 args.output.write_text(json.dumps(payload, indent=2), encoding="utf-8")
246 if args.csv_output is not None:
247 args.csv_output.parent.mkdir(parents=True, exist_ok=True)
248 with args.csv_output.open("w", newline="", encoding="utf-8") as f:
249 writer = csv.DictWriter(f, fieldnames=list(rows[0]) if rows else [])
250 writer.writeheader()
251 writer.writerows(rows)
252
253 print(json.dumps({k: v for k, v in payload.items() if k != "results"}, indent=2))
254 print()
255 print(
256 f"{'red':>4} {'lat':>4} {'stable':>6} {'realize_s':>10} {'polar_err':>10} "
257 f"{'raw_err':>10} {'gain_err':>10} {'improve':>9} {'unitary':>10} {'class':>34}"
258 )
259 print("-" * 118)
260 for row in rows:
261 stable = bool(
262 float(row["max_reflection_singular_value"]) < 1.0
263 and float(row["reduced_state_radius"]) < 1.0
264 )
265 print(
266 f"{int(row['reduced_order']):4d} {int(row['lattice_order']):4d} {str(stable):>6} "
267 f"{float(row['realize_s']):10.4f} {float(row['polar_factor_relative_error']):10.3e} "
268 f"{float(row['state_response_relative_error']):10.3e} {float(row['static_gain_relative_error']):10.3e} "
269 f"{float(row['static_gain_improvement']):9.2f} {float(row['unitarity_error']):10.2e} "
270 f"{str(row['diagnostic_classification']):>34}"
271 )
272 print(f"\nWrote {args.output}")
273 if args.csv_output is not None:
274 print(f"Wrote {args.csv_output}")
275
276
277if __name__ == "__main__":
278 main()