from __future__ import division import time import numpy as np import pyfits from hstwcs import HSTWCS import altwcs from pyfits import HDUList from mappings import basic_wcs def isWCSIdentical(scifile, file2): """ Compares the WCS solution of 2 files. These can be 2 science observations or 2 headerlets or a science observation and a headerlet. The two files have the same WCS solution if the following are the same: - rootname/destim - primary WCS - SIP coefficients - NPOL distortion - D2IM correction - Velocity aberation """ result = True numsci1 = max(countext(scifile, 'SCI'), countext(scifile, 'SIPWCS')) numsci2 = max(countext(file2, 'SCI'), countext(file2, 'SIPWCS')) if numsci1 == 0 or numsci2==0 or numsci1!= numsci2: print "Number of SCI and SIPWCS extensions do not match." result = False if getRootname(scifile) != getRootname(file2): print 'Rootnames do not match.' result = False try: extname1 = pyfits.getval(scifile, 'EXTNAME', ext=('SCI', 1)) except KeyError: extname1 = 'SIPWCS' try: extname2 = pyfits.getval(file2, 'EXTNAME', ext=('SCI', 1)) except KeyError: extname2 = 'SIPWCS' for i in range(1, numsci1+1): w1 = HSTWCS(scifile,ext=(extname1,i)) w2 = HSTWCS(file2, ext=(extname2,i)) if not (w1.wcs.crval == w2.wcs.crval).all() or \ not (w1.wcs.crpix == w2.wcs.crpix).all() or \ not (w1.wcs.cd == w2.wcs.cd).all() or \ not (np.array(w1.wcs.ctype) == np.array(w2.wcs.ctype)).all(): print 'Primary WCSs do not match' result = False if w1.sip or w2.sip: if (w2.sip and not w1.sip) or (w1.sip and not w2.sip) or \ not (w1.sip.a == w2.sip.a).all() or \ not (w1.sip.b == w2.sip.b).all(): print 'SIP coefficients do not match' result = False if w1.cpdis1 or w2.cipdis1: if w1.cpdis1 and not w2.cpdis1 or \ w2.cpdis1 and not w1.cpdis1 or \ not (w1.cpdis1.data == w2.cpdis1.data).all(): print 'NPOL distortions do not match' result = False if w1.cpdis2 or w2.cpdis2: if w1.cpdis2 and not w2.cpdis2 or \ w2.cpdis2 and not w1.cpdis2 or \ not (w1.cpdis2.data == w2.cpdis2.data).all(): print 'NPOL distortions do not match' result = False if w1.det2im1 or w2.det2im1: if w1.det2im1 and not w2.det2im1 or \ w2.det2im1 and not w1.det2im1 or\ not (w1.det2im1.data == w2.det2im1.data).all(): print 'Det2Im corrections do not match' result = False if w1.det2im2 or w2.det2im2: if w1.det2im2 and not w2.det2im2 or \ w2.det2im2 and not w1.det2im2 or\ not (w1.det2im2.data == w2.det2im2.data).all(): print 'Det2Im corrections do not match' result = False if w1.vafactor != w2.vafactor: print 'VA factors do not match' result = False return result def createHeaderlet(fname, hdrname, destim=None, output=None): """ Create a headerlet from a science observation Parameters ---------- fname: string Name of file with science observation hdrname: string Name for the headerlet, stored in the primary header of the headerlet If output is None, hdrname is used as output destim: string Destination image, stored in the primary header of the headerlet. If None ROOTNAME is used of the science observation is used. ROOTNAME has precedence, destim is used for observations without ROOTNAME in the primary header output: string Name for the headerlet file. HDRNAME is used if output is not given. """ fmt="%Y-%m-%dT%H:%M:%S" phdukw = {'IDCTAB': True, 'NPOLFILE': True, 'D2IMFILE': True} fobj = pyfits.open(fname) if destim is None: try: destim = fobj[0].header['ROOTNAME'] except KeyError: raise ValueError, 'Please provide a value for the DESTIM keyword' if hdrname is None: raise ValueError, "Please provide a name for the headerlet, HDRNAME is a required parameter." altkeys = altwcs.wcskeys(fobj[('SCI',1)].header) try: upwcsver = fobj[0].header.ascard['STWCSVER'] except KeyError: upwcsver = pyfits.Card("STWCSVER", " ","Version of STWCS used to update the WCS") if 'O' in altkeys: altkeys.remove('O') numsci = countext(fname, 'SCI') hdul = pyfits.HDUList() phdu = pyfits.PrimaryHDU() phdu.header.update('DESTIM', destim, comment='Destination observation root name') phdu.header.update('HDRNAME',hdrname, comment='Headerlet name') phdu.header.update('DATE',time.strftime(fmt), comment='Date FITS file was generated') phdu.header.ascard.append(upwcsver) updateRefFiles(fobj[0].header.ascard, phdu.header.ascard) phdu.header.update(key='VAFACTOR', value=fobj[('SCI',1)].header.get('VAFACTOR', 1.)) hdul.append(phdu) for e in range(1,numsci+1): hwcs = HSTWCS(fname,ext=('SCI',e)) h = hwcs.wcs2header(sip2hdr=True).ascard for ak in altkeys: awcs = HSTWCS(fname,ext=('SCI',e), wcskey=ak) h.extend(awcs.wcs2header(idc2hdr=False).ascard) h.insert(0,pyfits.Card(key='EXTNAME', value='SIPWCS', comment='Extension name')) h.insert(1,pyfits.Card(key='EXTVER', value=e, comment='Extension version')) fhdr = fobj[('SCI',e)].header.ascard if phdukw['NPOLFILE']: cpdis = fhdr['CPDIS*...'] for c in range(1,len(cpdis)+1): h.append(cpdis[c-1]) dp = fhdr['DP%s.*...'%c] h.extend(dp) try: h.append(fhdr['CPERROR%s'%c]) except KeyError: pass try: h.append(fhdr['NPOLEXT']) except KeyError: pass if phdukw['D2IMFILE']: try: h.append(fhdr['D2IMEXT']) except KeyError: pass try: h.append(fhdr['AXISCORR']) except KeyError: print "Required keyword AXISCORR was not found in %s['SCI',%d]" %(fname,e) print "Unable to create headerlet ... quiting\n" raise try: h.append(fhdr['D2IMERR']) except KeyError: h.append(pyfits.Card(key='DPERROR', value=0, comment='Maximum error of D2IMARR')) hdu = pyfits.ImageHDU(header=pyfits.Header(h)) # temporary fix for pyfits ticket # 48 hdu._extver = e hdul.append(hdu) numwdvarr = countext(fname, 'WCSDVARR') numd2im = countext(fname, 'D2IMARR') for w in range(1, numwdvarr+1): hdu = fobj[('WCSDVARR',w)].copy() # temporary fix for pyfits ticket # 48 hdu._extver = w hdul.append(hdu) for d in range(1, numd2im+1): hdu = fobj[('D2IMARR',d)].copy() # temporary fix for pyfits ticket # 48 hdu._extver = d hdul.append(hdu) if output is not None: # write the headerlet to a file if not output.endswith('_hdr.fits'): output = output+'_hdr.fits' hdul.writeto(output,clobber=True) fobj.close() return Headerlet(hdul) def applyHeaderlet(hdrfile, destfile, destim=None, hdrname=None, createheaderlet=True): """ Apply headerlet 'hdrfile' to a science observation 'destfile' Parameters ---------- hdrfile: string Headerlet file destfile: string File name of science observation wyhose WCS solution willbe updated destim: string or None (default) ROOTNAME of destfile (default) This string will be written as the DESTIM keyword in the headerlet, created from the old WCS solution Normally it should be None, unless the science file is missing the ROOTNAME kw, in which case the name of the file (stripped of .fits) should be specified. hdrname: string or None (default) will be the value of the HDRNAME keyword in the headerlet. It's not required if createheaderlet is False createheaderlet: boolean True (default): before updating, create a headerlet with the WCS old solution. """ hlet = Headerlet(hdrfile) hlet.apply2obs(destfile, destim=destim, hdrname=hdrname, createheaderlet=createheaderlet) def updateRefFiles(source, dest): """ Update the reference files name in the primary header of 'dest' using values from 'source' Parameters ---------- source: pyfits.Header.ascardlist dest: pyfits.Header.ascardlist """ phdukw = {'IDCTAB': True, 'NPOLFILE': True, 'D2IMFILE': True} try: wind = dest.index_of('HISTORY') except KeyError: wind = len(dest) for key in phdukw.keys(): try: value = source[key] dest.insert(wind, value) except KeyError: phdukw[key] = False def getRootname(fname): """ returns the value of ROOTNAME or DESTIM """ try: rootname = pyfits.getval(fname, 'ROOTNAME') except KeyError: rootname = pyfits.getval(fname, 'DESTIM') return rootname def mapFitsExt2HDUListInd(fname, extname): """ Map FITS extensions with 'EXTNAME' to HDUList indexes. """ if isinstance(fname, str): f = pyfits.open(fname) else: f = fname d = {} for hdu in f: if hdu.header.has_key('EXTNAME') and hdu.header['EXTNAME'] == extname: extver = hdu.header['EXTVER'] d[(extname, extver)] = f.index_of((extname, extver)) f.close() return d def countext(fname, extname): """ Return the number of extensions with EXTNAME in the file. """ if isinstance(fname, str): f = pyfits.open(fname) else: f = fname numext = 0 for hdu in f: if hdu.header.has_key('EXTNAME') and hdu.header['EXTNAME'] == extname: numext+=1 return numext def _delDestWCS(dest): """ Delete the WCS of a science file """ fobj = pyfits.open(dest, mode='update') numext = len(fobj) for e in range(numext): _removeD2IM(fobj[e]) _removeSIP(fobj[e]) _removeLUT(fobj[e]) _removePrimaryWCS(fobj[e]) _removeIDCCoeffs(fobj[e]) try: del fobj[e].header.ascard['VAFACTOR'] except KeyError: pass _removeAltWCS(fobj, ext=range(numext)) numwdvarr = countext(dest,'WCSDVARR') numd2im = countext(dest,'D2IMARR') for i in range(1, numwdvarr+1): del fobj[('WCSDVARR',i)] for i in range(1, numd2im+1): del fobj[('D2IMARR',i)] fobj.close() def _removeSIP(ext): """ Remove the SIP distortion of a FITS extension """ for prefix in ['A','B', 'AP', 'BP']: try: order = ext.header[prefix+'_ORDER'] del ext.header[prefix+'_ORDER'] except KeyError: continue for i in range(order+1): for j in range(order+1): key = prefix + '_%d_%d' % (i,j) try: del ext.header[key] except KeyError: pass try: del ext.header['IDCTAB'] except KeyError: pass def _removeLUT(ext): """ Remove the Lookup Table distortion of a FITS extension """ try: cpdis = ext.header['CPDIS*'] except KeyError: return try: for c in range(1,len(cpdis)+1): del ext.header['DP%s.*...'%c] del ext.header[cpdis[c-1].key] del ext.header['CPERR*'] del ext.header['NPOLFILE'] del ext.header['NPOLEXT'] except KeyError: pass def _removeD2IM(ext): """ Remove the Detector to Image correction of a FITS extension """ d2imkeys = ['D2IMFILE', 'AXISCORR', 'D2IMEXT', 'D2IMERR'] for k in d2imkeys: try: del ext.header[k] except KeyError: pass def _removeAltWCS(dest, ext): """ Remove Alternate WCSs of a FITS extension. A WCS with wcskey 'O' is never deleted. """ dkeys = altwcs.wcskeys(dest[('SCI',1)].header) for k in dkeys: altwcs.deleteWCS(dest, ext=ext, wcskey=k) def _removePrimaryWCS(ext): """ Remove the primary WCS of a FITS extension """ naxis = ext.header.ascard['NAXIS'].value for key in basic_wcs: for i in range(1,naxis+1): try: del ext.header.ascard[key+str(i)] except KeyError: pass try: del ext.header.ascard['WCSAXES'] except KeyError: pass def _removeIDCCoeffs(ext): """ Remove IDC coefficients of a FITS extension """ coeffs = ['OCX10', 'OCX11', 'OCY10', 'OCY11', 'IDCSCALE'] for k in coeffs: try: del ext.header.ascard[k] except KeyError: pass class Headerlet(HDUList): """ A Headerlet class Ref: http://stsdas.stsci.edu/stsci_python_sphinxdocs/stwcs/headerlet_def.html """ def __init__(self, fname, wcskeys=[]): """ Parameters ---------- fname: string Name of headerlet file wcskeys: python list a list of wcskeys to be included in the headerlet created from the old WCS solution before the science file is updated. If empty: all alternate (if any) WCSs are copied to the headerlet. """ self.wcskeys = wcskeys if isinstance(fname,str): fobj = pyfits.open(fname) elif isinstance(fname, list): fobj = fname else: raise ValueError, "Input must be a file name (string) or a pyfits file object (HDUList)." HDUList.__init__(self, fobj) self.fname = self.filename() self.hdrname = self[0].header["HDRNAME"] self.stwcsver = self[0].header.get("STWCSVER","") self.destim = self[0].header["DESTIM"] self.idctab = self[0].header.get("IDCTAB","") self.npolfile = self[0].header.get("NPOLFILE","") self.d2imfile = self[0].header.get("D2IMFILE","") self.vafactor = self[1].header.get("VAFACTOR", 1) #None instead of 1? self.d2imerr = 0 self.axiscorr = 1 def apply2obs(self,dest, destim=None, hdrname=None, createheaderlet=True): """ apply this headerlet to a file """ self.hverify() if self.verify_dest(dest): if createheaderlet: createHeaderlet(dest, hdrname, destim) _delDestWCS(dest) fobj = pyfits.open(dest, mode='update') updateRefFiles(self[0].header.ascard, fobj[0].header.ascard) numsip = countext(self, 'SIPWCS') for i in range(1,numsip+1): fhdr = fobj[('SCI',i)].header.ascard siphdr = self[('SIPWCS',i)].header.ascard # a minimal attempt to get the position of the WCS keywords group # in the header by looking for the PA_APER kw. # at least make sure the WCS kw are written befir the HISTORY kw # if everything fails, append the kw to the header try: wind = fhdr.index_of('PA_APER') except KeyError: try: wind = fhdr.index_of('HISTORY') except KeyError: wind = len(fhdr) for k in siphdr.keys(): if k not in ['XTENSION', 'BITPIX', 'NAXIS', 'PCOUNT', 'GCOUNT','EXTNAME', 'EXTVER', 'ORIGIN', 'INHERIT', 'DATE', 'IRAF-TLM']: fhdr.insert(wind, siphdr[k]) else: pass #! Always attach these extensions last. Otherwise therr headers may # get updated with the other WCS kw. numwdvar = countext(self,'WCSDVARR') numd2im = countext(self, 'D2IMARR') for e in range(1,numwdvar+1): hdu = self[('WCSDVARR',e)].copy() fobj.append(self[('WCSDVARR',e)].copy()) for e in range(1, numd2im+1): fobj.append(self[('D2IMARR',e)].copy()) fobj.close() else: print "Observation %s cannot be updated with headerlet %s" %(dest, self.hdrname) def hverify(self): self.verify() assert(self[0].header.has_key('DESTIM') and self[0].header['DESTIM']!= " ") assert(self[0].header.has_key('HDRNAME') and self[0].header['HDRNAME']!= " ") assert(self[0].header.has_key('STWCSVER')) def verify_dest(self, dest): """ verifies that the headerlet can be applied to the observation DESTIM in the primary header of the headerlet must match ROOTNAME of the science file (or the name of the destination file) """ try: droot = pyfits.getval(dest, 'ROOTNAME') except KeyError: droot = dest.split('.fits')[0] if droot == self.destim: return True else: return False def tofile(self, fname, destim=None, hdrname=None, clobber=False): if not destim or not hdrname: self.hverify() self.writeto(fname, clobber=clobber)