package Lire::DlfStore;

use strict;

use Lire::DlfStream;
use Lire::DlfAnalyserProcess;
use Lire::DlfQuery;
use Lire::SQLExt::Registry;
use Lire::WeekCalculator;
use Lire::Config::Index;
use Lire::Config::ReportConfigIndex;
use Lire::ReportConfig;
use Lire::Utils qw/check_param check_object_param min max unique/;

use File::Path qw/mkpath/;
use File::Basename qw/dirname basename/;
use POSIX qw/strftime/;
use Errno;
use Fcntl;
use DBI;
use Carp;

=pod

=head1 NAME

Lire::DlfStore - Interface to a DLF store

=head1 SYNOPSIS

  use Lire::DlfStore;

  my $store = Lire::DlfStore->open( "mystore", 1 );
  my $avail_streams = $store->dlf_streams;

=head1 DESCRIPTION

=head2 open( $store_path, [ $create ] );

Opens the Lire::DlfStore located in the directory $store_path and
returns a Lire::DlfStore instance giving access to its content.

When the directory $store_path doesn't exist, or when it doesn't hold
both the 'dlf.db' and 'config.xml' files, the method dies unless the
$create parameter is true. The method also dies when the store is
locked by another process. Opening the same store twice from the same
process isn't supported.

=cut

sub open {
    my ( $pkg, $store_path, $create ) = @_;

    check_param( $store_path, 'store_path' );

    my $self = bless {
                       '_store_path' => $store_path,
                       '_dbh'        => undef,
                       '_config'     => undef,
                     }, $pkg;

    if ( -d $store_path ) {
        # An existing directory is only acceptable as-is when it holds
        # both store files; otherwise the caller must allow creation.
        my $complete = ( -f "$store_path/dlf.db" )
          && ( -f "$store_path/config.xml" );
        croak "Invalid DlfStore: '$store_path'"
          unless $complete || $create;
    }
    else {
        croak "DlfStore '$store_path' doesn't exist"
          unless $create;
        $self->_create_store();
    }

    $self->_load_config_file();

    $self->{'_dbh'} =
      DBI->connect( "dbi:SQLite2:dbname=$store_path/dlf.db", "", "",
                    { 'RaiseError' => 1, 'AutoCommit' => 0 } );
    Lire::SQLExt::Registry->register_functions( $self->{'_dbh'} );
    Lire::SQLExt::Registry->register_aggregates( $self->{'_dbh'} );

    # _lock() returns the pid of the lock owner: anything other than
    # our own pid means that we didn't get the lock.
    my $owner = $self->_lock();
    if ( $owner != $$ ) {
        die "DlfStore '$store_path' is locked by process $owner\n";
    }

    $self->_migrate_report_jobs();

    return $self;
}

#------------------------------------------------------------------------
# Method _load_config_file()
#
# Sets the '_config' attribute: the store's config.xml is parsed when
# it exists, otherwise a new Lire::Config::ConfigFile bound to that
# filename is instantiated. The 'reports' element of the configuration
# is then registered as the 'store_report_configurations' index.
sub _load_config_file {
    my ( $self ) = @_;

    my $path = "$self->{'_store_path'}/config.xml";
    if ( ! -f $path ) {
        $self->{'_config'} =
          Lire::Config::ConfigFile->new( 'spec' => Lire::Config->config_spec(),
                                         'filename' => $path );
    }
    else {
        my $parser =
          Lire::Config::Parser->new( 'spec' => Lire::Config->config_spec() );
        $self->{'_config'} = $parser->load_config_file( $path );
    }

    my $reports = $self->{'_config'}->get( 'reports' );
    Lire::Config::Index->set_index( 'store_report_configurations',
                                    Lire::Config::ReportConfigIndex->new( $reports ) );

    return;
}

#------------------------------------------------------------------------
# Method _migrate_report_jobs()
#
# Runs _migrate_report_job() on every configured report job which has
# its 'superservice' parameter set.
sub _migrate_report_jobs {
    my ( $self ) = @_;

    my @jobs = $self->{'_config'}->get( 'report_jobs' )->elements();
    foreach my $job ( @jobs ) {
        next unless $job->is_set( 'superservice' );
        $self->_migrate_report_job( $job );
    }

    return;
}

