Source code for straklip.photometry

'''
This module implements the Detection class, which defines the Detection object fundamental to perform photomerty, as well ass the 
photometry classes (aperture,PSF and matched filter) and lux2counts (and viceversa) transofrmations

'''

import sys
sys.path.append('/')
from straklip.utils.ancillary import find_closer,gaussian_func

import numpy as np
import matplotlib.pyplot as plt
from itertools import product
from photutils import CircularAperture,CircularAnnulus,RectangularAperture
from photutils.psf import BasicPSFPhotometry,DAOGroup
from photutils.aperture.circle import ApertureMask
from photutils.detection import DAOStarFinder,find_peaks
from photutils.psf import FittableImageModel
from astropy import units as u
from astropy.modeling.fitting import LevMarLSQFitter
from astropy.table import Table
from astropy.stats import sigma_clipped_stats
from scipy import optimize
from astropy.stats import sigma_clip

[docs] class Detection: "This class define the master Detection object for photometry" def __init__(self, data, x, y, edata=None, psf=None,fwhm=None,Sky=0,eSky=0,nSky=1,thrpt=1,ethrpt=0,Ei=1,grow_corr=0): ''' Initialize Detection object Parameters ---------- data : numpy array tile from photometric analysis. It is assumed to be in counts. x : float x coordinates of brighter pixel to anchor the photometry. y : float y coordinates of brighter pixel to anchor the photometry. edata : numpy array, optional error tile for PSF photometry. The default is None. psf : numpy array, optional psf for MF/PSF photometry. The default is None. fwhm : float, optional fwhm for MF/PSF photometry. If not known, try to evaluate from image using radial_profile in flux_converter. The default is None. Sky : float, optional value of the sky in the tile. The default is 0. eSky : float, optional error on the sky. The default is 0. nSky : int, optional number of pixel used to evaluate the sky. The default is 0. Returns ------- None. ''' self.x=x self.y=y self.data=data.copy() self.edata=edata self.psf=psf self.fwhm=fwhm self.Sky=Sky self.nSky=nSky self.eSky=eSky self.thrpt=thrpt self.ethrpt=ethrpt self.Ei=Ei self.grow_corr=grow_corr del data
[docs] def find_sources(self,fwhm=2.5,sigma=5.,std=0): daofind = DAOStarFinder(fwhm=fwhm, threshold=sigma*std) self.sources = daofind(self.data).to_pandas()
[docs] def find_peaks(self,threshold,box_size): self.peaks = find_peaks(self.data, threshold, box_size=box_size).to_pandas()
[docs] class flux_converter: "This class handle transnformation that can be applyed to the output of each photometry"
[docs] def counts_and_errors(self): ''' evaluate counts and uncertainties following DAOPHOT formula for aperture/MF photometry or the equivalent noise area for PSF photometry ''' if hasattr(self,'eq_noise_area'): self.counts=(self.sum)/(self.Ei*self.thrpt)#-self.Sky self.ecounts=np.sqrt(self.eq_noise_area*abs(self.Sky)+self.counts) else: self.counts=(self.sum-self.Sky*self.Nap)/(self.Ei*self.thrpt)#-self.Sky var1=self.Nap*self.eSky**2 var2=self.counts var3=(self.Nap**2*self.eSky**2)/self.nSky var4=self.ethrpt**2 ### Need to think how the spread in the troughput affect the corrected errors on the counts! self.ecounts=np.sqrt(var1+var2+var3+var4) self.Nsigma=np.round(self.counts/self.ecounts,3)
[docs] def flux2mag(self,zpt=0.0,ezpt=0.0,exptime=1.0):#,gain=1.0): ''' Convert flux to magnitudes and uncertanties Parameters ---------- zpt : float, optional zeropoint for the observation. The default is 0. zpt : float, optional error on the zeropoint. The default is 1. ezpt : float, optional exposure time for this detection. The default is 1. exptime : float, optional exposure time for this detection. The default is 1. Returns ------- None. ''' self.zpt=zpt self.exptime=exptime # self.gain=gain # if self.counts/(self.exptime*self.gain)>0: # self.mag=-2.5*np.log10((self.counts/(self.exptime*self.gain)))+self.zpt if self.counts / (self.exptime) > 0: self.mag=-2.5*np.log10((self.counts/(self.exptime)))+self.zpt self.emag=np.sqrt((1.0857*(self.ecounts/self.counts))**2+ezpt**2) else: self.mag = np.nan self.emag = np.nan
[docs] def mag2flux(self): pass
[docs] def radial_profile(self, max_rad=5,initial_guess = [1,0,1,0],showplot=False): ''' Evaluate the radial profile of the fistribution of counts in the tile Parameters ---------- max_rad : int, optional maximum distance from the target in pixels where to evaluate the profile. The default is 5. initial_guess : list, optional initial guess for the gaussian fit. The default is [1,0,1,0]. showplot : bool, optional choose to show pltos. The default is False. Returns ------- None. ''' x_radial_dist=[] y_radial_dist=[] for y in range(int(self.y*2)): for x in range(int(self.x*2)): dist=np.sqrt((x-self.x)**2+(y-self.y)**2) if dist<=max_rad: x_radial_dist.append(dist) y_radial_dist.append(self.data[y][x]-self.Sky) x_radial_dist=np.array(x_radial_dist) y_radial_dist=np.array(y_radial_dist) popt, pcov = optimize.curve_fit(gaussian_func, x_radial_dist, y_radial_dist,p0=initial_guess,maxfev=10000000) x_radial_profile = np.linspace(0,9,1000) y_radial_profile = gaussian_func(x_radial_profile,*popt) pedestal= np.mean(y_radial_dist[-100:]) self.fwhm=2*np.sqrt(2*np.log(2))*popt[2] w,v=find_closer(x_radial_profile,self.fwhm/2) if showplot==True: print('Pedestal %.5f'%pedestal) print('FWHM: ',self.fwhm) fig,ax=plt.subplots() ax.set_title('Radial profile') ax.plot(x_radial_dist,y_radial_dist,'o',ms=2) ax.plot(x_radial_profile,y_radial_profile) ax.axhline(y_radial_profile[v],linestyle='--') ax.axvline(self.fwhm/2,linestyle='--') ax.axhline(pedestal,linestyle='--') ax.set_ylim(0,max(y_radial_profile)) plt.show()
[docs] class photometry_AP: "This class handle the aperture photometry"
[docs] def aperture_mask(self,aptype='circular',method='exact',radius1=1,radius2=None,ap_x=2,ap_y=2): ''' Create an aperture based on aptype selection Parameters ---------- aptype : (circular,square,4pixels), optional defin the aperture type to use during aperture photometry. The default is 'cirular'. method : Int or None, optional The method used to determine the overlap of the aperture on the pixel grid. With center they are either 0 or 1, while exact produces partial-pixel masks (i.e., values between 0 and 1). With subpixel, a pixel is divided into subpixels, each of which are considered to be entirely in or out of the aperture depending on whether its center is in or out of the aperture. If subpixels=1, this method is equivalent to 'center'. The aperture weights will contain values between 0 and 1. The default is 'exact'. radius1 : Int, optional radius of circula aperture in pixels or inner radius for annulus aperture. The default is 1. radius2 : Int or None, optional outer radius for annulus aperture. if None use circular aperture. The default is None. ap_x : Int, optional widht of the square aperture. The default is 2. ap_y: Int or None, optional hight of the square aperture. The default is 2. Returns ------- None. ''' self.aptype=aptype self.ap_x=ap_x self.ap_y=ap_y if aptype=='circular': self.radius1=radius1 self.radius2=radius2 if radius2: aperture = CircularAnnulus([self.x,self.y], r_in=self.radius1, r_out=self.radius2) else: aperture = CircularAperture([self.x,self.y], r=self.radius1) self.Nap=aperture.area#len(aperture.data[aperture.data > 0].ravel()) self.aperture = aperture.to_mask(method=method) elif aptype=='square': self.ap_x=ap_x self.ap_y=ap_y aperture = RectangularAperture([self.x,self.y], self.ap_x,self.ap_y) self.Nap=aperture.area self.aperture = aperture.to_mask(method=method) elif aptype == '4pixels': Mask_sum_list=[] Mask_pos_list=[] xint=int(round(self.x)) yint=int(round(self.y)) if yint-1 >= 0 and xint+1<=self.data.shape[1]: M1_pos=[[yint-1,xint+1],[yint,xint+1],[yint-1,xint],[yint,xint]] try: M1_sum=np.nansum([self.data[M1_pos[0][0],M1_pos[0][1]],self.data[M1_pos[1][0],M1_pos[1][1]],self.data[M1_pos[2][0],M1_pos[2][1]],self.data[M1_pos[3][0],M1_pos[3][1]]]) Mask_sum_list.append(M1_sum) Mask_pos_list.append(M1_pos) except: pass if yint+1 <= self.data.shape[0] and xint+1<=self.data.shape[1]: M2_pos=[[yint,xint+1],[yint+1,xint+1],[yint,xint],[yint+1,xint]] try: M2_sum=np.nansum([self.data[M2_pos[0][0],M2_pos[0][1]],self.data[M2_pos[1][0],M2_pos[1][1]],self.data[M2_pos[2][0],M2_pos[2][1]],self.data[M2_pos[3][0],M2_pos[3][1]]]) Mask_sum_list.append(M2_sum) Mask_pos_list.append(M2_pos) except: pass if xint-1 >=0 and yint-1 >= 0 : M3_pos=[[yint-1,xint],[yint,xint],[yint-1,xint-1],[yint,xint-1]] try: M3_sum=np.nansum([self.data[M3_pos[0][0],M3_pos[0][1]],self.data[M3_pos[1][0],M3_pos[1][1]],self.data[M3_pos[2][0],M3_pos[2][1]],self.data[M3_pos[3][0],M3_pos[3][1]]]) Mask_sum_list.append(M3_sum) Mask_pos_list.append(M3_pos) except: pass if xint-1 >= 0 and yint+1 <= self.data.shape[0] : M4_pos=[[yint,xint],[yint+1,xint],[yint,xint-1],[yint+1,xint-1]] try: M4_sum=np.nansum([self.data[M4_pos[0][0],M4_pos[0][1]],self.data[M4_pos[1][0],M4_pos[1][1]],self.data[M4_pos[2][0],M4_pos[2][1]],self.data[M4_pos[3][0],M4_pos[3][1]]]) Mask_sum_list.append(M4_sum) Mask_pos_list.append(M4_pos) except: pass Mask_sel_pos=Mask_sum_list.index(np.nanmax(Mask_sum_list)) self.Nap=len(Mask_pos_list[Mask_sel_pos]) self.aperture=Mask_pos_list[Mask_sel_pos] else: raise ValueError('!!!!! ERROR !!!!! aptype MUST be either circular, square or 4pixels')
[docs] def mask_aperture_data(self,mask_shape=(2,2)): """ Mask the data inside the aperture. Results depending on the selected type of aperture through 'aperture' Parameters ---------- mask_shape : tuple, optional shape dimension for the new data mask . The default is (2,2). Returns ------- None """ if isinstance(self.aperture,(list,np.ndarray)): xs = range(self.data.shape[0]) ys = range(self.data.shape[1]) indices = np.array(list(product(xs, ys))).tolist() A=np.array(indices) B=np.array(self.aperture) dims = np.maximum(B.max(0),A.max(0))+1 sub = A[~np.in1d(np.ravel_multi_index(A.T,dims),np.ravel_multi_index(B.T,dims))] mask=np.ones(self.data.shape).astype(bool) mask[tuple(sub.T)] = False data_mask_in=self.data[mask].reshape(mask_shape) data_mask_out=self.data.astype(float).copy() data_mask_out[mask]=np.nan self.data_mask_out=data_mask_out del data_mask_out elif isinstance(self.aperture,ApertureMask): data_mask_in = self.aperture.multiply(self.data) mask=self.aperture.to_image((self.data.shape[0],self.data.shape[1])).astype(bool) # print(mask) data_mask_out=self.data.astype(float).copy() data_mask_out[mask]=np.nan self.data_mask_out=data_mask_out del data_mask_out self.data_mask_in=data_mask_in
[docs] def aperture_stats(self,aperture=None,sigmaclip=False,sigma=2.5,fill=0,r=3,sat_thr=np.inf): ''' Provide usefull stats about the aperture Parameters ---------- aperture : TYPE, optional type of aperture performed. Can be either a list of positions in the tile or an aperture object. The default is None. sigmaclip : bool, optional choose perform sigma clip on data. The default is False. sigma : float, optional number of sigma clip. The default is 3. fill : float, optional fill the data outside the mask with this value. The default is 0. r : int, optional number of decimals to round to. The default is 3. sat_thr : float, optional threshold (in counts) above which the source is considered saturatated. The default is inf. Returns ------- None. ''' if isinstance(aperture,(list,np.ndarray)): data=np.array([self.data[y,x] for y,x in aperture]) if fill!=0: data[data<=0]=fill if sigmaclip: data = sigma_clip(data[~np.isnan(data)], sigma=sigma) else: data=self.data_mask_in if fill!=0: data[aperture.data<=0]=fill if sigmaclip: data = sigma_clip(data[~np.isnan(data)], sigma=sigma) # data[data<=0]=np.nan self.nsat=(data>=np.float64(sat_thr)).astype(int).sum() self.sum=np.round(np.nansum(data[data>0]),r)#[data>0]) self.mean, self.median, self.std = np.round(sigma_clipped_stats(data[data>0],sigma=sigma),r)
[docs] def grow_curves(self,fig=None,ax=None,gstep=0.25,sigma=4.5,p=2,showplot=False,r_in=1,r_min=3,r_max=15): ''' Evaluate grow curves for aperture photometry Parameters ---------- fig : matplotlib.pyplot plot figure. ax : matplotlib.pyplot plot axis. gstep : float, optional step to create different growcurves. The default is 0.25. sigma : float, optional value of the sigma cut. The default is 3. p : float, optional create different growcurves in range -p, +p. The default is 2. showplot : bool, optional choose to show plots. The default is False. r_in : int, optional starting distance from the center of the tile to consider when evaluate the for growcurves. The default is 1. r_min : int, optional minimum distance from the center of the tile to consider when evaluate the flatness of the for growcurves. The default is 3. r_max : int, optional maximum distance from the center of the tile to consider when evaluate the flatness of the for growcurves. The default is 15. Returns ------- None. ''' counts_list=[] aperture_list=[] sky_list=[] r_list=np.arange(r_in,r_max) for ri in r_list: photometry_AP.aperture_mask(self,radius1=ri) photometry_AP.mask_aperture_data(self) photometry_AP.aperture_stats(self,aperture=self.aperture,sigmaclip=True,sigma=sigma)#,Sky=self.Sky,nSky=self.Nap,eSky=self.eSky) counts_list.append(self.sum-self.Sky*self.Nap) aperture_list.append(self.Nap) sky_list.append(self.Sky) counts_list=np.array(counts_list) sky_list=np.array(sky_list) r_list=np.array(r_list) aperture_list=np.array(aperture_list) corr_list=np.array([i for i in np.arange(p,-p-gstep,-gstep)]) values_list=[] if showplot or fig!=None: if fig==None: fig,ax=plt.subplots(1,1,figsize=(7,7)) list_of_counts=[] for i in corr_list: counts=[] sel=[] for r in range(len(r_list)): counts_temp=(counts_list[r]+(sky_list[r]*aperture_list[r]*(i/100))) counts.append(counts_temp)#/ee_list[r]) if r_list[r]>=r_min and r_list[r]<=r_max: sel.append(True) else:sel.append(False) counts=np.array(counts) list_of_counts.append(counts) mean,med,std=sigma_clipped_stats(counts[sel],sigma=sigma) values_list.append(std) list_of_counts=np.array(list_of_counts) self.grow_corr=np.round(corr_list[np.argmin(values_list)],3) if showplot or fig!=None: ax.plot(r_list,counts_list,'-',color='b',lw=4,label='old')#,color=colors[elno],lw=2.,label='%.2f%%'%(corr_list[elno])) ax.plot(r_list,list_of_counts[np.argmin(values_list)],'-',color='k',lw=4,label='%.2f%%'%(self.grow_corr)) ax.legend(loc='best') if showplot: plt.show()
[docs] class photometry_MF: "This class handle the match filter photometry" # def matched_filter(self,kl_basis=[],sub=1): # """ # The matched filter routine # Parameters # ---------- # kl_basis: the KLIP basis # sub: ?? # Output # ------ # None # """ # if sub>1: # psf=np.kron(self.psf, np.ones((sub,sub)))/(sub*sub) # target=np.kron(self.data, np.ones((sub,sub)))/(sub*sub) # if len(kl_basis)!=0: kl_basis=np.kron(kl_basis, np.ones((sub,sub))) # else: # target=self.data # psf=self.psf # mf = MF.create_matched_filter(psf) # if len(kl_basis)==0: # # if working on model: # thpt = MF.calc_matched_filter_throughput(mf) # else: # # if working on residuals: # locations = np.stack(np.unravel_index(np.arange(target.size), target.shape)).T # thpt = MF.calc_matched_filter_throughput_klip(mf, locations, # kl_basis.reshape([kl_basis.shape[0]] + list(target.shape)), # verbose=False).reshape(target.shape[1],target.shape[0]) # mf_target = MF.apply_matched_filter_fft(target, mf) # self.mf=mf # self.mf_target=mf_target # self.thpt=thpt
[docs] def mf_stats(self,aperture=False,aptype='circular',method='exact',radius1=1,radius2=None,ap_x=1,ap_y=1): """ Provide usefull stats about the MF Parameters ---------- aptype : (circular,square,4pixels), optional defin the aperture type to use during aperture photometry. The default is 'cirular'. method : Int or None, optional The method used to determine the overlap of the aperture on the pixel grid. With center they are either 0 or 1, while exact produces partial-pixel masks (i.e., values between 0 and 1). With subpixel, a pixel is divided into subpixels, each of which are considered to be entirely in or out of the aperture depending on whether its center is in or out of the aperture. If subpixels=1, this method is equivalent to 'center'. The aperture weights will contain values between 0 and 1. The default is 'exact'. radius1 : Int, optional radius of circula aperture in pixels or inner radius for annulus aperture. The default is 1. radius2 : Int or None, optional outer radius for annulus aperture. if None use circular aperture. The default is None. ap_x : Int, optional widht of the square aperture. The default is 1. ap_y: Int or None, optional hight of the square aperture. The default is None. Output ------ None """ if aperture: photometry_AP.aperture_mask(self,aptype=aptype,method=method,radius1=radius1,radius2=radius2,ap_x=ap_x,ap_y=ap_y) photometry_AP.mask_aperture_data(self) # self.data=self.aperture.multiply(self.data) w=np.where(self.aperture.multiply(self.mf_target)==np.nanmax(self.aperture.multiply(self.mf_target))) self.mf_t=self.aperture.multiply(self.mf_target)[w][0] else: # self.mf_target=self.data.copy() w=np.where(self.mf_target==np.nanmax(self.mf_target)) self.mf_t=self.mf_target[w][0] self.Nap=1#len(self.aperture.data[self.aperture.data > 0].ravel()) self.sum=(self.mf_t/self.thpt) self.mf_yx=w
[docs] class photometry_PSF: "This class handle the psf photometry"
[docs] def equivalent_noise_area(self,weavelenght, pixelscale,telescope_aperture=2.3774): """ evaluate equivalent noise area for psf photometry noise estimation Parameters ---------- weavelenght : float filter wavelenght. pixelscale : float instrument pixelscale telescope_aperture : float, optional telescope aperture. The default is 2.3774. Output ------ None """ a=0.5*(weavelenght/(telescope_aperture*u.m.to(u.nm)))*u.rad.to(u.arcsec)/pixelscale #'a in in unit of pixel^2 eq_noise_diameter=2*a eq_noise_area=8*np.pi*a**2 self.Nap=int(round(eq_noise_area-(np.pi*eq_noise_diameter)/(np.sqrt(2)),2)) self.eq_noise_area=eq_noise_area
[docs] def psf_stats(self,fitshape,aperture_radius=5,flux=None,method='subpixel'): ''' Perform PSF photometry and provide usefull stats about the PSF photometry Parameters ---------- fitshape : int,list Rectangular shape around the center of a star which will be used to collect the data to do the fitting. Can be an integer to be the same along both axes. For example, 5 is the same as (5, 5), which means to fit only at the following relative pixel positions: [-2, -1, 0, 1, 2]. Each element of fitshape must be an odd number.. aperture_radius : int, optional The radius (in units of pixels) used to compute initial estimates for the fluxes of sources. aperture_radius must be set if initial flux guesses are not input to the photometry. The default is None. flux : float, optional Estimated flux of the target for a better fit. The default is None. method : Int or None, optional The method used to determine the overlap of the aperture on the pixel grid. With center they are either 0 or 1, while exact produces partial-pixel masks (i.e., values between 0 and 1). With subpixel, a pixel is divided into subpixels, each of which are considered to be entirely in or out of the aperture depending on whether its center is in or out of the aperture. If subpixels=1, this method is equivalent to 'center'. The aperture weights will contain values between 0 and 1. The default is 'subpixel'. Returns ------- None. ''' fitter=LevMarLSQFitter() daogroup = DAOGroup(2*self.fwhm)#*gaussian_sigma_to_fwhm) psf_model = FittableImageModel(self.psf,normalize=True) if (fitshape%2) ==0: fitshape+=1 if flux==None: pos = Table(names=['id','x_0', 'y_0'], data=[[1],[self.x],[self.y]]) photometry = BasicPSFPhotometry(group_maker=daogroup, bkg_estimator=None, psf_model=psf_model, fitter=fitter, fitshape=fitshape, aperture_radius=aperture_radius) else: pos = Table(names=['id','x_0', 'y_0','flux_0'], data=[[1],[self.x],[self.y],[flux]]) photometry = BasicPSFPhotometry(group_maker=daogroup, bkg_estimator=None, psf_model=psf_model, fitter=fitter, fitshape=fitshape) psf_photometry = photometry(image=self.data-self.Sky,init_guesses=pos) phot_table=psf_photometry self.sum=(phot_table['flux_fit'][0]) # ri_shape=int((fitshape-1)/2) self.residual = photometry.get_residual_image() photometry_AP.aperture_mask(self,aptype='square',ap_x=fitshape,ap_y=fitshape,method=method) if hasattr(self, 'edata'): residual_cut=self.aperture.multiply(self.residual) edata_cut=self.aperture.multiply(self.edata) self.chi_sq= (1/(len(edata_cut.ravel())-3))*np.nansum(residual_cut**2/edata_cut**2)