/*
 * Copyright (c) 2002, 2004, 2005 Sendmail, Inc. and its suppliers.
 *	All rights reserved.
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 */

#include "sm/generic.h"
SM_RCSID("@(#)$Id: t-ev-rcb-clt.c,v 1.14 2005/05/31 21:00:28 ca Exp $")

#include "sm/assert.h"
#include "sm/ctype.h"
#include "sm/error.h"
#include "sm/memops.h"
#include "sm/heap.h"
#include "sm/test.h"
#include "sm/evthr.h"
#include "sm/io.h"
#include "sm/unixsock.h"
#include "sm/rcb.h"
#include "sm/check.h"
#include "t-rcb.h"

/*
**  NOTE(review): the header name of this #include was missing in the
**  reviewed text.  <stdio.h> is restored because this file uses
**  getchar(), EOF, fprintf() and stderr -- confirm against upstream.
*/

#include <stdio.h>

#define NSOCKET	"./sockevthr1"

/* values for t_ctx_S.what: what fct1() does at the end of a call */
#define WHAT_TERM	0	/* terminate and delete the task */
#define WHAT_CONT	1	/* put the task back into the wait queue */

/* fct1() terminates the task once it has been called more often */
#define MAX_CALLS	256
#define IOBUFSIZE	64

/* size used to create the RCBs and (minus 4) to open them for encoding */
#define RCBSIZE		256

int Verbose = 0;

/*
**  Context handed to fct1() via the task (evthr_t_actx).
*/

struct t_ctx_S
{
	sm_evthr_ctx_P	 ctx;		/* event thread context */
	int		 fd;		/* connected socket */
	int		 what;		/* WHAT_*; values > 1 also cause a sleep */
	int		 status;	/* countdown, decremented per call */
	int		 called;	/* number of calls of fct1() */
	sm_rcb_P	 rcbr;		/* RCB used for receiving */
	sm_rcb_P	 rcbw;		/* RCB used for sending */
};

typedef struct t_ctx_S t_ctx_T, *t_ctx_P;

/*
**  NOTE(review): TMO is defined before t-rcb-sr.c is included;
**  presumably that file tests it -- confirm there.
*/

#define TMO
#include "t-rcb-sr.c"

/*
**  INPUTRCB -- fill an RCB according to commands read from stdin
**
**	The RCB is opened for encoding and a 32 bit place holder for the
**	size is written.  Then stdin is read character by character:
**		'I'	call addint() for the RCB
**		'S'	call addstr() for the RCB
**		'W'	close the encoding, skip one more character,
**			return WAITFORREPLY
**		other	close the encoding, return the result of that
**		EOF	close the encoding, return EOF
**	If addint()/addstr() fail, the encoding is closed and the error
**	is returned.
**
**	Parameters:
**		rcb -- RCB
**
**	Returns:
**		WAITFORREPLY, EOF, or usual sm_error code
*/

#define WAITFORREPLY	1

sm_ret_T
inputrcb(sm_rcb_P rcb)
{
	int c;
	sm_ret_T ret;

	ret = sm_rcb_open_enc(rcb, RCBSIZE - 4);
	SM_TEST(sm_is_success(ret));

	/* just a place holder for the size */
	ret = sm_rcb_putuint32(rcb, 0);
	SM_TEST(sm_is_success(ret));

	while ((c = getchar()) != EOF)
	{
		if (c == 'I')
			ret = addint(rcb);
		else if (c == 'S')
			ret = addstr(rcb);
		else
		{
			ret = sm_rcb_close_enc(rcb);
			SM_TEST(ret == SM_SUCCESS);
			if (c == 'W' && sm_is_success(ret))
			{
				/*
				**  Skip the character following 'W'.
				**  It must not be stored in c: if it
				**  were EOF, the check after the loop
				**  could close the (already closed)
				**  encoding again and replace
				**  WAITFORREPLY by EOF.
				*/

				(void) getchar();
				ret = WAITFORREPLY;
			}
			break;
		}
		if (sm_is_err(ret))
		{
			(void) sm_rcb_close_enc(rcb);
			break;
		}
	}

	/* input exhausted while the RCB was still open for encoding */
	if (c == EOF && sm_is_success(ret))
	{
		(void) sm_rcb_close_enc(rcb);
		ret = EOF;
	}
	return ret;
}

