diff options
Diffstat (limited to 'src/backend/tcop/postgres.c')
-rw-r--r-- | src/backend/tcop/postgres.c | 959 |
1 files changed, 883 insertions, 76 deletions
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index b60898270a6..d57ccd973b2 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/tcop/postgres.c,v 1.331 2003/05/03 05:13:20 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/tcop/postgres.c,v 1.332 2003/05/05 00:44:56 tgl Exp $ * * NOTES * this is the "main" module of the postgres backend and @@ -33,8 +33,11 @@ #include <getopt.h> #endif +#include "access/printtup.h" #include "access/xlog.h" +#include "catalog/pg_type.h" #include "commands/async.h" +#include "commands/prepare.h" #include "commands/trigger.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" @@ -54,6 +57,7 @@ #include "tcop/tcopprot.h" #include "tcop/utility.h" #include "utils/guc.h" +#include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/ps_status.h" #include "mb/pg_wchar.h" @@ -82,7 +86,15 @@ bool InError = false; extern bool autocommit; -static bool EchoQuery = false; /* default don't echo */ +/* + * Flags for expensive function optimization -- JMH 3/9/92 + */ +int XfuncMode = 0; + +/* ---------------- + * private variables + * ---------------- + */ /* * Flag to mark SIGHUP. Whenever the main loop comes around it @@ -91,23 +103,41 @@ static bool EchoQuery = false; /* default don't echo */ */ static volatile bool got_SIGHUP = false; -/* ---------------- - * people who want to use EOF should #define DONTUSENEWLINE in - * tcop/tcopdebug.h - * ---------------- +/* + * Flag to keep track of whether we have started a transaction. + * For extended query protocol this has to be remembered across messages. + */ +static bool xact_started = false; + +/* + * Flags to implement skip-till-Sync-after-error behavior for messages of + * the extended query protocol. + */ +static bool doing_extended_query_message = false; +static bool ignore_till_sync = false; + +/* + * If an unnamed prepared statement exists, it's stored here. + * We keep it separate from the hashtable kept by commands/prepare.c + * in order to reduce overhead for short-lived queries. + */ +static MemoryContext unnamed_stmt_context = NULL; +static PreparedStatement *unnamed_stmt_pstmt = NULL; + + +static bool EchoQuery = false; /* default don't echo */ + +/* + * people who want to use EOF should #define DONTUSENEWLINE in + * tcop/tcopdebug.h */ #ifndef TCOP_DONTUSENEWLINE -int UseNewLine = 1; /* Use newlines query delimiters (the +static int UseNewLine = 1; /* Use newlines query delimiters (the * default) */ - #else -int UseNewLine = 0; /* Use EOF as query delimiters */ +static int UseNewLine = 0; /* Use EOF as query delimiters */ #endif /* TCOP_DONTUSENEWLINE */ -/* -** Flags for expensive function optimization -- JMH 3/9/92 -*/ -int XfuncMode = 0; /* ---------------------------------------------------------------- * decls for routines only used in this file @@ -254,10 +284,14 @@ SocketBackend(StringInfo inBuf) * Validate message type code before trying to read body; if we have * lost sync, better to say "command unknown" than to run out of memory * because we used garbage as a length word. + * + * This also gives us a place to set the doing_extended_query_message + * flag as soon as possible. */ switch (qtype) { case 'Q': /* simple query */ + doing_extended_query_message = false; if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3) { /* old style without length word; convert */ @@ -270,15 +304,43 @@ SocketBackend(StringInfo inBuf) break; case 'F': /* fastpath function call */ + /* we let fastpath.c cope with old-style input of this */ + doing_extended_query_message = false; break; case 'X': /* terminate */ + doing_extended_query_message = false; + break; + + case 'B': /* bind */ + case 'C': /* close */ + case 'D': /* describe */ + case 'E': /* execute */ + case 'H': /* flush */ + case 'P': /* parse */ + doing_extended_query_message = true; + /* these are only legal in protocol 3 */ + if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3) + elog(FATAL, "Socket command type %c unknown", qtype); + break; + + case 'S': /* sync */ + /* stop any active skip-till-Sync */ + ignore_till_sync = false; + /* mark not-extended, so that a new error doesn't begin skip */ + doing_extended_query_message = false; + /* only legal in protocol 3 */ + if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3) + elog(FATAL, "Socket command type %c unknown", qtype); break; case 'd': /* copy data */ case 'c': /* copy done */ case 'f': /* copy fail */ - /* Accept but ignore these messages, per protocol spec */ + doing_extended_query_message = false; + /* these are only legal in protocol 3 */ + if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3) + elog(FATAL, "Socket command type %c unknown", qtype); break; default: @@ -410,9 +472,6 @@ List * pg_analyze_and_rewrite(Node *parsetree, Oid *paramTypes, int numParams) { List *querytree_list; - List *list_item; - Query *querytree; - List *new_list; /* * (1) Perform parse analysis. @@ -423,21 +482,35 @@ pg_analyze_and_rewrite(Node *parsetree, Oid *paramTypes, int numParams) querytree_list = parse_analyze(parsetree, paramTypes, numParams); if (log_parser_stats) - { ShowUsage("PARSE ANALYSIS STATISTICS"); - ResetUsage(); - } /* * (2) Rewrite the queries, as necessary - * + */ + querytree_list = pg_rewrite_queries(querytree_list); + + return querytree_list; +} + +/* + * Perform rewriting of a list of queries produced by parse analysis. + */ +List * +pg_rewrite_queries(List *querytree_list) +{ + List *new_list = NIL; + List *list_item; + + if (log_parser_stats) + ResetUsage(); + + /* * rewritten queries are collected in new_list. Note there may be more * or fewer than in the original list. */ - new_list = NIL; foreach(list_item, querytree_list) { - querytree = (Query *) lfirst(list_item); + Query *querytree = (Query *) lfirst(list_item); if (Debug_print_parse) elog_node_display(LOG, "parse tree", querytree, @@ -471,7 +544,7 @@ pg_analyze_and_rewrite(Node *parsetree, Oid *paramTypes, int numParams) new_list = (List *) copyObject(querytree_list); /* This checks both copyObject() and the equal() routines... */ if (!equal(new_list, querytree_list)) - elog(WARNING, "pg_analyze_and_rewrite: copyObject failed on parse tree"); + elog(WARNING, "pg_rewrite_queries: copyObject failed on parse tree"); else querytree_list = new_list; #endif @@ -576,15 +649,13 @@ pg_plan_queries(List *querytrees, bool needSnapshot) /* - * exec_simple_query() + * exec_simple_query * * Execute a "simple Query" protocol message. */ static void -exec_simple_query(const char *query_string, /* string to execute */ - CommandDest dest) /* where results should go */ +exec_simple_query(const char *query_string) { - bool xact_started; MemoryContext oldcontext; List *parsetree_list, *parsetree_item; @@ -619,13 +690,28 @@ exec_simple_query(const char *query_string, /* string to execute */ * that this will normally change current memory context.) */ start_xact_command(); - xact_started = true; + + /* + * Zap any pre-existing unnamed statement. (While not strictly + * necessary, it seems best to define simple-Query mode as if it + * used the unnamed statement and portal; this ensures we recover + * any storage used by prior unnamed operations.) + */ + unnamed_stmt_pstmt = NULL; + if (unnamed_stmt_context) + { + DropDependentPortals(unnamed_stmt_context); + MemoryContextDelete(unnamed_stmt_context); + } + unnamed_stmt_context = NULL; /* * Switch to appropriate context for constructing parsetrees. */ oldcontext = MemoryContextSwitchTo(MessageContext); + QueryContext = CurrentMemoryContext; + /* * Do basic parsing of the query or queries (this should be safe even * if we are in aborted transaction state!) @@ -659,7 +745,7 @@ exec_simple_query(const char *query_string, /* string to execute */ set_ps_display(commandTag); - BeginCommand(commandTag, dest); + BeginCommand(commandTag, whereToSendOutput); /* * If we are in an aborted transaction, reject all commands except @@ -688,11 +774,7 @@ exec_simple_query(const char *query_string, /* string to execute */ } /* Make sure we are in a transaction command */ - if (!xact_started) - { - start_xact_command(); - xact_started = true; - } + start_xact_command(); /* If we got a cancel signal in parsing or prior command, quit */ CHECK_FOR_INTERRUPTS(); @@ -735,37 +817,40 @@ exec_simple_query(const char *query_string, /* string to execute */ */ PortalStart(portal, NULL); - (void) PortalRun(portal, FETCH_ALL, dest, dest, completionTag); + (void) PortalRun(portal, + FETCH_ALL, + whereToSendOutput, + whereToSendOutput, + completionTag); PortalDrop(portal, false); - /* - * If this was a transaction control statement or a variable - * set/show/reset statement, commit it and arrange to start a - * new xact command for the next command (if any). - */ + if (IsA(parsetree, TransactionStmt) || IsA(parsetree, VariableSetStmt) || IsA(parsetree, VariableShowStmt) || IsA(parsetree, VariableResetStmt)) { + /* + * If this was a transaction control statement or a variable + * set/show/reset statement, commit it. We will start a + * new xact command for the next command (if any). + */ finish_xact_command(true); - xact_started = false; } - /* - * If this is the last parsetree of the query string, close down - * transaction statement before reporting command-complete. This - * is so that any end-of-transaction errors are reported before - * the command-complete message is issued, to avoid confusing - * clients who will expect either a command-complete message or an - * error, not one and then the other. But for compatibility with - * historical Postgres behavior, we do not force a transaction - * boundary between queries appearing in a single query string. - */ else if (lnext(parsetree_item) == NIL || !autocommit) { + /* + * If this is the last parsetree of the query string, close down + * transaction statement before reporting command-complete. This + * is so that any end-of-transaction errors are reported before + * the command-complete message is issued, to avoid confusing + * clients who will expect either a command-complete message or an + * error, not one and then the other. But for compatibility with + * historical Postgres behavior, we do not force a transaction + * boundary between queries appearing in a single query string. + */ finish_xact_command(false); - xact_started = false; } else { @@ -783,20 +868,21 @@ exec_simple_query(const char *query_string, /* string to execute */ * (But a command aborted by error will not send an EndCommand * report at all.) */ - EndCommand(completionTag, dest); + EndCommand(completionTag, whereToSendOutput); } /* end loop over parsetrees */ /* * If there were no parsetrees, return EmptyQueryResponse message. */ if (!parsetree_list) - NullCommand(dest); + NullCommand(whereToSendOutput); + + QueryContext = NULL; /* * Close down transaction statement, if one is open. */ - if (xact_started) - finish_xact_command(false); + finish_xact_command(false); /* * Finish up monitoring. @@ -821,38 +907,608 @@ exec_simple_query(const char *query_string, /* string to execute */ } /* + * exec_parse_message + * + * Execute a "Parse" protocol message. + */ +static void +exec_parse_message(const char *query_string, /* string to execute */ + const char *stmt_name, /* name for prepared stmt */ + Oid *paramTypes, /* parameter types */ + int numParams) /* number of parameters */ +{ + MemoryContext oldcontext; + List *parsetree_list; + const char *commandTag; + List *querytree_list, + *plantree_list, + *param_list; + bool is_named; + bool save_log_statement_stats = log_statement_stats; + + /* + * Report query to various monitoring facilities. + */ + debug_query_string = query_string; + + pgstat_report_activity(query_string); + + set_ps_display("PARSE"); + + if (save_log_statement_stats) + ResetUsage(); + + /* + * Start up a transaction command so we can run parse analysis etc. + * (Note that this will normally change current memory context.) + * Nothing happens if we are already in one. + */ + start_xact_command(); + + /* + * Switch to appropriate context for constructing parsetrees. + * + * We have two strategies depending on whether the prepared statement + * is named or not. For a named prepared statement, we do parsing + * in MessageContext and copy the finished trees into the prepared + * statement's private context; then the reset of MessageContext releases + * temporary space used by parsing and planning. For an unnamed prepared + * statement, we assume the statement isn't going to hang around long, + * so getting rid of temp space quickly is probably not worth the costs + * of copying parse/plan trees. So in this case, we set up a special + * context for the unnamed statement, and do all the parsing/planning + * therein. + */ + is_named = (stmt_name[0] != '\0'); + if (is_named) + { + /* Named prepared statement --- parse in MessageContext */ + oldcontext = MemoryContextSwitchTo(MessageContext); + } + else + { + /* Unnamed prepared statement --- release any prior unnamed stmt */ + unnamed_stmt_pstmt = NULL; + if (unnamed_stmt_context) + { + DropDependentPortals(unnamed_stmt_context); + MemoryContextDelete(unnamed_stmt_context); + } + unnamed_stmt_context = NULL; + /* create context for parsing/planning */ + unnamed_stmt_context = + AllocSetContextCreate(TopMemoryContext, + "unnamed prepared statement", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + oldcontext = MemoryContextSwitchTo(unnamed_stmt_context); + } + + QueryContext = CurrentMemoryContext; + + /* + * Do basic parsing of the query or queries (this should be safe even + * if we are in aborted transaction state!) + */ + parsetree_list = pg_parse_query(query_string); + + /* + * We only allow a single user statement in a prepared statement. + * This is mainly to keep the protocol simple --- otherwise we'd need + * to worry about multiple result tupdescs and things like that. + */ + if (length(parsetree_list) > 1) + elog(ERROR, "Cannot insert multiple commands into a prepared statement"); + + if (parsetree_list != NIL) + { + Node *parsetree = (Node *) lfirst(parsetree_list); + int i; + + /* + * Get the command name for possible use in status display. + */ + commandTag = CreateCommandTag(parsetree); + + /* + * If we are in an aborted transaction, reject all commands except + * COMMIT/ROLLBACK. It is important that this test occur before we + * try to do parse analysis, rewrite, or planning, since all those + * phases try to do database accesses, which may fail in abort + * state. (It might be safe to allow some additional utility + * commands in this state, but not many...) + */ + if (IsAbortedTransactionBlockState()) + { + bool allowit = false; + + if (IsA(parsetree, TransactionStmt)) + { + TransactionStmt *stmt = (TransactionStmt *) parsetree; + + if (stmt->kind == TRANS_STMT_COMMIT || + stmt->kind == TRANS_STMT_ROLLBACK) + allowit = true; + } + + if (!allowit) + elog(ERROR, "current transaction is aborted, " + "queries ignored until end of transaction block"); + } + + /* + * OK to analyze, rewrite, and plan this query. Note that the + * originally specified parameter set is not required to be + * complete, so we have to use parse_analyze_varparams(). + */ + if (log_parser_stats) + ResetUsage(); + + querytree_list = parse_analyze_varparams(parsetree, + ¶mTypes, + &numParams); + + /* + * Check all parameter types got determined, and convert array + * representation to a list for storage. + */ + param_list = NIL; + for (i = 0; i < numParams; i++) + { + Oid ptype = paramTypes[i]; + + if (ptype == InvalidOid || ptype == UNKNOWNOID) + elog(ERROR, "Could not determine datatype of parameter $%d", + i + 1); + param_list = lappendo(param_list, ptype); + } + + if (log_parser_stats) + ShowUsage("PARSE ANALYSIS STATISTICS"); + + querytree_list = pg_rewrite_queries(querytree_list); + + plantree_list = pg_plan_queries(querytree_list, true); + } + else + { + /* Empty input string. This is legal. */ + commandTag = NULL; + querytree_list = NIL; + plantree_list = NIL; + param_list = NIL; + } + + /* If we got a cancel signal in analysis or planning, quit */ + CHECK_FOR_INTERRUPTS(); + + /* + * Store the query as a prepared statement. See above comments. + */ + if (is_named) + { + StorePreparedStatement(stmt_name, + query_string, + commandTag, + querytree_list, + plantree_list, + param_list); + } + else + { + PreparedStatement *pstmt; + + pstmt = (PreparedStatement *) palloc0(sizeof(PreparedStatement)); + /* query_string needs to be copied into unnamed_stmt_context */ + pstmt->query_string = pstrdup(query_string); + /* the rest is there already */ + pstmt->commandTag = commandTag; + pstmt->query_list = querytree_list; + pstmt->plan_list = plantree_list; + pstmt->argtype_list = param_list; + pstmt->context = unnamed_stmt_context; + /* Now the unnamed statement is complete and valid */ + unnamed_stmt_pstmt = pstmt; + } + + MemoryContextSwitchTo(oldcontext); + + QueryContext = NULL; + + /* + * We do NOT close the open transaction command here; that only happens + * when the client sends Sync. Instead, do CommandCounterIncrement just + * in case something happened during parse/plan. + */ + CommandCounterIncrement(); + + /* + * Send ParseComplete. + */ + if (whereToSendOutput == Remote) + pq_putemptymessage('1'); + + if (save_log_statement_stats) + ShowUsage("PARSE MESSAGE STATISTICS"); + + debug_query_string = NULL; +} + +/* + * exec_bind_message + * + * Process a "Bind" message to create a portal from a prepared statement + */ +static void +exec_bind_message(StringInfo input_message) +{ + const char *portal_name; + const char *stmt_name; + int is_binary; + int numParams; + PreparedStatement *pstmt; + Portal portal; + ParamListInfo params; + + pgstat_report_activity("<BIND>"); + + set_ps_display("BIND"); + + /* + * Start up a transaction command so we can call functions etc. + * (Note that this will normally change current memory context.) + * Nothing happens if we are already in one. + */ + start_xact_command(); + + /* Get the fixed part of the message */ + portal_name = pq_getmsgstring(input_message); + stmt_name = pq_getmsgstring(input_message); + is_binary = pq_getmsgbyte(input_message); + numParams = pq_getmsgint(input_message, 4); + + if (is_binary) + elog(ERROR, "Binary BIND not implemented yet"); + + /* Find prepared statement */ + if (stmt_name[0] != '\0') + pstmt = FetchPreparedStatement(stmt_name, true); + else + { + /* special-case the unnamed statement */ + pstmt = unnamed_stmt_pstmt; + if (!pstmt) + elog(ERROR, "Unnamed prepared statement does not exist"); + } + + if (numParams != length(pstmt->argtype_list)) + elog(ERROR, "Bind message supplies %d parameters, but prepared statement \"%s\" requires %d", + numParams, stmt_name, length(pstmt->argtype_list)); + + /* + * Create the portal. Allow silent replacement of an existing portal + * only if the unnamed portal is specified. + */ + if (portal_name[0] == '\0') + portal = CreatePortal(portal_name, true, true); + else + portal = CreatePortal(portal_name, false, false); + + PortalDefineQuery(portal, + pstmt->query_string, + pstmt->commandTag, + pstmt->query_list, + pstmt->plan_list, + pstmt->context); + + /* + * Fetch parameters, if any, and store in the portal's memory context. + * + * In an aborted transaction, we can't risk calling user-defined functions, + * so bind all parameters to null values. + */ + if (numParams > 0) + { + bool isaborted = IsAbortedTransactionBlockState(); + int i = 0; + List *l; + MemoryContext oldContext; + + oldContext = MemoryContextSwitchTo(PortalGetHeapMemory(portal)); + + params = (ParamListInfo) + palloc0((numParams + 1) * sizeof(ParamListInfoData)); + + foreach(l, pstmt->argtype_list) + { + Oid ptype = lfirsto(l); + bool isNull; + + isNull = (pq_getmsgbyte(input_message) != 0) ? false : true; + if (!isNull) + { + const char *ptext = pq_getmsgstring(input_message); + + if (isaborted) + isNull = true; + else + { + Oid typInput; + Oid typElem; + + getTypeInputInfo(ptype, &typInput, &typElem); + params[i].value = + OidFunctionCall3(typInput, + CStringGetDatum(ptext), + ObjectIdGetDatum(typElem), + Int32GetDatum(-1)); + } + } + params[i].kind = PARAM_NUM; + params[i].id = i + 1; + params[i].isnull = isNull; + + i++; + } + + params[i].kind = PARAM_INVALID; + + MemoryContextSwitchTo(oldContext); + } + else + params = NULL; + + pq_getmsgend(input_message); + + /* + * Start portal execution. + */ + PortalStart(portal, params); + + /* + * Send BindComplete. + */ + if (whereToSendOutput == Remote) + pq_putemptymessage('2'); +} + +/* + * exec_execute_message + * + * Process an "Execute" message for a portal + */ +static void +exec_execute_message(const char *portal_name, int is_binary, long max_rows) +{ + CommandDest dest; + Portal portal; + bool is_trans_stmt = false; + bool is_trans_exit = false; + bool completed; + char completionTag[COMPLETION_TAG_BUFSIZE]; + + /* Adjust destination to tell printtup.c what to do */ + dest = whereToSendOutput; + if (dest == Remote) + dest = is_binary ? RemoteExecuteInternal : RemoteExecute; + + portal = GetPortalByName(portal_name); + if (!PortalIsValid(portal)) + elog(ERROR, "Portal \"%s\" not found", portal_name); + + /* + * If the original query was a null string, just return EmptyQueryResponse. + */ + if (portal->commandTag == NULL) + { + Assert(portal->parseTrees == NIL); + NullCommand(dest); + return; + } + + if (portal->sourceText) + { + debug_query_string = portal->sourceText; + pgstat_report_activity(portal->sourceText); + } + else + { + debug_query_string = "execute message"; + pgstat_report_activity("<EXECUTE>"); + } + + set_ps_display(portal->commandTag); + + BeginCommand(portal->commandTag, dest); + + /* Check for transaction-control commands */ + if (length(portal->parseTrees) == 1) + { + Query *query = (Query *) lfirst(portal->parseTrees); + + if (query->commandType == CMD_UTILITY && + query->utilityStmt != NULL && + IsA(query->utilityStmt, TransactionStmt)) + { + TransactionStmt *stmt = (TransactionStmt *) query->utilityStmt; + + is_trans_stmt = true; + if (stmt->kind == TRANS_STMT_COMMIT || + stmt->kind == TRANS_STMT_ROLLBACK) + is_trans_exit = true; + } + } + + /* + * Ensure we are in a transaction command (this should normally be + * the case already due to prior BIND). + */ + start_xact_command(); + + /* + * If we are in aborted transaction state, the only portals we can + * actually run are those containing COMMIT or ROLLBACK commands. + */ + if (IsAbortedTransactionBlockState()) + { + if (!is_trans_exit) + elog(ERROR, "current transaction is aborted, " + "queries ignored until end of transaction block"); + } + + /* Check for cancel signal before we start execution */ + CHECK_FOR_INTERRUPTS(); + + /* + * Okay to run the portal. + */ + if (max_rows <= 0) + max_rows = FETCH_ALL; + + completed = PortalRun(portal, + max_rows, + dest, + dest, + completionTag); + + if (completed) + { + if (is_trans_stmt) + { + /* + * If this was a transaction control statement, commit it. We will + * start a new xact command for the next command (if any). + */ + finish_xact_command(true); + } + else + { + /* + * We need a CommandCounterIncrement after every query, + * except those that start or end a transaction block. + */ + CommandCounterIncrement(); + } + + /* Send appropriate CommandComplete to client */ + EndCommand(completionTag, dest); + } + else + { + /* Portal run not complete, so send PortalSuspended */ + if (whereToSendOutput == Remote) + pq_putemptymessage('s'); + } + + debug_query_string = NULL; +} + +/* + * exec_describe_statement_message + * + * Process a "Describe" message for a prepared statement + */ +static void +exec_describe_statement_message(const char *stmt_name) +{ + PreparedStatement *pstmt; + List *l; + StringInfoData buf; + + /* Find prepared statement */ + if (stmt_name[0] != '\0') + pstmt = FetchPreparedStatement(stmt_name, true); + else + { + /* special-case the unnamed statement */ + pstmt = unnamed_stmt_pstmt; + if (!pstmt) + elog(ERROR, "Unnamed prepared statement does not exist"); + } + + if (whereToSendOutput != Remote) + return; /* can't actually do anything... */ + + pq_beginmessage(&buf, 't'); /* parameter description message type */ + pq_sendint(&buf, length(pstmt->argtype_list), 4); + + foreach(l, pstmt->argtype_list) + { + Oid ptype = lfirsto(l); + + pq_sendint(&buf, (int) ptype, 4); + } + pq_endmessage(&buf); +} + +/* + * exec_describe_portal_message + * + * Process a "Describe" message for a portal + */ +static void +exec_describe_portal_message(const char *portal_name) +{ + Portal portal; + + portal = GetPortalByName(portal_name); + if (!PortalIsValid(portal)) + elog(ERROR, "Portal \"%s\" not found", portal_name); + + if (whereToSendOutput != Remote) + return; /* can't actually do anything... */ + + if (portal->tupDesc) + SendRowDescriptionMessage(portal->tupDesc); + else + pq_putemptymessage('n'); /* NoData */ +} + + +/* * Convenience routines for starting/committing a single command. */ static void start_xact_command(void) { - elog(DEBUG1, "StartTransactionCommand"); - StartTransactionCommand(false); + if (!xact_started) + { + elog(DEBUG2, "StartTransactionCommand"); + StartTransactionCommand(false); + + /* Set statement timeout running, if any */ + if (StatementTimeout > 0) + enable_sig_alarm(StatementTimeout, true); - /* Set statement timeout running, if any */ - if (StatementTimeout > 0) - enable_sig_alarm(StatementTimeout, true); + xact_started = true; + } } static void finish_xact_command(bool forceCommit) { - /* Invoke IMMEDIATE constraint triggers */ - DeferredTriggerEndQuery(); + if (xact_started) + { + /* Invoke IMMEDIATE constraint triggers */ + DeferredTriggerEndQuery(); - /* Cancel any active statement timeout before committing */ - disable_sig_alarm(true); + /* Cancel any active statement timeout before committing */ + disable_sig_alarm(true); - /* Now commit the command */ - elog(DEBUG1, "CommitTransactionCommand"); + /* Now commit the command */ + elog(DEBUG2, "CommitTransactionCommand"); - CommitTransactionCommand(forceCommit); + CommitTransactionCommand(forceCommit); #ifdef SHOW_MEMORY_STATS - /* Print mem stats at each commit for leak tracking */ - if (ShowStats) - MemoryContextStats(TopMemoryContext); + /* Print mem stats at each commit for leak tracking */ + if (ShowStats) + MemoryContextStats(TopMemoryContext); #endif + + xact_started = false; + } } @@ -1679,7 +2335,7 @@ PostgresMain(int argc, char *argv[], const char *username) if (!IsUnderPostmaster) { puts("\nPOSTGRES backend interactive interface "); - puts("$Revision: 1.331 $ $Date: 2003/05/03 05:13:20 $\n"); + puts("$Revision: 1.332 $ $Date: 2003/05/05 00:44:56 $\n"); } /* @@ -1756,6 +2412,14 @@ PostgresMain(int argc, char *argv[], const char *username) * successfully. (Flag was set in elog.c before longjmp().) */ InError = false; + xact_started = false; + + /* + * If we were handling an extended-query-protocol message, + * initiate skip till next Sync. + */ + if (doing_extended_query_message) + ignore_till_sync = true; /* * Exit interrupt holdoff section we implicitly established above. @@ -1776,6 +2440,12 @@ PostgresMain(int argc, char *argv[], const char *username) for (;;) { /* + * At top of loop, reset extended-query-message flag, so that + * any errors encountered in "idle" state don't provoke skip. + */ + doing_extended_query_message = false; + + /* * Release storage left over from prior query cycle, and create a * new query input buffer in the cleared MessageContext. */ @@ -1853,20 +2523,74 @@ PostgresMain(int argc, char *argv[], const char *username) } /* - * (6) process the command. + * (6) process the command. But ignore it if we're skipping till Sync. */ + if (ignore_till_sync) + continue; + switch (firstchar) { case 'Q': /* simple query */ { - const char *query_string = pq_getmsgstring(input_message); + const char *query_string; + + query_string = pq_getmsgstring(input_message); + pq_getmsgend(input_message); - exec_simple_query(query_string, whereToSendOutput); + exec_simple_query(query_string); send_rfq = true; } break; + case 'P': /* parse */ + { + const char *stmt_name; + const char *query_string; + int numParams; + Oid *paramTypes = NULL; + + stmt_name = pq_getmsgstring(input_message); + query_string = pq_getmsgstring(input_message); + numParams = pq_getmsgint(input_message, 4); + if (numParams > 0) + { + int i; + + paramTypes = (Oid *) palloc(numParams * sizeof(Oid)); + for (i = 0; i < numParams; i++) + paramTypes[i] = pq_getmsgint(input_message, 4); + } + pq_getmsgend(input_message); + + exec_parse_message(query_string, stmt_name, + paramTypes, numParams); + } + break; + + case 'B': /* bind */ + /* + * this message is complex enough that it seems best to put + * the field extraction out-of-line + */ + exec_bind_message(input_message); + break; + + case 'E': /* execute */ + { + const char *portal_name; + int is_binary; + int max_rows; + + portal_name = pq_getmsgstring(input_message); + is_binary = pq_getmsgbyte(input_message); + max_rows = pq_getmsgint(input_message, 4); + pq_getmsgend(input_message); + + exec_execute_message(portal_name, is_binary, max_rows); + } + break; + case 'F': /* fastpath function call */ /* Tell the collector what we're doing */ pgstat_report_activity("<FASTPATH> function call"); @@ -1894,6 +2618,89 @@ PostgresMain(int argc, char *argv[], const char *username) send_rfq = true; break; + case 'C': /* close */ + { + int close_type; + const char *close_target; + + close_type = pq_getmsgbyte(input_message); + close_target = pq_getmsgstring(input_message); + pq_getmsgend(input_message); + + switch (close_type) + { + case 'S': + if (close_target[0] != '\0') + DropPreparedStatement(close_target, false); + else + { + /* special-case the unnamed statement */ + unnamed_stmt_pstmt = NULL; + if (unnamed_stmt_context) + { + DropDependentPortals(unnamed_stmt_context); + MemoryContextDelete(unnamed_stmt_context); + } + unnamed_stmt_context = NULL; + } + break; + case 'P': + { + Portal portal; + + portal = GetPortalByName(close_target); + if (PortalIsValid(portal)) + PortalDrop(portal, false); + } + break; + default: + elog(ERROR, "Invalid Close message subtype %d", + close_type); + break; + } + + if (whereToSendOutput == Remote) + pq_putemptymessage('3'); /* CloseComplete */ + } + break; + + case 'D': /* describe */ + { + int describe_type; + const char *describe_target; + + describe_type = pq_getmsgbyte(input_message); + describe_target = pq_getmsgstring(input_message); + pq_getmsgend(input_message); + + switch (describe_type) + { + case 'S': + exec_describe_statement_message(describe_target); + break; + case 'P': + exec_describe_portal_message(describe_target); + break; + default: + elog(ERROR, "Invalid Describe message subtype %d", + describe_type); + break; + } + } + break; + + case 'H': /* flush */ + pq_getmsgend(input_message); + if (whereToSendOutput == Remote) + pq_flush(); + break; + + case 'S': /* sync */ + pq_getmsgend(input_message); + finish_xact_command(false); + send_rfq = true; + break; + /* * 'X' means that the frontend is closing down the socket. * EOF means unexpected loss of frontend connection. |