Skip to content

Commit 7c8474f

Browse files
bytter til channels for å vente på shutdown
vi har observert i prod at kafka-rapid ikke stopper før shutdown-hooken har returnert. kafka-rapid stopper konsumenttråden ved å kalle på Consumer.wakeup(), og prestop-hooken kaller på CountDownLatch.await(). vi har en teori om at disse java-tråd-apiene (CountDownLatch.await()) blokkerer underliggende javatråd (og ikke coroutine), og rare ting skjer. uansett tenker vi at det er best å bruke kotlin-API så lenge vi er coroutines, enn å bruke en blanding.
1 parent 11009b4 commit 7c8474f

File tree

1 file changed

+41
-5
lines changed

1 file changed

+41
-5
lines changed

src/main/kotlin/no/nav/helse/rapids_rivers/PreStopHook.kt

+41-5
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,63 @@ package no.nav.helse.rapids_rivers
33
import com.github.navikt.tbd_libs.rapids_and_rivers.KafkaRapid
44
import com.github.navikt.tbd_libs.rapids_and_rivers_api.RapidsConnection
55
import kotlinx.coroutines.Dispatchers
6+
import kotlinx.coroutines.channels.Channel
7+
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
8+
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
9+
import kotlinx.coroutines.runBlocking
610
import kotlinx.coroutines.withContext
7-
import java.util.concurrent.CountDownLatch
8-
import java.util.concurrent.TimeUnit
11+
import kotlinx.coroutines.withTimeout
12+
import kotlinx.coroutines.withTimeoutOrNull
13+
import org.slf4j.LoggerFactory
14+
import kotlin.time.Duration.Companion.seconds
915

1016
class PreStopHook(private val rapid: KafkaRapid) : RapidsConnection.StatusListener {
11-
private val shutdownLatch = CountDownLatch(1)
17+
private companion object {
18+
val log = LoggerFactory.getLogger(this::class.java)
19+
}
20+
// bruker CONFLATED som er en channel med buffer på 1, hvor hver ny melding overskriver den forrige
21+
// i praksis vil dette bety at vi ikke blokkerer senderen av shutdown-signalet
22+
private val shutdownChannel = Channel<Boolean>(CONFLATED)
1223

1324
init {
1425
rapid.register(this)
1526
}
1627

1728
override fun onShutdown(rapidsConnection: RapidsConnection) {
18-
shutdownLatch.countDown()
29+
runBlocking(Dispatchers.IO) {
30+
try {
31+
withTimeout(1.seconds) {
32+
log.info("sender shutdownsignal på channel")
33+
shutdownChannel.send(true)
34+
// a channel can be closed to indicate that no more elements are coming
35+
shutdownChannel.close()
36+
}
37+
} catch (e: Exception) {
38+
log.warn("fikk exception da vi sendte shutdown-signal på channel: ${e.message}", e)
39+
}
40+
}
1941
}
2042

43+
/**
44+
* sender stop-signal til kafkarapid.
45+
* da vil kafka-rapid sørge for at konsumer-tråden får beskjed, og starter nedstenging.
46+
* når nedstengingen er fullført vil vi få et varsel på onShutdown().
47+
* da varsler vi prestop-hooken om at nedstenging er fullført.
48+
* prestop-hooken venter i opptil 30 sekunder på å motta dette signalet.
49+
*/
2150
suspend fun handlePreStopRequest() {
2251
rapid.stop()
2352
// block the preStopHook call from returning until
2453
// ktor is ready to shut down, which means that the KafkaRapid has shutdown
2554
withContext(Dispatchers.IO) {
26-
shutdownLatch.await(30, TimeUnit.SECONDS)
55+
val shutdownValue = withTimeoutOrNull(30.seconds) {
56+
shutdownChannel.receive()
57+
}
58+
if (shutdownValue == null) {
59+
log.info("fikk ikke shutdown-signal innen timeout")
60+
} else {
61+
log.info("mottok shutdownsignal på channel")
62+
}
2763
}
2864
}
2965
}

0 commit comments

Comments
 (0)