Clustering Technologies
Historically, one of the ways of gathering large numbers of processors together was to use a cluster of machines. An advantage of doing this was that it increased the available disk space, memory, and memory bandwidth as well as the number of CPUs. The capacity of a single multicore processor has increased such that work that previously required a small cluster will now fit onto a single system. It is therefore interesting to consider technologies that might be used in this domain.
MPI
Message Passing Interface (MPI) is a parallelization model that allows a
single application to span over multiple nodes.
Each node is one or more software threads on a system, so it is quite permissible to host multiple nodes on the same system. Hence, a multicore system can host multiple MPI nodes.
Communication between the nodes is, as the name
suggests, accomplished by passing messages between the nodes. The messages can
be broadcast to all nodes or directed to a particular node. Since there is no
addressable shared memory between the nodes, the communication costs depend on
the size and frequency of the messages. Hence, the best scaling will be
achieved if the application does minimal communication. A consequence of this
is that MPI codes have to scale well by design. This means that they can often
run well on a multicore or multiprocessor system so long as no other system
resources, such as bandwidth or memory capacity, are exhausted.
Each node executes the same application. The first
action the application needs to take is to initialize the MPI library with a
call to MPI_Init(); the
final action it needs to take is to shut down the library with a call to MPI_Finalize(). Each individual MPI process will need to know both the total number of
processes in the application and its own rank in that group of processes. The
call to MPI_Comm_size() returns
the total number of processes, and the call to MPI_Comm_rank() provides each process with its identifier. Listing 10.21 shows the
basic framework for a typical MPI program.
Listing 10.21 A Bare-Bones MPI Program
#include "mpi.h"
int main( int argc, char *argv[] )
{
int numproc, myid;
MPI_Init(
&argc, &argv );
MPI_Comm_size(
MPI_COMM_WORLD, &numproc );
MPI_Comm_rank(
MPI_COMM_WORLD, &myid );
...
MPI_Finalize();
return 0;
}
MPI provides a very rich API for communicating
between processes. The simplest level of this is the ability to pass an array
of data to another MPI process using a call to MPI_Send() and for that process to receive the data using a call to
MPI_Recv(). Other calls provide higher-level operations, such as reductions.
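As an illustration of this point-to-point style, the following is a minimal sketch (not from the original text) in which rank 1 fills an array of ten integers and sends it to rank 0, which receives it. It assumes the program is launched with at least two processes; the tag value of zero is an arbitrary choice.

#include "mpi.h"
#include <stdio.h>

int main( int argc, char *argv[] )
{
    int numproc, myid, buffer[10];
    MPI_Status status;
    MPI_Init( &argc, &argv );
    MPI_Comm_size( MPI_COMM_WORLD, &numproc );
    MPI_Comm_rank( MPI_COMM_WORLD, &myid );
    if ( myid == 1 )
    {
        for ( int i = 0; i < 10; i++ ) { buffer[i] = i; }       /* Fill the array */
        MPI_Send( buffer, 10, MPI_INT, 0, 0, MPI_COMM_WORLD );  /* Destination rank 0, tag 0 */
    }
    else if ( myid == 0 )
    {
        MPI_Recv( buffer, 10, MPI_INT, 1, 0, MPI_COMM_WORLD, &status );  /* Source rank 1, tag 0 */
        printf( "Rank 0 received values %d..%d\n", buffer[0], buffer[9] );
    }
    MPI_Finalize();
    return 0;
}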
Since the communication overhead is often quite
large, it makes most sense to use MPI to solve problems that are too large for
a single system. With multicore processors, the size of the problem that can be
tackled has become less dependent on the number of threads and more dependent
on considerations such as the memory footprint of the data set or the amount of
bandwidth of the system. This highlights an important motivator for MPI programs—system
characteristics such as amount of bandwidth or amount of available memory scale
with the number of systems. For a problem that is limited by the time it takes
to stream through a large amount of data, two systems will provide twice as
much bandwidth, potentially solving the problem twice as fast. Using two
threads on a single system does not necessarily provide the same doubling of
bandwidth and certainly does not double the memory capacity of the system.
High communication costs also cause a change in the
approach taken to using MPI to solve problems. As we have previously discussed,
the best scalability will result from codes that have the fewest and smallest
amounts of communication. Tuning algorithms to remove or reduce the need for
communication will result in better performance. It may be more efficient for
an algorithm to use approximations to estimate the return values from other
processes rather than waiting for those processes to return the exact answers,
even if this approximation causes the algorithm to iterate longer before
terminating.
To illustrate the use of MPI, we’ll use it to
produce a larger image of the Mandelbrot set. The algorithm uses a single master process and a number of worker processes. The worker
processes compute a line of the Mandelbrot set at a time and pass this to the
master process. Listing 10.22 shows the code necessary to compute a line of the
Mandelbrot set.
Listing 10.22 Computing a Row of the Mandelbrot Set
#include "mpi.h" #include <stdlib.h>
#define COLS 1000 #define ROWS 1000
int inset( double ix, double iy )
{
int iterations = 0;
double x = ix, y = iy, x2 = x*x, y2 = y*y;
while ( ( x2 + y2 < 4 ) && ( iterations
< 1000 ) )
{
y = 2 * x * y + iy; x = x2 - y2 + ix; x2 = x * x;
y2 = y * y; iterations++;
}
return iterations;
}
void computerow( int index, int * data )
{
double y = -1.5 + (double)index / ROWS; for( int c=0; c<COLS; c++ )
{
double x = -1.5 + (double)c / COLS; data[c] = inset( x, y );
}
}
Listing 10.23 shows the code for the master
process. The master process needs to set up an array large enough to store all
the results from the worker processes. The master process computes the zeroth
row of the output and then waits for all subsequent rows to be calculated by the
other processes. The MPI_Recv() call
takes parameters that indicate the expected type of data and an array where the
data is to be stored. The call also takes parameters that identify the process
that the data is to come from, together with a tag (in this case the line number) that identifies the data being sent. The final parameters are the communicator, which defines the set of processes included in the communication, and a variable to hold the status.
Listing 10.23 The Master Thread Accumulates All the Results from the Worker Threads
int numproc, myid;

void masterprocess()
{
    int ** data;
    MPI_Status status;
    data = malloc( sizeof(int*) * ROWS );    /* Allocate memory to store the rows */
    data[0] = malloc( sizeof(int) * COLS );
    computerow( 0, data[0] );                /* Compute row zero locally */
    for ( int currentrow = 1; currentrow < ROWS; currentrow++ )
    {
        data[currentrow] = malloc( sizeof(int) * COLS );
        /* Worker rank w computes rows w, w+(numproc-1), w+2*(numproc-1), ... */
        int process = ( ( currentrow - 1 ) % ( numproc - 1 ) ) + 1;
        MPI_Recv( data[currentrow], COLS, MPI_INT, process, currentrow,
                  MPI_COMM_WORLD, &status );
    }
}
The worker processes run the code shown in Listing
10.24. Each process computes every Nth row of the matrix and then passes this
row to the master process. The main routine is also shown in this listing. This
routine needs to do the normal setup and tear-down of the MPI library and then
decide whether the process is the master or one of the workers.
Listing 10.24 MPI Worker Thread and Setup Code
void workerprocess()
{
    int * data, row;
    data = malloc( sizeof(int) * COLS );
    row = myid;                      /* First row assigned to this worker */
    while ( row < ROWS )             /* Rows are numbered 0 .. ROWS-1 */
    {
        computerow( row, data );
        MPI_Send( data, COLS, MPI_INT, 0, row, MPI_COMM_WORLD );
        row += numproc - 1;          /* Skip the rows owned by the other workers */
    }
    free( data );
}

int main( int argc, char *argv[] )
{
    MPI_Init( &argc, &argv );
    MPI_Comm_size( MPI_COMM_WORLD, &numproc );
    MPI_Comm_rank( MPI_COMM_WORLD, &myid );
    if ( myid == 0 ) { masterprocess(); } else { workerprocess(); }
    MPI_Finalize();
    return 0;
}
The program as written achieves little parallelism. This is for a couple of reasons. The first is that the send and receive operations are blocking. The sending processes cannot make progress until the receiving process receives the data. The receiving process compounds this by looking for the data in order.
The code could be modified to use a nonblocking
send operation so that the worker processes could overlap communication with
computation. Another improvement to the code would be for the master process to
accept data from the workers in any order, rather than requiring the data to be
received in order.
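As a sketch of the second improvement (reusing the data array and the ROWS and COLS constants from Listing 10.23), the receive loop in masterprocess() could use the wildcards MPI_ANY_SOURCE and MPI_ANY_TAG, probing for whichever message arrives next and recovering the row number from the message tag:

    for ( int received = 1; received < ROWS; received++ )
    {
        MPI_Status status;
        /* Wait for the next row from any worker, in any order */
        MPI_Probe( MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status );
        int currentrow = status.MPI_TAG;          /* The tag carries the row number */
        data[currentrow] = malloc( sizeof(int) * COLS );
        MPI_Recv( data[currentrow], COLS, MPI_INT, status.MPI_SOURCE, currentrow,
                  MPI_COMM_WORLD, &status );
    }

On the worker side, MPI_Isend() could be used in place of MPI_Send() so that the next row can be computed while the previous one is still in flight, at the cost of managing a request handle and a second buffer.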
With the increase in the number of available cores, it has become appropriate to combine MPI for parallelization across nodes with OpenMP for parallelization within a node. This allows an application to use coarse-grained parallelism to scale to large numbers of nodes and then a finer degree of parallelism to scale to large numbers of cores.
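As a hedged sketch of this hybrid approach (reusing ROWS, COLS, and inset() from Listing 10.22), the worker's row computation could be spread across the cores of a node with an OpenMP parallel for, while MPI continues to distribute whole rows across nodes:

#include <omp.h>

/* Columns of one row are shared among the node's cores;
   MPI still distributes whole rows across nodes. */
void computerow( int index, int * data )
{
    double y = -1.5 + (double)index / ROWS;
    #pragma omp parallel for
    for ( int c = 0; c < COLS; c++ )
    {
        double x = -1.5 + (double)c / COLS;
        data[c] = inset( x, y );
    }
}

In main(), the call to MPI_Init() would normally be replaced with MPI_Init_thread( &argc, &argv, MPI_THREAD_FUNNELED, &provided ), which tells the MPI library that the process is multithreaded but that only the main thread will make MPI calls.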
MapReduce as a Strategy for Scaling
MapReduce is an algorithm that is well suited to clusters because it requires little communication, and it is a relatively flexible approach so long as there are multiple parallel queries to be completed. Implementations are available in various programming languages.
The MapReduce algorithm has two steps. The map step takes a problem and distributes that problem across multiple nodes. The reduce step takes the results from those nodes and combines them to provide a final result.
MapReduce works best when there is an operation to
be performed on multiple sets of input data. For example, consider computing
the frequency of the use of all words over a range of documents. A single
thread would compute this task by reading each document and then using a map to
increment the value of a counter for each word encountered in the document.
The MapReduce version of the code would perform the
following two steps. In the map phase, the master thread would assign each
worker thread to perform the word count on a particular document. In the reduce
step, each worker thread would return the list of word frequencies for its
particular document, and the master thread would combine all these into a
single set of results.
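The following single-process sketch illustrates the two phases; the tiny vocabulary, the in-memory documents, and the countwords() helper are all invented for illustration. The map phase produces one frequency table per document, and the reduce phase sums those tables into a single result; in a real MapReduce deployment, each map call would be dispatched to a worker node.

#include <stdio.h>
#include <string.h>

#define NUMDOCS  3
#define NUMWORDS 3

/* Hypothetical inputs: a tiny vocabulary and some in-memory "documents" */
const char * vocabulary[NUMWORDS] = { "map", "reduce", "node" };
const char * documents[NUMDOCS] =
{
    "map map reduce",
    "node map node",
    "reduce reduce node map"
};

/* Map step: count the vocabulary words in one document */
void countwords( const char * document, int * counts )
{
    char copy[256];
    strncpy( copy, document, sizeof(copy) - 1 );
    copy[sizeof(copy) - 1] = '\0';
    for ( char * word = strtok( copy, " " ); word != NULL; word = strtok( NULL, " " ) )
    {
        for ( int w = 0; w < NUMWORDS; w++ )
        {
            if ( strcmp( word, vocabulary[w] ) == 0 ) { counts[w]++; }
        }
    }
}

int main( void )
{
    int perdoc[NUMDOCS][NUMWORDS] = { { 0 } };
    int totals[NUMWORDS] = { 0 };

    /* Map phase: in a cluster, each iteration would run on a worker node */
    for ( int d = 0; d < NUMDOCS; d++ ) { countwords( documents[d], perdoc[d] ); }

    /* Reduce phase: combine the per-document tables into one result */
    for ( int d = 0; d < NUMDOCS; d++ )
    {
        for ( int w = 0; w < NUMWORDS; w++ ) { totals[w] += perdoc[d][w]; }
    }

    for ( int w = 0; w < NUMWORDS; w++ )
    {
        printf( "%s: %d\n", vocabulary[w], totals[w] );
    }
    return 0;
}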
From the description, it should be obvious that the
algorithm could be implemented using MPI. In fact, the only difference between
this and the code to compute the Mandelbrot set is that the MPI master thread
did not need to tell the worker threads what to compute since this information
was provided in the source code.
What makes MapReduce different from an algorithm
implemented in MPI? There are two major differences.
First, MPI is a general-purpose framework for the
composition of parallel problems, whereas MapReduce is an approach for a
specific kind of problem. So, it may be possible to implement MapReduce-type
problems in MPI, but it is not generally possible to implement all MPI problems
using a MapReduce framework.
The second difference is that MPI is designed to
work on a known grid of computers. MPI relies on the fact that there is a known
node performing the work and that this node will at some point return the
results of the computation. MapReduce is built on the basis that the nodes
performing the work may not be reliable. If a node does not provide the result
within a suitable time period, the master node can restart the query on a
different node.
There are two characteristics of the MapReduce work that enable this restart to happen. The first is that the master node explicitly requests work from the worker nodes. So if a worker node fails, the master node can make the same request of another worker node. The second characteristic is that the nodes do not contain state. So if a worker node fails, the master node can restart from a known point without the problem that one of the other nodes is already using partial answers from the failed node.
MapReduce is usually implemented as a library; Hadoop is one well-known implementation. Such a library provides the functionality for distributing the work across multiple systems, restarting the work if one of the nodes fails, and so on. The advantage of providing this as a library is that improvements to the library code benefit all users of the library.
Grids
Possibly the simplest approach to utilizing multiple compute threads is
to use a grid. A grid is a collection
of nodes. A task can be dispatched to the grid, and at some point the results of that task will be returned
to the user.
Conceptually, a grid is the task management aspect
of MapReduce without the “map” and “reduce” algorithms for performing
computation. A grid will typically provide much greater node management
functionality than a MapReduce implementation. Node management might extend to powering down idle nodes or allocating clusters of work to co-located nodes.
So, the two approaches will be broadly complementary in nature.