Source code for straklip.input_tables

import pandas as pd
from straklip.stralog import getLogger
import glob
import numpy as np
from astropy.io import fits

[docs] class Tables: def __init__(self,data_cfg, pipe_cfg, skip_originals=False): self.pipe_cfg = pipe_cfg self.data_cfg = data_cfg if not skip_originals: for table_name in ['mvs_table','unq_table']: self.load_table_into_df(table_name) self.select_tables()
[docs] def select_tables(self): self.mvs_table=self.mvs_table.loc[ ~self.mvs_table[self.mvs_table.columns[self.mvs_table.columns.str.contains('fits')]].isna().all( axis=1)] self.unq_table=self.unq_table.loc[self.unq_table.unq_ids.isin(self.mvs_table.unq_ids.unique())] self.crossmatch_ids_table=self.mvs_table[['unq_ids','mvs_ids']] self.mvs_table.drop('unq_ids',inplace=True,axis=1)
[docs] def canonize(self,label_list=['vis']): """ Enforce cannonicity of df str values lowercase""" for label in label_list: self.mvs_table[label] = self.mvs_table[label].astype(str).str.lower()
[docs] def rename_df(self,table,table_labels='mvs_table',default_labels='default_mvs_table',new_labels_dict={}): selected_labels=[] for key in [x for x in list(getattr(self.data_cfg,table_labels).keys()) if x in list(self.pipe_cfg.buildhdf[default_labels].keys())]: if table_labels == 'mvs_table' and key =='id': for elno in range(len(list(getattr(self.data_cfg, table_labels)[key].keys()))): label = list(getattr(self.data_cfg, table_labels)[key].values())[elno] newlabel = list(getattr(self.data_cfg, table_labels)[key].keys())[elno] new_labels_dict[label] = newlabel selected_labels.append(newlabel) elif isinstance(getattr(self.data_cfg, table_labels)[key],list): for elno in range(len(getattr(self.data_cfg,table_labels)[key])): label =getattr(self.data_cfg,table_labels)[key][elno] new_labels_dict[label]= self.pipe_cfg.buildhdf[default_labels][key]+'_%s'%self.data_cfg.filters[elno] selected_labels.append(self.pipe_cfg.buildhdf[default_labels][key]+'_%s'%self.data_cfg.filters[elno]) else: new_labels_dict[getattr(self.data_cfg,table_labels)[key]] = self.pipe_cfg.buildhdf[default_labels][key] selected_labels.append(self.pipe_cfg.buildhdf[default_labels][key]) getattr(self,table).rename(columns=new_labels_dict,inplace=True) setattr(self,table,getattr(self,table)[selected_labels]) getLogger(__name__).info(f'Renamed df columns to default')
[docs] def load_table_into_df(self,table_name): if table_name == 'mvs_table': setattr(self, table_name, pd.read_table(getattr(self.pipe_cfg,'paths')['database'] + '/' + getattr(self.data_cfg,table_name)['name'], sep=getattr(self.data_cfg,table_name)['sep'], skip_blank_lines=True, converters={getattr(self.data_cfg, table_name)['id']['unq_ids']: int, getattr(self.data_cfg, table_name)['id']['mvs_ids']: int, getattr(self.data_cfg, table_name)['vis']: str} ).dropna( how='all').reset_index(drop=True)) else: setattr(self, table_name, pd.read_table( getattr(self.pipe_cfg, 'paths')['database'] + '/' + getattr(self.data_cfg, table_name)['name'], sep=getattr(self.data_cfg, table_name)['sep'], skip_blank_lines=True, converters = {getattr(self.data_cfg, table_name)['id']: int, getattr(self.data_cfg, table_name)['type']: int} ).dropna( how='all').reset_index(drop=True)) getLogger(__name__).info('Loaded "%s" into df' % getattr(self.data_cfg,table_name)['name']) self.rename_df(table=table_name, table_labels=table_name, default_labels='default_%s'%table_name, new_labels_dict={}) if table_name == 'mvs_table': self.ancillary_info()
[docs] def check_fits_file_existence(self): path = self.pipe_cfg.paths['data'] fits_list = glob.glob(path + '/*.fits') if len(fits_list) == 0: getLogger(__name__).critical(f'No fits files found in {path}.') raise ValueError(f'No fits files found in {path}.') else: getLogger(__name__).info(f'{len(fits_list)} fits files found in {path}.') return(fits_list)
[docs] def ancillary_info(self): self.canonize() for label in ['rota','pav3','exptime']: for filter in self.data_cfg.filters: if not np.any(self.mvs_table.columns.str.contains('%s_%s' % (label,filter.lower()))): self.mvs_table['%s_%s' % (label,filter.lower())] = np.nan fits_list=self.check_fits_file_existence() for file in fits_list: hdul = fits.open(file) filename = hdul[0].header['ROOTNAME'] vis = filename[4:6].lower() ext = 1 if hdul[1].header['CCDCHIP'] == 2 else 4 try: filter1 = hdul[0].header['FILTER1'].lower() filter2 = hdul[0].header['FILTER2'].lower() except: filter1 = hdul[0].header['FILTER'].lower() filter2='' if np.any([filter in self.data_cfg.filters for filter in [filter1,filter2]]): if filter1[0] == 'f': filter = filter1 elif filter2[0] == 'f': filter = filter2 EXPTIME = hdul[0].header['EXPTIME'] PA_V3 = hdul[0].header['PA_V3'] ROTA = hdul[1].header['ORIENTAT'] # self.mvs_table.loc[(self.mvs_table.vis==str(vis).lower())&(self.mvs_table.ext==ext)&(self.mvs_table['fits_'+filter.lower()]==filename),['rota_'+filter.lower()]] = round(float(ROTA), 3) self.mvs_table.loc[(self.mvs_table.vis==str(vis).lower())&(self.mvs_table.ext==ext)&(self.mvs_table['fits_'+filter.lower()]==filename),['pav3_'+filter.lower()]] = round(float(PA_V3), 3) self.mvs_table.loc[(self.mvs_table.vis==str(vis).lower())&(self.mvs_table.ext==ext)&(self.mvs_table['fits_'+filter.lower()]==filename),['exptime_'+filter.lower()]] = round(float(EXPTIME), 3) # self.mvs_table.loc[(self.mvs_table.vis==str(vis).lower()),['fits_'+filter.lower()]] = filename getLogger(__name__).info(f'Added ancillary info to df')