Single Baseline pI FRF SNR¶

by Josh Dillon, last updated March 16, 2026

This notebook takes corner-turned, calibrated, redundantly-averaged visibility data, forms pseudo-Stokes pI, and computes delay-filtered, fringe-rate-filtered SNR waterfalls. The results are written out as uvh5 files to be combined across baselines to look for residual structure that fringes like the main beam.

Here's a set of links to skip to particular figures and tables:

• Figure 1: Delay-Filtered pI SNR in Fringe Rate Space¶

• Figure 2: Delay-Filtered pI SNR Waterfall¶

• Figure 3: Delay-Filtered pI SNR Histogram¶

• Figure 4: Delay+FR Filtered pI SNR Waterfall¶

• Figure 5: Delay+FR Filtered pI SNR Histogram¶

In [1]:
import time
tstart = time.time()
!hostname
!date
herapost013
Mon Mar 30 02:12:20 MDT 2026
In [2]:
import os
os.environ['HDF5_USE_FILE_LOCKING'] = 'FALSE'
import h5py
import hdf5plugin
import numpy as np
import yaml
import copy
import re
import glob
import matplotlib
from astropy import units
from scipy import interpolate
from scipy.signal.windows import blackmanharris
from pyuvdata import UVFlag
from hera_cal import io, utils, flag_utils, red_groups
from hera_cal.frf import sky_frates_single, get_FR_buffer_from_spectra, get_m2f_mixer
from hera_filters.dspec import dpss_operator, fourier_filter
import hera_pspec as hp
import uvtools
import matplotlib.pyplot as plt
from IPython.display import display
%matplotlib inline
In [3]:
RED_AVG_FILE = os.environ.get("RED_AVG_FILE", None)
# RED_AVG_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR3/2459935/zen.2459935.25792.sum.smooth_calibrated.red_avg.uvh5' # 7_61

CORNER_TURN_MAP_YAML = os.environ.get("CORNER_TURN_MAP_YAML", 
                                      os.path.join(os.path.dirname(RED_AVG_FILE), "single_baseline_files/corner_turn_map.yaml"))
FRF_SNR_SUFFIX = os.environ.get("FRF_SNR_SUFFIX", ".pI_FRF_SNR.uvh5")
SAVE_DLY_SNR = os.environ.get("SAVE_DLY_SNR", "TRUE").upper() == "TRUE"
DLY_SNR_SUFFIX = os.environ.get("DLY_SNR_SUFFIX", ".pI_DLYFILT_SNR.uvh5")
SAVE_FRF_SNR = os.environ.get("SAVE_FRF_SNR", "TRUE").upper() == "TRUE"

FM_LOW_FREQ = float(os.environ.get("FM_LOW_FREQ", 87.5))  # in MHz
FM_HIGH_FREQ = float(os.environ.get("FM_HIGH_FREQ", 108.0))  # in MHz

FILTER_DELAY = float(os.environ.get("FILTER_DELAY", 750))  # in ns
EIGENVAL_CUTOFF = float(os.environ.get("EIGENVAL_CUTOFF", 1e-12))

FR_SPECTRA_FILE = os.environ.get("FR_SPECTRA_FILE", 
                                 "/lustre/aoc/projects/hera/h6c-analysis/IDR3/beam_simulation_products/spectra_cache_hera_core.h5")
AUTO_FR_SPECTRUM_FILE = os.environ.get("AUTO_FR_SPECTRUM_FILE",
                                       "/lustre/aoc/projects/hera/zmartino/hera_frf/spectra_cache/spectra_cache_hera_auto.h5")
XTALK_FR = float(os.environ.get("XTALK_FR", 0.01))  # in mHz

FR_QUANTILE_LOW = float(os.environ.get("FR_QUANTILE_LOW", 0.05))
FR_QUANTILE_HIGH = float(os.environ.get("FR_QUANTILE_HIGH", 0.95))

MIN_SAMP_FRAC = float(os.environ.get("MIN_SAMP_FRAC", 0.1))

APPLY_PRIOR_FLAGS = os.environ.get("APPLY_PRIOR_FLAGS", "FALSE").upper() == "TRUE"
PRIOR_FLAG_SUFFIX = os.environ.get("PRIOR_FLAG_SUFFIX", ".flag_waterfall_round_3.h5")

