Use Kotlin Coroutines Flow instead of Project reactor

This commit is contained in:
Syer10
2023-03-30 18:28:56 -04:00
parent 847a5fe71b
commit bce76bbcf3
8 changed files with 81 additions and 67 deletions
@@ -14,7 +14,7 @@ import java.io.IOException
class JavalinGraphQLRequestParser : GraphQLRequestParser<Context> {
@Suppress("BlockingMethodInNonBlockingContext")
@Suppress("BlockingMethodInNonBlockingContext", "PARAMETER_NAME_CHANGED_ON_OVERRIDE")
override suspend fun parseRequest(context: Context): GraphQLServerRequest = try {
context.bodyAsClass(GraphQLServerRequest::class.java)
} catch (e: IOException) {
@@ -9,7 +9,7 @@ package suwayomi.tachidesk.graphql.server
import com.expediagroup.graphql.generator.SchemaGeneratorConfig
import com.expediagroup.graphql.generator.TopLevelObject
import com.expediagroup.graphql.generator.hooks.SchemaGeneratorHooks
import com.expediagroup.graphql.generator.hooks.FlowSubscriptionSchemaGeneratorHooks
import com.expediagroup.graphql.generator.toSchema
import graphql.scalars.ExtendedScalars
import graphql.schema.GraphQLType
@@ -21,10 +21,10 @@ import suwayomi.tachidesk.graphql.subscriptions.DownloadSubscription
import kotlin.reflect.KClass
import kotlin.reflect.KType
class CustomSchemaGeneratorHooks : SchemaGeneratorHooks {
class CustomSchemaGeneratorHooks : FlowSubscriptionSchemaGeneratorHooks() {
override fun willGenerateGraphQLType(type: KType): GraphQLType? = when (type.classifier as? KClass<*>) {
Long::class -> ExtendedScalars.GraphQLLong
else -> null
else -> super.willGenerateGraphQLType(type)
}
}
@@ -15,6 +15,9 @@ import graphql.GraphQL
import io.javalin.http.Context
import io.javalin.websocket.WsCloseContext
import io.javalin.websocket.WsMessageContext
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import suwayomi.tachidesk.graphql.server.subscriptions.ApolloSubscriptionProtocolHandler
import suwayomi.tachidesk.graphql.server.subscriptions.GraphQLSubscriptionHandler
@@ -31,7 +34,7 @@ class TachideskGraphQLServer(
subscriptionProtocolHandler.handleMessage(context)
.map { objectMapper.writeValueAsString(it) }
.map { context.send(it) }
.subscribe()
.launchIn(GlobalScope)
}
fun handleSubscriptionDisconnect(context: WsCloseContext) {
@@ -13,17 +13,24 @@ import com.fasterxml.jackson.module.kotlin.convertValue
import com.fasterxml.jackson.module.kotlin.readValue
import io.javalin.websocket.WsContext
import io.javalin.websocket.WsMessageContext
import kotlinx.coroutines.reactor.asFlux
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.sample
import kotlinx.coroutines.job
import kotlinx.coroutines.runBlocking
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.toFlux
import suwayomi.tachidesk.graphql.server.TachideskGraphQLContextFactory
import suwayomi.tachidesk.graphql.server.subscriptions.SubscriptionOperationMessage.ClientMessages.*
import suwayomi.tachidesk.graphql.server.subscriptions.SubscriptionOperationMessage.ServerMessages.*
import suwayomi.tachidesk.graphql.server.toGraphQLContext
import java.time.Duration
/**
* Implementation of the `graphql-ws` protocol defined by Apollo
@@ -42,8 +49,8 @@ class ApolloSubscriptionProtocolHandler(
private val acknowledgeMessage = SubscriptionOperationMessage(GQL_CONNECTION_ACK.type)
@Suppress("Detekt.TooGenericExceptionCaught")
fun handleMessage(context: WsMessageContext): Flux<SubscriptionOperationMessage> {
val operationMessage = convertToMessageOrNull(context.message()) ?: return Flux.just(basicConnectionErrorMessage)
fun handleMessage(context: WsMessageContext): Flow<SubscriptionOperationMessage> {
val operationMessage = convertToMessageOrNull(context.message()) ?: return flowOf(basicConnectionErrorMessage)
logger.debug("GraphQL subscription client message, sessionId=${context.sessionId} operationMessage=$operationMessage")
return try {
@@ -77,32 +84,34 @@ class ApolloSubscriptionProtocolHandler(
* If the keep alive configuration is set, send a message back to client at every interval until the session is terminated.
* Otherwise just return empty flux to append to the acknowledge message.
*/
private fun getKeepAliveFlux(context: WsContext): Flux<SubscriptionOperationMessage> {
@OptIn(FlowPreview::class)
private fun getKeepAliveFlow(context: WsContext): Flow<SubscriptionOperationMessage> {
val keepAliveInterval: Long? = 2000
if (keepAliveInterval != null) {
return Flux.interval(Duration.ofMillis(keepAliveInterval))
.map { keepAliveMessage }
.doOnSubscribe { sessionState.saveKeepAliveSubscription(context, it) }
return flowOf(keepAliveMessage).sample(keepAliveInterval)
.onStart {
sessionState.saveKeepAliveSubscription(context, currentCoroutineContext().job)
}
}
return Flux.empty()
return emptyFlow()
}
@Suppress("Detekt.TooGenericExceptionCaught")
private fun startSubscription(
operationMessage: SubscriptionOperationMessage,
context: WsContext
): Flux<SubscriptionOperationMessage> {
): Flow<SubscriptionOperationMessage> {
val graphQLContext = sessionState.getGraphQLContext(context)
if (operationMessage.id == null) {
logger.error("GraphQL subscription operation id is required")
return Flux.just(basicConnectionErrorMessage)
return flowOf(basicConnectionErrorMessage)
}
if (sessionState.doesOperationExist(context, operationMessage)) {
logger.info("Already subscribed to operation ${operationMessage.id} for session ${context.sessionId}")
return Flux.empty()
return emptyFlow()
}
val payload = operationMessage.payload
@@ -110,13 +119,12 @@ class ApolloSubscriptionProtocolHandler(
if (payload == null) {
logger.error("GraphQL subscription payload was null instead of a GraphQLRequest object")
sessionState.stopOperation(context, operationMessage)
return Flux.just(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id))
return flowOf(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id))
}
try {
val request = objectMapper.convertValue<GraphQLRequest>(payload)
return subscriptionHandler.executeSubscription(request, graphQLContext)
.asFlux()
.map {
if (it.errors?.isNotEmpty() == true) {
SubscriptionOperationMessage(type = GQL_ERROR.type, id = operationMessage.id, payload = it)
@@ -124,22 +132,22 @@ class ApolloSubscriptionProtocolHandler(
SubscriptionOperationMessage(type = GQL_DATA.type, id = operationMessage.id, payload = it)
}
}
.concatWith(onComplete(operationMessage, context).toFlux())
.doOnSubscribe { sessionState.saveOperation(context, operationMessage, it) }
.onCompletion { if (it == null) emitAll(onComplete(operationMessage, context)) }
.onStart { sessionState.saveOperation(context, operationMessage, currentCoroutineContext().job) }
} catch (exception: Exception) {
logger.error("Error running graphql subscription", exception)
// Do not terminate the session, just stop the operation messages
sessionState.stopOperation(context, operationMessage)
return Flux.just(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id))
return flowOf(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id))
}
}
private fun onInit(operationMessage: SubscriptionOperationMessage, context: WsContext): Flux<SubscriptionOperationMessage> {
private fun onInit(operationMessage: SubscriptionOperationMessage, context: WsContext): Flow<SubscriptionOperationMessage> {
saveContext(operationMessage, context)
val acknowledgeMessage = Mono.just(acknowledgeMessage)
val keepAliveFlux = getKeepAliveFlux(context)
return acknowledgeMessage.concatWith(keepAliveFlux)
.onErrorReturn(getConnectionErrorMessage(operationMessage))
val acknowledgeMessage = flowOf(acknowledgeMessage)
val keepAliveFlux = getKeepAliveFlow(context)
return acknowledgeMessage.onCompletion { if (it == null) emitAll(keepAliveFlux) }
.catch { emit(getConnectionErrorMessage(operationMessage)) }
}
/**
@@ -158,7 +166,7 @@ class ApolloSubscriptionProtocolHandler(
private fun onComplete(
operationMessage: SubscriptionOperationMessage,
context: WsContext
): Mono<SubscriptionOperationMessage> {
): Flow<SubscriptionOperationMessage> {
return sessionState.completeOperation(context, operationMessage)
}
@@ -168,24 +176,24 @@ class ApolloSubscriptionProtocolHandler(
private fun onStop(
operationMessage: SubscriptionOperationMessage,
context: WsContext
): Flux<SubscriptionOperationMessage> {
return sessionState.stopOperation(context, operationMessage).toFlux()
): Flow<SubscriptionOperationMessage> {
return sessionState.stopOperation(context, operationMessage)
}
private fun onDisconnect(context: WsContext): Flux<SubscriptionOperationMessage> {
private fun onDisconnect(context: WsContext): Flow<SubscriptionOperationMessage> {
sessionState.terminateSession(context)
return Flux.empty()
return emptyFlow()
}
private fun onUnknownOperation(operationMessage: SubscriptionOperationMessage, context: WsContext): Flux<SubscriptionOperationMessage> {
private fun onUnknownOperation(operationMessage: SubscriptionOperationMessage, context: WsContext): Flow<SubscriptionOperationMessage> {
logger.error("Unknown subscription operation $operationMessage")
sessionState.stopOperation(context, operationMessage)
return Flux.just(getConnectionErrorMessage(operationMessage))
return flowOf(getConnectionErrorMessage(operationMessage))
}
private fun onException(exception: Exception): Flux<SubscriptionOperationMessage> {
private fun onException(exception: Exception): Flow<SubscriptionOperationMessage> {
logger.error("Error parsing the subscription message", exception)
return Flux.just(basicConnectionErrorMessage)
return flowOf(basicConnectionErrorMessage)
}
private fun getConnectionErrorMessage(operationMessage: SubscriptionOperationMessage): SubscriptionOperationMessage {
@@ -9,8 +9,11 @@ package suwayomi.tachidesk.graphql.server.subscriptions
import graphql.GraphQLContext
import io.javalin.websocket.WsContext
import org.reactivestreams.Subscription
import reactor.core.publisher.Mono
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onCompletion
import suwayomi.tachidesk.graphql.server.subscriptions.SubscriptionOperationMessage.ServerMessages.GQL_COMPLETE
import suwayomi.tachidesk.graphql.server.toGraphQLContext
import java.util.concurrent.ConcurrentHashMap
@@ -18,10 +21,10 @@ import java.util.concurrent.ConcurrentHashMap
internal class ApolloSubscriptionSessionState {
// Sessions are saved by web socket session id
internal val activeKeepAliveSessions = ConcurrentHashMap<String, Subscription>()
internal val activeKeepAliveSessions = ConcurrentHashMap<String, Job>()
// Operations are saved by web socket session id, then operation id
internal val activeOperations = ConcurrentHashMap<String, ConcurrentHashMap<String, Subscription>>()
internal val activeOperations = ConcurrentHashMap<String, ConcurrentHashMap<String, Job>>()
// The graphQL context is saved by web socket session id
private val cachedGraphQLContext = ConcurrentHashMap<String, GraphQLContext>()
@@ -45,7 +48,7 @@ internal class ApolloSubscriptionSessionState {
* This will override values without cancelling the subscription, so it is the responsibility of the consumer to cancel.
* These messages will be stopped on [terminateSession].
*/
fun saveKeepAliveSubscription(context: WsContext, subscription: Subscription) {
fun saveKeepAliveSubscription(context: WsContext, subscription: Job) {
activeKeepAliveSessions[context.sessionId] = subscription
}
@@ -54,10 +57,10 @@ internal class ApolloSubscriptionSessionState {
* This will override values without cancelling the subscription so it is the responsibility of the consumer to cancel.
* These messages will be stopped on [stopOperation].
*/
fun saveOperation(context: WsContext, operationMessage: SubscriptionOperationMessage, subscription: Subscription) {
fun saveOperation(context: WsContext, operationMessage: SubscriptionOperationMessage, subscription: Job) {
val id = operationMessage.id
if (id != null) {
val operationsForSession: ConcurrentHashMap<String, Subscription> = activeOperations.getOrPut(context.sessionId) { ConcurrentHashMap() }
val operationsForSession: ConcurrentHashMap<String, Job> = activeOperations.getOrPut(context.sessionId) { ConcurrentHashMap() }
operationsForSession[id] = subscription
}
}
@@ -66,26 +69,26 @@ internal class ApolloSubscriptionSessionState {
* Send the [GQL_COMPLETE] message.
* This can happen when the publisher finishes or if the client manually sends the stop message.
*/
fun completeOperation(context: WsContext, operationMessage: SubscriptionOperationMessage): Mono<SubscriptionOperationMessage> {
fun completeOperation(context: WsContext, operationMessage: SubscriptionOperationMessage): Flow<SubscriptionOperationMessage> {
return getCompleteMessage(operationMessage)
.doFinally { removeActiveOperation(context, operationMessage.id, cancelSubscription = false) }
.onCompletion { removeActiveOperation(context, operationMessage.id, cancelSubscription = false) }
}
/**
* Stop the subscription sending data and send the [GQL_COMPLETE] message.
* Does NOT terminate the session.
*/
fun stopOperation(context: WsContext, operationMessage: SubscriptionOperationMessage): Mono<SubscriptionOperationMessage> {
fun stopOperation(context: WsContext, operationMessage: SubscriptionOperationMessage): Flow<SubscriptionOperationMessage> {
return getCompleteMessage(operationMessage)
.doFinally { removeActiveOperation(context, operationMessage.id, cancelSubscription = true) }
.onCompletion { removeActiveOperation(context, operationMessage.id, cancelSubscription = true) }
}
private fun getCompleteMessage(operationMessage: SubscriptionOperationMessage): Mono<SubscriptionOperationMessage> {
private fun getCompleteMessage(operationMessage: SubscriptionOperationMessage): Flow<SubscriptionOperationMessage> {
val id = operationMessage.id
if (id != null) {
return Mono.just(SubscriptionOperationMessage(type = GQL_COMPLETE.type, id = id))
return flowOf(SubscriptionOperationMessage(type = GQL_COMPLETE.type, id = id))
}
return Mono.empty()
return emptyFlow()
}
/**
@@ -7,14 +7,14 @@
package suwayomi.tachidesk.graphql.server.subscriptions
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
class FluxSubscriptionSource<T : Any>() {
private var sink: FluxSink<T>? = null
val emitter: Flux<T> = Flux.create<T> { emitter -> sink = emitter }
class FlowSubscriptionSource<T : Any> {
private val mutableSharedFlow = MutableSharedFlow<T>()
val emitter = mutableSharedFlow.asSharedFlow()
fun publish(value: T) {
sink?.next(value)
mutableSharedFlow.tryEmit(value)
}
}
@@ -8,15 +8,16 @@
package suwayomi.tachidesk.graphql.subscriptions
import graphql.schema.DataFetchingEnvironment
import reactor.core.publisher.Flux
import suwayomi.tachidesk.graphql.server.subscriptions.FluxSubscriptionSource
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import suwayomi.tachidesk.graphql.server.subscriptions.FlowSubscriptionSource
import suwayomi.tachidesk.graphql.types.DownloadType
import suwayomi.tachidesk.manga.impl.download.model.DownloadChapter
val downloadSubscriptionSource = FluxSubscriptionSource<DownloadChapter>()
val downloadSubscriptionSource = FlowSubscriptionSource<DownloadChapter>()
class DownloadSubscription {
fun downloadChanged(dataFetchingEnvironment: DataFetchingEnvironment): Flux<DownloadType> {
fun downloadChanged(dataFetchingEnvironment: DataFetchingEnvironment): Flow<DownloadType> {
return downloadSubscriptionSource.emitter.map { downloadChapter ->
DownloadType(downloadChapter)
}