/*-------------------------------------------------------------------------
 *
 * dsm.c
 *	  manage dynamic shared memory segments
 *
 * This file provides a set of services to make programming with dynamic
 * shared memory segments more convenient.  Unlike the low-level
 * facilities provided by dsm_impl.h and dsm_impl.c, mappings and segments
 * created using this module will be cleaned up automatically.  Mappings
 * will be removed when the resource owner under which they were created
 * is cleaned up, unless dsm_keep_mapping() is used, in which case they
 * have session lifespan.  Segments will be removed when there are no
 * remaining mappings, or at postmaster shutdown in any case.  After a
 * hard postmaster crash, remaining segments will be removed, if they
 * still exist, at the next postmaster startup.
 *
 * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/ipc/dsm.c
 *
 *-------------------------------------------------------------------------
 */
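
/*
 * Illustrative usage sketch (not part of this file): a backend that wants a
 * scratch segment for the current query might do something like the
 * following.  The 64KB size is arbitrary and error handling is omitted; the
 * functions below define the actual rules.
 *
 *		dsm_segment *seg = dsm_create(65536);
 *		void	   *addr = dsm_segment_address(seg);
 *
 *		(use the memory at addr; call dsm_keep_mapping(seg) if the mapping
 *		must outlive the current resource owner)
 *
 *		dsm_detach(seg);
 */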

#include "postgres.h"

#include <dirent.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#ifndef WIN32
#include <sys/mman.h>
#endif
#include <sys/stat.h>

#include "lib/ilist.h"
#include "miscadmin.h"
#include "storage/dsm.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/resowner_private.h"

#define PG_DYNSHMEM_STATE_FILE			PG_DYNSHMEM_DIR "/state"
#define PG_DYNSHMEM_NEW_STATE_FILE		PG_DYNSHMEM_DIR "/state.new"
#define PG_DYNSHMEM_STATE_BUFSIZ		512
#define PG_DYNSHMEM_CONTROL_MAGIC		0x9a503d32

/*
 * There's no point in getting too cheap here, because the minimum allocation
 * is one OS page, which is probably at least 4KB and could easily be as high
 * as 64KB.  Each slot costs sizeof(dsm_control_item), currently 8 bytes.
 */
#define PG_DYNSHMEM_FIXED_SLOTS			64
#define PG_DYNSHMEM_SLOTS_PER_BACKEND	2
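
/*
 * Worked example of the sizing arithmetic in dsm_postmaster_startup(),
 * assuming MaxBackends = 100: maxitems = 64 + 2 * 100 = 264 slots, so the
 * control segment needs offsetof(dsm_control_header, item) plus
 * 264 * sizeof(dsm_control_item) = 264 * 8 = 2112 bytes, a little over 2KB,
 * which still fits comfortably in a single 4KB OS page.
 */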

#define INVALID_CONTROL_SLOT		((uint32) -1)

/* Backend-local state for a dynamic shared memory segment. */
struct dsm_segment
{
	dlist_node	node;				/* List link in dsm_segment_list. */
	ResourceOwner resowner;			/* Resource owner. */
	dsm_handle	handle;				/* Segment name. */
	uint32		control_slot;		/* Slot in control segment. */
	void       *impl_private;		/* Implementation-specific private data. */
	void	   *mapped_address;		/* Mapping address, or NULL if unmapped. */
	Size		mapped_size;		/* Size of our mapping. */
};

/* Shared-memory state for a dynamic shared memory segment. */
typedef struct dsm_control_item
{
	dsm_handle	handle;
	uint32		refcnt;				/* 2+ = active, 1 = moribund, 0 = gone */
} dsm_control_item;
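
/*
 * Illustrative refcnt walk-through: dsm_create() starts the count at 2 (one
 * for the creator plus one, because a count of exactly 1 is reserved to mean
 * "being destroyed"); each dsm_attach() adds one and each dsm_detach()
 * subtracts one.  The detaching backend that drops the count to 1 destroys
 * the underlying segment and then sets refcnt to 0, marking the slot free
 * for reuse.
 */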

/* Layout of the dynamic shared memory control segment. */
typedef struct dsm_control_header
{
	uint32		magic;
	uint32		nitems;
	uint32		maxitems;
	dsm_control_item	item[FLEXIBLE_ARRAY_MEMBER];
} dsm_control_header;

static void dsm_cleanup_using_control_segment(void);
static void dsm_cleanup_for_mmap(void);
static bool dsm_read_state_file(dsm_handle *h);
static void dsm_write_state_file(dsm_handle h);
static void dsm_postmaster_shutdown(int code, Datum arg);
static void dsm_backend_shutdown(int code, Datum arg);
static dsm_segment *dsm_create_descriptor(void);
static bool dsm_control_segment_sane(dsm_control_header *control,
						 Size mapped_size);
static uint64 dsm_control_bytes_needed(uint32 nitems);

/* Has this backend initialized the dynamic shared memory system yet? */
static bool dsm_init_done = false;

/*
 * List of dynamic shared memory segments used by this backend.
 *
 * At process exit time, we must decrement the reference count of each
 * segment we have attached; this list makes it possible to find all such
 * segments.
 *
 * This list should always be empty in the postmaster.  We could probably
 * allow the postmaster to map dynamic shared memory segments before it
 * begins to start child processes, provided that each process adjusted
 * the reference counts for those segments in the control segment at
 * startup time, but there's no obvious need for such a facility, which
 * would also be complex to handle in the EXEC_BACKEND case.  Once the
 * postmaster has begun spawning children, there's an additional problem:
 * each new mapping would require an update to the control segment,
 * which requires locking, and the postmaster must not be involved in locking.
 */
static dlist_head dsm_segment_list = DLIST_STATIC_INIT(dsm_segment_list);

/*
 * Control segment information.
 *
 * Unlike ordinary shared memory segments, the control segment is not
 * reference counted; instead, it lasts for the postmaster's entire
 * life cycle.  For simplicity, it doesn't have a dsm_segment object either.
 */
static dsm_handle dsm_control_handle;
static dsm_control_header *dsm_control;
static Size dsm_control_mapped_size = 0;
static void	*dsm_control_impl_private = NULL;

/*
 * Start up the dynamic shared memory system.
 *
 * This is called just once per postmaster lifetime, at postmaster startup
 * time.
 */
void
dsm_postmaster_startup(void)
{
	void	   *dsm_control_address = NULL;
	uint32		maxitems;
	Size		segsize;

	Assert(!IsUnderPostmaster);

	/* If dynamic shared memory is disabled, there's nothing to do. */
	if (dynamic_shared_memory_type == DSM_IMPL_NONE)
		return;

	/*
	 * Check for, and remove, shared memory segments left behind by a dead
	 * postmaster.  This isn't necessary on Windows, which always removes them
	 * when the last reference is gone.
	 */
	switch (dynamic_shared_memory_type)
	{
		case DSM_IMPL_POSIX:
		case DSM_IMPL_SYSV:
			dsm_cleanup_using_control_segment();
			break;
		case DSM_IMPL_MMAP:
			dsm_cleanup_for_mmap();
			break;
		case DSM_IMPL_WINDOWS:
			/* Nothing to do. */
			break;
		default:
			elog(ERROR, "unknown dynamic shared memory type: %d",
				 dynamic_shared_memory_type);
	}

	/* Determine size for new control segment. */
	maxitems = PG_DYNSHMEM_FIXED_SLOTS
		+ PG_DYNSHMEM_SLOTS_PER_BACKEND * MaxBackends;
	elog(DEBUG2, "dynamic shared memory system will support %u segments",
		maxitems);
	segsize = dsm_control_bytes_needed(maxitems);

	/* Loop until we find an unused identifier for the new control segment. */
	for (;;)
	{
		Assert(dsm_control_address == NULL);
		Assert(dsm_control_mapped_size == 0);
		dsm_control_handle = random();
		if (dsm_impl_op(DSM_OP_CREATE, dsm_control_handle, segsize,
						&dsm_control_impl_private, &dsm_control_address,
						&dsm_control_mapped_size, ERROR))
			break;
	}
	dsm_control = dsm_control_address;
	on_shmem_exit(dsm_postmaster_shutdown, 0);
	elog(DEBUG2,
		 "created dynamic shared memory control segment %u (%lu bytes)",
		 dsm_control_handle, (unsigned long) segsize);
	dsm_write_state_file(dsm_control_handle);

	/* Initialize control segment. */
	dsm_control->magic = PG_DYNSHMEM_CONTROL_MAGIC;
	dsm_control->nitems = 0;
	dsm_control->maxitems = maxitems;
}

/*
 * Determine whether the control segment from the previous postmaster
 * invocation still exists.  If so, remove the dynamic shared memory
 * segments to which it refers, and then the control segment itself.
 */
static void
dsm_cleanup_using_control_segment(void)
{
	void	   *mapped_address = NULL;
	void	   *junk_mapped_address = NULL;
	void	   *impl_private = NULL;
	void	   *junk_impl_private = NULL;
	Size		mapped_size = 0;
	Size		junk_mapped_size = 0;
	uint32		nitems;
	uint32		i;
	dsm_handle	old_control_handle;
	dsm_control_header *old_control;

	/*
	 * Read the state file.  If it doesn't exist or is empty, there's nothing
	 * more to do.
	 */
	if (!dsm_read_state_file(&old_control_handle))
		return;

	/*
	 * Try to attach the segment.  If this fails, it probably just means that
	 * the operating system has been rebooted and the segment no longer exists,
	 * or an unrelated process has used the same shm ID.  So just fall out
	 * quietly.
	 */
	if (!dsm_impl_op(DSM_OP_ATTACH, old_control_handle, 0, &impl_private,
					 &mapped_address, &mapped_size, DEBUG1))
		return;

	/*
	 * We've managed to reattach it, but the contents might not be sane.
	 * If they aren't, we disregard the segment after all.
	 */
	old_control = (dsm_control_header *) mapped_address;
	if (!dsm_control_segment_sane(old_control, mapped_size))
	{
		dsm_impl_op(DSM_OP_DETACH, old_control_handle, 0, &impl_private,
					&mapped_address, &mapped_size, LOG);
		return;
	}

	/*
	 * OK, the control segment looks basically valid, so we can use it to
	 * get a list of segments that need to be removed.
	 */
	nitems = old_control->nitems;
	for (i = 0; i < nitems; ++i)
	{
		dsm_handle		handle;
		uint32			refcnt;

		/* If the reference count is 0, the slot is actually unused. */
		refcnt = old_control->item[i].refcnt;
		if (refcnt == 0)
			continue;

		/* Log debugging information. */
		handle = old_control->item[i].handle;
		elog(DEBUG2, "cleaning up orphaned dynamic shared memory with ID %u (reference count %u)",
			handle, refcnt);

		/* Destroy the referenced segment. */
		dsm_impl_op(DSM_OP_DESTROY, handle, 0, &junk_impl_private,
					&junk_mapped_address, &junk_mapped_size, LOG);
	}

	/* Destroy the old control segment, too. */
	elog(DEBUG2,
		 "cleaning up dynamic shared memory control segment with ID %u",
		 old_control_handle);
	dsm_impl_op(DSM_OP_DESTROY, old_control_handle, 0, &impl_private,
				&mapped_address, &mapped_size, LOG);
}

/*
 * When we're using the mmap shared memory implementation, "shared memory"
 * segments might even manage to survive an operating system reboot.
 * But there's no guarantee as to exactly what will survive: some segments
 * may survive, and others may not, and the contents of some may be out
 * of date.  In particular, the control segment may be out of date, so we
 * can't rely on it to figure out what to remove.  However, since we know
 * what directory contains the files we used as shared memory, we can simply
 * scan the directory and blow everything away that shouldn't be there.
 */
static void
dsm_cleanup_for_mmap(void)
{
	DIR	   *dir;
	struct dirent *dent;

	/* Open the directory; can't use AllocateDir in postmaster. */
	if ((dir = opendir(PG_DYNSHMEM_DIR)) == NULL)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not open directory \"%s\": %m",
					PG_DYNSHMEM_DIR)));

	/* Scan for something with a name of the correct format. */
	while ((dent = readdir(dir)) != NULL)
	{
		if (strncmp(dent->d_name, PG_DYNSHMEM_MMAP_FILE_PREFIX,
				strlen(PG_DYNSHMEM_MMAP_FILE_PREFIX)) == 0)
		{
			char buf[MAXPGPATH];
			snprintf(buf, MAXPGPATH, PG_DYNSHMEM_DIR "/%s", dent->d_name);

			elog(DEBUG2, "removing file \"%s\"", buf);

			/* We found a matching file; so remove it. */
			if (unlink(buf) != 0)
			{
				int		save_errno;

				save_errno = errno;
				closedir(dir);
				errno = save_errno;

				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not remove file \"%s\": %m", buf)));
			}
		}
	}

	/* Cleanup complete. */
	closedir(dir);
}

/*
 * Read and parse the state file.
 *
 * If the state file is empty or the contents are garbled, it probably means
 * that the operating system rebooted before the data written by the previous
 * postmaster made it to disk.  In that case, we can just ignore it; any shared
 * memory from before the reboot should be gone anyway.
 */
static bool
dsm_read_state_file(dsm_handle *h)
{
	int			statefd;
	char		statebuf[PG_DYNSHMEM_STATE_BUFSIZ];
	int			nbytes = 0;
	char	   *endptr,
			   *s;
	dsm_handle	handle;

	/* Read the state file to get the ID of the old control segment. */
	statefd = open(PG_DYNSHMEM_STATE_FILE, O_RDONLY | PG_BINARY, 0);
	if (statefd < 0)
	{
		if (errno == ENOENT)
			return false;
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m",
					PG_DYNSHMEM_STATE_FILE)));
	}
	nbytes = read(statefd, statebuf, PG_DYNSHMEM_STATE_BUFSIZ - 1);
	if (nbytes < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not read file \"%s\": %m",
					PG_DYNSHMEM_STATE_FILE)));
	/* make sure buffer is NUL terminated */
	statebuf[nbytes] = '\0';
	close(statefd);

	/*
	 * We expect to find the handle of the old control segment here,
	 * on a line by itself.
	 */
	handle = strtoul(statebuf, &endptr, 10);
	for (s = endptr; *s == ' ' || *s == '\t'; ++s)
		;
	if (*s != '\n' && *s != '\0')
		return false;

	/* Looks good. */
	*h = handle;
	return true;
}
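
/*
 * For illustration, a valid state file contains nothing but the control
 * segment handle as a decimal number followed by a newline (for example
 * "1804289383\n"), exactly as written by dsm_write_state_file() below.
 */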

/*
 * Write our control segment handle to the state file, so that if the
 * postmaster is killed without running its on_shmem_exit hooks, the
 * next postmaster can clean things up after restart.
 */
static void
dsm_write_state_file(dsm_handle h)
{
	int			statefd;
	char		statebuf[PG_DYNSHMEM_STATE_BUFSIZ];
	int			nbytes;

	/* Create or truncate the file. */
	statefd = open(PG_DYNSHMEM_NEW_STATE_FILE,
				   O_RDWR | O_CREAT | O_TRUNC | PG_BINARY, 0600);
	if (statefd < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not create file \"%s\": %m",
					PG_DYNSHMEM_NEW_STATE_FILE)));

	/* Write contents. */
	snprintf(statebuf, PG_DYNSHMEM_STATE_BUFSIZ, "%u\n", h);
	nbytes = strlen(statebuf);
	if (write(statefd, statebuf, nbytes) != nbytes)
	{
		if (errno == 0)
			errno = ENOSPC;		/* if no error signalled, assume no space */
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not write file \"%s\": %m",
					PG_DYNSHMEM_NEW_STATE_FILE)));
	}

	/* Close file. */
	close(statefd);

	/*
	 * Atomically rename file into place, so that no one ever sees a partially
	 * written state file.
	 */
	if (rename(PG_DYNSHMEM_NEW_STATE_FILE, PG_DYNSHMEM_STATE_FILE) < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\": %m",
					PG_DYNSHMEM_NEW_STATE_FILE)));
}

/*
 * At shutdown time, we iterate over the control segment and remove all
 * remaining dynamic shared memory segments.  We avoid throwing errors here;
 * the postmaster is shutting down either way, and this is just non-critical
 * resource cleanup.
 */
static void
dsm_postmaster_shutdown(int code, Datum arg)
{
	uint32		nitems;
	uint32		i;
	void	   *dsm_control_address;
	void	   *junk_mapped_address = NULL;
	void	   *junk_impl_private = NULL;
	Size		junk_mapped_size = 0;

	/*
	 * If some other backend exited uncleanly, it might have corrupted the
	 * control segment while it was dying.  In that case, we warn and ignore
	 * the contents of the control segment.  This may end up leaving behind
	 * stray shared memory segments, but there's not much we can do about
	 * that if the metadata is gone.
	 */
	if (!dsm_control_segment_sane(dsm_control, dsm_control_mapped_size))
	{
		ereport(LOG,
				(errmsg("dynamic shared memory control segment is corrupt")));
		return;
	}
	nitems = dsm_control->nitems;

	/* Remove any remaining segments. */
	for (i = 0; i < nitems; ++i)
	{
		dsm_handle	handle;

		/* If the reference count is 0, the slot is actually unused. */
		if (dsm_control->item[i].refcnt == 0)
			continue;

		/* Log debugging information. */
		handle = dsm_control->item[i].handle;
		elog(DEBUG2, "cleaning up orphaned dynamic shared memory with ID %u",
			handle);

		/* Destroy the segment. */
		dsm_impl_op(DSM_OP_DESTROY, handle, 0, &junk_impl_private,
					&junk_mapped_address, &junk_mapped_size, LOG);
	}

	/* Remove the control segment itself. */
	elog(DEBUG2,
		 "cleaning up dynamic shared memory control segment with ID %u",
		 dsm_control_handle);
	dsm_control_address = dsm_control;
	dsm_impl_op(DSM_OP_DESTROY, dsm_control_handle, 0,
				&dsm_control_impl_private, &dsm_control_address,
				&dsm_control_mapped_size, LOG);
	dsm_control = dsm_control_address;

	/* And, finally, remove the state file. */
	if (unlink(PG_DYNSHMEM_STATE_FILE) < 0)
		ereport(LOG,
				(errcode_for_file_access(),
				 errmsg("could not unlink file \"%s\": %m",
					PG_DYNSHMEM_STATE_FILE)));
}

/*
 * Prepare this backend for dynamic shared memory usage.  Under EXEC_BACKEND,
 * we must reread the state file and map the control segment; in other cases,
 * we'll have inherited the postmaster's mapping and global variables.
 */
static void
dsm_backend_startup(void)
{
	/* If dynamic shared memory is disabled, reject this. */
	if (dynamic_shared_memory_type == DSM_IMPL_NONE)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("dynamic shared memory is disabled"),
				 errhint("Set dynamic_shared_memory_type to a value other than \"none\".")));

#ifdef EXEC_BACKEND
	{
		dsm_handle	control_handle;
		void	   *control_address = NULL;

		/* Read the control segment information from the state file. */
		if (!dsm_read_state_file(&control_handle))
			ereport(ERROR,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("could not parse dynamic shared memory state file")));

		/* Attach control segment. */
		dsm_impl_op(DSM_OP_ATTACH, control_handle, 0,
					&dsm_control_impl_private, &control_address,
					&dsm_control_mapped_size, ERROR);
		dsm_control_handle = control_handle;
		dsm_control = control_address;
		/* If control segment doesn't look sane, something is badly wrong. */
		if (!dsm_control_segment_sane(dsm_control, dsm_control_mapped_size))
		{
			dsm_impl_op(DSM_OP_DETACH, control_handle, 0,
						&dsm_control_impl_private, &control_address,
						&dsm_control_mapped_size, WARNING);
			ereport(FATAL,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("dynamic shared memory control segment is not valid")));
		}
	}
#endif

	/* Arrange to detach segments on exit. */
	on_shmem_exit(dsm_backend_shutdown, 0);

	dsm_init_done = true;
}

/*
 * Create a new dynamic shared memory segment.
 */
dsm_segment *
dsm_create(Size size)
{
	dsm_segment	   *seg = dsm_create_descriptor();
	uint32			i;
	uint32			nitems;

	/* Unsafe in postmaster (and pointless in a stand-alone backend). */
	Assert(IsUnderPostmaster);

	if (!dsm_init_done)
		dsm_backend_startup();

	/* Loop until we find an unused segment identifier. */
	for (;;)
	{
		Assert(seg->mapped_address == NULL && seg->mapped_size == 0);
		seg->handle = random();
		if (dsm_impl_op(DSM_OP_CREATE, seg->handle, size, &seg->impl_private,
						&seg->mapped_address, &seg->mapped_size, ERROR))
			break;
	}

	/* Lock the control segment so we can register the new segment. */
	LWLockAcquire(DynamicSharedMemoryControlLock, LW_EXCLUSIVE);

	/* Search the control segment for an unused slot. */
	nitems = dsm_control->nitems;
	for (i = 0; i < nitems; ++i)
	{
		if (dsm_control->item[i].refcnt == 0)
		{
			dsm_control->item[i].handle = seg->handle;
			/* refcnt of 1 triggers destruction, so start at 2 */
			dsm_control->item[i].refcnt = 2;
			seg->control_slot = i;
			LWLockRelease(DynamicSharedMemoryControlLock);
			return seg;
		}
	}

	/* Verify that we can support an additional mapping. */
	if (nitems >= dsm_control->maxitems)
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
				 errmsg("too many dynamic shared memory segments")));

	/* Enter the handle into a new array slot. */
	dsm_control->item[nitems].handle = seg->handle;
	/* refcnt of 1 triggers destruction, so start at 2 */
	dsm_control->item[nitems].refcnt = 2;
	seg->control_slot = nitems;
	dsm_control->nitems++;
	LWLockRelease(DynamicSharedMemoryControlLock);

	return seg;
}

/*
 * Attach a dynamic shared memory segment.
 *
 * See comments for dsm_segment_handle() for an explanation of how this
 * is intended to be used.
 *
 * This function will return NULL if the segment isn't known to the system.
 * This can happen if we're asked to attach the segment, but then everyone
 * else detaches it (causing it to be destroyed) before we get around to
 * attaching it.
 */
dsm_segment *
dsm_attach(dsm_handle h)
{
	dsm_segment	   *seg;
	dlist_iter		iter;
	uint32			i;
	uint32			nitems;

	/* Unsafe in postmaster (and pointless in a stand-alone backend). */
	Assert(IsUnderPostmaster);

	if (!dsm_init_done)
		dsm_backend_startup();

	/*
	 * Since this is just a debugging cross-check, we could leave it out
	 * altogether, or include it only in assert-enabled builds.  But since
	 * the list of attached segments should normally be very short, let's
	 * include it always for right now.
	 *
	 * If you're hitting this error, you probably want to attempt to
	 * find an existing mapping via dsm_find_mapping() before calling
	 * dsm_attach() to create a new one.
	 */
	dlist_foreach(iter, &dsm_segment_list)
	{
		seg = dlist_container(dsm_segment, node, iter.cur);
		if (seg->handle == h)
			elog(ERROR, "can't attach the same segment more than once");
	}

	/* Create a new segment descriptor. */
	seg = dsm_create_descriptor();
	seg->handle = h;

	/* Bump reference count for this segment in shared memory. */
	LWLockAcquire(DynamicSharedMemoryControlLock, LW_EXCLUSIVE);
	nitems = dsm_control->nitems;
	for (i = 0; i < nitems; ++i)
	{
		/* If the reference count is 0, the slot is actually unused. */
		if (dsm_control->item[i].refcnt == 0)
			continue;

		/*
		 * If the reference count is 1, the slot is still in use, but the
		 * segment is in the process of going away.  Treat that as if we
		 * didn't find a match.
		 */
		if (dsm_control->item[i].refcnt == 1)
			break;

		/* Otherwise, if the descriptor matches, we've found a match. */
		if (dsm_control->item[i].handle == seg->handle)
		{
			dsm_control->item[i].refcnt++;
			seg->control_slot = i;
			break;
		}
	}
	LWLockRelease(DynamicSharedMemoryControlLock);

	/*
	 * If we didn't find the handle we're looking for in the control
	 * segment, it probably means that everyone else who had it mapped,
	 * including the original creator, died before we got to this point.
	 * It's up to the caller to decide what to do about that.
	 */
	if (seg->control_slot == INVALID_CONTROL_SLOT)
	{
		dsm_detach(seg);
		return NULL;
	}

	/* Here's where we actually try to map the segment. */
	dsm_impl_op(DSM_OP_ATTACH, seg->handle, 0, &seg->impl_private,
				&seg->mapped_address, &seg->mapped_size, ERROR);

	return seg;
}

/*
 * At backend shutdown time, detach any segments that are still attached.
 */
static void
dsm_backend_shutdown(int code, Datum arg)
{
	while (!dlist_is_empty(&dsm_segment_list))
	{
		dsm_segment	   *seg;

		seg = dlist_head_element(dsm_segment, node, &dsm_segment_list);
		dsm_detach(seg);
	}
}

/*
 * Resize an existing shared memory segment.
 *
 * This may cause the shared memory segment to be remapped at a different
 * address.  For the caller's convenience, we return the mapped address.
 */
void *
dsm_resize(dsm_segment *seg, Size size)
{
	Assert(seg->control_slot != INVALID_CONTROL_SLOT);
	dsm_impl_op(DSM_OP_RESIZE, seg->handle, size, &seg->impl_private,
				&seg->mapped_address, &seg->mapped_size, ERROR);
	return seg->mapped_address;
}

/*
 * Remap an existing shared memory segment.
 *
 * This is intended to be used when some other process has extended the
 * mapping using dsm_resize(), but we've still only got the initial
 * portion mapped.  Since this might change the address at which the
 * segment is mapped, we return the new mapped address.
 */
void *
dsm_remap(dsm_segment *seg)
{
	dsm_impl_op(DSM_OP_ATTACH, seg->handle, 0, &seg->impl_private,
				&seg->mapped_address, &seg->mapped_size, ERROR);

	return seg->mapped_address;
}
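
/*
 * Illustrative use of dsm_remap(): a backend that discovers (for instance
 * via a length word stored at the start of the segment) that its mapping is
 * smaller than the segment's current size might do
 *
 *		if (needed_size > dsm_segment_map_length(seg))
 *			addr = dsm_remap(seg);
 *
 * where needed_size and addr are the caller's own variables.
 */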

/*
 * Detach from a shared memory segment, destroying the segment if we
 * remove the last reference.
 *
 * This function should never fail.  It will often be invoked when aborting
 * a transaction, and a further error won't serve any purpose.  It's not a
 * complete disaster if we fail to unmap or destroy the segment; it means a
 * resource leak, but that doesn't necessarily preclude further operations.
 */
void
dsm_detach(dsm_segment *seg)
{
	/*
	 * Try to remove the mapping, if one exists.  Normally there will be one,
	 * but there might not be if we failed partway through a create or attach
	 * operation.  We remove the mapping before decrementing the reference
	 * count so that the process that sees a zero reference count can be
	 * certain that no remaining mappings exist.  Even if this fails, we
	 * pretend that it works, because retrying is likely to fail in the
	 * same way.
	 */
	if (seg->mapped_address != NULL)
	{
		dsm_impl_op(DSM_OP_DETACH, seg->handle, 0, &seg->impl_private,
					&seg->mapped_address, &seg->mapped_size, WARNING);
		seg->impl_private = NULL;
		seg->mapped_address = NULL;
		seg->mapped_size = 0;
	}

	/* Reduce reference count, if we previously increased it. */
	if (seg->control_slot != INVALID_CONTROL_SLOT)
	{
		uint32	refcnt;
		uint32	control_slot = seg->control_slot;

		LWLockAcquire(DynamicSharedMemoryControlLock, LW_EXCLUSIVE);
		Assert(dsm_control->item[control_slot].handle == seg->handle);
		Assert(dsm_control->item[control_slot].refcnt > 1);
		refcnt = --dsm_control->item[control_slot].refcnt;
		seg->control_slot = INVALID_CONTROL_SLOT;
		LWLockRelease(DynamicSharedMemoryControlLock);

		/* If new reference count is 1, try to destroy the segment. */
		if (refcnt == 1)
		{
			/*
			 * If we fail to destroy the segment here, or are killed before
			 * we finish doing so, the reference count will remain at 1, which
			 * will mean that nobody else can attach to the segment.  At
			 * postmaster shutdown time, or when a new postmaster is started
			 * after a hard kill, another attempt will be made to remove the
			 * segment.
			 *
			 * The main case we're worried about here is being killed by
			 * a signal before we can finish removing the segment.  In that
			 * case, it's important to be sure that the segment still gets
			 * removed. If we actually fail to remove the segment for some
			 * other reason, the postmaster may not have any better luck than
			 * we did.  There's not much we can do about that, though.
			 */
			if (dsm_impl_op(DSM_OP_DESTROY, seg->handle, 0, &seg->impl_private,
							&seg->mapped_address, &seg->mapped_size, WARNING))
			{
				LWLockAcquire(DynamicSharedMemoryControlLock, LW_EXCLUSIVE);
				Assert(dsm_control->item[control_slot].handle == seg->handle);
				Assert(dsm_control->item[control_slot].refcnt == 1);
				dsm_control->item[control_slot].refcnt = 0;
				LWLockRelease(DynamicSharedMemoryControlLock);
			}
		}
	}

	/* Clean up our remaining backend-private data structures. */
	if (seg->resowner != NULL)
		ResourceOwnerForgetDSM(seg->resowner, seg);
	dlist_delete(&seg->node);
	pfree(seg);
}

/*
 * Keep a dynamic shared memory mapping until end of session.
 *
 * By default, mappings are owned by the current resource owner, which
 * typically means they stick around for the duration of the current query
 * only.
 */
void
dsm_keep_mapping(dsm_segment *seg)
{
	if (seg->resowner != NULL)
	{
		ResourceOwnerForgetDSM(seg->resowner, seg);
		seg->resowner = NULL;
	}
}

/*
 * Find an existing mapping for a shared memory segment, if there is one.
 */
dsm_segment *
dsm_find_mapping(dsm_handle h)
{
	dlist_iter		iter;
	dsm_segment	   *seg;

	dlist_foreach(iter, &dsm_segment_list)
	{
		seg = dlist_container(dsm_segment, node, iter.cur);
		if (seg->handle == h)
			return seg;
	}

	return NULL;
}
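
/*
 * As suggested in dsm_attach(), a caller that may or may not already have
 * the segment mapped can combine the two calls, roughly like this:
 *
 *		seg = dsm_find_mapping(h);
 *		if (seg == NULL)
 *			seg = dsm_attach(h);
 *
 * dsm_attach() will still error out if this backend already has the handle
 * mapped, so the dsm_find_mapping() check must come first.
 */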

/*
 * Get the address at which a dynamic shared memory segment is mapped.
 */
void *
dsm_segment_address(dsm_segment *seg)
{
	Assert(seg->mapped_address != NULL);
	return seg->mapped_address;
}

/*
 * Get the size of a mapping.
 */
Size
dsm_segment_map_length(dsm_segment *seg)
{
	Assert(seg->mapped_address != NULL);
	return seg->mapped_size;
}

/*
 * Get a handle for a mapping.
 *
 * To establish communication via dynamic shared memory between two backends,
 * one of them should first call dsm_create() to create a new shared
 * memory mapping.  That process should then call dsm_segment_handle() to
 * obtain a handle for the mapping, and pass that handle to the
 * coordinating backend via some means (e.g. bgw_main_arg, or via the
 * main shared memory segment).  The recipient, once in possession of the
 * handle, should call dsm_attach().
 */
dsm_handle
dsm_segment_handle(dsm_segment *seg)
{
	return seg->handle;
}
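
/*
 * A minimal sketch of the handshake described above, assuming the handle is
 * passed through bgw_main_arg of a dynamic background worker (any other
 * channel, such as the main shared memory segment, works equally well).
 * Error handling beyond the NULL check is omitted.
 *
 * In the creating backend:
 *
 *		dsm_segment *seg = dsm_create(size);
 *
 *		worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
 *		RegisterDynamicBackgroundWorker(&worker, &handle);
 *
 * In the worker, with main_arg as passed to its main function:
 *
 *		dsm_segment *seg = dsm_attach(DatumGetUInt32(main_arg));
 *
 *		if (seg == NULL)
 *			ereport(ERROR,
 *					(errmsg("could not attach to dynamic shared memory segment")));
 *		shared = dsm_segment_address(seg);
 *
 * Here worker, handle, and shared are the caller's own variables.
 */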

/*
 * Create a segment descriptor.
 */
static dsm_segment *
dsm_create_descriptor(void)
{
	dsm_segment	   *seg;

	ResourceOwnerEnlargeDSMs(CurrentResourceOwner);

	seg = MemoryContextAlloc(TopMemoryContext, sizeof(dsm_segment));
	dlist_push_head(&dsm_segment_list, &seg->node);

	/* seg->handle must be initialized by the caller */
	seg->control_slot = INVALID_CONTROL_SLOT;
	seg->impl_private = NULL;
	seg->mapped_address = NULL;
	seg->mapped_size = 0;

	seg->resowner = CurrentResourceOwner;
	ResourceOwnerRememberDSM(CurrentResourceOwner, seg);

	return seg;
}

/*
 * Sanity check a control segment.
 *
 * The goal here isn't to detect everything that could possibly be wrong with
 * the control segment; there's not enough information for that.  Rather, the
 * goal is to make sure that someone can iterate over the items in the segment
 * without overrunning the end of the mapping and crashing.  We also check
 * the magic number since, if that's messed up, this may not even be one of
 * our segments at all.
 */
static bool
dsm_control_segment_sane(dsm_control_header *control, Size mapped_size)
{
	if (mapped_size < offsetof(dsm_control_header, item))
		return false;			/* Mapped size too short to read header. */
	if (control->magic != PG_DYNSHMEM_CONTROL_MAGIC)
		return false;			/* Magic number doesn't match. */
	if (dsm_control_bytes_needed(control->maxitems) > mapped_size)
		return false;			/* Max item count won't fit in map. */
	if (control->nitems > control->maxitems)
		return false;			/* Overfull. */
	return true;
}

/*
 * Compute the number of control-segment bytes needed to store a given
 * number of items.
 */
static uint64
dsm_control_bytes_needed(uint32 nitems)
{
	return offsetof(dsm_control_header, item)
		+ sizeof(dsm_control_item) * (uint64) nitems;
}