'''
This module implements the Detection class, which defines the Detection object fundamental to perform photomerty, as well ass the
photometry classes (aperture,PSF and matched filter) and lux2counts (and viceversa) transofrmations
'''
import sys
sys.path.append('/')
from straklip.utils.ancillary import find_closer,gaussian_func
import numpy as np
import matplotlib.pyplot as plt
from itertools import product
from photutils import CircularAperture,CircularAnnulus,RectangularAperture
from photutils.psf import BasicPSFPhotometry,DAOGroup
from photutils.aperture.circle import ApertureMask
from photutils.detection import DAOStarFinder,find_peaks
from photutils.psf import FittableImageModel
from astropy import units as u
from astropy.modeling.fitting import LevMarLSQFitter
from astropy.table import Table
from astropy.stats import sigma_clipped_stats
from scipy import optimize
from astropy.stats import sigma_clip
[docs]
class Detection:
"This class define the master Detection object for photometry"
def __init__(self, data, x, y, edata=None, psf=None,fwhm=None,Sky=0,eSky=0,nSky=1,thrpt=1,ethrpt=0,Ei=1,grow_corr=0):
'''
Initialize Detection object
Parameters
----------
data : numpy array
tile from photometric analysis. It is assumed to be in counts.
x : float
x coordinates of brighter pixel to anchor the photometry.
y : float
y coordinates of brighter pixel to anchor the photometry.
edata : numpy array, optional
error tile for PSF photometry. The default is None.
psf : numpy array, optional
psf for MF/PSF photometry. The default is None.
fwhm : float, optional
fwhm for MF/PSF photometry. If not known, try to evaluate from
image using radial_profile in flux_converter. The default is None.
Sky : float, optional
value of the sky in the tile. The default is 0.
eSky : float, optional
error on the sky. The default is 0.
nSky : int, optional
number of pixel used to evaluate the sky. The default is 0.
Returns
-------
None.
'''
self.x=x
self.y=y
self.data=data.copy()
self.edata=edata
self.psf=psf
self.fwhm=fwhm
self.Sky=Sky
self.nSky=nSky
self.eSky=eSky
self.thrpt=thrpt
self.ethrpt=ethrpt
self.Ei=Ei
self.grow_corr=grow_corr
del data
[docs]
def find_sources(self,fwhm=2.5,sigma=5.,std=0):
daofind = DAOStarFinder(fwhm=fwhm, threshold=sigma*std)
self.sources = daofind(self.data).to_pandas()
[docs]
def find_peaks(self,threshold,box_size):
self.peaks = find_peaks(self.data, threshold, box_size=box_size).to_pandas()
[docs]
class flux_converter:
"This class handle transnformation that can be applyed to the output of each photometry"
[docs]
def counts_and_errors(self):
'''
evaluate counts and uncertainties following DAOPHOT formula for aperture/MF photometry or the equivalent noise area for PSF photometry
'''
if hasattr(self,'eq_noise_area'):
self.counts=(self.sum)/(self.Ei*self.thrpt)#-self.Sky
self.ecounts=np.sqrt(self.eq_noise_area*abs(self.Sky)+self.counts)
else:
self.counts=(self.sum-self.Sky*self.Nap)/(self.Ei*self.thrpt)#-self.Sky
var1=self.Nap*self.eSky**2
var2=self.counts
var3=(self.Nap**2*self.eSky**2)/self.nSky
var4=self.ethrpt**2 ### Need to think how the spread in the troughput affect the corrected errors on the counts!
self.ecounts=np.sqrt(var1+var2+var3+var4)
self.Nsigma=np.round(self.counts/self.ecounts,3)
[docs]
def flux2mag(self,zpt=0.0,ezpt=0.0,exptime=1.0):#,gain=1.0):
'''
Convert flux to magnitudes and uncertanties
Parameters
----------
zpt : float, optional
zeropoint for the observation. The default is 0.
zpt : float, optional
error on the zeropoint. The default is 1.
ezpt : float, optional
exposure time for this detection. The default is 1.
exptime : float, optional
exposure time for this detection. The default is 1.
Returns
-------
None.
'''
self.zpt=zpt
self.exptime=exptime
# self.gain=gain
# if self.counts/(self.exptime*self.gain)>0:
# self.mag=-2.5*np.log10((self.counts/(self.exptime*self.gain)))+self.zpt
if self.counts / (self.exptime) > 0:
self.mag=-2.5*np.log10((self.counts/(self.exptime)))+self.zpt
self.emag=np.sqrt((1.0857*(self.ecounts/self.counts))**2+ezpt**2)
else:
self.mag = np.nan
self.emag = np.nan
[docs]
def mag2flux(self): pass
[docs]
def radial_profile(self, max_rad=5,initial_guess = [1,0,1,0],showplot=False):
'''
Evaluate the radial profile of the fistribution of counts in the tile
Parameters
----------
max_rad : int, optional
maximum distance from the target in pixels where to evaluate the profile. The default is 5.
initial_guess : list, optional
initial guess for the gaussian fit. The default is [1,0,1,0].
showplot : bool, optional
choose to show pltos. The default is False.
Returns
-------
None.
'''
x_radial_dist=[]
y_radial_dist=[]
for y in range(int(self.y*2)):
for x in range(int(self.x*2)):
dist=np.sqrt((x-self.x)**2+(y-self.y)**2)
if dist<=max_rad:
x_radial_dist.append(dist)
y_radial_dist.append(self.data[y][x]-self.Sky)
x_radial_dist=np.array(x_radial_dist)
y_radial_dist=np.array(y_radial_dist)
popt, pcov = optimize.curve_fit(gaussian_func, x_radial_dist, y_radial_dist,p0=initial_guess,maxfev=10000000)
x_radial_profile = np.linspace(0,9,1000)
y_radial_profile = gaussian_func(x_radial_profile,*popt)
pedestal= np.mean(y_radial_dist[-100:])
self.fwhm=2*np.sqrt(2*np.log(2))*popt[2]
w,v=find_closer(x_radial_profile,self.fwhm/2)
if showplot==True:
print('Pedestal %.5f'%pedestal)
print('FWHM: ',self.fwhm)
fig,ax=plt.subplots()
ax.set_title('Radial profile')
ax.plot(x_radial_dist,y_radial_dist,'o',ms=2)
ax.plot(x_radial_profile,y_radial_profile)
ax.axhline(y_radial_profile[v],linestyle='--')
ax.axvline(self.fwhm/2,linestyle='--')
ax.axhline(pedestal,linestyle='--')
ax.set_ylim(0,max(y_radial_profile))
plt.show()
[docs]
class photometry_AP:
"This class handle the aperture photometry"
[docs]
def aperture_mask(self,aptype='circular',method='exact',radius1=1,radius2=None,ap_x=2,ap_y=2):
'''
Create an aperture based on aptype selection
Parameters
----------
aptype : (circular,square,4pixels), optional
defin the aperture type to use during aperture photometry.
The default is 'cirular'.
method : Int or None, optional
The method used to determine the overlap of the aperture on the pixel grid.
With center they are either 0 or 1, while exact produces
partial-pixel masks (i.e., values between 0 and 1). With subpixel, a pixel is divided into subpixels,
each of which are considered to be entirely in or out of the aperture depending on whether its center is in
or out of the aperture. If subpixels=1, this method is equivalent to 'center'.
The aperture weights will contain values between 0 and 1.
The default is 'exact'.
radius1 : Int, optional
radius of circula aperture in pixels or inner radius for annulus aperture. The default is 1.
radius2 : Int or None, optional
outer radius for annulus aperture. if None use circular aperture. The default is None.
ap_x : Int, optional
widht of the square aperture. The default is 2.
ap_y: Int or None, optional
hight of the square aperture. The default is 2.
Returns
-------
None.
'''
self.aptype=aptype
self.ap_x=ap_x
self.ap_y=ap_y
if aptype=='circular':
self.radius1=radius1
self.radius2=radius2
if radius2:
aperture = CircularAnnulus([self.x,self.y], r_in=self.radius1, r_out=self.radius2)
else:
aperture = CircularAperture([self.x,self.y], r=self.radius1)
self.Nap=aperture.area#len(aperture.data[aperture.data > 0].ravel())
self.aperture = aperture.to_mask(method=method)
elif aptype=='square':
self.ap_x=ap_x
self.ap_y=ap_y
aperture = RectangularAperture([self.x,self.y], self.ap_x,self.ap_y)
self.Nap=aperture.area
self.aperture = aperture.to_mask(method=method)
elif aptype == '4pixels':
Mask_sum_list=[]
Mask_pos_list=[]
xint=int(round(self.x))
yint=int(round(self.y))
if yint-1 >= 0 and xint+1<=self.data.shape[1]:
M1_pos=[[yint-1,xint+1],[yint,xint+1],[yint-1,xint],[yint,xint]]
try:
M1_sum=np.nansum([self.data[M1_pos[0][0],M1_pos[0][1]],self.data[M1_pos[1][0],M1_pos[1][1]],self.data[M1_pos[2][0],M1_pos[2][1]],self.data[M1_pos[3][0],M1_pos[3][1]]])
Mask_sum_list.append(M1_sum)
Mask_pos_list.append(M1_pos)
except:
pass
if yint+1 <= self.data.shape[0] and xint+1<=self.data.shape[1]:
M2_pos=[[yint,xint+1],[yint+1,xint+1],[yint,xint],[yint+1,xint]]
try:
M2_sum=np.nansum([self.data[M2_pos[0][0],M2_pos[0][1]],self.data[M2_pos[1][0],M2_pos[1][1]],self.data[M2_pos[2][0],M2_pos[2][1]],self.data[M2_pos[3][0],M2_pos[3][1]]])
Mask_sum_list.append(M2_sum)
Mask_pos_list.append(M2_pos)
except:
pass
if xint-1 >=0 and yint-1 >= 0 :
M3_pos=[[yint-1,xint],[yint,xint],[yint-1,xint-1],[yint,xint-1]]
try:
M3_sum=np.nansum([self.data[M3_pos[0][0],M3_pos[0][1]],self.data[M3_pos[1][0],M3_pos[1][1]],self.data[M3_pos[2][0],M3_pos[2][1]],self.data[M3_pos[3][0],M3_pos[3][1]]])
Mask_sum_list.append(M3_sum)
Mask_pos_list.append(M3_pos)
except:
pass
if xint-1 >= 0 and yint+1 <= self.data.shape[0] :
M4_pos=[[yint,xint],[yint+1,xint],[yint,xint-1],[yint+1,xint-1]]
try:
M4_sum=np.nansum([self.data[M4_pos[0][0],M4_pos[0][1]],self.data[M4_pos[1][0],M4_pos[1][1]],self.data[M4_pos[2][0],M4_pos[2][1]],self.data[M4_pos[3][0],M4_pos[3][1]]])
Mask_sum_list.append(M4_sum)
Mask_pos_list.append(M4_pos)
except:
pass
Mask_sel_pos=Mask_sum_list.index(np.nanmax(Mask_sum_list))
self.Nap=len(Mask_pos_list[Mask_sel_pos])
self.aperture=Mask_pos_list[Mask_sel_pos]
else:
raise ValueError('!!!!! ERROR !!!!! aptype MUST be either circular, square or 4pixels')
[docs]
def mask_aperture_data(self,mask_shape=(2,2)):
"""
Mask the data inside the aperture. Results depending on the selected type of aperture through 'aperture'
Parameters
----------
mask_shape : tuple, optional
shape dimension for the new data mask . The default is (2,2).
Returns
-------
None
"""
if isinstance(self.aperture,(list,np.ndarray)):
xs = range(self.data.shape[0])
ys = range(self.data.shape[1])
indices = np.array(list(product(xs, ys))).tolist()
A=np.array(indices)
B=np.array(self.aperture)
dims = np.maximum(B.max(0),A.max(0))+1
sub = A[~np.in1d(np.ravel_multi_index(A.T,dims),np.ravel_multi_index(B.T,dims))]
mask=np.ones(self.data.shape).astype(bool)
mask[tuple(sub.T)] = False
data_mask_in=self.data[mask].reshape(mask_shape)
data_mask_out=self.data.astype(float).copy()
data_mask_out[mask]=np.nan
self.data_mask_out=data_mask_out
del data_mask_out
elif isinstance(self.aperture,ApertureMask):
data_mask_in = self.aperture.multiply(self.data)
mask=self.aperture.to_image((self.data.shape[0],self.data.shape[1])).astype(bool)
# print(mask)
data_mask_out=self.data.astype(float).copy()
data_mask_out[mask]=np.nan
self.data_mask_out=data_mask_out
del data_mask_out
self.data_mask_in=data_mask_in
[docs]
def aperture_stats(self,aperture=None,sigmaclip=False,sigma=2.5,fill=0,r=3,sat_thr=np.inf):
'''
Provide usefull stats about the aperture
Parameters
----------
aperture : TYPE, optional
type of aperture performed. Can be either a list of positions in the tile or an aperture object. The default is None.
sigmaclip : bool, optional
choose perform sigma clip on data. The default is False.
sigma : float, optional
number of sigma clip. The default is 3.
fill : float, optional
fill the data outside the mask with this value. The default is 0.
r : int, optional
number of decimals to round to. The default is 3.
sat_thr : float, optional
threshold (in counts) above which the source is considered saturatated. The default is inf.
Returns
-------
None.
'''
if isinstance(aperture,(list,np.ndarray)):
data=np.array([self.data[y,x] for y,x in aperture])
if fill!=0: data[data<=0]=fill
if sigmaclip: data = sigma_clip(data[~np.isnan(data)], sigma=sigma)
else:
data=self.data_mask_in
if fill!=0: data[aperture.data<=0]=fill
if sigmaclip: data = sigma_clip(data[~np.isnan(data)], sigma=sigma)
# data[data<=0]=np.nan
self.nsat=(data>=np.float64(sat_thr)).astype(int).sum()
self.sum=np.round(np.nansum(data[data>0]),r)#[data>0])
self.mean, self.median, self.std = np.round(sigma_clipped_stats(data[data>0],sigma=sigma),r)
[docs]
def grow_curves(self,fig=None,ax=None,gstep=0.25,sigma=4.5,p=2,showplot=False,r_in=1,r_min=3,r_max=15):
'''
Evaluate grow curves for aperture photometry
Parameters
----------
fig : matplotlib.pyplot
plot figure.
ax : matplotlib.pyplot
plot axis.
gstep : float, optional
step to create different growcurves. The default is 0.25.
sigma : float, optional
value of the sigma cut. The default is 3.
p : float, optional
create different growcurves in range -p, +p. The default is 2.
showplot : bool, optional
choose to show plots. The default is False.
r_in : int, optional
starting distance from the center of the tile to consider when
evaluate the for growcurves. The default is 1.
r_min : int, optional
minimum distance from the center of the tile to consider when
evaluate the flatness of the for growcurves. The default is 3.
r_max : int, optional
maximum distance from the center of the tile to consider when
evaluate the flatness of the for growcurves. The default is 15.
Returns
-------
None.
'''
counts_list=[]
aperture_list=[]
sky_list=[]
r_list=np.arange(r_in,r_max)
for ri in r_list:
photometry_AP.aperture_mask(self,radius1=ri)
photometry_AP.mask_aperture_data(self)
photometry_AP.aperture_stats(self,aperture=self.aperture,sigmaclip=True,sigma=sigma)#,Sky=self.Sky,nSky=self.Nap,eSky=self.eSky)
counts_list.append(self.sum-self.Sky*self.Nap)
aperture_list.append(self.Nap)
sky_list.append(self.Sky)
counts_list=np.array(counts_list)
sky_list=np.array(sky_list)
r_list=np.array(r_list)
aperture_list=np.array(aperture_list)
corr_list=np.array([i for i in np.arange(p,-p-gstep,-gstep)])
values_list=[]
if showplot or fig!=None:
if fig==None: fig,ax=plt.subplots(1,1,figsize=(7,7))
list_of_counts=[]
for i in corr_list:
counts=[]
sel=[]
for r in range(len(r_list)):
counts_temp=(counts_list[r]+(sky_list[r]*aperture_list[r]*(i/100)))
counts.append(counts_temp)#/ee_list[r])
if r_list[r]>=r_min and r_list[r]<=r_max:
sel.append(True)
else:sel.append(False)
counts=np.array(counts)
list_of_counts.append(counts)
mean,med,std=sigma_clipped_stats(counts[sel],sigma=sigma)
values_list.append(std)
list_of_counts=np.array(list_of_counts)
self.grow_corr=np.round(corr_list[np.argmin(values_list)],3)
if showplot or fig!=None:
ax.plot(r_list,counts_list,'-',color='b',lw=4,label='old')#,color=colors[elno],lw=2.,label='%.2f%%'%(corr_list[elno]))
ax.plot(r_list,list_of_counts[np.argmin(values_list)],'-',color='k',lw=4,label='%.2f%%'%(self.grow_corr))
ax.legend(loc='best')
if showplot:
plt.show()
[docs]
class photometry_MF:
"This class handle the match filter photometry"
# def matched_filter(self,kl_basis=[],sub=1):
# """
# The matched filter routine
# Parameters
# ----------
# kl_basis: the KLIP basis
# sub: ??
# Output
# ------
# None
# """
# if sub>1:
# psf=np.kron(self.psf, np.ones((sub,sub)))/(sub*sub)
# target=np.kron(self.data, np.ones((sub,sub)))/(sub*sub)
# if len(kl_basis)!=0: kl_basis=np.kron(kl_basis, np.ones((sub,sub)))
# else:
# target=self.data
# psf=self.psf
# mf = MF.create_matched_filter(psf)
# if len(kl_basis)==0:
# # if working on model:
# thpt = MF.calc_matched_filter_throughput(mf)
# else:
# # if working on residuals:
# locations = np.stack(np.unravel_index(np.arange(target.size), target.shape)).T
# thpt = MF.calc_matched_filter_throughput_klip(mf, locations,
# kl_basis.reshape([kl_basis.shape[0]] + list(target.shape)),
# verbose=False).reshape(target.shape[1],target.shape[0])
# mf_target = MF.apply_matched_filter_fft(target, mf)
# self.mf=mf
# self.mf_target=mf_target
# self.thpt=thpt
[docs]
def mf_stats(self,aperture=False,aptype='circular',method='exact',radius1=1,radius2=None,ap_x=1,ap_y=1):
"""
Provide usefull stats about the MF
Parameters
----------
aptype : (circular,square,4pixels), optional
defin the aperture type to use during aperture photometry.
The default is 'cirular'.
method : Int or None, optional
The method used to determine the overlap of the aperture on the pixel grid.
With center they are either 0 or 1, while exact produces
partial-pixel masks (i.e., values between 0 and 1). With subpixel, a pixel is divided into subpixels,
each of which are considered to be entirely in or out of the aperture depending on whether its center is in
or out of the aperture. If subpixels=1, this method is equivalent to 'center'.
The aperture weights will contain values between 0 and 1.
The default is 'exact'.
radius1 : Int, optional
radius of circula aperture in pixels or inner radius for annulus aperture. The default is 1.
radius2 : Int or None, optional
outer radius for annulus aperture. if None use circular aperture. The default is None.
ap_x : Int, optional
widht of the square aperture. The default is 1.
ap_y: Int or None, optional
hight of the square aperture. The default is None.
Output
------
None
"""
if aperture:
photometry_AP.aperture_mask(self,aptype=aptype,method=method,radius1=radius1,radius2=radius2,ap_x=ap_x,ap_y=ap_y)
photometry_AP.mask_aperture_data(self)
# self.data=self.aperture.multiply(self.data)
w=np.where(self.aperture.multiply(self.mf_target)==np.nanmax(self.aperture.multiply(self.mf_target)))
self.mf_t=self.aperture.multiply(self.mf_target)[w][0]
else:
# self.mf_target=self.data.copy()
w=np.where(self.mf_target==np.nanmax(self.mf_target))
self.mf_t=self.mf_target[w][0]
self.Nap=1#len(self.aperture.data[self.aperture.data > 0].ravel())
self.sum=(self.mf_t/self.thpt)
self.mf_yx=w
[docs]
class photometry_PSF:
"This class handle the psf photometry"
[docs]
def equivalent_noise_area(self,weavelenght, pixelscale,telescope_aperture=2.3774):
"""
evaluate equivalent noise area for psf photometry noise estimation
Parameters
----------
weavelenght : float
filter wavelenght.
pixelscale : float
instrument pixelscale
telescope_aperture : float, optional
telescope aperture. The default is 2.3774.
Output
------
None
"""
a=0.5*(weavelenght/(telescope_aperture*u.m.to(u.nm)))*u.rad.to(u.arcsec)/pixelscale #'a in in unit of pixel^2
eq_noise_diameter=2*a
eq_noise_area=8*np.pi*a**2
self.Nap=int(round(eq_noise_area-(np.pi*eq_noise_diameter)/(np.sqrt(2)),2))
self.eq_noise_area=eq_noise_area
[docs]
def psf_stats(self,fitshape,aperture_radius=5,flux=None,method='subpixel'):
'''
Perform PSF photometry and provide usefull stats about the PSF photometry
Parameters
----------
fitshape : int,list
Rectangular shape around the center of a star which will be used
to collect the data to do the fitting. Can be an integer to be the
same along both axes. For example, 5 is the same as (5, 5),
which means to fit only at the following relative pixel
positions: [-2, -1, 0, 1, 2]. Each element of fitshape must be
an odd number..
aperture_radius : int, optional
The radius (in units of pixels) used to compute initial estimates
for the fluxes of sources. aperture_radius must be set if initial
flux guesses are not input to the photometry. The default is None.
flux : float, optional
Estimated flux of the target for a better fit. The default is None.
method : Int or None, optional
The method used to determine the overlap of the aperture on the pixel grid.
With center they are either 0 or 1, while exact produces
partial-pixel masks (i.e., values between 0 and 1). With subpixel, a pixel is divided into subpixels,
each of which are considered to be entirely in or out of the aperture depending on whether its center is in
or out of the aperture. If subpixels=1, this method is equivalent to 'center'.
The aperture weights will contain values between 0 and 1.
The default is 'subpixel'.
Returns
-------
None.
'''
fitter=LevMarLSQFitter()
daogroup = DAOGroup(2*self.fwhm)#*gaussian_sigma_to_fwhm)
psf_model = FittableImageModel(self.psf,normalize=True)
if (fitshape%2) ==0: fitshape+=1
if flux==None:
pos = Table(names=['id','x_0', 'y_0'], data=[[1],[self.x],[self.y]])
photometry = BasicPSFPhotometry(group_maker=daogroup,
bkg_estimator=None,
psf_model=psf_model,
fitter=fitter,
fitshape=fitshape,
aperture_radius=aperture_radius)
else:
pos = Table(names=['id','x_0', 'y_0','flux_0'], data=[[1],[self.x],[self.y],[flux]])
photometry = BasicPSFPhotometry(group_maker=daogroup,
bkg_estimator=None,
psf_model=psf_model,
fitter=fitter,
fitshape=fitshape)
psf_photometry = photometry(image=self.data-self.Sky,init_guesses=pos)
phot_table=psf_photometry
self.sum=(phot_table['flux_fit'][0])
# ri_shape=int((fitshape-1)/2)
self.residual = photometry.get_residual_image()
photometry_AP.aperture_mask(self,aptype='square',ap_x=fitshape,ap_y=fitshape,method=method)
if hasattr(self, 'edata'):
residual_cut=self.aperture.multiply(self.residual)
edata_cut=self.aperture.multiply(self.edata)
self.chi_sq= (1/(len(edata_cut.ravel())-3))*np.nansum(residual_cut**2/edata_cut**2)