Implement Update of Library/Category (#235)

* Implement Update Controller tests

* Basic Threading and notify

* WIP

* Reworked using coroutines

* Use Map for JobSummary Tracking

* Change Tests

* Clean up

* Changes based on review

* Rethrow cancellationexception

* Clean up

* Fix Merge Error

* Actually handle messages

* Clean up

* Remove useless annotation
This commit is contained in:
Sascha Hahne
2021-11-10 20:08:41 +01:00
committed by GitHub
parent 14e02bee6c
commit 2cb2ded2d9
14 changed files with 419 additions and 0 deletions
@@ -113,6 +113,9 @@ object MangaAPI {
path("update") {
get("recentChapters/{pageNum}", UpdateController::recentChapters)
post("fetch", UpdateController::categoryUpdate)
get("summary", UpdateController::updateSummary)
ws("", UpdateController::categoryUpdateWS)
}
}
}
@@ -1,7 +1,19 @@
package suwayomi.tachidesk.manga.controller
import io.javalin.http.Context
import io.javalin.http.HttpCode
import io.javalin.websocket.WsConfig
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import org.kodein.di.DI
import org.kodein.di.conf.global
import org.kodein.di.instance
import suwayomi.tachidesk.manga.impl.Category
import suwayomi.tachidesk.manga.impl.CategoryManga
import suwayomi.tachidesk.manga.impl.Chapter
import suwayomi.tachidesk.manga.impl.update.IUpdater
import suwayomi.tachidesk.manga.impl.update.UpdaterSocket
import suwayomi.tachidesk.manga.model.dataclass.CategoryDataClass
import suwayomi.tachidesk.server.JavalinSetup.future
/*
@@ -12,6 +24,8 @@ import suwayomi.tachidesk.server.JavalinSetup.future
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
object UpdateController {
private val logger = KotlinLogging.logger { }
/** get recently updated manga chapters */
fun recentChapters(ctx: Context) {
val pageNum = ctx.pathParam("pageNum").toInt()
@@ -22,4 +36,54 @@ object UpdateController {
}
)
}
fun categoryUpdate(ctx: Context) {
val categoryId = ctx.formParam("category")?.toIntOrNull()
val categoriesForUpdate = ArrayList<CategoryDataClass>()
if (categoryId == null) {
logger.info { "Adding Library to Update Queue" }
categoriesForUpdate.addAll(Category.getCategoryList())
} else {
val category = Category.getCategoryById(categoryId)
if (category != null) {
categoriesForUpdate.add(category)
} else {
logger.info { "No Category found" }
ctx.status(HttpCode.BAD_REQUEST)
return
}
}
addCategoriesToUpdateQueue(categoriesForUpdate, true)
ctx.status(HttpCode.OK)
}
private fun addCategoriesToUpdateQueue(categories: List<CategoryDataClass>, clear: Boolean = false) {
val updater by DI.global.instance<IUpdater>()
if (clear) {
runBlocking { updater.reset() }
}
categories.forEach { category ->
val mangas = CategoryManga.getCategoryMangaList(category.id)
mangas.forEach { manga ->
updater.addMangaToQueue(manga)
}
}
}
fun categoryUpdateWS(ws: WsConfig) {
ws.onConnect { ctx ->
UpdaterSocket.addClient(ctx)
}
ws.onMessage { ctx ->
UpdaterSocket.handleRequest(ctx)
}
ws.onClose { ctx ->
UpdaterSocket.removeClient(ctx)
}
}
fun updateSummary(ctx: Context) {
val updater by DI.global.instance<IUpdater>()
ctx.json(updater.getStatus().value.getJsonSummary())
}
}
@@ -109,4 +109,12 @@ object Category {
addDefaultIfNecessary(categories)
}
}
fun getCategoryById(categoryId: Int): CategoryDataClass? {
return transaction {
CategoryTable.select { CategoryTable.id eq categoryId }.firstOrNull()?.let {
CategoryTable.toDataClass(it)
}
}
}
}
@@ -0,0 +1,10 @@
package suwayomi.tachidesk.manga.impl.update
import kotlinx.coroutines.flow.StateFlow
import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass
interface IUpdater {
fun addMangaToQueue(manga: MangaDataClass)
fun getStatus(): StateFlow<UpdateStatus>
suspend fun reset(): Unit
}
@@ -0,0 +1,17 @@
package suwayomi.tachidesk.manga.impl.update
import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass
enum class JobStatus {
PENDING,
RUNNING,
COMPLETE,
FAILED
}
class UpdateJob(val manga: MangaDataClass, var status: JobStatus = JobStatus.PENDING) {
override fun toString(): String {
return "UpdateJob(status=$status, manga=${manga.title})"
}
}
@@ -0,0 +1,33 @@
package suwayomi.tachidesk.manga.impl.update
import mu.KotlinLogging
import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass
var logger = KotlinLogging.logger {}
class UpdateStatus(
var statusMap: MutableMap<JobStatus, MutableList<MangaDataClass>> = mutableMapOf<JobStatus, MutableList<MangaDataClass>>(),
var running: Boolean = false,
) {
var numberOfJobs: Int = 0
constructor(jobs: List<UpdateJob>, running: Boolean) : this(
mutableMapOf<JobStatus, MutableList<MangaDataClass>>(),
running
) {
this.numberOfJobs = jobs.size
jobs.forEach {
val list = statusMap.getOrDefault(it.status, mutableListOf())
list.add(it.manga)
statusMap[it.status] = list
}
}
override fun toString(): String {
return "UpdateStatus(statusMap=${statusMap.map { "${it.key} : ${it.value.size}" }.joinToString("; ")}, running=$running)"
}
// serialize to summary json
fun getJsonSummary(): String {
return """{"statusMap":{${statusMap.map { "\"${it.key}\" : ${it.value.size}" }.joinToString(",")}}, "running":$running}"""
}
}
@@ -0,0 +1,76 @@
package suwayomi.tachidesk.manga.impl.update
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch
import mu.KotlinLogging
import suwayomi.tachidesk.manga.impl.Chapter
import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass
class Updater : IUpdater {
private val logger = KotlinLogging.logger {}
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private var tracker = mutableMapOf<String, UpdateJob>()
private var updateChannel = Channel<UpdateJob>()
private val statusChannel = MutableStateFlow(UpdateStatus())
private var updateJob: Job? = null
init {
updateJob = createUpdateJob()
}
private fun createUpdateJob(): Job {
return scope.launch {
while (true) {
val job = updateChannel.receive()
process(job)
statusChannel.value = UpdateStatus(tracker.values.toList(), !updateChannel.isEmpty)
}
}
}
private suspend fun process(job: UpdateJob) {
job.status = JobStatus.RUNNING
tracker["${job.manga.id}"] = job
statusChannel.value = UpdateStatus(tracker.values.toList(), true)
try {
logger.info { "Updating ${job.manga.title}" }
Chapter.getChapterList(job.manga.id, true)
job.status = JobStatus.COMPLETE
} catch (e: Exception) {
if (e is CancellationException) throw e
logger.error(e) { "Error while updating ${job.manga.title}" }
job.status = JobStatus.FAILED
}
tracker["${job.manga.id}"] = job
}
override fun addMangaToQueue(manga: MangaDataClass) {
scope.launch {
updateChannel.send(UpdateJob(manga))
}
tracker["${manga.id}"] = UpdateJob(manga)
statusChannel.value = UpdateStatus(tracker.values.toList(), true)
}
override fun getStatus(): StateFlow<UpdateStatus> {
return statusChannel
}
override suspend fun reset() {
tracker.clear()
updateChannel.cancel()
statusChannel.value = UpdateStatus()
updateJob?.cancel("Reset")
updateChannel = Channel()
updateJob = createUpdateJob()
}
}
@@ -0,0 +1,65 @@
package suwayomi.tachidesk.manga.impl.update
import io.javalin.websocket.WsContext
import io.javalin.websocket.WsMessageContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.launch
import mu.KotlinLogging
import org.kodein.di.DI
import org.kodein.di.conf.global
import org.kodein.di.instance
object UpdaterSocket : Websocket() {
private val logger = KotlinLogging.logger {}
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private val updater by DI.global.instance<IUpdater>()
private var job: Job? = null
override fun notifyClient(ctx: WsContext) {
ctx.send(updater.getStatus().value.getJsonSummary())
}
override fun handleRequest(ctx: WsMessageContext) {
when (ctx.message()) {
"STATUS" -> notifyClient(ctx)
else -> ctx.send(
"""
|Invalid command.
|Supported commands are:
| - STATUS
| sends the current update status
|""".trimMargin()
)
}
}
override fun addClient(ctx: WsContext) {
logger.info { ctx.sessionId }
super.addClient(ctx)
if (job == null) {
job = start()
}
}
override fun removeClient(ctx: WsContext) {
super.removeClient(ctx)
if (clients.isEmpty()) {
job?.cancel()
job = null
}
}
fun start(): Job {
return scope.launch {
while (true) {
updater.getStatus().collectLatest {
notifyAllClients()
}
}
}
}
}
@@ -0,0 +1,21 @@
package suwayomi.tachidesk.manga.impl.update
import io.javalin.websocket.WsContext
import io.javalin.websocket.WsMessageContext
import java.util.concurrent.ConcurrentHashMap
abstract class Websocket {
protected val clients = ConcurrentHashMap<String, WsContext>()
open fun addClient(ctx: WsContext) {
clients[ctx.sessionId] = ctx
notifyClient(ctx)
}
open fun removeClient(ctx: WsContext) {
clients.remove(ctx.sessionId)
}
open fun notifyAllClients() {
clients.values.forEach { notifyClient(it) }
}
abstract fun notifyClient(ctx: WsContext)
abstract fun handleRequest(ctx: WsMessageContext)
}
@@ -16,6 +16,8 @@ import org.kodein.di.DI
import org.kodein.di.bind
import org.kodein.di.conf.global
import org.kodein.di.singleton
import suwayomi.tachidesk.manga.impl.update.IUpdater
import suwayomi.tachidesk.manga.impl.update.Updater
import suwayomi.tachidesk.server.database.databaseUp
import suwayomi.tachidesk.server.util.AppMutex.handleAppMutex
import suwayomi.tachidesk.server.util.SystemTray.systemTray
@@ -55,6 +57,7 @@ fun applicationSetup() {
DI.global.addImport(
DI.Module("Server") {
bind<ApplicationDirs>() with singleton { applicationDirs }
bind<IUpdater>() with singleton { Updater() }
bind<JsonMapper>() with singleton { JavalinJackson() }
}
)