Phaser
Another synchronization class
is called Phaser. Its primary
purpose is to enable the synchronization of threads that represent one or more
phases of activity. For example, you might have a set of threads that implement
three phases of an order-processing application. In the first phase, separate
threads are used to validate customer information, check inventory, and confirm
pricing. When that phase is complete, the second phase has two threads that
compute shipping costs and all applicable tax. After that, a final phase
confirms payment and determines estimated shipping time. In the past, to
synchronize the multiple threads that comprise this scenario would require a
bit of work on your part. With the inclusion of Phaser, the process is now much easier.
To begin, it helps to know
that a Phaser works a bit like a CyclicBarrier, described earlier,
except that it supports multiple phases. As a result, Phaser lets you define a synchronization object that waits until a
specific phase has completed. It then advances to the next phase, again waiting
until that phase concludes. It is important to understand that Phaser can also be used to synchronize
only a single phase. In this regard, it acts much like a CyclicBarrier. However, its primary use is to synchronize multiple
phases.
Phaser defines four constructors. Here are the two used in this section: Phaser( )
Phaser(int numParties)
The first creates a phaser
that has a registration count of zero. The second sets the registration count
to numParties. The term party is often applied to the objects
that register with a phaser. Although often there is a one-to-correspondence
between the number of registrants and the number of threads being synchronized,
this is not required. In both cases, the current phase is zero. That is, when a
Phaser is created, it is initially
at phase zero.
In general, here is how you
use Phaser. First, create a new
instance of Phaser. Next, register
one or more parties with the phaser, either by calling register( ) or by specifying the number of parties in the
constructor. For each registered party, have the phaser wait until all
registered parties complete a phase. A party signals this by calling one of a
variety of methods supplied by Phaser,
such as arrive( ) or arriveAndAwaitAdvance( ). After all
parties have arrived, the phase is complete, and the phaser can move on to the
next phase (if there is one), or terminate. The following sections explain the
process in detail.
To register parties after a Phaser has been constructed, call register( ). It is shown here: int
register()
It returns the phase number
of the phase to which it is registered.
To signal that a party has
completed a phase, it must call arrive(
) or some variation of arrive( ).
When the number of arrivals equals the number of registered parties, the phase
is completed and the Phaser moves on to the next phase (if
there is one). The arrive( ) method
has this general form:
int arrive( )
This method signals that a
party (normally a thread of execution) has completed some task (or portion of a
task). It returns the current phase number. If the phaser has been terminated,
then it returns a negative value. The arrive(
) method does not suspend execution of the calling thread. This means that
it does not wait for the phase to be completed. This method should be called
only by a registered party.
If you want to indicate the
completion of a phase and then wait until all other registrants have also
completed that phase, use arriveAndAwaitAdvance(
). It is shown here:
int arriveAndAwaitAdvance( )
It waits until all parties
have arrived. It returns the next phase number or a negative value if the
phaser has been terminated. This method should be called only by a registered
party.
A thread can arrive and then
deregister itself by calling arriveAndDeregister(
). It is shown here:
int arriveAndDeregister( )
It returns the current phase
number or a negative value if the phaser has been terminated. It does not wait
until the phase is complete. This method should be called only by a registered
party.
To obtain the current phase
number, call getPhase( ), which is
shown here: final int getPhase( )
When a Phaser is created, the first phase will be 0, the second phase 1,
the third phase 2, and so on. A negative value is returned if the invoking Phaser has been terminated.
Here is an example that shows
Phaser in action. It creates three
threads, each of which have three phases. It uses a Phaser to synchronize each phase.
// An example of Phaser.
import java.util.concurrent.*;
class PhaserDemo {
public static void main(String args[]) { Phaser
phsr = new Phaser(1);
int curPhase;
System.out.println("Starting");
new MyThread(phsr, "A"); new
MyThread(phsr, "B"); new MyThread(phsr, "C");
Wait for all threads to complete phase one.
curPhase = phsr.getPhase(); phsr.arriveAndAwaitAdvance();
System.out.println("Phase " + curPhase + " Complete");
Wait for all threads to complete phase two.
curPhase = phsr.getPhase(); phsr.arriveAndAwaitAdvance();
System.out.println("Phase " + curPhase + " Complete");
curPhase = phsr.getPhase();
phsr.arriveAndAwaitAdvance(); System.out.println("Phase " + curPhase
+ " Complete");
// Deregister the main thread.
phsr.arriveAndDeregister();
if(phsr.isTerminated())
System.out.println("The Phaser is
terminated");
}
}
// A thread of execution that uses a Phaser.
class MyThread implements Runnable {
Phaser phsr; String name;
MyThread(Phaser p, String n) { phsr = p;
name = n; phsr.register();
new Thread(this).start();
}
public void run() {
System.out.println("Thread " + name +
" Beginning Phase One"); phsr.arriveAndAwaitAdvance(); // Signal
arrival.
//Pause a bit to prevent jumbled output. This
is for illustration
//only. It is not required for the proper
operation of the phaser.
try {
Thread.sleep(10);
} catch(InterruptedException e) {
System.out.println(e);
}
System.out.println("Thread " + name +
" Beginning Phase Two"); phsr.arriveAndAwaitAdvance(); // Signal
arrival.
Pause a bit to prevent jumbled output. This is
for illustration
only. It is not required for the proper
operation of the phaser. try {
Thread.sleep(10);
} catch(InterruptedException e) {
System.out.println(e);
}
System.out.println("Thread " + name +
" Beginning Phase Three"); phsr.arriveAndDeregister(); // Signal
arrival and deregister.
}
}
The output is shown here:
Starting
Thread A Beginning Phase One
Thread C Beginning Phase One
Thread B Beginning Phase One
Phase 0 Complete
Thread B Beginning Phase Two
Thread C Beginning Phase Two
Thread A Beginning Phase Two
Phase 1 Complete
Thread C Beginning Phase Three
Thread B Beginning Phase Three
Thread A Beginning Phase Three
Phase 2 Complete
The Phaser is terminated
Let’s look closely at the key
sections of the program. First, in main(
), a Phaser called phsr is created with an initial party
count of 1 (which corresponds to the main thread). Then three threads are started by creating three MyThread objects. Notice that MyThread is passed a reference to phsr (the phaser). The MyThread objects use this phaser to
synchronize their activities. Next, main(
) calls getPhase( ) to obtain
the current phase number (which is initially zero) and then calls arriveAndAwaitAdvance( ). This causes main( ) to suspend until phase zero has
completed. This won’t happen until all
MyThreads also call arriveAndAwaitAdvance( ). When this
occurs, main( ) will resume
execution, at which point it displays that phase zero has completed, and it
moves on to the next phase. This process repeats until all three phases have
finished. Then, main( ) calls arriveAndDeregister( ). At that point,
all three MyThreads have also
deregistered. Since this results in there being no registered parties when the
phaser advances to the next phase, the phaser is terminated.
Now look at MyThread. First, notice that the
constructor is passed a reference to the phaser that it will use and then
registers with the new thread as a party on that phaser. Thus, each new MyThread becomes a party registered
with the passed-in phaser. Also notice that each thread has three phases. In
this example, each phase consists of a placeholder that simply displays the
name of the thread and what it is doing. Obviously, in real-world code, the
thread would be performing more meaningful actions. Between the first two
phases, the thread calls arriveAndAwaitAdvance(
). Thus, each thread waits until all threads have completed the phase (and
the main thread is ready). After all threads have arrived (including the main
thread), the phaser moves on to the next phase. After the third phase, each
thread deregisters itself with a call to arriveAndDeregister(
). As the comments in MyThread explain,
the calls to sleep( ) are used for
the purposes of illustration to ensure that
the output is not jumbled because of the multithreading. They are not needed to
make the phaser work properly. If you remove them, the output may look a bit
jumbled, but the phases will still be synchronized correctly.
One other point: Although the
preceding example used three threads that were all of the same type, this is
not a requirement. Each party that uses a phaser can be unique, with each
performing some separate task.
It is possible to take
control of precisely what happens when a phase advance occurs. To do this, you
must override the onAdvance( )
method. This method is called by the run time when a Phaser advances from one phase to the next. It is shown here:
protected boolean
onAdvance(int phase, int numParties)
Here, phase will contain the current phase number prior to being
incremented and numParties will
contain the number of registered parties. To terminate the phaser, onAdvance(
) must return true. To keep the
phaser alive, onAdvance( ) must
return false.
The default version of onAdvance( ) returns true (thus terminating the phaser) when
there are no registered parties. As a general rule, your override should also
follow this practice.
One reason to override onAdvance( ) is to enable a phaser to
execute a specific number of phases and then stop. The following example gives
you the flavor of this usage. It creates a class called MyPhaser that extends Phaser
so that it will run a specified number of phases. It does this by overriding
the onAdvance( ) method. The MyPhaser constructor accepts one
argument, which specifies the number of phases to execute. Notice that MyPhaser automatically registers one
party. This behavior is useful in this example, but the needs
of your own applications may
differ.
//Extend Phaser and override onAdvance() so
that only a specific
//number of phases are executed.
import java.util.concurrent.*;
//Extend MyPhaser to allow only a specific
number of phases
//to be executed.
class MyPhaser extends Phaser { int numPhases;
MyPhaser(int parties, int phaseCount) {
super(parties);
numPhases = phaseCount - 1;
}
//Override onAdvance() to execute the specified
//number of phases.
protected boolean onAdvance(int p, int
regParties) {
This println() statement is for illustration
only.
Normally, onAdvance() will not display output.
System.out.println("Phase " + p + " completed.\n");
//If all phases have completed, return true
if(p == numPhases || regParties == 0) return
true;
// Otherwise, return false.
return false;
}
}
class PhaserDemo2 {
public static void main(String args[]) {
MyPhaser phsr = new MyPhaser(1, 4);
System.out.println("Starting\n");
new MyThread(phsr, "A"); new
MyThread(phsr, "B"); new MyThread(phsr, "C");
// Wait for the specified number of phases to
complete.
while(!phsr.isTerminated()) {
phsr.arriveAndAwaitAdvance();
}
System.out.println("The Phaser is
terminated");
}
}
// A thread of execution that uses a Phaser.
class MyThread implements Runnable {
Phaser phsr; String name;
MyThread(Phaser p, String n) { phsr = p;
name = n; phsr.register();
new Thread(this).start();
}
public void run() {
while(!phsr.isTerminated()) {
System.out.println("Thread " + name +
" Beginning Phase " + phsr.getPhase());
phsr.arriveAndAwaitAdvance();
//Pause a bit to prevent jumbled output. This
is for illustration
//only. It is not required for the proper
operation of the phaser.
try {
Thread.sleep(10);
} catch(InterruptedException e) {
System.out.println(e);
}
}
}
}
The output from the program
is shown here:
Starting
Thread B Beginning Phase 0
Thread A Beginning Phase 0
Thread C Beginning Phase 0
Phase 0 completed.
Thread A Beginning Phase 1
Thread B Beginning Phase 1
Thread C Beginning Phase 1
Phase 1 completed.
Thread C Beginning Phase 2
Thread B Beginning Phase 2
Thread A Beginning Phase 2
Phase 2 completed.
Thread C Beginning Phase 3
Thread B Beginning Phase 3
Thread A Beginning Phase 3
Phase 3 completed.
The Phaser is terminated
Inside main( ), one instance of Phaser
is created. It is passed 4 as an argument, which means that it will execute
four phases and then stop. Next, three threads are created and then the
following loop is entered:
// Wait for the specified number of phases to
complete.
while(!phsr.isTerminated()) {
phsr.arriveAndAwaitAdvance();
}
This loop simply calls arriveAndAwaitAdvance( ) until the
phaser is terminated. The phaser won’t terminate until the specified number of
phases have been executed. In this case, the loop continues to execute until
four phases have run. Next, notice that the threads also call arriveAndAwaitAdvance( ) within a loop
that runs until the phaser is terminated. This means that they will execute until the specified number of phases
has been completed.
Now, look closely at the code
for onAdvance( ). Each time onAdvance( ) is called, it is passed
the current phase and the number of registered parties. If the current phase
equals the specified phase, or if the number of registered parties is zero, onAdvance( ) returns true, thus stopping the phaser. This is
accomplished with this line of code:
// If all phases have completed, return true
if(p == numPhases || regParties == 0) return
true;
As you can see, very little
code is needed to accommodate the desired outcome.
Before moving on, it is
useful to point out that you don’t necessarily need to explicitly extend Phaser as the previous example does to
simply override onAdvance( ). In
some cases, more compact code can be created by using an anonymous inner class
to override onAdvance( ).
Phaser has additional capabilities that may be of use in your
applications. You can wait for a
specific phase by calling awaitAdvance(
), which is shown here:
int awaitAdvance(int phase)
Here, phase indicates the phase number on which awaitAdvance( ) will wait until a transition to the next phase
takes place. It will return immediately if the argument passed to phase is not equal to the current phase.
It will also return immediately if the phaser is terminated. However, if phase is passed the current phase, then
it will wait until the phase increments. This method should be called only by a
registered party. There is also an interruptible version of this method called awaitAdvanceInterruptibly( ).
To register more than one
party, call bulkRegister( ). To
obtain the number of registered parties, call getRegisteredParties( ). You can also obtain the number of arrived
parties and unarrived parties by calling getArrivedParties(
) and getUnarrivedParties( ),
respectively. To force the phaser to enter a terminated state, call forceTermination( ).
Phaser also lets you create a tree of phasers. This is supported by two additional constructors, which let you specify the parent, and the getParent( ) method.
Related Topics
Privacy Policy, Terms and Conditions, DMCA Policy and Compliant
Copyright © 2018-2024 BrainKart.com; All Rights Reserved. Developed by Therithal info, Chennai.