#------------------------------------------------------------------------
# Method _migrate_report_job( $job )
#
# For each schedule of $job, resolves the schedule's 'report_cfg' file
# to a report configuration id (see _find_config()) and stores that id
# in the schedule's 'report_config' parameter.
sub _migrate_report_job {
    my ( $self, $job ) = @_;

    my $superservice = $job->get( 'superservice' )->get();
    foreach my $schedule ( $job->get( 'schedules' )->elements() ) {
        my $cfg_file = $schedule->get( 'report_cfg' )->get();
        my $cfg_id = $self->_find_config( $superservice, $cfg_file );
        $schedule->get( 'report_config' )->set( $cfg_id );
    }

    return;
}

#------------------------------------------------------------------------
# Method _find_config( $super, $file )
#
# Returns the report configuration id matching $file, which is the
# basename of $file. When the store has no report configuration with
# that id, one is loaded from $file and appended to the 'reports'
# configuration first.
sub _find_config {
    my ( $self, $super, $file ) = @_;

    my $id = basename( $file );
    unless ( $self->has_report_config( $id ) ) {
        my $reports = $self->{'_config'}->get( 'reports' );
        my $value = Lire::ReportConfig->new_from_file( $super, $file )
                                      ->as_config_value( $id );
        $value->{'spec'} =
          $self->{'_config'}->spec()->get( 'reports' )->get( 'report_cfg' );
        $reports->append( $value );
    }

    return $id;
}

=pod

=head2 close()

Call this method to release the lock held by the current process on the store.
If you don't call it, it will be called automatically when the store
reference goes out of scope, but you had better not rely on this to
close your stores.

=cut

sub close {
    my $self = shift;
    my $in_destroy = shift;

    $self->_unlock();

    # DESTROY calls close() another time, so the database work is only
    # done while the handle is still around.
    if ( $self->{'_dbh'} ) {
        $self->{'_dbh'}->commit();
        $self->{'_dbh'}->disconnect();
        delete $self->{'_dbh'};

        # The configuration isn't saved when we are called from DESTROY.
        if ( ! $in_destroy ) {
            $self->{'_config'}->save();
        }

        Lire::Config::Index->set_index( 'store_report_configurations',
                                        undef );
    }
}

=pod

=head2 is_closed()

Returns true if this DlfStore was closed and cannot be used anymore.

=cut

sub is_closed {
    my ( $self ) = @_;

    # close() deletes the '_dbh' key altogether.
    return ! exists $self->{'_dbh'};
}

=pod

=head2 path

Returns the path to the store. This is the argument that was given to
open().

=cut

sub path {
    my ( $self ) = @_;

    return $self->{'_store_path'};
}

=pod

=head2 config

Returns the configuration of the store instantiated as a
Lire::Config::ConfigFile.

=cut

sub config {
    my ( $self ) = @_;

    return $self->{'_config'};
}

=pod

=head2 import_jobs()

Returns as an array ref the list of ImportJob currently configured in
the DlfStore.

=cut

sub import_jobs {
    my ( $self ) = @_;

    my $jobs = $self->{'_config'}->get( 'import_jobs' );
    return $jobs->as_value();
}

=pod

=head2 report_jobs()

Returns as an array ref the list of ReportJob currently configured in
the DlfStore.

=cut

sub report_jobs {
    my ( $self ) = @_;

    my $jobs = $self->{'_config'}->get( 'report_jobs' );
    return $jobs->as_value();
}

=pod

=head2 has_report_config( $name )

Returns true if there is a report configuration named $name in this DlfStore.
=cut

sub has_report_config {
    my ( $self, $name ) = @_;

    check_param( $name, 'name' );

    foreach my $cfg ( $self->{'_config'}->get( 'reports' )->elements() ) {
        return 1 if $cfg->get( 'id' )->get() eq $name;
    }

    return 0;
}

=pod

=head2 report_configurations()

Returns as an array ref the list of ReportConfig currently configured
in the DlfStore.

=cut

sub report_configurations {
    my ( $self ) = @_;

    return $self->{'_config'}->get( 'reports' )->as_value();
}

=pod

=head2 get_report_config( $name )

