aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/storage/ipc/shm_mq.c126
-rw-r--r--src/include/storage/shm_mq.h14
2 files changed, 124 insertions, 16 deletions
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index d96627a774e..90df5930e1a 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -139,7 +139,7 @@ struct shm_mq_handle
};
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
- void *data, bool nowait, Size *bytes_written);
+ const void *data, bool nowait, Size *bytes_written);
static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
bool nowait, Size *nbytesp, void **datap);
static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile * ptr,
@@ -301,7 +301,33 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
}
/*
+ * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
+ * been passed to shm_mq_attach.
+ */
+void
+shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
+{
+ Assert(mqh->mqh_handle == NULL);
+ mqh->mqh_handle = handle;
+}
+
+/*
* Write a message into a shared message queue.
+ */
+shm_mq_result
+shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
+{
+ shm_mq_iovec iov;
+
+ iov.data = data;
+ iov.len = nbytes;
+
+ return shm_mq_sendv(mqh, &iov, 1, nowait);
+}
+
+/*
+ * Write a message into a shared message queue, gathered from multiple
+ * addresses.
*
* When nowait = false, we'll wait on our process latch when the ring buffer
* fills up, and then continue writing once the receiver has drained some data.
@@ -315,14 +341,22 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
* the length or payload will corrupt the queue.)
*/
shm_mq_result
-shm_mq_send(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait)
+shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
{
shm_mq_result res;
shm_mq *mq = mqh->mqh_queue;
+ Size nbytes = 0;
Size bytes_written;
+ int i;
+ int which_iov = 0;
+ Size offset;
Assert(mq->mq_sender == MyProc);
+ /* Compute total size of write. */
+ for (i = 0; i < iovcnt; ++i)
+ nbytes += iov[i].len;
+
/* Try to write, or finish writing, the length word into the buffer. */
while (!mqh->mqh_length_word_complete)
{
@@ -348,18 +382,80 @@ shm_mq_send(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait)
/* Write the actual data bytes into the buffer. */
Assert(mqh->mqh_partial_bytes <= nbytes);
- res = shm_mq_send_bytes(mqh, nbytes - mqh->mqh_partial_bytes,
- ((char *) data) + mqh->mqh_partial_bytes,
- nowait, &bytes_written);
- if (res == SHM_MQ_WOULD_BLOCK)
- mqh->mqh_partial_bytes += bytes_written;
- else
+ offset = mqh->mqh_partial_bytes;
+ do
{
- mqh->mqh_partial_bytes = 0;
- mqh->mqh_length_word_complete = false;
- }
- if (res != SHM_MQ_SUCCESS)
- return res;
+ Size chunksize;
+
+ /* Figure out which bytes need to be sent next. */
+ if (offset >= iov[which_iov].len)
+ {
+ offset -= iov[which_iov].len;
+ ++which_iov;
+ if (which_iov >= iovcnt)
+ break;
+ continue;
+ }
+
+ /*
+ * We want to avoid copying the data if at all possible, but every
+ * chunk of bytes we write into the queue has to be MAXALIGN'd,
+ * except the last. Thus, if a chunk other than the last one ends
+ * on a non-MAXALIGN'd boundary, we have to combine the tail end of
+ * its data with data from one or more following chunks until we
+ * either reach the last chunk or accumulate a number of bytes which
+ * is MAXALIGN'd.
+ */
+ if (which_iov + 1 < iovcnt &&
+ offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
+ {
+ char tmpbuf[MAXIMUM_ALIGNOF];
+ int j = 0;
+
+ for (;;)
+ {
+ if (offset < iov[which_iov].len)
+ {
+ tmpbuf[j] = iov[which_iov].data[offset];
+ j++;
+ offset++;
+ if (j == MAXIMUM_ALIGNOF)
+ break;
+ }
+ else
+ {
+ offset -= iov[which_iov].len;
+ which_iov++;
+ if (which_iov >= iovcnt)
+ break;
+ }
+ }
+ res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
+ mqh->mqh_partial_bytes += bytes_written;
+ if (res != SHM_MQ_SUCCESS)
+ return res;
+ continue;
+ }
+
+ /*
+ * If this is the last chunk, we can write all the data, even if it
+ * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
+ * MAXALIGN_DOWN the write size.
+ */
+ chunksize = iov[which_iov].len - offset;
+ if (which_iov + 1 < iovcnt)
+ chunksize = MAXALIGN_DOWN(chunksize);
+ res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
+ nowait, &bytes_written);
+ mqh->mqh_partial_bytes += bytes_written;
+ offset += bytes_written;
+ if (res != SHM_MQ_SUCCESS)
+ return res;
+ } while (mqh->mqh_partial_bytes < nbytes);
+
+ /* Reset for next message. */
+ mqh->mqh_partial_bytes = 0;
+ mqh->mqh_length_word_complete = false;
/* Notify receiver of the newly-written data, and return. */
return shm_mq_notify_receiver(mq);
@@ -653,8 +749,8 @@ shm_mq_detach(shm_mq *mq)
* Write bytes into a shared message queue.
*/
static shm_mq_result
-shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait,
- Size *bytes_written)
+shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
+ bool nowait, Size *bytes_written)
{
shm_mq *mq = mqh->mqh_queue;
Size sent = 0;
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index 5bae3807afb..063400ae286 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -25,6 +25,13 @@ typedef struct shm_mq shm_mq;
struct shm_mq_handle;
typedef struct shm_mq_handle shm_mq_handle;
+/* Descriptors for a single write spanning multiple locations. */
+typedef struct
+{
+ const char *data;
+ Size len;
+} shm_mq_iovec;
+
/* Possible results of a send or receive operation. */
typedef enum
{
@@ -52,12 +59,17 @@ extern PGPROC *shm_mq_get_sender(shm_mq *);
extern shm_mq_handle *shm_mq_attach(shm_mq *mq, dsm_segment *seg,
BackgroundWorkerHandle *handle);
+/* Associate worker handle with shm_mq. */
+extern void shm_mq_set_handle(shm_mq_handle *, BackgroundWorkerHandle *);
+
/* Break connection. */
extern void shm_mq_detach(shm_mq *);
/* Send or receive messages. */
extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
- Size nbytes, void *data, bool nowait);
+ Size nbytes, const void *data, bool nowait);
+extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh,
+ shm_mq_iovec *iov, int iovcnt, bool nowait);
extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
Size *nbytesp, void **datap, bool nowait);