# POPFILE LOADABLE MODULE
package POPFile::MQ;

use POPFile::Module;
@ISA = ( "POPFile::Module" );

#----------------------------------------------------------------------------
#
# This module handles POPFile's message queue.  Every POPFile::Module is
# able to register with the MQ for specific message types and can also
# send messages without having to know which modules need to receive
# its messages.
#
# Message delivery is asynchronous and guaranteed, as well as guaranteed 
# first in, first out (FIFO) per process.
#
# The following public functions are defined:
#
# register() - register for a specific message type and pass an object
#              reference.  will call that object's deliver() method to
#              deliver messages
#
# post()     - send a message of a specific type
#
# The current list of types is
#
#     UIREG    Register a UI component, message is the component type
#              and the element and reference to the
#              object registering (comes from any component)
#
#     TICKD    Occurs when an hour has passed since the last TICKD (this
#              is generated by the POPFile::Logger module)
#
#     LOGIN    Occurs when a proxy logs into a remote server, the message
#              is the username sent
#
#     COMIT    Sent when an item is committed to the history through a call
#              to POPFile::History::commit_slot
#
#    RELSE    Sent when a session key is being released by a client
#
# Copyright (c) 2001-2006 John Graham-Cumming
#
#   This file is part of POPFile
#
#   POPFile is free software; you can redistribute it and/or modify it
#   under the terms of version 2 of the GNU General Public License as
#   published by the Free Software Foundation.
#
#   POPFile is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#   GNU General Public License for more details.
#
#   You should have received a copy of the GNU General Public License
#   along with POPFile; if not, write to the Free Software
#   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
#----------------------------------------------------------------------------

use strict;
use warnings;
use locale;

use POSIX ":sys_wait_h";

#----------------------------------------------------------------------------
# new
#
#   Class new() function
#----------------------------------------------------------------------------
sub new
{
    my $type = shift;
    my $self = POPFile::Module->new();

    # These are the individual queues of message, indexed by type
    # and written to by post().

    $self->{queue__} = {};

    # These are the registered objects for each type

    $self->{waiters__} = {};

    # List of file handles to read from active children, this
    # maps the PID for each child to its associated pipe handle

    $self->{children__}        = {};

    # Record the parent process ID so that we can tell when post is
    # called whether we are in a child process or not

    $self->{pid__}             = $$;

    bless $self, $type;

    $self->name( 'mq' );

    return $self;
}

#----------------------------------------------------------------------------
#
# service
#
# Called to handle pending tasks for the module.  Here we flush all queues
#
#----------------------------------------------------------------------------
sub service
{
    my ( $self ) = @_;

    # See if any of the children have passed up messages through their
    # pipes and deal with it now

    for my $kid (keys %{$self->{children__}}) {
        $self->flush_child_data_( $self->{children__}{$kid} );
    }

    # Iterate through all the messages in all the queues

    for my $type (sort keys %{$self->{queue__}}) {
         while ( my $ref = shift @{$self->{queue__}{$type}} ) {
             my @message = @$ref;
             my $flat = join(':', @message);

             $self->log_( 2, "Message $type ($flat) ready for delivery" );

             for my $waiter (@{$self->{waiters__}{$type}}) {
                $self->log_( 2, "Delivering message $type ($flat) to " .
                    $waiter->name() );

                $waiter->deliver( $type, @message );
            }
        }
    }

    return 1;
}

#----------------------------------------------------------------------------
#
# stop
#
# Called when POPFile is closing down, this is the last method that
# will get called before the object is destroyed.  There is not return
# value from stop().
#
#----------------------------------------------------------------------------
sub stop
{
    my ( $self ) = @_;

    # Call service() so that any remaining items are flushed and delivered

    $self->service();

    for my $kid (keys %{$self->{children__}}) {
        close $self->{children__}{$kid};
        delete $self->{children__}{$kid};
    }
}

#----------------------------------------------------------------------------
#
# yield_
#
# Called by a child process to allow the parent to do work, this only
# does anything in the case where we didn't fork for the child process
#
#----------------------------------------------------------------------------
sub yield_
{
    my ( $self, $pipe, $pid ) = @_;

    if ( $pid != 0 ) {
        $self->flush_child_data_( $pipe )
    }
}

#----------------------------------------------------------------------------
#
# forked
#
# This is called when some module forks POPFile and is within the
# context of the child process so that this module can close any
# duplicated file handles that are not needed.
#
# $writer            The writing end of a pipe that can be used to send up from
#                    the child
#
# There is no return value from this method
#
#----------------------------------------------------------------------------
sub forked
{
    my ( $self, $writer ) = @_;

    $self->{writer__} = $writer;

    for my $kid (keys %{$self->{children__}}) {
        close $self->{children__}{$kid};
        delete $self->{children__}{$kid};
    }
}

