from __future__ import annotations
import pathlib
import re
import warnings
from dataclasses import dataclass
from typing import Any
import numpy
import numpy.typing as nt
import pandas
import scipy.ndimage
from astropy.coordinates import SkyCoord
from astropy.io import fits
from astropy.time import Time, TimeDelta
from astropy.wcs import WCS, FITSFixedWarning
from astropy.wcs.utils import fit_wcs_from_points
from matplotlib import pyplot as plt
from scipy.spatial import KDTree
from scipy.stats import sigmaclip
from skimage.registration import phase_cross_correlation
from skimage.transform import SimilarityTransform, EuclideanTransform
from coordio.conv import guideToTangent, tangentToWok
from . import Site, calibration, defaults
from .astrometry import astrometrynet_quick
from .conv import tangentToGuide, wokToTangent
from .coordinate import Coordinate, Coordinate2D
from .exceptions import CoordIOError, CoordIOUserWarning
from .extraction import sextractor_quick
from .sky import ICRS, Observed
from .telescope import Field, FocalPlane
from .utils import radec2wokxy, wokxy2radec, gaia_mags2sdss_gri
from .wok import Wok
from .transforms import arg_nearest_neighbor
__all__ = [
"Guide",
"GuiderFitter",
"umeyama",
"radec_to_gfa",
"gfa_to_radec",
"cross_match",
]
warnings.filterwarnings("ignore", module="astropy.wcs.wcs")
warnings.filterwarnings("ignore", category=FITSFixedWarning)
[docs]
class Guide(Coordinate2D):
"""Guide coordinates are 2D cartesian xy coordinates. Units are pixels.
The origin is the lower left corner of the lower left pixel when looking
at the CCD through the camera window. Thus guide images are taken in
"selfie-mode". So the coordinate (0.5, 0.5) is the center of the lower
left pixel. -y points (generally) toward the wok vertex. True rotations
and locations of GFAs in the wok are measured/calibrated after
installation and on-sky characterization.
Parameters
-----------
value : numpy.ndarray
A Nx2 array where [x,y] are columns. Or `.Tangent`
xBin : int
CCD x bin factor, defaults to 1
yBin : int
CCD y bin factor, defaults to 1
Attributes
-----------
guide_warn : numpy.ndarray
boolean array indicating coordinates outside the CCD FOV
"""
__extra_params__ = ["xBin", "yBin"]
__warn_arrays__ = ["guide_warn"]
xBin: int
yBin: int
guide_warn: numpy.ndarray
def __new__(cls, value, **kwargs):
xBin = kwargs.get("xBin", None)
if xBin is None:
kwargs["xBin"] = 1
yBin = kwargs.get("yBin", None)
if yBin is None:
kwargs["yBin"] = 1
if isinstance(value, Coordinate):
if value.coordSysName == "Tangent":
# going from 3D -> 2D coord sys
# reduce dimensionality
arrInit = numpy.zeros((len(value), 2))
obj = super().__new__(cls, arrInit, **kwargs)
obj._fromTangent(value)
else:
raise CoordIOError(
"Cannot convert to Guide from %s" % value.coordSysName
)
else:
obj = super().__new__(cls, value, **kwargs)
obj._fromRaw()
return obj
def _fromTangent(self, tangentCoords):
"""Convert from tangent coordinates to guide coordinates"""
if tangentCoords.holeID not in calibration.VALID_GUIDE_IDS:
raise CoordIOError(
"Cannot convert from wok hole %s to Guide" % tangentCoords.holeID
)
# make sure the guide wavelength is specified for all coords
# this may be a little too extreme of a check
badWl = numpy.sum(tangentCoords.wavelength - defaults.INST_TO_WAVE["GFA"]) != 0
if badWl:
raise CoordIOError(
"Cannont convert to Guide coords from non-guide wavelength"
)
# use coords projected to tangent xy plane
# could almost equivalently use tangentCoords[:,0:2]
# and we may want to ditch the projection eventually if its
# unnecessary
# note, moved this calculation into
# conv.tangentToGuide, may want to just use that one
xT = tangentCoords.xProj
yT = tangentCoords.yProj
xPix = (1 / self.xBin) * (
defaults.MICRONS_PER_MM / defaults.GFA_PIXEL_SIZE * xT
+ defaults.GFA_CHIP_CENTER
)
yPix = (1 / self.yBin) * (
defaults.MICRONS_PER_MM / defaults.GFA_PIXEL_SIZE * yT
+ defaults.GFA_CHIP_CENTER
)
self[:, 0] = xPix
self[:, 1] = yPix
self._fromRaw()
def _fromRaw(self):
"""Find coords that may be out of the FOV and tag them"""
# note this might be wrong when xBin is 3
maxX = defaults.GFA_CHIP_CENTER * 2 / self.xBin
maxY = defaults.GFA_CHIP_CENTER * 2 / self.yBin
xPix = self[:, 0]
yPix = self[:, 1]
arg = numpy.argwhere((xPix < 0) | (xPix > maxX) | (yPix < 0) | (yPix > maxY))
if len(arg) == 0:
# everything in range...
return
arg = arg.squeeze()
self.guide_warn[arg] = True
def gfa_to_wok(observatory: str, x_pix: float, y_pix: float, gfa_id: int):
"""Converts from a GFA pixel to wok coordinates."""
gfa_row = calibration.gfaCoords.loc[(observatory, gfa_id), :]
b = gfa_row[["xWok", "yWok", "zWok"]].to_numpy().squeeze()
iHat = gfa_row[["ix", "iy", "iz"]].to_numpy().squeeze()
jHat = gfa_row[["jx", "jy", "jz"]].to_numpy().squeeze()
kHat = gfa_row[["kx", "ky", "kz"]].to_numpy().squeeze()
xt, yt = guideToTangent(x_pix, y_pix)
zt = 0
return tangentToWok(xt, yt, zt, b, iHat, jHat, kHat) # type: ignore
[docs]
def gfa_to_radec(
observatory: str,
x_pix: float,
y_pix: float,
gfa_id: int,
bore_ra: float,
bore_dec: float,
position_angle: float = 0,
offset_ra: float = 0,
offset_dec: float = 0,
offset_pa: float = 0,
obstime: float | None = None,
icrs: bool = False,
):
"""Converts from a GFA pixel to observed RA/Dec. Offsets are in arcsec."""
site = Site(observatory)
site.set_time(obstime)
wavelength = defaults.INST_TO_WAVE["GFA"]
wok_coords = gfa_to_wok(observatory, x_pix, y_pix, gfa_id)
bore_ra += offset_ra / 3600.0 / numpy.cos(numpy.radians(bore_dec))
bore_dec += offset_dec / 3600.0
position_angle -= offset_pa / 3600.0
boresight_icrs = ICRS([[bore_ra, bore_dec]])
boresight = Observed(
boresight_icrs,
site=site,
wavelength=wavelength,
)
wok = Wok([wok_coords], site=site, obsAngle=position_angle)
focal = FocalPlane(wok, wavelength=wavelength, site=site)
field = Field(focal, field_center=boresight)
observed = Observed(field, wavelength=wavelength, site=site)
if icrs is False:
return (observed.ra[0], observed.dec[0])
else:
return ICRS(observed)[0]
[docs]
def radec_to_gfa(
observatory: str,
ra: float | numpy.ndarray,
dec: float | numpy.ndarray,
gfa_id: int,
bore_ra: float,
bore_dec: float,
position_angle: float = 0,
offset_ra: float = 0,
offset_dec: float = 0,
offset_pa: float = 0,
obstime: float | None = None,
):
"""Converts from observed RA/Dec to GFA pixel, taking into account offsets."""
ra = numpy.atleast_1d(ra)
dec = numpy.atleast_1d(dec)
site = Site(observatory)
site.set_time(obstime)
assert site.time
bore_ra += offset_ra / 3600.0 / numpy.cos(numpy.radians(bore_dec))
bore_dec += offset_dec / 3600.0
position_angle -= offset_pa / 3600.0
xwok, ywok, *_ = radec2wokxy(
ra,
dec,
None,
"GFA",
bore_ra,
bore_dec,
position_angle,
observatory,
site.time.jd,
focalScale=1.0, # Guider scale is relative to 1
)
zwok = numpy.array([defaults.POSITIONER_HEIGHT] * len(xwok))
gfa_coords = calibration.gfaCoords.loc[observatory]
gfa_coords.reset_index(inplace=True)
gfa_row = gfa_coords[gfa_coords.id == gfa_id]
b = gfa_row[["xWok", "yWok", "zWok"]].to_numpy().squeeze()
iHat = gfa_row[["ix", "iy", "iz"]].to_numpy().squeeze()
jHat = gfa_row[["jx", "jy", "jz"]].to_numpy().squeeze()
kHat = gfa_row[["kx", "ky", "kz"]].to_numpy().squeeze()
xt, yt, _ = wokToTangent(xwok, ywok, zwok, b, iHat, jHat, kHat)
x_ccd, y_ccd = tangentToGuide(xt, yt)
return x_ccd, y_ccd
[docs]
def umeyama(X, Y):
"""Rigid alignment of two sets of points in k-dimensional Euclidean space.
Given two sets of points in correspondence, this function computes the
scaling, rotation, and translation that define the transform TR that
minimizes the sum of squared errors between TR(X) and its corresponding
points in Y. This routine takes O(n k^3)-time.
Parameters
----------
X
A ``k x n`` matrix whose columns are points.
Y
A ``k x n`` matrix whose columns are points that correspond to the
points in X
Returns
-------
c,R,t
The scaling, rotation matrix, and translation vector defining the
linear map TR as ::
TR(x) = c * R * x + t
such that the average norm of ``TR(X(:, i) - Y(:, i))`` is
minimized.
Copyright: Carlo Nicolini, 2013
Code adapted from the Mark Paskin Matlab version from
http://openslam.informatik.uni-freiburg.de/data/svn/tjtf/trunk/matlab/ralign.m
See paper by Umeyama (1991)
"""
m, n = X.shape
mx = X.mean(1)
my = Y.mean(1)
Xc = X - numpy.tile(mx, (n, 1)).T
Yc = Y - numpy.tile(my, (n, 1)).T
sx = numpy.mean(numpy.sum(Xc * Xc, 0))
Sxy = numpy.dot(Yc, Xc.T) / n
U, D, V = numpy.linalg.svd(Sxy, full_matrices=True, compute_uv=True)
V = V.T.copy()
r = numpy.linalg.matrix_rank(Sxy)
S = numpy.eye(m)
if r < (m - 1):
raise CoordIOError("Not enough independent measurements")
if numpy.linalg.det(Sxy) < 0:
S[-1, -1] = -1
elif r == m - 1:
if numpy.linalg.det(U) * numpy.linalg.det(V) < 0:
S[-1, -1] = -1
R = numpy.dot(numpy.dot(U, S), V.T)
c = numpy.trace(numpy.dot(numpy.diag(D), S)) / sx
t = my - c * numpy.dot(R, mx)
return c, R, t
[docs]
def cross_match(
measured_xy: numpy.ndarray,
reference_xy: numpy.ndarray,
reference_radec: numpy.ndarray,
x_size: int,
y_size: int,
cross_corrlation_shift: bool = True,
blur: float = 5,
distance_upper_bound: int = 10,
**kwargs,
):
"""Determines the shift between two sets of points using cross-correlation.
Constructs 2D images from reference and measured data sets and calculates
the image shift using cross-correlation registration. It then associated
reference to measured detections using nearest neighbours and builds a
WCS using the reference on-sky positions.
Parameters
----------
measured_xy
A 2D array with the x and y coordinates of each measured point.
reference_xy
A 2D array with the x and y coordinates of each reference point.
reference_radec
A 2D array with the ra and dec coordinates of each reference point.
x_size
Size of the image for 2D cross-correlation.
y_size
Size of the image for 2D cross-correlation.
blur
The sigma, in pixels, of the Gaussian kernel used to convolve the images.
distance_upper_bound
Maximum distance, in pixels, for KD tree nearest neighbours.
kwargs
Other arguments to pass to ``skimage.registration.phase_cross_correlation``.
Returns
-------
wcs
A tuple with the WCS of the solution and the translation invariant normalized
RMS error between reference and moving image (see ``phase_cross_correlation``).
"""
# Create and blur the reference and measured images.
ref_image = numpy.zeros((y_size, x_size), numpy.float32)
ref_image[reference_xy.astype(int)[:, 1], reference_xy.astype(int)[:, 0]] = 1
if blur > 0:
ref_image = scipy.ndimage.gaussian_filter(ref_image, blur)
meas_image = numpy.zeros((y_size, x_size), numpy.float32)
meas_image[measured_xy.astype(int)[:, 1], measured_xy.astype(int)[:, 0]] = 1
if blur > 0:
meas_image = scipy.ndimage.gaussian_filter(meas_image, blur)
# Calculate the shift and error.
error: float
if cross_corrlation_shift:
shift, error, _ = phase_cross_correlation(ref_image, meas_image, **kwargs)
else:
shift = numpy.array([0.0, 0.0])
error = numpy.nan
# Apply shift.
measured_shift = measured_xy + shift[::-1]
# Associate measured to reference using KD tree.
tree = KDTree(reference_xy)
dd, ii = tree.query(measured_shift, k=1, distance_upper_bound=distance_upper_bound)
# Reject measured objects without a close neighbour.
# KDTree.query() assigns an index larger than the initial set of points when
# the closest neighbour has distance > distance_upper_bound.
valid = dd < len(reference_xy)
ii_valid = ii[valid]
# Select valid measured object and their corresponding RA/Dec references.
measured_xy_valid = measured_xy[valid]
reference_radec_valid = reference_radec[ii_valid, :]
if len(reference_radec_valid) < 3 or len(measured_xy_valid) < 3:
return None, 0
# Build the WCS.
reference_skycoord_valid = SkyCoord(
ra=reference_radec_valid[:, 0],
dec=reference_radec_valid[:, 1],
unit="deg",
)
wcs = fit_wcs_from_points(
(measured_xy_valid[:, 0], measured_xy_valid[:, 1]),
reference_skycoord_valid,
)
assert isinstance(wcs, WCS)
return wcs, error
@dataclass
class GuiderFit:
"""Results of a guider data fit."""
cameras: list[int]
gfa_wok: pandas.DataFrame
astro_wok: pandas.DataFrame
coeffs: list[float]
delta_ra: float
delta_dec: float
delta_rot: float
delta_scale: float
xrms: float
yrms: float
rms: float
rms_data: pandas.DataFrame
fit: pandas.DataFrame
fit_rms: pandas.DataFrame
only_radec: bool = False
[docs]
class GuiderFitter:
"""Fits guider data, returning the measured offsets."""
def __init__(self, observatory: str):
self.observatory = observatory.upper()
self.plate_scale = defaults.PLATE_SCALE[self.observatory]
self.pixel_size = defaults.GFA_PIXEL_SIZE
self.pixel_scale = 1.0 / self.plate_scale * self.pixel_size * 3600 / 1000
self.reference_pixel: tuple[int, int] = (1024, 1024)
self.gfa_wok_coords = self._gfa_to_wok()
self.astro_data: pandas.DataFrame | None = None
self.result: GuiderFit | None = None
[docs]
def reset(self):
"""Clears the astrometric data."""
self.astro_data = None
self.result = None
def _gfa_to_wok(self):
"""Converts GFA reference points to wok coordinates."""
if self.observatory not in calibration.gfaCoords.index.get_level_values(0):
raise CoordIOError(
f"GFA coordinates for {self.observatory} are missing. "
"Did you load the correct calibrations?"
)
gfaCoords = calibration.gfaCoords.loc[self.observatory]
wok_coords = {}
for gfa_id in range(1, 7):
camera_coords: pandas.DataFrame = gfaCoords.loc[gfa_id]
b = camera_coords[["xWok", "yWok", "zWok"]].to_numpy().squeeze()
iHat = camera_coords[["ix", "iy", "iz"]].to_numpy().squeeze()
jHat = camera_coords[["jx", "jy", "jz"]].to_numpy().squeeze()
kHat = camera_coords[["kx", "ky", "kz"]].to_numpy().squeeze()
xt, yt = guideToTangent(self.reference_pixel[0], self.reference_pixel[1])
zt = 0
wok_coords[gfa_id] = tangentToWok(xt, yt, zt, b, iHat, jHat, kHat)
df = pandas.DataFrame.from_dict(wok_coords).transpose()
df.index.name = "gfa_id"
df.columns = ["xwok", "ywok", "zwok"]
return df
[docs]
def add_astro(
self,
camera: str | int,
ra: float,
dec: float,
obstime: float,
x_pixel: float,
y_pixel: float,
):
"""Adds an astrometric measurement."""
if isinstance(camera, str):
match = re.match(r".*([1-6]).*", camera)
if not match:
raise CoordIOError(f"Cannot understand camera {camera!r}.")
camera = int(match.group(1))
new_data = (camera, ra, dec, obstime, x_pixel, y_pixel)
if self.astro_data is None:
self.astro_data = pandas.DataFrame(
[new_data],
columns=["gfa_id", "ra", "dec", "obstime", "x", "y"],
)
else:
self.astro_data.loc[len(self.astro_data.index)] = new_data
[docs]
def add_wcs(
self,
camera: str | int,
wcs: WCS,
obstime: float,
pixels: numpy.ndarray | None = None,
):
"""Adds a camera measurement from a WCS solution."""
if pixels is None:
coords: Any = wcs.pixel_to_world(*self.reference_pixel)
ra = coords.ra.value
dec = coords.dec.value
self.add_astro(camera, ra, dec, obstime, *self.reference_pixel)
else:
ras, decs = wcs.all_pix2world(pixels[:, 0], pixels[:, 1], 0)
for ii in range(len(ras)):
self.add_astro(
camera,
ras[ii],
decs[ii],
obstime,
pixels[ii, 0],
pixels[ii, 1],
)
[docs]
def add_gimg(self, path: str | pathlib.Path, pixels: numpy.ndarray | None = None):
"""Processes a raw GFA image, runs astrometry.net, adds the solution."""
data = fits.getdata(str(path))
header = fits.getheader(str(path), 1)
if header["OBSERVAT"] != self.observatory:
raise CoordIOError("GFA image is from a different observatory.")
if "RAFIELD" not in header or "DECFIELD" not in header:
raise CoordIOError("GFA image does not have RAFIELD or DECFIELD.")
ra_field = header["RAFIELD"]
dec_field = header["DECFIELD"]
pa_field = header["FIELDPA"]
sources = sextractor_quick(data)
camera_id = int(header["CAMNAME"][3])
radec_centre = gfa_to_radec(
self.observatory,
self.reference_pixel[0],
self.reference_pixel[1],
camera_id,
ra_field,
dec_field,
position_angle=pa_field,
)
wcs = astrometrynet_quick(
sources,
radec_centre[0],
radec_centre[1],
self.pixel_scale,
)
if wcs is None:
raise CoordIOError("astrometry.net could not solve image.")
obstime = Time(header["DATE-OBS"], format="iso").jd
self.add_wcs(camera_id, wcs, obstime, pixels=pixels)
return wcs
[docs]
def add_proc_gimg(
self,
path: str | pathlib.Path,
pixels: numpy.ndarray | None = None,
):
"""Adds an astrometric solution from a ``proc-gimg`` image."""
hdu = fits.open(str(path))
header = hdu[1].header
is_proc = len(hdu) > 1 and "SOLVED" in header
if not is_proc:
raise CoordIOError(f"{path!s} is not a proc-gimg image.")
if header["OBSERVAT"] != self.observatory:
raise CoordIOError("GFA image is from a different observatory.")
if not header.get("SOLVED", False):
raise CoordIOError(f"Image {path!s} has not been astrometrically solved.")
wcs = WCS(header)
obstime = Time(header["DATE-OBS"], format="iso").jd
self.add_wcs(header["CAMNAME"], wcs, obstime, pixels=pixels)
[docs]
def fit(
self,
field_ra: float,
field_dec: float,
field_pa: float = 0.0,
offset: tuple[float, float, float] | numpy.ndarray = (0.0, 0.0, 0.0),
scale_rms: bool = False,
only_radec: bool = False,
cameras: list[int] | None = None,
):
"""Performs the fit and returns a `.GuiderFit` object.
The fit is performed using `.umeyama`.
Parameters
----------
field_ra
The field centre RA.
field_dec
The field centre declination.
field_pa
The field centre position angle.
offset
The offset in RA, Dec, PA to apply.
scale_rms
Whether to correct the RMS using the measured scale factor.
only_radec
If `True`, fits only translation. Useful when only one or two cameras
are solving. In this case the rotation and scale offsets will still be
calculated using the Umeyama model, but the ra and dec offsets are
overridden with a simple translation. The `.GuiderFit` object will
have `only_radec=True`.
"""
offset_ra, offset_dec, offset_pa = offset
if self.astro_data is None:
raise CoordIOError("Astro data has not been set.")
camera_ids: list[int] = []
xwok_gfa: list[float] = []
ywok_gfa: list[float] = []
xwok_astro: list[float] = []
ywok_astro: list[float] = []
use_detections: list[bool] = []
used_cameras: set[int] = set([])
for d in self.astro_data.itertuples():
camera_id: int = int(d.gfa_id)
if cameras is None or camera_id in cameras:
used_cameras.add(camera_id)
camera_ids.append(camera_id)
x_wok, y_wok, _ = gfa_to_wok(self.observatory, d.x, d.y, camera_id)
xwok_gfa.append(x_wok)
ywok_gfa.append(y_wok)
_xwok_astro, _ywok_astro, *_ = radec2wokxy(
[d.ra],
[d.dec],
None,
"GFA",
field_ra - offset_ra / numpy.cos(numpy.deg2rad(d.dec)) / 3600.0,
field_dec - offset_dec / 3600.0,
field_pa - offset_pa / 3600.0,
self.observatory,
d.obstime,
focalScale=1.0, # Guider scale is relative to 1
)
xwok_astro += _xwok_astro.tolist()
ywok_astro += _ywok_astro.tolist()
# Use these detections fit unless we are excluding the camera.
use_detections.append(cameras is None or camera_id in cameras)
# Create arrays with all detections, then with only selected ones.
X_all: nt.NDArray[numpy.float32] = numpy.array([xwok_gfa, ywok_gfa])
Y_all: nt.NDArray[numpy.float32] = numpy.array([xwok_astro, ywok_astro])
X_fit = X_all[:, use_detections] # GFA to wok coordinates as a 2D array
Y_fit = Y_all[:, use_detections] # ICRS to wok coordinates as a 2D array
try:
c, R, t = umeyama(X_fit, Y_fit)
except ValueError:
if only_radec is True:
warnings.warn(
"Cannot fit using Umeyama. Assuming unitary rotation and scale.",
CoordIOUserWarning,
)
c = 1.0
R = numpy.array([[1, 0], [1, 0]])
t = numpy.array([0.0, 0.0])
else:
return False
if only_radec is True:
# Override the translation component with a simple average translation
t = (Y_fit - X_fit).mean(axis=1).T
coeffs = [c, R, t]
# delta_x and delta_y only align with RA/Dec if PA=0. Otherwise we need to
# project using the PA.
pa_rad = numpy.deg2rad(field_pa - offset_pa / 3600.0)
delta_ra = t[0] * numpy.cos(pa_rad) + t[1] * numpy.sin(pa_rad)
delta_dec = -t[0] * numpy.sin(pa_rad) + t[1] * numpy.cos(pa_rad)
# Convert to arcsec and round up
delta_ra = float(numpy.round(delta_ra / self.plate_scale * 3600.0, 3))
delta_dec = float(numpy.round(delta_dec / self.plate_scale * 3600.0, 3))
delta_rot = -numpy.rad2deg(numpy.arctan2(R[1, 0], R[0, 0])) * 3600.0
delta_rot = float(numpy.round(delta_rot, 1))
delta_scale = float(numpy.round(c, 6))
detection_id = numpy.arange(len(xwok_astro)) + 1
astro_wok = pandas.DataFrame.from_records(
{
"gfa_id": camera_ids,
"detection_id": detection_id,
"xwok": xwok_astro.copy(),
"ywok": ywok_astro.copy(),
"selected": numpy.array(use_detections).astype(int),
},
index=["gfa_id", "detection_id"],
)
gfa_wok = pandas.DataFrame.from_records(
{
"gfa_id": camera_ids,
"detection_id": detection_id,
"xwok": xwok_gfa.copy(),
"ywok": ywok_gfa.copy(),
"selected": numpy.array(use_detections).astype(int),
},
index=["gfa_id", "detection_id"],
)
rms_df = self.calculate_rms(
gfa_wok,
astro_wok,
scale=c if scale_rms else 1,
cameras=cameras,
)
fit_data = c * R @ X_all + t[numpy.newaxis].T
fit_df = pandas.DataFrame.from_records(
{
"gfa_id": camera_ids,
"detection_id": detection_id,
"xwok": fit_data[0, :],
"ywok": fit_data[1, :],
"selected": numpy.array(use_detections).astype(int),
},
index=["gfa_id", "detection_id"],
)
fit_rms_df = self.calculate_rms(fit_df, astro_wok, cameras=cameras)
self.result = GuiderFit(
list(used_cameras),
gfa_wok,
astro_wok,
coeffs,
delta_ra,
delta_dec,
delta_rot,
delta_scale,
rms_df.loc[0].xrms,
rms_df.loc[0].yrms,
rms_df.loc[0].rms,
rms_df,
fit_df,
fit_rms_df,
only_radec=only_radec,
)
return self.result
[docs]
def calculate_rms(
self,
gfa_wok: pandas.DataFrame,
astro_wok: pandas.DataFrame,
scale: float = 1,
cameras: list[int] | None = None,
):
"""Calculates the RMS of the measurements.
Parameters
----------
gfa_wok
GFA to wok coordinates data. The data frame contains three columns:
``gfa_id``, ``xwok``, and ``ywok``.
astro_wok
Astrometric solution to wok coordinates data. The data frame contains
three columns: ``gfa_id``, ``xwok``, and ``ywok``.
scale
Factor by which to scale coordinates.
cameras
The cameras to use to calculate the global RMS.
Returns
-------
rms
A data frame with columns ``gfa_id``, ``xrms``, ``yrms``, ``rms``
for the x, y, and combined RMS measurements for each camera. An
additional ``gfa_id=0`` is added with the RMS measurements for all
GFA cameras combined. RMS values are returned in mm on wok coordinates.
"""
def calc_rms(gfa_cam: pandas.DataFrame):
gfa_id = gfa_cam.index.values[0][0]
astro_cam = astro_wok.loc[gfa_id]
delta = gfa_cam - astro_cam
xrms = numpy.sqrt(numpy.mean(delta.xwok**2))
yrms = numpy.sqrt(numpy.mean(delta.ywok**2))
rms = numpy.sqrt(numpy.mean(delta.xwok**2 + delta.ywok**2))
return pandas.Series([xrms, yrms, rms], index=["xrms", "yrms", "rms"])
astro_wok = astro_wok.copy()
gfa_wok = gfa_wok.copy()
gfa_wok.loc[:, ["xwok", "ywok"]] *= scale
rms_df = gfa_wok.groupby("gfa_id").apply(calc_rms)
if cameras is None:
delta = gfa_wok - astro_wok
else:
delta = gfa_wok.loc[cameras] - astro_wok.loc[cameras]
xrms = numpy.sqrt(numpy.mean(delta.xwok**2))
yrms = numpy.sqrt(numpy.mean(delta.ywok**2))
rms = numpy.sqrt(numpy.mean(delta.xwok**2 + delta.ywok**2))
rms_df.loc[0, :] = (xrms, yrms, rms)
return rms_df
[docs]
def plot_fit(self, filename: str | pathlib.Path):
"""Plot the fit results."""
if self.result is None:
raise RuntimeError("fit() needs to be run before plot_fit().")
fig, ax = plt.subplots(2, 3)
fig.tight_layout()
iax = 0
jax = 0
direction = 1
for cam_id in range(1, 7):
one_arcsec = self.plate_scale / 3600.0
camera_note = None
color = "k"
if cam_id in self.result.fit.index:
X = self.result.fit.loc[cam_id].xwok
Y = self.result.fit.loc[cam_id].ywok
U = self.result.astro_wok.loc[cam_id].xwok
V = self.result.astro_wok.loc[cam_id].ywok
U = U - X
V = V - Y
rms_mm = float(self.result.fit_rms.loc[cam_id].rms)
rms = numpy.round(rms_mm / self.plate_scale * 3600, 4)
if self.result.cameras and cam_id not in self.result.cameras:
camera_note = "Not fit"
color = "r"
else:
X = Y = U = V = []
rms = -999
camera_note = "Disabled / not solved"
color = "r"
q = ax[iax][jax].quiver(
X,
Y,
U,
V,
color=color,
units="xy",
angles="xy",
scale_units="inches",
scale=0.15,
)
if cam_id == 1:
ax[iax][jax].quiverkey(q, 0.2, 0.9, one_arcsec, label="1 arcsec")
text1 = ax[iax][jax].text(
0.5,
0.95,
f"RMS: {rms}",
color=color,
horizontalalignment="center",
verticalalignment="center",
transform=ax[iax][jax].transAxes,
)
text1.set_bbox(dict(facecolor="white", alpha=1, edgecolor="none"))
if camera_note:
text2 = ax[iax][jax].text(
0.5,
0.90,
camera_note,
color=color,
horizontalalignment="center",
verticalalignment="center",
transform=ax[iax][jax].transAxes,
)
text2.set_bbox(dict(facecolor="white", alpha=1, edgecolor="none"))
ax[iax][jax].set_title(f"GFA{cam_id}", color=color)
jax += direction
if jax == 3:
jax = -1
iax = 1
direction = -1
fig.savefig(str(filename))
class SolvePointing:
"""
Determines the best-fit telescope boresight pointing from a set of GFA
images. Requires specifying a guess or reference pointing as a starting
point. The reference pointing can be assumed from the gfa image headers.
Parameters
-------------
raCen : float | None = None
RA of field center in degrees. If None, user must specify pt_source
decCen : float | None = None
Dec of field center in degrees. If None, user must specify pt_source
paCen : float | None = None
PA of field center in degrees, If None, user must specify pt_src
pt_source : str | None = None
either "design" or "telescope". Design will solve using robostrategy
intented field center, telescope will solve using the reported
sky position of the telescope boresight. Data pulled from gcam headers.
If raCen, decCen, and/or paCen are specified, those values are used
instead of data from image headers
offset_ra : float = 0
offset in ra to add to the supplied field center (true arcseconds)
offset_dec : float = 0
offset in dec to add to the supplied field center (arcseconds)
offset_pa : float = 0
offset in pa to subtract from supplied field angle (arcseconds)
scale : float | None = None
scale factor for the field
db_conn_st : str
connection string for the db
db_tab_name : str
table in database with gaia info
"""
def __init__(
self,
raCen: float | None = None,
decCen: float | None = None,
paCen: float | None = None,
scale: float | None = None,
pt_source: str | None = None,
offset_ra: float = 0,
offset_dec: float = 0,
offset_pa: float = 0,
db_conn_st: str = "postgresql://sdss_user@operations.sdss.org/sdss5db",
db_tab_name: str = "catalogdb.gaia_dr2_source_g_lt_18"
):
if pt_source is None and None in [raCen, decCen, paCen]:
raise RuntimeError(
"must specify either pt_source or raCen, decCen, paCen"
)
if raCen is None or decCen is None or paCen is None:
if pt_source not in ["telescope", "design"]:
raise RuntimeError(
"pt_source must be either 'telescope' or 'design'"
)
self._raCen = raCen
self._decCen = decCen
self._paCen = paCen
self.pt_source = pt_source
self.offset_ra = offset_ra
self.offset_dec = offset_dec
self.offset_pa = offset_pa
self.scale = scale
self.db_conn_st = db_conn_st
self.db_tab_name = db_tab_name
self.GAIA_EPOCH = 2457206
# populated by add_gimg
self.fieldInit = False
self.observatory = None
self.raCenRef = None
self.decCenRef = None
self.paCenRef = None
self.obsTimeRef = None
self.ipa = None
self.imgNum = None
self.gfaHeaders = {}
self.gfaWCS = {}
self.allCentroids = pandas.DataFrame()
# populated or modified by solve
self.allGaia = pandas.DataFrame()
self.skyDistThresh = None # arcseconds for a match hit
self.raCenMeas = None
self.decCenMeas = None
self.paCenMeas = None
self.scaleMeas = None
self.altCenMeas = None
self.azCenMeas = None
# self.fit_rms = None
self.matchedSources = pandas.DataFrame()
self.translation = numpy.array([0.0, 0.0])
self.rotation = 0.0
def getMetadata(self):
"""Get a list of data that can be easily stuffed in a fits
header.
"""
metaDataList = [
("SOL_RA", self.raCenMeas, "solved RA of pointing (deg)"),
("SOL_DEC", self.decCenMeas, "solved Dec of pointing (deg)"),
("SOL_PA", self.paCenMeas, "solved PA of pointing (deg)"),
("SOL_SCL", self.scaleMeas, "solved scale factor"),
("SOL_ALT", self.altCenMeas, "solved alt of pointing (deg)"),
("SOL_AZ", self.azCenMeas, "solved az of pointing (deg)"),
("SOL_GRMS", self.guide_rms, "guide rms between reference and solved (mm)"),
("SOL_FRMS", self.fit_rms, "rms of fit (mm)"),
("REF_RA", self.raCenRef, "reference RA (deg)"),
("REF_DEC", self.decCenRef, "reference Dec (deg)"),
("REF_PA", self.paCenRef, "reference PA (deg)"),
("REF_SCL", self.scale, "reference scale factor"),
("TAI_MID", self.obsTimeRef.mjd * 24 * 60 * 60, "tai seconds at middle of exposure"),
# ("N_WCS", len(self.gfaWCS), "number of GFAs with astronet solutions"),
("N_STARS", len(self.matchedSources), "number of stars used in fit"),
("N_GFAS", len(set(self.matchedSources.gfaNum)), "number of GFAs used in fit"),
("NITR_WCS", self.nIterWCS, "number of iterations for wcs solve"),
("NITR_ALL", self.nIterAll, "number of iterations for full solve")
]
return metaDataList
def median_zeropoint(self, gfaNum):
_df = self.matchedSources[self.matchedSources.gfaNum == gfaNum]
return numpy.nanmedian(_df.zp)
@property
def plate_scale(self):
# mm per deg
return defaults.PLATE_SCALE[self.observatory]
# @property
# def pixel_scale(self):
# # arcsec per pixel
# return 1.0 / self.plate_scale * defaults.GFA_PIXEL_SIZE * 3600 / 1000
@property
def wokDistThresh(self):
# mm
return self.skyDistThresh / 3600. * self.plate_scale
@property
def xyWokPredictRef(self):
output = radec2wokxy(
self.matchedSources.ra.to_numpy(),
self.matchedSources.dec.to_numpy(), self.GAIA_EPOCH,
"GFA", self.raCenRef, self.decCenRef, self.paCenRef,
self.observatory, self.obsTimeRef.jd, focalScale=self.scaleMeas,
pmra=self.matchedSources.pmra.to_numpy(),
pmdec=self.matchedSources.pmdec.to_numpy(),
parallax=self.matchedSources.parallax.to_numpy(), fullOutput=True
)
xPred = output[0]
yPred = output[1]
return xPred, yPred
def fitWCS(self, gfaNum):
# return a dictionary keyed by GFA ID
_df = self.matchedSources[self.matchedSources.gfaNum==gfaNum]
if len(_df) < 3:
# require at least 3 stars for a wcs solution?
return None
xy = _df[["x", "y"]].to_numpy()
raDec = _df[["ra", "dec"]].to_numpy()
# Build the WCS.
reference_skycoord_valid = SkyCoord(
ra=raDec[:, 0],
dec=raDec[:, 1],
unit="deg",
)
wcs = fit_wcs_from_points(
(xy[:, 0], xy[:, 1]),
reference_skycoord_valid,
)
return wcs
def rms_df(self, fit=False):
"""
returns rms for expected vs solved pointing
----------
A data frame with columns ``gfa_id``, ``xrms``, ``yrms``, ``rms``
for the x, y, and combined RMS measurements for each camera. An
additional ``gfa_id=0`` is added with the RMS measurements for all
GFA cameras combined. RMS values are returned in mm on wok coordinates.
"""
df = self.matchedSources.copy()
if not fit:
# reference rms
xPred, yPred = self.xyWokPredictRef
df["xWokPred"] = xPred
df["yWokPred"] = yPred
_gfa_id = []
_xrms = []
_yrms = []
_rms = []
for gfaNum in list(set(df.gfaNum)):
_df = df[df.gfaNum==gfaNum]
dx = _df.xWokPred - _df.xWokMeas
dy = _df.yWokPred - _df.yWokMeas
_gfa_id.append(gfaNum)
_xrms.append(numpy.sqrt(numpy.mean(dx**2)))
_yrms.append(numpy.sqrt(numpy.mean(dy**2)))
_rms.append(numpy.sqrt(numpy.mean(dx**2+dy**2)))
# finally add total rms under gfa_id = 0
dx = df.xWokPred - df.xWokMeas
dy = df.yWokPred - df.yWokMeas
_gfa_id.append(0)
_xrms.append(numpy.sqrt(numpy.mean(dx**2)))
_yrms.append(numpy.sqrt(numpy.mean(dy**2)))
_rms.append(numpy.sqrt(numpy.mean(dx**2+dy**2)))
out = pandas.DataFrame({
"gfa_id": _gfa_id,
"xrms": _xrms,
"yrms": _yrms,
"rms": _rms,
})
# out.set_index("gfa_id")
return out.set_index("gfa_id")
@property
def guide_rms(self):
# RMS error between stars in reference frame and solved frame
# the measured scale is applied to remove a scale dependence
# on rms
xPred, yPred = self.xyWokPredictRef
xFit = self.matchedSources.xWokPred.to_numpy()
yFit = self.matchedSources.yWokPred.to_numpy()
rms = numpy.sqrt(
numpy.mean((xPred - xFit)**2 + (yPred - yFit)**2)
)
return rms
@property
def fit_rms(self):
return numpy.sqrt(numpy.mean(self.matchedSources.dr**2))
@property
def guide_rms_sky(self):
# arcsec
return self.guide_rms / self.plate_scale * 3600
@property
def fit_rms_sky(self):
# arcsec
return self.fit_rms / self.plate_scale * 3600
@property
def used_cameras(self):
gfaNums = set(self.matchedSources.gfaNum)
return sorted(list(gfaNums))
@property
def n_stars_used(self):
return len(self.matchedSources)
@property
def gfa_wok(self):
gfa_id = self.matchedSources.gfaNum.to_numpy(),
detection_id = numpy.arange(len(gfa_id))
x_wok = self.matchedSources.xWokMeas.to_numpy(),
y_wok = self.matchedSources.yWokMeas.to_numpy(),
selected = numpy.ones(len(gfa_id))
df = pandas.DataFrame(
{
"gfa_id": gfa_id,
"detection_id": detection_id,
"xwok": x_wok,
"ywok": y_wok,
"selected": selected,
},
index=["gfa_id", "detection_id"],
)
return df
@property
def astro_wok(self):
gfa_id = self.matchedSources.gfaNum.to_numpy()
detection_id = numpy.arange(len(gfa_id))
x_wok, y_wok = self.xyWokPredictRef
selected = numpy.ones(len(gfa_id))
df = pandas.DataFrame.from_records(
{
"gfa_id": gfa_id,
"detection_id": detection_id,
"xwok": x_wok,
"ywok": y_wok,
"selected": selected,
},
index=["gfa_id", "detection_id"],
)
return df
@property
def fit_data(self):
gfa_id = self.matchedSources.gfaNum.to_numpy()
detection_id = numpy.arange(len(gfa_id))
x_wok = self.matchedSources.xWokPred.to_numpy()
y_wok = self.matchedSources.yWokPred.to_numpy()
selected = numpy.ones(len(gfa_id))
df = pandas.DataFrame.from_records(
{
"gfa_id": gfa_id,
"detection_id": detection_id,
"xwok": x_wok,
"ywok": y_wok,
"selected": selected,
},
index=["gfa_id", "detection_id"],
)
return df
@property
def guider_fit(self):
rms_df = self.rms_df()
fit_rms_df = self.rms_df(fit=True)
result = GuiderFit(
list(self.used_cameras),
self.gfa_wok,
self.astro_wok,
self.coeffs,
self.delta_ra,
self.delta_dec,
self.delta_rot,
self.delta_scale,
rms_df.loc[0].xrms,
rms_df.loc[0].yrms,
rms_df.loc[0].rms,
rms_df,
self.fit_data,
fit_rms_df,
only_radec=False,
)
return result
@property
def coeffs(self):
rotMat = numpy.array([
[numpy.cos(self.rotation), -numpy.sin(self.rotation)],
[numpy.sin(self.rotation), numpy.cos(self.rotation)]
])
return [self.scaleMeas, rotMat, self.translation]
@property
def delta_ra(self):
cosDec = numpy.cos(numpy.radians(self.decCenMeas))
return (self.raCenMeas - self.raCenRef) * 3600 * cosDec
@property
def delta_dec(self):
return (self.decCenMeas - self.decCenRef) * 3600
@property
def delta_rot(self):
return (self.paCenMeas - self.paCenRef) * 3600
@property
def delta_scale(self):
return self.scaleMeas
def pix2wok(self, x, y, gfaNum):
g = calibration.gfaCoords.loc[(self.observatory, gfaNum), :]
zt = numpy.zeros(len(x))
b = g[["xWok", "yWok", "zWok"]].to_numpy()
iHat = g[["ix", "iy", "iz"]].to_numpy()
jHat = g[["jx", "jy", "jz"]].to_numpy()
kHat = g[["kx", "ky", "kz"]].to_numpy()
xt, yt = guideToTangent(x, y)
xw, yw, zw = tangentToWok(
xt, yt, zt,
b, iHat, jHat, kHat
)
return xw, yw
def wok2pix(self, x, y, gfaNum):
g = calibration.gfaCoords.loc[(self.observatory, gfaNum), :]
zw = numpy.zeros(len(x))
b = g[["xWok", "yWok", "zWok"]].to_numpy()
iHat = g[["ix", "iy", "iz"]].to_numpy()
jHat = g[["jx", "jy", "jz"]].to_numpy()
kHat = g[["kx", "ky", "kz"]].to_numpy()
xt, yt, zt = wokToTangent(
x, y, zw,
b, iHat, jHat, kHat
)
xyPix = tangentToGuide(xt,yt) #,y_0=float(g.y_0), y_1=float(g.y_1))
return xyPix
def gfa2radec(self, gfaNum):
""" get the ra/dec of ccd center, if wcs is available use that """
xw, yw = self.pix2wok(numpy.array([1024]), numpy.array([1024]), gfaNum)
ra, dec, fieldWarn = wokxy2radec(
numpy.array(xw), numpy.array(yw), "GFA", self.raCenMeas,
self.decCenMeas, self.paCenMeas, self.observatory,
self.obsTimeRef.jd, focalScale=self.scaleMeas
)
return ra[0], dec[0]
def initializeField(self, observatory, imgNum, hdr):
self.observatory = observatory
if self.pt_source == "design":
_raCen = hdr["RAFIELD"]
_decCen = hdr["DECFIELD"]
_paCen = hdr["FIELDPA"]
elif self.pt_source == "telescope":
# warning assumes APO for now
# will need to modify for LCO
_raCen = hdr["RA"] # RA is ObjNetPos, RADEG is ObjPos (tcc kws)
_decCen = hdr["DEC"]
_paCen = hdr["ROTPOS"] # wont work for LCO no ROTPOS header
# if no field center was specified, use field center from
# header
if self._raCen is None:
self._raCen = _raCen
if self._decCen is None:
self._decCen = _decCen
if self._paCen is None:
self._paCen = _paCen
# apply offsets if specified
ddec = self.offset_dec / 3600.
self.decCenRef = self._decCen - ddec
dra = self.offset_ra / 3600. / numpy.cos(numpy.radians(self.decCenRef))
self.raCenRef = self._raCen - dra
self.paCenRef = self._paCen - self.offset_pa / 3600.
# initalize the starting point for the iteration at the reference
# pointing
self.raCenMeas = self.raCenRef
self.decCenMeas = self.decCenRef
self.paCenMeas = self.paCenRef
if self.scale is None:
self.scale = defaults.SITE_TO_SCALE[self.observatory]
self.scaleMeas = self.scale
self.tStart = Time(hdr["DATE-OBS"], format="iso", scale="tai")
self.exptime = hdr["EXPTIMEN"]
dt = TimeDelta(self.exptime/2.0, format="sec", scale="tai")
# choose exposure end as reference time because
# it better matches the telescope headers (eg for a pointing model)
self.obsTimeRef = self.tStart + dt
# dt = TimeDelta(hdr["EXPTIME"]/2., format="sec", scale="tai")
# self.obsTimeMid = self.tStart + dt
# # tcc need this in seconds
# self.taiMid = self.obsTimeRef.mjd * 24 * 60 * 60
self.imgNum = imgNum
self.ipa = hdr["IPA"]
if self.observatory == "APO":
self.fieldCenAltRef = hdr["ALT"]
self.fieldCenAzRef = hdr["AZ"]
else:
self.fieldCenAltRef = None
self.fieldCenAzRef = None
self.initField = True
def add_gimg(
self,
img_path: str | pathlib.Path,
centroids: pandas.DataFrame | None = None,
wcs: WCS | None = None,
gain: float | None = None
):
"""
Add gfa data to be used in fitting. Don't add a gfa you don't want
incuded in fitting. Strict checking is done to make sure all images
added have the same exposure number. Expects SDSS style naming and
headers, should work for both gimg*.fits and proc-gimg*.fits files.
Parameters
--------------
img_path
path to a gimg or proc-gimg file
centroids
sep style extracted parameters in pandas.DataFrame form.
Additionally if a "CENTROIDS" extension is present in the fits
file, those will be used. If not present, centroids are extracted
from the data.
wcs
a WCS object
gain
electrons per adu for this camera
"""
img_path = str(img_path)
ff = fits.open(img_path)
if centroids is None or len(centroids)==0:
# extract centroids
centroids = sextractor_quick(ff[1].data)
if len(centroids) == 0:
# no centroids found skip this gfa
ff.close()
return
hdr = dict(ff[1].header)
tokens = img_path.strip(".fits").split("/")[-1].split("-")
imgNum = int(tokens[-1])
gfaNum = int(tokens[-2].strip("gfa").strip("n").strip("s"))
self.gfaHeaders[gfaNum] = hdr
if not self.fieldInit:
if tokens[-2].endswith("n"):
observatory = "APO"
else:
observatory = "LCO"
self.initializeField(observatory, imgNum, hdr)
if imgNum != self.imgNum:
raise RuntimeError(
"May not add gfa files with differing img numbers"
)
# fwhm = 2 * (numpy.log(2) * (centroids.a**2 + centroids.b**2))**0.5
# centroids["fwhm"] = fwhm
centroids["gfaNum"] = gfaNum
# remove saturated sources
# centroids = centroids[centroids.peak < 55000].reset_index(drop=True)
# calculate wok coordinates for each centroid
xWokMeas, yWokMeas = self.pix2wok(
centroids.x.to_numpy(),
centroids.y.to_numpy(),
gfaNum
)
centroids["xWokMeas"] = xWokMeas
centroids["yWokMeas"] = yWokMeas
if wcs is not None and "CTYPE1" in wcs.to_header():
self.gfaWCS[gfaNum] = wcs
# use wcs to calculate on-sky locations
# of centroids note these are off by 0.5
# but leaving so that cherno default rotation and fudge factor
# don't need to change
xyCents = centroids[["x", "y"]].to_numpy()
raDecMeas = numpy.array(wcs.pixel_to_world_values(xyCents))
centroids["raMeas"] = raDecMeas[:, 0]
centroids["decMeas"] = raDecMeas[:, 1]
# import pdb; pdb.set_trace()
centroids["gain"] = gain
self.allCentroids = pandas.concat(
[self.allCentroids, centroids], ignore_index=True
)
# if wcs is not None:
# self.gfaWCS[gfaNum] = WCS(open(wcs).read())
ff.close()
def getGaiaSources(self, ra, dec, radius=0.16, magLimit=18):
query = "SELECT source_id, ra, dec, pmra, pmdec, parallax, phot_g_mean_mag, bp_rp"
query += " FROM %s" % self.db_tab_name
query += " WHERE q3c_radial_query"
query += "(ra, dec, %.4f, %.4f, %.4f)" % (ra, dec, radius)
query += " AND phot_g_mean_mag < %.2f" % magLimit
df = pandas.read_sql(query, self.db_conn_st)
df = df.sort_values("phot_g_mean_mag")
df = df.head(250).reset_index(drop=True)
return df.dropna().reset_index(drop=True)
def _updateZP(self):
xWokPredRef, yWokPredRef = self.xyWokPredictRef
self.matchedSources["xWokPredRef"] = xWokPredRef
self.matchedSources["yWokPredRef"] = yWokPredRef
sdss_g, sdss_r, sdss_i = gaia_mags2sdss_gri(
gaia_g=self.matchedSources.phot_g_mean_mag.to_numpy(),
gaia_bp_rp=self.matchedSources.bp_rp.to_numpy()
)
self.matchedSources["sdss_r"] = sdss_r
# when re-analyzing old images, aperflux is not available in the
# centroids table unless you re-extract sources. For this case, just
# replace it with flux which has always been there.
if "aperflux" in list(self.matchedSources.columns):
flux = self.matchedSources.aperflux
else:
flux = self.matchedSources.flux
zp = 2.5 * numpy.log10(
flux * self.matchedSources.gain / self.exptime
) + sdss_r
self.matchedSources["zp"] = zp
def _matchWCS(self):
"""Note, only works if there was at least 1 wcs solution provided!
"""
raDecGaia = self.allGaia[["ra", "dec"]].to_numpy()
# na values in centroids mean no wcs soln was present, so drop them
cents = self.allCentroids.dropna().reset_index(drop=True)
# cents = cents[cents.gfaNum==1]
raDecMeas = cents[["raMeas", "decMeas"]].to_numpy()
matches, indices, minDists = arg_nearest_neighbor(raDecMeas, raDecGaia)
gaiaNN = self.allGaia.iloc[indices].reset_index(drop=True)
matched = pandas.concat([cents, gaiaNN], axis=1)
matched = matched.loc[:, ~matched.columns.duplicated()].copy()
# only keep matches closer than specified threshold
dra = (matched.ra - matched.raMeas)
dra = dra * numpy.cos(numpy.radians(matched.dec))
ddec = (matched.dec - matched.decMeas)
matched["drSky"] = numpy.sqrt(dra**2 + ddec**2) * 3600
matched = matched[matched.drSky < self.skyDistThresh].reset_index(drop=True)
output = radec2wokxy(
matched.ra.to_numpy(), matched.dec.to_numpy(), self.GAIA_EPOCH,
"GFA", self.raCenMeas, self.decCenMeas, self.paCenMeas,
self.observatory, self.obsTimeRef.jd, focalScale=self.scaleMeas,
pmra=matched.pmra.to_numpy(), pmdec=matched.pmdec.to_numpy(),
parallax=matched.parallax.to_numpy(), fullOutput=True
)
xPred = output[0]
yPred = output[1]
fieldWarn = output[2]
ha = output[3]
pa = output[4]
xFocal = output[5]
yFocal = output[6]
thetaField = output[7]
phiField = output[8]
altPred = output[9]
azPred = output[10]
self.altCenMeas = output[11]
self.azCenMeas = output[12]
matched["xWokPred"] = xPred
matched["yWokPred"] = yPred
matched["dx"] = matched.xWokMeas - matched.xWokPred
matched["dy"] = matched.yWokMeas - matched.yWokPred
matched["dr"] = numpy.sqrt(matched.dx**2 + matched.dy**2)
self.matchedSources = matched
self._updateZP()
def _matchWok(self):
output = radec2wokxy(
self.allGaia.ra.to_numpy(), self.allGaia.dec.to_numpy(), self.GAIA_EPOCH,
"GFA", self.raCenMeas, self.decCenMeas, self.paCenMeas,
self.observatory, self.obsTimeRef.jd, focalScale=self.scaleMeas,
pmra=self.allGaia.pmra.to_numpy(), pmdec=self.allGaia.pmdec.to_numpy(),
parallax=self.allGaia.parallax.to_numpy(), fullOutput=True
)
xPred = output[0]
yPred = output[1]
fieldWarn = output[2]
ha = output[3]
pa = output[4]
xFocal = output[5]
yFocal = output[6]
thetaField = output[7]
phiField = output[8]
altPred = output[9]
azPred = output[10]
self.fieldCenAltMeas = output[11]
self.fieldCenAzMeas = output[12]
self.allGaia["xWokPred"] = xPred
self.allGaia["yWokPred"] = yPred
xyWokMeas = self.allCentroids[["xWokMeas", "yWokMeas"]].to_numpy()
xyWokPredict = self.allGaia[["xWokPred", "yWokPred"]].to_numpy()
matches, indices, minDists = arg_nearest_neighbor(xyWokMeas, xyWokPredict)
gaiaNN = self.allGaia.iloc[indices].reset_index(drop=True)
matched = pandas.concat([self.allCentroids, gaiaNN], axis=1)
matched = matched.loc[:, ~matched.columns.duplicated()].copy()
# reject matches above the threshold
matched["dx"] = matched.xWokMeas - matched.xWokPred
matched["dy"] = matched.yWokMeas - matched.yWokPred
matched["dr"] = numpy.sqrt(matched.dx**2 + matched.dy**2)
goodMatches = matched[matched.dr < self.wokDistThresh]
self.matchedSources = goodMatches.reset_index(drop=True)
self._updateZP()
def _iter(self):
nGFAS = len(set(self.matchedSources.gfaNum))
xyMeas = self.matchedSources[["xWokMeas", "yWokMeas"]].to_numpy()
xyPred = self.matchedSources[["xWokPred", "yWokPred"]].to_numpy()
if nGFAS > 1:
st = SimilarityTransform()
st.estimate(xyPred, xyMeas)
xyFit = st(xyPred)
dxy = xyFit - xyMeas
dRot = st.rotation
dTrans = st.translation
dScale = st.scale
self.rotation += dRot
self.translation += dTrans
else:
dTrans = numpy.mean(xyMeas - xyPred, axis=0)
dxy = xyPred + dTrans - xyMeas
dRot = 0
dScale = 1
# update field center
self.paCenMeas += numpy.degrees(dRot)
rotRad = numpy.radians(self.paCenMeas)
cosRot = numpy.cos(rotRad)
sinRot = numpy.sin(rotRad)
# rotate by PA to get offset directions along ra/dec
rotMat = numpy.array([
[cosRot, sinRot],
[-sinRot, cosRot]
])
dra, ddec = rotMat @ dTrans
self.decCenMeas -= ddec / self.plate_scale
dra = dra / self.plate_scale
dra = dra / numpy.cos(numpy.radians(self.decCenMeas))
self.raCenMeas -= dra
self.scaleMeas /= dScale
def solve(
self,
skyDistThresh: float = 3, # arcseconds
):
self.skyDistThresh = skyDistThresh
# initialize field center to
# user supplied reference
self.nIterWCS = 0
self.nIterAll = 0
if len(self.gfaWCS) > 0:
# match wcs gets gaia sources for matching based on
# wcs if available
allGaia = []
for gfaNum, wcs in self.gfaWCS.items():
ra, dec = wcs.pixel_to_world_values([[1024,1024]])[0]
gaiaDF = self.getGaiaSources(ra,dec)
gaiaDF["gfaNum"] = gfaNum
allGaia.append(gaiaDF)
self.allGaia = pandas.concat(allGaia)
lastRMS = None
self._matchWCS()
for ii in range(10):
# maximum of 10 iters
self._iter()
self._matchWCS()
self.nIterWCS += 1
if lastRMS is not None:
if numpy.abs(lastRMS - self.fit_rms) < 0.003:
break
lastRMS = self.fit_rms
if len(self.gfaHeaders) != len(self.gfaWCS):
# at least one chip missing a WCS,
# add in gfa's without WCS solution and re-fit
gaiaGFAs = list(set(self.allGaia.gfaNum))
for gfaNum in self.gfaHeaders.keys():
if gfaNum in gaiaGFAs:
continue # already got gaia sources for this GFA
ra, dec = self.gfa2radec(gfaNum)
gaiaDF = self.getGaiaSources(ra, dec)
gaiaDF["gfaNum"] = gfaNum
self.allGaia = pandas.concat([self.allGaia, gaiaDF])
lastRMS = None
self._matchWok()
for ii in range(10):
self._iter()
self._matchWok()
self.nIterAll += 1
if lastRMS is not None:
if numpy.abs(lastRMS - self.fit_rms) < 0.003:
break
lastRMS = self.fit_rms
return self.guider_fit
def reSolve(
self,
imgNum: int,
obsTimeRef: Time,
exptime: float,
newCentroids: pandas.DataFrame,
skyDistThresh: float = 3, # arcseconds
):
""" warning only use if:
imgNum = self.imgNum + 1 (next image in sequence)
previous guide rms was small (eg < 1 arcsecond)
pointing reference is same as previous frame
same gfas included as previous frame
centroids needs (at least) columns x,y,peak,gfaNum
date_obs, exptime come from GFA header
this skips all db queries and relies on pre-existing
guide star table and doesn't require any pre-existing WCS
solutions
"""
self.skyDistThresh = skyDistThresh
self.imgNum = imgNum
self.obsTimeRef = obsTimeRef
self.exptime = exptime
dfList = []
for gfaNum, group in newCentroids.groupby("gfaNum"):
# group = group[group.peak < 55000].reset_index(drop=True)
# calculate wok coordinates for each centroid
xWokMeas, yWokMeas = self.pix2wok(
group.x.to_numpy(),
group.y.to_numpy(),
gfaNum
)
group["xWokMeas"] = xWokMeas
group["yWokMeas"] = yWokMeas
dfList.append(group)
self.allCentroids = pandas.concat(dfList).reset_index(drop=True)
lastRMS = None
self.nIterAll = 0
self.nIterWCS = 0
self._matchWok()
for ii in range(10):
self._iter()
self._matchWok()
self.nIterAll += 1
if lastRMS is not None:
if numpy.abs(lastRMS - self.fit_rms) < 0.003:
break
lastRMS = self.fit_rms
return self.guider_fit