for setting in ['RED_AVG_FILE', 'CORNER_TURN_MAP_YAML', 'FRF_SNR_SUFFIX', 'DLY_SNR_SUFFIX', 
                'FR_SPECTRA_FILE', 'AUTO_FR_SPECTRUM_FILE', 'PRIOR_FLAG_SUFFIX']:
    print(f'{setting} = "{eval(setting)}"')
for setting in []:
    print(f'{setting} = {eval(setting)}')
for setting in ['FM_LOW_FREQ', 'FM_HIGH_FREQ', 'FILTER_DELAY', 'EIGENVAL_CUTOFF', 'XTALK_FR',
                'SAVE_DLY_SNR', 'SAVE_FRF_SNR', 'FR_QUANTILE_LOW', 'FR_QUANTILE_HIGH', 'MIN_SAMP_FRAC',
                'APPLY_PRIOR_FLAGS']:
    print(f'{setting} = {eval(setting)}')
RED_AVG_FILE = "/lustre/aoc/projects/hera/h6c-analysis/IDR3/2459976/zen.2459976.21367.sum.smooth_calibrated.red_avg.uvh5"
CORNER_TURN_MAP_YAML = "/lustre/aoc/projects/hera/h6c-analysis/IDR3/2459976/single_baseline_files/corner_turn_map.yaml"
FRF_SNR_SUFFIX = ".pI_FRF_SNR.uvh5"
DLY_SNR_SUFFIX = ".pI_DLYFILT_SNR.uvh5"
FR_SPECTRA_FILE = "/lustre/aoc/projects/hera/h6c-analysis/IDR3/beam_simulation_products/spectra_cache_hera_core.h5"
AUTO_FR_SPECTRUM_FILE = "/lustre/aoc/projects/hera/zmartino/hera_frf/spectra_cache/spectra_cache_hera_auto.h5"
PRIOR_FLAG_SUFFIX = ".flag_waterfall_round_3.h5"
FM_LOW_FREQ = 87.5
FM_HIGH_FREQ = 108.0
FILTER_DELAY = 750.0
EIGENVAL_CUTOFF = 1e-12
XTALK_FR = 0.01
SAVE_DLY_SNR = True
SAVE_FRF_SNR = False
FR_QUANTILE_LOW = 0.05
FR_QUANTILE_HIGH = 0.95
MIN_SAMP_FRAC = 0.1
APPLY_PRIOR_FLAGS = True

Preliminaries¶

In [4]:
with open(CORNER_TURN_MAP_YAML, 'r') as file:
    corner_turn_map = yaml.unsafe_load(file)
In [5]:
# get autocorrelations
# TODO: generalize for not-previously inpainted data
all_outfiles = [outfile.replace('.uvh5', '.inpainted.uvh5') for outfiles in corner_turn_map['files_to_outfiles_map'].values() for outfile in outfiles]
for outfile in all_outfiles:
    match = re.search(r'\.(\d+)_(\d+)\.', os.path.basename(outfile))
    if match and match.group(1) == match.group(2):
        hd_autos = io.HERAData(outfile)
        autos, _, auto_nsamples = hd_autos.read(polarizations=['ee', 'nn'])
        break
In [6]:
# Load and combine prior flags if enabled
if APPLY_PRIOR_FLAGS:
    jdstr = [s for s in os.path.basename(RED_AVG_FILE).split('.') if s.isnumeric()][0]
    flag_dir = os.path.dirname(CORNER_TURN_MAP_YAML)
    flag_pattern = os.path.join(flag_dir, f'zen.{jdstr}*{PRIOR_FLAG_SUFFIX}')
    prior_flag_files = sorted(glob.glob(flag_pattern))
    
    if len(prior_flag_files) == 0:
        raise ValueError(f'APPLY_PRIOR_FLAGS is True but no files matched {flag_pattern}')
    else:
        print(f'Found {len(prior_flag_files)} prior flag files:')
        for f in prior_flag_files:
            print(f'  {os.path.basename(f)}')
        prior_flags = np.any([np.all(UVFlag(flag_file).flag_array, axis=-1) for flag_file in prior_flag_files], axis=0)
        print(f'Combined prior flags: {np.mean(prior_flags):.3%} flagged.')
