Home | | Web Programming | Parallel Programming via the Fork/Join Framework

Chapter: Java The Complete Reference : The Java Library : The Concurrency Utilities

Parallel Programming via the Fork/Join Framework

In recent years, an important new trend has emerged in software development: parallel programming. Parallel programming is the name commonly given to the techniques that take advantage of computers that contain two or more processors (multicore).

Parallel Programming via the Fork/Join Framework

In recent years, an important new trend has emerged in software development: parallel programming. Parallel programming is the name commonly given to the techniques that take advantage of computers that contain two or more processors (multicore). As most readers will know, multicore computers are becoming commonplace. The advantage that multi-processor environments offer is the ability to significantly increase program performance. As a result, there has been a growing need for a mechanism that gives Java programmers a simple, yet effective way to make use of multiple processors in a clean, scalable manner. To answer this need, JDK 7 added several new classes and interfaces that support parallel programming. They are commonly referred to as the Fork/Join Framework. It is one of the more important additions that has recently been made to the Java class library. The Fork/Join Framework is defined in the java.util.concurrent package.

The Fork/Join Framework enhances multithreaded programming in two important ways. First, it simplifies the creation and use of multiple threads. Second, it automatically makes use of multiple processors. In other words, by using the Fork/Join Framework you enable your applications to automatically scale to make use of the number of available processors. These two features make the Fork/Join Framework the recommended approach to multithreading when parallel processing is desired.

Before continuing, it is important to point out the distinction between traditional multithreading and parallel programming. In the past, most computers had a single CPU and multithreading was primarily used to take advantage of idle time, such as when a program is waiting for user input. Using this approach, one thread can execute while another is waiting. In other words, on a single-CPU system, multithreading is used to allow two or more tasks to share the CPU. This type of multithreading is typically supported by an object of type Thread (as described in Chapter 11). Although this type of multithreading will always remain quite useful, it was not optimized for situations in which two or more CPUs are available (multicore computers).

 

When multiple CPUs are present, a second type of multithreading capability that supports true parallel execution is required. With two or more CPUs, it is possible to execute portions of a program simultaneously, with each part executing on its own CPU. This can be used to significantly speed up the execution of some types of operations, such as sorting, transforming, or searching a large array. In many cases, these types of operations can be broken down into smaller pieces (each acting on a portion of the array), and each piece can be run on its own CPU. As you can imagine, the gain in efficiency can be enormous. Simply put: Parallel programming will be part of nearly every programmer’s future because it offers a way to dramatically improve program performance.

 

The Main Fork/Join Classes

 

The Fork/Join Framework is packaged in java.util.concurrent. At the core of the Fork/Join Framework are the following four classes:

ForkJoinTask<V> : An abstract class that defines a task

ForkJoinPool : Manages the execution of ForkJoinTasks

RecursiveAction : A subclass of ForkJoinTask<V> for tasks that do not return values

RecursiveTask<V> : A subclass of ForkJoinTask<V> for tasks that return values

Here is how they relate. A ForkJoinPool manages the execution of ForkJoinTasks. ForkJoinTask is an abstract class that is extended by the abstract classes RecursiveAction and RecursiveTask. Typically, your code will extend these classes to create a task. Before looking at the process in detail, an overview of the key aspects of each class will be helpful.

ForkJoinTask<V>

 

ForkJoinTask<V> is an abstract class that defines a task that can be managed by a ForkJoinPool. The type parameter V specifies the result type of the task. ForkJoinTask differs from Thread in that ForkJoinTask represents lightweight abstraction of a task, rather than a thread of execution. ForkJoinTasks are executed by threads managed by a thread pool of type ForkJoinPool. This mechanism allows a large number of tasks to be managed by a small number of actual threads. Thus, ForkJoinTasks are very efficient when compared to threads.

ForkJoinTask defines many methods. At the core are fork( ) and join( ), shown here: final ForkJoinTask<V> fork( )

final V join( )

 

The fork( ) method submits the invoking task for asynchronous execution of the invoking task. This means that the thread that calls fork( ) continues to run. The fork( ) method returns this after the task is scheduled for execution. Prior to JDK 8, fork( ) could be executed only from within the computational portion of another ForkJoinTask, which is running within a ForkJoinPool. (You will see how to create the computational portion of a task shortly.) However, with the advent of JDK 8, if fork( ) is not called while executing within a ForkJoinPool, then a common pool is automatically used. The join( ) method waits until the task on which it is called terminates. The result of the task is returned. Thus, through the use of fork( ) and join( ), you can start one or more new tasks and then wait for them to finish.

