diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index a0381e52f31..74b97cf126a 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -17,6 +17,10 @@ override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS) OBJS = \ backup_manifest.o \ basebackup.o \ + basebackup_copy.o \ + basebackup_progress.o \ + basebackup_sink.o \ + basebackup_throttle.o \ repl_gram.o \ slot.o \ slotfuncs.o \ diff --git a/src/backend/replication/backup_manifest.c b/src/backend/replication/backup_manifest.c index 04ca455ace8..4fe11a3b5cd 100644 --- a/src/backend/replication/backup_manifest.c +++ b/src/backend/replication/backup_manifest.c @@ -17,6 +17,7 @@ #include "libpq/pqformat.h" #include "mb/pg_wchar.h" #include "replication/backup_manifest.h" +#include "replication/basebackup_sink.h" #include "utils/builtins.h" #include "utils/json.h" @@ -310,9 +311,8 @@ AddWALInfoToBackupManifest(backup_manifest_info *manifest, XLogRecPtr startptr, * Finalize the backup manifest, and send it to the client. */ void -SendBackupManifest(backup_manifest_info *manifest) +SendBackupManifest(backup_manifest_info *manifest, bbsink *sink) { - StringInfoData protobuf; uint8 checksumbuf[PG_SHA256_DIGEST_LENGTH]; char checksumstringbuf[PG_SHA256_DIGEST_STRING_LENGTH]; size_t manifest_bytes_done = 0; @@ -352,38 +352,28 @@ SendBackupManifest(backup_manifest_info *manifest) (errcode_for_file_access(), errmsg("could not rewind temporary file"))); - /* Send CopyOutResponse message */ - pq_beginmessage(&protobuf, 'H'); - pq_sendbyte(&protobuf, 0); /* overall format */ - pq_sendint16(&protobuf, 0); /* natts */ - pq_endmessage(&protobuf); /* - * Send CopyData messages. - * - * We choose to read back the data from the temporary file in chunks of - * size BLCKSZ; this isn't necessary, but buffile.c uses that as the I/O - * size, so it seems to make sense to match that value here. + * Send the backup manifest. 
*/ + bbsink_begin_manifest(sink); while (manifest_bytes_done < manifest->manifest_size) { - char manifestbuf[BLCKSZ]; size_t bytes_to_read; size_t rc; - bytes_to_read = Min(sizeof(manifestbuf), + bytes_to_read = Min(sink->bbs_buffer_length, manifest->manifest_size - manifest_bytes_done); - rc = BufFileRead(manifest->buffile, manifestbuf, bytes_to_read); + rc = BufFileRead(manifest->buffile, sink->bbs_buffer, + bytes_to_read); if (rc != bytes_to_read) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from temporary file: %m"))); - pq_putmessage('d', manifestbuf, bytes_to_read); + bbsink_manifest_contents(sink, bytes_to_read); manifest_bytes_done += bytes_to_read; } - - /* No more data, so send CopyDone message */ - pq_putemptymessage('c'); + bbsink_end_manifest(sink); /* Release resources */ BufFileClose(manifest->buffile); diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index b7359f43903..38c82c46196 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -17,13 +17,9 @@ #include #include "access/xlog_internal.h" /* for pg_start/stop_backup */ -#include "catalog/pg_type.h" #include "common/file_perm.h" #include "commands/defrem.h" -#include "commands/progress.h" #include "lib/stringinfo.h" -#include "libpq/libpq.h" -#include "libpq/pqformat.h" #include "miscadmin.h" #include "nodes/pg_list.h" #include "pgstat.h" @@ -31,6 +27,7 @@ #include "port.h" #include "postmaster/syslogger.h" #include "replication/basebackup.h" +#include "replication/basebackup_sink.h" #include "replication/backup_manifest.h" #include "replication/walsender.h" #include "replication/walsender_private.h" @@ -46,6 +43,16 @@ #include "utils/resowner.h" #include "utils/timestamp.h" +/* + * How much data do we want to send in one CopyData message? Note that + * this may also result in reading the underlying files in chunks of this + * size. 
+ * + * NB: The buffer size is required to be a multiple of the system block + * size, so use that value instead if it's bigger than our preference. + */ +#define SINK_BUFFER_LENGTH Max(32768, BLCKSZ) + typedef struct { const char *label; @@ -59,27 +66,25 @@ typedef struct pg_checksum_type manifest_checksum_type; } basebackup_options; -static int64 sendTablespace(char *path, char *oid, bool sizeonly, +static int64 sendTablespace(bbsink *sink, char *path, char *oid, bool sizeonly, struct backup_manifest_info *manifest); -static int64 sendDir(const char *path, int basepathlen, bool sizeonly, +static int64 sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly, List *tablespaces, bool sendtblspclinks, backup_manifest_info *manifest, const char *spcoid); -static bool sendFile(const char *readfilename, const char *tarfilename, +static bool sendFile(bbsink *sink, const char *readfilename, const char *tarfilename, struct stat *statbuf, bool missing_ok, Oid dboid, backup_manifest_info *manifest, const char *spcoid); -static void sendFileWithContent(const char *filename, const char *content, +static void sendFileWithContent(bbsink *sink, const char *filename, + const char *content, backup_manifest_info *manifest); -static int64 _tarWriteHeader(const char *filename, const char *linktarget, - struct stat *statbuf, bool sizeonly); +static int64 _tarWriteHeader(bbsink *sink, const char *filename, + const char *linktarget, struct stat *statbuf, + bool sizeonly); +static void _tarWritePadding(bbsink *sink, int len); static void convert_link_to_directory(const char *pathbuf, struct stat *statbuf); -static void send_int8_string(StringInfoData *buf, int64 intval); -static void SendBackupHeader(List *tablespaces); -static void perform_base_backup(basebackup_options *opt); +static void perform_base_backup(basebackup_options *opt, bbsink *sink); static void parse_basebackup_options(List *options, basebackup_options *opt); -static void SendXlogRecPtrResult(XLogRecPtr 
ptr, TimeLineID tli); static int compareWalFileNames(const ListCell *a, const ListCell *b); -static void throttle(size_t increment); -static void update_basebackup_progress(int64 delta); static bool is_checksummed_file(const char *fullpath, const char *filename); static int basebackup_read_file(int fd, char *buf, size_t nbytes, off_t offset, const char *filename, bool partial_read_ok); @@ -90,46 +95,12 @@ static bool backup_started_in_recovery = false; /* Relative path of temporary statistics directory */ static char *statrelpath = NULL; -/* - * Size of each block sent into the tar stream for larger files. - */ -#define TAR_SEND_SIZE 32768 - -/* - * How frequently to throttle, as a fraction of the specified rate-second. - */ -#define THROTTLING_FREQUENCY 8 - -/* The actual number of bytes, transfer of which may cause sleep. */ -static uint64 throttling_sample; - -/* Amount of data already transferred but not yet throttled. */ -static int64 throttling_counter; - -/* The minimum time required to transfer throttling_sample bytes. */ -static TimeOffset elapsed_min_unit; - -/* The last check of the transfer rate. */ -static TimestampTz throttled_last; - -/* The starting XLOG position of the base backup. */ -static XLogRecPtr startptr; - /* Total number of checksum failures during base backup. */ static long long int total_checksum_failures; /* Do not verify checksums. */ static bool noverify_checksums = false; -/* - * Total amount of backup data that will be streamed. - * -1 means that the size is not estimated. - */ -static int64 backup_total = 0; - -/* Amount of backup data already streamed */ -static int64 backup_streamed = 0; - /* * Definition of one element part of an exclusion list, used for paths part * of checksum validation or base backups. "name" is the name of the file @@ -253,32 +224,22 @@ static const struct exclude_list_item noChecksumFiles[] = { * clobbered by longjmp" from stupider versions of gcc. 
*/ static void -perform_base_backup(basebackup_options *opt) +perform_base_backup(basebackup_options *opt, bbsink *sink) { - TimeLineID starttli; + bbsink_state state; XLogRecPtr endptr; TimeLineID endtli; StringInfo labelfile; StringInfo tblspc_map_file; backup_manifest_info manifest; int datadirpathlen; - List *tablespaces = NIL; - backup_total = 0; - backup_streamed = 0; - pgstat_progress_start_command(PROGRESS_COMMAND_BASEBACKUP, InvalidOid); - - /* - * If the estimation of the total backup size is disabled, make the - * backup_total column in the view return NULL by setting the parameter to - * -1. - */ - if (!opt->progress) - { - backup_total = -1; - pgstat_progress_update_param(PROGRESS_BASEBACKUP_BACKUP_TOTAL, - backup_total); - } + /* Initial backup state, insofar as we know it now. */ + state.tablespaces = NIL; + state.tablespace_num = 0; + state.bytes_done = 0; + state.bytes_total = 0; + state.bytes_total_is_valid = false; /* we're going to use a BufFile, so we need a ResourceOwner */ Assert(CurrentResourceOwner == NULL); @@ -295,11 +256,11 @@ perform_base_backup(basebackup_options *opt) total_checksum_failures = 0; - pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE, - PROGRESS_BASEBACKUP_PHASE_WAIT_CHECKPOINT); - startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli, - labelfile, &tablespaces, - tblspc_map_file); + basebackup_progress_wait_checkpoint(); + state.startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, + &state.starttli, + labelfile, &state.tablespaces, + tblspc_map_file); /* * Once do_pg_start_backup has been called, ensure that any failure causes @@ -312,7 +273,6 @@ perform_base_backup(basebackup_options *opt) { ListCell *lc; tablespaceinfo *ti; - int tblspc_streamed = 0; /* * Calculate the relative path of temporary statistics directory in @@ -329,7 +289,7 @@ perform_base_backup(basebackup_options *opt) /* Add a node for the base directory at the end */ ti = palloc0(sizeof(tablespaceinfo)); ti->size = -1; 
- tablespaces = lappend(tablespaces, ti); + state.tablespaces = lappend(state.tablespaces, ti); /* * Calculate the total backup size by summing up the size of each @@ -337,100 +297,53 @@ perform_base_backup(basebackup_options *opt) */ if (opt->progress) { - pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE, - PROGRESS_BASEBACKUP_PHASE_ESTIMATE_BACKUP_SIZE); + basebackup_progress_estimate_backup_size(); - foreach(lc, tablespaces) + foreach(lc, state.tablespaces) { tablespaceinfo *tmp = (tablespaceinfo *) lfirst(lc); if (tmp->path == NULL) - tmp->size = sendDir(".", 1, true, tablespaces, true, NULL, - NULL); + tmp->size = sendDir(sink, ".", 1, true, state.tablespaces, + true, NULL, NULL); else - tmp->size = sendTablespace(tmp->path, tmp->oid, true, + tmp->size = sendTablespace(sink, tmp->path, tmp->oid, true, NULL); - backup_total += tmp->size; + state.bytes_total += tmp->size; } + state.bytes_total_is_valid = true; } - /* Report that we are now streaming database files as a base backup */ - { - const int index[] = { - PROGRESS_BASEBACKUP_PHASE, - PROGRESS_BASEBACKUP_BACKUP_TOTAL, - PROGRESS_BASEBACKUP_TBLSPC_TOTAL - }; - const int64 val[] = { - PROGRESS_BASEBACKUP_PHASE_STREAM_BACKUP, - backup_total, list_length(tablespaces) - }; - - pgstat_progress_update_multi_param(3, index, val); - } - - /* Send the starting position of the backup */ - SendXlogRecPtrResult(startptr, starttli); - - /* Send tablespace header */ - SendBackupHeader(tablespaces); - - /* Setup and activate network throttling, if client requested it */ - if (opt->maxrate > 0) - { - throttling_sample = - (int64) opt->maxrate * (int64) 1024 / THROTTLING_FREQUENCY; - - /* - * The minimum amount of time for throttling_sample bytes to be - * transferred. - */ - elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY; - - /* Enable throttling. */ - throttling_counter = 0; - - /* The 'real data' starts now (header was ignored). 
*/ - throttled_last = GetCurrentTimestamp(); - } - else - { - /* Disable throttling. */ - throttling_counter = -1; - } + /* notify basebackup sink about start of backup */ + bbsink_begin_backup(sink, &state, SINK_BUFFER_LENGTH); /* Send off our tablespaces one by one */ - foreach(lc, tablespaces) + foreach(lc, state.tablespaces) { tablespaceinfo *ti = (tablespaceinfo *) lfirst(lc); - StringInfoData buf; - - /* Send CopyOutResponse message */ - pq_beginmessage(&buf, 'H'); - pq_sendbyte(&buf, 0); /* overall format */ - pq_sendint16(&buf, 0); /* natts */ - pq_endmessage(&buf); if (ti->path == NULL) { struct stat statbuf; bool sendtblspclinks = true; + bbsink_begin_archive(sink, "base.tar"); + /* In the main tar, include the backup_label first... */ - sendFileWithContent(BACKUP_LABEL_FILE, labelfile->data, + sendFileWithContent(sink, BACKUP_LABEL_FILE, labelfile->data, &manifest); /* Then the tablespace_map file, if required... */ if (opt->sendtblspcmapfile) { - sendFileWithContent(TABLESPACE_MAP, tblspc_map_file->data, + sendFileWithContent(sink, TABLESPACE_MAP, tblspc_map_file->data, &manifest); sendtblspclinks = false; } /* Then the bulk of the files... */ - sendDir(".", 1, false, tablespaces, sendtblspclinks, - &manifest, NULL); + sendDir(sink, ".", 1, false, state.tablespaces, + sendtblspclinks, &manifest, NULL); /* ... and pg_control after everything else. 
*/ if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0) @@ -438,32 +351,33 @@ perform_base_backup(basebackup_options *opt) (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", XLOG_CONTROL_FILE))); - sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, + sendFile(sink, XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid, &manifest, NULL); } else - sendTablespace(ti->path, ti->oid, false, &manifest); + { + char *archive_name = psprintf("%s.tar", ti->oid); + + bbsink_begin_archive(sink, archive_name); + + sendTablespace(sink, ti->path, ti->oid, false, &manifest); + } /* * If we're including WAL, and this is the main data directory we - * don't terminate the tar stream here. Instead, we will append - * the xlog files below and terminate it then. This is safe since - * the main data directory is always sent *last*. + * don't treat this as the end of the tablespace. Instead, we will + * include the xlog files below and stop afterwards. This is safe + * since the main data directory is always sent *last*. 
*/ if (opt->includewal && ti->path == NULL) { - Assert(lnext(tablespaces, lc) == NULL); + Assert(lnext(state.tablespaces, lc) == NULL); } else - pq_putemptymessage('c'); /* CopyDone */ - - tblspc_streamed++; - pgstat_progress_update_param(PROGRESS_BASEBACKUP_TBLSPC_STREAMED, - tblspc_streamed); + bbsink_end_archive(sink); } - pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE, - PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE); + basebackup_progress_wait_wal_archive(&state); endptr = do_pg_stop_backup(labelfile->data, !opt->nowait, &endtli); } PG_END_ENSURE_ERROR_CLEANUP(do_pg_abort_backup, BoolGetDatum(false)); @@ -489,8 +403,7 @@ perform_base_backup(basebackup_options *opt) ListCell *lc; TimeLineID tli; - pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE, - PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL); + basebackup_progress_transfer_wal(); /* * I'd rather not worry about timelines here, so scan pg_wal and @@ -501,8 +414,8 @@ perform_base_backup(basebackup_options *opt) * shouldn't be such files, but if there are, there's little harm in * including them. */ - XLByteToSeg(startptr, startsegno, wal_segment_size); - XLogFileName(firstoff, starttli, startsegno, wal_segment_size); + XLByteToSeg(state.startptr, startsegno, wal_segment_size); + XLogFileName(firstoff, state.starttli, startsegno, wal_segment_size); XLByteToPrevSeg(endptr, endsegno, wal_segment_size); XLogFileName(lastoff, endtli, endsegno, wal_segment_size); @@ -528,7 +441,7 @@ perform_base_backup(basebackup_options *opt) * Before we go any further, check that none of the WAL segments we * need were removed. */ - CheckXLogRemoved(startsegno, starttli); + CheckXLogRemoved(startsegno, state.starttli); /* * Sort the WAL filenames. 
We want to send the files in order from @@ -555,7 +468,7 @@ perform_base_backup(basebackup_options *opt) { char startfname[MAXFNAMELEN]; - XLogFileName(startfname, starttli, startsegno, + XLogFileName(startfname, state.starttli, startsegno, wal_segment_size); ereport(ERROR, (errmsg("could not find WAL file \"%s\"", startfname))); @@ -590,7 +503,6 @@ perform_base_backup(basebackup_options *opt) { char *walFileName = (char *) lfirst(lc); int fd; - char buf[TAR_SEND_SIZE]; size_t cnt; pgoff_t len = 0; @@ -629,22 +541,17 @@ perform_base_backup(basebackup_options *opt) } /* send the WAL file itself */ - _tarWriteHeader(pathbuf, NULL, &statbuf, false); + _tarWriteHeader(sink, pathbuf, NULL, &statbuf, false); - while ((cnt = basebackup_read_file(fd, buf, - Min(sizeof(buf), + while ((cnt = basebackup_read_file(fd, sink->bbs_buffer, + Min(sink->bbs_buffer_length, wal_segment_size - len), len, pathbuf, true)) > 0) { CheckXLogRemoved(segno, tli); - /* Send the chunk as a CopyData message */ - if (pq_putmessage('d', buf, cnt)) - ereport(ERROR, - (errmsg("base backup could not send data, aborting backup"))); - update_basebackup_progress(cnt); + bbsink_archive_contents(sink, cnt); len += cnt; - throttle(cnt); if (len == wal_segment_size) break; @@ -673,7 +580,7 @@ perform_base_backup(basebackup_options *opt) * complete segment. 
*/ StatusFilePath(pathbuf, walFileName, ".done"); - sendFileWithContent(pathbuf, "", &manifest); + sendFileWithContent(sink, pathbuf, "", &manifest); } /* @@ -696,23 +603,23 @@ perform_base_backup(basebackup_options *opt) (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", pathbuf))); - sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid, + sendFile(sink, pathbuf, pathbuf, &statbuf, false, InvalidOid, &manifest, NULL); /* unconditionally mark file as archived */ StatusFilePath(pathbuf, fname, ".done"); - sendFileWithContent(pathbuf, "", &manifest); + sendFileWithContent(sink, pathbuf, "", &manifest); } - /* Send CopyDone message for the last tar file */ - pq_putemptymessage('c'); + bbsink_end_archive(sink); } - AddWALInfoToBackupManifest(&manifest, startptr, starttli, endptr, endtli); + AddWALInfoToBackupManifest(&manifest, state.startptr, state.starttli, + endptr, endtli); - SendBackupManifest(&manifest); + SendBackupManifest(&manifest, sink); - SendXlogRecPtrResult(endptr, endtli); + bbsink_end_backup(sink, endptr, endtli); if (total_checksum_failures) { @@ -738,7 +645,7 @@ perform_base_backup(basebackup_options *opt) /* clean up the resource owner we created */ WalSndResourceCleanup(true); - pgstat_progress_end_command(); + basebackup_progress_done(); } /* @@ -943,6 +850,7 @@ void SendBaseBackup(BaseBackupCmd *cmd) { basebackup_options opt; + bbsink *sink; parse_basebackup_options(cmd->options, &opt); @@ -957,158 +865,40 @@ SendBaseBackup(BaseBackupCmd *cmd) set_ps_display(activitymsg); } - perform_base_backup(&opt); -} + /* Create a basic basebackup sink. 
*/ + sink = bbsink_copytblspc_new(); -static void -send_int8_string(StringInfoData *buf, int64 intval) -{ - char is[32]; + /* Set up network throttling, if client requested it */ + if (opt.maxrate > 0) + sink = bbsink_throttle_new(sink, opt.maxrate); - sprintf(is, INT64_FORMAT, intval); - pq_sendint32(buf, strlen(is)); - pq_sendbytes(buf, is, strlen(is)); -} - -static void -SendBackupHeader(List *tablespaces) -{ - StringInfoData buf; - ListCell *lc; - - /* Construct and send the directory information */ - pq_beginmessage(&buf, 'T'); /* RowDescription */ - pq_sendint16(&buf, 3); /* 3 fields */ - - /* First field - spcoid */ - pq_sendstring(&buf, "spcoid"); - pq_sendint32(&buf, 0); /* table oid */ - pq_sendint16(&buf, 0); /* attnum */ - pq_sendint32(&buf, OIDOID); /* type oid */ - pq_sendint16(&buf, 4); /* typlen */ - pq_sendint32(&buf, 0); /* typmod */ - pq_sendint16(&buf, 0); /* format code */ - - /* Second field - spclocation */ - pq_sendstring(&buf, "spclocation"); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - pq_sendint32(&buf, TEXTOID); - pq_sendint16(&buf, -1); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - - /* Third field - size */ - pq_sendstring(&buf, "size"); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - pq_sendint32(&buf, INT8OID); - pq_sendint16(&buf, 8); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - pq_endmessage(&buf); - - foreach(lc, tablespaces) - { - tablespaceinfo *ti = lfirst(lc); - - /* Send one datarow message */ - pq_beginmessage(&buf, 'D'); - pq_sendint16(&buf, 3); /* number of columns */ - if (ti->path == NULL) - { - pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */ - pq_sendint32(&buf, -1); - } - else - { - Size len; - - len = strlen(ti->oid); - pq_sendint32(&buf, len); - pq_sendbytes(&buf, ti->oid, len); - - len = strlen(ti->path); - pq_sendint32(&buf, len); - pq_sendbytes(&buf, ti->path, len); - } - if (ti->size >= 0) - send_int8_string(&buf, ti->size / 1024); - else - pq_sendint32(&buf, -1); /* NULL */ - - 
pq_endmessage(&buf); - } - - /* Send a CommandComplete message */ - pq_puttextmessage('C', "SELECT"); -} - -/* - * Send a single resultset containing just a single - * XLogRecPtr record (in text format) - */ -static void -SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli) -{ - StringInfoData buf; - char str[MAXFNAMELEN]; - Size len; - - pq_beginmessage(&buf, 'T'); /* RowDescription */ - pq_sendint16(&buf, 2); /* 2 fields */ - - /* Field headers */ - pq_sendstring(&buf, "recptr"); - pq_sendint32(&buf, 0); /* table oid */ - pq_sendint16(&buf, 0); /* attnum */ - pq_sendint32(&buf, TEXTOID); /* type oid */ - pq_sendint16(&buf, -1); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - - pq_sendstring(&buf, "tli"); - pq_sendint32(&buf, 0); /* table oid */ - pq_sendint16(&buf, 0); /* attnum */ + /* Set up progress reporting. */ + sink = bbsink_progress_new(sink, opt.progress); /* - * int8 may seem like a surprising data type for this, but in theory int4 - * would not be wide enough for this, as TimeLineID is unsigned. + * Perform the base backup, but make sure we clean up the bbsink even if + * an error occurs. */ - pq_sendint32(&buf, INT8OID); /* type oid */ - pq_sendint16(&buf, -1); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - pq_endmessage(&buf); - - /* Data row */ - pq_beginmessage(&buf, 'D'); - pq_sendint16(&buf, 2); /* number of columns */ - - len = snprintf(str, sizeof(str), - "%X/%X", LSN_FORMAT_ARGS(ptr)); - pq_sendint32(&buf, len); - pq_sendbytes(&buf, str, len); - - len = snprintf(str, sizeof(str), "%u", tli); - pq_sendint32(&buf, len); - pq_sendbytes(&buf, str, len); - - pq_endmessage(&buf); - - /* Send a CommandComplete message */ - pq_puttextmessage('C', "SELECT"); + PG_TRY(); + { + perform_base_backup(&opt, sink); + } + PG_FINALLY(); + { + bbsink_cleanup(sink); + } + PG_END_TRY(); } /* * Inject a file with given name and content in the output tar stream. 
*/ static void -sendFileWithContent(const char *filename, const char *content, +sendFileWithContent(bbsink *sink, const char *filename, const char *content, backup_manifest_info *manifest) { struct stat statbuf; - int pad, + int bytes_done = 0, len; pg_checksum_context checksum_ctx; @@ -1134,26 +924,24 @@ sendFileWithContent(const char *filename, const char *content, statbuf.st_mode = pg_file_create_mode; statbuf.st_size = len; - _tarWriteHeader(filename, NULL, &statbuf, false); - /* Send the contents as a CopyData message */ - pq_putmessage('d', content, len); - update_basebackup_progress(len); - - /* Pad to a multiple of the tar block size. */ - pad = tarPaddingBytesRequired(len); - if (pad > 0) - { - char buf[TAR_BLOCK_SIZE]; - - MemSet(buf, 0, pad); - pq_putmessage('d', buf, pad); - update_basebackup_progress(pad); - } + _tarWriteHeader(sink, filename, NULL, &statbuf, false); if (pg_checksum_update(&checksum_ctx, (uint8 *) content, len) < 0) elog(ERROR, "could not update checksum of file \"%s\"", filename); + while (bytes_done < len) + { + size_t remaining = len - bytes_done; + size_t nbytes = Min(sink->bbs_buffer_length, remaining); + + memcpy(sink->bbs_buffer, content + bytes_done, nbytes); + bbsink_archive_contents(sink, nbytes); + bytes_done += nbytes; + } + + _tarWritePadding(sink, len); + AddFileToBackupManifest(manifest, NULL, filename, len, (pg_time_t) statbuf.st_mtime, &checksum_ctx); } @@ -1166,7 +954,7 @@ sendFileWithContent(const char *filename, const char *content, * Only used to send auxiliary tablespaces, not PGDATA. 
*/ static int64 -sendTablespace(char *path, char *spcoid, bool sizeonly, +sendTablespace(bbsink *sink, char *path, char *spcoid, bool sizeonly, backup_manifest_info *manifest) { int64 size; @@ -1196,11 +984,11 @@ sendTablespace(char *path, char *spcoid, bool sizeonly, return 0; } - size = _tarWriteHeader(TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf, + size = _tarWriteHeader(sink, TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf, sizeonly); /* Send all the files in the tablespace version directory */ - size += sendDir(pathbuf, strlen(path), sizeonly, NIL, true, manifest, + size += sendDir(sink, pathbuf, strlen(path), sizeonly, NIL, true, manifest, spcoid); return size; @@ -1219,8 +1007,8 @@ sendTablespace(char *path, char *spcoid, bool sizeonly, * as it will be sent separately in the tablespace_map file. */ static int64 -sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, - bool sendtblspclinks, backup_manifest_info *manifest, +sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly, + List *tablespaces, bool sendtblspclinks, backup_manifest_info *manifest, const char *spcoid) { DIR *dir; @@ -1380,8 +1168,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, { elog(DEBUG1, "contents of directory \"%s\" excluded from backup", de->d_name); convert_link_to_directory(pathbuf, &statbuf); - size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, - sizeonly); + size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL, + &statbuf, sizeonly); excludeFound = true; break; } @@ -1398,8 +1186,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, { elog(DEBUG1, "contents of directory \"%s\" excluded from backup", statrelpath); convert_link_to_directory(pathbuf, &statbuf); - size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, - sizeonly); + size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL, + &statbuf, sizeonly); continue; } @@ -1412,15 +1200,15 @@ 
sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, { /* If pg_wal is a symlink, write it as a directory anyway */ convert_link_to_directory(pathbuf, &statbuf); - size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, - sizeonly); + size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL, + &statbuf, sizeonly); /* * Also send archive_status directory (by hackishly reusing * statbuf from above ...). */ - size += _tarWriteHeader("./pg_wal/archive_status", NULL, &statbuf, - sizeonly); + size += _tarWriteHeader(sink, "./pg_wal/archive_status", NULL, + &statbuf, sizeonly); continue; /* don't recurse into pg_wal */ } @@ -1451,7 +1239,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, pathbuf))); linkpath[rllen] = '\0'; - size += _tarWriteHeader(pathbuf + basepathlen + 1, linkpath, + size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, linkpath, &statbuf, sizeonly); #else @@ -1475,7 +1263,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, * Store a directory entry in the tar file so we can get the * permissions right. */ - size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, + size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL, &statbuf, sizeonly); /* @@ -1507,7 +1295,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, skip_this_dir = true; if (!skip_this_dir) - size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces, + size += sendDir(sink, pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks, manifest, spcoid); } else if (S_ISREG(statbuf.st_mode)) @@ -1515,7 +1303,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, bool sent = false; if (!sizeonly) - sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf, + sent = sendFile(sink, pathbuf, pathbuf + basepathlen + 1, &statbuf, true, isDbDir ? 
atooid(lastDir + 1) : InvalidOid, manifest, spcoid); @@ -1592,21 +1380,19 @@ is_checksummed_file(const char *fullpath, const char *filename) * and the file did not exist. */ static bool -sendFile(const char *readfilename, const char *tarfilename, +sendFile(bbsink *sink, const char *readfilename, const char *tarfilename, struct stat *statbuf, bool missing_ok, Oid dboid, backup_manifest_info *manifest, const char *spcoid) { int fd; BlockNumber blkno = 0; bool block_retry = false; - char buf[TAR_SEND_SIZE]; uint16 checksum; int checksum_failures = 0; off_t cnt; int i; pgoff_t len = 0; char *page; - size_t pad; PageHeader phdr; int segmentno = 0; char *segmentpath; @@ -1627,7 +1413,7 @@ sendFile(const char *readfilename, const char *tarfilename, errmsg("could not open file \"%s\": %m", readfilename))); } - _tarWriteHeader(tarfilename, NULL, statbuf, false); + _tarWriteHeader(sink, tarfilename, NULL, statbuf, false); if (!noverify_checksums && DataChecksumsEnabled()) { @@ -1668,9 +1454,11 @@ sendFile(const char *readfilename, const char *tarfilename, */ while (len < statbuf->st_size) { + size_t remaining = statbuf->st_size - len; + /* Try to read some more data. */ - cnt = basebackup_read_file(fd, buf, - Min(sizeof(buf), statbuf->st_size - len), + cnt = basebackup_read_file(fd, sink->bbs_buffer, + Min(sink->bbs_buffer_length, remaining), len, readfilename, true); /* @@ -1687,7 +1475,7 @@ sendFile(const char *readfilename, const char *tarfilename, - * TAR_SEND_SIZE/buf is divisible by BLCKSZ and we read a multiple of + * the sink buffer length is divisible by BLCKSZ and we read a multiple of * BLCKSZ bytes. 
*/ - Assert(TAR_SEND_SIZE % BLCKSZ == 0); + Assert((sink->bbs_buffer_length % BLCKSZ) == 0); if (verify_checksum && (cnt % BLCKSZ != 0)) { @@ -1703,7 +1491,7 @@ sendFile(const char *readfilename, const char *tarfilename, { for (i = 0; i < cnt / BLCKSZ; i++) { - page = buf + BLCKSZ * i; + page = sink->bbs_buffer + BLCKSZ * i; /* * Only check pages which have not been modified since the @@ -1713,7 +1501,7 @@ sendFile(const char *readfilename, const char *tarfilename, * this case. We also skip completely new pages, since they * don't have a checksum yet. */ - if (!PageIsNew(page) && PageGetLSN(page) < startptr) + if (!PageIsNew(page) && PageGetLSN(page) < sink->bbs_state->startptr) { checksum = pg_checksum_page((char *) page, blkno + segmentno * RELSEG_SIZE); phdr = (PageHeader) page; @@ -1735,7 +1523,8 @@ sendFile(const char *readfilename, const char *tarfilename, /* Reread the failed block */ reread_cnt = - basebackup_read_file(fd, buf + BLCKSZ * i, + basebackup_read_file(fd, + sink->bbs_buffer + BLCKSZ * i, BLCKSZ, len + BLCKSZ * i, readfilename, false); @@ -1782,34 +1571,29 @@ sendFile(const char *readfilename, const char *tarfilename, } } - /* Send the chunk as a CopyData message */ - if (pq_putmessage('d', buf, cnt)) - ereport(ERROR, - (errmsg("base backup could not send data, aborting backup"))); - update_basebackup_progress(cnt); + bbsink_archive_contents(sink, cnt); /* Also feed it to the checksum machinery. 
*/ - if (pg_checksum_update(&checksum_ctx, (uint8 *) buf, cnt) < 0) + if (pg_checksum_update(&checksum_ctx, + (uint8 *) sink->bbs_buffer, cnt) < 0) elog(ERROR, "could not update checksum of base backup"); len += cnt; - throttle(cnt); } /* If the file was truncated while we were sending it, pad it with zeros */ - if (len < statbuf->st_size) + while (len < statbuf->st_size) { - MemSet(buf, 0, sizeof(buf)); - while (len < statbuf->st_size) - { - cnt = Min(sizeof(buf), statbuf->st_size - len); - pq_putmessage('d', buf, cnt); - if (pg_checksum_update(&checksum_ctx, (uint8 *) buf, cnt) < 0) - elog(ERROR, "could not update checksum of base backup"); - update_basebackup_progress(cnt); - len += cnt; - throttle(cnt); - } + size_t remaining = statbuf->st_size - len; + size_t nbytes = Min(sink->bbs_buffer_length, remaining); + + MemSet(sink->bbs_buffer, 0, nbytes); + if (pg_checksum_update(&checksum_ctx, + (uint8 *) sink->bbs_buffer, + nbytes) < 0) + elog(ERROR, "could not update checksum of base backup"); + bbsink_archive_contents(sink, nbytes); + len += nbytes; } /* @@ -1817,13 +1601,7 @@ sendFile(const char *readfilename, const char *tarfilename, * of data is probably not worth throttling, and is not checksummed * because it's not actually part of the file.) 
*/ - pad = tarPaddingBytesRequired(len); - if (pad > 0) - { - MemSet(buf, 0, pad); - pq_putmessage('d', buf, pad); - update_basebackup_progress(pad); - } + _tarWritePadding(sink, len); CloseTransientFile(fd); @@ -1846,18 +1624,28 @@ sendFile(const char *readfilename, const char *tarfilename, return true; } - static int64 -_tarWriteHeader(const char *filename, const char *linktarget, +_tarWriteHeader(bbsink *sink, const char *filename, const char *linktarget, struct stat *statbuf, bool sizeonly) { - char h[TAR_BLOCK_SIZE]; enum tarError rc; if (!sizeonly) { - rc = tarCreateHeader(h, filename, linktarget, statbuf->st_size, - statbuf->st_mode, statbuf->st_uid, statbuf->st_gid, + /* + * As of this writing, the smallest supported block size is 1kB, which + * is twice TAR_BLOCK_SIZE. Since the buffer size is required to be a + * multiple of BLCKSZ, it should be safe to assume that the buffer is + * large enough to fit an entire tar block. We double-check by means + * of these assertions. + */ + StaticAssertStmt(TAR_BLOCK_SIZE <= BLCKSZ, + "BLCKSZ too small for tar block"); + Assert(sink->bbs_buffer_length >= TAR_BLOCK_SIZE); + + rc = tarCreateHeader(sink->bbs_buffer, filename, linktarget, + statbuf->st_size, statbuf->st_mode, + statbuf->st_uid, statbuf->st_gid, statbuf->st_mtime); switch (rc) @@ -1879,11 +1667,32 @@ _tarWriteHeader(const char *filename, const char *linktarget, elog(ERROR, "unrecognized tar error: %d", rc); } - pq_putmessage('d', h, sizeof(h)); - update_basebackup_progress(sizeof(h)); + bbsink_archive_contents(sink, TAR_BLOCK_SIZE); } - return sizeof(h); + return TAR_BLOCK_SIZE; +} + +/* + * Pad with zero bytes out to a multiple of TAR_BLOCK_SIZE. + */ +static void +_tarWritePadding(bbsink *sink, int len) +{ + int pad = tarPaddingBytesRequired(len); + + /* + * As in _tarWriteHeader, it should be safe to assume that the buffer is + * large enough that we don't need to do this in multiple chunks. 
+ */ + Assert(sink->bbs_buffer_length >= TAR_BLOCK_SIZE); + Assert(pad <= TAR_BLOCK_SIZE); + + if (pad > 0) + { + MemSet(sink->bbs_buffer, 0, pad); + bbsink_archive_contents(sink, pad); + } } /* @@ -1902,113 +1711,6 @@ convert_link_to_directory(const char *pathbuf, struct stat *statbuf) statbuf->st_mode = S_IFDIR | pg_dir_create_mode; } -/* - * Increment the network transfer counter by the given number of bytes, - * and sleep if necessary to comply with the requested network transfer - * rate. - */ -static void -throttle(size_t increment) -{ - TimeOffset elapsed_min; - - if (throttling_counter < 0) - return; - - throttling_counter += increment; - if (throttling_counter < throttling_sample) - return; - - /* How much time should have elapsed at minimum? */ - elapsed_min = elapsed_min_unit * - (throttling_counter / throttling_sample); - - /* - * Since the latch could be set repeatedly because of concurrently WAL - * activity, sleep in a loop to ensure enough time has passed. - */ - for (;;) - { - TimeOffset elapsed, - sleep; - int wait_result; - - /* Time elapsed since the last measurement (and possible wake up). */ - elapsed = GetCurrentTimestamp() - throttled_last; - - /* sleep if the transfer is faster than it should be */ - sleep = elapsed_min - elapsed; - if (sleep <= 0) - break; - - ResetLatch(MyLatch); - - /* We're eating a potentially set latch, so check for interrupts */ - CHECK_FOR_INTERRUPTS(); - - /* - * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be - * the maximum time to sleep. Thus the cast to long is safe. - */ - wait_result = WaitLatch(MyLatch, - WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - (long) (sleep / 1000), - WAIT_EVENT_BASE_BACKUP_THROTTLE); - - if (wait_result & WL_LATCH_SET) - CHECK_FOR_INTERRUPTS(); - - /* Done waiting? */ - if (wait_result & WL_TIMEOUT) - break; - } - - /* - * As we work with integers, only whole multiple of throttling_sample was - * processed. 
The rest will be done during the next call of this function. - */ - throttling_counter %= throttling_sample; - - /* - * Time interval for the remaining amount and possible next increments - * starts now. - */ - throttled_last = GetCurrentTimestamp(); -} - -/* - * Increment the counter for the amount of data already streamed - * by the given number of bytes, and update the progress report for - * pg_stat_progress_basebackup. - */ -static void -update_basebackup_progress(int64 delta) -{ - const int index[] = { - PROGRESS_BASEBACKUP_BACKUP_STREAMED, - PROGRESS_BASEBACKUP_BACKUP_TOTAL - }; - int64 val[2]; - int nparam = 0; - - backup_streamed += delta; - val[nparam++] = backup_streamed; - - /* - * Avoid overflowing past 100% or the full size. This may make the total - * size number change as we approach the end of the backup (the estimate - * will always be wrong if WAL is included), but that's better than having - * the done column be bigger than the total. - */ - if (backup_total > -1 && backup_streamed > backup_total) - { - backup_total = backup_streamed; - val[nparam++] = backup_total; - } - - pgstat_progress_update_multi_param(nparam, index, val); -} - /* * Read some data from a file, setting a wait event and reporting any error * encountered. 
diff --git a/src/backend/replication/basebackup_copy.c b/src/backend/replication/basebackup_copy.c new file mode 100644 index 00000000000..30bab4546ef --- /dev/null +++ b/src/backend/replication/basebackup_copy.c @@ -0,0 +1,335 @@ +/*------------------------------------------------------------------------- + * + * basebackup_copy.c + * send basebackup archives using one COPY OUT operation per + * tablespace, and an additional COPY OUT for the backup manifest + * + * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/basebackup_copy.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "catalog/pg_type_d.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "replication/basebackup.h" +#include "replication/basebackup_sink.h" + +static void bbsink_copytblspc_begin_backup(bbsink *sink); +static void bbsink_copytblspc_begin_archive(bbsink *sink, + const char *archive_name); +static void bbsink_copytblspc_archive_contents(bbsink *sink, size_t len); +static void bbsink_copytblspc_end_archive(bbsink *sink); +static void bbsink_copytblspc_begin_manifest(bbsink *sink); +static void bbsink_copytblspc_manifest_contents(bbsink *sink, size_t len); +static void bbsink_copytblspc_end_manifest(bbsink *sink); +static void bbsink_copytblspc_end_backup(bbsink *sink, XLogRecPtr endptr, + TimeLineID endtli); +static void bbsink_copytblspc_cleanup(bbsink *sink); + +static void SendCopyOutResponse(void); +static void SendCopyData(const char *data, size_t len); +static void SendCopyDone(void); +static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli); +static void SendTablespaceList(List *tablespaces); +static void send_int8_string(StringInfoData *buf, int64 intval); + +const bbsink_ops bbsink_copytblspc_ops = { + .begin_backup = bbsink_copytblspc_begin_backup, + .begin_archive = bbsink_copytblspc_begin_archive, + 
.archive_contents = bbsink_copytblspc_archive_contents, + .end_archive = bbsink_copytblspc_end_archive, + .begin_manifest = bbsink_copytblspc_begin_manifest, + .manifest_contents = bbsink_copytblspc_manifest_contents, + .end_manifest = bbsink_copytblspc_end_manifest, + .end_backup = bbsink_copytblspc_end_backup, + .cleanup = bbsink_copytblspc_cleanup +}; + +/* + * Create a new 'copytblspc' bbsink. + */ +bbsink * +bbsink_copytblspc_new(void) +{ + bbsink *sink = palloc0(sizeof(bbsink)); + + *((const bbsink_ops **) &sink->bbs_ops) = &bbsink_copytblspc_ops; + + return sink; +} + +/* + * Begin backup. + */ +static void +bbsink_copytblspc_begin_backup(bbsink *sink) +{ + bbsink_state *state = sink->bbs_state; + + /* Create a suitable buffer. */ + sink->bbs_buffer = palloc(sink->bbs_buffer_length); + + /* Tell client the backup start location. */ + SendXlogRecPtrResult(state->startptr, state->starttli); + + /* Send client a list of tablespaces. */ + SendTablespaceList(state->tablespaces); + + /* Send a CommandComplete message */ + pq_puttextmessage('C', "SELECT"); +} + +/* + * Each archive is sent as a separate stream of COPY data, and thus begins + * with a CopyOutResponse message. + */ +static void +bbsink_copytblspc_begin_archive(bbsink *sink, const char *archive_name) +{ + SendCopyOutResponse(); +} + +/* + * Each chunk of data within the archive is sent as a CopyData message. + */ +static void +bbsink_copytblspc_archive_contents(bbsink *sink, size_t len) +{ + SendCopyData(sink->bbs_buffer, len); +} + +/* + * The archive is terminated by a CopyDone message. + */ +static void +bbsink_copytblspc_end_archive(bbsink *sink) +{ + SendCopyDone(); +} + +/* + * The backup manifest is sent as a separate stream of COPY data, and thus + * begins with a CopyOutResponse message. + */ +static void +bbsink_copytblspc_begin_manifest(bbsink *sink) +{ + SendCopyOutResponse(); +} + +/* + * Each chunk of manifest data is sent using a CopyData message. 
+ */ +static void +bbsink_copytblspc_manifest_contents(bbsink *sink, size_t len) +{ + SendCopyData(sink->bbs_buffer, len); +} + +/* + * When we've finished sending the manifest, send a CopyDone message. + */ +static void +bbsink_copytblspc_end_manifest(bbsink *sink) +{ + SendCopyDone(); +} + +/* + * Send end-of-backup wire protocol messages. + */ +static void +bbsink_copytblspc_end_backup(bbsink *sink, XLogRecPtr endptr, + TimeLineID endtli) +{ + SendXlogRecPtrResult(endptr, endtli); +} + +/* + * Cleanup. + */ +static void +bbsink_copytblspc_cleanup(bbsink *sink) +{ + /* Nothing to do. */ +} + +/* + * Send a CopyOutResponse message. + */ +static void +SendCopyOutResponse(void) +{ + StringInfoData buf; + + pq_beginmessage(&buf, 'H'); + pq_sendbyte(&buf, 0); /* overall format */ + pq_sendint16(&buf, 0); /* natts */ + pq_endmessage(&buf); +} + +/* + * Send a CopyData message. + */ +static void +SendCopyData(const char *data, size_t len) +{ + pq_putmessage('d', data, len); +} + +/* + * Send a CopyDone message. + */ +static void +SendCopyDone(void) +{ + pq_putemptymessage('c'); +} + +/* + * Send a single resultset containing just a single + * XLogRecPtr record (in text format) + */ +static void +SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli) +{ + StringInfoData buf; + char str[MAXFNAMELEN]; + Size len; + + pq_beginmessage(&buf, 'T'); /* RowDescription */ + pq_sendint16(&buf, 2); /* 2 fields */ + + /* Field headers */ + pq_sendstring(&buf, "recptr"); + pq_sendint32(&buf, 0); /* table oid */ + pq_sendint16(&buf, 0); /* attnum */ + pq_sendint32(&buf, TEXTOID); /* type oid */ + pq_sendint16(&buf, -1); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + + pq_sendstring(&buf, "tli"); + pq_sendint32(&buf, 0); /* table oid */ + pq_sendint16(&buf, 0); /* attnum */ + + /* + * int8 may seem like a surprising data type for this, but in theory int4 + * would not be wide enough for this, as TimeLineID is unsigned. 
+ */ + pq_sendint32(&buf, INT8OID); /* type oid */ + pq_sendint16(&buf, -1); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + pq_endmessage(&buf); + + /* Data row */ + pq_beginmessage(&buf, 'D'); + pq_sendint16(&buf, 2); /* number of columns */ + + len = snprintf(str, sizeof(str), + "%X/%X", LSN_FORMAT_ARGS(ptr)); + pq_sendint32(&buf, len); + pq_sendbytes(&buf, str, len); + + len = snprintf(str, sizeof(str), "%u", tli); + pq_sendint32(&buf, len); + pq_sendbytes(&buf, str, len); + + pq_endmessage(&buf); + + /* Send a CommandComplete message */ + pq_puttextmessage('C', "SELECT"); +} + +/* + * Send a result set via libpq describing the tablespace list. + */ +static void +SendTablespaceList(List *tablespaces) +{ + StringInfoData buf; + ListCell *lc; + + /* Construct and send the directory information */ + pq_beginmessage(&buf, 'T'); /* RowDescription */ + pq_sendint16(&buf, 3); /* 3 fields */ + + /* First field - spcoid */ + pq_sendstring(&buf, "spcoid"); + pq_sendint32(&buf, 0); /* table oid */ + pq_sendint16(&buf, 0); /* attnum */ + pq_sendint32(&buf, OIDOID); /* type oid */ + pq_sendint16(&buf, 4); /* typlen */ + pq_sendint32(&buf, 0); /* typmod */ + pq_sendint16(&buf, 0); /* format code */ + + /* Second field - spclocation */ + pq_sendstring(&buf, "spclocation"); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + pq_sendint32(&buf, TEXTOID); + pq_sendint16(&buf, -1); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + + /* Third field - size */ + pq_sendstring(&buf, "size"); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + pq_sendint32(&buf, INT8OID); + pq_sendint16(&buf, 8); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + pq_endmessage(&buf); + + foreach(lc, tablespaces) + { + tablespaceinfo *ti = lfirst(lc); + + /* Send one datarow message */ + pq_beginmessage(&buf, 'D'); + pq_sendint16(&buf, 3); /* number of columns */ + if (ti->path == NULL) + { + pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */ + pq_sendint32(&buf, -1); + } + else + { + Size len; 
+ + len = strlen(ti->oid); + pq_sendint32(&buf, len); + pq_sendbytes(&buf, ti->oid, len); + + len = strlen(ti->path); + pq_sendint32(&buf, len); + pq_sendbytes(&buf, ti->path, len); + } + if (ti->size >= 0) + send_int8_string(&buf, ti->size / 1024); + else + pq_sendint32(&buf, -1); /* NULL */ + + pq_endmessage(&buf); + } +} + +/* + * Send a 64-bit integer as a string via the wire protocol. + */ +static void +send_int8_string(StringInfoData *buf, int64 intval) +{ + char is[32]; + + sprintf(is, INT64_FORMAT, intval); + pq_sendint32(buf, strlen(is)); + pq_sendbytes(buf, is, strlen(is)); +} diff --git a/src/backend/replication/basebackup_progress.c b/src/backend/replication/basebackup_progress.c new file mode 100644 index 00000000000..e1a196251ef --- /dev/null +++ b/src/backend/replication/basebackup_progress.c @@ -0,0 +1,246 @@ +/*------------------------------------------------------------------------- + * + * basebackup_progress.c + * Basebackup sink implementing progress tracking, including but not + * limited to command progress reporting. + * + * This should be used even if the PROGRESS option to the replication + * command BASE_BACKUP is not specified. Without that option, we won't + * have tallied up the size of the files that are going to need to be + * backed up, but we can still report to the command progress reporting + * facility how much data we've processed. + * + * Moreover, we also use this as a convenient place to update certain + * fields of the bbsink_state. That work is accurately described as + * keeping track of our progress, but it's not just for introspection. + * We need those fields to be updated properly in order for base backups + * to work. + * + * This particular basebackup sink requires extra callbacks that most base + * backup sinks don't. Rather than cramming those into the interface, we just + * have a few extra functions here that basebackup.c can call. 
(We could put + * the logic directly into that file as it's fairly simple, but it seems + * cleaner to have everything related to progress reporting in one place.) + * + * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/basebackup_progress.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "commands/progress.h" +#include "miscadmin.h" +#include "replication/basebackup.h" +#include "replication/basebackup_sink.h" +#include "pgstat.h" +#include "storage/latch.h" +#include "utils/timestamp.h" + +static void bbsink_progress_begin_backup(bbsink *sink); +static void bbsink_progress_archive_contents(bbsink *sink, size_t len); +static void bbsink_progress_end_archive(bbsink *sink); + +const bbsink_ops bbsink_progress_ops = { + .begin_backup = bbsink_progress_begin_backup, + .begin_archive = bbsink_forward_begin_archive, + .archive_contents = bbsink_progress_archive_contents, + .end_archive = bbsink_progress_end_archive, + .begin_manifest = bbsink_forward_begin_manifest, + .manifest_contents = bbsink_forward_manifest_contents, + .end_manifest = bbsink_forward_end_manifest, + .end_backup = bbsink_forward_end_backup, + .cleanup = bbsink_forward_cleanup +}; + +/* + * Create a new basebackup sink that performs progress tracking functions and + * forwards data to a successor sink. + */ +bbsink * +bbsink_progress_new(bbsink *next, bool estimate_backup_size) +{ + bbsink *sink; + + Assert(next != NULL); + + sink = palloc0(sizeof(bbsink)); + *((const bbsink_ops **) &sink->bbs_ops) = &bbsink_progress_ops; + sink->bbs_next = next; + + /* + * Report that a base backup is in progress, and set the total size of the + * backup to -1, which will get translated to NULL. If we're estimating + * the backup size, we'll insert the real estimate when we have it. 
+ */ + pgstat_progress_start_command(PROGRESS_COMMAND_BASEBACKUP, InvalidOid); + pgstat_progress_update_param(PROGRESS_BASEBACKUP_BACKUP_TOTAL, -1); + + return sink; +} + +/* + * Progress reporting at start of backup. + */ +static void +bbsink_progress_begin_backup(bbsink *sink) +{ + const int index[] = { + PROGRESS_BASEBACKUP_PHASE, + PROGRESS_BASEBACKUP_BACKUP_TOTAL, + PROGRESS_BASEBACKUP_TBLSPC_TOTAL + }; + int64 val[3]; + + /* + * Report that we are now streaming database files as a base backup. Also + * advertise the number of tablespaces, and, if known, the estimated total + * backup size. + */ + val[0] = PROGRESS_BASEBACKUP_PHASE_STREAM_BACKUP; + if (sink->bbs_state->bytes_total_is_valid) + val[1] = sink->bbs_state->bytes_total; + else + val[1] = -1; + val[2] = list_length(sink->bbs_state->tablespaces); + pgstat_progress_update_multi_param(3, index, val); + + /* Delegate to next sink. */ + bbsink_forward_begin_backup(sink); +} + +/* + * End-of archive progress reporting. + */ +static void +bbsink_progress_end_archive(bbsink *sink) +{ + /* + * We expect one archive per tablespace, so reaching the end of an archive + * also means reaching the end of a tablespace. (Some day we might have a + * reason to decouple these concepts.) + * + * If WAL is included in the backup, we'll mark the last tablespace + * complete before the last archive is complete, so we need a guard here + * to ensure that the number of tablespaces streamed doesn't exceed the + * total. + */ + if (sink->bbs_state->tablespace_num < list_length(sink->bbs_state->tablespaces)) + pgstat_progress_update_param(PROGRESS_BASEBACKUP_TBLSPC_STREAMED, + sink->bbs_state->tablespace_num + 1); + + /* Delegate to next sink. */ + bbsink_forward_end_archive(sink); + + /* + * This is a convenient place to update the bbsink_state's notion of which + * is the current tablespace. 
Note that the bbsink_state object is shared + * across all bbsink objects involved, but we're the outermost one and + * this is the very last thing we do. + */ + sink->bbs_state->tablespace_num++; +} + +/* + * Handle progress tracking for new archive contents. + * + * Increment the counter for the amount of data already streamed + * by the given number of bytes, and update the progress report for + * pg_stat_progress_basebackup. + */ +static void +bbsink_progress_archive_contents(bbsink *sink, size_t len) +{ + bbsink_state *state = sink->bbs_state; + const int index[] = { + PROGRESS_BASEBACKUP_BACKUP_STREAMED, + PROGRESS_BASEBACKUP_BACKUP_TOTAL + }; + int64 val[2]; + int nparam = 0; + + /* First update bbsink_state with # of bytes done. */ + state->bytes_done += len; + + /* Now forward to next sink. */ + bbsink_forward_archive_contents(sink, len); + + /* Prepare to set # of bytes done for command progress reporting. */ + val[nparam++] = state->bytes_done; + + /* + * We may also want to update # of total bytes, to avoid overflowing past + * 100% or the full size. This may make the total size number change as we + * approach the end of the backup (the estimate will always be wrong if + * WAL is included), but that's better than having the done column be + * bigger than the total. + */ + if (state->bytes_total_is_valid && state->bytes_done > state->bytes_total) + val[nparam++] = state->bytes_done; + + pgstat_progress_update_multi_param(nparam, index, val); +} + +/* + * Advertise that we are waiting for the start-of-backup checkpoint. + */ +void +basebackup_progress_wait_checkpoint(void) +{ + pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE, + PROGRESS_BASEBACKUP_PHASE_WAIT_CHECKPOINT); +} + +/* + * Advertise that we are estimating the backup size. 
+ */ +void +basebackup_progress_estimate_backup_size(void) +{ + pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE, + PROGRESS_BASEBACKUP_PHASE_ESTIMATE_BACKUP_SIZE); +} + +/* + * Advertise that we are waiting for WAL archiving at end-of-backup. + */ +void +basebackup_progress_wait_wal_archive(bbsink_state *state) +{ + const int index[] = { + PROGRESS_BASEBACKUP_PHASE, + PROGRESS_BASEBACKUP_TBLSPC_STREAMED + }; + int64 val[2]; + + /* + * We report having finished all tablespaces at this point, even if the + * archive for the main tablespace is still open, because what's going to + * be added is WAL files, not files that are really from the main + * tablespace. + */ + val[0] = PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE; + val[1] = list_length(state->tablespaces); + pgstat_progress_update_multi_param(2, index, val); +} + +/* + * Advertise that we are transferring WAL files into the final archive. + */ +void +basebackup_progress_transfer_wal(void) +{ + pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE, + PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL); +} + +/* + * Advertise that we are no longer performing a backup. + */ +void +basebackup_progress_done(void) +{ + pgstat_progress_end_command(); +} diff --git a/src/backend/replication/basebackup_sink.c b/src/backend/replication/basebackup_sink.c new file mode 100644 index 00000000000..4a47854f81f --- /dev/null +++ b/src/backend/replication/basebackup_sink.c @@ -0,0 +1,125 @@ +/*------------------------------------------------------------------------- + * + * basebackup_sink.c + * Default implementations for bbsink (basebackup sink) callbacks. + * + * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group + * + * src/backend/replication/basebackup_sink.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "replication/basebackup_sink.h" + +/* + * Forward begin_backup callback. 
+ * + * Only use this implementation if you want the bbsink you're implementing to + * share a buffer with the successor bbsink. + */ +void +bbsink_forward_begin_backup(bbsink *sink) +{ + Assert(sink->bbs_next != NULL); + Assert(sink->bbs_state != NULL); + bbsink_begin_backup(sink->bbs_next, sink->bbs_state, + sink->bbs_buffer_length); + sink->bbs_buffer = sink->bbs_next->bbs_buffer; +} + +/* + * Forward begin_archive callback. + */ +void +bbsink_forward_begin_archive(bbsink *sink, const char *archive_name) +{ + Assert(sink->bbs_next != NULL); + bbsink_begin_archive(sink->bbs_next, archive_name); +} + +/* + * Forward archive_contents callback. + * + * Code that wants to use this should initialize its own bbs_buffer and + * bbs_buffer_length fields to the values from the successor sink. In cases + * where the buffer isn't shared, the data needs to be copied before forwarding + * the callback. We don't try to do that here, because there's really no + * reason to have separately allocated buffers containing identical + * data. + */ +void +bbsink_forward_archive_contents(bbsink *sink, size_t len) +{ + Assert(sink->bbs_next != NULL); + Assert(sink->bbs_buffer == sink->bbs_next->bbs_buffer); + Assert(sink->bbs_buffer_length == sink->bbs_next->bbs_buffer_length); + bbsink_archive_contents(sink->bbs_next, len); +} + +/* + * Forward end_archive callback. + */ +void +bbsink_forward_end_archive(bbsink *sink) +{ + Assert(sink->bbs_next != NULL); + bbsink_end_archive(sink->bbs_next); +} + +/* + * Forward begin_manifest callback. + */ +void +bbsink_forward_begin_manifest(bbsink *sink) +{ + Assert(sink->bbs_next != NULL); + bbsink_begin_manifest(sink->bbs_next); +} + +/* + * Forward manifest_contents callback. + * + * As with the archive_contents callback, it's expected that the buffer is + * shared. 
+ */ +void +bbsink_forward_manifest_contents(bbsink *sink, size_t len) +{ + Assert(sink->bbs_next != NULL); + Assert(sink->bbs_buffer == sink->bbs_next->bbs_buffer); + Assert(sink->bbs_buffer_length == sink->bbs_next->bbs_buffer_length); + bbsink_manifest_contents(sink->bbs_next, len); +} + +/* + * Forward end_manifest callback. + */ +void +bbsink_forward_end_manifest(bbsink *sink) +{ + Assert(sink->bbs_next != NULL); + bbsink_end_manifest(sink->bbs_next); +} + +/* + * Forward end_backup callback. + */ +void +bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli) +{ + Assert(sink->bbs_next != NULL); + bbsink_end_backup(sink->bbs_next, endptr, endtli); +} + +/* + * Forward cleanup callback. + */ +void +bbsink_forward_cleanup(bbsink *sink) +{ + Assert(sink->bbs_next != NULL); + bbsink_cleanup(sink->bbs_next); +} diff --git a/src/backend/replication/basebackup_throttle.c b/src/backend/replication/basebackup_throttle.c new file mode 100644 index 00000000000..f163931f8a3 --- /dev/null +++ b/src/backend/replication/basebackup_throttle.c @@ -0,0 +1,199 @@ +/*------------------------------------------------------------------------- + * + * basebackup_throttle.c + * Basebackup sink implementing throttling. Data is forwarded to the + * next base backup sink in the chain at a rate no greater than the + * configured maximum. + * + * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/basebackup_throttle.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "miscadmin.h" +#include "replication/basebackup_sink.h" +#include "pgstat.h" +#include "storage/latch.h" +#include "utils/timestamp.h" + +typedef struct bbsink_throttle +{ + /* Common information for all types of sink. */ + bbsink base; + + /* The actual number of bytes, transfer of which may cause sleep. 
*/ + uint64 throttling_sample; + + /* Amount of data already transferred but not yet throttled. */ + int64 throttling_counter; + + /* The minimum time required to transfer throttling_sample bytes. */ + TimeOffset elapsed_min_unit; + + /* The last check of the transfer rate. */ + TimestampTz throttled_last; +} bbsink_throttle; + +static void bbsink_throttle_begin_backup(bbsink *sink); +static void bbsink_throttle_archive_contents(bbsink *sink, size_t len); +static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len); +static void throttle(bbsink_throttle *sink, size_t increment); + +const bbsink_ops bbsink_throttle_ops = { + .begin_backup = bbsink_throttle_begin_backup, + .begin_archive = bbsink_forward_begin_archive, + .archive_contents = bbsink_throttle_archive_contents, + .end_archive = bbsink_forward_end_archive, + .begin_manifest = bbsink_forward_begin_manifest, + .manifest_contents = bbsink_throttle_manifest_contents, + .end_manifest = bbsink_forward_end_manifest, + .end_backup = bbsink_forward_end_backup, + .cleanup = bbsink_forward_cleanup +}; + +/* + * How frequently to throttle, as a fraction of the specified rate-second. + */ +#define THROTTLING_FREQUENCY 8 + +/* + * Create a new basebackup sink that performs throttling and forwards data + * to a successor sink. + */ +bbsink * +bbsink_throttle_new(bbsink *next, uint32 maxrate) +{ + bbsink_throttle *sink; + + Assert(next != NULL); + Assert(maxrate > 0); + + sink = palloc0(sizeof(bbsink_throttle)); + *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops; + sink->base.bbs_next = next; + + sink->throttling_sample = + (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY; + + /* + * The minimum amount of time for throttling_sample bytes to be + * transferred. 
+ */ + sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY; + + return &sink->base; +} + +/* + * There's no real work to do here, but we need to record the current time so + * that it can be used for future calculations. + */ +static void +bbsink_throttle_begin_backup(bbsink *sink) +{ + bbsink_throttle *mysink = (bbsink_throttle *) sink; + + bbsink_forward_begin_backup(sink); + + /* The 'real data' starts now (header was ignored). */ + mysink->throttled_last = GetCurrentTimestamp(); +} + +/* + * First throttle, and then pass archive contents to next sink. + */ +static void +bbsink_throttle_archive_contents(bbsink *sink, size_t len) +{ + throttle((bbsink_throttle *) sink, len); + + bbsink_forward_archive_contents(sink, len); +} + +/* + * First throttle, and then pass manifest contents to next sink. + */ +static void +bbsink_throttle_manifest_contents(bbsink *sink, size_t len) +{ + throttle((bbsink_throttle *) sink, len); + + bbsink_forward_manifest_contents(sink, len); +} + +/* + * Increment the network transfer counter by the given number of bytes, + * and sleep if necessary to comply with the requested network transfer + * rate. + */ +static void +throttle(bbsink_throttle *sink, size_t increment) +{ + TimeOffset elapsed_min; + + Assert(sink->throttling_counter >= 0); + + sink->throttling_counter += increment; + if (sink->throttling_counter < sink->throttling_sample) + return; + + /* How much time should have elapsed at minimum? */ + elapsed_min = sink->elapsed_min_unit * + (sink->throttling_counter / sink->throttling_sample); + + /* + * Since the latch could be set repeatedly because of concurrent WAL + * activity, sleep in a loop to ensure enough time has passed. + */ + for (;;) + { + TimeOffset elapsed, + sleep; + int wait_result; + + /* Time elapsed since the last measurement (and possible wake up). 
*/ + elapsed = GetCurrentTimestamp() - sink->throttled_last; + + /* sleep if the transfer is faster than it should be */ + sleep = elapsed_min - elapsed; + if (sleep <= 0) + break; + + ResetLatch(MyLatch); + + /* We're eating a potentially set latch, so check for interrupts */ + CHECK_FOR_INTERRUPTS(); + + /* + * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be + * the maximum time to sleep. Thus the cast to long is safe. + */ + wait_result = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + (long) (sleep / 1000), + WAIT_EVENT_BASE_BACKUP_THROTTLE); + + if (wait_result & WL_LATCH_SET) + CHECK_FOR_INTERRUPTS(); + + /* Done waiting? */ + if (wait_result & WL_TIMEOUT) + break; + } + + /* + * As we work with integers, only whole multiple of throttling_sample was + * processed. The rest will be done during the next call of this function. + */ + sink->throttling_counter %= sink->throttling_sample; + + /* + * Time interval for the remaining amount and possible next increments + * starts now. 
+ */ + sink->throttled_last = GetCurrentTimestamp(); +} diff --git a/src/include/replication/backup_manifest.h b/src/include/replication/backup_manifest.h index 099108910ce..16ed7eec9bb 100644 --- a/src/include/replication/backup_manifest.h +++ b/src/include/replication/backup_manifest.h @@ -12,9 +12,9 @@ #ifndef BACKUP_MANIFEST_H #define BACKUP_MANIFEST_H -#include "access/xlogdefs.h" #include "common/checksum_helper.h" #include "pgtime.h" +#include "replication/basebackup_sink.h" #include "storage/buffile.h" typedef enum manifest_option @@ -47,7 +47,8 @@ extern void AddWALInfoToBackupManifest(backup_manifest_info *manifest, XLogRecPtr startptr, TimeLineID starttli, XLogRecPtr endptr, TimeLineID endtli); -extern void SendBackupManifest(backup_manifest_info *manifest); + +extern void SendBackupManifest(backup_manifest_info *manifest, bbsink *sink); extern void FreeBackupManifest(backup_manifest_info *manifest); #endif diff --git a/src/include/replication/basebackup_sink.h b/src/include/replication/basebackup_sink.h new file mode 100644 index 00000000000..e6c073c5674 --- /dev/null +++ b/src/include/replication/basebackup_sink.h @@ -0,0 +1,296 @@ +/*------------------------------------------------------------------------- + * + * basebackup_sink.h + * API for filtering or sending to a final destination the archives + * produced by the base backup process + * + * Taking a base backup produces one archive per tablespace directory, + * plus a backup manifest unless that feature has been disabled. The + * goal of the backup process is to put those archives and that manifest + * someplace, possibly after postprocessing them in some way. A 'bbsink' + * is an object to which those archives, and the manifest if present, + * can be sent. + * + * In practice, there will be a chain of 'bbsink' objects rather than + * just one, with callbacks being forwarded from one to the next, + * possibly with modification. Each object is responsible for a + * single task e.g. 
command progress reporting, throttling, or + * communication with the client. + * + * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group + * + * src/include/replication/basebackup_sink.h + * + *------------------------------------------------------------------------- + */ +#ifndef BASEBACKUP_SINK_H +#define BASEBACKUP_SINK_H + +#include "access/xlog_internal.h" +#include "nodes/pg_list.h" + +/* Forward declarations. */ +struct bbsink; +struct bbsink_ops; +typedef struct bbsink bbsink; +typedef struct bbsink_ops bbsink_ops; + +/* + * Overall backup state shared by all bbsink objects for a backup. + * + * Before calling bbsink_begin_backup, caller must initialize a bbsink_state + * object which will last for the lifetime of the backup, and must thereafter + * update it as required before each new call to a bbsink method. The bbsink + * will retain a pointer to the state object and will consult it to understand + * the progress of the backup. + * + * 'tablespaces' is a list of tablespaceinfo objects. It must be set before + * calling bbsink_begin_backup() and must not be modified thereafter. + * + * 'tablespace_num' is the index of the current tablespace within the list + * stored in 'tablespaces'. + * + * 'bytes_done' is the number of bytes read so far from $PGDATA. + * + * 'bytes_total' is the total number of bytes estimated to be present in + * $PGDATA, if we have estimated this. + * + * 'bytes_total_is_valid' is true if and only if a proper estimate has been + * stored into 'bytes_total'. + * + * 'startptr' and 'starttli' identify the point in the WAL stream at which + * the backup began. They must be set before calling bbsink_begin_backup() + * and must not be modified thereafter.
+ */ +typedef struct bbsink_state +{ + List *tablespaces; + int tablespace_num; + uint64 bytes_done; + uint64 bytes_total; + bool bytes_total_is_valid; + XLogRecPtr startptr; + TimeLineID starttli; +} bbsink_state; + +/* + * Common data for any type of basebackup sink. + * + * 'bbs_ops' is the relevant callback table. + * + * 'bbs_buffer' is the buffer into which data destined for the bbsink + * should be stored. Its length must be a multiple of BLCKSZ. + * + * 'bbs_buffer_length' is the allocated length of the buffer. + * + * 'bbs_next' is a pointer to another bbsink to which this bbsink is + * forwarding some or all operations. + * + * 'bbs_state' is a pointer to the bbsink_state object for this backup. + * Every bbsink associated with this backup should point to the same + * underlying state object. + * + * In general it is expected that the values of these fields are set when + * a bbsink is created, or by bbsink_begin_backup() and the begin_backup + * callback in the case of bbs_state, bbs_buffer_length and bbs_buffer, and + * that they do not change thereafter. It's OK + * to modify the data to which bbs_buffer or bbs_state point, but no changes + * should be made to the contents of this struct. + */ +struct bbsink +{ + const bbsink_ops *bbs_ops; + char *bbs_buffer; + size_t bbs_buffer_length; + bbsink *bbs_next; + bbsink_state *bbs_state; +}; + +/* + * Callbacks for a base backup sink. + * + * All of these callbacks are required. If a particular callback just needs to + * forward the call to sink->bbs_next, use bbsink_forward_<callback_name> as + * the callback. + * + * Callers should always invoke these callbacks via the bbsink_* inline + * functions rather than calling them directly. + */ +struct bbsink_ops +{ + /* + * This callback is invoked just once, at the very start of the backup. It + * must set bbs_buffer to point to a chunk of storage where at least + * bbs_buffer_length bytes of data can be written.
+ */ + void (*begin_backup) (bbsink *sink); + + /* + * For each archive transmitted to a bbsink, there will be one call to the + * begin_archive() callback, some number of calls to the + * archive_contents() callback, and then one call to the end_archive() + * callback. + * + * Before invoking the archive_contents() callback, the caller should copy + * a number of bytes equal to what will be passed as len into bbs_buffer, + * but not more than bbs_buffer_length. + * + * It's generally good if the buffer is as full as possible before the + * archive_contents() callback is invoked, but it's not worth expending + * extra cycles to make sure it's absolutely 100% full. + */ + void (*begin_archive) (bbsink *sink, const char *archive_name); + void (*archive_contents) (bbsink *sink, size_t len); + void (*end_archive) (bbsink *sink); + + /* + * If a backup manifest is to be transmitted to a bbsink, there will be + * one call to the begin_manifest() callback, some number of calls to the + * manifest_contents() callback, and then one call to the end_manifest() + * callback. These calls will occur after all archives are transmitted. + * + * The rules for invoking the manifest_contents() callback are the same as + * for the archive_contents() callback above. + */ + void (*begin_manifest) (bbsink *sink); + void (*manifest_contents) (bbsink *sink, size_t len); + void (*end_manifest) (bbsink *sink); + + /* + * This callback is invoked just once, after all archives and the manifest + * have been sent. + */ + void (*end_backup) (bbsink *sink, XLogRecPtr endptr, TimeLineID endtli); + + /* + * If a backup is aborted by an error, this callback is invoked before the + * bbsink object is destroyed, so that it can release any resources that + * would not be released automatically. If no error occurs, this callback + * is invoked after the end_backup callback. + */ + void (*cleanup) (bbsink *sink); +}; + +/* Begin a backup.
*/ +static inline void +bbsink_begin_backup(bbsink *sink, bbsink_state *state, int buffer_length) +{ + Assert(sink != NULL); + + Assert(buffer_length > 0); + /* Store the shared state pointer and the requested buffer length on the sink before its begin_backup callback runs. */ + sink->bbs_state = state; + sink->bbs_buffer_length = buffer_length; + sink->bbs_ops->begin_backup(sink); + /* Once the callback has returned, bbs_buffer must be set and bbs_buffer_length must be a multiple of BLCKSZ. */ + Assert(sink->bbs_buffer != NULL); + Assert((sink->bbs_buffer_length % BLCKSZ) == 0); +} + +/* Begin an archive: invoke the sink's begin_archive callback with archive_name. */ +static inline void +bbsink_begin_archive(bbsink *sink, const char *archive_name) +{ + Assert(sink != NULL); + + sink->bbs_ops->begin_archive(sink, archive_name); +} + +/* Process some of the contents of an archive: pass len to the sink's archive_contents callback. */ +static inline void +bbsink_archive_contents(bbsink *sink, size_t len) +{ + Assert(sink != NULL); + + /* + * The caller should make a reasonable attempt to fill the buffer before + * calling this function, so it shouldn't be completely empty. Nor should + * it be filled beyond capacity, i.e. len may not exceed bbs_buffer_length. + */ + Assert(len > 0 && len <= sink->bbs_buffer_length); + + sink->bbs_ops->archive_contents(sink, len); +} + +/* Finish an archive: invoke the sink's end_archive callback. */ +static inline void +bbsink_end_archive(bbsink *sink) +{ + Assert(sink != NULL); + + sink->bbs_ops->end_archive(sink); +} + +/* Begin the backup manifest: invoke the sink's begin_manifest callback. */ +static inline void +bbsink_begin_manifest(bbsink *sink) +{ + Assert(sink != NULL); + + sink->bbs_ops->begin_manifest(sink); +} + +/* Process some of the manifest contents: pass len to the sink's manifest_contents callback. */ +static inline void +bbsink_manifest_contents(bbsink *sink, size_t len) +{ + Assert(sink != NULL); + + /* As in bbsink_archive_contents: len must be nonzero and no larger than bbs_buffer_length. */ + Assert(len > 0 && len <= sink->bbs_buffer_length); + + sink->bbs_ops->manifest_contents(sink, len); +} + +/* Finish the backup manifest: invoke the sink's end_manifest callback. */ +static inline void +bbsink_end_manifest(bbsink *sink) +{ + Assert(sink != NULL); + + sink->bbs_ops->end_manifest(sink); +} + +/* Finish a backup.
*/ +static inline void +bbsink_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli) +{ + Assert(sink != NULL); + Assert(sink->bbs_state->tablespace_num == list_length(sink->bbs_state->tablespaces)); + /* The assertion above requires tablespace_num to equal the number of entries in the tablespaces list before end_backup is invoked. */ + sink->bbs_ops->end_backup(sink, endptr, endtli); +} + +/* Release resources before destruction by invoking the sink's cleanup callback. */ +static inline void +bbsink_cleanup(bbsink *sink) +{ + Assert(sink != NULL); + + sink->bbs_ops->cleanup(sink); +} + +/* Forwarding callbacks. Use these to pass operations through to the next sink. */ +extern void bbsink_forward_begin_backup(bbsink *sink); +extern void bbsink_forward_begin_archive(bbsink *sink, + const char *archive_name); +extern void bbsink_forward_archive_contents(bbsink *sink, size_t len); +extern void bbsink_forward_end_archive(bbsink *sink); +extern void bbsink_forward_begin_manifest(bbsink *sink); +extern void bbsink_forward_manifest_contents(bbsink *sink, size_t len); +extern void bbsink_forward_end_manifest(bbsink *sink); +extern void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, + TimeLineID endtli); +extern void bbsink_forward_cleanup(bbsink *sink); + +/* Constructors for various types of sinks. */ +extern bbsink *bbsink_copytblspc_new(void); +extern bbsink *bbsink_progress_new(bbsink *next, bool estimate_backup_size); +extern bbsink *bbsink_throttle_new(bbsink *next, uint32 maxrate); + +/* Extra interface functions for progress reporting. */ +extern void basebackup_progress_wait_checkpoint(void); +extern void basebackup_progress_estimate_backup_size(void); +extern void basebackup_progress_wait_wal_archive(bbsink_state *); +extern void basebackup_progress_transfer_wal(void); +extern void basebackup_progress_done(void); + +#endif /* BASEBACKUP_SINK_H */