Tools
Tools: From Messy JSON to Health Insights: Building a Modern ETL Pipeline with DBT and BigQuery
2026-02-10
0 views
admin
The Architecture: From Raw Bytes to Dashboard ## Prerequisites ## Step 1: Ingesting the "Mess" with Cloud Functions ## Step 2: Normalization with DBT (The Magic Sauce) ## The Staging Model (stg_heart_rate.sql) ## Step 3: Production-Grade Patterns ## Step 4: Visualizing Trends in Looker ## Conclusion: Learning in Public Have you ever tried to export your health data from Google Health Connect? If you have, you know it’s a wild ride. You expect a clean spreadsheet, but what you get is a "JSON explosion"—nested objects, inconsistent timestamps, and more metadata than actual data. To turn this digital noise into actionable health trends, you need a robust ETL pipeline, professional Data Modeling, and a reliable BigQuery Data Warehouse. In this tutorial, we are going to build a modern data stack that transforms raw wearable telemetry into a clean, queryable format using DBT (Data Build Tool) and Google Cloud Functions. Before we dive into the code, let's visualize how the data flows from your wrist to your screen. We’ll use a "Medallion Architecture" (Bronze, Silver, Gold) to ensure data integrity. To follow along, you'll need: Google Health Connect exports are often dumped into a Cloud Storage bucket. We need a serverless trigger to catch these files and pipe them into BigQuery as raw JSON strings. Now that we have a mountain of JSON in our Bronze layer, we use DBT to flatten it. This is where we turn raw_content into actual columns like heart_rate, step_count, and sleep_duration. Using BigQuery's JSON_EXTRACT functions makes this incredibly efficient. While the above works for a hobby project, production environments require data quality tests, documentation, and snapshotting for slowly changing dimensions. If you're looking for more production-ready examples and advanced engineering patterns—such as handling late-arriving data or multi-tenant warehouse schemas—I highly recommend checking out the technical deep dives at WellAlly Tech Blog. They cover some fantastic "Day 2" operations that take your data engineering from "it works" to "it scales." With your Gold models ready (e.g., fct_daily_health_summary), connect BigQuery to Looker Studio. Suddenly, that messy JSON is a beautiful line graph showing how your resting heart rate drops after you actually start going to the gym! Building ETL pipelines isn't just about moving data; it's about making data useful. By using DBT with BigQuery, we’ve created a system that is: What are you building with your health data? Drop a comment below or share your DBT tips! Don't forget to subscribe for more data engineering adventures. Templates let you quickly answer FAQs or store snippets for re-use. Are you sure you want to hide this comment? It will become hidden in your post, but will still be visible via the comment's permalink. Hide child comments as well For further actions, you may consider blocking this person and/or reporting abuse CODE_BLOCK:
graph TD A[Google Health Connect JSON] -->|Trigger| B(Google Cloud Functions) B -->|Streaming Ingest| C[(BigQuery: Bronze - Raw)] C -->|dbt run| D[(BigQuery: Silver - Staging)] D -->|dbt test/build| E[(BigQuery: Gold - Metrics)] E -->|Visualize| F[Looker Studio] style B fill:#f9f,stroke:#333,stroke-width:2px style D fill:#bbf,stroke:#333,stroke-width:2px Enter fullscreen mode Exit fullscreen mode CODE_BLOCK:
graph TD A[Google Health Connect JSON] -->|Trigger| B(Google Cloud Functions) B -->|Streaming Ingest| C[(BigQuery: Bronze - Raw)] C -->|dbt run| D[(BigQuery: Silver - Staging)] D -->|dbt test/build| E[(BigQuery: Gold - Metrics)] E -->|Visualize| F[Looker Studio] style B fill:#f9f,stroke:#333,stroke-width:2px style D fill:#bbf,stroke:#333,stroke-width:2px CODE_BLOCK:
graph TD A[Google Health Connect JSON] -->|Trigger| B(Google Cloud Functions) B -->|Streaming Ingest| C[(BigQuery: Bronze - Raw)] C -->|dbt run| D[(BigQuery: Silver - Staging)] D -->|dbt test/build| E[(BigQuery: Gold - Metrics)] E -->|Visualize| F[Looker Studio] style B fill:#f9f,stroke:#333,stroke-width:2px style D fill:#bbf,stroke:#333,stroke-width:2px COMMAND_BLOCK:
import base64
import json
from google.cloud import bigquery # Initialize the BigQuery client
client = bigquery.Client()
dataset_id = 'health_data_raw'
table_id = 'health_connect_ingest' def ingest_health_data(event, context): """Triggered by a change to a Cloud Storage bucket.""" file_name = event['name'] print(f"Processing file: {file_name}") # Logic to read JSON from GCS and stream to BigQuery # We store the entire record as a single JSON column initially # to avoid schema evolution headaches. table_ref = client.dataset(dataset_id).table(table_id) table = client.get_table(table_ref) # Simplified example of streaming a row rows_to_insert = [ {"raw_content": json.dumps(event), "ingested_at": "AUTO"} ] errors = client.insert_rows_json(table, rows_to_insert) if not errors: print("New rows have been added.") else: print(f"Encountered errors while inserting rows: {errors}") Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK:
import base64
import json
from google.cloud import bigquery # Initialize the BigQuery client
client = bigquery.Client()
dataset_id = 'health_data_raw'
table_id = 'health_connect_ingest' def ingest_health_data(event, context): """Triggered by a change to a Cloud Storage bucket.""" file_name = event['name'] print(f"Processing file: {file_name}") # Logic to read JSON from GCS and stream to BigQuery # We store the entire record as a single JSON column initially # to avoid schema evolution headaches. table_ref = client.dataset(dataset_id).table(table_id) table = client.get_table(table_ref) # Simplified example of streaming a row rows_to_insert = [ {"raw_content": json.dumps(event), "ingested_at": "AUTO"} ] errors = client.insert_rows_json(table, rows_to_insert) if not errors: print("New rows have been added.") else: print(f"Encountered errors while inserting rows: {errors}") COMMAND_BLOCK:
import base64
import json
from google.cloud import bigquery # Initialize the BigQuery client
client = bigquery.Client()
dataset_id = 'health_data_raw'
table_id = 'health_connect_ingest' def ingest_health_data(event, context): """Triggered by a change to a Cloud Storage bucket.""" file_name = event['name'] print(f"Processing file: {file_name}") # Logic to read JSON from GCS and stream to BigQuery # We store the entire record as a single JSON column initially # to avoid schema evolution headaches. table_ref = client.dataset(dataset_id).table(table_id) table = client.get_table(table_ref) # Simplified example of streaming a row rows_to_insert = [ {"raw_content": json.dumps(event), "ingested_at": "AUTO"} ] errors = client.insert_rows_json(table, rows_to_insert) if not errors: print("New rows have been added.") else: print(f"Encountered errors while inserting rows: {errors}") CODE_BLOCK:
-- models/staging/stg_heart_rate.sql WITH raw_data AS ( SELECT JSON_EXTRACT_SCALAR(raw_content, '$.device_id') as device_id, JSON_EXTRACT(raw_content, '$.metrics.heart_rate') as hr_array, CAST(JSON_EXTRACT_SCALAR(raw_content, '$.timestamp') AS TIMESTAMP) as event_timestamp FROM {{ source('health_raw', 'health_connect_ingest') }}
) SELECT device_id, event_timestamp, -- Extracting nested JSON values CAST(JSON_EXTRACT_SCALAR(hr_item, '$.bpm') AS FLOAT64) as bpm
FROM raw_data,
UNNEST(JSON_QUERY_ARRAY(hr_array)) as hr_item
WHERE bpm IS NOT NULL Enter fullscreen mode Exit fullscreen mode CODE_BLOCK:
-- models/staging/stg_heart_rate.sql WITH raw_data AS ( SELECT JSON_EXTRACT_SCALAR(raw_content, '$.device_id') as device_id, JSON_EXTRACT(raw_content, '$.metrics.heart_rate') as hr_array, CAST(JSON_EXTRACT_SCALAR(raw_content, '$.timestamp') AS TIMESTAMP) as event_timestamp FROM {{ source('health_raw', 'health_connect_ingest') }}
) SELECT device_id, event_timestamp, -- Extracting nested JSON values CAST(JSON_EXTRACT_SCALAR(hr_item, '$.bpm') AS FLOAT64) as bpm
FROM raw_data,
UNNEST(JSON_QUERY_ARRAY(hr_array)) as hr_item
WHERE bpm IS NOT NULL CODE_BLOCK:
-- models/staging/stg_heart_rate.sql WITH raw_data AS ( SELECT JSON_EXTRACT_SCALAR(raw_content, '$.device_id') as device_id, JSON_EXTRACT(raw_content, '$.metrics.heart_rate') as hr_array, CAST(JSON_EXTRACT_SCALAR(raw_content, '$.timestamp') AS TIMESTAMP) as event_timestamp FROM {{ source('health_raw', 'health_connect_ingest') }}
) SELECT device_id, event_timestamp, -- Extracting nested JSON values CAST(JSON_EXTRACT_SCALAR(hr_item, '$.bpm') AS FLOAT64) as bpm
FROM raw_data,
UNNEST(JSON_QUERY_ARRAY(hr_array)) as hr_item
WHERE bpm IS NOT NULL - Google Cloud Platform (GCP) account with BigQuery enabled.
- DBT Core or DBT Cloud set up.
- The tech_stack: Python (for Cloud Functions), SQL (for DBT), and Looker for the final polish. - Select Project: Your GCP Project.
- Table: dbt_metrics.fct_daily_health_summary.
- Metrics: Create a Time Series chart with Avg(bpm) and Sum(steps). - Version Controlled: All logic lives in SQL files.
- Scalable: BigQuery handles the heavy lifting.
- Extensible: Want to add Oura Ring or Apple Watch data? Just add a new staging model.
how-totutorialguidedev.toaiserverpythongit