# $Id: Executer.pm 6839 2000-05-05 21:32:07Z rousskov $ # This program is copyrighted free software; you can redistribute it and/or # modify it under the same terms as Web Polygraph itself. package BB::Executer; use strict; # # executes a list of commands (sequentially or in parallel) # require FileHandle; require IO::Select; use Logger; use Exporter; @BB::Executer::ISA = qw( Exporter ); @BB::Executer::EXPORT = qw ( &RunSeq &RunPar &Step ); $SIG{CHLD} = sub { wait }; $SIG{HUP} = 'INGORE'; $SIG{PIPE} = 'IGNORE'; my $Select = new IO::Select() or die("cannot init select, $!; stopped"); my %Tasks = (); sub RunSeq { my $cmds = shift; foreach my $cmd (@{$cmds}) { my $fh = startCmd($cmd, 1); my $out = join('', $fh->getlines()); undef $fh; $cmd->read($out, length($out)); $cmd->done(0); } return undef; } sub RunPar { my ($cmds, $cbsub, $cbdata) = @_; die() unless $cmds && $cbsub; my $task = { cbsub => $cbsub, cbdata => $cbdata }; die() unless scalar @{$cmds}; # otherwise no chance to call back foreach my $cmd (@{$cmds}) { launchChild($cmd, $task); } return undef; } sub Step { my $tout = shift; my $tot_cnt = scalar $Select->handles(); my @ready = $Select->can_read($tout); my $ready_cnt = scalar @ready; &Log("activity: $ready_cnt/$tot_cnt"); foreach (@ready) { &readChild(@{$_}); } return scalar $Select->handles() ? $ready_cnt : undef; } sub launchChild { my $cmd = shift; my $task = shift; die() if defined $Tasks{$cmd}; $Tasks{$cmd} = $task; my $fh = startCmd($cmd, 0); $task->{pending} ||= {}; $task->{pending}->{$cmd} = $fh; $Select->add([$fh, $cmd]); } sub readChild { my ($fh, $cmd) = @_; # accumulate incoming data select(undef, undef, undef, 0.333); my $buffer; my $nread = sysread($fh, $buffer, 4096); $cmd->read($buffer, $nread); doneChild($cmd) if !$nread; # including !defined } sub doneChild { my ($cmd) = @_; my $task = $Tasks{$cmd} or die(); my $fh = $task->{pending}->{$cmd} or die(); $Select->remove($fh); undef $fh; # close the file $cmd->done(0); delete $task->{pending}->{$cmd}; delete $Tasks{$cmd}; if (! scalar keys %{$task->{pending}}) { # all done die() if grep { $_ == $task } values %Tasks; die() unless defined $task->{cbsub}; &{$task->{cbsub}}($task->{cbdata}); undef $task; } } use IPC::Open3; sub startCmd { my ($cmd, $seq) = @_; die($cmd) unless defined $seq; # warn($cmd->text()); my ($r, $w) = (new FileHandle, new FileHandle); my $pid = open3($w, $r, $r, $cmd->text()); undef $w; $cmd->start($pid, $seq); return $r; # my $fh = new FileHandle($cmd->text() . '|'); # die("cannot start ". $cmd->text() .", $!; stopped") unless $fh; # return $fh; } 1;