Source code for tweakwcs.wcsimage

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
This module provides support for working with image footprints on the sky and
source catalogs.

:Authors: Mihai Cara

:License: :doc:`LICENSE`

"""

import logging
import numbers
import os
import sys
from copy import deepcopy

import numpy as np
from gwcs.geometry import CartesianToSpherical, SphericalToCartesian
from spherical_geometry.polygon import MalformedPolygonError, SphericalPolygon

from astropy import table
from astropy.utils.decorators import deprecated, deprecated_renamed_argument

from . import __version__  # noqa: F401
from .correctors import WCSCorrector
from .linalg import inv
from .linearfit import SUPPORTED_FITGEOM_MODES, iter_linear_fit
from .wcsutils import planar_rot_3d

__author__ = "Mihai Cara"

__all__ = ["RefCatalog", "WCSGroupCatalog", "WCSImageCatalog", "convex_hull"]

log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)

_S2C = SphericalToCartesian(name="s2c", wrap_lon_at=180)
_C2S = CartesianToSpherical(name="c2s", wrap_lon_at=180)


def _is_int(n):
    return (isinstance(n, numbers.Integral) and not isinstance(n, bool)) or (
        isinstance(n, np.generic) and np.issubdtype(n, np.integer)
    )


[docs] class WCSImageCatalog: """ A class that holds information pertinent to an image WCS and a source catalog of the sources found in that image. .. warning:: If ``tpwcs.meta`` dictionary contains any of the following keywords ``'catalog'``, ``'name'``, or ``'group_id'``, they will be ignored without warning. """ @deprecated_renamed_argument("tpwcs", "corrector", since="0.8.0") def __init__(self, catalog, corrector, name=None, group_id=None, meta={}): """ Parameters ---------- catalog: astropy.table.Table Source catalog associated with an image. Must contain ``'x'`` and ``'y'`` columns which indicate source coordinates (in pixels) in the associated image. corrector: WCSCorrector ``WCSCorrector``-derived tangent-plane WCS corrector object associated with the image from which the catalog was derived. name: str, None, optional Image catalog's name. This is used to identify catalog during logging. If ``name`` is `None`, the ``name`` of this ``WCSImageCatalog`` object will be set to ``'Unknown'``. group_id: hashable, None, optional Group ID that may be used for identifying catalogs that need to be aligned together. ``group_id`` must be hashable. meta: dict, optional Additional information about image, catalog, and/or WCS to be stored (for convenience) within `WCSImageCatalog` object. """ self.name = name self.group_id = group_id self._catalog = None self._bb_radec = None self.img_bounding_ra = None self.img_bounding_dec = None self.meta = dict(meta) self._fit_info = {"status": "SKIPPED"} self.corrector = corrector self.catalog = catalog self._poly_area = None @property def poly_area(self): """Area of the bounding polygon (in srad).""" return self._poly_area @property @deprecated("0.8.0", obj_type="property") def tpwcs(self): """Get :py:class:`WCSCorrector` WCS.""" return self._corr @tpwcs.setter @deprecated("0.8.0", obj_type="property") def tpwcs(self, tpwcs): """Get/Set catalog's WCS (a :py:class:`WCSCorrector`-derived object). .. note:: Setting the WCS triggers automatic bounding polygon recalculation. Parameters ---------- tpwcs: WCSCorrector ``WCSCorrector``-derived tangent-plane WCS corrector object associated with the image from which the catalog was extracted. """ if not isinstance(tpwcs, WCSCorrector): raise TypeError("Unsupported 'tpwcs' type. 'tpwcs' must be a subtype of WCSCorrector.") self._corr = tpwcs # create spherical polygon bounding the image self.calc_bounding_polygon() @property def corrector(self): """Get :py:class:`WCSCorrector` WCS.""" return self._corr @corrector.setter def corrector(self, corrector): """Get/Set catalog's WCS (a :py:class:`WCSCorrector`-derived object). .. note:: Setting the WCS triggers automatic bounding polygon recalculation. Parameters ---------- corrector: WCSCorrector ``WCSCorrector``-derived tangent-plane WCS corrector object associated with the image from which the catalog was extracted. """ if not isinstance(corrector, WCSCorrector): raise TypeError( "Unsupported 'corrector' type. 'corrector' must be a subtype of WCSCorrector." ) self._corr = corrector # create spherical polygon bounding the image self.calc_bounding_polygon() @property def name(self): """ Get/set catalog's name. This is used to identify catalog during logging. Upon setting, the value will be converted to a `str`. When setting to `None`, the ``name`` will be set to ``'Unknown'``. """ return self._name @name.setter def name(self, name): self._name = "Unknown" if name is None else str(name) @property def group_id(self): """ Get/set :py:class:`WCSImageCatalog` object's group ID that may be used for identifying catalogs that need to be aligned together. ``group_id`` must be hashable. """ return self._group_id @group_id.setter def group_id(self, group_id): try: # check if input is hashable: hash(group_id) except TypeError: raise TypeError("'group_id' must be hashable.") self._group_id = group_id @property def fit_status(self): """ Get/Set fit status. This property is a shortcut to the ``'status'`` key value in the ``fit_info`` dictionary. When the :py:class:`WCSImageCatalog` object is created, ``fit_status`` is initially set to ``'SKIPPED'``. Alignment tools are responsible for updating catalog's fit status. """ return self._fit_info["status"] @fit_status.setter def fit_status(self, fit_status): self._fit_info["status"] = fit_status @property def fit_info(self): """ Get fit information - a dictionary. This class sets only the ``'status'`` field but fitting routines may set additional fields. """ return self._fit_info @property def catalog(self): """Get/set image's catalog.""" return self._catalog @catalog.setter def catalog(self, catalog): if catalog is None: self._catalog = None return if "x" not in catalog.colnames or "y" not in catalog.colnames: raise ValueError("An image catalog must contain 'x' and 'y' columns!") self._catalog = table.Table(catalog.copy(), masked=True) self._catalog.meta["name"] = self._name if "id" not in self._catalog.colnames: # pragma: no branch self._catalog["id"] = np.arange(1, len(self._catalog) + 1) # create spherical polygon bounding the image self.calc_bounding_polygon()
[docs] def det_to_world(self, x, y): """ Convert pixel coordinates to sky coordinates using full (i.e., including distortions) transformations. """ return self._corr.det_to_world(x, y)
[docs] def world_to_det(self, ra, dec): """ Convert sky coordinates to image's pixel coordinates using full (i.e., including distortions) transformations. """ return self._corr.world_to_det(ra, dec)
[docs] def det_to_tanp(self, x, y): """ Convert detector (pixel) coordinates to tangent plane coordinates. """ return self._corr.det_to_tanp(x, y)
[docs] def tanp_to_det(self, x, y): """ Convert tangent plane coordinates to detector (pixel) coordinates. """ return self._corr.tanp_to_det(x, y)
[docs] def tanp_to_world(self, x, y): """ Convert tangent plane coordinates to world coordinates. """ return self._corr.tanp_to_world(x, y)
[docs] def world_to_tanp(self, ra, dec): """ Convert tangent plane coordinates to detector (pixel) coordinates. """ return self._corr.world_to_tanp(ra, dec)
@property def polygon(self): """Get image's (or catalog's) bounding spherical polygon.""" return self._polygon
[docs] def intersection(self, wcsim): """ Compute intersection of this `WCSImageCatalog` object and another `WCSImageCatalog`, `WCSGroupCatalog`, or :py:class:`~spherical_geometry.polygon.SphericalPolygon` object. Parameters ---------- wcsim: WCSImageCatalog, WCSGroupCatalog, SphericalPolygon Another object that should be intersected with this `WCSImageCatalog`. Returns ------- polygon: SphericalPolygon A :py:class:`~spherical_geometry.polygon.SphericalPolygon` that is the intersection of this `WCSImageCatalog` and `wcsim`. """ if isinstance(wcsim, (WCSImageCatalog, WCSGroupCatalog)): return self._polygon.intersection(wcsim.polygon) else: return self._polygon.intersection(wcsim)
# TODO: due to a bug in the sphere package, see # https://github.com/spacetelescope/sphere/issues/74 # intersections with polygons formed as union does not work. # For this reason I re-implement 'intersection_area' below with # a workaround for the bug. # The original implementation should be uncommented once the bug # is fixed. # # def intersection_area(self, wcsim): # """ Calculate the area of the intersection polygon. # """ # return np.fabs(self.intersection(wcsim).area())
[docs] def intersection_area(self, wcsim): """Calculate the area of the intersection polygon.""" if isinstance(wcsim, (WCSImageCatalog, RefCatalog)): return np.fabs(self.intersection(wcsim).area()) else: # this is bug workaround for image groups (multi-unions): area = 0.0 for wim in wcsim: area += np.fabs(self.polygon.intersection(wim.polygon).area()) return area
def _guarded_intersection_area(self, wcsim): """ Calculate the area of the intersection polygon. If some intersections fail due to a bug/limitation of ``spherical_geometry`` then the area of the valid intersections will be returned. If images do not intersect or intersection fails, 0 will be returned. In addition to the area, this function returns the number of times a failure in the ``spherical_geometry`` package occurred resulting in a ``MalformedPolygonError`` error. Return value: ``(area, nfailures)`` """ if isinstance(wcsim, (WCSImageCatalog, RefCatalog)): try: return np.fabs(self.intersection(wcsim).area()), 0 except MalformedPolygonError: return 0.0, 1 else: # this is bug workaround: area = 0.0 nfailures = 0 for wim in wcsim: try: area += np.fabs(self.polygon.intersection(wim.polygon).area()) except MalformedPolygonError: nfailures += 1 continue return area, nfailures def _calc_chip_bounding_polygon(self, stepsize=None): """ Compute image's bounding polygon. Parameters ---------- stepsize: int, None, optional Indicates the maximum separation between two adjacent vertices of the bounding polygon along each side of the image. Corners of the image are included automatically. If `stepsize` is `None`, bounding polygon will contain only vertices of the image. """ if self.corrector is None or self.catalog is None: return if self.corrector.bounding_box is None: # just take max image coordinates from catalogs as bounds: lx = -0.5 ly = -0.5 hx = max(1, int(np.ceil(np.amax(self._catalog["x"])))) - 0.5 hy = max(1, int(np.ceil(np.amax(self._catalog["y"])))) - 0.5 else: ((lx, hx), (ly, hy)) = self.corrector.bounding_box # shrink BB so that we do not get NaNs due to rounding # issues (pixels on the edge could outside the box) lx += 0.5 hx -= 0.5 ly += 0.5 hy -= 0.5 if stepsize is None: nintx = 3 ninty = 3 # codespell:ignore ninty else: nintx = max(2, int(np.ceil((hx - lx) / stepsize))) ninty = max(2, int(np.ceil((hy - ly) / stepsize))) # codespell:ignore ninty xs = np.linspace(lx, hx, nintx + 1, dtype=np.double).tolist() ys = np.linspace( ly, hy, ninty + 1, # codespell:ignore ninty dtype=np.double, ).tolist()[1:-1] nptx = len(xs) npty = len(ys) # bottom borderx = list(xs) bordery = nptx * [ly] # right borderx.extend(npty * [hx]) bordery.extend(ys) # top: borderx.extend(xs[::-1]) bordery.extend(nptx * [hy]) # left: borderx.extend(npty * [lx]) bordery.extend(ys[::-1]) # close: borderx.append(borderx[0]) bordery.append(bordery[0]) ra, dec = self.det_to_world(borderx, bordery) # TODO: for strange reasons, occasionally ra[0] != ra[-1] and/or # dec[0] != dec[-1] (even though we close the polygon in the # previous two lines). Then SphericalPolygon fails because # points are not closed. Therefore we force it to be closed: ra[-1] = ra[0] dec[-1] = dec[0] self.img_bounding_ra = ra self.img_bounding_dec = dec self._bb_radec = (ra, dec) self._polygon = SphericalPolygon.from_radec(ra, dec) def _calc_cat_convex_hull(self): """ Compute convex hull that bounds the sources in the catalog. """ if self.corrector is None or self.catalog is None: return x = self.catalog["x"] y = self.catalog["y"] if len(x) == 0: # no points raise RuntimeError( # pragma: no cover "Unexpected error: Contact software developer" ) elif len(x) > 2: ra, dec = convex_hull(x, y, wcs=self.det_to_world, min_separation=1e-11) # else, for len(x) in [1, 2], use entire image footprint. # TODO: a more robust algorithm should be implemented to deal with # len(x) in [1, 2] cases. self._bb_radec = (ra, dec) self._polygon = SphericalPolygon.from_radec(ra, dec) self._poly_area = np.fabs(self._polygon.area())
[docs] def calc_bounding_polygon(self): """ Calculate bounding polygon of the image or of the sources in the catalog (if catalog was set). """ # we need image's footprint for later: self._calc_chip_bounding_polygon() # create smallest convex spherical polygon bounding all sources: if self.catalog: self._calc_cat_convex_hull()
@property def bb_radec(self): """ Get a tuple of `numpy.ndarray` of RA and DEC of the vertices of the bounding polygon. """ return self._bb_radec
[docs] class WCSGroupCatalog: """ A class that holds together `WCSImageCatalog` image catalog objects whose relative positions are fixed and whose source catalogs should be fitted together to a reference catalog. """ bb_approx_threshold = 50 def __init__(self, images, name=None, bb_policy="auto"): """ Parameters ---------- images: list of WCSImageCatalog A list of `WCSImageCatalog` image catalogs. name: str, None, optional Name of the group. bb_policy: int, {'exact', 'auto'} Describes how to compute the bounding polygon of the group. ``'exact'`` will compute the exact union of bounding boxes of input ``images``. An integer number will *approximate* the bounding box using convex hull if the number of input ``images`` is exceeds the value of ``bb_policy`` and it will switch to exact computations (using unions) otherwise. ``'auto'`` is the same as setting threshold to 50. """ self._catalog = None self._name = name self._poly_area = None self.bb_policy = bb_policy if isinstance(images, WCSImageCatalog): self._images = [images] if images.catalog is None: raise ValueError("Each input WCS image catalog must have a valid catalog.") elif hasattr(images, "__iter__"): if not images: raise ValueError("List of images cannot be empty.") self._images = [] for im in images: if not isinstance(im, WCSImageCatalog): raise TypeError( "Each element of the 'images' parameter " "must be an 'WCSImageCatalog' object." ) if im.catalog is None: raise ValueError("Each input WCS image catalog must have a valid catalog.") self._images.append(im) else: raise TypeError( "Parameter 'images' must be either a single " "'WCSImageCatalog' object or a list of " "'WCSImageCatalog' objects" ) self._catalog = self.create_group_catalog() self.update_bounding_polygon() @property def poly_area(self): """Area of the bounding polygon (in srad).""" return self._poly_area @property def name(self): """Get/set :py:class:`WCSImageCatalog` object's name.""" return self._name @name.setter def name(self, name): self._name = name @property def bb_policy(self): """Get/set :py:class:`WCSImageCatalog` policy for switching to approximate computation of group's bounding box. """ return self._bb_policy @bb_policy.setter def bb_policy(self, bb_policy): if bb_policy == "auto": self._bb_threshold = WCSGroupCatalog.bb_approx_threshold elif bb_policy == "exact": self._bb_threshold = sys.maxsize elif _is_int(bb_policy) and bb_policy >= 0: self._bb_threshold = bb_policy else: raise ValueError( "'bb_policy' must be either 'auto', 'exact', or a non-negative integer number." ) self._bb_policy = bb_policy @property def polygon(self): """Get image's (or catalog's) bounding spherical polygon.""" return self._polygon
[docs] def intersection(self, wcsim): """ Compute intersection of this `WCSGroupCatalog` object and another `WCSImageCatalog`, `WCSGroupCatalog`, or :py:class:`~spherical_geometry.polygon.SphericalPolygon` object. Parameters ---------- wcsim: WCSImageCatalog, WCSGroupCatalog, SphericalPolygon Another object that should be intersected with this `WCSGroupCatalog`. Returns ------- polygon: SphericalPolygon A :py:class:`~spherical_geometry.polygon.SphericalPolygon` that is the intersection of this `WCSGroupCatalog` and `wcsim`. """ if isinstance(wcsim, (WCSGroupCatalog, WCSImageCatalog)): return self._polygon.intersection(wcsim.polygon) else: return self._polygon.intersection(wcsim)
# TODO: due to a bug in the sphere package, see # https://github.com/spacetelescope/sphere/issues/74 # intersections with polygons formed as union does not work. # For this reason I re-implement 'intersection_area' below with # a workaround for the bug. # The original implementation should be uncommented once the bug # is fixed. # # def intersection_area(self, wcsim): # """ Calculate the area of the intersection polygon. # """ # return np.fabs(self.intersection(wcsim).area())
[docs] def intersection_area(self, wcsim): """Calculate the area of the intersection polygon.""" return sum(im.intersection_area(wcsim) for im in self._images)
def _guarded_intersection_area(self, wcsim): """ Calculate the area of the intersection polygon. If some intersections fail due to a bug/limitation of ``spherical_geometry`` then the area of the valid intersections will be returned. If images do not intersect or intersection fails, 0 will be returned. In addition to the area, this function returns the number of times a failure in the ``spherical_geometry`` package occurred resulting in a ``MalformedPolygonError`` error. Return value: ``(area, nfailures)`` """ area = 0 nfailures = 0 for im in self._images: a, nf = im._guarded_intersection_area(wcsim) area += a nfailures += nf return area, nfailures def _aproximate_bb(self): if not self._images: return tanplane_wcs = deepcopy(self._images[0].corrector) tanplane_wcs.wcs.bounding_box = None tpx = [] tpy = [] for im in self._images: if im.corrector is None or im.bb_radec is None: return r, d = im.bb_radec tx, ty = tanplane_wcs.world_to_tanp(r, d) tpx.extend(tx) tpy.extend(ty) ra, dec = convex_hull(tpx, tpy, wcs=tanplane_wcs.tanp_to_world, min_separation=1e-11) self.img_bounding_ra = ra self.img_bounding_dec = dec self._bb_radec = (ra, dec) self._polygon = SphericalPolygon.from_radec(ra, dec) self._poly_area = np.fabs(self._polygon.area())
[docs] def update_bounding_polygon(self): """Recompute bounding polygons of the member images.""" if len(self._images) > self._bb_threshold: self._aproximate_bb() return polygons = [im.polygon for im in self._images] if polygons: try: self._polygon = SphericalPolygon.multi_union(polygons) except MalformedPolygonError: log.warning( "MalformedPolygonError in spherical_geometry. Using " "convex hull instead of multi_union. Alignment order " "may be sub-optimal." ) refcat = RefCatalog(self._catalog) self._polygon = refcat.polygon else: self._polygon = SphericalPolygon([]) self._poly_area = np.fabs(self._polygon.area())
def __len__(self): return len(self._images) def __getitem__(self, idx): return self._images[idx] def __iter__(self): yield from self._images @property def catalog(self): """Get/set image's catalog.""" return self._catalog
[docs] def create_group_catalog(self): """ Combine member's image catalogs into a single group's catalog. Returns ------- group_catalog: astropy.table.Table Combined group catalog. """ catalogs = [] catno = 0 has_weights = None cat_names = [] for image in self._images: catlen = len(image.catalog) if catlen == 0: continue cat_name = image.catalog.meta.get("name", image.name or "Unnamed") cat_names.append(cat_name) if has_weights is None: has_weights = "weight" in image.catalog.colnames elif has_weights != ("weight" in image.catalog.colnames): raise KeyError( "Non-empty catalogs in a group must all " "either have or not have 'weight' column." ) if image.name is None: catname = f"Catalog #{catno:d}" else: catname = image.name col_catname = table.MaskedColumn(catlen * [catname], name="cat_name") col_imcatidx = table.MaskedColumn(catlen * [catno], name="_imcat_idx") col_id = table.MaskedColumn(image.catalog["id"]) col_x = table.MaskedColumn(image.catalog["x"], dtype=np.double) col_y = table.MaskedColumn(image.catalog["y"], dtype=np.double) ra, dec = image.det_to_world(image.catalog["x"], image.catalog["y"]) col_ra = table.MaskedColumn(ra, dtype=np.double, name="RA") col_dec = table.MaskedColumn(dec, dtype=np.double, name="DEC") if has_weights: col_wght = table.MaskedColumn(image.catalog["weight"], dtype=np.double) cat = table.Table( [col_imcatidx, col_catname, col_id, col_x, col_y, col_ra, col_dec, col_wght], masked=True, ) else: cat = table.Table( [col_imcatidx, col_catname, col_id, col_x, col_y, col_ra, col_dec], masked=True ) catalogs.append(cat) catno += 1 catname = os.path.commonprefix(cat_names) if cat_names else None if catno: cat = table.vstack(catalogs, join_type="exact") else: # no catalogs with sources. Create an empty table with required # columns and types: image = self._images[0] if image.name is None: catname = f"Catalog #{catno:d}" else: catname = image.name col_catname = table.MaskedColumn([catname], name="cat_name") col_catname = col_catname[[False]] col_imcatidx = table.MaskedColumn([], dtype=int, name="_imcat_idx") col_id = table.MaskedColumn(image.catalog["id"]) col_x = table.MaskedColumn([], name="x", dtype=np.double) col_y = table.MaskedColumn([], name="y", dtype=np.double) col_ra = table.MaskedColumn([], name="RA", dtype=np.double) col_dec = table.MaskedColumn([], name="DEC", dtype=np.double) cat = table.Table( [col_imcatidx, col_catname, col_id, col_x, col_y, col_ra, col_dec], masked=True ) if catname: cat.meta["name"] = catname return cat
[docs] def get_unmatched_cat(self): """ Retrieve only those sources from the catalog that have **not** been matched to the sources in the reference catalog. """ mask = self._catalog["matched_ref_id"].mask return self._catalog[mask]
[docs] def get_matched_cat(self): """ Retrieve only those sources from the catalog that **have been** matched to the sources in the reference catalog. """ mask = np.logical_not(self._catalog["matched_ref_id"].mask) return self._catalog[mask]
[docs] def recalc_catalog_radec(self): """Recalculate RA and DEC of the sources in the catalog.""" for k, image in enumerate(self._images): idx = self._catalog["_imcat_idx"] == k if not np.any(idx): continue ra, dec = image.det_to_world(self._catalog["x"][idx], self._catalog["y"][idx]) self._catalog["RA"][idx] = ra self._catalog["DEC"][idx] = dec
[docs] def calc_tanp_xy(self, tanplane_wcs): """ Compute x- and y-positions of the sources from the image catalog in the tangent plane. This creates the following columns in the catalog's table: ``'TPx'`` and ``'TPy'``. Parameters ---------- tanplane_wcs: WCSCorrector A `WCSCorrector` object that will provide transformations to the tangent plane to which sources of this catalog a should be "projected". """ if "RA" not in self._catalog.colnames or "DEC" not in self._catalog.colnames: raise RuntimeError( "'recalc_catalog_radec()' should have been run prior to calc_tanp_xy()" ) # compute x & y in the reference WCS: xtp, ytp = tanplane_wcs.world_to_tanp(self.catalog["RA"], self.catalog["DEC"]) self._catalog["TPx"] = table.MaskedColumn(xtp, name="TPx", dtype=np.double, mask=False) self._catalog["TPy"] = table.MaskedColumn(ytp, name="TPy", dtype=np.double, mask=False)
[docs] def match2ref(self, refcat, match=None): """Uses ``xyxymatch`` to cross-match sources between this catalog and a reference catalog. Parameters ---------- refcat: RefCatalog A `RefCatalog` object that contains a catalog of reference sources as well as a valid reference WCS. match: MatchCatalogs, function, None, optional A callable that takes two arguments: a reference catalog and an image catalog. Both catalogs will have columns ``'TPx'`` and ``'TPy'`` that represent the source coordinates in some common (to both catalogs) coordinate system. Returns ------- nmatches: int Number of found matches. mref_idx: numpy.ndarray Integer array indicating indices of sources in the reference catalog that were matched to sources in group's ``catalog``. minput_idx: numpy.ndarray Integer array indicating indices of sources in group's ``catalog`` that were matched to sources in the reference catalog. """ colnames = self._catalog.colnames catlen = len(self._catalog) if match is None: if catlen != len(refcat.catalog): raise ValueError( "When matching is not requested, catalogs " "should have been matched previously and " "have equal lengths." ) log.info( "No matching of sources from '%s' catalog with sources from the " "reference '%s' catalog was requested.", self.name, refcat.name, ) log.info("Catalogs are assumed matched with 1-to-1 correspondence.") mref_idx = np.arange(catlen) minput_idx = np.arange(catlen) nmatches = catlen else: if "TPx" not in colnames or "TPy" not in colnames: raise RuntimeError("'calc_tanp_xy()' should have been run prior to match2ref()") if catlen == 0: return 0, np.array([], dtype=int), np.array([], dtype=int) try: tp_pscale = self._images[0].corrector.tanp_center_pixel_scale except NotImplementedError: tp_pscale = 1.0 finally: tp_units = self._images[0].corrector.units mref_idx, minput_idx = match( refcat.catalog, self._catalog, tp_pscale=tp_pscale, tp_units=tp_units ) nmatches = len(mref_idx) # matched_ref_id: if "matched_ref_id" in colnames: self._catalog["matched_ref_id"].mask[:] = True else: c = table.MaskedColumn(name="matched_ref_id", dtype=int, length=catlen, mask=True) self._catalog.add_column(c) self._catalog["matched_ref_id"].mask[minput_idx] = False self._catalog["matched_ref_id"][minput_idx] = refcat.catalog["id"][mref_idx] # this is needed to index reference catalog directly without using # astropy table indexing which, at this moment, is experimental: if "_raw_matched_ref_idx" in colnames: self._catalog["_raw_matched_ref_idx"].mask = True else: c = table.MaskedColumn(name="_raw_matched_ref_idx", dtype=int, length=catlen, mask=True) self._catalog.add_column(c) self._catalog["_raw_matched_ref_idx"][minput_idx] = mref_idx self._catalog["_raw_matched_ref_idx"].mask[minput_idx] = False log.info("Found %d matches for '%s'...", nmatches, self.name) # TODO: revisit this once the bug described in # https://github.com/spacetelescope/stsci.stimage/issues/8 # is fixed. # # Due to this bug minput_idx may contain duplicate values. # Because of this, the above logic for masking, saving ids, and indices # does not work reliably. As a workaround, we save matched array # indices within the image catalog so that we can use them in # `fit2ref()`. For this to work, reference and image catalog must not # change between this function return and `fit2ref()` call. # self._mref_idx = mref_idx self._minput_idx = minput_idx return nmatches, mref_idx, minput_idx
[docs] def fit2ref( self, refcat, tanplane_wcs, fitgeom="general", nclip=3, sigma=(3.0, "rmse"), clip_accum=False, ): """ Perform linear fit of this group's combined catalog to the reference catalog. When either/both group's catalog or/and the reference catalog contain ``'weight'`` column, weigted fitting will be performed. See ``Notes`` section for further details. Parameters ---------- refcat: RefCatalog A `RefCatalog` object that contains a catalog of reference sources. tanplane_wcs: WCSCorrector A `WCSCorrector` object that will provide transformations to the tangent plane to which sources of this catalog a should be "projected". fitgeom: {'shift', 'rscale', 'general'}, optional The fitting geometry to be used in fitting the matched object lists. This parameter is used in fitting the offsets, rotations and/or scale changes from the matched object lists. The 'general' fit geometry allows for independent scale and rotation for each axis. nclip: int, None, optional Number (a non-negative integer) of clipping iterations in fit. Clipping will be turned off if ``nclip`` is either `None` or 0. sigma: float, tuple of the form (float, str), optional When a tuple is provided, first value (a positive number) indicates the number of "fit error estimates" to use for clipping. The second value (a string) indicates the statistic to be used for "fit error estimate". Currently the following values are supported: ``'rmse'``, ``'mae'``, and ``'std'`` - see `~tweakwcs.linearfit.iter_linear_fit` for more details. When ``sigma`` is a single number, it must be a positive number and the default error estimate ``'rmse'`` is assumed. This parameter is ignored when ``nclip`` is either `None` or 0. clip_accum: bool, optional Indicates whether or not to reset the list of "bad" (clipped out) sources after each clipping iteration. When set to `True` the list only grows with each iteration as "bad" positions never re-enter the pool of available position for the fit. By default the list of "bad" source positions is purged at each iteration. This parameter is ignored when ``nclip`` is either `None` or 0. Notes ----- When fitting image sources to reference catalog sources, we can specify which sources have higher weights. This can be done by assigning a "weight" to each source by specifying these values in the optional ``'weight'`` column of either the reference catalog, image catalog, or both. When weights are not provided, all sources are weighed equally. When only one of image or reference catalog weights are provided, the sources will be weighted with the specified weights. When *both* image *and* reference catalogs specify weights for the same sources, the two weights will be combined into a single weight as: .. math:: 1/w = 1/w_i + 1/w_r .. warning:: Keep in mind that when a group catalog is created from individual catalogs, weights of the group catalog are created by *concatenating* weights of individual catalogs. Therefore, for the weighting of groups of catalogs to work correctly, the weights of individual catalogs should be scaled in such a way that when individual catalogs are combined into a single "group catalog", weights preserve their relative values. For example, let's say a group is formed from two individual catalogs. Let's say first catalog contains four sources with equal weights ``[1,1,1,1]`` and the second catalog contains two sources with weights ``[1,1]`` then the group's catalogs sources will also have equal weights ``[1,1,1,1,1,1]``. However, if each individual catalog's weights were normalized such that sum of all weights is 1, then group's sources will be weighed unequally: ``[0.25,0.25,0.25,0.25,0.5,0.5]``. """ im_xyref = np.asarray([self._catalog["TPx"], self._catalog["TPy"]]).T refxy = np.asarray([refcat.catalog["TPx"], refcat.catalog["TPy"]]).T # mask = np.logical_not(self._catalog['matched_ref_id'].mask) # im_xyref = im_xyref[mask] # ref_idx = self._catalog['_raw_matched_ref_idx'][mask] # TODO: revisit this once the bug described in # https://github.com/spacetelescope/stsci.stimage/issues/8 # is fixed. # # Due to this bug minput_idx may contain duplicate values. # For now we bypass the above commented code by accessing indices # stored in the image's catalog. minput_idx = self._minput_idx im_xyref = im_xyref[minput_idx] ref_idx = self._mref_idx refxy = refxy[ref_idx] # process weights: if "weight" in self._catalog.colnames: im_weight = np.asarray(self._catalog["weight"])[minput_idx] else: im_weight = None if "weight" in refcat.catalog.colnames: ref_weight = np.asarray(refcat.catalog["weight"])[ref_idx] else: ref_weight = None fit = iter_linear_fit( refxy, im_xyref, ref_weight, im_weight, fitgeom=fitgeom, nclip=nclip, sigma=sigma, center=None, clip_accum=clip_accum, ) # re-compute shifts for the center at (0, 0): fit["shift_ld"] += fit["center_ld"] - np.dot(fit["center_ld"], fit["matrix_ld"].T) fit["shift"] = fit["shift_ld"].astype(np.double) xy_fit = fit["shift"] + np.dot(im_xyref[fit["fitmask"]], fit["matrix"].T) fit["fit_xy"] = xy_fit fit["fit_RA"], fit["fit_DEC"] = tanplane_wcs.tanp_to_world(*(xy_fit.T)) log.info("Computed '%s' fit for '%s':", fitgeom, self.name) if fitgeom == "shift": log.info("XSH: %.6g YSH: %.6g", fit["shift"][0], fit["shift"][1]) elif fitgeom in ["rshift", "rscale"] and fit["proper"]: log.info( "XSH: %.6g YSH: %.6g ROT: %.6g SCALE: %.6g", fit["shift"][0], fit["shift"][1], fit["proper_rot"], fit["<scale>"], ) elif fitgeom == "general" or (fitgeom in ["rshift", "rscale"] and not fit["proper"]): log.info( "XSH: %.6g YSH: %.6g PROPER ROT: %.6g", fit["shift"][0], fit["shift"][1], fit["proper_rot"], ) log.info( "<ROT>: %.6g SKEW: %.6g ROT_X: %.6g ROT_Y: %.6g", fit["<rot>"], fit["skew"], fit["rot"][0], fit["rot"][1], ) log.info( "<SCALE>: %.6g SCALE_X: %.6g SCALE_Y: %.6g", fit["<scale>"], fit["scale"][0], fit["scale"][1], ) else: raise ValueError("Unsupported fit geometry.") # pragma: no cover log.info("") log.info("FIT RMSE: %.6g FIT MAE: %.6g", fit["rmse"], fit["mae"]) log.info("Final solution based on %d objects.", fit["resids"].shape[0]) return fit
[docs] def apply_affine_to_wcs(self, ref_tpwcs, matrix, shift, meta=None): """Applies a general affine transformation to the WCS.""" for imcat in self: imcat.corrector.set_correction(matrix, shift, ref_tpwcs=ref_tpwcs, meta=meta)
[docs] def align_to_ref( self, refcat, ref_tpwcs=None, match=None, minobj=None, fitgeom="rscale", nclip=3, sigma=(3.0, "rmse"), clip_accum=False, ): """ Matches sources from the image catalog to the sources in the reference catalog, finds the affine transformation between matched sources, and adjusts images' WCS according to this fit. Upon successful return, this function will also set the following fields of the ``fit_info`` attribute of each member `WCSImageCatalog` object: * **'fitgeom'**: the value of the ``fitgeom`` argument * **'eff_minobj'**: effective value of the ``minobj`` parameter * **'matrix'**: computed rotation matrix * **'shift'**: shift (offset) along X- and Y-axis in units of the *tangent plane* coordinates. * **'rot'**: A tuple of ``(rotx, roty)`` - the rotation angles with regard to the ``X`` and ``Y`` axes. * **'<rot>'**: *Arithmetic mean* of the angles of rotation around ``X`` and ``Y`` axes. * **'proper_rot'**: rotation angle as if rotation is a proper rotation. * **'proper'**: Indicates whether the rotation is a proper rotation (boolean) * **'scale'**: A tuple of ``(sx, sy)`` - scale change in the direction of the ``X`` and ``Y`` axes. * **'<scale>'**: *Geometric mean* of scales ``sx`` and ``sy``. * **'skew'**: Computed skew - an angle in the range [-180, 180). * **'center'**: Center of rotation in the *tangent plane* of the computed linear transformations. * **'fitmask'**: boolean array indicating (with `True`) sources **used** for fitting * **'nmatches'** [when ``match`` is not `None`]: number of matched sources * **'matched_ref_idx'** [when ``match`` is not `None`]: indices of the matched sources in the reference catalog * **'matched_input_idx'** [when ``match`` is not `None`]: indices of the matched sources in the "input" catalog (the catalog from image to be aligned) * **'fit_ref_idx'**: indices of the sources from the reference catalog used for fitting (a subset of 'matched_ref_idx' indices, when ``match`` is not `None`, left after clipping iterations performed during fitting) * **'fit_input_idx'**: indices of the sources from the "input" (image) catalog used for fitting (a subset of 'matched_input_idx' indices, when ``match`` is not `None`, left after clipping iterations performed during fitting) * **'rmse'**: fit Root-Mean-Square Error in *tangent plane* coordinates of corrected image source positions from reference source positions. * **'mae'**: fit Mean Absolute Error in *tangent plane* coordinates of corrected image source positions from reference source positions. * **'std'**: Norm of the STandard Deviation of the residuals in *tangent plane* along each axis. * **'fit_RA'**: first (corrected) world coordinate of input source positions used in fitting. * **'fit_DEC'**: second (corrected) world coordinate of input source positions used in fitting. * **'status'**: Alignment status. Currently two possible status are possible ``'SUCCESS'`` or ``'FAILED: reason for failure'``. When alignment failed, the reason for failure is provided after alignment status. .. note:: A ``'SUCCESS'`` status does not indicate a "good" alignment. It simply indicates that alignment algorithm has completed without errors. Use other fields to evaluate alignment: fit ``RMSE`` and ``MAE`` values, number of matched sources, etc. .. note:: Many quantities in ``fit_info`` are in units of the *tangent plane* coordinates, e.g., ``shift``, ``rmse``, ``std``, ``mae``. See specific ``WCSCorrector`` in `~tweakwcs.correctors` for the units of the tangent plane. Parameters ---------- refcat: RefCatalog A `RefCatalog` object that contains a catalog of reference sources. ref_tpwcs: WCSCorrector A `WCSCorrector` object that defines a projection tangent plane to be used for matching and fitting during alignment. match: MatchCatalogs, function, None, optional A callable that takes two arguments: a reference catalog and an image catalog. minobj: int, None, optional Minimum number of identified objects from each input image to use in matching objects from other images. If the default `None` value is used then `align` will automatically deternmine the minimum number of sources from the value of the `fitgeom` parameter. This parameter is used to interrupt alignment process (catalog fitting, ``WCS`` "tweaking") when the number of matched sources is smaller than the value of ``minobj`` in which case this method will return `False`. fitgeom: {'shift', 'rscale', 'general'}, optional The fitting geometry to be used in fitting the matched object lists. This parameter is used in fitting the offsets, rotations and/or scale changes from the matched object lists. The 'general' fit geometry allows for independent scale and rotation for each axis. This parameter is ignored if ``match`` is `False`. nclip: int, None, optional Number (a non-negative integer) of clipping iterations in fit. Clipping will be turned off if ``nclip`` is either `None` or 0. This parameter is ignored if ``match`` is `False`. sigma: float, tuple of the form (float, str), optional When a tuple is provided, first value (a positive number) indicates the number of "fit error estimates" to use for clipping. The second value (a string) indicates the statistic to be used for "fit error estimate". Currently the following values are supported: ``'rmse'``, ``'mae'``, and ``'std'`` - see `~tweakwcs.linearfit.iter_linear_fit` for more details. When ``sigma`` is a single number, it must be a positive number and the default error estimate ``'rmse'`` is assumed. This parameter is ignored when ``nclip`` is either `None` or 0 or when ``match`` is `False`. clip_accum: bool, optional Indicates whether or not to reset the list of "bad" (clipped out) sources after each clipping iteration. When set to `True` the list only grows with each iteration as "bad" positions never re-enter the pool of available position for the fit. By default the list of "bad" source positions is purged at each iteration. This parameter is ignored when ``nclip`` is either `None` or 0. Returns ------- bool Returns `True` if the number of matched sources is larger or equal to ``minobj`` and all steps have been performed, including catalog fitting and ``WCS`` "tweaking". If the number of matched sources is smaller than ``minobj``, this function will return `False`. """ if not self._images: name = "Unnamed" if self.name is None else self.name log.warning("WCSGroupCatalog '%s' is empty. Nothing to align.", name) return False # set initial status to 'FAILED': for imcat in self: imcat.fit_status = "FAILED: Unknown error" if minobj is None: minobj = SUPPORTED_FITGEOM_MODES[fitgeom] log.debug("Setting 'minobj' to %d for fitgeom='%s'", minobj, fitgeom) if ref_tpwcs is None: ref_tpwcs = deepcopy(self._images[0].corrector) self.calc_tanp_xy(tanplane_wcs=ref_tpwcs) refcat.calc_tanp_xy(tanplane_wcs=ref_tpwcs) nmatches, mref_idx, minput_idx = self.match2ref(refcat=refcat, match=match) if nmatches < minobj: name = "Unnamed" if self.name is None else self.name log.warning( "Not enough matches (%d) found for image catalog '%s'. Min required: %d.", nmatches, name, minobj, ) for imcat in self: imcat.fit_status = "FAILED: not enough matches" return False fit = self.fit2ref( refcat=refcat, tanplane_wcs=ref_tpwcs, fitgeom=fitgeom, nclip=nclip, sigma=sigma, clip_accum=clip_accum, ) fit_info = { "fitgeom": fitgeom, "eff_minobj": minobj, "matrix": fit["matrix"], "shift": fit["shift"], "center": fit["center"], # center of rotation in geom. transforms "fitmask": fit["fitmask"], # sources was used for fitting "proper_rot": fit["proper_rot"], # proper rotation "proper": fit["proper"], # is a proper rotation? True/False "rot": fit["rot"], # rotx, roty "<rot>": fit["<rot>"], # Arithmetic mean of rotx and roty "scale": fit["scale"], # sx, sy "<scale>": fit["<scale>"], # Geometric mean of sx, sy "skew": fit["skew"], # skew "rmse": fit["rmse"], # fit RMSE in tangent plane coords "mae": fit["mae"], # fit MAE in tangent plane coords "fit_RA": fit["fit_RA"], "fit_DEC": fit["fit_DEC"], "status": "SUCCESS", } if match is not None: fit_info.update( {"nmatches": nmatches, "matched_ref_idx": mref_idx, "matched_input_idx": minput_idx} ) self.apply_affine_to_wcs( ref_tpwcs=ref_tpwcs, matrix=fit["matrix"], shift=fit["shift"], # meta=meta ) for imcat in self: imcat.fit_info.update(deepcopy(fit_info)) self.recalc_catalog_radec() return True
[docs] class RefCatalog: """ An object that holds a reference catalog and provides tools for coordinate conversions using reference WCS as well as catalog manipulation and expansion. """ def __init__(self, catalog, name=None, footprint_tol=1.0): """ Parameters ---------- catalog: astropy.table.Table Reference catalog. .. note:: Reference catalogs (:py:class:`~astropy.table.Table`) *must* contain *both* ``'RA'`` and ``'DEC'`` columns. name: str, None, optional Name of the reference catalog. footprint_tol: float, optional Matching tolerance in arcsec. This is used to estimate catalog's footprint when catalog contains only one or two sources. """ self._name = name self._catalog = None self._footprint_tol = footprint_tol self._poly_area = None # make sure catalog has RA & DEC if catalog is not None: self.catalog = catalog def _check_catalog(self, catalog): if catalog is None: raise ValueError("Reference catalogs cannot be None") if "RA" not in catalog.colnames or "DEC" not in catalog.colnames: raise KeyError("Reference catalogs *must* contain *both* 'RA' and 'DEC' columns.") @property def name(self): """Get/set :py:class:`RefCatalog` object's name.""" return self._name @name.setter def name(self, name): self._name = name @property def catalog(self): """Get/set image's catalog.""" return self._catalog @catalog.setter def catalog(self, catalog): self._check_catalog(catalog) if not catalog: raise ValueError("Reference catalog must contain at least one source.") self._catalog = catalog.copy() if "id" not in self._catalog.colnames: self._catalog["id"] = np.arange(1, len(self._catalog) + 1) # create spherical polygon bounding the sources self.calc_bounding_polygon() @property def poly_area(self): """Area of the bounding polygon (in srad).""" return self._poly_area @property def polygon(self): """Get image's (or catalog's) bounding spherical polygon.""" return self._polygon
[docs] def intersection(self, wcsim): """ Compute intersection of this `WCSImageCatalog` object and another `WCSImageCatalog`, `WCSGroupCatalog`, `RefCatalog`, or :py:class:`~spherical_geometry.polygon.SphericalPolygon` object. Parameters ---------- wcsim: WCSImageCatalog, WCSGroupCatalog, RefCatalog, SphericalPolygon Another object that should be intersected with this `WCSImageCatalog`. Returns ------- polygon: SphericalPolygon A :py:class:`~spherical_geometry.polygon.SphericalPolygon` that is the intersection of this `WCSImageCatalog` and `wcsim`. """ if isinstance(wcsim, (WCSImageCatalog, WCSGroupCatalog, RefCatalog)): return self._polygon.intersection(wcsim.polygon) else: return self._polygon.intersection(wcsim)
# TODO: due to a bug in the sphere package, see # https://github.com/spacetelescope/sphere/issues/74 # intersections with polygons formed as union does not work. # For this reason I re-implement 'intersection_area' below with # a workaround for the bug. # The original implementation should be uncommented once the bug # is fixed. # # def intersection_area(self, wcsim): # """ Calculate the area of the intersection polygon. # """ # return np.fabs(self.intersection(wcsim).area())
[docs] def intersection_area(self, wcsim): """Calculate the area of the intersection polygon.""" if isinstance(wcsim, (WCSImageCatalog, RefCatalog)): return np.fabs(self.intersection(wcsim).area()) else: # this is bug workaround: area = 0.0 for wim in wcsim: area += np.fabs(self.polygon.intersection(wim.polygon).area()) return area
def _guarded_intersection_area(self, wcsim): """ Calculate the area of the intersection polygon. If some intersections fail due to a bug/limitation of ``spherical_geometry`` then the area of the valid intersections will be returned. If images do not intersect or intersection fails, 0 will be returned. In addition to the area, this function returns the number of times a failure in the ``spherical_geometry`` package occurred resulting in a ``MalformedPolygonError`` error. Return value: ``(area, nfailures)`` """ if isinstance(wcsim, (WCSImageCatalog, RefCatalog)): try: return np.fabs(self.intersection(wcsim).area()), 0 except MalformedPolygonError: return 0.0, 1 else: # this is bug workaround: area = 0.0 nfailures = 0 for wim in wcsim: try: area += np.fabs(self.polygon.intersection(wim.polygon).area()) except MalformedPolygonError: nfailures += 1 continue return area, nfailures def _calc_cat_convex_hull(self): """ Calculate spherical polygon corresponding to the convex hull bounding the sources in the catalog. """ if self.catalog is None: return # Find an "optimal" tangent plane to the catalog points based on the # mean point and then construct a WCS based on the mean point. # Compute x, y coordinates in this tangent plane based on the # previously computed WCS and return the set of x, y coordinates and # "reference WCS". x, y, z = _S2C(self.catalog["RA"], self.catalog["DEC"]) ra_ref, dec_ref = _C2S( x.mean(dtype=np.double), y.mean(dtype=np.double), z.mean(dtype=np.double) ) rotm = [ planar_rot_3d(np.deg2rad(alpha), 2 - axis) for axis, alpha in enumerate([ra_ref, dec_ref]) ] euler_rot = np.linalg.multi_dot(rotm) inv_euler_rot = inv(euler_rot) xr, yr, zr = np.dot(euler_rot, (x, y, z)) x = yr / xr y = zr / xr xv, yv = convex_hull(x, y, wcs=None, min_separation=1e-11) if len(xv) == 0: # no points raise RuntimeError( # pragma: no cover "Unexpected error: Contact software developer" ) elif len(xv) == 1: # one point. build a small box around it: tol = 0.5 * self._footprint_tol xv = [xv[0] - tol, xv[0] - tol, xv[0] + tol, xv[0] + tol, xv[0] - tol] yv = [yv[0] - tol, yv[0] + tol, yv[0] + tol, yv[0] - tol, yv[0] - tol] elif len(xv) == 2 or len(xv) == 3: # two points. build a small box around them: tol = 0.5 * self._footprint_tol vx = yv[1] - yv[0] vy = xv[1] - xv[0] norm = np.sqrt(vx * vx + vy * vy) vx /= norm vy /= norm xv = [ xv[0] - (vx - vy) * tol, xv[0] - (vx + vy) * tol, xv[1] + (vx - vy) * tol, xv[1] + (vx + vy) * tol, xv[0] - (vx - vy) * tol, ] yv = [ yv[0] - (vy + vx) * tol, yv[0] - (vy - vx) * tol, yv[1] + (vy + vx) * tol, yv[1] + (vy - vx) * tol, yv[0] - (vy + vx) * tol, ] # "unrotate" cartezian coordinates back to their original # ra_ref and dec_ref "positions": xt = np.ones_like(xv) xcr, ycr, zcr = np.dot(inv_euler_rot, (xt, xv, yv)).astype(np.double) # convert cartesian to spherical coordinates: ra, dec = _C2S(xcr, ycr, zcr) # TODO: for strange reasons, occasionally ra[0] != ra[-1] and/or # dec[0] != dec[-1] (even though we close the polygon in the # previous two lines). Then SphericalPolygon fails because # points are not closed. Therefore we force it to be closed: ra[-1] = ra[0] dec[-1] = dec[0] self._radec = [(ra, dec)] self._polygon = SphericalPolygon.from_radec(ra, dec) self._poly_area = np.fabs(self._polygon.area())
[docs] def calc_bounding_polygon(self): """Calculate bounding polygon of the sources in the catalog.""" # create spherical polygon bounding the sources self._calc_cat_convex_hull()
[docs] def expand_catalog(self, catalog): """ Expand current reference catalog with sources from another catalog. If current catalog is empty, then the catalog being added will become the new reference catalog. In this case if the ``catalog`` does have ``id`` column, those ID values will be preserved. If the ``catalog`` does not contain an ID column, then the new IDs will be assigned in increasing order starting with 1. If the existing reference catalog is not empty, then the IDs from the ``catalog`` being added will be discarded and new IDs will be assigned in the increasing order such as to continue the numbering of existing source positions in the reference catalog. Parameters ---------- catalog: astropy.table.Table A catalog of new sources to be added to the existing reference catalog. `catalog` *must* contain *both* ``'RA'`` and ``'DEC'`` columns. """ self._check_catalog(catalog) cat = catalog.copy() if self._catalog is None: self._catalog = cat if "id" not in self._catalog.colnames: # pragma: no branch self._catalog["id"] = np.arange(1, len(self._catalog) + 1) else: maxid = self.catalog["id"].max() oldlen = len(self.catalog) self._catalog = table.vstack( [self.catalog, cat], join_type="outer", metadata_conflicts="silent" ) # assign ids to the newly added source positions in consecutive # order above the largest id in the already existing catalog: self._catalog["id"][oldlen:] = np.arange(maxid + 1, maxid + len(cat) + 1) self.calc_bounding_polygon()
[docs] def calc_tanp_xy(self, tanplane_wcs): """ Compute x- and y-positions of the sources from the reference catalog in the tangent plane provided by the `tanplane_wcs`. This creates the following columns in the catalog's table: ``'TPx'`` and ``'TPy'``. Parameters ---------- tanplane_wcs: WCSCorrector A `WCSCorrector` object that will provide transformations to the tangent plane to which sources of this catalog a should be "projected". """ # compute x & y in the reference WCS: xtp, ytp = tanplane_wcs.world_to_tanp(self.catalog["RA"], self.catalog["DEC"]) self._catalog["TPx"] = table.MaskedColumn(xtp, name="TPx", dtype=np.double, mask=False) self._catalog["TPy"] = table.MaskedColumn(ytp, name="TPy", dtype=np.double, mask=False)
[docs] def convex_hull(x, y, wcs=None, min_separation=None): """Computes the convex hull of a set of 2D points. Implements `Andrew's monotone chain algorithm <http://en.wikibooks.org\ /wiki/Algorithm_Implementation/Geometry/Convex_hull/Monotone_chain>`_. The algorithm has O(n log n) complexity. Credit: `<http://en.wikibooks.org/wiki/Algorithm_Implementation/Geometry/\ Convex_hull/Monotone_chain>`_ Parameters ---------- x: list, tuple, numpy.ndarray An iterable sequence of ``x``-coordinates of points. y: list, tuple, numpy.ndarray An iterable sequence of ``y``-coordinates of points. wcs : function A function that takes two arguments (``x``, ``y``) and converts them to "world" coordinates. If provided, returned convex hull vertex coordnates will be in "world" coordinates. min_separation : None, float A non-negative number or `None`. When provided as a number, it specifies the minimum separation in both ``x`` and ``y`` coordinates between adjacent verices in the hull. Vertices too close to their neighbors will be removed. This operation is performed _before_ conversion to "world" coordinates. When ``min_separation`` is `None`, all vertices are kept. Returns ------- Output: list A list of vertices of the convex hull in counter-clockwise order, starting from the vertex with the lexicographically smallest coordinates. When a coordinate conversion function is supplied via the ``wcs`` argument, the returned values are those of the converted vertex coordinates. """ if min_separation is not None: if min_separation < 0: raise ValueError("'min_separation' must be non-negative or None.") def _format_return(arr1, arr2): return (arr1, arr2) if ndarray else (arr1.tolist(), arr2.tolist()) ndarray = isinstance(x, np.ndarray) or isinstance(y, np.ndarray) # Sort the points lexicographically (tuples are compared # lexicographically). Remove duplicates to detect the case we have # just one unique point. points = sorted(set(zip(x, y))) # Boring case: no points or a single point, # possibly repeated multiple times. if len(points) == 0: return _format_return(np.array([]), np.array([])) elif len(points) == 1: if ndarray: return _format_return(np.array([points[0][0]]), np.array([points[0][1]])) # 2D cross product of OA and OB vectors, i.e. z-component of their # 3D cross product. # Returns a positive value, if OAB makes a counter-clockwise turn, # negative for clockwise turn, and zero if the points are collinear. def cross(o, a, b): return (a[0] - o[0]) * (b[1] - o[1]) - (a[1] - o[1]) * (b[0] - o[0]) # Build lower hull lower = [] for p in points: while len(lower) >= 2 and cross(lower[-2], lower[-1], p) <= 0: lower.pop() lower.append(p) # Build upper hull upper = [] for p in reversed(points): while len(upper) >= 2 and cross(upper[-2], upper[-1], p) <= 0: upper.pop() upper.append(p) # Concatenation of the lower and upper hulls gives the convex hull. # Last point of each list is omitted because it is repeated at the # beginning of the other list. total_hull = np.asanyarray(lower[:-1] + upper) ptx = total_hull[:, 0] pty = total_hull[:, 1] if min_separation is not None: # remove points that are too close to each other: idx = list(range(np.size(ptx))) for k in range(np.size(ptx) - 2, 0, -1): if ( np.abs(ptx[k] - ptx[k + 1]) <= min_separation and np.abs(pty[k] - pty[k + 1]) <= min_separation ): idx.pop(k) ptx = ptx[idx] pty = pty[idx] if wcs is None: return _format_return(ptx, pty) # convert x, y vertex coordinates to RA & DEC: ra, dec = wcs(ptx, pty) # TODO: for strange reasons, occasionally ra[0] != ra[-1] and/or # dec[0] != dec[-1] (even though we close the polygon in the # previous two lines). Then SphericalPolygon fails because # points are not closed. Therefore we force it to be closed: ra[-1] = ra[0] dec[-1] = dec[0] return _format_return(ra, dec)