aio: Basic read_stream adjustments for real AIO

Adapt the read stream logic for real AIO:
- If AIO is enabled, we shouldn't issue advice, but if it isn't, we should
  continue issuing advice
- AIO benefits from reading ahead with direct IO
- If effective_io_concurrency=0, pass READ_BUFFERS_SYNCHRONOUSLY to
  StartReadBuffers() to ensure synchronous IO execution

There are further improvements we should consider:

- While in read_stream_look_ahead(), we can use AIO batch submission mode for
  increased efficiency. That, however, requires care to avoid deadlocks and is
  thus done separately.
- It can be beneficial to defer starting new IOs until we can issue multiple
  IOs at once. That however requires non-trivial heuristics to decide when to
  do so.

Reviewed-by: Noah Misch <noah@leadboat.com>
Co-authored-by: Andres Freund <andres@anarazel.de>
Co-authored-by: Thomas Munro <thomas.munro@gmail.com>
This commit is contained in:
Andres Freund 2025-03-30 18:26:44 -04:00
parent b27f8637ea
commit f4d0730bbc

View File

@ -72,6 +72,7 @@
#include "postgres.h" #include "postgres.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "storage/aio.h"
#include "storage/fd.h" #include "storage/fd.h"
#include "storage/smgr.h" #include "storage/smgr.h"
#include "storage/read_stream.h" #include "storage/read_stream.h"
@ -99,6 +100,8 @@ struct ReadStream
int16 pinned_buffers; int16 pinned_buffers;
int16 distance; int16 distance;
int16 initialized_buffers; int16 initialized_buffers;
int read_buffers_flags;
bool sync_mode; /* using io_method=sync */
bool advice_enabled; bool advice_enabled;
bool temporary; bool temporary;
@ -250,7 +253,7 @@ read_stream_start_pending_read(ReadStream *stream)
Assert(stream->next_buffer_index == stream->oldest_buffer_index); Assert(stream->next_buffer_index == stream->oldest_buffer_index);
/* Do we need to issue read-ahead advice? */ /* Do we need to issue read-ahead advice? */
flags = 0; flags = stream->read_buffers_flags;
if (stream->advice_enabled) if (stream->advice_enabled)
{ {
if (stream->pending_read_blocknum == stream->seq_blocknum) if (stream->pending_read_blocknum == stream->seq_blocknum)
@ -261,7 +264,7 @@ read_stream_start_pending_read(ReadStream *stream)
* then stay out of the way of the kernel's own read-ahead. * then stay out of the way of the kernel's own read-ahead.
*/ */
if (stream->seq_until_processed != InvalidBlockNumber) if (stream->seq_until_processed != InvalidBlockNumber)
flags = READ_BUFFERS_ISSUE_ADVICE; flags |= READ_BUFFERS_ISSUE_ADVICE;
} }
else else
{ {
@ -272,7 +275,7 @@ read_stream_start_pending_read(ReadStream *stream)
*/ */
stream->seq_until_processed = stream->pending_read_blocknum; stream->seq_until_processed = stream->pending_read_blocknum;
if (stream->pinned_buffers > 0) if (stream->pinned_buffers > 0)
flags = READ_BUFFERS_ISSUE_ADVICE; flags |= READ_BUFFERS_ISSUE_ADVICE;
} }
} }
@ -613,27 +616,33 @@ read_stream_begin_impl(int flags,
stream->per_buffer_data = (void *) stream->per_buffer_data = (void *)
MAXALIGN(&stream->ios[Max(1, max_ios)]); MAXALIGN(&stream->ios[Max(1, max_ios)]);
stream->sync_mode = io_method == IOMETHOD_SYNC;
#ifdef USE_PREFETCH #ifdef USE_PREFETCH
/* /*
* This system supports prefetching advice. We can use it as long as * Read-ahead advice simulating asynchronous I/O with synchronous calls.
* direct I/O isn't enabled, the caller hasn't promised sequential access * Issue advice only if AIO is not used, direct I/O isn't enabled, the
* (overriding our detection heuristics), and max_ios hasn't been set to * caller hasn't promised sequential access (overriding our detection
* zero. * heuristics), and max_ios hasn't been set to zero.
*/ */
if ((io_direct_flags & IO_DIRECT_DATA) == 0 && if (stream->sync_mode &&
(io_direct_flags & IO_DIRECT_DATA) == 0 &&
(flags & READ_STREAM_SEQUENTIAL) == 0 && (flags & READ_STREAM_SEQUENTIAL) == 0 &&
max_ios > 0) max_ios > 0)
stream->advice_enabled = true; stream->advice_enabled = true;
#endif #endif
/* /*
* For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
* above. If we had real asynchronous I/O we might need a slightly * we still need to allocate space to combine and run one I/O. Bump it up
* different definition. * to one, and remember to ask for synchronous I/O only.
*/ */
if (max_ios == 0) if (max_ios == 0)
{
max_ios = 1; max_ios = 1;
stream->read_buffers_flags = READ_BUFFERS_SYNCHRONOUSLY;
}
/* /*
* Capture stable values for these two GUC-derived numbers for the * Capture stable values for these two GUC-derived numbers for the
@ -777,6 +786,11 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
if (likely(next_blocknum != InvalidBlockNumber)) if (likely(next_blocknum != InvalidBlockNumber))
{ {
int flags = stream->read_buffers_flags;
if (stream->advice_enabled)
flags |= READ_BUFFERS_ISSUE_ADVICE;
/* /*
* Pin a buffer for the next call. Same buffer entry, and * Pin a buffer for the next call. Same buffer entry, and
* arbitrary I/O entry (they're all free). We don't have to * arbitrary I/O entry (they're all free). We don't have to
@ -792,8 +806,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
if (likely(!StartReadBuffer(&stream->ios[0].op, if (likely(!StartReadBuffer(&stream->ios[0].op,
&stream->buffers[oldest_buffer_index], &stream->buffers[oldest_buffer_index],
next_blocknum, next_blocknum,
stream->advice_enabled ? flags)))
READ_BUFFERS_ISSUE_ADVICE : 0)))
{ {
/* Fast return. */ /* Fast return. */
return buffer; return buffer;