/* $CoreSDI: attr_mysql.c,v 1.10 2001/10/05 19:38:30 claudio Exp $ */ /* * Copyright (c) 2000, 2001, Core SDI S.A., Argentina * All rights reserved * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. Neither name of the Core SDI S.A. nor the names of its contributors * may be used to endorse or promote products derived from this software * without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
*/ /* * MySQL attribute module - Compatibility with msyslog-1.xx * Author: Claudio Castiglia */ #include #if defined(__FreeBSD__) && (__FreeBSD_version >= 500035) #include #endif #ifdef __linux__ #include #else #include #endif #include #include #include #include #include #include #include #include "sysdep.h" #include "resource.h" #include "packet.h" #include "modtypes.h" #include "iaargs.h" #include "log.h" #define ATTR_MYSQL "attr mysql: " #define DATE "date" #define TIME "time" #define HOST "host" #define MESSAGE "message" #define SEQ_PREFIX "MySQLSeq_" #define SEQ_FIELDNAME "seq" typedef struct _mysql_ctx { char host[MAXHOSTNAMELEN]; u_int16_t port; char user[LINE_MAX]; char pass[LINE_MAX]; char dbase[LINE_MAX]; char table[LINE_MAX]; char logset_name[LINE_MAX]; int delayed; int32_t seq; } MYSQL_CTX; #define GET_MYSQL_CTX(c) (MYSQL_CTX *) ((ATTRCON *) c + 1) /* * _load_seq() * Load the last sequence number from user resources. */ static void _load_seq(ATTRCON *context) { char rname[MAXPATHLEN]; RESOURCE *rseq; MYSQL_CTX *mc; mc = GET_MYSQL_CTX(context); /* Get sequence number resource */ if (context->s->s_rlist != NULL) { snprintf(rname, sizeof(rname), SEQ_PREFIX "%s", mc->logset_name); rseq = res_find(context->s->s_rlist, rname); /* Convert resource */ if (rseq != NULL) { if (rseq->dsize == sizeof(int32_t)) { mc->seq = ntohl(*(int32_t *) rseq->data); return; } else log_warn(ATTR_MYSQL "%s resource data size " "not equal to %d. Ignoring.", rname, sizeof(int32_t)); } } mc->seq = 0; } /* * _save_seq(): * Save the current sequence number to the resources; * return 0 on success and -1 on error and set errno. 
*/
static int
_save_seq(ATTRCON *context)
{
	char rname[MAXPATHLEN];
	RESOURCE *rseq;
	MYSQL_CTX *mc;
	int32_t net_seq;

	/* Without a resource list there is nothing to save */
	if (context->s->s_rlist == NULL)
		return (0);

	/* Create resource data (network byte order) */
	mc = GET_MYSQL_CTX(context);
	net_seq = htonl(mc->seq);

	/* If the resource does not exist create it, otherwise replace it */
	snprintf(rname, sizeof(rname), SEQ_PREFIX "%s", mc->logset_name);
	rseq = res_find(context->s->s_rlist, rname);
	if (rseq == NULL)
		return (res_add(context->s->s_rlist, rname, &net_seq,
		    sizeof(net_seq)));

	return (res_replace(context->s->s_rlist, rseq, &net_seq,
	    sizeof(net_seq)));
}

/*
 * _set_members():
 *	Set one context member from an option letter and its argument.
 *
 *	Recognized options:
 *	  's' host[:port]   server host and optional port
 *	  'u' user          user name
 *	  'p' pass          password
 *	  'd' dbase         database name
 *	  't' table         table name
 *	  'D'               enable the delayed flag (takes no argument)
 *
 *	A NULL data pointer is treated as an empty string.  For 's', the
 *	port defaults to MYSQL_PORT when it is missing, not a number, or
 *	does not fit in 16 bits (a warning is logged for invalid ports).
 *
 *	Return 1 when data was used as the option argument and 0
 *	otherwise ('D' and unknown options).
 */
