代码拉取完成,页面将自动刷新
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 1e710fdf6a..33f587f480 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -576,7 +576,7 @@ public class Selector implements Selectable, AutoCloseable {
attemptRead(channel);
}
- if (channel.hasBytesBuffered()) {
+ if (channel.hasBytesBuffered() && !explicitlyMutedChannels.contains(channel)) {
//this channel has bytes enqueued in intermediary buffers that we could not read
//(possibly because no memory). it may be the case that the underlying socket will
//not come up in the next poll() and so we need to remember this channel for the
@@ -742,6 +742,7 @@ public class Selector implements Selectable, AutoCloseable {
private void mute(KafkaChannel channel) {
channel.mute();
explicitlyMutedChannels.add(channel);
+ keysWithBufferedRead.remove(channel.selectionKey());
}
@Override
@@ -754,6 +755,9 @@ public class Selector implements Selectable, AutoCloseable {
// Remove the channel from explicitlyMutedChannels only if the channel has been actually unmuted.
if (channel.maybeUnmute()) {
explicitlyMutedChannels.remove(channel);
+ if (channel.hasBytesBuffered()) {
+ keysWithBufferedRead.add(channel.selectionKey());
+ }
}
}
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 293614432c..fbc5563392 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -1560,8 +1560,15 @@ class SocketServerTest {
val testableSelector = testableServer.testableSelector
testableSelector.updateMinWakeup(2)
+ val sleepTimeMs = idleTimeMs / 2 + 1
val (socket, request) = makeSocketWithBufferedRequests(testableServer, testableSelector, proxyServer)
- time.sleep(idleTimeMs + 1)
+ // advance mock time in increments to verify that muted sockets with buffered data dont have their idle time updated
+ // additional calls to poll() should not update the channel last idle time
+ for (_ <- 0 to 3) {
+ time.sleep(sleepTimeMs)
+ testableSelector.operationCounts.clear()
+ testableSelector.waitForOperations(SelectorOperation.Poll, 1)
+ }
testableServer.waitForChannelClose(request.context.connectionId, locallyClosed = false)
val otherSocket = sslConnect(testableServer)
@@ -1574,6 +1581,30 @@ class SocketServerTest {
}
}
+ @Test
+ def testUnmuteChannelWithBufferedReceives(): Unit = {
+ val time = new MockTime()
+ props ++= sslServerProps
+ val testableServer = new TestableSocketServer(time = time)
+ testableServer.startup()
+ val proxyServer = new ProxyServer(testableServer)
+ try {
+ val testableSelector = testableServer.testableSelector
+ val (socket, request) = makeSocketWithBufferedRequests(testableServer, testableSelector, proxyServer)
+ testableSelector.operationCounts.clear()
+ testableSelector.waitForOperations(SelectorOperation.Poll, 1)
+ val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead")
+ assertEquals(Set.empty, keysWithBufferedRead.asScala)
+ processRequest(testableServer.dataPlaneRequestChannel, request)
+ // buffered requests should be processed after channel is unmuted
+ receiveRequest(testableServer.dataPlaneRequestChannel)
+ socket.close()
+ } finally {
+ proxyServer.close()
+ shutdownServerAndMetrics(testableServer)
+ }
+ }
+
/**
* Tests exception handling in [[Processor.processCompletedReceives]]. Exception is
* injected into [[Selector.mute]] which is used to mute the channel when a receive is complete.
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。