package Lire::DlfStore;

use strict;

use Lire::DlfStream;
use Lire::DlfAnalyserProcess;
use Lire::DlfQuery;
use Lire::SQLExt::Registry;
use Lire::WeekCalculator;
use Lire::Config::Index;
use Lire::Config::ReportConfigIndex;
use Lire::ReportConfig;

use Lire::Utils qw/check_param check_object_param min max unique/;

use File::Path qw/mkpath/;
use File::Basename qw/dirname basename/;
use POSIX qw/strftime/;

use Errno;
use Fcntl;
use DBI;
use Carp;

=pod

=head1 NAME

Lire::DlfStore - Interface to a DLF store

=head1 SYNOPSIS

  use Lire::DlfStore;

  my $store = Lire::DlfStore->open( "mystore", 1 );
  my $avail_streams = $store->dlf_streams;

=head1 DESCRIPTION

=head2 open( $store_path, [ $create ] );

Opens a Lire::DlfStore residing in the directory pointed by
$store_path and returns a Lire::DlfStore instance that can be used to
access its content.

If the store $store_path wasn't created, the method will die unless
the $create parameter was set to true.

If the store is locked by another process, the method will also die.

Trying to open the same store twice from the same process will fail.

=cut

# Constructor. Validates the store directory (creating it when $create
# is true), takes the store lock, and only then loads the configuration
# file and connects to the SQLite database.
#
# Dies when the store doesn't exist or is invalid and $create is false,
# or when the lock is owned by another process.
sub open {
    my ( $pkg, $store_path, $create ) = @_;

    check_param( $store_path, 'store_path' );

    my $self = bless { '_store_path' => $store_path,
                       '_dbh' => undef,
                       '_config' => undef,
                     }, $pkg;

    if ( ! -d $store_path ) {
        croak "DlfStore '$store_path' doesn't exist"
          unless $create;

        $self->_create_store();
    } elsif ( !( -f $self->{'_store_path'} . "/dlf.db"
                 && -f $self->{'_store_path'} . "/config.xml" ) ) {
        croak "Invalid DlfStore: '$store_path'"
          unless $create;
    }

    # Take the lock before reading anything: _load_config_file() replaces
    # the process-wide 'store_report_configurations' index, so loading
    # the configuration of a store that we then fail to lock would
    # clobber that index (and leave a database connection behind) for
    # nothing.
    my $locked_by = $self->_lock();
    die "DlfStore '$store_path' is locked by process $locked_by\n"
      unless $locked_by == $$;

    $self->_load_config_file();
    $self->{'_dbh'} = DBI->connect( "dbi:SQLite2:dbname=" .
                                    $self->{'_store_path'} . "/dlf.db", "", "",
                                    { 'RaiseError' => 1,
                                      'AutoCommit' => 0, } );

    Lire::SQLExt::Registry->register_functions( $self->{'_dbh'} );
    Lire::SQLExt::Registry->register_aggregates( $self->{'_dbh'} );

    $self->_migrate_report_jobs();
    return $self;
}

# Loads the store's config.xml into $self->{'_config'}, or starts from
# an empty configuration bound to that filename when the file doesn't
# exist yet. The 'reports' section is then published as the
# 'store_report_configurations' index.
sub _load_config_file {
    my ( $self ) = @_;

    my $path = $self->{'_store_path'} . "/config.xml";

    $self->{'_config'} = -f $path
      ? Lire::Config::Parser->new( 'spec' => Lire::Config->config_spec() )
                            ->load_config_file( $path )
      : Lire::Config::ConfigFile->new( 'spec' => Lire::Config->config_spec(),
                                       'filename' => $path );

    Lire::Config::Index->set_index(
        'store_report_configurations',
        Lire::Config::ReportConfigIndex->new( $self->{'_config'}->get( 'reports' ) )
    );
    return;
}

# Runs _migrate_report_job() on every configured report job which still
# has its 'superservice' parameter set.
sub _migrate_report_jobs {
    my ( $self ) = @_;

    my $jobs = $self->{'_config'}->get( 'report_jobs' );
    for my $job ( $jobs->elements() ) {
        next unless $job->is_set( 'superservice' );

        $self->_migrate_report_job( $job );
    }
    return;
}

# For each schedule of $job, resolves the file named by 'report_cfg'
# to a report configuration id (see _find_config()) and stores that id
# in the schedule's 'report_config' parameter.
sub _migrate_report_job {
    my ( $self, $job ) = @_;

    my $superservice = $job->get( 'superservice' )->get();
    my $schedules = $job->get( 'schedules' );
    for my $schedule ( $schedules->elements() ) {
        my $id = $self->_find_config( $superservice,
                                      $schedule->get( 'report_cfg' )->get() );
        $schedule->get( 'report_config' )->set( $id );
    }
    return;
}

# Returns the id (the basename of $file) of the report configuration
# matching $file. When the store has no configuration with that id yet,
# the file is loaded as a Lire::ReportConfig for the superservice
# $super and appended to the store's 'reports' list first.
sub _find_config {
    my ( $self, $super, $file ) = @_;

    my $id = basename( $file );
    unless ( $self->has_report_config( $id ) ) {
        my $store_config = $self->{'_config'};

        my $cfg = new_from_file Lire::ReportConfig( $super, $file )->as_config_value( $id );
        # Rebind the value to this store's own specification of a
        # 'report_cfg' element before adding it to the list.
        $cfg->{'spec'} = $store_config->spec()->get( 'reports' )->get( 'report_cfg' );
        $store_config->get( 'reports' )->append( $cfg );
    }

    return $id;
}

=pod

=head2 close()

Call this method to release the lock held by the current process on
the store. If you don't call it, it will be called automatically when
the store reference goes out of scope, but you should not rely on this
to close your stores.

=cut

# Commits and disconnects the database, saves the configuration (except
# when called from DESTROY, i.e. when $in_destroy is true), clears the
# 'store_report_configurations' index and finally releases the lock.
#
# The lock is released last, and even when one of the previous steps
# failed, so that no other process can open the store while it is
# still being written. Any error raised while closing is rethrown once
# the lock has been released.
sub close {
    my ( $self, $in_destroy ) = @_;

    my $error;

    # DESTROY calls close() another time: there is no handle left then.
    if ( $self->{'_dbh'} ) {
        # close() may run from DESTROY while an exception is being
        # propagated: do not clobber the caller's $@.
        local $@;
        eval {
            $self->{'_dbh'}->commit();
            $self->{'_dbh'}->disconnect();
            delete $self->{'_dbh'};
            $self->{'_config'}->save()
              unless $in_destroy;
            Lire::Config::Index->set_index( 'store_report_configurations', undef );
            1;
        } or $error = $@ || "unknown error while closing DlfStore\n";
    }

    $self->_unlock();

    die $error if defined $error;
    return;
}

=pod

=head2 is_closed()

Returns true if this DlfStore was closed and cannot be used anymore.

=cut

# The store is closed once close() has removed the '_dbh' slot.
sub is_closed {
    my ( $self ) = @_;

    return !exists $self->{'_dbh'};
}

=pod

=head2 path

Returns the path to the store. This is the argument used to create the
store.

=cut

# Accessor for the directory given to open().
sub path {
    my ( $self ) = @_;

    return $self->{'_store_path'};
}

=pod

=head2 config

Returns the configuration of the store instantiated as a
Lire::Config::ConfigFile.

=cut

# Accessor for the configuration object loaded by open().
sub config {
    my ( $self ) = @_;

    return $self->{'_config'};
}

=pod

=head2 import_jobs()

Returns as an array ref the list of ImportJob currently configured
in the DlfStore

=cut

# Value of the 'import_jobs' parameter of the store configuration.
sub import_jobs {
    my ( $self ) = @_;

    my $jobs = $self->{'_config'}->get( 'import_jobs' );
    return $jobs->as_value();
}

=pod

=head2 report_jobs()

Returns as an array ref the list of ReportJob currently configured
in the DlfStore

=cut

# Value of the 'report_jobs' parameter of the store configuration.
sub report_jobs {
    my ( $self ) = @_;

    my $jobs = $self->{'_config'}->get( 'report_jobs' );
    return $jobs->as_value();
}

=pod

=head2 has_report_config( $name )

Returns true if there is a report configuration named $name in
this DlfStore.

=cut

# Looks for an element of the 'reports' list whose 'id' is $name.
# Returns 1 when one is found and 0 otherwise.
sub has_report_config {
    my ( $self, $name ) = @_;

    check_param( $name, 'name' );

    my $reports = $self->{'_config'}->get( 'reports' );
    for my $candidate ( $reports->elements() ) {
        my $id = $candidate->get( 'id' )->get();
        return 1 if $id eq $name;
    }

    return 0;
}

=pod

=head2 report_configurations()

Returns as an array ref the list of ReportConfig currently configured
in the DlfStore.

=cut

# Value of the 'reports' parameter of the store configuration.
sub report_configurations {
    my ( $self ) = @_;

    my $reports = $self->{'_config'}->get( 'reports' );
    return $reports->as_value();
}

=pod

=head2 get_report_config( $name )

Returns the Lire::ReportConfig object named $name. Throws an exception
if it doesn't exist.

=cut

# Returns the value of the element of the 'reports' list whose 'id' is
# $name. Croaks when there is no such element.
#
# The list is scanned only once: the first match is returned and
# reaching the end of the list is the "not found" case.
sub get_report_config {
    my ( $self, $name ) = @_;

    check_param( $name, 'name' );

    foreach my $cfg ( $self->{'_config'}->get( 'reports' )->elements() ) {
        return $cfg->as_value() if $cfg->get( 'id' )->get() eq $name;
    }

    croak "there is no report configuration named '$name'";
}

#------------------------------------------------------------------------
# Method _create_store( )
#
# Creates the DlfStore structure.
# Creates the store directory with mode 0770 (before umask). Only the
# last component of the path is created; croaks with the system error
# when the directory can't be created.
sub _create_store {
    my $self = $_[0];

    mkdir $self->{'_store_path'}, 0770
      or croak "can't create DLF store '$self->{'_store_path'}': $!";

    return;
}

#------------------------------------------------------------------------
# Method _lock()
#
# Acquires the lock on the store by exclusively creating the file
# "lock" in the store directory and writing our process id in it. This
# is done automatically from open().
#
# Returns the process id of the owner of the lock: to check whether the
# lock was acquired, the caller must check that this is equal to $$.
# Returns nothing when this instance already holds the lock.
#
# When the lock is held by a live process, the acquisition is tried
# three times, sleeping 2 seconds between tries, before the owner's
# process id is returned. A lock whose owner is gone is removed. Croaks
# when the lock file can be neither created nor attributed to an owner
# (e.g. unwritable store directory).
sub _lock {
    my $self = $_[0];

    # Do not try to acquire another lock.
    return if $self->{'_lock_file'};

    my $lock_file = $self->{'_store_path'} . "/lock";
    my $max_try = 3;
    my $last_error = '';

    # One pass more than $max_try, so that a stale lock removed (or a
    # lock released) during the last regular try still gets a creation
    # attempt instead of falling out of the loop.
    for ( my $try = 1; $try <= $max_try + 1; $try++ ) {
        if ( sysopen( my $lock_fh, $lock_file, O_EXCL|O_CREAT|O_RDWR ) ) {
            print $lock_fh $$, "\n";
            CORE::close $lock_fh;
            $self->{'_lock_file'} = $lock_file;
            return $$;
        }
        $last_error = "$!";

        # The lock file may have vanished since the sysopen(): try again.
        CORE::open( my $owner_fh, '<', $lock_file )
          or next;
        my $lock_pid = <$owner_fh>;
        CORE::close $owner_fh;
        chomp $lock_pid if defined $lock_pid;

        # An empty or unparsable lock file may be one that its owner
        # hasn't finished writing: consider it held rather than stale.
        my $alive = ( defined $lock_pid && $lock_pid =~ /^\d+$/ && $lock_pid > 0 )
          ? ( kill( 0, $lock_pid ) || $!{'EPERM'} )
          : 1;

        if ( $alive ) {
            return $lock_pid if $try >= $max_try;
            sleep 2;
        } else {
            # Stale lock. Somebody else may have removed it in the
            # meantime, which is just as good.
            unlink( $lock_file ) == 1 or $!{'ENOENT'}
              or croak "can't remove stale lock file '$lock_file': $!";
        }
    }

    croak "can't acquire lock file '$lock_file': $last_error";
}

#------------------------------------------------------------------------
# Method _unlock()
#
# Release the lock previously acquired by _lock() method.
# Removes the lock file created by _lock(), if this instance holds one.
# Returns the result of unlink(), or nothing when no lock was held.
sub _unlock {
    my $self = $_[0];

    # Forget the lock file as we release it: close() is called a second
    # time from DESTROY, and unlinking the file again at that point
    # could remove a lock that another process acquired in between.
    my $lock_file = delete $self->{'_lock_file'};
    return unless $lock_file;

    return unlink $lock_file;
}

# Destructor: closes the store, flagging the call so that close() knows
# it is running from DESTROY.
sub DESTROY {
    my ( $self ) = @_;

    return $self->close( 'in_destroy' );
}

=pod

=head1 DLF STREAMS RELATED METHOD

A DlfStore can contain multiple DLF streams. A DLF stream is data
conforming to a DLF schema. The streams are represented by
Lire::DlfStream objects.

=head2 dlf_streams()

Lists the DLF streams available in the DLF store. This returns a list
of DlfSchema names.

=cut

# Builds the list of stream names from the database tables: every table
# named dlf_<name> is a stream, except the ones ending with _links.
sub dlf_streams {
    my ( $self ) = @_;

    my $sth = $self->{'_dbh'}->table_info( "", "", "dlf_%", "TABLE" );
    $sth->execute();

    my @names = ();
    while ( my $row = $sth->fetchrow_hashref() ) {
        my $table = $row->{'TABLE_NAME'};

        # The list assignment counts the captures: 0 when no match.
        my ( $schema ) = $table =~ /^dlf_(.*)/
          or next;
        next if $table =~ /_links$/;

        push @names, $schema;
    }
    $sth->finish();

    return @names;
}

=pod

=head2 has_dlf_stream( $name )

Returns true if the DlfStore contains a DlfStream conforming to the
schema $name.

=cut

# Filters dlf_streams() on $name: the result is the number of matches
# in scalar context and the matching names in list context.
sub has_dlf_stream {
    my ( $self, $name ) = @_;

    my @available = $self->dlf_streams();
    return grep { $name eq $_ } @available;
}

=pod

=head2 get_stream_config( $name )

Returns the Lire::Config::Dictionary instance which contains the
stream configuration for the $name DlfStream. This method will create
an instance from the default if necessary. If $name isn't a valid
schema name, this method will throw an error.

=cut

# Returns the element of the 'streams_config' list named $name. When
# there is none yet, a new instance is created from the specification
# of that name, appended to the list and returned.
# Croaks when Lire::DlfSchema doesn't know the schema $name.
sub get_stream_config {
    my ( $self, $name ) = @_;

    check_param( $name, 'name' );
    Lire::DlfSchema->has_schema( $name )
      or croak "schema '$name' doesn't exist";

    my $streams_config = $self->{'_config'}->get( 'streams_config' );
    for my $existing ( $streams_config->elements() ) {
        next unless $existing->name eq $name;
        return $existing;
    }

    my $created = $streams_config->spec()->get( $name )->instance();
    $streams_config->append( $created );

    return $created;
}

=pod

=head2 configured_dlf_streams()

Returns the list of DlfStreams which are referenced by the store
configuration. This is the result of dlf_streams() plus all schemas
that exist and are used by one of the configured ImportJob or
DlfStreamSpec.

=cut

# Work-list traversal: starts from the existing streams and the streams
# of the import jobs, and keeps adding the streams of the analysers
# configured on each stream visited. Names which aren't known schemas
# are dropped. Returns the sorted names.
sub configured_dlf_streams {
    my ( $self ) = @_;

    my @queue = ( $self->dlf_streams(), @{ $self->_import_job_streams() } );

    my %seen = ();
    while ( defined( my $schema = shift @queue ) ) {
        if ( Lire::DlfSchema->has_schema( $schema ) && ! exists $seen{$schema} ) {
            push @queue, @{ $self->_analyser_streams( $schema ) };
            $seen{$schema} = 1;
        }
    }
    return sort keys %seen;
}

# Returns as an array ref the names of the Lire::Config::DlfAnalyserSpec
# components of the stream $name whose configured plugin isn't 'none'.
sub _analyser_streams {
    my ( $self, $name ) = @_;

    my $cfg = $self->get_stream_config( $name );

    my @streams = ();
    for my $comp ( $cfg->spec()->components() ) {
        next unless $comp->isa( 'Lire::Config::DlfAnalyserSpec' );
        next if $cfg->get( $comp->name() )->get_plugin() eq 'none';

        push @streams, $comp->name();
    }

    return \@streams;
}

# Returns as an array ref the schemas of the 'dlf_converter' plugins
# used by the import jobs. Jobs whose converter isn't a registered
# plugin are skipped.
sub _import_job_streams {
    my ( $self ) = @_;

    my @streams = ();
    for my $job ( @{ $self->import_jobs() } ) {
        my $converter = $job->converter();
        next unless Lire::PluginManager->has_plugin( 'dlf_converter', $converter );

        my $plugin = Lire::PluginManager->get_plugin( 'dlf_converter', $converter );
        push @streams, $plugin->schemas();
    }
    return \@streams;
}

=pod

=head2 open_dlf_stream( $name, [$mode], [$sort_spec] )

Returns a Lire::DlfStream object that can be used to access a DLF
stream in the schema $name. The $mode parameter can be either "r" or
"w" and defaults to "r".

If the mode is "w", the Lire::DlfStream is opened for writing (new Dlf
records can be inserted). If the DlfStore doesn't contain a stream
$name, an empty one will be created.

If the mode is "r", the Lire::DlfStream is opened for reading (no DLF
records may be inserted). An exception will be thrown if there is no
stream $name available.

When the stream is opened in "r" mode, a sort order can be specified
by passing a $sort_spec. A $sort_spec is a white-space delimited list
of field names which should specify the sort order. Reverse sort order
can be specified by prefixing the field's name by '-'.

=cut

# Validates the arguments and returns a new Lire::DlfStream on this
# store. Croaks on an unknown mode, on a read of a stream which isn't
# in the store and on a sort_spec given with a mode other than 'r'.
sub open_dlf_stream {
    my ( $self, $name, $mode, $sort_spec ) = @_;

    check_param( $name, 'name' );

    $mode = "r" unless $mode;

    croak "mode should be either 'r' or 'w' not '$mode'"
      if $mode ne 'r' && $mode ne 'w';

    if ( $mode eq 'r' ) {
        croak "no DLF stream '$name' in this store"
          unless $self->has_dlf_stream( $name );
    } elsif ( $sort_spec ) {
        croak "only 'r' mode can use a sort_spec";
    }

    return Lire::DlfStream->new( $self, $name, $mode, $sort_spec );
}


=pod

=head2 run_analysers( $stream, [ $dlf_source ] )

Runs all the configured analysers on the DLF stream $stream.
$dlf_source limits the records which should be analysed.

=cut

# Runs every analyser configured on $stream (entries of the stream
# configuration other than 'keep_days' whose plugin isn't 'none'), then
# recursively runs the analysers of the stream named by each entry,
# restricted to the records of the analysis job which just ran.
# Croaks when the store has no DLF stream $stream.
sub run_analysers {
    my ( $self, $stream, $dlf_source ) = @_;

    check_param( $stream, 'stream' );

    croak "no DLF stream '$stream' in this store"
      unless $self->has_dlf_stream( $stream );

    my $config = $self->get_stream_config( $stream )->as_value();

    # Iterate over a snapshot of the keys rather than with each(): the
    # loop body recurses and may die, and each() would then leave the
    # hash iterator in the middle of the hash.
    # NOTE(review): this only matters if as_value() can return the same
    # hash on a later call -- confirm.
    foreach my $derived ( keys %$config ) {
        next if $derived eq 'keep_days';

        my $analyser = $config->{$derived};
        next if $analyser->{'plugin'} eq 'none';

        my $process = Lire::DlfAnalyserProcess->new( $self,
                                                     $analyser->{'plugin'},
                                                     $analyser->{'properties'},
                                                     $dlf_source );
        $process->run_analysis_job();
        $self->run_analysers( $derived, $process->job_id() );
    }

    return;
}

=pod

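=head2 clean_streams( [ $time ] )

Removes from each stream the records that are older than the number
of days configured in its 'keep_days' parameter. The age is computed
relative to $time, which defaults to now.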

=cut

# For every stream of the store with a true 'keep_days' setting, opens
# the stream for writing and cleans it with a cutoff of $time (default:
# now) minus that many days.
sub clean_streams {
    my ( $self, $time ) = @_;

    $time = time unless $time;

    for my $name ( $self->dlf_streams() ) {
        my $stream_config = $self->get_stream_config( $name );
        my $keep_days = $stream_config->get( 'keep_days' )->as_value();
        next unless $keep_days;

        my $cutoff = $time - 86400 * $keep_days;
        my $stream = $self->open_dlf_stream( $name, 'w' );
        $stream->clean( $cutoff );
        $stream->close();
    }
    return;
}

# Accessor for the DBI database handle of the store.
sub _dbh {
    my ( $self ) = @_;

    return $self->{'_dbh'};
}

=pod

=head1 REPORTS RELATED METHOD

A DlfStore also contains the periodically generated reports. These
reports are usually configured through the lire(1) command. The
generated reports are stored in Lire XML native format by period.

=head2 put_report( $job, $schedule, $report )

Saves the $report Lire::Report which was generated by $job and
$schedule. Returns the filename where the report was saved.

=cut

# Writes $report in the file computed by _report_filename() from the
# job's name, the schedule's period and the report's timespan start,
# creating the directory when needed. Returns the filename.
# Dies when the file can't be opened or completely written.
sub put_report {
    my ( $self, $job, $schedule, $report ) = @_;

    check_object_param( $job, 'job', 'Lire::ReportJob' );
    check_object_param( $schedule, 'schedule', 'Lire::ReportSchedule' );
    check_object_param( $report, 'report', 'Lire::Report' );

    my $file = $self->_report_filename( $job->name(), $schedule->period(),
                                        $report->timespan_start() );

    my $dir = dirname( $file );
    mkpath( [ $dir ], 0, 0755 )
      unless -d $dir;

    # Three-argument open: the filename is built from the job's name and
    # must never be interpreted as part of the open mode.
    CORE::open( my $fh, '>', $file )
      or die "failed to open '$file' for writing: $!\n";
    $report->write_report( $fh );

    # Buffered write errors (e.g. disk full) only show up at close time.
    # Tolerate a handle that write_report() has already closed.
    if ( defined fileno( $fh ) ) {
        CORE::close( $fh )
          or die "failed to write '$file': $!\n";
    }

    return $file;
}

=pod

=head2 find_report_source( $job, $schedule, [$time] )

This method returns a hash reference containing parameters on
how the ReportJob $job should be generated for the ReportSchedule
$schedule. The boundaries of the period are determined according to
$time, which defaults to now.

The hash reference will contain a 'source' key as well as possible
others:

=over

=item source

Specify from what kind of source the report should be generated.
Possible values are 'dlf' when the report should be generated from
DLF, 'merging' when the report should be generated by merging previous
XML reports and 'none' when no data is available for that period and
the report cannot be generated.

Note that 'merging' is only available as a source for schedules above
'daily', i.e. 'hourly' and 'daily' reports can only be generated from
DLF. Also important is the fact that only 'daily' reports are
available as a source of merging. So you need a daily report schedule
to generate the 'weekly', 'monthly' and 'yearly' schedules by merging.

=item start

The timestamp at which the available data for the period starts. This
can be higher than the requested period's starting boundary.

=item end

The timestamp at which the available data for the period ends. This
can be lower than the requested period's ending boundary.

=item coverage

An integer between 0 and 100 representing the ratio of data available
for the requested period. For example, if only 6 hours of data are
available for a 'daily' report, the 'coverage' will be 25.

=item reports

This key is only available when 'source' is 'merging'. It contains an
array of report files which should be used as the source of merging.

=item days

This key is only available when 'source' is 'merging'. It contains the
days for which a report was available for merging. For example, if
only 6 daily reports were available to generate a 'weekly' report,
this will contain the dates of those 6 days in ISO format (%Y-%m-%d).

=back

=cut

# 'hourly' and 'daily' schedules always use the DLF source. For the
# other periods, the merging source is preferred only when its coverage
# is strictly greater than the coverage of the DLF source.
sub find_report_source {
    my ( $self, $job, $schedule, $time ) = @_;

    $time = time() unless $time;

    check_object_param( $job, 'job', 'Lire::ReportJob' );
    check_object_param( $schedule, 'schedule', 'Lire::ReportSchedule' );

    my $from_dlf = $self->_find_dlf_source( $job, $schedule, $time );
    if ( $schedule->period() =~ /^(hourly|daily)$/ ) {
        return $from_dlf;
    }

    my $from_merging = $self->_find_merging_source( $job, $schedule, $time );
    return $from_merging
      if $from_merging->{'coverage'} > $from_dlf->{'coverage'};

    return $from_dlf;
}

# Computes the part of the period of $schedule containing $time which
# is covered by all the DLF streams used by the schedule's report
# configuration. Schemas without a stream in the store, streams without
# records and streams entirely outside of the period are ignored.
#
# Returns a hash ref with 'source' => 'dlf' and the 'start', 'end' and
# 'coverage' of the available data, or 'source' => 'none' with a
# 'coverage' of 0 when no data is available.
sub _find_dlf_source {
    my ( $self, $job, $schedule, $time ) = @_;

    my $avail_start = 0;
    # No upper bound until a stream is seen: a fixed sentinel such as
    # 2**31 would cap the end of every period past January 2038.
    my $avail_end;
    my $range = $schedule->period_range( $time );
    foreach my $schema ( @{ $schedule->report_config()->schemas() } ) {
        next unless $self->has_dlf_stream( $schema );

        my $stream = $self->open_dlf_stream( $schema, 'r' );
        my $start = $stream->start_time();
        my $end   = $stream->end_time();
        $stream->close();
        next unless defined $start && defined $end;

        next if ( $start > $range->[1]
                  || $end < $range->[0] );

        $avail_start = max( $range->[0], $start, $avail_start );
        $avail_end   = defined $avail_end
          ? min( $range->[1], $end, $avail_end )
          : min( $range->[1], $end );
    }

    # When the streams don't overlap each other, the latest start is
    # after the earliest end: there is no instant for which all of them
    # have data, and the coverage would come out negative.
    return { 'source' => 'none', 'coverage' => 0 }
      unless $avail_start && defined $avail_end && $avail_end >= $avail_start;

    my $period_length = $range->[1] - $range->[0];
    return { 'source' => 'dlf',
             'start' => $avail_start,
             'end' => $avail_end,
             'coverage' => ( $period_length > 0
                             ? int( ( $avail_end - $avail_start ) * 100
                                    / $period_length )
                             : 0 ),
           };
}

# Looks for the 'daily' reports of $job which were saved for each day
# of the period of $schedule containing $time.
#
# Returns a hash ref with 'source' => 'merging', the report files found
# ('reports'), the matching dates ('days'), the 'start' of the first day
# and 'end' of the last day found, and the 'coverage' of the period by
# that start-end interval (0 when no report was found).
sub _find_merging_source {
    my ( $self, $job, $schedule, $time ) = @_;

    my $source = { 'source' => 'merging',
                   'start'  => undef,
                   'end' => undef,
                   'reports' => [],
                   'days' => [],
                   'coverage' => 0 };

    my $range = $schedule->period_range( $time );

    # Walk the period one calendar day at a time. Days aren't always
    # 86400 seconds long: stepping by a fixed number of seconds across
    # a daylight saving time change would visit the same date twice
    # and add its report twice. mktime() normalizes the day of the
    # month, so $mday + $offset may run past the end of the month.
    my ( $sec, $minute, $hour, $mday, $mon, $year ) = localtime $range->[0];
    my $day = $range->[0];
    for ( my $offset = 1; $day < $range->[1]; $offset++ ) {
        my $next_day = POSIX::mktime( $sec, $minute, $hour, $mday + $offset,
                                      $mon, $year );
        # Always make progress, even if mktime() can't convert the date.
        $next_day = $day + 86400
          unless defined $next_day && $next_day > $day;

        my $file = $self->_report_filename( $job->name(), 'daily', $day );
        if ( -f $file ) {
            $source->{'start'} = $day
              unless defined $source->{'start'};
            $source->{'end'} =
              $next_day < $range->[1] ? $next_day : $range->[1];
            push @{$source->{'reports'}}, $file;
            push @{$source->{'days'}}, strftime( '%Y-%m-%d', localtime $day);
        }
        $day = $next_day;
    }
    if ( $source->{'start'} ) {
        $source->{'coverage'} =
          int( ( $source->{'end'} - $source->{'start'}) * 100
               / ( $range->[1] - $range->[0] ) );
    }
    return $source;
}

# Returns the file in which the report named $report of the given
# $period is stored for the period containing $time:
# <store>/<period>_reports/<report>/<date>, where the layout of <date>
# depends on the period.
sub _report_filename {
    my ( $self, $report, $period, $time ) = @_;

    check_param( $report, 'report' );
    check_param( $period, 'period', qr/^(hourly|daily|weekly|monthly|yearly)$/,
                "'period' parameter should be one of 'hourly', 'daily', 'weekly', 'monthly' or 'yearly'" );
    check_param( $time, 'time', qr/^\d+$/ );

    # Date layout of the report file for each period.
    my %format_of = ( 'hourly'  => "%Y%m/%d/%H.xml",
                      'daily'   => "%Y%m/%d.xml",
                      'weekly'  => "%Y/%V.xml",
                      'monthly' => "%Y/%m.xml",
                      'yearly'  => "%Y.xml",
                    );
    my $fmt = $format_of{$period};
    die "shouldn't be reached"
      unless defined $fmt;

    my $calc = Lire::WeekCalculator->new();
    my $dated_name = $calc->strfdate( $fmt, localtime( $time ) );

    return $self->{'_store_path'} . "/${period}_reports/$report/" . $dated_name;
}

# keep perl happy
1;

__END__

=pod

=head1 SEE ALSO

Lire::DlfConverter(3pm)

=head1 AUTHOR

  Francis J. Lacoste <flacoste@logreport.org>

=head1 VERSION

$Id: DlfStore.pm,v 1.39 2006/07/23 13:16:29 vanbaal Exp $

=head1 COPYRIGHT

Copyright (C) 2002,2004 Stichting LogReport Foundation LogReport@LogReport.org

This file is part of Lire.

Lire is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program (see COPYING); if not, check with
http://www.gnu.org/copyleft/gpl.html.

=cut



syntax highlighted by Code2HTML, v. 0.9.1