package Lire::DlfQuery;

use strict;
use warnings;

use Carp;

use Lire::DataTypes qw/ check_xml_name check_int /;
use Lire::DlfSchema;
use Lire::DlfResult;
use Lire::Utils qw/ sql_quote_name check_param check_object_param /;

=pod

=head1 NAME

Lire::DlfQuery - Interface to specialized SQL wrappers used internally by the Lire API.

=head1 SYNOPSIS

  use Lire::DlfQuery;

=head1 DESCRIPTION

=head2 new( $stream_name )

This is the constructor method for the Lire::DlfQuery class. It takes
the name of a DLF stream as its only parameter, which is mandatory. An
exception is thrown when Lire::DlfSchema has no schema with that name.
The Lire::DlfStore against which the query runs is not given here; it
is passed to execute() or execute_summary().

This method returns an instance of Lire::DlfQuery.

=cut

sub new {
    my ( $class, $stream_name ) = @_;

    check_param( $stream_name, 'stream_name' );
    croak "there is no schema '$stream_name'"
      unless Lire::DlfSchema->has_schema( $stream_name );

    my $self = bless {}, $class;

    $self->{'_stream_name'}    = $stream_name;

    # Field definitions in insertion order, and the same definitions
    # indexed by the name (alias or field) they are known under.
    $self->{'_fields'}         = [];
    $self->{'_field_refs'}     = {};

    # Sorting, limit and filter state; 0 means "no limit".
    $self->{'_order_by'}       = undef;
    $self->{'_sort_spec'}      = undef;
    $self->{'_limit'}          = 0;
    $self->{'_filter_clause'}  = undef;
    $self->{'_filter_params'}  = [];

    # Position in the tree of nested queries.
    $self->{'_parent'}         = undef;
    $self->{'_nested_queries'} = [];

    # Names of the streams joined in addition to the query's own stream.
    $self->{'_joined_streams'} = [];

    return $self;
}

=pod

=head2 stream_name()

Returns the DLF stream's name upon which this DlfQuery is defined.

=cut

sub stream_name {
    return $_[0]{'_stream_name'};
}

# Returns whatever Lire::DlfSchema::load_schema() yields for the name
# of the stream this query is defined on.
sub schema {
    return Lire::DlfSchema::load_schema( $_[0]->{'_stream_name'} );
}

=pod

=head2 join_stream( $stream_name )

This method can be used to add an ExtendedSchema stream which will be
joined in the query.
=cut

sub join_stream {
    my ( $self, $stream_name ) = @_;

    check_param( $stream_name, 'stream_name' );

    unless ( Lire::DlfSchema->has_schema( $stream_name ) ) {
        croak "there is no schema: '$stream_name'";
    }

    # The query's own stream counts as "already joined" too.
    foreach my $present ( @{ $self->joined_streams() } ) {
        croak "'$stream_name' is already joined"
          if $present eq $stream_name;
    }

    if ( defined $self->{'_parent'} ) {
        croak "can only join streams on root query";
    }

    my $candidate = Lire::DlfSchema::load_schema( $stream_name );
    unless ( $self->schema()->can_join_schema( $candidate ) ) {
        croak "cannot join '$stream_name' with '$self->{'_stream_name'}'";
    }

    push @{ $self->{'_joined_streams'} }, $stream_name;

    return;
}

=pod

=head2 joined_streams()

Returns as an array reference the names of all the streams that are
part of the query: the query's own stream first, followed by the
streams added with join_stream().

=cut

sub joined_streams {
    my ( $self ) = @_;

    my @streams = ( $self->{'_stream_name'} );
    push @streams, @{ $self->{'_joined_streams'} };

    return \@streams;
}

=pod

=head2 release()

Nested DlfQuery objects introduce a circular reference between the
children and the parent. Call release() on the top-level DlfQuery to
remove the circular references when the DlfQuery object isn't needed
anymore, so that perl can garbage collect the objects.

=cut

sub release {
    my ( $self ) = @_;

    delete $self->{'_parent'};
    foreach my $child ( @{ $self->nested_queries() } ) {
        $child->release();
    }

    return;
}

