MapReduce, Spark, dataflow engines — processing massive datasets efficiently.
You are on-call at a web company. Product wants the top 100 most-visited URLs from yesterday's access logs. Sounds easy, right? The catch: yesterday generated 10 TB of logs spread across 500 web servers. On a single machine reading from a local SSD at 500 MB/s, just reading the data takes 10 TB / 500 MB/s = 20,000 seconds, or about 5.5 hours. And that is before you do any sorting or counting.
You have 100 machines in your cluster. If you could somehow split the work evenly across all of them, the job should take about 3 minutes of reading time. But how do you split it? How do you handle one machine crashing mid-job? How do you combine 100 partial results into one final answer?
This is the fundamental question of batch processing: how do you process a massive dataset efficiently by distributing the work across many machines, while tolerating failures and producing correct results?
Before we reach for a distributed cluster, let us see how far a single machine can go. The Unix command line already has tools for this exact problem:
```bash
# Count top URLs from an access log
cat access.log |       # read the file
  awk '{print $7}' |   # extract the URL (7th field)
  sort |               # sort alphabetically
  uniq -c |            # count consecutive identical lines
  sort -rn |           # sort by count, descending
  head -100            # take top 100
```
This pipeline handles surprisingly large files. The sort command uses an external merge-sort algorithm: when data exceeds available memory, it writes sorted chunks to temporary files on disk, then merges them. It can sort files larger than RAM. On a modern machine with an NVMe SSD, this pipeline processes about 1 GB per minute. For a 1 GB log file, you are done in a minute. For 10 TB, you need about 7 days.
Seven days is too slow. You need to parallelize. But the shape of the solution is already visible in that Unix pipeline: read, extract, sort, count, sort, pick the top. MapReduce just distributes each of those steps across hundreds of machines.
The fundamental tradeoff is easy to picture. A single machine grinds through the full dataset sequentially. Ten machines each process one-tenth of the data in parallel, then combine results.
The single machine takes 10 units of time. The cluster takes 1 unit of processing plus a small overhead for distribution and merging. That overhead, the coordination cost, is what the rest of this chapter is about.
Before MapReduce, before Hadoop, before Spark, there was the Unix command line: the original batch processing system. It embodies design principles that every modern data processing framework copied, whether they admit it or not.
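Doug McIlroy, the inventor of Unix pipes, summarized the philosophy in 1978: make each program do one thing well, and expect the output of every program to become the input to another, as yet unknown, program.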
sort sorts. uniq deduplicates. wc counts. No program tries to do everything.

The uniform interface is the crucial innovation. Every program reads from stdin (standard input) and writes to stdout (standard output). The pipe (|) connects one program's stdout to another's stdin. No serialization format negotiation, no protocol handshakes, no schema definitions: just bytes flowing through a pipeline.
A standard Apache access log looks like this:
```text
216.58.214.206 - - [17/May/2026:10:05:03 +0000] "GET /api/users HTTP/1.1" 200 3428
192.168.1.42 - - [17/May/2026:10:05:04 +0000] "GET /index.html HTTP/1.1" 200 12054
10.0.0.15 - - [17/May/2026:10:05:04 +0000] "POST /api/login HTTP/1.1" 302 0
216.58.214.206 - - [17/May/2026:10:05:05 +0000] "GET /api/users HTTP/1.1" 200 3428
192.168.1.42 - - [17/May/2026:10:05:06 +0000] "GET /about.html HTTP/1.1" 200 8192
```
To find the top 5 most-visited URLs:
```bash
cat access.log |       # Read all lines
  awk '{print $7}' |   # Extract field 7 (the URL path)
  sort |               # Sort alphabetically: groups identical URLs together
  uniq -c |            # Count consecutive identical lines
  sort -rn |           # Sort numerically (-n), reversed (-r) = highest first
  head -5              # Take the first 5 lines
```
Let us trace the data through every stage:
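To see what each stage produces, here is a plain-Python stand-in for the pipeline, run on the five sample log lines above. The function name `run_pipeline` is made up for illustration. Each step mirrors one command. Ties in the final count ordering keep alphabetical order here, which may differ from how GNU `sort -rn` breaks ties.

```python
import itertools

LOG = """\
216.58.214.206 - - [17/May/2026:10:05:03 +0000] "GET /api/users HTTP/1.1" 200 3428
192.168.1.42 - - [17/May/2026:10:05:04 +0000] "GET /index.html HTTP/1.1" 200 12054
10.0.0.15 - - [17/May/2026:10:05:04 +0000] "POST /api/login HTTP/1.1" 302 0
216.58.214.206 - - [17/May/2026:10:05:05 +0000] "GET /api/users HTTP/1.1" 200 3428
192.168.1.42 - - [17/May/2026:10:05:06 +0000] "GET /about.html HTTP/1.1" 200 8192
"""


def run_pipeline(text, top_n=5):
    lines = text.splitlines()                        # cat access.log
    urls = [line.split()[6] for line in lines]       # awk '{print $7}' (awk counts fields from 1)
    sorted_urls = sorted(urls)                       # sort
    counted = [(len(list(group)), url)               # uniq -c: count runs of equal lines
               for url, group in itertools.groupby(sorted_urls)]
    by_count = sorted(counted, key=lambda c: -c[0])  # sort -rn (stable: ties stay alphabetical)
    return {"awk": urls, "sort": sorted_urls, "uniq -c": counted,
            "sort -rn": by_count, "head": by_count[:top_n]}


for stage, data in run_pipeline(LOG).items():
    print(f"{stage:>8}: {data}")
```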
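The sort command does not load the entire file into memory. It uses external merge sort. It reads as much input as fits in memory, sorts that chunk, and writes it to a temporary file. It repeats this until the input is exhausted. Then it merges all the sorted files into one sorted output stream.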
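This algorithm makes two passes over the data: one to create sorted chunks and one to merge them. Each pass reads and writes every record once, so total I/O is roughly 2N of reads plus 2N of writes, linear in the input size. That holds as long as all the chunks can be merged in a single pass. When there are too many chunks to merge at once, sort merges them in rounds, and each extra round adds another pass. Either way, it works for arbitrarily large files. The same idea appears in MapReduce's shuffle phase.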
The Unix pipeline is elegant but limited in three fundamental ways:
| Limitation | Why it matters | How MapReduce solves it |
|---|---|---|
| Single machine | All data flows through one machine's CPUs and disks, so throughput is bounded by that machine's I/O bandwidth (~500 MB/s for a SATA SSD, up to ~12 GB/s for the fastest NVMe drives). | Splits data across hundreds of machines, each reading its own local partition. |
| Limited parallelism | The stages of a pipe do run concurrently, but each stage is a single process on one machine. sort cannot emit any output until it has read all of its input, so everything downstream waits for it. | Map tasks run in parallel across all partitions. Reduce tasks run in parallel across key partitions. |
| No fault tolerance | If the machine crashes at 90%, you restart from zero. For a 5-hour job, that is a disaster. | Intermediate results are written to disk. If a task fails, only that task is re-run. |
Let us trace through an external merge sort with a concrete example to make the algorithm crystal clear. Say we have 12 numbers to sort but only enough memory for 4 at a time.
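The trace can be run as a small Python sketch. The twelve input numbers are made up, and `external_sort` is an illustrative name, not a library function. Pass 1 sorts each 4-number chunk in memory and spills it to a temporary file, which gives three files for twelve numbers. Pass 2 streams all three files through `heapq.merge`, which keeps roughly one value per file in memory. The merged stream is collected into a list only so the demo can print it.

```python
import heapq
import os
import tempfile


def external_sort(values, memory_limit):
    """Sort `values` while holding at most `memory_limit` of them in memory
    during pass 1. Returns the sorted list and the number of temp files."""
    run_paths = []
    # Pass 1: take a chunk that fits in memory, sort it, spill it to disk
    for start in range(0, len(values), memory_limit):
        chunk = sorted(values[start:start + memory_limit])
        fd, path = tempfile.mkstemp(suffix=".run")
        with os.fdopen(fd, "w") as f:
            f.writelines(f"{v}\n" for v in chunk)
        run_paths.append(path)

    # Pass 2: k-way merge of the sorted files; heapq.merge pulls one line
    # at a time from each file instead of loading them all
    files = [open(p) for p in run_paths]
    try:
        merged = list(heapq.merge(*(map(int, f) for f in files)))  # list() only for the demo
    finally:
        for f in files:
            f.close()
        for p in run_paths:
            os.remove(p)
    return merged, len(run_paths)


numbers = [38, 27, 43, 3, 9, 82, 10, 55, 71, 1, 64, 20]  # made-up input
result, num_files = external_sort(numbers, memory_limit=4)
print(num_files, result)
# 3 [1, 3, 9, 10, 20, 27, 38, 43, 55, 64, 71, 82]
```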
This exact algorithm runs inside sort when you pipe a 100 GB file through it. Instead of 4 numbers, it sorts 4 GB chunks. Instead of 3 files, it might merge 25. But the logic is identical.
The map phase is like awk '{print $7}': it extracts and transforms. The shuffle phase is like sort: it groups by key. The reduce phase is like uniq -c: it aggregates. The final sort -rn | head step needs another, much smaller pass over the per-URL counts. MapReduce just does each step on many machines simultaneously.

In 2004, Jeff Dean and Sanjay Ghemawat at Google published the paper that changed data processing forever. Their insight was simple: most batch computations can be expressed as two functions, map and reduce. If you constrain yourself to this API, the framework can handle all the hard parts (parallelism, fault tolerance, data distribution) automatically.
Map takes one input record and emits zero or more key-value pairs. It runs independently on each input record with no shared state between records. This is what makes it trivially parallelizable — if you have 1000 input records and 10 machines, each machine can map 100 records with zero coordination.
Reduce takes a key and all the values associated with that key, and combines them into a single output. Every value with the same key is guaranteed to go to the same reducer. This is the aggregation step.
Between map and reduce sits the shuffle: the framework sorts and groups all map output by key, sending each key's values to the appropriate reducer. This is the expensive part — it requires network transfer across the cluster.
Count how many times each word appears in a set of documents. This is the "Hello World" of MapReduce.
Let us trace this through a concrete example with three input documents (three map tasks):
The map/reduce constraint seems limiting — you can only express computations as stateless transformations (map) followed by keyed aggregations (reduce). But this constraint is exactly what enables the framework to:
| Property | How map/reduce enables it |
|---|---|
| Parallelism | Map tasks are independent — no shared state. The framework can run as many in parallel as there are machines. |
| Fault tolerance | If a map task fails, re-run it on another machine using the same input split. Because map functions are expected to be deterministic, the re-run produces identical output. |
| Data locality | The framework schedules map tasks on the machine that holds the input data. Network transfer is minimized. |
| Scalability | Adding machines adds proportional throughput. No code changes required. |
```python
from collections import defaultdict

# ── Map function: takes a document string, yields (key, value) pairs ──
def mapper(document):
    for word in document.lower().split():
        yield (word, 1)

# ── Reduce function: takes a key and list of values, returns (key, result) ──
def reducer(word, counts):
    return (word, sum(counts))

# ── Simulate MapReduce locally ──
documents = [
    "the cat sat on the mat",
    "the dog sat on the log",
    "the cat and the dog",
]

# MAP PHASE: run mapper on each document (in parallel in real MR)
map_output = []
for doc in documents:
    map_output.extend(mapper(doc))
# map_output = [('the',1), ('cat',1), ('sat',1), ('on',1), ...]

# SHUFFLE PHASE: group by key
shuffled = defaultdict(list)
for key, value in map_output:
    shuffled[key].append(value)
# shuffled = {'the': [1,1,1,1,1,1], 'cat': [1,1], ...}

# REDUCE PHASE: aggregate each group (in parallel in real MR)
results = []
for word, counts in sorted(shuffled.items()):
    results.append(reducer(word, counts))
# results = [('and',1), ('cat',2), ('dog',2), ('log',1),
#            ('mat',1), ('on',2), ('sat',2), ('the',6)]

for word, count in sorted(results, key=lambda x: -x[1]):
    print(f"{word}: {count}")
```
The word count example shows the API. Now let us look under the hood at how the framework actually executes a MapReduce job across a cluster of machines.
MapReduce is tightly coupled with the Hadoop Distributed File System (HDFS). HDFS splits every file into blocks (default 128 MB each) and replicates each block to 3 different machines. A 1 TB file becomes ~8,000 blocks spread across the cluster.
The key property: HDFS knows which machine holds which block. The MapReduce scheduler uses this to achieve data locality — it assigns each map task to a machine that already holds the input block. The data does not cross the network. The code goes to the data, not the other way around.
Each mapper partitions its output by hash(key) % num_reducers, so every record with a given key lands in the same partition. Each reducer fetches its partition from every mapper and merge-sorts the fetched partitions, producing a single sorted stream grouped by key.
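The partition-then-merge flow can be sketched in plain Python with hypothetical `map_task` and `reduce_task` functions. Each map task buckets its output by reducer and sorts each bucket. Reducer r then merges bucket r from every mapper with `heapq.merge` before summing each key group. `zlib.crc32` is an illustrative choice of hash. Python's built-in `hash()` is randomized per process for strings, so separate mapper processes could disagree about which reducer owns a key.

```python
import heapq
import itertools
import zlib

NUM_REDUCERS = 2


def partition(key, num_reducers=NUM_REDUCERS):
    # Stable across processes, unlike Python's salted built-in hash() for str
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def map_task(document):
    """One mapper: emit (word, 1), bucket by target reducer, sort each bucket."""
    buckets = [[] for _ in range(NUM_REDUCERS)]
    for word in document.lower().split():
        buckets[partition(word)].append((word, 1))
    return [sorted(b) for b in buckets]


def reduce_task(r, map_outputs):
    """Reducer r: fetch bucket r from every mapper, merge the sorted runs into
    one sorted stream, then sum the values of each key group."""
    runs = [output[r] for output in map_outputs]
    merged = heapq.merge(*runs)
    return [(key, sum(v for _, v in group))
            for key, group in itertools.groupby(merged, key=lambda kv: kv[0])]


docs = ["the cat sat on the mat", "the dog sat on the log", "the cat and the dog"]
map_outputs = [map_task(d) for d in docs]
for r in range(NUM_REDUCERS):
    print(f"reducer {r}: {reduce_task(r, map_outputs)}")
```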
| What fails | What happens | Cost |
|---|---|---|
| Map task | The framework detects the failure (the task reports an error, or stops reporting progress for 10 minutes, Hadoop's default task timeout) and re-runs the map task on another node. The input block is still on HDFS (replicated 3x). The failed attempt never published its output, so reducers simply fetch this mapper's output from the new attempt. | Re-process one input split (~128 MB). Other map tasks unaffected. |
| Reduce task | The framework re-runs the reduce task. It re-fetches all shuffle data from the mappers (mappers keep their sorted output until the job completes). No map tasks re-run. | Re-fetch and re-reduce one partition. Other reducers unaffected. |
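| Entire node | In-progress map and reduce tasks on that node are re-scheduled on other nodes. Completed map tasks on that node are re-run too, because their output lived on the dead node's local disk. They re-run on nodes holding other replicas of the input, and reduce tasks that had not yet fetched that output read it from the re-run. Completed reduce tasks are not re-run, since their output is already in HDFS. | Proportional to how much work was on the dead node. |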
In a cluster of 1,000 machines, some will be slow. A failing disk, a noisy neighbor (another process stealing CPU), or an overheating CPU that throttles itself can make one task take 10x longer than the rest. The entire job waits for the slowest task, often called a straggler.
Speculative execution detects tasks that are running much slower than average and launches a duplicate copy on another machine. Whichever copy finishes first is used; the other is killed. This wastes a small amount of cluster resources but can dramatically reduce job completion time.
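A toy model makes the payoff concrete. Its assumptions are illustrative, not how Hadoop actually schedules backups. Every task starts at time 0. A backup launches for any task still running at 1.5x the median task duration. The backup finishes one median duration after it starts.

```python
import statistics


def job_completion_time(durations, speculate, threshold=1.5):
    """Job time = when the last task finishes. With speculation, any task still
    running at threshold * median gets a backup copy that takes the median
    time; the task finishes when the first of its two copies does."""
    median = statistics.median(durations)
    launch_at = threshold * median
    finish_times = []
    for d in durations:
        if speculate and d > launch_at:
            finish_times.append(min(d, launch_at + median))  # first copy wins
        else:
            finish_times.append(d)
    return max(finish_times)


durations = [10] * 99 + [100]   # made-up: 99 normal tasks and one 10x straggler
print(job_completion_time(durations, speculate=False))  # 100
print(job_completion_time(durations, speculate=True))   # 25.0
```

In this model, the single extra copy cuts job time from 100 to 25 units. The cost is one duplicate task's worth of cluster time.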
```python
import hashlib
from collections import defaultdict


class MapReduceFramework:
    """A simplified, single-process MapReduce framework that mimics the
    map, shuffle, and reduce phases of distributed execution."""

    def __init__(self, num_mappers=4, num_reducers=2):
        self.num_mappers = num_mappers
        self.num_reducers = num_reducers

    def _partition(self, key):
        # Deterministic partition: same key always goes to same reducer
        return int(hashlib.md5(key.encode()).hexdigest(), 16) % self.num_reducers

    def _map_worker(self, split, mapper_fn):
        # Group map output by target reducer
        partitions = defaultdict(list)
        for record in split:
            for key, value in mapper_fn(record):
                partitions[self._partition(key)].append((key, value))
        # Sort each partition by key (simulates map-side sort)
        for r in partitions:
            partitions[r].sort(key=lambda x: x[0])
        return dict(partitions)

    def run(self, data, mapper_fn, reducer_fn):
        # Split data across mappers (round-robin)
        splits = [[] for _ in range(self.num_mappers)]
        for i, record in enumerate(data):
            splits[i % self.num_mappers].append(record)

        # MAP phase (would be parallel on a real cluster)
        map_outputs = [self._map_worker(s, mapper_fn) for s in splits]

        # SHUFFLE phase: collect each reducer's data from all mappers
        reducer_inputs = defaultdict(list)
        for mo in map_outputs:
            for r, pairs in mo.items():
                reducer_inputs[r].extend(pairs)

        # Sort each reducer's input by key (merge phase)
        for r in reducer_inputs:
            reducer_inputs[r].sort(key=lambda x: x[0])

        # REDUCE phase: group by key, call reducer
        results = []
        for r in sorted(reducer_inputs.keys()):
            groups = defaultdict(list)
            for key, value in reducer_inputs[r]:
                groups[key].append(value)
            for key in sorted(groups.keys()):
                result = reducer_fn(key, groups[key])
                if result is not None:
                    results.append(result)
        return results


# ── Usage ──
def word_mapper(line):
    for word in line.lower().split():
        yield (word, 1)


def word_reducer(word, counts):
    return (word, sum(counts))


mr = MapReduceFramework(num_mappers=3, num_reducers=2)
data = ["the cat sat on the mat", "the dog sat on the log", "the cat and the dog"]
results = mr.run(data, word_mapper, word_reducer)
# Sort by count descending, then alphabetically, so ties print deterministically
for word, count in sorted(results, key=lambda x: (-x[1], x[0])):
    print(f"{word}: {count}")
# Output: the: 6, cat: 2, dog: 2, on: 2, sat: 2, and: 1, log: 1, mat: 1
```
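The combiner is one of the most important optimizations in MapReduce. It is a mini-reducer that runs on each mapper's local output before the shuffle, so the mapper sends one partial result per key instead of one record per occurrence. Let us trace its effect on a concrete word count example to see exactly how much data it saves during the shuffle.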
```python
from collections import defaultdict


def map_with_combiner(document, combiner_fn=None):
    """Simulate a mapper with optional local combiner."""
    # Map phase: emit (word, 1) for each word
    map_output = []
    for word in document.lower().split():
        map_output.append((word, 1))

    if combiner_fn is None:
        return map_output  # No combiner: send all pairs

    # Combiner: group locally and apply combiner function
    local_groups = defaultdict(list)
    for key, value in map_output:
        local_groups[key].append(value)

    combined = []
    for key, values in local_groups.items():
        combined.append((key, combiner_fn(values)))
    return combined


# Without combiner
doc = "the cat sat on the mat the cat sat on the mat"
no_combiner = map_with_combiner(doc)
print(f"Without combiner: {len(no_combiner)} pairs")
# 12 pairs: (the,1)(cat,1)(sat,1)(on,1)(the,1)(mat,1)...

# With combiner (sum)
with_combiner = map_with_combiner(doc, combiner_fn=sum)
print(f"With combiner: {len(with_combiner)} pairs")
# 5 pairs: (the,4)(cat,2)(sat,2)(on,2)(mat,2)

print(f"Shuffle data reduced by {100*(1-len(with_combiner)/len(no_combiner)):.0f}%")
# Shuffle data reduced by 58%
```
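The combiner trick works for word count because sum is associative and commutative: adding partial sums in any grouping gives the same total. A reducer such as "average" does not have that property. The sketch below uses made-up response times for one URL key split across two mappers. It shows that averaging per-mapper averages gives the wrong answer. The usual fix is to have the combiner emit (sum, count) pairs, which can be added safely, and divide only once in the reducer.

```python
def average(values):
    return sum(values) / len(values)


# Hypothetical response times (ms) for one URL key, split across two mappers
mapper_a = [100, 200, 300]
mapper_b = [400]

correct = average(mapper_a + mapper_b)                # mean of all four values

# WRONG: using "average" as the combiner, then averaging the partial averages
wrong = average([average(mapper_a), average(mapper_b)])

# RIGHT: the combiner emits (sum, count); the reducer adds pairs and divides once
def combine(values):
    return (sum(values), len(values))


partials = [combine(mapper_a), combine(mapper_b)]     # [(600, 3), (400, 1)]
total, count = (sum(col) for col in zip(*partials))
print(correct, wrong, total / count)                  # 250.0 300.0 250.0
```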
Word count is nice for tutorials. In the real world, the hard problem is joins. You have a table of user click events (billions of rows: user_id, timestamp, url, ...) and a table of user profiles (millions of rows: user_id, name, country, signup_date, ...). You need to produce a combined dataset: each click event enriched with the user's country. This is a join on user_id.
In SQL, you just write SELECT * FROM events JOIN profiles ON events.user_id = profiles.user_id. In MapReduce, you have to implement the join yourself. There are three strategies, each with different tradeoffs.
The reduce-side join is the most general approach. Both datasets go through the same MapReduce job. Mappers tag each record with its source (events vs profiles), and the shuffle brings all records with the same user_id to the same reducer.
If one dataset is small enough to fit in memory (for example, the profiles table at a few hundred MB), a map-side (broadcast) join can skip the shuffle entirely. Load the small table into a hash map in every mapper. For each event, look up the user_id in the hash map.
This is vastly faster because there is no shuffle (no network transfer of the large events table). The tradeoff: the small table must fit in memory on every mapper node.
If both datasets are already sorted by the join key and partitioned the same way (e.g., both are output of a previous MapReduce job with the same number of reducers and same partitioning function), you can do a merge join in the mapper with no shuffle. Each mapper reads the corresponding partition of both datasets and merges them in sorted order. This is the fastest join but requires the most stringent preconditions.
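Inside one mapper, the merge join is a single forward pass over two sorted inputs. The sketch below is plain Python over made-up rows, and `merge_join` is a hypothetical helper. It assumes both inputs are already sorted by user_id and that profiles hold at most one row per user_id. Each cursor only moves forward, so the work is linear in the combined input size.

```python
def merge_join(events, profiles):
    """Inner-join two inputs that are both sorted by user_id (the first field).
    Assumes profiles holds at most one row per user_id; events may hold many."""
    joined = []
    p = 0  # cursor into profiles; it only ever moves forward
    for event in events:
        user_id = event[0]
        while p < len(profiles) and profiles[p][0] < user_id:
            p += 1  # skip profiles that have no (remaining) events
        if p < len(profiles) and profiles[p][0] == user_id:
            joined.append(event + profiles[p][1:])
        # otherwise no profile exists for this user_id: the inner join drops the event
    return joined


# One partition of each input, already sorted by user_id (made-up rows)
events = [(1, "/home"), (1, "/about"), (2, "/buy"), (2, "/cart"), (3, "/home"), (4, "/faq")]
profiles = [(1, "Alice", "US"), (2, "Bob", "UK"), (3, "Carol", "JP")]
for row in merge_join(events, profiles):
    print(row)
```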
A reduce-side join works beautifully until one key has massively more values than the rest. Imagine a social network where user_id = "taylor_swift" has 500 million follower events. All 500 million records go to one reducer, while other reducers handle 10,000 records each. That one reducer runs for hours while the others finish in minutes.
This is data skew, and it is the single most common cause of slow MapReduce jobs in production.
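Adding reducers cannot fix this, because hash partitioning spreads keys, not records. Every record for one key goes to the same reducer no matter how many reducers exist. A small simulation with made-up counts shows how lopsided the load becomes. It uses 1,000 ordinary users with 10,000 events each, plus one celebrity key with 500 million. `zlib.crc32` stands in for the framework's hash function.

```python
import zlib


def reducer_loads(key_counts, num_reducers):
    """Records each reducer receives under hash partitioning.
    key_counts maps key -> number of records with that key."""
    loads = [0] * num_reducers
    for key, count in key_counts.items():
        loads[zlib.crc32(key.encode("utf-8")) % num_reducers] += count
    return loads


# Hypothetical follower-event counts: many ordinary users, one celebrity
key_counts = {f"user_{i}": 10_000 for i in range(1_000)}
key_counts["taylor_swift"] = 500_000_000

loads = reducer_loads(key_counts, num_reducers=100)
print(f"busiest reducer: {max(loads):,} records; "
      f"average: {sum(loads) / len(loads):,.0f}")
```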
```python
from collections import defaultdict

# ── Sample data ──
events = [
    {"user_id": 1, "url": "/home", "ts": "10:05"},
    {"user_id": 2, "url": "/buy", "ts": "10:06"},
    {"user_id": 1, "url": "/about", "ts": "10:07"},
    {"user_id": 3, "url": "/home", "ts": "10:08"},
    {"user_id": 2, "url": "/cart", "ts": "10:09"},
]
profiles = [
    {"user_id": 1, "name": "Alice", "country": "US"},
    {"user_id": 2, "name": "Bob", "country": "UK"},
    {"user_id": 3, "name": "Carol", "country": "JP"},
]

# ── REDUCE-SIDE JOIN ──
# Map phase: tag each record with its source
map_output = []
for e in events:
    map_output.append((e["user_id"], ("event", e)))
for p in profiles:
    map_output.append((p["user_id"], ("profile", p)))

# Shuffle: group by user_id
groups = defaultdict(list)
for uid, record in map_output:
    groups[uid].append(record)

# Reduce: join each user's profile with all of that user's events
for uid in sorted(groups.keys()):
    profile = None
    user_events = []
    for tag, data in groups[uid]:
        if tag == "profile":
            profile = data
        else:
            user_events.append(data)
    if profile is None:
        continue  # inner join: drop events whose user has no profile
    for ev in user_events:
        print({**ev, "name": profile["name"], "country": profile["country"]})
# {'user_id': 1, 'url': '/home', 'ts': '10:05', 'name': 'Alice', 'country': 'US'}
# {'user_id': 1, 'url': '/about', 'ts': '10:07', 'name': 'Alice', 'country': 'US'}
# {'user_id': 2, 'url': '/buy', 'ts': '10:06', 'name': 'Bob', 'country': 'UK'}
# ... etc

# ── MAP-SIDE JOIN (broadcast) ──
profile_map = {p["user_id"]: p for p in profiles}
for ev in events:
    p = profile_map.get(ev["user_id"])
    if p:
        print({**ev, "name": p["name"], "country": p["country"]})
# Same joined rows (in event order), but no shuffle needed!
```
MapReduce launched the big data revolution in 2004. By 2012, its limitations were painfully clear. The industry moved on to dataflow engines — Spark, Flink, and Tez — that kept MapReduce's strengths (fault tolerance, horizontal scaling) while fixing its worst inefficiencies.
Pain 1: Materialization between stages. A real-world computation rarely fits into one MapReduce job. Finding the top URLs per country requires: Job 1 (extract URL + country, count), Job 2 (find top per country). Between jobs, the intermediate results are written to HDFS. That means: serialize to disk, replicate 3x across the network, then read back from disk for the next job. For a pipeline with 5 stages, the intermediate data is written and read 4 times unnecessarily.
Pain 2: No support for iteration. Machine learning algorithms (gradient descent, PageRank, k-means) iterate: apply a transformation, check convergence, repeat. In MapReduce, each iteration is a separate job. Each job writes to HDFS and the next reads from HDFS. Training a model for 100 iterations means 100 rounds of disk I/O for data that could just stay in memory.
Pain 3: Rigid Map-then-Reduce structure. Not every computation fits the two-stage pattern. What if you need map → reduce → map → reduce → map? Each arrow is a separate job with full HDFS materialization. MapReduce chains are awkward and slow.
Instead of the rigid map → shuffle → reduce pipeline, dataflow engines model computation as a directed acyclic graph (DAG) of operators. Any operator can connect to any other. Data flows through the graph, pipelined in memory when possible.
| Feature | MapReduce | Dataflow Engines (Spark, Flink, Tez) |
|---|---|---|
| Computation model | Fixed two-stage: Map → Shuffle → Reduce | Arbitrary DAG of operators |
| Intermediate data | Always written to HDFS between jobs | Pipelined in memory within a stage; shuffle output goes to local disk (not replicated HDFS), and other data spills to disk only if needed |
| Iteration | Each iteration = separate job with full I/O | Data stays in memory across iterations (Spark: cache/persist) |
| Fault tolerance | Re-run from HDFS input (always available) | Recompute from lineage (Spark) or checkpoint (Flink) |
| Startup overhead | High (a new JVM per task, HDFS writes between jobs) | Low (long-running executors reused across tasks) |
| Latency | Minutes to hours (I/O dominated) | Seconds to minutes (compute dominated) |
Apache Spark (Zaharia et al., 2012) introduced the Resilient Distributed Dataset (RDD). An RDD is a read-only, partitioned collection of records with a recorded lineage — the sequence of transformations that produced it from the original input data.
Lineage is the secret to fault tolerance without HDFS writes. If a partition of an RDD is lost (because a node crashes), Spark recomputes it by replaying the lineage from the last materialized ancestor. It only recomputes the lost partition, not the entire dataset.
MapReduce achieves fault tolerance through replication: intermediate data is written to disk (and HDFS replicates the input 3x). If anything fails, the data is still there.
Spark achieves fault tolerance through lineage: intermediate data is NOT written to disk (it stays in memory). If a partition is lost, Spark reconstructs it by replaying the transformations from the last materialized point (usually the original HDFS input). This is cheaper than replication as long as the lineage is short. For very long lineages (100+ transformations), Spark provides checkpointing: manually write an RDD to HDFS to truncate the lineage chain.
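To make lineage concrete, here is a toy model in plain Python. The class and method names are made up for illustration and are not Spark's API. A derived dataset stores only its parent and the per-partition function that produced it. A lost partition can therefore be rebuilt by replaying that function on the parent's partition, without touching the other partitions. The source dataset stands in for stable input such as a file on HDFS.

```python
class ToyRDD:
    """Toy model of lineage-based recovery; not Spark's API."""

    def __init__(self, parent=None, fn=None, data=None):
        self.parent = parent
        self.fn = fn
        self.data = data          # only set on the source dataset
        self.in_memory = {}       # partition index -> records (lost if a node dies)
        self.computed = []        # log of partition indices computed from lineage

    def map(self, f):
        return ToyRDD(parent=self, fn=lambda part: [f(x) for x in part])

    def filter(self, pred):
        return ToyRDD(parent=self, fn=lambda part: [x for x in part if pred(x)])

    def partition(self, i):
        if self.data is not None:                 # stable input is always available
            return self.data[i]
        if i not in self.in_memory:               # never computed, or lost
            self.computed.append(i)
            self.in_memory[i] = self.fn(self.parent.partition(i))  # replay lineage
        return self.in_memory[i]


source = ToyRDD(data=[[1, 2, 3], [4, 5, 6], [7, 8, 9]])
odd_squares = source.map(lambda x: x * x).filter(lambda x: x % 2 == 1)
before = [odd_squares.partition(i) for i in range(3)]

# Simulate a crash that loses partition 1 of every derived dataset
node = odd_squares
while node.data is None:
    node.in_memory.pop(1, None)
    node.computed.clear()
    node = node.parent

after = [odd_squares.partition(i) for i in range(3)]
print(before == after, after, odd_squares.computed)
# True [[1, 9], [25], [49, 81]] [1]
```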
Spark is not just "faster MapReduce." It is a complete rethinking of how distributed data processing should work. Let us dig into the mechanics.
Transformations are lazy — they define a computation but do not execute it. They return a new RDD (or DataFrame). Actions are eager — they trigger execution of the entire lineage DAG and return a result or write to storage.
| Transformations (lazy) | Actions (eager) |
|---|---|
| map(f): apply f to each element | count(): return number of elements |
| filter(f): keep elements where f is true | collect(): return all elements to the driver |
| flatMap(f): map, then flatten | take(n): return the first n elements |
| groupByKey(): group values by key | reduce(f): aggregate all elements |
| reduceByKey(f): combine values per key | saveAsTextFile(path): write text files (e.g. to HDFS) |
| join(other): join two pair RDDs | foreach(f): apply f for side effects |
| distinct(): remove duplicates | first(): return the first element |
When you call an action, Spark's DAG scheduler examines the lineage graph and breaks it into stages at every wide dependency (shuffle boundary). Within a stage, all narrow transformations are fused into a single task that processes one partition end-to-end without writing intermediate data.
For example, a job that runs read → filter → map → groupByKey → map → reduceByKey splits into three stages. Stage 1 is read, filter, and map, all narrow. The shuffle for groupByKey starts Stage 2 (groupByKey, map). The shuffle for reduceByKey starts Stage 3.
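The cutting rule can be sketched in a few lines of plain Python. The function name and the set of wide operations are assumptions for illustration. For example, real Spark treats join as narrow when both inputs are already co-partitioned. The sketch is also simplified: Spark runs the map-side half of a shuffle, such as reduceByKey's partial aggregation, at the end of the earlier stage.

```python
# Operations assumed to need a shuffle (a wide dependency)
WIDE_OPS = {"groupByKey", "reduceByKey", "join", "distinct", "sortByKey"}


def split_into_stages(ops):
    """Cut a linear chain of operations into stages: a new stage starts at
    every wide operation, and narrow operations are fused into the current one."""
    stages = [[]]
    for op in ops:
        if op in WIDE_OPS and stages[-1]:
            stages.append([])      # shuffle boundary
        stages[-1].append(op)
    return stages


job = ["read", "filter", "map", "groupByKey", "map", "reduceByKey"]
for n, stage in enumerate(split_into_stages(job), start=1):
    print(f"Stage {n}: {' -> '.join(stage)}")
```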
If you use an RDD multiple times (e.g., iterate over it in a loop), Spark recomputes it from scratch each time by default. To avoid this, you can cache (or persist) an RDD in memory. The first action materializes it; subsequent actions read from the cache.
Persistence levels control where cached data goes:
| Level | Where | When to use |
|---|---|---|
| MEMORY_ONLY | JVM heap as deserialized Java objects | Default for RDD cache(). Fastest access but high memory usage. |
| MEMORY_AND_DISK | Memory first, spill to local disk | When data does not fully fit in memory. |
| MEMORY_ONLY_SER | Memory as serialized bytes | Trades CPU for 2-5x less memory usage. |
| DISK_ONLY | Local disk only | Rarely useful: recomputation is often faster. |
Broadcast variables are Spark's version of the map-side broadcast join. You take a small dataset on the driver, broadcast it to all executors, and each task can look it up locally.
Accumulators are write-only shared variables for distributed counters and sums. Each task can add to an accumulator, but only the driver can read the final value.
This is probably the single most common performance mistake in Spark. Both operations group data by key, but they do it very differently.
groupByKey shuffles ALL values for each key to the same executor, then hands you the complete list. If key "the" appears 10 million times, a single executor receives a list of 10 million values, and that entire list must fit in its memory.
reduceByKey applies the reduce function on the map side first (like a combiner), then shuffles the partial results. If key "the" appears 10 million times across 100 partitions, each partition locally reduces its portion (100,000 occurrences each become a single count), and only 100 partial results are shuffled — not 10 million.
Avoid groupByKey if you are just going to aggregate the values (sum, count, max, min). Use reduceByKey, aggregateByKey, or combineByKey instead. The only legitimate use of groupByKey is when you genuinely need the full list of values per key, for example to sort them or to apply a function that needs all values simultaneously. Even then, consider whether aggregateByKey with a custom accumulator can avoid materializing the full list.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("BatchProcessing").getOrCreate()
sc = spark.sparkContext

# ── 1. Word count (the basics) ──
lines = sc.textFile("hdfs:///logs/access.log")  # illustrative path
word_counts = (
    lines
    .flatMap(lambda line: line.split())   # narrow: flatMap
    .map(lambda word: (word, 1))          # narrow: map
    .reduceByKey(lambda a, b: a + b)      # wide: shuffle + reduce
)
# Stage 1: textFile → flatMap → map (pipelined, no disk between them)
# Stage 2: reduceByKey (after the shuffle boundary)
top_10 = word_counts.takeOrdered(10, key=lambda x: -x[1])  # top 10 without a full sort

# ── 2. Join with broadcast (map-side join) ──
profiles = {1: "US", 2: "UK", 3: "JP"}  # small lookup table
bc_profiles = sc.broadcast(profiles)     # send to all executors
events = sc.parallelize([
    (1, "/home"), (2, "/buy"), (1, "/about"), (3, "/home"), (2, "/cart"),
])
enriched = events.map(
    lambda x: (x[0], x[1], bc_profiles.value.get(x[0], "?"))
)
# (1, '/home', 'US'), (2, '/buy', 'UK'), ...
# No shuffle! The broadcast variable is on every executor.

# ── 3. Multi-stage aggregation: top 5 URLs per country ──
# Pass the RDD directly; collect() would pull every row to the driver
df = spark.createDataFrame(enriched, ["user_id", "url", "country"])
url_counts = df.groupBy("country", "url").count()
w = Window.partitionBy("country").orderBy(F.desc("count"))
top_urls = url_counts.withColumn("rank", F.row_number().over(w))
top_urls.filter(F.col("rank") <= 5).show()
```
Not all data is tables and key-value pairs. Many of the most important batch processing problems are graphs: the web link graph (PageRank), social networks (friend recommendations), road networks (shortest paths), and knowledge graphs (entity resolution). These require iterative algorithms that MapReduce handles poorly.
Larry Page's insight: a web page is important if many important pages link to it. This circular definition becomes a linear algebra problem. Assign each page a rank. On each iteration, each page distributes its rank equally among all pages it links to. After enough iterations, the ranks converge.
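The damping factor d = 0.85 models a random web surfer who follows links 85% of the time and jumps to a random page 15% of the time. Without it, a group of pages that link only to one another (a rank sink) would trap more and more of the rank on every iteration. Pages with no outgoing links at all (sink or dangling nodes) leak rank instead. Implementations handle them separately, typically by spreading their rank evenly across all pages.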
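Written out, one iteration of the standard update is shown below. N is the number of pages, In(p) is the set of pages linking to p, and L(q) is the number of outgoing links on page q:

$$
PR(p) = \frac{1-d}{N} + d \sum_{q \in \mathrm{In}(p)} \frac{PR(q)}{L(q)}
$$

With d = 0.85 and N = 5, the constant term is 0.15 / 5 = 0.03. Each page then adds 85% of the rank shares flowing in from the pages that link to it.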
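Google's Pregel framework (2010) introduced a vertex-centric programming model. Instead of thinking about the global graph, you write code from the perspective of a single vertex. In each superstep, a vertex reads the messages sent to it in the previous superstep, updates its own value, and sends messages along its outgoing edges. It may also vote to halt when it has nothing left to do.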
This is the Bulk Synchronous Parallel (BSP) model: all vertices execute in lockstep. Superstep k completes before superstep k+1 begins. Messages sent in superstep k are delivered in superstep k+1.
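The model is easy to mimic on one machine. The sketch below is plain Python, and its function and variable names are illustrative, not Pregel's API. Each vertex computes from its inbox and sends rank / out_degree along its edges into an outbox. The outbox becomes the next superstep's inbox, which plays the role of the BSP barrier. For simplicity, the sketch runs a fixed number of supersteps instead of having vertices vote to halt. It also assumes every vertex has at least one outgoing link. The graph uses the same edges as the 5-node power-iteration example in this section.

```python
from collections import defaultdict


def pregel_pagerank(out_edges, num_supersteps=30, d=0.85):
    """PageRank written vertex-by-vertex in BSP style. out_edges maps
    vertex -> list of targets; every vertex needs at least one target."""
    vertices = list(out_edges)
    n = len(vertices)
    value = {v: 1.0 / n for v in vertices}
    inbox = defaultdict(list)          # messages sent during the previous superstep

    for superstep in range(num_supersteps):
        outbox = defaultdict(list)     # messages to deliver in the next superstep
        for v in vertices:             # on a cluster, vertices run in parallel
            if superstep > 0:          # superstep 0 only sends the initial ranks
                value[v] = (1 - d) / n + d * sum(inbox[v])
            targets = out_edges[v]
            for t in targets:
                outbox[t].append(value[v] / len(targets))
        inbox = outbox                 # barrier: superstep k's messages arrive in k+1
    return value


graph = {0: [1, 2], 1: [2], 2: [0], 3: [2], 4: [0, 2]}
ranks = pregel_pagerank(graph, num_supersteps=50)
for v, r in sorted(ranks.items()):
    print(f"vertex {v}: {r:.4f}")
```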
```python
import numpy as np


def pagerank(edges, num_nodes, damping=0.85, iterations=30):
    """Compute PageRank using power iteration.

    Args:
        edges: list of (source, target) pairs
        num_nodes: total number of nodes
        damping: probability of following a link (vs random jump)
        iterations: number of iterations

    Returns:
        ranks: array of PageRank values (sum = 1.0)
    """
    # Build adjacency: out_neighbors[u] = list of nodes u links to
    out_neighbors = [[] for _ in range(num_nodes)]
    for src, dst in edges:
        out_neighbors[src].append(dst)

    # Initialize: uniform rank
    ranks = np.full(num_nodes, 1.0 / num_nodes)

    for iteration in range(iterations):
        new_ranks = np.full(num_nodes, (1.0 - damping) / num_nodes)
        for u in range(num_nodes):
            if len(out_neighbors[u]) > 0:
                share = ranks[u] / len(out_neighbors[u])
                for v in out_neighbors[u]:
                    new_ranks[v] += damping * share
            else:
                # Sink node: distribute rank to ALL nodes (random jump)
                new_ranks += damping * ranks[u] / num_nodes
        ranks = new_ranks
        print(f"Iter {iteration}: max={ranks.max():.4f}, "
              f"node={ranks.argmax()}, sum={ranks.sum():.4f}")

    return ranks


# Example: 5-node web graph
edges = [
    (0, 1), (0, 2),  # page 0 links to 1 and 2
    (1, 2),          # page 1 links to 2
    (2, 0),          # page 2 links to 0
    (3, 2),          # page 3 links to 2
    (4, 0), (4, 2),  # page 4 links to 0 and 2
]
ranks = pagerank(edges, num_nodes=5, damping=0.85, iterations=20)
# Node 2 gets the highest rank: many pages link to it
for i, r in enumerate(ranks):
    print(f"Page {i}: {r:.4f}")
```
```python
# Spark PageRank: iterative computation with caching
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PageRank").getOrCreate()
sc = spark.sparkContext

# Adjacency lists: (node, [neighbors]), same 5-node graph as the NumPy version
links = sc.parallelize([
    (0, [1, 2]), (1, [2]), (2, [0]), (3, [2]), (4, [0, 2]),
]).cache()  # CRITICAL: cache the graph structure. Without this, Spark
            # recomputes it from its source (e.g. re-reads HDFS) every iteration.
N = 5
ranks = sc.parallelize([(i, 1.0 / N) for i in range(N)])

for _ in range(20):
    contribs = (
        links.join(ranks)  # each element: (node, ([neighbors], rank))
        .flatMap(lambda node_data: [
            (neighbor, node_data[1][1] / len(node_data[1][0]))
            for neighbor in node_data[1][0]
        ])
        # A zero contribution for every node keeps pages with no incoming
        # links (3 and 4 here) in `ranks`; otherwise the next join drops
        # them and their outgoing rank is lost.
        .union(links.map(lambda kv: (kv[0], 0.0)))
    )
    ranks = contribs.reduceByKey(lambda a, b: a + b).mapValues(
        lambda r: 0.15 / N + 0.85 * r
    )

results = ranks.collect()
```
Theory is nice. Now let us talk about the real-world problems you will face building and operating batch pipelines at scale.
| Use case | Input | Output | Typical tool |
|---|---|---|---|
| ETL | Raw event logs (JSON, CSV) | Cleaned, partitioned Parquet files | Spark |
| Search index building | Documents (web pages, products) | Inverted index (term → [doc_ids]) | MapReduce / Spark |
| Recommendation engine | User-item interactions | Recommendation model or precomputed rankings | Spark MLlib |
| ML feature pipeline | Raw events + entity tables | Feature store (precomputed features per entity) | Spark / dbt + warehouse |
| Data warehouse load | Operational DB snapshots | Analytical tables (star schema) | Spark / Hive / dbt |
| Log analysis | Application logs, web server logs | Dashboards, anomaly alerts | Spark / Presto |
| Framework | Model | Latency | Best for | Weakness |
|---|---|---|---|---|
| Hadoop MapReduce | Map → Reduce | Minutes-hours | Simple ETL on massive data | Slow (disk I/O between stages), rigid API |
| Apache Spark | DAG of RDDs/DataFrames | Seconds-minutes | Iterative ML, multi-stage ETL, interactive queries | Memory hungry, GC pressure at scale |
| Apache Flink | DAG of dataflows | Milliseconds (streaming), seconds (batch) | Unified stream+batch, event-time processing | Smaller ecosystem than Spark |
| Presto / Trino | SQL engine over distributed data | Seconds-minutes | Interactive SQL on data lake (Parquet, ORC) | Not designed for heavy ETL writes |
| dbt + Warehouse | SQL transforms inside a warehouse (BigQuery, Snowflake) | Seconds-minutes | Analytics engineering, data modeling | Vendor lock-in, cost at scale |
You are designing an ETL pipeline that processes 100 TB of clickstream data daily. The raw data arrives as compressed JSON in S3, one file per web server per hour (500 servers × 24 hours = 12,000 files). The output is a cleaned, partitioned Parquet dataset in a data lake, partitioned by date and country.
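Some quick arithmetic frames the design: 100 TB over 12,000 files averages about 8.3 GB per file, and if the compression is gzip (which is not splittable), each of those files is read by a single task. As a minimal single-process sketch of the per-record logic only, the code below parses raw JSON lines, drops malformed ones, and groups the rest under Hive-style `date=.../country=...` directory names, the layout Spark produces with `df.write.partitionBy("date", "country").parquet(path)`. The field names `ts`, `country`, and `url` and both helper functions are hypothetical; at 100 TB this logic would run inside Spark tasks, not a Python loop.

```python
import json
from collections import defaultdict
from datetime import datetime, timezone

def partition_path(record):
    """Hive-style directory for a cleaned record, e.g. 'date=2024-05-01/country=US'."""
    day = datetime.fromtimestamp(record["ts"], tz=timezone.utc).strftime("%Y-%m-%d")
    return f"date={day}/country={record['country']}"

def clean_and_partition(raw_lines):
    """Hypothetical helper: parse JSON lines, drop bad records, group by partition.

    The field names ts (epoch seconds), country, and url are assumptions.
    Returns ({partition_path: [records]}, number_of_dropped_lines).
    """
    partitions = defaultdict(list)
    dropped = 0
    for line in raw_lines:
        try:
            event = json.loads(line)
            record = {
                "ts": int(event["ts"]),
                "country": str(event["country"]).upper(),  # normalize 'us' -> 'US'
                "url": event["url"],
            }
        except (KeyError, TypeError, ValueError):  # JSONDecodeError is a ValueError
            dropped += 1  # malformed line or missing field: skip it
            continue
        partitions[partition_path(record)].append(record)
    return dict(partitions), dropped

raw = [
    '{"ts": 1714521600, "country": "us", "url": "/home"}',
    '{"ts": 1714525200, "country": "DE", "url": "/cart"}',
    'not json at all',
    '{"ts": 1714525200, "url": "/no-country"}',
]
parts, dropped = clean_and_partition(raw)
for path, records in sorted(parts.items()):
    print(path, len(records))
```

Counting dropped lines rather than failing the whole job is a common design choice for raw logs; in production you would typically also write the rejects somewhere for inspection.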
| Symptom | Likely cause | Fix |
|---|---|---|
| Spark job OOMs during shuffle | Skewed key: one partition has billions of records | Salt the hot key. For skewed sort-merge joins in Spark 3, enable AQE skew handling: spark.sql.adaptive.enabled=true plus spark.sql.adaptive.skewJoin.enabled=true. |
| Job stuck at 99% map progress | Straggler task on a slow node | Enable speculative execution: spark.speculation=true |
| Tiny output files (10,000 files of 1 MB each) | Too many partitions for the output size | df.coalesce(100).write.parquet(path) to merge partitions |
| Read performance is terrible | Too many small input files on HDFS/S3 | Run a compaction job to merge small files into larger ones (target 256 MB-1 GB) |
| Spark executor killed by YARN | Memory exceeds container limit (off-heap, or serialized objects too large) | Increase spark.executor.memoryOverhead. Use MEMORY_ONLY_SER persistence. |
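A minimal sketch of the salting fix for a skewed aggregation, in plain Python so the two stages are visible. `salted_count` is a hypothetical helper, and the seeded `random.Random` is only there to make the example repeatable. For a skewed join rather than an aggregation, the smaller side must also be replicated once per salt value so every salted key still finds its match.

```python
import random
from collections import Counter

def salted_count(keys, num_salts, seed=0):
    """Hypothetical helper: count keys in two stages to spread a hot key.

    Stage 1 groups by (key, salt): one hot key becomes up to num_salts
    smaller groups, which a cluster would hash to different reducers.
    Stage 2 strips the salt and adds up the partial counts.
    """
    rng = random.Random(seed)  # seeded only to make the example repeatable
    stage1 = Counter((key, rng.randrange(num_salts)) for key in keys)
    stage2 = Counter()
    for (key, _salt), partial_count in stage1.items():
        stage2[key] += partial_count
    return stage1, stage2

keys = ["hot"] * 10_000 + ["cold_a", "cold_b"]
stage1, totals = salted_count(keys, num_salts=8)
largest_hot_group = max(n for (k, _), n in stage1.items() if k == "hot")
print(totals["hot"], largest_hot_group)  # largest group is roughly 10_000 / 8
```

The price of salting is a second aggregation pass; it pays off only when one key is large enough to make a single reducer the bottleneck.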
The choice of file format can make a 10x difference in batch job performance. Most teams do not think about this enough.
| Format | Type | Compression | Column pruning | Predicate pushdown | Best for |
|---|---|---|---|---|---|
| CSV/JSON | Row-oriented, text | gzip (external) | No — must read all columns | No — must read all rows | Data exchange, human readability. Never for analytics. |
| Avro | Row-oriented, binary | Snappy/Deflate (block-level) | No | No | Write-heavy workloads, Kafka, schema evolution (built-in). |
| Parquet | Columnar, binary | Snappy/Zstd (per column) | Yes — reads only requested columns | Yes — skips row groups using statistics | Analytics, data lakes, any read-heavy batch job. |
| ORC | Columnar, binary | Zlib/Snappy (per stripe) | Yes | Yes | Hive ecosystem, similar to Parquet. |
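Predicate pushdown in Parquet relies on per-row-group column statistics such as min and max. The toy model below shows the core idea for a filter like `value > 949`: a row group whose maximum is at most the threshold cannot contain a match, so it is never read. The `RowGroup` class and the helper functions are hypothetical stand-ins, not the Parquet or Arrow API.

```python
from dataclasses import dataclass

@dataclass
class RowGroup:
    """Hypothetical stand-in for a Parquet row group: values plus min/max stats."""
    values: list
    min_value: int
    max_value: int

def make_row_groups(values, group_size):
    groups = []
    for start in range(0, len(values), group_size):
        chunk = values[start:start + group_size]
        groups.append(RowGroup(chunk, min(chunk), max(chunk)))
    return groups

def scan_greater_than(groups, threshold):
    """Return matching values and how many row groups were actually read."""
    matches, groups_read = [], 0
    for group in groups:
        if group.max_value <= threshold:
            continue  # statistics prove no row can match: skip the whole group
        groups_read += 1
        matches.extend(v for v in group.values if v > threshold)
    return matches, groups_read

groups = make_row_groups(list(range(1_000)), group_size=100)  # sorted data
matches, groups_read = scan_greater_than(groups, threshold=949)
print(len(matches), groups_read)  # 50 1
```

Skipping works best when the data is sorted or clustered on the filter column. If the same 1,000 values were shuffled randomly, nearly every row group's min/max range would straddle the threshold and all ten groups would be read.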
| Anti-pattern | Why it is bad | Fix |
|---|---|---|
| Small files on HDFS/S3 | Each file incurs metadata overhead. 1 million 1 MB files means roughly 1000x more list and open calls than the same 1 TB stored as 1,000 1 GB files. HDFS NameNode memory grows with the number of files and blocks. | Compact small files into 256 MB-1 GB files. In Spark: df.coalesce(N).write.parquet(path). |
| Unnecessary shuffles | Shuffles are the most expensive operation: data is serialized, written to disk, transferred over the network, and deserialized. Two shuffles where one suffices can roughly double the job's shuffle cost. | Combine operations that share a key. Use reduceByKey, not groupByKey. Avoid repartition unless the output partitioning is wrong. |
| Not using data locality | Reading data from a remote node over the network is 10-100x slower than reading from local disk. | In MapReduce, the scheduler automatically prefers local nodes. In Spark on YARN, set spark.locality.wait to allow the scheduler time to find local slots. |
| Collecting to driver | df.collect() pulls ALL data to the driver's memory. For large datasets, the driver OOMs and the job crashes. | Use .take(N) for sampling. Use .write to save to HDFS/S3. Never collect() datasets larger than ~1 GB. |
| Cartesian joins | If both tables have N rows, the output has N² rows. A 1M × 1M join produces 1 trillion rows. | Always filter before joining. Verify join keys have matching types. Add a LIMIT during development. |
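The `reduceByKey`-over-`groupByKey` advice comes down to map-side combining. The toy shuffle below routes each pair to reducer `hash(key) % num_reducers` and counts how many records cross the network with and without a per-task pre-aggregation step. The `reducer_for` and `shuffle` helpers are hypothetical, and `zlib.crc32` stands in for a real partitioner's hash function.

```python
import zlib
from collections import Counter

def reducer_for(key, num_reducers):
    """Hash partitioning: hash(key) % num_reducers.

    zlib.crc32 is used because Python's built-in hash() of a str is
    randomized per process, which would make routing non-repeatable.
    """
    return zlib.crc32(key.encode()) % num_reducers

def shuffle(map_outputs, num_reducers, combine):
    """Hypothetical helper: route (key, value) pairs to reducers and sum them.

    combine=False mimics groupByKey: every pair crosses the network.
    combine=True mimics reduceByKey: each map task first sums its own
    pairs, so it ships at most one record per distinct key.
    Returns (records_shuffled, final_counts).
    """
    inboxes = [[] for _ in range(num_reducers)]
    for task_output in map_outputs:
        if combine:
            local = Counter()
            for key, value in task_output:
                local[key] += value
            task_output = list(local.items())
        for key, value in task_output:
            inboxes[reducer_for(key, num_reducers)].append((key, value))

    records_shuffled = sum(len(inbox) for inbox in inboxes)
    final_counts = Counter()
    for inbox in inboxes:  # each reducer sums the values for its keys
        for key, value in inbox:
            final_counts[key] += value
    return records_shuffled, final_counts

# 4 map tasks, each emitting 1,000 (word, 1) pairs over just 3 distinct words
words = ["spark", "hadoop", "flink"]
map_outputs = [[(words[i % 3], 1) for i in range(1_000)] for _ in range(4)]
shuffled_plain, counts_plain = shuffle(map_outputs, num_reducers=2, combine=False)
shuffled_comb, counts_comb = shuffle(map_outputs, num_reducers=2, combine=True)
print(shuffled_plain, shuffled_comb)  # 4000 12
```

The saving depends on how many distinct keys each map task sees: with 3 words per task it is dramatic, but when nearly every key is unique a combiner shrinks almost nothing.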
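The runtime figures in the opening example follow from a simple model: a job can finish no faster than its input size divided by the aggregate read bandwidth. A minimal sketch of that estimate (`scan_time_seconds` is a hypothetical helper) assumes the input splits perfectly evenly and ignores shuffle, scheduling, stragglers, and skew, so it gives a lower bound rather than a prediction.

```python
def scan_time_seconds(data_bytes, machines, bytes_per_sec_per_machine):
    """Hypothetical helper: lower bound on runtime, reading the input once.

    Assumes a perfectly even split and no shuffle, straggler, or skew cost.
    """
    return data_bytes / (machines * bytes_per_sec_per_machine)

MB, GB, TB = 10**6, 10**9, 10**12

# Numbers from the opening example: 10 TB of logs, SSDs at 500 MB/s.
single = scan_time_seconds(10 * TB, 1, 500 * MB)     # 20,000 s, about 5.6 h
cluster = scan_time_seconds(10 * TB, 100, 500 * MB)  # 200 s, about 3.3 min

# The Unix pipeline's end-to-end rate of ~1 GB per minute
pipeline_days = scan_time_seconds(10 * TB, 1, GB / 60) / 86_400  # about 6.9 days
print(single, cluster, round(pipeline_days, 1))
```

Real jobs land above this bound; multi-stage MapReduce pipelines in particular also pay for writing and re-reading intermediate data between jobs.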
This chapter distills everything into interview-ready formats: cheat sheets, system design skeletons, and coding drills.
| Dimension | Key facts to have ready |
|---|---|
| CONCEPT | MapReduce: map (stateless, parallel), shuffle (sort+partition by key), reduce (aggregate per key). Dataflow: DAG of operators, lazy evaluation, lineage-based fault tolerance. Narrow vs wide dependencies. BSP model for graph algorithms. |
| DESIGN | Join strategies (broadcast vs reduce-side vs merge). Skew handling (salted keys). Partitioning (hash vs range). Caching strategy (cache reused RDDs). Output file sizing (coalesce/repartition). Speculative execution for stragglers. |
| CODE | MapReduce word count. Spark word count, join, aggregation. PageRank in Spark. Python defaultdict-based local MapReduce simulation. |
| DEBUG | OOM during shuffle = skewed key. Stuck at 99% = straggler. Tiny files = too many partitions. Slow reads = too many small input files. Recomputation = forgot to cache. |
| FRONTIER | Lakehouse (Delta Lake, Iceberg): ACID on data lakes. Adaptive Query Execution (AQE) in Spark 3. Photon (Databricks C++ vectorized engine). DuckDB for single-node analytics that can outperform Spark on medium-sized data. |
```python
# DRILL 1: Implement MapReduce word count from scratch
# (See Chapter 2 for the full implementation)

# DRILL 2: Implement a basic inverted index
from collections import defaultdict

def build_inverted_index(documents):
    """Build an inverted index from a dict of {doc_id: text}.

    Returns: {word: [(doc_id, term_frequency), ...]}
    """
    index = defaultdict(list)
    for doc_id, text in documents.items():
        words = text.lower().split()
        tf = defaultdict(int)
        for w in words:
            tf[w] += 1
        for word, count in tf.items():
            index[word].append((doc_id, count))

    # Sort posting lists by term frequency (descending)
    for word in index:
        index[word].sort(key=lambda x: -x[1])

    return dict(index)

docs = {
    "d1": "the cat sat on the mat",
    "d2": "the dog sat on the log",
    "d3": "the cat and the dog",
}
idx = build_inverted_index(docs)
print(idx["the"])  # [('d1', 2), ('d2', 2), ('d3', 2)]
print(idx["cat"])  # [('d1', 1), ('d3', 1)]
print(idx["dog"])  # [('d2', 1), ('d3', 1)]

# DRILL 3: Implement PageRank
# (See Chapter 7 for the full implementation)
```
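Drill 1 defers to an implementation elsewhere in the book. As a compact reminder of the three phases, here is a minimal single-process sketch built on `defaultdict`; the function names are illustrative only, and a real framework would also partition and sort the shuffle output across machines.

```python
from collections import defaultdict
from itertools import chain

def map_phase(document):
    """Map: emit (word, 1) for every word. Stateless, so it parallelizes freely."""
    return [(word, 1) for word in document.lower().split()]

def shuffle_phase(mapped_pairs):
    """Shuffle: group all values by key (a cluster also partitions and sorts)."""
    groups = defaultdict(list)
    for word, count in mapped_pairs:
        groups[word].append(count)
    return groups

def reduce_phase(groups):
    """Reduce: aggregate each key's values independently."""
    return {word: sum(counts) for word, counts in groups.items()}

def word_count(documents):
    mapped = chain.from_iterable(map_phase(d) for d in documents)
    return reduce_phase(shuffle_phase(mapped))

counts = word_count(["the cat sat on the mat", "the dog sat on the log"])
print(counts["the"], counts["sat"], counts["mat"])  # 4 2 1
```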
When you get a system design question involving batch processing, follow this structure:
| Question | Key points in your answer |
|---|---|
| What is the difference between MapReduce and Spark? | MapReduce writes each job's output to HDFS before the next job can read it; Spark keeps intermediate data in memory where possible (shuffle files still go to local disk). Spark supports arbitrary DAGs, not just map-reduce pairs. Spark uses lineage-based fault tolerance instead of materializing every intermediate result. |
| When would you use MapReduce over Spark? | When memory is scarce and data is too large to cache. MapReduce's disk-based approach is more predictable under memory pressure. Also: existing Hadoop pipelines where migration cost exceeds benefit. |
| How does Spark handle failures? | Lineage: recompute lost partitions by replaying transformations from the last materialized ancestor. Only the lost partitions are recomputed. For very long lineages, use checkpointing (write to HDFS) to truncate the lineage. |
| What is data skew and how do you handle it? | One key has vastly more values than others, overloading one reducer. Solutions: salted keys (split hot key across N reducers), broadcast join (if one table is small), AQE in Spark 3 (automatic skew handling). |
| Explain narrow vs wide dependencies. | Narrow: each parent partition is used by at most one child partition (map, filter), so operations can be pipelined. Wide: a parent partition can feed many child partitions (groupByKey, join on inputs that are not co-partitioned), which requires a shuffle. Stage boundaries fall at wide dependencies. |
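A toy illustration of the difference, tracking which input partitions each output partition reads. The helper names are hypothetical, and `zlib.crc32` stands in for a real hash partitioner. In Spark, the wide case is where the scheduler ends one stage and starts another, with shuffle files in between.

```python
import zlib

def narrow_map(partitions, fn):
    """Hypothetical map-style op: output partition i uses input partition i only."""
    out = [[fn(row) for row in part] for part in partitions]
    deps = {i: {i} for i in range(len(partitions))}
    return out, deps

def wide_repartition(partitions, num_out):
    """Hypothetical shuffle-style op: each row moves to crc32(key) % num_out,
    so one output partition can draw rows from many input partitions."""
    out = [[] for _ in range(num_out)]
    deps = {j: set() for j in range(num_out)}
    for i, part in enumerate(partitions):
        for key, value in part:
            j = zlib.crc32(key.encode()) % num_out
            out[j].append((key, value))
            deps[j].add(i)
    return out, deps

parts = [[("a", 1), ("b", 2)], [("a", 3), ("c", 4)], [("b", 5), ("c", 6)]]
_, narrow_deps = narrow_map(parts, lambda kv: (kv[0], kv[1] * 10))
_, wide_deps = wide_repartition(parts, num_out=2)
print(narrow_deps)
print(wide_deps)
```

Losing one narrow output partition means recomputing one input partition; losing a wide output partition can mean recomputing every parent that fed it, which is why long lineages with many shuffles benefit from checkpointing.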
| Paper | Year | Key contribution |
|---|---|---|
| Dean & Ghemawat, "MapReduce: Simplified Data Processing on Large Clusters" | 2004 | The original MapReduce paper. Read sections 3 (execution) and 4 (refinements). |
| Zaharia et al., "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing" | 2012 | The Spark paper. Introduced RDDs and lineage-based fault tolerance. |
| Malewicz et al., "Pregel: A System for Large-Scale Graph Processing" | 2010 | Vertex-centric BSP model for graph algorithms. |
| Armbrust et al., "Spark SQL: Relational Data Processing in Spark" | 2015 | DataFrame API and Catalyst optimizer. |
Batch processing does not exist in isolation. It connects to every other part of the data infrastructure.
| Related topic | Relationship to batch processing | DDIA Chapter |
|---|---|---|
| Stream Processing | The natural evolution: process data as it arrives instead of in daily batches. Lambda architecture runs both in parallel. Kappa architecture replaces batch with stream entirely. | Ch 11 |
| Storage Engines | Batch pipelines read from and write to storage systems. Understanding LSM-trees, B-trees, column stores (Parquet) is essential for optimizing I/O. | Ch 3-4 |
| Partitioning / Sharding | MapReduce's shuffle IS a partitioning step: hash(key) % num_reducers. Same ideas apply to database sharding. | Ch 6 |
| Replication | HDFS replicates blocks 3x for fault tolerance. Same motivation as database replication, different mechanism. | Ch 5 |
| Encoding & Evolution | Batch pipelines must handle schema evolution: old data files with missing columns, changed types, added fields. | Ch 4 |
| Era | System | Key innovation | Limitation that drove the next era |
|---|---|---|---|
| 1970s | Unix pipes | Composable tools, stdin/stdout interface | Single machine only |
| 2004 | MapReduce | Distributed, fault-tolerant batch on commodity hardware | Disk I/O between stages, rigid API |
| 2012 | Spark | In-memory DAG execution, lazy evaluation, lineage | JVM overhead, GC at scale |
| 2015 | Flink | True streaming with batch as special case | Ecosystem maturity |
| 2020+ | Lakehouse (Delta, Iceberg) | ACID transactions on data lakes, time travel, schema evolution | Still evolving |
| 2022+ | DuckDB, Polars | Single-node performance that rivals clusters for medium data (1 TB) | Does not scale past one machine |
Two competing approaches to combining batch and stream processing:
Lambda architecture: Run both a batch layer (complete, accurate, slow) and a speed layer (approximate, real-time), and merge their results at query time. The batch layer periodically recomputes the complete truth; the speed layer covers recent data that the batch layer has not processed yet.
Pro: Batch results are always correct (immutable input, deterministic computation).
Con: You maintain two systems (e.g. Spark for batch, Kafka + Flink for the speed layer) with two codebases that must compute the same thing, often in different APIs or languages.
Kappa architecture: Use only a stream processing system. Keep a log of all events (e.g. in Kafka). To reprocess, replay the log through a new version of your stream processor. There is no separate batch system.
Pro: One system, one codebase. Simpler operations.
Con: Reprocessing the full log can be slow. Stream processors have historically been less mature than batch engines for complex analytics.
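A toy sketch of how each architecture answers the same query, page views per URL, assuming events are `(timestamp, url)` pairs and that the batch layer has processed everything before a hypothetical `batch_cutoff`. All function names are illustrative. The Lambda path computes the answer with two different pieces of code that must be kept in agreement, while the Kappa path has exactly one.

```python
from collections import Counter

def count_views(events):
    """Batch-layer logic (hypothetical): page views per URL."""
    return Counter(url for _ts, url in events)

def lambda_query(events, batch_cutoff):
    # Batch layer: full recomputation over events before the cutoff.
    batch_view = count_views(e for e in events if e[0] < batch_cutoff)
    # Speed layer: separate incremental code for events the batch has not seen.
    speed_view = Counter()
    for ts, url in events:
        if ts >= batch_cutoff:
            speed_view[url] += 1
    # Merge the two views at query time.
    return batch_view + speed_view

def kappa_query(event_log):
    # One code path: replay the whole log through the stream processor.
    state = Counter()
    for _ts, url in event_log:
        state[url] += 1
    return state

events = [(1, "/a"), (2, "/b"), (3, "/a"), (4, "/c"), (5, "/a")]
print(lambda_query(events, batch_cutoff=4))
print(kappa_query(events))
```

Here both paths agree, but only because the two Lambda implementations happen to encode identical logic; keeping them in sync as the logic changes is exactly the maintenance burden Kappa avoids.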
The evolution from Unix pipes to modern lakehouse architectures. Each era solved the previous era's biggest limitation.
| Scenario | Recommended approach | Reasoning |
|---|---|---|
| Data < 100 GB, ad-hoc analysis | DuckDB on your laptop | No cluster overhead. Columnar engine on NVMe handles this in seconds. |
| Data 100 GB - 2 TB, regular pipeline | DuckDB or Polars on a large VM | A single c6i.16xlarge (64 vCPUs, 128 GiB RAM) costs about $2.72/hr on demand in us-east-1 and handles most analytics; the c6id variant adds local NVMe SSDs. |
| Data 2 TB - 100 TB, complex pipeline with joins | Spark on a managed cluster (Databricks, EMR) | Distributed processing necessary. Use DataFrames for optimizer benefits. |
| Data > 100 TB, daily batch | Spark with careful tuning (AQE, broadcast joins, file sizing) | At this scale, every optimization matters. Consider Photon for CPU-bound workloads. |
| Real-time needed (sub-second latency) | Flink or Kafka Streams | Batch is too slow. Stream processing is the right paradigm. |
| SQL-first analytics team, moderate data | dbt + Snowflake/BigQuery/Redshift | Leverage the warehouse optimizer. Simpler than managing Spark clusters. |
"The art of programming is the art of organizing complexity." — Edsger W. Dijkstra