mirror of
https://github.com/postgres/postgres.git
synced 2025-06-06 00:02:36 -04:00
Allow parallel zstd compression when taking a base backup.
libzstd allows transparent parallel compression just by setting an option when creating the compression context, so permit that for both client and server-side backup compression. To use this, use something like pg_basebackup --compress WHERE-zstd:workers=N where WHERE is "client" or "server" and N is an integer. When compression is performed on the server side, this will spawn threads inside the PostgreSQL backend. While there is almost no PostgreSQL server code which is thread-safe, the threads here are used internally by libzstd and touch only data structures controlled by libzstd. Patch by me, based in part on earlier work by Dipesh Pandit and Jeevan Ladhe. Reviewed by Justin Pryzby. Discussion: http://postgr.es/m/CA+Tgmobj6u-nWF-j=FemygUhobhryLxf9h-wJN7W-2rSsseHNA@mail.gmail.com
This commit is contained in:
parent
c6863b8582
commit
51c0d186d9
@ -2739,17 +2739,23 @@ The commands accepted in replication mode are:
|
|||||||
option. If the value is an integer, it specifies the compression
|
option. If the value is an integer, it specifies the compression
|
||||||
level. Otherwise, it should be a comma-separated list of items,
|
level. Otherwise, it should be a comma-separated list of items,
|
||||||
each of the form <literal>keyword</literal> or
|
each of the form <literal>keyword</literal> or
|
||||||
<literal>keyword=value</literal>. Currently, the only supported
|
<literal>keyword=value</literal>. Currently, the supported keywords
|
||||||
keyword is <literal>level</literal>, which sets the compression
|
are <literal>level</literal> and <literal>workers</literal>.
|
||||||
level.
|
|
||||||
</para>
|
</para>
|
||||||
|
|
||||||
<para>
|
<para>
|
||||||
|
The <literal>level</literal> keyword sets the compression level.
|
||||||
For <literal>gzip</literal> the compression level should be an
|
For <literal>gzip</literal> the compression level should be an
|
||||||
integer between 1 and 9, for <literal>lz4</literal> an integer
|
integer between 1 and 9, for <literal>lz4</literal> an integer
|
||||||
between 1 and 12, and for <literal>zstd</literal> an integer
|
between 1 and 12, and for <literal>zstd</literal> an integer
|
||||||
between 1 and 22.
|
between 1 and 22.
|
||||||
</para>
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
The <literal>workers</literal> keyword sets the number of threads
|
||||||
|
that should be used for parallel compression. Parallel compression
|
||||||
|
is supported only for <literal>zstd</literal>.
|
||||||
|
</para>
|
||||||
</listitem>
|
</listitem>
|
||||||
</varlistentry>
|
</varlistentry>
|
||||||
|
|
||||||
|
@ -424,8 +424,8 @@ PostgreSQL documentation
|
|||||||
integer, it specifies the compression level. Otherwise, it should be
|
integer, it specifies the compression level. Otherwise, it should be
|
||||||
a comma-separated list of items, each of the form
|
a comma-separated list of items, each of the form
|
||||||
<literal>keyword</literal> or <literal>keyword=value</literal>.
|
<literal>keyword</literal> or <literal>keyword=value</literal>.
|
||||||
Currently, the only supported keyword is <literal>level</literal>,
|
Currently, the supported keywords are <literal>level</literal>
|
||||||
which sets the compression level.
|
and <literal>workers</literal>.
|
||||||
</para>
|
</para>
|
||||||
<para>
|
<para>
|
||||||
If no compression level is specified, the default compression level
|
If no compression level is specified, the default compression level
|
||||||
|
@ -25,8 +25,8 @@ typedef struct bbsink_zstd
|
|||||||
/* Common information for all types of sink. */
|
/* Common information for all types of sink. */
|
||||||
bbsink base;
|
bbsink base;
|
||||||
|
|
||||||
/* Compression level */
|
/* Compression options */
|
||||||
int compresslevel;
|
bc_specification *compress;
|
||||||
|
|
||||||
ZSTD_CCtx *cctx;
|
ZSTD_CCtx *cctx;
|
||||||
ZSTD_outBuffer zstd_outBuf;
|
ZSTD_outBuffer zstd_outBuf;
|
||||||
@ -67,22 +67,13 @@ bbsink_zstd_new(bbsink *next, bc_specification *compress)
|
|||||||
return NULL; /* keep compiler quiet */
|
return NULL; /* keep compiler quiet */
|
||||||
#else
|
#else
|
||||||
bbsink_zstd *sink;
|
bbsink_zstd *sink;
|
||||||
int compresslevel;
|
|
||||||
|
|
||||||
Assert(next != NULL);
|
Assert(next != NULL);
|
||||||
|
|
||||||
if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) == 0)
|
|
||||||
compresslevel = 0;
|
|
||||||
else
|
|
||||||
{
|
|
||||||
compresslevel = compress->level;
|
|
||||||
Assert(compresslevel >= 1 && compresslevel <= 22);
|
|
||||||
}
|
|
||||||
|
|
||||||
sink = palloc0(sizeof(bbsink_zstd));
|
sink = palloc0(sizeof(bbsink_zstd));
|
||||||
*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
|
*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
|
||||||
sink->base.bbs_next = next;
|
sink->base.bbs_next = next;
|
||||||
sink->compresslevel = compresslevel;
|
sink->compress = compress;
|
||||||
|
|
||||||
return &sink->base;
|
return &sink->base;
|
||||||
#endif
|
#endif
|
||||||
@ -99,16 +90,36 @@ bbsink_zstd_begin_backup(bbsink *sink)
|
|||||||
bbsink_zstd *mysink = (bbsink_zstd *) sink;
|
bbsink_zstd *mysink = (bbsink_zstd *) sink;
|
||||||
size_t output_buffer_bound;
|
size_t output_buffer_bound;
|
||||||
size_t ret;
|
size_t ret;
|
||||||
|
bc_specification *compress = mysink->compress;
|
||||||
|
|
||||||
mysink->cctx = ZSTD_createCCtx();
|
mysink->cctx = ZSTD_createCCtx();
|
||||||
if (!mysink->cctx)
|
if (!mysink->cctx)
|
||||||
elog(ERROR, "could not create zstd compression context");
|
elog(ERROR, "could not create zstd compression context");
|
||||||
|
|
||||||
|
if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) != 0)
|
||||||
|
{
|
||||||
ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
|
ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
|
||||||
mysink->compresslevel);
|
compress->level);
|
||||||
if (ZSTD_isError(ret))
|
if (ZSTD_isError(ret))
|
||||||
elog(ERROR, "could not set zstd compression level to %d: %s",
|
elog(ERROR, "could not set zstd compression level to %d: %s",
|
||||||
mysink->compresslevel, ZSTD_getErrorName(ret));
|
compress->level, ZSTD_getErrorName(ret));
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((compress->options & BACKUP_COMPRESSION_OPTION_WORKERS) != 0)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* On older versions of libzstd, this option does not exist, and trying
|
||||||
|
* to set it will fail. Similarly for newer versions if they are
|
||||||
|
* compiled without threading support.
|
||||||
|
*/
|
||||||
|
ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
|
||||||
|
compress->workers);
|
||||||
|
if (ZSTD_isError(ret))
|
||||||
|
ereport(ERROR,
|
||||||
|
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||||
|
errmsg("could not set compression worker count to %d: %s",
|
||||||
|
compress->workers, ZSTD_getErrorName(ret)));
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We need our own buffer, because we're going to pass different data to
|
* We need our own buffer, because we're going to pass different data to
|
||||||
|
@ -67,7 +67,6 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress)
|
|||||||
{
|
{
|
||||||
#ifdef USE_ZSTD
|
#ifdef USE_ZSTD
|
||||||
bbstreamer_zstd_frame *streamer;
|
bbstreamer_zstd_frame *streamer;
|
||||||
int compresslevel;
|
|
||||||
size_t ret;
|
size_t ret;
|
||||||
|
|
||||||
Assert(next != NULL);
|
Assert(next != NULL);
|
||||||
@ -88,19 +87,36 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress)
|
|||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Initialize stream compression preferences */
|
/* Set compression level, if specified */
|
||||||
if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) == 0)
|
if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) != 0)
|
||||||
compresslevel = 0;
|
{
|
||||||
else
|
|
||||||
compresslevel = compress->level;
|
|
||||||
ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
|
ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
|
||||||
compresslevel);
|
compress->level);
|
||||||
if (ZSTD_isError(ret))
|
if (ZSTD_isError(ret))
|
||||||
{
|
{
|
||||||
pg_log_error("could not set zstd compression level to %d: %s",
|
pg_log_error("could not set zstd compression level to %d: %s",
|
||||||
compresslevel, ZSTD_getErrorName(ret));
|
compress->level, ZSTD_getErrorName(ret));
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Set # of workers, if specified */
|
||||||
|
if ((compress->options & BACKUP_COMPRESSION_OPTION_WORKERS) != 0)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* On older versions of libzstd, this option does not exist, and
|
||||||
|
* trying to set it will fail. Similarly for newer versions if they
|
||||||
|
* are compiled without threading support.
|
||||||
|
*/
|
||||||
|
ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
|
||||||
|
compress->workers);
|
||||||
|
if (ZSTD_isError(ret))
|
||||||
|
{
|
||||||
|
pg_log_error("could not set compression worker count to %d: %s",
|
||||||
|
compress->workers, ZSTD_getErrorName(ret));
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Initialize the ZSTD output buffer. */
|
/* Initialize the ZSTD output buffer. */
|
||||||
streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
|
streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
|
||||||
|
@ -130,6 +130,11 @@ my @compression_failure_tests = (
|
|||||||
'invalid compression specification: found empty string where a compression option was expected',
|
'invalid compression specification: found empty string where a compression option was expected',
|
||||||
'failure on extra, empty compression option'
|
'failure on extra, empty compression option'
|
||||||
],
|
],
|
||||||
|
[
|
||||||
|
'gzip:workers=3',
|
||||||
|
'invalid compression specification: compression algorithm "gzip" does not accept a worker count',
|
||||||
|
'failure on worker count for gzip'
|
||||||
|
],
|
||||||
);
|
);
|
||||||
for my $cft (@compression_failure_tests)
|
for my $cft (@compression_failure_tests)
|
||||||
{
|
{
|
||||||
|
@ -34,6 +34,12 @@ my @test_configuration = (
|
|||||||
'compression_method' => 'zstd',
|
'compression_method' => 'zstd',
|
||||||
'backup_flags' => ['--compress', 'server-zstd:5'],
|
'backup_flags' => ['--compress', 'server-zstd:5'],
|
||||||
'enabled' => check_pg_config("#define USE_ZSTD 1")
|
'enabled' => check_pg_config("#define USE_ZSTD 1")
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'compression_method' => 'parallel zstd',
|
||||||
|
'backup_flags' => ['--compress', 'server-zstd:workers=3'],
|
||||||
|
'enabled' => check_pg_config("#define USE_ZSTD 1"),
|
||||||
|
'possibly_unsupported' => qr/could not set compression worker count to 3: Unsupported parameter/
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -55,8 +61,27 @@ for my $tc (@test_configuration)
|
|||||||
my @verify = ('pg_verifybackup', '-e', $backup_path);
|
my @verify = ('pg_verifybackup', '-e', $backup_path);
|
||||||
|
|
||||||
# A backup with a valid compression method should work.
|
# A backup with a valid compression method should work.
|
||||||
$primary->command_ok(\@backup,
|
my $backup_stdout = '';
|
||||||
"backup done, compression method \"$method\"");
|
my $backup_stderr = '';
|
||||||
|
my $backup_result = $primary->run_log(\@backup, '>', \$backup_stdout,
|
||||||
|
'2>', \$backup_stderr);
|
||||||
|
if ($backup_stdout ne '')
|
||||||
|
{
|
||||||
|
print "# standard output was:\n$backup_stdout";
|
||||||
|
}
|
||||||
|
if ($backup_stderr ne '')
|
||||||
|
{
|
||||||
|
print "# standard error was:\n$backup_stderr";
|
||||||
|
}
|
||||||
|
if (! $backup_result && $tc->{'possibly_unsupported'} &&
|
||||||
|
$backup_stderr =~ /$tc->{'possibly_unsupported'}/)
|
||||||
|
{
|
||||||
|
skip "compression with $method not supported by this build", 2;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ok($backup_result, "backup done, compression $method");
|
||||||
|
}
|
||||||
|
|
||||||
# Make sure that it verifies OK.
|
# Make sure that it verifies OK.
|
||||||
$primary->command_ok(\@verify,
|
$primary->command_ok(\@verify,
|
||||||
|
@ -49,6 +49,15 @@ my @test_configuration = (
|
|||||||
'decompress_program' => $ENV{'ZSTD'},
|
'decompress_program' => $ENV{'ZSTD'},
|
||||||
'decompress_flags' => [ '-d' ],
|
'decompress_flags' => [ '-d' ],
|
||||||
'enabled' => check_pg_config("#define USE_ZSTD 1")
|
'enabled' => check_pg_config("#define USE_ZSTD 1")
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'compression_method' => 'parallel zstd',
|
||||||
|
'backup_flags' => ['--compress', 'client-zstd:workers=3'],
|
||||||
|
'backup_archive' => 'base.tar.zst',
|
||||||
|
'decompress_program' => $ENV{'ZSTD'},
|
||||||
|
'decompress_flags' => [ '-d' ],
|
||||||
|
'enabled' => check_pg_config("#define USE_ZSTD 1"),
|
||||||
|
'possibly_unsupported' => qr/could not set compression worker count to 3: Unsupported parameter/
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -69,9 +78,27 @@ for my $tc (@test_configuration)
|
|||||||
'pg_basebackup', '-D', $backup_path,
|
'pg_basebackup', '-D', $backup_path,
|
||||||
'-Xfetch', '--no-sync', '-cfast', '-Ft');
|
'-Xfetch', '--no-sync', '-cfast', '-Ft');
|
||||||
push @backup, @{$tc->{'backup_flags'}};
|
push @backup, @{$tc->{'backup_flags'}};
|
||||||
$primary->command_ok(\@backup,
|
my $backup_stdout = '';
|
||||||
"client side backup, compression $method");
|
my $backup_stderr = '';
|
||||||
|
my $backup_result = $primary->run_log(\@backup, '>', \$backup_stdout,
|
||||||
|
'2>', \$backup_stderr);
|
||||||
|
if ($backup_stdout ne '')
|
||||||
|
{
|
||||||
|
print "# standard output was:\n$backup_stdout";
|
||||||
|
}
|
||||||
|
if ($backup_stderr ne '')
|
||||||
|
{
|
||||||
|
print "# standard error was:\n$backup_stderr";
|
||||||
|
}
|
||||||
|
if (! $backup_result && $tc->{'possibly_unsupported'} &&
|
||||||
|
$backup_stderr =~ /$tc->{'possibly_unsupported'}/)
|
||||||
|
{
|
||||||
|
skip "compression with $method not supported by this build", 3;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ok($backup_result, "client side backup, compression $method");
|
||||||
|
}
|
||||||
|
|
||||||
# Verify that we got the files we expected.
|
# Verify that we got the files we expected.
|
||||||
my $backup_files = join(',',
|
my $backup_files = join(',',
|
||||||
|
@ -177,6 +177,11 @@ parse_bc_specification(bc_algorithm algorithm, char *specification,
|
|||||||
result->level = expect_integer_value(keyword, value, result);
|
result->level = expect_integer_value(keyword, value, result);
|
||||||
result->options |= BACKUP_COMPRESSION_OPTION_LEVEL;
|
result->options |= BACKUP_COMPRESSION_OPTION_LEVEL;
|
||||||
}
|
}
|
||||||
|
else if (strcmp(keyword, "workers") == 0)
|
||||||
|
{
|
||||||
|
result->workers = expect_integer_value(keyword, value, result);
|
||||||
|
result->options |= BACKUP_COMPRESSION_OPTION_WORKERS;
|
||||||
|
}
|
||||||
else
|
else
|
||||||
result->parse_error =
|
result->parse_error =
|
||||||
psprintf(_("unknown compression option \"%s\""), keyword);
|
psprintf(_("unknown compression option \"%s\""), keyword);
|
||||||
@ -266,5 +271,16 @@ validate_bc_specification(bc_specification *spec)
|
|||||||
min_level, max_level);
|
min_level, max_level);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Of the compression algorithms that we currently support, only zstd
|
||||||
|
* allows parallel workers.
|
||||||
|
*/
|
||||||
|
if ((spec->options & BACKUP_COMPRESSION_OPTION_WORKERS) != 0 &&
|
||||||
|
(spec->algorithm != BACKUP_COMPRESSION_ZSTD))
|
||||||
|
{
|
||||||
|
return psprintf(_("compression algorithm \"%s\" does not accept a worker count"),
|
||||||
|
get_bc_algorithm_name(spec->algorithm));
|
||||||
|
}
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -23,12 +23,14 @@ typedef enum bc_algorithm
|
|||||||
} bc_algorithm;
|
} bc_algorithm;
|
||||||
|
|
||||||
#define BACKUP_COMPRESSION_OPTION_LEVEL (1 << 0)
|
#define BACKUP_COMPRESSION_OPTION_LEVEL (1 << 0)
|
||||||
|
#define BACKUP_COMPRESSION_OPTION_WORKERS (1 << 1)
|
||||||
|
|
||||||
typedef struct bc_specification
|
typedef struct bc_specification
|
||||||
{
|
{
|
||||||
bc_algorithm algorithm;
|
bc_algorithm algorithm;
|
||||||
unsigned options; /* OR of BACKUP_COMPRESSION_OPTION constants */
|
unsigned options; /* OR of BACKUP_COMPRESSION_OPTION constants */
|
||||||
int level;
|
int level;
|
||||||
|
int workers;
|
||||||
char *parse_error; /* NULL if parsing was OK, else message */
|
char *parse_error; /* NULL if parsing was OK, else message */
|
||||||
} bc_specification;
|
} bc_specification;
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user