# Source code for src.importer.pipeline

"""Redis TimeSeries pipeline upload helpers.

All writes to Redis TimeSeries are batched through a single
:class:`~redis.commands.timeseries.Pipeline` per :func:`upload_batch` call, so
each batch is flushed in one network round-trip regardless of its row count.

Importantly, :meth:`~redis.commands.timeseries.Pipeline.execute` is called with
``raise_on_error=False``. This means individual ``TS.ADD`` command failures
are surfaced as :class:`~redis.exceptions.ResponseError` *objects* inside the
response list rather than raised as exceptions. The response-inspection loop in
:func:`_resolve_failures` can then produce granular :class:`~.models.RowFailure`
entries, instead of a single failing command collapsing the entire batch into
one :class:`~.models.BatchFailure`.

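When the pipeline itself raises a :class:`~redis.exceptions.RedisError`
(connection loss, auth failure, etc.), no per-row response is available. The
exception is not caught here; it propagates to the caller of
:func:`upload_batch`, which is responsible for handling it (for example by
recording a :class:`~.models.BatchFailure`).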

Provides :func:`upload_batch` for public use.
"""

import logging
from typing import Any, Protocol

import pandas as pd
from redis import ResponseError
from redis.commands.timeseries import Pipeline, TimeSeries

from .response import DuplicatePolicy, RowFailure

logger = logging.getLogger(__name__)


[docs] class _HealthRow(Protocol): """Row attributes consumed by :func:`_add_row_to_pipeline`. Structural type matching what ``df.itertuples()`` yields for a well-formed health DataFrame. Only the attributes actually read are required; extra attributes on the namedtuple are ignored. """ type: str sourceName: str # noqa: N815 unit: str value: Any startDate: Any # noqa: N815 endDate: Any # noqa: N815
def _add_row_to_pipeline(
    pipe: Pipeline,
    row: _HealthRow,
    duplicate_policy: DuplicatePolicy = DuplicatePolicy.FIRST,
) -> None:
    """Queue the two ``TS.ADD`` commands for one health record on *pipe*.

    The commands are queued in this fixed order:

    * ``<type>:start`` — sample timestamped at ``row.startDate``
    * ``<type>:end`` — sample timestamped at ``row.endDate``

    Both samples carry the same ``value`` and the same label set
    (``sourceName`` and ``unit``). Nothing is sent to Redis here. The commands
    stay buffered in the pipeline until the caller invokes
    :meth:`~redis.commands.timeseries.Pipeline.execute`, so this function does
    constant work and no network I/O. The start-then-end order is what lets
    :func:`_resolve_failures` map ``response[2i]`` / ``response[2i+1]`` back
    to row *i*.

    Args:
        pipe: An open Redis TimeSeries pipeline to queue commands on.
        row: One ``itertuples`` row from the health records DataFrame. It must
            expose ``type``, ``startDate``, ``endDate``, ``value``,
            ``sourceName`` and ``unit``.
        duplicate_policy: Write-conflict strategy; its ``.value`` is passed
            as the ``duplicate_policy`` keyword of every ``pipe.add`` call.

    Example::

        pipe = rts.pipeline()
        for row in df.itertuples():
            _add_row_to_pipeline(pipe, row, DuplicatePolicy.FIRST)
        pipe.execute(raise_on_error=False)
    """
    # NOTE(review): older redis-py releases mapped ``add(duplicate_policy=...)``
    # to TS.ADD's ``ON_DUPLICATE`` (a per-command policy). Newer releases map it
    # to ``DUPLICATE_POLICY`` (applied only when the key is auto-created) and
    # expose ``on_duplicate=`` for the per-command behaviour. Confirm which
    # semantics the installed version gives.
    # NOTE(review): assumes startDate/endDate are already values TS.ADD
    # accepts (epoch milliseconds as int, or "*"). Confirm upstream conversion.
    shared_kwargs: dict[str, Any] = {
        "labels": {"sourceName": row.sourceName, "unit": row.unit},
        "duplicate_policy": duplicate_policy.value,
        "value": row.value,
    }
    for suffix, stamp in (("start", row.startDate), ("end", row.endDate)):
        pipe.add(key=f"{row.type}:{suffix}", timestamp=stamp, **shared_kwargs)
def _resolve_failures(
    response: list[Any],
    df: pd.DataFrame,
) -> list[RowFailure]:
    """Inspect a pipeline response and record per-row failures.

    Maps the flat *response* list back to individual rows using the
    ``response[2i] / response[2i+1]`` convention (start / end command for the
    *i*-th row of *df*, in ``itertuples`` order). Successful commands return
    an ``int`` (the stored timestamp). Failed commands return a
    :class:`~redis.exceptions.ResponseError` object when the pipeline was
    executed with ``raise_on_error=False``.

    Args:
        response: Flat list returned by ``pipe.execute(raise_on_error=False)``.
            Length must equal ``2 * len(df)``.
        df: Health records DataFrame of the batch that produced *response*,
            in the same row order used to queue the commands.

    Returns:
        List of :class:`~.models.RowFailure` objects, one per row whose start
        and/or end command failed. Empty on full success.

    Raises:
        IndexError: If ``len(response) != 2 * len(df)``.

    Example::

        pipe = rts.pipeline()
        # … queue commands …
        response = pipe.execute(raise_on_error=False)
        failures = _resolve_failures(response, df)
    """
    if len(response) != 2 * len(df):
        raise IndexError("response must contain two elements per df row.")

    # Even positions hold start-command replies, odd positions end-command
    # replies. Zip them pairwise alongside the rows they belong to.
    reply_pairs = zip(response[0::2], response[1::2])

    row_failures: list[RowFailure] = []
    for row, (start_resp, end_resp) in zip(df.itertuples(), reply_pairs):
        start_ok = not isinstance(start_resp, ResponseError)
        end_ok = not isinstance(end_resp, ResponseError)
        if start_ok and end_ok:
            continue

        failure = RowFailure(
            data_type=row.type,
            row_index=row.Index,
            start_error=None if start_ok else str(start_resp),
            end_error=None if end_ok else str(end_resp),
        )
        row_failures.append(failure)
        # Read the dates from the row tuple rather than via df.loc[...]:
        # .loc returns a Series when the index has duplicate labels.
        logger.info(
            "Row %s (type=%s startDate=%s endDate=%s) failed — %s",
            row.Index,
            row.type,
            row.startDate,
            row.endDate,
            failure,
        )
    return row_failures
def upload_batch(
    rts: TimeSeries,
    df: pd.DataFrame,
    duplicate_policy: DuplicatePolicy = DuplicatePolicy.FIRST,
) -> list[RowFailure]:
    """Upload a data slice to Redis TimeSeries via a single pipeline.

    All ``TS.ADD`` commands for the slice are batched into one
    :class:`~redis.commands.timeseries.Pipeline` and flushed in a single
    network round-trip. :meth:`Pipeline.execute` is called with
    ``raise_on_error=False``, so individual command failures are returned as
    :class:`~redis.exceptions.ResponseError` objects in the response list
    rather than raised. This enables granular :class:`~.models.RowFailure`
    reporting.

    Failure semantics:

    * **Connection-level failure** (:class:`~redis.exceptions.RedisError`
      raised by :meth:`Pipeline.execute`): not caught here; the exception
      propagates to the caller, which is responsible for handling it.
    * **Command-level failure** (:class:`~redis.exceptions.ResponseError`
      objects in the response list): one :class:`~.models.RowFailure` per
      affected row, with the error message(s) for its start and/or end
      command.

    Response mapping (two ``TS.ADD`` commands per row):

    .. code-block:: text

        response[2i]   → start command for row i
        response[2i+1] → end command for row i

    Args:
        rts: Redis TimeSeries client.
        df: Health records DataFrame for a batch. An empty DataFrame queues
            nothing and yields an empty list.
        duplicate_policy: Write-conflict strategy forwarded to every
            ``pipe.add()`` call.

    Returns:
        List of :class:`~.models.RowFailure` objects. Empty on full success.

    Example::

        r = redis_connect()
        rts = r.ts()
        row_failures = upload_batch(rts, df)
    """
    # Pipeline.__exit__ calls reset(), so queued commands are discarded even
    # if queuing a malformed row raises before execute() is reached.
    with rts.pipeline() as pipe:
        for row in df.itertuples():
            _add_row_to_pipeline(pipe, row, duplicate_policy=duplicate_policy)
        response: list[Any] = pipe.execute(raise_on_error=False)

    return _resolve_failures(response=response, df=df)