Single File Post-Processing¶

by Josh Dillon, last updated January 9, 2025

This notebook, the final step in a pre-LST-binning analysis, applies calibration solutions and flags to both sum and diff data, producing a variety of data products. These currently may include:

  • Abs-calibrated, redundantly-averaged visibility sums

  • Abs-calibrated, redundantly-averaged visibility diffs

  • Smooth-calibrated, redundantly-averaged visibility sums

  • Smooth-calibrated, redundantly-averaged visibility diffs

  • Abs-calibrated, redundantly-averaged, delay-filtered visibility sums

  • Abs-calibrated, redundantly-averaged, delay-filtered visibility diffs

  • Smooth-calibrated, redundantly-averaged, delay-filtered visibility sums

  • Smooth-calibrated, redundantly-averaged, delay-filtered visibility diffs

  • Smooth-calibrated, coherently redundantly-averaged, incoherently array-averaged visibility magnitudes

    • This is done for all visibilities, just autos, and just cross-correlations and is designed to be lightweight data product for transient searches

Some of these data products are experimental and may be removed at a later date to save compute and disk. Some may be turned off using environment variables.

Here's a set of links to skip to particular figures and tables:

• Figure 1: Redundant Averaging of a Single Baseline Group After smooth_cal¶

• Figure 2: Number of Redundantly-Averaged Samples as a Function of Baseline¶

• Figure 3: Delay-Filtered Visibility Delay Spectra after smooth_cal and Redundant-Averaging¶

In [1]:
import time
tstart = time.time()
In [2]:
import os
os.environ['HDF5_USE_FILE_LOCKING'] = 'FALSE'
import h5py
import hdf5plugin  # REQUIRED to have the compression plugins available
import numpy as np
import copy
import pickle
import glob
from pyuvdata import UVFlag
from hera_cal import io, utils, redcal, apply_cal, datacontainer, vis_clean
from hera_filters import dspec
from hera_qm.metrics_io import read_a_priori_ant_flags
from hera_qm.time_series_metrics import true_stretches
from scipy import constants, signal
import matplotlib
import matplotlib.pyplot as plt
from IPython.display import display, HTML
%matplotlib inline

Load data and calibration solutions¶

In [3]:
# figure out whether to save results (and which ones)
SAVE_RESULTS = os.environ.get("SAVE_RESULTS", "TRUE").upper() == "TRUE"
SAVE_DIFF_RED_AVG = os.environ.get("SAVE_DIFF_RED_AVG", "TRUE").upper() == "TRUE"
SAVE_ABS_CAL_RED_AVG = os.environ.get("SAVE_ABS_CAL_RED_AVG", "TRUE").upper() == "TRUE"
SAVE_DLY_FILT_RED_AVG = os.environ.get("SAVE_DLY_FILT_RED_AVG", "TRUE").upper() == "TRUE"
SAVE_CROSS_POLS = os.environ.get("SAVE_CROSS_POLS", "FALSE").upper() == "TRUE"
# TODO: in theory, if some of these are turned off, some memory and/or processing could be saved 
# by doing only a subset of what this notebook does. That's left for future work.

add_to_history = 'Produced by file_prostprocessing notebook with the following environment:\n' + '=' * 65 + '\n' + os.popen('conda env export').read() + '=' * 65
for setting in ['SAVE_RESULTS', 'SAVE_DIFF_RED_AVG', 'SAVE_ABS_CAL_RED_AVG', 'SAVE_DLY_FILT_RED_AVG', 'SAVE_CROSS_POLS']:
    print(f'{setting} = {eval(setting)}')
SAVE_RESULTS = True
SAVE_DIFF_RED_AVG = False
SAVE_ABS_CAL_RED_AVG = False
SAVE_DLY_FILT_RED_AVG = False
SAVE_CROSS_POLS = True
In [4]:
# get input data file names
SUM_FILE = os.environ.get("SUM_FILE", None)
# SUM_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459861/zen.2459861.46123.sum.uvh5'
SUM_SUFFIX = os.environ.get("SUM_SUFFIX", 'sum.uvh5')
DIFF_SUFFIX = os.environ.get("DIFF_SUFFIX", 'diff.uvh5')

# get input calibration files and flags
ABS_CAL_SUFFIX = os.environ.get("ABS_CAL_SUFFIX", 'sum.omni.calfits')
SMOOTH_CAL_SUFFIX = os.environ.get("CAL_SUFFIX", 'sum.smooth.calfits')
APOSTERIORI_YAML_SUFFIX = os.environ.get("APOSTERIORI_YAML_SUFFIX", '_aposteriori_flags.yaml')
aposteriori_yaml_file = os.path.join(os.path.dirname(SUM_FILE), SUM_FILE.split('.')[-4] + APOSTERIORI_YAML_SUFFIX)

# Get filter cache name
FILTER_CACHE = os.environ.get("FILTER_CACHE", "filter_cache")
filter_cache_dir = os.path.join(os.path.dirname(SUM_FILE), FILTER_CACHE)

# Get output reds pickle name for storing the actual reds used in the redundant average
REDS_PICKLE_SUFFIX = os.environ.get("REDS_PICKLE_SUFFIX", 'reds_used.p')

