aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2015-10-16 09:53:34 -0400
committerRobert Haas <rhaas@postgresql.org>2015-10-16 09:58:42 -0400
commit423ec0877ff29bc78e48e50362dbcde351a1f025 (patch)
tree2aaf036a548239f86e7d95d486cd99496d4b6524
parent2ad5c27bb565c26a4b12ea3343331c80f121f269 (diff)
downloadpostgresql-423ec0877ff29bc78e48e50362dbcde351a1f025.tar.gz
postgresql-423ec0877ff29bc78e48e50362dbcde351a1f025.zip
Transfer current command counter ID to parallel workers.
Commit 924bcf4f16d54c55310b28f77686608684734f42 correctly forbade parallel workers to modify the command counter while in parallel mode, but it inexplicably neglected to actually transfer the current command counter from leader to workers. This can result in the workers seeing a different set of tuples from the leader, which is bad. Repair.
-rw-r--r--src/backend/access/transam/xact.c46
1 files changed, 25 insertions, 21 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index e8aafbac508..3e24800fbd1 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -4786,8 +4786,8 @@ Size
EstimateTransactionStateSpace(void)
{
TransactionState s;
- Size nxids = 5; /* iso level, deferrable, top & current XID,
- * XID count */
+ Size nxids = 6; /* iso level, deferrable, top & current XID,
+ * command counter, XID count */
for (s = CurrentTransactionState; s != NULL; s = s->parent)
{
@@ -4807,12 +4807,13 @@ EstimateTransactionStateSpace(void)
*
* We need to save and restore XactDeferrable, XactIsoLevel, and the XIDs
* associated with this transaction. The first eight bytes of the result
- * contain XactDeferrable and XactIsoLevel; the next eight bytes contain the
- * XID of the top-level transaction and the XID of the current transaction
- * (or, in each case, InvalidTransactionId if none). After that, the next 4
- * bytes contain a count of how many additional XIDs follow; this is followed
- * by all of those XIDs one after another. We emit the XIDs in sorted order
- * for the convenience of the receiving process.
+ * contain XactDeferrable and XactIsoLevel; the next twelve bytes contain the
+ * XID of the top-level transaction, the XID of the current transaction
+ * (or, in each case, InvalidTransactionId if none), and the current command
+ * counter. After that, the next 4 bytes contain a count of how many
+ * additional XIDs follow; this is followed by all of those XIDs one after
+ * another. We emit the XIDs in sorted order for the convenience of the
+ * receiving process.
*/
void
SerializeTransactionState(Size maxsize, char *start_address)
@@ -4820,14 +4821,16 @@ SerializeTransactionState(Size maxsize, char *start_address)
TransactionState s;
Size nxids = 0;
Size i = 0;
+ Size c = 0;
TransactionId *workspace;
TransactionId *result = (TransactionId *) start_address;
- Assert(maxsize >= 5 * sizeof(TransactionId));
- result[0] = (TransactionId) XactIsoLevel;
- result[1] = (TransactionId) XactDeferrable;
- result[2] = XactTopTransactionId;
- result[3] = CurrentTransactionState->transactionId;
+ result[c++] = (TransactionId) XactIsoLevel;
+ result[c++] = (TransactionId) XactDeferrable;
+ result[c++] = XactTopTransactionId;
+ result[c++] = CurrentTransactionState->transactionId;
+ result[c++] = (TransactionId) currentCommandId;
+ Assert(maxsize >= c * sizeof(TransactionId));
/*
* If we're running in a parallel worker and launching a parallel worker
@@ -4836,9 +4839,9 @@ SerializeTransactionState(Size maxsize, char *start_address)
*/
if (nParallelCurrentXids > 0)
{
- Assert(maxsize > (nParallelCurrentXids + 4) * sizeof(TransactionId));
- result[4] = nParallelCurrentXids;
- memcpy(&result[5], ParallelCurrentXids,
+ result[c++] = nParallelCurrentXids;
+ Assert(maxsize >= (nParallelCurrentXids + c) * sizeof(TransactionId));
+ memcpy(&result[c], ParallelCurrentXids,
nParallelCurrentXids * sizeof(TransactionId));
return;
}
@@ -4853,7 +4856,7 @@ SerializeTransactionState(Size maxsize, char *start_address)
nxids = add_size(nxids, 1);
nxids = add_size(nxids, s->nChildXids);
}
- Assert(nxids * sizeof(TransactionId) < maxsize);
+ Assert((c + 1 + nxids) * sizeof(TransactionId) <= maxsize);
/* Copy them to our scratch space. */
workspace = palloc(nxids * sizeof(TransactionId));
@@ -4871,8 +4874,8 @@ SerializeTransactionState(Size maxsize, char *start_address)
qsort(workspace, nxids, sizeof(TransactionId), xidComparator);
/* Copy data into output area. */
- result[4] = (TransactionId) nxids;
- memcpy(&result[5], workspace, nxids * sizeof(TransactionId));
+ result[c++] = (TransactionId) nxids;
+ memcpy(&result[c], workspace, nxids * sizeof(TransactionId));
}
/*
@@ -4892,8 +4895,9 @@ StartParallelWorkerTransaction(char *tstatespace)
XactDeferrable = (bool) tstate[1];
XactTopTransactionId = tstate[2];
CurrentTransactionState->transactionId = tstate[3];
- nParallelCurrentXids = (int) tstate[4];
- ParallelCurrentXids = &tstate[5];
+ currentCommandId = tstate[4];
+ nParallelCurrentXids = (int) tstate[5];
+ ParallelCurrentXids = &tstate[6];
CurrentTransactionState->blockState = TBLOCK_PARALLEL_INPROGRESS;
}