# Returns 1 when one of the schemas taking part in the query (own
# stream or joined streams) has a field named $field_name, 0 otherwise.
sub _schema_has_field {
    my ( $self, $field_name ) = @_;

    foreach my $stream ( @{ $self->joined_streams() } ) {
        my $schema = Lire::DlfSchema::load_schema( $stream );
        return 1 if $schema->has_field( $field_name );
    }

    return 0;
}

# Returns the definition registered in this query (and only this one)
# under $name, or undef when there is none.
sub _field_by_name {
    my ( $self, $name ) = @_;

    return $self->{'_field_refs'}{$name};
}

=pod

=head2 create_nested_query()

This method will create and return an instance of Lire::DlfQuery nested
within the current query. The returned query will be modified later
with regular methods.
=cut

sub create_nested_query {
    my $self = $_[0];

    croak "can't create a nested query on query containing simple fields"
      if @{ $self->_fields_of_type( 'simple' ) };

    # Direct method call instead of indirect-object syntax. The
    # constructor only takes a stream name, so nothing else is passed.
    my $nested = Lire::DlfQuery->new( $self->{'_stream_name'} );

    # This back-reference creates the parent/child cycle that release()
    # breaks.
    $nested->{'_parent'} = $self;
    push @{ $self->{'_nested_queries'} }, $nested;

    return $nested;
}

# Follows the '_parent' links up from this query and returns the
# top-most query (the query itself when it has no parent).
sub _root_query {
    my $self = $_[0];

    my $root = $self;
    $root = $root->{'_parent'} while defined $root->{'_parent'};

    return $root;
}

=pod

=head2 nested_queries()

This method returns a reference to an array containing all the queries
directly nested in the current query. The array is a copy: changing it
does not change the query.

=cut

sub nested_queries {
    return [ @{ $_[0]{'_nested_queries'} } ];
}

# Looks $field_name up in every query returned by
# _query_subtree_from_root() and returns the first definition found.
# Returns nothing (undef in scalar context) when no query defines it.
sub _find_field_def {
    my ( $self, $field_name ) = @_;

    foreach my $query ( @{ $self->_query_subtree_from_root() } ) {
        my $field = $query->_field_by_name( $field_name );
        return $field if defined $field;
    }

    return;
}

=pod

=head2 has_field( $field_name )

Returns true if there is a field $field_name defined in the query.

=cut

sub has_field {
    my ( $self, $field_name ) = @_;

    return defined $self->_find_field_def( $field_name );
}

=pod

=head2 field_def( $field_name )

Returns the SQL expression that is associated with the field
$field_name in the query. This method will throw an exception if there
is no field $field_name defined in the query.
=cut

sub field_def {
    my ( $self, $field_name ) = @_;

    my $def = $self->_find_field_def( $field_name );
    if ( !defined $def ) {
        croak "no field '$field_name' defined in the query";
    }

    return $def->{'field'};
}

# Returns an array reference holding the field definitions of this
# query whose type matches $type, in the order they were added. $type
# is 'simple', 'group', 'aggr', or 'group+aggr' (group and aggregate
# fields together); anything else throws an exception.
sub _fields_of_type {
    my ( $self, $type ) = @_;

    my %accepted_for = (
        'simple'     => { 'simple' => 1 },
        'group'      => { 'group'  => 1 },
        'aggr'       => { 'aggr'   => 1 },
        'group+aggr' => { 'group'  => 1, 'aggr' => 1 },
    );

    my $accepted = $accepted_for{$type};
    croak "type should be 'simple', 'group', 'aggr' or 'group+aggr': '$type'"
      unless defined $accepted;

    my @selected = grep { $accepted->{ $_->{'type'} } } @{ $self->{'_fields'} };

    return \@selected;
}

# Returns an array reference holding the ancestors of this query
# (top-most first), the query itself, and then every query nested
# below it in depth-first order.
sub _query_subtree_from_root {
    my ( $self ) = @_;

    # Walk up collecting self, parent, grand-parent... then flip.
    my @ancestry = ( $self );
    while ( defined( my $up = $ancestry[-1]{'_parent'} ) ) {
        push @ancestry, $up;
    }
    my @subtree = reverse @ancestry;

    $self->_nested_queries_subtree( \@subtree );

    return \@subtree;
}

