Skip to content

Commit df0a3fe

Browse files
jclynesvc-squareup-copybara
authored andcommitted
Cleanup and Documentation
GitOrigin-RevId: 1115466d02e3a5e97875020c7056b0b4420fb8d1
1 parent a12bfa1 commit df0a3fe

9 files changed

+52
-61
lines changed

docs/actions.md

+6-3
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,10 @@ class GreeterActionModule : KAbstractModule() {
187187
}
188188
```
189189

190-
Misk also supports `rpcCallStyle = "suspending"` for suspending gRPC actions.This is the preferred way to generate server
191-
actions if you intend on using coroutines to implement the business logic of your action. See [coroutines](coroutines.md) for more information.
190+
Misk also supports `rpcCallStyle = "suspending"` for suspending gRPC actions. This is the way to generate server
191+
actions if you intend to use coroutines to implement the business logic of your action. The generated interface will
192+
then expect you to implement a suspending function instead of a regular blocking function for your action's handler method.
193+
See [coroutines](coroutines.md) for more information.
192194

193195
```kotlin
194196
wire {
@@ -208,7 +210,8 @@ wire {
208210
}
209211
```
210212

211-
The above will generate a similar action class, but with a suspending action function
213+
The above Wire Gradle plugin configuration will have a suspending function in the generated interface instead of a
214+
regular function. Your implementing class will then look like this:
212215

213216
```kotlin
214217
@Singleton

docs/coroutines.md

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
Misk Coroutines Rules
1+
# Misk Coroutines Rules
22

33
Coroutines are cooperative concurrency. If you don't cooperate (ie. suspend regularly), things will not be as efficient
44
and may potentially deadlock.
@@ -32,7 +32,7 @@ Example
3232
}```
3333
3434
35-
When an action is declared with the `suspend` modifier, it'll be called with a `Dispatcher` that has a single backing
35+
When an action is declared with the `suspend` modifier, it will be called with a `Dispatcher` that has a single backing
3636
thread (`runBlocking`). This thread is part of the Jetty Thread Pool and allocated to this specific request, therefore
3737
it is safe to make blocking calls on. This will also take care of request scoped features that are thread local,
3838
such as ActionScoped values, MDC, tracing, etc.
@@ -44,6 +44,10 @@ Follow structured concurrency best practices, including:
4444
- All coroutines should be in child scopes for the incoming request scope.
4545
- Don't use `GlobalScope` or create a new `CoroutineScope()` object.
4646
47+
Misk is gradually adding experimental support for Kotlin coroutines.
48+
Functionality may be partially implemented or buggy in the short term as support is added to core libraries
49+
and implementations. Please report any issues encountered.
50+
4751
4852
4953

misk/src/main/kotlin/misk/grpc/GrpcFeatureBinding.kt

+8-6
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,11 @@ internal class GrpcFeatureBinding(
107107
webConfig: WebConfig
108108
) : FeatureBinding.Factory {
109109

110-
// This dispatcher is sized to the jetty thread pool size to make sure that
111-
// no requests that are currently scheduled on a jetty thread are ever blocked
112-
// from reading a streaming request
110+
/**
111+
* This dispatcher is sized to the jetty thread pool size to make sure that
112+
* no requests that are currently scheduled on a jetty thread are ever blocked
113+
* from reading a streaming request.
114+
*/
113115
private val grpcMessageSourceChannelDispatcher =
114116
Dispatchers.IO.limitedParallelism(
115117
parallelism = webConfig.jetty_max_thread_pool_size,
@@ -147,7 +149,7 @@ internal class GrpcFeatureBinding(
147149
@Suppress("UNCHECKED_CAST") // Assume it's a proto type.
148150
ProtoAdapter.get(wireAnnotation.responseAdapter) as ProtoAdapter<Any>
149151
}
150-
val isSuspending = action.function.isSuspend
152+
val isSuspend = action.function.isSuspend
151153

152154
return if (streamingRequestType != null) {
153155
@Suppress("UNCHECKED_CAST") // Assume it's a proto type.
@@ -156,7 +158,7 @@ internal class GrpcFeatureBinding(
156158
responseAdapter = responseAdapter,
157159
streamingRequest = true,
158160
streamingResponse = streamingResponse,
159-
isSuspend = isSuspending,
161+
isSuspend = isSuspend,
160162
grpcMessageSourceChannelContext = grpcMessageSourceChannelDispatcher,
161163
)
162164
} else {
@@ -166,7 +168,7 @@ internal class GrpcFeatureBinding(
166168
responseAdapter = responseAdapter,
167169
streamingRequest = false,
168170
streamingResponse = streamingResponse,
169-
isSuspend = isSuspending,
171+
isSuspend = isSuspend,
170172
grpcMessageSourceChannelContext = grpcMessageSourceChannelDispatcher,
171173
)
172174
}

misk/src/main/kotlin/misk/logging/DynamicMdcContext.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ internal class DynamicMdcContext(
3636
}
3737

3838
override fun restoreThreadContext(context: CoroutineContext, oldState: MDCContext?) {
39-
//NOOP
39+
clear()
4040
}
4141

4242
override fun copyForChild(): CopyableThreadContextElement<MDCContext?> {

misk/src/main/kotlin/misk/web/actions/WebActions.kt

+12-10
Original file line numberDiff line numberDiff line change
@@ -35,22 +35,26 @@ internal fun WebAction.asChain(
3535
argsMap[param] = arg
3636
}
3737

38-
return if (function.isSuspend) {
38+
return if (!function.isSuspend) {
39+
function.callBy(argsMap)
40+
} else {
3941
// Handle suspending invocation, this includes building out the context to propagate MDC
4042
// and action scope.
41-
val context = EmptyCoroutineContext +
42-
DynamicMdcContext() +
43-
if (scope.inScope()) scope.asContextElement() else EmptyCoroutineContext
43+
val context = DynamicMdcContext() +
44+
if (scope.inScope()) {
45+
scope.asContextElement()
46+
} else {
47+
EmptyCoroutineContext
48+
}
4449

4550
runBlocking(context) {
4651
// Build the list of Source and Sink Channels (should only be 0 or 1 of each)
4752
val sourceChannel = argsMap.values
4853
.mapNotNull { it as? GrpcMessageSourceChannel<*> }
4954
.singleOrNull()
50-
val sinkChannel =
51-
argsMap.values
52-
.mapNotNull { it as? GrpcMessageSinkChannel<*> }
53-
.singleOrNull()
55+
val sinkChannel = argsMap.values
56+
.mapNotNull { it as? GrpcMessageSinkChannel<*> }
57+
.singleOrNull()
5458
// Launch a coroutine for each Source and Sink Channels to bridge the data
5559
sourceChannel?.let { launch { sourceChannel.bridgeFromSource() } }
5660
sinkChannel?.let { launch { sinkChannel.bridgeToSink() } }
@@ -63,8 +67,6 @@ internal fun WebAction.asChain(
6367
sinkChannel?.close()
6468
}
6569
}
66-
} else {
67-
function.callBy(argsMap)
6870
} ?: throw IllegalStateException("Null return from WebAction")
6971
}
7072
}

misk/src/test/kotlin/misk/grpc/GrpcMessageSinkChannelTest.kt

+3-17
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,15 @@ import kotlinx.coroutines.launch
77
import kotlinx.coroutines.test.runTest
88
import okio.Buffer
99
import okio.ByteString.Companion.decodeHex
10-
import org.assertj.core.api.Assertions.assertThat
1110
import org.junit.jupiter.api.AfterEach
12-
import org.junit.jupiter.api.BeforeEach
1311
import org.junit.jupiter.api.Test
12+
import kotlin.test.assertEquals
1413
import kotlin.time.Duration.Companion.milliseconds
15-
import kotlin.time.Duration.Companion.seconds
1614

1715
class GrpcMessageSinkChannelTest {
18-
1916
private val buffer = Buffer()
2017
private val writer = GrpcMessageSink(buffer, HelloRequest.ADAPTER, "identity")
2118

22-
2319
@AfterEach
2420
fun tearDown() {
2521
writer.close()
@@ -28,28 +24,18 @@ class GrpcMessageSinkChannelTest {
2824

2925
@Test
3026
fun `test bridge from Channel to GrpcMessageSink HelloRequest`() = runTest {
31-
3227
val channel = Channel<HelloRequest>()
3328
launch { GrpcMessageSinkChannel(channel, writer).bridgeToSink() }
3429

3530
channel.send(HelloRequest("localhost"))
3631
eventually(100.milliseconds) {
37-
buffer.readByteString().also { byteString ->
38-
assertThat(byteString)
39-
.isEqualTo("000000000b0a096c6f63616c686f7374".decodeHex())
40-
}
32+
assertEquals("000000000b0a096c6f63616c686f7374".decodeHex(), buffer.readByteString())
4133
}
4234

4335
channel.send(HelloRequest("proxy"))
4436
eventually(100.milliseconds) {
45-
buffer.readByteString().also { byteString ->
46-
assertThat(byteString)
47-
.isEqualTo("00000000070a0570726f7879".decodeHex())
48-
}
37+
assertEquals("00000000070a0570726f7879".decodeHex(), buffer.readByteString())
4938
}
5039
channel.close()
51-
5240
}
53-
5441
}
55-

misk/src/test/kotlin/misk/grpc/GrpcMessageSourceChannelTest.kt

+3-10
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,12 @@ import kotlinx.coroutines.launch
77
import kotlinx.coroutines.test.runTest
88
import okio.Buffer
99
import okio.ByteString.Companion.decodeHex
10-
import org.assertj.core.api.Assertions.assertThat
1110
import org.junit.jupiter.api.AfterEach
1211
import org.junit.jupiter.api.Test
12+
import kotlin.test.assertEquals
1313
import kotlin.time.Duration.Companion.milliseconds
1414

1515
class GrpcMessageSourceChannelTest {
16-
1716
private val buffer = Buffer()
1817
private val reader = GrpcMessageSource(buffer, HelloRequest.ADAPTER)
1918

@@ -35,17 +34,11 @@ class GrpcMessageSourceChannelTest {
3534
launch { GrpcMessageSourceChannel(channel, reader, coroutineContext).bridgeFromSource() }
3635

3736
eventually(100.milliseconds) {
38-
channel.receive().also { request ->
39-
assertThat(request).isEqualTo(HelloRequest("localhost"))
40-
}
37+
assertEquals(HelloRequest("localhost"), channel.receive())
4138
}
4239

4340
eventually(100.milliseconds) {
44-
channel.receive().also { request ->
45-
assertThat(request).isEqualTo(HelloRequest("proxy"))
46-
}
41+
assertEquals(HelloRequest("proxy"), channel.receive())
4742
}
4843
}
49-
5044
}
51-

misk/src/test/kotlin/misk/grpc/GrpcSourceSinkTest.kt

+9-8
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,26 @@ import com.squareup.protos.test.grpc.HelloReply
44
import com.squareup.protos.test.grpc.HelloRequest
55
import okio.Buffer
66
import okio.ByteString.Companion.decodeHex
7-
import org.assertj.core.api.Assertions.assertThat
87
import org.junit.jupiter.api.Test
8+
import kotlin.test.assertEquals
99

1010
class GrpcSourceSinkTest {
1111
@Test
1212
fun grpcMessageSourceHelloRequest() {
1313
val buffer = Buffer()
1414
buffer.write("000000000b0a096c6f63616c686f7374".decodeHex())
1515
val reader = GrpcMessageSource(buffer, HelloRequest.ADAPTER)
16-
assertThat(reader.read()).isEqualTo(HelloRequest("localhost"))
16+
17+
assertEquals(HelloRequest("localhost"), reader.read())
1718
}
1819

1920
@Test
2021
fun grpcMessageSourceHelloReply() {
2122
val buffer = Buffer()
2223
buffer.write("00000000110a0f48656c6c6f206c6f63616c686f7374".decodeHex())
2324
val reader = GrpcMessageSource(buffer, HelloReply.ADAPTER)
24-
assertThat(reader.read()).isEqualTo(HelloReply("Hello localhost"))
25+
26+
assertEquals(HelloReply("Hello localhost"), reader.read())
2527
}
2628

2729
@Test
@@ -30,8 +32,8 @@ class GrpcSourceSinkTest {
3032
val writer = GrpcMessageSink(buffer, HelloRequest.ADAPTER, "identity")
3133
writer.write(HelloRequest("localhost"))
3234
writer.close()
33-
assertThat(buffer.readByteString())
34-
.isEqualTo("000000000b0a096c6f63616c686f7374".decodeHex())
35+
36+
assertEquals("000000000b0a096c6f63616c686f7374".decodeHex(), buffer.readByteString())
3537
}
3638

3739
@Test
@@ -40,8 +42,7 @@ class GrpcSourceSinkTest {
4042
val writer = GrpcMessageSink(buffer, HelloReply.ADAPTER, "identity")
4143
writer.write(HelloReply("Hello localhost"))
4244
writer.close()
43-
assertThat(buffer.readByteString()).isEqualTo(
44-
"00000000110a0f48656c6c6f206c6f63616c686f7374".decodeHex()
45-
)
45+
46+
assertEquals("00000000110a0f48656c6c6f206c6f63616c686f7374".decodeHex(), buffer.readByteString())
4647
}
4748
}

misk/src/test/kotlin/misk/logging/DynamicMdcContextTest.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ package misk.logging
33
import io.kotest.assertions.assertSoftly
44
import kotlinx.coroutines.launch
55
import kotlinx.coroutines.runBlocking
6-
import org.assertj.core.api.Assertions.assertThat
76
import org.junit.jupiter.api.BeforeEach
87
import org.junit.jupiter.api.Test
98
import org.slf4j.MDC
109
import wisp.logging.getLogger
10+
import kotlin.test.assertEquals
1111

1212
class DynamicMdcContextTest {
1313
@BeforeEach
@@ -33,9 +33,9 @@ class DynamicMdcContextTest {
3333
logger.info { "level2 added" }
3434
}
3535
assertSoftly {
36-
assertThat(MDC.get("level0")).isEqualTo("value0")
37-
assertThat(MDC.get("level1")).isEqualTo("value1")
38-
assertThat(MDC.get("level2")).isEqualTo("value2")
36+
assertEquals("value0", MDC.get("level0"))
37+
assertEquals("value1", MDC.get("level1"))
38+
assertEquals("value2", MDC.get("level2"))
3939
}
4040
}
4141
}

0 commit comments

Comments
 (0)