Another important ForkJoinTask method is invoke( ). It combines the fork and join operations into a single call because it begins a task and then waits for it to end. It is shown here:

 

final V invoke( )

 

The result of the invoking task is returned.

 

You can invoke more than one task at a time by using invokeAll( ). Two of its forms are shown here:

 

static void invokeAll(ForkJoinTask<?> taskA, ForkJoinTask<?> taskB) static void invokeAll(ForkJoinTask<?> ... taskList)

 

In the first case, taskA and taskB are executed. In the second case, all specified tasks are executed. In both cases, the calling thread waits until all of the specified tasks have terminated. Prior to JDK 8, the invokeAll( ) method could be executed only from within the computational portion of another ForkJoinTask, which is running within a ForkJoinPool. JDK 8’s inclusion of the common pool relaxed this requirement.

 

RecursiveAction

 

A subclass of ForkJoinTask is RecursiveAction. This class encapsulates a task that does not return a result. Typically, your code will extend RecursiveAction to create a task that has a void return type. RecursiveAction specifies four methods, but only one is usually of interest: the abstract method called compute( ). When you extend RecursiveAction to create a concrete class, you will put the code that defines the task inside compute( ). The compute( ) method represents the computational portion of the task.

 

The compute( ) method is defined by RecursiveAction like this: protected abstract void compute( )

 

Notice that compute( ) is protected and abstract. This means that it must be implemented by a subclass (unless that subclass is also abstract).

In general, RecursiveAction is used to implement a recursive, divide-and-conquer strategy for tasks that don’t return results. (See “The Divide-and-Conquer Strategy” later in this chapter.)

 

RecursiveTask<V>

 

Another subclass of ForkJoinTask is RecursiveTask<V>. This class encapsulates a task that returns a result. The result type is specified by V. Typically, your code will extend

RecursiveTask<V> to create a task that returns a value. Like RecursiveAction, it too specifies four methods, but often only the abstract compute( ) method is used, which represents the computational portion of the task. When you extend RecursiveTask<V> to create a concrete class, put the code that represents the task inside compute( ). This code must also return the result of the task.

 

The compute( ) method is defined by RecursiveTask<V> like this: protected abstract V compute( )

 

Notice that compute( ) is protected and abstract. This means that it must be implemented by a subclass. When implemented, it must return the result of the task.

In general, RecursiveTask is used to implement a recursive, divide-and-conquer strategy for tasks that return results. (See “The Divide-and-Conquer Strategy” later in this chapter.)

 

ForkJoinPool

 

The execution of ForkJoinTasks takes place within a ForkJoinPool, which also manages the execution of the tasks. Therefore, in order to execute a ForkJoinTask, you must first have a ForkJoinPool. Beginning with JDK 8, there are two ways to acquire a ForkJoinPool. First, you can explicitly create one by using a ForkJoinPool constructor. Second, you can use what is referred to as the common pool. The common pool (which was added by JDK 8) is a static ForkJoinPool that is automatically available for your use. Each method is introduced here, beginning with manually constructing a pool.

ForkJoinPool defines several constructors. Here are two commonly used ones: ForkJoinPool( )

ForkJoinPool(int pLevel)

 

The first creates a default pool that supports a level of parallelism equal to the number of processors available in the system. The second lets you specify the level of parallelism. Its value must be greater than zero and not more than the limits of the implementation. The level of parallelism determines the number of threads that can execute concurrently. As a result, the level of parallelism effectively determines the number of tasks that can be executed simultaneously. (Of course, the number of tasks that can execute simultaneously cannot exceed the number of processors.) It is important to understand that the level of parallelism does not, however, limit the number of tasks that can be managed by the pool. A ForkJoinPool can manage many more tasks than its level of parallelism. Also, the level of parallelism is only a target. It is not a guarantee.

 

After you have created an instance of ForkJoinPool, you can start a task in a number of different ways. The first task started is often thought of as the main task. Frequently, the main task begins subtasks that are also managed by the pool. One common way to begin a main task is to call invoke( ) on the ForkJoinPool. It is shown here:

 

<T> T invoke(ForkJoinTask<T> task)

 

This method begins the task specified by task, and it returns the result of the task. This means that the calling code waits until invoke( ) returns.

 

To start a task without waiting for its completion, you can use execute( ). Here is one of its forms:

 

void execute(ForkJoinTask<?> task)

 