Found 1 prior flag files:
  zen.2459976.flag_waterfall_round_3.h5
Combined prior flags: 48.271% flagged.

Define Plotting and Helper Functions¶

In [7]:
def compute_mb_fr_ranges(hd, antpair):
    '''Compute main beam fringe rate ranges from beam simulation spectra.
    Returns per-frequency arrays: freqs in MHz, FR bounds in mHz.'''
    with h5py.File(FR_SPECTRA_FILE, "r") as h5f:
        metadata = h5f["metadata"]
        bl_to_index_map = {tuple(ap): int(index) for index, antpairs
                           in metadata["baseline_groups"].items() for ap in antpairs}
        spectrum_freqs = metadata["frequencies_MHz"][()] * 1e6
        m_modes = metadata["erh_mode_integer_index"][()]
        this_red_group = red_groups.RedundantGroups.from_antpos(hd.antpos)[antpair]
        for ap in this_red_group:
            if ap in bl_to_index_map:
                mmode_spectrum = h5f["erh_mode_power_spectrum"][:, :, bl_to_index_map[ap]]
                break
            elif ap[::-1] in bl_to_index_map:
                mmode_spectrum = h5f["erh_mode_power_spectrum"][:, :, bl_to_index_map[ap[::-1]]]
                m_modes = m_modes * -1
                break
        else:
            raise KeyError(f'{antpair}, nor any baseline redundant with it, was found in bl_to_index_map.')

    # Build mixing matrix from m-modes to fringe rates
    full_times = hd.times
    times_ks = (full_times - full_times[0] + np.median(np.diff(full_times))) * units.day.to(units.ks)
    filt_frates = np.fft.fftshift(np.fft.fftfreq(times_ks.size, d=np.median(np.diff(times_ks))))
    _m2f_mixer = get_m2f_mixer(times_ks, m_modes)

    # Vectorized: FR spectrum for every spectrum_freq channel at once
    # _m2f_mixer: (n_frates, n_mmodes), mmode_spectrum: (n_mmodes, n_spectrum_freqs)
    fr_spectra = np.abs(np.einsum("fm,mc,mf->fc", _m2f_mixer, mmode_spectrum, _m2f_mixer.T.conj()))
    # Normalize each channel
    fr_spectra /= fr_spectra.sum(axis=0, keepdims=True)

    # Compute quantile bounds per spectrum_freq channel
    cumsum = np.cumsum(fr_spectra, axis=0)
    spec_tops = np.array([np.interp(FR_QUANTILE_HIGH, cumsum[:, c], filt_frates)
                           for c in range(len(spectrum_freqs))])
    spec_bottoms = np.array([np.interp(FR_QUANTILE_LOW, cumsum[:, c], filt_frates)
                              for c in range(len(spectrum_freqs))])

    # Interpolate from spectrum_freqs to data freqs (with extrapolation)
    mb_frate_tops = interpolate.interp1d(spectrum_freqs, spec_tops, fill_value='extrapolate')(hd.freqs)
    mb_frate_bottoms = interpolate.interp1d(spectrum_freqs, spec_bottoms, fill_value='extrapolate')(hd.freqs)

    return (hd.freqs / 1e6, mb_frate_tops, mb_frate_bottoms)


def compute_sky_fr_ranges(hd, antpair):
    '''Compute sky fringe rate ranges using analytic sky_frates_single + empirical buffer.
    Returns per-frequency arrays in MHz and mHz.'''
    blvec = hd.antpos[antpair[0]] - hd.antpos[antpair[1]]
    latitude = hd.telescope.location.lat.rad
    sky_centers, sky_hws = sky_frates_single(hd.freqs, blvec, latitude)  # mHz
    fr_buffer = get_FR_buffer_from_spectra(AUTO_FR_SPECTRUM_FILE, hd.times, hd.freqs, gauss_fit_buffer_cut=1e-5)
    return (hd.freqs / 1e6, sky_centers + sky_hws + fr_buffer, sky_centers - sky_hws - fr_buffer)


