diff options
Diffstat (limited to 'src/backend/commands/copy.c')
-rw-r--r-- | src/backend/commands/copy.c | 4579 |
1 files changed, 76 insertions, 4503 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 115860a9d40..b6143b8bf21 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -18,25 +18,13 @@ #include <unistd.h> #include <sys/stat.h> -#include "access/heapam.h" -#include "access/htup_details.h" #include "access/sysattr.h" -#include "access/tableam.h" +#include "access/table.h" #include "access/xact.h" -#include "access/xlog.h" -#include "catalog/dependency.h" #include "catalog/pg_authid.h" -#include "catalog/pg_type.h" #include "commands/copy.h" #include "commands/defrem.h" -#include "commands/trigger.h" -#include "executor/execPartition.h" #include "executor/executor.h" -#include "executor/nodeModifyTable.h" -#include "executor/tuptable.h" -#include "foreign/fdwapi.h" -#include "libpq/libpq.h" -#include "libpq/pqformat.h" #include "mb/pg_wchar.h" #include "miscadmin.h" #include "nodes/makefuncs.h" @@ -45,825 +33,13 @@ #include "parser/parse_collate.h" #include "parser/parse_expr.h" #include "parser/parse_relation.h" -#include "port/pg_bswap.h" #include "rewrite/rewriteHandler.h" -#include "storage/fd.h" -#include "tcop/tcopprot.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" -#include "utils/partcache.h" -#include "utils/portal.h" #include "utils/rel.h" #include "utils/rls.h" -#include "utils/snapmgr.h" - -#define ISOCTAL(c) (((c) >= '0') && ((c) <= '7')) -#define OCTVALUE(c) ((c) - '0') - -/* - * Represents the different source/dest cases we need to worry about at - * the bottom level - */ -typedef enum CopyDest -{ - COPY_FILE, /* to/from file (or a piped program) */ - COPY_OLD_FE, /* to/from frontend (2.0 protocol) */ - COPY_NEW_FE, /* to/from frontend (3.0 protocol) */ - COPY_CALLBACK /* to/from callback function */ -} CopyDest; - -/* - * Represents the end-of-line terminator type of the input - */ -typedef enum EolType -{ - EOL_UNKNOWN, - EOL_NL, - EOL_CR, - EOL_CRNL -} EolType; - -/* - * Represents the heap insert method to be used during COPY FROM. - */ -typedef enum CopyInsertMethod -{ - CIM_SINGLE, /* use table_tuple_insert or fdw routine */ - CIM_MULTI, /* always use table_multi_insert */ - CIM_MULTI_CONDITIONAL /* use table_multi_insert only if valid */ -} CopyInsertMethod; - -/* - * This struct contains all the state variables used throughout a COPY - * operation. For simplicity, we use the same struct for all variants of COPY, - * even though some fields are used in only some cases. - * - * Multi-byte encodings: all supported client-side encodings encode multi-byte - * characters by having the first byte's high bit set. Subsequent bytes of the - * character can have the high bit not set. When scanning data in such an - * encoding to look for a match to a single-byte (ie ASCII) character, we must - * use the full pg_encoding_mblen() machinery to skip over multibyte - * characters, else we might find a false match to a trailing byte. In - * supported server encodings, there is no possibility of a false match, and - * it's faster to make useless comparisons to trailing bytes than it is to - * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true - * when we have to do it the hard way. - */ -typedef struct CopyStateData -{ - /* low-level state data */ - CopyDest copy_dest; /* type of copy source/destination */ - FILE *copy_file; /* used if copy_dest == COPY_FILE */ - StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for - * dest == COPY_NEW_FE in COPY FROM */ - bool is_copy_from; /* COPY TO, or COPY FROM? */ - bool reached_eof; /* true if we read to end of copy data (not - * all copy_dest types maintain this) */ - EolType eol_type; /* EOL type of input */ - int file_encoding; /* file or remote side's character encoding */ - bool need_transcoding; /* file encoding diff from server? */ - bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ - - /* parameters from the COPY command */ - Relation rel; /* relation to copy to or from */ - QueryDesc *queryDesc; /* executable query to copy from */ - List *attnumlist; /* integer list of attnums to copy */ - char *filename; /* filename, or NULL for STDIN/STDOUT */ - bool is_program; /* is 'filename' a program to popen? */ - copy_data_source_cb data_source_cb; /* function for reading data */ - bool binary; /* binary format? */ - bool freeze; /* freeze rows on loading? */ - bool csv_mode; /* Comma Separated Value format? */ - bool header_line; /* CSV header line? */ - char *null_print; /* NULL marker string (server encoding!) */ - int null_print_len; /* length of same */ - char *null_print_client; /* same converted to file encoding */ - char *delim; /* column delimiter (must be 1 byte) */ - char *quote; /* CSV quote char (must be 1 byte) */ - char *escape; /* CSV escape char (must be 1 byte) */ - List *force_quote; /* list of column names */ - bool force_quote_all; /* FORCE_QUOTE *? */ - bool *force_quote_flags; /* per-column CSV FQ flags */ - List *force_notnull; /* list of column names */ - bool *force_notnull_flags; /* per-column CSV FNN flags */ - List *force_null; /* list of column names */ - bool *force_null_flags; /* per-column CSV FN flags */ - bool convert_selectively; /* do selective binary conversion? */ - List *convert_select; /* list of column names (can be NIL) */ - bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ - Node *whereClause; /* WHERE condition (or NULL) */ - - /* these are just for error messages, see CopyFromErrorCallback */ - const char *cur_relname; /* table name for error messages */ - uint64 cur_lineno; /* line number for error messages */ - const char *cur_attname; /* current att for error messages */ - const char *cur_attval; /* current att value for error messages */ - - /* - * Working state for COPY TO/FROM - */ - MemoryContext copycontext; /* per-copy execution context */ - - /* - * Working state for COPY TO - */ - FmgrInfo *out_functions; /* lookup info for output functions */ - MemoryContext rowcontext; /* per-row evaluation context */ - - /* - * Working state for COPY FROM - */ - AttrNumber num_defaults; - FmgrInfo *in_functions; /* array of input functions for each attrs */ - Oid *typioparams; /* array of element types for in_functions */ - int *defmap; /* array of default att numbers */ - ExprState **defexprs; /* array of default att expressions */ - bool volatile_defexprs; /* is any of defexprs volatile? */ - List *range_table; - ExprState *qualexpr; - - TransitionCaptureState *transition_capture; - - /* - * These variables are used to reduce overhead in COPY FROM. - * - * attribute_buf holds the separated, de-escaped text for each field of - * the current line. The CopyReadAttributes functions return arrays of - * pointers into this buffer. We avoid palloc/pfree overhead by re-using - * the buffer on each cycle. - * - * In binary COPY FROM, attribute_buf holds the binary data for the - * current field, but the usage is otherwise similar. - */ - StringInfoData attribute_buf; - - /* field raw data pointers found by COPY FROM */ - - int max_fields; - char **raw_fields; - - /* - * Similarly, line_buf holds the whole input line being processed. The - * input cycle is first to read the whole line into line_buf, convert it - * to server encoding there, and then extract the individual attribute - * fields into attribute_buf. line_buf is preserved unmodified so that we - * can display it in error messages if appropriate. (In binary mode, - * line_buf is not used.) - */ - StringInfoData line_buf; - bool line_buf_converted; /* converted to server encoding? */ - bool line_buf_valid; /* contains the row being processed? */ - - /* - * Finally, raw_buf holds raw data read from the data source (file or - * client connection). In text mode, CopyReadLine parses this data - * sufficiently to locate line boundaries, then transfers the data to - * line_buf and converts it. In binary mode, CopyReadBinaryData fetches - * appropriate amounts of data from this buffer. In both modes, we - * guarantee that there is a \0 at raw_buf[raw_buf_len]. - */ -#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ - char *raw_buf; - int raw_buf_index; /* next byte to process */ - int raw_buf_len; /* total # of bytes stored */ - /* Shorthand for number of unconsumed bytes available in raw_buf */ -#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) -} CopyStateData; - -/* DestReceiver for COPY (query) TO */ -typedef struct -{ - DestReceiver pub; /* publicly-known function pointers */ - CopyState cstate; /* CopyStateData for the command */ - uint64 processed; /* # of tuples processed */ -} DR_copy; - - -/* - * No more than this many tuples per CopyMultiInsertBuffer - * - * Caution: Don't make this too big, as we could end up with this many - * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's - * multiInsertBuffers list. Increasing this can cause quadratic growth in - * memory requirements during copies into partitioned tables with a large - * number of partitions. - */ -#define MAX_BUFFERED_TUPLES 1000 - -/* - * Flush buffers if there are >= this many bytes, as counted by the input - * size, of tuples stored. - */ -#define MAX_BUFFERED_BYTES 65535 - -/* Trim the list of buffers back down to this number after flushing */ -#define MAX_PARTITION_BUFFERS 32 - -/* Stores multi-insert data related to a single relation in CopyFrom. */ -typedef struct CopyMultiInsertBuffer -{ - TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */ - ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */ - BulkInsertState bistate; /* BulkInsertState for this rel */ - int nused; /* number of 'slots' containing tuples */ - uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy - * stream */ -} CopyMultiInsertBuffer; - -/* - * Stores one or many CopyMultiInsertBuffers and details about the size and - * number of tuples which are stored in them. This allows multiple buffers to - * exist at once when COPYing into a partitioned table. - */ -typedef struct CopyMultiInsertInfo -{ - List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */ - int bufferedTuples; /* number of tuples buffered over all buffers */ - int bufferedBytes; /* number of bytes from all buffered tuples */ - CopyState cstate; /* Copy state for this CopyMultiInsertInfo */ - EState *estate; /* Executor state used for COPY */ - CommandId mycid; /* Command Id used for COPY */ - int ti_options; /* table insert options */ -} CopyMultiInsertInfo; - - -/* - * These macros centralize code used to process line_buf and raw_buf buffers. - * They are macros because they often do continue/break control and to avoid - * function call overhead in tight COPY loops. - * - * We must use "if (1)" because the usual "do {...} while(0)" wrapper would - * prevent the continue/break processing from working. We end the "if (1)" - * with "else ((void) 0)" to ensure the "if" does not unintentionally match - * any "else" in the calling code, and to avoid any compiler warnings about - * empty statements. See http://www.cit.gu.edu.au/~anthony/info/C/C.macros. - */ - -/* - * This keeps the character read at the top of the loop in the buffer - * even if there is more than one read-ahead. - */ -#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \ -if (1) \ -{ \ - if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \ - { \ - raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \ - need_data = true; \ - continue; \ - } \ -} else ((void) 0) - -/* This consumes the remainder of the buffer and breaks */ -#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \ -if (1) \ -{ \ - if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \ - { \ - if (extralen) \ - raw_buf_ptr = copy_buf_len; /* consume the partial character */ \ - /* backslash just before EOF, treat as data char */ \ - result = true; \ - break; \ - } \ -} else ((void) 0) - -/* - * Transfer any approved data to line_buf; must do this to be sure - * there is some room in raw_buf. - */ -#define REFILL_LINEBUF \ -if (1) \ -{ \ - if (raw_buf_ptr > cstate->raw_buf_index) \ - { \ - appendBinaryStringInfo(&cstate->line_buf, \ - cstate->raw_buf + cstate->raw_buf_index, \ - raw_buf_ptr - cstate->raw_buf_index); \ - cstate->raw_buf_index = raw_buf_ptr; \ - } \ -} else ((void) 0) - -/* Undo any read-ahead and jump out of the block. */ -#define NO_END_OF_COPY_GOTO \ -if (1) \ -{ \ - raw_buf_ptr = prev_raw_ptr + 1; \ - goto not_end_of_copy; \ -} else ((void) 0) - -static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; - - -/* non-export function prototypes */ -static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel, - RawStmt *raw_query, Oid queryRelId, List *attnamelist, - List *options); -static void EndCopy(CopyState cstate); -static void ClosePipeToProgram(CopyState cstate); -static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query, - Oid queryRelId, const char *filename, bool is_program, - List *attnamelist, List *options); -static void EndCopyTo(CopyState cstate); -static uint64 DoCopyTo(CopyState cstate); -static uint64 CopyTo(CopyState cstate); -static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot); -static bool CopyReadLine(CopyState cstate); -static bool CopyReadLineText(CopyState cstate); -static int CopyReadAttributesText(CopyState cstate); -static int CopyReadAttributesCSV(CopyState cstate); -static Datum CopyReadBinaryAttribute(CopyState cstate, FmgrInfo *flinfo, - Oid typioparam, int32 typmod, - bool *isnull); -static void CopyAttributeOutText(CopyState cstate, char *string); -static void CopyAttributeOutCSV(CopyState cstate, char *string, - bool use_quote, bool single_attr); -static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel, - List *attnamelist); -static char *limit_printout_length(const char *str); - -/* Low-level communications functions */ -static void SendCopyBegin(CopyState cstate); -static void ReceiveCopyBegin(CopyState cstate); -static void SendCopyEnd(CopyState cstate); -static void CopySendData(CopyState cstate, const void *databuf, int datasize); -static void CopySendString(CopyState cstate, const char *str); -static void CopySendChar(CopyState cstate, char c); -static void CopySendEndOfRow(CopyState cstate); -static int CopyGetData(CopyState cstate, void *databuf, - int minread, int maxread); -static void CopySendInt32(CopyState cstate, int32 val); -static bool CopyGetInt32(CopyState cstate, int32 *val); -static void CopySendInt16(CopyState cstate, int16 val); -static bool CopyGetInt16(CopyState cstate, int16 *val); -static bool CopyLoadRawBuf(CopyState cstate); -static int CopyReadBinaryData(CopyState cstate, char *dest, int nbytes); - - -/* - * Send copy start/stop messages for frontend copies. These have changed - * in past protocol redesigns. - */ -static void -SendCopyBegin(CopyState cstate) -{ - if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) - { - /* new way */ - StringInfoData buf; - int natts = list_length(cstate->attnumlist); - int16 format = (cstate->binary ? 1 : 0); - int i; - - pq_beginmessage(&buf, 'H'); - pq_sendbyte(&buf, format); /* overall format */ - pq_sendint16(&buf, natts); - for (i = 0; i < natts; i++) - pq_sendint16(&buf, format); /* per-column formats */ - pq_endmessage(&buf); - cstate->copy_dest = COPY_NEW_FE; - } - else - { - /* old way */ - if (cstate->binary) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("COPY BINARY is not supported to stdout or from stdin"))); - pq_putemptymessage('H'); - /* grottiness needed for old COPY OUT protocol */ - pq_startcopyout(); - cstate->copy_dest = COPY_OLD_FE; - } -} - -static void -ReceiveCopyBegin(CopyState cstate) -{ - if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) - { - /* new way */ - StringInfoData buf; - int natts = list_length(cstate->attnumlist); - int16 format = (cstate->binary ? 1 : 0); - int i; - - pq_beginmessage(&buf, 'G'); - pq_sendbyte(&buf, format); /* overall format */ - pq_sendint16(&buf, natts); - for (i = 0; i < natts; i++) - pq_sendint16(&buf, format); /* per-column formats */ - pq_endmessage(&buf); - cstate->copy_dest = COPY_NEW_FE; - cstate->fe_msgbuf = makeStringInfo(); - } - else - { - /* old way */ - if (cstate->binary) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("COPY BINARY is not supported to stdout or from stdin"))); - pq_putemptymessage('G'); - /* any error in old protocol will make us lose sync */ - pq_startmsgread(); - cstate->copy_dest = COPY_OLD_FE; - } - /* We *must* flush here to ensure FE knows it can send. */ - pq_flush(); -} - -static void -SendCopyEnd(CopyState cstate) -{ - if (cstate->copy_dest == COPY_NEW_FE) - { - /* Shouldn't have any unsent data */ - Assert(cstate->fe_msgbuf->len == 0); - /* Send Copy Done message */ - pq_putemptymessage('c'); - } - else - { - CopySendData(cstate, "\\.", 2); - /* Need to flush out the trailer (this also appends a newline) */ - CopySendEndOfRow(cstate); - pq_endcopyout(false); - } -} - -/*---------- - * CopySendData sends output data to the destination (file or frontend) - * CopySendString does the same for null-terminated strings - * CopySendChar does the same for single characters - * CopySendEndOfRow does the appropriate thing at end of each data row - * (data is not actually flushed except by CopySendEndOfRow) - * - * NB: no data conversion is applied by these functions - *---------- - */ -static void -CopySendData(CopyState cstate, const void *databuf, int datasize) -{ - appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize); -} - -static void -CopySendString(CopyState cstate, const char *str) -{ - appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str)); -} - -static void -CopySendChar(CopyState cstate, char c) -{ - appendStringInfoCharMacro(cstate->fe_msgbuf, c); -} - -static void -CopySendEndOfRow(CopyState cstate) -{ - StringInfo fe_msgbuf = cstate->fe_msgbuf; - - switch (cstate->copy_dest) - { - case COPY_FILE: - if (!cstate->binary) - { - /* Default line termination depends on platform */ -#ifndef WIN32 - CopySendChar(cstate, '\n'); -#else - CopySendString(cstate, "\r\n"); -#endif - } - - if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, - cstate->copy_file) != 1 || - ferror(cstate->copy_file)) - { - if (cstate->is_program) - { - if (errno == EPIPE) - { - /* - * The pipe will be closed automatically on error at - * the end of transaction, but we might get a better - * error message from the subprocess' exit code than - * just "Broken Pipe" - */ - ClosePipeToProgram(cstate); - - /* - * If ClosePipeToProgram() didn't throw an error, the - * program terminated normally, but closed the pipe - * first. Restore errno, and throw an error. - */ - errno = EPIPE; - } - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not write to COPY program: %m"))); - } - else - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not write to COPY file: %m"))); - } - break; - case COPY_OLD_FE: - /* The FE/BE protocol uses \n as newline for all platforms */ - if (!cstate->binary) - CopySendChar(cstate, '\n'); - - if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len)) - { - /* no hope of recovering connection sync, so FATAL */ - ereport(FATAL, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("connection lost during COPY to stdout"))); - } - break; - case COPY_NEW_FE: - /* The FE/BE protocol uses \n as newline for all platforms */ - if (!cstate->binary) - CopySendChar(cstate, '\n'); - - /* Dump the accumulated row as one CopyData message */ - (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len); - break; - case COPY_CALLBACK: - Assert(false); /* Not yet supported. */ - break; - } - - resetStringInfo(fe_msgbuf); -} - -/* - * CopyGetData reads data from the source (file or frontend) - * - * We attempt to read at least minread, and at most maxread, bytes from - * the source. The actual number of bytes read is returned; if this is - * less than minread, EOF was detected. - * - * Note: when copying from the frontend, we expect a proper EOF mark per - * protocol; if the frontend simply drops the connection, we raise error. - * It seems unwise to allow the COPY IN to complete normally in that case. - * - * NB: no data conversion is applied here. - */ -static int -CopyGetData(CopyState cstate, void *databuf, int minread, int maxread) -{ - int bytesread = 0; - - switch (cstate->copy_dest) - { - case COPY_FILE: - bytesread = fread(databuf, 1, maxread, cstate->copy_file); - if (ferror(cstate->copy_file)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from COPY file: %m"))); - if (bytesread == 0) - cstate->reached_eof = true; - break; - case COPY_OLD_FE: - - /* - * We cannot read more than minread bytes (which in practice is 1) - * because old protocol doesn't have any clear way of separating - * the COPY stream from following data. This is slow, but not any - * slower than the code path was originally, and we don't care - * much anymore about the performance of old protocol. - */ - if (pq_getbytes((char *) databuf, minread)) - { - /* Only a \. terminator is legal EOF in old protocol */ - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("unexpected EOF on client connection with an open transaction"))); - } - bytesread = minread; - break; - case COPY_NEW_FE: - while (maxread > 0 && bytesread < minread && !cstate->reached_eof) - { - int avail; - - while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len) - { - /* Try to receive another message */ - int mtype; - - readmessage: - HOLD_CANCEL_INTERRUPTS(); - pq_startmsgread(); - mtype = pq_getbyte(); - if (mtype == EOF) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("unexpected EOF on client connection with an open transaction"))); - if (pq_getmessage(cstate->fe_msgbuf, 0)) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("unexpected EOF on client connection with an open transaction"))); - RESUME_CANCEL_INTERRUPTS(); - switch (mtype) - { - case 'd': /* CopyData */ - break; - case 'c': /* CopyDone */ - /* COPY IN correctly terminated by frontend */ - cstate->reached_eof = true; - return bytesread; - case 'f': /* CopyFail */ - ereport(ERROR, - (errcode(ERRCODE_QUERY_CANCELED), - errmsg("COPY from stdin failed: %s", - pq_getmsgstring(cstate->fe_msgbuf)))); - break; - case 'H': /* Flush */ - case 'S': /* Sync */ - - /* - * Ignore Flush/Sync for the convenience of client - * libraries (such as libpq) that may send those - * without noticing that the command they just - * sent was COPY. - */ - goto readmessage; - default: - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("unexpected message type 0x%02X during COPY from stdin", - mtype))); - break; - } - } - avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor; - if (avail > maxread) - avail = maxread; - pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail); - databuf = (void *) ((char *) databuf + avail); - maxread -= avail; - bytesread += avail; - } - break; - case COPY_CALLBACK: - bytesread = cstate->data_source_cb(databuf, minread, maxread); - break; - } - - return bytesread; -} - - -/* - * These functions do apply some data conversion - */ - -/* - * CopySendInt32 sends an int32 in network byte order - */ -static inline void -CopySendInt32(CopyState cstate, int32 val) -{ - uint32 buf; - - buf = pg_hton32((uint32) val); - CopySendData(cstate, &buf, sizeof(buf)); -} - -/* - * CopyGetInt32 reads an int32 that appears in network byte order - * - * Returns true if OK, false if EOF - */ -static inline bool -CopyGetInt32(CopyState cstate, int32 *val) -{ - uint32 buf; - - if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf)) - { - *val = 0; /* suppress compiler warning */ - return false; - } - *val = (int32) pg_ntoh32(buf); - return true; -} - -/* - * CopySendInt16 sends an int16 in network byte order - */ -static inline void -CopySendInt16(CopyState cstate, int16 val) -{ - uint16 buf; - - buf = pg_hton16((uint16) val); - CopySendData(cstate, &buf, sizeof(buf)); -} - -/* - * CopyGetInt16 reads an int16 that appears in network byte order - */ -static inline bool -CopyGetInt16(CopyState cstate, int16 *val) -{ - uint16 buf; - - if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf)) - { - *val = 0; /* suppress compiler warning */ - return false; - } - *val = (int16) pg_ntoh16(buf); - return true; -} - - -/* - * CopyLoadRawBuf loads some more data into raw_buf - * - * Returns true if able to obtain at least one more byte, else false. - * - * If RAW_BUF_BYTES(cstate) > 0, the unprocessed bytes are moved to the start - * of the buffer and then we load more data after that. This case occurs only - * when a multibyte character crosses a bufferload boundary. - */ -static bool -CopyLoadRawBuf(CopyState cstate) -{ - int nbytes = RAW_BUF_BYTES(cstate); - int inbytes; - - /* Copy down the unprocessed data if any. */ - if (nbytes > 0) - memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index, - nbytes); - - inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes, - 1, RAW_BUF_SIZE - nbytes); - nbytes += inbytes; - cstate->raw_buf[nbytes] = '\0'; - cstate->raw_buf_index = 0; - cstate->raw_buf_len = nbytes; - return (inbytes > 0); -} - -/* - * CopyReadBinaryData - * - * Reads up to 'nbytes' bytes from cstate->copy_file via cstate->raw_buf - * and writes them to 'dest'. Returns the number of bytes read (which - * would be less than 'nbytes' only if we reach EOF). - */ -static int -CopyReadBinaryData(CopyState cstate, char *dest, int nbytes) -{ - int copied_bytes = 0; - - if (RAW_BUF_BYTES(cstate) >= nbytes) - { - /* Enough bytes are present in the buffer. */ - memcpy(dest, cstate->raw_buf + cstate->raw_buf_index, nbytes); - cstate->raw_buf_index += nbytes; - copied_bytes = nbytes; - } - else - { - /* - * Not enough bytes in the buffer, so must read from the file. Need - * to loop since 'nbytes' could be larger than the buffer size. - */ - do - { - int copy_bytes; - - /* Load more data if buffer is empty. */ - if (RAW_BUF_BYTES(cstate) == 0) - { - if (!CopyLoadRawBuf(cstate)) - break; /* EOF */ - } - - /* Transfer some bytes. */ - copy_bytes = Min(nbytes - copied_bytes, RAW_BUF_BYTES(cstate)); - memcpy(dest, cstate->raw_buf + cstate->raw_buf_index, copy_bytes); - cstate->raw_buf_index += copy_bytes; - dest += copy_bytes; - copied_bytes += copy_bytes; - } while (copied_bytes < nbytes); - } - - return copied_bytes; -} - /* * DoCopy executes the SQL COPY statement @@ -889,7 +65,6 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, uint64 *processed) { - CopyState cstate; bool is_from = stmt->is_from; bool pipe = (stmt->filename == NULL); Relation rel; @@ -1110,20 +285,24 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, if (is_from) { + CopyFromState cstate; + Assert(rel); /* check read-only transaction and parallel mode */ if (XactReadOnly && !rel->rd_islocaltemp) PreventCommandIfReadOnly("COPY FROM"); - cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program, + cstate = BeginCopyFrom(pstate, rel, whereClause, + stmt->filename, stmt->is_program, NULL, stmt->attlist, stmt->options); - cstate->whereClause = whereClause; *processed = CopyFrom(cstate); /* copy from file to database */ EndCopyFrom(cstate); } else { + CopyToState cstate; + cstate = BeginCopyTo(pstate, rel, query, relid, stmt->filename, stmt->is_program, stmt->attlist, stmt->options); @@ -1139,14 +318,13 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, * Process the statement option list for COPY. * * Scan the options list (a list of DefElem) and transpose the information - * into cstate, applying appropriate error checking. + * into *opts_out, applying appropriate error checking. * - * cstate is assumed to be filled with zeroes initially. + * If 'opts_out' is not NULL, it is assumed to be filled with zeroes initially. * * This is exported so that external users of the COPY API can sanity-check - * a list of options. In that usage, cstate should be passed as NULL - * (since external users don't know sizeof(CopyStateData)) and the collected - * data is just leaked until CurrentMemoryContext is reset. + * a list of options. In that usage, 'opts_out' can be passed as NULL and + * the collected data is just leaked until CurrentMemoryContext is reset. * * Note that additional checking, such as whether column names listed in FORCE * QUOTE actually exist, has to be applied later. This just checks for @@ -1154,7 +332,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, */ void ProcessCopyOptions(ParseState *pstate, - CopyState cstate, + CopyFormatOptions *opts_out, bool is_from, List *options) { @@ -1164,12 +342,10 @@ ProcessCopyOptions(ParseState *pstate, ListCell *option; /* Support external use for option sanity checking */ - if (cstate == NULL) - cstate = (CopyStateData *) palloc0(sizeof(CopyStateData)); + if (opts_out == NULL) + opts_out = (CopyFormatOptions *) palloc0(sizeof(CopyFormatOptions)); - cstate->is_copy_from = is_from; - - cstate->file_encoding = -1; + opts_out->file_encoding = -1; /* Extract options from the statement node tree */ foreach(option, options) @@ -1189,9 +365,9 @@ ProcessCopyOptions(ParseState *pstate, if (strcmp(fmt, "text") == 0) /* default format */ ; else if (strcmp(fmt, "csv") == 0) - cstate->csv_mode = true; + opts_out->csv_mode = true; else if (strcmp(fmt, "binary") == 0) - cstate->binary = true; + opts_out->binary = true; else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -1206,25 +382,25 @@ ProcessCopyOptions(ParseState *pstate, errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); freeze_specified = true; - cstate->freeze = defGetBoolean(defel); + opts_out->freeze = defGetBoolean(defel); } else if (strcmp(defel->defname, "delimiter") == 0) { - if (cstate->delim) + if (opts_out->delim) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); - cstate->delim = defGetString(defel); + opts_out->delim = defGetString(defel); } else if (strcmp(defel->defname, "null") == 0) { - if (cstate->null_print) + if (opts_out->null_print) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); - cstate->null_print = defGetString(defel); + opts_out->null_print = defGetString(defel); } else if (strcmp(defel->defname, "header") == 0) { @@ -1234,37 +410,37 @@ ProcessCopyOptions(ParseState *pstate, errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); header_specified = true; - cstate->header_line = defGetBoolean(defel); + opts_out->header_line = defGetBoolean(defel); } else if (strcmp(defel->defname, "quote") == 0) { - if (cstate->quote) + if (opts_out->quote) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); - cstate->quote = defGetString(defel); + opts_out->quote = defGetString(defel); } else if (strcmp(defel->defname, "escape") == 0) { - if (cstate->escape) + if (opts_out->escape) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); - cstate->escape = defGetString(defel); + opts_out->escape = defGetString(defel); } else if (strcmp(defel->defname, "force_quote") == 0) { - if (cstate->force_quote || cstate->force_quote_all) + if (opts_out->force_quote || opts_out->force_quote_all) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); if (defel->arg && IsA(defel->arg, A_Star)) - cstate->force_quote_all = true; + opts_out->force_quote_all = true; else if (defel->arg && IsA(defel->arg, List)) - cstate->force_quote = castNode(List, defel->arg); + opts_out->force_quote = castNode(List, defel->arg); else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -1274,13 +450,13 @@ ProcessCopyOptions(ParseState *pstate, } else if (strcmp(defel->defname, "force_not_null") == 0) { - if (cstate->force_notnull) + if (opts_out->force_notnull) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); if (defel->arg && IsA(defel->arg, List)) - cstate->force_notnull = castNode(List, defel->arg); + opts_out->force_notnull = castNode(List, defel->arg); else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -1290,12 +466,12 @@ ProcessCopyOptions(ParseState *pstate, } else if (strcmp(defel->defname, "force_null") == 0) { - if (cstate->force_null) + if (opts_out->force_null) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); if (defel->arg && IsA(defel->arg, List)) - cstate->force_null = castNode(List, defel->arg); + opts_out->force_null = castNode(List, defel->arg); else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -1310,14 +486,14 @@ ProcessCopyOptions(ParseState *pstate, * named columns to binary form, storing the rest as NULLs. It's * allowed for the column list to be NIL. */ - if (cstate->convert_selectively) + if (opts_out->convert_selectively) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); - cstate->convert_selectively = true; + opts_out->convert_selectively = true; if (defel->arg == NULL || IsA(defel->arg, List)) - cstate->convert_select = castNode(List, defel->arg); + opts_out->convert_select = castNode(List, defel->arg); else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -1327,13 +503,13 @@ ProcessCopyOptions(ParseState *pstate, } else if (strcmp(defel->defname, "encoding") == 0) { - if (cstate->file_encoding >= 0) + if (opts_out->file_encoding >= 0) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); - cstate->file_encoding = pg_char_to_encoding(defGetString(defel)); - if (cstate->file_encoding < 0) + opts_out->file_encoding = pg_char_to_encoding(defGetString(defel)); + if (opts_out->file_encoding < 0) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("argument to option \"%s\" must be a valid encoding name", @@ -1352,47 +528,47 @@ ProcessCopyOptions(ParseState *pstate, * Check for incompatible options (must do these two before inserting * defaults) */ - if (cstate->binary && cstate->delim) + if (opts_out->binary && opts_out->delim) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("cannot specify DELIMITER in BINARY mode"))); - if (cstate->binary && cstate->null_print) + if (opts_out->binary && opts_out->null_print) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("cannot specify NULL in BINARY mode"))); /* Set defaults for omitted options */ - if (!cstate->delim) - cstate->delim = cstate->csv_mode ? "," : "\t"; + if (!opts_out->delim) + opts_out->delim = opts_out->csv_mode ? "," : "\t"; - if (!cstate->null_print) - cstate->null_print = cstate->csv_mode ? "" : "\\N"; - cstate->null_print_len = strlen(cstate->null_print); + if (!opts_out->null_print) + opts_out->null_print = opts_out->csv_mode ? "" : "\\N"; + opts_out->null_print_len = strlen(opts_out->null_print); - if (cstate->csv_mode) + if (opts_out->csv_mode) { - if (!cstate->quote) - cstate->quote = "\""; - if (!cstate->escape) - cstate->escape = cstate->quote; + if (!opts_out->quote) + opts_out->quote = "\""; + if (!opts_out->escape) + opts_out->escape = opts_out->quote; } /* Only single-byte delimiter strings are supported. */ - if (strlen(cstate->delim) != 1) + if (strlen(opts_out->delim) != 1) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY delimiter must be a single one-byte character"))); /* Disallow end-of-line characters */ - if (strchr(cstate->delim, '\r') != NULL || - strchr(cstate->delim, '\n') != NULL) + if (strchr(opts_out->delim, '\r') != NULL || + strchr(opts_out->delim, '\n') != NULL) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("COPY delimiter cannot be newline or carriage return"))); - if (strchr(cstate->null_print, '\r') != NULL || - strchr(cstate->null_print, '\n') != NULL) + if (strchr(opts_out->null_print, '\r') != NULL || + strchr(opts_out->null_print, '\n') != NULL) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("COPY null representation cannot use newline or carriage return"))); @@ -1407,3631 +583,92 @@ ProcessCopyOptions(ParseState *pstate, * future-proofing. Likewise we disallow all digits though only octal * digits are actually dangerous. */ - if (!cstate->csv_mode && + if (!opts_out->csv_mode && strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789", - cstate->delim[0]) != NULL) + opts_out->delim[0]) != NULL) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("COPY delimiter cannot be \"%s\"", cstate->delim))); + errmsg("COPY delimiter cannot be \"%s\"", opts_out->delim))); /* Check header */ - if (!cstate->csv_mode && cstate->header_line) + if (!opts_out->csv_mode && opts_out->header_line) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY HEADER available only in CSV mode"))); /* Check quote */ - if (!cstate->csv_mode && cstate->quote != NULL) + if (!opts_out->csv_mode && opts_out->quote != NULL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY quote available only in CSV mode"))); - if (cstate->csv_mode && strlen(cstate->quote) != 1) + if (opts_out->csv_mode && strlen(opts_out->quote) != 1) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY quote must be a single one-byte character"))); - if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0]) + if (opts_out->csv_mode && opts_out->delim[0] == opts_out->quote[0]) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("COPY delimiter and quote must be different"))); /* Check escape */ - if (!cstate->csv_mode && cstate->escape != NULL) + if (!opts_out->csv_mode && opts_out->escape != NULL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY escape available only in CSV mode"))); - if (cstate->csv_mode && strlen(cstate->escape) != 1) + if (opts_out->csv_mode && strlen(opts_out->escape) != 1) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY escape must be a single one-byte character"))); /* Check force_quote */ - if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all)) + if (!opts_out->csv_mode && (opts_out->force_quote || opts_out->force_quote_all)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force quote available only in CSV mode"))); - if ((cstate->force_quote || cstate->force_quote_all) && is_from) + if ((opts_out->force_quote || opts_out->force_quote_all) && is_from) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force quote only available using COPY TO"))); /* Check force_notnull */ - if (!cstate->csv_mode && cstate->force_notnull != NIL) + if (!opts_out->csv_mode && opts_out->force_notnull != NIL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force not null available only in CSV mode"))); - if (cstate->force_notnull != NIL && !is_from) + if (opts_out->force_notnull != NIL && !is_from) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force not null only available using COPY FROM"))); /* Check force_null */ - if (!cstate->csv_mode && cstate->force_null != NIL) + if (!opts_out->csv_mode && opts_out->force_null != NIL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force null available only in CSV mode"))); - if (cstate->force_null != NIL && !is_from) + if (opts_out->force_null != NIL && !is_from) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force null only available using COPY FROM"))); /* Don't allow the delimiter to appear in the null string. */ - if (strchr(cstate->null_print, cstate->delim[0]) != NULL) + if (strchr(opts_out->null_print, opts_out->delim[0]) != NULL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY delimiter must not appear in the NULL specification"))); /* Don't allow the CSV quote char to appear in the null string. */ - if (cstate->csv_mode && - strchr(cstate->null_print, cstate->quote[0]) != NULL) + if (opts_out->csv_mode && + strchr(opts_out->null_print, opts_out->quote[0]) != NULL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("CSV quote character must not appear in the NULL specification"))); } /* - * Common setup routines used by BeginCopyFrom and BeginCopyTo. - * - * Iff <binary>, unload or reload in the binary format, as opposed to the - * more wasteful but more robust and portable text format. - * - * Iff <oids>, unload or reload the format that includes OID information. - * On input, we accept OIDs whether or not the table has an OID column, - * but silently drop them if it does not. On output, we report an error - * if the user asks for OIDs in a table that has none (not providing an - * OID column might seem friendlier, but could seriously confuse programs). - * - * If in the text format, delimit columns with delimiter <delim> and print - * NULL values as <null_print>. - */ -static CopyState -BeginCopy(ParseState *pstate, - bool is_from, - Relation rel, - RawStmt *raw_query, - Oid queryRelId, - List *attnamelist, - List *options) -{ - CopyState cstate; - TupleDesc tupDesc; - int num_phys_attrs; - MemoryContext oldcontext; - - /* Allocate workspace and zero all fields */ - cstate = (CopyStateData *) palloc0(sizeof(CopyStateData)); - - /* - * We allocate everything used by a cstate in a new memory context. This - * avoids memory leaks during repeated use of COPY in a query. - */ - cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext, - "COPY", - ALLOCSET_DEFAULT_SIZES); - - oldcontext = MemoryContextSwitchTo(cstate->copycontext); - - /* Extract options from the statement node tree */ - ProcessCopyOptions(pstate, cstate, is_from, options); - - /* Process the source/target relation or query */ - if (rel) - { - Assert(!raw_query); - - cstate->rel = rel; - - tupDesc = RelationGetDescr(cstate->rel); - } - else - { - List *rewritten; - Query *query; - PlannedStmt *plan; - DestReceiver *dest; - - Assert(!is_from); - cstate->rel = NULL; - - /* - * Run parse analysis and rewrite. Note this also acquires sufficient - * locks on the source table(s). - * - * Because the parser and planner tend to scribble on their input, we - * make a preliminary copy of the source querytree. This prevents - * problems in the case that the COPY is in a portal or plpgsql - * function and is executed repeatedly. (See also the same hack in - * DECLARE CURSOR and PREPARE.) XXX FIXME someday. - */ - rewritten = pg_analyze_and_rewrite(copyObject(raw_query), - pstate->p_sourcetext, NULL, 0, - NULL); - - /* check that we got back something we can work with */ - if (rewritten == NIL) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("DO INSTEAD NOTHING rules are not supported for COPY"))); - } - else if (list_length(rewritten) > 1) - { - ListCell *lc; - - /* examine queries to determine which error message to issue */ - foreach(lc, rewritten) - { - Query *q = lfirst_node(Query, lc); - - if (q->querySource == QSRC_QUAL_INSTEAD_RULE) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("conditional DO INSTEAD rules are not supported for COPY"))); - if (q->querySource == QSRC_NON_INSTEAD_RULE) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("DO ALSO rules are not supported for the COPY"))); - } - - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("multi-statement DO INSTEAD rules are not supported for COPY"))); - } - - query = linitial_node(Query, rewritten); - - /* The grammar allows SELECT INTO, but we don't support that */ - if (query->utilityStmt != NULL && - IsA(query->utilityStmt, CreateTableAsStmt)) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("COPY (SELECT INTO) is not supported"))); - - Assert(query->utilityStmt == NULL); - - /* - * Similarly the grammar doesn't enforce the presence of a RETURNING - * clause, but this is required here. - */ - if (query->commandType != CMD_SELECT && - query->returningList == NIL) - { - Assert(query->commandType == CMD_INSERT || - query->commandType == CMD_UPDATE || - query->commandType == CMD_DELETE); - - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("COPY query must have a RETURNING clause"))); - } - - /* plan the query */ - plan = pg_plan_query(query, pstate->p_sourcetext, - CURSOR_OPT_PARALLEL_OK, NULL); - - /* - * With row level security and a user using "COPY relation TO", we - * have to convert the "COPY relation TO" to a query-based COPY (eg: - * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add - * in any RLS clauses. - * - * When this happens, we are passed in the relid of the originally - * found relation (which we have locked). As the planner will look up - * the relation again, we double-check here to make sure it found the - * same one that we have locked. - */ - if (queryRelId != InvalidOid) - { - /* - * Note that with RLS involved there may be multiple relations, - * and while the one we need is almost certainly first, we don't - * make any guarantees of that in the planner, so check the whole - * list and make sure we find the original relation. - */ - if (!list_member_oid(plan->relationOids, queryRelId)) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("relation referenced by COPY statement has changed"))); - } - - /* - * Use a snapshot with an updated command ID to ensure this query sees - * results of any previously executed queries. - */ - PushCopiedSnapshot(GetActiveSnapshot()); - UpdateActiveSnapshotCommandId(); - - /* Create dest receiver for COPY OUT */ - dest = CreateDestReceiver(DestCopyOut); - ((DR_copy *) dest)->cstate = cstate; - - /* Create a QueryDesc requesting no output */ - cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext, - GetActiveSnapshot(), - InvalidSnapshot, - dest, NULL, NULL, 0); - - /* - * Call ExecutorStart to prepare the plan for execution. - * - * ExecutorStart computes a result tupdesc for us - */ - ExecutorStart(cstate->queryDesc, 0); - - tupDesc = cstate->queryDesc->tupDesc; - } - - /* Generate or convert list of attributes to process */ - cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist); - - num_phys_attrs = tupDesc->natts; - - /* Convert FORCE_QUOTE name list to per-column flags, check validity */ - cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); - if (cstate->force_quote_all) - { - int i; - - for (i = 0; i < num_phys_attrs; i++) - cstate->force_quote_flags[i] = true; - } - else if (cstate->force_quote) - { - List *attnums; - ListCell *cur; - - attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote); - - foreach(cur, attnums) - { - int attnum = lfirst_int(cur); - Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); - - if (!list_member_int(cstate->attnumlist, attnum)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), - errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY", - NameStr(attr->attname)))); - cstate->force_quote_flags[attnum - 1] = true; - } - } - - /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */ - cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); - if (cstate->force_notnull) - { - List *attnums; - ListCell *cur; - - attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull); - - foreach(cur, attnums) - { - int attnum = lfirst_int(cur); - Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); - - if (!list_member_int(cstate->attnumlist, attnum)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), - errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY", - NameStr(attr->attname)))); - cstate->force_notnull_flags[attnum - 1] = true; - } - } - - /* Convert FORCE_NULL name list to per-column flags, check validity */ - cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); - if (cstate->force_null) - { - List *attnums; - ListCell *cur; - - attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null); - - foreach(cur, attnums) - { - int attnum = lfirst_int(cur); - Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); - - if (!list_member_int(cstate->attnumlist, attnum)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), - errmsg("FORCE_NULL column \"%s\" not referenced by COPY", - NameStr(attr->attname)))); - cstate->force_null_flags[attnum - 1] = true; - } - } - - /* Convert convert_selectively name list to per-column flags */ - if (cstate->convert_selectively) - { - List *attnums; - ListCell *cur; - - cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); - - attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select); - - foreach(cur, attnums) - { - int attnum = lfirst_int(cur); - Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); - - if (!list_member_int(cstate->attnumlist, attnum)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), - errmsg_internal("selected column \"%s\" not referenced by COPY", - NameStr(attr->attname)))); - cstate->convert_select_flags[attnum - 1] = true; - } - } - - /* Use client encoding when ENCODING option is not specified. */ - if (cstate->file_encoding < 0) - cstate->file_encoding = pg_get_client_encoding(); - - /* - * Set up encoding conversion info. Even if the file and server encodings - * are the same, we must apply pg_any_to_server() to validate data in - * multibyte encodings. - */ - cstate->need_transcoding = - (cstate->file_encoding != GetDatabaseEncoding() || - pg_database_encoding_max_length() > 1); - /* See Multibyte encoding comment above */ - cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding); - - cstate->copy_dest = COPY_FILE; /* default */ - - MemoryContextSwitchTo(oldcontext); - - return cstate; -} - -/* - * Closes the pipe to an external program, checking the pclose() return code. - */ -static void -ClosePipeToProgram(CopyState cstate) -{ - int pclose_rc; - - Assert(cstate->is_program); - - pclose_rc = ClosePipeStream(cstate->copy_file); - if (pclose_rc == -1) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not close pipe to external command: %m"))); - else if (pclose_rc != 0) - { - /* - * If we ended a COPY FROM PROGRAM before reaching EOF, then it's - * expectable for the called program to fail with SIGPIPE, and we - * should not report that as an error. Otherwise, SIGPIPE indicates a - * problem. - */ - if (cstate->is_copy_from && !cstate->reached_eof && - wait_result_is_signal(pclose_rc, SIGPIPE)) - return; - - ereport(ERROR, - (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), - errmsg("program \"%s\" failed", - cstate->filename), - errdetail_internal("%s", wait_result_to_str(pclose_rc)))); - } -} - -/* - * Release resources allocated in a cstate for COPY TO/FROM. - */ -static void -EndCopy(CopyState cstate) -{ - if (cstate->is_program) - { - ClosePipeToProgram(cstate); - } - else - { - if (cstate->filename != NULL && FreeFile(cstate->copy_file)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not close file \"%s\": %m", - cstate->filename))); - } - - MemoryContextDelete(cstate->copycontext); - pfree(cstate); -} - -/* - * Setup CopyState to read tuples from a table or a query for COPY TO. - */ -static CopyState -BeginCopyTo(ParseState *pstate, - Relation rel, - RawStmt *query, - Oid queryRelId, - const char *filename, - bool is_program, - List *attnamelist, - List *options) -{ - CopyState cstate; - bool pipe = (filename == NULL); - MemoryContext oldcontext; - - if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION) - { - if (rel->rd_rel->relkind == RELKIND_VIEW) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot copy from view \"%s\"", - RelationGetRelationName(rel)), - errhint("Try the COPY (SELECT ...) TO variant."))); - else if (rel->rd_rel->relkind == RELKIND_MATVIEW) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot copy from materialized view \"%s\"", - RelationGetRelationName(rel)), - errhint("Try the COPY (SELECT ...) TO variant."))); - else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot copy from foreign table \"%s\"", - RelationGetRelationName(rel)), - errhint("Try the COPY (SELECT ...) TO variant."))); - else if (rel->rd_rel->relkind == RELKIND_SEQUENCE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot copy from sequence \"%s\"", - RelationGetRelationName(rel)))); - else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot copy from partitioned table \"%s\"", - RelationGetRelationName(rel)), - errhint("Try the COPY (SELECT ...) TO variant."))); - else - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot copy from non-table relation \"%s\"", - RelationGetRelationName(rel)))); - } - - cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist, - options); - oldcontext = MemoryContextSwitchTo(cstate->copycontext); - - if (pipe) - { - Assert(!is_program); /* the grammar does not allow this */ - if (whereToSendOutput != DestRemote) - cstate->copy_file = stdout; - } - else - { - cstate->filename = pstrdup(filename); - cstate->is_program = is_program; - - if (is_program) - { - cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W); - if (cstate->copy_file == NULL) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not execute command \"%s\": %m", - cstate->filename))); - } - else - { - mode_t oumask; /* Pre-existing umask value */ - struct stat st; - - /* - * Prevent write to relative path ... too easy to shoot oneself in - * the foot by overwriting a database file ... - */ - if (!is_absolute_path(filename)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_NAME), - errmsg("relative path not allowed for COPY to file"))); - - oumask = umask(S_IWGRP | S_IWOTH); - PG_TRY(); - { - cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W); - } - PG_FINALLY(); - { - umask(oumask); - } - PG_END_TRY(); - if (cstate->copy_file == NULL) - { - /* copy errno because ereport subfunctions might change it */ - int save_errno = errno; - - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\" for writing: %m", - cstate->filename), - (save_errno == ENOENT || save_errno == EACCES) ? - errhint("COPY TO instructs the PostgreSQL server process to write a file. " - "You may want a client-side facility such as psql's \\copy.") : 0)); - } - - if (fstat(fileno(cstate->copy_file), &st)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not stat file \"%s\": %m", - cstate->filename))); - - if (S_ISDIR(st.st_mode)) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is a directory", cstate->filename))); - } - } - - MemoryContextSwitchTo(oldcontext); - - return cstate; -} - -/* - * This intermediate routine exists mainly to localize the effects of setjmp - * so we don't need to plaster a lot of variables with "volatile". - */ -static uint64 -DoCopyTo(CopyState cstate) -{ - bool pipe = (cstate->filename == NULL); - bool fe_copy = (pipe && whereToSendOutput == DestRemote); - uint64 processed; - - PG_TRY(); - { - if (fe_copy) - SendCopyBegin(cstate); - - processed = CopyTo(cstate); - - if (fe_copy) - SendCopyEnd(cstate); - } - PG_CATCH(); - { - /* - * Make sure we turn off old-style COPY OUT mode upon error. It is - * okay to do this in all cases, since it does nothing if the mode is - * not on. - */ - pq_endcopyout(true); - PG_RE_THROW(); - } - PG_END_TRY(); - - return processed; -} - -/* - * Clean up storage and release resources for COPY TO. - */ -static void -EndCopyTo(CopyState cstate) -{ - if (cstate->queryDesc != NULL) - { - /* Close down the query and free resources. */ - ExecutorFinish(cstate->queryDesc); - ExecutorEnd(cstate->queryDesc); - FreeQueryDesc(cstate->queryDesc); - PopActiveSnapshot(); - } - - /* Clean up storage */ - EndCopy(cstate); -} - -/* - * Copy from relation or query TO file. - */ -static uint64 -CopyTo(CopyState cstate) -{ - TupleDesc tupDesc; - int num_phys_attrs; - ListCell *cur; - uint64 processed; - - if (cstate->rel) - tupDesc = RelationGetDescr(cstate->rel); - else - tupDesc = cstate->queryDesc->tupDesc; - num_phys_attrs = tupDesc->natts; - cstate->null_print_client = cstate->null_print; /* default */ - - /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */ - cstate->fe_msgbuf = makeStringInfo(); - - /* Get info about the columns we need to process. */ - cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - Oid out_func_oid; - bool isvarlena; - Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); - - if (cstate->binary) - getTypeBinaryOutputInfo(attr->atttypid, - &out_func_oid, - &isvarlena); - else - getTypeOutputInfo(attr->atttypid, - &out_func_oid, - &isvarlena); - fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); - } - - /* - * Create a temporary memory context that we can reset once per row to - * recover palloc'd memory. This avoids any problems with leaks inside - * datatype output routines, and should be faster than retail pfree's - * anyway. (We don't need a whole econtext as CopyFrom does.) - */ - cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext, - "COPY TO", - ALLOCSET_DEFAULT_SIZES); - - if (cstate->binary) - { - /* Generate header for a binary copy */ - int32 tmp; - - /* Signature */ - CopySendData(cstate, BinarySignature, 11); - /* Flags field */ - tmp = 0; - CopySendInt32(cstate, tmp); - /* No header extension */ - tmp = 0; - CopySendInt32(cstate, tmp); - } - else - { - /* - * For non-binary copy, we need to convert null_print to file - * encoding, because it will be sent directly with CopySendString. - */ - if (cstate->need_transcoding) - cstate->null_print_client = pg_server_to_any(cstate->null_print, - cstate->null_print_len, - cstate->file_encoding); - - /* if a header has been requested send the line */ - if (cstate->header_line) - { - bool hdr_delim = false; - - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - char *colname; - - if (hdr_delim) - CopySendChar(cstate, cstate->delim[0]); - hdr_delim = true; - - colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname); - - CopyAttributeOutCSV(cstate, colname, false, - list_length(cstate->attnumlist) == 1); - } - - CopySendEndOfRow(cstate); - } - } - - if (cstate->rel) - { - TupleTableSlot *slot; - TableScanDesc scandesc; - - scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL); - slot = table_slot_create(cstate->rel, NULL); - - processed = 0; - while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot)) - { - CHECK_FOR_INTERRUPTS(); - - /* Deconstruct the tuple ... */ - slot_getallattrs(slot); - - /* Format and send the data */ - CopyOneRowTo(cstate, slot); - processed++; - } - - ExecDropSingleTupleTableSlot(slot); - table_endscan(scandesc); - } - else - { - /* run the plan --- the dest receiver will send tuples */ - ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true); - processed = ((DR_copy *) cstate->queryDesc->dest)->processed; - } - - if (cstate->binary) - { - /* Generate trailer for a binary copy */ - CopySendInt16(cstate, -1); - /* Need to flush out the trailer */ - CopySendEndOfRow(cstate); - } - - MemoryContextDelete(cstate->rowcontext); - - return processed; -} - -/* - * Emit one row during CopyTo(). - */ -static void -CopyOneRowTo(CopyState cstate, TupleTableSlot *slot) -{ - bool need_delim = false; - FmgrInfo *out_functions = cstate->out_functions; - MemoryContext oldcontext; - ListCell *cur; - char *string; - - MemoryContextReset(cstate->rowcontext); - oldcontext = MemoryContextSwitchTo(cstate->rowcontext); - - if (cstate->binary) - { - /* Binary per-tuple header */ - CopySendInt16(cstate, list_length(cstate->attnumlist)); - } - - /* Make sure the tuple is fully deconstructed */ - slot_getallattrs(slot); - - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - Datum value = slot->tts_values[attnum - 1]; - bool isnull = slot->tts_isnull[attnum - 1]; - - if (!cstate->binary) - { - if (need_delim) - CopySendChar(cstate, cstate->delim[0]); - need_delim = true; - } - - if (isnull) - { - if (!cstate->binary) - CopySendString(cstate, cstate->null_print_client); - else - CopySendInt32(cstate, -1); - } - else - { - if (!cstate->binary) - { - string = OutputFunctionCall(&out_functions[attnum - 1], - value); - if (cstate->csv_mode) - CopyAttributeOutCSV(cstate, string, - cstate->force_quote_flags[attnum - 1], - list_length(cstate->attnumlist) == 1); - else - CopyAttributeOutText(cstate, string); - } - else - { - bytea *outputbytes; - - outputbytes = SendFunctionCall(&out_functions[attnum - 1], - value); - CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); - CopySendData(cstate, VARDATA(outputbytes), - VARSIZE(outputbytes) - VARHDRSZ); - } - } - } - - CopySendEndOfRow(cstate); - - MemoryContextSwitchTo(oldcontext); -} - - -/* - * error context callback for COPY FROM - * - * The argument for the error context must be CopyState. - */ -void -CopyFromErrorCallback(void *arg) -{ - CopyState cstate = (CopyState) arg; - char curlineno_str[32]; - - snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT, - cstate->cur_lineno); - - if (cstate->binary) - { - /* can't usefully display the data */ - if (cstate->cur_attname) - errcontext("COPY %s, line %s, column %s", - cstate->cur_relname, curlineno_str, - cstate->cur_attname); - else - errcontext("COPY %s, line %s", - cstate->cur_relname, curlineno_str); - } - else - { - if (cstate->cur_attname && cstate->cur_attval) - { - /* error is relevant to a particular column */ - char *attval; - - attval = limit_printout_length(cstate->cur_attval); - errcontext("COPY %s, line %s, column %s: \"%s\"", - cstate->cur_relname, curlineno_str, - cstate->cur_attname, attval); - pfree(attval); - } - else if (cstate->cur_attname) - { - /* error is relevant to a particular column, value is NULL */ - errcontext("COPY %s, line %s, column %s: null input", - cstate->cur_relname, curlineno_str, - cstate->cur_attname); - } - else - { - /* - * Error is relevant to a particular line. - * - * If line_buf still contains the correct line, and it's already - * transcoded, print it. If it's still in a foreign encoding, it's - * quite likely that the error is precisely a failure to do - * encoding conversion (ie, bad data). We dare not try to convert - * it, and at present there's no way to regurgitate it without - * conversion. So we have to punt and just report the line number. - */ - if (cstate->line_buf_valid && - (cstate->line_buf_converted || !cstate->need_transcoding)) - { - char *lineval; - - lineval = limit_printout_length(cstate->line_buf.data); - errcontext("COPY %s, line %s: \"%s\"", - cstate->cur_relname, curlineno_str, lineval); - pfree(lineval); - } - else - { - errcontext("COPY %s, line %s", - cstate->cur_relname, curlineno_str); - } - } - } -} - -/* - * Make sure we don't print an unreasonable amount of COPY data in a message. - * - * Returns a pstrdup'd copy of the input. - */ -static char * -limit_printout_length(const char *str) -{ -#define MAX_COPY_DATA_DISPLAY 100 - - int slen = strlen(str); - int len; - char *res; - - /* Fast path if definitely okay */ - if (slen <= MAX_COPY_DATA_DISPLAY) - return pstrdup(str); - - /* Apply encoding-dependent truncation */ - len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY); - - /* - * Truncate, and add "..." to show we truncated the input. - */ - res = (char *) palloc(len + 4); - memcpy(res, str, len); - strcpy(res + len, "..."); - - return res; -} - -/* - * Allocate memory and initialize a new CopyMultiInsertBuffer for this - * ResultRelInfo. - */ -static CopyMultiInsertBuffer * -CopyMultiInsertBufferInit(ResultRelInfo *rri) -{ - CopyMultiInsertBuffer *buffer; - - buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer)); - memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); - buffer->resultRelInfo = rri; - buffer->bistate = GetBulkInsertState(); - buffer->nused = 0; - - return buffer; -} - -/* - * Make a new buffer for this ResultRelInfo. - */ -static inline void -CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, - ResultRelInfo *rri) -{ - CopyMultiInsertBuffer *buffer; - - buffer = CopyMultiInsertBufferInit(rri); - - /* Setup back-link so we can easily find this buffer again */ - rri->ri_CopyMultiInsertBuffer = buffer; - /* Record that we're tracking this buffer */ - miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer); -} - -/* - * Initialize an already allocated CopyMultiInsertInfo. - * - * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up - * for that table. - */ -static void -CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, - CopyState cstate, EState *estate, CommandId mycid, - int ti_options) -{ - miinfo->multiInsertBuffers = NIL; - miinfo->bufferedTuples = 0; - miinfo->bufferedBytes = 0; - miinfo->cstate = cstate; - miinfo->estate = estate; - miinfo->mycid = mycid; - miinfo->ti_options = ti_options; - - /* - * Only setup the buffer when not dealing with a partitioned table. - * Buffers for partitioned tables will just be setup when we need to send - * tuples their way for the first time. - */ - if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) - CopyMultiInsertInfoSetupBuffer(miinfo, rri); -} - -/* - * Returns true if the buffers are full - */ -static inline bool -CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo) -{ - if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES || - miinfo->bufferedBytes >= MAX_BUFFERED_BYTES) - return true; - return false; -} - -/* - * Returns true if we have no buffered tuples - */ -static inline bool -CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo) -{ - return miinfo->bufferedTuples == 0; -} - -/* - * Write the tuples stored in 'buffer' out to the table. - */ -static inline void -CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, - CopyMultiInsertBuffer *buffer) -{ - MemoryContext oldcontext; - int i; - uint64 save_cur_lineno; - CopyState cstate = miinfo->cstate; - EState *estate = miinfo->estate; - CommandId mycid = miinfo->mycid; - int ti_options = miinfo->ti_options; - bool line_buf_valid = cstate->line_buf_valid; - int nused = buffer->nused; - ResultRelInfo *resultRelInfo = buffer->resultRelInfo; - TupleTableSlot **slots = buffer->slots; - - /* - * Print error context information correctly, if one of the operations - * below fail. - */ - cstate->line_buf_valid = false; - save_cur_lineno = cstate->cur_lineno; - - /* - * table_multi_insert may leak memory, so switch to short-lived memory - * context before calling it. - */ - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - table_multi_insert(resultRelInfo->ri_RelationDesc, - slots, - nused, - mycid, - ti_options, - buffer->bistate); - MemoryContextSwitchTo(oldcontext); - - for (i = 0; i < nused; i++) - { - /* - * If there are any indexes, update them for all the inserted tuples, - * and run AFTER ROW INSERT triggers. - */ - if (resultRelInfo->ri_NumIndices > 0) - { - List *recheckIndexes; - - cstate->cur_lineno = buffer->linenos[i]; - recheckIndexes = - ExecInsertIndexTuples(resultRelInfo, - buffer->slots[i], estate, false, NULL, - NIL); - ExecARInsertTriggers(estate, resultRelInfo, - slots[i], recheckIndexes, - cstate->transition_capture); - list_free(recheckIndexes); - } - - /* - * There's no indexes, but see if we need to run AFTER ROW INSERT - * triggers anyway. - */ - else if (resultRelInfo->ri_TrigDesc != NULL && - (resultRelInfo->ri_TrigDesc->trig_insert_after_row || - resultRelInfo->ri_TrigDesc->trig_insert_new_table)) - { - cstate->cur_lineno = buffer->linenos[i]; - ExecARInsertTriggers(estate, resultRelInfo, - slots[i], NIL, cstate->transition_capture); - } - - ExecClearTuple(slots[i]); - } - - /* Mark that all slots are free */ - buffer->nused = 0; - - /* reset cur_lineno and line_buf_valid to what they were */ - cstate->line_buf_valid = line_buf_valid; - cstate->cur_lineno = save_cur_lineno; -} - -/* - * Drop used slots and free member for this buffer. - * - * The buffer must be flushed before cleanup. - */ -static inline void -CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, - CopyMultiInsertBuffer *buffer) -{ - int i; - - /* Ensure buffer was flushed */ - Assert(buffer->nused == 0); - - /* Remove back-link to ourself */ - buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL; - - FreeBulkInsertState(buffer->bistate); - - /* Since we only create slots on demand, just drop the non-null ones. */ - for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) - ExecDropSingleTupleTableSlot(buffer->slots[i]); - - table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc, - miinfo->ti_options); - - pfree(buffer); -} - -/* - * Write out all stored tuples in all buffers out to the tables. - * - * Once flushed we also trim the tracked buffers list down to size by removing - * the buffers created earliest first. - * - * Callers should pass 'curr_rri' is the ResultRelInfo that's currently being - * used. When cleaning up old buffers we'll never remove the one for - * 'curr_rri'. - */ -static inline void -CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri) -{ - ListCell *lc; - - foreach(lc, miinfo->multiInsertBuffers) - { - CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc); - - CopyMultiInsertBufferFlush(miinfo, buffer); - } - - miinfo->bufferedTuples = 0; - miinfo->bufferedBytes = 0; - - /* - * Trim the list of tracked buffers down if it exceeds the limit. Here we - * remove buffers starting with the ones we created first. It seems less - * likely that these older ones will be needed than the ones that were - * just created. - */ - while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS) - { - CopyMultiInsertBuffer *buffer; - - buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers); - - /* - * We never want to remove the buffer that's currently being used, so - * if we happen to find that then move it to the end of the list. - */ - if (buffer->resultRelInfo == curr_rri) - { - miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers); - miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer); - buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers); - } - - CopyMultiInsertBufferCleanup(miinfo, buffer); - miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers); - } -} - -/* - * Cleanup allocated buffers and free memory - */ -static inline void -CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo) -{ - ListCell *lc; - - foreach(lc, miinfo->multiInsertBuffers) - CopyMultiInsertBufferCleanup(miinfo, lfirst(lc)); - - list_free(miinfo->multiInsertBuffers); -} - -/* - * Get the next TupleTableSlot that the next tuple should be stored in. - * - * Callers must ensure that the buffer is not full. - * - * Note: 'miinfo' is unused but has been included for consistency with the - * other functions in this area. - */ -static inline TupleTableSlot * -CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, - ResultRelInfo *rri) -{ - CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; - int nused = buffer->nused; - - Assert(buffer != NULL); - Assert(nused < MAX_BUFFERED_TUPLES); - - if (buffer->slots[nused] == NULL) - buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL); - return buffer->slots[nused]; -} - -/* - * Record the previously reserved TupleTableSlot that was reserved by - * CopyMultiInsertInfoNextFreeSlot as being consumed. - */ -static inline void -CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, - TupleTableSlot *slot, int tuplen, uint64 lineno) -{ - CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; - - Assert(buffer != NULL); - Assert(slot == buffer->slots[buffer->nused]); - - /* Store the line number so we can properly report any errors later */ - buffer->linenos[buffer->nused] = lineno; - - /* Record this slot as being used */ - buffer->nused++; - - /* Update how many tuples are stored and their size */ - miinfo->bufferedTuples++; - miinfo->bufferedBytes += tuplen; -} - -/* - * Copy FROM file to relation. - */ -uint64 -CopyFrom(CopyState cstate) -{ - ResultRelInfo *resultRelInfo; - ResultRelInfo *target_resultRelInfo; - ResultRelInfo *prevResultRelInfo = NULL; - EState *estate = CreateExecutorState(); /* for ExecConstraints() */ - ModifyTableState *mtstate; - ExprContext *econtext; - TupleTableSlot *singleslot = NULL; - MemoryContext oldcontext = CurrentMemoryContext; - - PartitionTupleRouting *proute = NULL; - ErrorContextCallback errcallback; - CommandId mycid = GetCurrentCommandId(true); - int ti_options = 0; /* start with default options for insert */ - BulkInsertState bistate = NULL; - CopyInsertMethod insertMethod; - CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */ - uint64 processed = 0; - bool has_before_insert_row_trig; - bool has_instead_insert_row_trig; - bool leafpart_use_multi_insert = false; - - Assert(cstate->rel); - Assert(list_length(cstate->range_table) == 1); - - /* - * The target must be a plain, foreign, or partitioned relation, or have - * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only - * allowed on views, so we only hint about them in the view case.) - */ - if (cstate->rel->rd_rel->relkind != RELKIND_RELATION && - cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE && - cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE && - !(cstate->rel->trigdesc && - cstate->rel->trigdesc->trig_insert_instead_row)) - { - if (cstate->rel->rd_rel->relkind == RELKIND_VIEW) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot copy to view \"%s\"", - RelationGetRelationName(cstate->rel)), - errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger."))); - else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot copy to materialized view \"%s\"", - RelationGetRelationName(cstate->rel)))); - else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot copy to sequence \"%s\"", - RelationGetRelationName(cstate->rel)))); - else - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot copy to non-table relation \"%s\"", - RelationGetRelationName(cstate->rel)))); - } - - /* - * If the target file is new-in-transaction, we assume that checking FSM - * for free space is a waste of time. This could possibly be wrong, but - * it's unlikely. - */ - if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) && - (cstate->rel->rd_createSubid != InvalidSubTransactionId || - cstate->rel->rd_firstRelfilenodeSubid != InvalidSubTransactionId)) - ti_options |= TABLE_INSERT_SKIP_FSM; - - /* - * Optimize if new relfilenode was created in this subxact or one of its - * committed children and we won't see those rows later as part of an - * earlier scan or command. The subxact test ensures that if this subxact - * aborts then the frozen rows won't be visible after xact cleanup. Note - * that the stronger test of exactly which subtransaction created it is - * crucial for correctness of this optimization. The test for an earlier - * scan or command tolerates false negatives. FREEZE causes other sessions - * to see rows they would not see under MVCC, and a false negative merely - * spreads that anomaly to the current session. - */ - if (cstate->freeze) - { - /* - * We currently disallow COPY FREEZE on partitioned tables. The - * reason for this is that we've simply not yet opened the partitions - * to determine if the optimization can be applied to them. We could - * go and open them all here, but doing so may be quite a costly - * overhead for small copies. In any case, we may just end up routing - * tuples to a small number of partitions. It seems better just to - * raise an ERROR for partitioned tables. - */ - if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform COPY FREEZE on a partitioned table"))); - } - - /* - * Tolerate one registration for the benefit of FirstXactSnapshot. - * Scan-bearing queries generally create at least two registrations, - * though relying on that is fragile, as is ignoring ActiveSnapshot. - * Clear CatalogSnapshot to avoid counting its registration. We'll - * still detect ongoing catalog scans, each of which separately - * registers the snapshot it uses. - */ - InvalidateCatalogSnapshot(); - if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals()) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TRANSACTION_STATE), - errmsg("cannot perform COPY FREEZE because of prior transaction activity"))); - - if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() && - cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId()) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction"))); - - ti_options |= TABLE_INSERT_FROZEN; - } - - /* - * We need a ResultRelInfo so we can use the regular executor's - * index-entry-making machinery. (There used to be a huge amount of code - * here that basically duplicated execUtils.c ...) - */ - ExecInitRangeTable(estate, cstate->range_table); - resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo); - ExecInitResultRelation(estate, resultRelInfo, 1); - - /* Verify the named relation is a valid target for INSERT */ - CheckValidResultRel(resultRelInfo, CMD_INSERT); - - ExecOpenIndices(resultRelInfo, false); - - /* - * Set up a ModifyTableState so we can let FDW(s) init themselves for - * foreign-table result relation(s). - */ - mtstate = makeNode(ModifyTableState); - mtstate->ps.plan = NULL; - mtstate->ps.state = estate; - mtstate->operation = CMD_INSERT; - mtstate->resultRelInfo = resultRelInfo; - - if (resultRelInfo->ri_FdwRoutine != NULL && - resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL) - resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, - resultRelInfo); - - /* Prepare to catch AFTER triggers. */ - AfterTriggerBeginQuery(); - - /* - * If there are any triggers with transition tables on the named relation, - * we need to be prepared to capture transition tuples. - * - * Because partition tuple routing would like to know about whether - * transition capture is active, we also set it in mtstate, which is - * passed to ExecFindPartition() below. - */ - cstate->transition_capture = mtstate->mt_transition_capture = - MakeTransitionCaptureState(cstate->rel->trigdesc, - RelationGetRelid(cstate->rel), - CMD_INSERT); - - /* - * If the named relation is a partitioned table, initialize state for - * CopyFrom tuple routing. - */ - if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - proute = ExecSetupPartitionTupleRouting(estate, NULL, cstate->rel); - - if (cstate->whereClause) - cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause), - &mtstate->ps); - - /* - * It's generally more efficient to prepare a bunch of tuples for - * insertion, and insert them in one table_multi_insert() call, than call - * table_tuple_insert() separately for every tuple. However, there are a - * number of reasons why we might not be able to do this. These are - * explained below. - */ - if (resultRelInfo->ri_TrigDesc != NULL && - (resultRelInfo->ri_TrigDesc->trig_insert_before_row || - resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) - { - /* - * Can't support multi-inserts when there are any BEFORE/INSTEAD OF - * triggers on the table. Such triggers might query the table we're - * inserting into and act differently if the tuples that have already - * been processed and prepared for insertion are not there. - */ - insertMethod = CIM_SINGLE; - } - else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL && - resultRelInfo->ri_TrigDesc->trig_insert_new_table) - { - /* - * For partitioned tables we can't support multi-inserts when there - * are any statement level insert triggers. It might be possible to - * allow partitioned tables with such triggers in the future, but for - * now, CopyMultiInsertInfoFlush expects that any before row insert - * and statement level insert triggers are on the same relation. - */ - insertMethod = CIM_SINGLE; - } - else if (resultRelInfo->ri_FdwRoutine != NULL || - cstate->volatile_defexprs) - { - /* - * Can't support multi-inserts to foreign tables or if there are any - * volatile default expressions in the table. Similarly to the - * trigger case above, such expressions may query the table we're - * inserting into. - * - * Note: It does not matter if any partitions have any volatile - * default expressions as we use the defaults from the target of the - * COPY command. - */ - insertMethod = CIM_SINGLE; - } - else if (contain_volatile_functions(cstate->whereClause)) - { - /* - * Can't support multi-inserts if there are any volatile function - * expressions in WHERE clause. Similarly to the trigger case above, - * such expressions may query the table we're inserting into. - */ - insertMethod = CIM_SINGLE; - } - else - { - /* - * For partitioned tables, we may still be able to perform bulk - * inserts. However, the possibility of this depends on which types - * of triggers exist on the partition. We must disable bulk inserts - * if the partition is a foreign table or it has any before row insert - * or insert instead triggers (same as we checked above for the parent - * table). Since the partition's resultRelInfos are initialized only - * when we actually need to insert the first tuple into them, we must - * have the intermediate insert method of CIM_MULTI_CONDITIONAL to - * flag that we must later determine if we can use bulk-inserts for - * the partition being inserted into. - */ - if (proute) - insertMethod = CIM_MULTI_CONDITIONAL; - else - insertMethod = CIM_MULTI; - - CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate, - estate, mycid, ti_options); - } - - /* - * If not using batch mode (which allocates slots as needed) set up a - * tuple slot too. When inserting into a partitioned table, we also need - * one, even if we might batch insert, to read the tuple in the root - * partition's form. - */ - if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL) - { - singleslot = table_slot_create(resultRelInfo->ri_RelationDesc, - &estate->es_tupleTable); - bistate = GetBulkInsertState(); - } - - has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc && - resultRelInfo->ri_TrigDesc->trig_insert_before_row); - - has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc && - resultRelInfo->ri_TrigDesc->trig_insert_instead_row); - - /* - * Check BEFORE STATEMENT insertion triggers. It's debatable whether we - * should do this for COPY, since it's not really an "INSERT" statement as - * such. However, executing these triggers maintains consistency with the - * EACH ROW triggers that we already fire on COPY. - */ - ExecBSInsertTriggers(estate, resultRelInfo); - - econtext = GetPerTupleExprContext(estate); - - /* Set up callback to identify error line number */ - errcallback.callback = CopyFromErrorCallback; - errcallback.arg = (void *) cstate; - errcallback.previous = error_context_stack; - error_context_stack = &errcallback; - - for (;;) - { - TupleTableSlot *myslot; - bool skip_tuple; - - CHECK_FOR_INTERRUPTS(); - - /* - * Reset the per-tuple exprcontext. We do this after every tuple, to - * clean-up after expression evaluations etc. - */ - ResetPerTupleExprContext(estate); - - /* select slot to (initially) load row into */ - if (insertMethod == CIM_SINGLE || proute) - { - myslot = singleslot; - Assert(myslot != NULL); - } - else - { - Assert(resultRelInfo == target_resultRelInfo); - Assert(insertMethod == CIM_MULTI); - - myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo, - resultRelInfo); - } - - /* - * Switch to per-tuple context before calling NextCopyFrom, which does - * evaluate default expressions etc. and requires per-tuple context. - */ - MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - - ExecClearTuple(myslot); - - /* Directly store the values/nulls array in the slot */ - if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull)) - break; - - ExecStoreVirtualTuple(myslot); - - /* - * Constraints and where clause might reference the tableoid column, - * so (re-)initialize tts_tableOid before evaluating them. - */ - myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc); - - /* Triggers and stuff need to be invoked in query context. */ - MemoryContextSwitchTo(oldcontext); - - if (cstate->whereClause) - { - econtext->ecxt_scantuple = myslot; - /* Skip items that don't match COPY's WHERE clause */ - if (!ExecQual(cstate->qualexpr, econtext)) - continue; - } - - /* Determine the partition to insert the tuple into */ - if (proute) - { - TupleConversionMap *map; - - /* - * Attempt to find a partition suitable for this tuple. - * ExecFindPartition() will raise an error if none can be found or - * if the found partition is not suitable for INSERTs. - */ - resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo, - proute, myslot, estate); - - if (prevResultRelInfo != resultRelInfo) - { - /* Determine which triggers exist on this partition */ - has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc && - resultRelInfo->ri_TrigDesc->trig_insert_before_row); - - has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc && - resultRelInfo->ri_TrigDesc->trig_insert_instead_row); - - /* - * Disable multi-inserts when the partition has BEFORE/INSTEAD - * OF triggers, or if the partition is a foreign partition. - */ - leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL && - !has_before_insert_row_trig && - !has_instead_insert_row_trig && - resultRelInfo->ri_FdwRoutine == NULL; - - /* Set the multi-insert buffer to use for this partition. */ - if (leafpart_use_multi_insert) - { - if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL) - CopyMultiInsertInfoSetupBuffer(&multiInsertInfo, - resultRelInfo); - } - else if (insertMethod == CIM_MULTI_CONDITIONAL && - !CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) - { - /* - * Flush pending inserts if this partition can't use - * batching, so rows are visible to triggers etc. - */ - CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo); - } - - if (bistate != NULL) - ReleaseBulkInsertStatePin(bistate); - prevResultRelInfo = resultRelInfo; - } - - /* - * If we're capturing transition tuples, we might need to convert - * from the partition rowtype to root rowtype. But if there are no - * BEFORE triggers on the partition that could change the tuple, - * we can just remember the original unconverted tuple to avoid a - * needless round trip conversion. - */ - if (cstate->transition_capture != NULL) - cstate->transition_capture->tcs_original_insert_tuple = - !has_before_insert_row_trig ? myslot : NULL; - - /* - * We might need to convert from the root rowtype to the partition - * rowtype. - */ - map = resultRelInfo->ri_RootToPartitionMap; - if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert) - { - /* non batch insert */ - if (map != NULL) - { - TupleTableSlot *new_slot; - - new_slot = resultRelInfo->ri_PartitionTupleSlot; - myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot); - } - } - else - { - /* - * Prepare to queue up tuple for later batch insert into - * current partition. - */ - TupleTableSlot *batchslot; - - /* no other path available for partitioned table */ - Assert(insertMethod == CIM_MULTI_CONDITIONAL); - - batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo, - resultRelInfo); - - if (map != NULL) - myslot = execute_attr_map_slot(map->attrMap, myslot, - batchslot); - else - { - /* - * This looks more expensive than it is (Believe me, I - * optimized it away. Twice.). The input is in virtual - * form, and we'll materialize the slot below - for most - * slot types the copy performs the work materialization - * would later require anyway. - */ - ExecCopySlot(batchslot, myslot); - myslot = batchslot; - } - } - - /* ensure that triggers etc see the right relation */ - myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); - } - - skip_tuple = false; - - /* BEFORE ROW INSERT Triggers */ - if (has_before_insert_row_trig) - { - if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot)) - skip_tuple = true; /* "do nothing" */ - } - - if (!skip_tuple) - { - /* - * If there is an INSTEAD OF INSERT ROW trigger, let it handle the - * tuple. Otherwise, proceed with inserting the tuple into the - * table or foreign table. - */ - if (has_instead_insert_row_trig) - { - ExecIRInsertTriggers(estate, resultRelInfo, myslot); - } - else - { - /* Compute stored generated columns */ - if (resultRelInfo->ri_RelationDesc->rd_att->constr && - resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored) - ExecComputeStoredGenerated(resultRelInfo, estate, myslot, - CMD_INSERT); - - /* - * If the target is a plain table, check the constraints of - * the tuple. - */ - if (resultRelInfo->ri_FdwRoutine == NULL && - resultRelInfo->ri_RelationDesc->rd_att->constr) - ExecConstraints(resultRelInfo, myslot, estate); - - /* - * Also check the tuple against the partition constraint, if - * there is one; except that if we got here via tuple-routing, - * we don't need to if there's no BR trigger defined on the - * partition. - */ - if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition && - (proute == NULL || has_before_insert_row_trig)) - ExecPartitionCheck(resultRelInfo, myslot, estate, true); - - /* Store the slot in the multi-insert buffer, when enabled. */ - if (insertMethod == CIM_MULTI || leafpart_use_multi_insert) - { - /* - * The slot previously might point into the per-tuple - * context. For batching it needs to be longer lived. - */ - ExecMaterializeSlot(myslot); - - /* Add this tuple to the tuple buffer */ - CopyMultiInsertInfoStore(&multiInsertInfo, - resultRelInfo, myslot, - cstate->line_buf.len, - cstate->cur_lineno); - - /* - * If enough inserts have queued up, then flush all - * buffers out to their tables. - */ - if (CopyMultiInsertInfoIsFull(&multiInsertInfo)) - CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo); - } - else - { - List *recheckIndexes = NIL; - - /* OK, store the tuple */ - if (resultRelInfo->ri_FdwRoutine != NULL) - { - myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate, - resultRelInfo, - myslot, - NULL); - - if (myslot == NULL) /* "do nothing" */ - continue; /* next tuple please */ - - /* - * AFTER ROW Triggers might reference the tableoid - * column, so (re-)initialize tts_tableOid before - * evaluating them. - */ - myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); - } - else - { - /* OK, store the tuple and create index entries for it */ - table_tuple_insert(resultRelInfo->ri_RelationDesc, - myslot, mycid, ti_options, bistate); - - if (resultRelInfo->ri_NumIndices > 0) - recheckIndexes = ExecInsertIndexTuples(resultRelInfo, - myslot, - estate, - false, - NULL, - NIL); - } - - /* AFTER ROW INSERT Triggers */ - ExecARInsertTriggers(estate, resultRelInfo, myslot, - recheckIndexes, cstate->transition_capture); - - list_free(recheckIndexes); - } - } - - /* - * We count only tuples not suppressed by a BEFORE INSERT trigger - * or FDW; this is the same definition used by nodeModifyTable.c - * for counting tuples inserted by an INSERT command. - */ - processed++; - } - } - - /* Flush any remaining buffered tuples */ - if (insertMethod != CIM_SINGLE) - { - if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) - CopyMultiInsertInfoFlush(&multiInsertInfo, NULL); - } - - /* Done, clean up */ - error_context_stack = errcallback.previous; - - if (bistate != NULL) - FreeBulkInsertState(bistate); - - MemoryContextSwitchTo(oldcontext); - - /* - * In the old protocol, tell pqcomm that we can process normal protocol - * messages again. - */ - if (cstate->copy_dest == COPY_OLD_FE) - pq_endmsgread(); - - /* Execute AFTER STATEMENT insertion triggers */ - ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture); - - /* Handle queued AFTER triggers */ - AfterTriggerEndQuery(estate); - - ExecResetTupleTable(estate->es_tupleTable, false); - - /* Allow the FDW to shut down */ - if (target_resultRelInfo->ri_FdwRoutine != NULL && - target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL) - target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate, - target_resultRelInfo); - - /* Tear down the multi-insert buffer data */ - if (insertMethod != CIM_SINGLE) - CopyMultiInsertInfoCleanup(&multiInsertInfo); - - /* Close all the partitioned tables, leaf partitions, and their indices */ - if (proute) - ExecCleanupTupleRouting(mtstate, proute); - - /* Close the result relations, including any trigger target relations */ - ExecCloseResultRelations(estate); - ExecCloseRangeTableRelations(estate); - - FreeExecutorState(estate); - - return processed; -} - -/* - * Setup to read tuples from a file for COPY FROM. - * - * 'rel': Used as a template for the tuples - * 'filename': Name of server-local file to read - * 'attnamelist': List of char *, columns to include. NIL selects all cols. - * 'options': List of DefElem. See copy_opt_item in gram.y for selections. - * - * Returns a CopyState, to be passed to NextCopyFrom and related functions. - */ -CopyState -BeginCopyFrom(ParseState *pstate, - Relation rel, - const char *filename, - bool is_program, - copy_data_source_cb data_source_cb, - List *attnamelist, - List *options) -{ - CopyState cstate; - bool pipe = (filename == NULL); - TupleDesc tupDesc; - AttrNumber num_phys_attrs, - num_defaults; - FmgrInfo *in_functions; - Oid *typioparams; - int attnum; - Oid in_func_oid; - int *defmap; - ExprState **defexprs; - MemoryContext oldcontext; - bool volatile_defexprs; - - cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options); - oldcontext = MemoryContextSwitchTo(cstate->copycontext); - - /* Initialize state variables */ - cstate->reached_eof = false; - cstate->eol_type = EOL_UNKNOWN; - cstate->cur_relname = RelationGetRelationName(cstate->rel); - cstate->cur_lineno = 0; - cstate->cur_attname = NULL; - cstate->cur_attval = NULL; - - /* - * Set up variables to avoid per-attribute overhead. attribute_buf and - * raw_buf are used in both text and binary modes, but we use line_buf - * only in text mode. - */ - initStringInfo(&cstate->attribute_buf); - cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); - cstate->raw_buf_index = cstate->raw_buf_len = 0; - if (!cstate->binary) - { - initStringInfo(&cstate->line_buf); - cstate->line_buf_converted = false; - } - - /* Assign range table, we'll need it in CopyFrom. */ - if (pstate) - cstate->range_table = pstate->p_rtable; - - tupDesc = RelationGetDescr(cstate->rel); - num_phys_attrs = tupDesc->natts; - num_defaults = 0; - volatile_defexprs = false; - - /* - * Pick up the required catalog information for each attribute in the - * relation, including the input function, the element type (to pass to - * the input function), and info about defaults and constraints. (Which - * input function we use depends on text/binary format choice.) - */ - in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); - typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); - defmap = (int *) palloc(num_phys_attrs * sizeof(int)); - defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *)); - - for (attnum = 1; attnum <= num_phys_attrs; attnum++) - { - Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1); - - /* We don't need info for dropped attributes */ - if (att->attisdropped) - continue; - - /* Fetch the input function and typioparam info */ - if (cstate->binary) - getTypeBinaryInputInfo(att->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - else - getTypeInputInfo(att->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - fmgr_info(in_func_oid, &in_functions[attnum - 1]); - - /* Get default info if needed */ - if (!list_member_int(cstate->attnumlist, attnum) && !att->attgenerated) - { - /* attribute is NOT to be copied from input */ - /* use default value if one exists */ - Expr *defexpr = (Expr *) build_column_default(cstate->rel, - attnum); - - if (defexpr != NULL) - { - /* Run the expression through planner */ - defexpr = expression_planner(defexpr); - - /* Initialize executable expression in copycontext */ - defexprs[num_defaults] = ExecInitExpr(defexpr, NULL); - defmap[num_defaults] = attnum - 1; - num_defaults++; - - /* - * If a default expression looks at the table being loaded, - * then it could give the wrong answer when using - * multi-insert. Since database access can be dynamic this is - * hard to test for exactly, so we use the much wider test of - * whether the default expression is volatile. We allow for - * the special case of when the default expression is the - * nextval() of a sequence which in this specific case is - * known to be safe for use with the multi-insert - * optimization. Hence we use this special case function - * checker rather than the standard check for - * contain_volatile_functions(). - */ - if (!volatile_defexprs) - volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr); - } - } - } - - /* We keep those variables in cstate. */ - cstate->in_functions = in_functions; - cstate->typioparams = typioparams; - cstate->defmap = defmap; - cstate->defexprs = defexprs; - cstate->volatile_defexprs = volatile_defexprs; - cstate->num_defaults = num_defaults; - cstate->is_program = is_program; - - if (data_source_cb) - { - cstate->copy_dest = COPY_CALLBACK; - cstate->data_source_cb = data_source_cb; - } - else if (pipe) - { - Assert(!is_program); /* the grammar does not allow this */ - if (whereToSendOutput == DestRemote) - ReceiveCopyBegin(cstate); - else - cstate->copy_file = stdin; - } - else - { - cstate->filename = pstrdup(filename); - - if (cstate->is_program) - { - cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R); - if (cstate->copy_file == NULL) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not execute command \"%s\": %m", - cstate->filename))); - } - else - { - struct stat st; - - cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); - if (cstate->copy_file == NULL) - { - /* copy errno because ereport subfunctions might change it */ - int save_errno = errno; - - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\" for reading: %m", - cstate->filename), - (save_errno == ENOENT || save_errno == EACCES) ? - errhint("COPY FROM instructs the PostgreSQL server process to read a file. " - "You may want a client-side facility such as psql's \\copy.") : 0)); - } - - if (fstat(fileno(cstate->copy_file), &st)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not stat file \"%s\": %m", - cstate->filename))); - - if (S_ISDIR(st.st_mode)) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is a directory", cstate->filename))); - } - } - - if (cstate->binary) - { - /* Read and verify binary header */ - char readSig[11]; - int32 tmp; - - /* Signature */ - if (CopyReadBinaryData(cstate, readSig, 11) != 11 || - memcmp(readSig, BinarySignature, 11) != 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("COPY file signature not recognized"))); - /* Flags field */ - if (!CopyGetInt32(cstate, &tmp)) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("invalid COPY file header (missing flags)"))); - if ((tmp & (1 << 16)) != 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("invalid COPY file header (WITH OIDS)"))); - tmp &= ~(1 << 16); - if ((tmp >> 16) != 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("unrecognized critical flags in COPY file header"))); - /* Header extension length */ - if (!CopyGetInt32(cstate, &tmp) || - tmp < 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("invalid COPY file header (missing length)"))); - /* Skip extension header, if present */ - while (tmp-- > 0) - { - if (CopyReadBinaryData(cstate, readSig, 1) != 1) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("invalid COPY file header (wrong length)"))); - } - } - - /* create workspace for CopyReadAttributes results */ - if (!cstate->binary) - { - AttrNumber attr_count = list_length(cstate->attnumlist); - - cstate->max_fields = attr_count; - cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); - } - - MemoryContextSwitchTo(oldcontext); - - return cstate; -} - -/* - * Read raw fields in the next line for COPY FROM in text or csv mode. - * Return false if no more lines. - * - * An internal temporary buffer is returned via 'fields'. It is valid until - * the next call of the function. Since the function returns all raw fields - * in the input file, 'nfields' could be different from the number of columns - * in the relation. - * - * NOTE: force_not_null option are not applied to the returned fields. - */ -bool -NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields) -{ - int fldct; - bool done; - - /* only available for text or csv input */ - Assert(!cstate->binary); - - /* on input just throw the header line away */ - if (cstate->cur_lineno == 0 && cstate->header_line) - { - cstate->cur_lineno++; - if (CopyReadLine(cstate)) - return false; /* done */ - } - - cstate->cur_lineno++; - - /* Actually read the line into memory here */ - done = CopyReadLine(cstate); - - /* - * EOF at start of line means we're done. If we see EOF after some - * characters, we act as though it was newline followed by EOF, ie, - * process the line and then exit loop on next iteration. - */ - if (done && cstate->line_buf.len == 0) - return false; - - /* Parse the line into de-escaped field values */ - if (cstate->csv_mode) - fldct = CopyReadAttributesCSV(cstate); - else - fldct = CopyReadAttributesText(cstate); - - *fields = cstate->raw_fields; - *nfields = fldct; - return true; -} - -/* - * Read next tuple from file for COPY FROM. Return false if no more tuples. - * - * 'econtext' is used to evaluate default expression for each columns not - * read from the file. It can be NULL when no default values are used, i.e. - * when all columns are read from the file. - * - * 'values' and 'nulls' arrays must be the same length as columns of the - * relation passed to BeginCopyFrom. This function fills the arrays. - * Oid of the tuple is returned with 'tupleOid' separately. - */ -bool -NextCopyFrom(CopyState cstate, ExprContext *econtext, - Datum *values, bool *nulls) -{ - TupleDesc tupDesc; - AttrNumber num_phys_attrs, - attr_count, - num_defaults = cstate->num_defaults; - FmgrInfo *in_functions = cstate->in_functions; - Oid *typioparams = cstate->typioparams; - int i; - int *defmap = cstate->defmap; - ExprState **defexprs = cstate->defexprs; - - tupDesc = RelationGetDescr(cstate->rel); - num_phys_attrs = tupDesc->natts; - attr_count = list_length(cstate->attnumlist); - - /* Initialize all values for row to NULL */ - MemSet(values, 0, num_phys_attrs * sizeof(Datum)); - MemSet(nulls, true, num_phys_attrs * sizeof(bool)); - - if (!cstate->binary) - { - char **field_strings; - ListCell *cur; - int fldct; - int fieldno; - char *string; - - /* read raw fields in the next line */ - if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) - return false; - - /* check for overflowing fields */ - if (attr_count > 0 && fldct > attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); - - fieldno = 0; - - /* Loop to read the user attributes on the line. */ - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(att->attname)))); - string = field_strings[fieldno++]; - - if (cstate->convert_select_flags && - !cstate->convert_select_flags[m]) - { - /* ignore input field, leaving column as NULL */ - continue; - } - - if (cstate->csv_mode) - { - if (string == NULL && - cstate->force_notnull_flags[m]) - { - /* - * FORCE_NOT_NULL option is set and column is NULL - - * convert it to the NULL string. - */ - string = cstate->null_print; - } - else if (string != NULL && cstate->force_null_flags[m] - && strcmp(string, cstate->null_print) == 0) - { - /* - * FORCE_NULL option is set and column matches the NULL - * string. It must have been quoted, or otherwise the - * string would already have been set to NULL. Convert it - * to NULL as specified. - */ - string = NULL; - } - } - - cstate->cur_attname = NameStr(att->attname); - cstate->cur_attval = string; - values[m] = InputFunctionCall(&in_functions[m], - string, - typioparams[m], - att->atttypmod); - if (string != NULL) - nulls[m] = false; - cstate->cur_attname = NULL; - cstate->cur_attval = NULL; - } - - Assert(fieldno == attr_count); - } - else - { - /* binary */ - int16 fld_count; - ListCell *cur; - - cstate->cur_lineno++; - - if (!CopyGetInt16(cstate, &fld_count)) - { - /* EOF detected (end of file, or protocol-level EOF) */ - return false; - } - - if (fld_count == -1) - { - /* - * Received EOF marker. In a V3-protocol copy, wait for the - * protocol-level EOF, and complain if it doesn't come - * immediately. This ensures that we correctly handle CopyFail, - * if client chooses to send that now. - * - * Note that we MUST NOT try to read more data in an old-protocol - * copy, since there is no protocol-level EOF marker then. We - * could go either way for copy from file, but choose to throw - * error if there's data after the EOF marker, for consistency - * with the new-protocol case. - */ - char dummy; - - if (cstate->copy_dest != COPY_OLD_FE && - CopyReadBinaryData(cstate, &dummy, 1) > 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("received copy data after EOF marker"))); - return false; - } - - if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); - - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - cstate->cur_attname = NameStr(att->attname); - values[m] = CopyReadBinaryAttribute(cstate, - &in_functions[m], - typioparams[m], - att->atttypmod, - &nulls[m]); - cstate->cur_attname = NULL; - } - } - - /* - * Now compute and insert any defaults available for the columns not - * provided by the input data. Anything not processed here or above will - * remain NULL. - */ - for (i = 0; i < num_defaults; i++) - { - /* - * The caller must supply econtext and have switched into the - * per-tuple memory context in it. - */ - Assert(econtext != NULL); - Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); - - values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext, - &nulls[defmap[i]]); - } - - return true; -} - -/* - * Clean up storage and release resources for COPY FROM. - */ -void -EndCopyFrom(CopyState cstate) -{ - /* No COPY FROM related resources except memory. */ - - EndCopy(cstate); -} - -/* - * Read the next input line and stash it in line_buf, with conversion to - * server encoding. - * - * Result is true if read was terminated by EOF, false if terminated - * by newline. The terminating newline or EOF marker is not included - * in the final value of line_buf. - */ -static bool -CopyReadLine(CopyState cstate) -{ - bool result; - - resetStringInfo(&cstate->line_buf); - cstate->line_buf_valid = true; - - /* Mark that encoding conversion hasn't occurred yet */ - cstate->line_buf_converted = false; - - /* Parse data and transfer into line_buf */ - result = CopyReadLineText(cstate); - - if (result) - { - /* - * Reached EOF. In protocol version 3, we should ignore anything - * after \. up to the protocol end of copy data. (XXX maybe better - * not to treat \. as special?) - */ - if (cstate->copy_dest == COPY_NEW_FE) - { - do - { - cstate->raw_buf_index = cstate->raw_buf_len; - } while (CopyLoadRawBuf(cstate)); - } - } - else - { - /* - * If we didn't hit EOF, then we must have transferred the EOL marker - * to line_buf along with the data. Get rid of it. - */ - switch (cstate->eol_type) - { - case EOL_NL: - Assert(cstate->line_buf.len >= 1); - Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n'); - cstate->line_buf.len--; - cstate->line_buf.data[cstate->line_buf.len] = '\0'; - break; - case EOL_CR: - Assert(cstate->line_buf.len >= 1); - Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r'); - cstate->line_buf.len--; - cstate->line_buf.data[cstate->line_buf.len] = '\0'; - break; - case EOL_CRNL: - Assert(cstate->line_buf.len >= 2); - Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r'); - Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n'); - cstate->line_buf.len -= 2; - cstate->line_buf.data[cstate->line_buf.len] = '\0'; - break; - case EOL_UNKNOWN: - /* shouldn't get here */ - Assert(false); - break; - } - } - - /* Done reading the line. Convert it to server encoding. */ - if (cstate->need_transcoding) - { - char *cvt; - - cvt = pg_any_to_server(cstate->line_buf.data, - cstate->line_buf.len, - cstate->file_encoding); - if (cvt != cstate->line_buf.data) - { - /* transfer converted data back to line_buf */ - resetStringInfo(&cstate->line_buf); - appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt)); - pfree(cvt); - } - } - - /* Now it's safe to use the buffer in error messages */ - cstate->line_buf_converted = true; - - return result; -} - -/* - * CopyReadLineText - inner loop of CopyReadLine for text mode - */ -static bool -CopyReadLineText(CopyState cstate) -{ - char *copy_raw_buf; - int raw_buf_ptr; - int copy_buf_len; - bool need_data = false; - bool hit_eof = false; - bool result = false; - char mblen_str[2]; - - /* CSV variables */ - bool first_char_in_line = true; - bool in_quote = false, - last_was_esc = false; - char quotec = '\0'; - char escapec = '\0'; - - if (cstate->csv_mode) - { - quotec = cstate->quote[0]; - escapec = cstate->escape[0]; - /* ignore special escape processing if it's the same as quotec */ - if (quotec == escapec) - escapec = '\0'; - } - - mblen_str[1] = '\0'; - - /* - * The objective of this loop is to transfer the entire next input line - * into line_buf. Hence, we only care for detecting newlines (\r and/or - * \n) and the end-of-copy marker (\.). - * - * In CSV mode, \r and \n inside a quoted field are just part of the data - * value and are put in line_buf. We keep just enough state to know if we - * are currently in a quoted field or not. - * - * These four characters, and the CSV escape and quote characters, are - * assumed the same in frontend and backend encodings. - * - * For speed, we try to move data from raw_buf to line_buf in chunks - * rather than one character at a time. raw_buf_ptr points to the next - * character to examine; any characters from raw_buf_index to raw_buf_ptr - * have been determined to be part of the line, but not yet transferred to - * line_buf. - * - * For a little extra speed within the loop, we copy raw_buf and - * raw_buf_len into local variables. - */ - copy_raw_buf = cstate->raw_buf; - raw_buf_ptr = cstate->raw_buf_index; - copy_buf_len = cstate->raw_buf_len; - - for (;;) - { - int prev_raw_ptr; - char c; - - /* - * Load more data if needed. Ideally we would just force four bytes - * of read-ahead and avoid the many calls to - * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol - * does not allow us to read too far ahead or we might read into the - * next data, so we read-ahead only as far we know we can. One - * optimization would be to read-ahead four byte here if - * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it, - * considering the size of the buffer. - */ - if (raw_buf_ptr >= copy_buf_len || need_data) - { - REFILL_LINEBUF; - - /* - * Try to read some more data. This will certainly reset - * raw_buf_index to zero, and raw_buf_ptr must go with it. - */ - if (!CopyLoadRawBuf(cstate)) - hit_eof = true; - raw_buf_ptr = 0; - copy_buf_len = cstate->raw_buf_len; - - /* - * If we are completely out of data, break out of the loop, - * reporting EOF. - */ - if (copy_buf_len <= 0) - { - result = true; - break; - } - need_data = false; - } - - /* OK to fetch a character */ - prev_raw_ptr = raw_buf_ptr; - c = copy_raw_buf[raw_buf_ptr++]; - - if (cstate->csv_mode) - { - /* - * If character is '\\' or '\r', we may need to look ahead below. - * Force fetch of the next character if we don't already have it. - * We need to do this before changing CSV state, in case one of - * these characters is also the quote or escape character. - * - * Note: old-protocol does not like forced prefetch, but it's OK - * here since we cannot validly be at EOF. - */ - if (c == '\\' || c == '\r') - { - IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); - } - - /* - * Dealing with quotes and escapes here is mildly tricky. If the - * quote char is also the escape char, there's no problem - we - * just use the char as a toggle. If they are different, we need - * to ensure that we only take account of an escape inside a - * quoted field and immediately preceding a quote char, and not - * the second in an escape-escape sequence. - */ - if (in_quote && c == escapec) - last_was_esc = !last_was_esc; - if (c == quotec && !last_was_esc) - in_quote = !in_quote; - if (c != escapec) - last_was_esc = false; - - /* - * Updating the line count for embedded CR and/or LF chars is - * necessarily a little fragile - this test is probably about the - * best we can do. (XXX it's arguable whether we should do this - * at all --- is cur_lineno a physical or logical count?) - */ - if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r')) - cstate->cur_lineno++; - } - - /* Process \r */ - if (c == '\r' && (!cstate->csv_mode || !in_quote)) - { - /* Check for \r\n on first line, _and_ handle \r\n. */ - if (cstate->eol_type == EOL_UNKNOWN || - cstate->eol_type == EOL_CRNL) - { - /* - * If need more data, go back to loop top to load it. - * - * Note that if we are at EOF, c will wind up as '\0' because - * of the guaranteed pad of raw_buf. - */ - IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); - - /* get next char */ - c = copy_raw_buf[raw_buf_ptr]; - - if (c == '\n') - { - raw_buf_ptr++; /* eat newline */ - cstate->eol_type = EOL_CRNL; /* in case not set yet */ - } - else - { - /* found \r, but no \n */ - if (cstate->eol_type == EOL_CRNL) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->csv_mode ? - errmsg("literal carriage return found in data") : - errmsg("unquoted carriage return found in data"), - !cstate->csv_mode ? - errhint("Use \"\\r\" to represent carriage return.") : - errhint("Use quoted CSV field to represent carriage return."))); - - /* - * if we got here, it is the first line and we didn't find - * \n, so don't consume the peeked character - */ - cstate->eol_type = EOL_CR; - } - } - else if (cstate->eol_type == EOL_NL) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->csv_mode ? - errmsg("literal carriage return found in data") : - errmsg("unquoted carriage return found in data"), - !cstate->csv_mode ? - errhint("Use \"\\r\" to represent carriage return.") : - errhint("Use quoted CSV field to represent carriage return."))); - /* If reach here, we have found the line terminator */ - break; - } - - /* Process \n */ - if (c == '\n' && (!cstate->csv_mode || !in_quote)) - { - if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->csv_mode ? - errmsg("literal newline found in data") : - errmsg("unquoted newline found in data"), - !cstate->csv_mode ? - errhint("Use \"\\n\" to represent newline.") : - errhint("Use quoted CSV field to represent newline."))); - cstate->eol_type = EOL_NL; /* in case not set yet */ - /* If reach here, we have found the line terminator */ - break; - } - - /* - * In CSV mode, we only recognize \. alone on a line. This is because - * \. is a valid CSV data value. - */ - if (c == '\\' && (!cstate->csv_mode || first_char_in_line)) - { - char c2; - - IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); - IF_NEED_REFILL_AND_EOF_BREAK(0); - - /* ----- - * get next character - * Note: we do not change c so if it isn't \., we can fall - * through and continue processing for file encoding. - * ----- - */ - c2 = copy_raw_buf[raw_buf_ptr]; - - if (c2 == '.') - { - raw_buf_ptr++; /* consume the '.' */ - - /* - * Note: if we loop back for more data here, it does not - * matter that the CSV state change checks are re-executed; we - * will come back here with no important state changed. - */ - if (cstate->eol_type == EOL_CRNL) - { - /* Get the next character */ - IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); - /* if hit_eof, c2 will become '\0' */ - c2 = copy_raw_buf[raw_buf_ptr++]; - - if (c2 == '\n') - { - if (!cstate->csv_mode) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("end-of-copy marker does not match previous newline style"))); - else - NO_END_OF_COPY_GOTO; - } - else if (c2 != '\r') - { - if (!cstate->csv_mode) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("end-of-copy marker corrupt"))); - else - NO_END_OF_COPY_GOTO; - } - } - - /* Get the next character */ - IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); - /* if hit_eof, c2 will become '\0' */ - c2 = copy_raw_buf[raw_buf_ptr++]; - - if (c2 != '\r' && c2 != '\n') - { - if (!cstate->csv_mode) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("end-of-copy marker corrupt"))); - else - NO_END_OF_COPY_GOTO; - } - - if ((cstate->eol_type == EOL_NL && c2 != '\n') || - (cstate->eol_type == EOL_CRNL && c2 != '\n') || - (cstate->eol_type == EOL_CR && c2 != '\r')) - { - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("end-of-copy marker does not match previous newline style"))); - } - - /* - * Transfer only the data before the \. into line_buf, then - * discard the data and the \. sequence. - */ - if (prev_raw_ptr > cstate->raw_buf_index) - appendBinaryStringInfo(&cstate->line_buf, - cstate->raw_buf + cstate->raw_buf_index, - prev_raw_ptr - cstate->raw_buf_index); - cstate->raw_buf_index = raw_buf_ptr; - result = true; /* report EOF */ - break; - } - else if (!cstate->csv_mode) - - /* - * If we are here, it means we found a backslash followed by - * something other than a period. In non-CSV mode, anything - * after a backslash is special, so we skip over that second - * character too. If we didn't do that \\. would be - * considered an eof-of copy, while in non-CSV mode it is a - * literal backslash followed by a period. In CSV mode, - * backslashes are not special, so we want to process the - * character after the backslash just like a normal character, - * so we don't increment in those cases. - */ - raw_buf_ptr++; - } - - /* - * This label is for CSV cases where \. appears at the start of a - * line, but there is more text after it, meaning it was a data value. - * We are more strict for \. in CSV mode because \. could be a data - * value, while in non-CSV mode, \. cannot be a data value. - */ -not_end_of_copy: - - /* - * Process all bytes of a multi-byte character as a group. - * - * We only support multi-byte sequences where the first byte has the - * high-bit set, so as an optimization we can avoid this block - * entirely if it is not set. - */ - if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c)) - { - int mblen; - - /* - * It is enough to look at the first byte in all our encodings, to - * get the length. (GB18030 is a bit special, but still works for - * our purposes; see comment in pg_gb18030_mblen()) - */ - mblen_str[0] = c; - mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str); - - IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1); - IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1); - raw_buf_ptr += mblen - 1; - } - first_char_in_line = false; - } /* end of outer loop */ - - /* - * Transfer any still-uncopied data to line_buf. - */ - REFILL_LINEBUF; - - return result; -} - -/* - * Return decimal value for a hexadecimal digit - */ -static int -GetDecimalFromHex(char hex) -{ - if (isdigit((unsigned char) hex)) - return hex - '0'; - else - return tolower((unsigned char) hex) - 'a' + 10; -} - -/* - * Parse the current line into separate attributes (fields), - * performing de-escaping as needed. - * - * The input is in line_buf. We use attribute_buf to hold the result - * strings. cstate->raw_fields[k] is set to point to the k'th attribute - * string, or NULL when the input matches the null marker string. - * This array is expanded as necessary. - * - * (Note that the caller cannot check for nulls since the returned - * string would be the post-de-escaping equivalent, which may look - * the same as some valid data string.) - * - * delim is the column delimiter string (must be just one byte for now). - * null_print is the null marker string. Note that this is compared to - * the pre-de-escaped input string. - * - * The return value is the number of fields actually read. - */ -static int -CopyReadAttributesText(CopyState cstate) -{ - char delimc = cstate->delim[0]; - int fieldno; - char *output_ptr; - char *cur_ptr; - char *line_end_ptr; - - /* - * We need a special case for zero-column tables: check that the input - * line is empty, and return. - */ - if (cstate->max_fields <= 0) - { - if (cstate->line_buf.len != 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); - return 0; - } - - resetStringInfo(&cstate->attribute_buf); - - /* - * The de-escaped attributes will certainly not be longer than the input - * data line, so we can just force attribute_buf to be large enough and - * then transfer data without any checks for enough space. We need to do - * it this way because enlarging attribute_buf mid-stream would invalidate - * pointers already stored into cstate->raw_fields[]. - */ - if (cstate->attribute_buf.maxlen <= cstate->line_buf.len) - enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len); - output_ptr = cstate->attribute_buf.data; - - /* set pointer variables for loop */ - cur_ptr = cstate->line_buf.data; - line_end_ptr = cstate->line_buf.data + cstate->line_buf.len; - - /* Outer loop iterates over fields */ - fieldno = 0; - for (;;) - { - bool found_delim = false; - char *start_ptr; - char *end_ptr; - int input_len; - bool saw_non_ascii = false; - - /* Make sure there is enough space for the next value */ - if (fieldno >= cstate->max_fields) - { - cstate->max_fields *= 2; - cstate->raw_fields = - repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *)); - } - - /* Remember start of field on both input and output sides */ - start_ptr = cur_ptr; - cstate->raw_fields[fieldno] = output_ptr; - - /* - * Scan data for field. - * - * Note that in this loop, we are scanning to locate the end of field - * and also speculatively performing de-escaping. Once we find the - * end-of-field, we can match the raw field contents against the null - * marker string. Only after that comparison fails do we know that - * de-escaping is actually the right thing to do; therefore we *must - * not* throw any syntax errors before we've done the null-marker - * check. - */ - for (;;) - { - char c; - - end_ptr = cur_ptr; - if (cur_ptr >= line_end_ptr) - break; - c = *cur_ptr++; - if (c == delimc) - { - found_delim = true; - break; - } - if (c == '\\') - { - if (cur_ptr >= line_end_ptr) - break; - c = *cur_ptr++; - switch (c) - { - case '0': - case '1': - case '2': - case '3': - case '4': - case '5': - case '6': - case '7': - { - /* handle \013 */ - int val; - - val = OCTVALUE(c); - if (cur_ptr < line_end_ptr) - { - c = *cur_ptr; - if (ISOCTAL(c)) - { - cur_ptr++; - val = (val << 3) + OCTVALUE(c); - if (cur_ptr < line_end_ptr) - { - c = *cur_ptr; - if (ISOCTAL(c)) - { - cur_ptr++; - val = (val << 3) + OCTVALUE(c); - } - } - } - } - c = val & 0377; - if (c == '\0' || IS_HIGHBIT_SET(c)) - saw_non_ascii = true; - } - break; - case 'x': - /* Handle \x3F */ - if (cur_ptr < line_end_ptr) - { - char hexchar = *cur_ptr; - - if (isxdigit((unsigned char) hexchar)) - { - int val = GetDecimalFromHex(hexchar); - - cur_ptr++; - if (cur_ptr < line_end_ptr) - { - hexchar = *cur_ptr; - if (isxdigit((unsigned char) hexchar)) - { - cur_ptr++; - val = (val << 4) + GetDecimalFromHex(hexchar); - } - } - c = val & 0xff; - if (c == '\0' || IS_HIGHBIT_SET(c)) - saw_non_ascii = true; - } - } - break; - case 'b': - c = '\b'; - break; - case 'f': - c = '\f'; - break; - case 'n': - c = '\n'; - break; - case 'r': - c = '\r'; - break; - case 't': - c = '\t'; - break; - case 'v': - c = '\v'; - break; - - /* - * in all other cases, take the char after '\' - * literally - */ - } - } - - /* Add c to output string */ - *output_ptr++ = c; - } - - /* Check whether raw input matched null marker */ - input_len = end_ptr - start_ptr; - if (input_len == cstate->null_print_len && - strncmp(start_ptr, cstate->null_print, input_len) == 0) - cstate->raw_fields[fieldno] = NULL; - else - { - /* - * At this point we know the field is supposed to contain data. - * - * If we de-escaped any non-7-bit-ASCII chars, make sure the - * resulting string is valid data for the db encoding. - */ - if (saw_non_ascii) - { - char *fld = cstate->raw_fields[fieldno]; - - pg_verifymbstr(fld, output_ptr - fld, false); - } - } - - /* Terminate attribute value in output area */ - *output_ptr++ = '\0'; - - fieldno++; - /* Done if we hit EOL instead of a delim */ - if (!found_delim) - break; - } - - /* Clean up state of attribute_buf */ - output_ptr--; - Assert(*output_ptr == '\0'); - cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data); - - return fieldno; -} - -/* - * Parse the current line into separate attributes (fields), - * performing de-escaping as needed. This has exactly the same API as - * CopyReadAttributesText, except we parse the fields according to - * "standard" (i.e. common) CSV usage. - */ -static int -CopyReadAttributesCSV(CopyState cstate) -{ - char delimc = cstate->delim[0]; - char quotec = cstate->quote[0]; - char escapec = cstate->escape[0]; - int fieldno; - char *output_ptr; - char *cur_ptr; - char *line_end_ptr; - - /* - * We need a special case for zero-column tables: check that the input - * line is empty, and return. - */ - if (cstate->max_fields <= 0) - { - if (cstate->line_buf.len != 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); - return 0; - } - - resetStringInfo(&cstate->attribute_buf); - - /* - * The de-escaped attributes will certainly not be longer than the input - * data line, so we can just force attribute_buf to be large enough and - * then transfer data without any checks for enough space. We need to do - * it this way because enlarging attribute_buf mid-stream would invalidate - * pointers already stored into cstate->raw_fields[]. - */ - if (cstate->attribute_buf.maxlen <= cstate->line_buf.len) - enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len); - output_ptr = cstate->attribute_buf.data; - - /* set pointer variables for loop */ - cur_ptr = cstate->line_buf.data; - line_end_ptr = cstate->line_buf.data + cstate->line_buf.len; - - /* Outer loop iterates over fields */ - fieldno = 0; - for (;;) - { - bool found_delim = false; - bool saw_quote = false; - char *start_ptr; - char *end_ptr; - int input_len; - - /* Make sure there is enough space for the next value */ - if (fieldno >= cstate->max_fields) - { - cstate->max_fields *= 2; - cstate->raw_fields = - repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *)); - } - - /* Remember start of field on both input and output sides */ - start_ptr = cur_ptr; - cstate->raw_fields[fieldno] = output_ptr; - - /* - * Scan data for field, - * - * The loop starts in "not quote" mode and then toggles between that - * and "in quote" mode. The loop exits normally if it is in "not - * quote" mode and a delimiter or line end is seen. - */ - for (;;) - { - char c; - - /* Not in quote */ - for (;;) - { - end_ptr = cur_ptr; - if (cur_ptr >= line_end_ptr) - goto endfield; - c = *cur_ptr++; - /* unquoted field delimiter */ - if (c == delimc) - { - found_delim = true; - goto endfield; - } - /* start of quoted field (or part of field) */ - if (c == quotec) - { - saw_quote = true; - break; - } - /* Add c to output string */ - *output_ptr++ = c; - } - - /* In quote */ - for (;;) - { - end_ptr = cur_ptr; - if (cur_ptr >= line_end_ptr) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("unterminated CSV quoted field"))); - - c = *cur_ptr++; - - /* escape within a quoted field */ - if (c == escapec) - { - /* - * peek at the next char if available, and escape it if it - * is an escape char or a quote char - */ - if (cur_ptr < line_end_ptr) - { - char nextc = *cur_ptr; - - if (nextc == escapec || nextc == quotec) - { - *output_ptr++ = nextc; - cur_ptr++; - continue; - } - } - } - - /* - * end of quoted field. Must do this test after testing for - * escape in case quote char and escape char are the same - * (which is the common case). - */ - if (c == quotec) - break; - - /* Add c to output string */ - *output_ptr++ = c; - } - } -endfield: - - /* Terminate attribute value in output area */ - *output_ptr++ = '\0'; - - /* Check whether raw input matched null marker */ - input_len = end_ptr - start_ptr; - if (!saw_quote && input_len == cstate->null_print_len && - strncmp(start_ptr, cstate->null_print, input_len) == 0) - cstate->raw_fields[fieldno] = NULL; - - fieldno++; - /* Done if we hit EOL instead of a delim */ - if (!found_delim) - break; - } - - /* Clean up state of attribute_buf */ - output_ptr--; - Assert(*output_ptr == '\0'); - cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data); - - return fieldno; -} - - -/* - * Read a binary attribute - */ -static Datum -CopyReadBinaryAttribute(CopyState cstate, FmgrInfo *flinfo, - Oid typioparam, int32 typmod, - bool *isnull) -{ - int32 fld_size; - Datum result; - - if (!CopyGetInt32(cstate, &fld_size)) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("unexpected EOF in COPY data"))); - if (fld_size == -1) - { - *isnull = true; - return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod); - } - if (fld_size < 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("invalid field size"))); - - /* reset attribute_buf to empty, and load raw data in it */ - resetStringInfo(&cstate->attribute_buf); - - enlargeStringInfo(&cstate->attribute_buf, fld_size); - if (CopyReadBinaryData(cstate, cstate->attribute_buf.data, - fld_size) != fld_size) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("unexpected EOF in COPY data"))); - - cstate->attribute_buf.len = fld_size; - cstate->attribute_buf.data[fld_size] = '\0'; - - /* Call the column type's binary input converter */ - result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf, - typioparam, typmod); - - /* Trouble if it didn't eat the whole buffer */ - if (cstate->attribute_buf.cursor != cstate->attribute_buf.len) - ereport(ERROR, - (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), - errmsg("incorrect binary data format"))); - - *isnull = false; - return result; -} - -/* - * Send text representation of one attribute, with conversion and escaping - */ -#define DUMPSOFAR() \ - do { \ - if (ptr > start) \ - CopySendData(cstate, start, ptr - start); \ - } while (0) - -static void -CopyAttributeOutText(CopyState cstate, char *string) -{ - char *ptr; - char *start; - char c; - char delimc = cstate->delim[0]; - - if (cstate->need_transcoding) - ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding); - else - ptr = string; - - /* - * We have to grovel through the string searching for control characters - * and instances of the delimiter character. In most cases, though, these - * are infrequent. To avoid overhead from calling CopySendData once per - * character, we dump out all characters between escaped characters in a - * single call. The loop invariant is that the data from "start" to "ptr" - * can be sent literally, but hasn't yet been. - * - * We can skip pg_encoding_mblen() overhead when encoding is safe, because - * in valid backend encodings, extra bytes of a multibyte character never - * look like ASCII. This loop is sufficiently performance-critical that - * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out - * of the normal safe-encoding path. - */ - if (cstate->encoding_embeds_ascii) - { - start = ptr; - while ((c = *ptr) != '\0') - { - if ((unsigned char) c < (unsigned char) 0x20) - { - /* - * \r and \n must be escaped, the others are traditional. We - * prefer to dump these using the C-like notation, rather than - * a backslash and the literal character, because it makes the - * dump file a bit more proof against Microsoftish data - * mangling. - */ - switch (c) - { - case '\b': - c = 'b'; - break; - case '\f': - c = 'f'; - break; - case '\n': - c = 'n'; - break; - case '\r': - c = 'r'; - break; - case '\t': - c = 't'; - break; - case '\v': - c = 'v'; - break; - default: - /* If it's the delimiter, must backslash it */ - if (c == delimc) - break; - /* All ASCII control chars are length 1 */ - ptr++; - continue; /* fall to end of loop */ - } - /* if we get here, we need to convert the control char */ - DUMPSOFAR(); - CopySendChar(cstate, '\\'); - CopySendChar(cstate, c); - start = ++ptr; /* do not include char in next run */ - } - else if (c == '\\' || c == delimc) - { - DUMPSOFAR(); - CopySendChar(cstate, '\\'); - start = ptr++; /* we include char in next run */ - } - else if (IS_HIGHBIT_SET(c)) - ptr += pg_encoding_mblen(cstate->file_encoding, ptr); - else - ptr++; - } - } - else - { - start = ptr; - while ((c = *ptr) != '\0') - { - if ((unsigned char) c < (unsigned char) 0x20) - { - /* - * \r and \n must be escaped, the others are traditional. We - * prefer to dump these using the C-like notation, rather than - * a backslash and the literal character, because it makes the - * dump file a bit more proof against Microsoftish data - * mangling. - */ - switch (c) - { - case '\b': - c = 'b'; - break; - case '\f': - c = 'f'; - break; - case '\n': - c = 'n'; - break; - case '\r': - c = 'r'; - break; - case '\t': - c = 't'; - break; - case '\v': - c = 'v'; - break; - default: - /* If it's the delimiter, must backslash it */ - if (c == delimc) - break; - /* All ASCII control chars are length 1 */ - ptr++; - continue; /* fall to end of loop */ - } - /* if we get here, we need to convert the control char */ - DUMPSOFAR(); - CopySendChar(cstate, '\\'); - CopySendChar(cstate, c); - start = ++ptr; /* do not include char in next run */ - } - else if (c == '\\' || c == delimc) - { - DUMPSOFAR(); - CopySendChar(cstate, '\\'); - start = ptr++; /* we include char in next run */ - } - else - ptr++; - } - } - - DUMPSOFAR(); -} - -/* - * Send text representation of one attribute, with conversion and - * CSV-style escaping - */ -static void -CopyAttributeOutCSV(CopyState cstate, char *string, - bool use_quote, bool single_attr) -{ - char *ptr; - char *start; - char c; - char delimc = cstate->delim[0]; - char quotec = cstate->quote[0]; - char escapec = cstate->escape[0]; - - /* force quoting if it matches null_print (before conversion!) */ - if (!use_quote && strcmp(string, cstate->null_print) == 0) - use_quote = true; - - if (cstate->need_transcoding) - ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding); - else - ptr = string; - - /* - * Make a preliminary pass to discover if it needs quoting - */ - if (!use_quote) - { - /* - * Because '\.' can be a data value, quote it if it appears alone on a - * line so it is not interpreted as the end-of-data marker. - */ - if (single_attr && strcmp(ptr, "\\.") == 0) - use_quote = true; - else - { - char *tptr = ptr; - - while ((c = *tptr) != '\0') - { - if (c == delimc || c == quotec || c == '\n' || c == '\r') - { - use_quote = true; - break; - } - if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii) - tptr += pg_encoding_mblen(cstate->file_encoding, tptr); - else - tptr++; - } - } - } - - if (use_quote) - { - CopySendChar(cstate, quotec); - - /* - * We adopt the same optimization strategy as in CopyAttributeOutText - */ - start = ptr; - while ((c = *ptr) != '\0') - { - if (c == quotec || c == escapec) - { - DUMPSOFAR(); - CopySendChar(cstate, escapec); - start = ptr; /* we include char in next run */ - } - if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii) - ptr += pg_encoding_mblen(cstate->file_encoding, ptr); - else - ptr++; - } - DUMPSOFAR(); - - CopySendChar(cstate, quotec); - } - else - { - /* If it doesn't need quoting, we can just dump it as-is */ - CopySendString(cstate, ptr); - } -} - -/* * CopyGetAttnums - build an integer list of attnums to be copied * * The input attnamelist is either the user-specified column list, @@ -5045,7 +682,7 @@ CopyAttributeOutCSV(CopyState cstate, char *string, * * rel can be NULL ... it's only used for error reports. */ -static List * +List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist) { List *attnums = NIL; @@ -5121,67 +758,3 @@ CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist) return attnums; } - - -/* - * copy_dest_startup --- executor startup - */ -static void -copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo) -{ - /* no-op */ -} - -/* - * copy_dest_receive --- receive one tuple - */ -static bool -copy_dest_receive(TupleTableSlot *slot, DestReceiver *self) -{ - DR_copy *myState = (DR_copy *) self; - CopyState cstate = myState->cstate; - - /* Send the data */ - CopyOneRowTo(cstate, slot); - myState->processed++; - - return true; -} - -/* - * copy_dest_shutdown --- executor end - */ -static void -copy_dest_shutdown(DestReceiver *self) -{ - /* no-op */ -} - -/* - * copy_dest_destroy --- release DestReceiver object - */ -static void -copy_dest_destroy(DestReceiver *self) -{ - pfree(self); -} - -/* - * CreateCopyDestReceiver -- create a suitable DestReceiver object - */ -DestReceiver * -CreateCopyDestReceiver(void) -{ - DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy)); - - self->pub.receiveSlot = copy_dest_receive; - self->pub.rStartup = copy_dest_startup; - self->pub.rShutdown = copy_dest_shutdown; - self->pub.rDestroy = copy_dest_destroy; - self->pub.mydest = DestCopyOut; - - self->cstate = NULL; /* will be set later */ - self->processed = 0; - - return (DestReceiver *) self; -} |