/* ====================================================================
* The Kannel Software License, Version 1.0
*
* Copyright (c) 2001-2005 Kannel Group
* Copyright (c) 1998-2001 WapIT Ltd.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Kannel Group (http://www.kannel.org/)."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Kannel" and "Kannel Group" must not be used to
* endorse or promote products derived from this software without
* prior written permission. For written permission, please
* contact org@kannel.org.
*
* 5. Products derived from this software may not be called "Kannel",
* nor may "Kannel" appear in their name, without prior written
* permission of the Kannel Group.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
* OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
* OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
* OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
* EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Kannel Group. For more information on
* the Kannel Group, please see .
*
* Portions of this software are based upon software originally written at
* WapIT Ltd., Helsinki, Finland for the Kannel project.
*/
#include
#include "gwlib/gwlib.h"
#include "gw/smsc/smpp_pdu.h"
#include
/***********************************************************************
* Configurable stuff.
*/
/*
* The port at which our HTTP server emulator listens.
*/
static long http_port = 8080;
/*
* The HTTP admin port and password for Kannel, needed to do shutdown.
*/
static long admin_port = 13000;
static char *admin_password = "bar";
/*
* The port at which the SMPP SMS center emulator listens.
*/
static long smpp_port = 2345;
/*
* Number of messages to use in the "Send N messages as fast as possible"
* benchmark.
*/
static long num_messages = 1;
/***********************************************************************
* Events and event queues.
*/
typedef List EventQueue;
typedef struct Event {
enum event_type {
got_smsc,
deliver,
deliver_ack,
http_request,
http_response,
submit,
got_enquire_link
} type;
long id;
long time;
Connection *conn; /* SMPP: Connection for response PDU */
long sequence_number; /* SMPP: Sequence number of resp PDU */
/* HTTP related stuff */
HTTPClient *client;
Octstr *body;
} Event;
static Counter *event_id_counter = NULL;
static const char *eq_type(Event *e)
{
#define TYPE(name) case name: return #name;
switch (e->type) {
TYPE(got_smsc)
TYPE(deliver)
TYPE(deliver_ack)
TYPE(http_request)
TYPE(http_response)
TYPE(submit)
TYPE(got_enquire_link)
}
#undef TYPE
return "unknown";
}
static Event *eq_create_event(enum event_type type)
{
Event *e;
e = gw_malloc(sizeof(*e));
e->type = type;
e->time = date_universal_now();
e->id = counter_increase(event_id_counter);
e->conn = NULL;
e->sequence_number = -1;
e->client = NULL;
e->body = NULL;
return e;
}
static Event *eq_create_submit(Connection *conn, long sequence_number,
Octstr *body)
{
Event *e;
gw_assert(conn != NULL);
gw_assert(sequence_number >= 0);
e = eq_create_event(submit);
e->conn = conn;
e->sequence_number = sequence_number;
e->body = octstr_duplicate(body);
return e;
}
static Event *eq_create_http_request(HTTPClient *client, Octstr *body)
{
Event *e;
gw_assert(client != NULL);
gw_assert(body != NULL);
e = eq_create_event(http_request);
e->client = client;
e->body = octstr_duplicate(body);
return e;
}
static void eq_destroy_event(Event *e)
{
octstr_destroy(e->body);
gw_free(e);
}
static EventQueue *eq_create(void)
{
return gwlist_create();
}
static void eq_add_producer(EventQueue *eq)
{
gwlist_add_producer(eq);
}
static void eq_remove_producer(EventQueue *eq)
{
gwlist_remove_producer(eq);
}
static void eq_destroy(EventQueue *eq)
{
gwlist_destroy(eq, NULL);
}
static void eq_append(EventQueue *eq, Event *e)
{
gwlist_produce(eq, e);
}
static Event *eq_extract(EventQueue *eq)
{
return gwlist_consume(eq);
}
static void eq_log(Event *e)
{
info(0, "Event %ld, type %s, time %ld", e->id, eq_type(e), e->time);
}
static void eq_init(void)
{
event_id_counter = counter_create();
}
static void eq_shutdown(void)
{
counter_destroy(event_id_counter);
}
static long eq_round_trip_time(Event *e)
{
long now, then;
now = date_universal_now();
if (octstr_parse_long(&then, e->body, 0, 10) == -1)
return 0;
return now - then;
}
/***********************************************************************
* SMS center emulator, declarations.
*/
struct smsc_emu_arg {
Semaphore *sema;
EventQueue *eq;
};
static EventQueue *undelivered_messages = NULL;
/***********************************************************************
* SMS center emulator, SMPP internals.
*/
enum { MAX_THREADS = 2 };
enum { SMPP_MAX_QUEUE = 10 };
struct smpp_emu_arg {
EventQueue *eq;
Connection *conn;
long id;
Semaphore *ok_to_send;
long writer_id;
int quit;
};
static Counter *smpp_emu_counter = NULL;
static void smpp_emu_writer(void *arg)
{
Event *e;
SMPP_PDU *pdu;
Octstr *os;
struct smpp_emu_arg *p;
p = arg;
for (;;) {
semaphore_down(p->ok_to_send);
e = eq_extract(undelivered_messages);
if (e == NULL)
break;
e->time = date_universal_now();
eq_log(e);
pdu = smpp_pdu_create(deliver_sm,
counter_increase(smpp_emu_counter));
pdu->u.deliver_sm.source_addr = octstr_create("123");
pdu->u.deliver_sm.destination_addr = octstr_create("456");
pdu->u.deliver_sm.short_message = octstr_format("%ld", e->time);
os = smpp_pdu_pack(pdu);
conn_write(p->conn, os);
octstr_destroy(os);
smpp_pdu_destroy(pdu);
eq_destroy_event(e);
}
}
static void smpp_emu_handle_pdu(struct smpp_emu_arg *p, SMPP_PDU *pdu)
{
SMPP_PDU *resp;
Octstr *os;
resp = NULL;
switch (pdu->type) {
case bind_transmitter:
resp = smpp_pdu_create(bind_transmitter_resp,
pdu->u.bind_transmitter.sequence_number);
break;
case bind_receiver:
resp = smpp_pdu_create(bind_receiver_resp,
pdu->u.bind_receiver.sequence_number);
eq_append(p->eq, eq_create_event(got_smsc));
gw_assert(p->writer_id == -1);
p->writer_id = gwthread_create(smpp_emu_writer, p);
if (p->writer_id == -1)
panic(0, "Couldn't create SMPP helper thread.");
break;
case submit_sm:
eq_append(p->eq,
eq_create_submit(p->conn, pdu->u.submit_sm.sequence_number,
pdu->u.submit_sm.short_message));
break;
case deliver_sm_resp:
eq_append(p->eq, eq_create_event(deliver_ack));
semaphore_up(p->ok_to_send);
break;
case enquire_link:
eq_append(p->eq, eq_create_event(got_enquire_link));
resp = smpp_pdu_create(enquire_link_resp,
pdu->u.enquire_link.sequence_number);
break;
case unbind:
resp = smpp_pdu_create(unbind_resp,
pdu->u.unbind.sequence_number);
break;
default:
error(0, "SMPP: Unhandled PDU type %s", pdu->type_name);
break;
}
if (resp != NULL) {
os = smpp_pdu_pack(resp);
conn_write(p->conn, os);
octstr_destroy(os);
smpp_pdu_destroy(resp);
}
}
static void smpp_emu_reader(void *arg)
{
Octstr *os;
long len;
SMPP_PDU *pdu;
struct smpp_emu_arg *p;
p = arg;
len = 0;
while (!p->quit && conn_wait(p->conn, -1.0) != -1) {
for (;;) {
if (len == 0) {
len = smpp_pdu_read_len(p->conn);
if (len == -1) {
error(0, "Client sent garbage, closing connection.");
goto error;
} else if (len == 0) {
if (conn_eof(p->conn) || conn_error(p->conn))
goto error;
break;
}
}
gw_assert(len > 0);
os = smpp_pdu_read_data(p->conn, len);
if (os != NULL) {
len = 0;
pdu = smpp_pdu_unpack(os);
if (pdu == NULL) {
error(0, "PDU unpacking failed!");
octstr_dump(os, 0);
} else {
smpp_emu_handle_pdu(p, pdu);
smpp_pdu_destroy(pdu);
}
octstr_destroy(os);
} else if (conn_eof(p->conn) || conn_error(p->conn))
goto error;
else
break;
}
}
error:
if (p->writer_id != -1)
gwthread_join(p->writer_id);
}
static void smpp_emu(void *arg)
{
EventQueue *eq;
struct smsc_emu_arg *p;
int fd;
int new_fd;
Octstr *client_addr;
long i;
long num_threads;
struct smpp_emu_arg *thread[MAX_THREADS];
p = arg;
eq = p->eq;
eq_add_producer(eq);
semaphore_up(p->sema);
/*
* Wait for SMPP clients.
*/
fd = make_server_socket(smpp_port, NULL);
if (fd == -1)
panic(0, "Couldn't create SMPP listen port.");
num_threads = 0;
for (;;) {
new_fd = gw_accept(fd, &client_addr);
if (new_fd == -1)
break;
octstr_destroy(client_addr);
if (num_threads == MAX_THREADS) {
warning(0, "Too many SMPP client connections.");
(void) close(new_fd);
} else {
thread[num_threads] = gw_malloc(sizeof(*thread[0]));
thread[num_threads]->conn = conn_wrap_fd(new_fd, 0);
thread[num_threads]->eq = eq;
thread[num_threads]->quit = 0;
thread[num_threads]->writer_id = -1;
thread[num_threads]->ok_to_send =
semaphore_create(SMPP_MAX_QUEUE);
thread[num_threads]->id =
gwthread_create(smpp_emu_reader, thread[num_threads]);
if (thread[num_threads]->id == -1)
panic(0, "Couldn't start SMPP subthread.");
++num_threads;
}
}
for (i = 0; i < num_threads; ++i) {
thread[i]->quit = 1;
gwthread_wakeup(thread[i]->id);
gwthread_join(thread[i]->id);
conn_destroy(thread[i]->conn);
semaphore_destroy(thread[i]->ok_to_send);
gw_free(thread[i]);
}
eq_remove_producer(eq);
}
/***********************************************************************
* SMS center emulator, generic interface.
*/
static long smpp_emu_id = -1;
/*
* Start all SMS center emulators.
*/
static void smsc_emu_create(EventQueue *eq)
{
struct smsc_emu_arg *arg;
gw_assert(smpp_emu_id == -1);
arg = gw_malloc(sizeof(*arg));
arg->sema = semaphore_create(0);
arg->eq = eq;
smpp_emu_id = gwthread_create(smpp_emu, arg);
if (smpp_emu_id == -1)
panic(0, "Couldn't start SMPP emulator thread.");
semaphore_down(arg->sema);
semaphore_destroy(arg->sema);
gw_free(arg);
}
static void smsc_emu_destroy(void)
{
eq_remove_producer(undelivered_messages);
gw_assert(smpp_emu_id != -1);
gwthread_wakeup(smpp_emu_id);
gwthread_join(smpp_emu_id);
}
static void smsc_emu_deliver(void)
{
eq_append(undelivered_messages, eq_create_event(deliver));
}
static void smsc_emu_submit_ack(Event *e)
{
SMPP_PDU *resp;
Octstr *os;
resp = smpp_pdu_create(submit_sm_resp, e->sequence_number);
os = smpp_pdu_pack(resp);
conn_write(e->conn, os);
octstr_destroy(os);
smpp_pdu_destroy(resp);
}
static void smsc_emu_init(void)
{
smpp_emu_counter = counter_create();
undelivered_messages = eq_create();
eq_add_producer(undelivered_messages);
}
static void smsc_emu_shutdown(void)
{
counter_destroy(smpp_emu_counter);
eq_destroy(undelivered_messages);
}
/***********************************************************************
* HTTP server emulator.
*/
static List *httpd_emu_headers = NULL;
struct httpd_emu_arg {
int port;
Semaphore *sema;
EventQueue *eq;
};
/*
* This is the HTTP server emulator thread.
*/
static void httpd_emu(void *arg)
{
HTTPClient *client;
Octstr *ip;
Octstr *url;
List *headers;
Octstr *body;
List *cgivars;
struct httpd_emu_arg *p;
EventQueue *eq;
p = arg;
eq = p->eq;
eq_add_producer(eq);
semaphore_up(p->sema);
for (;;) {
client = http_accept_request(p->port, &ip, &url, &headers, &body,
&cgivars);
if (client == NULL)
break;
eq_append(eq, eq_create_http_request(client,
http_cgi_variable(cgivars, "arg")));
octstr_destroy(ip);
octstr_destroy(url);
http_destroy_headers(headers);
octstr_destroy(body);
http_destroy_cgiargs(cgivars);
}
eq_remove_producer(eq);
gw_free(p);
}
/*
* Thread id for HTTP server emulator thread. It is needed for proper
* shutdown.
*/
static long httpd_emu_tid = -1;
/*
* Start the HTTP server emulator thread and return when it is
* ready to accept clients.
*/
static void httpd_emu_create(EventQueue *eq)
{
struct httpd_emu_arg *arg;
int ssl = 0; /* indicate if SSL-enabled server should be used */
if (http_open_port(http_port, ssl) == -1)
panic(0, "Can't open HTTP server emulator port %ld.", http_port);
gw_assert(httpd_emu_tid == -1);
arg = gw_malloc(sizeof(*arg));
arg->port = http_port;
arg->sema = semaphore_create(0);
arg->eq = eq;
httpd_emu_tid = gwthread_create(httpd_emu, arg);
if (httpd_emu_tid == -1)
panic(0, "Can't start the HTTP server emulator thread.");
semaphore_down(arg->sema);
semaphore_destroy(arg->sema);
}
/*
* Terminate the HTTP server emulator thread. Return when the thread
* is quite dead.
*/
static void httpd_emu_destroy(void)
{
gw_assert(httpd_emu_tid != -1);
http_close_all_ports();
gwthread_join(httpd_emu_tid);
httpd_emu_tid = -1;
}
/*
* Send a reply to an HTTP response.
*/
static void httpd_emu_reply(Event *e)
{
http_send_reply(e->client, HTTP_OK, httpd_emu_headers, e->body);
}
static void httpd_emu_init(void)
{
httpd_emu_headers = http_create_empty_headers();
http_header_add(httpd_emu_headers, "Content-Type", "text/plain");
}
static void httpd_emu_shutdown(void)
{
http_destroy_headers(httpd_emu_headers);
}
/***********************************************************************
* Main program for N SMS messages benchmark.
*/
static void kill_kannel(void)
{
Octstr *url;
Octstr *final_url;
List *req_headers;
List *reply_headers;
Octstr *reply_body;
int ret;
url = octstr_format("http://localhost:%ld/shutdown?password=%s",
admin_port, admin_password);
req_headers = http_create_empty_headers();
http_header_add(req_headers, "Content-Type", "text/plain");
ret = http_get_real(HTTP_METHOD_GET, url, req_headers, &final_url,
&reply_headers, &reply_body);
if (ret != -1) {
octstr_destroy(final_url);
http_destroy_headers(reply_headers);
octstr_destroy(reply_body);
}
octstr_destroy(url);
http_destroy_headers(req_headers);
}
/*
* This will try to have as large a sustained level of traffic as possible.
*/
enum { MAX_IN_AVERAGE = 100 };
enum { MAX_RTT = 1 };
enum { MAX_WAITING = 100 };
static void sustained_level_benchmark(void)
{
EventQueue *eq;
Event *e;
long i;
long num_deliver;
long num_submit;
long rtt;
long times[MAX_IN_AVERAGE];
long next_time;
double time_sum;
long num_unanswered;
eq = eq_create();
httpd_emu_create(eq);
smsc_emu_create(eq);
/* Wait for an SMS center client to appear. */
while ((e = eq_extract(eq)) != NULL && e->type != got_smsc)
debug("test_smsc", 0, "Discarding event of type %s", eq_type(e));
debug("test_smsc", 0, "Got event got_smsc.");
eq_destroy_event(e);
/*
* Send message when there are at most MAX_WAITING unanswered messages
* and current average round trip time is less than MAX_RTT.
*/
num_submit = 0;
for (i = 0; i < MAX_IN_AVERAGE; ++i)
times[i] = 0;
next_time = 0;
time_sum = 0.0;
num_unanswered = 0;
num_deliver = 0;
while (num_submit < num_messages) {
for (;;) {
if (num_deliver >= num_messages || num_unanswered >= MAX_WAITING)
break;
if (time_sum / MAX_IN_AVERAGE >= MAX_RTT && num_unanswered > 0)
break;
smsc_emu_deliver();
++num_unanswered;
++num_deliver;
}
e = eq_extract(eq);
if (e == NULL)
break;
eq_log(e);
switch (e->type) {
case deliver_ack:
break;
case http_request:
httpd_emu_reply(e);
break;
case submit:
rtt = eq_round_trip_time(e);
time_sum -= times[next_time];
times[next_time] = rtt;
time_sum += times[next_time];
debug("", 0, "RTT = %ld", rtt);
next_time = (next_time + 1) % MAX_IN_AVERAGE;
++num_submit;
--num_unanswered;
smsc_emu_submit_ack(e);
break;
case got_enquire_link:
break;
default:
debug("test_smsc", 0, "Ignoring event of type %s", eq_type(e));
break;
}
eq_destroy_event(e);
}
kill_kannel();
debug("test_smsc", 0, "Terminating benchmark.");
smsc_emu_destroy();
httpd_emu_destroy();
eq_destroy(eq);
}
/*
* This will send `num_messages' SMS messages as quickly as possible.
*/
enum { MAX_IN_QUEUE = 1000 };
static void n_messages_benchmark(void)
{
EventQueue *eq;
Event *e;
long i;
long num_submit;
long num_in_queue;
long num_deliver;
eq = eq_create();
httpd_emu_create(eq);
smsc_emu_create(eq);
/* Wait for an SMS center client to appear. */
while ((e = eq_extract(eq)) != NULL && e->type != got_smsc)
debug("test_smsc", 0, "Discarding event of type %s", eq_type(e));
debug("test_smsc", 0, "Got event got_smsc.");
eq_destroy_event(e);
/* Send the SMS messages, or at least fill the send queue. */
for (i = 0; i < num_messages && i < MAX_IN_QUEUE; ++i)
smsc_emu_deliver();
num_in_queue = i;
num_deliver = i;
/*
* Wait for results to be processed. When send queue is not full,
* fill it.
*/
num_submit = 0;
while (num_submit < num_messages && (e = eq_extract(eq)) != NULL) {
while (num_deliver < num_messages && num_in_queue < MAX_IN_QUEUE) {
smsc_emu_deliver();
++num_in_queue;
++num_deliver;
}
eq_log(e);
switch (e->type) {
case deliver_ack:
break;
case http_request:
httpd_emu_reply(e);
break;
case submit:
debug("", 0, "RTT = %ld", eq_round_trip_time(e));
smsc_emu_submit_ack(e);
++num_submit;
--num_in_queue;
break;
case got_enquire_link:
break;
default:
debug("test_smsc", 0, "Ignoring event of type %s", eq_type(e));
break;
}
eq_destroy_event(e);
}
kill_kannel();
debug("test_smsc", 0, "Terminating benchmark.");
smsc_emu_destroy();
httpd_emu_destroy();
eq_destroy(eq);
}
/***********************************************************************
* Main program.
*/
int main(int argc, char **argv)
{
int opt;
char *main_name;
int i;
static struct {
char *name;
void (*func)(void);
} tab[] = {
{ "n_messages", n_messages_benchmark },
{ "sustained_level", sustained_level_benchmark },
};
gwlib_init();
eq_init();
httpd_emu_init();
smsc_emu_init();
main_name = "n_messages_benchmark";
while ((opt = getopt(argc, argv, "m:r:")) != EOF) {
switch (opt) {
case 'm':
main_name = optarg;
break;
case 'r':
num_messages = atoi(optarg);
break;
}
}
for (i = 0; (size_t) i < sizeof(tab) / sizeof(tab[0]); ++i) {
if (strcmp(main_name, tab[i].name) == 0) {
tab[i].func();
break;
}
}
smsc_emu_shutdown();
httpd_emu_shutdown();
eq_shutdown();
gwlib_shutdown();
return 0;
}