# Appends to the array referenced by $subtree every query nested below
# this one: each child is followed by its own descendants.
sub _nested_queries_subtree {
    my ( $self, $subtree ) = @_;

    my @pending = @{ $self->{'_nested_queries'} };
    while ( @pending ) {
        my $query = shift @pending;
        push @{$subtree}, $query;

        # Children go to the front so that they are visited before the
        # remaining siblings.
        unshift @pending, @{ $query->{'_nested_queries'} };
    }

    return;
}

# Plain function (not a method): returns the name a field definition
# is known under, i.e. its alias when it has one, its field otherwise.
sub _field_name {
    my ( $field ) = @_;

    return $field->{'alias'} if defined $field->{'alias'};
    return $field->{'field'};
}

=pod

=head2 fields()

Returns the names of the fields defined as simple fields in an array
reference. These are fields added through the add_field() method.

=cut

sub fields {
    my ( $self ) = @_;

    my @names = ();
    foreach my $def ( @{ $self->_fields_of_type( 'simple' ) } ) {
        push @names, _field_name( $def );
    }

    return \@names;
}

=pod

=head2 group_fields()

Returns the names of the fields defined as group fields in an array
reference. These are fields added through the add_group_field()
method. The group fields of the parent queries come first.

=cut

sub group_fields {
    my ( $self ) = @_;

    my $collect_groups = sub { @{ $_[0]->_fields_of_type( 'group' ) } };
    my $inherited      = $self->_traverse_from_parent( $collect_groups );
    my @defs           = ( @{$inherited}, @{ $self->_fields_of_type( 'group' ) } );

    return [ map { _field_name( $_ ) } @defs ];
}

=pod

=head2 aggr_fields()

Returns the name of the fields defined as aggregate fields in an array
reference.
These are fields added through the add_aggr_field() method. The
aggregate fields of the queries nested in this one are included.

=cut

sub aggr_fields {
    my $self = $_[0];

    # This query followed by every query nested below it.
    my $queries = [ $self ];
    $self->_nested_queries_subtree( $queries );

    my @fields = map { @{ $_->_fields_of_type( 'aggr' ) } } @{$queries};

    return [ map { _field_name( $_ ) } @fields ];
}

# Validates and registers a field definition in this query.
#
#   $field  the schema field name or, when $alias is given, the SQL
#           expression to select
#   $type   'simple', 'group' or 'aggr'
#   $alias  optional name under which the expression is selected
#
# The name the field is known under (alias if defined, field otherwise)
# must pass check_xml_name() and must not already be defined according
# to has_field(). An un-aliased simple field must also exist in one of
# the schemas taking part in the query. Throws an exception otherwise.
sub _add_field {
    my ( $self, $field, $type, $alias ) = @_;

    check_param( $field, 'field' );
    check_param( $type, 'type', qr/^(simple|group|aggr)$/,
                 "type should be 'simple', 'group', 'aggr'" );

    my $name = defined $alias ? $alias : $field;

    croak "field name should contain only alphanumeric characters: '$name'"
      unless check_xml_name( $name );

    # Only un-aliased simple fields are checked against the schemas;
    # group and aggregate fields are accepted as given.
    croak "no field '$field' in '$self->{'_stream_name'}' schema"
      if !defined $alias
      && $type eq 'simple'
      && !$self->_schema_has_field( $field );

    croak "'$name' is already defined in the query"
      if $self->has_field( $name );

    my $field_ref = {
        'field' => $field,
        'alias' => $alias,
        'type'  => $type,
    };
    push @{ $self->{'_fields'} }, $field_ref;
    $self->{'_field_refs'}{$name} = $field_ref;

    return;
}

=pod

=head2 add_field( $field_name, $expr )

Add a simple field to the current query. The first parameter will be
the field name. The second represents an expression that will then be
aliased with the specified field name. Only the field name is
mandatory.

Certain restrictions apply: simple fields cannot be added to queries
containing either subqueries or group fields.

=cut

sub add_field {
    my ( $self, $field_name, $expr ) = @_;

    croak "calls to add_field() and add_group_field() can't be mixed "
      . "on the same Lire::DlfQuery"
      if @{ $self->_fields_of_type( 'group' ) };
    croak "can't add simple field to a query containing nested queries"
      if @{ $self->nested_queries() };

    if ( defined $expr ) {
        $self->_add_field( $expr, 'simple', $field_name );
    }
    else {
        $self->_add_field( $field_name, 'simple' );
    }

    return;
}

