package Lire::DlfConverterProcess;

use strict;

use Carp;
use POSIX qw/ strftime /;

use Lire::PluginManager;
use Lire::Config;
use Lire::DlfQuery;
use Lire::Utils qw/ check_param check_object_param /;
use Lire::I18N qw/ set_fh_encoding /;

=pod

=head1 NAME

Lire::DlfConverterProcess - Object that controls the DLF conversion process

=head1 SYNOPSIS

  use Lire::ImportJob;
  use Lire::DlfStore;
  use Lire::DlfConverterProcess;

  my $src = Lire::ImportJob->new( "file",
                                  'pattern' => "/var/log/messages" );

  my $store = Lire::DlfStore->open( "store" );

  my $process = Lire::DlfConverterProcess->new( $src, $store );

  $process->run_import_job();

  print "Log lines read: ",      $process->line_count(),    "\n";
  print "DLF records created: ", $process->dlf_count(),     "\n";
  print "Errors encountered: ",  $process->error_count(),   "\n";
  print "Ignored records: ",     $process->ignored_count(), "\n";

=head1 DESCRIPTION

This object encapsulates the Lire DLF conversion process. It takes as
parameters a Lire::ImportJob and a Lire::DlfStore. It sets up the
converter and converts the content of the Lire::ImportJob to DLF
records, which are saved in the Lire::DlfStore.

The object provides the API used by the converter. Methods are also
available to query information about the conversion process.

=head1 METHODS

=head2 new( $job, $store )

Creates a Lire::DlfConverterProcess that will be used to import the log
specified by the Lire::ImportJob $job into DLF records stored in the
Lire::DlfStore $store. Both parameters are validated with
check_object_param().

=cut

sub new {
    my ( $pkg, $job, $store ) = @_;

    check_object_param( $job,   'job',   'Lire::ImportJob' );
    check_object_param( $store, 'store', 'Lire::DlfStore' );

    # Accept being invoked on either a class name or an existing instance.
    my $class = ref( $pkg ) || $pkg;
    my %state = (
        '_job'            => $job,
        '_store'          => $store,
        '_convert_called' => 0,
    );

    return bless \%state, $class;
}

=pod

=head2 run_import_job( [$time] )

Imports the log data from the ImportJob as DLF. This method throws an
exception if it is called again after a completed run.

The $time parameter will be used to determine the time window covered
by period. It defaults to the current time.

=cut

sub run_import_job {
    my ( $self, $time ) = @_;

    croak( $self->{'_job'}->name(), " ImportJob was already processed" )
      if $self->{'_convert_called'};

    # The job id is only generated once per process object.
    $self->{'_job_id'} = $self->{'_job'}->name() . '-'
      . strftime( "%Y%m%d_%H%M%S", localtime )
      unless exists $self->{'_job_id'};
    $self->{'_time_start'} = time;

    $self->_init_counters();
    $self->_init_converter();
    $self->_init_streams();

    $self->{'_fh'} = $self->{'_job'}->log_fh( $time );
    if ( $self->{'_converter'}->handle_log_lines() ) {
        # Line-oriented converter: replay the lines saved by a previous
        # run before feeding it the new log content.
        $self->_handle_continuation();
        $self->_process_log_lines( $self->{'_fh'} );
    } else {
        # File-oriented converter: it reads the file handle itself.
        $self->{'_converter'}->process_log_file( $self, $self->{'_fh'} );
    }
    close $self->{'_fh'};

    $self->{'_converter'}->finish_conversion( $self );

    # Close the dlf streams
    foreach my $s ( values %{ $self->{'_streams'} } ) {
        $s->close();
    }
    $self->{'_log_stream'}->close();

    $self->_save_import_stats();

    $self->{'_convert_called'} = 1;

    return;
}

#------------------------------------------------------------------------
# Method _init_converter()
#
# Looks up the DLF converter named by the ImportJob in the plugin
# manager, stores it in the object and initializes it with the job's
# converter configuration. Croaks when the job has no converter or when
# the converter isn't a registered 'dlf_converter' plugin.
sub _init_converter {
    my $self = $_[0];

    my $cname = $self->{'_job'}->converter();
    croak( "ImportJob ", $self->{'_job'}->name(),
           " doesn't have a converter defined" )
      unless defined $cname;

    croak( "DLF converter $cname isn't available" )
      unless Lire::PluginManager->has_plugin( 'dlf_converter', $cname );

    $self->{'_converter'} =
      Lire::PluginManager->get_plugin( 'dlf_converter', $cname );

    $self->{'_converter'}->init_dlf_converter( $self,
                                               $self->{'_job'}->converter_config() );

    return;
}

