7 Star 5 Fork 24

src-openEuler/kafka

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
0010-not-update-connection.patch 4.12 KB
一键复制 编辑 原始数据 按行查看 历史
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 1e710fdf6a..33f587f480 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -576,7 +576,7 @@ public class Selector implements Selectable, AutoCloseable {
attemptRead(channel);
}
- if (channel.hasBytesBuffered()) {
+ if (channel.hasBytesBuffered() && !explicitlyMutedChannels.contains(channel)) {
//this channel has bytes enqueued in intermediary buffers that we could not read
//(possibly because no memory). it may be the case that the underlying socket will
//not come up in the next poll() and so we need to remember this channel for the
@@ -742,6 +742,7 @@ public class Selector implements Selectable, AutoCloseable {
private void mute(KafkaChannel channel) {
channel.mute();
explicitlyMutedChannels.add(channel);
+ keysWithBufferedRead.remove(channel.selectionKey());
}
@Override
@@ -754,6 +755,9 @@ public class Selector implements Selectable, AutoCloseable {
// Remove the channel from explicitlyMutedChannels only if the channel has been actually unmuted.
if (channel.maybeUnmute()) {
explicitlyMutedChannels.remove(channel);
+ if (channel.hasBytesBuffered()) {
+ keysWithBufferedRead.add(channel.selectionKey());
+ }
}
}
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 293614432c..fbc5563392 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -1560,8 +1560,15 @@ class SocketServerTest {
val testableSelector = testableServer.testableSelector
testableSelector.updateMinWakeup(2)
+ val sleepTimeMs = idleTimeMs / 2 + 1
val (socket, request) = makeSocketWithBufferedRequests(testableServer, testableSelector, proxyServer)
- time.sleep(idleTimeMs + 1)
+ // advance mock time in increments to verify that muted sockets with buffered data dont have their idle time updated
+ // additional calls to poll() should not update the channel last idle time
+ for (_ <- 0 to 3) {
+ time.sleep(sleepTimeMs)
+ testableSelector.operationCounts.clear()
+ testableSelector.waitForOperations(SelectorOperation.Poll, 1)
+ }
testableServer.waitForChannelClose(request.context.connectionId, locallyClosed = false)
val otherSocket = sslConnect(testableServer)
@@ -1574,6 +1581,30 @@ class SocketServerTest {
}
}
+ @Test
+ def testUnmuteChannelWithBufferedReceives(): Unit = {
+ val time = new MockTime()
+ props ++= sslServerProps
+ val testableServer = new TestableSocketServer(time = time)
+ testableServer.startup()
+ val proxyServer = new ProxyServer(testableServer)
+ try {
+ val testableSelector = testableServer.testableSelector
+ val (socket, request) = makeSocketWithBufferedRequests(testableServer, testableSelector, proxyServer)
+ testableSelector.operationCounts.clear()
+ testableSelector.waitForOperations(SelectorOperation.Poll, 1)
+ val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead")
+ assertEquals(Set.empty, keysWithBufferedRead.asScala)
+ processRequest(testableServer.dataPlaneRequestChannel, request)
+ // buffered requests should be processed after channel is unmuted
+ receiveRequest(testableServer.dataPlaneRequestChannel)
+ socket.close()
+ } finally {
+ proxyServer.close()
+ shutdownServerAndMetrics(testableServer)
+ }
+ }
+
/**
* Tests exception handling in [[Processor.processCompletedReceives]]. Exception is
* injected into [[Selector.mute]] which is used to mute the channel when a receive is complete.
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/src-openeuler/kafka.git
[email protected]:src-openeuler/kafka.git
src-openeuler
kafka
kafka
master

搜索帮助