# POPFILE LOADABLE MODULE package POPFile::MQ; use POPFile::Module; @ISA = ( "POPFile::Module" ); #---------------------------------------------------------------------------- # # This module handles POPFile's message queue. Every POPFile::Module is # able to register with the MQ for specific message types and can also # send messages without having to know which modules need to receive # its messages. # # Message delivery is asynchronous and guaranteed, as well as guaranteed # first in, first out (FIFO) per process. # # The following public functions are defined: # # register() - register for a specific message type and pass an object # reference. will call that object's deliver() method to # deliver messages # # post() - send a message of a specific type # # The current list of types is # # UIREG Register a UI component, message is the component type # and the element and reference to the # object registering (comes from any component) # # TICKD Occurs when an hour has passed since the last TICKD (this # is generated by the POPFile::Logger module) # # LOGIN Occurs when a proxy logs into a remote server, the message # is the username sent # # COMIT Sent when an item is committed to the history through a call # to POPFile::History::commit_slot # # RELSE Sent when a session key is being released by a client # # Copyright (c) 2001-2006 John Graham-Cumming # # This file is part of POPFile # # POPFile is free software; you can redistribute it and/or modify it # under the terms of version 2 of the GNU General Public License as # published by the Free Software Foundation. # # POPFile is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with POPFile; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # #---------------------------------------------------------------------------- use strict; use warnings; use locale; use POSIX ":sys_wait_h"; #---------------------------------------------------------------------------- # new # # Class new() function #---------------------------------------------------------------------------- sub new { my $type = shift; my $self = POPFile::Module->new(); # These are the individual queues of message, indexed by type # and written to by post(). $self->{queue__} = {}; # These are the registered objects for each type $self->{waiters__} = {}; # List of file handles to read from active children, this # maps the PID for each child to its associated pipe handle $self->{children__} = {}; # Record the parent process ID so that we can tell when post is # called whether we are in a child process or not $self->{pid__} = $$; bless $self, $type; $self->name( 'mq' ); return $self; } #---------------------------------------------------------------------------- # # service # # Called to handle pending tasks for the module. Here we flush all queues # #---------------------------------------------------------------------------- sub service { my ( $self ) = @_; # See if any of the children have passed up messages through their # pipes and deal with it now for my $kid (keys %{$self->{children__}}) { $self->flush_child_data_( $self->{children__}{$kid} ); } # Iterate through all the messages in all the queues for my $type (sort keys %{$self->{queue__}}) { while ( my $ref = shift @{$self->{queue__}{$type}} ) { my @message = @$ref; my $flat = join(':', @message); $self->log_( 2, "Message $type ($flat) ready for delivery" ); for my $waiter (@{$self->{waiters__}{$type}}) { $self->log_( 2, "Delivering message $type ($flat) to " . $waiter->name() ); $waiter->deliver( $type, @message ); } } } return 1; } #---------------------------------------------------------------------------- # # stop # # Called when POPFile is closing down, this is the last method that # will get called before the object is destroyed. There is not return # value from stop(). # #---------------------------------------------------------------------------- sub stop { my ( $self ) = @_; # Call service() so that any remaining items are flushed and delivered $self->service(); for my $kid (keys %{$self->{children__}}) { close $self->{children__}{$kid}; delete $self->{children__}{$kid}; } } #---------------------------------------------------------------------------- # # yield_ # # Called by a child process to allow the parent to do work, this only # does anything in the case where we didn't fork for the child process # #---------------------------------------------------------------------------- sub yield_ { my ( $self, $pipe, $pid ) = @_; if ( $pid != 0 ) { $self->flush_child_data_( $pipe ) } } #---------------------------------------------------------------------------- # # forked # # This is called when some module forks POPFile and is within the # context of the child process so that this module can close any # duplicated file handles that are not needed. # # $writer The writing end of a pipe that can be used to send up from # the child # # There is no return value from this method # #---------------------------------------------------------------------------- sub forked { my ( $self, $writer ) = @_; $self->{writer__} = $writer; for my $kid (keys %{$self->{children__}}) { close $self->{children__}{$kid}; delete $self->{children__}{$kid}; } } #---------------------------------------------------------------------------- # # postfork # # This is called when some module has just forked POPFile. It is # called in the parent process. # # $pid The process ID of the new child process # $reader The reading end of a pipe that can be used to read messages # from the child # # There is no return value from this method # #---------------------------------------------------------------------------- sub postfork { my ( $self, $pid, $reader ) = @_; $self->{children__}{"$pid"} = $reader; } #---------------------------------------------------------------------------- # # reaper # # Called when a child process terminates somewhere in POPFile. The # object should check to see if it was one of its children and do any # necessary processing by calling waitpid() on any child handles it # has # # There is no return value from this method # #---------------------------------------------------------------------------- sub reaper { my ( $self ) = @_; # Look for children that have completed and then flush the data # from their associated pipe and see if any of our children have # data ready to read from their pipes, my @kids = keys %{$self->{children__}}; if ( $#kids >= 0 ) { for my $kid (@kids) { if ( waitpid( $kid, &WNOHANG ) == $kid ) { $self->flush_child_data_( $self->{children__}{$kid} ); close $self->{children__}{$kid}; delete $self->{children__}{$kid}; $self->log_( 0, "Done with $kid (" . scalar(keys %{$self->{children__}}) . " to go)" ); } } } } #---------------------------------------------------------------------------- # # read_pipe_ # # reads a single message from a pipe in a cross-platform way. # returns undef if the pipe has no message # # $handle The handle of the pipe to read # #---------------------------------------------------------------------------- sub read_pipe_ { my ( $self, $handle ) = @_; if ( $^O eq "MSWin32" ) { # bypasses bug in -s $pipe under ActivePerl my $message; # PROFILE PLATFORM START MSWin32 if ( &{ $self->{pipeready_} }($handle) ) { # add data to the pipe cache whenever the pipe is ready sysread($handle, my $string, -s $handle); # push messages onto the end of our cache $self->{pipe_cache__} .= $string; } # pop the oldest message; $message = $1 if (defined($self->{pipe_cache__}) && ( $self->{pipe_cache__} =~ s/(.*?\n)// ) ); return $message; # PROFILE PLATFORM STOP } else { # do things normally if ( &{ $self->{pipeready_} }($handle) ) { return <$handle>; } } return undef; } #---------------------------------------------------------------------------- # # flush_child_data_ # # Called to flush data from the pipe of each child as we go, I did # this because there appears to be a problem on Windows where the pipe # gets a lot of read data in it and then causes the child not to be # terminated even though we are done. Also this is nice because we # deal with the messages on the fly # # $handle The handle of the child's pipe # #---------------------------------------------------------------------------- sub flush_child_data_ { my ( $self, $handle ) = @_; my $stats_changed = 0; my $message; while ( ($message = $self->read_pipe_( $handle )) && defined($message) ) { if ( $message =~ /([^:]+):([^\r\n]*)/ ) { my @parameters = split( ':', $2 || '' ); $self->post( $1, @parameters ); } } } #---------------------------------------------------------------------------- # # register # # When a module wants to receive specific message types it calls this # method with the type of message is wants to receive and the address # of a callback function that will receive the messages # # $type A string identifying the message type # $callback Reference to a function that takes three parameters # #---------------------------------------------------------------------------- sub register { my ( $self, $type, $callback ) = @_; push @{$self->{waiters__}{$type}}, ( $callback ); } #---------------------------------------------------------------------------- # # post # # Called to send a message through the message queue # # $type A string identifying the message type # @message The message (list of parameters) # #---------------------------------------------------------------------------- sub post { my ( $self, $type, @message ) = @_; my $flat = join( ':', @message ); $self->log_( 2, "post $type ($flat)" ); # If we are in the parent process then just stick this on the queue, # otherwise write it up the pipe. if ( $$ == $self->{pid__} ) { if ( exists( $self->{waiters__}{$type} ) ) { $self->log_( 2, "queuing post $type ($flat)" ); push @{$self->{queue__}{$type}}, \@message; $self->log_( 2, "$type queue length now " . $#{$self->{queue__}{$type}} ); } else { $self->log_( 2, "dropping post $type ($flat)" ); } } else { my $pipe = $self->{writer__}; $self->log_( 2, "sending post $type ($flat) to parent $pipe" ); print $pipe "$type:$flat\n"; } } 1;