def overlaps_FR0(bands_bl, mb_frate_tops, mb_frate_bottoms):
    '''This checks if the main beam FRs are overlap FR=0 ± XTALK_FR.'''
    for band in bands_bl:    
        # check if all FRs above FR=0 band
        if not ((np.all(mb_frate_tops[band] > XTALK_FR)) and (np.all(mb_frate_bottoms[band] > XTALK_FR))):
            # check if all FRs below FR=0 band
            if not ((np.all(mb_frate_tops[band] < -XTALK_FR)) and (np.all(mb_frate_bottoms[band] < -XTALK_FR))):
                return True
    return False
In [8]:
def plot_fr_waterfall(snr_wf, flags_wf, taper_2d, freqs, times, title,
                      mb_frate_freqs_MHz=None, mb_frate_tops=None, mb_frate_bottoms=None,
                      sky_frate_freqs_MHz=None, sky_frate_tops=None, sky_frate_bottoms=None,
                      vmax=5):
    '''Plot freq vs fringe rate waterfall of |SNR| after FFT along time axis.
    Accepts pre-assembled full waterfalls with a 2D per-band taper.'''
    ntimes = len(times)
    times_in_seconds = (times - times[0]) * 24 * 3600
    frates = uvtools.utils.fourier_freqs(times_in_seconds) * 1000  # mHz

    # Per-column normalization accounting for taper and flags
    unflagged = (~flags_wf).astype(float)
    norm = (ntimes * np.mean((taper_2d * unflagged)**2, axis=0))**.5

    # FFT with per-band taper 
    to_plot = np.fft.fftshift(np.fft.fft(taper_2d * np.where((~flags_wf) & (taper_2d > 0), snr_wf, 0), axis=0), axes=0)
    to_plot = np.abs(to_plot) / norm[np.newaxis, :]

    fig = plt.figure(figsize=(14, 8), dpi=200)
    extent = [freqs[0] / 1e6, freqs[-1] / 1e6, frates[-1], frates[0]]
    im = plt.imshow(to_plot, aspect='auto', interpolation='none',
                    extent=extent, vmin=0, vmax=vmax, cmap='plasma')
    plt.colorbar(im, extend='max', label='|pI SNR|')
    plt.xlabel('Frequency (MHz)')
    plt.ylabel('Fringe Rate (mHz)')
    plt.title(title)

    if sky_frate_freqs_MHz is not None:
        plt.plot(sky_frate_freqs_MHz, sky_frate_tops, 'w:', lw=1, label='Sky FRs')
        plt.plot(sky_frate_freqs_MHz, sky_frate_bottoms, 'w:', lw=1)
        plt.ylim([-np.max([np.abs(sky_frate_tops), np.abs(sky_frate_bottoms)]) * 1.25,
                  np.max([np.abs(sky_frate_tops), np.abs(sky_frate_bottoms)]) * 1.25])
    else:
        plt.ylim([-5, 5])
    if mb_frate_freqs_MHz is not None:
        plt.plot(mb_frate_freqs_MHz, mb_frate_tops, 'w--', lw=1, label='Main Beam FRs')
        plt.plot(mb_frate_freqs_MHz, mb_frate_bottoms, 'w--', lw=1)
    if sky_frate_freqs_MHz is not None or mb_frate_freqs_MHz is not None:
        plt.legend()

    plt.tight_layout()
    plt.close(fig)
    return fig


def plot_time_freq_waterfall(snr_wf, flags_wf, freqs, times, lsts, title, vmax=5):
    '''Plot freq vs time waterfall of |SNR| in real space with LST right axis.
    Accepts pre-assembled full waterfalls.'''
    to_plot = np.where(flags_wf, np.nan, np.abs(snr_wf))

    fig, ax = plt.subplots(figsize=(14, 8), dpi=200)
    extent = [freqs[0] / 1e6, freqs[-1] / 1e6,
              times[-1] - int(times[0]), times[0] - int(times[0])]
    im = ax.imshow(to_plot, aspect='auto', interpolation='none',
                   extent=extent, vmin=0, vmax=vmax, cmap='plasma')
    plt.colorbar(im, extend='max', label='|pI SNR|', ax=ax)
    ax.set_xlabel('Frequency (MHz)')
    ax.set_ylabel(f'JD - {int(times[0])}')
    ax.set_title(title)

    # Add LST right axis with proper wrapping
    lst_grid = lsts * 12 / np.pi  # radians to hours
    lst_grid[lst_grid > lst_grid[-1]] -= 24
    ax2 = ax.twinx()
    ax2.set_ylim(lst_grid[-1], lst_grid[0])
    mod24 = lambda x, _: f"{x % 24:.1f}"
    ax2.yaxis.set_major_formatter(matplotlib.ticker.FuncFormatter(mod24))
    ax2.set_ylabel('LST (hours)')

    plt.tight_layout()
    plt.close(fig)
    return fig


