Source code for subpixal.catalogs

"""
A module that manages catalogs and source finding algorithms (i.e.,
``SExtractor`` source finding).

:Author: Mihai Cara (for help, contact `HST Help Desk <https://hsthelp.stsci.edu>`_)

:License: :doc:`../LICENSE`

"""
from __future__ import (absolute_import, division, unicode_literals,
                        print_function)

import copy
import sys
import subprocess

import six
import numpy as np
from astropy.io import ascii as ascii_io

from stsci.skypac import parseat

from . utils import py2round


__all__ = ['SourceCatalog', 'SExCatalog', 'SExImageCatalog']


_INT_TYPE = (int, long,) if sys.version_info < (3,) else (int,)


def _is_int(n):
    return (
        (isinstance(n, _INT_TYPE) and not isinstance(n, bool)) or
        (isinstance(n, np.generic) and np.issubdtype(n, np.integer))
    )


[docs]class SourceCatalog(object): """ A class for handling catalog data: storing, filtering, and retrieving sources. """ _CMP_MAP = { '>': np.greater, '>=': np.greater_equal, '==': np.equal, '!=': np.not_equal, '<': np.less, '<=': np.less_equal } # NOTE: Keep all predefined catalog keys ("catalog type", e.g., # 'SExtractor') in _PREDEF_CATMAP in LOWER CASE! _PREDEF_CATMAP = { 'sextractor': { 'NUMBER':'id', 'X_IMAGE': 'x', 'Y_IMAGE': 'y', 'FLUX_PETRO': 'flux', 'PETRO_RADIUS': 'radius', 'A_IMAGE': 'prof-rms-a', 'B_IMAGE': 'prof-rms-b', 'FWHM_IMAGE': 'fwhm', 'CLASS_STAR': 'stellarity' } } def __init__(self): self._rawcat = None self._rawcat_colnames = None self._origin = 0 self._catalog = None self._mask = None self._dirty = False self._required_colnames = [ 'id', 'x', 'y', 'flux', 'radius', 'prof-rms-a', 'prof-rms-b', 'stellarity', 'fwhm' ] self._catmap = {i: i for i in self._required_colnames} self.set_default_filters() @property def predefined_catmaps(self): """ Get names of available (pre-defined) column name mappings. """ return list(self._PREDEF_CATMAP.keys())
[docs] def set_default_filters(self): """ Set default source selection criteria. """ self._filters = [ ('flux', '>', 0), ('fwhm', '>', 0), ('radius', '>', 0), ('prof-rms-a', '>', 0), ('prof-rms-b', '>', 0) ]
[docs] def mark_dirty(self): """ Mark the catalog as "dirty", indicating whether or not sources should be re-extracted from the raw catalog when ``execute()`` is run. Masking and filtering criteria will be applied to the raw catalog during this run of ``execute()``. """ self._dirty = True
[docs] def is_dirty(self): """ Returns the "dirty" status. When a catalog is marked as "dirty", sources must be (re-)extracted from the raw catalog. In order to update """ return self._dirty
[docs] def set_raw_catalog(self, rawcat, catmap=None, origin=0): """ Parameters ---------- rawcat : astropy.table.Table An :py:class:`~astropy.table.Table` containing source data. catmap : dict, str, optional A `dict` that provides mapping (a dictionary) between source's ``'x'``, ``'y'``, ``'flux'``, etc. and the corresponding column names in a :py:class:`~astropy.table.Table` catalog. Instead of a dictionary, this parameter may be a pre-defined mapping name supported by this class. To get the list of all pre-defined catalog mapping supported by this class, use `SourceCatalog`'s `predefined_catmaps` property. When ``catmap`` is `None`, no column name mapping will be applied. origin: int, float, optional Coordinates of the sources of `SourceCatalog.catalog` are zero-based. The ``origin`` is used for converting ``rawcat``'s coordinates when raw catalog's source coordinates are not zero-based. """ if rawcat is None: self._rawcat = None self._rawcat_colnames = None self._catalog = None self._mask = None self._mask_type = None self.set_default_filters() self._dirty = False return if catmap is None: catmap = {} elif isinstance(catmap, six.string_types): catmap = self._PREDEF_CATMAP[catmap.lower()] # make sure we have all required columns before we proceed any further: new_col_names = [catmap.get(k, k) for k in rawcat.colnames] if not all(i in new_col_names for i in self._required_colnames): raise ValueError("Input raw catalog does not include all the " "required columns.") self._rawcat = rawcat.copy() self._dirty = True self._origin = origin self._catmap = {} if catmap is not None: colnames = rawcat.colnames self._catmap.update(catmap) for k, v in catmap.items(): if k in colnames: self._rawcat.rename_column(k, v) # reset mask and filters: self._rawcat_colnames = new_col_names self.mask = None self.set_default_filters()
@property def rawcat(self): """ Get raw catalog. """ return self._rawcat.copy() @property def required_colnames(self): """ Get a list of the minimum column names that are *required* to be present in the raw catalog **after** catalog column name mapping has been applied. """ return self._required_colnames[:] @property def rawcat_colnames(self): """ Get a list of the column names in the raw catalog **after** catalog column name mapping has been applied. """ return self._rawcat_colnames[:] @property def catmap(self): """ Get raw catalog column name mapping. """ return {k: v for k, v in self._catmap} @property def mask_type(self): """ Get mask type: 'coords', 'image', or `None` (mask not set). """ return self._mask_type @property def mask(self): """ Get mask indicating "bad" (`True`) and "good" (`False`) sources when ``mask_type`` is ``'image'`` or a 2D array of shape ``(N, 2)`` containing integer coordinates of "bad" pixels. """ return None if self._mask is None else self._mask.copy() @mask.setter def mask(self, mask): """ Get/Set mask used to ignore (mask) "bad" sources from the raw catalog. Mask is a 2D image-like (but boolean) `numpy.ndarray` indicating "bad" pixels using value `True` (=ignore these pixels) and "good" pixels using the value `False` (=no need to mask). Parameters ---------- mask : str, tuple of two 1D lists of int, 2D numpy.ndarray A mask can be provided in several ways: - When ``mask`` is a string, it is assumed to be the name of a simple FITS file contaning a boolean mask indicating "bad" pixels using value `True` and "good" pixels using value `False` (=no need to mask). - ``mask`` can also be provided directly as a boolean 2D "image" in the form of a boolean `numpy.ndarray`. - Finally, ``mask`` can be a tuple of exactly two lists (or 1D `numpy.ndarray`) containing **integer** coordinates of the "pixels" to be masked as "bad". Any source with coordinates within such a "pixel" will be excluded from the catalog. """ if mask is None: if self._mask is not None: self._dirty = True self._mask = None self._mask_type = None return elif isinstance(mask, six.string_types): # open file and find image extensions: files = parse_cs_line(mask, default_ext='*', clobber=False, fnamesOnly=False, doNotOpenDQ=True, im_fmode='readonly', dq_fmode='readonly', msk_fmode='readonly', logfile=None, verbose=False) if len(files) > 1: for f in files: f.release_all_images() raise ValueError("Only a single file can be specified as mask") self._mask = np.array(files[0].image.hdu[files[0].fext].data, dtype=np.bool) self._mask_type = 'image' files[0].release_all_images() elif isinstance(mask, tuple): if len(mask) != 2: raise ValueError("When 'mask' is a tuple, it must contain " "two 1D lists of integer coordinates to be " "excluded from the catalog.") x, y = mask x = np.asarray(x) y = np.asarray(y) if len(x.shape) != 1 or x.shape != y.shape or not _is_int(x[0]) \ or not _is_int(y[0]): raise ValueError("When 'mask' is a tuple, it must contain " "two 1D lists of equal length of integer " "coordinates to be excluded from the " "catalog.") self._mask = np.array([x, y]).T self._mask_type = 'coords' else: mask = np.array(mask) if len(mask.shape) == 2 and mask.shape[1] == 2 and \ np.issubdtype(mask.dtype, np.integer): # we are dealing with a "list" of integer indices: self._mask = mask self._mask_type = 'coords' #nonneg = np.prod(mask >= 0, axis=1, dtype=np.bool) #mask = mask[nonneg] #badpix = tuple(np.fliplr(mask.T)) #self._mask = np.ones(np.max(mask, axis=0) + 1, dtype=np.bool) #self._mask[badpix] = False elif len(mask.shape) == 2 and np.issubdtype(mask.dtype, np.bool): # we are dealing with a boolean mask: self._mask = mask self._mask_type = 'image' else: raise ValueError("Unsupported mask type or format.") self._dirty = True
[docs] def set_filters(self, fcond): """ Set conditions for *selecting* sources from the raw catalog. Parameters ---------- fcond : tuple, list of tuples Each selection condition must be specified as a tuple of the form ``(colname, comp, value)`` where: - ``colname`` is a column name from the raw catalog **after** catalog column name mapping has been applied. Use `rawcat_colnames` to get a list of available column names. - ``comp`` is a **string** representing a comparison operator. The following operators are suported: ``['>', '>=', '==', '!=', '<', '<=']``. - ``value`` is a numeric value to be used for comparison of column values. Multiple selection conditions can be provided as a list of the condition tuples described above. """ if isinstance(fcond, list): filters = [] for f in fcond[::-1]: key, op, val = f[:3] op = ''.join(op.split()) idxs = self._find_filters(filters, key, op) if idxs is None: filters.insert(0, (key, op, val)) elif isinstance(fcond, tuple): key, op, val = fcond[:3] op = ''.join(op.split()) filters = [(key, op, val)] else: raise TypeError("'fcond' must be a tuple or a list of tuples.") if self._filters != filters: self._dirty = True self._filters = filters
[docs] def reset_filters(self): """ Remove selection filters. """ self._dirty = len(self._filters) > 0 self._filters = []
[docs] def append_filters(self, fcond): """ Add one or more conditions for *selecting* sources from the raw catalog to already set filters. See :py:meth:`set_filters` for description of parameter ``fcond``. """ self._dirty = True if isinstance(fcond, list): for f in fcond: key, op, val = f[:3] op = ''.join(op.split()) flt = (key, op, val) idxs = self._find_filters(self._filters, key, op) if idxs is not None: for i in idxs: del self._filters[i] self._filters.append((key, op, val)) elif isinstance(fcond, tuple): key, op, val = fcond[:3] op = ''.join(op.split()) idxs = self._find_filters(self._filters, key, op) if idxs is not None: for i in idxs: del self._filters[i] self._filters.append((key, op, val)) self._dirty = True else: raise TypeError("'fcond' must be a tuple or a list of tuples.")
@staticmethod def _find_filters(filters, key, op=None): idxs = [] for i, (k, o, _) in enumerate(filters): if k == key and (op is None or o == op): idxs.append(i) return idxs if idxs else None
[docs] def remove_filter(self, key, op=None): """ Remove a specific filter by column name and, optionally, by comparison operator. Parameters ---------- key : str Column name to which selection criteria (filter) is applied. If more conditions match a column name vale, all of them will be removed. op : str, optional Specifies the comparison operation used in a filter. This allows narrowing down which filters should be removed. """ idxs = self._find_filters(self._filters, key, op) if idxs is None: return for idx in idxs[::-1]: del self._filters[idx] self._dirty = True
@property def filters(self): """ Get a list of all active selection filters. """ return self._filters[:] @classmethod def _op2cmp(cls, op): op = ''.join(op.split()) return cls._CMP_MAP[op]
[docs] def execute(self): """ Compute catalog applying masks and selecting only sources that satisfy all set filters. """ if not self._dirty: return # start with the original "raw" catalog: catalog = self._rawcat.copy() # remove sources with masked values: if catalog.masked: mask = np.zeros(len(catalog), dtype=np.bool) for c in catalog.itercols: mask = np.logical_or(mask, c.mask) # create a new catalog having only the "good" data without mask: catalog = catalog.__class__(catalog[np.logical_not(mask)], masked=False) # correct for 'origin': catalog['x'] -= self._origin catalog['y'] -= self._origin xi = _py2round(np.asarray(catalog['x'])).astype(np.int) yi = _py2round(np.asarray(catalog['y'])).astype(np.int) # apply mask: if self._mask is None: mask = np.ones(xi.size, dtype=np.bool) elif self._mask_type == 'coords': mask = np.logical_not( [np.any(np.all(np.equal(self._mask, p), axis=1)) for p in np.array([xi, yi]).T] ) elif self._mask_type == 'image': ymmax, xmmax = self._mask.shape mm = (xi >= 0) & (xi < xmmax) & (yi >= 0) & (yi < ymmax) mask = np.array( [np.logical_not(self._mask[i, j]) if m else False for m, i, j in zip(mm, yi, xi)] ) else: raise ValueError("Unexpected value of '_mask_type'. Contact " "software developer.") # apply filters: for f in self._filters: key, op, val = f[:3] if key in catalog.colnames: mask *= self._op2cmp(op)(catalog[key], val) catalog = catalog[mask] xi = xi[mask] yi = yi[mask] calc = np.asarray(catalog['flux'])**2 / np.asarray(catalog['fwhm']) Rup = _py2round(np.asarray(catalog['radius'])).astype(np.int) Aup = _py2round(np.asarray(catalog['prof-rms-a'])).astype(np.int) Rlg = 2 * Rup * Aup xs = xi - Rlg xe = xi + Rlg ys = yi - Rlg ye = yi + Rlg catalog['xi'] = xi catalog['yi'] = yi catalog['xs'] = xs catalog['xe'] = xe catalog['ys'] = ys catalog['ye'] = ye catalog['calc'] = calc self._catalog = catalog self._dirty = False
@property def catalog(self): """ Get catalog (after applying masks and selection filters). """ if self._dirty: self.execute() return self._catalog
[docs]class SExCatalog(SourceCatalog): """ A catalog class specialized for handling ``SExtractor`` output catalogs, such as being able to load raw ``SExtractor`` catalogs directly from text files. Parameters ---------- rawcat : astropy.table.Table, str An :py:class:`~astropy.table.Table` containing source data or a ``SExtractor``-generated text file name. max_stellarity : float, optional Maximum stellarity for selecting sources from the catalog. """ def __init__(self, rawcat=None, max_stellarity=1.0): self._max_stellarity = max_stellarity super().__init__() self._origin = 1 self.set_raw_catalog(rawcat)
[docs] def set_default_filters(self): """ Sets default filters for selecting sources from the raw catalog. Default selection criteria are: ``flux > 0`` AND ``fwhm > 0`` AND ``radius > 0`` AND ``prof-rms-a > 0`` AND ``prof-rms-b > 0`` AND ``stellarity <= max_stellarity``. """ self._filters = [ ('flux', '>', 0), ('fwhm', '>', 0), ('radius', '>', 0), ('prof-rms-a', '>', 0), ('prof-rms-b', '>', 0), ('stellarity', '<=', self._max_stellarity) ]
[docs] def set_raw_catalog(self, rawcat): """ Set/replace raw catalog. Parameters ---------- rawcat : astropy.table.Table, str An :py:class:`~astropy.table.Table` containing source data or a ``SExtractor``-generated text file name. """ if isinstance(rawcat, six.string_types): rawcat = ascii_io.read(rawcat, guess=False, format='sextractor') super().set_raw_catalog(rawcat, catmap='sextractor', origin=1)
[docs]class SExImageCatalog(SExCatalog): """ A catalog class specialized for finding sources in images using ``SExtractor`` and then loading raw ``SExtractor`` catalogs directly from output text files. Parameters ---------- image : str A ``FITS`` image file name. sexconfig : str File name of the ``SExtractor`` configuration file to be used for finding sources in the ``image``. max_stellarity : float, optional Maximum stellarity for selecting sources from the catalog. sextractor_cmd : str, optional Command to invoke ``SExtractor``. """ def __init__(self, image=None, sexconfig=None, max_stellarity=1.0, sextractor_cmd='sex'): self._max_stellarity = max_stellarity self._sextractor_cmd = sextractor_cmd super().__init__(rawcat=None, max_stellarity=max_stellarity) self._catname = None self.sexconfig = sexconfig self.image = image # _dirty_img indicates that either image or SExtractor configuration # file (or both) have changed and that a re-extraction of sources # is needed. self._dirty_img = not (image is None or sexconfig is None) def _find_sources(self): # returns exit status of the child (i.e., 'sex') process # returns None if either image or sexconfig are not set: if self._image is None or self._sexconfig is None: return None cmd = [self._sextractor_cmd, self.image, '-c', self._sexconfig] # popen = subprocess.Popen(cmd, stdout=subprocess.PIPE, # stderr=subprocess.PIPE) # out, err = popen.communicate() return subprocess.call(cmd) @property def image(self): """ Get image. """ return self._image @image.setter def image(self, image): """ Set/Get image file name. """ if image is None: super().set_raw_catalog(rawcat=None) self._dirty_img = False return self._image = image self._dirty_img = self.sexconfig is not None @property def sexconfig(self): """ Get ``SExtractor`` configuration file. """ return self._sexconfig @sexconfig.setter def sexconfig(self, sexconfig): """ Set/Get ``SExtractor`` configuration file. """ self._sexconfig = sexconfig if sexconfig is None: super().set_raw_catalog(rawcat=None) self._catname = None self._dirty_img = False self._dirty = False else: # find output catalog file name: self._catname = SExImageCatalog._get_catname(sexconfig) self.set_raw_catalog(self._catname) self._dirty_img = True self._dirty = True @staticmethod def _get_catname(sexconfig): catname = None with open(sexconfig, mode='r') as f: line = f.readline() while line: line = line.strip() if not line or line.startswith('#'): line = f.readline() continue # retrieve the part before inline comments cfgpar = line.split('#')[0] cfgpar = cfgpar.strip() if len(cfgpar) > 1: parname, parval = cfgpar.split()[:2] if parname.upper().startswith('CATALOG_NAME'): catname = parval.strip() break line = f.readline() if catname is None: raise ValueError( "Unable to retrieve SExtractor catalog name from the " "provided SExtractor configuration file '{:s}'." .format(sexconfig) ) return catname @staticmethod def _get_checkname(sexconfig, check_image_type): segname = None check_types = None check_files = None with open(sexconfig, mode='r') as f: line = f.readline() while line and (check_types is None or check_files is None): line = line.strip() if not line or line.startswith('#'): line = f.readline() continue # retrieve the part before inline comments cfgpar = line.split('#')[0] cfgpar = cfgpar.strip() if len(cfgpar) > 1: parname, parval = cfgpar.split()[:2] if cfgpar.upper().startswith('CHECKIMAGE_TYPE'): check_types = list(map(str.upper, map( str.strip, cfgpar[len('CHECKIMAGE_TYPE'):].split(',') ))) elif cfgpar.upper().startswith('CHECKIMAGE_NAME'): check_files = list(map( str.strip, cfgpar[len('CHECKIMAGE_NAME'):].split(',') )) line = f.readline() if check_types is None or check_files is None: return None try: idx = check_types.index(check_image_type) except ValueError: return None return check_files[idx] @property def segmentation_file(self): """ Get segmentation file name stored in the ``SExtractor``'s configuration file or `None`. """ return SExImageCatalog._get_segname( self._sexconfig, check_image_type='SEGMENTATION' )
[docs] def execute(self): if self._dirty_img: try: retcode = self._find_sources() if retcode is None: return if retcode < 0: print('SExtractor was terminated by signal {:d}' .format(-retcode)) return elif retcode > 0: print('SExtractor returned {:d}'.format(retcode)) return except OSError as e: print('SExtractor execution failed: {}'.format(e)) return self._dirty_img = False super().execute()