How to Build a Data Migration Pipeline: ETL Patterns and Validation
Step-by-step guide to migrating data between systems. Covers schema mapping, ETL pipeline construction, data validation, and zero-downtime cutover strategies.
Data migration is where good intentions meet messy reality. The source system never has the data you expected, the mapping is never 1:1, and there’s always a column called misc_field_3 that everyone depends on but nobody can explain. This guide gives you a structured approach.
Step 1: Profile Your Source Data
Never trust the documentation. Profile the actual data.
1.1 SQL-Based Profiling
-- Column-level profiling: nulls, cardinality, min/max.
-- Note: information_schema only lists column names; it cannot count the
-- values inside them. Profile each column of interest explicitly
-- (repeat per column, or generate these statements from the metadata):
SELECT
    COUNT(*) AS total_rows,
    COUNT(cust_nm) AS non_null,
    COUNT(*) - COUNT(cust_nm) AS null_count,
    ROUND(100.0 * (COUNT(*) - COUNT(cust_nm)) / COUNT(*), 2) AS null_pct,
    COUNT(DISTINCT cust_nm) AS distinct_values,
    MIN(cust_nm) AS min_value,
    MAX(cust_nm) AS max_value
FROM cust_master;
1.2 Python Profiling
import pandas as pd
# conn is an open DB-API connection or SQLAlchemy engine to the source
df = pd.read_sql("SELECT * FROM customers", conn)
# Generate profiling report
profile = pd.DataFrame({
'dtype': df.dtypes,
'non_null': df.count(),
'null_pct': (df.isnull().sum() / len(df) * 100).round(2),
'unique': df.nunique(),
'sample': df.iloc[0]
})
print(profile.to_markdown())
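Null counts miss sentinel values ("N/A", "UNKNOWN", "9999-12-31") that legacy systems use instead of NULL. A per-column frequency check surfaces them; a minimal sketch (the helper name and sample data are illustrative):

```python
import pandas as pd

def top_values(df: pd.DataFrame, col: str, n: int = 5) -> pd.Series:
    """Most frequent values in a column, NaN included, to surface sentinels."""
    return df[col].value_counts(dropna=False).head(n)

# A value like 'N/A' dominating a column is a sentinel, not real data
df = pd.DataFrame({'phone_1': ['N/A', 'N/A', '555-0100', 'N/A', None]})
print(top_values(df, 'phone_1'))
```

Any sentinel you find here becomes an explicit rule in the transform step, not a surprise during reconciliation.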
Step 2: Build Your Schema Mapping Document
The mapping document is the contract between source and target. Every stakeholder must sign off.
| Source Table | Source Column | Type | Target Table | Target Column | Type | Transform |
|---|---|---|---|---|---|---|
| cust_master | cust_id | INT | customers | id | UUID | Generate UUID |
| cust_master | cust_nm | VARCHAR(50) | customers | full_name | VARCHAR(100) | Trim + Title Case |
| cust_master | phone_1 | VARCHAR(20) | customers | phone | VARCHAR(15) | Strip non-digits |
| cust_master | crt_dt | DATETIME | customers | created_at | TIMESTAMPTZ | Convert to UTC |
| cust_master | misc_field_3 | VARCHAR(100) | customers | legacy_ref | VARCHAR(100) | Pass-through |
:::tip[Golden Rule]
If a column exists in the source but you can’t map it, don’t delete it — park it in a _legacy or _unmapped table. You will need it during reconciliation.
:::
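The mapping document can double as executable configuration, which keeps code and contract from drifting apart. A minimal sketch assuming the column names from the table above (the `MAPPING` dict and `apply_mapping` helper are illustrative, not a library API):

```python
import re

# source column -> (target column, transform), per the mapping table above
MAPPING = {
    "cust_nm":      ("full_name",  lambda v: v.strip().title()),
    "phone_1":      ("phone",      lambda v: re.sub(r"\D", "", v)),
    "misc_field_3": ("legacy_ref", lambda v: v),  # pass-through
}

def apply_mapping(row: dict) -> dict:
    """Apply every mapped transform to one source row."""
    return {tgt: fn(row[src]) for src, (tgt, fn) in MAPPING.items()}

print(apply_mapping({"cust_nm": "  ada lovelace ",
                     "phone_1": "(555) 123-4567",
                     "misc_field_3": "X-42"}))
# {'full_name': 'Ada Lovelace', 'phone': '5551234567', 'legacy_ref': 'X-42'}
```

When a stakeholder changes the mapping document, the change lands in one dict instead of being hunted down across the pipeline.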
Step 3: Build the ETL Pipeline
3.1 Python + SQLAlchemy Pattern
from sqlalchemy import create_engine, text
import pandas as pd
from datetime import datetime
import uuid
# Connection setup
source_engine = create_engine("mssql+pyodbc://...")
target_engine = create_engine("postgresql://...")
BATCH_SIZE = 5000
def extract(offset, limit):
"""Extract a batch from source"""
query = text("""
SELECT cust_id, cust_nm, phone_1, crt_dt, misc_field_3
FROM cust_master
ORDER BY cust_id
OFFSET :offset ROWS FETCH NEXT :limit ROWS ONLY
""")
return pd.read_sql(query, source_engine,
params={"offset": offset, "limit": limit})
def transform(df):
"""Apply business rules and data cleansing"""
transformed = pd.DataFrame()
transformed['id'] = [str(uuid.uuid4()) for _ in range(len(df))]
transformed['full_name'] = df['cust_nm'].str.strip().str.title()
transformed['phone'] = df['phone_1'].str.replace(r'[^\d]', '', regex=True)
transformed['created_at'] = pd.to_datetime(df['crt_dt'], utc=True)
transformed['legacy_ref'] = df['misc_field_3']
transformed['migrated_at'] = pd.Timestamp.now(tz='UTC')  # tz-aware; naive utcnow() is deprecated
transformed['source_id'] = df['cust_id'] # Keep for reconciliation
return transformed
def load(df):
"""Load batch into target"""
df.to_sql('customers', target_engine,
if_exists='append', index=False,
method='multi', chunksize=1000)
def run_migration():
"""Execute full migration in batches"""
offset = 0
total = 0
while True:
batch = extract(offset, BATCH_SIZE)
if batch.empty:
break
transformed = transform(batch)
load(transformed)
total += len(batch)
offset += BATCH_SIZE
print(f"Migrated {total} records...")
print(f"Migration complete: {total} records")
if __name__ == "__main__":
run_migration()
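One caveat on the extract step: OFFSET pagination rescans every skipped row on each batch, so it degrades badly on large tables, and it can skip or duplicate rows if the source changes mid-run. A hedged alternative sketch using keyset pagination (same columns as above, SQL Server `TOP` syntax; the function name is illustrative):

```python
import pandas as pd
from sqlalchemy import text

def extract_keyset(engine, last_id: int, limit: int) -> pd.DataFrame:
    """Fetch the next batch strictly after last_id -- no OFFSET rescan."""
    query = text("""
        SELECT TOP (:limit) cust_id, cust_nm, phone_1, crt_dt, misc_field_3
        FROM cust_master
        WHERE cust_id > :last_id
        ORDER BY cust_id
    """)
    return pd.read_sql(query, engine,
                       params={"limit": limit, "last_id": last_id})

# Drive the loop with: last_id = batch['cust_id'].iloc[-1] after each load
```

As a bonus, the cursor (`last_id`) is trivially persistable, which makes the run resumable after a crash.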
Step 4: Validate the Migration
Validation is not optional. Every migration needs, at minimum, three checks: row counts, checksums, and spot checks.
4.1 Row Count Reconciliation
-- Source count
SELECT COUNT(*) AS source_count FROM cust_master;
-- Target count
SELECT COUNT(*) AS target_count FROM customers;
-- They must match (or you must explain the delta)
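The two counts above can be pulled and compared in one script so the check runs after every migration, not just the first. A minimal sketch, assuming the `source_engine` / `target_engine` objects from Step 3 (the helper names are illustrative):

```python
import pandas as pd

def row_count(engine, table: str) -> int:
    """COUNT(*) for a table via any SQLAlchemy engine."""
    return int(pd.read_sql(f"SELECT COUNT(*) AS n FROM {table}",
                           engine)["n"].iloc[0])

def count_delta(source_engine, target_engine) -> int:
    """Source-minus-target row delta; zero means the counts reconcile."""
    src = row_count(source_engine, "cust_master")
    tgt = row_count(target_engine, "customers")
    print(f"source={src} target={tgt} delta={src - tgt}")
    return src - tgt
```

Fail the run (non-zero exit) on any delta you cannot itemize; soft warnings get ignored.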
4.2 Checksum Validation
-- Checksums are only comparable if both sides hash the SAME canonical
-- string with the SAME algorithm. CHECKSUM_AGG on the source and md5 on
-- the target will never agree, so hash an untransformed key column that
-- both sides share, ordered identically.
-- Source checksum (SQL Server)
SELECT LOWER(CONVERT(VARCHAR(32), HASHBYTES('MD5',
    (SELECT STRING_AGG(CAST(cust_id AS VARCHAR(MAX)), ',')
            WITHIN GROUP (ORDER BY cust_id)
     FROM cust_master)), 2)) AS source_checksum;
-- Target checksum (PostgreSQL) -- must equal the source checksum
SELECT md5(string_agg(source_id::text, ',' ORDER BY source_id))
    AS target_checksum
FROM customers;
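Dialect differences (hash functions, string encodings, NULL handling) make cross-database SQL checksums fragile. A sketch of a portable alternative: pull a shared key column from each side and hash an identical canonical string in Python (table and column names follow the earlier examples):

```python
import hashlib
import pandas as pd

def key_checksum(engine, table: str, key: str) -> str:
    """MD5 over the sorted, comma-joined key column -- identical both sides."""
    ids = pd.read_sql(f"SELECT {key} FROM {table}", engine)[key]
    canonical = ",".join(str(i) for i in sorted(ids))
    return hashlib.md5(canonical.encode("utf-8")).hexdigest()

# Matching hashes prove key completeness:
# key_checksum(source_engine, "cust_master", "cust_id")
#   == key_checksum(target_engine, "customers", "source_id")
```

This verifies that every key made it across; transformed columns still need the spot checks below, since hashing post-transform values requires re-implementing the transform anyway.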
4.3 Sample-Based Spot Checks
# Random sample comparison (reuses text() and the engines from Step 3)
import random
source_ids = pd.read_sql(
"SELECT cust_id FROM cust_master", source_engine
)['cust_id'].tolist()
sample = random.sample(source_ids, min(100, len(source_ids)))
for sid in sample:
source_row = pd.read_sql(
text("SELECT * FROM cust_master WHERE cust_id = :sid"),
source_engine, params={"sid": sid}
).iloc[0]
target = pd.read_sql(
text("SELECT * FROM customers WHERE source_id = :sid"),
target_engine, params={"sid": sid}
)
assert not target.empty, f"Record {sid} missing from target"
target_row = target.iloc[0]
# Compare transformed values, not raw ones
assert target_row['full_name'] == source_row['cust_nm'].strip().title()
print(f"✔ Record {sid} validated")
Step 5: Plan the Cutover
Zero-Downtime Strategy
- T-7 days: Run full historical migration
- T-1 day: Run incremental sync (changed records only)
- T-4 hours: Set source to read-only mode
- T-2 hours: Final incremental sync
- T-0: Switch application to target database
- T+1 hour: Validate in production
- T+24 hours: Confirm or rollback
-- Change tracking for incremental sync (SQL Server)
ALTER DATABASE SourceDB SET CHANGE_TRACKING = ON
(CHANGE_RETENTION = 7 DAYS, AUTO_CLEANUP = ON);
-- Change tracking must also be enabled on each tracked table
ALTER TABLE cust_master ENABLE CHANGE_TRACKING;
-- Get changes since last sync
SELECT ct.cust_id, ct.SYS_CHANGE_OPERATION
FROM CHANGETABLE(CHANGES cust_master, @last_sync_version) AS ct;
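CHANGETABLE tags each changed row with SYS_CHANGE_OPERATION = I (insert), U (update), or D (delete). The sync job re-extracts inserts and updates through the Step 3 pipeline and deletes removed rows from the target; a minimal sketch of the split (the function name is illustrative):

```python
import pandas as pd

def partition_changes(changes: pd.DataFrame) -> tuple:
    """Split a CHANGETABLE result into ids to re-migrate and ids to delete."""
    op = changes["SYS_CHANGE_OPERATION"]
    upserts = changes.loc[op.isin(["I", "U"]), "cust_id"].tolist()
    deletes = changes.loc[op == "D", "cust_id"].tolist()
    return upserts, deletes
```

Note that re-migrating an updated row requires upsert semantics in the load step (e.g. PostgreSQL `INSERT ... ON CONFLICT`), not the plain append used for the historical load.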
Migration Checklist
- Profile source data (nulls, types, cardinality)
- Build and sign off schema mapping document
- Implement ETL pipeline with batch processing
- Row count reconciliation (source = target)
- Checksum validation
- Sample-based spot checks (100+ records)
- Incremental sync mechanism tested
- Cutover runbook with rollback steps
- Stakeholder sign-off on validation results
:::note[Source]
This guide is derived from operational intelligence at Garnet Grid Consulting. For a managed data migration engagement, visit garnetgrid.com.
:::