# POPFILE LOADABLE MODULE
package POPFile::MQ;
use POPFile::Module;
@ISA = ( "POPFile::Module" );
#----------------------------------------------------------------------------
#
# This module handles POPFile's message queue. Every POPFile::Module is
# able to register with the MQ for specific message types and can also
# send messages without having to know which modules need to receive
# its messages.
#
# Message delivery is asynchronous and guaranteed, as well as guaranteed
# first in, first out (FIFO) per process.
#
# The following public functions are defined:
#
# register() - register for a specific message type and pass an object
# reference. will call that object's deliver() method to
# deliver messages
#
# post() - send a message of a specific type
#
# The current list of types is
#
# UIREG Register a UI component, message is the component type
# and the element and reference to the
# object registering (comes from any component)
#
# TICKD Occurs when an hour has passed since the last TICKD (this
# is generated by the POPFile::Logger module)
#
# LOGIN Occurs when a proxy logs into a remote server, the message
# is the username sent
#
# COMIT Sent when an item is committed to the history through a call
# to POPFile::History::commit_slot
#
# RELSE Sent when a session key is being released by a client
#
# Copyright (c) 2001-2006 John Graham-Cumming
#
# This file is part of POPFile
#
# POPFile is free software; you can redistribute it and/or modify it
# under the terms of version 2 of the GNU General Public License as
# published by the Free Software Foundation.
#
# POPFile is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with POPFile; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
#----------------------------------------------------------------------------
use strict;
use warnings;
use locale;
use POSIX ":sys_wait_h";
#----------------------------------------------------------------------------
# new
#
# Class new() function
#----------------------------------------------------------------------------
sub new
{
my $type = shift;
my $self = POPFile::Module->new();
# These are the individual queues of message, indexed by type
# and written to by post().
$self->{queue__} = {};
# These are the registered objects for each type
$self->{waiters__} = {};
# List of file handles to read from active children, this
# maps the PID for each child to its associated pipe handle
$self->{children__} = {};
# Record the parent process ID so that we can tell when post is
# called whether we are in a child process or not
$self->{pid__} = $$;
bless $self, $type;
$self->name( 'mq' );
return $self;
}
#----------------------------------------------------------------------------
#
# service
#
# Called to handle pending tasks for the module. Here we flush all queues
#
#----------------------------------------------------------------------------
sub service
{
my ( $self ) = @_;
# See if any of the children have passed up messages through their
# pipes and deal with it now
for my $kid (keys %{$self->{children__}}) {
$self->flush_child_data_( $self->{children__}{$kid} );
}
# Iterate through all the messages in all the queues
for my $type (sort keys %{$self->{queue__}}) {
while ( my $ref = shift @{$self->{queue__}{$type}} ) {
my @message = @$ref;
my $flat = join(':', @message);
$self->log_( 2, "Message $type ($flat) ready for delivery" );
for my $waiter (@{$self->{waiters__}{$type}}) {
$self->log_( 2, "Delivering message $type ($flat) to " .
$waiter->name() );
$waiter->deliver( $type, @message );
}
}
}
return 1;
}
#----------------------------------------------------------------------------
#
# stop
#
# Called when POPFile is closing down, this is the last method that
# will get called before the object is destroyed. There is not return
# value from stop().
#
#----------------------------------------------------------------------------
sub stop
{
my ( $self ) = @_;
# Call service() so that any remaining items are flushed and delivered
$self->service();
for my $kid (keys %{$self->{children__}}) {
close $self->{children__}{$kid};
delete $self->{children__}{$kid};
}
}
#----------------------------------------------------------------------------
#
# yield_
#
# Called by a child process to allow the parent to do work, this only
# does anything in the case where we didn't fork for the child process
#
#----------------------------------------------------------------------------
sub yield_
{
my ( $self, $pipe, $pid ) = @_;
if ( $pid != 0 ) {
$self->flush_child_data_( $pipe )
}
}
#----------------------------------------------------------------------------
#
# forked
#
# This is called when some module forks POPFile and is within the
# context of the child process so that this module can close any
# duplicated file handles that are not needed.
#
# $writer The writing end of a pipe that can be used to send up from
# the child
#
# There is no return value from this method
#
#----------------------------------------------------------------------------
sub forked
{
my ( $self, $writer ) = @_;
$self->{writer__} = $writer;
for my $kid (keys %{$self->{children__}}) {
close $self->{children__}{$kid};
delete $self->{children__}{$kid};
}
}
#----------------------------------------------------------------------------
#
# postfork
#
# This is called when some module has just forked POPFile. It is
# called in the parent process.
#
# $pid The process ID of the new child process
# $reader The reading end of a pipe that can be used to read messages
# from the child
#
# There is no return value from this method
#
#----------------------------------------------------------------------------
sub postfork
{
my ( $self, $pid, $reader ) = @_;
$self->{children__}{"$pid"} = $reader;
}
#----------------------------------------------------------------------------
#
# reaper
#
# Called when a child process terminates somewhere in POPFile. The
# object should check to see if it was one of its children and do any
# necessary processing by calling waitpid() on any child handles it
# has
#
# There is no return value from this method
#
#----------------------------------------------------------------------------
sub reaper
{
my ( $self ) = @_;
# Look for children that have completed and then flush the data
# from their associated pipe and see if any of our children have
# data ready to read from their pipes,
my @kids = keys %{$self->{children__}};
if ( $#kids >= 0 ) {
for my $kid (@kids) {
if ( waitpid( $kid, &WNOHANG ) == $kid ) {
$self->flush_child_data_( $self->{children__}{$kid} );
close $self->{children__}{$kid};
delete $self->{children__}{$kid};
$self->log_( 0, "Done with $kid (" . scalar(keys %{$self->{children__}}) . " to go)" );
}
}
}
}
#----------------------------------------------------------------------------
#
# read_pipe_
#
# reads a single message from a pipe in a cross-platform way.
# returns undef if the pipe has no message
#
# $handle The handle of the pipe to read
#
#----------------------------------------------------------------------------
sub read_pipe_
{
my ( $self, $handle ) = @_;
if ( $^O eq "MSWin32" ) {
# bypasses bug in -s $pipe under ActivePerl
my $message; # PROFILE PLATFORM START MSWin32
if ( &{ $self->{pipeready_} }($handle) ) {
# add data to the pipe cache whenever the pipe is ready
sysread($handle, my $string, -s $handle);
# push messages onto the end of our cache
$self->{pipe_cache__} .= $string;
}
# pop the oldest message;
$message = $1 if (defined($self->{pipe_cache__}) &&
( $self->{pipe_cache__} =~ s/(.*?\n)// ) );
return $message; # PROFILE PLATFORM STOP
} else {
# do things normally
if ( &{ $self->{pipeready_} }($handle) ) {
return <$handle>;
}
}
return undef;
}
#----------------------------------------------------------------------------
#
# flush_child_data_
#
# Called to flush data from the pipe of each child as we go, I did
# this because there appears to be a problem on Windows where the pipe
# gets a lot of read data in it and then causes the child not to be
# terminated even though we are done. Also this is nice because we
# deal with the messages on the fly
#
# $handle The handle of the child's pipe
#
#----------------------------------------------------------------------------
sub flush_child_data_
{
my ( $self, $handle ) = @_;
my $stats_changed = 0;
my $message;
while ( ($message = $self->read_pipe_( $handle )) && defined($message) )
{
if ( $message =~ /([^:]+):([^\r\n]*)/ ) {
my @parameters = split( ':', $2 || '' );
$self->post( $1, @parameters );
}
}
}
#----------------------------------------------------------------------------
#
# register
#
# When a module wants to receive specific message types it calls this
# method with the type of message is wants to receive and the address
# of a callback function that will receive the messages
#
# $type A string identifying the message type
# $callback Reference to a function that takes three parameters
#
#----------------------------------------------------------------------------
sub register
{
my ( $self, $type, $callback ) = @_;
push @{$self->{waiters__}{$type}}, ( $callback );
}
#----------------------------------------------------------------------------
#
# post
#
# Called to send a message through the message queue
#
# $type A string identifying the message type
# @message The message (list of parameters)
#
#----------------------------------------------------------------------------
sub post
{
my ( $self, $type, @message ) = @_;
my $flat = join( ':', @message );
$self->log_( 2, "post $type ($flat)" );
# If we are in the parent process then just stick this on the queue,
# otherwise write it up the pipe.
if ( $$ == $self->{pid__} ) {
if ( exists( $self->{waiters__}{$type} ) ) {
$self->log_( 2, "queuing post $type ($flat)" );
push @{$self->{queue__}{$type}}, \@message;
$self->log_( 2, "$type queue length now " . $#{$self->{queue__}{$type}} );
} else {
$self->log_( 2, "dropping post $type ($flat)" );
}
} else {
my $pipe = $self->{writer__};
$self->log_( 2, "sending post $type ($flat) to parent $pipe" );
print $pipe "$type:$flat\n";
}
}
1;
syntax highlighted by Code2HTML, v. 0.9.1