summaryrefslogtreecommitdiff
path: root/stwcs/updatewcs/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'stwcs/updatewcs/__init__.py')
-rw-r--r--stwcs/updatewcs/__init__.py376
1 files changed, 376 insertions, 0 deletions
diff --git a/stwcs/updatewcs/__init__.py b/stwcs/updatewcs/__init__.py
new file mode 100644
index 0000000..eb5e850
--- /dev/null
+++ b/stwcs/updatewcs/__init__.py
@@ -0,0 +1,376 @@
+from __future__ import absolute_import, division, print_function # confidence high
+
+from astropy.io import fits
+from stwcs import wcsutil
+from stwcs.wcsutil import HSTWCS
+import stwcs
+
+from astropy import wcs as pywcs
+import astropy
+
+from . import utils, corrections, makewcs
+from . import npol, det2im
+from stsci.tools import parseinput, fileutil
+from . import apply_corrections
+
+import time
+import logging
+logger = logging.getLogger('stwcs.updatewcs')
+
+import atexit
+atexit.register(logging.shutdown)
+
+#Note: The order of corrections is important
+
+def updatewcs(input, vacorr=True, tddcorr=True, npolcorr=True, d2imcorr=True,
+ checkfiles=True, verbose=False):
+ """
+
+ Updates HST science files with the best available calibration information.
+ This allows users to retrieve from the archive self contained science files
+ which do not require additional reference files.
+
+ Basic WCS keywords are updated in the process and new keywords (following WCS
+ Paper IV and the SIP convention) as well as new extensions are added to the science files.
+
+
+ Example
+ -------
+ >>>from stwcs import updatewcs
+ >>>updatewcs.updatewcs(filename)
+
+ Dependencies
+ ------------
+ `stsci.tools`
+ `astropy.io.fits`
+ `astropy.wcs`
+
+ Parameters
+ ----------
+ input: a python list of file names or a string (wild card characters allowed)
+ input files may be in fits, geis or waiver fits format
+ vacorr: boolean
+ If True, vecocity aberration correction will be applied
+ tddcorr: boolean
+ If True, time dependent distortion correction will be applied
+ npolcorr: boolean
+ If True, a Lookup table distortion will be applied
+ d2imcorr: boolean
+ If True, detector to image correction will be applied
+ checkfiles: boolean
+ If True, the format of the input files will be checked,
+ geis and waiver fits files will be converted to MEF format.
+ Default value is True for standalone mode.
+ """
+ if verbose == False:
+ logger.setLevel(100)
+ else:
+ formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+ log_filename = 'stwcs.log'
+ fh = logging.FileHandler(log_filename, mode='w')
+ fh.setLevel(logging.DEBUG)
+ fh.setFormatter(formatter)
+ logger.addHandler(fh)
+ logger.setLevel(verbose)
+ args = "vacorr=%s, tddcorr=%s, npolcorr=%s, d2imcorr=%s, checkfiles=%s, \
+ " % (str(vacorr), str(tddcorr), str(npolcorr),
+ str(d2imcorr), str(checkfiles))
+ logger.info('\n\tStarting UPDATEWCS: %s', time.asctime())
+
+ files = parseinput.parseinput(input)[0]
+ logger.info("\n\tInput files: %s, " % [i for i in files])
+ logger.info("\n\tInput arguments: %s" %args)
+ if checkfiles:
+ files = checkFiles(files)
+ if not files:
+ print('No valid input, quitting ...\n')
+ return
+
+ for f in files:
+ acorr = apply_corrections.setCorrections(f, vacorr=vacorr, \
+ tddcorr=tddcorr,npolcorr=npolcorr, d2imcorr=d2imcorr)
+ if 'MakeWCS' in acorr and newIDCTAB(f):
+ logger.warning("\n\tNew IDCTAB file detected. All current WCSs will be deleted")
+ cleanWCS(f)
+
+ makecorr(f, acorr)
+
+ return files
+
+def makecorr(fname, allowed_corr):
+ """
+ Purpose
+ =======
+ Applies corrections to the WCS of a single file
+
+ :Parameters:
+ `fname`: string
+ file name
+ `acorr`: list
+ list of corrections to be applied
+ """
+ logger.info("Allowed corrections: {0}".format(allowed_corr))
+ f = fits.open(fname, mode='update')
+ #Determine the reference chip and create the reference HSTWCS object
+ nrefchip, nrefext = getNrefchip(f)
+ wcsutil.restoreWCS(f, nrefext, wcskey='O')
+ rwcs = HSTWCS(fobj=f, ext=nrefext)
+ rwcs.readModel(update=True,header=f[nrefext].header)
+
+ if 'DET2IMCorr' in allowed_corr:
+ kw2update = det2im.DET2IMCorr.updateWCS(f)
+ for kw in kw2update:
+ f[1].header[kw] = kw2update[kw]
+
+ for i in range(len(f))[1:]:
+ extn = f[i]
+
+ if 'extname' in extn.header:
+ extname = extn.header['extname'].lower()
+ if extname == 'sci':
+ wcsutil.restoreWCS(f, ext=i, wcskey='O')
+ sciextver = extn.header['extver']
+ ref_wcs = rwcs.deepcopy()
+ hdr = extn.header
+ ext_wcs = HSTWCS(fobj=f, ext=i)
+ ### check if it exists first!!!
+ # 'O ' can be safely archived again because it has been restored first.
+ wcsutil.archiveWCS(f, ext=i, wcskey="O", wcsname="OPUS", reusekey=True)
+ ext_wcs.readModel(update=True,header=hdr)
+ for c in allowed_corr:
+ if c != 'NPOLCorr' and c != 'DET2IMCorr':
+ corr_klass = corrections.__getattribute__(c)
+ kw2update = corr_klass.updateWCS(ext_wcs, ref_wcs)
+ for kw in kw2update:
+ hdr[kw] = kw2update[kw]
+ # give the primary WCS a WCSNAME value
+ idcname = f[0].header.get('IDCTAB', " ")
+ if idcname.strip() and 'idc.fits' in idcname:
+ wname = ''.join(['IDC_',
+ utils.extract_rootname(idcname,suffix='_idc')])
+ else: wname = " "
+ hdr['WCSNAME'] = wname
+
+ elif extname in ['err', 'dq', 'sdq', 'samp', 'time']:
+ cextver = extn.header['extver']
+ if cextver == sciextver:
+ hdr = f[('SCI',sciextver)].header
+ w = pywcs.WCS(hdr, f)
+ copyWCS(w, extn.header)
+
+ else:
+ continue
+
+ if 'NPOLCorr' in allowed_corr:
+ kw2update = npol.NPOLCorr.updateWCS(f)
+ for kw in kw2update:
+ f[1].header[kw] = kw2update[kw]
+ # Finally record the version of the software which updated the WCS
+ if 'HISTORY' in f[0].header:
+ f[0].header.set('UPWCSVER', value=stwcs.__version__,
+ comment="Version of STWCS used to updated the WCS",
+ before='HISTORY')
+ f[0].header.set('PYWCSVER', value=astropy.__version__,
+ comment="Version of PYWCS used to updated the WCS",
+ before='HISTORY')
+ elif 'ASN_MTYP' in f[0].header:
+ f[0].header.set('UPWCSVER', value=stwcs.__version__,
+ comment="Version of STWCS used to updated the WCS",
+ after='ASN_MTYP')
+ f[0].header.set('PYWCSVER', value=astropy.__version__,
+ comment="Version of PYWCS used to updated the WCS",
+ after='ASN_MTYP')
+ else:
+ # Find index of last non-blank card, and insert this new keyword after that card
+ for i in range(len(f[0].header) - 1, 0, -1):
+ if f[0].header[i].strip() != '':
+ break
+ f[0].header.set('UPWCSVER', stwcs.__version__,
+ "Version of STWCS used to updated the WCS",
+ after=i)
+ f[0].header.set('PYWCSVER', astropy.__version__,
+ "Version of PYWCS used to updated the WCS",
+ after=i)
+ # add additional keywords to be used by headerlets
+ distdict = utils.construct_distname(f,rwcs)
+ f[0].header['DISTNAME'] = distdict['DISTNAME']
+ f[0].header['SIPNAME'] = distdict['SIPNAME']
+ # Make sure NEXTEND keyword remains accurate
+ f[0].header['NEXTEND'] = len(f)-1
+ f.close()
+
+def copyWCS(w, ehdr):
+ """
+ This is a convenience function to copy a WCS object
+ to a header as a primary WCS. It is used only to copy the
+ WCS of the 'SCI' extension to the headers of 'ERR', 'DQ', 'SDQ',
+ 'TIME' or 'SAMP' extensions.
+ """
+ hwcs = w.to_header()
+
+ if w.wcs.has_cd():
+ wcsutil.pc2cd(hwcs)
+ for k in hwcs.keys():
+ key = k[:7]
+ ehdr[key] = hwcs[k]
+
+def getNrefchip(fobj):
+ """
+
+ Finds which FITS extension holds the reference chip.
+
+ The reference chip has EXTNAME='SCI', can be in any extension and
+ is instrument specific. This functions provides mappings between
+ sci extensions, chips and fits extensions.
+ In the case of a subarray when the reference chip is missing, the
+ first 'SCI' extension is the reference chip.
+
+ Parameters
+ ----------
+ fobj: `astropy.io.fits.HDUList` object
+ """
+ nrefext = 1
+ nrefchip = 1
+ instrument = fobj[0].header['INSTRUME']
+
+ if instrument == 'WFPC2':
+ chipkw = 'DETECTOR'
+ extvers = [("SCI",img.header['EXTVER']) for img in
+ fobj[1:] if img.header['EXTNAME'].lower()=='sci']
+ detectors = [img.header[chipkw] for img in fobj[1:] if
+ img.header['EXTNAME'].lower()=='sci']
+ fitsext = [i for i in range(len(fobj))[1:] if
+ fobj[i].header['EXTNAME'].lower()=='sci']
+ det2ext=dict(list(zip(detectors, extvers)))
+ ext2det=dict(list(zip(extvers, detectors)))
+ ext2fitsext=dict(list(zip(extvers, fitsext)))
+
+ if 3 not in detectors:
+ nrefchip = ext2det.pop(extvers[0])
+ nrefext = ext2fitsext.pop(extvers[0])
+ else:
+ nrefchip = 3
+ extname = det2ext.pop(nrefchip)
+ nrefext = ext2fitsext.pop(extname)
+
+ elif (instrument == 'ACS' and fobj[0].header['DETECTOR'] == 'WFC') or \
+ (instrument == 'WFC3' and fobj[0].header['DETECTOR'] == 'UVIS'):
+ chipkw = 'CCDCHIP'
+ extvers = [("SCI",img.header['EXTVER']) for img in
+ fobj[1:] if img.header['EXTNAME'].lower()=='sci']
+ detectors = [img.header[chipkw] for img in fobj[1:] if
+ img.header['EXTNAME'].lower()=='sci']
+ fitsext = [i for i in range(len(fobj))[1:] if
+ fobj[i].header['EXTNAME'].lower()=='sci']
+ det2ext=dict(list(zip(detectors, extvers)))
+ ext2det=dict(list(zip(extvers, detectors)))
+ ext2fitsext=dict(list(zip(extvers, fitsext)))
+
+ if 2 not in detectors:
+ nrefchip = ext2det.pop(extvers[0])
+ nrefext = ext2fitsext.pop(extvers[0])
+ else:
+ nrefchip = 2
+ extname = det2ext.pop(nrefchip)
+ nrefext = ext2fitsext.pop(extname)
+ else:
+ for i in range(len(fobj)):
+ extname = fobj[i].header.get('EXTNAME', None)
+ if extname != None and extname.lower == 'sci':
+ nrefext = i
+ break
+
+ return nrefchip, nrefext
+
+def checkFiles(input):
+ """
+ Checks that input files are in the correct format.
+ Converts geis and waiver fits files to multiextension fits.
+ """
+ from stsci.tools.check_files import geis2mef, waiver2mef, checkFiles
+ logger.info("\n\tChecking files %s" % input)
+ removed_files = []
+ newfiles = []
+ if not isinstance(input, list):
+ input=[input]
+
+ for file in input:
+ try:
+ imgfits,imgtype = fileutil.isFits(file)
+ except IOError:
+ logger.warning( "\n\tFile %s could not be found, removing it from the input list.\n" %file)
+ removed_files.append(file)
+ continue
+ # Check for existence of waiver FITS input, and quit if found.
+ # Or should we print a warning and continue but not use that file
+ if imgfits:
+ if imgtype == 'waiver':
+ newfilename = waiver2mef(file, convert_dq=True)
+ if newfilename == None:
+ logger.warning("\n\tRemoving file %s from input list - could not convert waiver to mef" %file)
+ removed_files.append(file)
+ else:
+ newfiles.append(newfilename)
+ else:
+ newfiles.append(file)
+
+ # If a GEIS image is provided as input, create a new MEF file with
+ # a name generated using 'buildFITSName()'
+ # Convert the corresponding data quality file if present
+ if not imgfits:
+ newfilename = geis2mef(file, convert_dq=True)
+ if newfilename == None:
+ logger.warning("\n\tRemoving file %s from input list - could not convert geis to mef" %file)
+ removed_files.append(file)
+ else:
+ newfiles.append(newfilename)
+ if removed_files:
+ logger.warning('\n\tThe following files will be removed from the list of files to be processed %s' % removed_files)
+
+ newfiles = checkFiles(newfiles)[0]
+ logger.info("\n\tThese files passed the input check and will be processed: %s" % newfiles)
+ return newfiles
+
+def newIDCTAB(fname):
+ #When this is called we know there's a kw IDCTAB in the header
+ hdul = fits.open(fname)
+ idctab = fileutil.osfn(hdul[0].header['IDCTAB'])
+ try:
+ #check for the presence of IDCTAB in the first extension
+ oldidctab = fileutil.osfn(hdul[1].header['IDCTAB'])
+ except KeyError:
+ return False
+ if idctab == oldidctab:
+ return False
+ else:
+ return True
+
+def cleanWCS(fname):
+ # A new IDCTAB means all previously computed WCS's are invalid
+ # We are deleting all of them except the original OPUS WCS.
+ f = fits.open(fname, mode='update')
+ keys = wcsutil.wcskeys(f[1].header)
+ # Remove the primary WCS from the list
+ try:
+ keys.remove(' ')
+ except ValueError:
+ pass
+ fext = list(range(1, len(f)))
+ for key in keys:
+ try:
+ wcsutil.deleteWCS(fname, ext=fext, wcskey=key)
+ except KeyError:
+ # Some extensions don't have the alternate (or any) WCS keywords
+ continue
+
+def getCorrections(instrument):
+ """
+ Print corrections available for an instrument
+
+ :Parameters:
+ `instrument`: string, one of 'WFPC2', 'NICMOS', 'STIS', 'ACS', 'WFC3'
+ """
+ acorr = apply_corrections.allowed_corrections[instrument]
+
+ print("The following corrections will be performed for instrument %s\n" % instrument)
+ for c in acorr: print(c,': ' , apply_corrections.cnames[c])