/*============================================================================
 * Functions related to the gathering of local arrays based on global ranks
 * in parallel mode.
 *============================================================================*/

/*
  This file is part of the "Finite Volume Mesh" library, intended to provide
  finite volume mesh and associated fields I/O and manipulation services.

  Copyright (C) 2004-2006  EDF

  This library is free software; you can redistribute it and/or
  modify it under the terms of the GNU Lesser General Public
  License as published by the Free Software Foundation; either
  version 2.1 of the License, or (at your option) any later version.

  This library is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with this library; if not, write to the Free Software
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
*/

/*----------------------------------------------------------------------------
 * Standard C library headers
 *----------------------------------------------------------------------------*/

#include <assert.h>
#include <string.h>

/*----------------------------------------------------------------------------
 * BFT library headers
 *----------------------------------------------------------------------------*/

#include <bft_mem.h>
#include <bft_printf.h>

/*----------------------------------------------------------------------------
 *  Local headers
 *----------------------------------------------------------------------------*/

#include <fvm_config_defs.h>
#include <fvm_defs.h>
#include <fvm_io_num.h>
#include <fvm_parall.h>

/*----------------------------------------------------------------------------
 *  Header for the current file
 *----------------------------------------------------------------------------*/

#include <fvm_gather.h>

/*----------------------------------------------------------------------------*/

#ifdef __cplusplus
extern "C" {
#if 0
} /* Fake brace to force back Emacs auto-indentation back to column 0 */
#endif
#endif /* __cplusplus */

/*============================================================================
 * Local structure definitions
 *============================================================================*/

#if defined (FVM_HAVE_MPI)

/*----------------------------------------------------------------------------
 * Structure keeping track of the status of a series of fvm_gather_...()
 * operations by slices of element global I/O number intervals.
 *----------------------------------------------------------------------------*/

struct _fvm_gather_slice_t {

  int         local_rank;      /* Local rank in communicator */
  int         n_ranks;         /* Number of ranks in communicator */

  /* Initial and final global element numbers for all slices */

  fvm_gnum_t  global_num_initial;
  fvm_gnum_t  global_num_final;

  /* First and past the last global element numbers for current slice */

  fvm_gnum_t  ref_slice_size;
  fvm_gnum_t  global_num_slice_start;
  fvm_gnum_t  global_num_slice_end;

  /* Local indexes corresponding to first element of a series with
     global number >= global_num_slice_start initially, and
     global number > global_num_end after a gather operation
     (initialized to 0 before first of a series of calls) */

  fvm_lnum_t  local_index_start;   /* Current start value */
  fvm_lnum_t  local_index_last;    /* Last value reached, to advance */

  /* Number of local entities (constant after initialization) */

  fvm_lnum_t  local_index_end;

  /* Next global element number expected for each rank before and after
     current call (rank 0 only); initialized upon the first call of
     a series of fvm_gather_...() operations. Keeping track of this
     allows us to avoid having to receive empty messages from
     processors having no element in a given slice, which could
     represent the majority of MPI calls for high processor counts */

  fvm_gnum_t  *next_global_num;      /* Current values */
  fvm_gnum_t  *next_global_num_last; /* Next values, to advance */

  /* We may only use the next global element number to avoid
     sending and receiving empty messages when it is up to date;
     this is not yet the case when a slice has just been initialized
     or resized and no operations on it have been done yet */

  _Bool  use_next_global_num;

  /* Use "safe" communication scheme with extra synchronization,
     avoiding calls to MPI_Probe() ? */

  _Bool  safe_mode;

  /* Buffers associated with MPI Datatype creation */

  int       *blocklengths;   /* Required for rank 0 only for most operations */
  MPI_Aint  *displacements;  /* Always required for all ranks; size
                                slice_size + 1 (last position used to
                                exchange next_global_num[] values) */
  MPI_Datatype aint_type;    /* MPI Datatype for MPI_Aint */

};

#endif /* defined (FVM_HAVE_MPI) */

/*=============================================================================
 * Private function definitions
 *============================================================================*/

#if defined (FVM_HAVE_MPI)

#if 0 && defined(DEBUG) && !defined(NDEBUG)

/*----------------------------------------------------------------------------
 * Print information on a fvm_gather_slice_t structure.
 *
 * parameters:
 *   this_slice <-- pointer to structure that should be printed
 *----------------------------------------------------------------------------*/

static void
fvm_gather_slice_dump(fvm_gather_slice_t  * this_slice)
{
  if (this_slice != NULL) {

    bft_printf("slice info:\n"
               "  adress                 = %p\n"
               "  local_rank             = %d\n"
               "  n_ranks                = %d\n"
               "  global_num_initial     = %lu\n"
               "  global_num_final       = %lu\n"
               "  ref_slice_size         = %lu\n"
               "  global_num_slice_start = %lu\n"
               "  global_num_slice_end   = %lu\n"
               "  local_index_start      = %ld\n"
               "  local_index_last       = %ld\n"
               "  local_index_end        = %ld\n"
               "  use_next_global_num    = %d\n",
               this_slice,
               this_slice->local_rank,
               this_slice->n_ranks,
               (long)(this_slice->local_index_end),
               (unsigned long)(this_slice->global_num_initial),
               (unsigned long)(this_slice->global_num_final),
               (unsigned long)(this_slice->ref_slice_size),
               (unsigned long)(this_slice->global_num_slice_start),
               (unsigned long)(this_slice->global_num_slice_end),
               (long)(this_slice->local_index_start),
               (long)(this_slice->local_index_last),
               (long)(this_slice->local_index_end),
               (int)(this_slice->use_next_global_num));

    if (this_slice->next_global_num != NULL) {
      int i;
      bft_printf("    rank | next_global_num | next_global_num_last\n");
      for (i = 0; i < this_slice->n_ranks; i++) {
        bft_printf(" %7d |    %12lu |   %12lu\n",
                   i,
                   (unsigned long)(this_slice->next_global_num[i]),
                   (unsigned long)(this_slice->next_global_num_last[i]));
      }
    }
  }
}

#endif /* 0 && defined(DEBUG) && !defined(NDEBUG) */

#endif /* defined (FVM_HAVE_MPI) */

/*=============================================================================
 * Public function definitions
 *============================================================================*/

#if defined (FVM_HAVE_MPI)

/*----------------------------------------------------------------------------
 * Create a fvm_gather_slice_t structure.
 *
 * parameters:
 *   entity_io_num    <-- I/O numbering structure associated with slice entity
 *   slice_size       <-- reference slice size
 *   comm             <-- associated MPI communicator
 *----------------------------------------------------------------------------*/