=pod

=head2 add_aggr_field( $field_name, $aggregate )

Add an aggregate field to the current query. The first parameter will
be the field name. The second represents the aggregate expression that
will then be aliased with the specified field name. Both parameters
are mandatory.

Aggregate fields are meant for queries which contain group fields;
this method does not check that itself.

=cut

sub add_aggr_field {
    my ( $self, $field_name, $aggregate ) = @_;

    # Without this check an undefined name would make _add_field()
    # treat the aggregate expression itself as the field name.
    check_param( $field_name, 'field_name' );
    check_param( $aggregate, 'aggregate' );

    $self->_add_field( $aggregate, 'aggr', $field_name );

    return;
}

=pod

=head2 add_group_field( $field_name, $expr )

Add a group field to the current query. The first parameter will be
the field name. The second represents an expression that will then be
aliased with the specified field name. Only the field name is
mandatory.

Certain restrictions apply: group fields cannot be added to queries
containing simple fields.

=cut

sub add_group_field {
    my ( $self, $field_name, $expr ) = @_;

    croak "calls to add_field() and add_group_field() can't be mixed "
      . "on the same Lire::DlfQuery"
      if @{ $self->_fields_of_type( 'simple' ) };

    if ( defined $expr ) {
        $self->_add_field( $expr, 'group', $field_name );
    }
    else {
        $self->_add_field( $field_name, 'group' );
    }

    return;
}

=pod

=head2 set_order_by_clause( $clause )

Sets the order clause of the query, that is, what will appear in the
"ORDER BY" clause of the SQL statement. An undefined or empty clause
removes the ordering. Any sort specification previously set with
set_sort_spec() is forgotten.

=cut

sub set_order_by_clause {
    my ( $self, $clause ) = @_;

    $self->{'_sort_spec'} = undef;
    $self->{'_order_by'} =
      ( defined $clause && $clause ne '' ) ? $clause : undef;

    return;
}

=pod

=head2 order_by_clause()

Returns the current ORDER BY clause or undef when none was set.
=cut

sub order_by_clause {
    return $_[0]{'_order_by'};
}

# Returns true when $field is the name of an aggregate, group or
# simple field as reported by aggr_fields(), group_fields() and
# fields().
sub _is_valid_sort_field {
    my ( $self, $field ) = @_;

    my $matches = grep { $_ eq $field } @{ $self->aggr_fields() },
                                        @{ $self->group_fields() },
                                        @{ $self->fields() };

    return $matches > 0;
}

=pod

=head2 set_sort_spec( $spec )

Sets the ordering of the query from a sort specification: a
whitespace-separated list of field names, each optionally prefixed
with '-' to sort that field in descending order. Every field must be
one of the simple, group or aggregate fields of the query, otherwise
an exception is thrown and the query is left unchanged.

The ORDER BY clause returned by order_by_clause() is rebuilt from the
specification. An undefined, empty or blank specification removes both
the sort specification and the ORDER BY clause.

=cut

sub set_sort_spec {
    my ( $self, $spec ) = @_;

    # split ' ' skips leading whitespace, so a padded specification
    # doesn't yield an empty field name, and a blank one yields no
    # fields at all.
    my @field_specs = defined $spec ? split( ' ', $spec ) : ();

    unless ( @field_specs ) {
        $self->{'_sort_spec'} = undef;
        $self->{'_order_by'}  = undef;
        return;
    }

    my @sfields = ();
    foreach my $field_spec ( @field_specs ) {
        my $field   = $field_spec;
        my $is_desc = 0;
        if ( substr( $field, 0, 1 ) eq '-' ) {
            $field   = substr( $field, 1 );
            $is_desc = 1;
        }

        croak "field '$field' unavailable for sorting"
          unless $self->_is_valid_sort_field( $field );

        push @sfields, $is_desc
          ? sql_quote_name( $field ) . " DESC"
          : sql_quote_name( $field );
    }

    $self->{'_sort_spec'} = $spec;
    $self->{'_order_by'}  = join( ", ", @sfields );

    return;
}

=pod

=head2 sort_spec()

Returns the sort specification that was set using set_sort_spec(). If
no sort specification was set (or it was set using the
set_order_by_clause() method), this will be undef.

=cut

sub sort_spec {
    return $_[0]->{'_sort_spec'};
}