Returns the Lire::ReportConfig object named $name. Throws an exception
if it doesn't exist.

=cut

sub get_report_config {
    my ( $self, $name ) = @_;

    check_param( $name, 'name' );

    # One pass is enough: return on the first match and complain when
    # the whole list was scanned without finding one.
    foreach my $cfg ( $self->{'_config'}->get( 'reports' )->elements() ) {
        return $cfg->as_value() if $cfg->get( 'id' )->get() eq $name;
    }

    croak "there is no report configuration named '$name'";
}

#------------------------------------------------------------------------
# Method _create_store( )
#
# Creates the DlfStore structure, that is the store directory with
# mode 0770. Croaks when the directory cannot be created.
sub _create_store {
    my ( $self ) = @_;

    my $path = $self->{'_store_path'};
    mkdir $path, 0770
      or croak "can't create DLF store '$path': $!";

    return;
}

#------------------------------------------------------------------------
# Method _lock()
#
# Acquires a lock on the store. This is done automatically from open().
# The lock is a 'lock' file in the store directory which contains the
# pid of its owner.
#
# It tries up to three times to acquire the lock, sleeping two seconds
# after each failed try but the last. Removing a stale lock (one whose
# owner isn't running anymore) doesn't use up a try.
#
# It returns the process id of the owner of the lock. To check if the
# lock was acquired, the caller must check that this is equal to $$.
# It croaks when a stale lock cannot be removed or when the lock file
# can neither be created nor read.
#
# NOTE(review): when the lock file holds our own pid but was written
# through another instance, the owner test succeeds and $$ is returned
# although this instance doesn't own the file. To be confirmed against
# the documented "cannot open twice from the same process" contract.
sub _lock {
    my ( $self ) = @_;

    # We already hold the lock: we are the owner.
    return $$ if $self->{'_lock_file'};

    my $lock_file = $self->{'_store_path'} . "/lock";
    my $max_try = 3;
    my $try = 1;
    my $error = '';
    while ( $try <= $max_try ) {
        # O_EXCL makes the creation fail when the file already exists.
        if ( sysopen( my $new_fh, $lock_file, O_EXCL|O_CREAT|O_RDWR ) ) {
            print {$new_fh} $$, "\n";
            CORE::close $new_fh;
            $self->{'_lock_file'} = $lock_file;
            return $$;
        }
        $error = "$!";

        my $lock_fh;
        unless ( CORE::open( $lock_fh, '<', $lock_file ) ) {
            # The lock file either vanished since the sysopen() or
            # isn't readable: count this as a failed try.
            $error = "$!";
            $try++;
            next;
        }
        my $lock_pid = <$lock_fh>;
        CORE::close $lock_fh;

        # The file is empty when its owner didn't write its pid yet.
        $lock_pid = '' unless defined $lock_pid;
        chomp $lock_pid;

        # Signal 0 only tests for the process existence. EPERM means
        # that the process exists but belongs to somebody else.
        if ( kill( 0, $lock_pid ) || $!{'EPERM'} ) {
            return $lock_pid if $try == $max_try;
            sleep 2;
            $try++;
        }
        else {
            # Stale lock: remove it and retry at once, so that a stale
            # lock found on the last try doesn't make us give up.
            unlink( $lock_file ) == 1
              or croak "can't remove stale lock file '$lock_file': $!";
        }
    }

    croak "can't acquire lock file '$lock_file': $error";
}

#------------------------------------------------------------------------
# Method _unlock()
#
# Releases the lock previously acquired by the _lock() method. The
# lock file is forgotten before it is removed, so that calling
# _unlock() a second time (close() is called again from DESTROY) cannot
# remove a lock file which was created by another process in between.
sub _unlock {
    my ( $self ) = @_;

    my $lock_file = delete $self->{'_lock_file'};
    return unless $lock_file;

    return unlink $lock_file;
}

sub DESTROY {
    my ( $self ) = @_;

    # The flag tells close() that it is called from the destructor.
    $self->close( 'in_destroy' );
}

=pod

=head1 DLF STREAMS RELATED METHOD

A DlfStore can contain multiple DLF streams. A DLF stream is data
conforming to a DLF schema. The streams are represented by
Lire::DlfStream objects.

