Updater cleanup and improvements (#416)

This commit is contained in:
Mitchell Syer
2022-10-11 12:27:15 -04:00
committed by GitHub
parent 71730fddad
commit 06eff55210
9 changed files with 100 additions and 108 deletions
@@ -2,7 +2,6 @@ package suwayomi.tachidesk.manga.controller
import io.javalin.http.HttpCode
import io.javalin.websocket.WsConfig
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import org.kodein.di.DI
import org.kodein.di.conf.global
@@ -15,6 +14,7 @@ import suwayomi.tachidesk.manga.impl.update.UpdateStatus
import suwayomi.tachidesk.manga.impl.update.UpdaterSocket
import suwayomi.tachidesk.manga.model.dataclass.CategoryDataClass
import suwayomi.tachidesk.manga.model.dataclass.MangaChapterDataClass
import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass
import suwayomi.tachidesk.manga.model.dataclass.PaginatedList
import suwayomi.tachidesk.server.JavalinSetup.future
import suwayomi.tachidesk.server.util.formParam
@@ -68,22 +68,18 @@ object UpdateController {
}
},
behaviorOf = { ctx, categoryId ->
val categoriesForUpdate = ArrayList<CategoryDataClass>()
if (categoryId == null) {
logger.info { "Adding Library to Update Queue" }
categoriesForUpdate.addAll(Category.getCategoryList())
addCategoriesToUpdateQueue(Category.getCategoryList(), true)
} else {
val category = Category.getCategoryById(categoryId)
if (category != null) {
categoriesForUpdate.add(category)
addCategoriesToUpdateQueue(listOf(category), true)
} else {
logger.info { "No Category found" }
ctx.status(HttpCode.BAD_REQUEST)
return@handler
}
}
addCategoriesToUpdateQueue(categoriesForUpdate, true)
ctx.status(HttpCode.OK)
},
withResults = {
httpCode(HttpCode.OK)
@@ -94,14 +90,15 @@ object UpdateController {
private fun addCategoriesToUpdateQueue(categories: List<CategoryDataClass>, clear: Boolean = false) {
val updater by DI.global.instance<IUpdater>()
if (clear) {
runBlocking { updater.reset() }
updater.reset()
}
categories.forEach { category ->
val mangas = CategoryManga.getCategoryMangaList(category.id)
mangas.forEach { manga ->
categories
.flatMap { CategoryManga.getCategoryMangaList(it.id) }
.distinctBy { it.id }
.sortedWith(compareBy(String.CASE_INSENSITIVE_ORDER, MangaDataClass::title))
.forEach { manga ->
updater.addMangaToQueue(manga)
}
}
}
fun categoryUpdateWS(ws: WsConfig) {
@@ -125,7 +122,7 @@ object UpdateController {
},
behaviorOf = { ctx ->
val updater by DI.global.instance<IUpdater>()
ctx.json(updater.getStatus().value.getJsonSummary())
ctx.json(updater.status.value)
},
withResults = {
json<UpdateStatus>(HttpCode.OK)
@@ -5,6 +5,6 @@ import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass
interface IUpdater {
fun addMangaToQueue(manga: MangaDataClass)
fun getStatus(): StateFlow<UpdateStatus>
suspend fun reset(): Unit
val status: StateFlow<UpdateStatus>
fun reset()
}
@@ -9,9 +9,7 @@ enum class JobStatus {
FAILED
}
class UpdateJob(val manga: MangaDataClass, var status: JobStatus = JobStatus.PENDING) {
override fun toString(): String {
return "UpdateJob(status=$status, manga=${manga.title})"
}
}
data class UpdateJob(
val manga: MangaDataClass,
val status: JobStatus = JobStatus.PENDING
)
@@ -1,33 +1,23 @@
package suwayomi.tachidesk.manga.impl.update
import com.fasterxml.jackson.annotation.JsonIgnore
import mu.KotlinLogging
import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass
var logger = KotlinLogging.logger {}
class UpdateStatus(
var statusMap: MutableMap<JobStatus, MutableList<MangaDataClass>> = mutableMapOf<JobStatus, MutableList<MangaDataClass>>(),
var running: Boolean = false,
val logger = KotlinLogging.logger {}
data class UpdateStatus(
val statusMap: Map<JobStatus, List<MangaDataClass>> = emptyMap(),
val running: Boolean = false,
@JsonIgnore
val numberOfJobs: Int = 0
) {
var numberOfJobs: Int = 0
constructor(jobs: List<UpdateJob>, running: Boolean) : this(
mutableMapOf<JobStatus, MutableList<MangaDataClass>>(),
running
) {
this.numberOfJobs = jobs.size
jobs.forEach {
val list = statusMap.getOrDefault(it.status, mutableListOf())
list.add(it.manga)
statusMap[it.status] = list
}
}
override fun toString(): String {
return "UpdateStatus(statusMap=${statusMap.map { "${it.key} : ${it.value.size}" }.joinToString("; ")}, running=$running)"
}
// serialize to summary json
fun getJsonSummary(): String {
return """{"statusMap":{${statusMap.map { "\"${it.key}\" : ${it.value.size}" }.joinToString(",")}}, "running":$running}"""
}
statusMap = jobs.groupBy { it.status }
.mapValues { entry ->
entry.value.map { it.manga }
},
running = running,
numberOfJobs = jobs.size
)
}
@@ -3,74 +3,76 @@ package suwayomi.tachidesk.manga.impl.update
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import mu.KotlinLogging
import suwayomi.tachidesk.manga.impl.Chapter
import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass
import java.util.concurrent.ConcurrentHashMap
class Updater : IUpdater {
private val logger = KotlinLogging.logger {}
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private var tracker = mutableMapOf<String, UpdateJob>()
private var updateChannel = Channel<UpdateJob>()
private val statusChannel = MutableStateFlow(UpdateStatus())
private var updateJob: Job? = null
private val _status = MutableStateFlow(UpdateStatus())
override val status = _status.asStateFlow()
init {
updateJob = createUpdateJob()
}
private val tracker = ConcurrentHashMap<Int, UpdateJob>()
private var updateChannel = createUpdateChannel()
private fun createUpdateJob(): Job {
return scope.launch {
while (true) {
val job = updateChannel.receive()
process(job)
statusChannel.value = UpdateStatus(tracker.values.toList(), !updateChannel.isEmpty)
private fun createUpdateChannel(): Channel<UpdateJob> {
val channel = Channel<UpdateJob>(Channel.UNLIMITED)
channel.consumeAsFlow()
.onEach { job ->
_status.value = UpdateStatus(
process(job),
tracker.any { (_, job) ->
job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING
}
)
}
}
.catch { logger.error(it) { "Error during updates" } }
.launchIn(scope)
return channel
}
private suspend fun process(job: UpdateJob) {
job.status = JobStatus.RUNNING
tracker["${job.manga.id}"] = job
statusChannel.value = UpdateStatus(tracker.values.toList(), true)
try {
private suspend fun process(job: UpdateJob): List<UpdateJob> {
tracker[job.manga.id] = job.copy(status = JobStatus.RUNNING)
_status.update { UpdateStatus(tracker.values.toList(), true) }
tracker[job.manga.id] = try {
logger.info { "Updating ${job.manga.title}" }
Chapter.getChapterList(job.manga.id, true)
job.status = JobStatus.COMPLETE
job.copy(status = JobStatus.COMPLETE)
} catch (e: Exception) {
if (e is CancellationException) throw e
logger.error(e) { "Error while updating ${job.manga.title}" }
job.status = JobStatus.FAILED
job.copy(status = JobStatus.FAILED)
}
tracker["${job.manga.id}"] = job
return tracker.values.toList()
}
override fun addMangaToQueue(manga: MangaDataClass) {
scope.launch {
updateChannel.send(UpdateJob(manga))
}
tracker["${manga.id}"] = UpdateJob(manga)
statusChannel.value = UpdateStatus(tracker.values.toList(), true)
tracker[manga.id] = UpdateJob(manga)
_status.update { UpdateStatus(tracker.values.toList(), true) }
}
override fun getStatus(): StateFlow<UpdateStatus> {
return statusChannel
}
override suspend fun reset() {
override fun reset() {
scope.coroutineContext.cancelChildren()
tracker.clear()
_status.update { UpdateStatus() }
updateChannel.cancel()
statusChannel.value = UpdateStatus()
updateJob?.cancel("Reset")
updateChannel = Channel()
updateJob = createUpdateJob()
updateChannel = createUpdateChannel()
}
}
@@ -6,26 +6,26 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.launch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import mu.KotlinLogging
import org.kodein.di.DI
import org.kodein.di.conf.global
import org.kodein.di.instance
object UpdaterSocket : Websocket() {
object UpdaterSocket : Websocket<UpdateStatus>() {
private val logger = KotlinLogging.logger {}
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private val updater by DI.global.instance<IUpdater>()
private var job: Job? = null
override fun notifyClient(ctx: WsContext) {
ctx.send(updater.getStatus().value.getJsonSummary())
override fun notifyClient(ctx: WsContext, value: UpdateStatus?) {
ctx.send(value ?: updater.status.value)
}
override fun handleRequest(ctx: WsMessageContext) {
when (ctx.message()) {
"STATUS" -> notifyClient(ctx)
"STATUS" -> notifyClient(ctx, updater.status.value)
else -> ctx.send(
"""
|Invalid command.
@@ -40,7 +40,7 @@ object UpdaterSocket : Websocket() {
override fun addClient(ctx: WsContext) {
logger.info { ctx.sessionId }
super.addClient(ctx)
if (job == null) {
if (job?.isActive != true) {
job = start()
}
}
@@ -54,12 +54,10 @@ object UpdaterSocket : Websocket() {
}
fun start(): Job {
return scope.launch {
while (true) {
updater.getStatus().collectLatest {
notifyAllClients()
}
return updater.status
.onEach {
notifyAllClients(it)
}
}
.launchIn(scope)
}
}
@@ -4,18 +4,18 @@ import io.javalin.websocket.WsContext
import io.javalin.websocket.WsMessageContext
import java.util.concurrent.ConcurrentHashMap
abstract class Websocket {
abstract class Websocket<T> {
protected val clients = ConcurrentHashMap<String, WsContext>()
open fun addClient(ctx: WsContext) {
clients[ctx.sessionId] = ctx
notifyClient(ctx)
notifyClient(ctx, null)
}
open fun removeClient(ctx: WsContext) {
clients.remove(ctx.sessionId)
}
open fun notifyAllClients() {
clients.values.forEach { notifyClient(it) }
open fun notifyAllClients(value: T) {
clients.values.forEach { notifyClient(it, value) }
}
abstract fun notifyClient(ctx: WsContext)
abstract fun notifyClient(ctx: WsContext, value: T?)
abstract fun handleRequest(ctx: WsMessageContext)
}
@@ -32,7 +32,7 @@ internal class UpdateControllerTest : ApplicationTest() {
UpdateController.categoryUpdate(ctx)
verify { ctx.status(HttpCode.BAD_REQUEST) }
val updater by DI.global.instance<IUpdater>()
assertEquals(0, updater.getStatus().value.numberOfJobs)
assertEquals(0, updater.status.value.numberOfJobs)
}
@Test
@@ -44,7 +44,7 @@ internal class UpdateControllerTest : ApplicationTest() {
UpdateController.categoryUpdate(ctx)
verify { ctx.status(HttpCode.OK) }
val updater by DI.global.instance<IUpdater>()
assertEquals(1, updater.getStatus().value.numberOfJobs)
assertEquals(1, updater.status.value.numberOfJobs)
}
@Test
@@ -60,7 +60,7 @@ internal class UpdateControllerTest : ApplicationTest() {
UpdateController.categoryUpdate(ctx)
verify { ctx.status(HttpCode.OK) }
val updater by DI.global.instance<IUpdater>()
assertEquals(3, updater.getStatus().value.numberOfJobs)
assertEquals(3, updater.status.value.numberOfJobs)
}
private fun createLibraryManga(
@@ -2,23 +2,30 @@ package suwayomi.tachidesk.manga.impl.update
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.update
import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass
import java.util.concurrent.CopyOnWriteArrayList
class TestUpdater : IUpdater {
private val updateQueue = ArrayList<UpdateJob>()
private val updateQueue = CopyOnWriteArrayList<UpdateJob>()
private var isRunning = false
private val _status = MutableStateFlow(UpdateStatus())
override val status: StateFlow<UpdateStatus> = _status.asStateFlow()
override fun addMangaToQueue(manga: MangaDataClass) {
updateQueue.add(UpdateJob(manga))
isRunning = true
updateStatus()
}
override fun getStatus(): StateFlow<UpdateStatus> {
return MutableStateFlow(UpdateStatus(updateQueue, isRunning))
}
override suspend fun reset() {
override fun reset() {
updateQueue.clear()
isRunning = false
updateStatus()
}
private fun updateStatus() {
_status.update { UpdateStatus(updateQueue.toList(), isRunning) }
}
}