# output abs-calibrated, redundantly-averaged files
SUM_ABS_CAL_RED_AVG_SUFFIX = os.environ.get("SUM_ABS_CAL_RED_AVG_SUFFIX", 'sum.abs_calibrated.red_avg.uvh5')
DIFF_ABS_CAL_RED_AVG_SUFFIX = os.environ.get("DIFF_ABS_CAL_RED_AVG_SUFFIX", 'diff.abs_calibrated.red_avg.uvh5')

# output smooth-calibrated, redundantly-averaged files
SUM_SMOOTH_CAL_RED_AVG_SUFFIX = os.environ.get("SUM_SMOOTH_CAL_RED_AVG_SUFFIX", 'sum.smooth_calibrated.red_avg.uvh5')
DIFF_SMOOTH_CAL_RED_AVG_SUFFIX = os.environ.get("DIFF_SMOOTH_CAL_RED_AVG_SUFFIX", 'diff.smooth_calibrated.red_avg.uvh5')

# output abs-calibrated, redundantly-averaged, delay-filtered files
SUM_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX = os.environ.get("SUM_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX", 'sum.abs_calibrated.red_avg.dly_filt.uvh5')
DIFF_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX = os.environ.get("DIFF_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX", 'diff.abs_calibrated.red_avg.dly_filt.uvh5')

# output smooth-calibrated, redundantly-averaged, delay-filtered files
SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX = os.environ.get("SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX", 'sum.smooth_calibrated.red_avg.dly_filt.uvh5')
DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX = os.environ.get("DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX", 'diff.smooth_calibrated.red_avg.dly_filt.uvh5')

# output smooth-calibrated, averaged absolute value of visibilities
AVG_ABS_ALL_SUFFIX = os.environ.get("AVG_ABS_SUFFIX", 'sum.smooth_calibrated.avg_abs_all.uvh5')
AVG_ABS_AUTO_SUFFIX = os.environ.get("AVG_ABS_AUTO_SUFFIX", 'sum.smooth_calibrated.avg_abs_auto.uvh5')
AVG_ABS_CROSS_SUFFIX = os.environ.get("AVG_ABS_CROSS_SUFFIX", 'sum.smooth_calibrated.avg_abs_cross.uvh5')

for suffix in ['DIFF_SUFFIX', 'ABS_CAL_SUFFIX', 'SMOOTH_CAL_SUFFIX', 'REDS_PICKLE_SUFFIX',
               'SUM_ABS_CAL_RED_AVG_SUFFIX', 'DIFF_ABS_CAL_RED_AVG_SUFFIX',
               'SUM_SMOOTH_CAL_RED_AVG_SUFFIX', 'DIFF_SMOOTH_CAL_RED_AVG_SUFFIX',               
               'SUM_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX', 'DIFF_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX', 
               'SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX', 'DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX',               
               'AVG_ABS_ALL_SUFFIX', 'AVG_ABS_AUTO_SUFFIX', 'AVG_ABS_CROSS_SUFFIX']:
    file_var = suffix.replace('SUFFIX', 'FILE')
    exec(f'{file_var} = SUM_FILE.replace(SUM_SUFFIX, {suffix})')
    print(f"{file_var} = '{eval(file_var)}'")
print(f'aposteriori_yaml_file = {aposteriori_yaml_file}')
print(f'filter_cache = {filter_cache_dir}')
DIFF_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR3/2459856/zen.2459856.49362.diff.uvh5'
ABS_CAL_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR3/2459856/zen.2459856.49362.sum.omni.calfits'
SMOOTH_CAL_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR3/2459856/zen.2459856.49362.sum.smooth.calfits'
REDS_PICKLE_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR3/2459856/zen.2459856.49362.reds_used.p'
SUM_ABS_CAL_RED_AVG_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR3/2459856/zen.2459856.49362.sum.abs_calibrated.red_avg.uvh5'
DIFF_ABS_CAL_RED_AVG_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR3/2459856/zen.2459856.49362.diff.abs_calibrated.red_avg.uvh5'
SUM_SMOOTH_CAL_RED_AVG_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR3/2459856/zen.2459856.49362.sum.smooth_calibrated.red_avg.uvh5'
DIFF_SMOOTH_CAL_RED_AVG_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR3/2459856/zen.2459856.49362.diff.smooth_calibrated.red_avg.uvh5'
SUM_ABS_CAL_RED_AVG_DLY_FILT_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR3/2459856/zen.2459856.49362.sum.abs_calibrated.red_avg.dly_filt.uvh5'
DIFF_ABS_CAL_RED_AVG_DLY_FILT_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR3/2459856/zen.2459856.49362.diff.abs_calibrated.red_avg.dly_filt.uvh5'
SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR3/2459856/zen.2459856.49362.sum.smooth_calibrated.red_avg.dly_filt.uvh5'
DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR3/2459856/zen.2459856.49362.diff.smooth_calibrated.red_avg.dly_filt.uvh5'
AVG_ABS_ALL_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR3/2459856/zen.2459856.49362.sum.smooth_calibrated.avg_abs_all.uvh5'
AVG_ABS_AUTO_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR3/2459856/zen.2459856.49362.sum.smooth_calibrated.avg_abs_auto.uvh5'
AVG_ABS_CROSS_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR3/2459856/zen.2459856.49362.sum.smooth_calibrated.avg_abs_cross.uvh5'
aposteriori_yaml_file = /lustre/aoc/projects/hera/h6c-analysis/IDR3/2459856/2459856_aposteriori_flags.yaml
filter_cache = /lustre/aoc/projects/hera/h6c-analysis/IDR3/2459856/filter_cache
In [5]:
# parse delay filtering settings
DLY_FILT_HORIZON = float(os.environ.get("DLY_FILT_HORIZON", 1.0))
DLY_FILT_STANDOFF = float(os.environ.get("DLY_FILT_STANDOFF", 0.0)) # in ns
DLY_FILT_MIN_DLY = float(os.environ.get("DLY_FILT_MIN_DLY", 150.0)) # in ns
DLY_FILT_EIGENVAL_CUTOFF = float(os.environ.get("DLY_FILT_EIGENVAL_CUTOFF", 1e-12))
FM_LOW_FREQ = float(os.environ.get("FM_LOW_FREQ", 87.5)) # in MHz
FM_HIGH_FREQ = float(os.environ.get("FM_HIGH_FREQ", 108.0)) # in MHz
for setting in ['DLY_FILT_HORIZON', 'DLY_FILT_STANDOFF', 'DLY_FILT_MIN_DLY', 'DLY_FILT_EIGENVAL_CUTOFF', 'FM_LOW_FREQ', 'FM_HIGH_FREQ']:
        print(f'{setting} = {eval(setting)}')
