Tuesday, December 30, 2014

[cascading-user] Troubleshooting extreme reducer skew

I'm having a problem with a transformation I'm writing: according to Driven, in two of the steps of one of my flows nearly all of the tuples (e.g., 16.76M out of 16.88M) get fed to just one slice of the reduce step:


I've checked the counts for the grouping fields, and none of the groups comes anywhere near 16M tuples; the largest group has 1,281 tuples in it.  How can I troubleshoot the problem here?

This step runs in EMR (AMI 3.3.1, Amazon Hadoop 2.4.0) as part of a cascade.  None of the other flows in the cascade experience anything similar.



I'll add a couple more observations and details that I either missed earlier or have learned since:
  1. I'm using Cascading 2.6.1
  2. I looked at the Hadoop job configuration for one of the offending tasks, and mapred.partitioner.class = cascading.tuple.hadoop.util.GroupingSortingPartitioner.
  3. The GroupBy's two grouping fields are a Long and a String, and the additional sorting field is a java.sql.Timestamp (a sketch of the shape of this GroupBy follows after the list).
  4. I'm not deliberately or knowingly doing anything with Comparators or Hashers.  So based on my reading of the Cascading code, the tuples should be getting partitioned according to the ObjectHasher.
  5. One of my job runs did not experience the problem, so there seems to be some nondeterminism about this.
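
For reference, here's a minimal sketch of the shape of that GroupBy; the field names are hypothetical stand-ins for my real ones:

    // two grouping fields (a Long and a String) plus a secondary sort field
    Fields groupFields = new Fields( "accountId", "status" );   // Long, String
    Fields sortFields = new Fields( "eventTime" );              // java.sql.Timestamp

    Pipe grouped = new GroupBy( pipe, groupFields, sortFields );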



One prehistoric cave man approach is to figure out how to log, for each group, the grouping key and group size.

Easy if you have a custom Buffer already, otherwise more challenging, but given the small # of records (16M) it wouldn't add much to put that in as a separate output.

Assuming the total # of groups isn't huge, you can then grep the logs (or examine the output files) to see if one or a few of the groups have many more elements than you'd expect.
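
A rough sketch of what I mean, assuming a plain Cascading 2.x Buffer (untested; the class and field names are just placeholders, imports omitted):

    // emits one (groupKey, groupSize) tuple per group so you can grep for outliers
    public class GroupSizeLogger extends BaseOperation<Void> implements Buffer<Void> {
        public GroupSizeLogger() {
            super(new Fields("groupKey", "groupSize"));
        }

        @Override
        public void operate(FlowProcess flowProcess, BufferCall<Void> bufferCall) {
            long size = 0;

            Iterator<TupleEntry> arguments = bufferCall.getArgumentsIterator();
            while (arguments.hasNext()) {
                arguments.next();   // we only care about the count
                size++;
            }

            String groupKey = bufferCall.getGroup().getTuple().toString();
            bufferCall.getOutputCollector().add(new Tuple(groupKey, size));
        }
    }

    // wired in after the GroupBy, e.g.:
    // Pipe sizes = new Every( grouped, Fields.ALL, new GroupSizeLogger(), Fields.RESULTS );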

Though one quick question - are you seeing reduce task times that mirror what Driven is reporting? E.g. is one of the reduce tasks taking much, much longer than any of the other tasks?




The problem is already happening precisely at the two steps that have custom Buffers, so I'll give that a shot.  I doubt the problem is there, given my examination of group sizes in the output data; the original data comes from a SQL database, and a quick query says that the biggest group for the buffer ought to be no larger than 1,250 tuples.  But hey, maybe I messed up somewhere in between, so no harm in double-checking.

To your last question: the first graph of the first screenshot (for which I, unhelpfully, did not include the legend) shows that one of the reduce slices takes 15+ minutes, while the others take far less time.



Actually, Driven reports on the number of groups that were handled by each of the reduce slices.  The big one handled 2,173,882 groups; the next one in size handled 10,736, and all the other 29 slices handled no groups.  That doesn't look right.



Ok, I believe I may have found a bug in Cascading.  Here's what I've found.

The logic for assigning tuples to reduce partitions lives in the GroupingSortingPartitioner class:



  public int getPartition( TuplePair key, Tuple value, int numReduceTasks )
    {
    return ( hashCode( key.getLhs() ) & Integer.MAX_VALUE ) % numReduceTasks;
    }

The lhs of the TuplePair is the grouping tuple for the tuple being assigned a partition.  The hashCode() method here is from TupleHasher:



  public final int hashCode( Tuple tuple )
    {
    int hash = 1;

    List<Object> elements = Tuple.elements( tuple );

    for( int i = 0; i < elements.size(); i++ )
      {
      Object element = elements.get( i );

      hash = 31 * hash + ( element != null ? hashers[ i % hashers.length ].hashCode( element ) : 0 );
      }

    return hash;
    }

The reason I'm getting extreme skew in the reduce stage is the unlucky combination of these factors:
  1. I'm grouping with two fields;
  2. The second of these fields has only two distinct values, and one of them is orders of magnitude more frequent than the other;
  3. In nearly all the EMR runs of my job, the reduce step has been partitioned into 31 tasks, which is precisely the prime number used in hashCode() above to combine the hashes of the fields.
So if you look closely at the code I've pasted above, when numReduceTasks = 31 this interacts poorly with the use of 31 as a prime factor in hashCode(): every field's contribution except the last one's gets multiplied by 31 at least once, so modulo 31 those contributions vanish and the partition is determined almost entirely by the last grouping field, which in my case takes only two values.
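
To make the interaction concrete, here's a tiny standalone demo (not Cascading code, and the values are made up) of how the first field's contribution disappears when you take the hash modulo 31 partitions:

    public class SkewDemo {
        public static void main(String[] args) {
            int numReduceTasks = 31;

            long[] firstField = { 1L, 2L, 3L, 1000L };   // stand-in for the Long grouping field
            String[] secondField = { "a", "b" };         // stand-in for the two-valued String field

            for (long first : firstField) {
                for (String second : secondField) {
                    int hash = 1;
                    hash = 31 * hash + Long.valueOf(first).hashCode();  // folded in here...
                    hash = 31 * hash + second.hashCode();               // ...and now a multiple of 31

                    int partition = (hash & Integer.MAX_VALUE) % numReduceTasks;

                    // with these small values the hash stays positive, and the partition
                    // comes out the same for every value of the first field
                    System.out.println(first + "," + second + " -> partition " + partition);
                }
            }
        }
    }

Every line with the same second-field value lands on the same partition, no matter what the first field is.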

The good news is I've managed to work around this simply by changing the order of the fields in the GroupBy.
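
In code terms the workaround is just swapping the order of the grouping Fields (hypothetical field names again), so that the high-cardinality Long field is the last one folded into the hash:

    // before: new GroupBy( pipe, new Fields( "accountId", "status" ), new Fields( "eventTime" ) )
    Pipe grouped = new GroupBy( pipe, new Fields( "status", "accountId" ), new Fields( "eventTime" ) );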



Simple fast hash algorithms tend to have these pathological cases. 

One possible solution would be to pass the number of reducers into the hashCode algorithm and have it find a prime number higher than that. But that would add quite a bit of computation to a method that is called a lot, and it would also break the API.

I think for now the solution of allowing custom hashers is the best choice. You'll know your data far better than any simple general-purpose hash we can write. In your case your second element has more distribution than your first, so you want its bits to impact the final hash more than the first's.
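
For reference, my reading of the Cascading 2.x API is that a custom hasher gets plugged in by setting a Comparator that also implements cascading.tuple.Hasher on the grouping field, roughly like this (names and the mixing step are placeholders, imports omitted):

    // TupleHasher should pick up hashCode( value ) from the Hasher interface
    // instead of falling back to the default ObjectHasher
    public class CustomStringHasher implements Comparator<String>, Hasher<String>, Serializable {
        @Override
        public int compare(String lhs, String rhs) {
            if (lhs == null)
                return rhs == null ? 0 : -1;
            return rhs == null ? 1 : lhs.compareTo(rhs);
        }

        @Override
        public int hashCode(String value) {
            // placeholder mixing step; tailor this to the actual distribution of the data
            int h = value == null ? 0 : value.hashCode();
            return h ^ (h >>> 16);
        }
    }

    // registered on the grouping Fields (field name hypothetical):
    Fields groupFields = new Fields( "accountId", "status" );
    groupFields.setComparator( "status", new CustomStringHasher() );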

On Wednesday, December 31, 2014 2:34:30 PM UTC-8, Luis Casillas wrote:  

Ok, I believe I may have found a bug in Cascading.  Here's what I've found.

 ...

So if you look closely at the code I've pasted above, when numReduceTasks = 31 this interacts poorly with the use of 31 as a prime factor in hashCode().



Using a prime number of reduce tasks is…well, kind of odd :)

Normally you'd set this to the number of reduce slots in your cluster - why is it 31 in your EMR setup?
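
For what it's worth, pinning the reduce task count explicitly looks roughly like this (Hadoop 2 property name; the flow setup here is hypothetical):

    Properties properties = new Properties();
    properties.setProperty("mapreduce.job.reduces", "32");   // e.g. the number of reduce slots in the cluster

    FlowConnector flowConnector = new HadoopFlowConnector(properties);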

In any case, I don't think I'd call this a bug in Cascading, but rather an indication that the TupleHasher code could be improved.

E.g. using the JOAAT (Jenkins one-at-a-time) algorithm, which would look like:

    public final int hashCode(Tuple tuple) {
        int hash = 0;

        List<Object> elements = Tuple.elements(tuple);

        for (int i = 0; i < elements.size(); i++) {
            Object element = elements.get(i);

            int elementHash = element != null ? hashers[i % hashers.length].hashCode(element) : 0;

            hash += elementHash;
            hash += (hash << 10);
            hash ^= (hash >>> 6);    // unsigned shifts, to match the uint32 arithmetic JOAAT assumes
        }

        hash += (hash << 3);
        hash ^= (hash >>> 11);
        hash += (hash << 15);

        return hash;
    }



I've been thinking that there are three types of solution here:
  1. As you suggest, change the hash algorithm so that it takes the number of slices into account.
  2. Use a different way of combining hashes, such that the chance of a bad interaction with the number of slices is much lower.
  3. Use a different way of computing the partition for a tuple, so that more bits of the grouping tuple's hashcode contribute to the choice of partition (sketched below).
I think you've formulated a good reason why #1 is dispreferred, but my guess is that either #2 or #3 should offer a good solution.  (Though one that needs to be thought through very carefully, with a cup of coffee and a copy of Knuth perhaps...)
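
To make #3 concrete, here's the kind of thing I have in mind (just a sketch, not a patch; the XOR/shift step is the bit-spreader java.util.HashMap uses):

    // mix the high bits of the grouping hash into the low bits before the modulo,
    // so the multiply-by-31 structure can't line up with a partition count of 31
    static int partition(int groupingHash, int numReduceTasks) {
        int h = groupingHash;
        h ^= (h >>> 16);
        return (h & Integer.MAX_VALUE) % numReduceTasks;
    }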



We've been burned once or twice on issues like this as well.

Ken: one way you could wind up with 31 reducers is that people sometimes resort to the hack of setting each stage in a big pipeline to a different number of reducers (which is easy to do in Scalding) so they can clearly see which stage is which in the job tracker.



I'm just using the default EMR configuration, and that's what it gave me for a cluster with two core nodes.  It does seem to like numbers of the shape 2^n - smallDelta; with 4 core nodes I see reduce task counts like 63 or 62.

Whether to call it a bug or not, we do agree that the TupleHasher could be improved.  That JOAAT algorithm does seem like a good suggestion, though I figure this will need to be thought out very carefully...



yes, let's replace the hash function.

anyone willing to do a bake off?

ultimately this should be pluggable, but I'd like to see a preferred default implementation.

according to this, since we are really just hashing ints, FNV-1a and murmur2 have the least collisions
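
For the bake-off, a straight 32-bit FNV-1a over the per-element hash codes would look something like this (standard FNV-1a offset basis and prime; just a sketch, mirroring the existing TupleHasher structure):

    public final int hashCode(Tuple tuple) {
        int hash = 0x811C9DC5;                       // FNV-1a 32-bit offset basis

        List<Object> elements = Tuple.elements(tuple);

        for (int i = 0; i < elements.size(); i++) {
            Object element = elements.get(i);
            int elementHash = element != null ? hashers[i % hashers.length].hashCode(element) : 0;

            // feed each element's 32-bit hash in a byte at a time
            for (int shift = 0; shift < 32; shift += 8) {
                hash ^= (elementHash >>> shift) & 0xFF;
                hash *= 0x01000193;                  // FNV-1a 32-bit prime
            }
        }

        return hash;
    }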


