mirror of
https://github.com/postgres/postgres.git
synced 2025-08-10 00:03:33 -04:00
Compare commits
7 Commits
1f61680327
...
22f7e61a63
Author | SHA1 | Date | |
---|---|---|---|
|
22f7e61a63 | ||
|
b9d6038d70 | ||
|
1aa8324b81 | ||
|
e4b27b5355 | ||
|
d172b717c6 | ||
|
b83033c3cf | ||
|
9ed3ee5001 |
@ -117,9 +117,8 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
|
|||||||
command should connect to the publisher at all. The default
|
command should connect to the publisher at all. The default
|
||||||
is <literal>true</literal>. Setting this to
|
is <literal>true</literal>. Setting this to
|
||||||
<literal>false</literal> will force the values of
|
<literal>false</literal> will force the values of
|
||||||
<literal>create_slot</literal>, <literal>enabled</literal>,
|
<literal>create_slot</literal>, <literal>enabled</literal> and
|
||||||
<literal>copy_data</literal>, and <literal>failover</literal>
|
<literal>copy_data</literal> to <literal>false</literal>.
|
||||||
to <literal>false</literal>.
|
|
||||||
(You cannot combine setting <literal>connect</literal>
|
(You cannot combine setting <literal>connect</literal>
|
||||||
to <literal>false</literal> with
|
to <literal>false</literal> with
|
||||||
setting <literal>create_slot</literal>, <literal>enabled</literal>,
|
setting <literal>create_slot</literal>, <literal>enabled</literal>,
|
||||||
|
@ -1591,9 +1591,7 @@ CREATE DATABASE foo WITH TEMPLATE template0;
|
|||||||
information might have to be changed. If the subscription needs to
|
information might have to be changed. If the subscription needs to
|
||||||
be enabled for
|
be enabled for
|
||||||
<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>,
|
<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>,
|
||||||
then same needs to be done by executing
|
execute <link linkend="sql-altersubscription-params-set"><literal>ALTER SUBSCRIPTION ... SET (failover = true)</literal></link>
|
||||||
<link linkend="sql-altersubscription-params-set">
|
|
||||||
<literal>ALTER SUBSCRIPTION ... SET (failover = true)</literal></link>
|
|
||||||
after the slot has been created. It might also be appropriate to
|
after the slot has been created. It might also be appropriate to
|
||||||
truncate the target tables before initiating a new full table copy. If users
|
truncate the target tables before initiating a new full table copy. If users
|
||||||
intend to copy initial data during refresh they must create the slot with
|
intend to copy initial data during refresh they must create the slot with
|
||||||
|
@ -766,14 +766,10 @@ StartupCLOG(void)
|
|||||||
TransactionId xid = XidFromFullTransactionId(TransamVariables->nextXid);
|
TransactionId xid = XidFromFullTransactionId(TransamVariables->nextXid);
|
||||||
int64 pageno = TransactionIdToPage(xid);
|
int64 pageno = TransactionIdToPage(xid);
|
||||||
|
|
||||||
LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Initialize our idea of the latest page number.
|
* Initialize our idea of the latest page number.
|
||||||
*/
|
*/
|
||||||
XactCtl->shared->latest_page_number = pageno;
|
pg_atomic_write_u64(&XactCtl->shared->latest_page_number, pageno);
|
||||||
|
|
||||||
LWLockRelease(XactSLRULock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -689,9 +689,7 @@ ActivateCommitTs(void)
|
|||||||
/*
|
/*
|
||||||
* Re-Initialize our idea of the latest page number.
|
* Re-Initialize our idea of the latest page number.
|
||||||
*/
|
*/
|
||||||
LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
|
pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number, pageno);
|
||||||
CommitTsCtl->shared->latest_page_number = pageno;
|
|
||||||
LWLockRelease(CommitTsSLRULock);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If CommitTs is enabled, but it wasn't in the previous server run, we
|
* If CommitTs is enabled, but it wasn't in the previous server run, we
|
||||||
@ -1006,7 +1004,8 @@ commit_ts_redo(XLogReaderState *record)
|
|||||||
* During XLOG replay, latest_page_number isn't set up yet; insert a
|
* During XLOG replay, latest_page_number isn't set up yet; insert a
|
||||||
* suitable value to bypass the sanity test in SimpleLruTruncate.
|
* suitable value to bypass the sanity test in SimpleLruTruncate.
|
||||||
*/
|
*/
|
||||||
CommitTsCtl->shared->latest_page_number = trunc->pageno;
|
pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number,
|
||||||
|
trunc->pageno);
|
||||||
|
|
||||||
SimpleLruTruncate(CommitTsCtl, trunc->pageno);
|
SimpleLruTruncate(CommitTsCtl, trunc->pageno);
|
||||||
}
|
}
|
||||||
|
@ -2017,13 +2017,15 @@ StartupMultiXact(void)
|
|||||||
* Initialize offset's idea of the latest page number.
|
* Initialize offset's idea of the latest page number.
|
||||||
*/
|
*/
|
||||||
pageno = MultiXactIdToOffsetPage(multi);
|
pageno = MultiXactIdToOffsetPage(multi);
|
||||||
MultiXactOffsetCtl->shared->latest_page_number = pageno;
|
pg_atomic_write_u64(&MultiXactOffsetCtl->shared->latest_page_number,
|
||||||
|
pageno);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Initialize member's idea of the latest page number.
|
* Initialize member's idea of the latest page number.
|
||||||
*/
|
*/
|
||||||
pageno = MXOffsetToMemberPage(offset);
|
pageno = MXOffsetToMemberPage(offset);
|
||||||
MultiXactMemberCtl->shared->latest_page_number = pageno;
|
pg_atomic_write_u64(&MultiXactMemberCtl->shared->latest_page_number,
|
||||||
|
pageno);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -2047,14 +2049,15 @@ TrimMultiXact(void)
|
|||||||
oldestMXactDB = MultiXactState->oldestMultiXactDB;
|
oldestMXactDB = MultiXactState->oldestMultiXactDB;
|
||||||
LWLockRelease(MultiXactGenLock);
|
LWLockRelease(MultiXactGenLock);
|
||||||
|
|
||||||
/* Clean up offsets state */
|
|
||||||
LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* (Re-)Initialize our idea of the latest page number for offsets.
|
* (Re-)Initialize our idea of the latest page number for offsets.
|
||||||
*/
|
*/
|
||||||
pageno = MultiXactIdToOffsetPage(nextMXact);
|
pageno = MultiXactIdToOffsetPage(nextMXact);
|
||||||
MultiXactOffsetCtl->shared->latest_page_number = pageno;
|
pg_atomic_write_u64(&MultiXactOffsetCtl->shared->latest_page_number,
|
||||||
|
pageno);
|
||||||
|
|
||||||
|
/* Clean up offsets state */
|
||||||
|
LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Zero out the remainder of the current offsets page. See notes in
|
* Zero out the remainder of the current offsets page. See notes in
|
||||||
@ -2081,14 +2084,16 @@ TrimMultiXact(void)
|
|||||||
|
|
||||||
LWLockRelease(MultiXactOffsetSLRULock);
|
LWLockRelease(MultiXactOffsetSLRULock);
|
||||||
|
|
||||||
/* And the same for members */
|
|
||||||
LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
* And the same for members.
|
||||||
|
*
|
||||||
* (Re-)Initialize our idea of the latest page number for members.
|
* (Re-)Initialize our idea of the latest page number for members.
|
||||||
*/
|
*/
|
||||||
pageno = MXOffsetToMemberPage(offset);
|
pageno = MXOffsetToMemberPage(offset);
|
||||||
MultiXactMemberCtl->shared->latest_page_number = pageno;
|
pg_atomic_write_u64(&MultiXactMemberCtl->shared->latest_page_number,
|
||||||
|
pageno);
|
||||||
|
|
||||||
|
LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Zero out the remainder of the current members page. See notes in
|
* Zero out the remainder of the current members page. See notes in
|
||||||
@ -3333,7 +3338,8 @@ multixact_redo(XLogReaderState *record)
|
|||||||
* SimpleLruTruncate.
|
* SimpleLruTruncate.
|
||||||
*/
|
*/
|
||||||
pageno = MultiXactIdToOffsetPage(xlrec.endTruncOff);
|
pageno = MultiXactIdToOffsetPage(xlrec.endTruncOff);
|
||||||
MultiXactOffsetCtl->shared->latest_page_number = pageno;
|
pg_atomic_write_u64(&MultiXactOffsetCtl->shared->latest_page_number,
|
||||||
|
pageno);
|
||||||
PerformOffsetsTruncation(xlrec.startTruncOff, xlrec.endTruncOff);
|
PerformOffsetsTruncation(xlrec.startTruncOff, xlrec.endTruncOff);
|
||||||
|
|
||||||
LWLockRelease(MultiXactTruncationLock);
|
LWLockRelease(MultiXactTruncationLock);
|
||||||
|
@ -17,7 +17,8 @@
|
|||||||
* per-buffer LWLocks that synchronize I/O for each buffer. The control lock
|
* per-buffer LWLocks that synchronize I/O for each buffer. The control lock
|
||||||
* must be held to examine or modify any shared state. A process that is
|
* must be held to examine or modify any shared state. A process that is
|
||||||
* reading in or writing out a page buffer does not hold the control lock,
|
* reading in or writing out a page buffer does not hold the control lock,
|
||||||
* only the per-buffer lock for the buffer it is working on.
|
* only the per-buffer lock for the buffer it is working on. One exception
|
||||||
|
* is latest_page_number, which is read and written using atomic ops.
|
||||||
*
|
*
|
||||||
* "Holding the control lock" means exclusive lock in all cases except for
|
* "Holding the control lock" means exclusive lock in all cases except for
|
||||||
* SimpleLruReadPage_ReadOnly(); see comments for SlruRecentlyUsed() for
|
* SimpleLruReadPage_ReadOnly(); see comments for SlruRecentlyUsed() for
|
||||||
@ -239,8 +240,7 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
|
|||||||
shared->lsn_groups_per_page = nlsns;
|
shared->lsn_groups_per_page = nlsns;
|
||||||
|
|
||||||
shared->cur_lru_count = 0;
|
shared->cur_lru_count = 0;
|
||||||
|
pg_atomic_init_u64(&shared->latest_page_number, 0);
|
||||||
/* shared->latest_page_number will be set later */
|
|
||||||
|
|
||||||
shared->slru_stats_idx = pgstat_get_slru_index(name);
|
shared->slru_stats_idx = pgstat_get_slru_index(name);
|
||||||
|
|
||||||
@ -329,8 +329,15 @@ SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
|
|||||||
/* Set the LSNs for this new page to zero */
|
/* Set the LSNs for this new page to zero */
|
||||||
SimpleLruZeroLSNs(ctl, slotno);
|
SimpleLruZeroLSNs(ctl, slotno);
|
||||||
|
|
||||||
/* Assume this page is now the latest active page */
|
/*
|
||||||
shared->latest_page_number = pageno;
|
* Assume this page is now the latest active page.
|
||||||
|
*
|
||||||
|
* Note that because both this routine and SlruSelectLRUPage run with
|
||||||
|
* ControlLock held, it is not possible for this to be zeroing a page that
|
||||||
|
* SlruSelectLRUPage is going to evict simultaneously. Therefore, there's
|
||||||
|
* no memory barrier here.
|
||||||
|
*/
|
||||||
|
pg_atomic_write_u64(&shared->latest_page_number, pageno);
|
||||||
|
|
||||||
/* update the stats counter of zeroed pages */
|
/* update the stats counter of zeroed pages */
|
||||||
pgstat_count_slru_page_zeroed(shared->slru_stats_idx);
|
pgstat_count_slru_page_zeroed(shared->slru_stats_idx);
|
||||||
@ -1113,9 +1120,17 @@ SlruSelectLRUPage(SlruCtl ctl, int64 pageno)
|
|||||||
shared->page_lru_count[slotno] = cur_count;
|
shared->page_lru_count[slotno] = cur_count;
|
||||||
this_delta = 0;
|
this_delta = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If this page is the one most recently zeroed, don't consider it
|
||||||
|
* an eviction candidate. See comments in SimpleLruZeroPage for an
|
||||||
|
* explanation about the lack of a memory barrier here.
|
||||||
|
*/
|
||||||
this_page_number = shared->page_number[slotno];
|
this_page_number = shared->page_number[slotno];
|
||||||
if (this_page_number == shared->latest_page_number)
|
if (this_page_number ==
|
||||||
|
pg_atomic_read_u64(&shared->latest_page_number))
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
if (shared->page_status[slotno] == SLRU_PAGE_VALID)
|
if (shared->page_status[slotno] == SLRU_PAGE_VALID)
|
||||||
{
|
{
|
||||||
if (this_delta > best_valid_delta ||
|
if (this_delta > best_valid_delta ||
|
||||||
@ -1254,7 +1269,6 @@ void
|
|||||||
SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
|
SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
|
||||||
{
|
{
|
||||||
SlruShared shared = ctl->shared;
|
SlruShared shared = ctl->shared;
|
||||||
int slotno;
|
|
||||||
|
|
||||||
/* update the stats counter of truncates */
|
/* update the stats counter of truncates */
|
||||||
pgstat_count_slru_truncate(shared->slru_stats_idx);
|
pgstat_count_slru_truncate(shared->slru_stats_idx);
|
||||||
@ -1270,10 +1284,13 @@ SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
|
|||||||
restart:
|
restart:
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* While we are holding the lock, make an important safety check: the
|
* An important safety check: the current endpoint page must not be
|
||||||
* current endpoint page must not be eligible for removal.
|
* eligible for removal. This check is just a backstop against wraparound
|
||||||
|
* bugs elsewhere in SLRU handling, so we don't care if we read a slightly
|
||||||
|
* outdated value; therefore we don't add a memory barrier.
|
||||||
*/
|
*/
|
||||||
if (ctl->PagePrecedes(shared->latest_page_number, cutoffPage))
|
if (ctl->PagePrecedes(pg_atomic_read_u64(&shared->latest_page_number),
|
||||||
|
cutoffPage))
|
||||||
{
|
{
|
||||||
LWLockRelease(shared->ControlLock);
|
LWLockRelease(shared->ControlLock);
|
||||||
ereport(LOG,
|
ereport(LOG,
|
||||||
@ -1282,7 +1299,7 @@ restart:
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (slotno = 0; slotno < shared->num_slots; slotno++)
|
for (int slotno = 0; slotno < shared->num_slots; slotno++)
|
||||||
{
|
{
|
||||||
if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
|
if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
|
||||||
continue;
|
continue;
|
||||||
|
@ -256,7 +256,7 @@ spcachekey_hash(SearchPathCacheKey key)
|
|||||||
fasthash_state hs;
|
fasthash_state hs;
|
||||||
int sp_len;
|
int sp_len;
|
||||||
|
|
||||||
fasthash_init(&hs, FH_UNKNOWN_LENGTH, 0);
|
fasthash_init(&hs, 0);
|
||||||
|
|
||||||
hs.accum = key.roleid;
|
hs.accum = key.roleid;
|
||||||
fasthash_combine(&hs);
|
fasthash_combine(&hs);
|
||||||
|
@ -54,14 +54,6 @@ typedef enum CopyDest
|
|||||||
COPY_CALLBACK, /* to callback function */
|
COPY_CALLBACK, /* to callback function */
|
||||||
} CopyDest;
|
} CopyDest;
|
||||||
|
|
||||||
/*
|
|
||||||
* Per-format callback to send output representation of one attribute for
|
|
||||||
* a `string`. `use_quote` tracks if quotes are required in the output
|
|
||||||
* representation.
|
|
||||||
*/
|
|
||||||
typedef void (*CopyAttributeOut) (CopyToState cstate, const char *string,
|
|
||||||
bool use_quote);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This struct contains all the state variables used throughout a COPY TO
|
* This struct contains all the state variables used throughout a COPY TO
|
||||||
* operation.
|
* operation.
|
||||||
@ -105,7 +97,6 @@ typedef struct CopyToStateData
|
|||||||
MemoryContext copycontext; /* per-copy execution context */
|
MemoryContext copycontext; /* per-copy execution context */
|
||||||
|
|
||||||
FmgrInfo *out_functions; /* lookup info for output functions */
|
FmgrInfo *out_functions; /* lookup info for output functions */
|
||||||
CopyAttributeOut copy_attribute_out; /* output representation callback */
|
|
||||||
MemoryContext rowcontext; /* per-row evaluation context */
|
MemoryContext rowcontext; /* per-row evaluation context */
|
||||||
uint64 bytes_processed; /* number of bytes processed so far */
|
uint64 bytes_processed; /* number of bytes processed so far */
|
||||||
} CopyToStateData;
|
} CopyToStateData;
|
||||||
@ -126,10 +117,7 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
|
|||||||
static void EndCopy(CopyToState cstate);
|
static void EndCopy(CopyToState cstate);
|
||||||
static void ClosePipeToProgram(CopyToState cstate);
|
static void ClosePipeToProgram(CopyToState cstate);
|
||||||
static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot);
|
static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot);
|
||||||
|
static void CopyAttributeOutText(CopyToState cstate, const char *string);
|
||||||
/* Callbacks for copy_attribute_out */
|
|
||||||
static void CopyAttributeOutText(CopyToState cstate, const char *string,
|
|
||||||
bool use_quote);
|
|
||||||
static void CopyAttributeOutCSV(CopyToState cstate, const char *string,
|
static void CopyAttributeOutCSV(CopyToState cstate, const char *string,
|
||||||
bool use_quote);
|
bool use_quote);
|
||||||
|
|
||||||
@ -445,15 +433,6 @@ BeginCopyTo(ParseState *pstate,
|
|||||||
/* Extract options from the statement node tree */
|
/* Extract options from the statement node tree */
|
||||||
ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
|
ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
|
||||||
|
|
||||||
/* Set output representation callback */
|
|
||||||
if (!cstate->opts.binary)
|
|
||||||
{
|
|
||||||
if (cstate->opts.csv_mode)
|
|
||||||
cstate->copy_attribute_out = CopyAttributeOutCSV;
|
|
||||||
else
|
|
||||||
cstate->copy_attribute_out = CopyAttributeOutText;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Process the source/target relation or query */
|
/* Process the source/target relation or query */
|
||||||
if (rel)
|
if (rel)
|
||||||
{
|
{
|
||||||
@ -857,8 +836,10 @@ DoCopyTo(CopyToState cstate)
|
|||||||
|
|
||||||
colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
|
colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
|
||||||
|
|
||||||
/* Ignore quotes */
|
if (cstate->opts.csv_mode)
|
||||||
cstate->copy_attribute_out(cstate, colname, false);
|
CopyAttributeOutCSV(cstate, colname, false);
|
||||||
|
else
|
||||||
|
CopyAttributeOutText(cstate, colname);
|
||||||
}
|
}
|
||||||
|
|
||||||
CopySendEndOfRow(cstate);
|
CopySendEndOfRow(cstate);
|
||||||
@ -968,9 +949,11 @@ CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
|
|||||||
{
|
{
|
||||||
string = OutputFunctionCall(&out_functions[attnum - 1],
|
string = OutputFunctionCall(&out_functions[attnum - 1],
|
||||||
value);
|
value);
|
||||||
|
if (cstate->opts.csv_mode)
|
||||||
cstate->copy_attribute_out(cstate, string,
|
CopyAttributeOutCSV(cstate, string,
|
||||||
cstate->opts.force_quote_flags[attnum - 1]);
|
cstate->opts.force_quote_flags[attnum - 1]);
|
||||||
|
else
|
||||||
|
CopyAttributeOutText(cstate, string);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -1000,8 +983,7 @@ CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
|
|||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
static void
|
static void
|
||||||
CopyAttributeOutText(CopyToState cstate, const char *string,
|
CopyAttributeOutText(CopyToState cstate, const char *string)
|
||||||
bool use_quote)
|
|
||||||
{
|
{
|
||||||
const char *ptr;
|
const char *ptr;
|
||||||
const char *start;
|
const char *start;
|
||||||
|
@ -73,7 +73,6 @@
|
|||||||
#define SUBOPT_LSN 0x00004000
|
#define SUBOPT_LSN 0x00004000
|
||||||
#define SUBOPT_ORIGIN 0x00008000
|
#define SUBOPT_ORIGIN 0x00008000
|
||||||
|
|
||||||
|
|
||||||
/* check if the 'val' has 'bits' set */
|
/* check if the 'val' has 'bits' set */
|
||||||
#define IsSet(val, bits) (((val) & (bits)) == (bits))
|
#define IsSet(val, bits) (((val) & (bits)) == (bits))
|
||||||
|
|
||||||
@ -852,9 +851,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
|
|||||||
(opts.failover || walrcv_server_version(wrconn) >= 170000))
|
(opts.failover || walrcv_server_version(wrconn) >= 170000))
|
||||||
{
|
{
|
||||||
walrcv_alter_slot(wrconn, opts.slot_name, opts.failover);
|
walrcv_alter_slot(wrconn, opts.slot_name, opts.failover);
|
||||||
ereport(NOTICE,
|
|
||||||
(errmsg("changed the failover state of replication slot \"%s\" on publisher to %s",
|
|
||||||
opts.slot_name, opts.failover ? "true" : "false")));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
PG_FINALLY();
|
PG_FINALLY();
|
||||||
@ -1547,10 +1543,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
|
|||||||
PG_TRY();
|
PG_TRY();
|
||||||
{
|
{
|
||||||
walrcv_alter_slot(wrconn, sub->slotname, opts.failover);
|
walrcv_alter_slot(wrconn, sub->slotname, opts.failover);
|
||||||
|
|
||||||
ereport(NOTICE,
|
|
||||||
(errmsg("changed the failover state of replication slot \"%s\" on publisher to %s",
|
|
||||||
sub->slotname, opts.failover ? "true" : "false")));
|
|
||||||
}
|
}
|
||||||
PG_FINALLY();
|
PG_FINALLY();
|
||||||
{
|
{
|
||||||
|
@ -696,12 +696,16 @@ ReplicationSlotAlter(const char *name, bool failover)
|
|||||||
errmsg("cannot use %s with a physical replication slot",
|
errmsg("cannot use %s with a physical replication slot",
|
||||||
"ALTER_REPLICATION_SLOT"));
|
"ALTER_REPLICATION_SLOT"));
|
||||||
|
|
||||||
SpinLockAcquire(&MyReplicationSlot->mutex);
|
if (MyReplicationSlot->data.failover != failover)
|
||||||
MyReplicationSlot->data.failover = failover;
|
{
|
||||||
SpinLockRelease(&MyReplicationSlot->mutex);
|
SpinLockAcquire(&MyReplicationSlot->mutex);
|
||||||
|
MyReplicationSlot->data.failover = failover;
|
||||||
|
SpinLockRelease(&MyReplicationSlot->mutex);
|
||||||
|
|
||||||
|
ReplicationSlotMarkDirty();
|
||||||
|
ReplicationSlotSave();
|
||||||
|
}
|
||||||
|
|
||||||
ReplicationSlotMarkDirty();
|
|
||||||
ReplicationSlotSave();
|
|
||||||
ReplicationSlotRelease();
|
ReplicationSlotRelease();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,6 +49,9 @@ typedef enum
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
* Shared-memory state
|
* Shared-memory state
|
||||||
|
*
|
||||||
|
* ControlLock is used to protect access to the other fields, except
|
||||||
|
* latest_page_number, which uses atomics; see comment in slru.c.
|
||||||
*/
|
*/
|
||||||
typedef struct SlruSharedData
|
typedef struct SlruSharedData
|
||||||
{
|
{
|
||||||
@ -95,7 +98,7 @@ typedef struct SlruSharedData
|
|||||||
* this is not critical data, since we use it only to avoid swapping out
|
* this is not critical data, since we use it only to avoid swapping out
|
||||||
* the latest page.
|
* the latest page.
|
||||||
*/
|
*/
|
||||||
int64 latest_page_number;
|
pg_atomic_uint64 latest_page_number;
|
||||||
|
|
||||||
/* SLRU's index for statistics purposes (might not be unique) */
|
/* SLRU's index for statistics purposes (might not be unique) */
|
||||||
int slru_stats_idx;
|
int slru_stats_idx;
|
||||||
|
@ -53,27 +53,40 @@
|
|||||||
* fasthash as implemented here has two interfaces:
|
* fasthash as implemented here has two interfaces:
|
||||||
*
|
*
|
||||||
* 1) Standalone functions, e.g. fasthash32() for a single value with a
|
* 1) Standalone functions, e.g. fasthash32() for a single value with a
|
||||||
* known length.
|
* known length. These return the same hash code as the original, at
|
||||||
|
* least on little-endian machines.
|
||||||
*
|
*
|
||||||
* 2) Incremental interface. This can used for incorporating multiple
|
* 2) Incremental interface. This can used for incorporating multiple
|
||||||
* inputs. The standalone functions use this internally, so see fasthash64()
|
* inputs. First, initialize the hash state (here with a zero seed):
|
||||||
* for an an example of how this works.
|
|
||||||
*
|
|
||||||
* The incremental interface is especially useful if any of the inputs
|
|
||||||
* are NUL-terminated C strings, since the length is not needed ahead
|
|
||||||
* of time. This avoids needing to call strlen(). This case is optimized
|
|
||||||
* in fasthash_accum_cstring() :
|
|
||||||
*
|
*
|
||||||
* fasthash_state hs;
|
* fasthash_state hs;
|
||||||
* fasthash_init(&hs, FH_UNKNOWN_LENGTH, 0);
|
* fasthash_init(&hs, 0);
|
||||||
* len = fasthash_accum_cstring(&hs, *str);
|
|
||||||
* ...
|
|
||||||
* return fasthash_final32(&hs, len);
|
|
||||||
*
|
*
|
||||||
* Here we pass FH_UNKNOWN_LENGTH as a convention, since passing zero
|
* If the inputs are of types that can be trivially cast to uint64, it's
|
||||||
* would zero out the internal seed as well. fasthash_accum_cstring()
|
* sufficient to do:
|
||||||
* returns the length of the string, which is computed on-the-fly while
|
*
|
||||||
* mixing the string into the hash. Experimentation has found that
|
* hs.accum = value1;
|
||||||
|
* fasthash_combine(&hs);
|
||||||
|
* hs.accum = value2;
|
||||||
|
* fasthash_combine(&hs);
|
||||||
|
* ...
|
||||||
|
*
|
||||||
|
* For longer or variable-length input, fasthash_accum() is a more
|
||||||
|
* flexible, but more verbose method. The standalone functions use this
|
||||||
|
* internally, so see fasthash64() for an an example of this.
|
||||||
|
*
|
||||||
|
* After all inputs have been mixed in, finalize the hash:
|
||||||
|
*
|
||||||
|
* hashcode = fasthash_final32(&hs, 0);
|
||||||
|
*
|
||||||
|
* The incremental interface allows an optimization for NUL-terminated
|
||||||
|
* C strings:
|
||||||
|
*
|
||||||
|
* len = fasthash_accum_cstring(&hs, str);
|
||||||
|
* hashcode = fasthash_final32(&hs, len);
|
||||||
|
*
|
||||||
|
* By handling the terminator on-the-fly, we can avoid needing a strlen()
|
||||||
|
* call to tell us how many bytes to hash. Experimentation has found that
|
||||||
* SMHasher fails unless we incorporate the length, so it is passed to
|
* SMHasher fails unless we incorporate the length, so it is passed to
|
||||||
* the finalizer as a tweak.
|
* the finalizer as a tweak.
|
||||||
*/
|
*/
|
||||||
@ -89,20 +102,17 @@ typedef struct fasthash_state
|
|||||||
|
|
||||||
#define FH_SIZEOF_ACCUM sizeof(uint64)
|
#define FH_SIZEOF_ACCUM sizeof(uint64)
|
||||||
|
|
||||||
#define FH_UNKNOWN_LENGTH 1
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Initialize the hash state.
|
* Initialize the hash state.
|
||||||
*
|
*
|
||||||
* 'len' is the length of the input, if known ahead of time.
|
|
||||||
* If that is not known, pass FH_UNKNOWN_LENGTH.
|
|
||||||
* 'seed' can be zero.
|
* 'seed' can be zero.
|
||||||
*/
|
*/
|
||||||
static inline void
|
static inline void
|
||||||
fasthash_init(fasthash_state *hs, int len, uint64 seed)
|
fasthash_init(fasthash_state *hs, uint64 seed)
|
||||||
{
|
{
|
||||||
memset(hs, 0, sizeof(fasthash_state));
|
memset(hs, 0, sizeof(fasthash_state));
|
||||||
hs->hash = seed ^ (len * 0x880355f21e6d1965);
|
hs->hash = seed ^ 0x880355f21e6d1965;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* both the finalizer and part of the combining step */
|
/* both the finalizer and part of the combining step */
|
||||||
@ -210,26 +220,33 @@ fasthash_accum_cstring_aligned(fasthash_state *hs, const char *str)
|
|||||||
{
|
{
|
||||||
const char *const start = str;
|
const char *const start = str;
|
||||||
int remainder;
|
int remainder;
|
||||||
uint64 zero_bytes_le;
|
uint64 zero_byte_low;
|
||||||
|
|
||||||
Assert(PointerIsAligned(start, uint64));
|
Assert(PointerIsAligned(start, uint64));
|
||||||
|
|
||||||
|
/*
|
||||||
|
* For every chunk of input, check for zero bytes before mixing into the
|
||||||
|
* hash. The chunk with zeros must contain the NUL terminator. We arrange
|
||||||
|
* so that zero_byte_low tells us not only that a zero exists, but also
|
||||||
|
* where it is, so we can hash the remainder of the string.
|
||||||
|
*
|
||||||
|
* The haszero64 calculation will set bits corresponding to the lowest
|
||||||
|
* byte where a zero exists, so that suffices for little-endian machines.
|
||||||
|
* For big-endian machines, we would need bits set for the highest zero
|
||||||
|
* byte in the chunk, since the trailing junk past the terminator could
|
||||||
|
* contain additional zeros. haszero64 does not give us that, so we
|
||||||
|
* byteswap the chunk first.
|
||||||
|
*/
|
||||||
for (;;)
|
for (;;)
|
||||||
{
|
{
|
||||||
uint64 chunk = *(uint64 *) str;
|
uint64 chunk = *(uint64 *) str;
|
||||||
|
|
||||||
/*
|
|
||||||
* With little-endian representation, we can use this calculation,
|
|
||||||
* which sets bits in the first byte in the result word that
|
|
||||||
* corresponds to a zero byte in the original word. The rest of the
|
|
||||||
* bytes are indeterminate, so cannot be used on big-endian machines
|
|
||||||
* without either swapping or a bytewise check.
|
|
||||||
*/
|
|
||||||
#ifdef WORDS_BIGENDIAN
|
#ifdef WORDS_BIGENDIAN
|
||||||
zero_bytes_le = haszero64(pg_bswap64(chunk));
|
zero_byte_low = haszero64(pg_bswap64(chunk));
|
||||||
#else
|
#else
|
||||||
zero_bytes_le = haszero64(chunk);
|
zero_byte_low = haszero64(chunk);
|
||||||
#endif
|
#endif
|
||||||
if (zero_bytes_le)
|
if (zero_byte_low)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
hs->accum = chunk;
|
hs->accum = chunk;
|
||||||
@ -238,12 +255,11 @@ fasthash_accum_cstring_aligned(fasthash_state *hs, const char *str)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* For the last word, only use bytes up to the NUL for the hash. Bytes
|
* The byte corresponding to the NUL will be 0x80, so the rightmost bit
|
||||||
* with set bits will be 0x80, so calculate the first occurrence of a zero
|
* position will be in the range 7, 15, ..., 63. Turn this into byte
|
||||||
* byte within the input word by counting the number of trailing (because
|
* position by dividing by 8.
|
||||||
* little-endian) zeros and dividing the result by 8.
|
|
||||||
*/
|
*/
|
||||||
remainder = pg_rightmost_one_pos64(zero_bytes_le) / BITS_PER_BYTE;
|
remainder = pg_rightmost_one_pos64(zero_byte_low) / BITS_PER_BYTE;
|
||||||
fasthash_accum(hs, str, remainder);
|
fasthash_accum(hs, str, remainder);
|
||||||
str += remainder;
|
str += remainder;
|
||||||
|
|
||||||
@ -328,7 +344,10 @@ fasthash64(const char *k, int len, uint64 seed)
|
|||||||
{
|
{
|
||||||
fasthash_state hs;
|
fasthash_state hs;
|
||||||
|
|
||||||
fasthash_init(&hs, len, seed);
|
fasthash_init(&hs, 0);
|
||||||
|
|
||||||
|
/* re-initialize the seed according to input length */
|
||||||
|
hs.hash = seed ^ (len * 0x880355f21e6d1965);
|
||||||
|
|
||||||
while (len >= FH_SIZEOF_ACCUM)
|
while (len >= FH_SIZEOF_ACCUM)
|
||||||
{
|
{
|
||||||
|
Loading…
x
Reference in New Issue
Block a user