Source code for ezphot.utils.databrowser



#%%
import inspect
from pathlib import Path
from typing import List, Union
from functools import partial
from multiprocessing import Pool
from tqdm import tqdm

from ezphot.helper import Helper

#%%
import os
# Environment switches intended to keep astropy quiet: log level limited to
# errors, and SIP-related WCS complaints suppressed. The SIP variable is not
# documented upstream (NOTE(review): confirm both are honoured by the astropy
# version in use, and that they are set before astropy is first imported).
os.environ.update(ASTROPY_LOG_LEVEL="ERROR", ASTROPY_WCS_SIP="IGNORE")


def _get_imginfo(filelist, pattern):
    """
    Collect image information for a group of files.

    Kept at module level so that it can be handed to a ``multiprocessing``
    pool. A fresh ``Helper`` is created on every call.

    Parameters
    ----------
    filelist : list
        Files to inspect; forwarded unchanged to ``Helper.get_imginfo``.
    pattern : str
        Filename pattern, forwarded as the ``pattern`` keyword.

    Returns
    -------
    imginfo
        Whatever ``Helper.get_imginfo`` returns for this group.
    """
    return Helper().get_imginfo(filelist, pattern=pattern)

def _load_image(cls, path):
    """
    Build ``cls(path)``, reporting instead of raising on failure.

    Parameters
    ----------
    cls : callable
        Class (or any callable) invoked with ``path`` as its only argument.
    path : str or Path
        Location of the file to load.

    Returns
    -------
    object or None
        The constructed object, or ``None`` when construction raised an
        ``Exception`` (a warning line is printed in that case).
    """
    loaded = None
    try:
        loaded = cls(path)
    except Exception as err:
        # Best effort: one unreadable file must not abort a batch load.
        print(f"[WARNING] Failed to load {path}: {err}")
    return loaded

