Using Synchronization Objects
Synchronization objects are
supported by the Semaphore, CountDownLatch, CyclicBarrier, Exchanger,
and Phaser classes. Collectively,
they enable you to handle several formerly
difficult synchronization situations with ease. They are also applicable to
a wide range of programs—even those that contain only limited concurrency.
Because the synchronization objects will be of interest to nearly all Java
programs, each is examined here in some detail.
Semaphore
The synchronization object
that many readers will immediately recognize is Semaphore, which implements a classic semaphore. A semaphore
controls access to a shared resource through the use of a counter. If the
counter is greater than zero, then access is allowed. If it is zero, then access
is denied. What the counter is counting are permits
that allow access to the shared resource. Thus, to access the resource, a
thread must be granted a permit from the semaphore.
In general, to use a
semaphore, the thread that wants access to the shared resource tries to acquire
a permit. If the semaphore’s count is greater than zero, then the thread
acquires a permit, which causes the semaphore’s count to be decremented.
Otherwise, the thread will be blocked until a permit can be acquired. When the
thread no longer needs access
to the shared resource, it
releases the permit, which causes the semaphore’s count to be incremented. If
there is another thread waiting for a permit, then that thread will acquire a
permit at that time. Java’s Semaphore
class implements this mechanism.
Semaphore has the two constructors shown here:
Semaphore(int num)
Semaphore(int num, boolean how)
Here, num specifies the initial permit count. Thus, num specifies the number of threads that can access a shared
resource at any one time. If num is
one, then only one thread can access the resource at any one time. By default,
waiting threads are granted a permit in an undefined order. By setting how to true, you can ensure that waiting threads are granted a permit in
the order in which they requested access.
To acquire a permit, call the
acquire( ) method, which has these
two forms:
void acquire( ) throws InterruptedException
void acquire(int num) throws InterruptedException
The first form acquires one
permit. The second form acquires num
permits. Most often, the first form is used. If the permit cannot be granted at
the time of the call, then the invoking thread suspends until the permit is
available.
To release a permit, call release( ), which has these two forms:
void release( )
void release(int num)
The first form releases one
permit. The second form releases the number of permits specified by num.
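To make the effect of the two forms of acquire( ) and release( ) on the permit count concrete, here is a minimal single-threaded sketch. The class name PermitCountSketch and the method permitTrace( ) are illustrative names that do not appear in the original text; availablePermits( ) is a standard Semaphore method that reports the current count.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class; not part of the original example.
class PermitCountSketch {
  // Returns the permit count observed after each step.
  static int[] permitTrace() {
    Semaphore sem = new Semaphore(3);
    int[] trace = new int[4];

    try {
      sem.acquire();                       // one permit: 3 -> 2
      trace[0] = sem.availablePermits();
      sem.acquire(2);                      // two permits: 2 -> 0
      trace[1] = sem.availablePermits();
    } catch (InterruptedException exc) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(exc);
    }

    sem.release();                         // one permit: 0 -> 1
    trace[2] = sem.availablePermits();
    sem.release(2);                        // two permits: 1 -> 3
    trace[3] = sem.availablePermits();
    return trace;
  }

  public static void main(String[] args) {
    for (int count : permitTrace()) System.out.println(count);
  }
}
```

Because only one thread is involved and enough permits are always available, neither call to acquire( ) blocks in this sketch.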
To use a semaphore to control
access to a resource, each thread that wants to use that resource must first
call acquire( ) before accessing the
resource. When the thread is done with the resource, it must call release( ). Here is an example that
illustrates the use of a semaphore:
// A simple semaphore example.
import java.util.concurrent.*;

class SemDemo {
  public static void main(String args[]) {
    Semaphore sem = new Semaphore(1);

    new IncThread(sem, "A");
    new DecThread(sem, "B");
  }
}

// A shared resource.
class Shared {
  static int count = 0;
}
// A thread of execution that increments count.
class IncThread implements Runnable {
  String name;
  Semaphore sem;

  IncThread(Semaphore s, String n) {
    sem = s;
    name = n;
    new Thread(this).start();
  }

  public void run() {
    System.out.println("Starting " + name);

    try {
      // First, get a permit.
      System.out.println(name + " is waiting for a permit.");
      sem.acquire();
      System.out.println(name + " gets a permit.");

      // Now, access shared resource.
      for(int i=0; i < 5; i++) {
        Shared.count++;
        System.out.println(name + ": " + Shared.count);

        // Now, allow a context switch -- if possible.
        Thread.sleep(10);
      }
    } catch (InterruptedException exc) {
      System.out.println(exc);
    }

    // Release the permit.
    System.out.println(name + " releases the permit.");
    sem.release();
  }
}
// A thread of execution that decrements count.
class DecThread implements Runnable {
  String name;
  Semaphore sem;

  DecThread(Semaphore s, String n) {
    sem = s;
    name = n;
    new Thread(this).start();
  }

  public void run() {
    System.out.println("Starting " + name);

    try {
      // First, get a permit.
      System.out.println(name + " is waiting for a permit.");
      sem.acquire();
      System.out.println(name + " gets a permit.");

      // Now, access shared resource.
      for(int i=0; i < 5; i++) {
        Shared.count--;
        System.out.println(name + ": " + Shared.count);

        // Now, allow a context switch -- if possible.
        Thread.sleep(10);
      }
    } catch (InterruptedException exc) {
      System.out.println(exc);
    }

    // Release the permit.
    System.out.println(name + " releases the permit.");
    sem.release();
  }
}
The output from the program
is shown here. (The precise order in which the threads execute may vary.)
Starting A
A is waiting for a permit.
A gets a permit.
A: 1
Starting B
B is waiting for a permit.
A: 2
A: 3
A: 4
A: 5
A releases the permit.
B gets a permit.
B: 4
B: 3
B: 2
B: 1
B: 0
B releases the permit.
The program uses a semaphore
to control access to the count
variable, which is a static variable within the Shared class. Shared.count
is incremented five times by the run( )
method of IncThread and decremented
five times by DecThread. To prevent
these two threads from accessing Shared.count
at the same time, access is allowed only after a permit is acquired from the
controlling semaphore. After access is complete, the permit is released. In
this way, only one thread at a time will access Shared.count, as the output shows.
In both IncThread and DecThread,
notice the call to sleep( ) within run( ). It is used to “prove” that
accesses to Shared.count are
synchronized by the semaphore. In run( ),
the call to sleep( ) causes the
invoking thread to pause between each access to Shared.count. This would normally enable the second thread to run.
However, because of the semaphore, the second thread must wait until the first
has released the permit, which happens only after all accesses by the first
thread are complete. Thus, Shared.count
is first incremented five times by IncThread
and then decremented five times by DecThread.
The increments and decrements are not
intermixed.
Without the use of the
semaphore, accesses to Shared.count
by both threads would have occurred simultaneously, and the increments and
decrements would be intermixed. To confirm this, try commenting out the calls
to acquire( ) and release( ). When you run the program,
you will see that access to Shared.count
is no longer synchronized, and each thread accesses it as soon as it gets a
timeslice.
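The mutual exclusion described above can also be checked numerically rather than by reading interleaved output. The following is a minimal sketch, not the original program: the class GuardedCounterSketch and the method runWorkers( ) are hypothetical names. It differs from the example in one respect, which is a design assumption of this sketch: the permit is released in a finally block, and only after acquire( ) has actually succeeded.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class; not part of the original example.
class GuardedCounterSketch {
  static int count = 0;                          // the shared resource
  static final Semaphore sem = new Semaphore(1); // one permit: one thread at a time

  // Starts the given number of threads, each incrementing count perThread times.
  static int runWorkers(int threads, int perThread) {
    count = 0;
    Thread[] workers = new Thread[threads];

    for (int t = 0; t < threads; t++) {
      workers[t] = new Thread(() -> {
        for (int i = 0; i < perThread; i++) {
          try {
            sem.acquire();
          } catch (InterruptedException exc) {
            Thread.currentThread().interrupt();
            return;                // no permit is held, so nothing to release
          }
          try {
            count++;               // guarded access
          } finally {
            sem.release();         // always give the permit back
          }
        }
      });
      workers[t].start();
    }

    for (Thread w : workers) {
      try {
        w.join();
      } catch (InterruptedException exc) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(exc);
      }
    }
    return count;
  }

  public static void main(String[] args) {
    System.out.println(runWorkers(4, 1000));
  }
}
```

With the semaphore in place, every increment is applied, so the result equals threads times perThread. Removing the acquire( ) and release( ) calls makes lost updates possible.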
Although many uses of a
semaphore are as straightforward as that shown in the preceding program, more
intriguing uses are also possible. Here is an example. The following program
reworks the producer/consumer program shown in Chapter 11 so that it uses two
semaphores to regulate the producer and consumer threads, ensuring that each
call to put( ) is followed by a
corresponding call to get( ):
// An implementation of a producer and consumer
// that use semaphores to control synchronization.
import java.util.concurrent.Semaphore;

class Q {
  int n;

  // Start with consumer semaphore unavailable.
  static Semaphore semCon = new Semaphore(0);
  static Semaphore semProd = new Semaphore(1);

  void get() {
    try {
      semCon.acquire();
    } catch(InterruptedException e) {
      System.out.println("InterruptedException caught");
    }

    System.out.println("Got: " + n);
    semProd.release();
  }

  void put(int n) {
    try {
      semProd.acquire();
    } catch(InterruptedException e) {
      System.out.println("InterruptedException caught");
    }

    this.n = n;
    System.out.println("Put: " + n);
    semCon.release();
  }
}
class Producer implements Runnable {
  Q q;

  Producer(Q q) {
    this.q = q;
    new Thread(this, "Producer").start();
  }

  public void run() {
    for(int i=0; i < 20; i++) q.put(i);
  }
}

class Consumer implements Runnable {
  Q q;

  Consumer(Q q) {
    this.q = q;
    new Thread(this, "Consumer").start();
  }

  public void run() {
    for(int i=0; i < 20; i++) q.get();
  }
}

class ProdCon {
  public static void main(String args[]) {
    Q q = new Q();
    new Consumer(q);
    new Producer(q);
  }
}
A portion of the output is shown here:
Put: 0
Got: 0
Put: 1
Got: 1
Put: 2
Got: 2
Put: 3
Got: 3
Put: 4
Got: 4
Put: 5
Got: 5
.
.
As you can see, the calls to put( ) and get( ) are synchronized. That is, each call to put( ) is followed by a call to get( ) and no values are missed. Without the semaphores, multiple
calls to put( ) would have occurred
without matching calls to get( ),
resulting in values being missed. (To prove this, remove the semaphore code and
observe the results.)
The sequencing of put( ) and get( ) calls is handled by two semaphores: semProd and semCon.
Before put( ) can produce a value,
it must acquire a permit from semProd.
After it has set the value, it
releases semCon. Before get( ) can consume a value, it must
acquire a permit from semCon. After
it consumes the value, it releases semProd.
This “give and take” mechanism ensures that each call to put( ) must be followed by a call to get( ).
Notice that semCon is initialized with no available
permits. This ensures that put( )
executes first. The ability to set the initial synchronization state is one of
the more powerful aspects of a semaphore.
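The effect of starting a semaphore with zero permits can be isolated in a smaller sketch. Here a semaphore created with no permits acts as a one-time signal: the waiting thread cannot proceed until the other thread calls release( ). The class StartSignalSketch and the method runOnce( ) are hypothetical names used only for this illustration.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class; not part of the original example.
class StartSignalSketch {
  // Returns the order in which the two threads appended to the log.
  static String runOnce() {
    Semaphore ready = new Semaphore(0);      // no permits: acquire() must wait
    StringBuffer log = new StringBuffer();   // thread-safe text buffer

    Thread waiter = new Thread(() -> {
      try {
        ready.acquire();                     // blocks until release() is called
        log.append("second");
      } catch (InterruptedException exc) {
        Thread.currentThread().interrupt();
      }
    });
    waiter.start();

    log.append("first ");
    ready.release();                         // hand over the only permit

    try {
      waiter.join();
    } catch (InterruptedException exc) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(exc);
    }
    return log.toString();
  }

  public static void main(String[] args) {
    System.out.println(runOnce());
  }
}
```

Regardless of how the threads are scheduled, the waiter can append only after the permit is released, so the ordering is fixed by the initial count of zero.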
CountDownLatch
Sometimes you will want a
thread to wait until one or more events have occurred. To handle such a
situation, the concurrent API supplies CountDownLatch.
A CountDownLatch is initially
created with a count of the number of events that must occur before the latch
is released. Each time an event happens, the count is decremented. When the
count reaches zero, the latch opens.
CountDownLatch has the following constructor:
CountDownLatch(int num)
Here, num specifies the number of events that must occur in order for the
latch to open. To wait on the latch, a thread calls await( ), which has the forms shown here:
void await( ) throws InterruptedException
boolean await(long wait, TimeUnit tu) throws InterruptedException
The first form waits until
the count associated with the invoking CountDownLatch
reaches zero. The second form waits only for the period of time specified by wait. The units represented by wait are specified by tu, which is an object of the TimeUnit enumeration.
(TimeUnit is described later in this chapter.) It returns false if the time limit is reached and true if the countdown reaches zero.
To signal an event, call the countDown( ) method, shown next:
void countDown( )
Each call to countDown( ) decrements the count
associated with the invoking object.
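The return value of the timed form of await( ) can be seen in a short single-threaded sketch. The class LatchTimeoutSketch, the method tryWaits( ), and the 50-millisecond timeout are assumptions made for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the original example.
class LatchTimeoutSketch {
  // Returns the results of two timed waits on a latch with a count of two.
  static boolean[] tryWaits() {
    CountDownLatch latch = new CountDownLatch(2);
    boolean[] results = new boolean[2];

    try {
      latch.countDown();                                   // count: 2 -> 1
      // The count is still one, so this wait times out.
      results[0] = latch.await(50, TimeUnit.MILLISECONDS);

      latch.countDown();                                   // count: 1 -> 0
      // The latch is now open, so this wait returns at once.
      results[1] = latch.await(50, TimeUnit.MILLISECONDS);
    } catch (InterruptedException exc) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(exc);
    }
    return results;
  }

  public static void main(String[] args) {
    boolean[] r = tryWaits();
    System.out.println(r[0] + " " + r[1]);
  }
}
```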
The following program
demonstrates CountDownLatch. It
creates a latch that requires five events to occur before it opens.
// An example of CountDownLatch.
import java.util.concurrent.CountDownLatch;

class CDLDemo {
  public static void main(String args[]) {
    CountDownLatch cdl = new CountDownLatch(5);

    System.out.println("Starting");

    new MyThread(cdl);

    try {
      cdl.await();
    } catch (InterruptedException exc) {
      System.out.println(exc);
    }

    System.out.println("Done");
  }
}

class MyThread implements Runnable {
  CountDownLatch latch;

  MyThread(CountDownLatch c) {
    latch = c;
    new Thread(this).start();
  }

  public void run() {
    for(int i = 0; i<5; i++) {
      System.out.println(i);
      latch.countDown(); // decrement count
    }
  }
}
The output produced by the
program is shown here:
Starting
0
1
2
3
4
Done
Inside main( ), a CountDownLatch
called cdl is created with an
initial count of five. Next, an instance of MyThread is created, which begins execution of a new thread. Notice
that cdl is passed as a parameter to
MyThread’s constructor and stored in
the latch instance variable. Then,
the main thread calls await( ) on cdl, which causes execution of the main
thread to pause until cdl’s count
has been decremented five times.
Inside the run( ) method of MyThread, a loop is created that iterates five times. With each
iteration, the countDown( ) method
is called on latch, which refers to cdl in main( ). After the fifth iteration, the latch opens, which allows
the main thread to resume.
CountDownLatch is a powerful yet easy-to-use synchronization object that is
appropriate whenever a thread must
wait for one or more events to occur.
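In the preceding program a single thread signals all five events. The events can just as well come from several threads. The following minimal sketch, with the hypothetical names WorkerLatchSketch and sumFromWorkers( ), has each worker thread contribute a value and count down once, while the calling thread waits for all of them.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the original example.
class WorkerLatchSketch {
  // Starts the given number of workers; worker w adds w to a shared sum.
  static int sumFromWorkers(int workers) {
    CountDownLatch done = new CountDownLatch(workers);
    AtomicInteger sum = new AtomicInteger();

    for (int w = 1; w <= workers; w++) {
      final int contribution = w;
      new Thread(() -> {
        sum.addAndGet(contribution);
        done.countDown();          // signal that this worker has finished
      }).start();
    }

    try {
      done.await();                // wait until every worker has counted down
    } catch (InterruptedException exc) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(exc);
    }
    return sum.get();
  }

  public static void main(String[] args) {
    System.out.println(sumFromWorkers(3));
  }
}
```

Because each worker updates the sum before counting down, the total is complete by the time await( ) returns.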
CyclicBarrier
A situation not uncommon in
concurrent programming occurs when a set of two or more threads must wait at a
predetermined execution point until all threads in the set have reached that
point. To handle such a situation, the concurrent API supplies the CyclicBarrier class. It enables you to
define a synchronization object that suspends until the specified number of
threads has reached the barrier point.
CyclicBarrier has the following two constructors:
CyclicBarrier(int numThreads)
CyclicBarrier(int numThreads, Runnable action)
Here, numThreads specifies the number of threads that must reach the
barrier before execution continues. In the second form, action specifies a Runnable that will be run when the barrier is
reached.
Here is the general procedure
that you will follow to use CyclicBarrier.
First, create a CyclicBarrier object,
specifying the number of threads that you will be waiting for. Next, when each thread reaches the barrier,
have it call await( ) on that
object. This will pause execution of the thread until all of the other threads
also call await( ). Once the
specified number of threads has reached the barrier, await( ) will return and execution will resume. Also, if you have
specified an action, it is run at that point by the last thread to reach the barrier, before the waiting threads are released.
The await( ) method has the following two forms:
int await( ) throws InterruptedException, BrokenBarrierException
int await(long wait, TimeUnit tu) throws InterruptedException, BrokenBarrierException, TimeoutException
The first form waits until
all the threads have reached the barrier point. The second form waits only for
the period of time specified by wait.
The units represented by wait are
specified by tu. Both forms return a
value that indicates the order that the threads arrive at the barrier point.
The first thread returns a value equal to the number of threads waited upon
minus one. The last thread returns zero.
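The arrival values returned by await( ) can be collected and inspected. In this minimal sketch, the class ArrivalIndexSketch and the method arrivalIndexes( ) are hypothetical names. Which thread receives which value depends on scheduling, so the sketch sorts the values before returning them.

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class; not part of the original example.
class ArrivalIndexSketch {
  // Returns the values that await() handed to each thread, in sorted order.
  static int[] arrivalIndexes(int parties) {
    CyclicBarrier barrier = new CyclicBarrier(parties);
    int[] indexes = new int[parties];
    Thread[] threads = new Thread[parties];

    for (int t = 0; t < parties; t++) {
      final int slot = t;
      threads[t] = new Thread(() -> {
        try {
          indexes[slot] = barrier.await();   // value reflects arrival order
        } catch (InterruptedException | BrokenBarrierException exc) {
          indexes[slot] = -1;                // marks a failed wait in this sketch
        }
      });
      threads[t].start();
    }

    for (Thread th : threads) {
      try {
        th.join();
      } catch (InterruptedException exc) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(exc);
      }
    }
    Arrays.sort(indexes);
    return indexes;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(arrivalIndexes(3)));
  }
}
```

For three threads, the first to arrive receives 2 and the last receives 0, so the sorted result contains each of 0, 1, and 2 exactly once.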
Here is an example that
illustrates CyclicBarrier. It waits
until a set of three threads has reached the barrier. When that occurs, the
action defined by BarAction
executes.
// An example of CyclicBarrier.
import java.util.concurrent.*;

class BarDemo {
  public static void main(String args[]) {
    CyclicBarrier cb = new CyclicBarrier(3, new BarAction() );

    System.out.println("Starting");

    new MyThread(cb, "A");
    new MyThread(cb, "B");
    new MyThread(cb, "C");
  }
}

// A thread of execution that uses a CyclicBarrier.
class MyThread implements Runnable {
  CyclicBarrier cbar;
  String name;

  MyThread(CyclicBarrier c, String n) {
    cbar = c;
    name = n;
    new Thread(this).start();
  }

  public void run() {
    System.out.println(name);

    try {
      cbar.await();
    } catch (BrokenBarrierException exc) {
      System.out.println(exc);
    } catch (InterruptedException exc) {
      System.out.println(exc);
    }
  }
}

// An object of this class is called when the
// CyclicBarrier ends.
class BarAction implements Runnable {
  public void run() {
    System.out.println("Barrier Reached!");
  }
}
The output is shown here.
(The precise order in which the threads execute may vary.)
Starting
A
B
C
Barrier Reached!
A CyclicBarrier can be reused because it will release waiting threads
each time the specified number of threads calls await( ). For example, if you change main( ) in the preceding program so that it looks like this:
public static void main(String args[]) {
  CyclicBarrier cb = new CyclicBarrier(3, new BarAction() );

  System.out.println("Starting");

  new MyThread(cb, "A");
  new MyThread(cb, "B");
  new MyThread(cb, "C");
  new MyThread(cb, "X");
  new MyThread(cb, "Y");
  new MyThread(cb, "Z");
}
the following output will be
produced. (The precise order in which the threads execute may vary.)
Starting
A
B
C
Barrier Reached!
X
Y
Z
Barrier Reached!
As the preceding example
shows, the CyclicBarrier offers a
streamlined solution to what was previously a complicated problem.
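Reuse of a barrier is also common when the same set of threads meets repeatedly, once per phase of a computation. The following minimal sketch counts how many times the barrier action runs when each thread calls await( ) once per round. The names BarrierReuseSketch and countTrips( ) are hypothetical.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the original example.
class BarrierReuseSketch {
  // Each of the parties threads waits at the barrier once per round.
  // Returns the number of times the barrier action ran.
  static int countTrips(int parties, int rounds) {
    AtomicInteger trips = new AtomicInteger();
    CyclicBarrier barrier =
        new CyclicBarrier(parties, () -> trips.incrementAndGet());
    Thread[] threads = new Thread[parties];

    for (int t = 0; t < parties; t++) {
      threads[t] = new Thread(() -> {
        try {
          for (int r = 0; r < rounds; r++) {
            barrier.await();       // the same barrier is reused every round
          }
        } catch (InterruptedException | BrokenBarrierException exc) {
          System.out.println(exc);
        }
      });
      threads[t].start();
    }

    for (Thread th : threads) {
      try {
        th.join();
      } catch (InterruptedException exc) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(exc);
      }
    }
    return trips.get();
  }

  public static void main(String[] args) {
    System.out.println(countTrips(3, 2));
  }
}
```

No thread can begin its second wait until the first round has released it, so the action runs exactly once per round.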
Exchanger
Perhaps the most interesting
of the synchronization classes is Exchanger.
It is designed to simplify the exchange of data between two threads. The
operation of an Exchanger is
astoundingly simple: it simply waits until two separate threads call its exchange( ) method. When that occurs,
it exchanges the data supplied by the threads. This mechanism is both elegant
and easy to use. Uses for Exchanger
are easy to imagine. For example, one thread might prepare a buffer for
receiving information over a network connection. Another thread might fill that
buffer with the information from the connection. The two threads work together
so that each time a new buffer is needed, an exchange is made.
Exchanger is a generic class that is declared as shown here:
Exchanger<V>
Here, V specifies the type of the data being exchanged.
The only method defined by Exchanger is exchange( ), which has the two forms shown here:
V exchange(V objRef) throws InterruptedException
V exchange(V objRef, long wait, TimeUnit tu) throws InterruptedException, TimeoutException
Here, objRef is a reference to the data to exchange. The data received
from the other thread is returned. The second form of exchange( ) allows a time-out period to be specified. The key point
about exchange( ) is that it won’t
succeed until it has been called on the same Exchanger object by two separate threads. Thus, exchange( ) synchronizes the exchange
of the data.
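The requirement that two threads take part can be observed with the timed form of exchange( ). In this minimal sketch only one thread ever calls exchange( ), so no partner arrives and the call ends with a TimeoutException. The class ExchangeTimeoutSketch, the method timesOutAlone( ), and the 50-millisecond timeout are assumptions made for this illustration.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; not part of the original example.
class ExchangeTimeoutSketch {
  // Returns true if a lone call to exchange() timed out.
  static boolean timesOutAlone() {
    Exchanger<String> ex = new Exchanger<String>();

    try {
      // No second thread calls exchange(), so there is nothing to swap with.
      ex.exchange("offer", 50, TimeUnit.MILLISECONDS);
      return false;
    } catch (TimeoutException exc) {
      return true;                 // the time-out period elapsed
    } catch (InterruptedException exc) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(timesOutAlone());
  }
}
```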
Here is an example that
demonstrates Exchanger. It creates
two threads. One thread creates an empty buffer that will receive the data put
into it by the second thread. In this case, the data is a string. Thus, the
first thread exchanges an empty string for a full one.
// An example of Exchanger.
import java.util.concurrent.Exchanger;

class ExgrDemo {
  public static void main(String args[]) {
    Exchanger<String> exgr = new Exchanger<String>();

    new UseString(exgr);
    new MakeString(exgr);
  }
}

// A Thread that constructs a string.
class MakeString implements Runnable {
  Exchanger<String> ex;
  String str;

  MakeString(Exchanger<String> c) {
    ex = c;
    str = new String();
    new Thread(this).start();
  }

  public void run() {
    char ch = 'A';

    for(int i = 0; i < 3; i++) {
      // Fill Buffer
      for(int j = 0; j < 5; j++)
        str += ch++;

      try {
        // Exchange a full buffer for an empty one.
        str = ex.exchange(str);
      } catch(InterruptedException exc) {
        System.out.println(exc);
      }
    }
  }
}
// A Thread that uses a string.
class UseString implements Runnable {
  Exchanger<String> ex;
  String str;

  UseString(Exchanger<String> c) {
    ex = c;
    new Thread(this).start();
  }

  public void run() {
    for(int i=0; i < 3; i++) {
      try {
        // Exchange an empty buffer for a full one.
        str = ex.exchange(new String());
        System.out.println("Got: " + str);
      } catch(InterruptedException exc) {
        System.out.println(exc);
      }
    }
  }
}
Here is the output produced
by the program:
Got: ABCDE
Got: FGHIJ
Got: KLMNO
In the program, the main( ) method creates an Exchanger for strings. This object is
then used to synchronize the exchange of strings between the MakeString and UseString classes. The MakeString
class fills a string with data. The UseString
class exchanges an empty string for a full one. It then displays the contents of the
newly constructed string. The exchange of empty and full buffers is
synchronized by the exchange( )
method, which is called by both classes’ run( ) methods.
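The exchange works in both directions: each thread hands over its own object and receives the other thread's object. The following minimal sketch makes that symmetry visible by having both threads offer a non-empty string. The names SwapSketch, swap( ), and receive( ) are hypothetical.

```java
import java.util.concurrent.Exchanger;

// Hypothetical demo class; not part of the original example.
class SwapSketch {
  // Returns what thread A received (index 0) and what thread B received (index 1).
  static String[] swap() {
    Exchanger<String> ex = new Exchanger<String>();
    String[] received = new String[2];

    Thread a = new Thread(() -> receive(ex, "from A", received, 0));
    Thread b = new Thread(() -> receive(ex, "from B", received, 1));
    a.start();
    b.start();

    try {
      a.join();
      b.join();
    } catch (InterruptedException exc) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(exc);
    }
    return received;
  }

  // Offers a string and stores whatever the other thread supplied.
  static void receive(Exchanger<String> ex, String offer,
                      String[] out, int slot) {
    try {
      out[slot] = ex.exchange(offer);
    } catch (InterruptedException exc) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    String[] r = swap();
    System.out.println("A got: " + r[0]);
    System.out.println("B got: " + r[1]);
  }
}
```

Whichever thread calls exchange( ) first simply waits for the other, so the result does not depend on the order in which the threads start.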