#------------------------------------------------------------------------
# Method _init_streams()
#
# Opens, in write mode, one DLF stream per schema declared by the
# converter plus the 'lire_import_log' stream.
sub _init_streams {
    my $self = $_[0];

    foreach my $s ( $self->{'_converter'}->schemas() ) {
        $self->{'_streams'}{$s} =
          $self->{'_store'}->open_dlf_stream( $s, "w" );
    }

    $self->{'_log_stream'} =
      $self->{'_store'}->open_dlf_stream( 'lire_import_log', "w" );

    return;
}

#------------------------------------------------------------------------
# Method _init_counters()
#
# Resets all the statistics counters to 0.
sub _init_counters {
    my $self = $_[0];

    $self->{'_line_count'}    = 0;
    $self->{'_dlf_count'}     = 0;
    $self->{'_ignored_count'} = 0;
    $self->{'_error_count'}   = 0;
    $self->{'_saved_count'}   = 0;

    return;
}

#------------------------------------------------------------------------
# Method _handle_continuation()
#
# Feeds back to the converter the 'continuation' lines recorded for this
# job in the lire_import_log stream, ordered by time and line number,
# and then deletes the continuation records older than the start of the
# current run.
sub _handle_continuation {
    my $self = $_[0];

    my $query = Lire::DlfQuery->new( 'lire_import_log' );
    $query->add_field( 'line' );
    $query->add_field( 'time' );
    $query->add_field( 'line_no' );
    $query->set_filter_clause( "type = 'continuation' AND job_name = ?",
                               $self->{'_job'}->name() );
    $query->set_sort_spec( 'time line_no' );

    my $result = $query->execute( $self->{'_store'} );
    while ( defined( my $row = $result->next_row_aref() ) ) {
        $self->{'_line_count'}++;
        # 'line' is the first selected field.
        $self->{'_converter'}->process_log_line( $self, $row->[0] );
    }

    # Only records older than this run are removed, so lines saved again
    # while replaying (time >= _time_start) are kept for the next run.
    # NOTE(review): _dbh() is presumably a DBI handle; the attribute
    # argument of do() is left undefined -- confirm against Lire::DlfStore.
    $self->{'_store'}->_dbh()->do( <<'EOSQL', undef, $self->{'_job'}->name(), $self->{'_time_start'} );
DELETE FROM dlf_lire_import_log
 WHERE type='continuation' AND job_name = ? AND time < ?
EOSQL

    return;
}

#------------------------------------------------------------------------
# Method _process_log_lines( $fh )
#
# Method which handles the read a line, process a line loop
sub _process_log_lines {
    my ( $self, $fh ) = @_;

    my $converter = $self->{'_converter'};
    while ( defined( my $line = <$fh> ) ) {
        # Remove DOS and/or UNIX line ending
        $line =~ s/\r?\n?$//;

        $self->{'_line_count'}++;
        $converter->process_log_line( $self, $line );
    }

    return;
}

#------------------------------------------------------------------------
# Method _save_import_stats()
#
# Writes one record summarizing this run (start time, elapsed seconds
# and all the counters) to the 'lire_import_stats' stream.
sub _save_import_stats {
    my $self = $_[0];

    my $elapsed = time - $self->{'_time_start'};
    my $stream = $self->{'_store'}->open_dlf_stream( 'lire_import_stats', "w" );
    $stream->write_dlf( { 'time_start'    => $self->{'_time_start'},
                          'elapsed'       => $elapsed,
                          'job_name'      => $self->{'_job'}->name(),
                          'job_id'        => $self->{'_job_id'},
                          'line_count'    => $self->{'_line_count'},
                          'dlf_count'     => $self->{'_dlf_count'},
                          'error_count'   => $self->{'_error_count'},
                          'ignored_count' => $self->{'_ignored_count'},
                          'saved_count'   => $self->{'_saved_count'},
                        } );
    $stream->close();

    return;
}

=pod

=head2 job_id()

Returns the job identifier associated to this process. The identifier
is generated by run_import_job() from the job's name and the local
time, so it is undefined before that method is called.

=cut

sub job_id {
    return $_[0]{'_job_id'};
}

=pod

=head2 dlf_store()

Returns the Lire::DlfStore in which this conversion process is storing
the DLF records.

=cut

sub dlf_store {
    return $_[0]{'_store'};
}

=pod

=head2 import_job()

Returns the Lire::ImportJob upon which this conversion process is
operating.

=cut

sub import_job {
    my ( $self ) = @_;
    return $self->{'_job'};
}

=pod

=head2 line_count()

Returns the number of log lines handed to the converter so far. The
counter is only incremented when the process feeds the converter line
by line, so it stays at 0 when the DLF converter processes the log file
itself. While a line is being converted, this is the number of that
line.

=cut

sub line_count {
    my ( $self ) = @_;
    return $self->{'_line_count'};
}

=pod

=head2 dlf_count()

Returns the number of DLF records written through write_dlf().

=cut

sub dlf_count {
    my ( $self ) = @_;
    return $self->{'_dlf_count'};
}

=pod

=head2 error_count()

Returns the number of errors reported during the conversion process.

=cut