DLY_FILT_HORIZON = 1.0
DLY_FILT_STANDOFF = 0.0
DLY_FILT_MIN_DLY = 150.0
DLY_FILT_EIGENVAL_CUTOFF = 1e-12
FM_LOW_FREQ = 87.5
FM_HIGH_FREQ = 108.0
In [6]:
# Find all cached matrices within the filter cache
cache_files = glob.glob(os.path.join(filter_cache_dir, "*.filter_cache.p"))
In [7]:
pols = ['nn', 'ee', 'ne', 'en'] if SAVE_CROSS_POLS else ['ee', 'nn']

# Load sum and diff data
hd = io.HERADataFastReader(SUM_FILE)
data, flags, nsamples = hd.read(pols=pols)

if SAVE_DIFF_RED_AVG:
    hd_diff = io.HERADataFastReader(DIFF_FILE)
    diff_data, diff_flags, diff_nsamples = hd_diff.read(pols=pols)
In [8]:
# pick all reds that might be in the data, using the same set across polarizations for easier output
reds = redcal.get_reds(hd.data_antpos, pols=pols, include_autos=True, bl_error_tol=2.0)
ex_ants = set(read_a_priori_ant_flags(aposteriori_yaml_file))
possibly_unflagged_bls = [bl for bl in data if utils.split_bl(bl)[0] not in ex_ants and utils.split_bl(bl)[1] not in ex_ants]
possibly_unflagged_antpairs = set([ap for bl in possibly_unflagged_bls for ap in [bl[:2], bl[-2::-1]]])
reds = [red for red in reds if np.any([bl[0:2] in possibly_unflagged_antpairs for bl in red])]
In [9]:
# Load calibration solutions and gain flags
hc_smooth = io.HERACal(SMOOTH_CAL_FILE)
smooth_gains, cal_flags, _, _ = hc_smooth.read()
pol_convention = hc_smooth.pol_convention
gain_scale = hc_smooth.gain_scale

hc_abs = io.HERACal(ABS_CAL_FILE)
abs_gains, abs_cal_flags, _, _ = hc_abs.read()
assert pol_convention == hc_abs.pol_convention, f'{pol_convention} != {hc_abs.pol_convention}'
assert gain_scale == hc_abs.gain_scale, f'{gain_scale} != {hc_abs.gain_scale}'
In [10]:
# handle the the case where the smooth_cal flags are all True, trying to maintain consistent data shapes
ALL_FLAGGED = False
if np.all([flag for flag in cal_flags.values()]):
    print('This file is entirely flagged. Proceeding with averaging and filtering using earlier flags, '
          'but the output data products will still be fully flagged.')
    ALL_FLAGGED = True
    
    # Likely the file was fully flagged for broadband RFI, so instead use the original flags from file_calibration
    cal_flags = abs_cal_flags
    # And if that didn't work, just make the flags all False for now (though ex_ants will still be exluded via reds)
    if np.all([flag for flag in cal_flags.values()]):
        cal_flags = {ant: np.zeros_like(cal_flags[ant]) for ant in cal_flags}
This file is entirely flagged. Proceeding with averaging and filtering using earlier flags, but the output data products will still be fully flagged.
In [11]:
def red_average(reds, data, nsamples, gains, flags={}, cal_flags={}):    
    # Redundantly average data
    wgts = datacontainer.DataContainer({bl: nsamples[bl] * ~(flags.get(bl, False) | cal_flags.get(utils.split_bl(bl)[0], False) \
                                                             | cal_flags.get(utils.split_bl(bl)[1], False)) for bl in nsamples})
    sol = redcal.RedSol(reds, gains=gains)
    sol.update_vis_from_data(data, wgts=wgts)
    
    # Figure out redundantly averaged flags and nsamples
    red_avg_flags = {}
    red_avg_nsamples = {}
    for red in reds:
        if red[0] in sol.vis:
            red_avg_flags[red[0]] = np.all([wgts[bl] == 0 for bl in red], axis=0) | ~np.isfinite(sol.vis[red[0]])
            red_avg_nsamples[red[0]] = np.sum([nsamples[bl] for bl in red if not np.all(wgts[bl] == 0)], axis=0)
        else:
            # empty placeholders to make sure every file has the same shape for the whole day
            sol.vis[red[0]] = np.zeros_like(next(iter(data.values())))
            red_avg_flags[red[0]] = np.ones_like(next(iter(flags.values())))
            red_avg_nsamples[red[0]] = np.zeros_like(next(iter(nsamples.values())))
    sol.make_sol_finite()
    
    # Build output RedDataContainers 
    red_avg_data = datacontainer.RedDataContainer(sol.vis, reds)
    red_avg_flags = datacontainer.RedDataContainer(red_avg_flags, reds)
    red_avg_nsamples = datacontainer.RedDataContainer(red_avg_nsamples, reds)
    return red_avg_data, red_avg_flags, red_avg_nsamples
