Debugging map/reduce jobs can be a difficult task - especially when the code is not properly structured and not designed for unit testing. Nobody is perfect :) In addition, there are many reasons to collect non-critical diagnostic information from map/reduce jobs - performance analysis, hunting down tricky failures and so on.

Hadoop does not offer too much when it comes to listening to what your job has to say. And personally I do not like the idea of any kind of remote logging from map/reduce jobs. In a performance-sensitive environment I would avoid doing any additional network I/O for logging purposes - unless this traffic can be batched, compressed and sent at an appropriate moment. Unfortunately, that would require yet another logging framework.

But, in fact, there is a legitimate way to access the logs of your map/reduce job. You can get up to five log types:

  • STDOUT (this is what is printed to stdout)
  • STDERR (as you can guess, this is what gets printed to stderr)
  • SYSLOG (this is the result of log4j output)
  • PROFILE (this is the profile.out file - if you use task profiling feature available in Hadoop)
  • DEBUGOUT (used when debugging map/reduce jobs)

In most cases you will be primarily interested in your job's stdout, stderr or syslog output, as these are easy to produce from your code. The last one is, in fact, the log4j output, and if you want to control the log4j settings you will have to modify a file on each node in your cluster.
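
Producing all three from your code is trivial. Here is a minimal sketch (the class name is mine, not anything Hadoop requires): anything written to System.out or System.err lands in the STDOUT/STDERR logs of the task attempt, and a commons-logging logger (backed by log4j) ends up in SYSLOG.

    import java.io.IOException;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class ChattyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

      private static final Log LOG = LogFactory.getLog(ChattyMapper.class);

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        System.out.println("this line goes to the STDOUT log");
        System.err.println("this line goes to the STDERR log");
        LOG.info("this line goes to the SYSLOG log via log4j");
        // the actual map logic would go here
      }
    }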

In order to understand what you are looking for, you need to understand how your job is executed. A job has its unique identifier. Every job has a map and a reduce phase. Each phase consists of a number of tasks. And, finally, for each task there will be one or more task attempts. Thus, what you are looking for is the output of each task attempt of your job.
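
To give a rough idea of how these pieces are named (the timestamp and the numbers below are made up, the format is what classic Hadoop uses):

    job_201301200800_0042                    the job itself
    task_201301200800_0042_m_000003          one of its map tasks
    attempt_201301200800_0042_m_000003_0     the first attempt of that map task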

Typically when you run a map/reduce job you get an object of type org.apache.hadoop.mapreduce.Job. Using this object you can either wait for the job completion or poll it periodically to check its status. It also provides access to the “task completion events” via its Job.getTaskCompletionEvents(int) method. This is what you need to get the information on each piece of work your map/reduce job is doing. What you get back is a bunch of org.apache.hadoop.mapred.TaskCompletionEvent objects.
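
Just to illustrate the two options - waiting or polling - here is a rough sketch (the class name and the five-second interval are my own choices, and the actual job setup is omitted):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class JobMonitor {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "my-job");
        // ... mapper, reducer, input and output settings go here ...

        // Option 1: block until the job finishes, printing progress along the way.
        // boolean succeeded = job.waitForCompletion(true);

        // Option 2: submit the job and poll its status periodically.
        job.submit();
        while (!job.isComplete()) {
          System.out.printf("map %.0f%%, reduce %.0f%%%n",
              job.mapProgress() * 100, job.reduceProgress() * 100);
          Thread.sleep(5000);
        }
        System.out.println(job.isSuccessful() ? "job succeeded" : "job failed");
      }
    }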

Important! The Hadoop documentation does not explain the meaning of the integer parameter that you pass to the Job.getTaskCompletionEvents() method. In fact, even if you are analyzing a completed job you will never get all of your task completion events back from one call to that method. These events are downloaded from the job tracker, and they are not downloaded all at once - they come in chunks, depending on their availability. So, in order to download them all for a completed job you need code like this:

    // needs java.util.Arrays, java.util.LinkedList, java.util.List,
    // java.io.IOException and org.apache.hadoop.mapred.TaskCompletionEvent
    List<TaskCompletionEvent> completionEvents = new LinkedList<TaskCompletionEvent>();
    ...
    while (true) {
      try {
        // ask for the next chunk, starting right after the events we already have
        TaskCompletionEvent[] bunchOfEvents;
        bunchOfEvents = completedJob.getTaskCompletionEvents(completionEvents.size());
        if (bunchOfEvents == null || bunchOfEvents.length == 0) {
          break;   // nothing left - we have the full history of the completed job
        }
        completionEvents.addAll(Arrays.asList(bunchOfEvents));
      } catch (IOException e) {
    ...
        break;
      }
    }

Note that this code works only for completed jobs - because it fetches more and more events until no more events are available. For a job that is still running you will need different logic: loop while the job is running and sleep() a bit between attempts to fetch more events, as in the sketch below. Hadoop also offers a way for you to be notified about the job completion.
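
A rough sketch of that polling loop, assuming job is a running org.apache.hadoop.mapreduce.Job (the class name, method name and the five-second sleep are mine):

    import java.util.Arrays;
    import java.util.LinkedList;
    import java.util.List;

    import org.apache.hadoop.mapred.TaskCompletionEvent;
    import org.apache.hadoop.mapreduce.Job;

    public class RunningJobEvents {

      // Keeps fetching task completion events from a still-running job until it completes.
      public static List<TaskCompletionEvent> collect(Job job) throws Exception {
        List<TaskCompletionEvent> events = new LinkedList<TaskCompletionEvent>();
        while (true) {
          TaskCompletionEvent[] chunk = job.getTaskCompletionEvents(events.size());
          if (chunk != null && chunk.length > 0) {
            events.addAll(Arrays.asList(chunk));
          } else if (job.isComplete()) {
            break;                // nothing new and the job is done - we have everything
          } else {
            Thread.sleep(5000);   // nothing new yet - wait a bit and try again
          }
        }
        return events;
      }
    }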

Each task completion event needs to be analyzed. For example, you may not want to deal with any events except the ones from succeeded task attempts (i.e. TaskCompletionEvent.getTaskStatus() returns Status.SUCCEEDED). Although I can imagine the complete opposite - collecting information about the failures only is also a valid use case.
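
For instance, assuming completionEvents is the list collected above, a filter for successful attempts could look like this (flip the check if you are after the failures):

    for (TaskCompletionEvent event : completionEvents) {
      if (event.getTaskStatus() != TaskCompletionEvent.Status.SUCCEEDED) {
        continue;   // skip FAILED, KILLED, OBSOLETE and TIPFAILED attempts
      }
      // build the log URL and download the logs for this attempt (see below)
    }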

Then you need to construct the log download URL as follows:

    TaskCompletionEvent taskCompletionEvent = ...
    ...
    StringBuilder logURL = new StringBuilder(taskCompletionEvent.getTaskTrackerHttp());
    logURL.append("/tasklog?attemptid=");
    logURL.append(taskCompletionEvent.getTaskAttemptId().toString());
    logURL.append("&plaintext=true");
    logURL.append("&filter=" + TaskLog.LogName.STDOUT);

You can use a different filter to get a different type of log. What you get back after executing an HTTP GET on this URL is the plain-text content of the appropriate log file.
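
A minimal sketch of fetching one such log - plain java.net.URL is enough (the class and method names are mine):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URL;

    public class TaskLogDownloader {

      // Performs an HTTP GET on the tasklog URL built above and returns the plain-text log.
      public static String fetch(String logUrl) throws IOException {
        StringBuilder content = new StringBuilder();
        BufferedReader reader = new BufferedReader(
            new InputStreamReader(new URL(logUrl).openStream(), "UTF-8"));
        try {
          String line;
          while ((line = reader.readLine()) != null) {
            content.append(line).append('\n');
          }
        } finally {
          reader.close();
        }
        return content.toString();
      }
    }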

You may want to download these logs in parallel, but remember that depending on the size of your cluster and the size and type of your job you may easily end up with hundreds or even thousands of task attempts.
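
If you do go parallel, a small fixed-size thread pool keeps things under control. Here is a sketch assuming the TaskLogDownloader.fetch() helper from the previous example and a pool size I picked arbitrarily:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ParallelLogFetcher {

      // Downloads the task logs with a small fixed-size thread pool so that a job
      // with thousands of attempts does not flood the task trackers with requests.
      public static List<String> fetchAll(List<String> logUrls) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        try {
          List<Future<String>> futures = new ArrayList<Future<String>>();
          for (final String url : logUrls) {
            futures.add(pool.submit(new Callable<String>() {
              public String call() throws Exception {
                return TaskLogDownloader.fetch(url);   // helper from the previous sketch
              }
            }));
          }
          List<String> logs = new ArrayList<String>();
          for (Future<String> future : futures) {
            logs.add(future.get());
          }
          return logs;
        } finally {
          pool.shutdown();
        }
      }
    }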

I also suggest you refer to the article Hadoop Log Location and Retention - it explains the directory structure very well. It also explains how long the logs are kept on the task trackers and which configuration parameters affect this retention time.

For those who are more curious I would suggest looking at the TaskLogServlet class in the Hadoop source code. All this log access over HTTP is handled there.

Happy map/reducing :)



