Capon/MVDR spectral estimation

Tutorial goal

Use an inverse-covariance Capon spectrum to resolve nearby tones.

Note

New to the terminology? See the lattice DSP concept map and the causality/data-use guide for how online, offline, block, and MIMO examples should be read.

Context

Capon/MVDR spectra are useful as a high-resolution diagnostic. They are not a replacement for every periodogram or AR model, but they are visually helpful when nearby narrowband components are hard to separate.

Key idea and equations

For steering vector a(ω) and loaded covariance matrix R, the Capon spectrum is

\[\hat S_{\mathrm{Capon}}(\omega)=\frac{1}{a(\omega)^H R^{-1} a(\omega)}.\]

How to read the result

The main figure compares periodogram, AR, and Capon curves. The covariance-eigenvalue plot helps diagnose whether the covariance estimate is well conditioned.

Run command

python examples/capon_spectrum_demo.py

Run status

Return code: 0

Captured stdout

true tone frequencies: [0.21, 0.236]
Capon aperture: 32
AR model order: 20
periodogram peak estimates: [0.21, 0.2361]
Capon peak estimates: [0.2102, 0.2129]
AR peak estimates: [0.209, 0.2373]

Figures

capon covariance eigenvalues

capon_covariance_eigenvalues.png

capon spectrum demo

capon_spectrum_demo.png

Generated data files

Source code

  1"""Tutorial: Capon/MVDR spectral estimation on two close tones.
  2
  3Capon spectral estimation uses an inverse covariance matrix to minimize output
  4power subject to unit response at the frequency being tested.  This makes it a
  5useful high-resolution diagnostic when a periodogram smears together nearby
  6sinusoids.
  7"""
  8
  9from __future__ import annotations
 10
 11import csv
 12import os
 13from pathlib import Path
 14
 15import numpy as np
 16
 17from lattice_dsp import autocorrelation, levinson_durbin_denominator
 18
 19
 20def artifact_dir() -> Path:
 21    path = Path(os.environ.get("LATTICE_DSP_ARTIFACT_DIR", "reports/example-artifacts"))
 22    path.mkdir(parents=True, exist_ok=True)
 23    return path
 24
 25
 26def db_normalized(power: np.ndarray) -> np.ndarray:
 27    power = np.maximum(np.asarray(power, dtype=float), 1e-18)
 28    return 10.0 * np.log10(power / np.max(power))
 29
 30
 31def periodogram(x: np.ndarray, n_fft: int) -> tuple[np.ndarray, np.ndarray]:
 32    window = np.hanning(x.size)
 33    spectrum = np.fft.rfft(window * x, n=n_fft)
 34    power = np.abs(spectrum) ** 2 / max(np.sum(window**2), 1e-12)
 35    return np.fft.rfftfreq(n_fft, d=1.0), power
 36
 37
 38def covariance_from_windows(x: np.ndarray, aperture: int) -> np.ndarray:
 39    windows = np.lib.stride_tricks.sliding_window_view(x, aperture)
 40    # One row per window.  Complex dtype keeps the formula general.
 41    X = windows.astype(complex)
 42    R = (X.conj().T @ X) / X.shape[0]
 43    loading = 1e-3 * float(np.trace(R).real) / aperture
 44    return R + loading * np.eye(aperture)
 45
 46
 47def capon_spectrum(x: np.ndarray, aperture: int, freq: np.ndarray) -> np.ndarray:
 48    R = covariance_from_windows(x, aperture)
 49    Rinv = np.linalg.pinv(R)
 50    n = np.arange(aperture)
 51    power = np.empty_like(freq, dtype=float)
 52    for i, f in enumerate(freq):
 53        steering = np.exp(-2j * np.pi * f * n)
 54        denom = np.vdot(steering, Rinv @ steering).real
 55        power[i] = 1.0 / max(denom, 1e-18)
 56    return power
 57
 58
 59def ar_spectrum(denominator: np.ndarray, freq: np.ndarray) -> np.ndarray:
 60    z = np.exp(-2j * np.pi * freq)
 61    a = np.zeros_like(z, dtype=complex)
 62    for i, coef in enumerate(denominator):
 63        a += float(coef) * z**i
 64    return 1.0 / np.maximum(np.abs(a) ** 2, 1e-18)
 65
 66
 67def top_peaks(
 68    freq: np.ndarray, db: np.ndarray, count: int = 2, guard_bins: int = 10
 69) -> list[float]:
 70    candidates = db.copy()
 71    peaks: list[float] = []
 72    for _ in range(count):
 73        idx = int(np.argmax(candidates))
 74        peaks.append(float(freq[idx]))
 75        lo = max(0, idx - guard_bins)
 76        hi = min(candidates.size, idx + guard_bins + 1)
 77        candidates[lo:hi] = -np.inf
 78    return peaks
 79
 80
 81def write_csv(path: Path, freq: np.ndarray, columns: dict[str, np.ndarray]) -> None:
 82    with path.open("w", newline="", encoding="utf-8") as f:
 83        writer = csv.writer(f)
 84        writer.writerow(["frequency_cycles_per_sample", *columns])
 85        for i, value in enumerate(freq):
 86            writer.writerow([value, *(columns[name][i] for name in columns)])
 87
 88
 89def main() -> None:
 90    rng = np.random.default_rng(314)
 91    samples = 384
 92    n = np.arange(samples)
 93    tones = [0.210, 0.236]
 94    x = (
 95        np.sin(2.0 * np.pi * tones[0] * n)
 96        + 0.9 * np.sin(2.0 * np.pi * tones[1] * n + 0.8)
 97        + 0.55 * rng.normal(size=samples)
 98    )
 99    x -= np.mean(x)
