"""Light-curve containers: :class:`LightCurve` and :class:`MultiBandLightCurve`.
These are the universal inputs to every periodogram. A :class:`LightCurve` holds the
finite-or-not time/value/error arrays of a single band; a :class:`MultiBandLightCurve`
groups several bands (filters) of the *same* star for the multi-band methods. Both
ingest from raw arrays, a pandas ``DataFrame``, an astropy/pyarrow table, or a file
(CSV/ECSV/FITS/Parquet) with frictionless column mapping (:class:`ColumnMap`).
The time origin is deliberately *not* shifted here — periodograms are invariant to it
and each method references times to their own minimum internally for numerical
stability (the large-absolute-JD pitfall), so the container preserves the input times
verbatim.
"""
from __future__ import annotations
from collections.abc import Callable, Mapping
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any
import numpy as np
from cuperiod.core._typing import FloatArray
from cuperiod.core.columns import ColumnMap, Domain
from cuperiod.core.errors import ColumnResolutionError
if TYPE_CHECKING:
import pandas as pd
#: Factor ``2.5 / ln(10)`` (about 1.0857) that turns a fractional flux error into
#: a magnitude error: with ``mag = -2.5 log10(flux)``,
#: ``|d(mag)| = (2.5 / ln 10) * |d(flux) / flux|``.
_MAG_PER_DEX: float = 2.5 / np.log(10.0)
def _as_f64(a: object, name: str) -> FloatArray:
    """Coerce ``a`` to a C-contiguous, 1-D float64 array.

    Parameters
    ----------
    a : array-like
        Input converted with ``numpy.asarray(..., dtype=float64)``.
    name : str
        Label used in the error message (e.g. ``"time"``).

    Returns
    -------
    numpy.ndarray
        Contiguous float64 array with exactly one dimension.

    Raises
    ------
    ValueError
        If the converted input is not exactly 1-D (0-d scalars included).
    """
    arr = np.asarray(a, dtype=np.float64)
    # Check the rank *before* ``ascontiguousarray``: that function always returns
    # an array with ndim >= 1, so a 0-d scalar would be promoted to shape (1,)
    # and silently pass the 1-D check.
    if arr.ndim != 1:
        raise ValueError(f"{name} must be 1-D, got shape {arr.shape}")
    return np.ascontiguousarray(arr)
