Single Baseline Filtering and Power Spectrum Estimation¶
by Josh Dillon, Bobby Pascua, Mike Wilensky, Jianrong Tan and Steven Murray, last updated March 11, 2025
This notebook is designed to take a single redundantly-averaged unique baseline (typically after LST-binning) and push it through all the way to the power spectrum. It operates on single files that contain a single baseline for all LSTs and both 'ee'
and 'nn'
polarizations. It then can:
- Throw out highly flagged times and/or channels
- Inpaint autocorrelations to produce a noise model, if necessary
- Inpaint cross-correlations (optional)
- Delay-filter cross-correlations (optional)
- De-interleave by time into multiple waterfalls with independent noise and rephased to the same set of LSTs
- Perform crosstalk notch filtering of the FR = 0 mode
- Perform main beam top hat fringe-rate filtering
- Convert to pseudo-Stokes I and Q
- Perform coherent time averaging
- Compute power spectra from pairs of interleaves
- Estimate noise, accounting for how the fringe-rate filter and the coherent
- Incoherent averaging over time and across interleave-pairs
This notebook also produces a series of plots and tables to illustrate the progress of the analysis. These include:
• Table 1: Band Definitions¶
• Figure 1: Bands and Flag Occupancy¶
• Table 2: Fringe-Rate and Crosstalk Filtering Ranges and Losses¶
• Figure 2: Waterfalls Before Delay Filtering and/or Inpainting¶
• Figure 3: Waterfalls After Delay Filtering and/or Inpainting¶
• Figure 4: First Set of De-Interleaved Waterfalls after Cross-Talk Filtering¶
• Figure 5: First Set of De-Interleaved Waterfalls after Main-Beam Fringe-Rate Filtering¶
• Figure 6: First Set of De-Interleaved Waterfalls after Coherent Time Averaging¶
• Figure 7: First Set of De-Interleaved Waterfalls after Forming Pseudo-Stokes I¶
• Figure 8: Interleave-Averaged Power Spectra (Pseudo-Stokes I, Q, U, & V) vs. LST-vs.-LST)¶
• Figure 9: Interleave-Averaged Power Spectrum SNR vs. LST (Real and Imaginary for pI))¶
• Figure 10: High Delay Power Spectrum SNR Histograms Before and After Incoherent Averaging¶
• Figure 11: Incoherently Averaged Power Spectrum with Error Bars¶
Imports and Parameters¶
TODO:
- Calculate signal loss due to redundant averaging
import time
tstart = time.time()
import os
os.environ['HDF5_USE_FILE_LOCKING'] = 'FALSE'
import h5py
import hdf5plugin # REQUIRED to have the compression plugins available
import numpy as np
import pandas as pd
import re
import matplotlib.pyplot as plt
import matplotlib
import copy
import warnings
from astropy import units
from scipy import constants, interpolate, special
from scipy.signal.windows import dpss
from pyuvdata import UVBeam
from pyuvdata import utils as uvutils
from hera_cal import io, utils, vis_clean, frf, datacontainer, noise, redcal
from hera_qm.time_series_metrics import true_stretches
from hera_filters import dspec
import hera_pspec as hp
import uvtools
from functools import reduce
from IPython.display import HTML
from pathlib import Path
from hera_notebook_templates.utils import parse_band_str
import importlib
%matplotlib inline
# Notebook settings
PLOT: bool = True
SAVE_RESULTS: bool = True
# Data settings
SINGLE_BL_FILE: str = Path("/lustre/aoc/projects/hera/Validation/H6C_IDR2/lstbin-outputs/redavg-smoothcal-inpaint-500ns-lstcal/inpaint/single_baseline_files/zen.LST.baseline.6_34.sum.uvh5")
OUT_PSPEC_FILE: str = None
OUT_TAVG_PSPEC_FILE: str = None
# Band settings
BAND_STR: str = ('50.1~62.2,63.7~73.5,74.6~85.4,108.1~115.8,117.3~124.4,125.5~135.9,138.8~147.1,'
'150.3~161.4,161.5~169.9,171.9~180.8,181.6~196.4,198.6~209.7,212.3~220.5,223.8~231.1') # in MHz
# Inpainting settings
ALREADY_INPAINTED: bool = True
PERFORM_INPAINT: bool = False
INPAINT_MIN_DLY: float = 500.0 # in ns
INPAINT_HORIZON: float = 1.0
INPAINT_STANDOFF: float = 0.0 # in ns
INPAINT_EIGENVAL_CUTOFF: float = 1e-12
# Delay filtering settings
PERFORM_DLY_FILT: bool = False
DLY_FILT_MIN_DLY: float = 150.0 # ns
DLY_FILT_HORIZON: float = 1.0
DLY_FILT_STANDOFF: float = 0.0 # ns
DLY_FILT_EIGENVAL_CUTOFF: float = 1e-12
# Flagging settings
USE_BAND_AVG_NSAMPLES: bool = True # for time filtering, time averaging, and power spectrum estimation
FLAG_COHERENT_CHUNKS: bool = False
FM_CUT_FREQ: float = 100e6 # in Hz
PIXEL_FLAG_CUT: float = 0.75
INTEGRATION_FLAG_CUT: float = 0.2
CHANNEL_FLAG_CUT: float = 0.5 # If neither PERFORM_INPAINT nor PERFORM_DLY_FILT, this is ignored
# FRF and time-averaging settings
NINTERLEAVE: int = 4 # number of interleaves to independently FRF
XTALK_FR: float = 0.01 # Fringe rate half-width in Hz used for fringe rate filtering crosstalk
FR_SPECTRA_FILE: str = Path("/lustre/aoc/projects/hera/zmartino/hera_frf/spectra_cache/spectra_cache_hera_core.h5")
FR_QUANTILE_LOW: float = 0.05
FR_QUANTILE_HIGH: float = 0.95
FR_EIGENVAL_CUTOFF: float = 1e-12
TARGET_AVERAGING_TIME: float = 300.0 # coherent integration time in seconds. Actual time might be less to so that all interleaves have the same number of samples averaged
USE_CORR_MATRIX: bool = True # Whether or not to use the correlation matrix to do the noise statistics correction factor
CORR_MATRIX_FREQ_DECIMATION: int = 10 # How much to decimate the frequency axis when getting the noise statistic correction factor
CORR_MATRIX_NOTCH_CUTOFF: float = 30.0 # Maximum EW-projection in meters for including the notch filter in error covariance calculation
# Power spectrum settings
EFIELD_HEALPIX_BEAM_FILE: str = Path("/lustre/aoc/projects/hera/h6c-analysis/IDR2/beams/NF_HERA_Vivaldi_efield_beam_healpix.fits")
TAPER: str = 'bh' # taper applied when doing power spectra
INCLUDE_INTERLEAVE_AUTO_PS: bool = False
STORE_WINDOW_FUNCTIONS: bool = False
# Debugging settings (for advanced users, otherwise leave all False)
USE_SIMULATED_NOISE: bool = False # replaces data with random white noise with the statistics of the autos
FLAT_AUTOS: bool = False # sets autos to flat 10000 Jy
NO_FLAGS_FLAT_NSAMPLES: bool = False # sets all flags to False and all nsamples to the median value for the baseline
SKIP_XTALK_AND_FRF: bool = False # doesn't perform any kind of time-based filtering
LST_MIN: float = 0.0 # hours
LST_MAX: float = 24.0 # hours
# Parameters
SINGLE_BL_FILE = "/lustre/aoc/projects/hera/Validation/H6C_IDR2/lstbin-outputs/redavg-smoothcal-inpaint-500ns-lstcal/inpaint/single_baseline_files/zen.LST.baseline.6_109.sum.uvh5"
OUT_PSPEC_FILE = "/lustre/aoc/projects/hera/Validation/H6C_IDR2/lstbin-outputs/redavg-smoothcal-inpaint-500ns-lstcal/inpaint/single_baseline_files/zen.LST.baseline.6_109.sum.pspec.h5"
LST_MAX = 5.75
LST_MIN = 1.25
SKIP_XTALK_AND_FRF = False
NO_FLAGS_FLAT_NSAMPLES = False
FLAT_AUTOS = False
USE_SIMULATED_NOISE = False
STORE_WINDOW_FUNCTIONS = False
INCLUDE_INTERLEAVE_AUTO_PS = False
TAPER = "bh"
EFIELD_HEALPIX_BEAM_FILE = (
"/lustre/aoc/projects/hera/H4C/beams/NF_HERA_Vivaldi_efield_beam_healpix.fits"
)
CORR_MATRIX_NOTCH_CUTOFF = 30.0
CORR_MATRIX_FREQ_DECIMATION = 10
USE_CORR_MATRIX = True
TARGET_AVERAGING_TIME = 300
FR_EIGENVAL_CUTOFF = 1e-12
FR_QUANTILE_HIGH = 0.95
FR_QUANTILE_LOW = 0.05
FR_SPECTRA_FILE = "/lustre/aoc/projects/hera/zmartino/hera_frf/spectra_cache/spectra_cache_hera_core.h5"
XTALK_FR = 0.01
NINTERLEAVE = 4
CHANNEL_FLAG_CUT = 0.0
INTEGRATION_FLAG_CUT = 0.2
PIXEL_FLAG_CUT = 0.0
FM_CUT_FREQ = 100000000.0
FLAG_COHERENT_CHUNKS = False
USE_BAND_AVG_NSAMPLES = True
DLY_FILT_EIGENVAL_CUTOFF = 1e-12
DLY_FILT_STANDOFF = 0.0
DLY_FILT_HORIZON = 1.0
DLY_FILT_MIN_DLY = 150.0
PERFORM_DLY_FILT = False
INPAINT_EIGENVAL_CUTOFF = 1e-12
INPAINT_STANDOFF = 0.0
INPAINT_HORIZON = 1.0
INPAINT_MIN_DLY = 500.0
PERFORM_INPAINT = False
ALREADY_INPAINTED = True
BAND_STR = "50.1~62.2,63.3~73.5,74.6~85.4,108.0~116.1,117.3~124.4,125.3~136.2,138.3~148.2,150.1~159.2,159.3~169.9,171.9~181.1,181.4~196.4,198.5~208.4,212.3~220.6,224.4~231.1"
OUT_TAVG_PSPEC_FILE = None
SAVE_RESULTS = True
PLOT = True
papermill_output_path = "/lustre/aoc/projects/hera/Validation/H6C_IDR2/lstbin-outputs/redavg-smoothcal-inpaint-500ns-lstcal/inpaint/single_baseline_files/zen.LST.baseline.6_109.sum.single_baseline_postprocessing_and_pspec.ipynb"
papermill_input_path = "/lustre/aoc/projects/hera/heramgr/anaconda3/envs/h6c_idr2_validation/lib/python3.12/site-packages/hera_notebook_templates/notebooks/single_baseline_postprocessing_and_pspec.ipynb"
if (not PERFORM_INPAINT) and (not PERFORM_DLY_FILT):
CHANNEL_FLAG_CUT = 0.0
PIXEL_FLAG_CUT = 0.0
SINGLE_BL_FILE = Path(SINGLE_BL_FILE)
FR_SPECTRA_FILE = Path(FR_SPECTRA_FILE)
EFIELD_HEALPIX_BEAM_FILE = Path(EFIELD_HEALPIX_BEAM_FILE)
if SAVE_RESULTS and OUT_PSPEC_FILE is None:
OUT_PSPEC_FILE = SINGLE_BL_FILE.with_suffix(f".pspec.h5")
if SAVE_RESULTS and OUT_TAVG_PSPEC_FILE is None:
OUT_TAVG_PSPEC_FILE = SINGLE_BL_FILE.with_suffix(f".tavg.pspec.h5")
Load Data¶
# figure out ANTPAIR and corresponding AUTO_BL_FILE
ANTPAIR = tuple([int(ant) for ant in re.search(r'\d+_\d+', SINGLE_BL_FILE.name).group().split('_')])
all_files = SINGLE_BL_FILE.parent.glob(SINGLE_BL_FILE.name.replace(f'{ANTPAIR[0]}_{ANTPAIR[1]}', '*'))
AUTO_BL_FILE = sorted([f for f in all_files if len(set(re.search(r'\d+_\d+', f.name).group().split('_'))) == 1])[0]
# load data for both crosses and autos with times corresponding only to those in the crosses,
single_bl_times = np.array(io.HERAData(SINGLE_BL_FILE).times)
hd = io.HERAData([AUTO_BL_FILE, SINGLE_BL_FILE])
data, flags, nsamples = hd.read(times=single_bl_times)
cross_bls = [ANTPAIR + (pol,) for pol in data.pols()]
# check that non-finite data is flagged and flagged data is set to 0
for bl in cross_bls:
assert np.all(flags[bl][~np.isfinite(data[bl])])
data[bl][~np.isfinite(data[bl])] = 0
data[bl][flags[bl]] = 0
auto_antpair = sorted(set([k[0:2] for k in data.bls() if k[0] == k[1]]))[0]
df = np.median(np.diff(data.freqs))
dt = np.median(np.diff(data.times)) * 24 * 3600
# Calculate averaging time that divides neatly into NINTERLEAVE
AVERAGING_TIME = TARGET_AVERAGING_TIME / (dt * (1 + 1e-10)) // NINTERLEAVE * (dt * (1 + 1e-10)) * NINTERLEAVE
print(f'Using an actual coherent averaging time of {AVERAGING_TIME:.3f} seconds to ensure even interleaving.')
Using an actual coherent averaging time of 270.592 seconds to ensure even interleaving.
NSamples Cut¶
# Removes all pixels that are more than 50% (by default) flagged relative to the maximum in that integration.
# This is done above and below FM separately because it's possible for a baseline to be entirely flagged above or below but not both.
FM_ind = np.argmin(np.abs(data.freqs - FM_CUT_FREQ))
for bl in cross_bls:
npix_flagged_before = np.sum(nsamples[bl] == 0)
for fslice in [slice(0, FM_ind), slice(FM_ind, len(data.freqs))]:
if fslice.start >= fslice.stop:
continue
flags[bl][:, fslice][(nsamples[bl][:, fslice] < PIXEL_FLAG_CUT * np.max(nsamples[bl][:, fslice], axis=1, keepdims=True))] = True
nsamples[bl][flags[bl]] = 0
print(f'{bl}: flagging {np.sum(nsamples[bl] == 0) - npix_flagged_before} pixels.')
# Remove all integrations that have fewer integrations than 20% (by default) of the best-observed integration
nsamples_by_time = np.sum([np.where(flags[bl], 0, nsamples[bl]) for bl in cross_bls], axis=(0, 2))
for bl in cross_bls:
print(f'{bl}: flagging {np.sum((nsamples_by_time < INTEGRATION_FLAG_CUT * np.max(nsamples_by_time)) & ~np.all(flags[bl], axis=1))} times.')
flags[bl][nsamples_by_time < INTEGRATION_FLAG_CUT * np.max(nsamples_by_time), :] = True
nsamples[bl][flags[bl]] = 0
# Remove all channels that are more than 50% (by default) flagged (relative to the best-observed channel)
nsamples_by_chan = np.sum([np.where(flags[bl], 0, nsamples[bl]) for bl in cross_bls], axis=(0, 1))
for bl in cross_bls:
print(f'{bl}: flagging {np.sum((nsamples_by_chan < CHANNEL_FLAG_CUT * np.max(nsamples_by_chan)) & ~np.all(flags[bl], axis=0))} channels.')
flags[bl][:, nsamples_by_chan < CHANNEL_FLAG_CUT * np.max(nsamples_by_chan)] = True
nsamples[bl][flags[bl]] = 0
(6, 109, 'nn'): flagging 0 pixels. (6, 109, 'ee'): flagging 0 pixels. (6, 109, 'ne'): flagging 0 pixels.
(6, 109, 'en'): flagging 0 pixels.
(6, 109, 'nn'): flagging 243 times. (6, 109, 'ee'): flagging 243 times. (6, 109, 'ne'): flagging 243 times. (6, 109, 'en'): flagging 243 times.
(6, 109, 'nn'): flagging 0 channels. (6, 109, 'ee'): flagging 0 channels. (6, 109, 'ne'): flagging 0 channels. (6, 109, 'en'): flagging 0 channels.
Define and show bands¶
df, bands, min_freqs, max_freqs, band_slices, nchans = parse_band_str(BAND_STR, data.freqs)
min_chan = [slc.start for slc in band_slices]
max_chan = [slc.stop for slc in band_slices]
all_zs = 1420405751.768 / data.freqs - 1
avg_zs = [np.mean(all_zs[slc]) for slc in band_slices]
bandwidth = [f'{nc * df / 1e6:.1f}' for nc in nchans]
def plot_bands():
plt.figure(figsize=(18, 6), dpi=100)
to_plot = np.mean([nsamples[ANTPAIR + ('ee',)], nsamples[ANTPAIR + ('nn',)]], axis=(0, 1))
plt.plot(data.freqs/1e6, to_plot, 'k.-', lw=.5, ms=4, label='Included in a band')
plt.xlabel('Frequency (MHz)')
plt.ylabel(f'Average Nsamples on {ANTPAIR}')
in_any_band = np.sum([(data.freqs / 1e6 <= band[1]) & (data.freqs / 1e6 >= band[0]) for band in bands], axis=0).astype(bool)
plt.plot(data.freqs[~in_any_band] / 1e6, to_plot[~in_any_band], 'r.', lw=.5, ms=4, label='Excluded from all bands')
plt.legend(loc='lower left')
for i, band in enumerate(bands):
plt.axvspan(band[0], band[1], alpha=.3, color=f'C{i}', zorder=0)
plt.text((band[0] + band[1]) / 2, np.max(to_plot) * 1.05, f'Band {i + 1}', ha='center', va='bottom',
bbox=dict(facecolor='w', edgecolor='black', alpha=.75, boxstyle='round'))
plt.ylim([-1, np.max(to_plot) * 1.1])
for freq in [117.19, 133.11, 152.25, 167.97]:
plt.axvline(freq, ls='--', color='k')
for i, freq in enumerate([(117.19 + 133.11)/2, (152.25 + 167.97)/2]):
plt.text(freq, np.max(to_plot) * .03, f'H1C IDR3\nBand {i + 1}', ha='center', va='bottom',
bbox=dict(facecolor='w', edgecolor='black', alpha=.75, boxstyle='round', ls='--'))
plt.tight_layout()
def show_band_table():
table = pd.DataFrame({'Band': np.arange(len(bands)) + 1,
'Channel Range': [f'{c0} — {c1}' for c0, c1 in zip(min_chan, max_chan)],
'# of Channels': nchans,
'Frequency Range (MHz)': [f'{f0:.1f} — {f1:.1f}' for f0, f1 in zip(min_freqs, max_freqs)],
'$\\Delta\\nu$ (MHz)': bandwidth,
'$z$ Range': [f'{1420.405751768 / f1 - 1:.2f} — {1420.405751768 / f0 - 1:.1f}' for f0, f1 in zip(min_freqs, max_freqs)],
'Center $z$': [f'{(1420.405751768 / f1 - 1) / 2 + (1420.405751768 / f0 - 1) / 2:.1f}' for f0, f1 in zip(min_freqs, max_freqs)],
'Delta $z$': [f'{(1420.405751768 / f0 - 1) - (1420.405751768 / f1 - 1):.1f}' for f0, f1 in zip(min_freqs, max_freqs)],
})
return table.style.hide().to_html()
Table 1: Band Definitions¶
HTML(show_band_table())
Band | Channel Range | # of Channels | Frequency Range (MHz) | $\Delta\nu$ (MHz) | $z$ Range | Center $z$ | Delta $z$ |
---|---|---|---|---|---|---|---|
1 | 27 — 126 | 99 | 50.2 — 62.2 | 12.1 | 21.82 — 27.3 | 24.6 | 5.5 |
2 | 135 — 218 | 83 | 63.3 — 73.5 | 10.1 | 18.33 — 21.4 | 19.9 | 3.1 |
3 | 227 — 316 | 89 | 74.6 — 85.4 | 10.9 | 15.63 — 18.0 | 16.8 | 2.4 |
4 | 501 — 567 | 66 | 108.0 — 116.1 | 8.1 | 11.24 — 12.1 | 11.7 | 0.9 |
5 | 577 — 635 | 58 | 117.3 — 124.4 | 7.1 | 10.42 — 11.1 | 10.8 | 0.7 |
6 | 643 — 732 | 89 | 125.4 — 136.2 | 10.9 | 9.43 — 10.3 | 9.9 | 0.9 |
7 | 749 — 830 | 81 | 138.3 — 148.2 | 9.9 | 8.59 — 9.3 | 8.9 | 0.7 |
8 | 846 — 920 | 74 | 150.1 — 159.2 | 9.0 | 7.92 — 8.5 | 8.2 | 0.5 |
9 | 921 — 1008 | 87 | 159.3 — 169.9 | 10.6 | 7.36 — 7.9 | 7.6 | 0.6 |
10 | 1024 — 1100 | 76 | 171.9 — 181.1 | 9.3 | 6.84 — 7.3 | 7.1 | 0.4 |
11 | 1102 — 1225 | 123 | 181.4 — 196.4 | 15.0 | 6.23 — 6.8 | 6.5 | 0.6 |
12 | 1242 — 1323 | 81 | 198.5 — 208.4 | 9.9 | 5.82 — 6.2 | 6.0 | 0.3 |
13 | 1355 — 1423 | 68 | 212.3 — 220.6 | 8.3 | 5.44 — 5.7 | 5.6 | 0.3 |
14 | 1454 — 1509 | 55 | 224.3 — 231.1 | 6.7 | 5.15 — 5.3 | 5.2 | 0.2 |
Figure 1: Bands and Flag Occupancy¶
This figure illustrates the definition of the various bands in which the power spectrum is to be estimated, which frequencies are included/excluded, as well as the showing the fraction of each channel flagged.
if PLOT: plot_bands()
Figure out slicing¶
# figure out high and low bands
FM_ind = np.argmin(np.abs(data.freqs - FM_CUT_FREQ))
unflagged_chans = np.argwhere(~np.all([np.all(flags[bl], axis=0) for bl in flags], axis=0)).squeeze()
if np.any(unflagged_chans < FM_ind):
low_band = slice(np.min(unflagged_chans), np.max(unflagged_chans[unflagged_chans < FM_ind]) + 1)
else:
low_band = slice(0,0)
if np.any(unflagged_chans > FM_ind):
high_band = slice(np.min(unflagged_chans[unflagged_chans > FM_ind]), np.max(unflagged_chans) + 1)
else:
high_band = slice(0,0)
print(f'Below FM Frequency Slice: {low_band}')
print(f'Above FM Frequency Slice: {high_band}')
# figure out the range of times that includes all unflagged times (though may still have some flags) for all polarizations
ORed_flags = np.any([np.all(flags[bl], axis=1) for bl in cross_bls], axis=0)
tslice = slice(true_stretches(~ORed_flags)[0].start, true_stretches(~ORed_flags)[-1].stop)
print(f'Time Slice Excluded Edge Flags: {tslice}')
Below FM Frequency Slice: slice(np.int64(0), np.int64(435), None) Above FM Frequency Slice: slice(np.int64(436), np.int64(1536), None) Time Slice Excluded Edge Flags: slice(np.int64(121), np.int64(3912), None)
# FOR DIAGNOSTICS/DEBUGGING ONLY: unflag everything and set nsamples to the median
if NO_FLAGS_FLAT_NSAMPLES:
for bl in cross_bls:
flags[bl][tslice, :] = False
nsamples[bl][tslice, :] = np.median([nsamples[bl][tslice, :] for bl in cross_bls])
# FOR DIAGNOSTICS/DEBUGGING ONLY: make all the autos a flat 10,000 Jy
if FLAT_AUTOS:
for bl in data:
if bl[0] == bl[1]:
data[bl] = 10000 * np.ones_like(data[bl])
Filtering and Systematics Mitigation¶
Plotting Functions¶
def sym_log_norm(to_plot, linthresh=10, clim=None):
'''Convenience interface for matplotlib.colors.SymLogNorm'''
if clim is None:
return matplotlib.colors.SymLogNorm(linthresh, vmin=-np.nanmax(np.abs(to_plot)), vmax=np.nanmax(np.abs(to_plot)))
else:
return matplotlib.colors.SymLogNorm(linthresh, vmin=clim[0], vmax=clim[1])
def plot_waterfall(data, bl=(ANTPAIR + ('ee',)), flags=flags, nsamples=nsamples, tslice=tslice):
'''Plots data (amplitude and phase) as well as nsamples waterfalls for a baseline.'''
if tslice is None:
tslice = slice(0, data[bl].shape[0], 1)
lsts = np.where(data.lsts > data.lsts[-1], data.lsts - 2 * np.pi, data.lsts)[tslice] * 12 / np.pi
extent = [data.freqs[0]/1e6, data.freqs[-1]/1e6, lsts[-1], lsts[0]]
fig, axes = plt.subplots(1, 3, figsize=(20, 12), sharex=True, sharey=True, dpi=100)
im = axes[0].imshow(np.where(flags[bl], np.nan, np.abs(data[bl]))[tslice], aspect='auto', norm=matplotlib.colors.LogNorm(), interpolation='none', cmap='inferno', extent=extent)
fig.colorbar(im, ax=axes[0], location='top', pad=.02, label=f'{bl}: Amplitude (Jy)')
im = axes[1].imshow(np.where(flags[bl], np.nan, np.angle(data[bl]))[tslice], aspect='auto', cmap='twilight', interpolation='none', extent=extent)
fig.colorbar(im, ax=axes[1], location='top', pad=.02, label=f'{bl}: Phase (Radians)')
im = axes[2].imshow(nsamples[bl][tslice], aspect='auto', interpolation='none', extent=extent)
fig.colorbar(im, ax=axes[2], location='top', pad=.02, label=f'{bl}: Number of Samples')
plt.tight_layout()
for ax in axes:
ax.set_ylabel('LST (Hours)')
ax.set_xlabel('Frequency (MHz)')
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
ax.set_yticklabels([f'{(int(val) if np.isclose(val, int(val)) else val) % 24:n}' for val in ax.get_yticks()])
plt.tight_layout()
def plot_real_delay_vs_lst(data, bl=(ANTPAIR + ('ee',)), flags=None, xlim=[-1999, 1999], clim=None, linthresh=10, taper=TAPER, tslice=tslice):
'''Plots the real part of the tapered FFT of the data in each power spectrum band as a function of delay and LST.'''
if tslice is None:
tslice = slice(0, data[bl].shape[0], 1)
lsts = np.where(data.lsts > data.lsts[-1], data.lsts - 2 * np.pi, data.lsts)[tslice] * 12 / np.pi
fig, axes = plt.subplots(1, len(bands), figsize=(28, 12), sharex=True, sharey=True, gridspec_kw={'wspace': .03}, dpi=100)
for i, (ax, band, band_slice) in enumerate(zip(axes, bands, band_slices)):
dfft = uvtools.utils.FFT(data[bl][tslice, band_slice], axis=1, taper=taper)
delays = uvtools.utils.fourier_freqs(data.freqs[band_slice]) * 1e9
to_plot = np.real(dfft)
if flags is not None:
flagged_times = np.all(flags[bl][tslice, band_slice], axis=1)
to_plot[flagged_times, :] = np.nan
if i == 0:
_to_plot = copy.deepcopy(to_plot)
ax.set_ylabel('LST (Hours)')
im = ax.imshow(to_plot, interpolation='none', aspect='auto', cmap='bwr', norm=sym_log_norm(_to_plot, linthresh=linthresh, clim=clim),
extent=[delays[0], delays[-1], lsts[-1], lsts[0]])
for dly in dly_filter_half_widths[0] * 1e9 * np.array([1, -1]):
ax.axvline(dly, ls='--', color='k', lw=.5)
for dly in inpaint_filter_half_widths[0] * 1e9 * np.array([1, -1]):
ax.axvline(dly, ls=':', color='k', lw=.5)
ax.set_xlim(xlim)
ax.set_title(f'Band {i+1}:\n{band[0]}—{band[1]} MHz', fontsize=10)
ax.set_xlabel('Delay (ns)')
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
ax.set_yticklabels([f'{(int(val) if np.isclose(val, int(val)) else val) % 24:n}' for val in ax.get_yticks()])
plt.colorbar(im, ax=axes, pad=.02, aspect=40, extend='both', label=f'Re$[\\widetilde{{V}}_{{{bl}}}]$ (Jy)')
def plot_dly_vs_fr(data, bl=(ANTPAIR + ('ee',)), xlim=[-1999, 1999], ylim=[-5, 5], clim=None, tslice=tslice, taper=TAPER):
'''Plots the magnitude of the 2D tapered FFT of the data in each power spectrum band as a function of delay and FR.
Also shows the foreground filtering delay, the main beam range of FRs, and the expected shape of mutual coupling.'''
fig, axes = plt.subplots(1, len(bands), figsize=(28, 6), sharex=True, sharey=True, gridspec_kw={'wspace': .03}, dpi=100)
if tslice is None:
tslice = slice(0, data[bl].shape[0], 1)
to_plots = []
for i, (ax, band, band_slice) in enumerate(zip(axes, bands, band_slices)):
dfft = uvtools.utils.FFT(data[bl][:, band_slice], axis=1, taper=TAPER)
delays = uvtools.utils.fourier_freqs(data.freqs[band_slice]) * 1e9
frates = uvtools.utils.fourier_freqs((data.times[tslice] - data.times[0]) * 24 * 60 * 60) * 1000
dfft2 = uvtools.utils.FFT(dfft[tslice, :], axis=0, taper=TAPER)
to_plot = np.abs(dfft2)
to_plots.append(to_plot)
if i == 0:
_to_plot = copy.deepcopy(to_plot)
ax.set_ylabel('Fringe Rate (mHz)')
im = ax.imshow(to_plot, interpolation='none', aspect='auto', cmap='turbo',
norm=matplotlib.colors.LogNorm(vmin=(clim[0] if clim is not None else np.min(_to_plot)),
vmax=(clim[1] if clim is not None else np.max(_to_plot))),
extent=[delays[0], delays[-1], frates[-1], frates[0]])
for dly in dly_filter_half_widths[0] * 1e9 * np.array([1, -1]):
ax.axvline(dly, ls='--', color='k', lw=.5)
for dly in inpaint_filter_half_widths[0] * 1e9 * np.array([1, -1]):
ax.axvline(dly, ls=':', color='k', lw=.5)
for fr in fr_ranges[band]:# + (pol,)]:
ax.axhline(fr, ls='--', color='k', lw=.5)
ax.set_ylim(ylim)
ax.set_xlim(xlim)
ax.set_title(f'Band {i+1}:\n{band[0]}—{band[1]} MHz', fontsize=10)
ax.set_xlabel('Delay (ns)')
plt.colorbar(im, ax=axes, pad=.02, aspect=40, extend='both', label=f'$|\\widetilde{{V}}_{{{bl}}}|$ (Jy)')
omega_earth = 2 * np.pi / (24 * 3600) #rad/s
hera_dec = -30.72152612068925 * np.pi / 180
def calculate_fr(bl_vec, freq):
"""bl_vec in meters, freq in Hz, returns fr in mHz"""
bl_we = bl_vec[0]
fr = -bl_we * omega_earth * freq * np.cos(hera_dec) / constants.c * 1e3
return fr
def max_fr(tau, freq):
"""takes in tau in ns, freq in Hz, returns corresponding max fringe rate in mHz"""
return omega_earth * freq * tau * np.cos(hera_dec) * 1e-6 # mHz
b_ij = data.antpos[bl[1]] - data.antpos[bl[0]]
for i in range(len(bands)):
freq = np.mean(bands[i]) * 1e6
axes[i].plot(delays, max_fr(delays, freq) + calculate_fr(b_ij, freq), color='k', ls='--', lw=.5)
axes[i].plot(delays, -max_fr(delays, freq) + calculate_fr(b_ij, freq), color='k', ls='--', lw=.5)
Filtering and Post-Processing Functions¶
def delay_filter(data, wgts, filter_centers, filter_half_widths, eigenval_cutoff, cache={}, bls=None, zeros_where_zero_wgt=True):
'''This function performs a high-pass delay filter, removing the wedge plus some buffer. It also performs inpainting with the same delay.'''
dly_filt_data = copy.deepcopy(data)
inpainted_data = copy.deepcopy(data)
if bls is None:
bls = cross_bls
for bl in bls:
d_mdl = np.zeros_like(dly_filt_data[bl])
for band in [low_band, high_band]:
if band.start >= band.stop:
# This can happen if the frequencies are all above/below FM
continue
d_mdl[:, band], _, info = dspec.fourier_filter(data.freqs[band], dly_filt_data[bl][:, band], wgts=wgts[bl][:, band], filter_centers=filter_centers,
filter_half_widths=filter_half_widths, mode='dpss_solve',
eigenval_cutoff=[eigenval_cutoff], suppression_factors=[eigenval_cutoff],
max_contiguous_edge_flags=len(data.freqs), cache=cache)
if zeros_where_zero_wgt:
dly_filt_data[bl] = np.where(wgts[bl] == 0, 0, dly_filt_data[bl] - d_mdl)
else:
dly_filt_data[bl] = dly_filt_data[bl] - d_mdl
inpainted_data[bl] = np.where(wgts[bl] == 0, d_mdl, data[bl])
return dly_filt_data, inpainted_data
def xtalk_filter(data, wgts, xtalk_fr=XTALK_FR, tslice=tslice, cache={}, bls=None):
'''This function performs a high-pass filter in fringe rate, removing some small range around the 0 FR mode.'''
xtalk_filt_data = copy.deepcopy(data)
if bls is None:
bls = cross_bls
for bl in bls:
if tslice is None:
tslice = slice(0, data[bl].shape[0], 1)
d_mdl, _, info = dspec.fourier_filter(data.times[tslice] * 24 * 60 * 60, data[bl][tslice],
wgts=wgts[bl][tslice, :], filter_centers=[0],
filter_half_widths=[xtalk_fr / 1000], mode='dpss_solve',
eigenval_cutoff=[FR_EIGENVAL_CUTOFF], suppression_factors=[FR_EIGENVAL_CUTOFF],
max_contiguous_edge_flags=len(data.times), cache=cache, filter_dims=0)
xtalk_filt_data[bl][tslice, :] = np.where(wgts[bl][tslice, :] == 0, 0, xtalk_filt_data[bl][tslice, :] - d_mdl)
return xtalk_filt_data
def main_beam_FR_filter(data, wgts, tslice=tslice, cache={}, bls=None):
'''This function performs fringe rate filtering, keeping a range determined by fr_ranges for each band.'''
frf_data = copy.deepcopy(data)
if bls is None:
bls = cross_bls
info = {}
for bl in bls:
if tslice is None:
tslice = slice(0, data[bl].shape[0], 1)
info[bl] = {}
d_mdl = np.zeros_like(data[bl])
for band, band_slice in zip(bands, band_slices):
d_mdl[tslice, band_slice], _, info_band = dspec.fourier_filter(
data.times[tslice] * 24 * 60 * 60,
data[bl][tslice, band_slice],
wgts=wgts[bl][tslice, band_slice],
filter_centers=[np.mean(fr_ranges[band]) / 1000],
filter_half_widths=[np.diff(fr_ranges[band]) / 2 / 1000],
mode='dpss_solve',
eigenval_cutoff=[FR_EIGENVAL_CUTOFF],
suppression_factors=[FR_EIGENVAL_CUTOFF],
max_contiguous_edge_flags=len(data.times),
cache=cache,
filter_dims=0
)
info[bl][band] = info_band
frf_data[bl] *= 0
frf_data[bl][tslice, :] = np.where(wgts[bl][tslice, :] == 0, 0, d_mdl[tslice, :])
return frf_data, info
def form_pseudostokes(data=None, flags=None, nsamples=None, pol_convention=hd.pol_convention, x_orientation=hd.x_orientation):
'''This function uses hera_pspec.pstokes to supplement existing datacontainers with pstokes I, Q, U, and/or V (where possible).'''
for ap in data.antpairs():
# loop over pseudo-stokes parameters
iter_list = [('ee', 'nn', 'pI'), ('ee', 'nn', 'pQ'), ('en', 'ne', 'pU'), ('en', 'ne', 'pV')]
for pol1, pol2, pstokes in iter_list:
bl1 = ap + (pol1,)
bl2 = ap + (pol2,)
data_list = ([data[bl1], data[bl2]] if (data is not None) and (bl1 in data) and (bl2 in data) else None)
flags_list = ([flags[bl1], flags[bl2]] if (flags is not None) and (bl1 in flags) and (bl2 in flags) else None)
nsamples_list = ([nsamples[bl1], nsamples[bl2]] if (nsamples is not None) and (bl1 in nsamples) and (bl2 in nsamples) else None)
# use hp.pstokes._combine_pol_arrays() to properly combine data/flags/nsamples in a pol_convention-aware way
(combined_data,
combined_flags,
combined_nsamples) = hp.pstokes._combine_pol_arrays(pol1, pol2, pstokes,
pol_convention=pol_convention,
data_list=data_list,
flags_list=flags_list,
nsamples_list=nsamples_list,
x_orientation=x_orientation)
# put results in original data containers
if data is not None:
data[ap + (pstokes,)] = combined_data
if flags is not None:
flags[ap + (pstokes,)] = combined_flags
if nsamples is not None:
nsamples[ap + (pstokes,)] = combined_nsamples
def timeavg_data(data, flags, nsamples, Navg=int(np.round(AVERAGING_TIME / (dt * NINTERLEAVE))), pols=None, tslice=slice(None), rephase=True):
'''Performs coherent averaging of Navg integrations, rephasing to the common phase center.'''
avg_data = datacontainer.DataContainer({})
avg_flags = datacontainer.DataContainer({})
avg_nsamples = datacontainer.DataContainer({})
# perform time-averaging
for bl in data:
if (pols is not None) and (bl[2] not in pols):
continue
bl_vec = data.antpos[bl[0]] - data.antpos[bl[1]]
(avg_data[bl],
avg_flags[bl],
avg_nsamples[bl],
avg_lsts,
extra) = frf.timeavg_waterfall(data[bl][tslice, :], Navg,
flags=np.zeros_like(flags[bl][tslice, :]),
nsamples=nsamples[bl][tslice, :],
extra_arrays={'times': data.times[tslice]},
lsts=data.lsts[tslice], freqs=data.freqs,
rephase=rephase, bl_vec=bl_vec, verbose=False)
avg_flags[bl][avg_nsamples[bl] == 0] = True # TODO: is this right???
# attach relevant quantities to datacontainer
for dc in (avg_data, avg_flags, avg_nsamples):
dc.freqs = copy.deepcopy(data.freqs)
dc.antpos = copy.deepcopy(data.antpos)
dc.lsts = avg_lsts
dc.times = extra['avg_times']
return avg_data, avg_flags, avg_nsamples
Figure out delay filter properties¶
bl_vec = (data.antpos[ANTPAIR[1]] - data.antpos[ANTPAIR[0]])
bl_len = np.linalg.norm(bl_vec[:2]) / constants.c
dly_filter_centers, dly_filter_half_widths = vis_clean.gen_filter_properties(
ax='freq', horizon=DLY_FILT_HORIZON, standoff=DLY_FILT_STANDOFF,
min_dly=DLY_FILT_MIN_DLY, bl_len=bl_len
)
inpaint_filter_centers, inpaint_filter_half_widths = vis_clean.gen_filter_properties(
ax='freq', horizon=INPAINT_HORIZON, standoff=INPAINT_STANDOFF,
min_dly=INPAINT_MIN_DLY, bl_len=bl_len
)
If desired, propagate flags on a channel to all times that will be coherently averaged together¶
This is primarily useful in the delay-filtered case with flags, where we want to avoid different interleaves having different flagging patterns.
def factored_flags(flag_array, tslice=slice(None)):
'''If any channel is flagged in a boolean flag_array, flag all times for that channel unless the channel flag comes from an entirely flagged time.'''
out_flags = np.zeros_like(flag_array)
flagged_times = np.all(flag_array, axis=1)
out_flags[flagged_times, :] = True
if not np.all(flagged_times) :
chan_flags = np.any(flag_array[~flagged_times], axis=0)
out_flags[:, chan_flags] = True
return out_flags
if FLAG_COHERENT_CHUNKS:
Nchunk = int(AVERAGING_TIME // (dt * NINTERLEAVE)) * NINTERLEAVE
for ci in range(int(np.ceil(flags[bl][tslice].shape[0] / Nchunk))):
for bl in flags:
flags[bl][tslice][ci * Nchunk:(ci + 1) * Nchunk] = factored_flags(flags[bl][tslice][ci * Nchunk:(ci + 1) * Nchunk], tslice=tslice)
Perform inpainting and/or delay-filtering¶
def per_band_avg_nsamples(nsamples, band_slices):
'''Create new datacontainer where nsamples has been averaged per-integration in band_slices.
This is an approximate way to account for the fact that inpainted data is considered Nsamples=0.'''
out_nsamples = copy.deepcopy(nsamples)
for bl in out_nsamples:
for band in band_slices:
for i in range(out_nsamples[bl].shape[0]):
out_nsamples[bl][i, band] = np.mean(out_nsamples[bl][i, band])
return out_nsamples
def build_weights(data, flags, nsamples, wgt_by_avg_nsamples=False, band_slices=[]):
'''Construct weights proportional to inverse noise variance (i.e. Nsamples / Autocorr^2).
If wgt_by_avg_nsamples is True, average Nsamples in each subband (defined with band_slices).
This avoids the introduction of spectral structure.'''
wgts = copy.deepcopy(data)
if wgt_by_avg_nsamples:
nsamples_here = per_band_avg_nsamples(nsamples, band_slices)
else:
nsamples_here = copy.deepcopy(nsamples)
for bl in wgts:
auto_bl = auto_antpair + bl[2:]
wgts[bl] = np.where(flags[bl], 0, np.abs(data[auto_bl])**-2 * nsamples_here[bl])
wgts[bl] /= np.abs(np.nanmean(np.where(flags[bl], np.nan, wgts[bl]))) # avoid dynamic range issues
wgts[bl][~np.isfinite(wgts[bl])] = 0
return wgts
# Build weights for delay-filter and/or inpainting that don't involve any Nsample averaging
freq_filt_wgts = build_weights(data, flags, nsamples)
# Inpaint autocorrelations to allow for prediction of thermal noise on every channel
if not ALREADY_INPAINTED:
_, data = delay_filter(data, freq_filt_wgts, inpaint_filter_centers, inpaint_filter_half_widths, INPAINT_EIGENVAL_CUTOFF,
bls=[auto_antpair + (pol,) for pol in ['ee', 'nn']])
filt_data = copy.deepcopy(data)
filt_flags = copy.deepcopy(flags)
filt_nsamples = copy.deepcopy(nsamples)
# This cell replaces data with appropriate noise, which is useful for debugging
if USE_SIMULATED_NOISE:
np.random.seed(21)
reds = redcal.get_reds(filt_data.data_antpos, pols=filt_data.pols(), include_autos=True, bl_error_tol=2.0)
red_inpainted = datacontainer.RedDataContainer(data, reds=reds)
for bl in cross_bls:
predicted_var = noise.predict_noise_variance_from_autos(bl, red_inpainted, nsamples=nsamples)
predicted_var = np.where(~np.isfinite(predicted_var), 0, predicted_var)
filt_data[bl] = np.sqrt(predicted_var) / 2**.5 * (np.random.randn(hd.Ntimes, hd.Nfreqs) + 1.0j * np.random.randn(hd.Ntimes, hd.Nfreqs))
# Inpaint crosses
if PERFORM_INPAINT:
_, filt_data = delay_filter(data, freq_filt_wgts, inpaint_filter_centers, inpaint_filter_half_widths, INPAINT_EIGENVAL_CUTOFF)
for bl in cross_bls:
for band in [high_band, low_band]:
if band.start >= band.stop:
continue
for i in range(filt_flags[bl].shape[0]):
if not np.all(filt_flags[bl][i, band]):
filt_flags[bl][i, band] = False
inpainted = copy.deepcopy(filt_data)
# perform delay filtering on crosses
if PERFORM_DLY_FILT:
filt_data, _ = delay_filter(inpainted, freq_filt_wgts, dly_filter_centers, dly_filter_half_widths, DLY_FILT_EIGENVAL_CUTOFF, zeros_where_zero_wgt=False)
# Recompute time filter flags, averaging nsamples in subbands if desired (this also applies to further processing)
if USE_BAND_AVG_NSAMPLES:
filt_nsamples = per_band_avg_nsamples(filt_nsamples, band_slices)
time_filt_wgts = build_weights(filt_data, filt_flags, filt_nsamples)
# perform deinterleaving
deint_filt_data = filt_data.deinterleave(NINTERLEAVE, tslice=tslice)
deint_flags = filt_flags.deinterleave(NINTERLEAVE, tslice=tslice)
deint_nsamples = filt_nsamples.deinterleave(NINTERLEAVE, tslice=tslice)
deint_wgts = time_filt_wgts.deinterleave(NINTERLEAVE, tslice=tslice)
Figure out per-band fringe-rate filter ranges¶
# TODO: graduate this code into hera_cal
# load relevant FR spectrum vs. frequency and associated metadata
with h5py.File(FR_SPECTRA_FILE, "r") as h5f:
metadata = h5f["metadata"]
bl_to_index_map = {tuple(ap): int(index) for index, antpairs in metadata["baseline_groups"].items() for ap in antpairs}
spectrum_freqs = metadata["frequencies_MHz"][()] * 1e6
m_modes = metadata["erh_mode_integer_index"][()]
if ANTPAIR in bl_to_index_map:
mmode_spectrum = h5f["erh_mode_power_spectrum"][:, :, bl_to_index_map[ANTPAIR]]
else:
# If ANTPAIR is not in the FR_SPECTRA_FILE, but the reverse is, also reverse the spectrum
mmode_spectrum = h5f["erh_mode_power_spectrum"][:, :, bl_to_index_map[ANTPAIR[::-1]]]
m_modes *= -1
# TODO: graduate this code into hera_cal
# convert to fringe rate, accouting for the fact that we have less than 24 hours of LST
def m2f(m_modes):
# Convert m-modes to fringe-rates in mHz.
return m_modes / units.sday.to(units.ks)
times_ks = (data.times[tslice] - data.times[0] + np.median(np.diff(data.times))) * units.day.to(units.ks)
m2f_phasors = np.exp(2j * np.pi * m2f(m_modes)[None, :] * times_ks[:, None])
m2f_mixer = np.fft.fftshift(np.fft.fft(np.fft.ifftshift(m2f_phasors, axes=0), axis=0), axes=0)
# f is fringe rate, m is m-mode, n is nu (i.e. freqeuency)
fr_spectrum = np.abs(np.einsum('fm,mn,mf->fn', m2f_mixer, mmode_spectrum, m2f_mixer.T.conj()))
# TODO: graduate this code into hera_cal
# interpolate to all frequencies in data
interp_fr_spectrum = interpolate.interp1d(spectrum_freqs, fr_spectrum, kind='cubic', fill_value='extrapolate')(data.freqs)
frates = np.fft.fftshift(np.fft.fftfreq(len(times_ks), d=np.median(np.diff(times_ks))))
# perform window-weighted average over each band, then get lower and upper quantiles
fr_ranges = {}
fr_profiles = {}
frf_losses = {}
xtalk_overlaps = {}
for band, bs in zip(bands, band_slices):
taper = dspec.gen_window(TAPER, len(data.freqs[bs]))
band_avg_fr_spectrum = np.average(interp_fr_spectrum[:, bs], weights=taper**2, axis=1)
band_avg_fr_spectrum /= np.sum(band_avg_fr_spectrum)
fr_profiles[band] = band_avg_fr_spectrum
cumsum_interpolator = interpolate.interp1d(np.cumsum(band_avg_fr_spectrum), frates)
fr_ranges[band] = cumsum_interpolator(FR_QUANTILE_LOW), cumsum_interpolator(FR_QUANTILE_HIGH)
# account for overlap between FR=0 notch and main beam FRF
def overlap_frs(frs1, frs2):
start = np.maximum(frs1[0], frs2[0])
end = np.minimum(frs1[1], frs2[1])
return (start, end) if start < end else None
frate_interpolator = interpolate.interp1d(frates, np.cumsum(band_avg_fr_spectrum))
frf_losses[band] = 1 - frate_interpolator(fr_ranges[band][1]) + frate_interpolator(fr_ranges[band][0])
xtalk_overlaps[band] = overlap_frs(fr_ranges[band], [-XTALK_FR, XTALK_FR])
if xtalk_overlaps[band] is not None:
frf_losses[band] += frate_interpolator(xtalk_overlaps[band][1]) - frate_interpolator(xtalk_overlaps[band][0])
# Helper fft functions for signal loss calculation.
def FFT(data, axis):
"""Thin wrapper around fft stuff."""
return np.fft.fftshift(np.fft.fft(np.fft.ifftshift(data, axes=axis), axis=axis), axes=axis)
def IFFT(data, axis):
"""Thin wrapper around ifft stuff."""
return np.fft.ifftshift(np.fft.ifft(np.fft.fftshift(data, axes=axis), axis=axis), axes=axis)
# TODO: graduate this code to hera_cal
def construct_filter(times, fc, fhw, eigval_cutoff=FR_EIGENVAL_CUTOFF, wgts=None):
"""Compute the time-domain DPSS filter matrix.
This function essentially computes a generalization of Eq. 3.17 from Pascua+ 2024
to allow for non-uniform weighting in time. If the DPSS filter design matrix is A,
and the weights are W, then the filter matrix T is
T = A @ (A^\dag @ W @ A)^{-1} @ (W @ A)^\dag.
The design matrix A is Eq. 3.14 from Pascua+ 2024.
"""
if wgts is None:
wgts = np.ones(times.size)
# Compute the phasor for shifting the DPSS modes to the filter center.
frf_phasor = np.exp(-2j * np.pi * fc * (times-times.mean()))
# Compute the B*W parameter for defining the DPSS modes.
half_bandwidth = (times[-1] - times[0]) * fhw
# Approximation for the extra number of modes to compute beyond 2*B*W
n_extra_modes = int(
4 * np.log(4*times.size) * np.log(4/eigval_cutoff) / np.pi**2
)
# Generate the DPSS modes used for filtering.
n_modes = int(2 * half_bandwidth) + n_extra_modes
modes, eigvals = dpss(
times.size, half_bandwidth, Kmax=n_modes, return_ratios=True
)
# Apply the eigenvalue cutoff and rephase to the filter center.
cut = np.argwhere(eigvals >= eigval_cutoff).flatten()[-1]
modes = modes[:cut].T * frf_phasor[:,None] # Shape (n_times, n_dpss)
# Compute the filter matrix according to a weighted least-squares fit.
ATW = modes.T.conj() * wgts
return modes @ np.linalg.solve(ATW @ modes, ATW)
# TODO: incorporate coherent average and time-interleaving to signal loss calculation.
# Update frf signal loss calculation.
frf_losses = {}
for band, band_slice in zip(bands, band_slices):
# TODO: figure out how to handle frequency and polarization-dependent weights.
# The weights change as a function of frequency (due to the inverse noise weighting),
# so this isn't entirely accurate; will need to update in the future.
# Also need to more carefully consider how to combine the weights across polarization,
# and which precise set of weights to use when filtering the data.
wgts = np.mean([time_filt_wgts[bl][tslice,band_slice] for bl in cross_bls if bl[2][0] == bl[2][1]], axis=(0,2))
if np.all(wgts==0):
frf_losses[band] = np.nan
continue
# Compute the filter center and filter half-width.
fmin, fmax = fr_ranges[band]
fc = 0.5 * (fmin+fmax) # mHz
fhw = 0.5 * np.abs(fmax-fmin) # mHz
# Generate the main-lobe and crosstalk time-time filter matrices.
main_lobe_filt = construct_filter(times_ks, fc, fhw, eigval_cutoff=FR_EIGENVAL_CUTOFF, wgts=wgts)
xt_filt = np.eye(times_ks.size) - construct_filter(times_ks, 0, XTALK_FR, eigval_cutoff=FR_EIGENVAL_CUTOFF, wgts=wgts)
# Fourier transform to obtain the fringe-rate filter matrices.
filter_xfer_matrix = FFT(IFFT(main_lobe_filt @ xt_filt, axis=0), axis=1)
# Now compute the signal loss.
filtered_power = np.sum(fr_profiles[band][None,:] * np.abs(filter_xfer_matrix)**2)
unfiltered_power = np.sum(fr_profiles[band])
frf_losses[band] = 1 - filtered_power / unfiltered_power
def show_FR_table():
table = pd.DataFrame({'Band': np.arange(len(bands)) + 1,
'Frequency Range (MHz)': [f'{f0:.1f} — {f1:.1f}' for f0, f1 in zip(min_freqs, max_freqs)],
f'Main Beam {FR_QUANTILE_LOW:.0%} — {FR_QUANTILE_HIGH:.0%}<br>Kept Fringe Rates (mHz)': [f'{frs[0]:.3f} to {frs[1]:.3f}' for frs in fr_ranges.values()],
f'Signal Loss with<br>{-XTALK_FR} to {XTALK_FR} mHz X-Talk Filter': [f'{loss:.1%}' for loss in frf_losses.values()],
})
return table.style.hide().to_html()
Table 2: Fringe-Rate and Crosstalk Filtering Ranges and Losses¶
The losses computed here are based on an extension of the framework from Pascua+ 2024 to allow for non-uniform time weighting. These losses include contributions from both the main beam filter and the crosstalk filter, but do not include losses from coherent time averaging or the time-interleaved incoherent average.
HTML(show_FR_table())
Band | Frequency Range (MHz) | Main Beam 5% — 95% Kept Fringe Rates (mHz) |
Signal Loss with -0.01 to 0.01 mHz X-Talk Filter |
---|---|---|---|
1 | 50.2 — 62.2 | -0.512 to -0.172 | 3.8% |
2 | 63.3 — 73.5 | -0.625 to -0.097 | 7.0% |
3 | 74.6 — 85.4 | -0.686 to -0.287 | 4.7% |
4 | 108.0 — 116.1 | -0.891 to -0.496 | 2.4% |
5 | 117.3 — 124.4 | -0.950 to -0.546 | 2.4% |
6 | 125.4 — 136.2 | -1.018 to -0.602 | 2.6% |
7 | 138.3 — 148.2 | -1.107 to -0.665 | 2.9% |
8 | 150.1 — 159.2 | -1.189 to -0.723 | 2.5% |
9 | 159.3 — 169.9 | -1.253 to -0.780 | 2.6% |
10 | 171.9 — 181.1 | -1.335 to -0.843 | 2.6% |
11 | 181.4 — 196.4 | -1.422 to -0.907 | 2.7% |
12 | 198.5 — 208.4 | -1.519 to -0.989 | 2.7% |
13 | 212.3 — 220.6 | -1.610 to -1.057 | 2.8% |
14 | 224.3 — 231.1 | -1.690 to -1.114 | 2.9% |
# precomputation for rephasing before coherent averaging, ensuring all interleaves are rephased to the same lst
bl_vec = {bl: data.antpos[bl[0]] - data.antpos[bl[1]] for bl in [ap + (pol,) for ap in data.antpairs()
for pol in (utils._VISPOLS | set(utils.POL_STR2NUM_DICT))]}
Navg = int(AVERAGING_TIME // (dt * NINTERLEAVE))
n_avg_int = int(np.ceil(len(deint_filt_data[0].lsts) / Navg))
target_lsts = [np.mean([np.unwrap(d.lsts)[i * Navg:(i+1) * Navg] for d in deint_filt_data]) for i in range(n_avg_int)]
Run time filtering, time averaging, and form pseudo-Stokes¶
# Initialize arrays for storing intermediate and final results for each interleave
deint_avg_data, deint_avg_flags, deint_avg_nsamples = [], [], []
deint_xtalk_filt_data, deint_frf_data = [], []
# List of dicts saying whether main lobe FRF was skipped or not
FRF_info = []
for d, f, n, w in zip(deint_filt_data, deint_flags, deint_nsamples, deint_wgts):
# Perform full time-filtering
if not SKIP_XTALK_AND_FRF:
# Crosstalk filtering of FR=0 mode
xtalk_filt_d = xtalk_filter(d, w, tslice=None)
deint_xtalk_filt_data.append(xtalk_filt_d)
# Main beam FRF and forming pseduostokes
frf_d, info_stream = main_beam_FR_filter(xtalk_filt_d, w, tslice=None)
FRF_info.append(info_stream)
deint_frf_data.append(frf_d)
else:
deint_xtalk_filt_data.append(d)
deint_frf_data.append(d)
# Coherent time-averaging, rephasing to a set of lsts that's consistent across interleaves
dlst = [target_lsts[i] - l for i in range(n_avg_int) for l in np.unwrap(d.lsts)[i * Navg:(i+1) * Navg]]
rephased_frf_d = copy.deepcopy(deint_frf_data[-1])
utils.lst_rephase(rephased_frf_d, bl_vec, d.freqs, dlst, lat=hd.telescope_location_lat_lon_alt_degrees[0], inplace=True)
pstokes_pols = sorted([pol for pol in rephased_frf_d.pols() if utils.polstr2num(pol, x_orientation=hd.x_orientation) > 0])
avg_d, avg_f, avg_n = timeavg_data(rephased_frf_d, f, n, Navg=int(AVERAGING_TIME // (dt * NINTERLEAVE)), rephase=False)
# Form pseudo-Stokes I, Q, U, and V from ee and nn
form_pseudostokes(avg_d, avg_f, avg_n)
# Store Results
deint_avg_data.append(avg_d)
deint_avg_flags.append(avg_f)
deint_avg_nsamples.append(avg_n)
Compute correction factor for coherent averaging, either approximately with mode counting or more exactly with the FRF noise covariances (set USE_CORR_MATRIX=True)¶
# TODO: this function should probably be graduated into hera_pspec or hera_cal
def dpss_coherent_avg_correction(spw):
'''This function computes an approximate correction to the noise calculation after fringe-rate filtering. It assumes the that number of integrations
that are coherently averaged together is equal the ratio of the number of integrations per interleave divided by the number of FR modes kept. This
is then used to correct the calculation done in hera_pspec, which doesn't know about the FRF. The actual noise power spectrum is reduced by this factor
compared to what naively comes out of hera_pspec. However, when performing incoherent averaging of power spectra, one needs to raise noise power spectrum
by the square-root of this factor to account for the correlations between coherently-averaged power spectrum bins.'''
if SKIP_XTALK_AND_FRF:
coherent_avg_correction_factor = 1.0
else:
band = bands[spw]
time_in_seconds = (deint_filt_data[0].times - deint_filt_data[0].times.min()) * 60 * 60 * 24 # time array in seconds
time_filters = dspec.dpss_operator(time_in_seconds, [np.mean(fr_ranges[band]) / 1000],
[np.diff(fr_ranges[band]) / 2 / 1000], eigenval_cutoff=[FR_EIGENVAL_CUTOFF])[0].real
# count the effective number of integrations that go into each coherent average, accounting for overlap with the xtalk filter
if xtalk_overlaps[band] is None:
actual_integrations_per_coherent_avg = time_filters.shape[0] / time_filters.shape[1] # ratio of total number of DPSS FR modes to modes kept after filtering
else:
overlap_filters = dspec.dpss_operator(time_in_seconds, [np.mean(xtalk_overlaps[band]) / 1000],
[np.diff(xtalk_overlaps[band]) / 2 / 1000], eigenval_cutoff=[FR_EIGENVAL_CUTOFF])[0].real
actual_integrations_per_coherent_avg = time_filters.shape[0] / (time_filters.shape[1] - overlap_filters.shape[1])
integrations_per_coherent_avg = int(AVERAGING_TIME // (dt * NINTERLEAVE))
coherent_avg_correction_factor = actual_integrations_per_coherent_avg / integrations_per_coherent_avg
return coherent_avg_correction_factor
def get_frop_wrapper(mode="main_lobe", pol="nn", stream_ind=0, band_ind=0, t_avg=AVERAGING_TIME,
rephase=True, wgt_tavg_by_nsample=True, bl_vec=None,
dlst=None, coherent_avg=True, times=None,
freq_decimation=CORR_MATRIX_FREQ_DECIMATION):
"""
This wraps hera_cal.frf.get_frop_for_noise using specific information from this notebook so that we can do
post FRF time-time visibility covariance calculation. It returns an operator that can be used on a dynamic
spectrum to filter it down the time axis.
"""
bl=(ANTPAIR[0], ANTPAIR[1], pol)
band = bands[band_ind]
band_slice = band_slices[band_ind]
if freq_decimation > 1:
band_slice = slice(
band_slice.start,
band_slice.stop,
freq_decimation
)
weights=deint_wgts[stream_ind][bl][:, band_slice]
nsamples = deint_nsamples[stream_ind][bl][:, band_slice]
if times is None:
times = np.modf(single_bl_times[tslice][stream_ind::NINTERLEAVE])[0] * 24 * 3600
times = times - times[0]
if mode == "main_lobe":
filter_cent_use = [np.mean(fr_ranges[band]) / 1000]
filter_half_wid_use = [np.diff(fr_ranges[band]) / 2 / 1000]
elif mode == "xtalk":
filter_cent_use = [0]
filter_half_wid_use=[XTALK_FR / 1000]
frop = frf.get_frop_for_noise(times, filter_cent_use=filter_cent_use,
filter_half_wid_use=filter_half_wid_use,
freqs=data.freqs[band_slice], t_avg=t_avg,
eigenval_cutoff=FR_EIGENVAL_CUTOFF, weights=weights,
rephase=rephase, wgt_tavg_by_nsample=wgt_tavg_by_nsample,
nsamples=nsamples, bl_vec=bl_vec, dlst=dlst,
coherent_avg=coherent_avg)
if mode == "xtalk":
frop = np.eye(len(times))[:, :, None] - frop
return frop
coherent_avg_correction_factors = []
for spw, band in enumerate(bands):
freq_slice = band_slices[spw]
decimated_freq_slice = slice(
freq_slice.start,
freq_slice.stop,
CORR_MATRIX_FREQ_DECIMATION
)
if USE_CORR_MATRIX and not SKIP_XTALK_AND_FRF:
corr_factors = np.zeros(NINTERLEAVE, dtype=float)
known_bad_streams = np.zeros(NINTERLEAVE, dtype=bool)
for stream_ind in range(NINTERLEAVE):
times=deint_filt_data[stream_ind].times * 24 * 3600
cov_here = 0
num_skipped_pols = 0
for pol in ("ee", "nn"):
cross_antpairpol = ANTPAIR + (pol,)
info_here = FRF_info[stream_ind][cross_antpairpol][bands[spw]]["status"]["axis_0"]
info_array = np.array([info_here[key] for key in info_here])
# Assuming any skips mean the whole deal is off.
# Since we're decimating usually by a factor of 10, this should not often be affected by the odd fully flagged channel here or there.
Nfreqs_spw = freq_slice.stop - freq_slice.start
info_slice = slice(0,
Nfreqs_spw,
CORR_MATRIX_FREQ_DECIMATION)
if np.any(info_array[info_slice] == "skipped"):
num_skipped_pols += 1
num_skip_freq = np.count_nonzero(info_array == "skipped")
frac_skip_freq = num_skip_freq / (freq_slice.stop - freq_slice.start)
percent_skip_freq = frac_skip_freq * 100
percent_skip_freq = "%.2f" % percent_skip_freq
print(f"{percent_skip_freq}\% of frequencies were skipped in band {spw + 1}, stream {stream_ind}, pol {pol}, skipping covariance calculation")
continue
else:
var = frf.prep_var_for_frop(deint_filt_data[stream_ind],
deint_nsamples[stream_ind],
deint_wgts[stream_ind],
cross_antpairpol,
decimated_freq_slice,
auto_ant=0)
main_lobe_frop = get_frop_wrapper(pol=pol, stream_ind=stream_ind, band_ind=spw,
dlst=dlst, bl_vec=bl_vec[cross_antpairpol],
times=times)
# Calculate whether the EW-baseline projection is short enough to care about the notch
EW_proj = data.antpos[ANTPAIR[0]][0] - data.antpos[ANTPAIR[1]][0]
if np.abs(EW_proj) < CORR_MATRIX_NOTCH_CUTOFF:
xtalk_frop = get_frop_wrapper(pol=pol, stream_ind=stream_ind, band_ind=spw,
dlst=dlst, bl_vec=bl_vec[cross_antpairpol],
times=times, rephase=False, mode="xtalk", coherent_avg=True,
t_avg=times[1] - times[0])
frop = np.zeros_like(main_lobe_frop)
Nfreqs_band = freq_slice.stop - freq_slice.start
Nfreqs_calc = int(np.ceil(Nfreqs_band / CORR_MATRIX_FREQ_DECIMATION))
for freq_ind in range(Nfreqs_calc):
frop[:, :, freq_ind] = np.tensordot(main_lobe_frop[:, :, freq_ind],
xtalk_frop[:, :, freq_ind],
axes=1)
else:
frop = main_lobe_frop
cov_here += frf.get_FRF_cov(frop, var) # computes covariance for pI by adding ee and nn
if hd.pol_convention == "avg" and num_skipped_pols == 0:
cov_here /= 4
# Don't use times that are all flagged
ALLed_flags = np.all(deint_avg_flags[stream_ind][ANTPAIR + ('pI',)][:, freq_slice], axis=1)
if num_skipped_pols < 2 and not np.all(ALLed_flags): # Don't allow an empty slice
corr_factors[stream_ind] = frf.get_correction_factor_from_cov(cov_here, tslc=(~ALLed_flags))
else:
print(f"No valid pI data in band {spw + 1}, stream {stream_ind}. Setting this correction factor to nan.")
corr_factors[stream_ind] = np.nan
known_bad_streams[stream_ind] = True
if np.all(known_bad_streams): # We think we understand why they're all bad, set to nan
print(f"No valid correction factors found for band {spw + 1}, setting to nan.")
corr_factor = np.nan
else:
which_finite = np.isfinite(corr_factors[~known_bad_streams])
# Catches potential baddies from time slicing corner cases that we haven't thought of, should error
assert np.all(which_finite), f'For band {band}, corr_factors={corr_factors}, known_bad_streams={known_bad_streams}'
corr_factor = np.mean(corr_factors[~known_bad_streams])
coherent_avg_correction_factors.append(corr_factor)
else:
# use the mode-counting method, which is simpler but less accurate
coherent_avg_correction_factors.append(dpss_coherent_avg_correction(spw))
Figure 2: Waterfalls Before Delay Filtering and/or Inpainting¶
if PLOT:
plot_waterfall(data, flags=flags)
plot_real_delay_vs_lst(data, flags=flags, xlim=[-3999, 3999], clim=[-1e4, 1e4])
plot_dly_vs_fr(data, xlim=[-1999, 1999], clim=[1e0, 1e5])
Figure 3: Waterfalls After Delay Filtering and/or Inpainting¶
if PLOT and (PERFORM_DLY_FILT or PERFORM_INPAINT):
plot_waterfall(filt_data, flags=filt_flags, nsamples=nsamples)
plot_real_delay_vs_lst(filt_data, flags=filt_flags, xlim=[-3999, 3999], clim=[-1e4, 1e4])
plot_dly_vs_fr(filt_data, xlim=[-1999, 1999], clim=[1e0, 1e5])
Figure 4: First Set of De-Interleaved Waterfalls after Cross-Talk Filtering¶
if PLOT and not SKIP_XTALK_AND_FRF:
plot_waterfall(deint_xtalk_filt_data[0], flags=deint_flags[0], nsamples=deint_nsamples[0], tslice=None)
plot_real_delay_vs_lst(deint_xtalk_filt_data[0], flags=deint_flags[0], xlim=[-3999, 3999], clim=[-1e4, 1e4], tslice=None)
plot_dly_vs_fr(deint_xtalk_filt_data[0], xlim=[-1999, 1999], clim=[1e0, 1e5], tslice=None)
Figure 5: First Set of De-Interleaved Waterfalls after Main-Beam Fringe-Rate Filtering¶
if PLOT and not SKIP_XTALK_AND_FRF:
plot_waterfall(deint_frf_data[0], flags=deint_flags[0], nsamples=deint_nsamples[0], tslice=None)
plot_real_delay_vs_lst(deint_frf_data[0], flags=deint_flags[0], xlim=[-3999, 3999], clim=[-2e2, 2e2], linthresh=1, tslice=None)
plot_dly_vs_fr(deint_frf_data[0], xlim=[-1999, 1999], clim=[1e0, 1e5], tslice=None)
Figure 6: First Set of De-Interleaved Waterfalls after Coherent Time Averaging¶
if PLOT:
plot_waterfall(deint_avg_data[0], bl=(ANTPAIR + ('ee',)), flags=deint_avg_flags[0], nsamples=deint_avg_nsamples[0], tslice=None)
plot_real_delay_vs_lst(deint_avg_data[0], bl=(ANTPAIR + ('ee',)), flags=deint_avg_flags[0], xlim=[-3999, 3999], clim=[-2e2, 2e2], linthresh=1, tslice=None)
plot_dly_vs_fr(deint_avg_data[0], bl=(ANTPAIR + ('ee',)), xlim=[-1999, 1999], clim=[1e0, 1e5], tslice=None)
Figure 7: First Set of De-Interleaved Waterfalls after Forming Pseudo-Stokes I¶
if PLOT:
plot_waterfall(deint_avg_data[0], bl=(ANTPAIR + ('pI',)), flags=deint_avg_flags[0], nsamples=deint_avg_nsamples[0], tslice=None)
plot_real_delay_vs_lst(deint_avg_data[0], bl=(ANTPAIR + ('pI',)), flags=deint_avg_flags[0], xlim=[-3999, 3999], clim=[-2e2, 2e2], linthresh=1, tslice=None)
plot_dly_vs_fr(deint_avg_data[0], bl=(ANTPAIR + ('pI',)), xlim=[-1999, 1999], clim=[1e0, 1e5], tslice=None)
Power Spectrum Estimation¶
Prepare for power spectrum estimation¶
# put results back into HERAData objects for use in hera_pspec (which works with UVData objects)
hds = []
for avg_data, avg_flags, avg_nsamples in zip(deint_avg_data, deint_avg_flags, deint_avg_nsamples):
avg_hd = copy.deepcopy(hd)
# select the right number of times and update time and lst arrays
avg_hd.select(times=np.unique(avg_hd.time_array)[:len(avg_data.times)])
for ap in avg_hd.get_antpairs():
blt_slice = avg_hd._blt_slices[ap]
avg_hd.time_array[blt_slice] = avg_data.times
avg_hd.lst_array[blt_slice] = avg_data.lsts
# update polarizations
pstokes_pols = sorted([pol for pol in avg_data.pols() if utils.polstr2num(pol, x_orientation=hd.x_orientation) > 0])
avg_hd.polarization_array = np.array([utils.polstr2num(pol) for pol in pstokes_pols])
avg_hd._determine_pol_indexing()
# update pstokes only data in avg_hd
avg_data_for_update = datacontainer.DataContainer({bl: avg_data[bl] for bl in avg_data if bl[2] in pstokes_pols})
avg_flags_for_update = datacontainer.DataContainer({bl: avg_flags[bl] for bl in avg_flags if bl[2] in pstokes_pols})
avg_nsamples_for_update = datacontainer.DataContainer({bl: avg_nsamples[bl] for bl in avg_nsamples if bl[2] in pstokes_pols})
avg_hd.update(data=avg_data_for_update, flags=avg_flags_for_update, nsamples=avg_nsamples_for_update)
# add in autocorrelations copies for all antennas for use in noise calculations
auto_aps = [ap for ap in avg_hd.get_antpairs() if ap[0] == ap[1]]
for ant in ANTPAIR:
if (ant, ant) not in auto_aps:
avg_hd_copy = copy.deepcopy(avg_hd)
avg_hd_copy.select(bls=[auto_aps][0])
avg_hd_copy.ant_1_array = np.full_like(avg_hd_copy.ant_1_array, ant)
avg_hd_copy.ant_2_array = np.full_like(avg_hd_copy.ant_2_array, ant)
avg_hd_copy.baseline_array = uvutils.antnums_to_baseline(avg_hd_copy.ant_1_array, avg_hd_copy.ant_2_array, Nants_telescope=avg_hd_copy.Nants_telescope)
avg_hd.fast_concat(avg_hd_copy, 'blt', inplace=True)
hds.append(avg_hd)
# Load uvbeam file
uvb = UVBeam()
uvb.read(EFIELD_HEALPIX_BEAM_FILE)
uvb.use_future_array_shapes()
# convert to pstokes and peak-normalized power beam
uvb_ps = uvb.efield_to_pstokes(inplace=False)
uvb_ps.peak_normalize()
uvb.efield_to_power()
uvb.peak_normalize()
Estimate power spectra for all unique interleaved pairs¶
# Estimate power spectrum
cosmo = hp.conversions.Cosmo_Conversions()
pspecbeam = hp.pspecbeam.PSpecBeamUV(uvb_ps, cosmo=cosmo)
band = bands[0]
time_in_seconds = (deint_filt_data[0].times - deint_filt_data[0].times.min()) * 60 * 60 * 24 # time array in seconds
time_filters = dspec.dpss_operator(time_in_seconds, [np.mean(fr_ranges[band]) / 1000], [np.diff(fr_ranges[band]) / 2 / 1000], eigenval_cutoff=[FR_EIGENVAL_CUTOFF])[0].real
# Compute power spectra for all unique pairs of interleaves
uvps = []
for ind1, hd1 in enumerate(hds):
for ind2, hd2 in enumerate((hds[ind1:] if INCLUDE_INTERLEAVE_AUTO_PS else hds[ind1 + 1:])):
# Compute power spectrum
ds = hp.PSpecData(dsets=[copy.deepcopy(hd1), copy.deepcopy(hd2)], beam=pspecbeam)
ds.Jy_to_mK()
uvp = ds.pspec([ANTPAIR], [ANTPAIR], dsets=(0, 1),
pols=[utils.polnum2str(pol) for pol in hd1.polarization_array],
spw_ranges=[(bs.start, bs.stop) for bs in band_slices],
taper=TAPER, store_window=STORE_WINDOW_FUNCTIONS, verbose=False)
# Figure out error bars using autocorrelations
auto_Tsys = hp.utils.uvd_to_Tsys((hd1 + hd2), pspecbeam)
hp.utils.uvp_noise_error(uvp, auto_Tsys, err_type=['P_N'])
# append to list of power spectra
uvps.append(uvp)
Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating... Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating... Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating... Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating...
Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating...
Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating... Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating... Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating... Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating...
Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating...
Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating... Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating... Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating... Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating...
Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating...
Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating... Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating... Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating... Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating...
Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating...
Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating... Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating... Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating... Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating...
Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating...
Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating... Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating... Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating... Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating...
Warning: min freq 46920776.3671875 < self.beam_freqs.min(), extrapolating...
A note on estimating $P_{SN}$:¶
Tan et al. (2021) showed that $P_{SN}^2 = \sqrt{2}P_S P_N + P_N^2$ (Eq. 30). The challenge is estimating $P_S$. Any power spectrum measurement has both noise and signal in it, so even with the imposition of the prior that $P_S$ is real and non-negative, we expect double counting of the noise. Tan et al. (2021) shows that the expectation value of the 1st term on the LHS of Eq. 30 has an expectation value of $P_N^2 / \sqrt{\pi}$ for Gaussian-distributed power spectra (see Eq. 31) and $P_N^2 / 2$ for Laplacian-distributed power spectra. The former is appropriate after sufficient time averaging, the latter for single integrations.
In this code, we estimate $P_S$ from all other interleaves, which reduces the overcount of $P_N$ in the formula for $P_{SN}$ by a factor of (len(uvps) - 1)
. It also makes the histogram of $P(k) / P_{SN}$ closer to the theoretical expectation, since we never divide $P(k)$ by itself.
# TODO: graduate this code into hera_pspec
# Apply coherent_avg_correction_factor and compute P_SN
for spw, band in enumerate(bands):
for i, uvp in enumerate(uvps):
# loop over all pols, but only this spw
for key in uvp.get_all_keys():
if key[0] != spw:
continue
# apply coherent average correction due to integrations not being independent after FRF
P_N = uvp.get_stats('P_N', key) / coherent_avg_correction_factors[spw]
uvp.set_stats('P_N', key, P_N)
# use other interleaves to estimate P_S
P_S = np.mean([uvp2.get_data(key).real for j, uvp2 in enumerate(uvps) if i != j], axis=0)
P_SN = np.sqrt((np.sqrt(2) * np.where(P_S > 0, P_S, 0) * P_N + P_N**2)) # Tan et al. 2021, Eq. 30
# Apply P_SN Correction for Laplacian statistics (see note above)
P_SN = (P_SN**2 - .5 / (len(uvps) - 1) * P_N**2)**.5
P_SN[~np.isfinite(P_SN)] = np.inf
uvp.set_stats('P_SN', key, P_SN)
# TODO: graduate this code into hera_pspec
# Perform incoherent time average over LST_RANGE_HOURS and correct for FRF
uvps_time_avg = []
for uvp in uvps:
lst_subset = uvp.lst_avg_array[(uvp.lst_avg_array >= LST_MIN * np.pi / 12) & (uvp.lst_avg_array <= LST_MAX * np.pi / 12)]
uvp_tavg = uvp.select(lsts=lst_subset, polpairs=[('pI', 'pI')], inplace=False)
uvp_tavg.average_spectra(time_avg=True, error_weights='P_N', error_field=['P_N', 'P_SN'], inplace=True)
uvps_time_avg.append(uvp_tavg)
for spw, band in enumerate(bands):
coherent_avg_correction_factor = coherent_avg_correction_factors[spw]
for i, uvp in enumerate(uvps_time_avg):
# loop over all pols, but only this spw
for key in uvp.get_all_keys():
if key[0] != spw:
continue
# Update P_N and P_SN for incoherent time average
P_N = uvp.get_stats('P_N', key) * (coherent_avg_correction_factor)**.5
uvp.set_stats('P_N', key, P_N)
P_S = np.mean([uvp2.get_data(key).real for j, uvp2 in enumerate(uvps_time_avg) if i != j], axis=0)
P_SN = np.sqrt((np.sqrt(2) * np.where(P_S > 0, P_S, 0) * P_N + P_N**2)) # Tan et al. 2021, Eq. 30
# Apply P_SN Correction for Gaussian statistics (see note above)
P_SN = (P_SN**2 - 1./ np.sqrt(np.pi) / (len(uvps_time_avg) - 1) * P_N**2)**.5
P_SN[~np.isfinite(P_SN)] = np.inf
uvp.set_stats('P_SN', key, P_SN)
Average Over Interleaves¶
# combine all interleaves into UVPSpec object
interleaved_uvp = hp.uvpspec.recursive_combine_uvpspec(uvps)
# select each individual time and average interleaves together incoherently
to_recombine = []
for times in interleaved_uvp.time_avg_array.reshape(-1, len(uvps)):
to_recombine.append(interleaved_uvp.select(times=times, inplace=False))
to_recombine[-1].average_spectra(time_avg=True, error_weights='P_N', error_field=['P_N', 'P_SN'])
# combine all single-integration UVPSpec objects
interleaved_uvp = hp.uvpspec.recursive_combine_uvpspec(to_recombine)
# Perform incoherent time-averaging across interleaves
uvp_interleave = reduce(lambda x, y: x + y, uvps_time_avg)
uvp_avg_all = uvp_interleave.average_spectra(time_avg=True, error_weights='P_N', error_field=['P_N', 'P_SN'], inplace=False)
Correct for FRF Signal Loss¶
hp.loss.apply_bias_correction(interleaved_uvp, data_bias={spw: (1.0 - loss)**-1 for spw, loss in enumerate(frf_losses.values())})
hp.loss.apply_bias_correction(uvp_avg_all, data_bias={spw: (1.0 - loss)**-1 for spw, loss in enumerate(frf_losses.values())})
<hera_pspec.uvpspec.UVPSpec at 0x7f8146ba6900>
Power Spectrum Plotting Code¶
# average all interleaved LSTs together for plotting
all_lsts = []
for uvp in uvps:
if np.mean(np.unwrap(uvp.lst_avg_array)) - np.mean(np.unwrap(uvps[0].lst_avg_array)) > np.pi:
all_lsts.append(np.unwrap(uvp.lst_avg_array) - 2 * np.pi)
elif np.mean(np.unwrap(uvp.lst_avg_array)) - np.mean(np.unwrap(uvps[0].lst_avg_array)) < -np.pi:
all_lsts.append(np.unwrap(uvp.lst_avg_array) + 2 * np.pi)
else:
all_lsts.append(np.unwrap(uvp.lst_avg_array))
avg_pspec_lsts = (np.mean(all_lsts, axis=0) % (2 * np.pi))
# Compute SNRs for plotting
SNRs = []
tavg_SNRs = []
for spw, band in enumerate(bands):
key = (spw, (ANTPAIR, ANTPAIR), ('pI', 'pI'))
for i, uvp in enumerate(uvps):
high_dlys = np.abs(uvp.get_dlys(key[0]) * 1e9) > 1000
SNRs.append(np.ravel((uvp.get_data(key) / uvp.get_stats('P_N', key).real)[:, high_dlys]))
for i, uvp in enumerate(uvps_time_avg):
high_dlys = np.abs(uvp.get_dlys(key[0]) * 1e9) > 1000
tavg_SNRs.append(np.ravel((uvp.get_data(key) / uvp.get_stats('P_N', key).real)[:, high_dlys]))
def plot_Pk_vs_LST(clim=None, xlim=[-2999, 2999], pol='pI'):
'''Plots the real part of the power spectrum from each band as a function of LST and delay for each band.'''
lsts = np.where(avg_pspec_lsts > avg_pspec_lsts[-1], avg_pspec_lsts - 2 * np.pi, avg_pspec_lsts) * 12 / np.pi
fig, axes = plt.subplots(1, len(bands), figsize=(28, 12), sharex=True, sharey=True, gridspec_kw={'wspace': .03}, dpi=100)
for spw, (ax, band, band_slice) in enumerate(zip(axes, bands, band_slices)):
key = (spw, (ANTPAIR, ANTPAIR), (pol, pol))
pk_avg = interleaved_uvp.get_data(key).real
# np.mean([uvp.get_data(key) for uvp in uvps], axis=0).real
delays = interleaved_uvp.get_dlys(key[0]) * 1e9
# uvps[0].get_dlys(key[0]) * 1e9
if spw == 0:
_to_plot = copy.deepcopy(np.where(np.isfinite(pk_avg), pk_avg, np.nan))
_to_plot = np.where(_to_plot == 0, np.nan, _to_plot)
im = ax.imshow(np.abs(np.where(np.isfinite(pk_avg), pk_avg, np.nan)),
interpolation='none', aspect='auto', cmap='inferno',
norm=matplotlib.colors.LogNorm(vmin=(clim[0] if clim is not None else np.nanmin(np.abs(_to_plot))),
vmax=(clim[1] if clim is not None else np.nanmax(np.abs(_to_plot)))),
extent=[delays[0], delays[-1], lsts[-1], lsts[0]])
for multiple in [1, -1]:
ax.axvline(multiple * dly_filter_half_widths[0] * 1e9, ls='--', color='k')
ax.axvline(multiple * inpaint_filter_half_widths[0] * 1e9, ls=':', color='k')
ax.set_xlim(xlim)
ax.set_title(f'Band {spw+1}:\n{band[0]}—{band[1]} MHz', fontsize=10)
ax.set_xlabel('Delay (ns)')
if spw == 0:
ax.set_ylabel('LST (Hours)')
ax.set_yticklabels([f'{(int(val) if np.isclose(val, int(val)) else val) % 24}' for val in ax.get_yticks()])
plt.colorbar(im, ax=axes, pad=.02, aspect=40, extend='both', label=f'{pol} ' + r'|Re[$P(k)$]| (mK$^2$ $h^{-3}$ Mpc$^3$)')
def plot_Pk_SNR_vs_LST(clim=[-5, 5], xlim = [-2999, 2999], func=np.real):
'''Plots the real power spectrum SNR (normalized by P_N, not P_SN) as a function of LST and delay for each band.'''
lsts = np.where(data.lsts[tslice] > data.lsts[tslice][-1], data.lsts[tslice] - 2 * np.pi, data.lsts[tslice]) * 12 / np.pi
fig, axes = plt.subplots(1, len(bands), figsize=(28, 12), sharex=True, sharey=True, gridspec_kw={'wspace': .03}, dpi=100)
for spw, (ax, band, band_slice) in enumerate(zip(axes, bands, band_slices)):
key = (spw, (ANTPAIR, ANTPAIR), ('pI', 'pI'))
pk_avg = func(interleaved_uvp.get_data(key))
# func(np.mean([uvp.get_data(key) for uvp in uvps], axis=0))
delays = interleaved_uvp.get_dlys(key[0]) * 1e9
P_N = np.abs(interleaved_uvp.get_stats("P_N", key)) #np.mean([np.abs(uvp.get_stats('P_N', key)) for uvp in uvps], axis=0)
SNR = pk_avg / P_N# / np.sqrt(len(uvps)))
im = ax.imshow(SNR, interpolation='none', aspect='auto', cmap='bwr',
vmin=clim[0], vmax=clim[1],
extent=[delays[0], delays[-1], lsts[-1], lsts[0]])
for multiple in [1, -1]:
ax.axvline(multiple * dly_filter_half_widths[0] * 1e9, ls='--', color='k')
ax.axvline(multiple * inpaint_filter_half_widths[0] * 1e9, ls=':', color='k')
ax.set_xlim(xlim)
ax.set_title(f'Band {spw+1}:\n{band[0]}—{band[1]} MHz', fontsize=10)
ax.set_xlabel('Delay (ns)')
if spw == 0:
ax.set_ylabel('LST (Hours)')
ax.set_yticklabels([f'{(int(val) if np.isclose(val, int(val)) else val) % 24}' for val in ax.get_yticks()])
plt.colorbar(im, ax=axes, pad=.02, aspect=40, extend='both', label=f'{"Re" if func == np.real else "Im"}' + r'[$P(k) / P_N(k)$] (unitless)')
def plot_SNR_hist(to_hist, theory='laplace', leg_title='All Bands, $|\\tau| > 1000$ ns', bins=None,
denom_label='$P_N$'):
'''Plots the histogram of power spectrum SNR values (both real and imaginary) and compares them to a theoretical distribution.'''
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
if bins is None:
bins = np.linspace(-15,15,200)
for ax, func, c in zip(axes, [np.real, np.imag], ['C0', 'C1']):
ax.hist(func(to_hist), bins=bins, density=True, color=c, edgecolor='k', linewidth=.1,
label=f'{"Re" if func == np.real else "Im"}[$P(k)$] / {denom_label}')
ax.set_yscale('log')
if theory == 'laplace':
b = 2**-.5
laplace = np.exp(-np.abs(bins) / b) / 2 / b
ax.plot(bins, laplace, 'k--', label='Laplace Distribution')
elif theory == 'gauss':
gauss = np.exp(-bins**2/2) / np.sqrt(2*np.pi)
ax.plot(bins, gauss, 'k--', label='Gaussian Distribution')
ax.legend(title=leg_title)
ax.set_xlabel('SNR')
ax.set_ylabel('Density')
ax.set_ylim([10**np.floor(np.log10(1 / len(to_hist))), 1])
text = f'Observed Mean: {np.nanmean(func(to_hist)).real:.3f}'
text += f'\nObserved Median: {np.nanmedian(func(to_hist)).real:.3f}'
if theory == 'laplace':
text += f'\nObserved/Expected Std: {np.nanstd(func(to_hist)) / 2**.5 / b:.3f}'
text += f'\nObserved/Expected MAD: {np.nanmedian(np.abs(func(to_hist) - np.nanmedian(func(to_hist)))) / b / np.log(2):.3f}'
elif theory == 'gauss':
text += f'\nObserved/Expected Std: {np.nanstd(func(to_hist)):.3f}'
text += f'\nObserved/Expected MAD: {np.nanmedian(np.abs(func(to_hist) - np.nanmedian(func(to_hist)))) / (2**.5 * special.erfinv(.5)):.3f}'
ax.set_title(text)
def plot_tavg_pspec():
'''This plots the time-averaged power spectrum over the whole range of LSTs, including 2 sigma errors'''
fig, axes = plt.subplots(int(np.ceil(len(bands) / 2)), 2, figsize=(18, 12), sharex=True, sharey=True, gridspec_kw={'wspace': .03, 'hspace': .0}, dpi=100)
for spw, (ax, band, band_slice) in enumerate(zip(np.ravel(axes), bands, band_slices)):
key = (spw, (ANTPAIR, ANTPAIR), ('pI', 'pI'))
pk_avg = np.squeeze(uvp_avg_all.get_data(key).real)
delays = uvp_avg_all.get_dlys(key[0]) * 1e9
P_N = np.squeeze(uvp_avg_all.get_stats('P_N', key))
P_SN = np.squeeze(uvp_avg_all.get_stats('P_SN', key))
ax.errorbar(delays, pk_avg, marker='o', ls='', yerr=2*P_SN, label='$Re[P(k)]$ with 2$\sigma$ $P_{SN}$ errors')
ax.plot(delays, P_N, 'k--', label='$P_{N}$')
ax.plot(delays, P_SN, 'k-', label='$P_{SN}$')
ax.set_yscale('log')
ax.set_xlim([-2500, 2500])
ax.set_ylim([1e3, 1e14])
ax.set_xlabel('Delay (ns)')
ax.tick_params(axis='x', direction='in')
for multiple in [1, -1]:
ax.axvline(multiple * dly_filter_half_widths[0] * 1e9, ls='--', color='k', lw=.5, label=(r'Filtering $\tau_{max}$' if multiple == 1 else None))
ax.axvline(multiple * inpaint_filter_half_widths[0] * 1e9, ls=':', color='k', lw=.5, label=(r'Inpainting $\tau_{max}$' if multiple == 1 else None))
if spw % 2 == 0:
ax.set_ylabel('Re[$P(k)$]\n(mK$^2$ $h^{-3}$ Mpc$^3$)')
ax.text(.02, .93, f'Band {spw + 1}:\n{band[0]}—{band[1]} MHz', transform=ax.transAxes, fontsize=12,
va='top', ha='left', bbox=dict(boxstyle='round', facecolor='w', alpha=0.8))
handles, labels = axes[-1, -1].get_legend_handles_labels()
fig.legend(handles, labels, loc='upper center', bbox_to_anchor=(0.5, .92), ncol=len(labels))
plt.tight_layout()
Figure 8: Interleave-Averaged Power Spectra (Pseudo-Stokes I, Q, U, & V) vs. LST¶
plot_Pk_vs_LST(pol='pI')
plot_Pk_vs_LST(pol='pQ')
plot_Pk_vs_LST(pol='pU')
plot_Pk_vs_LST(pol='pV')
Figure 9: Interleave-Averaged Power Spectrum SNR vs. LST (Real and Imaginary for pI)¶
plot_Pk_SNR_vs_LST(func=np.real)
plot_Pk_SNR_vs_LST(func=np.imag)
Figure 10: High Delay Power Spectrum SNR Histograms Before and After Incoherent Averaging¶
plot_SNR_hist(np.array([snr for interleave_band in SNRs for snr in interleave_band]), theory='laplace')
plot_SNR_hist(np.array([snr for interleave_band in tavg_SNRs for snr in interleave_band]), theory='gauss', bins=np.linspace(-10,10,100),
leg_title='All Bands, Time-Averaged,\n$|\\tau| > 1000$ ns')
Figure 11: Incoherently Averaged Power Spectrum with Error Bars¶
plot_tavg_pspec()
Save Results¶
if SAVE_RESULTS:
# Create pspec container and write all interleaves to it
psc = hp.PSpecContainer(OUT_PSPEC_FILE, mode='rw', keep_open=False)
psc.set_pspec('stokespol', 'interleave_averaged', interleaved_uvp, overwrite=True)
# Create pspec container for time-averaged power spectra
psc_tavg = hp.PSpecContainer(OUT_TAVG_PSPEC_FILE, mode='rw', keep_open=False)
psc_tavg.set_pspec('stokespol', 'time_and_interleave_averaged', uvp_avg_all, overwrite=True)
# write ancillary data products directly to header attributes
for outfile in [OUT_PSPEC_FILE, OUT_TAVG_PSPEC_FILE]:
with h5py.File(outfile, 'r+') as f:
f['header'].attrs['dpss_coherent_avg_corrections'] = coherent_avg_correction_factors
f['header'].attrs['frf_losses'] = [frf_losses[band] for band in bands]
Metadata¶
for repo in ['numpy', 'scipy', 'astropy', 'hera_cal', 'hera_qm', 'hera_filters', 'hera_pspec', 'hera_notebook_templates', 'pyuvdata']:
print(f"{repo}: {importlib.import_module(repo).__version__}")
numpy: 2.2.1 scipy: 1.14.1 astropy: 7.0.0 hera_cal: 3.7.1.dev32+gd483c28f hera_qm: 2.2.0 hera_filters: 0.1.6.dev1+g297dcce hera_pspec: 0.4.3.dev75+g9a2c973 hera_notebook_templates: 0.1.dev1102+gc368f33 pyuvdata: 3.1.3
print(f'Finished execution in {(time.time() - tstart) / 60:.2f} minutes.')
Finished execution in 16.37 minutes.