In this case, task is started, but the calling code does not wait for its completion. Rather, the calling code continues execution asynchronously.

 

Beginning with JDK 8, it is not necessary to explicitly construct a ForkJoinPool because a common pool is available for your use. In general, if you are not using a pool that you explicitly created, then the common pool will automatically be used. Although it won’t always be necessary, you can obtain a reference to the common pool by calling commonPool( ), which is defined by ForkJoinPool. It is shown here:

 

static ForkJoinPool commonPool( )

 

A reference to the common pool is returned. The common pool provides a default level of parallelism. It can be set by use of a system property. (See the API documentation for details.) Typically, the default common pool is a good choice for many applications. Of course, you can always construct your own pool.

There are two basic ways to start a task using the common pool. First, you can obtain a reference to the pool by calling commonPool( ) and then use that reference to call invoke( ) or execute( ), as just described. Second, you can call ForkJoinTask methods such as fork( ) or invoke( ) on the task from outside its computational portion. In this case, the common pool will automatically be used. In other words, fork( ) and invoke( ) will start a task using the common pool if the task is not already running within a ForkJoinPool.

 

ForkJoinPool manages the execution of its threads using an approach called work-stealing. Each worker thread maintains a queue of tasks. If one worker thread’s queue is empty, it will take a task from another worker thread. This adds to overall efficiency and helps maintain a balanced load. (Because of demands on CPU time by other processes in the system, even two worker threads with identical tasks in their respective queues may not complete at the same time.)

 

One other point: ForkJoinPool uses daemon threads. A daemon thread is automatically terminated when all user threads have terminated. Thus, there is no need to explicitly shut down a ForkJoinPool. However, with the exception of the common pool, it is possible to do so by calling shutdown( ). The shutdown( ) method has no effect on the common pool.

 

The Divide-and-Conquer Strategy

 

As a general rule, users of the Fork/Join Framework will employ a divide-and-conquer strategy that is based on recursion. This is why the two subclasses of ForkJoinTask are called RecursiveAction and RecursiveTask. It is anticipated that you will extend one of these classes when creating your own fork/join task.

 

The divide-and-conquer strategy is based on recursively dividing a task into smaller subtasks until the size of a subtask is small enough to be handled sequentially. For example, a task that applies a transform to each element in an array of N integers can be broken down into two subtasks in which each transforms half the elements in the array. That is, one subtask transforms the elements 0 to N/2, and the other transforms the elements N/2 to N. In turn, each subtask can be reduced to another set of subtasks, each transforming half of the remaining elements. This process of dividing the array will continue until a threshold is reached in which a sequential solution is faster than creating another division.

The advantage of the divide-and-conquer strategy is that the processing can occur in parallel. Therefore, instead of cycling through an entire array using a single thread, pieces of the array can be processed simultaneously. Of course, the divide-and-conquer approach works in many cases in which an array (or collection) is not present, but the most common uses involve some type of array, collection, or grouping of data.

One of the keys to best employing the divide-and-conquer strategy is correctly selecting the threshold at which sequential processing (rather than further division) is used. Typically, an optimal threshold is obtained through profiling the execution characteristics. However, very significant speed-ups will still occur even when a less-than-optimal threshold is used. It is, however, best to avoid overly large or overly small thresholds. At the time of this writing, the Java API documentation for ForkJoinTask<T> states that, as a rule-of-thumb, a task should perform somewhere between 100 and 10,000 computational steps.

It is also important to understand that the optimal threshold value is also affected by how much time the computation takes. If each computational step is fairly long, then smaller thresholds might be better. Conversely, if each computational step is quite short, then larger thresholds could yield better results. For applications that are to be run on a known system, with a known number of processors, you can use the number of processors to make informed decisions about the threshold value. However, for applications that will be running on a variety of systems, the capabilities of which are not known in advance, you can make no assumptions about the execution environment.

 

One other point: Although multiple processors may be available on a system, other tasks (and the operating system, itself) will be competing with your application for CPU time. Thus, it is important not to assume that your program will have unrestricted access to all CPUs. Furthermore, different runs of the same program may display different run time characteristics because of varying task loads.

 

A Simple First Fork/Join Example

 

At this point, a simple example that demonstrates the Fork/Join Framework and the divide-and-conquer strategy will be helpful. Following is a program that transforms the elements in an array of double into their square roots. It does so via a subclass of RecursiveAction. Notice that it creates its own ForkJoinPool.

 