=head2 dlf_streams()

Lists the DLF streams available in the DLF store. This returns a list
of DlfSchema names.

=cut

sub dlf_streams {
    my ( $self ) = @_;

    my @streams = ();
    my $sth = $self->{'_dbh'}->table_info( "", "", "dlf_%", "TABLE" );
    $sth->execute();
    while ( my $table_info = $sth->fetchrow_hashref() ) {
        # A stream is a table named 'dlf_<schema>'; the tables whose
        # name ends with '_links' aren't streams.
        next unless $table_info->{'TABLE_NAME'} =~ /^dlf_(.*)/;
        next if $table_info->{'TABLE_NAME'} =~ /_links$/;

        # The second match has no capture group, so $1 still holds
        # the schema name captured by the first one.
        push @streams, $1;
    }
    $sth->finish();

    return @streams;
}

=pod

=head2 has_dlf_stream( $name )

Returns true if the DlfStore contains a DlfStream conforming to the
schema $name. This should be called in scalar or boolean context: in
list context, the matching names are returned, which is an empty list
when there is no such stream.

=cut

sub has_dlf_stream {
    my ( $self, $name ) = @_;

    return grep { $_ eq $name } $self->dlf_streams();
}

=pod

=head2 get_stream_config( $name )

Returns the Lire::Config::Dictionary instance which contains the
stream configuration for the $name DlfStream. This method will create
an instance from the default if necessary.
If $name isn't a valid schema name, this method will throw an error.

=cut

sub get_stream_config {
    my ( $self, $name ) = @_;

    check_param( $name, 'name' );

    if ( ! Lire::DlfSchema->has_schema( $name ) ) {
        croak "schema '$name' doesn't exist";
    }

    my $all_configs = $self->{'_config'}->get( 'streams_config' );
    foreach my $existing ( $all_configs->elements() ) {
        next unless $existing->name() eq $name;
        return $existing;
    }

    # Nothing configured yet for that stream: instantiate its spec
    # and remember the new instance in the store configuration.
    my $created = $all_configs->spec()->get( $name )->instance();
    $all_configs->append( $created );

    return $created;
}

=pod

=head2 configured_dlf_streams()

Returns the sorted list of DlfStreams which are referenced by the
store configuration. This is the result of dlf_streams() plus the
schemas of the converters used by the configured ImportJob as well as
the streams of the analysers enabled on any of those streams. Only
the names for which a schema exists are returned.

=cut

sub configured_dlf_streams {
    my ( $self ) = @_;

    my @queue = $self->dlf_streams();
    push @queue, @{ $self->_import_job_streams() };

    # The queue grows while it is processed, since each stream can
    # bring in the streams of its enabled analysers.
    my %seen = ();
    while ( defined( my $candidate = shift @queue ) ) {
        next unless Lire::DlfSchema->has_schema( $candidate );
        next if exists $seen{$candidate};

        my $more = $self->_analyser_streams( $candidate );
        push @queue, @$more;
        $seen{$candidate} = 1;
    }

    return sort keys %seen;
}

#------------------------------------------------------------------------
# Method _analyser_streams( $name )
#
# Returns as an array ref the names of the Lire::Config::DlfAnalyserSpec
# components of the $name stream configuration whose selected plugin
# isn't 'none'.
sub _analyser_streams {
    my ( $self, $name ) = @_;

    my $stream_cfg = $self->get_stream_config( $name );

    my @names = ();
    foreach my $component ( $stream_cfg->spec()->components() ) {
        next unless $component->isa( 'Lire::Config::DlfAnalyserSpec' );

        my $comp_name = $component->name();
        next if $stream_cfg->get( $comp_name )->get_plugin() eq 'none';

        push @names, $comp_name;
    }

    return \@names;
}

#------------------------------------------------------------------------
# Method _import_job_streams()
#
# Returns as an array ref the schemas of the converter of every
# configured ImportJob. The jobs whose converter isn't a registered
# 'dlf_converter' plugin are skipped.
sub _import_job_streams {
    my ( $self ) = @_;

    my @names = ();
    foreach my $job ( @{ $self->import_jobs() } ) {
        my $converter = $job->converter();
        next unless Lire::PluginManager->has_plugin( 'dlf_converter',
                                                     $converter );

        my $plugin = Lire::PluginManager->get_plugin( 'dlf_converter',
                                                      $converter );
        push @names, $plugin->schemas();
    }

    return \@names;
}

