Mr Dk.'s Blog
PostgreSQL - COPY FROM

Created by: Mr Dk.

2023 / 09 / 28 00:04

Qingdao, Shandong, China


Background

PostgreSQL's COPY FROM syntax imports data from an external source (a disk file, a network pipe, or an IPC pipe) into a database table. COPY FROM can import only a specified subset of columns, filling the remaining columns with their default values. It also supports a WHERE clause, so that only rows satisfying the condition are imported into the table.
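As a quick illustration, the statements below exercise these features (the table, column, and file names are hypothetical):

```sql
-- Import only two columns; the remaining columns get their defaults.
COPY my_table (id, name) FROM '/tmp/data.csv' WITH (FORMAT csv);

-- Import from a program's output, keeping only the rows that
-- satisfy the WHERE condition.
COPY my_table FROM PROGRAM 'cat /tmp/data.csv' WITH (FORMAT csv)
    WHERE id > 100;
```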

The implementation of COPY FROM is somewhat more complex than that of COPY TO. COPY TO exports data out of the database, and its data-fetching step, including its parallel optimization, largely leverages the optimizer and executor and reuses a lot of existing code. COPY FROM, in contrast, imports external data into the database; its write path calls for a more efficient, purpose-built implementation, so it cannot simply reuse the INSERT-related executor code.

This article analyzes the process based on the source code of the current PostgreSQL master development branch (PostgreSQL 17 under development). While reading the COPY FROM code, I noticed a small issue (a really tiny one...) and, somewhat by accident, ended up contributing my first patch to the PostgreSQL community:

commit e434e21e114b423e919324ad6ce1f3f079ca2a03
Author: Michael Paquier <michael@paquier.xyz>
Date:   Sat Sep 9 21:12:41 2023 +0900

    Remove redundant assignments in copyfrom.c

    The tuple descriptor and the number of attributes are assigned twice to
    the same values in BeginCopyFrom(), for what looks like a small thinko
    coming from the refactoring done in c532d15dddff1.

    Author: Jingtang Zhang
    Discussion: https://postgr.es/m/CAPsk3_CrYeXUVHEiaWAYxY9BKiGvGT3AoXo_+Jm0xP_s_VmXCA@mail.gmail.com

COPY Statement

As described in the analysis of COPY TO, this kind of statement is treated as a DDL. By the time the executor starts, the parser has already filled the COPY-related parameters into the CopyStmt structure, where:

  • relation: the table to import into
  • attlist: the list of column names to import
  • is_from: whether this is COPY TO or COPY FROM
  • is_program: whether the source is a program (pipe)
  • filename: the file/program name of the source (NULL means importing from STDIN)
  • options: the import options
/* ----------------------
 *      Copy Statement
 *
 * We support "COPY relation FROM file", "COPY relation TO file", and
 * "COPY (query) TO file".  In any given CopyStmt, exactly one of "relation"
 * and "query" must be non-NULL.
 * ----------------------
 */
typedef struct CopyStmt
{
    NodeTag     type;
    RangeVar   *relation;       /* the relation to copy */
    Node       *query;          /* the query (SELECT or DML statement with
                                 * RETURNING) to copy, as a raw parse tree */
    List       *attlist;        /* List of column names (as Strings), or NIL
                                 * for all columns */
    bool        is_from;        /* TO or FROM */
    bool        is_program;     /* is 'filename' a program to popen? */
    char       *filename;       /* filename, or NULL for STDIN/STDOUT */
    List       *options;        /* List of DefElem nodes */
    Node       *whereClause;    /* WHERE condition (or NULL) */
} CopyStmt;

Permission Checks

After entering the DoCopy function, a preliminary permission check is performed. The first thing to determine is whether the import comes from a file or from a program: importing from a file requires the current user to have the privilege to read server files, while importing from a program requires the privilege to execute server programs:

bool        pipe = (stmt->filename == NULL);

/*
 * Disallow COPY to/from file or program except to users with the
 * appropriate role.
 */
if (!pipe)
{
    if (stmt->is_program)
    {
        if (!has_privs_of_role(GetUserId(), ROLE_PG_EXECUTE_SERVER_PROGRAM))
            ereport(ERROR,
                    (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                     errmsg("permission denied to COPY to or from an external program"),
                     errdetail("Only roles with privileges of the \"%s\" role may COPY to or from an external program.",
                               "pg_execute_server_program"),
                     errhint("Anyone can COPY to stdout or from stdin. "
                             "psql's \\copy command also works for anyone.")));
    }
    else
    {
        if (is_from && !has_privs_of_role(GetUserId(), ROLE_PG_READ_SERVER_FILES))
            ereport(ERROR,
                    (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                     errmsg("permission denied to COPY from a file"),
                     errdetail("Only roles with privileges of the \"%s\" role may COPY from a file.",
                               "pg_read_server_files"),
                     errhint("Anyone can COPY to stdout or from stdin. "
                             "psql's \\copy command also works for anyone.")));

        if (!is_from && !has_privs_of_role(GetUserId(), ROLE_PG_WRITE_SERVER_FILES))
            ereport(ERROR,
                    (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                     errmsg("permission denied to COPY to a file"),
                     errdetail("Only roles with privileges of the \"%s\" role may COPY to a file.",
                               "pg_write_server_files"),
                     errhint("Anyone can COPY to stdout or from stdin. "
                             "psql's \\copy command also works for anyone.")));
    }
}

The next step prepares the target table. For COPY FROM, a RowExclusiveLock is taken on the table, the same lock level used by other DML statements:

LOCKMODE    lockmode = is_from ? RowExclusiveLock : AccessShareLock;

/* Open and lock the relation, using the appropriate lock type. */
rel = table_openrv(stmt->relation, lockmode);

If a WHERE clause is specified, it also has to be transformed into a boolean expression:

if (stmt->whereClause)
{
    /* add nsitem to query namespace */
    addNSItemToQuery(pstate, nsitem, false, true, true);

    /* Transform the raw expression tree */
    whereClause = transformExpr(pstate, stmt->whereClause, EXPR_KIND_COPY_WHERE);

    /* Make sure it yields a boolean result. */
    whereClause = coerce_to_boolean(pstate, whereClause, "WHERE");

    /* we have to fix its collations too */
    assign_expr_collations(pstate, whereClause);

    whereClause = eval_const_expressions(NULL, whereClause);

    whereClause = (Node *) canonicalize_qual((Expr *) whereClause, false);
    whereClause = (Node *) make_ands_implicit((Expr *) whereClause);
}

For COPY FROM, the user must have INSERT privilege on the columns being imported. In addition, row-level security policies are not supported:

perminfo = nsitem->p_perminfo;
perminfo->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);

tupDesc = RelationGetDescr(rel);
attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
foreach(cur, attnums)
{
    int         attno;
    Bitmapset **bms;

    attno = lfirst_int(cur) - FirstLowInvalidHeapAttributeNumber;
    bms = is_from ? &perminfo->insertedCols : &perminfo->selectedCols;

    *bms = bms_add_member(*bms, attno);
}
ExecCheckPermissions(pstate->p_rtable, list_make1(perminfo), true);

Next, the executor logic starts handling the specifics of COPY FROM. Similar to COPY TO, the three functions BeginCopyFrom / CopyFrom / EndCopyFrom correspond to the three execution phases:

  1. Preparation
  2. Execution
  3. Cleanup
if (is_from)
{
    CopyFromState cstate;

    Assert(rel);

    /* check read-only transaction and parallel mode */
    if (XactReadOnly && !rel->rd_islocaltemp)
        PreventCommandIfReadOnly("COPY FROM");

    cstate = BeginCopyFrom(pstate, rel, whereClause,
                           stmt->filename, stmt->is_program,
                           NULL, stmt->attlist, stmt->options);
    *processed = CopyFrom(cstate);  /* copy from file to database */
    EndCopyFrom(cstate);
}
else
{
    /* COPY TO */
}

COPY FROM Preparation Phase

BeginCopyFrom performs the preparation work for COPY FROM, which mainly consists of initializing a CopyFromState structure:

/*
 * This struct contains all the state variables used throughout a COPY FROM
 * operation.
 */
typedef struct CopyFromStateData
{
    /* low-level state data */
    CopySource  copy_src;       /* type of copy source */
    FILE       *copy_file;      /* used if copy_src == COPY_FILE */
    StringInfo  fe_msgbuf;      /* used if copy_src == COPY_FRONTEND */

    EolType     eol_type;       /* EOL type of input */
    int         file_encoding;  /* file or remote side's character encoding */
    bool        need_transcoding;   /* file encoding diff from server? */
    Oid         conversion_proc;    /* encoding conversion function */

    /* parameters from the COPY command */
    Relation    rel;            /* relation to copy from */
    List       *attnumlist;     /* integer list of attnums to copy */
    char       *filename;       /* filename, or NULL for STDIN */
    bool        is_program;     /* is 'filename' a program to popen? */
    copy_data_source_cb data_source_cb; /* function for reading data */

    CopyFormatOptions opts;
    bool       *convert_select_flags;   /* per-column CSV/TEXT CS flags */
    Node       *whereClause;    /* WHERE condition (or NULL) */

    /* these are just for error messages, see CopyFromErrorCallback */
    const char *cur_relname;    /* table name for error messages */
    uint64      cur_lineno;     /* line number for error messages */
    const char *cur_attname;    /* current att for error messages */
    const char *cur_attval;     /* current att value for error messages */
    bool        relname_only;   /* don't output line number, att, etc. */

    /*
     * Working state
     */
    MemoryContext copycontext;  /* per-copy execution context */

    AttrNumber  num_defaults;   /* count of att that are missing and have
                                 * default value */
    FmgrInfo   *in_functions;   /* array of input functions for each attrs */
    Oid        *typioparams;    /* array of element types for in_functions */
    int        *defmap;         /* array of default att numbers related to
                                 * missing att */
    ExprState **defexprs;       /* array of default att expressions for all
                                 * att */
    bool       *defaults;       /* if DEFAULT marker was found for
                                 * corresponding att */
    bool        volatile_defexprs;  /* is any of defexprs volatile? */
    List       *range_table;    /* single element list of RangeTblEntry */
    List       *rteperminfos;   /* single element list of RTEPermissionInfo */
    ExprState  *qualexpr;

    TransitionCaptureState *transition_capture;

    /*
     * These variables are used to reduce overhead in COPY FROM.
     *
     * attribute_buf holds the separated, de-escaped text for each field of
     * the current line.  The CopyReadAttributes functions return arrays of
     * pointers into this buffer.  We avoid palloc/pfree overhead by re-using
     * the buffer on each cycle.
     *
     * In binary COPY FROM, attribute_buf holds the binary data for the
     * current field, but the usage is otherwise similar.
     */
    StringInfoData attribute_buf;

    /* field raw data pointers found by COPY FROM */

    int         max_fields;
    char      **raw_fields;

    /*
     * Similarly, line_buf holds the whole input line being processed. The
     * input cycle is first to read the whole line into line_buf, and then
     * extract the individual attribute fields into attribute_buf.  line_buf
     * is preserved unmodified so that we can display it in error messages if
     * appropriate.  (In binary mode, line_buf is not used.)
     */
    StringInfoData line_buf;
    bool        line_buf_valid; /* contains the row being processed? */

    /*
     * input_buf holds input data, already converted to database encoding.
     *
     * In text mode, CopyReadLine parses this data sufficiently to locate line
     * boundaries, then transfers the data to line_buf. We guarantee that
     * there is a \0 at input_buf[input_buf_len] at all times.  (In binary
     * mode, input_buf is not used.)
     *
     * If encoding conversion is not required, input_buf is not a separate
     * buffer but points directly to raw_buf.  In that case, input_buf_len
     * tracks the number of bytes that have been verified as valid in the
     * database encoding, and raw_buf_len is the total number of bytes stored
     * in the buffer.
     */
#define INPUT_BUF_SIZE 65536    /* we palloc INPUT_BUF_SIZE+1 bytes */
    char       *input_buf;
    int         input_buf_index;    /* next byte to process */
    int         input_buf_len;  /* total # of bytes stored */
    bool        input_reached_eof;  /* true if we reached EOF */
    bool        input_reached_error;    /* true if a conversion error happened */
    /* Shorthand for number of unconsumed bytes available in input_buf */
#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index)

    /*
     * raw_buf holds raw input data read from the data source (file or client
     * connection), not yet converted to the database encoding.  Like with
     * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len].
     */
#define RAW_BUF_SIZE 65536      /* we palloc RAW_BUF_SIZE+1 bytes */
    char       *raw_buf;
    int         raw_buf_index;  /* next byte to process */
    int         raw_buf_len;    /* total # of bytes stored */
    bool        raw_reached_eof;    /* true if we reached EOF */

    /* Shorthand for number of unconsumed bytes available in raw_buf */
#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)

    uint64      bytes_processed;    /* number of bytes processed so far */
} CopyFromStateData;

typedef struct CopyFromStateData *CopyFromState;

The fields that need to be initialized include:

  • the per-copy executor memory context
  • the attribute numbers of the columns to be processed
  • the input format parsing options
  • the input side's character encoding, whether encoding conversion is needed, and the conversion function pointer
  • the execution state
  • the input buffers together with their indexes and flags:
    • raw_buf: raw bytes received from the input side
    • input_buf: (in text mode) the raw bytes converted into the database encoding
    • line_buf: (in text mode) the complete text of one input line
    • attribute_buf: the de-escaped, per-column contents of the current line
  • each column's input function (converting the string form into the internal form) and default value
  • the input file descriptor:
    • if the source is a client, send it a G (CopyInResponse) message describing the columns and format to be received
    • if the source is a program, start the program with popen
    • if the source is a file, open it and fstat it to confirm that it exists and is not a directory

COPY FROM Execution Phase

The CopyFrom function collects and writes each row of data, and finally returns the total number of rows processed.

Relation Kind Check

The first step is a check on the relation kind. Only a plain table, a foreign table, a partitioned table, or a target with an INSTEAD OF INSERT row trigger can be the target of COPY FROM:

/*
 * The target must be a plain, foreign, or partitioned relation, or have
 * an INSTEAD OF INSERT row trigger.  (Currently, such triggers are only
 * allowed on views, so we only hint about them in the view case.)
 */
if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
    cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
    cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
    !(cstate->rel->trigdesc &&
      cstate->rel->trigdesc->trig_insert_instead_row))
{
    if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                 errmsg("cannot copy to view \"%s\"",
                        RelationGetRelationName(cstate->rel)),
                 errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
    else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                 errmsg("cannot copy to materialized view \"%s\"",
                        RelationGetRelationName(cstate->rel))));
    else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                 errmsg("cannot copy to sequence \"%s\"",
                        RelationGetRelationName(cstate->rel))));
    else
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                 errmsg("cannot copy to non-table relation \"%s\"",
                        RelationGetRelationName(cstate->rel))));
}

Tuple Visibility Optimizations

Next come the optimizations on the insert options. If the COPY FROM target was newly created in the current transaction, the TABLE_INSERT_SKIP_FSM flag is added, so inserts do not bother searching for and reusing free space; if the target was newly created in the current subtransaction, TABLE_INSERT_FROZEN is added, so the inserted rows are frozen immediately.

Initializing Executor State

If the COPY FROM target is a foreign table, the FDW API's BeginForeignInsert is called to prepare the foreign table for inserts. If the foreign table supports batch inserts, the batch size is obtained via the FDW API's GetForeignModifyBatchSize; otherwise rows are inserted one at a time by default:

if (resultRelInfo->ri_FdwRoutine != NULL &&
    resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
    resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
                                                     resultRelInfo);

/*
 * Also, if the named relation is a foreign table, determine if the FDW
 * supports batch insert and determine the batch size (a FDW may support
 * batching, but it may be disabled for the server/table).
 *
 * If the FDW does not support batching, we set the batch size to 1.
 */
if (resultRelInfo->ri_FdwRoutine != NULL &&
    resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
    resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
    resultRelInfo->ri_BatchSize =
        resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
else
    resultRelInfo->ri_BatchSize = 1;

Assert(resultRelInfo->ri_BatchSize >= 1);

Initializing Partition Routing

If the COPY FROM target is a partitioned table, tuple routing for the partitioned table is initialized:

/*
 * If the named relation is a partitioned table, initialize state for
 * CopyFrom tuple routing.
 */
if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);

Initializing the Filter Condition

If COPY FROM specifies a WHERE clause, the filter condition is initialized:

if (cstate->whereClause)
    cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
                                    &mtstate->ps);

Multi-Insert Preparation

Next comes the multi-insert optimization. For writing into database tables and foreign tables, there are two AM interfaces:

  • single-row insert: table_tuple_insert() / ExecForeignInsert
  • batch insert: table_multi_insert() / ExecForeignBatchInsert

Batch inserts are generally more efficient, since pages are locked less frequently and fewer WAL records are generated.

If the table has indexes, the heap tuples are written in batches while the index tuples are still inserted one by one, which may become a performance bottleneck.

Multi-inserts cannot be used safely in every scenario, so the unsafe cases have to be filtered out next:

  • the table has BEFORE or INSTEAD OF row triggers: such triggers may query the table during insertion, and batching would mean rows that should already be visible have not yet been written
  • the target is a foreign table that does not support batch inserts, or batching has been disabled for it
  • the target is a partitioned table with statement-level INSERT triggers
  • the table has VOLATILE default-value expressions, which may likewise query the table
  • the WHERE clause contains VOLATILE functions
  • the target is a partitioned table, but some partition is a foreign table that cannot use batching

For the scenarios where multi-inserts are possible, the memory buffers and pointers needed for batching are initialized; otherwise, the structures needed for single-row inserts are initialized.

/*
 * It's generally more efficient to prepare a bunch of tuples for
 * insertion, and insert them in one
 * table_multi_insert()/ExecForeignBatchInsert() call, than call
 * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
 * However, there are a number of reasons why we might not be able to do
 * this.  These are explained below.
 */
if (resultRelInfo->ri_TrigDesc != NULL &&
    (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
     resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
{
    /*
     * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
     * triggers on the table. Such triggers might query the table we're
     * inserting into and act differently if the tuples that have already
     * been processed and prepared for insertion are not there.
     */
    insertMethod = CIM_SINGLE;
}
else if (resultRelInfo->ri_FdwRoutine != NULL &&
         resultRelInfo->ri_BatchSize == 1)
{
    /*
     * Can't support multi-inserts to a foreign table if the FDW does not
     * support batching, or it's disabled for the server or foreign table.
     */
    insertMethod = CIM_SINGLE;
}
else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
         resultRelInfo->ri_TrigDesc->trig_insert_new_table)
{
    /*
     * For partitioned tables we can't support multi-inserts when there
     * are any statement level insert triggers. It might be possible to
     * allow partitioned tables with such triggers in the future, but for
     * now, CopyMultiInsertInfoFlush expects that any after row insert and
     * statement level insert triggers are on the same relation.
     */
    insertMethod = CIM_SINGLE;
}
else if (cstate->volatile_defexprs)
{
    /*
     * Can't support multi-inserts if there are any volatile default
     * expressions in the table.  Similarly to the trigger case above,
     * such expressions may query the table we're inserting into.
     *
     * Note: It does not matter if any partitions have any volatile
     * default expressions as we use the defaults from the target of the
     * COPY command.
     */
    insertMethod = CIM_SINGLE;
}
else if (contain_volatile_functions(cstate->whereClause))
{
    /*
     * Can't support multi-inserts if there are any volatile function
     * expressions in WHERE clause.  Similarly to the trigger case above,
     * such expressions may query the table we're inserting into.
     */
    insertMethod = CIM_SINGLE;
}
else
{
    /*
     * For partitioned tables, we may still be able to perform bulk
     * inserts.  However, the possibility of this depends on which types
     * of triggers exist on the partition.  We must disable bulk inserts
     * if the partition is a foreign table that can't use batching or it
     * has any before row insert or insert instead triggers (same as we
     * checked above for the parent table).  Since the partition's
     * resultRelInfos are initialized only when we actually need to insert
     * the first tuple into them, we must have the intermediate insert
     * method of CIM_MULTI_CONDITIONAL to flag that we must later
     * determine if we can use bulk-inserts for the partition being
     * inserted into.
     */
    if (proute)
        insertMethod = CIM_MULTI_CONDITIONAL;
    else
        insertMethod = CIM_MULTI;

    CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
                            estate, mycid, ti_options);
}

/*
 * If not using batch mode (which allocates slots as needed) set up a
 * tuple slot too. When inserting into a partitioned table, we also need
 * one, even if we might batch insert, to read the tuple in the root
 * partition's form.
 */
if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
{
    singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
                                   &estate->es_tupleTable);
    bistate = GetBulkInsertState();
}

Processing Each Input Row

Next comes the main loop that processes each row of data; the loop exits when no more input is available.

First, whether in single-row or batch mode, a slot must be obtained to hold the current tuple. In single-row mode, the slot created earlier by table_slot_create is used directly, and every subsequent row keeps reusing that same slot; in batch mode, a slot is taken from the in-memory buffer:

/* select slot to (initially) load row into */
if (insertMethod == CIM_SINGLE || proute)
{
    myslot = singleslot;
    Assert(myslot != NULL);
}
else
{
    Assert(resultRelInfo == target_resultRelInfo);
    Assert(insertMethod == CIM_MULTI);

    myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
                                             resultRelInfo);
}

A row of data is then fetched from the input file descriptor and, according to its format, converted into the database's internal tuple representation via the input functions and default-value expressions prepared during initialization (internal details omitted here), then stored in the slot:

/* Directly store the values/nulls array in the slot */
if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
    break;

ExecStoreVirtualTuple(myslot);

/*
 * Constraints and where clause might reference the tableoid column,
 * so (re-)initialize tts_tableOid before evaluating them.
 */
myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);

If a WHERE clause was specified, the row is evaluated against the filter condition; rows that do not match are skipped, and the loop proceeds directly to the next iteration:

if (cstate->whereClause)
{
    econtext->ecxt_scantuple = myslot;
    /* Skip items that don't match COPY's WHERE clause */
    if (!ExecQual(cstate->qualexpr, econtext))
    {
        /*
         * Report that this tuple was filtered out by the WHERE
         * clause.
         */
        pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
                                     ++excluded);
        continue;
    }
}

If the COPY FROM target is a partitioned table, the next step determines the leaf partition into which the current tuple will actually be inserted, and whether that partition can use multi-inserts. When switching to a different partition than the previous one, any tuples buffered for the previous partition may first need to be flushed before the current tuple is stored into a slot. Since the attribute numbers of a child partition and its parent may differ, a TupleConversionMap structure is obtained; it maps, by column name, the attribute numbers of the same column between child and parent. This map is passed in when storing the tuple into the slot, which keeps the data correct.

/* Determine the partition to insert the tuple into */
if (proute)
{
    TupleConversionMap *map;

    /*
     * Attempt to find a partition suitable for this tuple.
     * ExecFindPartition() will raise an error if none can be found or
     * if the found partition is not suitable for INSERTs.
     */
    resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
                                      proute, myslot, estate);

    if (prevResultRelInfo != resultRelInfo)
    {
        /* Determine which triggers exist on this partition */
        has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
                                      resultRelInfo->ri_TrigDesc->trig_insert_before_row);

        has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
                                       resultRelInfo->ri_TrigDesc->trig_insert_instead_row);

        /*
         * Disable multi-inserts when the partition has BEFORE/INSTEAD
         * OF triggers, or if the partition is a foreign table that
         * can't use batching.
         */
        leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
            !has_before_insert_row_trig &&
            !has_instead_insert_row_trig &&
            (resultRelInfo->ri_FdwRoutine == NULL ||
             resultRelInfo->ri_BatchSize > 1);

        /* Set the multi-insert buffer to use for this partition. */
        if (leafpart_use_multi_insert)
        {
            if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
                CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
                                               resultRelInfo);
        }
        else if (insertMethod == CIM_MULTI_CONDITIONAL &&
                 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
        {
            /*
             * Flush pending inserts if this partition can't use
             * batching, so rows are visible to triggers etc.
             */
            CopyMultiInsertInfoFlush(&multiInsertInfo,
                                     resultRelInfo,
                                     &processed);
        }

        if (bistate != NULL)
            ReleaseBulkInsertStatePin(bistate);
        prevResultRelInfo = resultRelInfo;
    }

    /*
     * If we're capturing transition tuples, we might need to convert
     * from the partition rowtype to root rowtype. But if there are no
     * BEFORE triggers on the partition that could change the tuple,
     * we can just remember the original unconverted tuple to avoid a
     * needless round trip conversion.
     */
    if (cstate->transition_capture != NULL)
        cstate->transition_capture->tcs_original_insert_tuple =
            !has_before_insert_row_trig ? myslot : NULL;

    /*
     * We might need to convert from the root rowtype to the partition
     * rowtype.
     */
    map = ExecGetRootToChildMap(resultRelInfo, estate);
    if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
    {
        /* non batch insert */
        if (map != NULL)
        {
            TupleTableSlot *new_slot;

            new_slot = resultRelInfo->ri_PartitionTupleSlot;
            myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
        }
    }
    else
    {
        /*
         * Prepare to queue up tuple for later batch insert into
         * current partition.
         */
        TupleTableSlot *batchslot;

        /* no other path available for partitioned table */
        Assert(insertMethod == CIM_MULTI_CONDITIONAL);

        batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
                                                    resultRelInfo);

        if (map != NULL)
            myslot = execute_attr_map_slot(map->attrMap, myslot,
                                           batchslot);
        else
        {
            /*
             * This looks more expensive than it is (Believe me, I
             * optimized it away. Twice.). The input is in virtual
             * form, and we'll materialize the slot below - for most
             * slot types the copy performs the work materialization
             * would later require anyway.
             */
            ExecCopySlot(batchslot, myslot);
            myslot = batchslot;
        }
    }

    /* ensure that triggers etc see the right relation  */
    myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
}

Finally we reach the actual insertion:

  • If the table has a BEFORE ROW INSERT trigger, fire it first; if the trigger's verdict is "do nothing", skip the insertion entirely
  • If the table has an INSTEAD OF INSERT ROW trigger, hand the tuple over to that trigger

Then a few pre-insertion checks are performed:

  • Compute the values of stored generated columns
  • Check the tuple against the table's constraints
  • Check the tuple against the partition constraint, if any

Depending on whether the current tuple is inserted row-by-row or in batch mode, it is written either directly through the table AM or into the in-memory multi-insert buffer. If the table has indexes, the corresponding index tuples are built and inserted as well. Finally, the AFTER ROW INSERT triggers are fired.

skip_tuple = false;

/* BEFORE ROW INSERT Triggers */
if (has_before_insert_row_trig)
{
    if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
        skip_tuple = true;  /* "do nothing" */
}

if (!skip_tuple)
{
    /*
     * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
     * tuple.  Otherwise, proceed with inserting the tuple into the
     * table or foreign table.
     */
    if (has_instead_insert_row_trig)
    {
        ExecIRInsertTriggers(estate, resultRelInfo, myslot);
    }
    else
    {
        /* Compute stored generated columns */
        if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
            resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
            ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
                                       CMD_INSERT);

        /*
         * If the target is a plain table, check the constraints of
         * the tuple.
         */
        if (resultRelInfo->ri_FdwRoutine == NULL &&
            resultRelInfo->ri_RelationDesc->rd_att->constr)
            ExecConstraints(resultRelInfo, myslot, estate);

        /*
         * Also check the tuple against the partition constraint, if
         * there is one; except that if we got here via tuple-routing,
         * we don't need to if there's no BR trigger defined on the
         * partition.
         */
        if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
            (proute == NULL || has_before_insert_row_trig))
            ExecPartitionCheck(resultRelInfo, myslot, estate, true);

        /* Store the slot in the multi-insert buffer, when enabled. */
        if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
        {
            /*
             * The slot previously might point into the per-tuple
             * context. For batching it needs to be longer lived.
             */
            ExecMaterializeSlot(myslot);

            /* Add this tuple to the tuple buffer */
            CopyMultiInsertInfoStore(&multiInsertInfo,
                                     resultRelInfo, myslot,
                                     cstate->line_buf.len,
                                     cstate->cur_lineno);

            /*
             * If enough inserts have queued up, then flush all
             * buffers out to their tables.
             */
            if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
                CopyMultiInsertInfoFlush(&multiInsertInfo,
                                         resultRelInfo,
                                         &processed);

            /*
             * We delay updating the row counter and progress of the
             * COPY command until after writing the tuples stored in
             * the buffer out to the table, as in single insert mode.
             * See CopyMultiInsertBufferFlush().
             */
            continue;   /* next tuple please */
        }
        else
        {
            List       *recheckIndexes = NIL;

            /* OK, store the tuple */
            if (resultRelInfo->ri_FdwRoutine != NULL)
            {
                myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
                                                                         resultRelInfo,
                                                                         myslot,
                                                                         NULL);

                if (myslot == NULL) /* "do nothing" */
                    continue;   /* next tuple please */

                /*
                 * AFTER ROW Triggers might reference the tableoid
                 * column, so (re-)initialize tts_tableOid before
                 * evaluating them.
                 */
                myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
            }
            else
            {
                /* OK, store the tuple and create index entries for it */
                table_tuple_insert(resultRelInfo->ri_RelationDesc,
                                   myslot, mycid, ti_options, bistate);

                if (resultRelInfo->ri_NumIndices > 0)
                    recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
                                                           myslot,
                                                           estate,
                                                           false,
                                                           false,
                                                           NULL,
                                                           NIL,
                                                           false);
            }

            /* AFTER ROW INSERT Triggers */
            ExecARInsertTriggers(estate, resultRelInfo, myslot,
                                 recheckIndexes, cstate->transition_capture);

            list_free(recheckIndexes);
        }
    }

    /*
     * We count only tuples not suppressed by a BEFORE INSERT trigger
     * or FDW; this is the same definition used by nodeModifyTable.c
     * for counting tuples inserted by an INSERT command.  Update
     * progress of the COPY command as well.
     */
    pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
                                 ++processed);
}

Post-processing

At this point the main loop has finished, which means every tuple has been processed. A few post-processing steps follow; the most important one is flushing the last batch of tuples still buffered in memory:

/* Flush any remaining buffered tuples */
if (insertMethod != CIM_SINGLE)
{
    if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
        CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
}

COPY FROM Cleanup Phase

EndCopyFrom performs the final cleanup for COPY FROM: it closes the file descriptor (or the pipe to the program) and destroys the memory context:

/*
 * Clean up storage and release resources for COPY FROM.
 */
void
EndCopyFrom(CopyFromState cstate)
{
    /* No COPY FROM related resources except memory. */
    if (cstate->is_program)
    {
        ClosePipeFromProgram(cstate);
    }
    else
    {
        if (cstate->filename != NULL && FreeFile(cstate->copy_file))
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not close file \"%s\": %m",
                            cstate->filename)));
    }

    pgstat_progress_end_command();

    MemoryContextDelete(cstate->copycontext);
    pfree(cstate);
}

