Importer

Importer

Health Data Importer.

Uploads Apple Health export.zip data to a local Redis TimeSeries database via an Extract → Transform → Load (ETL) pipeline.

Typical usage:

importer = HealthDataImporter()
importer.etl(write_feather=True)

for f in importer.failures:
    print(f)

# Retry only the failed data points (can be called in a new session)
remaining = importer.retry_failed(df)

# Overwrite existing data points with the latest values
importer.update()
class src.importer.importer.HealthDataImporter(connection, data_dir='data', in_file='export.zip', working_dir=None, out_file='export.feather', failures_file='upload_failures.json')[source]

Bases: object

Import Apple Health export data into Redis TimeSeries.

After calling etl() or update(), any upload failures are accessible via failures and persisted to failures_file so that retry_failed() can be called in a later Python session.

Parameters:
  • connection (Redis) – redis.Redis to connect to.

  • data_dir (str) – Sub-directory (relative to working_dir) that holds data files.

  • in_file (str) – Name of the Apple Health ZIP export inside data_dir.

  • working_dir (Path | str | None) – Root directory; defaults to the current working directory.

  • out_file (str) – Name of the Feather cache file written to data_dir.

  • failures_file (str) – Name of the JSON file that persists upload failures between sessions.

Example:

# Uses the conventional data/export.zip layout
importer = HealthDataImporter(connection=redis_connect())
importer.etl(write_feather=True)

# Non-standard layout
HealthDataImporter(data_dir="exports", in_file="2026-q1.zip",
                   connection=redis_connect())
_delete_failures_file()[source]

Delete failures_file if it exists.

Called after a fully successful load or a fully successful retry to ensure no stale file misleads a future retry_failed() call.

Example:

importer._delete_failures_file()
Return type:

None

_extract(*, write_feather, no_cache)[source]

Parse the Apple Health export and return a raw DataFrame.

Prefers the Feather cache at output_file to avoid re-running the slow XML-to-Feather conversion. Falls back to the ZIP export at zip_file if no cache exists.

Parameters:
  • write_feather (bool) – Write a Feather cache file after parsing the ZIP export.

  • no_cache (bool) – If True, ignores pre-existing cache file and reads ZIP input.

Return type:

DataFrame

Returns:

Raw health records as a DataFrame.

Raises:

FileNotFoundError – When neither the Feather cache nor the source ZIP can be found.

Example:

df = importer._extract(write_feather=True, no_cache=False)
_read_failures_file()[source]

Read and deserialise the failures file.

Return type:

list[RowFailure | BatchFailure]

Returns:

List of UploadFailure objects.

Raises:

Example:

failures = importer._read_failures_file()
_update_failures_file()[source]
Return type:

None

_write_failures_file(failures)[source]

Serialise failures and write them to failures_file.

Overwrites any existing file so the file always reflects the current state.

Parameters:

failures (list[RowFailure | BatchFailure]) – List of UploadFailure objects to persist.

Return type:

None

Example:

importer._write_failures_file(importer.failures)
etl(*, write_feather=False, persist_failures=True, no_cache=False)[source]

Run the full Extract → Transform → Load pipeline.

Uses DuplicatePolicy.First so that re-running the same export never overwrites existing data points. To overwrite, use update() instead.

When setting persist_failures=True any upload failures overwrite failures_file so that retry_failed() can be called in another Python session:

importer.etl(persist_failures=True)
# failures are persisted at data/upload_failures.json and as
# instance attribute
if importer.failures:
    importer.retry_failed(df)
Parameters:
  • write_feather (bool) – Persist the parsed data as a Feather cache so that subsequent runs skip the slow XML extraction step.

  • persist_failures (bool) – Persist a file that contains which data could not be uploaded as a JSON file.

  • no_cache (bool) – If True, ignores pre-existing cache file and reads ZIP input.

Raises:
  • FileNotFoundError – When neither the Feather cache nor the source ZIP can be found.

  • NotImplementedError – If NaN values are found in any column other than unit, indicating an unexpected schema change.

  • ValueError – If a row without a unit has a numeric value; only categorical string values are expected in that position.

Example:

:rtype: :sphinx_autodoc_typehints_type:`\:py\:obj\:\`None\``

importer.etl(write_feather=True)

retry_failed(*, persist_failures=True)[source]

Re-attempt uploading every data point recorded in failures_file.