=pod

=head2 open_dlf_stream( $name, [$mode], [$sort_spec] )

Returns a Lire::DlfStream object that can be used to access a DLF
stream in the schema $name. The $mode parameter can be either "r" or
"w" and defaults to "r".
If the mode is "w", the Lire::DlfStream is opened for writing (new
DLF records can be inserted). The stream is handed to Lire::DlfStream
without checking first that it exists in the store.

If the mode is "r", the Lire::DlfStream is opened for reading (no DLF
records may be inserted). An exception will be thrown if there is no
stream $name available.

When the stream is opened in "r" mode, a sort order can be specified
by passing a $sort_spec. A $sort_spec is a white-space delimited list
of field names which should specify the sort order. Reverse sort order
can be specified by prefixing the field's name by '-'.

=cut

sub open_dlf_stream {
    my ( $self, $name, $mode, $sort_spec ) = @_;

    check_param( $name, 'name' );

    $mode ||= "r";
    my $reading = $mode eq 'r';
    croak "mode should be either 'r' or 'w' not '$mode'"
      unless $reading || $mode eq 'w';

    if ( $reading ) {
        # Only an existing stream can be read.
        croak "no DLF stream '$name' in this store"
          unless $self->has_dlf_stream( $name );
    }
    elsif ( $sort_spec ) {
        croak "only 'r' mode can use a sort_spec";
    }

    return Lire::DlfStream->new( $self, $name, $mode, $sort_spec );
}

=pod

=head2 run_analysers( $stream, [ $dlf_source ] )

Runs all the configured analysers on the DLF stream $stream.
$dlf_source limits the records which should be analysed.

=cut

sub run_analysers {
    my ( $self, $stream, $dlf_source ) = @_;

    check_param( $stream, 'stream' );

    if ( ! $self->has_dlf_stream( $stream ) ) {
        croak "no DLF stream '$stream' in this store";
    }

    # Besides 'keep_days', each key of the stream configuration is
    # used as the name of the stream on which the analysers are run
    # next, restricted to the job which has just been run.
    my $stream_cfg = $self->get_stream_config( $stream )->as_value();
    while ( my ( $next_stream, $analyser ) = each %$stream_cfg ) {
        next if $next_stream eq 'keep_days';
        next if $analyser->{'plugin'} eq 'none';

        my $process =
          Lire::DlfAnalyserProcess->new( $self,
                                         $analyser->{'plugin'},
                                         $analyser->{'properties'},
                                         $dlf_source );
        $process->run_analysis_job();

        $self->run_analysers( $next_stream, $process->job_id() );
    }

    return;
}

=pod

=head2 clean_streams()

Remove records that are days older than what is configured in the
'keep_days' parameter.
=cut

# clean_streams( [$time] )
#
# For each stream of the store which has a true 'keep_days'
# configuration, asks the stream to clean itself using the timestamp
# $time minus 'keep_days' days. $time defaults to the current time.
sub clean_streams {
    my ( $self, $time ) = @_;

    $time ||= time;

    foreach my $name ( $self->dlf_streams() ) {
        my $config = $self->get_stream_config( $name );
        my $keep_days = $config->get( 'keep_days' )->as_value();

        # A false 'keep_days' means that nothing is removed.
        next unless $keep_days;

        my $cutoff = $time - 86400 * $keep_days;
        my $stream = $self->open_dlf_stream( $name, 'w' );
        $stream->clean( $cutoff );
        $stream->close();
    }

    return;
}

# Returns the database handle of the store.
sub _dbh {
    return $_[0]{'_dbh'};
}

=pod

=head1 REPORTS RELATED METHOD

A DlfStore also contains the periodically generated reports. These
reports are usually configured through the lire(1) command. The
generated reports are stored in Lire XML native format by period.

=head2 put_report( $job, $schedule, $report )

Saves the $report Lire::Report which was generated by $job and
$schedule. Returns the filename where the report was saved. Dies when
the file cannot be opened for writing or cannot be closed properly.

