From 06eff5521023b403183b13fc6b29daa6d14e2270 Mon Sep 17 00:00:00 2001 From: Mitchell Syer Date: Tue, 11 Oct 2022 12:27:15 -0400 Subject: [PATCH] Updater cleanup and improvements (#416) --- .../manga/controller/UpdateController.kt | 23 +++--- .../tachidesk/manga/impl/update/IUpdater.kt | 4 +- .../tachidesk/manga/impl/update/UpdateJob.kt | 10 +-- .../manga/impl/update/UpdateStatus.kt | 38 ++++------ .../tachidesk/manga/impl/update/Updater.kt | 74 ++++++++++--------- .../manga/impl/update/UpdaterSocket.kt | 24 +++--- .../tachidesk/manga/impl/update/Websocket.kt | 10 +-- .../manga/controller/UpdateControllerTest.kt | 6 +- .../manga/impl/update/TestUpdater.kt | 19 +++-- 9 files changed, 100 insertions(+), 108 deletions(-) diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/UpdateController.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/UpdateController.kt index 21864836..c6f09bb7 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/UpdateController.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/UpdateController.kt @@ -2,7 +2,6 @@ package suwayomi.tachidesk.manga.controller import io.javalin.http.HttpCode import io.javalin.websocket.WsConfig -import kotlinx.coroutines.runBlocking import mu.KotlinLogging import org.kodein.di.DI import org.kodein.di.conf.global @@ -15,6 +14,7 @@ import suwayomi.tachidesk.manga.impl.update.UpdateStatus import suwayomi.tachidesk.manga.impl.update.UpdaterSocket import suwayomi.tachidesk.manga.model.dataclass.CategoryDataClass import suwayomi.tachidesk.manga.model.dataclass.MangaChapterDataClass +import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass import suwayomi.tachidesk.manga.model.dataclass.PaginatedList import suwayomi.tachidesk.server.JavalinSetup.future import suwayomi.tachidesk.server.util.formParam @@ -68,22 +68,18 @@ object UpdateController { } }, behaviorOf = { ctx, categoryId -> - val categoriesForUpdate = ArrayList() if (categoryId == null) { logger.info { "Adding Library to Update Queue" } - categoriesForUpdate.addAll(Category.getCategoryList()) + addCategoriesToUpdateQueue(Category.getCategoryList(), true) } else { val category = Category.getCategoryById(categoryId) if (category != null) { - categoriesForUpdate.add(category) + addCategoriesToUpdateQueue(listOf(category), true) } else { logger.info { "No Category found" } ctx.status(HttpCode.BAD_REQUEST) - return@handler } } - addCategoriesToUpdateQueue(categoriesForUpdate, true) - ctx.status(HttpCode.OK) }, withResults = { httpCode(HttpCode.OK) @@ -94,14 +90,15 @@ object UpdateController { private fun addCategoriesToUpdateQueue(categories: List, clear: Boolean = false) { val updater by DI.global.instance() if (clear) { - runBlocking { updater.reset() } + updater.reset() } - categories.forEach { category -> - val mangas = CategoryManga.getCategoryMangaList(category.id) - mangas.forEach { manga -> + categories + .flatMap { CategoryManga.getCategoryMangaList(it.id) } + .distinctBy { it.id } + .sortedWith(compareBy(String.CASE_INSENSITIVE_ORDER, MangaDataClass::title)) + .forEach { manga -> updater.addMangaToQueue(manga) } - } } fun categoryUpdateWS(ws: WsConfig) { @@ -125,7 +122,7 @@ object UpdateController { }, behaviorOf = { ctx -> val updater by DI.global.instance() - ctx.json(updater.getStatus().value.getJsonSummary()) + ctx.json(updater.status.value) }, withResults = { json(HttpCode.OK) diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/IUpdater.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/IUpdater.kt index 0b18e30b..33c20bf8 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/IUpdater.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/IUpdater.kt @@ -5,6 +5,6 @@ import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass interface IUpdater { fun addMangaToQueue(manga: MangaDataClass) - fun getStatus(): StateFlow - suspend fun reset(): Unit + val status: StateFlow + fun reset() } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdateJob.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdateJob.kt index b5463ad6..928fb98f 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdateJob.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdateJob.kt @@ -9,9 +9,7 @@ enum class JobStatus { FAILED } -class UpdateJob(val manga: MangaDataClass, var status: JobStatus = JobStatus.PENDING) { - - override fun toString(): String { - return "UpdateJob(status=$status, manga=${manga.title})" - } -} +data class UpdateJob( + val manga: MangaDataClass, + val status: JobStatus = JobStatus.PENDING +) diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdateStatus.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdateStatus.kt index 3dd78cff..7f0a0999 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdateStatus.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdateStatus.kt @@ -1,33 +1,23 @@ package suwayomi.tachidesk.manga.impl.update +import com.fasterxml.jackson.annotation.JsonIgnore import mu.KotlinLogging import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass -var logger = KotlinLogging.logger {} -class UpdateStatus( - var statusMap: MutableMap> = mutableMapOf>(), - var running: Boolean = false, +val logger = KotlinLogging.logger {} +data class UpdateStatus( + val statusMap: Map> = emptyMap(), + val running: Boolean = false, + @JsonIgnore + val numberOfJobs: Int = 0 ) { - var numberOfJobs: Int = 0 constructor(jobs: List, running: Boolean) : this( - mutableMapOf>(), - running - ) { - this.numberOfJobs = jobs.size - jobs.forEach { - val list = statusMap.getOrDefault(it.status, mutableListOf()) - list.add(it.manga) - statusMap[it.status] = list - } - } - - override fun toString(): String { - return "UpdateStatus(statusMap=${statusMap.map { "${it.key} : ${it.value.size}" }.joinToString("; ")}, running=$running)" - } - - // serialize to summary json - fun getJsonSummary(): String { - return """{"statusMap":{${statusMap.map { "\"${it.key}\" : ${it.value.size}" }.joinToString(",")}}, "running":$running}""" - } + statusMap = jobs.groupBy { it.status } + .mapValues { entry -> + entry.value.map { it.manga } + }, + running = running, + numberOfJobs = jobs.size + ) } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt index 35f51e2c..6def341d 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt @@ -3,74 +3,76 @@ package suwayomi.tachidesk.manga.impl.update import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.cancel +import kotlinx.coroutines.cancelChildren import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.update import kotlinx.coroutines.launch import mu.KotlinLogging import suwayomi.tachidesk.manga.impl.Chapter import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass +import java.util.concurrent.ConcurrentHashMap class Updater : IUpdater { private val logger = KotlinLogging.logger {} private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) - private var tracker = mutableMapOf() - private var updateChannel = Channel() - private val statusChannel = MutableStateFlow(UpdateStatus()) - private var updateJob: Job? = null + private val _status = MutableStateFlow(UpdateStatus()) + override val status = _status.asStateFlow() - init { - updateJob = createUpdateJob() - } + private val tracker = ConcurrentHashMap() + private var updateChannel = createUpdateChannel() - private fun createUpdateJob(): Job { - return scope.launch { - while (true) { - val job = updateChannel.receive() - process(job) - statusChannel.value = UpdateStatus(tracker.values.toList(), !updateChannel.isEmpty) + private fun createUpdateChannel(): Channel { + val channel = Channel(Channel.UNLIMITED) + channel.consumeAsFlow() + .onEach { job -> + _status.value = UpdateStatus( + process(job), + tracker.any { (_, job) -> + job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING + } + ) } - } + .catch { logger.error(it) { "Error during updates" } } + .launchIn(scope) + return channel } - private suspend fun process(job: UpdateJob) { - job.status = JobStatus.RUNNING - tracker["${job.manga.id}"] = job - statusChannel.value = UpdateStatus(tracker.values.toList(), true) - try { + private suspend fun process(job: UpdateJob): List { + tracker[job.manga.id] = job.copy(status = JobStatus.RUNNING) + _status.update { UpdateStatus(tracker.values.toList(), true) } + tracker[job.manga.id] = try { logger.info { "Updating ${job.manga.title}" } Chapter.getChapterList(job.manga.id, true) - job.status = JobStatus.COMPLETE + job.copy(status = JobStatus.COMPLETE) } catch (e: Exception) { if (e is CancellationException) throw e logger.error(e) { "Error while updating ${job.manga.title}" } - job.status = JobStatus.FAILED + job.copy(status = JobStatus.FAILED) } - tracker["${job.manga.id}"] = job + return tracker.values.toList() } override fun addMangaToQueue(manga: MangaDataClass) { scope.launch { updateChannel.send(UpdateJob(manga)) } - tracker["${manga.id}"] = UpdateJob(manga) - statusChannel.value = UpdateStatus(tracker.values.toList(), true) + tracker[manga.id] = UpdateJob(manga) + _status.update { UpdateStatus(tracker.values.toList(), true) } } - override fun getStatus(): StateFlow { - return statusChannel - } - - override suspend fun reset() { + override fun reset() { + scope.coroutineContext.cancelChildren() tracker.clear() + _status.update { UpdateStatus() } updateChannel.cancel() - statusChannel.value = UpdateStatus() - updateJob?.cancel("Reset") - updateChannel = Channel() - updateJob = createUpdateJob() + updateChannel = createUpdateChannel() } } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdaterSocket.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdaterSocket.kt index 38e692a0..69d901c5 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdaterSocket.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdaterSocket.kt @@ -6,26 +6,26 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.flow.collectLatest -import kotlinx.coroutines.launch +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach import mu.KotlinLogging import org.kodein.di.DI import org.kodein.di.conf.global import org.kodein.di.instance -object UpdaterSocket : Websocket() { +object UpdaterSocket : Websocket() { private val logger = KotlinLogging.logger {} private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) private val updater by DI.global.instance() private var job: Job? = null - override fun notifyClient(ctx: WsContext) { - ctx.send(updater.getStatus().value.getJsonSummary()) + override fun notifyClient(ctx: WsContext, value: UpdateStatus?) { + ctx.send(value ?: updater.status.value) } override fun handleRequest(ctx: WsMessageContext) { when (ctx.message()) { - "STATUS" -> notifyClient(ctx) + "STATUS" -> notifyClient(ctx, updater.status.value) else -> ctx.send( """ |Invalid command. @@ -40,7 +40,7 @@ object UpdaterSocket : Websocket() { override fun addClient(ctx: WsContext) { logger.info { ctx.sessionId } super.addClient(ctx) - if (job == null) { + if (job?.isActive != true) { job = start() } } @@ -54,12 +54,10 @@ object UpdaterSocket : Websocket() { } fun start(): Job { - return scope.launch { - while (true) { - updater.getStatus().collectLatest { - notifyAllClients() - } + return updater.status + .onEach { + notifyAllClients(it) } - } + .launchIn(scope) } } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Websocket.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Websocket.kt index e83675dd..2687de46 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Websocket.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Websocket.kt @@ -4,18 +4,18 @@ import io.javalin.websocket.WsContext import io.javalin.websocket.WsMessageContext import java.util.concurrent.ConcurrentHashMap -abstract class Websocket { +abstract class Websocket { protected val clients = ConcurrentHashMap() open fun addClient(ctx: WsContext) { clients[ctx.sessionId] = ctx - notifyClient(ctx) + notifyClient(ctx, null) } open fun removeClient(ctx: WsContext) { clients.remove(ctx.sessionId) } - open fun notifyAllClients() { - clients.values.forEach { notifyClient(it) } + open fun notifyAllClients(value: T) { + clients.values.forEach { notifyClient(it, value) } } - abstract fun notifyClient(ctx: WsContext) + abstract fun notifyClient(ctx: WsContext, value: T?) abstract fun handleRequest(ctx: WsMessageContext) } diff --git a/server/src/test/kotlin/suwayomi/tachidesk/manga/controller/UpdateControllerTest.kt b/server/src/test/kotlin/suwayomi/tachidesk/manga/controller/UpdateControllerTest.kt index cce77e7a..6f5ab8d6 100644 --- a/server/src/test/kotlin/suwayomi/tachidesk/manga/controller/UpdateControllerTest.kt +++ b/server/src/test/kotlin/suwayomi/tachidesk/manga/controller/UpdateControllerTest.kt @@ -32,7 +32,7 @@ internal class UpdateControllerTest : ApplicationTest() { UpdateController.categoryUpdate(ctx) verify { ctx.status(HttpCode.BAD_REQUEST) } val updater by DI.global.instance() - assertEquals(0, updater.getStatus().value.numberOfJobs) + assertEquals(0, updater.status.value.numberOfJobs) } @Test @@ -44,7 +44,7 @@ internal class UpdateControllerTest : ApplicationTest() { UpdateController.categoryUpdate(ctx) verify { ctx.status(HttpCode.OK) } val updater by DI.global.instance() - assertEquals(1, updater.getStatus().value.numberOfJobs) + assertEquals(1, updater.status.value.numberOfJobs) } @Test @@ -60,7 +60,7 @@ internal class UpdateControllerTest : ApplicationTest() { UpdateController.categoryUpdate(ctx) verify { ctx.status(HttpCode.OK) } val updater by DI.global.instance() - assertEquals(3, updater.getStatus().value.numberOfJobs) + assertEquals(3, updater.status.value.numberOfJobs) } private fun createLibraryManga( diff --git a/server/src/test/kotlin/suwayomi/tachidesk/manga/impl/update/TestUpdater.kt b/server/src/test/kotlin/suwayomi/tachidesk/manga/impl/update/TestUpdater.kt index dbff40c4..b4c7f8a1 100644 --- a/server/src/test/kotlin/suwayomi/tachidesk/manga/impl/update/TestUpdater.kt +++ b/server/src/test/kotlin/suwayomi/tachidesk/manga/impl/update/TestUpdater.kt @@ -2,23 +2,30 @@ package suwayomi.tachidesk.manga.impl.update import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.update import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass +import java.util.concurrent.CopyOnWriteArrayList class TestUpdater : IUpdater { - private val updateQueue = ArrayList() + private val updateQueue = CopyOnWriteArrayList() private var isRunning = false + private val _status = MutableStateFlow(UpdateStatus()) + override val status: StateFlow = _status.asStateFlow() override fun addMangaToQueue(manga: MangaDataClass) { updateQueue.add(UpdateJob(manga)) isRunning = true + updateStatus() } - override fun getStatus(): StateFlow { - return MutableStateFlow(UpdateStatus(updateQueue, isRunning)) - } - - override suspend fun reset() { + override fun reset() { updateQueue.clear() isRunning = false + updateStatus() + } + + private fun updateStatus() { + _status.update { UpdateStatus(updateQueue.toList(), isRunning) } } }