fvm_gather_slice_t *
fvm_gather_slice_create(const fvm_io_num_t  *entity_io_num,
                        const fvm_gnum_t     slice_size,
                        MPI_Comm             comm)
{
  int i;
  int  local_rank, n_ranks;
  fvm_gather_slice_t  *this_slice;

  /* Get local rank and size of the current MPI communicator */
  MPI_Comm_rank(comm, &local_rank);
  MPI_Comm_size(comm, &n_ranks);

  /* Allocate and initialize slice structure */

  BFT_MALLOC(this_slice, 1, fvm_gather_slice_t);

  this_slice->local_rank = local_rank;
  this_slice->n_ranks = n_ranks;

  this_slice->global_num_initial = 1;
  this_slice->global_num_final = fvm_io_num_get_global_count(entity_io_num);

  this_slice->ref_slice_size = slice_size;
  this_slice->global_num_slice_start = 1;
  this_slice->global_num_slice_end = 1;

  this_slice->local_index_end = fvm_io_num_get_local_count(entity_io_num);

  this_slice->local_index_start = 0;
  this_slice->local_index_last = 0;

  /* Allocate and initialize "next expected global number" arrays */

  if (local_rank == 0) {

    BFT_MALLOC(this_slice->next_global_num, n_ranks, fvm_gnum_t);
    BFT_MALLOC(this_slice->next_global_num_last, n_ranks, fvm_gnum_t);

    for (i = 0; i < n_ranks; i++) {
      this_slice->next_global_num[i] = 0;
      this_slice->next_global_num_last[i] = 0;
    }

  }
  else {
    this_slice->next_global_num = NULL;
    this_slice->next_global_num_last = NULL;
  }

  this_slice->use_next_global_num = false;

  this_slice->safe_mode = fvm_parall_get_safe_gather_mode();

  /* Allocated buffers (blocklengths allocated only if needed for ranks > 0) */

  if (local_rank == 0)
    BFT_MALLOC(this_slice->blocklengths, slice_size, int);
  else
    this_slice->blocklengths = NULL;

  BFT_MALLOC(this_slice->displacements, slice_size + 1,  MPI_Aint);

  /* Associate "best" predefined datatype to MPI_Aint (so as to
     handle 32/64 bit adresses while avoiding
     MPI_BYTE * sizeof(MPI_Aint), which could not handle
     big-endian/little-endian conversion on heterogeneous machines */

  if (sizeof(MPI_Aint) == sizeof(unsigned))
    this_slice->aint_type = MPI_UNSIGNED;
  else if (sizeof(MPI_Aint) == sizeof(unsigned long))
    this_slice->aint_type = MPI_UNSIGNED_LONG;
#if defined(MPI_UNSIGNED_LONG_LONG)
  else if (sizeof(MPI_Aint) == sizeof(unsigned long long))
    this_slice->aint_type = MPI_UNSIGNED_LONG_LONG;
#endif
  else
    bft_error(__FILE__, __LINE__, 0,
              _("fvm_gather_slice_create() is unable to find\n"
                "an MPI datatype matching MPI_Aint."));

  return this_slice;
}

/*----------------------------------------------------------------------------
 * Destroy a fvm_gather_slice_t structure.
 *
 * parameters:
 *   this_slice <-- pointer to structure that should be destroyed
 *
 * returns:
 *   NULL pointer
 *----------------------------------------------------------------------------*/

fvm_gather_slice_t *
fvm_gather_slice_destroy(fvm_gather_slice_t  * this_slice)
{
  if (this_slice != NULL) {

    if (this_slice->next_global_num != NULL) {
      BFT_FREE(this_slice->next_global_num);
      BFT_FREE(this_slice->next_global_num_last);
    }

    if (this_slice->blocklengths != NULL)
      BFT_FREE(this_slice->blocklengths);

    BFT_FREE(this_slice->displacements);

    BFT_FREE(this_slice);

  }

  return NULL;
}

/*----------------------------------------------------------------------------
 * Advance a fvm_gather_slice_t structure to the next start and end values.
 *
 * Elements within this slice will be those for whose global number
 * is >= global_num_start and < global_num_end.
 *
 * parameters:
 *   this_slice        <-- pointer to structure that should be advanced
 *   global_num_start  --> new current global slice start number
 *   global_num_end    --> new current global slice past the end number
 *
 * returns:
 *   0 if the end of the slice has not been reached before this call,
 *   1 if we have already attained the end of the slice.
 *----------------------------------------------------------------------------*/

int
fvm_gather_slice_advance(fvm_gather_slice_t  *this_slice,
                         fvm_gnum_t          *global_num_start,
                         fvm_gnum_t          *global_num_end)
{
  int retval = 0;

  if (this_slice != NULL) {

    if (this_slice->global_num_slice_end > this_slice->global_num_final)
      retval = 1;

    this_slice->global_num_slice_start
      = this_slice->global_num_slice_end;

    this_slice->global_num_slice_end
      = this_slice->global_num_slice_start + this_slice->ref_slice_size;

    if (  this_slice->global_num_slice_end
        > this_slice->global_num_final + 1)
      this_slice->global_num_slice_end = this_slice->global_num_final + 1;

    this_slice->local_index_start = this_slice->local_index_last;

    if (this_slice->next_global_num != NULL) {
      int i;
      for (i = 0; i < this_slice->n_ranks; i++)
        this_slice->next_global_num[i]
          = this_slice->next_global_num_last[i];
    }

    if (   this_slice->global_num_slice_start
        != this_slice->global_num_initial)
      this_slice->use_next_global_num = true;

    *global_num_start = this_slice->global_num_slice_start;
    *global_num_end = this_slice->global_num_slice_end;

  }

  return retval;
}

/*----------------------------------------------------------------------------
 * Reset a fvm_gather_slice_t structure to its initial state.
 *
 * parameters:
 *   this_slice <-- pointer to structure that should be reinitialized
 *----------------------------------------------------------------------------*/

void
fvm_gather_slice_reinitialize(fvm_gather_slice_t  *this_slice)
{
  if (this_slice != NULL) {

    this_slice->global_num_slice_start = this_slice->global_num_initial;
    this_slice->global_num_slice_end = this_slice->global_num_initial;

    this_slice->local_index_start = 0;
    this_slice->local_index_last = 0;

    if (this_slice->next_global_num != NULL) {
      int i;
      for (i = 0; i < this_slice->n_ranks; i++) {
        this_slice->next_global_num[i] = 0;
        this_slice->next_global_num_last[i] = 0;
      }
    }

    this_slice->use_next_global_num = false;
  }
}

/*----------------------------------------------------------------------------
 * Limit an fvm_gather_slice_t structure's end value.
 *
 * This allows setting a lower global_num_end value than that previously
 * defined (which may be necessary when buffer size limits require it).
 *
 * parameters:
 *   this_slice        <-- pointer to structure that should be advanced
 *   global_num_end    --> new current global slice past the end number
 *----------------------------------------------------------------------------*/

void
fvm_gather_slice_limit(fvm_gather_slice_t  *this_slice,
                       fvm_gnum_t          *global_num_end)
{
  if (*global_num_end != this_slice->global_num_slice_end) {

    if (*global_num_end > this_slice->global_num_final)
      *global_num_end = this_slice->global_num_final;

    this_slice->global_num_slice_end = *global_num_end;

    /* If slice size is changed, the next_global_num[] array of rank 0
       may not be up to date in certain cases, so do not use it */

    this_slice->use_next_global_num = false;

  }
}