=cut

sub put_report {
    my ( $self, $job, $schedule, $report ) = @_;

    check_object_param( $job, 'job', 'Lire::ReportJob' );
    check_object_param( $schedule, 'schedule', 'Lire::ReportSchedule' );
    check_object_param( $report, 'report', 'Lire::Report' );

    my $file = $self->_report_filename( $job->name(), $schedule->period(),
                                        $report->timespan_start() );
    my $dir = dirname( $file );
    mkpath( [ $dir ], 0, 0755 )
      unless -d $dir;

    # CORE:: is needed since this package defines its own open() and
    # close(). The three-argument form uses the filename as is,
    # whatever characters it contains.
    CORE::open( my $fh, '>', $file )
      or die "failed to open '$file' for writing: $!\n";
    $report->write_report( $fh );

    # Buffered write errors are only reported when the file is closed.
    CORE::close( $fh )
      or die "failed to close '$file': $!\n";

    return $file;
}

=pod

=head2 find_report_source( $job, $schedule, [$time] )

This method will return a hash reference containing parameters on how
the ReportJob $job should be generated for the ReportSchedule
$schedule. The boundaries of the period are determined according to
$time which defaults to now.

The hash reference will contain a 'source' key as well as possible
others:

=over

=item source

Specify from what kind of source the report should be generated.
Possible values are 'dlf' when the report should be generated from
DLF, 'merging' when the report should be generated by merging previous
XML reports and 'none' when no data is available for that period and
the report cannot be generated.

Note that 'merging' is only available as a source for schedules above
'daily', i.e. 'hourly' and 'daily' reports can only be generated from
DLF. Also important is the fact that only 'daily' reports are
available as a source of merging. So you need a daily report schedule
to generate 'weekly', 'monthly' and 'yearly' schedules by merging.

=item start

The timestamp at which the available data for the period starts. This
can be higher than the requested period's starting boundary.

=item end

The timestamp at which the available data for the period ends. This
can be lower than the requested period's ending boundary.

=item coverage

An integer between 0 and 100 representing the ratio of data available
for the requested period. For example, if only 6 hours of data are
available for a 'daily' report, the 'coverage' will be 25.

=item reports

This key is only available when 'source' is 'merging'. It contains an
array of report files which should be used as the source of merging.

=item days

This key is only available when 'source' is 'merging'. It contains the
days for which a report was available for merging. For example, if
only 6 daily reports were available to generate a 'weekly' report,
this will contain those 6 days dates in ISO format (%Y-%m-%d).

=back

=cut

sub find_report_source {
    my ( $self, $job, $schedule, $time ) = @_;

    $time ||= time();

    check_object_param( $job, 'job', 'Lire::ReportJob' );
    check_object_param( $schedule, 'schedule', 'Lire::ReportSchedule' );

    my $dlf_source = $self->_find_dlf_source( $job, $schedule, $time );

    # Hourly and daily reports are never generated by merging.
    return $dlf_source
      if $schedule->period() =~ /^(hourly|daily)$/;

    my $merging_source =
      $self->_find_merging_source( $job, $schedule, $time );

    # The DLF source wins when both have the same coverage.
    return ( $merging_source->{'coverage'} > $dlf_source->{'coverage'}
             ? $merging_source
             : $dlf_source );
}

#------------------------------------------------------------------------
# Method _find_dlf_source( $job, $schedule, $time )
#
# Returns the hash reference describing the generation of the report
# from DLF for the period of $schedule which contains $time.
#
# The available time span is the intersection of the requested period
# with the time span of every stream used by the report configuration
# which has data in that period. The 'none' source is returned when
# no stream has data in the period, or when that intersection is
# empty, instead of reporting a negative coverage with a start
# greater than the end.
sub _find_dlf_source {
    my ( $self, $job, $schedule, $time ) = @_;

    my $range = $schedule->period_range( $time );

    my $avail_start = 0;

    # The end boundary of the period is the highest possible value.
    my $avail_end = $range->[1];

    foreach my $schema ( @{ $schedule->report_config()->schemas() } ) {
        next unless $self->has_dlf_stream( $schema );

        my $stream = $self->open_dlf_stream( $schema, 'r' );
        my $start = $stream->start_time();
        my $end = $stream->end_time();
        $stream->close();

        # Skip the streams without a start time as well as those
        # whose records are all outside of the period.
        next unless defined $start;
        next if ( $start > $range->[1] || $end < $range->[0] );

        $avail_start = max( $range->[0], $start, $avail_start );
        $avail_end = min( $range->[1], $end, $avail_end );
    }

    return { 'source' => 'none', 'coverage' => 0 }
      unless $avail_start;

    # The streams have data in the period, but at no common time.
    return { 'source' => 'none', 'coverage' => 0 }
      if $avail_end < $avail_start;

    return {
             'source' => 'dlf',
             'start' => $avail_start,
             'end' => $avail_end,
             'coverage' => int( ( $avail_end - $avail_start ) * 100
                                / ( $range->[1] - $range->[0] ) ),
           };
}

