#!/usr/bin/perl -w
#
# This test starts off with several very full queues; as the
# test continues, eventually the queues are exhausted and the
# qprocs are left idle, waiting for enqueued data.
use constant WRONG_OS => ($^O =~ /^(mswin|dos|os2)/oi);
use constant HAS_REQS => eval {
require POE; require POE::Component::Server::TCP; require POE::Filter::Line;
};
use constant RUN_TEST => (HAS_REQS && !WRONG_OS);
use Test;
BEGIN { plan tests => RUN_TEST ? 21 : 0 };
exit unless (RUN_TEST);
use lib '../lib'; if (-d 't') { chdir 't'; }
use IPC::DirQueue;
use lib '.'; use lib 't'; use Util;
start_indexd();
system ("rm -rf log/qdir");
mkdir ("log");
mkdir ("log/qdir");
my $bq = IPC::DirQueue->new({
dir => 'log/qdir',
indexd_uri => $indexd_uri
});
ok ($bq);
unlink ("log/counter");
$| = 1;
my @pids = ();
for my $i (0 .. 4) {
my $pid = fork();
if ($pid) {
push (@pids, $pid);
ok (1);
}
else {
start_writer();
exit;
}
}
sleep 4;
for my $i (0 .. 4) {
my $pid = fork();
if ($pid) {
push (@pids, $pid);
ok (1);
}
else {
start_worker();
exit;
}
}
sleep 4;
for my $i (0 .. 300) {
sleep 1;
my $count = (-s "log/counter");
if (!defined $count) {
warn "log/counter disappeared: $@ $!";
system ("ls -l log/counter");
die;
}
print "count: $count at $i in qdir: ".count_qdir()."\n";
if ($count && $count == 500) {
last;
}
}
ok (1);
kill (15, @pids);
for my $i (0 .. 4) {
waitpid ($pids[$i], 0) or die "waitpid failed";
ok (1);
}
my $left_in_qdir = count_qdir();
ok ($left_in_qdir == 0) or system("ls -l log/qdir/queue");
stop_indexd();
exit;
use Time::HiRes qw(sleep);
sub start_worker {
my $k = 0;
print "worker $$: forked\n";
while (1) {
my $job = $bq->wait_for_queued_job();
if (!$job) { next; }
$k++;
print "starting $k in $$: data: ".$job->get_data_path()."\n";
if ($k > 60) {
sleep 0.1;
} else {
sleep 0.3;
}
open (COUNT, ">>log/counter"); print COUNT "."; close COUNT;
$job->finish();
# print "finished $k in $$\n";
}
}
sub start_writer {
for my $j (1 .. 100) {
$bq->enqueue_string ("hello world! $$", { foo => "bar $$" });
my $delay = (0.01 + ($j * 0.01));
# print "writer sleeping $delay in $$\n";
sleep $delay;
}
}
sub count_qdir {
opendir (DIR, "log/qdir/queue");
my @count = grep { !/^\.\.?$/ } readdir(DIR);
closedir DIR;
return scalar @count;
}
syntax highlighted by Code2HTML, v. 0.9.1