/*----------------------------------------------------------------------------
 * Build a slice index (0 to n-1 numbering) on rank 0 from local index arrays.
 *
 * This is done by computing the local block lengths from the local
 * index, gathering those lengths to rank 0, and rebuilding a 0 to n-1
 * numbered slice index on rank 0.
 *
 * This function is intended to be used within a loop on subsets of the global
 * lengths array (so as to enable writing to file or sending to an
 * external process without requiring the full array to reside in the process
 * directly handling I/O's memory). As such, it avoids allocating its own
 * working arrays (buffers), so that they may be allocated outside the loop
 * and reused for each call (avoiding the overhead associated with memory
 * allocation).
 *
 * All or most elements in a given portion may belong to a same process rank
 * (depending on mesh numbering and domain splitting). To account for
 * this, for each process rank, the slice_index[] arrays must be large
 * enough to contain (slice_size * stride) values, even though most processes
 * will require less.
 *
 * parameters:
 *   local_index      <-- local index array
 *   slice_index      --> global slice index section for elements
 *                        slice global_num_start to global_num_end
 *                        (output for rank 0, working array only for others)
 *   element_io_num   <-- I/O numbering structure associated with elements
 *   comm             <-- MPI communicator for structures considered
 *   this_slice       <-> structure for management of slice status
 *----------------------------------------------------------------------------*/

void
fvm_gather_slice_index(const fvm_lnum_t     local_index[],
                       fvm_gnum_t           slice_index[],
                       const fvm_io_num_t  *element_io_num,
                       MPI_Comm             comm,
                       fvm_gather_slice_t  *this_slice)
{
  int  i, j;
  int  n_local_entities, n_distant_entities;
  int  size_mult;
  fvm_lnum_t  local_index_start, local_index_stop;

  /* MPI related variables */
  MPI_Status status;
  MPI_Datatype fvm_index_type;
  int  distant_rank;
  const int local_rank = this_slice->local_rank;
  const int n_ranks = this_slice->n_ranks;
  int  *const blocklengths = this_slice->blocklengths;
  MPI_Aint *const displacements = this_slice->displacements;
  const MPI_Datatype aint_type = this_slice->aint_type;

  /* Local number of elements */
  const fvm_lnum_t  local_index_end = this_slice->local_index_end;

  /* Global numbering */
  const fvm_gnum_t global_num_start = this_slice->global_num_slice_start;
  const fvm_gnum_t global_num_end = this_slice->global_num_slice_end;
  const fvm_gnum_t *entity_global_num
                      = fvm_io_num_get_global_num(element_io_num);

  /* Initialize blocklengths and displacements */

  local_index_start = this_slice->local_index_start;

  assert(   local_index_start >= local_index_end
         || entity_global_num[local_index_start] >= global_num_start);

  if (local_rank == 0) {
    int n_entities_max = (int)(global_num_end - global_num_start);
    for (i = 0; i < n_entities_max; i++)
      blocklengths[i] = 1;
  }

  /* For local use on rank 0, displacements should be expressed
     in numbers of fvm_gnum_t elements; for other ranks,
     they will be expressed in bytes, as we use MPI_Datatype_hindexed() */

  size_mult = (local_rank == 0 ? 1 : sizeof(fvm_gnum_t));

  for (i = 0, j = local_index_start;
       i < local_index_end && j < local_index_end
                           && entity_global_num[j] < global_num_end;
       i++, j++) {
    displacements[i] = (MPI_Aint)(  (entity_global_num[j] - global_num_start)
                                  * size_mult);
  }

  n_local_entities = i;
  local_index_stop = local_index_start + n_local_entities;
  this_slice->local_index_last = local_index_stop;

  if (local_index_stop < local_index_end)
    displacements[n_local_entities] = entity_global_num[j];
  else
    displacements[n_local_entities] = this_slice->global_num_final + 1;

  /*
    Prepare send buffer:
    For rank 0, set "final" values directly;
    values are lengths and not indexes at this stage.
  */

  if (local_rank == 0) {
    for (i = 0, j = local_index_start;
         i < n_local_entities;
         i++, j++) {
      slice_index[displacements[i]] = local_index[j+1] - local_index[j];
    }
  }
  else {
    int n_local_values = n_local_entities;
    for (i = 0, j = local_index_start;
         i < n_local_values;
         i++, j++) {
      slice_index[i] = local_index[j+1] - local_index[j];
    }
  }

  /* Gather lengths information from ranks > 0 */

  if (local_rank == 0) {

    for (distant_rank = 1; distant_rank < n_ranks; distant_rank++) {

      /* Get index from distant rank */

      if (   this_slice->next_global_num[distant_rank] < global_num_end
          || this_slice->use_next_global_num == false) {

        if (this_slice->safe_mode == true) {
          MPI_Send(&distant_rank, 1, MPI_INT,
                   distant_rank, FVM_MPI_TAG, comm);
          MPI_Recv(&n_distant_entities, 1, MPI_INT,
                   distant_rank, FVM_MPI_TAG, comm, &status);
        }
        else {
          MPI_Probe(distant_rank, FVM_MPI_TAG, comm, &status);
          MPI_Get_count(&status, aint_type, &n_distant_entities);
        }

        MPI_Recv(displacements, n_distant_entities, aint_type,
                 distant_rank, FVM_MPI_TAG, comm, &status);

        n_distant_entities -= 1;
        this_slice->next_global_num_last[distant_rank]
          = displacements[n_distant_entities];

        if (n_distant_entities > 0) {

          /*
            Define indexed datatype so as to directly receive
            to correct location
          */

          MPI_Type_hindexed(n_distant_entities, blocklengths, displacements,
                            FVM_MPI_LNUM, &fvm_index_type);
          MPI_Type_commit(&fvm_index_type);

          /* Get portion of lengths array from distant rank */

          MPI_Recv(slice_index, 1, fvm_index_type,
                   distant_rank, FVM_MPI_TAG, comm, &status);

          MPI_Type_free(&fvm_index_type);

        }

      }

    }

    /* Transform lengths to global index (0 to n-1)*/

    {
      int l_cur;
      int l_sum = 0;
      int n_entities_max = (int)(global_num_end - global_num_start);
      for (i = 0; i < n_entities_max; i++) {
        l_cur = slice_index[i];
        slice_index[i] = l_sum;
        l_sum += l_cur;
      }
      slice_index[n_entities_max] = l_sum;
    }

  }
  else {

    if (   n_local_entities > 0
        || this_slice->use_next_global_num == false) {

      /* Send local index to rank zero */

      if (this_slice->safe_mode == true) {
        int buf_val;
        MPI_Recv(&buf_val, 1, MPI_INT,
                 0, FVM_MPI_TAG, comm, &status);
        buf_val = n_local_entities + 1;
        MPI_Send(&buf_val, 1, MPI_INT,
                 0, FVM_MPI_TAG, comm);
      }

      MPI_Send(displacements, n_local_entities + 1,
               aint_type, 0, FVM_MPI_TAG, comm);

      /* Send local portion of lengths array to rank zero */

      if (n_local_entities > 0)
        MPI_Send(slice_index, (int)n_local_entities,
                 FVM_MPI_LNUM, 0, FVM_MPI_TAG, comm);

    }

  }

#if 0 && defined(DEBUG) && !defined(NDEBUG)
  MPI_Barrier(comm);
#endif

}