int
_set_members(MYSQL_CTX *mc, int opt, const char *data)
{
	char *dst, *port, *end;
	size_t size;
	unsigned long val;

	if (data == NULL)
		data = "";

	switch (opt) {
	case 's':	/* host[:port] */
		/*
		 * The argument must be copied before it is split: the
		 * port is searched for inside mc->host.
		 */
		snprintf(mc->host, sizeof(mc->host), "%s", data);
		mc->port = MYSQL_PORT;

		port = strchr(mc->host, ':');
		if (port == NULL)
			return (1);
		*port++ = '\0';

		val = strtoul(port, &end, 0);
		if (end != port && *end == '\0' && val <= 0xffff)
			mc->port = (u_int16_t) val;
		else
			log_warn(ATTR_MYSQL "Invalid port '%s'. Using '%d'.",
			    port, MYSQL_PORT);
		return (1);

	case 'u':
		dst = mc->user;
		size = sizeof(mc->user);
		break;

	case 'p':
		dst = mc->pass;
		size = sizeof(mc->pass);
		break;

	case 'd':
		dst = mc->dbase;
		size = sizeof(mc->dbase);
		break;

	case 't':
		dst = mc->table;
		size = sizeof(mc->table);
		break;

	case 'D':
		/* Flag option: no argument is consumed */
		mc->delayed = 1;
		return (0);

	default:
		return (0);
	}

	snprintf(dst, size, "%s", data);
	return (1);
}

