#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2020-08-17
# @Filename: coordinate.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
from __future__ import annotations
import warnings
from typing import Type, TypeVar
import numpy
from .defaults import VALID_WAVELENGTHS, WAVELENGTH
from .exceptions import CoordinateError, CoordIOError, CoordIOUserWarning
from .site import Site
T = TypeVar('T', bound="Coordinate", covariant=True)
[docs]
def verifySite(kwargs, strict=True):
"""Search through kwargs for site parameter, and verify
that it looks ok. Otherwise raise a CoordIOError
Parameters
------------
kwargs : dict
kwargs passed to Coordinate __new__
strict : bool
if True, a time must be specified on the site
"""
if kwargs.get('site', None) is None:
raise CoordIOError('Site must be passed')
else:
site = kwargs.get('site')
if not isinstance(site, Site):
raise CoordIOError('Site must be passed')
if strict and site.time is None:
raise CoordIOError(
"Time of observation must be specified on Site"
)
[docs]
def verifyWavelength(kwargs, lenArray, strict=True):
"""Search through kwargs for wavelength parameter. If not existent,
return insert GFA wavelength to kwargs (of the right size). If strict and
wavelengths do not correspond to Apogee, Boss, or GFA wavelengths, raise a
CoordIOError
Parameters
-----------
kwargs : dict
kwargs passed to Coordinate __new__
lenArray : int
length of the array to create if wavelength not present in kwargs
strict : bool
if strict is True, wavelengths may only be those corresponding to
Apogee, Boss, or GFA
"""
wls = kwargs.get("wavelength", None)
if wls is None:
# create an array of correct length default to gfa wavelength
wls = numpy.zeros(lenArray) + WAVELENGTH
warnings.warn(
"Warning! Wavelengths not supplied, defaulting to %i angstrom" % WAVELENGTH,
CoordIOUserWarning
)
else:
if hasattr(wls, "__len__"):
# array passed
wls = numpy.array(wls, dtype=numpy.float64)
else:
# single value passed
wls = numpy.zeros(lenArray) + float(wls)
wlSet = set(wls)
if strict and not wlSet.issubset(VALID_WAVELENGTHS):
raise CoordIOError(
"Invalid wavelength passed to FocalPlane \
valid wavelengths are %s" % (str(VALID_WAVELENGTHS))
)
# modify
kwargs["wavelength"] = wls
[docs]
class Coordinate(numpy.ndarray):
r"""Base class to represent a coordinate system.
`.Coordinate` is a wrapper around the `numpy.ndarray` to represent a
series of points in a given coordinate system. It handles most of the
particularities of `subclassing a Numpy array
<https://numpy.org/doc/stable/user/basics.subclassing.html>`__.
A `.Coordinate` instance must be a 2D array of dimensions :math:`N\times M`
in which N is the number of coordinate points, and M are their axis values.
Extra parameters and arrays can be added to the coordinate object. For
example, the following code minimally implements an ICRS coordinate system
::
class ICRS(Coordinate):
__extra_params__ ['name']
__extra_arrays__ = ['epoch', 'pmra', 'pmdec', 'parallax', 'rvel']
Now we can instantiate a set of coordinates ::
>>> icrs = ICRS([[100., 20.], [200., 10], [300., 5.]],
pmra=[1, 2, 0], pmdec=[-0.5, 0.5, 0],
name='my_coordinates')
>>> print(icrs)
ICRS([[100., 20.],
[200., 10.],
[300., 5.]])
>>> print(icrs.pmra)
[1, 2, 0]
>>> print(icrs.parallax)
[0, 0, 0]
>>> print(icrs.name)
my_coordinates
Values in ``__extra_params__`` are propagated to the new instance as
attributes and can be defined with any user-provided value.
``__extra_arrays__`` on the other hand are expected to be arrays with
the same length as the number of coordinates. If the extra array value
is not provided when instantiating the coordinate, it is replaced with
arrays of zeros of size N (the number of coordinates). If you don't want
the default value to be zeros you'll need to override the
``__array_finalize__`` method, for example ::
def __array_finalize__(self, obj):
super().__array_finalize__(obj)
if getattr(obj, 'epoch', None) is None:
self.epoch += 2451545.0
And now the default value for ``epoch`` will be 2451545.
`.Coordinate` takes care of slicing the arrays appropriately. If we slice
the coordinate along the first axis the resulting object will be another
coordinate instance in which both the internal array and the extra ones
are sliced correctly ::
>>> new_icrs = icrs[0:2, :]
>>> print(new_icrs)
ICRS([[100., 20.],
[200., 10.]])
>>> print(icrs.pmra)
[1, 2]
But if we slice it in a way that the resulting array cannot represent the
coordinate system anymore, a simple Numpy array will be returned, without
any of the additional information ::
>>> icrs[:, 0]
array([100., 200., 300.])
"""
__extra_arrays__ = []
__extra_params__ = []
__computed_arrays__ = [] # values that are computed (not passed)
__warn_arrays__ = [] # boolean arrays to indicate warnings
__coord_dim__ = None
def __new__(cls: Type[T], value, **kwargs) -> T:
if value is not None:
value = value.copy() # this prevents weirdness
obj = numpy.asarray(value, dtype=numpy.float64).view(cls)
if len(obj.shape) != 2 or obj.shape[1] < 2:
raise CoordinateError('The input array must be NxM')
if obj.shape[1] < 2:
raise CoordinateError('The input array must be NxM, M>=2.')
if obj.__coord_dim__ is not None and obj.shape[1] != obj.__coord_dim__:
raise CoordinateError(
'The input array must be Nx%i.' % obj.__coord_dim__
)
for kwarg in kwargs:
if kwarg not in obj.__extra_arrays__ and kwarg not in obj.__extra_params__:
raise CoordinateError(f'Invalid input argument {kwarg!r}.')
for param in obj.__extra_params__:
setattr(obj, param, kwargs.get(param, None))
# initialize extra arrays arrays
# set to zeros if not passed
for param in obj.__extra_arrays__:
array = kwargs.get(param, None)
if array is None:
# arrays default to zeros, could cause trouble
# but its nice for things like pm, radVal, etc
# which should probably default to zeros
array = numpy.zeros(obj.shape[0], dtype=numpy.float64)
elif isinstance(array, (numpy.ndarray, list, tuple)):
array = numpy.array(array, dtype=numpy.float64)
else:
array = numpy.tile(array, obj.shape[0])
setattr(obj, param, array)
if array.shape[0] != obj.shape[0]:
raise CoordinateError(f'{param} size does not match '
'the coordinate values.')
# initialize computed arrays to zeros
for param in obj.__computed_arrays__:
array = numpy.zeros(obj.shape[0], dtype=numpy.float64)
setattr(obj, param, array)
# initialize warning arrays to false
for param in obj.__warn_arrays__:
array = numpy.array([False] * obj.shape[0])
setattr(obj, param, array)
return obj
def __array_finalize__(self, obj):
# self is the new array, obj is the old array
if obj is None or not isinstance(obj, numpy.ndarray):
return obj
# This is so that copies and slices of array copy the params.
params = []
params += self.__extra_arrays__
params += self.__extra_params__
params += self.__computed_arrays__
params += self.__warn_arrays__
for param in params:
setattr(self, param, getattr(obj, param, None))
def __getitem__(self, sl):
""" When a coordinate is sliced, slice the extra arrays too,
and carry over params, if that's the right thing to do
Returns
--------
sliced : numpy.ndarray or `.Coordinate`
Depending on how it was sliced...
"""
# handling everything here is tricky
# https://scipy-cookbook.readthedocs.io/items/ViewsVsCopies.html
# is helpful
sliced = super().__getitem__(sl)
if (not isinstance(sliced, numpy.ndarray) or
sliced.ndim != 2 or sliced.shape[1] != self.shape[1]):
return sliced.view(numpy.ndarray)
arrays2slice = []
arrays2slice += self.__extra_arrays__
arrays2slice += self.__computed_arrays__
arrays2slice += self.__warn_arrays__
for param in arrays2slice:
if isinstance(sl, tuple):
# index looks something like arr[2:4,:]
setattr(sliced, param, getattr(sliced, param)[sl[0]])
else:
# index looks something like arr[arr<4]
setattr(sliced, param, getattr(sliced, param)[sl])
return sliced
@property
def coordSysName(self):
return self.__class__.__name__
[docs]
class Coordinate2D(Coordinate):
"""2 dimensional coordinate system
"""
__coord_dim__ = 2
[docs]
class Coordinate3D(Coordinate):
"""3 dimensional coordinate system
"""
__coord_dim__ = 3