/*----------------------------------------------------------------------------
 * Recompute maximum value of global_num_end and slice connectivity size for
 * an indexed connectivity slice.
 *
 * Given an initial global connectivity buffer size associated with the
 * slice (global_connect_s_size), this function verifies that the connectivity
 * associated with the slice from global_num_start to global_num_end may fit
 * in this buffer. If this is not the case, global_num_end is reduced to the
 * largest value such that the associated indexed connectivity or values may
 * fit in the indicated buffer size.
 *
 * In any case, slice size will neither be increased above the current
 * slice size, nor be reduced to less than
 * min(n_g_elements, n_elements_s_min) if initially larger than this.
 * If necessary, global_connect_s_size is increased so that this minimal
 * slice may fit in a buffer of this same size.
 *
 * parameters:
 *   n_elements_s_min      <-- minimum number of elements per slice desired
 *   global_num_end        --> new current global slice past the end number
 *   global_connect_s_size <-> pointer to global connectivity slice size
 *   comm                  <-- associated MPI communicator
 *   slice_index           <-- index of blocks corresponding to a given
 *                             element in the global_connect_s array
 *                             (required for rank 0 only)
 *   this_slice            <-> structure for management of slice status
 *----------------------------------------------------------------------------*/

void
fvm_gather_resize_indexed_slice(const fvm_gnum_t     n_elements_s_min,
                                fvm_gnum_t          *global_num_end,
                                fvm_gnum_t          *global_connect_s_size,
                                MPI_Comm             comm,
                                const fvm_gnum_t     slice_index[],
                                fvm_gather_slice_t  *this_slice)
{
  fvm_gnum_t  i_s;
  fvm_gnum_t  buf[2];

  fvm_gnum_t global_num_start = this_slice->global_num_slice_start;

  const int local_rank = this_slice->local_rank;

  *global_num_end = this_slice->global_num_slice_end;

  /* Recompute maximum value of global_num_end for this slice */

  if (local_rank == 0) {

    for (i_s = 1; i_s < *global_num_end - global_num_start + 1; i_s++) {
      if (slice_index[i_s] > *global_connect_s_size) {
        *global_num_end = global_num_start + i_s - 1;
        break;
      }
    }

    /* If possible, size to at least n_elements_s_min (unless
       reference slice size is smaller or already at end of slice) */

    if (*global_num_end - global_num_start < n_elements_s_min) {

      *global_num_end = global_num_start + n_elements_s_min;

      if (*global_num_end - global_num_start > this_slice->ref_slice_size)
        *global_num_end = global_num_start + this_slice->ref_slice_size;

      if (*global_num_end > this_slice->global_num_final + 1)
        *global_num_end = this_slice->global_num_final + 1;

      /* Limit to initial global_num_end */

      if (*global_num_end > this_slice->global_num_slice_end)
        *global_num_end = this_slice->global_num_slice_end;

      *global_connect_s_size =
        FVM_MAX(*global_connect_s_size,
                slice_index[*global_num_end - global_num_start]);

    }

    buf[0] = *global_num_end, buf[1] = *global_connect_s_size;

  }

  MPI_Bcast(buf, 2, FVM_MPI_GNUM, 0, comm);

  /* Modify (reduce) slice size limit if necessary */

  fvm_gather_slice_limit(this_slice, &(buf[0]));

  /* Set output values */

  *global_num_end = buf[0];
  *global_connect_s_size = buf[1];

#if 0 && defined(DEBUG) && !defined(NDEBUG)
  MPI_Barrier(comm);
#endif

}

/*----------------------------------------------------------------------------
 * Gather a given portion of an array to rank 0.
 *
 * This function is intended to be used within a loop on subsets of the global
 * array (so as to enable writing to file or sending to an external process
 * without requiring the full array to reside in the process directly
 * handling I/O's memory). As such, it avoids allocating its own working arrays
 * (buffers), so that they may be allocated outside the loop and reused for
 * each call (avoiding the overhead associated with memory allocation).
 *
 * All or most elements in a given portion may belong to a same process rank
 * (depending on mesh numbering and domain splitting). To account for
 * this, for each process rank, the global_array_s[] array must be large
 * enough to contain (slice_size * stride) values, even though most processes
 * will require less.
 *
 * parameters:
 *   local_array      <-- local array (size n_local_elements * stride)
 *   global_array_s   --> global array section for elements
 *                        slice global_num_start to global_num_end
 *                        (output for rank 0, working array only for others)
 *   datatype         <-- MPI datatype of each value
 *   stride           <-- number of (interlaced) values per element
 *   element_io_num   <-- I/O numbering structure associated with elements
 *   comm             <-- MPI communicator for structures considered
 *   this_slice       <-> structure for management of slice status
 *----------------------------------------------------------------------------*/

void
fvm_gather_array(const void          *local_array,
                 void                *global_array_s,
                 MPI_Datatype         datatype,
                 size_t               stride,
                 const fvm_io_num_t  *element_io_num,
                 MPI_Comm             comm,
                 fvm_gather_slice_t  *this_slice)
{
  int  n_local_entities, n_distant_entities;
  size_t  i, j, k;
  size_t  size_mult;
  fvm_lnum_t  local_index_start, local_index_stop;

  const char *local_array_val = local_array;
  char *global_array_s_val = global_array_s;

  /* MPI related variables */
  MPI_Status  status;
  MPI_Aint  extent;
  int  size, distant_rank;
  const int local_rank = this_slice->local_rank;
  const int n_ranks = this_slice->n_ranks;
  int  *const blocklengths = this_slice->blocklengths;
  MPI_Aint *const displacements = this_slice->displacements;
  const MPI_Datatype aint_type = this_slice->aint_type;

  /* Local number of elements */
  const fvm_lnum_t  local_index_end = this_slice->local_index_end;

  /* Global numbering */
  const fvm_gnum_t global_num_start = this_slice->global_num_slice_start;
  const fvm_gnum_t global_num_end = this_slice->global_num_slice_end;
  const fvm_gnum_t *entity_global_num
                      = fvm_io_num_get_global_num(element_io_num);

  /* Get info on the current MPI communicator */

  /* Get extent of datatype */
  MPI_Type_extent(datatype, &extent);
  MPI_Type_size(datatype, &size);

  if (extent != size)
    bft_error(__FILE__, __LINE__, 0,
              _("fvm_gather_array() is not implemented for use with\n"
                "MPI datatypes associated with structures using padding\n"
                "(for which size != extent)."));

  /* Initialize blocklengths and displacements */

  local_index_start = this_slice->local_index_start;

  assert(   local_index_start >= local_index_end
         || entity_global_num[local_index_start] >= global_num_start);

  if (local_rank == 0) {
    int n_entities_max = (int)(global_num_end - global_num_start);
    for (i = 0; i < (size_t)n_entities_max; i++)
      blocklengths[i] = stride;
  }

  /* Displacements should be expressed in bytes
     (we will use MPI_Datatype_hindexed() */

  size_mult = size * stride;

  for (i = 0, j = local_index_start;
       (   i < (size_t)local_index_end
        && j < (size_t)local_index_end
        && entity_global_num[j] < global_num_end);
       i++, j++) {
    displacements[i] = (MPI_Aint)(  (entity_global_num[j] - global_num_start)
                                  * size_mult);
  }

  n_local_entities = i;
  local_index_stop = local_index_start + n_local_entities;
  this_slice->local_index_last = local_index_stop;

  if (local_index_stop < local_index_end)
    displacements[n_local_entities] = entity_global_num[j];
  else
    displacements[n_local_entities] = this_slice->global_num_final + 1;

  /*
    Prepare send buffer (we use a copy to ensure constedness of input)
    For rank 0, set final values directly.
  */

  if (local_rank == 0) {
    for (i = 0, j = (size_t)local_index_start;
         i < (size_t)n_local_entities;
         i++, j++) {
      for (k = 0; k < size_mult; k++) {
        global_array_s_val[displacements[i] + k]
          = local_array_val[j*size_mult + k];
      }
    }
  }
  else
    memcpy(global_array_s_val,
           local_array_val+(local_index_start*size_mult),
           n_local_entities*size_mult);

  /* Gather connectivity information from ranks > 0 */

  if (local_rank == 0) {

    for (distant_rank = 1; distant_rank < n_ranks; distant_rank++) {

      /* Get index from distant rank */

      if (   this_slice->next_global_num[distant_rank] < global_num_end
          || this_slice->use_next_global_num == false) {

        if (this_slice->safe_mode == true) {
          MPI_Send(&distant_rank, 1, MPI_INT,
                   distant_rank, FVM_MPI_TAG, comm);
          MPI_Recv(&n_distant_entities, 1, MPI_INT,
                   distant_rank, FVM_MPI_TAG, comm, &status);
        }
        else {
          MPI_Probe(distant_rank, FVM_MPI_TAG, comm, &status);
          MPI_Get_count(&status, aint_type, &n_distant_entities);
        }

        MPI_Recv(displacements, n_distant_entities, aint_type,
                 distant_rank, FVM_MPI_TAG, comm, &status);

        n_distant_entities -= 1;
        this_slice->next_global_num_last[distant_rank]
          = displacements[n_distant_entities];

        if (n_distant_entities > 0) {

          MPI_Datatype  fvm_index_type;

          /*
            Define indexed datatype so as to directly receive
            to correct location
          */

          MPI_Type_hindexed(n_distant_entities, blocklengths, displacements,
                            datatype, &fvm_index_type);
          MPI_Type_commit(&fvm_index_type);

          /* Get portion of connectivity array from distant rank */

          MPI_Recv(global_array_s, 1, fvm_index_type,
                   distant_rank, FVM_MPI_TAG, comm, &status);

          MPI_Type_free(&fvm_index_type);

        }

      }

    }

  }
  else {

    if (   n_local_entities > 0
        || this_slice->use_next_global_num ==false) {

      /* Send local index to rank zero */

      if (this_slice->safe_mode == true) {
        int buf_val;
        MPI_Recv(&buf_val, 1, MPI_INT,
                 0, FVM_MPI_TAG, comm, &status);
        buf_val = n_local_entities + 1;
        MPI_Send(&buf_val, 1, MPI_INT,
                 0, FVM_MPI_TAG, comm);
      }

      MPI_Send(displacements, n_local_entities + 1, aint_type, 0,
               FVM_MPI_TAG, comm);

      /* Send local portion of connectivity array to rank zero */

      if (n_local_entities > 0)
        MPI_Send(global_array_s, (int)(n_local_entities * stride),
                 datatype, 0, FVM_MPI_TAG, comm);

    }

  }

#if 0 && defined(DEBUG) && !defined(NDEBUG)
  MPI_Barrier(comm);
#endif

}