In [12]:
# perform redundant averaging
red_avg_smooth_cal_sum_data, red_avg_flags, red_avg_nsamples = red_average(reds, data, nsamples, smooth_gains, flags=flags, cal_flags=cal_flags)
red_avg_abs_cal_sum_data, _, _ = red_average(reds, data, nsamples, abs_gains, flags=flags, cal_flags=cal_flags)
if SAVE_DIFF_RED_AVG:
    red_avg_smooth_cal_diff_data, _, _ = red_average(reds, diff_data, diff_nsamples, smooth_gains, flags=diff_flags, cal_flags=cal_flags)
    red_avg_abs_cal_diff_data, _, _ = red_average(reds, diff_data, nsamples, abs_gains, flags=flags, cal_flags=cal_flags)
In [13]:
# Save the actual reds used for this redundant average
flagged_bls = [bl for bl in flags if np.all(flags[bl])]
flagged_ants = [ant for ant in cal_flags if np.all(cal_flags[ant])]
reds_used_here = redcal.filter_reds(reds, ex_bls=flagged_bls, ex_ants=flagged_ants)
if SAVE_RESULTS:
    with open(REDS_PICKLE_FILE, 'wb') as reds_pickle_file:
        pickle.dump(reds_used_here, reds_pickle_file)
In [14]:
if not ALL_FLAGGED:
    integration_flags = np.all([red_avg_flags[bl] for bl in red_avg_flags], axis=(0, 2))
In [15]:
def plot_red_avg_vis(pols=['ee', 'nn']):
    if ALL_FLAGGED:
        print('All integrations are flagged. Nothing to plot.')
        return
    
    fig, axes = plt.subplots(2, 2, figsize=(14, 6), dpi=100, sharex='col', sharey='row', gridspec_kw={'hspace': 0, 'wspace': 0})
    for i, pol in enumerate(pols):

        reds_here = redcal.filter_reds(reds, pols=[pol], antpos=hd.antpos, min_bl_cut=1)
        sol = redcal.RedSol(reds_here, gains=smooth_gains)
        red = sorted(reds_here, key=lambda red: np.median(red_avg_nsamples.get(red[0], 0)), reverse=True)[0]
        
        if np.any([bl[2] == pol for bl in red_avg_flags]):
            for tind in range(red_avg_flags[red[0]].shape[0]):
                if not np.all(red_avg_flags[red[0]][tind]):
                    break

            calibrated = {bl: np.where(flags[bl] | cal_flags[utils.split_bl(bl)[0]] | cal_flags[utils.split_bl(bl)[1]], 
                                       np.nan, sol.calibrate_bl(bl, data[bl])) for bl in red if bl in data}
            for bl in red:
                axes[0, i].plot(hd.freqs/1e6, np.angle(calibrated[bl][tind]), alpha=.5, lw=.5)
                axes[1, i].semilogy(hd.freqs/1e6, np.abs(calibrated[bl][tind]), alpha=.5, lw=.5)

            to_plot = np.where(red_avg_flags[red[0]][tind], np.nan, red_avg_smooth_cal_sum_data[red[0]][tind])
            axes[0, i].plot(hd.freqs / 1e6, np.angle(to_plot), lw=1, c='k')
            axes[1, i].semilogy(hd.freqs / 1e6, np.abs(to_plot), lw=1, c='k', label=f'Baseline Group:\n{(int(red[0][0]), int(red[0][1]), red[0][2])}')
            axes[1, i].set_xlabel('Frequency (MHz)')
            axes[1, i].legend(loc='upper right')
    axes[0, 0].set_ylabel('Visibility Phase (radians)')
    axes[1, 0].set_ylabel('Visibility Amplitude (Jy)')
    plt.tight_layout()
In [16]:
def plot_red_avg_nsamples():
    if ALL_FLAGGED:
        print('All integrations are flagged. Nothing to plot.')
        return
    
    fig, axes = plt.subplots(2, 1, figsize=(14,7), dpi=100, sharex=True, gridspec_kw={'hspace': 0})
    max_nsamples = np.max([np.median(ns) for ns in red_avg_nsamples.values()])
    for ax, pol in zip(axes, ['ee', 'nn']):
        if np.any([bl[2] == pol for bl in red_avg_nsamples]):
            blvecs = np.array([hd.antpos[red[0][1]] - hd.antpos[red[0][0]] for red in reds if red[0] in red_avg_nsamples and red[0][2] == pol])
            med_nsamples = np.array([np.median(red_avg_nsamples[red[0]][~integration_flags, :]) 
                                     for red in reds if red[0] in red_avg_nsamples and red[0][2] == pol])
            sca = ax.scatter(blvecs[:, 0], blvecs[:, 1], s=0)
            sca = ax.scatter(blvecs[:, 0], blvecs[:, 1], s=(100 * (600 / np.diff(ax.get_xlim())[0])**2), ec='k', linewidths=.5,
                             c=med_nsamples, norm=matplotlib.colors.LogNorm(vmin=1, vmax=max_nsamples), cmap='turbo')
            ax.axis('equal')
            ax.set_xlabel('EW Baseline Vector (m)')
            ax.set_ylabel('NS Baseline Vector (m)')
        ax.text(.98, .94, f'{pol}-polarized', transform=ax.transAxes, va='top', ha='right', bbox=dict(facecolor='w', alpha=0.5))           

    plt.tight_layout()
    fig.colorbar(sca, ax=axes, pad=.02, label='Number of Samples')