#------------------------------------------------------------------------
# Method _find_merging_source( $job, $schedule, $time )
#
# Returns the hash reference describing the generation of the report
# by merging the daily reports of $job found in the store for the
# period of $schedule which contains $time. The period is scanned by
# steps of 86400 seconds. 'start' is the first day having a report
# and 'end' is the end of the last one; the 'coverage' stays 0 when no
# report was found.
sub _find_merging_source {
    my ( $self, $job, $schedule, $time ) = @_;

    my $source = {
                   'source' => 'merging',
                   'start' => undef,
                   'end' => undef,
                   'reports' => [],
                   'days' => [],
                   'coverage' => 0,
                 };

    my $range = $schedule->period_range( $time );
    for ( my $day = $range->[0]; $day < $range->[1]; $day += 86400 ) {
        my $file = $self->_report_filename( $job->name(), 'daily', $day );
        next unless -f $file;

        $source->{'start'} = $day
          unless defined $source->{'start'};
        $source->{'end'} = $day + 86400;
        push @{ $source->{'reports'} }, $file;
        push @{ $source->{'days'} }, strftime( '%Y-%m-%d', localtime $day );
    }

    if ( $source->{'start'} ) {
        $source->{'coverage'} =
          int( ( $source->{'end'} - $source->{'start'} ) * 100
               / ( $range->[1] - $range->[0] ) );
    }

    return $source;
}

#------------------------------------------------------------------------
# Method _report_filename( $report, $period, $time )
#
# Returns the name of the file, under the store directory, of the
# report named $report for the period $period which contains the
# timestamp $time. $period should be one of 'hourly', 'daily',
# 'weekly', 'monthly' or 'yearly' and $time should only contain
# digits.
sub _report_filename {
    my ( $self, $report, $period, $time ) = @_;

    check_param( $report, 'report' );
    check_param( $period, 'period',
                 qr/^(hourly|daily|weekly|monthly|yearly)$/,
                 "'period' parameter should be one of 'hourly', 'daily', 'weekly', 'monthly' or 'yearly'" );
    check_param( $time, 'time', qr/^\d+$/ );

    # Date format of the file name, relative to the period directory.
    my %fmt_of = (
                   'hourly' => "%Y%m/%d/%H.xml",
                   'daily' => "%Y%m/%d.xml",
                   'weekly' => "%Y/%V.xml",
                   'monthly' => "%Y/%m.xml",
                   'yearly' => "%Y.xml",
                 );
    my $fmt = $fmt_of{$period};
    die "shouldn't be reached"
      unless defined $fmt;

    my $calc = Lire::WeekCalculator->new();

    return $self->{'_store_path'} . "/${period}_reports/$report/"
      . $calc->strfdate( $fmt, localtime( $time ) );
}

# keep perl happy
1;

__END__

=pod

=head1 SEE ALSO

Lire::DlfConverter(3pm)

=head1 AUTHOR

  Francis J. Lacoste

=head1 VERSION

$Id: DlfStore.pm,v 1.39 2006/07/23 13:16:29 vanbaal Exp $

=head1 COPYRIGHT

Copyright (C) 2002,2004 Stichting LogReport Foundation LogReport@LogReport.org

This file is part of Lire.

Lire is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program (see COPYING); if not, check with
http://www.gnu.org/copyleft/gpl.html.

=cut