=pod

=head2 set_limit( $limit )

Sets the limit clause of the query, that is, what will appear in the
"LIMIT" clause of the SQL statement. An undefined $limit removes the
limit. An exception is thrown when $limit is defined but isn't an
integer. This method will also have an impact on the internal handling
of the query.

=cut

sub set_limit {
    my ( $self, $limit ) = @_;

    croak "'limit' parameter should be an integer: '$limit'"
      unless ( !defined $limit || check_int( $limit ) );

    $self->{'_limit'} = ( defined $limit ) ? $limit : 0;

    return;
}

=pod

=head2 limit()

Returns the maximum number of records that should be returned by the
query. When no limit was set, it returns 0.

=cut

sub limit {
    return $_[0]{'_limit'};
}

=pod

=head2 set_filter_clause( $clause, @params )

Sets the filter clause of the query, the filter clause being the SQL
restrictions on the results that will be returned by the query, i.e.
what will appear in the "WHERE" clause of the SQL statement.
It is compatible with the DBI handling of placeholders ('?'). Of
course, the number of placeholders has to be the same as the number of
values that are passed in @params; an exception is thrown when they
differ. Only the clause parameter is mandatory, the number of elements
in @params being dependent on the number of placeholders specified
therein. A false clause (undefined or empty) removes the filter.

=cut

sub set_filter_clause {
    my ( $self, $clause, @params ) = @_;

    $clause ||= '';

    # tr/// with identical lists only counts the '?' characters.
    my $nbr_ph  = $clause =~ tr/?/?/;
    my $nparams = @params;
    croak "invalid number of parameters: filter clause contains $nbr_ph placeholders while $nparams values provided"
      if ( $nbr_ph != $nparams );

    $self->{'_filter_clause'} = ( $clause ne '' ) ? $clause : undef;
    $self->{'_filter_params'} = \@params;

    return;
}

=pod

=head2 filter_clause()

Returns the current WHERE clause or undef when none was set.

=cut

sub filter_clause {
    return $_[0]{'_filter_clause'};
}

# Calls $sub with each ancestor of this query as its argument and
# returns an array reference holding everything the calls returned,
# ordered from the top-most ancestor down to the direct parent.
sub _traverse_from_parent {
    my ( $self, $sub ) = @_;

    my @elmnts = ();
    my $cur    = $self;
    while ( defined $cur->{'_parent'} ) {
        unshift @elmnts, $sub->( $cur->{'_parent'} );
        $cur = $cur->{'_parent'};
    }

    return \@elmnts;
}

# Returns the definitions of the fields this query itself contributes
# to the SELECT clause: only its aggregates when $is_summary is true,
# otherwise its simple fields if it has any, else its group and
# aggregate fields. The aggregates of the nested queries always follow.
sub _sql_select_fields {
    my ( $self, $is_summary ) = @_;

    my @fields;
    if ( $is_summary ) {
        push @fields, @{ $self->_fields_of_type( 'aggr' ) };
    }
    elsif ( @{ $self->_fields_of_type( 'simple' ) } ) {
        push @fields, @{ $self->_fields_of_type( 'simple' ) };
    }
    else {
        push @fields, @{ $self->_fields_of_type( 'group+aggr' ) };
    }

    foreach my $child ( @{ $self->{'_nested_queries'} } ) {
        push @fields, @{ $child->_sql_select_fields( 1 ) };
    }

    return \@fields;
}

# Builds the SELECT clause: the group fields of the ancestors come
# first, followed by the fields from _sql_select_fields().
sub _sql_select_clause {
    my ( $self, $is_summary ) = @_;

    my @fields = @{ $self->_traverse_from_parent(
        sub { @{ $_[0]->_fields_of_type( 'group' ) } } ) };
    push @fields, @{ $self->_sql_select_fields( $is_summary ) };

    my @expr = ();
    foreach my $field ( @fields ) {
        # Same "has an alias" test as _field_name().
        if ( defined $field->{'alias'} ) {
            push @expr, $field->{'field'} . ' AS '
              . sql_quote_name( $field->{'alias'} );
        }
        else {
            push @expr, sql_quote_name( $field->{'field'} );
        }
    }

    return 'SELECT ' . join( ",\n ", @expr );
}

