/*-------------------------------------------------------------------------
 *
 * read_stream.c
 *    Mechanism for accessing buffered relation data with look-ahead
 *
 * Code that needs to access relation data typically pins blocks one at a
 * time, often in a predictable order that might be sequential or data-driven.
 * Calling the simple ReadBuffer() function for each block is inefficient,
 * because blocks that are not yet in the buffer pool require I/O operations
 * that are small and might stall waiting for storage.  This mechanism looks
 * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
 * neighboring blocks together and ahead of time, with an adaptive look-ahead
 * distance.
 *
 * A user-provided callback generates a stream of block numbers that is used
 * to form reads of up to io_combine_limit, by attempting to merge them with a
 * pending read.  When that isn't possible, the existing pending read is sent
 * to StartReadBuffers() so that a new one can begin to form.
 *
 * The algorithm for controlling the look-ahead distance tries to classify the
 * stream into three ideal behaviors:
 *
 * A) No I/O is necessary, because the requested blocks are fully cached
 * already.  There is no benefit to looking ahead more than one block, so
 * distance is 1.  This is the default initial assumption.
 *
 * B) I/O is necessary, but fadvise is undesirable because the access is
 * sequential, or impossible because direct I/O is enabled or the system
 * doesn't support fadvise.  There is no benefit in looking ahead more than
 * io_combine_limit, because in this case the only goal is larger read system
 * calls.  Looking further ahead would pin many buffers and perform
 * speculative work looking ahead for no benefit.
 *
 * C) I/O is necessary, it appears random, and this system supports fadvise.
 * We'll look further ahead in order to reach the configured level of I/O
 * concurrency.
 *
 * The distance increases rapidly and decays slowly, so that it moves towards
 * those levels as different I/O patterns are discovered.  For example, a
 * sequential scan of fully cached data doesn't bother looking ahead, but a
 * sequential scan that hits a region of uncached blocks will start issuing
 * increasingly wide read calls until it plateaus at io_combine_limit.
 *
 * The main data structure is a circular queue of buffers of size
 * max_pinned_buffers plus some extra space for technical reasons, ready to be
 * returned by read_stream_next_buffer().  Each buffer also has an optional
 * variable sized object that is passed from the callback to the consumer of
 * buffers.
 *
 * Parallel to the queue of buffers, there is a circular queue of in-progress
 * I/Os that have been started with StartReadBuffers(), and for which
 * WaitReadBuffers() must be called before returning the buffer.
 *
 * For example, if the callback returns block numbers 10, 42, 43, 60 in
 * successive calls, then these data structures might appear as follows:
 *
 *                          buffers buf/data       ios
 *
 *                          +----+  +-----+        +--------+
 *                          |    |  |     |  +---->| 42..44 | <- oldest_io_index
 *                          +----+  +-----+  |     +--------+
 *   oldest_buffer_index -> | 10 |  |  ?  |  | +-->| 60..60 |
 *                          +----+  +-----+  | |   +--------+
 *                          | 42 |  |  ?  |<-+ |   |        | <- next_io_index
 *                          +----+  +-----+    |   +--------+
 *                          | 43 |  |  ?  |    |   |        |
 *                          +----+  +-----+    |   +--------+
 *                          | 44 |  |  ?  |    |   |        |
 *                          +----+  +-----+    |   +--------+
 *                          | 60 |  |  ?  |<---+
 *                          +----+  +-----+
 *     next_buffer_index -> |    |  |     |
 *                          +----+  +-----+
 *
 * In the example, 5 buffers are pinned, and the next buffer to be streamed to
 * the client is block 10.  Block 10 was a hit and has no associated I/O, but
 * the range 42..44 requires an I/O wait before its buffers are returned, as
 * does block 60.
 *
 *
 * Portions Copyright (c) 2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/storage/aio/read_stream.c
 *
 *-------------------------------------------------------------------------
 */
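/*
 * A rough usage sketch (hypothetical caller and names, not part of this
 * file), assuming the ReadStreamBlockNumberCB signature declared in
 * read_stream.h.  The callback hands out blocks 0 .. nblocks - 1 and then
 * reports end-of-stream; the consumer drains the stream one pinned buffer at
 * a time:
 *
 *    typedef struct MyScanState
 *    {
 *        BlockNumber next_block;
 *        BlockNumber nblocks;
 *    } MyScanState;
 *
 *    static BlockNumber
 *    my_scan_next_block_cb(ReadStream *stream,
 *                          void *callback_private_data,
 *                          void *per_buffer_data)
 *    {
 *        MyScanState *state = (MyScanState *) callback_private_data;
 *
 *        if (state->next_block >= state->nblocks)
 *            return InvalidBlockNumber;
 *        return state->next_block++;
 *    }
 *
 *    ...
 *    stream = read_stream_begin_relation(READ_STREAM_FULL, NULL, rel,
 *                                        MAIN_FORKNUM, my_scan_next_block_cb,
 *                                        &state, 0);
 *    while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 *    {
 *        ... examine the page held in buf ...
 *        ReleaseBuffer(buf);
 *    }
 *    read_stream_end(stream);
 */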
#include "postgres.h"

#include "catalog/pg_tablespace.h"
#include "miscadmin.h"
#include "storage/fd.h"
#include "storage/smgr.h"
#include "storage/read_stream.h"
#include "utils/memdebug.h"
#include "utils/rel.h"
#include "utils/spccache.h"

typedef struct InProgressIO
{
    int16       buffer_index;
    ReadBuffersOperation op;
} InProgressIO;

/*
 * State for managing a stream of reads.
 */
struct ReadStream
{
    int16       max_ios;
    int16       ios_in_progress;
    int16       queue_size;
    int16       max_pinned_buffers;
    int16       pinned_buffers;
    int16       distance;
    bool        advice_enabled;

    /*
     * One-block buffer to support 'ungetting' a block number, to resolve flow
     * control problems when I/Os are split.
     */
    BlockNumber buffered_blocknum;

    /*
     * The callback that will tell us which block numbers to read, and an
     * opaque pointer that will be passed to it for its own purposes.
     */
    ReadStreamBlockNumberCB callback;
    void       *callback_private_data;

    /* Next expected block, for detecting sequential access. */
    BlockNumber seq_blocknum;

    /* The read operation we are currently preparing. */
    BlockNumber pending_read_blocknum;
    int16       pending_read_nblocks;

    /* Space for buffers and optional per-buffer private data. */
    size_t      per_buffer_data_size;
    void       *per_buffer_data;

    /* Read operations that have been started but not waited for yet. */
    InProgressIO *ios;
    int16       oldest_io_index;
    int16       next_io_index;

    bool        fast_path;

    /* Circular queue of buffers. */
    int16       oldest_buffer_index;    /* Next pinned buffer to return */
    int16       next_buffer_index;      /* Index of next buffer to pin */
    Buffer      buffers[FLEXIBLE_ARRAY_MEMBER];
};

/*
 * Return a pointer to the per-buffer data by index.
 */
static inline void *
get_per_buffer_data(ReadStream *stream, int16 buffer_index)
{
    return (char *) stream->per_buffer_data +
        stream->per_buffer_data_size * buffer_index;
}

/*
 * Ask the callback which block it would like us to read next, with a one block
 * buffer in front to allow read_stream_unget_block() to work.
 */
static inline BlockNumber
read_stream_get_block(ReadStream *stream, void *per_buffer_data)
{
    BlockNumber blocknum;

    blocknum = stream->buffered_blocknum;
    if (blocknum != InvalidBlockNumber)
        stream->buffered_blocknum = InvalidBlockNumber;
    else
        blocknum = stream->callback(stream,
                                    stream->callback_private_data,
                                    per_buffer_data);

    return blocknum;
}

/*
 * In order to deal with short reads in StartReadBuffers(), we sometimes need
 * to defer handling of a block until later.
 */
static inline void
read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
{
    /* We shouldn't ever unget more than one block. */
    Assert(stream->buffered_blocknum == InvalidBlockNumber);
    Assert(blocknum != InvalidBlockNumber);
    stream->buffered_blocknum = blocknum;
}

static void
read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
{
    bool        need_wait;
    int         nblocks;
    int         flags;
    int16       io_index;
    int16       overflow;
    int16       buffer_index;

    /* This should only be called with a pending read. */
    Assert(stream->pending_read_nblocks > 0);
    Assert(stream->pending_read_nblocks <= io_combine_limit);

    /* We had better not exceed the pin limit by starting this read. */
    Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
           stream->max_pinned_buffers);

    /* We had better not be overwriting an existing pinned buffer. */
    if (stream->pinned_buffers > 0)
        Assert(stream->next_buffer_index != stream->oldest_buffer_index);
    else
        Assert(stream->next_buffer_index == stream->oldest_buffer_index);

    /*
     * If advice hasn't been suppressed, this system supports it, and this
     * isn't a strictly sequential pattern, then we'll issue advice.
     */
    if (!suppress_advice &&
        stream->advice_enabled &&
        stream->pending_read_blocknum != stream->seq_blocknum)
        flags = READ_BUFFERS_ISSUE_ADVICE;
    else
        flags = 0;

    /* We say how many blocks we want to read, but it may be smaller on return. */
    buffer_index = stream->next_buffer_index;
    io_index = stream->next_io_index;
    nblocks = stream->pending_read_nblocks;
    need_wait = StartReadBuffers(&stream->ios[io_index].op,
                                 &stream->buffers[buffer_index],
                                 stream->pending_read_blocknum,
                                 &nblocks,
                                 flags);
    stream->pinned_buffers += nblocks;

    /* Remember whether we need to wait before returning this buffer. */
    if (!need_wait)
    {
        /* Look-ahead distance decays, no I/O necessary (behavior A). */
        if (stream->distance > 1)
            stream->distance--;
    }
    else
    {
        /*
         * Remember to call WaitReadBuffers() before returning head buffer.
         * Look-ahead distance will be adjusted after waiting.
         */
        stream->ios[io_index].buffer_index = buffer_index;
        if (++stream->next_io_index == stream->max_ios)
            stream->next_io_index = 0;
        Assert(stream->ios_in_progress < stream->max_ios);
        stream->ios_in_progress++;
        stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
    }

    /*
     * We gave a contiguous range of buffer space to StartReadBuffers(), but
     * we want it to wrap around at queue_size.  Slide overflowing buffers to
     * the front of the array.
     */
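    /*
     * Worked example (illustrative numbers only): with queue_size = 10,
     * buffer_index = 8 and nblocks = 5, the read wrote into elements 8..12 of
     * the over-allocated array, so overflow comes out as 3 and elements
     * 10..12 are copied back to elements 0..2; the next read will then start
     * at index 3.
     */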
    overflow = (buffer_index + nblocks) - stream->queue_size;
    if (overflow > 0)
        memmove(&stream->buffers[0],
                &stream->buffers[stream->queue_size],
                sizeof(stream->buffers[0]) * overflow);

    /* Compute location of start of next read, without using % operator. */
    buffer_index += nblocks;
    if (buffer_index >= stream->queue_size)
        buffer_index -= stream->queue_size;
    Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
    stream->next_buffer_index = buffer_index;

    /* Adjust the pending read to cover the remaining portion, if any. */
    stream->pending_read_blocknum += nblocks;
    stream->pending_read_nblocks -= nblocks;
}

static void
read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
{
    while (stream->ios_in_progress < stream->max_ios &&
           stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
    {
        BlockNumber blocknum;
        int16       buffer_index;
        void       *per_buffer_data;

        if (stream->pending_read_nblocks == io_combine_limit)
        {
            read_stream_start_pending_read(stream, suppress_advice);
            suppress_advice = false;
            continue;
        }

        /*
         * See which block the callback wants next in the stream.  We need to
         * compute the index of the Nth block of the pending read including
         * wrap-around, but we don't want to use the expensive % operator.
         */
        buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
        if (buffer_index >= stream->queue_size)
            buffer_index -= stream->queue_size;
        Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
        per_buffer_data = get_per_buffer_data(stream, buffer_index);
        blocknum = read_stream_get_block(stream, per_buffer_data);
        if (blocknum == InvalidBlockNumber)
        {
            /* End of stream. */
            stream->distance = 0;
            break;
        }

        /* Can we merge it with the pending read? */
        if (stream->pending_read_nblocks > 0 &&
            stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
        {
            stream->pending_read_nblocks++;
            continue;
        }

        /* We have to start the pending read before we can build another. */
        while (stream->pending_read_nblocks > 0)
        {
            read_stream_start_pending_read(stream, suppress_advice);
            suppress_advice = false;
            if (stream->ios_in_progress == stream->max_ios)
            {
                /* And we've hit the limit.  Rewind, and stop here. */
                read_stream_unget_block(stream, blocknum);
                return;
            }
        }

        /* This is the start of a new pending read. */
        stream->pending_read_blocknum = blocknum;
        stream->pending_read_nblocks = 1;
    }

    /*
     * We don't start the pending read just because we've hit the distance
     * limit, preferring to give it another chance to grow to full
     * io_combine_limit size once more buffers have been consumed.  However,
     * if we've already reached io_combine_limit, or we've reached the
     * distance limit and there isn't anything pinned yet, or the callback has
     * signaled end-of-stream, we start the read immediately.
     */
    if (stream->pending_read_nblocks > 0 &&
        (stream->pending_read_nblocks == io_combine_limit ||
         (stream->pending_read_nblocks == stream->distance &&
          stream->pinned_buffers == 0) ||
         stream->distance == 0) &&
        stream->ios_in_progress < stream->max_ios)
        read_stream_start_pending_read(stream, suppress_advice);
}

/*
 * Create a new read stream object that can be used to perform the equivalent
 * of a series of ReadBuffer() calls for one fork of one relation.
 * Internally, it generates larger vectored reads where possible by looking
 * ahead.  The callback should return block numbers or InvalidBlockNumber to
 * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
 * write extra data for each block into the space provided to it.  It will
 * also receive callback_private_data for its own purposes.
 */
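/*
 * For illustration only (hypothetical callback and struct, not part of this
 * file): a callback that streams a caller-supplied array of block numbers
 * and also records each one in the per-buffer data, assuming the stream was
 * created with per_buffer_data_size = sizeof(BlockNumber):
 *
 *    static BlockNumber
 *    listed_blocks_cb(ReadStream *stream,
 *                     void *callback_private_data,
 *                     void *per_buffer_data)
 *    {
 *        MyBlockList *list = (MyBlockList *) callback_private_data;
 *        BlockNumber blocknum;
 *
 *        if (list->position >= list->nblocks)
 *            return InvalidBlockNumber;
 *        blocknum = list->blocks[list->position++];
 *        *(BlockNumber *) per_buffer_data = blocknum;
 *        return blocknum;
 *    }
 */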
ReadStream *
read_stream_begin_relation(int flags,
                           BufferAccessStrategy strategy,
                           Relation rel,
                           ForkNumber forknum,
                           ReadStreamBlockNumberCB callback,
                           void *callback_private_data,
                           size_t per_buffer_data_size)
{
    ReadStream *stream;
    size_t      size;
    int16       queue_size;
    int         max_ios;
    int         strategy_pin_limit;
    uint32      max_pinned_buffers;
    Oid         tablespace_id;
    SMgrRelation smgr;

    smgr = RelationGetSmgr(rel);

    /*
     * Decide how many I/Os we will allow to run at the same time.  That
     * currently means advice to the kernel to tell it that we will soon read.
     * This number also affects how far we look ahead for opportunities to
     * start more I/Os.
     */
    tablespace_id = smgr->smgr_rlocator.locator.spcOid;
    if (!OidIsValid(MyDatabaseId) ||
        IsCatalogRelation(rel) ||
        IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
    {
        /*
         * Avoid circularity while trying to look up tablespace settings or
         * before spccache.c is ready.
         */
        max_ios = effective_io_concurrency;
    }
    else if (flags & READ_STREAM_MAINTENANCE)
        max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
    else
        max_ios = get_tablespace_io_concurrency(tablespace_id);

    /* Cap to INT16_MAX to avoid overflowing below */
    max_ios = Min(max_ios, PG_INT16_MAX);

    /*
     * Choose the maximum number of buffers we're prepared to pin.  We try to
     * pin fewer if we can, though.  We clamp it to at least io_combine_limit
     * so that we can have a chance to build up a full io_combine_limit sized
     * read, even when max_ios is zero.  Be careful not to allow int16 to
     * overflow (even though that's not possible with the current GUC range
     * limits), allowing also for the spare entry and the overflow space.
     */
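    /*
     * Worked example (illustrative numbers only): with max_ios = 16 and
     * io_combine_limit = 16, the first line below computes Max(64, 16) = 64,
     * the PG_INT16_MAX clamp doesn't bite, and we'd be prepared to pin up to
     * 64 buffers before the strategy and backend pin limits are applied.
     */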
    max_pinned_buffers = Max(max_ios * 4, io_combine_limit);
    max_pinned_buffers = Min(max_pinned_buffers,
                             PG_INT16_MAX - io_combine_limit - 1);

    /* Give the strategy a chance to limit the number of buffers we pin. */
    strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
    max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);

    /* Don't allow this backend to pin more than its share of buffers. */
    if (SmgrIsTemp(smgr))
        LimitAdditionalLocalPins(&max_pinned_buffers);
    else
        LimitAdditionalPins(&max_pinned_buffers);
    Assert(max_pinned_buffers > 0);

    /*
     * We need one extra entry for buffers and per-buffer data, because users
     * of per-buffer data have access to the object until the next call to
     * read_stream_next_buffer(), so we need a gap between the head and tail
     * of the queue so that we don't clobber it.
     */
    queue_size = max_pinned_buffers + 1;

    /*
     * Allocate the object, the buffers, the ios and per_buffer_data space in
     * one big chunk.  Though we have queue_size buffers, we want to be able
     * to assume that all the buffers for a single read are contiguous (i.e.
     * don't wrap around halfway through), so we allow temporary overflows of
     * up to the maximum possible read size by allocating an extra
     * io_combine_limit - 1 elements.
     */
    size = offsetof(ReadStream, buffers);
    size += sizeof(Buffer) * (queue_size + io_combine_limit - 1);
    size += sizeof(InProgressIO) * Max(1, max_ios);
    size += per_buffer_data_size * queue_size;
    size += MAXIMUM_ALIGNOF * 2;
    stream = (ReadStream *) palloc(size);
    memset(stream, 0, offsetof(ReadStream, buffers));
    stream->ios = (InProgressIO *)
        MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]);
    if (per_buffer_data_size > 0)
        stream->per_buffer_data = (void *)
            MAXALIGN(&stream->ios[Max(1, max_ios)]);

#ifdef USE_PREFETCH

    /*
     * This system supports prefetching advice.  We can use it as long as
     * direct I/O isn't enabled, the caller hasn't promised sequential access
     * (overriding our detection heuristics), and max_ios hasn't been set to
     * zero.
     */
    if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
        (flags & READ_STREAM_SEQUENTIAL) == 0 &&
        max_ios > 0)
        stream->advice_enabled = true;
#endif

    /*
     * For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled
     * above.  If we had real asynchronous I/O we might need a slightly
     * different definition.
     */
    if (max_ios == 0)
        max_ios = 1;

    stream->max_ios = max_ios;
    stream->per_buffer_data_size = per_buffer_data_size;
    stream->max_pinned_buffers = max_pinned_buffers;
    stream->queue_size = queue_size;
    stream->callback = callback;
    stream->callback_private_data = callback_private_data;
    stream->buffered_blocknum = InvalidBlockNumber;

    /*
     * Skip the initial ramp-up phase if the caller says we're going to be
     * reading the whole relation.  This way we start out assuming we'll be
     * doing full io_combine_limit sized reads (behavior B).
     */
    if (flags & READ_STREAM_FULL)
        stream->distance = Min(max_pinned_buffers, io_combine_limit);
    else
        stream->distance = 1;

    /*
     * Since we always access the same relation, we can initialize parts of
     * the ReadBuffersOperation objects and leave them that way, to avoid
     * wasting CPU cycles writing to them for each read.
     */
    for (int i = 0; i < max_ios; ++i)
    {
        stream->ios[i].op.rel = rel;
        stream->ios[i].op.smgr = RelationGetSmgr(rel);
        stream->ios[i].op.smgr_persistence = 0;
        stream->ios[i].op.forknum = forknum;
        stream->ios[i].op.strategy = strategy;
    }

    return stream;
}

/*
 * Pull one pinned buffer out of a stream.  Each call returns successive
 * blocks in the order specified by the callback.  If per_buffer_data_size was
 * set to a non-zero size, *per_buffer_data receives a pointer to the extra
 * per-buffer data that the callback had a chance to populate, which remains
 * valid until the next call to read_stream_next_buffer().  When the stream
 * runs out of data, InvalidBuffer is returned.  The caller may decide to end
 * the stream early at any time by calling read_stream_end().
 */
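/*
 * A rough consumption sketch (hypothetical caller, not part of this file),
 * pairing with a stream whose callback fills per-buffer data of size
 * sizeof(BlockNumber):
 *
 *    Buffer      buf;
 *    void       *per_buffer_data;
 *
 *    while ((buf = read_stream_next_buffer(stream, &per_buffer_data)) != InvalidBuffer)
 *    {
 *        BlockNumber blocknum = *(BlockNumber *) per_buffer_data;
 *
 *        ... use blocknum and the page held in buf ...
 *        ReleaseBuffer(buf);
 *    }
 */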
Buffer
read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
{
    Buffer      buffer;
    int16       oldest_buffer_index;

#ifndef READ_STREAM_DISABLE_FAST_PATH

    /*
     * A fast path for all-cached scans (behavior A).  This is the same as the
     * usual algorithm, but it is specialized for no I/O and no per-buffer
     * data, so we can skip the queue management code, stay in the same buffer
     * slot and use singular StartReadBuffer().
     */
    if (likely(stream->fast_path))
    {
        BlockNumber next_blocknum;

        /* Fast path assumptions. */
        Assert(stream->ios_in_progress == 0);
        Assert(stream->pinned_buffers == 1);
        Assert(stream->distance == 1);
        Assert(stream->pending_read_nblocks == 0);
        Assert(stream->per_buffer_data_size == 0);

        /* We're going to return the buffer we pinned last time. */
        oldest_buffer_index = stream->oldest_buffer_index;
        Assert((oldest_buffer_index + 1) % stream->queue_size ==
               stream->next_buffer_index);
        buffer = stream->buffers[oldest_buffer_index];
        Assert(buffer != InvalidBuffer);

        /* Choose the next block to pin. */
        next_blocknum = read_stream_get_block(stream, NULL);

        if (likely(next_blocknum != InvalidBlockNumber))
        {
            /*
             * Pin a buffer for the next call.  Same buffer entry, and
             * arbitrary I/O entry (they're all free).  We don't have to
             * adjust pinned_buffers because we're transferring one to caller
             * but pinning one more.
             */
            if (likely(!StartReadBuffer(&stream->ios[0].op,
                                        &stream->buffers[oldest_buffer_index],
                                        next_blocknum,
                                        stream->advice_enabled ?
                                        READ_BUFFERS_ISSUE_ADVICE : 0)))
            {
                /* Fast return. */
                return buffer;
            }

            /* Next call must wait for I/O for the newly pinned buffer. */
            stream->oldest_io_index = 0;
            stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
            stream->ios_in_progress = 1;
            stream->ios[0].buffer_index = oldest_buffer_index;
            stream->seq_blocknum = next_blocknum + 1;
        }
        else
        {
            /* No more blocks, end of stream. */
            stream->distance = 0;
            stream->oldest_buffer_index = stream->next_buffer_index;
            stream->pinned_buffers = 0;
        }

        stream->fast_path = false;
        return buffer;
    }
#endif

    if (unlikely(stream->pinned_buffers == 0))
    {
        Assert(stream->oldest_buffer_index == stream->next_buffer_index);

        /* End of stream reached? */
        if (stream->distance == 0)
            return InvalidBuffer;

        /*
         * The usual order of operations is that we look ahead at the bottom
         * of this function after potentially finishing an I/O and making
         * space for more, but if we're just starting up we'll need to crank
         * the handle to get started.
         */
        read_stream_look_ahead(stream, true);

        /* End of stream reached? */
        if (stream->pinned_buffers == 0)
        {
            Assert(stream->distance == 0);
            return InvalidBuffer;
        }
    }

    /* Grab the oldest pinned buffer and associated per-buffer data. */
    Assert(stream->pinned_buffers > 0);
    oldest_buffer_index = stream->oldest_buffer_index;
    Assert(oldest_buffer_index >= 0 &&
           oldest_buffer_index < stream->queue_size);
    buffer = stream->buffers[oldest_buffer_index];
    if (per_buffer_data)
        *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);

    Assert(BufferIsValid(buffer));

    /* Do we have to wait for an associated I/O first? */
    if (stream->ios_in_progress > 0 &&
        stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
    {
        int16       io_index = stream->oldest_io_index;
        int16       distance;

        /* Sanity check that we still agree on the buffers. */
        Assert(stream->ios[io_index].op.buffers ==
               &stream->buffers[oldest_buffer_index]);

        WaitReadBuffers(&stream->ios[io_index].op);

        Assert(stream->ios_in_progress > 0);
        stream->ios_in_progress--;
        if (++stream->oldest_io_index == stream->max_ios)
            stream->oldest_io_index = 0;

        if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
        {
            /* Distance ramps up fast (behavior C). */
            distance = stream->distance * 2;
            distance = Min(distance, stream->max_pinned_buffers);
            stream->distance = distance;
        }
        else
        {
            /* No advice; move towards io_combine_limit (behavior B). */
            if (stream->distance > io_combine_limit)
            {
                stream->distance--;
            }
            else
            {
                distance = stream->distance * 2;
                distance = Min(distance, io_combine_limit);
                distance = Min(distance, stream->max_pinned_buffers);
                stream->distance = distance;
            }
        }
    }

#ifdef CLOBBER_FREED_MEMORY
    /* Clobber old buffer and per-buffer data for debugging purposes. */
    stream->buffers[oldest_buffer_index] = InvalidBuffer;

    /*
     * The caller will get access to the per-buffer data, until the next call.
     * We wipe the one before, which is never occupied because queue_size
     * allowed one extra element.  This will hopefully trip up client code
     * that is holding a dangling pointer to it.
     */
    if (stream->per_buffer_data)
        wipe_mem(get_per_buffer_data(stream,
                                     oldest_buffer_index == 0 ?
                                     stream->queue_size - 1 :
                                     oldest_buffer_index - 1),
                 stream->per_buffer_data_size);
#endif

    /* Pin transferred to caller. */
    Assert(stream->pinned_buffers > 0);
    stream->pinned_buffers--;

    /* Advance oldest buffer, with wrap-around. */
    stream->oldest_buffer_index++;
    if (stream->oldest_buffer_index == stream->queue_size)
        stream->oldest_buffer_index = 0;

    /* Prepare for the next call. */
    read_stream_look_ahead(stream, false);

#ifndef READ_STREAM_DISABLE_FAST_PATH
    /* See if we can take the fast path for all-cached scans next time. */
    if (stream->ios_in_progress == 0 &&
        stream->pinned_buffers == 1 &&
        stream->distance == 1 &&
        stream->pending_read_nblocks == 0 &&
        stream->per_buffer_data_size == 0)
    {
        stream->fast_path = true;
    }
#endif

    return buffer;
}

/*
 * Reset a read stream by releasing any queued up buffers, allowing the stream
 * to be used again for different blocks.  This can be used to clear an
 * end-of-stream condition and start again, or to throw away blocks that were
 * speculatively read and read some different blocks instead.
 */
void
read_stream_reset(ReadStream *stream)
{
    Buffer      buffer;

    /* Stop looking ahead. */
    stream->distance = 0;

    /* Forget buffered block number and fast path state. */
    stream->buffered_blocknum = InvalidBlockNumber;
    stream->fast_path = false;

    /* Unpin anything that wasn't consumed. */
    while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
        ReleaseBuffer(buffer);

    Assert(stream->pinned_buffers == 0);
    Assert(stream->ios_in_progress == 0);

    /* Start off assuming data is cached. */
    stream->distance = 1;
}

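/*
 * A rough reuse sketch (hypothetical caller, not part of this file): after
 * draining or abandoning one batch of blocks, the caller can point its
 * callback's private state at a new batch and keep using the same stream:
 *
 *    read_stream_reset(stream);
 *    state.next_block = new_first_block;
 *    state.nblocks = new_nblocks;
 *    while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 *    {
 *        ... process buf ...
 *        ReleaseBuffer(buf);
 *    }
 */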
/*
 * Release and free a read stream.
 */
void
read_stream_end(ReadStream *stream)
{
    read_stream_reset(stream);
    pfree(stream);
}