Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
544 views
in Technique[技术] by (71.8m points)

c - Scatter Matrix Blocks of Different Sizes using MPI

(Suppose all the matrices are stored in row-major order.) An example that illustrate the problem is to distribute a 10x10 matrix over a 3x3 grid, so that the size of the sub-matrices in each node looks like

|-----+-----+-----|
| 3x3 | 3x3 | 3x4 |
|-----+-----+-----|
| 3x3 | 3x3 | 3x4 |
|-----+-----+-----|
| 4x3 | 4x3 | 4x4 |
|-----+-----+-----|

I've seen many posts on Stackoverflow (such as sending blocks of 2D array in C using MPI and MPI partition matrix into blocks). But they only deal with blocks of same size (in which case we can simply use MPI_Type_vector or MPI_Type_create_subarray and only one MPI_Scatterv call).

So, I'm wondering what is the most efficient way in MPI to scatter a matrix to a grid of processors where each processor has a block with a specified size.

P.S. I've also looked at MPI_Type_create_darray, but it seems not letting you specify block size for each processor.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

You have to go through at least one extra step in MPI to do this.

The problem is that the most general of the gather/scatter routines, MPI_Scatterv and MPI_Gatherv, allow you to pass a "vector" (v) of counts/displacements, rather than just one count for Scatter and Gather, but the types are all assumed to be the same. Here, there's no way around it; the memory layouts of each block are different, and so have to be treated by a different type. If there were only one difference between the blocks – some had different numbers of columns, or some had different number of rows – then just using different counts would suffice. But with different columns and rows, counts won't do it; you really need to be able to specify different types.

So what you really want is an often-discussed but never implemented MPI_Scatterw (where w means vv; e.g., both counts and types are vectors) routine. But such a thing doesn't exist. The closest you can get is the much more general MPI_Alltoallw call, which allows completely general all-to-all sending and receiving of data; as the spec states, "The MPI_ALLTOALLW function generalizes several MPI functions by carefully selecting the input arguments. For example, by making all but one process have sendcounts(i) = 0, this achieves an MPI_SCATTERW function.".

So you can do this with MPI_Alltoallw by having all processes other than the one that originally has all the data ( we'll assume that it's rank 0 here) sent all their send counts to zero. All tasks will also have all their receive counts to zero except for the first - the amount of data they'll get from rank zero.

For process 0's send counts, we'll first have to define four different kinds of types (the 4 different sizes of subarrays), and then the send counts will all be 1, and the only part that remains is figuring out the send displacements (which, unlike scatterv, is here in units of bytes, because there's no single type one could use as a unit):

        /* 4 types of blocks - 
         * blocksize*blocksize, blocksize+1*blocksize, blocksize*blocksize+1, blocksize+1*blocksize+1
         */

        MPI_Datatype blocktypes[4];
        int subsizes[2];
        int starts[2] = {0,0};
        for (int i=0; i<2; i++) {
           subsizes[0] = blocksize+i;
           for (int j=0; j<2; j++) {
               subsizes[1] = blocksize+j;
               MPI_Type_create_subarray(2, globalsizes, subsizes, starts, MPI_ORDER_C, MPI_CHAR, &blocktypes[2*i+j]);
               MPI_Type_commit(&blocktypes[2*i+j]);
           }
        }

        /* now figure out the displacement and type of each processor's data */
        for (int proc=0; proc<size; proc++) {
            int row, col;
            rowcol(proc, blocks, &row, &col);

            sendcounts[proc] = 1;
            senddispls[proc] = (row*blocksize*globalsizes[1] + col*blocksize)*sizeof(char);

            int idx = typeIdx(row, col, blocks);
            sendtypes[proc] = blocktypes[idx];
        }
    }

    MPI_Alltoallw(globalptr, sendcounts, senddispls, sendtypes,
                  &(localdata[0][0]), recvcounts, recvdispls, recvtypes, 
                  MPI_COMM_WORLD);

And this will work.

But the problem is that the Alltoallw function is so completely general, that it's difficult for implementations to do much in the line of optimization; so I'd be surprised if this performed as well as a scatter of equally-sized blocks.

So another approach is to do something like a two phases of communication.

