import os
import numpy
import pandas
from scipy.interpolate import interp1d
from scipy import optimize
from functools import partial
from astropy import stats
from . import defaults
from .site import Site
from .sky import ICRS, Observed
from .telescope import Field, FocalPlane
from .wok import Wok
[docs]
def wokCurveAPO(r):
"""Curve of the wok at APO at radial position r
Parameters
-----------
r : scalar or 1D array
radius (cylindrical coords) mm
Returns:
---------
result : scalar or 1D array
z (height) of wok surface in mm (0 at vertex)
Curve that was specified for machining the APO wok, decided by Colby J.
"""
A = 9199.322517101522
return A - numpy.sqrt(A**2 - r**2)
[docs]
def wokCurveLCO(r):
"""Curve of the wok LCO at radial position r
Parameters
-----------
r : scalar or 1D array
radius (cylindrical coords) mm
Returns:
---------
result : scalar or 1D array
z (height) of wok surface in mm (0 at vertex)
Curve that was specified for machining the LCO wok, decided by Colby J.
"""
A = 0.000113636363636
B = 0.0000000129132231405
C = 0.0000012336318
return A * r**2 / (1 + numpy.sqrt(1 - B * r**2)) + C * r**2
[docs]
def radec2wokxy(ra, dec, coordEpoch, waveName, raCen, decCen, obsAngle,
obsSite, obsTime, focalScale=None, pmra=None, pmdec=None, parallax=None,
radVel=None, pressure=None, relativeHumidity=0.5,
temperature=10, fullOutput=False, darLambda=None):
r"""
Convert from ra, dec ICRS coords to a flat-wok XY in mm. At obsAngle=0
wok +y is a aligned with +dec, and wok +x is aligned with +ra
Question for José, do we need to enforce a time scale? I think everything
defaults to UTC.
Parameters
------------
ra : numpy.ndarray
Right ascension in ICRS, degrees
dec : numpy.ndarray
Declination in ICRS, degrees
coordEpoch : float
A TDB Julian date, the epoch for the input
coordinates (from the catalog). Defaults to J2000.
waveName : str, or numpy.ndarray
Array elements must be "Apogee", "Boss", or "GFA" strings
raCen : float
Right ascension of field center, in degrees
decCen : float
Declination of field center, in degrees.
obsAngle : float
Position angle for observation. Angle is measured from North
through East to wok +y. So obsAngle of 45 deg, wok +y points NE.
obsSite : str
Either "APO" or "LCO"
obsTime : float
TDB Julian date. The time at which these coordinates will be observed
with the FPS.
focalScale : float or None
Scale factor for conversion between focal and wok coords, found
via dither analysis. Defaults to value in defaults.SITE_TO_SCALE
pmra : numpy.ndarray
A 1D array with the proper motion in the RA axis for the N targets,
in milliarcsec/yr. Must be a true angle, i.e, it must include the
``cos(dec)`` term. Defaults to 0.
pmdec : numpy.ndarray
A 1D array with the proper motion in the RA axis for the N targets,
in milliarcsec/yr. Defaults to 0.
parallax : numpy.ndarray
A 1D array with the parallax for the N targets, in milliarcsec.
Defaults to 0.
radVel : numpy.ndarray
A 1D array with the radial velocity in km/s, positive when receding.
Defaults to 0.
pressure : float
The atmospheric pressure at the site, in millibar (same as hPa). If
not provided the pressure will be calculated using the altitude
:math:`h` and the approximate expression
.. math::
p \sim -p_0 \exp\left( \dfrac{g h M} {T_0 R_0} \right)
where :math:`p_0` is the pressure at sea level, :math:`M` is the molar
mass of the air, :math:`R_0` is the universal gas constant, and
:math:`T_0=288.16\,{\rm K}` is the standard sea-level temperature.
relativeHumidity : float
The relative humidity, in the range :math:`0-1`. Defaults to 0.5.
temperature : float
The site temperature, in degrees Celsius. Defaults to
:math:`10^\circ{\rm C}`.
fullOutput : Bool
If True all intermediate coords are returned
Returns
---------
xWok : numpy.ndarray
x wok coordinate, mm
yWok : numpy.ndarray
y wok coordinate, mm
fieldWarn : numpy.ndarray
boolean array. Where True the coordinate converted should be eyed with
suspicion. (It came from very far off axis).
hourAngle : float
hour angle of field center in degrees
positionAngle : float
position angle of field center in degrees
if fullOutput=True additionally returns:
xFocal : numpy.ndarray
x focal coordinate, mm
yFocal : numpy.ndarray
y focal coordinate, mm
thetaField : numpy.ndarray
azimuthal field coordinate, deg +RA through +Dec
phiField : numpy.ndarray
polar field coordinate, deg off axis
altitude : numpy.ndarray
altitude coordinate, deg
azimuth : numpy.ndarray
azimuth coordinate N=0, E=90, deg
altFieldCen : float
altitude of field center, deg
azFieldCen : float
azimuth of field center, deg
"""
nCoords = len(ra)
# first grab the correct wavelengths for fibers
wavelength = numpy.zeros(nCoords)
if isinstance(waveName, str):
# same wl for all coords
wavelength += defaults.INST_TO_WAVE[waveName]
else:
assert len(waveName) == nCoords
for ii, ft in enumerate(waveName):
wavelength[ii] = defaults.INST_TO_WAVE[ft]
site = Site(
obsSite, pressure=pressure,
temperature=temperature, rh=relativeHumidity
)
site.set_time(obsTime)
# first determine the field center in observed coordinates
# use the guide wavelength for field center
# epoch not needed, no propermotions, etc (josé verify?)
icrsCen = ICRS([[raCen, decCen]])
obsCen = Observed(icrsCen, site=site, wavelength=defaults.INST_TO_WAVE["GFA"])
radec = numpy.array([ra, dec]).T
if pmra is not None:
pmra[numpy.isnan(pmra)] = 0
if pmdec is not None:
pmdec[numpy.isnan(pmdec)] = 0
if parallax is not None:
parallax[numpy.isnan(parallax)] = 0
icrs = ICRS(
radec, epoch=coordEpoch, pmra=pmra, pmdec=pmdec,
parallax=parallax, rvel=radVel
)
# propogate propermotions, etc
icrs = icrs.to_epoch(obsTime, site=site)
if focalScale is None:
focalScale = defaults.SITE_TO_SCALE[obsSite]
# check to see if darLambda was supplied, and if so
# use that wavelength (just for DAR not telescope distortion model)
darWavelength = wavelength.copy()
if darLambda is not None:
if hasattr(darLambda, "__len__"):
assert len(darLambda) == nCoords
darWavelength = darLambda
else:
darWavelength.fill(darLambda)
obs = Observed(icrs, site=site, wavelength=darWavelength)
field = Field(obs, field_center=obsCen)
focal = FocalPlane(field,
wavelength=wavelength,
site=site,
fpScale=focalScale,
use_closest_wavelength=True)
wok = Wok(focal, site=site, obsAngle=obsAngle)
output = (
wok[:, 0], wok[:, 1], focal.field_warn,
float(obsCen.ha), float(obsCen.pa)
)
if fullOutput:
output += (
focal[:, 0], focal[:, 1],
field[:, 0], field[:, 1],
obs[:, 0], obs[:, 1],
obsCen[0][0], obsCen[0][1]
)
return output
[docs]
def wokxy2radec(xWok, yWok, waveName, raCen, decCen, obsAngle,
obsSite, obsTime, focalScale=None, pressure=None, relativeHumidity=0.5,
temperature=10):
r"""
Convert from flat-wok XY (mm) to ra, dec ICRS coords (deg)
Question for José, do we need to enforce a time scale? I think everything
defaults to UTC.
Parameters
------------
xWok : numpy.ndarray
X wok coordinates, mm
yWok : numpy.ndarray
Y wok coordinates, mm
waveName : str, or numpy.ndarray
Array elements must be "Apogee", "Boss", or "GFA" strings
raCen : float
Right ascension of field center, in degrees
decCen : float
Declination of field center, in degrees.
obsAngle : float
Position angle for observation. Angle is measured from North
through East to wok +y. So obsAngle of 45 deg, wok +y points NE.
obsSite : str
Either "APO" or "LCO"
obsTime : float
TDB Julian date. The time at which these coordinates will be observed
with the FPS.
focalScale : float or None
Scale factor for conversion between focal and wok coords, found
via dither analysis. Defaults to value in defaults.SITE_TO_SCALE
pressure : float
The atmospheric pressure at the site, in millibar (same as hPa). If
not provided the pressure will be calculated using the altitude
:math:`h` and the approximate expression
.. math::
p \sim -p_0 \exp\left( \dfrac{g h M} {T_0 R_0} \right)
where :math:`p_0` is the pressure at sea level, :math:`M` is the molar
mass of the air, :math:`R_0` is the universal gas constant, and
:math:`T_0=288.16\,{\rm K}` is the standard sea-level temperature.
relativeHumidity : float
The relative humidity, in the range :math:`0-1`. Defaults to 0.5.
temperature : float
The site temperature, in degrees Celsius. Defaults to
:math:`10^\circ{\rm C}`.
Returns
---------
xWok : numpy.ndarray
x wok coordinate, mm
yWok : numpy.ndarray
y wok coordinate, mm
fieldWarn : numpy.ndarray
boolean array. Where True the coordinate converted should be eyed with
suspicion. (It came from very far off axis).
"""
nCoords = len(xWok)
# first grab the correct wavelengths for fibers
wavelength = numpy.zeros(nCoords)
if isinstance(waveName, str):
# same wl for all coords
wavelength += defaults.INST_TO_WAVE[waveName]
else:
assert len(waveName) == nCoords
for ii, ft in enumerate(waveName):
wavelength[ii] = defaults.INST_TO_WAVE[ft]
site = Site(
obsSite, pressure=pressure,
temperature=temperature, rh=relativeHumidity
)
site.set_time(obsTime)
# first determine the field center in observed coordinates
# use the guide wavelength for field center
# epoch not needed, no propermotions, etc (josé verify?)
icrsCen = ICRS([[raCen, decCen]])
obsCen = Observed(icrsCen, site=site, wavelength=defaults.INST_TO_WAVE["GFA"])
# hack in z wok position of 143 mm
# could do a little better and estimate the curve of focal surface
xyzWok = numpy.zeros((nCoords, 3))
rWok = numpy.sqrt(xWok**2 + yWok**2)
# this is not totally correct
# but probably better than modeling as flat
# project the xy positions onto the 3d surface
# and add 143 (the positioner height)
if obsSite == "APO":
zWok = defaults.POSITIONER_HEIGHT + wokCurveAPO(rWok)
else:
zWok = defaults.POSITIONER_HEIGHT + wokCurveLCO(rWok)
xyzWok[:, 0] = xWok
xyzWok[:, 1] = yWok
xyzWok[:, 2] = zWok
wok = Wok(xyzWok, site=site, obsAngle=obsAngle)
if focalScale is None:
focalScale = defaults.SITE_TO_SCALE[obsSite]
focal = FocalPlane(wok,
wavelength=wavelength,
site=site,
fpScale=focalScale,
use_closest_wavelength=True)
field = Field(focal, field_center=obsCen)
obs = Observed(field, site=site, wavelength=wavelength)
icrs = ICRS(obs, epoch=obsTime)
return icrs[:, 0], icrs[:, 1], field.field_warn
[docs]
def fitsTableToPandas(recarray):
d = {}
if hasattr(recarray, "names"):
names = recarray.names
else:
names = recarray.dtype.names
for name in names:
key = name
if hasattr(key, "decode"):
key = key.decode()
d[key] = recarray[name].byteswap().newbyteorder()
df = pandas.DataFrame(d)
# decode binary data into strings, if present
# https://stackoverflow.com/questions/40389764/how-to-translate-bytes-objects-into-literal-strings-in-pandas-dataframe-pytho
# str_df = df.select_dtypes([numpy.object])
# if len(str_df) > 0:
# str_df = str_df.stack().str.decode('utf-8').unstack()
# for col in str_df:
# df[col] = str_df[col]
return df
[docs]
class MoffatLossProfile(object):
"""
class for calculating fiber loss based on Moffat profile
Parameters
----------
offset: numpy.array
fiber offset
beta: float
Power index of the Moffat profile
FWHM: float
seeing
rfiber: float
radius of the fiber
"""
def __init__(self, offset, beta, FWHM, rfiber=1):
self.offset = offset
self.beta = beta
self.FWHM = FWHM
self.alpha = self.FWHM / (2. * numpy.sqrt(2 ** (1 / self.beta) - 1))
self.amplitude = 1. / self.moffat_norm(1.) # this is the amplitude needed to normalize the Moffat profile
self.rfiber = rfiber
[docs]
def moffat_1D(self, r, amplitude=None): # unit flux at the center
"""
Function computing Moffat 1D profile
Parameters
------------
r: float or numpy.array
Distance from the centre of the profile
Returns
-------
moff_prof: float or numpy.array
1D Moffat profile.
"""
if amplitude is None:
moff_prof = self.amplitude * (1 + (r / self.alpha) ** 2) ** (-self.beta)
else:
moff_prof = amplitude * (1 + (r / self.alpha) ** 2) ** (-self.beta)
return moff_prof
[docs]
def moffat_norm(self, amplitude):
"""
Function computing the normalized the Moffat 1D profile.
Parameters
-----------
amplitude: float
Amplitude of the profile
Returns
-------
moff_prof_norm: float
Normalised Moffat profile.
"""
xmin, xmax, xstep = -7., 7., 0.05 # fixed ln radius steps
# x is the ln radius in units of the r / alpha
steps = numpy.arange(xmin, xmax, xstep)
r = self.alpha * numpy.exp(steps)
norm = numpy.sum(numpy.exp(2 * steps) * self.moffat_1D(r, amplitude=amplitude))
moff_prof_norm = 2 * numpy.pi * self.alpha ** 2 * norm * xstep
return moff_prof_norm
[docs]
def flux_loss(self, offset):
"""
Function computing the flux loss obtained by moving the fiber
across the source in a certain direction.
Prameters
---------
offset: numpy.array
fiber offset
Returns
--------
norm: numpy.array
the flux loss
"""
x = numpy.arange(-self.rfiber, self.rfiber, self.rfiber / 50) # set up a 100x100 Cartesian grid across the fiber
y = numpy.arange(-self.rfiber, self.rfiber, self.rfiber / 50)
X, Y = numpy.meshgrid(x, y)
r = numpy.sqrt(X **2 + Y ** 2)
norm = numpy.zeros(len(offset))
r_ev = (r <= self.rfiber)
for i in range(len(offset)):
r_moffat = numpy.sqrt((X - offset[i]) ** 2 + Y ** 2)
norm[i] = numpy.sum((self.rfiber / 50) ** 2 *
self.moffat_1D(r_moffat[r_ev]))
return norm
[docs]
def func_magloss(self):
"""
Function computing the magnitude loss obtained by moving the fiber
across the source in a certain direction. Note that the magnitude
loss with offset = 0 is not 0, and should coincide with the difference
between PSF and fiber magnitudes.
Prameters
----------
Returns
--------
magloss: numpy.array
the magnitude loss
"""
magloss = numpy.zeros(len(self.offset))
magloss = -2.5 * numpy.log10(self.flux_loss(self.offset)) \
+ 2.5 * numpy.log10(self.flux_loss(numpy.array([0.]))[0])
return magloss
[docs]
def default_Moffat_params():
"""
Returns the dict of default parameters for
the Moffat profile base don observator and lunation
"""
# set beta
beta = {}
beta['bright'] = {'APO': 1.66, 'LCO': 1.78}
beta['dark'] = {'APO': 1.87, 'LCO': 1.90}
# set FWHM
FWHM = {}
FWHM['bright'] = {'APO': 0.57, 'LCO': 0.69}
FWHM['dark'] = {'APO': 0.70, 'LCO': 0.62}
return beta, FWHM
[docs]
class Moffat2dInterp(object):
"""
Create the dict of 1D interpolations function
for a moffat profile offset for various FWHMs.
Object returned includes two dicts, one for APO
and one for LCO for the various fiber sizes.
"""
def __init__(self, Noffset=None, FWHM=None, beta=None):
if Noffset is None:
Noffset = 1500
if FWHM is None:
defualt = default_Moffat_params()[1]
FWHM = [defualt[l][o] for l in ['bright', 'dark'] for o in ['APO', 'LCO']]
if beta is None:
beta = default_Moffat_params()[0]
rfibers = {'APO': 1., 'LCO': 1.33 / 2}
offsets = numpy.zeros((len(FWHM), Noffset))
FWHMs = numpy.zeros((len(FWHM), Noffset))
for i, f in enumerate(FWHM):
FWHMs[i, :] = f
offsets[i, :] = numpy.linspace(0, 50, Noffset)
magloss = numpy.zeros((FWHMs.shape[0], Noffset))
fmagloss = {}
# this is for default case to have it by lunation as well
if isinstance(beta, dict) and 'bright' in beta.keys():
for lunation, beta_lun in zip(beta.keys(), beta.values()):
fmagloss[lunation] = {}
for obs, rfiber in zip(rfibers.keys(), rfibers.values()):
fmagloss[lunation][obs] = {}
b = beta_lun[obs]
for i, f in enumerate(FWHMs[:, 0]):
magloss[i, :] = MoffatLossProfile(offsets[i, :], b, f, rfiber=rfiber).func_magloss()
fmagloss[lunation][obs][f] = interp1d(magloss[i, :], offsets[i, :])
else:
for obs, rfiber in zip(rfibers.keys(), rfibers.values()):
fmagloss[obs] = {}
if isinstance(beta, dict):
b = beta[obs]
else:
b = beta
for i, f in enumerate(FWHMs[:, 0]):
magloss[i, :] = MoffatLossProfile(offsets[i, :], b, f, rfiber=rfiber).func_magloss()
fmagloss[obs][f] = interp1d(magloss[i, :], offsets[i, :])
self.fmagloss = fmagloss
self.beta_interp2d = beta
self.FWHM_interp2d = FWHM
def __call__(self, magloss, FWHM, obsSite, lunation=None):
"""
The cal to return the offset value based on the desired
magloss.
Parameters
-----------
magloss: float or numpy.array
The desired magnitude loss for the object(s)
FWHM: float
The FWHM for the moffat profile that has been calculated
on the init of the object
obsSite: str
The observatory of the observation. Should either be
'APO' or 'LCO'.
lunation: str
If the designmode is bright time ('bright') or dark
time ('dark'). Required if set up using default params.
Returns
-------
r: float or numpy.array
The offset to get the desired magloss in arcseconds.
"""
if 'bright' in self.fmagloss.keys():
if lunation is None:
raise ValueError('Must provide lunation for default function!')
r = self.fmagloss[lunation][obsSite][FWHM](magloss)
else:
r = self.fmagloss[obsSite][FWHM](magloss)
return r
[docs]
def offset_definition(mag, mag_limits, lunation, waveName, obsSite, fmagloss=None,
safety_factor=0., beta=None, FWHM=None, skybrightness=None,
offset_min_skybrightness=None, can_offset=None,
use_type='bright_neigh', mag_limit_ind=None):
"""
Returns the offset needed for object with mag to be
observed at mag_limit.
This is for the piecewise appromixation used based on:
https://wiki.sdss.org/pages/viewpage.action?pageId=100173069
Parameters
----------
mag: float or numpy.array
The magniutde(s) of the objects. For BOSS should be
Gaia G-band and for APOGEE should be 2MASS H-band.
mag_limits: numpy.array
Magnitude limits for the designmode of the design.
This should be an array of length N=10 where indexes
correspond to magntidues: [g, r, i, z, bp, gaia_g, rp, J, H, K].
This matches the apogee_bright_limit_targets_min or
boss_bright_limit_targets_min (depending on instrument) from
targetdb.DesignMode for the design_mode of the design.
lunation: str:
If the designmode is bright time ('bright') or dark
time ('dark')
waveName: str
Instrument for the fibers offset definition being applied
to. Either 'Boss' or 'Apogee'.
obsSite: str
The observatory of the observation. Should either be
'APO' or 'LCO'.
fmagloss: object
Moffat2dInterp class with the lookup table
for doing the moffat profile inversion. If None,
then table is calculated at function call.
safety_factor: float
Factor to add to mag_limit. Should equal zero for
bright neighbor checks (i.e. remain at default).
beta: float
Power index of the Moffat profile. If None, default set in code.
FWHM: float
seeing for the Moffat profile. If None, default set in code.
skybrightness: float
Sky brightness for the field cadence. Only set if
want to check for offset_flag TOO_DARK (8).
offset_min_skybrightness: float
Minimum sky brightness for the offset. Only set if
want to check for offset_flag TOO_DARK (8).
can_offset: boolean or numpy.array
can_offset value from targetdb for the target(s) to be
offset. Only set if
want to check for offset_flag NO_CAN_OFFSET (16).
use_type: str
Defines type for definition use. 'bright_neigh' is for bright
neighbor check and auto sets mag_limit from mag_limits array.
'offfset' is for offsetting and the index from mag_limits array
is set by mag_limit_ind.
mag_limit_ind: int
when used with use_type='offfset', then this sets what index
from mag_limits array is set as the magnitude limit. Otherwise,
for bright neighbor check this is set in code.
Returns
-------
r: float or numpy.array
offset radius in arcseconds around object(s)
offset_flag: int or numpy.array
bitmask for how offset was set. Flags are:
- 0: offset applied normally (i.e. when mag <= mag_limit)
- 1: no offset applied because mag > mag_limit
- 2: no offset applied because magnitude was null value.
- 8: No offset applied because sky brightness is <=
minimum offset sky brightness
- 16: no offsets applied because can_offset = False
- 32: no offset applied because mag <= offset_bright_limit
(offset_bright_limit is G = 6 for Boss bright time and
G = 13 for Boss dark time, and
H = 1 for Apogee).
"""
# define Null cases for targetdb.magnitude table
cases = [-999, -9999, 999,
0.0, numpy.nan, 99.9, None]
# set magntiude limit for instrument and lunation
if use_type == 'bright_neigh':
if waveName == 'Apogee':
# 2MASS H
mag_limit = mag_limits[8]
elif lunation == 'bright':
# Gaia G
mag_limit = mag_limits[5]
else:
# SDSS r
mag_limit = mag_limits[1]
# for bright_neigh, need exclusion radius for all stars
offset_bright_limit = -9999.
else:
mag_limit = mag_limits[mag_limit_ind]
# only set real mag_limits for offsets
if waveName == 'Boss':
if lunation == 'bright':
offset_bright_limit = 6.
else:
offset_bright_limit = 13.
else:
offset_bright_limit = 1.
# assign correct FWHM
if FWHM is None:
FWHM = default_Moffat_params()[1][lunation][obsSite]
if beta is None:
beta = default_Moffat_params()[0]
# get magloss function
if fmagloss is None:
fmagloss = Moffat2dInterp(beta=beta, FWHM=[FWHM])
if isinstance(mag, float) or isinstance(mag, int):
# make can_offset always True if not supplied
if can_offset is None:
can_offset = True
offset_flag = 0
if mag <= mag_limit and mag not in cases and can_offset and mag > offset_bright_limit:
# linear portion in the wings
r_wings = ((mag_limit + safety_factor) - mag - 8.2) / 0.05
# core area
if beta != fmagloss.beta_interp2d or FWHM not in fmagloss.FWHM_interp2d:
fmagloss = Moffat2dInterp(beta=beta, FWHM=[FWHM])
r_core = fmagloss((mag_limit + safety_factor) - mag, FWHM, obsSite, lunation=lunation)
else:
r_core = fmagloss((mag_limit + safety_factor) - mag, FWHM, obsSite, lunation=lunation)
# tom's old conservative core function
# r_core = 1.5 * ((mag_limit + safety_factor) - mag) ** 0.8
# exlusion radius is the max of each section
r = numpy.nanmax([r_wings, r_core])
else:
r = 0.
if mag > mag_limit:
offset_flag += 1
elif mag in cases:
offset_flag += 2
elif not can_offset:
offset_flag += 16
else:
offset_flag += 32
if skybrightness is not None and offset_min_skybrightness is not None:
if skybrightness <= offset_min_skybrightness:
offset_flag += 8
r = 0.
else:
# make can_offset always True if not supplied
if can_offset is None:
can_offset = numpy.zeros(mag.shape, dtype=bool) + True
# create empty arrays for each portion
r_wings = numpy.zeros(mag.shape)
r_core = numpy.zeros(mag.shape)
# only do calc for valid mags and can_offsets for offset
# to avoid warning
mag_valid = ((mag <= mag_limit) & (~numpy.isin(mag, cases)) &
(~numpy.isnan(mag)) & (can_offset) & (mag > offset_bright_limit))
# set flags
offset_flag = numpy.zeros(mag.shape, dtype=int)
offset_flag[mag > mag_limit] += 1
offset_flag[(numpy.isin(mag, cases)) | (numpy.isnan(mag))] += 2
offset_flag[~can_offset] += 16
offset_flag[(mag <= offset_bright_limit) & ~((numpy.isin(mag, cases)) | (numpy.isnan(mag)))] += 32
# linear portion in the wings
r_wings[mag_valid] = ((mag_limit + safety_factor) - mag[mag_valid] - 8.2) / 0.05
# core area
if beta != fmagloss.beta_interp2d or FWHM not in fmagloss.FWHM_interp2d:
fmagloss = Moffat2dInterp(beta=beta, FWHM=[FWHM])
r_core[mag_valid] = fmagloss((mag_limit + safety_factor) - mag[mag_valid],
FWHM, obsSite, lunation=lunation)
else:
r_core[mag_valid] = fmagloss((mag_limit + safety_factor) - mag[mag_valid],
FWHM, obsSite, lunation=lunation)
# tom's old conservative core function
# r_core[mag_valid] = 1.5 * ((mag_limit + safety_factor) - mag[mag_valid]) ** 0.8
# exlusion radius is the max of each section
r = numpy.nanmax(numpy.column_stack((r_wings,
r_core)),
axis=1)
if skybrightness is not None and offset_min_skybrightness is not None:
if skybrightness <= offset_min_skybrightness:
offset_flag[:] += 8
r[:] = 0.
return r, offset_flag
[docs]
def offset_valid_check(mags, mag_limits, offset_flag, program):
"""
check if the offset returned is a valid value or not for a set of targets
Parameters
----------
mags: numpy.array
The magniutdes of the objects. Should be a 2D, Nx10 array, where
N is number of objects and length of 10 index should correspond
to magntidues: [g, r, i, z, bp, gaia_g, rp, J, H, K].
mag_limits: numpy.array
Magnitude limits for the designmode of the design.
This should be an array of length N=10 where indexes
correspond to magntidues: [g, r, i, z, bp, gaia_g, rp, J, H, K].
This matches the apogee_bright_limit_targets_min or
boss_bright_limit_targets_min (depending on instrument) from
targetdb.DesignMode for the design_mode of the design.
offset_flag: numpy.array
bitmask for how offset was set in
numpy array of length N. Flags are:
- 0: offset applied normally (i.e. when mag <= mag_limit)
- 1: no offset applied because mag > mag_limit
- 2: no offset applied because magnitude was null value.
- 8: offsets should not be used as sky brightness is <=
minimum offset sky brightness
- 16: no offsets applied because can_offset = False
- 32: no offset applied because mag <= offset_bright_limit
(offset_bright_limit is G = 6 for Boss bright time and
G = 13 for Boss dark time, and
H = 1 for Apogee).
- 64: no offset applied because no valid magnitude limits
program: numpy.array
The program for each object
Returns
---------
offset_valid: numpy.array
Boolean array if the offset is valid or not
"""
# make bad mag cases nan
cases = [-999, -9999, 999, 0.0, numpy.nan, 99.9, None]
mags[numpy.isin(mags, cases)] = numpy.nan
# check stars that are too bright for design mode
mag_limits = numpy.array(mag_limits)
valid_ind = numpy.where(numpy.array(mag_limits) != -999.0)[0]
mag_bright = numpy.any(mags[:, valid_ind] < mag_limits[valid_ind], axis=1)
# check offset flags to see if should be used or not
offset_valid = numpy.zeros(len(mags), dtype=bool)
for i, fl in enumerate(offset_flag):
# manually check bad flags
if program[i] == "SKY" or "ops" in program[i]:
offset_valid[i] = True
elif 8 & int(fl) and mag_bright[i]:
# if below sky brightness and brighter than mag limit
offset_valid[i] = False
elif 16 & int(fl) and mag_bright[i]:
# if can_offset False and brighter than mag limit
offset_valid[i] = False
elif 32 & int(fl):
# if brighter than safety limit
offset_valid[i] = False
else:
offset_valid[i] = True
return offset_valid
[docs]
def object_offset(mags, mag_limits, lunation, waveName, obsSite, fmagloss=None,
safety_factor=None, beta=None, FWHM=None, skybrightness=None,
offset_min_skybrightness=None, can_offset=None,
check_valid_offset=False, program=None):
"""
Returns the offset needed for object with mag to be
observed at mag_limit. Currently assumption is all offsets
are set in positive RA direction.
Parameters
----------
mags: numpy.array
The magniutdes of the objects. Should be a 2D, Nx10 array, where
N is number of objects and length of 10 index should correspond
to magntidues: [g, r, i, z, bp, gaia_g, rp, J, H, K].
mag_limits: numpy.array
Magnitude limits for the designmode of the design.
This should be an array of length N=10 where indexes
correspond to magntidues: [g, r, i, z, bp, gaia_g, rp, J, H, K].
This matches the apogee_bright_limit_targets_min or
boss_bright_limit_targets_min (depending on instrument) from
targetdb.DesignMode for the design_mode of the design.
lunation: str:
If the designmode is bright time ('bright') or dark
time ('dark')
waveName: str
Instrument for the fibers offset definition being applied
to. Either 'Boss' or 'Apogee'.
obsSite: str
The observatory of the observation. Should either be
'APO' or 'LCO'.
fmagloss: object
Moffat2dInterp class with the lookup table
for doing the moffat profile inversion. If None,
then table is calculated at function call.
safety_factor: float
Factor to add to mag_limit. If None, default set in code.
beta: float
Power index of the Moffat profile. If None, default set in code.
FWHM: float
seeing for the Moffat profile. If None, default set in code.
skybrightness: float
Sky brightness for the field cadence. Only set if
want to check for offset_flag TOO_DARK (8).
offset_min_skybrightness: float
Minimum sky brightness for the offset. Only set if
want to check for offset_flag TOO_DARK (8).
can_offset: boolean or numpy.array
can_offset value from targetdb for the target(s) to be
offset. Only set if
want to check for offset_flag NO_CAN_OFFSET (16).
check_valid_offset: boolean
Whether or not to check for valid offsets. Program must be
set for this to be returned as well.
program: numpy.array
Program of the targets being offset. Needed for valid offset
check.
Returns
-------
delta_ra: numpy.array
offset in RA in arcseconds around object(s) in
numpy array of length N
delta_dec: numpy.array
offset in Decl. in arcseconds around object(s) in
numpy array of length N
offset_flag: numpy.array
bitmask for how offset was set in
numpy array of length N. Flags are:
- 0: offset applied normally (i.e. when mag <= mag_limit)
- 1: no offset applied because mag > mag_limit
- 2: no offset applied because magnitude was null value.
- 8: offsets should not be used as sky brightness is <=
minimum offset sky brightness
- 16: no offsets applied because can_offset = False
- 32: no offset applied because mag <= offset_bright_limit
(offset_bright_limit is G = 6 for Boss bright time and
G = 13 for Boss dark time, and
H = 1 for Apogee).
- 64: no offset applied because no valid magnitude limits
offset_valid: numpy.array
Boolean array if the offset is valid or not. Only returned if
check_valid_offset = True and program is not None
"""
# check if 2D array
if len(mags.shape) != 2:
raise ValueError('mags must be a 2D numpy.array of shape (N, 10)')
if mags.shape[1] != 10:
raise ValueError('mags must be a 2D numpy.array of shape (N, 10)')
# add default values for offset function if None supplied
if safety_factor is None:
if lunation == 'bright':
safety_factor = 0.5
else:
safety_factor = 1.0
if FWHM is None:
FWHM = default_Moffat_params()[1][lunation][obsSite]
if beta is None:
beta = default_Moffat_params()[0]
delta_ras = numpy.zeros(mags.shape)
offset_flags = numpy.zeros(mags.shape)
for i in range(len(mag_limits)):
if mag_limits[i] != -999.:
delta_ras[:, i], offset_flags[:, i] = offset_definition(mags[:, i], mag_limits, lunation, waveName,
obsSite, fmagloss=fmagloss,
safety_factor=safety_factor, beta=beta,
FWHM=FWHM, skybrightness=skybrightness,
offset_min_skybrightness=offset_min_skybrightness,
can_offset=can_offset,
use_type='offset', mag_limit_ind=i)
else:
# make artificially less than 0 so this doesnt get chosen
# for max when setting offset_flag
delta_ras[:, i] = numpy.zeros(len(mags[:, i]))
offset_flags[:, i] = numpy.zeros(len(mags[:, i])) + 64
# retain all flags when delta_ra = 0 for all checks
def unique_offset_flags(flags):
unq_flags = []
poss_flags = [1, 2, 8, 16, 32, 64]
for f in flags:
for pf in poss_flags:
if pf & int(f):
unq_flags.append(pf)
total_flags = numpy.sum(numpy.unique(unq_flags))
return numpy.zeros(flags.shape) + total_flags
try:
# catch all where entire row is == 0 or flag 32 thrown
# need to remove flag 32 things
ev_off = (numpy.all(delta_ras == 0., 1)) | (numpy.any(offset_flags == 32, 1))
offset_flags[ev_off] = numpy.apply_along_axis(unique_offset_flags,
1,
offset_flags[ev_off])
except ValueError:
pass
# use max offset
delta_ra = numpy.max(delta_ras, axis=1)
ind_max = numpy.argmax(delta_ras, axis=1)
offset_flag = numpy.array([offset_flags[i, j] for i, j in enumerate(ind_max)],
dtype=int)
delta_dec = numpy.zeros(len(delta_ra))
# zero out things with flag 32 in them but delta_ra > 0
flag_32 = numpy.array([32 & int(fl) for fl in offset_flag], dtype=bool)
delta_ra[(delta_ra > 0) & flag_32] = 0.
if check_valid_offset:
if program is None:
raise ValueError('Must provide program to check valid offsets!')
offset_valid = offset_valid_check(mags, mag_limits, offset_flag, program)
return delta_ra, delta_dec, offset_flag, offset_valid
else:
return delta_ra, delta_dec, offset_flag
def _offset_radec(ra=None, dec=None, delta_ra=0., delta_dec=0.):
"""Offsets ra and dec according to specified amount. From Mike's
robostrategy.Field object
Parameters
----------
ra : numpy.float64 or ndarray of numpy.float64
right ascension, deg
dec : numpy.float64 or ndarray of numpy.float64
declination, deg
delta_ra : numpy.float64 or ndarray of numpy.float64
right ascension direction offset, arcsec
delta_dec : numpy.float64 or ndarray of numpy.float64
declination direction offset, arcsec
Returns
-------
offset_ra : numpy.float64 or ndarray of numpy.float64
offset right ascension, deg
offset_dec : numpy.float64 or ndarray of numpy.float64
offset declination, deg
Notes
-----
Assumes that delta_ra, delta_dec are in proper coordinates; i.e.
an offset of delta_ra=1 arcsec represents the same angular separation
on the sky at any declination.
Carefully offsets in the local directions of ra, dec based on
the local tangent plane (i.e. does not just scale delta_ra by
1/cos(dec))
"""
deg2rad = numpy.pi / 180.
arcsec2rad = numpy.pi / 180. / 3600.
x = numpy.cos(dec * deg2rad) * numpy.cos(ra * deg2rad)
y = numpy.cos(dec * deg2rad) * numpy.sin(ra * deg2rad)
z = numpy.sin(dec * deg2rad)
ra_x = - numpy.sin(ra * deg2rad)
ra_y = numpy.cos(ra * deg2rad)
ra_z = 0.
dec_x = - numpy.sin(dec * deg2rad) * numpy.cos(ra * deg2rad)
dec_y = - numpy.sin(dec * deg2rad) * numpy.sin(ra * deg2rad)
dec_z = numpy.cos(dec * deg2rad)
xoff = x + (ra_x * delta_ra + dec_x * delta_dec) * arcsec2rad
yoff = y + (ra_y * delta_ra + dec_y * delta_dec) * arcsec2rad
zoff = z + (ra_z * delta_ra + dec_z * delta_dec) * arcsec2rad
offnorm = numpy.sqrt(xoff**2 + yoff**2 + zoff**2)
xoff = xoff / offnorm
yoff = yoff / offnorm
zoff = zoff / offnorm
decoff = numpy.arcsin(zoff) / deg2rad
raoff = ((numpy.arctan2(yoff, xoff) / deg2rad) + 360.) % 360.
return(raoff, decoff)
[docs]
def gaia_mags2sdss_gri(gaia_g, gaia_bp=None, gaia_rp=None, gaia_bp_rp=None):
'''
Stolen from Tom Dwelly's dither_work:
https://github.com/sdss/dither_work/blob/main/python/dither_work/utils.py#L75
There are some fairly accurate+reliable G,BP-RP -> SDSS gri
transforms available in the literature. Evans et al (2018,
https://www.aanda.org/articles/aa/full_html/2018/08/aa32756-18/aa32756-18.html)
give transforms for main sequence stars. Paraphrased below:
sdss_g_from_gdr2 = phot_g_mean_mag_gdr2 - (0.13518 - 0.46245*bp_rp_gdr2 +
-0.25171*bp_rp_gdr2**2 + 0.021349*bp_rp_gdr2**3)
sdss_r_from_gdr2 = phot_g_mean_mag_gdr2 - (-0.12879 + 0.24662*bp_rp_gdr2 +
-0.027464*bp_rp_gdr2**2 - 0.049465*bp_rp_gdr2**3)
sdss_i_from_gdr2 = phot_g_mean_mag_gdr2 - (-0.29676 + 0.64728*bp_rp_gdr2 +
-0.10141*bp_rp_gdr2**2)
'''
if (gaia_bp is not None) and (gaia_rp is not None) and (gaia_bp_rp is None):
gaia_bp_rp = gaia_bp - gaia_rp
elif (gaia_bp is None) and (gaia_rp is None) and (gaia_bp_rp is not None):
pass
else:
raise Exception("Error - you must supply either bp and rp or just bp-rp")
sdss_g = gaia_g - (0.13518 - 0.46245 * gaia_bp_rp -
0.25171 * gaia_bp_rp**2 + 0.021349 * gaia_bp_rp**3)
sdss_r = gaia_g - (-0.12879 + 0.24662 * gaia_bp_rp -
0.027464 * gaia_bp_rp**2 - 0.049465 * gaia_bp_rp**3)
sdss_i = gaia_g - (-0.29676 + 0.64728 * gaia_bp_rp -
0.10141 * gaia_bp_rp**2)
return sdss_g, sdss_r, sdss_i
[docs]
def calc_R(xc, yc, x, y):
""" calculate the distance of each 2D points from the center (xc, yc) """
return numpy.sqrt((x-xc)**2 + (y-yc)**2)
[docs]
def f_2(c, x, y):
""" calculate the algebraic distance between the data points and the mean circle centered at c=(xc, yc) """
Ri = calc_R(c[0], c[1], x, y)
return Ri - Ri.mean()
[docs]
def fit_circle(xs, ys, sigma_clip=None):
"""
Fit a circle to xy inputs xs, ys
If sigma clip, iteratavely throw out xs ys outliers until no
outliers remain.
Based on https://scipy-cookbook.readthedocs.io/items/Least_Squares_Circle.html
returns:
xcenter: x center of the circle
ycenter: y center of the circle
radius: radius of the circle
keep: ndarray indicating indices in xs/ys used for fitting.
"""
xs = numpy.array(xs)
ys = numpy.array(ys)
center_estimate = [xs.mean(), ys.mean()]
keep = numpy.array([True]*len(xs))
xkeep = xs[keep]
ykeep = ys[keep]
_f_2 = partial(f_2, x=xkeep, y=ykeep)
center_fit, ier = optimize.leastsq(_f_2, center_estimate)
rs = calc_R(center_fit[0], center_fit[1], xkeep, ykeep)
radius_fit = rs.mean()
if sigma_clip is not None:
# throw out data until there are no outliers
mask = stats.sigma_clip(rs, sigma=sigma_clip, masked=True)
keep = ~mask.mask
xkeep = xs[keep]
ykeep = ys[keep]
_f_2 = partial(f_2, x=xkeep, y=ykeep)
center_fit, ier = optimize.leastsq(_f_2, center_estimate)
rs = calc_R(center_fit[0], center_fit[1], xkeep, ykeep)
radius_fit = rs.mean()
return center_fit[0], center_fit[1], radius_fit, keep