/*----------------------------------------------------------------------------
 * Gather a given portion of an indexed array of to rank 0.
 *
 * A slice_index[] array indicating the index (0 to n-1) of blocks in
 * the slice is required for rank 0. This implies that the block sizes in
 * the slice have already been gathered through the use of
 * fvm_gather_slice_index() or some similar method, and used to build this
 * slice index.
 *
 * This function is intended to be used within a loop on subsets of the global
 * lengths array (so as to enable writing to file or sending to an
 * external process without requiring the full array to reside in the process
 * directly handling I/O's memory). As such, it avoids allocating its own
 * working arrays (buffers), so that they may be allocated outside the loop
 * and reused for each call (avoiding the overhead associated with memory
 * allocation).
 *
 * All or most elements in a given portion may belong to a same process rank
 * (depending on mesh numbering and domain splitting). To account for
 * this, for each process rank, the global_lengths_s[] arrays must be large
 * enough to contain (slice_index[current_slice_size] - 1) values, even
 * though most processes will require less.
 * Use fvm_gather_resize_indexed_slice() to adjust current_slice_size.
 *
 * parameters:
 *   local_array      <-- local array
 *                        (size: local_index[n_local_elements] * stride)
 *   global_array_s   --> global array section for elements
 *                        slice global_num_start to global_num_end
 *                        (output for rank 0, working array only for others)
 *   datatype         <-- MPI datatype of each value
 *   local_index      <-- local index array
 *   element_io_num   <-- I/O numbering structure associated with elements
 *   comm             <-- MPI communicator for structures considered
 *   slice_index      <-- index of blocks corresponding to a given
 *                        element in the global_numbers_s array
 *                        (required for rank 0 only)
 *   this_slice       <-> structure for management of slice status
 *----------------------------------------------------------------------------*/

