summaryrefslogtreecommitdiff
path: root/lib/stwcs/wcsutil
diff options
context:
space:
mode:
Diffstat (limited to 'lib/stwcs/wcsutil')
-rw-r--r--lib/stwcs/wcsutil/__init__.py34
-rw-r--r--lib/stwcs/wcsutil/altwcs.py758
-rw-r--r--lib/stwcs/wcsutil/convertwcs.py118
-rw-r--r--lib/stwcs/wcsutil/getinput.py62
-rw-r--r--lib/stwcs/wcsutil/headerlet.py2754
-rw-r--r--lib/stwcs/wcsutil/hstwcs.py988
-rw-r--r--lib/stwcs/wcsutil/instruments.py320
-rw-r--r--lib/stwcs/wcsutil/mappings.py29
-rw-r--r--lib/stwcs/wcsutil/mosaic.py183
-rw-r--r--lib/stwcs/wcsutil/wcscorr.py668
-rw-r--r--lib/stwcs/wcsutil/wcsdiff.py150
11 files changed, 0 insertions, 6064 deletions
diff --git a/lib/stwcs/wcsutil/__init__.py b/lib/stwcs/wcsutil/__init__.py
deleted file mode 100644
index 65280be..0000000
--- a/lib/stwcs/wcsutil/__init__.py
+++ /dev/null
@@ -1,34 +0,0 @@
-from __future__ import absolute_import, print_function # confidence high
-
-from .altwcs import *
-from .hstwcs import HSTWCS
-
-
-def help():
- doc = """ \
- 1. Using a `astropy.io.fits.HDUList` object and an extension number \n
- Example:\n
- from astropy.io improt fits
- fobj = fits.open('some_file.fits')\n
- w = wcsutil.HSTWCS(fobj, 3)\n\n
-
- 2. Create an HSTWCS object using a qualified file name. \n
- Example:\n
- w = wcsutil.HSTWCS('j9irw4b1q_flt.fits[sci,1]')\n\n
-
- 3. Create an HSTWCS object using a file name and an extension number. \n
- Example:\n
- w = wcsutil.HSTWCS('j9irw4b1q_flt.fits', ext=2)\n\n
-
- 4. Create an HSTWCS object from WCS with key 'O'.\n
- Example:\n
-
- w = wcsutil.HSTWCS('j9irw4b1q_flt.fits', ext=2, wcskey='O')\n\n
-
- 5. Create a template HSTWCS object for a DEFAULT object.\n
- Example:\n
- w = wcsutil.HSTWCS(instrument='DEFAULT')\n\n
- """
-
- print('How to create an HSTWCS object:\n\n')
- print(doc)
diff --git a/lib/stwcs/wcsutil/altwcs.py b/lib/stwcs/wcsutil/altwcs.py
deleted file mode 100644
index aae1a9f..0000000
--- a/lib/stwcs/wcsutil/altwcs.py
+++ /dev/null
@@ -1,758 +0,0 @@
-from __future__ import division, print_function # confidence high
-import os
-import string
-
-import numpy as np
-from astropy import wcs as pywcs
-from astropy.io import fits
-from stsci.tools import fileutil as fu
-
-altwcskw = ['WCSAXES', 'CRVAL', 'CRPIX', 'PC', 'CDELT', 'CD', 'CTYPE', 'CUNIT',
- 'PV', 'PS']
-altwcskw_extra = ['LATPOLE','LONPOLE','RESTWAV','RESTFRQ']
-
-# file operations
-def archiveWCS(fname, ext, wcskey=" ", wcsname=" ", reusekey=False):
- """
- Copy the primary WCS to the header as an alternate WCS
- with wcskey and name WCSNAME. It loops over all extensions in 'ext'
-
- Parameters
- ----------
- fname : string or `astropy.io.fits.HDUList`
- file name or a file object
- ext : int, tuple, str, or list of integers or tuples (e.g.('sci',1))
- fits extensions to work with
- If a string is provided, it should specify the EXTNAME of extensions
- with WCSs to be archived
- wcskey : string "A"-"Z" or " "
- if " ": get next available key if wcsname is also " " or try
- to get a key from WCSNAME value
- wcsname : string
- Name of alternate WCS description
- reusekey : boolean
- if True - overwrites a WCS with the same key
-
- Examples
- --------
- Copy the primary WCS of an in memory headrlet object to an
- alternate WCS with key 'T'
-
- >>> hlet=headerlet.createHeaderlet('junk.fits', 'hdr1.fits')
- >>> altwcs.wcskeys(hlet[1].header)
- ['A']
- >>> altwcs.archiveWCS(hlet, ext=[('SIPWCS',1),('SIPWCS',2)], wcskey='T')
- >>> altwcs.wcskeys(hlet[1].header)
- ['A', 'T']
-
-
- See Also
- --------
- wcsutil.restoreWCS: Copy an alternate WCS to the primary WCS
-
- """
-
- if isinstance(fname, str):
- f = fits.open(fname, mode='update')
- else:
- f = fname
-
- if not _parpasscheck(f, ext, wcskey, wcsname):
- closefobj(fname, f)
- raise ValueError("Input parameters problem")
-
- # Interpret input 'ext' value to get list of extensions to process
- ext = _buildExtlist(f, ext)
-
- if not wcskey and not wcsname:
- raise KeyError("Either wcskey or wcsname should be specified")
-
- if wcsname.strip() == "":
- try:
- wcsname = readAltWCS(f, ext[0], wcskey=" ")['WCSNAME']
- except KeyError:
- pass
- wcsext = ext[0]
- if wcskey != " " and wcskey in wcskeys(f[wcsext].header) and not reusekey:
- closefobj(fname, f)
- raise KeyError("Wcskey %s is aready used. \
- Run archiveWCS() with reusekey=True to overwrite this alternate WCS. \
- Alternatively choose another wcskey with altwcs.available_wcskeys()." %wcskey)
- elif wcskey == " ":
- # wcsname exists, overwrite it if reuse is True or get the next key
- if wcsname.strip() in wcsnames(f[wcsext].header).values():
- if reusekey:
- # try getting the key from an existing WCS with WCSNAME
- wkey = getKeyFromName(f[wcsext].header, wcsname)
- wname = wcsname
- if wkey == ' ':
- wkey = next_wcskey(f[wcsext].header)
- elif wkey is None:
- closefobj(fname, f)
- raise KeyError("Could not get a valid wcskey from wcsname %s" %wcsname)
- else:
- closefobj(fname, f)
- raise KeyError("Wcsname %s is aready used. \
- Run archiveWCS() with reusekey=True to overwrite this alternate WCS. \
- Alternatively choose another wcskey with altwcs.available_wcskeys() or\
- choose another wcsname." %wcsname)
- else:
- wkey = next_wcskey(f[wcsext].header)
- if wcsname.strip():
- wname = wcsname
- else:
- # determine which WCSNAME needs to be replicated in archived WCS
- wnames = wcsnames(f[wcsext].header)
- if 'O' in wnames: del wnames['O'] # we don't want OPUS/original
- if len(wnames) > 0:
- if ' ' in wnames:
- wname = wnames[' ']
- else:
- akeys = string.uppercase
- wname = "DEFAULT"
- for key in akeys[-1::]:
- if key in wnames:
- wname = wnames
- break
- else:
- wname = "DEFAULT"
- else:
- wkey = wcskey
- wname = wcsname
-
- for e in ext:
- hwcs = readAltWCS(f,e,wcskey=' ')
- if hwcs is None:
- continue
-
- wcsnamekey = 'WCSNAME' + wkey
- f[e].header[wcsnamekey] = wname
-
- try:
- old_wcsname=hwcs.pop('WCSNAME')
- except:
- pass
-
- for k in hwcs.keys():
- key = k[:7] + wkey
- f[e].header[key] = hwcs[k]
- closefobj(fname, f)
-
-def restore_from_to(f, fromext=None, toext=None, wcskey=" ", wcsname=" "):
- """
- Copy an alternate WCS from one extension as a primary WCS of another extension
-
- Reads in a WCS defined with wcskey and saves it as the primary WCS.
- Goes sequentially through the list of extensions in ext.
- Alternatively uses 'fromext' and 'toext'.
-
-
- Parameters
- ----------
- f: string or `astropy.io.fits.HDUList`
- a file name or a file object
- fromext: string
- extname from which to read in the alternate WCS, for example 'SCI'
- toext: string or python list
- extname or a list of extnames to which the WCS will be copied as
- primary, for example ['SCI', 'ERR', 'DQ']
- wcskey: a charater
- "A"-"Z" - Used for one of 26 alternate WCS definitions.
- or " " - find a key from WCSNAMe value
- wcsname: string (optional)
- if given and wcskey is " ", will try to restore by WCSNAME value
-
- See Also
- --------
- archiveWCS - copy the primary WCS as an alternate WCS
- restoreWCS - Copy a WCS with key "WCSKEY" to the primary WCS
-
- """
- if isinstance(f, str):
- fobj = fits.open(f, mode='update')
- else:
- fobj = f
-
- if not _parpasscheck(fobj, ext=None, wcskey=wcskey, fromext=fromext, toext=toext):
- closefobj(f, fobj)
- raise ValueError("Input parameters problem")
-
- # Interpret input 'ext' value to get list of extensions to process
- #ext = _buildExtlist(fobj, ext)
-
- if isinstance(toext, str):
- toext = [toext]
-
- # the case of an HDUList object in memory without an associated file
-
- #if fobj.filename() is not None:
- # name = fobj.filename()
-
- simplefits = fu.isFits(fobj)[1] is 'simple'
- if simplefits:
- wcskeyext = 0
- else:
- wcskeyext = 1
-
- if wcskey == " ":
- if wcsname.strip():
- wkey = getKeyFromName(fobj[wcskeyext].header, wcsname)
- if not wkey:
- closefobj(f, fobj)
- raise KeyError("Could not get a key from wcsname %s ." % wcsname)
- else:
- if wcskey not in wcskeys(fobj, ext=wcskeyext):
- print("Could not find alternate WCS with key %s in this file" % wcskey)
- closefobj(f, fobj)
- return
- wkey = wcskey
-
- countext = fu.countExtn(fobj, fromext)
- if not countext:
- raise KeyError("File does not have extension with extname %s", fromext)
- else:
- for i in range(1, countext+1):
- for toe in toext:
- _restore(fobj, fromextnum=i, fromextnam=fromext, toextnum=i, toextnam=toe, ukey=wkey)
-
- if fobj.filename() is not None:
- #fobj.writeto(name)
- closefobj(f, fobj)
-
-def restoreWCS(f, ext, wcskey=" ", wcsname=" "):
- """
- Copy a WCS with key "WCSKEY" to the primary WCS
-
- Reads in a WCS defined with wcskey and saves it as the primary WCS.
- Goes sequentially through the list of extensions in ext.
- Alternatively uses 'fromext' and 'toext'.
-
-
- Parameters
- ----------
- f : str or `astropy.io.fits.HDUList`
- file name or a file object
- ext : int, tuple, str, or list of integers or tuples (e.g.('sci',1))
- fits extensions to work with
- If a string is provided, it should specify the EXTNAME of extensions
- with WCSs to be archived
- wcskey : str
- "A"-"Z" - Used for one of 26 alternate WCS definitions.
- or " " - find a key from WCSNAMe value
- wcsname : str
- (optional) if given and wcskey is " ", will try to restore by WCSNAME value
-
- See Also
- --------
- archiveWCS - copy the primary WCS as an alternate WCS
- restore_from_to
-
- """
- if isinstance(f, str):
- fobj = fits.open(f, mode='update')
- else:
- fobj = f
-
- if not _parpasscheck(fobj, ext=ext, wcskey=wcskey):
- closefobj(f, fobj)
- raise ValueError("Input parameters problem")
-
- # Interpret input 'ext' value to get list of extensions to process
- ext = _buildExtlist(fobj, ext)
-
-
- # the case of an HDUList object in memory without an associated file
-
- #if fobj.filename() is not None:
- # name = fobj.filename()
-
- simplefits = fu.isFits(fobj)[1] is 'simple'
- if simplefits:
- wcskeyext = 0
- else:
- wcskeyext = 1
-
- if wcskey == " ":
- if wcsname.strip():
- wkey = getKeyFromName(fobj[wcskeyext].header, wcsname)
- if not wkey:
- closefobj(f, fobj)
- raise KeyError("Could not get a key from wcsname %s ." % wcsname)
- else:
- if wcskey not in wcskeys(fobj, ext=wcskeyext):
- #print "Could not find alternate WCS with key %s in this file" % wcskey
- closefobj(f, fobj)
- return
- wkey = wcskey
-
- for e in ext:
- _restore(fobj, wkey, fromextnum=e, verbose=False)
-
- if fobj.filename() is not None:
- #fobj.writeto(name)
- closefobj(f, fobj)
-
-def deleteWCS(fname, ext, wcskey=" ", wcsname=" "):
- """
- Delete an alternate WCS defined with wcskey.
- If wcskey is " " try to get a key from WCSNAME.
-
- Parameters
- ----------
- fname : str or a `astropy.io.fits.HDUList`
- ext : int, tuple, str, or list of integers or tuples (e.g.('sci',1))
- fits extensions to work with
- If a string is provided, it should specify the EXTNAME of extensions
- with WCSs to be archived
- wcskey : str
- one of 'A'-'Z' or " "
- wcsname : str
- Name of alternate WCS description
- """
- if isinstance(fname, str):
- fobj = fits.open(fname, mode='update')
- else:
- fobj = fname
-
- if not _parpasscheck(fobj, ext, wcskey, wcsname):
- closefobj(fname, fobj)
- raise ValueError("Input parameters problem")
-
- # Interpret input 'ext' value to get list of extensions to process
- ext = _buildExtlist(fobj, ext)
- # Do not allow deleting the original WCS.
- if wcskey == 'O':
- print("Wcskey 'O' is reserved for the original WCS and should not be deleted.")
- closefobj(fname, fobj)
- return
-
- wcskeyext = ext[0]
-
- if not wcskeys and not wcsname:
- raise KeyError("Either wcskey or wcsname should be specified")
-
- if wcskey == " ":
- # try getting the key from WCSNAME
- wkey = getKeyFromName(fobj[wcskeyext].header, wcsname)
- if not wkey:
- closefobj(fname, fobj)
- raise KeyError("Could not get a key: wcsname '%s' not found in header." % wcsname)
- else:
- if wcskey not in wcskeys(fobj[wcskeyext].header):
- closefobj(fname, fobj)
- raise KeyError("Could not find alternate WCS with key %s in this file" % wcskey)
- wkey = wcskey
-
- prexts = []
- for i in ext:
- hdr = fobj[i].header
- hwcs = readAltWCS(fobj,i,wcskey=wkey)
- if hwcs is None:
- continue
- for k in hwcs[::-1]:
- del hdr[k]
- #del hdr['ORIENT'+wkey]
- prexts.append(i)
- if prexts != []:
- print('Deleted all instances of WCS with key %s in extensions' % wkey, prexts)
- else:
- print("Did not find WCS with key %s in any of the extensions" % wkey)
- closefobj(fname, fobj)
-
-def _buildExtlist(fobj, ext):
- """
- Utility function to interpret the provided value of 'ext' and return a list
- of 'valid' values which can then be used by the rest of the functions in
- this module.
-
- Parameters
- ----------
- fobj: HDUList
- file to be examined
- ext: an int, a tuple, string, list of integers or tuples (e.g.('sci',1))
- fits extensions to work with
- If a string is provided, it should specify the EXTNAME of extensions
- with WCSs to be archived
- """
- if not isinstance(ext,list):
- if isinstance(ext,str):
- extstr = ext
- ext = []
- for extn in range(1, len(fobj)):
- if 'extname' in fobj[extn].header and fobj[extn].header['extname'] == extstr:
- ext.append(extn)
- elif isinstance(ext, int) or isinstance(ext, tuple):
- ext = [ext]
- else:
- raise KeyError("Valid extensions in 'ext' parameter need to be specified.")
- return ext
-
-def _restore(fobj, ukey, fromextnum,
- toextnum=None, fromextnam=None, toextnam=None, verbose=True):
- """
- fobj: string of HDUList
- ukey: string 'A'-'Z'
- wcs key
- fromextnum: int
- extver of extension from which to copy WCS
- fromextnam: string
- extname of extension from which to copy WCS
- toextnum: int
- extver of extension to which to copy WCS
- toextnam: string
- extname of extension to which to copy WCS
- """
- # create an extension tuple, e.g. ('SCI', 2)
- if fromextnam:
- fromextension = (fromextnam, fromextnum)
- else:
- fromextension = fromextnum
- if toextnum:
- if toextnam:
- toextension = (toextnam, toextnum)
- else:
- toextension =toextnum
- else:
- toextension = fromextension
-
- hwcs = readAltWCS(fobj,fromextension,wcskey=ukey,verbose=verbose)
- if hwcs is None:
- return
-
- for k in hwcs.keys():
- key = k[:-1]
- if key in fobj[toextension].header:
- #fobj[toextension].header.update(key=key, value = hwcs[k])
- fobj[toextension].header[key] = hwcs[k]
- else:
- continue
- if key == 'O' and 'TDDALPHA' in fobj[toextension].header:
- fobj[toextension].header['TDDALPHA'] = 0.0
- fobj[toextension].header['TDDBETA'] = 0.0
- if 'ORIENTAT' in fobj[toextension].header:
- norient = np.rad2deg(np.arctan2(hwcs['CD1_2'+'%s' %ukey], hwcs['CD2_2'+'%s' %ukey]))
- fobj[toextension].header['ORIENTAT'] = norient
- # Reset 2014 TDD keywords prior to computing new values (if any are computed)
- for kw in ['TDD_CYA','TDD_CYB','TDD_CXA','TDD_CXB']:
- if kw in fobj[toextension].header:
- fobj[toextension].header[kw] = 0.0
-
-#header operations
-def _check_headerpars(fobj, ext):
- if not isinstance(fobj, fits.Header) and not isinstance(fobj, fits.HDUList) \
- and not isinstance(fobj, str):
- raise ValueError("Expected a file name, a file object or a header\n")
-
- if not isinstance(fobj, fits.Header):
- #raise ValueError("Expected a valid ext parameter when input is a file")
- if not isinstance(ext, int) and not isinstance(ext, tuple):
- raise ValueError("Expected ext to be a number or a tuple, e.g. ('SCI', 1)\n")
-
-def _getheader(fobj, ext):
- if isinstance(fobj, str):
- hdr = fits.getheader(fobj,ext)
- elif isinstance(fobj, fits.Header):
- hdr = fobj
- else:
- hdr = fobj[ext].header
- return hdr
-
-def readAltWCS(fobj, ext, wcskey=' ',verbose=False):
- """ Reads in alternate WCS from specified extension
-
- Parameters
- ----------
- fobj : str, `astropy.io.fits.HDUList`
- fits filename or fits file object
- containing alternate/primary WCS(s) to be converted
- wcskey : str
- [" ",A-Z]
- alternate/primary WCS key that will be replaced by the new key
- ext : int
- fits extension number
-
- Returns
- -------
- hdr: fits.Header
- header object with ONLY the keywords for specified alternate WCS
- """
- if isinstance(fobj, str):
- fobj = fits.open(fobj)
-
- hdr = _getheader(fobj,ext)
- try:
- nwcs = pywcs.WCS(hdr, fobj=fobj, key=wcskey)
- except KeyError:
- if verbose:
- print('readAltWCS: Could not read WCS with key %s' %wcskey)
- print(' Skipping %s[%s]' % (fobj.filename(), str(ext)))
- return None
- hwcs = nwcs.to_header()
-
- if nwcs.wcs.has_cd():
- hwcs = pc2cd(hwcs, key=wcskey)
-
- return hwcs
-
-def convertAltWCS(fobj,ext,oldkey=" ",newkey=' '):
- """
- Translates the alternate/primary WCS with one key to an alternate/primary WCS with
- another key.
-
- Parameters
- ----------
- fobj : str, `astropy.io.fits.HDUList`, or `astropy.io.fits.Header`
- fits filename, fits file object or fits header
- containing alternate/primary WCS(s) to be converted
- ext : int
- extension number
- oldkey : str
- [" ",A-Z]
- alternate/primary WCS key that will be replaced by the new key
- newkey : str
- [" ",A-Z]
- new alternate/primary WCS key
-
- Returns
- -------
- hdr: `astropy.io.fits.Header`
- header object with keywords renamed from oldkey to newkey
- """
- hdr = readAltWCS(fobj,ext,wcskey=oldkey)
- if hdr is None:
- return None
- # Converting WCS to new key
- for card in hdr:
- if oldkey == ' ' or oldkey == '':
- cname = card
- else:
- cname = card.rstrip(oldkey)
- hdr.rename_key(card,cname+newkey,force=True)
-
- return hdr
-
-def wcskeys(fobj, ext=None):
- """
- Returns a list of characters used in the header for alternate
- WCS description with WCSNAME keyword
-
- Parameters
- ----------
- fobj : str, `astropy.io.fits.HDUList` or `astropy.io.fits.Header`
- fits file name, fits file object or fits header
- ext : int or None
- extension number
- if None, fobj must be a header
- """
- _check_headerpars(fobj, ext)
- hdr = _getheader(fobj, ext)
- names = hdr["WCSNAME*"]
- d = []
- for key in names:
- wkey = key.replace('WCSNAME','')
- if wkey == '': wkey = ' '
- d.append(wkey)
- return d
-
-def wcsnames(fobj, ext=None):
- """
- Returns a dictionary of wcskey: WCSNAME pairs
-
- Parameters
- ----------
- fobj : stri, `astropy.io.fits.HDUList` or `astropy.io.fits.Header`
- fits file name, fits file object or fits header
- ext : int or None
- extension number
- if None, fobj must be a header
-
- """
- _check_headerpars(fobj, ext)
- hdr = _getheader(fobj, ext)
- names = hdr["WCSNAME*"]
- d = {}
- for keyword, value in names.items():
- wkey = keyword.replace('WCSNAME','')
- if wkey == '': wkey = ' '
- d[wkey] = value
- return d
-
-def available_wcskeys(fobj, ext=None):
- """
- Returns a list of characters which are not used in the header
- with WCSNAME keyword. Any of them can be used to save a new
- WCS.
-
- Parameters
- ----------
- fobj : str, `astropy.io.fits.HDUList` or `astropy.io.fits.Header`
- fits file name, fits file object or fits header
- ext : int or None
- extension number
- if None, fobj must be a header
- """
- _check_headerpars(fobj, ext)
- hdr = _getheader(fobj, ext)
- all_keys = list(string.ascii_uppercase)
- used_keys = wcskeys(hdr)
- try:
- used_keys.remove(" ")
- except ValueError:
- pass
- [all_keys.remove(key) for key in used_keys]
- return all_keys
-
-def next_wcskey(fobj, ext=None):
- """
- Returns next available character to be used for an alternate WCS
-
- Parameters
- ----------
- fobj : str, `astropy.io.fits.HDUList` or `astropy.io.fits.Header`
- fits file name, fits file object or fits header
- ext : int or None
- extension number
- if None, fobj must be a header
- """
- _check_headerpars(fobj, ext)
- hdr = _getheader(fobj, ext)
- allkeys = available_wcskeys(hdr)
- if allkeys != []:
- return allkeys[0]
- else:
- return None
-
-def getKeyFromName(header, wcsname):
- """
- If WCSNAME is found in header, return its key, else return
- None. This is used to update an alternate WCS
- repeatedly and not generate new keys every time.
-
- Parameters
- ----------
- header : `astropy.io.fits.Header`
- wcsname : str
- value of WCSNAME
- """
- wkey = None
- names = wcsnames(header)
- wkeys = []
- for item in names.items():
- if item[1].lower() == wcsname.lower():
- wkeys.append(item[0])
- wkeys.sort()
- if len(wkeys) > 0:
- wkey = wkeys[-1]
- else:
- wkey = None
- return wkey
-
-def pc2cd(hdr, key=' '):
- """
- Convert a CD PC matrix to a CD matrix.
-
- WCSLIB (and PyWCS) recognizes CD keywords as input
- but converts them and works internally with the PC matrix.
- to_header() returns the PC matrix even if the i nput was a
- CD matrix. To keep input and output consistent we check
- for has_cd and convert the PC back to CD.
-
- Parameters
- ----------
- hdr: `astropy.io.fits.Header`
-
- """
- for c in ['1_1', '1_2', '2_1', '2_2']:
- try:
- val = hdr['PC'+c+'%s' % key]
- del hdr['PC'+c+ '%s' % key]
- except KeyError:
- if c=='1_1' or c == '2_2':
- val = 1.
- else:
- val = 0.
- #hdr.update(key='CD'+c+'%s' %key, value=val)
- hdr['CD{0}{1}'.format(c, key)] = val
- return hdr
-
-def _parpasscheck(fobj, ext, wcskey, fromext=None, toext=None, reusekey=False):
- """
- Check input parameters to altwcs functions
-
- fobj : str or `astropy.io.fits.HDUList` object
- a file name or a file object
- ext : int, a tuple, a python list of integers or a python list
- of tuples (e.g.('sci',1))
- fits extensions to work with
- wcskey : str
- "A"-"Z" or " "- Used for one of 26 alternate WCS definitions
- wcsname : str
- (optional)
- if given and wcskey is " ", will try to restore by WCSNAME value
- reusekey : bool
- A flag which indicates whether to reuse a wcskey in the header
- """
- if not isinstance(fobj, fits.HDUList):
- print("First parameter must be a fits file object or a file name.")
- return False
-
- # first one covers the case of an object created in memory
- # (e.g. headerlet) for which fileinfo returns None
- if fobj.fileinfo(0) is None:
- pass
- else:
- # an HDUList object with associated file
- if fobj.fileinfo(0)['filemode'] is not 'update':
- print("First parameter must be a file name or a file object opened in 'update' mode.")
- return False
-
- if not isinstance(ext, int) and not isinstance(ext, tuple) \
- and not isinstance(ext,str) \
- and not isinstance(ext, list) and ext is not None:
- print("Ext must be integer, tuple, string,a list of int extension numbers, \n\
- or a list of tuples representing a fits extension, for example ('sci', 1).")
- return False
-
- if not isinstance(fromext, str) and fromext is not None:
- print("fromext must be a string representing a valid extname")
- return False
-
- if not isinstance(toext, list) and not isinstance(toext, str) and \
- toext is not None :
- print("toext must be a string or a list of strings representing extname")
- return False
-
- if len(wcskey) != 1:
- print('Parameter wcskey must be a character - one of "A"-"Z" or " "')
- return False
-
- return True
-
-def closefobj(fname, f):
- """
- Functions in this module accept as input a file name or a file object.
- If the input was a file name (string) we close the object. If the user
- passed a file object we leave it to the user to close it.
- """
- if isinstance(fname, str):
- f.close()
-
-def mapFitsExt2HDUListInd(fname, extname):
- """
- Map FITS extensions with 'EXTNAME' to HDUList indexes.
- """
-
- if not isinstance(fname, fits.HDUList):
- f = fits.open(fname)
- close_file = True
- else:
- f = fname
- close_file = False
- d = {}
- for hdu in f:
- if 'EXTNAME' in hdu.header and hdu.header['EXTNAME'] == extname:
- extver = hdu.header['EXTVER']
- d[(extname, extver)] = f.index_of((extname, extver))
- if close_file:
- f.close()
- return d
diff --git a/lib/stwcs/wcsutil/convertwcs.py b/lib/stwcs/wcsutil/convertwcs.py
deleted file mode 100644
index a384eb1..0000000
--- a/lib/stwcs/wcsutil/convertwcs.py
+++ /dev/null
@@ -1,118 +0,0 @@
-from __future__ import print_function
-try:
- import stwcs
- from stwcs import wcsutil
-except:
- stwcs = None
-
-from stsci.tools import fileutil
-
-OPUS_WCSKEYS = ['OCRVAL1','OCRVAL2','OCRPIX1','OCRPIX2',
- 'OCD1_1','OCD1_2','OCD2_1','OCD2_2',
- 'OCTYPE1','OCTYPE2']
-
-
-def archive_prefix_OPUS_WCS(fobj,extname='SCI'):
- """ Identifies WCS keywords which were generated by OPUS and archived
- using a prefix of 'O' for all 'SCI' extensions in the file
-
- Parameters
- ----------
- fobj : str or `astropy.io.fits.HDUList`
- Filename or fits object of a file
-
- """
- if stwcs is None:
- print('=====================')
- print('The STWCS package is needed to convert an old-style OPUS WCS to an alternate WCS')
- print('=====================')
- raise ImportError
-
-
- closefits = False
- if isinstance(fobj,str):
- # A filename was provided as input
- fobj = fits.open(fobj,mode='update')
- closefits=True
-
- # Define the header
- ext = ('sci',1)
- hdr = fobj[ext].header
-
- numextn = fileutil.countExtn(fobj)
- extlist = []
- for e in range(1,numextn+1):
- extlist.append(('sci',e))
-
- # Insure that the 'O' alternate WCS is present
- if 'O' not in wcsutil.wcskeys(hdr):
- # if not, archive the Primary WCS as the default OPUS WCS
- wcsutil.archiveWCS(fobj,extlist, wcskey='O', wcsname='OPUS')
-
- # find out how many SCI extensions are in the image
- numextn = fileutil.countExtn(fobj,extname=extname)
- if numextn == 0:
- extname = 'PRIMARY'
-
- # create HSTWCS object from PRIMARY WCS
- wcsobj = wcsutil.HSTWCS(fobj,ext=ext,wcskey='O')
- # get list of WCS keywords
- wcskeys = list(wcsobj.wcs2header().keys())
-
- # For each SCI extension...
- for e in range(1,numextn+1):
- # Now, look for any WCS keywords with a prefix of 'O'
- for key in wcskeys:
- okey = 'O'+key[:7]
- hdr = fobj[(extname,e)].header
- if okey in hdr:
- # Update alternate WCS keyword with prefix-O OPUS keyword value
- hdr[key] = hdr[okey]
-
- if closefits:
- fobj.close()
-
-def create_prefix_OPUS_WCS(fobj,extname='SCI'):
- """ Creates alternate WCS with a prefix of 'O' for OPUS generated WCS values
- to work with old MultiDrizzle.
-
- Parameters
- ----------
- fobj : str or `astropy.io.fits.HDUList`
- Filename or fits object of a file
-
- Raises
- ------
- IOError:
- if input FITS object was not opened in 'update' mode
-
- """
- # List of O-prefix keywords to create
- owcskeys = OPUS_WCSKEYS
-
- closefits = False
- if isinstance(fobj,str):
- # A filename was provided as input
- fobj = fits.open(fobj, mode='update')
- closefits=True
- else:
- # check to make sure this FITS obj has been opened in update mode
- if fobj.fileinfo(0)['filemode'] != 'update':
- print('File not opened with "mode=update". Quitting...')
- raise IOError
-
- # check for existance of O-prefix WCS
- if owcskeys[0] not in fobj['sci',1].header:
-
- # find out how many SCI extensions are in the image
- numextn = fileutil.countExtn(fobj,extname=extname)
- if numextn == 0:
- extname = ''
- for extn in range(1,numextn+1):
- hdr = fobj[(extname,extn)].header
- for okey in owcskeys:
- hdr[okey] = hdr[okey[1:]+'O']
-
- # Close FITS image if we had to open it...
- if closefits:
- fobj.close()
diff --git a/lib/stwcs/wcsutil/getinput.py b/lib/stwcs/wcsutil/getinput.py
deleted file mode 100644
index 8ee1123..0000000
--- a/lib/stwcs/wcsutil/getinput.py
+++ /dev/null
@@ -1,62 +0,0 @@
-from astropy.io import fits
-from stsci.tools import irafglob, fileutil, parseinput
-
-def parseSingleInput(f=None, ext=None):
- if isinstance(f, str):
- # create an HSTWCS object from a filename
- if ext != None:
- filename = f
- if isinstance(ext,tuple):
- if ext[0] == '':
- extnum = ext[1] # handle ext=('',1)
- else:
- extnum = ext
- else:
- extnum = int(ext)
- elif ext == None:
- filename, ext = fileutil.parseFilename(f)
- ext = fileutil.parseExtn(ext)
- if ext[0] == '':
- extnum = int(ext[1]) #handle ext=('',extnum)
- else:
- extnum = ext
- phdu = fits.open(filename)
- hdr0 = phdu[0].header
- try:
- ehdr = phdu[extnum].header
- except (IndexError, KeyError) as e:
- raise e.__class__('Unable to get extension %s.' % extnum)
-
- elif isinstance(f, fits.HDUList):
- phdu = f
- if ext == None:
- extnum = 0
- else:
- extnum = ext
- ehdr = f[extnum].header
- hdr0 = f[0].header
- filename = hdr0.get('FILENAME', "")
-
- else:
- raise ValueError('Input must be a file name string or a'
- '`astropy.io.fits.HDUList` object')
-
- return filename, hdr0, ehdr, phdu
-
-
-def parseMultipleInput(input):
- if isinstance(input, str):
- if input[0] == '@':
- # input is an @ file
- filelist = irafglob.irafglob(input)
- else:
- try:
- filelist, output = parseinput.parseinput(input)
- except IOError: raise
- elif isinstance(input, list):
- if isinstance(input[0], wcsutil.HSTWCS):
- # a list of HSTWCS objects
- return input
- else:
- filelist = input[:]
- return filelist
diff --git a/lib/stwcs/wcsutil/headerlet.py b/lib/stwcs/wcsutil/headerlet.py
deleted file mode 100644
index c0dd9b0..0000000
--- a/lib/stwcs/wcsutil/headerlet.py
+++ /dev/null
@@ -1,2754 +0,0 @@
-"""
-This module implements headerlets.
-
-A headerlet serves as a mechanism for encapsulating WCS information
-which can be used to update the WCS solution of an image. The idea
-came up first from the desire for passing improved astrometric
-solutions for HST data and provide those solutions in a manner
-that would not require getting entirely new images from the archive
-when only the WCS information has been updated.
-
-"""
-
-from __future__ import absolute_import, division, print_function
-import os
-import sys
-import functools
-import logging
-import textwrap
-import copy
-import time
-
-import numpy as np
-from astropy.io import fits
-#import pywcs
-from astropy import wcs as pywcs
-from astropy.utils import lazyproperty
-
-from stsci.tools.fileutil import countExtn
-from stsci.tools import fileutil as fu
-from stsci.tools import parseinput
-
-from stwcs.updatewcs import utils
-from . import altwcs
-from . import wcscorr
-from .hstwcs import HSTWCS
-from .mappings import basic_wcs
-
-#### Logging support functions
-class FuncNameLoggingFormatter(logging.Formatter):
- def __init__(self, fmt=None, datefmt=None):
- if '%(funcName)s' not in fmt:
- fmt = '%(funcName)s' + fmt
- logging.Formatter.__init__(self, fmt=fmt, datefmt=datefmt)
-
- def format(self, record):
- record = copy.copy(record)
- if hasattr(record, 'funcName') and record.funcName == 'init_logging':
- record.funcName = ''
- else:
- record.funcName += ' '
- return logging.Formatter.format(self, record)
-
-
-logger = logging.getLogger(__name__)
-formatter = FuncNameLoggingFormatter("%(levelname)s: %(message)s")
-ch = logging.StreamHandler()
-ch.setFormatter(formatter)
-ch.setLevel(logging.CRITICAL)
-logger.addHandler(ch)
-logger.setLevel(logging.DEBUG)
-
-FITS_STD_KW = ['XTENSION', 'BITPIX', 'NAXIS', 'PCOUNT',
- 'GCOUNT', 'EXTNAME', 'EXTVER', 'ORIGIN',
- 'INHERIT', 'DATE', 'IRAF-TLM']
-
-DEFAULT_SUMMARY_COLS = ['HDRNAME', 'WCSNAME', 'DISTNAME', 'AUTHOR', 'DATE',
- 'SIPNAME', 'NPOLFILE', 'D2IMFILE', 'DESCRIP']
-COLUMN_DICT = {'vals': [], 'width': []}
-COLUMN_FMT = '{:<{width}}'
-
-
-def init_logging(funcname=None, level=100, mode='w', **kwargs):
- """
-
- Initialize logging for a function
-
- Parameters
- ----------
- funcname: string
- Name of function which will be recorded in log
- level: int, or bool, or string
- int or string : Logging level
- bool: False - switch off logging
- Text logging level for the message ("DEBUG", "INFO",
- "WARNING", "ERROR", "CRITICAL")
- mode: 'w' or 'a'
- attach to logfile ('a' or start a new logfile ('w')
-
- """
- for hndl in logger.handlers:
- if isinstance(hndl, logging.FileHandler):
- has_file_handler = True
- else:
- has_file_handler = False
- if level:
- if not has_file_handler:
- logname = 'headerlet.log'
- fh = logging.FileHandler(logname, mode=mode)
- fh.setFormatter(formatter)
- fh.setLevel(logging.DEBUG)
- logger.addHandler(fh)
- logger.info("%s: Starting %s with arguments:\n\t %s" %
- (time.asctime(), funcname, kwargs))
-
-
-def with_logging(func):
- @functools.wraps(func)
- def wrapped(*args, **kw):
- level = kw.get('logging', 100)
- mode = kw.get('logmode', 'w')
- func_args = kw.copy()
- if sys.version_info[0] >= 3:
- argnames = func.__code__.co_varnames
- else:
- argnames = func.func_code.co_varnames
-
- for argname, arg in zip(argnames, args):
- func_args[argname] = arg
-
- init_logging(func.__name__, level, mode, **func_args)
- return func(*args, **kw)
- return wrapped
-
-#### Utility functions
-def is_par_blank(par):
- return par in ['', ' ', 'INDEF', "None", None]
-
-def parse_filename(fname, mode='readonly'):
- """
- Interprets the input as either a filename of a file that needs to be opened
- or a PyFITS object.
-
- Parameters
- ----------
- fname : str, `astropy.io.fits.HDUList`
- Input pointing to a file or `astropy.io.fits.HDUList` object.
- An input filename (str) will be expanded as necessary to
- interpret any environmental variables
- included in the filename.
-
- mode : string
- Specifies what mode to use when opening the file, if it needs
- to open the file at all [Default: 'readonly']
-
- Returns
- -------
- fobj : `astropy.io.fits.HDUList`
- FITS file handle for input
-
- fname : str
- Name of input file
-
- close_fobj : bool
- Flag specifying whether or not fobj needs to be closed since it was
- opened by this function. This allows a program to know whether they
- need to worry about closing the FITS object as opposed to letting
- the higher level interface close the object.
-
- """
- close_fobj = False
- if not isinstance(fname, list):
- if sys.version_info[0] >= 3:
- is_string = isinstance(fname, str)
- else:
- is_string = isinstance(fname, basestring)
- if is_string:
- fname = fu.osfn(fname)
- fobj = fits.open(fname, mode=mode)
- close_fobj = True
- else:
- fobj = fname
- if hasattr(fobj, 'filename'):
- fname = fobj.filename()
- else:
- fname = ''
- return fobj, fname, close_fobj
-
-def get_headerlet_kw_names(fobj, kw='HDRNAME'):
- """
- Returns a list of specified keywords from all HeaderletHDU
- extensions in a science file.
-
- Parameters
- ----------
- fobj : str, `astropy.io.fits.HDUList`
- kw : str
- Name of keyword to be read and reported
- """
-
- fobj, fname, open_fobj = parse_filename(fobj)
-
- hdrnames = []
- for ext in fobj:
- if isinstance(ext, fits.hdu.base.NonstandardExtHDU):
- hdrnames.append(ext.header[kw])
-
- if open_fobj:
- fobj.close()
-
- return hdrnames
-
-def get_header_kw_vals(hdr, kwname, kwval, default=0):
- if kwval is None:
- if kwname in hdr:
- kwval = hdr[kwname]
- else:
- kwval = default
- return kwval
-
-@with_logging
-def find_headerlet_HDUs(fobj, hdrext=None, hdrname=None, distname=None,
- strict=True, logging=False, logmode='w'):
- """
- Returns all HeaderletHDU extensions in a science file that matches
- the inputs specified by the user. If no hdrext, hdrname or distname are
- specified, this function will return a list of all HeaderletHDU objects.
-
- Parameters
- ----------
- fobj : str, `astropy.io.fits.HDUList`
- Name of FITS file or open fits object (`astropy.io.fits.HDUList` instance)
- hdrext : int, tuple or None
- index number(EXTVER) or extension tuple of HeaderletHDU to be returned
- hdrname : string
- value of HDRNAME for HeaderletHDU to be returned
- distname : string
- value of DISTNAME for HeaderletHDUs to be returned
- strict : bool [Default: True]
- Specifies whether or not at least one parameter needs to be provided
- If False, all extension indices returned if hdrext, hdrname and distname
- are all None. If True and hdrext, hdrname, and distname are all None,
- raise an Exception requiring one to be specified.
- logging : boolean
- enable logging to a file called headerlet.log
- logmode : 'w' or 'a'
- log file open mode
-
- Returns
- -------
- hdrlets : list
- A list of all matching HeaderletHDU extension indices (could be just one)
-
- """
-
- get_all = False
- if hdrext is None and hdrname is None and distname is None:
- if not strict:
- get_all = True
- else:
- mess = """\n
- =====================================================
- No valid Headerlet extension specified.
- Either "hdrname", "hdrext", or "distname" needs to be specified.
- =====================================================
- """
- logger.critical(mess)
- raise ValueError
-
- fobj, fname, open_fobj = parse_filename(fobj)
-
- hdrlets = []
- if hdrext is not None and isinstance(hdrext, int):
- if hdrext in range(len(fobj)): # insure specified hdrext is in fobj
- if isinstance(fobj[hdrext], fits.hdu.base.NonstandardExtHDU) and \
- fobj[hdrext].header['EXTNAME'] == 'HDRLET':
- hdrlets.append(hdrext)
- else:
- for ext in fobj:
- if isinstance(ext, fits.hdu.base.NonstandardExtHDU):
- if get_all:
- hdrlets.append(fobj.index(ext))
- else:
- if hdrext is not None:
- if isinstance(hdrext, tuple):
- hdrextname = hdrext[0]
- hdrextnum = hdrext[1]
- else:
- hdrextname = 'HDRLET'
- hdrextnum = hdrext
- hdrext_match = ((hdrext is not None) and
- (hdrextnum == ext.header['EXTVER']) and
- (hdrextname == ext.header['EXTNAME']))
- hdrname_match = ((hdrname is not None) and
- (hdrname == ext.header['HDRNAME']))
- distname_match = ((distname is not None) and
- (distname == ext.header['DISTNAME']))
- if hdrext_match or hdrname_match or distname_match:
- hdrlets.append(fobj.index(ext))
-
- if open_fobj:
- fobj.close()
-
- if len(hdrlets) == 0:
- if hdrname:
- kwerr = 'hdrname'
- kwval = hdrname
- elif hdrext:
- kwerr = 'hdrext'
- kwval = hdrext
- else:
- kwerr = 'distname'
- kwval = distname
- message = """\n
- =====================================================
- No valid Headerlet extension found!'
- "%s" = %s not found in %s.' % (kwerr, kwval, fname)
- =====================================================
- """
- logger.critical(message)
- raise ValueError
-
- return hdrlets
-
-def verify_hdrname_is_unique(fobj, hdrname):
- """
- Verifies that no other HeaderletHDU extension has the specified hdrname.
-
- Parameters
- ----------
- fobj : str, `astropy.io.fits.HDUList`
- Name of FITS file or open fits file object
- hdrname : str
- value of HDRNAME for HeaderletHDU to be compared as unique
-
- Returns
- -------
- unique: bool
- If True, no other HeaderletHDU has the specified HDRNAME value
- """
- hdrnames_list = get_headerlet_kw_names(fobj)
- unique = not(hdrname in hdrnames_list)
-
- return unique
-
-def update_versions(sourcehdr, desthdr):
- """
- Update keywords which store version numbers
- """
- phdukw = {'PYWCSVER': 'Version of PYWCS used to updated the WCS',
- 'UPWCSVER': 'Version of STWCS used to updated the WCS'
- }
- for key in phdukw:
- try:
- desthdr[key] = (sourcehdr[key], sourcehdr.comments[key])
- except KeyError:
- desthdr[key] = (" ", phdukw[key])
-
-def update_ref_files(source, dest):
- """
- Update the reference files name in the primary header of 'dest'
- using values from 'source'
-
- Parameters
- ----------
- source : `astropy.io.fits.Header`
- dest : `astropy.io.fits.Header`
- """
- logger.info("Updating reference files")
- phdukw = {'NPOLFILE': True,
- 'IDCTAB': True,
- 'D2IMFILE': True,
- 'SIPNAME': True,
- 'DISTNAME': True}
-
- for key in phdukw:
- try:
- try:
- del dest[key]
- except:
- pass
- dest.set(key, source[key], source.comments[key])
- except KeyError:
- phdukw[key] = False
- return phdukw
-
-def print_summary(summary_cols, summary_dict, pad=2, maxwidth=None, idcol=None,
- output=None, clobber=True, quiet=False ):
- """
- Print out summary dictionary to STDOUT, and possibly an output file
-
- """
- nrows = None
- if idcol:
- nrows = len(idcol['vals'])
-
- # Find max width of each column
- column_widths = {}
- for kw in summary_dict:
- colwidth = np.array(summary_dict[kw]['width']).max()
- if maxwidth:
- colwidth = min(colwidth, maxwidth)
- column_widths[kw] = colwidth + pad
- if nrows is None:
- nrows = len(summary_dict[kw]['vals'])
-
- # print rows now
- outstr = ''
- # Start with column names
- if idcol:
- outstr += COLUMN_FMT.format(idcol['name'], width=idcol['width'] + pad)
- for kw in summary_cols:
- outstr += COLUMN_FMT.format(kw, width=column_widths[kw])
- outstr += '\n'
- # Now, add a row for each headerlet
- for row in range(nrows):
- if idcol:
- outstr += COLUMN_FMT.format(idcol['vals'][row],
- width=idcol['width']+pad)
- for kw in summary_cols:
- val = summary_dict[kw]['vals'][row][:(column_widths[kw]-pad)]
- outstr += COLUMN_FMT.format(val, width=column_widths[kw])
- outstr += '\n'
- if not quiet:
- print(outstr)
-
- # If specified, write info to separate text file
- write_file = False
- if output:
- output = fu.osfn(output) # Expand any environment variables in filename
- write_file = True
- if os.path.exists(output):
- if clobber:
- os.remove(output)
- else:
- print('WARNING: Not writing results to file!')
- print(' Output text file ', output, ' already exists.')
- print(' Set "clobber" to True or move file before trying again.')
- write_file = False
- if write_file:
- fout = open(output, mode='w')
- fout.write(outstr)
- fout.close()
-
-#### Private utility functions
-def _create_primary_HDU(fobj, fname, wcsext, destim, hdrname, wcsname,
- sipname, npolfile, d2imfile,
- nmatch,catalog, wcskey,
- author, descrip, history):
- # convert input values into valid FITS kw values
- if author is None:
- author = ''
- if descrip is None:
- descrip = ''
-
- sipname, idctab = utils.build_sipname(fobj, fname, sipname)
- logger.info("Setting sipname value to %s" % sipname)
-
- npolname, npolfile = utils.build_npolname(fobj, npolfile)
- logger.info("Setting npolfile value to %s" % npolname)
-
- d2imname, d2imfile = utils.build_d2imname(fobj,d2imfile)
- logger.info("Setting d2imfile value to %s" % d2imname)
-
- distname = utils.build_distname(sipname, npolname, d2imname)
- logger.info("Setting distname to %s" % distname)
-
- # open file and parse comments
- if history not in ['', ' ', None, 'INDEF'] and os.path.isfile(history):
- f = open(fu.osfn(history))
- history = f.readlines()
- f.close()
- else:
- history = ''
-
- rms_ra = fobj[wcsext].header.get("CRDER1"+wcskey, 0)
- rms_dec = fobj[wcsext].header.get("CRDER2"+wcskey, 0)
- if not nmatch:
- nmatch = fobj[wcsext].header.get("NMATCH"+wcskey, 0)
- if not catalog:
- catalog = fobj[wcsext].header.get('CATALOG'+wcskey, "")
- # get the version of STWCS used to create the WCS of the science file.
- #try:
- #upwcsver = fobj[0].header.cards[fobj[0].header.index('UPWCSVER')]
- #except KeyError:
- #upwcsver = pyfits.Card("UPWCSVER", " ",
- #"Version of STWCS used to update the WCS")
- #try:
- #pywcsver = fobj[0].header.cards[fobj[0].header.index('PYWCSVER')]
- #except KeyError:
- #pywcsver = pyfits.Card("PYWCSVER", " ",
- #"Version of PYWCS used to update the WCS")
- upwcsver = fobj[0].header.get('UPWCSVER', "")
- pywcsver = fobj[0].header.get('PYWCSVER', "")
- # build Primary HDU
- phdu = fits.PrimaryHDU()
- phdu.header['DESTIM'] = (destim, 'Destination observation root name')
- phdu.header['HDRNAME'] = (hdrname, 'Headerlet name')
- fmt = "%Y-%m-%dT%H:%M:%S"
- phdu.header['DATE'] = (time.strftime(fmt), 'Date FITS file was generated')
- phdu.header['WCSNAME'] = (wcsname, 'WCS name')
- phdu.header['DISTNAME'] = (distname, 'Distortion model name')
- phdu.header['SIPNAME'] = (sipname,
- 'origin of SIP polynomial distortion model')
- phdu.header['NPOLFILE'] = (npolfile,
- 'origin of non-polynmial distortion model')
- phdu.header['D2IMFILE'] = (d2imfile,
- 'origin of detector to image correction')
- phdu.header['IDCTAB'] = (idctab,
- 'origin of Polynomial Distortion')
- phdu.header['AUTHOR'] = (author, 'headerlet created by this user')
- phdu.header['DESCRIP'] = (descrip,
- 'Short description of headerlet solution')
- phdu.header['RMS_RA'] = (rms_ra,
- 'RMS in RA at ref pix of headerlet solution')
- phdu.header['RMS_DEC'] = (rms_dec,
- 'RMS in Dec at ref pix of headerlet solution')
- phdu.header['NMATCH'] = (nmatch,
- 'Number of sources used for headerlet solution')
- phdu.header['CATALOG'] = (catalog,
- 'Astrometric catalog used for headerlet '
- 'solution')
- phdu.header['UPWCSVER'] = (upwcsver, "Version of STWCS used to update the WCS")
- phdu.header['PYWCSVER'] = (pywcsver, "Version of PYWCS used to update the WCS")
-
- # clean up history string in order to remove whitespace characters that
- # would cause problems with FITS
- if isinstance(history, list):
- history_str = ''
- for line in history:
- history_str += line
- else:
- history_str = history
- history_lines = textwrap.wrap(history_str, width=70)
- for hline in history_lines:
- phdu.header.add_history(hline)
-
- return phdu
-
-
-#### Public Interface functions
-@with_logging
-def extract_headerlet(filename, output, extnum=None, hdrname=None,
- clobber=False, logging=True):
- """
- Finds a headerlet extension in a science file and writes it out as
- a headerlet FITS file.
-
- If both hdrname and extnum are given they should match, if not
- raise an Exception
-
- Parameters
- ----------
- filename: string or HDUList or Python list
- This specifies the name(s) of science file(s) from which headerlets
- will be extracted.
-
- String input formats supported include use of wild-cards, IRAF-style
- '@'-files (given as '@<filename>') and comma-separated list of names.
- An input filename (str) will be expanded as necessary to interpret
- any environmental variables included in the filename.
- If a list of filenames has been specified, it will extract a
- headerlet from the same extnum from all filenames.
- output: string
- Filename or just rootname of output headerlet FITS file
- If string does not contain '.fits', it will create a filename with
- '_hlet.fits' suffix
- extnum: int
- Extension number which contains the headerlet to be written out
- hdrname: string
- Unique name for headerlet, stored as the HDRNAME keyword
- It stops if a value is not provided and no extnum has been specified
- clobber: bool
- If output file already exists, this parameter specifies whether or not
- to overwrite that file [Default: False]
- logging: boolean
- enable logging to a file
-
- """
-
- if isinstance(filename, fits.HDUList):
- filename = [filename]
- else:
- filename, oname = parseinput.parseinput(filename)
-
- for f in filename:
- fobj, fname, close_fobj = parse_filename(f)
- frootname = fu.buildNewRootname(fname)
- if hdrname in ['', ' ', None, 'INDEF'] and extnum is None:
- if close_fobj:
- fobj.close()
- logger.critical("Expected a valid extnum or hdrname parameter")
- raise ValueError
- if hdrname is not None:
- extn_from_hdrname = find_headerlet_HDUs(fobj, hdrname=hdrname)[0]
- if extn_from_hdrname != extnum:
- logger.critical("hdrname and extnmu should refer to the same FITS extension")
- raise ValueError
- else:
- hdrhdu = fobj[extn_from_hdrname]
- else:
- hdrhdu = fobj[extnum]
-
- if not isinstance(hdrhdu, HeaderletHDU):
- logger.critical("Specified extension is not a headerlet")
- raise ValueError
-
- hdrlet = hdrhdu.headerlet
-
- if output is None:
- output = frootname
-
- if '.fits' in output:
- outname = output
- else:
- outname = '%s_hlet.fits' % output
-
- hdrlet.tofile(outname, clobber=clobber)
-
- if close_fobj:
- fobj.close()
-
-
-@with_logging
-def write_headerlet(filename, hdrname, output=None, sciext='SCI',
- wcsname=None, wcskey=None, destim=None,
- sipname=None, npolfile=None, d2imfile=None,
- author=None, descrip=None, history=None,
- nmatch=None, catalog=None,
- attach=True, clobber=False, logging=False):
-
- """
- Save a WCS as a headerlet FITS file.
-
- This function will create a headerlet, write out the headerlet to a
- separate headerlet file, then, optionally, attach it as an extension
- to the science image (if it has not already been archived)
-
- Either wcsname or wcskey must be provided; if both are given, they must
- match a valid WCS.
-
- Updates wcscorr if necessary.
-
- Parameters
- ----------
- filename: string or HDUList or Python list
- This specifies the name(s) of science file(s) from which headerlets
- will be created and written out.
- String input formats supported include use of wild-cards, IRAF-style
- '@'-files (given as '@<filename>') and comma-separated list of names.
- An input filename (str) will be expanded as necessary to interpret
- any environmental variables included in the filename.
- hdrname: string
- Unique name for this headerlet, stored as HDRNAME keyword
- output: string or None
- Filename or just rootname of output headerlet FITS file
- If string does not contain '.fits', it will create a filename
- starting with the science filename and ending with '_hlet.fits'.
- If None, a default filename based on the input filename will be
- generated for the headerlet FITS filename
- sciext: string
- name (EXTNAME) of extension that contains WCS to be saved
- wcsname: string
- name of WCS to be archived, if " ": stop
- wcskey: one of A...Z or " " or "PRIMARY"
- if " " or "PRIMARY" - archive the primary WCS
- destim: string
- DESTIM keyword
- if NOne, use ROOTNAME or science file name
- sipname: string or None (default)
- Name of unique file where the polynomial distortion coefficients were
- read from. If None, the behavior is:
- The code looks for a keyword 'SIPNAME' in the science header
- If not found, for HST it defaults to 'IDCTAB'
- If there is no SIP model the value is 'NOMODEL'
- If there is a SIP model but no SIPNAME, it is set to 'UNKNOWN'
- npolfile: string or None (default)
- Name of a unique file where the non-polynomial distortion was stored.
- If None:
- The code looks for 'NPOLFILE' in science header.
- If 'NPOLFILE' was not found and there is no npol model, it is set to 'NOMODEL'
- If npol model exists, it is set to 'UNKNOWN'
- d2imfile: string
- Name of a unique file where the detector to image correction was
- stored. If None:
- The code looks for 'D2IMFILE' in the science header.
- If 'D2IMFILE' is not found and there is no d2im correction,
- it is set to 'NOMODEL'
- If d2im correction exists, but 'D2IMFILE' is missing from science
- header, it is set to 'UNKNOWN'
- author: string
- Name of user who created the headerlet, added as 'AUTHOR' keyword
- to headerlet PRIMARY header
- descrip: string
- Short description of the solution provided by the headerlet
- This description will be added as the single 'DESCRIP' keyword
- to the headerlet PRIMARY header
- history: filename, string or list of strings
- Long (possibly multi-line) description of the solution provided
- by the headerlet. These comments will be added as 'HISTORY' cards
- to the headerlet PRIMARY header
- If filename is specified, it will format and attach all text from
- that file as the history.
- attach: bool
- Specify whether or not to attach this headerlet as a new extension
- It will verify that no other headerlet extension has been created with
- the same 'hdrname' value.
- clobber: bool
- If output file already exists, this parameter specifies whether or not
- to overwrite that file [Default: False]
- logging: boolean
- enable file logging
- """
-
- if isinstance(filename, fits.HDUList):
- filename = [filename]
- else:
- filename, oname = parseinput.parseinput(filename)
-
- for f in filename:
- if isinstance(f, str):
- fname = f
- else:
- fname = f.filename()
-
- if wcsname in [None, ' ', '', 'INDEF'] and wcskey is None:
- message = """\n
- No valid WCS found found in %s.
- A valid value for either "wcsname" or "wcskey"
- needs to be specified.
- """ % fname
- logger.critical(message)
- raise ValueError
-
- # Translate 'wcskey' value for PRIMARY WCS to valid altwcs value of ' '
- if wcskey == 'PRIMARY':
- wcskey = ' '
-
- if attach:
- umode = 'update'
- else:
- umode = 'readonly'
-
- fobj, fname, close_fobj = parse_filename(f, mode=umode)
-
- # Interpret sciext input for this file
- if isinstance(sciext, int):
- sciextlist = [sciext] # allow for specification of simple FITS header
- elif isinstance(sciext, str):
- numsciext = countExtn(fobj, sciext)
- if numsciext > 0:
- sciextlist = [tuple((sciext,i)) for i in range(1, numsciext+1)]
- else:
- sciextlist = [0]
- elif isinstance(sciext, list):
- sciextlist = sciext
- else:
- errstr = "Expected sciext to be a list of FITS extensions with science data\n"+\
- " a valid EXTNAME string, or an integer."
- logger.critical(errstr)
- raise ValueError
-
- wnames = altwcs.wcsnames(fobj,ext=sciextlist[0])
-
- # Insure that WCSCORR table has been created with all original
- # WCS's recorded prior to adding the headerlet WCS
- wcscorr.init_wcscorr(fobj)
-
- if wcsname is None:
- scihdr = fobj[sciextlist[0]].header
- wname = scihdr['wcsname'+wcskey]
- else:
- wname = wcsname
- if hdrname in [None, ' ', '']:
- hdrname = wcsname
-
- logger.critical('Creating the headerlet from image %s' % fname)
- hdrletobj = create_headerlet(fobj, sciext=sciextlist,
- wcsname=wname, wcskey=wcskey,
- hdrname=hdrname,
- sipname=sipname, npolfile=npolfile,
- d2imfile=d2imfile, author=author,
- descrip=descrip, history=history,
- nmatch=nmatch, catalog=catalog,
- logging=False)
-
- if attach:
- # Check to see whether or not a HeaderletHDU with
- #this hdrname already exists
- hdrnames = get_headerlet_kw_names(fobj)
- if hdrname not in hdrnames:
- hdrlet_hdu = HeaderletHDU.fromheaderlet(hdrletobj)
-
- if destim is not None:
- hdrlet_hdu.header['destim'] = destim
-
- fobj.append(hdrlet_hdu)
-
- # Update the WCSCORR table with new rows from the headerlet's WCSs
- wcscorr.update_wcscorr(fobj, source=hdrletobj,
- extname='SIPWCS', wcs_id=wname)
-
- utils.updateNEXTENDKw(fobj)
- fobj.flush()
- else:
- message = """
- Headerlet with hdrname %s already archived for WCS %s.
- No new headerlet appended to %s.
- """ % (hdrname, wname, fname)
- logger.critical(message)
-
- if close_fobj:
- logger.info('Closing image in write_headerlet()...')
- fobj.close()
-
- frootname = fu.buildNewRootname(fname)
-
- if output is None:
- # Generate default filename for headerlet FITS file
- outname = '{0}_hlet.fits'.format(frootname)
- else:
- outname = output
-
- if not outname.endswith('.fits'):
- outname = '{0}_{1}_hlet.fits'.format(frootname,outname)
-
- # If user specifies an output filename for headerlet, write it out
- hdrletobj.tofile(outname, clobber=clobber)
- logger.critical( 'Created Headerlet file %s ' % outname)
-
- del hdrletobj
-
-@with_logging
-def create_headerlet(filename, sciext='SCI', hdrname=None, destim=None,
- wcskey=" ", wcsname=None,
- sipname=None, npolfile=None, d2imfile=None,
- author=None, descrip=None, history=None,
- nmatch=None, catalog=None,
- logging=False, logmode='w'):
- """
- Create a headerlet from a WCS in a science file
- If both wcskey and wcsname are given they should match, if not
- raise an Exception
-
- Parameters
- ----------
- filename: string or HDUList
- Either a filename or PyFITS HDUList object for the input science file
- An input filename (str) will be expanded as necessary to interpret
- any environmental variables included in the filename.
- sciext: string or python list (default: 'SCI')
- Extension in which the science data with the linear WCS is.
- The headerlet will be created from these extensions.
- If string - a valid EXTNAME is expected
- If int - specifies an extension with a valid WCS, such as 0 for a
- simple FITS file
- If list - a list of FITS extension numbers or strings representing
- extension tuples, e.g. ('SCI, 1') is expected.
- hdrname: string
- value of HDRNAME keyword
- Takes the value from the HDRNAME<wcskey> keyword, if not available from WCSNAME<wcskey>
- It stops if neither is found in the science file and a value is not provided
- destim: string or None
- name of file this headerlet can be applied to
- if None, use ROOTNAME keyword
- wcskey: char (A...Z) or " " or "PRIMARY" or None
- a char representing an alternate WCS to be used for the headerlet
- if " ", use the primary (default)
- if None use wcsname
- wcsname: string or None
- if wcskey is None use wcsname specified here to choose an alternate WCS for the headerlet
- sipname: string or None (default)
- Name of unique file where the polynomial distortion coefficients were
- read from. If None, the behavior is:
- The code looks for a keyword 'SIPNAME' in the science header
- If not found, for HST it defaults to 'IDCTAB'
- If there is no SIP model the value is 'NOMODEL'
- If there is a SIP model but no SIPNAME, it is set to 'UNKNOWN'
- npolfile: string or None (default)
- Name of a unique file where the non-polynomial distortion was stored.
- If None:
- The code looks for 'NPOLFILE' in science header.
- If 'NPOLFILE' was not found and there is no npol model, it is set to 'NOMODEL'
- If npol model exists, it is set to 'UNKNOWN'
- d2imfile: string
- Name of a unique file where the detector to image correction was
- If None:
- The code looks for 'D2IMFILE' in the science header.
- If 'D2IMFILE' is not found and there is no d2im correction,
- it is set to 'NOMODEL'
- If d2im correction exists, but 'D2IMFILE' is missing from science
- header, it is set to 'UNKNOWN'
- author: string
- Name of user who created the headerlet, added as 'AUTHOR' keyword
- to headerlet PRIMARY header
- descrip: string
- Short description of the solution provided by the headerlet
- This description will be added as the single 'DESCRIP' keyword
- to the headerlet PRIMARY header
- history: filename, string or list of strings
- Long (possibly multi-line) description of the solution provided
- by the headerlet. These comments will be added as 'HISTORY' cards
- to the headerlet PRIMARY header
- If filename is specified, it will format and attach all text from
- that file as the history.
- nmatch: int (optional)
- Number of sources used in the new solution fit
- catalog: string (optional)
- Astrometric catalog used for headerlet solution
- logging: boolean
- enable file logging
- logmode: 'w' or 'a'
- log file open mode
-
- Returns
- -------
- Headerlet object
-
- """
- if wcskey == 'O':
- message = "Warning: 'O' is a reserved key for the original WCS. Quitting..."
- logger.info(message)
- return None
-
- fobj, fname, close_file = parse_filename(filename)
- # based on `sciext` create a list of (EXTNAME, EXTVER) tuples
- # of extensions with WCS to be saved in a headerlet
- sciext = get_extname_extver_list(fobj, sciext)
- logger.debug("Data extensions from which to create headerlet:\n\t %s"
- % (str(sciext)))
- if not sciext:
- logger.critical("No valid target extensions found in file %s" % fname)
- raise ValueError
-
- # Define extension to evaluate for verification of input parameters
- wcsext = sciext[0]
- logger.debug("sciext in create_headerlet is %s" % str(sciext))
- # Translate 'wcskey' value for PRIMARY WCS to valid altwcs value of ' '
- if wcskey == 'PRIMARY':
- wcskey = ' '
- logger.info("wcskey reset from 'PRIMARY' to ' '")
- wcskey = wcskey.upper()
- wcsnamekw = "".join(["WCSNAME", wcskey.upper()]).rstrip()
- hdrnamekw = "".join(["HDRNAME", wcskey.upper()]).rstrip()
-
- wnames = altwcs.wcsnames(fobj, ext=wcsext)
- if not wcsname:
- # User did not specify a value for 'wcsname'
- if wcsnamekw in fobj[wcsext].header:
- #check if there's a WCSNAME for this wcskey in the header
- wcsname = fobj[wcsext].header[wcsnamekw]
- logger.info("Setting wcsname from header[%s] to %s" % (wcsnamekw, wcsname))
- else:
- if hdrname not in ['', ' ', None, "INDEF"]:
- """
- If wcsname for this wcskey was not provided
- and WCSNAME<wcskey> does not exist in the header
- and hdrname is provided, then
- use hdrname as WCSNAME for the headerlet.
- """
- wcsname = hdrname
- logger.debug("Setting wcsname from hdrname to %s" % hdrname)
- else:
- if hdrnamekw in fobj[wcsext].header:
- wcsname = fobj[wcsext].header[hdrnamekw]
- logger.debug("Setting wcsname from header[%s] to %s" % (hdrnamekw, wcsname))
- else:
- message = """
- Required keywords 'HDRNAME' or 'WCSNAME' not found!
- Please specify a value for parameter 'hdrname'
- or update header with 'WCSNAME' keyword.
- """
- logger.critical(message)
- raise KeyError
- else:
- # Verify that 'wcsname' and 'wcskey' values specified by user reference
- # the same WCS
- wname = fobj[wcsext].header[wcsnamekw]
- if wcsname != wname:
- message = "\tInconsistent values for 'wcskey' and 'wcsname' specified!\n"
- message += " 'wcskey' = %s and 'wcsname' = %s. \n" % (wcskey, wcsname)
- message += "Actual value of %s found to be %s. \n" % (wcsnamekw, wname)
- logger.critical(message)
- raise KeyError
- wkeys = altwcs.wcskeys(fobj, ext=wcsext)
- if wcskey != ' ':
- if wcskey not in wkeys:
- logger.critical('No WCS with wcskey=%s found in extension %s. Skipping...' % (wcskey, str(wcsext)))
- raise ValueError("No WCS with wcskey=%s found in extension %s. Skipping...' % (wcskey, str(wcsext))")
-
- # get remaining required keywords
- if destim is None:
- if 'ROOTNAME' in fobj[0].header:
- destim = fobj[0].header['ROOTNAME']
- logger.info("Setting destim to rootname of the science file")
- else:
- destim = fname
- logger.info('DESTIM not provided')
- logger.info('Keyword "ROOTNAME" not found')
- logger.info('Using file name as DESTIM')
-
- if not hdrname:
- # check if HDRNAME<wcskey> is in header
- if hdrnamekw in fobj[wcsext].header:
- hdrname = fobj[wcsext].header[hdrnamekw]
- else:
- if wcsnamekw in fobj[wcsext].header:
- hdrname = fobj[wcsext].header[wcsnamekw]
- message = """
- Using default value for HDRNAME of "%s" derived from %s.
- """ % (hdrname, wcsnamekw)
- logger.info(message)
- logger.info("Setting hdrname to %s from header[%s]"
- % (hdrname, wcsnamekw))
- else:
- message = "Required keywords 'HDRNAME' or 'WCSNAME' not found"
- logger.critical(message)
- raise KeyError
-
-
-
- hdul = []
- phdu = _create_primary_HDU(fobj, fname, wcsext, destim, hdrname, wcsname,
- sipname, npolfile, d2imfile,
- nmatch, catalog, wcskey,
- author, descrip, history)
- hdul.append(phdu)
- wcsdvarr_extns = []
- """
- nd2i is a counter for d2i extensions to be used when the science file
- has an old d2i correction format. The old format did not write EXTVER
- kw for the d2i correction in the science header bu tthe new format expects
- them.
- """
- nd2i_extver = 1
- for ext in sciext:
- wkeys = altwcs.wcskeys(fobj, ext=ext)
- if wcskey != ' ':
- if wcskey not in wkeys:
- logger.debug(
- 'No WCS with wcskey=%s found in extension %s. '
- 'Skipping...' % (wcskey, str(ext)))
- raise ValueError("")
-
- hwcs = HSTWCS(fobj, ext=ext, wcskey=wcskey)
-
- whdul = hwcs.to_fits(relax=True, key=" ")
- if hasattr(hwcs, 'orientat'):
- orient_comment = "positions angle of image y axis (deg. e of n)"
- whdul[0].header['ORIENTAT'] = (hwcs.orientat, orient_comment)
-
- whdul[0].header.append(('TG_ENAME', ext[0], 'Target science data extname'))
- whdul[0].header.append(('TG_EVER', ext[1], 'Target science data extver'))
-
- if hwcs.wcs.has_cd():
- whdul[0].header = altwcs.pc2cd(whdul[0].header)
-
- idckw = hwcs._idc2hdr()
- whdul[0].header.extend(idckw)
-
- if hwcs.det2im1 or hwcs.det2im2:
- try:
- whdul[0].header.append(fobj[ext].header.cards['D2IMEXT'])
- except KeyError:
- pass
- whdul[0].header.extend(fobj[ext].header.cards['D2IMERR*'])
- if 'D2IM1.EXTVER' in whdul[0].header:
- try:
- whdul[0].header['D2IM1.EXTVER'] = fobj[ext].header['D2IM1.EXTVER']
- except KeyError:
- whdul[0].header['D2IM1.EXTVER'] = nd2i_extver
- nd2i_extver += 1
- if 'D2IM2.EXTVER' in whdul[0].header:
- try:
- whdul[0].header['D2IM2.EXTVER'] = fobj[ext].header['D2IM2.EXTVER']
- except KeyError:
- whdul[0].header['D2IM2.EXTVER'] = nd2i_extver
- nd2i_extver += 1
-
- if hwcs.cpdis1 or hwcs.cpdis2:
- whdul[0].header.extend(fobj[ext].header.cards['CPERR*'])
- try:
- whdul[0].header.append(fobj[ext].header.cards['NPOLEXT'])
- except KeyError:
- pass
- if 'DP1.EXTVER' in whdul[0].header:
- whdul[0].header['DP1.EXTVER'] = fobj[ext].header['DP1.EXTVER']
- if 'DP2.EXTVER' in whdul[0].header:
- whdul[0].header['DP2.EXTVER'] = fobj[ext].header['DP2.EXTVER']
- ihdu = fits.ImageHDU(header=whdul[0].header, name='SIPWCS')
-
- if ext[0] != "PRIMARY":
- ihdu.update_ext_version(fobj[ext].header['EXTVER'], comment='Extension version')
-
- hdul.append(ihdu)
-
- if hwcs.cpdis1:
- whdu = whdul[('WCSDVARR', 1)].copy()
- whdu.update_ext_version(fobj[ext].header['DP1.EXTVER'])
- hdul.append(whdu)
- if hwcs.cpdis2:
- whdu = whdul[('WCSDVARR', 2)].copy()
- whdu.update_ext_version(fobj[ext].header['DP2.EXTVER'])
- hdul.append(whdu)
-
- if hwcs.det2im1:
- whdu = whdul[('D2IMARR', 1)].copy()
- whdu.update_ext_version(ihdu.header['D2IM1.EXTVER'])
- hdul.append(whdu)
- if hwcs.det2im2:
- whdu = whdul[('D2IMARR', 2)].copy()
- whdu.update_ext_version(ihdu.header['D2IM2.EXTVER'])
- hdul.append(whdu)
-
-
- #if hwcs.det2im1 or hwcs.det2im2:
- #try:
- #darr = hdul[('D2IMARR', 1)]
- #except KeyError:
- #whdu = whdul[('D2IMARR')]
- #whdu.update_ext_version(1)
- #hdul.append(whdu)
- if close_file:
- fobj.close()
-
- hlet = Headerlet(hdul, logging=logging, logmode='a')
- hlet.init_attrs()
- return hlet
-
-@with_logging
-def apply_headerlet_as_primary(filename, hdrlet, attach=True, archive=True,
- force=False, logging=False, logmode='a'):
- """
- Apply headerlet 'hdrfile' to a science observation 'destfile' as the primary WCS
-
- Parameters
- ----------
- filename: string or list of strings
- File name(s) of science observation whose WCS solution will be updated
- hdrlet: string or list of strings
- Headerlet file(s), must match 1-to-1 with input filename(s)
- attach: boolean
- True (default): append headerlet to FITS file as a new extension.
- archive: boolean
- True (default): before updating, create a headerlet with the
- WCS old solution.
- force: boolean
- If True, this will cause the headerlet to replace the current PRIMARY
- WCS even if it has a different distortion model. [Default: False]
- logging: boolean
- enable file logging
- logmode: 'w' or 'a'
- log file open mode
- """
- if not isinstance(filename, list):
- filename = [filename]
- if not isinstance(hdrlet, list):
- hdrlet = [hdrlet]
- if len(hdrlet) != len(filename):
- logger.critical("Filenames must have matching headerlets. "
- "{0:d} filenames and {1:d} headerlets specified".format(len(filename),len(hdrlet)))
-
- for fname,h in zip(filename,hdrlet):
- print("Applying {0} as Primary WCS to {1}".format(h,fname))
- hlet = Headerlet.fromfile(h, logging=logging, logmode=logmode)
- hlet.apply_as_primary(fname, attach=attach, archive=archive,
- force=force)
-
-
-@with_logging
-def apply_headerlet_as_alternate(filename, hdrlet, attach=True, wcskey=None,
- wcsname=None, logging=False, logmode='w'):
- """
- Apply headerlet to a science observation as an alternate WCS
-
- Parameters
- ----------
- filename: string or list of strings
- File name(s) of science observation whose WCS solution will be updated
- hdrlet: string or list of strings
- Headerlet file(s), must match 1-to-1 with input filename(s)
- attach: boolean
- flag indicating if the headerlet should be attached as a
- HeaderletHDU to fobj. If True checks that HDRNAME is unique
- in the fobj and stops if not.
- wcskey: string
- Key value (A-Z, except O) for this alternate WCS
- If None, the next available key will be used
- wcsname: string
- Name to be assigned to this alternate WCS
- WCSNAME is a required keyword in a Headerlet but this allows the
- user to change it as desired.
- logging: boolean
- enable file logging
- logmode: 'a' or 'w'
- """
- if not isinstance(filename, list):
- filename = [filename]
- if not isinstance(hdrlet, list):
- hdrlet = [hdrlet]
- if len(hdrlet) != len(filename):
- logger.critical("Filenames must have matching headerlets. "
- "{0:d} filenames and {1:d} headerlets specified".format(len(filename),len(hdrlet)))
-
- for fname,h in zip(filename,hdrlet):
- print('Applying {0} as an alternate WCS to {1}'.format(h,fname))
- hlet = Headerlet.fromfile(h, logging=logging, logmode=logmode)
- hlet.apply_as_alternate(fname, attach=attach,
- wcsname=wcsname, wcskey=wcskey)
-
-
-@with_logging
-def attach_headerlet(filename, hdrlet, logging=False, logmode='a'):
- """
- Attach Headerlet as an HeaderletHDU to a science file
-
- Parameters
- ----------
- filename: HDUList or list of HDULists
- science file(s) to which the headerlet should be applied
- hdrlet: string, Headerlet object or list of strings or Headerlet objects
- string representing a headerlet file(s), must match 1-to-1 input filename(s)
- logging: boolean
- enable file logging
- logmode: 'a' or 'w'
- """
- if not isinstance(filename, list):
- filename = [filename]
- if not isinstance(hdrlet, list):
- hdrlet = [hdrlet]
- if len(hdrlet) != len(filename):
- logger.critical("Filenames must have matching headerlets. "
- "{0:d} filenames and {1:d} headerlets specified".format(len(filename),len(hdrlet)))
-
- for fname,h in zip(filename,hdrlet):
- print('Attaching {0} as Headerlet extension to {1}'.format(h,fname))
- hlet = Headerlet.fromfile(h, logging=logging, logmode=logmode)
- hlet.attach_to_file(fname,archive=True)
-
-
-@with_logging
-def delete_headerlet(filename, hdrname=None, hdrext=None, distname=None,
- logging=False, logmode='w'):
- """
- Deletes HeaderletHDU(s) with same HDRNAME from science files
-
- Notes
- -----
- One of hdrname, hdrext or distname should be given.
- If hdrname is given - delete a HeaderletHDU with a name HDRNAME from fobj.
- If hdrext is given - delete HeaderletHDU in extension.
- If distname is given - deletes all HeaderletHDUs with a specific distortion model from fobj.
- Updates wcscorr
-
- Parameters
- ----------
- filename: string, HDUList or list of strings
- Filename can be specified as a single filename or HDUList, or
- a list of filenames
- Each input filename (str) will be expanded as necessary to interpret
- any environmental variables included in the filename.
- hdrname: string or None
- HeaderletHDU primary header keyword HDRNAME
- hdrext: int, tuple or None
- HeaderletHDU FITS extension number
- tuple has the form ('HDRLET', 1)
- distname: string or None
- distortion model as specified in the DISTNAME keyword
- logging: boolean
- enable file logging
- logmode: 'a' or 'w'
- """
- if not isinstance(filename, list):
- filename = [filename]
-
- for f in filename:
- print("Deleting Headerlet from ",f)
- _delete_single_headerlet(f, hdrname=hdrname, hdrext=hdrext,
- distname=distname, logging=logging, logmode='a')
-
-def _delete_single_headerlet(filename, hdrname=None, hdrext=None, distname=None,
- logging=False, logmode='w'):
- """
- Deletes HeaderletHDU(s) from a SINGLE science file
-
- Notes
- -----
- One of hdrname, hdrext or distname should be given.
- If hdrname is given - delete a HeaderletHDU with a name HDRNAME from fobj.
- If hdrext is given - delete HeaderletHDU in extension.
- If distname is given - deletes all HeaderletHDUs with a specific distortion model from fobj.
- Updates wcscorr
-
- Parameters
- ----------
- filename: string or HDUList
- Either a filename or PyFITS HDUList object for the input science file
- An input filename (str) will be expanded as necessary to interpret
- any environmental variables included in the filename.
- hdrname: string or None
- HeaderletHDU primary header keyword HDRNAME
- hdrext: int, tuple or None
- HeaderletHDU FITS extension number
- tuple has the form ('HDRLET', 1)
- distname: string or None
- distortion model as specified in the DISTNAME keyword
- logging: boolean
- enable file logging
- logmode: 'a' or 'w'
- """
- hdrlet_ind = find_headerlet_HDUs(filename, hdrname=hdrname, hdrext=hdrext,
- distname=distname, logging=logging, logmode='a')
- if len(hdrlet_ind) == 0:
- message = """
- No HDUs deleted... No Headerlet HDUs found with '
- hdrname = %s
- hdrext = %s
- distname = %s
- Please review input parameters and try again.
- """ % (hdrname, str(hdrext), distname)
- logger.critical(message)
- return
-
- fobj, fname, close_fobj = parse_filename(filename, mode='update')
-
- # delete row(s) from WCSCORR table now...
- #
- #
- if hdrname not in ['', ' ', None, 'INDEF']:
- selections = {'hdrname': hdrname}
- elif hdrname in ['', ' ', None, 'INDEF'] and hdrext is not None:
- selections = {'hdrname': fobj[hdrext].header['hdrname']}
- else:
- selections = {'distname': distname}
- wcscorr.delete_wcscorr_row(fobj['WCSCORR'].data, selections)
-
- # delete the headerlet extension now
- for hdrind in hdrlet_ind:
- del fobj[hdrind]
-
- utils.updateNEXTENDKw(fobj)
- # Update file object with changes
- fobj.flush()
- # close file, if was opened by this function
- if close_fobj:
- fobj.close()
- logger.critical('Deleted headerlet from extension(s) %s ' % str(hdrlet_ind))
-
-
-def headerlet_summary(filename, columns=None, pad=2, maxwidth=None,
- output=None, clobber=True, quiet=False):
- """
-
- Print a summary of all HeaderletHDUs in a science file to STDOUT, and
- optionally to a text file
- The summary includes:
- HDRLET_ext_number HDRNAME WCSNAME DISTNAME SIPNAME NPOLFILE D2IMFILE
-
- Parameters
- ----------
- filename: string or HDUList
- Either a filename or PyFITS HDUList object for the input science file
- An input filename (str) will be expanded as necessary to interpret
- any environmental variables included in the filename.
- columns: list
- List of headerlet PRIMARY header keywords to report in summary
- By default (set to None), it will use the default set of keywords
- defined as the global list DEFAULT_SUMMARY_COLS
- pad: int
- Number of padding spaces to put between printed columns
- [Default: 2]
- maxwidth: int
- Maximum column width(not counting padding) for any column in summary
- By default (set to None), each column's full width will be used
- output: string (optional)
- Name of optional output file to record summary. This filename
- can contain environment variables.
- [Default: None]
- clobber: bool
- If True, will overwrite any previous output file of same name
- quiet: bool
- If True, will NOT report info to STDOUT
-
- """
- if columns is None:
- summary_cols = DEFAULT_SUMMARY_COLS
- else:
- summary_cols = columns
-
- summary_dict = {}
- for kw in summary_cols:
- summary_dict[kw] = copy.deepcopy(COLUMN_DICT)
-
- # Define Extension number column
- extnums_col = copy.deepcopy(COLUMN_DICT)
- extnums_col['name'] = 'EXTN'
- extnums_col['width'] = 6
-
- fobj, fname, close_fobj = parse_filename(filename)
- # find all HDRLET extensions and combine info into a single summary
- for extn in fobj:
- if 'extname' in extn.header and extn.header['extname'] == 'HDRLET':
- hdrlet_indx = fobj.index_of(('hdrlet', extn.header['extver']))
- try:
- ext_cols, ext_summary = extn.headerlet.summary(columns=summary_cols)
- extnums_col['vals'].append(hdrlet_indx)
- for kw in summary_cols:
- for key in COLUMN_DICT:
- summary_dict[kw][key].extend(ext_summary[kw][key])
- except:
- print("Skipping headerlet")
- print("Could not read Headerlet from extension ", hdrlet_indx)
-
- if close_fobj:
- fobj.close()
-
- # Print out the summary dictionary
- print_summary(summary_cols, summary_dict, pad=pad, maxwidth=maxwidth,
- idcol=extnums_col, output=output,
- clobber=clobber, quiet=quiet)
-
-
-@with_logging
-def restore_from_headerlet(filename, hdrname=None, hdrext=None, archive=True,
- force=False, logging=False, logmode='w'):
- """
- Restores a headerlet as a primary WCS
-
- Parameters
- ----------
- filename: string or HDUList
- Either a filename or PyFITS HDUList object for the input science file
- An input filename (str) will be expanded as necessary to interpret
- any environmental variables included in the filename.
- hdrname: string
- HDRNAME keyword of HeaderletHDU
- hdrext: int or tuple
- Headerlet extension number of tuple ('HDRLET',2)
- archive: boolean (default: True)
- When the distortion model in the headerlet is the same as the distortion model of
- the science file, this flag indicates if the primary WCS should be saved as an alternate
- nd a headerlet extension.
- When the distortion models do not match this flag indicates if the current primary and
- alternate WCSs should be archived as headerlet extensions and alternate WCS.
- force: boolean (default:False)
- When the distortion models of the headerlet and the primary do not match, and archive
- is False, this flag forces an update of the primary.
- logging: boolean
- enable file logging
- logmode: 'a' or 'w'
- """
-
- hdrlet_ind = find_headerlet_HDUs(filename, hdrext=hdrext, hdrname=hdrname)
-
- fobj, fname, close_fobj = parse_filename(filename, mode='update')
-
- if len(hdrlet_ind) > 1:
- if hdrext:
- kwerr = 'hdrext'
- kwval = hdrext
- else:
- kwerr = 'hdrname'
- kwval = hdrname
- message = """
- Multiple Headerlet extensions found with the same name.
- %d Headerlets with "%s" = %s found in %s.
- """% (len(hdrlet_ind), kwerr, kwval, fname)
- if close_fobj:
- fobj.close()
- logger.critical(message)
- raise ValueError
-
- hdrlet_indx = hdrlet_ind[0]
-
- # read headerlet from HeaderletHDU into memory
- if hasattr(fobj[hdrlet_ind[0]], 'hdulist'):
- hdrlet = fobj[hdrlet_indx].hdulist
- else:
- hdrlet = fobj[hdrlet_indx].headerlet # older convention in PyFITS
-
- # read in the names of the extensions which HeaderletHDU updates
- extlist = []
- for ext in hdrlet:
- if 'extname' in ext.header and ext.header['extname'] == 'SIPWCS':
- # convert from string to tuple or int
- sciext = eval(ext.header['sciext'])
- extlist.append(fobj[sciext])
- # determine whether distortion is the same
- current_distname = hdrlet[0].header['distname']
- same_dist = True
- if current_distname != fobj[0].header['distname']:
- same_dist = False
- if not archive and not force:
- if close_fobj:
- fobj.close()
- message = """
- Headerlet does not have the same distortion as image!
- Set "archive"=True to save old distortion model, or
- set "force"=True to overwrite old model with new.
- """
- logger.critical(message)
- raise ValueError
-
- # check whether primary WCS has been archived already
- # Use information from first 'SCI' extension
- priwcs_name = None
-
- scihdr = extlist[0].header
- sci_wcsnames = altwcs.wcsnames(scihdr).values()
- if 'hdrname' in scihdr:
- priwcs_hdrname = scihdr['hdrname']
- else:
- if 'wcsname' in scihdr:
- priwcs_hdrname = priwcs_name = scihdr['wcsname']
- else:
- if 'idctab' in scihdr:
- priwcs_hdrname = ''.join(['IDC_',
- utils.extract_rootname(scihdr['idctab'], suffix='_idc')])
- else:
- priwcs_hdrname = 'UNKNOWN'
- priwcs_name = priwcs_hdrname
- scihdr['WCSNAME'] = priwcs_name
-
- priwcs_unique = verify_hdrname_is_unique(fobj, priwcs_hdrname)
- if archive and priwcs_unique:
- if priwcs_unique:
- newhdrlet = create_headerlet(fobj, sciext=scihdr['extname'],
- hdrname=priwcs_hdrname)
- newhdrlet.attach_to_file(fobj)
- #
- # copy hdrlet as a primary
- #
- hdrlet.apply_as_primary(fobj, attach=False, archive=archive, force=force)
-
- utils.updateNEXTENDKw(fobj)
- fobj.flush()
- if close_fobj:
- fobj.close()
-
-
-@with_logging
-def restore_all_with_distname(filename, distname, primary, archive=True,
- sciext='SCI', logging=False, logmode='w'):
- """
- Restores all HeaderletHDUs with a given distortion model as alternate WCSs and a primary
-
- Parameters
- --------------
- filename: string or HDUList
- Either a filename or PyFITS HDUList object for the input science file
- An input filename (str) will be expanded as necessary to interpret
- any environmental variables included in the filename.
- distname: string
- distortion model as represented by a DISTNAME keyword
- primary: int or string or None
- HeaderletHDU to be restored as primary
- if int - a fits extension
- if string - HDRNAME
- if None - use first HeaderletHDU
- archive: boolean (default True)
- flag indicating if HeaderletHDUs should be created from the
- primary and alternate WCSs in fname before restoring all matching
- headerlet extensions
- logging: boolean
- enable file logging
- logmode: 'a' or 'w'
- """
-
- fobj, fname, close_fobj = parse_filename(filename, mode='update')
-
- hdrlet_ind = find_headerlet_HDUs(fobj, distname=distname)
- if len(hdrlet_ind) == 0:
- message = """
- No Headerlet extensions found with
-
- DISTNAME = %s in %s.
-
- For a full list of DISTNAMEs found in all headerlet extensions:
-
- get_headerlet_kw_names(fobj, kw='DISTNAME')
- """ % (distname, fname)
- if close_fobj:
- fobj.close()
- logger.critical(message)
- raise ValueError
-
- # Interpret 'primary' parameter input into extension number
- if primary is None:
- primary_ind = hdrlet_ind[0]
- elif isinstance(primary, int):
- primary_ind = primary
- else:
- primary_ind = None
- for ind in hdrlet_ind:
- if fobj[ind].header['hdrname'] == primary:
- primary_ind = ind
- break
- if primary_ind is None:
- if close_fobj:
- fobj.close()
- message = """
- No Headerlet extensions found with DISTNAME = %s in %s.
- """ % (primary, fname)
- logger.critical(message)
- raise ValueError
- # Check to see whether 'primary' HeaderletHDU has same distname as user
- # specified on input
-
- # read headerlet from HeaderletHDU into memory
- if hasattr(fobj[primary_ind], 'hdulist'):
- primary_hdrlet = fobj[primary_ind].hdulist
- else:
- primary_hdrlet = fobj[primary_ind].headerlet # older convention in PyFITS
- pri_distname = primary_hdrlet[0].header['distname']
- if pri_distname != distname:
- if close_fobj:
- fobj.close()
- message = """
- Headerlet extension to be used as PRIMARY WCS
- has "DISTNAME" = %s
- "DISTNAME" = %s was specified on input.
- All updated WCSs must have same DISTNAME. Quitting...'
- """ % (pri_distname, distname)
- logger.critical(message)
- raise ValueError
-
- # read in the names of the WCSs which the HeaderletHDUs will update
- wnames = altwcs.wcsnames(fobj[sciext, 1].header)
-
- # work out how many HeaderletHDUs will be used to update the WCSs
- numhlt = len(hdrlet_ind)
- hdrnames = get_headerlet_kw_names(fobj, kw='wcsname')
-
- # read in headerletHDUs and update WCS keywords
- for hlet in hdrlet_ind:
- if fobj[hlet].header['distname'] == distname:
- if hasattr(fobj[hlet], 'hdulist'):
- hdrlet = fobj[hlet].hdulist
- else:
- hdrlet = fobj[hlet].headerlet # older convention in PyFITS
- if hlet == primary_ind:
- hdrlet.apply_as_primary(fobj, attach=False,
- archive=archive, force=True)
- else:
- hdrlet.apply_as_alternate(fobj, attach=False,
- wcsname=hdrlet[0].header['wcsname'])
-
- utils.updateNEXTENDKw(fobj)
- fobj.flush()
- if close_fobj:
- fobj.close()
-
-
-@with_logging
-def archive_as_headerlet(filename, hdrname, sciext='SCI',
- wcsname=None, wcskey=None, destim=None,
- sipname=None, npolfile=None, d2imfile=None,
- author=None, descrip=None, history=None,
- nmatch=None, catalog=None,
- logging=False, logmode='w'):
- """
- Save a WCS as a headerlet extension and write it out to a file.
-
- This function will create a headerlet, attach it as an extension to the
- science image (if it has not already been archived) then, optionally,
- write out the headerlet to a separate headerlet file.
-
- Either wcsname or wcskey must be provided, if both are given, they must match a valid WCS
- Updates wcscorr if necessary.
-
- Parameters
- ----------
- filename: string or HDUList
- Either a filename or PyFITS HDUList object for the input science file
- An input filename (str) will be expanded as necessary to interpret
- any environmental variables included in the filename.
- hdrname: string
- Unique name for this headerlet, stored as HDRNAME keyword
- sciext: string
- name (EXTNAME) of extension that contains WCS to be saved
- wcsname: string
- name of WCS to be archived, if " ": stop
- wcskey: one of A...Z or " " or "PRIMARY"
- if " " or "PRIMARY" - archive the primary WCS
- destim: string
- DESTIM keyword
- if NOne, use ROOTNAME or science file name
- sipname: string or None (default)
- Name of unique file where the polynomial distortion coefficients were
- read from. If None, the behavior is:
- The code looks for a keyword 'SIPNAME' in the science header
- If not found, for HST it defaults to 'IDCTAB'
- If there is no SIP model the value is 'NOMODEL'
- If there is a SIP model but no SIPNAME, it is set to 'UNKNOWN'
- npolfile: string or None (default)
- Name of a unique file where the non-polynomial distortion was stored.
- If None:
- The code looks for 'NPOLFILE' in science header.
- If 'NPOLFILE' was not found and there is no npol model, it is set to 'NOMODEL'
- If npol model exists, it is set to 'UNKNOWN'
- d2imfile: string
- Name of a unique file where the detector to image correction was
- stored. If None:
- The code looks for 'D2IMFILE' in the science header.
- If 'D2IMFILE' is not found and there is no d2im correction,
- it is set to 'NOMODEL'
- If d2im correction exists, but 'D2IMFILE' is missing from science
- header, it is set to 'UNKNOWN'
- author: string
- Name of user who created the headerlet, added as 'AUTHOR' keyword
- to headerlet PRIMARY header
- descrip: string
- Short description of the solution provided by the headerlet
- This description will be added as the single 'DESCRIP' keyword
- to the headerlet PRIMARY header
- history: filename, string or list of strings
- Long (possibly multi-line) description of the solution provided
- by the headerlet. These comments will be added as 'HISTORY' cards
- to the headerlet PRIMARY header
- If filename is specified, it will format and attach all text from
- that file as the history.
- logging: boolean
- enable file folling
- logmode: 'w' or 'a'
- log file open mode
- """
-
- fobj, fname, close_fobj = parse_filename(filename, mode='update')
-
- if wcsname in [None, ' ', '', 'INDEF'] and wcskey is None:
- message = """
- No valid WCS found found in %s.
- A valid value for either "wcsname" or "wcskey"
- needs to be specified.
- """ % fname
- if close_fobj:
- fobj.close()
- logger.critical(message)
- raise ValueError
-
- # Translate 'wcskey' value for PRIMARY WCS to valid altwcs value of ' '
- if wcskey == 'PRIMARY':
- wcskey = ' '
- wcskey = wcskey.upper()
-
- numhlt = countExtn(fobj, 'HDRLET')
-
- if wcsname is None:
- scihdr = fobj[sciext, 1].header
- wcsname = scihdr['wcsname'+wcskey]
-
- if hdrname in [None, ' ', '']:
- hdrname = wcsname
-
- # Check to see whether or not a HeaderletHDU with this hdrname already
- # exists
- hdrnames = get_headerlet_kw_names(fobj)
- if hdrname not in hdrnames:
- hdrletobj = create_headerlet(fobj, sciext=sciext,
- wcsname=wcsname, wcskey=wcskey,
- hdrname=hdrname,
- sipname=sipname, npolfile=npolfile,
- d2imfile=d2imfile, author=author,
- descrip=descrip, history=history,
- nmatch=nmatch, catalog=catalog,
- logging=False)
- hlt_hdu = HeaderletHDU.fromheaderlet(hdrletobj)
-
- if destim is not None:
- hlt_hdu[0].header['destim'] = destim
-
- fobj.append(hlt_hdu)
-
- utils.updateNEXTENDKw(fobj)
- fobj.flush()
- else:
- message = """
- Headerlet with hdrname %s already archived for WCS %s
- No new headerlet appended to %s .
- """ % (hdrname, wcsname, fname)
- logger.critical(message)
-
- if close_fobj:
- fobj.close()
-
-#### Headerlet Class definitions
-class Headerlet(fits.HDUList):
- """
- A Headerlet class
- Ref: http://mediawiki.stsci.edu/mediawiki/index.php/Telescopedia:Headerlets
- """
-
- def __init__(self, hdus=[], file=None, logging=False, logmode='w'):
- """
- Parameters
- ----------
- hdus : list
- List of HDUs to be used to create the headerlet object itself
- file: string
- File-like object from which HDUs should be read
- logging: boolean
- enable file logging
- logmode: 'w' or 'a'
- for internal use only, indicates whether the log file
- should be open in attach or write mode
- """
- self.logging = logging
- init_logging('class Headerlet', level=logging, mode=logmode)
-
- super(Headerlet, self).__init__(hdus, file=file)
-
- def init_attrs(self):
- self.fname = self.filename()
- self.hdrname = self[0].header["HDRNAME"]
- self.wcsname = self[0].header["WCSNAME"]
- self.upwcsver = self[0].header.get("UPWCSVER", "")
- self.pywcsver = self[0].header.get("PYWCSVER", "")
- self.destim = self[0].header["DESTIM"]
- self.sipname = self[0].header["SIPNAME"]
- self.idctab = self[0].header["IDCTAB"]
- self.npolfile = self[0].header["NPOLFILE"]
- self.d2imfile = self[0].header["D2IMFILE"]
- self.distname = self[0].header["DISTNAME"]
-
- try:
- self.vafactor = self[("SIPWCS", 1)].header.get("VAFACTOR", 1) #None instead of 1?
- except (IndexError, KeyError):
- self.vafactor = self[0].header.get("VAFACTOR", 1) #None instead of 1?
- self.author = self[0].header["AUTHOR"]
- self.descrip = self[0].header["DESCRIP"]
-
- self.fit_kws = ['HDRNAME', 'NMATCH', 'CATALOG']
- self.history = ''
- # header['HISTORY'] returns an iterable of all HISTORY values
- if 'HISTORY' in self[0].header:
- for hist in self[0].header['HISTORY']:
- self.history += hist + '\n'
-
- self.d2imerr = 0
- self.axiscorr = 1
-
- # Overridden to support the Headerlet logging features
- @classmethod
- def fromfile(cls, fileobj, mode='readonly', memmap=False,
- save_backup=False, logging=False, logmode='w', **kwargs):
- hlet = super(cls, cls).fromfile(fileobj, mode, memmap, save_backup,
- **kwargs)
- if len(hlet) > 0:
- hlet.init_attrs()
- hlet.logging = logging
- init_logging('class Headerlet', level=logging, mode=logmode)
- return hlet
-
- @classmethod
- def fromstring(cls, data, **kwargs):
- hlet = super(cls, cls).fromstring(data, **kwargs)
- hlet.logging = logging
- init_logging('class Headerlet', level=logging, mode=logmode)
- return hlet
-
- def apply_as_primary(self, fobj, attach=True, archive=True, force=False):
- """
- Copy this headerlet as a primary WCS to fobj
-
- Parameters
- ----------
- fobj: string, HDUList
- science file to which the headerlet should be applied
- attach: boolean
- flag indicating if the headerlet should be attached as a
- HeaderletHDU to fobj. If True checks that HDRNAME is unique
- in the fobj and stops if not.
- archive: boolean (default is True)
- When the distortion model in the headerlet is the same as the
- distortion model of the science file, this flag indicates if
- the primary WCS should be saved as an alternate and a headerlet
- extension.
- When the distortion models do not match this flag indicates if
- the current primary and alternate WCSs should be archived as
- headerlet extensions and alternate WCS.
- force: boolean (default is False)
- When the distortion models of the headerlet and the primary do
- not match, and archive is False this flag forces an update
- of the primary
- """
- self.hverify()
- fobj, fname, close_dest = parse_filename(fobj, mode='update')
- if not self.verify_dest(fobj, fname):
- if close_dest:
- fobj.close()
- raise ValueError("Destination name does not match headerlet"
- "Observation %s cannot be updated with headerlet %s" % (fname, self.hdrname))
-
- # Check to see whether the distortion model in the destination
- # matches the distortion model in the headerlet being applied
-
- dname = self.get_destination_model(fobj)
- dist_models_equal = self.equal_distmodel(dname)
- if not dist_models_equal and not force:
- raise ValueError("Distortion models do not match"
- " To overwrite the distortion model, set force=True")
-
- orig_hlt_hdu = None
- numhlt = countExtn(fobj, 'HDRLET')
- hdrlet_extnames = get_headerlet_kw_names(fobj)
-
- # Insure that WCSCORR table has been created with all original
- # WCS's recorded prior to adding the headerlet WCS
- wcscorr.init_wcscorr(fobj)
-
-
- ### start archive
- # If archive has been specified
- # regardless of whether or not the distortion models are equal...
-
- numsip = countExtn(self, 'SIPWCS')
- sciext_list = []
- alt_hlethdu = []
- for i in range(1, numsip+1):
- sipheader = self[('SIPWCS', i)].header
- sciext_list.append((sipheader['TG_ENAME'], sipheader['TG_EVER']))
- target_ext = sciext_list[0]
- if archive:
- if 'wcsname' in fobj[target_ext].header:
- hdrname = fobj[target_ext].header['WCSNAME']
- wcsname = hdrname
- else:
- hdrname = fobj[0].header['ROOTNAME'] + '_orig'
- wcsname = None
- if hdrname not in hdrlet_extnames:
- # - if WCS has not been saved, write out WCS as headerlet extension
- # Create a headerlet for the original Primary WCS data in the file,
- # create an HDU from the original headerlet, and append it to
- # the file
- orig_hlt = create_headerlet(fobj, sciext=sciext_list, #[target_ext],
- wcsname=wcsname,
- hdrname=hdrname,
- logging=self.logging)
- orig_hlt_hdu = HeaderletHDU.fromheaderlet(orig_hlt)
- numhlt += 1
- orig_hlt_hdu.header['EXTVER'] = numhlt
- logger.info("Created headerlet %s to be attached to file" % hdrname)
- else:
- logger.info("Headerlet with name %s is already attached" % hdrname)
-
-
- if dist_models_equal:
- # Use the WCSNAME to determine whether or not to archive
- # Primary WCS as altwcs
- # wcsname = hwcs.wcs.name
- scihdr = fobj[target_ext].header
- if 'hdrname' in scihdr:
- priwcs_name = scihdr['hdrname']
- else:
- if 'wcsname' in scihdr:
- priwcs_name = scihdr['wcsname']
- else:
- if 'idctab' in scihdr:
- priwcs_name = ''.join(['IDC_',
- utils.extract_rootname(scihdr['idctab'],
- suffix='_idc')])
- else:
- priwcs_name = 'UNKNOWN'
- nextkey = altwcs.next_wcskey(fobj, ext=target_ext)
- altwcs.archiveWCS(fobj, ext=sciext_list, wcskey=nextkey,
- wcsname=priwcs_name)
- else:
-
- for hname in altwcs.wcsnames(fobj, ext=target_ext).values():
- if hname != 'OPUS' and hname not in hdrlet_extnames:
- # get HeaderletHDU for alternate WCS as well
- alt_hlet = create_headerlet(fobj, sciext=sciext_list,
- wcsname=hname, wcskey=wcskey,
- hdrname=hname, sipname=None,
- npolfile=None, d2imfile=None,
- author=None, descrip=None, history=None,
- logging=self.logging)
- numhlt += 1
- alt_hlet_hdu = HeaderletHDU.fromheaderlet(alt_hlet)
- alt_hlet_hdu.header['EXTVER'] = numhlt
- alt_hlethdu.append(alt_hlet_hdu)
- hdrlet_extnames.append(hname)
-
- self._del_dest_WCS_ext(fobj)
- for i in range(1, numsip+1):
- target_ext = sciext_list[i-1]
- self._del_dest_WCS(fobj, target_ext)
- sipwcs = HSTWCS(self, ('SIPWCS', i))
- idckw = sipwcs._idc2hdr()
- priwcs = sipwcs.to_fits(relax=True)
- numnpol = 1
- numd2im = 1
- if sipwcs.wcs.has_cd():
- priwcs[0].header = altwcs.pc2cd(priwcs[0].header)
- priwcs[0].header.extend(idckw)
- if 'crder1' in sipheader:
- for card in sipheader['crder*'].cards:
- priwcs[0].header.set(card.keyword, card.value, card.comment,
- after='WCSNAME')
- # Update WCS with HDRNAME as well
-
- for kw in ['SIMPLE', 'BITPIX', 'NAXIS', 'EXTEND']:
- try:
- priwcs[0].header.remove(kw)
- except ValueError:
- pass
-
- priwcs[0].header.set('WCSNAME', self[0].header['WCSNAME'], "")
- priwcs[0].header.set('WCSAXES', self[('SIPWCS', i)].header['WCSAXES'], "")
- priwcs[0].header.set('HDRNAME', self[0].header['HDRNAME'], "")
- if sipwcs.det2im1 or sipwcs.det2im2:
- try:
- d2imerr = self[('SIPWCS', i)].header['D2IMERR*']
- priwcs[0].header.extend(d2imerr)
- except KeyError:
- pass
- try:
- priwcs[0].header.append(self[('SIPWCS', i)].header.cards['D2IMEXT'])
- except KeyError:
- pass
- if 'D2IM1.EXTVER' in priwcs[0].header:
- priwcs[0].header['D2IM1.EXTVER'] = self[('SIPWCS', i)].header['D2IM1.EXTVER']
- priwcs[('D2IMARR', 1)].header['EXTVER'] = self[('SIPWCS', i)].header['D2IM1.EXTVER']
- if 'D2IM2.EXTVER' in priwcs[0].header:
- priwcs[0].header['D2IM2.EXTVER'] = self[('SIPWCS', i)].header['D2IM2.EXTVER']
- priwcs[('D2IMARR', 2)].header['EXTVER'] = self[('SIPWCS', i)].header['D2IM2.EXTVER']
- # D2IM1 will NOT exist for WFPC2 data...
- if 'D2IM1.EXTVER' in priwcs[0].header:
- # only set number of D2IM extensions to 2 if D2IM1 exists
- numd2im = 2
-
- if sipwcs.cpdis1 or sipwcs.cpdis2:
- try:
- cperr = self[('SIPWCS', i)].header['CPERR*']
- priwcs[0].header.extend(cperr)
- except KeyError:
- pass
- try:
- priwcs[0].header.append(self[('SIPWCS', i)].header.cards['NPOLEXT'])
- except KeyError:
- pass
- if 'DP1.EXTVER' in priwcs[0].header:
- priwcs[0].header['DP1.EXTVER'] = self[('SIPWCS', i)].header['DP1.EXTVER']
- priwcs[('WCSDVARR', 1)].header['EXTVER'] = self[('SIPWCS', i)].header['DP1.EXTVER']
- if 'DP2.EXTVER' in priwcs[0].header:
- priwcs[0].header['DP2.EXTVER'] = self[('SIPWCS', i)].header['DP2.EXTVER']
- priwcs[('WCSDVARR', 2)].header['EXTVER'] = self[('SIPWCS', i)].header['DP2.EXTVER']
- numnpol = 2
-
- fobj[target_ext].header.extend(priwcs[0].header)
- if sipwcs.cpdis1:
- whdu = priwcs[('WCSDVARR', (i-1)*numnpol+1)].copy()
- whdu.update_ext_version(self[('SIPWCS', i)].header['DP1.EXTVER'])
- fobj.append(whdu)
- if sipwcs.cpdis2:
- whdu = priwcs[('WCSDVARR', i*numnpol)].copy()
- whdu.update_ext_version(self[('SIPWCS', i)].header['DP2.EXTVER'])
- fobj.append(whdu)
- if sipwcs.det2im1: #or sipwcs.det2im2:
- whdu = priwcs[('D2IMARR', (i-1)*numd2im+1)].copy()
- whdu.update_ext_version(self[('SIPWCS', i)].header['D2IM1.EXTVER'])
- fobj.append(whdu)
- if sipwcs.det2im2:
- whdu = priwcs[('D2IMARR', i*numd2im)].copy()
- whdu.update_ext_version(self[('SIPWCS', i)].header['D2IM2.EXTVER'])
- fobj.append(whdu)
-
- update_versions(self[0].header, fobj[0].header)
- refs = update_ref_files(self[0].header, fobj[0].header)
- # Update the WCSCORR table with new rows from the headerlet's WCSs
- wcscorr.update_wcscorr(fobj, self, 'SIPWCS')
-
- # Append the original headerlet
- if archive and orig_hlt_hdu:
- fobj.append(orig_hlt_hdu)
- # Append any alternate WCS Headerlets
- if len(alt_hlethdu) > 0:
- for ahdu in alt_hlethdu:
- fobj.append(ahdu)
- if attach:
- # Finally, append an HDU for this headerlet
- self.attach_to_file(fobj)
- utils.updateNEXTENDKw(fobj)
- if close_dest:
- fobj.close()
-
-
- def apply_as_alternate(self, fobj, attach=True, wcskey=None, wcsname=None):
- """
- Copy this headerlet as an alternate WCS to fobj
-
- Parameters
- ----------
- fobj: string, HDUList
- science file/HDUList to which the headerlet should be applied
- attach: boolean
- flag indicating if the headerlet should be attached as a
- HeaderletHDU to fobj. If True checks that HDRNAME is unique
- in the fobj and stops if not.
- wcskey: string
- Key value (A-Z, except O) for this alternate WCS
- If None, the next available key will be used
- wcsname: string
- Name to be assigned to this alternate WCS
- WCSNAME is a required keyword in a Headerlet but this allows the
- user to change it as desired.
-
- """
- self.hverify()
- fobj, fname, close_dest = parse_filename(fobj, mode='update')
- if not self.verify_dest(fobj, fname):
- if close_dest:
- fobj.close()
- raise ValueError("Destination name does not match headerlet"
- "Observation %s cannot be updated with headerlet %s" % (fname, self.hdrname))
-
- # Verify whether this headerlet has the same distortion
- #found in the image being updated
- dname = self.get_destination_model(fobj)
- dist_models_equal = self.equal_distmodel(dname)
- if not dist_models_equal:
- raise ValueError("Distortion models do not match \n"
- "Headerlet: %s \n"
- "Destination file: %s\n"
- "attach_to_file() can be used to append this headerlet" %(self.distname, dname))
-
- # Insure that WCSCORR table has been created with all original
- # WCS's recorded prior to adding the headerlet WCS
- wcscorr.init_wcscorr(fobj)
-
- # determine value of WCSNAME to be used
- if wcsname is not None:
- wname = wcsname
- else:
- wname = self[0].header['WCSNAME']
- tg_ename = self[('SIPWCS', 1)].header['TG_ENAME']
- tg_ever = self[('SIPWCS', 1)].header['TG_EVER']
- # determine what alternate WCS this headerlet will be assigned to
- if wcskey is None:
- wkey = altwcs.next_wcskey(fobj[(tg_ename, tg_ever)].header)
- else:
- wcskey = wcskey.upper()
- available_keys = altwcs.available_wcskeys(fobj[(tg_ename, tg_ever)].header)
- if wcskey in available_keys:
- wkey = wcskey
- else:
- mess = "Observation %s already contains alternate WCS with key %s" % (fname, wcskey)
- logger.critical(mess)
- if close_dest:
- fobj.close()
- raise ValueError(mess)
- numsip = countExtn(self, 'SIPWCS')
- for idx in range(1, numsip + 1):
- siphdr = self[('SIPWCS', idx)].header
- tg_ext = (siphdr['TG_ENAME'], siphdr['TG_EVER'])
-
- fhdr = fobj[tg_ext].header
- hwcs = pywcs.WCS(siphdr, self)
- hwcs_header = hwcs.to_header(key=wkey)
- _idc2hdr(siphdr, fhdr, towkey=wkey)
- if hwcs.wcs.has_cd():
- hwcs_header = altwcs.pc2cd(hwcs_header, key=wkey)
-
- fhdr.extend(hwcs_header)
- fhdr['WCSNAME' + wkey] = wname
- # also update with HDRNAME (a non-WCS-standard kw)
- for kw in self.fit_kws:
- #fhdr.insert(wind, pyfits.Card(kw + wkey,
- # self[0].header[kw]))
- fhdr.append(fits.Card(kw + wkey, self[0].header[kw]))
- # Update the WCSCORR table with new rows from the headerlet's WCSs
- wcscorr.update_wcscorr(fobj, self, 'SIPWCS')
-
- if attach:
- self.attach_to_file(fobj)
- utils.updateNEXTENDKw(fobj)
-
- if close_dest:
- fobj.close()
-
- def attach_to_file(self, fobj, archive=False):
- """
- Attach Headerlet as an HeaderletHDU to a science file
-
- Parameters
- ----------
- fobj: string, HDUList
- science file/HDUList to which the headerlet should be applied
- archive: string
- Specifies whether or not to update WCSCORR table when attaching
-
- Notes
- -----
- The algorithm used by this method:
- - verify headerlet can be applied to this file (based on DESTIM)
- - verify that HDRNAME is unique for this file
- - attach as HeaderletHDU to fobj
-
- """
- self.hverify()
- fobj, fname, close_dest = parse_filename(fobj, mode='update')
- destver = self.verify_dest(fobj, fname)
- hdrver = self.verify_hdrname(fobj)
- if destver and hdrver:
-
- numhlt = countExtn(fobj, 'HDRLET')
- new_hlt = HeaderletHDU.fromheaderlet(self)
- new_hlt.header['extver'] = numhlt + 1
- fobj.append(new_hlt)
- utils.updateNEXTENDKw(fobj)
- else:
- message = "Headerlet %s cannot be attached to" % (self.hdrname)
- message += "observation %s" % (fname)
- if not destver:
- message += " * Image %s keyword ROOTNAME not equal to " % (fname)
- message += " DESTIM = '%s'\n" % (self.destim)
- if not hdrver:
- message += " * Image %s already has headerlet " % (fname)
- message += "with HDRNAME='%s'\n" % (self.hdrname)
- logger.critical(message)
- if close_dest:
- fobj.close()
-
- def info(self, columns=None, pad=2, maxwidth=None,
- output=None, clobber=True, quiet=False):
- """
- Prints a summary of this headerlet
- The summary includes:
- HDRNAME WCSNAME DISTNAME SIPNAME NPOLFILE D2IMFILE
-
- Parameters
- ----------
- columns: list
- List of headerlet PRIMARY header keywords to report in summary
- By default (set to None), it will use the default set of keywords
- defined as the global list DEFAULT_SUMMARY_COLS
- pad: int
- Number of padding spaces to put between printed columns
- [Default: 2]
- maxwidth: int
- Maximum column width(not counting padding) for any column in summary
- By default (set to None), each column's full width will be used
- output: string (optional)
- Name of optional output file to record summary. This filename
- can contain environment variables.
- [Default: None]
- clobber: bool
- If True, will overwrite any previous output file of same name
- quiet: bool
- If True, will NOT report info to STDOUT
-
- """
- summary_cols, summary_dict = self.summary(columns=columns)
- print_summary(summary_cols, summary_dict, pad=pad, maxwidth=maxwidth,
- idcol=None, output=output, clobber=clobber, quiet=quiet)
-
- def summary(self, columns=None):
- """
- Returns a summary of this headerlet as a dictionary
-
- The summary includes a summary of the distortion model as :
- HDRNAME WCSNAME DISTNAME SIPNAME NPOLFILE D2IMFILE
-
- Parameters
- ----------
- columns: list
- List of headerlet PRIMARY header keywords to report in summary
- By default(set to None), it will use the default set of keywords
- defined as the global list DEFAULT_SUMMARY_COLS
-
- Returns
- -------
- summary: dict
- Dictionary of values for summary
- """
- if columns is None:
- summary_cols = DEFAULT_SUMMARY_COLS
- else:
- summary_cols = columns
-
- # Initialize summary dict based on requested columns
- summary = {}
- for kw in summary_cols:
- summary[kw] = copy.deepcopy(COLUMN_DICT)
-
- # Populate the summary with headerlet values
- for kw in summary_cols:
- if kw in self[0].header:
- val = self[0].header[kw]
- else:
- val = 'INDEF'
- summary[kw]['vals'].append(val)
- summary[kw]['width'].append(max(len(val), len(kw)))
-
- return summary_cols, summary
-
- def hverify(self):
- """
- Verify the headerlet file is a valid fits file and has
- the required Primary Header keywords
- """
- self.verify()
- header = self[0].header
- assert('DESTIM' in header and header['DESTIM'].strip())
- assert('HDRNAME' in header and header['HDRNAME'].strip())
- assert('UPWCSVER' in header)
-
- def verify_hdrname(self, dest):
- """
- Verifies that the headerlet can be applied to the observation
-
- Reports whether or not this file already has a headerlet with this
- HDRNAME.
- """
- unique = verify_hdrname_is_unique(dest, self.hdrname)
- logger.debug("verify_hdrname() returned %s"%unique)
- return unique
-
- def get_destination_model(self, dest):
- """
- Verifies that the headerlet can be applied to the observation
-
- Determines whether or not the file specifies the same distortion
- model/reference files.
- """
- destim_opened = False
- if not isinstance(dest, fits.HDUList):
- destim = fits.open(dest)
- destim_opened = True
- else:
- destim = dest
- dname = destim[0].header['DISTNAME'] if 'distname' in destim[0].header \
- else self.build_distname(dest)
- if destim_opened:
- destim.close()
- return dname
-
- def equal_distmodel(self, dmodel):
- if dmodel != self[0].header['DISTNAME']:
- if self.logging:
- message = """
- Distortion model in headerlet not the same as destination model
- Headerlet model : %s
- Destination model: %s
- """ % (self[0].header['DISTNAME'], dmodel)
- logger.critical(message)
- return False
- else:
- return True
-
- def verify_dest(self, dest, fname):
- """
- verifies that the headerlet can be applied to the observation
-
- DESTIM in the primary header of the headerlet must match ROOTNAME
- of the science file (or the name of the destination file)
- """
- try:
- if not isinstance(dest, fits.HDUList):
- droot = fits.getval(dest, 'ROOTNAME')
- else:
- droot = dest[0].header['ROOTNAME']
- except KeyError:
- logger.debug("Keyword 'ROOTNAME' not found in destination file")
- droot = dest.split('.fits')[0]
- if droot == self.destim:
- logger.debug("verify_destim() returned True")
- return True
- else:
- logger.debug("verify_destim() returned False")
- logger.critical("Destination name does not match headerlet. "
- "Observation %s cannot be updated with headerlet %s" % (fname, self.hdrname))
- return False
-
- def build_distname(self, dest):
- """
- Builds the DISTNAME for dest based on reference file names.
- """
-
- try:
- npolfile = dest[0].header['NPOLFILE']
- except KeyError:
- npolfile = None
- try:
- d2imfile = dest[0].header['D2IMFILE']
- except KeyError:
- d2imfile = None
-
- sipname, idctab = utils.build_sipname(dest, dest, None)
- npolname, npolfile = utils.build_npolname(dest, npolfile)
- d2imname, d2imfile = utils.build_d2imname(dest, d2imfile)
- dname = utils.build_distname(sipname,npolname,d2imname)
- return dname
-
- def tofile(self, fname, destim=None, hdrname=None, clobber=False):
- """
- Write this headerlet to a file
-
- Parameters
- ----------
- fname: string
- file name
- destim: string (optional)
- provide a value for DESTIM keyword
- hdrname: string (optional)
- provide a value for HDRNAME keyword
- clobber: boolean
- a flag which allows to overwrte an existing file
- """
- if not destim or not hdrname:
- self.hverify()
- self.writeto(fname, clobber=clobber)
-
- def _del_dest_WCS(self, dest, ext=None):
- """
- Delete the WCS of a science file extension
- """
-
- logger.info("Deleting all WCSs of file %s" % dest.filename())
- numext = len(dest)
-
- if ext:
- fext = dest[ext]
- self._remove_d2im(fext)
- self._remove_sip(fext)
- self._remove_lut(fext)
- self._remove_primary_WCS(fext)
- self._remove_idc_coeffs(fext)
- self._remove_fit_values(fext)
- else:
- for idx in range(numext):
- # Only delete WCS from extensions which may have WCS keywords
- if ('XTENSION' in dest[idx].header and
- dest[idx].header['XTENSION'] == 'IMAGE'):
- self._remove_d2im(dest[idx])
- self._remove_sip(dest[idx])
- self._remove_lut(dest[idx])
- self._remove_primary_WCS(dest[idx])
- self._remove_idc_coeffs(dest[idx])
- self._remove_fit_values(dest[idx])
- self._remove_ref_files(dest[0])
- """
- if not ext:
- self._remove_alt_WCS(dest, ext=range(numext))
- else:
- self._remove_alt_WCS(dest, ext=ext)
- """
- def _del_dest_WCS_ext(self, dest):
- numwdvarr = countExtn(dest, 'WCSDVARR')
- numd2im = countExtn(dest, 'D2IMARR')
- if numwdvarr > 0:
- for idx in range(1, numwdvarr + 1):
- del dest[('WCSDVARR', idx)]
- if numd2im > 0:
- for idx in range(1, numd2im + 1):
- del dest[('D2IMARR', idx)]
-
- def _remove_ref_files(self, phdu):
- """
- phdu: Primary HDU
- """
- refkw = ['IDCTAB', 'NPOLFILE', 'D2IMFILE', 'SIPNAME', 'DISTNAME']
- for kw in refkw:
- try:
- del phdu.header[kw]
- except KeyError:
- pass
-
- def _remove_fit_values(self, ext):
- """
- Remove the any existing astrometric fit values from a FITS extension
- """
-
- logger.debug("Removing astrometric fit values from (%s, %s)"%
- (ext.name, ext.ver))
- dkeys = altwcs.wcskeys(ext.header)
- if 'O' in dkeys: dkeys.remove('O') # Do not remove wcskey='O' values
- for fitkw in ['NMATCH', 'CATALOG']:
- for k in dkeys:
- fkw = (fitkw+k).rstrip()
- if fkw in ext.header:
- del ext.header[fkw]
-
- def _remove_sip(self, ext):
- """
- Remove the SIP distortion of a FITS extension
- """
-
- logger.debug("Removing SIP distortion from (%s, %s)"
- % (ext.name, ext.ver))
- for prefix in ['A', 'B', 'AP', 'BP']:
- try:
- order = ext.header[prefix + '_ORDER']
- del ext.header[prefix + '_ORDER']
- except KeyError:
- continue
- for i in range(order + 1):
- for j in range(order + 1):
- key = prefix + '_%d_%d' % (i, j)
- try:
- del ext.header[key]
- except KeyError:
- pass
- try:
- del ext.header['IDCTAB']
- except KeyError:
- pass
-
- def _remove_lut(self, ext):
- """
- Remove the Lookup Table distortion of a FITS extension
- """
-
- logger.debug("Removing LUT distortion from (%s, %s)"
- % (ext.name, ext.ver))
- try:
- cpdis = ext.header['CPDIS*']
- except KeyError:
- return
- try:
- for c in range(1, len(cpdis) + 1):
- del ext.header['DP%s*...' % c]
- del ext.header[cpdis.cards[c - 1].keyword]
- del ext.header['CPERR*']
- del ext.header['NPOLFILE']
- del ext.header['NPOLEXT']
- except KeyError:
- pass
-
- def _remove_d2im(self, ext):
- """
- Remove the Detector to Image correction of a FITS extension
- """
-
- logger.debug("Removing D2IM correction from (%s, %s)"
- % (ext.name, ext.ver))
- try:
- d2imdis = ext.header['D2IMDIS*']
- except KeyError:
- return
- try:
- for c in range(1, len(d2imdis) + 1):
- del ext.header['D2IM%s*...' % c]
- del ext.header[d2imdis.cards[c - 1].keyword]
- del ext.header['D2IMERR*']
- del ext.header['D2IMFILE']
- del ext.header['D2IMEXT']
- except KeyError:
- pass
-
- def _remove_alt_WCS(self, dest, ext):
- """
- Remove Alternate WCSs of a FITS extension.
- A WCS with wcskey 'O' is never deleted.
- """
- dkeys = altwcs.wcskeys(dest[('SCI', 1)].header)
- for val in ['O', '', ' ']:
- if val in dkeys:
- dkeys.remove(val) # Never delete WCS with wcskey='O'
-
- logger.debug("Removing alternate WCSs with keys %s from %s"
- % (dkeys, dest.filename()))
- for k in dkeys:
- altwcs.deleteWCS(dest, ext=ext, wcskey=k)
-
- def _remove_primary_WCS(self, ext):
- """
- Remove the primary WCS of a FITS extension
- """
-
- logger.debug("Removing Primary WCS from (%s, %s)"
- % (ext.name, ext.ver))
- naxis = ext.header['NAXIS']
- for key in basic_wcs:
- for i in range(1, naxis + 1):
- try:
- del ext.header[key + str(i)]
- except KeyError:
- pass
- try:
- del ext.header['WCSAXES']
- except KeyError:
- pass
- try:
- del ext.header['WCSNAME']
- except KeyError:
- pass
-
- def _remove_idc_coeffs(self, ext):
- """
- Remove IDC coefficients of a FITS extension
- """
-
- logger.debug("Removing IDC coefficient from (%s, %s)"
- % (ext.name, ext.ver))
- coeffs = ['OCX10', 'OCX11', 'OCY10', 'OCY11', 'IDCSCALE']
- for k in coeffs:
- try:
- del ext.header[k]
- except KeyError:
- pass
-
-@with_logging
-def _idc2hdr(fromhdr, tohdr, towkey=' '):
- """
- Copy the IDC (HST specific) keywords from one header to another
-
- """
- # save some of the idc coefficients
- coeffs = ['OCX10', 'OCX11', 'OCY10', 'OCY11', 'IDCSCALE']
- for c in coeffs:
- try:
- tohdr[c+towkey] = fromhdr[c]
- logger.debug("Copied %s to header")
- except KeyError:
- continue
-
-
-def get_extname_extver_list(fobj, sciext):
- """
- Create a list of (EXTNAME, EXTVER) tuples
-
- Based on sciext keyword (see docstring for create_headerlet)
- walk throughh the file and convert extensions in `sciext` to
- valid (EXTNAME, EXTVER) tuples.
- """
- extlist = []
- if isinstance(sciext, int):
- if sciext == 0:
- extname = 'PRIMARY'
- extver = 1
- else:
- try:
- extname = fobj[sciext].header['EXTNAME']
- except KeyError:
- extname = ""
- try:
- extver = fobj[sciext].header['EXTVER']
- except KeyError:
- extver = 1
- extlist.append((extname, extver))
- elif isinstance(sciext, str):
- if sciext == 'PRIMARY':
- extname = "PRIMARY"
- extver = 1
- extlist.append((extname, extver))
- else:
- for ext in fobj:
- try:
- extname = ext.header['EXTNAME']
- except KeyError:
- continue
- if extname.upper() == sciext.upper():
- try:
- extver = ext.header['EXTVER']
- except KeyError:
- extver = 1
- extlist.append((extname, extver))
- elif isinstance(sciext, list):
- if isinstance(sciext[0], int):
- for i in sciext:
- try:
- extname = fobj[i].header['EXTNAME']
- except KeyError:
- if i == 0:
- extname = "PRIMARY"
- extver = 1
- else:
- extname = ""
- try:
- extver = fobj[i].header['EXTVER']
- except KeyError:
- extver = 1
- extlist.append((extname, extver))
- else:
- extlist = sciext[:]
- else:
- errstr = "Expected sciext to be a list of FITS extensions with science data\n"+\
- " a valid EXTNAME string, or an integer."
- logger.critical(errstr)
- raise ValueError
- return extlist
-
-
-class HeaderletHDU(fits.hdu.nonstandard.FitsHDU):
- """
- A non-standard extension HDU for encapsulating Headerlets in a file. These
- HDUs have an extension type of HDRLET and their EXTNAME is derived from the
- Headerlet's HDRNAME.
-
- The data itself is a FITS file embedded within the HDU data. The file name
- is derived from the HDRNAME keyword, and should be in the form
- `<HDRNAME>_hdr.fits`. If the COMPRESS keyword evaluates to `True`, the tar
- file is compressed with gzip compression.
-
- The structure of this HDU is the same as that proposed for the 'FITS'
- extension type proposed here:
- http://listmgr.cv.nrao.edu/pipermail/fitsbits/2002-April/thread.html
-
- The Headerlet contained in the HDU's data can be accessed by the
- `headerlet` attribute.
- """
-
- _extension = 'HDRLET'
-
- @lazyproperty
- def headerlet(self):
- """Return the encapsulated headerlet as a Headerlet object.
-
- This is similar to the hdulist property inherited from the FitsHDU
- class, though the hdulist property returns a normal HDUList object.
- """
-
- return Headerlet(self.hdulist)
-
- @classmethod
- def fromheaderlet(cls, headerlet, compress=False):
- """
- Creates a new HeaderletHDU from a given Headerlet object.
-
- Parameters
- ----------
- headerlet : `Headerlet`
- A valid Headerlet object.
-
- compress : bool, optional
- Gzip compress the headerlet data.
-
- Returns
- -------
- hlet : `HeaderletHDU`
- A `HeaderletHDU` object for the given `Headerlet` that can be
- attached as an extension to an existing `HDUList`.
- """
-
- # TODO: Perhaps check that the given object is in fact a valid
- # Headerlet
- hlet = cls.fromhdulist(headerlet, compress)
-
- # Add some more headerlet-specific keywords to the header
- phdu = headerlet[0]
-
- if 'SIPNAME' in phdu.header:
- sipname = phdu.header['SIPNAME']
- else:
- sipname = phdu.header['WCSNAME']
-
- hlet.header['HDRNAME'] = (phdu.header['HDRNAME'],
- phdu.header.comments['HDRNAME'])
- hlet.header['DATE'] = (phdu.header['DATE'],
- phdu.header.comments['DATE'])
- hlet.header['SIPNAME'] = (sipname, 'SIP distortion model name')
- hlet.header['WCSNAME'] = (phdu.header['WCSNAME'], 'WCS name')
- hlet.header['DISTNAME'] = (phdu.header['DISTNAME'],
- 'Distortion model name')
- hlet.header['NPOLFILE'] = (phdu.header['NPOLFILE'],
- phdu.header.comments['NPOLFILE'])
- hlet.header['D2IMFILE'] = (phdu.header['D2IMFILE'],
- phdu.header.comments['D2IMFILE'])
- hlet.header['EXTNAME'] = (cls._extension, 'Extension name')
-
- return hlet
-
-
-fits.register_hdu(HeaderletHDU)
diff --git a/lib/stwcs/wcsutil/hstwcs.py b/lib/stwcs/wcsutil/hstwcs.py
deleted file mode 100644
index bfebcfc..0000000
--- a/lib/stwcs/wcsutil/hstwcs.py
+++ /dev/null
@@ -1,988 +0,0 @@
-from __future__ import absolute_import, division, print_function # confidence high
-
-import os
-from astropy.wcs import WCS
-from astropy.io import fits
-from stwcs.distortion import models, coeff_converter
-import numpy as np
-from stsci.tools import fileutil
-
-from . import altwcs
-from . import getinput
-from . import mappings
-from . import instruments
-from .mappings import inst_mappings, ins_spec_kw
-from .mappings import basic_wcs
-
-__docformat__ = 'restructuredtext'
-
-#
-#### Utility functions copied from 'updatewcs.utils' to avoid circular imports
-#
-def extract_rootname(kwvalue,suffix=""):
- """ Returns the rootname from a full reference filename
-
- If a non-valid value (any of ['','N/A','NONE','INDEF',None]) is input,
- simply return a string value of 'NONE'
-
- This function will also replace any 'suffix' specified with a blank.
- """
- # check to see whether a valid kwvalue has been provided as input
- if kwvalue.strip() in ['','N/A','NONE','INDEF',None]:
- return 'NONE' # no valid value, so return 'NONE'
-
- # for a valid kwvalue, parse out the rootname
- # strip off any environment variable from input filename, if any are given
- if '$' in kwvalue:
- fullval = kwvalue[kwvalue.find('$')+1:]
- else:
- fullval = kwvalue
- # Extract filename without path from kwvalue
- fname = os.path.basename(fullval).strip()
-
- # Now, rip out just the rootname from the full filename
- rootname = fileutil.buildNewRootname(fname)
-
- # Now, remove any known suffix from rootname
- rootname = rootname.replace(suffix,'')
- return rootname.strip()
-
-def build_default_wcsname(idctab):
-
- idcname = extract_rootname(idctab,suffix='_idc')
- wcsname = 'IDC_' + idcname
- return wcsname
-
-
-class NoConvergence(Exception):
- """
- An error class used to report non-convergence and/or divergence of
- numerical methods. It is used to report errors in the iterative solution
- used by the :py:meth:`~stwcs.hstwcs.HSTWCS.all_world2pix`\ .
-
- Attributes
- ----------
-
- best_solution : numpy.array
- Best solution achieved by the method.
-
- accuracy : float
- Accuracy of the :py:attr:`best_solution`\ .
-
- niter : int
- Number of iterations performed by the numerical method to compute
- :py:attr:`best_solution`\ .
-
- divergent : None, numpy.array
- Indices of the points in :py:attr:`best_solution` array for which the
- solution appears to be divergent. If the solution does not diverge,
- `divergent` will be set to `None`.
-
- failed2converge : None, numpy.array
- Indices of the points in :py:attr:`best_solution` array for which the
- solution failed to converge within the specified maximum number
- of iterations. If there are no non-converging poits (i.e., if
- the required accuracy has been achieved for all points) then
- `failed2converge` will be set to `None`.
-
- """
- def __init__(self, *args, **kwargs):
- super(NoConvergence, self).__init__(*args)
-
- self.best_solution = kwargs.pop('best_solution', None)
- self.accuracy = kwargs.pop('accuracy', None)
- self.niter = kwargs.pop('niter', None)
- self.divergent = kwargs.pop('divergent', None)
- self.failed2converge= kwargs.pop('failed2converge', None)
-
-
-#
-#### HSTWCS Class definition
-#
-class HSTWCS(WCS):
-
- def __init__(self, fobj=None, ext=None, minerr=0.0, wcskey=" "):
- """
- Create a WCS object based on the instrument.
-
- In addition to basic WCS keywords this class provides
- instrument specific information needed in distortion computation.
-
- Parameters
- ----------
- fobj : str or `astropy.io.fits.HDUList` object or None
- file name, e.g j9irw4b1q_flt.fits
- fully qualified filename[EXTNAME,EXTNUM], e.g. j9irw4b1q_flt.fits[sci,1]
- `astropy.io.fits` file object, e.g fits.open('j9irw4b1q_flt.fits'), in which case the
- user is responsible for closing the file object.
- ext : int, tuple or None
- extension number
- if ext is tuple, it must be ("EXTNAME", EXTNUM), e.g. ("SCI", 2)
- if ext is None, it is assumed the data is in the primary hdu
- minerr : float
- minimum value a distortion correction must have in order to be applied.
- If CPERRja, CQERRja are smaller than minerr, the corersponding
- distortion is not applied.
- wcskey : str
- A one character A-Z or " " used to retrieve and define an
- alternate WCS description.
- """
-
- self.inst_kw = ins_spec_kw
- self.minerr = minerr
- self.wcskey = wcskey
-
- if fobj is not None:
- filename, hdr0, ehdr, phdu = getinput.parseSingleInput(f=fobj,
- ext=ext)
- self.filename = filename
- instrument_name = hdr0.get('INSTRUME', 'DEFAULT')
- if instrument_name == 'DEFAULT' or instrument_name not in list(inst_mappings.keys()):
- #['IRAF/ARTDATA','',' ','N/A']:
- self.instrument = 'DEFAULT'
- else:
- self.instrument = instrument_name
- # Set the correct reference frame
- refframe = determine_refframe(hdr0)
- ehdr['RADESYS'] = refframe
-
- WCS.__init__(self, ehdr, fobj=phdu, minerr=self.minerr,
- key=self.wcskey)
- if self.instrument == 'DEFAULT':
- self.pc2cd()
- # If input was a `astropy.io.fits.HDUList` object, it's the user's
- # responsibility to close it, otherwise, it's closed here.
- if not isinstance(fobj, fits.HDUList):
- phdu.close()
- self.setInstrSpecKw(hdr0, ehdr)
- self.readIDCCoeffs(ehdr)
- extname = ehdr.get('EXTNAME', '')
- extnum = ehdr.get('EXTVER', None)
- self.extname = (extname, extnum)
- else:
- # create a default HSTWCS object
- self.instrument = 'DEFAULT'
- WCS.__init__(self, minerr=self.minerr, key=self.wcskey)
- self.pc2cd()
- self.setInstrSpecKw()
- self.setPscale()
- self.setOrient()
-
- @property
- def naxis1(self):
- return self._naxis1
-
- @naxis1.setter
- def naxis1(self, value):
- self._naxis1 = value
-
- @property
- def naxis2(self):
- return self._naxis2
-
- @naxis2.setter
- def naxis2(self, value):
- self._naxis2 = value
-
- def readIDCCoeffs(self, header):
- """
- Reads in first order IDCTAB coefficients if present in the header
- """
- coeffs = ['ocx10', 'ocx11', 'ocy10', 'ocy11', 'idcscale',
- 'idcv2ref','idcv3ref', 'idctheta']
- for c in coeffs:
- self.__setattr__(c, header.get(c, None))
-
- def setInstrSpecKw(self, prim_hdr=None, ext_hdr=None):
- """
- Populate the instrument specific attributes:
-
- These can be in different headers but each instrument class has knowledge
- of where to look for them.
-
- Parameters
- ----------
- prim_hdr : `astropy.io.fits.Header`
- primary header
- ext_hdr : `astropy.io.fits.Header`
- extension header
-
- """
- if self.instrument in list(inst_mappings.keys()):
- inst_kl = inst_mappings[self.instrument]
- inst_kl = instruments.__dict__[inst_kl]
- insobj = inst_kl(prim_hdr, ext_hdr)
-
- for key in self.inst_kw:
- try:
- self.__setattr__(key, insobj.__getattribute__(key))
- except AttributeError:
- # Some of the instrument's attributes are recorded in the primary header and
- # were already set, (e.g. 'DETECTOR'), the code below is a check for that case.
- if not self.__getattribute__(key):
- raise
- else:
- pass
-
- else:
- raise KeyError("Unsupported instrument - %s" %self.instrument)
-
- def setPscale(self):
- """
- Calculates the plate scale from the CD matrix
- """
- try:
- cd11 = self.wcs.cd[0][0]
- cd21 = self.wcs.cd[1][0]
- self.pscale = np.sqrt(np.power(cd11,2)+np.power(cd21,2)) * 3600.
- except AttributeError:
- if self.wcs.has_cd():
- print("This file has a PC matrix. You may want to convert it \n \
- to a CD matrix, if reasonable, by running pc2.cd() method.\n \
- The plate scale can be set then by calling setPscale() method.\n")
- self.pscale = None
-
- def setOrient(self):
- """
- Computes ORIENTAT from the CD matrix
- """
- try:
- cd12 = self.wcs.cd[0][1]
- cd22 = self.wcs.cd[1][1]
- self.orientat = np.rad2deg(np.arctan2(cd12,cd22))
- except AttributeError:
- if self.wcs.has_cd():
- print("This file has a PC matrix. You may want to convert it \n \
- to a CD matrix, if reasonable, by running pc2.cd() method.\n \
- The orientation can be set then by calling setOrient() method.\n")
- self.pscale = None
-
- def updatePscale(self, scale):
- """
- Updates the CD matrix with a new plate scale
- """
- self.wcs.cd = self.wcs.cd/self.pscale*scale
- self.setPscale()
-
- def readModel(self, update=False, header=None):
- """
- Reads distortion model from IDCTAB.
-
- If IDCTAB is not found ('N/A', "", or not found on disk), then
- if SIP coefficients and first order IDCTAB coefficients are present
- in the header, restore the idcmodel from the header.
- If not - assign None to self.idcmodel.
-
- Parameters
- ----------
- header : `astropy.io.fits.Header`
- fits extension header
- update : bool (False)
- if True - record the following IDCTAB quantities as header keywords:
- CX10, CX11, CY10, CY11, IDCSCALE, IDCTHETA, IDCXREF, IDCYREF,
- IDCV2REF, IDCV3REF
- """
- if self.idctab in [None, '', ' ','N/A']:
- #Keyword idctab is not present in header - check for sip coefficients
- if header is not None and 'IDCSCALE' in header:
- self._readModelFromHeader(header)
- else:
- print("Distortion model is not available: IDCTAB=None\n")
- self.idcmodel = None
- elif not os.path.exists(fileutil.osfn(self.idctab)):
- if header is not None and 'IDCSCALE' in header:
- self._readModelFromHeader(header)
- else:
- print('Distortion model is not available: IDCTAB file %s not found\n' % self.idctab)
- self.idcmodel = None
- else:
- self.readModelFromIDCTAB(header=header, update=update)
-
- def _readModelFromHeader(self, header):
- # Recreate idc model from SIP coefficients and header kw
- print('Restoring IDC model from SIP coefficients\n')
- model = models.GeometryModel()
- cx, cy = coeff_converter.sip2idc(self)
- model.cx = cx
- model.cy = cy
- model.name = "sip"
- model.norder = header['A_ORDER']
-
- refpix = {}
- refpix['XREF'] = header['IDCXREF']
- refpix['YREF'] = header['IDCYREF']
- refpix['PSCALE'] = header['IDCSCALE']
- refpix['V2REF'] = header['IDCV2REF']
- refpix['V3REF'] = header['IDCV3REF']
- refpix['THETA'] = header['IDCTHETA']
- model.refpix = refpix
-
- self.idcmodel = model
-
-
- def readModelFromIDCTAB(self, header=None, update=False):
- """
- Read distortion model from idc table.
-
- Parameters
- ----------
- header : `astropy.io.fits.Header`
- fits extension header
- update : booln (False)
- if True - save teh following as header keywords:
- CX10, CX11, CY10, CY11, IDCSCALE, IDCTHETA, IDCXREF, IDCYREF,
- IDCV2REF, IDCV3REF
-
- """
- if self.date_obs == None:
- print('date_obs not available\n')
- self.idcmodel = None
- return
- if self.filter1 == None and self.filter2 == None:
- 'No filter information available\n'
- self.idcmodel = None
- return
-
- self.idcmodel = models.IDCModel(self.idctab,
- chip=self.chip, direction='forward', date=self.date_obs,
- filter1=self.filter1, filter2=self.filter2,
- offtab=self.offtab, binned=self.binned)
-
- if self.ltv1 != 0. or self.ltv2 != 0.:
- self.resetLTV()
-
- if update:
- if header==None:
- print('Update header with IDC model kw requested but header was not provided\n.')
- else:
- self._updatehdr(header)
-
- def resetLTV(self):
- """
- Reset LTV values for polarizer data
-
- The polarizer field is smaller than the detector field.
- The distortion coefficients are defined for the entire
- polarizer field and the LTV values are set as with subarray
- data. This may also be true for other special filters.
- This is a special case when the observation is considered
- a subarray in terms of detector field but a full frame in
- terms of distortion model.
- To avoid shifting the distortion coefficients the LTV values
- are reset to 0.
- """
- if self.naxis1 == self.idcmodel.refpix['XSIZE'] and \
- self.naxis2 == self.idcmodel.refpix['YSIZE']:
- self.ltv1 = 0.
- self.ltv2 = 0.
-
- def wcs2header(self, sip2hdr=False, idc2hdr=True, wcskey=None, relax=False):
- """
- Create a `astropy.io.fits.Header` object from WCS keywords.
-
- If the original header had a CD matrix, return a CD matrix,
- otherwise return a PC matrix.
-
- Parameters
- ----------
- sip2hdr : bool
- If True - include SIP coefficients
- """
-
- h = self.to_header(key=wcskey, relax=relax)
- if not wcskey:
- wcskey = self.wcs.alt
- if self.wcs.has_cd():
- h = altwcs.pc2cd(h, key=wcskey)
-
- if 'wcsname' not in h:
- if self.idctab is not None:
- wname = build_default_wcsname(self.idctab)
- else:
- wname = 'DEFAULT'
- h['wcsname{0}'.format(wcskey)] = wname
-
- if idc2hdr:
- for card in self._idc2hdr():
- h[card.keyword + wcskey] = (card.value, card.comment)
- try:
- del h['RESTFRQ']
- del h['RESTWAV']
- except KeyError: pass
-
- if sip2hdr and self.sip:
- for card in self._sip2hdr('a'):
- h[card.keyword] = (card.value, card.comment)
- for card in self._sip2hdr('b'):
- h[card.keyword] = (card.value, card.comment)
-
- try:
- ap = self.sip.ap
- except AssertionError:
- ap = None
- try:
- bp = self.sip.bp
- except AssertionError:
- bp = None
-
- if ap:
- for card in self._sip2hdr('ap'):
- h[card.keyword] = (card.value, card.comment)
- if bp:
- for card in self._sip2hdr('bp'):
- h[card.keyword] = (card.value, card.comment)
- return h
-
- def _sip2hdr(self, k):
- """
- Get a set of SIP coefficients in the form of an array
- and turn them into a `astropy.io.fits.Cardlist`.
- k - one of 'a', 'b', 'ap', 'bp'
- """
-
- cards = [] #fits.CardList()
- korder = self.sip.__getattribute__(k+'_order')
- cards.append(fits.Card(keyword=k.upper()+'_ORDER', value=korder))
- coeffs = self.sip.__getattribute__(k)
- ind = coeffs.nonzero()
- for i in range(len(ind[0])):
- card = fits.Card(keyword=k.upper()+'_'+str(ind[0][i])+'_'+str(ind[1][i]),
- value=coeffs[ind[0][i], ind[1][i]])
- cards.append(card)
- return cards
-
- def _idc2hdr(self):
- # save some of the idc coefficients
- coeffs = ['ocx10', 'ocx11', 'ocy10', 'ocy11', 'idcscale']
- cards = [] #fits.CardList()
- for c in coeffs:
- try:
- val = self.__getattribute__(c)
- except AttributeError:
- continue
- if val:
- cards.append(fits.Card(keyword=c, value=val))
- return cards
-
- def pc2cd(self):
- if not self.wcs.has_pc():
- self.wcs.pc = self.wcs.get_pc()
- self.wcs.cd = self.wcs.pc * self.wcs.cdelt[1]
-
- def all_world2pix(self, *args, **kwargs):
- """
- all_world2pix(*arg, accuracy=1.0e-4, maxiter=20, adaptive=False, \
-detect_divergence=True, quiet=False)
-
- Performs full inverse transformation using iterative solution
- on full forward transformation with complete distortion model.
-
- Parameters
- ----------
- accuracy : float, optional (Default = 1.0e-4)
- Required accuracy of the solution. Iteration terminates when the
- correction to the solution found during the previous iteration
- is smaller (in the sence of the L2 norm) than `accuracy`\ .
-
- maxiter : int, optional (Default = 20)
- Maximum number of iterations allowed to reach the solution.
-
- adaptive : bool, optional (Default = False)
- Specifies whether to adaptively select only points that did not
- converge to a solution whithin the required accuracy for the
- next iteration. Default is recommended for HST as well as most
- other instruments.
-
- .. note::
- The :py:meth:`all_world2pix` uses a vectorized implementation
- of the method of consecutive approximations (see `Notes`
- section below) in which it iterates over *all* input poits
- *regardless* until the required accuracy has been reached for
- *all* input points. In some cases it may be possible that
- *almost all* points have reached the required accuracy but
- there are only a few of input data points left for which
- additional iterations may be needed (this depends mostly on the
- characteristics of the geometric distortions for a given
- instrument). In this situation it may be
- advantageous to set `adaptive` = `True`\ in which
- case :py:meth:`all_world2pix` will continue iterating *only* over
- the points that have not yet converged to the required
- accuracy. However, for the HST's ACS/WFC detector, which has
- the strongest distortions of all HST instruments, testing has
- shown that enabling this option would lead to a about 10-30\%
- penalty in computational time (depending on specifics of the
- image, geometric distortions, and number of input points to be
- converted). Therefore, for HST instruments,
- it is recommended to set `adaptive` = `False`\ . The only
- danger in getting this setting wrong will be a performance
- penalty.
-
- .. note::
- When `detect_divergence` is `True`\ , :py:meth:`all_world2pix` \
- will automatically switch to the adaptive algorithm once
- divergence has been detected.
-
- detect_divergence : bool, optional (Default = True)
- Specifies whether to perform a more detailed analysis of the
- convergence to a solution. Normally :py:meth:`all_world2pix`
- may not achieve the required accuracy
- if either the `tolerance` or `maxiter` arguments are too low.
- However, it may happen that for some geometric distortions
- the conditions of convergence for the the method of consecutive
- approximations used by :py:meth:`all_world2pix` may not be
- satisfied, in which case consecutive approximations to the
- solution will diverge regardless of the `tolerance` or `maxiter`
- settings.
-
- When `detect_divergence` is `False`\ , these divergent points
- will be detected as not having achieved the required accuracy
- (without further details). In addition, if `adaptive` is `False`
- then the algorithm will not know that the solution (for specific
- points) is diverging and will continue iterating and trying to
- "improve" diverging solutions. This may result in NaN or Inf
- values in the return results (in addition to a performance
- penalties). Even when `detect_divergence` is
- `False`\ , :py:meth:`all_world2pix`\ , at the end of the iterative
- process, will identify invalid results (NaN or Inf) as "diverging"
- solutions and will raise :py:class:`NoConvergence` unless
- the `quiet` parameter is set to `True`\ .
-
- When `detect_divergence` is `True`\ , :py:meth:`all_world2pix` will
- detect points for
- which current correction to the coordinates is larger than
- the correction applied during the previous iteration **if** the
- requested accuracy **has not yet been achieved**\ . In this case,
- if `adaptive` is `True`, these points will be excluded from
- further iterations and if `adaptive`
- is `False`\ , :py:meth:`all_world2pix` will automatically
- switch to the adaptive algorithm.
-
- .. note::
- When accuracy has been achieved, small increases in
- current corrections may be possible due to rounding errors
- (when `adaptive` is `False`\ ) and such increases
- will be ignored.
-
- .. note::
- Setting `detect_divergence` to `True` will incurr about 5-10\%
- performance penalty (in our testing on ACS/WFC images).
- Because the benefits of enabling this feature outweigh
- the small performance penalty, it is recommended to set
- `detect_divergence` to `True`\ , unless extensive testing
- of the distortion models for images from specific
- instruments show a good stability of the numerical method
- for a wide range of coordinates (even outside the image
- itself).
-
- .. note::
- Indices of the diverging inverse solutions will be reported
- in the `divergent` attribute of the
- raised :py:class:`NoConvergence` object.
-
- quiet : bool, optional (Default = False)
- Do not throw :py:class:`NoConvergence` exceptions when the method
- does not converge to a solution with the required accuracy
- within a specified number of maximum iterations set by `maxiter`
- parameter. Instead, simply return the found solution.
-
- Raises
- ------
- NoConvergence
- The method does not converge to a
- solution with the required accuracy within a specified number
- of maximum iterations set by the `maxiter` parameter.
-
- Notes
- -----
- Inputs can either be (RA, Dec, origin) or (RADec, origin) where RA
- and Dec are 1-D arrays/lists of coordinates and RADec is an
- array/list of pairs of coordinates.
-
- Using the method of consecutive approximations we iterate starting
- with the initial approximation, which is computed using the
- non-distorion-aware :py:meth:`wcs_world2pix` (or equivalent).
-
- The :py:meth:`all_world2pix` function uses a vectorized implementation
- of the method of consecutive approximations and therefore it is
- highly efficient (>30x) when *all* data points that need to be
- converted from sky coordinates to image coordinates are passed at
- *once*\ . Therefore, it is advisable, whenever possible, to pass
- as input a long array of all points that need to be converted
- to :py:meth:`all_world2pix` instead of calling :py:meth:`all_world2pix`
- for each data point. Also see the note to the `adaptive` parameter.
-
- Examples
- --------
- >>> import stwcs
- >>> from astropy.io import fits
- >>> hdulist = fits.open('j94f05bgq_flt.fits')
- >>> w = stwcs.wcsutil.HSTWCS(hdulist, ext=('sci',1))
- >>> hdulist.close()
-
- >>> ra, dec = w.all_pix2world([1,2,3],[1,1,1],1); print(ra); print(dec)
- [ 5.52645241 5.52649277 5.52653313]
- [-72.05171776 -72.05171295 -72.05170814]
- >>> radec = w.all_pix2world([[1,1],[2,1],[3,1]],1); print(radec)
- [[ 5.52645241 -72.05171776]
- [ 5.52649277 -72.05171295]
- [ 5.52653313 -72.05170814]]
- >>> x, y = w.all_world2pix(ra,dec,1)
- >>> print(x)
- [ 1.00000233 2.00000232 3.00000233]
- >>> print(y)
- [ 0.99999997 0.99999997 0.99999998]
- >>> xy = w.all_world2pix(radec,1)
- >>> print(xy)
- [[ 1.00000233 0.99999997]
- [ 2.00000232 0.99999997]
- [ 3.00000233 0.99999998]]
- >>> xy = w.all_world2pix(radec,1, maxiter=3, accuracy=1.0e-10, \
-quiet=False)
- NoConvergence: 'HSTWCS.all_world2pix' failed to converge to requested \
-accuracy after 3 iterations.
-
- >>>
- Now try to use some diverging data:
- >>> divradec = w.all_pix2world([[1.0,1.0],[10000.0,50000.0],\
-[3.0,1.0]],1); print(divradec)
- [[ 5.52645241 -72.05171776]
- [ 7.15979392 -70.81405561]
- [ 5.52653313 -72.05170814]]
-
- >>> try:
- >>> xy = w.all_world2pix(divradec,1, maxiter=20, accuracy=1.0e-4, \
-adaptive=False, detect_divergence=True, quiet=False)
- >>> except stwcs.wcsutil.hstwcs.NoConvergence as e:
- >>> print("Indices of diverging points: {}".format(e.divergent))
- >>> print("Indices of poorly converging points: {}".format(e.failed2converge))
- >>> print("Best solution: {}".format(e.best_solution))
- >>> print("Achieved accuracy: {}".format(e.accuracy))
- >>> raise e
- Indices of diverging points:
- [1]
- Indices of poorly converging points:
- None
- Best solution:
- [[ 1.00006219e+00 9.99999288e-01]
- [ -1.99440907e+06 1.44308548e+06]
- [ 3.00006257e+00 9.99999316e-01]]
- Achieved accuracy:
- [[ 5.98554253e-05 6.79918148e-07]
- [ 8.59514088e+11 6.61703754e+11]
- [ 6.02334592e-05 6.59713067e-07]]
- Traceback (innermost last):
- File "<console>", line 8, in <module>
- NoConvergence: 'HSTWCS.all_world2pix' failed to converge to the requested accuracy.
- After 5 iterations, the solution is diverging at least for one input point.
-
- >>> try:
- >>> xy = w.all_world2pix(divradec,1, maxiter=20, accuracy=1.0e-4, \
-adaptive=False, detect_divergence=False, quiet=False)
- >>> except stwcs.wcsutil.hstwcs.NoConvergence as e:
- >>> print("Indices of diverging points: {}".format(e.divergent))
- >>> print("Indices of poorly converging points: {}".format(e.failed2converge))
- >>> print("Best solution: {}".format(e.best_solution))
- >>> print("Achieved accuracy: {}".format(e.accuracy))
- >>> raise e
- Indices of diverging points:
- [1]
- Indices of poorly converging points:
- None
- Best solution:
- [[ 1. 1.]
- [ nan nan]
- [ 3. 1.]]
- Achieved accuracy:
- [[ 0. 0.]
- [ nan nan]
- [ 0. 0.]]
- Traceback (innermost last):
- File "<console>", line 8, in <module>
- NoConvergence: 'HSTWCS.all_world2pix' failed to converge to the requested accuracy.
- After 20 iterations, the solution is diverging at least for one input point.
-
- """
- #####################################################################
- ## PROCESS ARGUMENTS: ##
- #####################################################################
- nargs = len(args)
-
- if nargs == 3:
- try:
- ra = np.asarray(args[0], dtype=np.float64)
- dec = np.asarray(args[1], dtype=np.float64)
- #assert( len(ra.shape) == 1 and len(dec.shape) == 1 )
- origin = int(args[2])
- vect1D = True
- except:
- raise TypeError("When providing three arguments, they must " \
- "be (Ra, Dec, origin) where Ra and Dec are " \
- "Nx1 vectors.")
- elif nargs == 2:
- try:
- rd = np.asarray(args[0], dtype=np.float64)
- #assert( rd.shape[1] == 2 )
- ra = rd[:,0]
- dec = rd[:,1]
- origin = int(args[1])
- vect1D = False
- except:
- raise TypeError("When providing two arguments, they must " \
- "be (RaDec, origin) where RaDec is a Nx2 array.")
- else:
- raise TypeError("Expected 2 or 3 arguments, {:d} given." \
- .format(nargs))
-
- # process optional arguments:
- accuracy = kwargs.pop('accuracy', 1.0e-4)
- maxiter = kwargs.pop('maxiter', 20)
- adaptive = kwargs.pop('adaptive', False)
- detect_divergence = kwargs.pop('detect_divergence', True)
- quiet = kwargs.pop('quiet', False)
-
- #####################################################################
- ## INITIALIZE ITERATIVE PROCESS: ##
- #####################################################################
- x0, y0 = self.wcs_world2pix(ra, dec, origin) # <-- initial approximation
- # (WCS based only)
-
- # see if an iterative solution is required (when any of the
- # non-CD-matrix corrections are present). If not required
- # return initial approximation (x0, y0).
- if self.sip is None and \
- self.cpdis1 is None and self.cpdis2 is None and \
- self.det2im1 is None and self.det2im2 is None:
- # no non-WCS corrections are detected - return
- # initial approximation
- if vect1D:
- return [x0, y0]
- else:
- return np.dstack([x0,y0])[0]
-
- x = x0.copy() # 0-order solution
- y = y0.copy() # 0-order solution
-
- # initial correction:
- dx, dy = self.pix2foc(x, y, origin)
- # If pix2foc does not apply all the required distortion
- # corrections then replace the above line with:
- #r0, d0 = self.all_pix2world(x, y, origin)
- #dx, dy = self.wcs_world2pix(r0, d0, origin )
- dx -= x0
- dy -= y0
-
- # update initial solution:
- x -= dx
- y -= dy
-
- # norn (L2) squared of the correction:
- dn2prev = dx**2+dy**2
- dn2 = dn2prev
-
- # prepare for iterative process
- iterlist = list(range(1, maxiter+1))
- accuracy2 = accuracy**2
- ind = None
- inddiv = None
-
- npts = x.shape[0]
-
- # turn off numpy runtime warnings for 'invalid' and 'over':
- old_invalid = np.geterr()['invalid']
- old_over = np.geterr()['over']
- np.seterr(invalid = 'ignore', over = 'ignore')
-
- #####################################################################
- ## NON-ADAPTIVE ITERATIONS: ##
- #####################################################################
- if not adaptive:
- for k in iterlist:
- # check convergence:
- if np.max(dn2) < accuracy2:
- break
-
- # find correction to the previous solution:
- dx, dy = self.pix2foc(x, y, origin)
- # If pix2foc does not apply all the required distortion
- # corrections then replace the above line with:
- #r0, d0 = self.all_pix2world(x, y, origin)
- #dx, dy = self.wcs_world2pix(r0, d0, origin )
- dx -= x0
- dy -= y0
-
- # update norn (L2) squared of the correction:
- dn2 = dx**2+dy**2
-
- # check for divergence (we do this in two stages
- # to optimize performance for the most common
- # scenario when succesive approximations converge):
- if detect_divergence:
- ind, = np.where(dn2 <= dn2prev)
- if ind.shape[0] < npts:
- inddiv, = np.where(
- np.logical_and(dn2 > dn2prev, dn2 >= accuracy2))
- if inddiv.shape[0] > 0:
- # apply correction only to the converging points:
- x[ind] -= dx[ind]
- y[ind] -= dy[ind]
- # switch to adaptive iterations:
- ind, = np.where((dn2 >= accuracy2) & \
- (dn2 <= dn2prev) & np.isfinite(dn2))
- iterlist = iterlist[k:]
- adaptive = True
- break
- #dn2prev[ind] = dn2[ind]
- dn2prev = dn2
-
- # apply correction:
- x -= dx
- y -= dy
-
- #####################################################################
- ## ADAPTIVE ITERATIONS: ##
- #####################################################################
- if adaptive:
- if ind is None:
- ind = np.asarray(list(range(npts)), dtype=np.int64)
-
- for k in iterlist:
- # check convergence:
- if ind.shape[0] == 0:
- break
-
- # find correction to the previous solution:
- dx[ind], dy[ind] = self.pix2foc(x[ind], y[ind], origin)
- # If pix2foc does not apply all the required distortion
- # corrections then replace the above line with:
- #r0[ind], d0[ind] = self.all_pix2world(x[ind], y[ind], origin)
- #dx[ind], dy[ind] = self.wcs_world2pix(r0[ind], d0[ind], origin)
- dx[ind] -= x0[ind]
- dy[ind] -= y0[ind]
-
- # update norn (L2) squared of the correction:
- dn2 = dx**2+dy**2
-
- # update indices of elements that still need correction:
- if detect_divergence:
- ind, = np.where((dn2 >= accuracy2) & (dn2 <= dn2prev))
- #ind = ind[np.where((dn2[ind] >= accuracy2) & (dn2[ind] <= dn2prev))]
- dn2prev[ind] = dn2[ind]
- else:
- ind, = np.where(dn2 >= accuracy2)
- #ind = ind[np.where(dn2[ind] >= accuracy2)]
-
- # apply correction:
- x[ind] -= dx[ind]
- y[ind] -= dy[ind]
-
- #####################################################################
- ## FINAL DETECTION OF INVALID, DIVERGING, ##
- ## AND FAILED-TO-CONVERGE POINTS ##
- #####################################################################
- # Identify diverging and/or invalid points:
- invalid = (((~np.isfinite(y)) | (~np.isfinite(x)) | \
- (~np.isfinite(dn2))) & \
- (np.isfinite(ra)) & (np.isfinite(dec)))
- # When detect_divergence==False, dn2prev is outdated (it is the
- # norm^2 of the very first correction). Still better than nothing...
- inddiv, = np.where(((dn2 >= accuracy2) & (dn2 > dn2prev)) | invalid)
- if inddiv.shape[0] == 0:
- inddiv = None
- # identify points that did not converge within
- # 'maxiter' iterations:
- if k >= maxiter:
- ind,= np.where((dn2 >= accuracy2) & (dn2 <= dn2prev) & (~invalid))
- if ind.shape[0] == 0:
- ind = None
- else:
- ind = None
-
- #####################################################################
- ## RAISE EXCEPTION IF DIVERGING OR TOO SLOWLY CONVERGING ##
- ## DATA POINTS HAVE BEEN DETECTED: ##
- #####################################################################
- # raise exception if diverging or too slowly converging
- if (ind is not None or inddiv is not None) and not quiet:
- if vect1D:
- sol = [x, y]
- err = [np.abs(dx), np.abs(dy)]
- else:
- sol = np.dstack( [x, y] )[0]
- err = np.dstack( [np.abs(dx), np.abs(dy)] )[0]
-
- # restore previous numpy error settings:
- np.seterr(invalid = old_invalid, over = old_over)
-
- if inddiv is None:
- raise NoConvergence("'HSTWCS.all_world2pix' failed to " \
- "converge to the requested accuracy after {:d} " \
- "iterations.".format(k), best_solution = sol, \
- accuracy = err, niter = k, failed2converge = ind, \
- divergent = None)
- else:
- raise NoConvergence("'HSTWCS.all_world2pix' failed to " \
- "converge to the requested accuracy.{0:s}" \
- "After {1:d} iterations, the solution is diverging " \
- "at least for one input point." \
- .format(os.linesep, k), best_solution = sol, \
- accuracy = err, niter = k, failed2converge = ind, \
- divergent = inddiv)
-
- #####################################################################
- ## FINALIZE AND FORMAT DATA FOR RETURN: ##
- #####################################################################
- # restore previous numpy error settings:
- np.seterr(invalid = old_invalid, over = old_over)
-
- if vect1D:
- return [x, y]
- else:
- return np.dstack( [x, y] )[0]
-
- def _updatehdr(self, ext_hdr):
- #kw2add : OCX10, OCX11, OCY10, OCY11
- # record the model in the header for use by pydrizzle
- ext_hdr['OCX10'] = self.idcmodel.cx[1,0]
- ext_hdr['OCX11'] = self.idcmodel.cx[1,1]
- ext_hdr['OCY10'] = self.idcmodel.cy[1,0]
- ext_hdr['OCY11'] = self.idcmodel.cy[1,1]
- ext_hdr['IDCSCALE'] = self.idcmodel.refpix['PSCALE']
- ext_hdr['IDCTHETA'] = self.idcmodel.refpix['THETA']
- ext_hdr['IDCXREF'] = self.idcmodel.refpix['XREF']
- ext_hdr['IDCYREF'] = self.idcmodel.refpix['YREF']
- ext_hdr['IDCV2REF'] = self.idcmodel.refpix['V2REF']
- ext_hdr['IDCV3REF'] = self.idcmodel.refpix['V3REF']
-
- def printwcs(self):
- """
- Print the basic WCS keywords.
- """
- print('WCS Keywords\n')
- print('CD_11 CD_12: %r %r' % (self.wcs.cd[0,0], self.wcs.cd[0,1]))
- print('CD_21 CD_22: %r %r' % (self.wcs.cd[1,0], self.wcs.cd[1,1]))
- print('CRVAL : %r %r' % (self.wcs.crval[0], self.wcs.crval[1]))
- print('CRPIX : %r %r' % (self.wcs.crpix[0], self.wcs.crpix[1]))
- print('NAXIS : %d %d' % (self.naxis1, self.naxis2))
- print('Plate Scale : %r' % self.pscale)
- print('ORIENTAT : %r' % self.orientat)
-
-
-def determine_refframe(phdr):
- """
- Determine the reference frame in standard FITS WCS terms.
-
- Parameters
- ----------
- phdr : `astropy.io.fits.Header`
- Primary Header of an HST observation
-
- In HST images the reference frame is recorded in the primary extension as REFFRAME.
- Values are "GSC1" which means FK5 or ICRS (for GSC2 observations).
- """
- try:
- refframe = phdr['REFFRAME']
- except KeyError:
- refframe = " "
- if refframe == "GSC1":
- refframe = "FK5"
- return refframe
diff --git a/lib/stwcs/wcsutil/instruments.py b/lib/stwcs/wcsutil/instruments.py
deleted file mode 100644
index f662513..0000000
--- a/lib/stwcs/wcsutil/instruments.py
+++ /dev/null
@@ -1,320 +0,0 @@
-from __future__ import absolute_import, division, print_function # confidence high
-
-from .mappings import ins_spec_kw
-
-class InstrWCS(object):
- """
- A base class for instrument specific keyword definition.
- It prvides a default implementation (modeled by ACS) for
- all set_kw methods.
- """
- def __init__(self, hdr0=None, hdr=None):
- self.exthdr = hdr
- self.primhdr = hdr0
- self.set_ins_spec_kw()
-
- def set_ins_spec_kw(self):
- """
- This method MUST call all set_kw methods.
- There should be a set_kw method for all kw listed in
- mappings.ins_spec_kw. TypeError handles the case when
- fobj='DEFAULT'.
- """
- self.set_idctab()
- self.set_offtab()
- self.set_date_obs()
- self.set_ra_targ()
- self.set_dec_targ()
- self.set_pav3()
- self.set_detector()
- self.set_filter1()
- self.set_filter2()
- self.set_vafactor()
- self.set_naxis1()
- self.set_naxis2()
- self.set_ltv1()
- self.set_ltv2()
- self.set_binned()
- self.set_chip()
- self.set_parity()
-
- def set_idctab(self):
- try:
- self.idctab = self.primhdr['IDCTAB']
- except (KeyError, TypeError):
- self.idctab = None
-
- def set_offtab(self):
- try:
- self.offtab = self.primhdr['OFFTAB']
- except (KeyError, TypeError):
- self.offtab = None
-
- def set_date_obs(self):
- try:
- self.date_obs = self.primhdr['DATE-OBS']
- except (KeyError, TypeError):
- self.date_obs = None
-
- def set_ra_targ(self):
- try:
- self.ra_targ = self.primhdr['RA-TARG']
- except (KeyError, TypeError):
- self.ra_targ = None
-
- def set_dec_targ(self):
- try:
- self.dec_targ = self.primhdr['DEC-TARG']
- except (KeyError, TypeError):
- self.dec_targ = None
-
- def set_pav3(self):
- try:
- self.pav3 = self.primhdr['PA_V3']
- except (KeyError, TypeError):
- self.pav3 = None
-
- def set_filter1(self):
- try:
- self.filter1 = self.primhdr['FILTER1']
- except (KeyError, TypeError):
- self.filter1 = None
-
- def set_filter2(self):
- try:
- self.filter2 = self.primhdr['FILTER2']
- except (KeyError, TypeError):
- self.filter2 = None
-
- def set_vafactor(self):
- try:
- self.vafactor = self.exthdr['VAFACTOR']
- except (KeyError, TypeError):
- self.vafactor = 1
-
- def set_naxis1(self):
- try:
- self.naxis1 = self.exthdr['naxis1']
- except (KeyError, TypeError):
- try:
- self.naxis1 = self.exthdr['npix1']
- except (KeyError, TypeError):
- self.naxis1 = None
-
- def set_naxis2(self):
- try:
- self.naxis2 = self.exthdr['naxis2']
- except (KeyError, TypeError):
- try:
- self.naxis2 = self.exthdr['npix2']
- except (KeyError, TypeError):
- self.naxis2 = None
-
- def set_ltv1(self):
- try:
- self.ltv1 = self.exthdr['LTV1']
- except (KeyError, TypeError):
- self.ltv1 = 0.0
-
- def set_ltv2(self):
- try:
- self.ltv2 = self.exthdr['LTV2']
- except (KeyError, TypeError):
- self.ltv2 = 0.0
-
- def set_binned(self):
- try:
- self.binned = self.exthdr['BINAXIS1']
- except (KeyError, TypeError):
- self.binned = 1
-
- def set_chip(self):
- try:
- self.chip = self.exthdr['CCDCHIP']
- except (KeyError, TypeError):
- self.chip = 1
-
- def set_parity(self):
- self.parity = [[1.0,0.0],[0.0,-1.0]]
-
- def set_detector(self):
- # each instrument has a different kw for detector and it can be
- # in a different header, so this is to be handled by the instrument classes
- self.detector = 'DEFAULT'
-
-class ACSWCS(InstrWCS):
- """
- get instrument specific kw
- """
-
- def __init__(self, hdr0, hdr):
- self.primhdr = hdr0
- self.exthdr = hdr
- InstrWCS.__init__(self,hdr0, hdr)
- self.set_ins_spec_kw()
-
- def set_detector(self):
- try:
- self.detector = self.primhdr['DETECTOR']
- except KeyError:
- print('ERROR: Detector kw not found.\n')
- raise
-
- def set_parity(self):
- parity = {'WFC':[[1.0,0.0],[0.0,-1.0]],
- 'HRC':[[-1.0,0.0],[0.0,1.0]],
- 'SBC':[[-1.0,0.0],[0.0,1.0]]}
-
- if self.detector not in list(parity.keys()):
- parity = InstrWCS.set_parity(self)
- else:
- self.parity = parity[self.detector]
-
-
-class WFPC2WCS(InstrWCS):
-
-
- def __init__(self, hdr0, hdr):
- self.primhdr = hdr0
- self.exthdr = hdr
- InstrWCS.__init__(self,hdr0, hdr)
- self.set_ins_spec_kw()
-
- def set_filter1(self):
- self.filter1 = self.primhdr.get('FILTNAM1', None)
- if self.filter1 == " " or self.filter1 == None:
- self.filter1 = 'CLEAR1'
-
- def set_filter2(self):
- self.filter2 = self.primhdr.get('FILTNAM2', None)
- if self.filter2 == " " or self.filter2 == None:
- self.filter2 = 'CLEAR2'
-
-
- def set_binned(self):
- mode = self.primhdr.get('MODE', 1)
- if mode == 'FULL':
- self.binned = 1
- elif mode == 'AREA':
- self.binned = 2
-
- def set_chip(self):
- self.chip = self.exthdr.get('DETECTOR', 1)
-
- def set_parity(self):
- self.parity = [[-1.0,0.],[0.,1.0]]
-
- def set_detector(self):
- try:
- self.detector = self.exthdr['DETECTOR']
- except KeyError:
- print('ERROR: Detector kw not found.\n')
- raise
-
-
-class WFC3WCS(InstrWCS):
- """
- Create a WFC3 detector specific class
- """
-
- def __init__(self, hdr0, hdr):
- self.primhdr = hdr0
- self.exthdr = hdr
- InstrWCS.__init__(self,hdr0, hdr)
- self.set_ins_spec_kw()
-
- def set_detector(self):
- try:
- self.detector = self.primhdr['DETECTOR']
- except KeyError:
- print('ERROR: Detector kw not found.\n')
- raise
-
- def set_filter1(self):
- self.filter1 = self.primhdr.get('FILTER', None)
- if self.filter1 == " " or self.filter1 == None:
- self.filter1 = 'CLEAR'
-
- def set_filter2(self):
- #Nicmos idc tables do not allow 2 filters.
- self.filter2 = 'CLEAR'
-
- def set_parity(self):
- parity = {'UVIS':[[-1.0,0.0],[0.0,1.0]],
- 'IR':[[-1.0,0.0],[0.0,1.0]]}
-
- if self.detector not in list(parity.keys()):
- parity = InstrWCS.set_parity(self)
- else:
- self.parity = parity[self.detector]
-
-class NICMOSWCS(InstrWCS):
- """
- Create a NICMOS specific class
- """
-
- def __init__(self, hdr0, hdr):
- self.primhdr = hdr0
- self.exthdr = hdr
- InstrWCS.__init__(self,hdr0, hdr)
- self.set_ins_spec_kw()
-
- def set_parity(self):
- self.parity = [[-1.0,0.],[0.,1.0]]
-
- def set_filter1(self):
- self.filter1 = self.primhdr.get('FILTER', None)
- if self.filter1 == " " or self.filter1 == None:
- self.filter1 = 'CLEAR'
-
- def set_filter2(self):
- #Nicmos idc tables do not allow 2 filters.
- self.filter2 = 'CLEAR'
-
- def set_chip(self):
- self.chip = self.detector
-
- def set_detector(self):
- try:
- self.detector = self.primhdr['CAMERA']
- except KeyError:
- print('ERROR: Detector kw not found.\n')
- raise
-
-class STISWCS(InstrWCS):
- """
- A STIS specific class
- """
-
- def __init__(self, hdr0, hdr):
- self.primhdr = hdr0
- self.exthdr = hdr
- InstrWCS.__init__(self,hdr0, hdr)
- self.set_ins_spec_kw()
-
- def set_parity(self):
- self.parity = [[-1.0,0.],[0.,1.0]]
-
- def set_filter1(self):
- self.filter1 = self.exthdr.get('OPT_ELEM', None)
- if self.filter1 == " " or self.filter1 == None:
- self.filter1 = 'CLEAR1'
-
- def set_filter2(self):
- self.filter2 = self.exthdr.get('FILTER', None)
- if self.filter2 == " " or self.filter2 == None:
- self.filter2 = 'CLEAR2'
-
- def set_detector(self):
- try:
- self.detector = self.primhdr['DETECTOR']
- except KeyError:
- print('ERROR: Detector kw not found.\n')
- raise
-
- def set_date_obs(self):
- try:
- self.date_obs = self.exthdr['DATE-OBS']
- except (KeyError, TypeError):
- self.date_obs = None
-
diff --git a/lib/stwcs/wcsutil/mappings.py b/lib/stwcs/wcsutil/mappings.py
deleted file mode 100644
index 24038bf..0000000
--- a/lib/stwcs/wcsutil/mappings.py
+++ /dev/null
@@ -1,29 +0,0 @@
-from __future__ import division # confidence high
-
-# This dictionary maps an instrument into an instrument class
-# The instrument class handles instrument specific keywords
-
-inst_mappings={'WFPC2': 'WFPC2WCS',
- 'ACS': 'ACSWCS',
- 'NICMOS': 'NICMOSWCS',
- 'STIS': 'STISWCS',
- 'WFC3': 'WFC3WCS',
- 'DEFAULT': 'InstrWCS'
- }
-
-
-# A list of instrument specific keywords
-# Every instrument class must have methods which define each of these
-# as class attributes.
-ins_spec_kw = [ 'idctab', 'offtab', 'date_obs', 'ra_targ', 'dec_targ', 'pav3', \
- 'detector', 'ltv1', 'ltv2', 'parity', 'binned','vafactor', \
- 'chip', 'naxis1', 'naxis2', 'filter1', 'filter2']
-
-# A list of keywords defined in the primary header.
-# The HSTWCS class sets this as attributes
-prim_hdr_kw = ['detector', 'offtab', 'idctab', 'date-obs',
- 'pa_v3', 'ra_targ', 'dec_targ']
-
-# These are the keywords which are archived before MakeWCS is run
-basic_wcs = ['CD1_', 'CD2_', 'CRVAL', 'CTYPE', 'CRPIX', 'CTYPE', 'CDELT', 'CUNIT']
-
diff --git a/lib/stwcs/wcsutil/mosaic.py b/lib/stwcs/wcsutil/mosaic.py
deleted file mode 100644
index 9d2d0a3..0000000
--- a/lib/stwcs/wcsutil/mosaic.py
+++ /dev/null
@@ -1,183 +0,0 @@
-from __future__ import division, print_function
-import numpy as np
-from matplotlib import pyplot as plt
-from astropy.io import fits
-import string
-
-from stsci.tools import parseinput, irafglob
-from stwcs.distortion import utils
-from stwcs import updatewcs, wcsutil
-from stwcs.wcsutil import altwcs
-
-def vmosaic(fnames, outwcs=None, ref_wcs=None, ext=None, extname=None, undistort=True, wkey='V', wname='VirtualMosaic', plot=False, clobber=False):
- """
- Create a virtual mosaic using the WCS of the input images.
-
- Parameters
- ----------
- fnames: a string or a list
- a string or a list of filenames, or a list of wcsutil.HSTWCS objects
- outwcs: an HSTWCS object
- if given, represents the output tangent plane
- if None, the output WCS is calculated from the input observations.
- ref_wcs: an HSTWCS object
- if output wcs is not given, this will be used as a reference for the
- calculation of the output WCS. If ref_wcs is None and outwcs is None,
- then the first observation in th einput list is used as reference.
- ext: an int, a tuple or a list
- an int - represents a FITS extension, e.g. 0 is the primary HDU
- a tuple - uses the notation (extname, extver), e.g. ('sci',1)
- Can be a list of integers or tuples representing FITS extensions
- extname: string
- the value of the EXTNAME keyword for the extensions to be used in the mosaic
- undistort: boolean (default: True)
- undistort (or not) the output WCS
- wkey: string
- default: 'V'
- one character A-Z to be used to record the virtual mosaic WCS as
- an alternate WCS in the headers of the input files.
- wname: string
- default: 'VirtualMosaic
- a string to be used as a WCSNAME value for the alternate WCS representign
- the virtual mosaic
- plot: boolean
- if True and matplotlib is installed will make a plot of the tangent plane
- and the location of the input observations.
- clobber: boolean
- This covers the case when an alternate WCS with the requested key
- already exists in the header of the input files.
- if clobber is True, it will be overwritten
- if False, it will compute the new one but will not write it to the headers.
-
- Notes
- -----
- The algorithm is:
- 1. If output WCS is not given it is calculated from the input WCSes.
- The first image is used as a reference, if no reference is given.
- This represents the virtual mosaic WCS.
- 2. For each input observation/chip, an HSTWCS object is created
- and its footprint on the sky is calculated (using only the four corners).
- 3. For each input observation the footprint is projected on the output
- tangent plane and the virtual WCS is recorded in the header.
- """
- wcsobjects = readWCS(fnames, ext, extname)
- if outwcs != None:
- outwcs = outwcs.deepcopy()
- else:
- if ref_wcs != None:
- outwcs = utils.output_wcs(wcsobjects, ref_wcs=ref_wcs, undistort=undistort)
- else:
- outwcs = utils.output_wcs(wcsobjects, undistort=undistort)
- if plot:
- outc=np.array([[0.,0], [outwcs._naxis1, 0],
- [outwcs._naxis1, outwcs._naxis2],
- [0, outwcs._naxis2], [0, 0]])
- plt.plot(outc[:,0], outc[:,1])
- for wobj in wcsobjects:
- outcorners = outwcs.wcs_world2pix(wobj.calc_footprint(),1)
- if plot:
- plt.plot(outcorners[:,0], outcorners[:,1])
- objwcs = outwcs.deepcopy()
- objwcs.wcs.crpix = objwcs.wcs.crpix - (outcorners[0])
- updatehdr(wobj.filename, objwcs,wkey=wkey, wcsname=wname, ext=wobj.extname, clobber=clobber)
- return outwcs
-
-def updatehdr(fname, wcsobj, wkey, wcsname, ext=1, clobber=False):
- hdr = fits.getheader(fname, ext=ext)
- all_keys = list(string.ascii_uppercase)
- if wkey.upper() not in all_keys:
- raise KeyError("wkey must be one character: A-Z")
- if wkey not in altwcs.available_wcskeys(hdr):
- if not clobber:
- raise ValueError("wkey %s is already in use. Use clobber=True to overwrite it or specify a different key." %wkey)
- else:
- altwcs.deleteWCS(fname, ext=ext, wcskey='V')
- f = fits.open(fname, mode='update')
-
- hwcs = wcs2header(wcsobj)
- wcsnamekey = 'WCSNAME' + wkey
- f[ext].header[wcsnamekey] = wcsname
- for k in hwcs:
- f[ext].header[k[:7]+wkey] = hwcs[k]
-
- f.close()
-
-def wcs2header(wcsobj):
-
- h = wcsobj.to_header()
-
- if wcsobj.wcs.has_cd():
- altwcs.pc2cd(h)
- h['CTYPE1'] = 'RA---TAN'
- h['CTYPE2'] = 'DEC--TAN'
- norient = np.rad2deg(np.arctan2(h['CD1_2'],h['CD2_2']))
- #okey = 'ORIENT%s' % wkey
- okey = 'ORIENT'
- h[okey] = norient
- return h
-
-def readWCS(input, exts=None, extname=None):
- if isinstance(input, str):
- if input[0] == '@':
- # input is an @ file
- filelist = irafglob.irafglob(input)
- else:
- try:
- filelist, output = parseinput.parseinput(input)
- except IOError: raise
- elif isinstance(input, list):
- if isinstance(input[0], wcsutil.HSTWCS):
- # a list of HSTWCS objects
- return input
- else:
- filelist = input[:]
- wcso = []
- fomited = []
- # figure out which FITS extension(s) to use
- if exts == None and extname == None:
- #Assume it's simple FITS and the data is in the primary HDU
- for f in filelist:
- try:
- wcso = wcsutil.HSTWCS(f)
- except AttributeError:
- fomited.append(f)
- continue
- elif exts != None and validateExt(exts):
- exts = [exts]
- for f in filelist:
- try:
- wcso.extend([wcsutil.HSTWCS(f, ext=e) for e in exts])
- except KeyError:
- fomited.append(f)
- continue
- elif extname != None:
- for f in filelist:
- fobj = fits.open(f)
- for i in range(len(fobj)):
- try:
- ename = fobj[i].header['EXTNAME']
- except KeyError:
- continue
- if ename.lower() == extname.lower():
- wcso.append(wcsutil.HSTWCS(f,ext=i))
- else:
- continue
- fobj.close()
- if fomited != []:
- print("These files were skipped:")
- for f in fomited:
- print(f)
- return wcso
-
-
-def validateExt(ext):
- if not isinstance(ext, int) and not isinstance(ext, tuple) \
- and not isinstance(ext, list):
- print("Ext must be integer, tuple, a list of int extension numbers, \
- or a list of tuples representing a fits extension, for example ('sci', 1).")
- return False
- else:
- return True
-
-
-
diff --git a/lib/stwcs/wcsutil/wcscorr.py b/lib/stwcs/wcsutil/wcscorr.py
deleted file mode 100644
index 3f9b7d5..0000000
--- a/lib/stwcs/wcsutil/wcscorr.py
+++ /dev/null
@@ -1,668 +0,0 @@
-from __future__ import absolute_import, division, print_function
-
-import os,copy
-import numpy as np
-from astropy.io import fits
-
-import stwcs
-from stwcs.wcsutil import altwcs
-from stwcs.updatewcs import utils
-from stsci.tools import fileutil
-from . import convertwcs
-
-DEFAULT_WCS_KEYS = ['CRVAL1','CRVAL2','CRPIX1','CRPIX2',
- 'CD1_1','CD1_2','CD2_1','CD2_2',
- 'CTYPE1','CTYPE2','ORIENTAT']
-DEFAULT_PRI_KEYS = ['HDRNAME','SIPNAME','NPOLNAME','D2IMNAME','DESCRIP']
-COL_FITSKW_DICT = {'RMS_RA':'sci.crder1','RMS_DEC':'sci.crder2',
- 'NMatch':'sci.nmatch','Catalog':'sci.catalog'}
-
-###
-### WCSEXT table related keyword archive functions
-###
-def init_wcscorr(input, force=False):
- """
- This function will initialize the WCSCORR table if it is not already present,
- and look for WCS keywords with a prefix of 'O' as the original OPUS
- generated WCS as the initial row for the table or use the current WCS
- keywords as initial row if no 'O' prefix keywords are found.
-
- This function will NOT overwrite any rows already present.
-
- This function works on all SCI extensions at one time.
- """
- # TODO: Create some sort of decorator or (for Python2.5) context for
- # opening a FITS file and closing it when done, if necessary
- if not isinstance(input, fits.HDUList):
- # input must be a filename, so open as `astropy.io.fits.HDUList` object
- fimg = fits.open(input, mode='update')
- need_to_close = True
- else:
- fimg = input
- need_to_close = False
-
- # Do not try to generate a WCSCORR table for a simple FITS file
- numsci = fileutil.countExtn(fimg)
- if len(fimg) == 1 or numsci == 0:
- return
-
- enames = []
- for e in fimg: enames.append(e.name)
- if 'WCSCORR' in enames:
- if not force:
- return
- else:
- del fimg['wcscorr']
- print('Initializing new WCSCORR table for ',fimg.filename())
-
- used_wcskeys = altwcs.wcskeys(fimg['SCI', 1].header)
-
- # define the primary columns of the WCSEXT table with initial rows for each
- # SCI extension for the original OPUS solution
- numwcs = len(used_wcskeys)
- if numwcs == 0: numwcs = 1
-
- # create new table with more rows than needed initially to make it easier to
- # add new rows later
- wcsext = create_wcscorr(descrip=True,numrows=numsci, padding=(numsci*numwcs) + numsci * 4)
- # Assign the correct EXTNAME value to this table extension
- wcsext.header['TROWS'] = (numsci * 2, 'Number of updated rows in table')
- wcsext.header['EXTNAME'] = ('WCSCORR', 'Table with WCS Update history')
- wcsext.header['EXTVER'] = 1
-
- # define set of WCS keywords which need to be managed and copied to the table
- wcs1 = stwcs.wcsutil.HSTWCS(fimg,ext=('SCI',1))
- idc2header = True
- if wcs1.idcscale is None:
- idc2header = False
- wcs_keywords = list(wcs1.wcs2header(idc2hdr=idc2header).keys())
-
- prihdr = fimg[0].header
- prihdr_keys = DEFAULT_PRI_KEYS
- pri_funcs = {'SIPNAME':stwcs.updatewcs.utils.build_sipname,
- 'NPOLNAME':stwcs.updatewcs.utils.build_npolname,
- 'D2IMNAME':stwcs.updatewcs.utils.build_d2imname}
-
- # Now copy original OPUS values into table
- for extver in range(1, numsci + 1):
- rowind = find_wcscorr_row(wcsext.data,
- {'WCS_ID': 'OPUS', 'EXTVER': extver,
- 'WCS_key':'O'})
- # There should only EVER be a single row for each extension with OPUS values
- rownum = np.where(rowind)[0][0]
- #print 'Archiving OPUS WCS in row number ',rownum,' in WCSCORR table for SCI,',extver
-
- hdr = fimg['SCI', extver].header
- # define set of WCS keywords which need to be managed and copied to the table
- if used_wcskeys is None:
- used_wcskeys = altwcs.wcskeys(hdr)
- # Check to see whether or not there is an OPUS alternate WCS present,
- # if so, get its values directly, otherwise, archive the PRIMARY WCS
- # as the OPUS values in the WCSCORR table
- if 'O' not in used_wcskeys:
- altwcs.archiveWCS(fimg,('SCI',extver),wcskey='O', wcsname='OPUS')
- wkey = 'O'
-
- wcs = stwcs.wcsutil.HSTWCS(fimg, ext=('SCI', extver), wcskey=wkey)
- wcshdr = wcs.wcs2header(idc2hdr=idc2header)
-
- if wcsext.data.field('CRVAL1')[rownum] != 0:
- # If we find values for these keywords already in the table, do not
- # overwrite them again
- print('WCS keywords already updated...')
- break
- for key in wcs_keywords:
- if key in wcsext.data.names:
- wcsext.data.field(key)[rownum] = wcshdr[(key+wkey)[:8]]
- # Now get any keywords from PRIMARY header needed for WCS updates
- for key in prihdr_keys:
- if key in prihdr:
- val = prihdr[key]
- else:
- val = ''
- wcsext.data.field(key)[rownum] = val
-
- # Now that we have archived the OPUS alternate WCS, remove it from the list
- # of used_wcskeys
- if 'O' in used_wcskeys:
- used_wcskeys.remove('O')
-
- # Now copy remaining alternate WCSs into table
- # TODO: Much of this appears to be redundant with update_wcscorr; consider
- # merging them...
- for uwkey in used_wcskeys:
- for extver in range(1, numsci + 1):
- hdr = fimg['SCI', extver].header
- wcs = stwcs.wcsutil.HSTWCS(fimg, ext=('SCI', extver),
- wcskey=uwkey)
- wcshdr = wcs.wcs2header()
- if 'WCSNAME' + uwkey not in wcshdr:
- wcsid = utils.build_default_wcsname(fimg[0].header['idctab'])
- else:
- wcsid = wcshdr['WCSNAME' + uwkey]
-
- # identify next empty row
- rowind = find_wcscorr_row(wcsext.data,
- selections={'wcs_id':['','0.0']})
- rows = np.where(rowind)
- if len(rows[0]) > 0:
- rownum = np.where(rowind)[0][0]
- else:
- print('No available rows found for updating. ')
-
- # Update selection columns for this row with relevant values
- wcsext.data.field('WCS_ID')[rownum] = wcsid
- wcsext.data.field('EXTVER')[rownum] = extver
- wcsext.data.field('WCS_key')[rownum] = uwkey
-
- # Look for standard WCS keyword values
- for key in wcs_keywords:
- if key in wcsext.data.names:
- wcsext.data.field(key)[rownum] = wcshdr[key + uwkey]
- # Now get any keywords from PRIMARY header needed for WCS updates
- for key in prihdr_keys:
- if key in pri_funcs:
- val = pri_funcs[key](fimg)[0]
- else:
- if key in prihdr:
- val = prihdr[key]
- else:
- val = ''
- wcsext.data.field(key)[rownum] = val
-
- # Append this table to the image FITS file
- fimg.append(wcsext)
- # force an update now
- # set the verify flag to 'warn' so that it will always succeed, but still
- # tell the user if PyFITS detects any problems with the file as a whole
- utils.updateNEXTENDKw(fimg)
-
- fimg.flush('warn')
-
- if need_to_close:
- fimg.close()
-
-
-def find_wcscorr_row(wcstab, selections):
- """
- Return an array of indices from the table (NOT HDU) 'wcstab' that matches the
- selections specified by the user.
-
- The row selection criteria must be specified as a dictionary with
- column name as key and value(s) representing the valid desired row values.
- For example, {'wcs_id':'OPUS','extver':2}.
- """
-
- mask = None
- for i in selections:
- icol = wcstab.field(i)
- if isinstance(icol,np.chararray): icol = icol.rstrip()
- selecti = selections[i]
- if not isinstance(selecti,list):
- if isinstance(selecti,str):
- selecti = selecti.rstrip()
- bmask = (icol == selecti)
- if mask is None:
- mask = bmask.copy()
- else:
- mask = np.logical_and(mask,bmask)
- del bmask
- else:
- for si in selecti:
- if isinstance(si,str):
- si = si.rstrip()
- bmask = (icol == si)
- if mask is None:
- mask = bmask.copy()
- else:
- mask = np.logical_or(mask,bmask)
- del bmask
-
- return mask
-
-
-def archive_wcs_file(image, wcs_id=None):
- """
- Update WCSCORR table with rows for each SCI extension to record the
- newly updated WCS keyword values.
- """
-
- if not isinstance(image, fits.HDUList):
- fimg = fits.open(image, mode='update')
- close_image = True
- else:
- fimg = image
- close_image = False
-
- update_wcscorr(fimg, wcs_id=wcs_id)
-
- if close_image:
- fimg.close()
-
-
-def update_wcscorr(dest, source=None, extname='SCI', wcs_id=None, active=True):
- """
- Update WCSCORR table with a new row or rows for this extension header. It
- copies the current set of WCS keywords as a new row of the table based on
- keyed WCSs as per Paper I Multiple WCS standard).
-
- Parameters
- ----------
- dest : HDUList
- The HDU list whose WCSCORR table should be appended to (the WCSCORR HDU
- must already exist)
- source : HDUList, optional
- The HDU list containing the extension from which to extract the WCS
- keywords to add to the WCSCORR table. If None, the dest is also used
- as the source.
- extname : str, optional
- The extension name from which to take new WCS keywords. If there are
- multiple extensions with that name, rows are added for each extension
- version.
- wcs_id : str, optional
- The name of the WCS to add, as in the WCSNAMEa keyword. If
- unspecified, all the WCSs in the specified extensions are added.
- active: bool, optional
- When True, indicates that the update should reflect an update of the
- active WCS information, not just appending the WCS to the file as a
- headerlet
- """
- if not isinstance(dest, fits.HDUList):
- dest = fits.open(dest,mode='update')
- fname = dest.filename()
-
- if source is None:
- source = dest
-
- if extname == 'PRIMARY':
- return
-
- numext = fileutil.countExtn(source, extname)
- if numext == 0:
- raise ValueError('No %s extensions found in the source HDU list.'
- % extname)
- # Initialize the WCSCORR table extension in dest if not already present
- init_wcscorr(dest)
- try:
- dest.index_of('WCSCORR')
- except KeyError:
- return
-
- # check to see whether or not this is an up-to-date table
- # replace with newly initialized table with current format
- old_table = dest['WCSCORR']
- wcscorr_cols = ['WCS_ID','EXTVER', 'SIPNAME',
- 'HDRNAME', 'NPOLNAME', 'D2IMNAME']
-
- for colname in wcscorr_cols:
- if colname not in old_table.data.columns.names:
- print("WARNING: Replacing outdated WCSCORR table...")
- outdated_table = old_table.copy()
- del dest['WCSCORR']
- init_wcscorr(dest)
- old_table = dest['WCSCORR']
- break
-
- # Current implementation assumes the same WCS keywords are in each
- # extension version; if this should not be assumed then this can be
- # modified...
- wcs_keys = altwcs.wcskeys(source[(extname, 1)].header)
- wcs_keys = [kk for kk in wcs_keys if kk]
- if ' ' not in wcs_keys: wcs_keys.append(' ') # Insure that primary WCS gets used
- # apply logic for only updating WCSCORR table with specified keywords
- # corresponding to the WCS with WCSNAME=wcs_id
- if wcs_id is not None:
- wnames = altwcs.wcsnames(source[(extname, 1)].header)
- wkeys = []
- for letter in wnames:
- if wnames[letter] == wcs_id:
- wkeys.append(letter)
- if len(wkeys) > 1 and ' ' in wkeys:
- wkeys.remove(' ')
- wcs_keys = wkeys
- wcshdr = stwcs.wcsutil.HSTWCS(source, ext=(extname, 1)).wcs2header()
- wcs_keywords = list(wcshdr.keys())
-
- if 'O' in wcs_keys:
- wcs_keys.remove('O') # 'O' is reserved for original OPUS WCS
-
- # create new table for hdr and populate it with the newly updated values
- new_table = create_wcscorr(descrip=True,numrows=0, padding=len(wcs_keys)*numext)
- prihdr = source[0].header
-
- # Get headerlet related keywords here
- sipname, idctab = utils.build_sipname(source, fname, "None")
- npolname, npolfile = utils.build_npolname(source, None)
- d2imname, d2imfile = utils.build_d2imname(source, None)
- if 'hdrname' in prihdr:
- hdrname = prihdr['hdrname']
- else:
- hdrname = ''
-
- idx = -1
- for wcs_key in wcs_keys:
- for extver in range(1, numext + 1):
- extn = (extname, extver)
- if 'SIPWCS' in extname and not active:
- tab_extver = 0 # Since it has not been added to the SCI header yet
- else:
- tab_extver = extver
- hdr = source[extn].header
- if 'WCSNAME'+wcs_key in hdr:
- wcsname = hdr['WCSNAME' + wcs_key]
- else:
- wcsname = utils.build_default_wcsname(hdr['idctab'])
-
- selection = {'WCS_ID': wcsname, 'EXTVER': tab_extver,
- 'SIPNAME':sipname, 'HDRNAME': hdrname,
- 'NPOLNAME': npolname, 'D2IMNAME':d2imname
- }
-
- # Ensure that an entry for this WCS is not already in the dest
- # table; if so just skip it
- rowind = find_wcscorr_row(old_table.data, selection)
- if np.any(rowind):
- continue
-
- idx += 1
-
- wcs = stwcs.wcsutil.HSTWCS(source, ext=extn, wcskey=wcs_key)
- wcshdr = wcs.wcs2header()
-
- # Update selection column values
- for key, val in selection.items():
- if key in new_table.data.names:
- new_table.data.field(key)[idx] = val
-
- for key in wcs_keywords:
- if key in new_table.data.names:
- new_table.data.field(key)[idx] = wcshdr[key + wcs_key]
-
- for key in DEFAULT_PRI_KEYS:
- if key in new_table.data.names and key in prihdr:
- new_table.data.field(key)[idx] = prihdr[key]
- # Now look for additional, non-WCS-keyword table column data
- for key in COL_FITSKW_DICT:
- fitkw = COL_FITSKW_DICT[key]
- # Interpret any 'pri.hdrname' or
- # 'sci.crpix1' formatted keyword names
- if '.' in fitkw:
- srchdr,fitkw = fitkw.split('.')
- if 'pri' in srchdr.lower(): srchdr = prihdr
- else: srchdr = source[extn].header
- else:
- srchdr = source[extn].header
-
- if fitkw+wcs_key in srchdr:
- new_table.data.field(key)[idx] = srchdr[fitkw+wcs_key]
-
-
- # If idx was never incremented, no rows were added, so there's nothing else
- # to do...
- if idx < 0:
- return
-
- # Now, we need to merge this into the existing table
- rowind = find_wcscorr_row(old_table.data, {'wcs_id':['','0.0']})
- old_nrows = np.where(rowind)[0][0]
- new_nrows = new_table.data.shape[0]
-
- # check to see if there is room for the new row
- if (old_nrows + new_nrows) > old_table.data.shape[0]-1:
- pad_rows = 2 * new_nrows
- # if not, create a new table with 'pad_rows' new empty rows
- upd_table = fits.new_table(old_table.columns,header=old_table.header,
- nrows=old_table.data.shape[0]+pad_rows)
- else:
- upd_table = old_table
- pad_rows = 0
- # Now, add
- for name in old_table.columns.names:
- if name in new_table.data.names:
- # reset the default values to ones specific to the row definitions
- for i in range(pad_rows):
- upd_table.data.field(name)[old_nrows+i] = old_table.data.field(name)[-1]
- # Now populate with values from new table
- upd_table.data.field(name)[old_nrows:old_nrows + new_nrows] = \
- new_table.data.field(name)
- upd_table.header['TROWS'] = old_nrows + new_nrows
-
- # replace old extension with newly updated table extension
- dest['WCSCORR'] = upd_table
-
-
-def restore_file_from_wcscorr(image, id='OPUS', wcskey=''):
- """ Copies the values of the WCS from the WCSCORR based on ID specified by user.
- The default will be to restore the original OPUS-derived values to the Primary WCS.
- If wcskey is specified, the WCS with that key will be updated instead.
- """
-
- if not isinstance(image, fits.HDUList):
- fimg = fits.open(image, mode='update')
- close_image = True
- else:
- fimg = image
- close_image = False
- numsci = fileutil.countExtn(fimg)
- wcs_table = fimg['WCSCORR']
- orig_rows = (wcs_table.data.field('WCS_ID') == 'OPUS')
- # create an HSTWCS object to figure out what WCS keywords need to be updated
- wcsobj = stwcs.wcsutil.HSTWCS(fimg,ext=('sci',1))
- wcshdr = wcsobj.wcs2header()
- for extn in range(1,numsci+1):
- # find corresponding row from table
- ext_rows = (wcs_table.data.field('EXTVER') == extn)
- erow = np.where(np.logical_and(ext_rows,orig_rows))[0][0]
- for key in wcshdr:
- if key in wcs_table.data.names: # insure that keyword is column in table
- tkey = key
-
- if 'orient' in key.lower():
- key = 'ORIENT'
- if wcskey == '':
- skey = key
- else:
- skey = key[:7]+wcskey
- fimg['sci',extn].header[skey] = wcs_table.data.field(tkey)[erow]
- for key in DEFAULT_PRI_KEYS:
- if key in wcs_table.data.names:
- if wcskey == '':
- pkey = key
- else:
- pkey = key[:7]+wcskey
- fimg[0].header[pkey] = wcs_table.data.field(key)[erow]
-
- utils.updateNEXTENDKw(fimg)
-
- # close the image now that the update has been completed.
- if close_image:
- fimg.close()
-
-
-def create_wcscorr(descrip=False, numrows=1, padding=0):
- """
- Return the basic definitions for a WCSCORR table.
- The dtype definitions for the string columns are set to the maximum allowed so
- that all new elements will have the same max size which will be automatically
- truncated to this limit upon updating (if needed).
-
- The table is initialized with rows corresponding to the OPUS solution
- for all the 'SCI' extensions.
- """
-
- trows = numrows + padding
- # define initialized arrays as placeholders for column data
- # TODO: I'm certain there's an easier way to do this... for example, simply
- # define the column names and formats, then create an empty array using
- # them as a dtype, then create the new table from that array.
- def_float64_zeros = np.array([0.0] * trows, dtype=np.float64)
- def_float64_ones = def_float64_zeros + 1.0
- def_float_col = {'format': 'D', 'array': def_float64_zeros.copy()}
- def_float1_col = {'format': 'D', 'array':def_float64_ones.copy()}
- def_str40_col = {'format': '40A',
- 'array': np.array([''] * trows, dtype='S40')}
- def_str24_col = {'format': '24A',
- 'array': np.array([''] * trows, dtype='S24')}
- def_int32_col = {'format': 'J',
- 'array': np.array([0]*trows,dtype=np.int32)}
-
- # If more columns are needed, simply add their definitions to this list
- col_names = [('HDRNAME', def_str24_col), ('SIPNAME', def_str24_col),
- ('NPOLNAME', def_str24_col), ('D2IMNAME', def_str24_col),
- ('CRVAL1', def_float_col), ('CRVAL2', def_float_col),
- ('CRPIX1', def_float_col), ('CRPIX2', def_float_col),
- ('CD1_1', def_float_col), ('CD1_2', def_float_col),
- ('CD2_1', def_float_col), ('CD2_2', def_float_col),
- ('CTYPE1', def_str24_col), ('CTYPE2', def_str24_col),
- ('ORIENTAT', def_float_col), ('PA_V3', def_float_col),
- ('RMS_RA', def_float_col), ('RMS_Dec', def_float_col),
- ('NMatch', def_int32_col), ('Catalog', def_str40_col)]
-
- # Define selector columns
- id_col = fits.Column(name='WCS_ID', format='40A',
- array=np.array(['OPUS'] * numrows + [''] * padding,
- dtype='S24'))
- extver_col = fits.Column(name='EXTVER', format='I',
- array=np.array(list(range(1, numrows + 1)),
- dtype=np.int16))
- wcskey_col = fits.Column(name='WCS_key', format='A',
- array=np.array(['O'] * numrows + [''] * padding,
- dtype='S'))
- # create list of remaining columns to be added to table
- col_list = [id_col, extver_col, wcskey_col] # start with selector columns
-
- for c in col_names:
- cdef = copy.deepcopy(c[1])
- col_list.append(fits.Column(name=c[0], format=cdef['format'],
- array=cdef['array']))
-
- if descrip:
- col_list.append(
- fits.Column(name='DESCRIP', format='128A',
- array=np.array(
- ['Original WCS computed by OPUS'] * numrows,
- dtype='S128')))
-
- # Now create the new table from the column definitions
- newtab = fits.new_table(fits.ColDefs(col_list), nrows=trows)
- # The fact that setting .name is necessary should be considered a bug in
- # pyfits.
- # TODO: Make sure this is fixed in pyfits, then remove this
- newtab.name = 'WCSCORR'
-
- return newtab
-
-def delete_wcscorr_row(wcstab,selections=None,rows=None):
- """
- Sets all values in a specified row or set of rows to default values
-
- This function will essentially erase the specified row from the table
- without actually removing the row from the table. This avoids the problems
- with trying to resize the number of rows in the table while preserving the
- ability to update the table with new rows again without resizing the table.
-
- Parameters
- ----------
- wcstab: object
- PyFITS binTable object for WCSCORR table
- selections: dict
- Dictionary of wcscorr column names and values to be used to select
- the row or set of rows to erase
- rows: int, list
- If specified, will specify what rows from the table to erase regardless
- of the value of 'selections'
- """
-
- if selections is None and rows is None:
- print('ERROR: Some row selection information must be provided!')
- print(' Either a row numbers or "selections" must be provided.')
- raise ValueError
-
- delete_rows = None
- if rows is None:
- if 'wcs_id' in selections and selections['wcs_id'] == 'OPUS':
- delete_rows = None
- print('WARNING: OPUS WCS information can not be deleted from WCSCORR table.')
- print(' This row will not be deleted!')
- else:
- rowind = find_wcscorr_row(wcstab, selections=selections)
- delete_rows = np.where(rowind)[0].tolist()
- else:
- if not isinstance(rows,list):
- rows = [rows]
- delete_rows = rows
-
- # Insure that rows pointing to OPUS WCS do not get deleted, even by accident
- for row in delete_rows:
- if wcstab['WCS_key'][row] == 'O' or wcstab['WCS_ID'][row] == 'OPUS':
- del delete_rows[delete_rows.index(row)]
-
- if delete_rows is None:
- return
-
- # identify next empty row
- rowind = find_wcscorr_row(wcstab, selections={'wcs_id':['','0.0']})
- last_blank_row = np.where(rowind)[0][-1]
-
- # copy values from blank row into user-specified rows
- for colname in wcstab.names:
- wcstab[colname][delete_rows] = wcstab[colname][last_blank_row]
-
-def update_wcscorr_column(wcstab, column, values, selections=None, rows=None):
- """
- Update the values in 'column' with 'values' for selected rows
-
- Parameters
- ----------
- wcstab: object
- PyFITS binTable object for WCSCORR table
- column: string
- Name of table column with values that need to be updated
- values: string, int, or list
- Value or set of values to copy into the selected rows for the column
- selections: dict
- Dictionary of wcscorr column names and values to be used to select
- the row or set of rows to erase
- rows: int, list
- If specified, will specify what rows from the table to erase regardless
- of the value of 'selections'
- """
- if selections is None and rows is None:
- print('ERROR: Some row selection information must be provided!')
- print(' Either a row numbers or "selections" must be provided.')
- raise ValueError
-
- if not isinstance(values, list):
- values = [values]
-
- update_rows = None
- if rows is None:
- if 'wcs_id' in selections and selections['wcs_id'] == 'OPUS':
- update_rows = None
- print('WARNING: OPUS WCS information can not be deleted from WCSCORR table.')
- print(' This row will not be deleted!')
- else:
- rowind = find_wcscorr_row(wcstab, selections=selections)
- update_rows = np.where(rowind)[0].tolist()
- else:
- if not isinstance(rows,list):
- rows = [rows]
- update_rows = rows
-
- if update_rows is None:
- return
-
- # Expand single input value to apply to all selected rows
- if len(values) > 1 and len(values) < len(update_rows):
- print('ERROR: Number of new values',len(values))
- print(' does not match number of rows',len(update_rows),' to be updated!')
- print(' Please enter either 1 value or the same number of values')
- print(' as there are rows to be updated.')
- print(' Table will not be updated...')
- raise ValueError
-
- if len(values) == 1 and len(values) < len(update_rows):
- values = values * len(update_rows)
- # copy values from blank row into user-specified rows
- for row in update_rows:
- wcstab[column][row] = values[row]
diff --git a/lib/stwcs/wcsutil/wcsdiff.py b/lib/stwcs/wcsutil/wcsdiff.py
deleted file mode 100644
index cfc2d66..0000000
--- a/lib/stwcs/wcsutil/wcsdiff.py
+++ /dev/null
@@ -1,150 +0,0 @@
-from __future__ import print_function
-from astropy import wcs as pywcs
-from collections import OrderedDict
-from astropy.io import fits
-from .headerlet import parse_filename
-import numpy as np
-
-def is_wcs_identical(scifile, file2, sciextlist, fextlist, scikey=" ",
- file2key=" ", verbose=False):
- """
- Compares the WCS solution of 2 files.
-
- Parameters
- ----------
- scifile: string
- name of file1 (usually science file)
- IRAF style extension syntax is accepted as well
- for example scifile[1] or scifile[sci,1]
- file2: string
- name of second file (for example headerlet)
- sciextlist - list
- a list of int or tuple ('SCI', 1), extensions in the first file
- fextlist - list
- a list of int or tuple ('SIPWCS', 1), extensions in the second file
- scikey: string
- alternate WCS key in scifile
- file2key: string
- alternate WCS key in file2
- verbose: boolean
- True: print to stdout
-
- Notes
- -----
- These can be 2 science observations or 2 headerlets
- or a science observation and a headerlet. The two files
- have the same WCS solution if the following are the same:
-
- - rootname/destim
- - primary WCS
- - SIP coefficients
- - NPOL distortion
- - D2IM correction
-
- """
- result = True
- diff = OrderedDict()
- fobj, fname, close_file = parse_filename(file2)
- sciobj, sciname, close_scifile = parse_filename(scifile)
- diff['file_names'] = [scifile, file2]
- if get_rootname(scifile) != get_rootname(file2):
- #logger.info('Rootnames do not match.')
- diff['rootname'] = ("%s: %s", "%s: %s") % (sciname, get_rootname(scifile), file2, get_rootname(file2))
- result = False
- for i, j in zip(sciextlist, fextlist):
- w1 = pywcs.WCS(sciobj[i].header, sciobj, key=scikey)
- w2 = pywcs.WCS(fobj[j].header, fobj, key=file2key)
- diff['extension'] = [get_extname_extnum(sciobj[i]), get_extname_extnum(fobj[j])]
- if not np.allclose(w1.wcs.crval, w2.wcs.crval, rtol=10**(-7)):
- #logger.info('CRVALs do not match')
- diff['CRVAL'] = w1.wcs.crval, w2.wcs.crval
- result = False
- if not np.allclose(w1.wcs.crpix, w2.wcs.crpix, rtol=10**(-7)):
- #logger.info('CRPIX do not match')
- diff ['CRPIX'] = w1.wcs.crpix, w2.wcs.crpix
- result = False
- if not np.allclose(w1.wcs.cd, w2.wcs.cd, rtol=10**(-7)):
- #logger.info('CDs do not match')
- diff ['CD'] = w1.wcs.cd, w2.wcs.cd
- result = False
- if not (np.array(w1.wcs.ctype) == np.array(w2.wcs.ctype)).all():
- #logger.info('CTYPEs do not match')
- diff ['CTYPE'] = w1.wcs.ctype, w2.wcs.ctype
- result = False
- if w1.sip or w2.sip:
- if (w2.sip and not w1.sip) or (w1.sip and not w2.sip):
- diff['sip'] = 'one sip extension is missing'
- result = False
- if not np.allclose(w1.sip.a, w2.sip.a, rtol=10**(-7)):
- diff['SIP_A'] = 'SIP_A differ'
- result = False
- if not np.allclose(w1.sip.b, w2.sip.b, rtol=10**(-7)):
- #logger.info('SIP coefficients do not match')
- diff ['SIP_B'] = (w1.sip.b, w2.sip.b)
- result = False
- if w1.cpdis1 or w2.cpdis1:
- if w1.cpdis1 and not w2.cpdis1 or w2.cpdis1 and not w1.cpdis1:
- diff['CPDIS1'] = "CPDIS1 missing"
- result=False
- if w1.cpdis2 and not w2.cpdis2 or w2.cpdis2 and not w1.cpdis2:
- diff['CPDIS2'] = "CPDIS2 missing"
- result = False
- if not np.allclose(w1.cpdis1.data, w2.cpdis1.data, rtol=10**(-7)):
- #logger.info('NPOL distortions do not match')
- diff ['CPDIS1_data'] = (w1.cpdis1.data, w2.cpdis1.data)
- result = False
- if not np.allclose(w1.cpdis2.data, w2.cpdis2.data, rtol=10**(-7)):
- #logger.info('NPOL distortions do not match')
- diff ['CPDIS2_data'] = (w1.cpdis2.data, w2.cpdis2.data)
- result = False
- if w1.det2im1 or w2.det2im1:
- if w1.det2im1 and not w2.det2im1 or \
- w2.det2im1 and not w1.det2im1:
- diff['DET2IM'] = "Det2im1 missing"
- result = False
- if not np.allclose(w1.det2im1.data, w2.det2im1.data, rtol=10**(-7)):
- #logger.info('Det2Im corrections do not match')
- diff ['D2IM1_data'] = (w1.det2im1.data, w2.det2im1.data)
- result = False
- if w1.det2im2 or w2.det2im2:
- if w1.det2im2 and not w2.det2im2 or \
- w2.det2im2 and not w1.det2im2:
- diff['DET2IM2'] = "Det2im2 missing"
- result = False
- if not np.allclose(w1.det2im2.data, w2.det2im2.data, rtol=10**(-7)):
- #logger.info('Det2Im corrections do not match')
- diff ['D2IM2_data'] = (w1.det2im2.data, w2.det2im2.data)
- result = False
- if not result and verbose:
- for key in diff:
- print(key, ":\t", diff[key][0], "\t", diff[key][1])
- if close_file:
- fobj.close()
- if close_scifile:
- sciobj.close()
- return result, diff
-
-def get_rootname(fname):
- """
- Returns the value of ROOTNAME or DESTIM
- """
-
- hdr = fits.getheader(fname)
- try:
- rootname = hdr['ROOTNAME']
- except KeyError:
- try:
- rootname = hdr['DESTIM']
- except KeyError:
- rootname = fname
- return rootname
-
-def get_extname_extnum(ext):
- """
- Return (EXTNAME, EXTNUM) of a FITS extension
- """
- extname = ""
- extnum=1
- extname = ext.header.get('EXTNAME', extname)
- extnum = ext.header.get('EXTVER', extnum)
- return (extname, extnum)