Source code for src.importer.transform
"""Apple Health DataFrame transformation pipeline.
Provides the public function `transform`, which applies the necessary
transformations to raw Apple Health export data.
The DataFrame-level functions in this module accept a ``df`` argument that
refers to the same underlying :class:`~pandas.DataFrame` and mutate it
in-place (:func:`_timestamps_to_unix` is the exception: it takes a Series and
returns a new one). No full copies of the frame are created, keeping peak RAM
proportional to the export size rather than a multiple of it.
"""
import logging
import pandas as pd
from ..model import CATEGORICAL_IDENTIFIER_MAPS
from ..model.base import MissingUnit
from .data_check import KNOWN_CATEGORY_TYPE_VIOLATIONS, check_export_data
logger = logging.getLogger(__name__)
# these columns are expected to contain entries without value
COLUMNS_WITHOUT_VALUE = ["unit", "device"]
[docs]
def _drop_null_values(df: pd.DataFrame) -> None:
"""Drop rows with a ``NaN`` ``value`` field and log the count.
Mutates *df* in-place.
Note:
If *all* rows have a ``NaN`` ``value``, *df* will be empty after
this call. Subsequent steps handle an empty DataFrame gracefully.
Args:
df: Health records DataFrame; rows with a ``NaN`` ``value`` are
removed.
Example::
before = len(df)
_drop_null_values(df)
print(f"Dropped {before - len(df)} rows")
"""
null_mask = pd.isna(df["value"])
n_dropped = int(null_mask.sum())
if n_dropped:
df.drop(index=df.index[null_mask], inplace=True) # noqa: PD002
logger.warning("Dropped %d rows with missing 'value'.", n_dropped)
[docs]
def _handle_categorical_units(df: pd.DataFrame) -> None:
    """Turn unit-less records into categorical records, in-place.

    A row whose ``unit`` is ``NaN`` is treated as categorical. Processing
    happens in this order:

    1. :func:`_replace_known_violations` rewrites ``type`` for the
       ``type``/``value`` pairs listed in ``KNOWN_CATEGORY_TYPE_VIOLATIONS``.
    2. :func:`_map_categories` replaces the string ``value`` of each
       categorical row with the string form of its mapped entry.
    3. ``unit`` of each categorical row is set to
       ``MissingUnit.CATEGORICAL.value``.

    If no row lacks a unit, the function returns right after the schema
    check without touching *df*.

    Args:
        df: Health records DataFrame; ``type``, ``value`` and ``unit`` may be
            mutated in-place.

    Raises:
        NotImplementedError: If ``NaN`` is found in a column that is not
            listed in ``COLUMNS_WITHOUT_VALUE``, indicating an unexpected
            schema change.
        ValueError: If a row without a unit has a ``value`` that parses as a
            number; only categorical strings are expected there.
        KeyError: Propagated from :func:`_map_categories` when a ``type`` or
            ``value`` string cannot be resolved.

    Example::

        _handle_categorical_units(df)
        # Rows that had no unit now carry the categorical sentinel unit and
        # a mapped ``value``.
    """
    # Schema guard: only the whitelisted columns may contain NaN at all.
    columns_with_nan: pd.Index = df.columns[df.isna().any()]
    allowed = columns_with_nan.isin(COLUMNS_WITHOUT_VALUE)
    if (~allowed).any():
        unexpected = columns_with_nan.difference(COLUMNS_WITHOUT_VALUE).tolist()
        raise NotImplementedError(
            f"Unexpected column(s) have NaN (schema may have changed): {unexpected}"
        )

    unitless: pd.Series = df["unit"].isna()
    if not unitless.any():
        return

    # Anything that survives numeric coercion is a number, which should
    # never appear on a row that has no unit.
    parsed = pd.to_numeric(df.loc[unitless, "value"], errors="coerce")
    if parsed.notna().any():
        raise ValueError(
            "Some records without a unit have a numeric value; "
            "expected only categorical strings."
        )

    # Types must be corrected before the lookup, which is keyed on ``type``.
    _replace_known_violations(df)
    _map_categories(df, unitless)
    df.loc[unitless, "unit"] = MissingUnit.CATEGORICAL.value
[docs]
def _timestamps_to_unix(series: pd.Series) -> pd.Series:
"""Convert a ``datetime64`` Series to Unix timestamps in whole seconds.
Divides the nanosecond epoch integer representation by 10⁹ using integer
floor division to avoid floating-point rounding errors.
Args:
series: A ``datetime64[ns]`` pandas Series.
Returns:
An ``int64`` Series of Unix timestamps in **seconds**.
Example::
unix_ts = _timestamps_to_unix(df["startDate"])
# unix_ts.dtype == int64
"""
return (
series.astype("datetime64[ns, UTC]").astype("int64") // 1_000_000_000
).astype("int64")
[docs]
def _map_categories(df: pd.DataFrame, no_unit: pd.Series) -> None:
    """Replace categorical string values with their mapped values, in-place.

    Every row selected by *no_unit* is looked up as
    ``CATEGORICAL_IDENTIFIER_MAPS[type][value]`` and the result, converted
    with :class:`str`, is written back to the ``value`` column. The write
    only happens once *all* selected rows resolved successfully; on failure
    *df* is left unchanged.

    Note:
        A temporary two-column selection (``type``, ``value``) of the
        categorical rows is built; it is expected to be small relative to
        the full DataFrame.

    Args:
        df: Health records DataFrame; the ``value`` column is mutated
            in-place for rows selected by *no_unit*.
        no_unit: Boolean mask selecting categorical rows (those with no
            ``unit`` value).

    Raises:
        KeyError: If any ``type`` or ``value`` lookup fails. The message
            lists every unresolved value, grouped by ``type``.

    Example::

        _map_categories(df, df["unit"].isna())
        # df.loc[no_unit, "value"] now holds the mapped values as strings
    """
    selection = df.loc[no_unit, ["type", "value"]]
    type_column = selection["type"]
    value_column = selection["value"]

    encoded: list[str] = []
    unresolved: dict[str, set[str]] = {}
    for kind, raw in zip(type_column, value_column, strict=True):
        try:
            encoded.append(str(CATEGORICAL_IDENTIFIER_MAPS[kind][raw]))
        except KeyError:
            # Keep scanning so the error reports every unknown entry at once.
            if kind not in unresolved:
                unresolved[kind] = set()
            unresolved[kind].add(raw)

    if unresolved:
        raise KeyError(f"Unknown value(s) for type(s): {unresolved}")

    df.loc[selection.index, "value"] = encoded
[docs]
def _replace_known_violations(df: pd.DataFrame) -> None:
    """Correct category identifiers that are known to be wrong, in-place.

    ``KNOWN_CATEGORY_TYPE_VIOLATIONS`` maps a faulty ``type`` to a pair
    ``(values, correct_type)``. Every row whose ``type`` equals the faulty
    identifier *and* whose ``value`` is one of ``values`` gets its ``type``
    replaced by ``correct_type``. Entries are applied one after another in
    the mapping's iteration order, each seeing the result of the previous.

    Args:
        df: Health records DataFrame; only the ``type`` column is written.
            All rows are considered, not just those without a unit.
    """
    for wrong_type, replacement in KNOWN_CATEGORY_TYPE_VIOLATIONS.items():
        offending_values, right_type = replacement
        # ``type`` is re-read on every pass because earlier passes may have
        # rewritten it.
        has_wrong_type = df["type"] == wrong_type
        has_listed_value = df["value"].isin(offending_values)
        df.loc[has_wrong_type & has_listed_value, "type"] = right_type