
Chapter: Multicore Application Programming For Windows, Linux, and Oracle Solaris : Using POSIX Threads

Sharing Data Between Threads


 

A key advantage of multithreaded codes is that all threads see the same memory, so data is already shared between threads. However, it is often important to coordinate access to this data, since uncoordinated accesses can cause data races that lead to incorrect results. POSIX provides a large number of synchronization and data-sharing methods.

 

Protecting Access Using Mutex Locks

 

A mutex lock is a mechanism supported by the POSIX standard that can be acquired by only one thread at a time. Other threads that attempt to acquire the same mutex must wait until it is released by the thread that currently has it.

 

Before they can be used, mutex locks need to be initialized to the appropriate state by a call to pthread_mutex_init() or, for statically defined mutexes, by assignment with the value PTHREAD_MUTEX_INITIALIZER. The call to pthread_mutex_init() takes an optional parameter that points to attributes describing the type of mutex required. Initialization through static assignment uses default parameters, as does passing in a null pointer in the call to pthread_mutex_init().

 

Once a mutex is no longer needed, the resources it consumes can be freed with a call to pthread_mutex_destroy(). Listing 5.19 shows examples of initializing and destroying mutexes.

Listing 5.19   Creating and Destroying Mutexes

#include <pthread.h>

...

pthread_mutex_t m1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t m2;

pthread_mutex_init( &m2, 0 );

...

pthread_mutex_destroy( &m1 );
pthread_mutex_destroy( &m2 );

 

A thread can lock a mutex by calling pthread_mutex_lock(). Once it has finished with the mutex, the thread calls pthread_mutex_unlock(). If a thread calls pthread_mutex_lock() while another thread holds the mutex, the calling thread will wait, or block, until the other thread releases the mutex, allowing the calling thread to attempt to acquire the released mutex.

 

In many situations, it is not desirable for the calling thread to wait for the mutex to become available. The call pthread_mutex_trylock() attempts to acquire the mutex without blocking. If it succeeds, the function returns zero, and the calling thread is now the owner of the mutex. If the mutex is already locked, the function immediately returns a nonzero error code, such as EBUSY, that indicates why the lock was not acquired.
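The non-blocking pattern can be sketched as follows. This is a minimal illustration, not code from the original listings: the names stats_mutex, stats_updates, and try_record_update() are hypothetical, and the caller is assumed to have other useful work to do when the mutex is busy.

```c
#include <errno.h>
#include <pthread.h>

/* Hypothetical shared state for this sketch. */
pthread_mutex_t stats_mutex = PTHREAD_MUTEX_INITIALIZER;
int stats_updates = 0;   /* protected by stats_mutex */

/* Try to record an update without blocking.
   Returns 1 if the update was applied, 0 if the mutex was busy,
   and -1 on any other error. */
int try_record_update( void )
{
    int rc = pthread_mutex_trylock( &stats_mutex );
    if ( rc == 0 )
    {
        stats_updates++;
        pthread_mutex_unlock( &stats_mutex );
        return 1;
    }
    if ( rc == EBUSY )
    {
        return 0;   /* the caller can do other work and retry later */
    }
    return -1;
}
```

A caller might loop on try_record_update(), performing some unrelated task each time it returns 0, rather than blocking in pthread_mutex_lock().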

 

The code in Listing 5.20 shows a mutex lock protecting the variable counter against simultaneous access by multiple threads. The variable counter is declared as volatile to ensure that it is read from memory on each access and written back to memory after each access. Without the mutex lock, there would be a data race between the two threads, and counter could end up with an incorrect value.

 

Listing 5.20   Mutex Lock Avoiding Data Race

#include <pthread.h>
#include <stdio.h>

pthread_mutex_t mutex;
volatile int counter = 0;

void * count( void * param )
{
    for ( int i=0; i<100; i++ )
    {
        pthread_mutex_lock( &mutex );
        counter++;
        printf( "Count = %i\n", counter );
        pthread_mutex_unlock( &mutex );
    }
    return 0;
}

int main()
{
    pthread_t thread1, thread2;
    pthread_mutex_init( &mutex, 0 );
    pthread_create( &thread1, 0, count, 0 );
    pthread_create( &thread2, 0, count, 0 );
    pthread_join( thread1, 0 );
    pthread_join( thread2, 0 );
    pthread_mutex_destroy( &mutex );
    return 0;
}

Mutex Attributes

 

Mutexes can be shared between multiple processes. By default, mutexes are private to a process. To create a mutex that can be shared between processes, it is necessary to set up the attributes for pthread_mutex_init(), as shown in Listing 5.21.

 

Listing 5.21   Creating a Mutex That Can Be Shared Between Processes

#include <pthread.h>

int main()
{
    pthread_mutexattr_t attributes;
    pthread_mutex_t mutex;
    pthread_mutexattr_init( &attributes );
    pthread_mutexattr_setpshared( &attributes, PTHREAD_PROCESS_SHARED );
    pthread_mutex_init( &mutex, &attributes );
    pthread_mutexattr_destroy( &attributes );
    ...
}

The attributes structure pthread_mutexattr_t is initialized with default values by a call to pthread_mutexattr_init(). A call to pthread_mutexattr_setpshared() with a pointer to the attribute structure and the value PTHREAD_PROCESS_SHARED sets the attributes to cause a shared mutex to be created. By default, mutexes are not shared between processes; calling pthread_mutexattr_setpshared() with the value PTHREAD_PROCESS_PRIVATE restores the attribute to the default.

These attributes are passed into the call to pthread_mutex_init() to set the attributes of the initialized mutex. Once the attributes have been used, they can be disposed of by a call to pthread_mutexattr_destroy().
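Setting the shared attribute is only part of the picture: the mutex itself also has to live in memory that every participating process can see. The following sketch illustrates one way this might be arranged. It assumes a Linux-like system where mmap() with MAP_ANONYMOUS and fork() are available; the structure shared_area_t and the function run_shared_counter() are hypothetical names introduced for the example, and error handling is kept to a minimum.

```c
#define _DEFAULT_SOURCE          /* assumption: needed for MAP_ANONYMOUS on glibc */
#include <pthread.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical layout: the mutex and the data it protects share one mapping. */
typedef struct { pthread_mutex_t mutex; int counter; } shared_area_t;

/* Parent and child each add 1000 to a counter in shared memory.
   Returns the final counter value, or -1 if setup fails. */
int run_shared_counter( void )
{
    shared_area_t * area = mmap( 0, sizeof( shared_area_t ),
                                 PROT_READ | PROT_WRITE,
                                 MAP_SHARED | MAP_ANONYMOUS, -1, 0 );
    if ( area == MAP_FAILED ) { return -1; }

    pthread_mutexattr_t attr;
    pthread_mutexattr_init( &attr );
    pthread_mutexattr_setpshared( &attr, PTHREAD_PROCESS_SHARED );
    pthread_mutex_init( &area->mutex, &attr );
    pthread_mutexattr_destroy( &attr );
    area->counter = 0;

    pid_t child = fork();
    if ( child < 0 )
    {
        munmap( area, sizeof( shared_area_t ) );
        return -1;
    }

    /* Both processes run this loop against the same mutex. */
    for ( int i = 0; i < 1000; i++ )
    {
        pthread_mutex_lock( &area->mutex );
        area->counter++;
        pthread_mutex_unlock( &area->mutex );
    }

    if ( child == 0 ) { _exit( 0 ); }     /* child is done */

    waitpid( child, 0, 0 );               /* parent waits, then cleans up */
    int result = area->counter;
    pthread_mutex_destroy( &area->mutex );
    munmap( area, sizeof( shared_area_t ) );
    return result;
}
```

Only one process initializes and destroys the mutex; the child simply uses the copy it finds in the shared mapping.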

 

A mutex can have other attributes set using the same mechanism:

 

•   The type of mutex. This can be a normal mutex, a mutex that detects errors such as multiple attempts to lock the mutex, or a recursive mutex that can be locked multiple times and then needs to be unlocked the same number of times.

•   The protocol to follow when another thread is waiting for the mutex. This can be the default of no change to thread priority, that the thread holding the mutex inherits the priority of any higher-priority thread waiting for the mutex, or that the thread gets the highest priority associated with the mutexes held by it.

•   The priority ceiling of the mutex. This is the priority that any lower-priority thread will be elevated to while it holds the mutex.

 

The attributes governing the priority of any thread holding the mutex are designed to avoid problems of priority inversion where a higher-priority thread is waiting for a lower-priority thread to release the mutex.
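As an illustration of the first of these attributes, the sketch below sets the mutex type through pthread_mutexattr_settype(). The helper names init_typed_mutex(), errorcheck_detects_relock(), and recursive_allows_relock() are hypothetical; the type constants PTHREAD_MUTEX_ERRORCHECK and PTHREAD_MUTEX_RECURSIVE are the standard POSIX ones. The feature-test macro at the top is an assumption about what a strict compiler mode may need.

```c
#define _POSIX_C_SOURCE 200809L   /* assumption: exposes the mutex type constants */
#include <errno.h>
#include <pthread.h>

/* Initialize *m with the requested type. Returns 0 or an error number. */
int init_typed_mutex( pthread_mutex_t * m, int type )
{
    pthread_mutexattr_t attr;
    int rc = pthread_mutexattr_init( &attr );
    if ( rc != 0 ) { return rc; }
    rc = pthread_mutexattr_settype( &attr, type );
    if ( rc == 0 ) { rc = pthread_mutex_init( m, &attr ); }
    pthread_mutexattr_destroy( &attr );
    return rc;
}

/* Returns 1 if relocking an error-checking mutex reports EDEADLK. */
int errorcheck_detects_relock( void )
{
    pthread_mutex_t m;
    if ( init_typed_mutex( &m, PTHREAD_MUTEX_ERRORCHECK ) != 0 ) { return -1; }
    pthread_mutex_lock( &m );
    int second = pthread_mutex_lock( &m );   /* an error, not a hang */
    pthread_mutex_unlock( &m );
    pthread_mutex_destroy( &m );
    return second == EDEADLK;
}

/* Returns 1 if a recursive mutex can be locked twice by the same thread. */
int recursive_allows_relock( void )
{
    pthread_mutex_t m;
    if ( init_typed_mutex( &m, PTHREAD_MUTEX_RECURSIVE ) != 0 ) { return -1; }
    int first  = pthread_mutex_lock( &m );
    int second = pthread_mutex_lock( &m );
    pthread_mutex_unlock( &m );              /* one unlock per lock */
    pthread_mutex_unlock( &m );
    pthread_mutex_destroy( &m );
    return first == 0 && second == 0;
}
```

The protocol and priority ceiling are set through the same attribute object, although whether they are supported depends on the platform.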

 

Using Spin Locks

 

The critical difference between spin locks and mutex locks is that a spin lock will keep spinning in a tight loop and consuming processor resources until it finally acquires the lock. Mutex locks will immediately put a thread to sleep when it cannot get the mutex, or an adaptive mutex lock will spin for a short time waiting for the lock to become free before going to sleep.

 

The interface for spin locks is very similar to that of mutex locks. The call pthread_spin_init() will initialize a spin lock. The spin lock can be created as shareable between processes or private to the process creating it. A spin lock that is private to a process is created by passing the value PTHREAD_PROCESS_PRIVATE as a parameter to the call to pthread_spin_init(), and passing the value PTHREAD_PROCESS_SHARED creates a spin lock that can be shared between processes.

 

Multiple threads in the process that created the lock will always be able to access it. However, if the spin lock is created to be private to a process, the behavior of the lock is not defined if it is used by other processes. The default is for the spin lock to be private to the creating process.

 

The call pthread_spin_lock() will spin until the lock is acquired, the call pthread_spin_unlock() will release the lock, and finally the call pthread_spin_destroy() will release any resources used by the lock. Listing 5.22 demonstrates the use of a spin lock. This example places access to a local variable under the control of the spin lock; however, in this example, the variable is not shared between threads, so it is not actually necessary to use any locking.

Listing 5.22  Code Using a Spin Lock to Protect Access to a Variable

#include <pthread.h>

pthread_spinlock_t lock;

void lockandunlock()
{
    int i = 10000;
    while ( i>0 )
    {
        pthread_spin_lock( &lock );
        i--;
        pthread_spin_unlock( &lock );
    }
}

int main()
{
    pthread_spin_init( &lock, PTHREAD_PROCESS_PRIVATE );
    lockandunlock();
    pthread_spin_destroy( &lock );
}

 

If the code is modified to create a spin lock that is shared between multiple processes, only one process should initialize and destroy the spin lock. The modification shown in Listing 5.23 will create a spin lock that can be shared between processes.

 

Listing 5.23   Creating a Spin Lock That Can Be Shared Between Processes

int main()

 

{

 

pthread_spin_init(&lock,PTHREAD_PROCESS_SHARED);

 

lockandunlock();

 

pthread_spin_destroy(&lock);

 

}

In addition, the call pthread_spin_trylock() will attempt to acquire the lock but will return immediately whether or not the lock is successfully acquired. It returns zero if the lock was acquired and a nonzero error code otherwise. Since a spinning lock will be using processor resources, it might be more useful to attempt to acquire the lock and, if that fails, to complete some other task before repeating the test. This utilizes the processor in useful work rather than just spinning. Listing 5.24 shows the earlier code modified to use pthread_spin_trylock() and keep a count of the number of times the thread fails to get the lock.

Listing 5.24   Counting the Number of Times That the Spin Lock Fails to Be Acquired

void lockandunlock()
{
    int i = 0;
    int count = 0;
    while ( i == 0 )
    {
        // pthread_spin_trylock() returns zero when the lock is acquired
        if ( pthread_spin_trylock( &lock ) == 0 )
        {
            i++;
            pthread_spin_unlock( &lock );
        }
        else
        {
            count++;
        }
    }
    printf( "Failed tries = %i\n", count );
}


Read-Write Locks

 

Read-write locks allow multiple threads to simultaneously read a resource, but only a single thread may update that resource at any time. They share a similar initialization and destruction syntax to mutex locks in that they take a set of attributes and can be initialized either through a call to pthread_rwlock_init() or statically by assignment of the value PTHREAD_RWLOCK_INITIALIZER. Listing 5.25 shows the two methods.

 

Listing 5.25   Initializing a Read-Write Lock

pthread_rwlock_t lock1;
pthread_rwlock_t lock2 = PTHREAD_RWLOCK_INITIALIZER;

...

pthread_rwlock_init( &lock1, 0 );

...

 

If the attributes passed to the initialization routine are zero, then the lock is initialized with the default attribute of being private to the creating process. To create a read-write lock that is shared between processes, it is necessary to create and use a set of attributes.

 

The call pthread_rwlockattr_init() initializes the attributes, while the call pthread_rwlockattr_setpshared() sets the shared attribute to the desired value. This set of attributes can then be passed into the pthread_rwlock_init() call to set the attributes for the read-write lock being created. Listing 5.26 demonstrates this.

 

Listing 5.26  Creating a Read-Write Lock That Can Be Shared Between Processes

pthread_rwlockattr_t attr;
pthread_rwlock_t lock;

...

pthread_rwlockattr_init( &attr );
pthread_rwlockattr_setpshared( &attr, PTHREAD_PROCESS_SHARED );
pthread_rwlock_init( &lock, &attr );
pthread_rwlockattr_destroy( &attr );

...

pthread_rwlock_destroy( &lock );

Once the lock has been created, it no longer references the attributes, so these can be either reused for a different lock or destroyed. The call to destroy the attributes is pthread_rwlockattr_destroy(). The resources consumed by the read-write lock are freed by the call to pthread_rwlock_destroy().

 

Read-write locks have a more complex interface than mutex locks because they can be locked for either reading or writing. Hence, there are two lock calls: pthread_rwlock_rdlock() acquires the lock for reading, and pthread_rwlock_wrlock() acquires it for writing. In the POSIX interface, a single call, pthread_rwlock_unlock(), releases the lock in either case. Listing 5.27 shows how the read-write lock might be used to protect access to a shared resource.

 

Listing 5.27   Using a Read-Write Lock to Protect Access to a Shared Resource

int readMatrix( int x, int y )
{
    int result;
    pthread_rwlock_rdlock( &lock );
    result = matrix[x][y];
    // pthread_rwlock_unlock() releases both read and write locks
    pthread_rwlock_unlock( &lock );
    return result;
}

void updateMatrix( int x, int y, int value )
{
    pthread_rwlock_wrlock( &lock );
    matrix[x][y] = value;
    pthread_rwlock_unlock( &lock );
}

The read-write lock is unnecessary in this short code snippet because load or store accesses to aligned integer variables are atomic. The lock would be critical if the updates and reads were of structures that required multiple writes.
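A case where the lock does matter can be sketched with a two-field structure. In this hypothetical example (the type point_t and the functions write_point() and read_point() are not from the original listings), a reader must never observe x from one update paired with y from another, so both fields are written and copied under the lock. The sketch uses pthread_rwlock_unlock(), the single POSIX call that releases either a read or a write lock.

```c
#define _POSIX_C_SOURCE 200809L   /* assumption: exposes the rwlock interface */
#include <pthread.h>

/* Hypothetical record that takes two stores to update. */
typedef struct { int x; int y; } point_t;

static point_t shared_point = { 0, 0 };
static pthread_rwlock_t point_lock = PTHREAD_RWLOCK_INITIALIZER;

/* Writer: both fields change while readers are excluded. */
void write_point( int x, int y )
{
    pthread_rwlock_wrlock( &point_lock );
    shared_point.x = x;
    shared_point.y = y;
    pthread_rwlock_unlock( &point_lock );
}

/* Reader: copies a consistent snapshot; many readers may do this at once. */
point_t read_point( void )
{
    pthread_rwlock_rdlock( &point_lock );
    point_t copy = shared_point;
    pthread_rwlock_unlock( &point_lock );
    return copy;
}
```

Returning a copy rather than a pointer means the caller never touches the shared structure outside the lock.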

The behavior of calls to acquire the lock is for the thread to block until the lock is acquired. However, there are also calls to try to acquire the lock. These calls return immediately either with or without having acquired the lock. To try to get a reader lock, the call is pthread_rwlock_tryrdlock(), and to try to acquire the lock as a writer, the call is pthread_rwlock_trywrlock(). Listing 5.28 shows an example of using these calls.

 

Listing 5.28   Updating a Shared Value Only If the Read-Write Lock Is Acquired

void typeUpdate( int value )
{
    if ( pthread_rwlock_trywrlock( &lock ) == 0 )
    {
        count += value;
        pthread_rwlock_unlock( &lock );
    }
}

 

There is a further option of using a timeout when acquiring the lock. The routines pthread_rwlock_timedrdlock() and pthread_rwlock_timedwrlock() return 0 if the lock is acquired or an error code if the lock has not been acquired by the absolute time specified. The routines pthread_rwlock_timedrdlock_np() and pthread_rwlock_timedwrlock_np() return an error code if the lock has not been acquired by the relative time specified; the _np suffix marks them as nonportable extensions that are not available on every platform.

A timespec structure is used to pass the timing information into the function. For relative timing, this structure needs to be initialized with the duration of the wait time; for absolute time, the structure can be initialized with the current time using a call to clock_gettime() and then advanced by the length of the wait. Listing 5.29 demonstrates how the timeout can be set to elapse in five seconds.

 

Listing 5.29   Acquiring a Read-Write Lock with a Timeout

#include <time.h>

void timeout_lock()
{
    struct timespec now;
    clock_gettime( CLOCK_REALTIME, &now );
    now.tv_sec += 5;
    // The timeout is passed as a pointer to the timespec structure
    if ( pthread_rwlock_timedrdlock( &lock, &now ) == 0 )
    {
        ...
        pthread_rwlock_unlock( &lock );
    }
}
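When the wait is not a whole number of seconds, the nanosecond field has to be kept below one second after the addition. The following sketch shows one way to build such a deadline. The helpers deadline_after_ms() and read_lock_within_ms() are hypothetical names, and the sketch assumes a non-negative wait and a platform that provides pthread_rwlock_timedrdlock().

```c
#define _POSIX_C_SOURCE 200809L   /* assumption: exposes clock_gettime() and rwlocks */
#include <pthread.h>
#include <time.h>

/* Build an absolute CLOCK_REALTIME deadline 'ms' milliseconds from now
   (ms >= 0), keeping tv_nsec in the range 0 to 999999999. */
struct timespec deadline_after_ms( long ms )
{
    struct timespec ts;
    clock_gettime( CLOCK_REALTIME, &ts );
    ts.tv_sec  += ms / 1000;
    ts.tv_nsec += ( ms % 1000 ) * 1000000L;
    if ( ts.tv_nsec >= 1000000000L )
    {
        ts.tv_sec  += 1;              /* carry the overflow into seconds */
        ts.tv_nsec -= 1000000000L;
    }
    return ts;
}

/* Try to take a read lock within 'ms' milliseconds.
   Returns 0 on success or an error number such as ETIMEDOUT. */
int read_lock_within_ms( pthread_rwlock_t * lock, long ms )
{
    struct timespec deadline = deadline_after_ms( ms );
    return pthread_rwlock_timedrdlock( lock, &deadline );
}
```

A caller that receives 0 owns a read lock and releases it with pthread_rwlock_unlock(); any other return value means the lock was not acquired.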

Alternatively, the code could directly call the relative timeout, as shown in Listing 5.30.

 

Listing 5.30   Setting a Relative Timeout

#include <time.h>

void timeout_lock()
{
    struct timespec now;
    now.tv_sec = 5;
    now.tv_nsec = 0;
    // The timeout is passed as a pointer to the timespec structure
    if ( pthread_rwlock_timedrdlock_np( &lock, &now ) == 0 )
    {
        ...
        pthread_rwlock_unlock( &lock );
    }
}

Barriers

 

There are situations where a program needs to wait until an entire group of threads has completed its work before further progress can be made. This is a barrier.

 

A barrier is created by a call to pthread_barrier_init(). The call to initialize the barrier takes the following:

 

•   A pointer to the barrier to be initialized.

•   An optional attributes structure, which determines whether the barrier is private to a process or shared across processes.

•   The number of threads that need to reach the barrier before any threads are released.

 

The resources consumed by a barrier can be released by a call to pthread_barrier_destroy().

 

Each thread calls pthread_barrier_wait() when it reaches the barrier. This call will return when the appropriate number of threads has reached the barrier. The code in Listing 5.31 demonstrates using a barrier to cause the threads in an application to wait until all the threads have been created.

 

Listing 5.31   Creating and Using a Barrier

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

pthread_barrier_t barrier;

void * work( void* param )
{
    // Convert through intptr_t so the cast is safe on 64-bit platforms
    int id = (int)(intptr_t)param;
    printf( "Thread arrived %i\n", id );
    pthread_barrier_wait( &barrier );
    printf( "Thread departed %i\n", id );
    return 0;
}

int main()
{
    pthread_t threads[10];
    pthread_barrier_init( &barrier, 0, 10 );
    for ( int i=0; i<10; i++ )
    {
        pthread_create( &threads[i], 0, work, (void*)(intptr_t)i );
    }
    for ( int i=0; i<10; i++ )
    {
        pthread_join( threads[i], 0 );
    }
    pthread_barrier_destroy( &barrier );
}

The output from the program would show all the threads arriving at the barrier and then all the threads departing from the barrier. Without the barrier, the arrivals and departures of all the threads would be mixed.
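A barrier is also a natural way to separate two phases of a computation. The sketch below is a hypothetical example, not part of the original listings: each worker fills its own slot of an array, and after the barrier exactly one thread adds the slots together. It relies on pthread_barrier_wait() returning PTHREAD_BARRIER_SERIAL_THREAD to one, unspecified, thread and zero to the others. The names phase_worker(), run_phases(), and NUM_WORKERS are assumptions made for the illustration.

```c
#define _POSIX_C_SOURCE 200809L   /* assumption: exposes the barrier interface */
#include <pthread.h>

#define NUM_WORKERS 4

static pthread_barrier_t phase_barrier;
static int partial[NUM_WORKERS];
static int total;

static void * phase_worker( void * param )
{
    int id = *(int *)param;
    partial[id] = ( id + 1 ) * 10;        /* phase 1: fill this thread's slot */

    int rc = pthread_barrier_wait( &phase_barrier );
    if ( rc == PTHREAD_BARRIER_SERIAL_THREAD )
    {
        /* phase 2: one thread combines the results of all the others */
        for ( int i = 0; i < NUM_WORKERS; i++ ) { total += partial[i]; }
    }
    return 0;
}

/* Runs both phases and returns the combined total. */
int run_phases( void )
{
    pthread_t threads[NUM_WORKERS];
    int ids[NUM_WORKERS];
    total = 0;
    pthread_barrier_init( &phase_barrier, 0, NUM_WORKERS );
    for ( int i = 0; i < NUM_WORKERS; i++ )
    {
        ids[i] = i;
        pthread_create( &threads[i], 0, phase_worker, &ids[i] );
    }
    for ( int i = 0; i < NUM_WORKERS; i++ ) { pthread_join( threads[i], 0 ); }
    pthread_barrier_destroy( &phase_barrier );
    return total;
}
```

Because no thread leaves the barrier until all have arrived, the summing thread can read every slot without further locking.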

 

Semaphores

 

A semaphore is a counting and signaling mechanism. One use for it is to allow threads access to a specified number of items. If there is a single item, then a semaphore is essentially the same as a mutex, but it is more commonly useful in a situation where there are multiple items to be managed. Semaphores can also be used to signal between threads or processes, for example, to tell another thread that there is data present in a queue. There are two types of semaphores: named and unnamed semaphores.

An unnamed semaphore is initialized with a call to sem_init(). This function takes three parameters. The first parameter is a pointer to the semaphore. The next is an integer to indicate whether the semaphore is shared between multiple processes or private to a single process. The final parameter is the value with which to initialize the semaphore. A semaphore created by a call to sem_init() is destroyed with a call to sem_destroy().

The code shown in Listing 5.32 initializes a semaphore with a count of 10. The middle parameter of the call to sem_init() is zero, and this makes the semaphore private to the process; passing the value one rather than zero would enable the semaphore to be shared between multiple processes.

 

Listing 5.32  Creating and Initializing a Semaphore

 

 

 

#include <semaphore.h>

int main()
{
    sem_t semaphore;
    sem_init( &semaphore, 0, 10 );
    ...
    sem_destroy( &semaphore );
}

A named semaphore is opened rather than initialized. The process for doing this is similar to opening a file. The call to sem_open() returns a pointer to a semaphore. The first parameter to the call is the name of the semaphore. The name must conform to the naming conventions for files on the operating system and must start with a single slash sign and contain no further slash signs. The next parameter is the set of flags. There are three combinations of flags that can be passed to the sem_open() call. If no flags are passed, the function will return a pointer to the existing named semaphore if it exists and if the semaphore has the appropriate permissions to be shared with the calling process. If the O_CREAT flag is passed, the semaphore will be created if it does not exist; if it does exist, a pointer will be returned to the existing version. The flag O_EXCL can be passed with the O_CREAT flag. This will successfully return a semaphore only if that semaphore does not already exist.

 

Creating a semaphore requires two additional parameters: the permissions that the semaphore should be created with and the initial value for the semaphore. Listing 5.33 shows an example of opening a semaphore with an initial value of 10, with read and write permissions for the user, the group, and all users.

 

Listing 5.33   Opening a Named Semaphore

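#include <fcntl.h>
#include <semaphore.h>

int main()
{
    sem_t * semaphore;
    // 0666 grants read and write permission to the user, the group, and all users
    semaphore = sem_open( "/my_semaphore", O_CREAT, 0666, 10 );
    ...

A named semaphore is closed with a call to sem_close() and removed with a call to sem_unlink(), as shown in Listing 5.34.

Listing 5.34   Closing and Unlinking a Named Semaphore

    sem_close( semaphore );
    sem_unlink( "/my_semaphore" );
}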

The semaphore is used through a combination of three methods. The function sem_wait() will attempt to decrement the semaphore. If the semaphore is already zero, the calling thread will wait until the semaphore becomes nonzero and then return, having decremented the semaphore. The call sem_trywait() will return immediately, either having decremented the semaphore or, if the semaphore is already zero, with an error. The call to sem_post() will increment the semaphore. One more call, sem_getvalue(), will write the current value of the semaphore into an integer variable. The code in Listing 5.35 shows a semaphore used in the same way as a mutex might be, to protect the increment of the variable count. On Solaris, the semaphore functions are defined in the real-time library, so code needs to be linked with this library using -lrt.

 

Listing 5.35   Using a Semaphore as a Mutex

int main()
{
    sem_t semaphore;
    int count = 0;
    sem_init( &semaphore, 0, 1 );
    sem_wait( &semaphore );
    count++;
    sem_post( &semaphore );
    sem_destroy( &semaphore );
}
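The remaining calls, sem_trywait() and sem_getvalue(), are easier to see when the semaphore counts more than one item. The following sketch manages a hypothetical pool of three slots; the names pool_init(), pool_try_acquire(), pool_release(), pool_free_slots(), and pool_destroy() are invented for the example. It assumes a platform that supports unnamed semaphores, such as Linux or Solaris.

```c
#include <errno.h>
#include <semaphore.h>

#define POOL_SIZE 3

static sem_t pool;

/* The semaphore count is the number of free slots. */
int pool_init( void ) { return sem_init( &pool, 0, POOL_SIZE ); }

/* Returns 1 if a slot was taken, 0 if none is free, -1 on another error. */
int pool_try_acquire( void )
{
    if ( sem_trywait( &pool ) == 0 ) { return 1; }
    return ( errno == EAGAIN ) ? 0 : -1;   /* EAGAIN: count was already zero */
}

/* Hand a slot back to the pool. */
void pool_release( void ) { sem_post( &pool ); }

/* Report the current count; the value may be stale as soon as it is read. */
int pool_free_slots( void )
{
    int value = 0;
    sem_getvalue( &pool, &value );
    return value;
}

void pool_destroy( void ) { sem_destroy( &pool ); }
```

The value reported by sem_getvalue() is only a snapshot, so it suits monitoring better than deciding whether a later sem_wait() will block.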

Another property of semaphores that is not fully exploited when using them as mutex locks is signaling between threads. Semaphores can be used to signal that one task has been completed or to ensure that two tasks will be executed in a predetermined order. Consider the code shown in Listing 5.36.

 

Listing 5.36   Two Threads Executing Two Functions in a Nondeterministic Order

#include <pthread.h>
#include <stdio.h>

void *func1( void * param )
{
    printf( "Thread 1\n" );
    return 0;
}

void *func2( void * param )
{
    printf( "Thread 2\n" );
    return 0;
}

int main()
{
    pthread_t threads[2];
    pthread_create( &threads[0], 0, func1, 0 );
    pthread_create( &threads[1], 0, func2, 0 );
    pthread_join( threads[0], 0 );
    pthread_join( threads[1], 0 );
}

 

At runtime, the code can print either "Thread 1" or "Thread 2" first, depending on which thread gets to the printf() statement first. Semaphores can be used to ensure that the threads execute in a specific order. Suppose we want to ensure that the output is always "Thread 1" before "Thread 2"; then we need to make the second thread wait until the first thread completes before the second thread produces its output. Listing 5.37 shows how a semaphore can be used to ensure this ordering.

 

Listing 5.37   Using a Semaphore to Enforce a Deterministic Ordering on Two Threads

#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>

sem_t semaphore;

void *func1( void * param )
{
    printf( "Thread 1\n" );
    sem_post( &semaphore );
    return 0;
}

void *func2( void * param )
{
    sem_wait( &semaphore );
    printf( "Thread 2\n" );
    return 0;
}

int main()
{
    pthread_t threads[2];
    // Start at zero so the second thread must wait for the first to post
    sem_init( &semaphore, 0, 0 );
    pthread_create( &threads[0], 0, func1, 0 );
    pthread_create( &threads[1], 0, func2, 0 );
    pthread_join( threads[0], 0 );
    pthread_join( threads[1], 0 );
    sem_destroy( &semaphore );
}

 

The code creates a semaphore. Once the first thread completes its task, it signals the semaphore that the second thread can now perform its task. The logic of the second thread will cause it to wait at the semaphore until the first thread signals it, or if it does not reach the semaphore before the first thread completes its task, the second will not even wait at the semaphore. This use of a single semaphore ensures that the second thread always executes the printf() statement after the first thread has completed its printf() statement.

 

An extension of this ordering mechanism is the producer-consumer configuration of threads, as shown in Listing 5.38. The semaphore in this instance contains the number of items waiting in the queue to be processed. If there are no items in the queue, the consumer thread can sleep until an item is placed in the queue by the producer. The code uses the semaphore as a signaling mechanism between the two threads, not as a mechanism that ensures mutual exclusion to accesses to the queue. The queue itself is protected by a separate mutex so that multiple threads can safely access the queue data structure.

 

Listing 5.38   Using a Semaphore in a Producer-Consumer System

#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>

sem_t semaphore;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

int queue[500];   // Large enough to hold every item the producer creates
int queueLength;

void *producer( void * param )
{
    for ( int i=0; i<500; i++ )
    {
        // Add item to queue
        pthread_mutex_lock( &mutex );
        queue[ queueLength++ ] = i;
        pthread_mutex_unlock( &mutex );
        // Signal semaphore
        sem_post( &semaphore );
    }
    return 0;
}

void *consumer( void * param )
{
    for ( int i=0; i<500; i++ )
    {
        int item;
        // Wait until the semaphore count shows an item in the queue
        sem_wait( &semaphore );
        pthread_mutex_lock( &mutex );
        item = queue[ --queueLength ];
        pthread_mutex_unlock( &mutex );
        printf( "Received %i\n", item );
    }
    return 0;
}

int main()
{
    pthread_t threads[2];
    sem_init( &semaphore, 0, 0 );
    pthread_create( &threads[0], 0, producer, 0 );
    pthread_create( &threads[1], 0, consumer, 0 );
    pthread_join( threads[0], 0 );
    pthread_join( threads[1], 0 );
    sem_destroy( &semaphore );
}

Controlling access to a finite number of elements is another situation where a semaphore is useful. This could be a real physical constraint, such as a list with only a limited number of spaces or a finite amount of reserved memory. Or it could be a throttling feature. For example, in the producer-consumer system, we might want to limit the queue length to avoid stacking up too much work for the consumer. Listing 5.39 shows the modified version of the code.

 

Listing 5.39   Producer-Consumer Modified So That the Producer Thread Can Be Throttled

#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>

sem_t semaphore;
sem_t limit;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

int queue[10];
int queueLength;

void *producer( void * param )
{
    for ( int i=0; i<500; i++ )
    {
        // Wait for space
        sem_wait( &limit );
        // Add item to queue
        pthread_mutex_lock( &mutex );
        queue[ queueLength++ ] = i;
        pthread_mutex_unlock( &mutex );
        // Signal semaphore
        sem_post( &semaphore );
    }
    return 0;
}

void *consumer( void * param )
{
    for ( int i=0; i<500; i++ )
    {
        int item;
        // Wait until the semaphore count shows an item in the queue
        sem_wait( &semaphore );
        pthread_mutex_lock( &mutex );
        item = queue[ --queueLength ];
        pthread_mutex_unlock( &mutex );
        printf( "Received %i\n", item );
        // Tell the producer that a space is free
        sem_post( &limit );
    }
    return 0;
}

int main()
{
    pthread_t threads[2];
    sem_init( &semaphore, 0, 0 );
    sem_init( &limit, 0, 10 );
    pthread_create( &threads[0], 0, producer, 0 );
    pthread_create( &threads[1], 0, consumer, 0 );
    pthread_join( threads[0], 0 );
    pthread_join( threads[1], 0 );
    sem_destroy( &limit );
    sem_destroy( &semaphore );
}

The modifications introduce a second semaphore, limit. This semaphore is initialized with a value of 10. Before the producer thread adds an item to the queue, it calls sem_wait(), which will decrement the value of the semaphore. Every time the consumer thread removes an item from the queue, it calls sem_post(), which will increase the value of the semaphore. When the semaphore reaches zero, the producer thread will call sem_wait() and will not return from the call until the consumer thread has removed an item from the queue. This will stop the producer thread from adding more items to the queue before the consumer thread has had the chance to deal with those items already there.

 

Condition Variables

 

Condition variables enable threads to communicate state changes. Using them requires both a mutex and a condition variable, together with the additional state that threads need to check.

 

A condition variable is initialized with a call to pthread_cond_init(), which takes the address of the condition variable together with any attributes. Condition variables are destroyed with a call to pthread_cond_destroy(), passing the address of the condition variable.

 

The default for condition variables is to be private to a process. Attributes can be used to produce a condition variable shared between processes. Listing 5.40 demonstrates using attributes to create a shared condition variable.

 

Listing 5.40   Creating a Condition Variable That Can Be Shared Between Processes

#include <pthread.h>

pthread_cond_t CV;

int main()
{
    pthread_condattr_t CVA;
    pthread_condattr_init( &CVA );
    pthread_condattr_setpshared( &CVA, PTHREAD_PROCESS_SHARED );
    pthread_cond_init( &CV, &CVA );
    pthread_condattr_destroy( &CVA );
    ...
    pthread_cond_destroy( &CV );
}

The condition variable requires an actual variable to be monitored. A producer-consumer is a good scenario to use for an example. We will use the variable length to denote the length of the queue. The condition variable is used to wake the consumer thread when the length of the queue is greater than zero. Listing 5.41 shows the initialization code.

 

Listing 5.41   Creating Threads and Condition Variable for Producer-Consumer Example

#include <pthread.h>
#include <stdio.h>

pthread_cond_t cv;
pthread_mutex_t mutex;
int length;
int queue[200];

...

int main()
{
    pthread_t threads[2];
    pthread_cond_init( &cv, 0 );
    pthread_mutex_init( &mutex, 0 );
    length = 0;
    pthread_create( &threads[0], 0, producer, 0 );
    pthread_create( &threads[1], 0, consumer, 0 );
    pthread_join( threads[1], 0 );
    pthread_join( threads[0], 0 );
    pthread_mutex_destroy( &mutex );
    pthread_cond_destroy( &cv );
}

Listing 5.42 shows the code for the producer thread. The producer thread will obtain the mutex and then increment the length of the queue before using the condition variable to signal to waiting threads that there is an item in the queue. This signal will wake one of the waiting threads. After the signal has completed, the mutex can be released.

 

Listing 5.42   Code for Producer Thread

void * producer( void* param )
{
    for ( int i=0; i<200; i++ )
    {
        pthread_mutex_lock( &mutex );
        queue[ length++ ] = i;
        pthread_cond_signal( &cv );
        pthread_mutex_unlock( &mutex );
    }
    return 0;
}

If there are no threads waiting on the condition variable, the call to pthread_cond_signal() has no effect. Hence, it is really necessary to make the call only if the queue was empty before the item was added. It is only in this situation where the consumer thread might have been waiting for items to be placed in the queue. In situations where there are already items in the queue, the consumer thread will not have stopped.
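The idea of signaling only when the queue goes from empty to non-empty can be sketched as follows. This is an illustration under a stated assumption: there is exactly one consumer thread, so a waiting consumer implies an empty queue. The names quiet_producer(), quiet_consumer(), run_quiet_queue(), and signals_sent are hypothetical and were added for the example.

```c
#include <pthread.h>

#define ITEMS 200

static pthread_mutex_t q_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  q_cv    = PTHREAD_COND_INITIALIZER;
static int q_items[ITEMS];
static int q_length = 0;
int signals_sent = 0;        /* counts how often the producer signalled */

static void * quiet_producer( void * param )
{
    (void)param;
    for ( int i = 0; i < ITEMS; i++ )
    {
        pthread_mutex_lock( &q_mutex );
        int was_empty = ( q_length == 0 );
        q_items[ q_length++ ] = i;
        if ( was_empty )
        {
            /* Only now can the single consumer be asleep on the condition. */
            pthread_cond_signal( &q_cv );
            signals_sent++;
        }
        pthread_mutex_unlock( &q_mutex );
    }
    return 0;
}

static void * quiet_consumer( void * param )
{
    long sum = 0;
    for ( int i = 0; i < ITEMS; i++ )
    {
        pthread_mutex_lock( &q_mutex );
        while ( q_length == 0 ) { pthread_cond_wait( &q_cv, &q_mutex ); }
        sum += q_items[ --q_length ];
        pthread_mutex_unlock( &q_mutex );
    }
    *(long *)param = sum;    /* hand the result back to the caller */
    return 0;
}

/* Runs one producer and one consumer; returns the sum of the items received. */
long run_quiet_queue( void )
{
    pthread_t p, c;
    long sum = 0;
    pthread_create( &c, 0, quiet_consumer, &sum );
    pthread_create( &p, 0, quiet_producer, 0 );
    pthread_join( p, 0 );
    pthread_join( c, 0 );
    return sum;
}
```

With more than one consumer this shortcut is no longer safe, because a second consumer could be waiting while the queue already holds an item for the first.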

 

It is also possible to use a broadcast to signal to all waiting threads that there is an item in the queue. The function pthread_cond_broadcast() wakes all the threads waiting on the condition variable. This is illustrated in the version of the producer thread shown in Listing 5.43.

 

Listing 5.43   Broadcasting the Arrival of a New Item to All Waiting Threads

void * producer( void* param )
{
    for ( int i=0; i<200; i++ )
    {
        pthread_mutex_lock( &mutex );
        queue[ length++ ] = i;
        pthread_cond_broadcast( &cv );
        pthread_mutex_unlock( &mutex );
    }
    return 0;
}

There is no advantage to using broadcast in a situation where there is only a single task to perform, since it will incur the overhead of waking all the threads and then send all but one of them back to sleep. It is useful in the situation where there are multiple independent tasks to complete and each woken thread is able to identify an independent item of work.

 

Listing 5.44 shows the code for the consumer thread. This is slightly more complex than the code for the producer thread. The consumer thread runs in a loop. On each iteration, the first thing it needs to do is to acquire the mutex in order to get access to the variable length, which, in this example, is the proxy for the queue.

 

Listing 5.44   Code for Consumer Thread

void * consumer( void* param )
{
    for ( int i=0; i<200; i++ )
    {
        pthread_mutex_lock( &mutex );
        while ( length==0 )
        {
            pthread_cond_wait( &cv, &mutex );
        }
        int item = queue[ --length ];
        pthread_mutex_unlock( &mutex );
        printf( "Received %i\n", item );
    }
    return 0;
}

 

 

The consumer thread needs to wait on the condition variable only when there are no items in the queue. If there are items in the queue, the consumer thread can immediately remove one and process it. Once the consumer thread has removed an item and decremented the length of the queue, it can release the mutex and process the item.

 

If the queue is empty, the consumer thread will need to wait to be signaled by the producer thread. It does this by calling pthread_cond_wait() while still holding the mutex. This call will release the mutex while the thread is waiting, but when signaled, the thread will wake up holding the mutex again. The call to pthread_cond_wait() needs to be placed in a loop. The thread will be woken when it is signaled that length is greater than zero, but it may also be woken when length does not meet this criterion. Therefore, the thread needs to loop calling pthread_cond_wait() until the condition, in this instance that the variable length is greater than zero, is met.

 

An example of a thread being woken up when the condition is not true is when there are multiple threads waiting on the condition variable and all the threads are woken by a broadcast signal. If there is one item of work and two threads are woken, the first thread will get the item of work. When the second thread wakes, it will discover that there is no work for it. Hence, the second thread will appear to have suffered a spurious wake-up.
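The situation described above, where a broadcast wakes more threads than there are items, can be sketched as follows. This is an illustrative sketch under stated assumptions rather than code from the book: the names worker(), produce(), run_workers(), received, and WORKERS are hypothetical. The first broadcast announces a single item, so any other threads woken by it find length equal to zero and go back to waiting; the second broadcast announces enough items for the remaining threads.

```c
#include <pthread.h>

#define WORKERS 4   /* hypothetical number of consumer threads */

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cv    = PTHREAD_COND_INITIALIZER;
static int queue[ WORKERS ];
static int length   = 0;
static int received = 0;   /* number of items taken, protected by mutex */

/* Each worker takes exactly one item, waiting while the queue is empty. */
static void * worker( void * param )
{
    (void)param;
    pthread_mutex_lock( &mutex );
    while ( length == 0 )   /* re-check after every wake-up */
    {
        pthread_cond_wait( &cv, &mutex );
    }
    --length;               /* claim one item */
    received++;
    pthread_mutex_unlock( &mutex );
    return 0;
}

/* Adds count items and wakes every thread waiting on cv. */
static void produce( int count )
{
    pthread_mutex_lock( &mutex );
    for ( int i = 0; i < count; i++ )
    {
        queue[ length++ ] = i;
    }
    pthread_cond_broadcast( &cv );
    pthread_mutex_unlock( &mutex );
}

/* Starts the workers, produces the items in two batches, and returns
   the number of items received once every worker has finished. */
int run_workers( void )
{
    pthread_t threads[ WORKERS ];
    for ( int i = 0; i < WORKERS; i++ )
    {
        pthread_create( &threads[ i ], 0, worker, 0 );
    }
    produce( 1 );             /* one item: at most one woken thread finds work */
    produce( WORKERS - 1 );   /* remaining items for the remaining threads */
    for ( int i = 0; i < WORKERS; i++ )
    {
        pthread_join( threads[ i ], 0 );
    }
    return received;
}
```

Because every worker re-tests length inside the while loop, it does not matter how many threads a broadcast wakes or in what order they run; each item is taken exactly once. Replacing the while with an if would allow a woken thread to decrement length below zero.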

 

There is one problem that should be avoided when coding threads that wait on condition variables: the lost wake-up problem. Listing 5.45 shows an example.

 

Listing 5.45   Potential “Lost Wake-Up” Issue

void * consumer( void* param )
{
    for( int i=0; i<200; i++ )
    {
        int item;
        int go = 0;
        pthread_mutex_lock( &mutex );
        /* Problem: waits without first checking whether length > 0 */
        pthread_cond_wait( &cv, &mutex );
        if ( length > 0 )
        {
            item = queue[ --length ];
            go = 1;
        }
        pthread_mutex_unlock( &mutex );
        if ( go )
        {
            printf( "Received %i\n", item );
        }
    }
    return 0;
}

In this version of the code, the consumer thread waits on the condition variable on every iteration of the loop. If the condition variable is signaled before the consumer thread reaches the wait call, then the signal is lost, and the consumer thread will wait until the next signal. If no further work is produced by the producer thread, the consumer thread will wait indefinitely, even though it has work waiting. This problem is compounded if the producer thread is set to signal only when new work is added to an empty queue; in this instance, the consumer thread will never get signaled, and the producer thread will keep adding work to the queue.
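The fact that a signal sent before the wait is not remembered can be observed in a single thread. The following is a small sketch, assuming a POSIX system that provides clock_gettime(); the function name signal_then_wait() is hypothetical. It signals the condition variable while nothing is waiting and then performs a timed wait, so that the program does not hang the way Listing 5.45 could.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L   /* request clock_gettime() on strict compilers */
#endif
#include <errno.h>
#include <pthread.h>
#include <time.h>

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cv    = PTHREAD_COND_INITIALIZER;

/* Hypothetical demonstration: signals first, waits second, and returns
   the result of the timed wait. */
int signal_then_wait( void )
{
    struct timespec deadline;

    /* No thread is waiting at this point, so the signal has no effect. */
    pthread_cond_signal( &cv );

    pthread_mutex_lock( &mutex );
    clock_gettime( CLOCK_REALTIME, &deadline );
    deadline.tv_sec += 1;   /* give up one second from now */
    int result = pthread_cond_timedwait( &cv, &mutex, &deadline );
    pthread_mutex_unlock( &mutex );
    return result;
}
```

The call is expected to return ETIMEDOUT, because the earlier signal was discarded rather than stored. A return value of zero is still possible if the implementation produces a spurious wake-up, which is another reason to keep the state in a variable such as length and to test that variable before waiting.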

 

Condition variables also support waiting with a timeout. The call is pthread_cond_timedwait(), which takes the condition variable, the mutex, and the timeout, specified as an absolute time. The call returns zero if the thread was woken, or an error code, such as ETIMEDOUT, indicating the reason for the return. After both a successful wake-up and a timeout, the thread holds the mutex again when the call returns. The code in Listing 5.46 illustrates using this function call to count the number of seconds waited until the condition variable was signaled.

 

Listing 5.46   Using a Timeout to Count Elapsed Seconds

#include <time.h>
#include <errno.h>

void * consumer( void* param )
{
    for( int i=0; i<200; i++ )
    {
        int seconds = 0;
        pthread_mutex_lock( &mutex );
        while ( length == 0 )
        {
            struct timespec now;          /* Absolute time at which to stop waiting */
            now.tv_sec = time( 0 ) + 1;
            now.tv_nsec = 0;
            if ( pthread_cond_timedwait( &cv, &mutex, &now ) == ETIMEDOUT )
            {
                seconds++;
            }
        }
        int item = queue[ --length ];
        if ( seconds ) { printf( "%i seconds waited\n", seconds ); }
        pthread_mutex_unlock( &mutex );
        printf( "Received %i\n", item );
    }
    return 0;
}

The code uses pthread_cond_timedwait() to wait in units of one second. Every time the call times out, a count of seconds waited is incremented. If the producer thread is modified so that it sleeps between producing each item, then it is possible to see the timeout of the consumer thread.
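Because the timeout is an absolute time, a relative delay has to be added to the current time, and the nanosecond field must stay below one billion. The helper below is a sketch of that arithmetic; the name deadline_after_ms() is hypothetical and not part of POSIX, and the helper assumes a non-negative delay and a start time whose tv_nsec field is already in range.

```c
#include <time.h>

/* Hypothetical helper: returns the absolute time that lies ms
   milliseconds after start. Assumes ms >= 0 and that start.tv_nsec
   is already in the range 0 to 999999999. */
struct timespec deadline_after_ms( struct timespec start, long ms )
{
    struct timespec t = start;
    t.tv_sec  += ms / 1000;
    t.tv_nsec += ( ms % 1000 ) * 1000000L;
    if ( t.tv_nsec >= 1000000000L )
    {
        /* Carry the overflow from nanoseconds into seconds. */
        t.tv_sec  += 1;
        t.tv_nsec -= 1000000000L;
    }
    return t;
}
```

A caller would typically read the current time with clock_gettime( CLOCK_REALTIME, &now ), pass now to this helper, and hand the result to pthread_cond_timedwait(). Compared with setting tv_sec to time( 0 ) + 1 and tv_nsec to zero, as Listing 5.46 does, this gives a wait of the full requested length; the version in the listing expires at the start of the next whole second, so each wait may be shorter than one second.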

