代码拉取完成,页面将自动刷新
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index a58f4238ff..88b337311d 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -581,9 +581,6 @@ class Partition(val topicPartition: TopicPartition,
leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
zkVersion = partitionState.zkVersion
- // Clear any pending AlterIsr requests and check replica state
- alterIsrManager.clearPending(topicPartition)
-
// In the case of successive leader elections in a short time period, a follower may have
// entries in its log from a later epoch than any entry in the new leader's log. In order
// to ensure that these followers can truncate to the right offset, we must cache the new
@@ -661,9 +658,6 @@ class Partition(val topicPartition: TopicPartition,
leaderEpochStartOffsetOpt = None
zkVersion = partitionState.zkVersion
- // Since we might have been a leader previously, still clear any pending AlterIsr requests
- alterIsrManager.clearPending(topicPartition)
-
if (leaderReplicaIdOpt.contains(newLeaderBrokerId) && leaderEpoch == oldLeaderEpoch) {
false
} else {
@@ -1373,13 +1367,15 @@ class Partition(val topicPartition: TopicPartition,
isrState = proposedIsrState
if (!alterIsrManager.submit(alterIsrItem)) {
- // If the ISR manager did not accept our update, we need to revert back to previous state
+ // If the ISR manager did not accept our update, we need to revert the proposed state.
+ // This can happen if the ISR state was updated by the controller (via LeaderAndIsr in ZK-mode or
+ // ChangePartitionRecord in KRaft mode) but we have an AlterIsr request still in-flight.
isrState = oldState
isrChangeListener.markFailed()
- throw new IllegalStateException(s"Failed to enqueue ISR change state $newLeaderAndIsr for partition $topicPartition")
+ warn(s"Failed to enqueue ISR change state $newLeaderAndIsr for partition $topicPartition")
+ } else {
+ debug(s"Enqueued ISR change to state $newLeaderAndIsr after transition to $proposedIsrState")
}
-
- debug(s"Enqueued ISR change to state $newLeaderAndIsr after transition to $proposedIsrState")
}
/**
diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala
index 9ad734f708..1059a3df3e 100644
--- a/core/src/main/scala/kafka/server/AlterIsrManager.scala
+++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala
@@ -49,8 +49,6 @@ trait AlterIsrManager {
def shutdown(): Unit = {}
def submit(alterIsrItem: AlterIsrItem): Boolean
-
- def clearPending(topicPartition: TopicPartition): Unit
}
case class AlterIsrItem(topicPartition: TopicPartition,
@@ -134,9 +132,6 @@ class DefaultAlterIsrManager(
enqueued
}
- override def clearPending(topicPartition: TopicPartition): Unit = {
- unsentIsrUpdates.remove(topicPartition)
- }
private[server] def maybePropagateIsrChanges(): Unit = {
// Send all pending items if there is not already a request in-flight.
diff --git a/core/src/main/scala/kafka/server/ZkIsrManager.scala b/core/src/main/scala/kafka/server/ZkIsrManager.scala
index 2d88aac6b4..8dffcdf307 100644
--- a/core/src/main/scala/kafka/server/ZkIsrManager.scala
+++ b/core/src/main/scala/kafka/server/ZkIsrManager.scala
@@ -55,12 +55,6 @@ class ZkIsrManager(scheduler: Scheduler, time: Time, zkClient: KafkaZkClient) ex
period = isrChangeNotificationConfig.checkIntervalMs, unit = TimeUnit.MILLISECONDS)
}
- override def clearPending(topicPartition: TopicPartition): Unit = {
- // Since we always immediately process ZK updates and never actually enqueue anything, there is nothing to
- // clear here so this is a no-op. Even if there are changes that have not been propagated, the write to ZK
- // has already happened, so we may as well send the notification to the controller.
- }
-
override def submit(alterIsrItem: AlterIsrItem): Boolean = {
debug(s"Writing new ISR ${alterIsrItem.leaderAndIsr.isr} to ZooKeeper with version " +
s"${alterIsrItem.leaderAndIsr.zkVersion} for partition ${alterIsrItem.topicPartition}")
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 5eedb63ae5..4dbd735753 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -18,10 +18,10 @@
package kafka.controller
import java.util.Properties
-import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}
-
+import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingQueue, TimeUnit}
import com.yammer.metrics.core.Timer
import kafka.api.{ApiVersion, KAFKA_2_6_IV0, KAFKA_2_7_IV0, LeaderAndIsr}
+import kafka.controller.KafkaController.AlterIsrCallback
import kafka.metrics.KafkaYammerMetrics
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.{LogCaptureAppender, TestUtils}
@@ -849,6 +849,67 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
latch.await()
}
+ @Test
+ def testAlterIsrErrors(): Unit = {
+ servers = makeServers(1)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+ val tp = new TopicPartition("t", 0)
+ val assignment = Map(tp.partition -> Seq(controllerId))
+ TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+ val controller = getController().kafkaController
+ var future = captureAlterIsrError(controllerId, controller.brokerEpoch - 1,
+ Map(tp -> LeaderAndIsr(controllerId, List(controllerId))))
+ var capturedError = future.get(5, TimeUnit.SECONDS)
+ assertEquals(Errors.STALE_BROKER_EPOCH, capturedError)
+
+ future = captureAlterIsrError(99, controller.brokerEpoch,
+ Map(tp -> LeaderAndIsr(controllerId, List(controllerId))))
+ capturedError = future.get(5, TimeUnit.SECONDS)
+ assertEquals(Errors.STALE_BROKER_EPOCH, capturedError)
+
+ val unknownTopicPartition = new TopicPartition("unknown", 99)
+ future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch,
+ Map(unknownTopicPartition -> LeaderAndIsr(controllerId, List(controllerId))), unknownTopicPartition)
+ capturedError = future.get(5, TimeUnit.SECONDS)
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, capturedError)
+
+ future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch,
+ Map(tp -> LeaderAndIsr(controllerId, 1, List(controllerId), 99)), tp)
+ capturedError = future.get(5, TimeUnit.SECONDS)
+ assertEquals(Errors.INVALID_UPDATE_VERSION, capturedError)
+ }
+
+ def captureAlterIsrError(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr]): CompletableFuture[Errors] = {
+ val future = new CompletableFuture[Errors]()
+ val controller = getController().kafkaController
+ val callback: AlterIsrCallback = {
+ case Left(_: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) =>
+ future.completeExceptionally(new AssertionError(s"Should have seen top-level error"))
+ case Right(error: Errors) =>
+ future.complete(error)
+ }
+ controller.eventManager.put(AlterIsrReceived(brokerId, brokerEpoch, isrsToAlter, callback))
+ future
+ }
+
+ def captureAlterIsrPartitionError(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr], tp: TopicPartition): CompletableFuture[Errors] = {
+ val future = new CompletableFuture[Errors]()
+ val controller = getController().kafkaController
+ val callback: AlterIsrCallback = {
+ case Left(partitionResults: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) =>
+ partitionResults.get(tp) match {
+ case Some(Left(error: Errors)) => future.complete(error)
+ case Some(Right(_: LeaderAndIsr)) => future.completeExceptionally(new AssertionError(s"Should have seen an error for $tp in result"))
+ case None => future.completeExceptionally(new AssertionError(s"Should have seen $tp in result"))
+ }
+ case Right(_: Errors) =>
+ future.completeExceptionally(new AssertionError(s"Should not seen top-level error"))
+ }
+ controller.eventManager.put(AlterIsrReceived(brokerId, brokerEpoch, isrsToAlter, callback))
+ future
+ }
+
+
@Test
def testTopicIdsAreAdded(): Unit = {
servers = makeServers(1)
diff --git a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
index 1074fd3157..1c8c81471f 100644
--- a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
@@ -70,8 +70,10 @@ class AlterIsrManagerTest {
@Test
def testOverwriteWithinBatch(): Unit = {
val capture = EasyMock.newCapture[AbstractRequest.Builder[AlterIsrRequest]]()
+ val callbackCapture = EasyMock.newCapture[ControllerRequestCompletionHandler]()
+
EasyMock.expect(brokerToController.start())
- EasyMock.expect(brokerToController.sendRequest(EasyMock.capture(capture), EasyMock.anyObject())).once()
+ EasyMock.expect(brokerToController.sendRequest(EasyMock.capture(capture), EasyMock.capture(callbackCapture))).times(2)
EasyMock.replay(brokerToController)
val scheduler = new MockScheduler(time)
@@ -81,11 +83,21 @@ class AlterIsrManagerTest {
// Only send one ISR update for a given topic+partition
assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)))
assertFalse(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2), 10), _ => {}, 0)))
+
+ // Simulate response
+ val alterIsrResp = partitionResponse(tp0, Errors.NONE)
+ val resp = new ClientResponse(null, null, "", 0L, 0L,
+ false, null, null, alterIsrResp)
+ callbackCapture.getValue.onComplete(resp)
+
+ // Now we can submit this partition again
+ assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1), 10), _ => {}, 0)))
EasyMock.verify(brokerToController)
+ // Make sure we sent the right request ISR={1}
val request = capture.getValue.build()
assertEquals(request.data().topics().size(), 1)
- assertEquals(request.data().topics().get(0).partitions().get(0).newIsr().size(), 3)
+ assertEquals(request.data().topics().get(0).partitions().get(0).newIsr().size(), 1)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 43df2b97f4..8e52007bc7 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1106,10 +1106,6 @@ object TestUtils extends Logging {
}
}
- override def clearPending(topicPartition: TopicPartition): Unit = {
- inFlight.set(false);
- }
-
def completeIsrUpdate(newZkVersion: Int): Unit = {
if (inFlight.compareAndSet(true, false)) {
val item = isrUpdates.head
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。