diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index df1ff05a8fe..46c7cc5923f 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -67,6 +67,7 @@ typedef struct remoteConn * Internal declarations */ static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async); +static void prepTuplestoreResult(FunctionCallInfo fcinfo); static void materializeResult(FunctionCallInfo fcinfo, PGresult *res); static remoteConn *getConnectionByName(const char *name); static HTAB *createConnHash(void); @@ -495,7 +496,6 @@ PG_FUNCTION_INFO_V1(dblink_fetch); Datum dblink_fetch(PG_FUNCTION_ARGS) { - ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; PGresult *res = NULL; char *conname = NULL; remoteConn *rconn = NULL; @@ -505,6 +505,8 @@ dblink_fetch(PG_FUNCTION_ARGS) int howmany = 0; bool fail = true; /* default to backward compatible */ + prepTuplestoreResult(fcinfo); + DBLINK_INIT; if (PG_NARGS() == 4) @@ -551,11 +553,6 @@ dblink_fetch(PG_FUNCTION_ARGS) if (!conn) DBLINK_CONN_NOT_AVAIL; - /* let the caller know we're sending back a tuplestore */ - rsinfo->returnMode = SFRM_Materialize; - rsinfo->setResult = NULL; - rsinfo->setDesc = NULL; - initStringInfo(&buf); appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname); @@ -632,7 +629,6 @@ dblink_get_result(PG_FUNCTION_ARGS) static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) { - ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; char *msg; PGresult *res = NULL; PGconn *conn = NULL; @@ -643,16 +639,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) bool fail = true; /* default to backward compatible */ bool freeconn = false; - /* check to see if caller supports us returning a tuplestore */ - if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("set-valued function called in context that cannot accept a set"))); - if (!(rsinfo->allowedModes & SFRM_Materialize)) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("materialize mode required, but it is not " \ - "allowed in this context"))); + prepTuplestoreResult(fcinfo); DBLINK_INIT; @@ -712,11 +699,6 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) if (!conn) DBLINK_CONN_NOT_AVAIL; - /* let the caller know we're sending back a tuplestore */ - rsinfo->returnMode = SFRM_Materialize; - rsinfo->setResult = NULL; - rsinfo->setDesc = NULL; - /* synchronous query, or async result retrieval */ if (!is_async) res = PQexec(conn, sql); @@ -745,14 +727,45 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) } /* - * Materialize the PGresult to return them as the function result. - * The res will be released in this function. + * Verify function caller can handle a tuplestore result, and set up for that. + * + * Note: if the caller returns without actually creating a tuplestore, the + * executor will treat the function result as an empty set. + */ +static void +prepTuplestoreResult(FunctionCallInfo fcinfo) +{ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + + /* check to see if query supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not allowed in this context"))); + + /* let the executor know we're sending back a tuplestore */ + rsinfo->returnMode = SFRM_Materialize; + + /* caller must fill these to return a non-empty result */ + rsinfo->setResult = NULL; + rsinfo->setDesc = NULL; +} + +/* + * Copy the contents of the PGresult into a tuplestore to be returned + * as the result of the current function. + * The PGresult will be released in this function. */ static void materializeResult(FunctionCallInfo fcinfo, PGresult *res) { ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + /* prepTuplestoreResult must have been called previously */ Assert(rsinfo->returnMode == SFRM_Materialize); PG_TRY(); @@ -1004,85 +1017,97 @@ PG_FUNCTION_INFO_V1(dblink_exec); Datum dblink_exec(PG_FUNCTION_ARGS) { - char *msg; - PGresult *res = NULL; - text *sql_cmd_status = NULL; - PGconn *conn = NULL; - char *connstr = NULL; - char *sql = NULL; - char *conname = NULL; - remoteConn *rconn = NULL; - bool freeconn = false; - bool fail = true; /* default to backward compatible behavior */ + text *volatile sql_cmd_status = NULL; + PGconn *volatile conn = NULL; + volatile bool freeconn = false; DBLINK_INIT; - if (PG_NARGS() == 3) + PG_TRY(); { - /* must be text,text,bool */ - DBLINK_GET_CONN; - sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); - fail = PG_GETARG_BOOL(2); - } - else if (PG_NARGS() == 2) - { - /* might be text,text or text,bool */ - if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) + char *msg; + PGresult *res = NULL; + char *connstr = NULL; + char *sql = NULL; + char *conname = NULL; + remoteConn *rconn = NULL; + bool fail = true; /* default to backward compatible behavior */ + + if (PG_NARGS() == 3) { + /* must be text,text,bool */ + DBLINK_GET_CONN; + sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); + fail = PG_GETARG_BOOL(2); + } + else if (PG_NARGS() == 2) + { + /* might be text,text or text,bool */ + if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) + { + conn = pconn->conn; + sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); + fail = PG_GETARG_BOOL(1); + } + else + { + DBLINK_GET_CONN; + sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); + } + } + else if (PG_NARGS() == 1) + { + /* must be single text argument */ conn = pconn->conn; sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); - fail = PG_GETARG_BOOL(1); + } + else + /* shouldn't happen */ + elog(ERROR, "wrong number of arguments"); + + if (!conn) + DBLINK_CONN_NOT_AVAIL; + + res = PQexec(conn, sql); + if (!res || + (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK)) + { + dblink_res_error(conname, res, "could not execute command", fail); + + /* + * and save a copy of the command status string to return as our + * result tuple + */ + sql_cmd_status = cstring_to_text("ERROR"); + } + else if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + /* + * and save a copy of the command status string to return as our + * result tuple + */ + sql_cmd_status = cstring_to_text(PQcmdStatus(res)); + PQclear(res); } else { - DBLINK_GET_CONN; - sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); + PQclear(res); + ereport(ERROR, + (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), + errmsg("statement returning results not allowed"))); } } - else if (PG_NARGS() == 1) + PG_CATCH(); { - /* must be single text argument */ - conn = pconn->conn; - sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); + /* if needed, close the connection to the database */ + if (freeconn) + PQfinish(conn); + PG_RE_THROW(); } - else - /* shouldn't happen */ - elog(ERROR, "wrong number of arguments"); + PG_END_TRY(); - if (!conn) - DBLINK_CONN_NOT_AVAIL; - - res = PQexec(conn, sql); - if (!res || - (PQresultStatus(res) != PGRES_COMMAND_OK && - PQresultStatus(res) != PGRES_TUPLES_OK)) - { - dblink_res_error(conname, res, "could not execute command", fail); - - /* - * and save a copy of the command status string to return as our - * result tuple - */ - sql_cmd_status = cstring_to_text("ERROR"); - } - else if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - /* - * and save a copy of the command status string to return as our - * result tuple - */ - sql_cmd_status = cstring_to_text(PQcmdStatus(res)); - PQclear(res); - } - else - { - PQclear(res); - ereport(ERROR, - (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), - errmsg("statement returning results not allowed"))); - } - - /* if needed, close the connection to the database and cleanup */ + /* if needed, close the connection to the database */ if (freeconn) PQfinish(conn); @@ -1503,13 +1528,15 @@ dblink_get_notify(PG_FUNCTION_ARGS) MemoryContext per_query_ctx; MemoryContext oldcontext; + prepTuplestoreResult(fcinfo); + DBLINK_INIT; if (PG_NARGS() == 1) DBLINK_GET_NAMED_CONN; else conn = pconn->conn; - /* create the tuplestore */ + /* create the tuplestore in per-query memory */ per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; oldcontext = MemoryContextSwitchTo(per_query_ctx); @@ -1522,7 +1549,6 @@ dblink_get_notify(PG_FUNCTION_ARGS) TEXTOID, -1, 0); tupstore = tuplestore_begin_heap(true, false, work_mem); - rsinfo->returnMode = SFRM_Materialize; rsinfo->setResult = tupstore; rsinfo->setDesc = tupdesc;