# Builds the FROM clause out of the table of the query's schema, the
# table of its base schema (extended schemas) or its links table (see
# _needs_links_table()), and the tables of the joined streams.
sub _sql_from_clause {
    my $self = $_[0];

    my $schema = $self->schema();
    my @tables = ( $schema->sql_table() );
    if ( $schema->isa( 'Lire::ExtendedSchema' ) ) {
        unshift @tables, $schema->base()->sql_table();
    }
    elsif ( $self->_needs_links_table() ) {
        push @tables, $schema->sql_table( '', '_links' );
    }

    foreach my $name ( @{ $self->{'_joined_streams'} } ) {
        push @tables, Lire::DlfSchema::load_schema( $name )->sql_table();
    }

    return "FROM " . join( ", ", @tables );
}

# Returns 1 when the query's schema is a Lire::DerivedSchema and at
# least one of the joined streams is something other than an extended
# schema based on it, 0 otherwise.
sub _needs_links_table {
    my $self = $_[0];

    my $schema = $self->schema();
    return 0 unless $schema->isa( 'Lire::DerivedSchema' );

    # Explicit loop: a 'return' inside a grep block leaves the whole
    # sub, which would make the answer depend on the first stream only.
    foreach my $name ( @{ $self->{'_joined_streams'} } ) {
        my $joined = Lire::DlfSchema::load_schema( $name );
        return 1
          if !$joined->isa( 'Lire::ExtendedSchema' )
          || $joined->base() ne $schema;
    }

    return 0;
}

# Builds the WHERE clause: join conditions, then the filter clauses of
# the ancestors, then this query's own filter clause, all ANDed.
# Returns an empty list when there is nothing to put in the clause.
sub _sql_where_clause {
    my $self = $_[0];

    my @elmnts = @{ $self->_sql_where_join_elements() };

    my $filter_collector = sub {
        defined $_[0]{'_filter_clause'} ? $_[0]{'_filter_clause'} : ();
    };
    push @elmnts, @{ $self->_traverse_from_parent( $filter_collector ) };
    push @elmnts, $self->{'_filter_clause'}
      if ( defined $self->{'_filter_clause'} );

    return @elmnts ? "WHERE " . join( " AND ", @elmnts ) : ();
}

# Returns an array reference holding the conditions that tie together
# the tables listed by _sql_from_clause().
sub _sql_where_join_elements {
    my $self = $_[0];

    my $schema = $self->schema();
    my @elmnts = ();
    if ( $self->_needs_links_table() ) {
        push @elmnts, $schema->sql_table() . ".dlf_id = "
          . $schema->sql_table( '', '_links' ) . ".src_id";
    }

    # joined_streams() includes the query's own stream.
    foreach my $name ( @{ $self->joined_streams() } ) {
        my $joined = Lire::DlfSchema::load_schema( $name );
        if ( $schema->isa( 'Lire::DerivedSchema' ) ) {
            if ( $joined->isa( 'Lire::ExtendedSchema' )
                 && $joined->base() eq $schema )
            {
                push( @elmnts, $schema->sql_table() . '.dlf_id = '
                      . $joined->sql_table() . '.dlf_id' );
            }
            elsif ( ( $joined->isa( 'Lire::ExtendedSchema' )
                      && $joined->base() ne $schema )
                    || $joined eq $schema->base() )
            {
                push( @elmnts, $schema->sql_table( '', '_links' )
                      . '.link_id = ' . $joined->sql_table() . '.dlf_id' );
            }
        }
        elsif ( $joined->isa( 'Lire::ExtendedSchema' ) ) {
            push( @elmnts, $joined->base()->sql_table() . '.dlf_id = '
                  . $joined->sql_table() . '.dlf_id' );
        }
    }

    return \@elmnts;
}

# Builds the GROUP BY clause. A summary query groups on the group
# fields of the ancestors only; otherwise the query's own group fields
# are added. Returns an empty list when there is nothing to group on
# or when a nested query holds simple fields.
sub _sql_group_by_clause {
    my ( $self, $is_summary ) = @_;

    return ()
      if $self->{'_parent'} && @{ $self->_fields_of_type( 'simple' ) };

    my @fields = ();
    if ( $is_summary ) {
        my @defs = @{ $self->_traverse_from_parent(
            sub { @{ $_[0]->_fields_of_type( 'group' ) } } ) };
        @fields = map { sql_quote_name( _field_name( $_ ) ) } @defs;
    }
    else {
        @fields = map { sql_quote_name( $_ ) } @{ $self->group_fields() };
    }

    return @fields ? "GROUP BY " . join( ", ", @fields ) : ();
}

