Home | | Multi - Core Architectures and Programming | Producers and Consumers

Chapter: An Introduction to Parallel Programming : Shared-Memory Programming with OpenMP

Producers and Consumers

Producers and Consumers: 1. Queues 2. Message-passing 3. Sending messages 4. Receiving messages 5. Termination detection 6. Startup 7. The atomic directive 8. Critical sections and locks 9. Using locks in the message-passing program 10. critical directives, atomic directives or locks? 11. Some caveats

PRODUCERS AND CONSUMERS

Let’s take a look at a parallel problem that isn’t amenable to parallelization using a parallel for or for directive.

 

1. Queues

Recall that a queue is a list abstract datatype in which new elements are inserted at the “rear” of the queue and elements are removed from the “front” of the queue.

A queue can thus be viewed as an abstraction of a line of customers waiting to pay for their groceries in a supermarket. The elements of the list are the customers. New customers go to the end or “rear” of the line, and the next customer to check out is the customer standing at the “front” of the line.

 

When a new entry is added to the rear of a queue, we sometimes say that the entry has been “enqueued,” and when an entry is removed from the front of a queue, we sometimes say that the entry has been “dequeued.”

 

Queues occur frequently in computer science. For example, if we have a number of processes, each of which wants to store some data on a hard drive, then a natural way to insure that only one process writes to the disk at a time is to have the processes form a queue, that is, the first process that wants to write gets access to the drive first, the second process gets access to the drive next, and so on.

 

A queue is also a natural data structure to use in many multithreaded appli-cations. For example, suppose we have several “producer” threads and several “consumer” threads. The producer threads might “produce” requests for data from a server—for example, current stock prices—while the consumer threads might “consume” the request by finding or generating the requested data—the current stock prices. The producer threads could enqueue the requested prices, and the consumer threads could dequeue them. In this example, the process wouldn’t be completed until the consumer threads had given the requested data to the producer threads.

 

2. Message-passing

 

Another natural application would be implementing message-passing on a shared-memory system. Each thread could have a shared message queue, and when one thread wanted to “send a message” to another thread, it could enqueue the message in the destination thread’s queue. A thread could receive a message by dequeuing the message at the head of its message queue.

 

Let’s implement a relatively simple message-passing program in which each thread generates random integer “messages” and random destinations for the mes-sages. After creating the message, the thread enqueues the message in the appropriate message queue. After sending a message, a thread checks its queue to see if it has received a message. If it has, it dequeues the first message in its queue and prints it out. Each thread alternates between sending and trying to receive messages. We’ll let the user specify the number of messages each thread should send. When a thread is done sending messages, it receives messages until all the threads are done, at which point all the threads quit. Pseudocode for each thread might look something like this:

 

for (sent_msgs = 0; sent_msgs < send_max; sent_msgs++) { Send_msg();

 

Try_receive();

 

}

 

while (!Done())

 

Try_receive();

 

3. Sending messages

 

Note that accessing a message queue to enqueue a message is probably a critical section. Although we haven’t looked into the details of the implementation of the message queue, it seems likely that we’ll want to have a variable that keeps track of the rear of the queue. For example, if we use a singly linked list with the tail of the list corresponding to the rear of the queue, then, in order to efficiently enqueue, we would want to store a pointer to the rear. When we enqueue a new message, we’ll need to check and update the rear pointer. If two threads try to do this simultaneously, we may lose a message that has been enqueued by one of the threads. (It might help to draw a picture!) The results of the two operations will conflict, and hence enqueueing a message will form a critical section.

 

Pseudocode for the Send_msg() function might look something like this:

mesg = random();

 

dest = random() % thread count;

 

#        pragma omp critical

 

Enqueue(queue, dest, my_rank, mesg);

 

Note that this allows a thread to send a message to itself.

 

4. Receiving messages

 

The synchronization issues for receiving a message are a little different. Only the owner of the queue (that is, the destination thread) will dequeue from a given message queue. As long as we dequeue one message at a time, if there are at least two messages in the queue, a call to Dequeue can’t possibly conflict with any calls to Enqueue, so if we keep track of the size of the queue, we can avoid any synchronization (for example, critical directives), as long as there are at least two messages.

 

Now you may be thinking, “What about the variable storing the size of the queue?” This would be a problem if we simply store the size of the queue. However, if we store two variables, enqueued and dequeued, then the number of messages in the queue is

 

 

Queue_size = enqueued – dequeued

and the only thread that will update dequeued is the owner of the queue. Observe that one thread can update enqueued at the same time that another thread is using it to compute queue_size. To see this, let’s suppose thread q is computing queue_size. It will either get the old value of enqueued or the new value. It may therefore compute a queue_size of 0 or 1 when queue_size should actually be 1 or 2, respectively, but in our program this will only cause a modest delay. Thread q will try again later if queue_size is 0 when it should be 1, and it will execute the critical section directive unnecessarily if queue size is 1 when it should be 2.

 

