diff options
author | embray <embray@stsci.edu> | 2011-06-22 19:24:07 -0400 |
---|---|---|
committer | embray <embray@stsci.edu> | 2011-06-22 19:24:07 -0400 |
commit | d93a10017d62f39d80167b45c1044a5e113f5994 (patch) | |
tree | 07967ea82a8550f8a8423bbe30046e798cf6c98e /lib/stwcs/wcsutil/headerlet.py | |
parent | 708b4f32ac133fdb6157ec6e243dc76e32f9a84b (diff) | |
download | stwcs_hcf-d93a10017d62f39d80167b45c1044a5e113f5994.tar.gz |
Redoing the r13221-13223 merge in the actual trunk now. This updates trunk to the setup_refactoring branch (however, coords, pysynphot, and pywcs are still being pulled from the astrolib setup_refactoring branch. Will have to do that separately then update the svn:externals)
git-svn-id: http://svn.stsci.edu/svn/ssb/stsci_python/stsci_python/trunk/stwcs@13225 fe389314-cf27-0410-b35b-8c050e845b92
Diffstat (limited to 'lib/stwcs/wcsutil/headerlet.py')
-rw-r--r-- | lib/stwcs/wcsutil/headerlet.py | 898 |
1 files changed, 898 insertions, 0 deletions
diff --git a/lib/stwcs/wcsutil/headerlet.py b/lib/stwcs/wcsutil/headerlet.py new file mode 100644 index 0000000..0318bf2 --- /dev/null +++ b/lib/stwcs/wcsutil/headerlet.py @@ -0,0 +1,898 @@ +from __future__ import division +import logging +import os +import tarfile +import tempfile +import time +import warnings +from cStringIO import StringIO + +import numpy as np +import pyfits + +import altwcs +import wcscorr +from hstwcs import HSTWCS +from mappings import basic_wcs +from stsci.tools.fileutil import countExtn + +module_logger = logging.getLogger('headerlet') + +import atexit +atexit.register(logging.shutdown) + +def setLogger(logger, level, mode='w'): + formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + log_filename = 'headerlet.log' + fh = logging.FileHandler(log_filename, mode=mode) + fh.setLevel(logging.DEBUG) + fh.setFormatter(formatter) + logger.addHandler(fh) + logger.setLevel(level) + +def isWCSIdentical(scifile, file2, verbose=False): + """ + Compares the WCS solution of 2 files. + + Parameters + ---------- + scifile: file1 + file2: file2 + verbose: False or a python logging level + (one of 'INFO', 'DEBUG' logging levels) + (an integer representing a logging level) + + Notes + ----- + These can be 2 science observations or 2 headerlets + or a science observation and a headerlet. The two files + have the same WCS solution if the following are the same: + + - rootname/destim + - primary WCS + - SIP coefficients + - NPOL distortion + - D2IM correction + - Velocity aberation + + """ + if verbose: + setLogger(module_logger, verbose) + else: + module_logger.setLevel(100) + + module_logger.info("Starting isWCSIdentical: %s" % time.asctime()) + + result = True + numsci1 = max(countExtn(scifile), countExtn(scifile, 'SIPWCS')) + numsci2 = max(countExtn(file2), countExtn(file2, 'SIPWCS')) + + if numsci1 == 0 or numsci2 == 0 or numsci1 != numsci2: + module_logger.info("Number of SCI and SIPWCS extensions do not match.") + result = False + + if getRootname(scifile) != getRootname(file2): + module_logger.info('Rootnames do not match.') + result = False + try: + extname1 = pyfits.getval(scifile, 'EXTNAME', ext=('SCI', 1)) + except KeyError: + extname1 = 'SIPWCS' + try: + extname2 = pyfits.getval(file2, 'EXTNAME', ext=('SCI', 1)) + except KeyError: + extname2 = 'SIPWCS' + + for i in range(1, numsci1 + 1): + w1 = HSTWCS(scifile, ext=(extname1, i)) + w2 = HSTWCS(file2, ext=(extname2, i)) + if not np.allclose(w1.wcs.crval, w2.wcs.crval, rtol=1e-7) or \ + not np.allclose(w1.wcs.crpix, w2.wcs.crpix, rtol=1e-7) or \ + not np.allclose(w1.wcs.cd, w2.wcs.cd, rtol=1e-7) or \ + not (np.array(w1.wcs.ctype) == np.array(w2.wcs.ctype)).all(): + module_logger.info('Primary WCSs do not match') + result = False + if w1.sip or w2.sip: + if (w2.sip and not w1.sip) or (w1.sip and not w2.sip) or \ + not np.allclose(w1.sip.a, w2.sip.a, rtol=1e-7) or \ + not np.allclose(w1.sip.b, w2.sip.b, rtol=1e-7): + module_logger.info('SIP coefficients do not match') + result = False + if w1.cpdis1 or w2.cpdis1: + if w1.cpdis1 and not w2.cpdis1 or \ + w2.cpdis1 and not w1.cpdis1 or \ + not np.allclose(w1.cpdis1.data, w2.cpdis1.data): + module_logger.info('NPOL distortions do not match') + result = False + if w1.cpdis2 or w2.cpdis2: + if w1.cpdis2 and not w2.cpdis2 or \ + w2.cpdis2 and not w1.cpdis2 or \ + not np.allclose(w1.cpdis2.data, w2.cpdis2.data): + module_logger.info('NPOL distortions do not match') + result = False + if w1.det2im1 or w2.det2im1: + if w1.det2im1 and not w2.det2im1 or \ + w2.det2im1 and not w1.det2im1 or\ + not np.allclose(w1.det2im1.data, w2.det2im1.data): + module_logger.info('Det2Im corrections do not match') + result = False + if w1.det2im2 or w2.det2im2: + if w1.det2im2 and not w2.det2im2 or \ + w2.det2im2 and not w1.det2im2 or\ + not np.allclose(w1.det2im2.data, w2.det2im2.data): + module_logger.info('Det2Im corrections do not match') + result = False + if w1.vafactor != w2.vafactor: + module_logger.info('VA factors do not match') + result = False + + return result + + +# TODO: It would be logical for this to be part of the Headerlet class, perhaps +# as a classmethod +def createHeaderlet(fname, hdrname, destim=None, output=None, verbose=False, logmode='w'): + """ + Create a headerlet from a science observation + + Parameters + ---------- + fname: string + Name of file with science observation + hdrname: string + Name for the headerlet, stored in the primary header of the headerlet + destim: string + Destination image, stored in the primary header of the headerlet. + If None ROOTNAME is used of the science observation is used. + ROOTNAME has precedence, destim is used for observations without + ROOTNAME in the primary header + output: string + Save the headerlet to the given filename. + verbose: False or a python logging level + (one of 'INFO', 'DEBUG' logging levels) + (an integer representing a logging level) + logmode: 'w', 'a' + used internally for controling access to the log file + """ + + if verbose: + setLogger(module_logger, verbose, mode=logmode) + else: + module_logger.setLevel(100) + + module_logger.info("Starting createHeaderlet: %s" % time.asctime()) + phdukw = {'IDCTAB': True, + 'NPOLFILE': True, + 'D2IMFILE': True} + if not isinstance(fname, pyfits.HDUList): + fobj = pyfits.open(fname) + close_file = True + else: + fobj = fname + close_file = False + if destim is None: + try: + destim = fobj[0].header['ROOTNAME'] + except KeyError: + module_logger.exception('Required keyword "DESTIM" not found') + print 'Please provide a value for the DESTIM keyword' + raise + if hdrname is None: + module_logger.critical("Required keyword 'HDRNAME' not given") + raise ValueError("Please provide a name for the headerlet, HDRNAME is " + "a required parameter.") + + # get the version of STWCS used to create the WCS of the science file. + try: + upwcsver = fobj[0].header.ascard['STWCSVER'] + except KeyError: + upwcsver = pyfits.Card("STWCSVER", " ", + "Version of STWCS used to update the WCS") + + # get all keys for alternate WCSs + altkeys = altwcs.wcskeys(fobj[('SCI', 1)].header) + + if 'O' in altkeys: + altkeys.remove('O') + + numsci = countExtn(fname, 'SCI') + module_logger.debug("Number of 'SCI' extensions in file %s is %s" + % (fname, numsci)) + hdul = pyfits.HDUList() + phdu = pyfits.PrimaryHDU() + phdu.header.update('DESTIM', destim, + comment='Destination observation root name') + phdu.header.update('HDRNAME', hdrname, comment='Headerlet name') + fmt="%Y-%m-%dT%H:%M:%S" + phdu.header.update('DATE', time.strftime(fmt), + comment='Date FITS file was generated') + phdu.header.ascard.append(upwcsver) + + refs = updateRefFiles(fobj[0].header.ascard, phdu.header.ascard, verbose=verbose) + phdukw.update(refs) + phdu.header.update(key='VAFACTOR', + value=fobj[('SCI',1)].header.get('VAFACTOR', 1.)) + hdul.append(phdu) + + for e in range(1, numsci + 1): + hwcs = HSTWCS(fname, ext=('SCI', e)) + h = hwcs.wcs2header(sip2hdr=True).ascard + for ak in altkeys: + awcs = HSTWCS(fname,ext=('SCI', e), wcskey=ak) + h.extend(awcs.wcs2header(idc2hdr=False).ascard) + h.append(pyfits.Card(key='VAFACTOR', value=hwcs.vafactor, + comment='Velocity aberration plate scale factor')) + h.insert(0, pyfits.Card(key='EXTNAME', value='SIPWCS', + comment='Extension name')) + h.insert(1, pyfits.Card(key='EXTVER', value=e, + comment='Extension version')) + + fhdr = fobj[('SCI', e)].header.ascard + if phdukw['NPOLFILE']: + cpdis = fhdr['CPDIS*...'] + for c in range(1, len(cpdis) + 1): + h.append(cpdis[c - 1]) + dp = fhdr['DP%s.*...' % c] + h.extend(dp) + + try: + h.append(fhdr['CPERROR%s' % c]) + except KeyError: + pass + + try: + h.append(fhdr['NPOLEXT']) + except KeyError: + pass + + if phdukw['D2IMFILE']: + try: + h.append(fhdr['D2IMEXT']) + except KeyError: + pass + + try: + h.append(fhdr['AXISCORR']) + except KeyError: + module_logger.exception("'D2IMFILE' kw exists but keyword 'AXISCORR' was not found in " + "%s['SCI',%d]" % (fname, e)) + raise + + try: + h.append(fhdr['D2IMERR']) + except KeyError: + h.append(pyfits.Card(key='DPERROR', value=0, + comment='Maximum error of D2IMARR')) + + hdu = pyfits.ImageHDU(header=pyfits.Header(h)) + # temporary fix for pyfits ticket # 48 + hdu._extver = e + hdul.append(hdu) + numwdvarr = countExtn(fname, 'WCSDVARR') + numd2im = countExtn(fname, 'D2IMARR') + for w in range(1, numwdvarr + 1): + hdu = fobj[('WCSDVARR', w)].copy() + # temporary fix for pyfits ticket # 48 + hdu._extver = w + hdul.append(hdu) + for d in range(1, numd2im + 1): + hdu = fobj[('D2IMARR', d)].copy() + # temporary fix for pyfits ticket # 48 + hdu._extver = d + hdul.append(hdu) + if output is not None: + # write the headerlet to a file + if not output.endswith('_hdr.fits'): + output = output + '_hdr.fits' + hdul.writeto(output, clobber=True) + if close_file: + fobj.close() + return Headerlet(hdul,verbose=verbose, logmode='a') + +def applyHeaderlet(hdrfile, destfile, createheaderlet=True, hdrname=None, + verbose=False): + """ + Apply headerlet 'hdrfile' to a science observation 'destfile' + + Parameters + ---------- + hdrfile: string + Headerlet file + destfile: string + File name of science observation whose WCS solution will be updated + createheaderlet: boolean + True (default): before updating, create a headerlet with the + WCS old solution. + hdrname: string or None (default) + will be the value of the HDRNAME keyword in the headerlet generated + for the old WCS solution. If not specified, a sensible default + will be used. Not required if createheaderlet is False + verbose: False or a python logging level + (one of 'INFO', 'DEBUG' logging levels) + (an integer representing a logging level) + """ + if verbose: + setLogger(module_logger, verbose) + else: + module_logger.setLevel(100) + module_logger.info("Starting applyHeaderlet: %s" % time.asctime()) + hlet = Headerlet(hdrfile, verbose=verbose, logmode='a') + hlet.apply(destfile, createheaderlet=createheaderlet, hdrname=hdrname) + +def updateRefFiles(source, dest, verbose=False): + """ + Update the reference files name in the primary header of 'dest' + using values from 'source' + + Parameters + ---------- + source: pyfits.Header.ascardlist + dest: pyfits.Header.ascardlist + """ + module_logger.info("Updating reference files") + phdukw = {'IDCTAB': True, + 'NPOLFILE': True, + 'D2IMFILE': True} + + try: + wind = dest.index_of('HISTORY') + except KeyError: + wind = len(dest) + for key in phdukw.keys(): + try: + value = source[key] + dest.insert(wind, value) + except KeyError: + # TODO: I don't understand what the point of this is. Is it meant + # for logging purposes? Right now it isn't used. + phdukw[key] = False + return phdukw + +def getRootname(fname): + """ + returns the value of ROOTNAME or DESTIM + """ + + try: + rootname = pyfits.getval(fname, 'ROOTNAME') + except KeyError: + rootname = pyfits.getval(fname, 'DESTIM') + return rootname + +def mapFitsExt2HDUListInd(fname, extname): + """ + Map FITS extensions with 'EXTNAME' to HDUList indexes. + """ + + if not isinstance(fname, pyfits.HDUList): + f = pyfits.open(fname) + close_file = True + else: + f = fname + close_file = False + d = {} + for hdu in f: + # TODO: Replace calls to header.has_key() with `in header` once + # pyfits refactoring branch is in production use + if hdu.header.has_key('EXTNAME') and hdu.header['EXTNAME'] == extname: + extver = hdu.header['EXTVER'] + d[(extname, extver)] = f.index_of((extname, extver)) + if close_file: + f.close() + return d + + +class Headerlet(pyfits.HDUList): + """ + A Headerlet class + Ref: http://stsdas.stsci.edu/stsci_python_sphinxdocs/stwcs/headerlet_def.html + """ + + def __init__(self, fobj, wcskeys=[], mode='copyonwrite', verbose=False, logmode='w'): + """ + Parameters + ---------- + fobj: string + Name of headerlet file, file-like object, a list of HDU + instances, or an HDUList instance + wcskeys: python list + a list of wcskeys to be included in the headerlet + created from the old WCS solution before the + science file is updated. If empty: all alternate (if any) + WCSs are copied to the headerlet. + mode: string, optional + Mode with which to open the given file object + """ + self.verbose = verbose + self.hdr_logger = logging.getLogger('headerlet.Headerlet') + if self.verbose: + setLogger(self.hdr_logger, self.verbose, mode=logmode) + else: + self.hdr_logger.setLevel(100) + + self.hdr_logger.info("Creating a Headerlet object from wcskeys %s" % wcskeys) + self.wcskeys = wcskeys + if not isinstance(fobj, list): + fobj = pyfits.open(fobj, mode=mode) + + super(Headerlet, self).__init__(fobj) + self.fname = self.filename() + self.hdrname = self[0].header["HDRNAME"] + self.stwcsver = self[0].header.get("STWCSVER", "") + self.destim = self[0].header["DESTIM"] + self.idctab = self[0].header.get("IDCTAB", "") + self.npolfile = self[0].header.get("NPOLFILE", "") + self.d2imfile = self[0].header.get("D2IMFILE", "") + self.vafactor = self[1].header.get("VAFACTOR", 1) #None instead of 1? + self.d2imerr = 0 + self.axiscorr = 1 + + def apply(self, dest, createheaderlet=True, hdrname=None, attach=True, createsummary=True): + """ + Apply this headerlet to a file. + + Parameters + ---------- + dest: string + Name of file to which to apply the WCS in the headerlet + hdrname: string + A unique name for the headerlet + createheaderlet: boolean + A flag which indicates if a headerlet should be created + from the old WCS and attached to the science file (default: True) + attach: boolean, default: True + By default the headerlet being applied will be attached + as an extension to the science file. Set 'attach' to False + to disable this. + createsummary: boolean, default: True + Set this to False to disable creating and updating of wcscorr table. + This is used primarily for testing. + """ + self.hverify() + if self.verify_dest(dest): + if not isinstance(dest, pyfits.HDUList): + fobj = pyfits.open(dest, mode='update') + close_dest = True + else: + fobj = dest + close_dest = False + + # Create the WCSCORR HDU/table from the existing WCS keywords if + # necessary + if createsummary: + try: + # TODO: in the pyfits refactoring branch if will be easier to + # test whether an HDUList contains a certain extension HDU + # without relying on try/except + wcscorr_table = fobj['WCSCORR'] + except KeyError: + # The WCSCORR table needs to be created + wcscorr.init_wcscorr(fobj) + + orig_hlt_hdu = None + numhlt = countExtn(fobj, 'HDRLET') + if createheaderlet: + # Create a headerlet for the original WCS data in the file, + # create an HDU from the original headerlet, and append it to + # the file + if not hdrname: + hdrname = fobj[0].header['ROOTNAME'] + '_orig' + orig_hlt = createHeaderlet(fobj, hdrname, verbose=self.verbose, logmode='a') + orig_hlt_hdu = HeaderletHDU.fromheaderlet(orig_hlt) + orig_hlt_hdu.update_ext_version(numhlt + 1) + numhlt += 1 + + self._delDestWCS(fobj) + refs = updateRefFiles(self[0].header.ascard, fobj[0].header.ascard, verbose=self.verbose) + numsip = countExtn(self, 'SIPWCS') + for idx in range(1, numsip + 1): + fhdr = fobj[('SCI', idx)].header.ascard + siphdr = self[('SIPWCS', idx)].header.ascard + # a minimal attempt to get the position of the WCS keywords group + # in the header by looking for the PA_APER kw. + # at least make sure the WCS kw are written befir the HISTORY kw + # if everything fails, append the kw to the header + try: + wind = fhdr.index_of('PA_APER') + except KeyError: + try: + wind = fhdr.index_of('HISTORY') + except KeyError: + wind = len(fhdr) + self.hdr_logger.debug("Inserting WCS keywords at index %s" % wind) + # TODO: Drop .keys() when refactored pyfits comes into use + for k in siphdr.keys(): + if k not in ['XTENSION', 'BITPIX', 'NAXIS', 'PCOUNT', + 'GCOUNT','EXTNAME', 'EXTVER', 'ORIGIN', + 'INHERIT', 'DATE', 'IRAF-TLM']: + fhdr.insert(wind, siphdr[k]) + else: + pass + + #! Always attach these extensions last. Otherwise their headers may + # get updated with the other WCS kw. + numwdvar = countExtn(self, 'WCSDVARR') + numd2im = countExtn(self, 'D2IMARR') + for idx in range(1, numwdvar + 1): + fobj.append(self[('WCSDVARR', idx)].copy()) + for idx in range(1, numd2im + 1): + fobj.append(self[('D2IMARR', idx)].copy()) + + # Update the WCSCORR table with new rows from the headerlet's WCSs + if createsummary: + wcscorr.update_wcscorr(fobj, self, 'SIPWCS') + + # Append the original headerlet + if createheaderlet and orig_hlt_hdu: + fobj.append(orig_hlt_hdu) + + if attach: + # Finally, append an HDU for this headerlet + new_hlt = HeaderletHDU.fromheaderlet(self) + new_hlt.update_ext_version(numhlt + 1) + fobj.append(new_hlt) + + if close_dest: + fobj.close() + else: + self.hdr_logger.critical("Observation %s cannot be updated with headerlet " + "%s" % (fobj.filename(), self.hdrname)) + print "Observation %s cannot be updated with headerlet %s" \ + % (fobj.filename(), self.hdrname) + + + def hverify(self): + self.verify() + assert(self[0].header.has_key('DESTIM') and + self[0].header['DESTIM'].strip()) + assert(self[0].header.has_key('HDRNAME') and + self[0].header['HDRNAME'].strip()) + assert(self[0].header.has_key('STWCSVER')) + + + def verify_dest(self, dest): + """ + verifies that the headerlet can be applied to the observation + + DESTIM in the primary header of the headerlet must match ROOTNAME + of the science file (or the name of the destination file) + """ + + try: + if not isinstance(dest, pyfits.HDUList): + droot = pyfits.getval(dest, 'ROOTNAME') + else: + droot = dest[0].header['ROOTNAME'] + except KeyError: + self.hdr_logger.debug("Keyword 'ROOTNAME' not found in destination file") + droot = dest.split('.fits')[0] + if droot == self.destim: + self.hdr_logger.debug("verify_destim() returned True") + return True + else: + self.hdr_logger.debug("verify_destim() returned False") + return False + + def tofile(self, fname, destim=None, hdrname=None, clobber=False): + if not destim or not hdrname: + self.hverify() + self.writeto(fname, clobber=clobber) + + def _delDestWCS(self, dest): + """ + Delete the WCS of a science file + """ + + self.hdr_logger.info("Deleting all WCSs of file %s" % dest.filename()) + numext = len(dest) + + for idx in range(numext): + # Only delete WCS from extensions which may have WCS keywords + if dest[idx].__dict__.has_key('_xtn') and "IMAGE" in dest[idx]._xtn: + self._removeD2IM(dest[idx]) + self._removeSIP(dest[idx]) + self._removeLUT(dest[idx]) + self._removePrimaryWCS(dest[idx]) + self._removeIDCCoeffs(dest[idx]) + try: + del dest[idx].header.ascard['VAFACTOR'] + except KeyError: + pass + + self._removeRefFiles(dest[0]) + self._removeAltWCS(dest, ext=range(numext)) + numwdvarr = countExtn(dest, 'WCSDVARR') + numd2im = countExtn(dest, 'D2IMARR') + for idx in range(1, numwdvarr + 1): + del dest[('WCSDVARR', idx)] + for idx in range(1, numd2im + 1): + del dest[('D2IMARR', idx)] + + def _removeRefFiles(self, phdu): + """ + phdu: Primary HDU + """ + refkw = ['IDCTAB', 'NPOLFILE', 'D2IMFILE'] + for kw in refkw: + try: + del phdu.header.ascard[kw] + except KeyError: + pass + + def _removeSIP(self, ext): + """ + Remove the SIP distortion of a FITS extension + """ + + self.hdr_logger.debug("Removing SIP distortion from (%s, %s)" + % (ext.name, ext._extver)) + for prefix in ['A', 'B', 'AP', 'BP']: + try: + order = ext.header[prefix + '_ORDER'] + del ext.header[prefix + '_ORDER'] + except KeyError: + continue + for i in range(order + 1): + for j in range(order + 1): + key = prefix + '_%d_%d' % (i, j) + try: + del ext.header[key] + except KeyError: + pass + try: + del ext.header['IDCTAB'] + except KeyError: + pass + + def _removeLUT(self, ext): + """ + Remove the Lookup Table distortion of a FITS extension + """ + + self.hdr_logger.debug("Removing LUT distortion from (%s, %s)" + % (ext.name, ext._extver)) + try: + cpdis = ext.header['CPDIS*'] + except KeyError: + return + try: + for c in range(1, len(cpdis) + 1): + del ext.header['DP%s.*...' % c] + del ext.header[cpdis[c - 1].key] + del ext.header['CPERR*'] + del ext.header['NPOLFILE'] + del ext.header['NPOLEXT'] + except KeyError: + pass + + def _removeD2IM(self, ext): + """ + Remove the Detector to Image correction of a FITS extension + """ + + self.hdr_logger.debug("Removing D2IM correction from (%s, %s)" + % (ext.name, ext._extver)) + d2imkeys = ['D2IMFILE', 'AXISCORR', 'D2IMEXT', 'D2IMERR'] + for k in d2imkeys: + try: + del ext.header[k] + except KeyError: + pass + + def _removeAltWCS(self, dest, ext): + """ + Remove Alternate WCSs of a FITS extension. + A WCS with wcskey 'O' is never deleted. + """ + dkeys = altwcs.wcskeys(dest[('SCI', 1)].header) + self.hdr_logger.debug("Removing alternate WCSs with keys %s from %s" + % (dkeys, dest.filename())) + for k in dkeys: + altwcs.deleteWCS(dest, ext=ext, wcskey=k) + + def _removePrimaryWCS(self, ext): + """ + Remove the primary WCS of a FITS extension + """ + + self.hdr_logger.debug("Removing Primary WCS from (%s, %s)" + % (ext.name, ext._extver)) + naxis = ext.header.ascard['NAXIS'].value + for key in basic_wcs: + for i in range(1, naxis + 1): + try: + del ext.header.ascard[key + str(i)] + except KeyError: + pass + try: + del ext.header.ascard['WCSAXES'] + except KeyError: + pass + + def _removeIDCCoeffs(self, ext): + """ + Remove IDC coefficients of a FITS extension + """ + + self.hdr_logger.debug("Removing IDC coefficient from (%s, %s)" + % (ext.name, ext._extver)) + coeffs = ['OCX10', 'OCX11', 'OCY10', 'OCY11', 'IDCSCALE'] + for k in coeffs: + try: + del ext.header.ascard[k] + except KeyError: + pass + + +class HeaderletHDU(pyfits.core._NonstandardExtHDU): + """ + A non-standard extension HDU for encapsulating Headerlets in a file. These + HDUs have an extension type of HDRLET and their EXTNAME is derived from the + Headerlet's HDRNAME. + + The data itself is a tar file containing a single file, which is the + Headerlet file itself. The file name is derived from the HDRNAME keyword, + and should be in the form `<HDRNAME>_hdr.fits`. If the COMPRESS keyword + evaluates to `True`, the tar file is compressed with gzip compression. + + The Headerlet contained in the HDU's data can be accessed by the + `headerlet` attribute. + """ + + _xtn = _extension = 'HDRLET' + + def __init__(self, data=None, header=None): + super(HeaderletHDU, self).__init__(data=data, header=header) + # TODO: This can be removed after the next pyfits release, but for now + # the _ExtensionHDU base class sets self._xtn = '' in its __init__(). + self._xtn = self._extension + # For some reason _NonstandardExtHDU.__init__ sets self.name = None, + # even if it's already been set by the EXTNAME keyword in + # _ExtensionHDU.__init__() -_-; + if header and header.has_key('EXTNAME') and not self.name: + self.name = header['EXTNAME'] + # self._extver, if set, is still preserved. From + # _ExtensionHDU.__init__()...totally inconsistent. + + def __getattr__(self, attr): + if attr == 'data': + size = self.size() + self._file.seek(self._datLoc) + self.__dict__[attr] = self._file.read(size) + elif attr == 'headerlet': + self._file.seek(self._datLoc) + s = StringIO() + # Read the data into a StringIO--reading directly from the file + # won't work (at least for gzipped files) due to problems deep + # within the gzip module that make it difficult to read gzip files + # embedded in another file + s.write(self._file.read(self.size())) + s.seek(0) + if self._header['COMPRESS']: + mode = 'r:gz' + else: + mode = 'r' + t = tarfile.open(mode=mode, fileobj=s) + members = t.getmembers() + if not len(members): + raise ValueError('The Headerlet contents are missing.') + elif len(members) > 1: + warnings.warn('More than one file is contained in this ' + 'only the headerlet file should be present.') + hlt_name = self._header['HDRNAME'] + '_hdr.fits' + try: + hlt_info = t.getmember(hlt_name) + except KeyError: + warnings.warn('The file %s was missing from the HDU data. ' + 'Assuming that the first file in the data is ' + 'headerlet file.' % hlt_name) + hlt_info = members[0] + hlt_file = t.extractfile(hlt_info) + # hlt_file is a file-like object + hlt = Headerlet(hlt_file, mode='readonly') + self.__dict__[attr] = hlt + else: + return pyfits.core._ValidHDU.__getattr__(self, attr) + try: + return self.__dict__[attr] + except KeyError: + raise AttributeError(attr) + + @classmethod + def fromheaderlet(cls, headerlet, compress=False): + """ + Creates a new HeaderletHDU from a given Headerlet object. + + Parameters + ---------- + headerlet : Headerlet + A valid Headerlet object. + compress : bool, optional + Gzip compress the headerlet data. + """ + + phdu = headerlet[0] + phduhdr = phdu.header + hlt_filename = phdu.header['HDRNAME'] + '_hdr.fits' + + # TODO: As it stands there's no good way to write out an HDUList in + # memory, since it automatically closes the given file-like object when + # it's done writing. I'd argue that if passed an open file handler it + # should not close it, but for now we'll have to write to a temp file. + fd, name = tempfile.mkstemp() + try: + f = os.fdopen(fd, 'rb+') + headerlet.writeto(f) + # The tar file itself we'll write in memory, as it should be + # relatively small + if compress: + mode = 'w:gz' + else: + mode = 'w' + s = StringIO() + t = tarfile.open(mode=mode, fileobj=s) + t.add(name, arcname=hlt_filename) + t.close() + finally: + os.remove(name) + + cards = [ + pyfits.Card('XTENSION', cls._extension, 'Headerlet extension'), + pyfits.Card('BITPIX', 8, 'array data type'), + pyfits.Card('NAXIS', 1, 'number of array dimensions'), + pyfits.Card('NAXIS1', len(s.getvalue()), 'Axis length'), + pyfits.Card('PCOUNT', 0, 'number of parameters'), + pyfits.Card('GCOUNT', 1, 'number of groups'), + pyfits.Card('EXTNAME', cls._extension, + 'name of the headerlet extension'), + phdu.header.ascard['HDRNAME'], + phdu.header.ascard['DATE'], + pyfits.Card('SIPNAME', headerlet['SIPWCS', 1].header['WCSNAMEA'], + 'SIP distortion model name'), + phdu.header.ascard['NPOLFILE'], + phdu.header.ascard['D2IMFILE'], + pyfits.Card('COMPRESS', compress, 'Uses gzip compression') + ] + + header = pyfits.Header(pyfits.CardList(cards)) + + hdu = cls(data=pyfits.DELAYED, header=header) + hdu._file = s + hdu._datLoc = 0 + return hdu + + @classmethod + def match_header(cls, header): + """ + This is a class method used in the pyfits refactoring branch to + recognize whether or not this class should be used for instantiating + an HDU object based on values in the header. + + It is included here for forward-compatibility. + """ + + card = header.ascard[0] + if card.key != 'XTENSION': + return False + xtension = card.value.rstrip() + return xtension == cls._extension + + # TODO: Add header verification + + def _summary(self): + # TODO: Perhaps make this more descriptive... + return '%-10s %-12s %4d' % (self.name, self.__class__.__name__, + len(self._header.ascard)) + +# Monkey-patch pyfits to add minimal support for HeaderletHDUs +# TODO: Get rid of this ASAP!!! (it won't be necessary with the pyfits +# refactoring branch) +if not hasattr(pyfits.Header._updateHDUtype, '_patched'): + __old_updateHDUtype = pyfits.Header._updateHDUtype + def __updateHDUtype(self): + if HeaderletHDU.match_header(self): + self._hdutype = HeaderletHDU + else: + __old_updateHDUtype(self) + __updateHDUtype._patched = True + pyfits.Header._updateHDUtype = __updateHDUtype |