
MapReduce compress() method

MapReduce multivalue_blocks() method

MapReduce multivalue_block() method

int MapReduce::compress(void (*mycompress)(char *, int, char *, int, int *, KeyValue *, void *), void *ptr) 
int MapReduce::multivalue_blocks() 
int MapReduce::multivalue_block(int iblock, char **ptr_multivalue, int **ptr_valuesizes) 

This calls the compress() method of a MapReduce object, which compresses a KeyValue object with duplicate keys into a new KeyValue object, where each key appears once (on that processor) and has a single new value. The new value is a combination of the values associated with that key in the original KeyValue object. The mycompress() function you provide generates the new value. The method returns the total number of key/value pairs in the new KeyValue object.

This method is used to compress a large set of key/value pairs produced by the map() method into a smaller set before proceeding with the rest of a MapReduce operation, e.g. with a collate() and reduce().

You can give this method a pointer (void *ptr) which will be returned to your mycompress() function. See the Technical Details section for why this can be useful. Just specify a NULL if you don't need this.
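As a rough sketch of how the ptr argument might be used, the callback below updates a small user-defined struct that is handed in through void *ptr. The KeyValue struct here is only a stand-in so the snippet compiles without the library, and the names Stats and count_keys are hypothetical; only the callback interface and the (key, keybytes, value, valuebytes) shape of add() are taken from the library.

```cpp
#include <cassert>

// Stand-in for the library's KeyValue class (illustration only), so this
// sketch compiles on its own. It merely counts the pairs that are added.
struct KeyValue {
  int npairs = 0;
  void add(char *, int, char *, int) { ++npairs; }
};

// Hypothetical user data handed to compress() through void *ptr.
struct Stats {
  long keys_seen = 0;
  long values_seen = 0;
};

// Callback with the interface compress() expects. It emits one pair per
// key whose value is the number of values that key had on this processor.
void count_keys(char *key, int keybytes, char *multivalue, int nvalues,
                int *valuebytes, KeyValue *kv, void *ptr) {
  (void)multivalue;   // the values themselves are not examined here
  (void)valuebytes;
  assert(ptr != nullptr);
  Stats *stats = static_cast<Stats *>(ptr);
  stats->keys_seen += 1;
  stats->values_seen += nvalues;

  int count = nvalues;
  kv->add(key, keybytes, reinterpret_cast<char *>(&count),
          static_cast<int>(sizeof(int)));
}
```

With the library itself, and its real KeyValue class in place of the stand-in, such a callback would be registered as mr->compress(count_keys, &stats), following the compress() signature given at the top of this page.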

In this example the user function is called mycompress() and it must have the following interface, which is the same as that used by the reduce() method:

void mycompress(char *key, int keybytes, char *multivalue, int nvalues, int *valuebytes, KeyValue *kv, void *ptr) 

A single key/multi-value pair is passed to your function from a temporary KeyMultiValue object created by the library. For each unique key in the KeyValue object, that object creates a multi-value, which contains a list of the nvalues values associated with that key. Note that these are only the values on this processor, not across all processors.

In this case, the char *multivalue argument is a pointer to the beginning of the multi-value which contains all nvalues, packed one after the other. The int *valuebytes argument is an array which stores the length of each value in bytes. If needed, it can be used by your function to compute an offset into char *multivalue for where each individual value begins. Your function is also passed a kv pointer to a new KeyValue object created and stored internally by the MapReduce object.
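The offset computation can be sketched as a running sum of the lengths in valuebytes. The helper name unpack_values is hypothetical, and copying each value into a std::string is done only for illustration; a real callback would more likely process the values in place.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Split a packed multi-value into its individual values. Value i starts at
// the sum of the lengths of values 0..i-1, since the values are packed one
// after the other.
std::vector<std::string> unpack_values(const char *multivalue, int nvalues,
                                       const int *valuebytes) {
  std::vector<std::string> values;
  std::size_t offset = 0;
  for (int i = 0; i < nvalues; i++) {
    assert(valuebytes[i] >= 0);
    std::size_t len = static_cast<std::size_t>(valuebytes[i]);
    values.push_back(std::string(multivalue + offset, len));
    offset += len;
  }
  return values;
}
```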

If the values do not fit in memory, then the meaning of the arguments passed to your function is changed. Your function must call two additional library functions in order to retrieve a block of values that does fit in memory, and process them one block at a time.

In this case, the char *multivalue argument will be NULL, which is how your function can test for this possibility. If you know huge multi-values will not occur or if you don't need to examine the values themselves, then the test is not needed. The nvalues argument still holds the total number of values in the multi-value. The meaning of the kv and ptr arguments is the same as discussed above. However, the int *valuebytes argument is changed to be a pointer to the MapReduce object. This is to allow you to make the following two kinds of calls back to the library:

MapReduce *mr = (MapReduce *) valuebytes;
int nblocks = mr->multivalue_blocks();
for (int iblock = 0; iblock < nblocks; iblock++) {
  int nv = mr->multivalue_block(iblock,&multivalue,&valuebytes);
  for (int i = 0; i < nv; i++) {
    // process each value within the block of values
  }
}

The call to multivalue_blocks() returns the number of blocks of values in the multi-value. Each call to multivalue_block() retrieves one block of values. The number of values in the block (nv in this case) is returned. The multivalue and valuebytes arguments are pointers to a char * and int * (i.e. a char ** and int **), which will be set to point to the block of values and their lengths respectively, so they can then be used just as the multivalue and valuebytes arguments in the mycompress() callback itself (when the values do not exceed available memory).

Note that in this example we are re-using (and thus overwriting) the original multivalue and valuebytes arguments as local variables.

Also note that your mycompress() function can call multivalue_block() as many times as it wishes and process the blocks of values multiple times or in any order, though looping through blocks in ascending order will typically give the best disk I/O performance.
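To show both cases side by side, here is a minimal sketch of a helper that handles an in-memory multi-value as well as one split into blocks. The MapReduce struct below is a stand-in written only for this example (it is not the library's class) and just serves pre-built blocks through the two calls described above; the helper name total_value_bytes is hypothetical.

```cpp
#include <cassert>
#include <vector>

// Stand-in for the library's MapReduce class (illustration only). It holds
// pre-split blocks so the two library calls can be exercised in isolation.
struct MapReduce {
  std::vector<std::vector<char>> block_data;   // packed values, per block
  std::vector<std::vector<int>> block_sizes;   // value lengths, per block

  int multivalue_blocks() { return static_cast<int>(block_data.size()); }

  int multivalue_block(int iblock, char **ptr_multivalue,
                       int **ptr_valuesizes) {
    *ptr_multivalue = block_data[iblock].data();
    *ptr_valuesizes = block_sizes[iblock].data();
    return static_cast<int>(block_sizes[iblock].size());
  }
};

// Hypothetical helper a callback could use: total bytes across all values
// of one multi-value, whether or not it fits in memory.
long total_value_bytes(char *multivalue, int nvalues, int *valuebytes) {
  long total = 0;
  if (multivalue != nullptr) {
    // In-memory case: valuebytes is the array of value lengths.
    for (int i = 0; i < nvalues; i++) total += valuebytes[i];
    return total;
  }
  // Blocked case: valuebytes is really a pointer to the MapReduce object.
  MapReduce *mr = reinterpret_cast<MapReduce *>(valuebytes);
  int nblocks = mr->multivalue_blocks();
  for (int iblock = 0; iblock < nblocks; iblock++) {
    // multivalue and valuebytes are overwritten with this block's data.
    int nv = mr->multivalue_block(iblock, &multivalue, &valuebytes);
    for (int i = 0; i < nv; i++) total += valuebytes[i];
  }
  return total;
}
```

Because the blocked branch overwrites valuebytes, the MapReduce pointer is saved in mr before the loop starts.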

Your mycompress() function should typically produce a single key/value pair which it registers with the MapReduce object by calling the add() method of the KeyValue object. The syntax for this call is described on the doc page for the KeyValue add() method. For example, if the set of nvalues were integers, the compressed value might be the sum of those integers.
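The integer-sum case might look roughly like the following. This is a sketch under stated assumptions: the KeyValue struct is a stand-in that records pairs so the snippet compiles without the library, the name sum_ints is hypothetical, and only the in-memory case (non-NULL multivalue) is handled. Each value is read with memcpy rather than by casting the pointer, so the code does not depend on how individual values are aligned.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <utility>
#include <vector>

// Stand-in for the library's KeyValue class (illustration only). It stores
// copies of the added pairs so the result can be inspected.
struct KeyValue {
  std::vector<std::pair<std::string, std::string>> pairs;
  void add(char *key, int keybytes, char *value, int valuebytes) {
    pairs.push_back(std::make_pair(
        std::string(key, static_cast<std::size_t>(keybytes)),
        std::string(value, static_cast<std::size_t>(valuebytes))));
  }
};

// Callback that replaces all integer values of a key with their sum.
void sum_ints(char *key, int keybytes, char *multivalue, int nvalues,
              int *valuebytes, KeyValue *kv, void *ptr) {
  (void)ptr;                      // no user data needed in this sketch
  assert(multivalue != nullptr);  // blocked multi-values are not handled
  int sum = 0;
  std::size_t offset = 0;
  for (int i = 0; i < nvalues; i++) {
    assert(valuebytes[i] == static_cast<int>(sizeof(int)));
    int v;
    std::memcpy(&v, multivalue + offset, sizeof(int));  // alignment-safe read
    sum += v;
    offset += sizeof(int);
  }
  // Register the single compressed key/value pair.
  kv->add(key, keybytes, reinterpret_cast<char *>(&sum),
          static_cast<int>(sizeof(int)));
}
```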

See the Settings and Technical Details sections for details on the byte-alignment of keys and values that are passed to your mycompress() function and on those you register with the KeyValue add() methods. Note that only the first value of a multi-value (or of each block of values) passed to your mycompress() function will be aligned to the valuealign setting.

This method is an on-processor operation, requiring no communication. When run in parallel, each processor operates only on the key/value pairs it stores. Thus you are NOT compressing all values associated with a particular key across all processors, but only those currently owned by one processor.


Related methods: collate()