#include "pargrid.h"
/*+ worker - Solve a sparse matrix problem associated with a grid
Input Parameters:
grid - the given grid
procinfo - the processor information (in BlockSolve format)
+*/
void worker(par_grid *grid, BSprocinfo *procinfo)
{
int i, j, k;
int *offset;
point *msg;
int max_msg_size;
int count;
BSspmat *A;
BSpar_mat *pA, *f_pA;
BScomm *Acomm, *f_comm;
FLOAT shifted_diag, residual;
int num_iter;
FLOAT *x;
FLOAT *rhs;
FLOAT t;
BSmsg_list *msg_list;
int send_id, dummy;
msg_list = NULL;
/* find the number of local grid points */
grid->local_total = grid->l_num_x*grid->l_num_y*grid->l_num_z;
/* determine the beginning number of my grid points in the global */
/* grid point numbering */
BSoffset(1,&(grid->local_total),&offset,procinfo); CHKERR(0);
grid->offset = (*offset);
FREE(offset);
/* determine the number of global grid points */
grid->global_total = grid->local_total;
GISUM(&(grid->global_total),1,&i,procinfo->procset);
/* determine the maximum number of grid points on a single processor */
j = grid->local_total;
GIMAX(&j,1,&i,procinfo->procset);
if (PSISROOT(procinfo)) {
printf("o Global total = %d, Global Max = %d\n",
grid->global_total,j);
}
/******************************************************************/
/* set up grid with ghost points */
/******************************************************************/
grid->points = (point ***) MALLOC(sizeof(point **)*(grid->l_num_x+2));
for (i=0;i<grid->l_num_x+2;i++) {
grid->points[i] = (point **) MALLOC(sizeof(point *)*(grid->l_num_y+2));
for (j=0;j<grid->l_num_y+2;j++) {
grid->points[i][j] = (point *) MALLOC(sizeof(point)*(grid->l_num_z+2));
for (k=0;k<grid->l_num_z+2;k++) {
grid->points[i][j][k].num = -1;
grid->points[i][j][k].type = -1;
}
}
}
/* number local part of grid */
count = 0;
for (i=1;i<grid->l_num_x+1;i++) {
for (j=1;j<grid->l_num_y+1;j++) {
for (k=1;k<grid->l_num_z+1;k++) {
grid->points[i][j][k].num = count + grid->offset;
grid->points[i][j][k].type = grid->type;
count++;
}
}
}
/******************************************************************/
/* exchange edge information with other processors */
/******************************************************************/
/* allocate a message */
max_msg_size = grid->l_num_x;
if (max_msg_size < grid->l_num_y) {
max_msg_size = grid->l_num_y;
}
if (max_msg_size < grid->l_num_z) {
max_msg_size = grid->l_num_z;
}
max_msg_size += 2;
max_msg_size *= (max_msg_size*sizeof(point));
msg = (point *) MALLOC(max_msg_size);
/* now send my east grid edge to the east */
MPI_Cart_shift(procinfo->procset,0,1,&dummy,&send_id);
Msend_border_msg(msg_list,grid->points,msg,EAST_MSG,send_id,
grid->l_num_x,grid->l_num_x,
0,grid->l_num_y+1,
0,grid->l_num_z+1,procinfo);
/* receive from the west */
Mrecv_border_msg(grid->points,EAST_MSG,
0,0,
0,grid->l_num_y+1,
0,grid->l_num_z+1,procinfo);
/* now send my west grid edge to the west */
MPI_Cart_shift(procinfo->procset,0,-1,&dummy,&send_id);
Msend_border_msg(msg_list,grid->points,msg,WEST_MSG,send_id,
1,1,
0,grid->l_num_y+1,
0,grid->l_num_z+1,procinfo);
/* receive from the east */
Mrecv_border_msg(grid->points,WEST_MSG,
grid->l_num_x+1,grid->l_num_x+1,
0,grid->l_num_y+1,
0,grid->l_num_z+1,procinfo);
/* now send my north grid edge to the north */
MPI_Cart_shift(procinfo->procset,1,1,&dummy,&send_id);
Msend_border_msg(msg_list,grid->points,msg,NORTH_MSG,send_id,
0,grid->l_num_x+1,
grid->l_num_y,grid->l_num_y,
0,grid->l_num_z+1,procinfo);
/* receive from the south */
Mrecv_border_msg(grid->points,NORTH_MSG,
0,grid->l_num_x+1,
0,0,
0,grid->l_num_z+1,procinfo);
/* now send my south grid edge to the south */
MPI_Cart_shift(procinfo->procset,1,-1,&dummy,&send_id);
Msend_border_msg(msg_list,grid->points,msg,SOUTH_MSG,send_id,
0,grid->l_num_x+1,
1,1,
0,grid->l_num_z+1,procinfo);
/* receive from the north */
Mrecv_border_msg(grid->points,SOUTH_MSG,
0,grid->l_num_x+1,
grid->l_num_y+1,grid->l_num_y+1,
0,grid->l_num_z+1,procinfo);
/* now send my upper grid edge up */
send_id = procinfo->my_id;
Msend_border_msg(msg_list,grid->points,msg,UP_MSG,send_id,
0,grid->l_num_x+1,
0,grid->l_num_y+1,
grid->l_num_z,grid->l_num_z,procinfo);
/* receive from below */
Mrecv_border_msg(grid->points,UP_MSG,
0,grid->l_num_x+1,
0,grid->l_num_y+1,
0,0,procinfo);
/* now send my lower grid edge down */
send_id = procinfo->my_id;
Msend_border_msg(msg_list,grid->points,msg,DOWN_MSG,send_id,
0,grid->l_num_x+1,
0,grid->l_num_y+1,
1,1,procinfo);
/* receive from above */
Mrecv_border_msg(grid->points,DOWN_MSG,
0,grid->l_num_x+1,
0,grid->l_num_y+1,
grid->l_num_z+1,grid->l_num_z+1,procinfo);
FINISH_SEND_LIST(msg_list);
FREE(msg);
/* check to make sure that the entire local grid is numbered */
for (i=0;i<grid->l_num_x+2;i++) {
for (j=0;j<grid->l_num_y+2;j++) {
for (k=0;k<grid->l_num_z+2;k++) {
if (grid->points[i][j][k].num == -1) {
printf("Proc %d: bad point %d %d %d\n",
procinfo->my_id,i,j,k);
}
}
}
}
/* now call the routines to set up the matrix */
A = get_mat(grid,procinfo);
/* Set symmetry and storage scheme to be used */
BSset_mat_symmetric(A,grid->symmetric);
BSset_mat_icc_storage(A,grid->icc_storage);
/* permute the matrix */
pA = BSmain_perm(procinfo,A); CHKERR(0);
/* diagonally scale the matrix */
BSscale_diag(pA,pA->diag,procinfo); CHKERR(0);
/* set up the communication structure for triangular matrix solution */
Acomm = BSsetup_forward(pA,procinfo); CHKERR(0);
/* now factor the matrix */
/* set the initial diagonal to 1.0 */
shifted_diag = 1.0;
/* get a copy of the sparse matrix */
f_pA = BScopy_par_mat(pA); CHKERR(0);
/* set up a communication structure for factorization */
f_comm = BSsetup_factor(f_pA,procinfo); CHKERR(0);
/* factor the matrix until successful */
while (BSfactor(f_pA,f_comm,procinfo) != 0) {
CHKERR(0);
/* recopy the nonzeroes */
BScopy_nz(pA,f_pA); CHKERR(0);
/* increment the diagonal shift */
shifted_diag += 0.1;
BSset_diag(f_pA,shifted_diag,procinfo); CHKERR(0);
}
CHKERR(0);
/* set up the rhs and the x vector */
rhs = (FLOAT *) MALLOC(sizeof(FLOAT)*A->num_rows);
x = (FLOAT *) MALLOC(sizeof(FLOAT)*A->num_rows);
t = A->global_num_rows;
t = 1.0/sqrt(t);
for (i=0;i<A->num_rows;i++) {
rhs[i] = t;
x[i] = 0.0;
}
/* solve it */
BSctx_set_max_it(procinfo,100);
BSctx_set_method(procinfo,CG);
BSctx_set_tol(procinfo,1.0e-5);
num_iter = BSpar_solve(pA,f_pA,Acomm,rhs,x,&residual,procinfo); CHKERR(0);
if (PSISROOT(procinfo)) {
printf("o Took %d iterations: residual = %e\n",num_iter,residual);
}
FREE(rhs);
FREE(x);
/* free the grid */
free_grid(grid);
/* free the spmat */
BSfree_spmat(A);
/* free the par mat, etc. */
BSfree_par_mat(pA);
BSfree_copy_par_mat(f_pA);
BSfree_comm(Acomm);
BSfree_comm(f_comm);
}
syntax highlighted by Code2HTML, v. 0.9.1