package Brackup::GPGProcManager;
use strict;
use warnings;
use Brackup::GPGProcess;
use POSIX ":sys_wait_h";
sub new {
my ($class, $iter, $target) = @_;
return bless {
chunkiter => $iter,
procs => {}, # "addr(pchunk)" => GPGProcess
target => $target,
procs_running => {}, # pid -> GPGProcess
uncollected_bytes => 0,
}, $class;
}
sub enc_chunkref_of {
my ($self, $pchunk) = @_;
my $proc = $self->{procs}{$pchunk};
unless ($proc) {
# catch iterator up to the point that was
# requested, or blow up.
my $found = 0;
my $iters = 0;
while (my $ich = $self->{chunkiter}->next) {
if ($ich == $pchunk) {
$found = 1;
last;
}
$iters++;
warn "iters = $iters\n";
}
die "Not found" unless $found;
$proc = $self->gen_process_for($pchunk);
}
while ($proc->running) {
my $pid = $self->wait_for_a_process(1) or die
"No processes were reaped!";
}
$self->_proc_summary_dump;
my $cref = $self->get_proc_chunkref($proc);
$self->_proc_summary_dump;
$self->start_some_processes;
return $cref;
}
sub start_some_processes {
my $self = shift;
# eat up any pending zombies
while ($self->wait_for_a_process(0)) {}
my $pchunk;
# TODO: make this stuff configurable/auto-tuned
while ($self->num_running_procs < 5 &&
$self->num_uncollected_bytes < 128 * 1024 * 1024 &&
($pchunk = $self->next_chunk_to_encrypt)) {
$self->_proc_summary_dump;
$self->gen_process_for($pchunk);
$self->_proc_summary_dump;
}
}
sub _proc_summary_dump {
my $self = shift;
return unless $ENV{GPG_DEBUG};
printf STDERR "num_running=%d, num_outstanding_bytes=%d\n",
$self->num_running_procs, $self->num_uncollected_bytes;
}
sub next_chunk_to_encrypt {
my $self = shift;
while (my $ev = $self->{chunkiter}->next) {
next if $ev->isa("Brackup::File");
my $pchunk = $ev;
next if $self->{target}->stored_chunk_from_inventory($pchunk);
return $pchunk;
}
return undef;
}
sub get_proc_chunkref {
my ($self, $proc) = @_;
my $cref = $proc->chunkref;
delete $self->{procs}{$proc};
$self->{uncollected_bytes} -= length($$cref);
return $cref;
}
# returns PID of a process that finished
sub wait_for_a_process {
my ($self, $block) = @_;
my $flags = $block ? 0 : WNOHANG;
my $kid = waitpid(-1, $flags);
return 0 if ! $block && $kid <= 0;
die "no child?" if $kid < 0;
return 0 unless $kid;
my $proc = $self->{procs_running}{$kid} or die "Unknown child
process $kid finished!\n";
delete $self->{procs_running}{$proc->pid} or die;
$proc->note_stopped;
$self->{uncollected_bytes} += $proc->size_on_disk;
return $kid;
}
sub num_uncollected_bytes { $_[0]{uncollected_bytes} }
sub gen_process_for {
my ($self, $pchunk) = @_;
my $proc = Brackup::GPGProcess->new($pchunk);
$self->{procs_running}{$proc->pid} = $proc;
$self->{procs}{$pchunk} = $proc;
return $proc;
}
sub num_running_procs {
my $self = shift;
return scalar keys %{$self->{procs_running}};
}
1;
syntax highlighted by Code2HTML, v. 0.9.1