/*
**  FCT1 -- read/write RCB
**
**	Task function: receives into rcbr when the task got a read
**	event, sends rcbw when it got a write event; after a complete
**	send the next RCB is built by inputrcb().
**
**	Parameters:
**		tsk -- event thread task (evthr_t_actx must be a t_ctx_P)
**
**	Returns:
**		EVTHR_TERM|EVTHR_DEL to end the task, otherwise
**		EVTHR_WAITQ, possibly combined with changes of the
**		requested events (evthr_r_yes()/evthr_r_no()).
*/

sm_ret_T
fct1(sm_evthr_task_P tsk)
{
	t_ctx_P fctx;
	int fd, r, l;
	sm_ret_T ret, rv;

	SM_ASSERT(tsk != NULL);
	fctx = (t_ctx_P) tsk->evthr_t_actx;
	fd = fctx->fd;
	l = fctx->status--;	/* l: value before the decrement */
	fctx->called++;
	r = fctx->what;
	if (Verbose > 1)
	{
		/* %lx requires an unsigned long argument */
		fprintf(stderr,
			"fct1: called %lx, fd=%d, status=%d, what=%d, ev=%x\n",
			(unsigned long) tsk, fd, l, r, evthr_rqevents(tsk));
	}
	if (r > 1)
		sleep(r - 1);

	/* safety net: do not run forever */
	if (fctx->called > MAX_CALLS)
		return EVTHR_TERM|EVTHR_DEL;

	rv = 0;
	if (fd >= 0)
	{
		if (evthr_got_rd(tsk))
		{
			ret = rcvrcb(fctx->rcbr, fd, false);
			if (Verbose > 1)
				fprintf(stderr, "rcvrcb=%x\n", ret);
			if (ret < 0)
				return EVTHR_TERM|EVTHR_DEL;
			else if (ret > 0)
				return EVTHR_WAITQ;
			else
			{
				/* ret == 0: prepare rcbr for the next receive */
				ret = sm_rcb_open_rcv(fctx->rcbr);
				SM_TEST(sm_is_success(ret));
				rv |= evthr_r_yes(EVTHR_EV_WR);
			}
		}
		if (evthr_got_wr(tsk))
		{
			ret = sm_rcb_snd(fd, fctx->rcbw);
			if (Verbose > 1)
				fprintf(stderr, "sm_rcb_snd=%x\n", ret);
			SM_TEST(ret >= 0);
			if (ret < 0)
				return EVTHR_TERM|EVTHR_DEL;
			if (ret > 0)
				return EVTHR_WAITQ;

			/* if (ret == 0) */
			/* result is overwritten by inputrcb() and not checked */
			ret = sm_rcb_close_snd(fctx->rcbw);

			/* build the next RCB to send */
			ret = inputrcb(fctx->rcbw);
			if (Verbose > 1)
				fprintf(stderr, "inputrcb=%x\n", ret);
			if (ret == WAITFORREPLY)
			{
				/* read first, do not write for now */
				rv |= evthr_r_yes(EVTHR_EV_RD);
				rv |= evthr_r_no(EVTHR_EV_WR);
			}
			else if (ret == EOF)
			{
				/* terminate soon */
				fctx->status = 2;
				rv |= evthr_r_no(EVTHR_EV_WR);
			}
			else
				rv |= evthr_r_yes(EVTHR_EV_RD|EVTHR_EV_WR);
			ret = sm_rcb_open_snd(fctx->rcbw);
			SM_TEST(sm_is_success(ret));
			return EVTHR_WAITQ|rv;
		}
	}

	/* countdown expired */
	if (l <= 0)
		return EVTHR_TERM|EVTHR_DEL;
	switch (fctx->what)
	{
	  case WHAT_TERM:
		return EVTHR_TERM|EVTHR_DEL;
	  case WHAT_CONT:
	  default:
		return EVTHR_WAITQ|rv;
	}

	/* NOTREACHED */
	return EVTHR_TERM|EVTHR_DEL;
}

/*
**  TESTEV -- connect to a socket and run the event thread loop with fct1()
**
**	Parameters:
**		sockname -- name of socket to connect to (may be NULL:
**			then no connection is made and no task is created)
**		what -- stored in the task context, see fct1()
**		loops -- initial value of the countdown in the task context
**		reps -- if > 0: expected number of calls of fct1()
**
**	Returns:
**		none; results are recorded via SM_TEST()
*/