Thus, we can implement Try_receive as follows:

queue_size = enqueued- dequeued;

if (queue_size == 0) return;

else if (queue_size == 1)

          # pragma omp critical

Dequeue(queue, &src, &mesg);

else

Dequeue(queue, &src, &mesg);

Print_message(src, mesg);

 

 

5. Termination detection

 

We also need to think about implementation of the Done function. First note that the following “obvious” implementation will have problems:

queue_size = enqueued-dequeued;

if (queue_size == 0)

return TRUE;

else

return FALSE;

If thread u executes this code, it’s entirely possible that some thread—call it thread v—will send a message to thread u after u has computed queue_size = 0. Of course, after thread u computes queue_size = 0, it will terminate and the message sent by thread v will never be received.

However, in our program, after each thread has completed the for loop, it won’t send any new messages. Thus, if we add a counter done sending, and each thread increments this after completing its for loop, then we can implement Done as follows:

queue size = enqueued - dequeued;

if (queue_size == 0 && done_sending == thread_count)

return TRUE;

else

 

return FALSE;

 

6. Startup

 

When the program begins execution, a single thread, the master thread, will get command-line arguments and allocate an array of message queues, one for each thread. This array needs to be shared among the threads, since any thread can send to any other thread, and hence any thread can enqueue a message in any of the queues. Given that a message queue will (at a minimum) store

.. a list of messages,

. a pointer or index to the rear of the queue,

. a pointer or index to the front of the queue,

. a count of messages enqueued, and a count of messages dequeued,

 

it makes sense to store the queue in a struct, and in order to reduce the amount of copying when passing arguments, it also makes sense to make the message queue an array of pointers to structs. Thus, once the array of queues is allocated by the master thread, we can start the threads using a parallel directive, and each thread can allocate storage for its individual queue.

 

An important point here is that one or more threads may finish allocating their queues before some other threads. If this happens, the threads that finish first could start trying to enqueue messages in a queue that hasn’t been allocated and cause the program to crash. We therefore need to make sure that none of the threads starts sending messages until all the queues are allocated. Recall that we’ve seen that sev-eral OpenMP directives provide implicit barriers when they’re completed, that is, no thread will proceed past the end of the block until all the threads in the team have completed the block. In this case, though, we’ll be in the middle of a parallel block, so we can’t rely on an implicit barrier from some other OpenMP construct—we need an explicit barrier. Fortunately, OpenMP provides one:

 

# pragma omp barrier

 

When a thread encounters the barrier, it blocks until all the threads in the team have reached the barrier. After all the threads have reached the barrier, all the threads in the team can proceed.

 

7. The atomic directive

 

After completing its sends, each thread increments done sending before proceeding to its final loop of receives. Clearly, incrementing done sending is a critical section, and we could protect it with a critical directive. However, OpenMP provides a potentially higher performance directive: the atomic directive:

# pragma omp atomic

 

Unlike the critical directive, it can only protect critical sections that consist of a single C assignment statement. Further, the statement must have one of the following forms:

 

x <op>= <expression>; x++;

 

++x;

 

x-- ;

 --x;

 

Here <op> can be one of the binary operators

 

+,       *, -, /, &, ˆ, | , <<, or >>.

 

It’s also important to remember that <expression> must not reference x.

 

It should be noted that only the load and store of x are guaranteed to be protected. For example, in the code

 

pragma omp atomic

x+= y++;

a thread’s update to x will be completed before any other thread can begin updat-ing x. However, the update to y may be unprotected and the results may be unpredictable.

 

The idea behind the atomic directive is that many processors provide a special load-modify-store instruction, and a critical section that only does a load-modify-store can be protected much more efficiently by using this special instruc-tion rather than the constructs that are used to protect more general critical sections.

 

8. Critical sections and locks

 

To finish our discussion of the message-passing program, we need to take a more careful look at OpenMP’s specification of the critical directive. In our earlier examples, our programs had at most one critical section, and the critical directive forced mutually exclusive access to the section by all the threads. In this program, however, the use of critical sections is more complex. If we simply look at the source code, we’ll see three blocks of code preceded by a critical or an atomic directive:

. done sending++;

. Enqueue(q_p, my_rank, mesg);

. Dequeue(q_p, &src, &mesg);

However, we don’t need to enforce exclusive access across all three of these blocks of code. We don’t even need to enforce completely exclusive access within the second and third blocks. For example, it would be fine for, say, thread 0 to enqueue a message in thread 1’s queue at the same time that thread 1 is enqueuing a message in thread 2’s queue. But for the second and third blocks—the blocks protected by critical directives—this is exactly what OpenMP does. From OpenMP’s point of view our program has two distinct critical sections: the critical section protected by the atomic directive, done sending++, and the “composite” critical section in which we enqueue and dequeue messages.