// A simple example of the basic divide-and-conquer strategy.

 

     //In this case, RecursiveAction is used.

     import java.util.concurrent.*;

 

import java.util.*;

 

     //A ForkJoinTask (via RecursiveAction) that transforms

 

//the elements in an array of doubles into their square roots.

 

class SqrtTransform extends RecursiveAction {

 

     //The threshold value is arbitrarily set at 1,000 in this example.

 

     //In real-world code, its optimal value can be determined by

 

     //profiling and experimentation.

 

final int seqThreshold = 1000;

 

     //Array to be accessed.

     double[] data;

 

     //Determines what part of data to process.

 

     int start, end;

 

SqrtTransform(double[] vals, int s, int e ) { data = vals;

 

start = s; end = e;

}

 

// This is the method in which parallel computation will occur.

protected void compute() {

 

     //If number of elements is below the sequential threshold,

 

     //then process sequentially.

 

if((end - start) < seqThreshold) {

 

// Transform each element into its square root.

for(int i = start; i < end; i++) {

 

data[i] = Math.sqrt(data[i]);

 

}

 

}

 

else {

 

     //Otherwise, continue to break the data into smaller pieces.

 

     //Find the midpoint.

 

int middle = (start + end) / 2;

 

// Invoke new tasks, using the subdivided data.

invokeAll(new SqrtTransform(data, start, middle),

 

new SqrtTransform(data, middle, end));

 

}

 

}

// Demonstrate parallel execution.

class ForkJoinDemo {

 

public static void main(String args[]) { // Create a task pool.

 

ForkJoinPool fjp = new ForkJoinPool();

 

double[] nums = new double[100000];

 

// Give nums some values.

 

for(int i = 0; i < nums.length; i++) nums[i] = (double) i;

System.out.println("A portion of the original sequence:");

 

for(int i=0; i < 10; i++) System.out.print(nums[i] + " ");

System.out.println("\n");

 

SqrtTransform task = new SqrtTransform(nums, 0, nums.length);

 

// Start the main ForkJoinTask.

fjp.invoke(task);

 

System.out.println("A portion of the transformed sequence" + " (to four decimal places):");

 

for(int i=0; i < 10; i++) System.out.format("%.4f ", nums[i]);

System.out.println();

 

}

 

}

 

The output from the program is shown here:

 

A portion of the original sequence: 0.0 1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0

 

A portion of the transformed sequence (to four decimal places): 0.0000 1.0000 1.4142 1.7321 2.0000 2.2361 2.4495 2.6458 2.8284 3.0000

 

As you can see, the values of the array elements have been transformed into their square roots. Let’s look closely at how this program works. First, notice that SqrtTransform is a class

 

that extends RecursiveAction. As explained, RecursiveAction extends ForkJoinTask for tasks that do not return results. Next, notice the final variable seqThreshold. This is the value that determines when sequential processing will take place. This value is set (somewhat arbitrarily) to 1,000. Next, notice that a reference to the array to be processed is stored in data and that the fields start and end are used to indicate the boundaries of the elements to be accessed.

 

The main action of the program takes place in compute( ). It begins by checking if the number of elements to be processed is below the sequential processing threshold. If it is, then those elements are processed (by computing their square root in this example). If the sequential processing threshold has not been reached, then two new tasks are started by calling invokeAll( ). In this case, each subtask processes half the elements. As explained earlier, invokeAll( ) waits until both tasks return. After all of the recursive calls unwind, each element in the array will have been modified, with much of the action taking place in parallel (if multiple processors are available).

 

As mentioned, beginning with JDK 8, it is not necessary to explicitly construct a ForkJoinPool because a common pool is available for your use. Furthermore, using the common pool is a simple matter. For example, you can obtain a reference to the common pool by calling the static commonPool( ) method defined by ForkJoinPool. Therefore, the preceding program could be rewritten to use the common pool by replacing the call to the ForkJoinPool constructor with a call to commonPool( ), as shown here:

ForkJoinPool fjp = ForkJoinPool.commonPool();

Alternatively, there is no need to explicitly obtain a reference to the common pool because calling the ForkJoinTask methods invoke( ) or fork( ) on a task that is not already part of a pool will cause it to execute within the common pool automatically. For example, in the preceding program, you can eliminate the fjp variable entirely and start the task using this line:

 

 

task.invoke();

 

As this discussion shows, the common pool is one of the enhancements JDK 8 made to the Fork/Join Framework that improves its ease-of-use. Furthermore, in many cases, the common pool is the preferable approach, assuming that JDK 7 compatibility is not required.

 

Understanding the Impact of the Level of Parallelism

 

Before moving on, it is important to understand the impact that the level of parallelism has on the performance of a fork/join task and how the parallelism and the threshold interact. The program shown in this section lets you experiment with different degrees of parallelism and threshold values. Assuming that you are using a multicore computer, you can interactively observe the effect of these values.

In the preceding example, the default level of parallelism was used. However, you can specify the level of parallelism that you want. One way is to specify it when you create a ForkJoinPool using this constructor:

 

ForkJoinPool(int pLevel)

 

Here, pLevel specifies the level of parallelism, which must be greater than zero and less than the implementation defined limit.

 

The following program creates a fork/join task that transforms an array of doubles. The transformation is arbitrary, but it is designed to consume several CPU cycles. This was done to ensure that the effects of changing the threshold or the level of parallelism would be more clearly displayed. To use the program, specify the threshold value and the level of parallelism on the command line. The program then runs the tasks. It also displays the amount of time it takes the tasks to run. To do this, it uses System.nanoTime( ), which returns the value of the JVM’s high-resolution timer.

 

