# Base class for all socket types # # Copyright 2004, Danga Interactive, Inc. # Copyright 2005-2007, Six Apart, Ltd. package Perlbal::Socket; use strict; use warnings; no warnings qw(deprecated); use Perlbal::HTTPHeaders; use Sys::Syscall; use POSIX (); use Danga::Socket 1.44; use base 'Danga::Socket'; use fields ( 'headers_string', # headers as they're being read 'req_headers', # the final Perlbal::HTTPHeaders object inbound 'res_headers', # response headers outbound (Perlbal::HTTPHeaders object) 'create_time', # creation time 'alive_time', # last time noted alive 'state', # general purpose state; used by descendants. 'do_die', # if on, die and do no further requests 'read_buf', # arrayref of scalarref read from client 'read_ahead', # bytes sitting in read_buf 'read_size', # total bytes read from client, ever 'ditch_leading_rn', # if true, the next header parsing will ignore a leading \r\n ); use constant MAX_HTTP_HEADER_LENGTH => 102400; # 100k, arbitrary use constant TRACK_OBJECTS => 0; # see @created_objects below if (TRACK_OBJECTS) { use Scalar::Util qw(weaken isweak); } # kick-off one cleanup _do_cleanup(); our %state_changes = (); # { "objref" => [ state, state, state, ... ] } our $last_callbacks = 0; # time last ran callbacks our $callbacks = []; # [ [ time, subref ], [ time, subref ], ... ] # this one deserves its own section. we keep track of every Perlbal::Socket object # created if the TRACK_OBJECTS constant is on. we use weakened references, though, # so this list will hopefully contain mostly undefs. users can ask for this list if # they want to work with it via the get_created_objects_ref function. our @created_objects; # ( $ref, $ref, $ref ... ) our $last_co_cleanup = 0; # clean the list every few seconds sub get_statechange_ref { return \%state_changes; } sub get_created_objects_ref { return \@created_objects; } sub write_debuggy { my $self = shift; my $cref = $_[0]; my $content = ref $cref eq "SCALAR" ? $$cref : $cref; my $clen = defined $content ? length($content) : "undef"; $content = substr($content, 0, 17) . "..." if defined $content && $clen > 30; my ($pkg, $filename, $line) = caller; print "write($self, <$clen>\"$content\") from ($pkg, $filename, $line)\n" if Perlbal::DEBUG >= 4; $self->SUPER::write(@_); } if (Perlbal::DEBUG >= 4) { *write = \&write_debuggy; } sub new { my Perlbal::Socket $self = shift; $self = fields::new( $self ) unless ref $self; Perlbal::objctor($self); $self->SUPER::new( @_ ); $self->{headers_string} = ''; $self->{state} = undef; $self->{do_die} = 0; $self->{read_buf} = []; # arrayref of scalar refs of bufs read from client $self->{read_ahead} = 0; # bytes sitting in read_buf $self->{read_size} = 0; # total bytes read from client my $now = time; $self->{alive_time} = $self->{create_time} = $now; # now put this item in the list of created objects if (TRACK_OBJECTS) { # clean the created objects list if necessary if ($last_co_cleanup < $now - 5) { # remove out undefs, because those are natural byproducts of weakening # references @created_objects = grep { $_ } @created_objects; # however, the grep turned our weak references back into strong ones, so # we have to reweaken them weaken($_) foreach @created_objects; # we've cleaned up at this point $last_co_cleanup = $now; } # now add this one to our cleaned list and weaken it push @created_objects, $self; weaken($created_objects[-1]); } return $self; } # FIXME: this doesn't scale in theory, but it might use less CPU in # practice than using the Heap:: modules and manipulating the # expirations all the time, thus doing things properly # algorithmically. and this is definitely less work, so it's worth # a try. sub _do_cleanup { my $sf = Perlbal::Socket->get_sock_ref; my $now = time; my @to_close; while (my $k = each %$sf) { my Perlbal::Socket $v = $sf->{$k}; my $max_age = eval { $v->max_idle_time } || 0; next unless $max_age; if ($v->{alive_time} < $now - $max_age) { push @to_close, $v; } } foreach my $sock (@to_close) { $sock->close("perlbal_timeout") } Danga::Socket->AddTimer(5, \&_do_cleanup); } # CLASS METHOD: given a delay (in seconds) and a subref, this will call # that subref in AT LEAST delay seconds. if the subref returns 0, the # callback is discarded, but if it returns a positive number, the callback # is pushed onto the callback stack to be called again in at least that # many seconds. sub register_callback { # adds a new callback to our list my ($delay, $subref) = @_; push @$callbacks, [ time + $delay, $subref ]; return 1; } # CLASS METHOD: runs through the list of registered callbacks and executes # any that need to be executed # FIXME: this doesn't scale. need a heap. sub run_callbacks { my $now = time; return if $last_callbacks == $now; $last_callbacks = $now; my @destlist = (); foreach my $ref (@$callbacks) { # if their time is <= now... if ($ref->[0] <= $now) { # find out if they want to run again... my $rv = $ref->[1]->(); # and if they do, push onto list... push @destlist, [ $rv + $now, $ref->[1] ] if defined $rv && $rv > 0; } else { # not time for this one, just shove it push @destlist, $ref; } } $callbacks = \@destlist; } # CLASS METHOD: # default is for sockets to never time out. classes # can override. sub max_idle_time { 0; } # Socket: specific to HTTP socket types (only here and not in # ClientHTTPBase because ClientManage wants it too) sub read_request_headers { read_headers($_[0], 0); } sub read_response_headers { read_headers($_[0], 1); } sub read_headers { my Perlbal::Socket $self = shift; my $is_res = shift; print "Perlbal::Socket::read_headers($self) is_res=$is_res\n" if Perlbal::DEBUG >= 2; my $sock = $self->{sock}; my $to_read = MAX_HTTP_HEADER_LENGTH - length($self->{headers_string}); my $bref = $self->read($to_read); unless (defined $bref) { # client disconnected print " client disconnected\n" if Perlbal::DEBUG >= 3; return $self->close('remote_closure'); } $self->{headers_string} .= $$bref; my $idx = index($self->{headers_string}, "\r\n\r\n"); my $delim_len = 4; # can't find the header delimiter? check for LFLF header delimiter. if ($idx == -1) { $idx = index($self->{headers_string}, "\n\n"); $delim_len = 2; } # still can't find the header delimiter? if ($idx == -1) { # usually we get the headers all in one packet (one event), so # if we get in here, that means it's more than likely the # extra \r\n and if we clean it now (throw it away), then we # can avoid a regexp later on. if ($self->{ditch_leading_rn} && $self->{headers_string} eq "\r\n") { print " throwing away leading \\r\\n\n" if Perlbal::DEBUG >= 3; $self->{ditch_leading_rn} = 0; $self->{headers_string} = ""; return 0; } print " can't find end of headers\n" if Perlbal::DEBUG >= 3; $self->close('long_headers') if length($self->{headers_string}) >= MAX_HTTP_HEADER_LENGTH; return 0; } my $hstr = substr($self->{headers_string}, 0, $idx); print " pre-parsed headers: [$hstr]\n" if Perlbal::DEBUG >= 3; my $extra = substr($self->{headers_string}, $idx+$delim_len); if (my $len = length($extra)) { print " pushing back $len bytes after header\n" if Perlbal::DEBUG >= 3; $self->push_back_read(\$extra); } # some browsers send an extra \r\n after their POST bodies that isn't # in their content-length. a base class can tell us when they're # on their 2nd+ request after a POST and tell us to be ready for that # condition, and we'll clean it up $hstr =~ s/^\r\n// if $self->{ditch_leading_rn}; unless (($is_res ? $self->{res_headers} : $self->{req_headers}) = Perlbal::HTTPHeaders->new(\$hstr, $is_res)) { # bogus headers? close connection. print " bogus headers\n" if Perlbal::DEBUG >= 3; return $self->close("parse_header_failure"); } print " got valid headers\n" if Perlbal::DEBUG >= 3; $Perlbal::reqs++ unless $is_res; $self->{ditch_leading_rn} = 0; return $is_res ? $self->{res_headers} : $self->{req_headers}; } ### METHOD: drain_read_buf_to( $destination ) ### Write read-buffered data (if any) from the receiving object to the ### I object. sub drain_read_buf_to { my ($self, $dest) = @_; return unless $self->{read_ahead}; while (my $bref = shift @{$self->{read_buf}}) { print "draining readbuf from $self to $dest: [$$bref]\n" if Perlbal::DEBUG >= 3; $dest->write($bref); $self->{read_ahead} -= length($$bref); } } ### METHOD: die_gracefully() ### By default, if we're in persist_wait state, close. Else, ignore. Children ### can override if they want to do some other processing. sub die_gracefully { my Perlbal::Socket $self = $_[0]; if ($self->state eq 'persist_wait') { $self->close('graceful_shutdown'); } $self->{do_die} = 1; } ### METHOD: close() ### Set our state when we get closed. sub close { my Perlbal::Socket $self = $_[0]; $self->state('closed'); return $self->SUPER::close($_[1]); } ### METHOD: state() ### If you pass a parameter, sets the state, else returns it. sub state { my Perlbal::Socket $self = shift; return $self->{state} unless @_; push @{$state_changes{"$self"} ||= []}, $_[0] if Perlbal::TRACK_STATES; return $self->{state} = $_[0]; } sub as_string_html { my Perlbal::Socket $self = shift; return $self->SUPER::as_string; } sub DESTROY { my Perlbal::Socket $self = shift; delete $state_changes{"$self"} if Perlbal::TRACK_STATES; Perlbal::objdtor($self); } # package function (not a method). returns bytes sent, or -1 on error. our $sf_defined = Sys::Syscall::sendfile_defined; our $max_sf_readwrite = 128 * 1024; sub sendfile { my ($sfd, $fd, $bytes) = @_; return Sys::Syscall::sendfile($sfd, $fd, $bytes) if $sf_defined; # no support for sendfile. ghetto version: read and write. my $buf; $bytes = $max_sf_readwrite if $bytes > $max_sf_readwrite; my $rv = POSIX::read($fd, $buf, $bytes); return -1 unless defined $rv; return -1 unless $rv == $bytes; my $wv = POSIX::write($sfd, $buf, $rv); return -1 unless defined $wv; if (my $over_read = $rv - $wv) { POSIX::lseek($fd, -$over_read, &POSIX::SEEK_CUR); } return $wv; } 1; # Local Variables: # mode: perl # c-basic-indent: 4 # indent-tabs-mode: nil # End: