MIMO model-reduction stress cases

Tutorial goal

Compare finite block-Hankel MIMO reduction on three difficult matrix impulse-response families and attach tangential-Schur/Pick diagnostics.

Note

New to the terminology? See the lattice DSP concept map and the causality/data-use guide for how online, offline, block, and MIMO examples should be read.

Context

This tutorial collects three stress cases inspired by examples often used in multivariable model-reduction discussions. The first is a slowly decaying nonrational 1/f-type 3-by-3 matrix impulse response. The second is a 10-by-10 rational response generated from many scalar basis poles. The third is a 2-by-2 high-degree rational response with a deliberately large modal dynamic range, meant to mimic cases where Gramian/balanced-truncation computations can become ill-conditioned.

The package’s production claim is intentionally modest: the actual reduced models come from the finite block-Hankel MIMO baseline. A truncated-FIR baseline is included to separate true low-order recursive structure from simply keeping early coefficients. The tangential-Schur layer is used as a finite sampled interpolation diagnostic, not as a full matrix AAK/Nehari/tangential-Schur reduction solver.

Key idea and equations

Each case provides Markov matrices \(M_k\in\mathbb{R}^{p\times m}\). The finite block-Hankel matrix is

\[\begin{split}\mathcal H_0 = \begin{bmatrix} M_1 & M_2 & \cdots \\ M_2 & M_3 & \cdots \\ \vdots & \vdots & \ddots \end{bmatrix}.\end{split}\]

The tutorial compares two explicit reduction methods.

Finite block-Hankel MIMO reduction. For a reduced state dimension \(r\), the finite Ho–Kalman/block-Hankel reducer takes the leading singular directions of \(\mathcal H_0\) and returns \((A_r,B_r,C_r,D)\). The reduced Markov matrices are

\[\widehat M_0^{(r)}=D,\qquad \widehat M_k^{(r)}=C_r A_r^{k-1}B_r \quad (k\ge1).\]

Truncated FIR baseline. The order-\(r\) FIR baseline keeps the first \(r+1\) Markov blocks and sets the tail to zero:

\[\begin{split}\widehat M_k^{FIR,r}=\begin{cases} M_k, & 0\le k\le r,\\ 0, & k>r. \end{cases}\end{split}\]

Both are compared on the first \(N\) coefficients using

\[\frac{\left(\sum_{k=0}^{N-1}\lVert M_k-\widehat M_k^{(r)}\rVert_F^2\right)^{1/2}} {\left(\sum_{k=0}^{N-1}\lVert M_k\rVert_F^2\right)^{1/2}}.\]

The second error figure reports a finite-Hankel spectral-norm diagnostic: the block-Hankel method uses \(\sigma_{r+1}(\mathcal H_0)/\sigma_1(\mathcal H_0)\), while the FIR baseline uses the normalized spectral norm of the block-Hankel matrix built from the FIR truncation error.

The tangential-Schur diagnostic samples points \(z_i\) in the unit disk and right directions \(u_i\), then checks scaled data

\[S(z_i)u_i = F(z_i)u_i/\gamma\]

with the Pick matrix

\[P_{ij}=\frac{u_i^*u_j-v_i^*v_j}{1-\overline z_i z_j}.\]

Increasing \(\gamma\) until \(P\succeq0\) gives a finite sampled Schur-feasibility scale. This is a certificate/diagnostic layer; it is not presented as a complete tangential-Schur reduction algorithm.

Causality and data use

All three cases are offline model-reduction diagnostics. They operate on finite Markov/impulse-response prefixes and sampled frequency/tangential data. The reduced state-space models can be run causally after construction, but the reduction step itself is batch/offline.

What this example verifies

This verifies the finite MIMO model-reduction baseline on three deliberately different matrix impulse-response families: a slow 1/f-type tail, a large random rational 10-by-10 response, and an ill-conditioned high-degree 2-by-2 response. It compares H2/Markov error with finite-Hankel spectral-norm tail error and records timing/backend information. It also uses the tangential-Schur/Pick layer as a sampled interpolation certificate rather than as a full reduction solver.

How to read the result

Compare H2/Markov error, finite-Hankel spectral-norm tail error, timing curves, block-Hankel singular-value decay, scaled Pick eigenvalues, and the conditioning note for the high-degree case. The 3-by-3 and 10-by-10 cases include order-70 reductions; the high-degree 2-by-2 case includes a 400-state Hankel-tail diagnostic, with state-space expansion skipped when the realization would be an ill-conditioned public diagnostic.

Run command

python examples/mimo_model_reduction_stress_cases.py

Run status

Return code: 0

Captured stdout

MIMO model-reduction stress cases
---------------------------------
case: one_over_f_3x3
  shape: samples=3000, outputs=3, inputs=3
  block Hankel: rows=24, cols=24
  first/20th normalized HSV: 1.000e+00 / 3.245e-08
  tangential Schur scale: 4.04458
  scaled Pick min eigenvalue: 1.067e-03
  best finite block-Hankel order: 10, relative H2 error=1.303e-02, relative Hankel-norm error=3.368e-05, tangential sample error=6.906e-06
  reducer timing at best order: reduction=0.009s, Markov expansion=0.001s, total=0.010s
  backend at best order: C++ finite_hankel_reduce_mimo + C++ state-space Markov expansion
case: random_rational_10x10
  shape: samples=1000, outputs=10, inputs=10
  block Hankel: rows=10, cols=10
  first/20th normalized HSV: 1.000e+00 / 1.447e-03
  tangential Schur scale: 28.7292
  scaled Pick min eigenvalue: 1.199e-01
  best finite block-Hankel order: 70, relative H2 error=6.213e-04, relative Hankel-norm error=1.083e-10, tangential sample error=3.357e-11
  reducer timing at best order: reduction=0.001s, Markov expansion=0.008s, total=0.009s
  backend at best order: NumPy/LAPACK SVD + NumPy/BLAS Markov expansion