    //A simple program that lets you experiment with the effects of

 

    //changing the threshold and parallelism of a ForkJoinTask.

 

    import java.util.concurrent.*;

 

    //A ForkJoinTask (via RecursiveAction) that performs a

 

    //a transform on the elements of an array of doubles.

 

class Transform extends RecursiveAction {

 

     //Sequential threshold, which is set by the constructor.

     int seqThreshold;

 

     //Array to be accessed.

 

double[] data;

 

// Determines what part of data to process.

int start, end;

Transform(double[] vals, int s, int e, int t ) { data = vals;

 

start = s; end = e;

seqThreshold = t;

 

}

 

// This is the method in which parallel computation will occur.

protected void compute() {

 

     //If number of elements is below the sequential threshold,

 

     //then process sequentially.

 

if((end - start) < seqThreshold) {

 

     //The following code assigns an element at an even index the

 

     //square root of its original value. An element at an odd

 

     //index is assigned its cube root. This code is designed

 

     //to simply consume CPU time so that the effects of concurrent

 

     //execution are more readily observable.

 

for(int i = start; i < end; i++) { if((data[i] % 2) == 0)

 

data[i] = Math.sqrt(data[i]); else

 

data[i] = Math.cbrt(data[i]);

 

}

 

}

 

else {

 

     //Otherwise, continue to break the data into smaller pieces.

 

     //Find the midpoint.

 

int middle = (start + end) / 2;

 

// Invoke new tasks, using the subdivided data.

invokeAll(new Transform(data, start, middle, seqThreshold),

 

new Transform(data, middle, end, seqThreshold));

 

}

 

}

 

}

 

// Demonstrate parallel execution.

class FJExperiment {

 

public static void main(String args[]) { int pLevel;

 

int threshold;

 

if(args.length !=      2) {

 

System.out.println("Usage: FJExperiment parallelism threshold "); return;

 

}

 

pLevel = Integer.parseInt(args[0]); threshold = Integer.parseInt(args[1]);

 

// These variables are used to time the task.

long beginT, endT;

 

// Create a task pool. Notice that the parallelism level is set.

 ForkJoinPool fjp = new ForkJoinPool(pLevel);

 

double[] nums = new double[1000000];

 

for(int i = 0; i < nums.length; i++) nums[i] = (double) i;

 

Transform task = new Transform(nums, 0, nums.length, threshold);

 

// Starting timing.

 

beginT = System.nanoTime();

 

     //Start the main ForkJoinTask.

     fjp.invoke(task);

 

     End timing.

 

endT = System.nanoTime();

 

System.out.println("Level of parallelism: " + pLevel); System.out.println("Sequential threshold: " + threshold); System.out.println("Elapsed time: " + (endT - beginT) + " ns"); System.out.println();

 

}

 

}

 

To use the program, specify the level of parallelism followed by the threshold limit. You should try experimenting with different values for each, observing the results. Remember, to be effective, you must run the code on a computer with at least two processors. Also, understand that two different runs may (almost certainly will) produce different results because of the effect of other processes in the system consuming CPU time.

To give you an idea of the difference that parallelism makes, try this experiment. First, execute the program like this:

 

java FJExperiment 1 1000

 

This requests 1 level of parallelism (essentially sequential execution) with a threshold of 1,000. Here is a sample run produced on a dual-core computer:

 

Level of parallelism: 1

 

Sequential threshold: 1000

 

Elapsed time: 259677487 ns

 

Now, specify 2 levels of parallelism like this:

 

java FJExperiment 2 1000

 

Here is sample output from this run produced by the same dual-core computer:

 

Level of parallelism: 2 Sequential threshold: 1000 Elapsed time: 169254472 ns

As is evident, adding parallelism substantially decreases execution time, thus increasing the speed of the program. You should experiment with varying the threshold and parallelism on your own computer. The results may surprise you.

Here are two other methods that you might find useful when experimenting with the execution characteristics of a fork/join program. First, you can obtain the level of parallelism by calling getParallelism( ), which is defined by ForkJoinPool. It is shown here:

 

int getParallelism( )

 

It returns the parallelism level currently in effect. Recall that for pools that you create, by default, this value will equal the number of available processors. (To obtain the parallelism level for the common pool, you can also use getCommonPoolParallelism( ), which was added by JDK 8.) Second, you can obtain the number of processors available in the system by calling availableProcessors( ), which is defined by the Runtime class. It is shown here:

 

int availableProcessors( )

 

The value returned may change from one call to the next because of other system demands.

 

An Example that Uses RecursiveTask<V>

 

The two preceding examples are based on RecursiveAction, which means that they concurrently execute tasks that do not return results. To create a task that returns a result, use RecursiveTask. In general, solutions are designed in the same manner as just shown. The key difference is that the compute( ) method returns a result. Thus, you must aggregate the results, so that when the first invocation finishes, it returns the overall result. Another difference is that you will typically start a subtask by calling fork( ) and join( ) explicitly (rather than implicitly by calling invokeAll( ), for example).

The following program demonstrates RecursiveTask. It creates a task called Sum that returns the summation of the values in an array of double. In this example, the array consists of 5,000 elements. However, every other value is negative. Thus, the first values in the array are 0, –1, 2, –3, 4, and so on. (So that this example will work with both JDK 7 and JDK 8, it creates its own pool. You might try changing it to use the common pool as an exercise.)

 

     //A simple example that uses RecursiveTask<V>.

     import java.util.concurrent.*;

 

     //A RecursiveTask that computes the summation of an array of doubles.

 

     class Sum extends RecursiveTask<Double> {

 

     //The sequential threshold value.

 

     final int seqThresHold = 500;

 

     //Array to be accessed.

 

double[] data;

 

// Determines what part of data to process.

int start, end;

 

Sum(double[] vals, int s, int e ) { data = vals;

start = s; end = e;

}

 

// Find the summation of an array of doubles.

protected Double compute() {

 

double sum = 0;

 

     //If number of elements is below the sequential threshold,

 

     //then process sequentially.

 

if((end - start) < seqThresHold) { // Sum the elements.

 

for(int i = start; i < end; i++) sum += data[i];

 

}

 

else {

 

     //Otherwise, continue to break the data into smaller pieces.

 

     //Find the midpoint.

 

int middle = (start + end) / 2;

 

     //Invoke new tasks, using the subdivided data.

     Sum subTaskA = new Sum(data, start, middle); Sum subTaskB = new Sum(data, middle, end);

 

     //Start each subtask by forking.

 

     subTaskA.fork();

 

subTaskB.fork();

 

     //Wait for the subtasks to return, and aggregate the results.

     sum = subTaskA.join() + subTaskB.join();

 

}

 

// Return the final sum.

return sum;

 

}

 

}

 

// Demonstrate parallel execution.

class RecurTaskDemo {

 

public static void main(String args[]) { // Create a task pool.

 

ForkJoinPool fjp = new ForkJoinPool();

 

double[] nums = new double[5000];

 

     Initialize nums with values that alternate between

 

     positive and negative.

 

for(int i=0; i < nums.length; i++)

 

nums[i] = (double) (((i%2) == 0) ? i : -i) ;

 

Sum task = new Sum(nums, 0, nums.length);

 

     //Start the ForkJoinTasks.  Notice that, in this case,

 

     invoke() returns a result.

 

double summation = fjp.invoke(task);

 

System.out.println("Summation " + summation);

 

}

 

}

 

Here’s the output from the program:

 

Summation -2500.0

 

There are a couple of interesting items in this program. First, notice that the two subtasks are executed by calling fork( ), as shown here:

 

subTaskA.fork();

 

subTaskB.fork();

 

In this case, fork( ) is used because it starts a task but does not wait for it to finish. (Thus, it asynchronously runs the task.) The result of each task is obtained by calling join( ), as shown here:

 

sum = subTaskA.join() + subTaskB.join();

 

This statement waits until each task ends. It then adds the results of each and assigns the total to sum. Thus, the summation of each subtask is added to the running total. Finally, compute( ) ends by returning sum, which will be the final total when the first invocation returns.

 

There are other ways to approach the handling of the asynchronous execution of the subtasks. For example, the following sequence uses fork( ) to start subTaskA and uses invoke( ) to start and wait for subTaskB:

 

subTaskA.fork();

 

sum = subTaskB.invoke() + subTaskA.join();

 

Another alternative is to have subTaskB call compute( ) directly, as shown here:

 

subTaskA.fork();

 

sum = subTaskB.compute() + subTaskA.join();

 

Executing a Task Asynchronously

The preceding programs have called invoke( ) on a ForkJoinPool to initiate a task. This approach is commonly used when the calling thread must wait until the task has completed (which is often the case) because invoke( ) does not return until the task has terminated. However, you can start a task asynchronously. In this approach, the calling thread continues to execute. Thus, both the calling thread and the task execute simultaneously. To start a task asynchronously, use execute( ), which is also defined by ForkJoinPool. It has the two forms shown here:

 

void execute(ForkJoinTask<?> task) void execute(Runnable task)

In both forms, task specifies the task to run. Notice that the second form lets you specify a Runnable rather than a ForkJoinTask task. Thus, it forms a bridge between Java’s traditional approach to multithreading and the new Fork/Join Framework. It is important to remember that the threads used by a ForkJoinPool are daemon. Thus, they will end when the main thread ends. As a result, you may need to keep the main thread alive until the tasks have finished.

 

Cancelling a Task

 

A task can be cancelled by calling cancel( ), which is defined by ForkJoinTask. It has this general form:

 

boolean cancel(boolean interuptOK)

 

It returns true if the task on which it was called is cancelled. It returns false if the task has ended or can’t be cancelled. At this time, the interruptOK parameter is not used by the default implementation. In general, cancel( ) is intended to be called from code outside the task because a task can easily cancel itself by returning.

You can determine if a task has been cancelled by calling isCancelled( ), as shown here: final boolean isCancelled( )

 

It returns true if the invoking task has been cancelled prior to completion and false otherwise.

 

Determining a Task’s Completion Status

 

In addition to isCancelled( ), which was just described, ForkJoinTask includes two other methods that you can use to determine a task’s completion status. The first is isCompletedNormally( ), which is shown here:

 

final boolean isCompletedNormally( )

 

It returns true if the invoking task completed normally, that is, if it did not throw an exception and it was not cancelled via a call to cancel( ). It returns false otherwise.

The second is isCompletedAbnormally( ), which is shown here: final boolean isCompletedAbnormally( )

 

It returns true if the invoking task completed because it was cancelled or because it threw an exception. It returns false otherwise.

 

Restarting a Task

 

Normally, you cannot rerun a task. In other words, once a task completes, it cannot be restarted. However, you can reinitialize the state of the task (after it has completed) so it can be run again. This is done by calling reinitialize( ), as shown here:

 

void reinitialize( )

 

This method resets the state of the invoking task. However, any modification made to any persistent data that is operated upon by the task will not be undone. For example, if the task modifies an array, then those modifications are not undone by calling reinitialize( ).

Things to Explore

 

The preceding discussion presented the fundamentals of the Fork/Join Framework and described several commonly used methods. However, Fork/Join is a rich framework that includes additional capabilities that give you extended control over concurrency. Although it is far beyond the scope of this book to examine all of the issues and nuances surrounding parallel programming and the Fork/Join Framework, a sampling of the other features are mentioned here.

 

A Sampling of Other ForkJoinTask Features

 

In some cases, you will want to ensure that methods such as invokeAll( ) and fork( ) are called only from within a ForkJoinTask. (This may be especially important when using JDK 7, which does not support the common pool.) This is usually a simple matter, but occasionally, you may have code that can be executed from either inside or outside a task. You can determine if your code is executing inside a task by calling inForkJoinPool( ).

You can convert a Runnable or Callable object into a ForkJoinTask by using the adapt( ) method defined by ForkJoinTask. It has three forms, one for converting a Callable, one for a Runnable that does not return a result, and one for a Runnable that does return a result. In the case of a Callable, the call( ) method is run. In the case of Runnable, the run( ) method is run.

 

You can obtain an approximate count of the number of tasks that are in the queue of the invoking thread by calling getQueuedTaskCount( ). You can obtain an approximate count of how many tasks the invoking thread has in its queue that are in excess of the number of other threads in the pool that might “steal” them, by calling getSurplusQueuedTaskCount( ). Remember, in the Fork/Join Framework, work-stealing is one way in which a high level of efficiency is obtained. Although this process is automatic, in some cases, the information may prove helpful in optimizing through-put.

 

ForkJoinTask defines the following variants of join( ) and invoke( ) that begin with the prefix quietly. They are shown here:


In essence, these methods are similar to their non-quiet counterparts except they don’t return values or throw exceptions.

 

You can attempt to “un-invoke” (in other words, unschedule) a task by calling tryUnfork( ). JDK 8 adds several methods, such as getForkJoinTaskTag( ) and setForkJoinTaskTag( ),

that support tags. Tags are short integer values that are linked with a task. They may be useful in specialized applications.

 

ForkJoinTask implements Serializable. Thus, it can be serialized. However, serialization is not used during execution.

A Sampling of Other ForkJoinPool Features

 

One method that is quite useful when tuning fork/join applications is ForkJoinPool’s override of toString( ). It displays a “user-friendly” synopsis of the state of the pool. To see it in action, use this sequence to start and then wait for the task in the FJExperiment class of the task experimenter program shown earlier:

 

