Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -236,16 +236,31 @@ class RPCProducerIdManager(brokerId: Int,
debug("Requesting next Producer ID block")
controllerChannel.sendRequest(request, new ControllerRequestCompletionHandler() {
override def onComplete(response: ClientResponse): Unit = {
val message = response.responseBody().asInstanceOf[AllocateProducerIdsResponse]
handleAllocateProducerIdsResponse(message)
handleAllocateProducerIdsResponse(response)
}

override def onTimeout(): Unit = handleTimeout()
})
}

// Visible for testing
private[transaction] def handleAllocateProducerIdsResponse(response: AllocateProducerIdsResponse): Unit = {
private[transaction] def handleAllocateProducerIdsResponse(clientResponse: ClientResponse): Unit = {
if (clientResponse.authenticationException != null) {
error("Unable to allocate producer id because of an authentication exception", clientResponse.authenticationException)
handleUnsuccessfulResponse()
return
}
if (clientResponse.versionMismatch != null) {
error("Unable to allocate producer id because of a version mismatch exception", clientResponse.versionMismatch)
handleUnsuccessfulResponse()
return
}
if (!clientResponse.hasResponse) {
error("Unable to allocate producer id because of empty response from controller")
handleUnsuccessfulResponse()
return
}
val response = clientResponse.responseBody().asInstanceOf[AllocateProducerIdsResponse]
val data = response.data
var successfulResponse = false
Errors.forCode(data.errorCode()) match {
Expand All @@ -269,13 +284,17 @@ class RPCProducerIdManager(brokerId: Int,
}

if (!successfulResponse) {
// There is no need to compare and set because only one thread
// handles the AllocateProducerIds response.
backoffDeadlineMs.set(time.milliseconds() + RetryBackoffMs)
requestInFlight.set(false)
handleUnsuccessfulResponse()
}
}

private def handleUnsuccessfulResponse(): Unit = {
// There is no need to compare and set because only one thread
// handles the AllocateProducerIds response.
backoffDeadlineMs.set(time.milliseconds() + RetryBackoffMs)
requestInFlight.set(false)
}

private def handleTimeout(): Unit = {
warn("Timed out when requesting AllocateProducerIds from the controller.")
requestInFlight.set(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ package kafka.coordinator.transaction
import kafka.coordinator.transaction.ProducerIdManager.RetryBackoffMs
import kafka.utils.TestUtils
import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException
import org.apache.kafka.common.errors.{AuthenticationException, CoordinatorLoadInProgressException, UnsupportedVersionException}
import org.apache.kafka.common.message.AllocateProducerIdsResponseData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.AllocateProducerIdsResponse
Expand Down Expand Up @@ -52,31 +53,58 @@ class ProducerIdManagerTest {
val idLen: Int,
val errorQueue: ConcurrentLinkedQueue[Errors] = new ConcurrentLinkedQueue[Errors](),
val isErroneousBlock: Boolean = false,
val time: Time = Time.SYSTEM
val time: Time = Time.SYSTEM,
var hasAuthenticationException: Boolean = false,
var hasVersionMismatch: Boolean = false,
var hasNoResponse: Boolean = false
) extends RPCProducerIdManager(brokerId, time, () => 1, brokerToController) {

private val brokerToControllerRequestExecutor = Executors.newSingleThreadExecutor()
val capturedFailure: AtomicBoolean = new AtomicBoolean(false)

private def createClientResponse(authenticationException: AuthenticationException = null, versionException: UnsupportedVersionException = null, response: AllocateProducerIdsResponse = null): ClientResponse =
new ClientResponse(null, null, null, time.milliseconds, time.milliseconds, false, versionException, authenticationException, response)

override private[transaction] def sendRequest(): Unit = {

brokerToControllerRequestExecutor.submit(() => {
if (hasAuthenticationException) {
handleAllocateProducerIdsResponse(createClientResponse(authenticationException = new AuthenticationException("Auth Failure")))
hasAuthenticationException = false // reset so retry works
return
}
if (hasVersionMismatch) {
handleAllocateProducerIdsResponse(createClientResponse(versionException = new UnsupportedVersionException("Version Mismatch")))
hasVersionMismatch = false // reset so retry works
return
}
if (hasNoResponse) {
handleAllocateProducerIdsResponse(createClientResponse(null, null, null))
hasNoResponse = false // reset so retry works
return
}
val error = errorQueue.poll()
if (error == null || error == Errors.NONE) {
handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(
new AllocateProducerIdsResponseData().setProducerIdStart(idStart).setProducerIdLen(idLen)))
handleAllocateProducerIdsResponse(createClientResponse(
response = new AllocateProducerIdsResponse(
new AllocateProducerIdsResponseData().setProducerIdStart(idStart).setProducerIdLen(idLen)
)
))
if (!isErroneousBlock) {
idStart += idLen
}
} else {
handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(
new AllocateProducerIdsResponseData().setErrorCode(error.code)))
handleAllocateProducerIdsResponse(createClientResponse(
response = new AllocateProducerIdsResponse(
new AllocateProducerIdsResponseData().setErrorCode(error.code)
)
))
}
}, 0)
}

override private[transaction] def handleAllocateProducerIdsResponse(response: AllocateProducerIdsResponse): Unit = {
super.handleAllocateProducerIdsResponse(response)
override private[transaction] def handleAllocateProducerIdsResponse(clientResponse: ClientResponse): Unit = {
super.handleAllocateProducerIdsResponse(clientResponse)
capturedFailure.set(nextProducerIdBlock.get == null)
}
}
Expand Down Expand Up @@ -215,6 +243,45 @@ class ProducerIdManagerTest {
verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0)
}

@Test
def testRetryBackoffOnAuthException(): Unit = {
val time = new MockTime()
val manager = new MockProducerIdManager(0, 0, 1, time = time, hasAuthenticationException = true)

verifyFailure(manager)

// We should only get a new block once retry backoff ms has passed.
assertCoordinatorLoadInProgressExceptionFailure(manager.generateProducerId())
time.sleep(RetryBackoffMs)
verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0)
}

@Test
def testRetryBackoffOnVersionMismatch(): Unit = {
val time = new MockTime()
val manager = new MockProducerIdManager(0, 0, 1, time = time, hasVersionMismatch = true)

verifyFailure(manager)

// We should only get a new block once retry backoff ms has passed.
assertCoordinatorLoadInProgressExceptionFailure(manager.generateProducerId())
time.sleep(RetryBackoffMs)
verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0)
}

@Test
def testRetryBackoffOnNoResponse(): Unit = {
val time = new MockTime()
val manager = new MockProducerIdManager(0, 0, 1, time = time, hasNoResponse = true)

verifyFailure(manager)

// We should only get a new block once retry backoff ms has passed.
assertCoordinatorLoadInProgressExceptionFailure(manager.generateProducerId())
time.sleep(RetryBackoffMs)
verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0)
}

private def queue(errors: Errors*): ConcurrentLinkedQueue[Errors] = {
val queue = new ConcurrentLinkedQueue[Errors]()
errors.foreach(queue.add)
Expand Down