def plot_snr_histograms(snr_wf, flags_wf, title):
    '''Plot histogram of |SNR| compared to the Rayleigh distribution expected for noise-only.
    Accepts pre-assembled full waterfall and flags.'''
    fig = plt.figure(figsize=(12, 5))
    bins = np.arange(0, 10, .01)
    to_hist = np.abs(snr_wf[~flags_wf])
    to_hist = to_hist[np.isfinite(to_hist) & (to_hist > 0)]
    hist = plt.hist(to_hist, bins=bins, density=True, label='Real-space |pI SNR|')
    plt.plot(bins, 2 * bins * np.exp(-bins**2), 'k--', label='Rayleigh Distribution (Noise-Only)')
    plt.yscale('log')
    all_densities = hist[0][hist[0] > 0]
    if len(all_densities) > 0:
        plt.ylim(np.min(all_densities) / 2, np.max(all_densities) * 2)
    plt.legend()
    plt.ylabel('Density')
    plt.xlabel('|pI SNR|')
    plt.xlim([-.5, 10])
    plt.title(title)
    plt.tight_layout()
    plt.close(fig)
    return fig

Compute pI SNR, Looping Over Baselines¶

In [9]:
delay_fr_figs = []
dly_waterfall_figs = []
dly_histogram_figs = []
waterfall_figs = []
histogram_figs = []