    //Asynchronously start the main ForkJoinTask.

    fjp.execute(task);

 

    //Display the state of the pool while waiting.

 

    while(!task.isDone()) {

 

System.out.println(fjp);

 

}

 

When you run the program, you will see a series of messages on the screen that describe the state of the pool. Here is an example of one. Of course, your output may vary, based on the number of processors, threshold values, task load, and so on.

 

java.util.concurrent.ForkJoinPool@141d683[Running, parallelism = 2,

 

size = 2, active = 0, running = 2, steals = 0, tasks = 0, submissions = 1]

 

You can determine if a pool is currently idle by calling isQuiescent( ). It returns true if the pool has no active threads and false otherwise.

You can obtain the number of worker threads currently in the pool by calling getPoolSize( ). You can obtain an approximate count of the active threads in the pool by calling getActiveThreadCount( ).

 

To shut down a pool, call shutdown( ). Currently active tasks will still be executed, but no new tasks can be started. To stop a pool immediately, call shutdownNow( ). In this case, an attempt is made to cancel currently active tasks. (It is important to point out, however, that neither of these methods affects the common pool.) You can determine if a pool is shut down by calling isShutdown( ). It returns true if the pool has been shut down and false otherwise. To determine if the pool has been shut down and all tasks have been completed, call isTerminated( ).

 

Some Fork/Join Tips

 

Here are a few tips to help you avoid some of the more troublesome pitfalls associated with using the Fork/Join Framework. First, avoid using a sequential threshold that is too low. In general, erring on the high side is better than erring on the low side. If the threshold is too low, more time can be consumed generating and switching tasks than in processing the tasks. Second, usually it is best to use the default level of parallelism. If you specify a smaller number, it may significantly reduce the benefits of using the Fork/Join Framework.

In general, a ForkJoinTask should not use synchronized methods or synchronized blocks of code. Also, you will not normally want to have the compute( ) method use other types of synchronization, such as a semaphore. (The new Phaser can, however, be used when appropriate because it is compatible with the fork/join mechanism.) Remember, the main idea behind a ForkJoinTask is the divide-and-conquer strategy. Such an approach does not normally lend itself to situations in which outside synchronization is needed. Also, avoid situations in which substantial blocking will occur through I/O. Therefore, in general,

a ForkJoinTask will not perform I/O. Simply put, to best utilize the Fork/Join Framework, a task should perform a computation that can run without outside blocking or synchronization.

One last point: Except under unusual circumstances, do not make assumptions about the execution environment that your code will run in. This means you should not assume that some specific number of processors will be available, or that the execution characteristics of your program won’t be affected by other processes running at the same time.

 


Study Material, Lecturing Notes, Assignment, Reference, Wiki description explanation, brief detail
Java The Complete Reference : The Java Library : The Concurrency Utilities : Parallel Programming via the Fork/Join Framework |


Privacy Policy, Terms and Conditions, DMCA Policy and Compliant

Copyright © 2018-2023 BrainKart.com; All Rights Reserved. Developed by Therithal info, Chennai.