Figure 1: Redundant Averaging of a Single Baseline Group After smooth_cal¶

The results of calibration and redundant averaging the baseline group with the highest redundancy in each polarization. Visibilities with flagged antennas are excluded. The black line is the redundantly-averaged visibility. Each thin colored line is a different baseline group. Phases are shown in the top row, amplitudes in the bottom, ee-polarized visibilities in the left column, and nn-polarized visibilities in the right.

In [17]:
plot_red_avg_vis()
All integrations are flagged. Nothing to plot.
In [18]:
plot_red_avg_vis(['en', 'ne'])
All integrations are flagged. Nothing to plot.

Figure 2: Number of Redundantly-Averaged Samples as a Function of Baseline¶

The number of visibilities averaged together in each baseline group. Note that the split of the HERA core produces more highly-sampled intra-sector baselines that are interspersed with the less-highly-sampled inter-sector baselines off the main grid.

In [19]:
plot_red_avg_nsamples()
All integrations are flagged. Nothing to plot.
In [20]:
# save memory
ALL_CAL_FLAGS = np.all([np.all(~cal_flags[ant]) for ant in cal_flags])
del hd, data, flags, nsamples
if SAVE_DIFF_RED_AVG:
    del hd_diff, diff_data, diff_flags, diff_nsamples
del hc_smooth, smooth_gains, cal_flags, hc_abs, abs_gains, abs_cal_flags
hd = io.HERADataFastReader(SUM_FILE)

Perform Delay Filter¶

In [21]:
def read_filter_cache_scratch(cache_files):
    """
    Load files from a cache specified by cache_dir.
    """
    # Load up the cache file with the most keys (precomputed filter matrices).
    cache = {}
    # loop through cache files, load them.
    # If there are new keys, add them to internal cache.
    # If not, delete the reference matrices from memory.
    for cache_file in cache_files:        
        try:
            cfile = open(cache_file, 'rb')
            try:
                cache_t = pickle.load(cfile)
            finally:
                cfile.close()
        except:
            continue
        for key in cache_t:
            cache[key] = cache_t[key]
                
    return cache
In [22]:
# Peform delay filter separately above and below FM
low_band = slice(0, np.argwhere(hd.freqs > FM_LOW_FREQ * 1e6)[0][0])
high_band = slice(np.argwhere(hd.freqs > FM_HIGH_FREQ * 1e6)[0][0], len(hd.freqs))
to_filter = np.outer(np.ones(len(hd.times), dtype=bool), (hd.freqs <= FM_LOW_FREQ * 1e6) | (hd.freqs > FM_HIGH_FREQ * 1e6))
In [23]:
# set up datacontainers for delay-filtered data products
dly_filt_red_avg_smooth_cal_sum_data = copy.deepcopy(red_avg_smooth_cal_sum_data)
dly_filt_red_avg_abs_cal_sum_data = copy.deepcopy(red_avg_abs_cal_sum_data)
if SAVE_DIFF_RED_AVG:
    dly_filt_red_avg_smooth_cal_diff_data = copy.deepcopy(red_avg_smooth_cal_diff_data)
    dly_filt_red_avg_abs_cal_diff_data = copy.deepcopy(red_avg_abs_cal_diff_data)

auto_bls = [bl for bl in dly_filt_red_avg_smooth_cal_sum_data if utils.split_bl(bl)[0] == utils.split_bl(bl)[1]]
gap_flag_warned = set()

new_cache_keys = []
if ALL_FLAGGED and ALL_CAL_FLAGS:
    print('All integrations flagged after the file_calibration notebook... skipping delay filtering.')
