#%%
import astropy.units as u
import numpy as np
import os
from shapely.geometry import Polygon
from astropy.coordinates import SkyCoord
from astropy.io import ascii
from astropy.time import Time
import time
from astropy.table import Table, vstack
from astroquery.vizier import Vizier
from pathlib import Path
from ezphot.helper import Helper
#%%
[docs]
class SkyCatalog:
"""
SkyCatalog class is used to query the sky catalog and get the catalog data.
This class acts as an instance of reference catalog for a given object.
"""
[docs]
def __init__(self,
objname : str = None,
ra = None, # in deg
dec = None, # in deg
fov_ra : float = 1.3,
fov_dec : float = 0.9,
catalog_type : str = 'GAIAXP', #GAIAXP, GAIA, APASS, PS1, SDSS, SMSS, GAIAXP_CORR_LAMOST
catalog_version: str = 'v1',
overlapped_fraction : float = 0.8,
verbose : bool = True
):
"""
Initialize the SkyCatalog instance
Parameters
----------
objname : str, optional
Object name to query. If not provided, the object name will be inferred from the ra and dec.
ra : float, optional
Right ascension in degrees. If not provided, the object name will be inferred from SIMBAD object name.
dec : float, optional
Declination in degrees. If not provided, the object name will be inferred from SIMBAD object name.
fov_ra : float, optional
Field of view in right ascension in degrees
fov_dec : float, optional
Field of view in declination in degrees
catalog_type : str, optional
Catalog type to query
overlapped_fraction : float, optional
Overlapped fraction of the field of view
verbose : bool, optional
Whether to print verbose output
"""
if catalog_type not in ['GAIAXP', 'GAIA', 'APASS', 'PS1', 'SDSS', 'SMSS', 'GAIAXP_CORR_LAMOST']:
raise ValueError('Invalid catalog type: %s' % catalog_type)
self.helper = Helper()
self.objname = objname
self.ra = ra
self.dec = dec
self.fov_ra = fov_ra
self.fov_dec = fov_dec
self.catalog_type = catalog_type
self.catalog_version = catalog_version
self.overlapped_fraction = overlapped_fraction
self.filename = None
self.summary_path = os.path.join(self.helper.config['CATALOG_DIR'], 'summary.ascii_fixed_width')
self._register_objinfo(objname = objname, ra = ra, dec = dec, fov_ra = fov_ra, fov_dec = fov_dec, catalog_type = catalog_type, catalog_version = catalog_version, verbose = verbose)
self._get_catalog(catalog_type = catalog_type, verbose = verbose)
def __repr__(self):
txt = f"SkyCatalog[objname = {self.objname}, type = {self.catalog_type}, version = {self.catalog_version}]"
return txt
def _query(self, catalog_name: str = 'APASS', verbose: bool = False):
# APASS DR9
def apass_query(ra_deg, dec_deg, rad_deg, maxmag = 20, minmag = 10, maxsources=100000):
"""
Query APASS @ VizieR using astroquery.vizier
:param ra_deg: RA in degrees
:param dec_deg: Declination in degrees
:param rad_deg: field radius in degrees
:param maxmag: upper limit G magnitude (optional)
:param maxsources: maximum number of sources
:return: astropy.table object
"""
vquery = Vizier(columns=['*'],
column_filters={"Bmag":
("<%f" % maxmag),
"Vmag":
("<%f" % maxmag),
"g'mag":
("<%f" % maxmag),
"r'mag":
("<%f" % maxmag),
"i'mag":
("<%f" % maxmag),
"Bmag":
(">%f" % minmag),
"Vmag":
(">%f" % minmag),
"g'mag":
(">%f" % minmag),
"r'mag":
(">%f" % minmag),
"i'mag":
(">%f" % minmag),
},
row_limit=maxsources)
field = SkyCoord(ra=ra_deg, dec=dec_deg,
unit=(u.deg, u.deg),
frame='icrs')
query_data = vquery.query_region(field,
width=("%fd" % rad_deg),
catalog="II/336/apass9")
if len(query_data) > 0:
return query_data[0]
else:
return None
# SDSS DR12
def sdss_query(ra_deg, dec_deg, rad_deg, maxmag = 20, minmag = 10,maxsources=100000):
"""
Query SDSS @ VizieR using astroquery.vizier
:param ra_deg: RA in degrees
:param dec_deg: Declination in degrees
:param rad_deg: field radius in degrees
:param maxmag: upper limit G magnitude (optional)
:param maxsources: maximum number of sources
:return: astropy.table object
"""
vquery = Vizier(columns=['*'],
column_filters={"gmag":
("<%f" % maxmag),
"rmag":
("<%f" % maxmag),
"imag":
("<%f" % maxmag),
"gmag":
(">%f" % minmag),
"rmag":
(">%f" % minmag),
"imag":
(">%f" % minmag)
},
row_limit=maxsources)
field = SkyCoord(ra=ra_deg, dec=dec_deg,
unit=(u.deg, u.deg),
frame='icrs')
query_data = vquery.query_region(field,
width=("%fd" % rad_deg),
catalog="V/147/sdss12")
if len(query_data) > 0:
return query_data[0]
else:
return None
# PanSTARRS DR1
def ps1_query(ra_deg, dec_deg, rad_deg, maxmag = 20, minmag = 10, maxsources= 500000):
"""
Query PanSTARRS @ VizieR using astroquery.vizier
:param ra_deg: RA in degrees
:param dec_deg: Declination in degrees
:param rad_deg: field radius in degrees
:param maxmag: upper limit G magnitude (optional)
:param maxsources: maximum number of sources
:return: astropy.table object
"""
vquery = Vizier(columns=['*'],
column_filters={"gmag":
("<%f" % maxmag),
"rmag":
("<%f" % maxmag),
"imag":
("<%f" % maxmag),
"gmag":
(">%f" % minmag),
"rmag":
(">%f" % minmag),
"imag":
(">%f" % minmag),
},
row_limit=maxsources)
field = SkyCoord(ra=ra_deg, dec=dec_deg,
unit=(u.deg, u.deg),
frame='icrs')
query_data = vquery.query_region(field,
width=("%fd" % rad_deg),
catalog="II/349/ps1")
if len(query_data) > 0:
return query_data[0]
else:
return None
# SkyMapper DR4
def smss_query(ra_deg, dec_deg, rad_deg, maxmag = 20, minmag = 10, maxsources=1000000):
"""
Query PanSTARRS @ VizieR using astroquery.vizier
:param ra_deg: RA in degrees
:param dec_deg: Declination in degrees
:param rad_deg: field radius in degrees
:param maxmag: upper limit G magnitude (optional)
:param maxsources: maximum number of sources
:return: astropy.table object
"""
vquery = Vizier(columns=['ObjectId','RAICRS','DEICRS','Niflags','flags','Ngood','Ngoodu','Ngoodv','Ngoodg','Ngoodr','Ngoodi','Ngoodz','ClassStar','uPSF','e_uPSF','vPSF','e_vPSF','gPSF','e_gPSF','rPSF','e_rPSF','iPSF','e_iPSF','zPSF','e_zPSF'],
column_filters={"gPSF":
("<%f" % maxmag),
"rPSF":
("<%f" % maxmag),
"iPSF":
("<%f" % maxmag),
"gmag":
(">%f" % minmag),
"rmag":
(">%f" % minmag),
"imag":
(">%f" % minmag),
},
row_limit=maxsources)
field = SkyCoord(ra=ra_deg, dec=dec_deg,
unit=(u.deg, u.deg),
frame='icrs')
query_data = vquery.query_region(field,
width=("%fd" % rad_deg),
catalog="II/379/smssdr4")
if len(query_data) > 0:
return query_data[0]
else:
return None
# SkyMapper DR4
def gaia_query(ra_deg, dec_deg, rad_deg, maxmag = 20, minmag = 10, maxsources=1000000):
"""
Query PanSTARRS @ VizieR using astroquery.vizier
:param ra_deg: RA in degrees
:param dec_deg: Declination in degrees
:param rad_deg: field radius in degrees
:param maxmag: upper limit G magnitude (optional)
:param maxsources: maximum number of sources
:return: astropy.table object
"""
vquery = Vizier(columns=['RA_ICRS', 'DE_ICRS', 'E_BP_RP_corr', 'Bmag', 'BFlag', 'Vmag', 'VFlag', 'Rmag', 'RFlag', 'gmag', 'gFlag', 'rmag', 'rFlag', 'imag', 'iFlag'],
row_limit=maxsources)
field = SkyCoord(ra=ra_deg, dec=dec_deg,
unit=(u.deg, u.deg),
frame='icrs')
query_data = vquery.query_region(field,
width=("%fd" % rad_deg),
catalog="I/360/syntphot")
query_data[0]['e_Bmag'] = 0.02
query_data[0]['e_Vmag'] = 0.02
query_data[0]['e_Rmag'] = 0.02
query_data[0]['e_gmag'] = 0.02
query_data[0]['e_rmag'] = 0.02
query_data[0]['e_imag'] = 0.02
if len(query_data) > 0:
return query_data[0]
else:
return None
if catalog_name == 'APASS':
self.catalog_type = 'APASS'
self.helper.print('Start APASS query...', verbose)
data = apass_query(ra_deg = float(self.ra), dec_deg = float(self.dec), rad_deg = np.max([self.fov_ra, self.fov_dec]))
elif catalog_name == 'PS1':
self.catalog_type = 'PS1'
self.helper.print('Start PS1 query...', verbose)
data = ps1_query(ra_deg = float(self.ra), dec_deg = float(self.dec), rad_deg = np.max([self.fov_ra, self.fov_dec]))
elif catalog_name == 'SDSS':
self.catalog_type = 'SDSS'
self.helper.print('Start SDSS query...', verbose)
data = sdss_query(ra_deg = float(self.ra), dec_deg = float(self.dec), rad_deg = np.max([self.fov_ra, self.fov_dec]))
elif catalog_name == 'SMSS':
self.catalog_type = 'SMSS'
self.helper.print('Start SMSS query...', verbose)
data = smss_query(ra_deg = float(self.ra), dec_deg = float(self.dec), rad_deg = np.max([self.fov_ra, self.fov_dec]))
elif catalog_name == 'GAIA':
self.catalog_type = 'GAIA'
self.helper.print('Start GAIA query...', verbose)
data = gaia_query(ra_deg = float(self.ra), dec_deg = float(self.dec), rad_deg = np.max([self.fov_ra, self.fov_dec]))
else:
raise ValueError(f'{catalog_name} is not supported (supported catalogs: APASS, PS1, SDSS, SMSS, GAIA)')
if not data:
raise ValueError(f'{catalog_name} is not found in the catalog')
return data
def get_reference_sources(self, mag_lower : float = 10, mag_upper : float = 20, **kwargs):
if not self.data:
raise RuntimeError(f'No catalog data found for {self.objname}')
# For APASS Cut
cutline_apass = dict(e_ra = [0, 0.5], e_dec = [0, 0.5], e_V_mag = [0.01, 0.05], V_mag = [mag_lower, mag_upper])
# For GAIA cut
cutline_gaia = dict(V_flag = [0,1], V_mag = [mag_lower, mag_upper])
# For GAIAXP cut pmra, pmdec for astrometric reference stars, bp-rp for color
cutline_gaiaxp = {"pmra" : [-20,20], "pmdec" : [-20,20], "bp-rp" : [0.0, 1.5], "g_mean" : [mag_lower, mag_upper]}
# For PS1 cut
cutline_ps1 = {"gFlags": [0,10], "g_mag": [mag_lower, mag_upper]}
# For SMSS cut
cutline_smss = {"ngood": [20,999], "class_star": [0.8, 1.0], "g_mag": [mag_lower, mag_upper]}
# For GAIAXP_CORR_LAMOST cut
cutline_gaiaxp_corr_lamost = {"pmra": [-20, 20], "pmdec": [-20, 20], "bp-rp": [0.0, 1.5], "g_mean": [mag_lower, mag_upper]}
if self.catalog_type == 'APASS':
cutline = cutline_apass
elif self.catalog_type == 'GAIA':
cutline = cutline_gaia
elif self.catalog_type == 'GAIAXP':
cutline = cutline_gaiaxp
elif self.catalog_type == 'PS1':
cutline = cutline_ps1
elif self.catalog_type == 'SMSS':
cutline = cutline_smss
elif self.catalog_type == 'GAIAXP_CORR_LAMOST':
cutline = cutline_gaiaxp_corr_lamost
else:
raise ValueError('Invalid catalog type: %s' % self.catalog_type)
cutline = {**cutline, **kwargs}
ref_sources = self.data
applied_kwargs = []
for key, value in cutline.items():
if key in ref_sources.colnames:
applied_kwargs.append({key : [value]})
ref_sources = ref_sources[(ref_sources[key] > value[0]) & (ref_sources[key] < value[1])]
return ref_sources, applied_kwargs
def _get_catalog(self, catalog_type : str, verbose : bool = True):
if catalog_type == 'GAIAXP':
self._get_GAIAXP(verbose = verbose)
elif catalog_type == 'GAIA':
self._get_GAIA(verbose = verbose)
elif catalog_type == 'APASS':
self._get_APASS(verbose = verbose)
elif catalog_type == 'PS1':
self._get_PS1(verbose = verbose)
elif catalog_type == 'SDSS':
self._get_SDSS(verbose = verbose)
elif catalog_type == 'SMSS':
self._get_SMSS(verbose = verbose)
elif catalog_type == 'GAIAXP_CORR_LAMOST':
self._get_GAIAXP_CORR_LAMOST(verbose = verbose)
else:
raise ValueError('Invalid catalog type: %s' % catalog_type)
def _get_GAIAXP(self, verbose = False):
def GAIAXP_format(GAIAXP_catalog) -> Table:
original = ('source_id', 'ra', 'dec', 'parallax', 'pmra', 'pmdec', 'phot_g_mean_mag', 'bp_rp', 'mag_u', 'mag_g', 'mag_r', 'mag_i', 'mag_z', 'mag_m375w', 'mag_m400', 'mag_m412', 'mag_m425', 'mag_m425w', 'mag_m437', 'mag_m450', 'mag_m462', 'mag_m475', 'mag_m487', 'mag_m500', 'mag_m512', 'mag_m525', 'mag_m537', 'mag_m550', 'mag_m562', 'mag_m575', 'mag_m587', 'mag_m600', 'mag_m612', 'mag_m625', 'mag_m637', 'mag_m650', 'mag_m662', 'mag_m675', 'mag_m687', 'mag_m700', 'mag_m712', 'mag_m725', 'mag_m737', 'mag_m750', 'mag_m762', 'mag_m775', 'mag_m787', 'mag_m800', 'mag_m812', 'mag_m825', 'mag_m837', 'mag_m850', 'mag_m862', 'mag_m875', 'mag_m887')
format_ = ('id', 'ra', 'dec', 'parallax', 'pmra', 'pmdec', 'g_mean', 'bp-rp', 'u_mag', 'g_mag', 'r_mag', 'i_mag', 'z_mag', 'm375w_mag', 'm400_mag', 'm412_mag', 'm425_mag', 'm425w_mag', 'm437_mag', 'm450_mag', 'm462_mag', 'm475_mag', 'm487_mag', 'm500_mag', 'm512_mag', 'm525_mag', 'm537_mag', 'm550_mag', 'm562_mag', 'm575_mag', 'm587_mag', 'm600_mag', 'm612_mag', 'm625_mag', 'm637_mag', 'm650_mag', 'm662_mag', 'm675_mag', 'm687_mag', 'm700_mag', 'm712_mag', 'm725_mag', 'm737_mag', 'm750_mag', 'm762_mag', 'm775_mag', 'm787_mag', 'm800_mag', 'm812_mag', 'm825_mag', 'm837_mag', 'm850_mag', 'm862_mag', 'm875_mag', 'm887_mag')
GAIAXP_catalog.rename_columns(original, format_)
formatted_catalog = self._match_digit_tbl(GAIAXP_catalog)
return formatted_catalog
if self.filename:
self.helper.print(f'Catalog file found in archive: {self.filename}', verbose)
data = self._get_catalog_from_archive(filename = self.filename)
else:
raise ValueError(f'{self.objname} does not exist in GAIAXP catalog')
self.data = None
if data:
self.data = GAIAXP_format(data)
def _get_GAIAXP_CORR_LAMOST(self, verbose = False):
def GAIAXP_CORR_LAMOST_format(GAIAXP_catalog) -> Table:
# Already formatted
# original = ()
# format_ = ()
# GAIAXP_catalog.rename_columns(original, format_)
formatted_catalog = self._match_digit_tbl(GAIAXP_catalog)
return formatted_catalog
if self.filename:
self.helper.print(f'Catalog file found in archive: {self.filename}', verbose)
data = self._get_catalog_from_archive(filename = self.filename)
else:
raise ValueError(f'{self.objname} does not exist in GAIAXP catalog')
self.data = None
if data:
self.data = GAIAXP_CORR_LAMOST_format(data)
def _get_GAIA(self, verbose = True):
def GAIA_format(GAIA_catalog) -> Table:
original = ('RA_ICRS', 'DE_ICRS', 'Bmag', 'e_Bmag', 'BFlag', 'Vmag', 'e_Vmag', 'VFlag', 'Rmag', 'e_Rmag', 'RFlag', 'gmag', 'e_gmag', 'gFlag', 'rmag', 'e_rmag', 'rFlag', 'imag', 'e_imag', 'iFlag')
format_ = ('ra', 'dec', 'B_mag', 'e_B_mag', 'B_flag', 'V_mag', 'e_Vmag', 'V_flag', 'R_mag', 'e_Rmag', 'R_flag', 'g_mag', 'e_gmag', 'g_flag', 'r_mag', 'e_rmag', 'r_flag', 'i_mag', 'e_imag', 'i_flag')
GAIA_catalog.rename_columns(original, format_)
if 'E_BP_RP_corr' in GAIA_catalog.colnames:
GAIA_catalog.rename_columns(['E_BP_RP_corr'], ['c_star'])
else:
GAIA_catalog['c_star'] = 0
formatted_catalog = self._match_digit_tbl(GAIA_catalog)
return formatted_catalog
# If filename is defined by _register_objinfo function (meaning located in catalog_archive), Query from archive
if self.filename:
self.helper.print(f'Catalog file found in archive: {self.filename}', verbose)
data = self._get_catalog_from_archive(filename = self.filename)
# Else, query object in the catalog and save to catalog_archive
else:
try:
# Try query and save to catalog_archive
data = self._query(catalog_name = 'GAIA', verbose = verbose)
filename = f'{self.objname}_GAIA.csv'
catalog_file = os.path.join(self.helper.config['CATALOG_DIR'], self.catalog_type, filename)
os.makedirs(os.path.join(self.helper.config['CATALOG_DIR'], self.catalog_type), exist_ok = True)
data.write(catalog_file, format ='csv', overwrite = True)
summary_tbl = ascii.read(self.summary_path, format = 'fixed_width')
catalog_file = Path(catalog_file)
file_info = catalog_file.stat()
rel_path = catalog_file.relative_to(self.helper.config['CATALOG_DIR'])
summary_tbl.add_row([str(rel_path), self.objname, self.catalog_type, self.ra, self.dec, self.fov_ra, self.fov_dec, self.fov_ra * self.fov_dec, file_info.st_size, time.strftime("%Y-%m-%d %H:%M:%S.%f"[:-3], time.localtime(file_info.st_mtime)), self.catalog_version])
summary_tbl.write(self.summary_path, format = 'ascii.fixed_width', overwrite = True)
self.helper.print(f'GAIA catalog saved and registered to {catalog_file}', verbose)
except:
# Elase, return Error
raise ValueError(f'{self.objname} does not exist in GAIA catalog')
# After getting the data, format the data
self.data = None
if data:
self.data = GAIA_format(data)
def _get_APASS(self, verbose = True):
def APASS_format(APASS_catalog) -> Table:
original = ('RAJ2000','DEJ2000','e_RAJ2000','e_DEJ2000','Bmag','e_Bmag','Vmag','e_Vmag',"g'mag","e_g'mag","r'mag","e_r'mag","i'mag","e_i'mag")
format_ = ('ra','dec','e_ra','e_dec','B_mag','e_B_mag','V_mag','e_V_mag','g_mag','e_g_mag','r_mag','e_r_mag','i_mag','e_i_mag')
APASS_catalog.rename_columns(original, format_)
formatted_catalog = self._match_digit_tbl(APASS_catalog)
return formatted_catalog
# If filename is defined by _register_objinfo function (meaning located in catalog_archive), Query from archive
if self.filename:
self.helper.print(f'Catalog file found in archive: {self.filename}', verbose)
data = self._get_catalog_from_archive(filename = self.filename)
# Else, query object in the catalog and save to catalog_archive
else:
try:
# Try query and save to catalog_archive
data = self._query(catalog_name = 'APASS', verbose = verbose)
filename = f'{self.objname}_APASS.csv'
catalog_file = os.path.join(self.helper.config['CATALOG_DIR'], self.catalog_type, filename)
os.makedirs(os.path.join(self.helper.config['CATALOG_DIR'], self.catalog_type), exist_ok = True)
data.write(catalog_file, format ='csv', overwrite = True)
summary_tbl = ascii.read(self.summary_path, format = 'fixed_width')
catalog_file = Path(catalog_file)
file_info = catalog_file.stat()
rel_path = catalog_file.relative_to(self.helper.config['CATALOG_DIR'])
summary_tbl.add_row([str(rel_path), self.objname, self.catalog_type, self.ra, self.dec, self.fov_ra, self.fov_dec, self.fov_ra * self.fov_dec, file_info.st_size, time.strftime("%Y-%m-%d %H:%M:%S.%f"[:-3], time.localtime(file_info.st_mtime)), self.catalog_version])
summary_tbl.write(self.summary_path, format = 'ascii.fixed_width', overwrite = True)
self.helper.print(f'APASS catalog saved and registered to {catalog_file}', verbose)
except:
# Elase, return Error
raise ValueError(f'{self.objname} does not exist in APASS catalog')
# After getting the data, format the data
self.data = None
if data:
self.data = APASS_format(data)
def _get_PS1(self, verbose = True):
def PS1_format(PS1_catalog) -> Table:
original = ('objID','RAJ2000','DEJ2000','e_RAJ2000','e_DEJ2000','gmag','e_gmag','rmag','e_rmag','imag','e_imag','zmag','e_zmag','ymag','e_ymag','gKmag','e_gKmag','rKmag','e_rKmag','iKmag','e_iKmag','zKmag','e_zKmag','yKmag','e_yKmag')
format_ = ('ID','ra','dec','e_ra','e_dec','g_mag','e_g_mag','r_mag','e_r_mag','i_mag','e_i_mag','z_mag','e_z_mag','y_mag','e_y_mag','g_Kmag','e_g_Kmag','r_Kmag','e_r_Kmag','i_Kmag','e_i_Kmag','z_Kmag','e_z_Kmag','y_Kmag','e_y_Kmag')
PS1_catalog.rename_columns(original, format_)
formatted_catalog = self._match_digit_tbl(PS1_catalog)
return formatted_catalog
# If filename is defined by _register_objinfo function (meaning located in catalog_archive), Query from archive
if self.filename:
self.helper.print(f'Catalog file found in archive: {self.filename}', verbose)
data = self._get_catalog_from_archive(filename = self.filename)
# Else, query object in the catalog and save to catalog_archive
else:
try:
# Try query and save to catalog_archive
data = self._query(catalog_name = 'PS1', verbose = verbose)
filename = f'{self.objname}_PS1.csv'
catalog_file = os.path.join(self.helper.config['CATALOG_DIR'], self.catalog_type, filename)
os.makedirs(os.path.join(self.helper.config['CATALOG_DIR'], self.catalog_type), exist_ok = True)
data.write(catalog_file, format ='csv', overwrite = True)
summary_tbl = ascii.read(self.summary_path, format = 'fixed_width')
catalog_file = Path(catalog_file)
file_info = catalog_file.stat()
rel_path = catalog_file.relative_to(self.helper.config['CATALOG_DIR'])
summary_tbl.add_row([str(rel_path), self.objname, self.catalog_type, self.ra, self.dec, self.fov_ra, self.fov_dec, self.fov_ra * self.fov_dec, file_info.st_size, time.strftime("%Y-%m-%d %H:%M:%S.%f"[:-3], time.localtime(file_info.st_mtime)), self.catalog_version])
summary_tbl.write(self.summary_path, format = 'ascii.fixed_width', overwrite = True)
self.helper.print(f'PS1 catalog saved and registered to {catalog_file}', verbose)
except:
# Elase, return Error
raise ValueError(f'{self.objname} does not exist in PS1 catalog')
# After getting the data, format the data
self.data = None
if data:
self.data = PS1_format(data)
def _get_SMSS(self, verbose = True):
def SMSS_format(SMSS_catalog) -> Table:
original = ('ObjectId','RAICRS','DEICRS','Niflags','flags','Ngood','Ngoodu','Ngoodv','Ngoodg','Ngoodr','Ngoodi','Ngoodz','ClassStar','uPSF','e_uPSF','vPSF','e_vPSF','gPSF','e_gPSF','rPSF','e_rPSF','iPSF','e_iPSF','zPSF','e_zPSF')
format_ = ('ID','ra','dec','nimflag','flag','ngood','ngoodu','ngoodv','ngoodg','ngoodr','ngoodi','ngoodz','class_star','u_mag','e_u_mag','v_mag','e_v_mag','g_mag','e_g_mag','r_mag','e_r_mag','i_mag','e_i_mag','z_mag','e_z_mag')
SMSS_catalog.rename_columns(original, format_)
formatted_catalog = self._match_digit_tbl(SMSS_catalog)
return formatted_catalog
# If filename is defined by _register_objinfo function (meaning located in catalog_archive), Query from archive
if self.filename:
self.helper.print(f'Catalog file found in archive: {self.filename}', verbose)
data = self._get_catalog_from_archive(filename = self.filename)
# Else, query object in the catalog and save to catalog_archive
else:
try:
# Try query and save to catalog_archive
data = self._query(catalog_name = 'SMSS', verbose = verbose)
filename = f'{self.objname}_SMSS.csv'
catalog_file = os.path.join(self.helper.config['CATALOG_DIR'], self.catalog_type, filename)
os.makedirs(os.path.join(self.helper.config['CATALOG_DIR'], self.catalog_type), exist_ok = True)
data.write(catalog_file, format ='csv', overwrite = True)
summary_tbl = ascii.read(self.summary_path, format = 'fixed_width')
catalog_file = Path(catalog_file)
file_info = catalog_file.stat()
rel_path = catalog_file.relative_to(self.helper.config['CATALOG_DIR'])
summary_tbl.add_row([str(rel_path), self.objname, self.catalog_type, self.ra, self.dec, self.fov_ra, self.fov_dec, self.fov_ra * self.fov_dec, file_info.st_size, time.strftime("%Y-%m-%d %H:%M:%S.%f"[:-3], time.localtime(file_info.st_mtime)), self.catalog_version])
summary_tbl.write(self.summary_path, format = 'ascii.fixed_width', overwrite = True)
self.helper.print(f'SMSS catalog saved and registered to {catalog_file}', verbose)
except:
# Elase, return Error
raise ValueError(f'{self.objname} does not exist in SMSS catalog')
# After getting the data, format the data
self.data = None
if data:
self.data = SMSS_format(data)
def _get_SDSS(self, verbose = True):
def SDSS_format(SDSS_catalog) -> Table:
original = ('RA_ICRS','DE_ICRS','umag','e_umag','gmag','e_gmag','rmag','e_rmag','imag','e_imag','zmag','e_zmag')
format_ = ('ra','dec','umag','e_umag','gmag','e_gmag','rmag','e_rmag','imag','e_imag','zmag','e_zmag')
SDSS_catalog.rename_columns(original, format_)
formatted_catalog = self._match_digit_tbl(SDSS_catalog)
return formatted_catalog
# If filename is defined by _register_objinfo function (meaning located in catalog_archive), Query from archive
if self.filename:
self.helper.print(f'Catalog file found in archive: {self.filename}', verbose)
data = self._get_catalog_from_archive(filename = self.filename)
# Else, query object in the catalog and save to catalog_archive
else:
try:
# Try query and save to catalog_archive
data = self._query(catalog_name = 'SDSS', verbose = verbose)
filename = f'{self.objname}_SDSS.csv'
catalog_file = os.path.join(self.helper.config['CATALOG_DIR'], self.catalog_type, filename)
os.makedirs(os.path.join(self.helper.config['CATALOG_DIR'], self.catalog_type), exist_ok = True)
data.write(catalog_file, format ='csv', overwrite = True)
summary_tbl = ascii.read(self.summary_path, format = 'fixed_width')
catalog_file = Path(catalog_file)
file_info = catalog_file.stat()
rel_path = catalog_file.relative_to(self.helper.config['CATALOG_DIR'])
summary_tbl.add_row([str(rel_path), self.objname, self.catalog_type, self.ra, self.dec, self.fov_ra, self.fov_dec, self.fov_ra * self.fov_dec, file_info.st_size, time.strftime("%Y-%m-%d %H:%M:%S.%f"[:-3], time.localtime(file_info.st_mtime)), self.catalog_version])
summary_tbl.write(self.summary_path, format = 'ascii.fixed_width', overwrite = True)
self.helper.print(f'SDSS catalog saved and registered to {catalog_file}', verbose)
except:
# Elase, return Error
raise ValueError(f'{self.objname} does not exist in SDSS catalog')
# After getting the data, format the data
self.data = None
if data:
self.data = SDSS_format(data)
def _match_digit_tbl(self, tbl):
for column in tbl.columns:
if tbl[column].dtype == 'float64':
tbl[column].format = '{:.5f}'
return tbl
def _get_catalog_from_archive(self, filename : str):
catalog_file = os.path.join(self.helper.config['CATALOG_DIR'], filename)
is_exist = os.path.exists(catalog_file)
if is_exist:
data = ascii.read(catalog_file, format = 'csv')
return data
else:
return None
def _get_cataloginfo_by_coord(self,
coord: SkyCoord,
fov_ra: float = 1.5,
fov_dec: float = 1.5,
overlapped_fraction: float = 0.9,
verbose: bool = False) -> Table:
"""
Retrieves catalog information based on coordinates.
Requires fov_ra and fov_dec to calculate overlap.
Returns catalogs with sufficient overlap based on the given fraction.
"""
try:
catalog_summary_tbl = ascii.read(self.summary_path, format='fixed_width')
catalog_coords = SkyCoord(ra=catalog_summary_tbl['ra'],
dec=catalog_summary_tbl['dec'],
unit=(u.deg, u.deg))
# Cut tiles into 10deg x 10deg regions
ra_min, ra_max = coord.ra.deg - 5, coord.ra.deg + 5
dec_min, dec_max = coord.dec.deg - 5, coord.dec.deg + 5
cut_tiles_mask = (
(catalog_summary_tbl['ra'] >= ra_min) & (catalog_summary_tbl['ra'] <= ra_max) &
(catalog_summary_tbl['dec'] >= dec_min) & (catalog_summary_tbl['dec'] <= dec_max)
)
catalog_summary_tbl = catalog_summary_tbl[cut_tiles_mask]
catalog_coords = catalog_coords[cut_tiles_mask]
overlap_catalogs = []
overlap_fractions = []
for idx, (cat_ra, cat_dec, cat_fov_ra, cat_fov_dec) in enumerate(zip(
catalog_summary_tbl['ra'],
catalog_summary_tbl['dec'],
catalog_summary_tbl['fov_ra'],
catalog_summary_tbl['fov_dec'])):
target_polygon = Polygon([
(coord.ra.deg - fov_ra / 2, coord.dec.deg - fov_dec / 2),
(coord.ra.deg + fov_ra / 2, coord.dec.deg - fov_dec / 2),
(coord.ra.deg + fov_ra / 2, coord.dec.deg + fov_dec / 2),
(coord.ra.deg - fov_ra / 2, coord.dec.deg + fov_dec / 2)
])
tile_polygon = Polygon([
(cat_ra - cat_fov_ra / 2, cat_dec - cat_fov_dec / 2),
(cat_ra + cat_fov_ra / 2, cat_dec - cat_fov_dec / 2),
(cat_ra + cat_fov_ra / 2, cat_dec + cat_fov_dec / 2),
(cat_ra - cat_fov_ra / 2, cat_dec + cat_fov_dec / 2)
])
if target_polygon.intersects(tile_polygon):
intersection = target_polygon.intersection(tile_polygon)
target_area = fov_ra * fov_dec
fraction_overlap = intersection.area / target_area
if fraction_overlap >= overlapped_fraction:
overlap_catalogs.append(catalog_summary_tbl[idx])
overlap_fractions.append(fraction_overlap)
if len(overlap_catalogs) > 0:
return vstack(overlap_catalogs)
else:
raise ValueError("No catalog found with sufficient overlap.")
except Exception as e:
raise RuntimeError(f'Failed to access catalog summary: {e}')
def _get_cataloginfo_by_objname(self, objname, catalog_type, catalog_version):
catalog_summary_file = os.path.join(self.helper.config['CATALOG_DIR'], 'summary.ascii_fixed_width')
catalog_summary_tbl = ascii.read(catalog_summary_file, format = 'fixed_width')
idx = (catalog_summary_tbl['objname'] == objname) & (catalog_summary_tbl['catalog_type'] == catalog_type) & (catalog_summary_tbl['catalog_version'] == catalog_version)
if np.sum(idx) > 0:
matched_info = catalog_summary_tbl[idx]
return matched_info
else:
raise ValueError(f"{objname} not found in catalog_summary")
def _register_objinfo(self, objname, ra, dec, fov_ra, fov_dec, catalog_type, catalog_version, verbose):
self.objname = objname
self.ra = ra
self.dec = dec
self.fov_ra = fov_ra
self.fov_dec = fov_dec
self.catalog_type = catalog_type
self.catalog_version = catalog_version
# 1. Check if neither objname nor (ra, dec) are provided
if (objname is None) and (ra is None) and (dec is None):
raise ValueError('objname or (ra, dec) must be provided')
# 2. If objname is given but ra and dec are not, retrieve coordinates
if objname is not None:
try:
catinfo = self._get_cataloginfo_by_objname(objname = objname, catalog_type = catalog_type, catalog_version = catalog_version)
self.ra = catinfo['ra'][0]
self.dec = catinfo['dec'][0]
self.fov_ra = catinfo['fov_ra'][0]
self.fov_dec = catinfo['fov_dec'][0]
self.filename = catinfo['file'][0]
except:
try:
coord = self._query_coord_from_objname(objname = objname, verbose = verbose)
self.ra = coord.ra.deg
self.dec = coord.dec.deg
except:
raise ValueError(f"Failed to query coordinates for {objname}")
# 3. If objname is not provided, generate it using the coordinate format
else:
if ra is not None and dec is not None:
coord = SkyCoord(ra=ra * u.deg, dec=dec * u.deg, frame='icrs')
try:
catinfo = self._get_cataloginfo_by_coord(coord = coord, fov_ra = fov_ra, fov_dec = fov_dec, overlapped_fraction = self.overlapped_fraction)
self.objname = catinfo['objname'][0]
self.ra = catinfo['ra'][0]
self.dec = catinfo['dec'][0]
self.catalog_version = catinfo['catalog_version'][0]
self.fov_ra = catinfo['fov_ra'][0]
self.fov_dec = catinfo['fov_dec'][0]
self.filename = catinfo['file'][0]
self.is_registered = True
except:
ra_hms = coord.ra.hms
dec_dms = coord.dec.dms
self.objname = f'J{int(ra_hms.h):02}{int(ra_hms.m):02}{ra_hms.s:05.2f}' \
f'{int(dec_dms.d):+03}{int(abs(dec_dms.m)):02}{abs(dec_dms.s):04.1f}'
if (self.objname is None) or (self.ra is None) or (self.dec is None):
raise ValueError('objname, ra, and dec must be provided')
def _query_coord_from_objname(self, objname, verbose) -> SkyCoord:
from astroquery.simbad import Simbad
# Create a custom Simbad instance with the necessary fields
self.helper.print(f'Querying coordinates for {objname} from SIMBAD', verbose)
custom_simbad = Simbad()
custom_simbad.add_votable_fields('ra', 'dec')
# Query an object (e.g., "Vega")
result_table = custom_simbad.query_object(objname)
# Extract coordinates
if result_table is not None:
ra = result_table['ra'][0] # Right Ascension
dec = result_table['dec'][0] # Declination
coord = SkyCoord(ra, dec, unit = (u.deg, u.deg))
self.helper.print(f'Coordinates for {objname} from SIMBAD: {coord}', verbose)
return coord
else:
raise ValueError("Object not found in SIMBAD.")
# %%
if __name__ == '__main__':
from astropy.io import ascii
self = SkyCatalog(objname = 'NGC1566', catalog_type = 'APASS', catalog_version = 'v1')
# %%