diff options
Diffstat (limited to 'lib/stwcs/updatewcs')
-rw-r--r-- | lib/stwcs/updatewcs/__init__.py | 376 | ||||
-rw-r--r-- | lib/stwcs/updatewcs/apply_corrections.py | 248 | ||||
-rw-r--r-- | lib/stwcs/updatewcs/corrections.py | 326 | ||||
-rw-r--r-- | lib/stwcs/updatewcs/det2im.py | 299 | ||||
-rw-r--r-- | lib/stwcs/updatewcs/makewcs.py | 273 | ||||
-rw-r--r-- | lib/stwcs/updatewcs/npol.py | 343 | ||||
-rw-r--r-- | lib/stwcs/updatewcs/utils.py | 264 | ||||
-rw-r--r-- | lib/stwcs/updatewcs/wfpc2_dgeo.py | 123 |
8 files changed, 0 insertions, 2252 deletions
diff --git a/lib/stwcs/updatewcs/__init__.py b/lib/stwcs/updatewcs/__init__.py deleted file mode 100644 index eb5e850..0000000 --- a/lib/stwcs/updatewcs/__init__.py +++ /dev/null @@ -1,376 +0,0 @@ -from __future__ import absolute_import, division, print_function # confidence high - -from astropy.io import fits -from stwcs import wcsutil -from stwcs.wcsutil import HSTWCS -import stwcs - -from astropy import wcs as pywcs -import astropy - -from . import utils, corrections, makewcs -from . import npol, det2im -from stsci.tools import parseinput, fileutil -from . import apply_corrections - -import time -import logging -logger = logging.getLogger('stwcs.updatewcs') - -import atexit -atexit.register(logging.shutdown) - -#Note: The order of corrections is important - -def updatewcs(input, vacorr=True, tddcorr=True, npolcorr=True, d2imcorr=True, - checkfiles=True, verbose=False): - """ - - Updates HST science files with the best available calibration information. - This allows users to retrieve from the archive self contained science files - which do not require additional reference files. - - Basic WCS keywords are updated in the process and new keywords (following WCS - Paper IV and the SIP convention) as well as new extensions are added to the science files. - - - Example - ------- - >>>from stwcs import updatewcs - >>>updatewcs.updatewcs(filename) - - Dependencies - ------------ - `stsci.tools` - `astropy.io.fits` - `astropy.wcs` - - Parameters - ---------- - input: a python list of file names or a string (wild card characters allowed) - input files may be in fits, geis or waiver fits format - vacorr: boolean - If True, vecocity aberration correction will be applied - tddcorr: boolean - If True, time dependent distortion correction will be applied - npolcorr: boolean - If True, a Lookup table distortion will be applied - d2imcorr: boolean - If True, detector to image correction will be applied - checkfiles: boolean - If True, the format of the input files will be checked, - geis and waiver fits files will be converted to MEF format. - Default value is True for standalone mode. - """ - if verbose == False: - logger.setLevel(100) - else: - formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - log_filename = 'stwcs.log' - fh = logging.FileHandler(log_filename, mode='w') - fh.setLevel(logging.DEBUG) - fh.setFormatter(formatter) - logger.addHandler(fh) - logger.setLevel(verbose) - args = "vacorr=%s, tddcorr=%s, npolcorr=%s, d2imcorr=%s, checkfiles=%s, \ - " % (str(vacorr), str(tddcorr), str(npolcorr), - str(d2imcorr), str(checkfiles)) - logger.info('\n\tStarting UPDATEWCS: %s', time.asctime()) - - files = parseinput.parseinput(input)[0] - logger.info("\n\tInput files: %s, " % [i for i in files]) - logger.info("\n\tInput arguments: %s" %args) - if checkfiles: - files = checkFiles(files) - if not files: - print('No valid input, quitting ...\n') - return - - for f in files: - acorr = apply_corrections.setCorrections(f, vacorr=vacorr, \ - tddcorr=tddcorr,npolcorr=npolcorr, d2imcorr=d2imcorr) - if 'MakeWCS' in acorr and newIDCTAB(f): - logger.warning("\n\tNew IDCTAB file detected. All current WCSs will be deleted") - cleanWCS(f) - - makecorr(f, acorr) - - return files - -def makecorr(fname, allowed_corr): - """ - Purpose - ======= - Applies corrections to the WCS of a single file - - :Parameters: - `fname`: string - file name - `acorr`: list - list of corrections to be applied - """ - logger.info("Allowed corrections: {0}".format(allowed_corr)) - f = fits.open(fname, mode='update') - #Determine the reference chip and create the reference HSTWCS object - nrefchip, nrefext = getNrefchip(f) - wcsutil.restoreWCS(f, nrefext, wcskey='O') - rwcs = HSTWCS(fobj=f, ext=nrefext) - rwcs.readModel(update=True,header=f[nrefext].header) - - if 'DET2IMCorr' in allowed_corr: - kw2update = det2im.DET2IMCorr.updateWCS(f) - for kw in kw2update: - f[1].header[kw] = kw2update[kw] - - for i in range(len(f))[1:]: - extn = f[i] - - if 'extname' in extn.header: - extname = extn.header['extname'].lower() - if extname == 'sci': - wcsutil.restoreWCS(f, ext=i, wcskey='O') - sciextver = extn.header['extver'] - ref_wcs = rwcs.deepcopy() - hdr = extn.header - ext_wcs = HSTWCS(fobj=f, ext=i) - ### check if it exists first!!! - # 'O ' can be safely archived again because it has been restored first. - wcsutil.archiveWCS(f, ext=i, wcskey="O", wcsname="OPUS", reusekey=True) - ext_wcs.readModel(update=True,header=hdr) - for c in allowed_corr: - if c != 'NPOLCorr' and c != 'DET2IMCorr': - corr_klass = corrections.__getattribute__(c) - kw2update = corr_klass.updateWCS(ext_wcs, ref_wcs) - for kw in kw2update: - hdr[kw] = kw2update[kw] - # give the primary WCS a WCSNAME value - idcname = f[0].header.get('IDCTAB', " ") - if idcname.strip() and 'idc.fits' in idcname: - wname = ''.join(['IDC_', - utils.extract_rootname(idcname,suffix='_idc')]) - else: wname = " " - hdr['WCSNAME'] = wname - - elif extname in ['err', 'dq', 'sdq', 'samp', 'time']: - cextver = extn.header['extver'] - if cextver == sciextver: - hdr = f[('SCI',sciextver)].header - w = pywcs.WCS(hdr, f) - copyWCS(w, extn.header) - - else: - continue - - if 'NPOLCorr' in allowed_corr: - kw2update = npol.NPOLCorr.updateWCS(f) - for kw in kw2update: - f[1].header[kw] = kw2update[kw] - # Finally record the version of the software which updated the WCS - if 'HISTORY' in f[0].header: - f[0].header.set('UPWCSVER', value=stwcs.__version__, - comment="Version of STWCS used to updated the WCS", - before='HISTORY') - f[0].header.set('PYWCSVER', value=astropy.__version__, - comment="Version of PYWCS used to updated the WCS", - before='HISTORY') - elif 'ASN_MTYP' in f[0].header: - f[0].header.set('UPWCSVER', value=stwcs.__version__, - comment="Version of STWCS used to updated the WCS", - after='ASN_MTYP') - f[0].header.set('PYWCSVER', value=astropy.__version__, - comment="Version of PYWCS used to updated the WCS", - after='ASN_MTYP') - else: - # Find index of last non-blank card, and insert this new keyword after that card - for i in range(len(f[0].header) - 1, 0, -1): - if f[0].header[i].strip() != '': - break - f[0].header.set('UPWCSVER', stwcs.__version__, - "Version of STWCS used to updated the WCS", - after=i) - f[0].header.set('PYWCSVER', astropy.__version__, - "Version of PYWCS used to updated the WCS", - after=i) - # add additional keywords to be used by headerlets - distdict = utils.construct_distname(f,rwcs) - f[0].header['DISTNAME'] = distdict['DISTNAME'] - f[0].header['SIPNAME'] = distdict['SIPNAME'] - # Make sure NEXTEND keyword remains accurate - f[0].header['NEXTEND'] = len(f)-1 - f.close() - -def copyWCS(w, ehdr): - """ - This is a convenience function to copy a WCS object - to a header as a primary WCS. It is used only to copy the - WCS of the 'SCI' extension to the headers of 'ERR', 'DQ', 'SDQ', - 'TIME' or 'SAMP' extensions. - """ - hwcs = w.to_header() - - if w.wcs.has_cd(): - wcsutil.pc2cd(hwcs) - for k in hwcs.keys(): - key = k[:7] - ehdr[key] = hwcs[k] - -def getNrefchip(fobj): - """ - - Finds which FITS extension holds the reference chip. - - The reference chip has EXTNAME='SCI', can be in any extension and - is instrument specific. This functions provides mappings between - sci extensions, chips and fits extensions. - In the case of a subarray when the reference chip is missing, the - first 'SCI' extension is the reference chip. - - Parameters - ---------- - fobj: `astropy.io.fits.HDUList` object - """ - nrefext = 1 - nrefchip = 1 - instrument = fobj[0].header['INSTRUME'] - - if instrument == 'WFPC2': - chipkw = 'DETECTOR' - extvers = [("SCI",img.header['EXTVER']) for img in - fobj[1:] if img.header['EXTNAME'].lower()=='sci'] - detectors = [img.header[chipkw] for img in fobj[1:] if - img.header['EXTNAME'].lower()=='sci'] - fitsext = [i for i in range(len(fobj))[1:] if - fobj[i].header['EXTNAME'].lower()=='sci'] - det2ext=dict(list(zip(detectors, extvers))) - ext2det=dict(list(zip(extvers, detectors))) - ext2fitsext=dict(list(zip(extvers, fitsext))) - - if 3 not in detectors: - nrefchip = ext2det.pop(extvers[0]) - nrefext = ext2fitsext.pop(extvers[0]) - else: - nrefchip = 3 - extname = det2ext.pop(nrefchip) - nrefext = ext2fitsext.pop(extname) - - elif (instrument == 'ACS' and fobj[0].header['DETECTOR'] == 'WFC') or \ - (instrument == 'WFC3' and fobj[0].header['DETECTOR'] == 'UVIS'): - chipkw = 'CCDCHIP' - extvers = [("SCI",img.header['EXTVER']) for img in - fobj[1:] if img.header['EXTNAME'].lower()=='sci'] - detectors = [img.header[chipkw] for img in fobj[1:] if - img.header['EXTNAME'].lower()=='sci'] - fitsext = [i for i in range(len(fobj))[1:] if - fobj[i].header['EXTNAME'].lower()=='sci'] - det2ext=dict(list(zip(detectors, extvers))) - ext2det=dict(list(zip(extvers, detectors))) - ext2fitsext=dict(list(zip(extvers, fitsext))) - - if 2 not in detectors: - nrefchip = ext2det.pop(extvers[0]) - nrefext = ext2fitsext.pop(extvers[0]) - else: - nrefchip = 2 - extname = det2ext.pop(nrefchip) - nrefext = ext2fitsext.pop(extname) - else: - for i in range(len(fobj)): - extname = fobj[i].header.get('EXTNAME', None) - if extname != None and extname.lower == 'sci': - nrefext = i - break - - return nrefchip, nrefext - -def checkFiles(input): - """ - Checks that input files are in the correct format. - Converts geis and waiver fits files to multiextension fits. - """ - from stsci.tools.check_files import geis2mef, waiver2mef, checkFiles - logger.info("\n\tChecking files %s" % input) - removed_files = [] - newfiles = [] - if not isinstance(input, list): - input=[input] - - for file in input: - try: - imgfits,imgtype = fileutil.isFits(file) - except IOError: - logger.warning( "\n\tFile %s could not be found, removing it from the input list.\n" %file) - removed_files.append(file) - continue - # Check for existence of waiver FITS input, and quit if found. - # Or should we print a warning and continue but not use that file - if imgfits: - if imgtype == 'waiver': - newfilename = waiver2mef(file, convert_dq=True) - if newfilename == None: - logger.warning("\n\tRemoving file %s from input list - could not convert waiver to mef" %file) - removed_files.append(file) - else: - newfiles.append(newfilename) - else: - newfiles.append(file) - - # If a GEIS image is provided as input, create a new MEF file with - # a name generated using 'buildFITSName()' - # Convert the corresponding data quality file if present - if not imgfits: - newfilename = geis2mef(file, convert_dq=True) - if newfilename == None: - logger.warning("\n\tRemoving file %s from input list - could not convert geis to mef" %file) - removed_files.append(file) - else: - newfiles.append(newfilename) - if removed_files: - logger.warning('\n\tThe following files will be removed from the list of files to be processed %s' % removed_files) - - newfiles = checkFiles(newfiles)[0] - logger.info("\n\tThese files passed the input check and will be processed: %s" % newfiles) - return newfiles - -def newIDCTAB(fname): - #When this is called we know there's a kw IDCTAB in the header - hdul = fits.open(fname) - idctab = fileutil.osfn(hdul[0].header['IDCTAB']) - try: - #check for the presence of IDCTAB in the first extension - oldidctab = fileutil.osfn(hdul[1].header['IDCTAB']) - except KeyError: - return False - if idctab == oldidctab: - return False - else: - return True - -def cleanWCS(fname): - # A new IDCTAB means all previously computed WCS's are invalid - # We are deleting all of them except the original OPUS WCS. - f = fits.open(fname, mode='update') - keys = wcsutil.wcskeys(f[1].header) - # Remove the primary WCS from the list - try: - keys.remove(' ') - except ValueError: - pass - fext = list(range(1, len(f))) - for key in keys: - try: - wcsutil.deleteWCS(fname, ext=fext, wcskey=key) - except KeyError: - # Some extensions don't have the alternate (or any) WCS keywords - continue - -def getCorrections(instrument): - """ - Print corrections available for an instrument - - :Parameters: - `instrument`: string, one of 'WFPC2', 'NICMOS', 'STIS', 'ACS', 'WFC3' - """ - acorr = apply_corrections.allowed_corrections[instrument] - - print("The following corrections will be performed for instrument %s\n" % instrument) - for c in acorr: print(c,': ' , apply_corrections.cnames[c]) diff --git a/lib/stwcs/updatewcs/apply_corrections.py b/lib/stwcs/updatewcs/apply_corrections.py deleted file mode 100644 index 86b623a..0000000 --- a/lib/stwcs/updatewcs/apply_corrections.py +++ /dev/null @@ -1,248 +0,0 @@ -from __future__ import division, print_function # confidence high - -import os -from astropy.io import fits -import time -from stsci.tools import fileutil -import os.path -from stwcs.wcsutil import altwcs -from . import utils -from . import wfpc2_dgeo - -import logging -logger = logging.getLogger("stwcs.updatewcs.apply_corrections") - -#Note: The order of corrections is important - - -# A dictionary which lists the allowed corrections for each instrument. -# These are the default corrections applied also in the pipeline. - -allowed_corrections={'WFPC2': ['DET2IMCorr', 'MakeWCS','CompSIP', 'VACorr'], - 'ACS': ['DET2IMCorr', 'TDDCorr', 'MakeWCS', 'CompSIP','VACorr', 'NPOLCorr'], - 'STIS': ['MakeWCS', 'CompSIP','VACorr'], - 'NICMOS': ['MakeWCS', 'CompSIP','VACorr'], - 'WFC3': ['DET2IMCorr', 'MakeWCS', 'CompSIP','VACorr', 'NPOLCorr'], - } - -cnames = {'DET2IMCorr': 'Detector to Image Correction', - 'TDDCorr': 'Time Dependent Distortion Correction', - 'MakeWCS': 'Recalculate basic WCS keywords based on the distortion model', - 'CompSIP': 'Given IDCTAB distortion model calculate the SIP coefficients', - 'VACorr': 'Velocity Aberration Correction', - 'NPOLCorr': 'Lookup Table Distortion' - } - -def setCorrections(fname, vacorr=True, tddcorr=True, npolcorr=True, d2imcorr=True): - """ - Creates a list of corrections to be applied to a file - based on user input paramters and allowed corrections - for the instrument. - """ - instrument = fits.getval(fname, 'INSTRUME') - # make a copy of this list ! - acorr = allowed_corrections[instrument][:] - - # For WFPC2 images, the old-style DGEOFILE needs to be - # converted on-the-fly into a proper D2IMFILE here... - if instrument == 'WFPC2': - # check for DGEOFILE, and convert it to D2IMFILE if found - d2imfile = wfpc2_dgeo.update_wfpc2_d2geofile(fname) - # Check if idctab is present on disk - # If kw IDCTAB is present in the header but the file is - # not found on disk, do not run TDDCorr, MakeCWS and CompSIP - if not foundIDCTAB(fname): - if 'TDDCorr' in acorr: acorr.remove('TDDCorr') - if 'MakeWCS' in acorr: acorr.remove('MakeWCS') - if 'CompSIP' in acorr: acorr.remove('CompSIP') - - if 'VACorr' in acorr and vacorr==False: acorr.remove('VACorr') - if 'TDDCorr' in acorr: - tddcorr = applyTDDCorr(fname, tddcorr) - if tddcorr == False: acorr.remove('TDDCorr') - - if 'NPOLCorr' in acorr: - npolcorr = applyNpolCorr(fname, npolcorr) - if npolcorr == False: acorr.remove('NPOLCorr') - if 'DET2IMCorr' in acorr: - d2imcorr = applyD2ImCorr(fname, d2imcorr) - if d2imcorr == False: acorr.remove('DET2IMCorr') - logger.info("\n\tCorrections to be applied to %s: %s " % (fname, str(acorr))) - return acorr - - -def foundIDCTAB(fname): - """ - This functions looks for an "IDCTAB" keyword in the primary header. - - Returns - ------- - status : bool - If False : MakeWCS, CompSIP and TDDCorr should not be applied. - If True : there's no restriction on corrections, they all should be applied. - - Raises - ------ - IOError : If IDCTAB file not found on disk. - """ - - try: - idctab = fits.getval(fname, 'IDCTAB').strip() - if idctab == 'N/A' or idctab == "": - return False - except KeyError: - return False - idctab = fileutil.osfn(idctab) - if os.path.exists(idctab): - return True - else: - raise IOError("IDCTAB file {0} not found".format(idctab)) - - -def applyTDDCorr(fname, utddcorr): - """ - The default value of tddcorr for all ACS images is True. - This correction will be performed if all conditions below are True: - - the user did not turn it off on the command line - - the detector is WFC - - the idc table specified in the primary header is available. - """ - - phdr = fits.getheader(fname) - instrument = phdr['INSTRUME'] - try: - detector = phdr['DETECTOR'] - except KeyError: - detector = None - try: - tddswitch = phdr['TDDCORR'] - except KeyError: - tddswitch = 'PERFORM' - - if instrument == 'ACS' and detector == 'WFC' and utddcorr == True and tddswitch == 'PERFORM': - tddcorr = True - try: - idctab = phdr['IDCTAB'] - except KeyError: - tddcorr = False - #print "***IDCTAB keyword not found - not applying TDD correction***\n" - if os.path.exists(fileutil.osfn(idctab)): - tddcorr = True - else: - tddcorr = False - #print "***IDCTAB file not found - not applying TDD correction***\n" - else: - tddcorr = False - return tddcorr - -def applyNpolCorr(fname, unpolcorr): - """ - Determines whether non-polynomial distortion lookup tables should be added - as extensions to the science file based on the 'NPOLFILE' keyword in the - primary header and NPOLEXT kw in the first extension. - This is a default correction and will always run in the pipeline. - The file used to generate the extensions is - recorded in the NPOLEXT keyword in the first science extension. - If 'NPOLFILE' in the primary header is different from 'NPOLEXT' in the - extension header and the file exists on disk and is a 'new type' npolfile, - then the lookup tables will be updated as 'WCSDVARR' extensions. - """ - applyNPOLCorr = True - try: - # get NPOLFILE kw from primary header - fnpol0 = fits.getval(fname, 'NPOLFILE') - if fnpol0 == 'N/A': - utils.remove_distortion(fname, "NPOLFILE") - return False - fnpol0 = fileutil.osfn(fnpol0) - if not fileutil.findFile(fnpol0): - msg = """\n\tKw "NPOLFILE" exists in primary header but file %s not found - Non-polynomial distortion correction will not be applied\n - """ % fnpol0 - logger.critical(msg) - raise IOError("NPOLFILE {0} not found".format(fnpol0)) - try: - # get NPOLEXT kw from first extension header - fnpol1 = fits.getval(fname, 'NPOLEXT', ext=1) - fnpol1 = fileutil.osfn(fnpol1) - if fnpol1 and fileutil.findFile(fnpol1): - if fnpol0 != fnpol1: - applyNPOLCorr = True - else: - msg = """\n\tNPOLEXT with the same value as NPOLFILE found in first extension. - NPOL correction will not be applied.""" - logger.info(msg) - applyNPOLCorr = False - else: - # npl file defined in first extension may not be found - # but if a valid kw exists in the primary header, non-polynomial - #distortion correction should be applied. - applyNPOLCorr = True - except KeyError: - # the case of "NPOLFILE" kw present in primary header but "NPOLEXT" missing - # in first extension header - applyNPOLCorr = True - except KeyError: - logger.info('\n\t"NPOLFILE" keyword not found in primary header') - applyNPOLCorr = False - return applyNPOLCorr - - if isOldStyleDGEO(fname, fnpol0): - applyNPOLCorr = False - return (applyNPOLCorr and unpolcorr) - -def isOldStyleDGEO(fname, dgname): - # checks if the file defined in a NPOLFILE kw is a full size - # (old style) image - - sci_hdr = fits.getheader(fname, ext=1) - dgeo_hdr = fits.getheader(dgname, ext=1) - sci_naxis1 = sci_hdr['NAXIS1'] - sci_naxis2 = sci_hdr['NAXIS2'] - dg_naxis1 = dgeo_hdr['NAXIS1'] - dg_naxis2 = dgeo_hdr['NAXIS2'] - if sci_naxis1 <= dg_naxis1 or sci_naxis2 <= dg_naxis2: - msg = """\n\tOnly full size (old style) DGEO file was found.\n - Non-polynomial distortion correction will not be applied.""" - logger.critical(msg) - return True - else: - return False - -def applyD2ImCorr(fname, d2imcorr): - applyD2IMCorr = True - try: - # get D2IMFILE kw from primary header - fd2im0 = fits.getval(fname, 'D2IMFILE') - if fd2im0 == 'N/A': - utils.remove_distortion(fname, "D2IMFILE") - return False - fd2im0 = fileutil.osfn(fd2im0) - if not fileutil.findFile(fd2im0): - msg = """\n\tKw D2IMFILE exists in primary header but file %s not found\n - Detector to image correction will not be applied\n""" % fd2im0 - logger.critical(msg) - print(msg) - raise IOError("D2IMFILE {0} not found".format(fd2im0)) - try: - # get D2IMEXT kw from first extension header - fd2imext = fits.getval(fname, 'D2IMEXT', ext=1) - fd2imext = fileutil.osfn(fd2imext) - if fd2imext and fileutil.findFile(fd2imext): - if fd2im0 != fd2imext: - applyD2IMCorr = True - else: - applyD2IMCorr = False - else: - # D2IM file defined in first extension may not be found - # but if a valid kw exists in the primary header, - # detector to image correction should be applied. - applyD2IMCorr = True - except KeyError: - # the case of D2IMFILE kw present in primary header but D2IMEXT missing - # in first extension header - applyD2IMCorr = True - except KeyError: - print('D2IMFILE keyword not found in primary header') - applyD2IMCorr = False - return applyD2IMCorr diff --git a/lib/stwcs/updatewcs/corrections.py b/lib/stwcs/updatewcs/corrections.py deleted file mode 100644 index d3641eb..0000000 --- a/lib/stwcs/updatewcs/corrections.py +++ /dev/null @@ -1,326 +0,0 @@ -from __future__ import division, print_function # confidence high - -import copy -import datetime -import logging, time -import numpy as np -from numpy import linalg -from stsci.tools import fileutil - -from . import npol -from . import makewcs -from .utils import diff_angles - -logger=logging.getLogger('stwcs.updatewcs.corrections') - -MakeWCS = makewcs.MakeWCS -NPOLCorr = npol.NPOLCorr - -class TDDCorr(object): - """ - Apply time dependent distortion correction to distortion coefficients and basic - WCS keywords. This correction **must** be done before any other WCS correction. - - Parameters - ---------- - ext_wcs: HSTWCS object - An HSTWCS object to be modified - ref_wcs: HSTWCS object - A reference HSTWCS object - - Notes - ----- - Compute the ACS/WFC time dependent distortion terms as described - in [1]_ and apply the correction to the WCS of the observation. - - The model coefficients are stored in the primary header of the IDCTAB. - :math:`D_{ref}` is the reference date. The computed corrections are saved - in the science extension header as TDDALPHA and TDDBETA keywords. - - .. math:: TDDALPHA = A_{0} + {A_{1}*(obsdate - D_{ref})} - - .. math:: TDDBETA = B_{0} + B_{1}*(obsdate - D_{ref}) - - - The time dependent distortion affects the IDCTAB coefficients, and the - relative location of the two chips. Because the linear order IDCTAB - coefficients ar eused in the computatuion of the NPOL extensions, - the TDD correction affects all components of the distortion model. - - Application of TDD to the IDCTAB polynomial coefficients: - The TDD model is computed in Jay's frame, while the IDCTAB coefficients - are in the HST V2/V3 frame. The coefficients are transformed to Jay's frame, - TDD is applied and they are transformed back to the V2/V3 frame. This - correction is performed in this class. - - Application of TDD to the relative location of the two chips is - done in `makewcs`. - - References - ---------- - .. [1] Jay Anderson, "Variation of the Distortion Solution of the WFC", ACS ISR 2007-08. - - """ - def updateWCS(cls, ext_wcs, ref_wcs): - """ - - Calculates alpha and beta for ACS/WFC data. - - Writes 2 new kw to the extension header: TDDALPHA and TDDBETA - """ - logger.info("\n\tStarting TDDCorr: %s" % time.asctime()) - ext_wcs.idcmodel.ocx = copy.deepcopy(ext_wcs.idcmodel.cx) - ext_wcs.idcmodel.ocy = copy.deepcopy(ext_wcs.idcmodel.cy) - - newkw = {'TDDALPHA': None, 'TDDBETA':None, - 'OCX10':ext_wcs.idcmodel.ocx[1,0], - 'OCX11':ext_wcs.idcmodel.ocx[1,1], - 'OCY10':ext_wcs.idcmodel.ocy[1,0], - 'OCY11':ext_wcs.idcmodel.ocy[1,1], - 'TDD_CTA':None, 'TDD_CTB':None, - 'TDD_CYA':None, 'TDD_CYB':None, - 'TDD_CXA':None, 'TDD_CXB':None} - - if ext_wcs.idcmodel.refpix['skew_coeffs']is not None and \ - ext_wcs.idcmodel.refpix['skew_coeffs']['TDD_CTB'] is not None: - cls.apply_tdd2idc2015(ref_wcs) - cls.apply_tdd2idc2015(ext_wcs) - - newkw.update({ - 'TDD_CTA':ext_wcs.idcmodel.refpix['skew_coeffs']['TDD_CTA'], - 'TDD_CYA':ext_wcs.idcmodel.refpix['skew_coeffs']['TDD_CYA'], - 'TDD_CXA':ext_wcs.idcmodel.refpix['skew_coeffs']['TDD_CXA'], - 'TDD_CTB':ext_wcs.idcmodel.refpix['skew_coeffs']['TDD_CTB'], - 'TDD_CYB':ext_wcs.idcmodel.refpix['skew_coeffs']['TDD_CYB'], - 'TDD_CXB':ext_wcs.idcmodel.refpix['skew_coeffs']['TDD_CXB']}) - - elif ext_wcs.idcmodel.refpix['skew_coeffs']is not None and \ - ext_wcs.idcmodel.refpix['skew_coeffs']['TDD_CY_BETA'] is not None: - logger.info("\n\t Applying 2014-calibrated TDD: %s" % time.asctime()) - # We have 2014-calibrated TDD, not J.A.-style TDD - cls.apply_tdd2idc2(ref_wcs) - cls.apply_tdd2idc2(ext_wcs) - newkw.update({'TDD_CYA':ext_wcs.idcmodel.refpix['skew_coeffs']['TDD_CY_ALPHA'], - 'TDD_CYB':ext_wcs.idcmodel.refpix['skew_coeffs']['TDD_CY_BETA'], - 'TDD_CXA':ext_wcs.idcmodel.refpix['skew_coeffs']['TDD_CX_ALPHA'], - 'TDD_CXB':ext_wcs.idcmodel.refpix['skew_coeffs']['TDD_CX_BETA']}) - - else: - alpha, beta = cls.compute_alpha_beta(ext_wcs) - cls.apply_tdd2idc(ref_wcs, alpha, beta) - cls.apply_tdd2idc(ext_wcs, alpha, beta) - ext_wcs.idcmodel.refpix['TDDALPHA'] = alpha - ext_wcs.idcmodel.refpix['TDDBETA'] = beta - ref_wcs.idcmodel.refpix['TDDALPHA'] = alpha - ref_wcs.idcmodel.refpix['TDDBETA'] = beta - - newkw.update( {'TDDALPHA': alpha, 'TDDBETA':beta} ) - - return newkw - updateWCS = classmethod(updateWCS) - - def apply_tdd2idc2015(cls, hwcs): - """ Applies 2015-calibrated TDD correction to a couple of IDCTAB - coefficients for ACS/WFC observations. - """ - if not isinstance(hwcs.date_obs,float): - year,month,day = hwcs.date_obs.split('-') - rdate = datetime.datetime(int(year),int(month),int(day)) - rday = float(rdate.strftime("%j"))/365.25 + rdate.year - else: - rday = hwcs.date_obs - - skew_coeffs = hwcs.idcmodel.refpix['skew_coeffs'] - delta_date = rday - skew_coeffs['TDD_DATE'] - - if skew_coeffs['TDD_CXB'] is not None: - hwcs.idcmodel.cx[1,1] += skew_coeffs['TDD_CXB']*delta_date - if skew_coeffs['TDD_CTB'] is not None: - hwcs.idcmodel.cy[1,1] += skew_coeffs['TDD_CTB']*delta_date - if skew_coeffs['TDD_CYB'] is not None: - hwcs.idcmodel.cy[1,0] += skew_coeffs['TDD_CYB']*delta_date - #print("CX[1,1]_TDD={}, CY[1,1]_TDD={}, CY[1,0]_TDD={}".format(hwcs.idcmodel.cx[1,1],hwcs.idcmodel.cy[1,1],hwcs.idcmodel.cy[1,0])) - - apply_tdd2idc2015 = classmethod(apply_tdd2idc2015) - - def apply_tdd2idc2(cls, hwcs): - """ Applies 2014-calibrated TDD correction to single IDCTAB coefficient - of an ACS/WFC observation. - """ - if not isinstance(hwcs.date_obs,float): - year,month,day = hwcs.date_obs.split('-') - rdate = datetime.datetime(int(year),int(month),int(day)) - rday = float(rdate.strftime("%j"))/365.25 + rdate.year - else: - rday = hwcs.date_obs - - skew_coeffs = hwcs.idcmodel.refpix['skew_coeffs'] - cy_beta = skew_coeffs['TDD_CY_BETA'] - cy_alpha = skew_coeffs['TDD_CY_ALPHA'] - delta_date = rday - skew_coeffs['TDD_DATE'] - print("DELTA_DATE: {0} based on rday: {1}, TDD_DATE: {2}".format(delta_date,rday,skew_coeffs['TDD_DATE'])) - - if cy_alpha is None: - hwcs.idcmodel.cy[1,1] += cy_beta*delta_date - else: - new_beta = cy_alpha + cy_beta*delta_date - hwcs.idcmodel.cy[1,1] = new_beta - print("CY11: {0} based on alpha: {1}, beta: {2}".format(hwcs.idcmodel.cy[1,1],cy_alpha,cy_beta)) - - cx_beta = skew_coeffs['TDD_CX_BETA'] - cx_alpha = skew_coeffs['TDD_CX_ALPHA'] - if cx_alpha is not None: - new_beta = cx_alpha + cx_beta*delta_date - hwcs.idcmodel.cx[1,1] = new_beta - print("CX11: {0} based on alpha: {1}, beta: {2}".format(new_beta,cx_alpha,cx_beta)) - - apply_tdd2idc2 = classmethod(apply_tdd2idc2) - - def apply_tdd2idc(cls, hwcs, alpha, beta): - """ - Applies TDD to the idctab coefficients of a ACS/WFC observation. - This should be always the first correction. - """ - theta_v2v3 = 2.234529 - mrotp = fileutil.buildRotMatrix(theta_v2v3) - mrotn = fileutil.buildRotMatrix(-theta_v2v3) - tdd_mat = np.array([[1+(beta/2048.), alpha/2048.],[alpha/2048.,1-(beta/2048.)]],np.float64) - abmat1 = np.dot(tdd_mat, mrotn) - abmat2 = np.dot(mrotp,abmat1) - xshape, yshape = hwcs.idcmodel.cx.shape, hwcs.idcmodel.cy.shape - icxy = np.dot(abmat2,[hwcs.idcmodel.cx.ravel(), hwcs.idcmodel.cy.ravel()]) - hwcs.idcmodel.cx = icxy[0] - hwcs.idcmodel.cy = icxy[1] - hwcs.idcmodel.cx.shape = xshape - hwcs.idcmodel.cy.shape = yshape - - apply_tdd2idc = classmethod(apply_tdd2idc) - - def compute_alpha_beta(cls, ext_wcs): - """ - Compute the ACS time dependent distortion skew terms - as described in ACS ISR 07-08 by J. Anderson. - - Jay's code only computes the alpha/beta values based on a decimal year - with only 3 digits, so this line reproduces that when needed for comparison - with his results. - rday = float(('%0.3f')%rday) - - The zero-point terms account for the skew accumulated between - 2002.0 and 2004.5, when the latest IDCTAB was delivered. - alpha = 0.095 + 0.090*(rday-dday)/2.5 - beta = -0.029 - 0.030*(rday-dday)/2.5 - """ - if not isinstance(ext_wcs.date_obs,float): - year,month,day = ext_wcs.date_obs.split('-') - rdate = datetime.datetime(int(year),int(month),int(day)) - rday = float(rdate.strftime("%j"))/365.25 + rdate.year - else: - rday = ext_wcs.date_obs - - skew_coeffs = ext_wcs.idcmodel.refpix['skew_coeffs'] - if skew_coeffs is None: - # Only print out warning for post-SM4 data where this may matter - if rday > 2009.0: - err_str = "------------------------------------------------------------------------ \n" - err_str += "WARNING: the IDCTAB geometric distortion file specified in the image \n" - err_str += " header did not have the time-dependent distortion coefficients. \n" - err_str += " The pre-SM4 time-dependent skew solution will be used by default.\n" - err_str += " Please update IDCTAB with new reference file from HST archive. \n" - err_str += "------------------------------------------------------------------------ \n" - print(err_str) - # Using default pre-SM4 coefficients - skew_coeffs = {'TDD_A':[0.095,0.090/2.5], - 'TDD_B':[-0.029,-0.030/2.5], - 'TDD_DATE':2004.5,'TDDORDER':1} - - alpha = 0 - beta = 0 - # Compute skew terms, allowing for non-linear coefficients as well - for c in range(skew_coeffs['TDDORDER']+1): - alpha += skew_coeffs['TDD_A'][c]* np.power((rday-skew_coeffs['TDD_DATE']),c) - beta += skew_coeffs['TDD_B'][c]*np.power((rday-skew_coeffs['TDD_DATE']),c) - - return alpha,beta - compute_alpha_beta = classmethod(compute_alpha_beta) - - -class VACorr(object): - """ - Apply velocity aberation correction to WCS keywords. - - Notes - ----- - Velocity Aberration is stored in the extension header keyword 'VAFACTOR'. - The correction is applied to the CD matrix and CRVALs. - - """ - def updateWCS(cls, ext_wcs, ref_wcs): - logger.info("\n\tStarting VACorr: %s" % time.asctime()) - if ext_wcs.vafactor != 1: - ext_wcs.wcs.cd = ext_wcs.wcs.cd * ext_wcs.vafactor - crval0 = ref_wcs.wcs.crval[0] + ext_wcs.vafactor*diff_angles(ext_wcs.wcs.crval[0], - ref_wcs.wcs.crval[0]) - crval1 = ref_wcs.wcs.crval[1] + ext_wcs.vafactor*diff_angles(ext_wcs.wcs.crval[1], - ref_wcs.wcs.crval[1]) - crval = np.array([crval0,crval1]) - ext_wcs.wcs.crval = crval - ext_wcs.wcs.set() - else: - pass - kw2update={'CD1_1': ext_wcs.wcs.cd[0,0], 'CD1_2':ext_wcs.wcs.cd[0,1], - 'CD2_1':ext_wcs.wcs.cd[1,0], 'CD2_2':ext_wcs.wcs.cd[1,1], - 'CRVAL1':ext_wcs.wcs.crval[0], 'CRVAL2':ext_wcs.wcs.crval[1]} - return kw2update - - updateWCS = classmethod(updateWCS) - - -class CompSIP(object): - """ - Compute Simple Imaging Polynomial (SIP) coefficients as defined in [2]_ - from IDC table coefficients. - - This class transforms the TDD corrected IDCTAB coefficients into SIP format. - It also applies a binning factor to the coefficients if the observation was - binned. - - References - ---------- - .. [2] David Shupe, et al, "The SIP Convention of representing Distortion - in FITS Image headers", Astronomical Data Analysis Software And Systems, ASP - Conference Series, Vol. 347, 2005 - - """ - def updateWCS(cls, ext_wcs, ref_wcs): - logger.info("\n\tStarting CompSIP: %s" %time.asctime()) - kw2update = {} - if not ext_wcs.idcmodel: - logger.info("IDC model not found, SIP coefficient will not be computed") - return kw2update - order = ext_wcs.idcmodel.norder - kw2update['A_ORDER'] = order - kw2update['B_ORDER'] = order - pscale = ext_wcs.idcmodel.refpix['PSCALE'] - - cx = ext_wcs.idcmodel.cx - cy = ext_wcs.idcmodel.cy - - matr = np.array([[cx[1,1],cx[1,0]], [cy[1,1],cy[1,0]]], dtype=np.float64) - imatr = linalg.inv(matr) - akeys1 = np.zeros((order+1,order+1), dtype=np.float64) - bkeys1 = np.zeros((order+1,order+1), dtype=np.float64) - for n in range(order+1): - for m in range(order+1): - if n >= m and n>=2: - idcval = np.array([[cx[n,m]],[cy[n,m]]]) - sipval = np.dot(imatr, idcval) - akeys1[m,n-m] = sipval[0] - bkeys1[m,n-m] = sipval[1] - Akey="A_%d_%d" % (m,n-m) - Bkey="B_%d_%d" % (m,n-m) - kw2update[Akey] = sipval[0,0] * ext_wcs.binned - kw2update[Bkey] = sipval[1,0] * ext_wcs.binned - kw2update['CTYPE1'] = 'RA---TAN-SIP' - kw2update['CTYPE2'] = 'DEC--TAN-SIP' - return kw2update - - updateWCS = classmethod(updateWCS) diff --git a/lib/stwcs/updatewcs/det2im.py b/lib/stwcs/updatewcs/det2im.py deleted file mode 100644 index bddb37b..0000000 --- a/lib/stwcs/updatewcs/det2im.py +++ /dev/null @@ -1,299 +0,0 @@ -from __future__ import absolute_import, division # confidence high - -import numpy as np -from astropy.io import fits -from stsci.tools import fileutil - -from . import utils - -import logging, time -logger = logging.getLogger('stwcs.updatewcs.d2im') - -class DET2IMCorr(object): - """ - Defines a Lookup table prior distortion correction as per WCS paper IV. - It uses a reference file defined by the D2IMFILE (suffix 'd2im') keyword - in the primary header. - - Notes - ----- - - Using extensions in the reference file create a WCSDVARR extensions - and add them to the science file. - - Add record-valued keywords to the science extension header to describe - the lookup tables. - - Add a keyword 'D2IMEXT' to the science extension header to store - the name of the reference file used to create the WCSDVARR extensions. - - If WCSDVARR extensions exist and `D2IMFILE` is different from `D2IMEXT`, - a subsequent update will overwrite the existing extensions. - If WCSDVARR extensions were not found in the science file, they will be added. - - """ - - def updateWCS(cls, fobj): - """ - Parameters - ---------- - fobj: `astropy.io.fits.HDUList` object - Science file, for which a distortion correction in a NPOLFILE is available - - """ - logger.info("\n\tStarting DET2IM: %s" %time.asctime()) - try: - assert isinstance(fobj, fits.HDUList) - except AssertionError: - logger.exception('\n\tInput must be a fits.HDUList object') - raise - - cls.applyDet2ImCorr(fobj) - d2imfile = fobj[0].header['D2IMFILE'] - - new_kw = {'D2IMEXT': d2imfile} - return new_kw - - updateWCS = classmethod(updateWCS) - - def applyDet2ImCorr(cls, fobj): - """ - For each science extension in a fits file object: - - create a WCSDVARR extension - - update science header - - add/update D2IMEXT keyword - """ - d2imfile = fileutil.osfn(fobj[0].header['D2IMFILE']) - # Map D2IMARR EXTVER numbers to FITS extension numbers - wcsdvarr_ind = cls.getWCSIndex(fobj) - d2im_num_ext = 1 - for ext in fobj: - try: - extname = ext.header['EXTNAME'].lower() - except KeyError: - continue - if extname == 'sci': - extversion = ext.header['EXTVER'] - ccdchip = cls.get_ccdchip(fobj, extname='SCI', extver=extversion) - header = ext.header - # get the data arrays from the reference file - dx, dy = cls.getData(d2imfile, ccdchip) - # Determine EXTVER for the D2IMARR extension from the D2I file (EXTNAME, EXTVER) kw. - # This is used to populate DPj.EXTVER kw - for ename in zip(['DX', 'DY'], [dx, dy]): - if ename[1] is not None: - error_val = ename[1].max() - cls.addSciExtKw(header, wdvarr_ver=d2im_num_ext, d2im_extname=ename[0], error_val=error_val) - hdu = cls.createD2ImHDU(header, d2imfile=d2imfile, - wdvarr_ver=d2im_num_ext, - d2im_extname=ename[0], - data=ename[1],ccdchip=ccdchip) - if wcsdvarr_ind and d2im_num_ext in wcsdvarr_ind: - fobj[wcsdvarr_ind[d2im_num_ext]] = hdu - else: - fobj.append(hdu) - d2im_num_ext = d2im_num_ext + 1 - applyDet2ImCorr = classmethod(applyDet2ImCorr) - - def getWCSIndex(cls, fobj): - - """ - If fobj has WCSDVARR extensions: - returns a mapping of their EXTVER kw to file object extension numbers - if fobj does not have WCSDVARR extensions: - an empty dictionary is returned - """ - wcsd = {} - for e in range(len(fobj)): - try: - ename = fobj[e].header['EXTNAME'] - except KeyError: - continue - if ename == 'D2IMARR': - wcsd[fobj[e].header['EXTVER']] = e - logger.debug("A map of D2IMARR extensions %s" % wcsd) - return wcsd - - getWCSIndex = classmethod(getWCSIndex) - - def addSciExtKw(cls, hdr, wdvarr_ver=None, d2im_extname=None, error_val=0.0): - """ - Adds kw to sci extension to define WCSDVARR lookup table extensions - - """ - if d2im_extname =='DX': - j=1 - else: - j=2 - - d2imerror = 'D2IMERR%s' %j - d2imdis = 'D2IMDIS%s' %j - d2imext = 'D2IM%s.' %j + 'EXTVER' - d2imnaxes = 'D2IM%s.' %j +'NAXES' - d2imaxis1 = 'D2IM%s.' %j+'AXIS.1' - d2imaxis2 = 'D2IM%s.' %j+'AXIS.2' - keys = [d2imerror, d2imdis, d2imext, d2imnaxes, d2imaxis1, d2imaxis2] - values = {d2imerror: error_val, - d2imdis: 'Lookup', - d2imext: wdvarr_ver, - d2imnaxes: 2, - d2imaxis1: 1, - d2imaxis2: 2} - - comments = {d2imerror: 'Maximum error of NPOL correction for axis %s' % j, - d2imdis: 'Detector to image correction type', - d2imext: 'Version number of WCSDVARR extension containing d2im lookup table', - d2imnaxes: 'Number of independent variables in d2im function', - d2imaxis1: 'Axis number of the jth independent variable in a d2im function', - d2imaxis2: 'Axis number of the jth independent variable in a d2im function' - } - # Look for HISTORY keywords. If present, insert new keywords before them - before_key = 'HISTORY' - if before_key not in hdr: - before_key = None - - for key in keys: - hdr.set(key, value=values[key], comment=comments[key], before=before_key) - - addSciExtKw = classmethod(addSciExtKw) - - def getData(cls,d2imfile, ccdchip): - """ - Get the data arrays from the reference D2I files - Make sure 'CCDCHIP' in the npolfile matches "CCDCHIP' in the science file. - """ - xdata, ydata = (None, None) - d2im = fits.open(d2imfile) - for ext in d2im: - d2imextname = ext.header.get('EXTNAME',"") - d2imccdchip = ext.header.get('CCDCHIP',1) - if d2imextname == 'DX' and d2imccdchip == ccdchip: - xdata = ext.data.copy() - continue - elif d2imextname == 'DY' and d2imccdchip == ccdchip: - ydata = ext.data.copy() - continue - else: - continue - d2im.close() - return xdata, ydata - getData = classmethod(getData) - - def createD2ImHDU(cls, sciheader, d2imfile=None, wdvarr_ver=1, - d2im_extname=None,data = None, ccdchip=1): - """ - Creates an HDU to be added to the file object. - """ - hdr = cls.createD2ImHdr(sciheader, d2imfile=d2imfile, - wdvarr_ver=wdvarr_ver, d2im_extname=d2im_extname, - ccdchip=ccdchip) - hdu = fits.ImageHDU(header=hdr, data=data) - return hdu - - createD2ImHDU = classmethod(createD2ImHDU) - - def createD2ImHdr(cls, sciheader, d2imfile, wdvarr_ver, d2im_extname, ccdchip): - """ - Creates a header for the D2IMARR extension based on the D2I reference file - and sci extension header. The goal is to always work in image coordinates - (also for subarrays and binned images). The WCS for the D2IMARR extension - is such that a full size d2im table is created and then shifted or scaled - if the science image is a subarray or binned image. - """ - d2im = fits.open(d2imfile) - d2im_phdr = d2im[0].header - for ext in d2im: - try: - d2imextname = ext.header['EXTNAME'] - d2imextver = ext.header['EXTVER'] - except KeyError: - continue - d2imccdchip = cls.get_ccdchip(d2im, extname=d2imextname, extver=d2imextver) - if d2imextname == d2im_extname and d2imccdchip == ccdchip: - d2im_header = ext.header - break - else: - continue - d2im.close() - - naxis = d2im[1].header['NAXIS'] - ccdchip = d2imextname - - kw = { 'NAXIS': 'Size of the axis', - 'CDELT': 'Coordinate increment along axis', - 'CRPIX': 'Coordinate system reference pixel', - 'CRVAL': 'Coordinate system value at reference pixel', - } - - kw_comm1 = {} - kw_val1 = {} - for key in kw.keys(): - for i in range(1, naxis+1): - si = str(i) - kw_comm1[key+si] = kw[key] - - for i in range(1, naxis+1): - si = str(i) - kw_val1['NAXIS'+si] = d2im_header.get('NAXIS'+si) - kw_val1['CDELT'+si] = d2im_header.get('CDELT'+si, 1.0) * sciheader.get('LTM'+si+'_'+si, 1) - kw_val1['CRPIX'+si] = d2im_header.get('CRPIX'+si, 0.0) - kw_val1['CRVAL'+si] = (d2im_header.get('CRVAL'+si, 0.0) - \ - sciheader.get('LTV'+str(i), 0)) - kw_comm0 = {'XTENSION': 'Image extension', - 'BITPIX': 'IEEE floating point', - 'NAXIS': 'Number of axes', - 'EXTNAME': 'WCS distortion array', - 'EXTVER': 'Distortion array version number', - 'PCOUNT': 'Special data area of size 0', - 'GCOUNT': 'One data group', - } - - kw_val0 = { 'XTENSION': 'IMAGE', - 'BITPIX': -32, - 'NAXIS': naxis, - 'EXTNAME': 'D2IMARR', - 'EXTVER': wdvarr_ver, - 'PCOUNT': 0, - 'GCOUNT': 1, - 'CCDCHIP': ccdchip, - } - cdl = [] - for key in kw_comm0.keys(): - cdl.append((key, kw_val0[key], kw_comm0[key])) - for key in kw_comm1.keys(): - cdl.append((key, kw_val1[key], kw_comm1[key])) - # Now add keywords from NPOLFILE header to document source of calibration - # include all keywords after and including 'FILENAME' from header - start_indx = -1 - end_indx = 0 - for i, c in enumerate(d2im_phdr): - if c == 'FILENAME': - start_indx = i - if c == '': # remove blanks from end of header - end_indx = i+1 - break - if start_indx >= 0: - for card in d2im_phdr.cards[start_indx:end_indx]: - cdl.append(card) - - hdr = fits.Header(cards=cdl) - - return hdr - - createD2ImHdr = classmethod(createD2ImHdr) - - def get_ccdchip(cls, fobj, extname, extver): - """ - Given a science file or npol file determine CCDCHIP - """ - ccdchip = 1 - if fobj[0].header['INSTRUME'] == 'ACS' and fobj[0].header['DETECTOR'] == 'WFC': - ccdchip = fobj[extname, extver].header['CCDCHIP'] - elif fobj[0].header['INSTRUME'] == 'WFC3' and fobj[0].header['DETECTOR'] == 'UVIS': - ccdchip = fobj[extname, extver].header['CCDCHIP'] - elif fobj[0].header['INSTRUME'] == 'WFPC2': - ccdchip = fobj[extname, extver].header['DETECTOR'] - elif fobj[0].header['INSTRUME'] == 'STIS': - ccdchip = fobj[extname, extver].header['DETECTOR'] - elif fobj[0].header['INSTRUME'] == 'NICMOS': - ccdchip = fobj[extname, extver].header['CAMERA'] - return ccdchip - - get_ccdchip = classmethod(get_ccdchip) diff --git a/lib/stwcs/updatewcs/makewcs.py b/lib/stwcs/updatewcs/makewcs.py deleted file mode 100644 index 06c6f9c..0000000 --- a/lib/stwcs/updatewcs/makewcs.py +++ /dev/null @@ -1,273 +0,0 @@ -from __future__ import absolute_import, division # confidence high - -import datetime - -import numpy as np -import logging, time -from math import sin, sqrt, pow, cos, asin, atan2,pi - -from stsci.tools import fileutil -from . import utils - -logger = logging.getLogger(__name__) - -class MakeWCS(object): - """ - Recompute basic WCS keywords based on PA_V3 and distortion model. - - Notes - ----- - - Compute the reference chip WCS: - - -- CRVAL: transform the model XREF/YREF to the sky - -- PA_V3 is calculated at the target position and adjusted - for each chip orientation - -- CD: PA_V3 and the model scale are used to cnstruct a CD matrix - - - Compute the second chip WCS: - - -- CRVAL: - the distance between the zero points of the two - chip models on the sky - -- CD matrix: first order coefficients are added to the components - of this distance and transfered on the sky. The difference - between CRVAL and these vectors is the new CD matrix for each chip. - -- CRPIX: chip's model zero point in pixel space (XREF/YREF) - - - Time dependent distortion correction is applied to both chips' V2/V3 values. - - """ - tdd_xyref = {1: [2048, 3072], 2:[2048, 1024]} - def updateWCS(cls, ext_wcs, ref_wcs): - """ - recomputes the basic WCS kw - """ - logger.info("\n\tStarting MakeWCS: %s" % time.asctime()) - if not ext_wcs.idcmodel: - logger.info("IDC model not found, turning off Makewcs") - return {} - ltvoff, offshift = cls.getOffsets(ext_wcs) - - v23_corr = cls.zero_point_corr(ext_wcs) - rv23_corr = cls.zero_point_corr(ref_wcs) - - cls.uprefwcs(ext_wcs, ref_wcs, rv23_corr, ltvoff, offshift) - cls.upextwcs(ext_wcs, ref_wcs, v23_corr, rv23_corr, ltvoff, offshift) - - kw2update = {'CD1_1': ext_wcs.wcs.cd[0,0], - 'CD1_2': ext_wcs.wcs.cd[0,1], - 'CD2_1': ext_wcs.wcs.cd[1,0], - 'CD2_2': ext_wcs.wcs.cd[1,1], - 'CRVAL1': ext_wcs.wcs.crval[0], - 'CRVAL2': ext_wcs.wcs.crval[1], - 'CRPIX1': ext_wcs.wcs.crpix[0], - 'CRPIX2': ext_wcs.wcs.crpix[1], - 'IDCTAB': ext_wcs.idctab, - 'OCX10' : ext_wcs.idcmodel.cx[1,0], - 'OCX11' : ext_wcs.idcmodel.cx[1,1], - 'OCY10' : ext_wcs.idcmodel.cy[1,0], - 'OCY11' : ext_wcs.idcmodel.cy[1,1] - } - return kw2update - - updateWCS = classmethod(updateWCS) - - def upextwcs(cls, ext_wcs, ref_wcs, v23_corr, rv23_corr, loff, offsh): - """ - updates an extension wcs - """ - ltvoffx, ltvoffy = loff[0], loff[1] - offshiftx, offshifty = offsh[0], offsh[1] - ltv1 = ext_wcs.ltv1 - ltv2 = ext_wcs.ltv2 - if ltv1 != 0. or ltv2 != 0.: - offsetx = ext_wcs.wcs.crpix[0] - ltv1 - ext_wcs.idcmodel.refpix['XREF'] - offsety = ext_wcs.wcs.crpix[1] - ltv2 - ext_wcs.idcmodel.refpix['YREF'] - ext_wcs.idcmodel.shift(offsetx,offsety) - - fx, fy = ext_wcs.idcmodel.cx, ext_wcs.idcmodel.cy - - ext_wcs.setPscale() - tddscale = (ref_wcs.pscale/fx[1,1]) - v2 = ext_wcs.idcmodel.refpix['V2REF'] + v23_corr[0,0] * tddscale - v3 = ext_wcs.idcmodel.refpix['V3REF'] - v23_corr[1,0] * tddscale - v2ref = ref_wcs.idcmodel.refpix['V2REF'] + rv23_corr[0,0] * tddscale - v3ref = ref_wcs.idcmodel.refpix['V3REF'] - rv23_corr[1,0] * tddscale - - R_scale = ref_wcs.idcmodel.refpix['PSCALE']/3600.0 - off = sqrt((v2-v2ref)**2 + (v3-v3ref)**2)/(R_scale*3600.0) - - if v3 == v3ref: - theta=0.0 - else: - theta = atan2(ext_wcs.parity[0][0]*(v2-v2ref), ext_wcs.parity[1][1]*(v3-v3ref)) - - if ref_wcs.idcmodel.refpix['THETA']: theta += ref_wcs.idcmodel.refpix['THETA']*pi/180.0 - - dX=(off*sin(theta)) + offshiftx - dY=(off*cos(theta)) + offshifty - - px = np.array([[dX,dY]]) - newcrval = ref_wcs.wcs.p2s(px, 1)['world'][0] - newcrpix = np.array([ext_wcs.idcmodel.refpix['XREF'] + ltvoffx, - ext_wcs.idcmodel.refpix['YREF'] + ltvoffy]) - ext_wcs.wcs.crval = newcrval - ext_wcs.wcs.crpix = newcrpix - ext_wcs.wcs.set() - - # Create a small vector, in reference image pixel scale - delmat = np.array([[fx[1,1], fy[1,1]], \ - [fx[1,0], fy[1,0]]]) / R_scale/3600. - - # Account for subarray offset - # Angle of chip relative to chip - if ext_wcs.idcmodel.refpix['THETA']: - dtheta = ext_wcs.idcmodel.refpix['THETA'] - ref_wcs.idcmodel.refpix['THETA'] - else: - dtheta = 0.0 - - rrmat = fileutil.buildRotMatrix(dtheta) - # Rotate the vectors - dxy = np.dot(delmat, rrmat) - wc = ref_wcs.wcs.p2s((px + dxy), 1)['world'] - - # Calculate the new CDs and convert to degrees - cd11 = utils.diff_angles(wc[0,0],newcrval[0])*cos(newcrval[1]*pi/180.0) - cd12 = utils.diff_angles(wc[1,0],newcrval[0])*cos(newcrval[1]*pi/180.0) - cd21 = utils.diff_angles(wc[0,1],newcrval[1]) - cd22 = utils.diff_angles(wc[1,1],newcrval[1]) - cd = np.array([[cd11, cd12], [cd21, cd22]]) - ext_wcs.wcs.cd = cd - ext_wcs.wcs.set() - - upextwcs = classmethod(upextwcs) - - def uprefwcs(cls, ext_wcs, ref_wcs, rv23_corr_tdd, loff, offsh): - """ - Updates the reference chip - """ - ltvoffx, ltvoffy = loff[0], loff[1] - ltv1 = ref_wcs.ltv1 - ltv2 = ref_wcs.ltv2 - if ref_wcs.ltv1 != 0. or ref_wcs.ltv2 != 0.: - offsetx = ref_wcs.wcs.crpix[0] - ltv1 - ref_wcs.idcmodel.refpix['XREF'] - offsety = ref_wcs.wcs.crpix[1] - ltv2 - ref_wcs.idcmodel.refpix['YREF'] - ref_wcs.idcmodel.shift(offsetx,offsety) - - rfx, rfy = ref_wcs.idcmodel.cx, ref_wcs.idcmodel.cy - - offshift = offsh - dec = ref_wcs.wcs.crval[1] - tddscale = (ref_wcs.pscale/ref_wcs.idcmodel.cx[1,1]) - rv23 = [ref_wcs.idcmodel.refpix['V2REF'] + (rv23_corr_tdd[0,0] *tddscale), - ref_wcs.idcmodel.refpix['V3REF'] - (rv23_corr_tdd[1,0] * tddscale)] - # Get an approximate reference position on the sky - rref = np.array([[ref_wcs.idcmodel.refpix['XREF']+ltvoffx , - ref_wcs.idcmodel.refpix['YREF']+ltvoffy]]) - - crval = ref_wcs.wcs.p2s(rref, 1)['world'][0] - # Convert the PA_V3 orientation to the orientation at the aperture - # This is for the reference chip only - we use this for the - # reference tangent plane definition - # It has the same orientation as the reference chip - pv = troll(ext_wcs.pav3,dec,rv23[0], rv23[1]) - # Add the chip rotation angle - if ref_wcs.idcmodel.refpix['THETA']: - pv += ref_wcs.idcmodel.refpix['THETA'] - - - # Set values for the rest of the reference WCS - ref_wcs.wcs.crval = crval - ref_wcs.wcs.crpix = np.array([0.0,0.0])+offsh - parity = ref_wcs.parity - R_scale = ref_wcs.idcmodel.refpix['PSCALE']/3600.0 - cd11 = parity[0][0] * cos(pv*pi/180.0)*R_scale - cd12 = parity[0][0] * -sin(pv*pi/180.0)*R_scale - cd21 = parity[1][1] * sin(pv*pi/180.0)*R_scale - cd22 = parity[1][1] * cos(pv*pi/180.0)*R_scale - - rcd = np.array([[cd11, cd12], [cd21, cd22]]) - ref_wcs.wcs.cd = rcd - ref_wcs.wcs.set() - - uprefwcs = classmethod(uprefwcs) - - def zero_point_corr(cls,hwcs): - if hwcs.idcmodel.refpix['skew_coeffs'] is not None and 'TDD_CY_BETA' in hwcs.idcmodel.refpix['skew_coeffs']: - v23_corr = np.array([[0.],[0.]]) - return v23_corr - else: - try: - alpha = hwcs.idcmodel.refpix['TDDALPHA'] - beta = hwcs.idcmodel.refpix['TDDBETA'] - except KeyError: - alpha = 0.0 - beta = 0.0 - v23_corr = np.array([[0.],[0.]]) - logger.debug("\n\tTDD Zero point correction for chip %s defaulted to: %s" % (hwcs.chip, v23_corr)) - return v23_corr - - tdd = np.array([[beta, alpha], [alpha, -beta]]) - mrotp = fileutil.buildRotMatrix(2.234529)/2048. - xy0 = np.array([[cls.tdd_xyref[hwcs.chip][0]-2048.], [cls.tdd_xyref[hwcs.chip][1]-2048.]]) - v23_corr = np.dot(mrotp,np.dot(tdd,xy0)) * 0.05 - logger.debug("\n\tTDD Zero point correction for chip %s: %s" % (hwcs.chip, v23_corr)) - return v23_corr - - zero_point_corr = classmethod(zero_point_corr) - - def getOffsets(cls, ext_wcs): - ltv1 = ext_wcs.ltv1 - ltv2 = ext_wcs.ltv2 - - offsetx = ext_wcs.wcs.crpix[0] - ltv1 - ext_wcs.idcmodel.refpix['XREF'] - offsety = ext_wcs.wcs.crpix[1] - ltv2 - ext_wcs.idcmodel.refpix['YREF'] - - shiftx = ext_wcs.idcmodel.refpix['XREF'] + ltv1 - shifty = ext_wcs.idcmodel.refpix['YREF'] + ltv2 - if ltv1 != 0. or ltv2 != 0.: - ltvoffx = ltv1 + offsetx - ltvoffy = ltv2 + offsety - offshiftx = offsetx + shiftx - offshifty = offsety + shifty - else: - ltvoffx = 0. - ltvoffy = 0. - offshiftx = 0. - offshifty = 0. - - ltvoff = np.array([ltvoffx, ltvoffy]) - offshift = np.array([offshiftx, offshifty]) - return ltvoff, offshift - - getOffsets = classmethod(getOffsets) - - -def troll(roll, dec, v2, v3): - """ Computes the roll angle at the target position based on: - the roll angle at the V1 axis(roll), - the dec of the target(dec), and - the V2/V3 position of the aperture (v2,v3) in arcseconds. - - Based on algorithm provided by Colin Cox and used in - Generic Conversion at STScI. - """ - # Convert all angles to radians - _roll = np.deg2rad(roll) - _dec = np.deg2rad(dec) - _v2 = np.deg2rad(v2 / 3600.) - _v3 = np.deg2rad(v3 / 3600.) - - # compute components - sin_rho = sqrt((pow(sin(_v2),2)+pow(sin(_v3),2)) - (pow(sin(_v2),2)*pow(sin(_v3),2))) - rho = asin(sin_rho) - beta = asin(sin(_v3)/sin_rho) - if _v2 < 0: beta = pi - beta - gamma = asin(sin(_v2)/sin_rho) - if _v3 < 0: gamma = pi - gamma - A = pi/2. + _roll - beta - B = atan2( sin(A)*cos(_dec), (sin(_dec)*sin_rho - cos(_dec)*cos(rho)*cos(A))) - - # compute final value - troll = np.rad2deg(pi - (gamma+B)) - - return troll diff --git a/lib/stwcs/updatewcs/npol.py b/lib/stwcs/updatewcs/npol.py deleted file mode 100644 index 33579f0..0000000 --- a/lib/stwcs/updatewcs/npol.py +++ /dev/null @@ -1,343 +0,0 @@ -from __future__ import absolute_import, division # confidence high - -import logging, time - -import numpy as np -from astropy.io import fits - -from stsci.tools import fileutil -from . import utils - -logger = logging.getLogger('stwcs.updatewcs.npol') - -class NPOLCorr(object): - """ - Defines a Lookup table prior distortion correction as per WCS paper IV. - It uses a reference file defined by the NPOLFILE (suffix 'NPL') keyword - in the primary header. - - Notes - ----- - - Using extensions in the reference file create a WCSDVARR extensions - and add them to the science file. - - Add record-valued keywords to the science extension header to describe - the lookup tables. - - Add a keyword 'NPOLEXT' to the science extension header to store - the name of the reference file used to create the WCSDVARR extensions. - - If WCSDVARR extensions exist and `NPOLFILE` is different from `NPOLEXT`, - a subsequent update will overwrite the existing extensions. - If WCSDVARR extensions were not found in the science file, they will be added. - - It is assumed that the NPL reference files were created to work with IDC tables - but will be applied with SIP coefficients. A transformation is applied to correct - for the fact that the lookup tables will be applied before the first order coefficients - which are in the CD matrix when the SIP convention is used. - """ - - def updateWCS(cls, fobj): - """ - Parameters - ---------- - fobj : `astropy.io.fits.HDUList` object - Science file, for which a distortion correction in a NPOLFILE is available - - """ - logger.info("\n\tStarting NPOL: %s" %time.asctime()) - try: - assert isinstance(fobj, fits.HDUList) - except AssertionError: - logger.exception('\n\tInput must be a fits.HDUList object') - raise - - cls.applyNPOLCorr(fobj) - nplfile = fobj[0].header['NPOLFILE'] - - new_kw = {'NPOLEXT': nplfile} - return new_kw - - updateWCS = classmethod(updateWCS) - - def applyNPOLCorr(cls, fobj): - """ - For each science extension in a fits file object: - - create a WCSDVARR extension - - update science header - - add/update NPOLEXT keyword - """ - nplfile = fileutil.osfn(fobj[0].header['NPOLFILE']) - # Map WCSDVARR EXTVER numbers to extension numbers - wcsdvarr_ind = cls.getWCSIndex(fobj) - for ext in fobj: - try: - extname = ext.header['EXTNAME'].lower() - except KeyError: - continue - if extname == 'sci': - extversion = ext.header['EXTVER'] - ccdchip = cls.get_ccdchip(fobj, extname='SCI', extver=extversion) - header = ext.header - # get the data arrays from the reference file and transform - # them for use with SIP - dx,dy = cls.getData(nplfile, ccdchip) - idccoeffs = cls.getIDCCoeffs(header) - - if idccoeffs is not None: - dx, dy = cls.transformData(dx,dy, idccoeffs) - - # Determine EXTVER for the WCSDVARR extension from the - # NPL file (EXTNAME, EXTVER) kw. - # This is used to populate DPj.EXTVER kw - wcsdvarr_x_version = 2 * extversion -1 - wcsdvarr_y_version = 2 * extversion - for ename in zip(['DX', 'DY'], [wcsdvarr_x_version,wcsdvarr_y_version],[dx, dy]): - error_val = ename[2].max() - cls.addSciExtKw(header, wdvarr_ver=ename[1], npol_extname=ename[0], error_val=error_val) - hdu = cls.createNpolHDU(header, npolfile=nplfile, \ - wdvarr_ver=ename[1], npl_extname=ename[0], data=ename[2],ccdchip=ccdchip) - if wcsdvarr_ind: - fobj[wcsdvarr_ind[ename[1]]] = hdu - else: - fobj.append(hdu) - - - applyNPOLCorr = classmethod(applyNPOLCorr) - - def getWCSIndex(cls, fobj): - - """ - If fobj has WCSDVARR extensions: - returns a mapping of their EXTVER kw to file object extension numbers - if fobj does not have WCSDVARR extensions: - an empty dictionary is returned - """ - wcsd = {} - for e in range(len(fobj)): - try: - ename = fobj[e].header['EXTNAME'] - except KeyError: - continue - if ename == 'WCSDVARR': - wcsd[fobj[e].header['EXTVER']] = e - logger.debug("A map of WSCDVARR externsions %s" % wcsd) - return wcsd - - getWCSIndex = classmethod(getWCSIndex) - - def addSciExtKw(cls, hdr, wdvarr_ver=None, npol_extname=None, error_val=0.0): - """ - Adds kw to sci extension to define WCSDVARR lookup table extensions - - """ - if npol_extname =='DX': - j=1 - else: - j=2 - - cperror = 'CPERR%s' %j - cpdis = 'CPDIS%s' %j - dpext = 'DP%s.' %j + 'EXTVER' - dpnaxes = 'DP%s.' %j +'NAXES' - dpaxis1 = 'DP%s.' %j+'AXIS.1' - dpaxis2 = 'DP%s.' %j+'AXIS.2' - keys = [cperror, cpdis, dpext, dpnaxes, dpaxis1, dpaxis2] - values = {cperror: error_val, - cpdis: 'Lookup', - dpext: wdvarr_ver, - dpnaxes: 2, - dpaxis1: 1, - dpaxis2: 2} - - comments = {cperror: 'Maximum error of NPOL correction for axis %s' % j, - cpdis: 'Prior distortion function type', - dpext: 'Version number of WCSDVARR extension containing lookup distortion table', - dpnaxes: 'Number of independent variables in distortion function', - dpaxis1: 'Axis number of the jth independent variable in a distortion function', - dpaxis2: 'Axis number of the jth independent variable in a distortion function' - } - # Look for HISTORY keywords. If present, insert new keywords before them - before_key = 'HISTORY' - if before_key not in hdr: - before_key = None - - for key in keys: - hdr.set(key, value=values[key], comment=comments[key], before=before_key) - - addSciExtKw = classmethod(addSciExtKw) - - def getData(cls,nplfile, ccdchip): - """ - Get the data arrays from the reference NPOL files - Make sure 'CCDCHIP' in the npolfile matches "CCDCHIP' in the science file. - """ - npl = fits.open(nplfile) - for ext in npl: - nplextname = ext.header.get('EXTNAME',"") - nplccdchip = ext.header.get('CCDCHIP',1) - if nplextname == 'DX' and nplccdchip == ccdchip: - xdata = ext.data.copy() - continue - elif nplextname == 'DY' and nplccdchip == ccdchip: - ydata = ext.data.copy() - continue - else: - continue - npl.close() - return xdata, ydata - getData = classmethod(getData) - - def transformData(cls, dx, dy, coeffs): - """ - Transform the NPOL data arrays for use with SIP - """ - ndx, ndy = np.dot(coeffs, [dx.ravel(), dy.ravel()]).astype(np.float32) - ndx.shape = dx.shape - ndy.shape=dy.shape - return ndx, ndy - - transformData = classmethod(transformData) - - def getIDCCoeffs(cls, header): - """ - Return a matrix of the scaled first order IDC coefficients. - """ - try: - ocx10 = header['OCX10'] - ocx11 = header['OCX11'] - ocy10 = header['OCY10'] - ocy11 = header['OCY11'] - coeffs = np.array([[ocx11, ocx10], [ocy11,ocy10]], dtype=np.float32) - except KeyError: - logger.exception('\n\tFirst order IDCTAB coefficients are not available. \n\ - Cannot convert SIP to IDC coefficients.') - return None - try: - idcscale = header['IDCSCALE'] - except KeyError: - logger.exception("IDCSCALE not found in header - setting it to 1.") - idcscale = 1 - - return np.linalg.inv(coeffs/idcscale) - - getIDCCoeffs = classmethod(getIDCCoeffs) - - def createNpolHDU(cls, sciheader, npolfile=None, wdvarr_ver=1, npl_extname=None,data = None, ccdchip=1): - """ - Creates an HDU to be added to the file object. - """ - hdr = cls.createNpolHdr(sciheader, npolfile=npolfile, wdvarr_ver=wdvarr_ver, npl_extname=npl_extname, ccdchip=ccdchip) - hdu = fits.ImageHDU(header=hdr, data=data) - return hdu - - createNpolHDU = classmethod(createNpolHDU) - - def createNpolHdr(cls, sciheader, npolfile, wdvarr_ver, npl_extname, ccdchip): - """ - Creates a header for the WCSDVARR extension based on the NPOL reference file - and sci extension header. The goal is to always work in image coordinates - (also for subarrays and binned images. The WCS for the WCSDVARR extension - i ssuch that a full size npol table is created and then shifted or scaled - if the science image is a subarray or binned image. - """ - npl = fits.open(npolfile) - npol_phdr = npl[0].header - for ext in npl: - try: - nplextname = ext.header['EXTNAME'] - nplextver = ext.header['EXTVER'] - except KeyError: - continue - nplccdchip = cls.get_ccdchip(npl, extname=nplextname, extver=nplextver) - if nplextname == npl_extname and nplccdchip == ccdchip: - npol_header = ext.header - break - else: - continue - npl.close() - - naxis = npl[1].header['NAXIS'] - ccdchip = nplextname #npol_header['CCDCHIP'] - - kw = { 'NAXIS': 'Size of the axis', - 'CDELT': 'Coordinate increment along axis', - 'CRPIX': 'Coordinate system reference pixel', - 'CRVAL': 'Coordinate system value at reference pixel', - } - - kw_comm1 = {} - kw_val1 = {} - for key in kw.keys(): - for i in range(1, naxis+1): - si = str(i) - kw_comm1[key+si] = kw[key] - - for i in range(1, naxis+1): - si = str(i) - kw_val1['NAXIS'+si] = npol_header.get('NAXIS'+si) - kw_val1['CDELT'+si] = npol_header.get('CDELT'+si, 1.0) * \ - sciheader.get('LTM'+si+'_'+si, 1) - kw_val1['CRPIX'+si] = npol_header.get('CRPIX'+si, 0.0) - kw_val1['CRVAL'+si] = (npol_header.get('CRVAL'+si, 0.0) - \ - sciheader.get('LTV'+str(i), 0)) - - kw_comm0 = {'XTENSION': 'Image extension', - 'BITPIX': 'IEEE floating point', - 'NAXIS': 'Number of axes', - 'EXTNAME': 'WCS distortion array', - 'EXTVER': 'Distortion array version number', - 'PCOUNT': 'Special data area of size 0', - 'GCOUNT': 'One data group', - } - - kw_val0 = { 'XTENSION': 'IMAGE', - 'BITPIX': -32, - 'NAXIS': naxis, - 'EXTNAME': 'WCSDVARR', - 'EXTVER': wdvarr_ver, - 'PCOUNT': 0, - 'GCOUNT': 1, - 'CCDCHIP': ccdchip, - } - cdl = [] - for key in kw_comm0.keys(): - cdl.append((key, kw_val0[key], kw_comm0[key])) - for key in kw_comm1.keys(): - cdl.append((key, kw_val1[key], kw_comm1[key])) - # Now add keywords from NPOLFILE header to document source of calibration - # include all keywords after and including 'FILENAME' from header - start_indx = -1 - end_indx = 0 - for i, c in enumerate(npol_phdr): - if c == 'FILENAME': - start_indx = i - if c == '': # remove blanks from end of header - end_indx = i+1 - break - if start_indx >= 0: - for card in npol_phdr.cards[start_indx:end_indx]: - cdl.append(card) - - hdr = fits.Header(cards=cdl) - - return hdr - - createNpolHdr = classmethod(createNpolHdr) - - def get_ccdchip(cls, fobj, extname, extver): - """ - Given a science file or npol file determine CCDCHIP - """ - ccdchip = 1 - if fobj[0].header['INSTRUME'] == 'ACS' and fobj[0].header['DETECTOR'] == 'WFC': - ccdchip = fobj[extname, extver].header['CCDCHIP'] - elif fobj[0].header['INSTRUME'] == 'WFC3' and fobj[0].header['DETECTOR'] == 'UVIS': - ccdchip = fobj[extname, extver].header['CCDCHIP'] - elif fobj[0].header['INSTRUME'] == 'WFPC2': - ccdchip = fobj[extname, extver].header['DETECTOR'] - elif fobj[0].header['INSTRUME'] == 'STIS': - ccdchip = fobj[extname, extver].header['DETECTOR'] - elif fobj[0].header['INSTRUME'] == 'NICMOS': - ccdchip = fobj[extname, extver].header['CAMERA'] - return ccdchip - - get_ccdchip = classmethod(get_ccdchip) diff --git a/lib/stwcs/updatewcs/utils.py b/lib/stwcs/updatewcs/utils.py deleted file mode 100644 index 1214199..0000000 --- a/lib/stwcs/updatewcs/utils.py +++ /dev/null @@ -1,264 +0,0 @@ -from __future__ import division # confidence high -import os -from astropy.io import fits -from stsci.tools import fileutil - -import logging -logger = logging.getLogger("stwcs.updatewcs.utils") - - -def diff_angles(a,b): - """ - Perform angle subtraction a-b taking into account - small-angle differences across 360degree line. - """ - - diff = a - b - - if diff > 180.0: - diff -= 360.0 - - if diff < -180.0: - diff += 360.0 - - return diff - -def getBinning(fobj, extver=1): - # Return the binning factor - binned = 1 - if fobj[0].header['INSTRUME'] == 'WFPC2': - mode = fobj[0].header.get('MODE', "") - if mode == 'AREA': binned = 2 - else: - binned = fobj['SCI', extver].header.get('BINAXIS',1) - return binned - -def updateNEXTENDKw(fobj): - """ - Updates PRIMARY header with correct value for NEXTEND, if present. - - Parameters - ----------- - fobj : `astropy.io.fits.HDUList` - The FITS object for file opened in `update` mode - - """ - if 'nextend' in fobj[0].header: - fobj[0].header['nextend'] = len(fobj) -1 - -def extract_rootname(kwvalue,suffix=""): - """ Returns the rootname from a full reference filename - - If a non-valid value (any of ['','N/A','NONE','INDEF',None]) is input, - simply return a string value of 'NONE' - - This function will also replace any 'suffix' specified with a blank. - """ - # check to see whether a valid kwvalue has been provided as input - if kwvalue.strip() in ['','N/A','NONE','INDEF',None]: - return 'NONE' # no valid value, so return 'NONE' - - # for a valid kwvalue, parse out the rootname - # strip off any environment variable from input filename, if any are given - if '$' in kwvalue: - fullval = kwvalue[kwvalue.find('$')+1:] - else: - fullval = kwvalue - # Extract filename without path from kwvalue - fname = os.path.basename(fullval).strip() - - # Now, rip out just the rootname from the full filename - rootname = fileutil.buildNewRootname(fname) - - # Now, remove any known suffix from rootname - rootname = rootname.replace(suffix,'') - return rootname.strip() - -def construct_distname(fobj,wcsobj): - """ - This function constructs the value for the keyword 'DISTNAME'. - It relies on the reference files specified by the keywords 'IDCTAB', - 'NPOLFILE', and 'D2IMFILE'. - - The final constructed value will be of the form: - <idctab rootname>-<npolfile rootname>-<d2imfile rootname> - and have a value of 'NONE' if no reference files are specified. - """ - idcname = extract_rootname(fobj[0].header.get('IDCTAB', "NONE"), - suffix='_idc') - if (idcname is None or idcname=='NONE') and wcsobj.sip is not None: - idcname = 'UNKNOWN' - - npolname, npolfile = build_npolname(fobj) - if npolname is None and wcsobj.cpdis1 is not None: - npolname = 'UNKNOWN' - - d2imname, d2imfile = build_d2imname(fobj) - if d2imname is None and wcsobj.det2im is not None: - d2imname = 'UNKNOWN' - - sipname, idctab = build_sipname(fobj) - - distname = build_distname(sipname,npolname,d2imname) - return {'DISTNAME':distname,'SIPNAME':sipname} - -def build_distname(sipname,npolname,d2imname): - """ - Core function to build DISTNAME keyword value without the HSTWCS input. - """ - - distname = sipname.strip() - if npolname != 'NONE' or d2imname != 'NONE': - if d2imname != 'NONE': - distname+= '-'+npolname.strip() + '-'+d2imname.strip() - else: - distname+='-'+npolname.strip() - - return distname - -def build_default_wcsname(idctab): - - idcname = extract_rootname(idctab,suffix='_idc') - wcsname = 'IDC_' + idcname - return wcsname - -def build_sipname(fobj, fname=None, sipname=None): - """ - Build a SIPNAME from IDCTAB - - Parameters - ---------- - fobj : `astropy.io.fits.HDUList` - file object - fname : string - science file name (to be used if ROOTNAME is not present - sipname : string - user supplied SIPNAME keyword - - Returns - ------- - sipname, idctab - """ - try: - idctab = fobj[0].header['IDCTAB'] - idcname = extract_rootname(idctab,suffix='_idc') - except KeyError: - idctab = 'N/A' - idcname= 'N/A' - if not fname: - try: - fname = fobj.filename() - except: - fname = " " - if not sipname: - try: - sipname = fobj[0].header["SIPNAME"] - if idcname == 'N/A' or idcname not in sipname: - raise KeyError - except KeyError: - if idcname != 'N/A': - try: - rootname = fobj[0].header['rootname'] - except KeyError: - rootname = fname - sipname = rootname +'_'+ idcname - else: - if 'A_ORDER' in fobj[1].header or 'B_ORDER' in fobj[1].header: - sipname = 'UNKNOWN' - else: - idcname = 'NOMODEL' - - return sipname, idctab - -def build_npolname(fobj, npolfile=None): - """ - Build a NPOLNAME from NPOLFILE - - Parameters - ---------- - fobj : `astropy.io.fits.HDUList` - file object - npolfile : string - user supplied NPOLFILE keyword - - Returns - ------- - npolname, npolfile - """ - if not npolfile: - try: - npolfile = fobj[0].header["NPOLFILE"] - except KeyError: - npolfile = ' ' - if fileutil.countExtn(fobj, 'WCSDVARR'): - npolname = 'UNKNOWN' - else: - npolname = 'NOMODEL' - npolname = extract_rootname(npolfile, suffix='_npl') - if npolname == 'NONE': - npolname = 'NOMODEL' - else: - npolname = extract_rootname(npolfile, suffix='_npl') - if npolname == 'NONE': - npolname = 'NOMODEL' - return npolname, npolfile - -def build_d2imname(fobj, d2imfile=None): - """ - Build a D2IMNAME from D2IMFILE - - Parameters - ---------- - fobj : `astropy.io.fits.HDUList` - file object - d2imfile : string - user supplied NPOLFILE keyword - - Returns - ------- - d2imname, d2imfile - """ - if not d2imfile: - try: - d2imfile = fobj[0].header["D2IMFILE"] - except KeyError: - d2imfile = 'N/A' - if fileutil.countExtn(fobj, 'D2IMARR'): - d2imname = 'UNKNOWN' - else: - d2imname = 'NOMODEL' - d2imname = extract_rootname(d2imfile,suffix='_d2i') - if d2imname == 'NONE': - d2imname = 'NOMODEL' - else: - d2imname = extract_rootname(d2imfile,suffix='_d2i') - if d2imname == 'NONE': - d2imname = 'NOMODEL' - return d2imname, d2imfile - - -def remove_distortion(fname, dist_keyword): - logger.info("Removing distortion {0} from file {0}".format(dist_keyword, fname)) - from ..wcsutil import altwcs - if dist_keyword == "NPOLFILE": - extname = "WCSDVARR" - keywords = ["CPERR*", "DP1.*", "DP2.*", "CPDIS*", "NPOLEXT"] - elif dist_keyword == "D2IMFILE": - extname = "D2IMARR" - keywords = ["D2IMERR*", "D2IM1.*", "D2IM2.*", "D2IMDIS*", "D2IMEXT"] - else: - raise AttributeError("Unrecognized distortion keyword " - "{0} when attempting to remove distortion".format(dist_keyword)) - ext_mapping = altwcs.mapFitsExt2HDUListInd(fname, "SCI").values() - f = fits.open(fname, mode="update") - for hdu in ext_mapping: - for kw in keywords: - try: - del f[hdu].header[kw] - except KeyError: - pass - ext_mapping = altwcs.mapFitsExt2HDUListInd(fname, extname).values() - ext_mapping.sort() - for hdu in ext_mapping[::-1]: - del f[hdu] - f.close() diff --git a/lib/stwcs/updatewcs/wfpc2_dgeo.py b/lib/stwcs/updatewcs/wfpc2_dgeo.py deleted file mode 100644 index e57bb5c..0000000 --- a/lib/stwcs/updatewcs/wfpc2_dgeo.py +++ /dev/null @@ -1,123 +0,0 @@ -""" wfpc2_dgeo - Functions to convert WFPC2 DGEOFILE into D2IMFILE - -""" -import os -import datetime - -import astropy -from astropy.io import fits -import numpy as np - -from stsci.tools import fileutil - -import logging -logger = logging.getLogger("stwcs.updatewcs.apply_corrections") - -def update_wfpc2_d2geofile(filename, fhdu=None): - """ - Creates a D2IMFILE from the DGEOFILE for a WFPC2 image (input), and - modifies the header to reflect the new usage. - - Parameters - ---------- - filename: string - Name of WFPC2 file to be processed. This file will be updated - to delete any reference to a DGEOFILE and add a D2IMFILE to replace - that correction when running updatewcs. - fhdu: object - FITS object for WFPC2 image. If user has already opened the WFPC2 - file, they can simply pass that FITS object in for direct processing. - - Returns - ------- - d2imfile: string - Name of D2IMFILE created from DGEOFILE. The D2IMFILE keyword in the - image header will be updated/added to point to this newly created file. - - """ - - close_fhdu = False - if fhdu is None: - fhdu = fileutil.openImage(filename, mode='update') - close_fhdu = True - - dgeofile = fhdu['PRIMARY'].header.get('DGEOFILE', None) - already_converted = dgeofile not in [None, "N/A", "", " "] - if already_converted or 'ODGEOFIL' in fhdu['PRIMARY'].header: - if not already_converted: - dgeofile = fhdu['PRIMARY'].header.get('ODGEOFIL', None) - logger.info('Converting DGEOFILE %s into D2IMFILE...' % dgeofile) - rootname = filename[:filename.find('.fits')] - d2imfile = convert_dgeo_to_d2im(dgeofile,rootname) - fhdu['PRIMARY'].header['ODGEOFIL'] = dgeofile - fhdu['PRIMARY'].header['DGEOFILE'] = 'N/A' - fhdu['PRIMARY'].header['D2IMFILE'] = d2imfile - else: - d2imfile = None - fhdu['PRIMARY'].header['DGEOFILE'] = 'N/A' - if 'D2IMFILE' not in fhdu['PRIMARY'].header: - fhdu['PRIMARY'].header['D2IMFILE'] = 'N/A' - - # Only close the file handle if opened in this function - if close_fhdu: - fhdu.close() - - # return the d2imfile name so that calling routine can keep - # track of the new file created and delete it later if necessary - # (multidrizzle clean=True mode of operation) - return d2imfile - -def convert_dgeo_to_d2im(dgeofile,output,clobber=True): - """ Routine that converts the WFPC2 DGEOFILE into a D2IMFILE. - """ - dgeo = fileutil.openImage(dgeofile) - outname = output+'_d2im.fits' - - removeFileSafely(outname) - data = np.array([dgeo['dy',1].data[:,0]]) - scihdu = fits.ImageHDU(data=data) - dgeo.close() - # add required keywords for D2IM header - scihdu.header['EXTNAME'] = ('DY', 'Extension name') - scihdu.header['EXTVER'] = (1, 'Extension version') - fits_str = 'PYFITS Version '+str(astropy.__version__) - scihdu.header['ORIGIN'] = (fits_str, 'FITS file originator') - scihdu.header['INHERIT'] = (False, 'Inherits global header') - - dnow = datetime.datetime.now() - scihdu.header['DATE'] = (str(dnow).replace(' ','T'), - 'Date FITS file was generated') - - scihdu.header['CRPIX1'] = (0, 'Distortion array reference pixel') - scihdu.header['CDELT1'] = (1, 'Grid step size in first coordinate') - scihdu.header['CRVAL1'] = (0, 'Image array pixel coordinate') - scihdu.header['CRPIX2'] = (0, 'Distortion array reference pixel') - scihdu.header['CDELT2'] = (1, 'Grid step size in second coordinate') - scihdu.header['CRVAL2'] = (0, 'Image array pixel coordinate') - - phdu = fits.PrimaryHDU() - phdu.header['INSTRUME'] = 'WFPC2' - d2imhdu = fits.HDUList() - d2imhdu.append(phdu) - scihdu.header['DETECTOR'] = (1, 'CCD number of the detector: PC 1, WFC 2-4 ') - d2imhdu.append(scihdu.copy()) - scihdu.header['EXTVER'] = (2, 'Extension version') - scihdu.header['DETECTOR'] = (2, 'CCD number of the detector: PC 1, WFC 2-4 ') - d2imhdu.append(scihdu.copy()) - scihdu.header['EXTVER'] = (3, 'Extension version') - scihdu.header['DETECTOR'] = (3, 'CCD number of the detector: PC 1, WFC 2-4 ') - d2imhdu.append(scihdu.copy()) - scihdu.header['EXTVER'] = (4, 'Extension version') - scihdu.header['DETECTOR'] = (4, 'CCD number of the detector: PC 1, WFC 2-4 ') - d2imhdu.append(scihdu.copy()) - d2imhdu.writeto(outname) - d2imhdu.close() - - return outname - - -def removeFileSafely(filename,clobber=True): - """ Delete the file specified, but only if it exists and clobber is True. - """ - if filename is not None and filename.strip() != '': - if os.path.exists(filename) and clobber: os.remove(filename) |