# AIO abstraction layer # # Copyright 2004, Danga Interactive, Inc. # Copyright 2005-2007, Six Apart, Ltd. package Perlbal::AIO; use strict; use POSIX qw(ENOENT EACCES EBADF); use Fcntl qw(SEEK_CUR SEEK_SET SEEK_END O_RDWR O_CREAT O_TRUNC); # Try and use IO::AIO, if it's around. BEGIN { $Perlbal::OPTMOD_IO_AIO = eval "use IO::AIO 1.6 (); 1;"; } END { IO::AIO::max_parallel(0) if $Perlbal::OPTMOD_IO_AIO; } $Perlbal::AIO_MODE = "none"; $Perlbal::AIO_MODE = "ioaio" if $Perlbal::OPTMOD_IO_AIO; ############################################################################ # AIO functions available to callers ############################################################################ sub aio_readahead { my ($fh, $offset, $length, $user_cb) = @_; aio_channel_push(get_chan(), $user_cb, sub { my $cb = shift; if ($Perlbal::AIO_MODE eq "ioaio") { IO::AIO::aio_readahead($fh, $offset, $length, $cb); } else { $cb->(); } }); } sub aio_stat { my ($file, $user_cb) = @_; aio_channel_push(get_chan($file), $user_cb, sub { my $cb = shift; if ($Perlbal::AIO_MODE eq "ioaio") { IO::AIO::aio_stat($file, $cb); } else { stat($file); $cb->(); } }); } sub aio_open { my ($file, $flags, $mode, $user_cb) = @_; aio_channel_push(get_chan($file), $user_cb, sub { my $cb = shift; if ($Perlbal::AIO_MODE eq "ioaio") { IO::AIO::aio_open($file, $flags, $mode, $cb); } else { my $fh; my $rv = sysopen($fh, $file, $flags, $mode); $cb->($rv ? $fh : undef); } }); } sub aio_unlink { my ($file, $user_cb) = @_; aio_channel_push(get_chan($file), $user_cb, sub { my $cb = shift; if ($Perlbal::AIO_MODE eq "ioaio") { IO::AIO::aio_unlink($file, $cb); } else { my $rv = unlink($file); $rv = $rv ? 0 : -1; $cb->($rv); } }); } sub aio_write { # 0 1 2 3(data) 4 my ($fh, $offset, $length, undef, $user_cb) = @_; return no_fh($user_cb) unless $fh; my $alist = \@_; aio_channel_push(get_chan(), $user_cb, sub { my $cb = shift; if ($Perlbal::AIO_MODE eq "ioaio") { IO::AIO::aio_write($fh, $offset, $length, $alist->[3], 0, $cb); } else { my $old_off = sysseek($fh, 0, SEEK_CUR); sysseek($fh, $offset, 0); my $rv = syswrite($fh, $alist->[3], $length, 0); sysseek($fh, $old_off, SEEK_SET); $cb->($rv); } }); } sub aio_read { # 0 1 2 3(data) 4 my ($fh, $offset, $length, undef, $user_cb) = @_; return no_fh($user_cb) unless $fh; my $alist = \@_; aio_channel_push(get_chan(), $user_cb, sub { my $cb = shift; if ($Perlbal::AIO_MODE eq "ioaio") { IO::AIO::aio_read($fh, $offset, $length, $alist->[3], 0, $cb); } else { my $old_off = sysseek($fh, 0, SEEK_CUR); sysseek($fh, $offset, 0); my $rv = sysread($fh, $alist->[3], $length, 0); sysseek($fh, $old_off, SEEK_SET); $cb->($rv); } }); } ############################################################################ # AIO channel stuff # prevents all AIO threads from being consumed by requests for same # failing/overloaded disk by isolating them into separate 'channels' in # parent process and not dispatching more than the max in-flight count # allows. think of a channel as a named queue. or in reality, a disk. ############################################################################ my %chan_outstanding; # $channel_name -> $num_in_flight my %chan_pending; # $channel_name -> [ [$subref, $cb], .... ] my %chan_hitmaxdepth; # $channel_name -> $times_enqueued (not dispatched immediately) my %chan_submitct; # $channel_name -> $times_submitted (total AIO requests for this channel) my $use_aio_chans = 0; # keep them off for now, until mogstored code is ready to use them my $file_to_chan_hook; # coderef that returns $chan_name given a $filename my %chan_concurrency; # $channel_name -> concurrency per channel # (cache. definitive version via function call) sub get_aio_stats { my $ret = {}; foreach my $c (keys %chan_outstanding) { $ret->{$c} = { cur_running => $chan_outstanding{$c}, ctr_queued => $chan_hitmaxdepth{$c} || 0, ctr_total => $chan_submitct{$c}, }; } foreach my $c (keys %chan_pending) { my $rec = $ret->{$c} ||= {}; $rec->{cur_queued} = scalar @{$chan_pending{$c}}; } return $ret; } # (external API). set trans hook, but also enables AIO channels. sub set_file_to_chan_hook { $file_to_chan_hook = shift; # coderef that returns $chan_name given a $filename $use_aio_chans = 1; } # internal API: sub aio_channel_push { my ($chan, $user_cb, $action) = @_; # if we were to do it immediately, bypassing AIO channels (future option?) unless ($use_aio_chans) { $action->($user_cb); return; } # IO::AIO/etc only take one callback. so we wrap the user # (caller) function with our own that first calls theirs, then # does our bookkeeping and queue management afterwards. my $wrapped_cb = sub { $user_cb->(@_); $chan_outstanding{$chan}--; aio_channel_cond_run($chan); }; # in case this is the first time this queue has been used, init stuff: my $chanpend = ($chan_pending{$chan} ||= []); $chan_outstanding{$chan} ||= 0; $chan_submitct{$chan}++; my $max_out = $chan_concurrency{$chan} ||= aio_chan_max_concurrent($chan); if ($chan_outstanding{$chan} < $max_out) { $chan_outstanding{$chan}++; $action->($wrapped_cb); return; } else { # too deep. enqueue. $chan_hitmaxdepth{$chan}++; push @$chanpend, [$action, $wrapped_cb]; } } sub aio_chan_max_concurrent { my ($chan) = @_; return 100 if $chan eq '[default]'; return 10; } sub aio_channel_cond_run { my ($chan) = @_; my $chanpend = $chan_pending{$chan} or return; my $max_out = $chan_concurrency{$chan} ||= aio_chan_max_concurrent($chan); my $job; while ($chan_outstanding{$chan} < $max_out && ($job = shift @$chanpend)) { $chan_outstanding{$chan}++; $job->[0]->($job->[1]); } } my $next_chan; sub set_channel { $next_chan = shift; } sub set_file_for_channel { my ($file) = @_; if ($file_to_chan_hook) { $next_chan = $file_to_chan_hook->($file); } else { $next_chan = undef; } } # gets currently-set channel, then clears it. or if none set, # lets registered hook set the channel name from the optional # $file parameter. the default channel, '[default]' has no limits sub get_chan { return undef unless $use_aio_chans; my ($file) = @_; set_file_for_channel($file) if $file; if (my $chan = $next_chan) { $next_chan = undef; return $chan; } return "[default]"; } ############################################################################ # misc util functions ############################################################################ sub _fh_of_fd_mode { my ($fd, $mode) = @_; return undef unless defined $fd && $fd >= 0; #TODO: use the write MODE for the given $mode; my $fh = IO::Handle->new_from_fd($fd, 'r+'); my $num = fileno($fh); return $fh; } sub no_fh { my $cb = shift; my $i = 1; my $stack_trace = ""; while (my ($pkg, $filename, $line, $subroutine, $hasargs, $wantarray, $evaltext, $is_require, $hints, $bitmask) = caller($i++)) { $stack_trace .= " at $filename:$line $subroutine\n"; } Perlbal::log("crit", "Undef \$fh: $stack_trace"); $cb->(undef); return undef; } 1;