aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/common/printsimple.c21
-rw-r--r--src/backend/access/common/tupdesc.c8
-rw-r--r--src/backend/replication/walsender.c247
3 files changed, 114 insertions, 162 deletions
diff --git a/src/backend/access/common/printsimple.c b/src/backend/access/common/printsimple.c
index 420de65e207..5fe1c72da83 100644
--- a/src/backend/access/common/printsimple.c
+++ b/src/backend/access/common/printsimple.c
@@ -22,6 +22,7 @@
#include "catalog/pg_type.h"
#include "fmgr.h"
#include "libpq/pqformat.h"
+#include "utils/builtins.h"
/*
* At startup time, send a RowDescription message.
@@ -99,6 +100,26 @@ printsimple(TupleTableSlot *slot, DestReceiver *self)
}
break;
+ case INT4OID:
+ {
+ int32 num = DatumGetInt32(value);
+ char str[12]; /* sign, 10 digits and '\0' */
+
+ pg_ltoa(num, str);
+ pq_sendcountedtext(&buf, str, strlen(str), false);
+ }
+ break;
+
+ case INT8OID:
+ {
+ int64 num = DatumGetInt64(value);
+ char str[23]; /* sign, 21 digits and '\0' */
+
+ pg_lltoa(num, str);
+ pq_sendcountedtext(&buf, str, strlen(str), false);
+ }
+ break;
+
default:
elog(ERROR, "unsupported type OID: %u", attr->atttypid);
}
diff --git a/src/backend/access/common/tupdesc.c b/src/backend/access/common/tupdesc.c
index 083c0303dcf..4e2ebe1ae7e 100644
--- a/src/backend/access/common/tupdesc.c
+++ b/src/backend/access/common/tupdesc.c
@@ -629,6 +629,14 @@ TupleDescInitBuiltinEntry(TupleDesc desc,
att->attstorage = 'p';
att->attcollation = InvalidOid;
break;
+
+ case INT8OID:
+ att->attlen = 8;
+ att->attbyval = FLOAT8PASSBYVAL;
+ att->attalign = 'd';
+ att->attstorage = 'p';
+ att->attcollation = InvalidOid;
+ break;
}
}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 5909b7dd8c5..76f09fbdbf2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -302,13 +302,15 @@ WalSndShutdown(void)
static void
IdentifySystem(void)
{
- StringInfoData buf;
char sysid[32];
- char tli[11];
char xpos[MAXFNAMELEN];
XLogRecPtr logptr;
char *dbname = NULL;
- Size len;
+ DestReceiver *dest;
+ TupOutputState *tstate;
+ TupleDesc tupdesc;
+ Datum values[4];
+ bool nulls[4];
/*
* Reply with a result set with one row, four columns. First col is system
@@ -328,8 +330,6 @@ IdentifySystem(void)
else
logptr = GetFlushRecPtr();
- snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
-
snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
if (MyDatabaseId != InvalidOid)
@@ -346,79 +346,42 @@ IdentifySystem(void)
MemoryContextSwitchTo(cur);
}
- /* Send a RowDescription message */
- pq_beginmessage(&buf, 'T');
- pq_sendint(&buf, 4, 2); /* 4 fields */
-
- /* first field */
- pq_sendstring(&buf, "systemid"); /* col name */
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
- pq_sendint(&buf, TEXTOID, 4); /* type oid */
- pq_sendint(&buf, -1, 2); /* typlen */
- pq_sendint(&buf, 0, 4); /* typmod */
- pq_sendint(&buf, 0, 2); /* format code */
-
- /* second field */
- pq_sendstring(&buf, "timeline"); /* col name */
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
- pq_sendint(&buf, INT4OID, 4); /* type oid */
- pq_sendint(&buf, 4, 2); /* typlen */
- pq_sendint(&buf, 0, 4); /* typmod */
- pq_sendint(&buf, 0, 2); /* format code */
-
- /* third field */
- pq_sendstring(&buf, "xlogpos"); /* col name */
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
- pq_sendint(&buf, TEXTOID, 4); /* type oid */
- pq_sendint(&buf, -1, 2); /* typlen */
- pq_sendint(&buf, 0, 4); /* typmod */
- pq_sendint(&buf, 0, 2); /* format code */
+ dest = CreateDestReceiver(DestRemoteSimple);
+ MemSet(nulls, false, sizeof(nulls));
- /* fourth field */
- pq_sendstring(&buf, "dbname"); /* col name */
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
- pq_sendint(&buf, TEXTOID, 4); /* type oid */
- pq_sendint(&buf, -1, 2); /* typlen */
- pq_sendint(&buf, 0, 4); /* typmod */
- pq_sendint(&buf, 0, 2); /* format code */
- pq_endmessage(&buf);
+ /* need a tuple descriptor representing four columns */
+ tupdesc = CreateTemplateTupleDesc(4, false);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
+ TEXTOID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
+ INT4OID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
+ TEXTOID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
+ TEXTOID, -1, 0);
- /* Send a DataRow message */
- pq_beginmessage(&buf, 'D');
- pq_sendint(&buf, 4, 2); /* # of columns */
+ /* prepare for projection of tuples */
+ tstate = begin_tup_output_tupdesc(dest, tupdesc);
/* column 1: system identifier */
- len = strlen(sysid);
- pq_sendint(&buf, len, 4);
- pq_sendbytes(&buf, (char *) &sysid, len);
+ values[0] = CStringGetTextDatum(sysid);
/* column 2: timeline */
- len = strlen(tli);
- pq_sendint(&buf, len, 4);
- pq_sendbytes(&buf, (char *) tli, len);
+ values[1] = Int32GetDatum(ThisTimeLineID);
/* column 3: xlog position */
- len = strlen(xpos);
- pq_sendint(&buf, len, 4);
- pq_sendbytes(&buf, (char *) xpos, len);
+ values[2] = CStringGetTextDatum(xpos);
/* column 4: database name, or NULL if none */
if (dbname)
- {
- len = strlen(dbname);
- pq_sendint(&buf, len, 4);
- pq_sendbytes(&buf, (char *) dbname, len);
- }
+ values[3] = CStringGetTextDatum(dbname);
else
- {
- pq_sendint(&buf, -1, 4);
- }
+ nulls[3] = true;
- pq_endmessage(&buf);
+ /* send it to dest */
+ do_tup_output(tstate, values, nulls);
+
+ end_tup_output(tstate);
}
@@ -695,54 +658,41 @@ StartReplication(StartReplicationCmd *cmd)
*/
if (sendTimeLineIsHistoric)
{
- char tli_str[11];
char startpos_str[8 + 1 + 8 + 1];
- Size len;
+ DestReceiver *dest;
+ TupOutputState *tstate;
+ TupleDesc tupdesc;
+ Datum values[2];
+ bool nulls[2];
- snprintf(tli_str, sizeof(tli_str), "%u", sendTimeLineNextTLI);
snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
(uint32) (sendTimeLineValidUpto >> 32),
(uint32) sendTimeLineValidUpto);
- pq_beginmessage(&buf, 'T'); /* RowDescription */
- pq_sendint(&buf, 2, 2); /* 2 fields */
-
- /* Field header */
- pq_sendstring(&buf, "next_tli");
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
+ dest = CreateDestReceiver(DestRemoteSimple);
+ MemSet(nulls, false, sizeof(nulls));
/*
+ * Need a tuple descriptor representing two columns.
* int8 may seem like a surprising data type for this, but in theory
* int4 would not be wide enough for this, as TimeLineID is unsigned.
*/
- pq_sendint(&buf, INT8OID, 4); /* type oid */
- pq_sendint(&buf, -1, 2);
- pq_sendint(&buf, 0, 4);
- pq_sendint(&buf, 0, 2);
-
- pq_sendstring(&buf, "next_tli_startpos");
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
- pq_sendint(&buf, TEXTOID, 4); /* type oid */
- pq_sendint(&buf, -1, 2);
- pq_sendint(&buf, 0, 4);
- pq_sendint(&buf, 0, 2);
- pq_endmessage(&buf);
+ tupdesc = CreateTemplateTupleDesc(2, false);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
+ INT8OID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
+ TEXTOID, -1, 0);
- /* Data row */
- pq_beginmessage(&buf, 'D');
- pq_sendint(&buf, 2, 2); /* number of columns */
+ /* prepare for projection of tuple */
+ tstate = begin_tup_output_tupdesc(dest, tupdesc);
- len = strlen(tli_str);
- pq_sendint(&buf, len, 4); /* length */
- pq_sendbytes(&buf, tli_str, len);
+ values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
+ values[1] = CStringGetTextDatum(startpos_str);
- len = strlen(startpos_str);
- pq_sendint(&buf, len, 4); /* length */
- pq_sendbytes(&buf, startpos_str, len);
+ /* send it to dest */
+ do_tup_output(tstate, values, nulls);
- pq_endmessage(&buf);
+ end_tup_output(tstate);
}
/* Send CommandComplete message */
@@ -790,8 +740,12 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
{
const char *snapshot_name = NULL;
char xpos[MAXFNAMELEN];
- StringInfoData buf;
- Size len;
+ char *slot_name;
+ DestReceiver *dest;
+ TupOutputState *tstate;
+ TupleDesc tupdesc;
+ Datum values[4];
+ bool nulls[4];
Assert(!MyReplicationSlot);
@@ -868,82 +822,51 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
(uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
(uint32) MyReplicationSlot->data.confirmed_flush);
- pq_beginmessage(&buf, 'T');
- pq_sendint(&buf, 4, 2); /* 4 fields */
-
- /* first field: slot name */
- pq_sendstring(&buf, "slot_name"); /* col name */
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
- pq_sendint(&buf, TEXTOID, 4); /* type oid */
- pq_sendint(&buf, -1, 2); /* typlen */
- pq_sendint(&buf, 0, 4); /* typmod */
- pq_sendint(&buf, 0, 2); /* format code */
-
- /* second field: LSN at which we became consistent */
- pq_sendstring(&buf, "consistent_point"); /* col name */
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
- pq_sendint(&buf, TEXTOID, 4); /* type oid */
- pq_sendint(&buf, -1, 2); /* typlen */
- pq_sendint(&buf, 0, 4); /* typmod */
- pq_sendint(&buf, 0, 2); /* format code */
-
- /* third field: exported snapshot's name */
- pq_sendstring(&buf, "snapshot_name"); /* col name */
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
- pq_sendint(&buf, TEXTOID, 4); /* type oid */
- pq_sendint(&buf, -1, 2); /* typlen */
- pq_sendint(&buf, 0, 4); /* typmod */
- pq_sendint(&buf, 0, 2); /* format code */
-
- /* fourth field: output plugin */
- pq_sendstring(&buf, "output_plugin"); /* col name */
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
- pq_sendint(&buf, TEXTOID, 4); /* type oid */
- pq_sendint(&buf, -1, 2); /* typlen */
- pq_sendint(&buf, 0, 4); /* typmod */
- pq_sendint(&buf, 0, 2); /* format code */
+ dest = CreateDestReceiver(DestRemoteSimple);
+ MemSet(nulls, false, sizeof(nulls));
- pq_endmessage(&buf);
-
- /* Send a DataRow message */
- pq_beginmessage(&buf, 'D');
- pq_sendint(&buf, 4, 2); /* # of columns */
+ /*
+ * Need a tuple descriptor representing four columns:
+ * - first field: the slot name
+ * - second field: LSN at which we became consistent
+ * - third field: exported snapshot's name
+ * - fourth field: output plugin
+ */
+ tupdesc = CreateTemplateTupleDesc(4, false);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
+ TEXTOID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
+ TEXTOID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
+ TEXTOID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
+ TEXTOID, -1, 0);
+
+ /* prepare for projection of tuples */
+ tstate = begin_tup_output_tupdesc(dest, tupdesc);
/* slot_name */
- len = strlen(NameStr(MyReplicationSlot->data.name));
- pq_sendint(&buf, len, 4); /* col1 len */
- pq_sendbytes(&buf, NameStr(MyReplicationSlot->data.name), len);
+ slot_name = NameStr(MyReplicationSlot->data.name);
+ values[0] = CStringGetTextDatum(slot_name);
/* consistent wal location */
- len = strlen(xpos);
- pq_sendint(&buf, len, 4);
- pq_sendbytes(&buf, xpos, len);
+ values[1] = CStringGetTextDatum(xpos);
/* snapshot name, or NULL if none */
if (snapshot_name != NULL)
- {
- len = strlen(snapshot_name);
- pq_sendint(&buf, len, 4);
- pq_sendbytes(&buf, snapshot_name, len);
- }
+ values[2] = CStringGetTextDatum(snapshot_name);
else
- pq_sendint(&buf, -1, 4);
+ nulls[2] = true;
/* plugin, or NULL if none */
if (cmd->plugin != NULL)
- {
- len = strlen(cmd->plugin);
- pq_sendint(&buf, len, 4);
- pq_sendbytes(&buf, cmd->plugin, len);
- }
+ values[3] = CStringGetTextDatum(cmd->plugin);
else
- pq_sendint(&buf, -1, 4);
+ nulls[3] = true;
- pq_endmessage(&buf);
+ /* send it to dest */
+ do_tup_output(tstate, values, nulls);
+ end_tup_output(tstate);
ReplicationSlotRelease();
}