/*
 * _connect_to_server():
 *	Connect to the MySQL server and return a mysql pointer or NULL
 *	on error.
 *	After use mysql_close() should be called with the non-NULL pointer
 *	as its argument.
*/ static MYSQL * _connect_to_server(ATTRCON *context) { MYSQL *h; MYSQL_CTX *mc; mc = GET_MYSQL_CTX(context); h = mysql_init(NULL); if (h == NULL) { errno = ENOMEM; /* See the mysql documentation */ log_err(ATTR_MYSQL "Can't initialize mysql api: %s.", strerror(errno)); } else if (!mysql_real_connect(h, mc->host, mc->user, mc->pass, mc->dbase, mc->port, NULL, 0)) { log_err(ATTR_MYSQL "Connection to server failed: %s.", mysql_error(h)); mysql_close(h); return (NULL); } return (h); } /* * _mysql_stat() * Retrieve the stat (data size and last sequence number) from * specified logset on the mysql server. * Return 0 on success and -1 on error (you should use the mysql * api to retrieve the error code or string). */ static int _mysql_stat(MYSQL_CTX *mc, MYSQL *mysqlh, int32_t *size, int32_t *last_seq, int *have_seq) { char buf[LINE_MAX]; MYSQL_RES *res; MYSQL_ROW row; int i; /* Get field names */ snprintf(buf, sizeof(buf), "SHOW COLUMNS FROM %s", mc->table); if (mysql_query(mysqlh, buf) || (res = mysql_store_result(mysqlh)) == NULL) return (-1); /* Search for the sequence field */ *have_seq = 0; while ( (row = mysql_fetch_row(res)) != NULL) { if (!strcmp((char *) row[0], SEQ_FIELDNAME)) { *have_seq = 1; break; } } mysql_free_result(res); /* Query data size and last sequence number (if any) */ snprintf(buf, sizeof(buf), "SELECT SUM(LENGTH(" DATE ")+LENGTH(" TIME ")+LENGTH(" HOST ")+LENGTH(" MESSAGE "))"); if (*have_seq) strlcat(buf, ",MAX(" SEQ_FIELDNAME ")", sizeof(buf)); i = strlen(buf); snprintf(buf + i, sizeof(buf) - i, " FROM %s", mc->table); if (*have_seq) { i = strlen(buf); snprintf(buf + i, sizeof(buf) - i, " WHERE " SEQ_FIELDNAME " > %d", mc->seq); } if (mysql_query(mysqlh, buf) || (res = mysql_store_result(mysqlh)) == NULL) return (-1); /* Get data size and last sequence number */ *size = 0; *last_seq = -1; if (mysql_num_rows(res) >= 1) { row = mysql_fetch_row(res); if (row != NULL) { if (row[0] != NULL && row[1] != NULL) { *size = (int32_t) strtoul(row[0], 
(char **) NULL, 0); *last_seq = (int32_t) ((*have_seq) ? strtoul(row[1], (char **) NULL, 0) : -1); } mysql_free_result(res); return (0); } } mysql_free_result(res); return (-1); } /* * _get_logset_name(): * Return logset name. */ static void _get_logset_name(ATTRCON *context, struct attrargs_ret *args) { MYSQL_CTX *mc; mc = GET_MYSQL_CTX(context); args->data = mc->logset_name; args->size = strlen(mc->logset_name);; } /* * _info(): * Return mysql logset information coded as a string: * "mysql * host * dbase * table * current seq * [delayed inserts]" */ static void _info(ATTRCON *context, struct attrargs_ret *args) { MYSQL_CTX *mc; mc = GET_MYSQL_CTX(context); /* Load last sequence number */ _load_seq(context); snprintf(args->data, args->size, "mysql\n" "\thost\t%s\n\tdbase\t%s\n\ttable\t%s\n\tcurrent seq\t%d\n%s", mc->host, mc->dbase, mc->table, mc->seq, mc->delayed ? "\tdelayed inserts\n" : ""); } /* * _get(): * Send logset to the client; * return 0 on success and -1 on error (the error is logged, * errno may not be set). */ static int _get(ATTRCON *context) { char buf[LINE_MAX]; MYSQL *mysqlh; MYSQL_CTX *mc; MYSQL_RES *res; MYSQL_ROW row; int32_t rows_no, size, last_seq, have_seq, i, j, status; mc = GET_MYSQL_CTX(context); log_debug(ATTR_MYSQL "Getting %s.", mc->logset_name); /* Initialize mysql */ mysqlh = _connect_to_server(context); if (mysqlh == NULL) { packet_put_string(context->s->s_packet, ""); return (-1); } /* Load sequence number from resources */ _load_seq(context); /* Load data size and last sequence number from the mysql server */ if (_mysql_stat(mc, mysqlh, &size, &last_seq, &have_seq) < 0) { log_err(ATTR_MYSQL "Can't stat '%s': %s.", mc->logset_name, mysql_error(mysqlh)); packet_put_string(context->s->s_packet, ""); mysql_close(mysqlh); return (-1); } /* Send logset name */ packet_put_string(context->s->s_packet, mc->logset_name); /* If there are no data to select, send nothing. 
*/ if (size == 0) { packet_put_int32(context->s->s_packet, 0); mysql_close(mysqlh); return (0); } /* Query logs */ snprintf(buf, sizeof(buf), "SELECT " DATE "," TIME "," HOST "," MESSAGE " FROM %s", mc->table); if (last_seq >= 0) { i = strlen(buf); snprintf(buf + i, sizeof(buf) - i, " WHERE " SEQ_FIELDNAME " > %d", mc->seq); } if (mysql_query(mysqlh, buf) || (res = mysql_store_result(mysqlh)) == NULL) { log_err(ATTR_MYSQL "Query on '%s' failed: %s.", mc->logset_name, mysql_error(mysqlh)); if (res != NULL) mysql_free_result(res); mysql_close(mysqlh); packet_put_int32(context->s->s_packet, 0); return (-1); } /* Calculate data size to be sent to the other side */ status = 0; rows_no = mysql_num_rows(res); size += rows_no * 4; packet_put_int32(context->s->s_packet, size); while (rows_no--) { if ( (row = mysql_fetch_row(res)) != NULL) { buf[0] = '\0'; for (j = 0; j < 4 /* date, time, host, msg */; j++) { i = strlen(buf); snprintf(buf + i, sizeof(buf) - i, j < 3 ? "%s " : "%s\n", (char *) row[j]); } i = strlen(buf); size -= i; packet_put_raw(context->s->s_packet, buf, i); } else { log_err(ATTR_MYSQL "Can't fetch rows from '%s': %s. " "Aborting.", mc->logset_name, mysql_error(mysqlh)); /* Finish command. 
-i don't like this behaviour- */ for (i = 0; i < size; i++) packet_put_raw(context->s->s_packet, buf, 1); status = -1; } } mysql_free_result(res); mysql_close(mysqlh); /* Save last sequence number */ if (last_seq >= 0) mc->seq = last_seq; if (_save_seq(context) < 0) { log_warn(ATTR_MYSQL "Can't save current '%s' sequence number: " "%s.", mc->logset_name, strerror(errno)); return (-1); } return (status); } /* * _zap() */ static int _zap(ATTRCON *context, int last_call) { char buf[LINE_MAX]; MYSQL *mysqlh; MYSQL_CTX *mc; int32_t size, last_seq, have_seq; int status; mc = GET_MYSQL_CTX(context); log_debug(ATTR_MYSQL "Zapping %s, pass %d.", mc->logset_name, last_call); if (last_call) return (0); mysqlh = _connect_to_server(context); if (mysqlh == NULL) return (-1); /* Load sequence number from resources */ _load_seq(context); /* Load data size and last sequence number from the mysql server */ if (_mysql_stat(mc, mysqlh, &size, &last_seq, &have_seq) < 0) { log_err(ATTR_MYSQL "Can't stat '%s': %s.", mc->logset_name, mysql_error(mysqlh)); mysql_close(mysqlh); return (-1); } status = -1; /* Abort if table does not have sequence number */ if (!have_seq) log_err(ATTR_MYSQL "Missing sequence number on '%s'. " "Aborting.", mc->logset_name); /* Abort if last sequence on the table is less than current sequence */ else if (last_seq > 0 && last_seq < mc->seq) log_err(ATTR_MYSQL "Invalid sequence number on '%s'. Aborting.", mc->logset_name); /* Delete readed entries */ else { snprintf(buf, sizeof(buf), "DELETE FROM %s WHERE " SEQ_FIELDNAME " <= %d", mc->table, mc->seq); if (!mysql_query(mysqlh, buf)) { if (mc->seq == last_seq || last_seq <= 0) { /* * MySQL starts from sequence 1 * when all table is deleted: * the current sequence number * should be updated. */ mc->seq = 0; if (_save_seq(context) < 0) log_warn(ATTR_MYSQL "Can't save " "current sequence number: %s. 
" "Ignoring.", strerror(errno)); } status = 0; } else log_err(ATTR_MYSQL "Delete on '%s' failed: %s.", mc->logset_name, mysql_error(mysqlh)); } mysql_close(mysqlh); return (status); } /* * _sign() */ static int _sign(ATTRCON *context) { errno = EOPNOTSUPP; /* XXX */ log_err(ATTR_MYSQL "SIGN: %s.", strerror(errno)); return (-1); } /* * init(): * Initialize mysql module. */ ATTRCON * init(struct attrargs_init *args) { ATTRCON *context; MYSQL_CTX *mc; int i; log_debug(ATTR_MYSQL "Initializing."); if (args->argc < 1) { errno = EINVAL; log_err(ATTR_MYSQL "Can't initialize: %s.", strerror(errno)); return (NULL); } /* Create context and initiate its members */ context = (ATTRCON *) calloc(1, sizeof(ATTRCON) + sizeof(MYSQL_CTX)); if (context == NULL) { log_err(ATTR_MYSQL "Can't create context: %s.", strerror(errno)); return (NULL); } mc = GET_MYSQL_CTX(context); for (i = 1; i < args->argc; i++) i += _set_members(mc, *(args->argv[i] + 1), args->argv[i + 1]); snprintf(mc->logset_name, sizeof(mc->logset_name), "mysql.%s.%s.%s", mc->host, mc->dbase, mc->table); return (context); } /* * proc_entry() */ int proc_entry(int opcode, ATTRCON *context, void *args) { switch(opcode) { case ATTR_GET_LOGSET_NAME: _get_logset_name(context, args); case ATTR_FREEZE: return (0); case ATTR_INFO: _info(context, args); return (0); case ATTR_GET: return (_get(context)); case ATTR_ZAP: return (_zap(context, (int) args)); case ATTR_ROTATE: return (0); case ATTR_SIGN: return (_sign(context)); default: errno = EINVAL; } log_err(ATTR_MYSQL "Invalid '%d' command.", opcode); return (-1); }