else:    
    # Load files in cache
    cache = read_filter_cache_scratch(cache_files)
    cache_keys = list(cache.keys())
    
    for bl in sorted(red_avg_flags.keys(), key=lambda bl: np.sum(red_avg_flags[bl]), reverse=True):
        # inverse variance weight using smooth_calibrated autocorrealtions (which are proprotional to std of noise)
        auto_bl1 = [abl for abl in auto_bls if abl[2] == utils.join_pol(utils.split_pol(bl[2])[0], utils.split_pol(bl[2])[0])][0]
        auto_bl2 = [abl for abl in auto_bls if abl[2] == utils.join_pol(utils.split_pol(bl[2])[1], utils.split_pol(bl[2])[1])][0]
        wgts = np.where(red_avg_flags[bl], 0, np.abs(red_avg_smooth_cal_sum_data[auto_bl1] * red_avg_smooth_cal_sum_data[auto_bl2])**-1)
        wgts /= np.nanmean(np.where(red_avg_flags[bl], np.nan, wgts))  # avoid dynamic range issues
        wgts[~np.isfinite(wgts)] = 0        

        # calculate filter properties
        bl_vec = (hd.antpos[bl[1]] - hd.antpos[bl[0]])
        bl_len = np.linalg.norm(bl_vec[:2]) / constants.c
        filter_centers, filter_half_widths = vis_clean.gen_filter_properties(ax='freq', min_dly=DLY_FILT_MIN_DLY, horizon=DLY_FILT_HORIZON,
                                                                             standoff=DLY_FILT_STANDOFF, bl_len=bl_len)
        
        for dc in [dly_filt_red_avg_smooth_cal_sum_data, dly_filt_red_avg_abs_cal_sum_data] + \
                  ([dly_filt_red_avg_smooth_cal_diff_data, dly_filt_red_avg_abs_cal_diff_data] if SAVE_DIFF_RED_AVG else []):
            if np.all(dc[bl] == 0.0) and np.all(red_avg_flags[bl]):
                # don't bother delay filtering all 0s.
                continue

            # run delay filter for each band individually
            d_mdl = np.zeros_like(dc[bl])
            for band in [low_band, high_band]:
                d_mdl[:, band], _, info = dspec.fourier_filter(hd.freqs[band], dc[bl][:, band], wgts=wgts[:, band], 
                                                               filter_centers=filter_centers, filter_half_widths=filter_half_widths,
                                                               mode='dpss_solve', ridge_alpha=0, fit_intercept=False,
                                                               eigenval_cutoff=[DLY_FILT_EIGENVAL_CUTOFF], 
                                                               suppression_factors=[DLY_FILT_EIGENVAL_CUTOFF], 
                                                               max_contiguous_edge_flags=len(hd.freqs), cache=cache)

                # Track new DPSS-filters entering the cache
                fourier_filter_key = dspec._fourier_filter_hash(
                    filter_centers=filter_centers, filter_half_widths=filter_half_widths, filter_factors=[0.], x=hd.freqs[band], w=None,
                    crit_name='eigenval_cutoff', label='dpss_operator', crit_val=tuple([DLY_FILT_EIGENVAL_CUTOFF])
                )
                if fourier_filter_key not in cache_keys:
                    new_cache_keys.append(fourier_filter_key)                    
                    
            if (bl[0] == bl[1]):
                # inpaint autos at delay-filter delay
                dc[bl] = np.where(red_avg_flags[bl], d_mdl, dc[bl])
            else:
                # for all other baselines, filter foregrounds
                dc[bl] = np.where(red_avg_flags[bl], 0, dc[bl] - d_mdl)
All integrations flagged after the file_calibration notebook... skipping delay filtering.
In [24]:
def plot_delay_spectra():
    if ALL_FLAGGED:
        print('All integrations are flagged. Nothing to plot.')
        return
    
    # loop over bands
    red_avg_flag_waterfall = np.all(list(red_avg_flags.values()), axis=0)
    unflagged_inds = np.squeeze(np.argwhere(~np.any(red_avg_flag_waterfall[~np.all(red_avg_flag_waterfall, axis=1), :], axis=0)))
    low_band = slice(unflagged_inds[0], np.max(unflagged_inds[hd.freqs[unflagged_inds] < FM_LOW_FREQ * 1e6]) + 1)
    high_band = slice(np.min(unflagged_inds[hd.freqs[unflagged_inds] > FM_HIGH_FREQ * 1e6]), unflagged_inds[-1] + 1)
    for band in [low_band, high_band]:

        display(HTML(f'<h2>Redundantly-Averaged, Smooth-Calibrated Delay Spectra: {hd.freqs[band][0] / 1e6:.2f} — {hd.freqs[band][-1] / 1e6:.2f} MHz</h2>'))
        fig, axes = plt.subplots(5, 2, figsize=(14,10), sharex=True, sharey='row', gridspec_kw={'hspace': 0, 'wspace': 0})

        mins, maxes = {}, {}
        for bl in red_avg_smooth_cal_sum_data:
            
            # figure out whether to plot this baseline
            bl_vec = (hd.antpos[bl[1]] - hd.antpos[bl[0]])
            if not ((np.abs(bl_vec[1]) < 1) and int(np.round(bl_vec[0] / 14.6)) in [0, 1, 2, 4, 8]):
                continue
            if bl[2] not in ['ee', 'nn']:
                continue            
            
            # compute filter size
            bl_len = np.linalg.norm(bl_vec[:2]) / constants.c
            _, dly_filter_half_widths = vis_clean.gen_filter_properties(ax='freq', horizon=DLY_FILT_HORIZON, standoff=DLY_FILT_STANDOFF, 
                                                                        min_dly=DLY_FILT_MIN_DLY, bl_len=bl_len)   
            
            # figure out where to put the plot
            row = {0: 0, 1: 1, 2: 2, 4: 3, 8: 4}[int(np.round(bl_vec[0] / 14.6))]
            col = int(bl[2] == 'nn')
            ax = axes[row, col]
            
            # FFT delay-filtered sums
            dfft_df, delays = vis_clean.fft_data(dly_filt_red_avg_smooth_cal_sum_data[bl][:, band], delta_bin=(hd.freqs[1] - hd.freqs[0]), window='bh')
            ax.semilogy(delays*1e9, np.mean(np.abs(dfft_df[~integration_flags, :]), axis=0), ls='--', label=f'Delay-Filtered Sum Data')

            # FFT diffs
            if SAVE_DIFF_RED_AVG:
                dfft_ddf, _ = vis_clean.fft_data(np.where(red_avg_flags[bl], 0, red_avg_smooth_cal_diff_data[bl])[:, band], 
                                                 delta_bin=(hd.freqs[1] - hd.freqs[0]), window='bh')
                ax.semilogy(delays*1e9, np.mean(np.abs(dfft_ddf[~integration_flags, :]), axis=0), ls=':', label=f'Diff Data')
    
                mins[bl] = np.median(np.abs(dfft_ddf[~integration_flags, :])) / 5
            else:
                mins[bl] = np.median(np.abs(dfft_df[~integration_flags, :])) / 5
            try:
                maxes[bl] = np.max(np.abs(dfft_ip[~integration_flags, :])) * 2
            except:
                maxes[bl] = np.max(np.abs(dfft_df[~integration_flags, :])) * 2

            ax.axvline(1e9 * dly_filter_half_widths[0], c='k', ls='--', lw=1, alpha=.5, label=f'Delay Filter Delay')
            ax.axvline(-1e9 * dly_filter_half_widths[0], c='k', ls='--', lw=1, alpha=.5)            

            if row == col == 0:
                ax.legend(loc='upper left')
            ax.set_xlim([-2250, 2250])

            vec = f'{bl[2]}-polarized\n{bl_vec[0]:.1f} m East\n{bl_vec[1]:.1f} m North'
            ax.text(.97, .9, vec, transform=ax.transAxes, va='top', ha='right', bbox=dict(facecolor='w', alpha=0.5))
            if col == 0:
                ax.set_ylabel('$|\\widetilde{V}|$')
            ax.set_xlabel('Delay (ns)')

        for ax in axes[1:, :].flatten():
            ax.set_ylim(np.min([mins[bl] for bl in mins if bl[0] != bl[1]]), 
                        np.max([maxes[bl] for bl in mins if bl[0] != bl[1]]))
        for ax in axes[0, :]:
            ax.set_ylim(np.min([mins[bl] for bl in mins if bl[0] == bl[1]]), 
                        np.max([maxes[bl] for bl in mins if bl[0] == bl[1]]))

        plt.tight_layout()
        plt.show()

