diff options
| -rw-r--r-- | lib/stwcs/wcsutil/altwcs.py | 365 | 
1 files changed, 231 insertions, 134 deletions
| diff --git a/lib/stwcs/wcsutil/altwcs.py b/lib/stwcs/wcsutil/altwcs.py index d250b52..063df18 100644 --- a/lib/stwcs/wcsutil/altwcs.py +++ b/lib/stwcs/wcsutil/altwcs.py @@ -5,14 +5,15 @@ import string  import numpy as np  import pywcs  import pyfits +from stsci.tools import fileutil as fu  altwcskw = ['WCSAXES', 'CRVAL', 'CRPIX', 'PC', 'CDELT', 'CD', 'CTYPE', 'CUNIT',              'PV', 'PS']  # file operations -def archiveWCS(fname, ext, wcskey=" ", wcsname=" ", clobber=False): +def archiveWCS(fname, ext=None, wcskey=" ", wcsname=" ", reusekey=False):      """ -    Copy the primary WCS to the hader as an alternate WCS +    Copy the primary WCS to the header as an alternate WCS      with wcskey and name WCSNAME. It loops over all extensions in 'ext'      Parameters @@ -21,14 +22,28 @@ def archiveWCS(fname, ext, wcskey=" ", wcsname=" ", clobber=False):              a file name or a file object      ext:    an int, a tuple, a python list of integers or a python list of tuples (e.g.('sci',1))              fits extensions to work with +            if None, work with a list of all extensions      wcskey: string "A"-"Z" or " "              if " ": get next available key if wcsname is also " " or try              to get a key from WCSNAME value      wcsname: string               Name of alternate WCS description -    clobber: boolean -             if Ture - overwrites a WCS with the same key +    reusekey: boolean +             if True - overwrites a WCS with the same key + +    Examples +    -------- +    Copy the primary WCS of an in memory headrlet object to an  +    alternate WCS with key 'T' +     +    >>> hlet=headerlet.createHeaderlet('junk.fits', 'hdr1.fits') +    >>> altwcs.wcskeys(hlet[1].header) +    ['A'] +    >>> altwcs.archiveWCS(hlet, ext=[('SIPWCS',1),('SIPWCS',2)], wcskey='T') +    >>> altwcs.wcskeys(hlet[1].header) +    ['A', 'T'] +          See Also      --------      wcsutils.restoreWCS: Copy an alternate WCS to the primary WCS @@ -40,21 +55,16 @@ def archiveWCS(fname, ext, wcskey=" ", wcsname=" ", clobber=False):      else:          f = fname -    if not parpasscheck(f, ext, wcskey, wcsname): +    if not _parpasscheck(f, ext, wcskey, wcsname):          closefobj(fname, f)          return -    if isinstance(ext, int) or isinstance(ext, tuple): +    if not ext: +        ext = range(len(f)) +    elif isinstance(ext, int) or isinstance(ext, tuple):          ext = [ext] -    wcsext = 0 -    eindx = 0 -    for e in f: -        if 'extname' in e.header and 'sci' in e.header['extname'].lower(): -            wcsext = eindx -            break -        eindx += 1 - +    wcsext = ext[0]      if wcskey == " ":          # try getting the key from WCSNAME          if not wcsname.strip(): @@ -76,7 +86,12 @@ def archiveWCS(fname, ext, wcskey=" ", wcsname=" ", clobber=False):              wname = wcsname      for e in ext: -        w = pywcs.WCS(f[e].header, fobj=f) +        try: +            w = pywcs.WCS(f[e].header, fobj=f) +        except: +            # this should be revisited, should catchspecific  errors +            # KeyError - not a valid key, ValueError - not a valid extension, etc +            continue          hwcs = w.to_header()          wcsnamekey = 'WCSNAME' + wkey          f[e].header.update(key=wcsnamekey, value=wcsname) @@ -90,15 +105,17 @@ def archiveWCS(fname, ext, wcskey=" ", wcsname=" ", clobber=False):          #f[e].header.update(key=okey, value=norient)      closefobj(fname, f) -def restoreWCS(f, ext, wcskey=" ", wcsname=" ", clobber=False): +def restoreWCS(f, ext=None, fromext=None, toext=None, wcskey=" ", wcsname=" ",  +               clobber=False):      """ -    Copy a WCS with key "WCSKEY" to a primary WCS +    Copy a WCS with key "WCSKEY" to the primary WCS      Reads in a WCS defined with wcskey and saves it as the primary WCS. -    If clobber is False, writes out new files whose names are the original -    names with an attached 3 character string  _'WCSKEY'_. -    Otherwise overwrites the files. Goes sequentially through the list of extensions -    The WCS is restored from the 'SCI' extension but the primary WCS of all +    If clobber is False, writes out a new file whose name is the original +    name with an attached 3 character string  _'WCSKEY'_. +    Otherwise overwrites the file. Goes sequentially through the list  +    of extensions. +    The WCS is restored from the 'SCI' extension but the WCS of all      extensions with the same EXTVER are updated. @@ -106,6 +123,11 @@ def restoreWCS(f, ext, wcskey=" ", wcsname=" ", clobber=False):      ----------      f:       string or pyfits.HDUList object               a file name or a file object +    fromext: string  +             extname from which to read in the alternate WCS, for example 'SCI' +    toext:   string or python list +             extname or a list of extnames to which the WCS will be copied as  +             primary, for example ['SCI', 'ERR', 'DQ']      ext:     an int, a tuple, a python list of integers or a python list               of tuples (e.g.('sci',1))               fits extensions to work with @@ -114,7 +136,7 @@ def restoreWCS(f, ext, wcskey=" ", wcsname=" ", clobber=False):               or " " - find a key from WCSNAMe value      wcsname: string (optional)               if given and wcskey is " ", will try to restore by WCSNAME value -    clobber: boolean +    clobber: boolean, default False               A flag to define if the original files should be overwritten      See Also @@ -130,80 +152,72 @@ def restoreWCS(f, ext, wcskey=" ", wcsname=" ", clobber=False):      else:          fobj = f -    if not parpasscheck(fobj, ext, wcskey, wcsname): +    if not _parpasscheck(fobj, ext=ext, wcskey=wcskey, fromext=fromext, toext=toext, clobber=clobber):          closefobj(f, fobj)          return      if isinstance(ext, int) or isinstance(ext, tuple):          ext = [ext] - -    if not clobber: -        name = (fobj.filename().split('.fits')[0] + '_%s_' + '.fits') %wcskey +    #if isinstance(fromext, str): +    #    fromext = [fromext] +    if isinstance(toext, str): +        toext = [toext] +         +    # the case of an HDUList object in memory without an associated file +    if fobj.filename() is not None:  +        if not clobber: +            name = (fobj.filename().split('.fits')[0] + '_%s_' + '.fits') %wcskey +        else: +            name = fobj.filename() +     +    simplefits = fu.isFits(fobj)[1] is 'simple' +    if simplefits: +        wcskeyext = 0      else: -        name = fobj.filename() - +        wcskeyext = 1 +              if wcskey == " ":          if wcsname.strip(): -            wkey = getKeyFromName(fobj[1].header, wcsname) +            wkey = getKeyFromName(fobj[wcskeyext].header, wcsname)              if not wkey:                  print 'Could not get a key from wcsname %s .' % wcsname                  closefobj(f, fobj)                  return      else: -        if wcskey not in wcskeys(fobj[1].header): +        if wcskey not in wcskeys(fobj, ext=wcskeyext):              print "Could not find alternate WCS with key %s in this file" % wcskey              closefobj(f, fobj)              return          wkey = wcskey - -    for e in ext: -        try: -            extname = fobj[e].header['EXTNAME'].lower() -        except KeyError: -            continue -        #Restore always from a 'SCI' extension but write it out to 'ERR' and 'DQ' -        if extname == 'sci': -            sciver = fobj[e].header['extver'] -            try: -                nwcs = pywcs.WCS(fobj[e].header, fobj=fobj, key=wkey) -            except KeyError: -                print 'restoreWCS: Could not read WCS with key %s in file %s,  \ -                extension %d' % (wkey, fobj.filename(), e) -                closefobj(f, fobj) -                return #raise -            hwcs = nwcs.to_header() - -            if nwcs.wcs.has_cd(): -                pc2cd(hwcs, key=wkey) -            for k in hwcs.keys(): -                key = k[:-1] -                if key in fobj[e].header.keys(): -                    fobj[e].header.update(key=key, value = hwcs[k]) -                else: -                    continue -            if wcskey == 'O' and fobj[e].header.has_key('TDDALPHA'): -                fobj[e].header['TDDALPHA'] = 0.0 -                fobj[e].header['TDDBETA'] = 0.0 -            if fobj[e].header.has_key('ORIENTAT'): -                norient = np.rad2deg(np.arctan2(hwcs['CD1_2'+'%s' %wkey],hwcs['CD2_2'+'%s' %wkey])) -                fobj[e].header.update(key='ORIENTAT', value=norient) -        elif extname in ['err', 'dq', 'sdq', 'time', 'samp']: -            cextver = fobj[e].header['extver'] -            if cextver == sciver: -                for k in hwcs.keys(): -                    key = k[:-1] -                    fobj[e].header.update(key=key, value = hwcs[k]) -                if fobj[e].header.has_key('ORIENTAT'): -                    norient = np.rad2deg(np.arctan2(hwcs['CD1_2'+'%s' %wkey],hwcs['CD2_2'+'%s' %wkey])) -                    fobj[e].header.update(key='ORIENTAT', value=norient) +         +    if ext: +        # if ext is given ignore fromext and toext. +        for e in ext: +            _restore(fobj, wkey, fromextnum=e) +    else: +        # the case when ext is None but fromext and toext are given +        # This only works with MEF files. It uses extname, extver. +        # For each extname in fromext find the corresponding extver for each  +        # extname in toext and update the primary WCS. +        if fromext and toext: +            countext = fu.countExtn(fobj, fromext) +            if not countext: +                raise KeyError("File does not have extension with extname %s", fromext) +            else: +                for i in range(1, countext+1): +                    for toe in toext: +                        _restore(fobj, fromextnum=i, fromextnam=fromext, toextnum=i, toextnam=toe, ukey=wkey)          else: -            continue +            ext = range(len(fobj)) +            for e in ext: +                _restore(fobj, wkey, fromextnum=e) -    if not clobber: -        fobj.writeto(name) -    closefobj(f, fobj) +    if fobj.filename() is not None: +        if not clobber: +            fobj.writeto(name) +        closefobj(f, fobj) -def deleteWCS(fname, ext, wcskey=" ", wcsname=" "): +def deleteWCS(fname, ext=None, wcskey=" ", wcsname=" "):      """      Delete an alternate WCS defined with wcskey.      If wcskey is " " try to get a key from WCSNAME. @@ -222,11 +236,13 @@ def deleteWCS(fname, ext, wcskey=" ", wcsname=" "):      else:          fobj = fname -    if not parpasscheck(fobj, ext, wcskey, wcsname): +    if not _parpasscheck(fobj, ext, wcskey, wcsname):          closefobj(fname, fobj)          return -    if isinstance(ext, int) or isinstance(ext, tuple): +    if not ext: +        ext = range(len(fobj)) +    elif isinstance(ext, int) or isinstance(ext, tuple):          ext = [ext]      # Do not allow deleting the original WCS. @@ -235,6 +251,11 @@ def deleteWCS(fname, ext, wcskey=" ", wcsname=" "):          closefobj(fname, fobj)          return +    simplefits = fu.isFits(fobj)[1] is 'simple' +    if simplefits: +        wcskeyext = 0 +    else: +        wcskeyext = 1      if wcskey == " ":          # try getting the key from WCSNAME          if wcsname == " ": @@ -242,13 +263,13 @@ def deleteWCS(fname, ext, wcskey=" ", wcsname=" "):              closefobj(fname, fobj)              return          else: -            wkey = getKeyFromName(fobj[1].header, wcsname) +            wkey = getKeyFromName(fobj[wcskeyext].header, wcsname)              if not wkey:                  print 'Could not get a key: wcsname "%s" not found in header.' % wcsname                  closefobj(fname, fobj)                  return      else: -        if wcskey not in wcskeys(fobj[1].header): +        if wcskey not in wcskeys(fobj[wcskeyext].header):              print "Could not find alternate WCS with key %s in this file" % wcskey              closefobj(fname, fobj)              return @@ -274,8 +295,63 @@ def deleteWCS(fname, ext, wcskey=" ", wcsname=" "):          print "Did not find WCS with key %s in any of the extensions" % wkey      closefobj(fname, fobj) +def _restore(fobj, ukey, fromextnum, toextnum=None, fromextnam=None, toextnam=None): +    if fromextnam: +        fromextension = (fromextnam, fromextnum) +    else: +        fromextension = fromextnum +    if toextnum: +        if toextnam: +            toextension = (toextnam, toextnum) +        else: +            toextension =toextnum +    else: +        toextension = fromextension +         +    try: +        nwcs = pywcs.WCS(fobj[fromextension].header, fobj=fobj, key=ukey) +    except KeyError: +        print 'restoreWCS: Could not read WCS with key %s' %ukey  +        print '            Skipping %s[%s]' % (fobj.filename(), str(fromextension)) +        return +    hwcs = nwcs.to_header() + +    if nwcs.wcs.has_cd(): +        pc2cd(hwcs, key=ukey) +    for k in hwcs.keys(): +        key = k[:-1] +        if key in fobj[toextension].header.keys(): +            fobj[toextension].header.update(key=key, value = hwcs[k]) +        else: +            continue +    if key == 'O' and fobj[toextension].header.has_key('TDDALPHA'): +        fobj[toextension].header['TDDALPHA'] = 0.0 +        fobj[toeztension].header['TDDBETA'] = 0.0 +    if fobj[toextension].header.has_key('ORIENTAT'): +        norient = np.rad2deg(np.arctan2(hwcs['CD1_2'+'%s' %ukey], hwcs['CD2_2'+'%s' %ukey])) +        fobj[toextension].header.update(key='ORIENTAT', value=norient) +  #header operations -def wcskeys(header): +def _check_headerpars(fobj, ext): +    if not isinstance(fobj, pyfits.Header) and not isinstance(fobj, pyfits.HDUList) \ +                   and not isinstance(fobj, str): +        raise ValueError("Expected a file name, a file object or a header\n") +     +    if not isinstance(fobj, pyfits.Header): +        #raise ValueError("Expected a valid ext parameter when input is a file") +        if not isinstance(ext, int) and not isinstance(ext, tuple): +            raise ValueError("Expected ext to be a number or a tuple, e.g. ('SCI', 1)\n") + +def _getheader(fobj, ext): +    if isinstance(fobj, str): +        hdr = pyfits.getheader(fobj,ext) +    elif isinstance(fobj, pyfits.Header): +        hdr = fobj +    else: +        hdr = fobj[ext].header +    return hdr +         +def wcskeys(fobj, ext=None):      """      Returns a list of characters used in the header for alternate      WCS description with WCSNAME keyword @@ -284,11 +360,12 @@ def wcskeys(header):      ----------      hdr: pyfits.Header      """ -    assert isinstance(header, pyfits.Header), "Requires a pyfits.Header object as input" -    names = header["WCSNAME*"] +    _check_headerpars(fobj, ext) +    hdr = _getheader(fobj, ext) +    names = hdr["WCSNAME*"]      return [key.split('WCSNAME')[1].upper() for key in names.keys()] -def wcsnames(header): +def wcsnames(fobj, ext=None):      """      Returns a dictionary of wcskey: WCSNAME pairs @@ -296,13 +373,15 @@ def wcsnames(header):      ----------      header: pyfits.Header      """ -    names = header["WCSNAME*"] +    _check_headerpars(fobj, ext) +    hdr = _getheader(fobj, ext) +    names = hdr["WCSNAME*"]      d = {}      for c in names:          d[c.key[-1]] = c.value      return d -def available_wcskeys(header): +def available_wcskeys(fobj, ext=None):      """      Returns a list of characters which are not used in the header      with WCSNAME keyword. Any of them can be used to save a new @@ -312,9 +391,10 @@ def available_wcskeys(header):      ----------      header: pyfits.Header      """ -    assert isinstance(header, pyfits.Header), "Requires a pyfits.Header object as input" +    _check_headerpars(fobj, ext) +    hdr = _getheader(fobj, ext)      all_keys = list(string.ascii_uppercase) -    used_keys = wcskeys(header) +    used_keys = wcskeys(hdr)      try:          used_keys.remove("")      except ValueError: @@ -322,7 +402,7 @@ def available_wcskeys(header):      [all_keys.remove(key) for key in used_keys]      return all_keys -def next_wcskey(header): +def next_wcskey(fobj, ext=None):      """      Returns next available character to be used for an alternate WCS @@ -330,8 +410,9 @@ def next_wcskey(header):      ----------      header: pyfits.Header      """ -    assert isinstance(header, pyfits.Header), "Requires a pyfits.Header object as input" -    allkeys = available_wcskeys(header) +    _check_headerpars(fobj, ext) +    hdr = _getheader(fobj, hdr) +    allkeys = available_wcskeys(hdr)      if allkeys != []:          return allkeys[0]      else: @@ -384,61 +465,57 @@ def pc2cd(hdr, key=' '):          hdr.update(key='CD'+c+'%s' %key, value=val)      return hdr -def parpasscheck(fobj, ext, wcskey, wcsname, clobber=True): - +def _parpasscheck(fobj, ext, wcskey, fromext=None, toext=None, reusekey=False, clobber=False): +    """ +    Check input parameters to altwcs functions +     +    fobj:    string or pyfits.HDUList object +             a file name or a file object +    ext:     an int, a tuple, a python list of integers or a python list +             of tuples (e.g.('sci',1)) +             fits extensions to work with +    wcskey:  a charater +             "A"-"Z" or " "- Used for one of 26 alternate WCS definitions +    wcsname: string (optional) +             if given and wcskey is " ", will try to restore by WCSNAME value +    reusekey: boolean +             A flag which indicates whether to reuse a wcskey in the header +    clobber: boolean +             A flag to define if the original files should be overwritten +    """      if not isinstance(fobj,pyfits.HDUList):          print "First parameter must be a fits file object or a file name."          return False -    try: -        assert (fobj.fileinfo(0)['filemode'] == 'update') -    except AssertionError: -        print "First parameter must be a file name or a file object opened in 'update' mode." -        return False +     +    # first one covers the case of an object created in memory  +    # (e.g. headerlet) for which fileinfo returns None +    if fobj.fileinfo(0) is None:  +        pass +    else: +        # an HDUList object with associated file +        if fobj.fileinfo(0)['filemode'] is not 'update' and clobber: +            print "First parameter must be a file name or a file object opened in 'update' mode." +            return False      if not isinstance(ext, int) and not isinstance(ext, tuple) \ -       and not isinstance(ext, list): +       and not isinstance(ext, list) and ext is not None:          print "Ext must be integer, tuple, a list of int extension numbers, \          or a list of tuples representing a fits extension, for example ('sci', 1)."          return False +     +    if not isinstance(fromext, str) and fromext is not None: +        print "fromext must be a string representing a valid extname" +        return False +    if not isinstance(toext, list) and not isinstance(toext, str) and \ +                     toext is not None : +        print "toext must be a string or a list of strings representing extname" +        return False +          if len(wcskey) != 1:          print 'Parameter wcskey must be a character - one of "A"-"Z" or " "'          return False - -    wcsext = 0 -    eindx = 0 -    for e in fobj: -        if 'extname' in e.header and 'sci' in e.header['extname'].lower(): -            wcsext = eindx -            break -        eindx += 1 -         - -    if wcskey == " ": -        # try getting the key from WCSNAME -        """ -        if wcsname == " " or wcsname == "": -            #wkey = next_wcskey(f[1].header) -            #if not wkey: -            #    print "Could not get a valid key from header" -            return False -        """ -        if wcsname.strip(): -            wkey = getKeyFromName(fobj[wcsext].header, wcsname) -            if wkey and not clobber: -                print 'Wcsname %s is already used.' % wcsname -                print 'Use "wcsutil.next_wcskey" to obtain a valid wcskey' -                print 'or use "clobber=True" to overwrite the values.' -                return False -    else: -        if wcskey not in available_wcskeys(fobj[wcsext].header): -            if clobber==False: -                print 'Wcskey %s is already used.' % wcskey -                print 'Use "wcsutil.next_wcskey" to obtain a valid wcskey' -                print 'or use "clobber=True" to overwrite the values.' -                return False - - +          return True  def closefobj(fname, f): @@ -449,3 +526,23 @@ def closefobj(fname, f):      """      if isinstance(fname, str):          f.close() + +def mapFitsExt2HDUListInd(fname, extname): +    """ +    Map FITS extensions with 'EXTNAME' to HDUList indexes. +    """ + +    if not isinstance(fname, pyfits.HDUList): +        f = pyfits.open(fname) +        close_file = True +    else: +        f = fname +        close_file = False +    d = {} +    for hdu in f: +        if 'EXTNAME' in hdu.header and hdu.header['EXTNAME'] == extname: +            extver = hdu.header['EXTVER'] +            d[(extname, extver)] = f.index_of((extname, extver)) +    if close_file: +        f.close() +    return d | 
