Tuesday 31 January 2012

Parallel Processing with ForkJoinPool

Java 7 introduced a new ExecutorService implementation, ForkJoinPool. The class uses work-stealing threads to execute tasks, including tasks that have been created by other tasks. This division of work turns a large problem into a series of smaller ones, whose results are combined into one answer when all the tasks have completed their processing.

This post demonstrates this threading feature with an interpolation example. Given two numbers and a number of steps, the code constructs new points between those numbers using linear interpolation. For simplicity, the number of steps must be a power of 2 (2, 4, 8 and so on), because each task halves its step count until it reaches 2. E.g. for the numbers 5 and 25 with a step number of 4, the generated sequence would be:

[5.0, 10.0, 15.0, 20.0, 25.0]
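
For reference, the same sequence can be produced with a plain sequential loop. This is only a minimal sketch for checking results, not the fork/join approach this post is about; the class name DirectInterpolation is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the fork/join example in this post.
class DirectInterpolation {

    // Returns steps + 1 evenly spaced points from valueOne to valueTwo inclusive.
    static List<Double> interpolate(double valueOne, double valueTwo, int steps) {
        if (steps < 1) {
            throw new IllegalArgumentException("steps must be at least 1: " + steps);
        }
        List<Double> points = new ArrayList<Double>(steps + 1);
        double stepSize = (valueTwo - valueOne) / steps;
        for (int i = 0; i <= steps; i++) {
            points.add(valueOne + i * stepSize);
        }
        return points;
    }

    public static void main(String[] args) {
        // prints [5.0, 10.0, 15.0, 20.0, 25.0]
        System.out.println(interpolate(5, 25, 4));
    }
}
```

Unlike the recursive version, this loop works for any positive number of steps, which makes it handy as a cross-check.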

The InterpolateService class below contains an instance of the ForkJoinPool class. A ForkJoinPool instance can be created with an argument representing a target parallelism. This can either be supplied or left as the default, which is the number of available processors, i.e. Runtime.getRuntime().availableProcessors().
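
As a quick illustration of the two ways to construct a pool, the sketch below prints the parallelism of each. The class name PoolParallelismDemo and the value 4 are arbitrary choices for this example.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo class; the parallelism of 4 is an arbitrary choice.
class PoolParallelismDemo {

    public static void main(String[] args) {
        // Default: parallelism taken from the number of available processors.
        ForkJoinPool defaultPool = new ForkJoinPool();
        // Explicit target parallelism.
        ForkJoinPool fixedPool = new ForkJoinPool(4);

        System.out.println("available processors: "
            + Runtime.getRuntime().availableProcessors());
        System.out.println("default parallelism:  " + defaultPool.getParallelism());
        System.out.println("explicit parallelism: " + fixedPool.getParallelism());

        defaultPool.shutdown();
        fixedPool.shutdown();
    }
}
```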

package com.city81.forkjoin;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class InterpolateService {

    private final ForkJoinPool forkJoinPool;

    public InterpolateService() {
        forkJoinPool = new ForkJoinPool();
    }

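    public List<Double> interpolate(double valueOne, double valueTwo, int steps) {

        // The task halves steps until it reaches 2, so any value other than
        // a power of two (2, 4, 8, ...) would never reach the base case.
        if (steps < 2 || Integer.bitCount(steps) != 1) {
            throw new IllegalArgumentException(
                "steps must be a power of two and at least 2: " + steps);
        }
        ForkJoinTask<List<Double>> job = forkJoinPool.submit(
            new InterpolateTask(valueOne, valueTwo, steps));
        return job.join();
    }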

    private static class InterpolateTask 
        extends RecursiveTask<List<Double>> {

        private final double valueOne;
        private final double valueTwo;
        private final int steps;
        private static final int LOWEST_STEP = 2;

        protected InterpolateTask(double valueOne, double valueTwo, int steps) {

            this.valueOne = valueOne;
            this.valueTwo = valueTwo;
            this.steps = steps;
        }

        @Override
        protected List<Double> compute() {
    
            List<Double> interpolatedArray = new ArrayList<Double>();
            double interpolatedValue = interpolate(valueOne, valueTwo);
    
            if (this.steps == LOWEST_STEP) {
                interpolatedArray.add(valueOne);
                interpolatedArray.add(interpolatedValue);
                interpolatedArray.add(valueTwo);
            } else {
                InterpolateTask interpolateTask1 = 
                    new InterpolateTask(valueOne, interpolatedValue, (steps/2));
                interpolateTask1.fork();
                InterpolateTask interpolateTask2 = 
                    new InterpolateTask(interpolatedValue, valueTwo, (steps/2));
                List<Double> interpolatedArrayTask2 = 
                    interpolateTask2.compute();
                List<Double> interpolatedArrayTask1 = 
                    interpolateTask1.join();
                interpolatedArray.addAll(
                    interpolatedArrayTask1.subList(0, interpolatedArrayTask1.size() - 1));
                interpolatedArray.addAll(interpolatedArrayTask2);
            }
            // System.out.println(interpolatedArray);
            return interpolatedArray;
        }

        private double interpolate(
            double valueOne, double valueTwo) {
   
            return ((valueTwo - valueOne) / 2.0) + valueOne;
        }

    }

    public static void main(String[] args) {
        InterpolateService interpolateService = new InterpolateService();
        List<Double> interpolatedArray = interpolateService.interpolate(5,25,4);
        System.out.println(interpolatedArray);
    }

}

InterpolateService's only public method, interpolate, takes two numbers and the number of steps. These are the parameters used to construct an instance of our private static class InterpolateTask. This class extends RecursiveTask, whose compute method returns a result. An alternative (but not suitable for this example) would be RecursiveAction, whose compute method returns nothing.
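
To show what a RecursiveAction might look like, here is a minimal sketch that doubles every element of an array in place, so there is nothing to return. The class names and the threshold of 4 are assumptions made purely for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example; names and threshold are not from the original code.
class DoubleValuesDemo {

    static class DoubleValuesAction extends RecursiveAction {

        private static final int THRESHOLD = 4; // arbitrary cut-off
        private final double[] values;
        private final int from; // inclusive
        private final int to;   // exclusive

        DoubleValuesAction(double[] values, int from, int to) {
            this.values = values;
            this.from = from;
            this.to = to;
        }

        @Override
        protected void compute() {
            if (to - from <= THRESHOLD) {
                // Small enough: modify the slice directly.
                for (int i = from; i < to; i++) {
                    values[i] *= 2;
                }
            } else {
                // Split the range in half and run both halves.
                int mid = (from + to) >>> 1;
                invokeAll(new DoubleValuesAction(values, from, mid),
                          new DoubleValuesAction(values, mid, to));
            }
        }
    }

    static void doubleAll(double[] values) {
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(new DoubleValuesAction(values, 0, values.length));
        pool.shutdown();
    }

    public static void main(String[] args) {
        double[] values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        doubleAll(values);
        System.out.println(Arrays.toString(values));
    }
}
```

Because each subtask works on its own slice of the array, no result needs to be merged, which is the kind of job RecursiveAction suits.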

The constructed task is submitted to the ForkJoinPool, and the call to join() on the returned ForkJoinTask waits for the result.
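
ForkJoinPool also offers invoke(), which runs the task and returns its result in one call. The sketch below compares the two styles using a made-up RangeSumTask (not part of the interpolation code) so that it stands alone; the threshold of 1000 is arbitrary.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Hypothetical example; RangeSumTask is only here to have something to submit.
class SubmitVersusInvokeDemo {

    // Sums the whole numbers in the range [from, to].
    static class RangeSumTask extends RecursiveTask<Long> {

        private final long from;
        private final long to;

        RangeSumTask(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from < 1000) {
                long sum = 0;
                for (long i = from; i <= to; i++) {
                    sum += i;
                }
                return sum;
            }
            long mid = from + (to - from) / 2;
            RangeSumTask left = new RangeSumTask(from, mid);
            left.fork();
            long right = new RangeSumTask(mid + 1, to).compute();
            return left.join() + right;
        }
    }

    // submit() returns the task as a ForkJoinTask; join() then waits for the result.
    static long sumWithSubmit(ForkJoinPool pool, long n) {
        ForkJoinTask<Long> job = pool.submit(new RangeSumTask(1, n));
        return job.join();
    }

    // invoke() runs the task and returns its result directly.
    static long sumWithInvoke(ForkJoinPool pool, long n) {
        return pool.invoke(new RangeSumTask(1, n));
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        System.out.println(sumWithSubmit(pool, 100000));
        System.out.println(sumWithInvoke(pool, 100000));
        pool.shutdown();
    }
}
```

For a caller that simply wants the answer, both styles behave the same; submit() is the one to reach for if the caller has other work to do before joining.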

The compute method is where the logic for subdividing the task resides. It decides whether the work it has to process is small enough to process now or whether it needs further division. In the case of our example, a task is small enough when its number of steps is 2. If not, it creates two subtasks, each covering half of the range with half the number of steps. The first subtask is forked so that it can run on another thread, the second is computed directly in the current thread, and the first is then joined.
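
To make the subdivision easier to picture, the sketch below walks the same splits sequentially and records them as indented lines instead of forking tasks. SplitTrace is a made-up helper for illustration only; it mirrors the midpoint calculation and the steps == 2 base case from the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical tracing helper; it does no parallel work.
class SplitTrace {

    static List<String> trace(double valueOne, double valueTwo, int steps) {
        // Halving only reaches the base case of 2 for powers of two.
        if (steps < 2 || Integer.bitCount(steps) != 1) {
            throw new IllegalArgumentException(
                "steps must be a power of two and at least 2: " + steps);
        }
        List<String> lines = new ArrayList<String>();
        trace(valueOne, valueTwo, steps, "", lines);
        return lines;
    }

    private static void trace(double valueOne, double valueTwo, int steps,
                              String indent, List<String> lines) {
        double mid = ((valueTwo - valueOne) / 2.0) + valueOne;
        if (steps == 2) {
            // Small enough: this is where a task would build its three points.
            lines.add(indent + "leaf  [" + valueOne + ", " + mid + ", " + valueTwo + "]");
        } else {
            // Too big: this is where a task would create two subtasks.
            lines.add(indent + "split (" + valueOne + ", " + valueTwo
                + ", steps=" + steps + ")");
            trace(valueOne, mid, steps / 2, indent + "  ", lines);
            trace(mid, valueTwo, steps / 2, indent + "  ", lines);
        }
    }

    public static void main(String[] args) {
        // prints:
        // split (5.0, 25.0, steps=4)
        //   leaf  [5.0, 10.0, 15.0]
        //   leaf  [15.0, 20.0, 25.0]
        for (String line : trace(5, 25, 4)) {
            System.out.println(line);
        }
    }
}
```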

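Eventually, all tasks will be of a small enough size to be completed. The resulting lists are then joined together as the recursive task tree is traversed back up to the original task. When two lists are joined, the last element of the first list is dropped, because it is the same midpoint that starts the second list.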
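
The joining step on its own can be sketched as below. MergeDemo is a made-up name, and the sketch assumes the first list is non-empty and that its last element equals the first element of the second list, as is the case for the two subtask results above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper isolating the merge done in compute().
class MergeDemo {

    // Assumes left is non-empty and ends with the value that right starts with.
    static List<Double> merge(List<Double> left, List<Double> right) {
        List<Double> merged = new ArrayList<Double>(left.size() + right.size() - 1);
        // Drop the shared midpoint from the end of the left list.
        merged.addAll(left.subList(0, left.size() - 1));
        merged.addAll(right);
        return merged;
    }

    public static void main(String[] args) {
        List<Double> left = Arrays.asList(5.0, 10.0, 15.0);
        List<Double> right = Arrays.asList(15.0, 20.0, 25.0);
        // prints [5.0, 10.0, 15.0, 20.0, 25.0]
        System.out.println(merge(left, right));
    }
}
```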

This is not an example of how to efficiently perform linear interpolation on a set of numbers; it is just one example of how to use the ForkJoinPool and its associated task classes. Over time, hopefully this post can be updated to make the above code more efficient, as well as gathering metrics and benchmarking against other implementations.