# Builds the ORDER BY clause from the order clauses of the ancestors
# followed, unless $is_summary is true, by this query's own. Returns
# an empty list when no ordering applies.
sub _sql_order_by_clause {
    my ( $self, $is_summary ) = @_;

    my @elmnts = @{ $self->_traverse_from_parent(
        sub { defined( $_[0]->{'_order_by'} ) ? $_[0]->{'_order_by'} : () } ) };
    push @elmnts, $self->{'_order_by'}
      if ( !$is_summary && defined $self->{'_order_by'} );

    return @elmnts ? "ORDER BY " . join( ", ", @elmnts ) : ();
}

# Builds the LIMIT clause, or returns an empty list when the limit is
# 0 (no limit).
sub _sql_limit_clause {
    my $self = $_[0];

    return $self->{'_limit'} ? 'LIMIT ' . $self->{'_limit'} : ();
}

# Returns an array reference holding the values for the placeholders
# of the filter clauses, in the order _sql_where_clause() emits those
# clauses: ancestors first (top-most first), then this query.
sub sql_params {
    my $self = $_[0];

    my @params = @{ $self->_traverse_from_parent(
        sub { @{ $_[0]->{'_filter_params'} } } ) };
    push @params, @{ $self->{'_filter_params'} };

    return \@params;
}

# Assembles the SQL statement of the regular query. Clauses that
# return an empty list simply drop out of the join.
sub _as_sql {
    my $self = $_[0];

    return join( "\n",
                 $self->_sql_select_clause(),
                 $self->_sql_from_clause(),
                 $self->_sql_where_clause(),
                 $self->_sql_group_by_clause(),
                 $self->_sql_order_by_clause(),
                 $self->_sql_limit_clause(),
                 '' );
}

# Assembles the SQL statement of the summary query; it never has a
# LIMIT clause.
sub _as_summary_sql {
    my $self = $_[0];

    return join( "\n",
                 $self->_sql_select_clause( 1 ),
                 $self->_sql_from_clause( 1 ),
                 $self->_sql_where_clause( 1 ),
                 $self->_sql_group_by_clause( 1 ),
                 $self->_sql_order_by_clause( 1 ),
                 '' );
}

# Throws an exception unless $store is a Lire::DlfStore which has every
# stream taking part in the query.
sub _check_store {
    my ( $self, $store ) = @_;

    check_object_param( $store, 'store', 'Lire::DlfStore' );

    foreach my $stream ( @{ $self->joined_streams() } ) {
        croak "store doesn't contain a '$stream' stream"
          unless $store->has_dlf_stream( $stream );
    }

    return;
}

=pod

=head2 execute( $store )

This method executes the query through the database interface and
returns an instance of Lire::DlfResult to obtain the data. $store must
be a Lire::DlfStore containing the stream of the query as well as
every joined stream, otherwise an exception is thrown.

=cut

sub execute {
    my ( $self, $store ) = @_;

    $self->_check_store( $store );

    return Lire::DlfResult->new( $self, $self->_as_sql(), $store );
}

=pod

=head2 execute_summary( $store )

Same as the execute() except that the query is executed as a summary
query.

=cut

sub execute_summary {
    my ( $self, $store ) = @_;

    $self->_check_store( $store );

    return Lire::DlfResult->new( $self, $self->_as_summary_sql(), $store );
}

# keep perl happy
1;

__END__

=pod

=head1 SEE ALSO

Lire::DlfQuery(3pm) Lire::DlfStream(3pm) Lire::DlfStore(3pm)

=head1 AUTHORS

  Francis J. Lacoste
  Wolfgang Sourdeau

=head1 VERSION

$Id: DlfQuery.pm,v 1.44 2006/07/23 13:16:28 vanbaal Exp $

=head1 COPYRIGHT

Copyright (C) 2003 Stichting LogReport Foundation LogReport@LogReport.org

This file is part of Lire.
Lire is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program (see COPYING); if not, check with http://www.gnu.org/copyleft/gpl.html. =cut