Diffstat (limited to 'stwcs/wcsutil')
-rw-r--r--   stwcs/wcsutil/__init__.py      34
-rw-r--r--   stwcs/wcsutil/altwcs.py       758
-rw-r--r--   stwcs/wcsutil/convertwcs.py   118
-rw-r--r--   stwcs/wcsutil/getinput.py      62
-rw-r--r--   stwcs/wcsutil/headerlet.py   2754
-rw-r--r--   stwcs/wcsutil/hstwcs.py       988
-rw-r--r--   stwcs/wcsutil/instruments.py  320
-rw-r--r--   stwcs/wcsutil/mappings.py      29
-rw-r--r--   stwcs/wcsutil/mosaic.py       183
-rw-r--r--   stwcs/wcsutil/wcscorr.py      668
-rw-r--r--   stwcs/wcsutil/wcsdiff.py      150
11 files changed, 6064 insertions, 0 deletions
diff --git a/stwcs/wcsutil/__init__.py b/stwcs/wcsutil/__init__.py
new file mode 100644
index 0000000..65280be
--- /dev/null
+++ b/stwcs/wcsutil/__init__.py
@@ -0,0 +1,34 @@
+from __future__ import absolute_import, print_function # confidence high
+
+from .altwcs import *
+from .hstwcs import HSTWCS
+
+
+def help():
+ doc = """ \
+    1. Using an `astropy.io.fits.HDUList` object and an extension number \n
+    Example:\n
+    from astropy.io import fits
+ fobj = fits.open('some_file.fits')\n
+ w = wcsutil.HSTWCS(fobj, 3)\n\n
+
+ 2. Create an HSTWCS object using a qualified file name. \n
+ Example:\n
+ w = wcsutil.HSTWCS('j9irw4b1q_flt.fits[sci,1]')\n\n
+
+ 3. Create an HSTWCS object using a file name and an extension number. \n
+ Example:\n
+ w = wcsutil.HSTWCS('j9irw4b1q_flt.fits', ext=2)\n\n
+
+ 4. Create an HSTWCS object from WCS with key 'O'.\n
+ Example:\n
+
+ w = wcsutil.HSTWCS('j9irw4b1q_flt.fits', ext=2, wcskey='O')\n\n
+
+ 5. Create a template HSTWCS object for a DEFAULT object.\n
+ Example:\n
+ w = wcsutil.HSTWCS(instrument='DEFAULT')\n\n
+ """
+
+ print('How to create an HSTWCS object:\n\n')
+ print(doc)
diff --git a/stwcs/wcsutil/altwcs.py b/stwcs/wcsutil/altwcs.py
new file mode 100644
index 0000000..aae1a9f
--- /dev/null
+++ b/stwcs/wcsutil/altwcs.py
@@ -0,0 +1,758 @@
+from __future__ import division, print_function # confidence high
+import os
+import string
+
+import numpy as np
+from astropy import wcs as pywcs
+from astropy.io import fits
+from stsci.tools import fileutil as fu
+
+altwcskw = ['WCSAXES', 'CRVAL', 'CRPIX', 'PC', 'CDELT', 'CD', 'CTYPE', 'CUNIT',
+ 'PV', 'PS']
+altwcskw_extra = ['LATPOLE','LONPOLE','RESTWAV','RESTFRQ']
+
+# file operations
+def archiveWCS(fname, ext, wcskey=" ", wcsname=" ", reusekey=False):
+ """
+ Copy the primary WCS to the header as an alternate WCS
+ with wcskey and name WCSNAME. It loops over all extensions in 'ext'
+
+ Parameters
+ ----------
+ fname : string or `astropy.io.fits.HDUList`
+ file name or a file object
+ ext : int, tuple, str, or list of integers or tuples (e.g.('sci',1))
+ fits extensions to work with
+ If a string is provided, it should specify the EXTNAME of extensions
+ with WCSs to be archived
+ wcskey : string "A"-"Z" or " "
+ if " ": get next available key if wcsname is also " " or try
+ to get a key from WCSNAME value
+ wcsname : string
+ Name of alternate WCS description
+ reusekey : boolean
+ if True - overwrites a WCS with the same key
+
+ Examples
+ --------
+    Copy the primary WCS of an in-memory headerlet object to an
+    alternate WCS with key 'T'
+
+ >>> hlet=headerlet.createHeaderlet('junk.fits', 'hdr1.fits')
+ >>> altwcs.wcskeys(hlet[1].header)
+ ['A']
+ >>> altwcs.archiveWCS(hlet, ext=[('SIPWCS',1),('SIPWCS',2)], wcskey='T')
+ >>> altwcs.wcskeys(hlet[1].header)
+ ['A', 'T']
+
+
+ See Also
+ --------
+ wcsutil.restoreWCS: Copy an alternate WCS to the primary WCS
+
+ """
+
+ if isinstance(fname, str):
+ f = fits.open(fname, mode='update')
+ else:
+ f = fname
+
+ if not _parpasscheck(f, ext, wcskey, wcsname):
+ closefobj(fname, f)
+ raise ValueError("Input parameters problem")
+
+ # Interpret input 'ext' value to get list of extensions to process
+ ext = _buildExtlist(f, ext)
+
+ if not wcskey and not wcsname:
+ raise KeyError("Either wcskey or wcsname should be specified")
+
+ if wcsname.strip() == "":
+ try:
+ wcsname = readAltWCS(f, ext[0], wcskey=" ")['WCSNAME']
+ except KeyError:
+ pass
+ wcsext = ext[0]
+ if wcskey != " " and wcskey in wcskeys(f[wcsext].header) and not reusekey:
+ closefobj(fname, f)
+ raise KeyError("Wcskey %s is aready used. \
+ Run archiveWCS() with reusekey=True to overwrite this alternate WCS. \
+ Alternatively choose another wcskey with altwcs.available_wcskeys()." %wcskey)
+ elif wcskey == " ":
+ # wcsname exists, overwrite it if reuse is True or get the next key
+ if wcsname.strip() in wcsnames(f[wcsext].header).values():
+ if reusekey:
+ # try getting the key from an existing WCS with WCSNAME
+ wkey = getKeyFromName(f[wcsext].header, wcsname)
+ wname = wcsname
+ if wkey == ' ':
+ wkey = next_wcskey(f[wcsext].header)
+ elif wkey is None:
+ closefobj(fname, f)
+ raise KeyError("Could not get a valid wcskey from wcsname %s" %wcsname)
+ else:
+ closefobj(fname, f)
+ raise KeyError("Wcsname %s is aready used. \
+ Run archiveWCS() with reusekey=True to overwrite this alternate WCS. \
+ Alternatively choose another wcskey with altwcs.available_wcskeys() or\
+ choose another wcsname." %wcsname)
+ else:
+ wkey = next_wcskey(f[wcsext].header)
+ if wcsname.strip():
+ wname = wcsname
+ else:
+ # determine which WCSNAME needs to be replicated in archived WCS
+ wnames = wcsnames(f[wcsext].header)
+ if 'O' in wnames: del wnames['O'] # we don't want OPUS/original
+ if len(wnames) > 0:
+ if ' ' in wnames:
+ wname = wnames[' ']
+ else:
+                    akeys = string.ascii_uppercase
+                    wname = "DEFAULT"
+                    for key in akeys[::-1]:
+                        if key in wnames:
+                            wname = wnames[key]
+                            break
+ else:
+ wname = "DEFAULT"
+ else:
+ wkey = wcskey
+ wname = wcsname
+
+ for e in ext:
+ hwcs = readAltWCS(f,e,wcskey=' ')
+ if hwcs is None:
+ continue
+
+ wcsnamekey = 'WCSNAME' + wkey
+ f[e].header[wcsnamekey] = wname
+
+ try:
+            old_wcsname = hwcs.pop('WCSNAME')
+        except KeyError:
+            pass
+
+ for k in hwcs.keys():
+ key = k[:7] + wkey
+ f[e].header[key] = hwcs[k]
+ closefobj(fname, f)
+
+def restore_from_to(f, fromext=None, toext=None, wcskey=" ", wcsname=" "):
+ """
+ Copy an alternate WCS from one extension as a primary WCS of another extension
+
+    Reads in a WCS defined with wcskey and saves it as the primary WCS.
+    Goes sequentially through the extensions given by 'fromext' and copies
+    the WCS to each of the extensions listed in 'toext'.
+
+
+ Parameters
+ ----------
+ f: string or `astropy.io.fits.HDUList`
+ a file name or a file object
+ fromext: string
+ extname from which to read in the alternate WCS, for example 'SCI'
+ toext: string or python list
+ extname or a list of extnames to which the WCS will be copied as
+ primary, for example ['SCI', 'ERR', 'DQ']
+    wcskey: a character
+        "A"-"Z" - Used for one of 26 alternate WCS definitions.
+        or " " - find a key from WCSNAME value
+ wcsname: string (optional)
+ if given and wcskey is " ", will try to restore by WCSNAME value
+
+ See Also
+ --------
+ archiveWCS - copy the primary WCS as an alternate WCS
+ restoreWCS - Copy a WCS with key "WCSKEY" to the primary WCS
+
+ """
+ if isinstance(f, str):
+ fobj = fits.open(f, mode='update')
+ else:
+ fobj = f
+
+ if not _parpasscheck(fobj, ext=None, wcskey=wcskey, fromext=fromext, toext=toext):
+ closefobj(f, fobj)
+ raise ValueError("Input parameters problem")
+
+ # Interpret input 'ext' value to get list of extensions to process
+ #ext = _buildExtlist(fobj, ext)
+
+ if isinstance(toext, str):
+ toext = [toext]
+
+ # the case of an HDUList object in memory without an associated file
+
+ #if fobj.filename() is not None:
+ # name = fobj.filename()
+
+    simplefits = fu.isFits(fobj)[1] == 'simple'
+ if simplefits:
+ wcskeyext = 0
+ else:
+ wcskeyext = 1
+
+ if wcskey == " ":
+ if wcsname.strip():
+ wkey = getKeyFromName(fobj[wcskeyext].header, wcsname)
+ if not wkey:
+ closefobj(f, fobj)
+ raise KeyError("Could not get a key from wcsname %s ." % wcsname)
+ else:
+ if wcskey not in wcskeys(fobj, ext=wcskeyext):
+ print("Could not find alternate WCS with key %s in this file" % wcskey)
+ closefobj(f, fobj)
+ return
+ wkey = wcskey
+
+ countext = fu.countExtn(fobj, fromext)
+ if not countext:
+ raise KeyError("File does not have extension with extname %s", fromext)
+ else:
+ for i in range(1, countext+1):
+ for toe in toext:
+ _restore(fobj, fromextnum=i, fromextnam=fromext, toextnum=i, toextnam=toe, ukey=wkey)
+
+ if fobj.filename() is not None:
+ #fobj.writeto(name)
+ closefobj(f, fobj)
+
+def restoreWCS(f, ext, wcskey=" ", wcsname=" "):
+ """
+ Copy a WCS with key "WCSKEY" to the primary WCS
+
+ Reads in a WCS defined with wcskey and saves it as the primary WCS.
+ Goes sequentially through the list of extensions in ext.
+ Alternatively uses 'fromext' and 'toext'.
+
+
+ Parameters
+ ----------
+ f : str or `astropy.io.fits.HDUList`
+ file name or a file object
+ ext : int, tuple, str, or list of integers or tuples (e.g.('sci',1))
+ fits extensions to work with
+ If a string is provided, it should specify the EXTNAME of extensions
+ with WCSs to be archived
+ wcskey : str
+ "A"-"Z" - Used for one of 26 alternate WCS definitions.
+ or " " - find a key from WCSNAMe value
+ wcsname : str
+ (optional) if given and wcskey is " ", will try to restore by WCSNAME value
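+
+    Examples
+    --------
+    For example, to make alternate WCS 'T' primary in all SCI extensions
+    (the file name is illustrative):
+
+    >>> altwcs.restoreWCS('j9irw4b1q_flt.fits', ext='SCI', wcskey='T')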
+
+ See Also
+ --------
+ archiveWCS - copy the primary WCS as an alternate WCS
+ restore_from_to
+
+ """
+ if isinstance(f, str):
+ fobj = fits.open(f, mode='update')
+ else:
+ fobj = f
+
+ if not _parpasscheck(fobj, ext=ext, wcskey=wcskey):
+ closefobj(f, fobj)
+ raise ValueError("Input parameters problem")
+
+ # Interpret input 'ext' value to get list of extensions to process
+ ext = _buildExtlist(fobj, ext)
+
+
+ # the case of an HDUList object in memory without an associated file
+
+ #if fobj.filename() is not None:
+ # name = fobj.filename()
+
+    simplefits = fu.isFits(fobj)[1] == 'simple'
+ if simplefits:
+ wcskeyext = 0
+ else:
+ wcskeyext = 1
+
+ if wcskey == " ":
+ if wcsname.strip():
+ wkey = getKeyFromName(fobj[wcskeyext].header, wcsname)
+ if not wkey:
+ closefobj(f, fobj)
+ raise KeyError("Could not get a key from wcsname %s ." % wcsname)
+ else:
+ if wcskey not in wcskeys(fobj, ext=wcskeyext):
+ #print "Could not find alternate WCS with key %s in this file" % wcskey
+ closefobj(f, fobj)
+ return
+ wkey = wcskey
+
+ for e in ext:
+ _restore(fobj, wkey, fromextnum=e, verbose=False)
+
+ if fobj.filename() is not None:
+ #fobj.writeto(name)
+ closefobj(f, fobj)
+
+def deleteWCS(fname, ext, wcskey=" ", wcsname=" "):
+ """
+ Delete an alternate WCS defined with wcskey.
+ If wcskey is " " try to get a key from WCSNAME.
+
+ Parameters
+ ----------
+ fname : str or a `astropy.io.fits.HDUList`
+ ext : int, tuple, str, or list of integers or tuples (e.g.('sci',1))
+ fits extensions to work with
+ If a string is provided, it should specify the EXTNAME of extensions
+ with WCSs to be archived
+ wcskey : str
+ one of 'A'-'Z' or " "
+ wcsname : str
+ Name of alternate WCS description
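+
+    Examples
+    --------
+    A minimal sketch (the file name and key are illustrative):
+
+    >>> altwcs.deleteWCS('j9irw4b1q_flt.fits', ext='SCI', wcskey='T')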
+ """
+ if isinstance(fname, str):
+ fobj = fits.open(fname, mode='update')
+ else:
+ fobj = fname
+
+ if not _parpasscheck(fobj, ext, wcskey, wcsname):
+ closefobj(fname, fobj)
+ raise ValueError("Input parameters problem")
+
+ # Interpret input 'ext' value to get list of extensions to process
+ ext = _buildExtlist(fobj, ext)
+ # Do not allow deleting the original WCS.
+ if wcskey == 'O':
+ print("Wcskey 'O' is reserved for the original WCS and should not be deleted.")
+ closefobj(fname, fobj)
+ return
+
+ wcskeyext = ext[0]
+
+    if not wcskey and not wcsname:
+ raise KeyError("Either wcskey or wcsname should be specified")
+
+ if wcskey == " ":
+ # try getting the key from WCSNAME
+ wkey = getKeyFromName(fobj[wcskeyext].header, wcsname)
+ if not wkey:
+ closefobj(fname, fobj)
+ raise KeyError("Could not get a key: wcsname '%s' not found in header." % wcsname)
+ else:
+ if wcskey not in wcskeys(fobj[wcskeyext].header):
+ closefobj(fname, fobj)
+ raise KeyError("Could not find alternate WCS with key %s in this file" % wcskey)
+ wkey = wcskey
+
+ prexts = []
+ for i in ext:
+ hdr = fobj[i].header
+ hwcs = readAltWCS(fobj,i,wcskey=wkey)
+ if hwcs is None:
+ continue
+ for k in hwcs[::-1]:
+ del hdr[k]
+ #del hdr['ORIENT'+wkey]
+ prexts.append(i)
+ if prexts != []:
+ print('Deleted all instances of WCS with key %s in extensions' % wkey, prexts)
+ else:
+ print("Did not find WCS with key %s in any of the extensions" % wkey)
+ closefobj(fname, fobj)
+
+def _buildExtlist(fobj, ext):
+ """
+ Utility function to interpret the provided value of 'ext' and return a list
+ of 'valid' values which can then be used by the rest of the functions in
+ this module.
+
+ Parameters
+ ----------
+ fobj: HDUList
+ file to be examined
+ ext: an int, a tuple, string, list of integers or tuples (e.g.('sci',1))
+ fits extensions to work with
+ If a string is provided, it should specify the EXTNAME of extensions
+ with WCSs to be archived
+ """
+ if not isinstance(ext,list):
+ if isinstance(ext,str):
+ extstr = ext
+ ext = []
+ for extn in range(1, len(fobj)):
+ if 'extname' in fobj[extn].header and fobj[extn].header['extname'] == extstr:
+ ext.append(extn)
+ elif isinstance(ext, int) or isinstance(ext, tuple):
+ ext = [ext]
+ else:
+ raise KeyError("Valid extensions in 'ext' parameter need to be specified.")
+ return ext
+
+def _restore(fobj, ukey, fromextnum,
+ toextnum=None, fromextnam=None, toextnam=None, verbose=True):
+ """
+    fobj: string or `astropy.io.fits.HDUList`
+ ukey: string 'A'-'Z'
+ wcs key
+ fromextnum: int
+ extver of extension from which to copy WCS
+ fromextnam: string
+ extname of extension from which to copy WCS
+ toextnum: int
+ extver of extension to which to copy WCS
+ toextnam: string
+ extname of extension to which to copy WCS
+ """
+ # create an extension tuple, e.g. ('SCI', 2)
+ if fromextnam:
+ fromextension = (fromextnam, fromextnum)
+ else:
+ fromextension = fromextnum
+ if toextnum:
+ if toextnam:
+ toextension = (toextnam, toextnum)
+ else:
+            toextension = toextnum
+ else:
+ toextension = fromextension
+
+ hwcs = readAltWCS(fobj,fromextension,wcskey=ukey,verbose=verbose)
+ if hwcs is None:
+ return
+
+    for k in hwcs.keys():
+        key = k[:-1]
+        if key in fobj[toextension].header:
+            fobj[toextension].header[key] = hwcs[k]
+
+    if ukey == 'O' and 'TDDALPHA' in fobj[toextension].header:
+ fobj[toextension].header['TDDALPHA'] = 0.0
+ fobj[toextension].header['TDDBETA'] = 0.0
+ if 'ORIENTAT' in fobj[toextension].header:
+        norient = np.rad2deg(np.arctan2(hwcs['CD1_2%s' % ukey], hwcs['CD2_2%s' % ukey]))
+ fobj[toextension].header['ORIENTAT'] = norient
+ # Reset 2014 TDD keywords prior to computing new values (if any are computed)
+ for kw in ['TDD_CYA','TDD_CYB','TDD_CXA','TDD_CXB']:
+ if kw in fobj[toextension].header:
+ fobj[toextension].header[kw] = 0.0
+
+#header operations
+def _check_headerpars(fobj, ext):
+ if not isinstance(fobj, fits.Header) and not isinstance(fobj, fits.HDUList) \
+ and not isinstance(fobj, str):
+ raise ValueError("Expected a file name, a file object or a header\n")
+
+ if not isinstance(fobj, fits.Header):
+ #raise ValueError("Expected a valid ext parameter when input is a file")
+ if not isinstance(ext, int) and not isinstance(ext, tuple):
+ raise ValueError("Expected ext to be a number or a tuple, e.g. ('SCI', 1)\n")
+
+def _getheader(fobj, ext):
+ if isinstance(fobj, str):
+ hdr = fits.getheader(fobj,ext)
+ elif isinstance(fobj, fits.Header):
+ hdr = fobj
+ else:
+ hdr = fobj[ext].header
+ return hdr
+
+def readAltWCS(fobj, ext, wcskey=' ',verbose=False):
+ """ Reads in alternate WCS from specified extension
+
+ Parameters
+ ----------
+ fobj : str, `astropy.io.fits.HDUList`
+ fits filename or fits file object
+ containing alternate/primary WCS(s) to be converted
+ wcskey : str
+ [" ",A-Z]
+ alternate/primary WCS key that will be replaced by the new key
+ ext : int
+ fits extension number
+
+ Returns
+ -------
+ hdr: fits.Header
+ header object with ONLY the keywords for specified alternate WCS
+ """
+ if isinstance(fobj, str):
+ fobj = fits.open(fobj)
+
+ hdr = _getheader(fobj,ext)
+ try:
+ nwcs = pywcs.WCS(hdr, fobj=fobj, key=wcskey)
+ except KeyError:
+ if verbose:
+ print('readAltWCS: Could not read WCS with key %s' %wcskey)
+ print(' Skipping %s[%s]' % (fobj.filename(), str(ext)))
+ return None
+ hwcs = nwcs.to_header()
+
+ if nwcs.wcs.has_cd():
+ hwcs = pc2cd(hwcs, key=wcskey)
+
+ return hwcs
+
+def convertAltWCS(fobj, ext, oldkey=" ", newkey=" "):
+ """
+ Translates the alternate/primary WCS with one key to an alternate/primary WCS with
+ another key.
+
+ Parameters
+ ----------
+ fobj : str, `astropy.io.fits.HDUList`, or `astropy.io.fits.Header`
+ fits filename, fits file object or fits header
+ containing alternate/primary WCS(s) to be converted
+ ext : int
+ extension number
+ oldkey : str
+ [" ",A-Z]
+ alternate/primary WCS key that will be replaced by the new key
+ newkey : str
+ [" ",A-Z]
+ new alternate/primary WCS key
+
+ Returns
+ -------
+ hdr: `astropy.io.fits.Header`
+ header object with keywords renamed from oldkey to newkey
+ """
+ hdr = readAltWCS(fobj,ext,wcskey=oldkey)
+ if hdr is None:
+ return None
+ # Converting WCS to new key
+    for keyword in list(hdr):
+        if oldkey in (' ', ''):
+            cname = keyword
+        else:
+            # strip the single-character alternate key suffix
+            cname = keyword[:-1] if keyword.endswith(oldkey) else keyword
+        hdr.rename_keyword(keyword, cname + newkey, force=True)
+
+ return hdr
+
+def wcskeys(fobj, ext=None):
+ """
+ Returns a list of characters used in the header for alternate
+ WCS description with WCSNAME keyword
+
+ Parameters
+ ----------
+ fobj : str, `astropy.io.fits.HDUList` or `astropy.io.fits.Header`
+ fits file name, fits file object or fits header
+ ext : int or None
+ extension number
+ if None, fobj must be a header
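+
+    Examples
+    --------
+    For example (the file name and returned keys are illustrative):
+
+    >>> altwcs.wcskeys('j9irw4b1q_flt.fits', ext=1)
+    [' ', 'O', 'A']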
+ """
+ _check_headerpars(fobj, ext)
+ hdr = _getheader(fobj, ext)
+ names = hdr["WCSNAME*"]
+ d = []
+ for key in names:
+ wkey = key.replace('WCSNAME','')
+ if wkey == '': wkey = ' '
+ d.append(wkey)
+ return d
+
+def wcsnames(fobj, ext=None):
+ """
+ Returns a dictionary of wcskey: WCSNAME pairs
+
+ Parameters
+ ----------
+    fobj : str, `astropy.io.fits.HDUList` or `astropy.io.fits.Header`
+ fits file name, fits file object or fits header
+ ext : int or None
+ extension number
+ if None, fobj must be a header
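+
+    Examples
+    --------
+    For example (the names shown are illustrative):
+
+    >>> altwcs.wcsnames('j9irw4b1q_flt.fits', ext=1)
+    {' ': 'IDC_qbu1641sj', 'O': 'OPUS'}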
+
+ """
+ _check_headerpars(fobj, ext)
+ hdr = _getheader(fobj, ext)
+ names = hdr["WCSNAME*"]
+ d = {}
+ for keyword, value in names.items():
+ wkey = keyword.replace('WCSNAME','')
+ if wkey == '': wkey = ' '
+ d[wkey] = value
+ return d
+
+def available_wcskeys(fobj, ext=None):
+ """
+ Returns a list of characters which are not used in the header
+ with WCSNAME keyword. Any of them can be used to save a new
+ WCS.
+
+ Parameters
+ ----------
+ fobj : str, `astropy.io.fits.HDUList` or `astropy.io.fits.Header`
+ fits file name, fits file object or fits header
+ ext : int or None
+ extension number
+ if None, fobj must be a header
+ """
+ _check_headerpars(fobj, ext)
+ hdr = _getheader(fobj, ext)
+ all_keys = list(string.ascii_uppercase)
+ used_keys = wcskeys(hdr)
+ try:
+ used_keys.remove(" ")
+ except ValueError:
+ pass
+    for key in used_keys:
+        all_keys.remove(key)
+ return all_keys
+
+def next_wcskey(fobj, ext=None):
+ """
+ Returns next available character to be used for an alternate WCS
+
+ Parameters
+ ----------
+ fobj : str, `astropy.io.fits.HDUList` or `astropy.io.fits.Header`
+ fits file name, fits file object or fits header
+ ext : int or None
+ extension number
+ if None, fobj must be a header
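+
+    Examples
+    --------
+    A sketch: archive the primary WCS under the next free key (fobj is an
+    open HDUList; names are illustrative):
+
+    >>> wkey = altwcs.next_wcskey(fobj[1].header)
+    >>> altwcs.archiveWCS(fobj, ext=[('SCI', 1)], wcskey=wkey)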
+ """
+ _check_headerpars(fobj, ext)
+ hdr = _getheader(fobj, ext)
+ allkeys = available_wcskeys(hdr)
+ if allkeys != []:
+ return allkeys[0]
+ else:
+ return None
+
+def getKeyFromName(header, wcsname):
+ """
+ If WCSNAME is found in header, return its key, else return
+ None. This is used to update an alternate WCS
+ repeatedly and not generate new keys every time.
+
+ Parameters
+ ----------
+ header : `astropy.io.fits.Header`
+ wcsname : str
+ value of WCSNAME
+ """
+ wkey = None
+ names = wcsnames(header)
+ wkeys = []
+ for item in names.items():
+ if item[1].lower() == wcsname.lower():
+ wkeys.append(item[0])
+ wkeys.sort()
+ if len(wkeys) > 0:
+ wkey = wkeys[-1]
+ else:
+ wkey = None
+ return wkey
+
+def pc2cd(hdr, key=' '):
+ """
+    Convert a PC matrix to a CD matrix.
+
+    WCSLIB (and PyWCS) recognizes CD keywords as input
+    but converts them and works internally with the PC matrix.
+    to_header() returns the PC matrix even if the input was a
+    CD matrix. To keep input and output consistent we check
+    for has_cd and convert the PC back to CD.
+
+ Parameters
+ ----------
+ hdr: `astropy.io.fits.Header`
+
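+    Examples
+    --------
+    A minimal sketch; to_header() may omit an identity PC matrix, in which
+    case the defaults (1, 0, 0, 1) are filled in:
+
+    >>> w = pywcs.WCS(naxis=2)
+    >>> hdr = pc2cd(w.to_header())   # hdr now carries CD1_1 ... CD2_2
+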
+ """
+ for c in ['1_1', '1_2', '2_1', '2_2']:
+ try:
+ val = hdr['PC'+c+'%s' % key]
+ del hdr['PC'+c+ '%s' % key]
+ except KeyError:
+ if c=='1_1' or c == '2_2':
+ val = 1.
+ else:
+ val = 0.
+ #hdr.update(key='CD'+c+'%s' %key, value=val)
+ hdr['CD{0}{1}'.format(c, key)] = val
+ return hdr
+
+def _parpasscheck(fobj, ext, wcskey, fromext=None, toext=None, reusekey=False):
+ """
+ Check input parameters to altwcs functions
+
+ fobj : str or `astropy.io.fits.HDUList` object
+ a file name or a file object
+ ext : int, a tuple, a python list of integers or a python list
+ of tuples (e.g.('sci',1))
+ fits extensions to work with
+ wcskey : str
+ "A"-"Z" or " "- Used for one of 26 alternate WCS definitions
+    fromext : str
+        (optional) extname from which to copy the WCS
+    toext : str or list
+        (optional) extname(s) to which the WCS is copied
+ reusekey : bool
+ A flag which indicates whether to reuse a wcskey in the header
+ """
+ if not isinstance(fobj, fits.HDUList):
+ print("First parameter must be a fits file object or a file name.")
+ return False
+
+ # first one covers the case of an object created in memory
+ # (e.g. headerlet) for which fileinfo returns None
+ if fobj.fileinfo(0) is None:
+ pass
+ else:
+ # an HDUList object with associated file
+        if fobj.fileinfo(0)['filemode'] != 'update':
+ print("First parameter must be a file name or a file object opened in 'update' mode.")
+ return False
+
+ if not isinstance(ext, int) and not isinstance(ext, tuple) \
+ and not isinstance(ext,str) \
+ and not isinstance(ext, list) and ext is not None:
+ print("Ext must be integer, tuple, string,a list of int extension numbers, \n\
+ or a list of tuples representing a fits extension, for example ('sci', 1).")
+ return False
+
+ if not isinstance(fromext, str) and fromext is not None:
+ print("fromext must be a string representing a valid extname")
+ return False
+
+ if not isinstance(toext, list) and not isinstance(toext, str) and \
+ toext is not None :
+ print("toext must be a string or a list of strings representing extname")
+ return False
+
+ if len(wcskey) != 1:
+ print('Parameter wcskey must be a character - one of "A"-"Z" or " "')
+ return False
+
+ return True
+
+def closefobj(fname, f):
+ """
+ Functions in this module accept as input a file name or a file object.
+ If the input was a file name (string) we close the object. If the user
+ passed a file object we leave it to the user to close it.
+ """
+ if isinstance(fname, str):
+ f.close()
+
+def mapFitsExt2HDUListInd(fname, extname):
+ """
+ Map FITS extensions with 'EXTNAME' to HDUList indexes.
+ """
+
+ if not isinstance(fname, fits.HDUList):
+ f = fits.open(fname)
+ close_file = True
+ else:
+ f = fname
+ close_file = False
+ d = {}
+ for hdu in f:
+ if 'EXTNAME' in hdu.header and hdu.header['EXTNAME'] == extname:
+ extver = hdu.header['EXTVER']
+ d[(extname, extver)] = f.index_of((extname, extver))
+ if close_file:
+ f.close()
+ return d
diff --git a/stwcs/wcsutil/convertwcs.py b/stwcs/wcsutil/convertwcs.py
new file mode 100644
index 0000000..a384eb1
--- /dev/null
+++ b/stwcs/wcsutil/convertwcs.py
@@ -0,0 +1,118 @@
+from __future__ import print_function
+try:
+ import stwcs
+ from stwcs import wcsutil
+except ImportError:
+ stwcs = None
+
+from astropy.io import fits
+from stsci.tools import fileutil
+
+OPUS_WCSKEYS = ['OCRVAL1','OCRVAL2','OCRPIX1','OCRPIX2',
+ 'OCD1_1','OCD1_2','OCD2_1','OCD2_2',
+ 'OCTYPE1','OCTYPE2']
+
+
+def archive_prefix_OPUS_WCS(fobj,extname='SCI'):
+ """ Identifies WCS keywords which were generated by OPUS and archived
+ using a prefix of 'O' for all 'SCI' extensions in the file
+
+ Parameters
+ ----------
+ fobj : str or `astropy.io.fits.HDUList`
+ Filename or fits object of a file
+
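+    Examples
+    --------
+    A minimal sketch (the file name is illustrative):
+
+    >>> archive_prefix_OPUS_WCS('j9irw4b1q_flt.fits')
+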
+ """
+ if stwcs is None:
+ print('=====================')
+ print('The STWCS package is needed to convert an old-style OPUS WCS to an alternate WCS')
+ print('=====================')
+ raise ImportError
+
+
+ closefits = False
+ if isinstance(fobj,str):
+ # A filename was provided as input
+ fobj = fits.open(fobj,mode='update')
+ closefits=True
+
+ # Define the header
+ ext = ('sci',1)
+ hdr = fobj[ext].header
+
+ numextn = fileutil.countExtn(fobj)
+ extlist = []
+ for e in range(1,numextn+1):
+ extlist.append(('sci',e))
+
+    # Ensure that the 'O' alternate WCS is present
+ if 'O' not in wcsutil.wcskeys(hdr):
+ # if not, archive the Primary WCS as the default OPUS WCS
+ wcsutil.archiveWCS(fobj,extlist, wcskey='O', wcsname='OPUS')
+
+ # find out how many SCI extensions are in the image
+ numextn = fileutil.countExtn(fobj,extname=extname)
+ if numextn == 0:
+ extname = 'PRIMARY'
+
+ # create HSTWCS object from PRIMARY WCS
+ wcsobj = wcsutil.HSTWCS(fobj,ext=ext,wcskey='O')
+ # get list of WCS keywords
+ wcskeys = list(wcsobj.wcs2header().keys())
+
+ # For each SCI extension...
+ for e in range(1,numextn+1):
+ # Now, look for any WCS keywords with a prefix of 'O'
+ for key in wcskeys:
+ okey = 'O'+key[:7]
+ hdr = fobj[(extname,e)].header
+ if okey in hdr:
+ # Update alternate WCS keyword with prefix-O OPUS keyword value
+ hdr[key] = hdr[okey]
+
+ if closefits:
+ fobj.close()
+
+def create_prefix_OPUS_WCS(fobj,extname='SCI'):
+ """ Creates alternate WCS with a prefix of 'O' for OPUS generated WCS values
+ to work with old MultiDrizzle.
+
+ Parameters
+ ----------
+ fobj : str or `astropy.io.fits.HDUList`
+ Filename or fits object of a file
+
+ Raises
+ ------
+ IOError:
+ if input FITS object was not opened in 'update' mode
+
+ """
+ # List of O-prefix keywords to create
+ owcskeys = OPUS_WCSKEYS
+
+ closefits = False
+ if isinstance(fobj,str):
+ # A filename was provided as input
+ fobj = fits.open(fobj, mode='update')
+ closefits=True
+ else:
+ # check to make sure this FITS obj has been opened in update mode
+ if fobj.fileinfo(0)['filemode'] != 'update':
+ print('File not opened with "mode=update". Quitting...')
+ raise IOError
+
+    # check for existence of O-prefix WCS
+ if owcskeys[0] not in fobj['sci',1].header:
+
+ # find out how many SCI extensions are in the image
+ numextn = fileutil.countExtn(fobj,extname=extname)
+ if numextn == 0:
+ extname = ''
+ for extn in range(1,numextn+1):
+ hdr = fobj[(extname,extn)].header
+ for okey in owcskeys:
+ hdr[okey] = hdr[okey[1:]+'O']
+
+ # Close FITS image if we had to open it...
+ if closefits:
+ fobj.close()
diff --git a/stwcs/wcsutil/getinput.py b/stwcs/wcsutil/getinput.py
new file mode 100644
index 0000000..8ee1123
--- /dev/null
+++ b/stwcs/wcsutil/getinput.py
@@ -0,0 +1,62 @@
+from astropy.io import fits
+from stsci.tools import irafglob, fileutil, parseinput
+
+def parseSingleInput(f=None, ext=None):
+ if isinstance(f, str):
+ # create an HSTWCS object from a filename
+        if ext is not None:
+ filename = f
+ if isinstance(ext,tuple):
+ if ext[0] == '':
+ extnum = ext[1] # handle ext=('',1)
+ else:
+ extnum = ext
+ else:
+ extnum = int(ext)
+        elif ext is None:
+ filename, ext = fileutil.parseFilename(f)
+ ext = fileutil.parseExtn(ext)
+ if ext[0] == '':
+ extnum = int(ext[1]) #handle ext=('',extnum)
+ else:
+ extnum = ext
+ phdu = fits.open(filename)
+ hdr0 = phdu[0].header
+ try:
+ ehdr = phdu[extnum].header
+ except (IndexError, KeyError) as e:
+ raise e.__class__('Unable to get extension %s.' % extnum)
+
+ elif isinstance(f, fits.HDUList):
+ phdu = f
+        if ext is None:
+ extnum = 0
+ else:
+ extnum = ext
+ ehdr = f[extnum].header
+ hdr0 = f[0].header
+ filename = hdr0.get('FILENAME', "")
+
+ else:
+        raise ValueError('Input must be a file name string or an '
+                         '`astropy.io.fits.HDUList` object')
+
+ return filename, hdr0, ehdr, phdu
+
+
+def parseMultipleInput(input):
+ if isinstance(input, str):
+ if input[0] == '@':
+ # input is an @ file
+ filelist = irafglob.irafglob(input)
+ else:
+            filelist, output = parseinput.parseinput(input)
+    elif isinstance(input, list):
+        # local import avoids a circular dependency with hstwcs
+        from .hstwcs import HSTWCS
+        if isinstance(input[0], HSTWCS):
+ # a list of HSTWCS objects
+ return input
+ else:
+ filelist = input[:]
+ return filelist
diff --git a/stwcs/wcsutil/headerlet.py b/stwcs/wcsutil/headerlet.py
new file mode 100644
index 0000000..c0dd9b0
--- /dev/null
+++ b/stwcs/wcsutil/headerlet.py
@@ -0,0 +1,2754 @@
+"""
+This module implements headerlets.
+
+A headerlet serves as a mechanism for encapsulating WCS information
+which can be used to update the WCS solution of an image. The idea
+arose from the desire to pass along improved astrometric
+solutions for HST data and to provide those solutions in a manner
+that would not require retrieving entirely new images from the archive
+when only the WCS information has been updated.
+
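+Examples
+--------
+A typical use, sketched with illustrative file and solution names:
+
+>>> from stwcs.wcsutil import headerlet
+>>> hlet = headerlet.create_headerlet('j9irw4b1q_flt.fits', hdrname='VER1')
+>>> hlet.tofile('j9irw4b1q_hlet.fits', clobber=True)
+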
+"""
+
+from __future__ import absolute_import, division, print_function
+import os
+import sys
+import functools
+import logging
+import textwrap
+import copy
+import time
+
+import numpy as np
+from astropy.io import fits
+#import pywcs
+from astropy import wcs as pywcs
+from astropy.utils import lazyproperty
+
+from stsci.tools.fileutil import countExtn
+from stsci.tools import fileutil as fu
+from stsci.tools import parseinput
+
+from stwcs.updatewcs import utils
+from . import altwcs
+from . import wcscorr
+from .hstwcs import HSTWCS
+from .mappings import basic_wcs
+
+#### Logging support functions
+class FuncNameLoggingFormatter(logging.Formatter):
+ def __init__(self, fmt=None, datefmt=None):
+ if '%(funcName)s' not in fmt:
+ fmt = '%(funcName)s' + fmt
+ logging.Formatter.__init__(self, fmt=fmt, datefmt=datefmt)
+
+ def format(self, record):
+ record = copy.copy(record)
+ if hasattr(record, 'funcName') and record.funcName == 'init_logging':
+ record.funcName = ''
+ else:
+ record.funcName += ' '
+ return logging.Formatter.format(self, record)
+
+
+logger = logging.getLogger(__name__)
+formatter = FuncNameLoggingFormatter("%(levelname)s: %(message)s")
+ch = logging.StreamHandler()
+ch.setFormatter(formatter)
+ch.setLevel(logging.CRITICAL)
+logger.addHandler(ch)
+logger.setLevel(logging.DEBUG)
+
+FITS_STD_KW = ['XTENSION', 'BITPIX', 'NAXIS', 'PCOUNT',
+ 'GCOUNT', 'EXTNAME', 'EXTVER', 'ORIGIN',
+ 'INHERIT', 'DATE', 'IRAF-TLM']
+
+DEFAULT_SUMMARY_COLS = ['HDRNAME', 'WCSNAME', 'DISTNAME', 'AUTHOR', 'DATE',
+ 'SIPNAME', 'NPOLFILE', 'D2IMFILE', 'DESCRIP']
+COLUMN_DICT = {'vals': [], 'width': []}
+COLUMN_FMT = '{:<{width}}'
+
+
+def init_logging(funcname=None, level=100, mode='w', **kwargs):
+ """
+
+ Initialize logging for a function
+
+ Parameters
+ ----------
+ funcname: string
+ Name of function which will be recorded in log
+    level: int, bool, or string
+        int or string : logging level
+        bool: False - switch off logging
+        Text logging levels for the messages are "DEBUG", "INFO",
+        "WARNING", "ERROR", "CRITICAL"
+    mode: 'w' or 'a'
+        append to logfile ('a') or start a new logfile ('w')
+
+ """
+    has_file_handler = any(isinstance(hndl, logging.FileHandler)
+                           for hndl in logger.handlers)
+ if level:
+ if not has_file_handler:
+ logname = 'headerlet.log'
+ fh = logging.FileHandler(logname, mode=mode)
+ fh.setFormatter(formatter)
+ fh.setLevel(logging.DEBUG)
+ logger.addHandler(fh)
+ logger.info("%s: Starting %s with arguments:\n\t %s" %
+ (time.asctime(), funcname, kwargs))
+
+
+def with_logging(func):
+ @functools.wraps(func)
+ def wrapped(*args, **kw):
+ level = kw.get('logging', 100)
+ mode = kw.get('logmode', 'w')
+ func_args = kw.copy()
+ if sys.version_info[0] >= 3:
+ argnames = func.__code__.co_varnames
+ else:
+ argnames = func.func_code.co_varnames
+
+ for argname, arg in zip(argnames, args):
+ func_args[argname] = arg
+
+ init_logging(func.__name__, level, mode, **func_args)
+ return func(*args, **kw)
+ return wrapped
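+
+# Illustration only: any function accepting 'logging'/'logmode' keywords can
+# opt into per-call file logging (the decorated function here is hypothetical):
+#
+# @with_logging
+# def my_update(fname, logging=False, logmode='w'):
+#     logger.info("updating %s" % fname)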
+
+#### Utility functions
+def is_par_blank(par):
+ return par in ['', ' ', 'INDEF', "None", None]
+
+def parse_filename(fname, mode='readonly'):
+ """
+    Interprets the input as either a filename of a file that needs to be opened
+    or an `astropy.io.fits.HDUList` object.
+
+ Parameters
+ ----------
+ fname : str, `astropy.io.fits.HDUList`
+ Input pointing to a file or `astropy.io.fits.HDUList` object.
+ An input filename (str) will be expanded as necessary to
+ interpret any environmental variables
+ included in the filename.
+
+ mode : string
+ Specifies what mode to use when opening the file, if it needs
+ to open the file at all [Default: 'readonly']
+
+ Returns
+ -------
+ fobj : `astropy.io.fits.HDUList`
+ FITS file handle for input
+
+ fname : str
+ Name of input file
+
+ close_fobj : bool
+ Flag specifying whether or not fobj needs to be closed since it was
+ opened by this function. This allows a program to know whether they
+ need to worry about closing the FITS object as opposed to letting
+ the higher level interface close the object.
+
+ """
+ close_fobj = False
+ if not isinstance(fname, list):
+ if sys.version_info[0] >= 3:
+ is_string = isinstance(fname, str)
+ else:
+ is_string = isinstance(fname, basestring)
+ if is_string:
+ fname = fu.osfn(fname)
+ fobj = fits.open(fname, mode=mode)
+ close_fobj = True
+ else:
+ fobj = fname
+ if hasattr(fobj, 'filename'):
+ fname = fobj.filename()
+ else:
+ fname = ''
+ return fobj, fname, close_fobj
+
+def get_headerlet_kw_names(fobj, kw='HDRNAME'):
+ """
+ Returns a list of specified keywords from all HeaderletHDU
+ extensions in a science file.
+
+ Parameters
+ ----------
+ fobj : str, `astropy.io.fits.HDUList`
+ kw : str
+ Name of keyword to be read and reported
+ """
+
+ fobj, fname, open_fobj = parse_filename(fobj)
+
+ hdrnames = []
+ for ext in fobj:
+ if isinstance(ext, fits.hdu.base.NonstandardExtHDU):
+ hdrnames.append(ext.header[kw])
+
+ if open_fobj:
+ fobj.close()
+
+ return hdrnames
+
+def get_header_kw_vals(hdr, kwname, kwval, default=0):
+ if kwval is None:
+ if kwname in hdr:
+ kwval = hdr[kwname]
+ else:
+ kwval = default
+ return kwval
+
+@with_logging
+def find_headerlet_HDUs(fobj, hdrext=None, hdrname=None, distname=None,
+ strict=True, logging=False, logmode='w'):
+ """
+ Returns all HeaderletHDU extensions in a science file that matches
+ the inputs specified by the user. If no hdrext, hdrname or distname are
+ specified, this function will return a list of all HeaderletHDU objects.
+
+ Parameters
+ ----------
+ fobj : str, `astropy.io.fits.HDUList`
+ Name of FITS file or open fits object (`astropy.io.fits.HDUList` instance)
+ hdrext : int, tuple or None
+ index number(EXTVER) or extension tuple of HeaderletHDU to be returned
+ hdrname : string
+ value of HDRNAME for HeaderletHDU to be returned
+ distname : string
+ value of DISTNAME for HeaderletHDUs to be returned
+ strict : bool [Default: True]
+ Specifies whether or not at least one parameter needs to be provided
+ If False, all extension indices returned if hdrext, hdrname and distname
+ are all None. If True and hdrext, hdrname, and distname are all None,
+ raise an Exception requiring one to be specified.
+ logging : boolean
+ enable logging to a file called headerlet.log
+ logmode : 'w' or 'a'
+ log file open mode
+
+ Returns
+ -------
+ hdrlets : list
+ A list of all matching HeaderletHDU extension indices (could be just one)
+
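+    Examples
+    --------
+    For instance, to locate the extension holding a given solution
+    (the name and the returned index are illustrative):
+
+    >>> find_headerlet_HDUs('j9irw4b1q_flt.fits', hdrname='VER1')
+    [4]
+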
+ """
+
+ get_all = False
+ if hdrext is None and hdrname is None and distname is None:
+ if not strict:
+ get_all = True
+ else:
+ mess = """\n
+ =====================================================
+ No valid Headerlet extension specified.
+ Either "hdrname", "hdrext", or "distname" needs to be specified.
+ =====================================================
+ """
+ logger.critical(mess)
+ raise ValueError
+
+ fobj, fname, open_fobj = parse_filename(fobj)
+
+ hdrlets = []
+ if hdrext is not None and isinstance(hdrext, int):
+ if hdrext in range(len(fobj)): # insure specified hdrext is in fobj
+ if isinstance(fobj[hdrext], fits.hdu.base.NonstandardExtHDU) and \
+ fobj[hdrext].header['EXTNAME'] == 'HDRLET':
+ hdrlets.append(hdrext)
+ else:
+ for ext in fobj:
+ if isinstance(ext, fits.hdu.base.NonstandardExtHDU):
+ if get_all:
+ hdrlets.append(fobj.index(ext))
+ else:
+ if hdrext is not None:
+ if isinstance(hdrext, tuple):
+ hdrextname = hdrext[0]
+ hdrextnum = hdrext[1]
+ else:
+ hdrextname = 'HDRLET'
+ hdrextnum = hdrext
+ hdrext_match = ((hdrext is not None) and
+ (hdrextnum == ext.header['EXTVER']) and
+ (hdrextname == ext.header['EXTNAME']))
+ hdrname_match = ((hdrname is not None) and
+ (hdrname == ext.header['HDRNAME']))
+ distname_match = ((distname is not None) and
+ (distname == ext.header['DISTNAME']))
+ if hdrext_match or hdrname_match or distname_match:
+ hdrlets.append(fobj.index(ext))
+
+ if open_fobj:
+ fobj.close()
+
+ if len(hdrlets) == 0:
+ if hdrname:
+ kwerr = 'hdrname'
+ kwval = hdrname
+ elif hdrext:
+ kwerr = 'hdrext'
+ kwval = hdrext
+ else:
+ kwerr = 'distname'
+ kwval = distname
+ message = """\n
+ =====================================================
+ No valid Headerlet extension found!'
+ "%s" = %s not found in %s.' % (kwerr, kwval, fname)
+ =====================================================
+ """
+ logger.critical(message)
+ raise ValueError
+
+ return hdrlets
+
+def verify_hdrname_is_unique(fobj, hdrname):
+ """
+ Verifies that no other HeaderletHDU extension has the specified hdrname.
+
+ Parameters
+ ----------
+ fobj : str, `astropy.io.fits.HDUList`
+ Name of FITS file or open fits file object
+ hdrname : str
+ value of HDRNAME for HeaderletHDU to be compared as unique
+
+ Returns
+ -------
+ unique: bool
+ If True, no other HeaderletHDU has the specified HDRNAME value
+ """
+ hdrnames_list = get_headerlet_kw_names(fobj)
+    unique = hdrname not in hdrnames_list
+
+ return unique
+
+def update_versions(sourcehdr, desthdr):
+ """
+ Update keywords which store version numbers
+ """
+    phdukw = {'PYWCSVER': 'Version of PYWCS used to update the WCS',
+              'UPWCSVER': 'Version of STWCS used to update the WCS'
+ }
+ for key in phdukw:
+ try:
+ desthdr[key] = (sourcehdr[key], sourcehdr.comments[key])
+ except KeyError:
+ desthdr[key] = (" ", phdukw[key])
+
+def update_ref_files(source, dest):
+ """
+ Update the reference files name in the primary header of 'dest'
+ using values from 'source'
+
+ Parameters
+ ----------
+ source : `astropy.io.fits.Header`
+ dest : `astropy.io.fits.Header`
+ """
+ logger.info("Updating reference files")
+ phdukw = {'NPOLFILE': True,
+ 'IDCTAB': True,
+ 'D2IMFILE': True,
+ 'SIPNAME': True,
+ 'DISTNAME': True}
+
+ for key in phdukw:
+ try:
+ try:
+ del dest[key]
+            except KeyError:
+ pass
+ dest.set(key, source[key], source.comments[key])
+ except KeyError:
+ phdukw[key] = False
+ return phdukw
+
+def print_summary(summary_cols, summary_dict, pad=2, maxwidth=None, idcol=None,
+ output=None, clobber=True, quiet=False ):
+ """
+ Print out summary dictionary to STDOUT, and possibly an output file
+
+ """
+ nrows = None
+ if idcol:
+ nrows = len(idcol['vals'])
+
+ # Find max width of each column
+ column_widths = {}
+ for kw in summary_dict:
+ colwidth = np.array(summary_dict[kw]['width']).max()
+ if maxwidth:
+ colwidth = min(colwidth, maxwidth)
+ column_widths[kw] = colwidth + pad
+ if nrows is None:
+ nrows = len(summary_dict[kw]['vals'])
+
+ # print rows now
+ outstr = ''
+ # Start with column names
+ if idcol:
+ outstr += COLUMN_FMT.format(idcol['name'], width=idcol['width'] + pad)
+ for kw in summary_cols:
+ outstr += COLUMN_FMT.format(kw, width=column_widths[kw])
+ outstr += '\n'
+ # Now, add a row for each headerlet
+ for row in range(nrows):
+ if idcol:
+ outstr += COLUMN_FMT.format(idcol['vals'][row],
+ width=idcol['width']+pad)
+ for kw in summary_cols:
+ val = summary_dict[kw]['vals'][row][:(column_widths[kw]-pad)]
+ outstr += COLUMN_FMT.format(val, width=column_widths[kw])
+ outstr += '\n'
+ if not quiet:
+ print(outstr)
+
+ # If specified, write info to separate text file
+ write_file = False
+ if output:
+ output = fu.osfn(output) # Expand any environment variables in filename
+ write_file = True
+ if os.path.exists(output):
+ if clobber:
+ os.remove(output)
+ else:
+ print('WARNING: Not writing results to file!')
+ print(' Output text file ', output, ' already exists.')
+ print(' Set "clobber" to True or move file before trying again.')
+ write_file = False
+ if write_file:
+ fout = open(output, mode='w')
+ fout.write(outstr)
+ fout.close()
+
+#### Private utility functions
+def _create_primary_HDU(fobj, fname, wcsext, destim, hdrname, wcsname,
+ sipname, npolfile, d2imfile,
+ nmatch,catalog, wcskey,
+ author, descrip, history):
+ # convert input values into valid FITS kw values
+ if author is None:
+ author = ''
+ if descrip is None:
+ descrip = ''
+
+ sipname, idctab = utils.build_sipname(fobj, fname, sipname)
+ logger.info("Setting sipname value to %s" % sipname)
+
+ npolname, npolfile = utils.build_npolname(fobj, npolfile)
+ logger.info("Setting npolfile value to %s" % npolname)
+
+ d2imname, d2imfile = utils.build_d2imname(fobj,d2imfile)
+ logger.info("Setting d2imfile value to %s" % d2imname)
+
+ distname = utils.build_distname(sipname, npolname, d2imname)
+ logger.info("Setting distname to %s" % distname)
+
+ # open file and parse comments
+ if history not in ['', ' ', None, 'INDEF'] and os.path.isfile(history):
+ f = open(fu.osfn(history))
+ history = f.readlines()
+ f.close()
+ else:
+ history = ''
+
+ rms_ra = fobj[wcsext].header.get("CRDER1"+wcskey, 0)
+ rms_dec = fobj[wcsext].header.get("CRDER2"+wcskey, 0)
+ if not nmatch:
+ nmatch = fobj[wcsext].header.get("NMATCH"+wcskey, 0)
+ if not catalog:
+ catalog = fobj[wcsext].header.get('CATALOG'+wcskey, "")
+ # get the version of STWCS used to create the WCS of the science file.
+ #try:
+ #upwcsver = fobj[0].header.cards[fobj[0].header.index('UPWCSVER')]
+ #except KeyError:
+ #upwcsver = pyfits.Card("UPWCSVER", " ",
+ #"Version of STWCS used to update the WCS")
+ #try:
+ #pywcsver = fobj[0].header.cards[fobj[0].header.index('PYWCSVER')]
+ #except KeyError:
+ #pywcsver = pyfits.Card("PYWCSVER", " ",
+ #"Version of PYWCS used to update the WCS")
+ upwcsver = fobj[0].header.get('UPWCSVER', "")
+ pywcsver = fobj[0].header.get('PYWCSVER', "")
+ # build Primary HDU
+ phdu = fits.PrimaryHDU()
+ phdu.header['DESTIM'] = (destim, 'Destination observation root name')
+ phdu.header['HDRNAME'] = (hdrname, 'Headerlet name')
+ fmt = "%Y-%m-%dT%H:%M:%S"
+ phdu.header['DATE'] = (time.strftime(fmt), 'Date FITS file was generated')
+ phdu.header['WCSNAME'] = (wcsname, 'WCS name')
+ phdu.header['DISTNAME'] = (distname, 'Distortion model name')
+ phdu.header['SIPNAME'] = (sipname,
+ 'origin of SIP polynomial distortion model')
+ phdu.header['NPOLFILE'] = (npolfile,
+                               'origin of non-polynomial distortion model')
+ phdu.header['D2IMFILE'] = (d2imfile,
+ 'origin of detector to image correction')
+ phdu.header['IDCTAB'] = (idctab,
+ 'origin of Polynomial Distortion')
+ phdu.header['AUTHOR'] = (author, 'headerlet created by this user')
+ phdu.header['DESCRIP'] = (descrip,
+ 'Short description of headerlet solution')
+ phdu.header['RMS_RA'] = (rms_ra,
+ 'RMS in RA at ref pix of headerlet solution')
+ phdu.header['RMS_DEC'] = (rms_dec,
+ 'RMS in Dec at ref pix of headerlet solution')
+ phdu.header['NMATCH'] = (nmatch,
+ 'Number of sources used for headerlet solution')
+ phdu.header['CATALOG'] = (catalog,
+ 'Astrometric catalog used for headerlet '
+ 'solution')
+ phdu.header['UPWCSVER'] = (upwcsver, "Version of STWCS used to update the WCS")
+ phdu.header['PYWCSVER'] = (pywcsver, "Version of PYWCS used to update the WCS")
+
+ # clean up history string in order to remove whitespace characters that
+ # would cause problems with FITS
+ if isinstance(history, list):
+ history_str = ''
+ for line in history:
+ history_str += line
+ else:
+ history_str = history
+ history_lines = textwrap.wrap(history_str, width=70)
+ for hline in history_lines:
+ phdu.header.add_history(hline)
+
+ return phdu
+
+
+#### Public Interface functions
+@with_logging
+def extract_headerlet(filename, output, extnum=None, hdrname=None,
+ clobber=False, logging=True):
+ """
+ Finds a headerlet extension in a science file and writes it out as
+ a headerlet FITS file.
+
+ If both hdrname and extnum are given they should match, if not
+ raise an Exception
+
+ Parameters
+ ----------
+ filename: string or HDUList or Python list
+ This specifies the name(s) of science file(s) from which headerlets
+ will be extracted.
+
+ String input formats supported include use of wild-cards, IRAF-style
+ '@'-files (given as '@<filename>') and comma-separated list of names.
+ An input filename (str) will be expanded as necessary to interpret
+ any environmental variables included in the filename.
+ If a list of filenames has been specified, it will extract a
+ headerlet from the same extnum from all filenames.
+ output: string
+ Filename or just rootname of output headerlet FITS file
+ If string does not contain '.fits', it will create a filename with
+ '_hlet.fits' suffix
+ extnum: int
+ Extension number which contains the headerlet to be written out
+ hdrname: string
+ Unique name for headerlet, stored as the HDRNAME keyword
+ It stops if a value is not provided and no extnum has been specified
+ clobber: bool
+ If output file already exists, this parameter specifies whether or not
+ to overwrite that file [Default: False]
+ logging: boolean
+ enable logging to a file
+
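+    Examples
+    --------
+    A minimal sketch (file and solution names are illustrative):
+
+    >>> extract_headerlet('j9irw4b1q_flt.fits', 'j9irw4b1q_hlet.fits',
+    ...                   hdrname='VER1')
+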
+ """
+
+ if isinstance(filename, fits.HDUList):
+ filename = [filename]
+ else:
+ filename, oname = parseinput.parseinput(filename)
+
+ for f in filename:
+ fobj, fname, close_fobj = parse_filename(f)
+ frootname = fu.buildNewRootname(fname)
+ if hdrname in ['', ' ', None, 'INDEF'] and extnum is None:
+ if close_fobj:
+ fobj.close()
+ logger.critical("Expected a valid extnum or hdrname parameter")
+ raise ValueError
+ if hdrname is not None:
+ extn_from_hdrname = find_headerlet_HDUs(fobj, hdrname=hdrname)[0]
+ if extn_from_hdrname != extnum:
+ logger.critical("hdrname and extnmu should refer to the same FITS extension")
+ raise ValueError
+ else:
+ hdrhdu = fobj[extn_from_hdrname]
+ else:
+ hdrhdu = fobj[extnum]
+
+ if not isinstance(hdrhdu, HeaderletHDU):
+ logger.critical("Specified extension is not a headerlet")
+ raise ValueError
+
+ hdrlet = hdrhdu.headerlet
+
+ if output is None:
+ output = frootname
+
+ if '.fits' in output:
+ outname = output
+ else:
+ outname = '%s_hlet.fits' % output
+
+ hdrlet.tofile(outname, clobber=clobber)
+
+ if close_fobj:
+ fobj.close()
+
+
+@with_logging
+def write_headerlet(filename, hdrname, output=None, sciext='SCI',
+ wcsname=None, wcskey=None, destim=None,
+ sipname=None, npolfile=None, d2imfile=None,
+ author=None, descrip=None, history=None,
+ nmatch=None, catalog=None,
+ attach=True, clobber=False, logging=False):
+
+ """
+ Save a WCS as a headerlet FITS file.
+
+ This function will create a headerlet, write out the headerlet to a
+ separate headerlet file, then, optionally, attach it as an extension
+ to the science image (if it has not already been archived)
+
+ Either wcsname or wcskey must be provided; if both are given, they must
+ match a valid WCS.
+
+ Updates wcscorr if necessary.
+
+ Parameters
+ ----------
+ filename: string or HDUList or Python list
+ This specifies the name(s) of science file(s) from which headerlets
+ will be created and written out.
+ String input formats supported include use of wild-cards, IRAF-style
+ '@'-files (given as '@<filename>') and comma-separated list of names.
+ An input filename (str) will be expanded as necessary to interpret
+ any environmental variables included in the filename.
+ hdrname: string
+ Unique name for this headerlet, stored as HDRNAME keyword
+ output: string or None
+ Filename or just rootname of output headerlet FITS file
+ If string does not contain '.fits', it will create a filename
+ starting with the science filename and ending with '_hlet.fits'.
+ If None, a default filename based on the input filename will be
+ generated for the headerlet FITS filename
+ sciext: string
+ name (EXTNAME) of extension that contains WCS to be saved
+ wcsname: string
+ name of WCS to be archived, if " ": stop
+ wcskey: one of A...Z or " " or "PRIMARY"
+ if " " or "PRIMARY" - archive the primary WCS
+ destim: string
+ DESTIM keyword
+        if None, use ROOTNAME or science file name
+ sipname: string or None (default)
+ Name of unique file where the polynomial distortion coefficients were
+ read from. If None, the behavior is:
+ The code looks for a keyword 'SIPNAME' in the science header
+ If not found, for HST it defaults to 'IDCTAB'
+ If there is no SIP model the value is 'NOMODEL'
+ If there is a SIP model but no SIPNAME, it is set to 'UNKNOWN'
+ npolfile: string or None (default)
+ Name of a unique file where the non-polynomial distortion was stored.
+ If None:
+ The code looks for 'NPOLFILE' in science header.
+ If 'NPOLFILE' was not found and there is no npol model, it is set to 'NOMODEL'
+ If npol model exists, it is set to 'UNKNOWN'
+ d2imfile: string
+ Name of a unique file where the detector to image correction was
+ stored. If None:
+ The code looks for 'D2IMFILE' in the science header.
+ If 'D2IMFILE' is not found and there is no d2im correction,
+ it is set to 'NOMODEL'
+ If d2im correction exists, but 'D2IMFILE' is missing from science
+ header, it is set to 'UNKNOWN'
+ author: string
+ Name of user who created the headerlet, added as 'AUTHOR' keyword
+ to headerlet PRIMARY header
+ descrip: string
+ Short description of the solution provided by the headerlet
+ This description will be added as the single 'DESCRIP' keyword
+ to the headerlet PRIMARY header
+ history: filename, string or list of strings
+ Long (possibly multi-line) description of the solution provided
+ by the headerlet. These comments will be added as 'HISTORY' cards
+ to the headerlet PRIMARY header
+ If filename is specified, it will format and attach all text from
+ that file as the history.
+ attach: bool
+ Specify whether or not to attach this headerlet as a new extension
+ It will verify that no other headerlet extension has been created with
+ the same 'hdrname' value.
+ clobber: bool
+ If output file already exists, this parameter specifies whether or not
+ to overwrite that file [Default: False]
+ logging: boolean
+ enable file logging
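+
+    Examples
+    --------
+    A minimal sketch (file and solution names are illustrative):
+
+    >>> write_headerlet('j9irw4b1q_flt.fits', 'VER1', wcskey='PRIMARY')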
+ """
+
+ if isinstance(filename, fits.HDUList):
+ filename = [filename]
+ else:
+ filename, oname = parseinput.parseinput(filename)
+
+ for f in filename:
+ if isinstance(f, str):
+ fname = f
+ else:
+ fname = f.filename()
+
+ if wcsname in [None, ' ', '', 'INDEF'] and wcskey is None:
+ message = """\n
+            No valid WCS found in %s.
+ A valid value for either "wcsname" or "wcskey"
+ needs to be specified.
+ """ % fname
+ logger.critical(message)
+ raise ValueError
+
+ # Translate 'wcskey' value for PRIMARY WCS to valid altwcs value of ' '
+ if wcskey == 'PRIMARY':
+ wcskey = ' '
+
+ if attach:
+ umode = 'update'
+ else:
+ umode = 'readonly'
+
+ fobj, fname, close_fobj = parse_filename(f, mode=umode)
+
+ # Interpret sciext input for this file
+ if isinstance(sciext, int):
+ sciextlist = [sciext] # allow for specification of simple FITS header
+ elif isinstance(sciext, str):
+ numsciext = countExtn(fobj, sciext)
+ if numsciext > 0:
+ sciextlist = [tuple((sciext,i)) for i in range(1, numsciext+1)]
+ else:
+ sciextlist = [0]
+ elif isinstance(sciext, list):
+ sciextlist = sciext
+ else:
+ errstr = "Expected sciext to be a list of FITS extensions with science data\n"+\
+ " a valid EXTNAME string, or an integer."
+ logger.critical(errstr)
+ raise ValueError
+
+ wnames = altwcs.wcsnames(fobj,ext=sciextlist[0])
+
+        # Ensure that WCSCORR table has been created with all original
+ # WCS's recorded prior to adding the headerlet WCS
+ wcscorr.init_wcscorr(fobj)
+
+ if wcsname is None:
+ scihdr = fobj[sciextlist[0]].header
+ wname = scihdr['wcsname'+wcskey]
+ else:
+ wname = wcsname
+ if hdrname in [None, ' ', '']:
+ hdrname = wcsname
+
+ logger.critical('Creating the headerlet from image %s' % fname)
+ hdrletobj = create_headerlet(fobj, sciext=sciextlist,
+ wcsname=wname, wcskey=wcskey,
+ hdrname=hdrname,
+ sipname=sipname, npolfile=npolfile,
+ d2imfile=d2imfile, author=author,
+ descrip=descrip, history=history,
+ nmatch=nmatch, catalog=catalog,
+ logging=False)
+
+ if attach:
+ # Check to see whether or not a HeaderletHDU with
+ #this hdrname already exists
+ hdrnames = get_headerlet_kw_names(fobj)
+ if hdrname not in hdrnames:
+ hdrlet_hdu = HeaderletHDU.fromheaderlet(hdrletobj)
+
+ if destim is not None:
+ hdrlet_hdu.header['destim'] = destim
+
+ fobj.append(hdrlet_hdu)
+
+ # Update the WCSCORR table with new rows from the headerlet's WCSs
+ wcscorr.update_wcscorr(fobj, source=hdrletobj,
+ extname='SIPWCS', wcs_id=wname)
+
+ utils.updateNEXTENDKw(fobj)
+ fobj.flush()
+ else:
+ message = """
+ Headerlet with hdrname %s already archived for WCS %s.
+ No new headerlet appended to %s.
+ """ % (hdrname, wname, fname)
+ logger.critical(message)
+
+ if close_fobj:
+ logger.info('Closing image in write_headerlet()...')
+ fobj.close()
+
+ frootname = fu.buildNewRootname(fname)
+
+ if output is None:
+ # Generate default filename for headerlet FITS file
+ outname = '{0}_hlet.fits'.format(frootname)
+ else:
+ outname = output
+
+ if not outname.endswith('.fits'):
+ outname = '{0}_{1}_hlet.fits'.format(frootname,outname)
+
+ # If user specifies an output filename for headerlet, write it out
+ hdrletobj.tofile(outname, clobber=clobber)
+ logger.critical( 'Created Headerlet file %s ' % outname)
+
+ del hdrletobj
+
+@with_logging
+def create_headerlet(filename, sciext='SCI', hdrname=None, destim=None,
+ wcskey=" ", wcsname=None,
+ sipname=None, npolfile=None, d2imfile=None,
+ author=None, descrip=None, history=None,
+ nmatch=None, catalog=None,
+ logging=False, logmode='w'):
+ """
+ Create a headerlet from a WCS in a science file
+ If both wcskey and wcsname are given they should match, if not
+ raise an Exception
+
+ Parameters
+ ----------
+ filename: string or HDUList
+ Either a filename or PyFITS HDUList object for the input science file
+ An input filename (str) will be expanded as necessary to interpret
+ any environmental variables included in the filename.
+ sciext: string or python list (default: 'SCI')
+ Extension in which the science data with the linear WCS is.
+ The headerlet will be created from these extensions.
+ If string - a valid EXTNAME is expected
+ If int - specifies an extension with a valid WCS, such as 0 for a
+ simple FITS file
+ If list - a list of FITS extension numbers or strings representing
+        extension tuples, e.g. ('SCI', 1), is expected.
+ hdrname: string
+ value of HDRNAME keyword
+ Takes the value from the HDRNAME<wcskey> keyword, if not available from WCSNAME<wcskey>
+ It stops if neither is found in the science file and a value is not provided
+ destim: string or None
+ name of file this headerlet can be applied to
+ if None, use ROOTNAME keyword
+ wcskey: char (A...Z) or " " or "PRIMARY" or None
+ a char representing an alternate WCS to be used for the headerlet
+ if " ", use the primary (default)
+ if None use wcsname
+ wcsname: string or None
+ if wcskey is None use wcsname specified here to choose an alternate WCS for the headerlet
+ sipname: string or None (default)
+ Name of unique file where the polynomial distortion coefficients were
+ read from. If None, the behavior is:
+ The code looks for a keyword 'SIPNAME' in the science header
+ If not found, for HST it defaults to 'IDCTAB'
+ If there is no SIP model the value is 'NOMODEL'
+ If there is a SIP model but no SIPNAME, it is set to 'UNKNOWN'
+ npolfile: string or None (default)
+ Name of a unique file where the non-polynomial distortion was stored.
+ If None:
+ The code looks for 'NPOLFILE' in science header.
+ If 'NPOLFILE' was not found and there is no npol model, it is set to 'NOMODEL'
+ If npol model exists, it is set to 'UNKNOWN'
+ d2imfile: string
+ Name of a unique file where the detector to image correction was
+ If None:
+ The code looks for 'D2IMFILE' in the science header.
+ If 'D2IMFILE' is not found and there is no d2im correction,
+ it is set to 'NOMODEL'
+ If d2im correction exists, but 'D2IMFILE' is missing from science
+ header, it is set to 'UNKNOWN'
+ author: string
+ Name of user who created the headerlet, added as 'AUTHOR' keyword
+ to headerlet PRIMARY header
+ descrip: string
+ Short description of the solution provided by the headerlet
+ This description will be added as the single 'DESCRIP' keyword
+ to the headerlet PRIMARY header
+ history: filename, string or list of strings
+ Long (possibly multi-line) description of the solution provided
+ by the headerlet. These comments will be added as 'HISTORY' cards
+ to the headerlet PRIMARY header
+ If filename is specified, it will format and attach all text from
+ that file as the history.
+ nmatch: int (optional)
+ Number of sources used in the new solution fit
+ catalog: string (optional)
+ Astrometric catalog used for headerlet solution
+ logging: boolean
+ enable file logging
+ logmode: 'w' or 'a'
+ log file open mode
+
+ Returns
+ -------
+ Headerlet object
+
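+ Examples
+ --------
+ A minimal, illustrative call; the file name and HDRNAME value
+ below are hypothetical placeholders:
+
+ >>> from stwcs.wcsutil import headerlet
+ >>> hlet = headerlet.create_headerlet('j94f05bgq_flt.fits',
+ ... hdrname='VER1_FIT')
+ >>> hlet.info()
+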
+ """
+ if wcskey == 'O':
+ message = "Warning: 'O' is a reserved key for the original WCS. Quitting..."
+ logger.info(message)
+ return None
+
+ fobj, fname, close_file = parse_filename(filename)
+ # based on `sciext` create a list of (EXTNAME, EXTVER) tuples
+ # of extensions with WCS to be saved in a headerlet
+ sciext = get_extname_extver_list(fobj, sciext)
+ logger.debug("Data extensions from which to create headerlet:\n\t %s"
+ % (str(sciext)))
+ if not sciext:
+ logger.critical("No valid target extensions found in file %s" % fname)
+ raise ValueError
+
+ # Define extension to evaluate for verification of input parameters
+ wcsext = sciext[0]
+ logger.debug("sciext in create_headerlet is %s" % str(sciext))
+ # Translate 'wcskey' value for PRIMARY WCS to valid altwcs value of ' '
+ if wcskey == 'PRIMARY':
+ wcskey = ' '
+ logger.info("wcskey reset from 'PRIMARY' to ' '")
+ wcskey = wcskey.upper()
+ wcsnamekw = "".join(["WCSNAME", wcskey.upper()]).rstrip()
+ hdrnamekw = "".join(["HDRNAME", wcskey.upper()]).rstrip()
+
+ wnames = altwcs.wcsnames(fobj, ext=wcsext)
+ if not wcsname:
+ # User did not specify a value for 'wcsname'
+ if wcsnamekw in fobj[wcsext].header:
+ #check if there's a WCSNAME for this wcskey in the header
+ wcsname = fobj[wcsext].header[wcsnamekw]
+ logger.info("Setting wcsname from header[%s] to %s" % (wcsnamekw, wcsname))
+ else:
+ if hdrname not in ['', ' ', None, "INDEF"]:
+ """
+ If wcsname for this wcskey was not provided
+ and WCSNAME<wcskey> does not exist in the header
+ and hdrname is provided, then
+ use hdrname as WCSNAME for the headerlet.
+ """
+ wcsname = hdrname
+ logger.debug("Setting wcsname from hdrname to %s" % hdrname)
+ else:
+ if hdrnamekw in fobj[wcsext].header:
+ wcsname = fobj[wcsext].header[hdrnamekw]
+ logger.debug("Setting wcsname from header[%s] to %s" % (hdrnamekw, wcsname))
+ else:
+ message = """
+ Required keywords 'HDRNAME' or 'WCSNAME' not found!
+ Please specify a value for parameter 'hdrname'
+ or update header with 'WCSNAME' keyword.
+ """
+ logger.critical(message)
+ raise KeyError
+ else:
+ # Verify that 'wcsname' and 'wcskey' values specified by user reference
+ # the same WCS
+ wname = fobj[wcsext].header[wcsnamekw]
+ if wcsname != wname:
+ message = "\tInconsistent values for 'wcskey' and 'wcsname' specified!\n"
+ message += " 'wcskey' = %s and 'wcsname' = %s. \n" % (wcskey, wcsname)
+ message += "Actual value of %s found to be %s. \n" % (wcsnamekw, wname)
+ logger.critical(message)
+ raise KeyError
+ wkeys = altwcs.wcskeys(fobj, ext=wcsext)
+ if wcskey != ' ':
+ if wcskey not in wkeys:
+ logger.critical('No WCS with wcskey=%s found in extension %s. Skipping...' % (wcskey, str(wcsext)))
+ raise ValueError("No WCS with wcskey=%s found in extension %s. Skipping...' % (wcskey, str(wcsext))")
+
+ # get remaining required keywords
+ if destim is None:
+ if 'ROOTNAME' in fobj[0].header:
+ destim = fobj[0].header['ROOTNAME']
+ logger.info("Setting destim to rootname of the science file")
+ else:
+ destim = fname
+ logger.info('DESTIM not provided')
+ logger.info('Keyword "ROOTNAME" not found')
+ logger.info('Using file name as DESTIM')
+
+ if not hdrname:
+ # check if HDRNAME<wcskey> is in header
+ if hdrnamekw in fobj[wcsext].header:
+ hdrname = fobj[wcsext].header[hdrnamekw]
+ else:
+ if wcsnamekw in fobj[wcsext].header:
+ hdrname = fobj[wcsext].header[wcsnamekw]
+ message = """
+ Using default value for HDRNAME of "%s" derived from %s.
+ """ % (hdrname, wcsnamekw)
+ logger.info(message)
+ logger.info("Setting hdrname to %s from header[%s]"
+ % (hdrname, wcsnamekw))
+ else:
+ message = "Required keywords 'HDRNAME' or 'WCSNAME' not found"
+ logger.critical(message)
+ raise KeyError
+
+ hdul = []
+ phdu = _create_primary_HDU(fobj, fname, wcsext, destim, hdrname, wcsname,
+ sipname, npolfile, d2imfile,
+ nmatch, catalog, wcskey,
+ author, descrip, history)
+ hdul.append(phdu)
+ wcsdvarr_extns = []
+ """
+ nd2i is a counter for d2i extensions to be used when the science file
+ has an old d2i correction format. The old format did not write EXTVER
+ kw for the d2i correction in the science header but the new format
+ expects them.
+ """
+ nd2i_extver = 1
+ for ext in sciext:
+ wkeys = altwcs.wcskeys(fobj, ext=ext)
+ if wcskey != ' ':
+ if wcskey not in wkeys:
+ logger.debug(
+ 'No WCS with wcskey=%s found in extension %s. '
+ 'Skipping...' % (wcskey, str(ext)))
+ raise ValueError("")
+
+ hwcs = HSTWCS(fobj, ext=ext, wcskey=wcskey)
+
+ whdul = hwcs.to_fits(relax=True, key=" ")
+ if hasattr(hwcs, 'orientat'):
+ orient_comment = "positions angle of image y axis (deg. e of n)"
+ whdul[0].header['ORIENTAT'] = (hwcs.orientat, orient_comment)
+
+ whdul[0].header.append(('TG_ENAME', ext[0], 'Target science data extname'))
+ whdul[0].header.append(('TG_EVER', ext[1], 'Target science data extver'))
+
+ if hwcs.wcs.has_cd():
+ whdul[0].header = altwcs.pc2cd(whdul[0].header)
+
+ idckw = hwcs._idc2hdr()
+ whdul[0].header.extend(idckw)
+
+ if hwcs.det2im1 or hwcs.det2im2:
+ try:
+ whdul[0].header.append(fobj[ext].header.cards['D2IMEXT'])
+ except KeyError:
+ pass
+ whdul[0].header.extend(fobj[ext].header.cards['D2IMERR*'])
+ if 'D2IM1.EXTVER' in whdul[0].header:
+ try:
+ whdul[0].header['D2IM1.EXTVER'] = fobj[ext].header['D2IM1.EXTVER']
+ except KeyError:
+ whdul[0].header['D2IM1.EXTVER'] = nd2i_extver
+ nd2i_extver += 1
+ if 'D2IM2.EXTVER' in whdul[0].header:
+ try:
+ whdul[0].header['D2IM2.EXTVER'] = fobj[ext].header['D2IM2.EXTVER']
+ except KeyError:
+ whdul[0].header['D2IM2.EXTVER'] = nd2i_extver
+ nd2i_extver += 1
+
+ if hwcs.cpdis1 or hwcs.cpdis2:
+ whdul[0].header.extend(fobj[ext].header.cards['CPERR*'])
+ try:
+ whdul[0].header.append(fobj[ext].header.cards['NPOLEXT'])
+ except KeyError:
+ pass
+ if 'DP1.EXTVER' in whdul[0].header:
+ whdul[0].header['DP1.EXTVER'] = fobj[ext].header['DP1.EXTVER']
+ if 'DP2.EXTVER' in whdul[0].header:
+ whdul[0].header['DP2.EXTVER'] = fobj[ext].header['DP2.EXTVER']
+ ihdu = fits.ImageHDU(header=whdul[0].header, name='SIPWCS')
+
+ if ext[0] != "PRIMARY":
+ ihdu.update_ext_version(fobj[ext].header['EXTVER'], comment='Extension version')
+
+ hdul.append(ihdu)
+
+ if hwcs.cpdis1:
+ whdu = whdul[('WCSDVARR', 1)].copy()
+ whdu.update_ext_version(fobj[ext].header['DP1.EXTVER'])
+ hdul.append(whdu)
+ if hwcs.cpdis2:
+ whdu = whdul[('WCSDVARR', 2)].copy()
+ whdu.update_ext_version(fobj[ext].header['DP2.EXTVER'])
+ hdul.append(whdu)
+
+ if hwcs.det2im1:
+ whdu = whdul[('D2IMARR', 1)].copy()
+ whdu.update_ext_version(ihdu.header['D2IM1.EXTVER'])
+ hdul.append(whdu)
+ if hwcs.det2im2:
+ whdu = whdul[('D2IMARR', 2)].copy()
+ whdu.update_ext_version(ihdu.header['D2IM2.EXTVER'])
+ hdul.append(whdu)
+
+
+ #if hwcs.det2im1 or hwcs.det2im2:
+ #try:
+ #darr = hdul[('D2IMARR', 1)]
+ #except KeyError:
+ #whdu = whdul[('D2IMARR')]
+ #whdu.update_ext_version(1)
+ #hdul.append(whdu)
+ if close_file:
+ fobj.close()
+
+ hlet = Headerlet(hdul, logging=logging, logmode='a')
+ hlet.init_attrs()
+ return hlet
+
+@with_logging
+def apply_headerlet_as_primary(filename, hdrlet, attach=True, archive=True,
+ force=False, logging=False, logmode='a'):
+ """
+ Apply headerlet(s) 'hdrlet' to science observation(s) 'filename' as the primary WCS
+
+ Parameters
+ ----------
+ filename: string or list of strings
+ File name(s) of science observation whose WCS solution will be updated
+ hdrlet: string or list of strings
+ Headerlet file(s), must match 1-to-1 with input filename(s)
+ attach: boolean
+ True (default): append headerlet to FITS file as a new extension.
+ archive: boolean
+ True (default): before updating, create a headerlet with the
+ old WCS solution.
+ force: boolean
+ If True, this will cause the headerlet to replace the current PRIMARY
+ WCS even if it has a different distortion model. [Default: False]
+ logging: boolean
+ enable file logging
+ logmode: 'w' or 'a'
+ log file open mode
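+
+ Examples
+ --------
+ Illustrative call with hypothetical file names; the headerlet's
+ DESTIM must match the science file's ROOTNAME:
+
+ >>> from stwcs.wcsutil import headerlet
+ >>> headerlet.apply_headerlet_as_primary('j94f05bgq_flt.fits',
+ ... 'j94f05bgq_hlet.fits',
+ ... attach=True, archive=True)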
+ """
+ if not isinstance(filename, list):
+ filename = [filename]
+ if not isinstance(hdrlet, list):
+ hdrlet = [hdrlet]
+ if len(hdrlet) != len(filename):
+ logger.critical("Filenames must have matching headerlets. "
+ "{0:d} filenames and {1:d} headerlets specified".format(len(filename), len(hdrlet)))
+ return
+
+ for fname,h in zip(filename,hdrlet):
+ print("Applying {0} as Primary WCS to {1}".format(h,fname))
+ hlet = Headerlet.fromfile(h, logging=logging, logmode=logmode)
+ hlet.apply_as_primary(fname, attach=attach, archive=archive,
+ force=force)
+
+
+@with_logging
+def apply_headerlet_as_alternate(filename, hdrlet, attach=True, wcskey=None,
+ wcsname=None, logging=False, logmode='w'):
+ """
+ Apply headerlet to a science observation as an alternate WCS
+
+ Parameters
+ ----------
+ filename: string or list of strings
+ File name(s) of science observation whose WCS solution will be updated
+ hdrlet: string or list of strings
+ Headerlet file(s), must match 1-to-1 with input filename(s)
+ attach: boolean
+ flag indicating if the headerlet should be attached as a
+ HeaderletHDU to fobj. If True checks that HDRNAME is unique
+ in the fobj and stops if not.
+ wcskey: string
+ Key value (A-Z, except O) for this alternate WCS
+ If None, the next available key will be used
+ wcsname: string
+ Name to be assigned to this alternate WCS
+ WCSNAME is a required keyword in a Headerlet but this allows the
+ user to change it as desired.
+ logging: boolean
+ enable file logging
+ logmode: 'a' or 'w'
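+
+ Examples
+ --------
+ Illustrative call with hypothetical file names; the WCS is stored
+ under key 'K' with the name 'TWEAK_1':
+
+ >>> from stwcs.wcsutil import headerlet
+ >>> headerlet.apply_headerlet_as_alternate('j94f05bgq_flt.fits',
+ ... 'j94f05bgq_hlet.fits',
+ ... wcskey='K', wcsname='TWEAK_1')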
+ """
+ if not isinstance(filename, list):
+ filename = [filename]
+ if not isinstance(hdrlet, list):
+ hdrlet = [hdrlet]
+ if len(hdrlet) != len(filename):
+ logger.critical("Filenames must have matching headerlets. "
+ "{0:d} filenames and {1:d} headerlets specified".format(len(filename), len(hdrlet)))
+ return
+
+ for fname,h in zip(filename,hdrlet):
+ print('Applying {0} as an alternate WCS to {1}'.format(h,fname))
+ hlet = Headerlet.fromfile(h, logging=logging, logmode=logmode)
+ hlet.apply_as_alternate(fname, attach=attach,
+ wcsname=wcsname, wcskey=wcskey)
+
+
+@with_logging
+def attach_headerlet(filename, hdrlet, logging=False, logmode='a'):
+ """
+ Attach Headerlet as an HeaderletHDU to a science file
+
+ Parameters
+ ----------
+ filename: HDUList or list of HDULists
+ science file(s) to which the headerlet should be applied
+ hdrlet: string, Headerlet object or list of strings or Headerlet objects
+ headerlet file(s) or Headerlet object(s); must match 1-to-1 with
+ the input filename(s)
+ logging: boolean
+ enable file logging
+ logmode: 'a' or 'w'
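+
+ Examples
+ --------
+ Illustrative call with hypothetical file names:
+
+ >>> from stwcs.wcsutil import headerlet
+ >>> headerlet.attach_headerlet('j94f05bgq_flt.fits',
+ ... 'j94f05bgq_hlet.fits')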
+ """
+ if not isinstance(filename, list):
+ filename = [filename]
+ if not isinstance(hdrlet, list):
+ hdrlet = [hdrlet]
+ if len(hdrlet) != len(filename):
+ logger.critical("Filenames must have matching headerlets. "
+ "{0:d} filenames and {1:d} headerlets specified".format(len(filename), len(hdrlet)))
+ return
+
+ for fname,h in zip(filename,hdrlet):
+ print('Attaching {0} as Headerlet extension to {1}'.format(h,fname))
+ hlet = Headerlet.fromfile(h, logging=logging, logmode=logmode)
+ hlet.attach_to_file(fname,archive=True)
+
+
+@with_logging
+def delete_headerlet(filename, hdrname=None, hdrext=None, distname=None,
+ logging=False, logmode='w'):
+ """
+ Deletes HeaderletHDU(s) with the same HDRNAME from science files
+
+ Notes
+ -----
+ One of hdrname, hdrext or distname should be given.
+ If hdrname is given - delete a HeaderletHDU with a name HDRNAME from fobj.
+ If hdrext is given - delete HeaderletHDU in extension.
+ If distname is given - deletes all HeaderletHDUs with a specific distortion model from fobj.
+ Updates wcscorr
+
+ Parameters
+ ----------
+ filename: string, HDUList or list of strings
+ Filename can be specified as a single filename or HDUList, or
+ a list of filenames
+ Each input filename (str) will be expanded as necessary to interpret
+ any environmental variables included in the filename.
+ hdrname: string or None
+ HeaderletHDU primary header keyword HDRNAME
+ hdrext: int, tuple or None
+ HeaderletHDU FITS extension number
+ tuple has the form ('HDRLET', 1)
+ distname: string or None
+ distortion model as specified in the DISTNAME keyword
+ logging: boolean
+ enable file logging
+ logmode: 'a' or 'w'
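+
+ Examples
+ --------
+ Illustrative calls with hypothetical file and HDRNAME values;
+ only one of 'hdrname', 'hdrext' or 'distname' should be given:
+
+ >>> from stwcs.wcsutil import headerlet
+ >>> headerlet.delete_headerlet('j94f05bgq_flt.fits', hdrname='VER1_FIT')
+ >>> headerlet.delete_headerlet('j94f05bgq_flt.fits', hdrext=('HDRLET', 1))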
+ """
+ if not isinstance(filename, list):
+ filename = [filename]
+
+ for f in filename:
+ print("Deleting Headerlet from ",f)
+ _delete_single_headerlet(f, hdrname=hdrname, hdrext=hdrext,
+ distname=distname, logging=logging, logmode='a')
+
+def _delete_single_headerlet(filename, hdrname=None, hdrext=None, distname=None,
+ logging=False, logmode='w'):
+ """
+ Deletes HeaderletHDU(s) from a SINGLE science file
+
+ Notes
+ -----
+ One of hdrname, hdrext or distname should be given.
+ If hdrname is given - delete a HeaderletHDU with a name HDRNAME from fobj.
+ If hdrext is given - delete HeaderletHDU in extension.
+ If distname is given - deletes all HeaderletHDUs with a specific distortion model from fobj.
+ Updates wcscorr
+
+ Parameters
+ ----------
+ filename: string or HDUList
+ Either a filename or PyFITS HDUList object for the input science file
+ An input filename (str) will be expanded as necessary to interpret
+ any environmental variables included in the filename.
+ hdrname: string or None
+ HeaderletHDU primary header keyword HDRNAME
+ hdrext: int, tuple or None
+ HeaderletHDU FITS extension number
+ tuple has the form ('HDRLET', 1)
+ distname: string or None
+ distortion model as specified in the DISTNAME keyword
+ logging: boolean
+ enable file logging
+ logmode: 'a' or 'w'
+ """
+ hdrlet_ind = find_headerlet_HDUs(filename, hdrname=hdrname, hdrext=hdrext,
+ distname=distname, logging=logging, logmode='a')
+ if len(hdrlet_ind) == 0:
+ message = """
+ No HDUs deleted... No Headerlet HDUs found with:
+ hdrname = %s
+ hdrext = %s
+ distname = %s
+ Please review input parameters and try again.
+ """ % (hdrname, str(hdrext), distname)
+ logger.critical(message)
+ return
+
+ fobj, fname, close_fobj = parse_filename(filename, mode='update')
+
+ # delete row(s) from WCSCORR table now...
+ #
+ #
+ if hdrname not in ['', ' ', None, 'INDEF']:
+ selections = {'hdrname': hdrname}
+ elif hdrname in ['', ' ', None, 'INDEF'] and hdrext is not None:
+ selections = {'hdrname': fobj[hdrext].header['hdrname']}
+ else:
+ selections = {'distname': distname}
+ wcscorr.delete_wcscorr_row(fobj['WCSCORR'].data, selections)
+
+ # delete the headerlet extension now
+ for hdrind in hdrlet_ind:
+ del fobj[hdrind]
+
+ utils.updateNEXTENDKw(fobj)
+ # Update file object with changes
+ fobj.flush()
+ # close file, if was opened by this function
+ if close_fobj:
+ fobj.close()
+ logger.critical('Deleted headerlet from extension(s) %s ' % str(hdrlet_ind))
+
+
+def headerlet_summary(filename, columns=None, pad=2, maxwidth=None,
+ output=None, clobber=True, quiet=False):
+ """
+
+ Print a summary of all HeaderletHDUs in a science file to STDOUT, and
+ optionally to a text file
+ The summary includes:
+ HDRLET_ext_number HDRNAME WCSNAME DISTNAME SIPNAME NPOLFILE D2IMFILE
+
+ Parameters
+ ----------
+ filename: string or HDUList
+ Either a filename or PyFITS HDUList object for the input science file
+ An input filename (str) will be expanded as necessary to interpret
+ any environmental variables included in the filename.
+ columns: list
+ List of headerlet PRIMARY header keywords to report in summary
+ By default (set to None), it will use the default set of keywords
+ defined as the global list DEFAULT_SUMMARY_COLS
+ pad: int
+ Number of padding spaces to put between printed columns
+ [Default: 2]
+ maxwidth: int
+ Maximum column width(not counting padding) for any column in summary
+ By default (set to None), each column's full width will be used
+ output: string (optional)
+ Name of optional output file to record summary. This filename
+ can contain environment variables.
+ [Default: None]
+ clobber: bool
+ If True, will overwrite any previous output file of same name
+ quiet: bool
+ If True, will NOT report info to STDOUT
+
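+ Examples
+ --------
+ Illustrative call with hypothetical file names, printing the default
+ summary columns and recording them in a text file:
+
+ >>> from stwcs.wcsutil import headerlet
+ >>> headerlet.headerlet_summary('j94f05bgq_flt.fits',
+ ... output='hlet_summary.txt')
+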
+ """
+ if columns is None:
+ summary_cols = DEFAULT_SUMMARY_COLS
+ else:
+ summary_cols = columns
+
+ summary_dict = {}
+ for kw in summary_cols:
+ summary_dict[kw] = copy.deepcopy(COLUMN_DICT)
+
+ # Define Extension number column
+ extnums_col = copy.deepcopy(COLUMN_DICT)
+ extnums_col['name'] = 'EXTN'
+ extnums_col['width'] = 6
+
+ fobj, fname, close_fobj = parse_filename(filename)
+ # find all HDRLET extensions and combine info into a single summary
+ for extn in fobj:
+ if 'extname' in extn.header and extn.header['extname'] == 'HDRLET':
+ hdrlet_indx = fobj.index_of(('hdrlet', extn.header['extver']))
+ try:
+ ext_cols, ext_summary = extn.headerlet.summary(columns=summary_cols)
+ extnums_col['vals'].append(hdrlet_indx)
+ for kw in summary_cols:
+ for key in COLUMN_DICT:
+ summary_dict[kw][key].extend(ext_summary[kw][key])
+ except Exception:
+ print("Skipping headerlet")
+ print("Could not read Headerlet from extension ", hdrlet_indx)
+
+ if close_fobj:
+ fobj.close()
+
+ # Print out the summary dictionary
+ print_summary(summary_cols, summary_dict, pad=pad, maxwidth=maxwidth,
+ idcol=extnums_col, output=output,
+ clobber=clobber, quiet=quiet)
+
+
+@with_logging
+def restore_from_headerlet(filename, hdrname=None, hdrext=None, archive=True,
+ force=False, logging=False, logmode='w'):
+ """
+ Restores a headerlet as a primary WCS
+
+ Parameters
+ ----------
+ filename: string or HDUList
+ Either a filename or PyFITS HDUList object for the input science file
+ An input filename (str) will be expanded as necessary to interpret
+ any environmental variables included in the filename.
+ hdrname: string
+ HDRNAME keyword of HeaderletHDU
+ hdrext: int or tuple
+ Headerlet extension number or tuple, e.g. ('HDRLET', 2)
+ archive: boolean (default: True)
+ When the distortion model in the headerlet is the same as the distortion model of
+ the science file, this flag indicates if the primary WCS should be saved as an alternate
+ and a headerlet extension.
+ When the distortion models do not match, this flag indicates if the current primary and
+ alternate WCSs should be archived as headerlet extensions and alternate WCSs.
+ force: boolean (default:False)
+ When the distortion models of the headerlet and the primary do not match, and archive
+ is False, this flag forces an update of the primary.
+ logging: boolean
+ enable file logging
+ logmode: 'a' or 'w'
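+
+ Examples
+ --------
+ Illustrative call with hypothetical names, restoring the headerlet
+ stored under HDRNAME 'VER1_FIT' as the primary WCS:
+
+ >>> from stwcs.wcsutil import headerlet
+ >>> headerlet.restore_from_headerlet('j94f05bgq_flt.fits',
+ ... hdrname='VER1_FIT', archive=True)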
+ """
+
+ hdrlet_ind = find_headerlet_HDUs(filename, hdrext=hdrext, hdrname=hdrname)
+
+ fobj, fname, close_fobj = parse_filename(filename, mode='update')
+
+ if len(hdrlet_ind) > 1:
+ if hdrext:
+ kwerr = 'hdrext'
+ kwval = hdrext
+ else:
+ kwerr = 'hdrname'
+ kwval = hdrname
+ message = """
+ Multiple Headerlet extensions found with the same name.
+ %d Headerlets with "%s" = %s found in %s.
+ """% (len(hdrlet_ind), kwerr, kwval, fname)
+ if close_fobj:
+ fobj.close()
+ logger.critical(message)
+ raise ValueError
+
+ hdrlet_indx = hdrlet_ind[0]
+
+ # read headerlet from HeaderletHDU into memory
+ if hasattr(fobj[hdrlet_ind[0]], 'hdulist'):
+ hdrlet = fobj[hdrlet_indx].hdulist
+ else:
+ hdrlet = fobj[hdrlet_indx].headerlet # older convention in PyFITS
+
+ # read in the names of the extensions which HeaderletHDU updates
+ extlist = []
+ for ext in hdrlet:
+ if 'extname' in ext.header and ext.header['extname'] == 'SIPWCS':
+ # convert from string to tuple or int
+ sciext = eval(ext.header['sciext'])
+ extlist.append(fobj[sciext])
+ # determine whether distortion is the same
+ current_distname = hdrlet[0].header['distname']
+ same_dist = True
+ if current_distname != fobj[0].header['distname']:
+ same_dist = False
+ if not archive and not force:
+ if close_fobj:
+ fobj.close()
+ message = """
+ Headerlet does not have the same distortion as image!
+ Set "archive"=True to save old distortion model, or
+ set "force"=True to overwrite old model with new.
+ """
+ logger.critical(message)
+ raise ValueError
+
+ # check whether primary WCS has been archived already
+ # Use information from first 'SCI' extension
+ priwcs_name = None
+
+ scihdr = extlist[0].header
+ sci_wcsnames = altwcs.wcsnames(scihdr).values()
+ if 'hdrname' in scihdr:
+ priwcs_hdrname = scihdr['hdrname']
+ else:
+ if 'wcsname' in scihdr:
+ priwcs_hdrname = priwcs_name = scihdr['wcsname']
+ else:
+ if 'idctab' in scihdr:
+ priwcs_hdrname = ''.join(['IDC_',
+ utils.extract_rootname(scihdr['idctab'], suffix='_idc')])
+ else:
+ priwcs_hdrname = 'UNKNOWN'
+ priwcs_name = priwcs_hdrname
+ scihdr['WCSNAME'] = priwcs_name
+
+ priwcs_unique = verify_hdrname_is_unique(fobj, priwcs_hdrname)
+ if archive and priwcs_unique:
+ newhdrlet = create_headerlet(fobj, sciext=scihdr['extname'],
+ hdrname=priwcs_hdrname)
+ newhdrlet.attach_to_file(fobj)
+ #
+ # copy hdrlet as a primary
+ #
+ hdrlet.apply_as_primary(fobj, attach=False, archive=archive, force=force)
+
+ utils.updateNEXTENDKw(fobj)
+ fobj.flush()
+ if close_fobj:
+ fobj.close()
+
+
+@with_logging
+def restore_all_with_distname(filename, distname, primary, archive=True,
+ sciext='SCI', logging=False, logmode='w'):
+ """
+ Restores all HeaderletHDUs with a given distortion model as alternate WCSs and a primary
+
+ Parameters
+ ----------
+ filename: string or HDUList
+ Either a filename or PyFITS HDUList object for the input science file
+ An input filename (str) will be expanded as necessary to interpret
+ any environmental variables included in the filename.
+ distname: string
+ distortion model as represented by a DISTNAME keyword
+ primary: int or string or None
+ HeaderletHDU to be restored as primary
+ if int - a fits extension
+ if string - HDRNAME
+ if None - use first HeaderletHDU
+ archive: boolean (default True)
+ flag indicating if HeaderletHDUs should be created from the
+ primary and alternate WCSs in fname before restoring all matching
+ headerlet extensions
+ logging: boolean
+ enable file logging
+ logmode: 'a' or 'w'
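+
+ Examples
+ --------
+ Illustrative call; the DISTNAME value is a hypothetical placeholder
+ of the usual '<sipname>-<npolname>-<d2imname>' form, and primary=None
+ selects the first matching HeaderletHDU:
+
+ >>> from stwcs.wcsutil import headerlet
+ >>> headerlet.restore_all_with_distname('j94f05bgq_flt.fits',
+ ... 'j94f05bgq_v971826aj-NOMODEL-NOMODEL', None)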
+ """
+
+ fobj, fname, close_fobj = parse_filename(filename, mode='update')
+
+ hdrlet_ind = find_headerlet_HDUs(fobj, distname=distname)
+ if len(hdrlet_ind) == 0:
+ message = """
+ No Headerlet extensions found with
+
+ DISTNAME = %s in %s.
+
+ For a full list of DISTNAMEs found in all headerlet extensions:
+
+ get_headerlet_kw_names(fobj, kw='DISTNAME')
+ """ % (distname, fname)
+ if close_fobj:
+ fobj.close()
+ logger.critical(message)
+ raise ValueError
+
+ # Interpret 'primary' parameter input into extension number
+ if primary is None:
+ primary_ind = hdrlet_ind[0]
+ elif isinstance(primary, int):
+ primary_ind = primary
+ else:
+ primary_ind = None
+ for ind in hdrlet_ind:
+ if fobj[ind].header['hdrname'] == primary:
+ primary_ind = ind
+ break
+ if primary_ind is None:
+ if close_fobj:
+ fobj.close()
+ message = """
+ No Headerlet extension found with HDRNAME or extension number = %s in %s.
+ """ % (primary, fname)
+ logger.critical(message)
+ raise ValueError
+ # Check to see whether 'primary' HeaderletHDU has same distname as user
+ # specified on input
+
+ # read headerlet from HeaderletHDU into memory
+ if hasattr(fobj[primary_ind], 'hdulist'):
+ primary_hdrlet = fobj[primary_ind].hdulist
+ else:
+ primary_hdrlet = fobj[primary_ind].headerlet # older convention in PyFITS
+ pri_distname = primary_hdrlet[0].header['distname']
+ if pri_distname != distname:
+ if close_fobj:
+ fobj.close()
+ message = """
+ Headerlet extension to be used as PRIMARY WCS
+ has "DISTNAME" = %s
+ "DISTNAME" = %s was specified on input.
+ All updated WCSs must have the same DISTNAME. Quitting...
+ """ % (pri_distname, distname)
+ logger.critical(message)
+ raise ValueError
+
+ # read in the names of the WCSs which the HeaderletHDUs will update
+ wnames = altwcs.wcsnames(fobj[sciext, 1].header)
+
+ # work out how many HeaderletHDUs will be used to update the WCSs
+ numhlt = len(hdrlet_ind)
+ hdrnames = get_headerlet_kw_names(fobj, kw='wcsname')
+
+ # read in headerletHDUs and update WCS keywords
+ for hlet in hdrlet_ind:
+ if fobj[hlet].header['distname'] == distname:
+ if hasattr(fobj[hlet], 'hdulist'):
+ hdrlet = fobj[hlet].hdulist
+ else:
+ hdrlet = fobj[hlet].headerlet # older convention in PyFITS
+ if hlet == primary_ind:
+ hdrlet.apply_as_primary(fobj, attach=False,
+ archive=archive, force=True)
+ else:
+ hdrlet.apply_as_alternate(fobj, attach=False,
+ wcsname=hdrlet[0].header['wcsname'])
+
+ utils.updateNEXTENDKw(fobj)
+ fobj.flush()
+ if close_fobj:
+ fobj.close()
+
+
+@with_logging
+def archive_as_headerlet(filename, hdrname, sciext='SCI',
+ wcsname=None, wcskey=None, destim=None,
+ sipname=None, npolfile=None, d2imfile=None,
+ author=None, descrip=None, history=None,
+ nmatch=None, catalog=None,
+ logging=False, logmode='w'):
+ """
+ Save a WCS as a headerlet extension and write it out to a file.
+
+ This function will create a headerlet, attach it as an extension to the
+ science image (if it has not already been archived) then, optionally,
+ write out the headerlet to a separate headerlet file.
+
+ Either wcsname or wcskey must be provided, if both are given, they must match a valid WCS
+ Updates wcscorr if necessary.
+
+ Parameters
+ ----------
+ filename: string or HDUList
+ Either a filename or PyFITS HDUList object for the input science file
+ An input filename (str) will be expanded as necessary to interpret
+ any environmental variables included in the filename.
+ hdrname: string
+ Unique name for this headerlet, stored as HDRNAME keyword
+ sciext: string
+ name (EXTNAME) of extension that contains WCS to be saved
+ wcsname: string
+ name of the WCS to be archived; if " ", processing stops
+ wcskey: one of A...Z or " " or "PRIMARY"
+ if " " or "PRIMARY" - archive the primary WCS
+ destim: string
+ DESTIM keyword
+ if None, use ROOTNAME or science file name
+ sipname: string or None (default)
+ Name of unique file where the polynomial distortion coefficients were
+ read from. If None, the behavior is:
+ The code looks for a keyword 'SIPNAME' in the science header
+ If not found, for HST it defaults to 'IDCTAB'
+ If there is no SIP model the value is 'NOMODEL'
+ If there is a SIP model but no SIPNAME, it is set to 'UNKNOWN'
+ npolfile: string or None (default)
+ Name of a unique file where the non-polynomial distortion was stored.
+ If None:
+ The code looks for 'NPOLFILE' in science header.
+ If 'NPOLFILE' was not found and there is no npol model, it is set to 'NOMODEL'
+ If npol model exists, it is set to 'UNKNOWN'
+ d2imfile: string
+ Name of a unique file where the detector to image correction was
+ stored. If None:
+ The code looks for 'D2IMFILE' in the science header.
+ If 'D2IMFILE' is not found and there is no d2im correction,
+ it is set to 'NOMODEL'
+ If d2im correction exists, but 'D2IMFILE' is missing from science
+ header, it is set to 'UNKNOWN'
+ author: string
+ Name of user who created the headerlet, added as 'AUTHOR' keyword
+ to headerlet PRIMARY header
+ descrip: string
+ Short description of the solution provided by the headerlet
+ This description will be added as the single 'DESCRIP' keyword
+ to the headerlet PRIMARY header
+ history: filename, string or list of strings
+ Long (possibly multi-line) description of the solution provided
+ by the headerlet. These comments will be added as 'HISTORY' cards
+ to the headerlet PRIMARY header
+ If filename is specified, it will format and attach all text from
+ that file as the history.
+ logging: boolean
+ enable file logging
+ logmode: 'w' or 'a'
+ log file open mode
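+
+ Examples
+ --------
+ Illustrative call with hypothetical names, archiving the current
+ primary WCS as a headerlet extension:
+
+ >>> from stwcs.wcsutil import headerlet
+ >>> headerlet.archive_as_headerlet('j94f05bgq_flt.fits', 'VER1_FIT',
+ ... wcskey='PRIMARY')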
+ """
+
+ fobj, fname, close_fobj = parse_filename(filename, mode='update')
+
+ if wcsname in [None, ' ', '', 'INDEF'] and wcskey is None:
+ message = """
+ No valid WCS found in %s.
+ A valid value for either "wcsname" or "wcskey"
+ needs to be specified.
+ """ % fname
+ if close_fobj:
+ fobj.close()
+ logger.critical(message)
+ raise ValueError
+
+ # Translate 'wcskey' value for PRIMARY WCS to valid altwcs value of ' '
+ if wcskey == 'PRIMARY':
+ wcskey = ' '
+ wcskey = wcskey.upper()
+
+ numhlt = countExtn(fobj, 'HDRLET')
+
+ if wcsname is None:
+ scihdr = fobj[sciext, 1].header
+ wcsname = scihdr['wcsname'+wcskey]
+
+ if hdrname in [None, ' ', '']:
+ hdrname = wcsname
+
+ # Check to see whether or not a HeaderletHDU with this hdrname already
+ # exists
+ hdrnames = get_headerlet_kw_names(fobj)
+ if hdrname not in hdrnames:
+ hdrletobj = create_headerlet(fobj, sciext=sciext,
+ wcsname=wcsname, wcskey=wcskey,
+ hdrname=hdrname,
+ sipname=sipname, npolfile=npolfile,
+ d2imfile=d2imfile, author=author,
+ descrip=descrip, history=history,
+ nmatch=nmatch, catalog=catalog,
+ logging=False)
+ hlt_hdu = HeaderletHDU.fromheaderlet(hdrletobj)
+
+ if destim is not None:
+ hlt_hdu[0].header['destim'] = destim
+
+ fobj.append(hlt_hdu)
+
+ utils.updateNEXTENDKw(fobj)
+ fobj.flush()
+ else:
+ message = """
+ Headerlet with hdrname %s already archived for WCS %s
+ No new headerlet appended to %s .
+ """ % (hdrname, wcsname, fname)
+ logger.critical(message)
+
+ if close_fobj:
+ fobj.close()
+
+#### Headerlet Class definitions
+class Headerlet(fits.HDUList):
+ """
+ A Headerlet class
+ Ref: http://mediawiki.stsci.edu/mediawiki/index.php/Telescopedia:Headerlets
+ """
+
+ def __init__(self, hdus=[], file=None, logging=False, logmode='w'):
+ """
+ Parameters
+ ----------
+ hdus : list
+ List of HDUs to be used to create the headerlet object itself
+ file: string
+ File-like object from which HDUs should be read
+ logging: boolean
+ enable file logging
+ logmode: 'w' or 'a'
+ for internal use only, indicates whether the log file
+ should be opened in append or write mode
+ """
+ self.logging = logging
+ init_logging('class Headerlet', level=logging, mode=logmode)
+
+ super(Headerlet, self).__init__(hdus, file=file)
+
+ def init_attrs(self):
+ self.fname = self.filename()
+ self.hdrname = self[0].header["HDRNAME"]
+ self.wcsname = self[0].header["WCSNAME"]
+ self.upwcsver = self[0].header.get("UPWCSVER", "")
+ self.pywcsver = self[0].header.get("PYWCSVER", "")
+ self.destim = self[0].header["DESTIM"]
+ self.sipname = self[0].header["SIPNAME"]
+ self.idctab = self[0].header["IDCTAB"]
+ self.npolfile = self[0].header["NPOLFILE"]
+ self.d2imfile = self[0].header["D2IMFILE"]
+ self.distname = self[0].header["DISTNAME"]
+
+ try:
+ self.vafactor = self[("SIPWCS", 1)].header.get("VAFACTOR", 1) #None instead of 1?
+ except (IndexError, KeyError):
+ self.vafactor = self[0].header.get("VAFACTOR", 1) #None instead of 1?
+ self.author = self[0].header["AUTHOR"]
+ self.descrip = self[0].header["DESCRIP"]
+
+ self.fit_kws = ['HDRNAME', 'NMATCH', 'CATALOG']
+ self.history = ''
+ # header['HISTORY'] returns an iterable of all HISTORY values
+ if 'HISTORY' in self[0].header:
+ for hist in self[0].header['HISTORY']:
+ self.history += hist + '\n'
+
+ self.d2imerr = 0
+ self.axiscorr = 1
+
+ # Overridden to support the Headerlet logging features
+ @classmethod
+ def fromfile(cls, fileobj, mode='readonly', memmap=False,
+ save_backup=False, logging=False, logmode='w', **kwargs):
+ hlet = super(cls, cls).fromfile(fileobj, mode, memmap, save_backup,
+ **kwargs)
+ if len(hlet) > 0:
+ hlet.init_attrs()
+ hlet.logging = logging
+ init_logging('class Headerlet', level=logging, mode=logmode)
+ return hlet
+
+ @classmethod
+ def fromstring(cls, data, logging=False, logmode='w', **kwargs):
+ hlet = super(cls, cls).fromstring(data, **kwargs)
+ hlet.logging = logging
+ init_logging('class Headerlet', level=logging, mode=logmode)
+ return hlet
+
+ def apply_as_primary(self, fobj, attach=True, archive=True, force=False):
+ """
+ Copy this headerlet as a primary WCS to fobj
+
+ Parameters
+ ----------
+ fobj: string, HDUList
+ science file to which the headerlet should be applied
+ attach: boolean
+ flag indicating if the headerlet should be attached as a
+ HeaderletHDU to fobj. If True checks that HDRNAME is unique
+ in the fobj and stops if not.
+ archive: boolean (default is True)
+ When the distortion model in the headerlet is the same as the
+ distortion model of the science file, this flag indicates if
+ the primary WCS should be saved as an alternate and a headerlet
+ extension.
+ When the distortion models do not match, this flag indicates if
+ the current primary and alternate WCSs should be archived as
+ headerlet extensions and alternate WCSs.
+ force: boolean (default is False)
+ When the distortion models of the headerlet and the primary do
+ not match, and archive is False, this flag forces an update
+ of the primary.
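+
+ Examples
+ --------
+ Illustrative use with a hypothetical headerlet file:
+
+ >>> from stwcs.wcsutil.headerlet import Headerlet
+ >>> hlet = Headerlet.fromfile('j94f05bgq_hlet.fits')
+ >>> hlet.apply_as_primary('j94f05bgq_flt.fits', attach=True,
+ ... archive=True)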
+ """
+ self.hverify()
+ fobj, fname, close_dest = parse_filename(fobj, mode='update')
+ if not self.verify_dest(fobj, fname):
+ if close_dest:
+ fobj.close()
+ raise ValueError("Destination name does not match headerlet"
+ "Observation %s cannot be updated with headerlet %s" % (fname, self.hdrname))
+
+ # Check to see whether the distortion model in the destination
+ # matches the distortion model in the headerlet being applied
+
+ dname = self.get_destination_model(fobj)
+ dist_models_equal = self.equal_distmodel(dname)
+ if not dist_models_equal and not force:
+ raise ValueError("Distortion models do not match"
+ " To overwrite the distortion model, set force=True")
+
+ orig_hlt_hdu = None
+ numhlt = countExtn(fobj, 'HDRLET')
+ hdrlet_extnames = get_headerlet_kw_names(fobj)
+
+ # Ensure that WCSCORR table has been created with all original
+ # WCS's recorded prior to adding the headerlet WCS
+ wcscorr.init_wcscorr(fobj)
+
+
+ ### start archive
+ # If archive has been specified
+ # regardless of whether or not the distortion models are equal...
+
+ numsip = countExtn(self, 'SIPWCS')
+ sciext_list = []
+ alt_hlethdu = []
+ for i in range(1, numsip+1):
+ sipheader = self[('SIPWCS', i)].header
+ sciext_list.append((sipheader['TG_ENAME'], sipheader['TG_EVER']))
+ target_ext = sciext_list[0]
+ if archive:
+ if 'wcsname' in fobj[target_ext].header:
+ hdrname = fobj[target_ext].header['WCSNAME']
+ wcsname = hdrname
+ else:
+ hdrname = fobj[0].header['ROOTNAME'] + '_orig'
+ wcsname = None
+ if hdrname not in hdrlet_extnames:
+ # - if WCS has not been saved, write out WCS as headerlet extension
+ # Create a headerlet for the original Primary WCS data in the file,
+ # create an HDU from the original headerlet, and append it to
+ # the file
+ orig_hlt = create_headerlet(fobj, sciext=sciext_list, #[target_ext],
+ wcsname=wcsname,
+ hdrname=hdrname,
+ logging=self.logging)
+ orig_hlt_hdu = HeaderletHDU.fromheaderlet(orig_hlt)
+ numhlt += 1
+ orig_hlt_hdu.header['EXTVER'] = numhlt
+ logger.info("Created headerlet %s to be attached to file" % hdrname)
+ else:
+ logger.info("Headerlet with name %s is already attached" % hdrname)
+
+
+ if dist_models_equal:
+ # Use the WCSNAME to determine whether or not to archive
+ # Primary WCS as altwcs
+ # wcsname = hwcs.wcs.name
+ scihdr = fobj[target_ext].header
+ if 'hdrname' in scihdr:
+ priwcs_name = scihdr['hdrname']
+ else:
+ if 'wcsname' in scihdr:
+ priwcs_name = scihdr['wcsname']
+ else:
+ if 'idctab' in scihdr:
+ priwcs_name = ''.join(['IDC_',
+ utils.extract_rootname(scihdr['idctab'],
+ suffix='_idc')])
+ else:
+ priwcs_name = 'UNKNOWN'
+ nextkey = altwcs.next_wcskey(fobj, ext=target_ext)
+ altwcs.archiveWCS(fobj, ext=sciext_list, wcskey=nextkey,
+ wcsname=priwcs_name)
+ else:
+
+ for hname in altwcs.wcsnames(fobj, ext=target_ext).values():
+ if hname != 'OPUS' and hname not in hdrlet_extnames:
+ # get HeaderletHDU for alternate WCS as well
+ alt_hlet = create_headerlet(fobj, sciext=sciext_list,
+ wcsname=hname, wcskey=wcskey,
+ hdrname=hname, sipname=None,
+ npolfile=None, d2imfile=None,
+ author=None, descrip=None, history=None,
+ logging=self.logging)
+ numhlt += 1
+ alt_hlet_hdu = HeaderletHDU.fromheaderlet(alt_hlet)
+ alt_hlet_hdu.header['EXTVER'] = numhlt
+ alt_hlethdu.append(alt_hlet_hdu)
+ hdrlet_extnames.append(hname)
+
+ self._del_dest_WCS_ext(fobj)
+ for i in range(1, numsip+1):
+ target_ext = sciext_list[i-1]
+ self._del_dest_WCS(fobj, target_ext)
+ sipwcs = HSTWCS(self, ('SIPWCS', i))
+ idckw = sipwcs._idc2hdr()
+ priwcs = sipwcs.to_fits(relax=True)
+ numnpol = 1
+ numd2im = 1
+ if sipwcs.wcs.has_cd():
+ priwcs[0].header = altwcs.pc2cd(priwcs[0].header)
+ priwcs[0].header.extend(idckw)
+ sipheader = self[('SIPWCS', i)].header
+ if 'crder1' in sipheader:
+ for card in sipheader['crder*'].cards:
+ priwcs[0].header.set(card.keyword, card.value, card.comment,
+ after='WCSNAME')
+ # Update WCS with HDRNAME as well
+
+ for kw in ['SIMPLE', 'BITPIX', 'NAXIS', 'EXTEND']:
+ try:
+ priwcs[0].header.remove(kw)
+ except ValueError:
+ pass
+
+ priwcs[0].header.set('WCSNAME', self[0].header['WCSNAME'], "")
+ priwcs[0].header.set('WCSAXES', self[('SIPWCS', i)].header['WCSAXES'], "")
+ priwcs[0].header.set('HDRNAME', self[0].header['HDRNAME'], "")
+ if sipwcs.det2im1 or sipwcs.det2im2:
+ try:
+ d2imerr = self[('SIPWCS', i)].header['D2IMERR*']
+ priwcs[0].header.extend(d2imerr)
+ except KeyError:
+ pass
+ try:
+ priwcs[0].header.append(self[('SIPWCS', i)].header.cards['D2IMEXT'])
+ except KeyError:
+ pass
+ if 'D2IM1.EXTVER' in priwcs[0].header:
+ priwcs[0].header['D2IM1.EXTVER'] = self[('SIPWCS', i)].header['D2IM1.EXTVER']
+ priwcs[('D2IMARR', 1)].header['EXTVER'] = self[('SIPWCS', i)].header['D2IM1.EXTVER']
+ if 'D2IM2.EXTVER' in priwcs[0].header:
+ priwcs[0].header['D2IM2.EXTVER'] = self[('SIPWCS', i)].header['D2IM2.EXTVER']
+ priwcs[('D2IMARR', 2)].header['EXTVER'] = self[('SIPWCS', i)].header['D2IM2.EXTVER']
+ # D2IM1 will NOT exist for WFPC2 data...
+ if 'D2IM1.EXTVER' in priwcs[0].header:
+ # only set number of D2IM extensions to 2 if D2IM1 exists
+ numd2im = 2
+
+ if sipwcs.cpdis1 or sipwcs.cpdis2:
+ try:
+ cperr = self[('SIPWCS', i)].header['CPERR*']
+ priwcs[0].header.extend(cperr)
+ except KeyError:
+ pass
+ try:
+ priwcs[0].header.append(self[('SIPWCS', i)].header.cards['NPOLEXT'])
+ except KeyError:
+ pass
+ if 'DP1.EXTVER' in priwcs[0].header:
+ priwcs[0].header['DP1.EXTVER'] = self[('SIPWCS', i)].header['DP1.EXTVER']
+ priwcs[('WCSDVARR', 1)].header['EXTVER'] = self[('SIPWCS', i)].header['DP1.EXTVER']
+ if 'DP2.EXTVER' in priwcs[0].header:
+ priwcs[0].header['DP2.EXTVER'] = self[('SIPWCS', i)].header['DP2.EXTVER']
+ priwcs[('WCSDVARR', 2)].header['EXTVER'] = self[('SIPWCS', i)].header['DP2.EXTVER']
+ numnpol = 2
+
+ fobj[target_ext].header.extend(priwcs[0].header)
+ if sipwcs.cpdis1:
+ whdu = priwcs[('WCSDVARR', (i-1)*numnpol+1)].copy()
+ whdu.update_ext_version(self[('SIPWCS', i)].header['DP1.EXTVER'])
+ fobj.append(whdu)
+ if sipwcs.cpdis2:
+ whdu = priwcs[('WCSDVARR', i*numnpol)].copy()
+ whdu.update_ext_version(self[('SIPWCS', i)].header['DP2.EXTVER'])
+ fobj.append(whdu)
+ if sipwcs.det2im1: #or sipwcs.det2im2:
+ whdu = priwcs[('D2IMARR', (i-1)*numd2im+1)].copy()
+ whdu.update_ext_version(self[('SIPWCS', i)].header['D2IM1.EXTVER'])
+ fobj.append(whdu)
+ if sipwcs.det2im2:
+ whdu = priwcs[('D2IMARR', i*numd2im)].copy()
+ whdu.update_ext_version(self[('SIPWCS', i)].header['D2IM2.EXTVER'])
+ fobj.append(whdu)
+
+ update_versions(self[0].header, fobj[0].header)
+ refs = update_ref_files(self[0].header, fobj[0].header)
+ # Update the WCSCORR table with new rows from the headerlet's WCSs
+ wcscorr.update_wcscorr(fobj, self, 'SIPWCS')
+
+ # Append the original headerlet
+ if archive and orig_hlt_hdu:
+ fobj.append(orig_hlt_hdu)
+ # Append any alternate WCS Headerlets
+ if len(alt_hlethdu) > 0:
+ for ahdu in alt_hlethdu:
+ fobj.append(ahdu)
+ if attach:
+ # Finally, append an HDU for this headerlet
+ self.attach_to_file(fobj)
+ utils.updateNEXTENDKw(fobj)
+ if close_dest:
+ fobj.close()
+
+
+ def apply_as_alternate(self, fobj, attach=True, wcskey=None, wcsname=None):
+ """
+ Copy this headerlet as an alternate WCS to fobj
+
+ Parameters
+ ----------
+ fobj: string, HDUList
+ science file/HDUList to which the headerlet should be applied
+ attach: boolean
+ flag indicating if the headerlet should be attached as a
+ HeaderletHDU to fobj. If True checks that HDRNAME is unique
+ in the fobj and stops if not.
+ wcskey: string
+ Key value (A-Z, except O) for this alternate WCS
+ If None, the next available key will be used
+ wcsname: string
+ Name to be assigned to this alternate WCS
+ WCSNAME is a required keyword in a Headerlet but this allows the
+ user to change it as desired.
+
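+ Examples
+ --------
+ Illustrative use with a hypothetical headerlet file:
+
+ >>> from stwcs.wcsutil.headerlet import Headerlet
+ >>> hlet = Headerlet.fromfile('j94f05bgq_hlet.fits')
+ >>> hlet.apply_as_alternate('j94f05bgq_flt.fits', wcskey='K',
+ ... wcsname='TWEAK_1')
+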
+ """
+ self.hverify()
+ fobj, fname, close_dest = parse_filename(fobj, mode='update')
+ if not self.verify_dest(fobj, fname):
+ if close_dest:
+ fobj.close()
+ raise ValueError("Destination name does not match headerlet"
+ "Observation %s cannot be updated with headerlet %s" % (fname, self.hdrname))
+
+ # Verify whether this headerlet has the same distortion
+ #found in the image being updated
+ dname = self.get_destination_model(fobj)
+ dist_models_equal = self.equal_distmodel(dname)
+ if not dist_models_equal:
+ raise ValueError("Distortion models do not match \n"
+ "Headerlet: %s \n"
+ "Destination file: %s\n"
+ "attach_to_file() can be used to append this headerlet" %(self.distname, dname))
+
+ # Ensure that WCSCORR table has been created with all original
+ # WCS's recorded prior to adding the headerlet WCS
+ wcscorr.init_wcscorr(fobj)
+
+ # determine value of WCSNAME to be used
+ if wcsname is not None:
+ wname = wcsname
+ else:
+ wname = self[0].header['WCSNAME']
+ tg_ename = self[('SIPWCS', 1)].header['TG_ENAME']
+ tg_ever = self[('SIPWCS', 1)].header['TG_EVER']
+ # determine what alternate WCS this headerlet will be assigned to
+ if wcskey is None:
+ wkey = altwcs.next_wcskey(fobj[(tg_ename, tg_ever)].header)
+ else:
+ wcskey = wcskey.upper()
+ available_keys = altwcs.available_wcskeys(fobj[(tg_ename, tg_ever)].header)
+ if wcskey in available_keys:
+ wkey = wcskey
+ else:
+ mess = "Observation %s already contains alternate WCS with key %s" % (fname, wcskey)
+ logger.critical(mess)
+ if close_dest:
+ fobj.close()
+ raise ValueError(mess)
+ numsip = countExtn(self, 'SIPWCS')
+ for idx in range(1, numsip + 1):
+ siphdr = self[('SIPWCS', idx)].header
+ tg_ext = (siphdr['TG_ENAME'], siphdr['TG_EVER'])
+
+ fhdr = fobj[tg_ext].header
+ hwcs = pywcs.WCS(siphdr, self)
+ hwcs_header = hwcs.to_header(key=wkey)
+ _idc2hdr(siphdr, fhdr, towkey=wkey)
+ if hwcs.wcs.has_cd():
+ hwcs_header = altwcs.pc2cd(hwcs_header, key=wkey)
+
+ fhdr.extend(hwcs_header)
+ fhdr['WCSNAME' + wkey] = wname
+ # also update with HDRNAME (a non-WCS-standard kw)
+ for kw in self.fit_kws:
+ #fhdr.insert(wind, pyfits.Card(kw + wkey,
+ # self[0].header[kw]))
+ fhdr.append(fits.Card(kw + wkey, self[0].header[kw]))
+ # Update the WCSCORR table with new rows from the headerlet's WCSs
+ wcscorr.update_wcscorr(fobj, self, 'SIPWCS')
+
+ if attach:
+ self.attach_to_file(fobj)
+ utils.updateNEXTENDKw(fobj)
+
+ if close_dest:
+ fobj.close()
+
+ def attach_to_file(self, fobj, archive=False):
+ """
+ Attach Headerlet as an HeaderletHDU to a science file
+
+ Parameters
+ ----------
+ fobj: string, HDUList
+ science file/HDUList to which the headerlet should be applied
+ archive: boolean
+ Specifies whether or not to update WCSCORR table when attaching
+
+ Notes
+ -----
+ The algorithm used by this method:
+ - verify headerlet can be applied to this file (based on DESTIM)
+ - verify that HDRNAME is unique for this file
+ - attach as HeaderletHDU to fobj
+
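+ Examples
+ --------
+ Illustrative use with a hypothetical headerlet file:
+
+ >>> from stwcs.wcsutil.headerlet import Headerlet
+ >>> hlet = Headerlet.fromfile('j94f05bgq_hlet.fits')
+ >>> hlet.attach_to_file('j94f05bgq_flt.fits')
+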
+ """
+ self.hverify()
+ fobj, fname, close_dest = parse_filename(fobj, mode='update')
+ destver = self.verify_dest(fobj, fname)
+ hdrver = self.verify_hdrname(fobj)
+ if destver and hdrver:
+
+ numhlt = countExtn(fobj, 'HDRLET')
+ new_hlt = HeaderletHDU.fromheaderlet(self)
+ new_hlt.header['extver'] = numhlt + 1
+ fobj.append(new_hlt)
+ utils.updateNEXTENDKw(fobj)
+ else:
+ message = "Headerlet %s cannot be attached to" % (self.hdrname)
+ message += "observation %s" % (fname)
+ if not destver:
+ message += " * Image %s keyword ROOTNAME not equal to " % (fname)
+ message += " DESTIM = '%s'\n" % (self.destim)
+ if not hdrver:
+ message += " * Image %s already has headerlet " % (fname)
+ message += "with HDRNAME='%s'\n" % (self.hdrname)
+ logger.critical(message)
+ if close_dest:
+ fobj.close()
+
+ def info(self, columns=None, pad=2, maxwidth=None,
+ output=None, clobber=True, quiet=False):
+ """
+ Prints a summary of this headerlet
+ The summary includes:
+ HDRNAME WCSNAME DISTNAME SIPNAME NPOLFILE D2IMFILE
+
+ Parameters
+ ----------
+ columns: list
+ List of headerlet PRIMARY header keywords to report in summary
+ By default (set to None), it will use the default set of keywords
+ defined as the global list DEFAULT_SUMMARY_COLS
+ pad: int
+ Number of padding spaces to put between printed columns
+ [Default: 2]
+ maxwidth: int
+ Maximum column width(not counting padding) for any column in summary
+ By default (set to None), each column's full width will be used
+ output: string (optional)
+ Name of optional output file to record summary. This filename
+ can contain environment variables.
+ [Default: None]
+ clobber: bool
+ If True, will overwrite any previous output file of same name
+ quiet: bool
+ If True, will NOT report info to STDOUT
+
+ """
+ summary_cols, summary_dict = self.summary(columns=columns)
+ print_summary(summary_cols, summary_dict, pad=pad, maxwidth=maxwidth,
+ idcol=None, output=output, clobber=clobber, quiet=quiet)
+
+ def summary(self, columns=None):
+ """
+ Returns a summary of this headerlet as a dictionary
+
+ The summary includes a summary of the distortion model as :
+ HDRNAME WCSNAME DISTNAME SIPNAME NPOLFILE D2IMFILE
+
+ Parameters
+ ----------
+ columns: list
+ List of headerlet PRIMARY header keywords to report in summary
+ By default(set to None), it will use the default set of keywords
+ defined as the global list DEFAULT_SUMMARY_COLS
+
+ Returns
+ -------
+ summary: dict
+ Dictionary of values for summary
+ """
+ if columns is None:
+ summary_cols = DEFAULT_SUMMARY_COLS
+ else:
+ summary_cols = columns
+
+ # Initialize summary dict based on requested columns
+ summary = {}
+ for kw in summary_cols:
+ summary[kw] = copy.deepcopy(COLUMN_DICT)
+
+ # Populate the summary with headerlet values
+ for kw in summary_cols:
+ if kw in self[0].header:
+ val = self[0].header[kw]
+ else:
+ val = 'INDEF'
+ summary[kw]['vals'].append(val)
+ summary[kw]['width'].append(max(len(str(val)), len(kw)))
+
+ return summary_cols, summary
+
+ def hverify(self):
+ """
+ Verify the headerlet file is a valid fits file and has
+ the required Primary Header keywords
+ """
+ self.verify()
+ header = self[0].header
+ assert('DESTIM' in header and header['DESTIM'].strip())
+ assert('HDRNAME' in header and header['HDRNAME'].strip())
+ assert('UPWCSVER' in header)
+
+ def verify_hdrname(self, dest):
+ """
+ Verifies that the headerlet can be applied to the observation
+
+ Reports whether or not this file already has a headerlet with this
+ HDRNAME.
+ """
+ unique = verify_hdrname_is_unique(dest, self.hdrname)
+ logger.debug("verify_hdrname() returned %s"%unique)
+ return unique
+
+ def get_destination_model(self, dest):
+ """
+ Verifies that the headerlet can be applied to the observation
+
+ Determines whether or not the file specifies the same distortion
+ model/reference files.
+ """
+ destim_opened = False
+ if not isinstance(dest, fits.HDUList):
+ destim = fits.open(dest)
+ destim_opened = True
+ else:
+ destim = dest
+ dname = destim[0].header['DISTNAME'] if 'distname' in destim[0].header \
+ else self.build_distname(dest)
+ if destim_opened:
+ destim.close()
+ return dname
+
+ def equal_distmodel(self, dmodel):
+ if dmodel != self[0].header['DISTNAME']:
+ if self.logging:
+ message = """
+ Distortion model in headerlet not the same as destination model
+ Headerlet model : %s
+ Destination model: %s
+ """ % (self[0].header['DISTNAME'], dmodel)
+ logger.critical(message)
+ return False
+ else:
+ return True
+
+ def verify_dest(self, dest, fname):
+ """
+ verifies that the headerlet can be applied to the observation
+
+ DESTIM in the primary header of the headerlet must match ROOTNAME
+ of the science file (or the name of the destination file)
+ """
+ try:
+ if not isinstance(dest, fits.HDUList):
+ droot = fits.getval(dest, 'ROOTNAME')
+ else:
+ droot = dest[0].header['ROOTNAME']
+ except KeyError:
+ logger.debug("Keyword 'ROOTNAME' not found in destination file")
+ droot = dest.split('.fits')[0]
+ if droot == self.destim:
+ logger.debug("verify_destim() returned True")
+ return True
+ else:
+ logger.debug("verify_destim() returned False")
+ logger.critical("Destination name does not match headerlet. "
+ "Observation %s cannot be updated with headerlet %s" % (fname, self.hdrname))
+ return False
+
+ def build_distname(self, dest):
+ """
+ Builds the DISTNAME for dest based on reference file names.
+ """
+
+ try:
+ npolfile = dest[0].header['NPOLFILE']
+ except KeyError:
+ npolfile = None
+ try:
+ d2imfile = dest[0].header['D2IMFILE']
+ except KeyError:
+ d2imfile = None
+
+ sipname, idctab = utils.build_sipname(dest, dest, None)
+ npolname, npolfile = utils.build_npolname(dest, npolfile)
+ d2imname, d2imfile = utils.build_d2imname(dest, d2imfile)
+ dname = utils.build_distname(sipname,npolname,d2imname)
+ return dname
+
+ def tofile(self, fname, destim=None, hdrname=None, clobber=False):
+ """
+ Write this headerlet to a file
+
+ Parameters
+ ----------
+ fname: string
+ file name
+ destim: string (optional)
+ provide a value for DESTIM keyword
+ hdrname: string (optional)
+ provide a value for HDRNAME keyword
+ clobber: boolean
+ a flag which allows overwriting an existing file
+ """
+ if not destim or not hdrname:
+ self.hverify()
+ self.writeto(fname, clobber=clobber)
+
+ def _del_dest_WCS(self, dest, ext=None):
+ """
+ Delete the WCS of a science file extension
+ """
+
+ logger.info("Deleting all WCSs of file %s" % dest.filename())
+ numext = len(dest)
+
+ if ext:
+ fext = dest[ext]
+ self._remove_d2im(fext)
+ self._remove_sip(fext)
+ self._remove_lut(fext)
+ self._remove_primary_WCS(fext)
+ self._remove_idc_coeffs(fext)
+ self._remove_fit_values(fext)
+ else:
+ for idx in range(numext):
+ # Only delete WCS from extensions which may have WCS keywords
+ if ('XTENSION' in dest[idx].header and
+ dest[idx].header['XTENSION'] == 'IMAGE'):
+ self._remove_d2im(dest[idx])
+ self._remove_sip(dest[idx])
+ self._remove_lut(dest[idx])
+ self._remove_primary_WCS(dest[idx])
+ self._remove_idc_coeffs(dest[idx])
+ self._remove_fit_values(dest[idx])
+ self._remove_ref_files(dest[0])
+ """
+ if not ext:
+ self._remove_alt_WCS(dest, ext=range(numext))
+ else:
+ self._remove_alt_WCS(dest, ext=ext)
+ """
+ def _del_dest_WCS_ext(self, dest):
+ numwdvarr = countExtn(dest, 'WCSDVARR')
+ numd2im = countExtn(dest, 'D2IMARR')
+ if numwdvarr > 0:
+ for idx in range(1, numwdvarr + 1):
+ del dest[('WCSDVARR', idx)]
+ if numd2im > 0:
+ for idx in range(1, numd2im + 1):
+ del dest[('D2IMARR', idx)]
+
+ def _remove_ref_files(self, phdu):
+ """
+ phdu: Primary HDU
+ """
+ refkw = ['IDCTAB', 'NPOLFILE', 'D2IMFILE', 'SIPNAME', 'DISTNAME']
+ for kw in refkw:
+ try:
+ del phdu.header[kw]
+ except KeyError:
+ pass
+
+ def _remove_fit_values(self, ext):
+ """
+ Remove any existing astrometric fit values from a FITS extension
+ """
+
+ logger.debug("Removing astrometric fit values from (%s, %s)"%
+ (ext.name, ext.ver))
+ dkeys = altwcs.wcskeys(ext.header)
+ if 'O' in dkeys: dkeys.remove('O') # Do not remove wcskey='O' values
+ for fitkw in ['NMATCH', 'CATALOG']:
+ for k in dkeys:
+ fkw = (fitkw+k).rstrip()
+ if fkw in ext.header:
+ del ext.header[fkw]
+
+ def _remove_sip(self, ext):
+ """
+ Remove the SIP distortion of a FITS extension
+ """
+
+ logger.debug("Removing SIP distortion from (%s, %s)"
+ % (ext.name, ext.ver))
+ for prefix in ['A', 'B', 'AP', 'BP']:
+ try:
+ order = ext.header[prefix + '_ORDER']
+ del ext.header[prefix + '_ORDER']
+ except KeyError:
+ continue
+ for i in range(order + 1):
+ for j in range(order + 1):
+ key = prefix + '_%d_%d' % (i, j)
+ try:
+ del ext.header[key]
+ except KeyError:
+ pass
+ try:
+ del ext.header['IDCTAB']
+ except KeyError:
+ pass
+
+ def _remove_lut(self, ext):
+ """
+ Remove the Lookup Table distortion of a FITS extension
+ """
+
+ logger.debug("Removing LUT distortion from (%s, %s)"
+ % (ext.name, ext.ver))
+ try:
+ cpdis = ext.header['CPDIS*']
+ except KeyError:
+ return
+ try:
+ for c in range(1, len(cpdis) + 1):
+ del ext.header['DP%s*...' % c]
+ del ext.header[cpdis.cards[c - 1].keyword]
+ del ext.header['CPERR*']
+ del ext.header['NPOLFILE']
+ del ext.header['NPOLEXT']
+ except KeyError:
+ pass
+
+ def _remove_d2im(self, ext):
+ """
+ Remove the Detector to Image correction of a FITS extension
+ """
+
+ logger.debug("Removing D2IM correction from (%s, %s)"
+ % (ext.name, ext.ver))
+ try:
+ d2imdis = ext.header['D2IMDIS*']
+ except KeyError:
+ return
+ try:
+ for c in range(1, len(d2imdis) + 1):
+ del ext.header['D2IM%s*...' % c]
+ del ext.header[d2imdis.cards[c - 1].keyword]
+ del ext.header['D2IMERR*']
+ del ext.header['D2IMFILE']
+ del ext.header['D2IMEXT']
+ except KeyError:
+ pass
+
+ def _remove_alt_WCS(self, dest, ext):
+ """
+ Remove Alternate WCSs of a FITS extension.
+ A WCS with wcskey 'O' is never deleted.
+ """
+ dkeys = altwcs.wcskeys(dest[('SCI', 1)].header)
+ for val in ['O', '', ' ']:
+ if val in dkeys:
+ dkeys.remove(val) # Never delete WCS with wcskey='O'
+
+ logger.debug("Removing alternate WCSs with keys %s from %s"
+ % (dkeys, dest.filename()))
+ for k in dkeys:
+ altwcs.deleteWCS(dest, ext=ext, wcskey=k)
+
+ def _remove_primary_WCS(self, ext):
+ """
+ Remove the primary WCS of a FITS extension
+ """
+
+ logger.debug("Removing Primary WCS from (%s, %s)"
+ % (ext.name, ext.ver))
+ naxis = ext.header['NAXIS']
+ for key in basic_wcs:
+ for i in range(1, naxis + 1):
+ try:
+ del ext.header[key + str(i)]
+ except KeyError:
+ pass
+ try:
+ del ext.header['WCSAXES']
+ except KeyError:
+ pass
+ try:
+ del ext.header['WCSNAME']
+ except KeyError:
+ pass
+
+ def _remove_idc_coeffs(self, ext):
+ """
+ Remove IDC coefficients of a FITS extension
+ """
+
+ logger.debug("Removing IDC coefficient from (%s, %s)"
+ % (ext.name, ext.ver))
+ coeffs = ['OCX10', 'OCX11', 'OCY10', 'OCY11', 'IDCSCALE']
+ for k in coeffs:
+ try:
+ del ext.header[k]
+ except KeyError:
+ pass
+
+@with_logging
+def _idc2hdr(fromhdr, tohdr, towkey=' '):
+ """
+ Copy the IDC (HST specific) keywords from one header to another
+
+ """
+ # save some of the idc coefficients
+ coeffs = ['OCX10', 'OCX11', 'OCY10', 'OCY11', 'IDCSCALE']
+ for c in coeffs:
+ try:
+ tohdr[c+towkey] = fromhdr[c]
+ logger.debug("Copied %s to header")
+ except KeyError:
+ continue
+
+
+def get_extname_extver_list(fobj, sciext):
+ """
+ Create a list of (EXTNAME, EXTVER) tuples
+
+ Based on sciext keyword (see docstring for create_headerlet)
+ walk through the file and convert extensions in `sciext` to
+ valid (EXTNAME, EXTVER) tuples.
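+
+ Examples
+ --------
+ Illustrative calls, assuming `fobj` is an HDUList with two SCI
+ extensions (the output shown is for that hypothetical layout):
+
+ >>> get_extname_extver_list(fobj, 'SCI')
+ [('SCI', 1), ('SCI', 2)]
+ >>> get_extname_extver_list(fobj, 0)
+ [('PRIMARY', 1)]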
+ """
+ extlist = []
+ if isinstance(sciext, int):
+ if sciext == 0:
+ extname = 'PRIMARY'
+ extver = 1
+ else:
+ try:
+ extname = fobj[sciext].header['EXTNAME']
+ except KeyError:
+ extname = ""
+ try:
+ extver = fobj[sciext].header['EXTVER']
+ except KeyError:
+ extver = 1
+ extlist.append((extname, extver))
+ elif isinstance(sciext, str):
+ if sciext == 'PRIMARY':
+ extname = "PRIMARY"
+ extver = 1
+ extlist.append((extname, extver))
+ else:
+ for ext in fobj:
+ try:
+ extname = ext.header['EXTNAME']
+ except KeyError:
+ continue
+ if extname.upper() == sciext.upper():
+ try:
+ extver = ext.header['EXTVER']
+ except KeyError:
+ extver = 1
+ extlist.append((extname, extver))
+ elif isinstance(sciext, list):
+ if isinstance(sciext[0], int):
+ for i in sciext:
+ try:
+ extname = fobj[i].header['EXTNAME']
+ except KeyError:
+ if i == 0:
+ extname = "PRIMARY"
+ extver = 1
+ else:
+ extname = ""
+ try:
+ extver = fobj[i].header['EXTVER']
+ except KeyError:
+ extver = 1
+ extlist.append((extname, extver))
+ else:
+ extlist = sciext[:]
+ else:
+ errstr = "Expected sciext to be a list of FITS extensions with science data\n"+\
+ " a valid EXTNAME string, or an integer."
+ logger.critical(errstr)
+ raise ValueError
+ return extlist
+
+
+class HeaderletHDU(fits.hdu.nonstandard.FitsHDU):
+ """
+ A non-standard extension HDU for encapsulating Headerlets in a file. These
+ HDUs have an extension type of HDRLET and their EXTNAME is derived from the
+ Headerlet's HDRNAME.
+
+ The data itself is a FITS file embedded within the HDU data. The file name
+ is derived from the HDRNAME keyword, and should be in the form
+ `<HDRNAME>_hdr.fits`. If the COMPRESS keyword evaluates to `True`, the
+ embedded file is compressed with gzip compression.
+
+ The structure of this HDU is the same as that of the 'FITS'
+ extension type proposed here:
+ http://listmgr.cv.nrao.edu/pipermail/fitsbits/2002-April/thread.html
+
+ The Headerlet contained in the HDU's data can be accessed by the
+ `headerlet` attribute.
+ """
+
+ _extension = 'HDRLET'
+
+ @lazyproperty
+ def headerlet(self):
+ """Return the encapsulated headerlet as a Headerlet object.
+
+ This is similar to the hdulist property inherited from the FitsHDU
+ class, though the hdulist property returns a normal HDUList object.
+ """
+
+ return Headerlet(self.hdulist)
+
+ @classmethod
+ def fromheaderlet(cls, headerlet, compress=False):
+ """
+ Creates a new HeaderletHDU from a given Headerlet object.
+
+ Parameters
+ ----------
+ headerlet : `Headerlet`
+ A valid Headerlet object.
+
+ compress : bool, optional
+ Gzip compress the headerlet data.
+
+ Returns
+ -------
+ hlet : `HeaderletHDU`
+ A `HeaderletHDU` object for the given `Headerlet` that can be
+ attached as an extension to an existing `HDUList`.
+ """
+
+ # TODO: Perhaps check that the given object is in fact a valid
+ # Headerlet
+ hlet = cls.fromhdulist(headerlet, compress)
+
+ # Add some more headerlet-specific keywords to the header
+ phdu = headerlet[0]
+
+ if 'SIPNAME' in phdu.header:
+ sipname = phdu.header['SIPNAME']
+ else:
+ sipname = phdu.header['WCSNAME']
+
+ hlet.header['HDRNAME'] = (phdu.header['HDRNAME'],
+ phdu.header.comments['HDRNAME'])
+ hlet.header['DATE'] = (phdu.header['DATE'],
+ phdu.header.comments['DATE'])
+ hlet.header['SIPNAME'] = (sipname, 'SIP distortion model name')
+ hlet.header['WCSNAME'] = (phdu.header['WCSNAME'], 'WCS name')
+ hlet.header['DISTNAME'] = (phdu.header['DISTNAME'],
+ 'Distortion model name')
+ hlet.header['NPOLFILE'] = (phdu.header['NPOLFILE'],
+ phdu.header.comments['NPOLFILE'])
+ hlet.header['D2IMFILE'] = (phdu.header['D2IMFILE'],
+ phdu.header.comments['D2IMFILE'])
+ hlet.header['EXTNAME'] = (cls._extension, 'Extension name')
+
+ return hlet
+
+
+fits.register_hdu(HeaderletHDU)
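+# Usage sketch (hedged): given an existing `Headerlet` object ``hlet_obj``
+# (hypothetical variable) and a science file 'sci_flt.fits' (hypothetical
+# name), the headerlet can be attached to the file as a non-standard HDU:
+#
+# hdu = HeaderletHDU.fromheaderlet(hlet_obj, compress=True)
+# fobj = fits.open('sci_flt.fits', mode='update')
+# fobj.append(hdu)
+# fobj.close()
+# # The embedded headerlet is later recoverable through the HDU's
+# # `headerlet` attribute, e.g. fits.open('sci_flt.fits')[-1].headerlet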
diff --git a/stwcs/wcsutil/hstwcs.py b/stwcs/wcsutil/hstwcs.py
new file mode 100644
index 0000000..bfebcfc
--- /dev/null
+++ b/stwcs/wcsutil/hstwcs.py
@@ -0,0 +1,988 @@
+from __future__ import absolute_import, division, print_function # confidence high
+
+import os
+from astropy.wcs import WCS
+from astropy.io import fits
+from stwcs.distortion import models, coeff_converter
+import numpy as np
+from stsci.tools import fileutil
+
+from . import altwcs
+from . import getinput
+from . import mappings
+from . import instruments
+from .mappings import inst_mappings, ins_spec_kw
+from .mappings import basic_wcs
+
+__docformat__ = 'restructuredtext'
+
+#
+#### Utility functions copied from 'updatewcs.utils' to avoid circular imports
+#
+def extract_rootname(kwvalue,suffix=""):
+ """ Returns the rootname from a full reference filename
+
+ If a non-valid value (any of ['','N/A','NONE','INDEF',None]) is input,
+ simply return a string value of 'NONE'
+
+ This function will also replace any 'suffix' specified with a blank.
+ """
+ # check to see whether a valid kwvalue has been provided as input
+ if kwvalue is None or kwvalue.strip() in ['', 'N/A', 'NONE', 'INDEF']:
+ return 'NONE' # no valid value, so return 'NONE'
+
+ # for a valid kwvalue, parse out the rootname
+ # strip off any environment variable from input filename, if any are given
+ if '$' in kwvalue:
+ fullval = kwvalue[kwvalue.find('$')+1:]
+ else:
+ fullval = kwvalue
+ # Extract filename without path from kwvalue
+ fname = os.path.basename(fullval).strip()
+
+ # Now, rip out just the rootname from the full filename
+ rootname = fileutil.buildNewRootname(fname)
+
+ # Now, remove any known suffix from rootname
+ rootname = rootname.replace(suffix,'')
+ return rootname.strip()
+
+def build_default_wcsname(idctab):
+
+ idcname = extract_rootname(idctab,suffix='_idc')
+ wcsname = 'IDC_' + idcname
+ return wcsname
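+# Illustrative values (assumed inputs mirroring the parsing steps above;
+# the reference file name is made up):
+#
+# extract_rootname('jref$q1q1111aj_idc.fits', suffix='_idc')  # -> 'q1q1111aj'
+# build_default_wcsname('jref$q1q1111aj_idc.fits')            # -> 'IDC_q1q1111aj'
+# extract_rootname('N/A')                                     # -> 'NONE'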
+
+
+class NoConvergence(Exception):
+ """
+ An error class used to report non-convergence and/or divergence of
+ numerical methods. It is used to report errors in the iterative solution
+ used by :py:meth:`~stwcs.wcsutil.hstwcs.HSTWCS.all_world2pix`\ .
+
+ Attributes
+ ----------
+
+ best_solution : numpy.array
+ Best solution achieved by the method.
+
+ accuracy : float
+ Accuracy of the :py:attr:`best_solution`\ .
+
+ niter : int
+ Number of iterations performed by the numerical method to compute
+ :py:attr:`best_solution`\ .
+
+ divergent : None, numpy.array
+ Indices of the points in :py:attr:`best_solution` array for which the
+ solution appears to be divergent. If the solution does not diverge,
+ `divergent` will be set to `None`.
+
+ failed2converge : None, numpy.array
+ Indices of the points in :py:attr:`best_solution` array for which the
+ solution failed to converge within the specified maximum number
+ of iterations. If there are no non-converging points (i.e., if
+ the required accuracy has been achieved for all points) then
+ `failed2converge` will be set to `None`.
+
+ """
+ def __init__(self, *args, **kwargs):
+ super(NoConvergence, self).__init__(*args)
+
+ self.best_solution = kwargs.pop('best_solution', None)
+ self.accuracy = kwargs.pop('accuracy', None)
+ self.niter = kwargs.pop('niter', None)
+ self.divergent = kwargs.pop('divergent', None)
+ self.failed2converge= kwargs.pop('failed2converge', None)
+
+
+#
+#### HSTWCS Class definition
+#
+class HSTWCS(WCS):
+
+ def __init__(self, fobj=None, ext=None, minerr=0.0, wcskey=" "):
+ """
+ Create a WCS object based on the instrument.
+
+ In addition to basic WCS keywords this class provides
+ instrument specific information needed in distortion computation.
+
+ Parameters
+ ----------
+ fobj : str or `astropy.io.fits.HDUList` object or None
+ file name, e.g j9irw4b1q_flt.fits
+ fully qualified filename[EXTNAME,EXTNUM], e.g. j9irw4b1q_flt.fits[sci,1]
+ `astropy.io.fits` file object, e.g fits.open('j9irw4b1q_flt.fits'), in which case the
+ user is responsible for closing the file object.
+ ext : int, tuple or None
+ extension number
+ if ext is tuple, it must be ("EXTNAME", EXTNUM), e.g. ("SCI", 2)
+ if ext is None, it is assumed the data is in the primary hdu
+ minerr : float
+ minimum value a distortion correction must have in order to be applied.
+ If CPERRja, CQERRja are smaller than minerr, the corresponding
+ distortion is not applied.
+ wcskey : str
+ A one character A-Z or " " used to retrieve and define an
+ alternate WCS description.
+ """
+
+ self.inst_kw = ins_spec_kw
+ self.minerr = minerr
+ self.wcskey = wcskey
+
+ if fobj is not None:
+ filename, hdr0, ehdr, phdu = getinput.parseSingleInput(f=fobj,
+ ext=ext)
+ self.filename = filename
+ instrument_name = hdr0.get('INSTRUME', 'DEFAULT')
+ if instrument_name == 'DEFAULT' or instrument_name not in list(inst_mappings.keys()):
+ #['IRAF/ARTDATA','',' ','N/A']:
+ self.instrument = 'DEFAULT'
+ else:
+ self.instrument = instrument_name
+ # Set the correct reference frame
+ refframe = determine_refframe(hdr0)
+ ehdr['RADESYS'] = refframe
+
+ WCS.__init__(self, ehdr, fobj=phdu, minerr=self.minerr,
+ key=self.wcskey)
+ if self.instrument == 'DEFAULT':
+ self.pc2cd()
+ # If input was a `astropy.io.fits.HDUList` object, it's the user's
+ # responsibility to close it, otherwise, it's closed here.
+ if not isinstance(fobj, fits.HDUList):
+ phdu.close()
+ self.setInstrSpecKw(hdr0, ehdr)
+ self.readIDCCoeffs(ehdr)
+ extname = ehdr.get('EXTNAME', '')
+ extnum = ehdr.get('EXTVER', None)
+ self.extname = (extname, extnum)
+ else:
+ # create a default HSTWCS object
+ self.instrument = 'DEFAULT'
+ WCS.__init__(self, minerr=self.minerr, key=self.wcskey)
+ self.pc2cd()
+ self.setInstrSpecKw()
+ self.setPscale()
+ self.setOrient()
+
+ @property
+ def naxis1(self):
+ return self._naxis1
+
+ @naxis1.setter
+ def naxis1(self, value):
+ self._naxis1 = value
+
+ @property
+ def naxis2(self):
+ return self._naxis2
+
+ @naxis2.setter
+ def naxis2(self, value):
+ self._naxis2 = value
+
+ def readIDCCoeffs(self, header):
+ """
+ Reads in first order IDCTAB coefficients if present in the header
+ """
+ coeffs = ['ocx10', 'ocx11', 'ocy10', 'ocy11', 'idcscale',
+ 'idcv2ref','idcv3ref', 'idctheta']
+ for c in coeffs:
+ self.__setattr__(c, header.get(c, None))
+
+ def setInstrSpecKw(self, prim_hdr=None, ext_hdr=None):
+ """
+ Populate the instrument specific attributes:
+
+ These can be in different headers but each instrument class has knowledge
+ of where to look for them.
+
+ Parameters
+ ----------
+ prim_hdr : `astropy.io.fits.Header`
+ primary header
+ ext_hdr : `astropy.io.fits.Header`
+ extension header
+
+ """
+ if self.instrument in list(inst_mappings.keys()):
+ inst_kl = inst_mappings[self.instrument]
+ inst_kl = instruments.__dict__[inst_kl]
+ insobj = inst_kl(prim_hdr, ext_hdr)
+
+ for key in self.inst_kw:
+ try:
+ self.__setattr__(key, insobj.__getattribute__(key))
+ except AttributeError:
+ # Some of the instrument's attributes are recorded in the primary header and
+ # were already set (e.g. 'DETECTOR'); the code below checks for that case.
+ if not self.__getattribute__(key):
+ raise
+ else:
+ pass
+
+ else:
+ raise KeyError("Unsupported instrument - %s" %self.instrument)
+
+ def setPscale(self):
+ """
+ Calculates the plate scale from the CD matrix
+ """
+ try:
+ cd11 = self.wcs.cd[0][0]
+ cd21 = self.wcs.cd[1][0]
+ self.pscale = np.sqrt(np.power(cd11,2)+np.power(cd21,2)) * 3600.
+ except AttributeError:
+ if self.wcs.has_cd():
+ print("This file has a PC matrix. You may want to convert it \n \
+ to a CD matrix, if reasonable, by running pc2.cd() method.\n \
+ The plate scale can be set then by calling setPscale() method.\n")
+ self.pscale = None
+
+ def setOrient(self):
+ """
+ Computes ORIENTAT from the CD matrix
+ """
+ try:
+ cd12 = self.wcs.cd[0][1]
+ cd22 = self.wcs.cd[1][1]
+ self.orientat = np.rad2deg(np.arctan2(cd12,cd22))
+ except AttributeError:
+ if self.wcs.has_cd():
+ print("This file has a PC matrix. You may want to convert it \n \
+ to a CD matrix, if reasonable, by running pc2.cd() method.\n \
+ The orientation can be set then by calling setOrient() method.\n")
+ self.pscale = None
+
+ def updatePscale(self, scale):
+ """
+ Updates the CD matrix with a new plate scale
+ """
+ self.wcs.cd = self.wcs.cd/self.pscale*scale
+ self.setPscale()
+
+ def readModel(self, update=False, header=None):
+ """
+ Reads distortion model from IDCTAB.
+
+ If IDCTAB is not found ('N/A', "", or not found on disk), then
+ if SIP coefficients and first order IDCTAB coefficients are present
+ in the header, restore the idcmodel from the header.
+ If not - assign None to self.idcmodel.
+
+ Parameters
+ ----------
+ header : `astropy.io.fits.Header`
+ fits extension header
+ update : bool (False)
+ if True - record the following IDCTAB quantities as header keywords:
+ CX10, CX11, CY10, CY11, IDCSCALE, IDCTHETA, IDCXREF, IDCYREF,
+ IDCV2REF, IDCV3REF
+ """
+ if self.idctab in [None, '', ' ','N/A']:
+ #Keyword idctab is not present in header - check for sip coefficients
+ if header is not None and 'IDCSCALE' in header:
+ self._readModelFromHeader(header)
+ else:
+ print("Distortion model is not available: IDCTAB=None\n")
+ self.idcmodel = None
+ elif not os.path.exists(fileutil.osfn(self.idctab)):
+ if header is not None and 'IDCSCALE' in header:
+ self._readModelFromHeader(header)
+ else:
+ print('Distortion model is not available: IDCTAB file %s not found\n' % self.idctab)
+ self.idcmodel = None
+ else:
+ self.readModelFromIDCTAB(header=header, update=update)
+
+ def _readModelFromHeader(self, header):
+ # Recreate idc model from SIP coefficients and header kw
+ print('Restoring IDC model from SIP coefficients\n')
+ model = models.GeometryModel()
+ cx, cy = coeff_converter.sip2idc(self)
+ model.cx = cx
+ model.cy = cy
+ model.name = "sip"
+ model.norder = header['A_ORDER']
+
+ refpix = {}
+ refpix['XREF'] = header['IDCXREF']
+ refpix['YREF'] = header['IDCYREF']
+ refpix['PSCALE'] = header['IDCSCALE']
+ refpix['V2REF'] = header['IDCV2REF']
+ refpix['V3REF'] = header['IDCV3REF']
+ refpix['THETA'] = header['IDCTHETA']
+ model.refpix = refpix
+
+ self.idcmodel = model
+
+
+ def readModelFromIDCTAB(self, header=None, update=False):
+ """
+ Read distortion model from idc table.
+
+ Parameters
+ ----------
+ header : `astropy.io.fits.Header`
+ fits extension header
+ update : bool (False)
+ if True - save the following as header keywords:
+ CX10, CX11, CY10, CY11, IDCSCALE, IDCTHETA, IDCXREF, IDCYREF,
+ IDCV2REF, IDCV3REF
+
+ """
+ if self.date_obs is None:
+ print('date_obs not available\n')
+ self.idcmodel = None
+ return
+ if self.filter1 is None and self.filter2 is None:
+ print('No filter information available\n')
+ self.idcmodel = None
+ return
+
+ self.idcmodel = models.IDCModel(self.idctab,
+ chip=self.chip, direction='forward', date=self.date_obs,
+ filter1=self.filter1, filter2=self.filter2,
+ offtab=self.offtab, binned=self.binned)
+
+ if self.ltv1 != 0. or self.ltv2 != 0.:
+ self.resetLTV()
+
+ if update:
+ if header is None:
+ print('Update header with IDC model kw requested but header was not provided.\n')
+ else:
+ self._updatehdr(header)
+
+ def resetLTV(self):
+ """
+ Reset LTV values for polarizer data
+
+ The polarizer field is smaller than the detector field.
+ The distortion coefficients are defined for the entire
+ polarizer field and the LTV values are set as with subarray
+ data. This may also be true for other special filters.
+ This is a special case when the observation is considered
+ a subarray in terms of detector field but a full frame in
+ terms of distortion model.
+ To avoid shifting the distortion coefficients the LTV values
+ are reset to 0.
+ """
+ if self.naxis1 == self.idcmodel.refpix['XSIZE'] and \
+ self.naxis2 == self.idcmodel.refpix['YSIZE']:
+ self.ltv1 = 0.
+ self.ltv2 = 0.
+
+ def wcs2header(self, sip2hdr=False, idc2hdr=True, wcskey=None, relax=False):
+ """
+ Create a `astropy.io.fits.Header` object from WCS keywords.
+
+ If the original header had a CD matrix, return a CD matrix,
+ otherwise return a PC matrix.
+
+ Parameters
+ ----------
+ sip2hdr : bool
+ If True - include SIP coefficients
+ """
+
+ h = self.to_header(key=wcskey, relax=relax)
+ if not wcskey:
+ wcskey = self.wcs.alt
+ if self.wcs.has_cd():
+ h = altwcs.pc2cd(h, key=wcskey)
+
+ if 'wcsname' not in h:
+ if self.idctab is not None:
+ wname = build_default_wcsname(self.idctab)
+ else:
+ wname = 'DEFAULT'
+ h['wcsname{0}'.format(wcskey)] = wname
+
+ if idc2hdr:
+ for card in self._idc2hdr():
+ h[card.keyword + wcskey] = (card.value, card.comment)
+ for kw in ('RESTFRQ', 'RESTWAV'):
+ try:
+ del h[kw]
+ except KeyError:
+ pass
+
+ if sip2hdr and self.sip:
+ for card in self._sip2hdr('a'):
+ h[card.keyword] = (card.value, card.comment)
+ for card in self._sip2hdr('b'):
+ h[card.keyword] = (card.value, card.comment)
+
+ try:
+ ap = self.sip.ap
+ except AssertionError:
+ ap = None
+ try:
+ bp = self.sip.bp
+ except AssertionError:
+ bp = None
+
+ if ap is not None:
+ for card in self._sip2hdr('ap'):
+ h[card.keyword] = (card.value, card.comment)
+ if bp is not None:
+ for card in self._sip2hdr('bp'):
+ h[card.keyword] = (card.value, card.comment)
+ return h
+
+ def _sip2hdr(self, k):
+ """
+ Get a set of SIP coefficients in the form of an array
+ and turn them into a `astropy.io.fits.Cardlist`.
+ k - one of 'a', 'b', 'ap', 'bp'
+ """
+
+ cards = [] #fits.CardList()
+ korder = self.sip.__getattribute__(k+'_order')
+ cards.append(fits.Card(keyword=k.upper()+'_ORDER', value=korder))
+ coeffs = self.sip.__getattribute__(k)
+ ind = coeffs.nonzero()
+ for i in range(len(ind[0])):
+ card = fits.Card(keyword=k.upper()+'_'+str(ind[0][i])+'_'+str(ind[1][i]),
+ value=coeffs[ind[0][i], ind[1][i]])
+ cards.append(card)
+ return cards
+
+ def _idc2hdr(self):
+ # save some of the idc coefficients
+ coeffs = ['ocx10', 'ocx11', 'ocy10', 'ocy11', 'idcscale']
+ cards = [] #fits.CardList()
+ for c in coeffs:
+ try:
+ val = self.__getattribute__(c)
+ except AttributeError:
+ continue
+ if val:
+ cards.append(fits.Card(keyword=c, value=val))
+ return cards
+
+ def pc2cd(self):
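+ # Approximate a CD matrix from the PC matrix and CDELT. Note that the
+ # whole PC matrix is scaled by cdelt[1] only, which implicitly assumes
+ # equal plate scale along both axes; this is adequate for the DEFAULT
+ # template case in which this method is used.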
+ if not self.wcs.has_pc():
+ self.wcs.pc = self.wcs.get_pc()
+ self.wcs.cd = self.wcs.pc * self.wcs.cdelt[1]
+
+ def all_world2pix(self, *args, **kwargs):
+ """
+ all_world2pix(*arg, accuracy=1.0e-4, maxiter=20, adaptive=False, \
+detect_divergence=True, quiet=False)
+
+ Performs full inverse transformation using iterative solution
+ on full forward transformation with complete distortion model.
+
+ Parameters
+ ----------
+ accuracy : float, optional (Default = 1.0e-4)
+ Required accuracy of the solution. Iteration terminates when the
+ correction to the solution found during the previous iteration
+ is smaller (in the sense of the L2 norm) than `accuracy`\ .
+
+ maxiter : int, optional (Default = 20)
+ Maximum number of iterations allowed to reach the solution.
+
+ adaptive : bool, optional (Default = False)
+ Specifies whether to adaptively select only points that did not
+ converge to a solution within the required accuracy for the
+ next iteration. Default is recommended for HST as well as most
+ other instruments.
+
+ .. note::
+ The :py:meth:`all_world2pix` uses a vectorized implementation
+ of the method of consecutive approximations (see `Notes`
+ section below) in which it iterates over *all* input points,
+ regardless of individual convergence, until the required accuracy
+ has been reached for *all* input points. In some cases it may
+ happen that *almost all* points have reached the required accuracy
+ while only a few input data points are left for which
+ additional iterations may be needed (this depends mostly on the
+ characteristics of the geometric distortions for a given
+ instrument). In this situation it may be
+ advantageous to set `adaptive` = `True`\ in which
+ case :py:meth:`all_world2pix` will continue iterating *only* over
+ the points that have not yet converged to the required
+ accuracy. However, for the HST's ACS/WFC detector, which has
+ the strongest distortions of all HST instruments, testing has
+ shown that enabling this option would lead to about a 10-30\%
+ penalty in computational time (depending on specifics of the
+ image, geometric distortions, and number of input points to be
+ converted). Therefore, for HST instruments,
+ it is recommended to set `adaptive` = `False`\ . The only
+ danger in getting this setting wrong will be a performance
+ penalty.
+
+ .. note::
+ When `detect_divergence` is `True`\ , :py:meth:`all_world2pix` \
+ will automatically switch to the adaptive algorithm once
+ divergence has been detected.
+
+ detect_divergence : bool, optional (Default = True)
+ Specifies whether to perform a more detailed analysis of the
+ convergence to a solution. Normally :py:meth:`all_world2pix`
+ may not achieve the required accuracy
+ if either the `tolerance` or `maxiter` arguments are too low.
+ However, it may happen that for some geometric distortions
+ the conditions of convergence for the method of consecutive
+ approximations used by :py:meth:`all_world2pix` may not be
+ satisfied, in which case consecutive approximations to the
+ solution will diverge regardless of the `tolerance` or `maxiter`
+ settings.
+
+ When `detect_divergence` is `False`\ , these divergent points
+ will be detected as not having achieved the required accuracy
+ (without further details). In addition, if `adaptive` is `False`
+ then the algorithm will not know that the solution (for specific
+ points) is diverging and will continue iterating and trying to
+ "improve" diverging solutions. This may result in NaN or Inf
+ values in the return results (in addition to performance
+ penalties). Even when `detect_divergence` is
+ `False`\ , :py:meth:`all_world2pix`\ , at the end of the iterative
+ process, will identify invalid results (NaN or Inf) as "diverging"
+ solutions and will raise :py:class:`NoConvergence` unless
+ the `quiet` parameter is set to `True`\ .
+
+ When `detect_divergence` is `True`\ , :py:meth:`all_world2pix` will
+ detect points for
+ which current correction to the coordinates is larger than
+ the correction applied during the previous iteration **if** the
+ requested accuracy **has not yet been achieved**\ . In this case,
+ if `adaptive` is `True`, these points will be excluded from
+ further iterations and if `adaptive`
+ is `False`\ , :py:meth:`all_world2pix` will automatically
+ switch to the adaptive algorithm.
+
+ .. note::
+ When accuracy has been achieved, small increases in
+ current corrections may be possible due to rounding errors
+ (when `adaptive` is `False`\ ) and such increases
+ will be ignored.
+
+ .. note::
+ Setting `detect_divergence` to `True` will incur about a 5-10\%
+ performance penalty (in our testing on ACS/WFC images).
+ Because the benefits of enabling this feature outweigh
+ the small performance penalty, it is recommended to set
+ `detect_divergence` to `True`\ , unless extensive testing
+ of the distortion models for images from specific
+ instruments shows good stability of the numerical method
+ for a wide range of coordinates (even outside the image
+ itself).
+
+ .. note::
+ Indices of the diverging inverse solutions will be reported
+ in the `divergent` attribute of the
+ raised :py:class:`NoConvergence` object.
+
+ quiet : bool, optional (Default = False)
+ Do not throw :py:class:`NoConvergence` exceptions when the method
+ does not converge to a solution with the required accuracy
+ within a specified number of maximum iterations set by `maxiter`
+ parameter. Instead, simply return the found solution.
+
+ Raises
+ ------
+ NoConvergence
+ The method does not converge to a
+ solution with the required accuracy within a specified number
+ of maximum iterations set by the `maxiter` parameter.
+
+ Notes
+ -----
+ Inputs can either be (RA, Dec, origin) or (RADec, origin) where RA
+ and Dec are 1-D arrays/lists of coordinates and RADec is an
+ array/list of pairs of coordinates.
+
+ Using the method of consecutive approximations we iterate starting
+ with the initial approximation, which is computed using the
+ non-distortion-aware :py:meth:`wcs_world2pix` (or equivalent).
+
+ The :py:meth:`all_world2pix` function uses a vectorized implementation
+ of the method of consecutive approximations and therefore it is
+ highly efficient (>30x) when *all* data points that need to be
+ converted from sky coordinates to image coordinates are passed at
+ *once*\ . Therefore, it is advisable, whenever possible, to pass
+ as input a long array of all points that need to be converted
+ to :py:meth:`all_world2pix` instead of calling :py:meth:`all_world2pix`
+ for each data point. Also see the note to the `adaptive` parameter.
+
+ Examples
+ --------
+ >>> import stwcs
+ >>> from astropy.io import fits
+ >>> hdulist = fits.open('j94f05bgq_flt.fits')
+ >>> w = stwcs.wcsutil.HSTWCS(hdulist, ext=('sci',1))
+ >>> hdulist.close()
+
+ >>> ra, dec = w.all_pix2world([1,2,3],[1,1,1],1); print(ra); print(dec)
+ [ 5.52645241 5.52649277 5.52653313]
+ [-72.05171776 -72.05171295 -72.05170814]
+ >>> radec = w.all_pix2world([[1,1],[2,1],[3,1]],1); print(radec)
+ [[ 5.52645241 -72.05171776]
+ [ 5.52649277 -72.05171295]
+ [ 5.52653313 -72.05170814]]
+ >>> x, y = w.all_world2pix(ra,dec,1)
+ >>> print(x)
+ [ 1.00000233 2.00000232 3.00000233]
+ >>> print(y)
+ [ 0.99999997 0.99999997 0.99999998]
+ >>> xy = w.all_world2pix(radec,1)
+ >>> print(xy)
+ [[ 1.00000233 0.99999997]
+ [ 2.00000232 0.99999997]
+ [ 3.00000233 0.99999998]]
+ >>> xy = w.all_world2pix(radec,1, maxiter=3, accuracy=1.0e-10, \
+quiet=False)
+ NoConvergence: 'HSTWCS.all_world2pix' failed to converge to requested \
+accuracy after 3 iterations.
+
+
+ Now try to use some diverging data:
+ >>> divradec = w.all_pix2world([[1.0,1.0],[10000.0,50000.0],\
+[3.0,1.0]],1); print(divradec)
+ [[ 5.52645241 -72.05171776]
+ [ 7.15979392 -70.81405561]
+ [ 5.52653313 -72.05170814]]
+
+ >>> try:
+ ... xy = w.all_world2pix(divradec,1, maxiter=20, accuracy=1.0e-4, \
+adaptive=False, detect_divergence=True, quiet=False)
+ ... except stwcs.wcsutil.hstwcs.NoConvergence as e:
+ ... print("Indices of diverging points: {}".format(e.divergent))
+ ... print("Indices of poorly converging points: {}".format(e.failed2converge))
+ ... print("Best solution: {}".format(e.best_solution))
+ ... print("Achieved accuracy: {}".format(e.accuracy))
+ ... raise e
+ Indices of diverging points:
+ [1]
+ Indices of poorly converging points:
+ None
+ Best solution:
+ [[ 1.00006219e+00 9.99999288e-01]
+ [ -1.99440907e+06 1.44308548e+06]
+ [ 3.00006257e+00 9.99999316e-01]]
+ Achieved accuracy:
+ [[ 5.98554253e-05 6.79918148e-07]
+ [ 8.59514088e+11 6.61703754e+11]
+ [ 6.02334592e-05 6.59713067e-07]]
+ Traceback (innermost last):
+ File "<console>", line 8, in <module>
+ NoConvergence: 'HSTWCS.all_world2pix' failed to converge to the requested accuracy.
+ After 5 iterations, the solution is diverging at least for one input point.
+
+ >>> try:
+ ... xy = w.all_world2pix(divradec,1, maxiter=20, accuracy=1.0e-4, \
+adaptive=False, detect_divergence=False, quiet=False)
+ ... except stwcs.wcsutil.hstwcs.NoConvergence as e:
+ ... print("Indices of diverging points: {}".format(e.divergent))
+ ... print("Indices of poorly converging points: {}".format(e.failed2converge))
+ ... print("Best solution: {}".format(e.best_solution))
+ ... print("Achieved accuracy: {}".format(e.accuracy))
+ ... raise e
+ Indices of diverging points:
+ [1]
+ Indices of poorly converging points:
+ None
+ Best solution:
+ [[ 1. 1.]
+ [ nan nan]
+ [ 3. 1.]]
+ Achieved accuracy:
+ [[ 0. 0.]
+ [ nan nan]
+ [ 0. 0.]]
+ Traceback (innermost last):
+ File "<console>", line 8, in <module>
+ NoConvergence: 'HSTWCS.all_world2pix' failed to converge to the requested accuracy.
+ After 20 iterations, the solution is diverging at least for one input point.
+
+ """
+ #####################################################################
+ ## PROCESS ARGUMENTS: ##
+ #####################################################################
+ nargs = len(args)
+
+ if nargs == 3:
+ try:
+ ra = np.asarray(args[0], dtype=np.float64)
+ dec = np.asarray(args[1], dtype=np.float64)
+ #assert( len(ra.shape) == 1 and len(dec.shape) == 1 )
+ origin = int(args[2])
+ vect1D = True
+ except Exception:
+ raise TypeError("When providing three arguments, they must " \
+ "be (Ra, Dec, origin) where Ra and Dec are " \
+ "Nx1 vectors.")
+ elif nargs == 2:
+ try:
+ rd = np.asarray(args[0], dtype=np.float64)
+ #assert( rd.shape[1] == 2 )
+ ra = rd[:,0]
+ dec = rd[:,1]
+ origin = int(args[1])
+ vect1D = False
+ except Exception:
+ raise TypeError("When providing two arguments, they must " \
+ "be (RaDec, origin) where RaDec is a Nx2 array.")
+ else:
+ raise TypeError("Expected 2 or 3 arguments, {:d} given." \
+ .format(nargs))
+
+ # process optional arguments:
+ accuracy = kwargs.pop('accuracy', 1.0e-4)
+ maxiter = kwargs.pop('maxiter', 20)
+ adaptive = kwargs.pop('adaptive', False)
+ detect_divergence = kwargs.pop('detect_divergence', True)
+ quiet = kwargs.pop('quiet', False)
+
+ #####################################################################
+ ## INITIALIZE ITERATIVE PROCESS: ##
+ #####################################################################
+ x0, y0 = self.wcs_world2pix(ra, dec, origin) # <-- initial approximation
+ # (WCS based only)
+
+ # see if an iterative solution is required (when any of the
+ # non-CD-matrix corrections are present). If not required
+ # return initial approximation (x0, y0).
+ if self.sip is None and \
+ self.cpdis1 is None and self.cpdis2 is None and \
+ self.det2im1 is None and self.det2im2 is None:
+ # no non-WCS corrections are detected - return
+ # initial approximation
+ if vect1D:
+ return [x0, y0]
+ else:
+ return np.dstack([x0,y0])[0]
+
+ x = x0.copy() # 0-order solution
+ y = y0.copy() # 0-order solution
+
+ # initial correction:
+ dx, dy = self.pix2foc(x, y, origin)
+ # If pix2foc does not apply all the required distortion
+ # corrections then replace the above line with:
+ #r0, d0 = self.all_pix2world(x, y, origin)
+ #dx, dy = self.wcs_world2pix(r0, d0, origin )
+ dx -= x0
+ dy -= y0
+
+ # update initial solution:
+ x -= dx
+ y -= dy
+
+ # norm (L2) squared of the correction:
+ dn2prev = dx**2+dy**2
+ dn2 = dn2prev
+
+ # prepare for iterative process
+ iterlist = list(range(1, maxiter+1))
+ accuracy2 = accuracy**2
+ ind = None
+ inddiv = None
+
+ npts = x.shape[0]
+
+ # turn off numpy runtime warnings for 'invalid' and 'over':
+ old_invalid = np.geterr()['invalid']
+ old_over = np.geterr()['over']
+ np.seterr(invalid = 'ignore', over = 'ignore')
+
+ #####################################################################
+ ## NON-ADAPTIVE ITERATIONS: ##
+ #####################################################################
+ if not adaptive:
+ for k in iterlist:
+ # check convergence:
+ if np.max(dn2) < accuracy2:
+ break
+
+ # find correction to the previous solution:
+ dx, dy = self.pix2foc(x, y, origin)
+ # If pix2foc does not apply all the required distortion
+ # corrections then replace the above line with:
+ #r0, d0 = self.all_pix2world(x, y, origin)
+ #dx, dy = self.wcs_world2pix(r0, d0, origin )
+ dx -= x0
+ dy -= y0
+
+ # update norm (L2) squared of the correction:
+ dn2 = dx**2+dy**2
+
+ # check for divergence (we do this in two stages
+ # to optimize performance for the most common
+ # scenario when successive approximations converge):
+ if detect_divergence:
+ ind, = np.where(dn2 <= dn2prev)
+ if ind.shape[0] < npts:
+ inddiv, = np.where(
+ np.logical_and(dn2 > dn2prev, dn2 >= accuracy2))
+ if inddiv.shape[0] > 0:
+ # apply correction only to the converging points:
+ x[ind] -= dx[ind]
+ y[ind] -= dy[ind]
+ # switch to adaptive iterations:
+ ind, = np.where((dn2 >= accuracy2) & \
+ (dn2 <= dn2prev) & np.isfinite(dn2))
+ iterlist = iterlist[k:]
+ adaptive = True
+ break
+ #dn2prev[ind] = dn2[ind]
+ dn2prev = dn2
+
+ # apply correction:
+ x -= dx
+ y -= dy
+
+ #####################################################################
+ ## ADAPTIVE ITERATIONS: ##
+ #####################################################################
+ if adaptive:
+ if ind is None:
+ ind = np.asarray(list(range(npts)), dtype=np.int64)
+
+ for k in iterlist:
+ # check convergence:
+ if ind.shape[0] == 0:
+ break
+
+ # find correction to the previous solution:
+ dx[ind], dy[ind] = self.pix2foc(x[ind], y[ind], origin)
+ # If pix2foc does not apply all the required distortion
+ # corrections then replace the above line with:
+ #r0[ind], d0[ind] = self.all_pix2world(x[ind], y[ind], origin)
+ #dx[ind], dy[ind] = self.wcs_world2pix(r0[ind], d0[ind], origin)
+ dx[ind] -= x0[ind]
+ dy[ind] -= y0[ind]
+
+ # update norm (L2) squared of the correction:
+ dn2 = dx**2+dy**2
+
+ # update indices of elements that still need correction:
+ if detect_divergence:
+ ind, = np.where((dn2 >= accuracy2) & (dn2 <= dn2prev))
+ #ind = ind[np.where((dn2[ind] >= accuracy2) & (dn2[ind] <= dn2prev))]
+ dn2prev[ind] = dn2[ind]
+ else:
+ ind, = np.where(dn2 >= accuracy2)
+ #ind = ind[np.where(dn2[ind] >= accuracy2)]
+
+ # apply correction:
+ x[ind] -= dx[ind]
+ y[ind] -= dy[ind]
+
+ #####################################################################
+ ## FINAL DETECTION OF INVALID, DIVERGING, ##
+ ## AND FAILED-TO-CONVERGE POINTS ##
+ #####################################################################
+ # Identify diverging and/or invalid points:
+ invalid = (((~np.isfinite(y)) | (~np.isfinite(x)) | \
+ (~np.isfinite(dn2))) & \
+ (np.isfinite(ra)) & (np.isfinite(dec)))
+ # When detect_divergence==False, dn2prev is outdated (it is the
+ # norm^2 of the very first correction). Still better than nothing...
+ inddiv, = np.where(((dn2 >= accuracy2) & (dn2 > dn2prev)) | invalid)
+ if inddiv.shape[0] == 0:
+ inddiv = None
+ # identify points that did not converge within
+ # 'maxiter' iterations:
+ if k >= maxiter:
+ ind,= np.where((dn2 >= accuracy2) & (dn2 <= dn2prev) & (~invalid))
+ if ind.shape[0] == 0:
+ ind = None
+ else:
+ ind = None
+
+ #####################################################################
+ ## RAISE EXCEPTION IF DIVERGING OR TOO SLOWLY CONVERGING ##
+ ## DATA POINTS HAVE BEEN DETECTED: ##
+ #####################################################################
+ # raise exception if diverging or too slowly converging
+ if (ind is not None or inddiv is not None) and not quiet:
+ if vect1D:
+ sol = [x, y]
+ err = [np.abs(dx), np.abs(dy)]
+ else:
+ sol = np.dstack( [x, y] )[0]
+ err = np.dstack( [np.abs(dx), np.abs(dy)] )[0]
+
+ # restore previous numpy error settings:
+ np.seterr(invalid = old_invalid, over = old_over)
+
+ if inddiv is None:
+ raise NoConvergence("'HSTWCS.all_world2pix' failed to " \
+ "converge to the requested accuracy after {:d} " \
+ "iterations.".format(k), best_solution = sol, \
+ accuracy = err, niter = k, failed2converge = ind, \
+ divergent = None)
+ else:
+ raise NoConvergence("'HSTWCS.all_world2pix' failed to " \
+ "converge to the requested accuracy.{0:s}" \
+ "After {1:d} iterations, the solution is diverging " \
+ "at least for one input point." \
+ .format(os.linesep, k), best_solution = sol, \
+ accuracy = err, niter = k, failed2converge = ind, \
+ divergent = inddiv)
+
+ #####################################################################
+ ## FINALIZE AND FORMAT DATA FOR RETURN: ##
+ #####################################################################
+ # restore previous numpy error settings:
+ np.seterr(invalid = old_invalid, over = old_over)
+
+ if vect1D:
+ return [x, y]
+ else:
+ return np.dstack( [x, y] )[0]
+
+ def _updatehdr(self, ext_hdr):
+ #kw2add : OCX10, OCX11, OCY10, OCY11
+ # record the model in the header for use by pydrizzle
+ ext_hdr['OCX10'] = self.idcmodel.cx[1,0]
+ ext_hdr['OCX11'] = self.idcmodel.cx[1,1]
+ ext_hdr['OCY10'] = self.idcmodel.cy[1,0]
+ ext_hdr['OCY11'] = self.idcmodel.cy[1,1]
+ ext_hdr['IDCSCALE'] = self.idcmodel.refpix['PSCALE']
+ ext_hdr['IDCTHETA'] = self.idcmodel.refpix['THETA']
+ ext_hdr['IDCXREF'] = self.idcmodel.refpix['XREF']
+ ext_hdr['IDCYREF'] = self.idcmodel.refpix['YREF']
+ ext_hdr['IDCV2REF'] = self.idcmodel.refpix['V2REF']
+ ext_hdr['IDCV3REF'] = self.idcmodel.refpix['V3REF']
+
+ def printwcs(self):
+ """
+ Print the basic WCS keywords.
+ """
+ print('WCS Keywords\n')
+ print('CD_11 CD_12: %r %r' % (self.wcs.cd[0,0], self.wcs.cd[0,1]))
+ print('CD_21 CD_22: %r %r' % (self.wcs.cd[1,0], self.wcs.cd[1,1]))
+ print('CRVAL : %r %r' % (self.wcs.crval[0], self.wcs.crval[1]))
+ print('CRPIX : %r %r' % (self.wcs.crpix[0], self.wcs.crpix[1]))
+ print('NAXIS : %d %d' % (self.naxis1, self.naxis2))
+ print('Plate Scale : %r' % self.pscale)
+ print('ORIENTAT : %r' % self.orientat)
+
+
+def determine_refframe(phdr):
+ """
+ Determine the reference frame in standard FITS WCS terms.
+
+ Parameters
+ ----------
+ phdr : `astropy.io.fits.Header`
+ Primary Header of an HST observation
+
+ In HST images the reference frame is recorded in the primary header as REFFRAME.
+ A value of "GSC1" corresponds to FK5, while observations based on GSC2 use "ICRS".
+ """
+ try:
+ refframe = phdr['REFFRAME']
+ except KeyError:
+ refframe = " "
+ if refframe == "GSC1":
+ refframe = "FK5"
+ return refframe
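+# Usage sketch (hedged): only the GSC1 -> FK5 mapping is rewritten; any other
+# recorded value (e.g. 'ICRS') is passed through unchanged, and a missing
+# REFFRAME keyword yields " ".
+#
+# phdr = fits.Header()
+# phdr['REFFRAME'] = 'GSC1'
+# determine_refframe(phdr)   # -> 'FK5'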
diff --git a/stwcs/wcsutil/instruments.py b/stwcs/wcsutil/instruments.py
new file mode 100644
index 0000000..f662513
--- /dev/null
+++ b/stwcs/wcsutil/instruments.py
@@ -0,0 +1,320 @@
+from __future__ import absolute_import, division, print_function # confidence high
+
+from .mappings import ins_spec_kw
+
+class InstrWCS(object):
+ """
+ A base class for instrument specific keyword definition.
+ It prvides a default implementation (modeled by ACS) for
+ all set_kw methods.
+ """
+ def __init__(self, hdr0=None, hdr=None):
+ self.exthdr = hdr
+ self.primhdr = hdr0
+ self.set_ins_spec_kw()
+
+ def set_ins_spec_kw(self):
+ """
+ This method MUST call all set_kw methods.
+ There should be a set_kw method for all kw listed in
+ mappings.ins_spec_kw. Catching TypeError handles the case when
+ the headers are None, i.e. when fobj='DEFAULT'.
+ """
+ self.set_idctab()
+ self.set_offtab()
+ self.set_date_obs()
+ self.set_ra_targ()
+ self.set_dec_targ()
+ self.set_pav3()
+ self.set_detector()
+ self.set_filter1()
+ self.set_filter2()
+ self.set_vafactor()
+ self.set_naxis1()
+ self.set_naxis2()
+ self.set_ltv1()
+ self.set_ltv2()
+ self.set_binned()
+ self.set_chip()
+ self.set_parity()
+
+ def set_idctab(self):
+ try:
+ self.idctab = self.primhdr['IDCTAB']
+ except (KeyError, TypeError):
+ self.idctab = None
+
+ def set_offtab(self):
+ try:
+ self.offtab = self.primhdr['OFFTAB']
+ except (KeyError, TypeError):
+ self.offtab = None
+
+ def set_date_obs(self):
+ try:
+ self.date_obs = self.primhdr['DATE-OBS']
+ except (KeyError, TypeError):
+ self.date_obs = None
+
+ def set_ra_targ(self):
+ try:
+ self.ra_targ = self.primhdr['RA-TARG']
+ except (KeyError, TypeError):
+ self.ra_targ = None
+
+ def set_dec_targ(self):
+ try:
+ self.dec_targ = self.primhdr['DEC-TARG']
+ except (KeyError, TypeError):
+ self.dec_targ = None
+
+ def set_pav3(self):
+ try:
+ self.pav3 = self.primhdr['PA_V3']
+ except (KeyError, TypeError):
+ self.pav3 = None
+
+ def set_filter1(self):
+ try:
+ self.filter1 = self.primhdr['FILTER1']
+ except (KeyError, TypeError):
+ self.filter1 = None
+
+ def set_filter2(self):
+ try:
+ self.filter2 = self.primhdr['FILTER2']
+ except (KeyError, TypeError):
+ self.filter2 = None
+
+ def set_vafactor(self):
+ try:
+ self.vafactor = self.exthdr['VAFACTOR']
+ except (KeyError, TypeError):
+ self.vafactor = 1
+
+ def set_naxis1(self):
+ try:
+ self.naxis1 = self.exthdr['naxis1']
+ except (KeyError, TypeError):
+ try:
+ self.naxis1 = self.exthdr['npix1']
+ except (KeyError, TypeError):
+ self.naxis1 = None
+
+ def set_naxis2(self):
+ try:
+ self.naxis2 = self.exthdr['naxis2']
+ except (KeyError, TypeError):
+ try:
+ self.naxis2 = self.exthdr['npix2']
+ except (KeyError, TypeError):
+ self.naxis2 = None
+
+ def set_ltv1(self):
+ try:
+ self.ltv1 = self.exthdr['LTV1']
+ except (KeyError, TypeError):
+ self.ltv1 = 0.0
+
+ def set_ltv2(self):
+ try:
+ self.ltv2 = self.exthdr['LTV2']
+ except (KeyError, TypeError):
+ self.ltv2 = 0.0
+
+ def set_binned(self):
+ try:
+ self.binned = self.exthdr['BINAXIS1']
+ except (KeyError, TypeError):
+ self.binned = 1
+
+ def set_chip(self):
+ try:
+ self.chip = self.exthdr['CCDCHIP']
+ except (KeyError, TypeError):
+ self.chip = 1
+
+ def set_parity(self):
+ self.parity = [[1.0,0.0],[0.0,-1.0]]
+
+ def set_detector(self):
+ # each instrument has a different kw for detector and it can be
+ # in a different header, so this is to be handled by the instrument classes
+ self.detector = 'DEFAULT'
+
+class ACSWCS(InstrWCS):
+ """
+ get instrument specific kw
+ """
+
+ def __init__(self, hdr0, hdr):
+ InstrWCS.__init__(self, hdr0, hdr)
+
+ def set_detector(self):
+ try:
+ self.detector = self.primhdr['DETECTOR']
+ except KeyError:
+ print('ERROR: Detector kw not found.\n')
+ raise
+
+ def set_parity(self):
+ parity = {'WFC':[[1.0,0.0],[0.0,-1.0]],
+ 'HRC':[[-1.0,0.0],[0.0,1.0]],
+ 'SBC':[[-1.0,0.0],[0.0,1.0]]}
+
+ if self.detector not in list(parity.keys()):
+ InstrWCS.set_parity(self)
+ else:
+ self.parity = parity[self.detector]
+
+
+class WFPC2WCS(InstrWCS):
+
+
+ def __init__(self, hdr0, hdr):
+ InstrWCS.__init__(self, hdr0, hdr)
+
+ def set_filter1(self):
+ self.filter1 = self.primhdr.get('FILTNAM1', None)
+ if self.filter1 == " " or self.filter1 == None:
+ self.filter1 = 'CLEAR1'
+
+ def set_filter2(self):
+ self.filter2 = self.primhdr.get('FILTNAM2', None)
+ if self.filter2 == " " or self.filter2 == None:
+ self.filter2 = 'CLEAR2'
+
+
+ def set_binned(self):
+ mode = self.primhdr.get('MODE', 'FULL')
+ if mode == 'AREA':
+ self.binned = 2
+ else:
+ self.binned = 1
+
+ def set_chip(self):
+ self.chip = self.exthdr.get('DETECTOR', 1)
+
+ def set_parity(self):
+ self.parity = [[-1.0,0.],[0.,1.0]]
+
+ def set_detector(self):
+ try:
+ self.detector = self.exthdr['DETECTOR']
+ except KeyError:
+ print('ERROR: Detector kw not found.\n')
+ raise
+
+
+class WFC3WCS(InstrWCS):
+ """
+ Create a WFC3 detector specific class
+ """
+
+ def __init__(self, hdr0, hdr):
+ InstrWCS.__init__(self, hdr0, hdr)
+
+ def set_detector(self):
+ try:
+ self.detector = self.primhdr['DETECTOR']
+ except KeyError:
+ print('ERROR: Detector kw not found.\n')
+ raise
+
+ def set_filter1(self):
+ self.filter1 = self.primhdr.get('FILTER', None)
+ if self.filter1 == " " or self.filter1 == None:
+ self.filter1 = 'CLEAR'
+
+ def set_filter2(self):
+ # WFC3 IDC tables do not allow 2 filters.
+ self.filter2 = 'CLEAR'
+
+ def set_parity(self):
+ parity = {'UVIS':[[-1.0,0.0],[0.0,1.0]],
+ 'IR':[[-1.0,0.0],[0.0,1.0]]}
+
+ if self.detector not in list(parity.keys()):
+ InstrWCS.set_parity(self)
+ else:
+ self.parity = parity[self.detector]
+
+class NICMOSWCS(InstrWCS):
+ """
+ Create a NICMOS specific class
+ """
+
+ def __init__(self, hdr0, hdr):
+ InstrWCS.__init__(self, hdr0, hdr)
+
+ def set_parity(self):
+ self.parity = [[-1.0,0.],[0.,1.0]]
+
+ def set_filter1(self):
+ self.filter1 = self.primhdr.get('FILTER', None)
+ if self.filter1 == " " or self.filter1 == None:
+ self.filter1 = 'CLEAR'
+
+ def set_filter2(self):
+ # NICMOS IDC tables do not allow 2 filters.
+ self.filter2 = 'CLEAR'
+
+ def set_chip(self):
+ self.chip = self.detector
+
+ def set_detector(self):
+ try:
+ self.detector = self.primhdr['CAMERA']
+ except KeyError:
+ print('ERROR: Detector kw not found.\n')
+ raise
+
+class STISWCS(InstrWCS):
+ """
+ A STIS specific class
+ """
+
+ def __init__(self, hdr0, hdr):
+ InstrWCS.__init__(self, hdr0, hdr)
+
+ def set_parity(self):
+ self.parity = [[-1.0,0.],[0.,1.0]]
+
+ def set_filter1(self):
+ self.filter1 = self.exthdr.get('OPT_ELEM', None)
+ if self.filter1 == " " or self.filter1 == None:
+ self.filter1 = 'CLEAR1'
+
+ def set_filter2(self):
+ self.filter2 = self.exthdr.get('FILTER', None)
+ if self.filter2 == " " or self.filter2 == None:
+ self.filter2 = 'CLEAR2'
+
+ def set_detector(self):
+ try:
+ self.detector = self.primhdr['DETECTOR']
+ except KeyError:
+ print('ERROR: Detector kw not found.\n')
+ raise
+
+ def set_date_obs(self):
+ try:
+ self.date_obs = self.exthdr['DATE-OBS']
+ except (KeyError, TypeError):
+ self.date_obs = None
+
diff --git a/stwcs/wcsutil/mappings.py b/stwcs/wcsutil/mappings.py
new file mode 100644
index 0000000..24038bf
--- /dev/null
+++ b/stwcs/wcsutil/mappings.py
@@ -0,0 +1,29 @@
+from __future__ import division # confidence high
+
+# This dictionary maps an instrument into an instrument class
+# The instrument class handles instrument specific keywords
+
+inst_mappings={'WFPC2': 'WFPC2WCS',
+ 'ACS': 'ACSWCS',
+ 'NICMOS': 'NICMOSWCS',
+ 'STIS': 'STISWCS',
+ 'WFC3': 'WFC3WCS',
+ 'DEFAULT': 'InstrWCS'
+ }
+
+
+# A list of instrument specific keywords
+# Every instrument class must have methods which define each of these
+# as class attributes.
+ins_spec_kw = [ 'idctab', 'offtab', 'date_obs', 'ra_targ', 'dec_targ', 'pav3', \
+ 'detector', 'ltv1', 'ltv2', 'parity', 'binned','vafactor', \
+ 'chip', 'naxis1', 'naxis2', 'filter1', 'filter2']
+
+# A list of keywords defined in the primary header.
+# The HSTWCS class sets this as attributes
+prim_hdr_kw = ['detector', 'offtab', 'idctab', 'date-obs',
+ 'pa_v3', 'ra_targ', 'dec_targ']
+
+# These are the keywords which are archived before MakeWCS is run
+basic_wcs = ['CD1_', 'CD2_', 'CRVAL', 'CTYPE', 'CRPIX', 'CDELT', 'CUNIT']
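+# Note: each entry above is a keyword stem; consumers append the axis number,
+# so for a 2-D image 'CRVAL' covers CRVAL1/CRVAL2, 'CD1_' covers CD1_1/CD1_2,
+# and so on.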
+
diff --git a/stwcs/wcsutil/mosaic.py b/stwcs/wcsutil/mosaic.py
new file mode 100644
index 0000000..9d2d0a3
--- /dev/null
+++ b/stwcs/wcsutil/mosaic.py
@@ -0,0 +1,183 @@
+from __future__ import division, print_function
+import numpy as np
+from matplotlib import pyplot as plt
+from astropy.io import fits
+import string
+
+from stsci.tools import parseinput, irafglob
+from stwcs.distortion import utils
+from stwcs import updatewcs, wcsutil
+from stwcs.wcsutil import altwcs
+
+def vmosaic(fnames, outwcs=None, ref_wcs=None, ext=None, extname=None, undistort=True, wkey='V', wname='VirtualMosaic', plot=False, clobber=False):
+ """
+ Create a virtual mosaic using the WCS of the input images.
+
+ Parameters
+ ----------
+ fnames: a string or a list
+ a string or a list of filenames, or a list of wcsutil.HSTWCS objects
+ outwcs: an HSTWCS object
+ if given, represents the output tangent plane
+ if None, the output WCS is calculated from the input observations.
+ ref_wcs: an HSTWCS object
+ if output wcs is not given, this will be used as a reference for the
+ calculation of the output WCS. If ref_wcs is None and outwcs is None,
+ then the first observation in the input list is used as reference.
+ ext: an int, a tuple or a list
+ an int - represents a FITS extension, e.g. 0 is the primary HDU
+ a tuple - uses the notation (extname, extver), e.g. ('sci',1)
+ Can be a list of integers or tuples representing FITS extensions
+ extname: string
+ the value of the EXTNAME keyword for the extensions to be used in the mosaic
+ undistort: boolean (default: True)
+ undistort (or not) the output WCS
+ wkey: string
+ default: 'V'
+ one character A-Z to be used to record the virtual mosaic WCS as
+ an alternate WCS in the headers of the input files.
+ wname: string
+ default: 'VirtualMosaic'
+ a string to be used as a WCSNAME value for the alternate WCS representing
+ the virtual mosaic
+ plot: boolean
+ if True and matplotlib is installed will make a plot of the tangent plane
+ and the location of the input observations.
+ clobber: boolean
+ This covers the case when an alternate WCS with the requested key
+ already exists in the header of the input files.
+ if clobber is True, it will be overwritten
+ if False, it will compute the new one but will not write it to the headers.
+
+ Notes
+ -----
+ The algorithm is:
+ 1. If output WCS is not given it is calculated from the input WCSes.
+ The first image is used as a reference, if no reference is given.
+ This represents the virtual mosaic WCS.
+ 2. For each input observation/chip, an HSTWCS object is created
+ and its footprint on the sky is calculated (using only the four corners).
+ 3. For each input observation the footprint is projected on the output
+ tangent plane and the virtual WCS is recorded in the header.
+ """
+ wcsobjects = readWCS(fnames, ext, extname)
+ if outwcs is not None:
+ outwcs = outwcs.deepcopy()
+ else:
+ if ref_wcs is not None:
+ outwcs = utils.output_wcs(wcsobjects, ref_wcs=ref_wcs, undistort=undistort)
+ else:
+ outwcs = utils.output_wcs(wcsobjects, undistort=undistort)
+ if plot:
+ outc=np.array([[0.,0], [outwcs._naxis1, 0],
+ [outwcs._naxis1, outwcs._naxis2],
+ [0, outwcs._naxis2], [0, 0]])
+ plt.plot(outc[:,0], outc[:,1])
+ for wobj in wcsobjects:
+ outcorners = outwcs.wcs_world2pix(wobj.calc_footprint(),1)
+ if plot:
+ plt.plot(outcorners[:,0], outcorners[:,1])
+ objwcs = outwcs.deepcopy()
+ objwcs.wcs.crpix = objwcs.wcs.crpix - (outcorners[0])
+ updatehdr(wobj.filename, objwcs,wkey=wkey, wcsname=wname, ext=wobj.extname, clobber=clobber)
+ return outwcs
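+# Usage sketch (hedged; the file names are hypothetical):
+#
+# out = vmosaic(['img1_flt.fits', 'img2_flt.fits'], ext=('sci', 1),
+#               wkey='V', wname='VirtualMosaic', clobber=True)
+# # Each input file now carries the mosaic WCS as alternate WCS 'V';
+# # `out` is the HSTWCS describing the common output tangent plane.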
+
+def updatehdr(fname, wcsobj, wkey, wcsname, ext=1, clobber=False):
+ hdr = fits.getheader(fname, ext=ext)
+ all_keys = list(string.ascii_uppercase)
+ if wkey.upper() not in all_keys:
+ raise KeyError("wkey must be one character: A-Z")
+ if wkey not in altwcs.available_wcskeys(hdr):
+ if not clobber:
+ raise ValueError("wkey %s is already in use. Use clobber=True to overwrite it or specify a different key." %wkey)
+ else:
+ altwcs.deleteWCS(fname, ext=ext, wcskey=wkey)
+ f = fits.open(fname, mode='update')
+
+ hwcs = wcs2header(wcsobj)
+ wcsnamekey = 'WCSNAME' + wkey
+ f[ext].header[wcsnamekey] = wcsname
+ for k in hwcs:
+ f[ext].header[k[:7]+wkey] = hwcs[k]
+
+ f.close()
+
+def wcs2header(wcsobj):
+
+ h = wcsobj.to_header()
+
+ if wcsobj.wcs.has_cd():
+ altwcs.pc2cd(h)
+ h['CTYPE1'] = 'RA---TAN'
+ h['CTYPE2'] = 'DEC--TAN'
+ norient = np.rad2deg(np.arctan2(h['CD1_2'],h['CD2_2']))
+ #okey = 'ORIENT%s' % wkey
+ okey = 'ORIENT'
+ h[okey] = norient
+ return h
+
+def readWCS(input, exts=None, extname=None):
+ if isinstance(input, str):
+ if input[0] == '@':
+ # input is an @ file
+ filelist = irafglob.irafglob(input)
+ else:
+ try:
+ filelist, output = parseinput.parseinput(input)
+ except IOError: raise
+ elif isinstance(input, list):
+ if isinstance(input[0], wcsutil.HSTWCS):
+ # a list of HSTWCS objects
+ return input
+ else:
+ filelist = input[:]
+ wcso = []
+ fomited = []
+ # figure out which FITS extension(s) to use
+ if exts is None and extname is None:
+ #Assume it's simple FITS and the data is in the primary HDU
+ for f in filelist:
+ try:
+ wcso.append(wcsutil.HSTWCS(f))
+ except AttributeError:
+ fomited.append(f)
+ continue
+ elif exts is not None and validateExt(exts):
+ if not isinstance(exts, list):
+ exts = [exts]
+ for f in filelist:
+ try:
+ wcso.extend([wcsutil.HSTWCS(f, ext=e) for e in exts])
+ except KeyError:
+ fomited.append(f)
+ continue
+ elif extname is not None:
+ for f in filelist:
+ fobj = fits.open(f)
+ for i in range(len(fobj)):
+ try:
+ ename = fobj[i].header['EXTNAME']
+ except KeyError:
+ continue
+ if ename.lower() == extname.lower():
+ wcso.append(wcsutil.HSTWCS(f,ext=i))
+ else:
+ continue
+ fobj.close()
+ if fomited:
+ print("These files were skipped:")
+ for f in fomited:
+ print(f)
+ return wcso
+
+
+def validateExt(ext):
+ if not isinstance(ext, int) and not isinstance(ext, tuple) \
+ and not isinstance(ext, list):
+ print("Ext must be integer, tuple, a list of int extension numbers, \
+ or a list of tuples representing a fits extension, for example ('sci', 1).")
+ return False
+ else:
+ return True
+
+
+
diff --git a/stwcs/wcsutil/wcscorr.py b/stwcs/wcsutil/wcscorr.py
new file mode 100644
index 0000000..3f9b7d5
--- /dev/null
+++ b/stwcs/wcsutil/wcscorr.py
@@ -0,0 +1,668 @@
+from __future__ import absolute_import, division, print_function
+
+import os,copy
+import numpy as np
+from astropy.io import fits
+
+import stwcs
+from stwcs.wcsutil import altwcs
+from stwcs.updatewcs import utils
+from stsci.tools import fileutil
+from . import convertwcs
+
+DEFAULT_WCS_KEYS = ['CRVAL1','CRVAL2','CRPIX1','CRPIX2',
+ 'CD1_1','CD1_2','CD2_1','CD2_2',
+ 'CTYPE1','CTYPE2','ORIENTAT']
+DEFAULT_PRI_KEYS = ['HDRNAME','SIPNAME','NPOLNAME','D2IMNAME','DESCRIP']
+COL_FITSKW_DICT = {'RMS_RA':'sci.crder1','RMS_DEC':'sci.crder2',
+ 'NMatch':'sci.nmatch','Catalog':'sci.catalog'}
+
+###
+### WCSEXT table related keyword archive functions
+###
+def init_wcscorr(input, force=False):
+ """
+ This function will initialize the WCSCORR table if it is not already present,
+ and look for WCS keywords with a key of 'O' (the original OPUS-generated
+ WCS) to use as the initial rows for the table; if no 'O'-keyed keywords
+ are found, the current WCS keywords are used instead.
+
+ This function will NOT overwrite any rows already present.
+
+ This function works on all SCI extensions at one time.
+ """
+ # TODO: Create some sort of decorator or (for Python2.5) context for
+ # opening a FITS file and closing it when done, if necessary
+ if not isinstance(input, fits.HDUList):
+ # input must be a filename, so open as `astropy.io.fits.HDUList` object
+ fimg = fits.open(input, mode='update')
+ need_to_close = True
+ else:
+ fimg = input
+ need_to_close = False
+
+ # Do not try to generate a WCSCORR table for a simple FITS file
+ numsci = fileutil.countExtn(fimg)
+ if len(fimg) == 1 or numsci == 0:
+ return
+
+ enames = [e.name for e in fimg]
+ if 'WCSCORR' in enames:
+ if not force:
+ return
+ else:
+ del fimg['wcscorr']
+ print('Initializing new WCSCORR table for ',fimg.filename())
+
+ used_wcskeys = altwcs.wcskeys(fimg['SCI', 1].header)
+
+ # define the primary columns of the WCSEXT table with initial rows for each
+ # SCI extension for the original OPUS solution
+ numwcs = len(used_wcskeys)
+ if numwcs == 0: numwcs = 1
+
+ # create new table with more rows than needed initially to make it easier to
+ # add new rows later
+ wcsext = create_wcscorr(descrip=True,numrows=numsci, padding=(numsci*numwcs) + numsci * 4)
+ # Assign the correct EXTNAME value to this table extension
+ wcsext.header['TROWS'] = (numsci * 2, 'Number of updated rows in table')
+ wcsext.header['EXTNAME'] = ('WCSCORR', 'Table with WCS Update history')
+ wcsext.header['EXTVER'] = 1
+
+ # define set of WCS keywords which need to be managed and copied to the table
+ wcs1 = stwcs.wcsutil.HSTWCS(fimg,ext=('SCI',1))
+ idc2header = True
+ if wcs1.idcscale is None:
+ idc2header = False
+ wcs_keywords = list(wcs1.wcs2header(idc2hdr=idc2header).keys())
+
+ prihdr = fimg[0].header
+ prihdr_keys = DEFAULT_PRI_KEYS
+ pri_funcs = {'SIPNAME':stwcs.updatewcs.utils.build_sipname,
+ 'NPOLNAME':stwcs.updatewcs.utils.build_npolname,
+ 'D2IMNAME':stwcs.updatewcs.utils.build_d2imname}
+
+ # Now copy original OPUS values into table
+ for extver in range(1, numsci + 1):
+ rowind = find_wcscorr_row(wcsext.data,
+ {'WCS_ID': 'OPUS', 'EXTVER': extver,
+ 'WCS_key':'O'})
+ # There should only EVER be a single row for each extension with OPUS values
+ rownum = np.where(rowind)[0][0]
+ #print 'Archiving OPUS WCS in row number ',rownum,' in WCSCORR table for SCI,',extver
+
+ hdr = fimg['SCI', extver].header
+ # define set of WCS keywords which need to be managed and copied to the table
+ if used_wcskeys is None:
+ used_wcskeys = altwcs.wcskeys(hdr)
+ # Check to see whether or not there is an OPUS alternate WCS present,
+ # if so, get its values directly, otherwise, archive the PRIMARY WCS
+ # as the OPUS values in the WCSCORR table
+ if 'O' not in used_wcskeys:
+ altwcs.archiveWCS(fimg,('SCI',extver),wcskey='O', wcsname='OPUS')
+ wkey = 'O'
+
+ wcs = stwcs.wcsutil.HSTWCS(fimg, ext=('SCI', extver), wcskey=wkey)
+ wcshdr = wcs.wcs2header(idc2hdr=idc2header)
+
+ if wcsext.data.field('CRVAL1')[rownum] != 0:
+ # If we find values for these keywords already in the table, do not
+ # overwrite them again
+ print('WCS keywords already updated...')
+ break
+ for key in wcs_keywords:
+ if key in wcsext.data.names:
+ wcsext.data.field(key)[rownum] = wcshdr[(key+wkey)[:8]]
+ # Now get any keywords from PRIMARY header needed for WCS updates
+ for key in prihdr_keys:
+ if key in prihdr:
+ val = prihdr[key]
+ else:
+ val = ''
+ wcsext.data.field(key)[rownum] = val
+
+ # Now that we have archived the OPUS alternate WCS, remove it from the list
+ # of used_wcskeys
+ if 'O' in used_wcskeys:
+ used_wcskeys.remove('O')
+
+ # Now copy remaining alternate WCSs into table
+ # TODO: Much of this appears to be redundant with update_wcscorr; consider
+ # merging them...
+ for uwkey in used_wcskeys:
+ for extver in range(1, numsci + 1):
+ hdr = fimg['SCI', extver].header
+ wcs = stwcs.wcsutil.HSTWCS(fimg, ext=('SCI', extver),
+ wcskey=uwkey)
+ wcshdr = wcs.wcs2header()
+ if 'WCSNAME' + uwkey not in wcshdr:
+ wcsid = utils.build_default_wcsname(fimg[0].header['idctab'])
+ else:
+ wcsid = wcshdr['WCSNAME' + uwkey]
+
+ # identify next empty row
+ rowind = find_wcscorr_row(wcsext.data,
+ selections={'wcs_id':['','0.0']})
+ rows = np.where(rowind)
+ if len(rows[0]) > 0:
+ rownum = np.where(rowind)[0][0]
+ else:
+ print('No available rows found for updating.')
+ continue
+
+ # Update selection columns for this row with relevant values
+ wcsext.data.field('WCS_ID')[rownum] = wcsid
+ wcsext.data.field('EXTVER')[rownum] = extver
+ wcsext.data.field('WCS_key')[rownum] = uwkey
+
+ # Look for standard WCS keyword values
+ for key in wcs_keywords:
+ if key in wcsext.data.names:
+ wcsext.data.field(key)[rownum] = wcshdr[key + uwkey]
+ # Now get any keywords from PRIMARY header needed for WCS updates
+ for key in prihdr_keys:
+ if key in pri_funcs:
+ val = pri_funcs[key](fimg)[0]
+ else:
+ if key in prihdr:
+ val = prihdr[key]
+ else:
+ val = ''
+ wcsext.data.field(key)[rownum] = val
+
+ # Append this table to the image FITS file
+ fimg.append(wcsext)
+ # force an update now
+ # set the verify flag to 'warn' so that it will always succeed, but still
+ # tell the user if PyFITS detects any problems with the file as a whole
+ utils.updateNEXTENDKw(fimg)
+
+ fimg.flush('warn')
+
+ if need_to_close:
+ fimg.close()
+
+
+def find_wcscorr_row(wcstab, selections):
+ """
+ Return an array of indices from the table (NOT HDU) 'wcstab' that matches the
+ selections specified by the user.
+
+ The row selection criteria must be specified as a dictionary with
+ column name as key and value(s) representing the valid desired row values.
+ For example, {'wcs_id':'OPUS','extver':2}.
+ """
+
+ mask = None
+ for i in selections:
+ icol = wcstab.field(i)
+ if isinstance(icol,np.chararray): icol = icol.rstrip()
+ selecti = selections[i]
+ if not isinstance(selecti,list):
+ if isinstance(selecti,str):
+ selecti = selecti.rstrip()
+ bmask = (icol == selecti)
+ if mask is None:
+ mask = bmask.copy()
+ else:
+ mask = np.logical_and(mask,bmask)
+ del bmask
+ else:
+ for si in selecti:
+ if isinstance(si,str):
+ si = si.rstrip()
+ bmask = (icol == si)
+ if mask is None:
+ mask = bmask.copy()
+ else:
+ mask = np.logical_or(mask,bmask)
+ del bmask
+
+ return mask
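+# Usage sketch (hedged): given the data array of a WCSCORR table HDU
+# ``wcstab`` (hypothetical variable), select the OPUS row for EXTVER 2:
+#
+# mask = find_wcscorr_row(wcstab, {'wcs_id': 'OPUS', 'extver': 2,
+#                                  'wcs_key': 'O'})
+# rownum = np.where(mask)[0][0]   # index of the first matching row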
+
+
+def archive_wcs_file(image, wcs_id=None):
+ """
+ Update WCSCORR table with rows for each SCI extension to record the
+ newly updated WCS keyword values.
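+
+    Examples
+    --------
+    An illustrative sketch; the file name and the 'TWEAK' WCS name are
+    hypothetical:
+
+    >>> archive_wcs_file('j9irw4b1q_flt.fits', wcs_id='TWEAK')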
+ """
+
+ if not isinstance(image, fits.HDUList):
+ fimg = fits.open(image, mode='update')
+ close_image = True
+ else:
+ fimg = image
+ close_image = False
+
+ update_wcscorr(fimg, wcs_id=wcs_id)
+
+ if close_image:
+ fimg.close()
+
+
+def update_wcscorr(dest, source=None, extname='SCI', wcs_id=None, active=True):
+ """
+    Update the WCSCORR table with a new row or rows for this extension header.
+    It copies the current set of WCS keywords as a new row of the table, based
+    on keyed WCSs (as per the FITS Paper I multiple WCS standard).
+
+ Parameters
+ ----------
+ dest : HDUList
+ The HDU list whose WCSCORR table should be appended to (the WCSCORR HDU
+ must already exist)
+ source : HDUList, optional
+ The HDU list containing the extension from which to extract the WCS
+ keywords to add to the WCSCORR table. If None, the dest is also used
+ as the source.
+ extname : str, optional
+ The extension name from which to take new WCS keywords. If there are
+ multiple extensions with that name, rows are added for each extension
+ version.
+ wcs_id : str, optional
+ The name of the WCS to add, as in the WCSNAMEa keyword. If
+ unspecified, all the WCSs in the specified extensions are added.
+    active : bool, optional
+ When True, indicates that the update should reflect an update of the
+ active WCS information, not just appending the WCS to the file as a
+ headerlet
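+
+    Examples
+    --------
+    A minimal sketch; the file name is hypothetical, and a WCSCORR table
+    is initialized in ``dest`` automatically if not already present:
+
+    >>> dest = fits.open('j9irw4b1q_flt.fits', mode='update')
+    >>> update_wcscorr(dest, extname='SCI')
+    >>> dest.close()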
+ """
+ if not isinstance(dest, fits.HDUList):
+ dest = fits.open(dest,mode='update')
+ fname = dest.filename()
+
+ if source is None:
+ source = dest
+
+ if extname == 'PRIMARY':
+ return
+
+ numext = fileutil.countExtn(source, extname)
+ if numext == 0:
+ raise ValueError('No %s extensions found in the source HDU list.'
+ % extname)
+ # Initialize the WCSCORR table extension in dest if not already present
+ init_wcscorr(dest)
+ try:
+ dest.index_of('WCSCORR')
+ except KeyError:
+ return
+
+ # check to see whether or not this is an up-to-date table
+ # replace with newly initialized table with current format
+ old_table = dest['WCSCORR']
+ wcscorr_cols = ['WCS_ID','EXTVER', 'SIPNAME',
+ 'HDRNAME', 'NPOLNAME', 'D2IMNAME']
+
+ for colname in wcscorr_cols:
+ if colname not in old_table.data.columns.names:
+ print("WARNING: Replacing outdated WCSCORR table...")
+ outdated_table = old_table.copy()
+ del dest['WCSCORR']
+ init_wcscorr(dest)
+ old_table = dest['WCSCORR']
+ break
+
+ # Current implementation assumes the same WCS keywords are in each
+ # extension version; if this should not be assumed then this can be
+ # modified...
+ wcs_keys = altwcs.wcskeys(source[(extname, 1)].header)
+ wcs_keys = [kk for kk in wcs_keys if kk]
+    if ' ' not in wcs_keys:
+        wcs_keys.append(' ')  # Ensure that the primary WCS gets used
+ # apply logic for only updating WCSCORR table with specified keywords
+ # corresponding to the WCS with WCSNAME=wcs_id
+ if wcs_id is not None:
+ wnames = altwcs.wcsnames(source[(extname, 1)].header)
+ wkeys = []
+ for letter in wnames:
+ if wnames[letter] == wcs_id:
+ wkeys.append(letter)
+ if len(wkeys) > 1 and ' ' in wkeys:
+ wkeys.remove(' ')
+ wcs_keys = wkeys
+ wcshdr = stwcs.wcsutil.HSTWCS(source, ext=(extname, 1)).wcs2header()
+ wcs_keywords = list(wcshdr.keys())
+
+ if 'O' in wcs_keys:
+ wcs_keys.remove('O') # 'O' is reserved for original OPUS WCS
+
+ # create new table for hdr and populate it with the newly updated values
+ new_table = create_wcscorr(descrip=True,numrows=0, padding=len(wcs_keys)*numext)
+ prihdr = source[0].header
+
+ # Get headerlet related keywords here
+ sipname, idctab = utils.build_sipname(source, fname, "None")
+ npolname, npolfile = utils.build_npolname(source, None)
+ d2imname, d2imfile = utils.build_d2imname(source, None)
+ if 'hdrname' in prihdr:
+ hdrname = prihdr['hdrname']
+ else:
+ hdrname = ''
+
+ idx = -1
+ for wcs_key in wcs_keys:
+ for extver in range(1, numext + 1):
+ extn = (extname, extver)
+ if 'SIPWCS' in extname and not active:
+ tab_extver = 0 # Since it has not been added to the SCI header yet
+ else:
+ tab_extver = extver
+ hdr = source[extn].header
+ if 'WCSNAME'+wcs_key in hdr:
+ wcsname = hdr['WCSNAME' + wcs_key]
+ else:
+ wcsname = utils.build_default_wcsname(hdr['idctab'])
+
+            selection = {'WCS_ID': wcsname, 'EXTVER': tab_extver,
+                         'SIPNAME': sipname, 'HDRNAME': hdrname,
+                         'NPOLNAME': npolname, 'D2IMNAME': d2imname}
+
+ # Ensure that an entry for this WCS is not already in the dest
+ # table; if so just skip it
+ rowind = find_wcscorr_row(old_table.data, selection)
+ if np.any(rowind):
+ continue
+
+ idx += 1
+
+ wcs = stwcs.wcsutil.HSTWCS(source, ext=extn, wcskey=wcs_key)
+ wcshdr = wcs.wcs2header()
+
+ # Update selection column values
+ for key, val in selection.items():
+ if key in new_table.data.names:
+ new_table.data.field(key)[idx] = val
+
+ for key in wcs_keywords:
+ if key in new_table.data.names:
+ new_table.data.field(key)[idx] = wcshdr[key + wcs_key]
+
+ for key in DEFAULT_PRI_KEYS:
+ if key in new_table.data.names and key in prihdr:
+ new_table.data.field(key)[idx] = prihdr[key]
+ # Now look for additional, non-WCS-keyword table column data
+ for key in COL_FITSKW_DICT:
+ fitkw = COL_FITSKW_DICT[key]
+ # Interpret any 'pri.hdrname' or
+ # 'sci.crpix1' formatted keyword names
+                if '.' in fitkw:
+                    srchdr, fitkw = fitkw.split('.')
+                    if 'pri' in srchdr.lower():
+                        srchdr = prihdr
+                    else:
+                        srchdr = source[extn].header
+                else:
+                    srchdr = source[extn].header
+
+ if fitkw+wcs_key in srchdr:
+ new_table.data.field(key)[idx] = srchdr[fitkw+wcs_key]
+
+
+ # If idx was never incremented, no rows were added, so there's nothing else
+ # to do...
+ if idx < 0:
+ return
+
+ # Now, we need to merge this into the existing table
+ rowind = find_wcscorr_row(old_table.data, {'wcs_id':['','0.0']})
+ old_nrows = np.where(rowind)[0][0]
+ new_nrows = new_table.data.shape[0]
+
+    # check to see if there is room for the new rows
+ if (old_nrows + new_nrows) > old_table.data.shape[0]-1:
+ pad_rows = 2 * new_nrows
+ # if not, create a new table with 'pad_rows' new empty rows
+ upd_table = fits.new_table(old_table.columns,header=old_table.header,
+ nrows=old_table.data.shape[0]+pad_rows)
+ else:
+ upd_table = old_table
+ pad_rows = 0
+    # Now, copy the new rows into the (possibly padded) table
+ for name in old_table.columns.names:
+ if name in new_table.data.names:
+ # reset the default values to ones specific to the row definitions
+ for i in range(pad_rows):
+ upd_table.data.field(name)[old_nrows+i] = old_table.data.field(name)[-1]
+ # Now populate with values from new table
+ upd_table.data.field(name)[old_nrows:old_nrows + new_nrows] = \
+ new_table.data.field(name)
+ upd_table.header['TROWS'] = old_nrows + new_nrows
+
+ # replace old extension with newly updated table extension
+ dest['WCSCORR'] = upd_table
+
+
+def restore_file_from_wcscorr(image, id='OPUS', wcskey=''):
+ """ Copies the values of the WCS from the WCSCORR based on ID specified by user.
+ The default will be to restore the original OPUS-derived values to the Primary WCS.
+ If wcskey is specified, the WCS with that key will be updated instead.
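+
+    Examples
+    --------
+    An illustrative sketch; the file name is hypothetical:
+
+    >>> restore_file_from_wcscorr('j9irw4b1q_flt.fits', id='OPUS')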
+ """
+
+ if not isinstance(image, fits.HDUList):
+ fimg = fits.open(image, mode='update')
+ close_image = True
+ else:
+ fimg = image
+ close_image = False
+ numsci = fileutil.countExtn(fimg)
+ wcs_table = fimg['WCSCORR']
+ orig_rows = (wcs_table.data.field('WCS_ID') == 'OPUS')
+ # create an HSTWCS object to figure out what WCS keywords need to be updated
+ wcsobj = stwcs.wcsutil.HSTWCS(fimg,ext=('sci',1))
+ wcshdr = wcsobj.wcs2header()
+ for extn in range(1,numsci+1):
+ # find corresponding row from table
+ ext_rows = (wcs_table.data.field('EXTVER') == extn)
+ erow = np.where(np.logical_and(ext_rows,orig_rows))[0][0]
+ for key in wcshdr:
+            if key in wcs_table.data.names:  # ensure that keyword is a column in the table
+ tkey = key
+
+ if 'orient' in key.lower():
+ key = 'ORIENT'
+ if wcskey == '':
+ skey = key
+ else:
+ skey = key[:7]+wcskey
+ fimg['sci',extn].header[skey] = wcs_table.data.field(tkey)[erow]
+ for key in DEFAULT_PRI_KEYS:
+ if key in wcs_table.data.names:
+ if wcskey == '':
+ pkey = key
+ else:
+ pkey = key[:7]+wcskey
+ fimg[0].header[pkey] = wcs_table.data.field(key)[erow]
+
+ utils.updateNEXTENDKw(fimg)
+
+ # close the image now that the update has been completed.
+ if close_image:
+ fimg.close()
+
+
+def create_wcscorr(descrip=False, numrows=1, padding=0):
+ """
+ Return the basic definitions for a WCSCORR table.
+    The dtype definitions for the string columns are set to the maximum allowed
+    size, so that all new elements have the same maximum length and are
+    automatically truncated to this limit when the table is updated (if needed).
+
+ The table is initialized with rows corresponding to the OPUS solution
+ for all the 'SCI' extensions.
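+
+    Examples
+    --------
+    Create a WCSCORR table HDU with one OPUS row and padding for nine more:
+
+    >>> wcsext = create_wcscorr(descrip=True, numrows=1, padding=9)
+    >>> wcsext.name
+    'WCSCORR'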
+ """
+
+ trows = numrows + padding
+ # define initialized arrays as placeholders for column data
+ # TODO: I'm certain there's an easier way to do this... for example, simply
+ # define the column names and formats, then create an empty array using
+ # them as a dtype, then create the new table from that array.
+ def_float64_zeros = np.array([0.0] * trows, dtype=np.float64)
+ def_float64_ones = def_float64_zeros + 1.0
+ def_float_col = {'format': 'D', 'array': def_float64_zeros.copy()}
+ def_float1_col = {'format': 'D', 'array':def_float64_ones.copy()}
+ def_str40_col = {'format': '40A',
+ 'array': np.array([''] * trows, dtype='S40')}
+ def_str24_col = {'format': '24A',
+ 'array': np.array([''] * trows, dtype='S24')}
+ def_int32_col = {'format': 'J',
+ 'array': np.array([0]*trows,dtype=np.int32)}
+
+ # If more columns are needed, simply add their definitions to this list
+ col_names = [('HDRNAME', def_str24_col), ('SIPNAME', def_str24_col),
+ ('NPOLNAME', def_str24_col), ('D2IMNAME', def_str24_col),
+ ('CRVAL1', def_float_col), ('CRVAL2', def_float_col),
+ ('CRPIX1', def_float_col), ('CRPIX2', def_float_col),
+ ('CD1_1', def_float_col), ('CD1_2', def_float_col),
+ ('CD2_1', def_float_col), ('CD2_2', def_float_col),
+ ('CTYPE1', def_str24_col), ('CTYPE2', def_str24_col),
+ ('ORIENTAT', def_float_col), ('PA_V3', def_float_col),
+ ('RMS_RA', def_float_col), ('RMS_Dec', def_float_col),
+ ('NMatch', def_int32_col), ('Catalog', def_str40_col)]
+
+ # Define selector columns
+    id_col = fits.Column(name='WCS_ID', format='40A',
+                         array=np.array(['OPUS'] * numrows + [''] * padding,
+                                        dtype='S40'))
+ extver_col = fits.Column(name='EXTVER', format='I',
+ array=np.array(list(range(1, numrows + 1)),
+ dtype=np.int16))
+ wcskey_col = fits.Column(name='WCS_key', format='A',
+ array=np.array(['O'] * numrows + [''] * padding,
+ dtype='S'))
+ # create list of remaining columns to be added to table
+ col_list = [id_col, extver_col, wcskey_col] # start with selector columns
+
+ for c in col_names:
+ cdef = copy.deepcopy(c[1])
+ col_list.append(fits.Column(name=c[0], format=cdef['format'],
+ array=cdef['array']))
+
+ if descrip:
+ col_list.append(
+ fits.Column(name='DESCRIP', format='128A',
+ array=np.array(
+ ['Original WCS computed by OPUS'] * numrows,
+ dtype='S128')))
+
+ # Now create the new table from the column definitions
+ newtab = fits.new_table(fits.ColDefs(col_list), nrows=trows)
+ # The fact that setting .name is necessary should be considered a bug in
+ # pyfits.
+ # TODO: Make sure this is fixed in pyfits, then remove this
+ newtab.name = 'WCSCORR'
+
+ return newtab
+
+def delete_wcscorr_row(wcstab, selections=None, rows=None):
+ """
+ Sets all values in a specified row or set of rows to default values
+
+ This function will essentially erase the specified row from the table
+ without actually removing the row from the table. This avoids the problems
+ with trying to resize the number of rows in the table while preserving the
+ ability to update the table with new rows again without resizing the table.
+
+ Parameters
+ ----------
+    wcstab : object
+        PyFITS binTable object for the WCSCORR table
+    selections : dict
+        Dictionary of WCSCORR column names and values to be used to select
+        the row or set of rows to erase
+    rows : int or list
+        If specified, the rows from the table to erase, regardless of
+        the value of 'selections'
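+
+    Examples
+    --------
+    An illustrative sketch; ``fimg`` (an opened FITS file) and the 'TWEAK'
+    WCS name are hypothetical:
+
+    >>> tab = fimg['WCSCORR'].data
+    >>> delete_wcscorr_row(tab, selections={'wcs_id': 'TWEAK'})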
+ """
+
+ if selections is None and rows is None:
+ print('ERROR: Some row selection information must be provided!')
+        print('       Either row numbers or "selections" must be provided.')
+ raise ValueError
+
+ delete_rows = None
+ if rows is None:
+ if 'wcs_id' in selections and selections['wcs_id'] == 'OPUS':
+ delete_rows = None
+            print('WARNING: OPUS WCS information cannot be deleted from the WCSCORR table.')
+ print(' This row will not be deleted!')
+ else:
+ rowind = find_wcscorr_row(wcstab, selections=selections)
+ delete_rows = np.where(rowind)[0].tolist()
+ else:
+ if not isinstance(rows,list):
+ rows = [rows]
+ delete_rows = rows
+
+    if not delete_rows:
+        return
+
+    # Ensure that rows pointing to the OPUS WCS do not get deleted, even by accident
+    for row in list(delete_rows):
+        if wcstab['WCS_key'][row] == 'O' or wcstab['WCS_ID'][row] == 'OPUS':
+            delete_rows.remove(row)
+
+    if not delete_rows:
+        return
+
+ # identify next empty row
+ rowind = find_wcscorr_row(wcstab, selections={'wcs_id':['','0.0']})
+ last_blank_row = np.where(rowind)[0][-1]
+
+ # copy values from blank row into user-specified rows
+ for colname in wcstab.names:
+ wcstab[colname][delete_rows] = wcstab[colname][last_blank_row]
+
+def update_wcscorr_column(wcstab, column, values, selections=None, rows=None):
+ """
+ Update the values in 'column' with 'values' for selected rows
+
+ Parameters
+ ----------
+    wcstab : object
+        PyFITS binTable object for the WCSCORR table
+    column : string
+        Name of the table column with values that need to be updated
+    values : string, int, or list
+        Value or set of values to copy into the selected rows for the column
+    selections : dict
+        Dictionary of WCSCORR column names and values to be used to select
+        the row or set of rows to update
+    rows : int or list
+        If specified, the rows from the table to update, regardless of
+        the value of 'selections'
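+
+    Examples
+    --------
+    An illustrative sketch; ``fimg`` (an opened FITS file) and the new
+    DESCRIP value are hypothetical:
+
+    >>> tab = fimg['WCSCORR'].data
+    >>> update_wcscorr_column(tab, 'DESCRIP', 'Updated solution', rows=[2, 3])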
+ """
+ if selections is None and rows is None:
+ print('ERROR: Some row selection information must be provided!')
+        print('       Either row numbers or "selections" must be provided.')
+ raise ValueError
+
+ if not isinstance(values, list):
+ values = [values]
+
+ update_rows = None
+ if rows is None:
+        if 'wcs_id' in selections and selections['wcs_id'] == 'OPUS':
+            update_rows = None
+            print('WARNING: OPUS WCS information cannot be updated in the WCSCORR table.')
+            print('         This row will not be updated!')
+ else:
+ rowind = find_wcscorr_row(wcstab, selections=selections)
+ update_rows = np.where(rowind)[0].tolist()
+ else:
+ if not isinstance(rows,list):
+ rows = [rows]
+ update_rows = rows
+
+ if update_rows is None:
+ return
+
+    # Expand single input value to apply to all selected rows
+    if len(values) > 1 and len(values) != len(update_rows):
+        print('ERROR: Number of new values', len(values))
+        print('       does not match the number of rows', len(update_rows), 'to be updated!')
+        print('       Please enter either 1 value or the same number of values')
+        print('       as there are rows to be updated.')
+        print('       Table will not be updated...')
+        raise ValueError
+
+ if len(values) == 1 and len(values) < len(update_rows):
+ values = values * len(update_rows)
+    # copy the new values into the user-specified rows
+    for i, row in enumerate(update_rows):
+        wcstab[column][row] = values[i]
diff --git a/stwcs/wcsutil/wcsdiff.py b/stwcs/wcsutil/wcsdiff.py
new file mode 100644
index 0000000..cfc2d66
--- /dev/null
+++ b/stwcs/wcsutil/wcsdiff.py
@@ -0,0 +1,150 @@
+from __future__ import print_function
+from astropy import wcs as pywcs
+from collections import OrderedDict
+from astropy.io import fits
+from .headerlet import parse_filename
+import numpy as np
+
+def is_wcs_identical(scifile, file2, sciextlist, fextlist, scikey=" ",
+ file2key=" ", verbose=False):
+ """
+ Compares the WCS solution of 2 files.
+
+ Parameters
+ ----------
+    scifile : string
+        name of file1 (usually the science file)
+        IRAF-style extension syntax is accepted as well,
+        for example scifile[1] or scifile[sci,1]
+    file2 : string
+        name of the second file (for example a headerlet)
+    sciextlist : list
+        a list of int or tuple, e.g. ('SCI', 1), extensions in the first file
+    fextlist : list
+        a list of int or tuple, e.g. ('SIPWCS', 1), extensions in the second file
+    scikey : string
+        alternate WCS key in scifile
+    file2key : string
+        alternate WCS key in file2
+    verbose : boolean
+        True: print to stdout
+
+ Notes
+ -----
+ These can be 2 science observations or 2 headerlets
+ or a science observation and a headerlet. The two files
+ have the same WCS solution if the following are the same:
+
+ - rootname/destim
+ - primary WCS
+ - SIP coefficients
+ - NPOL distortion
+ - D2IM correction
+
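+    Examples
+    --------
+    An illustrative sketch; the file names are hypothetical:
+
+    >>> result, diff = is_wcs_identical('j9irw4b1q_flt.fits', 'hdr1.fits',
+    ...                                 [('SCI', 1)], [('SIPWCS', 1)])
+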
+ """
+ result = True
+ diff = OrderedDict()
+ fobj, fname, close_file = parse_filename(file2)
+ sciobj, sciname, close_scifile = parse_filename(scifile)
+ diff['file_names'] = [scifile, file2]
+    if get_rootname(scifile) != get_rootname(file2):
+        # logger.info('Rootnames do not match.')
+        diff['rootname'] = ("%s: %s" % (sciname, get_rootname(scifile)),
+                            "%s: %s" % (file2, get_rootname(file2)))
+        result = False
+ for i, j in zip(sciextlist, fextlist):
+ w1 = pywcs.WCS(sciobj[i].header, sciobj, key=scikey)
+ w2 = pywcs.WCS(fobj[j].header, fobj, key=file2key)
+ diff['extension'] = [get_extname_extnum(sciobj[i]), get_extname_extnum(fobj[j])]
+        if not np.allclose(w1.wcs.crval, w2.wcs.crval, rtol=1e-7):
+            # logger.info('CRVALs do not match')
+            diff['CRVAL'] = w1.wcs.crval, w2.wcs.crval
+            result = False
+        if not np.allclose(w1.wcs.crpix, w2.wcs.crpix, rtol=1e-7):
+            # logger.info('CRPIX do not match')
+            diff['CRPIX'] = w1.wcs.crpix, w2.wcs.crpix
+            result = False
+        if not np.allclose(w1.wcs.cd, w2.wcs.cd, rtol=1e-7):
+            # logger.info('CDs do not match')
+            diff['CD'] = w1.wcs.cd, w2.wcs.cd
+            result = False
+        if not (np.array(w1.wcs.ctype) == np.array(w2.wcs.ctype)).all():
+            # logger.info('CTYPEs do not match')
+            diff['CTYPE'] = w1.wcs.ctype, w2.wcs.ctype
+            result = False
+        if w1.sip or w2.sip:
+            if (w2.sip and not w1.sip) or (w1.sip and not w2.sip):
+                diff['sip'] = 'one sip extension is missing'
+                result = False
+            else:
+                if not np.allclose(w1.sip.a, w2.sip.a, rtol=1e-7):
+                    # logger.info('SIP A coefficients do not match')
+                    diff['SIP_A'] = (w1.sip.a, w2.sip.a)
+                    result = False
+                if not np.allclose(w1.sip.b, w2.sip.b, rtol=1e-7):
+                    # logger.info('SIP B coefficients do not match')
+                    diff['SIP_B'] = (w1.sip.b, w2.sip.b)
+                    result = False
+        if w1.cpdis1 or w2.cpdis1:
+            if (w1.cpdis1 and not w2.cpdis1) or (w2.cpdis1 and not w1.cpdis1):
+                diff['CPDIS1'] = "CPDIS1 missing"
+                result = False
+            elif not np.allclose(w1.cpdis1.data, w2.cpdis1.data, rtol=1e-7):
+                # logger.info('NPOL distortions do not match')
+                diff['CPDIS1_data'] = (w1.cpdis1.data, w2.cpdis1.data)
+                result = False
+        if w1.cpdis2 or w2.cpdis2:
+            if (w1.cpdis2 and not w2.cpdis2) or (w2.cpdis2 and not w1.cpdis2):
+                diff['CPDIS2'] = "CPDIS2 missing"
+                result = False
+            elif not np.allclose(w1.cpdis2.data, w2.cpdis2.data, rtol=1e-7):
+                # logger.info('NPOL distortions do not match')
+                diff['CPDIS2_data'] = (w1.cpdis2.data, w2.cpdis2.data)
+                result = False
+        if w1.det2im1 or w2.det2im1:
+            if (w1.det2im1 and not w2.det2im1) or (w2.det2im1 and not w1.det2im1):
+                diff['DET2IM'] = "Det2im1 missing"
+                result = False
+            elif not np.allclose(w1.det2im1.data, w2.det2im1.data, rtol=1e-7):
+                # logger.info('Det2Im corrections do not match')
+                diff['D2IM1_data'] = (w1.det2im1.data, w2.det2im1.data)
+                result = False
+        if w1.det2im2 or w2.det2im2:
+            if (w1.det2im2 and not w2.det2im2) or (w2.det2im2 and not w1.det2im2):
+                diff['DET2IM2'] = "Det2im2 missing"
+                result = False
+            elif not np.allclose(w1.det2im2.data, w2.det2im2.data, rtol=1e-7):
+                # logger.info('Det2Im corrections do not match')
+                diff['D2IM2_data'] = (w1.det2im2.data, w2.det2im2.data)
+                result = False
+    if not result and verbose:
+        for key in diff:
+            print(key, ":\t", diff[key])
+ if close_file:
+ fobj.close()
+ if close_scifile:
+ sciobj.close()
+ return result, diff
+
+def get_rootname(fname):
+ """
+    Return the value of ROOTNAME or DESTIM from the primary header,
+    falling back to the file name if neither keyword is present.
+ """
+
+ hdr = fits.getheader(fname)
+    rootname = hdr.get('ROOTNAME', hdr.get('DESTIM', fname))
+ return rootname
+
+def get_extname_extnum(ext):
+ """
+    Return (EXTNAME, EXTVER) of a FITS extension
+    """
+    extname = ext.header.get('EXTNAME', "")
+    extnum = ext.header.get('EXTVER', 1)
+ return (extname, extnum)