by Josh Dillon, last updated January 9, 2024
This notebook, the final step in a pre-LST-binning analysis, applies calibration solutions and flags to both sum and diff data, producing a variety of data products. These currently may include:
Abs-calibrated, redundantly-averaged, delay-filtered visibility sums
Abs-calibrated, redundantly-averaged, delay-filtered visibility diffs
Smooth-calibrated, redundantly-averaged, delay-filtered visibility sums
Smooth-calibrated, redundantly-averaged, delay-filtered visibility diffs
Abs-calibrated, redundantly-averaged, inpainted visibility sums
Abs-calibrated, redundantly-averaged, inpainted visibility diffs
Smooth-calibrated, redundantly-averaged, inpainted visibility sums
Smooth-calibrated, redundantly-averaged, inpainted visibility diffs
UVFlag file indicating where inpainting was done
Smooth-calibrated, coherently redundantly-averaged, incoherently array-averaged visibility magnitudes
The inpainted files are flagged where inpainting was done, however one can use the accompanying UVFlag file to figure out where data was replaced with inpainted data (generally it's outside the FM band for all baselines not completely flagged). The inpainted files may have additional flags due to excessively large channel gaps which cause inpainting to become unreliable, even with regularization.
Some of these data products are experimental and may be removed at a later date to save compute and disk. Some may be turned off using environment variables.
Here's a set of links to skip to particular figures and tables:
smooth_cal
¶smooth_cal
and Redundant-Averaging¶import time
tstart = time.time()
import os
os.environ['HDF5_USE_FILE_LOCKING'] = 'FALSE'
import h5py
import hdf5plugin # REQUIRED to have the compression plugins available
import numpy as np
import copy
import pickle
import glob
from pyuvdata import UVFlag
from hera_cal import io, utils, redcal, apply_cal, datacontainer, vis_clean
from hera_filters import dspec
from hera_qm.metrics_io import read_a_priori_ant_flags
from hera_qm.time_series_metrics import true_stretches
from scipy import constants, signal
import matplotlib
import matplotlib.pyplot as plt
from IPython.display import display, HTML
%matplotlib inline
# figure out whether to save results (and which ones)
SAVE_RESULTS = os.environ.get("SAVE_RESULTS", "TRUE").upper() == "TRUE"
SAVE_DIFF_RED_AVG = os.environ.get("SAVE_DIFF_RED_AVG", "TRUE").upper() == "TRUE"
SAVE_ABS_CAL_RED_AVG = os.environ.get("SAVE_ABS_CAL_RED_AVG", "TRUE").upper() == "TRUE"
SAVE_DLY_FILT_RED_AVG = os.environ.get("SAVE_DLY_FILT_RED_AVG", "TRUE").upper() == "TRUE"
SAVE_INPAINT_RED_AVG = os.environ.get("SAVE_INPAINT_RED_AVG", "TRUE").upper() == "TRUE"
# TODO: in theory, if some of these are turned off, some memory and/or processing could be saved
# by doing only a subset of what this notebook does. That's left for fuure work.
add_to_history = 'Produced by file_prostprocessing notebook with the following environment:\n' + '=' * 65 + '\n' + os.popen('conda env export').read() + '=' * 65
for setting in ['SAVE_RESULTS', 'SAVE_DIFF_RED_AVG', 'SAVE_ABS_CAL_RED_AVG', 'SAVE_DLY_FILT_RED_AVG', 'SAVE_INPAINT_RED_AVG']:
print(f'{setting} = {eval(setting)}')
SAVE_RESULTS = True SAVE_DIFF_RED_AVG = False SAVE_ABS_CAL_RED_AVG = True SAVE_DLY_FILT_RED_AVG = True SAVE_INPAINT_RED_AVG = True
# get input data file names
SUM_FILE = os.environ.get("SUM_FILE", None)
# SUM_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459861/zen.2459861.46123.sum.uvh5'
SUM_SUFFIX = os.environ.get("SUM_SUFFIX", 'sum.uvh5')
DIFF_SUFFIX = os.environ.get("DIFF_SUFFIX", 'diff.uvh5')
# get input calibration files and flags
ABS_CAL_SUFFIX = os.environ.get("ABS_CAL_SUFFIX", 'sum.omni.calfits')
SMOOTH_CAL_SUFFIX = os.environ.get("CAL_SUFFIX", 'sum.smooth.calfits')
APOSTERIORI_YAML_SUFFIX = os.environ.get("APOSTERIORI_YAML_SUFFIX", '_aposteriori_flags.yaml')
aposteriori_yaml_file = os.path.join(os.path.dirname(SUM_FILE), SUM_FILE.split('.')[-4] + APOSTERIORI_YAML_SUFFIX)
# Get filter cache name
FILTER_CACHE = os.environ.get("FILTER_CACHE", "filter_cache")
filter_cache_dir = os.path.join(os.path.dirname(SUM_FILE), FILTER_CACHE)
# output abs-calibrated, redundantly-averaged, delay-filtered files
SUM_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX = os.environ.get("SUM_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX", 'sum.abs_calibrated.red_avg.dly_filt.uvh5')
DIFF_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX = os.environ.get("DIFF_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX", 'diff.abs_calibrated.red_avg.dly_filt.uvh5')
# output smooth-calibrated, redundantly-averaged, delay-filtered files
SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX = os.environ.get("SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX", 'sum.smooth_calibrated.red_avg.dly_filt.uvh5')
DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX = os.environ.get("DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX", 'diff.smooth_calibrated.red_avg.dly_filt.uvh5')
# output abs-calibrated, redundantly-averaged, inpainted files
SUM_ABS_CAL_RED_AVG_INPAINT_SUFFIX = os.environ.get("SUM_ABS_CAL_RED_AVG_INPAINT_SUFFIX", 'sum.abs_calibrated.red_avg.inpaint.uvh5')
DIFF_ABS_CAL_RED_AVG_INPAINT_SUFFIX = os.environ.get("DIFF_ABS_CAL_RED_AVG_INPAINT_SUFFIX", 'diff.abs_calibrated.red_avg.inpaint.uvh5')
# output smooth-calibrated, redundantly-averaged, inpainted files
SUM_SMOOTH_CAL_RED_AVG_INPAINT_SUFFIX = os.environ.get("SUM_SMOOTH_CAL_RED_AVG_INPAINT_SUFFIX", 'sum.smooth_calibrated.red_avg.inpaint.uvh5')
DIFF_SMOOTH_CAL_RED_AVG_INPAINT_SUFFIX = os.environ.get("DIFF_SMOOTH_CAL_RED_AVG_INPAINT_SUFFIX", 'diff.smooth_calibrated.red_avg.inpaint.uvh5')
# output UVFlag files for where inpainting happened
WHERE_INPAINTED_SUFFIX = os.environ.get("WHERE_INPAINTED_SUFFIX", 'where_inpainted.h5')
# output smooth-calibrated, averaged absolute value of visibilities
AVG_ABS_ALL_SUFFIX = os.environ.get("AVG_ABS_SUFFIX", 'sum.smooth_calibrated.avg_abs_all.uvh5')
AVG_ABS_AUTO_SUFFIX = os.environ.get("AVG_ABS_AUTO_SUFFIX", 'sum.smooth_calibrated.avg_abs_auto.uvh5')
AVG_ABS_CROSS_SUFFIX = os.environ.get("AVG_ABS_CROSS_SUFFIX", 'sum.smooth_calibrated.avg_abs_cross.uvh5')
for suffix in ['DIFF_SUFFIX', 'ABS_CAL_SUFFIX', 'SMOOTH_CAL_SUFFIX',
'SUM_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX', 'DIFF_ABS_CAL_RED_AVG_DLY_FILT_SUFFIX',
'SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX', 'DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_SUFFIX',
'SUM_ABS_CAL_RED_AVG_INPAINT_SUFFIX', 'DIFF_ABS_CAL_RED_AVG_INPAINT_SUFFIX',
'SUM_SMOOTH_CAL_RED_AVG_INPAINT_SUFFIX', 'DIFF_SMOOTH_CAL_RED_AVG_INPAINT_SUFFIX',
'WHERE_INPAINTED_SUFFIX', 'AVG_ABS_ALL_SUFFIX', 'AVG_ABS_AUTO_SUFFIX', 'AVG_ABS_CROSS_SUFFIX']:
file_var = suffix.replace('SUFFIX', 'FILE')
exec(f'{file_var} = SUM_FILE.replace(SUM_SUFFIX, {suffix})')
print(f"{file_var} = '{eval(file_var)}'")
print(f'aposteriori_yaml_file = {aposteriori_yaml_file}')
print(f'filter_cache = {filter_cache_dir}')
DIFF_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.diff.uvh5' ABS_CAL_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.sum.omni.calfits' SMOOTH_CAL_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.sum.smooth.calfits' SUM_ABS_CAL_RED_AVG_DLY_FILT_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.sum.abs_calibrated.red_avg.dly_filt.uvh5' DIFF_ABS_CAL_RED_AVG_DLY_FILT_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.diff.abs_calibrated.red_avg.dly_filt.uvh5' SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.sum.smooth_calibrated.red_avg.dly_filt.uvh5' DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.diff.smooth_calibrated.red_avg.dly_filt.uvh5' SUM_ABS_CAL_RED_AVG_INPAINT_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.sum.abs_calibrated.red_avg.inpaint.uvh5' DIFF_ABS_CAL_RED_AVG_INPAINT_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.diff.abs_calibrated.red_avg.inpaint.uvh5' SUM_SMOOTH_CAL_RED_AVG_INPAINT_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.sum.smooth_calibrated.red_avg.inpaint.uvh5' DIFF_SMOOTH_CAL_RED_AVG_INPAINT_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.diff.smooth_calibrated.red_avg.inpaint.uvh5' WHERE_INPAINTED_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.where_inpainted.h5' AVG_ABS_ALL_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.sum.smooth_calibrated.avg_abs_all.uvh5' AVG_ABS_AUTO_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.sum.smooth_calibrated.avg_abs_auto.uvh5' AVG_ABS_CROSS_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.sum.smooth_calibrated.avg_abs_cross.uvh5' aposteriori_yaml_file = /lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/2459864_aposteriori_flags.yaml filter_cache = /lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/filter_cache
# parse delay filtering settings
DLY_FILT_HORIZON = float(os.environ.get("DLY_FILT_HORIZON", 1.0))
DLY_FILT_STANDOFF = float(os.environ.get("DLY_FILT_STANDOFF", 0.0)) # in ns
DLY_FILT_MIN_DLY = float(os.environ.get("DLY_FILT_MIN_DLY", 150.0)) # in ns
DLY_FILT_EIGENVAL_CUTOFF = float(os.environ.get("DLY_FILT_EIGENVAL_CUTOFF", 1e-12))
INPAINT_MIN_DLY = float(os.environ.get("INPAINT_MIN_DLY", 500.0)) # in ns
INPAINT_REGULARIZATION = float(os.environ.get("INPAINT_REGULARIZATION", 1e-5)) # reasonable values are between 1e-2 and 1e-5
INPAINT_MAX_GAP_FACTOR = float(os.environ.get("INPAINT_MAX_GAP_FACTOR", 2)) # maximum allowed gap before flaggint the whole integration is this * 1/dly
INPAINT_MAX_CONVOLVED_FLAG_FRAC = float(os.environ.get("INPAINT_MAX_CONVOLVED_FLAG_FRAC", 0.667)) # maximum allowed flag density for the purpose of identifying flag gaps
FM_LOW_FREQ = float(os.environ.get("FM_LOW_FREQ", 87.5)) # in MHz
FM_HIGH_FREQ = float(os.environ.get("FM_HIGH_FREQ", 108.0)) # in MHz
for setting in ['DLY_FILT_HORIZON', 'DLY_FILT_STANDOFF', 'DLY_FILT_MIN_DLY', 'DLY_FILT_EIGENVAL_CUTOFF', 'INPAINT_MIN_DLY',
'INPAINT_REGULARIZATION', 'INPAINT_MAX_GAP_FACTOR', 'INPAINT_MAX_CONVOLVED_FLAG_FRAC', 'FM_LOW_FREQ', 'FM_HIGH_FREQ']:
print(f'{setting} = {eval(setting)}')
DLY_FILT_HORIZON = 1.0 DLY_FILT_STANDOFF = 0.0 DLY_FILT_MIN_DLY = 150.0 DLY_FILT_EIGENVAL_CUTOFF = 1e-12 INPAINT_MIN_DLY = 500.0 INPAINT_REGULARIZATION = 1e-05 INPAINT_MAX_GAP_FACTOR = 2.0 INPAINT_MAX_CONVOLVED_FLAG_FRAC = 0.667 FM_LOW_FREQ = 87.5 FM_HIGH_FREQ = 108.0
# Find all cached matrices within the filter cache
cache_files = glob.glob(os.path.join(filter_cache_dir, "*.filter_cache.p"))
# Load sum and diff data
hd = io.HERADataFastReader(SUM_FILE)
data, flags, nsamples = hd.read(pols=['ee', 'nn'])
hd_diff = io.HERADataFastReader(DIFF_FILE)
diff_data, diff_flags, diff_nsamples = hd_diff.read(pols=['ee', 'nn'])
# pick all reds that might be in the data, using the same set across polarizations for easier output
reds = redcal.get_reds(hd.data_antpos, pols=['ee', 'nn'], include_autos=True)
ex_ants = set(read_a_priori_ant_flags(aposteriori_yaml_file))
possibly_unflagged_bls = [bl for bl in data if utils.split_bl(bl)[0] not in ex_ants and utils.split_bl(bl)[1] not in ex_ants]
possibly_unflagged_antpairs = set([ap for bl in possibly_unflagged_bls for ap in [bl[:2], bl[-2::-1]]])
reds = [red for red in reds if np.any([bl[0:2] in possibly_unflagged_antpairs for bl in red])]
# Load calibration solutions and gain flags
hc_smooth = io.HERACal(SMOOTH_CAL_FILE)
smooth_gains, cal_flags, _, _ = hc_smooth.read()
hc_abs = io.HERACal(ABS_CAL_FILE)
abs_gains, abs_cal_flags, _, _ = hc_abs.read()
# handle the the case where the smooth_cal flags are all True, trying to maintain consistent data shapes
ALL_FLAGGED = False
if np.all([flag for flag in cal_flags.values()]):
print('This file is entirely flagged. Proceeding with averaging and filtering using earlier flags, '
'but the output data products will still be fully flagged.')
ALL_FLAGGED = True
# Likely the file was fully flagged for broadband RFI, so instead use the original flags from file_calibration
cal_flags = abs_cal_flags
# And if that didn't work, just make the flags all False for now (though ex_ants will still be exluded via reds)
if np.all([flag for flag in cal_flags.values()]):
cal_flags = {ant: np.zeros_like(cal_flags[ant]) for ant in cal_flags}
def red_average(reds, data, nsamples, gains, flags={}, cal_flags={}):
# Redundantly average data
wgts = datacontainer.DataContainer({bl: nsamples[bl] * ~(flags.get(bl, False) | cal_flags.get(utils.split_bl(bl)[0], False) \
| cal_flags.get(utils.split_bl(bl)[1], False)) for bl in nsamples})
sol = redcal.RedSol(reds, gains=gains)
sol.update_vis_from_data(data, wgts=wgts)
# Figure out redundantly averaged flags and nsamples
red_avg_flags = {}
red_avg_nsamples = {}
for red in reds:
if red[0] in sol.vis:
red_avg_flags[red[0]] = np.all([wgts[bl] == 0 for bl in red], axis=0) | ~np.isfinite(sol.vis[red[0]])
red_avg_nsamples[red[0]] = np.sum([nsamples[bl] for bl in red if not np.all(wgts[bl] == 0)], axis=0)
else:
# empty placeholders to make sure every file has the same shape for the whole day
sol.vis[red[0]] = np.zeros_like(next(iter(data.values())))
red_avg_flags[red[0]] = np.ones_like(next(iter(flags.values())))
red_avg_nsamples[red[0]] = np.zeros_like(next(iter(nsamples.values())))
sol.make_sol_finite()
# Build output RedDataContainers
red_avg_data = datacontainer.RedDataContainer(sol.vis, reds)
red_avg_flags = datacontainer.RedDataContainer(red_avg_flags, reds)
red_avg_nsamples = datacontainer.RedDataContainer(red_avg_nsamples, reds)
return red_avg_data, red_avg_flags, red_avg_nsamples
# perform redundant averaging
red_avg_smooth_cal_sum_data, red_avg_flags, red_avg_nsamples = red_average(reds, data, nsamples, smooth_gains, flags=flags, cal_flags=cal_flags)
red_avg_smooth_cal_diff_data, _, _ = red_average(reds, diff_data, diff_nsamples, smooth_gains, flags=diff_flags, cal_flags=cal_flags)
red_avg_abs_cal_sum_data, _, _ = red_average(reds, data, nsamples, abs_gains, flags=flags, cal_flags=cal_flags)
red_avg_abs_cal_diff_data, _, _ = red_average(reds, diff_data, nsamples, abs_gains, flags=flags, cal_flags=cal_flags)
if not ALL_FLAGGED:
integration_flags = np.all([red_avg_flags[bl] for bl in red_avg_flags], axis=(0, 2))
def plot_red_avg_vis():
if ALL_FLAGGED:
print('All integrations are flagged. Nothing to plot.')
return
fig, axes = plt.subplots(2, 2, figsize=(14, 6), dpi=100, sharex='col', sharey='row', gridspec_kw={'hspace': 0, 'wspace': 0})
for i, pol in enumerate(['ee', 'nn']):
reds_here = redcal.filter_reds(reds, pols=[pol], antpos=hd.antpos, min_bl_cut=1)
sol = redcal.RedSol(reds_here, gains=smooth_gains)
red = sorted(reds_here, key=lambda red: np.median(red_avg_nsamples.get(red[0], 0)), reverse=True)[0]
if np.any([bl[2] == pol for bl in red_avg_flags]):
for tind in range(red_avg_flags[red[0]].shape[0]):
if not np.all(red_avg_flags[red[0]][tind]):
break
calibrated = {bl: np.where(flags[bl] | cal_flags[utils.split_bl(bl)[0]] | cal_flags[utils.split_bl(bl)[1]],
np.nan, sol.calibrate_bl(bl, data[bl])) for bl in red if bl in data}
for bl in red:
axes[0, i].plot(hd.freqs/1e6, np.angle(calibrated[bl][tind]), alpha=.5, lw=.5)
axes[1, i].semilogy(hd.freqs/1e6, np.abs(calibrated[bl][tind]), alpha=.5, lw=.5)
to_plot = np.where(red_avg_flags[red[0]][tind], np.nan, red_avg_smooth_cal_sum_data[red[0]][tind])
axes[0, i].plot(hd.freqs / 1e6, np.angle(to_plot), lw=1, c='k')
axes[1, i].semilogy(hd.freqs / 1e6, np.abs(to_plot), lw=1, c='k', label=f'Baseline Group:\n{red[0]}')
axes[1, i].set_xlabel('Frequency (MHz)')
axes[1, i].legend(loc='upper right')
axes[0, 0].set_ylabel('Visibility Phase (radians)')
axes[1, 0].set_ylabel('Visibility Amplitude (Jy)')
plt.tight_layout()
def plot_red_avg_nsamples():
if ALL_FLAGGED:
print('All integrations are flagged. Nothing to plot.')
return
fig, axes = plt.subplots(2, 1, figsize=(14,7), dpi=100, sharex=True, gridspec_kw={'hspace': 0})
max_nsamples = np.max([np.median(ns) for ns in red_avg_nsamples.values()])
for ax, pol in zip(axes, ['ee', 'nn']):
if np.any([bl[2] == pol for bl in red_avg_nsamples]):
blvecs = np.array([hd.antpos[red[0][1]] - hd.antpos[red[0][0]] for red in reds if red[0] in red_avg_nsamples and red[0][2] == pol])
med_nsamples = np.array([np.median(red_avg_nsamples[red[0]][~integration_flags, :])
for red in reds if red[0] in red_avg_nsamples and red[0][2] == pol])
sca = ax.scatter(blvecs[:, 0], blvecs[:, 1], s=0)
sca = ax.scatter(blvecs[:, 0], blvecs[:, 1], s=(100 * (600 / np.diff(ax.get_xlim())[0])**2), ec='k', linewidths=.5,
c=med_nsamples, norm=matplotlib.colors.LogNorm(vmin=1, vmax=max_nsamples), cmap='turbo')
ax.axis('equal')
ax.set_xlabel('EW Baseline Vector (m)')
ax.set_ylabel('NS Baseline Vector (m)')
ax.text(.98, .94, f'{pol}-polarized', transform=ax.transAxes, va='top', ha='right', bbox=dict(facecolor='w', alpha=0.5))
plt.tight_layout()
fig.colorbar(sca, ax=axes, pad=.02, label='Number of Samples')
smooth_cal
¶The results of calibration and redundant averaging the baseline group with the highest redundancy in each polarization. Visibilities with flagged antennas are excluded. The black line is the redundantly-averaged visibility. Each thin colored line is a different baseline group. Phases are shown in the top row, amplitudes in the bottom, ee-polarized visibilities in the left column, and nn-polarized visibilities in the right.
plot_red_avg_vis()
The number of visibilities averaged together in each baseline group. Note that the split of the HERA core produces more highly-sampled intra-sector baselines that are interspersed with the less-highly-sampled inter-sector baselines off the main grid.
plot_red_avg_nsamples()
# save memory
ALL_CAL_FLAGS = np.all([np.all(~cal_flags[ant]) for ant in cal_flags])
del hd, data, flags, nsamples, hd_diff, diff_data, diff_flags, diff_nsamples
del hc_smooth, smooth_gains, cal_flags, hc_abs, abs_gains, abs_cal_flags
hd = io.HERADataFastReader(SUM_FILE)
def read_filter_cache_scratch(cache_files):
"""
Load files from a cache specified by cache_dir.
"""
# Load up the cache file with the most keys (precomputed filter matrices).
cache = {}
# loop through cache files, load them.
# If there are new keys, add them to internal cache.
# If not, delete the reference matrices from memory.
for cache_file in cache_files:
try:
cfile = open(cache_file, 'rb')
try:
cache_t = pickle.load(cfile)
finally:
cfile.close()
except:
continue
for key in cache_t:
cache[key] = cache_t[key]
return cache
# Peform delay filter separately above and below FM
low_band = slice(0, np.argwhere(hd.freqs > FM_LOW_FREQ * 1e6)[0][0])
high_band = slice(np.argwhere(hd.freqs > FM_HIGH_FREQ * 1e6)[0][0], len(hd.freqs))
to_filter = np.outer(np.ones(len(hd.times), dtype=bool), (hd.freqs <= FM_LOW_FREQ * 1e6) | (hd.freqs > FM_HIGH_FREQ * 1e6))
# set up datacontainers for delay-filtered data products
dly_filt_red_avg_smooth_cal_sum_data = copy.deepcopy(red_avg_smooth_cal_sum_data)
dly_filt_red_avg_smooth_cal_diff_data = copy.deepcopy(red_avg_smooth_cal_diff_data)
dly_filt_red_avg_abs_cal_sum_data = copy.deepcopy(red_avg_abs_cal_sum_data)
dly_filt_red_avg_abs_cal_diff_data = copy.deepcopy(red_avg_abs_cal_diff_data)
# set up datacontainers for inpainted data products
inpainted_red_avg_smooth_cal_sum_data = copy.deepcopy(red_avg_smooth_cal_sum_data)
inpainted_red_avg_smooth_cal_diff_data = copy.deepcopy(red_avg_smooth_cal_diff_data)
inpainted_red_avg_abs_cal_sum_data = copy.deepcopy(red_avg_abs_cal_sum_data)
inpainted_red_avg_abs_cal_diff_data = copy.deepcopy(red_avg_abs_cal_diff_data)
where_inpainted = copy.deepcopy(red_avg_flags)
red_avg_inpainted_flags = copy.deepcopy(red_avg_flags)
auto_bls = [bl for bl in dly_filt_red_avg_smooth_cal_sum_data if bl[0] == bl[1]]
gap_flag_warned = set()
new_cache_keys = []
if ALL_FLAGGED and ALL_CAL_FLAGS:
print('All integrations flagged after the file_calibration notebook... skipping delay filtering.')
for bl in where_inpainted:
where_inpainted[bl][:, :] = False
else:
# Load files in cache
cache = read_filter_cache_scratch(cache_files)
cache_keys = list(cache.keys())
for bl in sorted(red_avg_flags.keys(), key=lambda bl: np.sum(red_avg_flags[bl]), reverse=True):
# inverse variance weight using smooth_calibrated autocorrealtions (which are proprotional to std of noise)
auto_bl = [abl for abl in auto_bls if abl[2] == bl[2]][0]
wgts = np.where(red_avg_flags[bl], 0, red_avg_smooth_cal_sum_data[auto_bl]**-2)
wgts /= np.nanmean(np.where(red_avg_flags[bl], np.nan, wgts)) # avoid dynamic range issues
wgts[~np.isfinite(wgts)] = 0
for mode in ['dly_filt', 'inpaint']:
# switch for inpainting or delay filtering
if mode == 'dly_filt':
min_dly = DLY_FILT_MIN_DLY
ridge_alpha = 0
dcs = [dly_filt_red_avg_smooth_cal_sum_data, dly_filt_red_avg_smooth_cal_diff_data,
dly_filt_red_avg_abs_cal_sum_data, dly_filt_red_avg_abs_cal_diff_data]
else:
min_dly = INPAINT_MIN_DLY
ridge_alpha = INPAINT_REGULARIZATION
dcs = [inpainted_red_avg_smooth_cal_sum_data, inpainted_red_avg_smooth_cal_diff_data,
inpainted_red_avg_abs_cal_sum_data, inpainted_red_avg_abs_cal_diff_data]
# calculate filter properties
bl_vec = (hd.antpos[bl[1]] - hd.antpos[bl[0]])
bl_len = np.linalg.norm(bl_vec[:2]) / constants.c
filter_centers, filter_half_widths = vis_clean.gen_filter_properties(ax='freq', min_dly=min_dly, horizon=DLY_FILT_HORIZON,
standoff=DLY_FILT_STANDOFF, bl_len=bl_len,)
for dc in dcs:
if np.all(dc[bl] == 0.0) and np.all(red_avg_flags[bl]):
where_inpainted[bl] = np.zeros_like(red_avg_flags[bl])
# don't bother delay filtering all 0s.
continue
# run delay filter for each band individually
d_mdl = np.zeros_like(dc[bl])
inpainting_succeeded = np.zeros_like(red_avg_flags[bl])
for band in [low_band, high_band]:
d_mdl[:, band], _, info = dspec.fourier_filter(hd.freqs[band], dc[bl][:, band], wgts=wgts[:, band], filter_centers=filter_centers,
filter_half_widths=filter_half_widths, mode='dpss_solve',
ridge_alpha=ridge_alpha, fit_intercept=(mode == 'inpaint'),
eigenval_cutoff=[DLY_FILT_EIGENVAL_CUTOFF], suppression_factors=[DLY_FILT_EIGENVAL_CUTOFF],
max_contiguous_edge_flags=len(hd.freqs), cache=cache)
# track the flagging on integrations with too-large gaps
if mode == 'inpaint':
# also conolve flags with a triangle filter to mark as un-inpaintable any stretches effectively "average" to too large a gap
max_allowed_gap_size = INPAINT_MAX_GAP_FACTOR * (info['filter_params']['axis_1']['filter_half_widths'][0])**-1 / np.median(np.diff(hd.freqs))
convolution_kernel = np.append(np.linspace(0, 1, int(max_allowed_gap_size) - 1, endpoint=False), np.linspace(1, 0, int(max_allowed_gap_size)))
convolution_kernel /= np.sum(convolution_kernel)
convolved_flags = signal.convolve2d(red_avg_flags[bl][:, band], convolution_kernel[np.newaxis, :], mode='same') > INPAINT_MAX_CONVOLVED_FLAG_FRAC
# loop over times
for i in range(len(hd.times)):
flagged_stretches = true_stretches((red_avg_flags[bl][i, band] | convolved_flags[i]))
longest_gap = np.max([ts.stop - ts.start for ts in flagged_stretches])
if longest_gap > max_allowed_gap_size:
red_avg_inpainted_flags[bl][i, band] = True
if (i, band.start, band.stop) not in gap_flag_warned:
print(f'Flagging integration {i} (JD = {hd.times[i]}) for between channels {band.start} and {band.stop} because '
f'it has an effective {longest_gap}-channel flag gap (which exceeds {int(max_allowed_gap_size)}, the maximum allowed).')
gap_flag_warned.add((i, band.start, band.stop))
else:
inpainting_succeeded[i, band] = True
# Track new DPSS-filters entering the cache
fourier_filter_key = dspec._fourier_filter_hash(
filter_centers=filter_centers, filter_half_widths=filter_half_widths, filter_factors=[0.], x=hd.freqs[band], w=None,
crit_name='eigenval_cutoff', label='dpss_operator', crit_val=tuple([DLY_FILT_EIGENVAL_CUTOFF])
)
if fourier_filter_key not in cache_keys:
new_cache_keys.append(fourier_filter_key)
if (mode == 'inpaint'):
where_inpainted[bl] = (red_avg_inpainted_flags[bl] & to_filter & inpainting_succeeded & (not ALL_FLAGGED))
dc[bl] = np.where(red_avg_inpainted_flags[bl] & inpainting_succeeded, d_mdl, dc[bl])
elif (bl[0] == bl[1]):
# inpaint autos at delay-filter delay
dc[bl] = np.where(red_avg_flags[bl], d_mdl, dc[bl])
else:
# for all other baselines, filter foregrounds
dc[bl] = np.where(red_avg_flags[bl], 0, dc[bl] - d_mdl)
Mean of empty slice invalid value encountered in divide
def plot_delay_spectra():
if ALL_FLAGGED:
print('All integrations are flagged. Nothing to plot.')
return
# loop over bands
red_avg_flag_waterfall = np.all(list(red_avg_flags.values()), axis=0)
unflagged_inds = np.squeeze(np.argwhere(~np.any(red_avg_flag_waterfall[~np.all(red_avg_flag_waterfall, axis=1), :], axis=0)))
low_band = slice(unflagged_inds[0], np.max(unflagged_inds[hd.freqs[unflagged_inds] < FM_LOW_FREQ * 1e6]) + 1)
high_band = slice(np.min(unflagged_inds[hd.freqs[unflagged_inds] > FM_HIGH_FREQ * 1e6]), unflagged_inds[-1] + 1)
for band in [low_band, high_band]:
display(HTML(f'<h2>Redundantly-Averaged, Smooth-Calibrated Delay Spectra: {hd.freqs[band][0] / 1e6:.2f} — {hd.freqs[band][-1] / 1e6:.2f} MHz</h2>'))
fig, axes = plt.subplots(5, 2, figsize=(14,10), sharex=True, sharey='row', gridspec_kw={'hspace': 0, 'wspace': 0})
mins, maxes = {}, {}
for bl in red_avg_smooth_cal_sum_data:
# figure out whether to plot this baseline
bl_vec = (hd.antpos[bl[1]] - hd.antpos[bl[0]])
if not ((np.abs(bl_vec[1]) < 1) and int(np.round(bl_vec[0] / 14.6)) in [0, 1, 2, 4, 8]):
continue
# compute filter size
bl_len = np.linalg.norm(bl_vec[:2]) / constants.c
_, dly_filter_half_widths = vis_clean.gen_filter_properties(ax='freq', horizon=DLY_FILT_HORIZON, standoff=DLY_FILT_STANDOFF,
min_dly=DLY_FILT_MIN_DLY, bl_len=bl_len)
_, inpaint_filter_half_widths = vis_clean.gen_filter_properties(ax='freq', horizon=DLY_FILT_HORIZON, standoff=DLY_FILT_STANDOFF,
min_dly=INPAINT_MIN_DLY, bl_len=bl_len)
# figure out where to put the plot
row = {0: 0, 1: 1, 2: 2, 4: 3, 8: 4}[int(np.round(bl_vec[0] / 14.6))]
col = int(bl[2] == 'nn')
ax = axes[row, col]
# FFT inpainted sums
ip_integration_flags = integration_flags | np.all(red_avg_inpainted_flags[bl][:, band], axis=1)
if not np.all(ip_integration_flags):
dfft_ip, delays = vis_clean.fft_data(inpainted_red_avg_smooth_cal_sum_data[bl][:, band], delta_bin=(hd.freqs[1] - hd.freqs[0]), window='bh')
ax.semilogy(delays*1e9, np.mean(np.abs(dfft_ip[~ip_integration_flags, :]), axis=0), label='Inpainted Sum Data')
else:
ax.plot([], []) # advances the color cycle
# FFT delay-filtered sums
dfft_df, delays = vis_clean.fft_data(dly_filt_red_avg_smooth_cal_sum_data[bl][:, band], delta_bin=(hd.freqs[1] - hd.freqs[0]), window='bh')
ax.semilogy(delays*1e9, np.mean(np.abs(dfft_df[~integration_flags, :]), axis=0), ls='--', label=f'Delay-Filtered Sum Data')
# FFT diffs
dfft_ddf, _ = vis_clean.fft_data(np.where(red_avg_flags[bl], 0, red_avg_smooth_cal_diff_data[bl])[:, band],
delta_bin=(hd.freqs[1] - hd.freqs[0]), window='bh')
ax.semilogy(delays*1e9, np.mean(np.abs(dfft_ddf[~integration_flags, :]), axis=0), ls=':', label=f'Diff Data')
mins[bl] = np.median(np.abs(dfft_ddf[~integration_flags, :])) / 5
try:
maxes[bl] = np.max(np.abs(dfft_ip[~integration_flags, :])) * 2
except:
maxes[bl] = np.max(np.abs(dfft_df[~integration_flags, :])) * 2
ax.axvline(1e9 * dly_filter_half_widths[0], c='k', ls='--', lw=1, alpha=.5, label=f'Delay Filter Delay')
ax.axvline(-1e9 * dly_filter_half_widths[0], c='k', ls='--', lw=1, alpha=.5)
ax.axvline(1e9 * inpaint_filter_half_widths[0], c='k', ls=':', lw=1, alpha=.5, label=f'Inpainting Delay')
ax.axvline(-1e9 * inpaint_filter_half_widths[0], c='k', ls=':', lw=1, alpha=.5)
if row == col == 0:
ax.legend(loc='upper left')
ax.set_xlim([-2250, 2250])
vec = f'{bl[2]}-polarized\n{bl_vec[0]:.1f} m East\n{bl_vec[1]:.1f} m North'
ax.text(.97, .9, vec, transform=ax.transAxes, va='top', ha='right', bbox=dict(facecolor='w', alpha=0.5))
if col == 0:
ax.set_ylabel('$|\\widetilde{V}|$')
ax.set_xlabel('Delay (ns)')
for ax in axes[1:, :].flatten():
ax.set_ylim(np.min([mins[bl] for bl in mins if bl[0] != bl[1]]),
np.max([maxes[bl] for bl in mins if bl[0] != bl[1]]))
for ax in axes[0, :]:
ax.set_ylim(np.min([mins[bl] for bl in mins if bl[0] == bl[1]]),
np.max([maxes[bl] for bl in mins if bl[0] == bl[1]]))
plt.tight_layout()
plt.show()
smooth_cal
and Redundant Averaging¶This figure plots delay spectra for autocorrelations and 1, 2, 4, and 8 unit East-west baselines. The delay-spectra are computed by FFTing the visibilities with a Blackman-Harris taper and then taking the absolute value. The delay spectra of visibility sums are plotted both inpainted and delay-filtered. The delay spectra of visibility diffs are generated by simply setting their flagged channels to 0. For autocorrelations, only in-painting is performed, so the "delay-filtered" and inpainted delay spectra are the same. These delay spectra are plotted for both polarizations and in both the high (above FM) and low (below FM) bands. Note that the frequency ranges over which the delay-filtering was performed might be slightly larger than bands shown here since flagged channels at band edges were excluded from the FFT.
plot_delay_spectra()
if SAVE_RESULTS:
hd_out = io.HERAData(SUM_FILE)
hd_out.read(bls=red_avg_flags.keys())
hd_out.empty_arrays()
hd_out.history += add_to_history
hd_out.update(flags=red_avg_flags, nsamples=red_avg_nsamples)
if ALL_FLAGGED:
# put back in all the flags that we had been ignoring up to this point
hd_out.flag_array = np.ones_like(hd_out.flag_array)
def _write(dc, outfile, *args):
if not np.all(args):
return
hd_out.update(data=dc)
print(f'Now writing redundantly-averaged calibrated visibilities to {outfile}')
hd_out.write_uvh5(outfile, clobber=True)
# write delay-filtered file(s)
_write(dly_filt_red_avg_smooth_cal_sum_data, SUM_SMOOTH_CAL_RED_AVG_DLY_FILT_FILE, SAVE_DLY_FILT_RED_AVG)
_write(dly_filt_red_avg_smooth_cal_diff_data, DIFF_SMOOTH_CAL_RED_AVG_DLY_FILT_FILE, SAVE_DLY_FILT_RED_AVG, SAVE_DIFF_RED_AVG)
_write(dly_filt_red_avg_abs_cal_sum_data, SUM_ABS_CAL_RED_AVG_DLY_FILT_FILE, SAVE_DLY_FILT_RED_AVG, SAVE_ABS_CAL_RED_AVG)
_write(dly_filt_red_avg_abs_cal_diff_data, DIFF_ABS_CAL_RED_AVG_DLY_FILT_FILE, SAVE_DLY_FILT_RED_AVG, SAVE_ABS_CAL_RED_AVG, SAVE_DIFF_RED_AVG)
# write inpainted file(s)
if not ALL_FLAGGED:
hd_out.update(flags=red_avg_inpainted_flags)
_write(inpainted_red_avg_smooth_cal_sum_data, SUM_SMOOTH_CAL_RED_AVG_INPAINT_FILE, SAVE_INPAINT_RED_AVG)
_write(inpainted_red_avg_smooth_cal_diff_data, DIFF_SMOOTH_CAL_RED_AVG_INPAINT_FILE, SAVE_INPAINT_RED_AVG, SAVE_DIFF_RED_AVG)
_write(inpainted_red_avg_abs_cal_sum_data, SUM_ABS_CAL_RED_AVG_INPAINT_FILE, SAVE_INPAINT_RED_AVG, SAVE_ABS_CAL_RED_AVG)
_write(inpainted_red_avg_abs_cal_diff_data, DIFF_ABS_CAL_RED_AVG_INPAINT_FILE, SAVE_INPAINT_RED_AVG, SAVE_ABS_CAL_RED_AVG, SAVE_DIFF_RED_AVG)
if SAVE_INPAINT_RED_AVG:
hd_out.update(flags=where_inpainted)
uvf = UVFlag(hd_out, mode='flag', copy_flags=True, use_future_array_shapes=True)
uvf.history += add_to_history
uvf.write(WHERE_INPAINTED_FILE, clobber=True)
Now writing redundantly-averaged calibrated visibilities to /lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.sum.smooth_calibrated.red_avg.dly_filt.uvh5 File exists; clobbering Now writing redundantly-averaged calibrated visibilities to /lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.sum.abs_calibrated.red_avg.dly_filt.uvh5 File exists; clobbering Now writing redundantly-averaged calibrated visibilities to /lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.sum.smooth_calibrated.red_avg.inpaint.uvh5 File exists; clobbering Now writing redundantly-averaged calibrated visibilities to /lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.sum.abs_calibrated.red_avg.inpaint.uvh5 File exists; clobbering File /lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.where_inpainted.h5 exists; clobbering
for mode in ['autos', 'crosses', 'all visibiltiies']:
avg_abs_data, avg_flags, avg_nsamples = {}, {}, {}
for pol in ['ee', 'nn']:
bls_to_avg = []
if mode == 'autos' or mode == 'all visibiltiies':
key = (hd.data_ants[0], hd.data_ants[0], pol)
bls_to_avg += [bl for bl in red_avg_smooth_cal_sum_data if bl[0] == bl[1] and bl[2] == pol]
if mode == 'crosses' or mode == 'all visibiltiies':
key = (hd.data_ants[0], hd.data_ants[1], pol)
bls_to_avg += [bl for bl in red_avg_smooth_cal_sum_data if bl[0] != bl[1] and bl[2] == pol]
if len(bls_to_avg) > 0:
weights = np.array([red_avg_nsamples[bl] * (~red_avg_flags[bl]) for bl in bls_to_avg])
avg_flags[key] = np.all(weights == 0, axis=0)
weights = np.array([np.where(avg_flags[key], red_avg_nsamples[bl],
red_avg_nsamples[bl] * (~red_avg_flags[bl])) for bl in bls_to_avg])
avg_nsamples[key] = np.sum([red_avg_nsamples[bl] for bl in bls_to_avg], axis=0)
avg_abs_data[key] = np.average([np.abs(red_avg_smooth_cal_sum_data[bl]) for bl in bls_to_avg], weights=weights, axis=0)
if SAVE_RESULTS:
hd_out = io.HERAData(SUM_FILE)
hd_out.read(bls=set([bl[0:2] for bl in avg_abs_data]), polarizations=['ee', 'nn'])
hd_out.empty_arrays()
hd_out.update(data=avg_abs_data, flags=avg_flags, nsamples=avg_nsamples)
if ALL_FLAGGED:
# put back in all the flags that we had been ignoring up to this point
hd_out.flag_array = np.ones_like(hd_out.flag_array)
hd_out.history += add_to_history
outfile = {'autos': AVG_ABS_AUTO_FILE, 'crosses': AVG_ABS_CROSS_FILE, 'all visibiltiies': AVG_ABS_ALL_FILE}[mode]
print(f'Writing averaged absolute of {mode} to {outfile}')
hd_out.write_uvh5(outfile, clobber=True)
Writing averaged absolute of autos to /lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.sum.smooth_calibrated.avg_abs_auto.uvh5 File exists; clobbering Writing averaged absolute of crosses to /lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.sum.smooth_calibrated.avg_abs_cross.uvh5 File exists; clobbering Writing averaged absolute of all visibiltiies to /lustre/aoc/projects/hera/h6c-analysis/IDR2/2459864/zen.2459864.46084.sum.smooth_calibrated.avg_abs_all.uvh5 File exists; clobbering
if SAVE_RESULTS:
# Create cache directory if it doesn't already exist
if not os.path.exists(filter_cache_dir):
os.mkdir(filter_cache_dir)
# For each new cache key save a new cache file in the cache directory
for key in new_cache_keys:
cache_file_basename = key[0] + '.filter_cache.p'
if not os.path.exists(os.path.join(filter_cache_dir, cache_file_basename)):
try:
cfile = open(os.path.join(filter_cache_dir, cache_file_basename), 'wb')
try:
pickle.dump({key: cache[key]}, cfile)
finally:
cfile.close()
except (OSError, IOError):
# File could not be written to, move on
pass
for repo in ['pyuvdata', 'hera_cal', 'hera_qm', 'hera_filters', 'hera_notebook_templates']:
exec(f'from {repo} import __version__')
print(f'{repo}: {__version__}')
pyuvdata: 2.4.1.dev9+gc10c105e hera_cal: 3.3.1.dev6+gf5f463ca hera_qm: 2.1.2 hera_filters: 0.1.4 hera_notebook_templates: 0.1.dev531+gfe314a8
print(f'Finished execution in {(time.time() - tstart) / 60:.2f} minutes.')
Finished execution in 3.51 minutes.