###################################################################### # HTTP Connection from a reverse proxy client # # Copyright 2004, Danga Interactive, Inc. # Copyright 2005-2007, Six Apart, Ltd. # package Perlbal::ClientProxy; use strict; use warnings; use base "Perlbal::ClientHTTPBase"; no warnings qw(deprecated); use Perlbal::ChunkedUploadState; use fields ( 'backend', # Perlbal::BackendHTTP object (or undef if disconnected) 'backend_requested', # true if we've requested a backend for this request 'reconnect_count', # number of times we've tried to reconnect to backend 'high_priority', # boolean; 1 if we are or were in the high priority queue 'low_priority', # boolean; 1 if we are or were in the low priority queue 'reproxy_uris', # arrayref; URIs to reproxy to, in order 'reproxy_expected_size', # int: size of response we expect to get back for reproxy 'currently_reproxying', # arrayref; the host info and URI we're reproxying right now 'content_length_remain', # int: amount of data we're still waiting for 'responded', # bool: whether we've already sent a response to the user or not 'last_request_time', # int: time that we last received a request 'primary_res_hdrs', # if defined, we are doing a transparent reproxy-URI # and the headers we get back aren't necessarily # the ones we want. instead, get most headers # from the provided res headers object here. 'is_buffering', # bool; if we're buffering some/all of a request to memory/disk 'is_writing', # bool; if on, we currently have an aio_write out 'start_time', # hi-res time when we started getting data to upload 'bufh', # buffered upload filehandle object 'bufilename', # string; buffered upload filename 'bureason', # string; if defined, the reason we're buffering to disk 'buoutpos', # int; buffered output position 'backend_stalled', # boolean: if backend has shut off its reads because we're too slow. 'unread_data_waiting', # boolean: if we shut off reads while we know data is yet to be read from client 'chunked_upload_state', # bool/obj: if processing a chunked upload, Perlbal::ChunkedUploadState object, else undef 'request_body_length', # integer: request's body length, either as-declared, # or calculated after chunked upload is complete # for perlbal sending out UDP packets related to upload status (for xmlhttprequest upload bar) 'last_upload_packet', # unixtime we last sent a UDP upload packet 'upload_session', # client's self-generated upload session # error-retrying stuff 'retry_count', # number of times we've retried this request so far after getting 500 errors ); use constant READ_SIZE => 131072; # 128k, ~common TCP window size? use constant READ_AHEAD_SIZE => 32768; # kinda arbitrary. sum of these two is max stored per connection while waiting for backend. use Errno qw( EPIPE ENOENT ECONNRESET EAGAIN ); use POSIX qw( O_CREAT O_TRUNC O_RDWR O_RDONLY ); use Time::HiRes qw( gettimeofday tv_interval ); my $udp_sock; # ClientProxy sub new { my ($class, $service, $sock) = @_; my $self = $class; $self = fields::new($class) unless ref $self; $self->SUPER::new($service, $sock); # init base fields Perlbal::objctor($self); bless $self, ref $class || $class; $self->init; $self->watch_read(1); return $self; } sub new_from_base { my $class = shift; my Perlbal::ClientHTTPBase $cb = shift; bless $cb, $class; $cb->init; $cb->watch_read(1); $cb->handle_request; return $cb; } sub init { my Perlbal::ClientProxy $self = $_[0]; $self->{last_request_time} = 0; $self->{backend} = undef; $self->{high_priority} = 0; $self->{low_priority} = 0; $self->{responded} = 0; $self->{unread_data_waiting} = 0; $self->{content_length_remain} = undef; $self->{backend_requested} = 0; $self->{is_buffering} = 0; $self->{is_writing} = 0; $self->{start_time} = undef; $self->{bufh} = undef; $self->{bufilename} = undef; $self->{buoutpos} = 0; $self->{bureason} = undef; $self->{chunked_upload_state} = undef; $self->{request_body_length} = undef; $self->{reproxy_uris} = undef; $self->{reproxy_expected_size} = undef; $self->{currently_reproxying} = undef; $self->{retry_count} = 0; } # given a service name, re-request (GET/HEAD only) to that service, even though # you've already done a request to your original service sub start_reproxy_service { my Perlbal::ClientProxy $self = $_[0]; my Perlbal::HTTPHeaders $primary_res_hdrs = $_[1]; my $svc_name = $_[2]; my $svc = $svc_name ? Perlbal->service($svc_name) : undef; unless ($svc) { $self->_simple_response(404, "Vhost twiddling not configured for requested pair."); return 1; } $self->{backend_requested} = 0; $self->{backend} = undef; $self->{res_headers} = $primary_res_hdrs; $svc->adopt_base_client($self); } # call this with a string of space separated URIs to start a process # that will fetch the item at the first and return it to the user, # on failure it will try the second, then third, etc sub start_reproxy_uri { my Perlbal::ClientProxy $self = $_[0]; my Perlbal::HTTPHeaders $primary_res_hdrs = $_[1]; my $urls = $_[2]; # at this point we need to disconnect from our backend $self->{backend} = undef; # failure if we have no primary response headers return unless $self->{primary_res_hdrs} ||= $primary_res_hdrs; # construct reproxy_uri list if (defined $urls) { my @uris = split /\s+/, $urls; $self->{currently_reproxying} = undef; $self->{reproxy_uris} = []; foreach my $uri (@uris) { next unless $uri =~ m!^http://(.+?)(?::(\d+))?(/.*)?$!; push @{$self->{reproxy_uris}}, [ $1, $2 || 80, $3 || '/' ]; } } # if we get in here and we have currently_reproxying defined, then something # happened and we want to retry that one if ($self->{currently_reproxying}) { unshift @{$self->{reproxy_uris}}, $self->{currently_reproxying}; $self->{currently_reproxying} = undef; } # if we have no uris in our list now, tell the user 404 return $self->_simple_response(503) unless @{$self->{reproxy_uris} || []}; # set the expected size if we got a content length in our headers if ($primary_res_hdrs && (my $expected_size = $primary_res_hdrs->header('X-REPROXY-EXPECTED-SIZE'))) { $self->{reproxy_expected_size} = $expected_size; } # pass ourselves off to the reproxy manager $self->state('wait_backend'); Perlbal::ReproxyManager::do_reproxy($self); } # called by the reproxy manager when we can't get to our requested backend sub try_next_uri { my Perlbal::ClientProxy $self = $_[0]; shift @{$self->{reproxy_uris}}; $self->{currently_reproxying} = undef; $self->start_reproxy_uri(); } # returns true if this ClientProxy is too many bytes behind the backend sub too_far_behind_backend { my Perlbal::ClientProxy $self = $_[0]; my Perlbal::BackendHTTP $backend = $self->{backend} or return 0; # if a backend doesn't have a service, it's a # ReproxyManager-created backend, and thus it should use the # 'buffer_size_reproxy_url' parameter for acceptable buffer # widths, and not the regular 'buffer_size'. this lets people # tune buffers depending on the types of webservers. (assumption # being that reproxied-to webservers are event-based and it's okay # to tie the up longer in favor of using less buffer memory in # perlbal) my $max_buffer = defined $backend->{service} ? $self->{service}->{buffer_size} : $self->{service}->{buffer_size_reproxy_url}; return $self->{write_buf_size} > $max_buffer; } # this is a callback for when a backend has been created and is # ready for us to do something with it sub use_reproxy_backend { my Perlbal::ClientProxy $self = $_[0]; my Perlbal::BackendHTTP $be = $_[1]; # get a URI my $datref = $self->{currently_reproxying} = shift @{$self->{reproxy_uris}}; unless (defined $datref) { # return error and close the backend $be->close('invalid_uris'); return $self->_simple_response(503); } # now send request $self->{backend} = $be; $be->{client} = $self; my $extra_hdr = ""; if (my $range = $self->{req_headers}->header("Range")) { $extra_hdr .= "Range: $range\r\n"; } if (my $host = $self->{req_headers}->header("Host")) { $extra_hdr .= "Host: $host\r\n"; } my $req_method = $self->{req_headers}->request_method eq 'HEAD' ? 'HEAD' : 'GET'; my $headers = "$req_method $datref->[2] HTTP/1.0\r\nConnection: keep-alive\r\n${extra_hdr}\r\n"; $be->{req_headers} = Perlbal::HTTPHeaders->new(\$headers); $be->state('sending_req'); $self->state('backend_req_sent'); $be->write($be->{req_headers}->to_string_ref); $be->watch_read(1); $be->watch_write(1); } # this is called when a transient backend getting a reproxied URI has received # a response from the server and is ready for us to deal with it sub backend_response_received { my Perlbal::ClientProxy $self = $_[0]; my Perlbal::BackendHTTP $be = $_[1]; # a response means that we are no longer currently waiting on a reproxy, and # don't want to retry this URI $self->{currently_reproxying} = undef; # we fail if we got something that's NOT a 2xx code, OR, if we expected # a certain size and got back something different my $code = $be->{res_headers}->response_code + 0; my $bad_code = sub { return 0 if $code >= 200 && $code <= 299; return 0 if $code == 416; return 1; }; my $bad_size = sub { return 0 unless defined $self->{reproxy_expected_size}; return $self->{reproxy_expected_size} != $be->{res_headers}->header('Content-length'); }; if ($bad_code->() || $bad_size->()) { # fall back to an alternate URL $be->{client} = undef; $be->close('non_200_reproxy'); $self->try_next_uri; return 1; } return 0; } sub start_reproxy_file { my Perlbal::ClientProxy $self = shift; my $file = shift; # filename to reproxy my Perlbal::HTTPHeaders $hd = shift; # headers from backend, in need of cleanup # at this point we need to disconnect from our backend $self->{backend} = undef; # call hook for pre-reproxy return if $self->{service}->run_hook("start_file_reproxy", $self, \$file); # set our expected size if (my $expected_size = $hd->header('X-REPROXY-EXPECTED-SIZE')) { $self->{reproxy_expected_size} = $expected_size; } # start an async stat on the file $self->state('wait_stat'); Perlbal::AIO::aio_stat($file, sub { # if the client's since disconnected by the time we get the stat, # just bail. return if $self->{closed}; my $size = -s _; unless ($size) { # FIXME: POLICY: 404 or retry request to backend w/o reproxy-file capability? return $self->_simple_response(404); } if (defined $self->{reproxy_expected_size} && $self->{reproxy_expected_size} != $size) { # 404; the file size doesn't match what we expected return $self->_simple_response(404); } # if the thing we're reproxying is indeed a file, advertise that # we support byteranges on it if (-f _) { $hd->header("Accept-Ranges", "bytes"); } my ($status, $range_start, $range_end) = $self->{req_headers}->range($size); my $not_satisfiable = 0; if ($status == 416) { $hd = Perlbal::HTTPHeaders->new_response(416); $hd->header("Content-Range", $size ? "bytes */$size" : "*"); $not_satisfiable = 1; } # change the status code to 200 if the backend gave us 204 No Content $hd->code(200) if $hd->response_code == 204; # fixup the Content-Length header with the correct size (application # doesn't need to provide a correct value if it doesn't want to stat()) if ($status == 200) { $hd->header("Content-Length", $size); } elsif ($status == 206) { $hd->header("Content-Range", "bytes $range_start-$range_end/$size"); $hd->header("Content-Length", $range_end - $range_start + 1); $hd->code(206); } # don't send this internal header to the client: $hd->header('X-REPROXY-FILE', undef); # rewrite some other parts of the header $self->setup_keepalive($hd); # just send the header, now that we cleaned it. $self->{res_headers} = $hd; $self->write($hd->to_string_ref); if ($self->{req_headers}->request_method eq 'HEAD' || $not_satisfiable) { $self->write(sub { $self->http_response_sent; }); return; } $self->state('wait_open'); Perlbal::AIO::aio_open($file, 0, 0 , sub { my $fh = shift; # if client's gone, just close filehandle and abort if ($self->{closed}) { CORE::close($fh) if $fh; return; } # handle errors if (! $fh) { # FIXME: do 500 vs. 404 vs whatever based on $! ? return $self->_simple_response(500); } # seek if partial content if ($status == 206) { sysseek($fh, $range_start, &POSIX::SEEK_SET); $size = $range_end - $range_start + 1; } $self->reproxy_fh($fh, $size); $self->watch_write(1); }); }); } # Client # get/set backend proxy connection sub backend { my Perlbal::ClientProxy $self = shift; return $self->{backend} unless @_; my $backend = shift; $self->state('draining_res') unless $backend; return $self->{backend} = $backend; } # invoked by backend when it wants us to start watching for reads again # and feeding it data (if we have any) sub backend_ready { my Perlbal::ClientProxy $self = $_[0]; my Perlbal::BackendHTTP $be = $_[1]; # if we'd turned ourselves off while we waited for a backend, turn # ourselves back on, because the backend is ready for data now. if ($self->{unread_data_waiting}) { $self->watch_read(1); } # normal, not-buffered-to-disk case: return $self->drain_read_buf_to($be) unless $self->{bureason}; # buffered-to-disk case. # tell the backend it has to go into buffered_upload_mode, # which makes it inform us of its writable availability $be->invoke_buffered_upload_mode; } # our backend enqueues a call to this method in our write buffer, so this is called # right after we've finished sending all of the results to the user. at this point, # if we were doing keep-alive, we don't close and setup for the next request. sub backend_finished { my Perlbal::ClientProxy $self = shift; print "ClientProxy::backend_finished\n" if Perlbal::DEBUG >= 3; # mark ourselves as having responded (presumeably if we're here, # the backend has responded already) $self->{responded} = 1; # our backend is done with us, so we disconnect ourselves from it $self->{backend} = undef; # backend is done sending data to us, so we can recycle this clientproxy # if we don't have any data yet to read return $self->http_response_sent unless $self->{unread_data_waiting}; # if we get here (and we do, rarely, in practice) then that means # the backend read was empty/disconected (or otherwise messed up), # and the only thing we can really do is close the client down. $self->close("backend_finished_while_unread_data"); } # called when we've sent a response to a user fully and we need to reset state sub http_response_sent { my Perlbal::ClientProxy $self = $_[0]; # persistence logic is in ClientHTTPBase return 0 unless $self->SUPER::http_response_sent; print "ClientProxy::http_response_sent -- resetting state\n" if Perlbal::DEBUG >= 3; if (my $be = $self->{backend}) { $self->{backend} = undef; $be->forget_client; } # if we get here we're being persistent, reset our state $self->{backend_requested} = 0; $self->{high_priority} = 0; $self->{reproxy_uris} = undef; $self->{reproxy_expected_size} = undef; $self->{currently_reproxying} = undef; $self->{content_length_remain} = undef; $self->{primary_res_hdrs} = undef; $self->{responded} = 0; $self->{is_buffering} = 0; $self->{is_writing} = 0; $self->{start_time} = undef; $self->{bufh} = undef; $self->{bufilename} = undef; $self->{buoutpos} = 0; $self->{bureason} = undef; $self->{upload_session} = undef; $self->{chunked_upload_state} = undef; $self->{request_body_length} = undef; return 1; } # to request a backend connection AFTER you've already done so, if you # didn't like the results from the first one. (like after a 500 error) sub rerequest_backend { my Perlbal::ClientProxy $self = shift; $self->{backend_requested} = 0; $self->{backend} = undef; $self->request_backend; } sub request_backend { my Perlbal::ClientProxy $self = shift; return if $self->{backend_requested}; $self->{backend_requested} = 1; $self->state('wait_backend'); $self->{service}->request_backend_connection($self); $self->tcp_cork(1); # cork writes to self } # Client (overrides and calls super) sub close { my Perlbal::ClientProxy $self = shift; my $reason = shift; # don't close twice return if $self->{closed}; # signal that we're done $self->{service}->run_hooks('end_proxy_request', $self); # kill our backend if we still have one if (my $backend = $self->{backend}) { print "Client ($self) closing backend ($backend)\n" if Perlbal::DEBUG >= 1; $self->backend(undef); $backend->close($reason ? "proxied_from_client_close:$reason" : "proxied_from_client_close"); } else { # if no backend, tell our service that we don't care for one anymore $self->{service}->note_client_close($self); } # call ClientHTTPBase's close $self->SUPER::close($reason); } sub client_disconnected { # : void my Perlbal::ClientProxy $self = shift; print "ClientProxy::client_disconnected\n" if Perlbal::DEBUG >= 2; # if client disconnected, then we need to turn off watching for # further reads and purge the existing upload if any. also, we # should just return and do nothing else. $self->watch_read(0); $self->purge_buffered_upload if $self->{bureason}; return $self->close('user_disconnected'); } # Client sub event_write { my Perlbal::ClientProxy $self = shift; print "ClientProxy::event_write\n" if Perlbal::DEBUG >= 3; $self->SUPER::event_write; # obviously if we're writing the backend has processed our request # and we are responding/have responded to the user, so mark it so $self->{responded} = 1; # trigger our backend to keep reading, if it's still connected if ($self->{backend_stalled} && (my $backend = $self->{backend})) { print " unstalling backend\n" if Perlbal::DEBUG >= 3; $self->{backend_stalled} = 0; $backend->watch_read(1); } } # ClientProxy sub event_read { my Perlbal::ClientProxy $self = shift; print "ClientProxy::event_read\n" if Perlbal::DEBUG >= 3; # mark alive so we don't get killed for being idle $self->{alive_time} = time; # if we have no headers, the only thing we can do is try to get some if (! $self->{req_headers}) { print " no headers. reading.\n" if Perlbal::DEBUG >= 3; $self->handle_request if $self->read_request_headers; return; } # if we're buffering to disk or haven't read too much from this client, keep reading, # otherwise shut off read notifications unless ($self->{is_buffering} || $self->{read_ahead} < READ_AHEAD_SIZE) { # our buffer is full, so turn off reads for now print " disabling reads.\n" if Perlbal::DEBUG >= 3; $self->watch_read(0); return; } # deal with chunked uploads if (my $cus = $self->{chunked_upload_state}) { $cus->on_readable($self); # if we got more than 1MB not flushed to disk, # stop reading for a bit until disk catches up if ($self->{read_ahead} > 1024*1024) { $self->watch_read(0); } return; } # read more data if we're still buffering or if our current read buffer # is not full to the max READ_AHEAD_SIZE which is how much data we will # buffer in from the user before passing on to the backend # read the MIN(READ_SIZE, content_length_remain) my $read_size = READ_SIZE; my $remain = $self->{content_length_remain}; $read_size = $remain if $remain && $remain < $read_size; print " reading $read_size bytes (", (defined $remain ? $remain : "(undef)"), " bytes remain)\n" if Perlbal::DEBUG >= 3; my $bref = $self->read($read_size); # if the read returned undef, that means the connection was closed # (see: Danga::Socket::read) return $self->client_disconnected unless defined $bref; # if they didn't declare a content body length and we just got a # readable event that's not a disconnect, something's messed up. # they're overflowing us. disconnect! if (! $remain) { $self->_simple_response(400, "Can't pipeline to HTTP/1.0"); $self->close("pipelining_to_http10"); return; } # now that we know we have a defined value, determine how long it is, and do # housekeeping to keep our tracking numbers up to date. my $len = length($$bref); print " read $len bytes\n" if Perlbal::DEBUG >= 3; # when run under the program "trickle", epoll speaks the truth to # us, but then trickle interferes and steals our reads/writes, so # this fails. normally this check isn't needed. return unless $len; $self->{read_size} += $len; $self->{content_length_remain} -= $len if $remain; my $done_reading = defined $self->{content_length_remain} && $self->{content_length_remain} <= 0; my $backend = $self->backend; print(" done_reading = $done_reading, backend = ", ($backend || ""), "\n") if Perlbal::DEBUG >= 3; # upload tracking if (my $session = $self->{upload_session}) { my $cl = $self->{req_headers}->content_length; my $remain = $self->{content_length_remain}; my $now = time(); # FIXME: more efficient? if ($cl && $remain && ($self->{last_upload_packet} || 0) != $now) { my $done = $cl - $remain; $self->{last_upload_packet} = $now; $udp_sock ||= IO::Socket::INET->new(Proto => 'udp'); my $since = $self->{last_request_time}; my $send = "UPLOAD:$session:$done:$cl:$since:$now"; if ($udp_sock) { foreach my $ep (@{ $self->{service}{upload_status_listeners_sockaddr} }) { my $rv = $udp_sock->send($send, 0, $ep); } } } } # just dump the read into the nether if we're dangling. that is # the case when we send the headers to the backend and it responds # before we're done reading from the client; therefore further # reads from the client just need to be sent nowhere, because the # RFC2616 section 8.2.3 says: "the server SHOULD NOT close the # transport connection until it has read the entire request" if ($self->{responded}) { print " already responded.\n" if Perlbal::DEBUG >= 3; # in addition, if we're now out of data (clr == 0), then we should # either close ourselves or get ready for another request return $self->http_response_sent if $done_reading; print " already responded [2].\n" if Perlbal::DEBUG >= 3; # at this point, if the backend has responded then we just return # as we don't want to send it on to them or buffer it up, which is # what the code below does return; } # if we have no data left to read, stop reading. all that can # come later is an extra \r\n which we handle later when parsing # new request headers. and if it's something else, we'll bail on # the next request, not this one. if ($done_reading) { Carp::confess("content_length_remain less than zero: self->{content_length_remain}") if $self->{content_length_remain} < 0; $self->{unread_data_waiting} = 0; $self->watch_read(0); } # now, if we have a backend, then we should be writing it to the backend # and not doing anything else if ($backend) { print " got a backend. sending write to it.\n" if Perlbal::DEBUG >= 3; $backend->write($bref); # TODO: monitor the backend's write buffer depth? return; } # now, we know we don't have a backend, so we have to push this data onto our # read buffer... it's not going anywhere yet push @{$self->{read_buf}}, $bref; $self->{read_ahead} += $len; print " no backend. read_ahead = $self->{read_ahead}.\n" if Perlbal::DEBUG >= 3; # if we know we've already started spooling a file to disk, then continue # to do that. print " bureason = $self->{bureason}\n" if Perlbal::DEBUG >= 3 && $self->{bureason}; return $self->buffered_upload_update if $self->{bureason}; # if we are under our buffer-to-memory size, just continue buffering here and # don't fall through to the backend request call below return if ! $done_reading && $self->{read_ahead} < $self->{service}->{buffer_backend_connect}; # over the buffer-to-memory size, see if we should start spooling to disk. return if $self->{service}->{buffer_uploads} && $self->decide_to_buffer_to_disk; # give plugins a chance to act on the request before we request a backend # (added by Chris Hondl , March 2006) my $svc = $self->{service}; return if $svc->run_hook('proxy_read_request', $self); # if we fall through to here, we need to ensure that a backend is on the # way, because no specialized handling took over above print " finally requesting a backend\n" if Perlbal::DEBUG >= 3; return $self->request_backend; } sub handle_request { my Perlbal::ClientProxy $self = shift; my $req_hd = $self->{req_headers}; my $svc = $self->{service}; # give plugins a chance to force us to bail return if $svc->run_hook('start_proxy_request', $self); return if $svc->run_hook('start_http_request', $self); if ($self->handle_chunked_upload) { # handled in method. } else { # if defined we're waiting on some amount of data. also, we have to # subtract out read_size, which is the amount of data that was # extra in the packet with the header that's part of the body. $self->{request_body_length} = $self->{content_length_remain} = $req_hd->content_length; $self->{unread_data_waiting} = 1 if $self->{content_length_remain}; } # upload-tracking stuff. both starting a new upload track session, # and checking on status of ongoing one return if $svc->{upload_status_listeners} && $self->handle_upload_tracking; # note that we've gotten a request $self->{requests}++; $self->{last_request_time} = $self->{alive_time}; # either start buffering some of the request to memory, or # immediately request a backend connection. if ($self->{chunked_upload_state}) { $self->{request_body_length} = 0; $self->{is_buffering} = 1; $self->{bureason} = 'chunked'; $self->buffered_upload_update; } elsif ($self->{content_length_remain} && $self->{service}->{buffer_backend_connect}) { # the deeper path $self->start_buffering_request; } else { # get the backend request process moving, since we aren't buffering $self->{is_buffering} = 0; # if reproxy-caching is enabled, we can often bypass needing to allocate a BackendHTTP connection: return if $svc->{reproxy_cache} && $self->satisfy_request_from_cache; $self->request_backend; } } sub handle_chunked_upload { my Perlbal::ClientProxy $self = shift; my $req_hd = $self->{req_headers}; my $te = $req_hd->header("Transfer-Encoding"); return unless $te && $te eq "chunked"; return unless $self->{service}->{buffer_uploads}; $req_hd->header("Transfer-Encoding", undef); # remove it (won't go to backend) my $eh = $req_hd->header("Expect"); if ($eh && $eh =~ /\b100-continue\b/) { $self->write(\ "HTTP/1.1 100 Continue\r\n\r\n"); $req_hd->header("Expect", undef); # remove it (won't go to backend) } my $max_size = $self->{service}{max_chunked_request_size}; my $args = { on_new_chunk => sub { my $cref = shift; my $len = length($$cref); push @{$self->{read_buf}}, $cref; $self->{read_ahead} += $len; $self->{request_body_length} += $len; # if too large, disconnect them... if ($max_size && $self->{request_body_length} > $max_size) { $self->purge_buffered_upload; $self->close; return; } $self->buffered_upload_update; }, on_disconnect => sub { $self->client_disconnected; }, on_zero_chunk => sub { $self->send_buffered_upload; }, }; $self->{chunked_upload_state} = Perlbal::ChunkedUploadState->new(%$args); return 1; } sub satisfy_request_from_cache { my Perlbal::ClientProxy $self = shift; my $req_hd = $self->{req_headers}; my $svc = $self->{service}; my $cache = $svc->{reproxy_cache}; $svc->{_stat_requests}++; my $requri = $req_hd->request_uri || ''; my $hostname = $req_hd->header("Host") || ''; my $key = "$hostname|$requri"; my $reproxy = $cache->get($key) or return 0; my ($timeout, $headers, $urls) = @$reproxy; return 0 if time() > $timeout; $svc->{_stat_cache_hits}++; my %headers = map { ref $_ eq 'SCALAR' ? $$_ : $_ } @{$headers || []}; if (my $ims = $req_hd->header("If-Modified-Since")) { my ($lm_key) = grep { uc($_) eq "LAST-MODIFIED" } keys %headers; my $lm = $headers{$lm_key} || ""; # remove the IE length suffix $ims =~ s/; length=(\d+)//; # If 'Last-Modified' is same as 'If-Modified-Since', send a 304 if ($ims eq $lm) { my $res_hd = Perlbal::HTTPHeaders->new_response(304); $res_hd->header("Content-Length", "0"); $self->tcp_cork(1); $self->state('xfer_resp'); $self->write($res_hd->to_string_ref); $self->write(sub { $self->http_response_sent; }); return 1; } } my $res_hd = Perlbal::HTTPHeaders->new_response(200); $res_hd->header("Date", HTTP::Date::time2str(time())); while (my ($key, $value) = each %headers) { $res_hd->header($key, $value); } $self->start_reproxy_uri($res_hd, $urls); return 1; } # return 1 to steal this connection (when they're asking status of an # upload session), return 0 to return it to handle_request's control. sub handle_upload_tracking { my Perlbal::ClientProxy $self = shift; my $req_hd = $self->{req_headers}; return 0 unless $req_hd->request_uri =~ /[\?&]client_up_sess=(\w{5,50})\b/; my $sess = $1; # getting status? if ($req_hd->request_uri =~ m!^/__upload_status\?!) { my $status = Perlbal::UploadListener::get_status($sess); my $now = time(); my $body = $status ? "{done:$status->{done},total:$status->{total},starttime:$status->{starttime},nowtime:$now}" : "{}"; my $res = $self->{res_headers} = Perlbal::HTTPHeaders->new_response(200); $res->header("Content-Type", "text/plain"); $res->header('Content-Length', length $body); $self->setup_keepalive($res); $self->tcp_cork(1); # cork writes to self $self->write($res->to_string_ref); $self->write(\ $body); $self->write(sub { $self->http_response_sent; }); return 1; } # otherwise just tagging this upload as a new upload session $self->{upload_session} = $sess; return 0; } # continuation of handle_request, in the case where we need to start buffering # a bit of the request body to memory, either hoping that's all of it, or to # make a determination of whether or not we should save it all to disk first sub start_buffering_request { my Perlbal::ClientProxy $self = shift; # buffering case: $self->{is_buffering} = 1; # shortcut: if we know that we're buffering by size, and the size # of this upload is bigger than that value, we can just turn on spool # to disk right now... if ($self->{service}->{buffer_uploads} && $self->{service}->{buffer_upload_threshold_size}) { my $req_hd = $self->{req_headers}; if ($req_hd->content_length >= $self->{service}->{buffer_upload_threshold_size}) { $self->{bureason} = 'size'; if ($ENV{PERLBAL_DEBUG_BUFFERED_UPLOADS}) { $self->{req_headers}->header('X-PERLBAL-BUFFERED-UPLOAD-REASON', 'size'); } $self->state('buffering_upload'); $self->buffered_upload_update; return; } } # well, we're buffering, but we're not going to disk just yet (but still might) $self->state('buffering_request'); # only need time if we are using the buffer to disk functionality $self->{start_time} = [ gettimeofday() ] if $self->{service}->{buffer_uploads}; } # looks at our states and decides if we should start writing to disk # or should just go ahead and blast this to the backend. returns 1 # if the decision was made to buffer to disk sub decide_to_buffer_to_disk { my Perlbal::ClientProxy $self = shift; return unless $self->{is_buffering}; return $self->{bureason} if defined $self->{bureason}; # this is called when we have enough data to determine whether or not to # start buffering to disk my $dur = tv_interval($self->{start_time}) || 1; my $rate = $self->{read_ahead} / $dur; my $etime = $self->{content_length_remain} / $rate; # see if we have enough data to make the determination my $reason = undef; # see if we blow the rate away if ($self->{service}->{buffer_upload_threshold_rate} > 0 && $rate < $self->{service}->{buffer_upload_threshold_rate}) { # they are slower than the minimum rate $reason = 'rate'; } # and finally check estimated time exceeding if ($self->{service}->{buffer_upload_threshold_time} > 0 && $etime > $self->{service}->{buffer_upload_threshold_time}) { # exceeds $reason = 'time'; } unless ($reason) { $self->{is_buffering} = 0; return 0; } # start saving it to disk $self->state('buffering_upload'); $self->buffered_upload_update; $self->{bureason} = $reason; if ($ENV{PERLBAL_DEBUG_BUFFERED_UPLOADS}) { $self->{req_headers}->header('X-PERLBAL-BUFFERED-UPLOAD-REASON', $reason); } return 1; } # take ourselves and send along our buffered data to the backend sub send_buffered_upload { my Perlbal::ClientProxy $self = shift; # make sure our buoutpos is the same as the content length... return if $self->{is_writing}; # set the content-length that goes to the backend... if ($self->{chunked_upload_state}) { $self->{req_headers}->header("Content-Length", $self->{request_body_length}); } my $clen = $self->{req_headers}->content_length; if ($clen != $self->{buoutpos}) { Perlbal::log('critical', "Content length of $clen declared but $self->{buoutpos} bytes written to disk"); return $self->_simple_response(500); } # reset our position so we start reading from the right spot $self->{buoutpos} = 0; sysseek($self->{bufh}, 0, 0); # notify that we want the backend so we get the ball rolling $self->request_backend; } sub continue_buffered_upload { my Perlbal::ClientProxy $self = shift; my Perlbal::BackendHTTP $be = shift; return unless $self && $be; # now send the data my $clen = $self->{request_body_length}; my $sent = Perlbal::Socket::sendfile($be->{fd}, fileno($self->{bufh}), $clen - $self->{buoutpos}); if ($sent < 0) { return $self->close("epipe") if $! == EPIPE; return $self->close("connreset") if $! == ECONNRESET; print STDERR "Error w/ sendfile: $!\n"; return $self->close('sendfile_error'); } $self->{buoutpos} += $sent; # if we're done, purge the file and move on if ($self->{buoutpos} >= $clen) { $be->{buffered_upload_mode} = 0; $self->purge_buffered_upload; return; } # we will be called again by the backend since buffered_upload_mode is on } # write data to disk sub buffered_upload_update { my Perlbal::ClientProxy $self = shift; return if $self->{is_writing}; return unless $self->{is_buffering} && $self->{read_ahead}; # so we're not writing now and we have data to write... unless ($self->{bufilename}) { # create a filename and see if it exists or not $self->{is_writing} = 1; my $fn = join('-', $self->{service}->name, $self->{service}->listenaddr, "client", $self->{fd}, int(rand(0xffffffff))); $fn = $self->{service}->{buffer_uploads_path} . '/' . $fn; # good, now we need to create the file Perlbal::AIO::aio_open($fn, O_CREAT | O_TRUNC | O_RDWR, 0644, sub { $self->{is_writing} = 0; $self->{bufh} = shift; # throw errors back to the user if (! $self->{bufh}) { Perlbal::log('critical', "Failure to open $fn for buffered upload output"); return $self->_simple_response(500); } # save state and info and bounce it back to write data $self->{bufilename} = $fn; $self->buffered_upload_update; }); return; } # can't proceed if we have no disk file to async write to # people reported seeing this crash rarely in production... # must be a race between previously in-flight's write # re-invoking a write immediately after something triggered # a buffered upload purge. unless ($self->{bufh}) { $self->close; return; } # at this point, we want to do some writing my $bref = shift(@{$self->{read_buf}}); my $len = length $$bref; $self->{read_ahead} -= $len; # so at this point we have a valid filename and file handle and should write out # the buffer that we have $self->{is_writing} = 1; Perlbal::AIO::aio_write($self->{bufh}, $self->{buoutpos}, $len, $$bref, sub { my $bytes = shift; $self->{is_writing} = 0; # check for error unless ($bytes) { Perlbal::log('critical', "Error writing buffered upload: $!. Tried to do $len bytes at $self->{buoutpos}."); return $self->_simple_response(500); } # update our count of data written $self->{buoutpos} += $bytes; # now check if we wrote less than we had in this chunk of buffer. if that's # the case then we need to reenqueue the part of the chunk that wasn't # written out and update as appropriate. if ($bytes < $len) { my $diff = $len - $bytes; unshift @{$self->{read_buf}}, \ substr($$bref, $bytes, $diff); $self->{read_ahead} += $diff; } # if we're processing a chunked upload, ... if ($self->{chunked_upload_state}) { # turn reads back on, if we haven't hit the end yet. if ($self->{unread_data_waiting} && $self->{read_ahead} < 1024*1024) { $self->watch_read(1); $self->{unread_data_waiting} = 0; } if ($self->{read_ahead} == 0 && $self->{chunked_upload_state}->hit_zero_chunk) { $self->watch_read(0); $self->send_buffered_upload; return; } } # if we're done (no clr and no read ahead!) then send it elsif ($self->{read_ahead} <= 0 && $self->{content_length_remain} <= 0) { $self->send_buffered_upload; return; } # spawn another writer! $self->buffered_upload_update; }); } # destroy any files we've created sub purge_buffered_upload { my Perlbal::ClientProxy $self = shift; # FIXME: it's reported that sometimes the two now-in-eval blocks # fail, hence the eval blocks and warnings. the FIXME is to # figure this out, why it happens sometimes. # first close our filehandle... not async eval { CORE::close($self->{bufh}); }; if ($@) { warn "Error closing file in ClientProxy::purge_buffered_upload: $@\n"; } $self->{bufh} = undef; eval { # now asyncronously unlink the file Perlbal::AIO::aio_unlink($self->{bufilename}, sub { if ($!) { # note an error, but whatever, we'll either overwrite the file later (O_TRUNC | O_CREAT) # or a cleaner will come through and do it for us someday (if the user runs one) Perlbal::log('warning', "Unable to link $self->{bufilename}: $!"); } }); }; if ($@) { warn "Error unlinking file in ClientProxy::purge_buffered_upload: $@\n"; } } # returns bool; whether backend should hide the 500 error from the client # and have us try a new backend. return true to retry, false to get a 500 error. sub should_retry_after_500 { my Perlbal::ClientProxy $self = shift; my Perlbal::BackendHTTP $be = shift; my $svc = $be->{service}; return 0 unless $svc->{enable_error_retries}; my @sched = split(/\s*,\s*/, $svc->{error_retry_schedule}); return 0 if ++$self->{retry_count} > @sched; return 1; } # called by Backend to tell us it got a 500 error and we should retry another backend. sub retry_after_500 { my Perlbal::ClientProxy $self = shift; my Perlbal::Service $svc = shift; my @sched = split(/\s*,\s*/, $svc->{error_retry_schedule}); my $delay = $sched[$self->{retry_count} - 1]; if ($delay) { Danga::Socket->AddTimer($delay, sub { return if $self->{closed}; $self->rerequest_backend; }); } else { $self->rerequest_backend; } } sub as_string { my Perlbal::ClientProxy $self = shift; my $ret = $self->SUPER::as_string; if ($self->{backend}) { my $ipport = $self->{backend}->{ipport}; $ret .= "; backend=$ipport"; } else { $ret .= "; write_buf_size=$self->{write_buf_size}" if $self->{write_buf_size} > 0; } $ret .= "; highpri" if $self->{high_priority}; $ret .= "; lowpri" if $self->{low_priority}; $ret .= "; responded" if $self->{responded}; $ret .= "; waiting_for=" . $self->{content_length_remain} if defined $self->{content_length_remain}; $ret .= "; reproxying" if $self->{currently_reproxying}; return $ret; } sub set_queue_low { my Perlbal::ClientProxy $self = shift; $self->{low_priority} = 1; return; } sub set_queue_high { my Perlbal::ClientProxy $self = shift; $self->{high_priority} = 1; return; } sub DESTROY { Perlbal::objdtor($_[0]); $_[0]->SUPER::DESTROY; } 1; # Local Variables: # mode: perl # c-basic-indent: 4 # indent-tabs-mode: nil # End: