Experimental MIMO matrix-lattice realization sweep

Tutorial goal

Sweep reduced and lattice orders for the experimental all-pass/polar matrix-lattice scaffold.

Note

New to the terminology? See the lattice DSP concept map and the causality/data-use guide for how online, offline, block, and MIMO examples should be read.

Context

The experimental state-space to matrix-lattice helper returns an all-pass scaffold, not a full dynamic gain realization. This benchmark runs that helper over a small grid of reduced MIMO orders and lattice orders, then reports polar-factor error, raw state-response error, static-gain-compensated error, gain conditioning, unitarity, and a diagnostic classification.

This makes the current matrix-lattice realization story measurable: good all-pass fits, mostly static-gain mismatch, and poor lattice-scaffold fits are separated explicitly.

Key idea and equations

Static gain compensation fits

\[H(e^{j\omega}) \approx L\,G_{lattice}(e^{j\omega})\,R.\]

The improvement ratio is

\[I = \frac{\|H-G\|_F}{\|H-LGR\|_F}.\]

How to read the result

Look for unitary lattice responses, stable reduced states, lower static-gain-compensated error than raw state-response error, and classifications that explain whether the mismatch is all-pass, static-gain, or scaffold-driven.

Run command

python benchmarks/experimental_mimo_matrix_lattice_realization_sweep.py --full-order 12 --reduced-orders 4 6 --lattice-orders 2 4 --channels 3 --n-markov 128 --n-freq 128 --block-rows 20 --block-cols 20 --candidate-gains 0.20 0.40 0.60 0.80 --static-gain-iterations 10 --repeats 1 --n-threads 1 --output docs/benchmarks/generated/_artifacts/experimental_mimo_matrix_lattice_realization_sweep/experimental-mimo-matrix-lattice-realization-sweep.json

Run status

Return code: 1

Visual and data readout

When the benchmark gallery is built with results, this page embeds PNG summaries generated from the same JSON/CSV artifacts. The raw data stay available below as downloads so exact numbers remain reproducible without making the public page read like console output.

Source code

  1"""Benchmark experimental MIMO state-space to matrix-lattice realization diagnostics."""
  2
  3from __future__ import annotations
  4
  5import argparse
  6import csv
  7import json
  8import platform
  9import statistics
 10import sys
 11import time
 12from pathlib import Path
 13from collections.abc import Iterable
 14
 15# When this benchmark is executed as ``python benchmarks/<script>.py``,
 16# Python puts ``benchmarks/`` on sys.path rather than the repository root.
 17# Prefer the checked-out source tree over any older installed lattice-dsp.
 18_REPO_ROOT = Path(__file__).resolve().parents[1]
 19if str(_REPO_ROOT) not in sys.path:
 20    sys.path.insert(0, str(_REPO_ROOT))
 21
 22import numpy as np  # noqa: E402
 23
 24import lattice_dsp as ld  # noqa: E402
 25
 26
 27def coupled_state_space(
 28    order: int, channels: int, seed: int
 29) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
 30    """Create a deterministic stable square coupled MIMO state-space model."""
 31
 32    if order <= 0:
 33        raise ValueError("order must be positive")
 34    if channels <= 0:
 35        raise ValueError("channels must be positive")
 36    rng = np.random.default_rng(seed)
 37    q, _ = np.linalg.qr(rng.normal(size=(order, order)))
 38    radii = np.linspace(0.30, 0.90, order)
 39    A = q @ np.diag(radii) @ np.linalg.inv(q)
 40    mix_in = rng.normal(size=(order, channels))
 41    mix_out = rng.normal(size=(channels, order))
 42    B = 0.35 * mix_in / np.sqrt(order)
 43    C = 0.35 * mix_out / np.sqrt(order)
 44    direct = rng.normal(size=(channels, channels))
 45    D = 0.08 * direct / max(np.linalg.norm(direct, ord=2), 1e-12)
 46    return A.astype(float), B.astype(float), C.astype(float), D.astype(float)
 47
 48
 49def state_spectral_radius(A: np.ndarray) -> float:
 50    eigvals = np.linalg.eigvals(np.asarray(A, dtype=float))
 51    return float(np.max(np.abs(eigvals))) if eigvals.size else 0.0
 52
 53
 54def median_runtime(fn, repeats: int) -> tuple[float, object]:
 55    times: list[float] = []
 56    value: object = None
 57    for _ in range(repeats):
 58        start = time.perf_counter()
 59        value = fn()
 60        times.append(time.perf_counter() - start)
 61    return float(statistics.median(times)), value
 62
 63
 64def _float(value: object) -> float:
 65    return float(value)  # central place for mypy/readability in row construction
 66
 67
 68def run_case(
 69    *,
 70    full_order: int,
 71    reduced_order: int,
 72    lattice_order: int,
 73    channels: int,
 74    n_markov: int,
 75    n_freq: int,
 76    block_rows: int,
 77    block_cols: int,
 78    candidate_gains: Iterable[float],
 79    static_gain_iterations: int,
 80    repeats: int,
 81    n_threads: int,
 82    seed: int,
 83) -> dict[str, object]:
 84    """Run one coupled-MIMO realization diagnostic case."""
 85
 86    A, B, C, D = coupled_state_space(full_order, channels, seed)
 87    markov = ld.mimo_state_space_markov_response(A, B, C, D, n_markov)
 88
 89    reduce_s, reduced_obj = median_runtime(
 90        lambda: ld.finite_hankel_reduce_mimo(
 91            markov,
 92            reduced_order=reduced_order,
 93            block_rows=block_rows,
 94            block_cols=block_cols,
 95        ),
 96        repeats,
 97    )
 98    reduced = dict(reduced_obj)  # type: ignore[arg-type]
 99
100    def realize() -> dict[str, object]:
101        return ld.experimental_mimo_state_space_to_matrix_lattice(
102            reduced["A"],
103            reduced["B"],
104            reduced["C"],
105            reduced["D"],
106            order=lattice_order,
107            n_markov=n_markov,
108            n_freq=n_freq,
109            candidate_gains=tuple(candidate_gains),
110            fit_static_gains=True,
111            static_gain_mode="both",
112            static_gain_iterations=static_gain_iterations,
113            n_threads=n_threads,
114        )
115
116    realize_s, fit_obj = median_runtime(realize, repeats)
117    fit = dict(fit_obj)  # type: ignore[arg-type]
118    raw_error = _float(fit["state_response_relative_error"])
119    compensated_error = _float(fit["static_gain_relative_error"])
120    improvement = raw_error / max(compensated_error, 1e-30)
121
122    return {
123        "full_order": int(full_order),
124        "reduced_order": int(reduced_order),
125        "lattice_order": int(lattice_order),
126        "channels": int(channels),
127        "reduce_s": reduce_s,
128        "realize_s": realize_s,
129        "reduced_state_radius": state_spectral_radius(np.asarray(reduced["A"], dtype=float)),
130        "retained_hankel_energy": _float(reduced["retained_hankel_energy"]),
131        "selected_gain": _float(fit["selected_gain"]),
132        "polar_factor_relative_error": _float(fit["polar_factor_relative_error"]),
133        "state_response_relative_error": raw_error,
134        "static_gain_relative_error": compensated_error,
135        "static_gain_improvement": improvement,
136        "static_gain_left_condition": _float(fit["static_gain_left_condition"]),
137        "static_gain_right_condition": _float(fit["static_gain_right_condition"]),
138        "unitarity_error": _float(fit["unitarity_error"]),
139        "max_reflection_singular_value": _float(fit["max_reflection_singular_value"]),
140        "target_gain_condition_span": _float(fit["target_gain_condition_span"]),
141        "diagnostic_classification": str(fit["diagnostic_classification"]),
142    }
143
144
145def parse_args() -> argparse.Namespace:
146    parser = argparse.ArgumentParser(description=__doc__)
147    parser.add_argument("--full-order", type=int, default=12)
148    parser.add_argument("--reduced-orders", type=int, nargs="+", default=[4, 6, 8])
149    parser.add_argument("--lattice-orders", type=int, nargs="+", default=[2, 4, 6])
150    parser.add_argument("--channels", type=int, default=3)
151    parser.add_argument("--n-markov", type=int, default=192)
152    parser.add_argument("--n-freq", type=int, default=192)
153    parser.add_argument("--block-rows", type=int, default=28)
154    parser.add_argument("--block-cols", type=int, default=28)
155    parser.add_argument(
156        "--candidate-gains",
157        type=float,
158        nargs="+",
159        default=[0.15, 0.25, 0.35, 0.45, 0.55, 0.70, 0.85],
160    )
161    parser.add_argument("--static-gain-iterations", type=int, default=20)
162    parser.add_argument("--repeats", type=int, default=1)
163    parser.add_argument("--n-threads", type=int, default=1)
164    parser.add_argument("--seed", type=int, default=901)
165    parser.add_argument(
166        "--output",
167        type=Path,
168        default=Path("reports/experimental-mimo-matrix-lattice-realization-sweep.json"),
169    )
170    parser.add_argument("--csv-output", type=Path, default=None)
171    return parser.parse_args()
172
173
174def _validate_args(args: argparse.Namespace) -> None:
175    if args.full_order <= 0:
176        raise SystemExit("--full-order must be positive")
177    if args.channels <= 0:
178        raise SystemExit("--channels must be positive")
179    if args.n_markov <= 8:
180        raise SystemExit("--n-markov must be larger than 8")
181    if args.n_freq < 8:
182        raise SystemExit("--n-freq must be at least 8")
183    if args.block_rows <= 0 or args.block_cols <= 0:
184        raise SystemExit("--block-rows and --block-cols must be positive")
185    if args.repeats <= 0:
186        raise SystemExit("--repeats must be positive")
187    if args.static_gain_iterations <= 0:
188        raise SystemExit("--static-gain-iterations must be positive")
189    if not args.candidate_gains or any(g < 0.0 or not np.isfinite(g) for g in args.candidate_gains):
190        raise SystemExit("--candidate-gains must be finite nonnegative values")
191    if any(r <= 0 for r in args.reduced_orders):
192        raise SystemExit("--reduced-orders must be positive")
193    if any(o < 0 for o in args.lattice_orders):
194        raise SystemExit("--lattice-orders must be nonnegative")
195
196
197def main() -> None:
198    args = parse_args()
199    _validate_args(args)
200
201    rows: list[dict[str, object]] = []
202    for reduced_order in args.reduced_orders:
203        for lattice_order in args.lattice_orders:
204            rows.append(
205                run_case(
206                    full_order=args.full_order,
207                    reduced_order=reduced_order,
208                    lattice_order=lattice_order,
209                    channels=args.channels,
210                    n_markov=args.n_markov,
211                    n_freq=args.n_freq,
212                    block_rows=args.block_rows,
213                    block_cols=args.block_cols,
214                    candidate_gains=args.candidate_gains,
215                    static_gain_iterations=args.static_gain_iterations,
216                    repeats=args.repeats,
217                    n_threads=args.n_threads,
218                    seed=args.seed + 100 * reduced_order + lattice_order,
219                )
220            )
221
222    payload = {
223        "python": platform.python_version(),
224        "platform": platform.platform(),
225        "has_openmp": bool(getattr(ld, "HAS_OPENMP", False)),
226        "full_order": args.full_order,
227        "channels": args.channels,
228        "n_markov": args.n_markov,
229        "n_freq": args.n_freq,
230        "block_rows": args.block_rows,
231        "block_cols": args.block_cols,
232        "candidate_gains": list(args.candidate_gains),
233        "static_gain_iterations": args.static_gain_iterations,
234        "repeats": args.repeats,
235        "n_threads": args.n_threads,
236        "description": (
237            "Experimental MIMO state-space to matrix-lattice realization diagnostic sweep. "
238            "The fitted lattice is an all-pass/polar scaffold; static gain compensation is reported "
239            "separately and this is not an exact matrix AAK/Nehari solver."
240        ),
241        "results": rows,
242    }
243
244    args.output.parent.mkdir(parents=True, exist_ok=True)
245    args.output.write_text(json.dumps(payload, indent=2), encoding="utf-8")
246    if args.csv_output is not None:
247        args.csv_output.parent.mkdir(parents=True, exist_ok=True)
248        with args.csv_output.open("w", newline="", encoding="utf-8") as f:
249            writer = csv.DictWriter(f, fieldnames=list(rows[0]) if rows else [])
250            writer.writeheader()
251            writer.writerows(rows)
252
253    print(json.dumps({k: v for k, v in payload.items() if k != "results"}, indent=2))
254    print()
255    print(
256        f"{'red':>4} {'lat':>4} {'stable':>6} {'realize_s':>10} {'polar_err':>10} "
257        f"{'raw_err':>10} {'gain_err':>10} {'improve':>9} {'unitary':>10} {'class':>34}"
258    )
259    print("-" * 118)
260    for row in rows:
261        stable = bool(
262            float(row["max_reflection_singular_value"]) < 1.0
263            and float(row["reduced_state_radius"]) < 1.0
264        )
265        print(
266            f"{int(row['reduced_order']):4d} {int(row['lattice_order']):4d} {str(stable):>6} "
267            f"{float(row['realize_s']):10.4f} {float(row['polar_factor_relative_error']):10.3e} "
268            f"{float(row['state_response_relative_error']):10.3e} {float(row['static_gain_relative_error']):10.3e} "
269            f"{float(row['static_gain_improvement']):9.2f} {float(row['unitarity_error']):10.2e} "
270            f"{str(row['diagnostic_classification']):>34}"
271        )
272    print(f"\nWrote {args.output}")
273    if args.csv_output is not None:
274        print(f"Wrote {args.csv_output}")
275
276
277if __name__ == "__main__":
278    main()