aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/src/sgml/libpq.sgml29
-rw-r--r--src/interfaces/libpq/fe-connect.c242
-rw-r--r--src/interfaces/libpq/libpq-fe.h4
-rw-r--r--src/interfaces/libpq/libpq-int.h3
4 files changed, 227 insertions, 51 deletions
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 0f375bf5f25..2620eec033d 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -811,7 +811,7 @@ postgresql://localhost/mydb
postgresql://user@localhost
postgresql://user:secret@localhost
postgresql://other@localhost/otherdb?connect_timeout=10&application_name=myapp
-postgresql://host1:123,host2:456/somedb
+postgresql://host1:123,host2:456/somedb?target_session_attrs=any&application_name=myapp
</programlisting>
Components of the hierarchical part of the <acronym>URI</acronym> can also
be given as parameters. For example:
@@ -1386,6 +1386,23 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname
</para>
</listitem>
</varlistentry>
+
+ <varlistentry id="libpq-connect-target-session-attrs" xreflabel="target_session_attrs">
+ <term><literal>target_session_attrs</literal></term>
+ <listitem>
+ <para>
+ If this parameter is set to <literal>read-write</literal>, only a
+ connection in which read-write transactions are accepted by default
+ is considered acceptable. The query
+ <literal>show transaction_read_only</literal> will be sent upon any
+ successful connection; if it returns <literal>on</>, the connection
+ will be closed. If multiple hosts were specified in the connection
+ string, any remaining servers will be tried just as if the connection
+ attempt had failed. The default value of this parameter,
+ <literal>any</>, regards all connections as acceptable.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</para>
</sect2>
@@ -7069,6 +7086,16 @@ myEventProc(PGEventId evtId, void *evtInfo, void *passThrough)
linkend="libpq-connect-client-encoding"> connection parameter.
</para>
</listitem>
+
+ <listitem>
+ <para>
+ <indexterm>
+ <primary><envar>PGTARGETSESSIONATTRS</envar></primary>
+ </indexterm>
+ <envar>PGTARGETSESSIONATTRS</envar> behaves the same as the <xref
+ linkend="libpq-connect-target-session-attrs"> connection parameter.
+ </para>
+ </listitem>
</itemizedlist>
</para>
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 3e9c45bc406..cd96ddb2f07 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -108,6 +108,7 @@ static int ldapServiceLookup(const char *purl, PQconninfoOption *options,
#define DefaultOption ""
#define DefaultAuthtype ""
#define DefaultPassword ""
+#define DefaultTargetSessionAttrs "any"
#ifdef USE_SSL
#define DefaultSSLMode "prefer"
#else
@@ -300,6 +301,11 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
"Replication", "D", 5,
offsetof(struct pg_conn, replication)},
+ {"target_session_attrs", "PGTARGETSESSIONATTRS",
+ DefaultTargetSessionAttrs, NULL,
+ "Target-Session-Attrs", "", 11, /* sizeof("read-write") = 11 */
+ offsetof(struct pg_conn, target_session_attrs)},
+
/* Terminating entry --- MUST BE LAST */
{NULL, NULL, NULL, NULL,
NULL, NULL, 0}
@@ -336,6 +342,8 @@ static PGconn *makeEmptyPGconn(void);
static bool fillPGconn(PGconn *conn, PQconninfoOption *connOptions);
static void freePGconn(PGconn *conn);
static void closePGconn(PGconn *conn);
+static void release_all_addrinfo(PGconn *conn);
+static void sendTerminateConn(PGconn *conn);
static PQconninfoOption *conninfo_init(PQExpBuffer errorMessage);
static PQconninfoOption *parse_connection_string(const char *conninfo,
PQExpBuffer errorMessage, bool use_defaults);
@@ -1026,6 +1034,22 @@ connectOptions2(PGconn *conn)
}
/*
+ * Validate target_session_attrs option.
+ */
+ if (conn->target_session_attrs)
+ {
+ if (strcmp(conn->target_session_attrs, "any") != 0
+ && strcmp(conn->target_session_attrs, "read-write") != 0)
+ {
+ conn->status = CONNECTION_BAD;
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("invalid target_session_attrs value: \"%s\"\n"),
+ conn->target_session_attrs);
+ return false;
+ }
+ }
+
+ /*
* Only if we get this far is it appropriate to try to connect. (We need a
* state flag, rather than just the boolean result of this function, in
* case someone tries to PQreset() the PGconn.)
@@ -1814,6 +1838,7 @@ PQconnectPoll(PGconn *conn)
/* Special cases: proceed without waiting. */
case CONNECTION_SSL_STARTUP:
case CONNECTION_NEEDED:
+ case CONNECTION_CHECK_WRITABLE:
break;
default:
@@ -2752,27 +2777,6 @@ keep_going: /* We will come back to here until there is
goto error_return;
}
- /* We can release the address lists now. */
- if (conn->connhost != NULL)
- {
- int i;
-
- for (i = 0; i < conn->nconnhost; ++i)
- {
- int family = AF_UNSPEC;
-
-#ifdef HAVE_UNIX_SOCKETS
- if (conn->connhost[i].type == CHT_UNIX_SOCKET)
- family = AF_UNIX;
-#endif
-
- pg_freeaddrinfo_all(family,
- conn->connhost[i].addrlist);
- conn->connhost[i].addrlist = NULL;
- }
- }
- conn->addr_cur = NULL;
-
/* Fire up post-connection housekeeping if needed */
if (PG_PROTOCOL_MAJOR(conn->pversion) < 3)
{
@@ -2782,7 +2786,24 @@ keep_going: /* We will come back to here until there is
return PGRES_POLLING_WRITING;
}
- /* Otherwise, we are open for business! */
+ /*
+ * If a read-write connection is required, see if we have one.
+ */
+ if (conn->target_session_attrs != NULL &&
+ strcmp(conn->target_session_attrs, "read-write") == 0)
+ {
+ conn->status = CONNECTION_OK;
+ if (!PQsendQuery(conn,
+ "show transaction_read_only"))
+ goto error_return;
+ conn->status = CONNECTION_CHECK_WRITABLE;
+ return PGRES_POLLING_READING;
+ }
+
+ /* We can release the address lists now. */
+ release_all_addrinfo(conn);
+
+ /* We are open for business! */
conn->status = CONNECTION_OK;
return PGRES_POLLING_OK;
}
@@ -2814,10 +2835,109 @@ keep_going: /* We will come back to here until there is
goto error_return;
}
+ /*
+ * If a read-write connection is requisted check for same.
+ */
+ if (conn->target_session_attrs != NULL &&
+ strcmp(conn->target_session_attrs, "read-write") == 0)
+ {
+ conn->status = CONNECTION_OK;
+ if (!PQsendQuery(conn,
+ "show transaction_read_only"))
+ goto error_return;
+ conn->status = CONNECTION_CHECK_WRITABLE;
+ return PGRES_POLLING_READING;
+ }
+
+ /* We can release the address lists now. */
+ release_all_addrinfo(conn);
+
/* We are open for business! */
conn->status = CONNECTION_OK;
return PGRES_POLLING_OK;
+ case CONNECTION_CHECK_WRITABLE:
+ {
+ conn->status = CONNECTION_OK;
+ if (!PQconsumeInput(conn))
+ goto error_return;
+
+ if (PQisBusy(conn))
+ {
+ conn->status = CONNECTION_CHECK_WRITABLE;
+ return PGRES_POLLING_READING;
+ }
+
+ res = PQgetResult(conn);
+ if (res && (PQresultStatus(res) == PGRES_TUPLES_OK) &&
+ PQntuples(res) == 1)
+ {
+ char *val;
+
+ val = PQgetvalue(res, 0, 0);
+ if (strncmp(val, "on", 2) == 0)
+ {
+ PQclear(res);
+
+ /* Not writable; close connection. */
+ appendPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("could not make a writable "
+ "connection to server "
+ "\"%s:%s\"\n"),
+ conn->connhost[conn->whichhost].host,
+ conn->connhost[conn->whichhost].port);
+ conn->status = CONNECTION_OK;
+ sendTerminateConn(conn);
+ pqDropConnection(conn, true);
+
+ /* Skip any remaining addresses for this host. */
+ conn->addr_cur = NULL;
+ if (conn->whichhost + 1 < conn->nconnhost)
+ {
+ conn->status = CONNECTION_NEEDED;
+ goto keep_going;
+ }
+
+ /* No more addresses to try. So we fail. */
+ goto error_return;
+ }
+ PQclear(res);
+
+ /* We can release the address lists now. */
+ release_all_addrinfo(conn);
+
+ /* We are open for business! */
+ conn->status = CONNECTION_OK;
+ return PGRES_POLLING_OK;
+ }
+
+ /*
+ * Something went wrong with "show transaction_read_only". We
+ * should try next addresses.
+ */
+ if (res)
+ PQclear(res);
+ appendPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("test \"show transaction_read_only\" failed "
+ " on \"%s:%s\" \n"),
+ conn->connhost[conn->whichhost].host,
+ conn->connhost[conn->whichhost].port);
+ conn->status = CONNECTION_OK;
+ sendTerminateConn(conn);
+ pqDropConnection(conn, true);
+
+ if (conn->addr_cur->ai_next != NULL ||
+ conn->whichhost + 1 < conn->nconnhost)
+ {
+ conn->addr_cur = conn->addr_cur->ai_next;
+ conn->status = CONNECTION_NEEDED;
+ goto keep_going;
+ }
+
+ /* No more addresses to try. So we fail. */
+ goto error_return;
+ }
+
default:
appendPQExpBuffer(&conn->errorMessage,
libpq_gettext("invalid connection state %d, "
@@ -3109,6 +3229,8 @@ freePGconn(PGconn *conn)
free(conn->outBuffer);
if (conn->rowBuf)
free(conn->rowBuf);
+ if (conn->target_session_attrs)
+ free(conn->target_session_attrs);
termPQExpBuffer(&conn->errorMessage);
termPQExpBuffer(&conn->workBuffer);
@@ -3120,19 +3242,41 @@ freePGconn(PGconn *conn)
}
/*
- * closePGconn
- * - properly close a connection to the backend
- *
- * This should reset or release all transient state, but NOT the connection
- * parameters. On exit, the PGconn should be in condition to start a fresh
- * connection with the same parameters (see PQreset()).
+ * release_all_addrinfo
+ * - free addrinfo of all hostconn elements.
*/
+
static void
-closePGconn(PGconn *conn)
+release_all_addrinfo(PGconn *conn)
{
- PGnotify *notify;
- pgParameterStatus *pstatus;
+ if (conn->connhost != NULL)
+ {
+ int i;
+
+ for (i = 0; i < conn->nconnhost; ++i)
+ {
+ int family = AF_UNSPEC;
+
+#ifdef HAVE_UNIX_SOCKETS
+ if (conn->connhost[i].type == CHT_UNIX_SOCKET)
+ family = AF_UNIX;
+#endif
+ pg_freeaddrinfo_all(family,
+ conn->connhost[i].addrlist);
+ conn->connhost[i].addrlist = NULL;
+ }
+ }
+ conn->addr_cur = NULL;
+}
+
+/*
+ * sendTerminateConn
+ * - Send a terminate message to backend.
+ */
+static void
+sendTerminateConn(PGconn *conn)
+{
/*
* Note that the protocol doesn't allow us to send Terminate messages
* during the startup phase.
@@ -3147,6 +3291,23 @@ closePGconn(PGconn *conn)
pqPutMsgEnd(conn);
(void) pqFlush(conn);
}
+}
+
+/*
+ * closePGconn
+ * - properly close a connection to the backend
+ *
+ * This should reset or release all transient state, but NOT the connection
+ * parameters. On exit, the PGconn should be in condition to start a fresh
+ * connection with the same parameters (see PQreset()).
+ */
+static void
+closePGconn(PGconn *conn)
+{
+ PGnotify *notify;
+ pgParameterStatus *pstatus;
+
+ sendTerminateConn(conn);
/*
* Must reset the blocking status so a possible reconnect will work.
@@ -3165,25 +3326,8 @@ closePGconn(PGconn *conn)
conn->asyncStatus = PGASYNC_IDLE;
pqClearAsyncResult(conn); /* deallocate result */
resetPQExpBuffer(&conn->errorMessage);
- if (conn->connhost != NULL)
- {
- int i;
-
- for (i = 0; i < conn->nconnhost; ++i)
- {
- int family = AF_UNSPEC;
-
-#ifdef HAVE_UNIX_SOCKETS
- if (conn->connhost[i].type == CHT_UNIX_SOCKET)
- family = AF_UNIX;
-#endif
+ release_all_addrinfo(conn);
- pg_freeaddrinfo_all(family,
- conn->connhost[i].addrlist);
- conn->connhost[i].addrlist = NULL;
- }
- }
- conn->addr_cur = NULL;
notify = conn->notifyHead;
while (notify != NULL)
{
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 9ca0756c4bf..20b7e57de76 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -62,7 +62,9 @@ typedef enum
* backend startup. */
CONNECTION_SETENV, /* Negotiating environment. */
CONNECTION_SSL_STARTUP, /* Negotiating SSL. */
- CONNECTION_NEEDED /* Internal state: connect() needed */
+ CONNECTION_NEEDED, /* Internal state: connect() needed */
+ CONNECTION_CHECK_WRITABLE /* Check if we could make a writable
+ * connection. */
} ConnStatusType;
typedef enum
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 854ec89924b..a2f85895a18 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -361,6 +361,9 @@ struct pg_conn
char *krbsrvname; /* Kerberos service name */
#endif
+ char *target_session_attrs; /* Type of connection to make
+ * Possible values any, read-write. */
+
/* Optional file to write trace info to */
FILE *Pfdebug;