
Python: Simple Data Pipeline Optimization Tips

Mr Silva

I’ve been dipping my toes into data pipelines for some time now, and I’ve learned some hard lessons along the way. This isn’t glamorous work; it’s plain, simple drudge work, the bottom-of-the-barrel, get-your-hands-dirty kind of stuff that nobody talks about in conference presentations.

If you’re handling more than a million records per day (and trust me, that threshold comes faster than you think), these strategies will help you avoid the dreaded pipeline crashes, timeouts, and resource exhaustion that keep us up at night.

The Big Three (as I see them)

  1. Making database integrations actually work with massive data volumes
  2. Speeding up delivery times (because nobody likes waiting)
  3. Smart data storage that won’t break the bank or your servers

Taming Large Data Volume Integrations

Picture this: You’re trying to process a huge number of records, your database is groaning under the load, and suddenly everything grinds to a halt. Sound familiar? I’ve been there, and here’s what I learned.

The golden rule? Break it down into bite-sized pieces. And please, for the love of all that’s holy, avoid paginating with LIMIT and OFFSET: the database still has to walk past every row it skips, so deep offsets become performance killers at scale.

Method 1: Time-Based Chunking (The Hour-by-Hour Approach)

This is my go-to method when I need something that works reliably. Instead of trying to swallow an entire day’s worth of data in one gulp, I split it into hourly chunks.

Here’s how it looks for processing logs from September 1st, 2024:

-- Hour 1
SELECT * FROM logs WHERE updated_at BETWEEN '2024-09-01 00:00:00' AND '2024-09-01 00:59:59'

-- Hour 2  
SELECT * FROM logs WHERE updated_at BETWEEN '2024-09-01 01:00:00' AND '2024-09-01 01:59:59'

-- And so on... up to hour 24
SELECT * FROM logs WHERE updated_at BETWEEN '2024-09-01 23:00:00' AND '2024-09-01 23:59:59'

Why this works: Your database can actually breathe between queries, and you won’t get those nasty timeout errors.
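If you want to drive those hourly queries from Python, here’s a minimal sequential sketch. It assumes a hypothetical execute_to_database() helper (the same placeholder I use in the threading example further down) that runs a SQL string and returns the rows:

def hourly_queries(day):
    # Yield one SELECT per hour for the given day (YYYY-MM-DD)
    for hour in range(24):
        start = f"{day} {hour:02d}:00:00"
        end = f"{day} {hour:02d}:59:59"
        yield (
            "SELECT * FROM logs "
            f"WHERE updated_at BETWEEN '{start}' AND '{end}'"
        )

for sql in hourly_queries("2024-09-01"):
    rows = execute_to_database(sql)  # placeholder: run the query, return rows
    print(f"Fetched {len(rows)} rows")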

The catch: Sometimes data isn’t evenly distributed. I once had a system that did bulk updates at 10 AM every day – guess which hour always took forever to process? 😅

Method 2: Primary Key Chunking (For When Data Gets Weird)

When time-based splitting doesn’t cut it (looking at you, uneven data distribution), I switch to this approach:

Step 1: Find all the IDs that were updated on your target date

SELECT id FROM logs WHERE DATE(updated_at) = '2024-09-01';

Step 2: Group them into manageable chunks (I like 1,000 records per batch)

SELECT * FROM logs WHERE id IN (id1, id2, id3, ... id1000);

The trade-off: You’re hitting the database twice, which adds complexity. But when you have uneven data distribution, this approach is a lifesaver.
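Here’s a rough sketch of how I wire those two steps together in Python. It reuses the hypothetical execute_to_database() and store_data() helpers from the threading example below, so treat it as a shape rather than copy-paste code:

BATCH_SIZE = 1000

# Step 1: grab just the IDs that changed on the target date
id_rows = execute_to_database(
    "SELECT id FROM logs WHERE DATE(updated_at) = '2024-09-01'"
)
ids = [row[0] for row in id_rows]  # assumes each row comes back as a tuple

# Step 2: pull the full records in batches of 1,000 IDs
for i in range(0, len(ids), BATCH_SIZE):
    batch = ids[i:i + BATCH_SIZE]
    id_list = ", ".join(str(pk) for pk in batch)
    data = execute_to_database(f"SELECT * FROM logs WHERE id IN ({id_list})")
    store_data(data)  # placeholder: persist wherever your pipeline writes to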

Making Your Pipeline Actually Fast

Nobody wants to wait around for data. Here’s how I squeeze every bit of performance out of my pipelines.

Embrace the Power of Parallel Processing

Remember those 24 hourly chunks from earlier? Instead of running them one by one like a patient person, let’s get impatient and run multiple chunks simultaneously.

Here’s a Python example that runs 4 worker threads concurrently, each handling 6 of the hourly queries. Threads are a good fit here because the work is mostly waiting on the database, so the GIL isn’t a bottleneck:

import threading

def process(parent, index):
    # Each worker walks its own group of queries. execute_to_database()
    # and store_data() are placeholders for your actual DB client and sink.
    for sql in parent[index]:
        print(f"Processing batch {index}: {sql}")
        data = execute_to_database(sql)
        store_data(data)

# Organize our 24 hourly queries into 4 groups of 6
parent = []
child = []

for x in range(24):
    x_string = f"{x:02d}"  # zero-pad the hour: 00, 01, ..., 23

    start = f"2024-09-01 {x_string}:00:00"
    end = f"2024-09-01 {x_string}:59:59"
    sql = f"SELECT * FROM logs WHERE updated_at BETWEEN '{start}' AND '{end}'"

    child.append(sql)

    # Every 6th query, close out the current group
    if (x + 1) % 6 == 0:
        parent.append(child)
        child = []

# Fire up our parallel workers
threads = []
for i in range(4):
    thread = threading.Thread(target=process, args=(parent, i))
    threads.append(thread)
    thread.start()

# Wait for everyone to finish
for thread in threads:
    thread.join()

Smart Storage Solutions

Let’s talk about everyone’s favorite topic: making data storage both fast and cheap. (Okay, maybe it’s not everyone’s favorite, but it should be!)

Why Parquet Files Are Your Best Friend

I used to store everything in CSV files until I discovered Apache Parquet. Here’s why I’ll never go back: Parquet is columnar and compressed, so files come out a fraction of the size of the equivalent CSV, reads that only need a few columns can skip the rest, and column types survive the round trip instead of everything coming back as strings.
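If you want to see the size difference on your own hardware, a quick experiment like this makes the point (the data is synthetic, and to_parquet() needs pyarrow or fastparquet installed):

import os

import numpy as np
import pandas as pd

# Build a synthetic million-row frame just for the comparison
df = pd.DataFrame({
    "user_id": np.random.randint(0, 100_000, size=1_000_000),
    "amount": np.random.rand(1_000_000),
    "status": np.random.choice(["ok", "error", "retry"], size=1_000_000),
})

df.to_csv("sample.csv", index=False)
df.to_parquet("sample.parquet")

print("CSV:    ", os.path.getsize("sample.csv") // 1024, "KB")
print("Parquet:", os.path.getsize("sample.parquet") // 1024, "KB")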

The 10MB Rule

Here’s a trick that’s served me well: split your Parquet files into 10MB chunks. Smaller files = faster reads = happier systems.

import pandas as pd
import numpy as np
import math

# Convert your data to a DataFrame (your_data_here is a placeholder)
data = pd.DataFrame(your_data_here)

# Measure how much memory the frame actually uses, in MB
memory_bytes = data.memory_usage(deep=True).sum()
memory_in_mb = math.ceil(memory_bytes / 1048576)  # 1 MB = 1,048,576 bytes

# Aim for ~10MB per file: split the frame into that many pieces
num_chunks = math.ceil(memory_in_mb / 10)
split_data = np.array_split(data, num_chunks)

# Save each piece as a separate Parquet file (requires pyarrow or fastparquet)
for i, chunk in enumerate(split_data):
    filename = f"data_chunk_{i}.parquet"
    chunk.to_parquet(filename)
    print(f"Saved {filename}")

The Simple Memory Trick Everyone Forgets

This one’s embarrassingly simple but incredibly effective: clean up after yourself!

# first_batch_of_data / second_batch_of_data are placeholders for whatever
# you just loaded (query results, parsed files, etc.)
data1 = pd.DataFrame(first_batch_of_data)
data2 = pd.DataFrame(second_batch_of_data)

# Combine them into one frame (pd.concat copies the data into a new object)
all_data = pd.concat([data1, data2], axis=0)

# Here's the magic: drop the original references so the garbage collector
# can reclaim them (assuming nothing else still points at them)
del data1, data2

# You just saved potentially gigabytes of RAM

I can’t tell you how many times this simple step has prevented out-of-memory errors in production.


Wrapping Up

These techniques have saved me more times than I can count. The key is to start with small optimizations and gradually work your way up to more complex solutions as your data grows.

Remember: a simple pipeline that works reliably at scale is worth its weight in gold. Your future self (and your on-call rotation) will thank you for building robust, optimized systems from the start.
