import numbers
import operator
from inspect import getfullargspec
import numpy as np
from glue.core.contracts import contract, ContractsMeta
from glue.core.coordinate_helpers import (dependent_axes, default_world_coords,
pixel2world_single_axis,
world2pixel_single_axis)
from glue.core.subset import InequalitySubsetState
from glue.core.util import join_component_view
from glue.utils import unbroadcast
from glue.logger import logger
__all__ = ['ComponentLink', 'BinaryComponentLink', 'CoordinateComponentLink']
def null(*args):
    """Accept any positional arguments, ignore them, and return ``None``.

    Serves as a do-nothing placeholder callable.
    """
    return None
# Maps binary functions from the ``operator`` module to the symbol used
# when rendering an expression as text.
OPSYM = {
    operator.add: '+',
    operator.sub: '-',
    operator.truediv: '/',
    operator.mul: '*',
    operator.pow: '**',
}
class ComponentLink(object, metaclass=ContractsMeta):
    """
    ComponentLinks represent transformation logic between ComponentIDs

    Parameters
    ----------
    comp_from : `list` of :class:`~glue.core.component_id.ComponentID`
        The input ComponentIDs
    comp_to : :class:`~glue.core.component_id.ComponentID`
        The target component ID
    using : `func`, optional
        The translation function which maps data from ``comp_from`` to
        ``comp_to``. The using function should satisfy
        ``using(data[comp_from[0]],...,data[comp_from[-1]]) = desired data``.
        If not specified, this defaults to an identity function.
    inverse : `func`, optional
        The inverse translation function, if exists
    inverse_component_link : :class:`ComponentLink`, optional
        An existing link to use as the inverse of this one. If omitted and
        ``inverse`` is available, the inverse link is constructed
        automatically (this requires exactly one ``comp_from`` entry).
    description : `str`
        A short description for the link. This is used e.g. in the link editor.
    input_names : `list` of `str`, optional
        The names to use for the inputs to the ``using`` function. By default
        this is determined by inspecting the function signature. This is used
        e.g. in the link editor.
    output_name : `str`, optional
        The name to use for the output of the ``using`` function. This is used
        e.g. in the link editor.

    Raises
    ------
    TypeError
        If ``comp_from`` is not a list of ComponentIDs, if ``comp_to`` is not
        a ComponentID, or if the identity function is used with more than
        one input.
    ValueError
        If a non-identity ``inverse`` is given together with an identity
        ``using``, or if an inverse is requested for a link with more than
        one input.

    Notes
    -----
    Both ``inverse`` and ``using`` should accept and return numpy arrays.

    Examples
    --------
    ::

        def hours_to_minutes(hours):
            return hours * 60

        d = Data(hour=[1, 2, 3])
        hour = d.id['hour']
        minute = ComponentID('minute')
        link = ComponentLink( [hour], minute, using=hours_to_minutes)

        link.compute(d)  # array([ 60, 120, 180])
        d.add_component_link(link)
        d['minute']  # array([ 60, 120, 180])
    """

    @contract(using='callable|None',
              inverse='callable|None')
    def __init__(self, comp_from, comp_to, using=None, inverse=None,
                 inverse_component_link=None, description=None,
                 input_names=None, output_name=None):

        # Imported locally rather than at module level, as in the rest of
        # this module.
        from glue.core.data import ComponentID
        from glue.core.link_helpers import identity

        self._from = comp_from
        self._to = comp_to

        if using is None:
            using = identity

        # An identity link is its own inverse; any other inverse is an error.
        if using is identity:
            if inverse is None:
                inverse = identity
            elif inverse is not identity:
                raise ValueError("Cannot specify inverse if using is identity")

        self._using = using
        self._inverse = inverse

        self.identity = self._using is identity

        # NOTE: the getattr(using, 'func', using) in the following code
        # is to make sure that things work properly if the functions are
        # PartialResult objects.
        self.description = description or ''
        self.input_names = input_names or getfullargspec(getattr(using, 'func', using))[0]
        self.output_name = output_name or 'output'

        if not isinstance(comp_from, list):
            raise TypeError("comp_from must be a list: %s" % type(comp_from))

        if not all(isinstance(f, ComponentID) for f in self._from):
            raise TypeError("from argument is not a list of ComponentIDs: %s" %
                            self._from)

        if not isinstance(self._to, ComponentID):
            raise TypeError("to argument is not a ComponentID: %s" %
                            type(self._to))

        if using is identity:
            if len(comp_from) != 1:
                raise TypeError("comp_from must have only 1 element, "
                                "or a 'using' function must be provided")

        if inverse_component_link is None:
            if inverse is not None:
                if len(comp_from) == 1:
                    # Build the reverse link and point it back at this one so
                    # that the two links reference each other.
                    self._inverse_component_link = ComponentLink([self._to], self._from[0],
                                                                 using=self._inverse,
                                                                 inverse=self._using,
                                                                 inverse_component_link=self)
                else:
                    raise ValueError("Can only use an inverse with one comp_from link")
            else:
                self._inverse_component_link = None
        else:
            self._inverse_component_link = inverse_component_link

    def _using_name(self):
        """
        Return a printable name for the ``using`` function.

        Falls back to the name of a wrapped ``func`` attribute, and finally
        to ``repr``, so that callables without ``__name__`` (for instance
        ``functools.partial`` objects) do not raise ``AttributeError``.
        """
        name = getattr(self._using, '__name__', None)
        if name is None:
            wrapped = getattr(self._using, 'func', None)
            name = getattr(wrapped, '__name__', None)
        if name is None:
            name = repr(self._using)
        return name

    @contract(data='isinstance(Data)', view='array_view')
    def compute(self, data, view=None):
        """
        For a given data set, compute the component comp_to given the data
        associated with each comp_from and the ``using`` function

        This raises an :class:`glue.core.exceptions.IncompatibleAttribute` if the
        data set doesn't have all the ComponentIDs needed for the transformation

        Parameters
        ----------
        data : `~glue.core.data.Data`
            The data set to use.
        view : `slice` or `tuple`, optional
            Optional view (e.g. slice) through the data to use.

        Returns
        -------
        result
            The data associated with comp_to component
        """

        # First we get the values of all the 'from' components.
        args = [data[join_component_view(f, view)] for f in self._from]

        # We keep track of the original shape of the arguments
        original_shape = args[0].shape
        logger.debug("shape of first argument: %s", original_shape)

        # We now unbroadcast the arrays to only compute the link with the
        # smallest number of values we can. This can help for cases where
        # the link depends only on e.g. pixel components or world coordinates
        # that themselves only depend on a subset of pixel components.
        # Unbroadcasting is the act of returning the smallest array that
        # contains all the information needed to be broadcasted back to its
        # full value
        args = [unbroadcast(arg) for arg in args]

        # We now broadcast these to the smallest common shape in case the
        # linking functions don't know how to broadcast arrays with different
        # shapes.
        args = np.broadcast_arrays(*args)

        # We call the actual linking function
        result = self._using(*args)

        # We call asarray since link functions may return Python scalars in some cases
        result = np.asarray(result)

        # In some cases, linking functions return ravelled arrays, so we
        # fix this here.
        logger.debug("shape of result: %s", result.shape)
        if result.shape != args[0].shape:
            logger.debug("ComponentLink function %s changed shape. Fixing",
                         self._using_name())
            # reshape (rather than assigning to ``.shape``) so that the array
            # object returned by the link function is never modified in place.
            result = result.reshape(args[0].shape)

        # Finally we broadcast the final result to desired shape
        result = np.broadcast_to(result, original_shape)

        return result

    def __contains__(self, cid):
        """Whether ``cid`` is one of the inputs or is the target of this link."""
        return cid in self._from or cid is self._to

    def get_from_ids(self):
        """ The list of input ComponentIDs """
        return self._from

    @contract(old='isinstance(ComponentID)', new='isinstance(ComponentID)')
    def replace_ids(self, old, new):
        """Replace all references to an old ComponentID with references to new.

        The list of input ComponentIDs is modified in place.

        Parameters
        ----------
        old : :class:`~glue.core.component_id.ComponentID`
            ComponentID to replace.
        new : :class:`~glue.core.component_id.ComponentID`
            ComponentID to replace with.
        """
        for i, f in enumerate(self._from):
            if f is old:
                self._from[i] = new
        if self._to is old:
            self._to = new

    @contract(_from='list(isinstance(ComponentID))')
    def set_from_ids(self, _from):
        """
        Replace the list of input ComponentIDs.

        Raises
        ------
        ValueError
            If the new list does not have the same length as the current one.
        """
        if len(_from) != len(self._from):
            raise ValueError("New ID list has the wrong length.")
        self._from = _from

    def get_to_id(self):
        """ The target ComponentID """
        return self._to

    def get_to_ids(self):
        """ The target ComponentID, wrapped in a one-element list """
        return [self.get_to_id()]

    def set_to_id(self, to):
        """ Set the target ComponentID """
        self._to = to

    def get_using(self):
        """ The transformation function """
        return self._using

    @property
    def inverse(self):
        """ The inverse ComponentLink, or None if there is no inverse function """
        if self._inverse is None:
            return None
        return self._inverse_component_link

    def get_inverse(self):
        """ The inverse transformation, or None """
        return self._inverse

    def __str__(self):
        from glue.core.link_helpers import identity
        args = ", ".join([t.label for t in self._from])
        if self._using is identity:
            result = "%s <-> %s" % (self._to, self._from[0])
        elif self._inverse is None:
            result = "%s <- %s(%s)" % (self._to, self._using_name(), args)
        else:
            result = "%s <-> %s(%s)" % (self._to, self._using_name(), args)
        return result

    def to_html(self):
        """ Representation of the link built from each ComponentID's ``to_html()`` """
        from glue.core.link_helpers import identity
        args = ", ".join([t.to_html() for t in self._from])
        if self._using is identity:
            result = "%s ↔ %s" % (self._to.to_html(), self._from[0].to_html())
        elif self._inverse is None:
            result = "%s ← %s(%s)" % (self._to.to_html(), self._using_name(), args)
        else:
            result = "%s ↔ %s(%s)" % (self._to.to_html(), self._using_name(), args)
        return result

    def __repr__(self):
        return str(self)

    # Arithmetic operators: each returns a BinaryComponentLink combining this
    # link with ``other`` using the matching function from ``operator``.

    @contract(other='isinstance(ComponentID)|component_like|float|int')
    def __add__(self, other):
        return BinaryComponentLink(self, other, operator.add)

    @contract(other='isinstance(ComponentID)|component_like|float|int')
    def __radd__(self, other):
        return BinaryComponentLink(other, self, operator.add)

    @contract(other='isinstance(ComponentID)|component_like|float|int')
    def __sub__(self, other):
        return BinaryComponentLink(self, other, operator.sub)

    @contract(other='isinstance(ComponentID)|component_like|float|int')
    def __rsub__(self, other):
        return BinaryComponentLink(other, self, operator.sub)

    @contract(other='isinstance(ComponentID)|component_like|float|int')
    def __mul__(self, other):
        return BinaryComponentLink(self, other, operator.mul)

    @contract(other='isinstance(ComponentID)|component_like|float|int')
    def __rmul__(self, other):
        return BinaryComponentLink(other, self, operator.mul)

    # ``operator.div`` does not exist in Python 3, so the legacy ``__div__`` /
    # ``__rdiv__`` hooks (kept for callers that invoke them directly) use
    # true division, like ``__truediv__`` / ``__rtruediv__``.

    @contract(other='isinstance(ComponentID)|component_like|float|int')
    def __div__(self, other):
        return BinaryComponentLink(self, other, operator.truediv)

    @contract(other='isinstance(ComponentID)|component_like|float|int')
    def __rdiv__(self, other):
        return BinaryComponentLink(other, self, operator.truediv)

    @contract(other='isinstance(ComponentID)|component_like|float|int')
    def __truediv__(self, other):
        return BinaryComponentLink(self, other, operator.truediv)

    @contract(other='isinstance(ComponentID)|component_like|float|int')
    def __rtruediv__(self, other):
        return BinaryComponentLink(other, self, operator.truediv)

    @contract(other='isinstance(ComponentID)|component_like|float|int')
    def __pow__(self, other):
        return BinaryComponentLink(self, other, operator.pow)

    @contract(other='isinstance(ComponentID)|component_like|float|int')
    def __rpow__(self, other):
        return BinaryComponentLink(other, self, operator.pow)

    # Ordering comparisons: each returns an InequalitySubsetState built from
    # this link, ``other`` and the matching function from ``operator``.

    @contract(other='isinstance(ComponentID)|component_like|float|int')
    def __lt__(self, other):
        return InequalitySubsetState(self, other, operator.lt)

    @contract(other='isinstance(ComponentID)|component_like|float|int')
    def __le__(self, other):
        return InequalitySubsetState(self, other, operator.le)

    @contract(other='isinstance(ComponentID)|component_like|float|int')
    def __gt__(self, other):
        return InequalitySubsetState(self, other, operator.gt)

    @contract(other='isinstance(ComponentID)|component_like|float|int')
    def __ge__(self, other):
        return InequalitySubsetState(self, other, operator.ge)