class DataBrowser:
    """
    DataBrowser is a class that provides a unified interface for searching and
    loading data from the telescope data directory.

    It provides:

    1. Search for files matching the current filters and return them as
       different types of objects.

    Types of objects:

    - ``ImageSet`` of ``ScienceImage``
    - ``ImageSet`` of ``ReferenceImage``
    - ``ImageSet`` of ``CalibrationImage``
    - ``ImageSet`` of ``Background``
    - ``ImageSet`` of ``Errormap``
    - ``ImageSet`` of ``Mask``
    - ``CatalogSet`` of ``Catalog``
    """

    # Upper bound on worker processes used when loading files in parallel.
    _MAX_WORKERS = 16

    def __init__(self, foldertype: str = None):
        """
        Parameters
        ----------
        foldertype : str, optional
            One of ``'scidata'``, ``'refdata'``, ``'calibdata'``,
            ``'mcalibdata'`` or ``'obsdata'``. Any other value (including the
            default ``None``) prints a warning and leaves ``basepath`` as
            ``None``.
        """
        self.helper = Helper()
        self.foldertype = foldertype
        self.basepath = self._get_default_path()

        # Search attributes; ``None`` means "match anything" ('*').
        self.observatory = None
        self.telkey = None
        self.imgtype = None
        self.telname = None
        self.objname = None
        self.filter = None
        self.obsdate = None

    def __repr__(self):
        # ``searchpath`` raises for an unknown foldertype (ValueError) or a
        # missing basepath (TypeError); a repr must never raise, so fall back
        # to None in those cases.
        try:
            searchpath = self.searchpath
        except (ValueError, TypeError):
            searchpath = None

        txt = f"<DataBrowser(searchpath='{searchpath}', foldertype='{self.foldertype}')>\n"
        txt += "Search Attributes:\n"
        txt += f" observatory : {self.observatory or '*'}\n"
        txt += f" telkey : {self.telkey or '*'}\n"
        txt += f" imgtype : {self.imgtype or '*'}\n"
        txt += f" telname : {self.telname or '*'}\n"
        txt += f" objname : {self.objname or '*'}\n"
        txt += f" filter : {self.filter or '*'}\n"
        txt += f" obsdate : {self.obsdate or '*'}\n"
        txt += f'\n For help, use \'help(self)\' or `self.help()`.'
        return txt

    def help(self):
        """Print the public methods of this class together with their parameters."""
        # All public functions defined on the class, excluding `help` itself.
        methods = [
            (name, obj)
            for name, obj in inspect.getmembers(self.__class__, inspect.isfunction)
            if not name.startswith("_") and name != "help"
        ]

        lines = []
        for name, func in methods:
            params = [
                str(p)
                for p in inspect.signature(func).parameters.values()
                if p.name != "self"
            ]
            lines.append(f"- {name}({', '.join(params)})")

        print(
            f"Help for {self.__class__.__name__}\n\n\nPublic methods:\n"
            + "\n".join(lines)
        )

    def search(self, pattern: str = '*.fits', return_type: str = 'path'):
        """
        Search for files matching the current attributes.

        The files are looked up in ``searchpath``, which is built from the
        current filters (observatory, telkey, objname, telname, filter,
        obsdate, imgtype).

        Parameters
        ----------
        pattern : str
            Filename pattern to match (e.g., ``'*.fits'``).
        return_type : str
            ``'path'``, ``'imginfo'``, ``'science'``, ``'reference'``,
            ``'calibration'``, ``'background'``, ``'errormap'``, ``'mask'`` or
            ``'catalog'``.

        Returns
        -------
        output : list, Table, ImageSet, CatalogSet
            See :meth:`search_folder`.

        Raises
        ------
        ValueError
            If ``foldertype`` is unknown or ``return_type`` is invalid.
        """
        return self.search_folder(pattern=pattern, folder=self.searchpath, return_type=return_type)

    def search_folder(self, pattern: str, folder: str, return_type: str = 'path'):
        """
        Search ``folder`` for files matching ``pattern`` and return them as
        paths or as loaded objects.

        Parameters
        ----------
        pattern : str
            Filename pattern to match (e.g., ``'*.fits'``).
        folder : str or pathlib.Path
            Folder to search in. May itself contain glob wildcards.
        return_type : str
            ``'path'``, ``'imginfo'``, ``'science'``, ``'reference'``,
            ``'calibration'``, ``'background'``, ``'errormap'``, ``'mask'`` or
            ``'catalog'``.

        Returns
        -------
        output : list, Table, ImageSet, CatalogSet
            - If ``return_type``=``'path'``, returns the list of matched file
              paths (strings, as produced by ``glob.glob``).
            - If ``return_type``=``'imginfo'``, returns the stacked imginfo
              table, or ``None`` when nothing was found.
            - If ``return_type``=``'science'``, ``'reference'``,
              ``'calibration'``, ``'mask'``, ``'background'``, ``'errormap'``,
              returns ``ImageSet``.
            - If ``return_type``=``'catalog'``, returns ``CatalogSet``.

        Raises
        ------
        ValueError
            If ``return_type`` is not one of the values listed above.
        """
        import glob

        # Accept plain strings as well as Path objects for ``folder``.
        glob_pattern = str(Path(folder) / pattern)
        matched_files = glob.glob(glob_pattern, recursive=True)
        print(f"[INFO] Found {len(matched_files)} files matching '{glob_pattern}'")

        if return_type == 'path':
            return matched_files
        if return_type == 'imginfo':
            return self._to_imginfo(matched_files, pattern)

        image_loaders = {
            'science': self._to_science_images,
            'reference': self._to_reference_images,
            'calibration': self._to_calibration_images,
            'background': self._to_background,
            'errormap': self._to_errormap,
            'mask': self._to_mask,
        }
        if return_type in image_loaders:
            # Imported lazily, as in the rest of this class.
            from ezphot.imageobjects import ImageSet
            return ImageSet(image_loaders[return_type](matched_files))
        if return_type == 'catalog':
            from ezphot.dataobjects import CatalogSet
            return CatalogSet(self._to_catalog(matched_files))

        raise ValueError(
            f"Invalid return_type: {return_type}. Choose from 'path', 'imginfo', "
            "'science', 'reference', 'calibration', 'background', 'errormap', "
            "'mask', 'catalog'."
        )

    def _get_default_path(self):
        """Return the configured base directory for ``foldertype``, or ``None`` if unknown."""
        default_path_dict = {
            "scidata": self.helper.config["SCIDATA_DIR"],
            "refdata": self.helper.config["REFDATA_DIR"],
            "calibdata": self.helper.config["CALIBDATA_DIR"],
            "mcalibdata": self.helper.config["CALIBDATA_MASTERDIR"],
            "obsdata": self.helper.config["OBSDATA_DIR"]
        }
        if self.foldertype not in default_path_dict:
            print(f"[WARNING] Unknown foldertype: {self.foldertype}. Available types: {list(default_path_dict.keys())}")
            return None
        return Path(default_path_dict[self.foldertype])

    def _to_imginfo(self, filepaths: List[Union[str, Path]], pattern: str = '*.fits'):
        """
        Collect imginfo for ``filepaths``, one task per parent directory.

        Returns the vertically stacked table, or ``None`` when there are no
        files or no non-empty tables.
        """
        from astropy.table import vstack
        from collections import defaultdict

        if not filepaths:
            return None

        # Group files by their parent directory.
        dir_to_files = defaultdict(list)
        for path in filepaths:
            path = Path(path)
            dir_to_files[path.parent].append(path)
        file_groups = list(dir_to_files.values())

        if len(file_groups) > 1:
            workers = min(self._MAX_WORKERS, len(file_groups))
            with Pool(workers) as pool:
                # imap (rather than starmap) yields results as they complete,
                # so the progress bar advances while work is still running.
                func = partial(_get_imginfo, pattern=pattern)
                results = list(
                    tqdm(
                        pool.imap(func, file_groups),
                        total=len(file_groups),
                        desc="Collecting ImgInfo"
                    )
                )
        else:
            results = [_get_imginfo(file_groups[0], pattern)]

        tables = [tbl for tbl in results if tbl is not None and len(tbl) > 0]
        if not tables:
            return None
        return vstack(tables, metadata_conflicts='silent')

    def _load_objects(self, cls, filepaths, desc):
        """Construct ``cls(path)`` for every path in parallel, dropping failed loads."""
        if not filepaths:
            # Nothing to do; avoid spawning a process pool.
            return []
        workers = min(self._MAX_WORKERS, len(filepaths))
        with Pool(workers) as pool:
            func = partial(_load_image, cls)
            loaded = list(tqdm(pool.imap(func, filepaths), total=len(filepaths), desc=desc))
        return [obj for obj in loaded if obj is not None]

    def _to_science_images(self, filepaths: List[Union[str, Path]]):
        """Load ``filepaths`` as ``ScienceImage`` objects."""
        from ezphot.imageobjects import ScienceImage
        return self._load_objects(ScienceImage, filepaths, "Loading Science Images")

    def _to_reference_images(self, filepaths: List[Union[str, Path]]):
        """Load ``filepaths`` as ``ReferenceImage`` objects."""
        from ezphot.imageobjects import ReferenceImage
        return self._load_objects(ReferenceImage, filepaths, "Loading Reference Images")

    def _to_calibration_images(self, filepaths: List[Union[str, Path]]):
        """Load ``filepaths`` as ``CalibrationImage`` objects."""
        from ezphot.imageobjects import CalibrationImage
        return self._load_objects(CalibrationImage, filepaths, "Loading Calibration Images")

    def _to_background(self, filepaths: List[Union[str, Path]]):
        """Load ``filepaths`` as ``Background`` objects."""
        from ezphot.imageobjects import Background
        return self._load_objects(Background, filepaths, "Loading Backgrounds")

    def _to_errormap(self, filepaths: List[Union[str, Path]]):
        """Load ``filepaths`` as ``Errormap`` objects."""
        from ezphot.imageobjects import Errormap
        return self._load_objects(Errormap, filepaths, "Loading Errormaps")

    def _to_mask(self, filepaths: List[Union[str, Path]]):
        """Load ``filepaths`` as ``Mask`` objects."""
        from ezphot.imageobjects import Mask
        return self._load_objects(Mask, filepaths, "Loading Masks")

    def _to_catalog(self, filepaths: List[Union[str, Path]]):
        """Load ``filepaths`` as ``Catalog`` objects."""
        from ezphot.dataobjects import Catalog
        return self._load_objects(Catalog, filepaths, "Loading Catalogs")

    @property
    def searchpath(self):
        """
        Get the search path of the current filters.

        Unset filters are replaced by the wildcard ``'*'``.

        Returns
        -------
        pathlib.Path
            The search path.

        Raises
        ------
        ValueError
            If ``foldertype`` is not a known folder type.
        """
        if self.foldertype == 'scidata' or self.foldertype == 'refdata':
            path = self.basepath / (self.observatory or '*') / (self.telkey or '*') / (self.objname or '*') / (self.telname or '*') / (self.filter or '*')
        elif self.foldertype == 'calibdata':
            path = self.basepath / (self.observatory or '*') / (self.telkey or '*') / (self.imgtype or '*') / (self.telname or '*')
        elif self.foldertype == 'mcalibdata':
            path = self.basepath / '*' / (self.observatory or '*') / (self.telkey or '*') / (self.telname or '*') / (self.imgtype or '*')
        elif self.foldertype == 'obsdata':
            path = self.basepath / (self.observatory or '*') / (self.telname or '*') / (self.obsdate or '*')
        else:
            raise ValueError(f"Unknown foldertype: {self.foldertype}")
        return path

    @staticmethod
    def _subdirs(parent, name=None):
        """Yield sub-directories of ``parent``; only the one called ``name`` when given."""
        for child in parent.iterdir():
            if not child.is_dir():
                continue
            if name and child.name != name:
                continue
            yield child

    @property
    def keys(self) -> dict:
        """
        Get the keys of the current filters.

        Walks the directory tree below ``basepath`` and collects the directory
        names found at each level. Errors raised during the walk are reported
        with a warning and whatever was collected so far is returned.

        Returns
        -------
        dict
            Dictionary mapping ``'observatory'``, ``'telkey'``, ``'objname'``,
            ``'telname'``, ``'filter'``, ``'imgtype'`` and ``'obsdate'`` to a
            set of names. All sets are empty when ``basepath`` is unset or
            does not exist.
        """
        base = self.basepath
        result = {
            'observatory': set(),
            'telkey': set(),
            'objname': set(),
            'telname': set(),
            'filter': set(),
            'imgtype': set(),
            'obsdate': set(),
        }
        # basepath is None for an unknown foldertype.
        if base is None or not base.exists():
            return result

        sub = self._subdirs
        try:
            if self.foldertype in ('scidata', 'refdata'):
                # A name is recorded only if the whole chain below it matches
                # the active filters down to the filter directory.
                for obs in sub(base, self.observatory):
                    obs_hit = False
                    for telkey in sub(obs, self.telkey):
                        telkey_hit = False
                        for obj in sub(telkey, self.objname):
                            obj_hit = False
                            for tel in sub(obj, self.telname):
                                tel_hit = False
                                for filt in sub(tel, self.filter):
                                    result['filter'].add(filt.name)
                                    tel_hit = True
                                if tel_hit:
                                    result['telname'].add(tel.name)
                                    obj_hit = True
                            if obj_hit:
                                result['objname'].add(obj.name)
                                telkey_hit = True
                        if telkey_hit:
                            result['telkey'].add(telkey.name)
                            obs_hit = True
                    if obs_hit:
                        result['observatory'].add(obs.name)

            elif self.foldertype == 'calibdata':
                for obs in sub(base, self.observatory):
                    result['observatory'].add(obs.name)
                    # Filter telkey directories by name instead of descending
                    # into obs/<telkey>, which would shift every level by one.
                    for telkey in sub(obs, self.telkey):
                        result['telkey'].add(telkey.name)
                        for imgtype_dir in sub(telkey, self.imgtype):
                            result['imgtype'].add(imgtype_dir.name)
                            for telname_dir in sub(imgtype_dir):
                                result['telname'].add(telname_dir.name)

            elif self.foldertype == 'mcalibdata':
                for user in sub(base):
                    for obs in sub(user, self.observatory):
                        result['observatory'].add(obs.name)
                        for telkey in sub(obs):
                            result['telkey'].add(telkey.name)
                            for tel in sub(telkey):
                                result['telname'].add(tel.name)
                                for imgtype in sub(tel):
                                    result['imgtype'].add(imgtype.name)

            elif self.foldertype == 'obsdata':
                for obs in sub(base, self.observatory):
                    result['observatory'].add(obs.name)
                    for tel in sub(obs):
                        result['telname'].add(tel.name)
                        for obsdate in sub(tel):
                            result['obsdate'].add(obsdate.name)

        except Exception as e:
            print(f"[WARNING] list_available failed: {e}")

        return result