# Source code for ezphot.utils.sdtdataquerier

#%%
from tqdm import tqdm
import inspect
import os
import glob
import subprocess
from multiprocessing import Pool

from ezphot.helper import Helper
#%%

class SDTDataQuerier:
    """
    Sync 7DT data from a source directory tree to a destination directory tree.

    All directory roots are read from ``self.helper.config`` using the keys
    ``SDTDATA_OBSSOURCEDIR``, ``SDTDATA_OBSDESTDIR``, ``SDTDATA_SCISOURCEDIR``
    and ``SDTDATA_SCIDESTDIR``.

    This class provides

    1. Syncing observational data (``sync_obsdata``)
    2. Syncing calibrated data (``sync_scidata``)
    3. Showing the list of files in the source and destination directories
    4. Showing the list of folders in the source and destination directories
    """

    def __init__(self, ccd: str = 'C361K'):
        """
        Parameters
        ----------
        ccd : str
            CCD identifier stored on the instance as ``self.ccd``.
        """
        self.helper = Helper()
        self.folders = []
        self.ccd = ccd

    def __repr__(self):
        return f"SDTDataQuerier(ccd={self.ccd})\n For help, use 'help(self)' or `self.help()`."

    def help(self):
        """Print every public method of the class together with its signature."""
        lines = []
        for name, func in inspect.getmembers(self.__class__, inspect.isfunction):
            # Skip private/dunder methods and this method itself.
            if name.startswith("_") or name == "help":
                continue
            params = [
                str(p)
                for p in inspect.signature(func).parameters.values()
                if p.name != "self"
            ]
            lines.append(f"- {name}({', '.join(params)})")

        help_text = ""
        print(f"Help for {self.__class__.__name__}\n{help_text}\n\nPublic methods:\n" + "\n".join(lines))

    # ------------------------------------------------------------------
    # Syncing
    # ------------------------------------------------------------------
    def sync_obsdata(self, foldername: str, file_pattern: str = '*.fits', ignore_exists: bool = True):
        """
        Sync all files matching ``file_pattern`` from ``foldername`` in the
        observation source directory to the observation destination directory.

        One rsync process is started per telescope directory that contains
        ``foldername`` (n_processes = len(telescope_ids)). The destination
        root is ``helper.config['SDTDATA_OBSDESTDIR']``.

        Parameters
        ----------
        foldername : str
            The foldername to sync.
        file_pattern : str
            The file pattern to sync.
        ignore_exists : bool
            If True, ignore the files that already exist in the destination directory.
        """
        # Step 1: Get telescope directories
        source_pattern = os.path.join(self.helper.config['SDTDATA_OBSSOURCEDIR'], "7DT??", foldername)
        telescope_dirs = glob.glob(source_pattern)

        # Step 2: Extract telescope IDs (the parent directory of ``foldername``)
        telescope_ids = [os.path.basename(os.path.dirname(os.path.normpath(t))) for t in telescope_dirs]
        if not telescope_ids:
            print("No telescope folders found.")
            # Pool(processes=0) raises ValueError, so stop here.
            return

        # Step 3: Run one rsync per telescope in parallel
        with Pool(processes=len(telescope_ids)) as pool:
            pool.starmap(
                self._run_obsrsync,
                [(tid, foldername, file_pattern, ignore_exists) for tid in telescope_ids],
            )

    def sync_scidata(self, targetname: str, file_pattern: str = '*.fits', ignore_exists: bool = True):
        """
        Sync all files matching ``file_pattern`` for ``targetname`` from the
        science source directory to the science destination directory.

        One rsync process is started per telescope directory found under the
        target (n_processes = len(telescope_ids)). The destination root is
        ``helper.config['SDTDATA_SCIDESTDIR']``.

        Parameters
        ----------
        targetname : str
            The target name to sync.
        file_pattern : str
            The file pattern to sync.
        ignore_exists : bool
            If True, ignore the files that already exist in the destination directory.
        """
        # Step 1: Get telescope directories
        source_pattern = os.path.join(self.helper.config['SDTDATA_SCISOURCEDIR'], targetname, "7DT??")
        telescope_dirs = glob.glob(source_pattern)

        # Step 2: Extract telescope IDs (the last path component)
        telescope_ids = [os.path.basename(os.path.normpath(t)) for t in telescope_dirs]
        if not telescope_ids:
            print("No telescope folders found.")
            # Pool(processes=0) raises ValueError, so stop here.
            return

        # Step 3: Run one rsync per telescope in parallel
        with Pool(processes=len(telescope_ids)) as pool:
            pool.starmap(
                self._run_scirsync,
                [(tid, targetname, file_pattern, ignore_exists) for tid in telescope_ids],
            )

    # ------------------------------------------------------------------
    # Observation data / folders
    # ------------------------------------------------------------------
    def show_obssourcedata(self, foldername: str, show_only_numbers: bool = False, pattern: str = '*.fits'):
        """
        Show the number or list of files matching ``pattern`` in ``foldername``
        across all 7DT?? telescope directories of the observation source.

        Parameters
        ----------
        foldername : str
            Subfolder name inside each 7DT?? directory.
        show_only_numbers : bool
            If True, return only the number of matched files per telescope.
        pattern : str
            Glob pattern to match files (default: ``'*.fits'``).

        Returns
        -------
        fits_counts : dict
            Dictionary of {telescope_id: count or sorted list of file paths},
            sorted by telescope ID. Telescopes without ``foldername`` are omitted.
        """
        fits_counts = {}

        # Find all 7DT?? telescope directories
        telescope_dirs = glob.glob(os.path.join(self.helper.config['SDTDATA_OBSSOURCEDIR'], "7DT??"))
        print(f"Found {len(telescope_dirs)} telescope directories.")

        for telescope_dir in tqdm(telescope_dirs, desc="Searching telescopes..."):
            folder_path = os.path.join(telescope_dir, foldername)
            if not os.path.isdir(folder_path):
                continue
            fits_files = glob.glob(os.path.join(folder_path, pattern))
            telescope_id = os.path.basename(telescope_dir)
            fits_counts[telescope_id] = len(fits_files) if show_only_numbers else sorted(fits_files)

        sorted_fits_counts = {tid: fits_counts[tid] for tid in sorted(fits_counts)}
        if not sorted_fits_counts:
            print("No matched folders found.")
        else:
            print(f"Total files found: {self._total_files(sorted_fits_counts)}")
        return sorted_fits_counts

    def show_obsdestdata(self, foldername: str, show_only_numbers: bool = False, pattern: str = '*.fits'):
        """
        Show the number or list of files matching ``pattern`` in ``foldername``
        across all 7DT?? telescope directories of the observation destination.

        Parameters
        ----------
        foldername : str
            Subfolder name inside each 7DT?? directory.
        show_only_numbers : bool
            If True, only show counts of matched files.
        pattern : str
            Pattern to match files (e.g., '*.fits', 'calib*.fits').

        Returns
        -------
        dict
            Dictionary keyed by telescope ID, containing counts or sorted file lists.
        """
        fits_counts = {}
        telescope_dirs = glob.glob(os.path.join(self.helper.config['SDTDATA_OBSDESTDIR'], '7DT??', foldername))

        for telescope_dir in telescope_dirs:
            # The telescope ID is the parent directory of ``foldername``.
            telescope_id = os.path.basename(os.path.dirname(telescope_dir))
            fits_files = glob.glob(os.path.join(telescope_dir, pattern))
            fits_counts[telescope_id] = len(fits_files) if show_only_numbers else sorted(fits_files)

        sorted_fits_counts = {tid: fits_counts[tid] for tid in sorted(fits_counts)}
        if not sorted_fits_counts:
            print("No matched folders found.")
        else:
            print(sorted_fits_counts)
        return sorted_fits_counts

    def show_obssourcefolder(self, folder_key: str = '*'):
        """
        List the subfolder names found under the 7DT?? directories of the
        observation source directory.

        Parameters
        ----------
        folder_key : str
            Key to filter folder names. Any ``*`` is removed and the remainder
            is matched as a plain substring.

        Returns
        -------
        list
            Sorted list of unique folder names containing the key.
        """
        root = self.helper.config['SDTDATA_OBSSOURCEDIR']
        folder_key = '*' if folder_key is None else folder_key
        print("Source directory:", os.path.join(root, "7DT??", folder_key))
        return self._match_obs_folders(root, folder_key)

    def show_obsdestfolder(self, folder_key: str = '*'):
        """
        List the subfolder names found under the 7DT?? directories of the
        observation destination directory.

        Parameters
        ----------
        folder_key : str
            Key to filter folder names. Any ``*`` is removed and the remainder
            is matched as a plain substring.

        Returns
        -------
        list
            Sorted list of unique folder names containing the key.
        """
        root = self.helper.config['SDTDATA_OBSDESTDIR']
        folder_key = '*' if folder_key is None else folder_key
        print("Destination directory:", os.path.join(root, "7DT??", folder_key))
        return self._match_obs_folders(root, folder_key)

    # ------------------------------------------------------------------
    # Science data / folders
    # ------------------------------------------------------------------
    def show_scisourcedata(self,
                           targetname: str,
                           show_only_numbers: bool = False,
                           key: str = 'filter',  # 'filter' or 'telescope'
                           file_pattern: str = '*.fits'  # e.g., '*.fits', 'calib*.fits'
                           ):
        """
        Show the number or list of files matching ``file_pattern`` for a
        target in the science source directory.

        Parameters
        ----------
        targetname : str
            Target name under SDTDATA_SCISOURCEDIR.
        show_only_numbers : bool
            If True, only show counts instead of filenames.
        key : str
            'filter' (default) to group by the folders directly under each
            telescope directory, or 'telescope' to group only by telescope.
        file_pattern : str
            File pattern to match, e.g., '*.fits', 'calib*.fits', '*stack*.fits'.

        Returns
        -------
        dict or None
            Dictionary of counts or file lists keyed by group name, or None
            when no directory matched.

        Raises
        ------
        ValueError
            If ``key`` is neither 'filter' nor 'telescope'.
        """
        mode = key.lower()
        dirs = self._sci_directories(self.helper.config['SDTDATA_SCISOURCEDIR'], targetname, mode)
        print(f'# of directories: {len(dirs)}')

        sorted_fits_counts = self._group_fits_files(
            tqdm(dirs, desc="Searching directories..."), mode, file_pattern, show_only_numbers
        )
        if not sorted_fits_counts:
            print("No matched targets found.")
            return None
        print(f"Total files found: {self._total_files(sorted_fits_counts)}")
        return sorted_fits_counts

    def show_scidestdata(self,
                         targetname: str,
                         show_only_numbers: bool = False,
                         key: str = 'filter',  # 'filter' or 'telescope'
                         pattern: str = '*.fits'  # e.g., '*.fits', 'calib*.fits'
                         ):
        """
        Show the number or list of files matching ``pattern`` for a target in
        the science destination directory.

        Parameters
        ----------
        targetname : str
            Target name under SDTDATA_SCIDESTDIR.
        show_only_numbers : bool
            If True, only show counts instead of filenames.
        key : str
            'filter' (default) to group by the folders directly under each
            telescope directory, or 'telescope' to group only by telescope.
        pattern : str
            File pattern to match, e.g., '*.fits', 'calib*.fits'.

        Returns
        -------
        dict or None
            Dictionary of counts or file lists keyed by group name, or None
            when no directory matched.

        Raises
        ------
        ValueError
            If ``key`` is neither 'filter' nor 'telescope'.
        """
        mode = key.lower()
        dirs = self._sci_directories(self.helper.config['SDTDATA_SCIDESTDIR'], targetname, mode)

        sorted_fits_counts = self._group_fits_files(dirs, mode, pattern, show_only_numbers)
        if not sorted_fits_counts:
            print("No matched folders found.")
            return None
        print(f"Total files found: {self._total_files(sorted_fits_counts)}")
        return sorted_fits_counts

    def show_scisourcefolder(self, folder_key: str = '*'):
        """
        List the entries of the science source directory whose name contains
        ``folder_key``.

        Parameters
        ----------
        folder_key : str
            The folder key to show. Any ``*`` is removed and the remainder is
            matched as a plain substring.

        Returns
        -------
        list of folder names: list
            Sorted list of unique names that match the folder key.
        """
        root = self.helper.config['SDTDATA_SCISOURCEDIR']
        folder_key = '*' if folder_key is None else folder_key
        print("Source directory:", os.path.join(root, folder_key))

        needle = folder_key.replace("*", "")
        all_matched_folders = {target for target in os.listdir(root) if needle in target}
        print(f"{len(all_matched_folders)} folders found.")
        return sorted(all_matched_folders)

    def show_scidestfolder(self, folder_key: str = '*'):
        """
        List the entries of the science destination directory whose name
        contains ``folder_key``.

        Parameters
        ----------
        folder_key : str
            The folder key to show. Any ``*`` is removed and the remainder is
            matched as a plain substring.

        Returns
        -------
        list of folder names: list
            Sorted list of unique names that match the folder key.
        """
        root = self.helper.config['SDTDATA_SCIDESTDIR']
        folder_key = '*' if folder_key is None else folder_key
        print("Destination directory:", os.path.join(root, folder_key))

        needle = folder_key.replace("*", "")
        all_matched_folders = {target for target in os.listdir(root) if needle in target}
        return sorted(all_matched_folders)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _total_files(counts):
        """Sum the per-key totals of a {key: count-or-list} dictionary."""
        return sum(len(v) if isinstance(v, list) else v for v in counts.values())

    @staticmethod
    def _match_obs_folders(root, folder_key):
        """Return sorted unique subfolder names under ``root``/7DT?? containing the key."""
        needle = folder_key.replace("*", "")
        folders = set()
        with os.scandir(root) as entries:
            for entry in entries:
                # Telescope directories are named "7DT" plus two characters.
                if not (entry.is_dir() and entry.name.startswith("7DT") and len(entry.name) == 5):
                    continue
                with os.scandir(entry.path) as subentries:
                    folders.update(sub.name for sub in subentries if sub.is_dir())

        matched_folders = [folder for folder in sorted(folders) if needle in folder]
        if not matched_folders:
            print("No matched folders found.")
        else:
            print(f"{len(matched_folders)} folders found.")
        return matched_folders

    @staticmethod
    def _sci_directories(root, targetname, mode):
        """Glob the directories to search for a target, per grouping mode."""
        if mode == 'filter':
            return glob.glob(os.path.join(root, targetname, "7DT??", '*'))
        if mode == 'telescope':
            return glob.glob(os.path.join(root, targetname, "7DT??"))
        raise ValueError("Invalid key. Must be 'filter' or 'telescope'.")

    @staticmethod
    def _group_fits_files(directories, mode, pattern, show_only_numbers):
        """Group matched files by directory basename; return a key-sorted dict."""
        fits_counts = {}
        for directory in directories:
            id_ = os.path.basename(directory)
            if mode == 'filter':
                fits_files = glob.glob(os.path.join(directory, pattern))
            else:
                # Telescope mode: look one level down, inside every subfolder.
                fits_files = glob.glob(os.path.join(directory, "*", pattern))

            # The same basename can occur under several telescopes, so accumulate.
            if show_only_numbers:
                fits_counts[id_] = fits_counts.get(id_, 0) + len(fits_files)
            else:
                fits_counts.setdefault(id_, []).extend(sorted(fits_files))
        return {id_: fits_counts[id_] for id_ in sorted(fits_counts)}

    @staticmethod
    def _build_rsync_command(src_folder, dest_folder, file_pattern, ignore_exists):
        """Build the rsync argv that copies only files matching ``file_pattern``."""
        # Normalize to list (support multiple patterns if needed)
        includes = [file_pattern] if isinstance(file_pattern, str) else list(file_pattern or [])

        cmd = [
            "rsync", "-av",
            "--info=progress2",
            "--no-inc-recursive",   # lowers memory on big trees
            "--prune-empty-dirs",   # don't create empty dirs at dest
        ]
        # Append the flag only when requested: an empty-string argument would
        # be read by rsync as an (empty) source path.
        if ignore_exists:
            cmd.append("--ignore-existing")
        cmd += ["--include", "*/"]  # allow directory traversal
        for pat in includes:
            cmd += ["--include", pat]
        # IMPORTANT: exclude everything else
        cmd += ["--exclude", "*", src_folder + "/", dest_folder + "/"]
        return cmd

    def _run_rsync(self, src_folder, dest_folder, file_pattern, ignore_exists):
        """Run rsync from ``src_folder`` to ``dest_folder``; return the destination or None."""
        if not os.path.exists(src_folder):
            print(f"Source folder does not exist: {src_folder}")
            return None

        os.makedirs(dest_folder, exist_ok=True)
        cmd = self._build_rsync_command(src_folder, dest_folder, file_pattern, ignore_exists)
        print("Running:", " ".join(cmd))
        # check=True: a non-zero rsync exit status raises CalledProcessError.
        subprocess.run(cmd, check=True)
        return dest_folder

    def _run_obsrsync(self, telescope_id, foldername, file_pattern='*com.fits', ignore_exists=True):
        """
        Copy ONLY files matching file_pattern from the observation source
        folder of one telescope to its destination folder (recursively).

        Returns the destination folder, or None if the source does not exist.
        """
        src_folder = os.path.join(self.helper.config['SDTDATA_OBSSOURCEDIR'], telescope_id, foldername)
        dest_folder = os.path.join(self.helper.config['SDTDATA_OBSDESTDIR'], telescope_id, foldername)
        return self._run_rsync(src_folder, dest_folder, file_pattern, ignore_exists)

    def _run_scirsync(self, telescope_id, targetname, file_pattern='*com.fits', ignore_exists=True):
        """
        Copy ONLY files matching file_pattern from the science source folder
        of one telescope to its destination folder (recursively).

        Returns the destination folder, or None if the source does not exist.
        """
        src_folder = os.path.join(self.helper.config['SDTDATA_SCISOURCEDIR'], targetname, telescope_id)
        dest_folder = os.path.join(self.helper.config['SDTDATA_SCIDESTDIR'], targetname, telescope_id)
        return self._run_rsync(src_folder, dest_folder, file_pattern, ignore_exists)