Compare commits

...

7 Commits

Author SHA1 Message Date
Amit Kapila
22f7e61a63 Clean-ups for 776621a5e4 and 7329240437.
Following are a few clean-ups related to failover option support in slots:
1. Improve the documentation in create_subscription.sgml.
2. Remove the spurious blank line in subscriptioncmds.c.
3. Remove the NOTICE for alter_replication_slot in subscriptioncmds.c as
we would sometimes print it even when nothing has changed. One can find
the change by enabling log_replication_commands on the publisher.
4. Optimize ReplicationSlotAlter() function to prevent disk flushing when
the slot's data remains unchanged.

Author: Hou Zhijie
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
Discussion: https://postgr.es/m/OS0PR01MB57164904651FB588A518E98894472@OS0PR01MB5716.jpnprd01.prod.outlook.com
2024-02-07 10:04:04 +05:30
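
Item 4 is the classic "skip the persist when nothing changed" pattern; the slot.c hunk further down implements it. A minimal sketch of the shape, with hypothetical names rather than the actual PostgreSQL code:

#include <stdbool.h>

typedef struct Slot
{
    bool        failover;       /* persistent slot property */
} Slot;

/* stand-in for ReplicationSlotMarkDirty() + ReplicationSlotSave() */
static void
slot_mark_dirty_and_save(Slot *slot)
{
    /* fsync of the slot's on-disk state elided in this sketch */
    (void) slot;
}

static void
slot_alter(Slot *slot, bool failover)
{
    /* Dirty and flush the slot only if the value actually changes. */
    if (slot->failover != failover)
    {
        slot->failover = failover;
        slot_mark_dirty_and_save(slot); /* pay the fsync only when needed */
    }
}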
Michael Paquier
b9d6038d70 Simplify signature of CopyAttributeOutCSV() in copyto.c
This came up in 2889fd23be56, which was later reverted; the change is
still useful on its own to slightly reduce the differences between the
code paths dedicated to CSV and text.

Discussion: https://postgr.es/m/ZcCKwAeFrlOqPBuN@paquier.xyz
2024-02-07 12:28:55 +09:00
Michael Paquier
1aa8324b81 Revert "Refactor CopyAttributeOut{CSV,Text}() to use a callback in COPY TO"
This reverts commit 2889fd23be56, following a discussion with Andres
Freund: this callback, being called once per attribute when sending a
relation's row, can involve a lot of indirect function calls (more
attributes to deal with means more impact).  A dispatch at this level
would become more visible when improving the per-row code execution of
COPY TO, and would get in the way of potential future performance
improvements there.

Discussion: https://postgr.es/m/20240206014125.qofww7ew3dx3v3uk@awork3.anarazel.de
2024-02-07 08:04:26 +09:00
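
The cost the revert avoids is easy to see in miniature. A hypothetical sketch (illustrative names, not the PostgreSQL code) contrasting per-attribute indirect dispatch with a direct branch on the format flag:

#include <stdbool.h>
#include <stdio.h>

static void out_text(const char *s) { printf("%s", s); }
static void out_csv(const char *s)  { printf("\"%s\"", s); }

typedef void (*attr_out_fn) (const char *s);

/* One indirect call per attribute per row: hard to predict and inline. */
static void
emit_row_callback(attr_out_fn out, const char **attrs, int natts)
{
    for (int i = 0; i < natts; i++)
        out(attrs[i]);
}

/* The reverted-to style: a cheap, predictable branch instead. */
static void
emit_row_branch(bool csv_mode, const char **attrs, int natts)
{
    for (int i = 0; i < natts; i++)
    {
        if (csv_mode)
            out_csv(attrs[i]);
        else
            out_text(attrs[i]);
    }
}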
Alvaro Herrera
e4b27b5355 Change initial use of pg_atomic_write_u64 to init
This only matters when using atomics emulation with semaphores.

Per buildfarm member rorqual.
2024-02-06 12:08:39 +01:00
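
Why this only matters under emulation: with the fallback atomics implementation, each atomic variable carries internal lock state that only the init call sets up, so the very first store must be pg_atomic_init_u64(), not pg_atomic_write_u64(). A sketch of the intended call order, assuming PostgreSQL's pg_atomic API from src/include/port/atomics.h:

#include "port/atomics.h"

typedef struct DemoShared
{
    pg_atomic_uint64 latest_page_number;
} DemoShared;

static void
demo_shmem_init(DemoShared *shared)
{
    /* First touch must be the init; under semaphore-based emulation it
     * also sets up the variable's internal lock state. */
    pg_atomic_init_u64(&shared->latest_page_number, 0);
}

static void
demo_update(DemoShared *shared, uint64 pageno)
{
    /* Later stores may use write, even concurrently with readers. */
    pg_atomic_write_u64(&shared->latest_page_number, pageno);
}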
Alvaro Herrera
d172b717c6 Use atomic access for SlruShared->latest_page_number
The new concurrency model proposed for slru.c to improve performance
does not include any single lock that would coordinate processes
doing concurrent reads/writes on SlruShared->latest_page_number.
We can instead use atomic reads and writes for that variable.

Author: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Andrey M. Borodin <x4mmm@yandex-team.ru>
Discussion: https://postgr.es/m/CAFiTN-vzDvNz=ExGXz6gdyjtzGixKSqs0mKHMmaQ8sOSEFZ33A@mail.gmail.com
2024-02-06 10:54:10 +01:00
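
The pattern in portable C11 terms, with stdatomic.h standing in for pg_atomic_{read,write}_u64 (a sketch, not the PostgreSQL code): readers and writers touch the variable with relaxed atomic operations instead of serializing on a lock.

#include <stdatomic.h>
#include <stdint.h>

static _Atomic uint64_t latest_page_number;

static void
set_latest_page(uint64_t pageno)
{
    /* No lock and, as in the slru.c hunks below, no memory barrier:
     * the value is advisory (eviction hint and wraparound backstop). */
    atomic_store_explicit(&latest_page_number, pageno, memory_order_relaxed);
}

static uint64_t
read_latest_page(void)
{
    return atomic_load_explicit(&latest_page_number, memory_order_relaxed);
}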
John Naylor
b83033c3cf Further cosmetic review of hashfn_unstable.h
In follow-up to e97b672c8,
* Flesh out comments explaining the incremental interface
* Clarify detection of zero bytes when hashing aligned C strings

The latter was suggested and reviewed by Jeff Davis

Discussion: https://postgr.es/m/48e8f8bbe0be9c789f98776c7438244ab7a7cc63.camel%40j-davis.com
2024-02-06 14:49:06 +07:00
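
The zero-byte detection that the clarified comments describe is the classic SWAR trick; the haszero64 macro in hashfn_unstable.h has this shape (a paraphrase, not the exact macro text):

#include <stdint.h>

/*
 * Nonzero iff some byte of v is zero.  The lowest-order zero byte reliably
 * gets its high bit set in the result; higher bytes can be indeterminate,
 * which is why the code byteswaps the chunk first on big-endian machines.
 */
static inline uint64_t
haszero64(uint64_t v)
{
    return (v - UINT64_C(0x0101010101010101)) &
        ~v & UINT64_C(0x8080808080808080);
}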
John Naylor
9ed3ee5001 Simplify initialization of incremental hash state
The standalone functions fasthash{32,64} use length for two purposes:
how many bytes to hash, and how to perturb the internal seed.

Developers using the incremental interface may not know the length
ahead of time (e.g. for C strings). In this case, it's advised to
pass length to the finalizer, but initialization still needed some
length up front, in the form of a placeholder macro.

Separate the concerns by having the standalone functions perturb the
internal seed themselves, using their own length parameter; this allows
"len" to be removed from fasthash_init(), along with the placeholder
macro.

Discussion: https://postgr.es/m/CANWCAZbTUk2LOyhsFo33gjLyLAHZ7ucXCi5K9u%3D%2BPtnTShDKtw%40mail.gmail.com
2024-02-06 14:39:36 +07:00
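
With that change, hashing a C string through the incremental interface reads as below (a sketch using the declarations from hashfn_unstable.h, mirroring the spcachekey_hash call site in the diffs that follow):

#include "common/hashfn_unstable.h"

static uint32
hash_cstring(const char *str)
{
    fasthash_state hs;
    int         len;

    fasthash_init(&hs, 0);      /* seed only; no length needed up front */
    len = fasthash_accum_cstring(&hs, str);     /* computed while hashing */
    return fasthash_final32(&hs, len);  /* length passed as finalizer tweak */
}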
12 changed files with 134 additions and 119 deletions

View File

@@ -117,9 +117,8 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          command should connect to the publisher at all. The default
          is <literal>true</literal>. Setting this to
          <literal>false</literal> will force the values of
-         <literal>create_slot</literal>, <literal>enabled</literal>,
-         <literal>copy_data</literal>, and <literal>failover</literal>
-         to <literal>false</literal>.
+         <literal>create_slot</literal>, <literal>enabled</literal> and
+         <literal>copy_data</literal> to <literal>false</literal>.
          (You cannot combine setting <literal>connect</literal>
          to <literal>false</literal> with
          setting <literal>create_slot</literal>, <literal>enabled</literal>,

View File

@@ -1591,9 +1591,7 @@ CREATE DATABASE foo WITH TEMPLATE template0;
     information might have to be changed. If the subscription needs to
     be enabled for
     <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>,
-    then same needs to be done by executing
-    <link linkend="sql-altersubscription-params-set">
-    <literal>ALTER SUBSCRIPTION ... SET (failover = true)</literal></link>
+    execute <link linkend="sql-altersubscription-params-set"><literal>ALTER SUBSCRIPTION ... SET (failover = true)</literal></link>
     after the slot has been created. It might also be appropriate to
     truncate the target tables before initiating a new full table copy. If users
     intend to copy initial data during refresh they must create the slot with

View File

@@ -766,14 +766,10 @@ StartupCLOG(void)
 	TransactionId xid = XidFromFullTransactionId(TransamVariables->nextXid);
 	int64		pageno = TransactionIdToPage(xid);
 
-	LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
-
 	/*
 	 * Initialize our idea of the latest page number.
 	 */
-	XactCtl->shared->latest_page_number = pageno;
-	LWLockRelease(XactSLRULock);
+	pg_atomic_write_u64(&XactCtl->shared->latest_page_number, pageno);
 }
 
 /*

View File

@@ -689,9 +689,7 @@ ActivateCommitTs(void)
 	/*
 	 * Re-Initialize our idea of the latest page number.
 	 */
-	LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
-	CommitTsCtl->shared->latest_page_number = pageno;
-	LWLockRelease(CommitTsSLRULock);
+	pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number, pageno);
 
 	/*
 	 * If CommitTs is enabled, but it wasn't in the previous server run, we
@@ -1006,7 +1004,8 @@ commit_ts_redo(XLogReaderState *record)
 		 * During XLOG replay, latest_page_number isn't set up yet; insert a
 		 * suitable value to bypass the sanity test in SimpleLruTruncate.
 		 */
-		CommitTsCtl->shared->latest_page_number = trunc->pageno;
+		pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number,
+							trunc->pageno);
 
 		SimpleLruTruncate(CommitTsCtl, trunc->pageno);
 	}

View File

@@ -2017,13 +2017,15 @@ StartupMultiXact(void)
 	 * Initialize offset's idea of the latest page number.
 	 */
 	pageno = MultiXactIdToOffsetPage(multi);
-	MultiXactOffsetCtl->shared->latest_page_number = pageno;
+	pg_atomic_write_u64(&MultiXactOffsetCtl->shared->latest_page_number,
+						pageno);
 
 	/*
 	 * Initialize member's idea of the latest page number.
 	 */
 	pageno = MXOffsetToMemberPage(offset);
-	MultiXactMemberCtl->shared->latest_page_number = pageno;
+	pg_atomic_write_u64(&MultiXactMemberCtl->shared->latest_page_number,
+						pageno);
 }
 
 /*
@@ -2047,14 +2049,15 @@ TrimMultiXact(void)
 	oldestMXactDB = MultiXactState->oldestMultiXactDB;
 	LWLockRelease(MultiXactGenLock);
 
-	/* Clean up offsets state */
-	LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
-
 	/*
 	 * (Re-)Initialize our idea of the latest page number for offsets.
 	 */
 	pageno = MultiXactIdToOffsetPage(nextMXact);
-	MultiXactOffsetCtl->shared->latest_page_number = pageno;
+	pg_atomic_write_u64(&MultiXactOffsetCtl->shared->latest_page_number,
+						pageno);
+
+	/* Clean up offsets state */
+	LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
 
 	/*
 	 * Zero out the remainder of the current offsets page. See notes in
@@ -2081,14 +2084,16 @@ TrimMultiXact(void)
 	LWLockRelease(MultiXactOffsetSLRULock);
 
-	/* And the same for members */
-	LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
-
 	/*
+	 * And the same for members.
+	 *
 	 * (Re-)Initialize our idea of the latest page number for members.
 	 */
 	pageno = MXOffsetToMemberPage(offset);
-	MultiXactMemberCtl->shared->latest_page_number = pageno;
+	pg_atomic_write_u64(&MultiXactMemberCtl->shared->latest_page_number,
+						pageno);
+
+	LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
 
 	/*
 	 * Zero out the remainder of the current members page. See notes in
@@ -3333,7 +3338,8 @@ multixact_redo(XLogReaderState *record)
 		 * SimpleLruTruncate.
 		 */
 		pageno = MultiXactIdToOffsetPage(xlrec.endTruncOff);
-		MultiXactOffsetCtl->shared->latest_page_number = pageno;
+		pg_atomic_write_u64(&MultiXactOffsetCtl->shared->latest_page_number,
+							pageno);
 
 		PerformOffsetsTruncation(xlrec.startTruncOff, xlrec.endTruncOff);
 
 		LWLockRelease(MultiXactTruncationLock);

View File

@@ -17,7 +17,8 @@
  * per-buffer LWLocks that synchronize I/O for each buffer. The control lock
  * must be held to examine or modify any shared state. A process that is
  * reading in or writing out a page buffer does not hold the control lock,
- * only the per-buffer lock for the buffer it is working on.
+ * only the per-buffer lock for the buffer it is working on. One exception
+ * is latest_page_number, which is read and written using atomic ops.
  *
  * "Holding the control lock" means exclusive lock in all cases except for
  * SimpleLruReadPage_ReadOnly(); see comments for SlruRecentlyUsed() for
@@ -239,8 +240,7 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
 		shared->lsn_groups_per_page = nlsns;
 
 		shared->cur_lru_count = 0;
-
-		/* shared->latest_page_number will be set later */
+		pg_atomic_init_u64(&shared->latest_page_number, 0);
 
 		shared->slru_stats_idx = pgstat_get_slru_index(name);
@@ -329,8 +329,15 @@ SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
 	/* Set the LSNs for this new page to zero */
 	SimpleLruZeroLSNs(ctl, slotno);
 
-	/* Assume this page is now the latest active page */
-	shared->latest_page_number = pageno;
+	/*
+	 * Assume this page is now the latest active page.
+	 *
+	 * Note that because both this routine and SlruSelectLRUPage run with
+	 * ControlLock held, it is not possible for this to be zeroing a page that
+	 * SlruSelectLRUPage is going to evict simultaneously. Therefore, there's
+	 * no memory barrier here.
+	 */
+	pg_atomic_write_u64(&shared->latest_page_number, pageno);
 
 	/* update the stats counter of zeroed pages */
 	pgstat_count_slru_page_zeroed(shared->slru_stats_idx);
@@ -1113,9 +1120,17 @@ SlruSelectLRUPage(SlruCtl ctl, int64 pageno)
 				shared->page_lru_count[slotno] = cur_count;
 				this_delta = 0;
 			}
+
+			/*
+			 * If this page is the one most recently zeroed, don't consider it
+			 * an eviction candidate. See comments in SimpleLruZeroPage for an
+			 * explanation about the lack of a memory barrier here.
+			 */
 			this_page_number = shared->page_number[slotno];
-			if (this_page_number == shared->latest_page_number)
+			if (this_page_number ==
+				pg_atomic_read_u64(&shared->latest_page_number))
 				continue;
+
 			if (shared->page_status[slotno] == SLRU_PAGE_VALID)
 			{
 				if (this_delta > best_valid_delta ||
@@ -1254,7 +1269,6 @@ void
 SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
 {
 	SlruShared	shared = ctl->shared;
-	int			slotno;
 
 	/* update the stats counter of truncates */
 	pgstat_count_slru_truncate(shared->slru_stats_idx);
@@ -1270,10 +1284,13 @@ SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
 restart:
 
 	/*
-	 * While we are holding the lock, make an important safety check: the
-	 * current endpoint page must not be eligible for removal.
+	 * An important safety check: the current endpoint page must not be
+	 * eligible for removal. This check is just a backstop against wraparound
+	 * bugs elsewhere in SLRU handling, so we don't care if we read a slightly
+	 * outdated value; therefore we don't add a memory barrier.
 	 */
-	if (ctl->PagePrecedes(shared->latest_page_number, cutoffPage))
+	if (ctl->PagePrecedes(pg_atomic_read_u64(&shared->latest_page_number),
+						  cutoffPage))
 	{
 		LWLockRelease(shared->ControlLock);
 		ereport(LOG,
@@ -1282,7 +1299,7 @@ restart:
 		return;
 	}
 
-	for (slotno = 0; slotno < shared->num_slots; slotno++)
+	for (int slotno = 0; slotno < shared->num_slots; slotno++)
 	{
 		if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
 			continue;

View File

@@ -256,7 +256,7 @@ spcachekey_hash(SearchPathCacheKey key)
 	fasthash_state hs;
 	int			sp_len;
 
-	fasthash_init(&hs, FH_UNKNOWN_LENGTH, 0);
+	fasthash_init(&hs, 0);
 
 	hs.accum = key.roleid;
 	fasthash_combine(&hs);

View File

@@ -54,14 +54,6 @@ typedef enum CopyDest
 	COPY_CALLBACK,				/* to callback function */
 } CopyDest;
 
-/*
- * Per-format callback to send output representation of one attribute for
- * a `string`. `use_quote` tracks if quotes are required in the output
- * representation.
- */
-typedef void (*CopyAttributeOut) (CopyToState cstate, const char *string,
-								  bool use_quote);
-
 /*
  * This struct contains all the state variables used throughout a COPY TO
  * operation.
@@ -105,7 +97,6 @@ typedef struct CopyToStateData
 	MemoryContext copycontext;	/* per-copy execution context */
 
 	FmgrInfo   *out_functions;	/* lookup info for output functions */
-	CopyAttributeOut copy_attribute_out;	/* output representation callback */
 	MemoryContext rowcontext;	/* per-row evaluation context */
 	uint64		bytes_processed;	/* number of bytes processed so far */
 } CopyToStateData;
@@ -126,10 +117,7 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
 static void EndCopy(CopyToState cstate);
 static void ClosePipeToProgram(CopyToState cstate);
 static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot);
-
-/* Callbacks for copy_attribute_out */
-static void CopyAttributeOutText(CopyToState cstate, const char *string,
-								 bool use_quote);
+static void CopyAttributeOutText(CopyToState cstate, const char *string);
 static void CopyAttributeOutCSV(CopyToState cstate, const char *string,
 								bool use_quote);
@@ -445,15 +433,6 @@ BeginCopyTo(ParseState *pstate,
 	/* Extract options from the statement node tree */
 	ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
 
-	/* Set output representation callback */
-	if (!cstate->opts.binary)
-	{
-		if (cstate->opts.csv_mode)
-			cstate->copy_attribute_out = CopyAttributeOutCSV;
-		else
-			cstate->copy_attribute_out = CopyAttributeOutText;
-	}
-
 	/* Process the source/target relation or query */
 	if (rel)
 	{
@@ -857,8 +836,10 @@ DoCopyTo(CopyToState cstate)
 
 			colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
 
-			/* Ignore quotes */
-			cstate->copy_attribute_out(cstate, colname, false);
+			if (cstate->opts.csv_mode)
+				CopyAttributeOutCSV(cstate, colname, false);
+			else
+				CopyAttributeOutText(cstate, colname);
 		}
 
 		CopySendEndOfRow(cstate);
@@ -968,9 +949,11 @@ CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
 			{
 				string = OutputFunctionCall(&out_functions[attnum - 1],
 											value);
-				cstate->copy_attribute_out(cstate, string,
-										   cstate->opts.force_quote_flags[attnum - 1]);
+				if (cstate->opts.csv_mode)
+					CopyAttributeOutCSV(cstate, string,
+										cstate->opts.force_quote_flags[attnum - 1]);
+				else
+					CopyAttributeOutText(cstate, string);
 			}
 			else
 			{
@@ -1000,8 +983,7 @@ CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
 	} while (0)
 
 static void
-CopyAttributeOutText(CopyToState cstate, const char *string,
-					 bool use_quote)
+CopyAttributeOutText(CopyToState cstate, const char *string)
 {
 	const char *ptr;
 	const char *start;

View File

@@ -73,7 +73,6 @@
 #define SUBOPT_LSN					0x00004000
 #define SUBOPT_ORIGIN				0x00008000
 
-
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -852,9 +851,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 				(opts.failover || walrcv_server_version(wrconn) >= 170000))
 			{
 				walrcv_alter_slot(wrconn, opts.slot_name, opts.failover);
-				ereport(NOTICE,
-						(errmsg("changed the failover state of replication slot \"%s\" on publisher to %s",
-								opts.slot_name, opts.failover ? "true" : "false")));
 			}
 		}
 		PG_FINALLY();
@@ -1547,10 +1543,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					PG_TRY();
 					{
 						walrcv_alter_slot(wrconn, sub->slotname, opts.failover);
-						ereport(NOTICE,
-								(errmsg("changed the failover state of replication slot \"%s\" on publisher to %s",
-										sub->slotname, opts.failover ? "true" : "false")));
 					}
 					PG_FINALLY();
 					{

View File

@@ -696,12 +696,16 @@ ReplicationSlotAlter(const char *name, bool failover)
 				errmsg("cannot use %s with a physical replication slot",
 					   "ALTER_REPLICATION_SLOT"));
 
-	SpinLockAcquire(&MyReplicationSlot->mutex);
-	MyReplicationSlot->data.failover = failover;
-	SpinLockRelease(&MyReplicationSlot->mutex);
+	if (MyReplicationSlot->data.failover != failover)
+	{
+		SpinLockAcquire(&MyReplicationSlot->mutex);
+		MyReplicationSlot->data.failover = failover;
+		SpinLockRelease(&MyReplicationSlot->mutex);
 
-	ReplicationSlotMarkDirty();
-	ReplicationSlotSave();
+		ReplicationSlotMarkDirty();
+		ReplicationSlotSave();
+	}
 
 	ReplicationSlotRelease();
 }

View File

@@ -49,6 +49,9 @@ typedef enum
 
 /*
  * Shared-memory state
+ *
+ * ControlLock is used to protect access to the other fields, except
+ * latest_page_number, which uses atomics; see comment in slru.c.
  */
 typedef struct SlruSharedData
 {
@@ -95,7 +98,7 @@ typedef struct SlruSharedData
 	 * this is not critical data, since we use it only to avoid swapping out
 	 * the latest page.
 	 */
-	int64		latest_page_number;
+	pg_atomic_uint64 latest_page_number;
 
 	/* SLRU's index for statistics purposes (might not be unique) */
 	int			slru_stats_idx;

View File

@@ -53,27 +53,40 @@
  * fasthash as implemented here has two interfaces:
  *
  * 1) Standalone functions, e.g. fasthash32() for a single value with a
- * known length.
+ * known length. These return the same hash code as the original, at
+ * least on little-endian machines.
  *
  * 2) Incremental interface. This can used for incorporating multiple
- * inputs. The standalone functions use this internally, so see fasthash64()
- * for an an example of how this works.
- *
- * The incremental interface is especially useful if any of the inputs
- * are NUL-terminated C strings, since the length is not needed ahead
- * of time. This avoids needing to call strlen(). This case is optimized
- * in fasthash_accum_cstring() :
+ * inputs. First, initialize the hash state (here with a zero seed):
  *
  * fasthash_state hs;
- * fasthash_init(&hs, FH_UNKNOWN_LENGTH, 0);
- * len = fasthash_accum_cstring(&hs, *str);
- * ...
- * return fasthash_final32(&hs, len);
+ * fasthash_init(&hs, 0);
  *
- * Here we pass FH_UNKNOWN_LENGTH as a convention, since passing zero
- * would zero out the internal seed as well. fasthash_accum_cstring()
- * returns the length of the string, which is computed on-the-fly while
- * mixing the string into the hash. Experimentation has found that
+ * If the inputs are of types that can be trivially cast to uint64, it's
+ * sufficient to do:
+ *
+ * hs.accum = value1;
+ * fasthash_combine(&hs);
+ * hs.accum = value2;
+ * fasthash_combine(&hs);
+ * ...
+ *
+ * For longer or variable-length input, fasthash_accum() is a more
+ * flexible, but more verbose method. The standalone functions use this
+ * internally, so see fasthash64() for an an example of this.
+ *
+ * After all inputs have been mixed in, finalize the hash:
+ *
+ * hashcode = fasthash_final32(&hs, 0);
+ *
+ * The incremental interface allows an optimization for NUL-terminated
+ * C strings:
+ *
+ * len = fasthash_accum_cstring(&hs, str);
+ * hashcode = fasthash_final32(&hs, len);
+ *
+ * By handling the terminator on-the-fly, we can avoid needing a strlen()
+ * call to tell us how many bytes to hash. Experimentation has found that
  * SMHasher fails unless we incorporate the length, so it is passed to
  * the finalizer as a tweak.
  */
@@ -89,20 +102,17 @@ typedef struct fasthash_state
 
 #define FH_SIZEOF_ACCUM sizeof(uint64)
 
-#define FH_UNKNOWN_LENGTH 1
-
 /*
  * Initialize the hash state.
  *
- * 'len' is the length of the input, if known ahead of time.
- * If that is not known, pass FH_UNKNOWN_LENGTH.
- *
  * 'seed' can be zero.
  */
 static inline void
-fasthash_init(fasthash_state *hs, int len, uint64 seed)
+fasthash_init(fasthash_state *hs, uint64 seed)
 {
 	memset(hs, 0, sizeof(fasthash_state));
-	hs->hash = seed ^ (len * 0x880355f21e6d1965);
+	hs->hash = seed ^ 0x880355f21e6d1965;
 }
 
 /* both the finalizer and part of the combining step */
@@ -210,26 +220,33 @@ fasthash_accum_cstring_aligned(fasthash_state *hs, const char *str)
 {
 	const char *const start = str;
 	int			remainder;
-	uint64		zero_bytes_le;
+	uint64		zero_byte_low;
 
 	Assert(PointerIsAligned(start, uint64));
+
+	/*
+	 * For every chunk of input, check for zero bytes before mixing into the
+	 * hash. The chunk with zeros must contain the NUL terminator. We arrange
+	 * so that zero_byte_low tells us not only that a zero exists, but also
+	 * where it is, so we can hash the remainder of the string.
+	 *
+	 * The haszero64 calculation will set bits corresponding to the lowest
+	 * byte where a zero exists, so that suffices for little-endian machines.
+	 * For big-endian machines, we would need bits set for the highest zero
+	 * byte in the chunk, since the trailing junk past the terminator could
+	 * contain additional zeros. haszero64 does not give us that, so we
+	 * byteswap the chunk first.
+	 */
 	for (;;)
 	{
 		uint64		chunk = *(uint64 *) str;
 
-		/*
-		 * With little-endian representation, we can use this calculation,
-		 * which sets bits in the first byte in the result word that
-		 * corresponds to a zero byte in the original word. The rest of the
-		 * bytes are indeterminate, so cannot be used on big-endian machines
-		 * without either swapping or a bytewise check.
-		 */
 #ifdef WORDS_BIGENDIAN
-		zero_bytes_le = haszero64(pg_bswap64(chunk));
+		zero_byte_low = haszero64(pg_bswap64(chunk));
 #else
-		zero_bytes_le = haszero64(chunk);
+		zero_byte_low = haszero64(chunk);
 #endif
-		if (zero_bytes_le)
+		if (zero_byte_low)
 			break;
 
 		hs->accum = chunk;
@@ -238,12 +255,11 @@ fasthash_accum_cstring_aligned(fasthash_state *hs, const char *str)
 	}
 
 	/*
-	 * For the last word, only use bytes up to the NUL for the hash. Bytes
-	 * with set bits will be 0x80, so calculate the first occurrence of a zero
-	 * byte within the input word by counting the number of trailing (because
-	 * little-endian) zeros and dividing the result by 8.
+	 * The byte corresponding to the NUL will be 0x80, so the rightmost bit
+	 * position will be in the range 7, 15, ..., 63. Turn this into byte
+	 * position by dividing by 8.
 	 */
-	remainder = pg_rightmost_one_pos64(zero_bytes_le) / BITS_PER_BYTE;
+	remainder = pg_rightmost_one_pos64(zero_byte_low) / BITS_PER_BYTE;
 	fasthash_accum(hs, str, remainder);
 	str += remainder;
@@ -328,7 +344,10 @@ fasthash64(const char *k, int len, uint64 seed)
 {
 	fasthash_state hs;
 
-	fasthash_init(&hs, len, seed);
+	fasthash_init(&hs, 0);
+
+	/* re-initialize the seed according to input length */
+	hs.hash = seed ^ (len * 0x880355f21e6d1965);
 
 	while (len >= FH_SIZEOF_ACCUM)
 	{