Source code for ezphot.skycatalog.skycatalogutility

#%%
from pathlib import Path
from typing import List, Tuple, Optional
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor

import numpy as np
from astropy.io import ascii
import astropy
from shapely.geometry import box

from ezphot.skycatalog import SkyCatalog
from ezphot.helper import Helper

#%%

[docs] class SkyCatalogUtility: """ Utility class for loading, filtering, and selecting sources from sky catalogs. """
[docs] def __init__(self, catalog_archive_path: Optional[str] = None): """ Parameters ---------- catalog_archive_path : Optional[str], optional Path to the catalog archive summary file. If not provided, the path will be set to the catalog_dir in the config. """ self.helper = Helper() self.catalog_archive_path = Path(catalog_archive_path) if catalog_archive_path else Path(self.helper.config['CATALOG_DIR']) / 'summary.ascii_fixed_width'
def get_catalogs(self, ra: float = None, dec: float = None, objname: str = None, fov_ra: float = 1.0, fov_dec: float = 1.0, catalog_type: str = 'GAIAXP', catalog_version: str = 'v1', fraction_criteria: float = 0.05, query_when_not_archived: bool = True, verbose: bool = False) -> List[SkyCatalog]: """ Load SkyCatalog instances that overlap with a given coordinate and FOV. Parameters ---------- ra : float The right ascension of the target coordinate. dec : float The declination of the target coordinate. objname : str, optional The name of the target object. fov_ra : float, optional The field of view in right ascension. fov_dec : float, optional The field of view in declination. catalog_type : str, optional The type of catalog to load. catalog_version : str, optional The version of catalog to load. fraction_criteria : float, optional The fraction of the catalog that must overlap with the target coordinate and FOV. query_when_not_archived : bool, optional Whether to query the catalog when it is not archived. verbose : bool, optional Whether to print verbose output. Returns ------- catalogs : List[SkyCatalog] The list of SkyCatalog instances that overlap with the target coordinate and FOV. """ def make_sky_rectangle(ra_center, dec_center, fov_ra, fov_dec): ra_offset = (fov_ra / 2) / np.cos(np.radians(dec_center)) dec_offset = fov_dec / 2 return box(ra_center - ra_offset, dec_center - dec_offset, ra_center + ra_offset, dec_center + dec_offset) # Load catalog summary catalog_path = self.catalog_archive_path summary = ascii.read(catalog_path, format='fixed_width') summary = summary[(summary['catalog_type'] == catalog_type) & (summary['catalog_version'] == catalog_version)] # Fast angular pre-filter catalogs = None if objname is not None: summary_objname = summary[summary['objname'] == objname] if len(summary_objname) > 0: catalogs = [self._load_catalog_worker(row) for row in summary_objname] return catalogs else: self.helper.print(f"No catalogs found for {objname}", verbose) if ra is not None and dec is not None: ra_arr = np.asarray(summary['ra']) dec_arr = np.asarray(summary['dec']) cos_dec0, sin_dec0 = np.cos(np.radians(dec)), np.sin(np.radians(dec)) cos_dec, sin_dec = np.cos(np.radians(dec_arr)), np.sin(np.radians(dec_arr)) delta_ra = np.radians(ra_arr - ra) angular_sep = np.degrees(np.arccos(np.clip(sin_dec0 * sin_dec + cos_dec0 * cos_dec * np.cos(delta_ra), -1, 1))) search_radius = min(5, 10 * np.sqrt(fov_ra ** 2 + fov_dec ** 2)) summary_nearby = summary[angular_sep < search_radius] # Target polygon target_poly = make_sky_rectangle(ra, dec, fov_ra, fov_dec) # Intersection filter from shapely.ops import unary_union selected_rows = [] selected_intersections = [] # 1. ๋ชจ๋“  intersecting tile์˜ intersection polygon ์ €์žฅ candidates = [] for row in summary_nearby: tile_poly = make_sky_rectangle( row['ra'], row['dec'], row['fov_ra'], row['fov_dec'] ) if tile_poly.intersects(target_poly): inter = tile_poly.intersection(target_poly) if inter.area > 0: candidates.append((row, inter)) # 2. intersection ๋ฉด์  ํฐ ์ˆœ์„œ๋กœ ์ •๋ ฌ candidates.sort(key=lambda x: x[1].area, reverse=True) # 3. Greedy union union_poly = None for row, inter in candidates: if union_poly is None: trial_union = inter else: trial_union = union_poly.union(inter) old_cov = 0.0 if union_poly is None else union_poly.area / target_poly.area new_cov = trial_union.area / target_poly.area # coverage๊ฐ€ ์ฆ๊ฐ€ํ•˜๋ฉด ์ฑ„ํƒ if new_cov > old_cov: selected_rows.append(row) union_poly = trial_union if new_cov >= 0.999: # ๊ฑฐ์˜ full cover break final_coverage = 0.0 if union_poly is None else union_poly.area / target_poly.area self.helper.print( f"Selected {len(selected_rows)} tiles, coverage = {final_coverage:.3f}", verbose ) catalogs = [self._load_catalog_worker(row) for row in selected_rows] else: raise RuntimeError('Objname or Coordinate(Ra and dec) must be provided') if query_when_not_archived: if len(catalogs) == 0: skycatalog = SkyCatalog(objname = objname, ra = ra, dec = dec, fov_ra = fov_ra, fov_dec = fov_dec, catalog_type = catalog_type, overlapped_fraction = fraction_criteria, verbose = verbose) catalogs = [skycatalog] return catalogs def select_reference_sources(self, catalog: SkyCatalog, mag_lower: Optional[float] = None, mag_upper: Optional[float] = None, **kwargs) -> Tuple['astropy.table.Table', list]: """ Select reference sources from a given SkyCatalog based on predefined cuts. Parameters ---------- catalog : SkyCatalog The SkyCatalog instance to select reference sources from. mag_lower : Optional[float], optional The lower magnitude bound for filtering reference sources. mag_upper : Optional[float], optional The upper magnitude bound for filtering reference sources. **kwargs : dict Additional keyword arguments to filter reference sources. Returns ------- ref_sources : astropy.table.Table The selected reference sources. applied_kwargs : list The applied keyword arguments. """ if not catalog.data: raise RuntimeError(f'No catalog data found for {catalog.objname}') # Define default cutlines cutlines = { 'APASS': dict(e_V_mag=[0.001, 0.2], V_mag=[mag_lower, mag_upper]), 'GAIA': dict(V_flag=[0, 1], V_mag=[mag_lower, mag_upper]), 'GAIAXP': {"pmra": [-20, 20], "pmdec": [-20, 20], "bp-rp": [-2, 2], "g_mean": [mag_lower, mag_upper]}, 'PS1': {"gFlags": [0, 10], "g_mag": [mag_lower, mag_upper]}, 'SMSS': {"ngood": [5, 999], "class_star": [0.3, 1.0], "g_mag": [mag_lower, mag_upper]}, 'GAIAXP_CORR_LAMOST': {"pmra": [-20, 20], "pmdec": [-20, 20], "bp-rp": [-2, 2], "g_mean": [mag_lower, mag_upper]}, } if catalog.catalog_type not in cutlines: raise ValueError(f"Invalid catalog type: {catalog.catalog_type}") cutline = {**cutlines[catalog.catalog_type], **kwargs} ref_sources = catalog.data applied_kwargs = [] for key, value in cutline.items(): if key in ref_sources.colnames: applied_kwargs.append({key: [value]}) ref_sources = ref_sources[(ref_sources[key] >= value[0]) & (ref_sources[key] <= value[1])] return ref_sources, applied_kwargs @staticmethod def _load_catalog_worker(row): """Worker to load a SkyCatalog from a summary row.""" return SkyCatalog(objname=row['objname'], catalog_type=row['catalog_type'], fov_ra=row['fov_ra'], fov_dec=row['fov_dec'], catalog_version=row['catalog_version'], verbose = False)