/*
    Copyright (c) 1998--2006 Benhur Stein
    
    This file is part of Pajé.

    Pajé is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by the
    Free Software Foundation; either version 2 of the License, or (at your
    option) any later version.

    Pajé is distributed in the hope that it will be useful, but WITHOUT ANY
    WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
    FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
    for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with Pajé; if not, write to the Free Software Foundation, Inc.,
	51 Franklin Street, Fifth Floor, Boston, MA 02111 USA.
*/



#include <sys/time.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/param.h>  /* for MAXHOSTNAMELEN */

#include "rastro_public.h"
#include "rastro_private.h"


/************************FUNCOES INTERNAS DA BIBLIOTECA*****************************/

// Le um evento do buffer
static char *trd_event(timestamp_t *hora_global, rst_event_t *evento, char *ptr)
{
    u_int32_t header;
    char field_types[100];
    int field_ctr = 0;
    int i;
    int fields_in_header;
  
    header = RST_GET(ptr, u_int32_t);

    evento->type = header >> 18;

    evento->ct.n_uint64 = evento->ct.n_uint16 = evento->ct.n_double = evento->ct.n_uint32 = 0;
    evento->ct.n_float = evento->ct.n_string = evento->ct.n_uint8 = 0;

    if (header & RST_TIME_SET) {
        long long x;
        x = (long long) RST_GET(ptr, long);
        *hora_global = x * RST_CLOCK_RESOLUTION;
    }
    evento->timestamp = (long long) RST_GET(ptr, long);

    evento->timestamp += *hora_global;

    fields_in_header = RST_FIELDS_IN_FIRST;
    for (;;) {
        for (i = 0; i < fields_in_header; i++) {
            char type;
            int bits_to_shift = RST_BITS_PER_FIELD * (fields_in_header - i - 1);
            type = (header>>bits_to_shift) & RST_FIELD_MASK;
            if (type == RST_NO_TYPE) {
                break;
            }
            field_types[field_ctr++] = type;
        }
        if (!((header >> (fields_in_header * RST_BITS_PER_FIELD)) & RST_LAST)) {
            // there are more headers -- process next one
            header = RST_GET(ptr, u_int32_t);
            fields_in_header = RST_FIELDS_IN_OTHERS;
        } else {
            // it was last header -- stop
            break;
        }
    }
    for (i = 0; i < field_ctr; i++) {
        switch (field_types[i]) {
        case RST_STRING_TYPE:
            RST_GET_STR(ptr, evento->v_string[evento->ct.n_string++]);
            break;
        case RST_DOUBLE_TYPE:
            evento->v_double[evento->ct.n_double++] = RST_GET(ptr, double);
            break;
        case RST_LONG_TYPE:
            evento->v_uint64[evento->ct.n_uint64++] = RST_GET(ptr, u_int64_t);
            break;
        case RST_FLOAT_TYPE:
            evento->v_float[evento->ct.n_float++] = RST_GET(ptr, float);
            break;
        case RST_INT_TYPE:
            evento->v_uint32[evento->ct.n_uint32++] = RST_GET(ptr, u_int32_t);
            break;
        case RST_SHORT_TYPE:
            evento->v_uint16[evento->ct.n_uint16++] = RST_GET(ptr, u_int16_t);
            break;
        case RST_CHAR_TYPE:
            evento->v_uint8[evento->ct.n_uint8++] = RST_GET(ptr, u_int8_t);
            break;
        }
    }

    ptr = ALIGN_PTR(ptr);

    return ptr;
}

// Corrige tempo t a partir da estrutura de correcao
static timestamp_t rst_correct_time(timestamp_t remote, ct_t *ct)
{
    timestamp_t local;
    
    local = (timestamp_t)(ct->a * (double)(remote - ct->loc0)) + ct->ref0;

    return local;
}

