aboutsummaryrefslogtreecommitdiff
path: root/contrib/postgres_fdw/deparse.c
diff options
context:
space:
mode:
authorTomas Vondra <tomas.vondra@postgresql.org>2021-01-20 23:05:46 +0100
committerTomas Vondra <tomas.vondra@postgresql.org>2021-01-20 23:57:27 +0100
commitb663a4136331de6c7364226e3dbf7c88bfee7145 (patch)
tree3a14fac68bcfc27a42e365501ce1cdbc1ddfdc00 /contrib/postgres_fdw/deparse.c
parentad600bba0422dde4b73fbd61049ff2a3847b068a (diff)
downloadpostgresql-b663a4136331de6c7364226e3dbf7c88bfee7145.tar.gz
postgresql-b663a4136331de6c7364226e3dbf7c88bfee7145.zip
Implement support for bulk inserts in postgres_fdw
Extends the FDW API to allow batching inserts into foreign tables. That is usually much more efficient than inserting individual rows, due to high latency for each round-trip to the foreign server. It was possible to implement something similar in the regular FDW API, but it was inconvenient and there were issues with reporting the number of actually inserted rows etc. This extends the FDW API with two new functions: * GetForeignModifyBatchSize - allows the FDW picking optimal batch size * ExecForeignBatchInsert - inserts a batch of rows at once Currently, only INSERT queries support batching. Support for DELETE and UPDATE may be added in the future. This also implements batching for postgres_fdw. The batch size may be specified using "batch_size" option both at the server and table level. The initial patch version was written by me, but it was rewritten and improved in many ways by Takayuki Tsunakawa. Author: Takayuki Tsunakawa Reviewed-by: Tomas Vondra, Amit Langote Discussion: https://postgr.es/m/20200628151002.7x5laxwpgvkyiu3q@development
Diffstat (limited to 'contrib/postgres_fdw/deparse.c')
-rw-r--r--contrib/postgres_fdw/deparse.c54
1 files changed, 53 insertions, 1 deletions
diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index 3cf7b4eb1e0..6faf499f9a6 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -1705,13 +1705,16 @@ deparseRangeTblRef(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel,
* The statement text is appended to buf, and we also create an integer List
* of the columns being retrieved by WITH CHECK OPTION or RETURNING (if any),
* which is returned to *retrieved_attrs.
+ *
+ * This also stores end position of the VALUES clause, so that we can rebuild
+ * an INSERT for a batch of rows later.
*/
void
deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
Index rtindex, Relation rel,
List *targetAttrs, bool doNothing,
List *withCheckOptionList, List *returningList,
- List **retrieved_attrs)
+ List **retrieved_attrs, int *values_end_len)
{
AttrNumber pindex;
bool first;
@@ -1754,6 +1757,7 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
}
else
appendStringInfoString(buf, " DEFAULT VALUES");
+ *values_end_len = buf->len;
if (doNothing)
appendStringInfoString(buf, " ON CONFLICT DO NOTHING");
@@ -1764,6 +1768,54 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
}
/*
+ * rebuild remote INSERT statement
+ *
+ * Provided a number of rows in a batch, builds INSERT statement with the
+ * right number of parameters.
+ */
+void
+rebuildInsertSql(StringInfo buf, char *orig_query,
+ int values_end_len, int num_cols,
+ int num_rows)
+{
+ int i, j;
+ int pindex;
+ bool first;
+
+ /* Make sure the values_end_len is sensible */
+ Assert((values_end_len > 0) && (values_end_len <= strlen(orig_query)));
+
+ /* Copy up to the end of the first record from the original query */
+ appendBinaryStringInfo(buf, orig_query, values_end_len);
+
+ /*
+ * Add records to VALUES clause (we already have parameters for the
+ * first row, so start at the right offset).
+ */
+ pindex = num_cols + 1;
+ for (i = 0; i < num_rows; i++)
+ {
+ appendStringInfoString(buf, ", (");
+
+ first = true;
+ for (j = 0; j < num_cols; j++)
+ {
+ if (!first)
+ appendStringInfoString(buf, ", ");
+ first = false;
+
+ appendStringInfo(buf, "$%d", pindex);
+ pindex++;
+ }
+
+ appendStringInfoChar(buf, ')');
+ }
+
+ /* Copy stuff after VALUES clause from the original query */
+ appendStringInfoString(buf, orig_query + values_end_len);
+}
+
+/*
* deparse remote UPDATE statement
*
* The statement text is appended to buf, and we also create an integer List