Mr Dk.'s BlogMr Dk.'s Blog
  • 🦆 About Me
  • ⛏️ Technology Stack
  • 🔗 Links
  • 🗒️ About Blog
  • Algorithm
  • C++
  • Compiler
  • Cryptography
  • DevOps
  • Docker
  • Git
  • Java
  • Linux
  • MS Office
  • MySQL
  • Network
  • Operating System
  • Performance
  • PostgreSQL
  • Productivity
  • Solidity
  • Vue.js
  • Web
  • Wireless
  • 🐧 How Linux Works (notes)
  • 🐧 Linux Kernel Comments (notes)
  • 🐧 Linux Kernel Development (notes)
  • 🐤 μc/OS-II Source Code (notes)
  • ☕ Understanding the JVM (notes)
  • ⛸️ Redis Implementation (notes)
  • 🗜️ Understanding Nginx (notes)
  • ⚙️ Netty in Action (notes)
  • ☁️ Spring Microservices (notes)
  • ⚒️ The Annotated STL Sources (notes)
  • ☕ Java Development Kit 8
GitHub
  • 🦆 About Me
  • ⛏️ Technology Stack
  • 🔗 Links
  • 🗒️ About Blog
  • Algorithm
  • C++
  • Compiler
  • Cryptography
  • DevOps
  • Docker
  • Git
  • Java
  • Linux
  • MS Office
  • MySQL
  • Network
  • Operating System
  • Performance
  • PostgreSQL
  • Productivity
  • Solidity
  • Vue.js
  • Web
  • Wireless
  • 🐧 How Linux Works (notes)
  • 🐧 Linux Kernel Comments (notes)
  • 🐧 Linux Kernel Development (notes)
  • 🐤 μc/OS-II Source Code (notes)
  • ☕ Understanding the JVM (notes)
  • ⛸️ Redis Implementation (notes)
  • 🗜️ Understanding Nginx (notes)
  • ⚙️ Netty in Action (notes)
  • ☁️ Spring Microservices (notes)
  • ⚒️ The Annotated STL Sources (notes)
  • ☕ Java Development Kit 8
GitHub
  • 🗜️ Understanding Nginx
    • Part 1 - Nginx 能帮我们做什么

      • Chapter 1 - 研究 Nginx 前的准备工作
      • Chapter 2 - Nginx 的配置
    • Part 2 - 如何编写 HTTP 模块

      • Chapter 7 - Nginx 提供的高级数据结构
    • Part 3 - 深入 Nginx

      • Chapter 8.1-8.2 - Nginx 基础架构
      • Chapter 8.3-8.6 - Nginx 框架核心结构体
      • Chapter 8.7 - Nginx 内存池
      • Chapter 9.1-9.3 - 事件处理框架
      • Chapter 9.4-9.6 - 事件驱动模块与 EPOLL
      • Chapter 9.7-9.8 - 定时器事件与事件驱动框架处理流程
      • Chapter 9.9-9.10 - 文件的异步 I/O 与 TCP
      • Chapter 10.1-10.2 - HTTP 框架的配置解析与合并
      • Chapter 10.3-10.7 - HTTP 阶段划分与框架初始化
      • Chapter 11 - HTTP 框架的执行流程
      • Chapter 12.1-12.4 - Upstream 与上游服务器通信
      • Chapter 12.5 - 接收上游服务器响应并处理
      • Chapter 12.6 - 12.9 - 转发响应并结束请求
      • Chapter 13.1-13.5 - 邮件代理模块 - 认证服务器
      • Chapter 13.6-13.7 - 邮件代理模块 - 上游
      • Chapter 14 - 进程间通信机制
      • Chapter 16 - slab 共享内存

Chapter 12.5 - 接收上游服务器响应并处理

Created by : Mr Dk.

2020 / 08 / 01 21:21

Nanjing, Jiangsu, China


12.5 接收上游服务器的响应头部

12.5.1 应用层协议的两段划分方式

Nginx 设置了 ngx_http_upstream_process_header() 函数处理上游服务器的响应。Upstream 机制支持上游服务器使用任何基于 TCP 的应用层协议 - TCP 其实就是 有顺序的数据流。实际上,上游服务器的响应可大可小,如果在内存中保存完整的响应,可能会引发内存不够的问题;如果在磁盘文件中接收响应,又会带来大量 I/O。

为了解决这个问题,应用层协议通常会将请求划分为 header 和 body。Header 中抽象了不同协议报文之间的共同部分,不同协议数据包的 header 都具有相同的格式。服务器必须解析 header,而 body 则不做格式上的要求,也不一定解析。Header 的长度要么是固定的,要么在一个指定的范围内 - 所以在处理 header 时,开辟的内存只需要容纳 header 的长度即可。

Header 和 body 存储什么样的信息取决于应用层协议,upstream 机制并不关心。Upstream 机制已经抽象出了 process_header() 函数,由具体的 HTTP 模块实现 header 解析逻辑。如果 HTTP 模块的目的是反向代理,那么可以将解析出的 header 适配到响应给下游客户端的 header 中。