The simplest such approach follows after noting that you can almost get all the data where it needs to go with a single MPI_Scatterv() call: in your example, if we operate in units of a single column vector with column=1 and rows=3 (the number of rows in most of the blocks of the domain), you can scatter almost all of the global data to the other processors. The processors each get 3 or 4 of these vectors, which distributes all of the data except the very last row of the global array, which can be handled by a simple second scatterv. That looks like this;

/* We're going to be operating mostly in units of a single column of a "normal" sized block.
 * There will need to be two vectors describing these columns; one in the context of the
 * global array, and one in the local results.
 */
MPI_Datatype vec, localvec;
MPI_Type_vector(blocksize, 1, localsizes[1], MPI_CHAR, &localvec);
MPI_Type_create_resized(localvec, 0, sizeof(char), &localvec);
MPI_Type_commit(&localvec);

MPI_Type_vector(blocksize, 1, globalsizes[1], MPI_CHAR, &vec);
MPI_Type_create_resized(vec, 0, sizeof(char), &vec);
MPI_Type_commit(&vec);

/* The originating process needs to allocate and fill the source array,
 * and then define types defining the array chunks to send, and
 * fill out senddispls, sendcounts (1) and sendtypes.
 */
if (rank == 0) {
    /* create the vector type which will send one column of a "normal" sized-block */
    /* then all processors except those in the last row need to get blocksize*vec or (blocksize+1)*vec */
    /* will still have to do something to tidy up the last row of values */
    /* we need to make the type have extent of 1 char for scattering */
    for (int proc=0; proc<size; proc++) {
        int row, col;
        rowcol(proc, blocks, &row, &col);

        sendcounts[proc] = isLastCol(col, blocks) ? blocksize+1 : blocksize;
        senddispls[proc] = (row*blocksize*globalsizes[1] + col*blocksize);
    }
}

recvcounts = localsizes[1];
MPI_Scatterv(globalptr, sendcounts, senddispls, vec,
              &(localdata[0][0]), recvcounts, localvec, 0, MPI_COMM_WORLD);

MPI_Type_free(&localvec);
if (rank == 0)
    MPI_Type_free(&vec);

/* now we need to do one more scatter, scattering just the last row of data
 * just to the processors on the last row.
 * Here we recompute the send counts
 */
if (rank == 0) {
    for (int proc=0; proc<size; proc++) {
        int row, col;
        rowcol(proc, blocks, &row, &col);
        sendcounts[proc] = 0;
        senddispls[proc] = 0;

        if ( isLastRow(row,blocks) ) {
            sendcounts[proc] = blocksize;
            senddispls[proc] = (globalsizes[0]-1)*globalsizes[1]+col*blocksize;
            if ( isLastCol(col,blocks) )
                sendcounts[proc] += 1;
        }
    }
}

recvcounts = 0;
if ( isLastRow(myrow, blocks) ) {
    recvcounts = blocksize;
    if ( isLastCol(mycol, blocks) )
        recvcounts++;
}
MPI_Scatterv(globalptr, sendcounts, senddispls, MPI_CHAR,
              &(localdata[blocksize][0]), recvcounts, MPI_CHAR, 0, MPI_COMM_WORLD);

So far so good. But it's a shame to have most of the processors sitting around doing nothing during that final, "cleanup" scatterv.

So a nicer approach is to scatter all the rows in a first phase, and scatter that data amongst the columns in a second phase. Here we create new communicators, with each processor belonging to two new communicators - one representing other processors in the same block row, and the other in the same block column. In the first step, the origin processor distributes all the rows of the global array to the other processors in the same column communicator - which can be done in a single scatterv. Then those processors, using a single scatterv and the same columns data type as in the previous example, scatter the columns to each processor in the same block row as it. The result is two fairly simple scatterv's distributing all of the data:

/* create communicators which have processors with the same row or column in them*/
MPI_Comm colComm, rowComm;
MPI_Comm_split(MPI_COMM_WORLD, myrow, rank, &rowComm);
MPI_Comm_split(MPI_COMM_WORLD, mycol, rank, &colComm);

/* first, scatter the array by rows, with the processor in column 0 corresponding to each row
 * receiving the data */
