#%%
from pathlib import Path
from typing import List, Tuple, Optional
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
import numpy as np
from astropy.io import ascii
import astropy
from shapely.geometry import box
from ezphot.skycatalog import SkyCatalog
from ezphot.helper import Helper
#%%
[docs]
class SkyCatalogUtility:
"""
Utility class for loading, filtering, and selecting sources from sky catalogs.
"""
[docs]
def __init__(self,
catalog_archive_path: Optional[str] = None):
"""
Parameters
----------
catalog_archive_path : Optional[str], optional
Path to the catalog archive summary file. If not provided, the path will be set to the catalog_dir in the config.
"""
self.helper = Helper()
self.catalog_archive_path = Path(catalog_archive_path) if catalog_archive_path else Path(self.helper.config['CATALOG_DIR']) / 'summary.ascii_fixed_width'
def get_catalogs(self,
ra: float = None,
dec: float = None,
objname: str = None,
fov_ra: float = 1.0,
fov_dec: float = 1.0,
catalog_type: str = 'GAIAXP',
catalog_version: str = 'v1',
fraction_criteria: float = 0.05,
query_when_not_archived: bool = True,
verbose: bool = False) -> List[SkyCatalog]:
"""
Load SkyCatalog instances that overlap with a given coordinate and FOV.
Parameters
----------
ra : float
The right ascension of the target coordinate.
dec : float
The declination of the target coordinate.
objname : str, optional
The name of the target object.
fov_ra : float, optional
The field of view in right ascension.
fov_dec : float, optional
The field of view in declination.
catalog_type : str, optional
The type of catalog to load.
catalog_version : str, optional
The version of catalog to load.
fraction_criteria : float, optional
The fraction of the catalog that must overlap with the target coordinate and FOV.
query_when_not_archived : bool, optional
Whether to query the catalog when it is not archived.
verbose : bool, optional
Whether to print verbose output.
Returns
-------
catalogs : List[SkyCatalog]
The list of SkyCatalog instances that overlap with the target coordinate and FOV.
"""
def make_sky_rectangle(ra_center, dec_center, fov_ra, fov_dec):
ra_offset = (fov_ra / 2) / np.cos(np.radians(dec_center))
dec_offset = fov_dec / 2
return box(ra_center - ra_offset, dec_center - dec_offset,
ra_center + ra_offset, dec_center + dec_offset)
# Load catalog summary
catalog_path = self.catalog_archive_path
summary = ascii.read(catalog_path, format='fixed_width')
summary = summary[(summary['catalog_type'] == catalog_type) & (summary['catalog_version'] == catalog_version)]
# Fast angular pre-filter
catalogs = None
if objname is not None:
summary_objname = summary[summary['objname'] == objname]
if len(summary_objname) > 0:
catalogs = [self._load_catalog_worker(row) for row in summary_objname]
return catalogs
else:
self.helper.print(f"No catalogs found for {objname}", verbose)
if ra is not None and dec is not None:
ra_arr = np.asarray(summary['ra'])
dec_arr = np.asarray(summary['dec'])
cos_dec0, sin_dec0 = np.cos(np.radians(dec)), np.sin(np.radians(dec))
cos_dec, sin_dec = np.cos(np.radians(dec_arr)), np.sin(np.radians(dec_arr))
delta_ra = np.radians(ra_arr - ra)
angular_sep = np.degrees(np.arccos(np.clip(sin_dec0 * sin_dec + cos_dec0 * cos_dec * np.cos(delta_ra), -1, 1)))
search_radius = min(5, 10 * np.sqrt(fov_ra ** 2 + fov_dec ** 2))
summary_nearby = summary[angular_sep < search_radius]
# Target polygon
target_poly = make_sky_rectangle(ra, dec, fov_ra, fov_dec)
# Intersection filter
from shapely.ops import unary_union
selected_rows = []
selected_intersections = []
# 1. ๋ชจ๋ intersecting tile์ intersection polygon ์ ์ฅ
candidates = []
for row in summary_nearby:
tile_poly = make_sky_rectangle(
row['ra'], row['dec'], row['fov_ra'], row['fov_dec']
)
if tile_poly.intersects(target_poly):
inter = tile_poly.intersection(target_poly)
if inter.area > 0:
candidates.append((row, inter))
# 2. intersection ๋ฉด์ ํฐ ์์๋ก ์ ๋ ฌ
candidates.sort(key=lambda x: x[1].area, reverse=True)
# 3. Greedy union
union_poly = None
for row, inter in candidates:
if union_poly is None:
trial_union = inter
else:
trial_union = union_poly.union(inter)
old_cov = 0.0 if union_poly is None else union_poly.area / target_poly.area
new_cov = trial_union.area / target_poly.area
# coverage๊ฐ ์ฆ๊ฐํ๋ฉด ์ฑํ
if new_cov > old_cov:
selected_rows.append(row)
union_poly = trial_union
if new_cov >= 0.999: # ๊ฑฐ์ full cover
break
final_coverage = 0.0 if union_poly is None else union_poly.area / target_poly.area
self.helper.print(
f"Selected {len(selected_rows)} tiles, coverage = {final_coverage:.3f}",
verbose
)
catalogs = [self._load_catalog_worker(row) for row in selected_rows]
else:
raise RuntimeError('Objname or Coordinate(Ra and dec) must be provided')
if query_when_not_archived:
if len(catalogs) == 0:
skycatalog = SkyCatalog(objname = objname, ra = ra, dec = dec, fov_ra = fov_ra, fov_dec = fov_dec, catalog_type = catalog_type, overlapped_fraction = fraction_criteria, verbose = verbose)
catalogs = [skycatalog]
return catalogs
def select_reference_sources(self,
catalog: SkyCatalog,
mag_lower: Optional[float] = None,
mag_upper: Optional[float] = None,
**kwargs) -> Tuple['astropy.table.Table', list]:
"""
Select reference sources from a given SkyCatalog based on predefined cuts.
Parameters
----------
catalog : SkyCatalog
The SkyCatalog instance to select reference sources from.
mag_lower : Optional[float], optional
The lower magnitude bound for filtering reference sources.
mag_upper : Optional[float], optional
The upper magnitude bound for filtering reference sources.
**kwargs : dict
Additional keyword arguments to filter reference sources.
Returns
-------
ref_sources : astropy.table.Table
The selected reference sources.
applied_kwargs : list
The applied keyword arguments.
"""
if not catalog.data:
raise RuntimeError(f'No catalog data found for {catalog.objname}')
# Define default cutlines
cutlines = {
'APASS': dict(e_V_mag=[0.001, 0.2], V_mag=[mag_lower, mag_upper]),
'GAIA': dict(V_flag=[0, 1], V_mag=[mag_lower, mag_upper]),
'GAIAXP': {"pmra": [-20, 20], "pmdec": [-20, 20], "bp-rp": [-2, 2], "g_mean": [mag_lower, mag_upper]},
'PS1': {"gFlags": [0, 10], "g_mag": [mag_lower, mag_upper]},
'SMSS': {"ngood": [5, 999], "class_star": [0.3, 1.0], "g_mag": [mag_lower, mag_upper]},
'GAIAXP_CORR_LAMOST': {"pmra": [-20, 20], "pmdec": [-20, 20], "bp-rp": [-2, 2], "g_mean": [mag_lower, mag_upper]},
}
if catalog.catalog_type not in cutlines:
raise ValueError(f"Invalid catalog type: {catalog.catalog_type}")
cutline = {**cutlines[catalog.catalog_type], **kwargs}
ref_sources = catalog.data
applied_kwargs = []
for key, value in cutline.items():
if key in ref_sources.colnames:
applied_kwargs.append({key: [value]})
ref_sources = ref_sources[(ref_sources[key] >= value[0]) & (ref_sources[key] <= value[1])]
return ref_sources, applied_kwargs
@staticmethod
def _load_catalog_worker(row):
"""Worker to load a SkyCatalog from a summary row."""
return SkyCatalog(objname=row['objname'],
catalog_type=row['catalog_type'],
fov_ra=row['fov_ra'],
fov_dec=row['fov_dec'],
catalog_version=row['catalog_version'],
verbose = False)