Importer¶
Health Data Importer.
Uploads Apple Health export.zip data to a local Redis TimeSeries database
via an Extract → Transform → Load (ETL) pipeline.
Typical usage:
importer = HealthDataImporter(connection=redis_connect())
importer.etl(write_feather=True)
for f in importer.failures:
    print(f)
# Retry only the failed data points (can be called in a new session)
importer.retry_failed()
# Overwrite existing data points with the latest values
importer.update()
- class src.importer.importer.HealthDataImporter(connection, data_dir='data', in_file='export.zip', working_dir=None, out_file='export.feather', failures_file='upload_failures.json')[source]¶
Bases: object
Import Apple Health export data into Redis TimeSeries.
After calling etl() or update(), any upload failures are accessible via failures and persisted to failures_file so that retry_failed() can be called in a later Python session.
- Parameters:
connection (Redis) – redis.Redis to connect to.
data_dir (str) – Sub-directory (relative to working_dir) that holds data files.
in_file (str) – Name of the Apple Health ZIP export inside data_dir.
working_dir (Path|str|None) – Root directory; defaults to the current working directory.
out_file (str) – Name of the Feather cache file written to data_dir.
failures_file (str) – Name of the JSON file that persists upload failures between sessions.
Example:
# Uses the conventional data/export.zip layout
importer = HealthDataImporter(connection=redis_connect())
importer.etl(write_feather=True)
# Non-standard layout
HealthDataImporter(data_dir="exports", in_file="2026-q1.zip", connection=redis_connect())
- _delete_failures_file()[source]¶
Delete failures_file if it exists.
Called after a fully successful load or a fully successful retry to ensure no stale file misleads a future retry_failed() call.
Example:
importer._delete_failures_file()
- Return type:
- _extract(*, write_feather, no_cache)[source]¶
Parse the Apple Health export and return a raw DataFrame.
Prefers the Feather cache at output_file to avoid re-running the slow XML-to-Feather conversion. Falls back to the ZIP export at zip_file if no cache exists.
- Parameters:
- Return type:
- Returns:
Raw health records as a DataFrame.
- Raises:
FileNotFoundError – When neither the Feather cache nor the source ZIP can be found.
Example:
df = importer._extract(write_feather=True, no_cache=False)
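The cache-first lookup described for _extract() can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the module's implementation: choose_source is a hypothetical helper, and it only decides which file would be read rather than parsing anything.

```python
import tempfile
from pathlib import Path


def choose_source(feather_path: Path, zip_path: Path, *, no_cache: bool = False) -> Path:
    """Return the file the extract step should read (hypothetical helper)."""
    # Prefer the Feather cache unless the caller asked to bypass it.
    if not no_cache and feather_path.exists():
        return feather_path
    # Otherwise fall back to the original ZIP export.
    if zip_path.exists():
        return zip_path
    raise FileNotFoundError(f"Neither {feather_path} nor {zip_path} exists")


with tempfile.TemporaryDirectory() as tmp:
    data_dir = Path(tmp)
    feather = data_dir / "export.feather"
    archive = data_dir / "export.zip"

    archive.write_bytes(b"")  # only the ZIP exists
    only_zip = choose_source(feather, archive)

    feather.write_bytes(b"")  # now the cache exists as well
    with_cache = choose_source(feather, archive)
    bypassed = choose_source(feather, archive, no_cache=True)
```

With both files present the cache wins, and no_cache=True forces the ZIP to be read again.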
- _read_failures_file()[source]¶
Read and deserialise the failures file.
- Return type:
- Returns:
List of UploadFailure objects.
- Raises:
FileNotFoundError – If failures_file does not exist.
ValueError – If the file contains an unrecognised kind value.
Example:
failures = importer._read_failures_file()
- _write_failures_file(failures)[source]¶
Serialise failures and write them to failures_file.
Overwrites any existing file so the file always reflects the current state.
- Parameters:
failures (list[RowFailure|BatchFailure]) – List of UploadFailure objects to persist.
- Return type:
Example:
importer._write_failures_file(importer.failures)
- etl(*, write_feather=False, persist_failures=True, no_cache=False)[source]¶
Run the full Extract → Transform → Load pipeline.
Uses DuplicatePolicy.FIRST so that re-running the same export never overwrites existing data points. To overwrite, use update() instead.
When persist_failures=True, any upload failures overwrite failures_file so that retry_failed() can be called in another Python session:
importer.etl(persist_failures=True)
# Failures are persisted at data/upload_failures.json and on the
# instance as importer.failures
if importer.failures:
    importer.retry_failed()
- Parameters:
write_feather (bool) – Persist the parsed data as a Feather cache so that subsequent runs skip the slow XML extraction step.
persist_failures (bool) – Write a JSON file recording which data points could not be uploaded.
no_cache (bool) – If True, ignore any existing cache file and read the ZIP input.
- Raises:
FileNotFoundError – When neither the Feather cache nor the source ZIP can be found.
NotImplementedError – If NaN values are found in any column other than unit, indicating an unexpected schema change.
ValueError – If a row without a unit has a numeric value; only categorical string values are expected in that position.
- Return type:
None
Example:
importer.etl(write_feather=True)
- retry_failed(*, persist_failures=True)[source]¶
Re-attempt uploading every data point recorded in failures_file.
Reads the failures JSON file written by the most recent etl(), update(), or previous retry_failed() call, each of which overwrites failures. This method can be called in a completely separate Python session as long as the failures file and the Feather cache still exist:
# New session: no need to re-run etl()
importer = HealthDataImporter(connection=redis_connect())
importer.retry_failed()
Retry behaviour:
Loads the DataFrame from self.out_file, selects the entries that failed to upload, and passes this subset to _load().
After the retry:
If all previously failed data points now succeed, the failures file is deleted.
If some failures remain, the file is overwritten with only the still-failing entries.
- Parameters:
persist_failures (bool) – Write a JSON file recording which data points could not be uploaded.
- Return type:
- Raises:
FileNotFoundError – When neither the Feather cache nor the source ZIP can be found, or if failures_file does not exist.
NotImplementedError – If NaN values are found in any column other than unit, indicating an unexpected schema change.
ValueError – If a row without a unit has a numeric value; only categorical string values are expected in that position.
Example:
importer.retry_failed()
if not importer.failures:
    print("All failures resolved.")
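The bookkeeping that follows a retry (delete the file on full success, otherwise keep only the still-failing entries) might look roughly like the sketch below. settle_failures is a hypothetical helper, and plain dictionaries stand in for the real failure objects.

```python
import json
import tempfile
from pathlib import Path


def settle_failures(failures_file: Path, remaining: list) -> None:
    """Delete the failures file on full success, else overwrite it (hypothetical helper)."""
    if remaining:
        failures_file.write_text(json.dumps(remaining, indent=2))
    else:
        failures_file.unlink(missing_ok=True)


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "upload_failures.json"
    path.write_text(json.dumps([{"row_index": 1}, {"row_index": 2}]))

    # First retry: row 2 still fails, so the file shrinks to that entry.
    settle_failures(path, [{"row_index": 2}])
    after_partial = json.loads(path.read_text())

    # Second retry: everything succeeded, so the file is removed.
    settle_failures(path, [])
    exists_after_success = path.exists()
```

Removing the file once nothing is left to retry keeps a later session from acting on stale entries.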
- update(*, write_feather=False, persist_failures=True, no_cache=False)[source]¶
Re-import the export, overwriting existing data points.
Identical to etl() except it uses DuplicatePolicy.LAST, which means any timestamp that already exists in Redis is overwritten with the new value rather than kept.
- Parameters:
- Raises:
FileNotFoundError – When neither the Feather cache nor the source ZIP can be found.
NotImplementedError – If NaN values are found in any column other than unit, indicating an unexpected schema change.
ValueError – If a row without a unit has a numeric value; only categorical string values are expected in that position.
- Return type:
None
Example:
importer.update()
- src.importer.importer._load(df, r, duplicate_policy=DuplicatePolicy.FIRST)[source]¶
Batch-upload all records to Redis TimeSeries.
Each unique type is uploaded in its own pipeline transaction so that a failure in one type does not abort the others. Returns a list of UploadFailure objects.
- Parameters:
df (DataFrame) – Transformed health records.
r (Redis) – Active Redis connection.
duplicate_policy (DuplicatePolicy) – FIRST (default, used by etl() and retry_failed()) or LAST (used by update()).
- Return type:
- Returns:
A new list of UploadFailure objects. Empty if all data points were successfully uploaded.
Example:
failures = _load(df, importer.connection, DuplicatePolicy.LAST)
Parser¶
Parser for Apple Health export files.
Reads the export.zip produced by the Apple Health app and returns a single tidy DataFrame that the caller can save to disk.
- exception src.importer.parser.NoHealthDataError[source]¶
Bases: ValueError
Raised when trying to parse a zip file without usable data.
- src.importer.parser.parse_apple_health(zip_path)[source]¶
Parse an Apple Health export archive into a DataFrame.
Reads all Record elements from the export.xml inside the given export.zip and returns them as a single tidy DataFrame with timezone-aware datetime columns. The output schema is a superset of the one produced by apple-health-exporter’s health_xml_to_feather, adding device, sourceVersion, and creationDate columns.
- Parameters:
zip_path (str|Path) – Path to the export.zip file generated by the Apple Health app.
- Returns:
type (str): The HealthKit record type, e.g. HKQuantityTypeIdentifierHeartRate.
sourceName (str): Human-readable name of the recording source, e.g. "Apple Watch".
sourceVersion (str): Version string of the recording source app.
device (str): Full device identifier string as reported by HealthKit.
unit (str): Unit of measurement, e.g. "count/min" or "kg".
startDate (datetime64[ns]): Start of the recorded interval.
endDate (datetime64[ns]): End of the recorded interval.
creationDate (datetime64[ns]): When the record was created.
value (str): The recorded value as a string, e.g. "72" or "5.6". Cast to a numeric type by the caller if needed.
- Return type:
- Raises:
NoHealthDataError – If the zip_path file contains none of the _COLUMNS data.
Example
>>> df = parse_apple_health("export.zip")
>>> df.to_feather("data.feather")
>>> heart_rate = df[df["type"] == "HKQuantityTypeIdentifierHeartRate"]
>>> heart_rate["value"].astype(float).mean()
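To make the "read every Record element from export.xml inside the ZIP" step concrete, here is a standard-library sketch. It is not the parser's actual code: iter_records is a hypothetical name, the member path apple_health_export/export.xml is an assumption about the archive layout, and the result is a list of attribute dictionaries rather than a DataFrame.

```python
import io
import xml.etree.ElementTree as ET
import zipfile

# Assumed location of the XML inside the archive; adjust if your export differs.
XML_MEMBER = "apple_health_export/export.xml"


def iter_records(zip_file):
    """Yield the attribute dict of every <Record> element in the export."""
    with zipfile.ZipFile(zip_file) as archive, archive.open(XML_MEMBER) as xml:
        for _event, elem in ET.iterparse(xml, events=("end",)):
            if elem.tag == "Record":
                yield dict(elem.attrib)
                elem.clear()  # drop the element's attributes and children


# Build a tiny in-memory archive so the sketch runs without a real export.
sample_xml = (
    "<HealthData>"
    '<Record type="HKQuantityTypeIdentifierHeartRate" sourceName="Apple Watch" '
    'unit="count/min" startDate="2024-01-01 08:00:00 +0000" '
    'endDate="2024-01-01 08:00:05 +0000" value="72"/>'
    "</HealthData>"
)
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr(XML_MEMBER, sample_xml)
buffer.seek(0)

records = list(iter_records(buffer))
```

Streaming with iterparse avoids holding the whole XML tree in memory, which matters for multi-year exports.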
Pipeline¶
Redis TimeSeries pipeline upload helpers.
All writes to Redis TimeSeries are batched through a single
Pipeline per data-type call, reducing
round-trips to one network exchange per type regardless of row count.
Importantly, execute() is called with raise_on_error=False.
Individual TS.ADD command failures are therefore surfaced as
ResponseError objects inside the response list rather than raised
as exceptions. This lets the response-inspection loop in
_resolve_failures() produce granular RowFailure entries instead of
collapsing an entire batch into a single BatchFailure whenever one
command fails.
A BatchFailure is still produced when the pipeline itself
raises a RedisError (connection loss, auth
failure, etc.) — in that case no per-row response is available.
Provides upload_batch() for public use.
- class src.importer.pipeline._HealthRow(*args, **kwargs)[source]¶
Bases: Protocol
Row attributes consumed by _add_row_to_pipeline().
Structural type matching what df.itertuples() yields for a well-formed health DataFrame. Only the attributes actually read are required; extra attributes on the namedtuple are ignored.
- _abc_impl = <_abc._abc_data object>¶
- _is_protocol = True¶
- src.importer.pipeline._add_row_to_pipeline(pipe, row, duplicate_policy=DuplicatePolicy.FIRST)[source]¶
Queue a single health record's start and end TS.ADD commands on pipe.
Two commands are issued per record:
<type>:start, timestamped at startDate
<type>:end, timestamped at endDate
The commands are not executed until execute() is called by the caller, so this function is always O(1) and performs no network I/O.
- Parameters:
pipe (Pipeline) – An open Redis TimeSeries pipeline.
row (_HealthRow) – A single itertuples row from the health records DataFrame. Expected named attributes: type, startDate, endDate, value, sourceName, unit.
duplicate_policy (DuplicatePolicy) – Per-command write-conflict strategy forwarded to every TS.ADD call.
- Return type:
Example:
pipe = rts.pipeline()
for row in df.itertuples():
    _add_row_to_pipeline(pipe, row, DuplicatePolicy.FIRST)
pipe.execute(raise_on_error=False)
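The two-commands-per-record idea can be tried without a Redis server by recording the queued commands instead of sending them. Everything in this sketch is a stand-in: RecordingPipeline and queue_row are hypothetical names, and the sourceName and unit attributes are left out for brevity.

```python
from collections import namedtuple


class RecordingPipeline:
    """Stand-in for a TimeSeries pipeline: stores commands instead of sending them."""

    def __init__(self):
        self.commands = []

    def add(self, key, timestamp, value, duplicate_policy=None):
        self.commands.append((key, timestamp, value, duplicate_policy))


def queue_row(pipe, row, duplicate_policy="FIRST"):
    """Queue the start and end samples for one record (hypothetical helper)."""
    pipe.add(f"{row.type}:start", row.startDate, row.value, duplicate_policy=duplicate_policy)
    pipe.add(f"{row.type}:end", row.endDate, row.value, duplicate_policy=duplicate_policy)


Row = namedtuple("Row", "type startDate endDate value")
pipe = RecordingPipeline()
queue_row(pipe, Row("HeartRate", 1_700_000_000, 1_700_000_005, 72.0))
queued_keys = [command[0] for command in pipe.commands]
```

Because queueing only appends to a list, no network traffic occurs until the caller decides to execute the pipeline.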
- src.importer.pipeline._resolve_failures(response, df)[source]¶
Inspect a pipeline response and record per-row failures.
Maps the flat response list back to individual rows using the response[2i] / response[2i+1] convention (start / end command per row i). Successful commands return an int (the stored timestamp); failed commands return a ResponseError object when the pipeline was executed with raise_on_error=False.
- Parameters:
- Return type:
- Returns:
List of RowFailure objects. Empty on full success.
- Raises:
IndexError – If the length of response does not match df (two response entries are expected per row).
Example:
pipe = rts.pipeline()
# … queue commands …
response = pipe.execute(raise_on_error=False)
failures = _resolve_failures(response, df)
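The response[2i] / response[2i+1] mapping can be illustrated in isolation. This sketch assumes only that failed commands appear in the response list as exception instances. FakeResponseError, RowFailureSketch and resolve are hypothetical stand-ins, not the module's classes.

```python
from dataclasses import dataclass
from typing import Optional


class FakeResponseError(Exception):
    """Stand-in for the error objects a pipeline returns with raise_on_error=False."""


@dataclass
class RowFailureSketch:
    row_index: int
    start_error: Optional[str] = None
    end_error: Optional[str] = None


def resolve(response, row_indices):
    """Pair response[2i] and response[2i+1] with the i-th row."""
    failures = []
    for i, row_index in enumerate(row_indices):
        start, end = response[2 * i], response[2 * i + 1]
        start_error = str(start) if isinstance(start, Exception) else None
        end_error = str(end) if isinstance(end, Exception) else None
        if start_error is not None or end_error is not None:
            failures.append(RowFailureSketch(row_index, start_error, end_error))
    return failures


# Two rows, four responses: only the end command of the second row failed.
response = [1700000000, 1700000005, 1700000060, FakeResponseError("TSDB: boom")]
failures = resolve(response, row_indices=[10, 11])
```

A response list shorter than two entries per row raises IndexError inside the loop, which matches the documented failure mode.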
- src.importer.pipeline.upload_batch(rts, df, duplicate_policy=DuplicatePolicy.FIRST)[source]¶
Upload a data slice to Redis TimeSeries via a single pipeline.
All TS.ADD commands for the slice are batched into one Pipeline and flushed in a single network round-trip. Pipeline.execute() is called with raise_on_error=False so that individual command failures are returned as ResponseError objects in the response list rather than raised as exceptions, enabling granular RowFailure reporting.
Failure semantics:
Connection-level failure (RedisError raised by Pipeline.execute()): the error is handled by the caller.
Command-level failure (ResponseError objects in the response list): one RowFailure per affected row, with the error message(s) for its start and/or end command.
Response mapping (two TS.ADD commands per row):
response[2i] → start command for row i
response[2i+1] → end command for row i
- Parameters:
rts (TimeSeries) – Redis TimeSeries client.
df (DataFrame) – Health records DataFrame for a batch.
duplicate_policy (DuplicatePolicy) – Per-command write-conflict strategy forwarded to every pipe.add() call.
- Return type:
- Returns:
List of RowFailure objects. Empty on full success.
Example:
r = redis_connect()
rts = r.ts()
row_failures = upload_batch(rts, df)
Response¶
Domain models for upload failure tracking and duplicate-write policy.
This module defines:
DuplicatePolicy: Redis TimeSeries write-conflict strategy.
RowFailure: a failed upload for a single DataFrame row.
BatchFailure: a failed upload for an entire data-type batch.
UploadFailure: union alias consumed by the pipeline and importer.
failures_to_json() / failures_from_json(): persistence helpers.
- class src.importer.response.BatchFailure(data_type, error)[source]¶
Bases: object
Records a failed upload for an entire data-type batch.
Produced by upload_batch() when the pipeline itself raises a RedisError (e.g. a connection failure, authentication error, or server-side crash). In this case no row-level response is available, so the entire batch is marked failed.
- Variables:
data_type – The data_type column value for this batch, e.g. "HKQuantityTypeIdentifierHeartRate".
error – Human-readable message from the RedisError that was raised.
Example:
f = BatchFailure(data_type="HKQuantityTypeIdentifierHeartRate", error="Connection reset by peer")
- __str__()[source]¶
Create human-readable string representation of this failure.
- Return type:
- Returns:
String in the form BatchFailure(data_type=…, errors=[…]).
- class src.importer.response.DuplicatePolicy(*values)[source]¶
Bases: Enum
Redis TimeSeries per-command duplicate-write strategy.
FIRST is used by etl() for initial imports so that re-running an export never overwrites existing data points. It is also used by retry_failed().
LAST is used by update() to overwrite existing data points with new values.
Example:
policy = DuplicatePolicy.FIRST
pipe.add(key="hr:start", timestamp=ts, value=v, duplicate_policy=policy.value)
- FIRST = 'FIRST'¶
- LAST = 'LAST'¶
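The difference between the two policies is easiest to see on a toy in-memory series. This sketch only mimics the keep-versus-overwrite behaviour described above and does not talk to Redis. Policy and add_sample are hypothetical names.

```python
from enum import Enum


class Policy(Enum):
    FIRST = "FIRST"
    LAST = "LAST"


def add_sample(series: dict, timestamp: int, value: float, policy: Policy) -> None:
    """Mimic how a duplicate timestamp is resolved under each policy."""
    if timestamp in series and policy is Policy.FIRST:
        return  # keep the value that is already stored
    series[timestamp] = value  # new timestamp, or LAST overwrites


series = {}
add_sample(series, 100, 72.0, Policy.FIRST)
add_sample(series, 100, 80.0, Policy.FIRST)  # ignored: timestamp exists
kept = series[100]
add_sample(series, 100, 80.0, Policy.LAST)  # overwrites
overwritten = series[100]
```

Under FIRST a repeated import is idempotent, while LAST makes the most recent export authoritative.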
- class src.importer.response.RowFailure(data_type, row_index, start_error=None, end_error=None)[source]¶
Bases: object
Records a failed upload for a single DataFrame row.
Produced by upload_batch() when one or both TS.ADD commands for a specific row return a ResponseError inside the pipeline response (i.e. the connection itself stayed up, but that individual command failed).
- Variables:
data_type – The data_type column value for this row, e.g. "HKQuantityTypeIdentifierHeartRate".
row_index – The pandas DataFrame index label of the failed row. Use df.loc[failure.row_index] to retrieve the full row.
start_error – Human-readable message from the ResponseError for the <type>:start command, or None if that command succeeded.
end_error – Human-readable message from the ResponseError for the <type>:end command, or None if that command succeeded.
Example:
f = RowFailure(data_type="HKQuantityTypeIdentifierHeartRate", row_index=42, start_error="TSDB: Duplicate")
- __str__()[source]¶
Create human-readable string representation of this failure.
- Return type:
- Returns:
String in the form RowFailure(data_type=…, row_index=…, errors=[…]).
- classmethod from_dict(d)[source]¶
Deserialise from a dictionary produced by to_dict().
- Parameters:
d (dict[str,Any]) – Dictionary with data_type, row_index, start_error, and end_error keys.
- Return type:
- Returns:
A RowFailure instance.
Example:
RowFailure.from_dict({"data_type": "HR", "row_index": 0, "start_error": None, "end_error": None})
- to_dict()[source]¶
Serialise to a JSON-compatible dictionary.
- Return type:
- Returns:
A plain dict with keys kind, data_type, row_index, start_error, and end_error.
Example:
RowFailure("HR", 0, start_error="err").to_dict()
# → {"kind": "row", "data_type": "HR", "row_index": 0,
#    "start_error": "err", "end_error": None}
- src.importer.response.UploadFailure = src.importer.response.RowFailure | src.importer.response.BatchFailure¶
Union type for a single upload failure, either a row-level or batch-level failure. Used as the element type of the failures list returned by _load() and stored on the importer instance.
- src.importer.response.failures_from_json(text)[source]¶
Deserialise a JSON string produced by failures_to_json().
- Parameters:
text (str) – JSON string as written by failures_to_json().
- Return type:
- Returns:
List of RowFailure and/or BatchFailure objects.
- Raises:
ValueError – If an entry has an unknown kind value.
Example:
failures = failures_from_json(Path("failures.json").read_text())
- src.importer.response.failures_to_json(failures)[source]¶
Serialise a list of UploadFailure objects to a JSON string.
- Parameters:
failures (list[RowFailure|BatchFailure]) – List of RowFailure or BatchFailure objects.
- Return type:
- Returns:
A pretty-printed JSON string suitable for writing to disk.
Example:
text = failures_to_json([BatchFailure("HR", "timeout")])
Path("failures.json").write_text(text)
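A round trip through JSON with a kind discriminator could be sketched as follows. The classes and helpers here are simplified stand-ins rather than the module's code. The "row" tag matches the to_dict() example above, while the "batch" tag is an assumption made for illustration.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class Row:
    data_type: str
    row_index: int
    start_error: Optional[str] = None
    end_error: Optional[str] = None


@dataclass
class Batch:
    data_type: str
    error: str


# "batch" is an assumed tag; only "row" appears in the documented output.
_KINDS = {"row": Row, "batch": Batch}


def to_json(failures: list) -> str:
    """Tag every entry with a "kind" so it can be rebuilt later."""
    payload = []
    for failure in failures:
        kind = "row" if isinstance(failure, Row) else "batch"
        payload.append({"kind": kind, **asdict(failure)})
    return json.dumps(payload, indent=2)


def from_json(text: str) -> list:
    """Rebuild the objects, rejecting unknown "kind" values."""
    result = []
    for entry in json.loads(text):
        kind = entry.pop("kind")
        if kind not in _KINDS:
            raise ValueError(f"Unrecognised kind: {kind!r}")
        result.append(_KINDS[kind](**entry))
    return result


original = [Row("HR", 0, start_error="err"), Batch("HR", "timeout")]
restored = from_json(to_json(original))
```

Storing the discriminator next to the payload lets a later session rebuild the right class without guessing from the available keys.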
Transform¶
Apple Health DataFrame transformation pipeline.
Provides the public function transform(), which handles the necessary transformations of raw Apple Health export data.
All public and private functions in this module accept a df argument that
refers to the same underlying DataFrame and mutate it
in-place. No copies are created, keeping peak RAM proportional to the
export size rather than a multiple of it.
- src.importer.transform._drop_null_values(df)[source]¶
Drop rows with a NaN value field and log the count.
Mutates df in-place.
Note
If all rows have a NaN value, df will be empty after this call. Subsequent steps handle an empty DataFrame gracefully.
- Parameters:
df (DataFrame) – Health records DataFrame; rows with a NaN value are removed.
- Return type:
Example:
before = len(df)
_drop_null_values(df)
print(f"Dropped {before - len(df)} rows")
- src.importer.transform._handle_categorical_units(df)[source]¶
Assign integer values and a sentinel unit to categorical records.
Rows without a unit value are treated as categorical. Their string value is resolved to a signed integer/NaN via _map_categories(), and their unit is set to CATEGORICAL. For type/value pairs in KNOWN_CATEGORY_TYPE_VIOLATIONS, the type is updated.
Note
A warning is logged (and the row left unmodified) if a type or value string is absent from the categorical registry.
- Parameters:
df (DataFrame) – Health records DataFrame; the value and unit columns are mutated in-place for categorical rows.
- Raises:
NotImplementedError – If NaN values are found in any column other than unit, indicating an unexpected schema change.
ValueError – If a row without a unit has a numeric value; only categorical string values are expected in that position.
- Return type:
None
Example:
_handle_categorical_units(df)
# Categorical rows now have integer values and unit == "Categorical"
- src.importer.transform._map_categories(df, no_unit)[source]¶
Replace categorical string values with integer values in-place.
Note
The groupby() call creates a temporary copy of the categorical slice; the slice size is expected to be small relative to the full DataFrame.
- Parameters:
- Raises:
KeyError – If a type string is absent from CATEGORICAL_IDENTIFIER_MAPS, or if a value string is not a valid member name of the corresponding HKCategoryTypeIdentifier.
- Return type:
None
Example:
_map_categories(df, df["unit"].isna())
# df.loc[no_unit, "value"] now contains signed numbers
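The lookup behind the categorical mapping, where the record type selects an enum and the value string selects a member, can be sketched without pandas. The registry below is hypothetical, and its member names and numbers are illustrative only. It is not the real HealthKit table or the module's CATEGORICAL_IDENTIFIER_MAPS.

```python
from enum import IntEnum


class SleepAnalysis(IntEnum):
    # Illustrative members and numbers only, not the real HealthKit table.
    InBed = 0
    Asleep = 1
    Awake = 2


# Hypothetical registry keyed by record type.
CATEGORY_MAPS = {"HKCategoryTypeIdentifierSleepAnalysis": SleepAnalysis}


def category_to_int(record_type: str, value: str) -> int:
    """Look up the enum for record_type, then the member named value."""
    enum_cls = CATEGORY_MAPS[record_type]  # KeyError for an unknown type
    return int(enum_cls[value])  # KeyError for an unknown member name


code = category_to_int("HKCategoryTypeIdentifierSleepAnalysis", "Awake")
```

Both failure paths surface as KeyError, in line with the Raises entry above.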
- src.importer.transform._replace_known_violations(df)[source]¶
Replace faulty category identifiers.
Replaces the type of an HKCategoryTypeIdentifier record if its value better matches another identifier. Identifier/value combinations that should be replaced are kept in KNOWN_CATEGORY_TYPE_VIOLATIONS.
- Return type:
- src.importer.transform._timestamps_to_unix(series)[source]¶
Convert a datetime64 Series to Unix timestamps in whole seconds.
Divides the nanosecond epoch integer representation by 10⁹ using integer floor division to avoid floating-point rounding errors.
- Parameters:
series (Series) – A datetime64[ns] pandas Series.
- Return type:
- Returns:
An int64 Series of Unix timestamps in seconds.
Example:
unix_ts = _timestamps_to_unix(df["startDate"])
# unix_ts.dtype == int64
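Why floor division matters can be shown with plain Python integers: a nanosecond timestamp just below a second boundary is rounded up when it passes through a float. ns_to_seconds is a hypothetical scalar counterpart of the Series-based function.

```python
NANOS_PER_SECOND = 1_000_000_000


def ns_to_seconds(epoch_ns: int) -> int:
    """Whole seconds since the epoch, computed without leaving integer arithmetic."""
    return epoch_ns // NANOS_PER_SECOND


# One nanosecond before the next full second.
epoch_ns = 1_700_000_000_999_999_999

exact = ns_to_seconds(epoch_ns)  # 1_700_000_000
# True division converts to float first; a double cannot represent this
# integer exactly, so the value is rounded up to the next second.
via_float = int(epoch_ns / 1e9)  # 1_700_000_001
```

The off-by-one appears because doubles near 1.7e18 are spaced 256 apart, so the last nanosecond digits are lost before the division happens.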
- src.importer.transform.transform(df)[source]¶
Clean and reshape df in-place for upload to Redis TimeSeries.
Applies the following steps in order:
1. Check input df sanity.
2. Drop rows whose value field is NaN (_drop_null_values()).
3. Resolve categorical string values to signed integers or NaN and assign the "Categorical" sentinel unit (_handle_categorical_units()).
4. Convert startDate and endDate from datetime64 to Unix timestamps in whole seconds (_timestamps_to_unix()).
Note
Not thread-safe – all mutations are applied directly to the shared DataFrame without locking.
- Parameters:
df (DataFrame) – The raw health records DataFrame as produced by the extract step; mutated in-place.
- Return type:
Example:
transform(df)
# df["startDate"] and df["endDate"] are now int64 Unix timestamps