for single_bl_file in corner_turn_map['files_to_outfiles_map'][RED_AVG_FILE]:
    if not SAVE_DLY_SNR and not SAVE_FRF_SNR:
        continue

    # Load data
    single_bl_file = single_bl_file.replace('.uvh5', '.inpainted.uvh5')
    print(f'Now loading {single_bl_file}')
    hd = io.HERAData(single_bl_file)
    data, flags, nsamples = hd.read(polarizations=['ee', 'nn'])
    dt = np.median(np.diff(hd.times)) * 24 * 3600
    df = np.median(np.diff(hd.freqs))
    antpair = data.antpairs().pop()

    if antpair[0] == antpair[1]:
        print('\tThis baseline is an autocorrelation. Skipping...')
        continue

    med_auto_nsamples = {bl[2]: np.median(n) for bl, n in auto_nsamples.items()}
    if not any([np.median(nsamples[bl]) > MIN_SAMP_FRAC * med_auto_nsamples[bl[2]] for bl in nsamples]):
        print('\tNo polarization has enough nsamples to be worth filtering. Skipping...')
        continue

    # Combine flags across pols, including prior flags if enabled
    flags_here = flags[antpair + ('ee',)] | flags[antpair + ('nn',)]
    if APPLY_PRIOR_FLAGS:
        flags_here |= prior_flags

    # Get tslice and bands split around FM
    tslices_bl, bands_bl = flag_utils.get_minimal_slices(flags_here,
        freqs=data.freqs, freq_cuts=[(FM_LOW_FREQ + FM_HIGH_FREQ) * .5e6])

    # Compute FR ranges for this baseline
    print('\tComputing fringe rate ranges...')
    (mb_frate_freqs_MHz, mb_frate_tops, mb_frate_bottoms) = compute_mb_fr_ranges(hd, antpair)
    (sky_frate_freqs_MHz, sky_frate_tops, sky_frate_bottoms) = compute_sky_fr_ranges(hd, antpair)

    # Exclude baselines that overlap FR=0, since there's sometimes extra high delay structure there that's not relevant to this step
    if overlaps_FR0(bands_bl, mb_frate_tops, mb_frate_bottoms):
        print(f'\tThis baseline overlaps with the FR = (0 ± {XTALK_FR}) mHz band. Skipping...')
        continue        

    # Process each band
    filt_flags_full = np.ones((len(hd.times), len(hd.freqs)), dtype=bool)
    newly_flagged = np.zeros((len(hd.times), len(hd.freqs)), dtype=bool)
    dly_filt_SNR_full = np.full((len(hd.times), len(hd.freqs)), np.nan, dtype=complex)
    frf_SNR_full = np.full((len(hd.times), len(hd.freqs)), np.nan, dtype=complex)
    taper_2d = np.zeros((len(hd.times), len(hd.freqs)))

    for tslice, band in zip(tslices_bl, bands_bl):
        if (band is None) or np.all(flags_here[tslice, band]):
            continue

        # Extract per-pol data for this band
        d_ee = data[antpair + ('ee',)][tslice, band]
        d_nn = data[antpair + ('nn',)][tslice, band]
        f_ee = flags[antpair + ('ee',)][tslice, band]
        f_nn = flags[antpair + ('nn',)][tslice, band]
        n_ee = nsamples[antpair + ('ee',)][tslice, band]
        n_nn = nsamples[antpair + ('nn',)][tslice, band]
        a_ee = np.abs(autos[autos.antpairs().pop() + ('ee',)][tslice, band])
        a_nn = np.abs(autos[autos.antpairs().pop() + ('nn',)][tslice, band])

        # Compute variance from autos
        var_pI = a_ee**2 / (dt * df) / n_ee + a_nn**2 / (dt * df) / n_nn

        # Form pseudo-Stokes pI
        d_pI, f_pI, n_pI = hp.pstokes._combine_pol_arrays(
            'ee', 'nn', 'pI', pol_convention=hd.pol_convention,
            data_list=[d_ee, d_nn], flags_list=[f_ee, f_nn],
            nsamples_list=[n_ee, n_nn],
            x_orientation=hd.telescope.get_x_orientation_from_feeds())

        if APPLY_PRIOR_FLAGS:
            f_pI |= prior_flags[tslice, band]

        d_pI[f_pI] = 0

        # Compute SNR
        SNR = d_pI / var_pI**.5

        # Delay filter
        print(f'\tDelay filtering band {band}...')
        wgts = np.where(f_pI, 0, 1).astype(float)
        filter_kwargs = dict(filter_centers=[0], filter_half_widths=[FILTER_DELAY * 1e-9],
            eigenval_cutoff=[EIGENVAL_CUTOFF], suppression_factors=[EIGENVAL_CUTOFF],
            max_contiguous_edge_flags=len(hd.freqs))
        result, _, info = fourier_filter(hd.freqs[band], SNR, wgts=wgts,
            mode='dpss_leastsq', **filter_kwargs)
        dly_filt_SNR = SNR - result

        # Re-run failed integrations with dpss_matrix (pinv-based, never fails)
        for i, s in info['status']['axis_1'].items():
            if s == 'skipped':
                res_i, _, _ = fourier_filter(hd.freqs[band], SNR[i:i+1, :],
                    wgts=wgts[i:i+1, :], mode='dpss_matrix', **filter_kwargs)
                dly_filt_SNR[i] = SNR[i] - res_i[0]

        # Per-integration weighted leverage correction
        X = dpss_operator(hd.freqs[band], [0],
            filter_half_widths=[FILTER_DELAY / 1e9], eigenval_cutoff=[EIGENVAL_CUTOFF])[0]
        correction_cache = {}
        for i in range(SNR.shape[0]):
            w = wgts[i]
            if not np.any(w):
                continue
            cache_key = f_pI[i].tobytes()
            if cache_key not in correction_cache:
                XtWX = (X.T * w) @ X
                if np.all(np.isclose(XtWX.imag, 0)):
                    XtWX = np.real(XtWX)
                try:
                    P = np.linalg.pinv(XtWX) @ (X.T * w)
                    lev = np.real(np.sum(X * P.T, axis=1))
                    correction = (1 - lev)**.5
                    correction_cache[cache_key] = np.where(np.isfinite(correction) & (lev > 0) & (lev < 1), correction, np.nan)
                except np.linalg.LinAlgError:
                    correction_cache[cache_key] = np.full(X.shape[0], np.nan)
            dly_filt_SNR[i] /= correction_cache[cache_key]

        # update larger arrays and account for non-finite SNR (e.g. from degenerate leverage corrections)
        newly_flagged[tslice, band] |= ~np.isfinite(dly_filt_SNR) & ~f_pI
        f_pI |= newly_flagged[tslice, band]
        dly_filt_SNR[newly_flagged[tslice, band]] = 0
        filt_flags_full[tslice, band] = f_pI
        dly_filt_SNR_full[tslice, band] = dly_filt_SNR

        if SAVE_FRF_SNR:
            # Per-channel fringe rate filter (using full-resolution main beam bounds)
            print(f'\tFR filtering band {band}...')
            frf_SNR = copy.deepcopy(dly_filt_SNR)  # real-space normalized
            times_in_seconds = (hd.times[tslice] - hd.times[tslice][0]) * 24 * 3600
            for chan, (fr_low, fr_high) in enumerate(zip(mb_frate_bottoms[band], mb_frate_tops[band])):
                fr_center = (fr_low + fr_high) / 2 / 1000  # mHz -> Hz
                fr_halfwidth = (fr_high - fr_low) / 2 / 1000  # mHz -> Hz

                # filter directly at the main-beam fringe rate center
                result, _, _ = fourier_filter(times_in_seconds, dly_filt_SNR[:, chan:chan+1],
                    wgts=np.where(f_pI[:, chan:chan+1], 0, 1), filter_centers=[fr_center],
                    filter_half_widths=[fr_halfwidth], mode='dpss_leastsq',
                    eigenval_cutoff=[EIGENVAL_CUTOFF], suppression_factors=[EIGENVAL_CUTOFF],
                    max_contiguous_edge_flags=len(data.times), filter_dims=0)

                # Temporal leverage correction
                Xt = dpss_operator(hd.times[tslice] * 24 * 3600, filter_centers=[fr_center],
                    filter_half_widths=[fr_halfwidth], eigenval_cutoff=[EIGENVAL_CUTOFF])[0]
                W = np.where(f_pI[:, chan], 0, 1)
                XtWXt = np.dot(Xt.conj().T * W, Xt)
                try:
                    P = np.linalg.pinv(XtWXt) @ (Xt.conj().T * W)
                    lev_t = np.real(np.sum(Xt * P.T, axis=1))
                    lev_t_correction = lev_t**.5
                    lev_t_correction = np.where(np.isfinite(lev_t) & (lev_t > 0), lev_t_correction, np.nan)
                except np.linalg.LinAlgError:
                    lev_t_correction = np.full(Xt.shape[0], np.nan)

                frf_SNR[:, chan:chan+1] = np.where(f_pI[:, chan:chan+1], 0, result / lev_t_correction[:, None])

            # account for non-finite SNR again
            frf_SNR_full[tslice, band] = frf_SNR
            newly_flagged[tslice, band] |= ~np.isfinite(frf_SNR) & ~f_pI

        # build 2D taper for plotting
        band_ntimes = tslice.stop - tslice.start
        taper_2d[tslice, band] = blackmanharris(band_ntimes)[:, np.newaxis]

    if np.any(newly_flagged):
        print(f'\tFlagging {np.sum(newly_flagged)} waterfall pixels due to NaNs and infs, typically due to degenerate leverage.')
        filt_flags_full |= newly_flagged
    
    if np.all(filt_flags_full):
        print(f'\t{antpair} is entirely flagged.')
        continue

    # Now produce figures to display later
    delay_fr_figs.append(plot_fr_waterfall(
        dly_filt_SNR_full, filt_flags_full, taper_2d, hd.freqs, hd.times,
        f'{antpair} Delay-Filtered pI',
        mb_frate_freqs_MHz=mb_frate_freqs_MHz, mb_frate_tops=mb_frate_tops, mb_frate_bottoms=mb_frate_bottoms,
        sky_frate_freqs_MHz=sky_frate_freqs_MHz, sky_frate_tops=sky_frate_tops, sky_frate_bottoms=sky_frate_bottoms))

    dly_waterfall_figs.append(plot_time_freq_waterfall(
        dly_filt_SNR_full, filt_flags_full, hd.freqs, hd.times, hd.lsts,
        f'{antpair} Delay-Filtered pI'))

    dly_histogram_figs.append(plot_snr_histograms(
        dly_filt_SNR_full, filt_flags_full, f'{antpair} Delay-Filtered pI'))

    if SAVE_FRF_SNR:
        waterfall_figs.append(plot_time_freq_waterfall(
            frf_SNR_full, filt_flags_full, hd.freqs, hd.times, hd.lsts,
            f'{antpair} Delay+FR-Filtered pI'))

        histogram_figs.append(plot_snr_histograms(
            frf_SNR_full, filt_flags_full, f'{antpair} Delay+FR-Filtered pI'))

    # Write SNR outputs
    if SAVE_DLY_SNR or SAVE_FRF_SNR:
        hd.select(polarizations=['ee'])
        hd.polarization_array[0] = utils.polstr2num('pI')
        for bl in list(data.keys()):
            if bl[2] != 'ee':
                del data[bl]
                del flags[bl]
        for to_save, snr_full, suffix, label in [(SAVE_DLY_SNR, dly_filt_SNR_full, DLY_SNR_SUFFIX, 'delay-filtered'),
                                                 (SAVE_FRF_SNR, frf_SNR_full, FRF_SNR_SUFFIX, 'delay-filtered and FRFed')]:
            if not to_save:
                continue
            data[antpair + ('ee',)] = np.where(np.isfinite(snr_full), snr_full, 0)
            flags[antpair + ('ee',)] = filt_flags_full
            hd.update(data=data, flags=flags)
            outfile = single_bl_file.replace('.uvh5', suffix)
            print(f'\tWriting {label} results to {outfile}')
            hd.write_uvh5(outfile, clobber=True)