Since enforcing mutual exclusion among threads serializes execution, this default behavior of OpenMP—treating all critical blocks as part of one composite critical section—can be highly detrimental to our program’s performance. OpenMP does provide the option of adding a name to a critical directive:

 

# pragma omp critical(name)

 

When we do this, two blocks protected with critical directives with different names can be executed simultaneously. However, the names are set during compi-lation, and we want a different critical section for each thread’s queue. Therefore, we need to set the names at run-time, and in our setting, when we want to allow simul-taneous access to the same block of code by threads accessing different queues, the named critical directive isn’t sufficient.

The alternative is to use locks.4 A lock consists of a data structure and functions that allow the programmer to explicitly enforce mutual exclusion in a critical section. The use of a lock can be roughly described by the following pseudocode:

 

/*       Executed by one thread */

 

Initialize the lock data structure;

 

. . .

 

/*       Executed by multiple threads  */

 

Attempt to lock or set the lock data structure;

Critical section;

 

Unlock or unset the lock data structure;

 

. . .

 

/*       Executed by one thread */

 

Destroy the lock data structure;

 

The lock data structure is shared among the threads that will execute the critical section. One of the threads (e.g., the master thread) will initialize the lock, and when all the threads are done using the lock, one of the threads should destroy it.

 

Before a thread enters the critical section, it attempts to set or lock the lock data structure by calling the lock function. If no other thread is executing code in the critical section, it obtains the lock and proceeds into the critical section past the call to the lock function. When the thread finishes the code in the critical section, it calls an unlock function, which relinquishes or unsets the lock and allows another thread to obtain the lock.

 

While a thread owns the lock, no other thread can enter the critical section. If another thread attempts to enter the critical section, it will block when it calls the lock function. If multiple threads are blocked in a call to the lock function, then when the thread in the critical section relinquishes the lock, one of the blocked threads returns from the call to the lock, and the others remain blocked.

 

OpenMP has two types of locks: simple locks and nested locks. A simple lock can only be set once before it is unset, while a nested lock can be set multiple times by the same thread before it is unset. The type of an OpenMP simple lock is omp lock t, and the simple lock functions that we’ll be using are

void omp_init_lock(omp_lock_t*     lock p         /*  out  */);

void omp_set_lock(omp_lock_t*     lock_p        /*  in/out  */);

void omp_unset_lock(omp_lock_t* lock_p        /*       in/out */);

void omp_destroy_lock(omp_lock_t*        lock_p        /*       in/out */);

The type and the functions are specified in omp.h. The first function initializes the lock so that it’s unlocked, that is, no thread owns the lock. The second function attempts to set the lock. If it succeeds, the calling thread proceeds; if it fails, the calling thread blocks until the lock becomes available. The third function unsets the lock so another thread can obtain it. The fourth function makes the lock uninitial-ized. We’ll only use simple locks. For information about nested locks, see [8, 10], or [42].

 

9. Using locks in the message-passing program

 

In our earlier discussion of the limitations of the critical directive, we saw that in the message-passing program, we wanted to insure mutual exclusion in each individ-ual message queue, not in a particular block of source code. Locks allow us to do this. If we include a data member with type omp lock t in our queue struct, we can simply call omp set lock each time we want to insure exclusive access to a message queue. So the code

 

          pragma omp critical

 

/*  q p = msg_queues[dest]  */

 

Enqueue(q p, my_rank, mesg);

 

can be replaced with

 

/         q p = msg_queues[dest] /

 

omp_set_lock(&q p >lock);

Enqueue(q_p, my_rank, mesg);

omp_unset_lock(&q_p >lock);

 

Similarly, the code

 

#        pragma omp critical

 

/*  q_p = msg_queues[my_rank]  */

 

Dequeue(q_p, &src, &mesg);

 

can be replaced with

 

/*       q_p = msg_queues[my_rank]  */

 

omp_set_lock(&q_p->lock);

Dequeue(q_p, &src, &mesg);

 omp_unset_lock(&q_p ->lock);

 

Now when a thread tries to send or receive a message, it can only be blocked by a thread attempting to access the same message queue, since different message queues have different locks. In our original implementation, only one thread could send at a time, regardless of the destination.

 

Note that it would also be possible to put the calls to the lock functions in the queue functions Enqueue and Dequeue. However, in order to preserve the perfor-mance of Dequeue, we would also need to move the code that determines the size of the queue (enqueueddequeued) to Dequeue. Without it, the Dequeue function will lock the queue every time it is called by Try receive. In the interest of preserving the structure of the code we’ve already written, we’ll leave the calls to omp_set_lock and omp_unset_lock in the Send and Try_receive functions.

 

