summaryrefslogtreecommitdiff
path: root/stwcs/wcsutil/headerlet.py
diff options
context:
space:
mode:
Diffstat (limited to 'stwcs/wcsutil/headerlet.py')
-rw-r--r--  stwcs/wcsutil/headerlet.py  336
1 files changed, 170 insertions, 166 deletions
diff --git a/stwcs/wcsutil/headerlet.py b/stwcs/wcsutil/headerlet.py
index 5a980aa..03861df 100644
--- a/stwcs/wcsutil/headerlet.py
+++ b/stwcs/wcsutil/headerlet.py
@@ -34,7 +34,9 @@ from . import wcscorr
from .hstwcs import HSTWCS
from .mappings import basic_wcs
-#### Logging support functions
+# Logging support functions
+
+
class FuncNameLoggingFormatter(logging.Formatter):
def __init__(self, fmt=None, datefmt=None):
if '%(funcName)s' not in fmt:
@@ -59,8 +61,8 @@ logger.addHandler(ch)
logger.setLevel(logging.DEBUG)
FITS_STD_KW = ['XTENSION', 'BITPIX', 'NAXIS', 'PCOUNT',
- 'GCOUNT', 'EXTNAME', 'EXTVER', 'ORIGIN',
- 'INHERIT', 'DATE', 'IRAF-TLM']
+ 'GCOUNT', 'EXTNAME', 'EXTVER', 'ORIGIN',
+ 'INHERIT', 'DATE', 'IRAF-TLM']
DEFAULT_SUMMARY_COLS = ['HDRNAME', 'WCSNAME', 'DISTNAME', 'AUTHOR', 'DATE',
'SIPNAME', 'NPOLFILE', 'D2IMFILE', 'DESCRIP']
@@ -120,10 +122,13 @@ def with_logging(func):
return func(*args, **kw)
return wrapped
-#### Utility functions
+# Utility functions
+
+
def is_par_blank(par):
return par in ['', ' ', 'INDEF', "None", None]
+
def parse_filename(fname, mode='readonly'):
"""
Interprets the input as either a filename of a file that needs to be opened
@@ -174,6 +179,7 @@ def parse_filename(fname, mode='readonly'):
fname = ''
return fobj, fname, close_fobj
+
def get_headerlet_kw_names(fobj, kw='HDRNAME'):
"""
Returns a list of specified keywords from all HeaderletHDU
@@ -198,6 +204,7 @@ def get_headerlet_kw_names(fobj, kw='HDRNAME'):
return hdrnames
+
def get_header_kw_vals(hdr, kwname, kwval, default=0):
if kwval is None:
if kwname in hdr:
@@ -206,6 +213,7 @@ def get_header_kw_vals(hdr, kwname, kwval, default=0):
kwval = default
return kwval
+
@with_logging
def find_headerlet_HDUs(fobj, hdrext=None, hdrname=None, distname=None,
strict=True, logging=False, logmode='w'):
@@ -259,9 +267,9 @@ def find_headerlet_HDUs(fobj, hdrext=None, hdrname=None, distname=None,
hdrlets = []
if hdrext is not None and isinstance(hdrext, int):
- if hdrext in range(len(fobj)): # insure specified hdrext is in fobj
+ if hdrext in range(len(fobj)): # insure specified hdrext is in fobj
if isinstance(fobj[hdrext], fits.hdu.base.NonstandardExtHDU) and \
- fobj[hdrext].header['EXTNAME'] == 'HDRLET':
+ fobj[hdrext].header['EXTNAME'] == 'HDRLET':
hdrlets.append(hdrext)
else:
for ext in fobj:
@@ -280,9 +288,9 @@ def find_headerlet_HDUs(fobj, hdrext=None, hdrname=None, distname=None,
(hdrextnum == ext.header['EXTVER']) and
(hdrextname == ext.header['EXTNAME']))
hdrname_match = ((hdrname is not None) and
- (hdrname == ext.header['HDRNAME']))
+ (hdrname == ext.header['HDRNAME']))
distname_match = ((distname is not None) and
- (distname == ext.header['DISTNAME']))
+ (distname == ext.header['DISTNAME']))
if hdrext_match or hdrname_match or distname_match:
hdrlets.append(fobj.index(ext))
@@ -310,6 +318,7 @@ def find_headerlet_HDUs(fobj, hdrext=None, hdrname=None, distname=None,
return hdrlets
+
def verify_hdrname_is_unique(fobj, hdrname):
"""
Verifies that no other HeaderletHDU extension has the specified hdrname.
@@ -331,6 +340,7 @@ def verify_hdrname_is_unique(fobj, hdrname):
return unique
+
def update_versions(sourcehdr, desthdr):
"""
Update keywords which store version numbers
@@ -344,6 +354,7 @@ def update_versions(sourcehdr, desthdr):
except KeyError:
desthdr[key] = (" ", phdukw[key])
+
def update_ref_files(source, dest):
"""
Update the reference files name in the primary header of 'dest'
@@ -372,8 +383,9 @@ def update_ref_files(source, dest):
phdukw[key] = False
return phdukw
+
def print_summary(summary_cols, summary_dict, pad=2, maxwidth=None, idcol=None,
- output=None, clobber=True, quiet=False ):
+ output=None, clobber=True, quiet=False ):
"""
Print out summary dictionary to STDOUT, and possibly an output file
@@ -404,9 +416,9 @@ def print_summary(summary_cols, summary_dict, pad=2, maxwidth=None, idcol=None,
for row in range(nrows):
if idcol:
outstr += COLUMN_FMT.format(idcol['vals'][row],
- width=idcol['width']+pad)
+ width=idcol['width'] + pad)
for kw in summary_cols:
- val = summary_dict[kw]['vals'][row][:(column_widths[kw]-pad)]
+ val = summary_dict[kw]['vals'][row][:(column_widths[kw] - pad)]
outstr += COLUMN_FMT.format(val, width=column_widths[kw])
outstr += '\n'
if not quiet:
@@ -415,7 +427,7 @@ def print_summary(summary_cols, summary_dict, pad=2, maxwidth=None, idcol=None,
# If specified, write info to separate text file
write_file = False
if output:
- output = fu.osfn(output) # Expand any environment variables in filename
+ output = fu.osfn(output) # Expand any environment variables in filename
write_file = True
if os.path.exists(output):
if clobber:
@@ -430,11 +442,13 @@ def print_summary(summary_cols, summary_dict, pad=2, maxwidth=None, idcol=None,
fout.write(outstr)
fout.close()
-#### Private utility functions
+# Private utility functions
+
+
def _create_primary_HDU(fobj, fname, wcsext, destim, hdrname, wcsname,
- sipname, npolfile, d2imfile,
- nmatch,catalog, wcskey,
- author, descrip, history):
+ sipname, npolfile, d2imfile,
+ nmatch, catalog, wcskey,
+ author, descrip, history):
# convert input values into valid FITS kw values
if author is None:
author = ''
@@ -447,7 +461,7 @@ def _create_primary_HDU(fobj, fname, wcsext, destim, hdrname, wcsname,
npolname, npolfile = utils.build_npolname(fobj, npolfile)
logger.info("Setting npolfile value to %s" % npolname)
- d2imname, d2imfile = utils.build_d2imname(fobj,d2imfile)
+ d2imname, d2imfile = utils.build_d2imname(fobj, d2imfile)
logger.info("Setting d2imfile value to %s" % d2imname)
distname = utils.build_distname(sipname, npolname, d2imname)
@@ -461,23 +475,14 @@ def _create_primary_HDU(fobj, fname, wcsext, destim, hdrname, wcsname,
else:
history = ''
- rms_ra = fobj[wcsext].header.get("CRDER1"+wcskey, 0)
- rms_dec = fobj[wcsext].header.get("CRDER2"+wcskey, 0)
+ rms_ra = fobj[wcsext].header.get("CRDER1" + wcskey, 0)
+ rms_dec = fobj[wcsext].header.get("CRDER2" + wcskey, 0)
if not nmatch:
- nmatch = fobj[wcsext].header.get("NMATCH"+wcskey, 0)
+ nmatch = fobj[wcsext].header.get("NMATCH" + wcskey, 0)
if not catalog:
- catalog = fobj[wcsext].header.get('CATALOG'+wcskey, "")
+ catalog = fobj[wcsext].header.get('CATALOG' + wcskey, "")
# get the version of STWCS used to create the WCS of the science file.
- #try:
- #upwcsver = fobj[0].header.cards[fobj[0].header.index('UPWCSVER')]
- #except KeyError:
- #upwcsver = pyfits.Card("UPWCSVER", " ",
- #"Version of STWCS used to update the WCS")
- #try:
- #pywcsver = fobj[0].header.cards[fobj[0].header.index('PYWCSVER')]
- #except KeyError:
- #pywcsver = pyfits.Card("PYWCSVER", " ",
- #"Version of PYWCS used to update the WCS")
+
upwcsver = fobj[0].header.get('UPWCSVER', "")
pywcsver = fobj[0].header.get('PYWCSVER', "")
# build Primary HDU
@@ -495,7 +500,7 @@ def _create_primary_HDU(fobj, fname, wcsext, destim, hdrname, wcsname,
phdu.header['D2IMFILE'] = (d2imfile,
'origin of detector to image correction')
phdu.header['IDCTAB'] = (idctab,
- 'origin of Polynomial Distortion')
+ 'origin of Polynomial Distortion')
phdu.header['AUTHOR'] = (author, 'headerlet created by this user')
phdu.header['DESCRIP'] = (descrip,
'Short description of headerlet solution')
@@ -526,7 +531,9 @@ def _create_primary_HDU(fobj, fname, wcsext, destim, hdrname, wcsname,
return phdu
-#### Public Interface functions
+# Public Interface functions
+
+
@with_logging
def extract_headerlet(filename, output, extnum=None, hdrname=None,
clobber=False, logging=True):
@@ -611,11 +618,11 @@ def extract_headerlet(filename, output, extnum=None, hdrname=None,
@with_logging
def write_headerlet(filename, hdrname, output=None, sciext='SCI',
- wcsname=None, wcskey=None, destim=None,
- sipname=None, npolfile=None, d2imfile=None,
- author=None, descrip=None, history=None,
- nmatch=None, catalog=None,
- attach=True, clobber=False, logging=False):
+ wcsname=None, wcskey=None, destim=None,
+ sipname=None, npolfile=None, d2imfile=None,
+ author=None, descrip=None, history=None,
+ nmatch=None, catalog=None,
+ attach=True, clobber=False, logging=False):
"""
Save a WCS as a headerlet FITS file.
@@ -733,22 +740,22 @@ def write_headerlet(filename, hdrname, output=None, sciext='SCI',
# Interpret sciext input for this file
if isinstance(sciext, int):
- sciextlist = [sciext] # allow for specification of simple FITS header
+ sciextlist = [sciext] # allow for specification of simple FITS header
elif isinstance(sciext, str):
numsciext = countExtn(fobj, sciext)
if numsciext > 0:
- sciextlist = [tuple((sciext,i)) for i in range(1, numsciext+1)]
+ sciextlist = [tuple((sciext, i)) for i in range(1, numsciext + 1)]
else:
sciextlist = [0]
elif isinstance(sciext, list):
sciextlist = sciext
else:
- errstr = "Expected sciext to be a list of FITS extensions with science data\n"+\
- " a valid EXTNAME string, or an integer."
+ errstr = "Expected sciext to be a list of FITS extensions with science data\n" + \
+ " a valid EXTNAME string, or an integer."
logger.critical(errstr)
raise ValueError
- wnames = altwcs.wcsnames(fobj,ext=sciextlist[0])
+ wnames = altwcs.wcsnames(fobj, ext=sciextlist[0])
# Insure that WCSCORR table has been created with all original
# WCS's recorded prior to adding the headerlet WCS
@@ -756,7 +763,7 @@ def write_headerlet(filename, hdrname, output=None, sciext='SCI',
if wcsname is None:
scihdr = fobj[sciextlist[0]].header
- wname = scihdr['wcsname'+wcskey]
+ wname = scihdr['wcsname' + wcskey]
else:
wname = wcsname
if hdrname in [None, ' ', '']:
@@ -764,17 +771,17 @@ def write_headerlet(filename, hdrname, output=None, sciext='SCI',
logger.critical('Creating the headerlet from image %s' % fname)
hdrletobj = create_headerlet(fobj, sciext=sciextlist,
- wcsname=wname, wcskey=wcskey,
- hdrname=hdrname,
- sipname=sipname, npolfile=npolfile,
- d2imfile=d2imfile, author=author,
- descrip=descrip, history=history,
- nmatch=nmatch, catalog=catalog,
- logging=False)
+ wcsname=wname, wcskey=wcskey,
+ hdrname=hdrname,
+ sipname=sipname, npolfile=npolfile,
+ d2imfile=d2imfile, author=author,
+ descrip=descrip, history=history,
+ nmatch=nmatch, catalog=catalog,
+ logging=False)
if attach:
# Check to see whether or not a HeaderletHDU with
- #this hdrname already exists
+ # this hdrname already exists
hdrnames = get_headerlet_kw_names(fobj)
if hdrname not in hdrnames:
hdrlet_hdu = HeaderletHDU.fromheaderlet(hdrletobj)
@@ -810,14 +817,15 @@ def write_headerlet(filename, hdrname, output=None, sciext='SCI',
outname = output
if not outname.endswith('.fits'):
- outname = '{0}_{1}_hlet.fits'.format(frootname,outname)
+ outname = '{0}_{1}_hlet.fits'.format(frootname, outname)
# If user specifies an output filename for headerlet, write it out
hdrletobj.tofile(outname, clobber=clobber)
- logger.critical( 'Created Headerlet file %s ' % outname)
+ logger.critical('Created Headerlet file %s ' % outname)
del hdrletobj
+
@with_logging
def create_headerlet(filename, sciext='SCI', hdrname=None, destim=None,
wcskey=" ", wcsname=None,
@@ -856,7 +864,8 @@ def create_headerlet(filename, sciext='SCI', hdrname=None, destim=None,
if " ", use the primary (default)
if None use wcsname
wcsname: string or None
- if wcskey is None use wcsname specified here to choose an alternate WCS for the headerlet
+ if wcskey is None use wcsname specified here to choose an alternate WCS
+ for the headerlet
sipname: string or None (default)
Name of unique file where the polynomial distortion coefficients were
read from. If None, the behavior is:
@@ -935,7 +944,7 @@ def create_headerlet(filename, sciext='SCI', hdrname=None, destim=None,
if not wcsname:
# User did not specify a value for 'wcsname'
if wcsnamekw in fobj[wcsext].header:
- #check if there's a WCSNAME for this wcskey in the header
+ # check if there's a WCSNAME for this wcskey in the header
wcsname = fobj[wcsext].header[wcsnamekw]
logger.info("Setting wcsname from header[%s] to %s" % (wcsnamekw, wcsname))
else:
@@ -973,8 +982,9 @@ def create_headerlet(filename, sciext='SCI', hdrname=None, destim=None,
wkeys = altwcs.wcskeys(fobj, ext=wcsext)
if wcskey != ' ':
if wcskey not in wkeys:
- logger.critical('No WCS with wcskey=%s found in extension %s. Skipping...' % (wcskey, str(wcsext)))
- raise ValueError("No WCS with wcskey=%s found in extension %s. Skipping...' % (wcskey, str(wcsext))")
+ mess = "Skipping extension {0} - no WCS with wcskey={1} found.".format(wcsext, wcskey)
+ logger.critical(mess)
+ raise ValueError(mess)
# get remaining required keywords
if destim is None:
@@ -1005,13 +1015,11 @@ def create_headerlet(filename, sciext='SCI', hdrname=None, destim=None,
logger.critical(message)
raise KeyError
-
-
hdul = []
phdu = _create_primary_HDU(fobj, fname, wcsext, destim, hdrname, wcsname,
- sipname, npolfile, d2imfile,
- nmatch, catalog, wcskey,
- author, descrip, history)
+ sipname, npolfile, d2imfile,
+ nmatch, catalog, wcskey,
+ author, descrip, history)
hdul.append(phdu)
wcsdvarr_extns = []
"""
@@ -1100,14 +1108,6 @@ def create_headerlet(filename, sciext='SCI', hdrname=None, destim=None,
whdu.update_ext_version(ihdu.header['D2IM2.EXTVER'])
hdul.append(whdu)
-
- #if hwcs.det2im1 or hwcs.det2im2:
- #try:
- #darr = hdul[('D2IMARR', 1)]
- #except KeyError:
- #whdu = whdul[('D2IMARR')]
- #whdu.update_ext_version(1)
- #hdul.append(whdu)
if close_file:
fobj.close()
@@ -1115,9 +1115,10 @@ def create_headerlet(filename, sciext='SCI', hdrname=None, destim=None,
hlet.init_attrs()
return hlet
+
@with_logging
def apply_headerlet_as_primary(filename, hdrlet, attach=True, archive=True,
- force=False, logging=False, logmode='a'):
+ force=False, logging=False, logmode='a'):
"""
Apply headerlet 'hdrfile' to a science observation 'destfile' as the primary WCS
@@ -1146,18 +1147,19 @@ def apply_headerlet_as_primary(filename, hdrlet, attach=True, archive=True,
hdrlet = [hdrlet]
if len(hdrlet) != len(filename):
logger.critical("Filenames must have matching headerlets. "
- "{0:d} filenames and {1:d} headerlets specified".format(len(filename),len(hdrlet)))
+ "{0:d} filenames and {1:d} headerlets specified".format(len(filename),
+ len(hdrlet)))
- for fname,h in zip(filename,hdrlet):
- print("Applying {0} as Primary WCS to {1}".format(h,fname))
+ for fname, h in zip(filename, hdrlet):
+ print("Applying {0} as Primary WCS to {1}".format(h, fname))
hlet = Headerlet.fromfile(h, logging=logging, logmode=logmode)
hlet.apply_as_primary(fname, attach=attach, archive=archive,
- force=force)
+ force=force)
@with_logging
def apply_headerlet_as_alternate(filename, hdrlet, attach=True, wcskey=None,
- wcsname=None, logging=False, logmode='w'):
+ wcsname=None, logging=False, logmode='w'):
"""
Apply headerlet to a science observation as an alternate WCS
@@ -1188,13 +1190,14 @@ def apply_headerlet_as_alternate(filename, hdrlet, attach=True, wcskey=None,
hdrlet = [hdrlet]
if len(hdrlet) != len(filename):
logger.critical("Filenames must have matching headerlets. "
- "{0:d} filenames and {1:d} headerlets specified".format(len(filename),len(hdrlet)))
+ "{0:d} filenames and {1:d} headerlets specified".format(len(filename),
+ len(hdrlet)))
- for fname,h in zip(filename,hdrlet):
- print('Applying {0} as an alternate WCS to {1}'.format(h,fname))
+ for fname, h in zip(filename, hdrlet):
+ print('Applying {0} as an alternate WCS to {1}'.format(h, fname))
hlet = Headerlet.fromfile(h, logging=logging, logmode=logmode)
hlet.apply_as_alternate(fname, attach=attach,
- wcsname=wcsname, wcskey=wcskey)
+ wcsname=wcsname, wcskey=wcskey)
@with_logging
@@ -1218,12 +1221,13 @@ def attach_headerlet(filename, hdrlet, logging=False, logmode='a'):
hdrlet = [hdrlet]
if len(hdrlet) != len(filename):
logger.critical("Filenames must have matching headerlets. "
- "{0:d} filenames and {1:d} headerlets specified".format(len(filename),len(hdrlet)))
+ "{0:d} filenames and {1:d} headerlets specified".format(len(filename),
+ len(hdrlet)))
- for fname,h in zip(filename,hdrlet):
- print('Attaching {0} as Headerlet extension to {1}'.format(h,fname))
+ for fname, h in zip(filename, hdrlet):
+ print('Attaching {0} as Headerlet extension to {1}'.format(h, fname))
hlet = Headerlet.fromfile(h, logging=logging, logmode=logmode)
- hlet.attach_to_file(fname,archive=True)
+ hlet.attach_to_file(fname, archive=True)
@with_logging
@@ -1262,12 +1266,13 @@ def delete_headerlet(filename, hdrname=None, hdrext=None, distname=None,
filename = [filename]
for f in filename:
- print("Deleting Headerlet from ",f)
+ print("Deleting Headerlet from ", f)
_delete_single_headerlet(f, hdrname=hdrname, hdrext=hdrext,
- distname=distname, logging=logging, logmode='a')
+ distname=distname, logging=logging, logmode='a')
+
def _delete_single_headerlet(filename, hdrname=None, hdrext=None, distname=None,
- logging=False, logmode='w'):
+ logging=False, logmode='w'):
"""
Deletes HeaderletHDU(s) from a SINGLE science file
@@ -1297,7 +1302,7 @@ def _delete_single_headerlet(filename, hdrname=None, hdrext=None, distname=None,
logmode: 'a' or 'w'
"""
hdrlet_ind = find_headerlet_HDUs(filename, hdrname=hdrname, hdrext=hdrext,
- distname=distname, logging=logging, logmode='a')
+ distname=distname, logging=logging, logmode='a')
if len(hdrlet_ind) == 0:
message = """
No HDUs deleted... No Headerlet HDUs found with '
@@ -1404,8 +1409,8 @@ def headerlet_summary(filename, columns=None, pad=2, maxwidth=None,
# Print out the summary dictionary
print_summary(summary_cols, summary_dict, pad=pad, maxwidth=maxwidth,
- idcol=extnums_col, output=output,
- clobber=clobber, quiet=quiet)
+ idcol=extnums_col, output=output,
+ clobber=clobber, quiet=quiet)
@with_logging
@@ -1452,7 +1457,7 @@ def restore_from_headerlet(filename, hdrname=None, hdrext=None, archive=True,
message = """
Multiple Headerlet extensions found with the same name.
%d Headerlets with "%s" = %s found in %s.
- """% (len(hdrlet_ind), kwerr, kwval, fname)
+ """ % (len(hdrlet_ind), kwerr, kwval, fname)
if close_fobj:
fobj.close()
logger.critical(message)
@@ -1464,7 +1469,7 @@ def restore_from_headerlet(filename, hdrname=None, hdrext=None, archive=True,
if hasattr(fobj[hdrlet_ind[0]], 'hdulist'):
hdrlet = fobj[hdrlet_indx].hdulist
else:
- hdrlet = fobj[hdrlet_indx].headerlet # older convention in PyFITS
+ hdrlet = fobj[hdrlet_indx].headerlet # older convention in PyFITS
# read in the names of the extensions which HeaderletHDU updates
extlist = []
@@ -1503,7 +1508,7 @@ def restore_from_headerlet(filename, hdrname=None, hdrext=None, archive=True,
else:
if 'idctab' in scihdr:
priwcs_hdrname = ''.join(['IDC_',
- utils.extract_rootname(scihdr['idctab'], suffix='_idc')])
+ utils.extract_rootname(scihdr['idctab'], suffix='_idc')])
else:
priwcs_hdrname = 'UNKNOWN'
priwcs_name = priwcs_hdrname
@@ -1513,7 +1518,7 @@ def restore_from_headerlet(filename, hdrname=None, hdrext=None, archive=True,
if archive and priwcs_unique:
if priwcs_unique:
newhdrlet = create_headerlet(fobj, sciext=scihdr['extname'],
- hdrname=priwcs_hdrname)
+ hdrname=priwcs_hdrname)
newhdrlet.attach_to_file(fobj)
#
# copy hdrlet as a primary
@@ -1598,7 +1603,7 @@ def restore_all_with_distname(filename, distname, primary, archive=True,
if hasattr(fobj[primary_ind], 'hdulist'):
primary_hdrlet = fobj[primary_ind].hdulist
else:
- primary_hdrlet = fobj[primary_ind].headerlet # older convention in PyFITS
+ primary_hdrlet = fobj[primary_ind].headerlet # older convention in PyFITS
pri_distname = primary_hdrlet[0].header['distname']
if pri_distname != distname:
if close_fobj:
@@ -1625,7 +1630,7 @@ def restore_all_with_distname(filename, distname, primary, archive=True,
if hasattr(fobj[hlet], 'hdulist'):
hdrlet = fobj[hlet].hdulist
else:
- hdrlet = fobj[hlet].headerlet # older convention in PyFITS
+ hdrlet = fobj[hlet].headerlet # older convention in PyFITS
if hlet == primary_ind:
hdrlet.apply_as_primary(fobj, attach=False,
archive=archive, force=True)
@@ -1641,11 +1646,11 @@ def restore_all_with_distname(filename, distname, primary, archive=True,
@with_logging
def archive_as_headerlet(filename, hdrname, sciext='SCI',
- wcsname=None, wcskey=None, destim=None,
- sipname=None, npolfile=None, d2imfile=None,
- author=None, descrip=None, history=None,
- nmatch=None, catalog=None,
- logging=False, logmode='w'):
+ wcsname=None, wcskey=None, destim=None,
+ sipname=None, npolfile=None, d2imfile=None,
+ author=None, descrip=None, history=None,
+ nmatch=None, catalog=None,
+ logging=False, logmode='w'):
"""
Save a WCS as a headerlet extension and write it out to a file.
@@ -1735,7 +1740,7 @@ def archive_as_headerlet(filename, hdrname, sciext='SCI',
if wcsname is None:
scihdr = fobj[sciext, 1].header
- wcsname = scihdr['wcsname'+wcskey]
+ wcsname = scihdr['wcsname' + wcskey]
if hdrname in [None, ' ', '']:
hdrname = wcsname
@@ -1745,13 +1750,13 @@ def archive_as_headerlet(filename, hdrname, sciext='SCI',
hdrnames = get_headerlet_kw_names(fobj)
if hdrname not in hdrnames:
hdrletobj = create_headerlet(fobj, sciext=sciext,
- wcsname=wcsname, wcskey=wcskey,
- hdrname=hdrname,
- sipname=sipname, npolfile=npolfile,
- d2imfile=d2imfile, author=author,
- descrip=descrip, history=history,
- nmatch=nmatch, catalog=catalog,
- logging=False)
+ wcsname=wcsname, wcskey=wcskey,
+ hdrname=hdrname,
+ sipname=sipname, npolfile=npolfile,
+ d2imfile=d2imfile, author=author,
+ descrip=descrip, history=history,
+ nmatch=nmatch, catalog=catalog,
+ logging=False)
hlt_hdu = HeaderletHDU.fromheaderlet(hdrletobj)
if destim is not None:
@@ -1771,7 +1776,10 @@ def archive_as_headerlet(filename, hdrname, sciext='SCI',
if close_fobj:
fobj.close()
-#### Headerlet Class definitions
+
+# Headerlet Class definitions
+
+
class Headerlet(fits.HDUList):
"""
A Headerlet class
@@ -1811,9 +1819,9 @@ class Headerlet(fits.HDUList):
self.distname = self[0].header["DISTNAME"]
try:
- self.vafactor = self[("SIPWCS", 1)].header.get("VAFACTOR", 1) #None instead of 1?
+ self.vafactor = self[("SIPWCS", 1)].header.get("VAFACTOR", 1) # None instead of 1?
except (IndexError, KeyError):
- self.vafactor = self[0].header.get("VAFACTOR", 1) #None instead of 1?
+ self.vafactor = self[0].header.get("VAFACTOR", 1) # None instead of 1?
self.author = self[0].header["AUTHOR"]
self.descrip = self[0].header["DESCRIP"]
@@ -1846,7 +1854,7 @@ class Headerlet(fits.HDUList):
init_logging('class Headerlet', level=logging, mode=logmode)
return hlet
- def apply_as_primary(self, fobj, attach=True, archive=True, force=False):
+ def apply_as_primary(self, fobj, attach=True, archive=True, force=False):
"""
Copy this headerlet as a primary WCS to fobj
@@ -1877,7 +1885,8 @@ class Headerlet(fits.HDUList):
if close_dest:
fobj.close()
raise ValueError("Destination name does not match headerlet"
- "Observation %s cannot be updated with headerlet %s" % (fname, self.hdrname))
+ "Observation {0} cannot be updated with"
+ "headerlet {1}".format((fname, self.hdrname)))
# Check to see whether the distortion model in the destination
# matches the distortion model in the headerlet being applied
@@ -1886,7 +1895,7 @@ class Headerlet(fits.HDUList):
dist_models_equal = self.equal_distmodel(dname)
if not dist_models_equal and not force:
raise ValueError("Distortion models do not match"
- " To overwrite the distortion model, set force=True")
+ " To overwrite the distortion model, set force=True")
orig_hlt_hdu = None
numhlt = countExtn(fobj, 'HDRLET')
@@ -1896,15 +1905,14 @@ class Headerlet(fits.HDUList):
# WCS's recorded prior to adding the headerlet WCS
wcscorr.init_wcscorr(fobj)
-
- ### start archive
+ # start archive
# If archive has been specified
- # regardless of whether or not the distortion models are equal...
+ # regardless of whether or not the distortion models are equal...
numsip = countExtn(self, 'SIPWCS')
sciext_list = []
alt_hlethdu = []
- for i in range(1, numsip+1):
+ for i in range(1, numsip + 1):
sipheader = self[('SIPWCS', i)].header
sciext_list.append((sipheader['TG_ENAME'], sipheader['TG_EVER']))
target_ext = sciext_list[0]
@@ -1920,10 +1928,10 @@ class Headerlet(fits.HDUList):
# Create a headerlet for the original Primary WCS data in the file,
# create an HDU from the original headerlet, and append it to
# the file
- orig_hlt = create_headerlet(fobj, sciext=sciext_list, #[target_ext],
- wcsname=wcsname,
- hdrname=hdrname,
- logging=self.logging)
+ orig_hlt = create_headerlet(fobj, sciext=sciext_list, # [target_ext],
+ wcsname=wcsname,
+ hdrname=hdrname,
+ logging=self.logging)
orig_hlt_hdu = HeaderletHDU.fromheaderlet(orig_hlt)
numhlt += 1
orig_hlt_hdu.header['EXTVER'] = numhlt
@@ -1931,7 +1939,6 @@ class Headerlet(fits.HDUList):
else:
logger.info("Headerlet with name %s is already attached" % hdrname)
-
if dist_models_equal:
# Use the WCSNAME to determine whether or not to archive
# Primary WCS as altwcs
@@ -1945,8 +1952,8 @@ class Headerlet(fits.HDUList):
else:
if 'idctab' in scihdr:
priwcs_name = ''.join(['IDC_',
- utils.extract_rootname(scihdr['idctab'],
- suffix='_idc')])
+ utils.extract_rootname(scihdr['idctab'],
+ suffix='_idc')])
else:
priwcs_name = 'UNKNOWN'
nextkey = altwcs.next_wcskey(fobj, ext=target_ext)
@@ -1958,11 +1965,11 @@ class Headerlet(fits.HDUList):
if hname != 'OPUS' and hname not in hdrlet_extnames:
# get HeaderletHDU for alternate WCS as well
alt_hlet = create_headerlet(fobj, sciext=sciext_list,
- wcsname=hname, wcskey=wcskey,
- hdrname=hname, sipname=None,
- npolfile=None, d2imfile=None,
- author=None, descrip=None, history=None,
- logging=self.logging)
+ wcsname=hname, wcskey=wcskey,
+ hdrname=hname, sipname=None,
+ npolfile=None, d2imfile=None,
+ author=None, descrip=None, history=None,
+ logging=self.logging)
numhlt += 1
alt_hlet_hdu = HeaderletHDU.fromheaderlet(alt_hlet)
alt_hlet_hdu.header['EXTVER'] = numhlt
@@ -1970,8 +1977,8 @@ class Headerlet(fits.HDUList):
hdrlet_extnames.append(hname)
self._del_dest_WCS_ext(fobj)
- for i in range(1, numsip+1):
- target_ext = sciext_list[i-1]
+ for i in range(1, numsip + 1):
+ target_ext = sciext_list[i - 1]
self._del_dest_WCS(fobj, target_ext)
sipwcs = HSTWCS(self, ('SIPWCS', i))
idckw = sipwcs._idc2hdr()
@@ -2035,19 +2042,19 @@ class Headerlet(fits.HDUList):
fobj[target_ext].header.extend(priwcs[0].header)
if sipwcs.cpdis1:
- whdu = priwcs[('WCSDVARR', (i-1)*numnpol+1)].copy()
+ whdu = priwcs[('WCSDVARR', (i - 1) * numnpol + 1)].copy()
whdu.update_ext_version(self[('SIPWCS', i)].header['DP1.EXTVER'])
fobj.append(whdu)
if sipwcs.cpdis2:
- whdu = priwcs[('WCSDVARR', i*numnpol)].copy()
+ whdu = priwcs[('WCSDVARR', i * numnpol)].copy()
whdu.update_ext_version(self[('SIPWCS', i)].header['DP2.EXTVER'])
fobj.append(whdu)
- if sipwcs.det2im1: #or sipwcs.det2im2:
- whdu = priwcs[('D2IMARR', (i-1)*numd2im+1)].copy()
+ if sipwcs.det2im1: # or sipwcs.det2im2:
+ whdu = priwcs[('D2IMARR', (i - 1) * numd2im + 1)].copy()
whdu.update_ext_version(self[('SIPWCS', i)].header['D2IM1.EXTVER'])
fobj.append(whdu)
if sipwcs.det2im2:
- whdu = priwcs[('D2IMARR', i*numd2im)].copy()
+ whdu = priwcs[('D2IMARR', i * numd2im)].copy()
whdu.update_ext_version(self[('SIPWCS', i)].header['D2IM2.EXTVER'])
fobj.append(whdu)
@@ -2070,7 +2077,6 @@ class Headerlet(fits.HDUList):
if close_dest:
fobj.close()
-
def apply_as_alternate(self, fobj, attach=True, wcskey=None, wcsname=None):
"""
Copy this headerlet as an alternate WCS to fobj
@@ -2098,17 +2104,19 @@ class Headerlet(fits.HDUList):
if close_dest:
fobj.close()
raise ValueError("Destination name does not match headerlet"
- "Observation %s cannot be updated with headerlet %s" % (fname, self.hdrname))
+ "Observation %s cannot be updated with"
+ "headerlet %s" % (fname, self.hdrname))
# Verify whether this headerlet has the same distortion
- #found in the image being updated
+ # found in the image being updated
dname = self.get_destination_model(fobj)
dist_models_equal = self.equal_distmodel(dname)
if not dist_models_equal:
raise ValueError("Distortion models do not match \n"
"Headerlet: %s \n"
"Destination file: %s\n"
- "attach_to_file() can be used to append this headerlet" %(self.distname, dname))
+ "attach_to_file() can be used to append this headerlet" %
+ (self.distname, dname))
# Insure that WCSCORR table has been created with all original
# WCS's recorded prior to adding the headerlet WCS
@@ -2148,12 +2156,12 @@ class Headerlet(fits.HDUList):
hwcs_header = altwcs.pc2cd(hwcs_header, key=wkey)
for ax in range(1, hwcs.naxis + 1):
hwcs_header['CTYPE{0}{1}'.format(ax, wkey)] = \
- self[('SIPWCS', 1)].header['CTYPE{0}'.format(ax)]
+ self[('SIPWCS', 1)].header['CTYPE{0}'.format(ax)]
fhdr.extend(hwcs_header)
fhdr['WCSNAME' + wkey] = wname
# also update with HDRNAME (a non-WCS-standard kw)
for kw in self.fit_kws:
- #fhdr.insert(wind, pyfits.Card(kw + wkey,
+ # fhdr.insert(wind, pyfits.Card(kw + wkey,
# self[0].header[kw]))
fhdr.append(fits.Card(kw + wkey, self[0].header[kw]))
# Update the WCSCORR table with new rows from the headerlet's WCSs
@@ -2210,7 +2218,7 @@ class Headerlet(fits.HDUList):
fobj.close()
def info(self, columns=None, pad=2, maxwidth=None,
- output=None, clobber=True, quiet=False):
+ output=None, clobber=True, quiet=False):
"""
Prints a summary of this headerlet
The summary includes:
@@ -2240,7 +2248,7 @@ class Headerlet(fits.HDUList):
"""
summary_cols, summary_dict = self.summary(columns=columns)
print_summary(summary_cols, summary_dict, pad=pad, maxwidth=maxwidth,
- idcol=None, output=output, clobber=clobber, quiet=quiet)
+ idcol=None, output=output, clobber=clobber, quiet=quiet)
def summary(self, columns=None):
"""
@@ -2301,7 +2309,7 @@ class Headerlet(fits.HDUList):
HDRNAME.
"""
unique = verify_hdrname_is_unique(dest, self.hdrname)
- logger.debug("verify_hdrname() returned %s"%unique)
+ logger.debug("verify_hdrname() returned %s" % unique)
return unique
def get_destination_model(self, dest):
@@ -2318,7 +2326,7 @@ class Headerlet(fits.HDUList):
else:
destim = dest
dname = destim[0].header['DISTNAME'] if 'distname' in destim[0].header \
- else self.build_distname(dest)
+ else self.build_distname(dest)
if destim_opened:
destim.close()
return dname
@@ -2357,7 +2365,8 @@ class Headerlet(fits.HDUList):
else:
logger.debug("verify_destim() returned False")
logger.critical("Destination name does not match headerlet. "
- "Observation %s cannot be updated with headerlet %s" % (fname, self.hdrname))
+ "Observation %s cannot be updated with"
+ "headerlet %s" % (fname, self.hdrname))
return False
def build_distname(self, dest):
@@ -2377,7 +2386,7 @@ class Headerlet(fits.HDUList):
sipname, idctab = utils.build_sipname(dest, dest, None)
npolname, npolfile = utils.build_npolname(dest, npolfile)
d2imname, d2imfile = utils.build_d2imname(dest, d2imfile)
- dname = utils.build_distname(sipname,npolname,d2imname)
+ dname = utils.build_distname(sipname, npolname, d2imname)
return dname
def tofile(self, fname, destim=None, hdrname=None, clobber=False):
@@ -2418,8 +2427,7 @@ class Headerlet(fits.HDUList):
else:
for idx in range(numext):
# Only delete WCS from extensions which may have WCS keywords
- if ('XTENSION' in dest[idx].header and
- dest[idx].header['XTENSION'] == 'IMAGE'):
+ if ('XTENSION' in dest[idx].header and dest[idx].header['XTENSION'] == 'IMAGE'):
self._remove_d2im(dest[idx])
self._remove_sip(dest[idx])
self._remove_lut(dest[idx])
@@ -2427,12 +2435,7 @@ class Headerlet(fits.HDUList):
self._remove_idc_coeffs(dest[idx])
self._remove_fit_values(dest[idx])
self._remove_ref_files(dest[0])
- """
- if not ext:
- self._remove_alt_WCS(dest, ext=range(numext))
- else:
- self._remove_alt_WCS(dest, ext=ext)
- """
+
def _del_dest_WCS_ext(self, dest):
numwdvarr = countExtn(dest, 'WCSDVARR')
numd2im = countExtn(dest, 'D2IMARR')
@@ -2459,13 +2462,13 @@ class Headerlet(fits.HDUList):
Remove the any existing astrometric fit values from a FITS extension
"""
- logger.debug("Removing astrometric fit values from (%s, %s)"%
+ logger.debug("Removing astrometric fit values from (%s, %s)" %
(ext.name, ext.ver))
dkeys = altwcs.wcskeys(ext.header)
- if 'O' in dkeys: dkeys.remove('O') # Do not remove wcskey='O' values
+ if 'O' in dkeys: dkeys.remove('O') # Do not remove wcskey='O' values
for fitkw in ['NMATCH', 'CATALOG']:
for k in dkeys:
- fkw = (fitkw+k).rstrip()
+ fkw = (fitkw + k).rstrip()
if fkw in ext.header:
del ext.header[fkw]
@@ -2544,7 +2547,7 @@ class Headerlet(fits.HDUList):
dkeys = altwcs.wcskeys(dest[('SCI', 1)].header)
for val in ['O', '', ' ']:
if val in dkeys:
- dkeys.remove(val) # Never delete WCS with wcskey='O'
+ dkeys.remove(val) # Never delete WCS with wcskey='O'
logger.debug("Removing alternate WCSs with keys %s from %s"
% (dkeys, dest.filename()))
@@ -2588,6 +2591,7 @@ class Headerlet(fits.HDUList):
except KeyError:
pass
+
@with_logging
def _idc2hdr(fromhdr, tohdr, towkey=' '):
"""
@@ -2598,7 +2602,7 @@ def _idc2hdr(fromhdr, tohdr, towkey=' '):
coeffs = ['OCX10', 'OCX11', 'OCY10', 'OCY11', 'IDCSCALE']
for c in coeffs:
try:
- tohdr[c+towkey] = fromhdr[c]
+ tohdr[c + towkey] = fromhdr[c]
logger.debug("Copied %s to header")
except KeyError:
continue
@@ -2663,8 +2667,8 @@ def get_extname_extver_list(fobj, sciext):
else:
extlist = sciext[:]
else:
- errstr = "Expected sciext to be a list of FITS extensions with science data\n"+\
- " a valid EXTNAME string, or an integer."
+ errstr = "Expected sciext to be a list of FITS extensions with science data\n" + \
+ " a valid EXTNAME string, or an integer."
logger.critical(errstr)
raise ValueError
return extlist