Figure 3: Delay-Filtered Visibility Delay Spectra after smooth_cal and Redundant Averaging¶

This figure plots delay spectra for autocorrelations and 1, 2, 4, and 8 unit East-west baselines. The delay-spectra are computed by FFTing the visibilities with a Blackman-Harris taper and then taking the absolute value. The delay spectra of visibility sums are delay-filtered. The delay spectra of visibility diffs are generated by simply setting their flagged channels to 0. For autocorrelations, only inpainting is performed. These delay spectra are plotted for both polarizations and in both the high (above FM) and low (below FM) bands. Note that the frequency ranges over which the delay-filtering was performed might be slightly larger than bands shown here since flagged channels at band edges were excluded from the FFT.

In [25]:
plot_delay_spectra()
All integrations are flagged. Nothing to plot.

Save redundantly averaged visbilities¶

In [26]:
if SAVE_RESULTS:
    hd_out = io.HERAData(SUM_FILE)
    hd_out.read(bls=list(set([bl[0:2] for bl in red_avg_flags])), polarizations=list(red_avg_flags.pols()))
    hd_out.empty_arrays()
    hd_out.history += add_to_history
    hd_out.update(flags=red_avg_flags, nsamples=red_avg_nsamples)
    hd_out.pol_convention = pol_convention
    hd_out.vis_units = gain_scale
    if ALL_FLAGGED:
        # put back in all the flags that we had been ignoring up to this point
        hd_out.flag_array = np.ones_like(hd_out.flag_array)

    def _write(dc, outfile, *args):
        if not np.all(args): 
           return 
        hd_out.update(data=dc)
        print(f'Now writing redundantly-averaged calibrated visibilities to {outfile}')
        hd_out.write_uvh5(outfile, clobber=True, fix_autos=True)

    # write delay-filtered file(s)
    _write(red_avg_smooth_cal_sum_data, SUM_SMOOTH_CAL_RED_AVG_FILE)
    _write(red_avg_abs_cal_sum_data, SUM_ABS_CAL_RED_AVG_FILE, SAVE_ABS_CAL_RED_AVG)
    _write(dly_filt_red_avg_smooth_cal_sum_data, SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_FILE, SAVE_DLY_FILT_RED_AVG)
    _write(dly_filt_red_avg_abs_cal_sum_data, SUM_ABS_CAL_RED_AVG_DLY_FILT_FILE, SAVE_DLY_FILT_RED_AVG, SAVE_ABS_CAL_RED_AVG)
    if SAVE_DIFF_RED_AVG:
        _write(red_avg_smooth_cal_diff_data, DIFF_SMOOTH_CAL_RED_AVG_FILE, SAVE_DIFF_RED_AVG)
        _write(red_avg_abs_cal_diff_data, DIFF_ABS_CAL_RED_AVG_FILE, SAVE_ABS_CAL_RED_AVG, SAVE_DIFF_RED_AVG)
        _write(dly_filt_red_avg_smooth_cal_diff_data, DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_FILE, SAVE_DLY_FILT_RED_AVG, SAVE_DIFF_RED_AVG)    
        _write(dly_filt_red_avg_abs_cal_diff_data, DIFF_ABS_CAL_RED_AVG_DLY_FILT_FILE, SAVE_DLY_FILT_RED_AVG, SAVE_ABS_CAL_RED_AVG, SAVE_DIFF_RED_AVG)