Now loading /lustre/aoc/projects/hera/h6c-analysis/IDR3/2459976/single_baseline_files/zen.2459976.baseline.0_4.sum.smooth_calibrated.red_avg.inpainted.uvh5
	Computing fringe rate ranges...
	Delay filtering band slice(np.int64(0), np.int64(332), None)...
invalid value encountered in divide
invalid value encountered in sqrt
	Delay filtering band slice(np.int64(501), np.int64(1496), None)...
	Flagging 131 waterfall pixels due to NaNs and infs, typically due to degenerate leverage.
invalid value encountered in divide
	Writing delay-filtered results to /lustre/aoc/projects/hera/h6c-analysis/IDR3/2459976/single_baseline_files/zen.2459976.baseline.0_4.sum.smooth_calibrated.red_avg.inpainted.pI_DLYFILT_SNR.uvh5

Figure 1: Delay-Filtered pI SNR in Fringe Rate Space¶

In [10]:
for fig in delay_fr_figs:
    display(fig)
No description has been provided for this image

Figure 2: Delay-Filtered pI SNR Waterfall¶

In [11]:
for fig in dly_waterfall_figs:
    display(fig)
No description has been provided for this image

Figure 3: Delay-Filtered pI SNR Histogram¶

In [12]:
for fig in dly_histogram_figs:
    display(fig)
No description has been provided for this image

Figure 4: Delay+FR Filtered pI SNR Waterfall¶

In [13]:
for fig in waterfall_figs:
    display(fig)

Figure 5: Delay+FR Filtered pI SNR Histogram¶

In [14]:
for fig in histogram_figs:
    display(fig)

Metadata¶

In [15]:
for repo in ['hera_cal', 'hera_qm', 'hera_filters', 'hera_notebook_templates', 'hera_pspec', 'pyuvdata', 'numpy']:
    exec(f'from {repo} import __version__')
    print(f'{repo}: {__version__}')
hera_cal: 3.7.8.dev17+g164c717b7
hera_qm: 2.2.1.dev12+g95ecc30f0
hera_filters: 0.1.7
hera_notebook_templates: 0.0.1.dev1372+g2eeaf96cf
hera_pspec: 0.4.3.dev99+g0e4b0e22b
pyuvdata: 3.2.6.dev4+g0509a7d17
numpy: 2.2.5
In [16]:
print(f'Finished execution in {(time.time() - tstart) / 60:.2f} minutes.')
Finished execution in 1.77 minutes.