/* $CoreSDI: attr_pgsql.c,v 1.5 2001/10/05 19:38:30 claudio Exp $ */ /* * Copyright (c) 2000, 2001, Core SDI S.A., Argentina * All rights reserved * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. Neither name of the Core SDI S.A. nor the names of its contributors * may be used to endorse or promote products derived from this software * without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ /* * PGSQL attribute module - Compatibility with msyslog-1.xx * Author: Claudio Castiglia */ #include #if defined(__FreeBSD__) && (__FreeBSD_version >= 500035) #include #endif #ifdef __linux__ #include #else #include #endif #include #include #include #include #include #include #include #include "sysdep.h" #include "resource.h" #include "packet.h" #include "modtypes.h" #include "iaargs.h" #include "log.h" #ifndef DEF_PGPORT #define DEF_PGPORT "5432" /* From pgsql config.h */ #endif #define ATTR_PGSQL "attr pgsql: " #define DATE "date" #define TIME "time" #define HOST "host" #define MESSAGE "message" #define DATE_LEN 10 /* YYYY-MM-DD */ #define TIME_LEN 8 /* HH:MM:SS */ #define SEQ_PREFIX "PGSQLSeq_" #define SEQ_FIELDNAME "seq" typedef struct _pgsql_ctx { char host[MAXHOSTNAMELEN]; char port[6]; char user[LINE_MAX]; char pass[LINE_MAX]; char dbase[LINE_MAX]; char table[LINE_MAX]; char logset_name[LINE_MAX]; int32_t seq; } PGSQL_CTX; #define GET_PGSQL_CTX(c) (PGSQL_CTX *) ((ATTRCON *) c + 1) /* * _load_seq() * Load the last sequence number from user resources. */ static void _load_seq(ATTRCON *context) { char rname[MAXPATHLEN]; RESOURCE *rseq; PGSQL_CTX *pc; pc = GET_PGSQL_CTX(context); /* Get sequence number resource */ if (context->s->s_rlist != NULL) { snprintf(rname, sizeof(rname), SEQ_PREFIX "%s", pc->logset_name); rseq = res_find(context->s->s_rlist, rname); /* Convert resource */ if (rseq != NULL) { if (rseq->dsize == sizeof(int32_t)) { pc->seq = ntohl(*(int32_t *) rseq->data); return; } else log_warn(ATTR_PGSQL "%s resource data size " "not equal to %d. Ignoring.", rname, sizeof(int32_t)); } } pc->seq = 0; } /* * _save_seq(): * Save the current sequence number to the resources; * return 0 on success and -1 on error and set errno. */ static int _save_seq(ATTRCON *context) { char rname[MAXPATHLEN]; RESOURCE *rseq; PGSQL_CTX *pc; int32_t net_seq; int st; if (context->s->s_rlist != NULL) { /* Create resource data */ pc = GET_PGSQL_CTX(context); net_seq = htonl(pc->seq); /* If resource does not exists, create it */ snprintf(rname, sizeof(rname), SEQ_PREFIX "%s", pc->logset_name); rseq = res_find(context->s->s_rlist, rname); if (rseq == NULL) st = res_add(context->s->s_rlist, rname, &net_seq, sizeof(net_seq)); else st = res_replace(context->s->s_rlist, rseq, &net_seq, sizeof(net_seq)); } else st = 0; return (st); } /* * _set_members(): * Set context members data. */ int _set_members(PGSQL_CTX *pc, int opt, const char *data) { char *p; size_t size; switch (opt) { case 's': /* host[:port] */ size = sizeof(pc->host); p = pc->host; strsep(&p, ":"); if (p != NULL) { char *e, *i; int port; i = p; port = (int) strtoul(p, &e, 0); p = pc->host; if (*e == '\0') { snprintf(pc->port, sizeof(pc->port), "%d", port); break; } log_warn(ATTR_PGSQL "Invalid port '%s'. Using '%s'.", i, DEF_PGPORT); } strlcpy(pc->port, DEF_PGPORT, sizeof(pc->port)); p = pc->host; break; case 'u': p = pc->user; size = sizeof(pc->user); break; case 'p': p = pc->pass; size = sizeof(pc->pass); break; case 'd': p = pc->dbase; size = sizeof(pc->dbase); break; case 't': p = pc->table; size = sizeof(pc->table); break; case 'c': /* Not used. Leaved for compatibility */ default: return (0); } snprintf(p, size, "%s", data); return (1); } /* * _connect_to_server(): * Connect to the PGSQL server and return a PGconn pointer or NULL * on error. * After use PQfinish() should be called with the non-NULL pointer * as its argument. */ static PGconn * _connect_to_server(ATTRCON *context) { PGconn *h; PGSQL_CTX *pc; pc = GET_PGSQL_CTX(context); h = PQsetdbLogin(pc->host, pc->port, NULL, NULL, pc->dbase, pc->user, pc->pass); if (h == NULL) { errno = ENOMEM; /* See the pqlib documentation */ log_err(ATTR_PGSQL "Can't initialize pq api: %s.", strerror(errno)); } else if (PQstatus(h) == CONNECTION_BAD) { log_err(ATTR_PGSQL "Connection to server failed: %s.", PQerrorMessage(h)); PQfinish(h); return (NULL); } return (h); } /* * _pgsql_stat() * Retrieve the stat (data size and last sequence number) from * specified logset on the pgsql server. * Return 0 on success and -1 on error (you should use the pq api * to retrieve the error code or string). */ static int _pgsql_stat(PGSQL_CTX *pc, PGconn *pgsqlh, int32_t *size, int32_t *last_seq, int *have_seq) { char buf[LINE_MAX]; PGresult *res; size_t i; /* * If 'table_SEQ_FIELDNAME_seq' exists * then 'table' has a sequence field. */ snprintf(buf, sizeof(buf), "SELECT relname FROM pg_class WHERE " "relname LIKE '%s%%'", pc->table); if ( (res = PQexec(pgsqlh, buf)) == NULL) return (-1); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); return (-1); } snprintf(buf, sizeof(buf), "%s_" SEQ_FIELDNAME "_seq", pc->table); for (i = 0; i < PQntuples(res); i++) if (!strcmp(PQgetvalue(res, i, 0), buf)) break; *have_seq = (i == PQntuples(res)) ? 0 : 1; PQclear(res); /* Get last sequence number */ if (*have_seq) { snprintf(buf, sizeof(buf), "SELECT last_value FROM %s_" SEQ_FIELDNAME "_seq", pc->table); if ( (res = PQexec(pgsqlh, buf)) == NULL) return (-1); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); return (-1); } *last_seq = (int32_t) strtoul(PQgetvalue(res, 0, 0), (char **) NULL, 0); PQclear(res); } else *last_seq = 0; /* Query data size */ snprintf(buf, sizeof(buf), "SELECT COUNT(*),SUM(LENGTH(" HOST ")+LENGTH(" MESSAGE ")) FROM %s", pc->table); if (*have_seq) { i = strlen(buf); snprintf(buf + i, sizeof(buf) - i, " WHERE " SEQ_FIELDNAME " > %d", pc->seq); } if ( (res = PQexec(pgsqlh, buf)) == NULL) return (-1); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); return (-1); } /* Get data size */ *size = 0; if (PQntuples(res) >= 1) { *size = (DATE_LEN + TIME_LEN) * strtoul(PQgetvalue(res, 0, 0), (char **) NULL, 0) + strtoul(PQgetvalue(res, 0, 1), (char **) NULL, 0); PQclear(res); return (0); } PQclear(res); return (-1); } /* * _get_logset_name(): * Return logset name. */ static void _get_logset_name(ATTRCON *context, struct attrargs_ret *args) { PGSQL_CTX *pc; pc = GET_PGSQL_CTX(context); args->data = pc->logset_name; args->size = strlen(pc->logset_name);; } /* * _info(): * Return pgsql logset information coded as a string: * "pgsql * host * dbase * table * current seq " */ static void _info(ATTRCON *context, struct attrargs_ret *args) { PGSQL_CTX *pc; pc = GET_PGSQL_CTX(context); /* Load last sequence number */ _load_seq(context); snprintf(args->data, args->size, "pgsql\n" "\thost\t%s\n\tdbase\t%s\n\ttable\t%s\n\tcurrent seq\t%d\n", pc->host, pc->dbase, pc->table, pc->seq); } /* * _get(): * Send logset to the client; * return 0 on success and -1 on error (the error is logged, * errno may not be set). */ static int _get(ATTRCON *context) { char buf[LINE_MAX]; PGSQL_CTX *pc; PGconn *pgsqlh; PGresult *res; int32_t size, last_seq, have_seq, len, i; pc = GET_PGSQL_CTX(context); log_debug(ATTR_PGSQL "Getting %s.", pc->logset_name); /* Initialize pgsql */ pgsqlh = _connect_to_server(context); if (pgsqlh == NULL) { packet_put_string(context->s->s_packet, ""); return (-1); } /* Load sequence number from resources */ _load_seq(context); /* Load data size and last sequence number from the mysql server */ if (_pgsql_stat(pc, pgsqlh, &size, &last_seq, &have_seq) < 0) { log_err(ATTR_PGSQL "Can't stat '%s': %s.", pc->logset_name, PQerrorMessage(pgsqlh)); packet_put_string(context->s->s_packet, ""); PQfinish(pgsqlh); return (-1); } /* Send logset name */ packet_put_string(context->s->s_packet, pc->logset_name); /* If there are no data to select, send nothing. */ if (size == 0) { packet_put_int32(context->s->s_packet, 0); return (0); } /* Query logs */ snprintf(buf, sizeof(buf), "SELECT " DATE "," TIME "," HOST "," MESSAGE " FROM %s", pc->table); if (have_seq) { i = strlen(buf); snprintf(buf + i, sizeof(buf) - i, " WHERE " SEQ_FIELDNAME " > %d", pc->seq); } if ( (res = PQexec(pgsqlh, buf)) == NULL) { log_err(ATTR_PGSQL "Query on '%s' failed: %s.", pc->logset_name, PQerrorMessage(pgsqlh)); PQfinish(pgsqlh); packet_put_int32(context->s->s_packet, 0); return (-1); } /* Calculate data size to be sent to the other side */ size += PQntuples(res) * 4; packet_put_int32(context->s->s_packet, size); /* Send data */ for (i = 0; i < PQntuples(res); i++) { len = snprintf(buf, sizeof(buf), "%s %s %s %s\n", PQgetvalue(res, i, 0), PQgetvalue(res, i, 1), PQgetvalue(res, i, 2), PQgetvalue(res, i, 3)); packet_put_raw(context->s->s_packet, buf, len); } PQclear(res); PQfinish(pgsqlh); /* Save last sequence number */ if (last_seq >= 0) pc->seq = last_seq; if (_save_seq(context) < 0) { log_warn(ATTR_PGSQL "Can't save current '%s' sequence number: " "%s.", pc->logset_name, strerror(errno)); return (-1); } return (0); } /* * _zap() */ static int _zap(ATTRCON *context, int last_call) { char buf[LINE_MAX]; PGconn *pgsqlh; PGresult *res; PGSQL_CTX *pc; int32_t size, last_seq, have_seq; int status; pc = GET_PGSQL_CTX(context); log_debug(ATTR_PGSQL "Zapping %s, pass %d.", pc->logset_name, last_call); if (last_call) return (0); pgsqlh = _connect_to_server(context); if (pgsqlh == NULL) return (-1); /* Load sequence number from resources */ _load_seq(context); /* Load data size and last sequence number from the mysql server */ if (_pgsql_stat(pc, pgsqlh, &size, &last_seq, &have_seq) < 0) { log_err(ATTR_PGSQL "Can't stat '%s': %s.", pc->logset_name, PQerrorMessage(pgsqlh)); PQfinish(pgsqlh); return (-1); } status = -1; /* Abort if table does not have sequence number */ if (!have_seq) log_err(ATTR_PGSQL "Missing sequence number on '%s'. " "Aborting.", pc->logset_name); /* Abort if last sequence on the table is less than current sequence */ else if (last_seq > 0 && last_seq < pc->seq) log_err(ATTR_PGSQL "Invalid sequence number on '%s'. Aborting.", pc->logset_name); /* Delete readed entries */ else { snprintf(buf, sizeof(buf), "DELETE FROM %s WHERE " SEQ_FIELDNAME " <= %d", pc->table, pc->seq); if ( (res = PQexec(pgsqlh, buf)) != NULL) { /* * XXX: PGSQL stores the last sequence number on * 'table_seq_seq' in the 'last_sequence' field; * PGSQL does not starts from sequence 1 when all * table is deleted. * What happens when seq number gets the 'max_value' ? */ if (_save_seq(context) < 0) log_warn(ATTR_PGSQL "Can't save current sequence number: %s. " "Ignoring.", strerror(errno)); status = 0; } else log_err(ATTR_PGSQL "Zap on '%s' failed: %s.", pc->logset_name, PQerrorMessage(pgsqlh)); } PQfinish(pgsqlh); return (status); } /* * _sign() */ static int _sign(ATTRCON *context) { errno = EOPNOTSUPP; /* XXX */ log_err(ATTR_PGSQL "SIGN: %s.", strerror(errno)); return (-1); } /* * init(): * Initialize mysql module. */ ATTRCON * init(struct attrargs_init *args) { ATTRCON *context; PGSQL_CTX *pc; int i; log_debug(ATTR_PGSQL "Initializing."); if (args->argc < 1) { errno = EINVAL; log_err(ATTR_PGSQL "Can't initialize: %s.", strerror(errno)); return (NULL); } /* Create context and initiate its members */ context = (ATTRCON *) calloc(1, sizeof(ATTRCON) + sizeof(PGSQL_CTX)); if (context == NULL) { log_err(ATTR_PGSQL "Can't create context: %s.", strerror(errno)); return (NULL); } pc = GET_PGSQL_CTX(context); for (i = 1; i < args->argc; i++) i += _set_members(pc, *(args->argv[i] + 1), args->argv[i + 1]); snprintf(pc->logset_name, sizeof(pc->logset_name), "pgsql.%s.%s.%s", pc->host, pc->dbase, pc->table); return (context); } /* * proc_entry() */ int proc_entry(int opcode, ATTRCON *context, void *args) { switch(opcode) { case ATTR_GET_LOGSET_NAME: _get_logset_name(context, args); case ATTR_FREEZE: return (0); case ATTR_INFO: _info(context, args); return (0); case ATTR_GET: return (_get(context)); case ATTR_ZAP: return (_zap(context, (int) args)); case ATTR_ROTATE: return (0); case ATTR_SIGN: return (_sign(context)); default: errno = EINVAL; } log_err(ATTR_PGSQL "Invalid '%d' command.", opcode); return (-1); }