aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/proto.c26
-rw-r--r--src/backend/replication/logical/worker.c87
-rw-r--r--src/include/replication/logicalproto.h27
3 files changed, 83 insertions, 57 deletions
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index eb19142b486..fdb31182d77 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -44,7 +44,7 @@ static const char *logicalrep_read_namespace(StringInfo in);
void
logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
{
- pq_sendbyte(out, 'B'); /* BEGIN */
+ pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
/* fixed fields */
pq_sendint64(out, txn->final_lsn);
@@ -76,7 +76,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
{
uint8 flags = 0;
- pq_sendbyte(out, 'C'); /* sending COMMIT */
+ pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
/* send the flags field (unused for now) */
pq_sendbyte(out, flags);
@@ -112,7 +112,7 @@ void
logicalrep_write_origin(StringInfo out, const char *origin,
XLogRecPtr origin_lsn)
{
- pq_sendbyte(out, 'O'); /* ORIGIN */
+ pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
/* fixed fields */
pq_sendint64(out, origin_lsn);
@@ -141,7 +141,7 @@ void
logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
HeapTuple newtuple, bool binary)
{
- pq_sendbyte(out, 'I'); /* action INSERT */
+ pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
@@ -185,7 +185,7 @@ void
logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
HeapTuple oldtuple, HeapTuple newtuple, bool binary)
{
- pq_sendbyte(out, 'U'); /* action UPDATE */
+ pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@@ -263,7 +263,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
- pq_sendbyte(out, 'D'); /* action DELETE */
+ pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
@@ -317,7 +317,7 @@ logicalrep_write_truncate(StringInfo out,
int i;
uint8 flags = 0;
- pq_sendbyte(out, 'T'); /* action TRUNCATE */
+ pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
@@ -369,7 +369,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
{
char *relname;
- pq_sendbyte(out, 'R'); /* sending RELATION */
+ pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
@@ -425,7 +425,7 @@ logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
HeapTuple tup;
Form_pg_type typtup;
- pq_sendbyte(out, 'Y'); /* sending TYPE */
+ pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
@@ -755,7 +755,7 @@ void
logicalrep_write_stream_start(StringInfo out,
TransactionId xid, bool first_segment)
{
- pq_sendbyte(out, 'S'); /* action STREAM START */
+ pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
Assert(TransactionIdIsValid(xid));
@@ -788,7 +788,7 @@ logicalrep_read_stream_start(StringInfo in, bool *first_segment)
void
logicalrep_write_stream_stop(StringInfo out)
{
- pq_sendbyte(out, 'E'); /* action STREAM END */
+ pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_END);
}
/*
@@ -800,7 +800,7 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
{
uint8 flags = 0;
- pq_sendbyte(out, 'c'); /* action STREAM COMMIT */
+ pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
Assert(TransactionIdIsValid(txn->xid));
@@ -849,7 +849,7 @@ void
logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
TransactionId subxid)
{
- pq_sendbyte(out, 'A'); /* action STREAM ABORT */
+ pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b0f27e0af85..04684912dea 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1896,67 +1896,66 @@ apply_handle_truncate(StringInfo s)
static void
apply_dispatch(StringInfo s)
{
- char action = pq_getmsgbyte(s);
+ LogicalRepMsgType action = pq_getmsgbyte(s);
switch (action)
{
- /* BEGIN */
- case 'B':
+ case LOGICAL_REP_MSG_BEGIN:
apply_handle_begin(s);
- break;
- /* COMMIT */
- case 'C':
+ return;
+
+ case LOGICAL_REP_MSG_COMMIT:
apply_handle_commit(s);
- break;
- /* INSERT */
- case 'I':
+ return;
+
+ case LOGICAL_REP_MSG_INSERT:
apply_handle_insert(s);
- break;
- /* UPDATE */
- case 'U':
+ return;
+
+ case LOGICAL_REP_MSG_UPDATE:
apply_handle_update(s);
- break;
- /* DELETE */
- case 'D':
+ return;
+
+ case LOGICAL_REP_MSG_DELETE:
apply_handle_delete(s);
- break;
- /* TRUNCATE */
- case 'T':
+ return;
+
+ case LOGICAL_REP_MSG_TRUNCATE:
apply_handle_truncate(s);
- break;
- /* RELATION */
- case 'R':
+ return;
+
+ case LOGICAL_REP_MSG_RELATION:
apply_handle_relation(s);
- break;
- /* TYPE */
- case 'Y':
+ return;
+
+ case LOGICAL_REP_MSG_TYPE:
apply_handle_type(s);
- break;
- /* ORIGIN */
- case 'O':
+ return;
+
+ case LOGICAL_REP_MSG_ORIGIN:
apply_handle_origin(s);
- break;
- /* STREAM START */
- case 'S':
+ return;
+
+ case LOGICAL_REP_MSG_STREAM_START:
apply_handle_stream_start(s);
- break;
- /* STREAM END */
- case 'E':
+ return;
+
+ case LOGICAL_REP_MSG_STREAM_END:
apply_handle_stream_stop(s);
- break;
- /* STREAM ABORT */
- case 'A':
+ return;
+
+ case LOGICAL_REP_MSG_STREAM_ABORT:
apply_handle_stream_abort(s);
- break;
- /* STREAM COMMIT */
- case 'c':
+ return;
+
+ case LOGICAL_REP_MSG_STREAM_COMMIT:
apply_handle_stream_commit(s);
- break;
- default:
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("invalid logical replication message type \"%c\"", action)));
+ return;
}
+
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid logical replication message type \"%c\"", action)));
}
/*
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 0c2cda264e1..cca13dae964 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -34,6 +34,33 @@
#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM
/*
+ * Logical message types
+ *
+ * Used by logical replication wire protocol.
+ *
+ * Note: though this is an enum, the values are used to identify message types
+ * in logical replication protocol, which uses a single byte to identify a
+ * message type. Hence the values should be single byte wide and preferrably
+ * human readable characters.
+ */
+typedef enum LogicalRepMsgType
+{
+ LOGICAL_REP_MSG_BEGIN = 'B',
+ LOGICAL_REP_MSG_COMMIT = 'C',
+ LOGICAL_REP_MSG_ORIGIN = 'O',
+ LOGICAL_REP_MSG_INSERT = 'I',
+ LOGICAL_REP_MSG_UPDATE = 'U',
+ LOGICAL_REP_MSG_DELETE = 'D',
+ LOGICAL_REP_MSG_TRUNCATE = 'T',
+ LOGICAL_REP_MSG_RELATION = 'R',
+ LOGICAL_REP_MSG_TYPE = 'Y',
+ LOGICAL_REP_MSG_STREAM_START = 'S',
+ LOGICAL_REP_MSG_STREAM_END = 'E',
+ LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
+ LOGICAL_REP_MSG_STREAM_ABORT = 'A'
+} LogicalRepMsgType;
+
+/*
* This struct stores a tuple received via logical replication.
* Keep in mind that the columns correspond to the *remote* table.
*/