Finite-section AAK/Nehari reduction of a stable IIR filter¶
Tutorial goal
Apply the finite AAK/Nehari tail reducer to a full stable SISO IIR filter and compare the selected reduced model.
Note
New to the terminology? See the lattice DSP concept map and the causality/data-use guide for how online, offline, block, and MIMO examples should be read.
Context¶
The previous finite AAK/Nehari pages work with abstract tail sequences. This tutorial closes the loop for DSP users: start with a stable higher-order lattice/IIR filter, compute its impulse response, select a reduced rational model, convert the selected denominator back to reflection coefficients, and run the reduced filter on real signals.
The point is practical rather than absolute optimality. The current implementation is a finite-section SISO reduction candidate with Hankel, Schmidt-pair, and rational diagnostics; it is not a full infinite-dimensional AAK/Nehari solver.
Key idea and equations¶
A stable full model has transfer function
The finite AAK/Nehari workflow uses the impulse response h_n to build a finite Hankel
matrix, selects a rational order r, and returns a reduced model
whose denominator is converted back to reflection coefficients when stable.
How to read the result¶
Look for the selected rank, impulse and magnitude-response error, pole radius below one, and filtering speedup on a random batch.
Run command¶
python examples/finite_aak_iir_reduction_demo.py
Source code¶
1"""Tutorial: finite-section AAK/Nehari reduction of a stable SISO IIR filter.
2
3The previous finite AAK/Nehari tutorials work directly with an abstract tail
4sequence. This tutorial closes the loop for DSP users: start with a stable
5higher-order lattice/IIR filter, compute its impulse response, select a reduced
6rational model with ``finite_aak_reduce_iir``, and compare impulse response,
7frequency response, and filtering speed.
8
9This is still a finite-section reduction candidate, not a full
10infinite-dimensional AAK/Nehari solver. Its purpose is practical: demonstrate
11how the current finite Hankel/Schmidt-pair/rational workflow can produce a
12stable lower-order IIR model that is cheaper to run.
13"""
14
15from __future__ import annotations
16
17import csv
18import math
19import os
20import statistics
21import time
22from pathlib import Path
23from collections.abc import Callable
24
25import numpy as np
26
27import lattice_dsp as ld
28
29
30def artifact_dir() -> Path:
31 path = Path(os.environ.get("LATTICE_DSP_ARTIFACT_DIR", "reports/example-artifacts"))
32 path.mkdir(parents=True, exist_ok=True)
33 return path
34
35
36def impulse_from_poles(poles: np.ndarray, weights: np.ndarray, n_terms: int) -> np.ndarray:
37 n = np.arange(n_terms, dtype=float)
38 return np.sum(weights[:, None] * poles[:, None] ** n[None, :], axis=0)
39
40
41def numerator_from_impulse_and_denominator(
42 impulse: np.ndarray, denominator: np.ndarray
43) -> np.ndarray:
44 order = denominator.size - 1
45 numerator = np.zeros(order + 1, dtype=float)
46 for i in range(order + 1):
47 numerator[i] = sum(float(denominator[j]) * float(impulse[i - j]) for j in range(i + 1))
48 return numerator
49
50
51def synthetic_high_order_iir() -> dict[str, np.ndarray]:
52 """Return a stable full model whose impulse response has compressible modes."""
53
54 poles = np.array([0.91, 0.72, -0.55, 0.38, -0.25, 0.14, -0.08, 0.03], dtype=float)
55 weights = np.array([1.0, 0.25, -0.18, 0.07, -0.035, 0.015, -0.007, 0.003], dtype=float)
56 denominator = np.asarray(np.poly(poles), dtype=float)
57 impulse = impulse_from_poles(poles, weights, 512)
58 numerator = numerator_from_impulse_and_denominator(impulse, denominator)
59 reflection = np.asarray(ld.denominator_to_reflection(denominator.tolist()), dtype=float)
60 return {
61 "poles": poles,
62 "weights": weights,
63 "denominator": denominator,
64 "numerator": numerator,
65 "reflection": reflection,
66 "impulse": impulse,
67 }
68
69
70def frequency_response(
71 denominator: np.ndarray, numerator: np.ndarray, n_freq: int = 512
72) -> tuple[np.ndarray, np.ndarray]:
73 w = np.linspace(0.0, math.pi, n_freq)
74 z = np.exp(-1j * w)
75 num = np.zeros_like(z, dtype=np.complex128)
76 den = np.zeros_like(z, dtype=np.complex128)
77 for k, coeff in enumerate(numerator):
78 num += coeff * z**k
79 for k, coeff in enumerate(denominator):
80 den += coeff * z**k
81 return w, num / den
82
83
84def median_time(fn: Callable[[], np.ndarray], repeats: int) -> tuple[float, np.ndarray]:
85 times: list[float] = []
86 result: np.ndarray | None = None
87 for _ in range(repeats):
88 t0 = time.perf_counter()
89 result = fn()
90 times.append(time.perf_counter() - t0)
91 assert result is not None
92 return statistics.median(times), result
93
94
95def process(reflection: np.ndarray, numerator: np.ndarray, x: np.ndarray) -> np.ndarray:
96 return np.asarray(ld.process_batch(reflection.tolist(), numerator.tolist(), x), dtype=float)
97
98
99def snr_db(reference: np.ndarray, estimate: np.ndarray) -> float:
100 error = reference - estimate
101 return 10.0 * math.log10(
102 (float(np.mean(reference * reference)) + 1e-30) / (float(np.mean(error * error)) + 1e-30)
103 )
104
105
106def write_csv(path: Path, rows: list[dict[str, object]]) -> None:
107 with path.open("w", newline="", encoding="utf-8") as f:
108 writer = csv.DictWriter(f, fieldnames=list(rows[0]))
109 writer.writeheader()
110 writer.writerows(rows)
111
112
113def main() -> None:
114 out_dir = artifact_dir()
115 model = synthetic_high_order_iir()
116 rows = cols = 96
117 n_impulse = 192
118 ranks = [2, 3, 4, 5, 6, 8]
119 criteria = ld.FiniteNehariCandidateCriteria(
120 max_tail_error=1.0e-3,
121 max_rational_error=5.0e-3,
122 max_pole_radius=0.99,
123 )
124
125 reduction = ld.finite_aak_reduce_iir(
126 model["reflection"],
127 model["numerator"],
128 ranks=ranks,
129 n_impulse=n_impulse,
130 rows=rows,
131 cols=cols,
132 criteria=criteria,
133 attach_certificate=True,
134 )
135
136 selected = reduction["selected"]
137 reduced_reflection = np.asarray(reduction["reduced_reflection"], dtype=float)
138 reduced_numerator = np.asarray(reduction["reduced_numerator"], dtype=float)
139 reduced_denominator = np.asarray(reduction["reduced_denominator"], dtype=float)
140
141 rng = np.random.default_rng(123)
142 channels = 32
143 samples = 30000
144 x = rng.normal(size=(channels, samples)).astype(np.float64)
145 full_time, y_full = median_time(
146 lambda: process(model["reflection"], model["numerator"], x), repeats=3
147 )
148 reduced_time, y_reduced = median_time(
149 lambda: process(reduced_reflection, reduced_numerator, x), repeats=3
150 )
151 filter_speedup = full_time / reduced_time
152 snr = snr_db(y_full, y_reduced)
153 rel_mse = float(np.mean((y_full - y_reduced) ** 2) / (np.mean(y_full**2) + 1e-30))
154
155 w, h_full = frequency_response(model["denominator"], model["numerator"])
156 _, h_reduced = frequency_response(reduced_denominator, reduced_numerator)
157 full_db = 20.0 * np.log10(np.maximum(np.abs(h_full), 1e-14))
158 reduced_db = 20.0 * np.log10(np.maximum(np.abs(h_reduced), 1e-14))
159 max_mag_error_db = float(np.max(np.abs(full_db - reduced_db)))
160
161 summary_rows = []
162 for row in reduction["candidates"]:
163 summary_rows.append(
164 {
165 "rank": row["rank"],
166 "sigma_next": row["sigma_next"],
167 "hankelized_tail_error": row["hankelized_tail_error"],
168 "rational_error": row["rational_error"],
169 "max_pole_radius": row["max_pole_radius"],
170 "accepted": row["accepted"],
171 }
172 )
173 summary_rows.append(
174 {
175 "rank": "selected",
176 "sigma_next": selected["sigma_next"],
177 "hankelized_tail_error": reduction["relative_impulse_error"],
178 "rational_error": selected["rational_error"],
179 "max_pole_radius": selected["max_pole_radius"],
180 "accepted": reduction["accepted"],
181 }
182 )
183 csv_path = out_dir / "finite_aak_iir_reduction_summary.csv"
184 write_csv(csv_path, summary_rows)
185
186 print("full IIR order:", model["reflection"].size)
187 print("finite Hankel matrix:", f"{rows} x {cols}")
188 print("candidate ranks:", ranks)
189 print("selected rank:", reduction["selected_rank"])
190 print("selected accepted:", reduction["accepted"])
191 print("selected pole radius:", f"{selected['max_pole_radius']:.4f}")
192 print("relative impulse error:", f"{reduction['relative_impulse_error']:.3e}")
193 print("batch output SNR:", f"{snr:.2f} dB")
194 print("batch output rel MSE:", f"{rel_mse:.3e}")
195 print("max magnitude error:", f"{max_mag_error_db:.3f} dB")
196 print("full filter median time:", f"{full_time:.4f} s")
197 print("reduced filter median time:", f"{reduced_time:.4f} s")
198 print("filter speedup:", f"{filter_speedup:.2f}x")
199 print(f"wrote {csv_path}")
200
201 try:
202 import matplotlib.pyplot as plt
203 except Exception:
204 print("matplotlib is not installed; skipped figures")
205 return
206
207 singular_values = np.asarray(selected["hankel_singular_values"], dtype=float)
208 fig, ax = plt.subplots(figsize=(8.8, 4.6))
209 ax.semilogy(np.arange(1, 21), singular_values[:20], marker="o")
210 ax.axvline(
211 reduction["selected_rank"] + 1, linestyle="--", linewidth=1.0, label="first neglected index"
212 )
213 ax.set_title("Hankel singular values of the full IIR impulse response")
214 ax.set_xlabel("index")
215 ax.set_ylabel("singular value")
216 ax.grid(True, alpha=0.3)
217 ax.legend()
218 fig.tight_layout()
219 path = out_dir / "finite_aak_iir_singular_values.png"
220 fig.savefig(path, dpi=160)
221 print(f"wrote {path}")
222
223 fig2, ax2 = plt.subplots(figsize=(9.2, 5.0))
224 n = np.arange(80)
225 ax2.plot(n, reduction["full_impulse_response"][:80], label="full impulse", linewidth=2.0)
226 ax2.plot(n, reduction["reduced_impulse_response"][:80], "--", label="reduced impulse")
227 ax2.set_title("Full and selected finite AAK/Nehari IIR impulse responses")
228 ax2.set_xlabel("sample")
229 ax2.set_ylabel("amplitude")
230 ax2.grid(True, alpha=0.3)
231 ax2.legend()
232 fig2.tight_layout()
233 path2 = out_dir / "finite_aak_iir_impulse_response.png"
234 fig2.savefig(path2, dpi=160)
235 print(f"wrote {path2}")
236
237 fig3, ax3 = plt.subplots(figsize=(9.2, 5.0))
238 ax3.plot(w / math.pi, full_db, label="full IIR", linewidth=2.0)
239 ax3.plot(w / math.pi, reduced_db, "--", label="selected reduced IIR")
240 ax3.set_title("Magnitude response after finite AAK/Nehari IIR reduction")
241 ax3.set_xlabel("normalized frequency ×π rad/sample")
242 ax3.set_ylabel("magnitude (dB)")
243 ax3.grid(True, alpha=0.3)
244 ax3.legend()
245 fig3.tight_layout()
246 path3 = out_dir / "finite_aak_iir_magnitude_response.png"
247 fig3.savefig(path3, dpi=160)
248 print(f"wrote {path3}")
249
250 fig4, ax4 = plt.subplots(figsize=(5.8, 5.8))
251 circle = plt.Circle((0.0, 0.0), 1.0, fill=False, linestyle="--")
252 ax4.add_artist(circle)
253 ax4.scatter(np.real(model["poles"]), np.imag(model["poles"]), label="full poles")
254 ax4.scatter(
255 np.real(selected["poles"]), np.imag(selected["poles"]), marker="x", label="reduced poles"
256 )
257 ax4.set_aspect("equal", adjustable="box")
258 ax4.set_xlim(-1.05, 1.05)
259 ax4.set_ylim(-1.05, 1.05)
260 ax4.set_xlabel("real")
261 ax4.set_ylabel("imaginary")
262 ax4.set_title("Stable poles before and after reduction")
263 ax4.grid(True, alpha=0.3)
264 ax4.legend()
265 fig4.tight_layout()
266 path4 = out_dir / "finite_aak_iir_poles.png"
267 fig4.savefig(path4, dpi=160)
268 print(f"wrote {path4}")
269
270
271if __name__ == "__main__":
272 main()