Parallelizing NINJA

Here's where I'm placing my thoughts/suggestions for parallelizing NINJA.  Subject to modification:

I think of the parallelization as going in three phases:

  1. Computing the distance matrix
  2. Building bin-pair heaps
  3. Iterating through node-merges

(1) Computing the distance matrix

  • I suggest using the fastdist method of Elias and Lagergren, simply because of speed. If the developers prefer some other method, it's fine with me.
  • The input is an alignment of n sequences; the output is effectively an n x n distance matrix.
  • This matrix should be conceptually broken up into regions, and those regions distributed among the compute nodes.
    • for example, each row could be assigned to a compute node (possibly more than one row per node). Alternately, the matrix could be divided into small square regions. Doesn't matter much, so I'll write the rest as though they were broken into rows.
  • Each compute node calculates the pairwise distances for the region that its been assigned
    • the values are kept locally, either in memory or on disk if necessary
    • each compute node returns the row sums to the head node.
      • note: only the row sum values are sent to the head node - not all the distance values.
      • in the simple case where the matrix is divided into rows, the head node now has the t_i values from the paper; if the matrix is divided in some other way, the head node must do some simple computation to get those values.

(2) Building bin-pair heaps

The head node is responsible for figuring out the bin ranges described in the paper. These ranges depend on the t-values gathered from the compute nodes. 

The default number of bins in the current implementation of NINJA is 30. This results in 465 unique bin-pairs.  The number of bins generated by the parallelized version might be more, to take better advantage of the cluster.

Remember that each bin-pair has a corresponding heap. The expected amount of work per heap is O(n lg n) to create the heap, then O(lg n) for each entry pulled off the heap. Supposing a constant number of entries are expected to come off a heap in each iteration (there may be better guesses here, but lets run with it), and noting that the heap will be alive for Cn iterations before the data structure is rebuilt (where C corresponds to the rebuild-frequency ratio), that means each heap will endure O(n lg n) actions in its life.   That said, it seems reasonable to distribute the heaps to compute nodes in the following way:

  • let H_x be the set of heaps that are assigned to the x'th compute node;
  • let h_x,m be the mth heap in set H_x
  • let n_x,m be the size of h_x,m
  • then distribute heaps to the compute nodes so as to as much as possible make  sum(n_x,m  lg(n_x,m)) = sum(n_y,m  lg(n_y,m))   for all pairs of compute nodes x and y.

A simple greedy heuristic for this is to sort the heaps by size, then assign them to compute nodes in order from largest to smallest.  For C compute nodes, assign the first C heaps. Then after than, assign the largest unassigned heap to the compute node x with the smallest sum  sum(n_x,m  lg(n_x,m)).  Should be good enough.

The point about possibly wanting more than 30 bins is that the heap sizes can vary quite a bit, so the smaller the bin ranges, the better granularity you have for then splitting them fairly evenly.  There's a trade-off though:  if you make too many heaps, you lose out of the logarithmic benefits it gives you.  I don't know where the optimal cutoff will be.

Once the assignment of bin-pair heaps to compute nodes are done (all on the head node - very fast), the head node then sends those assignments out to the compute nodes. The compute nodes are responsible for distributing the distance triples (i,j, and d_ij) they've computed out to the correct place. Upon receiving distance triples, a compute node needs to insert it into the correct bin-pair heap.  An easy way to do this is to route all the data tranfers through the head node.  That would still be a lot faster than having used a single node to compute all the distances ... but it would be faster to employ a peer-to-peer approach of letting the compute nodes pass distances to each other.

(3) Iterating through node-merges

This involves a few pieces:

  1. bin-pair heaps
  2. candidate pools
  3. candidate heaps
  4. distance matrix

I'll first discuss this as though the method only involves bin-pair heaps, then come back and discuss details required to make sense of the other parts.

Bin-pair heaps

At each iteration, the head node requests that each compute node go through its bin-pair heaps and return the triple with the best q_ij value among that compute node's heaps.  The simple way to do this is to just make the request, then silently await the results from the compute nodes.  The problem with this is that there will certainly be cases where one compute node has already found a triple with a very low q-value, while the other compute nodes are churning through their heaps, working with a much higher best-yet-seen q-value.  One way to overcome this is to have each compute node regularly broadcast a message with its best-yet-seen q-value.  When a compute node gets such a message from another compute node, it can then set its threshold for continuing to pull values off a heap to match the new value. Probably the easiest way to do this is to have each node broadcast every time they improve on the global best-yet-seen.

