Chapter: Java The Complete Reference - The Java Library - The Concurrency Utilities

Study Material, Lecturing Notes, Assignment, Reference, Wiki description explanation, brief detail

Phaser - Java Class


Phaser

 

Another synchronization class is called Phaser. Its primary purpose is to enable the synchronization of threads that represent one or more phases of activity. For example, you might have a set of threads that implement three phases of an order-processing application. In the first phase, separate threads are used to validate customer information, check inventory, and confirm pricing. When that phase is complete, the second phase has two threads that compute shipping costs and all applicable tax. After that, a final phase confirms payment and determines estimated shipping time. In the past, synchronizing the multiple threads that make up this scenario required a bit of work on your part. With the inclusion of Phaser, the process is now much easier.

 

To begin, it helps to know that a Phaser works a bit like a CyclicBarrier, described earlier, except that it supports multiple phases. As a result, Phaser lets you define a synchronization object that waits until a specific phase has completed. It then advances to the next phase, again waiting until that phase concludes. It is important to understand that Phaser can also be used to synchronize only a single phase. In this regard, it acts much like a CyclicBarrier. However, its primary use is to synchronize multiple phases.

Phaser defines four constructors. Here are the two used in this section:

Phaser( )

Phaser(int numParties)

 

The first creates a phaser that has a registration count of zero. The second sets the registration count to numParties. The term party is often applied to the objects that register with a phaser. Although often there is a one-to-one correspondence between the number of registrants and the number of threads being synchronized, this is not required. In both cases, the current phase is zero. That is, when a Phaser is created, it is initially at phase zero.
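As a minimal sketch of the two constructors just described, the following program creates one phaser of each kind and reports its registration count and initial phase. The class name PhaserConstructorSketch and the helper snapshot( ) are hypothetical names chosen for illustration. The sketch also uses getRegisteredParties( ), a Phaser method that is covered later in this section.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class; the name is illustrative only.
class PhaserConstructorSketch {
    // Returns { parties of empty, parties of three, phase of empty, phase of three }.
    static int[] snapshot() {
        Phaser empty = new Phaser();   // registration count of zero
        Phaser three = new Phaser(3);  // registration count of three
        return new int[] {
            empty.getRegisteredParties(), three.getRegisteredParties(),
            empty.getPhase(), three.getPhase()
        };
    }

    public static void main(String[] args) {
        int[] s = snapshot();
        System.out.println("Registered parties: " + s[0] + " and " + s[1]);
        System.out.println("Initial phases: " + s[2] + " and " + s[3]);
    }
}
```

Both phasers report phase zero, and only the registration counts differ.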

 

In general, here is how you use Phaser. First, create a new instance of Phaser. Next, register one or more parties with the phaser, either by calling register( ) or by specifying the number of parties in the constructor. For each registered party, have the phaser wait until all registered parties complete a phase. A party signals this by calling one of a variety of methods supplied by Phaser, such as arrive( ) or arriveAndAwaitAdvance( ). After all parties have arrived, the phase is complete, and the phaser can move on to the next phase (if there is one), or terminate. The following sections explain the process in detail.

To register parties after a Phaser has been constructed, call register( ). It is shown here:

int register( )

It returns the phase number of the phase to which it is registered.
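The return value of register( ) can be observed with a small single-threaded sketch. Here the same thread registers one party, lets that party arrive so the phaser advances, and then registers a second party. The class name RegisterSketch and the method registerTwice( ) are assumptions made for this illustration.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class; the name is illustrative only.
class RegisterSketch {
    // Returns { phase at first registration, phase at second registration, party count }.
    static int[] registerTwice() {
        Phaser phsr = new Phaser();    // no parties yet, phase 0
        int first = phsr.register();   // party registered during phase 0
        phsr.arrive();                 // the only party arrives, so the phaser advances
        int second = phsr.register();  // party registered during phase 1
        return new int[] { first, second, phsr.getRegisteredParties() };
    }

    public static void main(String[] args) {
        int[] r = registerTwice();
        System.out.println("First register() returned " + r[0]);
        System.out.println("Second register() returned " + r[1]);
        System.out.println("Registered parties: " + r[2]);
    }
}
```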

 

To signal that a party has completed a phase, it must call arrive( ) or some variation of arrive( ). When the number of arrivals equals the number of registered parties, the phase is completed and the Phaser moves on to the next phase (if there is one). The arrive( ) method has this general form:

 

int arrive( )

 

This method signals that a party (normally a thread of execution) has completed some task (or portion of a task). It returns the current phase number. If the phaser has been terminated, then it returns a negative value. The arrive( ) method does not suspend execution of the calling thread. This means that it does not wait for the phase to be completed. This method should be called only by a registered party.
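To illustrate that arrive( ) does not block, the following sketch registers two parties and has a single thread arrive on behalf of both. This is only a demonstration device (parties need not map one-to-one to threads), and the names ArriveSketch and run( ) are hypothetical.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class; the name is illustrative only.
class ArriveSketch {
    // Returns { first arrive(), phase after it, second arrive(), phase after it }.
    static int[] run() {
        Phaser phsr = new Phaser(2);            // two registered parties
        int first = phsr.arrive();              // returns at once; one party still missing
        int phaseAfterFirst = phsr.getPhase();  // phase has not advanced yet
        int second = phsr.arrive();             // last arrival completes the phase
        int phaseAfterSecond = phsr.getPhase(); // phaser has moved to the next phase
        return new int[] { first, phaseAfterFirst, second, phaseAfterSecond };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("arrive() returned " + r[0] + ", phase is now " + r[1]);
        System.out.println("arrive() returned " + r[2] + ", phase is now " + r[3]);
    }
}
```

Both calls return the phase in which the arrival took place (zero), and the phase number changes only after the second arrival.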

If you want to indicate the completion of a phase and then wait until all other registrants have also completed that phase, use arriveAndAwaitAdvance( ). It is shown here:

 

int arriveAndAwaitAdvance( )

 

It waits until all parties have arrived. It returns the next phase number or a negative value if the phaser has been terminated. This method should be called only by a registered party.
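The following sketch shows the return value in a simple case: the main thread and two worker threads all call arriveAndAwaitAdvance( ) once on a phaser with three registered parties. The class name AwaitAllSketch, the method run( ), and the use of a shared results array are assumptions made for illustration.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class; the name is illustrative only.
class AwaitAllSketch {
    // Returns the value arriveAndAwaitAdvance() gave to each of the three parties.
    static int[] run() {
        Phaser phsr = new Phaser(3);   // main thread plus two workers
        int[] results = new int[3];
        Thread[] workers = new Thread[2];

        for (int i = 0; i < workers.length; i++) {
            final int slot = i;
            workers[i] = new Thread(() -> {
                // Blocks until all three parties have arrived.
                results[slot] = phsr.arriveAndAwaitAdvance();
            });
            workers[i].start();
        }

        results[2] = phsr.arriveAndAwaitAdvance(); // main is the third party

        for (Thread t : workers) {
            try {
                t.join(); // make the workers' results visible to this thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        for (int r : run()) {
            System.out.println("arriveAndAwaitAdvance() returned " + r);
        }
    }
}
```

Because the phaser starts at phase zero, every party receives 1, the number of the next phase.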

 

A thread can arrive and then deregister itself by calling arriveAndDeregister( ). It is shown here:

 

int arriveAndDeregister( )

 

It returns the current phase number or a negative value if the phaser has been terminated. It does not wait until the phase is complete. This method should be called only by a registered party.
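A small single-threaded sketch can show the effect of arriveAndDeregister( ) on the registration count. The names DeregisterSketch and run( ) are hypothetical, and one thread stands in for both parties purely for demonstration. The sketch relies on the default behavior, described later in this section, in which a phaser terminates when an advance finds no registered parties.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class; the name is illustrative only.
class DeregisterSketch {
    // Returns "first remaining second terminated" as a space-separated string.
    static String run() {
        Phaser phsr = new Phaser(2);                 // two registered parties
        int first = phsr.arriveAndDeregister();      // arrives, then leaves the phaser
        int remaining = phsr.getRegisteredParties(); // one party is left
        int second = phsr.arriveAndDeregister();     // last party arrives and leaves
        boolean terminated = phsr.isTerminated();    // no parties remain at the advance
        return first + " " + remaining + " " + second + " " + terminated;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```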

 

To obtain the current phase number, call getPhase( ), which is shown here:

final int getPhase( )

 

When a Phaser is created, the first phase will be 0, the second phase 1, the third phase 2, and so on. A negative value is returned if the invoking Phaser has been terminated.
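As a rough illustration of this numbering, the sketch below uses a phaser with a single party, so each arrival completes a phase. It records getPhase( ) after each step and once more after the last party deregisters, which terminates the phaser. The names GetPhaseSketch and phases( ) are hypothetical.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class; the name is illustrative only.
class GetPhaseSketch {
    // Returns the phase numbers seen at start, after two advances, and after termination.
    static int[] phases() {
        Phaser phsr = new Phaser(1);  // a single party, so every arrival advances the phase
        int[] seen = new int[4];
        seen[0] = phsr.getPhase();    // initial phase
        phsr.arrive();
        seen[1] = phsr.getPhase();    // after the first advance
        phsr.arrive();
        seen[2] = phsr.getPhase();    // after the second advance
        phsr.arriveAndDeregister();   // no parties remain, so the phaser terminates
        seen[3] = phsr.getPhase();    // negative once terminated
        return seen;
    }

    public static void main(String[] args) {
        int[] seen = phases();
        System.out.println("Phases: " + seen[0] + ", " + seen[1] + ", " + seen[2]);
        System.out.println("Negative after termination: " + (seen[3] < 0));
    }
}
```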

Here is an example that shows Phaser in action. It creates three threads, each of which has three phases. It uses a Phaser to synchronize each phase.

 

// An example of Phaser.

import java.util.concurrent.*;

class PhaserDemo {
  public static void main(String[] args) {
    Phaser phsr = new Phaser(1);
    int curPhase;

    System.out.println("Starting");

    new MyThread(phsr, "A");
    new MyThread(phsr, "B");
    new MyThread(phsr, "C");

    // Wait for all threads to complete phase one.
    curPhase = phsr.getPhase();
    phsr.arriveAndAwaitAdvance();
    System.out.println("Phase " + curPhase + " Complete");

    // Wait for all threads to complete phase two.
    curPhase = phsr.getPhase();
    phsr.arriveAndAwaitAdvance();
    System.out.println("Phase " + curPhase + " Complete");

    // Wait for all threads to complete phase three.
    curPhase = phsr.getPhase();
    phsr.arriveAndAwaitAdvance();
    System.out.println("Phase " + curPhase + " Complete");

    // Deregister the main thread.
    phsr.arriveAndDeregister();

    if(phsr.isTerminated())
      System.out.println("The Phaser is terminated");
  }
}

// A thread of execution that uses a Phaser.
class MyThread implements Runnable {
  Phaser phsr;
  String name;

  MyThread(Phaser p, String n) {
    phsr = p;
    name = n;
    phsr.register();
    new Thread(this).start();
  }

  public void run() {
    System.out.println("Thread " + name + " Beginning Phase One");
    phsr.arriveAndAwaitAdvance(); // Signal arrival.

    // Pause a bit to prevent jumbled output. This is for illustration
    // only. It is not required for the proper operation of the phaser.
    try {
      Thread.sleep(10);
    } catch(InterruptedException e) {
      System.out.println(e);
    }

    System.out.println("Thread " + name + " Beginning Phase Two");
    phsr.arriveAndAwaitAdvance(); // Signal arrival.

    // Pause a bit to prevent jumbled output. This is for illustration
    // only. It is not required for the proper operation of the phaser.
    try {
      Thread.sleep(10);
    } catch(InterruptedException e) {
      System.out.println(e);
    }

    System.out.println("Thread " + name + " Beginning Phase Three");
    phsr.arriveAndDeregister(); // Signal arrival and deregister.
  }
}

 

The output is shown here:

Starting
Thread A Beginning Phase One
Thread C Beginning Phase One
Thread B Beginning Phase One
Phase 0 Complete
Thread B Beginning Phase Two
Thread C Beginning Phase Two
Thread A Beginning Phase Two
Phase 1 Complete
Thread C Beginning Phase Three
Thread B Beginning Phase Three
Thread A Beginning Phase Three
Phase 2 Complete
The Phaser is terminated

 

Let’s look closely at the key sections of the program. First, in main( ), a Phaser called phsr is created with an initial party count of 1 (which corresponds to the main thread). Then three threads are started by creating three MyThread objects. Notice that MyThread is passed a reference to phsr (the phaser). The MyThread objects use this phaser to synchronize their activities. Next, main( ) calls getPhase( ) to obtain the current phase number (which is initially zero) and then calls arriveAndAwaitAdvance( ). This causes main( ) to suspend until phase zero has completed. This won’t happen until all MyThreads also call arriveAndAwaitAdvance( ). When this occurs, main( ) will resume execution, at which point it displays that phase zero has completed, and it moves on to the next phase. This process repeats until all three phases have finished. Then, main( ) calls arriveAndDeregister( ). At that point, all three MyThreads have also deregistered. Since this results in there being no registered parties when the phaser advances to the next phase, the phaser is terminated.

 

Now look at MyThread. First, notice that the constructor is passed a reference to the phaser that it will use and then registers the new thread as a party on that phaser. Thus, each new MyThread becomes a party registered with the passed-in phaser. Also notice that each thread has three phases. In this example, each phase consists of a placeholder that simply displays the name of the thread and what it is doing. Obviously, in real-world code, the thread would be performing more meaningful actions. After each of the first two phases, the thread calls arriveAndAwaitAdvance( ). Thus, each thread waits until all threads have completed the phase (and the main thread is ready). After all threads have arrived (including the main thread), the phaser moves on to the next phase. After the third phase, each thread deregisters itself with a call to arriveAndDeregister( ). As the comments in MyThread explain, the calls to sleep( ) are used for the purposes of illustration to ensure that the output is not jumbled because of the multithreading. They are not needed to make the phaser work properly. If you remove them, the output may look a bit jumbled, but the phases will still be synchronized correctly.

 

One other point: Although the preceding example used three threads that were all of the same type, this is not a requirement. Each party that uses a phaser can be unique, with each performing some separate task.

It is possible to take control of precisely what happens when a phase advance occurs. To do this, you must override the onAdvance( ) method. This method is called by the run time when a Phaser advances from one phase to the next. It is shown here:

 

protected boolean onAdvance(int phase, int numParties)

 

Here, phase will contain the current phase number prior to being incremented and numParties will contain the number of registered parties. To terminate the phaser, onAdvance( ) must return true. To keep the phaser alive, onAdvance( ) must return false.

The default version of onAdvance( ) returns true (thus terminating the phaser) when there are no registered parties. As a general rule, your override should also follow this practice.

One reason to override onAdvance( ) is to enable a phaser to execute a specific number of phases and then stop. The following example gives you the flavor of this usage. It creates a class called MyPhaser that extends Phaser so that it will run a specified number of phases. It does this by overriding the onAdvance( ) method. The MyPhaser constructor accepts two arguments: the number of parties to register initially and the number of phases to execute. In this example, main( ) passes 1 as the first argument so that the main thread is registered as a party. This behavior is useful in this example, but the needs of your own applications may differ.

 

// Extend Phaser and override onAdvance() so that only a specific
// number of phases are executed.

import java.util.concurrent.*;

// Extend Phaser to allow only a specific number of phases
// to be executed.
class MyPhaser extends Phaser {
  int numPhases;

  MyPhaser(int parties, int phaseCount) {
    super(parties);
    numPhases = phaseCount - 1;
  }

  // Override onAdvance() to execute the specified
  // number of phases.
  protected boolean onAdvance(int p, int regParties) {
    // This println() statement is for illustration only.
    // Normally, onAdvance() will not display output.
    System.out.println("Phase " + p + " completed.\n");

    // If all phases have completed, return true.
    if(p == numPhases || regParties == 0) return true;

    // Otherwise, return false.
    return false;
  }
}

class PhaserDemo2 {
  public static void main(String[] args) {
    MyPhaser phsr = new MyPhaser(1, 4);

    System.out.println("Starting\n");

    new MyThread(phsr, "A");
    new MyThread(phsr, "B");
    new MyThread(phsr, "C");

    // Wait for the specified number of phases to complete.
    while(!phsr.isTerminated()) {
      phsr.arriveAndAwaitAdvance();
    }

    System.out.println("The Phaser is terminated");
  }
}

// A thread of execution that uses a Phaser.
class MyThread implements Runnable {
  Phaser phsr;
  String name;

  MyThread(Phaser p, String n) {
    phsr = p;
    name = n;
    phsr.register();
    new Thread(this).start();
  }

  public void run() {
    while(!phsr.isTerminated()) {
      System.out.println("Thread " + name + " Beginning Phase " +
                         phsr.getPhase());

      phsr.arriveAndAwaitAdvance();

      // Pause a bit to prevent jumbled output. This is for illustration
      // only. It is not required for the proper operation of the phaser.
      try {
        Thread.sleep(10);
      } catch(InterruptedException e) {
        System.out.println(e);
      }
    }
  }
}


The output from the program is shown here:

 

Starting

Thread B Beginning Phase 0
Thread A Beginning Phase 0
Thread C Beginning Phase 0
Phase 0 completed.

Thread A Beginning Phase 1
Thread B Beginning Phase 1
Thread C Beginning Phase 1
Phase 1 completed.

Thread C Beginning Phase 2
Thread B Beginning Phase 2
Thread A Beginning Phase 2
Phase 2 completed.

Thread C Beginning Phase 3
Thread B Beginning Phase 3
Thread A Beginning Phase 3
Phase 3 completed.

The Phaser is terminated

 

Inside main( ), one instance of MyPhaser is created. It is passed 1 and 4 as arguments, which means that it starts with one registered party (the main thread) and will execute four phases and then stop. Next, three threads are created and then the following loop is entered:

 

// Wait for the specified number of phases to complete.
while(!phsr.isTerminated()) {
  phsr.arriveAndAwaitAdvance();
}

 

This loop simply calls arriveAndAwaitAdvance( ) until the phaser is terminated. The phaser won’t terminate until the specified number of phases have been executed. In this case, the loop continues to execute until four phases have run. Next, notice that the threads also call arriveAndAwaitAdvance( ) within a loop that runs until the phaser is terminated. This means that they will execute until the specified number of phases has been completed.

Now, look closely at the code for onAdvance( ). Each time onAdvance( ) is called, it is passed the current phase and the number of registered parties. If the current phase equals the number of the final phase (stored in numPhases as phaseCount - 1), or if the number of registered parties is zero, onAdvance( ) returns true, thus stopping the phaser. This is accomplished with this line of code:

 

// If all phases have completed, return true.
if(p == numPhases || regParties == 0) return true;

 

As you can see, very little code is needed to accommodate the desired outcome.

 

Before moving on, it is useful to point out that you don’t necessarily need to explicitly extend Phaser as the previous example does to simply override onAdvance( ). In some cases, more compact code can be created by using an anonymous inner class to override onAdvance( ).
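One possible shape for the anonymous inner class approach is sketched below. It is not taken from the preceding example: the class name AnonymousOnAdvanceSketch, the method countPhases( ), and the decision to register only the calling thread are all assumptions made to keep the sketch short.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class; the name is illustrative only.
class AnonymousOnAdvanceSketch {
    // Runs a one-party phaser that stops after phaseCount phases and
    // returns how many phases were actually completed.
    static int countPhases(final int phaseCount) {
        // Anonymous subclass of Phaser that overrides onAdvance().
        Phaser phsr = new Phaser(1) {
            @Override
            protected boolean onAdvance(int phase, int regParties) {
                // Terminate after the final phase or when no parties remain.
                return phase == phaseCount - 1 || regParties == 0;
            }
        };

        int completed = 0;
        while (!phsr.isTerminated()) {
            phsr.arriveAndAwaitAdvance(); // the only party, so each call ends a phase
            completed++;
        }
        return completed;
    }

    public static void main(String[] args) {
        System.out.println("Phases completed: " + countPhases(4));
    }
}
```

The termination test mirrors the one used by MyPhaser, but no named subclass is needed.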

 

Phaser has additional capabilities that may be of use in your applications. You can wait for a specific phase by calling awaitAdvance( ), which is shown here:

 

int awaitAdvance(int phase)

 

Here, phase indicates the phase number on which awaitAdvance( ) will wait until a transition to the next phase takes place. It will return immediately if the argument passed to phase is not equal to the current phase. It will also return immediately if the phaser is terminated. However, if phase is passed the current phase, then it will wait until the phase increments. This method should be called only by a registered party. There is also an interruptible version of this method called awaitAdvanceInterruptibly( ).
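One way awaitAdvance( ) might be combined with arrive( ) is sketched here: the main thread arrives without blocking, is free to do other work, and later waits for the phase it arrived in to finish. The names AwaitAdvanceSketch and run( ), and the single worker thread, are assumptions for illustration.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class; the name is illustrative only.
class AwaitAdvanceSketch {
    // Returns { arrival phase, result of waiting on it, result of waiting on it again }.
    static int[] run() {
        Phaser phsr = new Phaser(2);          // main thread and one worker
        Thread worker = new Thread(() -> phsr.arrive());

        int arrivalPhase = phsr.arrive();     // main arrives but does not block
        worker.start();

        // Blocks until the worker arrives and the phaser leaves arrivalPhase.
        int afterWait = phsr.awaitAdvance(arrivalPhase);

        // arrivalPhase is no longer the current phase, so this returns immediately.
        int stale = phsr.awaitAdvance(arrivalPhase);

        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { arrivalPhase, afterWait, stale };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("Arrived in phase " + r[0]);
        System.out.println("awaitAdvance() returned " + r[1] + ", then " + r[2]);
    }
}
```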

To register more than one party, call bulkRegister( ). To obtain the number of registered parties, call getRegisteredParties( ). You can also obtain the number of arrived parties and unarrived parties by calling getArrivedParties( ) and getUnarrivedParties( ), respectively. To force the phaser to enter a terminated state, call forceTermination( ).
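The methods just listed can be exercised in a short single-threaded sketch. The class name PhaserQuerySketch and the method run( ) are hypothetical, and one thread arrives on behalf of one of the three parties purely to make the counts visible.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class; the name is illustrative only.
class PhaserQuerySketch {
    // Returns "registered arrived unarrived terminated" as a space-separated string.
    static String run() {
        Phaser phsr = new Phaser();
        phsr.bulkRegister(3);                        // register three parties at once
        phsr.arrive();                               // one of the three arrives

        int registered = phsr.getRegisteredParties();
        int arrived = phsr.getArrivedParties();
        int unarrived = phsr.getUnarrivedParties();

        phsr.forceTermination();                     // terminate without waiting for the rest
        return registered + " " + arrived + " " + unarrived + " " + phsr.isTerminated();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```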

Phaser also lets you create a tree of phasers. This is supported by two additional constructors, which let you specify the parent, and the getParent( ) method.
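A very small tree can be sketched as follows, using the constructor that takes a parent and a party count. The names PhaserTreeSketch and run( ) are hypothetical. The sketch assumes the documented behavior that a child with registered parties registers itself as one party of its parent, and that the parent advances once all of the child's parties have arrived.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class; the name is illustrative only.
class PhaserTreeSketch {
    // Returns "parentLinked rootParties rootPhase" as a space-separated string.
    static String run() {
        Phaser root = new Phaser();          // parent with no direct parties
        Phaser child = new Phaser(root, 2);  // child with two parties

        boolean parentLinked = (child.getParent() == root);
        int rootParties = root.getRegisteredParties(); // the child counts as one party

        // Both of the child's parties arrive (one thread stands in for both).
        child.arrive();
        child.arrive();

        return parentLinked + " " + rootParties + " " + root.getPhase();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```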

 



Copyright © 2018-2020 BrainKart.com; All Rights Reserved. Developed by Therithal info, Chennai.