/* $Id: proxy.c,v 1.18 2006/11/18 09:20:55 maxim Exp $ * */ #define _GNU_SOURCE #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "configure.h" #if SUPPORT_PROXY #include "common/settings.h" #include "common/misc.h" #include "common/str.h" #include "core.h" #include "httpd.h" #include "config.h" #include "flood.h" #include "proxy.h" #if SUPPORT_CACHE #include "cache.h" #endif #if MODEL == MODEL_KEVENT #include #endif #define REQUESTS_STEP 32 static char *states[] = { "UNDEF", "CONNECT", "WRITE_HEADER_BE", "WRITE_POST", "READ_RESPONSE", "READ_RESPONSE_H", "COPY_HEADER", "READ_BODY", "WAIT_MEMORY", "FINISHED", "FAULT" }; static ProxyRequest* requests = NULL; static ProxyRequest* tail = NULL; static ProxyRequest* freeRequests = NULL; static ProxyRequest** requestsBlocks = NULL; static int nRequests = 0, maxRequests = 0, nRequestsBlocks = 0, failed = 0; static Buffer* proxyBuffer = NULL; #if SUPPORT_ELAPSED #include "common/pairs.h" static Pairs* elapsed; typedef struct Elapsed_ { int count; int elapsed; } Elapsed; Elapsed* freeElapsed(Elapsed* e) { return FREE(e); } #endif char initProxyRequests() { int n = 0; Backend* backend; for (n = 0, backend = config->backends; backend; n++, backend = backend->next); n = (n * config->proxyConnections / REQUESTS_STEP) + 1; if (!(requestsBlocks = REALLOC(requestsBlocks, n * sizeof(ProxyRequest*)))) return FAILED; if (!proxyBuffer && !(proxyBuffer = newBuffer(INITIAL_BUFFER_SIZE))) return FAILED; #if SUPPORT_ELAPSED elapsed = newPairs(256, (void*)freeElapsed); #endif return OK; } void freeProxyRequests() { int i; ProxyRequest* r = requests; while (r) { ProxyRequest* next = r->next; closeProxy(r); r = next; } for (i = 0; i < nRequestsBlocks; i++) FREE(requestsBlocks[i]); requestsBlocks = FREE(requestsBlocks); freeRequests = NULL; proxyBuffer = freeBuffer(proxyBuffer); nRequests = maxRequests = nRequestsBlocks = failed = 0; } void resetTodayProxy() { maxRequests = failed = 0; } int getProxyStats(char* buffer, int size, char show, char* uri) { int l = 0; l += SNPRINTF(buffer + l, size - l, "\n" "Proxy requests (active/max/failed): %s / %s / %s\n", uri, show ? "" : "?proxy", itos(nRequests), itos(maxRequests), itos(failed)); if (show && size > l) { ProxyRequest* r; if (requests) l += SNPRINTF(buffer + l, size - l, "%-15s %-15s %4s [%5s/%-5s] %s\n", "State", "IP", "Time", "Recv", "Total", "URL"); for (r = requests; r && size > l; r = r->next) { if (r->request) { l += SNPRINTF(buffer + l, size - l, "%-15s %-15s %4s [%5s/%-5s] %s%s%s%s\n", states[r->state], ip2string(r->request->ip), humanPeriod(now - r->start), humanSize(r->request->received), r->request->length < 0 ? "?" : humanSize(r->request->length), r->request->host?:"-", r->request->uri, r->request->query?"?":"", r->request->query?:""); } else { l += SNPRINTF(buffer + l, size - l, "%-15s %-15s %4s\n", states[r->state], "?", humanPeriod(now - r->start)); } } } return l; } int fillProxyHeaders(ProxyRequest* pr, char* buffer, int size) { int l = 0; char* b = buffer; char location[MAX_URI]; Request* r = pr->request; size -= 2; debug("%d> fillProxyHeaders()", pr->fd); l = catstr(location, MAX_URI, pr->proxy->location, r->alias->exactLocation ? "" : escape(r->path, NO), NULL); if (!l) return 0; if (r->params) l += catstr(location + l, MAX_URI - l, ";", r->params, NULL); if (r->query) l += catstr(location + l, MAX_URI - l, "?", r->query, NULL); l = catstr(b, size, methods[r->method], " ", location, " HTTP/1.0" RN, "X-Real-IP: ", ip2string(r->ip), RN, NULL); if (!l) return 0; if (r->host) l += catstr(b + l, size - l, "Host: ", r->host, RN, NULL); if (r->userAgent) l += catstr(b + l, size - l, "User-Agent: ", r->userAgent, RN, NULL); if (r->referrer) l += catstr(b + l, size - l, "Referer: ", r->referrer, RN, NULL); if (r->authorization) l += catstr(b + l, size - l, "Authorization: ", r->authorization, RN, NULL); if (r->cookie) l += catstr(b + l, size - l, "Cookie: ", r->cookie, RN, NULL); if (r->inContentType) l += catstr(b + l, size - l, "Content-Type: ", r->inContentType, RN, NULL); if (r->inContentLength >= 0) l += catstr(b + l, size - l, "Content-Length: ", itoa(r->inContentLength), RN, NULL); if (r->rangeFrom >= 0 || r->rangeTo >= 0) { if (r->rangeFrom >= 0) { if (r->rangeTo >= 0) l += catstr(b + l, size - l, "Range: bytes=", itoa(r->rangeFrom), "-", itoa(r->rangeTo), RN, NULL); else l += catstr(b + l, size - l, "Range: bytes=", itoa(r->rangeFrom), "-", RN, NULL); } else { l += catstr(b + l, size - l, "Range: bytes=-", itoa(r->rangeTo), RN, NULL); } } if (r->ims > 0) l += catstr(b + l, size - l, "If-Modified-Since: ", time2str(r->ims, HTTP_TIME), RN, NULL); if (r->iums > 0) l += catstr(b + l, size - l, "If-Unmodified-Since: ", time2str(r->iums, HTTP_TIME), RN, NULL); if (r->ifRange > 0) l += catstr(b + l, size - l, "If-Range: ", time2str(r->ifRange, HTTP_TIME), RN, NULL); if (r->forwardedFor) l += catstr(b + l, size - l, "X-Forwarded-For: ", ip2string(r->forwardedFor), RN, NULL); #if SUPPORT_COUNTRY l += catstr(b + l, size - l, "X-Country: ", id2code(countries, r->country < 0 ? (r->country = ip2id(countries, r->ip)) : r->country), RN, NULL); l += catstr(b + l, size - l, "X-Region: ", id2code(countries, r->country), "/", (r->country == 182) ? id2code(regions, ip2id(regions, r->ip)) : "", RN, NULL); #endif #if MAX_OTHER_HEADERS { int i; for (i = 0; r->inHeaders[i]; i++) { int n = strlen(r->inHeaders[i]); if (l + n + 2 < size) { memcpy(b + l, r->inHeaders[i], n); l += n; memcpy(b + l, RN, 2); l += 2; } } } #endif memcpy(b + l, RN, 2); return l + 2; } int handleProxyRequest(Request* request) { int fd; Proxy* pr = request->alias->proxy; Proxy* startProxy = pr; ProxyRequest* r; int connected = NO; struct sockaddr_in address; #if SUPPORT_ANTIFLOOD if (!request->trusted && request->alias->antiflood && !addIPRequest(request->ip)) return 503; #endif if (!pr) { message(ERROR, "no backend defined for %s%s", request->host?:"-", request->uri?:"-"); failed++; return 502; } backendsLoop: do { if (pr->backend->state) break; if (pr->backend->last + request->alias->timeout < now) break; pr = pr->next; } while (pr != startProxy); if (!(*(pr->location) || *(request->path))) { char location[MAX_URI]; int n = catstr(location, MAX_URI, "http://", request->host?:request->server->name, request->uri, "/", NULL); if (!n) return 414; if (request->params) n += catstr(location + n, MAX_URI - n, ";", request->params, NULL); if (request->query) n += catstr(location + n, MAX_URI - n, "?", request->query, NULL); if (!(request->location = addStringToBuffer(request, location, n + 1))) return 414; return 301; } if (pr->backend->nRequests == config->proxyConnections) { message(ERROR, "connection to %s:%d failed: too many requests", inet_ntoa(pr->backend->addr), pr->backend->port); if (pr->backend->state) { message(MESSAGE, "backend %s:%d is DOWN", inet_ntoa(pr->backend->addr), pr->backend->port); pr->backend->state = FAILED; pr->backend->last = now; goto backendsLoop; } failed++; return 503; } if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { message(SYS|ERROR, "cannot create socket"); failed++; return 503; } debug("%d> handleProxyRequest(%s %s%s): %d", request->fd, ip2string(request->ip), request->host?:"-", request->uri?:"-", fd); if (fd >= fdLimit) { if (close(fd)) message(SYS|ERROR, "cannot close proxy fd"); message(ERROR, "connection to %s:%d failed: too many fds", inet_ntoa(pr->backend->addr), pr->backend->port); failed++; return 503; } // fcntl(fd, F_SETFD, FD_CLOEXEC); #if MODEL == MODEL_RTSIG fcntl(fd, F_SETSIG, rtSignal); fcntl(fd, F_SETOWN, myPID); fcntl(fd, F_SETFL, O_NONBLOCK|O_ASYNC); #else fcntl(fd, F_SETFL, O_NONBLOCK); #endif memset(&(address), 0, sizeof(address)); address.sin_addr = pr->backend->addr; address.sin_port = htons(pr->backend->port); address.sin_family = AF_INET; if (connect(fd, (struct sockaddr*) &address, sizeof(address)) < 0) { if (errno != EINPROGRESS) { message(SYS|ERROR, "cannot connect to %s:%d", inet_ntoa(pr->backend->addr), pr->backend->port); if (close(fd)) message(SYS|ERROR, "cannot close proxy fd"); if (pr->backend->state) { message(MESSAGE, "backend %s:%d is DOWN", inet_ntoa(pr->backend->addr), pr->backend->port); pr->backend->state = FAILED; pr->backend->last = now; goto backendsLoop; } failed++; return 503; } } else { connected = YES; } if (!(r = freeRequests)) { debug("handleProxyRequest: allocating new proxyRequests block"); freeRequests = CALLOC(REQUESTS_STEP * sizeof(ProxyRequest)); if (!freeRequests) { message(ERROR, "connection to %s:%d failed: not enough memory", inet_ntoa(pr->backend->addr), pr->backend->port); failed++; return 503; } for (r = freeRequests; r < freeRequests + REQUESTS_STEP - 1; r++) r->next = r + 1; r = requestsBlocks[nRequestsBlocks++] = freeRequests; } freeRequests = freeRequests->next; if ((r->next = requests)) r->next->prev = r; else tail = r; requests = r; r->prev = NULL; r->type = PROXY_REQUEST; r->state = CONNECT; r->fd = fd; r->length = -1; r->last = now + request->alias->timeout; r->start = now; r->rangeFrom = r->rangeTo = -1; r->request = request; r->proxy = pr; if (fd > maxFD) maxFD = fd; if (++nRequests > maxRequests) maxRequests = nRequests; if (++(pr->backend->nRequests) > pr->backend->maxRequests) pr->backend->maxRequests = pr->backend->nRequests; request->state = HANDLING; request->proxy = r; fds[fd].proxy = r; request->alias->proxy = pr->next; request->contentType = NULL; request->contentEnc = NULL; pr->backend->last = now; pr->backend->requests++; if (connected) { debug("%d> immediate connect", fd); proxy(r); } else { #if MODEL == MODEL_POLL r->watchWrite = YES; #elif MODEL == MODEL_KEVENT r->watchWrite = YES; watchEvent(fd, EVFILT_WRITE, EV_ADD, 0, 0); //strange modern FreeBSD behaivor, don't use oneshot //watchEvent(fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0); #endif } return 0; } #define COPY_FIELD(name) if (r->name >= start && r->name < end) r->name += diff Buffer* copyResponseBuffer(ProxyRequest* r, Buffer* src, int increase) { int diff; char* start = src->buffer; char* end = src->buffer + src->count; if (!(r->buffer = newBuffer(increase ? src->size << 1 : src->size))) return NULL; memcpy(r->buffer->buffer, src->buffer, r->buffer->count = src->count); r->buffer->start = src->start; diff = r->buffer->buffer - src->buffer; COPY_FIELD(contentType); COPY_FIELD(contentEnc); COPY_FIELD(location); COPY_FIELD(auth); COPY_FIELD(setCookie); #if MAX_OTHER_HEADERS { int i; for (i = 0; r->outHeaders[i]; i++) COPY_FIELD(outHeaders[i]); } #endif return r->buffer; } #define COPY_TO_REQUEST(name) if(r->name) request->name = addStringToBuffer(request, r->name, strlen(r->name) + 1) void copyHeader(ProxyRequest* r) { Request* request = r->request; request->modified = r->modified; request->expires = r->expires; request->length = r->length; request->acceptRanges = r->acceptRanges; request->status = r->status; request->rangeFrom = r->rangeFrom; request->rangeTo = r->rangeTo; request->rangeTotal = r->rangeTotal; COPY_TO_REQUEST(contentType); COPY_TO_REQUEST(contentEnc); COPY_TO_REQUEST(location); COPY_TO_REQUEST(auth); COPY_TO_REQUEST(setCookie); #if MAX_OTHER_HEADERS { int i; for (i = 0; r->outHeaders[i]; i++) COPY_TO_REQUEST(outHeaders[i]); } #endif } char* getResponseLine(ProxyRequest* r) { if (!r || r->fd < 0) { message(ERROR, "getLine for bad request: %s", r ? "not connected" : "NULL"); return NULL; } while (1) { int cnt; if (r->buffer->count - r->buffer->start > 0) { char* line = r->buffer->buffer + r->buffer->start; char* eol; if ((eol = memchr(line, '\n', r->buffer->count - r->buffer->start))) { r->buffer->start = eol + 1 - r->buffer->buffer; *eol-- = 0; while (eol >= line && isSpace(*eol)) *eol-- = 0; return line; } } if (r->buffer->count == r->buffer->size) { if (r->buffer->size < MAXIMUM_BUFFER_SIZE) { Buffer* old = r->buffer; if (!copyResponseBuffer(r, r->buffer, YES)) return NULL; if (old != proxyBuffer) freeBuffer(old); } else { message(ERROR, "proxy buffer overflow while reading from %s:%d", inet_ntoa(r->proxy->backend->addr), r->proxy->backend->port); return NULL; } } errno = 0; cnt = read(r->fd, r->buffer->buffer + r->buffer->count, r->buffer->size - r->buffer->count); debug("%d> reading %d bytes (%s)", r->fd, cnt, errno?strerror(errno):"OK"); if (cnt <= 0) { if (errno == EWOULDBLOCK) return NULL; if (cnt) message(SYS|ERROR, "cannot read from %s:%d", inet_ntoa(r->proxy->backend->addr), r->proxy->backend->port); else message(ERROR, "cannot read from %s:%d: %s", inet_ntoa(r->proxy->backend->addr), r->proxy->backend->port, "connection is closed"); return NULL; } r->buffer->count += cnt; } } int parseResponseLine(char* line, ProxyRequest* r) { char *proto, *major, *minor, *status; proto = line; debug("parse: %s", line); if (!(major = strchr(proto, '/'))) return 502; *major++ = 0; if (!(minor = strchr(major, '.'))) return 502; *minor++ = 0; if (*(int*)proto != 0x50545448) // "HTTP" return 502; if (!(status = strchr(minor, ' '))) return 502; *status++ = 0; while(*status==' ') status++; r->major = atol(major); if (r->major < 1) r->major = 1; r->minor = atol(minor); if (r->minor < 0) r->minor = 0; r->status = atol(status); if (r->status < 1) r->status = 502; if (r->major > 1) return 502; r->keepalive = r->minor; return 0; } int parseResponseHeaderLine(char* line, ProxyRequest* r) { char* value = strchr(line, ':'); if (!value) return 502; *value++ = 0; while (isSpace(*value)) value++; if (!*value) return 0; strtolower(line); if (!strcmp(line, "content-type")) { r->contentType = strtolower(value); } else if (!strcmp(line, "content-encoding")) { r->contentEnc = strtolower(value); } else if (!strcmp(line, "content-length")) { r->length = atol(value); } else if (!strcmp(line, "accept-ranges")) { r->acceptRanges = !strcmp(value, "bytes"); } else if (!strcmp(line, "content-range")) { int from, to, total; char* end; if (strncmp(value, "bytes ", strlen("bytes "))) return 0; value += strlen("bytes "); errno = 0; from = strtol(value, &end, 10); if (end == value || *end++ != '-' || errno) return 0; to = strtol(value = end, &end, 10); if (end == value || *end++ != '/' || errno) return 0; total = strtol(value = end, &end, 10); if (end == value || *end != 0 || errno) return 0; r->rangeFrom = from; r->rangeTo = to; r->rangeTotal = total; } else if (!strcmp(line, "last-modified")) { r->modified = timerfc(strtolower(value)); if (r->modified == (time_t) -1) r->modified = 0; } else if (!strcmp(line, "expires")) { r->expires = timerfc(strtolower(value)); if (r->expires < 1) r->expires = EXPIRES_PAST; } else if (!strcmp(line, "location")) { if (r->request->host && r->proxy) { char* withPort = fmt("http://%s:%d/", r->request->host, r->proxy->backend->port); int n = strlen(withPort); if (!strncmp(value, withPort, n)) strcpy(value + 7 + strlen(r->request->host), value + n - 1); } r->location = value; } else if (!r->setCookie && !strcmp(line, "set-cookie")) { r->setCookie = value; } else if (!strcmp(line, "www-authenticate")) { r->auth = value; } else if (!strcmp(line, "connection")) { strtolower(value); if (r->minor) { if (!strcmp(value, "close")) r->keepalive = NO; } else { if (!strcasecmp(value, "keep-alive")) r->keepalive = YES; } } else if (!strcmp(line, "server")) { //ignored; } else if (!strcmp(line, "date")) { //ignored; } else { #if MAX_OTHER_HEADERS int i = 0; for (i = 0; i < MAX_OTHER_HEADERS; i++) { if (!r->outHeaders[i]) { line[strlen(line)] = ':'; r->outHeaders[i] = line; break; } } #endif } return 0; } void cleanupProxy() { while(tail && now > tail->last) { debug("%d> timeout in %s", tail->fd, states[tail->state]); if (tail->request) { message(MESSAGE, "proxy timeout in %s for %s (%s%s%s%s)", states[tail->state], ip2string(tail->request->ip), tail->request->host?:"?", tail->request->uri, tail->request->query?"?":"", tail->request->query?:""); } else { message(MESSAGE, "proxy timeout in %s", states[tail->state]); } if (tail->state == CONNECT && tail->proxy->backend->state) { tail->proxy->backend->state = FAILED; message(MESSAGE, "backend %s:%d is DOWN", inet_ntoa(tail->proxy->backend->addr), tail->proxy->backend->port); } tail->status = 504; tail->state = FAULT; proxy(tail); } #if SUPPORT_ELAPSED if (elapsed->count) { int fd = open("data/httpd/elapsed", O_RDWR|O_CREAT, 0644); if (fd < 0) { message(SYS|ERROR, "cannot open %s", "data/httpd/elapsed"); } else { char buf[LARGE_TEXT]; int n; Pair* p; flock(fd, LOCK_EX); if ((n = read(fd, buf, LARGE_TEXT - 1)) > 0) { char* end; char* s = buf; buf[n] = 0; do { char name[MEDIUM_STR]; char count[MEDIUM_STR]; char total[MEDIUM_STR]; Elapsed *e; if ((end = strchr(s, '\n'))) *end++ = 0; split(s, " ", MEDIUM_STR, name, count, total, NULL); if ((e = getPair(elapsed, name))) { e->count += atoi(count); e->elapsed += atoi(total); } } while ((s = end)); lseek(fd, 0, SEEK_SET); } if (elapsed->count > 1000) { ftruncate(fd, 0); freePairs(elapsed); elapsed = newPairs(256, (void*)freeElapsed); } n = 0; startListPairs(elapsed); while ((p = nextPair(elapsed))) { Elapsed* e = p->value; n += catstr(buf + n, LARGE_TEXT - n, p->name, " ", itoa(e->count), " ", itoa(e->elapsed), "\n", NULL); e->count = e->elapsed = 0; } write(fd, buf, n); close(fd); } } #endif } ProxyRequest* flushResponse(Request* r) { if (r->state == HANDLING) { #if SUPPORT_CACHE if (r->alias->cache && r->status == 200 && r->alias->expires >= 0 && (r->expires == 0 || r->expires > now) && r->length >= 0 && r->length <= r->alias->cache->maxLength) { r->cached = newCacheEntry(r); } #endif r->state = PREPARE_HEADER; http(r); } else if (r->state == WAIT_DATA) { http(r); } else if (r->state == WRITE_HEADER) { http(r); } else if (r->state != WRITE_CONTENT) { message(MESSAGE, "strange state of request: %d", r->state); } currentRequest = NULL; return r->proxy; } ProxyRequest* closeProxy(ProxyRequest* r) { if (!r || !r->state) return NULL; debug("%d> closeProxy()", r->fd); #if SUPPORT_ELAPSED if (r->request && r->request->alias->elapsed && r->request->path && r->exactStart.tv_sec) { struct timeval finish; Elapsed* e; int l = strlen(r->request->path); char* key = l > 4 && strcmp(r->request->path + l - 4, ".php") == 0 ? "php" : "html"; gettimeofday(&finish, NULL); l = ((finish.tv_sec - r->exactStart.tv_sec) * 1000000 + (finish.tv_usec - r->exactStart.tv_usec))/1000; if (l < 0) { message(MESSAGE, "strange: negative elapsed: %d.%d > %d.%d", r->exactStart.tv_sec, r->exactStart.tv_usec, finish.tv_sec, finish.tv_usec); l = 0; } if (!(e = getPair(elapsed, key))) { e = CALLOC(sizeof(Elapsed)); setPair(elapsed, key, e); } e->count++; e->elapsed += l; } #endif if (r->request) r->request->proxy = NULL; if (r->fd >= 0) { fds[r->fd].proxy = NULL; if (close(r->fd) < 0) message(SYS|ERROR, "cannot close proxy fd %d", r->fd); } if (r->buffer && r->buffer != proxyBuffer) freeBuffer(r->buffer); if (r->next) r->next->prev = r->prev; else tail = r->prev; if (r->prev) r->prev->next = r->next; else requests = r->next; r->proxy->backend->nRequests--; memset(r, 0, sizeof(ProxyRequest)); r->next = freeRequests; freeRequests = r; nRequests--; return NULL; } void proxy(ProxyRequest* r) { Request* request = r->request; char* line; char watchRead = 0; char watchWrite = 0; if (!r->buffer) { r->buffer = proxyBuffer; proxyBuffer->start = proxyBuffer->count = 0; } while(1) { switch (r->state) { case UNDEF: debug("%d> proxy: %s", r->fd, states[r->state]); message(ERROR, "strange: proxy in UNDEF state (%d)", r->fd); goto end; case CONNECT: { int err; socklen_t l; debug("%d> proxy: %s", r->fd, states[r->state]); err = 0; l = sizeof(err); if (getsockopt(r->fd, SOL_SOCKET, SO_ERROR, &err, &l) < 0 || err) { if (err) errno = err; message(SYS|ERROR, "cannot connect to %s:%d", inet_ntoa(r->proxy->backend->addr), r->proxy->backend->port); r->state = FAULT; if (r->proxy->backend->state) { message(MESSAGE, "backend %s:%d is DOWN", inet_ntoa(r->proxy->backend->addr), r->proxy->backend->port); r->proxy->backend->state = FAILED; } break; } r->state = WRITE_HEADER_BE; watchWrite = 1; //break; } case WRITE_HEADER_BE: { int l; char header[LARGE_TEXT]; int count; char writePost = NO; debug("%d> proxy: %s", r->fd, states[r->state]); l = fillProxyHeaders(r, header, LARGE_TEXT); if (!l) { message(ERROR, "too long proxy request: %s", r->request->uri); r->status = 414; r->state = FAULT; break; } if (request->inContentLength > 0) { if (!request->postBuffer) { message(ERROR, "wow! post without postbuffer"); } else if (request->inContentLength <= LARGE_TEXT - l) { memcpy(header + l, request->postBuffer->buffer, request->inContentLength); l += request->inContentLength; } else { writePost = YES; } } l -= r->offset; #if SUPPORT_ELAPSED if (request->alias->elapsed) gettimeofday(&r->exactStart, NULL); #endif if ((count = write(r->fd, header + r->offset, l)) <= 0) { if (errno == EWOULDBLOCK) goto end; if (errno == ENOTCONN) { debug("do retry. cannot write to backend %s:%d", inet_ntoa(r->proxy->backend->addr), r->proxy->backend->port); goto end; } message(SYS|ERROR, "cannot write to backend %s:%d", inet_ntoa(r->proxy->backend->addr), r->proxy->backend->port); r->state = FAULT; break; } if (count < l) { debug("incomplete header write to backend (%d of %d bytes)", count, l); r->offset += count; goto end; } if (writePost) { r->state = WRITE_POST; break; } r->state = READ_RESPONSE; watchWrite = -1; watchRead = 1; goto end; } case WRITE_POST: { int cnt; cnt = write(r->fd, request->postBuffer->buffer + r->posted, request->inContentLength - r->posted); debug("%d> writing %d bytes (%s)", r->fd, cnt, errno?strerror(errno):"OK"); if (cnt <= 0) { if (errno == EWOULDBLOCK) goto end; if (cnt < 0) message(SYS|ERROR, "cannot write to backend %s:%d", inet_ntoa(r->proxy->backend->addr), r->proxy->backend->port); r->state = FAULT; break; } r->posted += cnt; if (r->posted == request->inContentLength) { r->state = READ_RESPONSE; watchWrite = -1; watchRead = 1; goto end; } break; } case READ_RESPONSE: debug("%d> proxy: %s", r->fd, states[r->state]); if (!(line = getResponseLine(r))) { if (errno == EWOULDBLOCK) goto end; message(ERROR, "strange: cannot get response line"); r->state = FAULT; break; } if (parseResponseLine(line, r)) { message(ERROR, "strange: cannot parse response line %s", line); r->status = 502; r->state = FAULT; break; } r->state = READ_RESPONSE_H; //break; case READ_RESPONSE_H: debug("%d> proxy: %s", r->fd, states[r->state]); while (1) { if (!(line = getResponseLine(r))) { if (errno == EWOULDBLOCK) goto end; message(ERROR, "strange: cannot get response header"); r->state = FAULT; break; } if (!*line) { r->state = COPY_HEADER; break; } if (parseResponseHeaderLine(line, r)) { message(ERROR, "strange: cannot parse response header %s", line); r->status = 502; r->state = FAULT; break; } } break; case COPY_HEADER: { debug("%d> proxy: %s", r->fd, states[r->state]); copyHeader(r); if (!r->proxy->backend->state && r->proxy->backend->nRequests <= config->proxyConnections / 2) { message(MESSAGE, "backend %s:%d is UP", inet_ntoa(r->proxy->backend->addr), r->proxy->backend->port); r->proxy->backend->state = OK; } if (r->length == 0 || r->status == 304 || request->method == HEAD) { if (r->buffer != proxyBuffer) freeBuffer(r->buffer); r->buffer = NULL; r->state = FINISHED; break; } request->received = r->buffer->count - r->buffer->start; if (r->length > 0 && request->received >= r->length) { request->received = r->length; r->state = FINISHED; } else { r->state = READ_BODY; } request->contentSize = r->length < 0 ? BLOCK_SIZE : r->length < config->maxBuffer ? r->length : config->maxBuffer; if (request->contentSize < request->received) request->contentSize = request->received; request->content = CALLOC(request->contentSize); if (request->received) memcpy(request->content, r->buffer->buffer + r->buffer->start, request->received); if (r->buffer != proxyBuffer) freeBuffer(r->buffer); r->buffer = NULL; break; } case READ_BODY: debug("%d> proxy: %s", r->fd, states[r->state]); while (1) { int count = request->contentSize + request->offset - request->received; debug("READ_BODY: size: %d, off: %d, r: %d, s: %d", request->contentSize, request->offset, request->received, request->sent); if (count == 0) { if (request->contentSize + BLOCK_SIZE <= config->maxBuffer) { request->content = REALLOC(request->content, request->contentSize += BLOCK_SIZE); count += BLOCK_SIZE; } else { if (flushResponse(request) != r) { if (!r->state) return; message(MESSAGE, "strange: flushResponse != r and r->state in READ_BODY"); r->state = FAULT; break; } r->state = WAIT_MEMORY; watchRead = -1; break; } } if ((count = read(r->fd, request->content + request->received - request->offset, count)) < 0) { debug("%d> cannot read: %s", r->fd, errno?strerror(errno):"OK"); if (errno == EWOULDBLOCK) { if (now - r->start > 1 && flushResponse(request) != r) { if (!r->state) return; message(MESSAGE, "strange: flushResponse != r and r->state in READ_BODY"); r->state = FAULT; break; } goto end; } else { message(SYS|ERROR, "cannot read from %s:%d", inet_ntoa(r->proxy->backend->addr), r->proxy->backend->port); r->state = request->received ? FINISHED : FAULT; } break; } debug("%d> reading %d bytes", r->fd, count); request->received += count; if (count == 0 || request->received == request->length) { if (request->method != HEAD) request->length = request->received; r->state = FINISHED; break; } } break; case WAIT_MEMORY: debug("%d> proxy: %s", r->fd, states[r->state]); if (request->contentSize + request->offset - request->received <= 0) goto end; r->state = READ_BODY; watchRead = 1; break; case FINISHED: debug("%d> proxy: %s", r->fd, states[r->state]); r = closeProxy(r); flushResponse(request); return; case FAULT: debug("%d> proxy: %s", r->fd, states[r->state]); if ((request = r->request) && request->proxy) { request->status = r->status >= 400 ? r->status : 503; if (request->contentSize) { request->content = FREE(request->content); request->contentSize = 0; request->state = CLOSE; } else { request->state = request->state == HANDLING ? NOT_OK : CLOSE; } http(request); } else { r = closeProxy(r); } currentRequest = NULL; failed++; return; } } end: if (r && r->state) { #if MODEL == MODEL_KEVENT if (watchRead > 0) { if (!r->watchRead) { watchEvent(r->fd, EVFILT_READ, EV_ADD, 0, 0); r->watchRead = YES; } } else if (watchRead < 0) { if (r->watchRead) { watchEvent(r->fd, EVFILT_READ, EV_DELETE, 0, 0); r->watchRead = NO; } } if (watchWrite > 0) { if (!r->watchWrite) { watchEvent(r->fd, EVFILT_WRITE, EV_ADD, 0, 0); r->watchWrite = YES; } } else if (watchWrite < 0) { if (r->watchWrite) { watchEvent(r->fd, EVFILT_WRITE, EV_DELETE, 0, 0); r->watchWrite = NO; } } #elif MODEL == MODEL_POLL if (watchRead) r->watchRead = watchRead > 0 ? YES : NO; if (watchWrite) r->watchWrite = watchWrite > 0 ? YES : NO; #endif //MODEL if (r->buffer == proxyBuffer && r->state <= COPY_HEADER) { if (proxyBuffer->count) copyResponseBuffer(r, proxyBuffer, NO); else r->buffer = NULL; } if (r->request && r->request->alias) { r->last = now + request->alias->timeout; if (requests != r) { if (r->next) r->next->prev = r->prev; else tail = r->prev; r->prev->next = r->next; r->next = requests; r->prev = NULL; requests->prev = r; requests = r; } } } } #if MODEL == MODEL_POLL int fillProxyPoll(struct pollfd* pollfds) { ProxyRequest* r; struct pollfd* pollfd; if (!nRequests) return 0; for (pollfd = pollfds, r = requests; r; r = r->next) { short ev = r->watchRead ? POLLIN : r->watchWrite ? POLLOUT : 0; if (ev) { pollfd->events = ev; pollfd->fd = r->fd; pollfd++; } } return pollfd - pollfds; } #endif #endif /* SUPPORT_PROXY */