sub error_count {
    my ( $self ) = @_;
    return $self->{'_error_count'};
}

=pod

=head2 ignored_count()

Returns the number of records which were reported as ignored during
the conversion process.

=cut

sub ignored_count {
    my ( $self ) = @_;
    return $self->{'_ignored_count'};
}

=pod

=head2 saved_count()

Returns the number of lines which were saved for later processing.

=cut

sub saved_count {
    my ( $self ) = @_;
    return $self->{'_saved_count'};
}

=pod

=head1 API FOR THE DLF CONVERTERS

This object encapsulates the DLF implementation and hides the
complexity of the storage framework from the DLF converter. It offers
the following methods to the DLF converter.

=head2 write_dlf( $schema, $dlf )

Writes the $dlf DLF record, which should conform to the schema named
$schema (e.g. 'www'), to the Lire::DlfStore. $dlf is a hash reference
whose keys are the schema's field names. An undefined value means that
the field isn't available in that record.

The record's 'dlf_source' field is set to this process' job identifier
before the record is written. An exception is thrown when $schema isn't
one of the schemas for which a stream was opened for the converter.

=cut

sub write_dlf {
    my ( $self, $schema, $dlf ) = @_;

    check_param( $schema, 'schema' );
    check_param( $dlf,    'dlf' );

    my $stream = $self->{'_streams'}{$schema};
    unless ( defined $stream ) {
        croak( "schema $schema wasn't defined by ",
               $self->{'_converter'}->name, " converter" );
    }

    # Tag the record with the import run that produced it.
    $dlf->{'dlf_source'} = $self->{'_job_id'};
    $stream->write_dlf( $dlf );
    $self->{'_dlf_count'} += 1;

    return;
}

=pod

=head2 save_log_line( $line )

Method that should be used to save $line for a future processing run
of the converter on the same Lire::ImportJob.

=cut

sub save_log_line {
    my ( $self, $line ) = @_;

    check_param( $line, 'line' );

    croak( "only DLF converter handling log lines can save log line" )
      unless $self->{'_converter'}->handle_log_lines();

    $self->{'_saved_count'}++;
    $self->_write_log_record( 'continuation', $line );

    return;
}

=pod

=head2 ignore_log_line( $line, [ $reason ] )

Method that can be used by the Lire::DlfConverter to report that the
$line log line was ignored during that processing. The reason why the
line was ignored can be given in $reason; a false $reason is recorded
as "Unknown reason". For example, a syslog-based converter should use
this method to report lines that are for another 'service' than
theirs.

=cut

sub ignore_log_line {
    my ( $self, $line, $reason ) = @_;

    $reason ||= "Unknown reason";

    check_param( $line, 'line' );

    $self->{'_ignored_count'}++;
    $self->_write_log_record( 'ignored', $line, 'msg' => $reason );

    return;
}

=pod

=head2 error( $error_msg, [ $line ] );

Method that should be used by the Lire::DlfConverter to report that an
error was encountered when processing the Lire::ImportJob. $error_msg
should be used to report the nature of the error. The $line parameter
should be used by converters operating on lines to associate the error
message to a particular line. When $line is omitted, an empty line is
recorded.

=cut

sub error {
    my ( $self, $error_msg, $line ) = @_;

    check_param( $error_msg, 'error_msg' );

    $self->{'_error_count'}++;
    $self->_write_log_record( 'error', defined $line ? $line : '',
                              'msg' => $error_msg );

    return;
}

#------------------------------------------------------------------------
# Method _write_log_record( $type, $line, [ %extra ] )
#
# Writes one record to the 'lire_import_log' stream. The record is
# stamped with the current time, the job's name, the job id and the
# current line count. $type is the kind of record ('continuation',
# 'ignored' or 'error'), $line is the log line it relates to and %extra
# holds additional fields (e.g. 'msg') to merge into the record.
sub _write_log_record {
    my ( $self, $type, $line, %extra ) = @_;

    $self->{'_log_stream'}->write_dlf( { 'time'     => time(),
                                         'job_name' => $self->{'_job'}->name(),
                                         'job_id'   => $self->{'_job_id'},
                                         'line_no'  => $self->{'_line_count'},
                                         'type'     => $type,
                                         'line'     => $line,
                                         %extra,
                                       } );

    return;
}

# keep perl happy
1;

__END__

=pod

=head1 SEE ALSO

Lire::DlfStore(3pm), Lire::DlfConverter(3pm)

=head1 AUTHOR

Francis J. Lacoste

=head1 VERSION

$Id: DlfConverterProcess.pm,v 1.18 2006/07/23 13:16:28 vanbaal Exp $

=head1 COPYRIGHT

Copyright (C) 2002-2004 Stichting LogReport Foundation LogReport@LogReport.org

This file is part of Lire.

Lire is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program (see COPYING); if not, check with
http://www.gnu.org/copyleft/gpl.html.

=cut