diff options
Diffstat (limited to 'lib/stwcs/wcsutil/headerlet.py')
-rw-r--r-- | lib/stwcs/wcsutil/headerlet.py | 400 |
1 files changed, 262 insertions, 138 deletions
diff --git a/lib/stwcs/wcsutil/headerlet.py b/lib/stwcs/wcsutil/headerlet.py index b1b873b..838499e 100644 --- a/lib/stwcs/wcsutil/headerlet.py +++ b/lib/stwcs/wcsutil/headerlet.py @@ -15,6 +15,7 @@ import wcscorr from hstwcs import HSTWCS from mappings import basic_wcs from stsci.tools.fileutil import countExtn +from stsci.tools import fileutil as fu module_logger = logging.getLogger('headerlet') @@ -30,6 +31,17 @@ def setLogger(logger, level, mode='w'): logger.addHandler(fh) logger.setLevel(level) +def hdrnames(fobj): + """ + Returns a list of HDRNAME keywords from all HeaderletHDU + extensions in a science file. + + Parameters + ---------- + fobj: string, pyfits.HDUList + """ + + def isWCSIdentical(scifile, file2, verbose=False): """ Compares the WCS solution of 2 files. @@ -129,31 +141,69 @@ def isWCSIdentical(scifile, file2, verbose=False): return result -# TODO: It would be logical for this to be part of the Headerlet class, perhaps -# as a classmethod -def createHeaderlet(fname, hdrname, destim=None, output=None, verbose=False, logmode='w'): +def create_headerlet(fname, sciext=None, hdrname=None, destim=None, wcskey=" ", wcsname=None, + sipname=None, npolfile=None, d2imfile=None, + verbose=100, logmode='w'): """ - Create a headerlet from a science observation - + Create a headerlet from a WCS in a science file + If both wcskey and wcsname are given they should match, if not + raise an Exception + Parameters ---------- - fname: string - Name of file with science observation + fname: string or HDUList + science file + sciext: string or python list + Extension in which the science data is. The headerlet will be created + from these extensions. + If string - a valid EXTNAME is expected + If list - a list of FITS extension numbers or extension tuples ('SCI', 1) + is expected. + If None, loops over all extensions in the file, including the primary hdrname: string - Name for the headerlet, stored in the primary header of the headerlet - destim: string - Destination image, stored in the primary header of the headerlet. - If None ROOTNAME is used of the science observation is used. - ROOTNAME has precedence, destim is used for observations without - ROOTNAME in the primary header - output: string - Save the headerlet to the given filename. - verbose: False or a python logging level - (one of 'INFO', 'DEBUG' logging levels) - (an integer representing a logging level) - logmode: 'w', 'a' - used internally for controling access to the log file + value of HDRNAME keyword + Takes the value from the HDRNAME<wcskey> keyword, if not available from WCSNAME<wcskey> + It stops if neither is found in the science file and a value is not provided + destim: string or None + name of file this headerlet can be applied to + if None, use ROOTNAME keyword + wcskey: char (A...Z) or " " or None + a char representing an alternate WCS to be used for the headerlet + if " ", use the primary (default) + if None use wcsname + wcsname: string or None + if wcskey is None use wcsname specified here to choose an alternate WCS for the headerlet + sipname: string or None (default) + Name of unique file where the polynomial distortion coefficients were + read from. If None, the behavior is: + The code looks for a keyword 'SIPNAME' in the science header + If not found, for HST it defaults to 'IDCTAB' + If there is no SIP model the value is 'NOMODEL' + If there is a SIP model but no SIPNAME, it is set to 'UNKNOWN' + npolfile: string or None (default) + Name of a unique file where the non-polynomial distortion was stored. + If None: + The code looks for 'NPOLFILE' in science header. + If 'NPOLFILE' was not found and there is no npol model, it is set to 'NOMODEL' + If npol model exists, it is set to 'UNKNOWN' + d2imfile: string + Name of a unique file where the detector to image correction was + stored. If None: + The code looks for 'D2IMFILE' in the science header. + If 'D2IMFILE' is not found and there is no d2im correction, + it is set to 'NOMODEL' + If d2im correction exists, but 'D2IMFILE' is missing from science + header, it is set to 'UNKNOWN' + verbose: int + python logging level + logmode: 'w' or 'a' + log file open mode + + Returns + ------- + Headerlet object """ + if verbose: setLogger(module_logger, verbose, mode=logmode) @@ -170,103 +220,159 @@ def createHeaderlet(fname, hdrname, destim=None, output=None, verbose=False, log else: fobj = fname close_file = False + + # get all required keywords if destim is None: try: destim = fobj[0].header['ROOTNAME'] except KeyError: - module_logger.exception('Required keyword "DESTIM" not found') - print 'Please provide a value for the DESTIM keyword' - raise - if hdrname is None: - module_logger.critical("Required keyword 'HDRNAME' not given") - raise ValueError("Please provide a name for the headerlet, HDRNAME is " - "a required parameter.") - + destim = fname + module_logger.info('DESTIM not provided') + module_logger.info('Keyword "ROOTNAME" not found') + module_logger.info('Using file name as DESTIM') + + if not hdrname: + # check if HDRNAME<wcskey> is in header + hdrname = "".join(["HDRNAME",wcskey.upper()]) + try: + hdrname = fobj[1].header['HDRNAME'] + except KeyError: + try: + hdrname = fobj[1].header['WCSNAME'] + except KeyError as detail: + message = "Required keyword 'HDRNAME' not given" + module_logger.critical(message) + print message, detail + + if not wcsname: + wname = "".join(["WCSNAME",wcskey.upper()]) + try: + wcsname = fobj[1].header[wname] + except KeyError as detail: + message = "Missing required keyword 'WCSNAME'." + module_logger.critical(message) + print message, detail + + if not sipname: + try: + sipname = fobj[0].header["SIPNAME"] + except KeyError: + try: + sipname = fobj[0].header["IDCTAB"] + except KeyError: + if 'A_ORDER' in fobj[1].header or 'B_ORDER' in fobj[1].header: + sipname = 'UNKNOWN' + else: + sipname = 'NOMODEL' + + if not npolfile: + try: + npolfile = fobj[0].header["NPOLFILE"] + except KeyError: + if countExtn(f, 'WCSDVARR'): + npolfile = 'UNKNOWN' + else: + npolfile = 'NOMODEL' + + if not d2imfile: + try: + d2imfile = fobj[0].header["D2IMFILE"] + except KeyError: + if countExtn(f, 'D2IMARR'): + npolfile = 'UNKNOWN' + else: + npolfile = 'NOMODEL' + + distname = "_".join([sipname, npolfile, d2imfile]) + # get the version of STWCS used to create the WCS of the science file. try: upwcsver = fobj[0].header.ascard['STWCSVER'] except KeyError: upwcsver = pyfits.Card("STWCSVER", " ", "Version of STWCS used to update the WCS") - - # get all keys for alternate WCSs - altkeys = altwcs.wcskeys(fobj[('SCI', 1)].header) - - if 'O' in altkeys: - altkeys.remove('O') - - numsci = countExtn(fname, 'SCI') - module_logger.debug("Number of 'SCI' extensions in file %s is %s" - % (fname, numsci)) + try: + pywcsver = fobj[0].header.ascard['PYWCSVER'] + except KeyError: + pywcsver = pyfits.Card("PYWCSVER", " ", + "Version of PYWCS used to update the WCS") + + if not sciext: + sciext = range(fobj) + elif isinstance(sciext, str): + numsciext = countExtn(fobj, sciext) + sciext = [(sciext, i) for i in range(1, numsciext+1)] + elif isinstance(sciext, list): + pass + else: + raise ValueError("Expected sciext to be a list of FITS extensions with science data or a string of valid EXTNAME") + + if wcskey is 'O': + message = "Warning: 'O' is a reserved key for the original WCS" + module_logger.info(message) + print message + + module_logger.debug("Data extensions form which to create headerlet:\n\t %s" + % (str(sciext))) hdul = pyfits.HDUList() - phdu = pyfits.PrimaryHDU() - phdu.header.update('DESTIM', destim, - comment='Destination observation root name') - phdu.header.update('HDRNAME', hdrname, comment='Headerlet name') - fmt="%Y-%m-%dT%H:%M:%S" - phdu.header.update('DATE', time.strftime(fmt), - comment='Date FITS file was generated') - phdu.header.ascard.append(upwcsver) - - refs = updateRefFiles(fobj[0].header.ascard, phdu.header.ascard, - verbose=verbose) - phdukw.update(refs) - phdu.header.update(key='VAFACTOR', - value=fobj[('SCI',1)].header.get('VAFACTOR', 1.)) + phdu = _createPrimaryHDU(destim, hdrname, distname, wcsname, + sipname, npolfile, d2imfile, upwcsver, pywcsver) hdul.append(phdu) - - for e in range(1, numsci + 1): - hwcs = HSTWCS(fname, ext=('SCI', e)) - h = hwcs.wcs2header(sip2hdr=True).ascard - for ak in altkeys: - awcs = HSTWCS(fname,ext=('SCI', e), wcskey=ak) - h.extend(awcs.wcs2header(idc2hdr=False).ascard) - h.append(pyfits.Card(key='VAFACTOR', value=hwcs.vafactor, - comment='Velocity aberration plate scale factor')) - h.insert(0, pyfits.Card(key='EXTNAME', value='SIPWCS', - comment='Extension name')) - h.insert(1, pyfits.Card(key='EXTVER', value=e, - comment='Extension version')) - - fhdr = fobj[('SCI', e)].header.ascard - if phdukw['NPOLFILE']: - cpdis = fhdr['CPDIS*...'] - for c in range(1, len(cpdis) + 1): - h.append(cpdis[c - 1]) - dp = fhdr['DP%s.*...' % c] - h.extend(dp) - + + if fu.isFits(fobj)[1] is not 'simple': + + for e in sciext: + hwcs = HSTWCS(fname, ext=e, wcskey=wcskey) + h = hwcs.wcs2header(sip2hdr=True).ascard + h.append(pyfits.Card(key='VAFACTOR', value=hwcs.vafactor, + comment='Velocity aberration plate scale factor')) + h.insert(0, pyfits.Card(key='EXTNAME', value='SIPWCS', + comment='Extension name')) + if isinstance(e, int): val = e + else: val = e[1] + h.insert(1, pyfits.Card(key='EXTVER', value=val, + comment='Extension version')) + h.append(pyfits.Card("SCIEXT", str(e), + "Target science data extension")) + fhdr = fobj[e].header.ascard + if npolfile is not 'NOMODEL': + cpdis = fhdr['CPDIS*...'] + for c in range(1, len(cpdis) + 1): + h.append(cpdis[c - 1]) + dp = fhdr['DP%s.*...' % c] + h.extend(dp) + + try: + h.append(fhdr['CPERROR%s' % c]) + except KeyError: + pass + try: - h.append(fhdr['CPERROR%s' % c]) + h.append(fhdr['NPOLEXT']) except KeyError: pass - - try: - h.append(fhdr['NPOLEXT']) - except KeyError: - pass - - if phdukw['D2IMFILE']: - try: - h.append(fhdr['D2IMEXT']) - except KeyError: - pass - - try: - h.append(fhdr['AXISCORR']) - except KeyError: - module_logger.exception("'D2IMFILE' kw exists but keyword 'AXISCORR' was not found in " - "%s['SCI',%d]" % (fname, e)) - raise - - try: - h.append(fhdr['D2IMERR']) - except KeyError: - h.append(pyfits.Card(key='DPERROR', value=0, - comment='Maximum error of D2IMARR')) - - hdu = pyfits.ImageHDU(header=pyfits.Header(h)) - hdul.append(hdu) + + if d2imfile is not 'NOMODEL': + try: + h.append(fhdr['D2IMEXT']) + except KeyError: + pass + + try: + h.append(fhdr['AXISCORR']) + except KeyError: + module_logger.exception("'D2IMFILE' kw exists but keyword 'AXISCORR' was not found in " + "%s['SCI',%d]" % (fname, e)) + raise + + try: + h.append(fhdr['D2IMERR']) + except KeyError: + h.append(pyfits.Card(key='DPERROR', value=0, + comment='Maximum error of D2IMARR')) + + hdu = pyfits.ImageHDU(header=pyfits.Header(h)) + hdul.append(hdu) numwdvarr = countExtn(fname, 'WCSDVARR') numd2im = countExtn(fname, 'D2IMARR') for w in range(1, numwdvarr + 1): @@ -275,15 +381,30 @@ def createHeaderlet(fname, hdrname, destim=None, output=None, verbose=False, log for d in range(1, numd2im + 1): hdu = fobj[('D2IMARR', d)].copy() hdul.append(hdu) - if output is not None: - # write the headerlet to a file - if not output.endswith('_hdr.fits'): - output = output + '_hdr.fits' - hdul.writeto(output, clobber=True) + if close_file: fobj.close() return Headerlet(hdul,verbose=verbose, logmode='a') +def _createPrimaryHDU(destim, hdrname, distname, wcsname, + sipname, npolfile, d2imfile, upwcsver, pywcsver): + phdu = pyfits.PrimaryHDU() + phdu.header.update('DESTIM', destim, + comment='Destination observation root name') + phdu.header.update('HDRNAME', hdrname, comment='Headerlet name') + fmt="%Y-%m-%dT%H:%M:%S" + phdu.header.update('DATE', time.strftime(fmt), + comment='Date FITS file was generated') + phdu.header.update('WCSNAME', wcsname, comment='WCS name') + phdu.header.update('DISTNAME', distname, comment='Distortion model name') + phdu.header.update('SIPNAME', sipname, comment='origin of SIP polynomial distortion model') + phdu.header.update('NPOLFILE', npolfile, comment='origin of non-polynmial distortion model') + phdu.header.update('D2IMFILE', d2imfile, comment='origin of detector to image correction') + + phdu.header.ascard.append(upwcsver) + phdu.header.ascard.append(pywcsver) + return phdu + def applyHeaderlet(hdrfile, destfile, createheaderlet=True, hdrname=None, verbose=False): """ @@ -378,24 +499,23 @@ def mapFitsExt2HDUListInd(fname, extname): class Headerlet(pyfits.HDUList): """ A Headerlet class - Ref: http://stsdas.stsci.edu/stsci_python_sphinxdocs/stwcs/headerlet_def.html + Ref: http://mediawiki.stsci.edu/mediawiki/index.php/Telescopedia:Headerlets """ - def __init__(self, fobj, wcskeys=[], mode='copyonwrite', verbose=False, - logmode='w'): + def __init__(self, fobj, mode='copyonwrite', verbose=False, logmode='w'): """ Parameters ---------- fobj: string Name of headerlet file, file-like object, a list of HDU instances, or an HDUList instance - wcskeys: python list - a list of wcskeys to be included in the headerlet - created from the old WCS solution before the - science file is updated. If empty: all alternate (if any) - WCSs are copied to the headerlet. mode: string, optional Mode with which to open the given file object + verbose: int + python logging level, higher numbers trigger more output + logmode: 'w' or 'a' + for internal use only, indicates whether the log file + should be open in attach or write mode """ self.verbose = verbose self.hdr_logger = logging.getLogger('headerlet.Headerlet') @@ -404,44 +524,48 @@ class Headerlet(pyfits.HDUList): else: self.hdr_logger.setLevel(100) - self.hdr_logger.info("Creating a Headerlet object from wcskeys %s" % wcskeys) - self.wcskeys = wcskeys if not isinstance(fobj, list): fobj = pyfits.open(fobj, mode=mode) super(Headerlet, self).__init__(fobj) self.fname = self.filename() self.hdrname = self[0].header["HDRNAME"] + self.wcsname = self[0].header["WCSNAME"] self.stwcsver = self[0].header.get("STWCSVER", "") + self.stwcsver = self[0].header.get("PYWCSVER", "") self.destim = self[0].header["DESTIM"] - self.idctab = self[0].header.get("IDCTAB", "") - self.npolfile = self[0].header.get("NPOLFILE", "") - self.d2imfile = self[0].header.get("D2IMFILE", "") + self.idctab = self[0].header["SIPNAME"] + self.npolfile = self[0].header["NPOLFILE"] + self.d2imfile = self[0].header["D2IMFILE"] + self.distname = self[0].header["DISTNAME"] self.vafactor = self[1].header.get("VAFACTOR", 1) #None instead of 1? self.d2imerr = 0 self.axiscorr = 1 - def apply(self, dest, createheaderlet=True, hdrname=None, attach=True, - createsummary=True): + def apply_as_primary(fobj, attach=True, archive=True, force=False): """ - Apply this headerlet to a file. - + Copy this headerlet as a primary WCS to fobj + Parameters ---------- - dest: string - Name of file to which to apply the WCS in the headerlet - hdrname: string - A unique name for the headerlet - createheaderlet: boolean - A flag which indicates if a headerlet should be created - from the old WCS and attached to the science file (default: True) - attach: boolean, default: True - By default the headerlet being applied will be attached - as an extension to the science file. Set 'attach' to False - to disable this. - createsummary: boolean, default: True - Set this to False to disable creating and updating of wcscorr table. - This is used primarily for testing. + fobj: string, HDUList + science file to which the headerlet should be applied + attach: boolean + flag indicating if the headerlet should be attached as a + HeaderletHDU to fobj. If True checks that HDRNAME is unique + in the fobj and stops if not. + archive: boolean (default is True) + When the distortion model in the headerlet is the same as the + distortion model of the science file, this flag indicates if + the primary WCS should be saved as an alternate and a headerlet + extension. + When the distortion models do not match this flag indicates if + the current primary and alternate WCSs should be archived as + headerlet extensions and alternate WCS. + force: boolean (default is False) + When the distortion models of the headerlet and the primary do + not match, and archive is False this flag forces an update + of the primary """ self.hverify() if self.verify_dest(dest): |