[docs]
@dataclass(frozen=True, eq=False)
class LightCurve:
    """A single band's time series.

    Parameters
    ----------
    time : numpy.ndarray
        Observation times (JD/HJD/BJD), float64. Units are days throughout cuPeriod.
    value : numpy.ndarray
        Brightness in magnitudes or flux (see ``domain``).
    error : numpy.ndarray, optional
        1-sigma uncertainty on ``value``. ``None`` means unweighted.
    domain : Domain, default :attr:`Domain.MAGNITUDE`
        Whether ``value`` is a magnitude or a flux.
    meta : Mapping, optional
        Free-form metadata (e.g. object id, band name) carried through results.

    Raises
    ------
    ValueError
        If an array is not 1-D, or if ``value`` or ``error`` differs in length
        from ``time``.

    Notes
    -----
    Arrays are coerced to contiguous float64 and length-checked on construction.
    Non-finite points are *not* removed here; call :meth:`finite` for that.
    The dataclass is frozen and declared with ``eq=False``, so no field-wise
    ``__eq__`` is generated.
    """

    time: FloatArray
    value: FloatArray
    error: FloatArray | None = None
    domain: Domain = Domain.MAGNITUDE
    meta: Mapping[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Coerce the arrays to float64, check their lengths, normalise ``domain``."""
        time = _as_f64(self.time, "time")
        value = _as_f64(self.value, "value")
        if time.size != value.size:
            raise ValueError(
                f"time and value length mismatch: {time.size} != {value.size}"
            )
        error = None if self.error is None else _as_f64(self.error, "error")
        if error is not None and error.size != time.size:
            raise ValueError(
                f"error and time length mismatch: {error.size} != {time.size}"
            )
        # The dataclass is frozen, so the coerced values are written back through
        # ``object.__setattr__``.
        object.__setattr__(self, "time", time)
        object.__setattr__(self, "value", value)
        object.__setattr__(self, "error", error)
        # Coerce a string/aliased domain to the enum so identity checks in the
        # flux/magnitude conversions are reliable.
        object.__setattr__(self, "domain", Domain(self.domain))

    # -- properties --------------------------------------------------------------

    @property
    def n(self) -> int:
        """Number of points."""
        return int(self.time.size)

    @property
    def baseline(self) -> float:
        """Time span ``max(time) - min(time)`` in days (0.0 if empty).

        Non-finite times are not filtered here, so they propagate into the result.
        """
        if self.time.size == 0:
            return 0.0
        return float(self.time.max() - self.time.min())

    # -- constructors ------------------------------------------------------------

    @classmethod
    def from_arrays(
        cls,
        time: object,
        value: object,
        error: object | None = None,
        *,
        domain: Domain = Domain.MAGNITUDE,
        meta: Mapping[str, Any] | None = None,
    ) -> LightCurve:
        """Build from raw array-likes.

        Each array-like is converted to a 1-D float64 array; ``meta`` is copied
        into a fresh ``dict`` (``None`` becomes an empty one).

        Raises
        ------
        ValueError
            If an input is not 1-D or the lengths disagree.
        """
        return cls(
            time=_as_f64(time, "time"),
            value=_as_f64(value, "value"),
            error=None if error is None else _as_f64(error, "error"),
            domain=domain,
            meta=dict(meta or {}),
        )

    @classmethod
    def from_dataframe(
        cls,
        df: pd.DataFrame,
        *,
        columns: ColumnMap | None = None,
        domain: Domain | None = None,
        meta: Mapping[str, Any] | None = None,
    ) -> LightCurve:
        """Build from a pandas ``DataFrame`` with column auto-detection.

        Parameters
        ----------
        df : pandas.DataFrame
            Table holding the time/value (and optionally error) columns.
        columns : ColumnMap, optional
            Column mapping; a default ``ColumnMap()`` is used when omitted.
        domain : Domain, optional
            Passed to the column resolver as its ``domain`` argument.
        meta : Mapping, optional
            Metadata copied onto the result.
        """
        names, getter = _adapt_table(df)
        return cls._from_table(names, getter, columns, domain, meta)

    @classmethod
    def from_file(
        cls,
        path: str | Path,
        *,
        columns: ColumnMap | None = None,
        domain: Domain | None = None,
        meta: Mapping[str, Any] | None = None,
    ) -> LightCurve:
        """Build from a CSV/ECSV/FITS/Parquet file with column auto-detection.

        The result's ``meta`` gets a ``"source"`` entry holding ``str(path)``;
        a ``"source"`` key in the caller's ``meta`` takes precedence.
        """
        names, getter = _read_table_file(Path(path))
        # Caller-supplied metadata is unpacked last so it can override "source".
        base_meta = {"source": str(path), **dict(meta or {})}
        return cls._from_table(names, getter, columns, domain, base_meta)

    @classmethod
    def from_fits(
        cls,
        path: str | Path,
        *,
        hdu: int | str = 1,
        columns: ColumnMap | None = None,
        domain: Domain | None = None,
        meta: Mapping[str, Any] | None = None,
    ) -> LightCurve:
        """Build from a binary-table FITS file (``hdu`` selects the extension).

        Like :meth:`from_file`, records ``str(path)`` under ``meta["source"]``
        unless the caller's ``meta`` already provides that key.
        """
        # Imported lazily so astropy is only required when FITS input is used.
        from astropy.table import Table

        table = Table.read(path, hdu=hdu)
        names, getter = _adapt_table(table)
        base_meta = {"source": str(path), **dict(meta or {})}
        return cls._from_table(names, getter, columns, domain, base_meta)

    @classmethod
    def _from_table(
        cls,
        names: list[str],
        getter: Callable[[str], FloatArray],
        columns: ColumnMap | None,
        domain: Domain | None,
        meta: Mapping[str, Any] | None,
    ) -> LightCurve:
        """Resolve the columns against ``names`` and pull them through ``getter``.

        ``names`` and ``getter`` are the pair returned by the table adapters;
        the resolved mapping supplies the time/value/error column names and the
        domain of the resulting light curve.
        """
        resolved = (columns or ColumnMap()).resolve(names, domain=domain)
        error = None if resolved.error is None else getter(resolved.error)
        return cls(
            time=getter(resolved.time),
            value=getter(resolved.value),
            error=error,
            domain=resolved.domain,
            meta=dict(meta or {}),
        )

    # -- transforms --------------------------------------------------------------

    def finite(self) -> LightCurve:
        """Return a copy with non-finite points (and non-positive errors) removed.

        Drops samples where time or value is NaN/inf, and — when errors are present
        — where the error is non-finite or ``<= 0`` (an unusable weight). When
        nothing needs to be dropped, ``self`` is returned unchanged.
        """
        mask = np.isfinite(self.time) & np.isfinite(self.value)
        if self.error is not None:
            mask &= np.isfinite(self.error) & (self.error > 0.0)
        if mask.all():
            return self
        return LightCurve(
            time=self.time[mask],
            value=self.value[mask],
            error=None if self.error is None else self.error[mask],
            domain=self.domain,
            meta=self.meta,
        )

    def as_flux(self, *, zeropoint: float = 0.0) -> LightCurve:
        """Convert to the flux domain (no-op if already flux).

        Uses ``flux = 10 ** (-0.4 * (mag - zeropoint))`` and propagates the error as
        ``flux_err = 0.4 ln(10) * flux * mag_err``.
        """
        if self.domain is Domain.FLUX:
            return self
        flux = np.power(10.0, -0.4 * (self.value - zeropoint))
        error = None if self.error is None else (0.4 * np.log(10.0)) * flux * self.error
        return LightCurve(self.time, flux, error, Domain.FLUX, self.meta)

    def as_magnitude(self, *, zeropoint: float = 0.0) -> LightCurve:
        """Convert to the magnitude domain (no-op if already magnitude).

        Uses ``mag = -2.5 log10(flux) + zeropoint`` and propagates the error as
        ``mag_err = (2.5 / ln 10) * flux_err / flux``. Non-positive flux has no real
        magnitude; those points become non-finite (without a spurious NumPy warning)
        and are removed by :meth:`finite`.
        """
        if self.domain is Domain.MAGNITUDE:
            return self
        # log10 of zero/negative flux and division by zero flux are expected here;
        # silence the warnings and let the non-finite results flag those points.
        with np.errstate(invalid="ignore", divide="ignore"):
            mag = -2.5 * np.log10(self.value) + zeropoint
            error = (
                None
                if self.error is None
                else _MAG_PER_DEX * self.error / self.value
            )
        return LightCurve(self.time, mag, error, Domain.MAGNITUDE, self.meta)

    def in_domain(self, domain: Domain, *, zeropoint: float = 0.0) -> LightCurve:
        """Return this light curve converted to ``domain``.

        Parameters
        ----------
        domain : Domain
            Target domain. It is normalised with ``Domain(domain)``, the same
            coercion the constructor applies to its ``domain`` field.
        zeropoint : float, default 0.0
            Forwarded to :meth:`as_flux` / :meth:`as_magnitude`.

        Raises
        ------
        ValueError
            If ``domain`` is not a valid :class:`Domain` value.
        """
        # Normalise first: without this, a string/aliased domain would fail the
        # identity test below and be silently treated as "magnitude".
        target = Domain(domain)
        if target is Domain.FLUX:
            return self.as_flux(zeropoint=zeropoint)
        return self.as_magnitude(zeropoint=zeropoint)
[docs]
@dataclass(frozen=True, eq=False)
class MultiBandLightCurve:
    """Per-filter light curves of one star, grouped for the multi-band methods.

    Parameters
    ----------
    bands : Mapping[str, LightCurve]
        Band name → light curve, iterated in the mapping's own order.
    meta : Mapping, optional
        Free-form metadata (e.g. object id) carried through results.

    Raises
    ------
    ValueError
        If ``bands`` is empty.
    """

    bands: Mapping[str, LightCurve]
    meta: Mapping[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Reject a falsy (empty) band mapping."""
        if self.bands:
            return
        raise ValueError("MultiBandLightCurve requires at least one band")

    @property
    def band_names(self) -> tuple[str, ...]:
        """The band labels, in order."""
        return tuple(label for label in self.bands)

    @property
    def n_bands(self) -> int:
        """Number of bands."""
        count = len(self.bands)
        return count

    @classmethod
    def from_light_curves(
        cls,
        bands: Mapping[str, LightCurve],
        *,
        meta: Mapping[str, Any] | None = None,
    ) -> MultiBandLightCurve:
        """Build from an explicit ``{band: LightCurve}`` mapping.

        Both ``bands`` and ``meta`` are copied into new ``dict`` objects.
        """
        band_copy = dict(bands)
        meta_copy = dict(meta) if meta else {}
        return cls(bands=band_copy, meta=meta_copy)

    @classmethod
    def from_dataframe(
        cls,
        df: pd.DataFrame,
        *,
        band_column: str | None = None,
        columns: ColumnMap | None = None,
        domain: Domain | None = None,
        meta: Mapping[str, Any] | None = None,
    ) -> MultiBandLightCurve:
        """Build from one long-format ``DataFrame`` split on a band column.

        Parameters
        ----------
        df : pandas.DataFrame
            Long-format table with a band/filter column and shared
            time/value/error columns.
        band_column : str, optional
            Band column name, used to build the column map when ``columns`` is
            not given; auto-detected from :data:`BAND_NAMES` if ``None``.
        columns, domain, meta
            As for :meth:`LightCurve.from_dataframe`.

        Raises
        ------
        ColumnResolutionError
            If no band column can be resolved.
        """
        column_names, _ = _adapt_table(df)
        mapping = columns if columns else ColumnMap(band=band_column)
        resolution = mapping.resolve(column_names, domain=domain)
        band_key = resolution.band
        if band_key is None:
            raise ColumnResolutionError(
                "could not resolve a band column; pass band_column=... "
                f"or ColumnMap(band=...). Available columns: {column_names}"
            )

        # One LightCurve per group; ``sort=False`` keeps first-appearance order.
        split: dict[str, LightCurve] = {}
        for raw_label, frame in df.groupby(band_key, sort=False):
            frame_names, frame_getter = _adapt_table(frame)
            label = str(raw_label)
            split[label] = LightCurve._from_table(
                frame_names,
                frame_getter,
                columns,
                resolution.domain,
                {"band": label},
            )
        extra = dict(meta) if meta else {}
        return cls(bands=split, meta=extra)

    def finite(self) -> MultiBandLightCurve:
        """Return a copy with each band's non-finite points removed."""
        cleaned = {}
        for label, curve in self.bands.items():
            cleaned[label] = curve.finite()
        return MultiBandLightCurve(bands=cleaned, meta=self.meta)

    def stacked(self) -> tuple[FloatArray, FloatArray, FloatArray | None, np.ndarray]:
        """Concatenate all bands into time-sorted ``(time, value, error, band)`` arrays.

        Returns
        -------
        time, value : numpy.ndarray
            Concatenated, ascending in time.
        error : numpy.ndarray or None
            Concatenated errors, or ``None`` if any band lacks errors.
        band : numpy.ndarray
            Per-point band label (object dtype), aligned with the sorted arrays.
        """
        curves = list(self.bands.items())
        time_parts = [curve.time for _, curve in curves]
        value_parts = [curve.value for _, curve in curves]
        label_parts = [np.full(curve.n, name, dtype=object) for name, curve in curves]
        error_parts = [curve.error for _, curve in curves if curve.error is not None]
        # Errors are only usable when every band supplied them.
        every_band_has_error = len(error_parts) == len(curves)

        if time_parts:
            time = np.concatenate(time_parts)
        else:
            time = np.empty(0)
        if value_parts:
            value = np.concatenate(value_parts)
        else:
            value = np.empty(0)
        if label_parts:
            band = np.concatenate(label_parts)
        else:
            band = np.empty(0, dtype=object)
        error = np.concatenate(error_parts) if every_band_has_error else None

        # Stable sort keeps the band order for points with identical times.
        order = np.argsort(time, kind="stable")
        sorted_error = error[order] if error is not None else None
        return time[order], value[order], sorted_error, band[order]
# --- table adapters ----------------------------------------------------------
def _column_array(values: object) -> FloatArray:
    """Coerce a table column to a float64 numpy array, filling masked entries.

    Parameters
    ----------
    values : array-like
        A column: a plain sequence/array, a numpy masked array (astropy masked
        columns included), or any object exposing ``filled``.

    Returns
    -------
    numpy.ndarray
        float64 array in which masked entries are NaN.
    """
    if isinstance(values, np.ma.MaskedArray):
        # Cast to float64 *before* filling: an integer masked array cannot hold
        # NaN, so ``filled(nan)`` on the original dtype fails.
        values = values.astype(np.float64).filled(np.nan)
    elif hasattr(values, "filled"):  # other masked-column-like objects
        values = values.filled(np.nan)  # type: ignore[attr-defined]
    return np.asarray(values, dtype=np.float64)
def _adapt_table(obj: Any) -> tuple[list[str], Callable[[str], FloatArray]]:
    """Return ``(column_names, getter)`` for a pandas/astropy/pyarrow/dict table.

    Parameters
    ----------
    obj : Any
        A table-like object, recognised by duck typing in this order: astropy
        ``Table`` (``colnames``), pyarrow ``Table`` (``column_names`` and
        ``column``), pandas ``DataFrame`` (``iloc`` and ``columns``), or a plain
        mapping of arrays.

    Returns
    -------
    names : list
        The table's column names (stringified for pandas).
    getter : Callable[[str], numpy.ndarray]
        Maps a column name to that column as a float64 array.

    Raises
    ------
    TypeError
        If ``obj`` matches none of the supported table kinds.
    """
    # astropy Table (checked before pandas: Tables also expose .loc/.columns).
    if hasattr(obj, "colnames"):
        return list(obj.colnames), lambda name: _column_array(obj[name])
    # pyarrow Table.
    if hasattr(obj, "column_names") and hasattr(obj, "column"):
        names = list(obj.column_names)
        return names, lambda name: _column_array(
            obj.column(name).to_numpy(zero_copy_only=False)
        )
    # pandas DataFrame (reached only after the astropy/pyarrow checks above).
    if hasattr(obj, "iloc") and hasattr(obj, "columns"):
        cols = [str(c) for c in obj.columns]
        # The advertised names are stringified, but the frame must be indexed
        # with the original labels (e.g. integer column labels), so keep a
        # reverse lookup; on a string collision the first label wins.
        original_label: dict[str, Any] = {}
        for label in obj.columns:
            original_label.setdefault(str(label), label)
        return cols, lambda name: _column_array(
            obj[original_label.get(name, name)].to_numpy()
        )
    # plain mapping of arrays.
    if isinstance(obj, Mapping):
        return list(obj.keys()), lambda name: _column_array(obj[name])
    raise TypeError(f"unsupported table type: {type(obj)!r}")
def _read_table_file(path: Path) -> tuple[list[str], Callable[[str], FloatArray]]:
    """Load a tabular file, chosen by its suffix, and adapt it to ``(names, getter)``.

    Parquet (``.parquet``/``.pq``) goes through pyarrow; everything else is read
    with astropy's ``Table.read``: FITS suffixes with format auto-detection, the
    known text suffixes with an explicit ASCII format, and any other suffix with
    the generic ``"ascii"`` reader.
    """
    ext = path.suffix.lower()

    if ext in (".parquet", ".pq"):
        # Lazy import: pyarrow is only needed for Parquet input.
        import pyarrow.parquet as pq

        return _adapt_table(pq.read_table(path))

    # Lazy import: astropy is only needed for the non-Parquet readers.
    from astropy.table import Table

    if ext in (".fits", ".fit", ".fz"):
        table = Table.read(path)
    else:
        ascii_format_by_suffix = {
            ".csv": "ascii.csv",
            ".ecsv": "ascii.ecsv",
            ".tsv": "ascii.tab",
            ".tab": "ascii.tab",
        }
        # .dat/.txt and unknowns: let astropy guess the ASCII flavor.
        table = Table.read(path, format=ascii_format_by_suffix.get(ext, "ascii"))
    return _adapt_table(table)
# Public API of this module: only the two container classes are exported.
__all__ = ["LightCurve", "MultiBandLightCurve"]