Reads the failures JSON file written by the most recent etl(), update(), or previous retry_failed() call which overwrites failures. This method can be called in a completely separate Python session as long as the failures file and the Feather cache still exist:

# New session — no need to re-run etl()
importer = HealthDataImporter()
df = feather.read_feather("data/export.feather")
importer.retry_failed()

Retry behaviour:

Loads dataframe from self.out_file and finds entries that failed to upload, and passes this subset to _load().

After the retry:

  • If all previously failed data points now succeed, the failures file is deleted.

  • If some failures remain, the file is overwritten with only the still-failing entries.

Parameters:

persist_failures (bool) – Persist a file that contains which data could not be uploaded as a JSON file.

Return type:

None

Raises::
FileNotFoundError: When neither the Feather cache nor the source

ZIP can be found, or if failures_file does not exist.

NotImplementedError: If NaN values are found in any column other

than unit, indicating an unexpected schema change.

ValueError: If a row without a unit has a numeric value; only

categorical string values are expected in that position.

Example:

importer.retry_failed()
if not importer.failures:
    print("All failures resolved.")
update(*, write_feather=False, persist_failures=True, no_cache=False)[source]

Re-import the export, overwriting existing data points.

Identical to etl() except it uses DuplicatePolicy.LAST, which means any timestamp that already exists in Redis is overwritten with the new value rather than kept.

Parameters:
  • write_feather (bool) – Persist the parsed data as a Feather cache.

  • persist_failures (bool) – Persist a file that contains which data could not be uploaded as a JSON file.

  • no_cache (bool) – If True, ignores pre-existing cache file and reads ZIP input.

Raises:
  • FileNotFoundError – When neither the Feather cache nor the source ZIP can be found.

  • NotImplementedError – If NaN values are found in any column other than unit, indicating an unexpected schema change.

  • ValueError – If a row without a unit has a numeric value; only categorical string values are expected in that position.

Example:

:rtype: :sphinx_autodoc_typehints_type:`\:py\:obj\:\`None\``

importer.update()

src.importer.importer._load(df, r, duplicate_policy=DuplicatePolicy.FIRST)[source]

Batch-upload all records to Redis TimeSeries.

Each unique type is uploaded in its own pipeline transaction so that a failure in one type does not abort the others. Returns a list of UploadFailure objects.

Parameters:
Return type:

list[RowFailure | BatchFailure]

Returns:

A new list of UploadFailure objects. Empty if all data points were successfully uploaded.

Example:

failures = importer._load(df, importer.connection, DuplicatePolicy.LAST)

Parser

Parser for Apple Health export files.

Reads the export.zip produced by the Apple Health app and returns a single tidy DataFrame that can be saved to by the caller.

exception src.importer.parser.NoHealthDataError[source]

Bases: ValueError

Raised when trying to parse a zip file without usable data.

src.importer.parser.parse_apple_health(zip_path)[source]

Parse an Apple Health export archive into a DataFrame.

Reads all Record elements from the export.xml inside the given export.zip and returns them as a single tidy DataFrame with timezone-aware datetime columns. The output schema is a superset of the one produced by apple-health-exporter’s health_xml_to_feather, adding device, sourceVersion, and creationDate columns.

Parameters:

zip_path (str | Path) – Path to the export.zip file generated by the Apple Health app.

Returns:

  • type (str): The HealthKit record type, e.g. HKQuantityTypeIdentifierHeartRate.

  • sourceName (str): Human-readable name of the recording source, e.g. "Apple Watch".

  • sourceVersion (str): Version string of the recording source app.

  • device (str): Full device identifier string as reported by HealthKit.

  • unit (str): Unit of measurement, e.g. "count/min" or "kg".

  • startDate (datetime64[ns]): Start of the recorded interval.

  • endDate (datetime64[ns]): End of the recorded interval.

  • creationDate (datetime64[ns]): When the record was created

  • value (str): The recorded value as a string, e.g. "72" or "5.6". Cast to a numeric type by the caller if needed.

Return type:

DataFrame

Raises:

NoHealthDataError – If the zip_path file contains none of the _COLUMNS data.

Example

>>> df = parse_apple_health("export.zip")
>>> df.to_feather("data.feather")
>>> heart_rate = df[df["type"] == "HKQuantityTypeIdentifierHeartRate"]
>>> heart_rate["value"].astype(float).mean()

Pipeline

Redis TimeSeries pipeline upload helpers.