class CoordinateComponentLink(ComponentLink):
    """
    A ComponentLink that evaluates a single axis of a coordinate
    transformation.

    Parameters
    ----------
    comp_from : `list` of :class:`~glue.core.component_id.ComponentID`
        The ComponentIDs for every input axis. Only the entries selected by
        ``dependent_axes(coords, index)`` are kept as inputs of the link.
    comp_to : :class:`~glue.core.component_id.ComponentID`
        The target component ID.
    coords : Coordinates
        The coordinates object passed to the single-axis conversion helpers.
    index : `int`
        The axis to compute. It is passed as-is to ``dependent_axes`` and as
        ``ndim - 1 - index`` to the conversion helpers.
    pixel2world : `bool`, optional
        If `True` (the default), ``pixel2world_single_axis`` is used,
        otherwise ``world2pixel_single_axis``.
    """

    @contract(comp_from='list(isinstance(ComponentID))',
              comp_to='isinstance(ComponentID)',
              coords='isinstance(Coordinates)',
              index=int,
              pixel2world=bool)
    def __init__(self, comp_from, comp_to, coords, index, pixel2world=True):
        self.coords = coords
        self.index = index
        self.pixel2world = pixel2world

        # Some coords don't need all pixel coords
        # to compute a given world coord, and vice versa
        # (e.g., spectral data cubes)
        self.ndim = len(comp_from)
        self.from_needed = dependent_axes(coords, index)
        self._from_all = comp_from

        comp_from = [comp_from[i] for i in self.from_needed]
        super(CoordinateComponentLink, self).__init__(
            comp_from, comp_to, self.using)

    def using(self, *args):
        """
        Compute the requested axis from the values of the needed input axes.

        Parameters
        ----------
        *args
            One value per entry of ``self.from_needed``, in that order.

        Returns
        -------
        The result of ``pixel2world_single_axis`` or
        ``world2pixel_single_axis``, depending on ``self.pixel2world``.
        """

        # NOTE: in the past, we set any non-specified arguments to 0 for the
        # input coordinates, but this caused issues because in astropy.wcs
        # if one specifies e.g. (0, 0, 3000.) for (ra, dec, velocity), and if
        # (0, 0) for RA/Dec would return (nan, nan) normally, the velocity
        # is also NaN even though it is decoupled from the other coordinates.
        default = default_world_coords(self.coords)

        # Place the supplied values at the positions of the axes they belong
        # to; the remaining positions are filled in from ``default`` below.
        args2 = [None] * self.ndim
        for f, a in zip(self.from_needed, args):
            args2[f] = a
        for i in range(self.ndim):
            if args2[i] is None:
                # ``default`` is indexed in the reverse order of ``args2``
                args2[i] = np.broadcast_to(default[self.ndim - 1 - i], args[0].shape)
        args2 = tuple(args2)

        # The helpers take the arguments and the axis index in reversed order
        if self.pixel2world:
            return pixel2world_single_axis(self.coords, *args2[::-1], world_axis=self.ndim - 1 - self.index)
        else:
            return world2pixel_single_axis(self.coords, *args2[::-1], pixel_axis=self.ndim - 1 - self.index)

    def __str__(self):
        # This link never has an inverse and never uses the identity function,
        # so the base class would render "<to> <- using(<args>)". Build that
        # string directly with the direction name, rather than substituting
        # 'using' in the rendered text, which would also rewrite component
        # labels that happen to contain "using".
        rep = 'pix2world' if self.pixel2world else 'world2pix'
        args = ", ".join([t.label for t in self._from])
        return "%s <- %s(%s)" % (self._to, rep, args)
