Producer-Consumer with a Circular Buffer
Consider the case where there is a producer-consumer pair of threads
that communicate through a circular buffer. It is possible to write code that
does not require locks or atomic operations to handle this situation. The code
shown in Listing 8.20 defines two functions: one to add an item to a circular
buffer and one to remove an item.
Listing 8.20 Adding and Removing Elements from a Circular List
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>

volatile int buffer[16];
volatile int addhere;
volatile int removehere;

void clearbuffer()
{
  addhere = 0;
  removehere = 0;
  for ( int i=0; i<16; i++ ) { buffer[i] = 0; }
}

int next( int current )
{
  return ( current+1 ) & 15;
}

void addtobuffer( int value )
{
  while( next(addhere) == removehere ) {}          // Spin waiting for room
  if ( buffer[addhere] != 0 )
  {
    printf( "Circular buffer error\n" );
    exit(1);
  }
  buffer[addhere] = value;                         // Add item to buffer
  addhere = next(addhere);                         // Move to next entry
}

int removefrombuffer()
{
  int value;
  while( ( value = buffer[removehere] ) == 0 ) {}  // Spin until something in buffer
  buffer[removehere] = 0;                          // Zero out element
  removehere = next(removehere);                   // Move to next entry
  return value;
}
The code works without memory barriers because one
thread is responsible for adding elements to the array and the other thread is
responsible for removing elements from the array.
There are actually two implicit constraints that
ensure that the code works. One constraint is that stores and loads are
themselves atomic. The other constraint is that stores do not become reordered;
that is, they become visible to the other thread in the order in which the
program issued them.
Hardware ensures that a correctly aligned load
cannot get half of its data from the old value at the address and half from the
new value stored at that address. If this were to happen, there would be a
problem when returning the value stored in the array. Imagine that either the
store or the load was nonatomic. If an element was transitioning from holding
the value zero to holding a nonzero value, then a nonatomic load might get the
old upper half of the value (which would be zero) and the new lower half of the
value (which would be nonzero). The resulting value would be incorrect.
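To make the failure mode concrete, the following sketch simulates, in a single thread, what a torn 32-bit load would return. The helper torn_load is hypothetical and exists only for illustration; it is not something hardware exposes, and correctly aligned loads do not behave this way.

```c
#include <stdint.h>

/* Hypothetical helper: simulate a non-atomic 32-bit load that observes
   the upper 16 bits of the old value and the lower 16 bits of the new one. */
uint32_t torn_load( uint32_t old_value, uint32_t new_value )
{
  uint32_t upper = old_value & 0xFFFF0000u;  /* half read before the store */
  uint32_t lower = new_value & 0x0000FFFFu;  /* half read after the store  */
  return upper | lower;
}
```

If a slot changes from 0 to 0x00012345, torn_load( 0, 0x00012345 ) yields 0x00002345: a nonzero value that the consumer would accept even though the producer never stored it.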
The requirement for ordering stores comes from the
code that removes elements from the array. The location containing the element
to be removed is zeroed out, and then the index of the next element to be
removed (removehere) is advanced to the next location in the array. If these
actions became visible to the producer thread in the wrong order, the producer
thread would see that the index had been advanced before it saw the zero. This
would allow it to enter the code that adds a new element. However, the first
test this code performs is to check that the new location is really empty. If
the store of zero to the released array position was delayed, this location
would still contain the old value, and the code would exit with an error. The
check is “logically” unnecessary but validates that the code is behaving
correctly.
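Where the hardware or compiler does not guarantee this store ordering, the requirement can be stated explicitly. The following is a minimal sketch, not the method used in the listing, that expresses the same two operations with C11 <stdatomic.h> release and acquire operations. The names slots, add_index, remove_index, put, and get are illustrative, and the sketch assumes exactly one producer thread and one consumer thread.

```c
#include <stdatomic.h>

#define SLOTS 16                     /* must be a power of two */

static atomic_int slots[SLOTS];      /* 0 means "empty" */
static atomic_int add_index;         /* written only by the producer */
static atomic_int remove_index;      /* written only by the consumer */

static int next_index( int current ) { return ( current + 1 ) & ( SLOTS - 1 ); }

void put( int value )                /* producer side; value must be nonzero */
{
  int add = atomic_load_explicit( &add_index, memory_order_relaxed );
  /* Acquire pairs with the consumer's release store of remove_index, so
     the zeroing of a freed slot is visible once the index is seen to move. */
  while ( next_index( add ) ==
          atomic_load_explicit( &remove_index, memory_order_acquire ) ) {}
  atomic_store_explicit( &slots[add], value, memory_order_release );
  atomic_store_explicit( &add_index, next_index( add ), memory_order_relaxed );
}

int get( void )                      /* consumer side */
{
  int rem = atomic_load_explicit( &remove_index, memory_order_relaxed );
  int value;
  while ( ( value = atomic_load_explicit( &slots[rem],
                                          memory_order_acquire ) ) == 0 ) {}
  atomic_store_explicit( &slots[rem], 0, memory_order_relaxed );
  /* Release: the zero store above becomes visible before the new index. */
  atomic_store_explicit( &remove_index, next_index( rem ), memory_order_release );
  return value;
}
```

The release store of the index after the zero store is the explicit form of the constraint described above: a producer that observes the advanced index is guaranteed to also observe the emptied slot.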
Several characteristics of the algorithm enable it
to work correctly. There are two pointers that point to locations in the
array. Only one thread is responsible for updating each of these variables. The
other thread only reads the variable. As long as the reading thread sees the
updates to memory in the correct order and each variable is updated atomically,
the thread will continue to wait until the data is ready and only then read
that data.
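Because each index has exactly one writer, either thread can derive the buffer's occupancy from the two indexes alone. The following is a small sketch, assuming the 16-entry buffer and the & 15 wraparound used in Listing 8.20; the helper names entries_used, is_empty, and is_full are illustrative and not part of the original code. Any result is only a snapshot, since the other thread may move its index immediately afterward.

```c
#define BUFFER_SIZE 16   /* must be a power of two for the mask to work */

/* Number of occupied entries, given the two indexes. */
int entries_used( int addhere, int removehere )
{
  return ( addhere - removehere ) & ( BUFFER_SIZE - 1 );
}

/* The buffer is empty when both indexes refer to the same entry. */
int is_empty( int addhere, int removehere )
{
  return addhere == removehere;
}

/* Listing 8.20 treats the buffer as full when advancing addhere would
   make it equal to removehere, so at most 15 entries are ever in use. */
int is_full( int addhere, int removehere )
{
  return ( ( addhere + 1 ) & ( BUFFER_SIZE - 1 ) ) == removehere;
}
```

For instance, with addhere at 2 and removehere at 14, the occupied entries are 14, 15, 0, and 1, and entries_used returns 4.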
A subtle characteristic of the code is that the updates of the pointers
into the array are carried out by a function call. The function call acts as a
compiler memory barrier and forces the compiler to store variables back to
memory. Otherwise, there would be the risk that the compiler might reorder the
store operations.
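Where no function call separates the stores, the same compiler-level constraint can be requested directly. The sketch below uses C11's atomic_signal_fence, which restricts reordering by the compiler without emitting a hardware fence instruction. The names slot, index_var, and release_slot are hypothetical, and this is one possible approach rather than what the listing does.

```c
#include <stdatomic.h>

int slot;        /* stands in for one buffer entry */
int index_var;   /* stands in for removehere       */

void release_slot( void )
{
  slot = 0;                                     /* zero out the element */
  /* Compiler-only barrier: the store above may not be moved below it. */
  atomic_signal_fence( memory_order_seq_cst );
  index_var = index_var + 1;                    /* then advance the index */
}
```

This only constrains the compiler. On hardware that can reorder stores, a fence that also affects the processor would still be needed.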
The handling of the array that forms the basis for
communication between the two threads is also of concern. There is only one
thread responsible for reading from the array and one thread responsible for
writing to it. In fact, the code could be simplified so that the act of reading
and writing the array was the synchronization mechanism. Listing 8.21 shows
this modification. In the modified code, the application adds an entry into the
buffer only if there is a space. The two pointers are entirely independent.
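The "only if there is a space" test can be seen in isolation in a variant that does not spin. This sketch is an illustration under stated assumptions, not part of the original listings: try_add and try_remove are hypothetical helpers that return 0 instead of waiting, and the slot test mirrors the one in Listing 8.21. Because zero marks an empty slot, the value 0 itself cannot be sent through the buffer.

```c
volatile int buffer[16];
volatile int addhere;
volatile int removehere;

static int next( int current ) { return ( current + 1 ) & 15; }

/* Returns 1 if the value was stored, 0 if there was no space.
   value must be nonzero because 0 marks an empty slot. */
int try_add( int value )
{
  if ( buffer[ next(addhere) ] != 0 ) { return 0; }  /* next slot still in use */
  buffer[addhere] = value;
  addhere = next(addhere);
  return 1;
}

/* Returns the removed value, or 0 if the buffer was empty. */
int try_remove( void )
{
  int value = buffer[removehere];
  if ( value == 0 ) { return 0; }                    /* nothing to remove */
  buffer[removehere] = 0;                            /* mark slot empty   */
  removehere = next(removehere);
  return value;
}
```

Starting from an empty buffer, fifteen consecutive calls to try_add succeed and the sixteenth returns 0, because the test on the following slot always leaves one entry unused.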
Listing 8.21 Using Reads and Writes to Coordinate Thread Activity
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>

volatile int buffer[16];
volatile int addhere;
volatile int removehere;

void clearbuffer()
{
  addhere = 0;
  removehere = 0;
  for ( int i=0; i<16; i++ ) { buffer[i] = 0; }
}

int next( int current )
{
  return ( current + 1 ) & 15;
}

void addtobuffer( int value )
{
  while( buffer[ next(addhere) ] != 0 ) {}
  buffer[addhere] = value;
  addhere = next(addhere);
}

int removefrombuffer()
{
  int value;
  while( ( value = buffer[removehere] ) == 0 ) {}
  buffer[removehere] = 0;
  removehere = next(removehere);
  return value;
}
The code in Listing 8.22 provides the remainder of the test program. It
creates two threads and sets one up as the producer and the other as the
consumer. Both threads run until just under 10 million elements (the values 1
through 9,999,999) have been passed from the producer to the consumer.
Listing 8.22 Code to Set Up Producer and Consumer Thread
void * producer( void *param )
{
  for( int i=1; i<10000000; i++ )
  {
    addtobuffer( i );
  }
  return 0;
}

void * consumer( void *param )
{
  while ( removefrombuffer() != 9999999 ) {}  // Remove items until the last value arrives
  return 0;
}

int main()
{
  clearbuffer();
  pthread_t threads[2];
  pthread_create( &threads[0], 0, producer, 0 );
  pthread_create( &threads[1], 0, consumer, 0 );
  pthread_join( threads[1], 0 );
  pthread_join( threads[0], 0 );
}