#----------------------------------------------------------------------------
#
# postfork
#
# This is called when some module has just forked POPFile.  It is
# called in the parent process.
#
# $pid              The process ID of the new child process
# $reader      The reading end of a pipe that can be used to read messages
# from the child
#
# There is no return value from this method
#
#----------------------------------------------------------------------------
sub postfork
{
    my ( $self, $pid, $reader ) = @_;

    $self->{children__}{"$pid"} = $reader;
}

#----------------------------------------------------------------------------
#
# reaper
#
# Called when a child process terminates somewhere in POPFile.  The
# object should check to see if it was one of its children and do any
# necessary processing by calling waitpid() on any child handles it
# has
#
# There is no return value from this method
#
#----------------------------------------------------------------------------
sub reaper
{
    my ( $self ) = @_;

    # Look for children that have completed and then flush the data
    # from their associated pipe and see if any of our children have
    # data ready to read from their pipes,

    my @kids = keys %{$self->{children__}};

    if ( $#kids >= 0 ) {
        for my $kid (@kids) {
            if ( waitpid( $kid, &WNOHANG ) == $kid ) {
                $self->flush_child_data_( $self->{children__}{$kid} );
                close $self->{children__}{$kid};
                delete $self->{children__}{$kid};

                $self->log_( 0, "Done with $kid (" . scalar(keys %{$self->{children__}}) . " to go)" );
            }
        }
    }
}

#----------------------------------------------------------------------------
#
# read_pipe_
#
# reads a single message from a pipe in a cross-platform way.
# returns undef if the pipe has no message
#
# $handle   The handle of the pipe to read
#
#----------------------------------------------------------------------------
sub read_pipe_
{
    my ( $self, $handle ) = @_;

    if ( $^O eq "MSWin32" ) {

        # bypasses bug in -s $pipe under ActivePerl

        my $message;         # PROFILE PLATFORM START MSWin32

        if ( &{ $self->{pipeready_} }($handle) ) {

            # add data to the pipe cache whenever the pipe is ready

            sysread($handle, my $string, -s $handle);

            # push messages onto the end of our cache

            $self->{pipe_cache__} .= $string;
        }

        # pop the oldest message;

        $message = $1 if (defined($self->{pipe_cache__}) &&
                          ( $self->{pipe_cache__} =~ s/(.*?\n)// ) );

        return $message;        # PROFILE PLATFORM STOP
    } else {

        # do things normally

        if ( &{ $self->{pipeready_} }($handle) ) {
            return <$handle>;
        }
    }

    return undef;
}

#----------------------------------------------------------------------------
#
# flush_child_data_
#
# Called to flush data from the pipe of each child as we go, I did
# this because there appears to be a problem on Windows where the pipe
# gets a lot of read data in it and then causes the child not to be
# terminated even though we are done.  Also this is nice because we
# deal with the messages on the fly
#
# $handle   The handle of the child's pipe
#
#----------------------------------------------------------------------------

sub flush_child_data_
{
    my ( $self, $handle ) = @_;

    my $stats_changed = 0;
    my $message;

    while ( ($message = $self->read_pipe_( $handle )) && defined($message) )
    {
        if ( $message =~ /([^:]+):([^\r\n]*)/ ) {
            my @parameters = split( ':', $2 || '' );
            $self->post( $1, @parameters );
        }
    }
}

#----------------------------------------------------------------------------
#
# register
#
#   When a module wants to receive specific message types it calls this
#   method with the type of message is wants to receive and the address
#   of a callback function that will receive the messages
#
#   $type        A string identifying the message type
#   $callback    Reference to a function that takes three parameters
#
#----------------------------------------------------------------------------
sub register
{
    my ( $self, $type, $callback ) = @_;

    push @{$self->{waiters__}{$type}}, ( $callback );
}

#----------------------------------------------------------------------------
#
# post
#
#   Called to send a message through the message queue
#
#   $type        A string identifying the message type
#   @message     The message (list of parameters)
#
#----------------------------------------------------------------------------
sub post
{
    my ( $self, $type, @message ) = @_;

    my $flat = join( ':', @message );
    $self->log_( 2, "post $type ($flat)" );

    # If we are in the parent process then just stick this on the queue,
    # otherwise write it up the pipe.

    if ( $$ == $self->{pid__} ) {
        if ( exists( $self->{waiters__}{$type} ) ) {
            $self->log_( 2, "queuing post $type ($flat)" );
            push @{$self->{queue__}{$type}}, \@message;
            $self->log_( 2, "$type queue length now " . $#{$self->{queue__}{$type}} );
        } else {
            $self->log_( 2, "dropping post $type ($flat)" );
        }
    } else {
        my $pipe = $self->{writer__};
        $self->log_( 2, "sending post $type ($flat) to parent $pipe" );
        print $pipe "$type:$flat\n";
    }
}

1;


syntax highlighted by Code2HTML, v. 0.9.1