100
101    n_fft = 4096
102    freq, p_periodogram = periodogram(x, n_fft)
103    aperture = 32
104    p_capon = capon_spectrum(x, aperture, freq)
105
106    order = 20
107    r = autocorrelation(x, order)
108    den = np.asarray(levinson_durbin_denominator(r, order), dtype=float)
109    p_ar = ar_spectrum(den, freq)
110
111    y_periodogram = db_normalized(p_periodogram)
112    y_capon = db_normalized(p_capon)
113    y_ar = db_normalized(p_ar)
114
115    out_dir = artifact_dir()
116    csv_path = out_dir / "capon_spectrum_demo.csv"
117    write_csv(
118        csv_path,
119        freq,
120        {
121            "periodogram_db": y_periodogram,
122            "capon_db": y_capon,
123            "ar_db": y_ar,
124        },
125    )
126
127    print("true tone frequencies:", tones)
128    print("Capon aperture:", aperture)
129    print("AR model order:", order)
130    print("periodogram peak estimates:", [round(v, 4) for v in top_peaks(freq, y_periodogram)])
131    print("Capon peak estimates:", [round(v, 4) for v in top_peaks(freq, y_capon)])
132    print("AR peak estimates:", [round(v, 4) for v in top_peaks(freq, y_ar)])
133    print(f"wrote {csv_path}")
134
135    try:
136        import matplotlib.pyplot as plt
137    except Exception:
138        print("matplotlib is not installed; skipped figures")
139        return
140
141    fig, ax = plt.subplots(figsize=(9, 4.8))
142    ax.plot(freq, y_periodogram, label="periodogram")
143    ax.plot(freq, y_ar, label="AR spectrum")
144    ax.plot(freq, y_capon, label="Capon/MVDR")
145    for tone in tones:
146        ax.axvline(tone, linestyle=":", linewidth=1.0)
147    ax.set_xlim(0.17, 0.27)
148    ax.set_ylim(-50, 4)
149    ax.set_title("Capon spectrum separates nearby tones")
150    ax.set_xlabel("frequency (cycles/sample)")
151    ax.set_ylabel("normalized power (dB)")
152    ax.legend()
153    fig.tight_layout()
154    fig_path = out_dir / "capon_spectrum_demo.png"
155    fig.savefig(fig_path, dpi=160)
156    print(f"wrote {fig_path}")
157
158    eigvals = np.linalg.eigvalsh(covariance_from_windows(x, aperture))
159    fig2, ax2 = plt.subplots(figsize=(7, 3.6))
160    ax2.semilogy(np.arange(1, eigvals.size + 1), np.sort(eigvals)[::-1], marker="o")
161    ax2.set_title("Loaded covariance eigenvalues used by Capon")
162    ax2.set_xlabel("eigenvalue index")
163    ax2.set_ylabel("eigenvalue")
164    fig2.tight_layout()
165    fig2_path = out_dir / "capon_covariance_eigenvalues.png"
166    fig2.savefig(fig2_path, dpi=160)
167    print(f"wrote {fig2_path}")
168
169
170if __name__ == "__main__":
171    main()