# Source code for src.importer.response

"""Domain models for upload failure tracking and duplicate-write policy.

This module defines:

* :class:`DuplicatePolicy` — Redis TimeSeries write-conflict strategy.
* :class:`RowFailure` — a failed upload for a single DataFrame row.
* :class:`BatchFailure` — a failed upload for an entire data-type batch.
* :data:`UploadFailure` — union alias consumed by the pipeline and importer.
* :func:`failures_to_json` / :func:`failures_from_json` — persistence helpers.
"""

import json
from dataclasses import dataclass, field
from enum import Enum
from typing import Any

# ---------------------------------------------------------------------------
# Enums
# ---------------------------------------------------------------------------


class DuplicatePolicy(Enum):
    """Duplicate-write strategy passed per command to Redis TimeSeries.

    Each member's ``value`` is the literal policy string handed to the
    ``duplicate_policy`` argument of a ``TS.ADD`` call.

    * :attr:`FIRST` — used by :meth:`~.importer.HealthDataImporter.etl` for
      initial imports, so re-running an export never overwrites data points
      that already exist. :meth:`~.importer.HealthDataImporter.retry_failed`
      uses it as well.
    * :attr:`LAST` — used by :meth:`~.importer.HealthDataImporter.update` to
      overwrite existing data points with new values.

    Example::

        policy = DuplicatePolicy.FIRST
        pipe.add(key="hr:start", timestamp=ts, value=v,
                 duplicate_policy=policy.value)
    """

    FIRST = "FIRST"
    LAST = "LAST"
# ---------------------------------------------------------------------------
# Failure dataclasses
# ---------------------------------------------------------------------------
[docs] @dataclass class RowFailure: """Records a failed upload for a single DataFrame row. Produced by :func:`~.pipeline.upload_batch` when one or both ``TS.ADD`` commands for a specific row return a :class:`~redis.exceptions.ResponseError` inside the pipeline response (i.e. the connection itself stayed up, but that individual command failed). Attributes: data_type: The ``data_type`` column value for this row, e.g. ``"HKQuantityTypeIdentifierHeartRate"``. row_index: The pandas DataFrame index label of the failed row. Use ``df.loc[failure.row_index]`` to retrieve the full row. start_error: Human-readable message from the :class:`~redis.exceptions.ResponseError` for the ``<type>:start`` command, or ``None`` if that command succeeded. end_error: Human-readable message from the :class:`~redis.exceptions.ResponseError` for the ``<type>:end`` command, or ``None`` if that command succeeded. Example:: f = RowFailure(data_type="HKQuantityTypeIdentifierHeartRate", row_index=42, start_error="TSDB: Duplicate") """ data_type: str row_index: Any start_error: str | None = field(default=None) end_error: str | None = field(default=None)
[docs] def __str__(self) -> str: """Create human-readable string representation of this failure. Returns: String in the form RowFailure(data_type=..., row_index=..., errors=[...]). """ errors: list[str] = [] if self.start_error is not None: errors.append(f"start={self.start_error!r}") if self.end_error is not None: errors.append(f"end={self.end_error!r}") return ( f"RowFailure(data_type={self.data_type!r}, " f"row_index={self.row_index!r}, " f"errors=[{', '.join(errors)}])" )
[docs] def to_dict(self) -> dict[str, Any]: """Serialise to a JSON-compatible dictionary. Returns: A plain ``dict`` with keys ``kind``, ``data_type``, ``row_index``, ``start_error``, and ``end_error``. Example:: RowFailure("HR", 0, start_error="err").to_dict() # → {"kind": "row", "data_type": "HR", "row_index": 0, # "start_error": "err", "end_error": None} """ return { "kind": "row", "data_type": self.data_type, "row_index": self.row_index, "start_error": self.start_error, "end_error": self.end_error, }
[docs] @classmethod def from_dict(cls, d: dict[str, Any]) -> "RowFailure": """Deserialise from a dictionary produced by :meth:`to_dict`. Args: d: Dictionary with ``data_type``, ``row_index``, ``start_error``, and ``end_error`` keys. Returns: A :class:`RowFailure` instance. Example:: RowFailure.from_dict({"data_type": "HR", "row_index": 0, "start_error": None, "end_error": None}) """ return cls( data_type=d["data_type"], row_index=d["row_index"], start_error=d.get("start_error"), end_error=d.get("end_error"), )
@dataclass
class BatchFailure:
    """A failed upload of a whole data-type batch.

    :func:`~.pipeline.upload_batch` produces this when the pipeline itself
    raises a :class:`~redis.exceptions.RedisError` (for example a connection
    failure, an authentication error, or a server-side crash). No per-row
    response exists in that case, so the entire batch is marked as failed.

    Attributes:
        data_type: Value of the batch's ``data_type`` column, e.g.
            ``"HKQuantityTypeIdentifierHeartRate"``.
        error: Message of the :class:`~redis.exceptions.RedisError` that was
            raised.

    Example::

        f = BatchFailure(data_type="HKQuantityTypeIdentifierHeartRate",
                         error="Connection reset by peer")
    """

    data_type: str
    error: str

    def __str__(self) -> str:
        """Render the failure as a compact, human-readable string.

        Returns:
            Text of the form ``BatchFailure(data_type=..., error=...)``.
        """
        rendered = f"BatchFailure(data_type={self.data_type!r}, error={self.error!r})"
        return rendered

    def to_dict(self) -> dict[str, Any]:
        """Convert the failure into a plain dictionary for persistence.

        The ``kind`` key is fixed to ``"batch"`` so the entry can be told
        apart from other failure types when it is read back.

        Returns:
            A ``dict`` with the keys ``kind``, ``data_type`` and ``error``,
            in that order.

        Example::

            BatchFailure("HR", "timeout").to_dict()
            # → {"kind": "batch", "data_type": "HR", "error": "timeout"}
        """
        return dict(kind="batch", data_type=self.data_type, error=self.error)

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "BatchFailure":
        """Rebuild a :class:`BatchFailure` from the output of :meth:`to_dict`.

        Args:
            d: Mapping holding ``data_type`` and ``error``. Any other key
                (such as ``kind``) is ignored.

        Returns:
            A new :class:`BatchFailure` instance.

        Raises:
            KeyError: If ``data_type`` or ``error`` is missing.

        Example::

            BatchFailure.from_dict({"data_type": "HR", "error": "timeout"})
        """
        return cls(d["data_type"], d["error"])
