summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--wcsutil/altwcs.py444
1 files changed, 444 insertions, 0 deletions
diff --git a/wcsutil/altwcs.py b/wcsutil/altwcs.py
new file mode 100644
index 0000000..2c33902
--- /dev/null
+++ b/wcsutil/altwcs.py
@@ -0,0 +1,444 @@
+from __future__ import division # confidence high
+
+import os.path, string
+import pywcs
+import numpy as np
+import pyfits
+
+"""
+ WCS keywords:
+ 'CD1_1', 'CD1_2', 'CD2_1', 'CD2_2',
+ 'CRVAL*',
+ 'CTYPE*',
+ 'CRPIX*',
+ 'CDELT*',
+ 'CUNIT*',
+ 'ORIENTAT'
+ 'TDDALPHA', 'TDDBETA'
+"""
+
+# file operations
+def archiveWCS(fname, wcskey=" ", wcsname=" ", ext=None, clobber=False):
+ """
+ Copy the primary WCS to the hader as an alternate WCS
+ with wcskey and name WCSNAME.
+
+ Parameters
+ ----------
+ fname: string or pyfits.HDUList
+ a file name or a file object
+ wcskey: string "A"-"Z" or " "
+ if " ": get next available key if wcsname is also " " or try
+ to get a key from WCSNAME value
+ wcsname: string
+ Name of alternate WCS description
+ ext: an int, a python list or None
+ if None - it loops over all extensions
+ otherwise works only on the specified extensions
+ clobber: boolean
+ if Ture - overwrites a WCS with the same key
+
+ See Also
+ --------
+ wcsutils.restoreWCS: Copy an alternate WCS to the primary WCS
+
+ """
+ if isinstance(fname, str):
+ f = pyfits.open(fname, mode='update')
+ else:
+ f = fname
+ assert isinstance(f,pyfits.HDUList)
+ try:
+ assert (f.fileinfo(0)['filemode'] == 'update')
+ except AssertionError:
+ print "File must be opened in update mode."
+ f.close()
+ return
+
+ if ext == None: #update all extenstions
+ exts = range(len(f))
+ elif isinstance(ext, int):
+ exts = [ext]
+ else:
+ assert isinstance(ext, list), "Ext must be a list of int extension numbers, \
+ a fits extension number or None (meaning all extensions)"
+ exts = ext
+
+ assert len(wcskey) == 1, 'Parameter wcskey must be a character - one of "A"-"Z" or " "'
+ if wcskey == " ":
+ # try getting the key from WCSNAME
+ if wcsname == " " or wcsname == "":
+ wkey = next_wcskey(f[1].header)
+ if not wkey:
+ print "Could not get a valid key from header"
+ closefobj(fname, f)
+ return
+ else:
+ wkey = getKeyFromName(f[1].header, wcsname)
+ if wkey and not clobber:
+ print 'Wcsname %s is already used.' % wcsname
+ print 'Use "wcsutil.next_wcskey" to obtain a valid wcskey'
+ print 'or use "clobber=True" to overwrite the values.'
+ closefobj(fname, f)
+ return
+ else:
+ if wcskey not in available_wcskeys(f[1].header):
+ if clobber==False:
+ print 'Wcskey %s is already used.' % wcskey
+ print 'Use "wcsutil.next_wcskey" to obtain a valid wcskey'
+ print 'or use "clobber=True" to overwrite the values.'
+ closefobj(fname,f)
+ return
+ else:
+ # reuse the value for WCSNAME
+ if wcsname == " ":
+ wcsname = f[1].header["WCSNAME"+wcskey]
+ else:
+ wkey = wcskey
+ wname = wcsname
+ wkey = wcskey
+ wname = wcsname
+ print 'in archivewcs wkey, wname', wkey, wname
+ for e in exts:
+ w = pywcs.WCS(f[e].header, fobj=f)
+ hwcs = w.to_header()
+ wcsnamekey = 'WCSNAME' + wkey
+ f[e].header.update(key=wcsnamekey, value=wcsname)
+ if w.wcs.has_cd():
+ pc2cd(hwcs)
+ for k in hwcs.keys():
+ key = k+wkey
+ f[e].header.update(key=key, value=hwcs[k])
+ norient = np.rad2deg(np.arctan2(hwcs['CD1_2'],hwcs['CD2_2']))
+ okey = 'ORIENT%s' % wkey
+ f[e].header.update(key=okey, value=norient)
+ closefobj(fname, f)
+
+def restoreWCS(f, wcskey, wcsname=" ", ext=None, clobber=False):
+ """
+ Copy a WCS with key "WCSKEY" to a primary WCS
+
+ Reads in a WCS defined with wcskey and saves it as the primary WCS.
+ If clobber is False, writes out new files whose names are the original
+ names with an attached 3 character string _'WCSKEY'_.
+ Otherwise overwrites the files. Goes sequentially through the list of extensions
+ The WCS is restored from the 'SCI' extension but the primary WCS of all
+ extensions with the same EXTVER are updated.
+
+
+ Parameters
+ ----------
+ f: string or pyfits.HDUList object
+ a file name or a file object
+ wcskey: a charater
+ "A"-"Z" - Used for one of 26 alternate WCS definitions.
+ or " " - find a key from WCSNAMe value
+ wcsname: string (optional)
+ if given and wcskey is " ", will try to restore by WCSNAME value
+ ext: int, python list or None
+ if None - WCS is restored in all extensions
+ otherwise only in the specified extensions
+ clobber: boolean
+ A flag to define if the original files should be overwritten
+
+ See Also
+ --------
+ wcsutil.archiveWCS - copy the primary WCS as an alternate WCS
+
+ """
+ if isinstance(f, str):
+ if clobber:
+ fobj = pyfits.open(f, mode='update')
+ else:
+ fobj = pyfits.open(f)
+ else:
+ fobj = f
+ assert isinstance(fobj,pyfits.HDUList), \
+ "First parameter must be a file name or a pyfits.HDUList"
+ try:
+ assert (fobj.fileinfo(0)['filemode'] == 'update')
+ except AssertionError:
+ if clobber:
+ print "File must be opened in update mode."
+ closefobj(f, fobj)
+ return
+
+
+ if not clobber:
+ name = (fobj.filename().split('.fits')[0] + '_%s_' + '.fits') %wcskey
+ else:
+ # make sure the file was opened in update mode
+ try:
+ assert (fobj.fileinfo(0)['filemode'] == 'update')
+ except AssertionError:
+ print "File must be opened in update mode."
+ closefobj(f, fobj)
+ return
+ print "Overwriting original files\n"
+ name = fobj.filename()
+
+ if ext == None: #update all extenstions
+ exts = range(len(fobj))
+ elif isinstance(ext, int):
+ exts = [ext]
+ else:
+ assert isinstance(ext, list), 'Ext must be a list of int extension numbers, a fits \
+ extension number of None (all extensions)'
+ exts = ext
+
+ assert len(wcskey) == 1, 'Parameter wcskey must be a character - one of "A"-"Z" or " "'
+ if wcskey == " ":
+ # try getting the key from WCSNAME
+ if wcsname == " ":
+ print "Could not get a valid key from header"
+ closefobj(f, fobj)
+ return
+ else:
+ wkey = getKeyFromName(fobj[1].header, wcsname)
+ if not wkey:
+ print 'Could not get a key from wcsname %s .' % wcsname
+ closefobj(f, fobj)
+ return
+ else:
+ if wcskey not in wcskeys(fobj[1].header):
+ print "Could not find alternate WCS with key %s in this file" % wcskey
+ closefobj(f, fobj)
+ return
+ wkey = wcskey
+
+ for e in exts:
+ try:
+ extname = fobj[e].header['EXTNAME'].lower()
+ except KeyError:
+ continue
+ #Restore always from a 'SCI' extension but write it out to 'ERR' and 'DQ'
+ if extname == 'sci':
+ sciver = fobj[e].header['extver']
+ try:
+ nwcs = pywcs.WCS(fobj[e].header, fobj=fobj, key=wkey)
+ except KeyError:
+ print 'restoreWCS: Could not read WCS with key %s in file %s, \
+ extension %d' % (wcskey, fobj.filename(), e)
+ closefobj(f, fobj)
+ return #raise
+ hwcs = nwcs.to_header()
+
+ if nwcs.wcs.has_cd():
+ pc2cd(hwcs, key=wkey)
+ for k in hwcs.keys():
+ key = k[:-1]
+ if key in fobj[e].header.keys():
+ fobj[e].header.update(key=key, value = hwcs[k])
+ else:
+ continue
+ if wcskey == 'O' and fobj[e].header.has_key('TDDALPHA'):
+ fobj[e].header['TDDALPHA'] = 0.0
+ fobj[e].header['TDDBETA'] = 0.0
+ if fobj[e].header.has_key('ORIENTAT'):
+ norient = np.rad2deg(np.arctan2(hwcs['CD1_2'+'%s' %wkey],hwcs['CD2_2'+'%s' %wkey]))
+ fobj[e].header.update(key='ORIENTAT', value=norient)
+ elif extname in ['err', 'dq', 'sdq', 'time', 'samp']:
+ cextver = fobj[e].header['extver']
+ if cextver == sciver:
+ for k in hwcs.keys():
+ key = k[:-1]
+ fobj[e].header.update(key=key, value = hwcs[k])
+ if fobj[e].header.has_key('ORIENTAT'):
+ norient = np.rad2deg(np.arctan2(hwcs['CD1_2'+'%s' %wkey],hwcs['CD2_2'+'%s' %wkey]))
+ fobj[e].header.update(key='ORIENTAT', value=norient)
+ else:
+ continue
+
+ if not clobber:
+ fobj.writeto(name)
+ closefobj(f, fobj)
+
+def deleteWCS(fname, wcskey=" ", wcsname=" ", ext=None):
+ """
+ Delete an alternate WCS defined with wcskey.
+ If wcskey is " " try to get a key from WCSNAME.
+
+ Parameters
+ ----------
+ fname: sting or a pyfits.HDUList object
+ wcskey: one of 'A'-'Z' or " "
+ wcsname: string
+ Name of alternate WCS description
+ ext: an int, a python list or None
+ if None - WCS with key wcskey is deleted from all extensions
+ otherwise only from the specified extensions
+ """
+ if isinstance(fname, str):
+ fobj = pyfits.open(fname, mode='update')
+ else:
+ fobj = fname
+ assert isinstance(fobj,pyfits.HDUList)
+ try:
+ assert (fobj.fileinfo(0)['filemode'] == 'update')
+ except AssertionError:
+ print "File must be opened in update mode."
+ fobj.close()
+ return
+
+ if not ext: #work with all extensions
+ exts = range(len(fobj))
+ elif isinstance(ext, list):
+ exts = ext[:]
+ elif isinstance(ext, int):
+ exts = [ext]
+ else:
+ print 'ext paramter can be int, a list of int or None\n'
+ print 'No WCS was deleted\n'
+ closefobj(fname, fobj)
+ return
+
+ assert len(wcskey) == 1, 'Parameter wcskey must be a character - one of "A"-"Z" or " "'
+ if wcskey == " ":
+ # try getting the key from WCSNAME
+ if wcsname == " ":
+ print "Could not get a valid key from header"
+ closefobj(fname, fobj)
+ return
+ else:
+ wkey = getKeyFromName(fobj[1].header, wcsname)
+ if not wkey:
+ print 'Could not get a key: wcsname "%s" not found in header.' % wcsname
+ closefobj(fname, fobj)
+ return
+ else:
+ if wcskey not in wcskeys(fobj[1].header):
+ print "Could not find alternate WCS with key %s in this file" % wcskey
+ closefobj(fname, fobj)
+ return
+ wkey = wcskey
+
+ prexts = []
+ for i in exts:
+ hdr = fobj[i].header
+ try:
+ w = pywcs.WCS(hdr, fobj, key=wkey)
+ except KeyError:
+ continue
+ hwcs = w.to_header()
+ if w.wcs.has_cd():
+ pc2cd(hwcs, key=wkey)
+ for k in hwcs.keys():
+ del hdr[k]
+ del hdr['ORIENT'+wkey]
+ prexts.append(i)
+ if prexts != []:
+ print 'Deleted all instances of WCS with key %s in extensions' % wkey, prexts
+ else:
+ print "Did not find WCS with key %s in any of the extensions" % wkey
+ closefobj(fname, fobj)
+
+#header operations
+def wcskeys(header):
+ """
+ Returns a list of characters used in the header for alternate
+ WCS description with WCSNAME keyword
+
+ Parameters
+ ----------
+ hdr: pyfits.Header
+ """
+ assert isinstance(header, pyfits.Header), "Requires a pyfits.Header object as input"
+ names = header["WCSNAME*"]
+ return [key.split('WCSNAME')[1].upper() for key in names.keys()]
+
+def wcsnames(header):
+ """
+ Purpose
+ =======
+ Return a dictionary of wcskey: WCSNAME pairs
+ """
+ names = header["WCSNAME*"]
+ d = {}
+ for c in names:
+ d[c.key[-1]] = c.value
+ return d
+
+def available_wcskeys(header):
+ """
+ Returns a list of characters which are not used in the header
+ with WCSNAME keyword. Any of them can be used to save a new
+ WCS.
+
+ Parameters
+ ----------
+ header: pyfits.Header
+ """
+ assert isinstance(header, pyfits.Header), "Requires a pyfits.Header object as input"
+ all_keys = list(string.ascii_uppercase)
+ used_keys = wcskeys(header)
+ try:
+ used_keys.remove("")
+ except ValueError:
+ pass
+ [all_keys.remove(key) for key in used_keys]
+ return all_keys
+
+def next_wcskey(header):
+ """
+ Returns next available character to be used for an alternate WCS
+
+ Parameters
+ ----------
+ header: pyfits.Header
+ """
+ assert isinstance(header, pyfits.Header), "Requires a pyfits.Header object as input"
+ allkeys = available_wcskeys(header)
+ if allkeys != []:
+ return allkeys[0]
+ else:
+ return None
+
+def getKeyFromName(header, wcsname):
+ """
+ If WCSNAME is found in header, return its key, else return
+ None. This is used to update an alternate WCS
+ repeatedly and not generate new keys every time.
+ """
+ wkey = None
+ names = wcsnames(header)
+ for item in names.items():
+ if item[1] == wcsname:
+ wkey = item[0]
+ break
+ return wkey
+
+def pc2cd(hdr, key=' '):
+ """
+ Convert a CD PC matrix to a CD matrix.
+
+ WCSLIB (and PyWCS) recognizes CD keywords as input
+ but converts them and works internally with the PC matrix.
+ to_header() returns the PC matrix even if the i nput was a
+ CD matrix. To keep input and output consistent we check
+ for has_cd and convert the PC back to CD.
+
+ Parameters
+ ----------
+ hdr: pyfits.Header
+ """
+ for c in ['1_1', '1_2', '2_1', '2_2']:
+ try:
+ val = hdr['PC'+c+'%s' % key]
+ del hdr['PC'+c+ '%s' % key]
+ except KeyError:
+ if c=='1_1' or c == '2_2':
+ val = 1.
+ else:
+ val = 0.
+ hdr.update(key='CD'+c+'%s' %key, value=val)
+ return hdr
+
+def closefobj(fname,f):
+ """
+ Functions in this module accept as input a file name or a file object.
+ If the input was a file name (string) we close the object. If the user
+ passed a file object we leave it to the user to close it.
+ """
+ if isinstance(fname, str):
+ f.close()
+ \ No newline at end of file