而 body 的内容较为简单,HTTP 模块一般都不解析 body。Upstream 机制抽象了三种形式的 body 处理方式:

  1. 不转发响应
  2. 下游网速优先的响应 (使用固定的缓冲区存放上游响应,因为上游响应会很快被下游取走)
  3. 上游网速优先的响应 (使用较大内存或磁盘文件来缓存上游响应,因为可能会有堆积)

12.5.3 接收响应头部的流程

process_header 是 ngx_http_upstream_t 结构体中定义的函数指针。Upstream 机制不涉及应用层协议,由使用 upstream 机制的模块负责解析应用层协议。HTTP 模块提供的回调函数是 ngx_http_upstream_process_header()。这个函数也被设计为 ngx_http_upstream_t 结构体中的读事件回调函数,会被反复调用

static void
ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ssize_t            n;
    ngx_int_t          rc;
    ngx_connection_t  *c;

    // 获取 Nginx 与上游服务器的连接
    c = u->peer.connection;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http upstream process header");

    c->log->action = "reading response header from upstream";

    // 检测读事件是否超时
    if (c->read->timedout) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
        return;
    }

    // 还没有请求发送到上游服务器,就收到了响应,不对劲
    if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    // 接收响应头的缓冲区还未分配内存
    if (u->buffer.start == NULL) {
        // 分配缓冲区内存
        u->buffer.start = ngx_palloc(r->pool, u->conf->buffer_size);
        if (u->buffer.start == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        u->buffer.pos = u->buffer.start;
        u->buffer.last = u->buffer.start;
        u->buffer.end = u->buffer.start + u->conf->buffer_size;
        u->buffer.temporary = 1;

        u->buffer.tag = u->output.tag;

        if (ngx_list_init(&u->headers_in.headers, r->pool, 8,
                          sizeof(ngx_table_elt_t))
            != NGX_OK)
        {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        if (ngx_list_init(&u->headers_in.trailers, r->pool, 2,
                          sizeof(ngx_table_elt_t))
            != NGX_OK)
        {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

#if (NGX_HTTP_CACHE)

        if (r->cache) {
            u->buffer.pos += r->cache->header_start;
            u->buffer.last = u->buffer.pos;
        }
#endif
    }

    for ( ;; ) {

        // 调用 recv() 读取上游响应
        n = c->recv(c, u->buffer.last, u->buffer.end - u->buffer.last);

        // 接下来还需要继续接收响应
        if (n == NGX_AGAIN) {
#if 0
            // 将读事件添加到定时器中
            ngx_add_timer(rev, u->read_timeout);
#endif
            // 将读事件添加到事件驱动模块中
            if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
                ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }

            // 结束函数
            return;
        }

        // 上游服务器关闭连接
        if (n == 0) {
            ngx_log_error(NGX_LOG_ERR, c->log, 0,
                          "upstream prematurely closed connection");
        }
        // 发生错误
        if (n == NGX_ERROR || n == 0) {
            ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
            return;
        }

        // 统计接收到的响应长度
        u->state->bytes_received += n;
        // 缓冲区地址后移
        u->buffer.last += n;

#if 0
        u->valid_header_in = 0;

        u->peer.cached = 0;
#endif
        // 解析 header
        rc = u->process_header(r);

        // Header 还没有接收完整
        if (rc == NGX_AGAIN) {
            // 检查缓冲区是否耗尽
            if (u->buffer.last == u->buffer.end) {
                ngx_log_error(NGX_LOG_ERR, c->log, 0,
                              "upstream sent too big header");

                ngx_http_upstream_next(r, u,
                                       NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
                return;
            }

            // 等待下一次触发
            continue;
        }

        break;
    }

    // Header 不合法
    if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
        return;
    }

    // Header 错误
    if (rc == NGX_ERROR) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    /* rc == NGX_OK */

    u->state->header_time = ngx_current_msec - u->start_time;

    if (u->headers_in.status_n >= NGX_HTTP_SPECIAL_RESPONSE) {

        if (ngx_http_upstream_test_next(r, u) == NGX_OK) {
            return;
        }

        if (ngx_http_upstream_intercept_errors(r, u) == NGX_OK) {
            return;
        }
    }

    // 处理已经解析出的 headers,会将已经解析出的 headers 设置到 request 结构体的 headers_out 中
    if (ngx_http_upstream_process_headers(r, u) != NGX_OK) {
        return;
    }

    // 开始向客户端转发响应
    ngx_http_upstream_send_response(r, u);
}

12.5.4 响应体的处理

转发响应由 ngx_http_upstream_send_response() 进行。首先会把 headers 发送出去。之后在函数中,会根据配置项中的 buffering 标志来决定是否打开缓存处理 body - 打开缓存意味着上游网速更快 (比如 Nginx 与上游服务器在同一个内网中),从而进行是否需要转发响应的判断。

如果不打开缓存:

if (!u->buffering) {

#if (NGX_HTTP_CACHE)

    if (r->cache) {
        ngx_http_file_cache_free(r->cache, u->pipe->temp_file);
    }

#endif

    // 判断 HTTP 模块是否实现了用于处理 body 的 input_filter 函数
    // 如果没有实现,则给出一个默认的实现函数
    // 如果用户想自己处理 body,会实现这个函数
    if (u->input_filter == NULL) {
        // 处理 body 前的初始化工作
        u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
        // 处理 body 的回调函数
        u->input_filter = ngx_http_upstream_non_buffered_filter;
        // 回调函数的参数,用于被多次调用时保存上下文
        u->input_filter_ctx = r;
    }

    // 设置读事件回调函数 (接收到上游服务器的响应时回调)
    u->read_event_handler = ngx_http_upstream_process_non_buffered_upstream;
    // 设置写事件回调函数 (向下游客户端转发响应时回调)
    r->write_event_handler =
        ngx_http_upstream_process_non_buffered_downstream;

    r->limit_rate = 0;
    r->limit_rate_set = 1;

    // 调用 input_filter 的初始化函数,做好处理 body 前的准备工作
    if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
        ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
        return;
    }

    if (clcf->tcp_nodelay && ngx_tcp_nodelay(c) != NGX_OK) {
        ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
        return;
    }

    // 解析完 header 后,还剩下的字符
    n = u->buffer.last - u->buffer.pos;

    // 如果还有剩余字符,说明已经收到了部分 body
    if (n) {
        u->buffer.last = u->buffer.pos;

        u->state->response_length += n;

        // 调用 input_filter 处理 body
        if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }

        // Body 处理结束,向下游转发 body
        ngx_http_upstream_process_non_buffered_downstream(r);

    } else {
        u->buffer.pos = u->buffer.start;
        u->buffer.last = u->buffer.start;

        if (ngx_http_send_special(r, NGX_HTTP_FLUSH) == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }

        // 如果与上游的连接就绪,那么调用读回调函数
        if (u->peer.connection->read->ready || u->length == 0) {
            ngx_http_upstream_process_non_buffered_upstream(r, u);
        }
    }

    return;
}

