path: root/lib/stwcs/wcsutil/headerlet.py
Diffstat (limited to 'lib/stwcs/wcsutil/headerlet.py')
-rw-r--r--   lib/stwcs/wcsutil/headerlet.py   2754
1 files changed, 0 insertions, 2754 deletions
diff --git a/lib/stwcs/wcsutil/headerlet.py b/lib/stwcs/wcsutil/headerlet.py
deleted file mode 100644
index c0dd9b0..0000000
--- a/lib/stwcs/wcsutil/headerlet.py
+++ /dev/null
@@ -1,2754 +0,0 @@
-"""
-This module implements headerlets.
-
-A headerlet serves as a mechanism for encapsulating WCS information
-which can be used to update the WCS solution of an image. The idea
-arose from the desire to pass improved astrometric
-solutions for HST data and to provide those solutions in a manner
-that does not require retrieving entirely new images from the archive
-when only the WCS information has been updated.
-
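-Examples
----------
-A typical workflow, sketched with hypothetical file and headerlet names
-(an illustration of the public interface below, not an executable test)::
-
-    from stwcs.wcsutil import headerlet
-
-    # Save the current primary WCS of an exposure as a headerlet file and,
-    # by default, attach it to the exposure as a HeaderletHDU extension.
-    headerlet.write_headerlet('j94f05bgq_flt.fits', 'TWEAK_1',
-                              wcskey='PRIMARY',
-                              output='j94f05bgq_hlet.fits')
-
-    # Later, restore that solution as the primary WCS of the exposure.
-    headerlet.apply_headerlet_as_primary('j94f05bgq_flt.fits',
-                                         'j94f05bgq_hlet.fits')
-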
-"""
-
-from __future__ import absolute_import, division, print_function
-import os
-import sys
-import functools
-import logging
-import textwrap
-import copy
-import time
-
-import numpy as np
-from astropy.io import fits
-#import pywcs
-from astropy import wcs as pywcs
-from astropy.utils import lazyproperty
-
-from stsci.tools.fileutil import countExtn
-from stsci.tools import fileutil as fu
-from stsci.tools import parseinput
-
-from stwcs.updatewcs import utils
-from . import altwcs
-from . import wcscorr
-from .hstwcs import HSTWCS
-from .mappings import basic_wcs
-
-#### Logging support functions
-class FuncNameLoggingFormatter(logging.Formatter):
- def __init__(self, fmt=None, datefmt=None):
-        if fmt is None:
-            # Guard against the default fmt=None, which would otherwise
-            # raise a TypeError in the membership test below.
-            fmt = '%(levelname)s: %(message)s'
-        if '%(funcName)s' not in fmt:
- fmt = '%(funcName)s' + fmt
- logging.Formatter.__init__(self, fmt=fmt, datefmt=datefmt)
-
- def format(self, record):
- record = copy.copy(record)
- if hasattr(record, 'funcName') and record.funcName == 'init_logging':
- record.funcName = ''
- else:
- record.funcName += ' '
- return logging.Formatter.format(self, record)
-
-
-logger = logging.getLogger(__name__)
-formatter = FuncNameLoggingFormatter("%(levelname)s: %(message)s")
-ch = logging.StreamHandler()
-ch.setFormatter(formatter)
-ch.setLevel(logging.CRITICAL)
-logger.addHandler(ch)
-logger.setLevel(logging.DEBUG)
-
-FITS_STD_KW = ['XTENSION', 'BITPIX', 'NAXIS', 'PCOUNT',
- 'GCOUNT', 'EXTNAME', 'EXTVER', 'ORIGIN',
- 'INHERIT', 'DATE', 'IRAF-TLM']
-
-DEFAULT_SUMMARY_COLS = ['HDRNAME', 'WCSNAME', 'DISTNAME', 'AUTHOR', 'DATE',
- 'SIPNAME', 'NPOLFILE', 'D2IMFILE', 'DESCRIP']
-COLUMN_DICT = {'vals': [], 'width': []}
-COLUMN_FMT = '{:<{width}}'
-
-
-def init_logging(funcname=None, level=100, mode='w', **kwargs):
- """
-
- Initialize logging for a function
-
- Parameters
- ----------
- funcname: string
- Name of function which will be recorded in log
-    level: int, str or bool
-          int : numeric logging level
-          str : text logging level for the message ("DEBUG", "INFO",
-          "WARNING", "ERROR", "CRITICAL")
-          bool : False switches off logging
-    mode: 'w' or 'a'
-          append to an existing logfile ('a') or start a new logfile ('w')
-
- """
-    # Look for an existing FileHandler; initialize the flag first so it is
-    # defined even when the logger has no handlers, and do not reset it on
-    # later iterations.
-    has_file_handler = False
-    for hndl in logger.handlers:
-        if isinstance(hndl, logging.FileHandler):
-            has_file_handler = True
-            break
- if level:
- if not has_file_handler:
- logname = 'headerlet.log'
- fh = logging.FileHandler(logname, mode=mode)
- fh.setFormatter(formatter)
- fh.setLevel(logging.DEBUG)
- logger.addHandler(fh)
- logger.info("%s: Starting %s with arguments:\n\t %s" %
- (time.asctime(), funcname, kwargs))
-
-
-def with_logging(func):
- @functools.wraps(func)
- def wrapped(*args, **kw):
- level = kw.get('logging', 100)
- mode = kw.get('logmode', 'w')
- func_args = kw.copy()
- if sys.version_info[0] >= 3:
- argnames = func.__code__.co_varnames
- else:
- argnames = func.func_code.co_varnames
-
- for argname, arg in zip(argnames, args):
- func_args[argname] = arg
-
- init_logging(func.__name__, level, mode, **func_args)
- return func(*args, **kw)
- return wrapped
-
-#### Utility functions
-def is_par_blank(par):
- return par in ['', ' ', 'INDEF', "None", None]
-
-def parse_filename(fname, mode='readonly'):
- """
- Interprets the input as either a filename of a file that needs to be opened
- or a PyFITS object.
-
- Parameters
- ----------
- fname : str, `astropy.io.fits.HDUList`
- Input pointing to a file or `astropy.io.fits.HDUList` object.
- An input filename (str) will be expanded as necessary to
- interpret any environmental variables
- included in the filename.
-
- mode : string
- Specifies what mode to use when opening the file, if it needs
- to open the file at all [Default: 'readonly']
-
- Returns
- -------
- fobj : `astropy.io.fits.HDUList`
- FITS file handle for input
-
- fname : str
- Name of input file
-
-    close_fobj : bool
-        Flag specifying whether or not fobj needs to be closed since it was
-        opened by this function. This allows the calling program to know
-        whether it needs to close the FITS object itself or can leave that
-        to the higher-level interface.
-
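-    Examples
-    --------
-    A minimal sketch of the calling pattern; the filename is hypothetical::
-
-        fobj, fname, close_fobj = parse_filename('j94f05bgq_flt.fits')
-        try:
-            pass  # ... work with fobj ...
-        finally:
-            if close_fobj:
-                fobj.close()
-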
- """
- close_fobj = False
- if not isinstance(fname, list):
- if sys.version_info[0] >= 3:
- is_string = isinstance(fname, str)
- else:
- is_string = isinstance(fname, basestring)
- if is_string:
- fname = fu.osfn(fname)
- fobj = fits.open(fname, mode=mode)
- close_fobj = True
- else:
- fobj = fname
- if hasattr(fobj, 'filename'):
- fname = fobj.filename()
- else:
- fname = ''
- return fobj, fname, close_fobj
-
-def get_headerlet_kw_names(fobj, kw='HDRNAME'):
- """
- Returns a list of specified keywords from all HeaderletHDU
- extensions in a science file.
-
- Parameters
- ----------
- fobj : str, `astropy.io.fits.HDUList`
- kw : str
- Name of keyword to be read and reported
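-
-    Examples
-    --------
-    An illustrative call; the filename is hypothetical::
-
-        wcsnames = get_headerlet_kw_names('j94f05bgq_flt.fits', kw='WCSNAME')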
- """
-
- fobj, fname, open_fobj = parse_filename(fobj)
-
- hdrnames = []
- for ext in fobj:
- if isinstance(ext, fits.hdu.base.NonstandardExtHDU):
- hdrnames.append(ext.header[kw])
-
- if open_fobj:
- fobj.close()
-
- return hdrnames
-
-def get_header_kw_vals(hdr, kwname, kwval, default=0):
- if kwval is None:
- if kwname in hdr:
- kwval = hdr[kwname]
- else:
- kwval = default
- return kwval
-
-@with_logging
-def find_headerlet_HDUs(fobj, hdrext=None, hdrname=None, distname=None,
- strict=True, logging=False, logmode='w'):
- """
-    Returns all HeaderletHDU extensions in a science file that match
- the inputs specified by the user. If no hdrext, hdrname or distname are
- specified, this function will return a list of all HeaderletHDU objects.
-
- Parameters
- ----------
- fobj : str, `astropy.io.fits.HDUList`
- Name of FITS file or open fits object (`astropy.io.fits.HDUList` instance)
- hdrext : int, tuple or None
-        index number (EXTVER) or extension tuple of HeaderletHDU to be returned
- hdrname : string
- value of HDRNAME for HeaderletHDU to be returned
- distname : string
- value of DISTNAME for HeaderletHDUs to be returned
-    strict : bool [Default: True]
-        Specifies whether or not at least one parameter needs to be provided.
-        If False and hdrext, hdrname and distname are all None, all
-        extension indices are returned. If True and hdrext, hdrname, and
-        distname are all None, an Exception is raised requiring one to be
-        specified.
- logging : boolean
- enable logging to a file called headerlet.log
- logmode : 'w' or 'a'
- log file open mode
-
- Returns
- -------
- hdrlets : list
- A list of all matching HeaderletHDU extension indices (could be just one)
-
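-    Examples
-    --------
-    An illustrative call returning the extension indices of all headerlets
-    with a given HDRNAME (the filename and name are hypothetical)::
-
-        extns = find_headerlet_HDUs('j94f05bgq_flt.fits', hdrname='TWEAK_1')
-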
- """
-
- get_all = False
- if hdrext is None and hdrname is None and distname is None:
- if not strict:
- get_all = True
- else:
- mess = """\n
- =====================================================
- No valid Headerlet extension specified.
- Either "hdrname", "hdrext", or "distname" needs to be specified.
- =====================================================
- """
- logger.critical(mess)
- raise ValueError
-
- fobj, fname, open_fobj = parse_filename(fobj)
-
- hdrlets = []
- if hdrext is not None and isinstance(hdrext, int):
-        if hdrext in range(len(fobj)): # ensure specified hdrext is in fobj
- if isinstance(fobj[hdrext], fits.hdu.base.NonstandardExtHDU) and \
- fobj[hdrext].header['EXTNAME'] == 'HDRLET':
- hdrlets.append(hdrext)
- else:
- for ext in fobj:
- if isinstance(ext, fits.hdu.base.NonstandardExtHDU):
- if get_all:
- hdrlets.append(fobj.index(ext))
- else:
- if hdrext is not None:
- if isinstance(hdrext, tuple):
- hdrextname = hdrext[0]
- hdrextnum = hdrext[1]
- else:
- hdrextname = 'HDRLET'
- hdrextnum = hdrext
- hdrext_match = ((hdrext is not None) and
- (hdrextnum == ext.header['EXTVER']) and
- (hdrextname == ext.header['EXTNAME']))
- hdrname_match = ((hdrname is not None) and
- (hdrname == ext.header['HDRNAME']))
- distname_match = ((distname is not None) and
- (distname == ext.header['DISTNAME']))
- if hdrext_match or hdrname_match or distname_match:
- hdrlets.append(fobj.index(ext))
-
- if open_fobj:
- fobj.close()
-
- if len(hdrlets) == 0:
- if hdrname:
- kwerr = 'hdrname'
- kwval = hdrname
- elif hdrext:
- kwerr = 'hdrext'
- kwval = hdrext
- else:
- kwerr = 'distname'
- kwval = distname
- message = """\n
- =====================================================
- No valid Headerlet extension found!'
- "%s" = %s not found in %s.' % (kwerr, kwval, fname)
- =====================================================
- """
- logger.critical(message)
- raise ValueError
-
- return hdrlets
-
-def verify_hdrname_is_unique(fobj, hdrname):
- """
- Verifies that no other HeaderletHDU extension has the specified hdrname.
-
- Parameters
- ----------
- fobj : str, `astropy.io.fits.HDUList`
- Name of FITS file or open fits file object
- hdrname : str
- value of HDRNAME for HeaderletHDU to be compared as unique
-
- Returns
- -------
- unique: bool
- If True, no other HeaderletHDU has the specified HDRNAME value
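-
-    Examples
-    --------
-    An illustrative call; the filename and name are hypothetical::
-
-        is_unique = verify_hdrname_is_unique('j94f05bgq_flt.fits', 'TWEAK_1')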
- """
- hdrnames_list = get_headerlet_kw_names(fobj)
-    unique = hdrname not in hdrnames_list
-
- return unique
-
-def update_versions(sourcehdr, desthdr):
- """
- Update keywords which store version numbers
- """
-    phdukw = {'PYWCSVER': 'Version of PYWCS used to update the WCS',
-              'UPWCSVER': 'Version of STWCS used to update the WCS'
-              }
- for key in phdukw:
- try:
- desthdr[key] = (sourcehdr[key], sourcehdr.comments[key])
- except KeyError:
- desthdr[key] = (" ", phdukw[key])
-
-def update_ref_files(source, dest):
- """
-    Update the reference file names in the primary header of 'dest'
- using values from 'source'
-
- Parameters
- ----------
- source : `astropy.io.fits.Header`
- dest : `astropy.io.fits.Header`
- """
- logger.info("Updating reference files")
- phdukw = {'NPOLFILE': True,
- 'IDCTAB': True,
- 'D2IMFILE': True,
- 'SIPNAME': True,
- 'DISTNAME': True}
-
- for key in phdukw:
- try:
- try:
- del dest[key]
-            except KeyError:
- pass
- dest.set(key, source[key], source.comments[key])
- except KeyError:
- phdukw[key] = False
- return phdukw
-
-def print_summary(summary_cols, summary_dict, pad=2, maxwidth=None, idcol=None,
- output=None, clobber=True, quiet=False ):
- """
- Print out summary dictionary to STDOUT, and possibly an output file
-
- """
- nrows = None
- if idcol:
- nrows = len(idcol['vals'])
-
- # Find max width of each column
- column_widths = {}
- for kw in summary_dict:
- colwidth = np.array(summary_dict[kw]['width']).max()
- if maxwidth:
- colwidth = min(colwidth, maxwidth)
- column_widths[kw] = colwidth + pad
- if nrows is None:
- nrows = len(summary_dict[kw]['vals'])
-
- # print rows now
- outstr = ''
- # Start with column names
- if idcol:
- outstr += COLUMN_FMT.format(idcol['name'], width=idcol['width'] + pad)
- for kw in summary_cols:
- outstr += COLUMN_FMT.format(kw, width=column_widths[kw])
- outstr += '\n'
- # Now, add a row for each headerlet
- for row in range(nrows):
- if idcol:
- outstr += COLUMN_FMT.format(idcol['vals'][row],
- width=idcol['width']+pad)
- for kw in summary_cols:
- val = summary_dict[kw]['vals'][row][:(column_widths[kw]-pad)]
- outstr += COLUMN_FMT.format(val, width=column_widths[kw])
- outstr += '\n'
- if not quiet:
- print(outstr)
-
- # If specified, write info to separate text file
- write_file = False
- if output:
- output = fu.osfn(output) # Expand any environment variables in filename
- write_file = True
- if os.path.exists(output):
- if clobber:
- os.remove(output)
- else:
- print('WARNING: Not writing results to file!')
- print(' Output text file ', output, ' already exists.')
- print(' Set "clobber" to True or move file before trying again.')
- write_file = False
- if write_file:
- fout = open(output, mode='w')
- fout.write(outstr)
- fout.close()
-
-#### Private utility functions
-def _create_primary_HDU(fobj, fname, wcsext, destim, hdrname, wcsname,
- sipname, npolfile, d2imfile,
- nmatch,catalog, wcskey,
- author, descrip, history):
- # convert input values into valid FITS kw values
- if author is None:
- author = ''
- if descrip is None:
- descrip = ''
-
- sipname, idctab = utils.build_sipname(fobj, fname, sipname)
- logger.info("Setting sipname value to %s" % sipname)
-
- npolname, npolfile = utils.build_npolname(fobj, npolfile)
- logger.info("Setting npolfile value to %s" % npolname)
-
- d2imname, d2imfile = utils.build_d2imname(fobj,d2imfile)
- logger.info("Setting d2imfile value to %s" % d2imname)
-
- distname = utils.build_distname(sipname, npolname, d2imname)
- logger.info("Setting distname to %s" % distname)
-
- # open file and parse comments
- if history not in ['', ' ', None, 'INDEF'] and os.path.isfile(history):
- f = open(fu.osfn(history))
- history = f.readlines()
- f.close()
- else:
- history = ''
-
- rms_ra = fobj[wcsext].header.get("CRDER1"+wcskey, 0)
- rms_dec = fobj[wcsext].header.get("CRDER2"+wcskey, 0)
- if not nmatch:
- nmatch = fobj[wcsext].header.get("NMATCH"+wcskey, 0)
- if not catalog:
- catalog = fobj[wcsext].header.get('CATALOG'+wcskey, "")
- # get the version of STWCS used to create the WCS of the science file.
- #try:
- #upwcsver = fobj[0].header.cards[fobj[0].header.index('UPWCSVER')]
- #except KeyError:
- #upwcsver = pyfits.Card("UPWCSVER", " ",
- #"Version of STWCS used to update the WCS")
- #try:
- #pywcsver = fobj[0].header.cards[fobj[0].header.index('PYWCSVER')]
- #except KeyError:
- #pywcsver = pyfits.Card("PYWCSVER", " ",
- #"Version of PYWCS used to update the WCS")
- upwcsver = fobj[0].header.get('UPWCSVER', "")
- pywcsver = fobj[0].header.get('PYWCSVER', "")
- # build Primary HDU
- phdu = fits.PrimaryHDU()
- phdu.header['DESTIM'] = (destim, 'Destination observation root name')
- phdu.header['HDRNAME'] = (hdrname, 'Headerlet name')
- fmt = "%Y-%m-%dT%H:%M:%S"
- phdu.header['DATE'] = (time.strftime(fmt), 'Date FITS file was generated')
- phdu.header['WCSNAME'] = (wcsname, 'WCS name')
- phdu.header['DISTNAME'] = (distname, 'Distortion model name')
- phdu.header['SIPNAME'] = (sipname,
- 'origin of SIP polynomial distortion model')
-    phdu.header['NPOLFILE'] = (npolfile,
-                               'origin of non-polynomial distortion model')
- phdu.header['D2IMFILE'] = (d2imfile,
- 'origin of detector to image correction')
- phdu.header['IDCTAB'] = (idctab,
- 'origin of Polynomial Distortion')
- phdu.header['AUTHOR'] = (author, 'headerlet created by this user')
- phdu.header['DESCRIP'] = (descrip,
- 'Short description of headerlet solution')
- phdu.header['RMS_RA'] = (rms_ra,
- 'RMS in RA at ref pix of headerlet solution')
- phdu.header['RMS_DEC'] = (rms_dec,
- 'RMS in Dec at ref pix of headerlet solution')
- phdu.header['NMATCH'] = (nmatch,
- 'Number of sources used for headerlet solution')
- phdu.header['CATALOG'] = (catalog,
- 'Astrometric catalog used for headerlet '
- 'solution')
- phdu.header['UPWCSVER'] = (upwcsver, "Version of STWCS used to update the WCS")
- phdu.header['PYWCSVER'] = (pywcsver, "Version of PYWCS used to update the WCS")
-
- # clean up history string in order to remove whitespace characters that
- # would cause problems with FITS
- if isinstance(history, list):
- history_str = ''
- for line in history:
- history_str += line
- else:
- history_str = history
- history_lines = textwrap.wrap(history_str, width=70)
- for hline in history_lines:
- phdu.header.add_history(hline)
-
- return phdu
-
-
-#### Public Interface functions
-@with_logging
-def extract_headerlet(filename, output, extnum=None, hdrname=None,
- clobber=False, logging=True):
- """
- Finds a headerlet extension in a science file and writes it out as
- a headerlet FITS file.
-
-    If both hdrname and extnum are given, they should refer to the same
-    extension; if not, an Exception is raised.
-
- Parameters
- ----------
- filename: string or HDUList or Python list
- This specifies the name(s) of science file(s) from which headerlets
- will be extracted.
-
- String input formats supported include use of wild-cards, IRAF-style
- '@'-files (given as '@<filename>') and comma-separated list of names.
- An input filename (str) will be expanded as necessary to interpret
- any environmental variables included in the filename.
- If a list of filenames has been specified, it will extract a
- headerlet from the same extnum from all filenames.
- output: string
- Filename or just rootname of output headerlet FITS file
- If string does not contain '.fits', it will create a filename with
- '_hlet.fits' suffix
- extnum: int
- Extension number which contains the headerlet to be written out
- hdrname: string
- Unique name for headerlet, stored as the HDRNAME keyword
-        If no value is provided and no extnum has been specified, an error is raised
- clobber: bool
- If output file already exists, this parameter specifies whether or not
- to overwrite that file [Default: False]
- logging: boolean
- enable logging to a file
-
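-    Examples
-    --------
-    An illustrative call which writes the headerlet stored under HDRNAME
-    'TWEAK_1' to a separate FITS file (all names are hypothetical)::
-
-        extract_headerlet('j94f05bgq_flt.fits', 'j94f05bgq_hlet.fits',
-                          hdrname='TWEAK_1')
-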
- """
-
- if isinstance(filename, fits.HDUList):
- filename = [filename]
- else:
- filename, oname = parseinput.parseinput(filename)
-
- for f in filename:
- fobj, fname, close_fobj = parse_filename(f)
- frootname = fu.buildNewRootname(fname)
- if hdrname in ['', ' ', None, 'INDEF'] and extnum is None:
- if close_fobj:
- fobj.close()
- logger.critical("Expected a valid extnum or hdrname parameter")
- raise ValueError
- if hdrname is not None:
- extn_from_hdrname = find_headerlet_HDUs(fobj, hdrname=hdrname)[0]
-            if extnum is not None and extn_from_hdrname != extnum:
-                logger.critical("hdrname and extnum should refer to the same FITS extension")
- raise ValueError
- else:
- hdrhdu = fobj[extn_from_hdrname]
- else:
- hdrhdu = fobj[extnum]
-
- if not isinstance(hdrhdu, HeaderletHDU):
- logger.critical("Specified extension is not a headerlet")
- raise ValueError
-
- hdrlet = hdrhdu.headerlet
-
- if output is None:
- output = frootname
-
- if '.fits' in output:
- outname = output
- else:
- outname = '%s_hlet.fits' % output
-
- hdrlet.tofile(outname, clobber=clobber)
-
- if close_fobj:
- fobj.close()
-
-
-@with_logging
-def write_headerlet(filename, hdrname, output=None, sciext='SCI',
- wcsname=None, wcskey=None, destim=None,
- sipname=None, npolfile=None, d2imfile=None,
- author=None, descrip=None, history=None,
- nmatch=None, catalog=None,
- attach=True, clobber=False, logging=False):
-
- """
- Save a WCS as a headerlet FITS file.
-
- This function will create a headerlet, write out the headerlet to a
- separate headerlet file, then, optionally, attach it as an extension
- to the science image (if it has not already been archived)
-
- Either wcsname or wcskey must be provided; if both are given, they must
- match a valid WCS.
-
- Updates wcscorr if necessary.
-
- Parameters
- ----------
- filename: string or HDUList or Python list
- This specifies the name(s) of science file(s) from which headerlets
- will be created and written out.
- String input formats supported include use of wild-cards, IRAF-style
- '@'-files (given as '@<filename>') and comma-separated list of names.
- An input filename (str) will be expanded as necessary to interpret
- any environmental variables included in the filename.
- hdrname: string
- Unique name for this headerlet, stored as HDRNAME keyword
- output: string or None
- Filename or just rootname of output headerlet FITS file
- If string does not contain '.fits', it will create a filename
- starting with the science filename and ending with '_hlet.fits'.
- If None, a default filename based on the input filename will be
- generated for the headerlet FITS filename
- sciext: string
- name (EXTNAME) of extension that contains WCS to be saved
- wcsname: string
- name of WCS to be archived, if " ": stop
- wcskey: one of A...Z or " " or "PRIMARY"
- if " " or "PRIMARY" - archive the primary WCS
- destim: string
- DESTIM keyword
-        if None, use ROOTNAME or science file name
- sipname: string or None (default)
- Name of unique file where the polynomial distortion coefficients were
- read from. If None, the behavior is:
- The code looks for a keyword 'SIPNAME' in the science header
- If not found, for HST it defaults to 'IDCTAB'
- If there is no SIP model the value is 'NOMODEL'
- If there is a SIP model but no SIPNAME, it is set to 'UNKNOWN'
- npolfile: string or None (default)
- Name of a unique file where the non-polynomial distortion was stored.
- If None:
- The code looks for 'NPOLFILE' in science header.
- If 'NPOLFILE' was not found and there is no npol model, it is set to 'NOMODEL'
- If npol model exists, it is set to 'UNKNOWN'
- d2imfile: string
- Name of a unique file where the detector to image correction was
- stored. If None:
- The code looks for 'D2IMFILE' in the science header.
- If 'D2IMFILE' is not found and there is no d2im correction,
- it is set to 'NOMODEL'
- If d2im correction exists, but 'D2IMFILE' is missing from science
- header, it is set to 'UNKNOWN'
- author: string
- Name of user who created the headerlet, added as 'AUTHOR' keyword
- to headerlet PRIMARY header
- descrip: string
- Short description of the solution provided by the headerlet
- This description will be added as the single 'DESCRIP' keyword
- to the headerlet PRIMARY header
- history: filename, string or list of strings
- Long (possibly multi-line) description of the solution provided
- by the headerlet. These comments will be added as 'HISTORY' cards
- to the headerlet PRIMARY header
- If filename is specified, it will format and attach all text from
- that file as the history.
- attach: bool
- Specify whether or not to attach this headerlet as a new extension
- It will verify that no other headerlet extension has been created with
- the same 'hdrname' value.
- clobber: bool
- If output file already exists, this parameter specifies whether or not
- to overwrite that file [Default: False]
- logging: boolean
- enable file logging
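-
-    Examples
-    --------
-    An illustrative call which archives the current primary WCS and writes
-    it out as a headerlet file (all names are hypothetical)::
-
-        write_headerlet('j94f05bgq_flt.fits', 'TWEAK_1', wcskey='PRIMARY',
-                        output='j94f05bgq_hlet.fits')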
- """
-
- if isinstance(filename, fits.HDUList):
- filename = [filename]
- else:
- filename, oname = parseinput.parseinput(filename)
-
- for f in filename:
- if isinstance(f, str):
- fname = f
- else:
- fname = f.filename()
-
- if wcsname in [None, ' ', '', 'INDEF'] and wcskey is None:
- message = """\n
-            No valid WCS found in %s.
- A valid value for either "wcsname" or "wcskey"
- needs to be specified.
- """ % fname
- logger.critical(message)
- raise ValueError
-
- # Translate 'wcskey' value for PRIMARY WCS to valid altwcs value of ' '
- if wcskey == 'PRIMARY':
- wcskey = ' '
-
- if attach:
- umode = 'update'
- else:
- umode = 'readonly'
-
- fobj, fname, close_fobj = parse_filename(f, mode=umode)
-
- # Interpret sciext input for this file
- if isinstance(sciext, int):
- sciextlist = [sciext] # allow for specification of simple FITS header
- elif isinstance(sciext, str):
- numsciext = countExtn(fobj, sciext)
- if numsciext > 0:
- sciextlist = [tuple((sciext,i)) for i in range(1, numsciext+1)]
- else:
- sciextlist = [0]
- elif isinstance(sciext, list):
- sciextlist = sciext
- else:
- errstr = "Expected sciext to be a list of FITS extensions with science data\n"+\
- " a valid EXTNAME string, or an integer."
- logger.critical(errstr)
- raise ValueError
-
- wnames = altwcs.wcsnames(fobj,ext=sciextlist[0])
-
-        # Ensure that WCSCORR table has been created with all original
- # WCS's recorded prior to adding the headerlet WCS
- wcscorr.init_wcscorr(fobj)
-
- if wcsname is None:
- scihdr = fobj[sciextlist[0]].header
- wname = scihdr['wcsname'+wcskey]
- else:
- wname = wcsname
- if hdrname in [None, ' ', '']:
- hdrname = wcsname
-
- logger.critical('Creating the headerlet from image %s' % fname)
- hdrletobj = create_headerlet(fobj, sciext=sciextlist,
- wcsname=wname, wcskey=wcskey,
- hdrname=hdrname,
- sipname=sipname, npolfile=npolfile,
- d2imfile=d2imfile, author=author,
- descrip=descrip, history=history,
- nmatch=nmatch, catalog=catalog,
- logging=False)
-
- if attach:
- # Check to see whether or not a HeaderletHDU with
-            # this hdrname already exists
- hdrnames = get_headerlet_kw_names(fobj)
- if hdrname not in hdrnames:
- hdrlet_hdu = HeaderletHDU.fromheaderlet(hdrletobj)
-
- if destim is not None:
- hdrlet_hdu.header['destim'] = destim
-
- fobj.append(hdrlet_hdu)
-
- # Update the WCSCORR table with new rows from the headerlet's WCSs
- wcscorr.update_wcscorr(fobj, source=hdrletobj,
- extname='SIPWCS', wcs_id=wname)
-
- utils.updateNEXTENDKw(fobj)
- fobj.flush()
- else:
- message = """
- Headerlet with hdrname %s already archived for WCS %s.
- No new headerlet appended to %s.
- """ % (hdrname, wname, fname)
- logger.critical(message)
-
- if close_fobj:
- logger.info('Closing image in write_headerlet()...')
- fobj.close()
-
- frootname = fu.buildNewRootname(fname)
-
- if output is None:
- # Generate default filename for headerlet FITS file
- outname = '{0}_hlet.fits'.format(frootname)
- else:
- outname = output
-
- if not outname.endswith('.fits'):
- outname = '{0}_{1}_hlet.fits'.format(frootname,outname)
-
- # If user specifies an output filename for headerlet, write it out
- hdrletobj.tofile(outname, clobber=clobber)
- logger.critical( 'Created Headerlet file %s ' % outname)
-
- del hdrletobj
-
-@with_logging
-def create_headerlet(filename, sciext='SCI', hdrname=None, destim=None,
- wcskey=" ", wcsname=None,
- sipname=None, npolfile=None, d2imfile=None,
- author=None, descrip=None, history=None,
- nmatch=None, catalog=None,
- logging=False, logmode='w'):
- """
-    Create a headerlet from a WCS in a science file.
-    If both wcskey and wcsname are given, they should refer to the same WCS;
-    if not, an Exception is raised.
-
- Parameters
- ----------
- filename: string or HDUList
- Either a filename or PyFITS HDUList object for the input science file
- An input filename (str) will be expanded as necessary to interpret
- any environmental variables included in the filename.
-    sciext: int, string or python list (default: 'SCI')
-        Extension(s) containing the science data and the linear WCS to be saved.
- The headerlet will be created from these extensions.
- If string - a valid EXTNAME is expected
- If int - specifies an extension with a valid WCS, such as 0 for a
- simple FITS file
- If list - a list of FITS extension numbers or strings representing
- extension tuples, e.g. ('SCI, 1') is expected.
- hdrname: string
- value of HDRNAME keyword
-        If not given, the value is taken from the HDRNAME<wcskey> keyword or,
-        failing that, from WCSNAME<wcskey>. If neither is found in the science
-        file and a value is not provided, processing stops.
- destim: string or None
- name of file this headerlet can be applied to
- if None, use ROOTNAME keyword
- wcskey: char (A...Z) or " " or "PRIMARY" or None
- a char representing an alternate WCS to be used for the headerlet
- if " ", use the primary (default)
- if None use wcsname
- wcsname: string or None
- if wcskey is None use wcsname specified here to choose an alternate WCS for the headerlet
- sipname: string or None (default)
- Name of unique file where the polynomial distortion coefficients were
- read from. If None, the behavior is:
- The code looks for a keyword 'SIPNAME' in the science header
- If not found, for HST it defaults to 'IDCTAB'
- If there is no SIP model the value is 'NOMODEL'
- If there is a SIP model but no SIPNAME, it is set to 'UNKNOWN'
- npolfile: string or None (default)
- Name of a unique file where the non-polynomial distortion was stored.
- If None:
- The code looks for 'NPOLFILE' in science header.
- If 'NPOLFILE' was not found and there is no npol model, it is set to 'NOMODEL'
- If npol model exists, it is set to 'UNKNOWN'
- d2imfile: string
- Name of a unique file where the detector to image correction was
-        stored. If None:
- The code looks for 'D2IMFILE' in the science header.
- If 'D2IMFILE' is not found and there is no d2im correction,
- it is set to 'NOMODEL'
- If d2im correction exists, but 'D2IMFILE' is missing from science
- header, it is set to 'UNKNOWN'
- author: string
- Name of user who created the headerlet, added as 'AUTHOR' keyword
- to headerlet PRIMARY header
- descrip: string
- Short description of the solution provided by the headerlet
- This description will be added as the single 'DESCRIP' keyword
- to the headerlet PRIMARY header
- history: filename, string or list of strings
- Long (possibly multi-line) description of the solution provided
- by the headerlet. These comments will be added as 'HISTORY' cards
- to the headerlet PRIMARY header
- If filename is specified, it will format and attach all text from
- that file as the history.
- nmatch: int (optional)
- Number of sources used in the new solution fit
- catalog: string (optional)
- Astrometric catalog used for headerlet solution
- logging: boolean
- enable file logging
- logmode: 'w' or 'a'
- log file open mode
-
- Returns
- -------
- Headerlet object
-
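-    Examples
-    --------
-    An illustrative sketch which creates a Headerlet object from the primary
-    WCS and saves it to a file (all names are hypothetical)::
-
-        hlet = create_headerlet('j94f05bgq_flt.fits', wcskey='PRIMARY',
-                                hdrname='TWEAK_1')
-        hlet.tofile('j94f05bgq_hlet.fits')
-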
- """
- if wcskey == 'O':
- message = "Warning: 'O' is a reserved key for the original WCS. Quitting..."
- logger.info(message)
- return None
-
- fobj, fname, close_file = parse_filename(filename)
- # based on `sciext` create a list of (EXTNAME, EXTVER) tuples
- # of extensions with WCS to be saved in a headerlet
- sciext = get_extname_extver_list(fobj, sciext)
- logger.debug("Data extensions from which to create headerlet:\n\t %s"
- % (str(sciext)))
- if not sciext:
- logger.critical("No valid target extensions found in file %s" % fname)
- raise ValueError
-
- # Define extension to evaluate for verification of input parameters
- wcsext = sciext[0]
- logger.debug("sciext in create_headerlet is %s" % str(sciext))
- # Translate 'wcskey' value for PRIMARY WCS to valid altwcs value of ' '
- if wcskey == 'PRIMARY':
- wcskey = ' '
- logger.info("wcskey reset from 'PRIMARY' to ' '")
- wcskey = wcskey.upper()
- wcsnamekw = "".join(["WCSNAME", wcskey.upper()]).rstrip()
- hdrnamekw = "".join(["HDRNAME", wcskey.upper()]).rstrip()
-
- wnames = altwcs.wcsnames(fobj, ext=wcsext)
- if not wcsname:
- # User did not specify a value for 'wcsname'
- if wcsnamekw in fobj[wcsext].header:
- #check if there's a WCSNAME for this wcskey in the header
- wcsname = fobj[wcsext].header[wcsnamekw]
- logger.info("Setting wcsname from header[%s] to %s" % (wcsnamekw, wcsname))
- else:
- if hdrname not in ['', ' ', None, "INDEF"]:
- """
- If wcsname for this wcskey was not provided
- and WCSNAME<wcskey> does not exist in the header
- and hdrname is provided, then
- use hdrname as WCSNAME for the headerlet.
- """
- wcsname = hdrname
- logger.debug("Setting wcsname from hdrname to %s" % hdrname)
- else:
- if hdrnamekw in fobj[wcsext].header:
- wcsname = fobj[wcsext].header[hdrnamekw]
- logger.debug("Setting wcsname from header[%s] to %s" % (hdrnamekw, wcsname))
- else:
- message = """
- Required keywords 'HDRNAME' or 'WCSNAME' not found!
- Please specify a value for parameter 'hdrname'
- or update header with 'WCSNAME' keyword.
- """
- logger.critical(message)
- raise KeyError
- else:
- # Verify that 'wcsname' and 'wcskey' values specified by user reference
- # the same WCS
- wname = fobj[wcsext].header[wcsnamekw]
- if wcsname != wname:
- message = "\tInconsistent values for 'wcskey' and 'wcsname' specified!\n"
- message += " 'wcskey' = %s and 'wcsname' = %s. \n" % (wcskey, wcsname)
- message += "Actual value of %s found to be %s. \n" % (wcsnamekw, wname)
- logger.critical(message)
- raise KeyError
- wkeys = altwcs.wcskeys(fobj, ext=wcsext)
- if wcskey != ' ':
- if wcskey not in wkeys:
- logger.critical('No WCS with wcskey=%s found in extension %s. Skipping...' % (wcskey, str(wcsext)))
-            raise ValueError("No WCS with wcskey=%s found in extension %s. Skipping..." % (wcskey, str(wcsext)))
-
- # get remaining required keywords
- if destim is None:
- if 'ROOTNAME' in fobj[0].header:
- destim = fobj[0].header['ROOTNAME']
- logger.info("Setting destim to rootname of the science file")
- else:
- destim = fname
- logger.info('DESTIM not provided')
- logger.info('Keyword "ROOTNAME" not found')
- logger.info('Using file name as DESTIM')
-
- if not hdrname:
- # check if HDRNAME<wcskey> is in header
- if hdrnamekw in fobj[wcsext].header:
- hdrname = fobj[wcsext].header[hdrnamekw]
- else:
- if wcsnamekw in fobj[wcsext].header:
- hdrname = fobj[wcsext].header[wcsnamekw]
- message = """
- Using default value for HDRNAME of "%s" derived from %s.
- """ % (hdrname, wcsnamekw)
- logger.info(message)
- logger.info("Setting hdrname to %s from header[%s]"
- % (hdrname, wcsnamekw))
- else:
- message = "Required keywords 'HDRNAME' or 'WCSNAME' not found"
- logger.critical(message)
- raise KeyError
-
-
-
- hdul = []
- phdu = _create_primary_HDU(fobj, fname, wcsext, destim, hdrname, wcsname,
- sipname, npolfile, d2imfile,
- nmatch, catalog, wcskey,
- author, descrip, history)
- hdul.append(phdu)
- wcsdvarr_extns = []
- """
- nd2i is a counter for d2i extensions to be used when the science file
- has an old d2i correction format. The old format did not write EXTVER
-    kw for the d2i correction in the science header but the new format expects
- them.
- """
- nd2i_extver = 1
- for ext in sciext:
- wkeys = altwcs.wcskeys(fobj, ext=ext)
-        if wcskey != ' ':
-            if wcskey not in wkeys:
-                mess = ('No WCS with wcskey=%s found in extension %s. '
-                        'Skipping...' % (wcskey, str(ext)))
-                logger.debug(mess)
-                raise ValueError(mess)
-
- hwcs = HSTWCS(fobj, ext=ext, wcskey=wcskey)
-
- whdul = hwcs.to_fits(relax=True, key=" ")
- if hasattr(hwcs, 'orientat'):
-            orient_comment = "position angle of image y axis (deg. E of N)"
- whdul[0].header['ORIENTAT'] = (hwcs.orientat, orient_comment)
-
- whdul[0].header.append(('TG_ENAME', ext[0], 'Target science data extname'))
- whdul[0].header.append(('TG_EVER', ext[1], 'Target science data extver'))
-
- if hwcs.wcs.has_cd():
- whdul[0].header = altwcs.pc2cd(whdul[0].header)
-
- idckw = hwcs._idc2hdr()
- whdul[0].header.extend(idckw)
-
- if hwcs.det2im1 or hwcs.det2im2:
- try:
- whdul[0].header.append(fobj[ext].header.cards['D2IMEXT'])
- except KeyError:
- pass
- whdul[0].header.extend(fobj[ext].header.cards['D2IMERR*'])
- if 'D2IM1.EXTVER' in whdul[0].header:
- try:
- whdul[0].header['D2IM1.EXTVER'] = fobj[ext].header['D2IM1.EXTVER']
- except KeyError:
- whdul[0].header['D2IM1.EXTVER'] = nd2i_extver
- nd2i_extver += 1
- if 'D2IM2.EXTVER' in whdul[0].header:
- try:
- whdul[0].header['D2IM2.EXTVER'] = fobj[ext].header['D2IM2.EXTVER']
- except KeyError:
- whdul[0].header['D2IM2.EXTVER'] = nd2i_extver
- nd2i_extver += 1
-
- if hwcs.cpdis1 or hwcs.cpdis2:
- whdul[0].header.extend(fobj[ext].header.cards['CPERR*'])
- try:
- whdul[0].header.append(fobj[ext].header.cards['NPOLEXT'])
- except KeyError:
- pass
- if 'DP1.EXTVER' in whdul[0].header:
- whdul[0].header['DP1.EXTVER'] = fobj[ext].header['DP1.EXTVER']
- if 'DP2.EXTVER' in whdul[0].header:
- whdul[0].header['DP2.EXTVER'] = fobj[ext].header['DP2.EXTVER']
- ihdu = fits.ImageHDU(header=whdul[0].header, name='SIPWCS')
-
- if ext[0] != "PRIMARY":
- ihdu.update_ext_version(fobj[ext].header['EXTVER'], comment='Extension version')
-
- hdul.append(ihdu)
-
- if hwcs.cpdis1:
- whdu = whdul[('WCSDVARR', 1)].copy()
- whdu.update_ext_version(fobj[ext].header['DP1.EXTVER'])
- hdul.append(whdu)
- if hwcs.cpdis2:
- whdu = whdul[('WCSDVARR', 2)].copy()
- whdu.update_ext_version(fobj[ext].header['DP2.EXTVER'])
- hdul.append(whdu)
-
- if hwcs.det2im1:
- whdu = whdul[('D2IMARR', 1)].copy()
- whdu.update_ext_version(ihdu.header['D2IM1.EXTVER'])
- hdul.append(whdu)
- if hwcs.det2im2:
- whdu = whdul[('D2IMARR', 2)].copy()
- whdu.update_ext_version(ihdu.header['D2IM2.EXTVER'])
- hdul.append(whdu)
-
-
- #if hwcs.det2im1 or hwcs.det2im2:
- #try:
- #darr = hdul[('D2IMARR', 1)]
- #except KeyError:
- #whdu = whdul[('D2IMARR')]
- #whdu.update_ext_version(1)
- #hdul.append(whdu)
- if close_file:
- fobj.close()
-
- hlet = Headerlet(hdul, logging=logging, logmode='a')
- hlet.init_attrs()
- return hlet
-
-@with_logging
-def apply_headerlet_as_primary(filename, hdrlet, attach=True, archive=True,
- force=False, logging=False, logmode='a'):
- """
-    Apply headerlet(s) to science observation(s) as the primary WCS
-
- Parameters
- ----------
- filename: string or list of strings
- File name(s) of science observation whose WCS solution will be updated
- hdrlet: string or list of strings
- Headerlet file(s), must match 1-to-1 with input filename(s)
- attach: boolean
- True (default): append headerlet to FITS file as a new extension.
- archive: boolean
- True (default): before updating, create a headerlet with the
- WCS old solution.
- force: boolean
- If True, this will cause the headerlet to replace the current PRIMARY
- WCS even if it has a different distortion model. [Default: False]
- logging: boolean
- enable file logging
- logmode: 'w' or 'a'
- log file open mode
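-
-    Examples
-    --------
-    An illustrative call; the filenames are hypothetical::
-
-        apply_headerlet_as_primary('j94f05bgq_flt.fits',
-                                   'j94f05bgq_hlet.fits')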
- """
- if not isinstance(filename, list):
- filename = [filename]
- if not isinstance(hdrlet, list):
- hdrlet = [hdrlet]
-    if len(hdrlet) != len(filename):
-        logger.critical("Filenames must have matching headerlets. "
-                        "{0:d} filenames and {1:d} headerlets specified".format(len(filename), len(hdrlet)))
-        return  # do not proceed with mismatched lists
-
- for fname,h in zip(filename,hdrlet):
- print("Applying {0} as Primary WCS to {1}".format(h,fname))
- hlet = Headerlet.fromfile(h, logging=logging, logmode=logmode)
- hlet.apply_as_primary(fname, attach=attach, archive=archive,
- force=force)
-
-
-@with_logging
-def apply_headerlet_as_alternate(filename, hdrlet, attach=True, wcskey=None,
- wcsname=None, logging=False, logmode='w'):
- """
- Apply headerlet to a science observation as an alternate WCS
-
- Parameters
- ----------
- filename: string or list of strings
- File name(s) of science observation whose WCS solution will be updated
- hdrlet: string or list of strings
- Headerlet file(s), must match 1-to-1 with input filename(s)
- attach: boolean
- flag indicating if the headerlet should be attached as a
- HeaderletHDU to fobj. If True checks that HDRNAME is unique
- in the fobj and stops if not.
- wcskey: string
- Key value (A-Z, except O) for this alternate WCS
- If None, the next available key will be used
- wcsname: string
- Name to be assigned to this alternate WCS
- WCSNAME is a required keyword in a Headerlet but this allows the
- user to change it as desired.
- logging: boolean
- enable file logging
- logmode: 'a' or 'w'
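-
-    Examples
-    --------
-    An illustrative call; the filenames and WCS name are hypothetical::
-
-        apply_headerlet_as_alternate('j94f05bgq_flt.fits',
-                                     'j94f05bgq_hlet.fits',
-                                     wcsname='TWEAK_1')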
- """
- if not isinstance(filename, list):
- filename = [filename]
- if not isinstance(hdrlet, list):
- hdrlet = [hdrlet]
-    if len(hdrlet) != len(filename):
-        logger.critical("Filenames must have matching headerlets. "
-                        "{0:d} filenames and {1:d} headerlets specified".format(len(filename), len(hdrlet)))
-        return  # do not proceed with mismatched lists
-
- for fname,h in zip(filename,hdrlet):
- print('Applying {0} as an alternate WCS to {1}'.format(h,fname))
- hlet = Headerlet.fromfile(h, logging=logging, logmode=logmode)
- hlet.apply_as_alternate(fname, attach=attach,
- wcsname=wcsname, wcskey=wcskey)
-
-
-@with_logging
-def attach_headerlet(filename, hdrlet, logging=False, logmode='a'):
- """
-    Attach a Headerlet as a HeaderletHDU to a science file
-
- Parameters
- ----------
- filename: HDUList or list of HDULists
- science file(s) to which the headerlet should be applied
- hdrlet: string, Headerlet object or list of strings or Headerlet objects
-        string representing a headerlet file(s); must match 1-to-1 with input filename(s)
- logging: boolean
- enable file logging
- logmode: 'a' or 'w'
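-
-    Examples
-    --------
-    An illustrative call; the filenames are hypothetical::
-
-        attach_headerlet('j94f05bgq_flt.fits', 'j94f05bgq_hlet.fits')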
- """
- if not isinstance(filename, list):
- filename = [filename]
- if not isinstance(hdrlet, list):
- hdrlet = [hdrlet]
-    if len(hdrlet) != len(filename):
-        logger.critical("Filenames must have matching headerlets. "
-                        "{0:d} filenames and {1:d} headerlets specified".format(len(filename), len(hdrlet)))
-        return  # do not proceed with mismatched lists
-
- for fname,h in zip(filename,hdrlet):
- print('Attaching {0} as Headerlet extension to {1}'.format(h,fname))
- hlet = Headerlet.fromfile(h, logging=logging, logmode=logmode)
- hlet.attach_to_file(fname,archive=True)
-
-
-@with_logging
-def delete_headerlet(filename, hdrname=None, hdrext=None, distname=None,
- logging=False, logmode='w'):
- """
- Deletes HeaderletHDU(s) with same HDRNAME from science files
-
- Notes
- -----
- One of hdrname, hdrext or distname should be given.
- If hdrname is given - delete a HeaderletHDU with a name HDRNAME from fobj.
- If hdrext is given - delete HeaderletHDU in extension.
- If distname is given - deletes all HeaderletHDUs with a specific distortion model from fobj.
- Updates wcscorr
-
- Parameters
- ----------
- filename: string, HDUList or list of strings
- Filename can be specified as a single filename or HDUList, or
- a list of filenames
- Each input filename (str) will be expanded as necessary to interpret
- any environmental variables included in the filename.
- hdrname: string or None
- HeaderletHDU primary header keyword HDRNAME
- hdrext: int, tuple or None
- HeaderletHDU FITS extension number
- tuple has the form ('HDRLET', 1)
- distname: string or None
- distortion model as specified in the DISTNAME keyword
- logging: boolean
- enable file logging
- logmode: 'a' or 'w'
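-
-    Examples
-    --------
-    An illustrative call deleting the headerlet extension archived under a
-    given HDRNAME (names are hypothetical)::
-
-        delete_headerlet('j94f05bgq_flt.fits', hdrname='TWEAK_1')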
- """
- if not isinstance(filename, list):
- filename = [filename]
-
- for f in filename:
- print("Deleting Headerlet from ",f)
- _delete_single_headerlet(f, hdrname=hdrname, hdrext=hdrext,
- distname=distname, logging=logging, logmode='a')
-
-def _delete_single_headerlet(filename, hdrname=None, hdrext=None, distname=None,
- logging=False, logmode='w'):
- """
- Deletes HeaderletHDU(s) from a SINGLE science file
-
- Notes
- -----
- One of hdrname, hdrext or distname should be given.
- If hdrname is given - delete a HeaderletHDU with a name HDRNAME from fobj.
- If hdrext is given - delete HeaderletHDU in extension.
- If distname is given - deletes all HeaderletHDUs with a specific distortion model from fobj.
- Updates wcscorr
-
- Parameters
- ----------
- filename: string or HDUList
- Either a filename or PyFITS HDUList object for the input science file
- An input filename (str) will be expanded as necessary to interpret
- any environmental variables included in the filename.
- hdrname: string or None
- HeaderletHDU primary header keyword HDRNAME
- hdrext: int, tuple or None
- HeaderletHDU FITS extension number
- tuple has the form ('HDRLET', 1)
- distname: string or None
- distortion model as specified in the DISTNAME keyword
- logging: boolean
- enable file logging
- logmode: 'a' or 'w'
- """
- hdrlet_ind = find_headerlet_HDUs(filename, hdrname=hdrname, hdrext=hdrext,
- distname=distname, logging=logging, logmode='a')
- if len(hdrlet_ind) == 0:
- message = """
-        No HDUs deleted... No Headerlet HDUs found with
- hdrname = %s
- hdrext = %s
- distname = %s
- Please review input parameters and try again.
- """ % (hdrname, str(hdrext), distname)
- logger.critical(message)
- return
-
- fobj, fname, close_fobj = parse_filename(filename, mode='update')
-
- # delete row(s) from WCSCORR table now...
- #
- #
- if hdrname not in ['', ' ', None, 'INDEF']:
- selections = {'hdrname': hdrname}
- elif hdrname in ['', ' ', None, 'INDEF'] and hdrext is not None:
- selections = {'hdrname': fobj[hdrext].header['hdrname']}
- else:
- selections = {'distname': distname}
- wcscorr.delete_wcscorr_row(fobj['WCSCORR'].data, selections)
-
- # delete the headerlet extension now
- for hdrind in hdrlet_ind:
- del fobj[hdrind]
-
- utils.updateNEXTENDKw(fobj)
- # Update file object with changes
- fobj.flush()
- # close file, if was opened by this function
- if close_fobj:
- fobj.close()
- logger.critical('Deleted headerlet from extension(s) %s ' % str(hdrlet_ind))
-
-
-def headerlet_summary(filename, columns=None, pad=2, maxwidth=None,
- output=None, clobber=True, quiet=False):
- """
-
- Print a summary of all HeaderletHDUs in a science file to STDOUT, and
-    optionally to a text file.
- The summary includes:
- HDRLET_ext_number HDRNAME WCSNAME DISTNAME SIPNAME NPOLFILE D2IMFILE
-
- Parameters
- ----------
- filename: string or HDUList
- Either a filename or PyFITS HDUList object for the input science file
- An input filename (str) will be expanded as necessary to interpret
- any environmental variables included in the filename.
- columns: list
- List of headerlet PRIMARY header keywords to report in summary
- By default (set to None), it will use the default set of keywords
- defined as the global list DEFAULT_SUMMARY_COLS
- pad: int
- Number of padding spaces to put between printed columns
- [Default: 2]
- maxwidth: int
-        Maximum column width (not counting padding) for any column in summary
- By default (set to None), each column's full width will be used
- output: string (optional)
- Name of optional output file to record summary. This filename
- can contain environment variables.
- [Default: None]
- clobber: bool
- If True, will overwrite any previous output file of same name
- quiet: bool
- If True, will NOT report info to STDOUT
-
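-    Examples
-    --------
-    An illustrative call; the filenames are hypothetical::
-
-        headerlet_summary('j94f05bgq_flt.fits', output='hlet_summary.txt')
-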
- """
- if columns is None:
- summary_cols = DEFAULT_SUMMARY_COLS
- else:
- summary_cols = columns
-
- summary_dict = {}
- for kw in summary_cols:
- summary_dict[kw] = copy.deepcopy(COLUMN_DICT)
-
- # Define Extension number column
- extnums_col = copy.deepcopy(COLUMN_DICT)
- extnums_col['name'] = 'EXTN'
- extnums_col['width'] = 6
-
- fobj, fname, close_fobj = parse_filename(filename)
- # find all HDRLET extensions and combine info into a single summary
- for extn in fobj:
- if 'extname' in extn.header and extn.header['extname'] == 'HDRLET':
- hdrlet_indx = fobj.index_of(('hdrlet', extn.header['extver']))
- try:
- ext_cols, ext_summary = extn.headerlet.summary(columns=summary_cols)
- extnums_col['vals'].append(hdrlet_indx)
- for kw in summary_cols:
- for key in COLUMN_DICT:
- summary_dict[kw][key].extend(ext_summary[kw][key])
-            except Exception:
- print("Skipping headerlet")
- print("Could not read Headerlet from extension ", hdrlet_indx)
-
- if close_fobj:
- fobj.close()
-
- # Print out the summary dictionary
- print_summary(summary_cols, summary_dict, pad=pad, maxwidth=maxwidth,
- idcol=extnums_col, output=output,
- clobber=clobber, quiet=quiet)
-
-
-@with_logging
-def restore_from_headerlet(filename, hdrname=None, hdrext=None, archive=True,
- force=False, logging=False, logmode='w'):
- """
- Restores a headerlet as a primary WCS
-
- Parameters
- ----------
- filename: string or HDUList
- Either a filename or PyFITS HDUList object for the input science file
- An input filename (str) will be expanded as necessary to interpret
- any environmental variables included in the filename.
- hdrname: string
- HDRNAME keyword of HeaderletHDU
- hdrext: int or tuple
-        Headerlet extension number or tuple, e.g. ('HDRLET', 2)
- archive: boolean (default: True)
- When the distortion model in the headerlet is the same as the distortion model of
- the science file, this flag indicates if the primary WCS should be saved as an alternate
-        and a headerlet extension.
- When the distortion models do not match this flag indicates if the current primary and
- alternate WCSs should be archived as headerlet extensions and alternate WCS.
- force: boolean (default:False)
- When the distortion models of the headerlet and the primary do not match, and archive
- is False, this flag forces an update of the primary.
- logging: boolean
- enable file logging
- logmode: 'a' or 'w'
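-
-    Examples
-    --------
-    An illustrative call; the filename and HDRNAME are hypothetical::
-
-        restore_from_headerlet('j94f05bgq_flt.fits', hdrname='TWEAK_1',
-                               archive=True)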
- """
-
- hdrlet_ind = find_headerlet_HDUs(filename, hdrext=hdrext, hdrname=hdrname)
-
- fobj, fname, close_fobj = parse_filename(filename, mode='update')
-
- if len(hdrlet_ind) > 1:
- if hdrext:
- kwerr = 'hdrext'
- kwval = hdrext
- else:
- kwerr = 'hdrname'
- kwval = hdrname
- message = """
- Multiple Headerlet extensions found with the same name.
- %d Headerlets with "%s" = %s found in %s.
- """% (len(hdrlet_ind), kwerr, kwval, fname)
- if close_fobj:
- fobj.close()
- logger.critical(message)
- raise ValueError
-
- hdrlet_indx = hdrlet_ind[0]
-
- # read headerlet from HeaderletHDU into memory
- if hasattr(fobj[hdrlet_ind[0]], 'hdulist'):
- hdrlet = fobj[hdrlet_indx].hdulist
- else:
- hdrlet = fobj[hdrlet_indx].headerlet # older convention in PyFITS
-
- # read in the names of the extensions which HeaderletHDU updates
- extlist = []
- for ext in hdrlet:
- if 'extname' in ext.header and ext.header['extname'] == 'SIPWCS':
- # convert from string to tuple or int
- sciext = eval(ext.header['sciext'])
- extlist.append(fobj[sciext])
- # determine whether distortion is the same
- current_distname = hdrlet[0].header['distname']
- same_dist = True
- if current_distname != fobj[0].header['distname']:
- same_dist = False
- if not archive and not force:
- if close_fobj:
- fobj.close()
- message = """
- Headerlet does not have the same distortion as image!
- Set "archive"=True to save old distortion model, or
- set "force"=True to overwrite old model with new.
- """
- logger.critical(message)
- raise ValueError
-
- # check whether primary WCS has been archived already
- # Use information from first 'SCI' extension
- priwcs_name = None
-
- scihdr = extlist[0].header
- sci_wcsnames = altwcs.wcsnames(scihdr).values()
- if 'hdrname' in scihdr:
- priwcs_hdrname = scihdr['hdrname']
- else:
- if 'wcsname' in scihdr:
- priwcs_hdrname = priwcs_name = scihdr['wcsname']
- else:
- if 'idctab' in scihdr:
- priwcs_hdrname = ''.join(['IDC_',
- utils.extract_rootname(scihdr['idctab'], suffix='_idc')])
- else:
- priwcs_hdrname = 'UNKNOWN'
- priwcs_name = priwcs_hdrname
- scihdr['WCSNAME'] = priwcs_name
-
- priwcs_unique = verify_hdrname_is_unique(fobj, priwcs_hdrname)
-    if archive and priwcs_unique:
-        newhdrlet = create_headerlet(fobj, sciext=scihdr['extname'],
-                                     hdrname=priwcs_hdrname)
-        newhdrlet.attach_to_file(fobj)
- #
- # copy hdrlet as a primary
- #
- hdrlet.apply_as_primary(fobj, attach=False, archive=archive, force=force)
-
- utils.updateNEXTENDKw(fobj)
- fobj.flush()
- if close_fobj:
- fobj.close()
-
-
-@with_logging
-def restore_all_with_distname(filename, distname, primary, archive=True,
- sciext='SCI', logging=False, logmode='w'):
- """
- Restores all HeaderletHDUs with a given distortion model as alternate WCSs and a primary
-
- Parameters
- --------------
- filename: string or HDUList
- Either a filename or PyFITS HDUList object for the input science file
- An input filename (str) will be expanded as necessary to interpret
- any environmental variables included in the filename.
- distname: string
- distortion model as represented by a DISTNAME keyword
- primary: int or string or None
- HeaderletHDU to be restored as primary
- if int - a fits extension
- if string - HDRNAME
- if None - use first HeaderletHDU
- archive: boolean (default True)
- flag indicating if HeaderletHDUs should be created from the
- primary and alternate WCSs in fname before restoring all matching
- headerlet extensions
- logging: boolean
- enable file logging
- logmode: 'a' or 'w'
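-
-    Examples
-    --------
-    An illustrative call; the filename and DISTNAME value are hypothetical::
-
-        restore_all_with_distname('j94f05bgq_flt.fits',
-                                  'j94f05bgq_v971826mj-NOMODEL-NOMODEL',
-                                  None)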
- """
-
- fobj, fname, close_fobj = parse_filename(filename, mode='update')
-
- hdrlet_ind = find_headerlet_HDUs(fobj, distname=distname)
- if len(hdrlet_ind) == 0:
- message = """
- No Headerlet extensions found with
-
- DISTNAME = %s in %s.
-
- For a full list of DISTNAMEs found in all headerlet extensions:
-
- get_headerlet_kw_names(fobj, kw='DISTNAME')
- """ % (distname, fname)
- if close_fobj:
- fobj.close()
- logger.critical(message)
- raise ValueError
-
- # Interpret 'primary' parameter input into extension number
- if primary is None:
- primary_ind = hdrlet_ind[0]
- elif isinstance(primary, int):
- primary_ind = primary
- else:
- primary_ind = None
- for ind in hdrlet_ind:
- if fobj[ind].header['hdrname'] == primary:
- primary_ind = ind
- break
- if primary_ind is None:
- if close_fobj:
- fobj.close()
- message = """
-            No Headerlet extension found with HDRNAME = %s in %s.
- """ % (primary, fname)
- logger.critical(message)
- raise ValueError
- # Check to see whether 'primary' HeaderletHDU has same distname as user
- # specified on input
-
- # read headerlet from HeaderletHDU into memory
- if hasattr(fobj[primary_ind], 'hdulist'):
- primary_hdrlet = fobj[primary_ind].hdulist
- else:
- primary_hdrlet = fobj[primary_ind].headerlet # older convention in PyFITS
- pri_distname = primary_hdrlet[0].header['distname']
- if pri_distname != distname:
- if close_fobj:
- fobj.close()
- message = """
- Headerlet extension to be used as PRIMARY WCS
- has "DISTNAME" = %s
- "DISTNAME" = %s was specified on input.
-        All updated WCSs must have same DISTNAME. Quitting...
- """ % (pri_distname, distname)
- logger.critical(message)
- raise ValueError
-
- # read in the names of the WCSs which the HeaderletHDUs will update
- wnames = altwcs.wcsnames(fobj[sciext, 1].header)
-
- # work out how many HeaderletHDUs will be used to update the WCSs
- numhlt = len(hdrlet_ind)
- hdrnames = get_headerlet_kw_names(fobj, kw='wcsname')
-
- # read in headerletHDUs and update WCS keywords
- for hlet in hdrlet_ind:
- if fobj[hlet].header['distname'] == distname:
- if hasattr(fobj[hlet], 'hdulist'):
- hdrlet = fobj[hlet].hdulist
- else:
- hdrlet = fobj[hlet].headerlet # older convention in PyFITS
- if hlet == primary_ind:
- hdrlet.apply_as_primary(fobj, attach=False,
- archive=archive, force=True)
- else:
- hdrlet.apply_as_alternate(fobj, attach=False,
- wcsname=hdrlet[0].header['wcsname'])
-
- utils.updateNEXTENDKw(fobj)
- fobj.flush()
- if close_fobj:
- fobj.close()
-
-
-@with_logging
-def archive_as_headerlet(filename, hdrname, sciext='SCI',
- wcsname=None, wcskey=None, destim=None,
- sipname=None, npolfile=None, d2imfile=None,
- author=None, descrip=None, history=None,
- nmatch=None, catalog=None,
- logging=False, logmode='w'):
- """
-    Save a WCS as a headerlet extension of the science file.
-
-    This function will create a headerlet and attach it as a new extension to
-    the science image (if a headerlet with this hdrname has not already been
-    archived there).
-
-    Either wcsname or wcskey must be provided; if both are given, they must match a valid WCS.
- Updates wcscorr if necessary.
-
- Parameters
- ----------
- filename: string or HDUList
- Either a filename or PyFITS HDUList object for the input science file
- An input filename (str) will be expanded as necessary to interpret
- any environmental variables included in the filename.
- hdrname: string
- Unique name for this headerlet, stored as HDRNAME keyword
- sciext: string
- name (EXTNAME) of extension that contains WCS to be saved
- wcsname: string
- name of WCS to be archived, if " ": stop
- wcskey: one of A...Z or " " or "PRIMARY"
- if " " or "PRIMARY" - archive the primary WCS
- destim: string
- DESTIM keyword
-        if None, use ROOTNAME or science file name
- sipname: string or None (default)
- Name of unique file where the polynomial distortion coefficients were
- read from. If None, the behavior is:
- The code looks for a keyword 'SIPNAME' in the science header
- If not found, for HST it defaults to 'IDCTAB'
- If there is no SIP model the value is 'NOMODEL'
- If there is a SIP model but no SIPNAME, it is set to 'UNKNOWN'
- npolfile: string or None (default)
- Name of a unique file where the non-polynomial distortion was stored.
- If None:
- The code looks for 'NPOLFILE' in science header.
- If 'NPOLFILE' was not found and there is no npol model, it is set to 'NOMODEL'
- If npol model exists, it is set to 'UNKNOWN'
- d2imfile: string
- Name of a unique file where the detector to image correction was
- stored. If None:
- The code looks for 'D2IMFILE' in the science header.
- If 'D2IMFILE' is not found and there is no d2im correction,
- it is set to 'NOMODEL'
- If d2im correction exists, but 'D2IMFILE' is missing from science
- header, it is set to 'UNKNOWN'
- author: string
- Name of user who created the headerlet, added as 'AUTHOR' keyword
- to headerlet PRIMARY header
- descrip: string
- Short description of the solution provided by the headerlet
- This description will be added as the single 'DESCRIP' keyword
- to the headerlet PRIMARY header
- history: filename, string or list of strings
- Long (possibly multi-line) description of the solution provided
- by the headerlet. These comments will be added as 'HISTORY' cards
- to the headerlet PRIMARY header
- If filename is specified, it will format and attach all text from
- that file as the history.
- logging: boolean
-        enable file logging
- logmode: 'w' or 'a'
- log file open mode
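-
-    Examples
-    --------
-    An illustrative call archiving the current primary WCS as a headerlet
-    extension (names are hypothetical)::
-
-        archive_as_headerlet('j94f05bgq_flt.fits', 'TWEAK_1',
-                             wcskey='PRIMARY')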
- """
-
- fobj, fname, close_fobj = parse_filename(filename, mode='update')
-
- if wcsname in [None, ' ', '', 'INDEF'] and wcskey is None:
- message = """
-            No valid WCS found in %s.
- A valid value for either "wcsname" or "wcskey"
- needs to be specified.
- """ % fname
- if close_fobj:
- fobj.close()
- logger.critical(message)
- raise ValueError
-
- # Translate 'wcskey' value for PRIMARY WCS to valid altwcs value of ' '
- if wcskey == 'PRIMARY':
- wcskey = ' '
- wcskey = wcskey.upper()
-
- numhlt = countExtn(fobj, 'HDRLET')
-
- if wcsname is None:
- scihdr = fobj[sciext, 1].header
- wcsname = scihdr['wcsname'+wcskey]
-
- if hdrname in [None, ' ', '']:
- hdrname = wcsname
-
- # Check to see whether or not a HeaderletHDU with this hdrname already
- # exists
- hdrnames = get_headerlet_kw_names(fobj)
- if hdrname not in hdrnames:
- hdrletobj = create_headerlet(fobj, sciext=sciext,
- wcsname=wcsname, wcskey=wcskey,
- hdrname=hdrname,
- sipname=sipname, npolfile=npolfile,
- d2imfile=d2imfile, author=author,
- descrip=descrip, history=history,
- nmatch=nmatch, catalog=catalog,
- logging=False)
- hlt_hdu = HeaderletHDU.fromheaderlet(hdrletobj)
-
- if destim is not None:
- hlt_hdu[0].header['destim'] = destim
-
- fobj.append(hlt_hdu)
-
- utils.updateNEXTENDKw(fobj)
- fobj.flush()
- else:
- message = """
- Headerlet with hdrname %s already archived for WCS %s
-    No new headerlet appended to %s.
- """ % (hdrname, wcsname, fname)
- logger.critical(message)
-
- if close_fobj:
- fobj.close()
-
-#### Headerlet Class definitions
-class Headerlet(fits.HDUList):
- """
- A Headerlet class
- Ref: http://mediawiki.stsci.edu/mediawiki/index.php/Telescopedia:Headerlets
- """
-
- def __init__(self, hdus=[], file=None, logging=False, logmode='w'):
- """
- Parameters
- ----------
- hdus : list
- List of HDUs to be used to create the headerlet object itself
-        file: file-like object, optional
-            File-like object from which HDUs should be read
- logging: boolean
- enable file logging
- logmode: 'w' or 'a'
-            for internal use only; indicates whether the log file
-            should be opened in append ('a') or write ('w') mode
- """
- self.logging = logging
- init_logging('class Headerlet', level=logging, mode=logmode)
-
- super(Headerlet, self).__init__(hdus, file=file)
-
- def init_attrs(self):
- self.fname = self.filename()
- self.hdrname = self[0].header["HDRNAME"]
- self.wcsname = self[0].header["WCSNAME"]
- self.upwcsver = self[0].header.get("UPWCSVER", "")
- self.pywcsver = self[0].header.get("PYWCSVER", "")
- self.destim = self[0].header["DESTIM"]
- self.sipname = self[0].header["SIPNAME"]
- self.idctab = self[0].header["IDCTAB"]
- self.npolfile = self[0].header["NPOLFILE"]
- self.d2imfile = self[0].header["D2IMFILE"]
- self.distname = self[0].header["DISTNAME"]
-
- try:
- self.vafactor = self[("SIPWCS", 1)].header.get("VAFACTOR", 1) #None instead of 1?
- except (IndexError, KeyError):
- self.vafactor = self[0].header.get("VAFACTOR", 1) #None instead of 1?
- self.author = self[0].header["AUTHOR"]
- self.descrip = self[0].header["DESCRIP"]
-
- self.fit_kws = ['HDRNAME', 'NMATCH', 'CATALOG']
- self.history = ''
- # header['HISTORY'] returns an iterable of all HISTORY values
- if 'HISTORY' in self[0].header:
- for hist in self[0].header['HISTORY']:
- self.history += hist + '\n'
-
- self.d2imerr = 0
- self.axiscorr = 1
-
- # Overridden to support the Headerlet logging features
- @classmethod
- def fromfile(cls, fileobj, mode='readonly', memmap=False,
- save_backup=False, logging=False, logmode='w', **kwargs):
- hlet = super(cls, cls).fromfile(fileobj, mode, memmap, save_backup,
- **kwargs)
- if len(hlet) > 0:
- hlet.init_attrs()
- hlet.logging = logging
- init_logging('class Headerlet', level=logging, mode=logmode)
- return hlet
-
- @classmethod
- def fromstring(cls, data, **kwargs):
-        # Pull the logging options out of kwargs before delegating to the parent
-        logging = kwargs.pop('logging', False)
-        logmode = kwargs.pop('logmode', 'w')
-        hlet = super(cls, cls).fromstring(data, **kwargs)
-        hlet.logging = logging
-        init_logging('class Headerlet', level=logging, mode=logmode)
- return hlet
-
- def apply_as_primary(self, fobj, attach=True, archive=True, force=False):
- """
- Copy this headerlet as a primary WCS to fobj
-
- Parameters
- ----------
- fobj: string, HDUList
- science file to which the headerlet should be applied
- attach: boolean
- flag indicating if the headerlet should be attached as a
- HeaderletHDU to fobj. If True checks that HDRNAME is unique
- in the fobj and stops if not.
- archive: boolean (default is True)
- When the distortion model in the headerlet is the same as the
- distortion model of the science file, this flag indicates if
- the primary WCS should be saved as an alternate and a headerlet
- extension.
- When the distortion models do not match this flag indicates if
- the current primary and alternate WCSs should be archived as
- headerlet extensions and alternate WCS.
- force: boolean (default is False)
- When the distortion models of the headerlet and the primary do
- not match, and archive is False this flag forces an update
- of the primary
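-
-        Examples
-        --------
-        A minimal sketch; the file names are illustrative placeholders:
-
-        >>> from stwcs.wcsutil.headerlet import Headerlet
-        >>> hlet = Headerlet.fromfile('new_solution_hlet.fits')
-        >>> hlet.apply_as_primary('image_flt.fits', attach=True, archive=True)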
- """
- self.hverify()
- fobj, fname, close_dest = parse_filename(fobj, mode='update')
- if not self.verify_dest(fobj, fname):
- if close_dest:
- fobj.close()
- raise ValueError("Destination name does not match headerlet"
- "Observation %s cannot be updated with headerlet %s" % (fname, self.hdrname))
-
- # Check to see whether the distortion model in the destination
- # matches the distortion model in the headerlet being applied
-
- dname = self.get_destination_model(fobj)
- dist_models_equal = self.equal_distmodel(dname)
- if not dist_models_equal and not force:
- raise ValueError("Distortion models do not match"
- " To overwrite the distortion model, set force=True")
-
- orig_hlt_hdu = None
- numhlt = countExtn(fobj, 'HDRLET')
- hdrlet_extnames = get_headerlet_kw_names(fobj)
-
- # Insure that WCSCORR table has been created with all original
- # WCS's recorded prior to adding the headerlet WCS
- wcscorr.init_wcscorr(fobj)
-
-
- ### start archive
- # If archive has been specified
- # regardless of whether or not the distortion models are equal...
-
- numsip = countExtn(self, 'SIPWCS')
- sciext_list = []
- alt_hlethdu = []
- for i in range(1, numsip+1):
- sipheader = self[('SIPWCS', i)].header
- sciext_list.append((sipheader['TG_ENAME'], sipheader['TG_EVER']))
- target_ext = sciext_list[0]
- if archive:
- if 'wcsname' in fobj[target_ext].header:
- hdrname = fobj[target_ext].header['WCSNAME']
- wcsname = hdrname
- else:
- hdrname = fobj[0].header['ROOTNAME'] + '_orig'
- wcsname = None
- if hdrname not in hdrlet_extnames:
- # - if WCS has not been saved, write out WCS as headerlet extension
- # Create a headerlet for the original Primary WCS data in the file,
- # create an HDU from the original headerlet, and append it to
- # the file
- orig_hlt = create_headerlet(fobj, sciext=sciext_list, #[target_ext],
- wcsname=wcsname,
- hdrname=hdrname,
- logging=self.logging)
- orig_hlt_hdu = HeaderletHDU.fromheaderlet(orig_hlt)
- numhlt += 1
- orig_hlt_hdu.header['EXTVER'] = numhlt
- logger.info("Created headerlet %s to be attached to file" % hdrname)
- else:
- logger.info("Headerlet with name %s is already attached" % hdrname)
-
-
- if dist_models_equal:
- # Use the WCSNAME to determine whether or not to archive
- # Primary WCS as altwcs
- # wcsname = hwcs.wcs.name
- scihdr = fobj[target_ext].header
- if 'hdrname' in scihdr:
- priwcs_name = scihdr['hdrname']
- else:
- if 'wcsname' in scihdr:
- priwcs_name = scihdr['wcsname']
- else:
- if 'idctab' in scihdr:
- priwcs_name = ''.join(['IDC_',
- utils.extract_rootname(scihdr['idctab'],
- suffix='_idc')])
- else:
- priwcs_name = 'UNKNOWN'
- nextkey = altwcs.next_wcskey(fobj, ext=target_ext)
- altwcs.archiveWCS(fobj, ext=sciext_list, wcskey=nextkey,
- wcsname=priwcs_name)
- else:
-
-            # Use the {wcskey: wcsname} mapping so each alternate WCS is
-            # archived under its own key
-            for hkey, hname in altwcs.wcsnames(fobj, ext=target_ext).items():
-                if hname != 'OPUS' and hname not in hdrlet_extnames:
-                    # get HeaderletHDU for alternate WCS as well
-                    alt_hlet = create_headerlet(fobj, sciext=sciext_list,
-                                                wcsname=hname, wcskey=hkey, hdrname=hname, sipname=None,
- hdrname=hname, sipname=None,
- npolfile=None, d2imfile=None,
- author=None, descrip=None, history=None,
- logging=self.logging)
- numhlt += 1
- alt_hlet_hdu = HeaderletHDU.fromheaderlet(alt_hlet)
- alt_hlet_hdu.header['EXTVER'] = numhlt
- alt_hlethdu.append(alt_hlet_hdu)
- hdrlet_extnames.append(hname)
-
- self._del_dest_WCS_ext(fobj)
- for i in range(1, numsip+1):
- target_ext = sciext_list[i-1]
- self._del_dest_WCS(fobj, target_ext)
- sipwcs = HSTWCS(self, ('SIPWCS', i))
- idckw = sipwcs._idc2hdr()
- priwcs = sipwcs.to_fits(relax=True)
- numnpol = 1
- numd2im = 1
- if sipwcs.wcs.has_cd():
- priwcs[0].header = altwcs.pc2cd(priwcs[0].header)
- priwcs[0].header.extend(idckw)
- if 'crder1' in sipheader:
- for card in sipheader['crder*'].cards:
- priwcs[0].header.set(card.keyword, card.value, card.comment,
- after='WCSNAME')
- # Update WCS with HDRNAME as well
-
- for kw in ['SIMPLE', 'BITPIX', 'NAXIS', 'EXTEND']:
- try:
- priwcs[0].header.remove(kw)
- except ValueError:
- pass
-
- priwcs[0].header.set('WCSNAME', self[0].header['WCSNAME'], "")
- priwcs[0].header.set('WCSAXES', self[('SIPWCS', i)].header['WCSAXES'], "")
- priwcs[0].header.set('HDRNAME', self[0].header['HDRNAME'], "")
- if sipwcs.det2im1 or sipwcs.det2im2:
- try:
- d2imerr = self[('SIPWCS', i)].header['D2IMERR*']
- priwcs[0].header.extend(d2imerr)
- except KeyError:
- pass
- try:
- priwcs[0].header.append(self[('SIPWCS', i)].header.cards['D2IMEXT'])
- except KeyError:
- pass
- if 'D2IM1.EXTVER' in priwcs[0].header:
- priwcs[0].header['D2IM1.EXTVER'] = self[('SIPWCS', i)].header['D2IM1.EXTVER']
- priwcs[('D2IMARR', 1)].header['EXTVER'] = self[('SIPWCS', i)].header['D2IM1.EXTVER']
- if 'D2IM2.EXTVER' in priwcs[0].header:
- priwcs[0].header['D2IM2.EXTVER'] = self[('SIPWCS', i)].header['D2IM2.EXTVER']
- priwcs[('D2IMARR', 2)].header['EXTVER'] = self[('SIPWCS', i)].header['D2IM2.EXTVER']
- # D2IM1 will NOT exist for WFPC2 data...
- if 'D2IM1.EXTVER' in priwcs[0].header:
- # only set number of D2IM extensions to 2 if D2IM1 exists
- numd2im = 2
-
- if sipwcs.cpdis1 or sipwcs.cpdis2:
- try:
- cperr = self[('SIPWCS', i)].header['CPERR*']
- priwcs[0].header.extend(cperr)
- except KeyError:
- pass
- try:
- priwcs[0].header.append(self[('SIPWCS', i)].header.cards['NPOLEXT'])
- except KeyError:
- pass
- if 'DP1.EXTVER' in priwcs[0].header:
- priwcs[0].header['DP1.EXTVER'] = self[('SIPWCS', i)].header['DP1.EXTVER']
- priwcs[('WCSDVARR', 1)].header['EXTVER'] = self[('SIPWCS', i)].header['DP1.EXTVER']
- if 'DP2.EXTVER' in priwcs[0].header:
- priwcs[0].header['DP2.EXTVER'] = self[('SIPWCS', i)].header['DP2.EXTVER']
- priwcs[('WCSDVARR', 2)].header['EXTVER'] = self[('SIPWCS', i)].header['DP2.EXTVER']
- numnpol = 2
-
- fobj[target_ext].header.extend(priwcs[0].header)
- if sipwcs.cpdis1:
- whdu = priwcs[('WCSDVARR', (i-1)*numnpol+1)].copy()
- whdu.update_ext_version(self[('SIPWCS', i)].header['DP1.EXTVER'])
- fobj.append(whdu)
- if sipwcs.cpdis2:
- whdu = priwcs[('WCSDVARR', i*numnpol)].copy()
- whdu.update_ext_version(self[('SIPWCS', i)].header['DP2.EXTVER'])
- fobj.append(whdu)
- if sipwcs.det2im1: #or sipwcs.det2im2:
- whdu = priwcs[('D2IMARR', (i-1)*numd2im+1)].copy()
- whdu.update_ext_version(self[('SIPWCS', i)].header['D2IM1.EXTVER'])
- fobj.append(whdu)
- if sipwcs.det2im2:
- whdu = priwcs[('D2IMARR', i*numd2im)].copy()
- whdu.update_ext_version(self[('SIPWCS', i)].header['D2IM2.EXTVER'])
- fobj.append(whdu)
-
- update_versions(self[0].header, fobj[0].header)
- refs = update_ref_files(self[0].header, fobj[0].header)
- # Update the WCSCORR table with new rows from the headerlet's WCSs
- wcscorr.update_wcscorr(fobj, self, 'SIPWCS')
-
- # Append the original headerlet
- if archive and orig_hlt_hdu:
- fobj.append(orig_hlt_hdu)
- # Append any alternate WCS Headerlets
- if len(alt_hlethdu) > 0:
- for ahdu in alt_hlethdu:
- fobj.append(ahdu)
- if attach:
- # Finally, append an HDU for this headerlet
- self.attach_to_file(fobj)
- utils.updateNEXTENDKw(fobj)
- if close_dest:
- fobj.close()
-
-
- def apply_as_alternate(self, fobj, attach=True, wcskey=None, wcsname=None):
- """
- Copy this headerlet as an alternate WCS to fobj
-
- Parameters
- ----------
- fobj: string, HDUList
- science file/HDUList to which the headerlet should be applied
- attach: boolean
- flag indicating if the headerlet should be attached as a
- HeaderletHDU to fobj. If True checks that HDRNAME is unique
- in the fobj and stops if not.
- wcskey: string
- Key value (A-Z, except O) for this alternate WCS
- If None, the next available key will be used
- wcsname: string
- Name to be assigned to this alternate WCS
- WCSNAME is a required keyword in a Headerlet but this allows the
- user to change it as desired.
-
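-        Examples
-        --------
-        A minimal sketch; the file name, WCS key and WCS name are illustrative
-        placeholders:
-
-        >>> hlet = Headerlet.fromfile('new_solution_hlet.fits')
-        >>> hlet.apply_as_alternate('image_flt.fits', wcskey='K', wcsname='MY_ALT_WCS')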
- """
- self.hverify()
- fobj, fname, close_dest = parse_filename(fobj, mode='update')
- if not self.verify_dest(fobj, fname):
- if close_dest:
- fobj.close()
- raise ValueError("Destination name does not match headerlet"
- "Observation %s cannot be updated with headerlet %s" % (fname, self.hdrname))
-
- # Verify whether this headerlet has the same distortion
- #found in the image being updated
- dname = self.get_destination_model(fobj)
- dist_models_equal = self.equal_distmodel(dname)
- if not dist_models_equal:
- raise ValueError("Distortion models do not match \n"
- "Headerlet: %s \n"
- "Destination file: %s\n"
- "attach_to_file() can be used to append this headerlet" %(self.distname, dname))
-
- # Insure that WCSCORR table has been created with all original
- # WCS's recorded prior to adding the headerlet WCS
- wcscorr.init_wcscorr(fobj)
-
- # determine value of WCSNAME to be used
- if wcsname is not None:
- wname = wcsname
- else:
- wname = self[0].header['WCSNAME']
- tg_ename = self[('SIPWCS', 1)].header['TG_ENAME']
- tg_ever = self[('SIPWCS', 1)].header['TG_EVER']
- # determine what alternate WCS this headerlet will be assigned to
- if wcskey is None:
- wkey = altwcs.next_wcskey(fobj[(tg_ename, tg_ever)].header)
- else:
- wcskey = wcskey.upper()
- available_keys = altwcs.available_wcskeys(fobj[(tg_ename, tg_ever)].header)
- if wcskey in available_keys:
- wkey = wcskey
- else:
- mess = "Observation %s already contains alternate WCS with key %s" % (fname, wcskey)
- logger.critical(mess)
- if close_dest:
- fobj.close()
- raise ValueError(mess)
- numsip = countExtn(self, 'SIPWCS')
- for idx in range(1, numsip + 1):
- siphdr = self[('SIPWCS', idx)].header
- tg_ext = (siphdr['TG_ENAME'], siphdr['TG_EVER'])
-
- fhdr = fobj[tg_ext].header
- hwcs = pywcs.WCS(siphdr, self)
- hwcs_header = hwcs.to_header(key=wkey)
- _idc2hdr(siphdr, fhdr, towkey=wkey)
- if hwcs.wcs.has_cd():
- hwcs_header = altwcs.pc2cd(hwcs_header, key=wkey)
-
- fhdr.extend(hwcs_header)
- fhdr['WCSNAME' + wkey] = wname
- # also update with HDRNAME (a non-WCS-standard kw)
- for kw in self.fit_kws:
- #fhdr.insert(wind, pyfits.Card(kw + wkey,
- # self[0].header[kw]))
- fhdr.append(fits.Card(kw + wkey, self[0].header[kw]))
- # Update the WCSCORR table with new rows from the headerlet's WCSs
- wcscorr.update_wcscorr(fobj, self, 'SIPWCS')
-
- if attach:
- self.attach_to_file(fobj)
- utils.updateNEXTENDKw(fobj)
-
- if close_dest:
- fobj.close()
-
- def attach_to_file(self, fobj, archive=False):
- """
- Attach Headerlet as an HeaderletHDU to a science file
-
- Parameters
- ----------
- fobj: string, HDUList
- science file/HDUList to which the headerlet should be applied
-        archive: boolean
-            Specifies whether or not to update the WCSCORR table when attaching
-
- Notes
- -----
- The algorithm used by this method:
- - verify headerlet can be applied to this file (based on DESTIM)
- - verify that HDRNAME is unique for this file
- - attach as HeaderletHDU to fobj
-
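-        Examples
-        --------
-        A minimal sketch; assuming ``hlet`` is a Headerlet instance and the
-        file name is an illustrative placeholder:
-
-        >>> hlet.attach_to_file('image_flt.fits')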
- """
- self.hverify()
- fobj, fname, close_dest = parse_filename(fobj, mode='update')
- destver = self.verify_dest(fobj, fname)
- hdrver = self.verify_hdrname(fobj)
- if destver and hdrver:
-
- numhlt = countExtn(fobj, 'HDRLET')
- new_hlt = HeaderletHDU.fromheaderlet(self)
- new_hlt.header['extver'] = numhlt + 1
- fobj.append(new_hlt)
- utils.updateNEXTENDKw(fobj)
- else:
- message = "Headerlet %s cannot be attached to" % (self.hdrname)
- message += "observation %s" % (fname)
- if not destver:
- message += " * Image %s keyword ROOTNAME not equal to " % (fname)
- message += " DESTIM = '%s'\n" % (self.destim)
- if not hdrver:
- message += " * Image %s already has headerlet " % (fname)
- message += "with HDRNAME='%s'\n" % (self.hdrname)
- logger.critical(message)
- if close_dest:
- fobj.close()
-
- def info(self, columns=None, pad=2, maxwidth=None,
- output=None, clobber=True, quiet=False):
- """
- Prints a summary of this headerlet
- The summary includes:
- HDRNAME WCSNAME DISTNAME SIPNAME NPOLFILE D2IMFILE
-
- Parameters
- ----------
- columns: list
- List of headerlet PRIMARY header keywords to report in summary
- By default (set to None), it will use the default set of keywords
- defined as the global list DEFAULT_SUMMARY_COLS
- pad: int
- Number of padding spaces to put between printed columns
- [Default: 2]
- maxwidth: int
-            Maximum column width (not counting padding) for any column in summary
- By default (set to None), each column's full width will be used
- output: string (optional)
- Name of optional output file to record summary. This filename
- can contain environment variables.
- [Default: None]
- clobber: bool
- If True, will overwrite any previous output file of same name
- quiet: bool
- If True, will NOT report info to STDOUT
-
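-        Examples
-        --------
-        A minimal sketch; assuming ``hlet`` is a Headerlet instance, print the
-        summary to STDOUT and also record it in a (hypothetical) output file:
-
-        >>> hlet.info(output='hlet_summary.txt')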
- """
- summary_cols, summary_dict = self.summary(columns=columns)
- print_summary(summary_cols, summary_dict, pad=pad, maxwidth=maxwidth,
- idcol=None, output=output, clobber=clobber, quiet=quiet)
-
- def summary(self, columns=None):
- """
- Returns a summary of this headerlet as a dictionary
-
- The summary includes a summary of the distortion model as :
- HDRNAME WCSNAME DISTNAME SIPNAME NPOLFILE D2IMFILE
-
- Parameters
- ----------
- columns: list
- List of headerlet PRIMARY header keywords to report in summary
-            By default (set to None), it will use the default set of keywords
- defined as the global list DEFAULT_SUMMARY_COLS
-
-        Returns
-        -------
-        summary_cols: list
-            List of keywords included in the summary
-        summary: dict
-            Dictionary of values for the summary
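-
-        Examples
-        --------
-        A minimal sketch; assuming ``hlet`` is a Headerlet instance:
-
-        >>> cols, vals = hlet.summary()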
- """
- if columns is None:
- summary_cols = DEFAULT_SUMMARY_COLS
- else:
- summary_cols = columns
-
- # Initialize summary dict based on requested columns
- summary = {}
- for kw in summary_cols:
- summary[kw] = copy.deepcopy(COLUMN_DICT)
-
- # Populate the summary with headerlet values
- for kw in summary_cols:
- if kw in self[0].header:
- val = self[0].header[kw]
- else:
- val = 'INDEF'
- summary[kw]['vals'].append(val)
- summary[kw]['width'].append(max(len(val), len(kw)))
-
- return summary_cols, summary
-
- def hverify(self):
- """
- Verify the headerlet file is a valid fits file and has
- the required Primary Header keywords
- """
- self.verify()
- header = self[0].header
- assert('DESTIM' in header and header['DESTIM'].strip())
- assert('HDRNAME' in header and header['HDRNAME'].strip())
- assert('UPWCSVER' in header)
-
- def verify_hdrname(self, dest):
- """
- Verifies that the headerlet can be applied to the observation
-
- Reports whether or not this file already has a headerlet with this
- HDRNAME.
- """
- unique = verify_hdrname_is_unique(dest, self.hdrname)
- logger.debug("verify_hdrname() returned %s"%unique)
- return unique
-
- def get_destination_model(self, dest):
- """
- Verifies that the headerlet can be applied to the observation
-
- Determines whether or not the file specifies the same distortion
- model/reference files.
- """
- destim_opened = False
- if not isinstance(dest, fits.HDUList):
- destim = fits.open(dest)
- destim_opened = True
- else:
- destim = dest
- dname = destim[0].header['DISTNAME'] if 'distname' in destim[0].header \
- else self.build_distname(dest)
- if destim_opened:
- destim.close()
- return dname
-
-    def equal_distmodel(self, dmodel):
-        """Check whether `dmodel` matches the DISTNAME of this headerlet."""
-        if dmodel != self[0].header['DISTNAME']:
- if self.logging:
- message = """
- Distortion model in headerlet not the same as destination model
- Headerlet model : %s
- Destination model: %s
- """ % (self[0].header['DISTNAME'], dmodel)
- logger.critical(message)
- return False
- else:
- return True
-
- def verify_dest(self, dest, fname):
- """
-        Verifies that the headerlet can be applied to the observation
-
- DESTIM in the primary header of the headerlet must match ROOTNAME
- of the science file (or the name of the destination file)
- """
- try:
- if not isinstance(dest, fits.HDUList):
- droot = fits.getval(dest, 'ROOTNAME')
- else:
- droot = dest[0].header['ROOTNAME']
- except KeyError:
- logger.debug("Keyword 'ROOTNAME' not found in destination file")
- droot = dest.split('.fits')[0]
- if droot == self.destim:
- logger.debug("verify_destim() returned True")
- return True
- else:
- logger.debug("verify_destim() returned False")
- logger.critical("Destination name does not match headerlet. "
- "Observation %s cannot be updated with headerlet %s" % (fname, self.hdrname))
- return False
-
- def build_distname(self, dest):
- """
- Builds the DISTNAME for dest based on reference file names.
- """
-
- try:
- npolfile = dest[0].header['NPOLFILE']
- except KeyError:
- npolfile = None
- try:
- d2imfile = dest[0].header['D2IMFILE']
- except KeyError:
- d2imfile = None
-
- sipname, idctab = utils.build_sipname(dest, dest, None)
- npolname, npolfile = utils.build_npolname(dest, npolfile)
- d2imname, d2imfile = utils.build_d2imname(dest, d2imfile)
- dname = utils.build_distname(sipname,npolname,d2imname)
- return dname
-
- def tofile(self, fname, destim=None, hdrname=None, clobber=False):
- """
- Write this headerlet to a file
-
- Parameters
- ----------
- fname: string
- file name
- destim: string (optional)
- provide a value for DESTIM keyword
- hdrname: string (optional)
- provide a value for HDRNAME keyword
- clobber: boolean
-            a flag which allows overwriting an existing file
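-
-        Examples
-        --------
-        A minimal sketch; the output file name is an illustrative placeholder:
-
-        >>> hlet.tofile('my_solution_hlet.fits', clobber=True)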
- """
- if not destim or not hdrname:
- self.hverify()
- self.writeto(fname, clobber=clobber)
-
- def _del_dest_WCS(self, dest, ext=None):
- """
- Delete the WCS of a science file extension
- """
-
- logger.info("Deleting all WCSs of file %s" % dest.filename())
- numext = len(dest)
-
- if ext:
- fext = dest[ext]
- self._remove_d2im(fext)
- self._remove_sip(fext)
- self._remove_lut(fext)
- self._remove_primary_WCS(fext)
- self._remove_idc_coeffs(fext)
- self._remove_fit_values(fext)
- else:
- for idx in range(numext):
- # Only delete WCS from extensions which may have WCS keywords
- if ('XTENSION' in dest[idx].header and
- dest[idx].header['XTENSION'] == 'IMAGE'):
- self._remove_d2im(dest[idx])
- self._remove_sip(dest[idx])
- self._remove_lut(dest[idx])
- self._remove_primary_WCS(dest[idx])
- self._remove_idc_coeffs(dest[idx])
- self._remove_fit_values(dest[idx])
- self._remove_ref_files(dest[0])
- """
- if not ext:
- self._remove_alt_WCS(dest, ext=range(numext))
- else:
- self._remove_alt_WCS(dest, ext=ext)
- """
-    def _del_dest_WCS_ext(self, dest):
-        """
-        Delete all WCSDVARR and D2IMARR extensions from a science file
-        """
-        numwdvarr = countExtn(dest, 'WCSDVARR')
- numd2im = countExtn(dest, 'D2IMARR')
- if numwdvarr > 0:
- for idx in range(1, numwdvarr + 1):
- del dest[('WCSDVARR', idx)]
- if numd2im > 0:
- for idx in range(1, numd2im + 1):
- del dest[('D2IMARR', idx)]
-
- def _remove_ref_files(self, phdu):
- """
- phdu: Primary HDU
- """
- refkw = ['IDCTAB', 'NPOLFILE', 'D2IMFILE', 'SIPNAME', 'DISTNAME']
- for kw in refkw:
- try:
- del phdu.header[kw]
- except KeyError:
- pass
-
- def _remove_fit_values(self, ext):
- """
-        Remove any existing astrometric fit values from a FITS extension
- """
-
- logger.debug("Removing astrometric fit values from (%s, %s)"%
- (ext.name, ext.ver))
- dkeys = altwcs.wcskeys(ext.header)
- if 'O' in dkeys: dkeys.remove('O') # Do not remove wcskey='O' values
- for fitkw in ['NMATCH', 'CATALOG']:
- for k in dkeys:
- fkw = (fitkw+k).rstrip()
- if fkw in ext.header:
- del ext.header[fkw]
-
- def _remove_sip(self, ext):
- """
- Remove the SIP distortion of a FITS extension
- """
-
- logger.debug("Removing SIP distortion from (%s, %s)"
- % (ext.name, ext.ver))
- for prefix in ['A', 'B', 'AP', 'BP']:
- try:
- order = ext.header[prefix + '_ORDER']
- del ext.header[prefix + '_ORDER']
- except KeyError:
- continue
- for i in range(order + 1):
- for j in range(order + 1):
- key = prefix + '_%d_%d' % (i, j)
- try:
- del ext.header[key]
- except KeyError:
- pass
- try:
- del ext.header['IDCTAB']
- except KeyError:
- pass
-
- def _remove_lut(self, ext):
- """
- Remove the Lookup Table distortion of a FITS extension
- """
-
- logger.debug("Removing LUT distortion from (%s, %s)"
- % (ext.name, ext.ver))
- try:
- cpdis = ext.header['CPDIS*']
- except KeyError:
- return
- try:
- for c in range(1, len(cpdis) + 1):
- del ext.header['DP%s*...' % c]
- del ext.header[cpdis.cards[c - 1].keyword]
- del ext.header['CPERR*']
- del ext.header['NPOLFILE']
- del ext.header['NPOLEXT']
- except KeyError:
- pass
-
- def _remove_d2im(self, ext):
- """
- Remove the Detector to Image correction of a FITS extension
- """
-
- logger.debug("Removing D2IM correction from (%s, %s)"
- % (ext.name, ext.ver))
- try:
- d2imdis = ext.header['D2IMDIS*']
- except KeyError:
- return
- try:
- for c in range(1, len(d2imdis) + 1):
- del ext.header['D2IM%s*...' % c]
- del ext.header[d2imdis.cards[c - 1].keyword]
- del ext.header['D2IMERR*']
- del ext.header['D2IMFILE']
- del ext.header['D2IMEXT']
- except KeyError:
- pass
-
- def _remove_alt_WCS(self, dest, ext):
- """
- Remove Alternate WCSs of a FITS extension.
- A WCS with wcskey 'O' is never deleted.
- """
- dkeys = altwcs.wcskeys(dest[('SCI', 1)].header)
- for val in ['O', '', ' ']:
- if val in dkeys:
- dkeys.remove(val) # Never delete WCS with wcskey='O'
-
- logger.debug("Removing alternate WCSs with keys %s from %s"
- % (dkeys, dest.filename()))
- for k in dkeys:
- altwcs.deleteWCS(dest, ext=ext, wcskey=k)
-
- def _remove_primary_WCS(self, ext):
- """
- Remove the primary WCS of a FITS extension
- """
-
- logger.debug("Removing Primary WCS from (%s, %s)"
- % (ext.name, ext.ver))
- naxis = ext.header['NAXIS']
- for key in basic_wcs:
- for i in range(1, naxis + 1):
- try:
- del ext.header[key + str(i)]
- except KeyError:
- pass
- try:
- del ext.header['WCSAXES']
- except KeyError:
- pass
- try:
- del ext.header['WCSNAME']
- except KeyError:
- pass
-
- def _remove_idc_coeffs(self, ext):
- """
- Remove IDC coefficients of a FITS extension
- """
-
- logger.debug("Removing IDC coefficient from (%s, %s)"
- % (ext.name, ext.ver))
- coeffs = ['OCX10', 'OCX11', 'OCY10', 'OCY11', 'IDCSCALE']
- for k in coeffs:
- try:
- del ext.header[k]
- except KeyError:
- pass
-
-@with_logging
-def _idc2hdr(fromhdr, tohdr, towkey=' '):
- """
- Copy the IDC (HST specific) keywords from one header to another
-
- """
- # save some of the idc coefficients
- coeffs = ['OCX10', 'OCX11', 'OCY10', 'OCY11', 'IDCSCALE']
- for c in coeffs:
- try:
- tohdr[c+towkey] = fromhdr[c]
- logger.debug("Copied %s to header")
- except KeyError:
- continue
-
-
-def get_extname_extver_list(fobj, sciext):
- """
- Create a list of (EXTNAME, EXTVER) tuples
-
- Based on sciext keyword (see docstring for create_headerlet)
-    walk through the file and convert extensions in `sciext` to
- valid (EXTNAME, EXTVER) tuples.
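-
-    Examples
-    --------
-    A minimal sketch; the result shown assumes a hypothetical file with two
-    SCI extensions:
-
-    >>> get_extname_extver_list(fobj, 'SCI')
-    [('SCI', 1), ('SCI', 2)]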
- """
- extlist = []
- if isinstance(sciext, int):
- if sciext == 0:
- extname = 'PRIMARY'
- extver = 1
- else:
- try:
- extname = fobj[sciext].header['EXTNAME']
- except KeyError:
- extname = ""
- try:
- extver = fobj[sciext].header['EXTVER']
- except KeyError:
- extver = 1
- extlist.append((extname, extver))
- elif isinstance(sciext, str):
- if sciext == 'PRIMARY':
- extname = "PRIMARY"
- extver = 1
- extlist.append((extname, extver))
- else:
- for ext in fobj:
- try:
- extname = ext.header['EXTNAME']
- except KeyError:
- continue
- if extname.upper() == sciext.upper():
- try:
- extver = ext.header['EXTVER']
- except KeyError:
- extver = 1
- extlist.append((extname, extver))
- elif isinstance(sciext, list):
- if isinstance(sciext[0], int):
- for i in sciext:
- try:
- extname = fobj[i].header['EXTNAME']
- except KeyError:
- if i == 0:
- extname = "PRIMARY"
- extver = 1
- else:
- extname = ""
- try:
- extver = fobj[i].header['EXTVER']
- except KeyError:
- extver = 1
- extlist.append((extname, extver))
- else:
- extlist = sciext[:]
- else:
- errstr = "Expected sciext to be a list of FITS extensions with science data\n"+\
- " a valid EXTNAME string, or an integer."
- logger.critical(errstr)
-        raise ValueError(errstr)
- return extlist
-
-
-class HeaderletHDU(fits.hdu.nonstandard.FitsHDU):
- """
- A non-standard extension HDU for encapsulating Headerlets in a file. These
- HDUs have an extension type of HDRLET and their EXTNAME is derived from the
- Headerlet's HDRNAME.
-
-    The data itself is a FITS file embedded within the HDU data. The file name
-    is derived from the HDRNAME keyword, and should be in the form
-    `<HDRNAME>_hdr.fits`. If the COMPRESS keyword evaluates to `True`, the
-    embedded file is compressed with gzip compression.
-
-    The structure of this HDU is the same as that of the 'FITS' extension type
-    proposed here:
-    http://listmgr.cv.nrao.edu/pipermail/fitsbits/2002-April/thread.html
-
- The Headerlet contained in the HDU's data can be accessed by the
- `headerlet` attribute.
- """
-
- _extension = 'HDRLET'
-
- @lazyproperty
- def headerlet(self):
- """Return the encapsulated headerlet as a Headerlet object.
-
- This is similar to the hdulist property inherited from the FitsHDU
- class, though the hdulist property returns a normal HDUList object.
- """
-
- return Headerlet(self.hdulist)
-
- @classmethod
- def fromheaderlet(cls, headerlet, compress=False):
- """
- Creates a new HeaderletHDU from a given Headerlet object.
-
- Parameters
- ----------
- headerlet : `Headerlet`
- A valid Headerlet object.
-
- compress : bool, optional
- Gzip compress the headerlet data.
-
- Returns
- -------
- hlet : `HeaderletHDU`
- A `HeaderletHDU` object for the given `Headerlet` that can be
- attached as an extension to an existing `HDUList`.
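-
-        Examples
-        --------
-        A minimal sketch; assuming ``hlet`` is a valid Headerlet and ``fobj``
-        is an open, writable science HDUList:
-
-        >>> hlet_hdu = HeaderletHDU.fromheaderlet(hlet)
-        >>> fobj.append(hlet_hdu)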
- """
-
- # TODO: Perhaps check that the given object is in fact a valid
- # Headerlet
- hlet = cls.fromhdulist(headerlet, compress)
-
- # Add some more headerlet-specific keywords to the header
- phdu = headerlet[0]
-
- if 'SIPNAME' in phdu.header:
- sipname = phdu.header['SIPNAME']
- else:
- sipname = phdu.header['WCSNAME']
-
- hlet.header['HDRNAME'] = (phdu.header['HDRNAME'],
- phdu.header.comments['HDRNAME'])
- hlet.header['DATE'] = (phdu.header['DATE'],
- phdu.header.comments['DATE'])
- hlet.header['SIPNAME'] = (sipname, 'SIP distortion model name')
- hlet.header['WCSNAME'] = (phdu.header['WCSNAME'], 'WCS name')
- hlet.header['DISTNAME'] = (phdu.header['DISTNAME'],
- 'Distortion model name')
- hlet.header['NPOLFILE'] = (phdu.header['NPOLFILE'],
- phdu.header.comments['NPOLFILE'])
- hlet.header['D2IMFILE'] = (phdu.header['D2IMFILE'],
- phdu.header.comments['D2IMFILE'])
- hlet.header['EXTNAME'] = (cls._extension, 'Extension name')
-
- return hlet
-
-
-fits.register_hdu(HeaderletHDU)