diff options
author | Bruce Momjian <bruce@momjian.us> | 2015-05-23 21:35:49 -0400 |
---|---|---|
committer | Bruce Momjian <bruce@momjian.us> | 2015-05-23 21:35:49 -0400 |
commit | 807b9e0dff663c5da875af7907a5106c0ff90673 (patch) | |
tree | 89a0cfbd3c9801dcb04aae4ccf2fee935092f958 /src/backend/access/transam/parallel.c | |
parent | 225892552bd3052982d2b97b749e5945ea71facc (diff) | |
download | postgresql-807b9e0dff663c5da875af7907a5106c0ff90673.tar.gz postgresql-807b9e0dff663c5da875af7907a5106c0ff90673.zip |
pgindent run for 9.5
Diffstat (limited to 'src/backend/access/transam/parallel.c')
-rw-r--r-- | src/backend/access/transam/parallel.c | 188 |
1 files changed, 95 insertions, 93 deletions
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 8d6a3606794..f4ba8518b12 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -39,7 +39,7 @@ * without blocking. That way, a worker that errors out can write the whole * message into the queue and terminate without waiting for the user backend. */ -#define PARALLEL_ERROR_QUEUE_SIZE 16384 +#define PARALLEL_ERROR_QUEUE_SIZE 16384 /* Magic number for parallel context TOC. */ #define PARALLEL_MAGIC 0x50477c7c @@ -71,7 +71,7 @@ typedef struct FixedParallelState BackendId parallel_master_backend_id; /* Entrypoint for parallel workers. */ - parallel_worker_main_type entrypoint; + parallel_worker_main_type entrypoint; /* Mutex protects remaining fields. */ slock_t mutex; @@ -90,10 +90,10 @@ typedef struct FixedParallelState * and < the number of workers before any user code is invoked; each parallel * worker will get a different parallel worker number. */ -int ParallelWorkerNumber = -1; +int ParallelWorkerNumber = -1; /* Is there a parallel message pending which we need to receive? */ -bool ParallelMessagePending = false; +bool ParallelMessagePending = false; /* Pointer to our fixed parallel state. */ static FixedParallelState *MyFixedParallelState; @@ -115,8 +115,8 @@ static void ParallelWorkerMain(Datum main_arg); ParallelContext * CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers) { - MemoryContext oldcontext; - ParallelContext *pcxt; + MemoryContext oldcontext; + ParallelContext *pcxt; /* It is unsafe to create a parallel context if not in parallel mode. */ Assert(IsInParallelMode()); @@ -159,7 +159,7 @@ CreateParallelContextForExternalFunction(char *library_name, char *function_name, int nworkers) { - MemoryContext oldcontext; + MemoryContext oldcontext; ParallelContext *pcxt; /* We might be running in a very short-lived memory context. */ @@ -184,15 +184,15 @@ CreateParallelContextForExternalFunction(char *library_name, void InitializeParallelDSM(ParallelContext *pcxt) { - MemoryContext oldcontext; - Size library_len = 0; - Size guc_len = 0; - Size combocidlen = 0; - Size tsnaplen = 0; - Size asnaplen = 0; - Size tstatelen = 0; - Size segsize = 0; - int i; + MemoryContext oldcontext; + Size library_len = 0; + Size guc_len = 0; + Size combocidlen = 0; + Size tsnaplen = 0; + Size asnaplen = 0; + Size tstatelen = 0; + Size segsize = 0; + int i; FixedParallelState *fps; Snapshot transaction_snapshot = GetTransactionSnapshot(); Snapshot active_snapshot = GetActiveSnapshot(); @@ -205,8 +205,8 @@ InitializeParallelDSM(ParallelContext *pcxt) shm_toc_estimate_keys(&pcxt->estimator, 1); /* - * Normally, the user will have requested at least one worker process, - * but if by chance they have not, we can skip a bunch of things here. + * Normally, the user will have requested at least one worker process, but + * if by chance they have not, we can skip a bunch of things here. */ if (pcxt->nworkers > 0) { @@ -228,8 +228,8 @@ InitializeParallelDSM(ParallelContext *pcxt) /* Estimate space need for error queues. */ StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) == - PARALLEL_ERROR_QUEUE_SIZE, - "parallel error queue size not buffer-aligned"); + PARALLEL_ERROR_QUEUE_SIZE, + "parallel error queue size not buffer-aligned"); shm_toc_estimate_chunk(&pcxt->estimator, PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers); shm_toc_estimate_keys(&pcxt->estimator, 1); @@ -251,9 +251,9 @@ InitializeParallelDSM(ParallelContext *pcxt) * memory segment; instead, just use backend-private memory. * * Also, if we can't create a dynamic shared memory segment because the - * maximum number of segments have already been created, then fall back - * to backend-private memory, and plan not to use any workers. We hope - * this won't happen very often, but it's better to abandon the use of + * maximum number of segments have already been created, then fall back to + * backend-private memory, and plan not to use any workers. We hope this + * won't happen very often, but it's better to abandon the use of * parallelism than to fail outright. */ segsize = shm_toc_estimate(&pcxt->estimator); @@ -290,13 +290,13 @@ InitializeParallelDSM(ParallelContext *pcxt) /* We can skip the rest of this if we're not budgeting for any workers. */ if (pcxt->nworkers > 0) { - char *libraryspace; - char *gucspace; - char *combocidspace; - char *tsnapspace; - char *asnapspace; - char *tstatespace; - char *error_queue_space; + char *libraryspace; + char *gucspace; + char *combocidspace; + char *tsnapspace; + char *asnapspace; + char *tstatespace; + char *error_queue_space; /* Serialize shared libraries we have loaded. */ libraryspace = shm_toc_allocate(pcxt->toc, library_len); @@ -338,12 +338,12 @@ InitializeParallelDSM(ParallelContext *pcxt) * should be transmitted via separate (possibly larger?) queues. */ error_queue_space = - shm_toc_allocate(pcxt->toc, - PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers); + shm_toc_allocate(pcxt->toc, + PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers); for (i = 0; i < pcxt->nworkers; ++i) { - char *start; - shm_mq *mq; + char *start; + shm_mq *mq; start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE; mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE); @@ -355,8 +355,8 @@ InitializeParallelDSM(ParallelContext *pcxt) /* Serialize extension entrypoint information. */ if (pcxt->library_name != NULL) { - Size lnamelen = strlen(pcxt->library_name); - char *extensionstate; + Size lnamelen = strlen(pcxt->library_name); + char *extensionstate; extensionstate = shm_toc_allocate(pcxt->toc, lnamelen + strlen(pcxt->function_name) + 2); @@ -377,10 +377,10 @@ InitializeParallelDSM(ParallelContext *pcxt) void LaunchParallelWorkers(ParallelContext *pcxt) { - MemoryContext oldcontext; - BackgroundWorker worker; - int i; - bool any_registrations_failed = false; + MemoryContext oldcontext; + BackgroundWorker worker; + int i; + bool any_registrations_failed = false; /* Skip this if we have no workers. */ if (pcxt->nworkers == 0) @@ -408,8 +408,8 @@ LaunchParallelWorkers(ParallelContext *pcxt) * * The caller must be able to tolerate ending up with fewer workers than * expected, so there is no need to throw an error here if registration - * fails. It wouldn't help much anyway, because registering the worker - * in no way guarantees that it will start up and initialize successfully. + * fails. It wouldn't help much anyway, because registering the worker in + * no way guarantees that it will start up and initialize successfully. */ for (i = 0; i < pcxt->nworkers; ++i) { @@ -421,8 +421,8 @@ LaunchParallelWorkers(ParallelContext *pcxt) else { /* - * If we weren't able to register the worker, then we've bumped - * up against the max_worker_processes limit, and future + * If we weren't able to register the worker, then we've bumped up + * against the max_worker_processes limit, and future * registrations will probably fail too, so arrange to skip them. * But we still have to execute this code for the remaining slots * to make sure that we forget about the error queues we budgeted @@ -455,13 +455,13 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt) { for (;;) { - bool anyone_alive = false; - int i; + bool anyone_alive = false; + int i; /* - * This will process any parallel messages that are pending, which - * may change the outcome of the loop that follows. It may also - * throw an error propagated from a worker. + * This will process any parallel messages that are pending, which may + * change the outcome of the loop that follows. It may also throw an + * error propagated from a worker. */ CHECK_FOR_INTERRUPTS(); @@ -502,7 +502,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt) void DestroyParallelContext(ParallelContext *pcxt) { - int i; + int i; /* * Be careful about order of operations here! We remove the parallel @@ -548,7 +548,7 @@ DestroyParallelContext(ParallelContext *pcxt) /* Wait until the workers actually die. */ for (i = 0; i < pcxt->nworkers; ++i) { - BgwHandleStatus status; + BgwHandleStatus status; if (pcxt->worker[i].bgwhandle == NULL) continue; @@ -626,9 +626,9 @@ HandleParallelMessages(void) dlist_foreach(iter, &pcxt_list) { ParallelContext *pcxt; - int i; - Size nbytes; - void *data; + int i; + Size nbytes; + void *data; pcxt = dlist_container(ParallelContext, node, iter.cur); if (pcxt->worker == NULL) @@ -637,14 +637,14 @@ HandleParallelMessages(void) for (i = 0; i < pcxt->nworkers; ++i) { /* - * Read as many messages as we can from each worker, but stop - * when either (1) the error queue goes away, which can happen if - * we receive a Terminate message from the worker; or (2) no more + * Read as many messages as we can from each worker, but stop when + * either (1) the error queue goes away, which can happen if we + * receive a Terminate message from the worker; or (2) no more * messages can be read from the worker without blocking. */ while (pcxt->worker[i].error_mqh != NULL) { - shm_mq_result res; + shm_mq_result res; res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes, &data, true); @@ -652,7 +652,7 @@ HandleParallelMessages(void) break; else if (res == SHM_MQ_SUCCESS) { - StringInfoData msg; + StringInfoData msg; initStringInfo(&msg); appendBinaryStringInfo(&msg, data, nbytes); @@ -661,7 +661,7 @@ HandleParallelMessages(void) } else ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), /* XXX: wrong errcode? */ + (errcode(ERRCODE_INTERNAL_ERROR), /* XXX: wrong errcode? */ errmsg("lost connection to parallel worker"))); /* This might make the error queue go away. */ @@ -677,23 +677,24 @@ HandleParallelMessages(void) static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) { - char msgtype; + char msgtype; msgtype = pq_getmsgbyte(msg); switch (msgtype) { - case 'K': /* BackendKeyData */ + case 'K': /* BackendKeyData */ { - int32 pid = pq_getmsgint(msg, 4); + int32 pid = pq_getmsgint(msg, 4); + (void) pq_getmsgint(msg, 4); /* discard cancel key */ (void) pq_getmsgend(msg); pcxt->worker[i].pid = pid; break; } - case 'E': /* ErrorResponse */ - case 'N': /* NoticeResponse */ + case 'E': /* ErrorResponse */ + case 'N': /* NoticeResponse */ { ErrorData edata; ErrorContextCallback errctx; @@ -725,14 +726,14 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) break; } - case 'A': /* NotifyResponse */ + case 'A': /* NotifyResponse */ { /* Propagate NotifyResponse. */ pq_putmessage(msg->data[0], &msg->data[1], msg->len - 1); break; } - case 'X': /* Terminate, indicating clean exit */ + case 'X': /* Terminate, indicating clean exit */ { pfree(pcxt->worker[i].bgwhandle); pfree(pcxt->worker[i].error_mqh); @@ -797,18 +798,18 @@ static void ParallelWorkerMain(Datum main_arg) { dsm_segment *seg; - shm_toc *toc; + shm_toc *toc; FixedParallelState *fps; - char *error_queue_space; - shm_mq *mq; + char *error_queue_space; + shm_mq *mq; shm_mq_handle *mqh; - char *libraryspace; - char *gucspace; - char *combocidspace; - char *tsnapspace; - char *asnapspace; - char *tstatespace; - StringInfoData msgbuf; + char *libraryspace; + char *gucspace; + char *combocidspace; + char *tsnapspace; + char *asnapspace; + char *tstatespace; + StringInfoData msgbuf; /* Establish signal handlers. */ pqsignal(SIGTERM, die); @@ -824,8 +825,8 @@ ParallelWorkerMain(Datum main_arg) ALLOCSET_DEFAULT_MAXSIZE); /* - * Now that we have a resource owner, we can attach to the dynamic - * shared memory segment and read the table of contents. + * Now that we have a resource owner, we can attach to the dynamic shared + * memory segment and read the table of contents. */ seg = dsm_attach(DatumGetUInt32(main_arg)); if (seg == NULL) @@ -836,7 +837,7 @@ ParallelWorkerMain(Datum main_arg) if (toc == NULL) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("bad magic number in dynamic shared memory segment"))); + errmsg("bad magic number in dynamic shared memory segment"))); /* Determine and set our worker number. */ fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED); @@ -860,7 +861,7 @@ ParallelWorkerMain(Datum main_arg) */ error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE); mq = (shm_mq *) (error_queue_space + - ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE); + ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE); shm_mq_set_sender(mq, MyProc); mqh = shm_mq_attach(mq, seg, NULL); pq_redirect_to_shm_mq(mq, mqh); @@ -870,9 +871,9 @@ ParallelWorkerMain(Datum main_arg) /* * Send a BackendKeyData message to the process that initiated parallelism * so that it has access to our PID before it receives any other messages - * from us. Our cancel key is sent, too, since that's the way the protocol - * message is defined, but it won't actually be used for anything in this - * case. + * from us. Our cancel key is sent, too, since that's the way the + * protocol message is defined, but it won't actually be used for anything + * in this case. */ pq_beginmessage(&msgbuf, 'K'); pq_sendint(&msgbuf, (int32) MyProcPid, sizeof(int32)); @@ -880,13 +881,13 @@ ParallelWorkerMain(Datum main_arg) pq_endmessage(&msgbuf); /* - * Hooray! Primary initialization is complete. Now, we need to set up - * our backend-local state to match the original backend. + * Hooray! Primary initialization is complete. Now, we need to set up our + * backend-local state to match the original backend. */ /* - * Load libraries that were loaded by original backend. We want to do this - * before restoring GUCs, because the libraries might define custom + * Load libraries that were loaded by original backend. We want to do + * this before restoring GUCs, because the libraries might define custom * variables. */ libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY); @@ -928,7 +929,8 @@ ParallelWorkerMain(Datum main_arg) SetUserIdAndSecContext(fps->current_user_id, fps->sec_context); /* - * We've initialized all of our state now; nothing should change hereafter. + * We've initialized all of our state now; nothing should change + * hereafter. */ EnterParallelMode(); @@ -965,9 +967,9 @@ ParallelWorkerMain(Datum main_arg) static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc) { - char *extensionstate; - char *library_name; - char *function_name; + char *extensionstate; + char *library_name; + char *function_name; parallel_worker_main_type entrypt; extensionstate = shm_toc_lookup(toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE); @@ -988,7 +990,7 @@ ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc) static void ParallelErrorContext(void *arg) { - errcontext("parallel worker, pid %d", * (int32 *) arg); + errcontext("parallel worker, pid %d", *(int32 *) arg); } /* |