Source code for fantasy_agn.tools

from astropy.io import fits
from itertools import product
from numpy.testing._private.utils import KnownFailureException
from sfdmap2 import sfdmap
import numpy as np
from PyAstronomy import pyasl
from copy import deepcopy
import pylab as plt
from spectres import spectres
from scipy import interpolate, linalg
from scipy.integrate import simps
from sherpa.data import Data1D
from sherpa.fit import Fit
from sherpa.optmethods import LevMar, NelderMead, MonCar, GridSearch
from sherpa.stats import LeastSq, Cash, Chi2, Likelihood, Stat
from sherpa.estmethods import Confidence
import pandas as pd
import glob
import json
import concurrent.futures
from sherpa.models.parameter import Parameter, tinyval
import natsort
from sklearn import preprocessing
from RegscorePy import aic, bic
import multiprocessing.pool
import os

script_dir = os.path.dirname(__file__)
sfdpath = os.path.join(script_dir, "sfddata")
pathEigen = os.path.join(script_dir, "eigen/")
import warnings

warnings.filterwarnings("ignore")


[docs]class spectrum(object): def __init__(self): self._wave = None self._flux = None self._err = None self._z = None self._ra = None self._dec = None self._mjd = None self._plate = None self._fiber = None self._host = None self._agn = None self._style = None
[docs] def DeRedden(self, ebv=0): """ Function for dereddening a flux vector using the parametrization given by [Fitzpatrick (1999)](https://iopscience.iop.org/article/10.1086/316293). Parameters ---------- ebv : float, optional Color excess E(B-V). If not given it will be automatically derived from Dust map data from [Schlegel, Finkbeiner and Davis (1998)](http://adsabs.harvard.edu/abs/1998ApJ...500..525S), by default 0 """ if ebv != 0: self.flux = pyasl.unred(self.wave, self.flux, ebv) else: m = sfdmap.SFDMap(sfdpath) self.flux = pyasl.unred(self.wave, self.flux, m.ebv(self.ra, self.dec))
[docs] def CorRed(self, redshift=0): """ The CorRed function corrects the flux for redshift. It takes in a redshift and corrects the wavelength, flux, and error arrays by that given redshift. :param self: Used to Reference the class object. :param redshift=0: Used to Specify the redshift of the object. :return: The wavelength, flux and error arrays for the object at a redshift of z=0. """ if redshift != 0: self.z = redshift self.wave = self.wave / (1 + self.z) self.flux = self.flux * (1 + self.z) self.err = self.err * (1 + self.z) self.fwhm = self.fwhm / (1 + self.z)
[docs] def rebin(self, minw=3900, maxw=6800): """ The rebin function rebins the spectrum to a smaller wavelength range. The default is 3900-6800 angstroms, but you can specify your own min and max values. :param self: Used to Reference the object itself. :param minw=3900: Used to Set the minimum wavelength of the new array. :param maxw=6800: Used to Define the upper limit of the wavelength range to be used for fitting. :return: The new wavelength and flux values for the spectrum. """ new = np.arange(minw, maxw, 1) flux, err = spectres(new, self.wave, self.flux, spec_errs=self.err) self.flux = flux self.err = err self.wave = new
[docs] def crop(self, xmin=4050, xmax=7300): """ The crop function crops the spectrum to a specified wavelength range. Parameters: xmin (float): The minimum wavelength of the crop region. Default is 4050 Angstroms. xmax (float): The maximum wavelength of the crop region. Default is 7300 Angstroms. Returns: None, but modifies self in place by cropping it to only include wavelengths between xmin and xmax. :param self: Used to Reference the object itself. :param xmin=4050: Used to Set the lower wavelength limit of the cropped spectrum. :param xmax=7300: Used to Select the wavelength range of interest. :return: A new spectrum object with the cropped wavelength range. """ mask = (self.wave > xmin) & (self.wave < xmax) self.wave = self.wave[mask] self.flux = self.flux[mask] self.err = self.err[mask]
[docs] def plot_spec(self): """ The plot_spec function plots the spectra. :param self: Used to Access the class attributes. :return: A tuple containing the x and y values for a plot of the spectrum. """ plt.plot(self.wave, self.flux) plt.show()
[docs] def show_range(self): print("Minimum of wavelength is ", self.wave[0]) print("Maximum of wavelength is ", self.wave[-1])
[docs] def vac_to_air(self): """ Convert vacuum to air wavelengths :param lam_vac - Wavelength in Angstroms :return: lam_air - Wavelength in Angstroms """ self.wave=self.wave/_wave_convert(self.wave)
[docs] def fit_host_sdss(self, mask_host=False, custom=False, mask_list=[]): """ The fit_host_sdss function fits a 5 host galaxy eigenspectra and an 10 AGNs eigenspectra to the observed spectrum. It takes as input: mask_host (bool): If True, it masks the host emission lines using _host_mask function. Default is False. custom (bool): If True, it masks user defined emission lines using _custom_mask function. Default is False. mask_list ([float]): A list of wavelengths in angstroms that will be masked if custom=True .Default is empty list []. Returns: self.(wave,flux,err) : The wavelength array , flux array and error array after fitting for host galaxy and AGN components respectively. :param self: Used to Access variables that belongs to the class. :param mask_host=False: Used to Mask the host. :param custom=False: Used to Mask the host. :param mask_list=[]: Used to Mask the data points that are not used in the fit. :return: The host and the agn component. """ self.wave_old = self.wave self.flux_old = self.flux self.err_old = self.err a = _prepare_host(self.wave, self.flux, self.err, 5, 10) k = _fith(a[3], a[4], a[1]) host = 0 for i in range(5): host += k[i] * a[3][i] if mask_host and custom==False: host= _host_mask(a[0], host) if mask_host and custom==True: host= _custom_mask(a[0], host, mask_list) ag_r = 0 for i in range(10): ag_r += k[5 + i] * a[4][i] if _check_host(host) == True and _check_host(ag_r) == True: self.host = host self.flux = a[1] - host self.agn=ag_r self.wave = a[0] self.err = a[2] plt.figure(figsize=(18, 6)) plt.plot(a[0], a[1]) plt.plot(a[0], host, label="host") plt.plot(a[0], ag_r, label="AGN") plt.plot(a[0], host+ag_r, label="FIT") else: print('Host contribution is negliglable')
[docs] def fit_host(self, mask_host=False, custom=False, mask_list=[]): """ The fit_host function fits a host galaxy and an AGN to the observed spectrum. It takes as input: mask_host (bool): If True, it masks the host emission lines using _host_mask function. Default is False. custom (bool): If True, it masks user defined emission lines using _custom_mask function. Default is False. mask_list ([float]): A list of wavelengths in angstroms that will be masked if custom=True .Default is empty list []. Returns: self.(wave,flux,err) : The wavelength array , flux array and error array after fitting for host galaxy and AGN components respectively. :param self: Used to Access variables that belongs to the class. :param mask_host=False: Used to Mask the host. :param custom=False: Used to Mask the host. :param mask_list=[]: Used to Mask the data points that are not used in the fit. :return: The host and the agn component. """ self.wave_old = self.wave self.flux_old = self.flux self.err_old = self.err lis = list(product(np.linspace(3, 10, 8), np.linspace(3, 15, 13))) ch = [] HOST = [] AGN = [] FIT = [] a = _prepare_host(self.wave, self.flux, self.err, 10, 15) for i in range(len(lis)): gal = int(lis[i][0]) qso = int(lis[i][1]) hs = a[3][:gal] ags = a[4][:qso] k = _fith(hs, ags, a[1]) host = 0 for i in range(gal): host += k[i] * a[3][i] if mask_host and custom==False: host= _host_mask(a[0], host) if mask_host and custom==True: host= _custom_mask(a[0], host, mask_list) ag_r = 0 for i in range(qso): ag_r += k[gal + i] * a[4][i] if _check_host(host) == True and _check_host(ag_r) == True: HOST.append(host) AGN.append(ag_r) FIT.append(host + ag_r) ch.append(_calculate_chi(a[1], a[2], host + ag_r, k)) if not ch: print( "Uuups... All of the combinations returned negative host.Better luck with other AGN" ) self.host=np.zeros(len(a[0])) else: idx = _find_nearest(ch) print("Number of galaxy components ", int(lis[idx][0])) print("Number of QSO components ", int(lis[idx][1])) print("Reduced chi square ", ch[idx]) plt.figure(figsize=(18, 6)) plt.plot(a[0], a[1]) plt.plot(a[0], HOST[idx], label="host") plt.plot(a[0], AGN[idx], label="AGN") plt.plot(a[0], FIT[idx], label="FIT") self.host = HOST[idx] self.flux = a[1] - HOST[idx] self.agn=AGN[idx] self.wave = a[0] self.err = a[2] plt.legend() plt.savefig(self.name+'_host.pdf')
[docs] def restore(self): """ Restore the spectra (wave, flux and error) to the one before host removal """ self.wave = self.wave_old self.flux = self.flux_old self.err = self.err_old
[docs] def plot(self): fig = px.line(x=self.wave, y=self.flux) fig.show()
[docs] def fit(self, model, ntrial=1): """ The fit function fits a model to the data. It returns a tuple of (model, fit results). :param self: Used to Reference the class object. :param model: Used to Define the model that is used in the fit. :param ntrial=1: Used to Specify the number of times we want to repeat the fit. :return: The results of the fit. """ stat=Chi2() method=LevMar() d = Data1D("AGN", self.wave, self.flux, self.err) gfit = Fit(d, model, stat=stat, method=method) gfit = Fit(d, model, stat=stat, method=method) gres = gfit.fit() statistic=gres.dstatval print('stati', statistic) if ntrial > 1: i=0 while i < ntrial: gfit = Fit(d, model, stat=stat, method=method) gres = gfit.fit() print(i+1, "iter stat: ", gres.dstatval) if gres.dstatval==statistic: break i+=1 self.gres = gres self.d = d self.model = model mod = model
[docs] def save_json(self, suffix="pars"): """ The save_json function saves the parameter values in a JSON file. The filename is constructed from the name of the model and either 'pars' or 'samples'. :param self: Used to Refer to the object itself. :param suffix='pars': Used to Specify the name of the file that is saved. :return: A dictionary of the parameter names and values. """ dicte = zip(self.gres.parnames, self.gres.parvals) res = dict(dicte) filename = self.name + "_" + suffix + ".json" with open(filename, "w") as fp: json.dump(res, fp)
@staticmethod def __fitting(d): model = mod stat = LeastSq() method = LevMar() gfit = Fit(d, model, stat=stat, method=method) gres = gfit.fit() return list(gres.parvals)
[docs] def monte_carlo(self, nsample=10, save_files=True): """ The monte_carlo function performs Monte Carlo resampling of the data by perturbing the flux based on the input error. It returns a dictionary containing the fit parameters and their uncertainties. :param self: Used to Reference the class object. :param model: Used to Define the model that is used in the fit. :param nsample=100: Used to Specify the number of Monte Carlo samples to generate. :return: A dictionary containing the fit parameters and their uncertainties. """ dict_list = [] for i in range(nsample): # Perturb the flux based on the input error flux_perturbed = np.random.normal(loc=self.flux, scale=self.err) # Fit the model to the perturbed data d_perturbed = Data1D("AGN", self.wave, flux_perturbed, self.err) gfit_perturbed = Fit(d_perturbed, self.model, stat=Chi2(), method=LevMar()) gres_perturbed = gfit_perturbed.fit() if save_files == True: dicte = zip(gres_perturbed.parnames, gres_perturbed.parvals) res = dict(dicte) dict_list.append(res) else: pass df=pd.DataFrame(dict_list) df.to_csv(self.name+'_pars'+'.csv')
[docs]class read_sdss(spectrum): def __init__(self, filename): super(read_sdss, self).__init__() hdulist = fits.open(filename) hdu = hdulist[1] data = hdu.data self.z = hdulist[2].data["z"][0] self.ra = hdulist[0].header["plug_ra"] self.dec = hdulist[0].header["plug_dec"] self.mjd = hdulist[0].header["mjd"] self.plate = hdulist[0].header["plateid"] self.fiber = hdulist[0].header["fiberid"] self.name = filename.split(".")[0] try: y = data.flux x = 10 ** (data.loglam) iv = data.ivar except AttributeError: y = data.FLUX x = 10 ** (data.LOGLAM) iv = data.IVAR super_threshold_indices = iv == 0 iv[super_threshold_indices] = np.median(iv) x=_vac_to_air(x) self.err = 1.0 / np.sqrt(iv) self.flux = y self.wave = x c = 299792.458 wdisp = data["wdisp"] frac = self.wave[1] / self.wave[0] dlam = (frac - 1) * self.wave fwhm = 2.355 * wdisp * dlam velscale = np.log(frac) * c self.fwhm = fwhm self.velscale = velscale
[docs]class read_gama_fits(spectrum): def __init__(self, filename): """Class for reading [GAMA](http://www.gama-survey.org/) spectrum file as ```spectrum``` object. Extracting the - Wavelength - Flux - Flux error - Redshift - Ra and Dec - Plate, Fiber and MJD Parameters ---------- filename : .fits Name of the SDSS fits file to read """ super(read_gama_fits, self).__init__() hdu = fits.open(filename) if hdu[0].header["TELESCOP"] == "SDSS 2.5-M": h1 = hdu[0].header lam_temp = h1["CRVAL1"] + h1["CDELT1"] * np.arange(h1["NAXIS1"]) wave = 10 ** lam_temp self.wave = wave else: wmin = hdu[0].header["WMIN"] wmax = hdu[0].header["WMAX"] NAXIS1 = hdu[0].header["NAXIS1"] self.wave = np.linspace(wmin, wmax, NAXIS1) data = hdu[0].data flux = data[0].tolist() c = 299792.458 self.ra = hdu[0].header["RA"] self.dec = hdu[0].header["DEC"] self.z = hdu[0].header["Z"] self.name = filename.split(".")[-2] dataset = pd.DataFrame( {"wave": self.wave, "flux": flux, "err": data[1].tolist()}, index=None ) dataset = dataset.interpolate() dataset = dataset.dropna() # flux=np.asarray(flux) self.wave = dataset.wave.to_numpy() # flux[np.isnan(flux)]=0 self.flux = dataset.flux.to_numpy() self.err = dataset.err.to_numpy() # wdisp = data['wdisp'] frac = self.wave[1] / self.wave[0] dlam = (frac - 1) * self.wave fwhm = 2.355 * dlam velscale = np.log(frac) * c self.fwhm = fwhm self.velscale = velscale
[docs]class make_spec(spectrum): def __init__(self, wav, fl, er, ra = None, dec = None, z = None, name='spectrum'): super(make_spec,self).__init__() self.wave = wav self.flux = fl self.err = er self.ra = ra self.dec = dec self.z = z self.name =name c = 299792.458 frac = self.wave[1] / self.wave[0] dlam = (frac - 1) * self.wave fwhm = 2.355 * dlam velscale = np.log(frac) * c self.fwhm = fwhm
[docs]class read_text(spectrum): def __init__(self, filename): """Class to read ASCI like file as as ```spectrum``` object. Parameters ---------- filename : ASCII File name should be three or two columns where first column is wavelength, second flux and third (optinal) flux error. """ super(read_text, self).__init__() try: self.wave, self.flux, self.err = np.genfromtxt(filename, unpack=True) except ValueError: self.wave, self.flux = np.genfromtxt(filename, unpack=True) self.err = np.ones_like(self.wave) c = 299792.458 # wdisp = data['wdisp'] frac = self.wave[1] / self.wave[0] dlam = (frac - 1) * self.wave fwhm = 2.355 * dlam velscale = np.log(frac) * c self.fwhm = fwhm self.velscale = velscale self.name = filename.split(".")[-2]
def _maskingEigen(x): """Preparing mask for eigen spectra host preparation Parameters ---------- x : array Spectrum wavelength to mask in order to rebin Returns ------- boolean array """ if min(x) > 3450: minx = min(x) else: minx = 3450 if max(x) < 7000: maxx = max(x) - 10 else: maxx = 7200 - 10 mask = (x > minx) & (x < maxx) return mask def _prepare_host(wave, flux, err, galaxy=5, agns=10): """Preparing qso and galaxy eigen spectra Parameters ---------- wave : array Spectrum wavelength flux : array Spectrum flux err : array Spectrum error flux galaxy : int, optional number of galaxy eigen spectra, by default 5 agns : int, optional number of qso eigen spectra, by default 10 Returns ------- data cube rebbined spectra and derived host galaxy """ if galaxy > 10 or agns > 50: print( "number of galaxy eigenspectra has to be less than 10 and QSO eigenspectra less than 50" ) else: glx = fits.open(pathEigen + "gal_eigenspec_Yip2004.fits") glx = glx[1].data gl_wave = glx["wave"].flatten() gl_wave=_vac_to_air(gl_wave) gl_flux = glx["pca"].reshape(glx["pca"].shape[1], glx["pca"].shape[2]) qso = fits.open(pathEigen + "qso_eigenspec_Yip2004_global.fits") qso = qso[1].data qso_wave = qso["wave"].flatten() qso_wave=_vac_to_air(qso_wave) qso_flux = qso["pca"].reshape(qso["pca"].shape[1], qso["pca"].shape[2]) mask = _maskingEigen(wave) wave1 = wave[mask] flux, err = spectres(wave1, wave, flux, spec_errs=err) qso_prime = [] g_prime = [] for i in range(galaxy): gix = _resam(wave, gl_wave, gl_flux[i]) g_prime.append(gix) for i in range(agns): yi = _resam(wave, qso_wave, qso_flux[i]) qso_prime.append(yi) # self.wave = wave1 # np.save('gal.npy', g_prime) return wave1, flux, err, g_prime, qso_prime return def _resam(x, wv, flx): """[summary] Args: x ([type]): [description] wv ([type]): [description] flx ([type]): [description] Returns: [type]: [description] """ mask = _maskingEigen(x) y = spectres(x[mask], wv, flx) return y def _fitHost(wave, flux, err, galax, qsos): a = _prepare_host(wave, flux, err, galax, qsos) A = np.vstack(a[3], a[4]).T k = np.linalg.lstsq(A, a[1], rcond=None)[0] def _calculate_chi(data, er, func, coef): diff = pow(data - func, 2.0) test_statistic = (diff / pow(er, 2.0)).sum() NDF = len(data) - len(coef) return test_statistic / NDF def _fith(hs, ags, data): A = np.vstack([hs, ags]).T W=1/data**2 Aw = A * np.sqrt(W[:,np.newaxis]) dataw=data * np.sqrt(W) k = np.linalg.lstsq(Aw, dataw, rcond=None)[0] return k def _find_nearest(array): array = np.asarray(array) idx = (np.abs(array - 1)).argmin() return idx def _check_host(hst): if np.sum(np.where(hst < 0.0, True, False)) > 100: return False else: return True def _randomize(d): flux = d.y + np.random.randn(len(d.y)) * d.staterror d = Data1D("AGN", d.x, flux, d.staterror) return d def _host_mask(wave, host): line_mask = np.where( (wave < 4970.) & (wave > 4950.) | (wave < 5020.) & (wave > 5000.) | (wave < 6590.) & (wave > 6540.) | (wave < 3737.) & (wave > 3717.) | (wave < 4872.) & (wave > 4852.) | (wave < 4350.) & (wave > 4330.) | (wave < 6750.) & (wave > 6600.) | (wave < 4111.) & (wave > 4091.), True, False) f = interpolate.interp1d(wave[~line_mask],host[~line_mask], bounds_error = False, fill_value = 0) return f(wave) def _custom_mask(wave, host, mask_list): line_mask=np.where(wave, False, True) for m in mask_list: mask = np.where( (wave < m[1]) & (wave > m[0]), True, False) line_mask += mask f = interpolate.interp1d(wave[~line_mask],host[~line_mask], bounds_error = False, fill_value = 0) return f(wave) def _wave_convert(lam): """ Convert between vacuum and air wavelengths using equation (1) of Ciddor 1996, Applied Optics 35, 1566 http://doi.org/10.1364/AO.35.001566 :param lam - Wavelength in Angstroms :return: conversion factor """ lam = np.asarray(lam) sigma2 = (1e4/lam)**2 fact = 1 + 5.792105e-2/(238.0185 - sigma2) + 1.67917e-3/(57.362 - sigma2) return fact def _vac_to_air(lam_vac): """ Convert vacuum to air wavelengths :param lam_vac - Wavelength in Angstroms :return: lam_air - Wavelength in Angstroms """ return lam_vac/_wave_convert(lam_vac)