Finite-section AAK/Nehari reduction of a stable IIR filter

Tutorial goal

Apply the finite AAK/Nehari tail reducer to a full stable SISO IIR filter and compare the selected reduced model.

Note

New to the terminology? See the lattice DSP concept map and the causality/data-use guide for how online, offline, block, and MIMO examples should be read.

Context

The previous finite AAK/Nehari pages work with abstract tail sequences. This tutorial closes the loop for DSP users: start with a stable higher-order lattice/IIR filter, compute its impulse response, select a reduced rational model, convert the selected denominator back to reflection coefficients, and run the reduced filter on real signals.

The point is practical rather than absolute optimality. The current implementation is a finite-section SISO reduction candidate with Hankel, Schmidt-pair, and rational diagnostics; it is not a full infinite-dimensional AAK/Nehari solver.

Key idea and equations

A stable full model has transfer function

\[H(z)=\frac{B(z)}{A(z)},\qquad |\lambda_i(A)|<1.\]

The finite AAK/Nehari workflow uses the impulse response h_n to build a finite Hankel matrix, selects a rational order r, and returns a reduced model

\[H_r(z)=\frac{B_r(z)}{A_r(z)},\]

whose denominator is converted back to reflection coefficients when stable.

How to read the result

Look for the selected rank, impulse and magnitude-response error, pole radius below one, and filtering speedup on a random batch.

Run command

python examples/finite_aak_iir_reduction_demo.py

Run status

Return code: 0

Captured stdout

full IIR order: 8
finite Hankel matrix: 96 x 96
candidate ranks: [2, 3, 4, 5, 6, 8]
selected rank: 3
selected accepted: True
selected pole radius: 0.9074
relative impulse error: 3.863e-03
batch output SNR: 48.27 dB
batch output rel MSE: 1.488e-05
max magnitude error: 0.039 dB
full filter median time: 0.0416 s
reduced filter median time: 0.0172 s
filter speedup: 2.42x

Figures

finite aak iir impulse response

finite_aak_iir_impulse_response.png

finite aak iir magnitude response

finite_aak_iir_magnitude_response.png

finite aak iir poles

finite_aak_iir_poles.png

finite aak iir singular values

finite_aak_iir_singular_values.png

Generated data files

Source code

  1"""Tutorial: finite-section AAK/Nehari reduction of a stable SISO IIR filter.
  2
  3The previous finite AAK/Nehari tutorials work directly with an abstract tail
  4sequence.  This tutorial closes the loop for DSP users: start with a stable
  5higher-order lattice/IIR filter, compute its impulse response, select a reduced
  6rational model with ``finite_aak_reduce_iir``, and compare impulse response,
  7frequency response, and filtering speed.
  8
  9This is still a finite-section reduction candidate, not a full
 10infinite-dimensional AAK/Nehari solver.  Its purpose is practical: demonstrate
 11how the current finite Hankel/Schmidt-pair/rational workflow can produce a
 12stable lower-order IIR model that is cheaper to run.
 13"""
 14
 15from __future__ import annotations
 16
 17import csv
 18import math
 19import os
 20import statistics
 21import time
 22from pathlib import Path
 23from collections.abc import Callable
 24
 25import numpy as np
 26
 27import lattice_dsp as ld
 28
 29
 30def artifact_dir() -> Path:
 31    path = Path(os.environ.get("LATTICE_DSP_ARTIFACT_DIR", "reports/example-artifacts"))
 32    path.mkdir(parents=True, exist_ok=True)
 33    return path
 34
 35
 36def impulse_from_poles(poles: np.ndarray, weights: np.ndarray, n_terms: int) -> np.ndarray:
 37    n = np.arange(n_terms, dtype=float)
 38    return np.sum(weights[:, None] * poles[:, None] ** n[None, :], axis=0)
 39
 40
 41def numerator_from_impulse_and_denominator(
 42    impulse: np.ndarray, denominator: np.ndarray
 43) -> np.ndarray:
 44    order = denominator.size - 1
 45    numerator = np.zeros(order + 1, dtype=float)
 46    for i in range(order + 1):
 47        numerator[i] = sum(float(denominator[j]) * float(impulse[i - j]) for j in range(i + 1))
 48    return numerator
 49
 50
 51def synthetic_high_order_iir() -> dict[str, np.ndarray]:
 52    """Return a stable full model whose impulse response has compressible modes."""
 53
 54    poles = np.array([0.91, 0.72, -0.55, 0.38, -0.25, 0.14, -0.08, 0.03], dtype=float)
 55    weights = np.array([1.0, 0.25, -0.18, 0.07, -0.035, 0.015, -0.007, 0.003], dtype=float)
 56    denominator = np.asarray(np.poly(poles), dtype=float)
 57    impulse = impulse_from_poles(poles, weights, 512)
 58    numerator = numerator_from_impulse_and_denominator(impulse, denominator)
 59    reflection = np.asarray(ld.denominator_to_reflection(denominator.tolist()), dtype=float)
 60    return {
 61        "poles": poles,
 62        "weights": weights,
 63        "denominator": denominator,
 64        "numerator": numerator,
 65        "reflection": reflection,
 66        "impulse": impulse,
 67    }
 68
 69
 70def frequency_response(
 71    denominator: np.ndarray, numerator: np.ndarray, n_freq: int = 512
 72) -> tuple[np.ndarray, np.ndarray]:
 73    w = np.linspace(0.0, math.pi, n_freq)
 74    z = np.exp(-1j * w)
 75    num = np.zeros_like(z, dtype=np.complex128)
 76    den = np.zeros_like(z, dtype=np.complex128)
 77    for k, coeff in enumerate(numerator):
 78        num += coeff * z**k
 79    for k, coeff in enumerate(denominator):
 80        den += coeff * z**k
 81    return w, num / den
 82
 83
 84def median_time(fn: Callable[[], np.ndarray], repeats: int) -> tuple[float, np.ndarray]:
 85    times: list[float] = []
 86    result: np.ndarray | None = None
 87    for _ in range(repeats):
 88        t0 = time.perf_counter()
 89        result = fn()
 90        times.append(time.perf_counter() - t0)
 91    assert result is not None
 92    return statistics.median(times), result
 93
 94
 95def process(reflection: np.ndarray, numerator: np.ndarray, x: np.ndarray) -> np.ndarray:
 96    return np.asarray(ld.process_batch(reflection.tolist(), numerator.tolist(), x), dtype=float)
 97
 98
 99def snr_db(reference: np.ndarray, estimate: np.ndarray) -> float:
100    error = reference - estimate
101    return 10.0 * math.log10(
102        (float(np.mean(reference * reference)) + 1e-30) / (float(np.mean(error * error)) + 1e-30)
103    )
104
105
106def write_csv(path: Path, rows: list[dict[str, object]]) -> None:
107    with path.open("w", newline="", encoding="utf-8") as f:
108        writer = csv.DictWriter(f, fieldnames=list(rows[0]))
109        writer.writeheader()
110        writer.writerows(rows)
111
112
113def main() -> None:
114    out_dir = artifact_dir()
115    model = synthetic_high_order_iir()
116    rows = cols = 96
117    n_impulse = 192
118    ranks = [2, 3, 4, 5, 6, 8]
119    criteria = ld.FiniteNehariCandidateCriteria(
120        max_tail_error=1.0e-3,
121        max_rational_error=5.0e-3,
122        max_pole_radius=0.99,
123    )
124
125    reduction = ld.finite_aak_reduce_iir(
126        model["reflection"],
127        model["numerator"],
128        ranks=ranks,
129        n_impulse=n_impulse,
130        rows=rows,
131        cols=cols,
132        criteria=criteria,
133        attach_certificate=True,
134    )
135
136    selected = reduction["selected"]
137    reduced_reflection = np.asarray(reduction["reduced_reflection"], dtype=float)
138    reduced_numerator = np.asarray(reduction["reduced_numerator"], dtype=float)
139    reduced_denominator = np.asarray(reduction["reduced_denominator"], dtype=float)
140
141    rng = np.random.default_rng(123)
142    channels = 32
143    samples = 30000
144    x = rng.normal(size=(channels, samples)).astype(np.float64)
145    full_time, y_full = median_time(
146        lambda: process(model["reflection"], model["numerator"], x), repeats=3
147    )
148    reduced_time, y_reduced = median_time(
149        lambda: process(reduced_reflection, reduced_numerator, x), repeats=3
150    )
151    filter_speedup = full_time / reduced_time
152    snr = snr_db(y_full, y_reduced)
153    rel_mse = float(np.mean((y_full - y_reduced) ** 2) / (np.mean(y_full**2) + 1e-30))
154
155    w, h_full = frequency_response(model["denominator"], model["numerator"])
156    _, h_reduced = frequency_response(reduced_denominator, reduced_numerator)
157    full_db = 20.0 * np.log10(np.maximum(np.abs(h_full), 1e-14))
158    reduced_db = 20.0 * np.log10(np.maximum(np.abs(h_reduced), 1e-14))
159    max_mag_error_db = float(np.max(np.abs(full_db - reduced_db)))
160
161    summary_rows = []
162    for row in reduction["candidates"]:
163        summary_rows.append(
164            {
165                "rank": row["rank"],
166                "sigma_next": row["sigma_next"],
167                "hankelized_tail_error": row["hankelized_tail_error"],
168                "rational_error": row["rational_error"],
169                "max_pole_radius": row["max_pole_radius"],
170                "accepted": row["accepted"],
171            }
172        )
173    summary_rows.append(
174        {
175            "rank": "selected",
176            "sigma_next": selected["sigma_next"],
177            "hankelized_tail_error": reduction["relative_impulse_error"],
178            "rational_error": selected["rational_error"],
179            "max_pole_radius": selected["max_pole_radius"],
180            "accepted": reduction["accepted"],
181        }
182    )
183    csv_path = out_dir / "finite_aak_iir_reduction_summary.csv"
184    write_csv(csv_path, summary_rows)
185
186    print("full IIR order:", model["reflection"].size)
187    print("finite Hankel matrix:", f"{rows} x {cols}")
188    print("candidate ranks:", ranks)
189    print("selected rank:", reduction["selected_rank"])
190    print("selected accepted:", reduction["accepted"])
191    print("selected pole radius:", f"{selected['max_pole_radius']:.4f}")
192    print("relative impulse error:", f"{reduction['relative_impulse_error']:.3e}")
193    print("batch output SNR:", f"{snr:.2f} dB")
194    print("batch output rel MSE:", f"{rel_mse:.3e}")
195    print("max magnitude error:", f"{max_mag_error_db:.3f} dB")
196    print("full filter median time:", f"{full_time:.4f} s")
197    print("reduced filter median time:", f"{reduced_time:.4f} s")
198    print("filter speedup:", f"{filter_speedup:.2f}x")
199    print(f"wrote {csv_path}")
200
201    try:
202        import matplotlib.pyplot as plt
203    except Exception:
204        print("matplotlib is not installed; skipped figures")
205        return
206
207    singular_values = np.asarray(selected["hankel_singular_values"], dtype=float)
208    fig, ax = plt.subplots(figsize=(8.8, 4.6))
209    ax.semilogy(np.arange(1, 21), singular_values[:20], marker="o")
210    ax.axvline(
211        reduction["selected_rank"] + 1, linestyle="--", linewidth=1.0, label="first neglected index"
212    )
213    ax.set_title("Hankel singular values of the full IIR impulse response")
214    ax.set_xlabel("index")
215    ax.set_ylabel("singular value")
216    ax.grid(True, alpha=0.3)
217    ax.legend()
218    fig.tight_layout()
219    path = out_dir / "finite_aak_iir_singular_values.png"
220    fig.savefig(path, dpi=160)
221    print(f"wrote {path}")
222
223    fig2, ax2 = plt.subplots(figsize=(9.2, 5.0))
224    n = np.arange(80)
225    ax2.plot(n, reduction["full_impulse_response"][:80], label="full impulse", linewidth=2.0)
226    ax2.plot(n, reduction["reduced_impulse_response"][:80], "--", label="reduced impulse")
227    ax2.set_title("Full and selected finite AAK/Nehari IIR impulse responses")
228    ax2.set_xlabel("sample")
229    ax2.set_ylabel("amplitude")
230    ax2.grid(True, alpha=0.3)
231    ax2.legend()
232    fig2.tight_layout()
233    path2 = out_dir / "finite_aak_iir_impulse_response.png"
234    fig2.savefig(path2, dpi=160)
235    print(f"wrote {path2}")
236
237    fig3, ax3 = plt.subplots(figsize=(9.2, 5.0))
238    ax3.plot(w / math.pi, full_db, label="full IIR", linewidth=2.0)
239    ax3.plot(w / math.pi, reduced_db, "--", label="selected reduced IIR")
240    ax3.set_title("Magnitude response after finite AAK/Nehari IIR reduction")
241    ax3.set_xlabel("normalized frequency ×π rad/sample")
242    ax3.set_ylabel("magnitude (dB)")
243    ax3.grid(True, alpha=0.3)
244    ax3.legend()
245    fig3.tight_layout()
246    path3 = out_dir / "finite_aak_iir_magnitude_response.png"
247    fig3.savefig(path3, dpi=160)
248    print(f"wrote {path3}")
249
250    fig4, ax4 = plt.subplots(figsize=(5.8, 5.8))
251    circle = plt.Circle((0.0, 0.0), 1.0, fill=False, linestyle="--")
252    ax4.add_artist(circle)
253    ax4.scatter(np.real(model["poles"]), np.imag(model["poles"]), label="full poles")
254    ax4.scatter(
255        np.real(selected["poles"]), np.imag(selected["poles"]), marker="x", label="reduced poles"
256    )
257    ax4.set_aspect("equal", adjustable="box")
258    ax4.set_xlim(-1.05, 1.05)
259    ax4.set_ylim(-1.05, 1.05)
260    ax4.set_xlabel("real")
261    ax4.set_ylabel("imaginary")
262    ax4.set_title("Stable poles before and after reduction")
263    ax4.grid(True, alpha=0.3)
264    ax4.legend()
265    fig4.tight_layout()
266    path4 = out_dir / "finite_aak_iir_poles.png"
267    fig4.savefig(path4, dpi=160)
268    print(f"wrote {path4}")
269
270
271if __name__ == "__main__":
272    main()