Now writing redundantly-averaged calibrated visibilities to /lustre/aoc/projects/hera/h6c-analysis/IDR3/2459856/zen.2459856.49362.sum.smooth_calibrated.red_avg.uvh5
Fixing auto-correlations to be be real-only, after some imaginary values were detected in data_array. Largest imaginary component was 0.22454113743093607, largest imaginary/real ratio was 9.044873820140361e-09.

Save incoherent average magnitude waterfalls¶

In [27]:
for mode in ['autos', 'crosses', 'all visibiltiies']:
    avg_abs_data, avg_flags, avg_nsamples = {}, {}, {}
    for pol in ['ee', 'nn']:
        bls_to_avg = []
        if mode == 'autos' or mode == 'all visibiltiies':
            key = (hd.data_ants[0], hd.data_ants[0], pol)
            bls_to_avg += [bl for bl in red_avg_smooth_cal_sum_data if bl[0] == bl[1] and bl[2] == pol]
        if mode == 'crosses' or mode == 'all visibiltiies':
            key = (hd.data_ants[0], hd.data_ants[1], pol)
            bls_to_avg += [bl for bl in red_avg_smooth_cal_sum_data if bl[0] != bl[1] and bl[2] == pol]
        
        if len(bls_to_avg) > 0:
            weights = np.array([red_avg_nsamples[bl] * (~red_avg_flags[bl]) for bl in bls_to_avg])
            avg_flags[key] = np.all(weights == 0, axis=0)
            weights = np.array([np.where(avg_flags[key], red_avg_nsamples[bl], 
                                         red_avg_nsamples[bl] * (~red_avg_flags[bl])) for bl in bls_to_avg])
            avg_nsamples[key] = np.sum([red_avg_nsamples[bl] for bl in bls_to_avg], axis=0)
            avg_abs_data[key] = np.average([np.abs(red_avg_smooth_cal_sum_data[bl]) for bl in bls_to_avg], weights=weights, axis=0)
    
    if SAVE_RESULTS:
        hd_out = io.HERAData(SUM_FILE)
        hd_out.read(bls=set([bl[0:2] for bl in avg_abs_data]), polarizations=['ee', 'nn'])
        hd_out.empty_arrays()
        hd_out.update(data=avg_abs_data, flags=avg_flags, nsamples=avg_nsamples)
        hd_out.pol_convention = pol_convention
        hd_out.vis_units = gain_scale
        if ALL_FLAGGED:
            # put back in all the flags that we had been ignoring up to this point
            hd_out.flag_array = np.ones_like(hd_out.flag_array)        
        hd_out.history += add_to_history
        outfile = {'autos': AVG_ABS_AUTO_FILE, 'crosses': AVG_ABS_CROSS_FILE, 'all visibiltiies': AVG_ABS_ALL_FILE}[mode]
        print(f'Writing averaged absolute of {mode} to {outfile}')
        hd_out.write_uvh5(outfile, clobber=True, fix_autos=True)
Writing averaged absolute of autos to /lustre/aoc/projects/hera/h6c-analysis/IDR3/2459856/zen.2459856.49362.sum.smooth_calibrated.avg_abs_auto.uvh5
Writing averaged absolute of crosses to /lustre/aoc/projects/hera/h6c-analysis/IDR3/2459856/zen.2459856.49362.sum.smooth_calibrated.avg_abs_cross.uvh5
Writing averaged absolute of all visibiltiies to /lustre/aoc/projects/hera/h6c-analysis/IDR3/2459856/zen.2459856.49362.sum.smooth_calibrated.avg_abs_all.uvh5

Save filter cache¶

In [28]:
if SAVE_RESULTS:
    # Create cache directory if it doesn't already exist
    if not os.path.exists(filter_cache_dir):
        os.mkdir(filter_cache_dir)
        
    # For each new cache key save a new cache file in the cache directory
    for key in new_cache_keys:
        cache_file_basename = key[0] + '.filter_cache.p'
        if not os.path.exists(os.path.join(filter_cache_dir, cache_file_basename)):
            try:
                cfile = open(os.path.join(filter_cache_dir, cache_file_basename), 'wb')
                try:
                    pickle.dump({key: cache[key]}, cfile)
                finally:
                    cfile.close()
            except (OSError, IOError):
                # File could not be written to, move on
                pass

Metadata¶

In [29]:
for repo in ['pyuvdata', 'hera_cal', 'hera_qm', 'hera_filters', 'hera_notebook_templates']:
    exec(f'from {repo} import __version__')
    print(f'{repo}: {__version__}')
pyuvdata: 3.2.3.dev10+g11d3f658
hera_cal: 3.7.6.dev8+gbd9622c0
hera_qm: 2.2.1.dev4+gf6d0211
hera_filters: 0.1.7.dev1+gcbb4f93
hera_notebook_templates: 0.1.dev1186+g5d4205b
In [30]:
print(f'Finished execution in {(time.time() - tstart) / 60:.2f} minutes.')
Finished execution in 1.38 minutes.