diff options
Diffstat (limited to 'wcsutil/headerlet.py')
-rw-r--r-- | wcsutil/headerlet.py | 898 |
1 files changed, 0 insertions, 898 deletions
diff --git a/wcsutil/headerlet.py b/wcsutil/headerlet.py deleted file mode 100644 index a30f06b..0000000 --- a/wcsutil/headerlet.py +++ /dev/null @@ -1,898 +0,0 @@ -from __future__ import division -import logging -import os -import tarfile -import tempfile -import time -import warnings -from cStringIO import StringIO - -import numpy as np -import pyfits - -import altwcs -import wcscorr -from hstwcs import HSTWCS -from mappings import basic_wcs -from pytools.fileutil import countExtn - -module_logger = logging.getLogger('headerlet') - -import atexit -atexit.register(logging.shutdown) - -def setLogger(logger, level, mode='w'): - formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - log_filename = 'headerlet.log' - fh = logging.FileHandler(log_filename, mode=mode) - fh.setLevel(logging.DEBUG) - fh.setFormatter(formatter) - logger.addHandler(fh) - logger.setLevel(level) - -def isWCSIdentical(scifile, file2, verbose=False): - """ - Compares the WCS solution of 2 files. - - Parameters - ---------- - scifile: file1 - file2: file2 - verbose: False or a python logging level - (one of 'INFO', 'DEBUG' logging levels) - (an integer representing a logging level) - - Notes - ----- - These can be 2 science observations or 2 headerlets - or a science observation and a headerlet. The two files - have the same WCS solution if the following are the same: - - - rootname/destim - - primary WCS - - SIP coefficients - - NPOL distortion - - D2IM correction - - Velocity aberation - - """ - if verbose: - setLogger(module_logger, verbose) - else: - module_logger.setLevel(100) - - module_logger.info("Starting isWCSIdentical: %s" % time.asctime()) - - result = True - numsci1 = max(countExtn(scifile), countExtn(scifile, 'SIPWCS')) - numsci2 = max(countExtn(file2), countExtn(file2, 'SIPWCS')) - - if numsci1 == 0 or numsci2 == 0 or numsci1 != numsci2: - module_logger.info("Number of SCI and SIPWCS extensions do not match.") - result = False - - if getRootname(scifile) != getRootname(file2): - module_logger.info('Rootnames do not match.') - result = False - try: - extname1 = pyfits.getval(scifile, 'EXTNAME', ext=('SCI', 1)) - except KeyError: - extname1 = 'SIPWCS' - try: - extname2 = pyfits.getval(file2, 'EXTNAME', ext=('SCI', 1)) - except KeyError: - extname2 = 'SIPWCS' - - for i in range(1, numsci1 + 1): - w1 = HSTWCS(scifile, ext=(extname1, i)) - w2 = HSTWCS(file2, ext=(extname2, i)) - if not np.allclose(w1.wcs.crval, w2.wcs.crval, rtol=1e-7) or \ - not np.allclose(w1.wcs.crpix, w2.wcs.crpix, rtol=1e-7) or \ - not np.allclose(w1.wcs.cd, w2.wcs.cd, rtol=1e-7) or \ - not (np.array(w1.wcs.ctype) == np.array(w2.wcs.ctype)).all(): - module_logger.info('Primary WCSs do not match') - result = False - if w1.sip or w2.sip: - if (w2.sip and not w1.sip) or (w1.sip and not w2.sip) or \ - not np.allclose(w1.sip.a, w2.sip.a, rtol=1e-7) or \ - not np.allclose(w1.sip.b, w2.sip.b, rtol=1e-7): - module_logger.info('SIP coefficients do not match') - result = False - if w1.cpdis1 or w2.cpdis1: - if w1.cpdis1 and not w2.cpdis1 or \ - w2.cpdis1 and not w1.cpdis1 or \ - not np.allclose(w1.cpdis1.data, w2.cpdis1.data): - module_logger.info('NPOL distortions do not match') - result = False - if w1.cpdis2 or w2.cpdis2: - if w1.cpdis2 and not w2.cpdis2 or \ - w2.cpdis2 and not w1.cpdis2 or \ - not np.allclose(w1.cpdis2.data, w2.cpdis2.data): - module_logger.info('NPOL distortions do not match') - result = False - if w1.det2im1 or w2.det2im1: - if w1.det2im1 and not w2.det2im1 or \ - w2.det2im1 and not w1.det2im1 or\ - not np.allclose(w1.det2im1.data, w2.det2im1.data): - module_logger.info('Det2Im corrections do not match') - result = False - if w1.det2im2 or w2.det2im2: - if w1.det2im2 and not w2.det2im2 or \ - w2.det2im2 and not w1.det2im2 or\ - not np.allclose(w1.det2im2.data, w2.det2im2.data): - module_logger.info('Det2Im corrections do not match') - result = False - if w1.vafactor != w2.vafactor: - module_logger.info('VA factors do not match') - result = False - - return result - - -# TODO: It would be logical for this to be part of the Headerlet class, perhaps -# as a classmethod -def createHeaderlet(fname, hdrname, destim=None, output=None, verbose=False, logmode='w'): - """ - Create a headerlet from a science observation - - Parameters - ---------- - fname: string - Name of file with science observation - hdrname: string - Name for the headerlet, stored in the primary header of the headerlet - destim: string - Destination image, stored in the primary header of the headerlet. - If None ROOTNAME is used of the science observation is used. - ROOTNAME has precedence, destim is used for observations without - ROOTNAME in the primary header - output: string - Save the headerlet to the given filename. - verbose: False or a python logging level - (one of 'INFO', 'DEBUG' logging levels) - (an integer representing a logging level) - logmode: 'w', 'a' - used internally for controling access to the log file - """ - - if verbose: - setLogger(module_logger, verbose, mode=logmode) - else: - module_logger.setLevel(100) - - module_logger.info("Starting createHeaderlet: %s" % time.asctime()) - phdukw = {'IDCTAB': True, - 'NPOLFILE': True, - 'D2IMFILE': True} - if not isinstance(fname, pyfits.HDUList): - fobj = pyfits.open(fname) - close_file = True - else: - fobj = fname - close_file = False - if destim is None: - try: - destim = fobj[0].header['ROOTNAME'] - except KeyError: - module_logger.exception('Required keyword "DESTIM" not found') - print 'Please provide a value for the DESTIM keyword' - raise - if hdrname is None: - module_logger.critical("Required keyword 'HDRNAME' not given") - raise ValueError("Please provide a name for the headerlet, HDRNAME is " - "a required parameter.") - - # get the version of STWCS used to create the WCS of the science file. - try: - upwcsver = fobj[0].header.ascard['STWCSVER'] - except KeyError: - upwcsver = pyfits.Card("STWCSVER", " ", - "Version of STWCS used to update the WCS") - - # get all keys for alternate WCSs - altkeys = altwcs.wcskeys(fobj[('SCI', 1)].header) - - if 'O' in altkeys: - altkeys.remove('O') - - numsci = countExtn(fname, 'SCI') - module_logger.debug("Number of 'SCI' extensions in file %s is %s" - % (fname, numsci)) - hdul = pyfits.HDUList() - phdu = pyfits.PrimaryHDU() - phdu.header.update('DESTIM', destim, - comment='Destination observation root name') - phdu.header.update('HDRNAME', hdrname, comment='Headerlet name') - fmt="%Y-%m-%dT%H:%M:%S" - phdu.header.update('DATE', time.strftime(fmt), - comment='Date FITS file was generated') - phdu.header.ascard.append(upwcsver) - - refs = updateRefFiles(fobj[0].header.ascard, phdu.header.ascard, verbose=verbose) - phdukw.update(refs) - phdu.header.update(key='VAFACTOR', - value=fobj[('SCI',1)].header.get('VAFACTOR', 1.)) - hdul.append(phdu) - - for e in range(1, numsci + 1): - hwcs = HSTWCS(fname, ext=('SCI', e)) - h = hwcs.wcs2header(sip2hdr=True).ascard - for ak in altkeys: - awcs = HSTWCS(fname,ext=('SCI', e), wcskey=ak) - h.extend(awcs.wcs2header(idc2hdr=False).ascard) - h.append(pyfits.Card(key='VAFACTOR', value=hwcs.vafactor, - comment='Velocity aberration plate scale factor')) - h.insert(0, pyfits.Card(key='EXTNAME', value='SIPWCS', - comment='Extension name')) - h.insert(1, pyfits.Card(key='EXTVER', value=e, - comment='Extension version')) - - fhdr = fobj[('SCI', e)].header.ascard - if phdukw['NPOLFILE']: - cpdis = fhdr['CPDIS*...'] - for c in range(1, len(cpdis) + 1): - h.append(cpdis[c - 1]) - dp = fhdr['DP%s.*...' % c] - h.extend(dp) - - try: - h.append(fhdr['CPERROR%s' % c]) - except KeyError: - pass - - try: - h.append(fhdr['NPOLEXT']) - except KeyError: - pass - - if phdukw['D2IMFILE']: - try: - h.append(fhdr['D2IMEXT']) - except KeyError: - pass - - try: - h.append(fhdr['AXISCORR']) - except KeyError: - module_logger.exception("'D2IMFILE' kw exists but keyword 'AXISCORR' was not found in " - "%s['SCI',%d]" % (fname, e)) - raise - - try: - h.append(fhdr['D2IMERR']) - except KeyError: - h.append(pyfits.Card(key='DPERROR', value=0, - comment='Maximum error of D2IMARR')) - - hdu = pyfits.ImageHDU(header=pyfits.Header(h)) - # temporary fix for pyfits ticket # 48 - hdu._extver = e - hdul.append(hdu) - numwdvarr = countExtn(fname, 'WCSDVARR') - numd2im = countExtn(fname, 'D2IMARR') - for w in range(1, numwdvarr + 1): - hdu = fobj[('WCSDVARR', w)].copy() - # temporary fix for pyfits ticket # 48 - hdu._extver = w - hdul.append(hdu) - for d in range(1, numd2im + 1): - hdu = fobj[('D2IMARR', d)].copy() - # temporary fix for pyfits ticket # 48 - hdu._extver = d - hdul.append(hdu) - if output is not None: - # write the headerlet to a file - if not output.endswith('_hdr.fits'): - output = output + '_hdr.fits' - hdul.writeto(output, clobber=True) - if close_file: - fobj.close() - return Headerlet(hdul,verbose=verbose, logmode='a') - -def applyHeaderlet(hdrfile, destfile, createheaderlet=True, hdrname=None, - verbose=False): - """ - Apply headerlet 'hdrfile' to a science observation 'destfile' - - Parameters - ---------- - hdrfile: string - Headerlet file - destfile: string - File name of science observation whose WCS solution will be updated - createheaderlet: boolean - True (default): before updating, create a headerlet with the - WCS old solution. - hdrname: string or None (default) - will be the value of the HDRNAME keyword in the headerlet generated - for the old WCS solution. If not specified, a sensible default - will be used. Not required if createheaderlet is False - verbose: False or a python logging level - (one of 'INFO', 'DEBUG' logging levels) - (an integer representing a logging level) - """ - if verbose: - setLogger(module_logger, verbose) - else: - module_logger.setLevel(100) - module_logger.info("Starting applyHeaderlet: %s" % time.asctime()) - hlet = Headerlet(hdrfile, verbose=verbose, logmode='a') - hlet.apply(destfile, createheaderlet=createheaderlet, hdrname=hdrname) - -def updateRefFiles(source, dest, verbose=False): - """ - Update the reference files name in the primary header of 'dest' - using values from 'source' - - Parameters - ---------- - source: pyfits.Header.ascardlist - dest: pyfits.Header.ascardlist - """ - module_logger.info("Updating reference files") - phdukw = {'IDCTAB': True, - 'NPOLFILE': True, - 'D2IMFILE': True} - - try: - wind = dest.index_of('HISTORY') - except KeyError: - wind = len(dest) - for key in phdukw.keys(): - try: - value = source[key] - dest.insert(wind, value) - except KeyError: - # TODO: I don't understand what the point of this is. Is it meant - # for logging purposes? Right now it isn't used. - phdukw[key] = False - return phdukw - -def getRootname(fname): - """ - returns the value of ROOTNAME or DESTIM - """ - - try: - rootname = pyfits.getval(fname, 'ROOTNAME') - except KeyError: - rootname = pyfits.getval(fname, 'DESTIM') - return rootname - -def mapFitsExt2HDUListInd(fname, extname): - """ - Map FITS extensions with 'EXTNAME' to HDUList indexes. - """ - - if not isinstance(fname, pyfits.HDUList): - f = pyfits.open(fname) - close_file = True - else: - f = fname - close_file = False - d = {} - for hdu in f: - # TODO: Replace calls to header.has_key() with `in header` once - # pyfits refactoring branch is in production use - if hdu.header.has_key('EXTNAME') and hdu.header['EXTNAME'] == extname: - extver = hdu.header['EXTVER'] - d[(extname, extver)] = f.index_of((extname, extver)) - if close_file: - f.close() - return d - - -class Headerlet(pyfits.HDUList): - """ - A Headerlet class - Ref: http://stsdas.stsci.edu/stsci_python_sphinxdocs/stwcs/headerlet_def.html - """ - - def __init__(self, fobj, wcskeys=[], mode='copyonwrite', verbose=False, logmode='w'): - """ - Parameters - ---------- - fobj: string - Name of headerlet file, file-like object, a list of HDU - instances, or an HDUList instance - wcskeys: python list - a list of wcskeys to be included in the headerlet - created from the old WCS solution before the - science file is updated. If empty: all alternate (if any) - WCSs are copied to the headerlet. - mode: string, optional - Mode with which to open the given file object - """ - self.verbose = verbose - self.hdr_logger = logging.getLogger('headerlet.Headerlet') - if self.verbose: - setLogger(self.hdr_logger, self.verbose, mode=logmode) - else: - self.hdr_logger.setLevel(100) - - self.hdr_logger.info("Creating a Headerlet object from wcskeys %s" % wcskeys) - self.wcskeys = wcskeys - if not isinstance(fobj, list): - fobj = pyfits.open(fobj, mode=mode) - - super(Headerlet, self).__init__(fobj) - self.fname = self.filename() - self.hdrname = self[0].header["HDRNAME"] - self.stwcsver = self[0].header.get("STWCSVER", "") - self.destim = self[0].header["DESTIM"] - self.idctab = self[0].header.get("IDCTAB", "") - self.npolfile = self[0].header.get("NPOLFILE", "") - self.d2imfile = self[0].header.get("D2IMFILE", "") - self.vafactor = self[1].header.get("VAFACTOR", 1) #None instead of 1? - self.d2imerr = 0 - self.axiscorr = 1 - - def apply(self, dest, createheaderlet=True, hdrname=None, attach=True, createsummary=True): - """ - Apply this headerlet to a file. - - Parameters - ---------- - dest: string - Name of file to which to apply the WCS in the headerlet - hdrname: string - A unique name for the headerlet - createheaderlet: boolean - A flag which indicates if a headerlet should be created - from the old WCS and attached to the science file (default: True) - attach: boolean, default: True - By default the headerlet being applied will be attached - as an extension to the science file. Set 'attach' to False - to disable this. - createsummary: boolean, default: True - Set this to False to disable creating and updating of wcscorr table. - This is used primarily for testing. - """ - self.hverify() - if self.verify_dest(dest): - if not isinstance(dest, pyfits.HDUList): - fobj = pyfits.open(dest, mode='update') - close_dest = True - else: - fobj = dest - close_dest = False - - # Create the WCSCORR HDU/table from the existing WCS keywords if - # necessary - if createsummary: - try: - # TODO: in the pyfits refactoring branch if will be easier to - # test whether an HDUList contains a certain extension HDU - # without relying on try/except - wcscorr_table = fobj['WCSCORR'] - except KeyError: - # The WCSCORR table needs to be created - wcscorr.init_wcscorr(fobj) - - orig_hlt_hdu = None - numhlt = countExtn(fobj, 'HDRLET') - if createheaderlet: - # Create a headerlet for the original WCS data in the file, - # create an HDU from the original headerlet, and append it to - # the file - if not hdrname: - hdrname = fobj[0].header['ROOTNAME'] + '_orig' - orig_hlt = createHeaderlet(fobj, hdrname, verbose=self.verbose, logmode='a') - orig_hlt_hdu = HeaderletHDU.fromheaderlet(orig_hlt) - orig_hlt_hdu.update_ext_version(numhlt + 1) - numhlt += 1 - - self._delDestWCS(fobj) - refs = updateRefFiles(self[0].header.ascard, fobj[0].header.ascard, verbose=self.verbose) - numsip = countExtn(self, 'SIPWCS') - for idx in range(1, numsip + 1): - fhdr = fobj[('SCI', idx)].header.ascard - siphdr = self[('SIPWCS', idx)].header.ascard - # a minimal attempt to get the position of the WCS keywords group - # in the header by looking for the PA_APER kw. - # at least make sure the WCS kw are written befir the HISTORY kw - # if everything fails, append the kw to the header - try: - wind = fhdr.index_of('PA_APER') - except KeyError: - try: - wind = fhdr.index_of('HISTORY') - except KeyError: - wind = len(fhdr) - self.hdr_logger.debug("Inserting WCS keywords at index %s" % wind) - # TODO: Drop .keys() when refactored pyfits comes into use - for k in siphdr.keys(): - if k not in ['XTENSION', 'BITPIX', 'NAXIS', 'PCOUNT', - 'GCOUNT','EXTNAME', 'EXTVER', 'ORIGIN', - 'INHERIT', 'DATE', 'IRAF-TLM']: - fhdr.insert(wind, siphdr[k]) - else: - pass - - #! Always attach these extensions last. Otherwise their headers may - # get updated with the other WCS kw. - numwdvar = countExtn(self, 'WCSDVARR') - numd2im = countExtn(self, 'D2IMARR') - for idx in range(1, numwdvar + 1): - fobj.append(self[('WCSDVARR', idx)].copy()) - for idx in range(1, numd2im + 1): - fobj.append(self[('D2IMARR', idx)].copy()) - - # Update the WCSCORR table with new rows from the headerlet's WCSs - if createsummary: - wcscorr.update_wcscorr(fobj, self, 'SIPWCS') - - # Append the original headerlet - if createheaderlet and orig_hlt_hdu: - fobj.append(orig_hlt_hdu) - - if attach: - # Finally, append an HDU for this headerlet - new_hlt = HeaderletHDU.fromheaderlet(self) - new_hlt.update_ext_version(numhlt + 1) - fobj.append(new_hlt) - - if close_dest: - fobj.close() - else: - self.hdr_logger.critical("Observation %s cannot be updated with headerlet " - "%s" % (fobj.filename(), self.hdrname)) - print "Observation %s cannot be updated with headerlet %s" \ - % (fobj.filename(), self.hdrname) - - - def hverify(self): - self.verify() - assert(self[0].header.has_key('DESTIM') and - self[0].header['DESTIM'].strip()) - assert(self[0].header.has_key('HDRNAME') and - self[0].header['HDRNAME'].strip()) - assert(self[0].header.has_key('STWCSVER')) - - - def verify_dest(self, dest): - """ - verifies that the headerlet can be applied to the observation - - DESTIM in the primary header of the headerlet must match ROOTNAME - of the science file (or the name of the destination file) - """ - - try: - if not isinstance(dest, pyfits.HDUList): - droot = pyfits.getval(dest, 'ROOTNAME') - else: - droot = dest[0].header['ROOTNAME'] - except KeyError: - self.hdr_logger.debug("Keyword 'ROOTNAME' not found in destination file") - droot = dest.split('.fits')[0] - if droot == self.destim: - self.hdr_logger.debug("verify_destim() returned True") - return True - else: - self.hdr_logger.debug("verify_destim() returned False") - return False - - def tofile(self, fname, destim=None, hdrname=None, clobber=False): - if not destim or not hdrname: - self.hverify() - self.writeto(fname, clobber=clobber) - - def _delDestWCS(self, dest): - """ - Delete the WCS of a science file - """ - - self.hdr_logger.info("Deleting all WCSs of file %s" % dest.filename()) - numext = len(dest) - - for idx in range(numext): - # Only delete WCS from extensions which may have WCS keywords - if dest[idx].__dict__.has_key('_xtn') and "IMAGE" in dest[idx]._xtn: - self._removeD2IM(dest[idx]) - self._removeSIP(dest[idx]) - self._removeLUT(dest[idx]) - self._removePrimaryWCS(dest[idx]) - self._removeIDCCoeffs(dest[idx]) - try: - del dest[idx].header.ascard['VAFACTOR'] - except KeyError: - pass - - self._removeRefFiles(dest[0]) - self._removeAltWCS(dest, ext=range(numext)) - numwdvarr = countExtn(dest, 'WCSDVARR') - numd2im = countExtn(dest, 'D2IMARR') - for idx in range(1, numwdvarr + 1): - del dest[('WCSDVARR', idx)] - for idx in range(1, numd2im + 1): - del dest[('D2IMARR', idx)] - - def _removeRefFiles(self, phdu): - """ - phdu: Primary HDU - """ - refkw = ['IDCTAB', 'NPOLFILE', 'D2IMFILE'] - for kw in refkw: - try: - del phdu.header.ascard[kw] - except KeyError: - pass - - def _removeSIP(self, ext): - """ - Remove the SIP distortion of a FITS extension - """ - - self.hdr_logger.debug("Removing SIP distortion from (%s, %s)" - % (ext.name, ext._extver)) - for prefix in ['A', 'B', 'AP', 'BP']: - try: - order = ext.header[prefix + '_ORDER'] - del ext.header[prefix + '_ORDER'] - except KeyError: - continue - for i in range(order + 1): - for j in range(order + 1): - key = prefix + '_%d_%d' % (i, j) - try: - del ext.header[key] - except KeyError: - pass - try: - del ext.header['IDCTAB'] - except KeyError: - pass - - def _removeLUT(self, ext): - """ - Remove the Lookup Table distortion of a FITS extension - """ - - self.hdr_logger.debug("Removing LUT distortion from (%s, %s)" - % (ext.name, ext._extver)) - try: - cpdis = ext.header['CPDIS*'] - except KeyError: - return - try: - for c in range(1, len(cpdis) + 1): - del ext.header['DP%s.*...' % c] - del ext.header[cpdis[c - 1].key] - del ext.header['CPERR*'] - del ext.header['NPOLFILE'] - del ext.header['NPOLEXT'] - except KeyError: - pass - - def _removeD2IM(self, ext): - """ - Remove the Detector to Image correction of a FITS extension - """ - - self.hdr_logger.debug("Removing D2IM correction from (%s, %s)" - % (ext.name, ext._extver)) - d2imkeys = ['D2IMFILE', 'AXISCORR', 'D2IMEXT', 'D2IMERR'] - for k in d2imkeys: - try: - del ext.header[k] - except KeyError: - pass - - def _removeAltWCS(self, dest, ext): - """ - Remove Alternate WCSs of a FITS extension. - A WCS with wcskey 'O' is never deleted. - """ - dkeys = altwcs.wcskeys(dest[('SCI', 1)].header) - self.hdr_logger.debug("Removing alternate WCSs with keys %s from %s" - % (dkeys, dest.filename())) - for k in dkeys: - altwcs.deleteWCS(dest, ext=ext, wcskey=k) - - def _removePrimaryWCS(self, ext): - """ - Remove the primary WCS of a FITS extension - """ - - self.hdr_logger.debug("Removing Primary WCS from (%s, %s)" - % (ext.name, ext._extver)) - naxis = ext.header.ascard['NAXIS'].value - for key in basic_wcs: - for i in range(1, naxis + 1): - try: - del ext.header.ascard[key + str(i)] - except KeyError: - pass - try: - del ext.header.ascard['WCSAXES'] - except KeyError: - pass - - def _removeIDCCoeffs(self, ext): - """ - Remove IDC coefficients of a FITS extension - """ - - self.hdr_logger.debug("Removing IDC coefficient from (%s, %s)" - % (ext.name, ext._extver)) - coeffs = ['OCX10', 'OCX11', 'OCY10', 'OCY11', 'IDCSCALE'] - for k in coeffs: - try: - del ext.header.ascard[k] - except KeyError: - pass - - -class HeaderletHDU(pyfits.core._NonstandardExtHDU): - """ - A non-standard extension HDU for encapsulating Headerlets in a file. These - HDUs have an extension type of HDRLET and their EXTNAME is derived from the - Headerlet's HDRNAME. - - The data itself is a tar file containing a single file, which is the - Headerlet file itself. The file name is derived from the HDRNAME keyword, - and should be in the form `<HDRNAME>_hdr.fits`. If the COMPRESS keyword - evaluates to `True`, the tar file is compressed with gzip compression. - - The Headerlet contained in the HDU's data can be accessed by the - `headerlet` attribute. - """ - - _xtn = _extension = 'HDRLET' - - def __init__(self, data=None, header=None): - super(HeaderletHDU, self).__init__(data=data, header=header) - # TODO: This can be removed after the next pyfits release, but for now - # the _ExtensionHDU base class sets self._xtn = '' in its __init__(). - self._xtn = self._extension - # For some reason _NonstandardExtHDU.__init__ sets self.name = None, - # even if it's already been set by the EXTNAME keyword in - # _ExtensionHDU.__init__() -_-; - if header and header.has_key('EXTNAME') and not self.name: - self.name = header['EXTNAME'] - # self._extver, if set, is still preserved. From - # _ExtensionHDU.__init__()...totally inconsistent. - - def __getattr__(self, attr): - if attr == 'data': - size = self.size() - self._file.seek(self._datLoc) - self.__dict__[attr] = self._file.read(size) - elif attr == 'headerlet': - self._file.seek(self._datLoc) - s = StringIO() - # Read the data into a StringIO--reading directly from the file - # won't work (at least for gzipped files) due to problems deep - # within the gzip module that make it difficult to read gzip files - # embedded in another file - s.write(self._file.read(self.size())) - s.seek(0) - if self._header['COMPRESS']: - mode = 'r:gz' - else: - mode = 'r' - t = tarfile.open(mode=mode, fileobj=s) - members = t.getmembers() - if not len(members): - raise ValueError('The Headerlet contents are missing.') - elif len(members) > 1: - warnings.warn('More than one file is contained in this ' - 'only the headerlet file should be present.') - hlt_name = self._header['HDRNAME'] + '_hdr.fits' - try: - hlt_info = t.getmember(hlt_name) - except KeyError: - warnings.warn('The file %s was missing from the HDU data. ' - 'Assuming that the first file in the data is ' - 'headerlet file.' % hlt_name) - hlt_info = members[0] - hlt_file = t.extractfile(hlt_info) - # hlt_file is a file-like object - hlt = Headerlet(hlt_file, mode='readonly') - self.__dict__[attr] = hlt - else: - return pyfits.core._ValidHDU.__getattr__(self, attr) - try: - return self.__dict__[attr] - except KeyError: - raise AttributeError(attr) - - @classmethod - def fromheaderlet(cls, headerlet, compress=False): - """ - Creates a new HeaderletHDU from a given Headerlet object. - - Parameters - ---------- - headerlet : Headerlet - A valid Headerlet object. - compress : bool, optional - Gzip compress the headerlet data. - """ - - phdu = headerlet[0] - phduhdr = phdu.header - hlt_filename = phdu.header['HDRNAME'] + '_hdr.fits' - - # TODO: As it stands there's no good way to write out an HDUList in - # memory, since it automatically closes the given file-like object when - # it's done writing. I'd argue that if passed an open file handler it - # should not close it, but for now we'll have to write to a temp file. - fd, name = tempfile.mkstemp() - try: - f = os.fdopen(fd, 'rb+') - headerlet.writeto(f) - # The tar file itself we'll write in memory, as it should be - # relatively small - if compress: - mode = 'w:gz' - else: - mode = 'w' - s = StringIO() - t = tarfile.open(mode=mode, fileobj=s) - t.add(name, arcname=hlt_filename) - t.close() - finally: - os.remove(name) - - cards = [ - pyfits.Card('XTENSION', cls._extension, 'Headerlet extension'), - pyfits.Card('BITPIX', 8, 'array data type'), - pyfits.Card('NAXIS', 1, 'number of array dimensions'), - pyfits.Card('NAXIS1', len(s.getvalue()), 'Axis length'), - pyfits.Card('PCOUNT', 0, 'number of parameters'), - pyfits.Card('GCOUNT', 1, 'number of groups'), - pyfits.Card('EXTNAME', cls._extension, - 'name of the headerlet extension'), - phdu.header.ascard['HDRNAME'], - phdu.header.ascard['DATE'], - pyfits.Card('SIPNAME', headerlet['SIPWCS', 1].header['WCSNAMEA'], - 'SIP distortion model name'), - phdu.header.ascard['NPOLFILE'], - phdu.header.ascard['D2IMFILE'], - pyfits.Card('COMPRESS', compress, 'Uses gzip compression') - ] - - header = pyfits.Header(pyfits.CardList(cards)) - - hdu = cls(data=pyfits.DELAYED, header=header) - hdu._file = s - hdu._datLoc = 0 - return hdu - - @classmethod - def match_header(cls, header): - """ - This is a class method used in the pyfits refactoring branch to - recognize whether or not this class should be used for instantiating - an HDU object based on values in the header. - - It is included here for forward-compatibility. - """ - - card = header.ascard[0] - if card.key != 'XTENSION': - return False - xtension = card.value.rstrip() - return xtension == cls._extension - - # TODO: Add header verification - - def _summary(self): - # TODO: Perhaps make this more descriptive... - return '%-10s %-12s %4d' % (self.name, self.__class__.__name__, - len(self._header.ascard)) - -# Monkey-patch pyfits to add minimal support for HeaderletHDUs -# TODO: Get rid of this ASAP!!! (it won't be necessary with the pyfits -# refactoring branch) -if not hasattr(pyfits.Header._updateHDUtype, '_patched'): - __old_updateHDUtype = pyfits.Header._updateHDUtype - def __updateHDUtype(self): - if HeaderletHDU.match_header(self): - self._hdutype = HeaderletHDU - else: - __old_updateHDUtype(self) - __updateHDUtype._patched = True - pyfits.Header._updateHDUtype = __updateHDUtype |