aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/copy.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/copy.c')
-rw-r--r--src/backend/commands/copy.c205
1 files changed, 150 insertions, 55 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 523c1e03315..c651ea30280 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -58,7 +58,7 @@
*/
typedef enum CopyDest
{
- COPY_FILE, /* to/from file */
+ COPY_FILE, /* to/from file (or a piped program) */
COPY_OLD_FE, /* to/from frontend (2.0 protocol) */
COPY_NEW_FE /* to/from frontend (3.0 protocol) */
} CopyDest;
@@ -108,6 +108,7 @@ typedef struct CopyStateData
QueryDesc *queryDesc; /* executable query to copy from */
List *attnumlist; /* integer list of attnums to copy */
char *filename; /* filename, or NULL for STDIN/STDOUT */
+ bool is_program; /* is 'filename' a program to popen? */
bool binary; /* binary format? */
bool oids; /* include OIDs? */
bool freeze; /* freeze rows on loading? */
@@ -277,8 +278,10 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query,
const char *queryString, List *attnamelist, List *options);
static void EndCopy(CopyState cstate);
+static void ClosePipeToProgram(CopyState cstate);
static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString,
- const char *filename, List *attnamelist, List *options);
+ const char *filename, bool is_program, List *attnamelist,
+ List *options);
static void EndCopyTo(CopyState cstate);
static uint64 DoCopyTo(CopyState cstate);
static uint64 CopyTo(CopyState cstate);
@@ -482,9 +485,35 @@ CopySendEndOfRow(CopyState cstate)
if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
cstate->copy_file) != 1 ||
ferror(cstate->copy_file))
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not write to COPY file: %m")));
+ {
+ if (cstate->is_program)
+ {
+ if (errno == EPIPE)
+ {
+ /*
+ * The pipe will be closed automatically on error at
+ * the end of transaction, but we might get a better
+ * error message from the subprocess' exit code than
+ * just "Broken Pipe"
+ */
+ ClosePipeToProgram(cstate);
+
+ /*
+ * If ClosePipeToProgram() didn't throw an error,
+ * the program terminated normally, but closed the
+ * pipe first. Restore errno, and throw an error.
+ */
+ errno = EPIPE;
+ }
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write to COPY program: %m")));
+ }
+ else
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write to COPY file: %m")));
+ }
break;
case COPY_OLD_FE:
/* The FE/BE protocol uses \n as newline for all platforms */
@@ -752,13 +781,22 @@ DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
Relation rel;
Oid relid;
- /* Disallow file COPY except to superusers. */
+ /* Disallow COPY to/from file or program except to superusers. */
if (!pipe && !superuser())
- ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("must be superuser to COPY to or from a file"),
- errhint("Anyone can COPY to stdout or from stdin. "
- "psql's \\copy command also works for anyone.")));
+ {
+ if (stmt->is_program)
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be superuser to COPY to or from an external program"),
+ errhint("Anyone can COPY to stdout or from stdin. "
+ "psql's \\copy command also works for anyone.")));
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be superuser to COPY to or from a file"),
+ errhint("Anyone can COPY to stdout or from stdin. "
+ "psql's \\copy command also works for anyone.")));
+ }
if (stmt->relation)
{
@@ -812,14 +850,15 @@ DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
if (XactReadOnly && !rel->rd_islocaltemp)
PreventCommandIfReadOnly("COPY FROM");
- cstate = BeginCopyFrom(rel, stmt->filename,
+ cstate = BeginCopyFrom(rel, stmt->filename, stmt->is_program,
stmt->attlist, stmt->options);
*processed = CopyFrom(cstate); /* copy from file to database */
EndCopyFrom(cstate);
}
else
{
- cstate = BeginCopyTo(rel, stmt->query, queryString, stmt->filename,
+ cstate = BeginCopyTo(rel, stmt->query, queryString,
+ stmt->filename, stmt->is_program,
stmt->attlist, stmt->options);
*processed = DoCopyTo(cstate); /* copy from database to file */
EndCopyTo(cstate);
@@ -1390,16 +1429,44 @@ BeginCopy(bool is_from,
}
/*
+ * Closes the pipe to an external program, checking the pclose() return code.
+ */
+static void
+ClosePipeToProgram(CopyState cstate)
+{
+ int pclose_rc;
+
+ Assert(cstate->is_program);
+
+ pclose_rc = ClosePipeStream(cstate->copy_file);
+ if (pclose_rc == -1)
+ ereport(ERROR,
+ (errmsg("could not close pipe to external command: %m")));
+ else if (pclose_rc != 0)
+ ereport(ERROR,
+ (errmsg("program \"%s\" failed",
+ cstate->filename),
+ errdetail_internal("%s", wait_result_to_str(pclose_rc))));
+}
+
+/*
* Release resources allocated in a cstate for COPY TO/FROM.
*/
static void
EndCopy(CopyState cstate)
{
- if (cstate->filename != NULL && FreeFile(cstate->copy_file))
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not close file \"%s\": %m",
- cstate->filename)));
+ if (cstate->is_program)
+ {
+ ClosePipeToProgram(cstate);
+ }
+ else
+ {
+ if (cstate->filename != NULL && FreeFile(cstate->copy_file))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not close file \"%s\": %m",
+ cstate->filename)));
+ }
MemoryContextDelete(cstate->copycontext);
pfree(cstate);
@@ -1413,6 +1480,7 @@ BeginCopyTo(Relation rel,
Node *query,
const char *queryString,
const char *filename,
+ bool is_program,
List *attnamelist,
List *options)
{
@@ -1451,39 +1519,52 @@ BeginCopyTo(Relation rel,
if (pipe)
{
+ Assert(!is_program); /* the grammar does not allow this */
if (whereToSendOutput != DestRemote)
cstate->copy_file = stdout;
}
else
{
- mode_t oumask; /* Pre-existing umask value */
- struct stat st;
+ cstate->filename = pstrdup(filename);
+ cstate->is_program = is_program;
- /*
- * Prevent write to relative path ... too easy to shoot oneself in the
- * foot by overwriting a database file ...
- */
- if (!is_absolute_path(filename))
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_NAME),
- errmsg("relative path not allowed for COPY to file")));
+ if (is_program)
+ {
+ cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
+ if (cstate->copy_file == NULL)
+ ereport(ERROR,
+ (errmsg("could not execute command \"%s\": %m",
+ cstate->filename)));
+ }
+ else
+ {
+ mode_t oumask; /* Pre-existing umask value */
+ struct stat st;
- cstate->filename = pstrdup(filename);
- oumask = umask(S_IWGRP | S_IWOTH);
- cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
- umask(oumask);
+ /*
+ * Prevent write to relative path ... too easy to shoot oneself in
+ * the foot by overwriting a database file ...
+ */
+ if (!is_absolute_path(filename))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_NAME),
+ errmsg("relative path not allowed for COPY to file")));
- if (cstate->copy_file == NULL)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open file \"%s\" for writing: %m",
- cstate->filename)));
+ oumask = umask(S_IWGRP | S_IWOTH);
+ cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
+ umask(oumask);
+ if (cstate->copy_file == NULL)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\" for writing: %m",
+ cstate->filename)));
- fstat(fileno(cstate->copy_file), &st);
- if (S_ISDIR(st.st_mode))
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is a directory", cstate->filename)));
+ fstat(fileno(cstate->copy_file), &st);
+ if (S_ISDIR(st.st_mode))
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is a directory", cstate->filename)));
+ }
}
MemoryContextSwitchTo(oldcontext);
@@ -2317,6 +2398,7 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
CopyState
BeginCopyFrom(Relation rel,
const char *filename,
+ bool is_program,
List *attnamelist,
List *options)
{
@@ -2413,9 +2495,11 @@ BeginCopyFrom(Relation rel,
cstate->defexprs = defexprs;
cstate->volatile_defexprs = volatile_defexprs;
cstate->num_defaults = num_defaults;
+ cstate->is_program = is_program;
if (pipe)
{
+ Assert(!is_program); /* the grammar does not allow this */
if (whereToSendOutput == DestRemote)
ReceiveCopyBegin(cstate);
else
@@ -2423,22 +2507,33 @@ BeginCopyFrom(Relation rel,
}
else
{
- struct stat st;
-
cstate->filename = pstrdup(filename);
- cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
- if (cstate->copy_file == NULL)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open file \"%s\" for reading: %m",
- cstate->filename)));
+ if (cstate->is_program)
+ {
+ cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
+ if (cstate->copy_file == NULL)
+ ereport(ERROR,
+ (errmsg("could not execute command \"%s\": %m",
+ cstate->filename)));
+ }
+ else
+ {
+ struct stat st;
- fstat(fileno(cstate->copy_file), &st);
- if (S_ISDIR(st.st_mode))
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is a directory", cstate->filename)));
+ cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
+ if (cstate->copy_file == NULL)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\" for reading: %m",
+ cstate->filename)));
+
+ fstat(fileno(cstate->copy_file), &st);
+ if (S_ISDIR(st.st_mode))
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is a directory", cstate->filename)));
+ }
}
if (!cstate->binary)