如果打开缓存:

// 准备并处理 body
if (u->input_filter_init
    && u->input_filter_init(p->input_ctx) != NGX_OK)
{
    ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
    return;
}

// 设置读事件回调函数
u->read_event_handler = ngx_http_upstream_process_upstream;
// 设置写事件回调函数
r->write_event_handler = ngx_http_upstream_process_downstream;

// 调用读事件回调函数,向上游服务器读取响应
ngx_http_upstream_process_upstream(r, u);

可以看到,对响应中 body 的处理,由 input_filter() 完成。如果没有实现,那么 Nginx 提供了一个默认实现 (以及一个相对应的初始化函数)。默认的初始化函数为 ngx_http_upstream_non_buffered_filter_init():

ngx_int_t
ngx_http_upstream_non_buffered_filter_init(void *data)
{
    return NGX_OK;
}

默认的 body 处理函数为 ngx_http_upstream_non_buffered_filter()。从函数名就可以看出来,这个函数不使用 (文件) 缓存。也就是说,将会试图在内存缓冲区中存放完整的 body - 一旦上游服务器发来的响应超过了内存缓冲区的大小,请求将会出错:

ngx_int_t
ngx_http_upstream_non_buffered_filter(void *data, ssize_t bytes)
{
    ngx_http_request_t  *r = data;

    ngx_buf_t            *b;
    ngx_chain_t          *cl, **ll;
    ngx_http_upstream_t  *u;

    u = r->upstream;

    // 找到 out_bufs 的末尾以便添加新的缓冲区
    for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
        ll = &cl->next;
    }

    // 获得一个空闲的 ngx_buf_t
    cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs);
    if (cl == NULL) {
        return NGX_ERROR;
    }

    // 新的缓冲块加入链表尾
    *ll = cl;

    cl->buf->flush = 1;
    cl->buf->memory = 1;

    b = &u->buffer;

    // Buffer 指针向后移动,保存 body
    cl->buf->pos = b->last;
    b->last += bytes;
    cl->buf->last = b->last;
    cl->buf->tag = u->output.tag;

    if (u->length == -1) {
        return NGX_OK;
    }

    if (bytes > u->length) {

        ngx_log_error(NGX_LOG_WARN, r->connection->log, 0,
                      "upstream sent more data than specified in "
                      "\"Content-Length\" header");

        cl->buf->last = cl->buf->pos + u->length;
        u->length = 0;

        return NGX_OK;
    }

    // 需要接收的 body 长度减少
    u->length -= bytes;

    return NGX_OK;
}

这是处理响应 body 的回调函数。从 Nginx 的视角来看,它的读事件应该是从上游服务器获取响应,写事件应该是向下游客户端转发响应。因此上面函数中设置的读写回调函数才是 input_filter() 函数的调用者。根据 Nginx 根据网络环境处理响应的策略,读写回调函数也被分为了两组:

  • 不打开缓存 - ngx_http_upstream_process_non_buffered_upstream() / ngx_http_upstream_process_non_buffered_downstream()
  • 打开缓存 - ngx_http_upstream_process_upstream() / ngx_http_upstream_process_downstream()
Edit this page on GitHub
Prev
Chapter 12.1-12.4 - Upstream 与上游服务器通信
Next
Chapter 12.6 - 12.9 - 转发响应并结束请求