Scaling the Producer-Consumer to Multiple Threads

The simplest way to share the circular buffer between threads would be to use some kind of mutual exclusion that would ensure that only a single thread at a time could modify the structure. We previously encountered spinlocks in Listing 8.7, and these would be appropriate for ensuring that only one thread manipulates the circular buffer at a time. Listing 8.23 shows the code modified so that access to the circular buffer is protected with a spinlock.

 

Listing 8.23   Using Mutual Exclusion to Ensure Exclusive Access to the Circular Buffer

#include <stdio.h>
#include <pthread.h>

volatile int buffer[16];       /* circular buffer; a zero entry marks an empty slot */
volatile int addhere;          /* index of the next slot to fill */
volatile int removehere;       /* index of the next slot to empty */

volatile int lock = 0;         /* spinlock protecting the buffer */

void lock_spinlock( volatile int* lock )
{
    while ( CAS( lock, 0, 1 ) != 0 ) {}    /* spin until the lock is acquired */
    acquire_memory_barrier();
}

void free_spinlock( volatile int *lock )
{
    release_memory_barrier();
    *lock = 0;
}

void clearbuffer()
{
    addhere    = 0;
    removehere = 0;
    for( int i = 0; i < 16; i++ ) { buffer[i] = 0; }
}

int next( int current )
{
    return ( current + 1 ) & 15;           /* wrap around the 16-entry buffer */
}

void addtobuffer( int value )
{
    lock_spinlock( &lock );
    while( buffer[ next(addhere) ] != 0 ) {}      /* wait for an empty slot */
    buffer[addhere] = value;
    addhere = next(addhere);
    free_spinlock( &lock );
}

int removefrombuffer()
{
    int value;
    lock_spinlock( &lock );
    while( ( value = buffer[removehere] ) == 0 ) {}   /* wait for an item */
    buffer[removehere] = 0;
    removehere = next(removehere);
    free_spinlock( &lock );
    return value;
}

 

 

There are several points to make about the code as it stands. The first is that the circular buffer has 16 entries, and although this might be adequate for a single producer-consumer pair, it is unlikely to remain so as the number of threads increases.
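If a larger buffer were needed, the size could be made a compile-time constant, as in the minimal sketch below. The name BUFFER_SIZE is an assumption introduced here for illustration; keeping the size a power of two preserves the cheap mask-based wrap-around used by next().

#define BUFFER_SIZE 1024                   /* hypothetical size; must be a power of two */

volatile int buffer[BUFFER_SIZE];

int next( int current )
{
    return ( current + 1 ) & ( BUFFER_SIZE - 1 );   /* same wrap-around as the & 15 above */
}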

 

The second, more important, point is that the code now contains a deadlock. Imagine that the circular buffer is empty and a consumer thread acquires the lock and starts waiting for something to appear in the buffer. A producer thread eventually arrives but now has to acquire the lock before it can add anything into the buffer. Both threads end up spinning, waiting for an event that can never happen.

 

There are two solutions to this particular instance of the problem. The first, and perhaps the most trivial, is to provide two locks: one for threads waiting to add elements to the buffer and a second for threads waiting to remove an item from it. This works because it reduces the problem to the one described earlier, where only two threads are present in the system. One lock ensures that only one producer thread can access the circular buffer at a time; the other ensures that only one consumer thread can access it at a time. The remaining producer and consumer threads cannot interfere and cause correctness problems.
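As a rough sketch of this two-lock approach, the routines from Listing 8.23 could each take their own lock. The names producer_lock and consumer_lock are assumptions introduced here, and the scheme still depends on the original two-thread code being correct, including any memory barriers it required.

volatile int producer_lock = 0;    /* serializes producer threads */
volatile int consumer_lock = 0;    /* serializes consumer threads */

void addtobuffer( int value )
{
    lock_spinlock( &producer_lock );
    while ( buffer[ next(addhere) ] != 0 ) {}        /* wait for an empty slot */
    buffer[addhere] = value;
    addhere = next(addhere);
    free_spinlock( &producer_lock );
}

int removefrombuffer()
{
    int value;
    lock_spinlock( &consumer_lock );
    while ( ( value = buffer[removehere] ) == 0 ) {}   /* wait for an item */
    buffer[removehere] = 0;
    removehere = next(removehere);
    free_spinlock( &consumer_lock );
    return value;
}

A consumer waiting inside its critical section now holds only consumer_lock, so a producer can still acquire producer_lock and add an item, which is why the deadlock described above cannot occur.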

 

This solution relies on the original semantics of the code to provide a thread-safe version in the presence of two threads. For illustrative purposes, consider how we could modify the code so that only a single lock was necessary.

 

One way of doing this is to place the critical section inside another loop that repeats until the critical section has been successfully executed. This requires modifying the addtobuffer() and removefrombuffer() routines so that they no longer loop inside the critical section and instead quickly report success or failure. Listing 8.24 shows the code modified to remove the loops inside the critical section.

 

Listing 8.24   Using Mutual Exclusion to Ensure Exclusive Access to the Circular Buffer

void addtobuffer( int value )
{
    int success = 0;
    while( !success )
    {
        lock_spinlock( &lock );
        if( buffer[ next(addhere) ] == 0 )
        {
            buffer[addhere] = value;
            addhere = next(addhere);
            success = 1;
        }
        free_spinlock( &lock );
    }
}

int removefrombuffer()
{
    int value;
    int success = 0;
    while ( !success )
    {
        lock_spinlock( &lock );
        if ( ( value = buffer[removehere] ) != 0 )
        {
            buffer[removehere] = 0;
            removehere = next(removehere);
            success = 1;
        }
        free_spinlock( &lock );
    }
    return value;
}

 

 

 

The code uses a variable success to determine whether the critical region was successfully executed. Although this change produces the desired behavior, it is not the most efficient code to run on the system. The problem is that while threads are unable to add or remove items from the queue, the spinlock is constantly being acquired and released. This results in significant traffic between the cores as they invalidate and fetch the cache line containing the lock variable. Both acquiring and releasing the lock involve a store operation, which causes all the other caches to invalidate the cache line containing the lock. When the next thread acquires the spinlock, it has to fetch the line from the cache of the processor that last updated the lock. Consequently, the cache line containing the variable lock is constantly passed between the caches of different virtual CPUs, which may have an impact on the performance of the system.

 

For this code, a thread can easily determine whether it is likely to be successful in accessing the buffer. The test is to load the appropriate element of the buffer array and check its value: a producer needs the next slot to be zero (empty), and a consumer needs the current slot to be nonzero (an item is waiting). The advantage of using load instructions is that the cache line fetched by a load remains resident in cache until it is invalidated by a store operation. In practical terms, each thread will spin on the appropriate variable, waiting for it to be modified. This causes no invalidation of the values held in other caches until the variable is actually updated. Consequently, there is little risk of this scheme having a performance impact.

 

When the appropriate element of the buffer array changes state, it indicates that the thread may be successful if it enters the critical region. Only at that point will the thread attempt to enter the critical region, and only at that point will there be any invalidation of data in cache lines. Listing 8.25 shows a modified version of the source code.

 

Listing 8.25  Avoiding Unnecessary Invalidations of Cache Lines

void addtobuffer( int value )
{
    int success = 0;
    while ( !success )
    {
        if ( buffer[ next(addhere) ] == 0 )      // Wait for an empty space
        {
            lock_spinlock( &lock );
            if( buffer[ next(addhere) ] == 0 )
            {
                buffer[addhere] = value;
                addhere = next(addhere);
                success = 1;
            }
            free_spinlock( &lock );
        }
    }
}

int removefrombuffer()
{
    int value;
    int success = 0;
    while ( !success )
    {
        if ( buffer[removehere] != 0 )           // Wait for an item to be added
        {
            lock_spinlock( &lock );
            if ( ( value = buffer[removehere] ) != 0 )
            {
                buffer[removehere] = 0;
                removehere = next(removehere);
                success = 1;
            }
            free_spinlock( &lock );
        }
    }
    return value;
}

The modified code contains the same mechanism that keeps the thread spinning until it is successful. Within that loop, the thread tests to see whether it might be successful before actually obtaining the lock. Although this reduces the number of invalidations of the shared data and reduces the memory traffic, it is still not an optimal solution.

 

The problem with this code is that every time a new item is added to the circular buffer, or every time a space becomes free, all the waiting threads recognize this and attempt to acquire the lock, even though only one thread can actually succeed. This is an example of the thundering herd problem, where a number of threads are waiting for a condition to become true but only one thread can successfully proceed, so all the other threads end up using unnecessary resources. This problem can be resolved by ordering the waiting threads such that only one thread is allowed to proceed; one possible ordering scheme is sketched below.
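One possible way to impose such an ordering, shown purely as an illustrative sketch rather than code from the book, is a ticket-style lock built from the CAS() primitive used in the listings above: each waiting thread takes a ticket and proceeds only when its number comes up, so the waiters are released one at a time in order. The names next_ticket, now_serving, lock_ticket(), and free_ticket() are assumptions introduced here.

/* Illustrative ticket lock sketch; assumes the CAS() and memory barrier
   primitives used earlier in the chapter. */
volatile int next_ticket = 0;      /* next ticket number to hand out */
volatile int now_serving = 0;      /* ticket number currently allowed to proceed */

void lock_ticket()
{
    int my_ticket;
    do                                          /* atomically take the next ticket */
    {
        my_ticket = next_ticket;
    } while ( CAS( &next_ticket, my_ticket, my_ticket + 1 ) != my_ticket );
    while ( now_serving != my_ticket ) {}       /* spin until it is this thread's turn */
    acquire_memory_barrier();
}

void free_ticket()
{
    release_memory_barrier();
    now_serving = now_serving + 1;              /* only the lock holder updates this */
}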

 

However, the problem is worse than this. All the threads that identify the opportunity to access the circular buffer will enter the if statement and can exit only after they have acquired and released the spinlock. So, these threads will end up spinning on the spinlock, which was not the intention of the code.

 

To remove this problem, we should change the code so that instead of spinning on the spinlock, the threads try to acquire it once and, if they fail, return to the outer loop to wait for the next opportunity to access the circular buffer. Listing 8.26 shows the modified code. It introduces a function called try_spinlock() that either acquires the spinlock and returns true or fails to acquire it and returns false. With this modification, the threads spin on the variable indicating the state of the circular buffer. Because the threads only read this shared variable while waiting, spinning on it generates little memory traffic. When the state changes, the threads attempt to get the spinlock. Only one thread will succeed, and that thread gets to access the buffer while the other threads go back to spinning on the shared variable. With this change, the spinlock has ceased to be used as a spinlock, since the threads spin before attempting to acquire the lock.

 

Listing 8.26  Avoiding Unnecessary Invalidations of Cache Lines Using try_spinlock()

int try_spinlock( volatile int* lock )
{
    if ( CAS( lock, 0, 1 ) == 1 )    /* lock already held: fail without spinning */
    {
        return 0;
    }
    else
    {
        acquire_memory_barrier();
        return 1;
    }
}

void addtobuffer( int value )
{
    int success = 0;
    while ( !success )
    {
        if ( buffer[ next(addhere) ] == 0 )
        {
            if ( try_spinlock( &lock ) )
            {
                if ( buffer[ next(addhere) ] == 0 )
                {
                    buffer[addhere] = value;
                    addhere = next(addhere);
                    success = 1;
                }
                free_spinlock( &lock );
            }
        }
    }
}

int removefrombuffer()
{
    int value;
    int success = 0;
    while ( !success )
    {
        if ( buffer[removehere] != 0 )
        {
            if ( try_spinlock( &lock ) )
            {
                if ( ( value = buffer[removehere] ) != 0 )
                {
                    buffer[removehere] = 0;
                    removehere = next(removehere);
                    success = 1;
                }
                free_spinlock( &lock );
            }
        }
    }
    return value;
}

 

A further improvement would be to rewrite the spinlock itself so that it has these semantics. Listing 8.27 shows this improvement to the spinlock code. The code spins until the lock has been acquired by the calling thread. On every iteration, the loop tests whether the lock is available; if it is, the code attempts to acquire it atomically. If the attempt succeeds, the code exits the loop having acquired the spinlock; if not, the loop continues to spin.

 

Listing 8.27   Reducing the Number of CAS Operations When Spinning

void lock_spinlock( volatile int* lock )
{
    int acquired = 0;
    while ( !acquired )
    {
        if ( ( *lock == 0 ) && ( CAS( lock, 0, 1 ) == 0 ) )
        {
            acquired = 1;
        }
    }
    acquire_memory_barrier();
}

void addtobuffer( int value )
{
    int success = 0;
    while ( !success )
    {
        if ( buffer[ next(addhere) ] == 0 )
        {
            lock_spinlock( &lock );
            if ( buffer[ next(addhere) ] == 0 )
            {
                buffer[addhere] = value;
                addhere = next(addhere);
                success = 1;
            }
            free_spinlock( &lock );
        }
    }
}

int removefrombuffer()
{
    int value;
    int success = 0;
    while ( !success )
    {
        if ( buffer[removehere] != 0 )
        {
            lock_spinlock( &lock );
            if ( ( value = buffer[removehere] ) != 0 )
            {
                buffer[removehere] = 0;
                removehere = next(removehere);
                success = 1;
            }
            free_spinlock( &lock );
        }
    }
    return value;
}
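To exercise any of these versions, a small pthreads test driver along the following lines could be used. This is only a sketch: the thread and iteration counts are arbitrary assumptions, and produced values start at 1 because a zero entry marks an empty slot in the buffer.

/* Hypothetical test driver; not part of the original listings. */
void * producer( void * param )
{
    for ( int i = 1; i <= 10000; i++ ) { addtobuffer( i ); }   /* nonzero values only */
    return 0;
}

void * consumer( void * param )
{
    for ( int i = 0; i < 10000; i++ ) { removefrombuffer(); }
    return 0;
}

int main()
{
    pthread_t threads[4];
    clearbuffer();
    for ( int i = 0; i < 2; i++ ) { pthread_create( &threads[i], 0, producer, 0 ); }
    for ( int i = 2; i < 4; i++ ) { pthread_create( &threads[i], 0, consumer, 0 ); }
    for ( int i = 0; i < 4; i++ ) { pthread_join( threads[i], 0 ); }
    printf( "All items produced and consumed\n" );
    return 0;
}

With two producers and two consumers each handling the same number of items, every thread eventually completes and the program terminates.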

