Parallel Programming via the Fork/Join Framework
In recent years, an important
new trend has emerged in software development: parallel programming.
Parallel programming is the name commonly given to the techniques that take advantage of computers that
contain two or more processors (multicore). As most readers will know,
multicore computers are becoming commonplace. The advantage that
multi-processor environments offer is the ability to significantly increase
program performance. As a result, there has been a growing need for a mechanism
that gives Java programmers a simple, yet effective way to make use of multiple
processors in a clean, scalable manner. To answer this need, JDK 7 added
several new classes and interfaces that support parallel programming. They are
commonly referred to as the Fork/Join
Framework. It is one of the more important additions that has recently been
made to the Java class library. The Fork/Join Framework is defined in the java.util.concurrent package.
The Fork/Join Framework
enhances multithreaded programming in two important ways. First, it simplifies
the creation and use of multiple threads. Second, it automatically makes use of
multiple processors. In other words, by using the Fork/Join Framework you
enable your applications to automatically scale to make use of the number of
available processors. These two features make the Fork/Join Framework the
recommended approach to multithreading when parallel processing is desired.
Before continuing, it is
important to point out the distinction between traditional multithreading and
parallel programming. In the past, most computers had a single CPU and
multithreading was primarily used to take advantage of idle time, such as when
a program is waiting for user input. Using this approach, one thread can
execute while another is waiting. In other words, on a single-CPU system,
multithreading is used to allow two or more tasks to share the CPU. This type
of multithreading is typically supported by an object of type Thread (as described in Chapter 11).
Although this type of multithreading will always remain quite useful, it was
not optimized for situations in which two or more CPUs are available (multicore
computers).
When multiple CPUs are
present, a second type of multithreading capability that supports true parallel
execution is required. With two or more CPUs, it is possible to execute
portions of a program simultaneously, with each part executing on its own CPU.
This can be used to significantly speed up the execution of some types of
operations, such as sorting, transforming, or searching a large array. In many
cases, these types of operations can be broken down into smaller pieces (each
acting on a portion of the array), and each piece can be run on its own CPU. As
you can imagine, the gain in efficiency can be enormous. Simply put: Parallel
programming will be part of nearly every programmer’s future because it offers
a way to dramatically improve program performance.
The Main Fork/Join Classes
The Fork/Join Framework is
packaged in java.util.concurrent. At
the core of the Fork/Join Framework are the following four classes:
ForkJoinTask<V> : An
abstract class that defines a task
ForkJoinPool : Manages the
execution of ForkJoinTasks
RecursiveAction : A subclass
of ForkJoinTask<V> for tasks that do not return values
RecursiveTask<V> : A
subclass of ForkJoinTask<V> for tasks that return values
Here is how they relate. A ForkJoinPool manages the execution of ForkJoinTasks. ForkJoinTask is an abstract class that is extended by the abstract
classes RecursiveAction and RecursiveTask. Typically, your code
will extend these classes to create a task. Before looking at the process in
detail, an overview of the key aspects of each class will be helpful.
ForkJoinTask<V>
ForkJoinTask<V> is an abstract class that defines a task that
can be managed by a ForkJoinPool. The type parameter V specifies the result type of the
task. ForkJoinTask differs from Thread in that ForkJoinTask represents a lightweight abstraction of a task, rather
than a thread of execution. ForkJoinTasks
are executed by threads managed by a thread pool of type ForkJoinPool. This mechanism allows a large number of tasks to be
managed by a small number of actual
threads. Thus, ForkJoinTasks are
very efficient when compared to threads.
ForkJoinTask defines many methods. At the core are fork( ) and join( ), shown here:

final ForkJoinTask<V> fork( )
final V join( )

The fork( ) method submits the invoking task for asynchronous execution. This means that the thread that calls fork( ) continues to run. The fork( ) method returns this after the task is scheduled for
execution. Prior to JDK 8, fork( )
could be executed only from within the computational portion of another ForkJoinTask, which is running within a
ForkJoinPool. (You will see how to
create the computational portion of a task shortly.) However, with the advent
of JDK 8, if fork( ) is not called
while executing within a ForkJoinPool,
then a common pool is automatically used. The join( ) method waits until the task on which it is called
terminates. The result of the task is returned. Thus, through the use of fork( ) and join( ), you can start one or more new tasks and then wait for them
to finish.
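For example, assuming JDK 8 or later (so that the common pool is available), the following sketch forks a task from ordinary, non-pool code and later joins it. Here, MyTask is a hypothetical RecursiveTask<Long> subclass used only for illustration:

MyTask task = new MyTask(data, 0, data.length);

// Schedule the task for asynchronous execution; this thread keeps running.
task.fork();

// ... other work can be performed here ...

// Wait for the task to finish and obtain its result.
long result = task.join();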
Another important ForkJoinTask method is invoke( ). It combines the fork and
join operations into a single call because it begins a task and then waits for
it to end. It is shown here:
final V invoke( )
The result of the invoking
task is returned.
You can invoke more than one
task at a time by using invokeAll( ).
Two of its forms are shown here:
static void invokeAll(ForkJoinTask<?> taskA, ForkJoinTask<?> taskB)
static void invokeAll(ForkJoinTask<?> ... taskList)
In the first case, taskA and taskB are executed. In the second case, all specified tasks are
executed. In both cases, the calling thread waits until all of the specified
tasks have terminated. Prior to JDK 8, the invokeAll(
) method could be executed only from within the computational portion of
another ForkJoinTask, which is
running within a ForkJoinPool. JDK
8’s inclusion of the common pool relaxed this requirement.
RecursiveAction
A subclass of ForkJoinTask is RecursiveAction. This class encapsulates a task that does not
return a result. Typically, your code will extend RecursiveAction to create a task that has a void return type.
RecursiveAction specifies four methods, but only one is usually of
interest: the abstract method called
compute( ). When you extend RecursiveAction to create a concrete
class, you will put the code that defines the task inside compute( ). The compute( )
method represents the computational
portion of the task.
The compute( ) method is defined by RecursiveAction like this:

protected abstract void compute( )
Notice that compute( ) is protected and abstract.
This means that it must be implemented by a subclass (unless that subclass is
also abstract).
In general, RecursiveAction is used to implement a
recursive, divide-and-conquer strategy for tasks that don’t return results.
(See “The Divide-and-Conquer Strategy” later in this chapter.)
RecursiveTask<V>
Another subclass of ForkJoinTask is RecursiveTask<V>. This class encapsulates a task that returns
a result. The result type is specified by V.
Typically, your code will extend
RecursiveTask<V> to create a task that returns a value. Like RecursiveAction, it too specifies four methods, but often only the
abstract compute( ) method is used,
which represents the computational portion of the task. When you extend RecursiveTask<V> to create a
concrete class, put the code that represents the task inside compute( ). This code must also return
the result of the task.
The compute( ) method is defined by RecursiveTask<V> like this:

protected abstract V compute( )
Notice that compute( ) is protected and abstract.
This means that it must be implemented by a subclass. When implemented, it must
return the result of the task.
In general, RecursiveTask is used to implement a
recursive, divide-and-conquer strategy for tasks that return results. (See “The
Divide-and-Conquer Strategy” later in this chapter.)
ForkJoinPool
The execution of ForkJoinTasks takes place within a ForkJoinPool, which also manages the
execution of the tasks. Therefore, in order to execute a ForkJoinTask, you must first have a ForkJoinPool. Beginning with JDK 8, there are two ways to acquire a ForkJoinPool. First, you can explicitly create one by using
a ForkJoinPool constructor. Second,
you can use what is referred to as the common
pool. The common pool (which was added by JDK 8) is a static ForkJoinPool that is automatically
available for your use. Each method is introduced here, beginning with manually constructing a pool.
ForkJoinPool defines several constructors. Here are two commonly used ones:

ForkJoinPool( )
ForkJoinPool(int pLevel)
The first creates a default
pool that supports a level of parallelism equal to the number of processors
available in the system. The second lets you specify the level of parallelism.
Its value must be greater than zero and not more than the limits of the
implementation. The level of parallelism determines the number of threads that
can execute concurrently. As a result, the level of parallelism effectively
determines the number of tasks that can be executed simultaneously. (Of course,
the number of tasks that can execute simultaneously cannot exceed the number of
processors.) It is important to understand that the level of parallelism does not, however, limit the number of
tasks that can be managed by the pool. A ForkJoinPool
can manage many more tasks than its level of parallelism. Also, the level of
parallelism is only a target. It is not a guarantee.
After you have created an
instance of ForkJoinPool, you can
start a task in a number of different ways. The first task started is often
thought of as the main task. Frequently, the main task begins subtasks that are
also managed by the pool. One common way to begin a main task is to call invoke( ) on the ForkJoinPool. It is shown here:
<T> T
invoke(ForkJoinTask<T> task)
This method begins the task
specified by task, and it returns the
result of the task. This means that the calling code waits until invoke( ) returns.
To start a task without
waiting for its completion, you can use execute(
). Here is one of its forms:
void
execute(ForkJoinTask<?> task)
In this case, task is started, but the calling code
does not wait for its completion. Rather, the calling code continues execution
asynchronously.
Beginning with JDK 8, it is
not necessary to explicitly construct a ForkJoinPool
because a common pool is available for your use. In general, if you are not
using a pool that you explicitly created, then the common pool will
automatically be used. Although it won’t always be necessary, you can obtain a
reference to the common pool by calling commonPool(
), which is defined by ForkJoinPool.
It is shown here:
static ForkJoinPool
commonPool( )
A reference to the common
pool is returned. The common pool provides a default level of parallelism. It
can be set by use of a system property. (See the API documentation for
details.) Typically, the default common pool is a good choice for many
applications. Of course, you can always construct your own pool.
There are two basic ways to
start a task using the common pool. First, you can obtain a reference to the
pool by calling commonPool( ) and
then use that reference to call invoke(
) or execute( ), as just
described. Second, you can call ForkJoinTask
methods such as fork( ) or invoke( ) on the task from outside its
computational portion. In this case, the common pool will automatically be
used. In other words, fork( ) and invoke( ) will start a task using the
common pool if the task is not already running within a ForkJoinPool.
ForkJoinPool manages the execution of its threads using an approach called work-stealing. Each worker thread maintains a queue
of tasks. If one worker thread’s queue is empty, it will take a task from
another worker thread. This adds to overall efficiency and helps maintain a
balanced load. (Because of demands on CPU time by other processes in the
system, even two worker threads with identical tasks in their respective queues
may not complete at the same time.)
One other point: ForkJoinPool uses daemon threads. A
daemon thread is automatically terminated when all user threads have
terminated. Thus, there is no need to explicitly shut down a ForkJoinPool. However, with the
exception of the common pool, it is possible to do so by calling shutdown( ). The shutdown( ) method has no effect on the common pool.
The Divide-and-Conquer Strategy
As a general rule, users of
the Fork/Join Framework will employ a divide-and-conquer
strategy that is based on recursion. This is why the two subclasses of ForkJoinTask are called RecursiveAction and RecursiveTask. It is anticipated that you will extend one of these classes when creating your own
fork/join task.
The divide-and-conquer
strategy is based on recursively dividing a task into smaller subtasks until
the size of a subtask is small enough to be handled sequentially. For example,
a task that applies a transform to each element in an array of N integers can be broken down into two
subtasks in which each transforms half the elements in the array. That is, one
subtask transforms the elements 0 to N/2,
and the other transforms the elements N/2
to N. In turn, each subtask can be
reduced to another set of subtasks, each transforming half of the remaining
elements. This process of dividing the array will continue until a threshold is
reached in which a sequential solution is faster than creating another
division.
The advantage of the
divide-and-conquer strategy is that the processing can occur in parallel.
Therefore, instead of cycling through an entire array using a single thread,
pieces of the array can be processed simultaneously. Of course, the
divide-and-conquer approach works in many cases in which an array (or
collection) is not present, but the most common uses involve some type of
array, collection, or grouping of data.
One of the keys to best
employing the divide-and-conquer strategy is correctly selecting the threshold
at which sequential processing (rather than further division) is used.
Typically, an optimal threshold is obtained through profiling the execution
characteristics. However, very significant speed-ups will still occur even when
a less-than-optimal threshold is used. It is, however, best to avoid overly
large or overly small thresholds. At the time of this writing, the Java API documentation
for ForkJoinTask<T> states
that, as a rule-of-thumb, a task should perform somewhere between 100 and
10,000 computational steps.
It is also important to
understand that the optimal threshold value is also affected by how much time
the computation takes. If each computational step is fairly long, then smaller
thresholds might be better. Conversely, if each computational step is quite
short, then larger thresholds could yield better results. For applications that
are to be run on a known system, with a known number of processors, you can use
the number of processors to make informed decisions about the threshold value.
However, for applications that will be running on a variety of systems, the
capabilities of which are not known in advance, you can make no assumptions
about the execution environment.
One other point: Although
multiple processors may be available on a system, other tasks (and the
operating system, itself) will be competing with your application for CPU time.
Thus, it is important not to assume that your program will have unrestricted
access to all CPUs. Furthermore, different runs of the same program may display
different run time characteristics because of varying task loads.
A Simple First Fork/Join Example
At this point, a simple example
that demonstrates the Fork/Join Framework and the divide-and-conquer strategy
will be helpful. Following is a program that transforms the elements in an
array of double into their square
roots. It does so via a subclass of RecursiveAction.
Notice that it creates its own ForkJoinPool.
// A simple example of the basic divide-and-conquer strategy.
// In this case, RecursiveAction is used.
import java.util.concurrent.*;
import java.util.*;

// A ForkJoinTask (via RecursiveAction) that transforms
// the elements in an array of doubles into their square roots.
class SqrtTransform extends RecursiveAction {
  // The threshold value is arbitrarily set at 1,000 in this example.
  // In real-world code, its optimal value can be determined by
  // profiling and experimentation.
  final int seqThreshold = 1000;

  // Array to be accessed.
  double[] data;

  // Determines what part of data to process.
  int start, end;

  SqrtTransform(double[] vals, int s, int e) {
    data = vals;
    start = s;
    end = e;
  }

  // This is the method in which parallel computation will occur.
  protected void compute() {
    // If number of elements is below the sequential threshold,
    // then process sequentially.
    if((end - start) < seqThreshold) {
      // Transform each element into its square root.
      for(int i = start; i < end; i++) {
        data[i] = Math.sqrt(data[i]);
      }
    }
    else {
      // Otherwise, continue to break the data into smaller pieces.
      // Find the midpoint.
      int middle = (start + end) / 2;

      // Invoke new tasks, using the subdivided data.
      invokeAll(new SqrtTransform(data, start, middle),
                new SqrtTransform(data, middle, end));
    }
  }
}

// Demonstrate parallel execution.
class ForkJoinDemo {
  public static void main(String args[]) {
    // Create a task pool.
    ForkJoinPool fjp = new ForkJoinPool();

    double[] nums = new double[100000];

    // Give nums some values.
    for(int i = 0; i < nums.length; i++)
      nums[i] = (double) i;

    System.out.println("A portion of the original sequence:");
    for(int i = 0; i < 10; i++)
      System.out.print(nums[i] + " ");
    System.out.println("\n");

    SqrtTransform task = new SqrtTransform(nums, 0, nums.length);

    // Start the main ForkJoinTask.
    fjp.invoke(task);

    System.out.println("A portion of the transformed sequence" +
                       " (to four decimal places):");
    for(int i = 0; i < 10; i++)
      System.out.format("%.4f ", nums[i]);
    System.out.println();
  }
}
The output from the program
is shown here:
A portion of the original sequence:
0.0 1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0

A portion of the transformed sequence (to four decimal places):
0.0000 1.0000 1.4142 1.7321 2.0000 2.2361 2.4495 2.6458 2.8284 3.0000
As you can see, the values of
the array elements have been transformed into their square roots. Let’s look
closely at how this program works. First, notice that SqrtTransform is a class
that extends RecursiveAction. As explained, RecursiveAction extends ForkJoinTask for tasks that do not
return results. Next, notice the final
variable seqThreshold. This is the
value that determines when sequential processing will take place. This value is
set (somewhat arbitrarily) to 1,000. Next, notice that a reference to the array
to be processed is stored in data and
that the fields start and end are used to indicate the
boundaries of the elements to be
accessed.
The main action of the
program takes place in compute( ).
It begins by checking if the number of elements to be processed is below the
sequential processing threshold. If it is, then those elements are processed
(by computing their square root in this example). If the sequential processing
threshold has not been reached, then two new tasks are started by calling invokeAll( ). In this case, each
subtask processes half the elements. As explained earlier, invokeAll( ) waits until both tasks return. After all of the
recursive calls unwind, each element in the array will have been modified, with
much of the action taking place in parallel (if multiple processors are
available).
As mentioned, beginning with
JDK 8, it is not necessary to explicitly construct a ForkJoinPool because a common pool is available for your use.
Furthermore, using the common pool
is a simple matter. For example, you can obtain a reference to the common pool
by calling the static commonPool( )
method defined by ForkJoinPool.
Therefore, the preceding program could be rewritten to use the common pool by
replacing the call to the ForkJoinPool constructor
with a call to commonPool( ), as
shown here:
ForkJoinPool fjp = ForkJoinPool.commonPool();
Alternatively, there is no
need to explicitly obtain a reference to the common pool because calling the ForkJoinTask methods invoke( ) or fork( ) on a task that is not already part of a pool will cause it
to execute within the common pool automatically. For example, in the preceding
program, you can eliminate the fjp
variable entirely and start the task using this line:
task.invoke();
As this discussion shows, the
common pool is one of the enhancements JDK 8 made to the Fork/Join Framework
that improves its ease-of-use. Furthermore, in many cases, the common pool is
the preferable approach, assuming that JDK 7 compatibility is not required.
Understanding the Impact of the Level of Parallelism
Before moving on, it is
important to understand the impact that the level of parallelism has on the
performance of a fork/join task and how the parallelism and the threshold
interact. The program shown in this section lets you experiment with different
degrees of parallelism and threshold values. Assuming that you are using a
multicore computer, you can interactively observe the effect of these values.
In the preceding example, the
default level of parallelism was used. However, you can specify the level of
parallelism that you want. One way is to specify it when you create a ForkJoinPool using this constructor:
ForkJoinPool(int pLevel)
Here, pLevel specifies the level of parallelism, which must be greater
than zero and less than the implementation defined limit.
The following program creates
a fork/join task that transforms an array of doubles. The transformation is arbitrary, but it is designed to
consume several CPU cycles. This was done to ensure that the effects of
changing the threshold or the level of parallelism would be more clearly
displayed. To use the program, specify the threshold value and the level of
parallelism on the command line. The program then runs the tasks. It also
displays the amount of time it takes the tasks to run. To do this, it uses System.nanoTime( ), which returns the
value of the JVM’s high-resolution timer.
// A simple program that lets you experiment with the effects of
// changing the threshold and parallelism of a ForkJoinTask.
import java.util.concurrent.*;

// A ForkJoinTask (via RecursiveAction) that performs a
// transform on the elements of an array of doubles.
class Transform extends RecursiveAction {
  // Sequential threshold, which is set by the constructor.
  int seqThreshold;

  // Array to be accessed.
  double[] data;

  // Determines what part of data to process.
  int start, end;

  Transform(double[] vals, int s, int e, int t) {
    data = vals;
    start = s;
    end = e;
    seqThreshold = t;
  }

  // This is the method in which parallel computation will occur.
  protected void compute() {
    // If number of elements is below the sequential threshold,
    // then process sequentially.
    if((end - start) < seqThreshold) {
      // The following code assigns an element at an even index the
      // square root of its original value. An element at an odd
      // index is assigned its cube root. This code is designed
      // to simply consume CPU time so that the effects of concurrent
      // execution are more readily observable.
      for(int i = start; i < end; i++) {
        if((data[i] % 2) == 0)
          data[i] = Math.sqrt(data[i]);
        else
          data[i] = Math.cbrt(data[i]);
      }
    }
    else {
      // Otherwise, continue to break the data into smaller pieces.
      // Find the midpoint.
      int middle = (start + end) / 2;

      // Invoke new tasks, using the subdivided data.
      invokeAll(new Transform(data, start, middle, seqThreshold),
                new Transform(data, middle, end, seqThreshold));
    }
  }
}

// Demonstrate parallel execution.
class FJExperiment {
  public static void main(String args[]) {
    int pLevel;
    int threshold;

    if(args.length != 2) {
      System.out.println("Usage: FJExperiment parallelism threshold ");
      return;
    }

    pLevel = Integer.parseInt(args[0]);
    threshold = Integer.parseInt(args[1]);

    // These variables are used to time the task.
    long beginT, endT;

    // Create a task pool. Notice that the parallelism level is set.
    ForkJoinPool fjp = new ForkJoinPool(pLevel);

    double[] nums = new double[1000000];

    for(int i = 0; i < nums.length; i++)
      nums[i] = (double) i;

    Transform task = new Transform(nums, 0, nums.length, threshold);

    // Start timing.
    beginT = System.nanoTime();

    // Start the main ForkJoinTask.
    fjp.invoke(task);

    // End timing.
    endT = System.nanoTime();

    System.out.println("Level of parallelism: " + pLevel);
    System.out.println("Sequential threshold: " + threshold);
    System.out.println("Elapsed time: " + (endT - beginT) + " ns");
    System.out.println();
  }
}
To use the program, specify
the level of parallelism followed by the threshold limit. You should try
experimenting with different values for each, observing the results. Remember,
to be effective, you must run the code on a computer with at least two
processors. Also, understand that two different runs may (almost certainly
will) produce different results because of the effect of other processes in the
system consuming CPU time.
To give you an idea of the
difference that parallelism makes, try this experiment. First, execute the
program like this:
java FJExperiment 1 1000
This requests 1 level of
parallelism (essentially sequential execution) with a threshold of 1,000. Here
is a sample run produced on a dual-core computer:
Level of parallelism: 1
Sequential threshold: 1000
Elapsed time: 259677487 ns
Now, specify 2 levels of
parallelism like this:
java FJExperiment 2 1000
Here is sample output from
this run produced by the same dual-core computer:
Level of parallelism: 2
Sequential threshold: 1000
Elapsed time: 169254472 ns
As is evident, adding
parallelism substantially decreases execution time, thus increasing the speed
of the program. You should experiment with varying the threshold and
parallelism on your own computer. The results may surprise you.
Here are two other methods
that you might find useful when experimenting with the execution
characteristics of a fork/join program. First, you can obtain the level of
parallelism by calling getParallelism( ),
which is defined by ForkJoinPool. It
is shown here:
int getParallelism( )
It returns the parallelism
level currently in effect. Recall that for pools that you create, by default,
this value will equal the number of available processors. (To obtain the
parallelism level for the common pool, you can also use getCommonPoolParallelism( ), which was added by JDK 8.) Second, you
can obtain the number of processors available in the system by calling availableProcessors( ), which is
defined by the Runtime class. It is
shown here:
int availableProcessors( )
The value returned may change
from one call to the next because of other system demands.
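As a simple illustration, the following sketch reports these values for a newly constructed pool and for the common pool. All of the calls shown are part of the standard API; the exact numbers will depend on your system:

ForkJoinPool fjp = new ForkJoinPool();

System.out.println("Pool parallelism: " + fjp.getParallelism());
System.out.println("Common pool parallelism: " +
                   ForkJoinPool.getCommonPoolParallelism());
System.out.println("Available processors: " +
                   Runtime.getRuntime().availableProcessors());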
An Example that Uses RecursiveTask<V>
The two preceding examples
are based on RecursiveAction, which
means that they concurrently execute tasks that do not return results. To
create a task that returns a result, use RecursiveTask.
In general, solutions are designed in the same manner as just shown. The key
difference is that the compute( )
method returns a result. Thus, you must aggregate the results, so that when the
first invocation finishes, it returns the overall result. Another difference is
that you will typically start a subtask by calling fork( ) and join( )
explicitly (rather than implicitly by calling invokeAll( ), for example).
The following program
demonstrates RecursiveTask. It
creates a task called Sum that
returns the summation of the values in an array of double. In this example, the array consists of 5,000 elements.
However, every other value is negative. Thus, the first values in the array are
0, –1, 2, –3, 4, and so on. (So that this example will work with both JDK 7 and
JDK 8, it creates its own pool. You might try changing it to use the common
pool as an exercise.)
// A simple example that uses RecursiveTask<V>.
import java.util.concurrent.*;

// A RecursiveTask that computes the summation of an array of doubles.
class Sum extends RecursiveTask<Double> {
  // The sequential threshold value.
  final int seqThreshold = 500;

  // Array to be accessed.
  double[] data;

  // Determines what part of data to process.
  int start, end;

  Sum(double[] vals, int s, int e) {
    data = vals;
    start = s;
    end = e;
  }

  // Find the summation of an array of doubles.
  protected Double compute() {
    double sum = 0;

    // If number of elements is below the sequential threshold,
    // then process sequentially.
    if((end - start) < seqThreshold) {
      // Sum the elements.
      for(int i = start; i < end; i++)
        sum += data[i];
    }
    else {
      // Otherwise, continue to break the data into smaller pieces.
      // Find the midpoint.
      int middle = (start + end) / 2;

      // Invoke new tasks, using the subdivided data.
      Sum subTaskA = new Sum(data, start, middle);
      Sum subTaskB = new Sum(data, middle, end);

      // Start each subtask by forking.
      subTaskA.fork();
      subTaskB.fork();

      // Wait for the subtasks to return, and aggregate the results.
      sum = subTaskA.join() + subTaskB.join();
    }

    // Return the final sum.
    return sum;
  }
}

// Demonstrate parallel execution.
class RecurTaskDemo {
  public static void main(String args[]) {
    // Create a task pool.
    ForkJoinPool fjp = new ForkJoinPool();

    double[] nums = new double[5000];

    // Initialize nums with values that alternate
    // between positive and negative.
    for(int i = 0; i < nums.length; i++)
      nums[i] = (double) (((i % 2) == 0) ? i : -i);

    Sum task = new Sum(nums, 0, nums.length);

    // Start the ForkJoinTasks. Notice that, in this case,
    // invoke() returns a result.
    double summation = fjp.invoke(task);
    System.out.println("Summation " + summation);
  }
}
Here’s the output from the
program:
Summation -2500.0
There are a couple of
interesting items in this program. First, notice that the two subtasks are
executed by calling fork( ), as
shown here:
subTaskA.fork();
subTaskB.fork();
In this case, fork( ) is used because it starts a
task but does not wait for it to finish. (Thus, it asynchronously runs the
task.) The result of each task is obtained by calling join( ), as shown here:
sum = subTaskA.join() + subTaskB.join();
This statement waits until
each task ends. It then adds the results of each and assigns the total to sum. Thus, the summation of each
subtask is added to the running total. Finally, compute( ) ends by returning
sum, which will be the final total when the first invocation returns.
There are other ways to
approach the handling of the asynchronous execution of the subtasks. For
example, the following sequence uses fork(
) to start subTaskA and uses invoke( ) to start and wait for subTaskB:
subTaskA.fork();
sum = subTaskB.invoke() + subTaskA.join();
Another alternative is to
have subTaskB call compute( ) directly, as shown here:
subTaskA.fork();
sum = subTaskB.compute() + subTaskA.join();
Executing a Task Asynchronously
The preceding programs have
called invoke( ) on a ForkJoinPool to initiate a task. This
approach is commonly used when the calling thread must wait until the task has
completed (which is often the case) because invoke( ) does not return until the task has terminated. However,
you can start a task asynchronously. In this approach, the calling thread
continues to execute. Thus, both the calling thread and the task execute
simultaneously. To start a task asynchronously, use execute( ), which is also defined by ForkJoinPool. It has the two forms shown here:
void
execute(ForkJoinTask<?> task)
void execute(Runnable task)
In both forms, task specifies the task to run. Notice
that the second form lets you specify a Runnable
rather than a ForkJoinTask task.
Thus, it forms a bridge between Java’s traditional approach to multithreading and the new Fork/Join Framework. It is
important to remember that the threads used by a ForkJoinPool are daemon. Thus, they will end when the main thread
ends. As a result, you may need to keep the main thread alive until the tasks
have finished.
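For example, here is one simple way (a sketch, not the only approach) to keep the program alive until an asynchronously started task finishes. It reuses the Transform class from the experiment program shown earlier and simply joins the task before main( ) returns:

double[] nums = new double[1000000];
for(int i = 0; i < nums.length; i++) nums[i] = (double) i;

ForkJoinPool fjp = new ForkJoinPool();
Transform task = new Transform(nums, 0, nums.length, 1000);

// Start the task asynchronously; the calling thread keeps running.
fjp.execute(task);

// ... other work can be performed here ...

// Because the pool's threads are daemon threads, wait for the task
// before allowing main( ) to end.
task.join();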
Cancelling a Task
A task can be cancelled by
calling cancel( ), which is defined
by ForkJoinTask. It has this general
form:
boolean cancel(boolean interruptOK)
It returns true if the task on which it was called
is cancelled. It returns false if
the task has ended or can’t be cancelled. At this time, the interruptOK parameter is not used by the
default implementation. In general, cancel(
) is intended to be called from code outside the task because a task can
easily cancel itself by returning.
You can determine if a task has been cancelled by calling isCancelled( ), as shown here:

final boolean isCancelled( )
It returns true if the invoking task has been
cancelled prior to completion and false
otherwise.
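The following sketch shows the general pattern. It assumes an existing ForkJoinPool referred to as fjp and a long-running ForkJoinTask referred to as task; whether the cancellation succeeds depends on how far the task has progressed:

fjp.execute(task);   // start the task without waiting

// ... later, the result is no longer needed ...

task.cancel(true);   // attempt to cancel from outside the task

if(task.isCancelled())
  System.out.println("Task was cancelled before it completed.");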
Determining a Task’s Completion Status
In addition to isCancelled( ), which was just
described, ForkJoinTask includes two
other methods that you can use to determine a task’s completion status. The
first is isCompletedNormally( ),
which is shown here:
final boolean isCompletedNormally( )
It returns true if the invoking task completed
normally, that is, if it did not throw an exception and it was not cancelled
via a call to cancel( ). It returns false otherwise.
The second is isCompletedAbnormally( ), which is shown here:

final boolean isCompletedAbnormally( )
It returns true if the invoking task completed
because it was cancelled or because it threw an exception. It returns false otherwise.
Restarting a Task
Normally, you cannot rerun a
task. In other words, once a task completes, it cannot be restarted. However,
you can reinitialize the state of the task (after it has completed) so it can
be run again. This is done by calling reinitialize(
), as shown here:
void reinitialize( )
This method resets the state
of the invoking task. However, any modification made to any persistent data
that is operated upon by the task will not be undone. For example, if the task
modifies an array, then those modifications are not undone by calling reinitialize( ).
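For example, the following sketch runs the Sum task from the earlier example a second time by reinitializing it between runs. It reuses the nums array and the fjp pool from that example; because Sum does not modify the array, both runs produce the same result, whereas a task that did modify its data would see the modified values on the second run:

Sum task = new Sum(nums, 0, nums.length);

double first = fjp.invoke(task);    // first run

task.reinitialize();                // reset the task's completion state

double second = fjp.invoke(task);   // run the same task object again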
Things to Explore
The preceding discussion
presented the fundamentals of the Fork/Join Framework and described several
commonly used methods. However, Fork/Join is a rich framework that includes
additional capabilities that give you extended control over concurrency. Although
it is far beyond the scope of this book to examine all of the issues and
nuances surrounding parallel programming and the Fork/Join Framework, a
sampling of the other features is mentioned here.
A Sampling of Other ForkJoinTask Features
In some cases, you will want
to ensure that methods such as invokeAll(
) and fork( ) are called only
from within a ForkJoinTask. (This
may be especially important when using JDK 7, which does not support the common
pool.) This is usually a simple matter, but occasionally, you may have code
that can be executed from either inside or outside a task. You can determine if
your code is executing inside a task by calling inForkJoinPool( ).
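For example, a sketch of code that adapts to where it is running might look like this. It assumes data is an existing array of doubles and reuses the SqrtTransform class from the first example; the branch for non-pool code creates its own pool rather than relying on the JDK 8 common pool:

SqrtTransform t = new SqrtTransform(data, 0, data.length);

if(ForkJoinTask.inForkJoinPool()) {
  // Already inside a pool's computation, so it is safe to invoke directly.
  t.invoke();
}
else {
  // Not inside a pool, so submit the task through an explicit pool.
  ForkJoinPool p = new ForkJoinPool();
  p.invoke(t);
}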
You can convert a Runnable or Callable object into a ForkJoinTask
by using the adapt( ) method defined
by ForkJoinTask. It has three forms,
one for converting a Callable, one
for a Runnable that does not return
a result, and one for a Runnable that
does return a result. In the case of
a Callable, the call( ) method is run. In the case of Runnable, the run( )
method is run.
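Here is a brief sketch of adapt( ) in action. It wraps a Runnable and a Callable as ForkJoinTasks and runs them in a pool. The lambda syntax requires JDK 8 or later (anonymous inner classes could be used instead), and java.util.concurrent.* is assumed to be imported:

ForkJoinPool fjp = new ForkJoinPool();

// Wrap a Runnable. The resulting task produces no meaningful result.
Runnable r = () -> System.out.println("Hello from a ForkJoinTask");
ForkJoinTask<?> rTask = ForkJoinTask.adapt(r);
fjp.invoke(rTask);

// Wrap a Callable. The resulting task returns the Callable's value.
Callable<Integer> c = () -> 6 * 7;
ForkJoinTask<Integer> cTask = ForkJoinTask.adapt(c);
int answer = fjp.invoke(cTask);   // answer is 42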
You can obtain an approximate
count of the number of tasks that are in the queue of the invoking thread by
calling getQueuedTaskCount( ). You
can obtain an approximate count of how many tasks the invoking thread has in
its queue that are in excess of the number of other threads in the pool that
might “steal” them, by calling getSurplusQueuedTaskCount(
). Remember, in the Fork/Join Framework, work-stealing is one way in which
a high level of efficiency is obtained. Although this process is automatic, in
some cases, the information may prove helpful in optimizing throughput.
ForkJoinTask defines the following variants of join( ) and invoke( ) that begin with the prefix quietly. They are shown here:

final void quietlyJoin( )
final void quietlyInvoke( )

In essence, these methods are similar to their non-quiet counterparts except they don’t return values or throw exceptions.
You can attempt to
“un-invoke” (in other words, unschedule) a task by calling tryUnfork( ). JDK 8 adds several methods, such as getForkJoinTaskTag( ) and setForkJoinTaskTag( ),
that support tags. Tags are
short integer values that are linked with a task. They may be useful in
specialized applications.
ForkJoinTask implements Serializable.
Thus, it can be serialized. However, serialization is not used during execution.
A Sampling of Other ForkJoinPool Features
One method that is quite
useful when tuning fork/join applications is ForkJoinPool’s override of toString(
). It displays a “user-friendly” synopsis of the state of the pool. To see
it in action, use this sequence to start and then wait for the task in the FJExperiment class of the task
experimenter program shown earlier:
//Asynchronously start the main ForkJoinTask.
fjp.execute(task);
//Display the state of the pool while waiting.
while(!task.isDone()) {
System.out.println(fjp);
}
When you run the program, you
will see a series of messages on the screen that describe the state of the
pool. Here is an example of one. Of course, your output may vary, based on the
number of processors, threshold values, task load, and so on.
java.util.concurrent.ForkJoinPool@141d683[Running,
parallelism = 2,
size = 2, active = 0, running = 2, steals = 0,
tasks = 0, submissions = 1]
You can determine if a pool
is currently idle by calling isQuiescent(
). It returns true if the pool
has no active threads and false
otherwise.
You can obtain the number of
worker threads currently in the pool by calling getPoolSize( ). You can obtain an approximate count of the active
threads in the pool by calling getActiveThreadCount( ).
To shut down a pool, call shutdown( ). Currently active tasks
will still be executed, but no new tasks can be started. To stop a pool
immediately, call shutdownNow( ). In
this case, an attempt is made to cancel currently active tasks. (It is
important to point out, however, that neither of these methods affects the
common pool.) You can determine if a pool is shut down by calling isShutdown( ). It returns true if the pool has been shut down and
false otherwise. To determine if the
pool has been shut down and all tasks have been completed, call isTerminated( ).
Some Fork/Join Tips
Here are a few tips to help
you avoid some of the more troublesome pitfalls associated with using the
Fork/Join Framework. First, avoid using a sequential threshold that is too low.
In general, erring on the high side is better than erring on the low side. If
the threshold is too low, more time can be consumed generating and switching
tasks than in processing the tasks. Second, usually it is best to use the default
level of parallelism. If you specify a smaller number, it may significantly
reduce the benefits of using the Fork/Join Framework.
In general, a ForkJoinTask should not use
synchronized methods or synchronized blocks of code. Also, you will not normally
want to have the compute( ) method
use other types of synchronization, such as a semaphore. (The new Phaser can, however, be used when
appropriate because it is compatible with the fork/join mechanism.) Remember,
the main idea behind a ForkJoinTask
is the divide-and-conquer strategy. Such an approach does not normally lend
itself to situations in which outside synchronization is needed. Also, avoid
situations in which substantial blocking will occur through I/O. Therefore, in
general,
a ForkJoinTask will not perform I/O. Simply put, to best utilize the
Fork/Join Framework, a task should perform a computation that can run without
outside blocking or synchronization.
One last point: Except under
unusual circumstances, do not make assumptions about the execution environment
that your code will run in. This means you should not assume that some specific
number of processors will be available, or that the execution characteristics
of your program won’t be affected by other processes running at the same time.