Designing Data-Intensive Applications — Chapter 11

Batch Processing

MapReduce, Spark, dataflow engines — processing massive datasets efficiently.

Prerequisites: Basic Python + File I/O. That's it.

Chapter 0: The Problem

You are on-call at a web company. Product wants the top 100 most-visited URLs from yesterday's access logs. Sounds easy, right? The catch: yesterday generated 10 TB of logs spread across 500 web servers. On a single machine reading from a local SSD at 500 MB/s, just reading the data takes 10 TB / 500 MB/s = 20,000 seconds, or about 5.5 hours. And that is before you do any sorting or counting.

You have 100 machines in your cluster. If you could somehow split the work evenly across all of them, the job should take about 3 minutes of reading time. But how do you split it? How do you handle one machine crashing mid-job? How do you combine 100 partial results into one final answer?
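The arithmetic above can be checked in a few lines of Python. This is a back-of-the-envelope sketch. It assumes decimal units (1 TB = 10^12 bytes) and a perfectly even split with zero coordination overhead. `read_time_seconds` is a hypothetical helper.

```python
TB = 10**12   # decimal units assumed
MB = 10**6

def read_time_seconds(data_bytes, bytes_per_sec, machines=1):
    """Scan time if the data is split evenly across machines, each reading
    at bytes_per_sec. Coordination overhead is deliberately ignored."""
    return data_bytes / (bytes_per_sec * machines)

single = read_time_seconds(10 * TB, 500 * MB)
cluster = read_time_seconds(10 * TB, 500 * MB, machines=100)
print(f"1 machine:    {single:,.0f} s (~{single / 3600:.1f} h)")
print(f"100 machines: {cluster:,.0f} s (~{cluster / 60:.1f} min)")
```

The ignored overhead is what separates this ideal 100x speedup from real job times.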

This is the fundamental question of batch processing: how do you process a massive dataset efficiently by distributing the work across many machines, while tolerating failures and producing correct results?

Batch vs online vs stream. There are three families of systems. Online systems (web servers, databases) handle individual requests as they arrive, with latency measured in milliseconds. Batch processing systems (the topic of this chapter) take a large, bounded dataset and produce a new derived dataset, with latency measured in minutes to hours. Stream processing systems (Kafka, Flink) sit between the two: they process unbounded data as it arrives, with latency measured in seconds. Batch is the oldest and simplest model. Understand it first.

The Surprising Power of One Machine

Before we reach for a distributed cluster, let us see how far a single machine can go. The Unix command line already has tools for this exact problem:

bash
# Count top URLs from an access log
cat access.log |          # read the file
  awk '{print $7}' |      # extract the URL (7th field)
  sort |                  # sort alphabetically
  uniq -c |               # count consecutive identical lines
  sort -rn |              # sort by count, descending
  head -n 100             # take top 100

This pipeline handles surprisingly large files. The sort command uses an external merge-sort algorithm: when data exceeds available memory, it writes sorted chunks to temporary files on disk, then merges them. It can sort files larger than RAM. On a modern machine with an NVMe SSD, this pipeline processes about 1 GB per minute. For a 1 GB log file, you are done in a minute. For 10 TB, you need about 7 days.

Seven days is too slow. You need to parallelize. But the shape of the solution is already visible in that Unix pipeline: read, extract, sort, count, sort, pick the top. MapReduce just distributes each of those steps across hundreds of machines.

One Machine vs Many: The Simulation

The canvas below shows the fundamental tradeoff. A single machine grinds through the full dataset sequentially. Ten machines each process one-tenth of the data in parallel, then combine results. Watch the time difference.

Single Machine vs Distributed Processing

The single machine takes 10 units of time. The cluster takes 1 unit of processing plus a small overhead for distribution and merging. That overhead — the coordination cost — is what the rest of this chapter is about.

The key insight from this chapter. Batch processing is about taking a computation that works on a single machine (Unix pipes, SQL queries, Python scripts) and distributing it across a cluster while preserving correctness. The intellectual challenge is not the computation itself — it is handling partitioning, shuffling, failures, and stragglers.
Interview warm-up: You have 10 TB of web logs and need the top 100 URLs. A junior engineer suggests loading the data into a PostgreSQL database and running SELECT url, COUNT(*) FROM logs GROUP BY url ORDER BY COUNT(*) DESC LIMIT 100. What is the fundamental problem with this approach?

Chapter 1: Unix Philosophy

Before MapReduce, before Hadoop, before Spark — there was the Unix command line. The original batch processing system. And it embodies design principles that every modern data processing framework copied, whether they admit it or not.

The Four Tenets

Doug McIlroy, the inventor of Unix pipes, described the philosophy in 1978:

1. Make each program do one thing well. sort sorts. uniq deduplicates. wc counts. No program tries to do everything.

2. Expect the output of every program to become the input to another. Programs communicate through a uniform interface: streams of bytes (usually text, one record per line).

3. Design and build software to be tried early. Small tools are easy to test and debug in isolation.

4. Use tools in preference to unskilled help. Automate everything.

The uniform interface is the crucial innovation. Every program reads from stdin (standard input) and writes to stdout (standard output). The pipe (|) connects one program's stdout to another's stdin. No serialization format negotiation, no protocol handshakes, no schema definitions — just bytes flowing through a pipeline.

A Complete Example: Analyzing Access Logs

A standard Apache access log looks like this:

text
216.58.214.206 - - [17/May/2026:10:05:03 +0000] "GET /api/users HTTP/1.1" 200 3428
192.168.1.42  - - [17/May/2026:10:05:04 +0000] "GET /index.html HTTP/1.1" 200 12054
10.0.0.15     - - [17/May/2026:10:05:04 +0000] "POST /api/login HTTP/1.1" 302 0
216.58.214.206 - - [17/May/2026:10:05:05 +0000] "GET /api/users HTTP/1.1" 200 3428
192.168.1.42  - - [17/May/2026:10:05:06 +0000] "GET /about.html HTTP/1.1" 200 8192

To find the top 5 most-visited URLs:

bash
cat access.log |        # Read all lines
  awk '{print $7}' |    # Extract field 7 (the URL path)
  sort |                # Sort alphabetically: groups identical URLs together
  uniq -c |             # Count consecutive identical lines
  sort -rn |            # Sort numerically (-n), reversed (-r) = highest first
  head -n 5             # Take the first 5 lines

Let us trace the data through every stage:

Unix Pipeline: Data Flow Through Each Stage
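The stages can be traced with a small Python sketch. It mimics each command on the five sample log lines above. It is an illustration, not a reimplementation of the real tools. Ties after the `sort -rn` step may come out in a different order than the shell pipeline produces.

```python
from itertools import groupby

log_lines = [
    '216.58.214.206 - - [17/May/2026:10:05:03 +0000] "GET /api/users HTTP/1.1" 200 3428',
    '192.168.1.42  - - [17/May/2026:10:05:04 +0000] "GET /index.html HTTP/1.1" 200 12054',
    '10.0.0.15     - - [17/May/2026:10:05:04 +0000] "POST /api/login HTTP/1.1" 302 0',
    '216.58.214.206 - - [17/May/2026:10:05:05 +0000] "GET /api/users HTTP/1.1" 200 3428',
    '192.168.1.42  - - [17/May/2026:10:05:06 +0000] "GET /about.html HTTP/1.1" 200 8192',
]

# awk '{print $7}': split on runs of whitespace, keep the 7th field (index 6)
urls = [line.split()[6] for line in log_lines]

# sort: identical URLs become adjacent
sorted_urls = sorted(urls)

# uniq -c: count each run of consecutive identical lines
counted = [(len(list(run)), url) for url, run in groupby(sorted_urls)]

# sort -rn: order by the numeric count, highest first
by_count = sorted(counted, key=lambda pair: pair[0], reverse=True)

# head -5: keep at most the first 5 lines
top5 = by_count[:5]

for stage, data in [("awk", urls), ("sort", sorted_urls), ("uniq -c", counted),
                    ("sort -rn", by_count), ("head -5", top5)]:
    print(f"{stage:>8}: {data}")
```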

Why sort Can Handle Datasets Bigger Than RAM

The sort command does not load the entire file into memory. It uses external merge sort:

1. Read chunks
Read as much data as fits in available memory (e.g., 4 GB).
2. Sort each chunk in-memory
Use quicksort or mergesort. Write the sorted chunk to a temporary file on disk.
3. Repeat
Read the next chunk, sort it, write to another temp file. Continue until all input is processed.
4. K-way merge
Open all sorted temp files simultaneously. Repeatedly pick the smallest element across all files. Write to stdout. This requires only one buffer per file, so memory usage is O(k) where k is the number of chunks.

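This algorithm makes exactly two passes over the data: one to create the sorted chunks and one to merge them. Reading the input and writing the final output are unavoidable. On top of that, the temporary files add 2N of disk I/O: N written in phase 1 and N read back in phase 2. The approach works for files far larger than RAM. The same idea appears in MapReduce's shuffle phase.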
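Here is a minimal Python sketch of the same two phases. It assumes integer records stored one per line in temporary files, and it uses the standard library's `heapq.merge` for the k-way merge. `external_sort` is a hypothetical name. This is not how GNU `sort` is implemented internally.

```python
import heapq
import os
import tempfile

def external_sort(numbers, chunk_size):
    """Sort integers while holding at most chunk_size of them in memory in
    phase 1. Phase 2 keeps one open file (one read buffer) per sorted run."""
    temp_paths = []

    def write_run(run):
        # Phase 1: sort one chunk in memory and write it to a temp file
        fd, path = tempfile.mkstemp(suffix=".run")
        with os.fdopen(fd, "w") as f:
            for n in sorted(run):
                f.write(f"{n}\n")
        temp_paths.append(path)

    chunk = []
    for n in numbers:
        chunk.append(n)
        if len(chunk) == chunk_size:
            write_run(chunk)
            chunk = []
    if chunk:
        write_run(chunk)

    # Phase 2: k-way merge. heapq.merge reads lazily from every run and
    # always yields the smallest current head next.
    files = [open(p) for p in temp_paths]
    try:
        runs = [(int(line) for line in f) for f in files]
        # A real sort would stream this to stdout; we collect it for the demo.
        return list(heapq.merge(*runs))
    finally:
        for f in files:
            f.close()
        for p in temp_paths:
            os.remove(p)

print(external_sort([9, 3, 7, 1, 8, 2, 6, 4, 5, 11, 10, 12], chunk_size=4))
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
```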

Why Unix Pipes Do Not Scale Beyond One Machine

The Unix pipeline is elegant but limited in three fundamental ways:

Limitation | Why it matters | How MapReduce solves it
--- | --- | ---
Single machine | All data flows through one machine's CPUs and disk. Throughput is bounded by that machine's I/O bandwidth (~500 MB/s for a SATA SSD, up to ~12 GB/s for the fastest NVMe drives). | Splits data across hundreds of machines, each reading its own local partition.
Limited parallelism | The stages run concurrently as separate processes, but each stage is a single process that sees all of the data. sort cannot emit anything until it has read its entire input. | Map tasks run in parallel across all input partitions. Reduce tasks run in parallel across key partitions.
No fault tolerance | If the machine crashes at 90%, you restart from zero. For a 5-hour job, that is a disaster. | Intermediate results are written to disk. If a task fails, only that task is re-run.

Worked Example: External Merge Sort With Numbers

Let us trace through an external merge sort with a concrete example to make the algorithm crystal clear. Say we have 12 numbers to sort but only enough memory for 4 at a time.

Input: [9, 3, 7, 1, 8, 2, 6, 4, 5, 11, 10, 12]
Memory limit: 4 numbers at a time

══════════ PHASE 1: Create sorted chunks ══════════
Read chunk 1: [9, 3, 7, 1] → sort in memory → write [1, 3, 7, 9] to temp_0
Read chunk 2: [8, 2, 6, 4] → sort in memory → write [2, 4, 6, 8] to temp_1
Read chunk 3: [5, 11, 10, 12] → sort in memory → write [5, 10, 11, 12] to temp_2

══════════ PHASE 2: K-way merge (k=3) ══════════
Open all 3 files. Read the first element from each:
  temp_0 head: 1   temp_1 head: 2   temp_2 head: 5
  Min = 1 (from temp_0). Output: [1]. Advance temp_0.

  temp_0 head: 3   temp_1 head: 2   temp_2 head: 5
  Min = 2 (from temp_1). Output: [1, 2]. Advance temp_1.

  temp_0 head: 3   temp_1 head: 4   temp_2 head: 5
  Min = 3 (from temp_0). Output: [1, 2, 3]. Advance temp_0.
  ... continue until all files exhausted ...

Final output: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

Extra disk I/O: write 12 numbers to temp files (phase 1) + read 12 numbers back (phase 2) = 2N
Memory used: at most 4 numbers in phase 1; one read buffer per temp file (k = 3) in phase 2

This exact algorithm runs inside sort when you pipe a 100 GB file through it. Instead of 4 numbers, it sorts 4 GB chunks. Instead of 3 files, it might merge 25. But the logic is identical.

MapReduce = distributed Unix pipes. The map phase is like awk '{print $7}': it extracts and transforms. The shuffle phase is like sort: it groups by key. The reduce phase is like uniq -c: it aggregates. The final sort -rn | head step ranks the aggregated counts, and it would be a second, much smaller job. MapReduce just does each step on many machines simultaneously.
Concept check: The Unix sort command uses external merge sort to handle files larger than RAM. How much disk I/O does this algorithm perform relative to the input size N?

Chapter 2: MapReduce Basics

In 2004, Jeff Dean and Sanjay Ghemawat at Google published the paper that changed data processing forever. Their insight was simple: most batch computations can be expressed as two functions — map and reduce — and if you constrain yourself to this API, the framework can handle all the hard parts (parallelism, fault tolerance, data distribution) automatically.

The Two Functions

Map takes one input record and emits zero or more key-value pairs. It runs independently on each input record with no shared state between records. This is what makes it trivially parallelizable — if you have 1000 input records and 10 machines, each machine can map 100 records with zero coordination.

Reduce takes a key and all the values associated with that key, and combines them into a single output. Every value with the same key is guaranteed to go to the same reducer. This is the aggregation step.

Between map and reduce sits the shuffle: the framework sorts and groups all map output by key, sending each key's values to the appropriate reducer. This is the expensive part — it requires network transfer across the cluster.

The Canonical Example: Word Count

Count how many times each word appears in a set of documents. This is the "Hello World" of MapReduce.

// Input: a set of documents, each a string of text
// Output: (word, count) for every distinct word

// MAP function: for each document, emit (word, 1) for every word
map(document) →
  for word in document.split():
    emit(word, 1)

// REDUCE function: for each word, sum all the 1s
reduce(word, values) →
  emit(word, sum(values))

Let us trace this through a concrete example with three input documents (three map tasks):

══════════ INPUT SPLITS ══════════
Split 0: "the cat sat on the mat"
Split 1: "the dog sat on the log"
Split 2: "the cat and the dog"

══════════ MAP OUTPUT (each mapper runs independently) ══════════
Mapper 0: (the,1) (cat,1) (sat,1) (on,1) (the,1) (mat,1)
Mapper 1: (the,1) (dog,1) (sat,1) (on,1) (the,1) (log,1)
Mapper 2: (the,1) (cat,1) (and,1) (the,1) (dog,1)

══════════ SHUFFLE (sort by key, group values) ══════════
and → [1]
cat → [1, 1]
dog → [1, 1]
log → [1]
mat → [1]
on → [1, 1]
sat → [1, 1]
the → [1, 1, 1, 1, 1, 1]

══════════ REDUCE OUTPUT (sum the values for each key) ══════════
and:1 cat:2 dog:2 log:1 mat:1 on:2 sat:2 the:6

Interactive MapReduce

MapReduce Word Count — Step by Step

Why This API Is Powerful

The map/reduce constraint seems limiting — you can only express computations as stateless transformations (map) followed by keyed aggregations (reduce). But this constraint is exactly what enables the framework to:

Property | How map/reduce enables it
--- | ---
Parallelism | Map tasks are independent, with no shared state. The framework can run as many in parallel as the cluster has capacity for.
Fault tolerance | If a map task fails, the framework re-runs it on another machine using the same input split. Because the function is deterministic, the output is identical.
Data locality | The framework schedules map tasks on a machine that holds the input data, which minimizes network transfer.
Scalability | Adding machines adds roughly proportional throughput. No code changes are required.
The key constraint: no side effects. Map and reduce must be pure functions — they read input, produce output, and touch nothing else. No writing to a database. No reading from an external service. No shared mutable state. If you violate this, the framework cannot safely re-run failed tasks (the re-run would double-write) or run tasks in parallel (they would race).
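The double-write problem can be seen in a minimal sketch. It makes these assumptions:

* A plain list, `external_db`, stands in for an external database.
* `run_task` is a hypothetical stand-in for the framework's retry logic.

The framework discards the failed attempt's partial output, so its own result is correct. The side effect, however, has already happened and runs again on the retry.

```python
external_db = []   # stands in for an external database (hypothetical)

def impure_mapper(record):
    external_db.append(record)   # side effect outside the framework's control
    return [(record, 1)]

def run_task(records, mapper, crash_at=None):
    """Run a map task. If crash_at is set, the first attempt dies before
    processing that record and the whole task is re-run from the start."""
    for attempt in (1, 2):
        output = []
        try:
            for i, record in enumerate(records):
                if attempt == 1 and i == crash_at:
                    raise RuntimeError("simulated crash")
                output.extend(mapper(record))
            return output          # only a completed attempt's output is kept
        except RuntimeError:
            pass                   # partial output is discarded; retry
    raise RuntimeError("task failed twice")

out = run_task(["a", "b", "c"], impure_mapper, crash_at=2)
print(len(out))        # 3: the framework's own output is correct
print(external_db)     # ['a', 'b', 'a', 'b', 'c']: 'a' and 'b' were written twice
```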

Python Implementation

python
from collections import defaultdict

# ── Map function: takes a document string, yields (key, value) pairs ──
def mapper(document):
    for word in document.lower().split():
        yield (word, 1)

# ── Reduce function: takes a key and list of values, yields (key, result) ──
def reducer(word, counts):
    return (word, sum(counts))

# ── Simulate MapReduce locally ──
documents = [
    "the cat sat on the mat",
    "the dog sat on the log",
    "the cat and the dog",
]

# MAP PHASE: run mapper on each document (in parallel in real MR)
map_output = []
for doc in documents:
    map_output.extend(mapper(doc))
# map_output = [('the',1), ('cat',1), ('sat',1), ('on',1), ...]

# SHUFFLE PHASE: group by key
shuffled = defaultdict(list)
for key, value in map_output:
    shuffled[key].append(value)
# shuffled = {'the': [1,1,1,1,1,1], 'cat': [1,1], ...}

# REDUCE PHASE: aggregate each group (in parallel in real MR)
results = []
for word, counts in sorted(shuffled.items()):
    results.append(reducer(word, counts))
# results = [('and',1), ('cat',2), ('dog',2), ('log',1),
#            ('mat',1), ('on',2), ('sat',2), ('the',6)]

for word, count in sorted(results, key=lambda x: -x[1]):
    print(f"{word}: {count}")
Design question: You are running a MapReduce word count on 1 TB of text with 100 mappers. One of the mappers crashes after processing 80% of its 10 GB input split. What happens?

Chapter 3: MapReduce in Depth

The word count example shows the API. Now let us look under the hood at how the framework actually executes a MapReduce job across a cluster of machines.

HDFS: Where the Data Lives

MapReduce is tightly coupled with the Hadoop Distributed File System (HDFS). HDFS splits every file into blocks (default 128 MB each) and replicates each block to 3 different machines. A 1 TB file becomes ~8,000 blocks spread across the cluster.

// A 1 TB file in HDFS:
File size: 1 TB = 1,024 GB = 1,048,576 MB
Block size: 128 MB
Number of blocks: 1,048,576 / 128 = 8,192 blocks
Replication factor: 3
Total block replicas: 8,192 × 3 = 24,576 (about 3 TB of raw disk)
// Spread across, say, 100 nodes = ~246 block replicas per node

The key property: HDFS knows which machine holds which block. The MapReduce scheduler uses this to achieve data locality — it assigns each map task to a machine that already holds the input block. The data does not cross the network. The code goes to the data, not the other way around.
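The locality idea can be sketched as a greedy scheduler. `assign_map_tasks` and the block and node names are hypothetical. For each block, it picks the least-loaded node that already holds a replica. Real Hadoop and YARN schedulers are more involved; for example, they also consider rack locality.

```python
from collections import defaultdict

def assign_map_tasks(block_locations, nodes):
    """Greedy, locality-aware assignment of one map task per block.

    block_locations: dict block_id -> list of nodes holding a replica
    nodes: list of all node names
    Prefers the least-loaded node that holds a replica. Falls back to the
    least-loaded node overall (a remote read) only if no replica holder is known.
    """
    load = defaultdict(int)
    assignment = {}
    for block, replicas in sorted(block_locations.items()):
        candidates = [n for n in replicas if n in nodes] or nodes
        node = min(candidates, key=lambda n: (load[n], n))  # ties broken by name
        assignment[block] = node
        load[node] += 1
    return assignment

blocks = {
    "blk_0": ["node1", "node2", "node3"],
    "blk_1": ["node1", "node2", "node4"],
    "blk_2": ["node1", "node3", "node5"],
    "blk_3": ["node2", "node4", "node5"],
}
plan = assign_map_tasks(blocks, ["node1", "node2", "node3", "node4", "node5"])
for block, node in plan.items():
    print(block, "->", node, "(local)" if node in blocks[block] else "(remote)")
```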

Anatomy of a MapReduce Job

1. Job Submission
Client submits the job: mapper code, reducer code, input path, output path, and configuration (number of reducers, memory limits). The JobTracker takes over in Hadoop 1. In YARN (Hadoop 2), the ResourceManager takes over together with a per-job ApplicationMaster.
2. Input Splitting
The framework queries HDFS for the input file's block locations. Each block becomes one input split = one map task. A 1 TB file with 128 MB blocks creates 8,192 map tasks.
3. Map Phase
Each map task reads its input split, applies the mapper function to every record, and writes key-value pairs to a local buffer. When the buffer fills, it is sorted by key, optionally run through a combiner (a local reduce), and written to disk as a sorted spill file. Multiple spills are merged into a single sorted file per map task.
4. Shuffle Phase
Each reducer contacts every mapper to fetch its partition of the sorted output. The keys are partitioned across reducers by hash(key) % num_reducers. Each reducer does a merge sort of all the fetched partitions, producing a single sorted stream grouped by key.
5. Reduce Phase
Each reducer iterates over its sorted key-value groups, calling the reduce function for each key. Output is written to HDFS (one output file per reducer).
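Steps 3 and 4 can be simulated in a short sketch. `map_side_output` is a hypothetical function. It fills a buffer, spills a sorted run whenever the buffer is full, merges the runs, and splits the result into one sorted partition per reducer. It hashes keys with `zlib.crc32` rather than Python's built-in `hash()`, because string hashing is randomized per process and would not route a key consistently across runs. A real implementation orders these steps differently, but each reducer ends up with the same kind of sorted run.

```python
import heapq
import zlib

def map_side_output(pairs, buffer_size, num_reducers):
    """Buffer map output, spill sorted runs, merge them, and partition
    the merged stream by hash(key) % num_reducers."""
    spills = []
    buffer = []
    for pair in pairs:
        buffer.append(pair)
        if len(buffer) == buffer_size:      # buffer full: sort and spill
            spills.append(sorted(buffer))
            buffer = []
    if buffer:
        spills.append(sorted(buffer))

    merged = heapq.merge(*spills)           # one sorted stream per map task

    partitions = [[] for _ in range(num_reducers)]
    for key, value in merged:
        r = zlib.crc32(key.encode()) % num_reducers   # deterministic partitioner
        partitions[r].append((key, value))  # stays sorted: merged is sorted
    return partitions

words = "the cat sat on the mat the dog sat on the log".split()
parts = map_side_output([(w, 1) for w in words], buffer_size=4, num_reducers=2)
for r, part in enumerate(parts):
    print(f"reducer {r}: {part}")
```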

The Cluster in Action

MapReduce Cluster Execution

Failure Handling: The Three Cases

What fails | What happens | Cost
--- | --- | ---
Map task | The framework detects the failure and re-runs the map task on another node. A crash is reported right away; a hung task is killed after 10 minutes without progress by default. The input block is still on HDFS (replicated 3x). Reducers only fetch output from completed map tasks, so they fetch from the re-run task once it finishes. | Re-process one input split (~128 MB). Other map tasks are unaffected.
Reduce task | The framework re-runs the reduce task. It re-fetches all shuffle data from the mappers, which keep their sorted output until the job completes. No map tasks re-run. | Re-fetch and re-reduce one partition. Other reducers are unaffected.
Entire node | All map and reduce tasks on that node are rescheduled on other nodes. Map outputs stored on the dead node's local disk are lost. Completed map tasks from that node are therefore re-run on nodes holding other replicas of their input blocks, and reducers fetch the regenerated output. | Proportional to how much work was on the dead node.

Speculative Execution: Dealing with Stragglers

In a cluster of 1,000 machines, some will be slow. A failing disk, a noisy neighbor (another process stealing CPU), or a hot CPU throttling can make one task take 10x longer than the rest. The entire job waits for the slowest task.

Speculative execution detects tasks that are running much slower than average and launches a duplicate copy on another machine. Whichever copy finishes first is used; the other is killed. This wastes a small amount of cluster resources but can dramatically reduce job completion time.

// Without speculative execution:
99 map tasks finish in 5 minutes. 1 straggler takes 50 minutes.
Total job time: 50 minutes.

// With speculative execution:
After 10 minutes, the framework notices the straggler.
Launches a speculative copy on a healthy node.
Speculative copy finishes in 5 minutes (at minute 15).
Total job time: 15 minutes.
// Speedup: 3.3x, at the cost of one extra task.
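How does the framework decide that a task is a straggler? A simple version of the check is sketched below. The rule it uses is a simplified, hypothetical heuristic: flag a running task whose progress rate is below half the mean rate. Hadoop's real heuristics differ and have changed across versions. `find_stragglers` and the task IDs are illustrative names.

```python
def find_stragglers(tasks, slow_factor=0.5):
    """Flag running tasks whose progress rate is below slow_factor times the
    mean rate across all tasks.

    tasks: dict task_id -> (fraction_complete, minutes_running)
    Finished tasks report (1.0, their total runtime).
    """
    rates = {t: done / minutes for t, (done, minutes) in tasks.items()}
    mean_rate = sum(rates.values()) / len(rates)
    return sorted(
        t for t, (done, _) in tasks.items()
        if done < 1.0 and rates[t] < slow_factor * mean_rate
    )

# The scenario from the text, checked at minute 10:
# 99 tasks finished in 5 minutes; the straggler is 20% done (10 of 50 minutes).
tasks = {f"m{i}": (1.0, 5.0) for i in range(99)}
tasks["m99"] = (0.2, 10.0)

for t in find_stragglers(tasks):
    done, minutes = tasks[t]
    remaining = (1.0 - done) / (done / minutes)
    print(f"{t}: {done:.0%} done, ~{remaining:.0f} min left -> launch a speculative copy")
```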
The combiner optimization. If your reduce function is associative and commutative (like sum, max, or min), you can run it as a combiner on the map side before the shuffle. Each mapper locally reduces its output: instead of emitting (the, 1), (the, 1), (the, 1), it emits (the, 3). This reduces the volume of data transferred during shuffle — often by 10x or more. Word count with a combiner sends one (word, local_count) pair per mapper per word, instead of one pair per word occurrence.

Python MapReduce: Complete Implementation

python
import hashlib
from collections import defaultdict

class MapReduceFramework:
    """A simplified MapReduce framework. It runs every phase sequentially in
    one process to illustrate the data flow of a distributed job."""

    def __init__(self, num_mappers=4, num_reducers=2):
        self.num_mappers = num_mappers
        self.num_reducers = num_reducers

    def _partition(self, key):
        # Deterministic partition: same key always goes to same reducer
        return int(hashlib.md5(key.encode()).hexdigest(), 16) % self.num_reducers

    def _map_worker(self, split, mapper_fn):
        # Group map output by target reducer
        partitions = defaultdict(list)
        for record in split:
            for key, value in mapper_fn(record):
                r = self._partition(key)
                partitions[r].append((key, value))
        # Sort each partition by key (simulates map-side sort)
        for r in partitions:
            partitions[r].sort(key=lambda x: x[0])
        return dict(partitions)

    def run(self, data, mapper_fn, reducer_fn):
        # Split data across mappers
        splits = [[] for _ in range(self.num_mappers)]
        for i, record in enumerate(data):
            splits[i % self.num_mappers].append(record)

        # MAP phase (would be parallel on a real cluster)
        map_outputs = [self._map_worker(s, mapper_fn) for s in splits]

        # SHUFFLE phase: collect each reducer's data from all mappers
        reducer_inputs = defaultdict(list)
        for mo in map_outputs:
            for r, pairs in mo.items():
                reducer_inputs[r].extend(pairs)

        # Sort each reducer's input by key (merge phase)
        for r in reducer_inputs:
            reducer_inputs[r].sort(key=lambda x: x[0])

        # REDUCE phase: group by key, call reducer
        results = []
        for r in sorted(reducer_inputs.keys()):
            groups = defaultdict(list)
            for key, value in reducer_inputs[r]:
                groups[key].append(value)
            for key in sorted(groups.keys()):
                result = reducer_fn(key, groups[key])
                if result is not None:
                    results.append(result)
        return results

# ── Usage ──
def word_mapper(line):
    for word in line.lower().split():
        yield (word, 1)

def word_reducer(word, counts):
    return (word, sum(counts))

mr = MapReduceFramework(num_mappers=3, num_reducers=2)
data = ["the cat sat on the mat",
        "the dog sat on the log",
        "the cat and the dog"]
results = mr.run(data, word_mapper, word_reducer)
# Sort by count (descending), then alphabetically, so ties print deterministically
for word, count in sorted(results, key=lambda x: (-x[1], x[0])):
    print(f"{word}: {count}")
# Output: the: 6, cat: 2, dog: 2, on: 2, sat: 2, and: 1, log: 1, mat: 1

Worked Example: The Combiner in Action

The combiner is one of the most important optimizations in MapReduce. Let us trace its effect on a concrete word count example to see exactly how much data it saves during the shuffle.

══════════ WITHOUT COMBINER ══════════
Mapper 0 processes: "the cat the cat the cat the cat"
Map output: (the,1)(cat,1)(the,1)(cat,1)(the,1)(cat,1)(the,1)(cat,1)
8 key-value pairs sent over network during shuffle

══════════ WITH COMBINER (local reduce) ══════════
Mapper 0 processes: "the cat the cat the cat the cat"
Map output (before combiner): (the,1)(cat,1)(the,1)(cat,1)(the,1)(cat,1)(the,1)(cat,1)
Combiner runs sum() per key locally:
After combiner: (cat,4)(the,4)
2 key-value pairs sent over network during shuffle

Reduction in shuffle data: 8 → 2 = 75% less network transfer
For real-world text with large input splits, the reduction is usually far greater, because common words repeat many times within one split
When can you use a combiner? Only when the reduce function is associative and commutative. Sum: yes (a+b+c = c+a+b). Max: yes. Min: yes. Average: NO — you cannot average averages. For average, the combiner must emit (key, (sum, count)) and the reducer computes sum/count. Mean is not composable, but (sum, count) pairs are.
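The (sum, count) trick can be sketched in a few lines of Python. The latency data is hypothetical. In this sketch, the mapper emits each measurement as `(key, (value, 1))`. The same associative and commutative operation, element-wise pair addition, then serves as both the combiner and the first half of the reducer. The division happens exactly once, at the end.

```python
from collections import defaultdict

def add_pairs(pairs):
    """Element-wise sum of (sum, count) pairs. Associative and commutative,
    so it is safe to use as a combiner."""
    return (sum(s for s, _ in pairs), sum(c for _, c in pairs))

# Hypothetical map outputs: each measurement is emitted as (key, (value, 1))
mapper_outputs = [
    [("/home", (10, 1)), ("/home", (20, 1)), ("/buy", (30, 1))],   # mapper 0
    [("/home", (60, 1))],                                          # mapper 1
]

def combine(pairs):
    """Map-side combiner: collapse each key's pairs into one (sum, count)."""
    local = defaultdict(list)
    for key, pair in pairs:
        local[key].append(pair)
    return [(key, add_pairs(ps)) for key, ps in local.items()]

# Shuffle: group the combined pairs by key across mappers
grouped = defaultdict(list)
for out in mapper_outputs:
    for key, pair in combine(out):
        grouped[key].append(pair)

# Reduce: add the pairs, then divide exactly once
means = {}
for key, pairs in grouped.items():
    total, count = add_pairs(pairs)
    means[key] = total / count

print(means)   # {'/home': 30.0, '/buy': 30.0}
```

Averaging the per-mapper averages for /home would give (15 + 60) / 2 = 37.5, which is wrong. The correct mean is (10 + 20 + 60) / 3 = 30.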

The Combiner in Python

python
from collections import defaultdict

def map_with_combiner(document, combiner_fn=None):
    """Simulate a mapper with optional local combiner."""
    # Map phase: emit (word, 1) for each word
    map_output = []
    for word in document.lower().split():
        map_output.append((word, 1))

    if combiner_fn is None:
        return map_output  # No combiner: send all pairs

    # Combiner: group locally and apply combiner function
    local_groups = defaultdict(list)
    for key, value in map_output:
        local_groups[key].append(value)

    combined = []
    for key, values in local_groups.items():
        combined.append((key, combiner_fn(values)))
    return combined

# Without combiner
doc = "the cat sat on the mat the cat sat on the mat"
no_combiner = map_with_combiner(doc)
print(f"Without combiner: {len(no_combiner)} pairs")
# 12 pairs: (the,1)(cat,1)(sat,1)(on,1)(the,1)(mat,1)...

# With combiner (sum)
with_combiner = map_with_combiner(doc, combiner_fn=sum)
print(f"With combiner: {len(with_combiner)} pairs")
# 5 pairs: (the,4)(cat,2)(sat,2)(on,2)(mat,2)
print(f"Shuffle data reduced by {100*(1-len(with_combiner)/len(no_combiner)):.0f}%")
# Shuffle data reduced by 58%
Debug scenario: Your MapReduce job has 1,000 map tasks. 999 finish in 5 minutes, but one is still running after 45 minutes. The TaskTracker logs show the slow mapper is reading from a disk with 50% higher latency than average. What mechanism in the MapReduce framework addresses this, and how does it work?

Chapter 4: Joins in MapReduce

Word count is nice for tutorials. In the real world, the hard problem is joins. You have a table of user click events (billions of rows: user_id, timestamp, url, ...) and a table of user profiles (millions of rows: user_id, name, country, signup_date, ...). You need to produce a combined dataset: each click event enriched with the user's country. This is a join on user_id.

In SQL, you just write SELECT * FROM events JOIN profiles ON events.user_id = profiles.user_id. In MapReduce, you have to implement the join yourself. There are three strategies, each with different tradeoffs.

Strategy 1: Reduce-Side Join (Sort-Merge Join)

The most general approach. Both datasets go through the same MapReduce job. Mappers tag each record with its source (events vs profiles), and the shuffle brings all records with the same user_id to the same reducer.

══════════ MAP PHASE ══════════
// Events mapper:
for event in events_split:
  emit(event.user_id, ("event", event))

// Profiles mapper:
for profile in profiles_split:
  emit(profile.user_id, ("profile", profile))

══════════ SHUFFLE ══════════
// All records with the same user_id go to the same reducer:
user_42 → [("profile", {name:"Alice", country:"US"}),
           ("event", {url:"/home", ts:"10:05"}),
           ("event", {url:"/buy", ts:"10:06"})]

══════════ REDUCE PHASE ══════════
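// For each user_id, hold the profile and join it with each event.
// Assumes secondary sort (explained below) delivers the profile first.
reduce(user_id, records):
  profile = None
  for r in records:
    if r.tag == "profile":
      profile = r.data
    elif profile is not None:   // inner join: drop events with no profile
      emit(user_id, merge(r.data, profile))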
Secondary sort trick. In the reduce phase above, we need the profile to arrive before the events (so we can hold it in memory while iterating over events). MapReduce lets you control the sort order within a key group. Tag profiles with sort key 0 and events with sort key 1, so profiles always come first. This is called secondary sort.
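Here is a minimal sketch of secondary sort. It assumes the tagging scheme just described: 0 = profile, 1 = event. The sketch partitions on the natural key (`user_id`) only, so that all of a user's records meet at one reducer. Each reducer's input is then sorted on the composite key `(user_id, tag)`. The function names and sample records are hypothetical.

```python
import zlib
from itertools import groupby

# (user_id, tag, data): tag 0 = profile, tag 1 = event
records = [
    ("user_42", 1, {"url": "/home", "ts": "10:05"}),
    ("user_7",  1, {"url": "/cart", "ts": "10:02"}),
    ("user_42", 0, {"name": "Alice", "country": "US"}),
    ("user_42", 1, {"url": "/buy", "ts": "10:06"}),
    ("user_7",  0, {"name": "Bob", "country": "UK"}),
]
NUM_REDUCERS = 2

def partition(user_id):
    # Partition on the natural key only, ignoring the tag
    return zlib.crc32(user_id.encode()) % NUM_REDUCERS

def reduce_join(sorted_records):
    """Records arrive sorted by (user_id, tag), so the profile comes first."""
    for user_id, group in groupby(sorted_records, key=lambda r: r[0]):
        profile = None
        for _, tag, data in group:
            if tag == 0:
                profile = data              # hold one profile in memory
            elif profile is not None:       # inner join: skip orphan events
                yield {**data, "user_id": user_id, **profile}

joined = []
for r in range(NUM_REDUCERS):
    mine = [rec for rec in records if partition(rec[0]) == r]
    mine.sort(key=lambda rec: (rec[0], rec[1]))   # the secondary sort
    joined.extend(reduce_join(mine))

for row in joined:
    print(row)
```

The reducer only ever holds one profile in memory, however many events a user has.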

Strategy 2: Map-Side Join (Broadcast Join)

If one dataset is small enough to fit in memory (the profiles table at a few hundred MB), you can skip the shuffle entirely. Load the small table into a hash map in every mapper. For each event, look up the user_id in the hash map.

// MAP setup: load the small table into memory
profiles = load_into_hashmap("hdfs://profiles/") // ~200 MB in RAM

// MAP function: join on the fly
map(event):
  profile = profiles.get(event.user_id)
  if profile:
    emit(event.user_id, merge(event, profile))

// No shuffle needed. No reduce phase. Output goes straight to HDFS.

This is vastly faster because there is no shuffle (no network transfer of the large events table). The tradeoff: the small table must fit in memory on every mapper node.

Strategy 3: Map-Side Merge Join

If both datasets are already sorted by the join key and partitioned the same way (e.g., both are output of a previous MapReduce job with the same number of reducers and same partitioning function), you can do a merge join in the mapper with no shuffle. Each mapper reads the corresponding partition of both datasets and merges them in sorted order. This is the fastest join but requires the most stringent preconditions.
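What one mapper does in a merge join can be sketched with two cursors over co-partitioned, sorted inputs. `merge_join` is a hypothetical name. The sketch assumes keys are unique on the right-hand side (one profile per user) and computes an inner join in a single linear pass, with no hash table.

```python
def merge_join(left, right):
    """Inner-join two lists of (key, value) pairs, both sorted by key.
    Assumes each key appears at most once in `right`."""
    i = j = 0
    out = []
    while i < len(left) and j < len(right):
        lk, lv = left[i]
        rk, rv = right[j]
        if lk < rk:
            i += 1          # event with no matching profile: skip it
        elif lk > rk:
            j += 1          # profile with no (remaining) events: skip it
        else:
            out.append((lk, lv, rv))
            i += 1          # keep j: the next event may share this user
    return out

# Matching partitions of the two datasets, sorted by user_id
events_part = [(1, "/home"), (1, "/about"), (2, "/buy"), (4, "/cart")]
profiles_part = [(1, "Alice"), (2, "Bob"), (3, "Carol")]
print(merge_join(events_part, profiles_part))
# [(1, '/home', 'Alice'), (1, '/about', 'Alice'), (2, '/buy', 'Bob')]
```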

The Skew Problem

A reduce-side join works beautifully until one key has massively more values than the rest. Imagine a social network where user_id = "taylor_swift" has 500 million follower events. All 500 million records go to one reducer, while other reducers handle 10,000 records each. That one reducer runs for hours while the others finish in minutes.

This is data skew, and it is the single most common cause of slow MapReduce jobs in production.

Solutions to skew. (1) Salted keys: Append a random number (0-99) to the hot key. "taylor_swift" becomes "taylor_swift_42", "taylor_swift_17", etc. This spreads the key across up to 100 reducers. For an aggregation, run a second MapReduce job to combine the partial results. For a join, the matching record from the other table must be copied to all 100 salted keys so that every reducer can find it. (2) Skew join in Hive/Pig: The framework detects hot keys (by sampling) and automatically broadcasts the small table's rows for those keys to multiple reducers. (3) Map-side join: If the small table fits in memory, skew is irrelevant, because every mapper can handle any key.
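Salting for the aggregation case can be sketched as a two-stage count. The sketch makes these assumptions:

* `salted_count` is a hypothetical function.
* It uses 4 salts instead of 100 to keep the output readable.
* Salts are stored as a `(key, salt)` tuple rather than a string suffix, so stripping them cannot be ambiguous.

A salted join would additionally need the other side's record copied to every salt value.

```python
import random
from collections import Counter

def salted_count(keys, hot_keys, salts, seed=0):
    """Two-stage count with salting (aggregation case only).

    Stage 1 groups by (key, salt). Hot keys get a random salt in
    range(salts), so one hot key is split across up to `salts` groups
    (and therefore reducers). Other keys always use salt 0.
    Stage 2 drops the salt and adds the partial counts back together.
    """
    rng = random.Random(seed)
    stage1 = Counter()
    for k in keys:
        salt = rng.randrange(salts) if k in hot_keys else 0
        stage1[(k, salt)] += 1

    final = Counter()
    for (k, _salt), partial in stage1.items():
        final[k] += partial
    return stage1, final

keys = ["taylor_swift"] * 1000 + ["alice"] * 3 + ["bob"] * 2
stage1, final = salted_count(keys, hot_keys={"taylor_swift"}, salts=4)
print(sorted(stage1.items()))   # the hot key appears as several smaller groups
print(final)
```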
Reduce-Side Join vs Map-Side Join

Worked Example: Join Events with Profiles

python
from collections import defaultdict

# ── Sample data ──
events = [
    {"user_id": 1, "url": "/home",  "ts": "10:05"},
    {"user_id": 2, "url": "/buy",   "ts": "10:06"},
    {"user_id": 1, "url": "/about", "ts": "10:07"},
    {"user_id": 3, "url": "/home",  "ts": "10:08"},
    {"user_id": 2, "url": "/cart",  "ts": "10:09"},
]
profiles = [
    {"user_id": 1, "name": "Alice", "country": "US"},
    {"user_id": 2, "name": "Bob",   "country": "UK"},
    {"user_id": 3, "name": "Carol", "country": "JP"},
]

# ── REDUCE-SIDE JOIN ──
# Map phase: tag each record with its source
map_output = []
for e in events:
    map_output.append((e["user_id"], ("event", e)))
for p in profiles:
    map_output.append((p["user_id"], ("profile", p)))

# Shuffle: group by user_id
groups = defaultdict(list)
for uid, record in map_output:
    groups[uid].append(record)

# Reduce: join
for uid in sorted(groups.keys()):
    records = groups[uid]
    profile = None
    user_events = []
    for tag, data in records:
        if tag == "profile":
            profile = data
        else:
            user_events.append(data)
    if profile is None:   # inner join: skip users with events but no profile
        continue
    for ev in user_events:
        print({**ev, "name": profile["name"], "country": profile["country"]})
# {user_id:1, url:/home, ts:10:05, name:Alice, country:US}
# {user_id:1, url:/about, ts:10:07, name:Alice, country:US}
# {user_id:2, url:/buy, ts:10:06, name:Bob, country:UK}
# ... etc

# ── MAP-SIDE JOIN (broadcast) ──
profile_map = {p["user_id"]: p for p in profiles}
for ev in events:
    p = profile_map.get(ev["user_id"])
    if p:
        print({**ev, "name": p["name"], "country": p["country"]})
# Same joined rows (in event order rather than grouped by user_id), with no shuffle
Design question: You need to join a 5 TB click events table with a 200 MB user profiles table. Both are stored on HDFS. Which join strategy should you use and why?

Chapter 5: Beyond MapReduce

MapReduce launched the big data revolution in 2004. By 2012, its limitations were painfully clear. The industry moved on to dataflow engines — Spark, Flink, and Tez — that kept MapReduce's strengths (fault tolerance, horizontal scaling) while fixing its worst inefficiencies.

MapReduce's Three Pain Points

Pain 1: Materialization between stages. A real-world computation rarely fits into one MapReduce job. Finding the top URLs per country requires: Job 1 (extract URL + country, count), Job 2 (find top per country). Between jobs, the intermediate results are written to HDFS. That means: serialize to disk, replicate 3x across the network, then read back from disk for the next job. For a pipeline with 5 stages, the intermediate data is written and read 4 times unnecessarily.

Pain 2: No support for iteration. Machine learning algorithms (gradient descent, PageRank, k-means) iterate: apply a transformation, check convergence, repeat. In MapReduce, each iteration is a separate job. Each job writes to HDFS and the next reads from HDFS. Training a model for 100 iterations means 100 rounds of disk I/O for data that could just stay in memory.

Pain 3: Rigid Map-then-Reduce structure. Not every computation fits the two-stage pattern. What if you need map → reduce → map → reduce → map? Each arrow is a separate job with full HDFS materialization. MapReduce chains are awkward and slow.
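Pain 1 can be felt even on one machine. The sketch below is only an analogy, assuming local temp files stand in for HDFS (with no replication) and Python generators stand in for in-memory pipelining; the stage functions and data are made up for illustration. The chained version materializes each stage's full output and re-reads it before the next stage starts, while the pipelined version streams every record through all three stages without touching disk.

```python
import json
import os
import tempfile

def parse(records):            # stage 1: keep only the URL field
    return (r["url"] for r in records)

def drop_static(urls):         # stage 2: drop static assets (made-up rule)
    return (u for u in urls if not u.startswith("/static/"))

def normalize(urls):           # stage 3: lowercase, strip trailing slashes
    return (u.lower().rstrip("/") or "/" for u in urls)

STAGES = [parse, drop_static, normalize]

def run_chained(records, workdir):
    """MapReduce-style: materialize each stage's full output, then re-read it."""
    chars_written = 0
    data = records
    for i, stage in enumerate(STAGES):
        path = os.path.join(workdir, f"stage{i}.jsonl")
        with open(path, "w") as f:
            for item in stage(data):
                line = json.dumps(item) + "\n"
                f.write(line)
                chars_written += len(line)
        with open(path) as f:   # the next stage starts only after a full re-read
            data = [json.loads(line) for line in f]
    return data, chars_written

def run_pipelined(records):
    """Dataflow-style: chain generators so each record flows through all stages."""
    data = records
    for stage in STAGES:
        data = stage(data)      # lazy: nothing runs until list() pulls records
    return list(data)

logs = [{"url": "/Home/"}, {"url": "/static/logo.png"}, {"url": "/buy"}, {"url": "/"}]
with tempfile.TemporaryDirectory() as tmp:
    chained, written = run_chained(logs, tmp)
pipelined = run_pipelined(logs)
print(chained == pipelined, chained, written)
```

Both versions produce the same answer; the difference is that the chained version wrote every intermediate result to disk and read it back, which is exactly the cost that grows with every extra stage.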

MapReduce Chain vs Dataflow Engine

Left: a 3-stage MapReduce pipeline writes to HDFS between every stage. Right: a Spark pipeline keeps data in memory.

Dataflow Engines: The Big Idea

Instead of the rigid map → shuffle → reduce pipeline, dataflow engines model computation as a directed acyclic graph (DAG) of operators. Any operator can connect to any other. Data flows through the graph, pipelined in memory when possible.

| Feature | MapReduce | Dataflow engines (Spark, Flink, Tez) |
|---|---|---|
| Computation model | Fixed two-stage: Map → Shuffle → Reduce | Arbitrary DAG of operators |
| Intermediate data | Always written to HDFS between jobs | Pipelined in memory within a stage; shuffle output goes to local disk, not to replicated HDFS |
| Iteration | Each iteration = separate job with full I/O | Data stays in memory across iterations (Spark: cache/persist) |
| Fault tolerance | Re-run from HDFS input (always available) | Recompute from lineage (Spark) or checkpoint (Flink) |
| Startup overhead | High (JVM launch per job, HDFS writes) | Low (long-running executors reused across stages) |
| Latency | Minutes to hours (I/O dominated) | Seconds to minutes (compute dominated) |

Spark's Key Abstraction: RDDs

Apache Spark (Zaharia et al., 2012) introduced the Resilient Distributed Dataset (RDD). An RDD is a read-only, partitioned collection of records with a recorded lineage — the sequence of transformations that produced it from the original input data.

Lineage is the secret to fault tolerance without HDFS writes. If a partition of an RDD is lost (because a node crashes), Spark recomputes it by replaying the lineage from the last materialized ancestor. It only recomputes the lost partition, not the entire dataset.

# Lineage example:
rdd0 = sc.textFile("hdfs://logs/")               # read from HDFS
rdd1 = rdd0.flatMap(lambda line: line.split())   # split into words
rdd2 = rdd1.map(lambda w: (w, 1))                # (word, 1) pairs
rdd3 = rdd2.reduceByKey(lambda a, b: a + b)      # count per word (shuffle)

# If partition 5 of rdd2 is lost (narrow dependencies all the way down):
# Spark traces lineage: rdd2[5] ← rdd1[5] ← rdd0[5]
# Re-reads input split 5 from HDFS, replays flatMap and map
# Only partition 5 is recomputed. Other partitions are untouched.
# (rdd3 sits behind a shuffle, so recovering one of its partitions works
#  differently; see the recovery example below.)
Lazy evaluation. Spark transformations (map, filter, flatMap, groupByKey) are lazy: they build up the lineage DAG but do not execute anything. Only actions (count, collect, saveAsTextFile) trigger execution. This lets Spark see the entire computation before executing it, enabling optimizations like pipelining multiple narrow transformations into one pass (and, for DataFrames, letting the query optimizer choose efficient join strategies).
Narrow vs wide dependencies. A narrow dependency (map, filter, union) means each partition of the parent RDD feeds into at most one partition of the child. These can be pipelined, with no shuffle needed. A wide dependency (groupByKey, reduceByKey, and join on inputs that are not already co-partitioned) means each partition of the child may depend on many, typically all, partitions of the parent. These require a shuffle and create a stage boundary. Spark executes all narrow transformations within a stage as a single pipelined pass, and only materializes data at stage boundaries.
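The stage-cutting rule can be sketched for a simple linear chain of operations. This is a deliberate simplification, assuming a fixed classification of operation names (for instance, it always treats join as wide, ignoring co-partitioned inputs) and ignoring branching DAGs; split_into_stages is a hypothetical helper, not Spark's DAG scheduler.

```python
# Operation names classified by dependency type (a simplified, assumed subset)
NARROW = {"textFile", "map", "filter", "flatMap", "union", "mapValues"}
WIDE = {"groupByKey", "reduceByKey", "join", "distinct", "sortByKey"}

def split_into_stages(ops):
    """Cut a linear chain of operations into stages at each wide dependency.

    A wide op needs a shuffle: the map side of the shuffle finishes the
    previous stage, and the wide op's reduce side begins a new stage.
    """
    stages, current = [], []
    for op in ops:
        if op in WIDE and current:
            stages.append(current)    # shuffle boundary: close the current stage
            current = []
        elif op not in NARROW and op not in WIDE:
            raise ValueError(f"unknown operation: {op}")
        current.append(op)
    if current:
        stages.append(current)
    return stages

job = ["textFile", "filter", "map", "groupByKey", "mapValues", "reduceByKey"]
for i, stage in enumerate(split_into_stages(job), start=1):
    print(f"Stage {i}: {' -> '.join(stage)}")
# Stage 1: textFile -> filter -> map
# Stage 2: groupByKey -> mapValues
# Stage 3: reduceByKey
```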

Fault Tolerance: Lineage vs Replication

MapReduce achieves fault tolerance through replication: intermediate data is written to disk (and HDFS replicates the input 3x). If anything fails, the data is still there.

Spark achieves fault tolerance through lineage: intermediate data within a stage is NOT written to disk (it is pipelined in memory), and shuffle output goes only to the executors' local disks, without replication. If a partition is lost, Spark reconstructs it by replaying the transformations from the last materialized point (usually the original HDFS input). This is cheaper than replication as long as the lineage is short. For very long lineages (100+ transformations), Spark provides checkpointing: explicitly write an RDD to reliable storage such as HDFS to truncate the lineage chain.
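A toy model of lineage-based recovery, assuming a list of lists stands in for the HDFS input blocks and a dict stands in for partitions held in executor memory (all names are hypothetical). It covers narrow dependencies only: losing one in-memory partition means re-reading one input block and replaying the recorded transformations for that partition alone. Post-shuffle partitions also involve shuffle files, as the example below walks through.

```python
# "HDFS" input: one list of lines per input block / partition
hdfs_input = [
    ["a b", "c"],
    ["d e f"],
    ["g", "h i"],
]

# Recorded lineage for a narrow chain: textFile -> flatMap(split) -> map(upper)
lineage = [
    ("flatMap", lambda line: line.split()),
    ("map", str.upper),
]

recomputed = []

def compute_partition(i):
    """Rebuild partition i from its input block by replaying the lineage."""
    recomputed.append(i)
    records = list(hdfs_input[i])            # re-read just this block
    for op, f in lineage:
        if op == "flatMap":
            records = [out for r in records for out in f(r)]
        else:                                # "map"
            records = [f(r) for r in records]
    return records

# Initial run: one task per partition, results held "in memory"
in_memory = {i: compute_partition(i) for i in range(len(hdfs_input))}

# An executor crashes and partition 1 disappears
del in_memory[1]
recomputed.clear()

# Recovery: recompute only the missing partitions
for i in range(len(hdfs_input)):
    if i not in in_memory:
        in_memory[i] = compute_partition(i)

print(recomputed)      # [1]
print(in_memory[1])    # ['D', 'E', 'F']
```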

# Lineage-based recovery example:
rdd0 = sc.textFile("hdfs://input")   # materialized source: data is on HDFS
rdd1 = rdd0.filter(...)              # lineage: filter of rdd0
rdd2 = rdd1.map(...)                 # lineage: map of rdd1
rdd3 = rdd2.reduceByKey(...)         # lineage: reduceByKey of rdd2

# If partition 7 of rdd3 is lost (node crash):
# 1. Spark checks: rdd3[7] depends on rdd2[all] (wide dependency = shuffle).
# 2. Shuffle files live on executor local disks. If the map outputs that
#    partition 7 needs are still there, Spark just re-runs the reduce task.
# 3. If some of those map outputs were lost with the node, Spark re-runs the
#    corresponding map tasks (re-read those HDFS blocks, re-filter, re-map,
#    re-write shuffle files), then re-reduces partition 7.
# 4. Only the lost partitions are recomputed. Others are untouched.
Interview framing. When asked "how does Spark handle failures?" the answer is: lineage-based recomputation. Spark records the transformation graph (DAG). If a partition is lost, it recomputes it by replaying the lineage from the nearest materialized ancestor (HDFS input or checkpoint). For narrow dependencies, only the lost partition is recomputed. For wide dependencies (post-shuffle), it may need to recompute the entire parent stage if shuffle files are lost. This is why long-running iterative jobs should checkpoint periodically.
Concept check: A Spark job reads 1 TB from HDFS, filters out 90% of records, maps the remaining 100 GB, then does a reduceByKey. How many times is data written to disk (assuming enough memory)?

Chapter 6: Spark Deep Dive

Spark is not just "faster MapReduce." It is a complete rethinking of how distributed data processing should work. Let us dig into the mechanics.

Transformations vs Actions

Transformations are lazy — they define a computation but do not execute it. They return a new RDD (or DataFrame). Actions are eager — they trigger execution of the entire lineage DAG and return a result or write to storage.

| Transformations (lazy) | Actions (eager) |
|---|---|
| map(f): apply f to each element | count(): return number of elements |
| filter(f): keep elements where f is true | collect(): return all elements to driver |
| flatMap(f): map then flatten | take(n): return first n elements |
| groupByKey(): group values by key | reduce(f): aggregate all elements |
| reduceByKey(f): combine values per key | saveAsTextFile(path): write to HDFS |
| join(other): join two pair RDDs | foreach(f): apply f for side effects |
| distinct(): remove duplicates | first(): return the first element |

The DAG and Stage Execution

When you call an action, Spark's DAG scheduler examines the lineage graph and breaks it into stages at every wide dependency (shuffle boundary). Within a stage, all narrow transformations are fused into a single task that processes one partition end-to-end without writing intermediate data.

Spark DAG: Stages, Shuffles, and Tasks

A multi-stage Spark job: read → filter → map (Stage 1, narrow) → shuffle → groupByKey → map (Stage 2, narrow) → shuffle → reduceByKey (Stage 3).

Caching and Persistence

If you use an RDD multiple times (e.g., iterate over it in a loop), Spark recomputes it from scratch each time by default. To avoid this, you can cache (or persist) an RDD in memory. The first action materializes it; subsequent actions read from the cache.

# Without caching: each action recomputes from HDFS
rdd = sc.textFile("hdfs://logs/").filter(...).map(...)
rdd.count()    # reads from HDFS, filters, maps, counts
rdd.take(10)   # reads from HDFS AGAIN, filters, maps, takes 10

# With caching: computed once, reused from memory
rdd = sc.textFile("hdfs://logs/").filter(...).map(...).cache()
rdd.count()    # reads from HDFS, filters, maps, CACHES, counts
rdd.take(10)   # served from the cache: no HDFS read, no recomputation
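The effect of cache() can be mimicked on one machine with memoization. This is only an analogy: functools.lru_cache stores one result in the local process, whereas Spark caches each partition on the executor that computed it. read_and_transform is a hypothetical stand-in for the textFile(...).filter(...).map(...) chain that counts how many full scans it performs.

```python
import functools

source_reads = 0

def read_and_transform():
    """Stand-in for textFile(...).filter(...).map(...); counts full source scans."""
    global source_reads
    source_reads += 1
    return [x * 2 for x in range(1_000) if x % 3 == 0]

# Without caching: every "action" replays the whole lineage
count = len(read_and_transform())
first10 = read_and_transform()[:10]
print(source_reads)   # 2

# With caching: the first action materializes the result, later ones reuse it
source_reads = 0
cached = functools.lru_cache(maxsize=None)(read_and_transform)
count = len(cached())
first10 = cached()[:10]
print(source_reads)   # 1
```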

Persistence levels control where cached data goes:

| Level | Where | When to use |
|---|---|---|
| MEMORY_ONLY | JVM heap as deserialized Java objects | Default for RDD cache(). Fastest access but high memory usage. |
| MEMORY_AND_DISK | Memory first, spill to local disk | When data does not fully fit in memory. |
| MEMORY_ONLY_SER | Memory as serialized bytes (Scala/Java API; PySpark always stores cached data serialized) | Trades CPU for 2-5x less memory usage. |
| DISK_ONLY | Local disk only | Rarely useful: recomputation is often faster. |

Broadcast Variables and Accumulators

Broadcast variables are the mechanism behind Spark's map-side (broadcast) join. You take a small dataset on the driver, ship it once to every executor as a read-only variable, and each task looks it up locally instead of shuffling.

Accumulators are write-only shared variables for distributed counters and sums. Each task can add to an accumulator, but only the driver can read the final value.

The groupByKey vs reduceByKey Trap

This is probably the single most common performance mistake in Spark. Both operations group data by key, but they do it very differently.

groupByKey shuffles ALL values for each key to the same executor, then hands you the complete list. If key "the" appears 10 million times, a single executor receives a list of 10 million values, and that entire list must fit in its memory.

reduceByKey applies the reduce function on the map side first (like a combiner), then shuffles the partial results. If key "the" appears 10 million times across 100 partitions, each partition locally reduces its portion (100,000 occurrences each become a single count), and only 100 partial results are shuffled — not 10 million.

# ══════════ groupByKey (DANGEROUS) ══════════
# 10 million ("the", 1) pairs are shuffled to a single executor
sums = (rdd.groupByKey()      # → ("the", [1, 1, 1, ... 10M items])
           .mapValues(sum))   # OOM if the list is too large!

# Memory at reducer: O(values_per_key) = O(10M) = ~80 MB for one key
#                    (at 8 bytes per value, before JVM object overhead)
# Shuffle data:      10M pairs × ~20 bytes = 200 MB for one key

# ══════════ reduceByKey (CORRECT) ══════════
# Each partition locally sums its ("the", 1) pairs before the shuffle
sums = rdd.reduceByKey(lambda a, b: a + b)   # combiner runs locally first!
# Only 100 partial sums are shuffled (one per partition)

# Memory at reducer: O(num_partitions) = O(100) = negligible
# Shuffle data:      100 pairs × ~20 bytes = 2 KB for one key
# Reduction:         200 MB → 2 KB = 100,000x less shuffle data
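The difference in shuffle volume can be simulated locally. In this sketch (hypothetical data: four map-side partitions of (word, 1) pairs), "shuffled" simply counts the records that would cross the network: groupByKey ships every pair, while reduceByKey first combines within each partition and ships at most one partial result per key per partition.

```python
from collections import defaultdict

# 4 map-side partitions of (word, 1) pairs; "the" dominates (hypothetical data)
partitions = [
    [("the", 1)] * 5 + [("cat", 1)],
    [("the", 1)] * 4 + [("dog", 1)] * 2,
    [("the", 1)] * 6,
    [("the", 1)] * 3 + [("cat", 1)],
]

def group_by_key_shuffle(parts):
    """groupByKey: every (key, value) pair crosses the network."""
    shuffled = [pair for part in parts for pair in part]
    groups = defaultdict(list)
    for k, v in shuffled:
        groups[k].append(v)                # reducer holds every value for a key
    return {k: sum(vs) for k, vs in groups.items()}, len(shuffled)

def reduce_by_key_shuffle(parts, f):
    """reduceByKey: combine within each partition first, then shuffle partials."""
    shuffled = []
    for part in parts:
        local = {}
        for k, v in part:                  # map-side combine
            local[k] = f(local[k], v) if k in local else v
        shuffled.extend(local.items())     # at most one record per key per partition
    result = {}
    for k, v in shuffled:                  # reduce side merges the partials
        result[k] = f(result[k], v) if k in result else v
    return result, len(shuffled)

g_result, g_sent = group_by_key_shuffle(partitions)
r_result, r_sent = reduce_by_key_shuffle(partitions, lambda a, b: a + b)
print(g_result == r_result, g_sent, r_sent)   # True 22 7
```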
Rule of thumb. Never use groupByKey if you are just going to aggregate the values (sum, count, max, min). Use reduceByKey, aggregateByKey, or combineByKey instead. The only legitimate use of groupByKey is when you genuinely need the full list of values per key (e.g., to sort them, or to apply a function that needs all values simultaneously). Even then, consider whether aggregateByKey with a custom accumulator can avoid materializing the full list.
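For aggregations that need more than a single running value, such as an average, the aggregateByKey pattern keeps a small accumulator per key instead of the full list. In PySpark the call has the shape rdd.aggregateByKey(zeroValue, seqFunc, combFunc). The sketch below only simulates that contract in plain Python on hypothetical (url, latency) data; aggregate_by_key is an illustrative helper, not Spark code.

```python
def aggregate_by_key(parts, zero, seq_op, comb_op):
    """Simulates the aggregateByKey contract on a list of partitions."""
    partials = []
    for part in parts:
        acc = {}
        for k, v in part:
            acc[k] = seq_op(acc.get(k, zero), v)   # fold a value into the accumulator
        partials.extend(acc.items())               # one accumulator per key per partition
    merged = {}
    for k, a in partials:                          # merge accumulators across partitions
        merged[k] = comb_op(merged[k], a) if k in merged else a
    return merged

latencies = [
    [("/home", 120), ("/buy", 300), ("/home", 80)],
    [("/buy", 100), ("/home", 100)],
]
sum_count = aggregate_by_key(
    latencies,
    zero=(0, 0),                                         # (sum, count)
    seq_op=lambda acc, v: (acc[0] + v, acc[1] + 1),
    comb_op=lambda a, b: (a[0] + b[0], a[1] + b[1]),
)
averages = {k: s / c for k, (s, c) in sum_count.items()}
print(averages)   # {'/home': 100.0, '/buy': 200.0}
```

The zero value is used once per key per partition, so it must be a neutral element (here, an empty sum and count).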

Complete PySpark Example

python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("BatchProcessing").getOrCreate()
sc = spark.sparkContext

# ── 1. Word count (the basics) ──
lines = sc.textFile("hdfs://logs/access.log")
word_counts = (
    lines
    .flatMap(lambda line: line.split())  # narrow: flatMap
    .map(lambda word: (word, 1))          # narrow: map
    .reduceByKey(lambda a, b: a + b)     # wide: shuffle + reduce
)
# Stage 1: textFile → flatMap → map (pipelined, no disk)
# Stage 2: reduceByKey (shuffle boundary)

top_10 = word_counts.takeOrdered(10, key=lambda x: -x[1])  # top 10 without a full sort

# ── 2. Join with broadcast (map-side join) ──
profiles = {1: "US", 2: "UK", 3: "JP"}  # small lookup table
bc_profiles = sc.broadcast(profiles)       # send to all executors

events = sc.parallelize([
    (1, "/home"), (2, "/buy"), (1, "/about"),
    (3, "/home"), (2, "/cart"),
])

enriched = events.map(
    lambda x: (x[0], x[1], bc_profiles.value.get(x[0], "?"))
)
# (1, '/home', 'US'), (2, '/buy', 'UK'), ...
# No shuffle! The broadcast variable is on every executor.

# ── 3. Multi-stage aggregation ──
# Top 5 URLs per country
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.createDataFrame(
    enriched,                        # build from the RDD directly; no collect()
    ["user_id", "url", "country"]
)
url_counts = df.groupBy("country", "url").count()
w = Window.partitionBy("country").orderBy(F.desc("count"))
top_urls = url_counts.withColumn("rank", F.row_number().over(w))
top_urls.filter(F.col("rank") <= 5).show()
Debug scenario: Your Spark job reads 500 GB from HDFS, runs a filter (keeps 10%), then does a groupByKey, then maps the result, then does another groupByKey. You notice the job is slow. Looking at the Spark UI, you see Stage 1 processes 500 GB but Stage 2 processes 500 GB again despite the filter. What went wrong?

Chapter 7: Graph Processing — The Showcase

Not all data is tables and key-value pairs. Many of the most important batch processing problems are graphs: the web link graph (PageRank), social networks (friend recommendations), road networks (shortest paths), and knowledge graphs (entity resolution). These require iterative algorithms that MapReduce handles poorly.

PageRank: The Algorithm That Built Google

Larry Page's insight: a web page is important if many important pages link to it. This circular definition becomes a linear algebra problem. Assign each page a rank. On each iteration, each page distributes its rank equally among all pages it links to. After enough iterations, the ranks converge.

PageRank update rule:

$$\mathrm{PR}(v) = \frac{1 - d}{N} + d \sum_{u \to v} \frac{\mathrm{PR}(u)}{\mathrm{out\_degree}(u)}$$

Where:
PR(v) = PageRank of page v
d = damping factor (typically 0.85)
N = total number of pages
u → v means page u links to page v (the sum runs over every page u that links to v)
out_degree(u) = number of outgoing links on page u

Interpretation: with d = 0.85, each iteration passes 85% of every page's rank along its outgoing links; the remaining 15% of the total is spread equally over all N pages (the "random surfer" teleport).

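The damping factor d = 0.85 models a random web surfer who follows links 85% of the time and jumps to a random page 15% of the time. Without it, a group of pages that link only to each other (a rank sink) would trap all the rank that flows into it, and the iteration is not guaranteed to converge. Pages with no outgoing links at all (dangling nodes) are a separate problem: they leak rank out of the system, so implementations usually redistribute their rank across all pages, as the Python implementation below does.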

The Pregel Model: Thinking Like a Vertex

Google's Pregel framework (2010) introduced a vertex-centric programming model. Instead of thinking about the global graph, you write code from the perspective of a single vertex:

1. Receive messages
Each vertex reads all messages sent to it in the previous superstep.
2. Compute
Update your value based on the incoming messages and your current state.
3. Send messages
Send messages to your neighbors for the next superstep.
4. Vote to halt
If nothing changed, vote to halt. The algorithm terminates when ALL vertices have voted to halt AND no messages are in flight.

This is the Bulk Synchronous Parallel (BSP) model: all vertices execute in lockstep. Superstep k completes before superstep k+1 begins. Messages sent in superstep k are delivered in superstep k+1.

PageRank in Pregel

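# Each vertex (page) executes this at every superstep.
# superstep, N, send_message and vote_to_halt are provided by the framework.
MAX_SUPERSTEPS = 30

def compute(vertex, messages):
    # Step 1: Update my PageRank from incoming messages
    if superstep == 0:
        vertex.value = 1.0 / N   # initial rank: uniform
    else:
        vertex.value = 0.15 / N + 0.85 * sum(messages)

    # Step 2: Send my share of rank to all neighbors, but only for the first
    # MAX_SUPERSTEPS supersteps. Messages reactivate halted vertices, so a
    # vertex that sends forever would keep the job from ever terminating.
    if superstep < MAX_SUPERSTEPS:
        if vertex.out_edges:     # a sink vertex has no one to send to
            share = vertex.value / len(vertex.out_edges)
            for edge in vertex.out_edges:
                send_message(edge.target, share)
    else:
        # Step 3: Vote to halt after 30 updates (or check convergence instead)
        vote_to_halt()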
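To see the BSP mechanics end to end, here is a toy, single-process engine for Pregel-style PageRank. It is a sketch under stated assumptions: pregel_pagerank is a hypothetical name, a real Pregel system distributes vertices across workers, and this version assumes every vertex has at least one out-edge (no dangling-node handling). It updates ranks as in the vertex program above and stops sending messages once superstep reaches max_supersteps, so every vertex can halt with no messages in flight.

```python
def pregel_pagerank(out_edges, max_supersteps=30, d=0.85):
    """Toy BSP engine. out_edges: {vertex: [neighbors]}, no dangling vertices."""
    n = len(out_edges)
    value = {v: 1.0 / n for v in out_edges}
    inbox = {v: [] for v in out_edges}         # messages delivered this superstep
    active = set(out_edges)
    superstep = 0
    while active or any(inbox.values()):
        outbox = {v: [] for v in out_edges}    # delivered at superstep + 1
        for v in out_edges:
            msgs = inbox[v]
            if v not in active and not msgs:
                continue                       # halted and nothing woke it up
            active.add(v)                      # an incoming message reactivates a vertex
            if superstep >= 1:
                value[v] = (1 - d) / n + d * sum(msgs)
            if superstep < max_supersteps:
                share = value[v] / len(out_edges[v])
                for target in out_edges[v]:
                    outbox[target].append(share)
            else:
                active.discard(v)              # vote to halt
        inbox = outbox                         # barrier: superstep k ends here
        superstep += 1
    return value, superstep

# The 5-page graph used in the Python implementation below
graph = {0: [1, 2], 1: [2], 2: [0], 3: [2], 4: [0, 2]}
ranks, steps = pregel_pagerank(graph)
print(steps, {v: round(r, 4) for v, r in ranks.items()})
```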

Interactive PageRank Simulation

The interactive version of this section runs Pregel supersteps on a small graph of 8 pages with directed links (damping d = 0.85). At each superstep, every node sends rank / out_degree to its neighbors; starting from a uniform 1/8 = 0.125 per page, the values converge over successive supersteps.

Python PageRank Implementation

python
import numpy as np

def pagerank(edges, num_nodes, damping=0.85, iterations=30):
    """Compute PageRank using power iteration.

    Args:
        edges: list of (source, target) pairs
        num_nodes: total number of nodes
        damping: probability of following a link (vs random jump)
        iterations: number of iterations

    Returns:
        ranks: array of PageRank values (sum = 1.0)
    """
    # Build adjacency: out_neighbors[u] = list of nodes u links to
    out_neighbors = [[] for _ in range(num_nodes)]
    for src, dst in edges:
        out_neighbors[src].append(dst)

    # Initialize: uniform rank
    ranks = np.full(num_nodes, 1.0 / num_nodes)

    for iteration in range(iterations):
        new_ranks = np.full(num_nodes, (1.0 - damping) / num_nodes)

        for u in range(num_nodes):
            if len(out_neighbors[u]) > 0:
                share = ranks[u] / len(out_neighbors[u])
                for v in out_neighbors[u]:
                    new_ranks[v] += damping * share
            else:
                # Sink node: distribute rank to ALL nodes (random jump)
                new_ranks += damping * ranks[u] / num_nodes

        ranks = new_ranks
        print(f"Iter {iteration}: max={ranks.max():.4f}, "
              f"node={ranks.argmax()}, sum={ranks.sum():.4f}")

    return ranks

# Example: 5-node web graph
edges = [
    (0, 1), (0, 2),  # page 0 links to 1 and 2
    (1, 2),          # page 1 links to 2
    (2, 0),          # page 2 links to 0
    (3, 2),          # page 3 links to 2
    (4, 0), (4, 2),  # page 4 links to 0 and 2
]
ranks = pagerank(edges, num_nodes=5, damping=0.85, iterations=20)
# Node 2 gets the highest rank: many pages link to it
for i, r in enumerate(ranks):
    print(f"Page {i}: {r:.4f}")

PageRank in Spark

python
# Spark PageRank: iterative computation with caching
links = sc.parallelize([
    (0, [1, 2]), (1, [2]), (2, [0]),
    (3, [2]), (4, [0, 2]),
]).cache()  # CRITICAL: cache the graph structure. Without this, Spark
           # recomputes links (e.g., re-reads it from HDFS) every iteration.

N = 5
ranks = sc.parallelize([(i, 1.0 / N) for i in range(N)])

for _ in range(20):
    contribs = (
        links.join(ranks)
        # Each element: (node, ([neighbors], rank))
        .flatMap(lambda node_data: [
            (neighbor, node_data[1][1] / len(node_data[1][0]))
            for neighbor in node_data[1][0]
        ])
    )
    # Give every node a 0.0 contribution so that nodes with no incoming
    # links (3 and 4 here) keep a rank instead of dropping out of the join
    zeros = links.mapValues(lambda _: 0.0)
    ranks = contribs.union(zeros).reduceByKey(lambda a, b: a + b).mapValues(
        lambda r: 0.15 / N + 0.85 * r
    )

results = ranks.collect()
Why Spark beats MapReduce for PageRank. Each iteration requires a join (ranks with links) and a reduceByKey. In MapReduce, each iteration is a separate job: write ranks to HDFS, read links from HDFS, join, reduce, write new ranks to HDFS. 20 iterations = 20 HDFS round-trips. In Spark, the links RDD is cached in memory, so the graph is read from HDFS once. If links is also hash-partitioned (partitionBy) before caching, the join can reuse that partitioning and mostly shuffle the much smaller ranks each iteration. The HDFS I/O drops from O(iterations) to O(1). For a web-scale graph (billions of edges), this can be the difference between hours and minutes.
Concept check: In the Pregel model, when does the algorithm terminate?

Chapter 8: Batch Processing in Practice

Theory is nice. Now let us talk about the real-world problems you will face building and operating batch pipelines at scale.

Common Batch Processing Use Cases

| Use case | Input | Output | Typical tool |
|---|---|---|---|
| ETL | Raw event logs (JSON, CSV) | Cleaned, partitioned Parquet files | Spark |
| Search index building | Documents (web pages, products) | Inverted index (term → [doc_ids]) | MapReduce / Spark |
| Recommendation engine | User-item interactions | Recommendation model or precomputed rankings | Spark MLlib |
| ML feature pipeline | Raw events + entity tables | Feature store (precomputed features per entity) | Spark / dbt + warehouse |
| Data warehouse load | Operational DB snapshots | Analytical tables (star schema) | Spark / Hive / dbt |
| Log analysis | Application logs, web server logs | Dashboards, anomaly alerts | Spark / Presto |

Framework Comparison

| Framework | Model | Latency | Best for | Weakness |
|---|---|---|---|---|
| Hadoop MapReduce | Map → Reduce | Minutes-hours | Simple ETL on massive data | Slow (disk I/O between stages), rigid API |
| Apache Spark | DAG of RDDs/DataFrames | Seconds-minutes | Iterative ML, multi-stage ETL, interactive queries | Memory hungry, GC pressure at scale |
| Apache Flink | DAG of dataflows | Milliseconds (streaming), seconds (batch) | Unified stream+batch, event-time processing | Smaller ecosystem than Spark |
| Presto / Trino | SQL engine over distributed data | Seconds-minutes | Interactive SQL on data lake (Parquet, ORC) | Not designed for heavy ETL writes |
| dbt + Warehouse | SQL transforms inside a warehouse (BigQuery, Snowflake) | Seconds-minutes | Analytics engineering, data modeling | Vendor lock-in, cost at scale |

Design Challenge: Daily Clickstream ETL

You are designing an ETL pipeline that processes 100 TB of clickstream data daily. The raw data arrives as compressed JSON in S3, one file per web server per hour (500 servers × 24 hours = 12,000 files). The output is a cleaned, partitioned Parquet dataset in a data lake, partitioned by date and country.

1. Ingestion
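Read 12,000 JSON files from S3. Each file is ~8.3 GB uncompressed (100 TB / 12,000). With gzip compression (typical 5:1), each file is ~1.7 GB on disk. Total reads: 100 TB / 5 = 20 TB compressed. Caveat: gzip is not splittable, so each file is decompressed by a single task; the 128 MB-per-partition estimate below assumes the input can be split (or is repartitioned after parsing).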
2. Parsing + Validation
Parse JSON. Validate schemas (reject malformed records). Extract timestamp, user_id, URL, country (from IP geolocation). This is a narrow transformation — each record is independent.
3. Deduplication
Remove duplicate events (same user_id + URL + timestamp within 1 second). Requires a shuffle keyed on (user_id, URL, timestamp_bucket); use dropDuplicates or reduceByKey rather than groupByKey, which would materialize every duplicate.
4. Enrichment
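Join with user profiles (~10 GB: too large to broadcast comfortably to every executor, so use a shuffle-based sort-merge join, or broadcast only the few columns you need if that projection is small). Join with URL metadata (broadcast join; the URL table is ~500 MB).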
5. Partitioned Write
Write to S3 as Parquet, partitioned by date and country. Repartition by (date, country) before writing — another shuffle.
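The deduplication step (3) can be sketched on a handful of records. The field names, timestamps in seconds, and the helpers dedup_key and dedup are assumptions for illustration. The key (user_id, url, 1-second bucket) is what the shuffle would partition on, and keeping the earliest event per key mirrors a reduceByKey with a min-timestamp reducer; Spark's DataFrame dropDuplicates on the same columns would instead keep an arbitrary row per key. Note that fixed buckets miss duplicates that straddle a boundary (e.g., 100.9 s and 101.05 s).

```python
events = [
    {"user_id": 1, "url": "/home", "ts": 100.20},
    {"user_id": 1, "url": "/home", "ts": 100.70},   # duplicate: same 1-second bucket
    {"user_id": 1, "url": "/home", "ts": 101.10},   # next bucket, kept
    {"user_id": 2, "url": "/home", "ts": 100.30},   # different user, kept
    {"user_id": 1, "url": "/buy",  "ts": 100.40},   # different URL, kept
]

def dedup_key(e):
    # (user_id, url, 1-second bucket): the key the shuffle would partition on
    return (e["user_id"], e["url"], int(e["ts"] // 1))

def dedup(records):
    first_seen = {}
    for e in records:
        k = dedup_key(e)
        # Keep the earliest event per key (a reduceByKey with a min-ts reducer)
        if k not in first_seen or e["ts"] < first_seen[k]["ts"]:
            first_seen[k] = e
    return sorted(first_seen.values(), key=lambda e: e["ts"])

for e in dedup(events):
    print(e)
```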
// Resource estimation:
Data: 100 TB uncompressed, ~20 TB compressed
Cluster: 200 nodes, each with 16 cores, 64 GB RAM, 1 TB local SSD
Total cores: 3,200. Total RAM: 12.8 TB. Total local disk: 200 TB.

// Spark partitions: aim for 128 MB per partition
100 TB / 128 MB = ~800,000 partitions (map tasks)
// Each core handles 800,000 / 3,200 = 250 partitions sequentially
// At 2 seconds per partition = 500 seconds = ~8 minutes for map phase

// Shuffle for dedup: ~100 TB over the network
// With 10 Gbps network per node: 100 TB / (200 nodes * 1.25 GB/s) = 400 seconds
// Total estimated: ~20-30 minutes
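The arithmetic above can be packaged as a back-of-envelope function. estimate_etl and its parameter names are hypothetical, and its defaults are the same assumptions as above (16 cores per node, 128 MB partitions, 2 seconds per partition, a full 10 Gbps NIC per node, the whole dataset shuffled once), not measurements. It uses decimal units (1 TB = 10^12 bytes), so it gets 781,250 partitions where the text rounds to ~800,000.

```python
import math

def estimate_etl(data_tb, nodes, cores_per_node=16, partition_mb=128,
                 seconds_per_partition=2.0, nic_gbps=10, shuffled_tb=None):
    """Back-of-envelope map-phase and shuffle times for a batch job."""
    data_bytes = data_tb * 1e12
    partitions = math.ceil(data_bytes / (partition_mb * 1e6))
    cores = nodes * cores_per_node
    per_core = math.ceil(partitions / cores)          # processed sequentially per core
    map_seconds = per_core * seconds_per_partition
    shuffled = data_bytes if shuffled_tb is None else shuffled_tb * 1e12
    cluster_bandwidth = nodes * nic_gbps / 8 * 1e9    # bytes/second across all NICs
    shuffle_seconds = shuffled / cluster_bandwidth
    return {
        "partitions": partitions,
        "partitions_per_core": per_core,
        "map_minutes": map_seconds / 60,
        "shuffle_minutes": shuffle_seconds / 60,
    }

est = estimate_etl(data_tb=100, nodes=200)
print(est)
```

For 100 TB on 200 nodes this gives roughly 8 minutes of map work and about 7 minutes per full shuffle, consistent with the ~20-30 minute total once the second shuffle and the write are included.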

Common Failure Modes and Debugging

| Symptom | Likely cause | Fix |
|---|---|---|
| Spark job OOMs during shuffle | Skewed key: one partition has billions of records | Add salting to the join key. Use spark.sql.adaptive.enabled=true for automatic skew handling. |
| Job stuck at 99% map progress | Straggler task on a slow node | Enable speculative execution: spark.speculation=true |
| Tiny output files (10,000 files of 1 MB each) | Too many partitions for the output size | df.coalesce(100).write.parquet(path) to merge partitions |
| Read performance is terrible | Too many small input files on HDFS/S3 | Run a compaction job to merge small files into larger ones (target 256 MB-1 GB) |
| Spark executor killed by YARN | Memory exceeds container limit (off-heap, or serialized objects too large) | Increase spark.executor.memoryOverhead. Use MEMORY_ONLY_SER persistence. |
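Salting is easiest to see on an aggregation. In this sketch (hypothetical data; salted_count is an illustrative helper), records for a hot key get a random salt in [0, num_salts), so one hot key becomes up to num_salts keys that hash to different reducers; a second, much smaller aggregation strips the salt and merges the partials. For a salted join, the other side must additionally be replicated once per salt value so every salted key still finds its match.

```python
import random
from collections import Counter

def salted_count(pairs, hot_keys, num_salts=4, seed=0):
    """Two-phase aggregation that spreads hot keys over num_salts sub-keys."""
    rng = random.Random(seed)
    # Phase 1: salt hot keys so their records spread across reducers
    phase1 = Counter()
    for key, value in pairs:
        salted = (key, rng.randrange(num_salts)) if key in hot_keys else (key, 0)
        phase1[salted] += value
    # Phase 2: strip the salt and combine the (at most num_salts) partials per key
    final = Counter()
    for (key, _salt), subtotal in phase1.items():
        final[key] += subtotal
    return final, phase1

pairs = [("iphone", 1)] * 1000 + [("case", 1)] * 10
final, partials = salted_count(pairs, hot_keys={"iphone"})
print(final)   # Counter({'iphone': 1000, 'case': 10})
print(sorted(k for k in partials if k[0] == "iphone"))
```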

File Formats: The Hidden Performance Lever

The choice of file format can make a 10x difference in batch job performance. Most teams do not think about this enough.

| Format | Type | Compression | Column pruning | Predicate pushdown | Best for |
|---|---|---|---|---|---|
| CSV/JSON | Row-oriented, text | gzip (external) | No: must read all columns | No: must read all rows | Data exchange, human readability. Never for analytics. |
| Avro | Row-oriented, binary | Snappy/Deflate (block-level) | No | No | Write-heavy workloads, Kafka, schema evolution (built-in). |
| Parquet | Columnar, binary | Snappy/Zstd (per column) | Yes: reads only requested columns | Yes: skips row groups using statistics | Analytics, data lakes, any read-heavy batch job. |
| ORC | Columnar, binary | Zlib/Snappy (per stripe) | Yes | Yes | Hive ecosystem, similar to Parquet. |

// Concrete example: the table has 50 columns, the query needs 3

CSV (row-oriented):
Read: 100 TB (all 50 columns for every row)
Parse: CPU-intensive (text → binary conversion)
Total I/O: 100 TB

Parquet (columnar):
Read: 100 TB × 3/50 = 6 TB (only the 3 requested columns)
Parse: minimal (already binary)
Predicate pushdown: skip row groups whose min/max stats prove no row can match the filter
With pushdown: maybe 2 TB actually read (depends on the filter and data layout)
Total I/O: ~2 TB

Speedup: 100 TB / 2 TB = 50x less data read, just from the file format choice (before counting cheaper parsing)
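Column pruning and row-group skipping can be illustrated with a toy layout. This is not Parquet's file format: it only mimics the idea of storing each column of a row group separately alongside per-column min/max statistics. The data and helper names are hypothetical; the query is "sum of amount where amount > threshold", and values_read counts the fields each layout has to touch.

```python
ROW_GROUP_SIZE = 3
COLUMNS = ["user_id", "country", "amount", "comment"]
rows = [  # hypothetical data
    (1, "US", 10, "a"), (2, "UK", 20, "b"), (3, "JP", 30, "c"),
    (4, "US", 70, "d"), (5, "UK", 80, "e"), (6, "JP", 90, "f"),
]

# Columnar layout: each row group stores every column separately, plus min/max stats
row_groups = []
for start in range(0, len(rows), ROW_GROUP_SIZE):
    chunk = rows[start:start + ROW_GROUP_SIZE]
    cols = {name: [r[i] for r in chunk] for i, name in enumerate(COLUMNS)}
    stats = {name: (min(vals), max(vals)) for name, vals in cols.items()}
    row_groups.append({"columns": cols, "stats": stats})

def row_scan_total(threshold):
    """Row layout: every field of every row is read."""
    values_read = len(rows) * len(COLUMNS)
    return sum(r[2] for r in rows if r[2] > threshold), values_read

def columnar_scan_total(threshold):
    """Columnar layout: read one column, skip row groups the stats rule out."""
    total, values_read = 0, 0
    for rg in row_groups:
        _lo, hi = rg["stats"]["amount"]
        if hi <= threshold:
            continue                        # predicate pushdown: no row can match
        amounts = rg["columns"]["amount"]   # column pruning: only 'amount' is read
        values_read += len(amounts)
        total += sum(a for a in amounts if a > threshold)
    return total, values_read

print(row_scan_total(50))        # (240, 24)
print(columnar_scan_total(50))   # (240, 3)
```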
The Parquet/Snappy standard. If you are building a new batch pipeline, default to Parquet with Snappy compression. Parquet gives you columnar reads and predicate pushdown. Snappy gives you fast compression/decompression with modest compression ratios (~2-3x). For archival storage where read speed matters less, switch to Zstd for better compression ratios (~5-7x) at the cost of slower decompression. Never store analytics data as CSV or JSON in production.

Anti-Patterns: What Not to Do

| Anti-pattern | Why it is bad | Fix |
|---|---|---|
| Small files on HDFS/S3 | Each file incurs metadata overhead. 1 million 1 MB files is 1000x slower to list and open than 1000 1 GB files. HDFS NameNode memory is proportional to file count. | Compact small files into 256 MB-1 GB files. In Spark: df.coalesce(N).write.parquet(path). |
| Unnecessary shuffles | Shuffles are the most expensive operation: data serialized, written to disk, transferred over network, deserialized. Two shuffles where one suffices doubles your job time. | Combine operations that share a key. Use reduceByKey not groupByKey. Avoid repartition unless output partitioning is wrong. |
| Not using data locality | Reading data from a remote node over the network is 10-100x slower than reading from local disk. | In MapReduce, the scheduler automatically prefers local nodes. In Spark on YARN, set spark.locality.wait to allow the scheduler time to find local slots. |
| Collecting to driver | df.collect() pulls ALL data to the driver's memory. For large datasets, the driver OOMs and the job crashes. | Use .take(N) for sampling. Use .write to save to HDFS/S3. Never collect() datasets larger than ~1 GB. |
| Cartesian joins | If both tables have N rows, the output has N² rows. A 1M × 1M join produces 1 trillion rows. | Always filter before joining. Verify join keys have matching types. Add a LIMIT during development. |

ETL Pipeline Estimator

An interactive estimator of pipeline runtime from data size (default 100 TB) and cluster size (default 200 nodes), comparing MapReduce and Spark.
Debug scenario: Your daily Spark ETL job normally completes in 25 minutes. Today it has been running for 3 hours. The Spark UI shows 799,999 of 800,000 map tasks complete, and one task has been running for 2.5 hours. What is the most likely cause and how do you fix it both immediately and long-term?

Chapter 9: Interview Arsenal

This chapter distills everything into interview-ready formats: cheat sheets, system design skeletons, and coding drills.

Five-Dimensional Cheat Sheet

| Dimension | Key facts to have ready |
|---|---|
| CONCEPT | MapReduce: map (stateless, parallel), shuffle (sort+partition by key), reduce (aggregate per key). Dataflow: DAG of operators, lazy evaluation, lineage-based fault tolerance. Narrow vs wide dependencies. BSP model for graph algorithms. |
| DESIGN | Join strategies (broadcast vs reduce-side vs merge). Skew handling (salted keys). Partitioning (hash vs range). Caching strategy (cache reused RDDs). Output file sizing (coalesce/repartition). Speculative execution for stragglers. |
| CODE | MapReduce word count. Spark word count, join, aggregation. PageRank in Spark. Python defaultdict-based local MapReduce simulation. |
| DEBUG | OOM during shuffle = skewed key. Stuck at 99% = straggler. Tiny files = too many partitions. Slow reads = too many small input files. Recomputation = forgot to cache. |
| FRONTIER | Lakehouse (Delta Lake, Iceberg): ACID on data lakes. Adaptive Query Execution (AQE) in Spark 3. Photon (Databricks C++ vectorized engine). DuckDB for single-node analytics, which can outperform Spark on medium-sized data. |

System Design Templates

Design: Search Index Builder

Requirements: Build an inverted index from 10 billion web pages (50 TB). Update daily.

Architecture:
1. Ingestion: Web crawler stores pages as records in S3/HDFS (one file per crawl batch).
2. Parsing (Map): For each page, extract words. Emit (word, {doc_id, position, tf}) for each word.
3. Inversion (Reduce): Group by word. For each word, sort documents by relevance (tf-idf). Write posting list.
4. Index building (Reduce output): Each reducer writes a shard of the index. N reducers = N index shards.
5. Serving: Load shards onto search servers. Each query fans out to all shards, results are merged.

Key decisions: Use hash partitioning on word (not range) to avoid skew. The word "the" is in every document — use a combiner to pre-aggregate TF per mapper. Index format: compressed posting lists (variable-byte encoding).
Design: Recommendation Pipeline

Requirements: Generate "users who bought X also bought Y" recommendations for 100M products. Update daily.

Architecture:
1. Input: Purchase history (user_id, product_id, timestamp). ~1 billion purchases.
2. Co-occurrence (MapReduce/Spark): For each user, find all pairs of products they bought. Emit (product_A, product_B, 1). ReduceByKey to get co-occurrence counts.
3. Scoring: For each product, rank co-occurring products by count / total_purchases (normalize).
4. Output: A lookup table: product_id → [top 50 recommended product_ids with scores].
5. Serving: Load into a key-value store (Redis, DynamoDB). Serve on product pages.

Skew handling: Popular products (iPhone, etc.) appear in millions of purchase histories. Their co-occurrence pairs explode combinatorially. Solution: cap the number of purchases per user (100 most recent), and use salted keys for hot products.
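The co-occurrence step with the per-user cap can be sketched in plain Python. The purchase tuples and the co_occurrence helper are hypothetical; grouping by user plays the role of the map/shuffle by user_id, and the final Counter plays the role of reduceByKey on product pairs. Capping each user at max_per_user recent purchases bounds that user's contribution to at most max_per_user × (max_per_user − 1) ordered pairs. The normalization step (3) is not shown.

```python
from collections import Counter, defaultdict
from itertools import permutations

def co_occurrence(purchases, max_per_user=100):
    """purchases: (user_id, product_id, timestamp) tuples.

    Returns Counter {(product_a, product_b): number of users who bought both}.
    """
    by_user = defaultdict(list)
    for user, product, ts in purchases:         # key by user
        by_user[user].append((ts, product))
    counts = Counter()
    for items in by_user.values():
        recent = sorted(items, reverse=True)[:max_per_user]   # cap: most recent N
        products = sorted({p for _, p in recent})             # distinct products
        for a, b in permutations(products, 2):                # both directions
            counts[(a, b)] += 1                               # "reduceByKey" on the pair
    return counts

purchases = [
    ("u1", "phone", 1), ("u1", "case", 2), ("u1", "charger", 3),
    ("u2", "phone", 1), ("u2", "case", 5),
    ("u3", "phone", 2), ("u3", "charger", 4),
]
counts = co_occurrence(purchases)
top_for_phone = sorted(
    ((b, n) for (a, b), n in counts.items() if a == "phone"),
    key=lambda x: (-x[1], x[0]),
)
print(top_for_phone)   # [('case', 2), ('charger', 2)]
```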
Design: Data Lake Query Engine

Requirements: SQL queries over 500 TB of Parquet files in S3. P95 query latency under 30 seconds.

Architecture:
1. Catalog: Hive Metastore or AWS Glue Catalog stores table schemas and partition locations.
2. Query engine: Trino (formerly PrestoSQL) or Spark SQL. Distributed SQL engine that reads Parquet directly from S3.
3. Partitioning: Tables partitioned by date + region. Queries with date filters skip irrelevant partitions (partition pruning).
4. File format: Parquet with Snappy compression. Column pruning: a query selecting 3 of 50 columns reads only those 3 columns from disk.
5. Caching: Alluxio or local SSD caching layer between S3 and compute. Hot partitions served from cache.

Performance tuning: File sizing (256 MB-1 GB per file). Predicate pushdown (Parquet row group statistics). Column statistics for cost-based optimization.
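Partition pruning (step 3) works because the partition values are encoded in the storage layout. The sketch below assumes Hive-style key=value path segments under a made-up s3://lake/events/ prefix, and uses hypothetical helpers; real engines get the partition list from the catalog (Hive Metastore or Glue) rather than by parsing strings, but the effect is the same: a date filter decides which directories are ever opened.

```python
partitions = [  # hypothetical Hive-style layout
    "s3://lake/events/date=2024-06-01/region=eu/",
    "s3://lake/events/date=2024-06-01/region=us/",
    "s3://lake/events/date=2024-06-02/region=eu/",
    "s3://lake/events/date=2024-06-02/region=us/",
    "s3://lake/events/date=2024-06-03/region=us/",
]

def parse_partition(path):
    """Extract {'date': ..., 'region': ...} from the key=value path segments."""
    return dict(seg.split("=", 1) for seg in path.strip("/").split("/") if "=" in seg)

def prune(paths, **equals):
    """Keep only partitions whose partition columns match every equality filter."""
    return [p for p in paths
            if all(parse_partition(p).get(k) == v for k, v in equals.items())]

# WHERE date = '2024-06-02' touches only 2 of the 5 partition directories
selected = prune(partitions, date="2024-06-02")
print(len(selected), "of", len(partitions))   # 2 of 5
```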

Coding Drills

python
# DRILL 1: Implement MapReduce word count from scratch
# (See Chapter 2 for the full implementation)

# DRILL 2: Implement a basic inverted index
from collections import defaultdict

def build_inverted_index(documents):
    """Build an inverted index from a dict of {doc_id: text}.

    Returns: {word: [(doc_id, term_frequency), ...]}
    """
    index = defaultdict(list)
    for doc_id, text in documents.items():
        words = text.lower().split()
        tf = defaultdict(int)
        for w in words:
            tf[w] += 1
        for word, count in tf.items():
            index[word].append((doc_id, count))
    # Sort posting lists by term frequency (descending)
    for word in index:
        index[word].sort(key=lambda x: -x[1])
    return dict(index)

docs = {
    "d1": "the cat sat on the mat",
    "d2": "the dog sat on the log",
    "d3": "the cat and the dog",
}
idx = build_inverted_index(docs)
print(idx["the"])   # [('d1', 2), ('d2', 2), ('d3', 2)]
print(idx["cat"])   # [('d1', 1), ('d3', 1)]
print(idx["dog"])   # [('d2', 1), ('d3', 1)]

# DRILL 3: Implement PageRank
# (See Chapter 7 for the full implementation)

The "Design a Batch Pipeline" Interview Framework

When you get a system design question involving batch processing, follow this structure:

1. Clarify requirements
Input size (GB/TB/PB)? Latency requirement (minutes/hours/daily)? Freshness requirement? Output format? How many consumers?
2. Choose framework
Data < 2 TB? DuckDB. Data > 2 TB? Spark. Need streaming? Flink. SQL-first team? dbt + warehouse.
3. Data model
Input schema. Output schema. Intermediate representations. File format (Parquet for analytics). Partitioning strategy (by date? by key?).
4. Pipeline stages
Draw the DAG: ingest → validate → transform → join/enrich → aggregate → write. Identify shuffle boundaries (joins, groupBys).
5. Handle the hard parts
Skew (salted keys, broadcast joins). Late data (watermarks, reprocessing). Schema evolution (Avro and Parquet support compatible changes such as adding optional fields). Failures (idempotent writes, checkpoints).
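One common way to get the idempotent writes mentioned in step 5 is to write each task's output to a temporary file and then atomically rename it into place. With this approach, a retried or duplicated task either leaves a complete previous output or replaces it with an equally complete one. The following is a minimal local-filesystem sketch. It assumes a POSIX-like filesystem where `os.replace` is atomic within one directory. The function name and JSON-lines output format are illustrative. Object stores such as S3 have no atomic rename, so real frameworks rely on output committers or table formats instead.

```python
import json
import os
import tempfile


def write_partition_idempotently(output_dir, partition, records):
    """Write records as JSON lines to output_dir/<partition>.jsonl.

    Writes to a temp file in the same directory, then atomically renames it,
    so readers never see a half-written file and a rerun with the same input
    yields the same final file.
    """
    os.makedirs(output_dir, exist_ok=True)
    final_path = os.path.join(output_dir, f"{partition}.jsonl")
    fd, tmp_path = tempfile.mkstemp(dir=output_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            for rec in records:
                # sort_keys makes the output byte-for-byte deterministic
                f.write(json.dumps(rec, sort_keys=True) + "\n")
            f.flush()
            os.fsync(f.fileno())  # make the bytes durable before publishing
        os.replace(tmp_path, final_path)  # atomic rename within one filesystem
    except BaseException:
        # Remove the temp file if anything failed before the rename.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return final_path


out = tempfile.mkdtemp()
rows = [{"user": "u1", "score": 3}, {"user": "u2", "score": 5}]
p1 = write_partition_idempotently(out, "date=2024-05-01", rows)
p2 = write_partition_idempotently(out, "date=2024-05-01", rows)  # simulated retry
print(os.listdir(out))  # ['date=2024-05-01.jsonl']
```

The retry simply overwrites the same path with identical content, which is what lets the scheduler rerun failed tasks without deduplication logic downstream.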

Quick-Fire Interview Q&A

Question | Key points in your answer
What is the difference between MapReduce and Spark? | MapReduce writes each job's output to HDFS, so a multi-stage pipeline materializes every intermediate dataset to replicated storage; Spark pipelines operators within a stage and can keep intermediate data in memory. Spark supports arbitrary DAGs, not just map-reduce pairs. Spark has lineage-based fault tolerance instead of materializing everything.
When would you use MapReduce over Spark? | When memory is scarce and data is too large to cache. MapReduce's disk-based approach is more predictable under memory pressure. Also: existing Hadoop pipelines where migration cost exceeds benefit.
How does Spark handle failures? | Lineage: recompute lost partitions by replaying transformations from the last materialized ancestor. Only the lost partitions are recomputed. For very long lineages, use checkpointing (write to reliable storage such as HDFS) to truncate the lineage.
What is data skew and how do you handle it? | One key has vastly more values than others, overloading one reducer. Solutions: salted keys (split hot key across N reducers), broadcast join (if one table is small), AQE in Spark 3 (automatic skew handling).
Explain narrow vs wide dependencies. | Narrow: each parent partition is used by at most one child partition (map, filter), so narrow steps can be pipelined. Wide: a child partition can depend on many (often all) parent partitions (groupByKey, joins on inputs that are not co-partitioned), which requires a shuffle. Stage boundaries fall at wide dependencies.
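The salted-key answer in the skew question can be shown as a two-stage count in plain Python. This sketch makes several assumptions. The hot keys are already known; in practice you would find them by sampling. `NUM_SALTS` is a hypothetical value of 4. The aggregation must be one that can be merged in stages, such as a count.

```python
import random
from collections import Counter

NUM_SALTS = 4  # hypothetical: how many reducers one hot key is spread over


def salted_count(events, hot_keys, num_salts=NUM_SALTS, seed=0):
    """Count events per key, splitting known hot keys into salted sub-keys."""
    rng = random.Random(seed)

    # Stage 1: append a random salt to hot keys only, so their records are
    # shuffled to up to num_salts reducers instead of a single one.
    partial = Counter()
    for key in events:
        salt = rng.randrange(num_salts) if key in hot_keys else 0
        partial[(key, salt)] += 1

    # Stage 2: shuffle again on the unsalted key and add the partial counts.
    totals = Counter()
    for (key, _salt), count in partial.items():
        totals[key] += count
    return partial, totals


events = ["iphone"] * 1000 + ["case"] * 10
partial, totals = salted_count(events, hot_keys={"iphone"})
print(dict(totals))  # {'iphone': 1000, 'case': 10}
```

Salting costs a second shuffle, and it works directly only for mergeable aggregations (counts, sums, max). For a salted join, the matching rows on the other side must be replicated once per salt value.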

Recommended Reading

Paper | Year | Key contribution
Dean & Ghemawat, "MapReduce: Simplified Data Processing on Large Clusters" | 2004 | The original MapReduce paper. Read sections 3 (execution) and 4 (refinements).
Zaharia et al., "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing" | 2012 | The Spark paper. Introduced RDDs and lineage-based fault tolerance.
Malewicz et al., "Pregel: A System for Large-Scale Graph Processing" | 2010 | Vertex-centric BSP model for graph algorithms.
Zaharia et al., "Spark SQL: Relational Data Processing in Spark" | 2015 | DataFrame API and Catalyst optimizer.
System design: You need to build a batch pipeline that computes daily user engagement scores from 50 TB of event data. The score requires joining events with user profiles (5 GB), aggregating by user, and running a scoring function on each user's aggregated data. You have a 100-node Spark cluster. Walk through the key design decisions.

Chapter 10: Connections

Batch processing does not exist in isolation. It connects to every other part of the data infrastructure.

Where Batch Fits in the Data Landscape

Related topic | Relationship to batch processing | DDIA Chapter
Stream Processing | The natural evolution: process data as it arrives instead of in daily batches. Lambda architecture runs both in parallel. Kappa architecture replaces batch with stream entirely. | Ch 12
Storage Engines | Batch pipelines read from and write to storage systems. Understanding LSM-trees, B-trees, and column stores (Parquet) is essential for optimizing I/O. | Ch 3-4
Partitioning / Sharding | MapReduce's shuffle is a partitioning step: hash(key) % num_reducers. The same ideas apply to database sharding. | Ch 6
Replication | HDFS replicates blocks 3x for fault tolerance. Same motivation as database replication, different mechanism. | Ch 5
Encoding & Evolution | Batch pipelines must handle schema evolution: old data files with missing columns, changed types, added fields. | Ch 4
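The shuffle's `hash(key) % num_reducers` assignment can be sketched in a few lines of Python. One deliberate substitution is worth noting. The sketch uses `zlib.crc32` rather than Python's built-in `hash()`, because `hash()` of a string is randomized per process unless `PYTHONHASHSEED` is set. A shuffle needs every mapper, on every machine, to agree on where a key goes. The function names are illustrative.

```python
import zlib
from collections import defaultdict


def partition_for(key: str, num_reducers: int) -> int:
    """Reducer index for a key; every mapper computes the same answer."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def shuffle(pairs, num_reducers):
    """Route (key, value) pairs to reducers, grouping values by key."""
    buckets = [defaultdict(list) for _ in range(num_reducers)]
    for key, value in pairs:
        buckets[partition_for(key, num_reducers)][key].append(value)
    return buckets


pairs = [("/home", 1), ("/about", 1), ("/home", 1), ("/cart", 1)]
buckets = shuffle(pairs, num_reducers=3)
# Both "/home" records end up in the same bucket, so one reducer sees them all.
print(buckets[partition_for("/home", 3)]["/home"])  # [1, 1]
```

Applied to a row's primary key, the same function is hash-based sharding, and it shares the same weakness. Changing the number of partitions remaps most keys, which is why databases often use a fixed number of partitions or consistent hashing instead.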

The Evolution of Batch Processing

Era | System | Key innovation | Limitation that drove the next era
1970s | Unix pipes | Composable tools, stdin/stdout interface | Single machine only
2004 | MapReduce | Distributed, fault-tolerant batch on commodity hardware | Disk I/O between stages, rigid API
2012 | Spark | In-memory DAG execution, lazy evaluation, lineage | JVM overhead, GC at scale
2015 | Flink | True streaming with batch as a special case | Ecosystem maturity
2020+ | Lakehouse (Delta, Iceberg) | ACID transactions on data lakes, time travel, schema evolution | Still evolving
2022+ | DuckDB, Polars | Single-node performance that rivals clusters for medium data (around 1 TB) | Does not scale past one machine
The pendulum swings back. After a decade of "distribute everything," the industry discovered that modern single machines (128 cores, 2 TB RAM, 30 GB/s NVMe) can handle surprisingly large datasets without the complexity of a cluster. At 30 GB/s, even a full scan of 1 TB takes only about 33 seconds (1,000 GB / 30 GB/s). A columnar engine like DuckDB often reads far less than that, because it skips columns the query does not need. The new rule of thumb: if your data fits on one machine (up to ~2 TB), use DuckDB or Polars. If it exceeds that, use Spark. If you need real-time, use Flink.

The Lambda and Kappa Architectures

Two competing approaches to combining batch and stream processing:

Lambda Architecture

Run both a batch layer (complete, accurate, slow) and a speed layer (approximate, real-time). Merge results at query time. The batch layer periodically recomputes the complete truth; the speed layer handles recent data not yet processed by batch.

Pro: Batch results are always correct (immutable input, deterministic computation).

Con: You maintain two systems (for example, Spark for the batch layer and Kafka/Flink for the speed layer) with two codebases that must compute the same result, often using different frameworks and APIs.
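The "merge results at query time" step can be sketched with a hypothetical page-view count. In this sketch, the batch view holds exact counts for all events before the last batch run's cutoff, and the speed layer holds counts only for events at or after it. For brevity, both views are computed from an in-memory list, and the cutoff is a made-up timestamp. A real speed layer updates incrementally as events arrive, and these names are not any particular system's API.

```python
from collections import Counter


def batch_view(events, cutoff):
    """Batch layer: exact recount of every event before the cutoff."""
    return Counter(url for ts, url in events if ts < cutoff)


def speed_view(events, cutoff):
    """Speed layer: counts for events the last batch run has not covered.
    (A real speed layer updates incrementally as events arrive.)"""
    return Counter(url for ts, url in events if ts >= cutoff)


def query(url, batch, speed):
    """Serving layer: merge the two views at read time."""
    return batch[url] + speed[url]


events = [(1, "/home"), (2, "/cart"), (3, "/home"), (4, "/home"), (5, "/cart")]
cutoff = 4  # hypothetical: the last batch run covered timestamps 1-3
batch, speed = batch_view(events, cutoff), speed_view(events, cutoff)
print(query("/home", batch, speed))  # 3 (2 from batch + 1 from speed)
```

When the next batch run moves the cutoff forward, the speed layer's state for the newly covered interval can be discarded. Keeping that handoff correct is a large part of Lambda's operational cost.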

Kappa Architecture

Use only a stream processing system. Keep a log of all events (Kafka). If you need to reprocess, replay the log through a new version of your stream processor. No separate batch system.

Pro: One system, one codebase. Simpler operations.

Con: Reprocessing the full log can be slow, and stream processors have historically been less mature than batch engines for complex analytics.
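Kappa-style reprocessing follows a simple pattern. You replay the retained log from the beginning through the new version of the job into a fresh output, then switch readers over once it has caught up. The following is a minimal sketch. It assumes an in-memory list as a stand-in for a Kafka topic and two hypothetical versions of a per-user scoring function. No Kafka client API is used or implied.

```python
from collections import defaultdict

# Stand-in for a retained, append-only Kafka topic: (offset, user, event_type).
LOG = [
    (0, "u1", "view"), (1, "u1", "purchase"),
    (2, "u2", "view"), (3, "u2", "view"),
]


def score_v1(event_type):
    return 1  # old logic: every event counts the same


def score_v2(event_type):
    return 10 if event_type == "purchase" else 1  # new logic weights purchases


def run_job(log, score_fn, from_offset=0):
    """Consume the log from from_offset, building a fresh output table."""
    table = defaultdict(int)
    for offset, user, event_type in log:
        if offset >= from_offset:
            table[user] += score_fn(event_type)
    return dict(table)


live = run_job(LOG, score_v1)         # current output, serving reads
replayed = run_job(LOG, score_v2, 0)  # reprocess the full history with v2
live = replayed                       # cut readers over once v2 has caught up
print(live)  # {'u1': 11, 'u2': 2}
```

The old output keeps serving while the replay runs, so the new version can be validated before the switch. The cost is that the replay must read the entire retained log.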

The Big Picture

Batch Processing Landscape

The evolution from Unix pipes to modern lakehouse architectures. Each era solved the previous era's biggest limitation.

What we covered. This lesson traced a direct line from Unix pipes (1970s) through MapReduce (2004), Spark (2012), and modern dataflow engines (2020s). The core ideas are remarkably stable: partition the data, process partitions in parallel, sort/shuffle to regroup, aggregate per group, handle failures by re-running pure functions. Every framework implements these same ideas with different tradeoffs on latency, memory usage, and API flexibility. Master the ideas, and any specific framework becomes just API documentation.

When to Use What: A Decision Framework

Scenario | Recommended approach | Reasoning
Data < 100 GB, ad-hoc analysis | DuckDB on your laptop | No cluster overhead. A columnar engine on NVMe handles this in seconds.
Data 100 GB - 2 TB, regular pipeline | DuckDB or Polars on a large VM | A single c6i.16xlarge (64 vCPUs, 128 GiB RAM, EBS storage; the c6id variant adds local NVMe) costs roughly $2.72/hr on demand, depending on region, and handles most analytics.
Data 2 TB - 100 TB, complex pipeline with joins | Spark on a managed cluster (Databricks, EMR) | Distributed processing is necessary. Use DataFrames to benefit from the optimizer.
Data > 100 TB, daily batch | Spark with careful tuning (AQE, broadcast joins, file sizing) | At this scale, every optimization matters. Consider Photon for CPU-bound workloads.
Real-time needed (sub-second latency) | Flink or Kafka Streams | Batch is too slow. Stream processing is the right paradigm.
SQL-first analytics team, moderate data | dbt + Snowflake/BigQuery/Redshift | Leverage the warehouse optimizer. Simpler than managing Spark clusters.

"The art of programming is the art of organizing complexity." — Edsger W. Dijkstra

Final question: A colleague proposes replacing your daily Spark ETL pipeline (processes 50 TB) with DuckDB running on a single machine. Under what conditions would this be a reasonable idea?