summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/stwcs/wcsutil/headerlet.py400
1 files changed, 262 insertions, 138 deletions
diff --git a/lib/stwcs/wcsutil/headerlet.py b/lib/stwcs/wcsutil/headerlet.py
index b1b873b..838499e 100644
--- a/lib/stwcs/wcsutil/headerlet.py
+++ b/lib/stwcs/wcsutil/headerlet.py
@@ -15,6 +15,7 @@ import wcscorr
from hstwcs import HSTWCS
from mappings import basic_wcs
from stsci.tools.fileutil import countExtn
+from stsci.tools import fileutil as fu
module_logger = logging.getLogger('headerlet')
@@ -30,6 +31,17 @@ def setLogger(logger, level, mode='w'):
logger.addHandler(fh)
logger.setLevel(level)
+def hdrnames(fobj):
+ """
+ Returns a list of HDRNAME keywords from all HeaderletHDU
+ extensions in a science file.
+
+ Parameters
+ ----------
+ fobj: string, pyfits.HDUList
+ """
+
+
def isWCSIdentical(scifile, file2, verbose=False):
"""
Compares the WCS solution of 2 files.
@@ -129,31 +141,69 @@ def isWCSIdentical(scifile, file2, verbose=False):
return result
-# TODO: It would be logical for this to be part of the Headerlet class, perhaps
-# as a classmethod
-def createHeaderlet(fname, hdrname, destim=None, output=None, verbose=False, logmode='w'):
+def create_headerlet(fname, sciext=None, hdrname=None, destim=None, wcskey=" ", wcsname=None,
+ sipname=None, npolfile=None, d2imfile=None,
+ verbose=100, logmode='w'):
"""
- Create a headerlet from a science observation
-
+ Create a headerlet from a WCS in a science file
+ If both wcskey and wcsname are given they should match, if not
+ raise an Exception
+
Parameters
----------
- fname: string
- Name of file with science observation
+ fname: string or HDUList
+ science file
+ sciext: string or python list
+ Extension in which the science data is. The headerlet will be created
+ from these extensions.
+ If string - a valid EXTNAME is expected
+ If list - a list of FITS extension numbers or extension tuples ('SCI', 1)
+ is expected.
+ If None, loops over all extensions in the file, including the primary
hdrname: string
- Name for the headerlet, stored in the primary header of the headerlet
- destim: string
- Destination image, stored in the primary header of the headerlet.
- If None ROOTNAME is used of the science observation is used.
- ROOTNAME has precedence, destim is used for observations without
- ROOTNAME in the primary header
- output: string
- Save the headerlet to the given filename.
- verbose: False or a python logging level
- (one of 'INFO', 'DEBUG' logging levels)
- (an integer representing a logging level)
- logmode: 'w', 'a'
- used internally for controling access to the log file
+ value of HDRNAME keyword
+ Takes the value from the HDRNAME<wcskey> keyword, if not available from WCSNAME<wcskey>
+ It stops if neither is found in the science file and a value is not provided
+ destim: string or None
+ name of file this headerlet can be applied to
+ if None, use ROOTNAME keyword
+ wcskey: char (A...Z) or " " or None
+ a char representing an alternate WCS to be used for the headerlet
+ if " ", use the primary (default)
+ if None use wcsname
+ wcsname: string or None
+ if wcskey is None use wcsname specified here to choose an alternate WCS for the headerlet
+ sipname: string or None (default)
+ Name of unique file where the polynomial distortion coefficients were
+ read from. If None, the behavior is:
+ The code looks for a keyword 'SIPNAME' in the science header
+ If not found, for HST it defaults to 'IDCTAB'
+ If there is no SIP model the value is 'NOMODEL'
+ If there is a SIP model but no SIPNAME, it is set to 'UNKNOWN'
+ npolfile: string or None (default)
+ Name of a unique file where the non-polynomial distortion was stored.
+ If None:
+ The code looks for 'NPOLFILE' in science header.
+ If 'NPOLFILE' was not found and there is no npol model, it is set to 'NOMODEL'
+ If npol model exists, it is set to 'UNKNOWN'
+ d2imfile: string
+ Name of a unique file where the detector to image correction was
+ stored. If None:
+ The code looks for 'D2IMFILE' in the science header.
+ If 'D2IMFILE' is not found and there is no d2im correction,
+ it is set to 'NOMODEL'
+ If d2im correction exists, but 'D2IMFILE' is missing from science
+ header, it is set to 'UNKNOWN'
+ verbose: int
+ python logging level
+ logmode: 'w' or 'a'
+ log file open mode
+
+ Returns
+ -------
+ Headerlet object
"""
+
if verbose:
setLogger(module_logger, verbose, mode=logmode)
@@ -170,103 +220,159 @@ def createHeaderlet(fname, hdrname, destim=None, output=None, verbose=False, log
else:
fobj = fname
close_file = False
+
+ # get all required keywords
if destim is None:
try:
destim = fobj[0].header['ROOTNAME']
except KeyError:
- module_logger.exception('Required keyword "DESTIM" not found')
- print 'Please provide a value for the DESTIM keyword'
- raise
- if hdrname is None:
- module_logger.critical("Required keyword 'HDRNAME' not given")
- raise ValueError("Please provide a name for the headerlet, HDRNAME is "
- "a required parameter.")
-
+ destim = fname
+ module_logger.info('DESTIM not provided')
+ module_logger.info('Keyword "ROOTNAME" not found')
+ module_logger.info('Using file name as DESTIM')
+
+ if not hdrname:
+ # check if HDRNAME<wcskey> is in header
+ hdrname = "".join(["HDRNAME",wcskey.upper()])
+ try:
+ hdrname = fobj[1].header['HDRNAME']
+ except KeyError:
+ try:
+ hdrname = fobj[1].header['WCSNAME']
+ except KeyError as detail:
+ message = "Required keyword 'HDRNAME' not given"
+ module_logger.critical(message)
+ print message, detail
+
+ if not wcsname:
+ wname = "".join(["WCSNAME",wcskey.upper()])
+ try:
+ wcsname = fobj[1].header[wname]
+ except KeyError as detail:
+ message = "Missing required keyword 'WCSNAME'."
+ module_logger.critical(message)
+ print message, detail
+
+ if not sipname:
+ try:
+ sipname = fobj[0].header["SIPNAME"]
+ except KeyError:
+ try:
+ sipname = fobj[0].header["IDCTAB"]
+ except KeyError:
+ if 'A_ORDER' in fobj[1].header or 'B_ORDER' in fobj[1].header:
+ sipname = 'UNKNOWN'
+ else:
+ sipname = 'NOMODEL'
+
+ if not npolfile:
+ try:
+ npolfile = fobj[0].header["NPOLFILE"]
+ except KeyError:
+ if countExtn(f, 'WCSDVARR'):
+ npolfile = 'UNKNOWN'
+ else:
+ npolfile = 'NOMODEL'
+
+ if not d2imfile:
+ try:
+ d2imfile = fobj[0].header["D2IMFILE"]
+ except KeyError:
+ if countExtn(f, 'D2IMARR'):
+ npolfile = 'UNKNOWN'
+ else:
+ npolfile = 'NOMODEL'
+
+ distname = "_".join([sipname, npolfile, d2imfile])
+
# get the version of STWCS used to create the WCS of the science file.
try:
upwcsver = fobj[0].header.ascard['STWCSVER']
except KeyError:
upwcsver = pyfits.Card("STWCSVER", " ",
"Version of STWCS used to update the WCS")
-
- # get all keys for alternate WCSs
- altkeys = altwcs.wcskeys(fobj[('SCI', 1)].header)
-
- if 'O' in altkeys:
- altkeys.remove('O')
-
- numsci = countExtn(fname, 'SCI')
- module_logger.debug("Number of 'SCI' extensions in file %s is %s"
- % (fname, numsci))
+ try:
+ pywcsver = fobj[0].header.ascard['PYWCSVER']
+ except KeyError:
+ pywcsver = pyfits.Card("PYWCSVER", " ",
+ "Version of PYWCS used to update the WCS")
+
+ if not sciext:
+ sciext = range(fobj)
+ elif isinstance(sciext, str):
+ numsciext = countExtn(fobj, sciext)
+ sciext = [(sciext, i) for i in range(1, numsciext+1)]
+ elif isinstance(sciext, list):
+ pass
+ else:
+ raise ValueError("Expected sciext to be a list of FITS extensions with science data or a string of valid EXTNAME")
+
+ if wcskey is 'O':
+ message = "Warning: 'O' is a reserved key for the original WCS"
+ module_logger.info(message)
+ print message
+
+ module_logger.debug("Data extensions form which to create headerlet:\n\t %s"
+ % (str(sciext)))
hdul = pyfits.HDUList()
- phdu = pyfits.PrimaryHDU()
- phdu.header.update('DESTIM', destim,
- comment='Destination observation root name')
- phdu.header.update('HDRNAME', hdrname, comment='Headerlet name')
- fmt="%Y-%m-%dT%H:%M:%S"
- phdu.header.update('DATE', time.strftime(fmt),
- comment='Date FITS file was generated')
- phdu.header.ascard.append(upwcsver)
-
- refs = updateRefFiles(fobj[0].header.ascard, phdu.header.ascard,
- verbose=verbose)
- phdukw.update(refs)
- phdu.header.update(key='VAFACTOR',
- value=fobj[('SCI',1)].header.get('VAFACTOR', 1.))
+ phdu = _createPrimaryHDU(destim, hdrname, distname, wcsname,
+ sipname, npolfile, d2imfile, upwcsver, pywcsver)
hdul.append(phdu)
-
- for e in range(1, numsci + 1):
- hwcs = HSTWCS(fname, ext=('SCI', e))
- h = hwcs.wcs2header(sip2hdr=True).ascard
- for ak in altkeys:
- awcs = HSTWCS(fname,ext=('SCI', e), wcskey=ak)
- h.extend(awcs.wcs2header(idc2hdr=False).ascard)
- h.append(pyfits.Card(key='VAFACTOR', value=hwcs.vafactor,
- comment='Velocity aberration plate scale factor'))
- h.insert(0, pyfits.Card(key='EXTNAME', value='SIPWCS',
- comment='Extension name'))
- h.insert(1, pyfits.Card(key='EXTVER', value=e,
- comment='Extension version'))
-
- fhdr = fobj[('SCI', e)].header.ascard
- if phdukw['NPOLFILE']:
- cpdis = fhdr['CPDIS*...']
- for c in range(1, len(cpdis) + 1):
- h.append(cpdis[c - 1])
- dp = fhdr['DP%s.*...' % c]
- h.extend(dp)
-
+
+ if fu.isFits(fobj)[1] is not 'simple':
+
+ for e in sciext:
+ hwcs = HSTWCS(fname, ext=e, wcskey=wcskey)
+ h = hwcs.wcs2header(sip2hdr=True).ascard
+ h.append(pyfits.Card(key='VAFACTOR', value=hwcs.vafactor,
+ comment='Velocity aberration plate scale factor'))
+ h.insert(0, pyfits.Card(key='EXTNAME', value='SIPWCS',
+ comment='Extension name'))
+ if isinstance(e, int): val = e
+ else: val = e[1]
+ h.insert(1, pyfits.Card(key='EXTVER', value=val,
+ comment='Extension version'))
+ h.append(pyfits.Card("SCIEXT", str(e),
+ "Target science data extension"))
+ fhdr = fobj[e].header.ascard
+ if npolfile is not 'NOMODEL':
+ cpdis = fhdr['CPDIS*...']
+ for c in range(1, len(cpdis) + 1):
+ h.append(cpdis[c - 1])
+ dp = fhdr['DP%s.*...' % c]
+ h.extend(dp)
+
+ try:
+ h.append(fhdr['CPERROR%s' % c])
+ except KeyError:
+ pass
+
try:
- h.append(fhdr['CPERROR%s' % c])
+ h.append(fhdr['NPOLEXT'])
except KeyError:
pass
-
- try:
- h.append(fhdr['NPOLEXT'])
- except KeyError:
- pass
-
- if phdukw['D2IMFILE']:
- try:
- h.append(fhdr['D2IMEXT'])
- except KeyError:
- pass
-
- try:
- h.append(fhdr['AXISCORR'])
- except KeyError:
- module_logger.exception("'D2IMFILE' kw exists but keyword 'AXISCORR' was not found in "
- "%s['SCI',%d]" % (fname, e))
- raise
-
- try:
- h.append(fhdr['D2IMERR'])
- except KeyError:
- h.append(pyfits.Card(key='DPERROR', value=0,
- comment='Maximum error of D2IMARR'))
-
- hdu = pyfits.ImageHDU(header=pyfits.Header(h))
- hdul.append(hdu)
+
+ if d2imfile is not 'NOMODEL':
+ try:
+ h.append(fhdr['D2IMEXT'])
+ except KeyError:
+ pass
+
+ try:
+ h.append(fhdr['AXISCORR'])
+ except KeyError:
+ module_logger.exception("'D2IMFILE' kw exists but keyword 'AXISCORR' was not found in "
+ "%s['SCI',%d]" % (fname, e))
+ raise
+
+ try:
+ h.append(fhdr['D2IMERR'])
+ except KeyError:
+ h.append(pyfits.Card(key='DPERROR', value=0,
+ comment='Maximum error of D2IMARR'))
+
+ hdu = pyfits.ImageHDU(header=pyfits.Header(h))
+ hdul.append(hdu)
numwdvarr = countExtn(fname, 'WCSDVARR')
numd2im = countExtn(fname, 'D2IMARR')
for w in range(1, numwdvarr + 1):
@@ -275,15 +381,30 @@ def createHeaderlet(fname, hdrname, destim=None, output=None, verbose=False, log
for d in range(1, numd2im + 1):
hdu = fobj[('D2IMARR', d)].copy()
hdul.append(hdu)
- if output is not None:
- # write the headerlet to a file
- if not output.endswith('_hdr.fits'):
- output = output + '_hdr.fits'
- hdul.writeto(output, clobber=True)
+
if close_file:
fobj.close()
return Headerlet(hdul,verbose=verbose, logmode='a')
+def _createPrimaryHDU(destim, hdrname, distname, wcsname,
+ sipname, npolfile, d2imfile, upwcsver, pywcsver):
+ phdu = pyfits.PrimaryHDU()
+ phdu.header.update('DESTIM', destim,
+ comment='Destination observation root name')
+ phdu.header.update('HDRNAME', hdrname, comment='Headerlet name')
+ fmt="%Y-%m-%dT%H:%M:%S"
+ phdu.header.update('DATE', time.strftime(fmt),
+ comment='Date FITS file was generated')
+ phdu.header.update('WCSNAME', wcsname, comment='WCS name')
+ phdu.header.update('DISTNAME', distname, comment='Distortion model name')
+ phdu.header.update('SIPNAME', sipname, comment='origin of SIP polynomial distortion model')
+ phdu.header.update('NPOLFILE', npolfile, comment='origin of non-polynmial distortion model')
+ phdu.header.update('D2IMFILE', d2imfile, comment='origin of detector to image correction')
+
+ phdu.header.ascard.append(upwcsver)
+ phdu.header.ascard.append(pywcsver)
+ return phdu
+
def applyHeaderlet(hdrfile, destfile, createheaderlet=True, hdrname=None,
verbose=False):
"""
@@ -378,24 +499,23 @@ def mapFitsExt2HDUListInd(fname, extname):
class Headerlet(pyfits.HDUList):
"""
A Headerlet class
- Ref: http://stsdas.stsci.edu/stsci_python_sphinxdocs/stwcs/headerlet_def.html
+ Ref: http://mediawiki.stsci.edu/mediawiki/index.php/Telescopedia:Headerlets
"""
- def __init__(self, fobj, wcskeys=[], mode='copyonwrite', verbose=False,
- logmode='w'):
+ def __init__(self, fobj, mode='copyonwrite', verbose=False, logmode='w'):
"""
Parameters
----------
fobj: string
Name of headerlet file, file-like object, a list of HDU
instances, or an HDUList instance
- wcskeys: python list
- a list of wcskeys to be included in the headerlet
- created from the old WCS solution before the
- science file is updated. If empty: all alternate (if any)
- WCSs are copied to the headerlet.
mode: string, optional
Mode with which to open the given file object
+ verbose: int
+ python logging level, higher numbers trigger more output
+ logmode: 'w' or 'a'
+ for internal use only, indicates whether the log file
+ should be open in attach or write mode
"""
self.verbose = verbose
self.hdr_logger = logging.getLogger('headerlet.Headerlet')
@@ -404,44 +524,48 @@ class Headerlet(pyfits.HDUList):
else:
self.hdr_logger.setLevel(100)
- self.hdr_logger.info("Creating a Headerlet object from wcskeys %s" % wcskeys)
- self.wcskeys = wcskeys
if not isinstance(fobj, list):
fobj = pyfits.open(fobj, mode=mode)
super(Headerlet, self).__init__(fobj)
self.fname = self.filename()
self.hdrname = self[0].header["HDRNAME"]
+ self.wcsname = self[0].header["WCSNAME"]
self.stwcsver = self[0].header.get("STWCSVER", "")
+ self.stwcsver = self[0].header.get("PYWCSVER", "")
self.destim = self[0].header["DESTIM"]
- self.idctab = self[0].header.get("IDCTAB", "")
- self.npolfile = self[0].header.get("NPOLFILE", "")
- self.d2imfile = self[0].header.get("D2IMFILE", "")
+ self.idctab = self[0].header["SIPNAME"]
+ self.npolfile = self[0].header["NPOLFILE"]
+ self.d2imfile = self[0].header["D2IMFILE"]
+ self.distname = self[0].header["DISTNAME"]
self.vafactor = self[1].header.get("VAFACTOR", 1) #None instead of 1?
self.d2imerr = 0
self.axiscorr = 1
- def apply(self, dest, createheaderlet=True, hdrname=None, attach=True,
- createsummary=True):
+ def apply_as_primary(fobj, attach=True, archive=True, force=False):
"""
- Apply this headerlet to a file.
-
+ Copy this headerlet as a primary WCS to fobj
+
Parameters
----------
- dest: string
- Name of file to which to apply the WCS in the headerlet
- hdrname: string
- A unique name for the headerlet
- createheaderlet: boolean
- A flag which indicates if a headerlet should be created
- from the old WCS and attached to the science file (default: True)
- attach: boolean, default: True
- By default the headerlet being applied will be attached
- as an extension to the science file. Set 'attach' to False
- to disable this.
- createsummary: boolean, default: True
- Set this to False to disable creating and updating of wcscorr table.
- This is used primarily for testing.
+ fobj: string, HDUList
+ science file to which the headerlet should be applied
+ attach: boolean
+ flag indicating if the headerlet should be attached as a
+ HeaderletHDU to fobj. If True checks that HDRNAME is unique
+ in the fobj and stops if not.
+ archive: boolean (default is True)
+ When the distortion model in the headerlet is the same as the
+ distortion model of the science file, this flag indicates if
+ the primary WCS should be saved as an alternate and a headerlet
+ extension.
+ When the distortion models do not match this flag indicates if
+ the current primary and alternate WCSs should be archived as
+ headerlet extensions and alternate WCS.
+ force: boolean (default is False)
+ When the distortion models of the headerlet and the primary do
+ not match, and archive is False this flag forces an update
+ of the primary
"""
self.hverify()
if self.verify_dest(dest):