
Chapter: Multicore Application Programming For Windows, Linux, and Oracle Solaris : Other Parallelization Technologies

Clustering Technologies



Historically, one of the ways of gathering large numbers of processors together was to use a cluster of machines. An advantage of doing this was that it increased the available disk space, memory, and memory bandwidth as well as the number of CPUs. The capacity of a single multicore processor has increased such that work that previously required a small cluster will now fit onto a single system. It is therefore interesting to consider technologies that might be used in this domain.

MPI




Message Passing Interface (MPI) is a parallelization model that allows a single application to span multiple nodes. Each node is one or more software threads on a system, so it is quite permissible to host multiple nodes on the same system. Hence, a multicore system can host multiple MPI nodes.


Communication between the nodes is, as the name suggests, accomplished by passing messages between the nodes. The messages can be broadcast to all nodes or directed to a particular node. Since there is no addressable shared memory between the nodes, the communication costs depend on the size and frequency of the messages. Hence, the best scaling will be achieved if the application does minimal communication. A consequence of this is that MPI codes have to scale well by design. This means that they can often run well on a multicore or multiprocessor system so long as no other system resources, such as bandwidth or memory capacity, are exhausted.
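The dependence of cost on message size and frequency can be made concrete with a rough model. The sketch below is an illustration rather than anything MPI provides: it assumes each message pays a fixed latency plus its size divided by the link bandwidth. The function name comm_seconds() and its parameters are hypothetical.

```c
#include <assert.h>

/* Estimated seconds spent communicating, under an assumed linear model:
   each message pays a fixed latency plus its size divided by bandwidth. */
double comm_seconds( long messages, double bytes_per_message,
                     double latency_s, double bandwidth_bytes_per_s )
{
    assert( messages >= 0 && bandwidth_bytes_per_s > 0.0 );
    return (double)messages *
           ( latency_s + bytes_per_message / bandwidth_bytes_per_s );
}
```

With made-up figures of 50 microseconds latency and 1 GB/s bandwidth, sending 1,000 messages of 4,000 bytes each costs about 0.054 seconds under this model, whereas sending the same 4,000,000 bytes as one message costs about 0.004 seconds. Fewer, larger messages amortize the fixed latency.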


Each node executes the same application. The first action the application needs to take is to initialize the MPI library with a call to MPI_Init(); the final action it needs to take is to shut down the library with a call to MPI_Finalize(). Each individual MPI process will need to know both the total number of processes in the application and its own rank in that group of processes. The call to MPI_Comm_size() returns the total number of processes, and the call to MPI_Comm_rank() provides each process with its identifier. Listing 10.21 shows the basic framework for a typical MPI program.


Listing 10.21 A Bare-Bones MPI Program

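#include "mpi.h"

int main( int argc, char *argv[] )
{
  int numproc, myid;

  MPI_Init( &argc, &argv );                    // Start up the MPI library
  MPI_Comm_size( MPI_COMM_WORLD, &numproc );   // Total number of processes
  MPI_Comm_rank( MPI_COMM_WORLD, &myid );      // Rank of this process

  // The work of the application goes here

  MPI_Finalize();                              // Shut down the MPI library
  return 0;
}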



MPI provides a very rich API for communicating between processes. The simplest level of this is the ability to pass an array of data to another MPI process using a call to MPI_Send() and for that process to receive the data using a call to MPI_Recv(). Other calls exist to provide more intrinsic functionality such as reductions.


Since the communication overhead is often quite large, it makes most sense to use MPI to solve problems that are too large for a single system. With multicore processors, the size of the problem that can be tackled has become less dependent on the number of threads and more dependent on considerations such as the memory footprint of the data set or the amount of bandwidth of the system. This highlights an important motivator for MPI programs—system characteristics such as amount of bandwidth or amount of available memory scale with the number of systems. For a problem that is limited by the time it takes to stream through a large amount of data, two systems will provide twice as much bandwidth, potentially solving the problem twice as fast. Using two threads on a single system does not necessarily provide the same doubling of bandwidth and certainly does not double the memory capacity of the system.
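The arithmetic behind this argument can be sketched in a few lines. The helpers below are hypothetical, and they assume ideal conditions: the data set splits evenly, every system streams its share at the same bandwidth, and communication between systems is ignored.

```c
#include <assert.h>

/* Smallest number of systems whose combined memory holds the data set. */
int systems_needed( unsigned long long dataset_bytes,
                    unsigned long long memory_per_system_bytes )
{
    assert( memory_per_system_bytes > 0 );
    return (int)( ( dataset_bytes + memory_per_system_bytes - 1 )
                  / memory_per_system_bytes );
}

/* Seconds to stream the data set once, assuming it is split evenly and
   every system streams its share at the same bandwidth in parallel. */
double stream_seconds( double dataset_bytes, double bandwidth_per_system,
                       int systems )
{
    assert( systems > 0 && bandwidth_per_system > 0.0 );
    return dataset_bytes / ( bandwidth_per_system * systems );
}
```

For example, a 100 GB data set streamed at an assumed 10 GB/s per system takes about 10 seconds on one system and about 5 seconds on two, which is the doubling of bandwidth described above.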


High communication costs also cause a change in the approach taken to using MPI to solve problems. As we have previously discussed, the best scalability will result from codes that have the fewest and smallest amounts of communication. Tuning algorithms to remove or reduce the need for communication will result in better performance. It may be more efficient for an algorithm to use approximations to estimate the return values from other processes rather than waiting for those processes to return the exact answers, even if this approximation causes the algorithm to iterate longer before terminating.


To illustrate the use of MPI, we’ll use it to produce a larger image of the Mandelbrot set. The algorithm that we are going to use is a single master process and a number of worker processes. The worker processes compute a line of the Mandelbrot set at a time and pass this to the master process. Listing 10.22 shows the code necessary to compute a line of the Mandelbrot set.


Listing 10.22 Computing a Row of the Mandelbrot Set


#include "mpi.h"
#include <stdlib.h>

#define COLS 1000
#define ROWS 1000

// Return the number of iterations before the point (ix, iy) escapes,
// capped at 1000 iterations
int inset( double ix, double iy )
{
  int iterations = 0;
  double x = ix, y = iy, x2 = x*x, y2 = y*y;
  while ( ( x2 + y2 < 4 ) && ( iterations < 1000 ) )
  {
    y = 2 * x * y + iy;
    x = x2 - y2 + ix;
    x2 = x * x;
    y2 = y * y;
    iterations++;
  }
  return iterations;
}

// Fill data[0] to data[COLS-1] with the iteration counts for one row
void computerow( int index, int * data )
{
  double y = -1.5 + (double)index / ROWS;
  for( int c=0; c<COLS; c++ )
  {
    double x = -1.5 + (double)c / COLS;
    data[c] = inset( x, y );
  }
}



Listing 10.23 shows the code for the master process. The master process needs to set up an array large enough to store all the results from the worker processes. The master process computes the zeroth row of the output and then waits for all subsequent rows to be calculated by the other processes. The MPI_Recv() call takes parameters that give the array where the data is to be stored, the number of elements expected, and their type. The call also takes parameters that identify the process the data is to come from and a tag, in this case the row number, that identifies the data being sent. The final parameters are the communicator, which defines the set of processes included in the communication, and a variable to hold the status.


Listing 10.23 The Master Thread Accumulates All the Results from the Worker Threads


int numproc, myid;

void masterprocess()
{
  int ** data;
  MPI_Status status;

  data = malloc( sizeof(int*) * ROWS );   // Allocate memory to store data
  data[0] = malloc( sizeof(int) * COLS );
  computerow( 0, data[0] );               // Compute row zero

  // Requires at least one worker process (numproc >= 2)
  for( int currentrow = 1; currentrow < ROWS; currentrow++ )
  {
    data[currentrow] = malloc( sizeof(int) * COLS );
    // Worker k sends rows k, k+(numproc-1), ..., so this is the sender's rank
    int process = ( currentrow - 1 ) % ( numproc - 1 ) + 1;
    MPI_Recv( data[currentrow], COLS, MPI_INT, process, currentrow,
              MPI_COMM_WORLD, &status );
  }
}





The worker processes run the code shown in Listing 10.24. Each worker starts at the row that matches its rank, computes every Nth row of the matrix after that, where N is the number of worker processes (numproc - 1), and passes each row to the master process. The main routine is also shown in this listing. This routine needs to do the normal setup and tear-down of the MPI library and then decide whether the process is the master or one of the workers.


Listing 10.24 MPI Worker Thread and Setup Code


void workerprocess()
{
  int * data, row;

  data = malloc( sizeof(int) * COLS );
  row = myid;                    // First row matches the rank of this worker
  while ( row < ROWS )           // Valid rows are 0 to ROWS-1
  {
    computerow( row, data );
    MPI_Send( data, COLS, MPI_INT, 0, row, MPI_COMM_WORLD );
    row += numproc-1;            // Step by the number of workers
  }
  free( data );
}

int main( int argc, char *argv[] )
{
  MPI_Init( &argc, &argv );
  MPI_Comm_size( MPI_COMM_WORLD, &numproc );
  MPI_Comm_rank( MPI_COMM_WORLD, &myid );
  if ( myid == 0 ) { masterprocess(); } else { workerprocess(); }
  MPI_Finalize();
  return 0;
}
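Because the master names a specific sender and tag in each blocking MPI_Recv(), the rank it expects for a row must be the rank that actually sends that row. With each worker starting at the row equal to its rank and stepping by numproc-1, that rank is (row - 1) % (numproc - 1) + 1. The following sketch checks this arithmetic without using MPI at all; the helper names owner_of_row() and check_distribution() are hypothetical.

```c
#include <assert.h>

/* Rank of the worker that computes a given row (row >= 1), assuming
   worker k starts at row k and advances by numproc-1 rows each time. */
int owner_of_row( int row, int numproc )
{
    assert( row >= 1 && numproc >= 2 );
    return ( row - 1 ) % ( numproc - 1 ) + 1;
}

/* Walk every worker's sequence of rows and confirm that owner_of_row()
   agrees. Returns the number of rows visited, or -1 on a mismatch. */
int check_distribution( int rows, int numproc )
{
    int visited = 0;
    for ( int id = 1; id < numproc; id++ )
    {
        for ( int row = id; row < rows; row += numproc - 1 )
        {
            if ( owner_of_row( row, numproc ) != id ) { return -1; }
            visited++;
        }
    }
    return visited;
}
```

For 1,000 rows and four processes, check_distribution( 1000, 4 ) visits rows 1 to 999 exactly once each, so every row the master waits for has exactly one sender. A mismatch between the two formulas would leave the master blocked on a message that never arrives.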



The program as written achieves little parallelism. This is for a couple of reasons. The first is that the send and receive operations are blocking. The sending processes cannot make progress until the receiving process receives the data. The receiving process compounds this by looking for the data in order.


The code could be modified to use a nonblocking send operation so that the worker processes could overlap communication with computation. Another improvement to the code would be for the master process to accept data from the workers in any order, rather than requiring the data to be received in order.


With the increase in the number of available cores, it has become appropriate to combine MPI for parallelization across nodes with OpenMP for parallelization within a node. This allows an application to use coarse-grained parallelism to scale to large numbers of nodes and then a finer degree of parallelism to scale to large numbers of cores.


MapReduce as a Strategy for Scaling


MapReduce is an algorithm that is well suited to clusters because it requires little communication, and it is a relatively flexible approach so long as there are multiple parallel queries to be completed. Implementations are available in various programming languages.


The MapReduce algorithm has two steps. The map step takes a problem and distributes that problem across multiple nodes. The reduce step takes the results from those nodes and combines them to provide a final result.


MapReduce works best when there is an operation to be performed on multiple sets of input data. For example, consider computing the frequency of the use of all words over a range of documents. A single thread would compute this task by reading each document and then using a map to increment the value of a counter for each word encountered in the document.


The MapReduce version of the code would perform the following two steps. In the map phase, the master thread would assign each worker thread to perform the word count on a particular document. In the reduce step, each worker thread would return the list of word frequencies for its particular document, and the master thread would combine all these into a single set of results.
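The two steps can be sketched in plain C. This is an illustration under simplifying assumptions, not the API of any MapReduce library: the map calls run one after another in a single process instead of in parallel on separate nodes, the word table is a small fixed-size array, and all names (WordCounts, map_document(), reduce_counts(), and so on) are hypothetical.

```c
#include <assert.h>
#include <ctype.h>
#include <string.h>

#define MAX_WORDS 64   /* Distinct words per table (sketch-sized limit) */
#define MAX_LEN   32   /* Longer words are truncated */

typedef struct {
    char word[MAX_WORDS][MAX_LEN];
    int  count[MAX_WORDS];
    int  used;
} WordCounts;

/* Add n occurrences of word to the table, using a linear search. */
void add_word( WordCounts *wc, const char *word, int n )
{
    for ( int i = 0; i < wc->used; i++ )
    {
        if ( strcmp( wc->word[i], word ) == 0 ) { wc->count[i] += n; return; }
    }
    assert( wc->used < MAX_WORDS );
    strncpy( wc->word[wc->used], word, MAX_LEN - 1 );
    wc->word[wc->used][MAX_LEN - 1] = '\0';
    wc->count[wc->used] = n;
    wc->used++;
}

/* Map step: one worker counts the words in one document. */
void map_document( const char *text, WordCounts *out )
{
    char word[MAX_LEN];
    int len = 0;
    out->used = 0;
    for ( const char *p = text; ; p++ )
    {
        if ( isalpha( (unsigned char)*p ) )
        {
            if ( len < MAX_LEN - 1 )
            {
                word[len++] = (char)tolower( (unsigned char)*p );
            }
        }
        else
        {
            /* A non-letter (or the end of the text) finishes the word */
            if ( len > 0 ) { word[len] = '\0'; add_word( out, word, 1 ); len = 0; }
            if ( *p == '\0' ) { break; }
        }
    }
}

/* Reduce step: the master folds one worker's counts into the total. */
void reduce_counts( WordCounts *total, const WordCounts *part )
{
    for ( int i = 0; i < part->used; i++ )
    {
        add_word( total, part->word[i], part->count[i] );
    }
}

/* Look up the count for a word; 0 if the word was never seen. */
int count_of( const WordCounts *wc, const char *word )
{
    for ( int i = 0; i < wc->used; i++ )
    {
        if ( strcmp( wc->word[i], word ) == 0 ) { return wc->count[i]; }
    }
    return 0;
}

/* Master: map every document, reduce the results, report one word. */
int count_across_documents( const char *docs[], int ndocs, const char *word )
{
    WordCounts total = { .used = 0 };
    for ( int d = 0; d < ndocs; d++ )
    {
        WordCounts part;                 /* Would be produced on a worker */
        map_document( docs[d], &part );
        reduce_counts( &total, &part );
    }
    return count_of( &total, word );
}
```

Each call to map_document() depends only on its own document, which is what allows the map phase to be spread across nodes with no communication until the results are returned.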


From the description, it should be obvious that the algorithm could be implemented using MPI. In fact, the only difference between this and the code to compute the Mandelbrot set is that the MPI master thread did not need to tell the worker threads what to compute since this information was provided in the source code.


What makes MapReduce different from an algorithm implemented in MPI? There are two major differences.


First, MPI is a general-purpose framework for the composition of parallel problems, whereas MapReduce is an approach for a specific kind of problem. So, it may be possible to implement MapReduce-type problems in MPI, but it is not generally possible to implement all MPI problems using a MapReduce framework.


The second difference is that MPI is designed to work on a known grid of computers. MPI relies on the fact that there is a known node performing the work and that this node will at some point return the results of the computation. MapReduce is built on the basis that the nodes performing the work may not be reliable. If a node does not provide the result within a suitable time period, the master node can restart the query on a different node.


There are two characteristics of the MapReduce work that enable this restart to happen. The first is that the master node explicitly requests work from the worker nodes. So if the worker node fails, the master node can make the same request of another worker node. The second characteristic is that the nodes do not contain state. So if a worker node fails, the master node can restart from a known point and not have the problem that one of the other nodes is already using partial answers from the failed node.
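These two characteristics can be illustrated with a small single-process sketch. It is a simplification: a failed node is modeled as a function that returns a nonzero code, whereas a real implementation would typically detect failure through a timeout. The worker_fn type and all function names are hypothetical.

```c
#include <assert.h>

/* A stateless worker: everything it needs arrives in the request.
   Returns 0 and writes *result on success, nonzero on failure. */
typedef int (*worker_fn)( int task, int *result );

/* Master: issue the same request to each worker in turn until one
   succeeds. Returns the index of that worker, or -1 if all fail. */
int run_with_restart( int task, worker_fn workers[], int nworkers,
                      int *result )
{
    for ( int w = 0; w < nworkers; w++ )
    {
        if ( workers[w]( task, result ) == 0 ) { return w; }
    }
    return -1;
}

/* Stand-in for a node that never returns a usable answer. */
int failing_worker( int task, int *result )
{
    (void)task;
    (void)result;
    return 1;
}

/* Stand-in for a healthy node; the "work" is squaring the task value. */
int square_worker( int task, int *result )
{
    *result = task * task;
    return 0;
}
```

Because the request carries the whole task and the workers keep nothing between requests, the master can reissue the request to the second worker without cleaning up after the first.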


MapReduce is usually implemented as a library, such as the Hadoop implementation. This library provides the functionality for distributing the work across multiple systems, restarting the work if one of the nodes fails, and so on. The advantage of providing this as a library is that improvements to the library code will be felt by all users of the library.

Grids




Possibly the simplest approach to utilizing multiple compute threads is to use a grid. A grid is a collection of nodes. A task can be dispatched to the grid, and at some point the results of that task will be returned to the user.


Conceptually, a grid is the task management aspect of MapReduce without the “map” and “reduce” algorithms for performing computation. A grid will typically provide much greater node management functionality than a MapReduce implementation. Node management might extend to powering down idle nodes or allocating clusters of work to co-located nodes. So, the two approaches will be broadly complementary in nature.

