aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/catalog/Makefile7
-rw-r--r--src/backend/catalog/indexing.c4
-rw-r--r--src/backend/catalog/pg_largeobject.c135
-rw-r--r--src/backend/libpq/be-fsstubs.c6
-rw-r--r--src/backend/storage/large_object/inv_api.c1395
-rw-r--r--src/bin/pg_dump/pg_dump.c4
-rw-r--r--src/include/catalog/catname.h3
-rw-r--r--src/include/catalog/indexing.h8
-rw-r--r--src/include/catalog/pg_largeobject.h63
-rw-r--r--src/include/storage/large_object.h19
10 files changed, 1097 insertions, 547 deletions
diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile
index 1ac858642db..6a5beee94d7 100644
--- a/src/backend/catalog/Makefile
+++ b/src/backend/catalog/Makefile
@@ -2,7 +2,7 @@
#
# Makefile for catalog
#
-# $Header: /cvsroot/pgsql/src/backend/catalog/Makefile,v 1.29 2000/10/21 15:55:21 momjian Exp $
+# $Header: /cvsroot/pgsql/src/backend/catalog/Makefile,v 1.30 2000/10/22 05:27:10 momjian Exp $
#
#-------------------------------------------------------------------------
@@ -11,8 +11,7 @@ top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
OBJS = catalog.o heap.o index.o indexing.o aclchk.o \
- pg_aggregate.o pg_largeobject.o pg_operator.o pg_proc.o \
- pg_type.o
+ pg_aggregate.o pg_operator.o pg_proc.o pg_type.o
BKIFILES = global.bki template1.bki global.description template1.description
@@ -30,7 +29,7 @@ TEMPLATE1_BKI_SRCS := $(addprefix $(top_srcdir)/src/include/catalog/,\
pg_proc.h pg_type.h pg_attribute.h pg_class.h \
pg_inherits.h pg_index.h pg_statistic.h \
pg_operator.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \
- pg_language.h pg_largeobject.h \
+ pg_language.h \
pg_aggregate.h pg_ipl.h pg_inheritproc.h \
pg_rewrite.h pg_listener.h pg_description.h indexing.h \
)
diff --git a/src/backend/catalog/indexing.c b/src/backend/catalog/indexing.c
index faa3dc6a421..342896a93b2 100644
--- a/src/backend/catalog/indexing.c
+++ b/src/backend/catalog/indexing.c
@@ -9,7 +9,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/catalog/indexing.c,v 1.70 2000/10/21 15:55:21 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/catalog/indexing.c,v 1.71 2000/10/22 05:27:10 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@@ -51,8 +51,6 @@ char *Name_pg_inherits_indices[Num_pg_inherits_indices] =
{InheritsRelidSeqnoIndex};
char *Name_pg_language_indices[Num_pg_language_indices] =
{LanguageOidIndex, LanguageNameIndex};
-char *Name_pg_largeobject_indices[Num_pg_largeobject_indices] =
-{LargeobjectLOIdIndex, LargeobjectLOIdPNIndex};
char *Name_pg_listener_indices[Num_pg_listener_indices] =
{ListenerPidRelnameIndex};
char *Name_pg_opclass_indices[Num_pg_opclass_indices] =
diff --git a/src/backend/catalog/pg_largeobject.c b/src/backend/catalog/pg_largeobject.c
deleted file mode 100644
index ace63d32634..00000000000
--- a/src/backend/catalog/pg_largeobject.c
+++ /dev/null
@@ -1,135 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * pg_largeobject.c
- * routines to support manipulation of the pg_largeobject relation
- *
- * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
- * Portions Copyright (c) 1994, Regents of the University of California
- *
- *
- * IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/catalog/pg_largeobject.c,v 1.3 2000/10/21 15:55:21 momjian Exp $
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#include "access/genam.h"
-#include "access/heapam.h"
-#include "catalog/catname.h"
-#include "catalog/indexing.h"
-#include "catalog/pg_largeobject.h"
-#include "miscadmin.h"
-#include "utils/fmgroids.h"
-
-bytea *_byteain(const char *data, int32 size);
-
-bytea *_byteain(const char *data, int32 size) {
- bytea *result;
-
- result = (bytea *)palloc(size + VARHDRSZ);
- result->vl_len = size + VARHDRSZ;
- if (size > 0)
- memcpy(result->vl_dat, data, size);
-
- return result;
-}
-
-Oid LargeobjectCreate(Oid loid) {
- Oid retval;
- Relation pg_largeobject;
- HeapTuple ntup = (HeapTuple) palloc(sizeof(HeapTupleData));
- Relation idescs[Num_pg_largeobject_indices];
- Datum values[Natts_pg_largeobject];
- char nulls[Natts_pg_largeobject];
- int i;
-
- for (i=0; i<Natts_pg_largeobject; i++) {
- nulls[i] = ' ';
- values[i] = (Datum)NULL;
- }
-
- i = 0;
- values[i++] = ObjectIdGetDatum(loid);
- values[i++] = Int32GetDatum(0);
- values[i++] = (Datum) _byteain(NULL, 0);
-
- pg_largeobject = heap_openr(LargeobjectRelationName, RowExclusiveLock);
- ntup = heap_formtuple(pg_largeobject->rd_att, values, nulls);
- retval = heap_insert(pg_largeobject, ntup);
-
- if (!IsIgnoringSystemIndexes()) {
- CatalogOpenIndices(Num_pg_largeobject_indices, Name_pg_largeobject_indices, idescs);
- CatalogIndexInsert(idescs, Num_pg_largeobject_indices, pg_largeobject, ntup);
- CatalogCloseIndices(Num_pg_largeobject_indices, idescs);
- }
-
- heap_close(pg_largeobject, RowExclusiveLock);
- heap_freetuple(ntup);
-
- CommandCounterIncrement();
-
- return retval;
-}
-
-void LargeobjectDrop(Oid loid) {
- Relation pg_largeobject;
- Relation pg_lo_id;
- ScanKeyData skey;
- IndexScanDesc sd = (IndexScanDesc) NULL;
- RetrieveIndexResult indexRes;
- int found = 0;
-
- ScanKeyEntryInitialize(&skey,
- (bits16) 0x0,
- (AttrNumber) 1,
- (RegProcedure) F_OIDEQ,
- ObjectIdGetDatum(loid));
-
- pg_largeobject = heap_openr(LargeobjectRelationName, RowShareLock);
- pg_lo_id = index_openr(LargeobjectLOIdIndex);
-
- sd = index_beginscan(pg_lo_id, false, 1, &skey);
-
- while((indexRes = index_getnext(sd, ForwardScanDirection))) {
- found++;
- heap_delete(pg_largeobject, &indexRes->heap_iptr, NULL);
- pfree(indexRes);
- }
-
- index_endscan(sd);
-
- index_close(pg_lo_id);
- heap_close(pg_largeobject, RowShareLock);
- if (found == 0)
- elog(ERROR, "LargeobjectDrop: large object %d not found", loid);
-}
-
-int LargeobjectFind(Oid loid) {
- int retval = 0;
- Relation pg_lo_id;
- ScanKeyData skey;
- IndexScanDesc sd = (IndexScanDesc) NULL;
- RetrieveIndexResult indexRes;
-
- ScanKeyEntryInitialize(&skey,
- (bits16) 0x0,
- (AttrNumber) 1,
- (RegProcedure) F_OIDEQ,
- ObjectIdGetDatum(loid));
-
- pg_lo_id = index_openr(LargeobjectLOIdIndex);
-
- sd = index_beginscan(pg_lo_id, false, 1, &skey);
-
- if ((indexRes = index_getnext(sd, ForwardScanDirection))) {
- retval = 1;
- pfree(indexRes);
- }
-
- index_endscan(sd);
-
- index_close(pg_lo_id);
- return retval;
-}
-
diff --git a/src/backend/libpq/be-fsstubs.c b/src/backend/libpq/be-fsstubs.c
index 30b50012714..bb5c7f6e556 100644
--- a/src/backend/libpq/be-fsstubs.c
+++ b/src/backend/libpq/be-fsstubs.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/libpq/be-fsstubs.c,v 1.53 2000/10/21 15:55:22 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/libpq/be-fsstubs.c,v 1.54 2000/10/22 05:27:12 momjian Exp $
*
* NOTES
* This should be moved to a more appropriate place. It is here
@@ -267,7 +267,7 @@ lo_creat(PG_FUNCTION_ARGS)
PG_RETURN_OID(InvalidOid);
}
- lobjId = lobjDesc->id;
+ lobjId = RelationGetRelid(lobjDesc->heap_r);
inv_close(lobjDesc);
@@ -512,10 +512,8 @@ lo_commit(bool isCommit)
{
if (cookies[i] != NULL)
{
-/*
if (isCommit)
inv_cleanindex(cookies[i]);
-*/
cookies[i] = NULL;
}
}
diff --git a/src/backend/storage/large_object/inv_api.c b/src/backend/storage/large_object/inv_api.c
index a245c818631..5b7df0562ad 100644
--- a/src/backend/storage/large_object/inv_api.c
+++ b/src/backend/storage/large_object/inv_api.c
@@ -9,7 +9,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.77 2000/10/21 15:55:24 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.78 2000/10/22 05:27:15 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@@ -22,34 +22,58 @@
#include "access/genam.h"
#include "access/heapam.h"
#include "access/nbtree.h"
-#include "access/htup.h"
#include "catalog/catalog.h"
-#include "catalog/catname.h"
#include "catalog/heap.h"
#include "catalog/index.h"
-#include "catalog/indexing.h"
#include "catalog/pg_opclass.h"
-#include "catalog/pg_largeobject.h"
#include "catalog/pg_type.h"
#include "libpq/libpq-fs.h"
#include "miscadmin.h"
#include "storage/large_object.h"
#include "storage/smgr.h"
#include "utils/fmgroids.h"
-#include "utils/builtins.h"
+#include "utils/relcache.h"
-#include <errno.h>
-
-#define IBLKSIZE (MaxTupleSize - MinHeapTupleBitmapSize - sizeof(int32) * 3)
+/*
+ * Warning, Will Robinson... In order to pack data into an inversion
+ * file as densely as possible, we violate the class abstraction here.
+ * When we're appending a new tuple to the end of the table, we check
+ * the last page to see how much data we can put on it. If it's more
+ * than IMINBLK, we write enough to fill the page. This limits external
+ * fragmentation. In no case can we write more than IMAXBLK, since
+ * the 8K postgres page size less overhead leaves only this much space
+ * for data.
+ */
-/* Defined in backend/storage/catalog/large_object.c */
-bytea *_byteain(const char *data, int32 size);
+/*
+ * In order to prevent buffer leak on transaction commit, large object
+ * scan index handling has been modified. Indexes are persistent inside
+ * a transaction but may be closed between two calls to this API (when
+ * transaction is committed while object is opened, or when no
+ * transaction is active). Scan indexes are thus now reinitialized using
+ * the object current offset. [PA]
+ *
+ * Some cleanup has also been done for memory that was not being freed.
+ *
+ * For subsequent notes, [PA] is Pascal André <andre@via.ecp.fr>
+ */
-static int32 getbytealen(bytea *data) {
- if (VARSIZE(data) < VARHDRSZ)
- elog(ERROR, "getbytealen: VARSIZE(data) < VARHDRSZ. This is internal error.");
- return (VARSIZE(data) - VARHDRSZ);
-}
+#define IFREESPC(p) (PageGetFreeSpace(p) - \
+ MAXALIGN(offsetof(HeapTupleHeaderData,t_bits)) - \
+ MAXALIGN(sizeof(struct varlena) + sizeof(int32)) - \
+ sizeof(double))
+#define IMAXBLK 8092
+#define IMINBLK 512
+
+/* non-export function prototypes */
+static HeapTuple inv_newtuple(LargeObjectDesc *obj_desc, Buffer buffer,
+ Page page, char *dbuf, int nwrite);
+static void inv_fetchtup(LargeObjectDesc *obj_desc, HeapTuple tuple, Buffer *buffer);
+static int inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes);
+static int inv_wrold(LargeObjectDesc *obj_desc, char *dbuf, int nbytes,
+ HeapTuple tuple, Buffer buffer);
+static void inv_indextup(LargeObjectDesc *obj_desc, HeapTuple tuple);
+static int _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln);
/*
* inv_create -- create a new large object.
@@ -60,13 +84,19 @@ static int32 getbytealen(bytea *data) {
* Returns:
* large object descriptor, appropriately filled in.
*/
-
LargeObjectDesc *
inv_create(int flags)
{
- int file_oid;
LargeObjectDesc *retval;
-
+ Oid file_oid;
+ Relation r;
+ Relation indr;
+ TupleDesc tupdesc;
+ IndexInfo *indexInfo;
+ Oid classObjectId[1];
+ char objname[NAMEDATALEN];
+ char indname[NAMEDATALEN];
+
/*
* add one here since the pg_class tuple created will have the next
* oid and we want to have the relation name to correspond to the
@@ -74,25 +104,104 @@ inv_create(int flags)
*/
file_oid = newoid() + 1;
- if (LargeobjectFind(file_oid) == 1)
- elog(ERROR, "inv_create: large object %d already exists. This is internal error.", file_oid);
+ /* come up with some table names */
+ sprintf(objname, "xinv%u", file_oid);
+ sprintf(indname, "xinx%u", file_oid);
+
+ if (RelnameFindRelid(objname) != InvalidOid)
+ elog(ERROR,
+ "internal error: %s already exists -- cannot create large obj",
+ objname);
+ if (RelnameFindRelid(indname) != InvalidOid)
+ elog(ERROR,
+ "internal error: %s already exists -- cannot create large obj",
+ indname);
+
+ /* this is pretty painful... want a tuple descriptor */
+ tupdesc = CreateTemplateTupleDesc(2);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1,
+ "olastbye",
+ INT4OID,
+ -1, 0, false);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 2,
+ "odata",
+ BYTEAOID,
+ -1, 0, false);
+
+ /*
+ * First create the table to hold the inversion large object. It will
+ * be located on whatever storage manager the user requested.
+ */
+
+ heap_create_with_catalog(objname, tupdesc, RELKIND_LOBJECT,
+ false, false);
+
+ /* make the relation visible in this transaction */
+ CommandCounterIncrement();
+
+ /*--------------------
+ * We hold AccessShareLock on any large object we have open
+ * by inv_create or inv_open; it is released by inv_close.
+ * Note this will not conflict with ExclusiveLock or ShareLock
+ * that we acquire when actually reading/writing; it just prevents
+ * deletion of the large object while we have it open.
+ *--------------------
+ */
+ r = heap_openr(objname, AccessShareLock);
+
+ /*
+ * Now create a btree index on the relation's olastbyte attribute to
+ * make seeks go faster.
+ */
+ indexInfo = makeNode(IndexInfo);
+ indexInfo->ii_NumIndexAttrs = 1;
+ indexInfo->ii_NumKeyAttrs = 1;
+ indexInfo->ii_KeyAttrNumbers[0] = 1;
+ indexInfo->ii_Predicate = NULL;
+ indexInfo->ii_FuncOid = InvalidOid;
+ indexInfo->ii_Unique = false;
+
+ classObjectId[0] = INT4_OPS_OID;
+
+ index_create(objname, indname, indexInfo,
+ BTREE_AM_OID, classObjectId,
+ false, false, false);
+
+ /* make the index visible in this transaction */
+ CommandCounterIncrement();
+
+ indr = index_openr(indname);
+
+ if (!RelationIsValid(indr))
+ {
+ elog(ERROR, "cannot create index for large obj on %s under inversion",
+ DatumGetCString(DirectFunctionCall1(smgrout,
+ Int16GetDatum(DEFAULT_SMGR))));
+ }
retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
- if (flags & INV_WRITE) {
+ retval->heap_r = r;
+ retval->index_r = indr;
+ retval->iscan = (IndexScanDesc) NULL;
+ retval->hdesc = RelationGetDescr(r);
+ retval->idesc = RelationGetDescr(indr);
+ retval->offset = retval->lowbyte = retval->highbyte = 0;
+ ItemPointerSetInvalid(&(retval->htid));
+ retval->flags = 0;
+
+ if (flags & INV_WRITE)
+ {
+ LockRelation(r, ExclusiveLock);
retval->flags = IFS_WRLOCK | IFS_RDLOCK;
- retval->heap_r = heap_openr(LargeobjectRelationName, RowExclusiveLock);
- } else if (flags & INV_READ) {
+ }
+ else if (flags & INV_READ)
+ {
+ LockRelation(r, ShareLock);
retval->flags = IFS_RDLOCK;
- retval->heap_r = heap_openr(LargeobjectRelationName, AccessShareLock);
- } else
- elog(ERROR, "inv_create: invalid flags: %d", flags);
-
- retval->flags |= IFS_ATEOF;
- retval->index_r = index_openr(LargeobjectLOIdPNIndex);
- retval->offset = 0;
- retval->id = file_oid;
- (void)LargeobjectCreate(file_oid);
+ }
+ retval->flags |= IFS_ATEOF; /* since we know the object is empty */
+
return retval;
}
@@ -100,24 +209,46 @@ LargeObjectDesc *
inv_open(Oid lobjId, int flags)
{
LargeObjectDesc *retval;
+ Relation r;
+ char *indname;
+ Relation indrel;
+
+ r = heap_open(lobjId, AccessShareLock);
- if (LargeobjectFind(lobjId) == 0)
- elog(ERROR, "inv_open: large object %d not found", lobjId);
-
- retval = (LargeObjectDesc *)palloc(sizeof(LargeObjectDesc));
+ indname = pstrdup(RelationGetRelationName(r));
+
+ /*
+ * hack hack hack... we know that the fourth character of the
+ * relation name is a 'v', and that the fourth character of the index
+ * name is an 'x', and that they're otherwise identical.
+ */
+ indname[3] = 'x';
+ indrel = index_openr(indname);
- if (flags & INV_WRITE) {
+ if (!RelationIsValid(indrel))
+ return (LargeObjectDesc *) NULL;
+
+ retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
+
+ retval->heap_r = r;
+ retval->index_r = indrel;
+ retval->iscan = (IndexScanDesc) NULL;
+ retval->hdesc = RelationGetDescr(r);
+ retval->idesc = RelationGetDescr(indrel);
+ retval->offset = retval->lowbyte = retval->highbyte = 0;
+ ItemPointerSetInvalid(&(retval->htid));
+ retval->flags = 0;
+
+ if (flags & INV_WRITE)
+ {
+ LockRelation(r, ExclusiveLock);
retval->flags = IFS_WRLOCK | IFS_RDLOCK;
- retval->heap_r = heap_openr(LargeobjectRelationName, RowExclusiveLock);
- } else if (flags & INV_READ) {
+ }
+ else if (flags & INV_READ)
+ {
+ LockRelation(r, ShareLock);
retval->flags = IFS_RDLOCK;
- retval->heap_r = heap_openr(LargeobjectRelationName, AccessShareLock);
- } else
- elog(ERROR, "inv_open: invalid flags: %d", flags);
-
- retval->index_r = index_openr(LargeobjectLOIdPNIndex);
- retval->offset = 0;
- retval->id = lobjId;
+ }
return retval;
}
@@ -130,11 +261,15 @@ inv_close(LargeObjectDesc *obj_desc)
{
Assert(PointerIsValid(obj_desc));
- if (obj_desc->flags & IFS_WRLOCK)
- heap_close(obj_desc->heap_r, RowExclusiveLock);
- else if (obj_desc->flags & IFS_RDLOCK)
- heap_close(obj_desc->heap_r, AccessShareLock);
+ if (obj_desc->iscan != (IndexScanDesc) NULL)
+ {
+ index_endscan(obj_desc->iscan);
+ obj_desc->iscan = NULL;
+ }
+
index_close(obj_desc->index_r);
+ heap_close(obj_desc->heap_r, AccessShareLock);
+
pfree(obj_desc);
}
@@ -146,7 +281,24 @@ inv_close(LargeObjectDesc *obj_desc)
int
inv_drop(Oid lobjId)
{
- LargeobjectDrop(lobjId);
+ Relation r;
+
+ r = RelationIdGetRelation(lobjId);
+ if (!RelationIsValid(r))
+ return -1;
+
+ if (r->rd_rel->relkind != RELKIND_LOBJECT)
+ {
+ /* drop relcache refcount from RelationIdGetRelation */
+ RelationDecrementReferenceCount(r);
+ return -1;
+ }
+
+ /*
+ * Since heap_drop_with_catalog will destroy the relcache entry,
+ * there's no need to drop the refcount in this path.
+ */
+ heap_drop_with_catalog(RelationGetRelationName(r), false);
return 1;
}
@@ -212,75 +364,71 @@ inv_stat(LargeObjectDesc *obj_desc, struct pgstat * stbuf)
#endif
-static uint32 inv_getsize(LargeObjectDesc *obj_desc) {
- uint32 found = 0;
- uint32 lastbyte = 0;
- ScanKeyData skey;
- IndexScanDesc sd = (IndexScanDesc) NULL;
- RetrieveIndexResult indexRes;
- HeapTupleData tuple;
- Buffer buffer;
- Form_pg_largeobject data;
+int
+inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
+{
+ int oldOffset;
+ Datum d;
+ ScanKeyData skey;
Assert(PointerIsValid(obj_desc));
- ScanKeyEntryInitialize(&skey,
- (bits16) 0x0,
- (AttrNumber) 1,
- (RegProcedure) F_OIDEQ,
- ObjectIdGetDatum(obj_desc->id));
-
- sd = index_beginscan(obj_desc->index_r, true, 1, &skey);
- tuple.t_datamcxt = CurrentMemoryContext;
- tuple.t_data = NULL;
- while ((indexRes = index_getnext(sd, ForwardScanDirection))) {
- tuple.t_self = indexRes->heap_iptr;
- heap_fetch(obj_desc->heap_r, SnapshotNow, &tuple, &buffer);
- pfree(indexRes);
- if (tuple.t_data == NULL)
- continue;
- found++;
- data = (Form_pg_largeobject) GETSTRUCT(&tuple);
- lastbyte = data->pageno * IBLKSIZE + getbytealen(&(data->data));
- ReleaseBuffer(buffer);
- break;
+ if (whence == SEEK_CUR)
+ {
+ offset += obj_desc->offset; /* calculate absolute position */
}
-
- index_endscan(sd);
+ else if (whence == SEEK_END)
+ {
+ /* need read lock for getsize */
+ if (!(obj_desc->flags & IFS_RDLOCK))
+ {
+ LockRelation(obj_desc->heap_r, ShareLock);
+ obj_desc->flags |= IFS_RDLOCK;
+ }
+ offset += _inv_getsize(obj_desc->heap_r,
+ obj_desc->hdesc,
+ obj_desc->index_r);
+ }
+ /* now we can assume that the operation is SEEK_SET */
- if (found == 0)
- elog(ERROR, "inv_getsize: large object %d not found", obj_desc->id);
- return lastbyte;
-}
+ /*
+ * Whenever we do a seek, we turn off the EOF flag bit to force
+ * ourselves to check for real on the next read.
+ */
-int
-inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
-{
- Assert(PointerIsValid(obj_desc));
+ obj_desc->flags &= ~IFS_ATEOF;
+ oldOffset = obj_desc->offset;
+ obj_desc->offset = offset;
- switch (whence) {
- case SEEK_SET:
- if (offset < 0)
- elog(ERROR, "inv_seek: invalid offset: %d", offset);
- obj_desc->offset = offset;
- break;
- case SEEK_CUR:
- if ((obj_desc->offset + offset) < 0)
- elog(ERROR, "inv_seek: invalid offset: %d", offset);
- obj_desc->offset += offset;
- break;
- case SEEK_END:
- {
- int4 size = inv_getsize(obj_desc);
- if (offset > size)
- elog(ERROR, "inv_seek: invalid offset");
- obj_desc->offset = size - offset;
- }
- break;
- default:
- elog(ERROR, "inv_seek: invalid whence: %d", whence);
+ /* try to avoid doing any work, if we can manage it */
+ if (offset >= obj_desc->lowbyte
+ && offset <= obj_desc->highbyte
+ && oldOffset <= obj_desc->highbyte
+ && obj_desc->iscan != (IndexScanDesc) NULL)
+ return offset;
+
+ /*
+ * To do a seek on an inversion file, we start an index scan that will
+ * bring us to the right place. Each tuple in an inversion file
+ * stores the offset of the last byte that appears on it, and we have
+ * an index on this.
+ */
+ if (obj_desc->iscan != (IndexScanDesc) NULL)
+ {
+ d = Int32GetDatum(offset);
+ btmovescan(obj_desc->iscan, d);
}
- return obj_desc->offset;
+ else
+ {
+ ScanKeyEntryInitialize(&skey, 0x0, 1, F_INT4GE,
+ Int32GetDatum(offset));
+
+ obj_desc->iscan = index_beginscan(obj_desc->index_r,
+ (bool) 0, (uint16) 1,
+ &skey);
+ }
+
+ return offset;
}
int
@@ -294,259 +442,862 @@ inv_tell(LargeObjectDesc *obj_desc)
int
inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
{
- uint32 nread = 0;
- uint32 n;
- uint32 off;
- uint32 len;
- uint32 found = 0;
- uint32 pageno = obj_desc->offset / IBLKSIZE;
- ScanKeyData skey[2];
- IndexScanDesc sd = (IndexScanDesc) NULL;
- RetrieveIndexResult indexRes;
- HeapTupleData tuple;
- Buffer buffer;
- Form_pg_largeobject data;
+ HeapTupleData tuple;
+ int nread;
+ int off;
+ int ncopy;
+ Datum d;
+ struct varlena *fsblock;
+ bool isNull;
Assert(PointerIsValid(obj_desc));
Assert(buf != NULL);
- ScanKeyEntryInitialize(&skey[0],
- (bits16) 0x0,
- (AttrNumber) 1,
- (RegProcedure) F_OIDEQ,
- ObjectIdGetDatum(obj_desc->id));
-
- ScanKeyEntryInitialize(&skey[1],
- (bits16) 0x0,
- (AttrNumber) 2,
- (RegProcedure) F_INT4GE,
- Int32GetDatum(pageno));
-
- sd = index_beginscan(obj_desc->index_r, false, 2, skey);
- tuple.t_datamcxt = CurrentMemoryContext;
- tuple.t_data = NULL;
- while ((indexRes = index_getnext(sd, ForwardScanDirection))) {
- tuple.t_self = indexRes->heap_iptr;
- heap_fetch(obj_desc->heap_r, SnapshotNow, &tuple, &buffer);
- pfree(indexRes);
+ /* if we're already at EOF, we don't need to do any work here */
+ if (obj_desc->flags & IFS_ATEOF)
+ return 0;
- if (tuple.t_data == NULL)
- continue;
-
- found++;
- data = (Form_pg_largeobject) GETSTRUCT(&tuple);
- if (data->pageno != pageno) {
- ReleaseBuffer(buffer);
- index_endscan(sd);
- return 0;
- }
+ /* make sure we obey two-phase locking */
+ if (!(obj_desc->flags & IFS_RDLOCK))
+ {
+ LockRelation(obj_desc->heap_r, ShareLock);
+ obj_desc->flags |= IFS_RDLOCK;
+ }
- len = getbytealen(&(data->data));
- off = obj_desc->offset % IBLKSIZE;
- if (off == len) {
- ReleaseBuffer(buffer);
+ nread = 0;
+
+ /* fetch a block at a time */
+ while (nread < nbytes)
+ {
+ Buffer buffer;
+
+ /* fetch an inversion file system block */
+ inv_fetchtup(obj_desc, &tuple, &buffer);
+
+ if (tuple.t_data == NULL)
+ {
+ obj_desc->flags |= IFS_ATEOF;
break;
}
- if (off > len) {
- ReleaseBuffer(buffer);
- index_endscan(sd);
- return 0;
- }
- n = len - off;
-
- n = (n < (nbytes - nread)) ? n : (nbytes - nread);
- memcpy(buf + nread, VARDATA(&(data->data)) + off, n);
- nread += n;
- obj_desc->offset += n;
+ /* copy the data from this block into the buffer */
+ d = heap_getattr(&tuple, 2, obj_desc->hdesc, &isNull);
+ fsblock = (struct varlena *) DatumGetPointer(d);
ReleaseBuffer(buffer);
- pageno++;
- if (nread == nbytes)
- break;
- }
- index_endscan(sd);
+ /*
+ * If block starts beyond current seek point, then we are looking
+ * at a "hole" (unwritten area) in the object. Return zeroes for
+ * the "hole".
+ */
+ if (obj_desc->offset < obj_desc->lowbyte)
+ {
+ int nzeroes = obj_desc->lowbyte - obj_desc->offset;
+
+ if (nzeroes > (nbytes - nread))
+ nzeroes = (nbytes - nread);
+ MemSet(buf, 0, nzeroes);
+ buf += nzeroes;
+ nread += nzeroes;
+ obj_desc->offset += nzeroes;
+ if (nread >= nbytes)
+ break;
+ }
+
+ off = obj_desc->offset - obj_desc->lowbyte;
+ ncopy = obj_desc->highbyte - obj_desc->offset + 1;
+ if (ncopy > (nbytes - nread))
+ ncopy = (nbytes - nread);
+ memmove(buf, &(fsblock->vl_dat[off]), ncopy);
- if (found == 0)
- return 0;
+ /* move pointers past the amount we just read */
+ buf += ncopy;
+ nread += ncopy;
+ obj_desc->offset += ncopy;
+ }
return nread;
}
-static int inv_write_existing(LargeObjectDesc *obj_desc, char *buf, int nbytes, int *found) {
- uint32 n = 0;
- uint32 off;
- uint32 len;
- int i;
- HeapTupleData tuple;
- HeapTuple newtup;
- Buffer buffer;
- Form_pg_largeobject data;
- ScanKeyData skey[2];
- IndexScanDesc sd = (IndexScanDesc) NULL;
- RetrieveIndexResult indexRes;
- Relation idescs[Num_pg_largeobject_indices];
- Datum values[Natts_pg_largeobject];
- char nulls[Natts_pg_largeobject];
- char replace[Natts_pg_largeobject];
+int
+inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes)
+{
+ HeapTupleData tuple;
+ int nwritten;
+ int tuplen;
Assert(PointerIsValid(obj_desc));
Assert(buf != NULL);
- ScanKeyEntryInitialize(&skey[0],
- (bits16) 0,
- (AttrNumber) 1,
- (RegProcedure) F_OIDEQ,
- ObjectIdGetDatum(obj_desc->id));
+ /*
+ * Make sure we obey two-phase locking. A write lock entitles you to
+ * read the relation, as well.
+ */
- ScanKeyEntryInitialize(&skey[1],
- (bits16) 0x0,
- (AttrNumber) 2,
- (RegProcedure) F_INT4EQ,
- Int32GetDatum(obj_desc->offset / IBLKSIZE));
+ if (!(obj_desc->flags & IFS_WRLOCK))
+ {
+ LockRelation(obj_desc->heap_r, ExclusiveLock);
+ obj_desc->flags |= (IFS_WRLOCK | IFS_RDLOCK);
+ }
- CommandCounterIncrement();
- sd = index_beginscan(obj_desc->index_r, false, 2, skey);
- tuple.t_datamcxt = CurrentMemoryContext;
- tuple.t_data = NULL;
- while ((indexRes = index_getnext(sd, ForwardScanDirection))) {
- tuple.t_self = indexRes->heap_iptr;
- heap_fetch(obj_desc->heap_r, SnapshotNow, &tuple, &buffer);
- pfree(indexRes);
- if (tuple.t_data != NULL)
- break;
+ nwritten = 0;
+
+ /* write a block at a time */
+ while (nwritten < nbytes)
+ {
+ Buffer buffer;
+
+ /*
+ * Fetch the current inversion file system block. We can skip
+ * the work if we already know we are at EOF.
+ */
+
+ if (obj_desc->flags & IFS_ATEOF)
+ tuple.t_data = NULL;
+ else
+ inv_fetchtup(obj_desc, &tuple, &buffer);
+
+ /* either append or replace a block, as required */
+ if (tuple.t_data == NULL)
+ tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten);
+ else
+ {
+ if (obj_desc->offset > obj_desc->highbyte)
+ {
+ tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten);
+ ReleaseBuffer(buffer);
+ }
+ else
+ tuplen = inv_wrold(obj_desc, buf, nbytes - nwritten, &tuple, buffer);
+
+ /*
+ * inv_wrold() has already issued WriteBuffer() which has
+ * decremented local reference counter (LocalRefCount). So we
+ * should not call ReleaseBuffer() here. -- Tatsuo 99/2/4
+ */
+ }
+
+ /* move pointers past the amount we just wrote */
+ buf += tuplen;
+ nwritten += tuplen;
+ obj_desc->offset += tuplen;
}
- index_endscan(sd);
- if (tuple.t_data == NULL)
- return 0;
-
- (*found)++;
- data = (Form_pg_largeobject) GETSTRUCT(&tuple);
- off = obj_desc->offset % IBLKSIZE;
- len = getbytealen(&(data->data));
+ /* that's it */
+ return nwritten;
+}
- if (len > IBLKSIZE) {
- ReleaseBuffer(buffer);
- elog(FATAL, "Internal error: len > IBLKSIZE");
+/*
+ * inv_cleanindex
+ * Ends the open index scan (if any) for a large object and invalidates
+ * the cached heap tuple id. This is necessary on transaction commit
+ * in order to prevent a buffer leak.
+ * This function must be called for each opened large object.
+ * [ PA, 7/17/98 ]
+ */
+void
+inv_cleanindex(LargeObjectDesc *obj_desc)
+{
+ Assert(PointerIsValid(obj_desc));
+
+ if (obj_desc->iscan == (IndexScanDesc) NULL)
+ return;
+
+ index_endscan(obj_desc->iscan);
+ obj_desc->iscan = (IndexScanDesc) NULL;
+
+ ItemPointerSetInvalid(&(obj_desc->htid));
+}
+
+/*
+ * inv_fetchtup -- Fetch an inversion file system block.
+ *
+ * This routine finds the file system block containing the offset
+ * recorded in the obj_desc structure. Later, we need to think about
+ * the effects of non-functional updates (can you rewrite the same
+ * block twice in a single transaction?), but for now, we won't bother.
+ *
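+ * Parameters:
+ * obj_desc -- the object descriptor.
+ * tuple -- caller-supplied HeapTupleData that is filled in with
+ * the desired block; tuple->t_data is set to NULL if no
+ * such tuple exists.
+ * buffer -- receives the buffer-cache buffer holding the tuple;
+ * caller must release this.
+ *
+ * Returns:
+ * Nothing; the result is passed back through tuple and buffer.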
+ */
+static void
+inv_fetchtup(LargeObjectDesc *obj_desc, HeapTuple tuple, Buffer *buffer)
+{
+ RetrieveIndexResult res;
+ Datum d;
+ int firstbyte,
+ lastbyte;
+ struct varlena *fsblock;
+ bool isNull;
+
+ /*
+ * If we've exhausted the current block, we need to get the next one.
+ * When we support time travel and non-functional updates, we will
+ * need to loop over the blocks, rather than just have an 'if', in
+ * order to find the one we're really interested in.
+ */
+
+ if (obj_desc->offset > obj_desc->highbyte
+ || obj_desc->offset < obj_desc->lowbyte
+ || !ItemPointerIsValid(&(obj_desc->htid)))
+ {
+ ScanKeyData skey;
+
+ ScanKeyEntryInitialize(&skey, 0x0, 1, F_INT4GE,
+ Int32GetDatum(obj_desc->offset));
+
+ /* initialize scan key if not done */
+ if (obj_desc->iscan == (IndexScanDesc) NULL)
+ {
+
+ /*
+ * As scan index may be prematurely closed (on commit), we
+ * must use object current offset (was 0) to reinitialize the
+ * entry [ PA ].
+ */
+ obj_desc->iscan = index_beginscan(obj_desc->index_r,
+ (bool) 0, (uint16) 1,
+ &skey);
+ }
+ else
+ index_rescan(obj_desc->iscan, false, &skey);
+
+ do
+ {
+ res = index_getnext(obj_desc->iscan, ForwardScanDirection);
+
+ if (res == (RetrieveIndexResult) NULL)
+ {
+ ItemPointerSetInvalid(&(obj_desc->htid));
+ tuple->t_datamcxt = NULL;
+ tuple->t_data = NULL;
+ return;
+ }
+
+ /*
+ * For time travel, we need to use the actual time qual here,
+ * rather than NowTimeQual. We currently have no way to pass
+ * a time qual in.
+ *
+ * This is now valid for snapshot !!! And should be fixed in some
+ * way... - vadim 07/28/98
+ *
+ */
+ tuple->t_self = res->heap_iptr;
+ heap_fetch(obj_desc->heap_r, SnapshotNow, tuple, buffer);
+ pfree(res);
+ } while (tuple->t_data == NULL);
+
+ /* remember this tid -- we may need it for later reads/writes */
+ ItemPointerCopy(&(tuple->t_self), &obj_desc->htid);
}
+ else
+ {
+ tuple->t_self = obj_desc->htid;
+ heap_fetch(obj_desc->heap_r, SnapshotNow, tuple, buffer);
+ if (tuple->t_data == NULL)
+ elog(ERROR, "inv_fetchtup: heap_fetch failed");
+ }
+
+ /*
+ * By here, we have the heap tuple we're interested in. We cache the
+ * upper and lower bounds for this block in the object descriptor and
+ * return the tuple.
+ */
- for (i=0; i<Natts_pg_largeobject; i++) {
- nulls[i] = ' ';
- replace[i] = ' ';
- values[i] = (Datum)NULL;
+ d = heap_getattr(tuple, 1, obj_desc->hdesc, &isNull);
+ lastbyte = (int32) DatumGetInt32(d);
+ d = heap_getattr(tuple, 2, obj_desc->hdesc, &isNull);
+ fsblock = (struct varlena *) DatumGetPointer(d);
+
+ /*
+ * order of + and - is important -- these are unsigned quantities near
+ * 0
+ */
+ firstbyte = (lastbyte + 1 + sizeof(fsblock->vl_len)) - fsblock->vl_len;
+
+ obj_desc->lowbyte = firstbyte;
+ obj_desc->highbyte = lastbyte;
+
+ return;
+}
+
+/*
+ * inv_wrnew() -- append a new filesystem block tuple to the inversion
+ * file.
+ *
+ * In response to an inv_write, we append one or more file system
+ * blocks to the class containing the large object. We violate the
+ * class abstraction here in order to pack things as densely as we
+ * are able. We examine the last page in the relation, and write
+ * just enough to fill it, assuming that it has above a certain
+ * threshold of space available. If the space available is less than
+ * the threshold, we allocate a new page by writing a big tuple.
+ *
+ * By the time we get here, we know all the parameters passed in
+ * are valid, and that we hold the appropriate lock on the heap
+ * relation.
+ *
+ * Parameters:
+ * obj_desc: large object descriptor for which to append block.
+ * buf: buffer containing data to write.
+ * nbytes: amount to write
+ *
+ * Returns:
+ * number of bytes actually written to the new tuple.
+ */
+static int
+inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes)
+{
+ Relation hr;
+ HeapTuple ntup;
+ Buffer buffer;
+ Page page;
+ int nblocks;
+ int nwritten;
+
+ hr = obj_desc->heap_r;
+
+ /*
+ * Get the last block in the relation. If there's no data in the
+ * relation at all, then we just get a new block. Otherwise, we check
+ * the last block to see whether it has room to accept some or all of
+ * the data that the user wants to write. If it doesn't, then we
+ * allocate a new block.
+ */
+
+ nblocks = RelationGetNumberOfBlocks(hr);
+
+ if (nblocks > 0)
+ {
+ buffer = ReadBuffer(hr, nblocks - 1);
+ page = BufferGetPage(buffer);
}
+ else
+ {
+ buffer = ReadBuffer(hr, P_NEW);
+ page = BufferGetPage(buffer);
+ PageInit(page, BufferGetPageSize(buffer), 0);
+ }
+
+ /*
+ * If the last page is too small to hold all the data, and it's too
+ * small to hold IMINBLK, then we allocate a new page. If it will
+ * hold at least IMINBLK, but less than all the data requested, then
+ * we write IMINBLK here. The caller is responsible for noticing that
+ * less than the requested number of bytes were written, and calling
+ * this routine again.
+ */
- i = 0;
+ nwritten = IFREESPC(page);
+ if (nwritten < nbytes)
{
- char b[IBLKSIZE];
- int4 rest = len - off;
-
- memset(b, 0, IBLKSIZE); /* Can optimize later */
- if ((off > 0) && (len > 0)) /* We start in the middle of the tuple */
- memcpy(b, VARDATA(&(data->data)), (off > len) ? len : off);
-
- if ((nbytes <= rest) || (len == IBLKSIZE)) {
- /* We will update inside existing tuple size */
- if (nbytes < rest)
- n = rest;
- else
- n = nbytes;
- memcpy(b + off, buf, n);
- if (n < rest) /* There's a rest of the tuple left */
- memcpy(b + off + n, VARDATA(&(data->data)) + off + n, rest - n);
- /* Update data only */
- replace[2] = 'r';
- values[2] = (Datum) _byteain(b, len);
- } else {
- /* We will extend tuple */
- /* Do we fit into max tuple size */
- if (nbytes <= (IBLKSIZE - off))
- len = off + nbytes;
+ if (nwritten < IMINBLK)
+ {
+ ReleaseBuffer(buffer);
+ buffer = ReadBuffer(hr, P_NEW);
+ page = BufferGetPage(buffer);
+ PageInit(page, BufferGetPageSize(buffer), 0);
+ if (nbytes > IMAXBLK)
+ nwritten = IMAXBLK;
else
- len = IBLKSIZE;
- n = len - off;
- memcpy(b + off, buf, n);
- /* Update data */
- replace[2] = 'r';
- values[2] = (Datum) _byteain(b, len);
- }
-
- newtup = heap_modifytuple(&tuple, obj_desc->heap_r,
- values, nulls, replace);
-
- heap_update(obj_desc->heap_r, &newtup->t_self, newtup, NULL);
- if (!IsIgnoringSystemIndexes()) {
- CatalogOpenIndices(Num_pg_largeobject_indices, Name_pg_largeobject_indices, idescs);
- CatalogIndexInsert(idescs, Num_pg_largeobject_indices, obj_desc->heap_r, newtup);
- CatalogCloseIndices(Num_pg_largeobject_indices, idescs);
+ nwritten = nbytes;
}
- heap_freetuple(newtup);
}
- ReleaseBuffer(buffer);
-
- return n;
+ else
+ nwritten = nbytes;
+
+ /*
+ * Insert a new file system block tuple, index it, and write it out.
+ */
+
+ ntup = inv_newtuple(obj_desc, buffer, page, buf, nwritten);
+ inv_indextup(obj_desc, ntup);
+ heap_freetuple(ntup);
+
+ /* new tuple is inserted */
+ WriteBuffer(buffer);
+
+ return nwritten;
}
-static int inv_write_append(LargeObjectDesc *obj_desc, char *buf, int nbytes) {
- HeapTuple ntup = (HeapTuple) palloc(sizeof(HeapTupleData));
- Relation idescs[Num_pg_largeobject_indices];
- Datum values[Natts_pg_largeobject];
- char nulls[Natts_pg_largeobject];
- int i;
- uint32 len;
-
- for (i=0; i<Natts_pg_largeobject; i++) {
- nulls[i] = ' ';
- values[i] = (Datum)NULL;
+static int
+inv_wrold(LargeObjectDesc *obj_desc,
+ char *dbuf,
+ int nbytes,
+ HeapTuple tuple,
+ Buffer buffer)
+{
+ Relation hr;
+ HeapTuple ntup;
+ Buffer newbuf;
+ Page page;
+ Page newpage;
+ int tupbytes;
+ Datum d;
+ struct varlena *fsblock;
+ int nwritten,
+ nblocks,
+ freespc;
+ bool isNull;
+ int keep_offset;
+ RetrieveIndexResult res;
+
+ /*
+ * Since we're using a no-overwrite storage manager, the way we
+ * overwrite blocks is to mark the old block invalid and append a new
+ * block. First mark the old block invalid. This violates the tuple
+ * abstraction.
+ */
+
+ TransactionIdStore(GetCurrentTransactionId(), &(tuple->t_data->t_xmax));
+ tuple->t_data->t_cmax = GetCurrentCommandId();
+ tuple->t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID);
+
+ /*
+ * If we're overwriting the entire block, we're lucky. All we need to
+ * do is to insert a new block.
+ */
+
+ if (obj_desc->offset == obj_desc->lowbyte
+ && obj_desc->lowbyte + nbytes >= obj_desc->highbyte)
+ {
+ WriteBuffer(buffer);
+ return inv_wrnew(obj_desc, dbuf, nbytes);
}
- i = 0;
- values[i++] = ObjectIdGetDatum(obj_desc->id);
- len = (nbytes > IBLKSIZE) ? IBLKSIZE : nbytes;
-
- values[i++] = Int32GetDatum(obj_desc->offset / IBLKSIZE);
- values[i++] = (Datum) _byteain(buf, len);
-
- ntup = heap_formtuple(obj_desc->heap_r->rd_att, values, nulls);
- heap_insert(obj_desc->heap_r, ntup);
-
- if (!IsIgnoringSystemIndexes()) {
- CatalogOpenIndices(Num_pg_largeobject_indices, Name_pg_largeobject_indices, idescs);
- CatalogIndexInsert(idescs, Num_pg_largeobject_indices, obj_desc->heap_r, ntup);
- CatalogCloseIndices(Num_pg_largeobject_indices, idescs);
+ /*
+ * By here, we need to overwrite part of the data in the current
+ * tuple. In order to reduce the degree to which we fragment blocks,
+ * we guarantee that no block will be broken up due to an overwrite.
+ * This means that we need to allocate a tuple on a new page, if
+ * there's not room for the replacement on this one.
+ */
+
+ newbuf = buffer;
+ page = BufferGetPage(buffer);
+ newpage = BufferGetPage(newbuf);
+ hr = obj_desc->heap_r;
+ freespc = IFREESPC(page);
+ d = heap_getattr(tuple, 2, obj_desc->hdesc, &isNull);
+ fsblock = (struct varlena *) DatumGetPointer(d);
+ tupbytes = fsblock->vl_len - sizeof(fsblock->vl_len);
+
+ if (freespc < tupbytes)
+ {
+
+ /*
+ * First see if there's enough space on the last page of the table
+ * to put this tuple.
+ */
+
+ nblocks = RelationGetNumberOfBlocks(hr);
+
+ if (nblocks > 0)
+ {
+ newbuf = ReadBuffer(hr, nblocks - 1);
+ newpage = BufferGetPage(newbuf);
+ }
+ else
+ {
+ newbuf = ReadBuffer(hr, P_NEW);
+ newpage = BufferGetPage(newbuf);
+ PageInit(newpage, BufferGetPageSize(newbuf), 0);
+ }
+
+ freespc = IFREESPC(newpage);
+
+ /*
+ * If there's no room on the last page, allocate a new last page
+ * for the table, and put it there.
+ */
+
+ if (freespc < tupbytes)
+ {
+ ReleaseBuffer(newbuf);
+ newbuf = ReadBuffer(hr, P_NEW);
+ newpage = BufferGetPage(newbuf);
+ PageInit(newpage, BufferGetPageSize(newbuf), 0);
+ }
}
-
+
+ nwritten = nbytes;
+ if (nwritten > obj_desc->highbyte - obj_desc->offset + 1)
+ nwritten = obj_desc->highbyte - obj_desc->offset + 1;
+ memmove(VARDATA(fsblock) + (obj_desc->offset - obj_desc->lowbyte),
+ dbuf, nwritten);
+
+ /*
+ * we are rewriting the entire old block, therefore we reset offset to
+ * the lowbyte of the original block before jumping into
+ * inv_newtuple()
+ */
+ keep_offset = obj_desc->offset;
+ obj_desc->offset = obj_desc->lowbyte;
+ ntup = inv_newtuple(obj_desc, newbuf, newpage, VARDATA(fsblock),
+ tupbytes);
+ /* after we are done, we restore to the true offset */
+ obj_desc->offset = keep_offset;
+
+ /*
+ * By here, we have a page (newpage) that's guaranteed to have enough
+ * space on it to put the new tuple. Call inv_newtuple to do the
+ * work. Passing NULL as a buffer to inv_newtuple() keeps it from
+ * copying any data into the new tuple. When it returns, the tuple is
+ * ready to receive data from the old tuple and the user's data
+ * buffer.
+ */
+/*
+ ntup = inv_newtuple(obj_desc, newbuf, newpage, (char *) NULL, tupbytes);
+ dptr = ((char *) ntup) + ntup->t_hoff -
+ (sizeof(HeapTupleData) - offsetof(HeapTupleData, t_bits)) +
+ sizeof(int4)
+ + sizeof(fsblock->vl_len);
+
+ if (obj_desc->offset > obj_desc->lowbyte) {
+ memmove(dptr,
+ &(fsblock->vl_dat[0]),
+ obj_desc->offset - obj_desc->lowbyte);
+ dptr += obj_desc->offset - obj_desc->lowbyte;
+ }
+
+
+ nwritten = nbytes;
+ if (nwritten > obj_desc->highbyte - obj_desc->offset + 1)
+ nwritten = obj_desc->highbyte - obj_desc->offset + 1;
+
+ memmove(dptr, dbuf, nwritten);
+ dptr += nwritten;
+
+ if (obj_desc->offset + nwritten < obj_desc->highbyte + 1) {
+*/
+/*
+ loc = (obj_desc->highbyte - obj_desc->offset)
+ + nwritten;
+ sz = obj_desc->highbyte - (obj_desc->lowbyte + loc);
+
+ what's going on here?? - jolly
+*/
+/*
+ sz = (obj_desc->highbyte + 1) - (obj_desc->offset + nwritten);
+ memmove(&(fsblock->vl_dat[0]), dptr, sz);
+ }
+*/
+
+
+ /* index the new tuple */
+ inv_indextup(obj_desc, ntup);
heap_freetuple(ntup);
-
- return len;
+
+ /*
+ * move the scandesc forward so we don't reread the newly inserted
+ * tuple on the next index scan
+ */
+ res = NULL;
+ if (obj_desc->iscan)
+ res = index_getnext(obj_desc->iscan, ForwardScanDirection);
+
+ if (res)
+ pfree(res);
+
+ /*
+ * Okay, by here, a tuple for the new block is correctly placed,
+ * indexed, and filled. Write the changed pages out.
+ */
+
+ WriteBuffer(buffer);
+ if (newbuf != buffer)
+ WriteBuffer(newbuf);
+
+ /* Tuple id is no longer valid */
+ ItemPointerSetInvalid(&(obj_desc->htid));
+
+ /* done */
+ return nwritten;
}
-
-static int inv_write_int(LargeObjectDesc *obj_desc, char *buf, int nbytes) {
- int nwritten = 0;
- int found = 0;
-
- if (nbytes == 0)
- return 0;
- nwritten = inv_write_existing(obj_desc, buf, nbytes, &found);
- if (found > 0) {
- obj_desc->offset += nwritten;
- return nwritten;
+static HeapTuple
+inv_newtuple(LargeObjectDesc *obj_desc,
+ Buffer buffer,
+ Page page,
+ char *dbuf,
+ int nwrite)
+{
+ HeapTuple ntup = (HeapTuple) palloc(sizeof(HeapTupleData));
+ PageHeader ph;
+ int tupsize;
+ int hoff;
+ Offset lower;
+ Offset upper;
+ ItemId itemId;
+ OffsetNumber off;
+ OffsetNumber limit;
+ char *attptr;
+
+ /* compute tuple size -- no nulls */
+ hoff = offsetof(HeapTupleHeaderData, t_bits);
+ hoff = MAXALIGN(hoff);
+
+ /* add in olastbyte, varlena.vl_len, varlena.vl_dat */
+ tupsize = hoff + (2 * sizeof(int32)) + nwrite;
+ tupsize = MAXALIGN(tupsize);
+
+ /*
+ * Allocate the tuple on the page, violating the page abstraction.
+ * This code was swiped from PageAddItem().
+ */
+
+ ph = (PageHeader) page;
+ limit = OffsetNumberNext(PageGetMaxOffsetNumber(page));
+
+ /* look for "recyclable" (unused & deallocated) ItemId */
+ for (off = FirstOffsetNumber; off < limit; off = OffsetNumberNext(off))
+ {
+ itemId = &ph->pd_linp[off - 1];
+ if ((((*itemId).lp_flags & LP_USED) == 0) &&
+ ((*itemId).lp_len == 0))
+ break;
}
- /* Looks like we are beyond the end of the file */
- nwritten = inv_write_append(obj_desc, buf, nbytes);
- obj_desc->offset += nwritten;
- return nwritten;
+
+ if (off > limit)
+ lower = (Offset) (((char *) (&ph->pd_linp[off])) - ((char *) page));
+ else if (off == limit)
+ lower = ph->pd_lower + sizeof(ItemIdData);
+ else
+ lower = ph->pd_lower;
+
+ upper = ph->pd_upper - tupsize;
+
+ itemId = &ph->pd_linp[off - 1];
+ (*itemId).lp_off = upper;
+ (*itemId).lp_len = tupsize;
+ (*itemId).lp_flags = LP_USED;
+ ph->pd_lower = lower;
+ ph->pd_upper = upper;
+
+ ntup->t_datamcxt = NULL;
+ ntup->t_data = (HeapTupleHeader) ((char *) page + upper);
+
+ /*
+ * Tuple is now allocated on the page. Next, fill in the tuple
+ * header. This block of code violates the tuple abstraction.
+ */
+
+ ntup->t_len = tupsize;
+ ItemPointerSet(&ntup->t_self, BufferGetBlockNumber(buffer), off);
+ ntup->t_data->t_oid = newoid();
+ TransactionIdStore(GetCurrentTransactionId(), &(ntup->t_data->t_xmin));
+ ntup->t_data->t_cmin = GetCurrentCommandId();
+ StoreInvalidTransactionId(&(ntup->t_data->t_xmax));
+ ntup->t_data->t_cmax = 0;
+ ntup->t_data->t_infomask = HEAP_XMAX_INVALID;
+ ntup->t_data->t_natts = 2;
+ ntup->t_data->t_hoff = hoff;
+
+ /* if a NULL is passed in, avoid the calculations below */
+ if (dbuf == NULL)
+ return ntup;
+
+ /*
+ * Finally, copy the user's data buffer into the tuple. This violates
+ * the tuple and class abstractions.
+ */
+
+ attptr = ((char *) ntup->t_data) + hoff;
+ *((int32 *) attptr) = obj_desc->offset + nwrite - 1;
+ attptr += sizeof(int32);
+
+ /*
+ * * mer fixed disk layout of varlenas to get rid of the need for
+ * this. *
+ *
+ * ((int32 *) attptr) = nwrite + sizeof(int32); * attptr +=
+ * sizeof(int32);
+ */
+
+ *((int32 *) attptr) = nwrite + sizeof(int32);
+ attptr += sizeof(int32);
+
+ /*
+ * If a data buffer was passed in, then copy the data from the buffer
+ * to the tuple. Some callers (eg, inv_wrold()) may not pass in a
+ * buffer, since they have to copy part of the old tuple data and part
+ * of the user's new data into the new tuple.
+ */
+
+ if (dbuf != (char *) NULL)
+ memmove(attptr, dbuf, nwrite);
+
+ /* keep track of boundary of current tuple */
+ obj_desc->lowbyte = obj_desc->offset;
+ obj_desc->highbyte = obj_desc->offset + nwrite - 1;
+
+ /* new tuple is filled -- return it */
+ return ntup;
}
-static int count = 0;
+static void
+inv_indextup(LargeObjectDesc *obj_desc, HeapTuple tuple)
+{
+ InsertIndexResult res;
+ Datum v[1];
+ char n[1];
-int
-inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes) {
- int nwritten = 0;
- while (nwritten < nbytes)
- nwritten += inv_write_int(obj_desc, buf + nwritten, nbytes - nwritten);
+ n[0] = ' ';
+ v[0] = Int32GetDatum(obj_desc->highbyte);
+ res = index_insert(obj_desc->index_r, &v[0], &n[0],
+ &(tuple->t_self), obj_desc->heap_r);
- return nwritten;
+ if (res)
+ pfree(res);
+}
+
+#ifdef NOT_USED
+
+static void
+DumpPage(Page page, int blkno)
+{
+ ItemId lp;
+ HeapTuple tup;
+ int flags, i, nline;
+ ItemPointerData pointerData;
+
+ printf("\t[subblock=%d]:lower=%d:upper=%d:special=%d\n", 0,
+ ((PageHeader)page)->pd_lower, ((PageHeader)page)->pd_upper,
+ ((PageHeader)page)->pd_special);
+
+ printf("\t:MaxOffsetNumber=%d\n",
+ (int16) PageGetMaxOffsetNumber(page));
+
+ nline = (int16) PageGetMaxOffsetNumber(page);
+
+{
+ int i;
+ char *cp;
+
+ i = PageGetSpecialSize(page);
+ cp = PageGetSpecialPointer(page);
+
+ printf("\t:SpecialData=");
+
+ while (i > 0) {
+ printf(" 0x%02x", *cp);
+ cp += 1;
+ i -= 1;
+ }
+ printf("\n");
+}
+ for (i = 0; i < nline; i++) {
+ lp = ((PageHeader)page)->pd_linp + i;
+ flags = (*lp).lp_flags;
+ ItemPointerSet(&pointerData, blkno, 1 + i);
+ printf("%s:off=%d:flags=0x%x:len=%d",
+ ItemPointerFormExternal(&pointerData), (*lp).lp_off,
+ flags, (*lp).lp_len);
+
+ if (flags & LP_USED) {
+ HeapTupleData htdata;
+
+ printf(":USED");
+
+ memmove((char *) &htdata,
+ (char *) &((char *)page)[(*lp).lp_off],
+ sizeof(htdata));
+
+ tup = &htdata;
+
+ printf("\n\t:ctid=%s:oid=%d",
+ ItemPointerFormExternal(&tup->t_ctid),
+ tup->t_oid);
+ printf(":natts=%d:thoff=%d:",
+ tup->t_natts,
+ tup->t_hoff);
+
+ printf("\n\t:cmin=%u:",
+ tup->t_cmin);
+
+ printf("xmin=%u:", tup->t_xmin);
+
+ printf("\n\t:cmax=%u:",
+ tup->t_cmax);
+
+ printf("xmax=%u:\n", tup->t_xmax);
+
+ } else
+ putchar('\n');
+ }
+}
+
+static char*
+ItemPointerFormExternal(ItemPointer pointer)
+{
+ static char itemPointerString[32];
+
+ if (!ItemPointerIsValid(pointer)) {
+ memmove(itemPointerString, "<-,-,->", sizeof "<-,-,->");
+ } else {
+ sprintf(itemPointerString, "<%u,%u>",
+ ItemPointerGetBlockNumber(pointer),
+ ItemPointerGetOffsetNumber(pointer));
+ }
+
+ return itemPointerString;
+}
+
+#endif
+
+static int
+_inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln)
+{
+ IndexScanDesc iscan;
+ RetrieveIndexResult res;
+ HeapTupleData tuple;
+ Datum d;
+ long size;
+ bool isNull;
+ Buffer buffer;
+
+ /* scan backwards from end */
+ iscan = index_beginscan(ireln, (bool) 1, 0, (ScanKey) NULL);
+
+ do
+ {
+ res = index_getnext(iscan, BackwardScanDirection);
+
+ /*
+ * If there are no more index tuples, then the relation is empty,
+ * so the file's size is zero.
+ */
+
+ if (res == (RetrieveIndexResult) NULL)
+ {
+ index_endscan(iscan);
+ return 0;
+ }
+
+ /*
+ * For time travel, we need to use the actual time qual here,
+ * rather than NowTimeQual. We currently have no way to pass a
+ * time qual in.
+ */
+ tuple.t_self = res->heap_iptr;
+ heap_fetch(hreln, SnapshotNow, &tuple, &buffer);
+ pfree(res);
+ } while (tuple.t_data == NULL);
+
+ /* don't need the index scan anymore */
+ index_endscan(iscan);
+
+ /* get olastbyte attribute */
+ d = heap_getattr(&tuple, 1, hdesc, &isNull);
+ size = DatumGetInt32(d) + 1;
+ ReleaseBuffer(buffer);
+
+ return size;
}
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index debf1f2626e..3e4ba6c72a3 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -22,7 +22,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_dump.c,v 1.171 2000/10/21 15:55:26 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_dump.c,v 1.172 2000/10/22 05:27:18 momjian Exp $
*
* Modifications - 6/10/96 - dave@bensoft.com - version 1.13.dhb
*
@@ -1104,7 +1104,7 @@ dumpBlobs(Archive *AH, char* junkOid, void *junkVal)
fprintf(stderr, "%s saving BLOBs\n", g_comment_start);
/* Cursor to get all BLOB tables */
- appendPQExpBuffer(oidQry, "Declare blobOid Cursor for SELECT loid from pg_largeobject");
+ appendPQExpBuffer(oidQry, "Declare blobOid Cursor for SELECT oid from pg_class where relkind = '%c'", RELKIND_LOBJECT);
res = PQexec(g_conn, oidQry->data);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
diff --git a/src/include/catalog/catname.h b/src/include/catalog/catname.h
index cb95771147d..b82977d806c 100644
--- a/src/include/catalog/catname.h
+++ b/src/include/catalog/catname.h
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: catname.h,v 1.15 2000/10/21 15:55:28 momjian Exp $
+ * $Id: catname.h,v 1.16 2000/10/22 05:27:20 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@@ -29,7 +29,6 @@
#define InheritsRelationName "pg_inherits"
#define InheritancePrecidenceListRelationName "pg_ipl"
#define LanguageRelationName "pg_language"
-#define LargeobjectRelationName "pg_largeobject"
#define ListenerRelationName "pg_listener"
#define LogRelationName "pg_log"
#define OperatorClassRelationName "pg_opclass"
diff --git a/src/include/catalog/indexing.h b/src/include/catalog/indexing.h
index 7bee3e0f039..6cc98bdc322 100644
--- a/src/include/catalog/indexing.h
+++ b/src/include/catalog/indexing.h
@@ -8,7 +8,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: indexing.h,v 1.43 2000/10/21 15:55:28 momjian Exp $
+ * $Id: indexing.h,v 1.44 2000/10/22 05:27:20 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@@ -31,7 +31,6 @@
#define Num_pg_index_indices 2
#define Num_pg_inherits_indices 1
#define Num_pg_language_indices 2
-#define Num_pg_largeobject_indices 2
#define Num_pg_listener_indices 1
#define Num_pg_opclass_indices 2
#define Num_pg_operator_indices 2
@@ -63,8 +62,6 @@
#define InheritsRelidSeqnoIndex "pg_inherits_relid_seqno_index"
#define LanguageNameIndex "pg_language_name_index"
#define LanguageOidIndex "pg_language_oid_index"
-#define LargeobjectLOIdIndex "pg_largeobject_loid_index"
-#define LargeobjectLOIdPNIndex "pg_largeobject_loid_pn_index"
#define ListenerPidRelnameIndex "pg_listener_pid_relname_index"
#define OpclassDeftypeIndex "pg_opclass_deftype_index"
#define OpclassNameIndex "pg_opclass_name_index"
@@ -95,7 +92,6 @@ extern char *Name_pg_group_indices[];
extern char *Name_pg_index_indices[];
extern char *Name_pg_inherits_indices[];
extern char *Name_pg_language_indices[];
-extern char *Name_pg_largeobject_indices[];
extern char *Name_pg_listener_indices[];
extern char *Name_pg_opclass_indices[];
extern char *Name_pg_operator_indices[];
@@ -195,8 +191,6 @@ DECLARE_UNIQUE_INDEX(pg_index_indexrelid_index on pg_index using btree(indexreli
DECLARE_UNIQUE_INDEX(pg_inherits_relid_seqno_index on pg_inherits using btree(inhrelid oid_ops, inhseqno int4_ops));
DECLARE_UNIQUE_INDEX(pg_language_name_index on pg_language using btree(lanname name_ops));
DECLARE_UNIQUE_INDEX(pg_language_oid_index on pg_language using btree(oid oid_ops));
-DECLARE_INDEX(pg_largeobject_loid_index on pg_largeobject using btree(loid oid_ops));
-DECLARE_UNIQUE_INDEX(pg_largeobject_loid_pn_index on pg_largeobject using btree(loid oid_ops, pageno int4_ops));
DECLARE_UNIQUE_INDEX(pg_listener_pid_relname_index on pg_listener using btree(listenerpid int4_ops, relname name_ops));
/* This column needs to allow multiple zero entries, but is in the cache */
DECLARE_INDEX(pg_opclass_deftype_index on pg_opclass using btree(opcdeftype oid_ops));
diff --git a/src/include/catalog/pg_largeobject.h b/src/include/catalog/pg_largeobject.h
deleted file mode 100644
index 409aaf8d226..00000000000
--- a/src/include/catalog/pg_largeobject.h
+++ /dev/null
@@ -1,63 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * pg_largeobject.h
- * definition of the system "largeobject" relation (pg_largeobject)
- * along with the relation's initial contents.
- *
- *
- * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
- * Portions Copyright (c) 1994, Regents of the University of California
- *
- * $Id: pg_largeobject.h,v 1.3 2000/10/21 15:55:28 momjian Exp $
- *
- * NOTES
- * the genbki.sh script reads this file and generates .bki
- * information from the DATA() statements.
- *
- *-------------------------------------------------------------------------
- */
-#ifndef PG_LARGEOBJECT_H
-#define PG_LARGEOBJECT_H
-
-/* ----------------
- * postgres.h contains the system type definintions and the
- * CATALOG(), BOOTSTRAP and DATA() sugar words so this file
- * can be read by both genbki.sh and the C compiler.
- * ----------------
- */
-
-/* ----------------
- * pg_largeobject definition. cpp turns this into
- * typedef struct FormData_pg_largeobject. Large object id
- * is stored in loid;
- * ----------------
- */
-
-CATALOG(pg_largeobject)
-{
- Oid loid;
- int4 pageno;
- bytea data;
-} FormData_pg_largeobject;
-
-/* ----------------
- * Form_pg_largeobject corresponds to a pointer to a tuple with
- * the format of pg_largeobject relation.
- * ----------------
- */
-typedef FormData_pg_largeobject *Form_pg_largeobject;
-
-/* ----------------
- * compiler constants for pg_largeobject
- * ----------------
- */
-#define Natts_pg_largeobject 3
-#define Anum_pg_largeobject_loid 1
-#define Anum_pg_largeobject_pageno 2
-#define Anum_pg_largeobject_data 3
-
-Oid LargeobjectCreate(Oid loid);
-void LargeobjectDrop(Oid loid);
-int LargeobjectFind(Oid loid);
-
-#endif /* PG_LARGEOBJECT_H */
diff --git a/src/include/storage/large_object.h b/src/include/storage/large_object.h
index 77990c56335..c480f5b7874 100644
--- a/src/include/storage/large_object.h
+++ b/src/include/storage/large_object.h
@@ -8,7 +8,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: large_object.h,v 1.16 2000/10/21 15:55:29 momjian Exp $
+ * $Id: large_object.h,v 1.17 2000/10/22 05:27:23 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@@ -22,11 +22,17 @@
/*
* This structure will eventually have lots more stuff associated with it.
*/
-typedef struct LargeObjectDesc {
- Relation heap_r;
- Relation index_r;
+typedef struct LargeObjectDesc
+{
+ Relation heap_r; /* heap relation */
+ Relation index_r; /* index relation on seqno attribute */
+ IndexScanDesc iscan; /* index scan we're using */
+ TupleDesc hdesc; /* heap relation tuple desc */
+ TupleDesc idesc; /* index relation tuple desc */
+ uint32 lowbyte; /* low byte on the current page */
+ uint32 highbyte; /* high byte on the current page */
uint32 offset; /* current seek pointer */
- Oid id;
+ ItemPointerData htid; /* tid of current heap tuple */
#define IFS_RDLOCK (1 << 0)
#define IFS_WRLOCK (1 << 1)
@@ -49,4 +55,7 @@ extern int inv_tell(LargeObjectDesc *obj_desc);
extern int inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes);
extern int inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes);
+/* added for buffer leak prevention [ PA ] */
+extern void inv_cleanindex(LargeObjectDesc *obj_desc);
+
#endif /* LARGE_OBJECT_H */