Since we’re now including the lock associated with a queue in the queue struct, we can add initialization of the lock to the function that initializes an empty queue.

Destruction of the lock can be done by the thread that owns the queue before it frees the queue.

 

10. critical directives, atomic directives, or locks?

 

Now that we have three mechanisms for enforcing mutual exclusion in a critical section, it’s natural to wonder when one method is preferable to another. In gen-eral, the atomic directive has the potential to be the fastest method of obtaining mutual exclusion. Thus, if your critical section consists of an assignment statement having the required form, it will probably perform at least as well with the atomic directive as the other methods. However, the OpenMP specification [42] allows the atomic directive to enforce mutual exclusion across all atomic directives in the program—this is the way the unnamed critical directive behaves. If this might be a problem—for example, you have multiple different critical sections protected by atomic directives—you should use named critical directives or locks. For exam-ple, suppose we have a program in which it’s possible that one thread will execute the code on the left while another executes the code on the right.

#        pragma omp atomic      

#        pragma omp atomic

x++;  y++;

Even if x and y are unrelated memory locations, it’s possible that if one thread is executing x++, then no thread can simultaneously execute y++. It’s important to note that the standard doesn’t require this behavior. If two statements are protected by atomic directives and the two statements modify different variables, then there are implementations that treat the two statements as different critical sections. See Exercise 5.10. On the other hand, different statements that modify the same vari-able will be treated as if they belong to the same critical section, regardless of the implementation.

 

We’ve already seen some limitations to the use of critical directives. However, both named and unnamed critical directives are very easy to use. Furthermore, in the implementations of OpenMP that we’ve used there doesn’t seem to be a very large difference between the performance of critical sections protected by a critical directive, and critical sections protected by locks, so if you can’t use an atomic directive, but you can use a critical directive, you probably should. Thus, the use of locks should probably be reserved for situations in which mutual exclusion is needed for a data structure rather than a block of code.

 

11. Some caveats

 

You should exercise caution when you’re using the mutual exclusion techniques we’ve discussed. They can definitely cause serious programming problems. Here are a few things to be aware of:

 

          1. You shouldn’t mix the different types of mutual exclusion for a single critical section. For example, suppose a program contains the following two segments.

           #    pragma omp atomic    

x += f(y);

 

#          pragma omp critical

x = g(x);

The update to x on the right doesn’t have the form required by the atomic direc-tive, so the programmer used a critical directive. However, the critical directive won’t exclude the action executed by the atomic block, and it’s pos-sible that the results will be incorrect. The programmer needs to either rewrite the function g so that its use can have the form required by the atomic directive, or she needs to protect both blocks with a critical directive.

 

2. There is no guarantee of fairness in mutual exclusion constructs. This means that it’s possible that a thread can be blocked forever in waiting for access to a critical section. For example, in the code

while(1) {

            . .

            pragma omp critical x = g(my_rank);

            . .

}

it’s possible that, for example, thread 1 can block forever waiting to execute x = g(my_rank), while the other threads repeatedly execute the assignment. Of course, this wouldn’t be an issue if the loop terminated. Also note that many implementations give threads access to the critical section in the order in which they reach it, and for these implementations, this won’t be an issue.

It can be dangerous to “nest” mutual exclusion constructs. As an example, suppose a program contains the following two segments.

            pragma omp critical y = f(x);

. . .

 

double f(double x) {

 

            pragma omp critical

z = g(x);  /  z is shared  /

 . .

}

This is guaranteed to deadlock. When a thread attempts to enter the second crit-ical section, it will block forever. If thread u is executing code in the first critical block, no thread can execute code in the second block. In particular, thread u can’t execute this code. However, if thread u is blocked waiting to enter the second critical block, then it will never leave the first, and it will stay blocked forever.

 

In this example, we can solve the problem by using named critical sections. That is, we could rewrite the code as

            pragma omp critical(one) y = f(x);

. . .

 

double f(double x) {

 

#          pragma omp critical(two)

z = g(x);  /*  z is global  */

 

. . .

 

 

}

However, it’s not difficult to come up with examples when naming won’t help. For example, if a program has two named critical sections—say one and two— and threads can attempt to enter the critical sections in different orders, then deadlock can occur. For example, suppose thread u enters one at the same time that thread v enters two and u then attempts to enter two while v attempts to enter one:


Then both u and v will block forever waiting to enter the critical sections. So it’s not enough to just use different names for the critical sections—the program-mer must insure that different critical sections are always entered in the same order.


Study Material, Lecturing Notes, Assignment, Reference, Wiki description explanation, brief detail
An Introduction to Parallel Programming : Shared-Memory Programming with OpenMP : Producers and Consumers |


Privacy Policy, Terms and Conditions, DMCA Policy and Compliant

Copyright © 2018-2024 BrainKart.com; All Rights Reserved. Developed by Therithal info, Chennai.