static void find_timesync_data(char *filename, rst_one_file_t *of_data)
{
    FILE *ct_file;
    char refhost[MAXHOSTNAMELEN+1];
    char host[MAXHOSTNAMELEN+1];
    timestamp_t time;
    timestamp_t reftime;
    int first = 1;

    of_data->sync_time.a = 1;
    of_data->sync_time.loc0 = 0;
    of_data->sync_time.ref0 = 0;

    if (filename == NULL) {
        return;
    }
    ct_file = fopen(filename, "r");
    if ( ct_file == NULL ) {
        return;
    }
    
    while ( !feof(ct_file) ) {
        fscanf(ct_file, "%s %lld %s %lld",
                        refhost, &reftime, host, &time);
        if (strcmp(refhost, of_data->hostname) == 0) {
            // this is the reference host
            of_data->sync_time.ref0 = reftime;
            of_data->sync_time.loc0 = reftime;
            break;
        }
        if (strcmp(host, of_data->hostname) == 0) {
            if (first) {
                of_data->sync_time.ref0 = reftime;
                of_data->sync_time.loc0 = time;
                first = 0;
            } else {
                of_data->sync_time.a = (double)(reftime - of_data->sync_time.ref0)
                                    / (double)(time - of_data->sync_time.loc0);
            }
        }
    }
    fclose(ct_file);
}

void rst_fill_buffer(rst_one_file_t *of_data)
{
    int bytes_processed;
    int bytes_remaining;
    int bytes_free;
    int bytes_read;
    
    bytes_processed = of_data->rst_buffer_ptr - of_data->rst_buffer;
    bytes_remaining = of_data->rst_buffer_used - bytes_processed;
    if (bytes_remaining >= RST_MAX_EVENT_SIZE) {
        return;
    }
    
    bytes_free = of_data->rst_buffer_size - of_data->rst_buffer_used;
    if (bytes_free < RST_MAX_EVENT_SIZE) {
        memmove(of_data->rst_buffer, of_data->rst_buffer_ptr, bytes_remaining);
        of_data->rst_buffer_used = bytes_remaining;
        of_data->rst_buffer_ptr = of_data->rst_buffer;
        bytes_free += bytes_processed;
    }
    
    bytes_read = read(of_data->fd, of_data->rst_buffer + of_data->rst_buffer_used, bytes_free);
    if (bytes_read > 0) {
        of_data->rst_buffer_used += bytes_read;
    }
}


/*FUNCOES RELATIVAS A LEITURA DE UM UNICO ARQUIVO DE RASTRO*/

//Abre um arquivo de rastro
int rst_open_one_file(char *f_name, rst_one_file_t *of_data, char *syncfilename, int buffer_size)
{
    of_data->fd = open(f_name, O_RDONLY);
    if (of_data->fd == -1) {
        fprintf(stderr, "[rastro] cannot open file %s: %s\n", 
                        f_name, strerror(errno));
        return RST_NOK;
    }
    of_data->sync_time.a = 1.0;
    of_data->sync_time.ref0 = 0;
    of_data->sync_time.loc0 = 0;
    
    if (buffer_size < 2*RST_MAX_EVENT_SIZE) {
        buffer_size = 2*RST_MAX_EVENT_SIZE;
    }
    of_data->rst_buffer_size = buffer_size;
    of_data->rst_buffer = (char *)malloc(of_data->rst_buffer_size);
    if (of_data->rst_buffer == NULL) {
        fprintf(stderr, "[rastro] cannot allocate buffer memory");
        close(of_data->fd);
        of_data->fd = -1;
        return RST_NOK;
    }
    of_data->rst_buffer_ptr = of_data->rst_buffer;
    of_data->rst_buffer_used = 0;

    if (!rst_decode_one_event(of_data, &of_data->event)
       || of_data->event.type != RST_EVENT_INIT 
       || of_data->event.ct.n_uint64 != 2 
       || of_data->event.ct.n_string != 1) {
        fprintf(stderr, "[rastro] invalid rastro file %s\n", f_name);
        close(of_data->fd);
        of_data->fd = -1;
        free(of_data->rst_buffer);
        of_data->rst_buffer = NULL;
        return RST_NOK;
    }
    of_data->id1 = of_data->event.v_uint64[0];
    of_data->id2 = of_data->event.v_uint64[1];
    of_data->hostname = strdup(of_data->event.v_string[0]);
    of_data->event.id1 = of_data->id1;
    of_data->event.id2 = of_data->id2;
    find_timesync_data(syncfilename, of_data);

    /*Sincroniza o primeiro evento*/
    of_data->event.timestamp = rst_correct_time(of_data->event.timestamp, &of_data->sync_time);
    return RST_OK;
}

//Finaliza um arquivo de rastro
void rst_close_one_file(rst_one_file_t *of_data)
{
    close(of_data->fd);
    of_data->fd = -1;
    free(of_data->rst_buffer);
    of_data->rst_buffer = NULL;
    free(of_data->hostname);
    of_data->hostname = NULL;
}

// Le buffer e decodifica um evento
int rst_decode_one_event(rst_one_file_t *of_data, rst_event_t *event)
{
    int bytes_remaining, bytes_processed;

    rst_fill_buffer(of_data);

    bytes_processed = of_data->rst_buffer_ptr - of_data->rst_buffer;
    bytes_remaining = of_data->rst_buffer_used - bytes_processed;
    if (bytes_remaining <= 0) {
        return RST_NOK;
    }

    of_data->rst_buffer_ptr = trd_event(&of_data->hora_global, event, of_data->rst_buffer_ptr);

    event->timestamp = rst_correct_time(event->timestamp, &of_data->sync_time);
    event->id1 = of_data->id1;
    event->id2 = of_data->id2;
    if (event->type == RST_EVENT_STOP) {
        return RST_NOK;
    }
    return RST_OK;
}
/*FIM*/



/*FUNCOES DE AUXLIO A LEITURA DE MULTIPLOS ARQUIVOS*/

//Reorganiza a fila de traz pra frente          
void reorganize_bottom_up (rst_file_t *f_data, int son)
{
    int dead;
    dead = son/2;
    if (dead == 0)
        return;

    smallest_first (f_data, dead, son);
    reorganize_bottom_up (f_data, dead);
}

//Reorganiza a fila do primeiro pra traz
void reorganize_top_down (rst_file_t *f_data, int dead)
{
    int son1, son2;
    son1 = dead * 2;
    son2 = (dead * 2) + 1;

    //fila vazia
    if (dead > f_data->quantity)
        return;
    //filho1 nao pertence a fila
    if (son1 > f_data->quantity)
        return;
    //filho2 nao pertence a fila
    if (son2 > f_data->quantity)
        smallest_first (f_data, dead, son1);

    //ambos filhos pertencem a fila
    else {
        if (f_data->of_data[son1 - 1]->event.timestamp < f_data->of_data[son2 - 1]->event.timestamp) {
            smallest_first (f_data, dead, son1);
            reorganize_top_down (f_data, son1);
        } else {
            smallest_first (f_data, dead, son2);
            reorganize_top_down (f_data, son2);
        }
    }
}

//Se o filho for menor que o pai, troca os dois
void smallest_first (rst_file_t *f_data, int dead, int son)
{
    rst_one_file_t *aux;
    if (f_data->of_data[dead - 1]->event.timestamp > f_data->of_data[son - 1]->event.timestamp){
        aux = f_data->of_data[dead - 1];
        f_data->of_data[dead - 1] = f_data->of_data[son - 1];
        f_data->of_data[son - 1] = aux;
    }
}

/*FIM*/



/*********************************FUNCOES DISPONIVEIS AO USUARIO************************************************/

/*FUNCOES RELATIVAS A LEITURA DE MULTIPLOS CODIGOS*/

//Adiciona um arquivo de rastro na fila de prioridades 'f_data'
int rst_open_file(char *f_name, rst_file_t *f_data, char 
*syncfilename, int buffer_size)
{
    if (f_data->initialized != FDATAINITIALIZED){
        f_data->of_data = (rst_one_file_t **) malloc (sizeof( *f_data->of_data ));
	f_data->quantity = 0;
	f_data->initialized = FDATAINITIALIZED;
    } else {
        f_data->of_data = (rst_one_file_t **) realloc (f_data->of_data, sizeof( *f_data->of_data ) * (f_data->quantity + 1));
    }

    if (f_data->of_data == NULL) {
        fprintf(stderr, "[rastro] cannot allocate memory");
        return RST_NOK;
    }

    f_data->of_data[f_data->quantity] = (rst_one_file_t *) malloc (sizeof( rst_one_file_t ));
    if (f_data->of_data[f_data->quantity] == NULL) {
        fprintf(stderr, "[rastro] cannot allocate memory");
        return RST_NOK;
    }
    
    if (rst_open_one_file(f_name, f_data->of_data[f_data->quantity], syncfilename, buffer_size)){
        if ( !rst_decode_one_event (f_data->of_data[f_data->quantity], &f_data->of_data[f_data->quantity]->event) ) {
            
	    rst_close_one_file(f_data->of_data[f_data->quantity]);
            free(f_data->of_data[f_data->quantity]);
        } else {
            f_data->quantity++;
            reorganize_bottom_up (f_data, f_data->quantity);
        }
        return RST_OK;
    }else
        return RST_NOK;
}



//Finaliza fila de prioridades
void rst_close_file (rst_file_t *f_data)
{
    free(f_data->of_data);
    f_data->quantity = 0;
}

//Le proximo evento do espaco temporal
int rst_decode_event (rst_file_t *f_data, rst_event_t *event)
{
    rst_one_file_t *aux;

    //nao tem nada na fila
    if (f_data->quantity < 1)
        return RST_NOK;

    else {
        *event = f_data->of_data[0]->event;

        f_data->quantity--;

        //troca o ultimo pelo primeiro(removido)                
        aux = f_data->of_data[0];
        f_data->of_data[0] = f_data->of_data[f_data->quantity];
        f_data->of_data[f_data->quantity] = aux;

        //reorganiza a fila
        reorganize_top_down (f_data, 1);

        if ( !rst_decode_one_event (f_data->of_data[f_data->quantity], &f_data->of_data[f_data->quantity]->event) ) {
            
	    rst_close_one_file(f_data->of_data[f_data->quantity]);
            free(f_data->of_data[f_data->quantity]);
        } else {
            f_data->quantity++;
            //reorganiza a fila     
            reorganize_bottom_up (f_data, f_data->quantity);
        }
        return RST_OK;
    }
}
/*FIM*/


//Imprime um evento
void rst_print_event(rst_event_t *event)
{
    int i;
    fprintf (stderr, "type: %d ts: %lld\n", event->type, event->timestamp);
    if (event->ct.n_uint64 > 0) {
        fprintf (stderr, "\tu_int64_ts-> ");
        for (i = 0; i < event->ct.n_uint64; i++) {
            fprintf (stderr, "(%lld) ", event->v_uint64[i]);
        }
        fprintf (stderr, "\n");
    }
    if (event->ct.n_string>0) {
        fprintf (stderr, "\tstrings-> ");
        for (i = 0; i < event->ct.n_string; i++) {
            fprintf (stderr, "(%s) ", event->v_string[i]);
        }
        fprintf (stderr, "\n");
    }
    if (event->ct.n_float > 0) {
        fprintf (stderr, "\tfloats-> ");
        for (i = 0; i < event->ct.n_float; i++) {
            fprintf (stderr, "(%f) ", event->v_float[i]);
        }
    fprintf (stderr, "\n");
    }
    if (event->ct.n_uint32 > 0) {
        fprintf (stderr, "\tu_int32_ts-> ");
        for (i = 0; i < event->ct.n_uint32; i++) {
            fprintf (stderr, "(%d) ", event->v_uint32[i]);
        }
        fprintf (stderr, "\n");
    }
    if (event->ct.n_uint16 > 0) {
        fprintf (stderr, "\tu_int16_ts-> ");
        for (i = 0; i < event->ct.n_uint16; i++) {
            fprintf (stderr, "(%d) ", event->v_uint16[i]);
        }
        fprintf (stderr, "\n");
    }
    if (event->ct.n_uint8 > 0) {
        fprintf (stderr, "\tu_int8_ts-> ");
        for (i = 0; i < event->ct.n_uint8; i++) {
            fprintf (stderr, "(%c) ", event->v_uint8[i]);
        }
        fprintf (stderr, "\n");
    }
    if (event->ct.n_double > 0) {
        fprintf (stderr, "\tdoubles-> ");
        for (i = 0; i < event->ct.n_double; i++) {
            fprintf (stderr, "(%f) ", event->v_double[i]);
        }
        fprintf (stderr, "\n");
    }
}


syntax highlighted by Code2HTML, v. 0.9.1