MinuteSort with Flat Datacenter Storage

Microsoft Research recently crushed the world record for MinuteSort, sorting 1.4TB in a minute. This replaces the former record held by Yahoo’s 1406 node Hadoop cluster in the Daytona MinuteSort category, and means that Hadoop no longer holds any world sorting record titles.

I found MSR’s approach of “MinuteSort with Flat Datacenter Storage” (FDS) to be intriguing. Most of the prior sort winners (e.g. Hadoop, TritonSort) try to colocate computation and data, since you normally pay a throughput (and thus latency) cost to go over the network. FDS separates out compute from storage, heavily provisioning a full bisection bandwidth network to match the I/O rate of the hard disks on storage nodes.

I’m going to give a rundown of the paper, and then pull out salient points for Hadoop at the end.


FDS the storage system is pretty simple. It’s a straight-up blob store, where blobs are arbitrarily large, but are read and written in 8MB chunks called tracts. Blobs are identified with a 128-bit GUID, and tracts are 8MB offsets into the blob. Choosing 8MB means that the disks get essentially full sequential write bandwidth, since seek times are amortized over these long reads. It provides asynchronous APIs for read and write operations; pretty necessary for doing high-performance parallel I/O.

Data is placed on storage nodes (aka tractservers) through a simple hashing scheme. There’s no Namenode like in Hadoop. Instead, clients cache a tract locator table (TLT) which lists all of the tractservers in immutable order. Lookups for a tract in a blob are done by hashing the blob GUID, adding the tract number, and then moduloing to index into the TLT. The paper doesn’t talk about replication for availability and durability, but it’d be easy to tack it on the way MongoDB does it, which is making identical clones of each tractserver and then doing failover with Paxos or some such.

There’s some more complexity in the hashing scheme to handle load balancing, which is reminiscent of Chord’s virtual node approach. Client access through the TLT is normally going to be sequential, since clients will mostly be sequentially scanning through the tracts of a blob. Remember that to find the TLT index, the blob ID is hashed, but the tract number is just added. This means you can get convoy effects where clients auto-synchronize as they wait on a slow tractserver, and from then on move in lockstep through the list. This is countered by taking 20 copies of the list of tractservers, shuffling each copy, and then concatenating them together. This super-list prevents auto-synchronizing lockstep, and also does a good job of balancing data distribution across tractservers.

An interesting note is that storage nodes are heterogeneous, with the authors saying they “bought, begged, and borrowed” a mix of machines. They again borrow from Chord’s virtual nodes by adjusting the number of times a node appears in the TLT proportionally to its I/O capability. This assumes that I/O rate and amount of storage on a node are also proportional, since there just this one mechanism of tweaking the # of appearances in the TLT to distribute both I/O and data. This assumption doesn’t hold up since hard disk sizes are increasing faster than I/O rates, but maybe they didn’t have heterogeneous enough hardware to care. In any case, it’s not addressed in the paper.


Each node has dual 10GigE Ethernet, connected by a full-bisection bandwidth Clos network. Since their flows are all short and bursty (random 8MB accesses), TCP’s slow start is a bad fit. Instead, they use a windowed RTS/CTS scheme which limits the number of flows at each receiver, and thus limits congestion in the Clos. Control messages are sent through separate TCP connections to improve latency; this is because TCP’s per-flow fairness allows them to skip buffering.

They also try to zero-copy everything right from disk to network to application buffers. This is basically a requirement if you’re trying to beat a sorting record.


There are two phases: a read and partition phase, and a write phase. During the read phase, compute nodes read tracts of unsorted data from storage nodes, sort the data into buffers based on partition, and then stream these buffers to the node responsible for the partition. After the data is partitioned, each node does a write phase where it sorts its partition and writes it out to disk. Since they’re doing MinuteSort, the dataset fits entirely in memory and only has to be read and written once.

Separated storage makes handling stragglers very easy. Since the initial read phase is lovingly stateless, any node can read and partition any tract of data. This means a dynamic work queue can be used for load balancing, which is an approach I really like. Whenever a node finishes a work item, it polls a central coordinator for more work to do. As long as the size of each work item isn’t too big, stragglers no longer have a major impact on overall job completion time.

To determine the right size for work items, the authors use a clever dynamic “Zeno allocation” scheme. Basically, work items decrease in size as the job nears completion. This makes sense because stragglers hurt you at the end of the job, so it’s worth the overhead of using smaller work items for that last wave of processing.

Computation and communication are overlapped during the read phase. As a node receives buffers of partitioned data, it saves up until it has a 250MB chunk and then quicksorts it in the background. Thus during the write phase, each node has to do maybe one more 250MB quicksort, then just merges all of its buffers together as they’re written out to disk. This hierarchy of sorts is pretty much the same approach I used in my external sorting implementation. It also lets them do external sorting of datasets bigger than memory.

Hadoop comparison

There are a number of reasons why this is significantly faster than Hadoop MapReduce.

  • In FDS, data is only read and written to disk once. Hadoop’s intermediate data is sorted and spilled to disk by the mapper, while FDS keeps this entirely in memory.
  • FDS also streams and sorts data during the read phase, while Hadoop has a barrier between its map and reduce phases. Hadoop can do some pre-aggregation in the mapper with a combiner, but not as flexibly as in FDS, not on the reduce side, and not without hogging an entire task slot.
  • FDS has better task scheduling, with a dynamic work queue and dynamic work item sizes.
  • FDS uses a single process for both the read and write phases, so the JVM startup cost is gone, and there’s no unnecessary movement of data between address spaces.

That said, FDS gets away with some of this because of its problem domain. The MapReduce paradigm is optimized for simplicity and scale-out rather than raw performance. Serializing intermediate data and having non-overlapping map and reduce phases makes fault-tolerance easier. If a reducer dies, a new reducer can just read all the intermediate data off disk. If a mapper dies, there’s no icky problems with a reducer having already processed some of the mapper’s output. Hadoop is also optimized for overall cluster throughput rather than completion time of this single MinuteSort job, which is part of why it uses much larger splits (64MB or 128MB) and doesn’t have dynamic task sizing (hurts overall throughput).


What I get out of this is that MSR built a highly-optimized blob store, connected it to a 20Gbit Clos network, and ran an in-memory sort. While it does crush Hadoop, I don’t think there’s much here over TritonSort, since their 1.47TB/min for the Indy MinuteSort wasn’t that much better than TritonSort’s 1.35TB/min in 2011, and TritonSort was built with academic dollars rather than MSR dollars. I like the idea of separated storage, but building out a full Clos network is expensive. I liked their tricks of hashing to determine tract locations, but they really should present a full fault-tolerance and durability story. Network scheduling in the general case is also more complicated than in FDS sort, and this is part of why Clos aren’t so useful in practice: they’re nice to program for, but are hard to keep fully utilized.

I think the main areas where Hadoop could see improvement are increased use of zero-copy I/O and a better story for intermediate data and use of memory in the cluster. Since zero-copy has improved greatly in recent releases of Hadoop, the big remaining issue is memory caching. I need to toot the Berkeley horn here and mention Spark and PACMan, which both address this problem. We really need the equivalent of HDFS for memory, since it can have huge performance benefits when there’s iterative computation or hot data. The difficulty here is settling on the right abstractions and mechanisms.

Leave a Reply