aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/transam/parallel.c18
-rw-r--r--src/backend/utils/activity/backend_progress.c32
-rw-r--r--src/include/utils/backend_progress.h1
3 files changed, 51 insertions, 0 deletions
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 2b8bc2f58dd..2bd04bd1773 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -24,6 +24,7 @@
#include "catalog/pg_enum.h"
#include "catalog/storage.h"
#include "commands/async.h"
+#include "commands/progress.h"
#include "commands/vacuum.h"
#include "executor/execParallel.h"
#include "libpq/libpq.h"
@@ -1199,6 +1200,23 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
break;
}
+ case 'P': /* Parallel progress reporting */
+ {
+ /*
+ * Only incremental progress reporting is currently supported.
+ * However, it's possible to add more fields to the message to
+ * allow for handling of other backend progress APIs.
+ */
+ int index = pq_getmsgint(msg, 4);
+ int64 incr = pq_getmsgint64(msg);
+
+ pq_getmsgend(msg);
+
+ pgstat_progress_incr_param(index, incr);
+
+ break;
+ }
+
case 'X': /* Terminate, indicating clean exit */
{
shm_mq_detach(pcxt->worker[i].error_mqh);
diff --git a/src/backend/utils/activity/backend_progress.c b/src/backend/utils/activity/backend_progress.c
index fb48eafef9a..67447ef03ab 100644
--- a/src/backend/utils/activity/backend_progress.c
+++ b/src/backend/utils/activity/backend_progress.c
@@ -10,6 +10,8 @@
*/
#include "postgres.h"
+#include "access/parallel.h"
+#include "libpq/pqformat.h"
#include "port/atomics.h" /* for memory barriers */
#include "utils/backend_progress.h"
#include "utils/backend_status.h"
@@ -80,6 +82,36 @@ pgstat_progress_incr_param(int index, int64 incr)
}
/*-----------
+ * pgstat_progress_parallel_incr_param() -
+ *
+ * A variant of pgstat_progress_incr_param to allow a worker to poke at
+ * a leader to do an incremental progress update.
+ *-----------
+ */
+void
+pgstat_progress_parallel_incr_param(int index, int64 incr)
+{
+ /*
+ * Parallel workers notify a leader through a 'P' protocol message to
+ * update progress, passing the progress index and incremented value.
+ * Leaders can just call pgstat_progress_incr_param directly.
+ */
+ if (IsParallelWorker())
+ {
+ static StringInfoData progress_message;
+
+ initStringInfo(&progress_message);
+
+ pq_beginmessage(&progress_message, 'P');
+ pq_sendint32(&progress_message, index);
+ pq_sendint64(&progress_message, incr);
+ pq_endmessage(&progress_message);
+ }
+ else
+ pgstat_progress_incr_param(index, incr);
+}
+
+/*-----------
* pgstat_progress_update_multi_param() -
*
* Update multiple members in st_progress_param[] of own backend entry.
diff --git a/src/include/utils/backend_progress.h b/src/include/utils/backend_progress.h
index a84752ade99..70dea55fc00 100644
--- a/src/include/utils/backend_progress.h
+++ b/src/include/utils/backend_progress.h
@@ -37,6 +37,7 @@ extern void pgstat_progress_start_command(ProgressCommandType cmdtype,
Oid relid);
extern void pgstat_progress_update_param(int index, int64 val);
extern void pgstat_progress_incr_param(int index, int64 incr);
+extern void pgstat_progress_parallel_incr_param(int index, int64 incr);
extern void pgstat_progress_update_multi_param(int nparam, const int *index,
const int64 *val);
extern void pgstat_progress_end_command(void);