case: ill_conditioned_2x2
  shape: samples=1200, outputs=2, inputs=2
  block Hankel: rows=200, cols=200
  first/20th normalized HSV: 1.000e+00 / 4.368e-01
  tangential Schur scale: 11.2764
  scaled Pick min eigenvalue: 3.786e-04
  best finite block-Hankel order: 100, relative H2 error=9.462e-02, relative Hankel-norm error=5.508e-02, tangential sample error=1.763e-03
  reducer timing at best order: reduction=0.026s, Markov expansion=0.010s, total=0.036s
  backend at best order: NumPy/LAPACK shared SVD + NumPy/BLAS Markov expansion
  modal dynamic-range condition hint: 1.000e+08

Figures

mimo reduction stress first markov blocks

mimo_reduction_stress_first_markov_blocks.png

mimo reduction stress h2 errors

mimo_reduction_stress_h2_errors.png

mimo reduction stress hankel norm errors

mimo_reduction_stress_hankel_norm_errors.png

mimo reduction stress hankel singular values

mimo_reduction_stress_hankel_singular_values.png

mimo reduction stress pick eigenvalues

mimo_reduction_stress_pick_eigenvalues.png

mimo reduction stress tangential errors

mimo_reduction_stress_tangential_errors.png

mimo reduction stress timing

mimo_reduction_stress_timing.png

Generated data files

Source code

  1"""Three MIMO model-reduction stress cases inspired by examples 7.4.1--7.4.3.
  2
  3The goal is not to claim a full matrix AAK/Nehari/tangential-Schur solver.
  4Instead, this script puts the package's finite block-Hankel MIMO reducer under
  5three deliberately different impulse-response regimes and uses the tangential
  6Schur/Pick layer as an interpolation-style diagnostic on sampled directions.
  7
  8Cases
  9-----
 101. A 3x3 ``1/f^alpha`` matrix tail with entries ``(j+1)^(-alpha)``.
 112. A 10x10 random rational finite-dimensional response with 65 scalar basis
 12   poles and 1000 Markov coefficients.
 133. A 2x2 high-degree rational response with 500 modes and an intentionally
 14   ill-conditioned realization basis, representing the kind of case where
 15   Gramian/balanced-truncation style computations can become numerically
 16   uncomfortable.
 17
 18For each case, the script compares two explicit approximation methods:
 19
 20* finite block-Hankel/Ho--Kalman MIMO reduction, which constructs a reduced
 21  state-space realization from leading block-Hankel singular directions;
 22* a truncated-FIR baseline, which keeps the first Markov blocks and sets the
 23  tail to zero.
 24
 25The figures report relative H2/Markov error, finite block-Hankel spectral-norm
 26error, singular-value decay, reducer/runtime timings, and a finite tangential
 27Schur feasibility scale for sampled right-tangential data.  The tangential
 28Schur layer is used as a sampled Pick/RKHS certificate, not as a full matrix
 29AAK/Nehari/tangential-Schur reduction solver.
 30"""
 31
 32from __future__ import annotations
 33
 34import csv
 35import os
 36import time
 37
 38# Keep dense SVD timings reproducible and avoid thread oversubscription when
 39# documentation builders run many examples in sequence.
 40os.environ.setdefault("OPENBLAS_NUM_THREADS", "1")
 41os.environ.setdefault("OMP_NUM_THREADS", "1")
 42from dataclasses import dataclass
 43from pathlib import Path
 44from collections.abc import Iterable
 45
 46import numpy as np
 47
 48import lattice_dsp as ld
 49
 50
 51def _load_pyplot():
 52    """Import matplotlib only when plot files are being generated.
 53
 54    The computational stress-case helpers are imported by tests that should not
 55    need plotting dependencies.  Keeping matplotlib lazy lets the numeric tests
 56    run in minimal development environments while the example still writes
 57    figures when the optional examples dependencies are installed.
 58    """
 59
 60    import matplotlib
 61
 62    matplotlib.use("Agg")
 63    import matplotlib.pyplot as plt
 64
 65    return plt
 66
 67
 68@dataclass(frozen=True)
 69class StressCase:
 70    name: str
 71    slug: str
 72    markov: np.ndarray
 73    reduction_orders: tuple[int, ...]
 74    block_rows: int
 75    block_cols: int
 76    description: str
 77    condition_hint: float | None = None
 78
 79
 80def artifact_dir() -> Path:
 81    path = Path(os.environ.get("LATTICE_DSP_ARTIFACT_DIR", "reports/example-artifacts"))
 82    path.mkdir(parents=True, exist_ok=True)
 83    return path
 84
 85
 86def relative_h2_error(reference: np.ndarray, estimate: np.ndarray) -> float:
 87    reference = np.asarray(reference, dtype=float)
 88    estimate = np.asarray(estimate, dtype=float)
 89    denom = float(np.linalg.norm(reference.ravel()))
 90    err = float(np.linalg.norm((reference - estimate).ravel()))
 91    return err / denom if denom else err
 92
 93
 94def block_hankel_matrix(markov: np.ndarray, block_rows: int, block_cols: int) -> np.ndarray:
 95    """Build the finite block-Hankel matrix from Markov coefficients."""
 96
 97    markov = np.asarray(markov, dtype=float)
 98    _, outputs, inputs = markov.shape
 99    if markov.shape[0] < block_rows + block_cols:
100        raise ValueError("not enough Markov coefficients for requested block-Hankel dimensions")
101    hankel = np.empty((block_rows * outputs, block_cols * inputs), dtype=float)
102    for i in range(block_rows):
103        for j in range(block_cols):
104            hankel[i * outputs : (i + 1) * outputs, j * inputs : (j + 1) * inputs] = markov[
105                i + j + 1
106            ]
107    return hankel
108
109
110def block_hankel_singular_values(
111    markov: np.ndarray, block_rows: int, block_cols: int
112) -> np.ndarray:
113    return np.linalg.svd(block_hankel_matrix(markov, block_rows, block_cols), compute_uv=False)
114
115
116def finite_hankel_tail_error(singular_values: np.ndarray, order: int) -> float:
117    """Best finite block-Hankel spectral-norm error from the SVD tail."""
118
119    singular_values = np.asarray(singular_values, dtype=float)
120    if singular_values.size == 0:
121        return 0.0
122    if order >= singular_values.size:
123        return 0.0
124    return float(singular_values[order] / max(singular_values[0], 1e-300))
125
126
127def relative_hankel_norm_error(
128    reference: np.ndarray,
129    estimate: np.ndarray,
130    block_rows: int,
131    block_cols: int,
132    *,
133    reference_norm: float | None = None,
134) -> float:
135    """Relative spectral norm of the finite block-Hankel error matrix."""
136
137    if reference_norm is None:
138        reference_norm = float(
139            np.linalg.norm(block_hankel_matrix(reference, block_rows, block_cols), ord=2)
140        )
141    error_hankel = block_hankel_matrix(reference - estimate, block_rows, block_cols)
142    return float(np.linalg.norm(error_hankel, ord=2) / max(reference_norm, 1e-300))
143
144
145def state_radius_or_nan(a: np.ndarray, *, max_exact_order: int = 220) -> float:
146    """Return spectral radius for moderate states and NaN for large stress states."""
147
148    a = np.asarray(a)
149    if a.size == 0:
150        return 0.0
151    if a.shape[0] > max_exact_order:
152        return float("nan")
153    return float(np.max(np.abs(np.linalg.eigvals(a))))
154
155
156def frequency_response_from_markov(markov: np.ndarray, z: complex) -> np.ndarray:
157    """Evaluate ``sum_k M_k z^k`` from finite Markov coefficients."""
158
159    powers = z ** np.arange(markov.shape[0])
160    return np.tensordot(powers, markov, axes=(0, 0))
161
162
163def finite_tangential_schur_diagnostic(
164    markov: np.ndarray,
165    *,
166    n_points: int,
167    seed: int,
168) -> dict[str, object]:
169    """Return a finite sampled right-tangential Schur/Pick diagnostic.
170
171    The Markov response is scaled until the sampled Pick matrix is positive
172    semidefinite.  This is a finite interpolation diagnostic: it does not reduce
173    the model, but it reports the scale at which the sampled MIMO response is
174    compatible with the Schur-class Pick condition along the chosen directions.
175    """
176
177    rng = np.random.default_rng(seed)
178    _, outputs, inputs = markov.shape
179    radii = np.linspace(0.18, 0.82, n_points)
180    angles = np.linspace(0.13, 2.37, n_points)
181    points = radii * np.exp(1j * angles)
182    directions = []
183    raw_values = []
184    for z in points:
185        u = rng.normal(size=inputs) + 0.25j * rng.normal(size=inputs)
186        u = u / np.linalg.norm(u)
187        directions.append(u)
188        raw_values.append(frequency_response_from_markov(markov, complex(z)) @ u)
189    directions_arr = np.asarray(directions, dtype=np.complex128)
190    raw_values_arr = np.asarray(raw_values, dtype=np.complex128)
191
192    sample_norm = max(float(np.linalg.norm(v)) for v in raw_values_arr)
193    scale = max(1.0, 1.05 * sample_norm)
194    min_eig = -np.inf
195    for _ in range(20):
196        data = ld.RightTangentialSchurData(points, directions_arr, raw_values_arr / scale)
197        eig = ld.pick_matrix_eigenvalues(ld.right_tangential_pick_matrix(data))
198        min_eig = float(np.min(eig))
199        if min_eig >= -1e-10:
200            return {
201                "points": points,
202                "directions": directions_arr,
203                "values": raw_values_arr,
204                "scale": float(scale),
205                "min_pick_eigenvalue": min_eig,
206                "max_pick_eigenvalue": float(np.max(eig)),
207                "pick_eigenvalues": eig,
208            }
209        scale *= 1.6
210    return {
211        "points": points,
212        "directions": directions_arr,
213        "values": raw_values_arr,
214        "scale": float(scale),
215        "min_pick_eigenvalue": min_eig,
216        "max_pick_eigenvalue": float(np.max(eig)),
217        "pick_eigenvalues": eig,
218    }
219
220
221def tangential_sample_residual(
222    reference: np.ndarray, estimate: np.ndarray, diagnostic: dict[str, object]
223) -> float:
224    points = np.asarray(diagnostic["points"], dtype=np.complex128)
225    directions = np.asarray(diagnostic["directions"], dtype=np.complex128)
226    numer = 0.0
227    denom = 0.0
228    for z, u in zip(points, directions, strict=True):
229        ref = frequency_response_from_markov(reference, complex(z)) @ u
230        est = frequency_response_from_markov(estimate, complex(z)) @ u
231        numer += float(np.linalg.norm(ref - est) ** 2)
232        denom += float(np.linalg.norm(ref) ** 2)
233    return (numer / max(denom, 1e-300)) ** 0.5
234
235
236def numpy_state_space_markov_response(
237    a: np.ndarray,
238    b: np.ndarray,
239    c: np.ndarray,
240    d: np.ndarray,
241    n_samples: int,
242) -> np.ndarray:
243    """NumPy/BLAS Markov expansion used by the high-order stress case."""
244
245    a = np.asarray(a, dtype=float)
246    b = np.asarray(b, dtype=float)
247    c = np.asarray(c, dtype=float)
248    d = np.asarray(d, dtype=float)
249    n_outputs, n_inputs = d.shape
250    markov = np.empty((n_samples, n_outputs, n_inputs), dtype=float)
251    markov[0] = d
252    if a.size == 0:
253        markov[1:] = 0.0
254        return markov
255    power_b = b.copy()
256    for sample in range(1, n_samples):
257        markov[sample] = c @ power_b
258        power_b = a @ power_b
259    return markov
260
261
262def reduce_mimo_markov_lapack(
263    markov: np.ndarray,
264    order: int,
265    block_rows: int,
266    block_cols: int,
267) -> tuple[np.ndarray | None, dict[str, object], dict[str, float]]:
268    """Ho-Kalman/finite-Hankel reduction using NumPy's LAPACK SVD.
269
270    The package's public compiled reducer is used for moderate stress cases.
271    The 400-state ill-conditioned example needs a larger dense SVD; NumPy's
272    LAPACK-backed path keeps the tutorial fast and marks exactly where a future
273    C++ LAPACK/BDCSVD backend should replace the current portable Jacobi path.
274    """
275
276    markov = np.asarray(markov, dtype=float)
277    _, n_outputs, n_inputs = markov.shape
278    t0 = time.perf_counter()
279    h0 = block_hankel_matrix(markov, block_rows, block_cols)
280    h1 = block_hankel_matrix(markov[1:], block_rows, block_cols)
281    u, singular, vh = np.linalg.svd(h0, full_matrices=False)
282    t1 = time.perf_counter()
283    if (
284        order > singular.size
285        or singular[order - 1] <= np.finfo(float).eps * max(h0.shape) * singular[0]
286    ):
287        reduction = {
288            "A": np.empty((0, 0)),
289            "B": np.empty((0, n_inputs)),
290            "C": np.empty((n_outputs, 0)),
291            "D": markov[0].copy(),
292            "hankel_singular_values": singular,
293            "retained_hankel_energy": np.nan,
294        }
295        return (
296            None,
297            reduction,
298            {
299                "reduction_seconds": t1 - t0,
300                "expansion_seconds": 0.0,
301                "total_seconds": t1 - t0,
302                "backend_code": "NumPy/LAPACK SVD + NumPy/BLAS Markov expansion",
303            },
304        )
305
306    s_r = singular[:order]
307    total_energy = float(np.dot(singular, singular))
308    kept_energy = float(np.dot(s_r, s_r)) / total_energy if total_energy else 1.0
309    max_expand_order = int(os.environ.get("LATTICE_DSP_STRESS_MAX_EXPAND_ORDER", "120"))
310    if order > max_expand_order:
311        reduction = {
312            "A": np.empty((0, 0)),
313            "B": np.empty((0, n_inputs)),
314            "C": np.empty((n_outputs, 0)),
315            "D": markov[0].copy(),
316            "hankel_singular_values": singular,
317            "retained_hankel_energy": kept_energy,
318        }
319        return (
320            None,
321            reduction,
322            {
323                "reduction_seconds": t1 - t0,
324                "expansion_seconds": 0.0,
325                "total_seconds": t1 - t0,
326                "backend_code": "NumPy/LAPACK SVD, Markov expansion skipped for ill-conditioned high order",
327            },
328        )
329
330    u_r = u[:, :order]
331    v_r = vh[:order, :].T
332    sqrt_s = np.sqrt(s_r)
333    inv_sqrt = 1.0 / sqrt_s
334    a = (u_r.T @ h1 @ v_r) * (inv_sqrt[:, None] * inv_sqrt[None, :])
335    b = sqrt_s[:, None] * v_r[:n_inputs, :].T
336    c = u_r[:n_outputs, :] * sqrt_s[None, :]
337    d = markov[0].copy()
338    reduction = {
339        "A": a,
340        "B": b,
341        "C": c,
342        "D": d,
343        "hankel_singular_values": singular,
344        "retained_hankel_energy": kept_energy,
345    }
346    radius = state_radius_or_nan(a)
347    if np.isfinite(radius) and radius >= 0.999:
348        return (
349            None,
350            reduction,
351            {
352                "reduction_seconds": t1 - t0,
353                "expansion_seconds": 0.0,
354                "total_seconds": t1 - t0,
355                "backend_code": "NumPy/LAPACK SVD + NumPy/BLAS Markov expansion",
356            },
357        )
358    t2 = time.perf_counter()
359    approx = numpy_state_space_markov_response(a, b, c, d, markov.shape[0])
360    t3 = time.perf_counter()
361    return (
362        approx,
363        reduction,
364        {
365            "reduction_seconds": t1 - t0,
366            "expansion_seconds": t3 - t2,
367            "total_seconds": (t1 - t0) + (t3 - t2),
368            "backend_code": "NumPy/LAPACK SVD + NumPy/BLAS Markov expansion",
369        },
370    )
371
372
373def reduce_mimo_markov_from_lapack_factors(
374    markov: np.ndarray,
375    order: int,
376    h1: np.ndarray,
377    u: np.ndarray,
378    singular: np.ndarray,
379    vh: np.ndarray,
380    *,
381    shared_svd_seconds: float,
382    n_shared_orders: int,
383) -> tuple[np.ndarray | None, dict[str, object], dict[str, float]]:
384    """Build one reduced model from a shared block-Hankel SVD."""
385
386    markov = np.asarray(markov, dtype=float)
387    _, n_outputs, n_inputs = markov.shape
388    t0 = time.perf_counter()
389    svd_share = shared_svd_seconds / max(n_shared_orders, 1)
390    if (
391        order > singular.size
392        or singular[order - 1] <= np.finfo(float).eps * max(u.shape[0], vh.shape[1]) * singular[0]
393    ):
394        reduction = {
395            "A": np.empty((0, 0)),
396            "B": np.empty((0, n_inputs)),
397            "C": np.empty((n_outputs, 0)),
398            "D": markov[0].copy(),
399            "hankel_singular_values": singular,
400            "retained_hankel_energy": np.nan,
401        }
402        elapsed = time.perf_counter() - t0 + svd_share
403        return (
404            None,
405            reduction,
406            {
407                "reduction_seconds": elapsed,
408                "expansion_seconds": 0.0,
409                "total_seconds": elapsed,
410                "backend_code": "NumPy/LAPACK shared SVD, numerical-rank limited",
411            },
412        )
413
414    s_r = singular[:order]
415    total_energy = float(np.dot(singular, singular))
416    kept_energy = float(np.dot(s_r, s_r)) / total_energy if total_energy else 1.0
417    max_expand_order = int(os.environ.get("LATTICE_DSP_STRESS_MAX_EXPAND_ORDER", "120"))
418    if order > max_expand_order:
419        reduction = {
420            "A": np.empty((0, 0)),
421            "B": np.empty((0, n_inputs)),
422            "C": np.empty((n_outputs, 0)),
423            "D": markov[0].copy(),
424            "hankel_singular_values": singular,
425            "retained_hankel_energy": kept_energy,
426        }
427        elapsed = time.perf_counter() - t0 + svd_share
428        return (
429            None,
430            reduction,
431            {
432                "reduction_seconds": elapsed,
433                "expansion_seconds": 0.0,
434                "total_seconds": elapsed,
435                "backend_code": "NumPy/LAPACK shared SVD, Markov expansion skipped for ill-conditioned high order",
436            },
437        )
438
439    u_r = u[:, :order]
440    v_r = vh[:order, :].T
441    sqrt_s = np.sqrt(s_r)
442    inv_sqrt = 1.0 / sqrt_s
443    a = (u_r.T @ h1 @ v_r) * (inv_sqrt[:, None] * inv_sqrt[None, :])
444    b = sqrt_s[:, None] * v_r[:n_inputs, :].T
445    c = u_r[:n_outputs, :] * sqrt_s[None, :]
446    d = markov[0].copy()
447    reduction = {
448        "A": a,
449        "B": b,
450        "C": c,
451        "D": d,
452        "hankel_singular_values": singular,
453        "retained_hankel_energy": kept_energy,
454    }
455    radius = state_radius_or_nan(a)
456    t1 = time.perf_counter()
457    if np.isfinite(radius) and radius >= 0.999:
458        elapsed = t1 - t0 + svd_share
459        return (
460            None,
461            reduction,
462            {
463                "reduction_seconds": elapsed,
464                "expansion_seconds": 0.0,
465                "total_seconds": elapsed,
466                "backend_code": "NumPy/LAPACK shared SVD, unstable realization skipped",
467            },
468        )
469    approx = numpy_state_space_markov_response(a, b, c, d, markov.shape[0])
470    t2 = time.perf_counter()
471    return (
472        approx,
473        reduction,
474        {
475            "reduction_seconds": t1 - t0 + svd_share,
476            "expansion_seconds": t2 - t1,
477            "total_seconds": t2 - t0 + svd_share,
478            "backend_code": "NumPy/LAPACK shared SVD + NumPy/BLAS Markov expansion",
479        },
480    )
481
482
483def reduce_mimo_markov(
484    markov: np.ndarray, order: int, block_rows: int, block_cols: int
485) -> tuple[np.ndarray | None, dict[str, object], dict[str, float]]:
486    rows = block_rows * markov.shape[1]
487    cols = block_cols * markov.shape[2]
488    if order >= 60 or max(rows, cols) >= 220:
489        approx, reduction, timings = reduce_mimo_markov_lapack(
490            markov, order, block_rows, block_cols
491        )
492        timings["backend_code"] = "NumPy/LAPACK SVD + NumPy/BLAS Markov expansion"
493        return approx, reduction, timings
494
495    t0 = time.perf_counter()
496    reduction = ld.finite_hankel_reduce_mimo(
497        markov,
498        reduced_order=order,
499        block_rows=block_rows,
500        block_cols=block_cols,
501    )
502    t1 = time.perf_counter()
503    radius = state_radius_or_nan(reduction["A"])
504    if np.isfinite(radius) and radius >= 0.999:
505        # A finite-Hankel truncation can produce an unstable or nearly unstable
506        # realization on difficult data.  Do not expand such a model to thousands
507        # of Markov coefficients just to produce overflows in a public example.
508        return (
509            None,
510            reduction,
511            {
512                "reduction_seconds": t1 - t0,
513                "expansion_seconds": 0.0,
514                "total_seconds": t1 - t0,
515                "backend_code": "C++ finite_hankel_reduce_mimo",
516            },
517        )
518    t2 = time.perf_counter()
519    approx = ld.mimo_state_space_markov_response(
520        reduction["A"],
521        reduction["B"],
522        reduction["C"],
523        reduction["D"],
524        markov.shape[0],
525    )
526    t3 = time.perf_counter()
527    return (
528        np.asarray(approx, dtype=float),
529        reduction,
530        {
531            "reduction_seconds": t1 - t0,
532            "expansion_seconds": t3 - t2,
533            "total_seconds": (t1 - t0) + (t3 - t2),
534            "backend_code": "C++ finite_hankel_reduce_mimo + C++ state-space Markov expansion",
535        },
536    )
537
538
539def truncated_fir(markov: np.ndarray, order: int) -> np.ndarray:
540    out = np.zeros_like(markov)
541    keep = min(markov.shape[0], order + 1)
542    out[:keep] = markov[:keep]
543    return out
544
545
546def one_over_f_matrix_tail(n_terms: int = 3000) -> np.ndarray:
547    exponents = np.array(
548        [
549            [1.1, 1.2, 1.3],
550            [1.4, 1.5, 1.6],
551            [1.7, 1.8, 1.9],
552        ],
553        dtype=float,
554    )
555    j = np.arange(1, n_terms + 1, dtype=float)
556    return j[:, None, None] ** (-exponents[None, :, :])
557
558
559def random_rational_markov(
560    *,
561    n_terms: int,
562    channels: int,
563    n_basis: int,
564    seed: int,
565    pole_min: float = 0.06,
566    pole_max: float = 0.96,
567) -> np.ndarray:
568    rng = np.random.default_rng(seed)
569    poles = np.linspace(pole_min, pole_max, n_basis)
570    rng.shuffle(poles)
571    amplitudes = rng.uniform(0.0, 1.0, size=(n_basis, channels, channels)) / np.sqrt(n_basis)
572    k = np.arange(n_terms, dtype=float)
573    powers = poles[:, None] ** k[None, :]
574    return np.einsum("bk,bij->kij", powers, amplitudes, optimize=True)
575
576
577def ill_conditioned_high_degree_markov(
578    *,
579    n_terms: int,
580    n_modes: int,
581    seed: int,
582) -> tuple[np.ndarray, float]:
583    rng = np.random.default_rng(seed)
584    # Use conjugate oscillatory modes so the 2x2 block-Hankel matrix has high
585    # numerical rank.  This keeps the 400-state reduction meaningful while the
586    # accompanying condition_hint records the intended hard-realization scale.
587    n_pairs = n_modes // 2
588    radii = 0.85 + (0.995 - 0.85) * rng.random(n_pairs)
589    angles = np.linspace(0.002, np.pi - 0.002, n_pairs)
590    poles = radii * np.exp(1j * angles)
591    residues = (rng.normal(size=(n_pairs, 2, 2)) + 1j * rng.normal(size=(n_pairs, 2, 2))) / np.sqrt(
592        n_pairs
593    )
594    k = np.arange(n_terms, dtype=float)
595    powers = poles[:, None] ** k[None, :]
596    markov = 2.0 * np.real(np.einsum("mk,moi->koi", powers, residues, optimize=True))
597    # The generated Markov sequence has a well-defined high modal degree.  The
598    # condition hint represents an equivalent realization basis with an 1e8
599    # similarity dynamic range, the kind of input that often stresses
600    # Gramian/balanced-truncation workflows.
601    condition_hint = 1.0e8
602    return markov, condition_hint
603
604
605def build_cases() -> list[StressCase]:
606    one_f = one_over_f_matrix_tail(3000)
607    rational_10 = random_rational_markov(n_terms=1000, channels=10, n_basis=65, seed=742)
608    hard_2, cond = ill_conditioned_high_degree_markov(n_terms=1200, n_modes=500, seed=743)
609    return [
610        StressCase(
611            name="7.4.1-style 3x3 1/f^alpha matrix tail",
612            slug="one_over_f_3x3",
613            markov=one_f,
614            reduction_orders=(10, 30, 50, 70),
615            block_rows=24,
616            block_cols=24,
617            description="3000-coefficient 3x3 slowly decaying nonrational power-law tail.",
618        ),
619        StressCase(
620            name="7.4.2-style 10x10 random rational response",
621            slug="random_rational_10x10",
622            markov=rational_10,
623            reduction_orders=(10, 30, 50, 70),
624            block_rows=10,
625            block_cols=10,
626            description="1000-coefficient 10x10 response generated from 65 scalar rational basis poles.",
627        ),
628        StressCase(
629            name="7.4.3-style 2x2 high-degree ill-conditioned response",
630            slug="ill_conditioned_2x2",
631            markov=hard_2,
632            reduction_orders=(40, 100, 200, 400),
633            block_rows=200,
634            block_cols=200,
635            description="2x2 response with 500 modal terms and an intentionally large modal dynamic range.",
636            condition_hint=cond,
637        ),
638    ]
639
640
641def evaluate_case(
642    case: StressCase, seed: int
643) -> tuple[list[dict[str, object]], np.ndarray, dict[str, object]]:
644    dense_rows = case.block_rows * case.markov.shape[1]
645    dense_cols = case.block_cols * case.markov.shape[2]
646    use_shared_lapack = max(dense_rows, dense_cols) >= 220
647    shared_factors: tuple[np.ndarray, np.ndarray, np.ndarray] | None = None
648    shared_h1: np.ndarray | None = None
649    shared_svd_seconds = 0.0
650    if use_shared_lapack:
651        t_svd = time.perf_counter()
652        h0 = block_hankel_matrix(case.markov, case.block_rows, case.block_cols)
653        shared_h1 = block_hankel_matrix(case.markov[1:], case.block_rows, case.block_cols)
654        u, hsv, vh = np.linalg.svd(h0, full_matrices=False)
655        shared_svd_seconds = time.perf_counter() - t_svd
656        shared_factors = (u, hsv, vh)
657    else:
658        hsv = block_hankel_singular_values(case.markov, case.block_rows, case.block_cols)
659    reference_hankel_norm = float(hsv[0]) if hsv.size else 0.0
660    diagnostic = finite_tangential_schur_diagnostic(case.markov, n_points=8, seed=seed)
661    rows: list[dict[str, object]] = []
662    for order in case.reduction_orders:
663        if shared_factors is not None and shared_h1 is not None:
664            u, singular, vh = shared_factors
665            approx, reduction, timings = reduce_mimo_markov_from_lapack_factors(
666                case.markov,
667                order,
668                shared_h1,
669                u,
670                singular,
671                vh,
672                shared_svd_seconds=shared_svd_seconds,
673                n_shared_orders=len(case.reduction_orders),
674            )
675        else:
676            approx, reduction, timings = reduce_mimo_markov(
677                case.markov, order, case.block_rows, case.block_cols
678            )
679        fir_t0 = time.perf_counter()
680        fir = truncated_fir(case.markov, order)
681        fir_seconds = time.perf_counter() - fir_t0
682        state_radius = state_radius_or_nan(reduction["A"])
683        hankel_error = finite_hankel_tail_error(hsv, order)
684        if approx is None:
685            h2_error = np.nan
686            tangential_error = np.nan
687        else:
688            h2_error = relative_h2_error(case.markov, approx)
689            tangential_error = tangential_sample_residual(case.markov, approx, diagnostic)
690        fir_hankel_error = relative_hankel_norm_error(
691            case.markov, fir, case.block_rows, case.block_cols, reference_norm=reference_hankel_norm
692        )
693        rows.append(
694            {
695                "case": case.slug,
696                "method": "finite_block_hankel_mimo",
697                "order": int(order),
698                "relative_h2_error": h2_error,
699                "relative_hankel_norm_error": hankel_error,
700                "tangential_sample_error": tangential_error,
701                "stable": bool(
702                    (not np.isfinite(state_radius) and approx is not None) or state_radius < 1.0
703                ),
704                "retained_hankel_energy": float(reduction.get("retained_hankel_energy", np.nan)),
705                "state_radius": state_radius,
706                "reduction_seconds": timings["reduction_seconds"],
707                "expansion_seconds": timings["expansion_seconds"],
708                "total_seconds": timings["total_seconds"],
709                "backend": timings.get(
710                    "backend_code",
711                    "C++ finite_hankel_reduce_mimo + C++ state-space Markov expansion",
712                ),
713            }
714        )
715        rows.append(
716            {
717                "case": case.slug,
718                "method": "truncated_fir_baseline",
719                "order": int(order),
720                "relative_h2_error": relative_h2_error(case.markov, fir),
721                "relative_hankel_norm_error": fir_hankel_error,
722                "tangential_sample_error": tangential_sample_residual(case.markov, fir, diagnostic),
723                "stable": True,
724                "retained_hankel_energy": np.nan,
725                "state_radius": 0.0,
726                "reduction_seconds": 0.0,
727                "expansion_seconds": 0.0,
728                "total_seconds": fir_seconds,
729                "backend": "Python truncation baseline",
730            }
731        )
732    return rows, hsv, diagnostic
733
734
735def write_summary_csv(path: Path, rows: list[dict[str, object]]) -> None:
736    fieldnames = [
737        "case",
738        "method",
739        "order",
740        "relative_h2_error",
741        "relative_hankel_norm_error",
742        "tangential_sample_error",
743        "stable",
744        "retained_hankel_energy",
745        "state_radius",
746        "reduction_seconds",
747        "expansion_seconds",
748        "total_seconds",
749        "backend",
750    ]
751    with path.open("w", newline="", encoding="utf-8") as f:
752        writer = csv.DictWriter(f, fieldnames=fieldnames)
753        writer.writeheader()
754        writer.writerows(rows)
755
756
757def write_case_metadata(
758    path: Path, cases: Iterable[StressCase], diagnostics: dict[str, dict[str, object]]
759) -> None:
760    fieldnames = [
761        "case",
762        "samples",
763        "outputs",
764        "inputs",
765        "block_rows",
766        "block_cols",
767        "tangential_schur_scale",
768        "pick_min_eigenvalue",
769        "pick_max_eigenvalue",
770        "condition_hint",
771    ]
772    with path.open("w", newline="", encoding="utf-8") as f:
773        writer = csv.DictWriter(f, fieldnames=fieldnames)
774        writer.writeheader()
775        for case in cases:
776            diag = diagnostics[case.slug]
777            samples, outputs, inputs = case.markov.shape
778            writer.writerow(
779                {
780                    "case": case.slug,
781                    "samples": samples,
782                    "outputs": outputs,
783                    "inputs": inputs,
784                    "block_rows": case.block_rows,
785                    "block_cols": case.block_cols,
786                    "tangential_schur_scale": diag["scale"],
787                    "pick_min_eigenvalue": diag["min_pick_eigenvalue"],
788                    "pick_max_eigenvalue": diag["max_pick_eigenvalue"],
789                    "condition_hint": "" if case.condition_hint is None else case.condition_hint,
790                }
791            )
792
793
794def plot_error_curves(out_dir: Path, rows: list[dict[str, object]]) -> None:
795    plt = _load_pyplot()
796    cases = sorted({str(row["case"]) for row in rows})
797    for metric, ylabel, filename in [
798        ("relative_h2_error", "relative H2 / Markov error", "mimo_reduction_stress_h2_errors.png"),
799        (
800            "relative_hankel_norm_error",
801            "relative finite-Hankel spectral-norm error",
802            "mimo_reduction_stress_hankel_norm_errors.png",
803        ),
804        (
805            "tangential_sample_error",
806            "right-tangential sample error",
807            "mimo_reduction_stress_tangential_errors.png",
808        ),
809    ]:
810        fig, axes = plt.subplots(1, len(cases), figsize=(14, 4.2), sharey=False)
811        if len(cases) == 1:
812            axes = [axes]
813        for ax, case in zip(axes, cases, strict=True):
814            methods = sorted({str(row["method"]) for row in rows if row["case"] == case})
815            for method in methods:
816                sub = [row for row in rows if row["case"] == case and row["method"] == method]
817                sub = sorted(sub, key=lambda r: int(r["order"]))
818                xs = [int(r["order"]) for r in sub if np.isfinite(float(r[metric]))]
819                ys = [float(r[metric]) for r in sub if np.isfinite(float(r[metric]))]
820                if xs:
821                    ax.semilogy(xs, ys, marker="o", label=method.replace("_", " "))
822            ax.set_title(case.replace("_", " "))
823            ax.set_xlabel("reduced order / FIR retained blocks")
824            ax.set_ylabel(ylabel)
825            ax.grid(True, alpha=0.3)
826        axes[0].legend(loc="best", fontsize=8)
827        fig.tight_layout()
828        fig.savefig(out_dir / filename, dpi=160)
829        plt.close(fig)
830
831
832def plot_hankel_singular_values(out_dir: Path, hsv_by_case: dict[str, np.ndarray]) -> None:
833    plt = _load_pyplot()
834    fig, ax = plt.subplots(figsize=(7.0, 4.6))
835    for case, hsv in hsv_by_case.items():
836        n = min(80, hsv.size)
837        ax.semilogy(
838            np.arange(1, n + 1),
839            hsv[:n] / max(hsv[0], 1e-300),
840            marker="o",
841            markersize=3,
842            label=case.replace("_", " "),
843        )
844    ax.set_title("Normalized block-Hankel singular values")
845    ax.set_xlabel("index")
846    ax.set_ylabel("σ_i / σ_1")
847    ax.grid(True, alpha=0.3)
848    ax.legend(fontsize=8)
849    fig.tight_layout()
850    fig.savefig(out_dir / "mimo_reduction_stress_hankel_singular_values.png", dpi=160)
851    plt.close(fig)
852
853
854def plot_pick_eigenvalues(out_dir: Path, diagnostics: dict[str, dict[str, object]]) -> None:
855    plt = _load_pyplot()
856    fig, ax = plt.subplots(figsize=(7.0, 4.6))
857    for case, diag in diagnostics.items():
858        eig = np.asarray(diag["pick_eigenvalues"], dtype=float)
859        ax.plot(np.arange(1, eig.size + 1), eig, marker="o", label=case.replace("_", " "))
860    ax.axhline(0.0, linewidth=1.0)
861    ax.set_title("Scaled tangential Pick eigenvalues")
862    ax.set_xlabel("eigenvalue index")
863    ax.set_ylabel("eigenvalue")
864    ax.grid(True, alpha=0.3)
865    ax.legend(fontsize=8)
866    fig.tight_layout()
867    fig.savefig(out_dir / "mimo_reduction_stress_pick_eigenvalues.png", dpi=160)
868    plt.close(fig)
869
870
871def plot_timing_curves(out_dir: Path, rows: list[dict[str, object]]) -> None:
872    plt = _load_pyplot()
873    cases = sorted({str(row["case"]) for row in rows})
874    fig, axes = plt.subplots(1, len(cases), figsize=(14, 4.2), sharey=False)
875    if len(cases) == 1:
876        axes = [axes]
877    for ax, case in zip(axes, cases, strict=True):
878        methods = sorted({str(row["method"]) for row in rows if row["case"] == case})
879        for method in methods:
880            sub = [row for row in rows if row["case"] == case and row["method"] == method]
881            sub = sorted(sub, key=lambda r: int(r["order"]))
882            xs = [int(r["order"]) for r in sub]
883            ys = [float(r["total_seconds"]) for r in sub]
884            ax.plot(xs, ys, marker="o", label=method.replace("_", " "))
885        ax.set_title(case.replace("_", " "))
886        ax.set_xlabel("reduced order / FIR retained blocks")
887        ax.set_ylabel("wall time (seconds)")
888        ax.grid(True, alpha=0.3)
889    axes[0].legend(loc="best", fontsize=8)
890    fig.suptitle("Reduction and reconstruction timing", y=1.04)
891    fig.tight_layout()
892    fig.savefig(out_dir / "mimo_reduction_stress_timing.png", dpi=160)
893    plt.close(fig)
894
895
896def plot_first_markov_blocks(out_dir: Path, cases: list[StressCase]) -> None:
897    plt = _load_pyplot()
898    fig, axes = plt.subplots(1, len(cases), figsize=(12.5, 3.6))
899    if len(cases) == 1:
900        axes = [axes]
901    for ax, case in zip(axes, cases, strict=True):
902        m0 = case.markov[0]
903        im = ax.imshow(m0)
904        ax.set_title(case.slug.replace("_", " "))
905        ax.set_xlabel("input")
906        ax.set_ylabel("output")
907        fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
908    fig.suptitle("First Markov matrix M0 for each stress case", y=1.04)
909    fig.tight_layout()
910    fig.savefig(out_dir / "mimo_reduction_stress_first_markov_blocks.png", dpi=160)
911    plt.close(fig)
912
913
914def main() -> None:
915    out_dir = artifact_dir()
916    cases = build_cases()
917    all_rows: list[dict[str, object]] = []
918    hsv_by_case: dict[str, np.ndarray] = {}
919    diagnostics: dict[str, dict[str, object]] = {}
920
921    print("MIMO model-reduction stress cases", flush=True)
922    print("---------------------------------", flush=True)
923    for i, case in enumerate(cases):
924        rows, hsv, diag = evaluate_case(case, seed=880 + i)
925        all_rows.extend(rows)
926        hsv_by_case[case.slug] = hsv
927        diagnostics[case.slug] = diag
928        samples, outputs, inputs = case.markov.shape
929        finite_rows = [
930            row
931            for row in rows
932            if row["method"] == "finite_block_hankel_mimo"
933            and np.isfinite(float(row["relative_h2_error"]))
934        ]
935        best = min(finite_rows, key=lambda row: float(row["relative_h2_error"]))
936        print(f"case: {case.slug}", flush=True)
937        print(f"  shape: samples={samples}, outputs={outputs}, inputs={inputs}", flush=True)
938        print(f"  block Hankel: rows={case.block_rows}, cols={case.block_cols}")
939        print(
940            f"  first/20th normalized HSV: 1.000e+00 / {hsv[min(19, hsv.size - 1)] / max(hsv[0], 1e-300):.3e}"
941        )
942        print(f"  tangential Schur scale: {float(diag['scale']):.6g}")
943        print(f"  scaled Pick min eigenvalue: {float(diag['min_pick_eigenvalue']):.3e}")
944        print(
945            "  best finite block-Hankel order: "
946            f"{int(best['order'])}, relative H2 error={float(best['relative_h2_error']):.3e}, "
947            f"relative Hankel-norm error={float(best['relative_hankel_norm_error']):.3e}, "
948            f"tangential sample error={float(best['tangential_sample_error']):.3e}"
949        )
950        print(
951            "  reducer timing at best order: "
952            f"reduction={float(best['reduction_seconds']):.3f}s, "
953            f"Markov expansion={float(best['expansion_seconds']):.3f}s, "
954            f"total={float(best['total_seconds']):.3f}s"
955        )
956        print(f"  backend at best order: {best['backend']}")
957        if case.condition_hint is not None:
958            print(f"  modal dynamic-range condition hint: {case.condition_hint:.3e}")
959
960    write_summary_csv(out_dir / "mimo_reduction_stress_summary.csv", all_rows)
961    write_case_metadata(out_dir / "mimo_reduction_stress_case_metadata.csv", cases, diagnostics)
962    plot_error_curves(out_dir, all_rows)
963    plot_hankel_singular_values(out_dir, hsv_by_case)
964    plot_pick_eigenvalues(out_dir, diagnostics)
965    plot_timing_curves(out_dir, all_rows)
966    plot_first_markov_blocks(out_dir, cases)
967
968
969if __name__ == "__main__":
970    main()