aboutsummaryrefslogtreecommitdiff
path: root/src/backend/storage/large_object/inv_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage/large_object/inv_api.c')
-rw-r--r--src/backend/storage/large_object/inv_api.c1877
1 files changed, 968 insertions, 909 deletions
diff --git a/src/backend/storage/large_object/inv_api.c b/src/backend/storage/large_object/inv_api.c
index ddf69a6527e..dfde8f469c5 100644
--- a/src/backend/storage/large_object/inv_api.c
+++ b/src/backend/storage/large_object/inv_api.c
@@ -1,19 +1,19 @@
/*-------------------------------------------------------------------------
*
* inv_api.c--
- * routines for manipulating inversion fs large objects. This file
- * contains the user-level large object application interface routines.
+ * routines for manipulating inversion fs large objects. This file
+ * contains the user-level large object application interface routines.
*
* Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.13 1997/08/19 21:33:10 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.14 1997/09/07 04:48:46 momjian Exp $
*
*-------------------------------------------------------------------------
*/
#include <sys/types.h>
-#include <stdio.h> /* for sprintf() */
+#include <stdio.h> /* for sprintf() */
#include <string.h>
#include <sys/file.h>
#include <sys/stat.h>
@@ -28,11 +28,11 @@
#include "access/xact.h"
#include "access/nbtree.h"
#include "access/tupdesc.h"
-#include "catalog/index.h" /* for index_create() */
+#include "catalog/index.h" /* for index_create() */
#include "catalog/catalog.h" /* for newoid() */
-#include "catalog/pg_am.h" /* for BTREE_AM_OID */
+#include "catalog/pg_am.h" /* for BTREE_AM_OID */
#include "catalog/pg_opclass.h" /* for INT4_OPS_OID */
-#include "catalog/pg_proc.h" /* for INT4GE_PROC_OID */
+#include "catalog/pg_proc.h" /* for INT4GE_PROC_OID */
#include "storage/itemptr.h"
#include "storage/bufpage.h"
#include "storage/bufmgr.h"
@@ -43,226 +43,241 @@
#include "storage/large_object.h"
#include "storage/lmgr.h"
#include "utils/syscache.h"
-#include "utils/builtins.h" /* for namestrcpy() */
+#include "utils/builtins.h" /* for namestrcpy() */
#include "catalog/heap.h"
#include "nodes/pg_list.h"
/*
- * Warning, Will Robinson... In order to pack data into an inversion
- * file as densely as possible, we violate the class abstraction here.
- * When we're appending a new tuple to the end of the table, we check
- * the last page to see how much data we can put on it. If it's more
- * than IMINBLK, we write enough to fill the page. This limits external
- * fragmentation. In no case can we write more than IMAXBLK, since
- * the 8K postgres page size less overhead leaves only this much space
- * for data.
+ * Warning, Will Robinson... In order to pack data into an inversion
+ * file as densely as possible, we violate the class abstraction here.
+ * When we're appending a new tuple to the end of the table, we check
+ * the last page to see how much data we can put on it. If it's more
+ * than IMINBLK, we write enough to fill the page. This limits external
+ * fragmentation. In no case can we write more than IMAXBLK, since
+ * the 8K postgres page size less overhead leaves only this much space
+ * for data.
*/
-#define IFREESPC(p) (PageGetFreeSpace(p) - sizeof(HeapTupleData) - sizeof(struct varlena) - sizeof(int32))
-#define IMAXBLK 8092
-#define IMINBLK 512
+#define IFREESPC(p) (PageGetFreeSpace(p) - sizeof(HeapTupleData) - sizeof(struct varlena) - sizeof(int32))
+#define IMAXBLK 8092
+#define IMINBLK 512
/* non-export function prototypes */
-static HeapTuple inv_newtuple(LargeObjectDesc *obj_desc, Buffer buffer,
- Page page, char *dbuf, int nwrite);
-static HeapTuple inv_fetchtup(LargeObjectDesc *obj_desc, Buffer *bufP);
-static int inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes);
-static int inv_wrold(LargeObjectDesc *obj_desc, char *dbuf, int nbytes,
- HeapTuple htup, Buffer buffer);
-static void inv_indextup(LargeObjectDesc *obj_desc, HeapTuple htup);
-static int _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln);
+static HeapTuple
+inv_newtuple(LargeObjectDesc * obj_desc, Buffer buffer,
+ Page page, char *dbuf, int nwrite);
+static HeapTuple inv_fetchtup(LargeObjectDesc * obj_desc, Buffer * bufP);
+static int inv_wrnew(LargeObjectDesc * obj_desc, char *buf, int nbytes);
+static int
+inv_wrold(LargeObjectDesc * obj_desc, char *dbuf, int nbytes,
+ HeapTuple htup, Buffer buffer);
+static void inv_indextup(LargeObjectDesc * obj_desc, HeapTuple htup);
+static int _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln);
/*
- * inv_create -- create a new large object.
+ * inv_create -- create a new large object.
*
- * Arguments:
- * flags -- storage manager to use, archive mode, etc.
+ * Arguments:
+ * flags -- storage manager to use, archive mode, etc.
*
- * Returns:
- * large object descriptor, appropriately filled in.
+ * Returns:
+ * large object descriptor, appropriately filled in.
*/
LargeObjectDesc *
inv_create(int flags)
{
- int file_oid;
- LargeObjectDesc *retval;
- Relation r;
- Relation indr;
- int smgr;
- char archchar;
- TupleDesc tupdesc;
- AttrNumber attNums[1];
- Oid classObjectId[1];
- char objname[NAMEDATALEN];
- char indname[NAMEDATALEN];
-
- /* parse flags */
- smgr = flags & INV_SMGRMASK;
- if (flags & INV_ARCHIVE)
- archchar = 'h';
- else
- archchar = 'n';
-
- /* add one here since the pg_class tuple created
- will have the next oid and we want to have the relation name
- to correspond to the tuple OID */
- file_oid = newoid()+1;
-
- /* come up with some table names */
- sprintf(objname, "xinv%d", file_oid);
- sprintf(indname, "xinx%d", file_oid);
-
- if (SearchSysCacheTuple(RELNAME, PointerGetDatum(objname),
- 0,0,0) != NULL) {
- elog(WARN,
- "internal error: %s already exists -- cannot create large obj",
- objname);
- }
- if (SearchSysCacheTuple(RELNAME, PointerGetDatum(indname),
- 0,0,0) != NULL) {
- elog(WARN,
- "internal error: %s already exists -- cannot create large obj",
- indname);
- }
-
- /* this is pretty painful... want a tuple descriptor */
- tupdesc = CreateTemplateTupleDesc(2);
- TupleDescInitEntry(tupdesc, (AttrNumber) 1,
- "olastbye",
- "int4",
- 0, false);
- TupleDescInitEntry(tupdesc, (AttrNumber) 2,
- "odata",
- "bytea",
- 0, false);
- /*
- * First create the table to hold the inversion large object. It
- * will be located on whatever storage manager the user requested.
- */
-
- heap_create(objname,
- objname,
- (int) archchar, smgr,
- tupdesc);
-
- /* make the relation visible in this transaction */
- CommandCounterIncrement();
- r = heap_openr(objname);
-
- if (!RelationIsValid(r)) {
- elog(WARN, "cannot create large object on %s under inversion",
- smgrout(smgr));
- }
-
- /*
- * Now create a btree index on the relation's olastbyte attribute to
- * make seeks go faster. The hardwired constants are embarassing
- * to me, and are symptomatic of the pressure under which this code
- * was written.
- *
- * ok, mao, let's put in some symbolic constants - jolly
- */
-
- attNums[0] = 1;
- classObjectId[0] = INT4_OPS_OID;
- index_create(objname, indname, NULL, NULL, BTREE_AM_OID,
- 1, &attNums[0], &classObjectId[0],
- 0, (Datum) NULL, NULL, FALSE, FALSE);
-
- /* make the index visible in this transaction */
- CommandCounterIncrement();
- indr = index_openr(indname);
-
- if (!RelationIsValid(indr)) {
- elog(WARN, "cannot create index for large obj on %s under inversion",
- smgrout(smgr));
- }
-
- retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
-
- retval->heap_r = r;
- retval->index_r = indr;
- retval->iscan = (IndexScanDesc) NULL;
- retval->hdesc = RelationGetTupleDescriptor(r);
- retval->idesc = RelationGetTupleDescriptor(indr);
- retval->offset = retval->lowbyte =
- retval->highbyte = 0;
- ItemPointerSetInvalid(&(retval->htid));
-
- if (flags & INV_WRITE) {
- RelationSetLockForWrite(r);
- retval->flags = IFS_WRLOCK|IFS_RDLOCK;
- } else if (flags & INV_READ) {
- RelationSetLockForRead(r);
- retval->flags = IFS_RDLOCK;
- }
- retval->flags |= IFS_ATEOF;
-
- return(retval);
+ int file_oid;
+ LargeObjectDesc *retval;
+ Relation r;
+ Relation indr;
+ int smgr;
+ char archchar;
+ TupleDesc tupdesc;
+ AttrNumber attNums[1];
+ Oid classObjectId[1];
+ char objname[NAMEDATALEN];
+ char indname[NAMEDATALEN];
+
+ /* parse flags */
+ smgr = flags & INV_SMGRMASK;
+ if (flags & INV_ARCHIVE)
+ archchar = 'h';
+ else
+ archchar = 'n';
+
+ /*
+ * add one here since the pg_class tuple created will have the next
+ * oid and we want to have the relation name to correspond to the
+ * tuple OID
+ */
+ file_oid = newoid() + 1;
+
+ /* come up with some table names */
+ sprintf(objname, "xinv%d", file_oid);
+ sprintf(indname, "xinx%d", file_oid);
+
+ if (SearchSysCacheTuple(RELNAME, PointerGetDatum(objname),
+ 0, 0, 0) != NULL)
+ {
+ elog(WARN,
+ "internal error: %s already exists -- cannot create large obj",
+ objname);
+ }
+ if (SearchSysCacheTuple(RELNAME, PointerGetDatum(indname),
+ 0, 0, 0) != NULL)
+ {
+ elog(WARN,
+ "internal error: %s already exists -- cannot create large obj",
+ indname);
+ }
+
+ /* this is pretty painful... want a tuple descriptor */
+ tupdesc = CreateTemplateTupleDesc(2);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1,
+ "olastbye",
+ "int4",
+ 0, false);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 2,
+ "odata",
+ "bytea",
+ 0, false);
+
+ /*
+ * First create the table to hold the inversion large object. It will
+ * be located on whatever storage manager the user requested.
+ */
+
+ heap_create(objname,
+ objname,
+ (int) archchar, smgr,
+ tupdesc);
+
+ /* make the relation visible in this transaction */
+ CommandCounterIncrement();
+ r = heap_openr(objname);
+
+ if (!RelationIsValid(r))
+ {
+ elog(WARN, "cannot create large object on %s under inversion",
+ smgrout(smgr));
+ }
+
+ /*
+ * Now create a btree index on the relation's olastbyte attribute to
+ * make seeks go faster. The hardwired constants are embarrassing to
+ * me, and are symptomatic of the pressure under which this code was
+ * written.
+ *
+ * ok, mao, let's put in some symbolic constants - jolly
+ */
+
+ attNums[0] = 1;
+ classObjectId[0] = INT4_OPS_OID;
+ index_create(objname, indname, NULL, NULL, BTREE_AM_OID,
+ 1, &attNums[0], &classObjectId[0],
+ 0, (Datum) NULL, NULL, FALSE, FALSE);
+
+ /* make the index visible in this transaction */
+ CommandCounterIncrement();
+ indr = index_openr(indname);
+
+ if (!RelationIsValid(indr))
+ {
+ elog(WARN, "cannot create index for large obj on %s under inversion",
+ smgrout(smgr));
+ }
+
+ retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
+
+ retval->heap_r = r;
+ retval->index_r = indr;
+ retval->iscan = (IndexScanDesc) NULL;
+ retval->hdesc = RelationGetTupleDescriptor(r);
+ retval->idesc = RelationGetTupleDescriptor(indr);
+ retval->offset = retval->lowbyte =
+ retval->highbyte = 0;
+ ItemPointerSetInvalid(&(retval->htid));
+
+ if (flags & INV_WRITE)
+ {
+ RelationSetLockForWrite(r);
+ retval->flags = IFS_WRLOCK | IFS_RDLOCK;
+ }
+ else if (flags & INV_READ)
+ {
+ RelationSetLockForRead(r);
+ retval->flags = IFS_RDLOCK;
+ }
+ retval->flags |= IFS_ATEOF;
+
+ return (retval);
}
LargeObjectDesc *
inv_open(Oid lobjId, int flags)
{
- LargeObjectDesc *retval;
- Relation r;
- char *indname;
- Relation indrel;
-
- r = heap_open(lobjId);
-
- if (!RelationIsValid(r))
- return ((LargeObjectDesc *) NULL);
-
- indname = pstrdup((r->rd_rel->relname).data);
-
- /*
- * hack hack hack... we know that the fourth character of the relation
- * name is a 'v', and that the fourth character of the index name is an
- * 'x', and that they're otherwise identical.
- */
- indname[3] = 'x';
- indrel = index_openr(indname);
-
- if (!RelationIsValid(indrel))
- return ((LargeObjectDesc *) NULL);
-
- retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
-
- retval->heap_r = r;
- retval->index_r = indrel;
- retval->iscan = (IndexScanDesc) NULL;
- retval->hdesc = RelationGetTupleDescriptor(r);
- retval->idesc = RelationGetTupleDescriptor(indrel);
- retval->offset = retval->lowbyte = retval->highbyte = 0;
- ItemPointerSetInvalid(&(retval->htid));
-
- if (flags & INV_WRITE) {
- RelationSetLockForWrite(r);
- retval->flags = IFS_WRLOCK|IFS_RDLOCK;
- } else if (flags & INV_READ) {
- RelationSetLockForRead(r);
- retval->flags = IFS_RDLOCK;
- }
-
- return(retval);
+ LargeObjectDesc *retval;
+ Relation r;
+ char *indname;
+ Relation indrel;
+
+ r = heap_open(lobjId);
+
+ if (!RelationIsValid(r))
+ return ((LargeObjectDesc *) NULL);
+
+ indname = pstrdup((r->rd_rel->relname).data);
+
+ /*
+ * hack hack hack... we know that the fourth character of the
+ * relation name is a 'v', and that the fourth character of the index
+ * name is an 'x', and that they're otherwise identical.
+ */
+ indname[3] = 'x';
+ indrel = index_openr(indname);
+
+ if (!RelationIsValid(indrel))
+ return ((LargeObjectDesc *) NULL);
+
+ retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
+
+ retval->heap_r = r;
+ retval->index_r = indrel;
+ retval->iscan = (IndexScanDesc) NULL;
+ retval->hdesc = RelationGetTupleDescriptor(r);
+ retval->idesc = RelationGetTupleDescriptor(indrel);
+ retval->offset = retval->lowbyte = retval->highbyte = 0;
+ ItemPointerSetInvalid(&(retval->htid));
+
+ if (flags & INV_WRITE)
+ {
+ RelationSetLockForWrite(r);
+ retval->flags = IFS_WRLOCK | IFS_RDLOCK;
+ }
+ else if (flags & INV_READ)
+ {
+ RelationSetLockForRead(r);
+ retval->flags = IFS_RDLOCK;
+ }
+
+ return (retval);
}
/*
* Closes an existing large object descriptor.
*/
void
-inv_close(LargeObjectDesc *obj_desc)
+inv_close(LargeObjectDesc * obj_desc)
{
- Assert(PointerIsValid(obj_desc));
+ Assert(PointerIsValid(obj_desc));
- if (obj_desc->iscan != (IndexScanDesc) NULL)
- index_endscan(obj_desc->iscan);
+ if (obj_desc->iscan != (IndexScanDesc) NULL)
+ index_endscan(obj_desc->iscan);
- heap_close(obj_desc->heap_r);
- index_close(obj_desc->index_r);
+ heap_close(obj_desc->heap_r);
+ index_close(obj_desc->index_r);
- pfree(obj_desc);
+ pfree(obj_desc);
}
/*
@@ -273,897 +288,941 @@ inv_close(LargeObjectDesc *obj_desc)
int
inv_destroy(Oid lobjId)
{
- Relation r;
+ Relation r;
- r = (Relation) RelationIdGetRelation(lobjId);
- if (!RelationIsValid(r) || r->rd_rel->relkind == RELKIND_INDEX)
- return -1;
+ r = (Relation) RelationIdGetRelation(lobjId);
+ if (!RelationIsValid(r) || r->rd_rel->relkind == RELKIND_INDEX)
+ return -1;
- heap_destroy(r->rd_rel->relname.data);
- return 1;
+ heap_destroy(r->rd_rel->relname.data);
+ return 1;
}
/*
- * inv_stat() -- do a stat on an inversion file.
+ * inv_stat() -- do a stat on an inversion file.
*
- * For the time being, this is an insanely expensive operation. In
- * order to find the size of the file, we seek to the last block in
- * it and compute the size from that. We scan pg_class to determine
- * the file's owner and create time. We don't maintain mod time or
- * access time, yet.
+ * For the time being, this is an insanely expensive operation. In
+ * order to find the size of the file, we seek to the last block in
+ * it and compute the size from that. We scan pg_class to determine
+ * the file's owner and create time. We don't maintain mod time or
+ * access time, yet.
*
- * These fields aren't stored in a table anywhere because they're
- * updated so frequently, and postgres only appends tuples at the
- * end of relations. Once clustering works, we should fix this.
+ * These fields aren't stored in a table anywhere because they're
+ * updated so frequently, and postgres only appends tuples at the
+ * end of relations. Once clustering works, we should fix this.
*/
#ifdef NOT_USED
int
-inv_stat(LargeObjectDesc *obj_desc, struct pgstat *stbuf)
+inv_stat(LargeObjectDesc * obj_desc, struct pgstat * stbuf)
{
- Assert(PointerIsValid(obj_desc));
- Assert(stbuf != NULL);
-
- /* need read lock for stat */
- if (!(obj_desc->flags & IFS_RDLOCK)) {
- RelationSetLockForRead(obj_desc->heap_r);
- obj_desc->flags |= IFS_RDLOCK;
- }
+ Assert(PointerIsValid(obj_desc));
+ Assert(stbuf != NULL);
+
+ /* need read lock for stat */
+ if (!(obj_desc->flags & IFS_RDLOCK))
+ {
+ RelationSetLockForRead(obj_desc->heap_r);
+ obj_desc->flags |= IFS_RDLOCK;
+ }
- stbuf->st_ino = obj_desc->heap_r->rd_id;
+ stbuf->st_ino = obj_desc->heap_r->rd_id;
#if 1
- stbuf->st_mode = (S_IFREG | 0666); /* IFREG|rw-rw-rw- */
+ stbuf->st_mode = (S_IFREG | 0666); /* IFREG|rw-rw-rw- */
#else
- stbuf->st_mode = 100666; /* IFREG|rw-rw-rw- */
+ stbuf->st_mode = 100666; /* IFREG|rw-rw-rw- */
#endif
- stbuf->st_size = _inv_getsize(obj_desc->heap_r,
- obj_desc->hdesc,
- obj_desc->index_r);
+ stbuf->st_size = _inv_getsize(obj_desc->heap_r,
+ obj_desc->hdesc,
+ obj_desc->index_r);
- stbuf->st_uid = obj_desc->heap_r->rd_rel->relowner;
+ stbuf->st_uid = obj_desc->heap_r->rd_rel->relowner;
- /* we have no good way of computing access times right now */
- stbuf->st_atime_s = stbuf->st_mtime_s = stbuf->st_ctime_s = 0;
+ /* we have no good way of computing access times right now */
+ stbuf->st_atime_s = stbuf->st_mtime_s = stbuf->st_ctime_s = 0;
- return (0);
+ return (0);
}
+
#endif
int
-inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
+inv_seek(LargeObjectDesc * obj_desc, int offset, int whence)
{
- int oldOffset;
- Datum d;
- ScanKeyData skey;
-
- Assert(PointerIsValid(obj_desc));
-
- if (whence == SEEK_CUR) {
- offset += obj_desc->offset; /* calculate absolute position */
- return (inv_seek(obj_desc, offset, SEEK_SET));
- }
-
- /*
- * if you seek past the end (offset > 0) I have
- * no clue what happens :-( B.L. 9/1/93
- */
- if (whence == SEEK_END) {
- /* need read lock for getsize */
- if (!(obj_desc->flags & IFS_RDLOCK)) {
- RelationSetLockForRead(obj_desc->heap_r);
- obj_desc->flags |= IFS_RDLOCK;
- }
- offset += _inv_getsize(obj_desc->heap_r,
- obj_desc->hdesc,
- obj_desc->index_r );
- return (inv_seek(obj_desc, offset, SEEK_SET));
- }
-
- /*
- * Whenever we do a seek, we turn off the EOF flag bit to force
- * ourselves to check for real on the next read.
- */
-
- obj_desc->flags &= ~IFS_ATEOF;
- oldOffset = obj_desc->offset;
- obj_desc->offset = offset;
-
- /* try to avoid doing any work, if we can manage it */
- if (offset >= obj_desc->lowbyte
- && offset <= obj_desc->highbyte
- && oldOffset <= obj_desc->highbyte
- && obj_desc->iscan != (IndexScanDesc) NULL)
- return (offset);
-
- /*
- * To do a seek on an inversion file, we start an index scan that
- * will bring us to the right place. Each tuple in an inversion file
- * stores the offset of the last byte that appears on it, and we have
- * an index on this.
- */
-
-
- /* right now, just assume that the operation is SEEK_SET */
- if (obj_desc->iscan != (IndexScanDesc) NULL) {
- d = Int32GetDatum(offset);
- btmovescan(obj_desc->iscan, d);
- } else {
-
- ScanKeyEntryInitialize(&skey, 0x0, 1, INT4GE_PROC_OID,
- Int32GetDatum(offset));
-
- obj_desc->iscan = index_beginscan(obj_desc->index_r,
- (bool) 0, (uint16) 1,
- &skey);
- }
-
- return (offset);
-}
+ int oldOffset;
+ Datum d;
+ ScanKeyData skey;
-int
-inv_tell(LargeObjectDesc *obj_desc)
-{
- Assert(PointerIsValid(obj_desc));
+ Assert(PointerIsValid(obj_desc));
- return (obj_desc->offset);
-}
+ if (whence == SEEK_CUR)
+ {
+ offset += obj_desc->offset; /* calculate absolute position */
+ return (inv_seek(obj_desc, offset, SEEK_SET));
+ }
-int
-inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
-{
- HeapTuple htup;
- Buffer b;
- int nread;
- int off;
- int ncopy;
- Datum d;
- struct varlena *fsblock;
- bool isNull;
-
- Assert(PointerIsValid(obj_desc));
- Assert(buf != NULL);
-
- /* if we're already at EOF, we don't need to do any work here */
- if (obj_desc->flags & IFS_ATEOF)
- return (0);
+ /*
+ * if you seek past the end (offset > 0) I have no clue what happens
+ * :-( B.L. 9/1/93
+ */
+ if (whence == SEEK_END)
+ {
+ /* need read lock for getsize */
+ if (!(obj_desc->flags & IFS_RDLOCK))
+ {
+ RelationSetLockForRead(obj_desc->heap_r);
+ obj_desc->flags |= IFS_RDLOCK;
+ }
+ offset += _inv_getsize(obj_desc->heap_r,
+ obj_desc->hdesc,
+ obj_desc->index_r);
+ return (inv_seek(obj_desc, offset, SEEK_SET));
+ }
- /* make sure we obey two-phase locking */
- if (!(obj_desc->flags & IFS_RDLOCK)) {
- RelationSetLockForRead(obj_desc->heap_r);
- obj_desc->flags |= IFS_RDLOCK;
- }
+ /*
+ * Whenever we do a seek, we turn off the EOF flag bit to force
+ * ourselves to check for real on the next read.
+ */
- nread = 0;
+ obj_desc->flags &= ~IFS_ATEOF;
+ oldOffset = obj_desc->offset;
+ obj_desc->offset = offset;
- /* fetch a block at a time */
- while (nread < nbytes) {
+ /* try to avoid doing any work, if we can manage it */
+ if (offset >= obj_desc->lowbyte
+ && offset <= obj_desc->highbyte
+ && oldOffset <= obj_desc->highbyte
+ && obj_desc->iscan != (IndexScanDesc) NULL)
+ return (offset);
+
+ /*
+ * To do a seek on an inversion file, we start an index scan that will
+ * bring us to the right place. Each tuple in an inversion file
+ * stores the offset of the last byte that appears on it, and we have
+ * an index on this.
+ */
- /* fetch an inversion file system block */
- htup = inv_fetchtup(obj_desc, &b);
- if (!HeapTupleIsValid(htup)) {
- obj_desc->flags |= IFS_ATEOF;
- break;
+ /* right now, just assume that the operation is SEEK_SET */
+ if (obj_desc->iscan != (IndexScanDesc) NULL)
+ {
+ d = Int32GetDatum(offset);
+ btmovescan(obj_desc->iscan, d);
}
+ else
+ {
- /* copy the data from this block into the buffer */
- d = (Datum) heap_getattr(htup, b, 2, obj_desc->hdesc, &isNull);
- fsblock = (struct varlena *) DatumGetPointer(d);
+ ScanKeyEntryInitialize(&skey, 0x0, 1, INT4GE_PROC_OID,
+ Int32GetDatum(offset));
- off = obj_desc->offset - obj_desc->lowbyte;
- ncopy = obj_desc->highbyte - obj_desc->offset + 1;
- if (ncopy > (nbytes - nread))
- ncopy = (nbytes - nread);
- memmove(buf, &(fsblock->vl_dat[off]), ncopy);
+ obj_desc->iscan = index_beginscan(obj_desc->index_r,
+ (bool) 0, (uint16) 1,
+ &skey);
+ }
- /* be a good citizen */
- ReleaseBuffer(b);
+ return (offset);
+}
- /* move pointers past the amount we just read */
- buf += ncopy;
- nread += ncopy;
- obj_desc->offset += ncopy;
- }
+int
+inv_tell(LargeObjectDesc * obj_desc)
+{
+ Assert(PointerIsValid(obj_desc));
- /* that's it */
- return (nread);
+ return (obj_desc->offset);
}
int
-inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes)
+inv_read(LargeObjectDesc * obj_desc, char *buf, int nbytes)
{
- HeapTuple htup;
- Buffer b;
- int nwritten;
- int tuplen;
+ HeapTuple htup;
+ Buffer b;
+ int nread;
+ int off;
+ int ncopy;
+ Datum d;
+ struct varlena *fsblock;
+ bool isNull;
+
+ Assert(PointerIsValid(obj_desc));
+ Assert(buf != NULL);
+
+ /* if we're already at EOF, we don't need to do any work here */
+ if (obj_desc->flags & IFS_ATEOF)
+ return (0);
+
+ /* make sure we obey two-phase locking */
+ if (!(obj_desc->flags & IFS_RDLOCK))
+ {
+ RelationSetLockForRead(obj_desc->heap_r);
+ obj_desc->flags |= IFS_RDLOCK;
+ }
+
+ nread = 0;
+
+ /* fetch a block at a time */
+ while (nread < nbytes)
+ {
+
+ /* fetch an inversion file system block */
+ htup = inv_fetchtup(obj_desc, &b);
- Assert(PointerIsValid(obj_desc));
- Assert(buf != NULL);
+ if (!HeapTupleIsValid(htup))
+ {
+ obj_desc->flags |= IFS_ATEOF;
+ break;
+ }
- /*
- * Make sure we obey two-phase locking. A write lock entitles you
- * to read the relation, as well.
- */
+ /* copy the data from this block into the buffer */
+ d = (Datum) heap_getattr(htup, b, 2, obj_desc->hdesc, &isNull);
+ fsblock = (struct varlena *) DatumGetPointer(d);
- if (!(obj_desc->flags & IFS_WRLOCK)) {
- RelationSetLockForRead(obj_desc->heap_r);
- obj_desc->flags |= (IFS_WRLOCK|IFS_RDLOCK);
- }
+ off = obj_desc->offset - obj_desc->lowbyte;
+ ncopy = obj_desc->highbyte - obj_desc->offset + 1;
+ if (ncopy > (nbytes - nread))
+ ncopy = (nbytes - nread);
+ memmove(buf, &(fsblock->vl_dat[off]), ncopy);
- nwritten = 0;
+ /* be a good citizen */
+ ReleaseBuffer(b);
- /* write a block at a time */
- while (nwritten < nbytes) {
+ /* move pointers past the amount we just read */
+ buf += ncopy;
+ nread += ncopy;
+ obj_desc->offset += ncopy;
+ }
+
+ /* that's it */
+ return (nread);
+}
+
+int
+inv_write(LargeObjectDesc * obj_desc, char *buf, int nbytes)
+{
+ HeapTuple htup;
+ Buffer b;
+ int nwritten;
+ int tuplen;
+
+ Assert(PointerIsValid(obj_desc));
+ Assert(buf != NULL);
/*
- * Fetch the current inversion file system block. If the
- * class storing the inversion file is empty, we don't want
- * to do an index lookup, since index lookups choke on empty
- * files (should be fixed someday).
+ * Make sure we obey two-phase locking. A write lock entitles you to
+ * read the relation, as well.
*/
- if ((obj_desc->flags & IFS_ATEOF)
- || obj_desc->heap_r->rd_nblocks == 0)
- htup = (HeapTuple) NULL;
- else
- htup = inv_fetchtup(obj_desc, &b);
-
- /* either append or replace a block, as required */
- if (!HeapTupleIsValid(htup)) {
- tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten);
- } else {
- if (obj_desc->offset > obj_desc->highbyte)
- tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten);
- else
- tuplen = inv_wrold(obj_desc, buf, nbytes - nwritten, htup, b);
+ if (!(obj_desc->flags & IFS_WRLOCK))
+ {
+ RelationSetLockForRead(obj_desc->heap_r);
+ obj_desc->flags |= (IFS_WRLOCK | IFS_RDLOCK);
}
- /* move pointers past the amount we just wrote */
- buf += tuplen;
- nwritten += tuplen;
- obj_desc->offset += tuplen;
- }
+ nwritten = 0;
+
+ /* write a block at a time */
+ while (nwritten < nbytes)
+ {
+
+ /*
+ * Fetch the current inversion file system block. If the class
+ * storing the inversion file is empty, we don't want to do an
+ * index lookup, since index lookups choke on empty files (should
+ * be fixed someday).
+ */
+
+ if ((obj_desc->flags & IFS_ATEOF)
+ || obj_desc->heap_r->rd_nblocks == 0)
+ htup = (HeapTuple) NULL;
+ else
+ htup = inv_fetchtup(obj_desc, &b);
+
+ /* either append or replace a block, as required */
+ if (!HeapTupleIsValid(htup))
+ {
+ tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten);
+ }
+ else
+ {
+ if (obj_desc->offset > obj_desc->highbyte)
+ tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten);
+ else
+ tuplen = inv_wrold(obj_desc, buf, nbytes - nwritten, htup, b);
+ }
+
+ /* move pointers past the amount we just wrote */
+ buf += tuplen;
+ nwritten += tuplen;
+ obj_desc->offset += tuplen;
+ }
- /* that's it */
- return (nwritten);
+ /* that's it */
+ return (nwritten);
}
/*
- * inv_fetchtup -- Fetch an inversion file system block.
+ * inv_fetchtup -- Fetch an inversion file system block.
*
- * This routine finds the file system block containing the offset
- * recorded in the obj_desc structure. Later, we need to think about
- * the effects of non-functional updates (can you rewrite the same
- * block twice in a single transaction?), but for now, we won't bother.
+ * This routine finds the file system block containing the offset
+ * recorded in the obj_desc structure. Later, we need to think about
+ * the effects of non-functional updates (can you rewrite the same
+ * block twice in a single transaction?), but for now, we won't bother.
*
- * Parameters:
- * obj_desc -- the object descriptor.
- * bufP -- pointer to a buffer in the buffer cache; caller
- * must free this.
+ * Parameters:
+ * obj_desc -- the object descriptor.
+ * bufP -- pointer to a buffer in the buffer cache; caller
+ * must free this.
*
- * Returns:
- * A heap tuple containing the desired block, or NULL if no
- * such tuple exists.
+ * Returns:
+ * A heap tuple containing the desired block, or NULL if no
+ * such tuple exists.
*/
-static HeapTuple
-inv_fetchtup(LargeObjectDesc *obj_desc, Buffer *bufP)
+static HeapTuple
+inv_fetchtup(LargeObjectDesc * obj_desc, Buffer * bufP)
{
- HeapTuple htup;
- RetrieveIndexResult res;
- Datum d;
- int firstbyte, lastbyte;
- struct varlena *fsblock;
- bool isNull;
-
- /*
- * If we've exhausted the current block, we need to get the next one.
- * When we support time travel and non-functional updates, we will
- * need to loop over the blocks, rather than just have an 'if', in
- * order to find the one we're really interested in.
- */
-
- if (obj_desc->offset > obj_desc->highbyte
- || obj_desc->offset < obj_desc->lowbyte
- || !ItemPointerIsValid(&(obj_desc->htid))) {
-
- /* initialize scan key if not done */
- if (obj_desc->iscan==(IndexScanDesc)NULL) {
- ScanKeyData skey;
-
- ScanKeyEntryInitialize(&skey, 0x0, 1, INT4GE_PROC_OID,
- Int32GetDatum(0));
- obj_desc->iscan =
- index_beginscan(obj_desc->index_r,
- (bool) 0, (uint16) 1,
- &skey);
- }
+ HeapTuple htup;
+ RetrieveIndexResult res;
+ Datum d;
+ int firstbyte,
+ lastbyte;
+ struct varlena *fsblock;
+ bool isNull;
- do {
- res = index_getnext(obj_desc->iscan, ForwardScanDirection);
+ /*
+ * If we've exhausted the current block, we need to get the next one.
+ * When we support time travel and non-functional updates, we will
+ * need to loop over the blocks, rather than just have an 'if', in
+ * order to find the one we're really interested in.
+ */
+
+ if (obj_desc->offset > obj_desc->highbyte
+ || obj_desc->offset < obj_desc->lowbyte
+ || !ItemPointerIsValid(&(obj_desc->htid)))
+ {
- if (res == (RetrieveIndexResult) NULL) {
- ItemPointerSetInvalid(&(obj_desc->htid));
- return ((HeapTuple) NULL);
- }
+ /* initialize scan key if not done */
+ if (obj_desc->iscan == (IndexScanDesc) NULL)
+ {
+ ScanKeyData skey;
- /*
- * For time travel, we need to use the actual time qual here,
- * rather that NowTimeQual. We currently have no way to pass
- * a time qual in.
- */
+ ScanKeyEntryInitialize(&skey, 0x0, 1, INT4GE_PROC_OID,
+ Int32GetDatum(0));
+ obj_desc->iscan =
+ index_beginscan(obj_desc->index_r,
+ (bool) 0, (uint16) 1,
+ &skey);
+ }
- htup = heap_fetch(obj_desc->heap_r, NowTimeQual,
- &(res->heap_iptr), bufP);
+ do
+ {
+ res = index_getnext(obj_desc->iscan, ForwardScanDirection);
- } while (htup == (HeapTuple) NULL);
+ if (res == (RetrieveIndexResult) NULL)
+ {
+ ItemPointerSetInvalid(&(obj_desc->htid));
+ return ((HeapTuple) NULL);
+ }
- /* remember this tid -- we may need it for later reads/writes */
- ItemPointerCopy(&(res->heap_iptr), &(obj_desc->htid));
+ /*
+ * For time travel, we need to use the actual time qual here,
+ * rather than NowTimeQual. We currently have no way to pass
+ * a time qual in.
+ */
- } else {
- htup = heap_fetch(obj_desc->heap_r, NowTimeQual,
- &(obj_desc->htid), bufP);
- }
+ htup = heap_fetch(obj_desc->heap_r, NowTimeQual,
+ &(res->heap_iptr), bufP);
- /*
- * By here, we have the heap tuple we're interested in. We cache
- * the upper and lower bounds for this block in the object descriptor
- * and return the tuple.
- */
+ } while (htup == (HeapTuple) NULL);
+
+ /* remember this tid -- we may need it for later reads/writes */
+ ItemPointerCopy(&(res->heap_iptr), &(obj_desc->htid));
+
+ }
+ else
+ {
+ htup = heap_fetch(obj_desc->heap_r, NowTimeQual,
+ &(obj_desc->htid), bufP);
+ }
+
+ /*
+ * By here, we have the heap tuple we're interested in. We cache the
+ * upper and lower bounds for this block in the object descriptor and
+ * return the tuple.
+ */
- d = (Datum)heap_getattr(htup, *bufP, 1, obj_desc->hdesc, &isNull);
- lastbyte = (int32) DatumGetInt32(d);
- d = (Datum)heap_getattr(htup, *bufP, 2, obj_desc->hdesc, &isNull);
- fsblock = (struct varlena *) DatumGetPointer(d);
+ d = (Datum) heap_getattr(htup, *bufP, 1, obj_desc->hdesc, &isNull);
+ lastbyte = (int32) DatumGetInt32(d);
+ d = (Datum) heap_getattr(htup, *bufP, 2, obj_desc->hdesc, &isNull);
+ fsblock = (struct varlena *) DatumGetPointer(d);
- /* order of + and - is important -- these are unsigned quantites near 0 */
- firstbyte = (lastbyte + 1 + sizeof(fsblock->vl_len)) - fsblock->vl_len;
+ /*
+ * order of + and - is important -- these are unsigned quantities near
+ * 0
+ */
+ firstbyte = (lastbyte + 1 + sizeof(fsblock->vl_len)) - fsblock->vl_len;
- obj_desc->lowbyte = firstbyte;
- obj_desc->highbyte = lastbyte;
+ obj_desc->lowbyte = firstbyte;
+ obj_desc->highbyte = lastbyte;
- /* done */
- return (htup);
+ /* done */
+ return (htup);
}
/*
- * inv_wrnew() -- append a new filesystem block tuple to the inversion
- * file.
+ * inv_wrnew() -- append a new filesystem block tuple to the inversion
+ * file.
*
- * In response to an inv_write, we append one or more file system
- * blocks to the class containing the large object. We violate the
- * class abstraction here in order to pack things as densely as we
- * are able. We examine the last page in the relation, and write
- * just enough to fill it, assuming that it has above a certain
- * threshold of space available. If the space available is less than
- * the threshold, we allocate a new page by writing a big tuple.
+ * In response to an inv_write, we append one or more file system
+ * blocks to the class containing the large object. We violate the
+ * class abstraction here in order to pack things as densely as we
+ * are able. We examine the last page in the relation, and write
+ * just enough to fill it, assuming that it has above a certain
+ * threshold of space available. If the space available is less than
+ * the threshold, we allocate a new page by writing a big tuple.
*
- * By the time we get here, we know all the parameters passed in
- * are valid, and that we hold the appropriate lock on the heap
- * relation.
+ * By the time we get here, we know all the parameters passed in
+ * are valid, and that we hold the appropriate lock on the heap
+ * relation.
*
- * Parameters:
- * obj_desc: large object descriptor for which to append block.
- * buf: buffer containing data to write.
- * nbytes: amount to write
+ * Parameters:
+ * obj_desc: large object descriptor for which to append block.
+ * buf: buffer containing data to write.
+ * nbytes: amount to write
*
- * Returns:
- * number of bytes actually written to the new tuple.
+ * Returns:
+ * number of bytes actually written to the new tuple.
*/
static int
-inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes)
+inv_wrnew(LargeObjectDesc * obj_desc, char *buf, int nbytes)
{
- Relation hr;
- HeapTuple ntup;
- Buffer buffer;
- Page page;
- int nblocks;
- int nwritten;
-
- hr = obj_desc->heap_r;
-
- /*
- * Get the last block in the relation. If there's no data in the
- * relation at all, then we just get a new block. Otherwise, we
- * check the last block to see whether it has room to accept some
- * or all of the data that the user wants to write. If it doesn't,
- * then we allocate a new block.
- */
-
- nblocks = RelationGetNumberOfBlocks(hr);
-
- if (nblocks > 0)
- buffer = ReadBuffer(hr, nblocks - 1);
- else
- buffer = ReadBuffer(hr, P_NEW);
-
- page = BufferGetPage(buffer);
-
- /*
- * If the last page is too small to hold all the data, and it's too
- * small to hold IMINBLK, then we allocate a new page. If it will
- * hold at least IMINBLK, but less than all the data requested, then
- * we write IMINBLK here. The caller is responsible for noticing that
- * less than the requested number of bytes were written, and calling
- * this routine again.
- */
-
- nwritten = IFREESPC(page);
- if (nwritten < nbytes) {
- if (nwritten < IMINBLK) {
- ReleaseBuffer(buffer);
- buffer = ReadBuffer(hr, P_NEW);
- page = BufferGetPage(buffer);
- PageInit(page, BufferGetPageSize(buffer), 0);
- if (nbytes > IMAXBLK)
- nwritten = IMAXBLK;
- else
+ Relation hr;
+ HeapTuple ntup;
+ Buffer buffer;
+ Page page;
+ int nblocks;
+ int nwritten;
+
+ hr = obj_desc->heap_r;
+
+ /*
+ * Get the last block in the relation. If there's no data in the
+ * relation at all, then we just get a new block. Otherwise, we check
+ * the last block to see whether it has room to accept some or all of
+ * the data that the user wants to write. If it doesn't, then we
+ * allocate a new block.
+ */
+
+ nblocks = RelationGetNumberOfBlocks(hr);
+
+ if (nblocks > 0)
+ buffer = ReadBuffer(hr, nblocks - 1);
+ else
+ buffer = ReadBuffer(hr, P_NEW);
+
+ page = BufferGetPage(buffer);
+
+ /*
+ * If the last page is too small to hold all the data, and it's too
+ * small to hold IMINBLK, then we allocate a new page. If it will
+ * hold at least IMINBLK, but less than all the data requested, then
+ * we write IMINBLK here. The caller is responsible for noticing that
+ * less than the requested number of bytes were written, and calling
+ * this routine again.
+ */
+
+ nwritten = IFREESPC(page);
+ if (nwritten < nbytes)
+ {
+ if (nwritten < IMINBLK)
+ {
+ ReleaseBuffer(buffer);
+ buffer = ReadBuffer(hr, P_NEW);
+ page = BufferGetPage(buffer);
+ PageInit(page, BufferGetPageSize(buffer), 0);
+ if (nbytes > IMAXBLK)
+ nwritten = IMAXBLK;
+ else
+ nwritten = nbytes;
+ }
+ }
+ else
+ {
nwritten = nbytes;
}
- } else {
- nwritten = nbytes;
- }
- /*
- * Insert a new file system block tuple, index it, and write it out.
- */
+ /*
+ * Insert a new file system block tuple, index it, and write it out.
+ */
- ntup = inv_newtuple(obj_desc, buffer, page, buf, nwritten);
- inv_indextup(obj_desc, ntup);
+ ntup = inv_newtuple(obj_desc, buffer, page, buf, nwritten);
+ inv_indextup(obj_desc, ntup);
- /* new tuple is inserted */
- WriteBuffer(buffer);
+ /* new tuple is inserted */
+ WriteBuffer(buffer);
- return (nwritten);
+ return (nwritten);
}
static int
-inv_wrold(LargeObjectDesc *obj_desc,
- char *dbuf,
- int nbytes,
- HeapTuple htup,
- Buffer buffer)
+inv_wrold(LargeObjectDesc * obj_desc,
+ char *dbuf,
+ int nbytes,
+ HeapTuple htup,
+ Buffer buffer)
{
- Relation hr;
- HeapTuple ntup;
- Buffer newbuf;
- Page page;
- Page newpage;
- int tupbytes;
- Datum d;
- struct varlena *fsblock;
- int nwritten, nblocks, freespc;
- bool isNull;
- int keep_offset;
-
- /*
- * Since we're using a no-overwrite storage manager, the way we
- * overwrite blocks is to mark the old block invalid and append
- * a new block. First mark the old block invalid. This violates
- * the tuple abstraction.
- */
-
- TransactionIdStore(GetCurrentTransactionId(), &(htup->t_xmax));
- htup->t_cmax = GetCurrentCommandId();
-
- /*
- * If we're overwriting the entire block, we're lucky. All we need
- * to do is to insert a new block.
- */
-
- if (obj_desc->offset == obj_desc->lowbyte
- && obj_desc->lowbyte + nbytes >= obj_desc->highbyte) {
- WriteBuffer(buffer);
- return (inv_wrnew(obj_desc, dbuf, nbytes));
- }
-
- /*
- * By here, we need to overwrite part of the data in the current
- * tuple. In order to reduce the degree to which we fragment blocks,
- * we guarantee that no block will be broken up due to an overwrite.
- * This means that we need to allocate a tuple on a new page, if
- * there's not room for the replacement on this one.
- */
-
- newbuf = buffer;
- page = BufferGetPage(buffer);
- newpage = BufferGetPage(newbuf);
- hr = obj_desc->heap_r;
- freespc = IFREESPC(page);
- d = (Datum)heap_getattr(htup, buffer, 2, obj_desc->hdesc, &isNull);
- fsblock = (struct varlena *) DatumGetPointer(d);
- tupbytes = fsblock->vl_len - sizeof(fsblock->vl_len);
-
- if (freespc < tupbytes) {
+ Relation hr;
+ HeapTuple ntup;
+ Buffer newbuf;
+ Page page;
+ Page newpage;
+ int tupbytes;
+ Datum d;
+ struct varlena *fsblock;
+ int nwritten,
+ nblocks,
+ freespc;
+ bool isNull;
+ int keep_offset;
/*
- * First see if there's enough space on the last page of the
- * table to put this tuple.
+ * Since we're using a no-overwrite storage manager, the way we
+ * overwrite blocks is to mark the old block invalid and append a new
+ * block. First mark the old block invalid. This violates the tuple
+ * abstraction.
*/
- nblocks = RelationGetNumberOfBlocks(hr);
+ TransactionIdStore(GetCurrentTransactionId(), &(htup->t_xmax));
+ htup->t_cmax = GetCurrentCommandId();
- if (nblocks > 0)
- newbuf = ReadBuffer(hr, nblocks - 1);
- else
- newbuf = ReadBuffer(hr, P_NEW);
+ /*
+ * If we're overwriting the entire block, we're lucky. All we need to
+ * do is to insert a new block.
+ */
- newpage = BufferGetPage(newbuf);
- freespc = IFREESPC(newpage);
+ if (obj_desc->offset == obj_desc->lowbyte
+ && obj_desc->lowbyte + nbytes >= obj_desc->highbyte)
+ {
+ WriteBuffer(buffer);
+ return (inv_wrnew(obj_desc, dbuf, nbytes));
+ }
/*
- * If there's no room on the last page, allocate a new last
- * page for the table, and put it there.
+ * By here, we need to overwrite part of the data in the current
+ * tuple. In order to reduce the degree to which we fragment blocks,
+ * we guarantee that no block will be broken up due to an overwrite.
+ * This means that we need to allocate a tuple on a new page, if
+ * there's not room for the replacement on this one.
*/
- if (freespc < tupbytes) {
- ReleaseBuffer(newbuf);
- newbuf = ReadBuffer(hr, P_NEW);
- newpage = BufferGetPage(newbuf);
- PageInit(newpage, BufferGetPageSize(newbuf), 0);
+ newbuf = buffer;
+ page = BufferGetPage(buffer);
+ newpage = BufferGetPage(newbuf);
+ hr = obj_desc->heap_r;
+ freespc = IFREESPC(page);
+ d = (Datum) heap_getattr(htup, buffer, 2, obj_desc->hdesc, &isNull);
+ fsblock = (struct varlena *) DatumGetPointer(d);
+ tupbytes = fsblock->vl_len - sizeof(fsblock->vl_len);
+
+ if (freespc < tupbytes)
+ {
+
+ /*
+ * First see if there's enough space on the last page of the table
+ * to put this tuple.
+ */
+
+ nblocks = RelationGetNumberOfBlocks(hr);
+
+ if (nblocks > 0)
+ newbuf = ReadBuffer(hr, nblocks - 1);
+ else
+ newbuf = ReadBuffer(hr, P_NEW);
+
+ newpage = BufferGetPage(newbuf);
+ freespc = IFREESPC(newpage);
+
+ /*
+ * If there's no room on the last page, allocate a new last page
+ * for the table, and put it there.
+ */
+
+ if (freespc < tupbytes)
+ {
+ ReleaseBuffer(newbuf);
+ newbuf = ReadBuffer(hr, P_NEW);
+ newpage = BufferGetPage(newbuf);
+ PageInit(newpage, BufferGetPageSize(newbuf), 0);
+ }
}
- }
-
- nwritten = nbytes;
- if (nwritten > obj_desc->highbyte - obj_desc->offset + 1)
- nwritten = obj_desc->highbyte - obj_desc->offset + 1;
- memmove(VARDATA(fsblock)+ (obj_desc->offset - obj_desc->lowbyte),
- dbuf,nwritten);
- /* we are rewriting the entire old block, therefore
- we reset offset to the lowbyte of the original block
- before jumping into inv_newtuple() */
- keep_offset = obj_desc->offset;
- obj_desc->offset = obj_desc->lowbyte;
- ntup = inv_newtuple(obj_desc, newbuf, newpage, VARDATA(fsblock),
- tupbytes);
- /* after we are done, we restore to the true offset */
- obj_desc->offset = keep_offset;
-
- /*
- * By here, we have a page (newpage) that's guaranteed to have
- * enough space on it to put the new tuple. Call inv_newtuple
- * to do the work. Passing NULL as a buffer to inv_newtuple()
- * keeps it from copying any data into the new tuple. When it
- * returns, the tuple is ready to receive data from the old
- * tuple and the user's data buffer.
- */
-/*
- ntup = inv_newtuple(obj_desc, newbuf, newpage, (char *) NULL, tupbytes);
- dptr = ((char *) ntup) + ntup->t_hoff - sizeof(ntup->t_bits) + sizeof(int4)
- + sizeof(fsblock->vl_len);
- if (obj_desc->offset > obj_desc->lowbyte) {
- memmove(dptr,
- &(fsblock->vl_dat[0]),
- obj_desc->offset - obj_desc->lowbyte);
- dptr += obj_desc->offset - obj_desc->lowbyte;
- }
+ nwritten = nbytes;
+ if (nwritten > obj_desc->highbyte - obj_desc->offset + 1)
+ nwritten = obj_desc->highbyte - obj_desc->offset + 1;
+ memmove(VARDATA(fsblock) + (obj_desc->offset - obj_desc->lowbyte),
+ dbuf, nwritten);
+
+ /*
+ * we are rewriting the entire old block, therefore we reset offset to
+ * the lowbyte of the original block before jumping into
+ * inv_newtuple()
+ */
+ keep_offset = obj_desc->offset;
+ obj_desc->offset = obj_desc->lowbyte;
+ ntup = inv_newtuple(obj_desc, newbuf, newpage, VARDATA(fsblock),
+ tupbytes);
+ /* after we are done, we restore to the true offset */
+ obj_desc->offset = keep_offset;
+
+ /*
+ * By here, we have a page (newpage) that's guaranteed to have enough
+ * space on it to put the new tuple. Call inv_newtuple to do the
+ * work. Passing NULL as a buffer to inv_newtuple() keeps it from
+ * copying any data into the new tuple. When it returns, the tuple is
+ * ready to receive data from the old tuple and the user's data
+ * buffer.
+ */
+/*
+ ntup = inv_newtuple(obj_desc, newbuf, newpage, (char *) NULL, tupbytes);
+ dptr = ((char *) ntup) + ntup->t_hoff - sizeof(ntup->t_bits) + sizeof(int4)
+ + sizeof(fsblock->vl_len);
+
+ if (obj_desc->offset > obj_desc->lowbyte) {
+ memmove(dptr,
+ &(fsblock->vl_dat[0]),
+ obj_desc->offset - obj_desc->lowbyte);
+ dptr += obj_desc->offset - obj_desc->lowbyte;
+ }
- nwritten = nbytes;
- if (nwritten > obj_desc->highbyte - obj_desc->offset + 1)
- nwritten = obj_desc->highbyte - obj_desc->offset + 1;
+ nwritten = nbytes;
+ if (nwritten > obj_desc->highbyte - obj_desc->offset + 1)
+ nwritten = obj_desc->highbyte - obj_desc->offset + 1;
- memmove(dptr, dbuf, nwritten);
- dptr += nwritten;
+ memmove(dptr, dbuf, nwritten);
+ dptr += nwritten;
- if (obj_desc->offset + nwritten < obj_desc->highbyte + 1) {
+ if (obj_desc->offset + nwritten < obj_desc->highbyte + 1) {
*/
/*
- loc = (obj_desc->highbyte - obj_desc->offset)
- + nwritten;
- sz = obj_desc->highbyte - (obj_desc->lowbyte + loc);
+ loc = (obj_desc->highbyte - obj_desc->offset)
+ + nwritten;
+ sz = obj_desc->highbyte - (obj_desc->lowbyte + loc);
- what's going on here?? - jolly
+ what's going on here?? - jolly
*/
/*
- sz = (obj_desc->highbyte + 1) - (obj_desc->offset + nwritten);
- memmove(&(fsblock->vl_dat[0]), dptr, sz);
- }
+ sz = (obj_desc->highbyte + 1) - (obj_desc->offset + nwritten);
+ memmove(&(fsblock->vl_dat[0]), dptr, sz);
+ }
*/
- /* index the new tuple */
- inv_indextup(obj_desc, ntup);
+ /* index the new tuple */
+ inv_indextup(obj_desc, ntup);
- /* move the scandesc forward so we don't reread the newly inserted
- tuple on the next index scan */
- if (obj_desc->iscan)
- index_getnext(obj_desc->iscan, ForwardScanDirection);
+ /*
+ * move the scandesc forward so we don't reread the newly inserted
+ * tuple on the next index scan
+ */
+ if (obj_desc->iscan)
+ index_getnext(obj_desc->iscan, ForwardScanDirection);
- /*
- * Okay, by here, a tuple for the new block is correctly placed,
- * indexed, and filled. Write the changed pages out.
- */
+ /*
+ * Okay, by here, a tuple for the new block is correctly placed,
+ * indexed, and filled. Write the changed pages out.
+ */
- WriteBuffer(buffer);
- if (newbuf != buffer)
- WriteBuffer(newbuf);
+ WriteBuffer(buffer);
+ if (newbuf != buffer)
+ WriteBuffer(newbuf);
- /* done */
- return (nwritten);
+ /* done */
+ return (nwritten);
}
-static HeapTuple
-inv_newtuple(LargeObjectDesc *obj_desc,
- Buffer buffer,
- Page page,
- char *dbuf,
- int nwrite)
+static HeapTuple
+inv_newtuple(LargeObjectDesc * obj_desc,
+ Buffer buffer,
+ Page page,
+ char *dbuf,
+ int nwrite)
{
- HeapTuple ntup;
- PageHeader ph;
- int tupsize;
- int hoff;
- Offset lower;
- Offset upper;
- ItemId itemId;
- OffsetNumber off;
- OffsetNumber limit;
- char *attptr;
-
- /* compute tuple size -- no nulls */
- hoff = sizeof(HeapTupleData) - sizeof(ntup->t_bits);
-
- /* add in olastbyte, varlena.vl_len, varlena.vl_dat */
- tupsize = hoff + (2 * sizeof(int32)) + nwrite;
- tupsize = LONGALIGN(tupsize);
-
- /*
- * Allocate the tuple on the page, violating the page abstraction.
- * This code was swiped from PageAddItem().
- */
-
- ph = (PageHeader) page;
- limit = OffsetNumberNext(PageGetMaxOffsetNumber(page));
-
- /* look for "recyclable" (unused & deallocated) ItemId */
- for (off = FirstOffsetNumber; off < limit; off = OffsetNumberNext(off)) {
+ HeapTuple ntup;
+ PageHeader ph;
+ int tupsize;
+ int hoff;
+ Offset lower;
+ Offset upper;
+ ItemId itemId;
+ OffsetNumber off;
+ OffsetNumber limit;
+ char *attptr;
+
+ /* compute tuple size -- no nulls */
+ hoff = sizeof(HeapTupleData) - sizeof(ntup->t_bits);
+
+ /* add in olastbyte, varlena.vl_len, varlena.vl_dat */
+ tupsize = hoff + (2 * sizeof(int32)) + nwrite;
+ tupsize = LONGALIGN(tupsize);
+
+ /*
+ * Allocate the tuple on the page, violating the page abstraction.
+ * This code was swiped from PageAddItem().
+ */
+
+ ph = (PageHeader) page;
+ limit = OffsetNumberNext(PageGetMaxOffsetNumber(page));
+
+ /* look for "recyclable" (unused & deallocated) ItemId */
+ for (off = FirstOffsetNumber; off < limit; off = OffsetNumberNext(off))
+ {
+ itemId = &ph->pd_linp[off - 1];
+ if ((((*itemId).lp_flags & LP_USED) == 0) &&
+ ((*itemId).lp_len == 0))
+ break;
+ }
+
+ if (off > limit)
+ lower = (Offset) (((char *) (&ph->pd_linp[off])) - ((char *) page));
+ else if (off == limit)
+ lower = ph->pd_lower + sizeof(ItemIdData);
+ else
+ lower = ph->pd_lower;
+
+ upper = ph->pd_upper - tupsize;
+
itemId = &ph->pd_linp[off - 1];
- if ((((*itemId).lp_flags & LP_USED) == 0) &&
- ((*itemId).lp_len == 0))
- break;
- }
-
- if (off > limit)
- lower = (Offset) (((char *) (&ph->pd_linp[off])) - ((char *) page));
- else if (off == limit)
- lower = ph->pd_lower + sizeof (ItemIdData);
- else
- lower = ph->pd_lower;
-
- upper = ph->pd_upper - tupsize;
-
- itemId = &ph->pd_linp[off - 1];
- (*itemId).lp_off = upper;
- (*itemId).lp_len = tupsize;
- (*itemId).lp_flags = LP_USED;
- ph->pd_lower = lower;
- ph->pd_upper = upper;
-
- ntup = (HeapTuple) ((char *) page + upper);
-
- /*
- * Tuple is now allocated on the page. Next, fill in the tuple
- * header. This block of code violates the tuple abstraction.
- */
-
- ntup->t_len = tupsize;
- ItemPointerSet(&(ntup->t_ctid), BufferGetBlockNumber(buffer), off);
- ItemPointerSetInvalid(&(ntup->t_chain));
- LastOidProcessed = ntup->t_oid = newoid();
- TransactionIdStore(GetCurrentTransactionId(), &(ntup->t_xmin));
- ntup->t_cmin = GetCurrentCommandId();
- StoreInvalidTransactionId(&(ntup->t_xmax));
- ntup->t_cmax = 0;
- ntup->t_tmin = INVALID_ABSTIME;
- ntup->t_tmax = CURRENT_ABSTIME;
- ntup->t_natts = 2;
- ntup->t_hoff = hoff;
- ntup->t_vtype = 0;
- ntup->t_infomask = 0x0;
-
- /* if a NULL is passed in, avoid the calculations below */
- if (dbuf == NULL)
- return ntup;
-
- /*
- * Finally, copy the user's data buffer into the tuple. This violates
- * the tuple and class abstractions.
- */
-
- attptr = ((char *) ntup) + hoff;
- *((int32 *) attptr) = obj_desc->offset + nwrite - 1;
- attptr += sizeof(int32);
-
- /*
- ** mer fixed disk layout of varlenas to get rid of the need for this.
- **
- ** *((int32 *) attptr) = nwrite + sizeof(int32);
- ** attptr += sizeof(int32);
- */
-
- *((int32 *) attptr) = nwrite + sizeof(int32);
- attptr += sizeof(int32);
-
- /*
- * If a data buffer was passed in, then copy the data from the buffer
- * to the tuple. Some callers (eg, inv_wrold()) may not pass in a
- * buffer, since they have to copy part of the old tuple data and
- * part of the user's new data into the new tuple.
- */
-
- if (dbuf != (char *) NULL)
- memmove(attptr, dbuf, nwrite);
-
- /* keep track of boundary of current tuple */
- obj_desc->lowbyte = obj_desc->offset;
- obj_desc->highbyte = obj_desc->offset + nwrite - 1;
-
- /* new tuple is filled -- return it */
- return (ntup);
+ (*itemId).lp_off = upper;
+ (*itemId).lp_len = tupsize;
+ (*itemId).lp_flags = LP_USED;
+ ph->pd_lower = lower;
+ ph->pd_upper = upper;
+
+ ntup = (HeapTuple) ((char *) page + upper);
+
+ /*
+ * Tuple is now allocated on the page. Next, fill in the tuple
+ * header. This block of code violates the tuple abstraction.
+ */
+
+ ntup->t_len = tupsize;
+ ItemPointerSet(&(ntup->t_ctid), BufferGetBlockNumber(buffer), off);
+ ItemPointerSetInvalid(&(ntup->t_chain));
+ LastOidProcessed = ntup->t_oid = newoid();
+ TransactionIdStore(GetCurrentTransactionId(), &(ntup->t_xmin));
+ ntup->t_cmin = GetCurrentCommandId();
+ StoreInvalidTransactionId(&(ntup->t_xmax));
+ ntup->t_cmax = 0;
+ ntup->t_tmin = INVALID_ABSTIME;
+ ntup->t_tmax = CURRENT_ABSTIME;
+ ntup->t_natts = 2;
+ ntup->t_hoff = hoff;
+ ntup->t_vtype = 0;
+ ntup->t_infomask = 0x0;
+
+ /* if a NULL is passed in, avoid the calculations below */
+ if (dbuf == NULL)
+ return ntup;
+
+ /*
+ * Finally, copy the user's data buffer into the tuple. This violates
+ * the tuple and class abstractions.
+ */
+
+ attptr = ((char *) ntup) + hoff;
+ *((int32 *) attptr) = obj_desc->offset + nwrite - 1;
+ attptr += sizeof(int32);
+
+ /*
+ * * mer fixed disk layout of varlenas to get rid of the need for
+ * this. *
+ *
+ * *((int32 *) attptr) = nwrite + sizeof(int32); * attptr +=
+ * sizeof(int32);
+ */
+
+ *((int32 *) attptr) = nwrite + sizeof(int32);
+ attptr += sizeof(int32);
+
+ /*
+ * If a data buffer was passed in, then copy the data from the buffer
+ * to the tuple. Some callers (eg, inv_wrold()) may not pass in a
+ * buffer, since they have to copy part of the old tuple data and part
+ * of the user's new data into the new tuple.
+ */
+
+ if (dbuf != (char *) NULL)
+ memmove(attptr, dbuf, nwrite);
+
+ /* keep track of boundary of current tuple */
+ obj_desc->lowbyte = obj_desc->offset;
+ obj_desc->highbyte = obj_desc->offset + nwrite - 1;
+
+ /* new tuple is filled -- return it */
+ return (ntup);
}
static void
-inv_indextup(LargeObjectDesc *obj_desc, HeapTuple htup)
+inv_indextup(LargeObjectDesc * obj_desc, HeapTuple htup)
{
- InsertIndexResult res;
- Datum v[1];
- char n[1];
+ InsertIndexResult res;
+ Datum v[1];
+ char n[1];
- n[0] = ' ';
- v[0] = Int32GetDatum(obj_desc->highbyte);
- res = index_insert(obj_desc->index_r, &v[0], &n[0],
- &(htup->t_ctid), obj_desc->heap_r);
+ n[0] = ' ';
+ v[0] = Int32GetDatum(obj_desc->highbyte);
+ res = index_insert(obj_desc->index_r, &v[0], &n[0],
+ &(htup->t_ctid), obj_desc->heap_r);
- if (res)
- pfree(res);
+ if (res)
+ pfree(res);
}
/*
static void
DumpPage(Page page, int blkno)
{
- ItemId lp;
- HeapTuple tup;
- int flags, i, nline;
- ItemPointerData pointerData;
+ ItemId lp;
+ HeapTuple tup;
+ int flags, i, nline;
+ ItemPointerData pointerData;
+
+ printf("\t[subblock=%d]:lower=%d:upper=%d:special=%d\n", 0,
+ ((PageHeader)page)->pd_lower, ((PageHeader)page)->pd_upper,
+ ((PageHeader)page)->pd_special);
- printf("\t[subblock=%d]:lower=%d:upper=%d:special=%d\n", 0,
- ((PageHeader)page)->pd_lower, ((PageHeader)page)->pd_upper,
- ((PageHeader)page)->pd_special);
+ printf("\t:MaxOffsetNumber=%d\n",
+ (int16) PageGetMaxOffsetNumber(page));
- printf("\t:MaxOffsetNumber=%d\n",
- (int16) PageGetMaxOffsetNumber(page));
-
- nline = (int16) PageGetMaxOffsetNumber(page);
+ nline = (int16) PageGetMaxOffsetNumber(page);
{
- int i;
- char *cp;
+ int i;
+ char *cp;
- i = PageGetSpecialSize(page);
- cp = PageGetSpecialPointer(page);
+ i = PageGetSpecialSize(page);
+ cp = PageGetSpecialPointer(page);
- printf("\t:SpecialData=");
+ printf("\t:SpecialData=");
- while (i > 0) {
- printf(" 0x%02x", *cp);
- cp += 1;
- i -= 1;
- }
- printf("\n");
+ while (i > 0) {
+ printf(" 0x%02x", *cp);
+ cp += 1;
+ i -= 1;
+ }
+ printf("\n");
}
- for (i = 0; i < nline; i++) {
- lp = ((PageHeader)page)->pd_linp + i;
- flags = (*lp).lp_flags;
- ItemPointerSet(&pointerData, blkno, 1 + i);
- printf("%s:off=%d:flags=0x%x:len=%d",
- ItemPointerFormExternal(&pointerData), (*lp).lp_off,
- flags, (*lp).lp_len);
+ for (i = 0; i < nline; i++) {
+ lp = ((PageHeader)page)->pd_linp + i;
+ flags = (*lp).lp_flags;
+ ItemPointerSet(&pointerData, blkno, 1 + i);
+ printf("%s:off=%d:flags=0x%x:len=%d",
+ ItemPointerFormExternal(&pointerData), (*lp).lp_off,
+ flags, (*lp).lp_len);
- if (flags & LP_USED) {
- HeapTupleData htdata;
+ if (flags & LP_USED) {
+ HeapTupleData htdata;
- printf(":USED");
+ printf(":USED");
- memmove((char *) &htdata,
- (char *) &((char *)page)[(*lp).lp_off],
- sizeof(htdata));
+ memmove((char *) &htdata,
+ (char *) &((char *)page)[(*lp).lp_off],
+ sizeof(htdata));
- tup = &htdata;
+ tup = &htdata;
- printf("\n\t:ctid=%s:oid=%d",
- ItemPointerFormExternal(&tup->t_ctid),
- tup->t_oid);
- printf(":natts=%d:thoff=%d:vtype=`%c' (0x%02x):",
- tup->t_natts,
- tup->t_hoff, tup->t_vtype, tup->t_vtype);
+ printf("\n\t:ctid=%s:oid=%d",
+ ItemPointerFormExternal(&tup->t_ctid),
+ tup->t_oid);
+ printf(":natts=%d:thoff=%d:vtype=`%c' (0x%02x):",
+ tup->t_natts,
+ tup->t_hoff, tup->t_vtype, tup->t_vtype);
- printf("\n\t:tmin=%d:cmin=%u:",
- tup->t_tmin, tup->t_cmin);
+ printf("\n\t:tmin=%d:cmin=%u:",
+ tup->t_tmin, tup->t_cmin);
- printf("xmin=%u:", tup->t_xmin);
+ printf("xmin=%u:", tup->t_xmin);
- printf("\n\t:tmax=%d:cmax=%u:",
- tup->t_tmax, tup->t_cmax);
+ printf("\n\t:tmax=%d:cmax=%u:",
+ tup->t_tmax, tup->t_cmax);
- printf("xmax=%u:", tup->t_xmax);
+ printf("xmax=%u:", tup->t_xmax);
- printf("\n\t:chain=%s:\n",
- ItemPointerFormExternal(&tup->t_chain));
- } else
- putchar('\n');
- }
+ printf("\n\t:chain=%s:\n",
+ ItemPointerFormExternal(&tup->t_chain));
+ } else
+ putchar('\n');
+ }
}
static char*
ItemPointerFormExternal(ItemPointer pointer)
{
- static char itemPointerString[32];
-
- if (!ItemPointerIsValid(pointer)) {
- memmove(itemPointerString, "<-,-,->", sizeof "<-,-,->");
- } else {
- sprintf(itemPointerString, "<%u,%u>",
- ItemPointerGetBlockNumber(pointer),
- ItemPointerGetOffsetNumber(pointer));
- }
+ static char itemPointerString[32];
+
+ if (!ItemPointerIsValid(pointer)) {
+ memmove(itemPointerString, "<-,-,->", sizeof "<-,-,->");
+ } else {
+ sprintf(itemPointerString, "<%u,%u>",
+ ItemPointerGetBlockNumber(pointer),
+ ItemPointerGetOffsetNumber(pointer));
+ }
- return (itemPointerString);
+ return (itemPointerString);
}
*/
static int
_inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln)
{
- IndexScanDesc iscan;
- RetrieveIndexResult res;
- Buffer buf;
- HeapTuple htup;
- Datum d;
- long size;
- bool isNull;
+ IndexScanDesc iscan;
+ RetrieveIndexResult res;
+ Buffer buf;
+ HeapTuple htup;
+ Datum d;
+ long size;
+ bool isNull;
- /* scan backwards from end */
- iscan = index_beginscan(ireln, (bool) 1, 0, (ScanKey) NULL);
+ /* scan backwards from end */
+ iscan = index_beginscan(ireln, (bool) 1, 0, (ScanKey) NULL);
- buf = InvalidBuffer;
+ buf = InvalidBuffer;
- do {
- res = index_getnext(iscan, BackwardScanDirection);
+ do
+ {
+ res = index_getnext(iscan, BackwardScanDirection);
- /*
- * If there are no more index tuples, then the relation is empty,
- * so the file's size is zero.
- */
+ /*
+ * If there are no more index tuples, then the relation is empty,
+ * so the file's size is zero.
+ */
- if (res == (RetrieveIndexResult) NULL) {
- index_endscan(iscan);
- return (0);
- }
+ if (res == (RetrieveIndexResult) NULL)
+ {
+ index_endscan(iscan);
+ return (0);
+ }
- /*
- * For time travel, we need to use the actual time qual here,
- * rather that NowTimeQual. We currently have no way to pass
- * a time qual in.
- */
+ /*
+ * For time travel, we need to use the actual time qual here,
+ * rather that NowTimeQual. We currently have no way to pass a
+ * time qual in.
+ */
- if (buf != InvalidBuffer)
- ReleaseBuffer(buf);
+ if (buf != InvalidBuffer)
+ ReleaseBuffer(buf);
- htup = heap_fetch(hreln, NowTimeQual, &(res->heap_iptr), &buf);
+ htup = heap_fetch(hreln, NowTimeQual, &(res->heap_iptr), &buf);
- } while (!HeapTupleIsValid(htup));
+ } while (!HeapTupleIsValid(htup));
- /* don't need the index scan anymore */
- index_endscan(iscan);
+ /* don't need the index scan anymore */
+ index_endscan(iscan);
- /* get olastbyte attribute */
- d = (Datum) heap_getattr(htup, buf, 1, hdesc, &isNull);
- size = DatumGetInt32(d) + 1;
+ /* get olastbyte attribute */
+ d = (Datum) heap_getattr(htup, buf, 1, hdesc, &isNull);
+ size = DatumGetInt32(d) + 1;
- /* wei hates it if you forget to do this */
- ReleaseBuffer(buf);
+ /* wei hates it if you forget to do this */
+ ReleaseBuffer(buf);
- return (size);
+ return (size);
}