Tools: Building a Supermarket Data Pipeline
2026-02-05
*How I Built an Automated System That Turns Messy Sales Data Into Business Gold*

Ever wonder how your favorite supermarket knows exactly when to restock the shelves, which products are flying off the racks, or why they always seem to have your favorite snacks in stock? The secret lies in data pipelines, and I built one from scratch.

## The Problem: Data Drowning

Imagine you're the manager of a busy supermarket (e.g., Naivas). Every single day, thousands of transactions flow through your registers: customers buying milk, bread, snacks, and cleaning supplies. Each transaction generates a line of data: who bought what, how much they paid, and how they paid.

Now here's the challenge: all this data sits in a messy Google spreadsheet, updated by cashiers in real time. It's like having a river of gold nuggets flowing past you with no way to catch them, and the questions that keep you up at night (which products sell the most? which payment methods do customers prefer? are duplicate transactions messing up the accounting?) have no easy answers.

This is exactly the problem I solved with the Supermarket ETL Pipeline.

## The Solution: An Automated Data Factory

Think of my solution like a water treatment plant for data: murky, raw input flows in at one end, and clean, structured, queryable data comes out the other.

*Screenshot: the source Google Sheet with raw transaction data (columns like id, quantity, product_name, total_amount, payment_method, customer_type), messy and duplicate rows included.*
## How It Works

## Step 1: Extraction: "Fishing for Data"

My pipeline starts by reaching out to Google Sheets; think of it like casting a fishing net into a lake. The spreadsheet contains raw transaction records: every purchase, every customer, every payment.

Why Google Sheets? Because it's where real businesses often keep their data: it's accessible, shareable, and doesn't require expensive software.

*Screenshot: terminal output with the "Starting extraction from Google Sheets" and "Extracted X rows" log messages.*

## Step 2: Transformation: "The Car Wash for Data"

Raw data is messy. Imagine every car that comes through a car wash covered in mud, leaves, and bird droppings. The transformation stage is my car wash: it takes dirty data and makes it sparkle. The pipeline keeps only the columns that matter, removes duplicate transactions, and drops rows with missing IDs.

## Step 3: Loading: "Two Warehouses, Two Purposes"

Here's where it gets interesting. Instead of storing data in just one place, I built a dual-database strategy. Think of it like having two different storage facilities.

## PostgreSQL: The Library

PostgreSQL is like a meticulously organized library. Every book (data record) has its place, follows strict rules, and can be cross-referenced with other books easily.

## MongoDB: The Flexible Warehouse

MongoDB is like a modern warehouse with adjustable shelving. You can store items of different shapes and sizes without reorganizing everything.

*Screenshot: the Docker containers running.*

## How It Works (The Technical Deep-Dive)

For my fellow engineers, let's pop the hood and look at the engine.

*Screenshot: the project folder structure.*

## The Modular Design Philosophy

Instead of one giant script, I split the pipeline into specialized modules, like having different specialists in a hospital: each module does exactly one job.

## Extraction: Pandas Does the Heavy Lifting

The magic: Google Sheets can export any public sheet as CSV, and Pandas reads it directly from the export URL. No authentication is needed for public sheets.

## Loading: Two Paths, One Pipeline

The relational path loads PostgreSQL through SQLAlchemy; the document path loads MongoDB through PyMongo.

## Docker: The "It Works on My Machine" Killer

One of the biggest headaches in software is environment setup. "It works on my machine!" is the developer's equivalent of "the dog ate my homework." Docker solves this by containerizing everything: my entire stack (Python app, PostgreSQL, MongoDB) runs in isolated containers that behave identically on any machine.
To run the entire system, two Docker commands are all you need.

## Key Lessons & Design Decisions

Think of the modules like LEGO blocks: each is a self-contained piece that can be tested, replaced, and debugged on its own.

## Future Enhancements

This pipeline is production-ready, but there's plenty that could come next, from scheduling to monitoring.

## Conclusion

Building this ETL pipeline taught me that good data engineering is invisible. When it works, nobody notices: the reports are accurate, the app loads fast, and decisions get made with confidence. But behind that invisibility is careful architecture: modular code, a dual-database strategy, containerized deployment, and clean data transformations.

Whether you're a business analyst who just wants clean data or an engineer looking to build your own pipeline, I hope this walkthrough demystified the magic behind turning chaotic spreadsheets into business intelligence gold. The supermarket never runs out of your favorite snacks because somewhere, a data pipeline is quietly doing its job.

If you're interested in the code, check out the repository here: GitHub Repo
In plain terms, the extraction step is a simple conversation:

```text
The Pipeline says: "Hey Google, give me all the sales data!"
Google responds: "Here's 1,000 rows of transactions!"
```
## Architecture Overview

```text
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│  Google Sheets  │────▶│   Python ETL    │────▶│   PostgreSQL    │
│  (Data Source)  │     │   (Container)   │     │  (Relational)   │
└─────────────────┘     │                 │     └─────────────────┘
                        │   • Extract     │
                        │   • Transform   │     ┌─────────────────┐
                        │   • Load        │────▶│     MongoDB     │
                        └─────────────────┘     │   (Document)    │
                                                └─────────────────┘
```
The orchestrator, `main.py`, wires the four steps together and logs each one:

```python
from etl_pipeline.config import Config
from etl_pipeline.extract import extract_data
from etl_pipeline.transform import transform_data
from etl_pipeline.load_postgres import load_to_postgres
from etl_pipeline.load_mongo import load_to_mongo
import sys
import logging

# Configure logging to stdout
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)

def main():
    logging.info("ETL Application pipeline initialized.")
    # 1. Extract
    try:
        if Config.DATA_SOURCE_TYPE == "sheets":
            logging.info(f"Starting extraction from Google Sheets (ID: {Config.GOOGLE_SHEET_ID})")
            # Extract data
            data = extract_data(
                source_type="sheets",
                sheet_id=Config.GOOGLE_SHEET_ID
            )
        else:
            logging.error(f"Unknown data source: {Config.DATA_SOURCE_TYPE}")
            return

        logging.info(f"Extracted {len(data)} rows.")

        # 2. Transform
        logging.info("Step 2: Transform")
        transformed_data = transform_data(data)
        logging.info(f"Transformed Data Shape: {transformed_data.shape}")

        # 3. Load to PostgreSQL
        logging.info("Step 3: Load to PostgreSQL")
        load_to_postgres(transformed_data, Config.POSTGRES_URL)

        # 4. Load to MongoDB
        logging.info("Step 4: Load to MongoDB")
        load_to_mongo(
            transformed_data,
            Config.MONGO_URI,
            Config.MONGO_DB
        )

        logging.info("\nETL pipeline completed successfully.")
    except Exception as e:
        logging.critical(f"ETL failed: {e}")

if __name__ == "__main__":
    main()
```
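The `Config` module itself isn't shown in this write-up, so here is a hypothetical sketch inferred from how `main.py` uses it. The attribute names come from `main.py`; the environment-variable names and defaults below are illustrative assumptions, not the project's actual values.

```python
import os

# Hypothetical sketch of etl_pipeline/config.py: each setting falls back to a
# default but can be overridden via the environment (e.g. from docker-compose).
class Config:
    DATA_SOURCE_TYPE = os.getenv("DATA_SOURCE_TYPE", "sheets")
    GOOGLE_SHEET_ID = os.getenv("GOOGLE_SHEET_ID", "")
    POSTGRES_URL = os.getenv(
        "POSTGRES_URL", "postgresql://postgres:postgres@postgres:5432/postgres"
    )
    MONGO_URI = os.getenv("MONGO_URI", "mongodb://mongo:27017")
    MONGO_DB = os.getenv("MONGO_DB", "supermarket")

print(Config.DATA_SOURCE_TYPE)
```

Keeping configuration in one module like this is what lets the rest of the pipeline stay ignorant of where its settings come from.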
The extraction module boils down to a few lines:

```python
import pandas as pd

def extract_from_public_sheet(sheet_id):
    # Any public Google Sheet can be exported as CSV via this URL
    export_url = f"https://docs.google.com/spreadsheets/d/{sheet_id}/export?format=csv"
    df = pd.read_csv(export_url)
    return df
```
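One practical wrinkle worth flagging: the export URL above fetches the first tab of the spreadsheet. Google Sheets also accepts a `gid` query parameter to select a specific tab. The `sheet_export_url` helper below is a hypothetical extension, not part of the original pipeline:

```python
def sheet_export_url(sheet_id, gid=None):
    """Build the CSV export URL; pass gid to select a specific sheet tab."""
    url = f"https://docs.google.com/spreadsheets/d/{sheet_id}/export?format=csv"
    if gid is not None:
        url += f"&gid={gid}"  # gid identifies one tab within the spreadsheet
    return url

print(sheet_export_url("abc123", gid=0))
# https://docs.google.com/spreadsheets/d/abc123/export?format=csv&gid=0
```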
The transformation step enforces the cleaning rules:

```python
def transform_data(df):
    # Keep only the essential columns (data minimization)
    required_columns = ["id", "quantity", "product_name", "total_amount",
                        "payment_method", "customer_type"]
    df_transformed = df[required_columns].copy()
    # Remove duplicate transactions, then rows with missing IDs
    df_transformed.drop_duplicates(subset=['id'], inplace=True)
    df_transformed.dropna(subset=['id'], inplace=True)
    return df_transformed
```
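To see those rules in action, here is a self-contained check on a toy DataFrame (the function is restated inline so the snippet runs on its own; the column names match the pipeline's, the row values are made up):

```python
import pandas as pd

def transform_data(df):
    required_columns = ["id", "quantity", "product_name", "total_amount",
                        "payment_method", "customer_type"]
    out = df[required_columns].copy()
    out.drop_duplicates(subset=["id"], inplace=True)
    out.dropna(subset=["id"], inplace=True)
    return out

# Toy frame: a duplicate id, a missing id, and an extra column
raw = pd.DataFrame({
    "id": [1, 1, None, 2],
    "quantity": [2, 2, 1, 5],
    "product_name": ["milk", "milk", "bread", "soap"],
    "total_amount": [120.0, 120.0, 60.0, 300.0],
    "payment_method": ["cash", "cash", "card", "mpesa"],
    "customer_type": ["member", "member", "regular", "regular"],
    "cashier_note": ["", "dup scan", "", ""],  # not in required_columns, so dropped
})

clean = transform_data(raw)
print(len(clean))  # 2 — the duplicate and the orphan row are gone
```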
Loading into PostgreSQL with SQLAlchemy:

```python
from sqlalchemy import create_engine

def load_to_postgres(df, db_url, table_name="transactions"):
    engine = create_engine(db_url)
    # 'replace' drops and recreates the table on every run
    df.to_sql(table_name, engine, if_exists='replace', index=False)
```
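A design note on `if_exists='replace'`: re-running the pipeline rebuilds the table rather than appending, so reloads never pile up duplicates. A quick demonstration, using an in-memory SQLite database as a stand-in for PostgreSQL (the `to_sql` call is identical; only the connection URL differs):

```python
import pandas as pd
from sqlalchemy import create_engine

# SQLite in memory stands in for PostgreSQL here; in production the URL would
# be something like postgresql://user:pass@postgres:5432/dbname.
engine = create_engine("sqlite://")

df = pd.DataFrame({"id": [1, 2], "product_name": ["milk", "soap"],
                   "total_amount": [120.0, 300.0]})

# Load twice, as if the pipeline ran twice
df.to_sql("transactions", engine, if_exists="replace", index=False)
df.to_sql("transactions", engine, if_exists="replace", index=False)

count = pd.read_sql("SELECT COUNT(*) AS n FROM transactions", engine)["n"][0]
print(count)  # still 2 — the table was replaced, not appended to
```

The trade-off: 'replace' also discards any rows not present in the current extract, so accumulating history would call for an append-plus-dedupe strategy instead.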
Loading into MongoDB with PyMongo:

```python
from pymongo import MongoClient

def load_to_mongo(df, mongo_uri, db_name, collection_name="transactions"):
    client = MongoClient(mongo_uri)
    collection = client[db_name][collection_name]
    records = df.to_dict("records")  # one document per transaction
    collection.insert_many(records)
```
## The docker-compose.yml Magic

```yaml
services:
  postgres:
    image: postgres:15   # PostgreSQL runs in its own container
  mongo:
    image: mongo:6       # MongoDB runs in its own container
  etl-app:
    build: .
    depends_on:          # the Python app starts after the databases
      - postgres
      - mongo
```
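One caveat with `depends_on`: by default it only waits for the database containers to *start*, not for the servers inside them to accept connections. If startup races ever bite, Compose healthchecks close the gap. The sketch below is a suggested addition, not part of the original file:

```yaml
services:
  postgres:
    image: postgres:15
    healthcheck:
      # pg_isready reports whether Postgres is accepting connections
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      retries: 5
  etl-app:
    build: .
    depends_on:
      postgres:
        condition: service_healthy   # wait for the healthcheck, not just container start
```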
```bash
docker compose up -d --build
docker compose exec etl-app python main.py
```

**The questions that keep you up at night:**

- Which products are selling the most?
- What payment methods do customers prefer?
- Are there duplicate transactions messing up your accounting?
- How can you make this data useful for reports AND for your mobile app?

**The columns the pipeline keeps:**

- id — Unique transaction identifier
- quantity — How many items purchased
- product_name — What was bought
- total_amount — How much was paid
- payment_method — Cash, card, or digital
- customer_type — Member or regular customer

**PostgreSQL is perfect for:**

- Financial reports ("How much revenue did we make last month?")
- Accounting audits (data integrity is guaranteed)
- Complex queries ("Show me all cash transactions over $100 from member customers")

**MongoDB is perfect for:**

- Mobile app backends (JSON-friendly)
- Rapid prototyping ("Let's quickly add a new field!")
- Analytics dashboards (flexible data exploration)

**Why modular design?**

- Testability: I can test the transformation logic without needing a database connection
- Maintainability: Changing the data source doesn't break the loading logic
- Scalability: Adding a new destination (like Snowflake) is just adding one new file

**The transformation rules:**

- Only keep essential columns (data minimization)
- Remove duplicates by transaction ID (data integrity)
- Drop rows with missing IDs (no orphan records)

**Why Python?**

- Pandas: Industry-standard for data manipulation
- SQLAlchemy: ORM whose parameterized queries guard against SQL injection
- PyMongo: Lightweight MongoDB driver
- Rich ecosystem: Libraries for everything

**Each module, like a LEGO block:**

- Can be tested independently
- Can be replaced without breaking others
- Makes debugging a breeze

**Future enhancements:**

- Scheduling: Run automatically every hour with Apache Airflow or cron
- Message Queues: Use Kafka/RabbitMQ for async processing at scale
- Data Validation: Add Great Expectations for data quality checks
- Monitoring: Add Prometheus/Grafana for pipeline observability
- More Sources: Extend to pull from APIs, S3, or other databases
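The "complex queries" bullet under PostgreSQL can be made concrete. The snippet below runs that exact query; an in-memory SQLite database stands in for PostgreSQL so the example is runnable anywhere, and the table contents are made up for illustration:

```python
import pandas as pd
from sqlalchemy import create_engine

# SQLite stands in for PostgreSQL; the SQL itself works unchanged on Postgres.
engine = create_engine("sqlite://")

pd.DataFrame({
    "id": [1, 2, 3],
    "total_amount": [150.0, 80.0, 220.0],
    "payment_method": ["cash", "cash", "card"],
    "customer_type": ["member", "member", "member"],
}).to_sql("transactions", engine, index=False)

# "Show me all cash transactions over $100 from member customers"
big_cash = pd.read_sql(
    """
    SELECT id, total_amount
    FROM transactions
    WHERE payment_method = 'cash'
      AND total_amount > 100
      AND customer_type = 'member'
    """,
    engine,
)
print(big_cash["id"].tolist())  # [1] — only the first row passes all three filters
```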