if (mycol == 0) {
    int sendcounts[ blocks[0] ];
    int senddispls[ blocks[0] ];
    senddispls[0] = 0;

    for (int row=0; row<blocks[0]; row++) {
        /* each processor gets blocksize rows, each of size globalsizes[1]... */
        sendcounts[row] = blocksize*globalsizes[1];
        if (row > 0)
            senddispls[row] = senddispls[row-1] + sendcounts[row-1];
    }
    /* the last processor gets one more */
    sendcounts[blocks[0]-1] += globalsizes[1];

    /* allocate my rowdata */
    rowdata = allocchar2darray( sendcounts[myrow], globalsizes[1] );

    /* perform the scatter of rows */
    MPI_Scatterv(globalptr, sendcounts, senddispls, MPI_CHAR,
                  &(rowdata[0][0]), sendcounts[myrow], MPI_CHAR, 0, colComm);

}

/* Now, within each row of processors, we can scatter the columns.
 * We can do this as we did in the previous example; create a vector
 * (and localvector) type and scatter accordingly */
int locnrows = blocksize;
if ( isLastRow(myrow, blocks) )
    locnrows++;
MPI_Datatype vec, localvec;
MPI_Type_vector(locnrows, 1, globalsizes[1], MPI_CHAR, &vec);
MPI_Type_create_resized(vec, 0, sizeof(char), &vec);
MPI_Type_commit(&vec);

MPI_Type_vector(locnrows, 1, localsizes[1], MPI_CHAR, &localvec);
MPI_Type_create_resized(localvec, 0, sizeof(char), &localvec);
MPI_Type_commit(&localvec);

int sendcounts[ blocks[1] ];
int senddispls[ blocks[1] ];
if (mycol == 0) {
    for (int col=0; col<blocks[1]; col++) {
        sendcounts[col] = isLastCol(col, blocks) ? blocksize+1 : blocksize;
        senddispls[col] = col*blocksize;
    }
}
char *rowptr = (mycol == 0) ? &(rowdata[0][0]) : NULL;

MPI_Scatterv(rowptr, sendcounts, senddispls, vec,
              &(localdata[0][0]), sendcounts[mycol], localvec, 0, rowComm);

which is simpler and should be a relatively good balance between performance and robustness.

Running all these three methods works:

bash-3.2$ mpirun -np 6 ./allmethods alltoall
Global array:
abcdefg
hijklmn
opqrstu
vwxyzab
cdefghi
jklmnop
qrstuvw
xyzabcd
efghijk
lmnopqr
Method - alltoall

Rank 0:
abc
hij
opq

Rank 1:
defg
klmn
rstu

Rank 2:
vwx
cde
jkl

Rank 3:
yzab
fghi
mnop

Rank 4:
qrs
xyz
efg
lmn

Rank 5:
tuvw
abcd
hijk
opqr

bash-3.2$ mpirun -np 6 ./allmethods twophasevecs
Global array:
abcdefg
hijklmn
opqrstu
vwxyzab
cdefghi
jklmnop
qrstuvw
xyzabcd
efghijk
lmnopqr
Method - two phase, vectors, then cleanup

Rank 0:
abc
hij
opq

Rank 1:
defg
klmn
rstu

Rank 2:
vwx
cde
jkl

Rank 3:
yzab
fghi
mnop

Rank 4:
qrs
xyz
efg
lmn

Rank 5:
tuvw
abcd
hijk
opqr
bash-3.2$ mpirun -np 6 ./allmethods twophaserowcol
Global array:
abcdefg
hijklmn
opqrstu
vwxyzab
cdefghi
jklmnop
qrstuvw
xyzabcd
efghijk
lmnopqr
Method - two phase - row, cols

Rank 0:
abc
hij
opq

Rank 1:
defg
klmn
rstu

Rank 2:
vwx
cde
jkl

Rank 3:
yzab
fghi
mnop

Rank 4:
qrs
xyz
efg
lmn

Rank 5:
tuvw
abcd
hijk
opqr

The code implementing these methods follows; you can set block sizes to more typical sizes for your problem and run on a realistic number of processors to get some sense of which will be best for your application.

#include <stdio.h>
#include <stdlib.h>
#include <string.h&gt

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...