All writes to Redis TimeSeries are batched through a single Pipeline per data-type call, reducing round-trips to one network exchange per type regardless of row count.

Importantly execute() is called with raise_on_error=False. This means individual TS.ADD command failures are surfaced as ResponseError objects inside the response list rather than raised as exceptions, allowing the response-inspection loop in _resolve_failures() to produce granular RowFailure entries instead of collapsing an entire batch into a single BatchFailure for even a single failure.

A BatchFailure is still produced when the pipeline itself raises a RedisError (connection loss, auth failure, etc.) — in that case no per-row response is available.

Provides upload_batch() for public use.

class src.importer.pipeline._HealthRow(*args, **kwargs)[source]

Bases: Protocol

Row attributes consumed by _add_row_to_pipeline().

Structural type matching what df.itertuples() yields for a well-formed health DataFrame. Only the attributes actually read are required; extra attributes on the namedtuple are ignored.

_abc_impl = <_abc._abc_data object>
_is_protocol = True
endDate: Any
sourceName: str
startDate: Any
type: str
unit: str
value: Any
src.importer.pipeline._add_row_to_pipeline(pipe, row, duplicate_policy=DuplicatePolicy.FIRST)[source]

Queue a single health record’s start and end TS.ADD commands on pipe.

Two commands are issued per record:

  • <type>:start — timestamped at startDate

  • <type>:end — timestamped at endDate

The commands are not executed until execute() is called by the caller, so this function is always O(1) and performs no network I/O.

Parameters:
  • pipe (Pipeline) – An open Redis TimeSeries pipeline.

  • row (_HealthRow) – A single itertuples row from the health records DataFrame. Expected named attributes: type, startDate, endDate, value, sourceName, unit.

  • duplicate_policy (DuplicatePolicy) – Per-command write-conflict strategy forwarded to every TS.ADD call.

Return type:

None

Example:

pipe = rts.pipeline()
for row in df.itertuples():
    _add_row_to_pipeline(pipe, row, DuplicatePolicy.FIRST)
pipe.execute(raise_on_error=False)
src.importer.pipeline._resolve_failures(response, df)[source]

Inspect a pipeline response and record per-row failures.

Maps the flat response list back to individual rows using the response[2i] / response[2i+1] convention (start / end command per row i). Successful commands return an int (the stored timestamp); failed commands return a ResponseError object when the pipeline was executed with raise_on_error=False.

Parameters:
  • response (list[Any]) – Flat list returned by pipe.execute(raise_on_error=False). Length must equal 2 * len(indices).

  • df (DataFrame) – Health records DataFrame of batch belonging to input response.

Return type:

list[RowFailure]

Returns:

List of RowFailure objects. Empty on full success.

Raises:

IndexError if response and df don't have the same length

Example:

pipe = rts.pipeline()
# … queue commands …
response = pipe.execute(raise_on_error=False)
failures = _resolve_failures(response, df)
src.importer.pipeline.upload_batch(rts, df, duplicate_policy=DuplicatePolicy.FIRST)[source]

Upload data slice to Redis TimeSeries via a single pipeline.

All TS.ADD commands for the slice are batched into one Pipeline and flushed in a single network round-trip. Pipeline.execute() is called with raise_on_error=False so that individual command failures are returned as ResponseError objects in the response list rather than raised as exceptions, enabling granular RowFailure reporting.

Failure semantics:

  • Connection-level failure (RedisError raised by Pipeline.execute()): Error is handled by caller.

  • Command-level failure (ResponseError objects in the response list): one RowFailure per affected row, with the error message(s) for its start and/or end command.

Response mapping (two TS.ADD commands per row):

response[2i]   → start command for row i
response[2i+1] → end command for row i
Parameters:
  • rts (TimeSeries) – Redis TimeSeries client.

  • df (DataFrame) – Health records DataFrame for a batch.

  • duplicate_policy (DuplicatePolicy) – Per-command write-conflict strategy forwarded to every pipe.add() call.

Return type:

list[RowFailure]

Returns:

List of RowFailure objects. Empty on full success.

Example:

r = redis_connect()
rts = r.ts()
row_failures = upload_batch(rts, df)

Response

Domain models for upload failure tracking and duplicate-write policy.

This module defines:

class src.importer.response.BatchFailure(data_type, error)[source]

Bases: object

Records a failed upload for an entire data-type batch.

Produced by upload_batch() when the pipeline itself raises a RedisError (e.g. a connection failure, authentication error, or server-side crash). In this case no row-level response is available, so the entire batch is marked failed.

