From 2cf8cec79a5ffce3b3ba5b83b77338d2762020a5 Mon Sep 17 00:00:00 2001 From: dencheva Date: Fri, 5 Aug 2011 19:26:06 +0000 Subject: Almost a complete rewrite of altwcs, the goals being: - make all public functions work with HDUList objects in memory (in addition to files) - make all public functions work with simple fits files - remove all HST specific assumptions (like extname values) git-svn-id: http://svn.stsci.edu/svn/ssb/stsci_python/stsci_python/trunk/stwcs@13513 fe389314-cf27-0410-b35b-8c050e845b92 --- lib/stwcs/wcsutil/altwcs.py | 365 ++++++++++++++++++++++++++++---------------- 1 file changed, 231 insertions(+), 134 deletions(-) (limited to 'lib/stwcs/wcsutil/altwcs.py') diff --git a/lib/stwcs/wcsutil/altwcs.py b/lib/stwcs/wcsutil/altwcs.py index d250b52..063df18 100644 --- a/lib/stwcs/wcsutil/altwcs.py +++ b/lib/stwcs/wcsutil/altwcs.py @@ -5,14 +5,15 @@ import string import numpy as np import pywcs import pyfits +from stsci.tools import fileutil as fu altwcskw = ['WCSAXES', 'CRVAL', 'CRPIX', 'PC', 'CDELT', 'CD', 'CTYPE', 'CUNIT', 'PV', 'PS'] # file operations -def archiveWCS(fname, ext, wcskey=" ", wcsname=" ", clobber=False): +def archiveWCS(fname, ext=None, wcskey=" ", wcsname=" ", reusekey=False): """ - Copy the primary WCS to the hader as an alternate WCS + Copy the primary WCS to the header as an alternate WCS with wcskey and name WCSNAME. It loops over all extensions in 'ext' Parameters @@ -21,14 +22,28 @@ def archiveWCS(fname, ext, wcskey=" ", wcsname=" ", clobber=False): a file name or a file object ext: an int, a tuple, a python list of integers or a python list of tuples (e.g.('sci',1)) fits extensions to work with + if None, work with a list of all extensions wcskey: string "A"-"Z" or " " if " ": get next available key if wcsname is also " " or try to get a key from WCSNAME value wcsname: string Name of alternate WCS description - clobber: boolean - if Ture - overwrites a WCS with the same key + reusekey: boolean + if True - overwrites a WCS with the same key + + Examples + -------- + Copy the primary WCS of an in memory headrlet object to an + alternate WCS with key 'T' + + >>> hlet=headerlet.createHeaderlet('junk.fits', 'hdr1.fits') + >>> altwcs.wcskeys(hlet[1].header) + ['A'] + >>> altwcs.archiveWCS(hlet, ext=[('SIPWCS',1),('SIPWCS',2)], wcskey='T') + >>> altwcs.wcskeys(hlet[1].header) + ['A', 'T'] + See Also -------- wcsutils.restoreWCS: Copy an alternate WCS to the primary WCS @@ -40,21 +55,16 @@ def archiveWCS(fname, ext, wcskey=" ", wcsname=" ", clobber=False): else: f = fname - if not parpasscheck(f, ext, wcskey, wcsname): + if not _parpasscheck(f, ext, wcskey, wcsname): closefobj(fname, f) return - if isinstance(ext, int) or isinstance(ext, tuple): + if not ext: + ext = range(len(f)) + elif isinstance(ext, int) or isinstance(ext, tuple): ext = [ext] - wcsext = 0 - eindx = 0 - for e in f: - if 'extname' in e.header and 'sci' in e.header['extname'].lower(): - wcsext = eindx - break - eindx += 1 - + wcsext = ext[0] if wcskey == " ": # try getting the key from WCSNAME if not wcsname.strip(): @@ -76,7 +86,12 @@ def archiveWCS(fname, ext, wcskey=" ", wcsname=" ", clobber=False): wname = wcsname for e in ext: - w = pywcs.WCS(f[e].header, fobj=f) + try: + w = pywcs.WCS(f[e].header, fobj=f) + except: + # this should be revisited, should catchspecific errors + # KeyError - not a valid key, ValueError - not a valid extension, etc + continue hwcs = w.to_header() wcsnamekey = 'WCSNAME' + wkey f[e].header.update(key=wcsnamekey, value=wcsname) @@ -90,15 +105,17 @@ def archiveWCS(fname, ext, wcskey=" ", wcsname=" ", clobber=False): #f[e].header.update(key=okey, value=norient) closefobj(fname, f) -def restoreWCS(f, ext, wcskey=" ", wcsname=" ", clobber=False): +def restoreWCS(f, ext=None, fromext=None, toext=None, wcskey=" ", wcsname=" ", + clobber=False): """ - Copy a WCS with key "WCSKEY" to a primary WCS + Copy a WCS with key "WCSKEY" to the primary WCS Reads in a WCS defined with wcskey and saves it as the primary WCS. - If clobber is False, writes out new files whose names are the original - names with an attached 3 character string _'WCSKEY'_. - Otherwise overwrites the files. Goes sequentially through the list of extensions - The WCS is restored from the 'SCI' extension but the primary WCS of all + If clobber is False, writes out a new file whose name is the original + name with an attached 3 character string _'WCSKEY'_. + Otherwise overwrites the file. Goes sequentially through the list + of extensions. + The WCS is restored from the 'SCI' extension but the WCS of all extensions with the same EXTVER are updated. @@ -106,6 +123,11 @@ def restoreWCS(f, ext, wcskey=" ", wcsname=" ", clobber=False): ---------- f: string or pyfits.HDUList object a file name or a file object + fromext: string + extname from which to read in the alternate WCS, for example 'SCI' + toext: string or python list + extname or a list of extnames to which the WCS will be copied as + primary, for example ['SCI', 'ERR', 'DQ'] ext: an int, a tuple, a python list of integers or a python list of tuples (e.g.('sci',1)) fits extensions to work with @@ -114,7 +136,7 @@ def restoreWCS(f, ext, wcskey=" ", wcsname=" ", clobber=False): or " " - find a key from WCSNAMe value wcsname: string (optional) if given and wcskey is " ", will try to restore by WCSNAME value - clobber: boolean + clobber: boolean, default False A flag to define if the original files should be overwritten See Also @@ -130,80 +152,72 @@ def restoreWCS(f, ext, wcskey=" ", wcsname=" ", clobber=False): else: fobj = f - if not parpasscheck(fobj, ext, wcskey, wcsname): + if not _parpasscheck(fobj, ext=ext, wcskey=wcskey, fromext=fromext, toext=toext, clobber=clobber): closefobj(f, fobj) return if isinstance(ext, int) or isinstance(ext, tuple): ext = [ext] - - if not clobber: - name = (fobj.filename().split('.fits')[0] + '_%s_' + '.fits') %wcskey + #if isinstance(fromext, str): + # fromext = [fromext] + if isinstance(toext, str): + toext = [toext] + + # the case of an HDUList object in memory without an associated file + if fobj.filename() is not None: + if not clobber: + name = (fobj.filename().split('.fits')[0] + '_%s_' + '.fits') %wcskey + else: + name = fobj.filename() + + simplefits = fu.isFits(fobj)[1] is 'simple' + if simplefits: + wcskeyext = 0 else: - name = fobj.filename() - + wcskeyext = 1 + if wcskey == " ": if wcsname.strip(): - wkey = getKeyFromName(fobj[1].header, wcsname) + wkey = getKeyFromName(fobj[wcskeyext].header, wcsname) if not wkey: print 'Could not get a key from wcsname %s .' % wcsname closefobj(f, fobj) return else: - if wcskey not in wcskeys(fobj[1].header): + if wcskey not in wcskeys(fobj, ext=wcskeyext): print "Could not find alternate WCS with key %s in this file" % wcskey closefobj(f, fobj) return wkey = wcskey - - for e in ext: - try: - extname = fobj[e].header['EXTNAME'].lower() - except KeyError: - continue - #Restore always from a 'SCI' extension but write it out to 'ERR' and 'DQ' - if extname == 'sci': - sciver = fobj[e].header['extver'] - try: - nwcs = pywcs.WCS(fobj[e].header, fobj=fobj, key=wkey) - except KeyError: - print 'restoreWCS: Could not read WCS with key %s in file %s, \ - extension %d' % (wkey, fobj.filename(), e) - closefobj(f, fobj) - return #raise - hwcs = nwcs.to_header() - - if nwcs.wcs.has_cd(): - pc2cd(hwcs, key=wkey) - for k in hwcs.keys(): - key = k[:-1] - if key in fobj[e].header.keys(): - fobj[e].header.update(key=key, value = hwcs[k]) - else: - continue - if wcskey == 'O' and fobj[e].header.has_key('TDDALPHA'): - fobj[e].header['TDDALPHA'] = 0.0 - fobj[e].header['TDDBETA'] = 0.0 - if fobj[e].header.has_key('ORIENTAT'): - norient = np.rad2deg(np.arctan2(hwcs['CD1_2'+'%s' %wkey],hwcs['CD2_2'+'%s' %wkey])) - fobj[e].header.update(key='ORIENTAT', value=norient) - elif extname in ['err', 'dq', 'sdq', 'time', 'samp']: - cextver = fobj[e].header['extver'] - if cextver == sciver: - for k in hwcs.keys(): - key = k[:-1] - fobj[e].header.update(key=key, value = hwcs[k]) - if fobj[e].header.has_key('ORIENTAT'): - norient = np.rad2deg(np.arctan2(hwcs['CD1_2'+'%s' %wkey],hwcs['CD2_2'+'%s' %wkey])) - fobj[e].header.update(key='ORIENTAT', value=norient) + + if ext: + # if ext is given ignore fromext and toext. + for e in ext: + _restore(fobj, wkey, fromextnum=e) + else: + # the case when ext is None but fromext and toext are given + # This only works with MEF files. It uses extname, extver. + # For each extname in fromext find the corresponding extver for each + # extname in toext and update the primary WCS. + if fromext and toext: + countext = fu.countExtn(fobj, fromext) + if not countext: + raise KeyError("File does not have extension with extname %s", fromext) + else: + for i in range(1, countext+1): + for toe in toext: + _restore(fobj, fromextnum=i, fromextnam=fromext, toextnum=i, toextnam=toe, ukey=wkey) else: - continue + ext = range(len(fobj)) + for e in ext: + _restore(fobj, wkey, fromextnum=e) - if not clobber: - fobj.writeto(name) - closefobj(f, fobj) + if fobj.filename() is not None: + if not clobber: + fobj.writeto(name) + closefobj(f, fobj) -def deleteWCS(fname, ext, wcskey=" ", wcsname=" "): +def deleteWCS(fname, ext=None, wcskey=" ", wcsname=" "): """ Delete an alternate WCS defined with wcskey. If wcskey is " " try to get a key from WCSNAME. @@ -222,11 +236,13 @@ def deleteWCS(fname, ext, wcskey=" ", wcsname=" "): else: fobj = fname - if not parpasscheck(fobj, ext, wcskey, wcsname): + if not _parpasscheck(fobj, ext, wcskey, wcsname): closefobj(fname, fobj) return - if isinstance(ext, int) or isinstance(ext, tuple): + if not ext: + ext = range(len(fobj)) + elif isinstance(ext, int) or isinstance(ext, tuple): ext = [ext] # Do not allow deleting the original WCS. @@ -235,6 +251,11 @@ def deleteWCS(fname, ext, wcskey=" ", wcsname=" "): closefobj(fname, fobj) return + simplefits = fu.isFits(fobj)[1] is 'simple' + if simplefits: + wcskeyext = 0 + else: + wcskeyext = 1 if wcskey == " ": # try getting the key from WCSNAME if wcsname == " ": @@ -242,13 +263,13 @@ def deleteWCS(fname, ext, wcskey=" ", wcsname=" "): closefobj(fname, fobj) return else: - wkey = getKeyFromName(fobj[1].header, wcsname) + wkey = getKeyFromName(fobj[wcskeyext].header, wcsname) if not wkey: print 'Could not get a key: wcsname "%s" not found in header.' % wcsname closefobj(fname, fobj) return else: - if wcskey not in wcskeys(fobj[1].header): + if wcskey not in wcskeys(fobj[wcskeyext].header): print "Could not find alternate WCS with key %s in this file" % wcskey closefobj(fname, fobj) return @@ -274,8 +295,63 @@ def deleteWCS(fname, ext, wcskey=" ", wcsname=" "): print "Did not find WCS with key %s in any of the extensions" % wkey closefobj(fname, fobj) +def _restore(fobj, ukey, fromextnum, toextnum=None, fromextnam=None, toextnam=None): + if fromextnam: + fromextension = (fromextnam, fromextnum) + else: + fromextension = fromextnum + if toextnum: + if toextnam: + toextension = (toextnam, toextnum) + else: + toextension =toextnum + else: + toextension = fromextension + + try: + nwcs = pywcs.WCS(fobj[fromextension].header, fobj=fobj, key=ukey) + except KeyError: + print 'restoreWCS: Could not read WCS with key %s' %ukey + print ' Skipping %s[%s]' % (fobj.filename(), str(fromextension)) + return + hwcs = nwcs.to_header() + + if nwcs.wcs.has_cd(): + pc2cd(hwcs, key=ukey) + for k in hwcs.keys(): + key = k[:-1] + if key in fobj[toextension].header.keys(): + fobj[toextension].header.update(key=key, value = hwcs[k]) + else: + continue + if key == 'O' and fobj[toextension].header.has_key('TDDALPHA'): + fobj[toextension].header['TDDALPHA'] = 0.0 + fobj[toeztension].header['TDDBETA'] = 0.0 + if fobj[toextension].header.has_key('ORIENTAT'): + norient = np.rad2deg(np.arctan2(hwcs['CD1_2'+'%s' %ukey], hwcs['CD2_2'+'%s' %ukey])) + fobj[toextension].header.update(key='ORIENTAT', value=norient) + #header operations -def wcskeys(header): +def _check_headerpars(fobj, ext): + if not isinstance(fobj, pyfits.Header) and not isinstance(fobj, pyfits.HDUList) \ + and not isinstance(fobj, str): + raise ValueError("Expected a file name, a file object or a header\n") + + if not isinstance(fobj, pyfits.Header): + #raise ValueError("Expected a valid ext parameter when input is a file") + if not isinstance(ext, int) and not isinstance(ext, tuple): + raise ValueError("Expected ext to be a number or a tuple, e.g. ('SCI', 1)\n") + +def _getheader(fobj, ext): + if isinstance(fobj, str): + hdr = pyfits.getheader(fobj,ext) + elif isinstance(fobj, pyfits.Header): + hdr = fobj + else: + hdr = fobj[ext].header + return hdr + +def wcskeys(fobj, ext=None): """ Returns a list of characters used in the header for alternate WCS description with WCSNAME keyword @@ -284,11 +360,12 @@ def wcskeys(header): ---------- hdr: pyfits.Header """ - assert isinstance(header, pyfits.Header), "Requires a pyfits.Header object as input" - names = header["WCSNAME*"] + _check_headerpars(fobj, ext) + hdr = _getheader(fobj, ext) + names = hdr["WCSNAME*"] return [key.split('WCSNAME')[1].upper() for key in names.keys()] -def wcsnames(header): +def wcsnames(fobj, ext=None): """ Returns a dictionary of wcskey: WCSNAME pairs @@ -296,13 +373,15 @@ def wcsnames(header): ---------- header: pyfits.Header """ - names = header["WCSNAME*"] + _check_headerpars(fobj, ext) + hdr = _getheader(fobj, ext) + names = hdr["WCSNAME*"] d = {} for c in names: d[c.key[-1]] = c.value return d -def available_wcskeys(header): +def available_wcskeys(fobj, ext=None): """ Returns a list of characters which are not used in the header with WCSNAME keyword. Any of them can be used to save a new @@ -312,9 +391,10 @@ def available_wcskeys(header): ---------- header: pyfits.Header """ - assert isinstance(header, pyfits.Header), "Requires a pyfits.Header object as input" + _check_headerpars(fobj, ext) + hdr = _getheader(fobj, ext) all_keys = list(string.ascii_uppercase) - used_keys = wcskeys(header) + used_keys = wcskeys(hdr) try: used_keys.remove("") except ValueError: @@ -322,7 +402,7 @@ def available_wcskeys(header): [all_keys.remove(key) for key in used_keys] return all_keys -def next_wcskey(header): +def next_wcskey(fobj, ext=None): """ Returns next available character to be used for an alternate WCS @@ -330,8 +410,9 @@ def next_wcskey(header): ---------- header: pyfits.Header """ - assert isinstance(header, pyfits.Header), "Requires a pyfits.Header object as input" - allkeys = available_wcskeys(header) + _check_headerpars(fobj, ext) + hdr = _getheader(fobj, hdr) + allkeys = available_wcskeys(hdr) if allkeys != []: return allkeys[0] else: @@ -384,61 +465,57 @@ def pc2cd(hdr, key=' '): hdr.update(key='CD'+c+'%s' %key, value=val) return hdr -def parpasscheck(fobj, ext, wcskey, wcsname, clobber=True): - +def _parpasscheck(fobj, ext, wcskey, fromext=None, toext=None, reusekey=False, clobber=False): + """ + Check input parameters to altwcs functions + + fobj: string or pyfits.HDUList object + a file name or a file object + ext: an int, a tuple, a python list of integers or a python list + of tuples (e.g.('sci',1)) + fits extensions to work with + wcskey: a charater + "A"-"Z" or " "- Used for one of 26 alternate WCS definitions + wcsname: string (optional) + if given and wcskey is " ", will try to restore by WCSNAME value + reusekey: boolean + A flag which indicates whether to reuse a wcskey in the header + clobber: boolean + A flag to define if the original files should be overwritten + """ if not isinstance(fobj,pyfits.HDUList): print "First parameter must be a fits file object or a file name." return False - try: - assert (fobj.fileinfo(0)['filemode'] == 'update') - except AssertionError: - print "First parameter must be a file name or a file object opened in 'update' mode." - return False + + # first one covers the case of an object created in memory + # (e.g. headerlet) for which fileinfo returns None + if fobj.fileinfo(0) is None: + pass + else: + # an HDUList object with associated file + if fobj.fileinfo(0)['filemode'] is not 'update' and clobber: + print "First parameter must be a file name or a file object opened in 'update' mode." + return False if not isinstance(ext, int) and not isinstance(ext, tuple) \ - and not isinstance(ext, list): + and not isinstance(ext, list) and ext is not None: print "Ext must be integer, tuple, a list of int extension numbers, \ or a list of tuples representing a fits extension, for example ('sci', 1)." return False + + if not isinstance(fromext, str) and fromext is not None: + print "fromext must be a string representing a valid extname" + return False + if not isinstance(toext, list) and not isinstance(toext, str) and \ + toext is not None : + print "toext must be a string or a list of strings representing extname" + return False + if len(wcskey) != 1: print 'Parameter wcskey must be a character - one of "A"-"Z" or " "' return False - - wcsext = 0 - eindx = 0 - for e in fobj: - if 'extname' in e.header and 'sci' in e.header['extname'].lower(): - wcsext = eindx - break - eindx += 1 - - - if wcskey == " ": - # try getting the key from WCSNAME - """ - if wcsname == " " or wcsname == "": - #wkey = next_wcskey(f[1].header) - #if not wkey: - # print "Could not get a valid key from header" - return False - """ - if wcsname.strip(): - wkey = getKeyFromName(fobj[wcsext].header, wcsname) - if wkey and not clobber: - print 'Wcsname %s is already used.' % wcsname - print 'Use "wcsutil.next_wcskey" to obtain a valid wcskey' - print 'or use "clobber=True" to overwrite the values.' - return False - else: - if wcskey not in available_wcskeys(fobj[wcsext].header): - if clobber==False: - print 'Wcskey %s is already used.' % wcskey - print 'Use "wcsutil.next_wcskey" to obtain a valid wcskey' - print 'or use "clobber=True" to overwrite the values.' - return False - - + return True def closefobj(fname, f): @@ -449,3 +526,23 @@ def closefobj(fname, f): """ if isinstance(fname, str): f.close() + +def mapFitsExt2HDUListInd(fname, extname): + """ + Map FITS extensions with 'EXTNAME' to HDUList indexes. + """ + + if not isinstance(fname, pyfits.HDUList): + f = pyfits.open(fname) + close_file = True + else: + f = fname + close_file = False + d = {} + for hdu in f: + if 'EXTNAME' in hdu.header and hdu.header['EXTNAME'] == extname: + extver = hdu.header['EXTVER'] + d[(extname, extver)] = f.index_of((extname, extver)) + if close_file: + f.close() + return d -- cgit