Scaling the Producer-Consumer to Multiple Threads
The simplest way to share the circular buffer between threads is to use some kind of mutual exclusion to ensure that only a single thread at a time can modify the structure. We previously encountered spinlocks in Listing 8.7, and these are appropriate for ensuring that only one thread manipulates the circular buffer at a time. Listing 8.23 shows the code modified so that access to the circular buffer is protected by a spinlock.
Listing 8.23 Using Mutual Exclusion to Ensure Exclusive Access to the Circular Buffer
#include <stdio.h>
#include <pthread.h>

volatile int buffer[16];
volatile int addhere;
volatile int removehere;
volatile int lock = 0;

void lock_spinlock( volatile int* lock )
{
    while ( CAS( lock, 0, 1 ) != 0 ) {}
    acquire_memory_barrier();
}

void free_spinlock( volatile int *lock )
{
    release_memory_barrier();
    *lock = 0;
}

void clearbuffer()
{
    addhere = 0;
    removehere = 0;
    for( int i=0; i<16; i++ ) { buffer[i] = 0; }
}

int next( int current )
{
    return ( current + 1 ) & 15;
}

void addtobuffer( int value )
{
    lock_spinlock( &lock );
    while( buffer[ next(addhere) ] != 0 ) {}
    buffer[addhere] = value;
    addhere = next(addhere);
    free_spinlock( &lock );
}

int removefrombuffer()
{
    int value;
    lock_spinlock( &lock );
    while( ( value = buffer[removehere] ) == 0 ) {}
    buffer[removehere] = 0;
    removehere = next(removehere);
    free_spinlock( &lock );
    return value;
}
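Listing 8.23 relies on CAS(), acquire_memory_barrier(), and release_memory_barrier(), which are not defined in the listing. One plausible mapping, sketched below, uses GCC/Clang's legacy __sync builtins; an implementation could equally use C11 <stdatomic.h>. The choice of a full barrier for both directions is an assumption for simplicity.

```c
/* Possible implementations of the primitives assumed by Listing 8.23.
   CAS() returns the value that *ptr held before the operation, so a
   return value of 0 means the lock was successfully taken. */
static inline int CAS( volatile int *ptr, int oldval, int newval )
{
    return __sync_val_compare_and_swap( ptr, oldval, newval );
}

static inline void acquire_memory_barrier( void )
{
    __sync_synchronize();   /* full barrier: stronger than acquire, but safe */
}

static inline void release_memory_barrier( void )
{
    __sync_synchronize();   /* full barrier: stronger than release, but safe */
}
```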
There are several points to make about the code as it stands. The first is that the circular buffer has 16 entries, and although this might be adequate for a single producer-consumer pair, it is unlikely to remain so as the number of threads increases.

The second, more important, point is that the code now contains a deadlock. Imagine that the circular buffer is empty and a consumer thread acquires the lock and starts waiting for something to appear in the buffer. A producer thread eventually arrives but now has to acquire the lock before it can add anything to the buffer. Both threads end up spinning, waiting for an event that can never happen.
There are two solutions to this particular instance of the problem. The first, and perhaps the most trivial, is to provide two locks: one for threads waiting to add elements to the buffer and a second for threads waiting to remove an item from it. This solution works in this situation because it reduces the problem to the one described earlier, where only two threads are present in the system. One lock ensures that only one producer thread can access the circular buffer at a time. The other lock ensures that only one consumer thread can access the circular buffer at a time. The remaining producer and consumer threads cannot interfere and cause correctness problems.
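For concreteness, the two-lock variant might look like the following sketch. The names producer_lock and consumer_lock are introduced here for illustration, and CAS() is assumed to map onto GCC's __sync_val_compare_and_swap builtin, returning the previous value of the lock word.

```c
/* Sketch of the two-lock variant: producer threads serialize on one
   spinlock, consumer threads on another, so each side reduces to the
   original single-producer/single-consumer code. */
volatile int buffer[16];
volatile int addhere, removehere;
volatile int producer_lock = 0;   /* serializes producers */
volatile int consumer_lock = 0;   /* serializes consumers */

static int CAS( volatile int *p, int o, int n )
{
    return __sync_val_compare_and_swap( p, o, n );  /* assumed mapping */
}

void lock_spinlock( volatile int *lock )
{
    while ( CAS( lock, 0, 1 ) != 0 ) {}
    __sync_synchronize();
}

void free_spinlock( volatile int *lock )
{
    __sync_synchronize();
    *lock = 0;
}

int next( int current ) { return ( current + 1 ) & 15; }

void addtobuffer( int value )
{
    lock_spinlock( &producer_lock );   /* excludes other producers only */
    /* Safe to spin here: a consumer holds a different lock and can
       still free a slot. */
    while ( buffer[ next(addhere) ] != 0 ) {}
    buffer[addhere] = value;
    addhere = next(addhere);
    free_spinlock( &producer_lock );
}

int removefrombuffer()
{
    int value;
    lock_spinlock( &consumer_lock );   /* excludes other consumers only */
    while ( ( value = buffer[removehere] ) == 0 ) {}
    buffer[removehere] = 0;
    removehere = next(removehere);
    free_spinlock( &consumer_lock );
    return value;
}
```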
This solution relies on the original semantics of the code to provide a thread-safe version in the presence of two threads. For illustrative purposes, consider how we could modify the code so that only a single lock is necessary.
One way of doing this is to place the critical section inside another loop, which repeats until the critical section has been executed successfully. This requires modifying the addtobuffer() and removefrombuffer() routines so that they no longer loop inside the critical section and instead quickly return success or failure. Listing 8.24 shows the code modified to remove the loops inside the critical section.
Listing 8.24 Using Mutual Exclusion to Ensure Exclusive Access to the Circular Buffer
void addtobuffer( int value )
{
    int success = 0;
    while( !success )
    {
        lock_spinlock( &lock );
        if( buffer[ next(addhere) ] == 0 )
        {
            buffer[addhere] = value;
            addhere = next(addhere);
            success = 1;
        }
        free_spinlock( &lock );
    }
}

int removefrombuffer()
{
    int value;
    int success = 0;
    while ( !success )
    {
        lock_spinlock( &lock );
        if ( ( value = buffer[removehere] ) != 0 )
        {
            buffer[removehere] = 0;
            removehere = next(removehere);
            success = 1;
        }
        free_spinlock( &lock );
    }
    return value;
}
The code uses a variable success to determine whether the critical region was successful. Although this change results in the desired behavior, it is not the best code to run on the system. The problem is that while threads are unable to add or remove items from the queue, the spinlock is constantly being acquired and released. This results in significant traffic between the cores invalidating and fetching the cache line containing the lock variable. Both the acquisition and release of the lock result in a store operation, which causes all the other caches to invalidate the cache line containing the lock. When the next thread acquires the spinlock, it has to fetch the lock from the cache of the processor that last updated it. This activity causes the cache line containing the variable lock to be passed constantly between the caches of different virtual CPUs and may have an impact on the performance of the system.
For this code, a thread can easily determine whether it is likely to be
successful in accessing the buffer. This test for success is to load the next
element in the buffer array
and see whether it is zero. The advantage of using load instructions is that
the cache line fetched by a load remains resident in cache until it is
invalidated by a store operation. In practical terms, each thread will spin on
the appropriate variable waiting for it to be modified. This causes no
invalidation of the values held in other caches until the variable is actually
updated. Consequently, there is little risk of there being a performance impact
from this scheme.
When the next element in the buffer array becomes
zero, it indicates that the thread may be successful if it enters the critical
region. Only at that point will the thread attempt to enter the critical
region, and only at that point will there be any invalidation of data in cache
lines. Listing 8.25 shows a modified version of the source code.
Listing 8.25 Avoiding Unnecessary Invalidations of Cache Lines
void addtobuffer( int value )
{
    int success = 0;
    while ( !success )
    {
        if ( buffer[ next(addhere) ] == 0 )   // Wait for an empty space
        {
            lock_spinlock( &lock );
            if( buffer[ next(addhere) ] == 0 )
            {
                buffer[addhere] = value;
                addhere = next(addhere);
                success = 1;
            }
            free_spinlock( &lock );
        }
    }
}

int removefrombuffer()
{
    int value;
    int success = 0;
    while ( !success )
    {
        if ( buffer[removehere] != 0 )   // Wait for an item to be added
        {
            lock_spinlock( &lock );
            if ( ( value = buffer[removehere] ) != 0 )
            {
                buffer[removehere] = 0;
                removehere = next(removehere);
                success = 1;
            }
            free_spinlock( &lock );
        }
    }
    return value;
}
The modified code contains the same mechanism that
keeps the thread spinning until it is successful. Within that loop, the thread
tests to see whether it might be successful before actually obtaining the lock.
Although this reduces the number of invalidations of the shared data and
reduces the memory traffic, it is still not an optimal solution.
The problem with this code is that every time a new item is added to the circular buffer or a space becomes free, all the waiting threads recognize this and attempt to acquire the lock, even though only one thread can actually succeed. This is an example of the thundering herd problem, where a number of threads are waiting for a condition to become true but only one thread can successfully proceed, so all the other threads end up using unnecessary resources. This problem can be resolved by ordering the list of threads such that only one thread is allowed to proceed.
However, the problem is worse than this. All the threads that identify the opportunity to access the circular buffer will enter the if statement and can exit only after they have acquired and released the spinlock. So, these threads will end up spinning on the spinlock, which was not the intention of the code.
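As an aside, the thread-ordering remedy mentioned above is conventionally implemented as a ticket lock: each waiter atomically takes a ticket and then spins, using only loads, until its number is served, so each release admits exactly one waiter. The sketch below is illustrative and not part of the original listings; it assumes GCC's __sync builtins for the atomic fetch-and-add and memory barriers.

```c
/* Illustrative ticket lock: waiters are served strictly in arrival
   order, so a release wakes exactly one waiter rather than a herd. */
typedef struct
{
    volatile unsigned int ticket;    /* next ticket to hand out */
    volatile unsigned int serving;   /* ticket currently allowed in */
} ticket_lock_t;

void ticket_lock( ticket_lock_t *l )
{
    /* Atomically take the next ticket... */
    unsigned int mine = __sync_fetch_and_add( &l->ticket, 1 );
    /* ...then spin on loads until that ticket is being served. */
    while ( l->serving != mine ) {}
    __sync_synchronize();
}

void ticket_unlock( ticket_lock_t *l )
{
    __sync_synchronize();
    l->serving++;   /* admits exactly the next waiter in line */
}
```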
To remove this problem, we should change the code so that instead of
spinning on the spinlock, the threads try to acquire it, and if they do not,
then they should return to the outer loop and wait for the next opportunity to
access the circular buffer. Listing 8.26 shows the modified code. It introduces
a function called try_spinlock() that
will either acquire the spinlock and return true or fail to acquire the lock
and return false. With this modification, the threads spin on the variable indicating the state of the circular buffer. Because this spinning reads a value that stays resident in each thread's cache, it generates little memory traffic. If the state changes, the threads attempt to get the spinlock. Only one thread will succeed, and that thread gets to access the buffer while the other threads go back to spinning on the shared variable.
With this change, the spinlock has ceased to be used as a spinlock since the
threads spin before attempting to acquire the lock.
Listing 8.26 Avoiding Unnecessary Invalidations of Cache Lines Using try_spinlock()
int try_spinlock( volatile int* lock )
{
    if ( CAS( lock, 0, 1 ) == 1 )
    {
        return 0;                    // Lock was already held; give up
    }
    else
    {
        acquire_memory_barrier();
        return 1;                    // Lock acquired
    }
}
void addtobuffer( int value )
{
    int success = 0;
    while ( !success )
    {
        if ( buffer[ next(addhere) ] == 0 )
        {
            if ( try_spinlock( &lock ) )
            {
                if ( buffer[ next(addhere) ] == 0 )
                {
                    buffer[addhere] = value;
                    addhere = next(addhere);
                    success = 1;
                }
                free_spinlock( &lock );
            }
        }
    }
}

int removefrombuffer()
{
    int value;
    int success = 0;
    while ( !success )
    {
        if ( buffer[removehere] != 0 )
        {
            if ( try_spinlock( &lock ) )
            {
                if ( ( value = buffer[removehere] ) != 0 )
                {
                    buffer[removehere] = 0;
                    removehere = next(removehere);
                    success = 1;
                }
                free_spinlock( &lock );
            }
        }
    }
    return value;
}
A further improvement to the code would be to
rewrite the spinlock so that it has these semantics. Listing 8.27 shows this
improvement to the spinlock code. This code spins until the lock has been
acquired by the calling thread. Every iteration, the loop tests whether the
lock is available. If the lock is available, the code attempts to acquire the
lock atomically. If successful, the code exits the loop having acquired the
spinlock. If unsuccessful, the loop continues to spin.
Listing 8.27 Reducing the Number of CAS Operations When Spinning
void lock_spinlock( volatile int* lock )
{
    int acquired = 0;
    while ( !acquired )
    {
        if ( ( *lock == 0 ) && ( CAS( lock, 0, 1 ) == 0 ) )
        {
            acquired = 1;
        }
    }
    acquire_memory_barrier();
}

void addtobuffer( int value )
{
    int success = 0;
    while ( !success )
    {
        if ( buffer[ next(addhere) ] == 0 )
        {
            lock_spinlock( &lock );
            if ( buffer[ next(addhere) ] == 0 )
            {
                buffer[addhere] = value;
                addhere = next(addhere);
                success = 1;
            }
            free_spinlock( &lock );
        }
    }
}

int removefrombuffer()
{
    int value;
    int success = 0;
    while ( !success )
    {
        if ( buffer[removehere] != 0 )
        {
            lock_spinlock( &lock );
            if ( ( value = buffer[removehere] ) != 0 )
            {
                buffer[removehere] = 0;
                removehere = next(removehere);
                success = 1;
            }
            free_spinlock( &lock );
        }
    }
    return value;
}
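As a sanity check, the final version can be exercised with a small pthread harness: one producer pushes the nonzero values 1 through 100 while one consumer drains and sums them. The harness itself, the run_harness() wrapper, and the mapping of CAS() and the barriers onto GCC builtins are assumptions for illustration, not part of the original listings.

```c
#include <pthread.h>

volatile int buffer[16];
volatile int addhere = 0;
volatile int removehere = 0;
volatile int lock = 0;

/* Assumed mapping of the text's primitives onto GCC builtins. */
static int CAS( volatile int *p, int o, int n )
{ return __sync_val_compare_and_swap( p, o, n ); }
static void acquire_memory_barrier( void ) { __sync_synchronize(); }
static void release_memory_barrier( void ) { __sync_synchronize(); }

int next( int current ) { return ( current + 1 ) & 15; }

void lock_spinlock( volatile int *lock )
{
    int acquired = 0;
    while ( !acquired )
    {
        if ( ( *lock == 0 ) && ( CAS( lock, 0, 1 ) == 0 ) ) { acquired = 1; }
    }
    acquire_memory_barrier();
}

void free_spinlock( volatile int *lock )
{
    release_memory_barrier();
    *lock = 0;
}

void addtobuffer( int value )
{
    int success = 0;
    while ( !success )
    {
        if ( buffer[ next(addhere) ] == 0 )        /* likely to succeed? */
        {
            lock_spinlock( &lock );
            if ( buffer[ next(addhere) ] == 0 )    /* recheck under the lock */
            {
                buffer[addhere] = value;
                addhere = next(addhere);
                success = 1;
            }
            free_spinlock( &lock );
        }
    }
}

int removefrombuffer()
{
    int value = 0;
    int success = 0;
    while ( !success )
    {
        if ( buffer[removehere] != 0 )
        {
            lock_spinlock( &lock );
            if ( ( value = buffer[removehere] ) != 0 )
            {
                buffer[removehere] = 0;
                removehere = next(removehere);
                success = 1;
            }
            free_spinlock( &lock );
        }
    }
    return value;
}

static void *producer( void *arg )
{
    for ( int i = 1; i <= 100; i++ ) { addtobuffer( i ); }  /* nonzero values */
    return NULL;
}

static void *consumer( void *arg )
{
    long sum = 0;
    for ( int i = 0; i < 100; i++ ) { sum += removefrombuffer(); }
    return (void *)sum;
}

long run_harness( void )
{
    pthread_t p, c;
    void *result;
    pthread_create( &p, NULL, producer, NULL );
    pthread_create( &c, NULL, consumer, NULL );
    pthread_join( p, NULL );
    pthread_join( c, &result );
    return (long)result;
}
```

Compiled with gcc -pthread, the harness should return a sum of 5050 regardless of how the two threads interleave, since every value pushed is eventually drained exactly once.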