summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordencheva <dencheva@stsci.edu>2011-08-05 15:26:06 -0400
committerdencheva <dencheva@stsci.edu>2011-08-05 15:26:06 -0400
commit2cf8cec79a5ffce3b3ba5b83b77338d2762020a5 (patch)
tree9109d3c23801b5f39f851796b2852ada7d5bcc29
parentf88d142626eba126ab8fee38f47794d20f804c13 (diff)
downloadstwcs_hcf-2cf8cec79a5ffce3b3ba5b83b77338d2762020a5.tar.gz
Almost a complete rewrite of altwcs, the goals being:
- make all public functions work with HDUList objects in memory (in addition to files) - make all public functions work with simple fits files - remove all HST specific assumptions (like extname values) git-svn-id: http://svn.stsci.edu/svn/ssb/stsci_python/stsci_python/trunk/stwcs@13513 fe389314-cf27-0410-b35b-8c050e845b92
-rw-r--r--lib/stwcs/wcsutil/altwcs.py365
1 files changed, 231 insertions, 134 deletions
diff --git a/lib/stwcs/wcsutil/altwcs.py b/lib/stwcs/wcsutil/altwcs.py
index d250b52..063df18 100644
--- a/lib/stwcs/wcsutil/altwcs.py
+++ b/lib/stwcs/wcsutil/altwcs.py
@@ -5,14 +5,15 @@ import string
import numpy as np
import pywcs
import pyfits
+from stsci.tools import fileutil as fu
altwcskw = ['WCSAXES', 'CRVAL', 'CRPIX', 'PC', 'CDELT', 'CD', 'CTYPE', 'CUNIT',
'PV', 'PS']
# file operations
-def archiveWCS(fname, ext, wcskey=" ", wcsname=" ", clobber=False):
+def archiveWCS(fname, ext=None, wcskey=" ", wcsname=" ", reusekey=False):
"""
- Copy the primary WCS to the hader as an alternate WCS
+ Copy the primary WCS to the header as an alternate WCS
with wcskey and name WCSNAME. It loops over all extensions in 'ext'
Parameters
@@ -21,14 +22,28 @@ def archiveWCS(fname, ext, wcskey=" ", wcsname=" ", clobber=False):
a file name or a file object
ext: an int, a tuple, a python list of integers or a python list of tuples (e.g.('sci',1))
fits extensions to work with
+ if None, work with a list of all extensions
wcskey: string "A"-"Z" or " "
if " ": get next available key if wcsname is also " " or try
to get a key from WCSNAME value
wcsname: string
Name of alternate WCS description
- clobber: boolean
- if Ture - overwrites a WCS with the same key
+ reusekey: boolean
+ if True - overwrites a WCS with the same key
+
+ Examples
+ --------
+ Copy the primary WCS of an in memory headrlet object to an
+ alternate WCS with key 'T'
+
+ >>> hlet=headerlet.createHeaderlet('junk.fits', 'hdr1.fits')
+ >>> altwcs.wcskeys(hlet[1].header)
+ ['A']
+ >>> altwcs.archiveWCS(hlet, ext=[('SIPWCS',1),('SIPWCS',2)], wcskey='T')
+ >>> altwcs.wcskeys(hlet[1].header)
+ ['A', 'T']
+
See Also
--------
wcsutils.restoreWCS: Copy an alternate WCS to the primary WCS
@@ -40,21 +55,16 @@ def archiveWCS(fname, ext, wcskey=" ", wcsname=" ", clobber=False):
else:
f = fname
- if not parpasscheck(f, ext, wcskey, wcsname):
+ if not _parpasscheck(f, ext, wcskey, wcsname):
closefobj(fname, f)
return
- if isinstance(ext, int) or isinstance(ext, tuple):
+ if not ext:
+ ext = range(len(f))
+ elif isinstance(ext, int) or isinstance(ext, tuple):
ext = [ext]
- wcsext = 0
- eindx = 0
- for e in f:
- if 'extname' in e.header and 'sci' in e.header['extname'].lower():
- wcsext = eindx
- break
- eindx += 1
-
+ wcsext = ext[0]
if wcskey == " ":
# try getting the key from WCSNAME
if not wcsname.strip():
@@ -76,7 +86,12 @@ def archiveWCS(fname, ext, wcskey=" ", wcsname=" ", clobber=False):
wname = wcsname
for e in ext:
- w = pywcs.WCS(f[e].header, fobj=f)
+ try:
+ w = pywcs.WCS(f[e].header, fobj=f)
+ except:
+ # this should be revisited, should catchspecific errors
+ # KeyError - not a valid key, ValueError - not a valid extension, etc
+ continue
hwcs = w.to_header()
wcsnamekey = 'WCSNAME' + wkey
f[e].header.update(key=wcsnamekey, value=wcsname)
@@ -90,15 +105,17 @@ def archiveWCS(fname, ext, wcskey=" ", wcsname=" ", clobber=False):
#f[e].header.update(key=okey, value=norient)
closefobj(fname, f)
-def restoreWCS(f, ext, wcskey=" ", wcsname=" ", clobber=False):
+def restoreWCS(f, ext=None, fromext=None, toext=None, wcskey=" ", wcsname=" ",
+ clobber=False):
"""
- Copy a WCS with key "WCSKEY" to a primary WCS
+ Copy a WCS with key "WCSKEY" to the primary WCS
Reads in a WCS defined with wcskey and saves it as the primary WCS.
- If clobber is False, writes out new files whose names are the original
- names with an attached 3 character string _'WCSKEY'_.
- Otherwise overwrites the files. Goes sequentially through the list of extensions
- The WCS is restored from the 'SCI' extension but the primary WCS of all
+ If clobber is False, writes out a new file whose name is the original
+ name with an attached 3 character string _'WCSKEY'_.
+ Otherwise overwrites the file. Goes sequentially through the list
+ of extensions.
+ The WCS is restored from the 'SCI' extension but the WCS of all
extensions with the same EXTVER are updated.
@@ -106,6 +123,11 @@ def restoreWCS(f, ext, wcskey=" ", wcsname=" ", clobber=False):
----------
f: string or pyfits.HDUList object
a file name or a file object
+ fromext: string
+ extname from which to read in the alternate WCS, for example 'SCI'
+ toext: string or python list
+ extname or a list of extnames to which the WCS will be copied as
+ primary, for example ['SCI', 'ERR', 'DQ']
ext: an int, a tuple, a python list of integers or a python list
of tuples (e.g.('sci',1))
fits extensions to work with
@@ -114,7 +136,7 @@ def restoreWCS(f, ext, wcskey=" ", wcsname=" ", clobber=False):
or " " - find a key from WCSNAMe value
wcsname: string (optional)
if given and wcskey is " ", will try to restore by WCSNAME value
- clobber: boolean
+ clobber: boolean, default False
A flag to define if the original files should be overwritten
See Also
@@ -130,80 +152,72 @@ def restoreWCS(f, ext, wcskey=" ", wcsname=" ", clobber=False):
else:
fobj = f
- if not parpasscheck(fobj, ext, wcskey, wcsname):
+ if not _parpasscheck(fobj, ext=ext, wcskey=wcskey, fromext=fromext, toext=toext, clobber=clobber):
closefobj(f, fobj)
return
if isinstance(ext, int) or isinstance(ext, tuple):
ext = [ext]
-
- if not clobber:
- name = (fobj.filename().split('.fits')[0] + '_%s_' + '.fits') %wcskey
+ #if isinstance(fromext, str):
+ # fromext = [fromext]
+ if isinstance(toext, str):
+ toext = [toext]
+
+ # the case of an HDUList object in memory without an associated file
+ if fobj.filename() is not None:
+ if not clobber:
+ name = (fobj.filename().split('.fits')[0] + '_%s_' + '.fits') %wcskey
+ else:
+ name = fobj.filename()
+
+ simplefits = fu.isFits(fobj)[1] is 'simple'
+ if simplefits:
+ wcskeyext = 0
else:
- name = fobj.filename()
-
+ wcskeyext = 1
+
if wcskey == " ":
if wcsname.strip():
- wkey = getKeyFromName(fobj[1].header, wcsname)
+ wkey = getKeyFromName(fobj[wcskeyext].header, wcsname)
if not wkey:
print 'Could not get a key from wcsname %s .' % wcsname
closefobj(f, fobj)
return
else:
- if wcskey not in wcskeys(fobj[1].header):
+ if wcskey not in wcskeys(fobj, ext=wcskeyext):
print "Could not find alternate WCS with key %s in this file" % wcskey
closefobj(f, fobj)
return
wkey = wcskey
-
- for e in ext:
- try:
- extname = fobj[e].header['EXTNAME'].lower()
- except KeyError:
- continue
- #Restore always from a 'SCI' extension but write it out to 'ERR' and 'DQ'
- if extname == 'sci':
- sciver = fobj[e].header['extver']
- try:
- nwcs = pywcs.WCS(fobj[e].header, fobj=fobj, key=wkey)
- except KeyError:
- print 'restoreWCS: Could not read WCS with key %s in file %s, \
- extension %d' % (wkey, fobj.filename(), e)
- closefobj(f, fobj)
- return #raise
- hwcs = nwcs.to_header()
-
- if nwcs.wcs.has_cd():
- pc2cd(hwcs, key=wkey)
- for k in hwcs.keys():
- key = k[:-1]
- if key in fobj[e].header.keys():
- fobj[e].header.update(key=key, value = hwcs[k])
- else:
- continue
- if wcskey == 'O' and fobj[e].header.has_key('TDDALPHA'):
- fobj[e].header['TDDALPHA'] = 0.0
- fobj[e].header['TDDBETA'] = 0.0
- if fobj[e].header.has_key('ORIENTAT'):
- norient = np.rad2deg(np.arctan2(hwcs['CD1_2'+'%s' %wkey],hwcs['CD2_2'+'%s' %wkey]))
- fobj[e].header.update(key='ORIENTAT', value=norient)
- elif extname in ['err', 'dq', 'sdq', 'time', 'samp']:
- cextver = fobj[e].header['extver']
- if cextver == sciver:
- for k in hwcs.keys():
- key = k[:-1]
- fobj[e].header.update(key=key, value = hwcs[k])
- if fobj[e].header.has_key('ORIENTAT'):
- norient = np.rad2deg(np.arctan2(hwcs['CD1_2'+'%s' %wkey],hwcs['CD2_2'+'%s' %wkey]))
- fobj[e].header.update(key='ORIENTAT', value=norient)
+
+ if ext:
+ # if ext is given ignore fromext and toext.
+ for e in ext:
+ _restore(fobj, wkey, fromextnum=e)
+ else:
+ # the case when ext is None but fromext and toext are given
+ # This only works with MEF files. It uses extname, extver.
+ # For each extname in fromext find the corresponding extver for each
+ # extname in toext and update the primary WCS.
+ if fromext and toext:
+ countext = fu.countExtn(fobj, fromext)
+ if not countext:
+ raise KeyError("File does not have extension with extname %s", fromext)
+ else:
+ for i in range(1, countext+1):
+ for toe in toext:
+ _restore(fobj, fromextnum=i, fromextnam=fromext, toextnum=i, toextnam=toe, ukey=wkey)
else:
- continue
+ ext = range(len(fobj))
+ for e in ext:
+ _restore(fobj, wkey, fromextnum=e)
- if not clobber:
- fobj.writeto(name)
- closefobj(f, fobj)
+ if fobj.filename() is not None:
+ if not clobber:
+ fobj.writeto(name)
+ closefobj(f, fobj)
-def deleteWCS(fname, ext, wcskey=" ", wcsname=" "):
+def deleteWCS(fname, ext=None, wcskey=" ", wcsname=" "):
"""
Delete an alternate WCS defined with wcskey.
If wcskey is " " try to get a key from WCSNAME.
@@ -222,11 +236,13 @@ def deleteWCS(fname, ext, wcskey=" ", wcsname=" "):
else:
fobj = fname
- if not parpasscheck(fobj, ext, wcskey, wcsname):
+ if not _parpasscheck(fobj, ext, wcskey, wcsname):
closefobj(fname, fobj)
return
- if isinstance(ext, int) or isinstance(ext, tuple):
+ if not ext:
+ ext = range(len(fobj))
+ elif isinstance(ext, int) or isinstance(ext, tuple):
ext = [ext]
# Do not allow deleting the original WCS.
@@ -235,6 +251,11 @@ def deleteWCS(fname, ext, wcskey=" ", wcsname=" "):
closefobj(fname, fobj)
return
+ simplefits = fu.isFits(fobj)[1] is 'simple'
+ if simplefits:
+ wcskeyext = 0
+ else:
+ wcskeyext = 1
if wcskey == " ":
# try getting the key from WCSNAME
if wcsname == " ":
@@ -242,13 +263,13 @@ def deleteWCS(fname, ext, wcskey=" ", wcsname=" "):
closefobj(fname, fobj)
return
else:
- wkey = getKeyFromName(fobj[1].header, wcsname)
+ wkey = getKeyFromName(fobj[wcskeyext].header, wcsname)
if not wkey:
print 'Could not get a key: wcsname "%s" not found in header.' % wcsname
closefobj(fname, fobj)
return
else:
- if wcskey not in wcskeys(fobj[1].header):
+ if wcskey not in wcskeys(fobj[wcskeyext].header):
print "Could not find alternate WCS with key %s in this file" % wcskey
closefobj(fname, fobj)
return
@@ -274,8 +295,63 @@ def deleteWCS(fname, ext, wcskey=" ", wcsname=" "):
print "Did not find WCS with key %s in any of the extensions" % wkey
closefobj(fname, fobj)
+def _restore(fobj, ukey, fromextnum, toextnum=None, fromextnam=None, toextnam=None):
+ if fromextnam:
+ fromextension = (fromextnam, fromextnum)
+ else:
+ fromextension = fromextnum
+ if toextnum:
+ if toextnam:
+ toextension = (toextnam, toextnum)
+ else:
+ toextension =toextnum
+ else:
+ toextension = fromextension
+
+ try:
+ nwcs = pywcs.WCS(fobj[fromextension].header, fobj=fobj, key=ukey)
+ except KeyError:
+ print 'restoreWCS: Could not read WCS with key %s' %ukey
+ print ' Skipping %s[%s]' % (fobj.filename(), str(fromextension))
+ return
+ hwcs = nwcs.to_header()
+
+ if nwcs.wcs.has_cd():
+ pc2cd(hwcs, key=ukey)
+ for k in hwcs.keys():
+ key = k[:-1]
+ if key in fobj[toextension].header.keys():
+ fobj[toextension].header.update(key=key, value = hwcs[k])
+ else:
+ continue
+ if key == 'O' and fobj[toextension].header.has_key('TDDALPHA'):
+ fobj[toextension].header['TDDALPHA'] = 0.0
+ fobj[toeztension].header['TDDBETA'] = 0.0
+ if fobj[toextension].header.has_key('ORIENTAT'):
+ norient = np.rad2deg(np.arctan2(hwcs['CD1_2'+'%s' %ukey], hwcs['CD2_2'+'%s' %ukey]))
+ fobj[toextension].header.update(key='ORIENTAT', value=norient)
+
#header operations
-def wcskeys(header):
+def _check_headerpars(fobj, ext):
+ if not isinstance(fobj, pyfits.Header) and not isinstance(fobj, pyfits.HDUList) \
+ and not isinstance(fobj, str):
+ raise ValueError("Expected a file name, a file object or a header\n")
+
+ if not isinstance(fobj, pyfits.Header):
+ #raise ValueError("Expected a valid ext parameter when input is a file")
+ if not isinstance(ext, int) and not isinstance(ext, tuple):
+ raise ValueError("Expected ext to be a number or a tuple, e.g. ('SCI', 1)\n")
+
+def _getheader(fobj, ext):
+ if isinstance(fobj, str):
+ hdr = pyfits.getheader(fobj,ext)
+ elif isinstance(fobj, pyfits.Header):
+ hdr = fobj
+ else:
+ hdr = fobj[ext].header
+ return hdr
+
+def wcskeys(fobj, ext=None):
"""
Returns a list of characters used in the header for alternate
WCS description with WCSNAME keyword
@@ -284,11 +360,12 @@ def wcskeys(header):
----------
hdr: pyfits.Header
"""
- assert isinstance(header, pyfits.Header), "Requires a pyfits.Header object as input"
- names = header["WCSNAME*"]
+ _check_headerpars(fobj, ext)
+ hdr = _getheader(fobj, ext)
+ names = hdr["WCSNAME*"]
return [key.split('WCSNAME')[1].upper() for key in names.keys()]
-def wcsnames(header):
+def wcsnames(fobj, ext=None):
"""
Returns a dictionary of wcskey: WCSNAME pairs
@@ -296,13 +373,15 @@ def wcsnames(header):
----------
header: pyfits.Header
"""
- names = header["WCSNAME*"]
+ _check_headerpars(fobj, ext)
+ hdr = _getheader(fobj, ext)
+ names = hdr["WCSNAME*"]
d = {}
for c in names:
d[c.key[-1]] = c.value
return d
-def available_wcskeys(header):
+def available_wcskeys(fobj, ext=None):
"""
Returns a list of characters which are not used in the header
with WCSNAME keyword. Any of them can be used to save a new
@@ -312,9 +391,10 @@ def available_wcskeys(header):
----------
header: pyfits.Header
"""
- assert isinstance(header, pyfits.Header), "Requires a pyfits.Header object as input"
+ _check_headerpars(fobj, ext)
+ hdr = _getheader(fobj, ext)
all_keys = list(string.ascii_uppercase)
- used_keys = wcskeys(header)
+ used_keys = wcskeys(hdr)
try:
used_keys.remove("")
except ValueError:
@@ -322,7 +402,7 @@ def available_wcskeys(header):
[all_keys.remove(key) for key in used_keys]
return all_keys
-def next_wcskey(header):
+def next_wcskey(fobj, ext=None):
"""
Returns next available character to be used for an alternate WCS
@@ -330,8 +410,9 @@ def next_wcskey(header):
----------
header: pyfits.Header
"""
- assert isinstance(header, pyfits.Header), "Requires a pyfits.Header object as input"
- allkeys = available_wcskeys(header)
+ _check_headerpars(fobj, ext)
+ hdr = _getheader(fobj, hdr)
+ allkeys = available_wcskeys(hdr)
if allkeys != []:
return allkeys[0]
else:
@@ -384,61 +465,57 @@ def pc2cd(hdr, key=' '):
hdr.update(key='CD'+c+'%s' %key, value=val)
return hdr
-def parpasscheck(fobj, ext, wcskey, wcsname, clobber=True):
-
+def _parpasscheck(fobj, ext, wcskey, fromext=None, toext=None, reusekey=False, clobber=False):
+ """
+ Check input parameters to altwcs functions
+
+ fobj: string or pyfits.HDUList object
+ a file name or a file object
+ ext: an int, a tuple, a python list of integers or a python list
+ of tuples (e.g.('sci',1))
+ fits extensions to work with
+ wcskey: a charater
+ "A"-"Z" or " "- Used for one of 26 alternate WCS definitions
+ wcsname: string (optional)
+ if given and wcskey is " ", will try to restore by WCSNAME value
+ reusekey: boolean
+ A flag which indicates whether to reuse a wcskey in the header
+ clobber: boolean
+ A flag to define if the original files should be overwritten
+ """
if not isinstance(fobj,pyfits.HDUList):
print "First parameter must be a fits file object or a file name."
return False
- try:
- assert (fobj.fileinfo(0)['filemode'] == 'update')
- except AssertionError:
- print "First parameter must be a file name or a file object opened in 'update' mode."
- return False
+
+ # first one covers the case of an object created in memory
+ # (e.g. headerlet) for which fileinfo returns None
+ if fobj.fileinfo(0) is None:
+ pass
+ else:
+ # an HDUList object with associated file
+ if fobj.fileinfo(0)['filemode'] is not 'update' and clobber:
+ print "First parameter must be a file name or a file object opened in 'update' mode."
+ return False
if not isinstance(ext, int) and not isinstance(ext, tuple) \
- and not isinstance(ext, list):
+ and not isinstance(ext, list) and ext is not None:
print "Ext must be integer, tuple, a list of int extension numbers, \
or a list of tuples representing a fits extension, for example ('sci', 1)."
return False
+
+ if not isinstance(fromext, str) and fromext is not None:
+ print "fromext must be a string representing a valid extname"
+ return False
+ if not isinstance(toext, list) and not isinstance(toext, str) and \
+ toext is not None :
+ print "toext must be a string or a list of strings representing extname"
+ return False
+
if len(wcskey) != 1:
print 'Parameter wcskey must be a character - one of "A"-"Z" or " "'
return False
-
- wcsext = 0
- eindx = 0
- for e in fobj:
- if 'extname' in e.header and 'sci' in e.header['extname'].lower():
- wcsext = eindx
- break
- eindx += 1
-
-
- if wcskey == " ":
- # try getting the key from WCSNAME
- """
- if wcsname == " " or wcsname == "":
- #wkey = next_wcskey(f[1].header)
- #if not wkey:
- # print "Could not get a valid key from header"
- return False
- """
- if wcsname.strip():
- wkey = getKeyFromName(fobj[wcsext].header, wcsname)
- if wkey and not clobber:
- print 'Wcsname %s is already used.' % wcsname
- print 'Use "wcsutil.next_wcskey" to obtain a valid wcskey'
- print 'or use "clobber=True" to overwrite the values.'
- return False
- else:
- if wcskey not in available_wcskeys(fobj[wcsext].header):
- if clobber==False:
- print 'Wcskey %s is already used.' % wcskey
- print 'Use "wcsutil.next_wcskey" to obtain a valid wcskey'
- print 'or use "clobber=True" to overwrite the values.'
- return False
-
-
+
return True
def closefobj(fname, f):
@@ -449,3 +526,23 @@ def closefobj(fname, f):
"""
if isinstance(fname, str):
f.close()
+
+def mapFitsExt2HDUListInd(fname, extname):
+ """
+ Map FITS extensions with 'EXTNAME' to HDUList indexes.
+ """
+
+ if not isinstance(fname, pyfits.HDUList):
+ f = pyfits.open(fname)
+ close_file = True
+ else:
+ f = fname
+ close_file = False
+ d = {}
+ for hdu in f:
+ if 'EXTNAME' in hdu.header and hdu.header['EXTNAME'] == extname:
+ extver = hdu.header['EXTVER']
+ d[(extname, extver)] = f.index_of((extname, extver))
+ if close_file:
+ f.close()
+ return d