void
fvm_gather_indexed(const void          *local_array,
                   void                *global_array_s,
                   const MPI_Datatype   datatype,
                   const fvm_lnum_t     local_index[],
                   const fvm_io_num_t  *element_io_num,
                   MPI_Comm             comm,
                   const fvm_gnum_t     slice_index[],
                   fvm_gather_slice_t  *this_slice)
{
  int  i, j, k, l;
  int  n_local_entities, n_distant_entities;
  int  n_values_send = 0;
  fvm_lnum_t  local_index_start, local_index_stop;

  const char *local_array_val = local_array;
  char *global_array_s_val = global_array_s;

  /* MPI related variables */
  MPI_Status status;
  MPI_Datatype fvm_index_type, displacement_type;
  MPI_Aint  extent;
  int  size, distant_rank;
  const int local_rank = this_slice->local_rank;
  const int n_ranks = this_slice->n_ranks;
  int  *blocklengths = this_slice->blocklengths;
  MPI_Aint *const displacements = this_slice->displacements;
  const MPI_Datatype aint_type = this_slice->aint_type;

  /* Local number of elements */
  const fvm_lnum_t  local_index_end = this_slice->local_index_end;

  /* Global numbering */
  const fvm_gnum_t global_num_start = this_slice->global_num_slice_start;
  const fvm_gnum_t global_num_end = this_slice->global_num_slice_end;
  const fvm_gnum_t *entity_global_num
                      = fvm_io_num_get_global_num(element_io_num);

  /* Get extent of datatype */
  MPI_Type_extent(datatype, &extent);
  MPI_Type_size(datatype, &size);

  if (extent != size)
    bft_error(__FILE__, __LINE__, 0,
              _("fvm_gather_indexed_array() is not implemented for use with\n"
                "MPI datatypes associated with structures using padding\n"
                "(for which size != extent)."));

  /* Initialize blocklengths and displacements; this operation
     requires a blocklengths[] array for all ranks (contrary
     to others), so it updates the slice structure accordingly */

  if (blocklengths == NULL) {
    BFT_MALLOC(this_slice->blocklengths, this_slice->ref_slice_size, int);
    blocklengths = this_slice->blocklengths;
  }

  local_index_start = this_slice->local_index_start;

  assert(   local_index_start >= local_index_end
         || entity_global_num[local_index_start] >= global_num_start);

  /* Displacements are first used to transfer the global slice index position
     of a given entity, and will be set to the index value later on rank 0 */

  if (sizeof(MPI_Aint) == sizeof(unsigned))
    displacement_type = MPI_UNSIGNED;
#if defined(MPI_UNSIGNED_LONG_LONG)
  else if (sizeof(MPI_Aint) == sizeof(unsigned long long))
    displacement_type = MPI_UNSIGNED_LONG_LONG;
#endif
  else
    displacement_type = MPI_UNSIGNED_LONG;

  for (i = 0, j = local_index_start;
       j < local_index_end && entity_global_num[j] < global_num_end;
       i++, j++)
    displacements[i] = (MPI_Aint)(entity_global_num[j] - global_num_start);

  n_local_entities = i;
  local_index_stop = local_index_start + n_local_entities;
  this_slice->local_index_last = local_index_stop;

  if (local_index_stop < local_index_end)
    displacements[n_local_entities] = entity_global_num[j];
  else
    displacements[n_local_entities] = this_slice->global_num_final + 1;

  /*
    Prepare send buffer:
    For rank 0, set final values directly; for others, set blocklengths
    to be sent to rank 0.
  */

  if (local_rank == 0) {
    for (i = 0, j = local_index_start;
         i < n_local_entities;
         i++, j++) {
      const size_t displacement = slice_index[displacements[i]] * size;
      for (k = local_index[j]*(int)size, l = 0;
           k < (local_index[j+1])*(int)size;
           k++, l++)
        global_array_s_val[displacement + l] = local_array_val[k];
    }
  }
  else {
    n_values_send = (  local_index[local_index_start + n_local_entities]
                     - local_index[local_index_start]);
    memcpy(global_array_s_val,
           local_array_val + ((local_index[local_index_start]) * size),
           n_values_send * size);
    for (i = 0, j = local_index_start;
         i < n_local_entities;
         i++, j++) {
      blocklengths[i] = (local_index[j+1] - local_index[j]);
    }
  }

  /* Gather numbers information from ranks > 0 */

  if (local_rank == 0) {

    for (distant_rank = 1; distant_rank < n_ranks; distant_rank++) {

      /* Get index from distant rank */

      if (   this_slice->next_global_num[distant_rank] < global_num_end
          || this_slice->use_next_global_num == false) {

        if (this_slice->safe_mode == true) {
          MPI_Send(&distant_rank, 1, MPI_INT,
                   distant_rank, FVM_MPI_TAG, comm);
          MPI_Recv(&n_distant_entities, 1, MPI_INT,
                   distant_rank, FVM_MPI_TAG, comm, &status);
        }
        else {
          MPI_Probe(distant_rank, FVM_MPI_TAG, comm, &status);
          MPI_Get_count(&status, aint_type, &n_distant_entities);
        }

      /* Get index from distant rank */

        MPI_Recv(displacements, n_distant_entities, displacement_type,
                 distant_rank, FVM_MPI_TAG, comm, &status);

        n_distant_entities -= 1;
        this_slice->next_global_num_last[distant_rank]
          = displacements[n_distant_entities];

        for (i = 0; i < n_distant_entities; i++) {
          j = displacements[i];
          blocklengths[i] = slice_index[j+1] - slice_index[j];
          displacements[i] = (MPI_Aint)(slice_index[j] * size);
        }

        if (n_distant_entities > 0) {

          /*
            Define indexed datatype so as to directly receive
            to correct location
          */

          MPI_Type_hindexed(n_distant_entities, blocklengths, displacements,
                            datatype, &fvm_index_type);
          MPI_Type_commit(&fvm_index_type);

          /* Get portion of numbers array from distant rank */

          MPI_Recv(global_array_s, 1, fvm_index_type,
                   distant_rank, FVM_MPI_TAG, comm, &status);

          MPI_Type_free(&fvm_index_type);

        }

      }

    }

  }
  else {

    if (   n_local_entities > 0
        || this_slice->use_next_global_num == false) {

      /* Send local index to rank zero */

      if (this_slice->safe_mode == true) {
        int buf_val;
        MPI_Recv(&buf_val, 1, MPI_INT,
                 0, FVM_MPI_TAG, comm, &status);
        buf_val = n_local_entities + 1;
        MPI_Send(&buf_val, 1, MPI_INT,
                 0, FVM_MPI_TAG, comm);
      }

      MPI_Send(displacements, n_local_entities + 1, displacement_type,
               0, FVM_MPI_TAG, comm);

      /* Send local portion of numbers array to rank zero */

      if (n_local_entities > 0)
        MPI_Send(global_array_s, (int)n_values_send,
                 datatype, 0, FVM_MPI_TAG, comm);

    }

  }

#if 0 && defined(DEBUG) && !defined(NDEBUG)
  MPI_Barrier(comm);
#endif

}

/*----------------------------------------------------------------------------
 * Gather a given portion of a strided (i.e. regular) connectivity array
 * to rank 0. Connectivity values are converted from local to global values
 * (both with 1 to n type numbering).
 *
 * This function is intended to be used within a loop on subsets of the global
 * connectivity array (so as to enable writing to file or sending to an
 * external process without requiring the full array to reside in the process
 * directly handling I/O's memory). As such, it avoids allocating its own
 * working arrays (buffers), so that they may be allocated outside the loop
 * and reused for each call (avoiding the overhead associated with memory
 * allocation).
 *
 * All or most elements in a given portion may belong to a same process rank
 * (depending on mesh numbering and domain splitting). To account for
 * this, for each process rank, the global_connect_s[] array must be large
 * enough to contain (slice_size * stride) values, even though most processes
 * will require less.
 *
 * parameters:
 *   local_connect    <-- local connectivity array (1 to n numbering)
 *   global_connect_s --> global connectivity array section for elements
 *                        slice global_num_start to global_num_end
 *                        (output for rank 0, working array only for others)
 *   stride           <-- number of connected entities (i.e. vertices in
 *                        a nodal connectivity) per element
 *   connected_io_num <-- I/O numbering structure associated with "connected"
 *                        entities (i.e. vertices in a nodal connectivity)
 *   element_io_num   <-- I/O numbering structure associated with elements
 *   comm             <-- MPI communicator for structures considered
 *   this_slice       <-> structure for management of slice status
 *----------------------------------------------------------------------------*/