#: Union type for a single upload failure — either a row-level or batch-level #: failure. Used as the element type of the failures list returned by #: :meth:`~HealthDataImporter._load` and stored on the importer instance. UploadFailure = RowFailure | BatchFailure # --------------------------------------------------------------------------- # JSON persistence helpers # ---------------------------------------------------------------------------
def failures_to_json(failures: list[UploadFailure]) -> str:
    """Turn a list of :class:`UploadFailure` objects into a JSON document.

    Each failure contributes the dictionary returned by its ``to_dict()``
    method; the resulting array is dumped with a two-space indent.

    Args:
        failures: :class:`RowFailure` and/or :class:`BatchFailure` objects.

    Returns:
        A pretty-printed JSON string suitable for writing to disk.

    Raises:
        TypeError: If a ``to_dict()`` payload holds a value that :mod:`json`
            cannot serialise (``RowFailure.row_index`` is typed ``Any``).

    Example::

        text = failures_to_json([BatchFailure("HR", "timeout")])
        Path("failures.json").write_text(text)
    """
    payload = [failure.to_dict() for failure in failures]
    return json.dumps(payload, indent=2)
def failures_from_json(text: str) -> list[UploadFailure]:
    """Deserialise a JSON string produced by :func:`failures_to_json`.

    The payload must be a JSON array of objects, each carrying a ``kind`` of
    ``"row"`` or ``"batch"``. Anything else is reported as a corrupted file
    via :class:`ValueError` rather than leaking an ``AttributeError`` or
    ``TypeError`` from the parsing loop.

    Args:
        text: JSON string as written by :func:`failures_to_json`.

    Returns:
        List of :class:`RowFailure` and/or :class:`BatchFailure` objects, in
        file order.

    Raises:
        ValueError: If ``text`` is not valid JSON
            (:class:`json.JSONDecodeError` is a ``ValueError`` subclass), if
            the top-level value is not an array, if an entry is not an
            object, or if an entry has an unknown ``kind`` value.
        KeyError: If a ``row`` or ``batch`` entry lacks a required field.

    Example::

        failures = failures_from_json(Path("failures.json").read_text())
    """
    payload = json.loads(text)

    # A top-level object, string, number or null is valid JSON but not a
    # failures file; reject it before iterating over the wrong thing.
    if not isinstance(payload, list):
        raise ValueError(
            f"Expected a JSON array of failures, got {type(payload).__name__}. "
            "File may be corrupted."
        )

    failures: list[UploadFailure] = []
    for position, entry in enumerate(payload):
        # Only JSON objects can carry the ``kind`` discriminator.
        if not isinstance(entry, dict):
            raise ValueError(
                f"Expected a JSON object for failure entry {position}, "
                f"got {type(entry).__name__}. File may be corrupted."
            )

        kind = entry.get("kind")
        if kind == "row":
            failures.append(RowFailure.from_dict(entry))
        elif kind == "batch":
            failures.append(BatchFailure.from_dict(entry))
        else:
            raise ValueError(
                f"Unknown failure kind {kind!r} in failures file. "
                "File may be corrupted."
            )
    return failures