
How to Build a Data Migration Pipeline: ETL Patterns and Validation

Step-by-step guide to migrating data between systems. Covers schema mapping, ETL pipeline construction, data validation, and zero-downtime cutover strategies.

Data migration is where good intentions meet messy reality. The source system never has the data you expected, the mapping is never 1:1, and there’s always a column called misc_field_3 that everyone depends on but nobody can explain. This guide gives you a structured approach.


Step 1: Profile Your Source Data

Never trust the documentation. Profile the actual data.

1.1 SQL-Based Profiling

-- Column-level profiling: nulls, cardinality, min/max
-- (run once per column of interest; substitute the column name in each aggregate)
SELECT
    COUNT(*) AS total_rows,
    COUNT(your_column) AS non_null,
    COUNT(*) - COUNT(your_column) AS null_count,
    ROUND(100.0 * (COUNT(*) - COUNT(your_column)) / COUNT(*), 2) AS null_pct,
    COUNT(DISTINCT your_column) AS distinct_values,
    MIN(your_column) AS min_value,
    MAX(your_column) AS max_value
FROM your_source_table;

1.2 Python Profiling

import pandas as pd

# conn: an open connection to the source database
df = pd.read_sql("SELECT * FROM cust_master", conn)

# Generate profiling report
profile = pd.DataFrame({
    'dtype': df.dtypes,
    'non_null': df.count(),
    'null_pct': (df.isnull().sum() / len(df) * 100).round(2),
    'unique': df.nunique(),
    'sample': df.iloc[0]
})
print(profile.to_markdown())  # requires the tabulate package

Step 2: Build Your Schema Mapping Document

The mapping document is the contract between source and target. Every stakeholder must sign off.

| Source Table | Source Column | Type | Target Table | Target Column | Type | Transform |
| --- | --- | --- | --- | --- | --- | --- |
| cust_master | cust_id | INT | customers | id | UUID | Generate UUID |
| cust_master | cust_nm | VARCHAR(50) | customers | full_name | VARCHAR(100) | Trim + Title Case |
| cust_master | phone_1 | VARCHAR(20) | customers | phone | VARCHAR(15) | Strip non-digits |
| cust_master | crt_dt | DATETIME | customers | created_at | TIMESTAMPTZ | Convert to UTC |
| cust_master | misc_field_3 | VARCHAR(100) | customers | legacy_ref | VARCHAR(100) | Pass-through |

:::tip[Golden Rule]
If a column exists in the source but you can’t map it, don’t delete it — park it in a _legacy or _unmapped table. You will need it during reconciliation.
:::
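One way to keep the mapping document and the pipeline from drifting apart is to encode the mapping as data that the ETL iterates over. A minimal sketch of that idea — the `MAPPING` structure and `apply_mapping` helper are illustrative, not part of the pipeline below:

```python
# Sketch: the mapping document as data, mirroring the table above.
MAPPING = [
    # (source_col, target_col, transform)
    ("cust_nm",      "full_name",  lambda v: v.strip().title()),
    ("phone_1",      "phone",      lambda v: "".join(ch for ch in v if ch.isdigit())),
    ("misc_field_3", "legacy_ref", lambda v: v),  # pass-through
]

def apply_mapping(source_row: dict) -> dict:
    """Apply every mapping rule to one source record."""
    return {target: fn(source_row[src]) for src, target, fn in MAPPING}
```

The transform step of the ETL can then loop over MAPPING instead of hard-coding each column, so a sign-off change to the document is a one-line change in code.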


Step 3: Build the ETL Pipeline

3.1 Python + SQLAlchemy Pattern

from sqlalchemy import create_engine, text
import pandas as pd
from datetime import datetime, timezone
import uuid

# Connection setup
source_engine = create_engine("mssql+pyodbc://...")
target_engine = create_engine("postgresql://...")

BATCH_SIZE = 5000

def extract(offset, limit):
    """Extract a batch from source. A deterministic ORDER BY is required
    for OFFSET paging to return stable, non-overlapping batches."""
    query = text("""
        SELECT cust_id, cust_nm, phone_1, crt_dt, misc_field_3
        FROM cust_master
        ORDER BY cust_id
        OFFSET :offset ROWS FETCH NEXT :limit ROWS ONLY
    """)
    return pd.read_sql(query, source_engine,
                       params={"offset": offset, "limit": limit})

def transform(df):
    """Apply business rules and data cleansing"""
    transformed = pd.DataFrame()
    transformed['id'] = [str(uuid.uuid4()) for _ in range(len(df))]
    transformed['full_name'] = df['cust_nm'].str.strip().str.title()
    transformed['phone'] = df['phone_1'].str.replace(r'[^\d]', '', regex=True)
    transformed['created_at'] = pd.to_datetime(df['crt_dt'], utc=True)
    transformed['legacy_ref'] = df['misc_field_3']
    transformed['migrated_at'] = datetime.now(timezone.utc)  # utcnow() is deprecated
    transformed['source_id'] = df['cust_id']  # Keep for reconciliation
    return transformed

def load(df):
    """Load batch into target"""
    df.to_sql('customers', target_engine,
              if_exists='append', index=False,
              method='multi', chunksize=1000)

def run_migration():
    """Execute full migration in batches"""
    offset = 0
    total = 0
    while True:
        batch = extract(offset, BATCH_SIZE)
        if batch.empty:
            break

        transformed = transform(batch)
        load(transformed)

        total += len(batch)
        offset += BATCH_SIZE
        print(f"Migrated {total} records...")

    print(f"Migration complete: {total} records")

if __name__ == "__main__":
    run_migration()
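As written, run_migration restarts from zero if it crashes mid-way. A minimal checkpoint sketch makes it resumable — the file name and helper functions here are illustrative additions, and they assume the load step is idempotent (e.g. upsert keyed on source_id) so a replayed batch does no harm:

```python
import json
import os

CHECKPOINT_FILE = "migration_checkpoint.json"  # illustrative name

def save_checkpoint(offset: int, path: str = CHECKPOINT_FILE) -> None:
    """Persist the last fully loaded offset after each batch commits."""
    with open(path, "w") as f:
        json.dump({"offset": offset}, f)

def load_checkpoint(path: str = CHECKPOINT_FILE) -> int:
    """Resume from the saved offset, or start from zero."""
    if not os.path.exists(path):
        return 0
    with open(path) as f:
        return json.load(f)["offset"]
```

In run_migration, initialize offset = load_checkpoint() and call save_checkpoint(offset) after each successful load.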

Step 4: Validate the Migration

Validation is not optional. Every migration needs, at minimum, three checks.

4.1 Row Count Reconciliation

-- Source count
SELECT COUNT(*) AS source_count FROM cust_master;

-- Target count
SELECT COUNT(*) AS target_count FROM customers;

-- They must match (or you must explain the delta)
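Across two different engines, the counts are easiest to compare in the orchestration layer. A small helper for that comparison — the function name and the explained_delta parameter (for rows intentionally excluded, e.g. soft-deleted records filtered during extract) are our own:

```python
def reconcile_counts(source_count: int, target_count: int,
                     explained_delta: int = 0) -> dict:
    """Compare row counts; explained_delta covers rows that were
    intentionally excluded from the migration."""
    delta = source_count - target_count - explained_delta
    return {
        "source": source_count,
        "target": target_count,
        "unexplained_delta": delta,
        "ok": delta == 0,
    }
```

Any nonzero unexplained_delta should block sign-off until it is accounted for.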

4.2 Checksum Validation

-- Source checksum (SQL Server)
SELECT CHECKSUM_AGG(CHECKSUM(cust_id, cust_nm, phone_1))
FROM cust_master;

-- Target checksum (PostgreSQL)
-- Note: CHECKSUM_AGG and md5 are different algorithms, so these two
-- values are not directly comparable. Compare each side against its own
-- baseline (e.g. re-run the source checksum after cutover to detect
-- drift), or compute the same hash over identically formatted strings
-- on both sides.
SELECT md5(string_agg(
    source_id::text || full_name || phone, ''
    ORDER BY source_id
)) FROM customers;
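When the engines' built-in checksums cannot be compared directly, computing the hash in the orchestration layer over identically formatted rows sidesteps the problem. A sketch — the function name and the canonical row format are our choices:

```python
import hashlib

def dataset_fingerprint(rows) -> str:
    """md5 over a canonical rendering of (key, name, phone) tuples.
    Both sides must be sorted by the same key and formatted identically."""
    h = hashlib.md5()
    for key, name, phone in sorted(rows):
        h.update(f"{key}|{name}|{phone}\n".encode("utf-8"))
    return h.hexdigest()
```

Feed it (cust_id, transformed name, transformed phone) from the source and (source_id, full_name, phone) from the target; equal digests mean the compared columns match row for row.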

4.3 Sample-Based Spot Checks

# Random sample comparison
import random
from sqlalchemy import text

source_ids = pd.read_sql(
    "SELECT cust_id FROM cust_master", source_engine
)['cust_id'].tolist()

sample = random.sample(source_ids, min(100, len(source_ids)))

for sid in sample:
    # Parameterized queries -- never interpolate IDs into SQL strings
    source_row = pd.read_sql(
        text("SELECT * FROM cust_master WHERE cust_id = :sid"),
        source_engine, params={"sid": sid}
    ).iloc[0]

    target_row = pd.read_sql(
        text("SELECT * FROM customers WHERE source_id = :sid"),
        target_engine, params={"sid": sid}
    ).iloc[0]

    # Compare transformed values
    assert target_row['full_name'] == source_row['cust_nm'].strip().title()
    print(f"✔ Record {sid} validated")

Step 5: Plan the Cutover

Zero-Downtime Strategy

  1. T-7 days: Run full historical migration
  2. T-1 day: Run incremental sync (changed records only)
  3. T-4 hours: Set source to read-only mode
  4. T-2 hours: Final incremental sync
  5. T-0: Switch application to target database
  6. T+1 hour: Validate in production
  7. T+24 hours: Confirm or rollback

-- Change tracking for incremental sync (SQL Server)
ALTER DATABASE SourceDB SET CHANGE_TRACKING = ON
    (CHANGE_RETENTION = 7 DAYS, AUTO_CLEANUP = ON);

-- Change tracking must also be enabled on each tracked table
ALTER TABLE cust_master ENABLE CHANGE_TRACKING;

-- Get changes since last sync (@last_sync_version comes from
-- CHANGE_TRACKING_CURRENT_VERSION() recorded at the previous sync)
SELECT ct.cust_id, ct.SYS_CHANGE_OPERATION
FROM CHANGETABLE(CHANGES cust_master, @last_sync_version) AS ct;
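Once CHANGETABLE returns the delta, the sync job has to split it into upserts and deletes. A minimal sketch of that dispatch — pure logic, with 'I'/'U'/'D' being SQL Server's SYS_CHANGE_OPERATION codes; the function name is ours:

```python
def partition_changes(changes):
    """Split change-tracking rows into IDs to upsert and IDs to delete.
    `changes` is an iterable of (cust_id, operation) where operation is
    'I' (insert), 'U' (update) or 'D' (delete)."""
    upserts, deletes = [], []
    for cust_id, op in changes:
        if op in ("I", "U"):
            upserts.append(cust_id)   # re-extract, re-transform, upsert
        elif op == "D":
            deletes.append(cust_id)   # remove (or soft-delete) in target
    return upserts, deletes
```

The upsert list can feed back through the same extract/transform/load functions from Step 3, keyed on source_id so replays stay idempotent.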

Migration Checklist

  • Profile source data (nulls, types, cardinality)
  • Build and sign off schema mapping document
  • Implement ETL pipeline with batch processing
  • Row count reconciliation (source = target)
  • Checksum validation
  • Sample-based spot checks (100+ records)
  • Incremental sync mechanism tested
  • Cutover runbook with rollback steps
  • Stakeholder sign-off on validation results

:::note[Source]
This guide is derived from operational intelligence at Garnet Grid Consulting. For a managed data migration engagement, visit garnetgrid.com.
:::