diff options
Diffstat (limited to 'src/backend/commands/copy.c')
-rw-r--r-- | src/backend/commands/copy.c | 205 |
1 files changed, 150 insertions, 55 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 523c1e03315..c651ea30280 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -58,7 +58,7 @@ */ typedef enum CopyDest { - COPY_FILE, /* to/from file */ + COPY_FILE, /* to/from file (or a piped program) */ COPY_OLD_FE, /* to/from frontend (2.0 protocol) */ COPY_NEW_FE /* to/from frontend (3.0 protocol) */ } CopyDest; @@ -108,6 +108,7 @@ typedef struct CopyStateData QueryDesc *queryDesc; /* executable query to copy from */ List *attnumlist; /* integer list of attnums to copy */ char *filename; /* filename, or NULL for STDIN/STDOUT */ + bool is_program; /* is 'filename' a program to popen? */ bool binary; /* binary format? */ bool oids; /* include OIDs? */ bool freeze; /* freeze rows on loading? */ @@ -277,8 +278,10 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query, const char *queryString, List *attnamelist, List *options); static void EndCopy(CopyState cstate); +static void ClosePipeToProgram(CopyState cstate); static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString, - const char *filename, List *attnamelist, List *options); + const char *filename, bool is_program, List *attnamelist, + List *options); static void EndCopyTo(CopyState cstate); static uint64 DoCopyTo(CopyState cstate); static uint64 CopyTo(CopyState cstate); @@ -482,9 +485,35 @@ CopySendEndOfRow(CopyState cstate) if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file) != 1 || ferror(cstate->copy_file)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not write to COPY file: %m"))); + { + if (cstate->is_program) + { + if (errno == EPIPE) + { + /* + * The pipe will be closed automatically on error at + * the end of transaction, but we might get a better + * error message from the subprocess' exit code than + * just "Broken Pipe" + */ + ClosePipeToProgram(cstate); + + /* + * If ClosePipeToProgram() didn't throw an error, + * the program terminated normally, but closed the + * pipe first. Restore errno, and throw an error. + */ + errno = EPIPE; + } + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to COPY program: %m"))); + } + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to COPY file: %m"))); + } break; case COPY_OLD_FE: /* The FE/BE protocol uses \n as newline for all platforms */ @@ -752,13 +781,22 @@ DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed) Relation rel; Oid relid; - /* Disallow file COPY except to superusers. */ + /* Disallow COPY to/from file or program except to superusers. */ if (!pipe && !superuser()) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("must be superuser to COPY to or from a file"), - errhint("Anyone can COPY to stdout or from stdin. " - "psql's \\copy command also works for anyone."))); + { + if (stmt->is_program) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to COPY to or from an external program"), + errhint("Anyone can COPY to stdout or from stdin. " + "psql's \\copy command also works for anyone."))); + else + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to COPY to or from a file"), + errhint("Anyone can COPY to stdout or from stdin. " + "psql's \\copy command also works for anyone."))); + } if (stmt->relation) { @@ -812,14 +850,15 @@ DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed) if (XactReadOnly && !rel->rd_islocaltemp) PreventCommandIfReadOnly("COPY FROM"); - cstate = BeginCopyFrom(rel, stmt->filename, + cstate = BeginCopyFrom(rel, stmt->filename, stmt->is_program, stmt->attlist, stmt->options); *processed = CopyFrom(cstate); /* copy from file to database */ EndCopyFrom(cstate); } else { - cstate = BeginCopyTo(rel, stmt->query, queryString, stmt->filename, + cstate = BeginCopyTo(rel, stmt->query, queryString, + stmt->filename, stmt->is_program, stmt->attlist, stmt->options); *processed = DoCopyTo(cstate); /* copy from database to file */ EndCopyTo(cstate); @@ -1390,16 +1429,44 @@ BeginCopy(bool is_from, } /* + * Closes the pipe to an external program, checking the pclose() return code. + */ +static void +ClosePipeToProgram(CopyState cstate) +{ + int pclose_rc; + + Assert(cstate->is_program); + + pclose_rc = ClosePipeStream(cstate->copy_file); + if (pclose_rc == -1) + ereport(ERROR, + (errmsg("could not close pipe to external command: %m"))); + else if (pclose_rc != 0) + ereport(ERROR, + (errmsg("program \"%s\" failed", + cstate->filename), + errdetail_internal("%s", wait_result_to_str(pclose_rc)))); +} + +/* * Release resources allocated in a cstate for COPY TO/FROM. */ static void EndCopy(CopyState cstate) { - if (cstate->filename != NULL && FreeFile(cstate->copy_file)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not close file \"%s\": %m", - cstate->filename))); + if (cstate->is_program) + { + ClosePipeToProgram(cstate); + } + else + { + if (cstate->filename != NULL && FreeFile(cstate->copy_file)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", + cstate->filename))); + } MemoryContextDelete(cstate->copycontext); pfree(cstate); @@ -1413,6 +1480,7 @@ BeginCopyTo(Relation rel, Node *query, const char *queryString, const char *filename, + bool is_program, List *attnamelist, List *options) { @@ -1451,39 +1519,52 @@ BeginCopyTo(Relation rel, if (pipe) { + Assert(!is_program); /* the grammar does not allow this */ if (whereToSendOutput != DestRemote) cstate->copy_file = stdout; } else { - mode_t oumask; /* Pre-existing umask value */ - struct stat st; + cstate->filename = pstrdup(filename); + cstate->is_program = is_program; - /* - * Prevent write to relative path ... too easy to shoot oneself in the - * foot by overwriting a database file ... - */ - if (!is_absolute_path(filename)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_NAME), - errmsg("relative path not allowed for COPY to file"))); + if (is_program) + { + cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W); + if (cstate->copy_file == NULL) + ereport(ERROR, + (errmsg("could not execute command \"%s\": %m", + cstate->filename))); + } + else + { + mode_t oumask; /* Pre-existing umask value */ + struct stat st; - cstate->filename = pstrdup(filename); - oumask = umask(S_IWGRP | S_IWOTH); - cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W); - umask(oumask); + /* + * Prevent write to relative path ... too easy to shoot oneself in + * the foot by overwriting a database file ... + */ + if (!is_absolute_path(filename)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_NAME), + errmsg("relative path not allowed for COPY to file"))); - if (cstate->copy_file == NULL) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\" for writing: %m", - cstate->filename))); + oumask = umask(S_IWGRP | S_IWOTH); + cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W); + umask(oumask); + if (cstate->copy_file == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\" for writing: %m", + cstate->filename))); - fstat(fileno(cstate->copy_file), &st); - if (S_ISDIR(st.st_mode)) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is a directory", cstate->filename))); + fstat(fileno(cstate->copy_file), &st); + if (S_ISDIR(st.st_mode)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is a directory", cstate->filename))); + } } MemoryContextSwitchTo(oldcontext); @@ -2317,6 +2398,7 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid, CopyState BeginCopyFrom(Relation rel, const char *filename, + bool is_program, List *attnamelist, List *options) { @@ -2413,9 +2495,11 @@ BeginCopyFrom(Relation rel, cstate->defexprs = defexprs; cstate->volatile_defexprs = volatile_defexprs; cstate->num_defaults = num_defaults; + cstate->is_program = is_program; if (pipe) { + Assert(!is_program); /* the grammar does not allow this */ if (whereToSendOutput == DestRemote) ReceiveCopyBegin(cstate); else @@ -2423,22 +2507,33 @@ BeginCopyFrom(Relation rel, } else { - struct stat st; - cstate->filename = pstrdup(filename); - cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); - if (cstate->copy_file == NULL) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\" for reading: %m", - cstate->filename))); + if (cstate->is_program) + { + cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R); + if (cstate->copy_file == NULL) + ereport(ERROR, + (errmsg("could not execute command \"%s\": %m", + cstate->filename))); + } + else + { + struct stat st; - fstat(fileno(cstate->copy_file), &st); - if (S_ISDIR(st.st_mode)) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is a directory", cstate->filename))); + cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); + if (cstate->copy_file == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\" for reading: %m", + cstate->filename))); + + fstat(fileno(cstate->copy_file), &st); + if (S_ISDIR(st.st_mode)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is a directory", cstate->filename))); + } } if (!cstate->binary) |