void
testev(char *sockname, int what, int loops, int reps)
{
	int fd;
	sm_ret_T ret;
	sm_evthr_ctx_P ctx;
	sm_evthr_task_P task;
	t_ctx_T tctx;
	struct timeval sleept;

	ret = thr_init();
	SM_TEST(sm_is_success(ret));
	sm_memzero(&sleept, sizeof(sleept));

	/* make sure no field of the context is left uninitialized */
	sm_memzero(&tctx, sizeof(tctx));
	ret = evthr_init(&ctx, 1, 6, 10);
	SM_TEST(sm_is_success(ret));
	SM_TEST(ctx != NULL);

	fd = -1;
	tctx.called = 0;
	if (sockname != NULL)
	{
		(void) unix_client_connect(sockname, &fd);
		SM_TEST(fd >= 0);
		if (fd >= 0)
		{
			tctx.ctx = ctx;
			tctx.fd = fd;
			tctx.what = what;
			tctx.status = loops;

			/*
			**  NOTE(review): the RCBs created here are not
			**  released anywhere in this function.
			*/

			tctx.rcbw = sm_rcb_new(NULL, RCBSIZE, RCBSIZE);
			SM_TEST(tctx.rcbw != NULL);
			if (tctx.rcbw == NULL)
				goto error;
			tctx.rcbr = sm_rcb_new(NULL, RCBSIZE, RCBSIZE);
			SM_TEST(tctx.rcbr != NULL);
			if (tctx.rcbr == NULL)
				goto error;
			ret = sm_rcb_open_rcv(tctx.rcbr);
			SM_TEST(sm_is_success(ret));

			ret = sm_fd_nonblock(fd, true);
			SM_TEST(sm_is_success(ret));

			/* build the first RCB; the task starts by writing it */
			ret = inputrcb(tctx.rcbw);
			SM_TEST(sm_is_success(ret));
			ret = sm_rcb_open_snd(tctx.rcbw);
			SM_TEST(sm_is_success(ret));

			ret = evthr_task_new(ctx, &task, EVTHR_EV_WR, fd,
					&sleept, fct1, (void *) &tctx);
			SM_TEST(sm_is_success(ret));
			SM_TEST(task != NULL);
		}
		else
		{
			/* name the function that was actually called */
			fprintf(stderr,
				"unix_client_connect()=%d, errno=%d\n",
				fd, errno);
		}
	}

	ret = evthr_loop(ctx);
	SM_TEST(sm_is_success(ret));
	if (!sm_is_success(ret))
		fprintf(stderr, "evthr_loop()=%x\n", ret);

	/*
	**  we should "hold" the system before deleting tasks?
	**  deleting the tasks while they are still in use
	**  will break things.
	*/

  error:
	if (fd >= 0)
		close(fd);
	SM_TEST(tctx.called > 0);
	if (reps > 0)
		SM_TEST(tctx.called == reps);
	if (Verbose > 0)
	{
		fprintf(stderr, "fcts=%d\n", tctx.called);
	}
	ret = evthr_stop(ctx);
	SM_TEST(sm_is_success(ret));
	if (!sm_is_success(ret))
		fprintf(stderr, "evthr_stop()=%x\n", ret);
	ret = thr_stop();
	SM_TEST(sm_is_success(ret));
}

/*
**  USAGE -- print usage message and exit
**
**	Parameters:
**		prg -- name of program
**
**	Returns:
**		does not return: exit(0)
*/

void
usage(const char *prg)
{
	fprintf(stderr, "usage: %s [options] socket\n", prg);
	exit(0);
}

/*
**  MAIN -- parse options and run the test
**
**	Options:
**		-l n	loops (default: 16)
**		-r n	expected number of calls of fct1() (default: -1)
**		-w n	"what" value for fct1() (default: WHAT_CONT)
**		-V	increase verbosity
**	The first non-option argument is the name of the socket.
**
**	Returns:
**		result of sm_test_end()
*/

int
main(int argc, char *argv[])
{
	int c, what, loops, reps;
	char *sockname, *prg;

	sockname = NULL;
	what = WHAT_CONT;
	loops = 16;
	reps = -1;
	prg = argv[0];
	while ((c = getopt(argc, argv, "l:r:w:V")) != -1)
	{
		switch (c)
		{
		  case 'l':
			loops = atoi(optarg);
			break;
		  case 'r':
			reps = atoi(optarg);
			break;
		  case 'w':
			what = atoi(optarg);
			break;
		  case 'V':
			Verbose++;
			break;
#if 0
		  default:
			usage(argv[0]);
			return(1);
#endif /* 0 */
		}
	}
	sm_test_begin(argc, argv, "test evthr");
	argc -= optind;
	argv += optind;

	/* the socket name is required */
	if (argc <= 0)
		usage(prg);
	sockname = argv[0];
	testev(sockname, what, loops, reps);
	return sm_test_end();
}