diff options
Diffstat (limited to 'stwcs/wcsutil')
-rw-r--r-- | stwcs/wcsutil/__init__.py | 34 | ||||
-rw-r--r-- | stwcs/wcsutil/altwcs.py | 758 | ||||
-rw-r--r-- | stwcs/wcsutil/convertwcs.py | 118 | ||||
-rw-r--r-- | stwcs/wcsutil/getinput.py | 62 | ||||
-rw-r--r-- | stwcs/wcsutil/headerlet.py | 2754 | ||||
-rw-r--r-- | stwcs/wcsutil/hstwcs.py | 988 | ||||
-rw-r--r-- | stwcs/wcsutil/instruments.py | 320 | ||||
-rw-r--r-- | stwcs/wcsutil/mappings.py | 29 | ||||
-rw-r--r-- | stwcs/wcsutil/mosaic.py | 183 | ||||
-rw-r--r-- | stwcs/wcsutil/wcscorr.py | 668 | ||||
-rw-r--r-- | stwcs/wcsutil/wcsdiff.py | 150 |
11 files changed, 6064 insertions, 0 deletions
diff --git a/stwcs/wcsutil/__init__.py b/stwcs/wcsutil/__init__.py new file mode 100644 index 0000000..65280be --- /dev/null +++ b/stwcs/wcsutil/__init__.py @@ -0,0 +1,34 @@ +from __future__ import absolute_import, print_function # confidence high + +from .altwcs import * +from .hstwcs import HSTWCS + + +def help(): + doc = """ \ + 1. Using a `astropy.io.fits.HDUList` object and an extension number \n + Example:\n + from astropy.io improt fits + fobj = fits.open('some_file.fits')\n + w = wcsutil.HSTWCS(fobj, 3)\n\n + + 2. Create an HSTWCS object using a qualified file name. \n + Example:\n + w = wcsutil.HSTWCS('j9irw4b1q_flt.fits[sci,1]')\n\n + + 3. Create an HSTWCS object using a file name and an extension number. \n + Example:\n + w = wcsutil.HSTWCS('j9irw4b1q_flt.fits', ext=2)\n\n + + 4. Create an HSTWCS object from WCS with key 'O'.\n + Example:\n + + w = wcsutil.HSTWCS('j9irw4b1q_flt.fits', ext=2, wcskey='O')\n\n + + 5. Create a template HSTWCS object for a DEFAULT object.\n + Example:\n + w = wcsutil.HSTWCS(instrument='DEFAULT')\n\n + """ + + print('How to create an HSTWCS object:\n\n') + print(doc) diff --git a/stwcs/wcsutil/altwcs.py b/stwcs/wcsutil/altwcs.py new file mode 100644 index 0000000..aae1a9f --- /dev/null +++ b/stwcs/wcsutil/altwcs.py @@ -0,0 +1,758 @@ +from __future__ import division, print_function # confidence high +import os +import string + +import numpy as np +from astropy import wcs as pywcs +from astropy.io import fits +from stsci.tools import fileutil as fu + +altwcskw = ['WCSAXES', 'CRVAL', 'CRPIX', 'PC', 'CDELT', 'CD', 'CTYPE', 'CUNIT', + 'PV', 'PS'] +altwcskw_extra = ['LATPOLE','LONPOLE','RESTWAV','RESTFRQ'] + +# file operations +def archiveWCS(fname, ext, wcskey=" ", wcsname=" ", reusekey=False): + """ + Copy the primary WCS to the header as an alternate WCS + with wcskey and name WCSNAME. It loops over all extensions in 'ext' + + Parameters + ---------- + fname : string or `astropy.io.fits.HDUList` + file name or a file object + ext : int, tuple, str, or list of integers or tuples (e.g.('sci',1)) + fits extensions to work with + If a string is provided, it should specify the EXTNAME of extensions + with WCSs to be archived + wcskey : string "A"-"Z" or " " + if " ": get next available key if wcsname is also " " or try + to get a key from WCSNAME value + wcsname : string + Name of alternate WCS description + reusekey : boolean + if True - overwrites a WCS with the same key + + Examples + -------- + Copy the primary WCS of an in memory headrlet object to an + alternate WCS with key 'T' + + >>> hlet=headerlet.createHeaderlet('junk.fits', 'hdr1.fits') + >>> altwcs.wcskeys(hlet[1].header) + ['A'] + >>> altwcs.archiveWCS(hlet, ext=[('SIPWCS',1),('SIPWCS',2)], wcskey='T') + >>> altwcs.wcskeys(hlet[1].header) + ['A', 'T'] + + + See Also + -------- + wcsutil.restoreWCS: Copy an alternate WCS to the primary WCS + + """ + + if isinstance(fname, str): + f = fits.open(fname, mode='update') + else: + f = fname + + if not _parpasscheck(f, ext, wcskey, wcsname): + closefobj(fname, f) + raise ValueError("Input parameters problem") + + # Interpret input 'ext' value to get list of extensions to process + ext = _buildExtlist(f, ext) + + if not wcskey and not wcsname: + raise KeyError("Either wcskey or wcsname should be specified") + + if wcsname.strip() == "": + try: + wcsname = readAltWCS(f, ext[0], wcskey=" ")['WCSNAME'] + except KeyError: + pass + wcsext = ext[0] + if wcskey != " " and wcskey in wcskeys(f[wcsext].header) and not reusekey: + closefobj(fname, f) + raise KeyError("Wcskey %s is aready used. \ + Run archiveWCS() with reusekey=True to overwrite this alternate WCS. \ + Alternatively choose another wcskey with altwcs.available_wcskeys()." %wcskey) + elif wcskey == " ": + # wcsname exists, overwrite it if reuse is True or get the next key + if wcsname.strip() in wcsnames(f[wcsext].header).values(): + if reusekey: + # try getting the key from an existing WCS with WCSNAME + wkey = getKeyFromName(f[wcsext].header, wcsname) + wname = wcsname + if wkey == ' ': + wkey = next_wcskey(f[wcsext].header) + elif wkey is None: + closefobj(fname, f) + raise KeyError("Could not get a valid wcskey from wcsname %s" %wcsname) + else: + closefobj(fname, f) + raise KeyError("Wcsname %s is aready used. \ + Run archiveWCS() with reusekey=True to overwrite this alternate WCS. \ + Alternatively choose another wcskey with altwcs.available_wcskeys() or\ + choose another wcsname." %wcsname) + else: + wkey = next_wcskey(f[wcsext].header) + if wcsname.strip(): + wname = wcsname + else: + # determine which WCSNAME needs to be replicated in archived WCS + wnames = wcsnames(f[wcsext].header) + if 'O' in wnames: del wnames['O'] # we don't want OPUS/original + if len(wnames) > 0: + if ' ' in wnames: + wname = wnames[' '] + else: + akeys = string.uppercase + wname = "DEFAULT" + for key in akeys[-1::]: + if key in wnames: + wname = wnames + break + else: + wname = "DEFAULT" + else: + wkey = wcskey + wname = wcsname + + for e in ext: + hwcs = readAltWCS(f,e,wcskey=' ') + if hwcs is None: + continue + + wcsnamekey = 'WCSNAME' + wkey + f[e].header[wcsnamekey] = wname + + try: + old_wcsname=hwcs.pop('WCSNAME') + except: + pass + + for k in hwcs.keys(): + key = k[:7] + wkey + f[e].header[key] = hwcs[k] + closefobj(fname, f) + +def restore_from_to(f, fromext=None, toext=None, wcskey=" ", wcsname=" "): + """ + Copy an alternate WCS from one extension as a primary WCS of another extension + + Reads in a WCS defined with wcskey and saves it as the primary WCS. + Goes sequentially through the list of extensions in ext. + Alternatively uses 'fromext' and 'toext'. + + + Parameters + ---------- + f: string or `astropy.io.fits.HDUList` + a file name or a file object + fromext: string + extname from which to read in the alternate WCS, for example 'SCI' + toext: string or python list + extname or a list of extnames to which the WCS will be copied as + primary, for example ['SCI', 'ERR', 'DQ'] + wcskey: a charater + "A"-"Z" - Used for one of 26 alternate WCS definitions. + or " " - find a key from WCSNAMe value + wcsname: string (optional) + if given and wcskey is " ", will try to restore by WCSNAME value + + See Also + -------- + archiveWCS - copy the primary WCS as an alternate WCS + restoreWCS - Copy a WCS with key "WCSKEY" to the primary WCS + + """ + if isinstance(f, str): + fobj = fits.open(f, mode='update') + else: + fobj = f + + if not _parpasscheck(fobj, ext=None, wcskey=wcskey, fromext=fromext, toext=toext): + closefobj(f, fobj) + raise ValueError("Input parameters problem") + + # Interpret input 'ext' value to get list of extensions to process + #ext = _buildExtlist(fobj, ext) + + if isinstance(toext, str): + toext = [toext] + + # the case of an HDUList object in memory without an associated file + + #if fobj.filename() is not None: + # name = fobj.filename() + + simplefits = fu.isFits(fobj)[1] is 'simple' + if simplefits: + wcskeyext = 0 + else: + wcskeyext = 1 + + if wcskey == " ": + if wcsname.strip(): + wkey = getKeyFromName(fobj[wcskeyext].header, wcsname) + if not wkey: + closefobj(f, fobj) + raise KeyError("Could not get a key from wcsname %s ." % wcsname) + else: + if wcskey not in wcskeys(fobj, ext=wcskeyext): + print("Could not find alternate WCS with key %s in this file" % wcskey) + closefobj(f, fobj) + return + wkey = wcskey + + countext = fu.countExtn(fobj, fromext) + if not countext: + raise KeyError("File does not have extension with extname %s", fromext) + else: + for i in range(1, countext+1): + for toe in toext: + _restore(fobj, fromextnum=i, fromextnam=fromext, toextnum=i, toextnam=toe, ukey=wkey) + + if fobj.filename() is not None: + #fobj.writeto(name) + closefobj(f, fobj) + +def restoreWCS(f, ext, wcskey=" ", wcsname=" "): + """ + Copy a WCS with key "WCSKEY" to the primary WCS + + Reads in a WCS defined with wcskey and saves it as the primary WCS. + Goes sequentially through the list of extensions in ext. + Alternatively uses 'fromext' and 'toext'. + + + Parameters + ---------- + f : str or `astropy.io.fits.HDUList` + file name or a file object + ext : int, tuple, str, or list of integers or tuples (e.g.('sci',1)) + fits extensions to work with + If a string is provided, it should specify the EXTNAME of extensions + with WCSs to be archived + wcskey : str + "A"-"Z" - Used for one of 26 alternate WCS definitions. + or " " - find a key from WCSNAMe value + wcsname : str + (optional) if given and wcskey is " ", will try to restore by WCSNAME value + + See Also + -------- + archiveWCS - copy the primary WCS as an alternate WCS + restore_from_to + + """ + if isinstance(f, str): + fobj = fits.open(f, mode='update') + else: + fobj = f + + if not _parpasscheck(fobj, ext=ext, wcskey=wcskey): + closefobj(f, fobj) + raise ValueError("Input parameters problem") + + # Interpret input 'ext' value to get list of extensions to process + ext = _buildExtlist(fobj, ext) + + + # the case of an HDUList object in memory without an associated file + + #if fobj.filename() is not None: + # name = fobj.filename() + + simplefits = fu.isFits(fobj)[1] is 'simple' + if simplefits: + wcskeyext = 0 + else: + wcskeyext = 1 + + if wcskey == " ": + if wcsname.strip(): + wkey = getKeyFromName(fobj[wcskeyext].header, wcsname) + if not wkey: + closefobj(f, fobj) + raise KeyError("Could not get a key from wcsname %s ." % wcsname) + else: + if wcskey not in wcskeys(fobj, ext=wcskeyext): + #print "Could not find alternate WCS with key %s in this file" % wcskey + closefobj(f, fobj) + return + wkey = wcskey + + for e in ext: + _restore(fobj, wkey, fromextnum=e, verbose=False) + + if fobj.filename() is not None: + #fobj.writeto(name) + closefobj(f, fobj) + +def deleteWCS(fname, ext, wcskey=" ", wcsname=" "): + """ + Delete an alternate WCS defined with wcskey. + If wcskey is " " try to get a key from WCSNAME. + + Parameters + ---------- + fname : str or a `astropy.io.fits.HDUList` + ext : int, tuple, str, or list of integers or tuples (e.g.('sci',1)) + fits extensions to work with + If a string is provided, it should specify the EXTNAME of extensions + with WCSs to be archived + wcskey : str + one of 'A'-'Z' or " " + wcsname : str + Name of alternate WCS description + """ + if isinstance(fname, str): + fobj = fits.open(fname, mode='update') + else: + fobj = fname + + if not _parpasscheck(fobj, ext, wcskey, wcsname): + closefobj(fname, fobj) + raise ValueError("Input parameters problem") + + # Interpret input 'ext' value to get list of extensions to process + ext = _buildExtlist(fobj, ext) + # Do not allow deleting the original WCS. + if wcskey == 'O': + print("Wcskey 'O' is reserved for the original WCS and should not be deleted.") + closefobj(fname, fobj) + return + + wcskeyext = ext[0] + + if not wcskeys and not wcsname: + raise KeyError("Either wcskey or wcsname should be specified") + + if wcskey == " ": + # try getting the key from WCSNAME + wkey = getKeyFromName(fobj[wcskeyext].header, wcsname) + if not wkey: + closefobj(fname, fobj) + raise KeyError("Could not get a key: wcsname '%s' not found in header." % wcsname) + else: + if wcskey not in wcskeys(fobj[wcskeyext].header): + closefobj(fname, fobj) + raise KeyError("Could not find alternate WCS with key %s in this file" % wcskey) + wkey = wcskey + + prexts = [] + for i in ext: + hdr = fobj[i].header + hwcs = readAltWCS(fobj,i,wcskey=wkey) + if hwcs is None: + continue + for k in hwcs[::-1]: + del hdr[k] + #del hdr['ORIENT'+wkey] + prexts.append(i) + if prexts != []: + print('Deleted all instances of WCS with key %s in extensions' % wkey, prexts) + else: + print("Did not find WCS with key %s in any of the extensions" % wkey) + closefobj(fname, fobj) + +def _buildExtlist(fobj, ext): + """ + Utility function to interpret the provided value of 'ext' and return a list + of 'valid' values which can then be used by the rest of the functions in + this module. + + Parameters + ---------- + fobj: HDUList + file to be examined + ext: an int, a tuple, string, list of integers or tuples (e.g.('sci',1)) + fits extensions to work with + If a string is provided, it should specify the EXTNAME of extensions + with WCSs to be archived + """ + if not isinstance(ext,list): + if isinstance(ext,str): + extstr = ext + ext = [] + for extn in range(1, len(fobj)): + if 'extname' in fobj[extn].header and fobj[extn].header['extname'] == extstr: + ext.append(extn) + elif isinstance(ext, int) or isinstance(ext, tuple): + ext = [ext] + else: + raise KeyError("Valid extensions in 'ext' parameter need to be specified.") + return ext + +def _restore(fobj, ukey, fromextnum, + toextnum=None, fromextnam=None, toextnam=None, verbose=True): + """ + fobj: string of HDUList + ukey: string 'A'-'Z' + wcs key + fromextnum: int + extver of extension from which to copy WCS + fromextnam: string + extname of extension from which to copy WCS + toextnum: int + extver of extension to which to copy WCS + toextnam: string + extname of extension to which to copy WCS + """ + # create an extension tuple, e.g. ('SCI', 2) + if fromextnam: + fromextension = (fromextnam, fromextnum) + else: + fromextension = fromextnum + if toextnum: + if toextnam: + toextension = (toextnam, toextnum) + else: + toextension =toextnum + else: + toextension = fromextension + + hwcs = readAltWCS(fobj,fromextension,wcskey=ukey,verbose=verbose) + if hwcs is None: + return + + for k in hwcs.keys(): + key = k[:-1] + if key in fobj[toextension].header: + #fobj[toextension].header.update(key=key, value = hwcs[k]) + fobj[toextension].header[key] = hwcs[k] + else: + continue + if key == 'O' and 'TDDALPHA' in fobj[toextension].header: + fobj[toextension].header['TDDALPHA'] = 0.0 + fobj[toextension].header['TDDBETA'] = 0.0 + if 'ORIENTAT' in fobj[toextension].header: + norient = np.rad2deg(np.arctan2(hwcs['CD1_2'+'%s' %ukey], hwcs['CD2_2'+'%s' %ukey])) + fobj[toextension].header['ORIENTAT'] = norient + # Reset 2014 TDD keywords prior to computing new values (if any are computed) + for kw in ['TDD_CYA','TDD_CYB','TDD_CXA','TDD_CXB']: + if kw in fobj[toextension].header: + fobj[toextension].header[kw] = 0.0 + +#header operations +def _check_headerpars(fobj, ext): + if not isinstance(fobj, fits.Header) and not isinstance(fobj, fits.HDUList) \ + and not isinstance(fobj, str): + raise ValueError("Expected a file name, a file object or a header\n") + + if not isinstance(fobj, fits.Header): + #raise ValueError("Expected a valid ext parameter when input is a file") + if not isinstance(ext, int) and not isinstance(ext, tuple): + raise ValueError("Expected ext to be a number or a tuple, e.g. ('SCI', 1)\n") + +def _getheader(fobj, ext): + if isinstance(fobj, str): + hdr = fits.getheader(fobj,ext) + elif isinstance(fobj, fits.Header): + hdr = fobj + else: + hdr = fobj[ext].header + return hdr + +def readAltWCS(fobj, ext, wcskey=' ',verbose=False): + """ Reads in alternate WCS from specified extension + + Parameters + ---------- + fobj : str, `astropy.io.fits.HDUList` + fits filename or fits file object + containing alternate/primary WCS(s) to be converted + wcskey : str + [" ",A-Z] + alternate/primary WCS key that will be replaced by the new key + ext : int + fits extension number + + Returns + ------- + hdr: fits.Header + header object with ONLY the keywords for specified alternate WCS + """ + if isinstance(fobj, str): + fobj = fits.open(fobj) + + hdr = _getheader(fobj,ext) + try: + nwcs = pywcs.WCS(hdr, fobj=fobj, key=wcskey) + except KeyError: + if verbose: + print('readAltWCS: Could not read WCS with key %s' %wcskey) + print(' Skipping %s[%s]' % (fobj.filename(), str(ext))) + return None + hwcs = nwcs.to_header() + + if nwcs.wcs.has_cd(): + hwcs = pc2cd(hwcs, key=wcskey) + + return hwcs + +def convertAltWCS(fobj,ext,oldkey=" ",newkey=' '): + """ + Translates the alternate/primary WCS with one key to an alternate/primary WCS with + another key. + + Parameters + ---------- + fobj : str, `astropy.io.fits.HDUList`, or `astropy.io.fits.Header` + fits filename, fits file object or fits header + containing alternate/primary WCS(s) to be converted + ext : int + extension number + oldkey : str + [" ",A-Z] + alternate/primary WCS key that will be replaced by the new key + newkey : str + [" ",A-Z] + new alternate/primary WCS key + + Returns + ------- + hdr: `astropy.io.fits.Header` + header object with keywords renamed from oldkey to newkey + """ + hdr = readAltWCS(fobj,ext,wcskey=oldkey) + if hdr is None: + return None + # Converting WCS to new key + for card in hdr: + if oldkey == ' ' or oldkey == '': + cname = card + else: + cname = card.rstrip(oldkey) + hdr.rename_key(card,cname+newkey,force=True) + + return hdr + +def wcskeys(fobj, ext=None): + """ + Returns a list of characters used in the header for alternate + WCS description with WCSNAME keyword + + Parameters + ---------- + fobj : str, `astropy.io.fits.HDUList` or `astropy.io.fits.Header` + fits file name, fits file object or fits header + ext : int or None + extension number + if None, fobj must be a header + """ + _check_headerpars(fobj, ext) + hdr = _getheader(fobj, ext) + names = hdr["WCSNAME*"] + d = [] + for key in names: + wkey = key.replace('WCSNAME','') + if wkey == '': wkey = ' ' + d.append(wkey) + return d + +def wcsnames(fobj, ext=None): + """ + Returns a dictionary of wcskey: WCSNAME pairs + + Parameters + ---------- + fobj : stri, `astropy.io.fits.HDUList` or `astropy.io.fits.Header` + fits file name, fits file object or fits header + ext : int or None + extension number + if None, fobj must be a header + + """ + _check_headerpars(fobj, ext) + hdr = _getheader(fobj, ext) + names = hdr["WCSNAME*"] + d = {} + for keyword, value in names.items(): + wkey = keyword.replace('WCSNAME','') + if wkey == '': wkey = ' ' + d[wkey] = value + return d + +def available_wcskeys(fobj, ext=None): + """ + Returns a list of characters which are not used in the header + with WCSNAME keyword. Any of them can be used to save a new + WCS. + + Parameters + ---------- + fobj : str, `astropy.io.fits.HDUList` or `astropy.io.fits.Header` + fits file name, fits file object or fits header + ext : int or None + extension number + if None, fobj must be a header + """ + _check_headerpars(fobj, ext) + hdr = _getheader(fobj, ext) + all_keys = list(string.ascii_uppercase) + used_keys = wcskeys(hdr) + try: + used_keys.remove(" ") + except ValueError: + pass + [all_keys.remove(key) for key in used_keys] + return all_keys + +def next_wcskey(fobj, ext=None): + """ + Returns next available character to be used for an alternate WCS + + Parameters + ---------- + fobj : str, `astropy.io.fits.HDUList` or `astropy.io.fits.Header` + fits file name, fits file object or fits header + ext : int or None + extension number + if None, fobj must be a header + """ + _check_headerpars(fobj, ext) + hdr = _getheader(fobj, ext) + allkeys = available_wcskeys(hdr) + if allkeys != []: + return allkeys[0] + else: + return None + +def getKeyFromName(header, wcsname): + """ + If WCSNAME is found in header, return its key, else return + None. This is used to update an alternate WCS + repeatedly and not generate new keys every time. + + Parameters + ---------- + header : `astropy.io.fits.Header` + wcsname : str + value of WCSNAME + """ + wkey = None + names = wcsnames(header) + wkeys = [] + for item in names.items(): + if item[1].lower() == wcsname.lower(): + wkeys.append(item[0]) + wkeys.sort() + if len(wkeys) > 0: + wkey = wkeys[-1] + else: + wkey = None + return wkey + +def pc2cd(hdr, key=' '): + """ + Convert a CD PC matrix to a CD matrix. + + WCSLIB (and PyWCS) recognizes CD keywords as input + but converts them and works internally with the PC matrix. + to_header() returns the PC matrix even if the i nput was a + CD matrix. To keep input and output consistent we check + for has_cd and convert the PC back to CD. + + Parameters + ---------- + hdr: `astropy.io.fits.Header` + + """ + for c in ['1_1', '1_2', '2_1', '2_2']: + try: + val = hdr['PC'+c+'%s' % key] + del hdr['PC'+c+ '%s' % key] + except KeyError: + if c=='1_1' or c == '2_2': + val = 1. + else: + val = 0. + #hdr.update(key='CD'+c+'%s' %key, value=val) + hdr['CD{0}{1}'.format(c, key)] = val + return hdr + +def _parpasscheck(fobj, ext, wcskey, fromext=None, toext=None, reusekey=False): + """ + Check input parameters to altwcs functions + + fobj : str or `astropy.io.fits.HDUList` object + a file name or a file object + ext : int, a tuple, a python list of integers or a python list + of tuples (e.g.('sci',1)) + fits extensions to work with + wcskey : str + "A"-"Z" or " "- Used for one of 26 alternate WCS definitions + wcsname : str + (optional) + if given and wcskey is " ", will try to restore by WCSNAME value + reusekey : bool + A flag which indicates whether to reuse a wcskey in the header + """ + if not isinstance(fobj, fits.HDUList): + print("First parameter must be a fits file object or a file name.") + return False + + # first one covers the case of an object created in memory + # (e.g. headerlet) for which fileinfo returns None + if fobj.fileinfo(0) is None: + pass + else: + # an HDUList object with associated file + if fobj.fileinfo(0)['filemode'] is not 'update': + print("First parameter must be a file name or a file object opened in 'update' mode.") + return False + + if not isinstance(ext, int) and not isinstance(ext, tuple) \ + and not isinstance(ext,str) \ + and not isinstance(ext, list) and ext is not None: + print("Ext must be integer, tuple, string,a list of int extension numbers, \n\ + or a list of tuples representing a fits extension, for example ('sci', 1).") + return False + + if not isinstance(fromext, str) and fromext is not None: + print("fromext must be a string representing a valid extname") + return False + + if not isinstance(toext, list) and not isinstance(toext, str) and \ + toext is not None : + print("toext must be a string or a list of strings representing extname") + return False + + if len(wcskey) != 1: + print('Parameter wcskey must be a character - one of "A"-"Z" or " "') + return False + + return True + +def closefobj(fname, f): + """ + Functions in this module accept as input a file name or a file object. + If the input was a file name (string) we close the object. If the user + passed a file object we leave it to the user to close it. + """ + if isinstance(fname, str): + f.close() + +def mapFitsExt2HDUListInd(fname, extname): + """ + Map FITS extensions with 'EXTNAME' to HDUList indexes. + """ + + if not isinstance(fname, fits.HDUList): + f = fits.open(fname) + close_file = True + else: + f = fname + close_file = False + d = {} + for hdu in f: + if 'EXTNAME' in hdu.header and hdu.header['EXTNAME'] == extname: + extver = hdu.header['EXTVER'] + d[(extname, extver)] = f.index_of((extname, extver)) + if close_file: + f.close() + return d diff --git a/stwcs/wcsutil/convertwcs.py b/stwcs/wcsutil/convertwcs.py new file mode 100644 index 0000000..a384eb1 --- /dev/null +++ b/stwcs/wcsutil/convertwcs.py @@ -0,0 +1,118 @@ +from __future__ import print_function +try: + import stwcs + from stwcs import wcsutil +except: + stwcs = None + +from stsci.tools import fileutil + +OPUS_WCSKEYS = ['OCRVAL1','OCRVAL2','OCRPIX1','OCRPIX2', + 'OCD1_1','OCD1_2','OCD2_1','OCD2_2', + 'OCTYPE1','OCTYPE2'] + + +def archive_prefix_OPUS_WCS(fobj,extname='SCI'): + """ Identifies WCS keywords which were generated by OPUS and archived + using a prefix of 'O' for all 'SCI' extensions in the file + + Parameters + ---------- + fobj : str or `astropy.io.fits.HDUList` + Filename or fits object of a file + + """ + if stwcs is None: + print('=====================') + print('The STWCS package is needed to convert an old-style OPUS WCS to an alternate WCS') + print('=====================') + raise ImportError + + + closefits = False + if isinstance(fobj,str): + # A filename was provided as input + fobj = fits.open(fobj,mode='update') + closefits=True + + # Define the header + ext = ('sci',1) + hdr = fobj[ext].header + + numextn = fileutil.countExtn(fobj) + extlist = [] + for e in range(1,numextn+1): + extlist.append(('sci',e)) + + # Insure that the 'O' alternate WCS is present + if 'O' not in wcsutil.wcskeys(hdr): + # if not, archive the Primary WCS as the default OPUS WCS + wcsutil.archiveWCS(fobj,extlist, wcskey='O', wcsname='OPUS') + + # find out how many SCI extensions are in the image + numextn = fileutil.countExtn(fobj,extname=extname) + if numextn == 0: + extname = 'PRIMARY' + + # create HSTWCS object from PRIMARY WCS + wcsobj = wcsutil.HSTWCS(fobj,ext=ext,wcskey='O') + # get list of WCS keywords + wcskeys = list(wcsobj.wcs2header().keys()) + + # For each SCI extension... + for e in range(1,numextn+1): + # Now, look for any WCS keywords with a prefix of 'O' + for key in wcskeys: + okey = 'O'+key[:7] + hdr = fobj[(extname,e)].header + if okey in hdr: + # Update alternate WCS keyword with prefix-O OPUS keyword value + hdr[key] = hdr[okey] + + if closefits: + fobj.close() + +def create_prefix_OPUS_WCS(fobj,extname='SCI'): + """ Creates alternate WCS with a prefix of 'O' for OPUS generated WCS values + to work with old MultiDrizzle. + + Parameters + ---------- + fobj : str or `astropy.io.fits.HDUList` + Filename or fits object of a file + + Raises + ------ + IOError: + if input FITS object was not opened in 'update' mode + + """ + # List of O-prefix keywords to create + owcskeys = OPUS_WCSKEYS + + closefits = False + if isinstance(fobj,str): + # A filename was provided as input + fobj = fits.open(fobj, mode='update') + closefits=True + else: + # check to make sure this FITS obj has been opened in update mode + if fobj.fileinfo(0)['filemode'] != 'update': + print('File not opened with "mode=update". Quitting...') + raise IOError + + # check for existance of O-prefix WCS + if owcskeys[0] not in fobj['sci',1].header: + + # find out how many SCI extensions are in the image + numextn = fileutil.countExtn(fobj,extname=extname) + if numextn == 0: + extname = '' + for extn in range(1,numextn+1): + hdr = fobj[(extname,extn)].header + for okey in owcskeys: + hdr[okey] = hdr[okey[1:]+'O'] + + # Close FITS image if we had to open it... + if closefits: + fobj.close() diff --git a/stwcs/wcsutil/getinput.py b/stwcs/wcsutil/getinput.py new file mode 100644 index 0000000..8ee1123 --- /dev/null +++ b/stwcs/wcsutil/getinput.py @@ -0,0 +1,62 @@ +from astropy.io import fits +from stsci.tools import irafglob, fileutil, parseinput + +def parseSingleInput(f=None, ext=None): + if isinstance(f, str): + # create an HSTWCS object from a filename + if ext != None: + filename = f + if isinstance(ext,tuple): + if ext[0] == '': + extnum = ext[1] # handle ext=('',1) + else: + extnum = ext + else: + extnum = int(ext) + elif ext == None: + filename, ext = fileutil.parseFilename(f) + ext = fileutil.parseExtn(ext) + if ext[0] == '': + extnum = int(ext[1]) #handle ext=('',extnum) + else: + extnum = ext + phdu = fits.open(filename) + hdr0 = phdu[0].header + try: + ehdr = phdu[extnum].header + except (IndexError, KeyError) as e: + raise e.__class__('Unable to get extension %s.' % extnum) + + elif isinstance(f, fits.HDUList): + phdu = f + if ext == None: + extnum = 0 + else: + extnum = ext + ehdr = f[extnum].header + hdr0 = f[0].header + filename = hdr0.get('FILENAME', "") + + else: + raise ValueError('Input must be a file name string or a' + '`astropy.io.fits.HDUList` object') + + return filename, hdr0, ehdr, phdu + + +def parseMultipleInput(input): + if isinstance(input, str): + if input[0] == '@': + # input is an @ file + filelist = irafglob.irafglob(input) + else: + try: + filelist, output = parseinput.parseinput(input) + except IOError: raise + elif isinstance(input, list): + if isinstance(input[0], wcsutil.HSTWCS): + # a list of HSTWCS objects + return input + else: + filelist = input[:] + return filelist diff --git a/stwcs/wcsutil/headerlet.py b/stwcs/wcsutil/headerlet.py new file mode 100644 index 0000000..c0dd9b0 --- /dev/null +++ b/stwcs/wcsutil/headerlet.py @@ -0,0 +1,2754 @@ +""" +This module implements headerlets. + +A headerlet serves as a mechanism for encapsulating WCS information +which can be used to update the WCS solution of an image. The idea +came up first from the desire for passing improved astrometric +solutions for HST data and provide those solutions in a manner +that would not require getting entirely new images from the archive +when only the WCS information has been updated. + +""" + +from __future__ import absolute_import, division, print_function +import os +import sys +import functools +import logging +import textwrap +import copy +import time + +import numpy as np +from astropy.io import fits +#import pywcs +from astropy import wcs as pywcs +from astropy.utils import lazyproperty + +from stsci.tools.fileutil import countExtn +from stsci.tools import fileutil as fu +from stsci.tools import parseinput + +from stwcs.updatewcs import utils +from . import altwcs +from . import wcscorr +from .hstwcs import HSTWCS +from .mappings import basic_wcs + +#### Logging support functions +class FuncNameLoggingFormatter(logging.Formatter): + def __init__(self, fmt=None, datefmt=None): + if '%(funcName)s' not in fmt: + fmt = '%(funcName)s' + fmt + logging.Formatter.__init__(self, fmt=fmt, datefmt=datefmt) + + def format(self, record): + record = copy.copy(record) + if hasattr(record, 'funcName') and record.funcName == 'init_logging': + record.funcName = '' + else: + record.funcName += ' ' + return logging.Formatter.format(self, record) + + +logger = logging.getLogger(__name__) +formatter = FuncNameLoggingFormatter("%(levelname)s: %(message)s") +ch = logging.StreamHandler() +ch.setFormatter(formatter) +ch.setLevel(logging.CRITICAL) +logger.addHandler(ch) +logger.setLevel(logging.DEBUG) + +FITS_STD_KW = ['XTENSION', 'BITPIX', 'NAXIS', 'PCOUNT', + 'GCOUNT', 'EXTNAME', 'EXTVER', 'ORIGIN', + 'INHERIT', 'DATE', 'IRAF-TLM'] + +DEFAULT_SUMMARY_COLS = ['HDRNAME', 'WCSNAME', 'DISTNAME', 'AUTHOR', 'DATE', + 'SIPNAME', 'NPOLFILE', 'D2IMFILE', 'DESCRIP'] +COLUMN_DICT = {'vals': [], 'width': []} +COLUMN_FMT = '{:<{width}}' + + +def init_logging(funcname=None, level=100, mode='w', **kwargs): + """ + + Initialize logging for a function + + Parameters + ---------- + funcname: string + Name of function which will be recorded in log + level: int, or bool, or string + int or string : Logging level + bool: False - switch off logging + Text logging level for the message ("DEBUG", "INFO", + "WARNING", "ERROR", "CRITICAL") + mode: 'w' or 'a' + attach to logfile ('a' or start a new logfile ('w') + + """ + for hndl in logger.handlers: + if isinstance(hndl, logging.FileHandler): + has_file_handler = True + else: + has_file_handler = False + if level: + if not has_file_handler: + logname = 'headerlet.log' + fh = logging.FileHandler(logname, mode=mode) + fh.setFormatter(formatter) + fh.setLevel(logging.DEBUG) + logger.addHandler(fh) + logger.info("%s: Starting %s with arguments:\n\t %s" % + (time.asctime(), funcname, kwargs)) + + +def with_logging(func): + @functools.wraps(func) + def wrapped(*args, **kw): + level = kw.get('logging', 100) + mode = kw.get('logmode', 'w') + func_args = kw.copy() + if sys.version_info[0] >= 3: + argnames = func.__code__.co_varnames + else: + argnames = func.func_code.co_varnames + + for argname, arg in zip(argnames, args): + func_args[argname] = arg + + init_logging(func.__name__, level, mode, **func_args) + return func(*args, **kw) + return wrapped + +#### Utility functions +def is_par_blank(par): + return par in ['', ' ', 'INDEF', "None", None] + +def parse_filename(fname, mode='readonly'): + """ + Interprets the input as either a filename of a file that needs to be opened + or a PyFITS object. + + Parameters + ---------- + fname : str, `astropy.io.fits.HDUList` + Input pointing to a file or `astropy.io.fits.HDUList` object. + An input filename (str) will be expanded as necessary to + interpret any environmental variables + included in the filename. + + mode : string + Specifies what mode to use when opening the file, if it needs + to open the file at all [Default: 'readonly'] + + Returns + ------- + fobj : `astropy.io.fits.HDUList` + FITS file handle for input + + fname : str + Name of input file + + close_fobj : bool + Flag specifying whether or not fobj needs to be closed since it was + opened by this function. This allows a program to know whether they + need to worry about closing the FITS object as opposed to letting + the higher level interface close the object. + + """ + close_fobj = False + if not isinstance(fname, list): + if sys.version_info[0] >= 3: + is_string = isinstance(fname, str) + else: + is_string = isinstance(fname, basestring) + if is_string: + fname = fu.osfn(fname) + fobj = fits.open(fname, mode=mode) + close_fobj = True + else: + fobj = fname + if hasattr(fobj, 'filename'): + fname = fobj.filename() + else: + fname = '' + return fobj, fname, close_fobj + +def get_headerlet_kw_names(fobj, kw='HDRNAME'): + """ + Returns a list of specified keywords from all HeaderletHDU + extensions in a science file. + + Parameters + ---------- + fobj : str, `astropy.io.fits.HDUList` + kw : str + Name of keyword to be read and reported + """ + + fobj, fname, open_fobj = parse_filename(fobj) + + hdrnames = [] + for ext in fobj: + if isinstance(ext, fits.hdu.base.NonstandardExtHDU): + hdrnames.append(ext.header[kw]) + + if open_fobj: + fobj.close() + + return hdrnames + +def get_header_kw_vals(hdr, kwname, kwval, default=0): + if kwval is None: + if kwname in hdr: + kwval = hdr[kwname] + else: + kwval = default + return kwval + +@with_logging +def find_headerlet_HDUs(fobj, hdrext=None, hdrname=None, distname=None, + strict=True, logging=False, logmode='w'): + """ + Returns all HeaderletHDU extensions in a science file that matches + the inputs specified by the user. If no hdrext, hdrname or distname are + specified, this function will return a list of all HeaderletHDU objects. + + Parameters + ---------- + fobj : str, `astropy.io.fits.HDUList` + Name of FITS file or open fits object (`astropy.io.fits.HDUList` instance) + hdrext : int, tuple or None + index number(EXTVER) or extension tuple of HeaderletHDU to be returned + hdrname : string + value of HDRNAME for HeaderletHDU to be returned + distname : string + value of DISTNAME for HeaderletHDUs to be returned + strict : bool [Default: True] + Specifies whether or not at least one parameter needs to be provided + If False, all extension indices returned if hdrext, hdrname and distname + are all None. If True and hdrext, hdrname, and distname are all None, + raise an Exception requiring one to be specified. + logging : boolean + enable logging to a file called headerlet.log + logmode : 'w' or 'a' + log file open mode + + Returns + ------- + hdrlets : list + A list of all matching HeaderletHDU extension indices (could be just one) + + """ + + get_all = False + if hdrext is None and hdrname is None and distname is None: + if not strict: + get_all = True + else: + mess = """\n + ===================================================== + No valid Headerlet extension specified. + Either "hdrname", "hdrext", or "distname" needs to be specified. + ===================================================== + """ + logger.critical(mess) + raise ValueError + + fobj, fname, open_fobj = parse_filename(fobj) + + hdrlets = [] + if hdrext is not None and isinstance(hdrext, int): + if hdrext in range(len(fobj)): # insure specified hdrext is in fobj + if isinstance(fobj[hdrext], fits.hdu.base.NonstandardExtHDU) and \ + fobj[hdrext].header['EXTNAME'] == 'HDRLET': + hdrlets.append(hdrext) + else: + for ext in fobj: + if isinstance(ext, fits.hdu.base.NonstandardExtHDU): + if get_all: + hdrlets.append(fobj.index(ext)) + else: + if hdrext is not None: + if isinstance(hdrext, tuple): + hdrextname = hdrext[0] + hdrextnum = hdrext[1] + else: + hdrextname = 'HDRLET' + hdrextnum = hdrext + hdrext_match = ((hdrext is not None) and + (hdrextnum == ext.header['EXTVER']) and + (hdrextname == ext.header['EXTNAME'])) + hdrname_match = ((hdrname is not None) and + (hdrname == ext.header['HDRNAME'])) + distname_match = ((distname is not None) and + (distname == ext.header['DISTNAME'])) + if hdrext_match or hdrname_match or distname_match: + hdrlets.append(fobj.index(ext)) + + if open_fobj: + fobj.close() + + if len(hdrlets) == 0: + if hdrname: + kwerr = 'hdrname' + kwval = hdrname + elif hdrext: + kwerr = 'hdrext' + kwval = hdrext + else: + kwerr = 'distname' + kwval = distname + message = """\n + ===================================================== + No valid Headerlet extension found!' + "%s" = %s not found in %s.' % (kwerr, kwval, fname) + ===================================================== + """ + logger.critical(message) + raise ValueError + + return hdrlets + +def verify_hdrname_is_unique(fobj, hdrname): + """ + Verifies that no other HeaderletHDU extension has the specified hdrname. + + Parameters + ---------- + fobj : str, `astropy.io.fits.HDUList` + Name of FITS file or open fits file object + hdrname : str + value of HDRNAME for HeaderletHDU to be compared as unique + + Returns + ------- + unique: bool + If True, no other HeaderletHDU has the specified HDRNAME value + """ + hdrnames_list = get_headerlet_kw_names(fobj) + unique = not(hdrname in hdrnames_list) + + return unique + +def update_versions(sourcehdr, desthdr): + """ + Update keywords which store version numbers + """ + phdukw = {'PYWCSVER': 'Version of PYWCS used to updated the WCS', + 'UPWCSVER': 'Version of STWCS used to updated the WCS' + } + for key in phdukw: + try: + desthdr[key] = (sourcehdr[key], sourcehdr.comments[key]) + except KeyError: + desthdr[key] = (" ", phdukw[key]) + +def update_ref_files(source, dest): + """ + Update the reference files name in the primary header of 'dest' + using values from 'source' + + Parameters + ---------- + source : `astropy.io.fits.Header` + dest : `astropy.io.fits.Header` + """ + logger.info("Updating reference files") + phdukw = {'NPOLFILE': True, + 'IDCTAB': True, + 'D2IMFILE': True, + 'SIPNAME': True, + 'DISTNAME': True} + + for key in phdukw: + try: + try: + del dest[key] + except: + pass + dest.set(key, source[key], source.comments[key]) + except KeyError: + phdukw[key] = False + return phdukw + +def print_summary(summary_cols, summary_dict, pad=2, maxwidth=None, idcol=None, + output=None, clobber=True, quiet=False ): + """ + Print out summary dictionary to STDOUT, and possibly an output file + + """ + nrows = None + if idcol: + nrows = len(idcol['vals']) + + # Find max width of each column + column_widths = {} + for kw in summary_dict: + colwidth = np.array(summary_dict[kw]['width']).max() + if maxwidth: + colwidth = min(colwidth, maxwidth) + column_widths[kw] = colwidth + pad + if nrows is None: + nrows = len(summary_dict[kw]['vals']) + + # print rows now + outstr = '' + # Start with column names + if idcol: + outstr += COLUMN_FMT.format(idcol['name'], width=idcol['width'] + pad) + for kw in summary_cols: + outstr += COLUMN_FMT.format(kw, width=column_widths[kw]) + outstr += '\n' + # Now, add a row for each headerlet + for row in range(nrows): + if idcol: + outstr += COLUMN_FMT.format(idcol['vals'][row], + width=idcol['width']+pad) + for kw in summary_cols: + val = summary_dict[kw]['vals'][row][:(column_widths[kw]-pad)] + outstr += COLUMN_FMT.format(val, width=column_widths[kw]) + outstr += '\n' + if not quiet: + print(outstr) + + # If specified, write info to separate text file + write_file = False + if output: + output = fu.osfn(output) # Expand any environment variables in filename + write_file = True + if os.path.exists(output): + if clobber: + os.remove(output) + else: + print('WARNING: Not writing results to file!') + print(' Output text file ', output, ' already exists.') + print(' Set "clobber" to True or move file before trying again.') + write_file = False + if write_file: + fout = open(output, mode='w') + fout.write(outstr) + fout.close() + +#### Private utility functions +def _create_primary_HDU(fobj, fname, wcsext, destim, hdrname, wcsname, + sipname, npolfile, d2imfile, + nmatch,catalog, wcskey, + author, descrip, history): + # convert input values into valid FITS kw values + if author is None: + author = '' + if descrip is None: + descrip = '' + + sipname, idctab = utils.build_sipname(fobj, fname, sipname) + logger.info("Setting sipname value to %s" % sipname) + + npolname, npolfile = utils.build_npolname(fobj, npolfile) + logger.info("Setting npolfile value to %s" % npolname) + + d2imname, d2imfile = utils.build_d2imname(fobj,d2imfile) + logger.info("Setting d2imfile value to %s" % d2imname) + + distname = utils.build_distname(sipname, npolname, d2imname) + logger.info("Setting distname to %s" % distname) + + # open file and parse comments + if history not in ['', ' ', None, 'INDEF'] and os.path.isfile(history): + f = open(fu.osfn(history)) + history = f.readlines() + f.close() + else: + history = '' + + rms_ra = fobj[wcsext].header.get("CRDER1"+wcskey, 0) + rms_dec = fobj[wcsext].header.get("CRDER2"+wcskey, 0) + if not nmatch: + nmatch = fobj[wcsext].header.get("NMATCH"+wcskey, 0) + if not catalog: + catalog = fobj[wcsext].header.get('CATALOG'+wcskey, "") + # get the version of STWCS used to create the WCS of the science file. + #try: + #upwcsver = fobj[0].header.cards[fobj[0].header.index('UPWCSVER')] + #except KeyError: + #upwcsver = pyfits.Card("UPWCSVER", " ", + #"Version of STWCS used to update the WCS") + #try: + #pywcsver = fobj[0].header.cards[fobj[0].header.index('PYWCSVER')] + #except KeyError: + #pywcsver = pyfits.Card("PYWCSVER", " ", + #"Version of PYWCS used to update the WCS") + upwcsver = fobj[0].header.get('UPWCSVER', "") + pywcsver = fobj[0].header.get('PYWCSVER', "") + # build Primary HDU + phdu = fits.PrimaryHDU() + phdu.header['DESTIM'] = (destim, 'Destination observation root name') + phdu.header['HDRNAME'] = (hdrname, 'Headerlet name') + fmt = "%Y-%m-%dT%H:%M:%S" + phdu.header['DATE'] = (time.strftime(fmt), 'Date FITS file was generated') + phdu.header['WCSNAME'] = (wcsname, 'WCS name') + phdu.header['DISTNAME'] = (distname, 'Distortion model name') + phdu.header['SIPNAME'] = (sipname, + 'origin of SIP polynomial distortion model') + phdu.header['NPOLFILE'] = (npolfile, + 'origin of non-polynmial distortion model') + phdu.header['D2IMFILE'] = (d2imfile, + 'origin of detector to image correction') + phdu.header['IDCTAB'] = (idctab, + 'origin of Polynomial Distortion') + phdu.header['AUTHOR'] = (author, 'headerlet created by this user') + phdu.header['DESCRIP'] = (descrip, + 'Short description of headerlet solution') + phdu.header['RMS_RA'] = (rms_ra, + 'RMS in RA at ref pix of headerlet solution') + phdu.header['RMS_DEC'] = (rms_dec, + 'RMS in Dec at ref pix of headerlet solution') + phdu.header['NMATCH'] = (nmatch, + 'Number of sources used for headerlet solution') + phdu.header['CATALOG'] = (catalog, + 'Astrometric catalog used for headerlet ' + 'solution') + phdu.header['UPWCSVER'] = (upwcsver, "Version of STWCS used to update the WCS") + phdu.header['PYWCSVER'] = (pywcsver, "Version of PYWCS used to update the WCS") + + # clean up history string in order to remove whitespace characters that + # would cause problems with FITS + if isinstance(history, list): + history_str = '' + for line in history: + history_str += line + else: + history_str = history + history_lines = textwrap.wrap(history_str, width=70) + for hline in history_lines: + phdu.header.add_history(hline) + + return phdu + + +#### Public Interface functions +@with_logging +def extract_headerlet(filename, output, extnum=None, hdrname=None, + clobber=False, logging=True): + """ + Finds a headerlet extension in a science file and writes it out as + a headerlet FITS file. + + If both hdrname and extnum are given they should match, if not + raise an Exception + + Parameters + ---------- + filename: string or HDUList or Python list + This specifies the name(s) of science file(s) from which headerlets + will be extracted. + + String input formats supported include use of wild-cards, IRAF-style + '@'-files (given as '@<filename>') and comma-separated list of names. + An input filename (str) will be expanded as necessary to interpret + any environmental variables included in the filename. + If a list of filenames has been specified, it will extract a + headerlet from the same extnum from all filenames. + output: string + Filename or just rootname of output headerlet FITS file + If string does not contain '.fits', it will create a filename with + '_hlet.fits' suffix + extnum: int + Extension number which contains the headerlet to be written out + hdrname: string + Unique name for headerlet, stored as the HDRNAME keyword + It stops if a value is not provided and no extnum has been specified + clobber: bool + If output file already exists, this parameter specifies whether or not + to overwrite that file [Default: False] + logging: boolean + enable logging to a file + + """ + + if isinstance(filename, fits.HDUList): + filename = [filename] + else: + filename, oname = parseinput.parseinput(filename) + + for f in filename: + fobj, fname, close_fobj = parse_filename(f) + frootname = fu.buildNewRootname(fname) + if hdrname in ['', ' ', None, 'INDEF'] and extnum is None: + if close_fobj: + fobj.close() + logger.critical("Expected a valid extnum or hdrname parameter") + raise ValueError + if hdrname is not None: + extn_from_hdrname = find_headerlet_HDUs(fobj, hdrname=hdrname)[0] + if extn_from_hdrname != extnum: + logger.critical("hdrname and extnmu should refer to the same FITS extension") + raise ValueError + else: + hdrhdu = fobj[extn_from_hdrname] + else: + hdrhdu = fobj[extnum] + + if not isinstance(hdrhdu, HeaderletHDU): + logger.critical("Specified extension is not a headerlet") + raise ValueError + + hdrlet = hdrhdu.headerlet + + if output is None: + output = frootname + + if '.fits' in output: + outname = output + else: + outname = '%s_hlet.fits' % output + + hdrlet.tofile(outname, clobber=clobber) + + if close_fobj: + fobj.close() + + +@with_logging +def write_headerlet(filename, hdrname, output=None, sciext='SCI', + wcsname=None, wcskey=None, destim=None, + sipname=None, npolfile=None, d2imfile=None, + author=None, descrip=None, history=None, + nmatch=None, catalog=None, + attach=True, clobber=False, logging=False): + + """ + Save a WCS as a headerlet FITS file. + + This function will create a headerlet, write out the headerlet to a + separate headerlet file, then, optionally, attach it as an extension + to the science image (if it has not already been archived) + + Either wcsname or wcskey must be provided; if both are given, they must + match a valid WCS. + + Updates wcscorr if necessary. + + Parameters + ---------- + filename: string or HDUList or Python list + This specifies the name(s) of science file(s) from which headerlets + will be created and written out. + String input formats supported include use of wild-cards, IRAF-style + '@'-files (given as '@<filename>') and comma-separated list of names. + An input filename (str) will be expanded as necessary to interpret + any environmental variables included in the filename. + hdrname: string + Unique name for this headerlet, stored as HDRNAME keyword + output: string or None + Filename or just rootname of output headerlet FITS file + If string does not contain '.fits', it will create a filename + starting with the science filename and ending with '_hlet.fits'. + If None, a default filename based on the input filename will be + generated for the headerlet FITS filename + sciext: string + name (EXTNAME) of extension that contains WCS to be saved + wcsname: string + name of WCS to be archived, if " ": stop + wcskey: one of A...Z or " " or "PRIMARY" + if " " or "PRIMARY" - archive the primary WCS + destim: string + DESTIM keyword + if NOne, use ROOTNAME or science file name + sipname: string or None (default) + Name of unique file where the polynomial distortion coefficients were + read from. If None, the behavior is: + The code looks for a keyword 'SIPNAME' in the science header + If not found, for HST it defaults to 'IDCTAB' + If there is no SIP model the value is 'NOMODEL' + If there is a SIP model but no SIPNAME, it is set to 'UNKNOWN' + npolfile: string or None (default) + Name of a unique file where the non-polynomial distortion was stored. + If None: + The code looks for 'NPOLFILE' in science header. + If 'NPOLFILE' was not found and there is no npol model, it is set to 'NOMODEL' + If npol model exists, it is set to 'UNKNOWN' + d2imfile: string + Name of a unique file where the detector to image correction was + stored. If None: + The code looks for 'D2IMFILE' in the science header. + If 'D2IMFILE' is not found and there is no d2im correction, + it is set to 'NOMODEL' + If d2im correction exists, but 'D2IMFILE' is missing from science + header, it is set to 'UNKNOWN' + author: string + Name of user who created the headerlet, added as 'AUTHOR' keyword + to headerlet PRIMARY header + descrip: string + Short description of the solution provided by the headerlet + This description will be added as the single 'DESCRIP' keyword + to the headerlet PRIMARY header + history: filename, string or list of strings + Long (possibly multi-line) description of the solution provided + by the headerlet. These comments will be added as 'HISTORY' cards + to the headerlet PRIMARY header + If filename is specified, it will format and attach all text from + that file as the history. + attach: bool + Specify whether or not to attach this headerlet as a new extension + It will verify that no other headerlet extension has been created with + the same 'hdrname' value. + clobber: bool + If output file already exists, this parameter specifies whether or not + to overwrite that file [Default: False] + logging: boolean + enable file logging + """ + + if isinstance(filename, fits.HDUList): + filename = [filename] + else: + filename, oname = parseinput.parseinput(filename) + + for f in filename: + if isinstance(f, str): + fname = f + else: + fname = f.filename() + + if wcsname in [None, ' ', '', 'INDEF'] and wcskey is None: + message = """\n + No valid WCS found found in %s. + A valid value for either "wcsname" or "wcskey" + needs to be specified. + """ % fname + logger.critical(message) + raise ValueError + + # Translate 'wcskey' value for PRIMARY WCS to valid altwcs value of ' ' + if wcskey == 'PRIMARY': + wcskey = ' ' + + if attach: + umode = 'update' + else: + umode = 'readonly' + + fobj, fname, close_fobj = parse_filename(f, mode=umode) + + # Interpret sciext input for this file + if isinstance(sciext, int): + sciextlist = [sciext] # allow for specification of simple FITS header + elif isinstance(sciext, str): + numsciext = countExtn(fobj, sciext) + if numsciext > 0: + sciextlist = [tuple((sciext,i)) for i in range(1, numsciext+1)] + else: + sciextlist = [0] + elif isinstance(sciext, list): + sciextlist = sciext + else: + errstr = "Expected sciext to be a list of FITS extensions with science data\n"+\ + " a valid EXTNAME string, or an integer." + logger.critical(errstr) + raise ValueError + + wnames = altwcs.wcsnames(fobj,ext=sciextlist[0]) + + # Insure that WCSCORR table has been created with all original + # WCS's recorded prior to adding the headerlet WCS + wcscorr.init_wcscorr(fobj) + + if wcsname is None: + scihdr = fobj[sciextlist[0]].header + wname = scihdr['wcsname'+wcskey] + else: + wname = wcsname + if hdrname in [None, ' ', '']: + hdrname = wcsname + + logger.critical('Creating the headerlet from image %s' % fname) + hdrletobj = create_headerlet(fobj, sciext=sciextlist, + wcsname=wname, wcskey=wcskey, + hdrname=hdrname, + sipname=sipname, npolfile=npolfile, + d2imfile=d2imfile, author=author, + descrip=descrip, history=history, + nmatch=nmatch, catalog=catalog, + logging=False) + + if attach: + # Check to see whether or not a HeaderletHDU with + #this hdrname already exists + hdrnames = get_headerlet_kw_names(fobj) + if hdrname not in hdrnames: + hdrlet_hdu = HeaderletHDU.fromheaderlet(hdrletobj) + + if destim is not None: + hdrlet_hdu.header['destim'] = destim + + fobj.append(hdrlet_hdu) + + # Update the WCSCORR table with new rows from the headerlet's WCSs + wcscorr.update_wcscorr(fobj, source=hdrletobj, + extname='SIPWCS', wcs_id=wname) + + utils.updateNEXTENDKw(fobj) + fobj.flush() + else: + message = """ + Headerlet with hdrname %s already archived for WCS %s. + No new headerlet appended to %s. + """ % (hdrname, wname, fname) + logger.critical(message) + + if close_fobj: + logger.info('Closing image in write_headerlet()...') + fobj.close() + + frootname = fu.buildNewRootname(fname) + + if output is None: + # Generate default filename for headerlet FITS file + outname = '{0}_hlet.fits'.format(frootname) + else: + outname = output + + if not outname.endswith('.fits'): + outname = '{0}_{1}_hlet.fits'.format(frootname,outname) + + # If user specifies an output filename for headerlet, write it out + hdrletobj.tofile(outname, clobber=clobber) + logger.critical( 'Created Headerlet file %s ' % outname) + + del hdrletobj + +@with_logging +def create_headerlet(filename, sciext='SCI', hdrname=None, destim=None, + wcskey=" ", wcsname=None, + sipname=None, npolfile=None, d2imfile=None, + author=None, descrip=None, history=None, + nmatch=None, catalog=None, + logging=False, logmode='w'): + """ + Create a headerlet from a WCS in a science file + If both wcskey and wcsname are given they should match, if not + raise an Exception + + Parameters + ---------- + filename: string or HDUList + Either a filename or PyFITS HDUList object for the input science file + An input filename (str) will be expanded as necessary to interpret + any environmental variables included in the filename. + sciext: string or python list (default: 'SCI') + Extension in which the science data with the linear WCS is. + The headerlet will be created from these extensions. + If string - a valid EXTNAME is expected + If int - specifies an extension with a valid WCS, such as 0 for a + simple FITS file + If list - a list of FITS extension numbers or strings representing + extension tuples, e.g. ('SCI, 1') is expected. + hdrname: string + value of HDRNAME keyword + Takes the value from the HDRNAME<wcskey> keyword, if not available from WCSNAME<wcskey> + It stops if neither is found in the science file and a value is not provided + destim: string or None + name of file this headerlet can be applied to + if None, use ROOTNAME keyword + wcskey: char (A...Z) or " " or "PRIMARY" or None + a char representing an alternate WCS to be used for the headerlet + if " ", use the primary (default) + if None use wcsname + wcsname: string or None + if wcskey is None use wcsname specified here to choose an alternate WCS for the headerlet + sipname: string or None (default) + Name of unique file where the polynomial distortion coefficients were + read from. If None, the behavior is: + The code looks for a keyword 'SIPNAME' in the science header + If not found, for HST it defaults to 'IDCTAB' + If there is no SIP model the value is 'NOMODEL' + If there is a SIP model but no SIPNAME, it is set to 'UNKNOWN' + npolfile: string or None (default) + Name of a unique file where the non-polynomial distortion was stored. + If None: + The code looks for 'NPOLFILE' in science header. + If 'NPOLFILE' was not found and there is no npol model, it is set to 'NOMODEL' + If npol model exists, it is set to 'UNKNOWN' + d2imfile: string + Name of a unique file where the detector to image correction was + If None: + The code looks for 'D2IMFILE' in the science header. + If 'D2IMFILE' is not found and there is no d2im correction, + it is set to 'NOMODEL' + If d2im correction exists, but 'D2IMFILE' is missing from science + header, it is set to 'UNKNOWN' + author: string + Name of user who created the headerlet, added as 'AUTHOR' keyword + to headerlet PRIMARY header + descrip: string + Short description of the solution provided by the headerlet + This description will be added as the single 'DESCRIP' keyword + to the headerlet PRIMARY header + history: filename, string or list of strings + Long (possibly multi-line) description of the solution provided + by the headerlet. These comments will be added as 'HISTORY' cards + to the headerlet PRIMARY header + If filename is specified, it will format and attach all text from + that file as the history. + nmatch: int (optional) + Number of sources used in the new solution fit + catalog: string (optional) + Astrometric catalog used for headerlet solution + logging: boolean + enable file logging + logmode: 'w' or 'a' + log file open mode + + Returns + ------- + Headerlet object + + """ + if wcskey == 'O': + message = "Warning: 'O' is a reserved key for the original WCS. Quitting..." + logger.info(message) + return None + + fobj, fname, close_file = parse_filename(filename) + # based on `sciext` create a list of (EXTNAME, EXTVER) tuples + # of extensions with WCS to be saved in a headerlet + sciext = get_extname_extver_list(fobj, sciext) + logger.debug("Data extensions from which to create headerlet:\n\t %s" + % (str(sciext))) + if not sciext: + logger.critical("No valid target extensions found in file %s" % fname) + raise ValueError + + # Define extension to evaluate for verification of input parameters + wcsext = sciext[0] + logger.debug("sciext in create_headerlet is %s" % str(sciext)) + # Translate 'wcskey' value for PRIMARY WCS to valid altwcs value of ' ' + if wcskey == 'PRIMARY': + wcskey = ' ' + logger.info("wcskey reset from 'PRIMARY' to ' '") + wcskey = wcskey.upper() + wcsnamekw = "".join(["WCSNAME", wcskey.upper()]).rstrip() + hdrnamekw = "".join(["HDRNAME", wcskey.upper()]).rstrip() + + wnames = altwcs.wcsnames(fobj, ext=wcsext) + if not wcsname: + # User did not specify a value for 'wcsname' + if wcsnamekw in fobj[wcsext].header: + #check if there's a WCSNAME for this wcskey in the header + wcsname = fobj[wcsext].header[wcsnamekw] + logger.info("Setting wcsname from header[%s] to %s" % (wcsnamekw, wcsname)) + else: + if hdrname not in ['', ' ', None, "INDEF"]: + """ + If wcsname for this wcskey was not provided + and WCSNAME<wcskey> does not exist in the header + and hdrname is provided, then + use hdrname as WCSNAME for the headerlet. + """ + wcsname = hdrname + logger.debug("Setting wcsname from hdrname to %s" % hdrname) + else: + if hdrnamekw in fobj[wcsext].header: + wcsname = fobj[wcsext].header[hdrnamekw] + logger.debug("Setting wcsname from header[%s] to %s" % (hdrnamekw, wcsname)) + else: + message = """ + Required keywords 'HDRNAME' or 'WCSNAME' not found! + Please specify a value for parameter 'hdrname' + or update header with 'WCSNAME' keyword. + """ + logger.critical(message) + raise KeyError + else: + # Verify that 'wcsname' and 'wcskey' values specified by user reference + # the same WCS + wname = fobj[wcsext].header[wcsnamekw] + if wcsname != wname: + message = "\tInconsistent values for 'wcskey' and 'wcsname' specified!\n" + message += " 'wcskey' = %s and 'wcsname' = %s. \n" % (wcskey, wcsname) + message += "Actual value of %s found to be %s. \n" % (wcsnamekw, wname) + logger.critical(message) + raise KeyError + wkeys = altwcs.wcskeys(fobj, ext=wcsext) + if wcskey != ' ': + if wcskey not in wkeys: + logger.critical('No WCS with wcskey=%s found in extension %s. Skipping...' % (wcskey, str(wcsext))) + raise ValueError("No WCS with wcskey=%s found in extension %s. Skipping...' % (wcskey, str(wcsext))") + + # get remaining required keywords + if destim is None: + if 'ROOTNAME' in fobj[0].header: + destim = fobj[0].header['ROOTNAME'] + logger.info("Setting destim to rootname of the science file") + else: + destim = fname + logger.info('DESTIM not provided') + logger.info('Keyword "ROOTNAME" not found') + logger.info('Using file name as DESTIM') + + if not hdrname: + # check if HDRNAME<wcskey> is in header + if hdrnamekw in fobj[wcsext].header: + hdrname = fobj[wcsext].header[hdrnamekw] + else: + if wcsnamekw in fobj[wcsext].header: + hdrname = fobj[wcsext].header[wcsnamekw] + message = """ + Using default value for HDRNAME of "%s" derived from %s. + """ % (hdrname, wcsnamekw) + logger.info(message) + logger.info("Setting hdrname to %s from header[%s]" + % (hdrname, wcsnamekw)) + else: + message = "Required keywords 'HDRNAME' or 'WCSNAME' not found" + logger.critical(message) + raise KeyError + + + + hdul = [] + phdu = _create_primary_HDU(fobj, fname, wcsext, destim, hdrname, wcsname, + sipname, npolfile, d2imfile, + nmatch, catalog, wcskey, + author, descrip, history) + hdul.append(phdu) + wcsdvarr_extns = [] + """ + nd2i is a counter for d2i extensions to be used when the science file + has an old d2i correction format. The old format did not write EXTVER + kw for the d2i correction in the science header bu tthe new format expects + them. + """ + nd2i_extver = 1 + for ext in sciext: + wkeys = altwcs.wcskeys(fobj, ext=ext) + if wcskey != ' ': + if wcskey not in wkeys: + logger.debug( + 'No WCS with wcskey=%s found in extension %s. ' + 'Skipping...' % (wcskey, str(ext))) + raise ValueError("") + + hwcs = HSTWCS(fobj, ext=ext, wcskey=wcskey) + + whdul = hwcs.to_fits(relax=True, key=" ") + if hasattr(hwcs, 'orientat'): + orient_comment = "positions angle of image y axis (deg. e of n)" + whdul[0].header['ORIENTAT'] = (hwcs.orientat, orient_comment) + + whdul[0].header.append(('TG_ENAME', ext[0], 'Target science data extname')) + whdul[0].header.append(('TG_EVER', ext[1], 'Target science data extver')) + + if hwcs.wcs.has_cd(): + whdul[0].header = altwcs.pc2cd(whdul[0].header) + + idckw = hwcs._idc2hdr() + whdul[0].header.extend(idckw) + + if hwcs.det2im1 or hwcs.det2im2: + try: + whdul[0].header.append(fobj[ext].header.cards['D2IMEXT']) + except KeyError: + pass + whdul[0].header.extend(fobj[ext].header.cards['D2IMERR*']) + if 'D2IM1.EXTVER' in whdul[0].header: + try: + whdul[0].header['D2IM1.EXTVER'] = fobj[ext].header['D2IM1.EXTVER'] + except KeyError: + whdul[0].header['D2IM1.EXTVER'] = nd2i_extver + nd2i_extver += 1 + if 'D2IM2.EXTVER' in whdul[0].header: + try: + whdul[0].header['D2IM2.EXTVER'] = fobj[ext].header['D2IM2.EXTVER'] + except KeyError: + whdul[0].header['D2IM2.EXTVER'] = nd2i_extver + nd2i_extver += 1 + + if hwcs.cpdis1 or hwcs.cpdis2: + whdul[0].header.extend(fobj[ext].header.cards['CPERR*']) + try: + whdul[0].header.append(fobj[ext].header.cards['NPOLEXT']) + except KeyError: + pass + if 'DP1.EXTVER' in whdul[0].header: + whdul[0].header['DP1.EXTVER'] = fobj[ext].header['DP1.EXTVER'] + if 'DP2.EXTVER' in whdul[0].header: + whdul[0].header['DP2.EXTVER'] = fobj[ext].header['DP2.EXTVER'] + ihdu = fits.ImageHDU(header=whdul[0].header, name='SIPWCS') + + if ext[0] != "PRIMARY": + ihdu.update_ext_version(fobj[ext].header['EXTVER'], comment='Extension version') + + hdul.append(ihdu) + + if hwcs.cpdis1: + whdu = whdul[('WCSDVARR', 1)].copy() + whdu.update_ext_version(fobj[ext].header['DP1.EXTVER']) + hdul.append(whdu) + if hwcs.cpdis2: + whdu = whdul[('WCSDVARR', 2)].copy() + whdu.update_ext_version(fobj[ext].header['DP2.EXTVER']) + hdul.append(whdu) + + if hwcs.det2im1: + whdu = whdul[('D2IMARR', 1)].copy() + whdu.update_ext_version(ihdu.header['D2IM1.EXTVER']) + hdul.append(whdu) + if hwcs.det2im2: + whdu = whdul[('D2IMARR', 2)].copy() + whdu.update_ext_version(ihdu.header['D2IM2.EXTVER']) + hdul.append(whdu) + + + #if hwcs.det2im1 or hwcs.det2im2: + #try: + #darr = hdul[('D2IMARR', 1)] + #except KeyError: + #whdu = whdul[('D2IMARR')] + #whdu.update_ext_version(1) + #hdul.append(whdu) + if close_file: + fobj.close() + + hlet = Headerlet(hdul, logging=logging, logmode='a') + hlet.init_attrs() + return hlet + +@with_logging +def apply_headerlet_as_primary(filename, hdrlet, attach=True, archive=True, + force=False, logging=False, logmode='a'): + """ + Apply headerlet 'hdrfile' to a science observation 'destfile' as the primary WCS + + Parameters + ---------- + filename: string or list of strings + File name(s) of science observation whose WCS solution will be updated + hdrlet: string or list of strings + Headerlet file(s), must match 1-to-1 with input filename(s) + attach: boolean + True (default): append headerlet to FITS file as a new extension. + archive: boolean + True (default): before updating, create a headerlet with the + WCS old solution. + force: boolean + If True, this will cause the headerlet to replace the current PRIMARY + WCS even if it has a different distortion model. [Default: False] + logging: boolean + enable file logging + logmode: 'w' or 'a' + log file open mode + """ + if not isinstance(filename, list): + filename = [filename] + if not isinstance(hdrlet, list): + hdrlet = [hdrlet] + if len(hdrlet) != len(filename): + logger.critical("Filenames must have matching headerlets. " + "{0:d} filenames and {1:d} headerlets specified".format(len(filename),len(hdrlet))) + + for fname,h in zip(filename,hdrlet): + print("Applying {0} as Primary WCS to {1}".format(h,fname)) + hlet = Headerlet.fromfile(h, logging=logging, logmode=logmode) + hlet.apply_as_primary(fname, attach=attach, archive=archive, + force=force) + + +@with_logging +def apply_headerlet_as_alternate(filename, hdrlet, attach=True, wcskey=None, + wcsname=None, logging=False, logmode='w'): + """ + Apply headerlet to a science observation as an alternate WCS + + Parameters + ---------- + filename: string or list of strings + File name(s) of science observation whose WCS solution will be updated + hdrlet: string or list of strings + Headerlet file(s), must match 1-to-1 with input filename(s) + attach: boolean + flag indicating if the headerlet should be attached as a + HeaderletHDU to fobj. If True checks that HDRNAME is unique + in the fobj and stops if not. + wcskey: string + Key value (A-Z, except O) for this alternate WCS + If None, the next available key will be used + wcsname: string + Name to be assigned to this alternate WCS + WCSNAME is a required keyword in a Headerlet but this allows the + user to change it as desired. + logging: boolean + enable file logging + logmode: 'a' or 'w' + """ + if not isinstance(filename, list): + filename = [filename] + if not isinstance(hdrlet, list): + hdrlet = [hdrlet] + if len(hdrlet) != len(filename): + logger.critical("Filenames must have matching headerlets. " + "{0:d} filenames and {1:d} headerlets specified".format(len(filename),len(hdrlet))) + + for fname,h in zip(filename,hdrlet): + print('Applying {0} as an alternate WCS to {1}'.format(h,fname)) + hlet = Headerlet.fromfile(h, logging=logging, logmode=logmode) + hlet.apply_as_alternate(fname, attach=attach, + wcsname=wcsname, wcskey=wcskey) + + +@with_logging +def attach_headerlet(filename, hdrlet, logging=False, logmode='a'): + """ + Attach Headerlet as an HeaderletHDU to a science file + + Parameters + ---------- + filename: HDUList or list of HDULists + science file(s) to which the headerlet should be applied + hdrlet: string, Headerlet object or list of strings or Headerlet objects + string representing a headerlet file(s), must match 1-to-1 input filename(s) + logging: boolean + enable file logging + logmode: 'a' or 'w' + """ + if not isinstance(filename, list): + filename = [filename] + if not isinstance(hdrlet, list): + hdrlet = [hdrlet] + if len(hdrlet) != len(filename): + logger.critical("Filenames must have matching headerlets. " + "{0:d} filenames and {1:d} headerlets specified".format(len(filename),len(hdrlet))) + + for fname,h in zip(filename,hdrlet): + print('Attaching {0} as Headerlet extension to {1}'.format(h,fname)) + hlet = Headerlet.fromfile(h, logging=logging, logmode=logmode) + hlet.attach_to_file(fname,archive=True) + + +@with_logging +def delete_headerlet(filename, hdrname=None, hdrext=None, distname=None, + logging=False, logmode='w'): + """ + Deletes HeaderletHDU(s) with same HDRNAME from science files + + Notes + ----- + One of hdrname, hdrext or distname should be given. + If hdrname is given - delete a HeaderletHDU with a name HDRNAME from fobj. + If hdrext is given - delete HeaderletHDU in extension. + If distname is given - deletes all HeaderletHDUs with a specific distortion model from fobj. + Updates wcscorr + + Parameters + ---------- + filename: string, HDUList or list of strings + Filename can be specified as a single filename or HDUList, or + a list of filenames + Each input filename (str) will be expanded as necessary to interpret + any environmental variables included in the filename. + hdrname: string or None + HeaderletHDU primary header keyword HDRNAME + hdrext: int, tuple or None + HeaderletHDU FITS extension number + tuple has the form ('HDRLET', 1) + distname: string or None + distortion model as specified in the DISTNAME keyword + logging: boolean + enable file logging + logmode: 'a' or 'w' + """ + if not isinstance(filename, list): + filename = [filename] + + for f in filename: + print("Deleting Headerlet from ",f) + _delete_single_headerlet(f, hdrname=hdrname, hdrext=hdrext, + distname=distname, logging=logging, logmode='a') + +def _delete_single_headerlet(filename, hdrname=None, hdrext=None, distname=None, + logging=False, logmode='w'): + """ + Deletes HeaderletHDU(s) from a SINGLE science file + + Notes + ----- + One of hdrname, hdrext or distname should be given. + If hdrname is given - delete a HeaderletHDU with a name HDRNAME from fobj. + If hdrext is given - delete HeaderletHDU in extension. + If distname is given - deletes all HeaderletHDUs with a specific distortion model from fobj. + Updates wcscorr + + Parameters + ---------- + filename: string or HDUList + Either a filename or PyFITS HDUList object for the input science file + An input filename (str) will be expanded as necessary to interpret + any environmental variables included in the filename. + hdrname: string or None + HeaderletHDU primary header keyword HDRNAME + hdrext: int, tuple or None + HeaderletHDU FITS extension number + tuple has the form ('HDRLET', 1) + distname: string or None + distortion model as specified in the DISTNAME keyword + logging: boolean + enable file logging + logmode: 'a' or 'w' + """ + hdrlet_ind = find_headerlet_HDUs(filename, hdrname=hdrname, hdrext=hdrext, + distname=distname, logging=logging, logmode='a') + if len(hdrlet_ind) == 0: + message = """ + No HDUs deleted... No Headerlet HDUs found with ' + hdrname = %s + hdrext = %s + distname = %s + Please review input parameters and try again. + """ % (hdrname, str(hdrext), distname) + logger.critical(message) + return + + fobj, fname, close_fobj = parse_filename(filename, mode='update') + + # delete row(s) from WCSCORR table now... + # + # + if hdrname not in ['', ' ', None, 'INDEF']: + selections = {'hdrname': hdrname} + elif hdrname in ['', ' ', None, 'INDEF'] and hdrext is not None: + selections = {'hdrname': fobj[hdrext].header['hdrname']} + else: + selections = {'distname': distname} + wcscorr.delete_wcscorr_row(fobj['WCSCORR'].data, selections) + + # delete the headerlet extension now + for hdrind in hdrlet_ind: + del fobj[hdrind] + + utils.updateNEXTENDKw(fobj) + # Update file object with changes + fobj.flush() + # close file, if was opened by this function + if close_fobj: + fobj.close() + logger.critical('Deleted headerlet from extension(s) %s ' % str(hdrlet_ind)) + + +def headerlet_summary(filename, columns=None, pad=2, maxwidth=None, + output=None, clobber=True, quiet=False): + """ + + Print a summary of all HeaderletHDUs in a science file to STDOUT, and + optionally to a text file + The summary includes: + HDRLET_ext_number HDRNAME WCSNAME DISTNAME SIPNAME NPOLFILE D2IMFILE + + Parameters + ---------- + filename: string or HDUList + Either a filename or PyFITS HDUList object for the input science file + An input filename (str) will be expanded as necessary to interpret + any environmental variables included in the filename. + columns: list + List of headerlet PRIMARY header keywords to report in summary + By default (set to None), it will use the default set of keywords + defined as the global list DEFAULT_SUMMARY_COLS + pad: int + Number of padding spaces to put between printed columns + [Default: 2] + maxwidth: int + Maximum column width(not counting padding) for any column in summary + By default (set to None), each column's full width will be used + output: string (optional) + Name of optional output file to record summary. This filename + can contain environment variables. + [Default: None] + clobber: bool + If True, will overwrite any previous output file of same name + quiet: bool + If True, will NOT report info to STDOUT + + """ + if columns is None: + summary_cols = DEFAULT_SUMMARY_COLS + else: + summary_cols = columns + + summary_dict = {} + for kw in summary_cols: + summary_dict[kw] = copy.deepcopy(COLUMN_DICT) + + # Define Extension number column + extnums_col = copy.deepcopy(COLUMN_DICT) + extnums_col['name'] = 'EXTN' + extnums_col['width'] = 6 + + fobj, fname, close_fobj = parse_filename(filename) + # find all HDRLET extensions and combine info into a single summary + for extn in fobj: + if 'extname' in extn.header and extn.header['extname'] == 'HDRLET': + hdrlet_indx = fobj.index_of(('hdrlet', extn.header['extver'])) + try: + ext_cols, ext_summary = extn.headerlet.summary(columns=summary_cols) + extnums_col['vals'].append(hdrlet_indx) + for kw in summary_cols: + for key in COLUMN_DICT: + summary_dict[kw][key].extend(ext_summary[kw][key]) + except: + print("Skipping headerlet") + print("Could not read Headerlet from extension ", hdrlet_indx) + + if close_fobj: + fobj.close() + + # Print out the summary dictionary + print_summary(summary_cols, summary_dict, pad=pad, maxwidth=maxwidth, + idcol=extnums_col, output=output, + clobber=clobber, quiet=quiet) + + +@with_logging +def restore_from_headerlet(filename, hdrname=None, hdrext=None, archive=True, + force=False, logging=False, logmode='w'): + """ + Restores a headerlet as a primary WCS + + Parameters + ---------- + filename: string or HDUList + Either a filename or PyFITS HDUList object for the input science file + An input filename (str) will be expanded as necessary to interpret + any environmental variables included in the filename. + hdrname: string + HDRNAME keyword of HeaderletHDU + hdrext: int or tuple + Headerlet extension number of tuple ('HDRLET',2) + archive: boolean (default: True) + When the distortion model in the headerlet is the same as the distortion model of + the science file, this flag indicates if the primary WCS should be saved as an alternate + nd a headerlet extension. + When the distortion models do not match this flag indicates if the current primary and + alternate WCSs should be archived as headerlet extensions and alternate WCS. + force: boolean (default:False) + When the distortion models of the headerlet and the primary do not match, and archive + is False, this flag forces an update of the primary. + logging: boolean + enable file logging + logmode: 'a' or 'w' + """ + + hdrlet_ind = find_headerlet_HDUs(filename, hdrext=hdrext, hdrname=hdrname) + + fobj, fname, close_fobj = parse_filename(filename, mode='update') + + if len(hdrlet_ind) > 1: + if hdrext: + kwerr = 'hdrext' + kwval = hdrext + else: + kwerr = 'hdrname' + kwval = hdrname + message = """ + Multiple Headerlet extensions found with the same name. + %d Headerlets with "%s" = %s found in %s. + """% (len(hdrlet_ind), kwerr, kwval, fname) + if close_fobj: + fobj.close() + logger.critical(message) + raise ValueError + + hdrlet_indx = hdrlet_ind[0] + + # read headerlet from HeaderletHDU into memory + if hasattr(fobj[hdrlet_ind[0]], 'hdulist'): + hdrlet = fobj[hdrlet_indx].hdulist + else: + hdrlet = fobj[hdrlet_indx].headerlet # older convention in PyFITS + + # read in the names of the extensions which HeaderletHDU updates + extlist = [] + for ext in hdrlet: + if 'extname' in ext.header and ext.header['extname'] == 'SIPWCS': + # convert from string to tuple or int + sciext = eval(ext.header['sciext']) + extlist.append(fobj[sciext]) + # determine whether distortion is the same + current_distname = hdrlet[0].header['distname'] + same_dist = True + if current_distname != fobj[0].header['distname']: + same_dist = False + if not archive and not force: + if close_fobj: + fobj.close() + message = """ + Headerlet does not have the same distortion as image! + Set "archive"=True to save old distortion model, or + set "force"=True to overwrite old model with new. + """ + logger.critical(message) + raise ValueError + + # check whether primary WCS has been archived already + # Use information from first 'SCI' extension + priwcs_name = None + + scihdr = extlist[0].header + sci_wcsnames = altwcs.wcsnames(scihdr).values() + if 'hdrname' in scihdr: + priwcs_hdrname = scihdr['hdrname'] + else: + if 'wcsname' in scihdr: + priwcs_hdrname = priwcs_name = scihdr['wcsname'] + else: + if 'idctab' in scihdr: + priwcs_hdrname = ''.join(['IDC_', + utils.extract_rootname(scihdr['idctab'], suffix='_idc')]) + else: + priwcs_hdrname = 'UNKNOWN' + priwcs_name = priwcs_hdrname + scihdr['WCSNAME'] = priwcs_name + + priwcs_unique = verify_hdrname_is_unique(fobj, priwcs_hdrname) + if archive and priwcs_unique: + if priwcs_unique: + newhdrlet = create_headerlet(fobj, sciext=scihdr['extname'], + hdrname=priwcs_hdrname) + newhdrlet.attach_to_file(fobj) + # + # copy hdrlet as a primary + # + hdrlet.apply_as_primary(fobj, attach=False, archive=archive, force=force) + + utils.updateNEXTENDKw(fobj) + fobj.flush() + if close_fobj: + fobj.close() + + +@with_logging +def restore_all_with_distname(filename, distname, primary, archive=True, + sciext='SCI', logging=False, logmode='w'): + """ + Restores all HeaderletHDUs with a given distortion model as alternate WCSs and a primary + + Parameters + -------------- + filename: string or HDUList + Either a filename or PyFITS HDUList object for the input science file + An input filename (str) will be expanded as necessary to interpret + any environmental variables included in the filename. + distname: string + distortion model as represented by a DISTNAME keyword + primary: int or string or None + HeaderletHDU to be restored as primary + if int - a fits extension + if string - HDRNAME + if None - use first HeaderletHDU + archive: boolean (default True) + flag indicating if HeaderletHDUs should be created from the + primary and alternate WCSs in fname before restoring all matching + headerlet extensions + logging: boolean + enable file logging + logmode: 'a' or 'w' + """ + + fobj, fname, close_fobj = parse_filename(filename, mode='update') + + hdrlet_ind = find_headerlet_HDUs(fobj, distname=distname) + if len(hdrlet_ind) == 0: + message = """ + No Headerlet extensions found with + + DISTNAME = %s in %s. + + For a full list of DISTNAMEs found in all headerlet extensions: + + get_headerlet_kw_names(fobj, kw='DISTNAME') + """ % (distname, fname) + if close_fobj: + fobj.close() + logger.critical(message) + raise ValueError + + # Interpret 'primary' parameter input into extension number + if primary is None: + primary_ind = hdrlet_ind[0] + elif isinstance(primary, int): + primary_ind = primary + else: + primary_ind = None + for ind in hdrlet_ind: + if fobj[ind].header['hdrname'] == primary: + primary_ind = ind + break + if primary_ind is None: + if close_fobj: + fobj.close() + message = """ + No Headerlet extensions found with DISTNAME = %s in %s. + """ % (primary, fname) + logger.critical(message) + raise ValueError + # Check to see whether 'primary' HeaderletHDU has same distname as user + # specified on input + + # read headerlet from HeaderletHDU into memory + if hasattr(fobj[primary_ind], 'hdulist'): + primary_hdrlet = fobj[primary_ind].hdulist + else: + primary_hdrlet = fobj[primary_ind].headerlet # older convention in PyFITS + pri_distname = primary_hdrlet[0].header['distname'] + if pri_distname != distname: + if close_fobj: + fobj.close() + message = """ + Headerlet extension to be used as PRIMARY WCS + has "DISTNAME" = %s + "DISTNAME" = %s was specified on input. + All updated WCSs must have same DISTNAME. Quitting...' + """ % (pri_distname, distname) + logger.critical(message) + raise ValueError + + # read in the names of the WCSs which the HeaderletHDUs will update + wnames = altwcs.wcsnames(fobj[sciext, 1].header) + + # work out how many HeaderletHDUs will be used to update the WCSs + numhlt = len(hdrlet_ind) + hdrnames = get_headerlet_kw_names(fobj, kw='wcsname') + + # read in headerletHDUs and update WCS keywords + for hlet in hdrlet_ind: + if fobj[hlet].header['distname'] == distname: + if hasattr(fobj[hlet], 'hdulist'): + hdrlet = fobj[hlet].hdulist + else: + hdrlet = fobj[hlet].headerlet # older convention in PyFITS + if hlet == primary_ind: + hdrlet.apply_as_primary(fobj, attach=False, + archive=archive, force=True) + else: + hdrlet.apply_as_alternate(fobj, attach=False, + wcsname=hdrlet[0].header['wcsname']) + + utils.updateNEXTENDKw(fobj) + fobj.flush() + if close_fobj: + fobj.close() + + +@with_logging +def archive_as_headerlet(filename, hdrname, sciext='SCI', + wcsname=None, wcskey=None, destim=None, + sipname=None, npolfile=None, d2imfile=None, + author=None, descrip=None, history=None, + nmatch=None, catalog=None, + logging=False, logmode='w'): + """ + Save a WCS as a headerlet extension and write it out to a file. + + This function will create a headerlet, attach it as an extension to the + science image (if it has not already been archived) then, optionally, + write out the headerlet to a separate headerlet file. + + Either wcsname or wcskey must be provided, if both are given, they must match a valid WCS + Updates wcscorr if necessary. + + Parameters + ---------- + filename: string or HDUList + Either a filename or PyFITS HDUList object for the input science file + An input filename (str) will be expanded as necessary to interpret + any environmental variables included in the filename. + hdrname: string + Unique name for this headerlet, stored as HDRNAME keyword + sciext: string + name (EXTNAME) of extension that contains WCS to be saved + wcsname: string + name of WCS to be archived, if " ": stop + wcskey: one of A...Z or " " or "PRIMARY" + if " " or "PRIMARY" - archive the primary WCS + destim: string + DESTIM keyword + if NOne, use ROOTNAME or science file name + sipname: string or None (default) + Name of unique file where the polynomial distortion coefficients were + read from. If None, the behavior is: + The code looks for a keyword 'SIPNAME' in the science header + If not found, for HST it defaults to 'IDCTAB' + If there is no SIP model the value is 'NOMODEL' + If there is a SIP model but no SIPNAME, it is set to 'UNKNOWN' + npolfile: string or None (default) + Name of a unique file where the non-polynomial distortion was stored. + If None: + The code looks for 'NPOLFILE' in science header. + If 'NPOLFILE' was not found and there is no npol model, it is set to 'NOMODEL' + If npol model exists, it is set to 'UNKNOWN' + d2imfile: string + Name of a unique file where the detector to image correction was + stored. If None: + The code looks for 'D2IMFILE' in the science header. + If 'D2IMFILE' is not found and there is no d2im correction, + it is set to 'NOMODEL' + If d2im correction exists, but 'D2IMFILE' is missing from science + header, it is set to 'UNKNOWN' + author: string + Name of user who created the headerlet, added as 'AUTHOR' keyword + to headerlet PRIMARY header + descrip: string + Short description of the solution provided by the headerlet + This description will be added as the single 'DESCRIP' keyword + to the headerlet PRIMARY header + history: filename, string or list of strings + Long (possibly multi-line) description of the solution provided + by the headerlet. These comments will be added as 'HISTORY' cards + to the headerlet PRIMARY header + If filename is specified, it will format and attach all text from + that file as the history. + logging: boolean + enable file folling + logmode: 'w' or 'a' + log file open mode + """ + + fobj, fname, close_fobj = parse_filename(filename, mode='update') + + if wcsname in [None, ' ', '', 'INDEF'] and wcskey is None: + message = """ + No valid WCS found found in %s. + A valid value for either "wcsname" or "wcskey" + needs to be specified. + """ % fname + if close_fobj: + fobj.close() + logger.critical(message) + raise ValueError + + # Translate 'wcskey' value for PRIMARY WCS to valid altwcs value of ' ' + if wcskey == 'PRIMARY': + wcskey = ' ' + wcskey = wcskey.upper() + + numhlt = countExtn(fobj, 'HDRLET') + + if wcsname is None: + scihdr = fobj[sciext, 1].header + wcsname = scihdr['wcsname'+wcskey] + + if hdrname in [None, ' ', '']: + hdrname = wcsname + + # Check to see whether or not a HeaderletHDU with this hdrname already + # exists + hdrnames = get_headerlet_kw_names(fobj) + if hdrname not in hdrnames: + hdrletobj = create_headerlet(fobj, sciext=sciext, + wcsname=wcsname, wcskey=wcskey, + hdrname=hdrname, + sipname=sipname, npolfile=npolfile, + d2imfile=d2imfile, author=author, + descrip=descrip, history=history, + nmatch=nmatch, catalog=catalog, + logging=False) + hlt_hdu = HeaderletHDU.fromheaderlet(hdrletobj) + + if destim is not None: + hlt_hdu[0].header['destim'] = destim + + fobj.append(hlt_hdu) + + utils.updateNEXTENDKw(fobj) + fobj.flush() + else: + message = """ + Headerlet with hdrname %s already archived for WCS %s + No new headerlet appended to %s . + """ % (hdrname, wcsname, fname) + logger.critical(message) + + if close_fobj: + fobj.close() + +#### Headerlet Class definitions +class Headerlet(fits.HDUList): + """ + A Headerlet class + Ref: http://mediawiki.stsci.edu/mediawiki/index.php/Telescopedia:Headerlets + """ + + def __init__(self, hdus=[], file=None, logging=False, logmode='w'): + """ + Parameters + ---------- + hdus : list + List of HDUs to be used to create the headerlet object itself + file: string + File-like object from which HDUs should be read + logging: boolean + enable file logging + logmode: 'w' or 'a' + for internal use only, indicates whether the log file + should be open in attach or write mode + """ + self.logging = logging + init_logging('class Headerlet', level=logging, mode=logmode) + + super(Headerlet, self).__init__(hdus, file=file) + + def init_attrs(self): + self.fname = self.filename() + self.hdrname = self[0].header["HDRNAME"] + self.wcsname = self[0].header["WCSNAME"] + self.upwcsver = self[0].header.get("UPWCSVER", "") + self.pywcsver = self[0].header.get("PYWCSVER", "") + self.destim = self[0].header["DESTIM"] + self.sipname = self[0].header["SIPNAME"] + self.idctab = self[0].header["IDCTAB"] + self.npolfile = self[0].header["NPOLFILE"] + self.d2imfile = self[0].header["D2IMFILE"] + self.distname = self[0].header["DISTNAME"] + + try: + self.vafactor = self[("SIPWCS", 1)].header.get("VAFACTOR", 1) #None instead of 1? + except (IndexError, KeyError): + self.vafactor = self[0].header.get("VAFACTOR", 1) #None instead of 1? + self.author = self[0].header["AUTHOR"] + self.descrip = self[0].header["DESCRIP"] + + self.fit_kws = ['HDRNAME', 'NMATCH', 'CATALOG'] + self.history = '' + # header['HISTORY'] returns an iterable of all HISTORY values + if 'HISTORY' in self[0].header: + for hist in self[0].header['HISTORY']: + self.history += hist + '\n' + + self.d2imerr = 0 + self.axiscorr = 1 + + # Overridden to support the Headerlet logging features + @classmethod + def fromfile(cls, fileobj, mode='readonly', memmap=False, + save_backup=False, logging=False, logmode='w', **kwargs): + hlet = super(cls, cls).fromfile(fileobj, mode, memmap, save_backup, + **kwargs) + if len(hlet) > 0: + hlet.init_attrs() + hlet.logging = logging + init_logging('class Headerlet', level=logging, mode=logmode) + return hlet + + @classmethod + def fromstring(cls, data, **kwargs): + hlet = super(cls, cls).fromstring(data, **kwargs) + hlet.logging = logging + init_logging('class Headerlet', level=logging, mode=logmode) + return hlet + + def apply_as_primary(self, fobj, attach=True, archive=True, force=False): + """ + Copy this headerlet as a primary WCS to fobj + + Parameters + ---------- + fobj: string, HDUList + science file to which the headerlet should be applied + attach: boolean + flag indicating if the headerlet should be attached as a + HeaderletHDU to fobj. If True checks that HDRNAME is unique + in the fobj and stops if not. + archive: boolean (default is True) + When the distortion model in the headerlet is the same as the + distortion model of the science file, this flag indicates if + the primary WCS should be saved as an alternate and a headerlet + extension. + When the distortion models do not match this flag indicates if + the current primary and alternate WCSs should be archived as + headerlet extensions and alternate WCS. + force: boolean (default is False) + When the distortion models of the headerlet and the primary do + not match, and archive is False this flag forces an update + of the primary + """ + self.hverify() + fobj, fname, close_dest = parse_filename(fobj, mode='update') + if not self.verify_dest(fobj, fname): + if close_dest: + fobj.close() + raise ValueError("Destination name does not match headerlet" + "Observation %s cannot be updated with headerlet %s" % (fname, self.hdrname)) + + # Check to see whether the distortion model in the destination + # matches the distortion model in the headerlet being applied + + dname = self.get_destination_model(fobj) + dist_models_equal = self.equal_distmodel(dname) + if not dist_models_equal and not force: + raise ValueError("Distortion models do not match" + " To overwrite the distortion model, set force=True") + + orig_hlt_hdu = None + numhlt = countExtn(fobj, 'HDRLET') + hdrlet_extnames = get_headerlet_kw_names(fobj) + + # Insure that WCSCORR table has been created with all original + # WCS's recorded prior to adding the headerlet WCS + wcscorr.init_wcscorr(fobj) + + + ### start archive + # If archive has been specified + # regardless of whether or not the distortion models are equal... + + numsip = countExtn(self, 'SIPWCS') + sciext_list = [] + alt_hlethdu = [] + for i in range(1, numsip+1): + sipheader = self[('SIPWCS', i)].header + sciext_list.append((sipheader['TG_ENAME'], sipheader['TG_EVER'])) + target_ext = sciext_list[0] + if archive: + if 'wcsname' in fobj[target_ext].header: + hdrname = fobj[target_ext].header['WCSNAME'] + wcsname = hdrname + else: + hdrname = fobj[0].header['ROOTNAME'] + '_orig' + wcsname = None + if hdrname not in hdrlet_extnames: + # - if WCS has not been saved, write out WCS as headerlet extension + # Create a headerlet for the original Primary WCS data in the file, + # create an HDU from the original headerlet, and append it to + # the file + orig_hlt = create_headerlet(fobj, sciext=sciext_list, #[target_ext], + wcsname=wcsname, + hdrname=hdrname, + logging=self.logging) + orig_hlt_hdu = HeaderletHDU.fromheaderlet(orig_hlt) + numhlt += 1 + orig_hlt_hdu.header['EXTVER'] = numhlt + logger.info("Created headerlet %s to be attached to file" % hdrname) + else: + logger.info("Headerlet with name %s is already attached" % hdrname) + + + if dist_models_equal: + # Use the WCSNAME to determine whether or not to archive + # Primary WCS as altwcs + # wcsname = hwcs.wcs.name + scihdr = fobj[target_ext].header + if 'hdrname' in scihdr: + priwcs_name = scihdr['hdrname'] + else: + if 'wcsname' in scihdr: + priwcs_name = scihdr['wcsname'] + else: + if 'idctab' in scihdr: + priwcs_name = ''.join(['IDC_', + utils.extract_rootname(scihdr['idctab'], + suffix='_idc')]) + else: + priwcs_name = 'UNKNOWN' + nextkey = altwcs.next_wcskey(fobj, ext=target_ext) + altwcs.archiveWCS(fobj, ext=sciext_list, wcskey=nextkey, + wcsname=priwcs_name) + else: + + for hname in altwcs.wcsnames(fobj, ext=target_ext).values(): + if hname != 'OPUS' and hname not in hdrlet_extnames: + # get HeaderletHDU for alternate WCS as well + alt_hlet = create_headerlet(fobj, sciext=sciext_list, + wcsname=hname, wcskey=wcskey, + hdrname=hname, sipname=None, + npolfile=None, d2imfile=None, + author=None, descrip=None, history=None, + logging=self.logging) + numhlt += 1 + alt_hlet_hdu = HeaderletHDU.fromheaderlet(alt_hlet) + alt_hlet_hdu.header['EXTVER'] = numhlt + alt_hlethdu.append(alt_hlet_hdu) + hdrlet_extnames.append(hname) + + self._del_dest_WCS_ext(fobj) + for i in range(1, numsip+1): + target_ext = sciext_list[i-1] + self._del_dest_WCS(fobj, target_ext) + sipwcs = HSTWCS(self, ('SIPWCS', i)) + idckw = sipwcs._idc2hdr() + priwcs = sipwcs.to_fits(relax=True) + numnpol = 1 + numd2im = 1 + if sipwcs.wcs.has_cd(): + priwcs[0].header = altwcs.pc2cd(priwcs[0].header) + priwcs[0].header.extend(idckw) + if 'crder1' in sipheader: + for card in sipheader['crder*'].cards: + priwcs[0].header.set(card.keyword, card.value, card.comment, + after='WCSNAME') + # Update WCS with HDRNAME as well + + for kw in ['SIMPLE', 'BITPIX', 'NAXIS', 'EXTEND']: + try: + priwcs[0].header.remove(kw) + except ValueError: + pass + + priwcs[0].header.set('WCSNAME', self[0].header['WCSNAME'], "") + priwcs[0].header.set('WCSAXES', self[('SIPWCS', i)].header['WCSAXES'], "") + priwcs[0].header.set('HDRNAME', self[0].header['HDRNAME'], "") + if sipwcs.det2im1 or sipwcs.det2im2: + try: + d2imerr = self[('SIPWCS', i)].header['D2IMERR*'] + priwcs[0].header.extend(d2imerr) + except KeyError: + pass + try: + priwcs[0].header.append(self[('SIPWCS', i)].header.cards['D2IMEXT']) + except KeyError: + pass + if 'D2IM1.EXTVER' in priwcs[0].header: + priwcs[0].header['D2IM1.EXTVER'] = self[('SIPWCS', i)].header['D2IM1.EXTVER'] + priwcs[('D2IMARR', 1)].header['EXTVER'] = self[('SIPWCS', i)].header['D2IM1.EXTVER'] + if 'D2IM2.EXTVER' in priwcs[0].header: + priwcs[0].header['D2IM2.EXTVER'] = self[('SIPWCS', i)].header['D2IM2.EXTVER'] + priwcs[('D2IMARR', 2)].header['EXTVER'] = self[('SIPWCS', i)].header['D2IM2.EXTVER'] + # D2IM1 will NOT exist for WFPC2 data... + if 'D2IM1.EXTVER' in priwcs[0].header: + # only set number of D2IM extensions to 2 if D2IM1 exists + numd2im = 2 + + if sipwcs.cpdis1 or sipwcs.cpdis2: + try: + cperr = self[('SIPWCS', i)].header['CPERR*'] + priwcs[0].header.extend(cperr) + except KeyError: + pass + try: + priwcs[0].header.append(self[('SIPWCS', i)].header.cards['NPOLEXT']) + except KeyError: + pass + if 'DP1.EXTVER' in priwcs[0].header: + priwcs[0].header['DP1.EXTVER'] = self[('SIPWCS', i)].header['DP1.EXTVER'] + priwcs[('WCSDVARR', 1)].header['EXTVER'] = self[('SIPWCS', i)].header['DP1.EXTVER'] + if 'DP2.EXTVER' in priwcs[0].header: + priwcs[0].header['DP2.EXTVER'] = self[('SIPWCS', i)].header['DP2.EXTVER'] + priwcs[('WCSDVARR', 2)].header['EXTVER'] = self[('SIPWCS', i)].header['DP2.EXTVER'] + numnpol = 2 + + fobj[target_ext].header.extend(priwcs[0].header) + if sipwcs.cpdis1: + whdu = priwcs[('WCSDVARR', (i-1)*numnpol+1)].copy() + whdu.update_ext_version(self[('SIPWCS', i)].header['DP1.EXTVER']) + fobj.append(whdu) + if sipwcs.cpdis2: + whdu = priwcs[('WCSDVARR', i*numnpol)].copy() + whdu.update_ext_version(self[('SIPWCS', i)].header['DP2.EXTVER']) + fobj.append(whdu) + if sipwcs.det2im1: #or sipwcs.det2im2: + whdu = priwcs[('D2IMARR', (i-1)*numd2im+1)].copy() + whdu.update_ext_version(self[('SIPWCS', i)].header['D2IM1.EXTVER']) + fobj.append(whdu) + if sipwcs.det2im2: + whdu = priwcs[('D2IMARR', i*numd2im)].copy() + whdu.update_ext_version(self[('SIPWCS', i)].header['D2IM2.EXTVER']) + fobj.append(whdu) + + update_versions(self[0].header, fobj[0].header) + refs = update_ref_files(self[0].header, fobj[0].header) + # Update the WCSCORR table with new rows from the headerlet's WCSs + wcscorr.update_wcscorr(fobj, self, 'SIPWCS') + + # Append the original headerlet + if archive and orig_hlt_hdu: + fobj.append(orig_hlt_hdu) + # Append any alternate WCS Headerlets + if len(alt_hlethdu) > 0: + for ahdu in alt_hlethdu: + fobj.append(ahdu) + if attach: + # Finally, append an HDU for this headerlet + self.attach_to_file(fobj) + utils.updateNEXTENDKw(fobj) + if close_dest: + fobj.close() + + + def apply_as_alternate(self, fobj, attach=True, wcskey=None, wcsname=None): + """ + Copy this headerlet as an alternate WCS to fobj + + Parameters + ---------- + fobj: string, HDUList + science file/HDUList to which the headerlet should be applied + attach: boolean + flag indicating if the headerlet should be attached as a + HeaderletHDU to fobj. If True checks that HDRNAME is unique + in the fobj and stops if not. + wcskey: string + Key value (A-Z, except O) for this alternate WCS + If None, the next available key will be used + wcsname: string + Name to be assigned to this alternate WCS + WCSNAME is a required keyword in a Headerlet but this allows the + user to change it as desired. + + """ + self.hverify() + fobj, fname, close_dest = parse_filename(fobj, mode='update') + if not self.verify_dest(fobj, fname): + if close_dest: + fobj.close() + raise ValueError("Destination name does not match headerlet" + "Observation %s cannot be updated with headerlet %s" % (fname, self.hdrname)) + + # Verify whether this headerlet has the same distortion + #found in the image being updated + dname = self.get_destination_model(fobj) + dist_models_equal = self.equal_distmodel(dname) + if not dist_models_equal: + raise ValueError("Distortion models do not match \n" + "Headerlet: %s \n" + "Destination file: %s\n" + "attach_to_file() can be used to append this headerlet" %(self.distname, dname)) + + # Insure that WCSCORR table has been created with all original + # WCS's recorded prior to adding the headerlet WCS + wcscorr.init_wcscorr(fobj) + + # determine value of WCSNAME to be used + if wcsname is not None: + wname = wcsname + else: + wname = self[0].header['WCSNAME'] + tg_ename = self[('SIPWCS', 1)].header['TG_ENAME'] + tg_ever = self[('SIPWCS', 1)].header['TG_EVER'] + # determine what alternate WCS this headerlet will be assigned to + if wcskey is None: + wkey = altwcs.next_wcskey(fobj[(tg_ename, tg_ever)].header) + else: + wcskey = wcskey.upper() + available_keys = altwcs.available_wcskeys(fobj[(tg_ename, tg_ever)].header) + if wcskey in available_keys: + wkey = wcskey + else: + mess = "Observation %s already contains alternate WCS with key %s" % (fname, wcskey) + logger.critical(mess) + if close_dest: + fobj.close() + raise ValueError(mess) + numsip = countExtn(self, 'SIPWCS') + for idx in range(1, numsip + 1): + siphdr = self[('SIPWCS', idx)].header + tg_ext = (siphdr['TG_ENAME'], siphdr['TG_EVER']) + + fhdr = fobj[tg_ext].header + hwcs = pywcs.WCS(siphdr, self) + hwcs_header = hwcs.to_header(key=wkey) + _idc2hdr(siphdr, fhdr, towkey=wkey) + if hwcs.wcs.has_cd(): + hwcs_header = altwcs.pc2cd(hwcs_header, key=wkey) + + fhdr.extend(hwcs_header) + fhdr['WCSNAME' + wkey] = wname + # also update with HDRNAME (a non-WCS-standard kw) + for kw in self.fit_kws: + #fhdr.insert(wind, pyfits.Card(kw + wkey, + # self[0].header[kw])) + fhdr.append(fits.Card(kw + wkey, self[0].header[kw])) + # Update the WCSCORR table with new rows from the headerlet's WCSs + wcscorr.update_wcscorr(fobj, self, 'SIPWCS') + + if attach: + self.attach_to_file(fobj) + utils.updateNEXTENDKw(fobj) + + if close_dest: + fobj.close() + + def attach_to_file(self, fobj, archive=False): + """ + Attach Headerlet as an HeaderletHDU to a science file + + Parameters + ---------- + fobj: string, HDUList + science file/HDUList to which the headerlet should be applied + archive: string + Specifies whether or not to update WCSCORR table when attaching + + Notes + ----- + The algorithm used by this method: + - verify headerlet can be applied to this file (based on DESTIM) + - verify that HDRNAME is unique for this file + - attach as HeaderletHDU to fobj + + """ + self.hverify() + fobj, fname, close_dest = parse_filename(fobj, mode='update') + destver = self.verify_dest(fobj, fname) + hdrver = self.verify_hdrname(fobj) + if destver and hdrver: + + numhlt = countExtn(fobj, 'HDRLET') + new_hlt = HeaderletHDU.fromheaderlet(self) + new_hlt.header['extver'] = numhlt + 1 + fobj.append(new_hlt) + utils.updateNEXTENDKw(fobj) + else: + message = "Headerlet %s cannot be attached to" % (self.hdrname) + message += "observation %s" % (fname) + if not destver: + message += " * Image %s keyword ROOTNAME not equal to " % (fname) + message += " DESTIM = '%s'\n" % (self.destim) + if not hdrver: + message += " * Image %s already has headerlet " % (fname) + message += "with HDRNAME='%s'\n" % (self.hdrname) + logger.critical(message) + if close_dest: + fobj.close() + + def info(self, columns=None, pad=2, maxwidth=None, + output=None, clobber=True, quiet=False): + """ + Prints a summary of this headerlet + The summary includes: + HDRNAME WCSNAME DISTNAME SIPNAME NPOLFILE D2IMFILE + + Parameters + ---------- + columns: list + List of headerlet PRIMARY header keywords to report in summary + By default (set to None), it will use the default set of keywords + defined as the global list DEFAULT_SUMMARY_COLS + pad: int + Number of padding spaces to put between printed columns + [Default: 2] + maxwidth: int + Maximum column width(not counting padding) for any column in summary + By default (set to None), each column's full width will be used + output: string (optional) + Name of optional output file to record summary. This filename + can contain environment variables. + [Default: None] + clobber: bool + If True, will overwrite any previous output file of same name + quiet: bool + If True, will NOT report info to STDOUT + + """ + summary_cols, summary_dict = self.summary(columns=columns) + print_summary(summary_cols, summary_dict, pad=pad, maxwidth=maxwidth, + idcol=None, output=output, clobber=clobber, quiet=quiet) + + def summary(self, columns=None): + """ + Returns a summary of this headerlet as a dictionary + + The summary includes a summary of the distortion model as : + HDRNAME WCSNAME DISTNAME SIPNAME NPOLFILE D2IMFILE + + Parameters + ---------- + columns: list + List of headerlet PRIMARY header keywords to report in summary + By default(set to None), it will use the default set of keywords + defined as the global list DEFAULT_SUMMARY_COLS + + Returns + ------- + summary: dict + Dictionary of values for summary + """ + if columns is None: + summary_cols = DEFAULT_SUMMARY_COLS + else: + summary_cols = columns + + # Initialize summary dict based on requested columns + summary = {} + for kw in summary_cols: + summary[kw] = copy.deepcopy(COLUMN_DICT) + + # Populate the summary with headerlet values + for kw in summary_cols: + if kw in self[0].header: + val = self[0].header[kw] + else: + val = 'INDEF' + summary[kw]['vals'].append(val) + summary[kw]['width'].append(max(len(val), len(kw))) + + return summary_cols, summary + + def hverify(self): + """ + Verify the headerlet file is a valid fits file and has + the required Primary Header keywords + """ + self.verify() + header = self[0].header + assert('DESTIM' in header and header['DESTIM'].strip()) + assert('HDRNAME' in header and header['HDRNAME'].strip()) + assert('UPWCSVER' in header) + + def verify_hdrname(self, dest): + """ + Verifies that the headerlet can be applied to the observation + + Reports whether or not this file already has a headerlet with this + HDRNAME. + """ + unique = verify_hdrname_is_unique(dest, self.hdrname) + logger.debug("verify_hdrname() returned %s"%unique) + return unique + + def get_destination_model(self, dest): + """ + Verifies that the headerlet can be applied to the observation + + Determines whether or not the file specifies the same distortion + model/reference files. + """ + destim_opened = False + if not isinstance(dest, fits.HDUList): + destim = fits.open(dest) + destim_opened = True + else: + destim = dest + dname = destim[0].header['DISTNAME'] if 'distname' in destim[0].header \ + else self.build_distname(dest) + if destim_opened: + destim.close() + return dname + + def equal_distmodel(self, dmodel): + if dmodel != self[0].header['DISTNAME']: + if self.logging: + message = """ + Distortion model in headerlet not the same as destination model + Headerlet model : %s + Destination model: %s + """ % (self[0].header['DISTNAME'], dmodel) + logger.critical(message) + return False + else: + return True + + def verify_dest(self, dest, fname): + """ + verifies that the headerlet can be applied to the observation + + DESTIM in the primary header of the headerlet must match ROOTNAME + of the science file (or the name of the destination file) + """ + try: + if not isinstance(dest, fits.HDUList): + droot = fits.getval(dest, 'ROOTNAME') + else: + droot = dest[0].header['ROOTNAME'] + except KeyError: + logger.debug("Keyword 'ROOTNAME' not found in destination file") + droot = dest.split('.fits')[0] + if droot == self.destim: + logger.debug("verify_destim() returned True") + return True + else: + logger.debug("verify_destim() returned False") + logger.critical("Destination name does not match headerlet. " + "Observation %s cannot be updated with headerlet %s" % (fname, self.hdrname)) + return False + + def build_distname(self, dest): + """ + Builds the DISTNAME for dest based on reference file names. + """ + + try: + npolfile = dest[0].header['NPOLFILE'] + except KeyError: + npolfile = None + try: + d2imfile = dest[0].header['D2IMFILE'] + except KeyError: + d2imfile = None + + sipname, idctab = utils.build_sipname(dest, dest, None) + npolname, npolfile = utils.build_npolname(dest, npolfile) + d2imname, d2imfile = utils.build_d2imname(dest, d2imfile) + dname = utils.build_distname(sipname,npolname,d2imname) + return dname + + def tofile(self, fname, destim=None, hdrname=None, clobber=False): + """ + Write this headerlet to a file + + Parameters + ---------- + fname: string + file name + destim: string (optional) + provide a value for DESTIM keyword + hdrname: string (optional) + provide a value for HDRNAME keyword + clobber: boolean + a flag which allows to overwrte an existing file + """ + if not destim or not hdrname: + self.hverify() + self.writeto(fname, clobber=clobber) + + def _del_dest_WCS(self, dest, ext=None): + """ + Delete the WCS of a science file extension + """ + + logger.info("Deleting all WCSs of file %s" % dest.filename()) + numext = len(dest) + + if ext: + fext = dest[ext] + self._remove_d2im(fext) + self._remove_sip(fext) + self._remove_lut(fext) + self._remove_primary_WCS(fext) + self._remove_idc_coeffs(fext) + self._remove_fit_values(fext) + else: + for idx in range(numext): + # Only delete WCS from extensions which may have WCS keywords + if ('XTENSION' in dest[idx].header and + dest[idx].header['XTENSION'] == 'IMAGE'): + self._remove_d2im(dest[idx]) + self._remove_sip(dest[idx]) + self._remove_lut(dest[idx]) + self._remove_primary_WCS(dest[idx]) + self._remove_idc_coeffs(dest[idx]) + self._remove_fit_values(dest[idx]) + self._remove_ref_files(dest[0]) + """ + if not ext: + self._remove_alt_WCS(dest, ext=range(numext)) + else: + self._remove_alt_WCS(dest, ext=ext) + """ + def _del_dest_WCS_ext(self, dest): + numwdvarr = countExtn(dest, 'WCSDVARR') + numd2im = countExtn(dest, 'D2IMARR') + if numwdvarr > 0: + for idx in range(1, numwdvarr + 1): + del dest[('WCSDVARR', idx)] + if numd2im > 0: + for idx in range(1, numd2im + 1): + del dest[('D2IMARR', idx)] + + def _remove_ref_files(self, phdu): + """ + phdu: Primary HDU + """ + refkw = ['IDCTAB', 'NPOLFILE', 'D2IMFILE', 'SIPNAME', 'DISTNAME'] + for kw in refkw: + try: + del phdu.header[kw] + except KeyError: + pass + + def _remove_fit_values(self, ext): + """ + Remove the any existing astrometric fit values from a FITS extension + """ + + logger.debug("Removing astrometric fit values from (%s, %s)"% + (ext.name, ext.ver)) + dkeys = altwcs.wcskeys(ext.header) + if 'O' in dkeys: dkeys.remove('O') # Do not remove wcskey='O' values + for fitkw in ['NMATCH', 'CATALOG']: + for k in dkeys: + fkw = (fitkw+k).rstrip() + if fkw in ext.header: + del ext.header[fkw] + + def _remove_sip(self, ext): + """ + Remove the SIP distortion of a FITS extension + """ + + logger.debug("Removing SIP distortion from (%s, %s)" + % (ext.name, ext.ver)) + for prefix in ['A', 'B', 'AP', 'BP']: + try: + order = ext.header[prefix + '_ORDER'] + del ext.header[prefix + '_ORDER'] + except KeyError: + continue + for i in range(order + 1): + for j in range(order + 1): + key = prefix + '_%d_%d' % (i, j) + try: + del ext.header[key] + except KeyError: + pass + try: + del ext.header['IDCTAB'] + except KeyError: + pass + + def _remove_lut(self, ext): + """ + Remove the Lookup Table distortion of a FITS extension + """ + + logger.debug("Removing LUT distortion from (%s, %s)" + % (ext.name, ext.ver)) + try: + cpdis = ext.header['CPDIS*'] + except KeyError: + return + try: + for c in range(1, len(cpdis) + 1): + del ext.header['DP%s*...' % c] + del ext.header[cpdis.cards[c - 1].keyword] + del ext.header['CPERR*'] + del ext.header['NPOLFILE'] + del ext.header['NPOLEXT'] + except KeyError: + pass + + def _remove_d2im(self, ext): + """ + Remove the Detector to Image correction of a FITS extension + """ + + logger.debug("Removing D2IM correction from (%s, %s)" + % (ext.name, ext.ver)) + try: + d2imdis = ext.header['D2IMDIS*'] + except KeyError: + return + try: + for c in range(1, len(d2imdis) + 1): + del ext.header['D2IM%s*...' % c] + del ext.header[d2imdis.cards[c - 1].keyword] + del ext.header['D2IMERR*'] + del ext.header['D2IMFILE'] + del ext.header['D2IMEXT'] + except KeyError: + pass + + def _remove_alt_WCS(self, dest, ext): + """ + Remove Alternate WCSs of a FITS extension. + A WCS with wcskey 'O' is never deleted. + """ + dkeys = altwcs.wcskeys(dest[('SCI', 1)].header) + for val in ['O', '', ' ']: + if val in dkeys: + dkeys.remove(val) # Never delete WCS with wcskey='O' + + logger.debug("Removing alternate WCSs with keys %s from %s" + % (dkeys, dest.filename())) + for k in dkeys: + altwcs.deleteWCS(dest, ext=ext, wcskey=k) + + def _remove_primary_WCS(self, ext): + """ + Remove the primary WCS of a FITS extension + """ + + logger.debug("Removing Primary WCS from (%s, %s)" + % (ext.name, ext.ver)) + naxis = ext.header['NAXIS'] + for key in basic_wcs: + for i in range(1, naxis + 1): + try: + del ext.header[key + str(i)] + except KeyError: + pass + try: + del ext.header['WCSAXES'] + except KeyError: + pass + try: + del ext.header['WCSNAME'] + except KeyError: + pass + + def _remove_idc_coeffs(self, ext): + """ + Remove IDC coefficients of a FITS extension + """ + + logger.debug("Removing IDC coefficient from (%s, %s)" + % (ext.name, ext.ver)) + coeffs = ['OCX10', 'OCX11', 'OCY10', 'OCY11', 'IDCSCALE'] + for k in coeffs: + try: + del ext.header[k] + except KeyError: + pass + +@with_logging +def _idc2hdr(fromhdr, tohdr, towkey=' '): + """ + Copy the IDC (HST specific) keywords from one header to another + + """ + # save some of the idc coefficients + coeffs = ['OCX10', 'OCX11', 'OCY10', 'OCY11', 'IDCSCALE'] + for c in coeffs: + try: + tohdr[c+towkey] = fromhdr[c] + logger.debug("Copied %s to header") + except KeyError: + continue + + +def get_extname_extver_list(fobj, sciext): + """ + Create a list of (EXTNAME, EXTVER) tuples + + Based on sciext keyword (see docstring for create_headerlet) + walk throughh the file and convert extensions in `sciext` to + valid (EXTNAME, EXTVER) tuples. + """ + extlist = [] + if isinstance(sciext, int): + if sciext == 0: + extname = 'PRIMARY' + extver = 1 + else: + try: + extname = fobj[sciext].header['EXTNAME'] + except KeyError: + extname = "" + try: + extver = fobj[sciext].header['EXTVER'] + except KeyError: + extver = 1 + extlist.append((extname, extver)) + elif isinstance(sciext, str): + if sciext == 'PRIMARY': + extname = "PRIMARY" + extver = 1 + extlist.append((extname, extver)) + else: + for ext in fobj: + try: + extname = ext.header['EXTNAME'] + except KeyError: + continue + if extname.upper() == sciext.upper(): + try: + extver = ext.header['EXTVER'] + except KeyError: + extver = 1 + extlist.append((extname, extver)) + elif isinstance(sciext, list): + if isinstance(sciext[0], int): + for i in sciext: + try: + extname = fobj[i].header['EXTNAME'] + except KeyError: + if i == 0: + extname = "PRIMARY" + extver = 1 + else: + extname = "" + try: + extver = fobj[i].header['EXTVER'] + except KeyError: + extver = 1 + extlist.append((extname, extver)) + else: + extlist = sciext[:] + else: + errstr = "Expected sciext to be a list of FITS extensions with science data\n"+\ + " a valid EXTNAME string, or an integer." + logger.critical(errstr) + raise ValueError + return extlist + + +class HeaderletHDU(fits.hdu.nonstandard.FitsHDU): + """ + A non-standard extension HDU for encapsulating Headerlets in a file. These + HDUs have an extension type of HDRLET and their EXTNAME is derived from the + Headerlet's HDRNAME. + + The data itself is a FITS file embedded within the HDU data. The file name + is derived from the HDRNAME keyword, and should be in the form + `<HDRNAME>_hdr.fits`. If the COMPRESS keyword evaluates to `True`, the tar + file is compressed with gzip compression. + + The structure of this HDU is the same as that proposed for the 'FITS' + extension type proposed here: + http://listmgr.cv.nrao.edu/pipermail/fitsbits/2002-April/thread.html + + The Headerlet contained in the HDU's data can be accessed by the + `headerlet` attribute. + """ + + _extension = 'HDRLET' + + @lazyproperty + def headerlet(self): + """Return the encapsulated headerlet as a Headerlet object. + + This is similar to the hdulist property inherited from the FitsHDU + class, though the hdulist property returns a normal HDUList object. + """ + + return Headerlet(self.hdulist) + + @classmethod + def fromheaderlet(cls, headerlet, compress=False): + """ + Creates a new HeaderletHDU from a given Headerlet object. + + Parameters + ---------- + headerlet : `Headerlet` + A valid Headerlet object. + + compress : bool, optional + Gzip compress the headerlet data. + + Returns + ------- + hlet : `HeaderletHDU` + A `HeaderletHDU` object for the given `Headerlet` that can be + attached as an extension to an existing `HDUList`. + """ + + # TODO: Perhaps check that the given object is in fact a valid + # Headerlet + hlet = cls.fromhdulist(headerlet, compress) + + # Add some more headerlet-specific keywords to the header + phdu = headerlet[0] + + if 'SIPNAME' in phdu.header: + sipname = phdu.header['SIPNAME'] + else: + sipname = phdu.header['WCSNAME'] + + hlet.header['HDRNAME'] = (phdu.header['HDRNAME'], + phdu.header.comments['HDRNAME']) + hlet.header['DATE'] = (phdu.header['DATE'], + phdu.header.comments['DATE']) + hlet.header['SIPNAME'] = (sipname, 'SIP distortion model name') + hlet.header['WCSNAME'] = (phdu.header['WCSNAME'], 'WCS name') + hlet.header['DISTNAME'] = (phdu.header['DISTNAME'], + 'Distortion model name') + hlet.header['NPOLFILE'] = (phdu.header['NPOLFILE'], + phdu.header.comments['NPOLFILE']) + hlet.header['D2IMFILE'] = (phdu.header['D2IMFILE'], + phdu.header.comments['D2IMFILE']) + hlet.header['EXTNAME'] = (cls._extension, 'Extension name') + + return hlet + + +fits.register_hdu(HeaderletHDU) diff --git a/stwcs/wcsutil/hstwcs.py b/stwcs/wcsutil/hstwcs.py new file mode 100644 index 0000000..bfebcfc --- /dev/null +++ b/stwcs/wcsutil/hstwcs.py @@ -0,0 +1,988 @@ +from __future__ import absolute_import, division, print_function # confidence high + +import os +from astropy.wcs import WCS +from astropy.io import fits +from stwcs.distortion import models, coeff_converter +import numpy as np +from stsci.tools import fileutil + +from . import altwcs +from . import getinput +from . import mappings +from . import instruments +from .mappings import inst_mappings, ins_spec_kw +from .mappings import basic_wcs + +__docformat__ = 'restructuredtext' + +# +#### Utility functions copied from 'updatewcs.utils' to avoid circular imports +# +def extract_rootname(kwvalue,suffix=""): + """ Returns the rootname from a full reference filename + + If a non-valid value (any of ['','N/A','NONE','INDEF',None]) is input, + simply return a string value of 'NONE' + + This function will also replace any 'suffix' specified with a blank. + """ + # check to see whether a valid kwvalue has been provided as input + if kwvalue.strip() in ['','N/A','NONE','INDEF',None]: + return 'NONE' # no valid value, so return 'NONE' + + # for a valid kwvalue, parse out the rootname + # strip off any environment variable from input filename, if any are given + if '$' in kwvalue: + fullval = kwvalue[kwvalue.find('$')+1:] + else: + fullval = kwvalue + # Extract filename without path from kwvalue + fname = os.path.basename(fullval).strip() + + # Now, rip out just the rootname from the full filename + rootname = fileutil.buildNewRootname(fname) + + # Now, remove any known suffix from rootname + rootname = rootname.replace(suffix,'') + return rootname.strip() + +def build_default_wcsname(idctab): + + idcname = extract_rootname(idctab,suffix='_idc') + wcsname = 'IDC_' + idcname + return wcsname + + +class NoConvergence(Exception): + """ + An error class used to report non-convergence and/or divergence of + numerical methods. It is used to report errors in the iterative solution + used by the :py:meth:`~stwcs.hstwcs.HSTWCS.all_world2pix`\ . + + Attributes + ---------- + + best_solution : numpy.array + Best solution achieved by the method. + + accuracy : float + Accuracy of the :py:attr:`best_solution`\ . + + niter : int + Number of iterations performed by the numerical method to compute + :py:attr:`best_solution`\ . + + divergent : None, numpy.array + Indices of the points in :py:attr:`best_solution` array for which the + solution appears to be divergent. If the solution does not diverge, + `divergent` will be set to `None`. + + failed2converge : None, numpy.array + Indices of the points in :py:attr:`best_solution` array for which the + solution failed to converge within the specified maximum number + of iterations. If there are no non-converging poits (i.e., if + the required accuracy has been achieved for all points) then + `failed2converge` will be set to `None`. + + """ + def __init__(self, *args, **kwargs): + super(NoConvergence, self).__init__(*args) + + self.best_solution = kwargs.pop('best_solution', None) + self.accuracy = kwargs.pop('accuracy', None) + self.niter = kwargs.pop('niter', None) + self.divergent = kwargs.pop('divergent', None) + self.failed2converge= kwargs.pop('failed2converge', None) + + +# +#### HSTWCS Class definition +# +class HSTWCS(WCS): + + def __init__(self, fobj=None, ext=None, minerr=0.0, wcskey=" "): + """ + Create a WCS object based on the instrument. + + In addition to basic WCS keywords this class provides + instrument specific information needed in distortion computation. + + Parameters + ---------- + fobj : str or `astropy.io.fits.HDUList` object or None + file name, e.g j9irw4b1q_flt.fits + fully qualified filename[EXTNAME,EXTNUM], e.g. j9irw4b1q_flt.fits[sci,1] + `astropy.io.fits` file object, e.g fits.open('j9irw4b1q_flt.fits'), in which case the + user is responsible for closing the file object. + ext : int, tuple or None + extension number + if ext is tuple, it must be ("EXTNAME", EXTNUM), e.g. ("SCI", 2) + if ext is None, it is assumed the data is in the primary hdu + minerr : float + minimum value a distortion correction must have in order to be applied. + If CPERRja, CQERRja are smaller than minerr, the corersponding + distortion is not applied. + wcskey : str + A one character A-Z or " " used to retrieve and define an + alternate WCS description. + """ + + self.inst_kw = ins_spec_kw + self.minerr = minerr + self.wcskey = wcskey + + if fobj is not None: + filename, hdr0, ehdr, phdu = getinput.parseSingleInput(f=fobj, + ext=ext) + self.filename = filename + instrument_name = hdr0.get('INSTRUME', 'DEFAULT') + if instrument_name == 'DEFAULT' or instrument_name not in list(inst_mappings.keys()): + #['IRAF/ARTDATA','',' ','N/A']: + self.instrument = 'DEFAULT' + else: + self.instrument = instrument_name + # Set the correct reference frame + refframe = determine_refframe(hdr0) + ehdr['RADESYS'] = refframe + + WCS.__init__(self, ehdr, fobj=phdu, minerr=self.minerr, + key=self.wcskey) + if self.instrument == 'DEFAULT': + self.pc2cd() + # If input was a `astropy.io.fits.HDUList` object, it's the user's + # responsibility to close it, otherwise, it's closed here. + if not isinstance(fobj, fits.HDUList): + phdu.close() + self.setInstrSpecKw(hdr0, ehdr) + self.readIDCCoeffs(ehdr) + extname = ehdr.get('EXTNAME', '') + extnum = ehdr.get('EXTVER', None) + self.extname = (extname, extnum) + else: + # create a default HSTWCS object + self.instrument = 'DEFAULT' + WCS.__init__(self, minerr=self.minerr, key=self.wcskey) + self.pc2cd() + self.setInstrSpecKw() + self.setPscale() + self.setOrient() + + @property + def naxis1(self): + return self._naxis1 + + @naxis1.setter + def naxis1(self, value): + self._naxis1 = value + + @property + def naxis2(self): + return self._naxis2 + + @naxis2.setter + def naxis2(self, value): + self._naxis2 = value + + def readIDCCoeffs(self, header): + """ + Reads in first order IDCTAB coefficients if present in the header + """ + coeffs = ['ocx10', 'ocx11', 'ocy10', 'ocy11', 'idcscale', + 'idcv2ref','idcv3ref', 'idctheta'] + for c in coeffs: + self.__setattr__(c, header.get(c, None)) + + def setInstrSpecKw(self, prim_hdr=None, ext_hdr=None): + """ + Populate the instrument specific attributes: + + These can be in different headers but each instrument class has knowledge + of where to look for them. + + Parameters + ---------- + prim_hdr : `astropy.io.fits.Header` + primary header + ext_hdr : `astropy.io.fits.Header` + extension header + + """ + if self.instrument in list(inst_mappings.keys()): + inst_kl = inst_mappings[self.instrument] + inst_kl = instruments.__dict__[inst_kl] + insobj = inst_kl(prim_hdr, ext_hdr) + + for key in self.inst_kw: + try: + self.__setattr__(key, insobj.__getattribute__(key)) + except AttributeError: + # Some of the instrument's attributes are recorded in the primary header and + # were already set, (e.g. 'DETECTOR'), the code below is a check for that case. + if not self.__getattribute__(key): + raise + else: + pass + + else: + raise KeyError("Unsupported instrument - %s" %self.instrument) + + def setPscale(self): + """ + Calculates the plate scale from the CD matrix + """ + try: + cd11 = self.wcs.cd[0][0] + cd21 = self.wcs.cd[1][0] + self.pscale = np.sqrt(np.power(cd11,2)+np.power(cd21,2)) * 3600. + except AttributeError: + if self.wcs.has_cd(): + print("This file has a PC matrix. You may want to convert it \n \ + to a CD matrix, if reasonable, by running pc2.cd() method.\n \ + The plate scale can be set then by calling setPscale() method.\n") + self.pscale = None + + def setOrient(self): + """ + Computes ORIENTAT from the CD matrix + """ + try: + cd12 = self.wcs.cd[0][1] + cd22 = self.wcs.cd[1][1] + self.orientat = np.rad2deg(np.arctan2(cd12,cd22)) + except AttributeError: + if self.wcs.has_cd(): + print("This file has a PC matrix. You may want to convert it \n \ + to a CD matrix, if reasonable, by running pc2.cd() method.\n \ + The orientation can be set then by calling setOrient() method.\n") + self.pscale = None + + def updatePscale(self, scale): + """ + Updates the CD matrix with a new plate scale + """ + self.wcs.cd = self.wcs.cd/self.pscale*scale + self.setPscale() + + def readModel(self, update=False, header=None): + """ + Reads distortion model from IDCTAB. + + If IDCTAB is not found ('N/A', "", or not found on disk), then + if SIP coefficients and first order IDCTAB coefficients are present + in the header, restore the idcmodel from the header. + If not - assign None to self.idcmodel. + + Parameters + ---------- + header : `astropy.io.fits.Header` + fits extension header + update : bool (False) + if True - record the following IDCTAB quantities as header keywords: + CX10, CX11, CY10, CY11, IDCSCALE, IDCTHETA, IDCXREF, IDCYREF, + IDCV2REF, IDCV3REF + """ + if self.idctab in [None, '', ' ','N/A']: + #Keyword idctab is not present in header - check for sip coefficients + if header is not None and 'IDCSCALE' in header: + self._readModelFromHeader(header) + else: + print("Distortion model is not available: IDCTAB=None\n") + self.idcmodel = None + elif not os.path.exists(fileutil.osfn(self.idctab)): + if header is not None and 'IDCSCALE' in header: + self._readModelFromHeader(header) + else: + print('Distortion model is not available: IDCTAB file %s not found\n' % self.idctab) + self.idcmodel = None + else: + self.readModelFromIDCTAB(header=header, update=update) + + def _readModelFromHeader(self, header): + # Recreate idc model from SIP coefficients and header kw + print('Restoring IDC model from SIP coefficients\n') + model = models.GeometryModel() + cx, cy = coeff_converter.sip2idc(self) + model.cx = cx + model.cy = cy + model.name = "sip" + model.norder = header['A_ORDER'] + + refpix = {} + refpix['XREF'] = header['IDCXREF'] + refpix['YREF'] = header['IDCYREF'] + refpix['PSCALE'] = header['IDCSCALE'] + refpix['V2REF'] = header['IDCV2REF'] + refpix['V3REF'] = header['IDCV3REF'] + refpix['THETA'] = header['IDCTHETA'] + model.refpix = refpix + + self.idcmodel = model + + + def readModelFromIDCTAB(self, header=None, update=False): + """ + Read distortion model from idc table. + + Parameters + ---------- + header : `astropy.io.fits.Header` + fits extension header + update : booln (False) + if True - save teh following as header keywords: + CX10, CX11, CY10, CY11, IDCSCALE, IDCTHETA, IDCXREF, IDCYREF, + IDCV2REF, IDCV3REF + + """ + if self.date_obs == None: + print('date_obs not available\n') + self.idcmodel = None + return + if self.filter1 == None and self.filter2 == None: + 'No filter information available\n' + self.idcmodel = None + return + + self.idcmodel = models.IDCModel(self.idctab, + chip=self.chip, direction='forward', date=self.date_obs, + filter1=self.filter1, filter2=self.filter2, + offtab=self.offtab, binned=self.binned) + + if self.ltv1 != 0. or self.ltv2 != 0.: + self.resetLTV() + + if update: + if header==None: + print('Update header with IDC model kw requested but header was not provided\n.') + else: + self._updatehdr(header) + + def resetLTV(self): + """ + Reset LTV values for polarizer data + + The polarizer field is smaller than the detector field. + The distortion coefficients are defined for the entire + polarizer field and the LTV values are set as with subarray + data. This may also be true for other special filters. + This is a special case when the observation is considered + a subarray in terms of detector field but a full frame in + terms of distortion model. + To avoid shifting the distortion coefficients the LTV values + are reset to 0. + """ + if self.naxis1 == self.idcmodel.refpix['XSIZE'] and \ + self.naxis2 == self.idcmodel.refpix['YSIZE']: + self.ltv1 = 0. + self.ltv2 = 0. + + def wcs2header(self, sip2hdr=False, idc2hdr=True, wcskey=None, relax=False): + """ + Create a `astropy.io.fits.Header` object from WCS keywords. + + If the original header had a CD matrix, return a CD matrix, + otherwise return a PC matrix. + + Parameters + ---------- + sip2hdr : bool + If True - include SIP coefficients + """ + + h = self.to_header(key=wcskey, relax=relax) + if not wcskey: + wcskey = self.wcs.alt + if self.wcs.has_cd(): + h = altwcs.pc2cd(h, key=wcskey) + + if 'wcsname' not in h: + if self.idctab is not None: + wname = build_default_wcsname(self.idctab) + else: + wname = 'DEFAULT' + h['wcsname{0}'.format(wcskey)] = wname + + if idc2hdr: + for card in self._idc2hdr(): + h[card.keyword + wcskey] = (card.value, card.comment) + try: + del h['RESTFRQ'] + del h['RESTWAV'] + except KeyError: pass + + if sip2hdr and self.sip: + for card in self._sip2hdr('a'): + h[card.keyword] = (card.value, card.comment) + for card in self._sip2hdr('b'): + h[card.keyword] = (card.value, card.comment) + + try: + ap = self.sip.ap + except AssertionError: + ap = None + try: + bp = self.sip.bp + except AssertionError: + bp = None + + if ap: + for card in self._sip2hdr('ap'): + h[card.keyword] = (card.value, card.comment) + if bp: + for card in self._sip2hdr('bp'): + h[card.keyword] = (card.value, card.comment) + return h + + def _sip2hdr(self, k): + """ + Get a set of SIP coefficients in the form of an array + and turn them into a `astropy.io.fits.Cardlist`. + k - one of 'a', 'b', 'ap', 'bp' + """ + + cards = [] #fits.CardList() + korder = self.sip.__getattribute__(k+'_order') + cards.append(fits.Card(keyword=k.upper()+'_ORDER', value=korder)) + coeffs = self.sip.__getattribute__(k) + ind = coeffs.nonzero() + for i in range(len(ind[0])): + card = fits.Card(keyword=k.upper()+'_'+str(ind[0][i])+'_'+str(ind[1][i]), + value=coeffs[ind[0][i], ind[1][i]]) + cards.append(card) + return cards + + def _idc2hdr(self): + # save some of the idc coefficients + coeffs = ['ocx10', 'ocx11', 'ocy10', 'ocy11', 'idcscale'] + cards = [] #fits.CardList() + for c in coeffs: + try: + val = self.__getattribute__(c) + except AttributeError: + continue + if val: + cards.append(fits.Card(keyword=c, value=val)) + return cards + + def pc2cd(self): + if not self.wcs.has_pc(): + self.wcs.pc = self.wcs.get_pc() + self.wcs.cd = self.wcs.pc * self.wcs.cdelt[1] + + def all_world2pix(self, *args, **kwargs): + """ + all_world2pix(*arg, accuracy=1.0e-4, maxiter=20, adaptive=False, \ +detect_divergence=True, quiet=False) + + Performs full inverse transformation using iterative solution + on full forward transformation with complete distortion model. + + Parameters + ---------- + accuracy : float, optional (Default = 1.0e-4) + Required accuracy of the solution. Iteration terminates when the + correction to the solution found during the previous iteration + is smaller (in the sence of the L2 norm) than `accuracy`\ . + + maxiter : int, optional (Default = 20) + Maximum number of iterations allowed to reach the solution. + + adaptive : bool, optional (Default = False) + Specifies whether to adaptively select only points that did not + converge to a solution whithin the required accuracy for the + next iteration. Default is recommended for HST as well as most + other instruments. + + .. note:: + The :py:meth:`all_world2pix` uses a vectorized implementation + of the method of consecutive approximations (see `Notes` + section below) in which it iterates over *all* input poits + *regardless* until the required accuracy has been reached for + *all* input points. In some cases it may be possible that + *almost all* points have reached the required accuracy but + there are only a few of input data points left for which + additional iterations may be needed (this depends mostly on the + characteristics of the geometric distortions for a given + instrument). In this situation it may be + advantageous to set `adaptive` = `True`\ in which + case :py:meth:`all_world2pix` will continue iterating *only* over + the points that have not yet converged to the required + accuracy. However, for the HST's ACS/WFC detector, which has + the strongest distortions of all HST instruments, testing has + shown that enabling this option would lead to a about 10-30\% + penalty in computational time (depending on specifics of the + image, geometric distortions, and number of input points to be + converted). Therefore, for HST instruments, + it is recommended to set `adaptive` = `False`\ . The only + danger in getting this setting wrong will be a performance + penalty. + + .. note:: + When `detect_divergence` is `True`\ , :py:meth:`all_world2pix` \ + will automatically switch to the adaptive algorithm once + divergence has been detected. + + detect_divergence : bool, optional (Default = True) + Specifies whether to perform a more detailed analysis of the + convergence to a solution. Normally :py:meth:`all_world2pix` + may not achieve the required accuracy + if either the `tolerance` or `maxiter` arguments are too low. + However, it may happen that for some geometric distortions + the conditions of convergence for the the method of consecutive + approximations used by :py:meth:`all_world2pix` may not be + satisfied, in which case consecutive approximations to the + solution will diverge regardless of the `tolerance` or `maxiter` + settings. + + When `detect_divergence` is `False`\ , these divergent points + will be detected as not having achieved the required accuracy + (without further details). In addition, if `adaptive` is `False` + then the algorithm will not know that the solution (for specific + points) is diverging and will continue iterating and trying to + "improve" diverging solutions. This may result in NaN or Inf + values in the return results (in addition to a performance + penalties). Even when `detect_divergence` is + `False`\ , :py:meth:`all_world2pix`\ , at the end of the iterative + process, will identify invalid results (NaN or Inf) as "diverging" + solutions and will raise :py:class:`NoConvergence` unless + the `quiet` parameter is set to `True`\ . + + When `detect_divergence` is `True`\ , :py:meth:`all_world2pix` will + detect points for + which current correction to the coordinates is larger than + the correction applied during the previous iteration **if** the + requested accuracy **has not yet been achieved**\ . In this case, + if `adaptive` is `True`, these points will be excluded from + further iterations and if `adaptive` + is `False`\ , :py:meth:`all_world2pix` will automatically + switch to the adaptive algorithm. + + .. note:: + When accuracy has been achieved, small increases in + current corrections may be possible due to rounding errors + (when `adaptive` is `False`\ ) and such increases + will be ignored. + + .. note:: + Setting `detect_divergence` to `True` will incurr about 5-10\% + performance penalty (in our testing on ACS/WFC images). + Because the benefits of enabling this feature outweigh + the small performance penalty, it is recommended to set + `detect_divergence` to `True`\ , unless extensive testing + of the distortion models for images from specific + instruments show a good stability of the numerical method + for a wide range of coordinates (even outside the image + itself). + + .. note:: + Indices of the diverging inverse solutions will be reported + in the `divergent` attribute of the + raised :py:class:`NoConvergence` object. + + quiet : bool, optional (Default = False) + Do not throw :py:class:`NoConvergence` exceptions when the method + does not converge to a solution with the required accuracy + within a specified number of maximum iterations set by `maxiter` + parameter. Instead, simply return the found solution. + + Raises + ------ + NoConvergence + The method does not converge to a + solution with the required accuracy within a specified number + of maximum iterations set by the `maxiter` parameter. + + Notes + ----- + Inputs can either be (RA, Dec, origin) or (RADec, origin) where RA + and Dec are 1-D arrays/lists of coordinates and RADec is an + array/list of pairs of coordinates. + + Using the method of consecutive approximations we iterate starting + with the initial approximation, which is computed using the + non-distorion-aware :py:meth:`wcs_world2pix` (or equivalent). + + The :py:meth:`all_world2pix` function uses a vectorized implementation + of the method of consecutive approximations and therefore it is + highly efficient (>30x) when *all* data points that need to be + converted from sky coordinates to image coordinates are passed at + *once*\ . Therefore, it is advisable, whenever possible, to pass + as input a long array of all points that need to be converted + to :py:meth:`all_world2pix` instead of calling :py:meth:`all_world2pix` + for each data point. Also see the note to the `adaptive` parameter. + + Examples + -------- + >>> import stwcs + >>> from astropy.io import fits + >>> hdulist = fits.open('j94f05bgq_flt.fits') + >>> w = stwcs.wcsutil.HSTWCS(hdulist, ext=('sci',1)) + >>> hdulist.close() + + >>> ra, dec = w.all_pix2world([1,2,3],[1,1,1],1); print(ra); print(dec) + [ 5.52645241 5.52649277 5.52653313] + [-72.05171776 -72.05171295 -72.05170814] + >>> radec = w.all_pix2world([[1,1],[2,1],[3,1]],1); print(radec) + [[ 5.52645241 -72.05171776] + [ 5.52649277 -72.05171295] + [ 5.52653313 -72.05170814]] + >>> x, y = w.all_world2pix(ra,dec,1) + >>> print(x) + [ 1.00000233 2.00000232 3.00000233] + >>> print(y) + [ 0.99999997 0.99999997 0.99999998] + >>> xy = w.all_world2pix(radec,1) + >>> print(xy) + [[ 1.00000233 0.99999997] + [ 2.00000232 0.99999997] + [ 3.00000233 0.99999998]] + >>> xy = w.all_world2pix(radec,1, maxiter=3, accuracy=1.0e-10, \ +quiet=False) + NoConvergence: 'HSTWCS.all_world2pix' failed to converge to requested \ +accuracy after 3 iterations. + + >>> + Now try to use some diverging data: + >>> divradec = w.all_pix2world([[1.0,1.0],[10000.0,50000.0],\ +[3.0,1.0]],1); print(divradec) + [[ 5.52645241 -72.05171776] + [ 7.15979392 -70.81405561] + [ 5.52653313 -72.05170814]] + + >>> try: + >>> xy = w.all_world2pix(divradec,1, maxiter=20, accuracy=1.0e-4, \ +adaptive=False, detect_divergence=True, quiet=False) + >>> except stwcs.wcsutil.hstwcs.NoConvergence as e: + >>> print("Indices of diverging points: {}".format(e.divergent)) + >>> print("Indices of poorly converging points: {}".format(e.failed2converge)) + >>> print("Best solution: {}".format(e.best_solution)) + >>> print("Achieved accuracy: {}".format(e.accuracy)) + >>> raise e + Indices of diverging points: + [1] + Indices of poorly converging points: + None + Best solution: + [[ 1.00006219e+00 9.99999288e-01] + [ -1.99440907e+06 1.44308548e+06] + [ 3.00006257e+00 9.99999316e-01]] + Achieved accuracy: + [[ 5.98554253e-05 6.79918148e-07] + [ 8.59514088e+11 6.61703754e+11] + [ 6.02334592e-05 6.59713067e-07]] + Traceback (innermost last): + File "<console>", line 8, in <module> + NoConvergence: 'HSTWCS.all_world2pix' failed to converge to the requested accuracy. + After 5 iterations, the solution is diverging at least for one input point. + + >>> try: + >>> xy = w.all_world2pix(divradec,1, maxiter=20, accuracy=1.0e-4, \ +adaptive=False, detect_divergence=False, quiet=False) + >>> except stwcs.wcsutil.hstwcs.NoConvergence as e: + >>> print("Indices of diverging points: {}".format(e.divergent)) + >>> print("Indices of poorly converging points: {}".format(e.failed2converge)) + >>> print("Best solution: {}".format(e.best_solution)) + >>> print("Achieved accuracy: {}".format(e.accuracy)) + >>> raise e + Indices of diverging points: + [1] + Indices of poorly converging points: + None + Best solution: + [[ 1. 1.] + [ nan nan] + [ 3. 1.]] + Achieved accuracy: + [[ 0. 0.] + [ nan nan] + [ 0. 0.]] + Traceback (innermost last): + File "<console>", line 8, in <module> + NoConvergence: 'HSTWCS.all_world2pix' failed to converge to the requested accuracy. + After 20 iterations, the solution is diverging at least for one input point. + + """ + ##################################################################### + ## PROCESS ARGUMENTS: ## + ##################################################################### + nargs = len(args) + + if nargs == 3: + try: + ra = np.asarray(args[0], dtype=np.float64) + dec = np.asarray(args[1], dtype=np.float64) + #assert( len(ra.shape) == 1 and len(dec.shape) == 1 ) + origin = int(args[2]) + vect1D = True + except: + raise TypeError("When providing three arguments, they must " \ + "be (Ra, Dec, origin) where Ra and Dec are " \ + "Nx1 vectors.") + elif nargs == 2: + try: + rd = np.asarray(args[0], dtype=np.float64) + #assert( rd.shape[1] == 2 ) + ra = rd[:,0] + dec = rd[:,1] + origin = int(args[1]) + vect1D = False + except: + raise TypeError("When providing two arguments, they must " \ + "be (RaDec, origin) where RaDec is a Nx2 array.") + else: + raise TypeError("Expected 2 or 3 arguments, {:d} given." \ + .format(nargs)) + + # process optional arguments: + accuracy = kwargs.pop('accuracy', 1.0e-4) + maxiter = kwargs.pop('maxiter', 20) + adaptive = kwargs.pop('adaptive', False) + detect_divergence = kwargs.pop('detect_divergence', True) + quiet = kwargs.pop('quiet', False) + + ##################################################################### + ## INITIALIZE ITERATIVE PROCESS: ## + ##################################################################### + x0, y0 = self.wcs_world2pix(ra, dec, origin) # <-- initial approximation + # (WCS based only) + + # see if an iterative solution is required (when any of the + # non-CD-matrix corrections are present). If not required + # return initial approximation (x0, y0). + if self.sip is None and \ + self.cpdis1 is None and self.cpdis2 is None and \ + self.det2im1 is None and self.det2im2 is None: + # no non-WCS corrections are detected - return + # initial approximation + if vect1D: + return [x0, y0] + else: + return np.dstack([x0,y0])[0] + + x = x0.copy() # 0-order solution + y = y0.copy() # 0-order solution + + # initial correction: + dx, dy = self.pix2foc(x, y, origin) + # If pix2foc does not apply all the required distortion + # corrections then replace the above line with: + #r0, d0 = self.all_pix2world(x, y, origin) + #dx, dy = self.wcs_world2pix(r0, d0, origin ) + dx -= x0 + dy -= y0 + + # update initial solution: + x -= dx + y -= dy + + # norn (L2) squared of the correction: + dn2prev = dx**2+dy**2 + dn2 = dn2prev + + # prepare for iterative process + iterlist = list(range(1, maxiter+1)) + accuracy2 = accuracy**2 + ind = None + inddiv = None + + npts = x.shape[0] + + # turn off numpy runtime warnings for 'invalid' and 'over': + old_invalid = np.geterr()['invalid'] + old_over = np.geterr()['over'] + np.seterr(invalid = 'ignore', over = 'ignore') + + ##################################################################### + ## NON-ADAPTIVE ITERATIONS: ## + ##################################################################### + if not adaptive: + for k in iterlist: + # check convergence: + if np.max(dn2) < accuracy2: + break + + # find correction to the previous solution: + dx, dy = self.pix2foc(x, y, origin) + # If pix2foc does not apply all the required distortion + # corrections then replace the above line with: + #r0, d0 = self.all_pix2world(x, y, origin) + #dx, dy = self.wcs_world2pix(r0, d0, origin ) + dx -= x0 + dy -= y0 + + # update norn (L2) squared of the correction: + dn2 = dx**2+dy**2 + + # check for divergence (we do this in two stages + # to optimize performance for the most common + # scenario when succesive approximations converge): + if detect_divergence: + ind, = np.where(dn2 <= dn2prev) + if ind.shape[0] < npts: + inddiv, = np.where( + np.logical_and(dn2 > dn2prev, dn2 >= accuracy2)) + if inddiv.shape[0] > 0: + # apply correction only to the converging points: + x[ind] -= dx[ind] + y[ind] -= dy[ind] + # switch to adaptive iterations: + ind, = np.where((dn2 >= accuracy2) & \ + (dn2 <= dn2prev) & np.isfinite(dn2)) + iterlist = iterlist[k:] + adaptive = True + break + #dn2prev[ind] = dn2[ind] + dn2prev = dn2 + + # apply correction: + x -= dx + y -= dy + + ##################################################################### + ## ADAPTIVE ITERATIONS: ## + ##################################################################### + if adaptive: + if ind is None: + ind = np.asarray(list(range(npts)), dtype=np.int64) + + for k in iterlist: + # check convergence: + if ind.shape[0] == 0: + break + + # find correction to the previous solution: + dx[ind], dy[ind] = self.pix2foc(x[ind], y[ind], origin) + # If pix2foc does not apply all the required distortion + # corrections then replace the above line with: + #r0[ind], d0[ind] = self.all_pix2world(x[ind], y[ind], origin) + #dx[ind], dy[ind] = self.wcs_world2pix(r0[ind], d0[ind], origin) + dx[ind] -= x0[ind] + dy[ind] -= y0[ind] + + # update norn (L2) squared of the correction: + dn2 = dx**2+dy**2 + + # update indices of elements that still need correction: + if detect_divergence: + ind, = np.where((dn2 >= accuracy2) & (dn2 <= dn2prev)) + #ind = ind[np.where((dn2[ind] >= accuracy2) & (dn2[ind] <= dn2prev))] + dn2prev[ind] = dn2[ind] + else: + ind, = np.where(dn2 >= accuracy2) + #ind = ind[np.where(dn2[ind] >= accuracy2)] + + # apply correction: + x[ind] -= dx[ind] + y[ind] -= dy[ind] + + ##################################################################### + ## FINAL DETECTION OF INVALID, DIVERGING, ## + ## AND FAILED-TO-CONVERGE POINTS ## + ##################################################################### + # Identify diverging and/or invalid points: + invalid = (((~np.isfinite(y)) | (~np.isfinite(x)) | \ + (~np.isfinite(dn2))) & \ + (np.isfinite(ra)) & (np.isfinite(dec))) + # When detect_divergence==False, dn2prev is outdated (it is the + # norm^2 of the very first correction). Still better than nothing... + inddiv, = np.where(((dn2 >= accuracy2) & (dn2 > dn2prev)) | invalid) + if inddiv.shape[0] == 0: + inddiv = None + # identify points that did not converge within + # 'maxiter' iterations: + if k >= maxiter: + ind,= np.where((dn2 >= accuracy2) & (dn2 <= dn2prev) & (~invalid)) + if ind.shape[0] == 0: + ind = None + else: + ind = None + + ##################################################################### + ## RAISE EXCEPTION IF DIVERGING OR TOO SLOWLY CONVERGING ## + ## DATA POINTS HAVE BEEN DETECTED: ## + ##################################################################### + # raise exception if diverging or too slowly converging + if (ind is not None or inddiv is not None) and not quiet: + if vect1D: + sol = [x, y] + err = [np.abs(dx), np.abs(dy)] + else: + sol = np.dstack( [x, y] )[0] + err = np.dstack( [np.abs(dx), np.abs(dy)] )[0] + + # restore previous numpy error settings: + np.seterr(invalid = old_invalid, over = old_over) + + if inddiv is None: + raise NoConvergence("'HSTWCS.all_world2pix' failed to " \ + "converge to the requested accuracy after {:d} " \ + "iterations.".format(k), best_solution = sol, \ + accuracy = err, niter = k, failed2converge = ind, \ + divergent = None) + else: + raise NoConvergence("'HSTWCS.all_world2pix' failed to " \ + "converge to the requested accuracy.{0:s}" \ + "After {1:d} iterations, the solution is diverging " \ + "at least for one input point." \ + .format(os.linesep, k), best_solution = sol, \ + accuracy = err, niter = k, failed2converge = ind, \ + divergent = inddiv) + + ##################################################################### + ## FINALIZE AND FORMAT DATA FOR RETURN: ## + ##################################################################### + # restore previous numpy error settings: + np.seterr(invalid = old_invalid, over = old_over) + + if vect1D: + return [x, y] + else: + return np.dstack( [x, y] )[0] + + def _updatehdr(self, ext_hdr): + #kw2add : OCX10, OCX11, OCY10, OCY11 + # record the model in the header for use by pydrizzle + ext_hdr['OCX10'] = self.idcmodel.cx[1,0] + ext_hdr['OCX11'] = self.idcmodel.cx[1,1] + ext_hdr['OCY10'] = self.idcmodel.cy[1,0] + ext_hdr['OCY11'] = self.idcmodel.cy[1,1] + ext_hdr['IDCSCALE'] = self.idcmodel.refpix['PSCALE'] + ext_hdr['IDCTHETA'] = self.idcmodel.refpix['THETA'] + ext_hdr['IDCXREF'] = self.idcmodel.refpix['XREF'] + ext_hdr['IDCYREF'] = self.idcmodel.refpix['YREF'] + ext_hdr['IDCV2REF'] = self.idcmodel.refpix['V2REF'] + ext_hdr['IDCV3REF'] = self.idcmodel.refpix['V3REF'] + + def printwcs(self): + """ + Print the basic WCS keywords. + """ + print('WCS Keywords\n') + print('CD_11 CD_12: %r %r' % (self.wcs.cd[0,0], self.wcs.cd[0,1])) + print('CD_21 CD_22: %r %r' % (self.wcs.cd[1,0], self.wcs.cd[1,1])) + print('CRVAL : %r %r' % (self.wcs.crval[0], self.wcs.crval[1])) + print('CRPIX : %r %r' % (self.wcs.crpix[0], self.wcs.crpix[1])) + print('NAXIS : %d %d' % (self.naxis1, self.naxis2)) + print('Plate Scale : %r' % self.pscale) + print('ORIENTAT : %r' % self.orientat) + + +def determine_refframe(phdr): + """ + Determine the reference frame in standard FITS WCS terms. + + Parameters + ---------- + phdr : `astropy.io.fits.Header` + Primary Header of an HST observation + + In HST images the reference frame is recorded in the primary extension as REFFRAME. + Values are "GSC1" which means FK5 or ICRS (for GSC2 observations). + """ + try: + refframe = phdr['REFFRAME'] + except KeyError: + refframe = " " + if refframe == "GSC1": + refframe = "FK5" + return refframe diff --git a/stwcs/wcsutil/instruments.py b/stwcs/wcsutil/instruments.py new file mode 100644 index 0000000..f662513 --- /dev/null +++ b/stwcs/wcsutil/instruments.py @@ -0,0 +1,320 @@ +from __future__ import absolute_import, division, print_function # confidence high + +from .mappings import ins_spec_kw + +class InstrWCS(object): + """ + A base class for instrument specific keyword definition. + It prvides a default implementation (modeled by ACS) for + all set_kw methods. + """ + def __init__(self, hdr0=None, hdr=None): + self.exthdr = hdr + self.primhdr = hdr0 + self.set_ins_spec_kw() + + def set_ins_spec_kw(self): + """ + This method MUST call all set_kw methods. + There should be a set_kw method for all kw listed in + mappings.ins_spec_kw. TypeError handles the case when + fobj='DEFAULT'. + """ + self.set_idctab() + self.set_offtab() + self.set_date_obs() + self.set_ra_targ() + self.set_dec_targ() + self.set_pav3() + self.set_detector() + self.set_filter1() + self.set_filter2() + self.set_vafactor() + self.set_naxis1() + self.set_naxis2() + self.set_ltv1() + self.set_ltv2() + self.set_binned() + self.set_chip() + self.set_parity() + + def set_idctab(self): + try: + self.idctab = self.primhdr['IDCTAB'] + except (KeyError, TypeError): + self.idctab = None + + def set_offtab(self): + try: + self.offtab = self.primhdr['OFFTAB'] + except (KeyError, TypeError): + self.offtab = None + + def set_date_obs(self): + try: + self.date_obs = self.primhdr['DATE-OBS'] + except (KeyError, TypeError): + self.date_obs = None + + def set_ra_targ(self): + try: + self.ra_targ = self.primhdr['RA-TARG'] + except (KeyError, TypeError): + self.ra_targ = None + + def set_dec_targ(self): + try: + self.dec_targ = self.primhdr['DEC-TARG'] + except (KeyError, TypeError): + self.dec_targ = None + + def set_pav3(self): + try: + self.pav3 = self.primhdr['PA_V3'] + except (KeyError, TypeError): + self.pav3 = None + + def set_filter1(self): + try: + self.filter1 = self.primhdr['FILTER1'] + except (KeyError, TypeError): + self.filter1 = None + + def set_filter2(self): + try: + self.filter2 = self.primhdr['FILTER2'] + except (KeyError, TypeError): + self.filter2 = None + + def set_vafactor(self): + try: + self.vafactor = self.exthdr['VAFACTOR'] + except (KeyError, TypeError): + self.vafactor = 1 + + def set_naxis1(self): + try: + self.naxis1 = self.exthdr['naxis1'] + except (KeyError, TypeError): + try: + self.naxis1 = self.exthdr['npix1'] + except (KeyError, TypeError): + self.naxis1 = None + + def set_naxis2(self): + try: + self.naxis2 = self.exthdr['naxis2'] + except (KeyError, TypeError): + try: + self.naxis2 = self.exthdr['npix2'] + except (KeyError, TypeError): + self.naxis2 = None + + def set_ltv1(self): + try: + self.ltv1 = self.exthdr['LTV1'] + except (KeyError, TypeError): + self.ltv1 = 0.0 + + def set_ltv2(self): + try: + self.ltv2 = self.exthdr['LTV2'] + except (KeyError, TypeError): + self.ltv2 = 0.0 + + def set_binned(self): + try: + self.binned = self.exthdr['BINAXIS1'] + except (KeyError, TypeError): + self.binned = 1 + + def set_chip(self): + try: + self.chip = self.exthdr['CCDCHIP'] + except (KeyError, TypeError): + self.chip = 1 + + def set_parity(self): + self.parity = [[1.0,0.0],[0.0,-1.0]] + + def set_detector(self): + # each instrument has a different kw for detector and it can be + # in a different header, so this is to be handled by the instrument classes + self.detector = 'DEFAULT' + +class ACSWCS(InstrWCS): + """ + get instrument specific kw + """ + + def __init__(self, hdr0, hdr): + self.primhdr = hdr0 + self.exthdr = hdr + InstrWCS.__init__(self,hdr0, hdr) + self.set_ins_spec_kw() + + def set_detector(self): + try: + self.detector = self.primhdr['DETECTOR'] + except KeyError: + print('ERROR: Detector kw not found.\n') + raise + + def set_parity(self): + parity = {'WFC':[[1.0,0.0],[0.0,-1.0]], + 'HRC':[[-1.0,0.0],[0.0,1.0]], + 'SBC':[[-1.0,0.0],[0.0,1.0]]} + + if self.detector not in list(parity.keys()): + parity = InstrWCS.set_parity(self) + else: + self.parity = parity[self.detector] + + +class WFPC2WCS(InstrWCS): + + + def __init__(self, hdr0, hdr): + self.primhdr = hdr0 + self.exthdr = hdr + InstrWCS.__init__(self,hdr0, hdr) + self.set_ins_spec_kw() + + def set_filter1(self): + self.filter1 = self.primhdr.get('FILTNAM1', None) + if self.filter1 == " " or self.filter1 == None: + self.filter1 = 'CLEAR1' + + def set_filter2(self): + self.filter2 = self.primhdr.get('FILTNAM2', None) + if self.filter2 == " " or self.filter2 == None: + self.filter2 = 'CLEAR2' + + + def set_binned(self): + mode = self.primhdr.get('MODE', 1) + if mode == 'FULL': + self.binned = 1 + elif mode == 'AREA': + self.binned = 2 + + def set_chip(self): + self.chip = self.exthdr.get('DETECTOR', 1) + + def set_parity(self): + self.parity = [[-1.0,0.],[0.,1.0]] + + def set_detector(self): + try: + self.detector = self.exthdr['DETECTOR'] + except KeyError: + print('ERROR: Detector kw not found.\n') + raise + + +class WFC3WCS(InstrWCS): + """ + Create a WFC3 detector specific class + """ + + def __init__(self, hdr0, hdr): + self.primhdr = hdr0 + self.exthdr = hdr + InstrWCS.__init__(self,hdr0, hdr) + self.set_ins_spec_kw() + + def set_detector(self): + try: + self.detector = self.primhdr['DETECTOR'] + except KeyError: + print('ERROR: Detector kw not found.\n') + raise + + def set_filter1(self): + self.filter1 = self.primhdr.get('FILTER', None) + if self.filter1 == " " or self.filter1 == None: + self.filter1 = 'CLEAR' + + def set_filter2(self): + #Nicmos idc tables do not allow 2 filters. + self.filter2 = 'CLEAR' + + def set_parity(self): + parity = {'UVIS':[[-1.0,0.0],[0.0,1.0]], + 'IR':[[-1.0,0.0],[0.0,1.0]]} + + if self.detector not in list(parity.keys()): + parity = InstrWCS.set_parity(self) + else: + self.parity = parity[self.detector] + +class NICMOSWCS(InstrWCS): + """ + Create a NICMOS specific class + """ + + def __init__(self, hdr0, hdr): + self.primhdr = hdr0 + self.exthdr = hdr + InstrWCS.__init__(self,hdr0, hdr) + self.set_ins_spec_kw() + + def set_parity(self): + self.parity = [[-1.0,0.],[0.,1.0]] + + def set_filter1(self): + self.filter1 = self.primhdr.get('FILTER', None) + if self.filter1 == " " or self.filter1 == None: + self.filter1 = 'CLEAR' + + def set_filter2(self): + #Nicmos idc tables do not allow 2 filters. + self.filter2 = 'CLEAR' + + def set_chip(self): + self.chip = self.detector + + def set_detector(self): + try: + self.detector = self.primhdr['CAMERA'] + except KeyError: + print('ERROR: Detector kw not found.\n') + raise + +class STISWCS(InstrWCS): + """ + A STIS specific class + """ + + def __init__(self, hdr0, hdr): + self.primhdr = hdr0 + self.exthdr = hdr + InstrWCS.__init__(self,hdr0, hdr) + self.set_ins_spec_kw() + + def set_parity(self): + self.parity = [[-1.0,0.],[0.,1.0]] + + def set_filter1(self): + self.filter1 = self.exthdr.get('OPT_ELEM', None) + if self.filter1 == " " or self.filter1 == None: + self.filter1 = 'CLEAR1' + + def set_filter2(self): + self.filter2 = self.exthdr.get('FILTER', None) + if self.filter2 == " " or self.filter2 == None: + self.filter2 = 'CLEAR2' + + def set_detector(self): + try: + self.detector = self.primhdr['DETECTOR'] + except KeyError: + print('ERROR: Detector kw not found.\n') + raise + + def set_date_obs(self): + try: + self.date_obs = self.exthdr['DATE-OBS'] + except (KeyError, TypeError): + self.date_obs = None + diff --git a/stwcs/wcsutil/mappings.py b/stwcs/wcsutil/mappings.py new file mode 100644 index 0000000..24038bf --- /dev/null +++ b/stwcs/wcsutil/mappings.py @@ -0,0 +1,29 @@ +from __future__ import division # confidence high + +# This dictionary maps an instrument into an instrument class +# The instrument class handles instrument specific keywords + +inst_mappings={'WFPC2': 'WFPC2WCS', + 'ACS': 'ACSWCS', + 'NICMOS': 'NICMOSWCS', + 'STIS': 'STISWCS', + 'WFC3': 'WFC3WCS', + 'DEFAULT': 'InstrWCS' + } + + +# A list of instrument specific keywords +# Every instrument class must have methods which define each of these +# as class attributes. +ins_spec_kw = [ 'idctab', 'offtab', 'date_obs', 'ra_targ', 'dec_targ', 'pav3', \ + 'detector', 'ltv1', 'ltv2', 'parity', 'binned','vafactor', \ + 'chip', 'naxis1', 'naxis2', 'filter1', 'filter2'] + +# A list of keywords defined in the primary header. +# The HSTWCS class sets this as attributes +prim_hdr_kw = ['detector', 'offtab', 'idctab', 'date-obs', + 'pa_v3', 'ra_targ', 'dec_targ'] + +# These are the keywords which are archived before MakeWCS is run +basic_wcs = ['CD1_', 'CD2_', 'CRVAL', 'CTYPE', 'CRPIX', 'CTYPE', 'CDELT', 'CUNIT'] + diff --git a/stwcs/wcsutil/mosaic.py b/stwcs/wcsutil/mosaic.py new file mode 100644 index 0000000..9d2d0a3 --- /dev/null +++ b/stwcs/wcsutil/mosaic.py @@ -0,0 +1,183 @@ +from __future__ import division, print_function +import numpy as np +from matplotlib import pyplot as plt +from astropy.io import fits +import string + +from stsci.tools import parseinput, irafglob +from stwcs.distortion import utils +from stwcs import updatewcs, wcsutil +from stwcs.wcsutil import altwcs + +def vmosaic(fnames, outwcs=None, ref_wcs=None, ext=None, extname=None, undistort=True, wkey='V', wname='VirtualMosaic', plot=False, clobber=False): + """ + Create a virtual mosaic using the WCS of the input images. + + Parameters + ---------- + fnames: a string or a list + a string or a list of filenames, or a list of wcsutil.HSTWCS objects + outwcs: an HSTWCS object + if given, represents the output tangent plane + if None, the output WCS is calculated from the input observations. + ref_wcs: an HSTWCS object + if output wcs is not given, this will be used as a reference for the + calculation of the output WCS. If ref_wcs is None and outwcs is None, + then the first observation in th einput list is used as reference. + ext: an int, a tuple or a list + an int - represents a FITS extension, e.g. 0 is the primary HDU + a tuple - uses the notation (extname, extver), e.g. ('sci',1) + Can be a list of integers or tuples representing FITS extensions + extname: string + the value of the EXTNAME keyword for the extensions to be used in the mosaic + undistort: boolean (default: True) + undistort (or not) the output WCS + wkey: string + default: 'V' + one character A-Z to be used to record the virtual mosaic WCS as + an alternate WCS in the headers of the input files. + wname: string + default: 'VirtualMosaic + a string to be used as a WCSNAME value for the alternate WCS representign + the virtual mosaic + plot: boolean + if True and matplotlib is installed will make a plot of the tangent plane + and the location of the input observations. + clobber: boolean + This covers the case when an alternate WCS with the requested key + already exists in the header of the input files. + if clobber is True, it will be overwritten + if False, it will compute the new one but will not write it to the headers. + + Notes + ----- + The algorithm is: + 1. If output WCS is not given it is calculated from the input WCSes. + The first image is used as a reference, if no reference is given. + This represents the virtual mosaic WCS. + 2. For each input observation/chip, an HSTWCS object is created + and its footprint on the sky is calculated (using only the four corners). + 3. For each input observation the footprint is projected on the output + tangent plane and the virtual WCS is recorded in the header. + """ + wcsobjects = readWCS(fnames, ext, extname) + if outwcs != None: + outwcs = outwcs.deepcopy() + else: + if ref_wcs != None: + outwcs = utils.output_wcs(wcsobjects, ref_wcs=ref_wcs, undistort=undistort) + else: + outwcs = utils.output_wcs(wcsobjects, undistort=undistort) + if plot: + outc=np.array([[0.,0], [outwcs._naxis1, 0], + [outwcs._naxis1, outwcs._naxis2], + [0, outwcs._naxis2], [0, 0]]) + plt.plot(outc[:,0], outc[:,1]) + for wobj in wcsobjects: + outcorners = outwcs.wcs_world2pix(wobj.calc_footprint(),1) + if plot: + plt.plot(outcorners[:,0], outcorners[:,1]) + objwcs = outwcs.deepcopy() + objwcs.wcs.crpix = objwcs.wcs.crpix - (outcorners[0]) + updatehdr(wobj.filename, objwcs,wkey=wkey, wcsname=wname, ext=wobj.extname, clobber=clobber) + return outwcs + +def updatehdr(fname, wcsobj, wkey, wcsname, ext=1, clobber=False): + hdr = fits.getheader(fname, ext=ext) + all_keys = list(string.ascii_uppercase) + if wkey.upper() not in all_keys: + raise KeyError("wkey must be one character: A-Z") + if wkey not in altwcs.available_wcskeys(hdr): + if not clobber: + raise ValueError("wkey %s is already in use. Use clobber=True to overwrite it or specify a different key." %wkey) + else: + altwcs.deleteWCS(fname, ext=ext, wcskey='V') + f = fits.open(fname, mode='update') + + hwcs = wcs2header(wcsobj) + wcsnamekey = 'WCSNAME' + wkey + f[ext].header[wcsnamekey] = wcsname + for k in hwcs: + f[ext].header[k[:7]+wkey] = hwcs[k] + + f.close() + +def wcs2header(wcsobj): + + h = wcsobj.to_header() + + if wcsobj.wcs.has_cd(): + altwcs.pc2cd(h) + h['CTYPE1'] = 'RA---TAN' + h['CTYPE2'] = 'DEC--TAN' + norient = np.rad2deg(np.arctan2(h['CD1_2'],h['CD2_2'])) + #okey = 'ORIENT%s' % wkey + okey = 'ORIENT' + h[okey] = norient + return h + +def readWCS(input, exts=None, extname=None): + if isinstance(input, str): + if input[0] == '@': + # input is an @ file + filelist = irafglob.irafglob(input) + else: + try: + filelist, output = parseinput.parseinput(input) + except IOError: raise + elif isinstance(input, list): + if isinstance(input[0], wcsutil.HSTWCS): + # a list of HSTWCS objects + return input + else: + filelist = input[:] + wcso = [] + fomited = [] + # figure out which FITS extension(s) to use + if exts == None and extname == None: + #Assume it's simple FITS and the data is in the primary HDU + for f in filelist: + try: + wcso = wcsutil.HSTWCS(f) + except AttributeError: + fomited.append(f) + continue + elif exts != None and validateExt(exts): + exts = [exts] + for f in filelist: + try: + wcso.extend([wcsutil.HSTWCS(f, ext=e) for e in exts]) + except KeyError: + fomited.append(f) + continue + elif extname != None: + for f in filelist: + fobj = fits.open(f) + for i in range(len(fobj)): + try: + ename = fobj[i].header['EXTNAME'] + except KeyError: + continue + if ename.lower() == extname.lower(): + wcso.append(wcsutil.HSTWCS(f,ext=i)) + else: + continue + fobj.close() + if fomited != []: + print("These files were skipped:") + for f in fomited: + print(f) + return wcso + + +def validateExt(ext): + if not isinstance(ext, int) and not isinstance(ext, tuple) \ + and not isinstance(ext, list): + print("Ext must be integer, tuple, a list of int extension numbers, \ + or a list of tuples representing a fits extension, for example ('sci', 1).") + return False + else: + return True + + + diff --git a/stwcs/wcsutil/wcscorr.py b/stwcs/wcsutil/wcscorr.py new file mode 100644 index 0000000..3f9b7d5 --- /dev/null +++ b/stwcs/wcsutil/wcscorr.py @@ -0,0 +1,668 @@ +from __future__ import absolute_import, division, print_function + +import os,copy +import numpy as np +from astropy.io import fits + +import stwcs +from stwcs.wcsutil import altwcs +from stwcs.updatewcs import utils +from stsci.tools import fileutil +from . import convertwcs + +DEFAULT_WCS_KEYS = ['CRVAL1','CRVAL2','CRPIX1','CRPIX2', + 'CD1_1','CD1_2','CD2_1','CD2_2', + 'CTYPE1','CTYPE2','ORIENTAT'] +DEFAULT_PRI_KEYS = ['HDRNAME','SIPNAME','NPOLNAME','D2IMNAME','DESCRIP'] +COL_FITSKW_DICT = {'RMS_RA':'sci.crder1','RMS_DEC':'sci.crder2', + 'NMatch':'sci.nmatch','Catalog':'sci.catalog'} + +### +### WCSEXT table related keyword archive functions +### +def init_wcscorr(input, force=False): + """ + This function will initialize the WCSCORR table if it is not already present, + and look for WCS keywords with a prefix of 'O' as the original OPUS + generated WCS as the initial row for the table or use the current WCS + keywords as initial row if no 'O' prefix keywords are found. + + This function will NOT overwrite any rows already present. + + This function works on all SCI extensions at one time. + """ + # TODO: Create some sort of decorator or (for Python2.5) context for + # opening a FITS file and closing it when done, if necessary + if not isinstance(input, fits.HDUList): + # input must be a filename, so open as `astropy.io.fits.HDUList` object + fimg = fits.open(input, mode='update') + need_to_close = True + else: + fimg = input + need_to_close = False + + # Do not try to generate a WCSCORR table for a simple FITS file + numsci = fileutil.countExtn(fimg) + if len(fimg) == 1 or numsci == 0: + return + + enames = [] + for e in fimg: enames.append(e.name) + if 'WCSCORR' in enames: + if not force: + return + else: + del fimg['wcscorr'] + print('Initializing new WCSCORR table for ',fimg.filename()) + + used_wcskeys = altwcs.wcskeys(fimg['SCI', 1].header) + + # define the primary columns of the WCSEXT table with initial rows for each + # SCI extension for the original OPUS solution + numwcs = len(used_wcskeys) + if numwcs == 0: numwcs = 1 + + # create new table with more rows than needed initially to make it easier to + # add new rows later + wcsext = create_wcscorr(descrip=True,numrows=numsci, padding=(numsci*numwcs) + numsci * 4) + # Assign the correct EXTNAME value to this table extension + wcsext.header['TROWS'] = (numsci * 2, 'Number of updated rows in table') + wcsext.header['EXTNAME'] = ('WCSCORR', 'Table with WCS Update history') + wcsext.header['EXTVER'] = 1 + + # define set of WCS keywords which need to be managed and copied to the table + wcs1 = stwcs.wcsutil.HSTWCS(fimg,ext=('SCI',1)) + idc2header = True + if wcs1.idcscale is None: + idc2header = False + wcs_keywords = list(wcs1.wcs2header(idc2hdr=idc2header).keys()) + + prihdr = fimg[0].header + prihdr_keys = DEFAULT_PRI_KEYS + pri_funcs = {'SIPNAME':stwcs.updatewcs.utils.build_sipname, + 'NPOLNAME':stwcs.updatewcs.utils.build_npolname, + 'D2IMNAME':stwcs.updatewcs.utils.build_d2imname} + + # Now copy original OPUS values into table + for extver in range(1, numsci + 1): + rowind = find_wcscorr_row(wcsext.data, + {'WCS_ID': 'OPUS', 'EXTVER': extver, + 'WCS_key':'O'}) + # There should only EVER be a single row for each extension with OPUS values + rownum = np.where(rowind)[0][0] + #print 'Archiving OPUS WCS in row number ',rownum,' in WCSCORR table for SCI,',extver + + hdr = fimg['SCI', extver].header + # define set of WCS keywords which need to be managed and copied to the table + if used_wcskeys is None: + used_wcskeys = altwcs.wcskeys(hdr) + # Check to see whether or not there is an OPUS alternate WCS present, + # if so, get its values directly, otherwise, archive the PRIMARY WCS + # as the OPUS values in the WCSCORR table + if 'O' not in used_wcskeys: + altwcs.archiveWCS(fimg,('SCI',extver),wcskey='O', wcsname='OPUS') + wkey = 'O' + + wcs = stwcs.wcsutil.HSTWCS(fimg, ext=('SCI', extver), wcskey=wkey) + wcshdr = wcs.wcs2header(idc2hdr=idc2header) + + if wcsext.data.field('CRVAL1')[rownum] != 0: + # If we find values for these keywords already in the table, do not + # overwrite them again + print('WCS keywords already updated...') + break + for key in wcs_keywords: + if key in wcsext.data.names: + wcsext.data.field(key)[rownum] = wcshdr[(key+wkey)[:8]] + # Now get any keywords from PRIMARY header needed for WCS updates + for key in prihdr_keys: + if key in prihdr: + val = prihdr[key] + else: + val = '' + wcsext.data.field(key)[rownum] = val + + # Now that we have archived the OPUS alternate WCS, remove it from the list + # of used_wcskeys + if 'O' in used_wcskeys: + used_wcskeys.remove('O') + + # Now copy remaining alternate WCSs into table + # TODO: Much of this appears to be redundant with update_wcscorr; consider + # merging them... + for uwkey in used_wcskeys: + for extver in range(1, numsci + 1): + hdr = fimg['SCI', extver].header + wcs = stwcs.wcsutil.HSTWCS(fimg, ext=('SCI', extver), + wcskey=uwkey) + wcshdr = wcs.wcs2header() + if 'WCSNAME' + uwkey not in wcshdr: + wcsid = utils.build_default_wcsname(fimg[0].header['idctab']) + else: + wcsid = wcshdr['WCSNAME' + uwkey] + + # identify next empty row + rowind = find_wcscorr_row(wcsext.data, + selections={'wcs_id':['','0.0']}) + rows = np.where(rowind) + if len(rows[0]) > 0: + rownum = np.where(rowind)[0][0] + else: + print('No available rows found for updating. ') + + # Update selection columns for this row with relevant values + wcsext.data.field('WCS_ID')[rownum] = wcsid + wcsext.data.field('EXTVER')[rownum] = extver + wcsext.data.field('WCS_key')[rownum] = uwkey + + # Look for standard WCS keyword values + for key in wcs_keywords: + if key in wcsext.data.names: + wcsext.data.field(key)[rownum] = wcshdr[key + uwkey] + # Now get any keywords from PRIMARY header needed for WCS updates + for key in prihdr_keys: + if key in pri_funcs: + val = pri_funcs[key](fimg)[0] + else: + if key in prihdr: + val = prihdr[key] + else: + val = '' + wcsext.data.field(key)[rownum] = val + + # Append this table to the image FITS file + fimg.append(wcsext) + # force an update now + # set the verify flag to 'warn' so that it will always succeed, but still + # tell the user if PyFITS detects any problems with the file as a whole + utils.updateNEXTENDKw(fimg) + + fimg.flush('warn') + + if need_to_close: + fimg.close() + + +def find_wcscorr_row(wcstab, selections): + """ + Return an array of indices from the table (NOT HDU) 'wcstab' that matches the + selections specified by the user. + + The row selection criteria must be specified as a dictionary with + column name as key and value(s) representing the valid desired row values. + For example, {'wcs_id':'OPUS','extver':2}. + """ + + mask = None + for i in selections: + icol = wcstab.field(i) + if isinstance(icol,np.chararray): icol = icol.rstrip() + selecti = selections[i] + if not isinstance(selecti,list): + if isinstance(selecti,str): + selecti = selecti.rstrip() + bmask = (icol == selecti) + if mask is None: + mask = bmask.copy() + else: + mask = np.logical_and(mask,bmask) + del bmask + else: + for si in selecti: + if isinstance(si,str): + si = si.rstrip() + bmask = (icol == si) + if mask is None: + mask = bmask.copy() + else: + mask = np.logical_or(mask,bmask) + del bmask + + return mask + + +def archive_wcs_file(image, wcs_id=None): + """ + Update WCSCORR table with rows for each SCI extension to record the + newly updated WCS keyword values. + """ + + if not isinstance(image, fits.HDUList): + fimg = fits.open(image, mode='update') + close_image = True + else: + fimg = image + close_image = False + + update_wcscorr(fimg, wcs_id=wcs_id) + + if close_image: + fimg.close() + + +def update_wcscorr(dest, source=None, extname='SCI', wcs_id=None, active=True): + """ + Update WCSCORR table with a new row or rows for this extension header. It + copies the current set of WCS keywords as a new row of the table based on + keyed WCSs as per Paper I Multiple WCS standard). + + Parameters + ---------- + dest : HDUList + The HDU list whose WCSCORR table should be appended to (the WCSCORR HDU + must already exist) + source : HDUList, optional + The HDU list containing the extension from which to extract the WCS + keywords to add to the WCSCORR table. If None, the dest is also used + as the source. + extname : str, optional + The extension name from which to take new WCS keywords. If there are + multiple extensions with that name, rows are added for each extension + version. + wcs_id : str, optional + The name of the WCS to add, as in the WCSNAMEa keyword. If + unspecified, all the WCSs in the specified extensions are added. + active: bool, optional + When True, indicates that the update should reflect an update of the + active WCS information, not just appending the WCS to the file as a + headerlet + """ + if not isinstance(dest, fits.HDUList): + dest = fits.open(dest,mode='update') + fname = dest.filename() + + if source is None: + source = dest + + if extname == 'PRIMARY': + return + + numext = fileutil.countExtn(source, extname) + if numext == 0: + raise ValueError('No %s extensions found in the source HDU list.' + % extname) + # Initialize the WCSCORR table extension in dest if not already present + init_wcscorr(dest) + try: + dest.index_of('WCSCORR') + except KeyError: + return + + # check to see whether or not this is an up-to-date table + # replace with newly initialized table with current format + old_table = dest['WCSCORR'] + wcscorr_cols = ['WCS_ID','EXTVER', 'SIPNAME', + 'HDRNAME', 'NPOLNAME', 'D2IMNAME'] + + for colname in wcscorr_cols: + if colname not in old_table.data.columns.names: + print("WARNING: Replacing outdated WCSCORR table...") + outdated_table = old_table.copy() + del dest['WCSCORR'] + init_wcscorr(dest) + old_table = dest['WCSCORR'] + break + + # Current implementation assumes the same WCS keywords are in each + # extension version; if this should not be assumed then this can be + # modified... + wcs_keys = altwcs.wcskeys(source[(extname, 1)].header) + wcs_keys = [kk for kk in wcs_keys if kk] + if ' ' not in wcs_keys: wcs_keys.append(' ') # Insure that primary WCS gets used + # apply logic for only updating WCSCORR table with specified keywords + # corresponding to the WCS with WCSNAME=wcs_id + if wcs_id is not None: + wnames = altwcs.wcsnames(source[(extname, 1)].header) + wkeys = [] + for letter in wnames: + if wnames[letter] == wcs_id: + wkeys.append(letter) + if len(wkeys) > 1 and ' ' in wkeys: + wkeys.remove(' ') + wcs_keys = wkeys + wcshdr = stwcs.wcsutil.HSTWCS(source, ext=(extname, 1)).wcs2header() + wcs_keywords = list(wcshdr.keys()) + + if 'O' in wcs_keys: + wcs_keys.remove('O') # 'O' is reserved for original OPUS WCS + + # create new table for hdr and populate it with the newly updated values + new_table = create_wcscorr(descrip=True,numrows=0, padding=len(wcs_keys)*numext) + prihdr = source[0].header + + # Get headerlet related keywords here + sipname, idctab = utils.build_sipname(source, fname, "None") + npolname, npolfile = utils.build_npolname(source, None) + d2imname, d2imfile = utils.build_d2imname(source, None) + if 'hdrname' in prihdr: + hdrname = prihdr['hdrname'] + else: + hdrname = '' + + idx = -1 + for wcs_key in wcs_keys: + for extver in range(1, numext + 1): + extn = (extname, extver) + if 'SIPWCS' in extname and not active: + tab_extver = 0 # Since it has not been added to the SCI header yet + else: + tab_extver = extver + hdr = source[extn].header + if 'WCSNAME'+wcs_key in hdr: + wcsname = hdr['WCSNAME' + wcs_key] + else: + wcsname = utils.build_default_wcsname(hdr['idctab']) + + selection = {'WCS_ID': wcsname, 'EXTVER': tab_extver, + 'SIPNAME':sipname, 'HDRNAME': hdrname, + 'NPOLNAME': npolname, 'D2IMNAME':d2imname + } + + # Ensure that an entry for this WCS is not already in the dest + # table; if so just skip it + rowind = find_wcscorr_row(old_table.data, selection) + if np.any(rowind): + continue + + idx += 1 + + wcs = stwcs.wcsutil.HSTWCS(source, ext=extn, wcskey=wcs_key) + wcshdr = wcs.wcs2header() + + # Update selection column values + for key, val in selection.items(): + if key in new_table.data.names: + new_table.data.field(key)[idx] = val + + for key in wcs_keywords: + if key in new_table.data.names: + new_table.data.field(key)[idx] = wcshdr[key + wcs_key] + + for key in DEFAULT_PRI_KEYS: + if key in new_table.data.names and key in prihdr: + new_table.data.field(key)[idx] = prihdr[key] + # Now look for additional, non-WCS-keyword table column data + for key in COL_FITSKW_DICT: + fitkw = COL_FITSKW_DICT[key] + # Interpret any 'pri.hdrname' or + # 'sci.crpix1' formatted keyword names + if '.' in fitkw: + srchdr,fitkw = fitkw.split('.') + if 'pri' in srchdr.lower(): srchdr = prihdr + else: srchdr = source[extn].header + else: + srchdr = source[extn].header + + if fitkw+wcs_key in srchdr: + new_table.data.field(key)[idx] = srchdr[fitkw+wcs_key] + + + # If idx was never incremented, no rows were added, so there's nothing else + # to do... + if idx < 0: + return + + # Now, we need to merge this into the existing table + rowind = find_wcscorr_row(old_table.data, {'wcs_id':['','0.0']}) + old_nrows = np.where(rowind)[0][0] + new_nrows = new_table.data.shape[0] + + # check to see if there is room for the new row + if (old_nrows + new_nrows) > old_table.data.shape[0]-1: + pad_rows = 2 * new_nrows + # if not, create a new table with 'pad_rows' new empty rows + upd_table = fits.new_table(old_table.columns,header=old_table.header, + nrows=old_table.data.shape[0]+pad_rows) + else: + upd_table = old_table + pad_rows = 0 + # Now, add + for name in old_table.columns.names: + if name in new_table.data.names: + # reset the default values to ones specific to the row definitions + for i in range(pad_rows): + upd_table.data.field(name)[old_nrows+i] = old_table.data.field(name)[-1] + # Now populate with values from new table + upd_table.data.field(name)[old_nrows:old_nrows + new_nrows] = \ + new_table.data.field(name) + upd_table.header['TROWS'] = old_nrows + new_nrows + + # replace old extension with newly updated table extension + dest['WCSCORR'] = upd_table + + +def restore_file_from_wcscorr(image, id='OPUS', wcskey=''): + """ Copies the values of the WCS from the WCSCORR based on ID specified by user. + The default will be to restore the original OPUS-derived values to the Primary WCS. + If wcskey is specified, the WCS with that key will be updated instead. + """ + + if not isinstance(image, fits.HDUList): + fimg = fits.open(image, mode='update') + close_image = True + else: + fimg = image + close_image = False + numsci = fileutil.countExtn(fimg) + wcs_table = fimg['WCSCORR'] + orig_rows = (wcs_table.data.field('WCS_ID') == 'OPUS') + # create an HSTWCS object to figure out what WCS keywords need to be updated + wcsobj = stwcs.wcsutil.HSTWCS(fimg,ext=('sci',1)) + wcshdr = wcsobj.wcs2header() + for extn in range(1,numsci+1): + # find corresponding row from table + ext_rows = (wcs_table.data.field('EXTVER') == extn) + erow = np.where(np.logical_and(ext_rows,orig_rows))[0][0] + for key in wcshdr: + if key in wcs_table.data.names: # insure that keyword is column in table + tkey = key + + if 'orient' in key.lower(): + key = 'ORIENT' + if wcskey == '': + skey = key + else: + skey = key[:7]+wcskey + fimg['sci',extn].header[skey] = wcs_table.data.field(tkey)[erow] + for key in DEFAULT_PRI_KEYS: + if key in wcs_table.data.names: + if wcskey == '': + pkey = key + else: + pkey = key[:7]+wcskey + fimg[0].header[pkey] = wcs_table.data.field(key)[erow] + + utils.updateNEXTENDKw(fimg) + + # close the image now that the update has been completed. + if close_image: + fimg.close() + + +def create_wcscorr(descrip=False, numrows=1, padding=0): + """ + Return the basic definitions for a WCSCORR table. + The dtype definitions for the string columns are set to the maximum allowed so + that all new elements will have the same max size which will be automatically + truncated to this limit upon updating (if needed). + + The table is initialized with rows corresponding to the OPUS solution + for all the 'SCI' extensions. + """ + + trows = numrows + padding + # define initialized arrays as placeholders for column data + # TODO: I'm certain there's an easier way to do this... for example, simply + # define the column names and formats, then create an empty array using + # them as a dtype, then create the new table from that array. + def_float64_zeros = np.array([0.0] * trows, dtype=np.float64) + def_float64_ones = def_float64_zeros + 1.0 + def_float_col = {'format': 'D', 'array': def_float64_zeros.copy()} + def_float1_col = {'format': 'D', 'array':def_float64_ones.copy()} + def_str40_col = {'format': '40A', + 'array': np.array([''] * trows, dtype='S40')} + def_str24_col = {'format': '24A', + 'array': np.array([''] * trows, dtype='S24')} + def_int32_col = {'format': 'J', + 'array': np.array([0]*trows,dtype=np.int32)} + + # If more columns are needed, simply add their definitions to this list + col_names = [('HDRNAME', def_str24_col), ('SIPNAME', def_str24_col), + ('NPOLNAME', def_str24_col), ('D2IMNAME', def_str24_col), + ('CRVAL1', def_float_col), ('CRVAL2', def_float_col), + ('CRPIX1', def_float_col), ('CRPIX2', def_float_col), + ('CD1_1', def_float_col), ('CD1_2', def_float_col), + ('CD2_1', def_float_col), ('CD2_2', def_float_col), + ('CTYPE1', def_str24_col), ('CTYPE2', def_str24_col), + ('ORIENTAT', def_float_col), ('PA_V3', def_float_col), + ('RMS_RA', def_float_col), ('RMS_Dec', def_float_col), + ('NMatch', def_int32_col), ('Catalog', def_str40_col)] + + # Define selector columns + id_col = fits.Column(name='WCS_ID', format='40A', + array=np.array(['OPUS'] * numrows + [''] * padding, + dtype='S24')) + extver_col = fits.Column(name='EXTVER', format='I', + array=np.array(list(range(1, numrows + 1)), + dtype=np.int16)) + wcskey_col = fits.Column(name='WCS_key', format='A', + array=np.array(['O'] * numrows + [''] * padding, + dtype='S')) + # create list of remaining columns to be added to table + col_list = [id_col, extver_col, wcskey_col] # start with selector columns + + for c in col_names: + cdef = copy.deepcopy(c[1]) + col_list.append(fits.Column(name=c[0], format=cdef['format'], + array=cdef['array'])) + + if descrip: + col_list.append( + fits.Column(name='DESCRIP', format='128A', + array=np.array( + ['Original WCS computed by OPUS'] * numrows, + dtype='S128'))) + + # Now create the new table from the column definitions + newtab = fits.new_table(fits.ColDefs(col_list), nrows=trows) + # The fact that setting .name is necessary should be considered a bug in + # pyfits. + # TODO: Make sure this is fixed in pyfits, then remove this + newtab.name = 'WCSCORR' + + return newtab + +def delete_wcscorr_row(wcstab,selections=None,rows=None): + """ + Sets all values in a specified row or set of rows to default values + + This function will essentially erase the specified row from the table + without actually removing the row from the table. This avoids the problems + with trying to resize the number of rows in the table while preserving the + ability to update the table with new rows again without resizing the table. + + Parameters + ---------- + wcstab: object + PyFITS binTable object for WCSCORR table + selections: dict + Dictionary of wcscorr column names and values to be used to select + the row or set of rows to erase + rows: int, list + If specified, will specify what rows from the table to erase regardless + of the value of 'selections' + """ + + if selections is None and rows is None: + print('ERROR: Some row selection information must be provided!') + print(' Either a row numbers or "selections" must be provided.') + raise ValueError + + delete_rows = None + if rows is None: + if 'wcs_id' in selections and selections['wcs_id'] == 'OPUS': + delete_rows = None + print('WARNING: OPUS WCS information can not be deleted from WCSCORR table.') + print(' This row will not be deleted!') + else: + rowind = find_wcscorr_row(wcstab, selections=selections) + delete_rows = np.where(rowind)[0].tolist() + else: + if not isinstance(rows,list): + rows = [rows] + delete_rows = rows + + # Insure that rows pointing to OPUS WCS do not get deleted, even by accident + for row in delete_rows: + if wcstab['WCS_key'][row] == 'O' or wcstab['WCS_ID'][row] == 'OPUS': + del delete_rows[delete_rows.index(row)] + + if delete_rows is None: + return + + # identify next empty row + rowind = find_wcscorr_row(wcstab, selections={'wcs_id':['','0.0']}) + last_blank_row = np.where(rowind)[0][-1] + + # copy values from blank row into user-specified rows + for colname in wcstab.names: + wcstab[colname][delete_rows] = wcstab[colname][last_blank_row] + +def update_wcscorr_column(wcstab, column, values, selections=None, rows=None): + """ + Update the values in 'column' with 'values' for selected rows + + Parameters + ---------- + wcstab: object + PyFITS binTable object for WCSCORR table + column: string + Name of table column with values that need to be updated + values: string, int, or list + Value or set of values to copy into the selected rows for the column + selections: dict + Dictionary of wcscorr column names and values to be used to select + the row or set of rows to erase + rows: int, list + If specified, will specify what rows from the table to erase regardless + of the value of 'selections' + """ + if selections is None and rows is None: + print('ERROR: Some row selection information must be provided!') + print(' Either a row numbers or "selections" must be provided.') + raise ValueError + + if not isinstance(values, list): + values = [values] + + update_rows = None + if rows is None: + if 'wcs_id' in selections and selections['wcs_id'] == 'OPUS': + update_rows = None + print('WARNING: OPUS WCS information can not be deleted from WCSCORR table.') + print(' This row will not be deleted!') + else: + rowind = find_wcscorr_row(wcstab, selections=selections) + update_rows = np.where(rowind)[0].tolist() + else: + if not isinstance(rows,list): + rows = [rows] + update_rows = rows + + if update_rows is None: + return + + # Expand single input value to apply to all selected rows + if len(values) > 1 and len(values) < len(update_rows): + print('ERROR: Number of new values',len(values)) + print(' does not match number of rows',len(update_rows),' to be updated!') + print(' Please enter either 1 value or the same number of values') + print(' as there are rows to be updated.') + print(' Table will not be updated...') + raise ValueError + + if len(values) == 1 and len(values) < len(update_rows): + values = values * len(update_rows) + # copy values from blank row into user-specified rows + for row in update_rows: + wcstab[column][row] = values[row] diff --git a/stwcs/wcsutil/wcsdiff.py b/stwcs/wcsutil/wcsdiff.py new file mode 100644 index 0000000..cfc2d66 --- /dev/null +++ b/stwcs/wcsutil/wcsdiff.py @@ -0,0 +1,150 @@ +from __future__ import print_function +from astropy import wcs as pywcs +from collections import OrderedDict +from astropy.io import fits +from .headerlet import parse_filename +import numpy as np + +def is_wcs_identical(scifile, file2, sciextlist, fextlist, scikey=" ", + file2key=" ", verbose=False): + """ + Compares the WCS solution of 2 files. + + Parameters + ---------- + scifile: string + name of file1 (usually science file) + IRAF style extension syntax is accepted as well + for example scifile[1] or scifile[sci,1] + file2: string + name of second file (for example headerlet) + sciextlist - list + a list of int or tuple ('SCI', 1), extensions in the first file + fextlist - list + a list of int or tuple ('SIPWCS', 1), extensions in the second file + scikey: string + alternate WCS key in scifile + file2key: string + alternate WCS key in file2 + verbose: boolean + True: print to stdout + + Notes + ----- + These can be 2 science observations or 2 headerlets + or a science observation and a headerlet. The two files + have the same WCS solution if the following are the same: + + - rootname/destim + - primary WCS + - SIP coefficients + - NPOL distortion + - D2IM correction + + """ + result = True + diff = OrderedDict() + fobj, fname, close_file = parse_filename(file2) + sciobj, sciname, close_scifile = parse_filename(scifile) + diff['file_names'] = [scifile, file2] + if get_rootname(scifile) != get_rootname(file2): + #logger.info('Rootnames do not match.') + diff['rootname'] = ("%s: %s", "%s: %s") % (sciname, get_rootname(scifile), file2, get_rootname(file2)) + result = False + for i, j in zip(sciextlist, fextlist): + w1 = pywcs.WCS(sciobj[i].header, sciobj, key=scikey) + w2 = pywcs.WCS(fobj[j].header, fobj, key=file2key) + diff['extension'] = [get_extname_extnum(sciobj[i]), get_extname_extnum(fobj[j])] + if not np.allclose(w1.wcs.crval, w2.wcs.crval, rtol=10**(-7)): + #logger.info('CRVALs do not match') + diff['CRVAL'] = w1.wcs.crval, w2.wcs.crval + result = False + if not np.allclose(w1.wcs.crpix, w2.wcs.crpix, rtol=10**(-7)): + #logger.info('CRPIX do not match') + diff ['CRPIX'] = w1.wcs.crpix, w2.wcs.crpix + result = False + if not np.allclose(w1.wcs.cd, w2.wcs.cd, rtol=10**(-7)): + #logger.info('CDs do not match') + diff ['CD'] = w1.wcs.cd, w2.wcs.cd + result = False + if not (np.array(w1.wcs.ctype) == np.array(w2.wcs.ctype)).all(): + #logger.info('CTYPEs do not match') + diff ['CTYPE'] = w1.wcs.ctype, w2.wcs.ctype + result = False + if w1.sip or w2.sip: + if (w2.sip and not w1.sip) or (w1.sip and not w2.sip): + diff['sip'] = 'one sip extension is missing' + result = False + if not np.allclose(w1.sip.a, w2.sip.a, rtol=10**(-7)): + diff['SIP_A'] = 'SIP_A differ' + result = False + if not np.allclose(w1.sip.b, w2.sip.b, rtol=10**(-7)): + #logger.info('SIP coefficients do not match') + diff ['SIP_B'] = (w1.sip.b, w2.sip.b) + result = False + if w1.cpdis1 or w2.cpdis1: + if w1.cpdis1 and not w2.cpdis1 or w2.cpdis1 and not w1.cpdis1: + diff['CPDIS1'] = "CPDIS1 missing" + result=False + if w1.cpdis2 and not w2.cpdis2 or w2.cpdis2 and not w1.cpdis2: + diff['CPDIS2'] = "CPDIS2 missing" + result = False + if not np.allclose(w1.cpdis1.data, w2.cpdis1.data, rtol=10**(-7)): + #logger.info('NPOL distortions do not match') + diff ['CPDIS1_data'] = (w1.cpdis1.data, w2.cpdis1.data) + result = False + if not np.allclose(w1.cpdis2.data, w2.cpdis2.data, rtol=10**(-7)): + #logger.info('NPOL distortions do not match') + diff ['CPDIS2_data'] = (w1.cpdis2.data, w2.cpdis2.data) + result = False + if w1.det2im1 or w2.det2im1: + if w1.det2im1 and not w2.det2im1 or \ + w2.det2im1 and not w1.det2im1: + diff['DET2IM'] = "Det2im1 missing" + result = False + if not np.allclose(w1.det2im1.data, w2.det2im1.data, rtol=10**(-7)): + #logger.info('Det2Im corrections do not match') + diff ['D2IM1_data'] = (w1.det2im1.data, w2.det2im1.data) + result = False + if w1.det2im2 or w2.det2im2: + if w1.det2im2 and not w2.det2im2 or \ + w2.det2im2 and not w1.det2im2: + diff['DET2IM2'] = "Det2im2 missing" + result = False + if not np.allclose(w1.det2im2.data, w2.det2im2.data, rtol=10**(-7)): + #logger.info('Det2Im corrections do not match') + diff ['D2IM2_data'] = (w1.det2im2.data, w2.det2im2.data) + result = False + if not result and verbose: + for key in diff: + print(key, ":\t", diff[key][0], "\t", diff[key][1]) + if close_file: + fobj.close() + if close_scifile: + sciobj.close() + return result, diff + +def get_rootname(fname): + """ + Returns the value of ROOTNAME or DESTIM + """ + + hdr = fits.getheader(fname) + try: + rootname = hdr['ROOTNAME'] + except KeyError: + try: + rootname = hdr['DESTIM'] + except KeyError: + rootname = fname + return rootname + +def get_extname_extnum(ext): + """ + Return (EXTNAME, EXTNUM) of a FITS extension + """ + extname = "" + extnum=1 + extname = ext.header.get('EXTNAME', extname) + extnum = ext.header.get('EXTVER', extnum) + return (extname, extnum) |