Variables:
  • data_type – The data_type column value for this batch, e.g. "HKQuantityTypeIdentifierHeartRate".

  • error – Human-readable message from the RedisError that was raised.

Example:

f = BatchFailure(data_type="HKQuantityTypeIdentifierHeartRate",
                 error="Connection reset by peer")
__str__()[source]

Create human-readable string representation of this failure.

Return type:

str

Returns:

String in the form BatchFailure(data_type=…, errors=[…]).

data_type: str
error: str
classmethod from_dict(d)[source]

Deserialise from a dictionary produced by to_dict().

Parameters:

d (dict[str, Any]) – Dictionary with data_type and error keys.

Return type:

BatchFailure

Returns:

A BatchFailure instance.

Example:

BatchFailure.from_dict({"data_type": "HR", "error": "timeout"})
to_dict()[source]

Serialise to a JSON-compatible dictionary.

Return type:

dict[str, Any]

Returns:

A plain dict with keys kind, data_type, and error.

Example:

BatchFailure("HR", "timeout").to_dict()
# → {"kind": "batch", "data_type": "HR", "error": "timeout"}
class src.importer.response.DuplicatePolicy(*values)[source]

Bases: Enum

Redis TimeSeries per-command duplicate-write strategy.

FIRST is used by etl() for initial imports so that re-running an export never overwrites existing data points. It is also used by retry_failed().

LAST is used by update() to overwrite existing data points with new values.

Example:

policy = DuplicatePolicy.FIRST
pipe.add(key="hr:start", timestamp=ts, value=v,
         duplicate_policy=policy.value)
FIRST = 'FIRST'
LAST = 'LAST'
class src.importer.response.RowFailure(data_type, row_index, start_error=None, end_error=None)[source]

Bases: object

Records a failed upload for a single DataFrame row.

Produced by upload_batch() when one or both TS.ADD commands for a specific row return a ResponseError inside the pipeline response (i.e. the connection itself stayed up, but that individual command failed).

Variables:
  • data_type – The data_type column value for this row, e.g. "HKQuantityTypeIdentifierHeartRate".

  • row_index – The pandas DataFrame index label of the failed row. Use df.loc[failure.row_index] to retrieve the full row.

  • start_error – Human-readable message from the ResponseError for the <type>:start command, or None if that command succeeded.

  • end_error – Human-readable message from the ResponseError for the <type>:end command, or None if that command succeeded.

Example:

f = RowFailure(data_type="HKQuantityTypeIdentifierHeartRate",
               row_index=42, start_error="TSDB: Duplicate")
__str__()[source]

Create human-readable string representation of this failure.

Return type:

str

Returns:

String in the form RowFailure(data_type=…, row_index=…, errors=[…]).

data_type: str
end_error: str | None = None
classmethod from_dict(d)[source]

Deserialise from a dictionary produced by to_dict().

Parameters:

d (dict[str, Any]) – Dictionary with data_type, row_index, start_error, and end_error keys.

Return type:

RowFailure

Returns:

A RowFailure instance.

Example:

RowFailure.from_dict({"data_type": "HR", "row_index": 0,
                      "start_error": None, "end_error": None})
row_index: Any
start_error: str | None = None
to_dict()[source]

Serialise to a JSON-compatible dictionary.

Return type:

dict[str, Any]

Returns:

A plain dict with keys kind, data_type, row_index, start_error, and end_error.

Example:

RowFailure("HR", 0, start_error="err").to_dict()
# → {"kind": "row", "data_type": "HR", "row_index": 0,
#    "start_error": "err", "end_error": None}
src.importer.response.UploadFailure = src.importer.response.RowFailure | src.importer.response.BatchFailure

Union type for a single upload failure — either a row-level or batch-level failure. Used as the element type of the failures list returned by _load() and stored on the importer instance.

src.importer.response.failures_from_json(text)[source]

Deserialise a JSON string produced by failures_to_json().

Parameters:

text (str) – JSON string as written by failures_to_json().

Return type:

list[RowFailure | BatchFailure]

Returns:

List of RowFailure and/or BatchFailure objects.

Raises:

ValueError – If an entry has an unknown kind value.

Example:

failures = failures_from_json(Path("failures.json").read_text())
src.importer.response.failures_to_json(failures)[source]

Serialise a list of UploadFailure objects to a JSON string.

Parameters:

failures (list[RowFailure | BatchFailure]) – List of RowFailure or BatchFailure objects.

Return type:

str

Returns:

A pretty-printed JSON string suitable for writing to disk.

Example:

text = failures_to_json([BatchFailure("HR", "timeout")])
Path("failures.json").write_text(text)

Transform

Apple Health DataFrame transformation pipeline.

Provides public function transform that handles necessary transformations of raw Apple Health export data.

All public and private functions in this module accept a df argument that refers to the same underlying DataFrame and mutate it in-place. No copies are created, keeping peak RAM proportional to the export size rather than a multiple of it.

src.importer.transform._drop_null_values(df)[source]

Drop rows with a NaN value field and log the count.

Mutates df in-place.

Note

If all rows have a NaN value, df will be empty after this call. Subsequent steps handle an empty DataFrame gracefully.

Parameters:

df (DataFrame) – Health records DataFrame; rows with a NaN value are removed.

Return type:

None

Example:

before = len(df)
_drop_null_values(df)
print(f"Dropped {before - len(df)} rows")
src.importer.transform._handle_categorical_units(df)[source]

Assign integer values and a sentinel unit to categorical records.

Rows without a unit value are treated as categorical. Their string value is resolved to a signed integer/NaN via _map_categories(), and their unit is set to CATEGORICAL. For type/value pairs in KNOWN_CATEGORY_TYPE_VIOLATIONS type is updated.

Note

A warning is logged (and the row left unmodified) if a type or value string is absent from the categorical registry.

Parameters:

df (DataFrame) – Health records DataFrame; the value and unit columns are mutated in-place for categorical rows.

Raises:
  • NotImplementedError – If NaN values are found in any column other than unit, indicating an unexpected schema change.

  • ValueError – If a row without a unit has a numeric value; only categorical string values are expected in that position.

Example:

:rtype: :sphinx_autodoc_typehints_type:`\:py\:obj\:\`None\``

_handle_categorical_units(df) # Categorical rows now have integer values and unit == “Categorical”

src.importer.transform._map_categories(df, no_unit)[source]

Replace categorical string values with integer values in-place.

Note

The groupby() call creates a temporary copy of the categorical slice; the slice size is expected to be small relative to the full DataFrame.

Parameters:
  • df (DataFrame) – Health records DataFrame; the value column is mutated in-place for rows selected by no_unit.

  • no_unit (Series) – Boolean mask selecting categorical rows (those with no unit value).

Raises:

KeyError – If a type string is absent from CATEGORICAL_IDENTIFIER_MAPS, or if a value string is not a valid member name of the corresponding HKCategoryTypeIdentifier.

Example:

:rtype: :sphinx_autodoc_typehints_type:`\:py\:obj\:\`None\``

_map_categories(df, df[“unit”].isna()) # df.loc[no_unit, “value”] now contains signed numbers

src.importer.transform._replace_known_violations(df)[source]

Replace faulty category identifiers.

Replaces type for HKCategoryTypeIdentifier if value better matches another identifier. Identifier/value combinations that should be replaced are kept in KNOWN_CATEGORY_TYPE_VIOLATIONS.

Return type:

None

src.importer.transform._timestamps_to_unix(series)[source]

Convert a datetime64 Series to Unix timestamps in whole seconds.

Divides the nanosecond epoch integer representation by 10⁹ using integer floor division to avoid floating-point rounding errors.

Parameters:

series (Series) – A datetime64[ns] pandas Series.

Return type:

Series

Returns:

An int64 Series of Unix timestamps in seconds.

Example:

unix_ts = _timestamps_to_unix(df["startDate"])
# unix_ts.dtype == int64
src.importer.transform.transform(df)[source]

Clean and reshape df in-place for upload to Redis TimeSeries.

Applies the following steps in order:

  1. Check input df sanity.

  2. Drop rows whose value field is NaN (_drop_null_values()).

  3. Resolve categorical string values to signed integers or NaN and assign the "Categorical" sentinel unit (_handle_categorical_units()).

  4. Convert startDate and endDate from str to Unix timestamps in whole seconds (_timestamps_to_unix()).

Note

Not thread-safe – all mutations are applied directly to the shared DataFrame without locking.

Parameters:

df (DataFrame) – The raw health records DataFrame as produced by the extract step; mutated in-place.

Return type:

None

Example:

transform(df)
# df["startDate"] and df["endDate"] are now int64 Unix timestamps