void
fvm_gather_strided_connect(const fvm_lnum_t    local_connect[],
                           fvm_gnum_t          global_connect_s[],
                           const int           stride,
                           const fvm_io_num_t  *connected_io_num,
                           const fvm_io_num_t  *element_io_num,
                           MPI_Comm             comm,
                           fvm_gather_slice_t  *this_slice)
{
  int  i, j, k;
  int  n_local_entities, n_distant_entities;
  int  size_mult;
  fvm_lnum_t  local_index_start, local_index_stop;

  /* MPI related variables */
  MPI_Status status;
  MPI_Datatype fvm_index_type;
  int  distant_rank;
  const int local_rank = this_slice->local_rank;
  const int n_ranks = this_slice->n_ranks;
  int  *const blocklengths = this_slice->blocklengths;
  MPI_Aint *const displacements = this_slice->displacements;
  const MPI_Datatype aint_type = this_slice->aint_type;

  /* Local number of elements */
  const fvm_lnum_t  local_index_end = this_slice->local_index_end;

  /* Global numbering */
  const fvm_gnum_t global_num_start = this_slice->global_num_slice_start;
  const fvm_gnum_t global_num_end = this_slice->global_num_slice_end;
  const fvm_gnum_t *connected_global_num
                      = fvm_io_num_get_global_num(connected_io_num);
  const fvm_gnum_t *entity_global_num
                      = fvm_io_num_get_global_num(element_io_num);

  /* Initialize blocklengths and displacements */

  local_index_start = this_slice->local_index_start;

  assert(   local_index_start >= local_index_end
         || entity_global_num[local_index_start] >= global_num_start);

  if (local_rank == 0) {
    int n_entities_max = (int)(global_num_end - global_num_start);
    for (i = 0; i < n_entities_max; i++)
      blocklengths[i] = stride;
  }

  /* For local use on rank 0, displacements should be expressed
     in numbers of fvm_gnum_t elements; for other ranks,
     they will be expressed in bytes, as we use MPI_Datatype_hindexed() */

  size_mult = (local_rank == 0 ? 1 : sizeof(fvm_gnum_t)) * stride;

  for (i = 0, j = local_index_start;
       i < local_index_end && j < local_index_end
                           && entity_global_num[j] < global_num_end;
       i++, j++) {
    displacements[i] = (MPI_Aint)(  (entity_global_num[j] - global_num_start)
                                  * size_mult);
  }

  n_local_entities = i;
  local_index_stop = local_index_start + n_local_entities;
  this_slice->local_index_last = local_index_stop;

  if (local_index_stop < local_index_end)
    displacements[n_local_entities] = entity_global_num[j];
  else
    displacements[n_local_entities] = this_slice->global_num_final + 1;

  /*
    Prepare send buffer:
    replace local connected entity numbers with their global counterparts.
    For rank 0, set final values directly.
  */

  if (local_rank == 0) {
    for (i = 0, j = local_index_start;
         i < n_local_entities;
         i++, j++) {
      for (k = 0; k < stride; k++)
        global_connect_s[displacements[i] + k]
          = connected_global_num[local_connect[j*stride + k] - 1];
    }
  }
  else {
    int n_local_values = n_local_entities * stride;
    for (i = 0, j = (fvm_gnum_t)(local_index_start * stride);
         i < n_local_values;
         i++, j++) {
      global_connect_s[i] = connected_global_num[local_connect[j] - 1];
    }
  }

  /* Gather connectivity information from ranks > 0 */

  if (local_rank == 0) {

    for (distant_rank = 1; distant_rank < n_ranks; distant_rank++) {

      /* Get index from distant rank */

      if (   this_slice->next_global_num[distant_rank] < global_num_end
          || this_slice->use_next_global_num == false) {

        if (this_slice->safe_mode == true) {
          MPI_Send(&distant_rank, 1, MPI_INT,
                   distant_rank, FVM_MPI_TAG, comm);
          MPI_Recv(&n_distant_entities, 1, MPI_INT,
                   distant_rank, FVM_MPI_TAG, comm, &status);
        }
        else {
          MPI_Probe(distant_rank, FVM_MPI_TAG, comm, &status);
          MPI_Get_count(&status, aint_type, &n_distant_entities);
        }

        MPI_Recv(displacements, n_distant_entities, aint_type,
                 distant_rank, FVM_MPI_TAG, comm, &status);

        n_distant_entities -= 1;
        this_slice->next_global_num_last[distant_rank]
          = displacements[n_distant_entities];

        if (n_distant_entities > 0) {

          /*
            Define indexed datatype so as to directly receive
            to correct location
          */

          MPI_Type_hindexed(n_distant_entities, blocklengths, displacements,
                            FVM_MPI_GNUM, &fvm_index_type);
          MPI_Type_commit(&fvm_index_type);

          /* Get portion of connectivity array from distant rank */

          MPI_Recv(global_connect_s, 1, fvm_index_type,
                   distant_rank, FVM_MPI_TAG, comm, &status);

          MPI_Type_free(&fvm_index_type);

        }

      }

    }

  }
  else {

    if (   n_local_entities > 0
        || this_slice->use_next_global_num == false) {

      /* Send local index to rank zero */

      if (this_slice->safe_mode == true) {
        int buf_val;
        MPI_Recv(&buf_val, 1, MPI_INT,
                 0, FVM_MPI_TAG, comm, &status);
        buf_val = n_local_entities + 1;
        MPI_Send(&buf_val, 1, MPI_INT,
                 0, FVM_MPI_TAG, comm);
      }

      MPI_Send(displacements, n_local_entities + 1, aint_type, 0,
               FVM_MPI_TAG, comm);

      /* Send local portion of connectivity array to rank zero */

      if (n_local_entities > 0)
        MPI_Send(global_connect_s, (int)(n_local_entities * stride),
                 FVM_MPI_GNUM, 0, FVM_MPI_TAG, comm);

    }

  }

#if 0 && defined(DEBUG) && !defined(NDEBUG)
  MPI_Barrier(comm);
#endif

}

/*----------------------------------------------------------------------------
 * Gather a given portion of an indexed array of numbers to rank 0.
 * If the connected_io_num argument is non-NULL, these numbers
 * are assumed to represent connectivity values, and are converted from
 * local to global values (both with 1 to n type numbering).
 * Otherwise, they are considered to represent any other type of positive
 * integer (such as the number of vertices for each of a polyhedron's faces).
 *
 * A slice_index[] array indicating the index (0 to n-1) of blocks in
 * the slice is required for rank 0. This implies that the block sizes in
 * the slice have already been gathered through the use of
 * fvm_gather_slice_index() or some similar method, and used to build this
 * slice index.
 *
 * This function is intended to be used within a loop on subsets of the global
 * lengths array (so as to enable writing to file or sending to an
 * external process without requiring the full array to reside in the process
 * directly handling I/O's memory). As such, it avoids allocating its own
 * working arrays (buffers), so that they may be allocated outside the loop
 * and reused for each call (avoiding the overhead associated with memory
 * allocation).
 *
 * All or most elements in a given portion may belong to a same process rank
 * (depending on mesh numbering and domain splitting). To account for
 * this, for each process rank, the global_lengths_s[] arrays must be large
 * enough to contain (slice_index[current_slice_size] - 1) values, even
 * though most processes will require less.
 * Use fvm_gather_resize_indexed_slice() to adjust current_slice_size.
 *
 * parameters:
 *   local_index      <-- local index array
 *   local_numbers    <-- local numbers array
 *   global_numbers_s --> global numbers array section for elements
 *                        slice global_num_start to global_num_end
 *                        (output for rank 0, working array only for others)
 *   connected_io_num <-- I/O numbering structure associated with "connected"
 *                        entities (i.e. vertices in a nodal connectivity)
 *   element_io_num   <-- I/O numbering structure associated with elements
 *   comm             <-- MPI communicator for structures considered
 *   slice_index      <-- index of blocks corresponding to a given
 *                        element in the global_numbers_s array
 *                        (required for rank 0 only)
 *   this_slice       <-> structure for management of slice status
 *----------------------------------------------------------------------------*/

void
fvm_gather_indexed_numbers(const fvm_lnum_t     local_index[],
                           const fvm_lnum_t     local_numbers[],
                           fvm_gnum_t           global_numbers_s[],
                           const fvm_io_num_t  *connected_io_num,
                           const fvm_io_num_t  *element_io_num,
                           MPI_Comm             comm,
                           const fvm_gnum_t     slice_index[],
                           fvm_gather_slice_t  *this_slice)
{
  int  i, j, k, l;
  int  n_local_entities, n_distant_entities;
  int  n_values_send = 0;
  fvm_lnum_t  local_index_start, local_index_stop;

  /* MPI related variables */
  MPI_Status status;
  MPI_Datatype fvm_index_type, displacement_type;
  int  distant_rank;
  const int local_rank = this_slice->local_rank;
  const int n_ranks = this_slice->n_ranks;
  int  *blocklengths = this_slice->blocklengths;
  MPI_Aint *const displacements = this_slice->displacements;
  const MPI_Datatype aint_type = this_slice->aint_type;

  /* Local number of elements */
  const fvm_lnum_t  local_index_end = this_slice->local_index_end;

  /* Global numbering */
  const fvm_gnum_t global_num_start = this_slice->global_num_slice_start;
  const fvm_gnum_t global_num_end = this_slice->global_num_slice_end;
  const fvm_gnum_t *connected_global_num = NULL;
  const fvm_gnum_t *entity_global_num
                      = fvm_io_num_get_global_num(element_io_num);

  if (connected_io_num != NULL)
    connected_global_num = fvm_io_num_get_global_num(connected_io_num);

  /* Initialize blocklengths and displacements; this operation
     requires a blocklengths[] array for all ranks (contrary
     to others), so it updates the slice structure accordingly */

  if (blocklengths == NULL) {
    BFT_MALLOC(this_slice->blocklengths, this_slice->ref_slice_size, int);
    blocklengths = this_slice->blocklengths;
  }

  local_index_start = this_slice->local_index_start;

  assert(   local_index_start >= local_index_end
         || entity_global_num[local_index_start] >= global_num_start);

  /* Displacements are first used to transfer the global slice index position
     of a given entity, and will be set to the index value later on rank 0 */

  if (sizeof(MPI_Aint) == sizeof(unsigned))
    displacement_type = MPI_UNSIGNED;
#if defined(MPI_UNSIGNED_LONG_LONG)
  else if (sizeof(MPI_Aint) == sizeof(unsigned long long))
    displacement_type = MPI_UNSIGNED_LONG_LONG;
#endif
  else
    displacement_type = MPI_UNSIGNED_LONG;

  for (i = 0, j = local_index_start;
       i < local_index_end && j < local_index_end
                           && entity_global_num[j] < global_num_end;
       i++, j++)
    displacements[i] = (MPI_Aint)(entity_global_num[j] - global_num_start);

  n_local_entities = i;
  local_index_stop = local_index_start + n_local_entities;
  this_slice->local_index_last = local_index_stop;

  if (local_index_stop < local_index_end)
    displacements[n_local_entities] = entity_global_num[j];
  else
    displacements[n_local_entities] = this_slice->global_num_final + 1;

  /*
    Prepare send buffer:
    For rank 0, set final values directly; for others, set blocklengths
    to be sent to rank 0.
  */

  if (connected_io_num == NULL) {

    if (local_rank == 0) {
      for (i = 0, j = local_index_start;
           i < n_local_entities;
           i++, j++) {
        displacements[i] = slice_index[displacements[i]];
        for (k = local_index[j], l = 0; k < local_index[j+1]; k++, l++)
          global_numbers_s[displacements[i] + l] = local_numbers[k];
      }
    }
    else {
      l = 0;
      for (i = 0, j = local_index_start;
           i < n_local_entities;
           i++, j++) {
        blocklengths[i] = local_index[j+1] - local_index[j];
        for (k = local_index[j]; k < local_index[j+1]; k++)
          global_numbers_s[l++] = local_numbers[k];
      }
      n_values_send = l;
    }

  }
  else {

    if (local_rank == 0) {
      for (i = 0, j = local_index_start;
           i < n_local_entities;
           i++, j++) {
        displacements[i] = slice_index[displacements[i]];
        for (k = local_index[j], l = 0; k < local_index[j+1]; k++, l++)
          global_numbers_s[displacements[i] + l]
            = connected_global_num[local_numbers[k] - 1];
      }
    }
    else {
      l = 0;
      for (i = 0, j = local_index_start;
           i < n_local_entities;
           i++, j++) {
        blocklengths[i] = local_index[j+1] - local_index[j];
        for (k = local_index[j]; k < local_index[j+1]; k++)
          global_numbers_s[l++] = connected_global_num[local_numbers[k] - 1];
      }
      n_values_send = l;
    }

  }

  /* Gather numbers information from ranks > 0 */

  if (local_rank == 0) {

    for (distant_rank = 1; distant_rank < n_ranks; distant_rank++) {

      /* Get index from distant rank */

      if (   this_slice->next_global_num[distant_rank] < global_num_end
          || this_slice->use_next_global_num == false) {

        if (this_slice->safe_mode == true) {
          MPI_Send(&distant_rank, 1, MPI_INT,
                   distant_rank, FVM_MPI_TAG, comm);
          MPI_Recv(&n_distant_entities, 1, MPI_INT,
                   distant_rank, FVM_MPI_TAG, comm, &status);
        }
        else {
          MPI_Probe(distant_rank, FVM_MPI_TAG, comm, &status);
          MPI_Get_count(&status, aint_type, &n_distant_entities);
        }

      /* Get index from distant rank */

        MPI_Recv(displacements, n_distant_entities, displacement_type,
                 distant_rank, FVM_MPI_TAG, comm, &status);

        n_distant_entities -= 1;
        this_slice->next_global_num_last[distant_rank]
          = displacements[n_distant_entities];

        for (i = 0; i < n_distant_entities; i++) {
          j = displacements[i];
          blocklengths[i] = slice_index[j+1] - slice_index[j];
          displacements[i] = (MPI_Aint)(slice_index[j] * sizeof(fvm_gnum_t));
        }

        if (n_distant_entities > 0) {

          /*
            Define indexed datatype so as to directly receive
            to correct location
          */

          MPI_Type_hindexed(n_distant_entities, blocklengths, displacements,
                            FVM_MPI_GNUM, &fvm_index_type);
          MPI_Type_commit(&fvm_index_type);

          /* Get portion of numbers array from distant rank */

          MPI_Recv(global_numbers_s, 1, fvm_index_type,
                   distant_rank, FVM_MPI_TAG, comm, &status);

          MPI_Type_free(&fvm_index_type);

        }

      }

    }

  }
  else {

    if (   n_local_entities > 0
        || this_slice->use_next_global_num == false) {

      /* Send local index to rank zero */

      if (this_slice->safe_mode == true) {
        int buf_val;
        MPI_Recv(&buf_val, 1, MPI_INT,
                 0, FVM_MPI_TAG, comm, &status);
        buf_val = n_local_entities + 1;
        MPI_Send(&buf_val, 1, MPI_INT,
                 0, FVM_MPI_TAG, comm);
      }
      MPI_Send(displacements, n_local_entities + 1, displacement_type,
               0, FVM_MPI_TAG, comm);

      /* Send local portion of numbers array to rank zero */

      if (n_local_entities > 0)
        MPI_Send(global_numbers_s, (int)n_values_send,
                 FVM_MPI_GNUM, 0, FVM_MPI_TAG, comm);

    }

  }

#if 0 && defined(DEBUG) && !defined(NDEBUG)
  MPI_Barrier(comm);
#endif

}

#endif /* defined (FVM_HAVE_MPI) */

/*----------------------------------------------------------------------------*/

#ifdef __cplusplus
}
#endif /* __cplusplus */


syntax highlighted by Code2HTML, v. 0.9.1