diff --git a/src/bin/pg_basebackup/.gitignore b/src/bin/pg_basebackup/.gitignore index 1334a1f77b1..7abea15a3f9 100644 --- a/src/bin/pg_basebackup/.gitignore +++ b/src/bin/pg_basebackup/.gitignore @@ -1,2 +1,3 @@ /pg_basebackup /pg_receivexlog +/pg_recvlogical diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile index 17c91af1240..346560eeab1 100644 --- a/src/bin/pg_basebackup/Makefile +++ b/src/bin/pg_basebackup/Makefile @@ -20,7 +20,7 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) OBJS=receivelog.o streamutil.o $(WIN32RES) -all: pg_basebackup pg_receivexlog +all: pg_basebackup pg_receivexlog pg_recvlogical pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport $(CC) $(CFLAGS) pg_basebackup.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X) @@ -28,9 +28,13 @@ pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport pg_receivexlog: pg_receivexlog.o $(OBJS) | submake-libpq submake-libpgport $(CC) $(CFLAGS) pg_receivexlog.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X) +pg_recvlogical: pg_recvlogical.o $(OBJS) | submake-libpq submake-libpgport + $(CC) $(CFLAGS) pg_recvlogical.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X) + install: all installdirs $(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)' $(INSTALL_PROGRAM) pg_receivexlog$(X) '$(DESTDIR)$(bindir)/pg_receivexlog$(X)' + $(INSTALL_PROGRAM) pg_recvlogical$(X) '$(DESTDIR)$(bindir)/pg_recvlogical$(X)' installdirs: $(MKDIR_P) '$(DESTDIR)$(bindir)' @@ -38,6 +42,9 @@ installdirs: uninstall: rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)' rm -f '$(DESTDIR)$(bindir)/pg_receivexlog$(X)' + rm -f '$(DESTDIR)$(bindir)/pg_recvlogical$(X)' clean distclean maintainer-clean: - rm -f pg_basebackup$(X) pg_receivexlog$(X) $(OBJS) pg_basebackup.o pg_receivexlog.o + rm -f pg_basebackup$(X) pg_receivexlog$(X) pg_recvlogical$(X) \ + pg_basebackup.o pg_receivexlog.o pg_recvlogical.o \ + $(OBJS) diff --git a/src/bin/pg_basebackup/nls.mk b/src/bin/pg_basebackup/nls.mk index e1c96dd4c49..29df4bcdb39 100644 --- a/src/bin/pg_basebackup/nls.mk +++ b/src/bin/pg_basebackup/nls.mk @@ -1,4 +1,4 @@ # src/bin/pg_basebackup/nls.mk CATALOG_NAME = pg_basebackup AVAIL_LANGUAGES = cs de es fr it ja pl pt_BR ru zh_CN -GETTEXT_FILES = pg_basebackup.c pg_receivexlog.c receivelog.c streamutil.c ../../common/fe_memutils.c +GETTEXT_FILES = pg_basebackup.c pg_receivexlog.c pg_recvlogical.c receivelog.c streamutil.c ../../common/fe_memutils.c diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c new file mode 100644 index 00000000000..a631cee2dd7 --- /dev/null +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -0,0 +1,978 @@ +/*------------------------------------------------------------------------- + * + * pg_recvlogical.c - receive data from a logical decoding slot in a streaming fashion + * and write it to to a local file. + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/pg_recvlogical.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include +#include +#include + +/* local includes */ +#include "streamutil.h" + +#include "access/xlog_internal.h" +#include "common/fe_memutils.h" +#include "getopt_long.h" +#include "libpq-fe.h" +#include "libpq/pqsignal.h" +#include "pqexpbuffer.h" + + +/* Time to sleep between reconnection attempts */ +#define RECONNECT_SLEEP_TIME 5 + +/* Global Options */ +static char *outfile = NULL; +static int verbose = 0; +static int noloop = 0; +static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ +static int fsync_interval = 10 * 1000; /* 10 sec = default */ +static XLogRecPtr startpos = InvalidXLogRecPtr; +static bool do_create_slot = false; +static bool do_start_slot = false; +static bool do_drop_slot = false; + +/* filled pairwise with option, value. value may be NULL */ +static char **options; +static size_t noptions = 0; +static const char *plugin = "test_decoding"; + +/* Global State */ +static int outfd = -1; +static volatile sig_atomic_t time_to_abort = false; +static volatile sig_atomic_t output_reopen = false; +static int64 output_last_fsync = -1; +static bool output_unsynced = false; +static XLogRecPtr output_written_lsn = InvalidXLogRecPtr; +static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr; + +static void usage(void); +static void StreamLog(); +static void disconnect_and_exit(int code); + +static void +usage(void) +{ + printf(_("%s receives PostgreSQL logical change stream.\n\n"), + progname); + printf(_("Usage:\n")); + printf(_(" %s [OPTION]...\n"), progname); + printf(_("\nOptions:\n")); + printf(_(" -f, --file=FILE receive log into this file. - for stdout\n")); + printf(_(" -n, --no-loop do not loop on connection lost\n")); + printf(_(" -v, --verbose output verbose messages\n")); + printf(_(" -V, --version output version information, then exit\n")); + printf(_(" -?, --help show this help, then exit\n")); + printf(_("\nConnection options:\n")); + printf(_(" -d, --dbname=DBNAME database to connect to\n")); + printf(_(" -h, --host=HOSTNAME database server host or socket directory\n")); + printf(_(" -p, --port=PORT database server port number\n")); + printf(_(" -U, --username=NAME connect as specified database user\n")); + printf(_(" -w, --no-password never prompt for password\n")); + printf(_(" -W, --password force password prompt (should happen automatically)\n")); + printf(_("\nReplication options:\n")); + printf(_(" -F --fsync-interval=INTERVAL\n" + " frequency of syncs to the output file (in seconds, defaults to 10)\n")); + printf(_(" -o, --option=NAME[=VALUE]\n" + " Specify option NAME with optional value VAL, to be passed\n" + " to the output plugin\n")); + printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (defaults to test_decoding)\n")); + printf(_(" -s, --status-interval=INTERVAL\n" + " time between status packets sent to server (in seconds, defaults to 10)\n")); + printf(_(" -S, --slot=SLOT use existing replication slot SLOT instead of starting a new one\n")); + printf(_(" -I, --startpos=PTR Where in an existing slot should the streaming start\n")); + printf(_("\nAction to be performed:\n")); + printf(_(" --create create a new replication slot (for the slotname see --slot)\n")); + printf(_(" --start start streaming in a replication slot (for the slotname see --slot)\n")); + printf(_(" --drop drop the replication slot (for the slotname see --slot)\n")); + printf(_("\nReport bugs to .\n")); +} + +/* + * Send a Standby Status Update message to server. + */ +static bool +sendFeedback(PGconn *conn, int64 now, bool force, bool replyRequested) +{ + static XLogRecPtr last_written_lsn = InvalidXLogRecPtr; + static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr; + + char replybuf[1 + 8 + 8 + 8 + 8 + 1]; + int len = 0; + + /* + * we normally don't want to send superflous feedbacks, but if it's + * because of a timeout we need to, otherwise wal_sender_timeout will + * kill us. + */ + if (!force && + last_written_lsn == output_written_lsn && + last_fsync_lsn != output_fsync_lsn) + return true; + + if (verbose) + fprintf(stderr, + _("%s: confirming write up to %X/%X, flush to %X/%X (slot %s)\n"), + progname, + (uint32) (output_written_lsn >> 32), (uint32) output_written_lsn, + (uint32) (output_fsync_lsn >> 32), (uint32) output_fsync_lsn, + replication_slot); + + replybuf[len] = 'r'; + len += 1; + fe_sendint64(output_written_lsn, &replybuf[len]); /* write */ + len += 8; + fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */ + len += 8; + fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */ + len += 8; + fe_sendint64(now, &replybuf[len]); /* sendTime */ + len += 8; + replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */ + len += 1; + + startpos = output_written_lsn; + last_written_lsn = output_written_lsn; + last_fsync_lsn = output_fsync_lsn; + + if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn)) + { + fprintf(stderr, _("%s: could not send feedback packet: %s"), + progname, PQerrorMessage(conn)); + return false; + } + + return true; +} + +static void +disconnect_and_exit(int code) +{ + if (conn != NULL) + PQfinish(conn); + + exit(code); +} + +static bool +OutputFsync(int64 now) +{ + output_last_fsync = now; + + output_fsync_lsn = output_written_lsn; + + if (fsync_interval <= 0) + return true; + + if (!output_unsynced) + return true; + + output_unsynced = false; + + /* Accept EINVAL, in case output is writing to a pipe or similar. */ + if (fsync(outfd) != 0 && errno != EINVAL) + { + fprintf(stderr, + _("%s: could not fsync log file \"%s\": %s\n"), + progname, outfile, strerror(errno)); + return false; + } + + return true; +} + +/* + * Start the log streaming + */ +static void +StreamLog(void) +{ + PGresult *res; + char *copybuf = NULL; + int64 last_status = -1; + int i; + PQExpBuffer query; + + output_written_lsn = InvalidXLogRecPtr; + output_fsync_lsn = InvalidXLogRecPtr; + + query = createPQExpBuffer(); + + /* + * Connect in replication mode to the server + */ + if (!conn) + conn = GetConnection(); + if (!conn) + /* Error message already written in GetConnection() */ + return; + + /* + * Start the replication + */ + if (verbose) + fprintf(stderr, + _("%s: starting log streaming at %X/%X (slot %s)\n"), + progname, (uint32) (startpos >> 32), (uint32) startpos, + replication_slot); + + /* Initiate the replication stream at specified location */ + appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X", + replication_slot, (uint32) (startpos >> 32), (uint32) startpos); + + /* print options if there are any */ + if (noptions) + appendPQExpBufferStr(query, " ("); + + for (i = 0; i < noptions; i++) + { + /* separator */ + if (i > 0) + appendPQExpBufferStr(query, ", "); + + /* write option name */ + appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]); + + /* write option value if specified */ + if (options[(i * 2) + 1] != NULL) + appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]); + } + + if (noptions) + appendPQExpBufferChar(query, ')'); + + res = PQexec(conn, query->data); + if (PQresultStatus(res) != PGRES_COPY_BOTH) + { + fprintf(stderr, _("%s: could not send replication command \"%s\": %s\n"), + progname, query->data, PQresultErrorMessage(res)); + PQclear(res); + goto error; + } + PQclear(res); + resetPQExpBuffer(query); + + if (verbose) + fprintf(stderr, + _("%s: initiated streaming\n"), + progname); + + while (!time_to_abort) + { + int r; + int bytes_left; + int bytes_written; + int64 now; + int hdr_len; + + if (copybuf != NULL) + { + PQfreemem(copybuf); + copybuf = NULL; + } + + /* + * Potentially send a status message to the master + */ + now = feGetCurrentTimestamp(); + + if (outfd != -1 && + feTimestampDifferenceExceeds(output_last_fsync, now, + fsync_interval)) + { + if (!OutputFsync(now)) + goto error; + } + + if (standby_message_timeout > 0 && + feTimestampDifferenceExceeds(last_status, now, + standby_message_timeout)) + { + /* Time to send feedback! */ + if (!sendFeedback(conn, now, true, false)) + goto error; + + last_status = now; + } + + r = PQgetCopyData(conn, ©buf, 1); + if (r == 0) + { + /* + * In async mode, and no data available. We block on reading but + * not more than the specified timeout, so that we can send a + * response back to the client. + */ + fd_set input_mask; + int64 message_target = 0; + int64 fsync_target = 0; + struct timeval timeout; + struct timeval *timeoutptr; + + FD_ZERO(&input_mask); + FD_SET(PQsocket(conn), &input_mask); + + /* Compute when we need to wakeup to send a keepalive message. */ + if (standby_message_timeout) + message_target = last_status + (standby_message_timeout - 1) * + ((int64) 1000); + + /* Compute when we need to wakeup to fsync the output file. */ + if (fsync_interval > 0 && output_unsynced) + fsync_target = output_last_fsync + (fsync_interval - 1) * + ((int64) 1000); + + /* Now compute when to wakeup. */ + if (message_target > 0 || fsync_target > 0) + { + int64 targettime; + long secs; + int usecs; + + targettime = message_target; + + if (fsync_target > 0 && fsync_target < targettime) + targettime = fsync_target; + + feTimestampDifference(now, + targettime, + &secs, + &usecs); + if (secs <= 0) + timeout.tv_sec = 1; /* Always sleep at least 1 sec */ + else + timeout.tv_sec = secs; + timeout.tv_usec = usecs; + timeoutptr = &timeout; + } + + r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr); + if (r == 0 || (r < 0 && errno == EINTR)) + { + /* + * Got a timeout or signal. Continue the loop and either + * deliver a status packet to the server or just go back into + * blocking. + */ + continue; + } + else if (r < 0) + { + fprintf(stderr, _("%s: select() failed: %s\n"), + progname, strerror(errno)); + goto error; + } + + /* Else there is actually data on the socket */ + if (PQconsumeInput(conn) == 0) + { + fprintf(stderr, + _("%s: could not receive data from WAL stream: %s"), + progname, PQerrorMessage(conn)); + goto error; + } + continue; + } + + /* End of copy stream */ + if (r == -1) + break; + + /* Failure while reading the copy stream */ + if (r == -2) + { + fprintf(stderr, _("%s: could not read COPY data: %s"), + progname, PQerrorMessage(conn)); + goto error; + } + + /* Check the message type. */ + if (copybuf[0] == 'k') + { + int pos; + bool replyRequested; + XLogRecPtr walEnd; + + /* + * Parse the keepalive message, enclosed in the CopyData message. + * We just check if the server requested a reply, and ignore the + * rest. + */ + pos = 1; /* skip msgtype 'k' */ + walEnd = fe_recvint64(©buf[pos]); + output_written_lsn = Max(walEnd, output_written_lsn); + + pos += 8; /* read walEnd */ + + pos += 8; /* skip sendTime */ + + if (r < pos + 1) + { + fprintf(stderr, _("%s: streaming header too small: %d\n"), + progname, r); + goto error; + } + replyRequested = copybuf[pos]; + + /* If the server requested an immediate reply, send one. */ + if (replyRequested) + { + /* fsync data, so we send a recent flush pointer */ + if (!OutputFsync(now)) + goto error; + + now = feGetCurrentTimestamp(); + if (!sendFeedback(conn, now, true, false)) + goto error; + last_status = now; + } + continue; + } + else if (copybuf[0] != 'w') + { + fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"), + progname, copybuf[0]); + goto error; + } + + + /* + * Read the header of the XLogData message, enclosed in the CopyData + * message. We only need the WAL location field (dataStart), the rest + * of the header is ignored. + */ + hdr_len = 1; /* msgtype 'w' */ + hdr_len += 8; /* dataStart */ + hdr_len += 8; /* walEnd */ + hdr_len += 8; /* sendTime */ + if (r < hdr_len + 1) + { + fprintf(stderr, _("%s: streaming header too small: %d\n"), + progname, r); + goto error; + } + + /* Extract WAL location for this block */ + { + XLogRecPtr temp = fe_recvint64(©buf[1]); + + output_written_lsn = Max(temp, output_written_lsn); + } + + /* redirect output to stdout */ + if (outfd == -1 && strcmp(outfile, "-") == 0) + { + outfd = fileno(stdout); + } + + /* got SIGHUP, close output file */ + if (outfd != -1 && output_reopen) + { + now = feGetCurrentTimestamp(); + if (!OutputFsync(now)) + goto error; + close(outfd); + outfd = -1; + output_reopen = false; + } + + if (outfd == -1) + { + + outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY, + S_IRUSR | S_IWUSR); + if (outfd == -1) + { + fprintf(stderr, + _("%s: could not open log file \"%s\": %s\n"), + progname, outfile, strerror(errno)); + goto error; + } + } + + bytes_left = r - hdr_len; + bytes_written = 0; + + /* signal that a fsync is needed */ + output_unsynced = true; + + while (bytes_left) + { + int ret; + + ret = write(outfd, + copybuf + hdr_len + bytes_written, + bytes_left); + + if (ret < 0) + { + fprintf(stderr, + _("%s: could not write %u bytes to log file \"%s\": %s\n"), + progname, bytes_left, outfile, + strerror(errno)); + goto error; + } + + /* Write was successful, advance our position */ + bytes_written += ret; + bytes_left -= ret; + } + + if (write(outfd, "\n", 1) != 1) + { + fprintf(stderr, + _("%s: could not write %u bytes to log file \"%s\": %s\n"), + progname, 1, outfile, + strerror(errno)); + goto error; + } + } + + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, + _("%s: unexpected termination of replication stream: %s"), + progname, PQresultErrorMessage(res)); + goto error; + } + PQclear(res); + + if (copybuf != NULL) + PQfreemem(copybuf); + + if (outfd != -1 && strcmp(outfile, "-") != 0) + { + int64 t = feGetCurrentTimestamp(); + + /* no need to jump to error on failure here, we're finishing anyway */ + OutputFsync(t); + + if (close(outfd) != 0) + fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), + progname, outfile, strerror(errno)); + } + outfd = -1; +error: + destroyPQExpBuffer(query); + PQfinish(conn); + conn = NULL; +} + +/* + * Unfortunately we can't do sensible signal handling on windows... + */ +#ifndef WIN32 + +/* + * When sigint is called, just tell the system to exit at the next possible + * moment. + */ +static void +sigint_handler(int signum) +{ + time_to_abort = true; +} + +/* + * Trigger the output file to be reopened. + */ +static void +sighup_handler(int signum) +{ + output_reopen = true; +} +#endif + + +int +main(int argc, char **argv) +{ + PGresult *res; + static struct option long_options[] = { +/* general options */ + {"file", required_argument, NULL, 'f'}, + {"no-loop", no_argument, NULL, 'n'}, + {"verbose", no_argument, NULL, 'v'}, + {"version", no_argument, NULL, 'V'}, + {"help", no_argument, NULL, '?'}, +/* connnection options */ + {"dbname", required_argument, NULL, 'd'}, + {"host", required_argument, NULL, 'h'}, + {"port", required_argument, NULL, 'p'}, + {"username", required_argument, NULL, 'U'}, + {"no-password", no_argument, NULL, 'w'}, + {"password", no_argument, NULL, 'W'}, +/* replication options */ + {"option", required_argument, NULL, 'o'}, + {"plugin", required_argument, NULL, 'P'}, + {"status-interval", required_argument, NULL, 's'}, + {"fsync-interval", required_argument, NULL, 'F'}, + {"slot", required_argument, NULL, 'S'}, + {"startpos", required_argument, NULL, 'I'}, +/* action */ + {"create", no_argument, NULL, 1}, + {"start", no_argument, NULL, 2}, + {"drop", no_argument, NULL, 3}, + {NULL, 0, NULL, 0} + }; + int c; + int option_index; + uint32 hi, + lo; + + progname = get_progname(argv[0]); + set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_recvlogical")); + + if (argc > 1) + { + if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0) + { + usage(); + exit(0); + } + else if (strcmp(argv[1], "-V") == 0 || + strcmp(argv[1], "--version") == 0) + { + puts("pg_recvlogical (PostgreSQL) " PG_VERSION); + exit(0); + } + } + + while ((c = getopt_long(argc, argv, "f:F:nvd:h:o:p:U:wWP:s:S:", + long_options, &option_index)) != -1) + { + switch (c) + { +/* general options */ + case 'f': + outfile = pg_strdup(optarg); + break; + case 'n': + noloop = 1; + break; + case 'v': + verbose++; + break; +/* connnection options */ + case 'd': + dbname = pg_strdup(optarg); + break; + case 'h': + dbhost = pg_strdup(optarg); + break; + case 'p': + if (atoi(optarg) <= 0) + { + fprintf(stderr, _("%s: invalid port number \"%s\"\n"), + progname, optarg); + exit(1); + } + dbport = pg_strdup(optarg); + break; + case 'U': + dbuser = pg_strdup(optarg); + break; + case 'w': + dbgetpassword = -1; + break; + case 'W': + dbgetpassword = 1; + break; +/* replication options */ + case 'o': + { + char *data = pg_strdup(optarg); + char *val = strchr(data, '='); + + if (val != NULL) + { + /* remove =; separate data from val */ + *val = '\0'; + val++; + } + + noptions += 1; + options = pg_realloc(options, sizeof(char*) * noptions * 2); + + options[(noptions - 1) * 2] = data; + options[(noptions - 1) * 2 + 1] = val; + } + + break; + case 'P': + plugin = pg_strdup(optarg); + break; + case 's': + standby_message_timeout = atoi(optarg) * 1000; + if (standby_message_timeout < 0) + { + fprintf(stderr, _("%s: invalid status interval \"%s\"\n"), + progname, optarg); + exit(1); + } + break; + case 'F': + fsync_interval = atoi(optarg) * 1000; + if (fsync_interval < 0) + { + fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"), + progname, optarg); + exit(1); + } + break; + case 'S': + replication_slot = pg_strdup(optarg); + break; + case 'I': + if (sscanf(optarg, "%X/%X", &hi, &lo) != 2) + { + fprintf(stderr, + _("%s: could not parse start position \"%s\"\n"), + progname, optarg); + exit(1); + } + startpos = ((uint64) hi) << 32 | lo; + break; +/* action */ + case 1: + do_create_slot = true; + break; + case 2: + do_start_slot = true; + break; + case 3: + do_drop_slot = true; + break; + + default: + + /* + * getopt_long already emitted a complaint + */ + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + } + + /* + * Any non-option arguments? + */ + if (optind < argc) + { + fprintf(stderr, + _("%s: too many command-line arguments (first is \"%s\")\n"), + progname, argv[optind]); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + + /* + * Required arguments + */ + if (replication_slot == NULL) + { + fprintf(stderr, _("%s: no slot specified\n"), progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + + if (do_start_slot && outfile == NULL) + { + fprintf(stderr, _("%s: no target file specified\n"), progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + + if (!do_drop_slot && dbname == NULL) + { + fprintf(stderr, _("%s: no database specified\n"), progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + + if (!do_drop_slot && !do_create_slot && !do_start_slot) + { + fprintf(stderr, _("%s: at least one action needs to be specified\n"), progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + + if (do_drop_slot && (do_create_slot || do_start_slot)) + { + fprintf(stderr, _("%s: --stop cannot be combined with --init or --start\n"), progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + + if (startpos && (do_create_slot || do_drop_slot)) + { + fprintf(stderr, _("%s: --startpos cannot be combined with --init or --stop\n"), progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + +#ifndef WIN32 + pqsignal(SIGINT, sigint_handler); + pqsignal(SIGHUP, sighup_handler); +#endif + + /* + * don't really need this but it actually helps to get more precise error + * messages about authentication, required GUCs and such without starting + * to loop around connection attempts lateron. + */ + { + conn = GetConnection(); + if (!conn) + /* Error message already written in GetConnection() */ + exit(1); + + /* + * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog + * position. + */ + res = PQexec(conn, "IDENTIFY_SYSTEM"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), + progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn)); + disconnect_and_exit(1); + } + + if (PQntuples(res) != 1 || PQnfields(res) < 4) + { + fprintf(stderr, + _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"), + progname, PQntuples(res), PQnfields(res), 1, 4); + disconnect_and_exit(1); + } + PQclear(res); + } + + + /* + * stop a replication slot + */ + if (do_drop_slot) + { + char query[256]; + + if (verbose) + fprintf(stderr, + _("%s: freeing replication slot \"%s\"\n"), + progname, replication_slot); + + snprintf(query, sizeof(query), "DROP_REPLICATION_SLOT \"%s\"", + replication_slot); + res = PQexec(conn, query); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), + progname, query, PQerrorMessage(conn)); + disconnect_and_exit(1); + } + + if (PQntuples(res) != 0 || PQnfields(res) != 0) + { + fprintf(stderr, + _("%s: could not stop logical rep: got %d rows and %d fields, expected %d rows and %d fields\n"), + progname, PQntuples(res), PQnfields(res), 0, 0); + disconnect_and_exit(1); + } + + PQclear(res); + disconnect_and_exit(0); + } + + /* + * init a replication slot + */ + if (do_create_slot) + { + char query[256]; + + if (verbose) + fprintf(stderr, + _("%s: initializing replication slot \"%s\"\n"), + progname, replication_slot); + + snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"", + replication_slot, plugin); + + res = PQexec(conn, query); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), + progname, query, PQerrorMessage(conn)); + disconnect_and_exit(1); + } + + if (PQntuples(res) != 1 || PQnfields(res) != 4) + { + fprintf(stderr, + _("%s: could not init logical rep: got %d rows and %d fields, expected %d rows and %d fields\n"), + progname, PQntuples(res), PQnfields(res), 1, 4); + disconnect_and_exit(1); + } + + if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2) + { + fprintf(stderr, + _("%s: could not parse log location \"%s\"\n"), + progname, PQgetvalue(res, 0, 1)); + disconnect_and_exit(1); + } + startpos = ((uint64) hi) << 32 | lo; + + replication_slot = strdup(PQgetvalue(res, 0, 0)); + PQclear(res); + } + + + if (!do_start_slot) + disconnect_and_exit(0); + + while (true) + { + StreamLog(); + if (time_to_abort) + { + /* + * We've been Ctrl-C'ed. That's not an error, so exit without an + * errorcode. + */ + disconnect_and_exit(0); + } + else if (noloop) + { + fprintf(stderr, _("%s: disconnected.\n"), progname); + exit(1); + } + else + { + fprintf(stderr, + /* translator: check source for value for %d */ + _("%s: disconnected. Waiting %d seconds to try again.\n"), + progname, RECONNECT_SLEEP_TIME); + pg_usleep(RECONNECT_SLEEP_TIME * 1000000); + } + } +} diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index febe3d1a2b7..55f3d7f367e 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -11,22 +11,19 @@ * src/bin/pg_basebackup/receivelog.c *------------------------------------------------------------------------- */ + #include "postgres_fe.h" #include -#include -#include #include -/* for ntohl/htonl */ -#include -#include + +/* local includes */ +#include "receivelog.h" +#include "streamutil.h" #include "libpq-fe.h" #include "access/xlog_internal.h" -#include "receivelog.h" -#include "streamutil.h" - /* fd and filename for currently open WAL file */ static int walfile = -1; @@ -194,63 +191,6 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos) } -/* - * Local version of GetCurrentTimestamp(), since we are not linked with - * backend code. The protocol always uses integer timestamps, regardless of - * server setting. - */ -static int64 -localGetCurrentTimestamp(void) -{ - int64 result; - struct timeval tp; - - gettimeofday(&tp, NULL); - - result = (int64) tp.tv_sec - - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); - - result = (result * USECS_PER_SEC) + tp.tv_usec; - - return result; -} - -/* - * Local version of TimestampDifference(), since we are not linked with - * backend code. - */ -static void -localTimestampDifference(int64 start_time, int64 stop_time, - long *secs, int *microsecs) -{ - int64 diff = stop_time - start_time; - - if (diff <= 0) - { - *secs = 0; - *microsecs = 0; - } - else - { - *secs = (long) (diff / USECS_PER_SEC); - *microsecs = (int) (diff % USECS_PER_SEC); - } -} - -/* - * Local version of TimestampDifferenceExceeds(), since we are not - * linked with backend code. - */ -static bool -localTimestampDifferenceExceeds(int64 start_time, - int64 stop_time, - int msec) -{ - int64 diff = stop_time - start_time; - - return (diff >= msec * INT64CONST(1000)); -} - /* * Check if a timeline history file exists. */ @@ -370,47 +310,6 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *co return true; } -/* - * Converts an int64 to network byte order. - */ -static void -sendint64(int64 i, char *buf) -{ - uint32 n32; - - /* High order half first, since we're doing MSB-first */ - n32 = (uint32) (i >> 32); - n32 = htonl(n32); - memcpy(&buf[0], &n32, 4); - - /* Now the low order half */ - n32 = (uint32) i; - n32 = htonl(n32); - memcpy(&buf[4], &n32, 4); -} - -/* - * Converts an int64 from network byte order to native format. - */ -static int64 -recvint64(char *buf) -{ - int64 result; - uint32 h32; - uint32 l32; - - memcpy(&h32, buf, 4); - memcpy(&l32, buf + 4, 4); - h32 = ntohl(h32); - l32 = ntohl(l32); - - result = h32; - result <<= 32; - result |= l32; - - return result; -} - /* * Send a Standby Status Update message to server. */ @@ -422,16 +321,16 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested) replybuf[len] = 'r'; len += 1; - sendint64(blockpos, &replybuf[len]); /* write */ + fe_sendint64(blockpos, &replybuf[len]); /* write */ len += 8; if (reportFlushPosition) - sendint64(lastFlushPosition, &replybuf[len]); /* flush */ + fe_sendint64(lastFlushPosition, &replybuf[len]); /* flush */ else - sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */ + fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */ len += 8; - sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */ + fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */ len += 8; - sendint64(now, &replybuf[len]); /* sendTime */ + fe_sendint64(now, &replybuf[len]); /* sendTime */ len += 8; replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */ len += 1; @@ -864,9 +763,9 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* * Potentially send a status message to the master */ - now = localGetCurrentTimestamp(); + now = feGetCurrentTimestamp(); if (still_sending && standby_message_timeout > 0 && - localTimestampDifferenceExceeds(last_status, now, + feTimestampDifferenceExceeds(last_status, now, standby_message_timeout)) { /* Time to send feedback! */ @@ -895,10 +794,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, int usecs; targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000); - localTimestampDifference(now, - targettime, - &secs, - &usecs); + feTimestampDifference(now, + targettime, + &secs, + &usecs); if (secs <= 0) timeout.tv_sec = 1; /* Always sleep at least 1 sec */ else @@ -1002,7 +901,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* If the server requested an immediate reply, send one. */ if (replyRequested && still_sending) { - now = localGetCurrentTimestamp(); + now = feGetCurrentTimestamp(); if (!sendFeedback(conn, blockpos, now, false)) goto error; last_status = now; @@ -1032,7 +931,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, progname, r); goto error; } - blockpos = recvint64(©buf[1]); + blockpos = fe_recvint64(©buf[1]); /* Extract WAL location for this block */ xlogoff = blockpos % XLOG_SEG_SIZE; diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h index 7c983cd604a..f4789a580ae 100644 --- a/src/bin/pg_basebackup/receivelog.h +++ b/src/bin/pg_basebackup/receivelog.h @@ -1,3 +1,5 @@ +#include "libpq-fe.h" + #include "access/xlogdefs.h" /* diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c index 041076ff1d7..e440dc4e244 100644 --- a/src/bin/pg_basebackup/streamutil.c +++ b/src/bin/pg_basebackup/streamutil.c @@ -12,10 +12,23 @@ */ #include "postgres_fe.h" -#include "streamutil.h" #include #include +#include +#include +#include + +/* for ntohl/htonl */ +#include +#include + +/* local includes */ +#include "receivelog.h" +#include "streamutil.h" + +#include "common/fe_memutils.h" +#include "datatype/timestamp.h" const char *progname; char *connection_string = NULL; @@ -23,6 +36,7 @@ char *dbhost = NULL; char *dbuser = NULL; char *dbport = NULL; char *replication_slot = NULL; +char *dbname = NULL; int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */ static char *dbpassword = NULL; PGconn *conn = NULL; @@ -87,10 +101,10 @@ GetConnection(void) } keywords[i] = "dbname"; - values[i] = "replication"; + values[i] = dbname == NULL ? "replication" : dbname; i++; keywords[i] = "replication"; - values[i] = "true"; + values[i] = dbname == NULL ? "true" : "database"; i++; keywords[i] = "fallback_application_name"; values[i] = progname; @@ -212,3 +226,102 @@ GetConnection(void) return tmpconn; } + + +/* + * Frontend version of GetCurrentTimestamp(), since we are not linked with + * backend code. The protocol always uses integer timestamps, regardless of + * server setting. + */ +int64 +feGetCurrentTimestamp(void) +{ + int64 result; + struct timeval tp; + + gettimeofday(&tp, NULL); + + result = (int64) tp.tv_sec - + ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); + + result = (result * USECS_PER_SEC) + tp.tv_usec; + + return result; +} + +/* + * Frontend version of TimestampDifference(), since we are not linked with + * backend code. + */ +void +feTimestampDifference(int64 start_time, int64 stop_time, + long *secs, int *microsecs) +{ + int64 diff = stop_time - start_time; + + if (diff <= 0) + { + *secs = 0; + *microsecs = 0; + } + else + { + *secs = (long) (diff / USECS_PER_SEC); + *microsecs = (int) (diff % USECS_PER_SEC); + } +} + +/* + * Frontend version of TimestampDifferenceExceeds(), since we are not + * linked with backend code. + */ +bool +feTimestampDifferenceExceeds(int64 start_time, + int64 stop_time, + int msec) +{ + int64 diff = stop_time - start_time; + + return (diff >= msec * INT64CONST(1000)); +} + +/* + * Converts an int64 to network byte order. + */ +void +fe_sendint64(int64 i, char *buf) +{ + uint32 n32; + + /* High order half first, since we're doing MSB-first */ + n32 = (uint32) (i >> 32); + n32 = htonl(n32); + memcpy(&buf[0], &n32, 4); + + /* Now the low order half */ + n32 = (uint32) i; + n32 = htonl(n32); + memcpy(&buf[4], &n32, 4); +} + +/* + * Converts an int64 from network byte order to native format. + */ +int64 +fe_recvint64(char *buf) +{ + int64 result; + uint32 h32; + uint32 l32; + + memcpy(&h32, buf, 4); + memcpy(&l32, buf + 4, 4); + h32 = ntohl(h32); + l32 = ntohl(l32); + + result = h32; + result <<= 32; + result |= l32; + + return result; +} diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h index 7c7d0228897..d0f3799d1e3 100644 --- a/src/bin/pg_basebackup/streamutil.h +++ b/src/bin/pg_basebackup/streamutil.h @@ -5,6 +5,7 @@ extern char *connection_string; extern char *dbhost; extern char *dbuser; extern char *dbport; +extern char *dbname; extern int dbgetpassword; extern char *replication_slot; @@ -12,3 +13,12 @@ extern char *replication_slot; extern PGconn *conn; extern PGconn *GetConnection(void); + +extern int64 feGetCurrentTimestamp(void); +extern void feTimestampDifference(int64 start_time, int64 stop_time, + long *secs, int *microsecs); + +extern bool feTimestampDifferenceExceeds(int64 start_time, int64 stop_time, + int msec); +extern void fe_sendint64(int64 i, char *buf); +extern int64 fe_recvint64(char *buf);