summaryrefslogtreecommitdiff
path: root/stwcs/wcsutil/headerlet.py
diff options
context:
space:
mode:
Diffstat (limited to 'stwcs/wcsutil/headerlet.py')
-rw-r--r--stwcs/wcsutil/headerlet.py2754
1 files changed, 2754 insertions, 0 deletions
diff --git a/stwcs/wcsutil/headerlet.py b/stwcs/wcsutil/headerlet.py
new file mode 100644
index 0000000..c0dd9b0
--- /dev/null
+++ b/stwcs/wcsutil/headerlet.py
@@ -0,0 +1,2754 @@
+"""
+This module implements headerlets.
+
+A headerlet serves as a mechanism for encapsulating WCS information
+which can be used to update the WCS solution of an image. The idea
+came up first from the desire for passing improved astrometric
+solutions for HST data and provide those solutions in a manner
+that would not require getting entirely new images from the archive
+when only the WCS information has been updated.
+
+"""
+
+from __future__ import absolute_import, division, print_function
+import os
+import sys
+import functools
+import logging
+import textwrap
+import copy
+import time
+
+import numpy as np
+from astropy.io import fits
+#import pywcs
+from astropy import wcs as pywcs
+from astropy.utils import lazyproperty
+
+from stsci.tools.fileutil import countExtn
+from stsci.tools import fileutil as fu
+from stsci.tools import parseinput
+
+from stwcs.updatewcs import utils
+from . import altwcs
+from . import wcscorr
+from .hstwcs import HSTWCS
+from .mappings import basic_wcs
+
+#### Logging support functions
+class FuncNameLoggingFormatter(logging.Formatter):
+ def __init__(self, fmt=None, datefmt=None):
+ if '%(funcName)s' not in fmt:
+ fmt = '%(funcName)s' + fmt
+ logging.Formatter.__init__(self, fmt=fmt, datefmt=datefmt)
+
+ def format(self, record):
+ record = copy.copy(record)
+ if hasattr(record, 'funcName') and record.funcName == 'init_logging':
+ record.funcName = ''
+ else:
+ record.funcName += ' '
+ return logging.Formatter.format(self, record)
+
+
+logger = logging.getLogger(__name__)
+formatter = FuncNameLoggingFormatter("%(levelname)s: %(message)s")
+ch = logging.StreamHandler()
+ch.setFormatter(formatter)
+ch.setLevel(logging.CRITICAL)
+logger.addHandler(ch)
+logger.setLevel(logging.DEBUG)
+
+FITS_STD_KW = ['XTENSION', 'BITPIX', 'NAXIS', 'PCOUNT',
+ 'GCOUNT', 'EXTNAME', 'EXTVER', 'ORIGIN',
+ 'INHERIT', 'DATE', 'IRAF-TLM']
+
+DEFAULT_SUMMARY_COLS = ['HDRNAME', 'WCSNAME', 'DISTNAME', 'AUTHOR', 'DATE',
+ 'SIPNAME', 'NPOLFILE', 'D2IMFILE', 'DESCRIP']
+COLUMN_DICT = {'vals': [], 'width': []}
+COLUMN_FMT = '{:<{width}}'
+
+
+def init_logging(funcname=None, level=100, mode='w', **kwargs):
+ """
+
+ Initialize logging for a function
+
+ Parameters
+ ----------
+ funcname: string
+ Name of function which will be recorded in log
+ level: int, or bool, or string
+ int or string : Logging level
+ bool: False - switch off logging
+ Text logging level for the message ("DEBUG", "INFO",
+ "WARNING", "ERROR", "CRITICAL")
+ mode: 'w' or 'a'
+ attach to logfile ('a' or start a new logfile ('w')
+
+ """
+ for hndl in logger.handlers:
+ if isinstance(hndl, logging.FileHandler):
+ has_file_handler = True
+ else:
+ has_file_handler = False
+ if level:
+ if not has_file_handler:
+ logname = 'headerlet.log'
+ fh = logging.FileHandler(logname, mode=mode)
+ fh.setFormatter(formatter)
+ fh.setLevel(logging.DEBUG)
+ logger.addHandler(fh)
+ logger.info("%s: Starting %s with arguments:\n\t %s" %
+ (time.asctime(), funcname, kwargs))
+
+
+def with_logging(func):
+ @functools.wraps(func)
+ def wrapped(*args, **kw):
+ level = kw.get('logging', 100)
+ mode = kw.get('logmode', 'w')
+ func_args = kw.copy()
+ if sys.version_info[0] >= 3:
+ argnames = func.__code__.co_varnames
+ else:
+ argnames = func.func_code.co_varnames
+
+ for argname, arg in zip(argnames, args):
+ func_args[argname] = arg
+
+ init_logging(func.__name__, level, mode, **func_args)
+ return func(*args, **kw)
+ return wrapped
+
+#### Utility functions
+def is_par_blank(par):
+ return par in ['', ' ', 'INDEF', "None", None]
+
+def parse_filename(fname, mode='readonly'):
+ """
+ Interprets the input as either a filename of a file that needs to be opened
+ or a PyFITS object.
+
+ Parameters
+ ----------
+ fname : str, `astropy.io.fits.HDUList`
+ Input pointing to a file or `astropy.io.fits.HDUList` object.
+ An input filename (str) will be expanded as necessary to
+ interpret any environmental variables
+ included in the filename.
+
+ mode : string
+ Specifies what mode to use when opening the file, if it needs
+ to open the file at all [Default: 'readonly']
+
+ Returns
+ -------
+ fobj : `astropy.io.fits.HDUList`
+ FITS file handle for input
+
+ fname : str
+ Name of input file
+
+ close_fobj : bool
+ Flag specifying whether or not fobj needs to be closed since it was
+ opened by this function. This allows a program to know whether they
+ need to worry about closing the FITS object as opposed to letting
+ the higher level interface close the object.
+
+ """
+ close_fobj = False
+ if not isinstance(fname, list):
+ if sys.version_info[0] >= 3:
+ is_string = isinstance(fname, str)
+ else:
+ is_string = isinstance(fname, basestring)
+ if is_string:
+ fname = fu.osfn(fname)
+ fobj = fits.open(fname, mode=mode)
+ close_fobj = True
+ else:
+ fobj = fname
+ if hasattr(fobj, 'filename'):
+ fname = fobj.filename()
+ else:
+ fname = ''
+ return fobj, fname, close_fobj
+
+def get_headerlet_kw_names(fobj, kw='HDRNAME'):
+ """
+ Returns a list of specified keywords from all HeaderletHDU
+ extensions in a science file.
+
+ Parameters
+ ----------
+ fobj : str, `astropy.io.fits.HDUList`
+ kw : str
+ Name of keyword to be read and reported
+ """
+
+ fobj, fname, open_fobj = parse_filename(fobj)
+
+ hdrnames = []
+ for ext in fobj:
+ if isinstance(ext, fits.hdu.base.NonstandardExtHDU):
+ hdrnames.append(ext.header[kw])
+
+ if open_fobj:
+ fobj.close()
+
+ return hdrnames
+
+def get_header_kw_vals(hdr, kwname, kwval, default=0):
+ if kwval is None:
+ if kwname in hdr:
+ kwval = hdr[kwname]
+ else:
+ kwval = default
+ return kwval
+
+@with_logging
+def find_headerlet_HDUs(fobj, hdrext=None, hdrname=None, distname=None,
+ strict=True, logging=False, logmode='w'):
+ """
+ Returns all HeaderletHDU extensions in a science file that matches
+ the inputs specified by the user. If no hdrext, hdrname or distname are
+ specified, this function will return a list of all HeaderletHDU objects.
+
+ Parameters
+ ----------
+ fobj : str, `astropy.io.fits.HDUList`
+ Name of FITS file or open fits object (`astropy.io.fits.HDUList` instance)
+ hdrext : int, tuple or None
+ index number(EXTVER) or extension tuple of HeaderletHDU to be returned
+ hdrname : string
+ value of HDRNAME for HeaderletHDU to be returned
+ distname : string
+ value of DISTNAME for HeaderletHDUs to be returned
+ strict : bool [Default: True]
+ Specifies whether or not at least one parameter needs to be provided
+ If False, all extension indices returned if hdrext, hdrname and distname
+ are all None. If True and hdrext, hdrname, and distname are all None,
+ raise an Exception requiring one to be specified.
+ logging : boolean
+ enable logging to a file called headerlet.log
+ logmode : 'w' or 'a'
+ log file open mode
+
+ Returns
+ -------
+ hdrlets : list
+ A list of all matching HeaderletHDU extension indices (could be just one)
+
+ """
+
+ get_all = False
+ if hdrext is None and hdrname is None and distname is None:
+ if not strict:
+ get_all = True
+ else:
+ mess = """\n
+ =====================================================
+ No valid Headerlet extension specified.
+ Either "hdrname", "hdrext", or "distname" needs to be specified.
+ =====================================================
+ """
+ logger.critical(mess)
+ raise ValueError
+
+ fobj, fname, open_fobj = parse_filename(fobj)
+
+ hdrlets = []
+ if hdrext is not None and isinstance(hdrext, int):
+ if hdrext in range(len(fobj)): # insure specified hdrext is in fobj
+ if isinstance(fobj[hdrext], fits.hdu.base.NonstandardExtHDU) and \
+ fobj[hdrext].header['EXTNAME'] == 'HDRLET':
+ hdrlets.append(hdrext)
+ else:
+ for ext in fobj:
+ if isinstance(ext, fits.hdu.base.NonstandardExtHDU):
+ if get_all:
+ hdrlets.append(fobj.index(ext))
+ else:
+ if hdrext is not None:
+ if isinstance(hdrext, tuple):
+ hdrextname = hdrext[0]
+ hdrextnum = hdrext[1]
+ else:
+ hdrextname = 'HDRLET'
+ hdrextnum = hdrext
+ hdrext_match = ((hdrext is not None) and
+ (hdrextnum == ext.header['EXTVER']) and
+ (hdrextname == ext.header['EXTNAME']))
+ hdrname_match = ((hdrname is not None) and
+ (hdrname == ext.header['HDRNAME']))
+ distname_match = ((distname is not None) and
+ (distname == ext.header['DISTNAME']))
+ if hdrext_match or hdrname_match or distname_match:
+ hdrlets.append(fobj.index(ext))
+
+ if open_fobj:
+ fobj.close()
+
+ if len(hdrlets) == 0:
+ if hdrname:
+ kwerr = 'hdrname'
+ kwval = hdrname
+ elif hdrext:
+ kwerr = 'hdrext'
+ kwval = hdrext
+ else:
+ kwerr = 'distname'
+ kwval = distname
+ message = """\n
+ =====================================================
+ No valid Headerlet extension found!'
+ "%s" = %s not found in %s.' % (kwerr, kwval, fname)
+ =====================================================
+ """
+ logger.critical(message)
+ raise ValueError
+
+ return hdrlets
+
+def verify_hdrname_is_unique(fobj, hdrname):
+ """
+ Verifies that no other HeaderletHDU extension has the specified hdrname.
+
+ Parameters
+ ----------
+ fobj : str, `astropy.io.fits.HDUList`
+ Name of FITS file or open fits file object
+ hdrname : str
+ value of HDRNAME for HeaderletHDU to be compared as unique
+
+ Returns
+ -------
+ unique: bool
+ If True, no other HeaderletHDU has the specified HDRNAME value
+ """
+ hdrnames_list = get_headerlet_kw_names(fobj)
+ unique = not(hdrname in hdrnames_list)
+
+ return unique
+
+def update_versions(sourcehdr, desthdr):
+ """
+ Update keywords which store version numbers
+ """
+ phdukw = {'PYWCSVER': 'Version of PYWCS used to updated the WCS',
+ 'UPWCSVER': 'Version of STWCS used to updated the WCS'
+ }
+ for key in phdukw:
+ try:
+ desthdr[key] = (sourcehdr[key], sourcehdr.comments[key])
+ except KeyError:
+ desthdr[key] = (" ", phdukw[key])
+
+def update_ref_files(source, dest):
+ """
+ Update the reference files name in the primary header of 'dest'
+ using values from 'source'
+
+ Parameters
+ ----------
+ source : `astropy.io.fits.Header`
+ dest : `astropy.io.fits.Header`
+ """
+ logger.info("Updating reference files")
+ phdukw = {'NPOLFILE': True,
+ 'IDCTAB': True,
+ 'D2IMFILE': True,
+ 'SIPNAME': True,
+ 'DISTNAME': True}
+
+ for key in phdukw:
+ try:
+ try:
+ del dest[key]
+ except:
+ pass
+ dest.set(key, source[key], source.comments[key])
+ except KeyError:
+ phdukw[key] = False
+ return phdukw
+
+def print_summary(summary_cols, summary_dict, pad=2, maxwidth=None, idcol=None,
+ output=None, clobber=True, quiet=False ):
+ """
+ Print out summary dictionary to STDOUT, and possibly an output file
+
+ """
+ nrows = None
+ if idcol:
+ nrows = len(idcol['vals'])
+
+ # Find max width of each column
+ column_widths = {}
+ for kw in summary_dict:
+ colwidth = np.array(summary_dict[kw]['width']).max()
+ if maxwidth:
+ colwidth = min(colwidth, maxwidth)
+ column_widths[kw] = colwidth + pad
+ if nrows is None:
+ nrows = len(summary_dict[kw]['vals'])
+
+ # print rows now
+ outstr = ''
+ # Start with column names
+ if idcol:
+ outstr += COLUMN_FMT.format(idcol['name'], width=idcol['width'] + pad)
+ for kw in summary_cols:
+ outstr += COLUMN_FMT.format(kw, width=column_widths[kw])
+ outstr += '\n'
+ # Now, add a row for each headerlet
+ for row in range(nrows):
+ if idcol:
+ outstr += COLUMN_FMT.format(idcol['vals'][row],
+ width=idcol['width']+pad)
+ for kw in summary_cols:
+ val = summary_dict[kw]['vals'][row][:(column_widths[kw]-pad)]
+ outstr += COLUMN_FMT.format(val, width=column_widths[kw])
+ outstr += '\n'
+ if not quiet:
+ print(outstr)
+
+ # If specified, write info to separate text file
+ write_file = False
+ if output:
+ output = fu.osfn(output) # Expand any environment variables in filename
+ write_file = True
+ if os.path.exists(output):
+ if clobber:
+ os.remove(output)
+ else:
+ print('WARNING: Not writing results to file!')
+ print(' Output text file ', output, ' already exists.')
+ print(' Set "clobber" to True or move file before trying again.')
+ write_file = False
+ if write_file:
+ fout = open(output, mode='w')
+ fout.write(outstr)
+ fout.close()
+
+#### Private utility functions
+def _create_primary_HDU(fobj, fname, wcsext, destim, hdrname, wcsname,
+ sipname, npolfile, d2imfile,
+ nmatch,catalog, wcskey,
+ author, descrip, history):
+ # convert input values into valid FITS kw values
+ if author is None:
+ author = ''
+ if descrip is None:
+ descrip = ''
+
+ sipname, idctab = utils.build_sipname(fobj, fname, sipname)
+ logger.info("Setting sipname value to %s" % sipname)
+
+ npolname, npolfile = utils.build_npolname(fobj, npolfile)
+ logger.info("Setting npolfile value to %s" % npolname)
+
+ d2imname, d2imfile = utils.build_d2imname(fobj,d2imfile)
+ logger.info("Setting d2imfile value to %s" % d2imname)
+
+ distname = utils.build_distname(sipname, npolname, d2imname)
+ logger.info("Setting distname to %s" % distname)
+
+ # open file and parse comments
+ if history not in ['', ' ', None, 'INDEF'] and os.path.isfile(history):
+ f = open(fu.osfn(history))
+ history = f.readlines()
+ f.close()
+ else:
+ history = ''
+
+ rms_ra = fobj[wcsext].header.get("CRDER1"+wcskey, 0)
+ rms_dec = fobj[wcsext].header.get("CRDER2"+wcskey, 0)
+ if not nmatch:
+ nmatch = fobj[wcsext].header.get("NMATCH"+wcskey, 0)
+ if not catalog:
+ catalog = fobj[wcsext].header.get('CATALOG'+wcskey, "")
+ # get the version of STWCS used to create the WCS of the science file.
+ #try:
+ #upwcsver = fobj[0].header.cards[fobj[0].header.index('UPWCSVER')]
+ #except KeyError:
+ #upwcsver = pyfits.Card("UPWCSVER", " ",
+ #"Version of STWCS used to update the WCS")
+ #try:
+ #pywcsver = fobj[0].header.cards[fobj[0].header.index('PYWCSVER')]
+ #except KeyError:
+ #pywcsver = pyfits.Card("PYWCSVER", " ",
+ #"Version of PYWCS used to update the WCS")
+ upwcsver = fobj[0].header.get('UPWCSVER', "")
+ pywcsver = fobj[0].header.get('PYWCSVER', "")
+ # build Primary HDU
+ phdu = fits.PrimaryHDU()
+ phdu.header['DESTIM'] = (destim, 'Destination observation root name')
+ phdu.header['HDRNAME'] = (hdrname, 'Headerlet name')
+ fmt = "%Y-%m-%dT%H:%M:%S"
+ phdu.header['DATE'] = (time.strftime(fmt), 'Date FITS file was generated')
+ phdu.header['WCSNAME'] = (wcsname, 'WCS name')
+ phdu.header['DISTNAME'] = (distname, 'Distortion model name')
+ phdu.header['SIPNAME'] = (sipname,
+ 'origin of SIP polynomial distortion model')
+ phdu.header['NPOLFILE'] = (npolfile,
+ 'origin of non-polynmial distortion model')
+ phdu.header['D2IMFILE'] = (d2imfile,
+ 'origin of detector to image correction')
+ phdu.header['IDCTAB'] = (idctab,
+ 'origin of Polynomial Distortion')
+ phdu.header['AUTHOR'] = (author, 'headerlet created by this user')
+ phdu.header['DESCRIP'] = (descrip,
+ 'Short description of headerlet solution')
+ phdu.header['RMS_RA'] = (rms_ra,
+ 'RMS in RA at ref pix of headerlet solution')
+ phdu.header['RMS_DEC'] = (rms_dec,
+ 'RMS in Dec at ref pix of headerlet solution')
+ phdu.header['NMATCH'] = (nmatch,
+ 'Number of sources used for headerlet solution')
+ phdu.header['CATALOG'] = (catalog,
+ 'Astrometric catalog used for headerlet '
+ 'solution')
+ phdu.header['UPWCSVER'] = (upwcsver, "Version of STWCS used to update the WCS")
+ phdu.header['PYWCSVER'] = (pywcsver, "Version of PYWCS used to update the WCS")
+
+ # clean up history string in order to remove whitespace characters that
+ # would cause problems with FITS
+ if isinstance(history, list):
+ history_str = ''
+ for line in history:
+ history_str += line
+ else:
+ history_str = history
+ history_lines = textwrap.wrap(history_str, width=70)
+ for hline in history_lines:
+ phdu.header.add_history(hline)
+
+ return phdu
+
+
+#### Public Interface functions
+@with_logging
+def extract_headerlet(filename, output, extnum=None, hdrname=None,
+ clobber=False, logging=True):
+ """
+ Finds a headerlet extension in a science file and writes it out as
+ a headerlet FITS file.
+
+ If both hdrname and extnum are given they should match, if not
+ raise an Exception
+
+ Parameters
+ ----------
+ filename: string or HDUList or Python list
+ This specifies the name(s) of science file(s) from which headerlets
+ will be extracted.
+
+ String input formats supported include use of wild-cards, IRAF-style
+ '@'-files (given as '@<filename>') and comma-separated list of names.
+ An input filename (str) will be expanded as necessary to interpret
+ any environmental variables included in the filename.
+ If a list of filenames has been specified, it will extract a
+ headerlet from the same extnum from all filenames.
+ output: string
+ Filename or just rootname of output headerlet FITS file
+ If string does not contain '.fits', it will create a filename with
+ '_hlet.fits' suffix
+ extnum: int
+ Extension number which contains the headerlet to be written out
+ hdrname: string
+ Unique name for headerlet, stored as the HDRNAME keyword
+ It stops if a value is not provided and no extnum has been specified
+ clobber: bool
+ If output file already exists, this parameter specifies whether or not
+ to overwrite that file [Default: False]
+ logging: boolean
+ enable logging to a file
+
+ """
+
+ if isinstance(filename, fits.HDUList):
+ filename = [filename]
+ else:
+ filename, oname = parseinput.parseinput(filename)
+
+ for f in filename:
+ fobj, fname, close_fobj = parse_filename(f)
+ frootname = fu.buildNewRootname(fname)
+ if hdrname in ['', ' ', None, 'INDEF'] and extnum is None:
+ if close_fobj:
+ fobj.close()
+ logger.critical("Expected a valid extnum or hdrname parameter")
+ raise ValueError
+ if hdrname is not None:
+ extn_from_hdrname = find_headerlet_HDUs(fobj, hdrname=hdrname)[0]
+ if extn_from_hdrname != extnum:
+ logger.critical("hdrname and extnmu should refer to the same FITS extension")
+ raise ValueError
+ else:
+ hdrhdu = fobj[extn_from_hdrname]
+ else:
+ hdrhdu = fobj[extnum]
+
+ if not isinstance(hdrhdu, HeaderletHDU):
+ logger.critical("Specified extension is not a headerlet")
+ raise ValueError
+
+ hdrlet = hdrhdu.headerlet
+
+ if output is None:
+ output = frootname
+
+ if '.fits' in output:
+ outname = output
+ else:
+ outname = '%s_hlet.fits' % output
+
+ hdrlet.tofile(outname, clobber=clobber)
+
+ if close_fobj:
+ fobj.close()
+
+
+@with_logging
+def write_headerlet(filename, hdrname, output=None, sciext='SCI',
+ wcsname=None, wcskey=None, destim=None,
+ sipname=None, npolfile=None, d2imfile=None,
+ author=None, descrip=None, history=None,
+ nmatch=None, catalog=None,
+ attach=True, clobber=False, logging=False):
+
+ """
+ Save a WCS as a headerlet FITS file.
+
+ This function will create a headerlet, write out the headerlet to a
+ separate headerlet file, then, optionally, attach it as an extension
+ to the science image (if it has not already been archived)
+
+ Either wcsname or wcskey must be provided; if both are given, they must
+ match a valid WCS.
+
+ Updates wcscorr if necessary.
+
+ Parameters
+ ----------
+ filename: string or HDUList or Python list
+ This specifies the name(s) of science file(s) from which headerlets
+ will be created and written out.
+ String input formats supported include use of wild-cards, IRAF-style
+ '@'-files (given as '@<filename>') and comma-separated list of names.
+ An input filename (str) will be expanded as necessary to interpret
+ any environmental variables included in the filename.
+ hdrname: string
+ Unique name for this headerlet, stored as HDRNAME keyword
+ output: string or None
+ Filename or just rootname of output headerlet FITS file
+ If string does not contain '.fits', it will create a filename
+ starting with the science filename and ending with '_hlet.fits'.
+ If None, a default filename based on the input filename will be
+ generated for the headerlet FITS filename
+ sciext: string
+ name (EXTNAME) of extension that contains WCS to be saved
+ wcsname: string
+ name of WCS to be archived, if " ": stop
+ wcskey: one of A...Z or " " or "PRIMARY"
+ if " " or "PRIMARY" - archive the primary WCS
+ destim: string
+ DESTIM keyword
+ if NOne, use ROOTNAME or science file name
+ sipname: string or None (default)
+ Name of unique file where the polynomial distortion coefficients were
+ read from. If None, the behavior is:
+ The code looks for a keyword 'SIPNAME' in the science header
+ If not found, for HST it defaults to 'IDCTAB'
+ If there is no SIP model the value is 'NOMODEL'
+ If there is a SIP model but no SIPNAME, it is set to 'UNKNOWN'
+ npolfile: string or None (default)
+ Name of a unique file where the non-polynomial distortion was stored.
+ If None:
+ The code looks for 'NPOLFILE' in science header.
+ If 'NPOLFILE' was not found and there is no npol model, it is set to 'NOMODEL'
+ If npol model exists, it is set to 'UNKNOWN'
+ d2imfile: string
+ Name of a unique file where the detector to image correction was
+ stored. If None:
+ The code looks for 'D2IMFILE' in the science header.
+ If 'D2IMFILE' is not found and there is no d2im correction,
+ it is set to 'NOMODEL'
+ If d2im correction exists, but 'D2IMFILE' is missing from science
+ header, it is set to 'UNKNOWN'
+ author: string
+ Name of user who created the headerlet, added as 'AUTHOR' keyword
+ to headerlet PRIMARY header
+ descrip: string
+ Short description of the solution provided by the headerlet
+ This description will be added as the single 'DESCRIP' keyword
+ to the headerlet PRIMARY header
+ history: filename, string or list of strings
+ Long (possibly multi-line) description of the solution provided
+ by the headerlet. These comments will be added as 'HISTORY' cards
+ to the headerlet PRIMARY header
+ If filename is specified, it will format and attach all text from
+ that file as the history.
+ attach: bool
+ Specify whether or not to attach this headerlet as a new extension
+ It will verify that no other headerlet extension has been created with
+ the same 'hdrname' value.
+ clobber: bool
+ If output file already exists, this parameter specifies whether or not
+ to overwrite that file [Default: False]
+ logging: boolean
+ enable file logging
+ """
+
+ if isinstance(filename, fits.HDUList):
+ filename = [filename]
+ else:
+ filename, oname = parseinput.parseinput(filename)
+
+ for f in filename:
+ if isinstance(f, str):
+ fname = f
+ else:
+ fname = f.filename()
+
+ if wcsname in [None, ' ', '', 'INDEF'] and wcskey is None:
+ message = """\n
+ No valid WCS found found in %s.
+ A valid value for either "wcsname" or "wcskey"
+ needs to be specified.
+ """ % fname
+ logger.critical(message)
+ raise ValueError
+
+ # Translate 'wcskey' value for PRIMARY WCS to valid altwcs value of ' '
+ if wcskey == 'PRIMARY':
+ wcskey = ' '
+
+ if attach:
+ umode = 'update'
+ else:
+ umode = 'readonly'
+
+ fobj, fname, close_fobj = parse_filename(f, mode=umode)
+
+ # Interpret sciext input for this file
+ if isinstance(sciext, int):
+ sciextlist = [sciext] # allow for specification of simple FITS header
+ elif isinstance(sciext, str):
+ numsciext = countExtn(fobj, sciext)
+ if numsciext > 0:
+ sciextlist = [tuple((sciext,i)) for i in range(1, numsciext+1)]
+ else:
+ sciextlist = [0]
+ elif isinstance(sciext, list):
+ sciextlist = sciext
+ else:
+ errstr = "Expected sciext to be a list of FITS extensions with science data\n"+\
+ " a valid EXTNAME string, or an integer."
+ logger.critical(errstr)
+ raise ValueError
+
+ wnames = altwcs.wcsnames(fobj,ext=sciextlist[0])
+
+ # Insure that WCSCORR table has been created with all original
+ # WCS's recorded prior to adding the headerlet WCS
+ wcscorr.init_wcscorr(fobj)
+
+ if wcsname is None:
+ scihdr = fobj[sciextlist[0]].header
+ wname = scihdr['wcsname'+wcskey]
+ else:
+ wname = wcsname
+ if hdrname in [None, ' ', '']:
+ hdrname = wcsname
+
+ logger.critical('Creating the headerlet from image %s' % fname)
+ hdrletobj = create_headerlet(fobj, sciext=sciextlist,
+ wcsname=wname, wcskey=wcskey,
+ hdrname=hdrname,
+ sipname=sipname, npolfile=npolfile,
+ d2imfile=d2imfile, author=author,
+ descrip=descrip, history=history,
+ nmatch=nmatch, catalog=catalog,
+ logging=False)
+
+ if attach:
+ # Check to see whether or not a HeaderletHDU with
+ #this hdrname already exists
+ hdrnames = get_headerlet_kw_names(fobj)
+ if hdrname not in hdrnames:
+ hdrlet_hdu = HeaderletHDU.fromheaderlet(hdrletobj)
+
+ if destim is not None:
+ hdrlet_hdu.header['destim'] = destim
+
+ fobj.append(hdrlet_hdu)
+
+ # Update the WCSCORR table with new rows from the headerlet's WCSs
+ wcscorr.update_wcscorr(fobj, source=hdrletobj,
+ extname='SIPWCS', wcs_id=wname)
+
+ utils.updateNEXTENDKw(fobj)
+ fobj.flush()
+ else:
+ message = """
+ Headerlet with hdrname %s already archived for WCS %s.
+ No new headerlet appended to %s.
+ """ % (hdrname, wname, fname)
+ logger.critical(message)
+
+ if close_fobj:
+ logger.info('Closing image in write_headerlet()...')
+ fobj.close()
+
+ frootname = fu.buildNewRootname(fname)
+
+ if output is None:
+ # Generate default filename for headerlet FITS file
+ outname = '{0}_hlet.fits'.format(frootname)
+ else:
+ outname = output
+
+ if not outname.endswith('.fits'):
+ outname = '{0}_{1}_hlet.fits'.format(frootname,outname)
+
+ # If user specifies an output filename for headerlet, write it out
+ hdrletobj.tofile(outname, clobber=clobber)
+ logger.critical( 'Created Headerlet file %s ' % outname)
+
+ del hdrletobj
+
+@with_logging
+def create_headerlet(filename, sciext='SCI', hdrname=None, destim=None,
+ wcskey=" ", wcsname=None,
+ sipname=None, npolfile=None, d2imfile=None,
+ author=None, descrip=None, history=None,
+ nmatch=None, catalog=None,
+ logging=False, logmode='w'):
+ """
+ Create a headerlet from a WCS in a science file
+ If both wcskey and wcsname are given they should match, if not
+ raise an Exception
+
+ Parameters
+ ----------
+ filename: string or HDUList
+ Either a filename or PyFITS HDUList object for the input science file
+ An input filename (str) will be expanded as necessary to interpret
+ any environmental variables included in the filename.
+ sciext: string or python list (default: 'SCI')
+ Extension in which the science data with the linear WCS is.
+ The headerlet will be created from these extensions.
+ If string - a valid EXTNAME is expected
+ If int - specifies an extension with a valid WCS, such as 0 for a
+ simple FITS file
+ If list - a list of FITS extension numbers or strings representing
+ extension tuples, e.g. ('SCI, 1') is expected.
+ hdrname: string
+ value of HDRNAME keyword
+ Takes the value from the HDRNAME<wcskey> keyword, if not available from WCSNAME<wcskey>
+ It stops if neither is found in the science file and a value is not provided
+ destim: string or None
+ name of file this headerlet can be applied to
+ if None, use ROOTNAME keyword
+ wcskey: char (A...Z) or " " or "PRIMARY" or None
+ a char representing an alternate WCS to be used for the headerlet
+ if " ", use the primary (default)
+ if None use wcsname
+ wcsname: string or None
+ if wcskey is None use wcsname specified here to choose an alternate WCS for the headerlet
+ sipname: string or None (default)
+ Name of unique file where the polynomial distortion coefficients were
+ read from. If None, the behavior is:
+ The code looks for a keyword 'SIPNAME' in the science header
+ If not found, for HST it defaults to 'IDCTAB'
+ If there is no SIP model the value is 'NOMODEL'
+ If there is a SIP model but no SIPNAME, it is set to 'UNKNOWN'
+ npolfile: string or None (default)
+ Name of a unique file where the non-polynomial distortion was stored.
+ If None:
+ The code looks for 'NPOLFILE' in science header.
+ If 'NPOLFILE' was not found and there is no npol model, it is set to 'NOMODEL'
+ If npol model exists, it is set to 'UNKNOWN'
+ d2imfile: string
+ Name of a unique file where the detector to image correction was
+ If None:
+ The code looks for 'D2IMFILE' in the science header.
+ If 'D2IMFILE' is not found and there is no d2im correction,
+ it is set to 'NOMODEL'
+ If d2im correction exists, but 'D2IMFILE' is missing from science
+ header, it is set to 'UNKNOWN'
+ author: string
+ Name of user who created the headerlet, added as 'AUTHOR' keyword
+ to headerlet PRIMARY header
+ descrip: string
+ Short description of the solution provided by the headerlet
+ This description will be added as the single 'DESCRIP' keyword
+ to the headerlet PRIMARY header
+ history: filename, string or list of strings
+ Long (possibly multi-line) description of the solution provided
+ by the headerlet. These comments will be added as 'HISTORY' cards
+ to the headerlet PRIMARY header
+ If filename is specified, it will format and attach all text from
+ that file as the history.
+ nmatch: int (optional)
+ Number of sources used in the new solution fit
+ catalog: string (optional)
+ Astrometric catalog used for headerlet solution
+ logging: boolean
+ enable file logging
+ logmode: 'w' or 'a'
+ log file open mode
+
+ Returns
+ -------
+ Headerlet object
+
+ """
+ if wcskey == 'O':
+ message = "Warning: 'O' is a reserved key for the original WCS. Quitting..."
+ logger.info(message)
+ return None
+
+ fobj, fname, close_file = parse_filename(filename)
+ # based on `sciext` create a list of (EXTNAME, EXTVER) tuples
+ # of extensions with WCS to be saved in a headerlet
+ sciext = get_extname_extver_list(fobj, sciext)
+ logger.debug("Data extensions from which to create headerlet:\n\t %s"
+ % (str(sciext)))
+ if not sciext:
+ logger.critical("No valid target extensions found in file %s" % fname)
+ raise ValueError
+
+ # Define extension to evaluate for verification of input parameters
+ wcsext = sciext[0]
+ logger.debug("sciext in create_headerlet is %s" % str(sciext))
+ # Translate 'wcskey' value for PRIMARY WCS to valid altwcs value of ' '
+ if wcskey == 'PRIMARY':
+ wcskey = ' '
+ logger.info("wcskey reset from 'PRIMARY' to ' '")
+ wcskey = wcskey.upper()
+ wcsnamekw = "".join(["WCSNAME", wcskey.upper()]).rstrip()
+ hdrnamekw = "".join(["HDRNAME", wcskey.upper()]).rstrip()
+
+ wnames = altwcs.wcsnames(fobj, ext=wcsext)
+ if not wcsname:
+ # User did not specify a value for 'wcsname'
+ if wcsnamekw in fobj[wcsext].header:
+ #check if there's a WCSNAME for this wcskey in the header
+ wcsname = fobj[wcsext].header[wcsnamekw]
+ logger.info("Setting wcsname from header[%s] to %s" % (wcsnamekw, wcsname))
+ else:
+ if hdrname not in ['', ' ', None, "INDEF"]:
+ """
+ If wcsname for this wcskey was not provided
+ and WCSNAME<wcskey> does not exist in the header
+ and hdrname is provided, then
+ use hdrname as WCSNAME for the headerlet.
+ """
+ wcsname = hdrname
+ logger.debug("Setting wcsname from hdrname to %s" % hdrname)
+ else:
+ if hdrnamekw in fobj[wcsext].header:
+ wcsname = fobj[wcsext].header[hdrnamekw]
+ logger.debug("Setting wcsname from header[%s] to %s" % (hdrnamekw, wcsname))
+ else:
+ message = """
+ Required keywords 'HDRNAME' or 'WCSNAME' not found!
+ Please specify a value for parameter 'hdrname'
+ or update header with 'WCSNAME' keyword.
+ """
+ logger.critical(message)
+ raise KeyError
+ else:
+ # Verify that 'wcsname' and 'wcskey' values specified by user reference
+ # the same WCS
+ wname = fobj[wcsext].header[wcsnamekw]
+ if wcsname != wname:
+ message = "\tInconsistent values for 'wcskey' and 'wcsname' specified!\n"
+ message += " 'wcskey' = %s and 'wcsname' = %s. \n" % (wcskey, wcsname)
+ message += "Actual value of %s found to be %s. \n" % (wcsnamekw, wname)
+ logger.critical(message)
+ raise KeyError
+ wkeys = altwcs.wcskeys(fobj, ext=wcsext)
+ if wcskey != ' ':
+ if wcskey not in wkeys:
+ logger.critical('No WCS with wcskey=%s found in extension %s. Skipping...' % (wcskey, str(wcsext)))
+ raise ValueError("No WCS with wcskey=%s found in extension %s. Skipping...' % (wcskey, str(wcsext))")
+
+ # get remaining required keywords
+ if destim is None:
+ if 'ROOTNAME' in fobj[0].header:
+ destim = fobj[0].header['ROOTNAME']
+ logger.info("Setting destim to rootname of the science file")
+ else:
+ destim = fname
+ logger.info('DESTIM not provided')
+ logger.info('Keyword "ROOTNAME" not found')
+ logger.info('Using file name as DESTIM')
+
+ if not hdrname:
+ # check if HDRNAME<wcskey> is in header
+ if hdrnamekw in fobj[wcsext].header:
+ hdrname = fobj[wcsext].header[hdrnamekw]
+ else:
+ if wcsnamekw in fobj[wcsext].header:
+ hdrname = fobj[wcsext].header[wcsnamekw]
+ message = """
+ Using default value for HDRNAME of "%s" derived from %s.
+ """ % (hdrname, wcsnamekw)
+ logger.info(message)
+ logger.info("Setting hdrname to %s from header[%s]"
+ % (hdrname, wcsnamekw))
+ else:
+ message = "Required keywords 'HDRNAME' or 'WCSNAME' not found"
+ logger.critical(message)
+ raise KeyError
+
+
+
+ hdul = []
+ phdu = _create_primary_HDU(fobj, fname, wcsext, destim, hdrname, wcsname,
+ sipname, npolfile, d2imfile,
+ nmatch, catalog, wcskey,
+ author, descrip, history)
+ hdul.append(phdu)
+ wcsdvarr_extns = []
+ """
+ nd2i is a counter for d2i extensions to be used when the science file
+ has an old d2i correction format. The old format did not write EXTVER
+ kw for the d2i correction in the science header bu tthe new format expects
+ them.
+ """
+ nd2i_extver = 1
+ for ext in sciext:
+ wkeys = altwcs.wcskeys(fobj, ext=ext)
+ if wcskey != ' ':
+ if wcskey not in wkeys:
+ logger.debug(
+ 'No WCS with wcskey=%s found in extension %s. '
+ 'Skipping...' % (wcskey, str(ext)))
+ raise ValueError("")
+
+ hwcs = HSTWCS(fobj, ext=ext, wcskey=wcskey)
+
+ whdul = hwcs.to_fits(relax=True, key=" ")
+ if hasattr(hwcs, 'orientat'):
+ orient_comment = "positions angle of image y axis (deg. e of n)"
+ whdul[0].header['ORIENTAT'] = (hwcs.orientat, orient_comment)
+
+ whdul[0].header.append(('TG_ENAME', ext[0], 'Target science data extname'))
+ whdul[0].header.append(('TG_EVER', ext[1], 'Target science data extver'))
+
+ if hwcs.wcs.has_cd():
+ whdul[0].header = altwcs.pc2cd(whdul[0].header)
+
+ idckw = hwcs._idc2hdr()
+ whdul[0].header.extend(idckw)
+
+ if hwcs.det2im1 or hwcs.det2im2:
+ try:
+ whdul[0].header.append(fobj[ext].header.cards['D2IMEXT'])
+ except KeyError:
+ pass
+ whdul[0].header.extend(fobj[ext].header.cards['D2IMERR*'])
+ if 'D2IM1.EXTVER' in whdul[0].header:
+ try:
+ whdul[0].header['D2IM1.EXTVER'] = fobj[ext].header['D2IM1.EXTVER']
+ except KeyError:
+ whdul[0].header['D2IM1.EXTVER'] = nd2i_extver
+ nd2i_extver += 1
+ if 'D2IM2.EXTVER' in whdul[0].header:
+ try:
+ whdul[0].header['D2IM2.EXTVER'] = fobj[ext].header['D2IM2.EXTVER']
+ except KeyError:
+ whdul[0].header['D2IM2.EXTVER'] = nd2i_extver
+ nd2i_extver += 1
+
+ if hwcs.cpdis1 or hwcs.cpdis2:
+ whdul[0].header.extend(fobj[ext].header.cards['CPERR*'])
+ try:
+ whdul[0].header.append(fobj[ext].header.cards['NPOLEXT'])
+ except KeyError:
+ pass
+ if 'DP1.EXTVER' in whdul[0].header:
+ whdul[0].header['DP1.EXTVER'] = fobj[ext].header['DP1.EXTVER']
+ if 'DP2.EXTVER' in whdul[0].header:
+ whdul[0].header['DP2.EXTVER'] = fobj[ext].header['DP2.EXTVER']
+ ihdu = fits.ImageHDU(header=whdul[0].header, name='SIPWCS')
+
+ if ext[0] != "PRIMARY":
+ ihdu.update_ext_version(fobj[ext].header['EXTVER'], comment='Extension version')
+
+ hdul.append(ihdu)
+
+ if hwcs.cpdis1:
+ whdu = whdul[('WCSDVARR', 1)].copy()
+ whdu.update_ext_version(fobj[ext].header['DP1.EXTVER'])
+ hdul.append(whdu)
+ if hwcs.cpdis2:
+ whdu = whdul[('WCSDVARR', 2)].copy()
+ whdu.update_ext_version(fobj[ext].header['DP2.EXTVER'])
+ hdul.append(whdu)
+
+ if hwcs.det2im1:
+ whdu = whdul[('D2IMARR', 1)].copy()
+ whdu.update_ext_version(ihdu.header['D2IM1.EXTVER'])
+ hdul.append(whdu)
+ if hwcs.det2im2:
+ whdu = whdul[('D2IMARR', 2)].copy()
+ whdu.update_ext_version(ihdu.header['D2IM2.EXTVER'])
+ hdul.append(whdu)
+
+
+ #if hwcs.det2im1 or hwcs.det2im2:
+ #try:
+ #darr = hdul[('D2IMARR', 1)]
+ #except KeyError:
+ #whdu = whdul[('D2IMARR')]
+ #whdu.update_ext_version(1)
+ #hdul.append(whdu)
+ if close_file:
+ fobj.close()
+
+ hlet = Headerlet(hdul, logging=logging, logmode='a')
+ hlet.init_attrs()
+ return hlet
+
+@with_logging
+def apply_headerlet_as_primary(filename, hdrlet, attach=True, archive=True,
+ force=False, logging=False, logmode='a'):
+ """
+ Apply headerlet 'hdrfile' to a science observation 'destfile' as the primary WCS
+
+ Parameters
+ ----------
+ filename: string or list of strings
+ File name(s) of science observation whose WCS solution will be updated
+ hdrlet: string or list of strings
+ Headerlet file(s), must match 1-to-1 with input filename(s)
+ attach: boolean
+ True (default): append headerlet to FITS file as a new extension.
+ archive: boolean
+ True (default): before updating, create a headerlet with the
+ WCS old solution.
+ force: boolean
+ If True, this will cause the headerlet to replace the current PRIMARY
+ WCS even if it has a different distortion model. [Default: False]
+ logging: boolean
+ enable file logging
+ logmode: 'w' or 'a'
+ log file open mode
+ """
+ if not isinstance(filename, list):
+ filename = [filename]
+ if not isinstance(hdrlet, list):
+ hdrlet = [hdrlet]
+ if len(hdrlet) != len(filename):
+ logger.critical("Filenames must have matching headerlets. "
+ "{0:d} filenames and {1:d} headerlets specified".format(len(filename),len(hdrlet)))
+
+ for fname,h in zip(filename,hdrlet):
+ print("Applying {0} as Primary WCS to {1}".format(h,fname))
+ hlet = Headerlet.fromfile(h, logging=logging, logmode=logmode)
+ hlet.apply_as_primary(fname, attach=attach, archive=archive,
+ force=force)
+
+
+@with_logging
+def apply_headerlet_as_alternate(filename, hdrlet, attach=True, wcskey=None,
+ wcsname=None, logging=False, logmode='w'):
+ """
+ Apply headerlet to a science observation as an alternate WCS
+
+ Parameters
+ ----------
+ filename: string or list of strings
+ File name(s) of science observation whose WCS solution will be updated
+ hdrlet: string or list of strings
+ Headerlet file(s), must match 1-to-1 with input filename(s)
+ attach: boolean
+ flag indicating if the headerlet should be attached as a
+ HeaderletHDU to fobj. If True checks that HDRNAME is unique
+ in the fobj and stops if not.
+ wcskey: string
+ Key value (A-Z, except O) for this alternate WCS
+ If None, the next available key will be used
+ wcsname: string
+ Name to be assigned to this alternate WCS
+ WCSNAME is a required keyword in a Headerlet but this allows the
+ user to change it as desired.
+ logging: boolean
+ enable file logging
+ logmode: 'a' or 'w'
+ """
+ if not isinstance(filename, list):
+ filename = [filename]
+ if not isinstance(hdrlet, list):
+ hdrlet = [hdrlet]
+ if len(hdrlet) != len(filename):
+ logger.critical("Filenames must have matching headerlets. "
+ "{0:d} filenames and {1:d} headerlets specified".format(len(filename),len(hdrlet)))
+
+ for fname,h in zip(filename,hdrlet):
+ print('Applying {0} as an alternate WCS to {1}'.format(h,fname))
+ hlet = Headerlet.fromfile(h, logging=logging, logmode=logmode)
+ hlet.apply_as_alternate(fname, attach=attach,
+ wcsname=wcsname, wcskey=wcskey)
+
+
+@with_logging
+def attach_headerlet(filename, hdrlet, logging=False, logmode='a'):
+ """
+ Attach Headerlet as an HeaderletHDU to a science file
+
+ Parameters
+ ----------
+ filename: HDUList or list of HDULists
+ science file(s) to which the headerlet should be applied
+ hdrlet: string, Headerlet object or list of strings or Headerlet objects
+ string representing a headerlet file(s), must match 1-to-1 input filename(s)
+ logging: boolean
+ enable file logging
+ logmode: 'a' or 'w'
+ """
+ if not isinstance(filename, list):
+ filename = [filename]
+ if not isinstance(hdrlet, list):
+ hdrlet = [hdrlet]
+ if len(hdrlet) != len(filename):
+ logger.critical("Filenames must have matching headerlets. "
+ "{0:d} filenames and {1:d} headerlets specified".format(len(filename),len(hdrlet)))
+
+ for fname,h in zip(filename,hdrlet):
+ print('Attaching {0} as Headerlet extension to {1}'.format(h,fname))
+ hlet = Headerlet.fromfile(h, logging=logging, logmode=logmode)
+ hlet.attach_to_file(fname,archive=True)
+
+
+@with_logging
+def delete_headerlet(filename, hdrname=None, hdrext=None, distname=None,
+ logging=False, logmode='w'):
+ """
+ Deletes HeaderletHDU(s) with same HDRNAME from science files
+
+ Notes
+ -----
+ One of hdrname, hdrext or distname should be given.
+ If hdrname is given - delete a HeaderletHDU with a name HDRNAME from fobj.
+ If hdrext is given - delete HeaderletHDU in extension.
+ If distname is given - deletes all HeaderletHDUs with a specific distortion model from fobj.
+ Updates wcscorr
+
+ Parameters
+ ----------
+ filename: string, HDUList or list of strings
+ Filename can be specified as a single filename or HDUList, or
+ a list of filenames
+ Each input filename (str) will be expanded as necessary to interpret
+ any environmental variables included in the filename.
+ hdrname: string or None
+ HeaderletHDU primary header keyword HDRNAME
+ hdrext: int, tuple or None
+ HeaderletHDU FITS extension number
+ tuple has the form ('HDRLET', 1)
+ distname: string or None
+ distortion model as specified in the DISTNAME keyword
+ logging: boolean
+ enable file logging
+ logmode: 'a' or 'w'
+ """
+ if not isinstance(filename, list):
+ filename = [filename]
+
+ for f in filename:
+ print("Deleting Headerlet from ",f)
+ _delete_single_headerlet(f, hdrname=hdrname, hdrext=hdrext,
+ distname=distname, logging=logging, logmode='a')
+
+def _delete_single_headerlet(filename, hdrname=None, hdrext=None, distname=None,
+ logging=False, logmode='w'):
+ """
+ Deletes HeaderletHDU(s) from a SINGLE science file
+
+ Notes
+ -----
+ One of hdrname, hdrext or distname should be given.
+ If hdrname is given - delete a HeaderletHDU with a name HDRNAME from fobj.
+ If hdrext is given - delete HeaderletHDU in extension.
+ If distname is given - deletes all HeaderletHDUs with a specific distortion model from fobj.
+ Updates wcscorr
+
+ Parameters
+ ----------
+ filename: string or HDUList
+ Either a filename or PyFITS HDUList object for the input science file
+ An input filename (str) will be expanded as necessary to interpret
+ any environmental variables included in the filename.
+ hdrname: string or None
+ HeaderletHDU primary header keyword HDRNAME
+ hdrext: int, tuple or None
+ HeaderletHDU FITS extension number
+ tuple has the form ('HDRLET', 1)
+ distname: string or None
+ distortion model as specified in the DISTNAME keyword
+ logging: boolean
+ enable file logging
+ logmode: 'a' or 'w'
+ """
+ hdrlet_ind = find_headerlet_HDUs(filename, hdrname=hdrname, hdrext=hdrext,
+ distname=distname, logging=logging, logmode='a')
+ if len(hdrlet_ind) == 0:
+ message = """
+ No HDUs deleted... No Headerlet HDUs found with '
+ hdrname = %s
+ hdrext = %s
+ distname = %s
+ Please review input parameters and try again.
+ """ % (hdrname, str(hdrext), distname)
+ logger.critical(message)
+ return
+
+ fobj, fname, close_fobj = parse_filename(filename, mode='update')
+
+ # delete row(s) from WCSCORR table now...
+ #
+ #
+ if hdrname not in ['', ' ', None, 'INDEF']:
+ selections = {'hdrname': hdrname}
+ elif hdrname in ['', ' ', None, 'INDEF'] and hdrext is not None:
+ selections = {'hdrname': fobj[hdrext].header['hdrname']}
+ else:
+ selections = {'distname': distname}
+ wcscorr.delete_wcscorr_row(fobj['WCSCORR'].data, selections)
+
+ # delete the headerlet extension now
+ for hdrind in hdrlet_ind:
+ del fobj[hdrind]
+
+ utils.updateNEXTENDKw(fobj)
+ # Update file object with changes
+ fobj.flush()
+ # close file, if was opened by this function
+ if close_fobj:
+ fobj.close()
+ logger.critical('Deleted headerlet from extension(s) %s ' % str(hdrlet_ind))
+
+
+def headerlet_summary(filename, columns=None, pad=2, maxwidth=None,
+ output=None, clobber=True, quiet=False):
+ """
+
+ Print a summary of all HeaderletHDUs in a science file to STDOUT, and
+ optionally to a text file
+ The summary includes:
+ HDRLET_ext_number HDRNAME WCSNAME DISTNAME SIPNAME NPOLFILE D2IMFILE
+
+ Parameters
+ ----------
+ filename: string or HDUList
+ Either a filename or PyFITS HDUList object for the input science file
+ An input filename (str) will be expanded as necessary to interpret
+ any environmental variables included in the filename.
+ columns: list
+ List of headerlet PRIMARY header keywords to report in summary
+ By default (set to None), it will use the default set of keywords
+ defined as the global list DEFAULT_SUMMARY_COLS
+ pad: int
+ Number of padding spaces to put between printed columns
+ [Default: 2]
+ maxwidth: int
+ Maximum column width(not counting padding) for any column in summary
+ By default (set to None), each column's full width will be used
+ output: string (optional)
+ Name of optional output file to record summary. This filename
+ can contain environment variables.
+ [Default: None]
+ clobber: bool
+ If True, will overwrite any previous output file of same name
+ quiet: bool
+ If True, will NOT report info to STDOUT
+
+ """
+ if columns is None:
+ summary_cols = DEFAULT_SUMMARY_COLS
+ else:
+ summary_cols = columns
+
+ summary_dict = {}
+ for kw in summary_cols:
+ summary_dict[kw] = copy.deepcopy(COLUMN_DICT)
+
+ # Define Extension number column
+ extnums_col = copy.deepcopy(COLUMN_DICT)
+ extnums_col['name'] = 'EXTN'
+ extnums_col['width'] = 6
+
+ fobj, fname, close_fobj = parse_filename(filename)
+ # find all HDRLET extensions and combine info into a single summary
+ for extn in fobj:
+ if 'extname' in extn.header and extn.header['extname'] == 'HDRLET':
+ hdrlet_indx = fobj.index_of(('hdrlet', extn.header['extver']))
+ try:
+ ext_cols, ext_summary = extn.headerlet.summary(columns=summary_cols)
+ extnums_col['vals'].append(hdrlet_indx)
+ for kw in summary_cols:
+ for key in COLUMN_DICT:
+ summary_dict[kw][key].extend(ext_summary[kw][key])
+ except:
+ print("Skipping headerlet")
+ print("Could not read Headerlet from extension ", hdrlet_indx)
+
+ if close_fobj:
+ fobj.close()
+
+ # Print out the summary dictionary
+ print_summary(summary_cols, summary_dict, pad=pad, maxwidth=maxwidth,
+ idcol=extnums_col, output=output,
+ clobber=clobber, quiet=quiet)
+
+
+@with_logging
+def restore_from_headerlet(filename, hdrname=None, hdrext=None, archive=True,
+ force=False, logging=False, logmode='w'):
+ """
+ Restores a headerlet as a primary WCS
+
+ Parameters
+ ----------
+ filename: string or HDUList
+ Either a filename or PyFITS HDUList object for the input science file
+ An input filename (str) will be expanded as necessary to interpret
+ any environmental variables included in the filename.
+ hdrname: string
+ HDRNAME keyword of HeaderletHDU
+ hdrext: int or tuple
+ Headerlet extension number of tuple ('HDRLET',2)
+ archive: boolean (default: True)
+ When the distortion model in the headerlet is the same as the distortion model of
+ the science file, this flag indicates if the primary WCS should be saved as an alternate
+ nd a headerlet extension.
+ When the distortion models do not match this flag indicates if the current primary and
+ alternate WCSs should be archived as headerlet extensions and alternate WCS.
+ force: boolean (default:False)
+ When the distortion models of the headerlet and the primary do not match, and archive
+ is False, this flag forces an update of the primary.
+ logging: boolean
+ enable file logging
+ logmode: 'a' or 'w'
+ """
+
+ hdrlet_ind = find_headerlet_HDUs(filename, hdrext=hdrext, hdrname=hdrname)
+
+ fobj, fname, close_fobj = parse_filename(filename, mode='update')
+
+ if len(hdrlet_ind) > 1:
+ if hdrext:
+ kwerr = 'hdrext'
+ kwval = hdrext
+ else:
+ kwerr = 'hdrname'
+ kwval = hdrname
+ message = """
+ Multiple Headerlet extensions found with the same name.
+ %d Headerlets with "%s" = %s found in %s.
+ """% (len(hdrlet_ind), kwerr, kwval, fname)
+ if close_fobj:
+ fobj.close()
+ logger.critical(message)
+ raise ValueError
+
+ hdrlet_indx = hdrlet_ind[0]
+
+ # read headerlet from HeaderletHDU into memory
+ if hasattr(fobj[hdrlet_ind[0]], 'hdulist'):
+ hdrlet = fobj[hdrlet_indx].hdulist
+ else:
+ hdrlet = fobj[hdrlet_indx].headerlet # older convention in PyFITS
+
+ # read in the names of the extensions which HeaderletHDU updates
+ extlist = []
+ for ext in hdrlet:
+ if 'extname' in ext.header and ext.header['extname'] == 'SIPWCS':
+ # convert from string to tuple or int
+ sciext = eval(ext.header['sciext'])
+ extlist.append(fobj[sciext])
+ # determine whether distortion is the same
+ current_distname = hdrlet[0].header['distname']
+ same_dist = True
+ if current_distname != fobj[0].header['distname']:
+ same_dist = False
+ if not archive and not force:
+ if close_fobj:
+ fobj.close()
+ message = """
+ Headerlet does not have the same distortion as image!
+ Set "archive"=True to save old distortion model, or
+ set "force"=True to overwrite old model with new.
+ """
+ logger.critical(message)
+ raise ValueError
+
+ # check whether primary WCS has been archived already
+ # Use information from first 'SCI' extension
+ priwcs_name = None
+
+ scihdr = extlist[0].header
+ sci_wcsnames = altwcs.wcsnames(scihdr).values()
+ if 'hdrname' in scihdr:
+ priwcs_hdrname = scihdr['hdrname']
+ else:
+ if 'wcsname' in scihdr:
+ priwcs_hdrname = priwcs_name = scihdr['wcsname']
+ else:
+ if 'idctab' in scihdr:
+ priwcs_hdrname = ''.join(['IDC_',
+ utils.extract_rootname(scihdr['idctab'], suffix='_idc')])
+ else:
+ priwcs_hdrname = 'UNKNOWN'
+ priwcs_name = priwcs_hdrname
+ scihdr['WCSNAME'] = priwcs_name
+
+ priwcs_unique = verify_hdrname_is_unique(fobj, priwcs_hdrname)
+ if archive and priwcs_unique:
+ if priwcs_unique:
+ newhdrlet = create_headerlet(fobj, sciext=scihdr['extname'],
+ hdrname=priwcs_hdrname)
+ newhdrlet.attach_to_file(fobj)
+ #
+ # copy hdrlet as a primary
+ #
+ hdrlet.apply_as_primary(fobj, attach=False, archive=archive, force=force)
+
+ utils.updateNEXTENDKw(fobj)
+ fobj.flush()
+ if close_fobj:
+ fobj.close()
+
+
+@with_logging
+def restore_all_with_distname(filename, distname, primary, archive=True,
+ sciext='SCI', logging=False, logmode='w'):
+ """
+ Restores all HeaderletHDUs with a given distortion model as alternate WCSs and a primary
+
+ Parameters
+ --------------
+ filename: string or HDUList
+ Either a filename or PyFITS HDUList object for the input science file
+ An input filename (str) will be expanded as necessary to interpret
+ any environmental variables included in the filename.
+ distname: string
+ distortion model as represented by a DISTNAME keyword
+ primary: int or string or None
+ HeaderletHDU to be restored as primary
+ if int - a fits extension
+ if string - HDRNAME
+ if None - use first HeaderletHDU
+ archive: boolean (default True)
+ flag indicating if HeaderletHDUs should be created from the
+ primary and alternate WCSs in fname before restoring all matching
+ headerlet extensions
+ logging: boolean
+ enable file logging
+ logmode: 'a' or 'w'
+ """
+
+ fobj, fname, close_fobj = parse_filename(filename, mode='update')
+
+ hdrlet_ind = find_headerlet_HDUs(fobj, distname=distname)
+ if len(hdrlet_ind) == 0:
+ message = """
+ No Headerlet extensions found with
+
+ DISTNAME = %s in %s.
+
+ For a full list of DISTNAMEs found in all headerlet extensions:
+
+ get_headerlet_kw_names(fobj, kw='DISTNAME')
+ """ % (distname, fname)
+ if close_fobj:
+ fobj.close()
+ logger.critical(message)
+ raise ValueError
+
+ # Interpret 'primary' parameter input into extension number
+ if primary is None:
+ primary_ind = hdrlet_ind[0]
+ elif isinstance(primary, int):
+ primary_ind = primary
+ else:
+ primary_ind = None
+ for ind in hdrlet_ind:
+ if fobj[ind].header['hdrname'] == primary:
+ primary_ind = ind
+ break
+ if primary_ind is None:
+ if close_fobj:
+ fobj.close()
+ message = """
+ No Headerlet extensions found with DISTNAME = %s in %s.
+ """ % (primary, fname)
+ logger.critical(message)
+ raise ValueError
+ # Check to see whether 'primary' HeaderletHDU has same distname as user
+ # specified on input
+
+ # read headerlet from HeaderletHDU into memory
+ if hasattr(fobj[primary_ind], 'hdulist'):
+ primary_hdrlet = fobj[primary_ind].hdulist
+ else:
+ primary_hdrlet = fobj[primary_ind].headerlet # older convention in PyFITS
+ pri_distname = primary_hdrlet[0].header['distname']
+ if pri_distname != distname:
+ if close_fobj:
+ fobj.close()
+ message = """
+ Headerlet extension to be used as PRIMARY WCS
+ has "DISTNAME" = %s
+ "DISTNAME" = %s was specified on input.
+ All updated WCSs must have same DISTNAME. Quitting...'
+ """ % (pri_distname, distname)
+ logger.critical(message)
+ raise ValueError
+
+ # read in the names of the WCSs which the HeaderletHDUs will update
+ wnames = altwcs.wcsnames(fobj[sciext, 1].header)
+
+ # work out how many HeaderletHDUs will be used to update the WCSs
+ numhlt = len(hdrlet_ind)
+ hdrnames = get_headerlet_kw_names(fobj, kw='wcsname')
+
+ # read in headerletHDUs and update WCS keywords
+ for hlet in hdrlet_ind:
+ if fobj[hlet].header['distname'] == distname:
+ if hasattr(fobj[hlet], 'hdulist'):
+ hdrlet = fobj[hlet].hdulist
+ else:
+ hdrlet = fobj[hlet].headerlet # older convention in PyFITS
+ if hlet == primary_ind:
+ hdrlet.apply_as_primary(fobj, attach=False,
+ archive=archive, force=True)
+ else:
+ hdrlet.apply_as_alternate(fobj, attach=False,
+ wcsname=hdrlet[0].header['wcsname'])
+
+ utils.updateNEXTENDKw(fobj)
+ fobj.flush()
+ if close_fobj:
+ fobj.close()
+
+
+@with_logging
+def archive_as_headerlet(filename, hdrname, sciext='SCI',
+ wcsname=None, wcskey=None, destim=None,
+ sipname=None, npolfile=None, d2imfile=None,
+ author=None, descrip=None, history=None,
+ nmatch=None, catalog=None,
+ logging=False, logmode='w'):
+ """
+ Save a WCS as a headerlet extension and write it out to a file.
+
+ This function will create a headerlet, attach it as an extension to the
+ science image (if it has not already been archived) then, optionally,
+ write out the headerlet to a separate headerlet file.
+
+ Either wcsname or wcskey must be provided, if both are given, they must match a valid WCS
+ Updates wcscorr if necessary.
+
+ Parameters
+ ----------
+ filename: string or HDUList
+ Either a filename or PyFITS HDUList object for the input science file
+ An input filename (str) will be expanded as necessary to interpret
+ any environmental variables included in the filename.
+ hdrname: string
+ Unique name for this headerlet, stored as HDRNAME keyword
+ sciext: string
+ name (EXTNAME) of extension that contains WCS to be saved
+ wcsname: string
+ name of WCS to be archived, if " ": stop
+ wcskey: one of A...Z or " " or "PRIMARY"
+ if " " or "PRIMARY" - archive the primary WCS
+ destim: string
+ DESTIM keyword
+ if NOne, use ROOTNAME or science file name
+ sipname: string or None (default)
+ Name of unique file where the polynomial distortion coefficients were
+ read from. If None, the behavior is:
+ The code looks for a keyword 'SIPNAME' in the science header
+ If not found, for HST it defaults to 'IDCTAB'
+ If there is no SIP model the value is 'NOMODEL'
+ If there is a SIP model but no SIPNAME, it is set to 'UNKNOWN'
+ npolfile: string or None (default)
+ Name of a unique file where the non-polynomial distortion was stored.
+ If None:
+ The code looks for 'NPOLFILE' in science header.
+ If 'NPOLFILE' was not found and there is no npol model, it is set to 'NOMODEL'
+ If npol model exists, it is set to 'UNKNOWN'
+ d2imfile: string
+ Name of a unique file where the detector to image correction was
+ stored. If None:
+ The code looks for 'D2IMFILE' in the science header.
+ If 'D2IMFILE' is not found and there is no d2im correction,
+ it is set to 'NOMODEL'
+ If d2im correction exists, but 'D2IMFILE' is missing from science
+ header, it is set to 'UNKNOWN'
+ author: string
+ Name of user who created the headerlet, added as 'AUTHOR' keyword
+ to headerlet PRIMARY header
+ descrip: string
+ Short description of the solution provided by the headerlet
+ This description will be added as the single 'DESCRIP' keyword
+ to the headerlet PRIMARY header
+ history: filename, string or list of strings
+ Long (possibly multi-line) description of the solution provided
+ by the headerlet. These comments will be added as 'HISTORY' cards
+ to the headerlet PRIMARY header
+ If filename is specified, it will format and attach all text from
+ that file as the history.
+ logging: boolean
+ enable file folling
+ logmode: 'w' or 'a'
+ log file open mode
+ """
+
+ fobj, fname, close_fobj = parse_filename(filename, mode='update')
+
+ if wcsname in [None, ' ', '', 'INDEF'] and wcskey is None:
+ message = """
+ No valid WCS found found in %s.
+ A valid value for either "wcsname" or "wcskey"
+ needs to be specified.
+ """ % fname
+ if close_fobj:
+ fobj.close()
+ logger.critical(message)
+ raise ValueError
+
+ # Translate 'wcskey' value for PRIMARY WCS to valid altwcs value of ' '
+ if wcskey == 'PRIMARY':
+ wcskey = ' '
+ wcskey = wcskey.upper()
+
+ numhlt = countExtn(fobj, 'HDRLET')
+
+ if wcsname is None:
+ scihdr = fobj[sciext, 1].header
+ wcsname = scihdr['wcsname'+wcskey]
+
+ if hdrname in [None, ' ', '']:
+ hdrname = wcsname
+
+ # Check to see whether or not a HeaderletHDU with this hdrname already
+ # exists
+ hdrnames = get_headerlet_kw_names(fobj)
+ if hdrname not in hdrnames:
+ hdrletobj = create_headerlet(fobj, sciext=sciext,
+ wcsname=wcsname, wcskey=wcskey,
+ hdrname=hdrname,
+ sipname=sipname, npolfile=npolfile,
+ d2imfile=d2imfile, author=author,
+ descrip=descrip, history=history,
+ nmatch=nmatch, catalog=catalog,
+ logging=False)
+ hlt_hdu = HeaderletHDU.fromheaderlet(hdrletobj)
+
+ if destim is not None:
+ hlt_hdu[0].header['destim'] = destim
+
+ fobj.append(hlt_hdu)
+
+ utils.updateNEXTENDKw(fobj)
+ fobj.flush()
+ else:
+ message = """
+ Headerlet with hdrname %s already archived for WCS %s
+ No new headerlet appended to %s .
+ """ % (hdrname, wcsname, fname)
+ logger.critical(message)
+
+ if close_fobj:
+ fobj.close()
+
+#### Headerlet Class definitions
+class Headerlet(fits.HDUList):
+ """
+ A Headerlet class
+ Ref: http://mediawiki.stsci.edu/mediawiki/index.php/Telescopedia:Headerlets
+ """
+
+ def __init__(self, hdus=[], file=None, logging=False, logmode='w'):
+ """
+ Parameters
+ ----------
+ hdus : list
+ List of HDUs to be used to create the headerlet object itself
+ file: string
+ File-like object from which HDUs should be read
+ logging: boolean
+ enable file logging
+ logmode: 'w' or 'a'
+ for internal use only, indicates whether the log file
+ should be open in attach or write mode
+ """
+ self.logging = logging
+ init_logging('class Headerlet', level=logging, mode=logmode)
+
+ super(Headerlet, self).__init__(hdus, file=file)
+
+ def init_attrs(self):
+ self.fname = self.filename()
+ self.hdrname = self[0].header["HDRNAME"]
+ self.wcsname = self[0].header["WCSNAME"]
+ self.upwcsver = self[0].header.get("UPWCSVER", "")
+ self.pywcsver = self[0].header.get("PYWCSVER", "")
+ self.destim = self[0].header["DESTIM"]
+ self.sipname = self[0].header["SIPNAME"]
+ self.idctab = self[0].header["IDCTAB"]
+ self.npolfile = self[0].header["NPOLFILE"]
+ self.d2imfile = self[0].header["D2IMFILE"]
+ self.distname = self[0].header["DISTNAME"]
+
+ try:
+ self.vafactor = self[("SIPWCS", 1)].header.get("VAFACTOR", 1) #None instead of 1?
+ except (IndexError, KeyError):
+ self.vafactor = self[0].header.get("VAFACTOR", 1) #None instead of 1?
+ self.author = self[0].header["AUTHOR"]
+ self.descrip = self[0].header["DESCRIP"]
+
+ self.fit_kws = ['HDRNAME', 'NMATCH', 'CATALOG']
+ self.history = ''
+ # header['HISTORY'] returns an iterable of all HISTORY values
+ if 'HISTORY' in self[0].header:
+ for hist in self[0].header['HISTORY']:
+ self.history += hist + '\n'
+
+ self.d2imerr = 0
+ self.axiscorr = 1
+
+ # Overridden to support the Headerlet logging features
+ @classmethod
+ def fromfile(cls, fileobj, mode='readonly', memmap=False,
+ save_backup=False, logging=False, logmode='w', **kwargs):
+ hlet = super(cls, cls).fromfile(fileobj, mode, memmap, save_backup,
+ **kwargs)
+ if len(hlet) > 0:
+ hlet.init_attrs()
+ hlet.logging = logging
+ init_logging('class Headerlet', level=logging, mode=logmode)
+ return hlet
+
+ @classmethod
+ def fromstring(cls, data, **kwargs):
+ hlet = super(cls, cls).fromstring(data, **kwargs)
+ hlet.logging = logging
+ init_logging('class Headerlet', level=logging, mode=logmode)
+ return hlet
+
+ def apply_as_primary(self, fobj, attach=True, archive=True, force=False):
+ """
+ Copy this headerlet as a primary WCS to fobj
+
+ Parameters
+ ----------
+ fobj: string, HDUList
+ science file to which the headerlet should be applied
+ attach: boolean
+ flag indicating if the headerlet should be attached as a
+ HeaderletHDU to fobj. If True checks that HDRNAME is unique
+ in the fobj and stops if not.
+ archive: boolean (default is True)
+ When the distortion model in the headerlet is the same as the
+ distortion model of the science file, this flag indicates if
+ the primary WCS should be saved as an alternate and a headerlet
+ extension.
+ When the distortion models do not match this flag indicates if
+ the current primary and alternate WCSs should be archived as
+ headerlet extensions and alternate WCS.
+ force: boolean (default is False)
+ When the distortion models of the headerlet and the primary do
+ not match, and archive is False this flag forces an update
+ of the primary
+ """
+ self.hverify()
+ fobj, fname, close_dest = parse_filename(fobj, mode='update')
+ if not self.verify_dest(fobj, fname):
+ if close_dest:
+ fobj.close()
+ raise ValueError("Destination name does not match headerlet"
+ "Observation %s cannot be updated with headerlet %s" % (fname, self.hdrname))
+
+ # Check to see whether the distortion model in the destination
+ # matches the distortion model in the headerlet being applied
+
+ dname = self.get_destination_model(fobj)
+ dist_models_equal = self.equal_distmodel(dname)
+ if not dist_models_equal and not force:
+ raise ValueError("Distortion models do not match"
+ " To overwrite the distortion model, set force=True")
+
+ orig_hlt_hdu = None
+ numhlt = countExtn(fobj, 'HDRLET')
+ hdrlet_extnames = get_headerlet_kw_names(fobj)
+
+ # Insure that WCSCORR table has been created with all original
+ # WCS's recorded prior to adding the headerlet WCS
+ wcscorr.init_wcscorr(fobj)
+
+
+ ### start archive
+ # If archive has been specified
+ # regardless of whether or not the distortion models are equal...
+
+ numsip = countExtn(self, 'SIPWCS')
+ sciext_list = []
+ alt_hlethdu = []
+ for i in range(1, numsip+1):
+ sipheader = self[('SIPWCS', i)].header
+ sciext_list.append((sipheader['TG_ENAME'], sipheader['TG_EVER']))
+ target_ext = sciext_list[0]
+ if archive:
+ if 'wcsname' in fobj[target_ext].header:
+ hdrname = fobj[target_ext].header['WCSNAME']
+ wcsname = hdrname
+ else:
+ hdrname = fobj[0].header['ROOTNAME'] + '_orig'
+ wcsname = None
+ if hdrname not in hdrlet_extnames:
+ # - if WCS has not been saved, write out WCS as headerlet extension
+ # Create a headerlet for the original Primary WCS data in the file,
+ # create an HDU from the original headerlet, and append it to
+ # the file
+ orig_hlt = create_headerlet(fobj, sciext=sciext_list, #[target_ext],
+ wcsname=wcsname,
+ hdrname=hdrname,
+ logging=self.logging)
+ orig_hlt_hdu = HeaderletHDU.fromheaderlet(orig_hlt)
+ numhlt += 1
+ orig_hlt_hdu.header['EXTVER'] = numhlt
+ logger.info("Created headerlet %s to be attached to file" % hdrname)
+ else:
+ logger.info("Headerlet with name %s is already attached" % hdrname)
+
+
+ if dist_models_equal:
+ # Use the WCSNAME to determine whether or not to archive
+ # Primary WCS as altwcs
+ # wcsname = hwcs.wcs.name
+ scihdr = fobj[target_ext].header
+ if 'hdrname' in scihdr:
+ priwcs_name = scihdr['hdrname']
+ else:
+ if 'wcsname' in scihdr:
+ priwcs_name = scihdr['wcsname']
+ else:
+ if 'idctab' in scihdr:
+ priwcs_name = ''.join(['IDC_',
+ utils.extract_rootname(scihdr['idctab'],
+ suffix='_idc')])
+ else:
+ priwcs_name = 'UNKNOWN'
+ nextkey = altwcs.next_wcskey(fobj, ext=target_ext)
+ altwcs.archiveWCS(fobj, ext=sciext_list, wcskey=nextkey,
+ wcsname=priwcs_name)
+ else:
+
+ for hname in altwcs.wcsnames(fobj, ext=target_ext).values():
+ if hname != 'OPUS' and hname not in hdrlet_extnames:
+ # get HeaderletHDU for alternate WCS as well
+ alt_hlet = create_headerlet(fobj, sciext=sciext_list,
+ wcsname=hname, wcskey=wcskey,
+ hdrname=hname, sipname=None,
+ npolfile=None, d2imfile=None,
+ author=None, descrip=None, history=None,
+ logging=self.logging)
+ numhlt += 1
+ alt_hlet_hdu = HeaderletHDU.fromheaderlet(alt_hlet)
+ alt_hlet_hdu.header['EXTVER'] = numhlt
+ alt_hlethdu.append(alt_hlet_hdu)
+ hdrlet_extnames.append(hname)
+
+ self._del_dest_WCS_ext(fobj)
+ for i in range(1, numsip+1):
+ target_ext = sciext_list[i-1]
+ self._del_dest_WCS(fobj, target_ext)
+ sipwcs = HSTWCS(self, ('SIPWCS', i))
+ idckw = sipwcs._idc2hdr()
+ priwcs = sipwcs.to_fits(relax=True)
+ numnpol = 1
+ numd2im = 1
+ if sipwcs.wcs.has_cd():
+ priwcs[0].header = altwcs.pc2cd(priwcs[0].header)
+ priwcs[0].header.extend(idckw)
+ if 'crder1' in sipheader:
+ for card in sipheader['crder*'].cards:
+ priwcs[0].header.set(card.keyword, card.value, card.comment,
+ after='WCSNAME')
+ # Update WCS with HDRNAME as well
+
+ for kw in ['SIMPLE', 'BITPIX', 'NAXIS', 'EXTEND']:
+ try:
+ priwcs[0].header.remove(kw)
+ except ValueError:
+ pass
+
+ priwcs[0].header.set('WCSNAME', self[0].header['WCSNAME'], "")
+ priwcs[0].header.set('WCSAXES', self[('SIPWCS', i)].header['WCSAXES'], "")
+ priwcs[0].header.set('HDRNAME', self[0].header['HDRNAME'], "")
+ if sipwcs.det2im1 or sipwcs.det2im2:
+ try:
+ d2imerr = self[('SIPWCS', i)].header['D2IMERR*']
+ priwcs[0].header.extend(d2imerr)
+ except KeyError:
+ pass
+ try:
+ priwcs[0].header.append(self[('SIPWCS', i)].header.cards['D2IMEXT'])
+ except KeyError:
+ pass
+ if 'D2IM1.EXTVER' in priwcs[0].header:
+ priwcs[0].header['D2IM1.EXTVER'] = self[('SIPWCS', i)].header['D2IM1.EXTVER']
+ priwcs[('D2IMARR', 1)].header['EXTVER'] = self[('SIPWCS', i)].header['D2IM1.EXTVER']
+ if 'D2IM2.EXTVER' in priwcs[0].header:
+ priwcs[0].header['D2IM2.EXTVER'] = self[('SIPWCS', i)].header['D2IM2.EXTVER']
+ priwcs[('D2IMARR', 2)].header['EXTVER'] = self[('SIPWCS', i)].header['D2IM2.EXTVER']
+ # D2IM1 will NOT exist for WFPC2 data...
+ if 'D2IM1.EXTVER' in priwcs[0].header:
+ # only set number of D2IM extensions to 2 if D2IM1 exists
+ numd2im = 2
+
+ if sipwcs.cpdis1 or sipwcs.cpdis2:
+ try:
+ cperr = self[('SIPWCS', i)].header['CPERR*']
+ priwcs[0].header.extend(cperr)
+ except KeyError:
+ pass
+ try:
+ priwcs[0].header.append(self[('SIPWCS', i)].header.cards['NPOLEXT'])
+ except KeyError:
+ pass
+ if 'DP1.EXTVER' in priwcs[0].header:
+ priwcs[0].header['DP1.EXTVER'] = self[('SIPWCS', i)].header['DP1.EXTVER']
+ priwcs[('WCSDVARR', 1)].header['EXTVER'] = self[('SIPWCS', i)].header['DP1.EXTVER']
+ if 'DP2.EXTVER' in priwcs[0].header:
+ priwcs[0].header['DP2.EXTVER'] = self[('SIPWCS', i)].header['DP2.EXTVER']
+ priwcs[('WCSDVARR', 2)].header['EXTVER'] = self[('SIPWCS', i)].header['DP2.EXTVER']
+ numnpol = 2
+
+ fobj[target_ext].header.extend(priwcs[0].header)
+ if sipwcs.cpdis1:
+ whdu = priwcs[('WCSDVARR', (i-1)*numnpol+1)].copy()
+ whdu.update_ext_version(self[('SIPWCS', i)].header['DP1.EXTVER'])
+ fobj.append(whdu)
+ if sipwcs.cpdis2:
+ whdu = priwcs[('WCSDVARR', i*numnpol)].copy()
+ whdu.update_ext_version(self[('SIPWCS', i)].header['DP2.EXTVER'])
+ fobj.append(whdu)
+ if sipwcs.det2im1: #or sipwcs.det2im2:
+ whdu = priwcs[('D2IMARR', (i-1)*numd2im+1)].copy()
+ whdu.update_ext_version(self[('SIPWCS', i)].header['D2IM1.EXTVER'])
+ fobj.append(whdu)
+ if sipwcs.det2im2:
+ whdu = priwcs[('D2IMARR', i*numd2im)].copy()
+ whdu.update_ext_version(self[('SIPWCS', i)].header['D2IM2.EXTVER'])
+ fobj.append(whdu)
+
+ update_versions(self[0].header, fobj[0].header)
+ refs = update_ref_files(self[0].header, fobj[0].header)
+ # Update the WCSCORR table with new rows from the headerlet's WCSs
+ wcscorr.update_wcscorr(fobj, self, 'SIPWCS')
+
+ # Append the original headerlet
+ if archive and orig_hlt_hdu:
+ fobj.append(orig_hlt_hdu)
+ # Append any alternate WCS Headerlets
+ if len(alt_hlethdu) > 0:
+ for ahdu in alt_hlethdu:
+ fobj.append(ahdu)
+ if attach:
+ # Finally, append an HDU for this headerlet
+ self.attach_to_file(fobj)
+ utils.updateNEXTENDKw(fobj)
+ if close_dest:
+ fobj.close()
+
+
+ def apply_as_alternate(self, fobj, attach=True, wcskey=None, wcsname=None):
+ """
+ Copy this headerlet as an alternate WCS to fobj
+
+ Parameters
+ ----------
+ fobj: string, HDUList
+ science file/HDUList to which the headerlet should be applied
+ attach: boolean
+ flag indicating if the headerlet should be attached as a
+ HeaderletHDU to fobj. If True checks that HDRNAME is unique
+ in the fobj and stops if not.
+ wcskey: string
+ Key value (A-Z, except O) for this alternate WCS
+ If None, the next available key will be used
+ wcsname: string
+ Name to be assigned to this alternate WCS
+ WCSNAME is a required keyword in a Headerlet but this allows the
+ user to change it as desired.
+
+ """
+ self.hverify()
+ fobj, fname, close_dest = parse_filename(fobj, mode='update')
+ if not self.verify_dest(fobj, fname):
+ if close_dest:
+ fobj.close()
+ raise ValueError("Destination name does not match headerlet"
+ "Observation %s cannot be updated with headerlet %s" % (fname, self.hdrname))
+
+ # Verify whether this headerlet has the same distortion
+ #found in the image being updated
+ dname = self.get_destination_model(fobj)
+ dist_models_equal = self.equal_distmodel(dname)
+ if not dist_models_equal:
+ raise ValueError("Distortion models do not match \n"
+ "Headerlet: %s \n"
+ "Destination file: %s\n"
+ "attach_to_file() can be used to append this headerlet" %(self.distname, dname))
+
+ # Insure that WCSCORR table has been created with all original
+ # WCS's recorded prior to adding the headerlet WCS
+ wcscorr.init_wcscorr(fobj)
+
+ # determine value of WCSNAME to be used
+ if wcsname is not None:
+ wname = wcsname
+ else:
+ wname = self[0].header['WCSNAME']
+ tg_ename = self[('SIPWCS', 1)].header['TG_ENAME']
+ tg_ever = self[('SIPWCS', 1)].header['TG_EVER']
+ # determine what alternate WCS this headerlet will be assigned to
+ if wcskey is None:
+ wkey = altwcs.next_wcskey(fobj[(tg_ename, tg_ever)].header)
+ else:
+ wcskey = wcskey.upper()
+ available_keys = altwcs.available_wcskeys(fobj[(tg_ename, tg_ever)].header)
+ if wcskey in available_keys:
+ wkey = wcskey
+ else:
+ mess = "Observation %s already contains alternate WCS with key %s" % (fname, wcskey)
+ logger.critical(mess)
+ if close_dest:
+ fobj.close()
+ raise ValueError(mess)
+ numsip = countExtn(self, 'SIPWCS')
+ for idx in range(1, numsip + 1):
+ siphdr = self[('SIPWCS', idx)].header
+ tg_ext = (siphdr['TG_ENAME'], siphdr['TG_EVER'])
+
+ fhdr = fobj[tg_ext].header
+ hwcs = pywcs.WCS(siphdr, self)
+ hwcs_header = hwcs.to_header(key=wkey)
+ _idc2hdr(siphdr, fhdr, towkey=wkey)
+ if hwcs.wcs.has_cd():
+ hwcs_header = altwcs.pc2cd(hwcs_header, key=wkey)
+
+ fhdr.extend(hwcs_header)
+ fhdr['WCSNAME' + wkey] = wname
+ # also update with HDRNAME (a non-WCS-standard kw)
+ for kw in self.fit_kws:
+ #fhdr.insert(wind, pyfits.Card(kw + wkey,
+ # self[0].header[kw]))
+ fhdr.append(fits.Card(kw + wkey, self[0].header[kw]))
+ # Update the WCSCORR table with new rows from the headerlet's WCSs
+ wcscorr.update_wcscorr(fobj, self, 'SIPWCS')
+
+ if attach:
+ self.attach_to_file(fobj)
+ utils.updateNEXTENDKw(fobj)
+
+ if close_dest:
+ fobj.close()
+
+ def attach_to_file(self, fobj, archive=False):
+ """
+ Attach Headerlet as an HeaderletHDU to a science file
+
+ Parameters
+ ----------
+ fobj: string, HDUList
+ science file/HDUList to which the headerlet should be applied
+ archive: string
+ Specifies whether or not to update WCSCORR table when attaching
+
+ Notes
+ -----
+ The algorithm used by this method:
+ - verify headerlet can be applied to this file (based on DESTIM)
+ - verify that HDRNAME is unique for this file
+ - attach as HeaderletHDU to fobj
+
+ """
+ self.hverify()
+ fobj, fname, close_dest = parse_filename(fobj, mode='update')
+ destver = self.verify_dest(fobj, fname)
+ hdrver = self.verify_hdrname(fobj)
+ if destver and hdrver:
+
+ numhlt = countExtn(fobj, 'HDRLET')
+ new_hlt = HeaderletHDU.fromheaderlet(self)
+ new_hlt.header['extver'] = numhlt + 1
+ fobj.append(new_hlt)
+ utils.updateNEXTENDKw(fobj)
+ else:
+ message = "Headerlet %s cannot be attached to" % (self.hdrname)
+ message += "observation %s" % (fname)
+ if not destver:
+ message += " * Image %s keyword ROOTNAME not equal to " % (fname)
+ message += " DESTIM = '%s'\n" % (self.destim)
+ if not hdrver:
+ message += " * Image %s already has headerlet " % (fname)
+ message += "with HDRNAME='%s'\n" % (self.hdrname)
+ logger.critical(message)
+ if close_dest:
+ fobj.close()
+
+ def info(self, columns=None, pad=2, maxwidth=None,
+ output=None, clobber=True, quiet=False):
+ """
+ Prints a summary of this headerlet
+ The summary includes:
+ HDRNAME WCSNAME DISTNAME SIPNAME NPOLFILE D2IMFILE
+
+ Parameters
+ ----------
+ columns: list
+ List of headerlet PRIMARY header keywords to report in summary
+ By default (set to None), it will use the default set of keywords
+ defined as the global list DEFAULT_SUMMARY_COLS
+ pad: int
+ Number of padding spaces to put between printed columns
+ [Default: 2]
+ maxwidth: int
+ Maximum column width(not counting padding) for any column in summary
+ By default (set to None), each column's full width will be used
+ output: string (optional)
+ Name of optional output file to record summary. This filename
+ can contain environment variables.
+ [Default: None]
+ clobber: bool
+ If True, will overwrite any previous output file of same name
+ quiet: bool
+ If True, will NOT report info to STDOUT
+
+ """
+ summary_cols, summary_dict = self.summary(columns=columns)
+ print_summary(summary_cols, summary_dict, pad=pad, maxwidth=maxwidth,
+ idcol=None, output=output, clobber=clobber, quiet=quiet)
+
+ def summary(self, columns=None):
+ """
+ Returns a summary of this headerlet as a dictionary
+
+ The summary includes a summary of the distortion model as :
+ HDRNAME WCSNAME DISTNAME SIPNAME NPOLFILE D2IMFILE
+
+ Parameters
+ ----------
+ columns: list
+ List of headerlet PRIMARY header keywords to report in summary
+ By default(set to None), it will use the default set of keywords
+ defined as the global list DEFAULT_SUMMARY_COLS
+
+ Returns
+ -------
+ summary: dict
+ Dictionary of values for summary
+ """
+ if columns is None:
+ summary_cols = DEFAULT_SUMMARY_COLS
+ else:
+ summary_cols = columns
+
+ # Initialize summary dict based on requested columns
+ summary = {}
+ for kw in summary_cols:
+ summary[kw] = copy.deepcopy(COLUMN_DICT)
+
+ # Populate the summary with headerlet values
+ for kw in summary_cols:
+ if kw in self[0].header:
+ val = self[0].header[kw]
+ else:
+ val = 'INDEF'
+ summary[kw]['vals'].append(val)
+ summary[kw]['width'].append(max(len(val), len(kw)))
+
+ return summary_cols, summary
+
+ def hverify(self):
+ """
+ Verify the headerlet file is a valid fits file and has
+ the required Primary Header keywords
+ """
+ self.verify()
+ header = self[0].header
+ assert('DESTIM' in header and header['DESTIM'].strip())
+ assert('HDRNAME' in header and header['HDRNAME'].strip())
+ assert('UPWCSVER' in header)
+
+ def verify_hdrname(self, dest):
+ """
+ Verifies that the headerlet can be applied to the observation
+
+ Reports whether or not this file already has a headerlet with this
+ HDRNAME.
+ """
+ unique = verify_hdrname_is_unique(dest, self.hdrname)
+ logger.debug("verify_hdrname() returned %s"%unique)
+ return unique
+
+ def get_destination_model(self, dest):
+ """
+ Verifies that the headerlet can be applied to the observation
+
+ Determines whether or not the file specifies the same distortion
+ model/reference files.
+ """
+ destim_opened = False
+ if not isinstance(dest, fits.HDUList):
+ destim = fits.open(dest)
+ destim_opened = True
+ else:
+ destim = dest
+ dname = destim[0].header['DISTNAME'] if 'distname' in destim[0].header \
+ else self.build_distname(dest)
+ if destim_opened:
+ destim.close()
+ return dname
+
+ def equal_distmodel(self, dmodel):
+ if dmodel != self[0].header['DISTNAME']:
+ if self.logging:
+ message = """
+ Distortion model in headerlet not the same as destination model
+ Headerlet model : %s
+ Destination model: %s
+ """ % (self[0].header['DISTNAME'], dmodel)
+ logger.critical(message)
+ return False
+ else:
+ return True
+
+ def verify_dest(self, dest, fname):
+ """
+ verifies that the headerlet can be applied to the observation
+
+ DESTIM in the primary header of the headerlet must match ROOTNAME
+ of the science file (or the name of the destination file)
+ """
+ try:
+ if not isinstance(dest, fits.HDUList):
+ droot = fits.getval(dest, 'ROOTNAME')
+ else:
+ droot = dest[0].header['ROOTNAME']
+ except KeyError:
+ logger.debug("Keyword 'ROOTNAME' not found in destination file")
+ droot = dest.split('.fits')[0]
+ if droot == self.destim:
+ logger.debug("verify_destim() returned True")
+ return True
+ else:
+ logger.debug("verify_destim() returned False")
+ logger.critical("Destination name does not match headerlet. "
+ "Observation %s cannot be updated with headerlet %s" % (fname, self.hdrname))
+ return False
+
+ def build_distname(self, dest):
+ """
+ Builds the DISTNAME for dest based on reference file names.
+ """
+
+ try:
+ npolfile = dest[0].header['NPOLFILE']
+ except KeyError:
+ npolfile = None
+ try:
+ d2imfile = dest[0].header['D2IMFILE']
+ except KeyError:
+ d2imfile = None
+
+ sipname, idctab = utils.build_sipname(dest, dest, None)
+ npolname, npolfile = utils.build_npolname(dest, npolfile)
+ d2imname, d2imfile = utils.build_d2imname(dest, d2imfile)
+ dname = utils.build_distname(sipname,npolname,d2imname)
+ return dname
+
+ def tofile(self, fname, destim=None, hdrname=None, clobber=False):
+ """
+ Write this headerlet to a file
+
+ Parameters
+ ----------
+ fname: string
+ file name
+ destim: string (optional)
+ provide a value for DESTIM keyword
+ hdrname: string (optional)
+ provide a value for HDRNAME keyword
+ clobber: boolean
+ a flag which allows to overwrte an existing file
+ """
+ if not destim or not hdrname:
+ self.hverify()
+ self.writeto(fname, clobber=clobber)
+
+ def _del_dest_WCS(self, dest, ext=None):
+ """
+ Delete the WCS of a science file extension
+ """
+
+ logger.info("Deleting all WCSs of file %s" % dest.filename())
+ numext = len(dest)
+
+ if ext:
+ fext = dest[ext]
+ self._remove_d2im(fext)
+ self._remove_sip(fext)
+ self._remove_lut(fext)
+ self._remove_primary_WCS(fext)
+ self._remove_idc_coeffs(fext)
+ self._remove_fit_values(fext)
+ else:
+ for idx in range(numext):
+ # Only delete WCS from extensions which may have WCS keywords
+ if ('XTENSION' in dest[idx].header and
+ dest[idx].header['XTENSION'] == 'IMAGE'):
+ self._remove_d2im(dest[idx])
+ self._remove_sip(dest[idx])
+ self._remove_lut(dest[idx])
+ self._remove_primary_WCS(dest[idx])
+ self._remove_idc_coeffs(dest[idx])
+ self._remove_fit_values(dest[idx])
+ self._remove_ref_files(dest[0])
+ """
+ if not ext:
+ self._remove_alt_WCS(dest, ext=range(numext))
+ else:
+ self._remove_alt_WCS(dest, ext=ext)
+ """
+ def _del_dest_WCS_ext(self, dest):
+ numwdvarr = countExtn(dest, 'WCSDVARR')
+ numd2im = countExtn(dest, 'D2IMARR')
+ if numwdvarr > 0:
+ for idx in range(1, numwdvarr + 1):
+ del dest[('WCSDVARR', idx)]
+ if numd2im > 0:
+ for idx in range(1, numd2im + 1):
+ del dest[('D2IMARR', idx)]
+
+ def _remove_ref_files(self, phdu):
+ """
+ phdu: Primary HDU
+ """
+ refkw = ['IDCTAB', 'NPOLFILE', 'D2IMFILE', 'SIPNAME', 'DISTNAME']
+ for kw in refkw:
+ try:
+ del phdu.header[kw]
+ except KeyError:
+ pass
+
+ def _remove_fit_values(self, ext):
+ """
+ Remove the any existing astrometric fit values from a FITS extension
+ """
+
+ logger.debug("Removing astrometric fit values from (%s, %s)"%
+ (ext.name, ext.ver))
+ dkeys = altwcs.wcskeys(ext.header)
+ if 'O' in dkeys: dkeys.remove('O') # Do not remove wcskey='O' values
+ for fitkw in ['NMATCH', 'CATALOG']:
+ for k in dkeys:
+ fkw = (fitkw+k).rstrip()
+ if fkw in ext.header:
+ del ext.header[fkw]
+
+ def _remove_sip(self, ext):
+ """
+ Remove the SIP distortion of a FITS extension
+ """
+
+ logger.debug("Removing SIP distortion from (%s, %s)"
+ % (ext.name, ext.ver))
+ for prefix in ['A', 'B', 'AP', 'BP']:
+ try:
+ order = ext.header[prefix + '_ORDER']
+ del ext.header[prefix + '_ORDER']
+ except KeyError:
+ continue
+ for i in range(order + 1):
+ for j in range(order + 1):
+ key = prefix + '_%d_%d' % (i, j)
+ try:
+ del ext.header[key]
+ except KeyError:
+ pass
+ try:
+ del ext.header['IDCTAB']
+ except KeyError:
+ pass
+
+ def _remove_lut(self, ext):
+ """
+ Remove the Lookup Table distortion of a FITS extension
+ """
+
+ logger.debug("Removing LUT distortion from (%s, %s)"
+ % (ext.name, ext.ver))
+ try:
+ cpdis = ext.header['CPDIS*']
+ except KeyError:
+ return
+ try:
+ for c in range(1, len(cpdis) + 1):
+ del ext.header['DP%s*...' % c]
+ del ext.header[cpdis.cards[c - 1].keyword]
+ del ext.header['CPERR*']
+ del ext.header['NPOLFILE']
+ del ext.header['NPOLEXT']
+ except KeyError:
+ pass
+
+ def _remove_d2im(self, ext):
+ """
+ Remove the Detector to Image correction of a FITS extension
+ """
+
+ logger.debug("Removing D2IM correction from (%s, %s)"
+ % (ext.name, ext.ver))
+ try:
+ d2imdis = ext.header['D2IMDIS*']
+ except KeyError:
+ return
+ try:
+ for c in range(1, len(d2imdis) + 1):
+ del ext.header['D2IM%s*...' % c]
+ del ext.header[d2imdis.cards[c - 1].keyword]
+ del ext.header['D2IMERR*']
+ del ext.header['D2IMFILE']
+ del ext.header['D2IMEXT']
+ except KeyError:
+ pass
+
+ def _remove_alt_WCS(self, dest, ext):
+ """
+ Remove Alternate WCSs of a FITS extension.
+ A WCS with wcskey 'O' is never deleted.
+ """
+ dkeys = altwcs.wcskeys(dest[('SCI', 1)].header)
+ for val in ['O', '', ' ']:
+ if val in dkeys:
+ dkeys.remove(val) # Never delete WCS with wcskey='O'
+
+ logger.debug("Removing alternate WCSs with keys %s from %s"
+ % (dkeys, dest.filename()))
+ for k in dkeys:
+ altwcs.deleteWCS(dest, ext=ext, wcskey=k)
+
+ def _remove_primary_WCS(self, ext):
+ """
+ Remove the primary WCS of a FITS extension
+ """
+
+ logger.debug("Removing Primary WCS from (%s, %s)"
+ % (ext.name, ext.ver))
+ naxis = ext.header['NAXIS']
+ for key in basic_wcs:
+ for i in range(1, naxis + 1):
+ try:
+ del ext.header[key + str(i)]
+ except KeyError:
+ pass
+ try:
+ del ext.header['WCSAXES']
+ except KeyError:
+ pass
+ try:
+ del ext.header['WCSNAME']
+ except KeyError:
+ pass
+
+ def _remove_idc_coeffs(self, ext):
+ """
+ Remove IDC coefficients of a FITS extension
+ """
+
+ logger.debug("Removing IDC coefficient from (%s, %s)"
+ % (ext.name, ext.ver))
+ coeffs = ['OCX10', 'OCX11', 'OCY10', 'OCY11', 'IDCSCALE']
+ for k in coeffs:
+ try:
+ del ext.header[k]
+ except KeyError:
+ pass
+
+@with_logging
+def _idc2hdr(fromhdr, tohdr, towkey=' '):
+ """
+ Copy the IDC (HST specific) keywords from one header to another
+
+ """
+ # save some of the idc coefficients
+ coeffs = ['OCX10', 'OCX11', 'OCY10', 'OCY11', 'IDCSCALE']
+ for c in coeffs:
+ try:
+ tohdr[c+towkey] = fromhdr[c]
+ logger.debug("Copied %s to header")
+ except KeyError:
+ continue
+
+
+def get_extname_extver_list(fobj, sciext):
+ """
+ Create a list of (EXTNAME, EXTVER) tuples
+
+ Based on sciext keyword (see docstring for create_headerlet)
+ walk throughh the file and convert extensions in `sciext` to
+ valid (EXTNAME, EXTVER) tuples.
+ """
+ extlist = []
+ if isinstance(sciext, int):
+ if sciext == 0:
+ extname = 'PRIMARY'
+ extver = 1
+ else:
+ try:
+ extname = fobj[sciext].header['EXTNAME']
+ except KeyError:
+ extname = ""
+ try:
+ extver = fobj[sciext].header['EXTVER']
+ except KeyError:
+ extver = 1
+ extlist.append((extname, extver))
+ elif isinstance(sciext, str):
+ if sciext == 'PRIMARY':
+ extname = "PRIMARY"
+ extver = 1
+ extlist.append((extname, extver))
+ else:
+ for ext in fobj:
+ try:
+ extname = ext.header['EXTNAME']
+ except KeyError:
+ continue
+ if extname.upper() == sciext.upper():
+ try:
+ extver = ext.header['EXTVER']
+ except KeyError:
+ extver = 1
+ extlist.append((extname, extver))
+ elif isinstance(sciext, list):
+ if isinstance(sciext[0], int):
+ for i in sciext:
+ try:
+ extname = fobj[i].header['EXTNAME']
+ except KeyError:
+ if i == 0:
+ extname = "PRIMARY"
+ extver = 1
+ else:
+ extname = ""
+ try:
+ extver = fobj[i].header['EXTVER']
+ except KeyError:
+ extver = 1
+ extlist.append((extname, extver))
+ else:
+ extlist = sciext[:]
+ else:
+ errstr = "Expected sciext to be a list of FITS extensions with science data\n"+\
+ " a valid EXTNAME string, or an integer."
+ logger.critical(errstr)
+ raise ValueError
+ return extlist
+
+
+class HeaderletHDU(fits.hdu.nonstandard.FitsHDU):
+ """
+ A non-standard extension HDU for encapsulating Headerlets in a file. These
+ HDUs have an extension type of HDRLET and their EXTNAME is derived from the
+ Headerlet's HDRNAME.
+
+ The data itself is a FITS file embedded within the HDU data. The file name
+ is derived from the HDRNAME keyword, and should be in the form
+ `<HDRNAME>_hdr.fits`. If the COMPRESS keyword evaluates to `True`, the tar
+ file is compressed with gzip compression.
+
+ The structure of this HDU is the same as that proposed for the 'FITS'
+ extension type proposed here:
+ http://listmgr.cv.nrao.edu/pipermail/fitsbits/2002-April/thread.html
+
+ The Headerlet contained in the HDU's data can be accessed by the
+ `headerlet` attribute.
+ """
+
+ _extension = 'HDRLET'
+
+ @lazyproperty
+ def headerlet(self):
+ """Return the encapsulated headerlet as a Headerlet object.
+
+ This is similar to the hdulist property inherited from the FitsHDU
+ class, though the hdulist property returns a normal HDUList object.
+ """
+
+ return Headerlet(self.hdulist)
+
+ @classmethod
+ def fromheaderlet(cls, headerlet, compress=False):
+ """
+ Creates a new HeaderletHDU from a given Headerlet object.
+
+ Parameters
+ ----------
+ headerlet : `Headerlet`
+ A valid Headerlet object.
+
+ compress : bool, optional
+ Gzip compress the headerlet data.
+
+ Returns
+ -------
+ hlet : `HeaderletHDU`
+ A `HeaderletHDU` object for the given `Headerlet` that can be
+ attached as an extension to an existing `HDUList`.
+ """
+
+ # TODO: Perhaps check that the given object is in fact a valid
+ # Headerlet
+ hlet = cls.fromhdulist(headerlet, compress)
+
+ # Add some more headerlet-specific keywords to the header
+ phdu = headerlet[0]
+
+ if 'SIPNAME' in phdu.header:
+ sipname = phdu.header['SIPNAME']
+ else:
+ sipname = phdu.header['WCSNAME']
+
+ hlet.header['HDRNAME'] = (phdu.header['HDRNAME'],
+ phdu.header.comments['HDRNAME'])
+ hlet.header['DATE'] = (phdu.header['DATE'],
+ phdu.header.comments['DATE'])
+ hlet.header['SIPNAME'] = (sipname, 'SIP distortion model name')
+ hlet.header['WCSNAME'] = (phdu.header['WCSNAME'], 'WCS name')
+ hlet.header['DISTNAME'] = (phdu.header['DISTNAME'],
+ 'Distortion model name')
+ hlet.header['NPOLFILE'] = (phdu.header['NPOLFILE'],
+ phdu.header.comments['NPOLFILE'])
+ hlet.header['D2IMFILE'] = (phdu.header['D2IMFILE'],
+ phdu.header.comments['D2IMFILE'])
+ hlet.header['EXTNAME'] = (cls._extension, 'Extension name')
+
+ return hlet
+
+
+fits.register_hdu(HeaderletHDU)