Compare commits

...

7 Commits

Author SHA1 Message Date
Amit Kapila
22f7e61a63 Clean-ups for 776621a5e4 and 7329240437.
Following are a few clean-ups related to failover option support in slots:
1. Improve the documentation in create_subscription.sgml.
2. Remove the spurious blank line in subscriptioncmds.c.
3. Remove the NOTICE for alter_replication_slot in subscriptioncmds.c as
we would sometimes print it even when nothing has changed. One can find
the change by enabling log_replication_commands on the publisher.
4. Optimize ReplicationSlotAlter() function to prevent disk flushing when
the slot's data remains unchanged.

Author: Hou Zhijie
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
Discussion: https://postgr.es/m/OS0PR01MB57164904651FB588A518E98894472@OS0PR01MB5716.jpnprd01.prod.outlook.com
2024-02-07 10:04:04 +05:30
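
Item 4 is essentially a guard around the persistence step; the real change is in the ReplicationSlotAlter() hunk further down. The following is a minimal standalone sketch of the same pattern — all names here (demo_slot, demo_mark_dirty, demo_save) are hypothetical stand-ins, not PostgreSQL APIs:

    #include <stdbool.h>

    typedef struct demo_slot
    {
        bool        failover;
    } demo_slot;

    static void demo_mark_dirty(demo_slot *slot) { (void) slot; /* would set a dirty flag */ }
    static void demo_save(demo_slot *slot) { (void) slot; /* would flush slot state to disk */ }

    static void
    demo_alter_slot(demo_slot *slot, bool failover)
    {
        /* Skip the disk flush entirely when nothing actually changes. */
        if (slot->failover != failover)
        {
            slot->failover = failover;
            demo_mark_dirty(slot);
            demo_save(slot);
        }
    }

    int
    main(void)
    {
        demo_slot   slot = {.failover = false};

        demo_alter_slot(&slot, true);   /* value changes: slot is saved */
        demo_alter_slot(&slot, true);   /* no change: flush is skipped */
        return 0;
    }
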
Michael Paquier
b9d6038d70 Simplify signature of CopyAttributeOutCSV() in copyto.c
This has come up in 2889fd23be56, reverted later on, and is still useful
on its own to slightly reduce the differences between the code paths
dedicated to CSV and text.

Discussion: https://postgr.es/m/ZcCKwAeFrlOqPBuN@paquier.xyz
2024-02-07 12:28:55 +09:00
Michael Paquier
1aa8324b81 Revert "Refactor CopyAttributeOut{CSV,Text}() to use a callback in COPY TO"
This reverts commit 2889fd23be56, following a discussion with Andres
Freund as this callback, being called once per attribute when sending a
relation's row, can involve a lot of indirect function calls (more
attributes to deal with means more impact).  The effects of a dispatch
at this level would become more visible when improving the per-row code
execution of COPY TO, impacting future potential performance
improvements.

Discussion: https://postgr.es/m/20240206014125.qofww7ew3dx3v3uk@awork3.anarazel.de
2024-02-07 08:04:26 +09:00
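
The cost concern is easiest to see side by side. Below is a standalone sketch — hypothetical names, not copyto.c code — contrasting a per-attribute callback (one indirect call per attribute per row) with a direct branch on the CSV flag:

    #include <stdbool.h>
    #include <stdio.h>

    typedef struct demo_state
    {
        bool        csv_mode;
    } demo_state;

    static void demo_out_text(demo_state *s, const char *v) { (void) s; printf("%s", v); }
    static void demo_out_csv(demo_state *s, const char *v)  { (void) s; printf("\"%s\"", v); }

    /* Reverted style: one indirect call for every attribute of every row sent. */
    typedef void (*demo_out_fn) (demo_state *s, const char *v);

    /* Retained style: a cheap, well-predicted branch with direct calls. */
    static void
    demo_emit_attr(demo_state *s, const char *v)
    {
        if (s->csv_mode)
            demo_out_csv(s, v);
        else
            demo_out_text(s, v);
    }

    int
    main(void)
    {
        demo_state  s = {.csv_mode = true};
        demo_out_fn fn = s.csv_mode ? demo_out_csv : demo_out_text;

        fn(&s, "via callback\n");           /* indirect dispatch */
        demo_emit_attr(&s, "via branch\n"); /* direct dispatch */
        return 0;
    }
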
Alvaro Herrera
e4b27b5355 Change initial use of pg_atomic_write_u64 to init
This only matters when using atomics emulation with semaphores.

Per buildfarm member rorqual.
2024-02-06 12:08:39 +01:00
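
In other words, the very first store to the atomic must go through the init function. A hedged sketch of the distinction, using the real pg_atomic_* API but a hypothetical variable and function (a fragment meant to compile inside the PostgreSQL tree, not a standalone program):

    #include "postgres.h"
    #include "port/atomics.h"

    static pg_atomic_uint64 demo_latest;    /* hypothetical stand-in for latest_page_number */

    static void
    demo_startup(uint64 pageno)
    {
        /*
         * pg_atomic_init_u64() must be the first touch: under semaphore-based
         * atomics emulation it also sets up the lock that protects the value.
         */
        pg_atomic_init_u64(&demo_latest, 0);

        /* Subsequent stores can use pg_atomic_write_u64(). */
        pg_atomic_write_u64(&demo_latest, pageno);
    }
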
Alvaro Herrera
d172b717c6 Use atomic access for SlruShared->latest_page_number
The new concurrency model proposed for slru.c to improve performance
does not include any single lock that would coordinate processes
doing concurrent reads/writes on SlruShared->latest_page_number.
We can instead use atomic reads and writes for that variable.

Author: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Andrey M. Borodin <x4mmm@yandex-team.ru>
Discussion: https://postgr.es/m/CAFiTN-vzDvNz=ExGXz6gdyjtzGixKSqs0mKHMmaQ8sOSEFZ33A@mail.gmail.com
2024-02-06 10:54:10 +01:00
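
The resulting access pattern, shown with the real atomics API but a hypothetical struct (the actual slru.c hunks follow below); again a sketch meant to compile inside the PostgreSQL tree:

    #include "postgres.h"
    #include "port/atomics.h"

    typedef struct DemoShared
    {
        pg_atomic_uint64 latest_page_number;    /* was int64, protected by ControlLock */
    } DemoShared;

    static void
    demo_set_latest(DemoShared *shared, uint64 pageno)
    {
        /* No LWLockAcquire/LWLockRelease pair is needed around the store... */
        pg_atomic_write_u64(&shared->latest_page_number, pageno);
    }

    static bool
    demo_is_latest(DemoShared *shared, uint64 pageno)
    {
        /* ...and readers tolerate a slightly stale value (see the slru.c comments below). */
        return pg_atomic_read_u64(&shared->latest_page_number) == pageno;
    }
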
John Naylor
b83033c3cf Further cosmetic review of hashfn_unstable.h
In follow-up to e97b672c8,
* Flesh out comments explaining the incremental interface
* Clarify detection of zero bytes when hashing aligned C strings

The latter was suggested and reviewed by Jeff Davis

Discussion: https://postgr.es/m/48e8f8bbe0be9c789f98776c7438244ab7a7cc63.camel%40j-davis.com
2024-02-06 14:49:06 +07:00
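
The "detection of zero bytes" being clarified is the classic word-at-a-time trick. A standalone illustration — the macro follows the well-known recipe and the example value is made up; this is not code lifted from hashfn_unstable.h:

    #include <stdint.h>
    #include <stdio.h>

    /* Nonzero iff some byte of v is zero; the lowest set bit marks the first zero byte. */
    #define haszero64(v) \
        (((v) - UINT64_C(0x0101010101010101)) & ~(v) & UINT64_C(0x8080808080808080))

    int
    main(void)
    {
        /* little-endian image of the bytes "ab\0zzzzz" */
        uint64_t    chunk = UINT64_C(0x7a7a7a7a7a006261);
        uint64_t    z = haszero64(chunk);

        /* lowest set bit is the 0x80 bit of the NUL byte, i.e. bit 7, 15, ..., 63 */
        printf("NUL at byte %d\n", __builtin_ctzll(z) / 8);    /* prints 2 (gcc/clang builtin) */
        return 0;
    }
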
John Naylor
9ed3ee5001 Simplify initialization of incremental hash state
The standalone functions fasthash{32,64} use length for two purposes:
how many bytes to hash, and how to perturb the internal seed.

Developers using the incremental interface may not know the length
ahead of time (e.g. for C strings). In this case, it's advised to
pass length to the finalizer, but initialization still needed some
length up front, in the form of a placeholder macro.

Separate the concerns by having the standalone functions perturb the
internal seed themselves from their own length parameter, which allows
removing "len" from fasthash_init(), as well as the placeholder macro.

Discussion: https://postgr.es/m/CANWCAZbTUk2LOyhsFo33gjLyLAHZ7ucXCi5K9u%3D%2BPtnTShDKtw%40mail.gmail.com
2024-02-06 14:39:36 +07:00
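
The shape of the change, as a sketch with stand-in names (the mix constant is the one visible in the hunks below); only the location of the length-based seed perturbation moves, so incremental callers no longer need a length, or a placeholder, up front:

    #include <stdint.h>

    typedef struct demo_fh_state
    {
        uint64_t    hash;
        uint64_t    accum;
    } demo_fh_state;

    /* After: initialization needs only a seed... */
    static void
    demo_fh_init(demo_fh_state *hs, uint64_t seed)
    {
        hs->accum = 0;
        hs->hash = seed ^ UINT64_C(0x880355f21e6d1965);
    }

    /* ...and the standalone, known-length entry point perturbs the seed itself. */
    static void
    demo_fh_init_standalone(demo_fh_state *hs, int len, uint64_t seed)
    {
        demo_fh_init(hs, 0);
        hs->hash = seed ^ (len * UINT64_C(0x880355f21e6d1965));
    }
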
12 changed files with 134 additions and 119 deletions

View File

@ -117,9 +117,8 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
command should connect to the publisher at all. The default
is <literal>true</literal>. Setting this to
<literal>false</literal> will force the values of
<literal>create_slot</literal>, <literal>enabled</literal>,
<literal>copy_data</literal>, and <literal>failover</literal>
to <literal>false</literal>.
<literal>create_slot</literal>, <literal>enabled</literal> and
<literal>copy_data</literal> to <literal>false</literal>.
(You cannot combine setting <literal>connect</literal>
to <literal>false</literal> with
setting <literal>create_slot</literal>, <literal>enabled</literal>,

View File

@ -1591,9 +1591,7 @@ CREATE DATABASE foo WITH TEMPLATE template0;
information might have to be changed. If the subscription needs to
be enabled for
<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>,
then same needs to be done by executing
<link linkend="sql-altersubscription-params-set">
<literal>ALTER SUBSCRIPTION ... SET (failover = true)</literal></link>
execute <link linkend="sql-altersubscription-params-set"><literal>ALTER SUBSCRIPTION ... SET (failover = true)</literal></link>
after the slot has been created. It might also be appropriate to
truncate the target tables before initiating a new full table copy. If users
intend to copy initial data during refresh they must create the slot with

View File

@ -766,14 +766,10 @@ StartupCLOG(void)
TransactionId xid = XidFromFullTransactionId(TransamVariables->nextXid);
int64 pageno = TransactionIdToPage(xid);
LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
/*
* Initialize our idea of the latest page number.
*/
XactCtl->shared->latest_page_number = pageno;
LWLockRelease(XactSLRULock);
pg_atomic_write_u64(&XactCtl->shared->latest_page_number, pageno);
}
/*

View File

@ -689,9 +689,7 @@ ActivateCommitTs(void)
/*
* Re-Initialize our idea of the latest page number.
*/
LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
CommitTsCtl->shared->latest_page_number = pageno;
LWLockRelease(CommitTsSLRULock);
pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number, pageno);
/*
* If CommitTs is enabled, but it wasn't in the previous server run, we
@ -1006,7 +1004,8 @@ commit_ts_redo(XLogReaderState *record)
* During XLOG replay, latest_page_number isn't set up yet; insert a
* suitable value to bypass the sanity test in SimpleLruTruncate.
*/
CommitTsCtl->shared->latest_page_number = trunc->pageno;
pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number,
trunc->pageno);
SimpleLruTruncate(CommitTsCtl, trunc->pageno);
}

View File

@ -2017,13 +2017,15 @@ StartupMultiXact(void)
* Initialize offset's idea of the latest page number.
*/
pageno = MultiXactIdToOffsetPage(multi);
MultiXactOffsetCtl->shared->latest_page_number = pageno;
pg_atomic_write_u64(&MultiXactOffsetCtl->shared->latest_page_number,
pageno);
/*
* Initialize member's idea of the latest page number.
*/
pageno = MXOffsetToMemberPage(offset);
MultiXactMemberCtl->shared->latest_page_number = pageno;
pg_atomic_write_u64(&MultiXactMemberCtl->shared->latest_page_number,
pageno);
}
/*
@ -2047,14 +2049,15 @@ TrimMultiXact(void)
oldestMXactDB = MultiXactState->oldestMultiXactDB;
LWLockRelease(MultiXactGenLock);
/* Clean up offsets state */
LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
/*
* (Re-)Initialize our idea of the latest page number for offsets.
*/
pageno = MultiXactIdToOffsetPage(nextMXact);
MultiXactOffsetCtl->shared->latest_page_number = pageno;
pg_atomic_write_u64(&MultiXactOffsetCtl->shared->latest_page_number,
pageno);
/* Clean up offsets state */
LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
/*
* Zero out the remainder of the current offsets page. See notes in
@ -2081,14 +2084,16 @@ TrimMultiXact(void)
LWLockRelease(MultiXactOffsetSLRULock);
/* And the same for members */
LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
/*
* And the same for members.
*
* (Re-)Initialize our idea of the latest page number for members.
*/
pageno = MXOffsetToMemberPage(offset);
MultiXactMemberCtl->shared->latest_page_number = pageno;
pg_atomic_write_u64(&MultiXactMemberCtl->shared->latest_page_number,
pageno);
LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
/*
* Zero out the remainder of the current members page. See notes in
@ -3333,7 +3338,8 @@ multixact_redo(XLogReaderState *record)
* SimpleLruTruncate.
*/
pageno = MultiXactIdToOffsetPage(xlrec.endTruncOff);
MultiXactOffsetCtl->shared->latest_page_number = pageno;
pg_atomic_write_u64(&MultiXactOffsetCtl->shared->latest_page_number,
pageno);
PerformOffsetsTruncation(xlrec.startTruncOff, xlrec.endTruncOff);
LWLockRelease(MultiXactTruncationLock);

View File

@ -17,7 +17,8 @@
* per-buffer LWLocks that synchronize I/O for each buffer. The control lock
* must be held to examine or modify any shared state. A process that is
* reading in or writing out a page buffer does not hold the control lock,
* only the per-buffer lock for the buffer it is working on.
* only the per-buffer lock for the buffer it is working on. One exception
* is latest_page_number, which is read and written using atomic ops.
*
* "Holding the control lock" means exclusive lock in all cases except for
* SimpleLruReadPage_ReadOnly(); see comments for SlruRecentlyUsed() for
@ -239,8 +240,7 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
shared->lsn_groups_per_page = nlsns;
shared->cur_lru_count = 0;
/* shared->latest_page_number will be set later */
pg_atomic_init_u64(&shared->latest_page_number, 0);
shared->slru_stats_idx = pgstat_get_slru_index(name);
@ -329,8 +329,15 @@ SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
/* Set the LSNs for this new page to zero */
SimpleLruZeroLSNs(ctl, slotno);
/* Assume this page is now the latest active page */
shared->latest_page_number = pageno;
/*
* Assume this page is now the latest active page.
*
* Note that because both this routine and SlruSelectLRUPage run with
* ControlLock held, it is not possible for this to be zeroing a page that
* SlruSelectLRUPage is going to evict simultaneously. Therefore, there's
* no memory barrier here.
*/
pg_atomic_write_u64(&shared->latest_page_number, pageno);
/* update the stats counter of zeroed pages */
pgstat_count_slru_page_zeroed(shared->slru_stats_idx);
@ -1113,9 +1120,17 @@ SlruSelectLRUPage(SlruCtl ctl, int64 pageno)
shared->page_lru_count[slotno] = cur_count;
this_delta = 0;
}
/*
* If this page is the one most recently zeroed, don't consider it
* an eviction candidate. See comments in SimpleLruZeroPage for an
* explanation about the lack of a memory barrier here.
*/
this_page_number = shared->page_number[slotno];
if (this_page_number == shared->latest_page_number)
if (this_page_number ==
pg_atomic_read_u64(&shared->latest_page_number))
continue;
if (shared->page_status[slotno] == SLRU_PAGE_VALID)
{
if (this_delta > best_valid_delta ||
@ -1254,7 +1269,6 @@ void
SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
{
SlruShared shared = ctl->shared;
int slotno;
/* update the stats counter of truncates */
pgstat_count_slru_truncate(shared->slru_stats_idx);
@ -1270,10 +1284,13 @@ SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
restart:
/*
* While we are holding the lock, make an important safety check: the
* current endpoint page must not be eligible for removal.
* An important safety check: the current endpoint page must not be
* eligible for removal. This check is just a backstop against wraparound
* bugs elsewhere in SLRU handling, so we don't care if we read a slightly
* outdated value; therefore we don't add a memory barrier.
*/
if (ctl->PagePrecedes(shared->latest_page_number, cutoffPage))
if (ctl->PagePrecedes(pg_atomic_read_u64(&shared->latest_page_number),
cutoffPage))
{
LWLockRelease(shared->ControlLock);
ereport(LOG,
@ -1282,7 +1299,7 @@ restart:
return;
}
for (slotno = 0; slotno < shared->num_slots; slotno++)
for (int slotno = 0; slotno < shared->num_slots; slotno++)
{
if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
continue;

View File

@ -256,7 +256,7 @@ spcachekey_hash(SearchPathCacheKey key)
fasthash_state hs;
int sp_len;
fasthash_init(&hs, FH_UNKNOWN_LENGTH, 0);
fasthash_init(&hs, 0);
hs.accum = key.roleid;
fasthash_combine(&hs);

View File

@ -54,14 +54,6 @@ typedef enum CopyDest
COPY_CALLBACK, /* to callback function */
} CopyDest;
/*
* Per-format callback to send output representation of one attribute for
* a `string`. `use_quote` tracks if quotes are required in the output
* representation.
*/
typedef void (*CopyAttributeOut) (CopyToState cstate, const char *string,
bool use_quote);
/*
* This struct contains all the state variables used throughout a COPY TO
* operation.
@ -105,7 +97,6 @@ typedef struct CopyToStateData
MemoryContext copycontext; /* per-copy execution context */
FmgrInfo *out_functions; /* lookup info for output functions */
CopyAttributeOut copy_attribute_out; /* output representation callback */
MemoryContext rowcontext; /* per-row evaluation context */
uint64 bytes_processed; /* number of bytes processed so far */
} CopyToStateData;
@ -126,10 +117,7 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
static void EndCopy(CopyToState cstate);
static void ClosePipeToProgram(CopyToState cstate);
static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot);
/* Callbacks for copy_attribute_out */
static void CopyAttributeOutText(CopyToState cstate, const char *string,
bool use_quote);
static void CopyAttributeOutText(CopyToState cstate, const char *string);
static void CopyAttributeOutCSV(CopyToState cstate, const char *string,
bool use_quote);
@ -445,15 +433,6 @@ BeginCopyTo(ParseState *pstate,
/* Extract options from the statement node tree */
ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
/* Set output representation callback */
if (!cstate->opts.binary)
{
if (cstate->opts.csv_mode)
cstate->copy_attribute_out = CopyAttributeOutCSV;
else
cstate->copy_attribute_out = CopyAttributeOutText;
}
/* Process the source/target relation or query */
if (rel)
{
@ -857,8 +836,10 @@ DoCopyTo(CopyToState cstate)
colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
/* Ignore quotes */
cstate->copy_attribute_out(cstate, colname, false);
if (cstate->opts.csv_mode)
CopyAttributeOutCSV(cstate, colname, false);
else
CopyAttributeOutText(cstate, colname);
}
CopySendEndOfRow(cstate);
@ -968,9 +949,11 @@ CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
{
string = OutputFunctionCall(&out_functions[attnum - 1],
value);
cstate->copy_attribute_out(cstate, string,
cstate->opts.force_quote_flags[attnum - 1]);
if (cstate->opts.csv_mode)
CopyAttributeOutCSV(cstate, string,
cstate->opts.force_quote_flags[attnum - 1]);
else
CopyAttributeOutText(cstate, string);
}
else
{
@ -1000,8 +983,7 @@ CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
} while (0)
static void
CopyAttributeOutText(CopyToState cstate, const char *string,
bool use_quote)
CopyAttributeOutText(CopyToState cstate, const char *string)
{
const char *ptr;
const char *start;

View File

@ -73,7 +73,6 @@
#define SUBOPT_LSN 0x00004000
#define SUBOPT_ORIGIN 0x00008000
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@ -852,9 +851,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
(opts.failover || walrcv_server_version(wrconn) >= 170000))
{
walrcv_alter_slot(wrconn, opts.slot_name, opts.failover);
ereport(NOTICE,
(errmsg("changed the failover state of replication slot \"%s\" on publisher to %s",
opts.slot_name, opts.failover ? "true" : "false")));
}
}
PG_FINALLY();
@ -1547,10 +1543,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
PG_TRY();
{
walrcv_alter_slot(wrconn, sub->slotname, opts.failover);
ereport(NOTICE,
(errmsg("changed the failover state of replication slot \"%s\" on publisher to %s",
sub->slotname, opts.failover ? "true" : "false")));
}
PG_FINALLY();
{

View File

@ -696,12 +696,16 @@ ReplicationSlotAlter(const char *name, bool failover)
errmsg("cannot use %s with a physical replication slot",
"ALTER_REPLICATION_SLOT"));
SpinLockAcquire(&MyReplicationSlot->mutex);
MyReplicationSlot->data.failover = failover;
SpinLockRelease(&MyReplicationSlot->mutex);
if (MyReplicationSlot->data.failover != failover)
{
SpinLockAcquire(&MyReplicationSlot->mutex);
MyReplicationSlot->data.failover = failover;
SpinLockRelease(&MyReplicationSlot->mutex);
ReplicationSlotMarkDirty();
ReplicationSlotSave();
}
ReplicationSlotMarkDirty();
ReplicationSlotSave();
ReplicationSlotRelease();
}

View File

@ -49,6 +49,9 @@ typedef enum
/*
* Shared-memory state
*
* ControlLock is used to protect access to the other fields, except
* latest_page_number, which uses atomics; see comment in slru.c.
*/
typedef struct SlruSharedData
{
@ -95,7 +98,7 @@ typedef struct SlruSharedData
* this is not critical data, since we use it only to avoid swapping out
* the latest page.
*/
int64 latest_page_number;
pg_atomic_uint64 latest_page_number;
/* SLRU's index for statistics purposes (might not be unique) */
int slru_stats_idx;

View File

@ -53,27 +53,40 @@
* fasthash as implemented here has two interfaces:
*
* 1) Standalone functions, e.g. fasthash32() for a single value with a
* known length.
* known length. These return the same hash code as the original, at
* least on little-endian machines.
*
* 2) Incremental interface. This can be used for incorporating multiple
* inputs. The standalone functions use this internally, so see fasthash64()
* for an example of how this works.
*
* The incremental interface is especially useful if any of the inputs
* are NUL-terminated C strings, since the length is not needed ahead
* of time. This avoids needing to call strlen(). This case is optimized
* in fasthash_accum_cstring() :
* inputs. First, initialize the hash state (here with a zero seed):
*
* fasthash_state hs;
* fasthash_init(&hs, FH_UNKNOWN_LENGTH, 0);
* len = fasthash_accum_cstring(&hs, *str);
* ...
* return fasthash_final32(&hs, len);
* fasthash_init(&hs, 0);
*
* Here we pass FH_UNKNOWN_LENGTH as a convention, since passing zero
* would zero out the internal seed as well. fasthash_accum_cstring()
* returns the length of the string, which is computed on-the-fly while
* mixing the string into the hash. Experimentation has found that
* If the inputs are of types that can be trivially cast to uint64, it's
* sufficient to do:
*
* hs.accum = value1;
* fasthash_combine(&hs);
* hs.accum = value2;
* fasthash_combine(&hs);
* ...
*
* For longer or variable-length input, fasthash_accum() is a more
* flexible, but more verbose method. The standalone functions use this
* internally, so see fasthash64() for an example of this.
*
* After all inputs have been mixed in, finalize the hash:
*
* hashcode = fasthash_final32(&hs, 0);
*
* The incremental interface allows an optimization for NUL-terminated
* C strings:
*
* len = fasthash_accum_cstring(&hs, str);
* hashcode = fasthash_final32(&hs, len);
*
* By handling the terminator on-the-fly, we can avoid needing a strlen()
* call to tell us how many bytes to hash. Experimentation has found that
* SMHasher fails unless we incorporate the length, so it is passed to
* the finalizer as a tweak.
*/
@ -89,20 +102,17 @@ typedef struct fasthash_state
#define FH_SIZEOF_ACCUM sizeof(uint64)
#define FH_UNKNOWN_LENGTH 1
/*
* Initialize the hash state.
*
* 'len' is the length of the input, if known ahead of time.
* If that is not known, pass FH_UNKNOWN_LENGTH.
* 'seed' can be zero.
*/
static inline void
fasthash_init(fasthash_state *hs, int len, uint64 seed)
fasthash_init(fasthash_state *hs, uint64 seed)
{
memset(hs, 0, sizeof(fasthash_state));
hs->hash = seed ^ (len * 0x880355f21e6d1965);
hs->hash = seed ^ 0x880355f21e6d1965;
}
/* both the finalizer and part of the combining step */
@ -210,26 +220,33 @@ fasthash_accum_cstring_aligned(fasthash_state *hs, const char *str)
{
const char *const start = str;
int remainder;
uint64 zero_bytes_le;
uint64 zero_byte_low;
Assert(PointerIsAligned(start, uint64));
/*
* For every chunk of input, check for zero bytes before mixing into the
* hash. The chunk with zeros must contain the NUL terminator. We arrange
* so that zero_byte_low tells us not only that a zero exists, but also
* where it is, so we can hash the remainder of the string.
*
* The haszero64 calculation will set bits corresponding to the lowest
* byte where a zero exists, so that suffices for little-endian machines.
* For big-endian machines, we would need bits set for the highest zero
* byte in the chunk, since the trailing junk past the terminator could
* contain additional zeros. haszero64 does not give us that, so we
* byteswap the chunk first.
*/
for (;;)
{
uint64 chunk = *(uint64 *) str;
/*
* With little-endian representation, we can use this calculation,
* which sets bits in the first byte in the result word that
* corresponds to a zero byte in the original word. The rest of the
* bytes are indeterminate, so cannot be used on big-endian machines
* without either swapping or a bytewise check.
*/
#ifdef WORDS_BIGENDIAN
zero_bytes_le = haszero64(pg_bswap64(chunk));
zero_byte_low = haszero64(pg_bswap64(chunk));
#else
zero_bytes_le = haszero64(chunk);
zero_byte_low = haszero64(chunk);
#endif
if (zero_bytes_le)
if (zero_byte_low)
break;
hs->accum = chunk;
@ -238,12 +255,11 @@ fasthash_accum_cstring_aligned(fasthash_state *hs, const char *str)
}
/*
* For the last word, only use bytes up to the NUL for the hash. Bytes
* with set bits will be 0x80, so calculate the first occurrence of a zero
* byte within the input word by counting the number of trailing (because
* little-endian) zeros and dividing the result by 8.
* The byte corresponding to the NUL will be 0x80, so the rightmost bit
* position will be in the range 7, 15, ..., 63. Turn this into byte
* position by dividing by 8.
*/
remainder = pg_rightmost_one_pos64(zero_bytes_le) / BITS_PER_BYTE;
remainder = pg_rightmost_one_pos64(zero_byte_low) / BITS_PER_BYTE;
fasthash_accum(hs, str, remainder);
str += remainder;
@ -328,7 +344,10 @@ fasthash64(const char *k, int len, uint64 seed)
{
fasthash_state hs;
fasthash_init(&hs, len, seed);
fasthash_init(&hs, 0);
/* re-initialize the seed according to input length */
hs.hash = seed ^ (len * 0x880355f21e6d1965);
while (len >= FH_SIZEOF_ACCUM)
{