from __future__ import annotations
import pathlib
import re
import warnings
from dataclasses import dataclass
from typing import Any
import numpy
import pandas
import scipy.ndimage
from astropy.coordinates import SkyCoord
from astropy.io import fits
from astropy.time import Time
from astropy.wcs import WCS, FITSFixedWarning
from astropy.wcs.utils import fit_wcs_from_points
from scipy.spatial import KDTree
from skimage.registration import phase_cross_correlation
from coordio.conv import guideToTangent, tangentToWok
from . import Site, calibration, defaults
from .astrometry import astrometrynet_quick
from .conv import tangentToGuide, wokToTangent
from .coordinate import Coordinate, Coordinate2D
from .exceptions import CoordIOError, CoordIOUserWarning
from .extraction import sextractor_quick
from .sky import ICRS, Observed
from .telescope import Field, FocalPlane
from .utils import radec2wokxy
from .wok import Wok
__all__ = [
"Guide",
"GuiderFitter",
"umeyama",
"radec_to_gfa",
"gfa_to_radec",
"cross_match",
]
warnings.filterwarnings("ignore", module="astropy.wcs.wcs")
warnings.filterwarnings("ignore", category=FITSFixedWarning)
[docs]class Guide(Coordinate2D):
"""Guide coordinates are 2D cartesian xy coordinates. Units are pixels.
The origin is the lower left corner of the lower left pixel when looking
at the CCD through the camera window. Thus guide images are taken in
"selfie-mode". So the coordinate (0.5, 0.5) is the center of the lower
left pixel. -y points (generally) toward the wok vertex. True rotations
and locations of GFAs in the wok are measured/calibrated after
installation and on-sky characterization.
Parameters
-----------
value : numpy.ndarray
A Nx2 array where [x,y] are columns. Or `.Tangent`
xBin : int
CCD x bin factor, defaults to 1
yBin : int
CCD y bin factor, defaults to 1
Attributes
-----------
guide_warn : numpy.ndarray
boolean array indicating coordinates outside the CCD FOV
"""
__extra_params__ = ["xBin", "yBin"]
__warn_arrays__ = ["guide_warn"]
xBin: int
yBin: int
guide_warn: numpy.ndarray
def __new__(cls, value, **kwargs):
xBin = kwargs.get("xBin", None)
if xBin is None:
kwargs["xBin"] = 1
yBin = kwargs.get("yBin", None)
if yBin is None:
kwargs["yBin"] = 1
if isinstance(value, Coordinate):
if value.coordSysName == "Tangent":
# going from 3D -> 2D coord sys
# reduce dimensionality
arrInit = numpy.zeros((len(value), 2))
obj = super().__new__(cls, arrInit, **kwargs)
obj._fromTangent(value)
else:
raise CoordIOError(
"Cannot convert to Guide from %s" % value.coordSysName
)
else:
obj = super().__new__(cls, value, **kwargs)
obj._fromRaw()
return obj
def _fromTangent(self, tangentCoords):
"""Convert from tangent coordinates to guide coordinates"""
if tangentCoords.holeID not in calibration.VALID_GUIDE_IDS:
raise CoordIOError(
"Cannot convert from wok hole %s to Guide" % tangentCoords.holeID
)
# make sure the guide wavelength is specified for all coords
# this may be a little too extreme of a check
badWl = numpy.sum(tangentCoords.wavelength - defaults.INST_TO_WAVE["GFA"]) != 0
if badWl:
raise CoordIOError(
"Cannont convert to Guide coords from non-guide wavelength"
)
# use coords projected to tangent xy plane
# could almost equivalently use tangentCoords[:,0:2]
# and we may want to ditch the projection eventually if its
# unnecessary
# note, moved this calculation into
# conv.tangentToGuide, may want to just use that one
xT = tangentCoords.xProj
yT = tangentCoords.yProj
xPix = (1 / self.xBin) * (
defaults.MICRONS_PER_MM / defaults.GFA_PIXEL_SIZE * xT
+ defaults.GFA_CHIP_CENTER
)
yPix = (1 / self.yBin) * (
defaults.MICRONS_PER_MM / defaults.GFA_PIXEL_SIZE * yT
+ defaults.GFA_CHIP_CENTER
)
self[:, 0] = xPix
self[:, 1] = yPix
self._fromRaw()
def _fromRaw(self):
"""Find coords that may be out of the FOV and tag them"""
# note this might be wrong when xBin is 3
maxX = defaults.GFA_CHIP_CENTER * 2 / self.xBin
maxY = defaults.GFA_CHIP_CENTER * 2 / self.yBin
xPix = self[:, 0]
yPix = self[:, 1]
arg = numpy.argwhere((xPix < 0) | (xPix > maxX) | (yPix < 0) | (yPix > maxY))
if len(arg) == 0:
# everything in range...
return
arg = arg.squeeze()
self.guide_warn[arg] = True
def gfa_to_wok(observatory: str, x_pix: float, y_pix: float, gfa_id: int):
"""Converts from a GFA pixel to wok coordinates."""
gfa_row = calibration.gfaCoords.loc[(observatory, gfa_id), :]
b = gfa_row[["xWok", "yWok", "zWok"]].to_numpy().squeeze()
iHat = gfa_row[["ix", "iy", "iz"]].to_numpy().squeeze()
jHat = gfa_row[["jx", "jy", "jz"]].to_numpy().squeeze()
kHat = gfa_row[["kx", "ky", "kz"]].to_numpy().squeeze()
xt, yt = guideToTangent(x_pix, y_pix)
zt = 0
return tangentToWok(xt, yt, zt, b, iHat, jHat, kHat) # type: ignore
[docs]def gfa_to_radec(
observatory: str,
x_pix: float,
y_pix: float,
gfa_id: int,
bore_ra: float,
bore_dec: float,
position_angle: float = 0,
offset_ra: float = 0,
offset_dec: float = 0,
offset_pa: float = 0,
obstime: float | None = None,
icrs: bool = False,
):
"""Converts from a GFA pixel to observed RA/Dec. Offsets are in arcsec."""
site = Site(observatory)
site.set_time(obstime)
wavelength = defaults.INST_TO_WAVE["GFA"]
wok_coords = gfa_to_wok(observatory, x_pix, y_pix, gfa_id)
bore_ra += offset_ra / 3600.0 / numpy.cos(numpy.radians(bore_dec))
bore_dec += offset_dec / 3600.0
position_angle -= offset_pa / 3600.0
boresight_icrs = ICRS([[bore_ra, bore_dec]])
boresight = Observed(
boresight_icrs,
site=site,
wavelength=wavelength,
)
wok = Wok([wok_coords], site=site, obsAngle=position_angle)
focal = FocalPlane(wok, wavelength=wavelength, site=site)
field = Field(focal, field_center=boresight)
observed = Observed(field, wavelength=wavelength, site=site)
if icrs is False:
return (observed.ra[0], observed.dec[0])
else:
icrs = ICRS(observed)
return icrs[0]
[docs]def radec_to_gfa(
observatory: str,
ra: float | numpy.ndarray,
dec: float | numpy.ndarray,
gfa_id: int,
bore_ra: float,
bore_dec: float,
position_angle: float = 0,
offset_ra: float = 0,
offset_dec: float = 0,
offset_pa: float = 0,
obstime: float | None = None,
):
"""Converts from observed RA/Dec to GFA pixel, taking into account offsets."""
ra = numpy.atleast_1d(ra)
dec = numpy.atleast_1d(dec)
site = Site(observatory)
site.set_time(obstime)
assert site.time
bore_ra += offset_ra / 3600.0 / numpy.cos(numpy.radians(bore_dec))
bore_dec += offset_dec / 3600.0
position_angle -= offset_pa / 3600.0
xwok, ywok, *_ = radec2wokxy(
ra,
dec,
None,
"GFA",
bore_ra,
bore_dec,
position_angle,
observatory,
site.time.jd,
focalScale=1.0, # Guider scale is relative to 1
)
zwok = numpy.array([defaults.POSITIONER_HEIGHT] * len(xwok))
gfa_coords = calibration.gfaCoords.loc[observatory]
gfa_coords.reset_index(inplace=True)
gfa_row = gfa_coords[gfa_coords.id == gfa_id]
b = gfa_row[["xWok", "yWok", "zWok"]].to_numpy().squeeze()
iHat = gfa_row[["ix", "iy", "iz"]].to_numpy().squeeze()
jHat = gfa_row[["jx", "jy", "jz"]].to_numpy().squeeze()
kHat = gfa_row[["kx", "ky", "kz"]].to_numpy().squeeze()
xt, yt, _ = wokToTangent(xwok, ywok, zwok, b, iHat, jHat, kHat)
x_ccd, y_ccd = tangentToGuide(xt, yt)
return x_ccd, y_ccd
[docs]def umeyama(X, Y):
"""Rigid alignment of two sets of points in k-dimensional Euclidean space.
Given two sets of points in correspondence, this function computes the
scaling, rotation, and translation that define the transform TR that
minimizes the sum of squared errors between TR(X) and its corresponding
points in Y. This routine takes O(n k^3)-time.
Parameters
----------
X
A ``k x n`` matrix whose columns are points.
Y
A ``k x n`` matrix whose columns are points that correspond to the
points in X
Returns
-------
c,R,t
The scaling, rotation matrix, and translation vector defining the
linear map TR as ::
TR(x) = c * R * x + t
such that the average norm of ``TR(X(:, i) - Y(:, i))`` is
minimized.
Copyright: Carlo Nicolini, 2013
Code adapted from the Mark Paskin Matlab version from
http://openslam.informatik.uni-freiburg.de/data/svn/tjtf/trunk/matlab/ralign.m
See paper by Umeyama (1991)
"""
m, n = X.shape
mx = X.mean(1)
my = Y.mean(1)
Xc = X - numpy.tile(mx, (n, 1)).T
Yc = Y - numpy.tile(my, (n, 1)).T
sx = numpy.mean(numpy.sum(Xc * Xc, 0))
Sxy = numpy.dot(Yc, Xc.T) / n
U, D, V = numpy.linalg.svd(Sxy, full_matrices=True, compute_uv=True)
V = V.T.copy()
r = numpy.linalg.matrix_rank(Sxy)
S = numpy.eye(m)
if r < (m - 1):
raise CoordIOError("Not enough independent measurements")
if numpy.linalg.det(Sxy) < 0:
S[-1, -1] = -1
elif r == m - 1:
if numpy.linalg.det(U) * numpy.linalg.det(V) < 0:
S[-1, -1] = -1
R = numpy.dot(numpy.dot(U, S), V.T)
c = numpy.trace(numpy.dot(numpy.diag(D), S)) / sx
t = my - c * numpy.dot(R, mx)
return c, R, t
[docs]def cross_match(
measured_xy: numpy.ndarray,
reference_xy: numpy.ndarray,
reference_radec: numpy.ndarray,
x_size: int,
y_size: int,
cross_corrlation_shift: bool = True,
blur: float = 5,
distance_upper_bound: int = 10,
**kwargs,
):
"""Determines the shift between two sets of points using cross-correlation.
Constructs 2D images from reference and measured data sets and calculates
the image shift using cross-correlation registration. It then associated
reference to measured detections using nearest neighbours and builds a
WCS using the reference on-sky positions.
Parameters
----------
measured_xy
A 2D array with the x and y coordinates of each measured point.
reference_xy
A 2D array with the x and y coordinates of each reference point.
reference_radec
A 2D array with the ra and dec coordinates of each reference point.
x_size
Size of the image for 2D cross-correlation.
y_size
Size of the image for 2D cross-correlation.
blur
The sigma, in pixels, of the Gaussian kernel used to convolve the images.
distance_upper_bound
Maximum distance, in pixels, for KD tree nearest neighbours.
kwargs
Other arguments to pass to ``skimage.registration.phase_cross_correlation``.
Returns
-------
wcs
A tuple with the WCS of the solution and the translation invariant normalized
RMS error between reference and moving image (see ``phase_cross_correlation``).
"""
# Create and blur the reference and measured images.
ref_image = numpy.zeros((y_size, x_size), numpy.float32)
ref_image[reference_xy.astype(int)[:, 1], reference_xy.astype(int)[:, 0]] = 1
if blur > 0:
ref_image = scipy.ndimage.gaussian_filter(ref_image, blur)
meas_image = numpy.zeros((y_size, x_size), numpy.float32)
meas_image[measured_xy.astype(int)[:, 1], measured_xy.astype(int)[:, 0]] = 1
if blur > 0:
meas_image = scipy.ndimage.gaussian_filter(meas_image, blur)
# Calculate the shift and error.
error: float
if cross_corrlation_shift:
shift, error, _ = phase_cross_correlation(ref_image, meas_image, **kwargs)
else:
shift = numpy.array([0.0, 0.0])
error = numpy.nan
# Apply shift.
measured_shift = measured_xy + shift[::-1]
# Associate measured to reference using KD tree.
tree = KDTree(reference_xy)
dd, ii = tree.query(measured_shift, k=1, distance_upper_bound=distance_upper_bound)
# Reject measured objects without a close neighbour.
# KDTree.query() assigns an index larger than the initial set of points when
# the closest neighbour has distance > distance_upper_bound.
valid = dd < len(reference_xy)
ii_valid = ii[valid]
# Select valid measured object and their corresponding RA/Dec references.
measured_xy_valid = measured_xy[valid]
reference_radec_valid = reference_radec[ii_valid, :]
if len(reference_radec_valid) < 3 or len(measured_xy_valid) < 3:
return None, 0
# Build the WCS.
reference_skycoord_valid = SkyCoord(
ra=reference_radec_valid[:, 0],
dec=reference_radec_valid[:, 1],
unit="deg",
)
wcs = fit_wcs_from_points(
(measured_xy_valid[:, 0], measured_xy_valid[:, 1]),
reference_skycoord_valid,
)
assert isinstance(wcs, WCS)
return wcs, error
@dataclass
class GuiderFit:
"""Results of a guider data fit."""
gfa_wok: pandas.DataFrame
astro_wok: pandas.DataFrame
delta_ra: float
delta_dec: float
delta_rot: float
delta_scale: float
xrms: float
yrms: float
rms: float
only_radec: bool = False
[docs]class GuiderFitter:
"""Fits guider data, returning the measured offsets."""
def __init__(self, observatory: str):
self.observatory = observatory.upper()
self.plate_scale = defaults.PLATE_SCALE[self.observatory]
self.pixel_size = defaults.GFA_PIXEL_SIZE
self.pixel_scale = 1.0 / self.plate_scale * self.pixel_size * 3600 / 1000
self.reference_pixel: tuple[int, int] = (1024, 1024)
self.gfa_wok_coords = self._gfa_to_wok()
self.astro_data: pandas.DataFrame | None = None
self.result: GuiderFit | None = None
[docs] def reset(self):
"""Clears the astrometric data."""
self.astro_data = None
self.result = None
def _gfa_to_wok(self):
"""Converts GFA reference points to wok coordinates."""
if self.observatory not in calibration.gfaCoords.index.get_level_values(0):
raise CoordIOError(
f"GFA coordinates for {self.observatory} are missing. "
"Did you load the correct calibrations?"
)
gfaCoords = calibration.gfaCoords.loc[self.observatory]
wok_coords = {}
for gfa_id in range(1, 7):
camera_coords: pandas.DataFrame = gfaCoords.loc[gfa_id]
b = camera_coords[["xWok", "yWok", "zWok"]].to_numpy().squeeze()
iHat = camera_coords[["ix", "iy", "iz"]].to_numpy().squeeze()
jHat = camera_coords[["jx", "jy", "jz"]].to_numpy().squeeze()
kHat = camera_coords[["kx", "ky", "kz"]].to_numpy().squeeze()
xt, yt = guideToTangent(self.reference_pixel[0], self.reference_pixel[1])
zt = 0
wok_coords[gfa_id] = tangentToWok(xt, yt, zt, b, iHat, jHat, kHat)
df = pandas.DataFrame.from_dict(wok_coords).transpose()
df.index.name = "gfa_id"
df.columns = ["xwok", "ywok", "zwok"]
return df
[docs] def add_astro(
self,
camera: str | int,
ra: float,
dec: float,
obstime: float,
x_pixel: float,
y_pixel: float,
):
"""Adds an astrometric measurement."""
if isinstance(camera, str):
match = re.match(r".*([1-6]).*", camera)
if not match:
raise CoordIOError(f"Cannot understand camera {camera!r}.")
camera = int(match.group(1))
new_data = (camera, ra, dec, obstime, x_pixel, y_pixel)
if self.astro_data is None:
self.astro_data = pandas.DataFrame(
[new_data],
columns=["gfa_id", "ra", "dec", "obstime", "x", "y"],
)
else:
self.astro_data.loc[len(self.astro_data.index)] = new_data
[docs] def add_wcs(
self,
camera: str | int,
wcs: WCS,
obstime: float,
pixels: numpy.ndarray | None = None,
):
"""Adds a camera measurement from a WCS solution."""
if pixels is None:
coords: Any = wcs.pixel_to_world(*self.reference_pixel)
ra = coords.ra.value
dec = coords.dec.value
self.add_astro(camera, ra, dec, obstime, *self.reference_pixel)
else:
ras, decs = wcs.all_pix2world(pixels[:, 0], pixels[:, 1], 0)
for ii in range(len(ras)):
self.add_astro(
camera,
ras[ii],
decs[ii],
obstime,
pixels[ii, 0],
pixels[ii, 1],
)
[docs] def add_gimg(self, path: str | pathlib.Path, pixels: numpy.ndarray | None = None):
"""Processes a raw GFA image, runs astrometry.net, adds the solution."""
data = fits.getdata(str(path))
header = fits.getheader(str(path), 1)
if header["OBSERVAT"] != self.observatory:
raise CoordIOError("GFA image is from a different observatory.")
if "RAFIELD" not in header or "DECFIELD" not in header:
raise CoordIOError("GFA image does not have RAFIELD or DECFIELD.")
ra_field = header["RAFIELD"]
dec_field = header["DECFIELD"]
pa_field = header["FIELDPA"]
sources = sextractor_quick(data)
camera_id = int(header["CAMNAME"][3])
radec_centre = gfa_to_radec(
self.observatory,
self.reference_pixel[0],
self.reference_pixel[1],
camera_id,
ra_field,
dec_field,
position_angle=pa_field,
)
wcs = astrometrynet_quick(
sources,
radec_centre[0],
radec_centre[1],
self.pixel_scale,
)
if wcs is None:
raise CoordIOError("astrometry.net could not solve image.")
obstime = Time(header["DATE-OBS"], format="iso").jd
self.add_wcs(camera_id, wcs, obstime, pixels=pixels)
return wcs
[docs] def add_proc_gimg(
self,
path: str | pathlib.Path,
pixels: numpy.ndarray | None = None,
):
"""Adds an astrometric solution from a ``proc-gimg`` image."""
hdu = fits.open(str(path))
is_proc = len(hdu) > 1 and "SOLVED" in hdu[1].header
if not is_proc:
raise CoordIOError(f"{path!s} is not a proc-gimg image.")
header = hdu[1].header
if header["OBSERVAT"] != self.observatory:
raise CoordIOError("GFA image is from a different observatory.")
if not header.get("SOLVED", False):
raise CoordIOError(f"Image {path!s} has not been astrometrically solved.")
wcs = WCS(header)
obstime = Time(header["DATE-OBS"], format="iso").jd
self.add_wcs(header["CAMNAME"], wcs, obstime, pixels=pixels)
[docs] def fit(
self,
field_ra: float,
field_dec: float,
field_pa: float = 0.0,
offset: tuple[float, float, float] | numpy.ndarray = (0.0, 0.0, 0.0),
scale_rms: bool = False,
only_radec: bool = False,
):
"""Performs the fit and returns a `.GuiderFit` object.
The fit is performed using `.umeyama`.
Parameters
----------
field_ra
The field centre RA.
field_dec
The field centre declination.
field_pa
The field centre position angle.
offset
The offset in RA, Dec, PA to apply.
scale_rms
Whether to correct the RMS using the measured scale factor.
only_radec
If `True`, fits only translation. Useful when only one or two cameras
are solving. In this case the rotation and scale offsets will still be
calculated using the Umeyama model, but the ra and dec offsets are
overridden with a simple translation. The `.GuiderFit` object will
have `only_radec=True`.
"""
offset_ra, offset_dec, offset_pa = offset
if self.astro_data is None:
raise CoordIOError("Astro data has not been set.")
camera_ids: list[int] = []
xwok_gfa: list[float] = []
ywok_gfa: list[float] = []
xwok_astro: list[float] = []
ywok_astro: list[float] = []
for d in self.astro_data.itertuples():
camera_id: int = d.gfa_id
camera_ids.append(camera_id)
x_wok, y_wok, _ = gfa_to_wok(self.observatory, d.x, d.y, camera_id)
xwok_gfa.append(x_wok)
ywok_gfa.append(y_wok)
_xwok_astro, _ywok_astro, *_ = radec2wokxy(
[d.ra],
[d.dec],
None,
"GFA",
field_ra - offset_ra / numpy.cos(numpy.deg2rad(d.dec)) / 3600.0,
field_dec - offset_dec / 3600.0,
field_pa - offset_pa / 3600.0,
self.observatory,
d.obstime,
focalScale=1.0, # Guider scale is relative to 1
)
xwok_astro += _xwok_astro.tolist()
ywok_astro += _ywok_astro.tolist()
# X: GFA to wok coordinates as a 2D array
# Y: ICRS to wok coordinates as a 2D array
X = numpy.array([xwok_gfa, ywok_gfa])
Y = numpy.array([xwok_astro, ywok_astro])
try:
c, R, t = umeyama(X, Y)
except ValueError:
if only_radec is True:
warnings.warn(
"Cannot fit using Umeyama. Assuming unitary rotation and scale.",
CoordIOUserWarning,
)
c = 1.0
R = numpy.array([[1, 0], [1, 0]])
else:
return False
if only_radec is True:
# Override the translation component with a simple average translation
t = (Y - X).mean(axis=1).T
# delta_x and delta_y only align with RA/Dec if PA=0. Otherwise we need to
# project using the PA.
pa_rad = numpy.deg2rad(field_pa - offset_pa / 3600.0)
delta_ra = t[0] * numpy.cos(pa_rad) + t[1] * numpy.sin(pa_rad)
delta_dec = -t[0] * numpy.sin(pa_rad) + t[1] * numpy.cos(pa_rad)
# Convert to arcsec and round up
delta_ra = float(numpy.round(delta_ra / self.plate_scale * 3600.0, 3))
delta_dec = float(numpy.round(delta_dec / self.plate_scale * 3600.0, 3))
delta_rot = -numpy.rad2deg(numpy.arctan2(R[1, 0], R[0, 0])) * 3600.0
delta_rot = float(numpy.round(delta_rot, 1))
delta_scale = float(numpy.round(c, 6))
if scale_rms:
xwok_astro = list(numpy.array(xwok_astro) / delta_scale)
ywok_astro = list(numpy.array(ywok_astro) / delta_scale)
delta_x = (numpy.array(xwok_gfa) - numpy.array(xwok_astro)) ** 2
delta_y = (numpy.array(ywok_gfa) - numpy.array(ywok_astro)) ** 2
xrms = numpy.sqrt(numpy.sum(delta_x) / len(delta_x))
yrms = numpy.sqrt(numpy.sum(delta_y) / len(delta_y))
rms = numpy.sqrt(numpy.sum(delta_x + delta_y) / len(delta_x))
# Convert to arcsec and round up
xrms = float(numpy.round(xrms / self.plate_scale * 3600.0, 3))
yrms = float(numpy.round(yrms / self.plate_scale * 3600.0, 3))
rms = float(numpy.round(rms / self.plate_scale * 3600.0, 3))
astro_wok = pandas.DataFrame.from_records(
{"gfa_id": camera_ids, "xwok": xwok_astro, "ywok": ywok_astro}
)
gfa_wok = pandas.DataFrame.from_records(
{"gfa_id": camera_ids, "xwok": xwok_gfa, "ywok": ywok_gfa}
)
self.result = GuiderFit(
gfa_wok,
astro_wok,
delta_ra,
delta_dec,
delta_rot,
delta_scale,
xrms,
yrms,
rms,
only_radec=only_radec,
)
return self.result