When all compute nodes have exhausted their heaps, the head node knows which triple had the best q-value, and coordinates the details of merging those two nodes into a new one, deciding which heap the new one belongs to, then sending it the the corresponding compute node.

Candidate pools and candidate heaps

In the serial algorithm, when triples are pulled off heaps, they are stored in a single candidate pool. When that pool reaches a threshold size, all elements in the pool are put into a newly created candidate heap, and the pool is emptied.  The workflow in a fixed iteration is then to: (a) scan through triples in the candidate pool, if any; (b) scan through the set of candidate heaps, using the best-yet-seen q-value to limit how much scanning is done; (c) scan through the bin-pair heaps.

It's not blazingly obvious (to me) how to treat the total set of candidates pulled off heaps on all the compute nodes, but it seems to make sense to just keep each triple on the compute node that owns the heap the triple came from. This means there will be one candidate pool per compute node. At the end of each iteration, the compute nodes can all inform the head node of the size of their pools, and if the sum of those sizes meets some threshold, a new heap can be formed with the triples from all those pools.  The new heap needs to be placed on some compute node - might as well use the same greedy approach described for assigning bin-pair heaps.  

An alternate option is to keep the pools on each compute node independent. Rather than occasionally merge the pools into a candidate heap that is placed on some node determined by the head node, the compute nodes could each be responsible for creating a new locally-stored candidate heap when its pool reaches threshold size.This would be easier to code, but (I think) will lead to longer run times.  Might be worth testing that, though.

Under this parallelized framework, the workflow in a fixed iteration is then: 

  1. the head node requests that all compute nodes scan through triples in their candidate pools, and report back the best-yet-seen q-value
  2. the head node requests that each compute node scan through the set of candidate heaps and bin-pair heaps.

 (these don't have to be done serially - the head node could just ask each compute node to "start scanning", and expect that the compute node will occasionally broadcast a best-yet-seen q-value, which will then be used for the bounding used in the heap scans)

Distance Matrix

At the end of each iteration, a pair (i,j) of nodes/clusters is merged, and a new cluster ij is formed. This requires interaction with a distance matrix D, as the new distance from ij to each remaining cluster k -- d_{ij,k} -- depends on distance values d_{i,k} and d_{j,k}.  Values d_{ij,k} are computed for all positions k by scanning through two rows of D (i and j) in order of increasing k, and keeping the resulting d_{ij,k} in memory.  Sadly this is the one part of the algorithm for which I don't yet see an easy parallelization.  I haven't benchmarked this, but I'd bet that this part of the process is a very small part of the total run time, so if the rest is parallelized, then the head node has to do this on its own at the end of each iteration, I'm hopeful that things won't be too terribly slow.

One possible optimization:  it often turns out that a triple encountered fairly early in the heap-scanning process proves to be the best one globally.  Of course we don't know this will be the case, but it may suggest a speedup, along these lines:  after the candidate heaps have been scanned, we'll have a triple with very low q-value.   The head node could optimistically guess that this will end up being the best triple, and start scanning the corresponding i and j rows of D - storing the d_{ij,k}

values into an in-memory array - even before the scan of the bin-pair heaps is complete.  If, at any point, one of the compute nodes broadcasts that it has found a better triple, the head node can start all over again with the new rows i' and j' - simply overwriting the values calculated based on the original i and j.  

A possible improvement to this is that I've noticed that when a cluster i is part of a good triple with some other cluster j, it often turns out that when a better triple is found, either i or j is a part of that triple, e.g. i,j' .  This suggestes that it might be helpful to use the optimistic lookahead to read promising rows from D into memory (since the real bottleneck is the disk access).  We could, for example, keep the most recent K (e.g. K=10) rows from D in memory.  This way, if the final best i,j pair has one or both involved clusters in memory, we'll avoid the disk-I/O at that time, since it will already have been done in advance.  (it doesn't need to be the most recent K rows ... it could be the K rows that have most often been part of a good scoring triple).

One other possible way of speeding things up some may be to split D among several disks on a single system.  Specifically, if we have 4 disks, D could be divided into 4 pieces vertically (each piece contains 1/4 of the columns and all of the rows).  Then a multi-threaded process could read/write d-values from those various D files essentially independently, mitigating some of the latency issues.  I don't see how to cleanly spread this out over multiple systems.

That's it for now.  I'm open to suggestions or questions.