Build a Data Pipeline: Data Science Capstone Project

Remember when data just... existed? You grabbed a CSV, opened it in Excel, and hoped for the best?
Those days are behind you now. In the real world, data lives everywhere, streaming from APIs, sitting in databases, arriving in files from partners. And it's messy. Duplicates, missing values, inconsistent formats, outliers that make you question everything.
This is where data pipelines come in. They're the backbone of every serious data science operation: systems that ingest, validate, clean, transform, and export data in a reliable, repeatable way. Think of it as an assembly line for data. Raw materials go in one end; polished, analysis-ready output comes out the other.
This capstone project pulls together everything from Cluster 7: NumPy, Pandas, Matplotlib, Seaborn, SciPy, and EDA. You'll build a production-ready pipeline that ingests data from multiple sources, validates it, cleans it, and exports it in a format ready for machine learning. You'll add logging, error handling, testing, and orchestration. By the end, you'll have a real system you could drop into a data science team, and that they'd actually use.
Let's build it.
Table of Contents
- Why Pipelines Matter (More Than You Think)
- Pipeline Architecture: The Blueprint
- Our Project: A Sales Data Pipeline
- Step 1: Set Up the Project Structure
- Step 2: Configure Logging
- Step 3: Ingest Data from Multiple Sources
- Step 4: Validate Data Structure
- Step 5: Clean and Transform
- Step 6: Aggregate and Export
- Step 7: Orchestrate the Pipeline
- Step 8: Add Incremental Processing
- Step 9: Write Tests
- Running It All Together
- Key Takeaways
- Common Mistakes to Avoid
- What's Next
Why Pipelines Matter (More Than You Think)
Here's the reality: data science projects fail not because the models are wrong, but because the data pipeline is fragile. You spend 80% of your time preparing data. If that process breaks, your entire project stops. Think about it: you build a beautiful machine learning model that achieves 95% accuracy. Then the data stops flowing for a day because someone renamed a column in the source system. Your model can't run. Your insights are delayed. Your stakeholders are frustrated.
This happens constantly in the real world because data engineers and data scientists work in silos. The engineer assumes the scientist knows what format the data will be in. The scientist assumes the engineer won't change anything without notice. Nobody wins.
A good pipeline solves this by making data preparation explicit, repeatable, and resilient:
- Handles failures gracefully – Missing a day of data shouldn't crash the system. You log what went wrong, alert the team, and keep going with what you have.
- Is repeatable – Run it 100 times, get the same output every time. No more "it worked yesterday but not today" mysteries.
- Is transparent – You know what happened at each step. Logging isn't optional; it's critical. When something breaks, you have the full story.
- Scales – Works with 1MB or 10GB without refactoring. The logic stays the same; only the data volume changes.
- Is testable – You can verify each component works before running the whole thing. Unit tests for transforms, integration tests for the full flow.
Without a pipeline, you're writing Jupyter notebooks for each data source, copy-pasting transforms, hoping nothing changes. Add a new column? Better find every place you referenced the old column count. Change the API response format? Welcome to two hours of debugging. That's not data science. That's chaos.
The industry's best practitioners learned this lesson the hard way. Now they build pipelines first, ask questions later.
Pipeline Architecture: The Blueprint
Every solid pipeline has these layers, in order. Think of it as an assembly line where each station has one job: it trusts that the station before it handed over valid input, and it guarantees valid output for the station after it. This separation of concerns makes debugging easier and testing faster.
┌─────────────────────────────────────┐
│ 1. INGEST: Load from sources │
├─────────────────────────────────────┤
│ 2. VALIDATE: Check structure/type │
├─────────────────────────────────────┤
│ 3. CLEAN: Fix missing, duplicates │
├─────────────────────────────────────┤
│ 4. TRANSFORM: Feature engineering │
├─────────────────────────────────────┤
│ 5. AGGREGATE: Rollup/summary stats │
├─────────────────────────────────────┤
│ 6. EXPORT: Write to final format │
└─────────────────────────────────────┘
Each layer is a separate function with clear inputs, outputs, and error handling. No surprises. No magic. Why split it up instead of doing everything in one giant function? Because when something breaks, and it will, you need to know which step failed. If you have one 500-line function, you're debugging blind. If you have six focused functions, you know exactly where to look.
The beauty of this architecture is that you can test each layer independently. You can mock the ingestion layer and test validation in isolation. You can test cleaning without ever touching the original files. This keeps your test suite fast and your debugging straightforward.
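To make the layering concrete, here's a minimal toy sketch of the idea: each stage is a plain function, and a small runner chains them so a failure immediately names the broken stage. The stage names and data here are illustrative, not the project's real code.

```python
def run_stages(data, stages):
    """Run each (name, func) stage in order; report which stage failed."""
    for name, func in stages:
        try:
            data = func(data)
        except Exception as exc:
            raise RuntimeError(f"stage '{name}' failed: {exc}") from exc
    return data

# Toy stages standing in for ingest/validate/clean
stages = [
    ("ingest", lambda _: [" 3", "1", "1", "bad", "2"]),      # raw, messy strings
    ("validate", lambda rows: [r.strip() for r in rows if r.strip().isdigit()]),
    ("clean", lambda rows: sorted(set(int(r) for r in rows))),  # dedupe + type
]

result = run_stages(None, stages)  # → [1, 2, 3]
```

Because each stage is its own function, you can call any one of them in a test with hand-built input, without running the stages before it.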
Our Project: A Sales Data Pipeline
Imagine you're building a report for an e-commerce platform. You're the go-to person for getting data to the analytics team, the ML engineers, and the business stakeholders. Data arrives from three messy places, and everyone wants a single source of truth. Sound familiar?
Here's what you're dealing with:
- Orders CSV – Daily export from the database. Sometimes it arrives at 3am, sometimes at 4pm. Sometimes it has duplicate rows. Sometimes a column gets renamed. You never know.
- Products API – Real-time product catalog. It's paginated, so pulling the full catalog means many requests. It sometimes times out. Prices change hourly.
- SQLite DB – Customer history. It's append-only, so you'll accumulate historical records. Schema changes are rare but do happen without warning.
Your job: combine these, validate them to catch issues immediately, clean them, and export a single Parquet file ready for analysis or ML. The output needs to be consistent, trustworthy, and repeatable. Run the same pipeline twice and get the exact same results (or deliberately get new data if that's what you want).
Let's build it step by step. We'll start with project structure, then add logging, ingestion, validation, cleaning, transformation, and orchestration. By the end, you'll have a system you could hand to a colleague and they'd know exactly how to run it.
Step 1: Set Up the Project Structure
Before you write any code, you need a home for your pipeline. A good directory structure makes it easy for teammates to understand how the code is organized and where things go. It also prevents the common mistake of having notebooks and scripts scattered everywhere, each with its own idea of where "raw" data lives.
mkdir data-pipeline && cd data-pipeline
mkdir -p data/{raw,processed} logs
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install pandas pandera requests sqlalchemy pyarrow loguru

Notice a few things here: we create separate directories for raw and processed data. This prevents you from accidentally overwriting your source files, and it makes it clear which data is trusted and which isn't. We also create a logs/ directory upfront; logging is not an afterthought in production code. The pip install line includes some packages you haven't used yet: pandera for data validation (catches bad data before it spreads), requests for API calls, sqlalchemy for database access, pyarrow for Parquet export, and loguru for better logging than Python's default.
Your directory looks like this:
data-pipeline/
├── data/
│ ├── raw/ # Downloaded/ingested files go here
│ └── processed/ # Final output
├── logs/ # Pipeline execution logs
├── pipeline.py # Main orchestration
├── sources.py # Data ingestion functions
├── validators.py # Data validation schemas
├── transforms.py # Cleaning & transformation logic
├── tests/ # Unit & integration tests
│ ├── test_sources.py
│ ├── test_transforms.py
│ └── test_pipeline.py
└── config.yaml # Configuration (paths, thresholds)
This layout keeps concerns separated: ingestion logic is in sources.py, validation is in validators.py, cleaning and transformation live in transforms.py. When you have a bug in data cleaning, you know exactly which file to look in. When you want to add a new data source, you add a function to sources.py and maybe a schema to validators.py. Tests live in their own directory so they don't clutter the main code.
Step 2: Configure Logging
Production pipelines must be transparent. Imagine running your pipeline and it crashes. Do you know which step failed? How many rows were processed? Did the API time out on the first retry or the fifth? Without logging, you're debugging blind. With logging, you have a complete trail of what happened. This is non-negotiable in production.
We'll use loguru instead of Python's built-in logging module because it's simpler and handles file rotation automatically. Here's the setup:
# pipeline.py
from loguru import logger
import sys
from datetime import datetime
# Remove default handler, add file handler
logger.remove()
log_file = f"logs/pipeline_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
logger.add(
log_file,
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} | {message}",
level="DEBUG"
)
logger.add(sys.stderr, format="{message}", level="INFO")
logger.info("Pipeline starting...")

What's happening here? We remove the default handler (which prints to stderr) and add two new ones: one to a file (for keeping history) and one to stderr (so you see messages in real time). The file gets a timestamp in the name so each run creates a new file. We log at DEBUG level to the file (verbose, for debugging) and INFO level to the console (summary info only). The format string tells us exactly where each log came from: time, severity level, file name, function name, line number, and message.
Every step in your pipeline will call logger.info() or logger.error() as appropriate. When something breaks, you open the log file and see exactly what happened and where. This is the difference between "the pipeline failed" and "the API timed out on the 3rd retry while fetching the products list at 14:32:15."
Step 3: Ingest Data from Multiple Sources
Now comes the hard part: actually getting the data. In the real world, data lives in many places with different formats and access patterns. One source is a CSV file that someone emails you. Another is an API you need to call. A third is a database table. Each one needs a different approach, but they should all return the same thing: a clean Pandas DataFrame.
The key insight here is separation of concerns. You don't want to mix ingestion logic with validation logic. You don't want your cleaning function to know whether the data came from a file or an API. Each ingestion function returns a DataFrame; what's inside it isn't the ingestion function's problem (that's validation's job).
# sources.py
import pandas as pd
import requests
from sqlalchemy import create_engine
from loguru import logger
from pathlib import Path
def ingest_orders_csv(filepath: str) -> pd.DataFrame:
"""Load orders from CSV export."""
logger.info(f"Ingesting CSV from {filepath}")
try:
df = pd.read_csv(filepath)
logger.info(f"Loaded {len(df)} order records")
return df
except FileNotFoundError:
logger.error(f"File not found: {filepath}")
raise
def ingest_products_api(api_url: str, batch_size: int = 100) -> pd.DataFrame:
"""Fetch products from REST API with pagination."""
logger.info(f"Ingesting products from {api_url}")
all_products = []
offset = 0
while True:
logger.debug(f"Fetching batch at offset {offset}")
try:
response = requests.get(
api_url,
params={"offset": offset, "limit": batch_size},
timeout=10
)
response.raise_for_status()
data = response.json()
if not data:
break
all_products.extend(data)
offset += batch_size
except requests.RequestException as e:
logger.error(f"API request failed: {e}")
raise
df = pd.DataFrame(all_products)
logger.info(f"Fetched {len(df)} products")
return df
def ingest_customers_sqlite(db_path: str, query: str) -> pd.DataFrame:
"""Load customer data from SQLite."""
logger.info(f"Querying SQLite from {db_path}")
try:
engine = create_engine(f"sqlite:///{db_path}")
df = pd.read_sql(query, engine)
logger.info(f"Loaded {len(df)} customer records")
return df
except Exception as e:
logger.error(f"SQLite query failed: {e}")
raise

Notice several important patterns here:
Each function does one thing – The CSV loader loads CSVs. The API loader fetches from APIs. The database loader queries databases. This makes it easy to test each one independently and easy to add new sources (just write a new function).
Clear input/output contracts – You know exactly what goes in (filepath, URL, database path) and what comes out (a Pandas DataFrame). No hidden dependencies.
Verbose logging – At the start, we log what we're doing and from where. At the end, we log how many records we got. If something goes wrong in the middle, you can see which batch failed. In production, you want to be able to read the log file and understand exactly what happened.
Error handling – We don't silently fail. If a file is missing, we log it and raise the exception (so the pipeline stops and someone notices). If the API times out, we log that too. A common mistake is ignoring errors and returning empty DataFrames, which causes downstream problems that are much harder to debug.
One more thing to note: the API ingestion uses pagination. Many real-world APIs don't return all data in one request; they return it in chunks. Here we fetch batch by batch until we get an empty response, indicating we've reached the end. This pattern handles APIs that return 100 items per page gracefully, whether there are 100 total items or 100,000.
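Related to those timeouts: the ingestion above raises on the first failure, but flaky network calls are often wrapped in a retry helper with exponential backoff. Here's a generic stdlib sketch; `with_retries` is a hypothetical name, not part of the project files.

```python
import time

def with_retries(fn, attempts=3, base_delay=1.0):
    """Call fn(); on failure, wait exponentially longer and retry.
    Raises the last exception if every attempt fails."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == attempts:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))

# Demo: a call that fails twice, then succeeds on the third try
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("timeout")
    return "ok"

result = with_retries(flaky, attempts=5, base_delay=0.01)  # → "ok" on the 3rd call
```

In the pipeline you'd wrap just the `requests.get` call, so a transient timeout doesn't kill an otherwise healthy run.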
Step 4: Validate Data Structure
At this point, you have raw data from three sources. But is it what you expected? Did the CSV file get corrupted? Did the API change its response format? Did someone modify the database schema? These things happen all the time, and if you don't catch them immediately, garbage data propagates through your pipeline, corrupting your analysis and wasting everyone's time.
This is where validation comes in. We'll use pandera, which lets you define schemas (rules) about what valid data should look like, then check incoming data against those rules. If something doesn't match, the pipeline stops and alerts you. Much better than discovering the problem three days later when you're explaining to your boss why the report is wrong.
# validators.py
import pandas as pd
import pandera as pa
from loguru import logger
# Define what valid data looks like
orders_schema = pa.DataFrameSchema({
"order_id": pa.Column(int, checks=pa.Check.greater_than(0)),
"customer_id": pa.Column(int, checks=pa.Check.greater_than(0)),
"product_id": pa.Column(int, checks=pa.Check.greater_than(0)),
"order_date": pa.Column("object"), # Will convert to datetime
"amount": pa.Column(float, checks=pa.Check.greater_than(0)),
"status": pa.Column(str, checks=pa.Check.isin(["pending", "completed", "cancelled"]))
})
products_schema = pa.DataFrameSchema({
"product_id": pa.Column(int, checks=pa.Check.greater_than(0)),
"name": pa.Column(str),
"price": pa.Column(float, checks=pa.Check.greater_than(0)),
"category": pa.Column(str),
})
customers_schema = pa.DataFrameSchema({
"customer_id": pa.Column(int, checks=pa.Check.greater_than(0)),
"name": pa.Column(str),
"email": pa.Column(str),
"signup_date": pa.Column("object"),
})
def validate_orders(df: pd.DataFrame) -> pd.DataFrame:
"""Validate orders against schema."""
logger.info(f"Validating {len(df)} order records")
try:
validated = orders_schema.validate(df)
logger.info("Orders validation passed")
return validated
except pa.errors.SchemaError as e:
logger.error(f"Orders validation failed: {e}")
raise
def validate_products(df: pd.DataFrame) -> pd.DataFrame:
"""Validate products against schema."""
logger.info(f"Validating {len(df)} products")
try:
validated = products_schema.validate(df)
logger.info("Products validation passed")
return validated
except pa.errors.SchemaError as e:
logger.error(f"Products validation failed: {e}")
raise
def validate_customers(df: pd.DataFrame) -> pd.DataFrame:
"""Validate customers against schema."""
logger.info(f"Validating {len(df)} customer records")
try:
validated = customers_schema.validate(df)
logger.info("Customers validation passed")
return validated
except pa.errors.SchemaError as e:
logger.error(f"Customers validation failed: {e}")
raise

Look at what the schemas are checking:
- Correct columns – Does the DataFrame have the columns we expect (order_id, customer_id, etc.)?
- Correct types – Is order_id an integer? Is amount a float? Pandas sometimes guesses wrong when reading CSVs.
- Valid values – Are all order_ids greater than zero (no negative IDs)? Are all statuses one of the allowed values?
This catches issues immediately. If the API adds a new field that breaks deserialization, you know right away. If someone changes a database column from int to string, you'll see it during validation, not during downstream analysis. If a CSV file gets corrupted and has negative amounts, you catch it. No garbage in, no garbage out.
The error handling is important too: if validation fails, we log the error and raise the exception. This stops the pipeline immediately. It's better to alert someone that validation failed than to silently continue with bad data.
Step 5: Clean and Transform
Now that we've validated the structure, it's time to fix the content. Real-world data is messy. You'll have duplicate rows (a customer appears twice because someone imported the file twice). You'll have missing values (a product has no category). You'll have inconsistent formats (dates stored as strings, categories in mixed case). Your job is to clean all of this up.
Why is this important? Because garbage input produces garbage output. If you merge duplicated orders, your totals will be wrong. If you don't convert date strings to actual datetime objects, you can't do time-series analysis. If categories are sometimes "Electronics" and sometimes "electronics", your grouping won't work. Cleaning is tedious but essential.
# transforms.py
import pandas as pd
from loguru import logger
from datetime import datetime
def clean_orders(df: pd.DataFrame) -> pd.DataFrame:
"""Clean orders data."""
logger.info(f"Cleaning {len(df)} order records")
df = df.copy()
# Convert order_date to datetime
df["order_date"] = pd.to_datetime(df["order_date"], errors="coerce")
# Remove duplicates (keep first occurrence)
before = len(df)
df = df.drop_duplicates(subset=["order_id"], keep="first")
logger.info(f"Removed {before - len(df)} duplicate orders")
# Remove rows with null order_id (required field)
df = df.dropna(subset=["order_id"])
# Add derived columns
df["year"] = df["order_date"].dt.year
df["month"] = df["order_date"].dt.month
df["day_of_week"] = df["order_date"].dt.day_name()
logger.info(f"Cleaned down to {len(df)} valid records")
return df
def clean_products(df: pd.DataFrame) -> pd.DataFrame:
"""Clean products data."""
logger.info(f"Cleaning {len(df)} products")
df = df.copy()
# Remove duplicates (keep first)
before = len(df)
df = df.drop_duplicates(subset=["product_id"], keep="first")
logger.info(f"Removed {before - len(df)} duplicate products")
# Standardize category names (lowercase, strip whitespace)
df["category"] = df["category"].str.lower().str.strip()
# Remove products with missing required fields
df = df.dropna(subset=["product_id", "name", "price"])
logger.info(f"Cleaned down to {len(df)} valid products")
return df
def clean_customers(df: pd.DataFrame) -> pd.DataFrame:
"""Clean customer data."""
logger.info(f"Cleaning {len(df)} customer records")
df = df.copy()
# Convert signup_date to datetime
df["signup_date"] = pd.to_datetime(df["signup_date"], errors="coerce")
# Remove duplicate customers
before = len(df)
df = df.drop_duplicates(subset=["customer_id"], keep="first")
logger.info(f"Removed {before - len(df)} duplicate customers")
# Remove customers with missing email (contact info required)
df = df.dropna(subset=["email"])
logger.info(f"Cleaned down to {len(df)} valid customers")
return df
def merge_data(orders: pd.DataFrame, products: pd.DataFrame,
customers: pd.DataFrame) -> pd.DataFrame:
"""Merge all tables into single analysis-ready dataset."""
logger.info("Merging orders with products and customers")
# Merge orders with products
merged = orders.merge(products, on="product_id", how="left", suffixes=("_order", "_product"))
logger.info(f"After product merge: {len(merged)} rows")
# Merge with customers
merged = merged.merge(customers, on="customer_id", how="left", suffixes=("", "_customer"))
logger.info(f"After customer merge: {len(merged)} rows")
# Clean up column names (merge adds suffixes we don't need)
merged.columns = [col.replace("_order", "").replace("_product", "").replace("_customer", "")
for col in merged.columns]
logger.info(f"Final merged dataset: {len(merged)} rows × {len(merged.columns)} columns")
return merged

Notice every function logs its work. When you run this, you'll see lines like "Removed 42 duplicate orders" and "Cleaned down to 9,958 valid records." This tells you exactly how much data was lost at each step. If you suddenly lose 90% of your data, you'll see it and investigate.
A few key patterns worth understanding:
- Always copy the DataFrame first (df = df.copy()). This prevents the infamous SettingWithCopyWarning and keeps you from accidentally modifying the original data.
- Track before/after row counts (before = len(df)). When something unexpected happens downstream, you can trace it back to which cleaning step caused the problem.
- Use errors="coerce" when converting date strings. This converts unparseable dates to NaT (Not a Time) instead of crashing, letting you see which records have bad dates.
- Standardize strings (lowercase, strip whitespace). Your categories need to be consistent so grouping and merging work correctly.
The merge at the end combines all three DataFrames into one. We use how="left" so we keep all orders even if there's no matching product or customer. This prevents data loss, though you'll want to investigate any orders with missing products or customers.
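One way to do that investigation is Pandas' merge indicator, which flags rows that found no match. A small sketch with toy data (not the project's real tables):

```python
import pandas as pd

orders = pd.DataFrame({"order_id": [1, 2, 3], "product_id": [100, 101, 999]})
products = pd.DataFrame({"product_id": [100, 101], "name": ["Widget", "Gadget"]})

# indicator=True adds a _merge column: "both" for matched rows, "left_only" for orphans
merged = orders.merge(products, on="product_id", how="left", indicator=True)
unmatched = merged[merged["_merge"] == "left_only"]
# order 3 references product 999, which isn't in the catalog
```

Logging `len(unmatched)` after the merge gives you an early warning that the product catalog and the orders feed have drifted apart.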
Step 6: Aggregate and Export
At this point, you have clean, merged data. Now you need to extract insights and prepare the output. Aggregation means rolling up the data into summaries (total spend per customer, number of orders, etc.). Exporting means writing it to a format that's ready for the next stage: analysis or machine learning.
# transforms.py (continued)
from pathlib import Path  # used by export_parquet below to report file size
def aggregate_by_customer(df: pd.DataFrame) -> pd.DataFrame:
"""Aggregate order metrics by customer."""
logger.info("Aggregating metrics by customer")
agg = df.groupby("customer_id").agg({
"order_id": "count", # Number of orders
"amount": ["sum", "mean"], # Total and average spend
"order_date": "max", # Most recent order
"name": "first", # Customer name
"email": "first", # Email
}).reset_index()
# Flatten the multi-level column names into plain names
agg.columns = ["customer_id", "order_count", "total_spend", "avg_spend", "last_order_date", "name", "email"]
logger.info(f"Aggregated to {len(agg)} unique customers")
return agg
def export_parquet(df: pd.DataFrame, output_path: str, compression: str = "snappy") -> None:
"""Export DataFrame to Parquet format."""
logger.info(f"Exporting {len(df)} rows to {output_path}")
try:
df.to_parquet(output_path, compression=compression, index=False)
file_size = Path(output_path).stat().st_size / (1024 * 1024) # MB
logger.info(f"Successfully exported ({file_size:.2f} MB)")
except Exception as e:
logger.error(f"Export failed: {e}")
raise

Why aggregation? Because the raw transactional data (one row per order) is great for some analyses but not others. If you want to predict customer lifetime value or segment customers by behavior, you need customer-level features. The aggregation function creates those: order count, total spend, average order value, and most recent purchase date. These become the features your machine learning model will use.
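If the multi-level column flattening feels awkward, Pandas also supports named aggregation, which produces flat column names directly. The same metrics, sketched with toy data:

```python
import pandas as pd

df = pd.DataFrame({
    "customer_id": [10, 10, 20],
    "order_id": [1, 2, 3],
    "amount": [100.0, 50.0, 200.0],
})

# Named aggregation: output_name=(input_column, function) — no multi-level columns
agg = df.groupby("customer_id").agg(
    order_count=("order_id", "count"),
    total_spend=("amount", "sum"),
    avg_spend=("amount", "mean"),
).reset_index()
```

Either approach works; named aggregation just removes the need for the column-renaming step.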
Why Parquet? It's a columnar binary format that's become the standard for data pipelines. Unlike CSV, it preserves data types (so your dates stay dates, not strings). Unlike JSON, it's highly compressed. Downstream tools like Spark, DuckDB, or Polars can read Parquet files extremely fast. It's also the native format for many ML frameworks. Snappy compression balances speed and compression ratio: it's not the most compact option, but it's very fast to decompress.
The logging at the end tells you exactly what you're exporting: how many rows, what file, and how big it is. This is crucial. If the file is 10MB but you expected 100MB, something went wrong. If it's 1GB but you expected 10MB, someone changed the pipeline and didn't tell you. The log is your early warning system.
Step 7: Orchestrate the Pipeline
All the individual functions are great, but they need a maestro to coordinate them. This is the orchestration layer: a single function that runs all the steps in the right order, handles errors, logs progress, and reports results. If step 3 fails, the whole pipeline stops, we don't silently continue and produce corrupted output.
# pipeline.py
import pandas as pd
from pathlib import Path
from loguru import logger
from datetime import datetime
from sources import ingest_orders_csv, ingest_products_api, ingest_customers_sqlite
from validators import validate_orders, validate_products, validate_customers
from transforms import (
clean_orders, clean_products, clean_customers,
merge_data, aggregate_by_customer, export_parquet
)
def run_pipeline(config: dict) -> bool:
"""Execute the entire data pipeline."""
start_time = datetime.now()
logger.info("=" * 60)
logger.info("DATA PIPELINE STARTED")
logger.info("=" * 60)
try:
# INGEST
logger.info("\n[STEP 1] Ingesting data from sources...")
orders = ingest_orders_csv(config["orders_csv"])
products = ingest_products_api(config["products_api"])
customers = ingest_customers_sqlite(config["customers_db"], config["customers_query"])
# VALIDATE
logger.info("\n[STEP 2] Validating schemas...")
orders = validate_orders(orders)
products = validate_products(products)
customers = validate_customers(customers)
# CLEAN
logger.info("\n[STEP 3] Cleaning data...")
orders = clean_orders(orders)
products = clean_products(products)
customers = clean_customers(customers)
# TRANSFORM & MERGE
logger.info("\n[STEP 4] Merging datasets...")
merged = merge_data(orders, products, customers)
# AGGREGATE
logger.info("\n[STEP 5] Aggregating metrics...")
customer_metrics = aggregate_by_customer(merged)
# EXPORT
logger.info("\n[STEP 6] Exporting results...")
output_path = config["output_path"]
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
export_parquet(merged, output_path)
# SUCCESS
elapsed = (datetime.now() - start_time).total_seconds()
logger.info("\n" + "=" * 60)
logger.info(f"PIPELINE COMPLETED SUCCESSFULLY ({elapsed:.2f}s)")
logger.info("=" * 60)
return True
except Exception as e:
logger.error(f"\nPIPELINE FAILED: {e}")
logger.error("=" * 60)
return False
if __name__ == "__main__":
config = {
"orders_csv": "data/raw/orders.csv",
"products_api": "https://api.example.com/products",
"customers_db": "data/raw/customers.db",
"customers_query": "SELECT * FROM customers",
"output_path": "data/processed/sales_pipeline.parquet"
}
success = run_pipeline(config)
exit(0 if success else 1)

This is the conductor. Notice the structure: each step depends on the previous one. If step 3 (cleaning) fails, we never reach step 4 (merging). If step 2 (validation) fails, we don't corrupt the output by writing invalid data. The whole thing is wrapped in a try/except block, so if anything goes wrong anywhere, we log it and return False.
The configuration dictionary at the bottom holds all the settings: where files are, what API to call, which database to query, where to write output. This makes it easy to change paths without modifying code. In production, you'd load this from a YAML or JSON file or environment variables, so different environments (dev, test, production) can have different paths without code changes.
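A minimal sketch of that externalized configuration, using JSON from the standard library (the project's config.yaml would work the same with a YAML parser; `load_config` is a hypothetical helper, not part of the files above):

```python
import json
import tempfile

def load_config(path, overrides=None):
    """Read settings from a JSON file; explicit per-environment overrides win."""
    with open(path) as f:
        config = json.load(f)
    config.update(overrides or {})
    return config

# Demo with a throwaway config file (same keys as the dict in pipeline.py)
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    json.dump({"orders_csv": "data/raw/orders.csv",
               "output_path": "data/processed/sales_pipeline.parquet"}, f)
    cfg_path = f.name

# A test environment overrides only the output path
config = load_config(cfg_path, overrides={"output_path": "/tmp/test.parquet"})
```

The override mechanism is what lets dev, test, and production share one config file while differing only where they must.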
The return value (True/False) is used for the exit code. If the pipeline succeeds, exit(0) tells the operating system "this ran successfully." If it fails, exit(1) tells the OS "something went wrong." This matters for orchestration tools like Airflow or cron, which use exit codes to decide whether to alert someone or retry.
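You can watch the exit-code contract from Python itself, with a throwaway subprocess standing in for the pipeline:

```python
import subprocess
import sys

# A child process that "fails" the way run_pipeline does via exit(1)
proc = subprocess.run([sys.executable, "-c", "raise SystemExit(1)"])
# proc.returncode is exactly what cron or Airflow inspect:
# 0 means success, nonzero means alert or retry
```

Swap `1` for `0` in the child command and `returncode` follows, which is the whole mechanism orchestration tools rely on.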
Step 8: Add Incremental Processing
In the basic pipeline, every time you run it, you ingest all data from scratch. If you have a million orders, you reprocess all of them every run. If you run this every day, that's a lot of wasted work. And if the API has rate limits, you might hit them and get throttled.
The solution is incremental processing: only process data that's new since the last successful run. This is where watermarks come in. A watermark is a timestamp that says "the last time we successfully ran this pipeline was at 2024-02-25 14:30:00. Only process orders newer than that."
# pipeline.py (continued)
def get_last_processed_date(watermark_file: str) -> datetime:
"""Read the last successful processing timestamp."""
if Path(watermark_file).exists():
with open(watermark_file) as f:
return datetime.fromisoformat(f.read().strip())
return datetime(1900, 1, 1) # Default: process everything
def update_watermark(watermark_file: str) -> None:
"""Update watermark to current time."""
Path(watermark_file).parent.mkdir(parents=True, exist_ok=True)
with open(watermark_file, "w") as f:
f.write(datetime.now().isoformat())
def run_pipeline_incremental(config: dict) -> bool:
"""Run pipeline, processing only new data since last run."""
watermark_file = config.get("watermark_file", "data/.watermark")
last_run = get_last_processed_date(watermark_file)
logger.info(f"Last successful run: {last_run}")
try:
# Ingest only new records
logger.info(f"Fetching orders modified after {last_run}...")
orders = ingest_orders_csv(config["orders_csv"])
orders = orders[pd.to_datetime(orders["order_date"]) > last_run]
logger.info(f"Found {len(orders)} new orders")
# ... rest of pipeline
update_watermark(watermark_file)
return True
except Exception as e:
logger.error(f"Pipeline failed, watermark NOT updated: {e}")
return False

Notice the important detail: we update the watermark only if the pipeline succeeds. If the pipeline fails partway through, the watermark doesn't change, so next time we'll retry the same data. This prevents data loss if the pipeline crashes.
The filtering logic (orders[pd.to_datetime(orders["order_date"]) > last_run]) assumes your data has a timestamp column that tells you when it was created or modified. Not all data sources have this. CSVs might not. APIs might return all records every time. In those cases, you need different strategies: you might append to a database instead of replacing files, or you might use checksums to detect duplicates. The principle is the same: only process what's new.
Incremental processing is crucial for production pipelines. It makes them faster, cheaper (fewer API calls), and more resilient to failures. The first time you run it, the watermark is from 1900, so you process everything. After that, you process only what's changed.
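For sources without timestamps, the checksum strategy mentioned above might look like this stdlib sketch (`row_checksum` and the sample rows are illustrative, not from the project):

```python
import hashlib
import json

def row_checksum(row):
    """Stable hash of a record's contents (sorted keys make it deterministic)."""
    payload = json.dumps(row, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()

# Pretend "seen" was persisted from the previous run, like the watermark file
seen = set()
new_rows = []
for row in [{"order_id": 1, "amount": 100},
            {"order_id": 1, "amount": 100},   # exact duplicate, skipped
            {"order_id": 2, "amount": 50}]:
    h = row_checksum(row)
    if h not in seen:
        seen.add(h)
        new_rows.append(row)
# new_rows keeps orders 1 and 2 once each
```

The principle is identical to the watermark: persist a small piece of state between runs so you only process what's genuinely new.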
Step 9: Write Tests
You've built a beautiful pipeline. Now you need confidence that it actually works. This is where tests come in. A test is a small program that checks whether a function behaves as expected. It catches bugs before they reach production. It documents how your code is supposed to work. It gives you the confidence to refactor without breaking things.
```python
# tests/test_transforms.py
import pandas as pd
import pytest

from transforms import clean_orders, clean_products, merge_data


def test_clean_orders_removes_duplicates():
    """Duplicates should be removed, keeping first."""
    df = pd.DataFrame({
        "order_id": [1, 1, 2, 3],
        "customer_id": [10, 10, 20, 30],
        "order_date": ["2024-01-01", "2024-01-01", "2024-01-02", "2024-01-03"],
        "amount": [100, 100, 200, 150],
        "status": ["completed", "completed", "pending", "completed"],
    })
    result = clean_orders(df)
    assert len(result) == 3, "Should remove one duplicate"
    assert result["order_id"].tolist() == [1, 2, 3]


def test_clean_products_standardizes_categories():
    """Category names should be lowercase."""
    df = pd.DataFrame({
        "product_id": [1, 2, 3],
        "name": ["Widget", "Gadget", "Tool"],
        "price": [10.0, 20.0, 30.0],
        "category": ["ELECTRONICS", " Home ", "tools"],
    })
    result = clean_products(df)
    assert result["category"].tolist() == ["electronics", "home", "tools"]


def test_merge_data_combines_tables():
    """Merged data should have columns from all sources."""
    orders = pd.DataFrame({
        "order_id": [1, 2],
        "customer_id": [10, 20],
        "product_id": [100, 101],
        "amount": [50.0, 75.0],
    })
    products = pd.DataFrame({
        "product_id": [100, 101],
        "name": ["Widget", "Gadget"],
        "price": [25.0, 40.0],
    })
    customers = pd.DataFrame({
        "customer_id": [10, 20],
        "name": ["Alice", "Bob"],
        "email": ["alice@example.com", "bob@example.com"],
    })
    result = merge_data(orders, products, customers)
    assert len(result) == 2
    assert "name" in result.columns
    assert "email" in result.columns

# Run: pytest tests/
```

To run these tests, run pytest tests/ from your project directory. Pytest discovers and runs every function whose name starts with test_. Each test builds a small sample DataFrame, runs a transform, and checks that the output is correct.
Why test each transform independently? Because when something breaks in production, you want to pinpoint the problem fast. If you test the whole pipeline as one thing and it fails, where do you look? All of it. If you test individual transforms, you know exactly which one is broken. You can fix that one function while the rest continues working.
The tests also serve as documentation. Someone reading test_clean_orders_removes_duplicates learns exactly what that function should do. The tests are also your safety net when you refactor. Change the implementation of clean_orders but keep the same behavior? All tests still pass. Change the behavior accidentally? Tests fail and you notice immediately.
A common mistake is skipping tests or writing minimal ones. "I'll test manually," people say. But manual testing doesn't scale: every change to clean_orders means testing it by hand again, while automated tests run in milliseconds. Another common mistake is testing the wrong thing. Don't test that Pandas works (it already has tests); test that YOUR code does what you expect. Test edge cases: empty DataFrames, missing columns, invalid values. Test the happy path and the sad path.
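To make the edge-case point concrete, here's a sketch of what such tests look like. The clean_orders here is a minimal stand-in (dedupe plus dropping missing amounts) defined inline, since the real function lives in transforms.py:

```python
import pandas as pd


def clean_orders(df: pd.DataFrame) -> pd.DataFrame:
    """Minimal stand-in for the real transform: dedupe, drop missing amounts."""
    return df.drop_duplicates(subset="order_id").dropna(subset=["amount"])


def test_clean_orders_handles_empty_dataframe():
    """The sad path: an empty input should produce an empty output, not crash."""
    df = pd.DataFrame(columns=["order_id", "customer_id", "amount"])
    result = clean_orders(df)
    assert len(result) == 0
    assert list(result.columns) == ["order_id", "customer_id", "amount"]


def test_clean_orders_drops_missing_amounts():
    """Rows with a missing amount should be removed."""
    df = pd.DataFrame({
        "order_id": [1, 2],
        "customer_id": [10, 20],
        "amount": [100.0, None],
    })
    result = clean_orders(df)
    assert result["order_id"].tolist() == [1]
```

The empty-DataFrame test is the one most people forget, and it's exactly the case an incremental pipeline hits on a quiet day with no new orders.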
Running It All Together
Once everything is in place, running your pipeline is simple:
```shell
# From the pipeline directory
python pipeline.py

# You'll see:
# ============================================================
# DATA PIPELINE STARTED
# ============================================================
# 2024-02-25 14:30:22 | INFO | Ingesting CSV from data/raw/orders.csv
# 2024-02-25 14:30:23 | INFO | Loaded 50000 order records
# 2024-02-25 14:30:24 | INFO | Fetched 2500 products
# ...
# ============================================================
# PIPELINE COMPLETED SUCCESSFULLY (12.45s)
# ============================================================
```

This output tells you everything you need to know: when the run started, what it loaded, how many records came from each source, when it finished, and how long it took. If something goes wrong, you'll see an error message pointing to the exact problem. All of this detail is also written to a log file in the logs/ directory, which you can review later or send to a colleague for debugging.
Your output Parquet file is now in data/processed/sales_pipeline.parquet, ready for analysis or machine learning. It contains clean, validated, merged data from three sources. You can load it with:
```python
import pandas as pd

df = pd.read_parquet("data/processed/sales_pipeline.parquet")
print(df.shape)   # (10000, 12) or whatever your data is
print(df.head())  # See the first few rows
```

The beauty of Parquet is that it preserves column data types and applies the compression you configured. No more guessing whether a column is a string or a number. The file is also much smaller than the equivalent CSV, so it loads faster.
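To see what Parquet buys you, compare with a CSV round trip, which silently loses type information. A small self-contained demonstration (the zip-code column is illustrative):

```python
import io

import pandas as pd

df = pd.DataFrame({"zip_code": ["02134", "10001"], "amount": [10.5, 20.0]})

# Round-trip through CSV: the string column comes back as integers,
# silently dropping the leading zero.
buf = io.StringIO()
df.to_csv(buf, index=False)
buf.seek(0)
back = pd.read_csv(buf)

print(df["zip_code"].tolist())    # ['02134', '10001']
print(back["zip_code"].tolist())  # [2134, 10001]
```

Parquet stores the column as strings and returns strings; CSV forces every reader to re-guess the types on load.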
Key Takeaways
You've just built a production-grade data pipeline. Here's what makes it solid:
- Modular – Each function does one thing and is testable in isolation. When something breaks, you know exactly where. When you need to add a feature, you know exactly where to add it.
- Observable – Logging at every step, clear failure messages. You can read the logs and understand what happened, even days or weeks later. You can see how many records were lost at each step, what errors occurred, and how long everything took.
- Validated – Pandera catches data quality issues immediately. Bad data gets flagged before it spreads to downstream systems where it's harder to debug.
- Scalable – Works with growing data volumes. The logic doesn't change whether you have 1,000 orders or 1 million. If performance becomes an issue, you can optimize individual functions without rewriting everything.
- Testable – Unit tests for transforms, integration tests for the overall flow. You can verify each piece works before putting it together. Tests make refactoring safe.
- Incremental – Watermarks avoid reprocessing. Each run processes only new data, making the pipeline faster and cheaper.
- Maintainable – Clear structure, error handling, documentation. A colleague can read your code and understand what it does. Six months from now, you can read it and remember what you were thinking.
- Reproducible – Run it a hundred times, get the same results. This is crucial for data science. You need to be able to tell stakeholders "this analysis is based on this data, processed this way, on this date."
This is exactly the pattern used in real data teams at Google, Airbnb, Netflix, and everywhere else that takes data seriously. You're not writing Jupyter notebooks anymore. You're building systems that scale, that are reliable, that can be maintained by multiple people over years.
Common Mistakes to Avoid
Before you go build your own pipelines, let me share a few things we all learn the hard way:
Mistake 1: Not logging enough. You think "I'll just print to the console." Then your pipeline runs in the background on a server and you have no idea what happened. Log everything. Log ingestion, validation, cleaning, merging, exporting. Log row counts before and after each step. Log errors with full tracebacks.
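As a sketch of that last point, logging.exception records the full traceback automatically — no manual string formatting needed. The step names and the safe_step wrapper here are illustrative, not part of the pipeline above:

```python
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
)
logger = logging.getLogger("pipeline")


def safe_step(name, func, *args):
    """Run one pipeline step, logging the full traceback on failure."""
    logger.info("Starting step: %s", name)
    try:
        return func(*args)
    except Exception:
        logger.exception("Step %s failed", name)  # includes the traceback
        raise
```

Re-raising after logging keeps the pipeline's fail-fast behavior while guaranteeing the traceback lands in the log file.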
Mistake 2: Ignoring data quality. You think "the data is probably fine." Then someone adds a NULL value to the orders table and your whole analysis breaks. Validate early, validate often. Define schemas and check them.
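Even before reaching for a library like Pandera, a few explicit checks at ingestion catch the obvious problems. A minimal hand-rolled validator (the column names mirror the order examples above):

```python
import pandas as pd

REQUIRED_ORDER_COLUMNS = {"order_id", "customer_id", "order_date", "amount", "status"}


def validate_orders(df: pd.DataFrame) -> pd.DataFrame:
    """Fail fast with a clear message instead of producing silent garbage."""
    missing = REQUIRED_ORDER_COLUMNS - set(df.columns)
    if missing:
        raise ValueError(f"orders is missing columns: {sorted(missing)}")
    if df["order_id"].isna().any():
        raise ValueError("orders contains NULL order_id values")
    if (df["amount"] < 0).any():
        raise ValueError("orders contains negative amounts")
    return df
```

Raising early means the error message names the actual problem, instead of a KeyError three steps downstream.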
Mistake 3: One giant function. Everything in one pipeline function that's 500 lines long. Then something breaks and you don't know where. Break it into small pieces. Test the pieces separately.
Mistake 4: Hardcoding paths. Your pipeline works on your laptop but not on the server because you hardcoded /Users/yourname/data/ in the code. Use configuration files or environment variables.
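A minimal sketch of the fix, using an environment variable with a relative-path fallback (the variable name PIPELINE_DATA_DIR is hypothetical):

```python
import os
from pathlib import Path

# Resolve the data directory from an environment variable, falling back
# to a relative path so the pipeline runs on any machine.
DATA_DIR = Path(os.environ.get("PIPELINE_DATA_DIR", "data"))

ORDERS_CSV = DATA_DIR / "raw" / "orders.csv"
OUTPUT_PARQUET = DATA_DIR / "processed" / "sales_pipeline.parquet"
```

Deploying to a server then means setting one environment variable, not editing code.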
Mistake 5: Assuming data never changes. Someone changes the API response format and your pipeline silently produces garbage. Add version handling. Log schema changes. Set up alerts when validation fails.
Mistake 6: Not testing. "Testing takes too long." No, debugging bad data in production takes too long. Write tests. They're fast. They're worth it.
Mistake 7: Reprocessing everything every time. Your pipeline takes an hour to run because it reprocesses a year of data every day. Use watermarks or checksums to process only new data.
These mistakes are all fixable. Most data engineers have made all of them. The ones who succeed are the ones who learn from them and build better systems next time.
What's Next
You've completed Cluster 7: Data Science with Python. You now know how to acquire, explore, and prepare data at scale. You can ingest from multiple sources, validate data quality, clean and transform, and export in production-ready formats. You can build pipelines that are maintainable, testable, and scalable.
The next cluster, Cluster 8: Machine Learning Foundations, will teach you what to do with that prepared data. Models. Training. Evaluation. Deployment. You'll take these clean, validated datasets and train algorithms that make predictions. You'll learn about supervised and unsupervised learning, training and test sets, feature importance, and model evaluation.
You have all the data science fundamentals. Time to build AI.