Single File Post-Processing¶

by Josh Dillon, last updated January 9, 2024

This notebook, the final step in a pre-LST-binning analysis, applies calibration solutions and flags to both sum and diff data, producing a variety of data products. These currently may include:

  • Abs-calibrated, redundantly-averaged, delay-filtered visibility sums

  • Abs-calibrated, redundantly-averaged, delay-filtered visibility diffs

  • Smooth-calibrated, redundantly-averaged, delay-filtered visibility sums

  • Smooth-calibrated, redundantly-averaged, delay-filtered visibility diffs

  • Abs-calibrated, redundantly-averaged, inpainted visibility sums

  • Abs-calibrated, redundantly-averaged, inpainted visibility diffs

  • Smooth-calibrated, redundantly-averaged, inpainted visibility sums

  • Smooth-calibrated, redundantly-averaged, inpainted visibility diffs

  • UVFlag file indicating where inpainting was done

  • Smooth-calibrated, coherently redundantly-averaged, incoherently array-averaged visibility magnitudes

    • This is done for all visibilities, just autos, and just cross-correlations and is designed to be lightweight data product for transient searches

The inpainted files are flagged where inpainting was done, however one can use the accompanying UVFlag file to figure out where data was replaced with inpainted data (generally it's outside the FM band for all baselines not completely flagged). The inpainted files may have additional flags due to excessively large channel gaps which cause inpainting to become unreliable, even with regularization.

Some of these data products are experimental and may be removed at a later date to save compute and disk. Some may be turned off using environment variables.

Here's a set of links to skip to particular figures and tables:

• Figure 1: Redundant Averaging of a Single Baseline Group After smooth_cal¶

• Figure 2: Number of Redundantly-Averaged Samples as a Function of Baseline¶

• Figure 3: Inpainted and Delay-Filtered Visibility Delay Spectra after smooth_cal and Redundant-Averaging¶

In [1]:
import time
tstart = time.time()
In [2]:
import os
os.environ['HDF5_USE_FILE_LOCKING'] = 'FALSE'
import h5py
import hdf5plugin  # REQUIRED to have the compression plugins available
import numpy as np
import copy
import pickle
import glob
from pyuvdata import UVFlag
from hera_cal import io, utils, redcal, apply_cal, datacontainer, vis_clean
from hera_filters import dspec
from hera_qm.metrics_io import read_a_priori_ant_flags
from hera_qm.time_series_metrics import true_stretches
from scipy import constants, signal
import matplotlib
import matplotlib.pyplot as plt
from IPython.display import display, HTML
%matplotlib inline

Load data and calibration solutions¶

In [3]:
# figure out whether to save results (and which ones)
SAVE_RESULTS = os.environ.get("SAVE_RESULTS", "TRUE").upper() == "TRUE"
SAVE_DIFF_RED_AVG = os.environ.get("SAVE_DIFF_RED_AVG", "TRUE").upper() == "TRUE"
SAVE_ABS_CAL_RED_AVG = os.environ.get("SAVE_ABS_CAL_RED_AVG", "TRUE").upper() == "TRUE"
SAVE_DLY_FILT_RED_AVG = os.environ.get("SAVE_DLY_FILT_RED_AVG", "TRUE").upper() == "TRUE"
SAVE_INPAINT_RED_AVG = os.environ.get("SAVE_INPAINT_RED_AVG", "TRUE").upper() == "TRUE"
# TODO: in theory, if some of these are turned off, some memory and/or processing could be saved 
# by doing only a subset of what this notebook does. That's left for fuure work.

add_to_history = 'Produced by file_prostprocessing notebook with the following environment:\n' + '=' * 65 + '\n' + os.popen('conda env export').read() + '=' * 65
for setting in ['SAVE_RESULTS', 'SAVE_DIFF_RED_AVG', 'SAVE_ABS_CAL_RED_AVG', 'SAVE_DLY_FILT_RED_AVG', 'SAVE_INPAINT_RED_AVG']:
    print(f'{setting} = {eval(setting)}')
SAVE_RESULTS = True
SAVE_DIFF_RED_AVG = False
SAVE_ABS_CAL_RED_AVG = True
SAVE_DLY_FILT_RED_AVG = True
SAVE_INPAINT_RED_AVG = True
In [4]:
# get input data file names
SUM_FILE = os.environ.get("SUM_FILE", None)
# SUM_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459861/zen.2459861.46123.sum.uvh5'
SUM_SUFFIX = os.environ.get("SUM_SUFFIX", 'sum.uvh5')
DIFF_SUFFIX = os.environ.get("DIFF_SUFFIX", 'diff.uvh5')

# get input calibration files and flags
ABS_CAL_SUFFIX = os.environ.get("ABS_CAL_SUFFIX", 'sum.omni.calfits')
SMOOTH_CAL_SUFFIX = os.environ.get("CAL_SUFFIX", 'sum.smooth.calfits')
APOSTERIORI_YAML_SUFFIX = os.environ.get("APOSTERIORI_YAML_SUFFIX", '_aposteriori_flags.yaml')
aposteriori_yaml_file = os.path.join(os.path.dirname(SUM_FILE), SUM_FILE.split('.')[-4] + APOSTERIORI_YAML_SUFFIX)

# Get filter cache name
FILTER_CACHE = os.environ.get("FILTER_CACHE", "filter_cache")
filter_cache_dir = os.path.join(os.path.dirname(SUM_FILE), FILTER_CACHE)

# output abs-calibrated, redundantly-averaged, delay-filtered files
SUM_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX = os.environ.get("SUM_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX", 'sum.abs_calibrated.red_avg.dly_filt.uvh5')
DIFF_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX = os.environ.get("DIFF_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX", 'diff.abs_calibrated.red_avg.dly_filt.uvh5')

# output smooth-calibrated, redundantly-averaged, delay-filtered files
SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX = os.environ.get("SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX", 'sum.smooth_calibrated.red_avg.dly_filt.uvh5')
DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX = os.environ.get("DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX", 'diff.smooth_calibrated.red_avg.dly_filt.uvh5')

# output abs-calibrated, redundantly-averaged, inpainted files
SUM_ABS_CAL_RED_AVG_INPAINT_SUFFIX = os.environ.get("SUM_ABS_CAL_RED_AVG_INPAINT_SUFFIX", 'sum.abs_calibrated.red_avg.inpaint.uvh5')
DIFF_ABS_CAL_RED_AVG_INPAINT_SUFFIX = os.environ.get("DIFF_ABS_CAL_RED_AVG_INPAINT_SUFFIX", 'diff.abs_calibrated.red_avg.inpaint.uvh5')

# output smooth-calibrated, redundantly-averaged, inpainted files
SUM_SMOOTH_CAL_RED_AVG_INPAINT_SUFFIX = os.environ.get("SUM_SMOOTH_CAL_RED_AVG_INPAINT_SUFFIX", 'sum.smooth_calibrated.red_avg.inpaint.uvh5')
DIFF_SMOOTH_CAL_RED_AVG_INPAINT_SUFFIX = os.environ.get("DIFF_SMOOTH_CAL_RED_AVG_INPAINT_SUFFIX", 'diff.smooth_calibrated.red_avg.inpaint.uvh5')

# output UVFlag files for where inpainting happened
WHERE_INPAINTED_SUFFIX = os.environ.get("WHERE_INPAINTED_SUFFIX", 'where_inpainted.h5')

# output smooth-calibrated, averaged absolute value of visibilities
AVG_ABS_ALL_SUFFIX = os.environ.get("AVG_ABS_SUFFIX", 'sum.smooth_calibrated.avg_abs_all.uvh5')
AVG_ABS_AUTO_SUFFIX = os.environ.get("AVG_ABS_AUTO_SUFFIX", 'sum.smooth_calibrated.avg_abs_auto.uvh5')
AVG_ABS_CROSS_SUFFIX = os.environ.get("AVG_ABS_CROSS_SUFFIX", 'sum.smooth_calibrated.avg_abs_cross.uvh5')

for suffix in ['DIFF_SUFFIX', 'ABS_CAL_SUFFIX', 'SMOOTH_CAL_SUFFIX', 
               'SUM_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX', 'DIFF_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX', 
               'SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX', 'DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX', 
               'SUM_ABS_CAL_RED_AVG_INPAINT_SUFFIX', 'DIFF_ABS_CAL_RED_AVG_INPAINT_SUFFIX',
               'SUM_SMOOTH_CAL_RED_AVG_INPAINT_SUFFIX', 'DIFF_SMOOTH_CAL_RED_AVG_INPAINT_SUFFIX',
               'WHERE_INPAINTED_SUFFIX', 'AVG_ABS_ALL_SUFFIX', 'AVG_ABS_AUTO_SUFFIX', 'AVG_ABS_CROSS_SUFFIX']:
    file_var = suffix.replace('SUFFIX', 'FILE')
    exec(f'{file_var} = SUM_FILE.replace(SUM_SUFFIX, {suffix})')
    print(f"{file_var} = '{eval(file_var)}'")
print(f'aposteriori_yaml_file = {aposteriori_yaml_file}')
print(f'filter_cache = {filter_cache_dir}')
DIFF_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.diff.uvh5'
ABS_CAL_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.sum.omni.calfits'
SMOOTH_CAL_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.sum.smooth.calfits'
SUM_ABS_CAL_RED_AVG_DLY_FILT_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.sum.abs_calibrated.red_avg.dly_filt.uvh5'
DIFF_ABS_CAL_RED_AVG_DLY_FILT_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.diff.abs_calibrated.red_avg.dly_filt.uvh5'
SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.sum.smooth_calibrated.red_avg.dly_filt.uvh5'
DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.diff.smooth_calibrated.red_avg.dly_filt.uvh5'
SUM_ABS_CAL_RED_AVG_INPAINT_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.sum.abs_calibrated.red_avg.inpaint.uvh5'
DIFF_ABS_CAL_RED_AVG_INPAINT_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.diff.abs_calibrated.red_avg.inpaint.uvh5'
SUM_SMOOTH_CAL_RED_AVG_INPAINT_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.sum.smooth_calibrated.red_avg.inpaint.uvh5'
DIFF_SMOOTH_CAL_RED_AVG_INPAINT_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.diff.smooth_calibrated.red_avg.inpaint.uvh5'
WHERE_INPAINTED_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.where_inpainted.h5'
AVG_ABS_ALL_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.sum.smooth_calibrated.avg_abs_all.uvh5'
AVG_ABS_AUTO_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.sum.smooth_calibrated.avg_abs_auto.uvh5'
AVG_ABS_CROSS_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.sum.smooth_calibrated.avg_abs_cross.uvh5'
aposteriori_yaml_file = /lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/2459866_aposteriori_flags.yaml
filter_cache = /lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/filter_cache
In [5]:
# parse delay filtering settings
DLY_FILT_HORIZON = float(os.environ.get("DLY_FILT_HORIZON", 1.0))
DLY_FILT_STANDOFF = float(os.environ.get("DLY_FILT_STANDOFF", 0.0)) # in ns
DLY_FILT_MIN_DLY = float(os.environ.get("DLY_FILT_MIN_DLY", 150.0)) # in ns
DLY_FILT_EIGENVAL_CUTOFF = float(os.environ.get("DLY_FILT_EIGENVAL_CUTOFF", 1e-12))
INPAINT_MIN_DLY = float(os.environ.get("INPAINT_MIN_DLY", 500.0)) # in ns
INPAINT_REGULARIZATION = float(os.environ.get("INPAINT_REGULARIZATION", 1e-5)) # reasonable values are between 1e-2 and 1e-5
INPAINT_MAX_GAP_FACTOR = float(os.environ.get("INPAINT_MAX_GAP_FACTOR", 2)) # maximum allowed gap before flaggint the whole integration is this * 1/dly
INPAINT_MAX_CONVOLVED_FLAG_FRAC = float(os.environ.get("INPAINT_MAX_CONVOLVED_FLAG_FRAC", 0.667)) # maximum allowed flag density for the purpose of identifying flag gaps
FM_LOW_FREQ = float(os.environ.get("FM_LOW_FREQ", 87.5)) # in MHz
FM_HIGH_FREQ = float(os.environ.get("FM_HIGH_FREQ", 108.0)) # in MHz
for setting in ['DLY_FILT_HORIZON', 'DLY_FILT_STANDOFF', 'DLY_FILT_MIN_DLY', 'DLY_FILT_EIGENVAL_CUTOFF', 'INPAINT_MIN_DLY', 
                'INPAINT_REGULARIZATION', 'INPAINT_MAX_GAP_FACTOR', 'INPAINT_MAX_CONVOLVED_FLAG_FRAC', 'FM_LOW_FREQ', 'FM_HIGH_FREQ']:
        print(f'{setting} = {eval(setting)}')
DLY_FILT_HORIZON = 1.0
DLY_FILT_STANDOFF = 0.0
DLY_FILT_MIN_DLY = 150.0
DLY_FILT_EIGENVAL_CUTOFF = 1e-12
INPAINT_MIN_DLY = 500.0
INPAINT_REGULARIZATION = 1e-05
INPAINT_MAX_GAP_FACTOR = 2.0
INPAINT_MAX_CONVOLVED_FLAG_FRAC = 0.667
FM_LOW_FREQ = 87.5
FM_HIGH_FREQ = 108.0
In [6]:
# Find all cached matrices within the filter cache
cache_files = glob.glob(os.path.join(filter_cache_dir, "*.filter_cache.p"))
In [7]:
# Load sum and diff data
hd = io.HERADataFastReader(SUM_FILE)
data, flags, nsamples = hd.read(pols=['ee', 'nn'])

hd_diff = io.HERADataFastReader(DIFF_FILE)
diff_data, diff_flags, diff_nsamples = hd_diff.read(pols=['ee', 'nn'])
In [8]:
# pick all reds that might be in the data, using the same set across polarizations for easier output
reds = redcal.get_reds(hd.data_antpos, pols=['ee', 'nn'], include_autos=True)
ex_ants = set(read_a_priori_ant_flags(aposteriori_yaml_file))
possibly_unflagged_bls = [bl for bl in data if utils.split_bl(bl)[0] not in ex_ants and utils.split_bl(bl)[1] not in ex_ants]
possibly_unflagged_antpairs = set([ap for bl in possibly_unflagged_bls for ap in [bl[:2], bl[-2::-1]]])
reds = [red for red in reds if np.any([bl[0:2] in possibly_unflagged_antpairs for bl in red])]
In [9]:
# Load calibration solutions and gain flags
hc_smooth = io.HERACal(SMOOTH_CAL_FILE)
smooth_gains, cal_flags, _, _ = hc_smooth.read()

hc_abs = io.HERACal(ABS_CAL_FILE)
abs_gains, abs_cal_flags, _, _ = hc_abs.read()
In [10]:
# handle the the case where the smooth_cal flags are all True, trying to maintain consistent data shapes
ALL_FLAGGED = False
if np.all([flag for flag in cal_flags.values()]):
    print('This file is entirely flagged. Proceeding with averaging and filtering using earlier flags, '
          'but the output data products will still be fully flagged.')
    ALL_FLAGGED = True
    
    # Likely the file was fully flagged for broadband RFI, so instead use the original flags from file_calibration
    cal_flags = abs_cal_flags
    # And if that didn't work, just make the flags all False for now (though ex_ants will still be exluded via reds)
    if np.all([flag for flag in cal_flags.values()]):
        cal_flags = {ant: np.zeros_like(cal_flags[ant]) for ant in cal_flags}
In [11]:
def red_average(reds, data, nsamples, gains, flags={}, cal_flags={}):    
    # Redundantly average data
    wgts = datacontainer.DataContainer({bl: nsamples[bl] * ~(flags.get(bl, False) | cal_flags.get(utils.split_bl(bl)[0], False) \
                                                             | cal_flags.get(utils.split_bl(bl)[1], False)) for bl in nsamples})
    sol = redcal.RedSol(reds, gains=gains)
    sol.update_vis_from_data(data, wgts=wgts)
    
    # Figure out redundantly averaged flags and nsamples
    red_avg_flags = {}
    red_avg_nsamples = {}
    for red in reds:
        if red[0] in sol.vis:
            red_avg_flags[red[0]] = np.all([wgts[bl] == 0 for bl in red], axis=0) | ~np.isfinite(sol.vis[red[0]])
            red_avg_nsamples[red[0]] = np.sum([nsamples[bl] for bl in red if not np.all(wgts[bl] == 0)], axis=0)
        else:
            # empty placeholders to make sure every file has the same shape for the whole day
            sol.vis[red[0]] = np.zeros_like(next(iter(data.values())))
            red_avg_flags[red[0]] = np.ones_like(next(iter(flags.values())))
            red_avg_nsamples[red[0]] = np.zeros_like(next(iter(nsamples.values())))
    sol.make_sol_finite()
    
    # Build output RedDataContainers 
    red_avg_data = datacontainer.RedDataContainer(sol.vis, reds)
    red_avg_flags = datacontainer.RedDataContainer(red_avg_flags, reds)
    red_avg_nsamples = datacontainer.RedDataContainer(red_avg_nsamples, reds)
    return red_avg_data, red_avg_flags, red_avg_nsamples
In [12]:
# perform redundant averaging
red_avg_smooth_cal_sum_data, red_avg_flags, red_avg_nsamples = red_average(reds, data, nsamples, smooth_gains, flags=flags, cal_flags=cal_flags)
red_avg_smooth_cal_diff_data, _, _ = red_average(reds, diff_data, diff_nsamples, smooth_gains, flags=diff_flags, cal_flags=cal_flags)
red_avg_abs_cal_sum_data, _, _ = red_average(reds, data, nsamples, abs_gains, flags=flags, cal_flags=cal_flags)
red_avg_abs_cal_diff_data, _, _ = red_average(reds, diff_data, nsamples, abs_gains, flags=flags, cal_flags=cal_flags)
In [13]:
if not ALL_FLAGGED:
    integration_flags = np.all([red_avg_flags[bl] for bl in red_avg_flags], axis=(0, 2))
In [14]:
def plot_red_avg_vis():
    if ALL_FLAGGED:
        print('All integrations are flagged. Nothing to plot.')
        return
    
    fig, axes = plt.subplots(2, 2, figsize=(14, 6), dpi=100, sharex='col', sharey='row', gridspec_kw={'hspace': 0, 'wspace': 0})
    for i, pol in enumerate(['ee', 'nn']):

        reds_here = redcal.filter_reds(reds, pols=[pol], antpos=hd.antpos, min_bl_cut=1)
        sol = redcal.RedSol(reds_here, gains=smooth_gains)
        red = sorted(reds_here, key=lambda red: np.median(red_avg_nsamples.get(red[0], 0)), reverse=True)[0]
        
        if np.any([bl[2] == pol for bl in red_avg_flags]):
            for tind in range(red_avg_flags[red[0]].shape[0]):
                if not np.all(red_avg_flags[red[0]][tind]):
                    break

            calibrated = {bl: np.where(flags[bl] | cal_flags[utils.split_bl(bl)[0]] | cal_flags[utils.split_bl(bl)[1]], 
                                       np.nan, sol.calibrate_bl(bl, data[bl])) for bl in red if bl in data}
            for bl in red:
                axes[0, i].plot(hd.freqs/1e6, np.angle(calibrated[bl][tind]), alpha=.5, lw=.5)
                axes[1, i].semilogy(hd.freqs/1e6, np.abs(calibrated[bl][tind]), alpha=.5, lw=.5)

            to_plot = np.where(red_avg_flags[red[0]][tind], np.nan, red_avg_smooth_cal_sum_data[red[0]][tind])
            axes[0, i].plot(hd.freqs / 1e6, np.angle(to_plot), lw=1, c='k')
            axes[1, i].semilogy(hd.freqs / 1e6, np.abs(to_plot), lw=1, c='k', label=f'Baseline Group:\n{red[0]}')
            axes[1, i].set_xlabel('Frequency (MHz)')
            axes[1, i].legend(loc='upper right')
    axes[0, 0].set_ylabel('Visibility Phase (radians)')
    axes[1, 0].set_ylabel('Visibility Amplitude (Jy)')
    plt.tight_layout()
In [15]:
def plot_red_avg_nsamples():
    if ALL_FLAGGED:
        print('All integrations are flagged. Nothing to plot.')
        return
    
    fig, axes = plt.subplots(2, 1, figsize=(14,7), dpi=100, sharex=True, gridspec_kw={'hspace': 0})
    max_nsamples = np.max([np.median(ns) for ns in red_avg_nsamples.values()])
    for ax, pol in zip(axes, ['ee', 'nn']):
        if np.any([bl[2] == pol for bl in red_avg_nsamples]):
            blvecs = np.array([hd.antpos[red[0][1]] - hd.antpos[red[0][0]] for red in reds if red[0] in red_avg_nsamples and red[0][2] == pol])
            med_nsamples = np.array([np.median(red_avg_nsamples[red[0]][~integration_flags, :]) 
                                     for red in reds if red[0] in red_avg_nsamples and red[0][2] == pol])
            sca = ax.scatter(blvecs[:, 0], blvecs[:, 1], s=0)
            sca = ax.scatter(blvecs[:, 0], blvecs[:, 1], s=(100 * (600 / np.diff(ax.get_xlim())[0])**2), ec='k', linewidths=.5,
                             c=med_nsamples, norm=matplotlib.colors.LogNorm(vmin=1, vmax=max_nsamples), cmap='turbo')
            ax.axis('equal')
            ax.set_xlabel('EW Baseline Vector (m)')
            ax.set_ylabel('NS Baseline Vector (m)')
        ax.text(.98, .94, f'{pol}-polarized', transform=ax.transAxes, va='top', ha='right', bbox=dict(facecolor='w', alpha=0.5))           

    plt.tight_layout()
    fig.colorbar(sca, ax=axes, pad=.02, label='Number of Samples')

Figure 1: Redundant Averaging of a Single Baseline Group After smooth_cal¶

The results of calibration and redundant averaging the baseline group with the highest redundancy in each polarization. Visibilities with flagged antennas are excluded. The black line is the redundantly-averaged visibility. Each thin colored line is a different baseline group. Phases are shown in the top row, amplitudes in the bottom, ee-polarized visibilities in the left column, and nn-polarized visibilities in the right.

In [16]:
plot_red_avg_vis()

Figure 2: Number of Redundantly-Averaged Samples as a Function of Baseline¶

The number of visibilities averaged together in each baseline group. Note that the split of the HERA core produces more highly-sampled intra-sector baselines that are interspersed with the less-highly-sampled inter-sector baselines off the main grid.

In [17]:
plot_red_avg_nsamples()
In [18]:
# save memory
ALL_CAL_FLAGS = np.all([np.all(~cal_flags[ant]) for ant in cal_flags])
del hd, data, flags, nsamples, hd_diff, diff_data, diff_flags, diff_nsamples
del hc_smooth, smooth_gains, cal_flags, hc_abs, abs_gains, abs_cal_flags
hd = io.HERADataFastReader(SUM_FILE)

Perform Delay Filter¶

In [19]:
def read_filter_cache_scratch(cache_files):
    """
    Load files from a cache specified by cache_dir.
    """
    # Load up the cache file with the most keys (precomputed filter matrices).
    cache = {}
    # loop through cache files, load them.
    # If there are new keys, add them to internal cache.
    # If not, delete the reference matrices from memory.
    for cache_file in cache_files:        
        try:
            cfile = open(cache_file, 'rb')
            try:
                cache_t = pickle.load(cfile)
            finally:
                cfile.close()
        except:
            continue
        for key in cache_t:
            cache[key] = cache_t[key]
                
    return cache
In [20]:
# Peform delay filter separately above and below FM
low_band = slice(0, np.argwhere(hd.freqs > FM_LOW_FREQ * 1e6)[0][0])
high_band = slice(np.argwhere(hd.freqs > FM_HIGH_FREQ * 1e6)[0][0], len(hd.freqs))
to_filter = np.outer(np.ones(len(hd.times), dtype=bool), (hd.freqs <= FM_LOW_FREQ * 1e6) | (hd.freqs > FM_HIGH_FREQ * 1e6))
In [21]:
# set up datacontainers for delay-filtered data products
dly_filt_red_avg_smooth_cal_sum_data = copy.deepcopy(red_avg_smooth_cal_sum_data)
dly_filt_red_avg_smooth_cal_diff_data = copy.deepcopy(red_avg_smooth_cal_diff_data)
dly_filt_red_avg_abs_cal_sum_data = copy.deepcopy(red_avg_abs_cal_sum_data)
dly_filt_red_avg_abs_cal_diff_data = copy.deepcopy(red_avg_abs_cal_diff_data)

# set up datacontainers for inpainted data products
inpainted_red_avg_smooth_cal_sum_data = copy.deepcopy(red_avg_smooth_cal_sum_data)
inpainted_red_avg_smooth_cal_diff_data = copy.deepcopy(red_avg_smooth_cal_diff_data)
inpainted_red_avg_abs_cal_sum_data = copy.deepcopy(red_avg_abs_cal_sum_data)
inpainted_red_avg_abs_cal_diff_data = copy.deepcopy(red_avg_abs_cal_diff_data)
where_inpainted = copy.deepcopy(red_avg_flags)
red_avg_inpainted_flags = copy.deepcopy(red_avg_flags)

auto_bls = [bl for bl in dly_filt_red_avg_smooth_cal_sum_data if bl[0] == bl[1]]
gap_flag_warned = set()

new_cache_keys = []
if ALL_FLAGGED and ALL_CAL_FLAGS:
    print('All integrations flagged after the file_calibration notebook... skipping delay filtering.')
    for bl in where_inpainted:
        where_inpainted[bl][:, :] = False    
else:    
    # Load files in cache
    cache = read_filter_cache_scratch(cache_files)
    cache_keys = list(cache.keys())
    
    for bl in sorted(red_avg_flags.keys(), key=lambda bl: np.sum(red_avg_flags[bl]), reverse=True):
        # inverse variance weight using smooth_calibrated autocorrealtions (which are proprotional to std of noise)
        auto_bl = [abl for abl in auto_bls if abl[2] == bl[2]][0]
        wgts = np.where(red_avg_flags[bl], 0, red_avg_smooth_cal_sum_data[auto_bl]**-2)
        wgts /= np.nanmean(np.where(red_avg_flags[bl], np.nan, wgts))  # avoid dynamic range issues
        wgts[~np.isfinite(wgts)] = 0        

        for mode in ['dly_filt', 'inpaint']:
            # switch for inpainting or delay filtering
            if mode == 'dly_filt':
                min_dly = DLY_FILT_MIN_DLY
                ridge_alpha = 0
                dcs = [dly_filt_red_avg_smooth_cal_sum_data, dly_filt_red_avg_smooth_cal_diff_data, 
                       dly_filt_red_avg_abs_cal_sum_data, dly_filt_red_avg_abs_cal_diff_data]
            else:
                min_dly = INPAINT_MIN_DLY
                ridge_alpha = INPAINT_REGULARIZATION                
                dcs = [inpainted_red_avg_smooth_cal_sum_data, inpainted_red_avg_smooth_cal_diff_data, 
                       inpainted_red_avg_abs_cal_sum_data, inpainted_red_avg_abs_cal_diff_data]
            
            # calculate filter properties
            bl_vec = (hd.antpos[bl[1]] - hd.antpos[bl[0]])
            bl_len = np.linalg.norm(bl_vec[:2]) / constants.c
            filter_centers, filter_half_widths = vis_clean.gen_filter_properties(ax='freq', min_dly=min_dly, horizon=DLY_FILT_HORIZON,
                                                                                 standoff=DLY_FILT_STANDOFF, bl_len=bl_len,)
            
            for dc in dcs:
                if np.all(dc[bl] == 0.0) and np.all(red_avg_flags[bl]):
                    where_inpainted[bl] = np.zeros_like(red_avg_flags[bl])
                    # don't bother delay filtering all 0s.
                    continue
 
                # run delay filter for each band individually
                d_mdl = np.zeros_like(dc[bl])
                inpainting_succeeded = np.zeros_like(red_avg_flags[bl])
                for band in [low_band, high_band]:
                    d_mdl[:, band], _, info = dspec.fourier_filter(hd.freqs[band], dc[bl][:, band], wgts=wgts[:, band], filter_centers=filter_centers, 
                                                                   filter_half_widths=filter_half_widths, mode='dpss_solve', 
                                                                   ridge_alpha=ridge_alpha, fit_intercept=(mode == 'inpaint'),
                                                                   eigenval_cutoff=[DLY_FILT_EIGENVAL_CUTOFF], suppression_factors=[DLY_FILT_EIGENVAL_CUTOFF], 
                                                                   max_contiguous_edge_flags=len(hd.freqs), cache=cache)

                    # track the flagging on integrations with too-large gaps
                    if mode == 'inpaint':        
                        # also conolve flags with a triangle filter to mark as un-inpaintable any stretches effectively "average" to too large a gap        
                        max_allowed_gap_size = INPAINT_MAX_GAP_FACTOR * (info['filter_params']['axis_1']['filter_half_widths'][0])**-1 / np.median(np.diff(hd.freqs))
                        convolution_kernel = np.append(np.linspace(0, 1, int(max_allowed_gap_size) - 1, endpoint=False), np.linspace(1, 0, int(max_allowed_gap_size)))
                        convolution_kernel /= np.sum(convolution_kernel)
                        convolved_flags = signal.convolve2d(red_avg_flags[bl][:, band], convolution_kernel[np.newaxis, :], mode='same') > INPAINT_MAX_CONVOLVED_FLAG_FRAC
                        # loop over times
                        for i in range(len(hd.times)):
                            flagged_stretches = true_stretches((red_avg_flags[bl][i, band] | convolved_flags[i]))
                            longest_gap = np.max([ts.stop - ts.start for ts in flagged_stretches])
                            if longest_gap > max_allowed_gap_size:
                                red_avg_inpainted_flags[bl][i, band] = True
                                if (i, band.start, band.stop) not in gap_flag_warned:
                                    print(f'Flagging integration {i} (JD = {hd.times[i]}) for between channels {band.start} and {band.stop} because '
                                          f'it has an effective {longest_gap}-channel flag gap (which exceeds {int(max_allowed_gap_size)}, the maximum allowed).')
                                    gap_flag_warned.add((i, band.start, band.stop))
                            else:
                                inpainting_succeeded[i, band] = True

                    # Track new DPSS-filters entering the cache
                    fourier_filter_key = dspec._fourier_filter_hash(
                        filter_centers=filter_centers, filter_half_widths=filter_half_widths, filter_factors=[0.], x=hd.freqs[band], w=None,
                        crit_name='eigenval_cutoff', label='dpss_operator', crit_val=tuple([DLY_FILT_EIGENVAL_CUTOFF])
                    )
                    if fourier_filter_key not in cache_keys:
                        new_cache_keys.append(fourier_filter_key)                    
                        
                if (mode == 'inpaint'):
                    where_inpainted[bl] = (red_avg_inpainted_flags[bl] & to_filter & inpainting_succeeded & (not ALL_FLAGGED))
                    dc[bl] = np.where(red_avg_inpainted_flags[bl] & inpainting_succeeded, d_mdl, dc[bl])
                elif (bl[0] == bl[1]):
                    # inpaint autos at delay-filter delay
                    dc[bl] = np.where(red_avg_flags[bl], d_mdl, dc[bl])
                else:
                    # for all other baselines, filter foregrounds
                    dc[bl] = np.where(red_avg_flags[bl], 0, dc[bl] - d_mdl)
Mean of empty slice
invalid value encountered in divide
In [22]:
def plot_delay_spectra():
    if ALL_FLAGGED:
        print('All integrations are flagged. Nothing to plot.')
        return
    
    # loop over bands
    red_avg_flag_waterfall = np.all(list(red_avg_flags.values()), axis=0)
    unflagged_inds = np.squeeze(np.argwhere(~np.any(red_avg_flag_waterfall[~np.all(red_avg_flag_waterfall, axis=1), :], axis=0)))
    low_band = slice(unflagged_inds[0], np.max(unflagged_inds[hd.freqs[unflagged_inds] < FM_LOW_FREQ * 1e6]) + 1)
    high_band = slice(np.min(unflagged_inds[hd.freqs[unflagged_inds] > FM_HIGH_FREQ * 1e6]), unflagged_inds[-1] + 1)
    for band in [low_band, high_band]:

        display(HTML(f'<h2>Redundantly-Averaged, Smooth-Calibrated Delay Spectra: {hd.freqs[band][0] / 1e6:.2f} — {hd.freqs[band][-1] / 1e6:.2f} MHz</h2>'))
        fig, axes = plt.subplots(5, 2, figsize=(14,10), sharex=True, sharey='row', gridspec_kw={'hspace': 0, 'wspace': 0})

        mins, maxes = {}, {}
        for bl in red_avg_smooth_cal_sum_data:
            
            # figure out whether to plot this baseline
            bl_vec = (hd.antpos[bl[1]] - hd.antpos[bl[0]])
            if not ((np.abs(bl_vec[1]) < 1) and int(np.round(bl_vec[0] / 14.6)) in [0, 1, 2, 4, 8]):
                continue
            
            # compute filter size
            bl_len = np.linalg.norm(bl_vec[:2]) / constants.c
            _, dly_filter_half_widths = vis_clean.gen_filter_properties(ax='freq', horizon=DLY_FILT_HORIZON, standoff=DLY_FILT_STANDOFF, 
                                                                        min_dly=DLY_FILT_MIN_DLY, bl_len=bl_len)   
            _, inpaint_filter_half_widths = vis_clean.gen_filter_properties(ax='freq', horizon=DLY_FILT_HORIZON, standoff=DLY_FILT_STANDOFF, 
                                                                            min_dly=INPAINT_MIN_DLY, bl_len=bl_len)   
            
            # figure out where to put the plot
            row = {0: 0, 1: 1, 2: 2, 4: 3, 8: 4}[int(np.round(bl_vec[0] / 14.6))]
            col = int(bl[2] == 'nn')
            ax = axes[row, col]

            # FFT inpainted sums
            ip_integration_flags = integration_flags | np.all(red_avg_inpainted_flags[bl][:, band], axis=1)
            if not np.all(ip_integration_flags):
                dfft_ip, delays = vis_clean.fft_data(inpainted_red_avg_smooth_cal_sum_data[bl][:, band], delta_bin=(hd.freqs[1] - hd.freqs[0]), window='bh')
                ax.semilogy(delays*1e9, np.mean(np.abs(dfft_ip[~ip_integration_flags, :]), axis=0), label='Inpainted Sum Data')
            else:
                ax.plot([], []) # advances the color cycle
            
            # FFT delay-filtered sums
            dfft_df, delays = vis_clean.fft_data(dly_filt_red_avg_smooth_cal_sum_data[bl][:, band], delta_bin=(hd.freqs[1] - hd.freqs[0]), window='bh')
            ax.semilogy(delays*1e9, np.mean(np.abs(dfft_df[~integration_flags, :]), axis=0), ls='--', label=f'Delay-Filtered Sum Data')

            # FFT diffs
            dfft_ddf, _ = vis_clean.fft_data(np.where(red_avg_flags[bl], 0, red_avg_smooth_cal_diff_data[bl])[:, band], 
                                             delta_bin=(hd.freqs[1] - hd.freqs[0]), window='bh')
            ax.semilogy(delays*1e9, np.mean(np.abs(dfft_ddf[~integration_flags, :]), axis=0), ls=':', label=f'Diff Data')

            mins[bl] = np.median(np.abs(dfft_ddf[~integration_flags, :])) / 5
            try:
                maxes[bl] = np.max(np.abs(dfft_ip[~integration_flags, :])) * 2
            except:
                maxes[bl] = np.max(np.abs(dfft_df[~integration_flags, :])) * 2

            ax.axvline(1e9 * dly_filter_half_widths[0], c='k', ls='--', lw=1, alpha=.5, label=f'Delay Filter Delay')
            ax.axvline(-1e9 * dly_filter_half_widths[0], c='k', ls='--', lw=1, alpha=.5)
            ax.axvline(1e9 * inpaint_filter_half_widths[0], c='k', ls=':', lw=1, alpha=.5, label=f'Inpainting Delay')
            ax.axvline(-1e9 * inpaint_filter_half_widths[0], c='k', ls=':', lw=1, alpha=.5)
            

            if row == col == 0:
                ax.legend(loc='upper left')
            ax.set_xlim([-2250, 2250])

            vec = f'{bl[2]}-polarized\n{bl_vec[0]:.1f} m East\n{bl_vec[1]:.1f} m North'
            ax.text(.97, .9, vec, transform=ax.transAxes, va='top', ha='right', bbox=dict(facecolor='w', alpha=0.5))
            if col == 0:
                ax.set_ylabel('$|\\widetilde{V}|$')
            ax.set_xlabel('Delay (ns)')

        for ax in axes[1:, :].flatten():
            ax.set_ylim(np.min([mins[bl] for bl in mins if bl[0] != bl[1]]), 
                        np.max([maxes[bl] for bl in mins if bl[0] != bl[1]]))
        for ax in axes[0, :]:
            ax.set_ylim(np.min([mins[bl] for bl in mins if bl[0] == bl[1]]), 
                        np.max([maxes[bl] for bl in mins if bl[0] == bl[1]]))

        plt.tight_layout()
        plt.show()

Figure 3: Inpainted and Delay-Filtered Visibility Delay Spectra after smooth_cal and Redundant Averaging¶

This figure plots delay spectra for autocorrelations and 1, 2, 4, and 8 unit East-west baselines. The delay-spectra are computed by FFTing the visibilities with a Blackman-Harris taper and then taking the absolute value. The delay spectra of visibility sums are plotted both inpainted and delay-filtered. The delay spectra of visibility diffs are generated by simply setting their flagged channels to 0. For autocorrelations, only in-painting is performed, so the "delay-filtered" and inpainted delay spectra are the same. These delay spectra are plotted for both polarizations and in both the high (above FM) and low (below FM) bands. Note that the frequency ranges over which the delay-filtering was performed might be slightly larger than bands shown here since flagged channels at band edges were excluded from the FFT.

In [23]:
plot_delay_spectra()

Redundantly-Averaged, Smooth-Calibrated Delay Spectra: 46.92 — 87.33 MHz

Redundantly-Averaged, Smooth-Calibrated Delay Spectra: 108.08 — 234.30 MHz

Save redundantly averaged visbilities¶

In [24]:
if SAVE_RESULTS:
    hd_out = io.HERAData(SUM_FILE)
    hd_out.read(bls=red_avg_flags.keys())
    hd_out.empty_arrays()
    hd_out.history += add_to_history
    hd_out.update(flags=red_avg_flags, nsamples=red_avg_nsamples)
    if ALL_FLAGGED:
        # put back in all the flags that we had been ignoring up to this point
        hd_out.flag_array = np.ones_like(hd_out.flag_array)

    def _write(dc, outfile, *args):
        if not np.all(args): 
           return 
        hd_out.update(data=dc)
        print(f'Now writing redundantly-averaged calibrated visibilities to {outfile}')
        hd_out.write_uvh5(outfile, clobber=True)

    # write delay-filtered file(s)
    _write(dly_filt_red_avg_smooth_cal_sum_data, SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_FILE, SAVE_DLY_FILT_RED_AVG)
    _write(dly_filt_red_avg_smooth_cal_diff_data, DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_FILE, SAVE_DLY_FILT_RED_AVG, SAVE_DIFF_RED_AVG)    
    _write(dly_filt_red_avg_abs_cal_sum_data, SUM_ABS_CAL_RED_AVG_DLY_FILT_FILE, SAVE_DLY_FILT_RED_AVG, SAVE_ABS_CAL_RED_AVG)
    _write(dly_filt_red_avg_abs_cal_diff_data, DIFF_ABS_CAL_RED_AVG_DLY_FILT_FILE, SAVE_DLY_FILT_RED_AVG, SAVE_ABS_CAL_RED_AVG, SAVE_DIFF_RED_AVG)

    # write inpainted file(s)
    if not ALL_FLAGGED:
        hd_out.update(flags=red_avg_inpainted_flags)
    _write(inpainted_red_avg_smooth_cal_sum_data, SUM_SMOOTH_CAL_RED_AVG_INPAINT_FILE, SAVE_INPAINT_RED_AVG)
    _write(inpainted_red_avg_smooth_cal_diff_data, DIFF_SMOOTH_CAL_RED_AVG_INPAINT_FILE, SAVE_INPAINT_RED_AVG, SAVE_DIFF_RED_AVG)
    _write(inpainted_red_avg_abs_cal_sum_data, SUM_ABS_CAL_RED_AVG_INPAINT_FILE, SAVE_INPAINT_RED_AVG, SAVE_ABS_CAL_RED_AVG)
    _write(inpainted_red_avg_abs_cal_diff_data, DIFF_ABS_CAL_RED_AVG_INPAINT_FILE, SAVE_INPAINT_RED_AVG, SAVE_ABS_CAL_RED_AVG, SAVE_DIFF_RED_AVG)
    if SAVE_INPAINT_RED_AVG:
        hd_out.update(flags=where_inpainted)
        uvf = UVFlag(hd_out, mode='flag', copy_flags=True, use_future_array_shapes=True)
        uvf.history += add_to_history
        uvf.write(WHERE_INPAINTED_FILE, clobber=True)        
Now writing redundantly-averaged calibrated visibilities to /lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.sum.smooth_calibrated.red_avg.dly_filt.uvh5
File exists; clobbering
Now writing redundantly-averaged calibrated visibilities to /lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.sum.abs_calibrated.red_avg.dly_filt.uvh5
File exists; clobbering
Now writing redundantly-averaged calibrated visibilities to /lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.sum.smooth_calibrated.red_avg.inpaint.uvh5
File exists; clobbering
Now writing redundantly-averaged calibrated visibilities to /lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.sum.abs_calibrated.red_avg.inpaint.uvh5
File exists; clobbering
File /lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.where_inpainted.h5 exists; clobbering

Save incoherent average magnitude waterfalls¶

In [25]:
for mode in ['autos', 'crosses', 'all visibiltiies']:
    avg_abs_data, avg_flags, avg_nsamples = {}, {}, {}
    for pol in ['ee', 'nn']:
        bls_to_avg = []
        if mode == 'autos' or mode == 'all visibiltiies':
            key = (hd.data_ants[0], hd.data_ants[0], pol)
            bls_to_avg += [bl for bl in red_avg_smooth_cal_sum_data if bl[0] == bl[1] and bl[2] == pol]
        if mode == 'crosses' or mode == 'all visibiltiies':
            key = (hd.data_ants[0], hd.data_ants[1], pol)
            bls_to_avg += [bl for bl in red_avg_smooth_cal_sum_data if bl[0] != bl[1] and bl[2] == pol]
        
        if len(bls_to_avg) > 0:
            weights = np.array([red_avg_nsamples[bl] * (~red_avg_flags[bl]) for bl in bls_to_avg])
            avg_flags[key] = np.all(weights == 0, axis=0)
            weights = np.array([np.where(avg_flags[key], red_avg_nsamples[bl], 
                                         red_avg_nsamples[bl] * (~red_avg_flags[bl])) for bl in bls_to_avg])
            avg_nsamples[key] = np.sum([red_avg_nsamples[bl] for bl in bls_to_avg], axis=0)
            avg_abs_data[key] = np.average([np.abs(red_avg_smooth_cal_sum_data[bl]) for bl in bls_to_avg], weights=weights, axis=0)
    
    if SAVE_RESULTS:
        hd_out = io.HERAData(SUM_FILE)
        hd_out.read(bls=set([bl[0:2] for bl in avg_abs_data]), polarizations=['ee', 'nn'])
        hd_out.empty_arrays()
        hd_out.update(data=avg_abs_data, flags=avg_flags, nsamples=avg_nsamples)
        if ALL_FLAGGED:
            # put back in all the flags that we had been ignoring up to this point
            hd_out.flag_array = np.ones_like(hd_out.flag_array)        
        hd_out.history += add_to_history
        outfile = {'autos': AVG_ABS_AUTO_FILE, 'crosses': AVG_ABS_CROSS_FILE, 'all visibiltiies': AVG_ABS_ALL_FILE}[mode]
        print(f'Writing averaged absolute of {mode} to {outfile}')
        hd_out.write_uvh5(outfile, clobber=True)
Writing averaged absolute of autos to /lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.sum.smooth_calibrated.avg_abs_auto.uvh5
File exists; clobbering
Writing averaged absolute of crosses to /lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.sum.smooth_calibrated.avg_abs_cross.uvh5
File exists; clobbering
Writing averaged absolute of all visibiltiies to /lustre/aoc/projects/hera/h6c-analysis/IDR2/2459866/zen.2459866.46118.sum.smooth_calibrated.avg_abs_all.uvh5
File exists; clobbering

Save filter cache¶

In [26]:
if SAVE_RESULTS:
    # Create cache directory if it doesn't already exist
    if not os.path.exists(filter_cache_dir):
        os.mkdir(filter_cache_dir)
        
    # For each new cache key save a new cache file in the cache directory
    for key in new_cache_keys:
        cache_file_basename = key[0] + '.filter_cache.p'
        if not os.path.exists(os.path.join(filter_cache_dir, cache_file_basename)):
            try:
                cfile = open(os.path.join(filter_cache_dir, cache_file_basename), 'wb')
                try:
                    pickle.dump({key: cache[key]}, cfile)
                finally:
                    cfile.close()
            except (OSError, IOError):
                # File could not be written to, move on
                pass

Metadata¶

In [27]:
for repo in ['pyuvdata', 'hera_cal', 'hera_qm', 'hera_filters', 'hera_notebook_templates']:
    exec(f'from {repo} import __version__')
    print(f'{repo}: {__version__}')
pyuvdata: 2.4.1.dev9+gc10c105e
hera_cal: 3.3.1.dev6+gf5f463ca
hera_qm: 2.1.2
hera_filters: 0.1.4
hera_notebook_templates: 0.1.dev531+gfe314a8
In [28]:
print(f'Finished execution in {(time.time() - tstart) / 60:.2f} minutes.')
Finished execution in 5.74 minutes.
In [ ]: