mirror of
https://github.com/postgres/postgres.git
synced 2025-06-02 00:01:40 -04:00
shm_mq_sendv: Fix flushing bug when receiver not yet attached.
With the old logic, when the reciever had not yet attached, we would never call shm_mq_inc_bytes_written(), even if force_flush = true was specified. That could result in a situation where data that the sender believes it has sent is never received. Along the way, remove a useless function prototype for a nonexistent function from shm_mq.h. Commit 46846433a03dff4f2e08c8a161e54a842da360d6 introduced these problems. Pavan Deolasee, with a few changes by me. Discussion: https://postgr.es/m/CABOikdPkwtLLCTnzzmpSMXo3QZa2yXq0J7Q61ssdLFAJYrOVvQ@mail.gmail.com
This commit is contained in:
parent
0a050ee000
commit
f5bfba5413
@ -518,8 +518,7 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
* If the counterparty is known to have attached, we can read mq_receiver
|
* If the counterparty is known to have attached, we can read mq_receiver
|
||||||
* without acquiring the spinlock and assume it isn't NULL. Otherwise,
|
* without acquiring the spinlock. Otherwise, more caution is needed.
|
||||||
* more caution is needed.
|
|
||||||
*/
|
*/
|
||||||
if (mqh->mqh_counterparty_attached)
|
if (mqh->mqh_counterparty_attached)
|
||||||
receiver = mq->mq_receiver;
|
receiver = mq->mq_receiver;
|
||||||
@ -528,9 +527,8 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
|
|||||||
SpinLockAcquire(&mq->mq_mutex);
|
SpinLockAcquire(&mq->mq_mutex);
|
||||||
receiver = mq->mq_receiver;
|
receiver = mq->mq_receiver;
|
||||||
SpinLockRelease(&mq->mq_mutex);
|
SpinLockRelease(&mq->mq_mutex);
|
||||||
if (receiver == NULL)
|
if (receiver != NULL)
|
||||||
return SHM_MQ_SUCCESS;
|
mqh->mqh_counterparty_attached = true;
|
||||||
mqh->mqh_counterparty_attached = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -541,7 +539,8 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
|
|||||||
if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
|
if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
|
||||||
{
|
{
|
||||||
shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
|
shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
|
||||||
SetLatch(&receiver->procLatch);
|
if (receiver != NULL)
|
||||||
|
SetLatch(&receiver->procLatch);
|
||||||
mqh->mqh_send_pending = 0;
|
mqh->mqh_send_pending = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -76,7 +76,6 @@ extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov,
|
|||||||
int iovcnt, bool nowait, bool force_flush);
|
int iovcnt, bool nowait, bool force_flush);
|
||||||
extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
|
extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
|
||||||
Size *nbytesp, void **datap, bool nowait);
|
Size *nbytesp, void **datap, bool nowait);
|
||||||
extern void shm_mq_flush(shm_mq_handle *mqh);
|
|
||||||
|
|
||||||
/* Wait for our counterparty to attach to the queue. */
|
/* Wait for our counterparty to attach to the queue. */
|
||||||
extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);
|
extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user