Chapter: Java The Complete Reference - The Java Library - The Concurrency Utilities

Study Material, Lecturing Notes, Assignment, Reference, Wiki description explanation, brief detail

Using Synchronization Objects


 

Synchronization objects are supported by the Semaphore, CountDownLatch, CyclicBarrier, Exchanger, and Phaser classes. Collectively, they enable you to handle several formerly difficult synchronization situations with ease. They are also applicable to a wide range of programs—even those that contain only limited concurrency. Because the synchronization objects will be of interest to nearly all Java programs, each is examined here in some detail.

Semaphore

 

The synchronization object that many readers will immediately recognize is Semaphore, which implements a classic semaphore. A semaphore controls access to a shared resource through the use of a counter. If the counter is greater than zero, then access is allowed. If it is zero, then access is denied. What the counter is counting are permits that allow access to the shared resource. Thus, to access the resource, a thread must be granted a permit from the semaphore.

 

In general, to use a semaphore, the thread that wants access to the shared resource tries to acquire a permit. If the semaphore’s count is greater than zero, then the thread acquires a permit, which causes the semaphore’s count to be decremented. Otherwise, the thread will be blocked until a permit can be acquired. When the thread no longer needs access to the shared resource, it releases the permit, which causes the semaphore’s count to be incremented. If there is another thread waiting for a permit, then that thread will acquire a permit at that time. Java’s Semaphore class implements this mechanism.

 

Semaphore has the two constructors shown here:

 

Semaphore(int num)
Semaphore(int num, boolean how)

 

Here, num specifies the initial permit count. Thus, num specifies the number of threads that can access a shared resource at any one time. If num is one, then only one thread can access the resource at any one time. By default, waiting threads are granted a permit in an undefined order. By setting how to true, you can ensure that waiting threads are granted a permit in the order in which they requested access.
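As a minimal sketch of the two constructors, the following fragment creates a semaphore with a given permit count and another with ordered granting requested. The class and helper names are illustrative only. The query methods availablePermits( ) and isFair( ) belong to Semaphore but are not otherwise discussed in this section.

```java
import java.util.concurrent.Semaphore;

class SemaphoreConstructors {
    // Hypothetical helper: reports the permits available right after construction.
    static int initialPermits(int num) {
        Semaphore sem = new Semaphore(num);    // granting order is undefined
        return sem.availablePermits();
    }

    // Hypothetical helper: reports whether ordered granting was requested.
    static boolean isOrdered(boolean how) {
        Semaphore sem = new Semaphore(1, how); // how == true requests ordered granting
        return sem.isFair();
    }

    public static void main(String[] args) {
        System.out.println("Permits: " + initialPermits(3));
        System.out.println("Ordered: " + isOrdered(true));
    }
}
```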

 

To acquire a permit, call the acquire( ) method, which has these two forms:

 

void acquire( ) throws InterruptedException

 

void acquire(int num) throws InterruptedException

 

The first form acquires one permit. The second form acquires num permits. Most often, the first form is used. If the permit cannot be granted at the time of the call, then the invoking thread suspends until the permit is available.

 

To release a permit, call release( ), which has these two forms:

 

void release( )

 

void release(int num)

 

The first form releases one permit. The second form releases the number of permits specified by num.
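The multiple-permit forms can be sketched as follows. This is an illustration under stated assumptions, not part of the original example: the class name, the helper method, and the use of availablePermits( ) to inspect the count are all choices made here. Note that asking for more permits than the semaphore will ever hold would block indefinitely.

```java
import java.util.concurrent.Semaphore;

class MultiPermitDemo {
    // Takes 'take' permits from a semaphore created with 'total' permits.
    // Returns {permits left while holding, permits left after releasing}.
    static int[] acquireAndRelease(int total, int take) {
        Semaphore sem = new Semaphore(total);
        int[] counts = new int[2];
        try {
            sem.acquire(take);                  // waits until 'take' permits are free
            counts[0] = sem.availablePermits();
            sem.release(take);                  // hand all of them back
        } catch (InterruptedException exc) {
            Thread.currentThread().interrupt(); // no permits were taken
        }
        counts[1] = sem.availablePermits();
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = acquireAndRelease(5, 2);
        System.out.println("While holding: " + counts[0]);
        System.out.println("After release: " + counts[1]);
    }
}
```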

To use a semaphore to control access to a resource, each thread that wants to use that resource must first call acquire( ) before accessing the resource. When the thread is done with the resource, it must call release( ). Here is an example that illustrates the use of a semaphore:

 

 

// A simple semaphore example.

import java.util.concurrent.*;

class SemDemo {
  public static void main(String args[]) {
    Semaphore sem = new Semaphore(1);

    new IncThread(sem, "A");
    new DecThread(sem, "B");
  }
}

// A shared resource.
class Shared {
  static int count = 0;
}

// A thread of execution that increments count.
class IncThread implements Runnable {
  String name;
  Semaphore sem;

  IncThread(Semaphore s, String n) {
    sem = s;
    name = n;
    new Thread(this).start();
  }

  public void run() {
    System.out.println("Starting " + name);

    try {
      // First, get a permit.
      System.out.println(name + " is waiting for a permit.");
      sem.acquire();
      System.out.println(name + " gets a permit.");

      // Now, access shared resource.
      for(int i=0; i < 5; i++) {
        Shared.count++;
        System.out.println(name + ": " + Shared.count);

        // Now, allow a context switch -- if possible.
        Thread.sleep(10);
      }
    } catch (InterruptedException exc) {
      System.out.println(exc);
    }

    // Release the permit.
    System.out.println(name + " releases the permit.");
    sem.release();
  }
}

// A thread of execution that decrements count.
class DecThread implements Runnable {
  String name;
  Semaphore sem;

  DecThread(Semaphore s, String n) {
    sem = s;
    name = n;
    new Thread(this).start();
  }

  public void run() {
    System.out.println("Starting " + name);

    try {
      // First, get a permit.
      System.out.println(name + " is waiting for a permit.");
      sem.acquire();
      System.out.println(name + " gets a permit.");

      // Now, access shared resource.
      for(int i=0; i < 5; i++) {
        Shared.count--;
        System.out.println(name + ": " + Shared.count);

        // Now, allow a context switch -- if possible.
        Thread.sleep(10);
      }
    } catch (InterruptedException exc) {
      System.out.println(exc);
    }

    // Release the permit.
    System.out.println(name + " releases the permit.");
    sem.release();
  }
}

 

The output from the program is shown here. (The precise order in which the threads execute may vary.)

Starting A
A is waiting for a permit.
A gets a permit.
A: 1
Starting B
B is waiting for a permit.
A: 2
A: 3
A: 4
A: 5
A releases the permit.
B gets a permit.
B: 4
B: 3
B: 2
B: 1
B: 0
B releases the permit.

 

 

The program uses a semaphore to control access to the count variable, which is a static variable within the Shared class. Shared.count is incremented five times by the run( ) method of IncThread and decremented five times by DecThread. To prevent these two threads from accessing Shared.count at the same time, access is allowed only after a permit is acquired from the controlling semaphore. After access is complete, the permit is released. In this way, only one thread at a time will access Shared.count, as the output shows.

In both IncThread and DecThread, notice the call to sleep( ) within run( ). It is used to “prove” that accesses to Shared.count are synchronized by the semaphore. In run( ), the call to sleep( ) causes the invoking thread to pause between each access to Shared.count. This would normally enable the second thread to run. However, because of the semaphore, the second thread must wait until the first has released the permit, which happens only after all accesses by the first thread are complete. Thus, Shared.count is first incremented five times by IncThread and then decremented five times by DecThread. The increments and decrements are not intermixed.

 

Without the use of the semaphore, accesses to Shared.count by both threads would have occurred simultaneously, and the increments and decrements would be intermixed. To confirm this, try commenting out the calls to acquire( ) and release( ). When you run the program, you will see that access to Shared.count is no longer synchronized, and each thread accesses it as soon as it gets a timeslice.
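In the preceding program, release( ) is called after the catch block, so it also runs if acquire( ) was interrupted and no permit was ever obtained. One common way to avoid that, sketched here as an alternative rather than as the program's own method, is to release only inside a finally block that is entered after acquire( ) succeeds. The GuardedCounter class and its methods are hypothetical names. Unlike the program above, this sketch holds the permit for one update at a time, so increments and decrements may interleave, but no update is lost.

```java
import java.util.concurrent.Semaphore;

class GuardedCounter {
    private final Semaphore sem = new Semaphore(1);
    private int count = 0;

    // Adds delta to count while holding the single permit.
    void add(int delta) {
        try {
            sem.acquire();
        } catch (InterruptedException exc) {
            Thread.currentThread().interrupt();
            return;                 // no permit obtained, so none is released
        }
        try {
            count += delta;
        } finally {
            sem.release();          // runs whenever the permit is held
        }
    }

    // Intended to be read after the worker threads have finished.
    int value() {
        return count;
    }

    // One thread adds +1 n times while another adds -1 n times.
    static int run(int n) {
        GuardedCounter c = new GuardedCounter();
        Thread inc = new Thread(() -> { for (int i = 0; i < n; i++) c.add(1); });
        Thread dec = new Thread(() -> { for (int i = 0; i < n; i++) c.add(-1); });
        inc.start();
        dec.start();
        try {
            inc.join();
            dec.join();
        } catch (InterruptedException exc) {
            Thread.currentThread().interrupt();
        }
        return c.value();
    }

    public static void main(String[] args) {
        System.out.println("Final count: " + run(1000));
    }
}
```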

 

Although many uses of a semaphore are as straightforward as that shown in the preceding program, more intriguing uses are also possible. Here is an example. The following program reworks the producer/consumer program shown in Chapter 11 so that it uses two semaphores to regulate the producer and consumer threads, ensuring that each call to put( ) is followed by a corresponding call to get( ):

 

// An implementation of a producer and consumer
// that use semaphores to control synchronization.

import java.util.concurrent.Semaphore;

class Q {
  int n;

  // Start with consumer semaphore unavailable.
  static Semaphore semCon = new Semaphore(0);
  static Semaphore semProd = new Semaphore(1);

  void get() {
    try {
      semCon.acquire();
    } catch(InterruptedException e) {
      System.out.println("InterruptedException caught");
    }

    System.out.println("Got: " + n);
    semProd.release();
  }

  void put(int n) {
    try {
      semProd.acquire();
    } catch(InterruptedException e) {
      System.out.println("InterruptedException caught");
    }

    this.n = n;
    System.out.println("Put: " + n);
    semCon.release();
  }
}

class Producer implements Runnable {
  Q q;

  Producer(Q q) {
    this.q = q;
    new Thread(this, "Producer").start();
  }

  public void run() {
    for(int i=0; i < 20; i++) q.put(i);
  }
}

class Consumer implements Runnable {
  Q q;

  Consumer(Q q) {
    this.q = q;
    new Thread(this, "Consumer").start();
  }

  public void run() {
    for(int i=0; i < 20; i++) q.get();
  }
}

class ProdCon {
  public static void main(String args[]) {
    Q q = new Q();
    new Consumer(q);
    new Producer(q);
  }
}

A portion of the output is shown here:

Put: 0
Got: 0
Put: 1
Got: 1
Put: 2
Got: 2
Put: 3
Got: 3
Put: 4
Got: 4
Put: 5
Got: 5
.
.

 

As you can see, the calls to put( ) and get( ) are synchronized. That is, each call to put( ) is followed by a call to get( ) and no values are missed. Without the semaphores, multiple calls to put( ) would have occurred without matching calls to get( ), resulting in values being missed. (To prove this, remove the semaphore code and observe the results.)

The sequencing of put( ) and get( ) calls is handled by two semaphores: semProd and semCon. Before put( ) can produce a value, it must acquire a permit from semProd. After it has set the value, it releases semCon. Before get( ) can consume a value, it must acquire a permit from semCon. After it consumes the value, it releases semProd. This “give and take” mechanism ensures that each call to put( ) must be followed by a call to get( ).

Notice that semCon is initialized with no available permits. This ensures that put( ) executes first. The ability to set the initial synchronization state is one of the more powerful aspects of a semaphore.
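The give-and-take described above can also be checked programmatically. The following is a minimal sketch, assuming a hypothetical Handoff class that mirrors Q but returns the consumed value instead of printing it. The transfer( ) helper and the use of a list to record what the consumer saw are illustrative additions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

class Handoff {
    private final Semaphore semCon = new Semaphore(0);  // nothing to consume yet
    private final Semaphore semProd = new Semaphore(1); // producer may go first
    private int n;

    void put(int value) throws InterruptedException {
        semProd.acquire();   // wait until the slot is free
        n = value;
        semCon.release();    // let the consumer proceed
    }

    int get() throws InterruptedException {
        semCon.acquire();    // wait until a value is present
        int value = n;
        semProd.release();   // let the producer proceed
        return value;
    }

    // Sends 0..count-1 through the slot and returns what the consumer received.
    static List<Integer> transfer(int count) {
        Handoff h = new Handoff();
        List<Integer> seen = new ArrayList<>();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) h.put(i);
            } catch (InterruptedException exc) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            for (int i = 0; i < count; i++) seen.add(h.get());
            producer.join();
        } catch (InterruptedException exc) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(transfer(6));
    }
}
```

Because each put( ) must wait for the previous get( ), the consumer receives every value exactly once and in the order produced.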

 

CountDownLatch

 

Sometimes you will want a thread to wait until one or more events have occurred. To handle such a situation, the concurrent API supplies CountDownLatch. A CountDownLatch is initially created with a count of the number of events that must occur before the latch is released. Each time an event happens, the count is decremented. When the count reaches zero, the latch opens.

 

CountDownLatch has the following constructor:

CountDownLatch(int num)

Here, num specifies the number of events that must occur in order for the latch to open. To wait on the latch, a thread calls await( ), which has the forms shown here:

 

void await( ) throws InterruptedException

 

boolean await(long wait, TimeUnit tu) throws InterruptedException

 

The first form waits until the count associated with the invoking CountDownLatch reaches zero. The second form waits only for the period of time specified by wait. The units represented by wait are specified by tu, which is an object of the TimeUnit enumeration. (TimeUnit is described later in this chapter.) The second form returns false if the time limit is reached and true if the countdown reaches zero.

To signal an event, call the countDown( ) method, shown next:

void countDown( )

 

Each call to countDown( ) decrements the count associated with the invoking object.
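The timed form of await( ) can be illustrated with a short sketch. The TimedLatch class, the waitBriefly( ) helper, and the 50-millisecond limit are assumptions made for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TimedLatch {
    // Creates a latch that needs 'needed' events, signals 'signaled' of them,
    // then waits up to 50 ms. Returns the result of the timed await().
    static boolean waitBriefly(int needed, int signaled) {
        CountDownLatch latch = new CountDownLatch(needed);
        for (int i = 0; i < signaled; i++) {
            latch.countDown();                  // one event has occurred
        }
        try {
            return latch.await(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException exc) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("All events signaled: " + waitBriefly(2, 2));
        System.out.println("One event missing: " + waitBriefly(2, 1));
    }
}
```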

 

The following program demonstrates CountDownLatch. It creates a latch that requires five events to occur before it opens.

 

// An example of CountDownLatch.

import java.util.concurrent.CountDownLatch;

class CDLDemo {
  public static void main(String args[]) {
    CountDownLatch cdl = new CountDownLatch(5);

    System.out.println("Starting");

    new MyThread(cdl);

    try {
      cdl.await();
    } catch (InterruptedException exc) {
      System.out.println(exc);
    }

    System.out.println("Done");
  }
}

class MyThread implements Runnable {
  CountDownLatch latch;

  MyThread(CountDownLatch c) {
    latch = c;
    new Thread(this).start();
  }

  public void run() {
    for(int i = 0; i<5; i++) {
      System.out.println(i);
      latch.countDown(); // decrement count
    }
  }
}

 

The output produced by the program is shown here:

Starting
0
1
2
3
4
Done

 

Inside main( ), a CountDownLatch called cdl is created with an initial count of five. Next, an instance of MyThread is created, which begins execution of a new thread. Notice that cdl is passed as a parameter to MyThread’s constructor and stored in the latch instance variable. Then, the main thread calls await( ) on cdl, which causes execution of the main thread to pause until cdl’s count has been decremented five times.

Inside the run( ) method of MyThread, a loop is created that iterates five times. With each iteration, the countDown( ) method is called on latch, which refers to cdl in main( ). After the fifth iteration, the latch opens, which allows the main thread to resume.

CountDownLatch is a powerful yet easy-to-use synchronization object that is appropriate whenever a thread must wait for one or more events to occur.
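The events counted by a latch need not come from a single thread. As a sketch under stated assumptions, the following fragment starts several worker threads, each of which signals the latch once. The WorkerLatch class and runWorkers( ) helper are hypothetical, and AtomicInteger (from java.util.concurrent.atomic) is used here only as a thread-safe tally.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class WorkerLatch {
    // Starts 'workers' threads, waits for all of them to signal the latch,
    // and returns how many had finished when await() returned.
    static int runWorkers(int workers) {
        CountDownLatch done = new CountDownLatch(workers);
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                finished.incrementAndGet();   // stand-in for real work
                done.countDown();             // one event per worker
            }).start();
        }
        try {
            done.await();                     // returns once every worker has signaled
        } catch (InterruptedException exc) {
            Thread.currentThread().interrupt();
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("Workers finished: " + runWorkers(5));
    }
}
```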

CyclicBarrier

 

A situation not uncommon in concurrent programming occurs when a set of two or more threads must wait at a predetermined execution point until all threads in the set have reached that point. To handle such a situation, the concurrent API supplies the CyclicBarrier class. It enables you to define a synchronization object that suspends until the specified number of threads has reached the barrier point.

 

CyclicBarrier has the following two constructors:

 

CyclicBarrier(int numThreads)
CyclicBarrier(int numThreads, Runnable action)

 

Here, numThreads specifies the number of threads that must reach the barrier before execution continues. In the second form, action specifies a Runnable that will be executed when the barrier is reached. It is run by the last thread to arrive, before the waiting threads are released.

 

Here is the general procedure that you will follow to use CyclicBarrier. First, create a CyclicBarrier object, specifying the number of threads that you will be waiting for. Next, when each thread reaches the barrier, have it call await( ) on that object. This will pause execution of the thread until all of the other threads also call await( ). Once the specified number of threads has reached the barrier, await( ) will return and execution will resume. Also, if you have specified an action, then that action is executed.

The await( ) method has the following two forms:

 

int await( ) throws InterruptedException, BrokenBarrierException

 

int await(long wait, TimeUnit tu)
    throws InterruptedException, BrokenBarrierException, TimeoutException

 

The first form waits until all the threads have reached the barrier point. The second form waits only for the period of time specified by wait. The units represented by wait are specified by tu. Both forms return a value that indicates the order that the threads arrive at the barrier point. The first thread returns a value equal to the number of threads waited upon minus one. The last thread returns zero.
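The value returned by await( ) can be observed with a short sketch. The ArrivalOrder class and arrivalIndexes( ) helper are hypothetical names, and ConcurrentSkipListSet is used only as a convenient thread-safe, sorted collection for the results.

```java
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CyclicBarrier;

class ArrivalOrder {
    // Starts 'parties' threads that all wait at one barrier and
    // collects the value that await() returns in each thread.
    static Set<Integer> arrivalIndexes(int parties) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        Set<Integer> indexes = new ConcurrentSkipListSet<>();
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    indexes.add(barrier.await());  // arrival index of this thread
                } catch (InterruptedException | BrokenBarrierException exc) {
                    System.out.println(exc);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException exc) {
                Thread.currentThread().interrupt();
            }
        }
        return indexes;
    }

    public static void main(String[] args) {
        System.out.println("Arrival indexes: " + arrivalIndexes(3));
    }
}
```

With three threads, the collected values are 2 for the first thread to arrive, 1 for the second, and 0 for the last, although which thread receives which value depends on scheduling.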

Here is an example that illustrates CyclicBarrier. It waits until a set of three threads has reached the barrier. When that occurs, the action defined by BarAction executes.

 

// An example of CyclicBarrier.

import java.util.concurrent.*;

class BarDemo {
  public static void main(String args[]) {
    CyclicBarrier cb = new CyclicBarrier(3, new BarAction() );

    System.out.println("Starting");

    new MyThread(cb, "A");
    new MyThread(cb, "B");
    new MyThread(cb, "C");
  }
}

// A thread of execution that uses a CyclicBarrier.
class MyThread implements Runnable {
  CyclicBarrier cbar;
  String name;

  MyThread(CyclicBarrier c, String n) {
    cbar = c;
    name = n;
    new Thread(this).start();
  }

  public void run() {
    System.out.println(name);

    try {
      cbar.await();
    } catch (BrokenBarrierException exc) {
      System.out.println(exc);
    } catch (InterruptedException exc) {
      System.out.println(exc);
    }
  }
}

// An object of this class is called when the
// CyclicBarrier ends.
class BarAction implements Runnable {
  public void run() {
    System.out.println("Barrier Reached!");
  }
}

The output is shown here. (The precise order in which the threads execute may vary.)

 

Starting
A
B
C
Barrier Reached!

 

A CyclicBarrier can be reused because it will release waiting threads each time the specified number of threads calls await( ). For example, if you change main( ) in the preceding program so that it looks like this:

 

public static void main(String args[]) {
  CyclicBarrier cb = new CyclicBarrier(3, new BarAction() );

  System.out.println("Starting");

  new MyThread(cb, "A");
  new MyThread(cb, "B");
  new MyThread(cb, "C");
  new MyThread(cb, "X");
  new MyThread(cb, "Y");
  new MyThread(cb, "Z");
}

the following output will be produced. (The precise order in which the threads execute may vary.)

 

Starting
A
B
C
Barrier Reached!
X
Y
Z
Barrier Reached!

 

As the preceding example shows, the CyclicBarrier offers a streamlined solution to what was previously a complicated problem.
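Reuse can also be shown with the same threads meeting at one barrier several times. In this sketch, which is an illustration rather than a variation taken from the text, the barrier action simply counts how often the barrier is reached. BarrierReuse, countTrips( ), and the AtomicInteger tally are hypothetical names and choices.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierReuse {
    // Each of 'parties' threads calls await() 'rounds' times on one barrier.
    // Returns how many times the barrier action was executed.
    static int countTrips(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        CyclicBarrier barrier =
            new CyclicBarrier(parties, () -> trips.incrementAndGet());
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await();   // the same barrier is used every round
                    }
                } catch (InterruptedException | BrokenBarrierException exc) {
                    System.out.println(exc);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException exc) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("Barrier reached " + countTrips(3, 2) + " times");
    }
}
```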

 

Exchanger

 

Perhaps the most interesting of the synchronization classes is Exchanger. It is designed to simplify the exchange of data between two threads. The operation of an Exchanger is astoundingly simple: it simply waits until two separate threads call its exchange( ) method. When that occurs, it exchanges the data supplied by the threads. This mechanism is both elegant and easy to use. Uses for Exchanger are easy to imagine. For example, one thread might prepare a buffer for receiving information over a network connection. Another thread might fill that buffer with the information from the connection. The two threads work together so that each time a new buffer is needed, an exchange is made.

 

Exchanger is a generic class that is declared as shown here:

Exchanger<V>

 

Here, V specifies the type of the data being exchanged.

 

The only method defined by Exchanger is exchange( ), which has the two forms shown here:

 

V exchange(V objRef) throws InterruptedException

 

V exchange(V objRef, long wait, TimeUnit tu) throws InterruptedException, TimeoutException

 

Here, objRef is a reference to the data to exchange. The data received from the other thread is returned. The second form of exchange( ) allows a time-out period to be specified. The key point about exchange( ) is that it won’t succeed until it has been called on the same Exchanger object by two separate threads. Thus, exchange( ) synchronizes the exchange of the data.
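The timed form can be sketched by calling exchange( ) when no second thread ever arrives. The LonelyExchange class, the timesOut( ) helper, and the 50-millisecond limit are assumptions made for this illustration.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class LonelyExchange {
    // Calls exchange() with no partner thread and reports whether it timed out.
    static boolean timesOut() {
        Exchanger<String> ex = new Exchanger<>();
        try {
            ex.exchange("data", 50, TimeUnit.MILLISECONDS);
            return false;                       // a partner showed up (not expected here)
        } catch (TimeoutException exc) {
            return true;                        // nobody else called exchange() in time
        } catch (InterruptedException exc) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Timed out: " + timesOut());
    }
}
```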

 

Here is an example that demonstrates Exchanger. It creates two threads. One thread creates an empty buffer that will receive the data put into it by the second thread. In this case, the data is a string. Thus, the first thread exchanges an empty string for a full one.

 

// An example of Exchanger.

import java.util.concurrent.Exchanger;

class ExgrDemo {
  public static void main(String args[]) {
    Exchanger<String> exgr = new Exchanger<String>();

    new UseString(exgr);
    new MakeString(exgr);
  }
}

// A Thread that constructs a string.
class MakeString implements Runnable {
  Exchanger<String> ex;
  String str;

  MakeString(Exchanger<String> c) {
    ex = c;
    str = new String();
    new Thread(this).start();
  }

  public void run() {
    char ch = 'A';

    for(int i = 0; i < 3; i++) {
      // Fill Buffer
      for(int j = 0; j < 5; j++)
        str += ch++;

      try {
        // Exchange a full buffer for an empty one.
        str = ex.exchange(str);
      } catch(InterruptedException exc) {
        System.out.println(exc);
      }
    }
  }
}

// A Thread that uses a string.
class UseString implements Runnable {
  Exchanger<String> ex;
  String str;

  UseString(Exchanger<String> c) {
    ex = c;
    new Thread(this).start();
  }

  public void run() {
    for(int i=0; i < 3; i++) {
      try {
        // Exchange an empty buffer for a full one.
        str = ex.exchange(new String());
        System.out.println("Got: " + str);
      } catch(InterruptedException exc) {
        System.out.println(exc);
      }
    }
  }
}

 

Here is the output produced by the program:

 

Got: ABCDE
Got: FGHIJ
Got: KLMNO

 

In the program, the main( ) method creates an Exchanger for strings. This object is then used to synchronize the exchange of strings between the MakeString and UseString classes. The MakeString class fills a string with data. The UseString class exchanges an empty string for a full one. It then displays the contents of the newly constructed string. The exchange of empty and full buffers is synchronized by the exchange( ) method, which is called by both classes’ run( ) method.
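The symmetry of the exchange, in which each thread receives what the other supplied, can be seen in a smaller sketch. SwapDemo and its swap( ) helper are hypothetical names used only for illustration.

```java
import java.util.concurrent.Exchanger;

class SwapDemo {
    // Two threads each offer a string to the same Exchanger.
    // Returns {received by the calling thread, received by the other thread}.
    static String[] swap(String fromCaller, String fromOther) {
        Exchanger<String> ex = new Exchanger<>();
        String[] received = new String[2];
        Thread other = new Thread(() -> {
            try {
                received[1] = ex.exchange(fromOther);
            } catch (InterruptedException exc) {
                Thread.currentThread().interrupt();
            }
        });
        other.start();
        try {
            received[0] = ex.exchange(fromCaller);  // waits for the other thread
            other.join();                           // ensures received[1] is set
        } catch (InterruptedException exc) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        String[] r = swap("ping", "pong");
        System.out.println("Caller got: " + r[0]);
        System.out.println("Other got: " + r[1]);
    }
}
```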




Copyright © 2018-2020 BrainKart.com; All Rights Reserved. Developed by Therithal info, Chennai.