class BinaryComponentLink(ComponentLink):
    """
    A ComponentLink that combines two inputs with a binary function

    Parameters
    ----------
    left : :class:`~glue.core.component_id.ComponentID`, :class:`ComponentLink`, or `number`
        The first input argument.
    right : :class:`~glue.core.component_id.ComponentID`, :class:`ComponentLink`, or `number`
        The second input argument.
    op : `func`
        A function with two inputs that works on numpy arrays.

    The ComponentLink represents the logic of applying `op` to the
    data associated with the inputs `left` and `right`.
    """

    def __init__(self, left, right, op):
        from glue.core.data import ComponentID

        self._left = left
        self._right = right
        self._op = op

        # Gather the ComponentIDs this link depends on: a ComponentID counts
        # as itself, a ComponentLink contributes its own inputs, and plain
        # numbers contribute nothing.
        inputs = []
        for operand in (left, right):
            if isinstance(operand, ComponentID):
                inputs.append(operand)
            elif isinstance(operand, ComponentLink):
                inputs.extend(operand.get_from_ids())
            elif not isinstance(operand, numbers.Number):
                raise TypeError("Cannot create BinaryComponentLink using %s" %
                                operand)

        target = ComponentID("")
        super(BinaryComponentLink, self).__init__(inputs, target, null)

    def replace_ids(self, old, new):
        """
        Replace references to ``old`` with ``new``, both in the inputs
        tracked by the base class and in the two operands (recursing into
        operands that are themselves ComponentLinks).
        """
        super(BinaryComponentLink, self).replace_ids(old, new)
        for attr in ('_left', '_right'):
            operand = getattr(self, attr)
            if operand is old:
                setattr(self, attr, new)
            elif isinstance(operand, ComponentLink):
                operand.replace_ids(old, new)

    def compute(self, data, view=None):
        """
        Apply the binary function to the two operands evaluated on ``data``.

        Operands that are numbers are used as-is; any other operand is
        looked up with ``data[operand, view]``.
        """
        operands = [operand if isinstance(operand, numbers.Number)
                    else data[operand, view]
                    for operand in (self._left, self._right)]

        # As described in more detail in ComponentLink.compute, we can
        # 'unbroadcast' the arrays to ensure a minimal operation. The shape
        # of the last array operand is the one the result is expanded to.
        target_shape = None
        for position, value in enumerate(operands):
            if isinstance(value, np.ndarray):
                target_shape = value.shape
                operands[position] = unbroadcast(value)

        # Neither operand is an array: nothing to broadcast
        if target_shape is None:
            return self._op(*operands)

        first, second = np.broadcast_arrays(*operands)
        return np.broadcast_to(self._op(first, second), target_shape)

    def __gluestate__(self, context):
        return dict(left=context.id(self._left),
                    right=context.id(self._right),
                    operator=context.do(self._op))

    @classmethod
    def __setgluestate__(cls, rec, context):
        restored = [context.object(rec[key])
                    for key in ('left', 'right', 'operator')]
        return cls(*restored)

    def __str__(self):
        # Use the operator's symbol when one is known, otherwise its name
        fallback = self._op.__name__
        symbol = OPSYM.get(self._op, fallback)
        return '(%s %s %s)' % (self._left, symbol, self._right)

    def __repr__(self):
        return "<BinaryComponentLink: %s>" % (self,)