From 251141a5c3b66babc0b99a208349bbe16582b2d0 Mon Sep 17 00:00:00 2001 From: schroda <50052685+schroda@users.noreply.github.com> Date: Sun, 30 Jul 2023 02:25:12 +0200 Subject: [PATCH] Fix/downloader (#622) * Change log level of download error * Change type of sourceId in Downloader Unclear why it was converted to Long since it just got converted back to String anyway when it was used in the Downloader * Only stop downloads from source of the Downloader The downloader just changed the state of all downloads, ignoring if they are from the source the Downloader is for or not * Remove unnecessary DownloadManager::start calls In case chapters were added to the queue the DownloadManager will start itself * Extract download filtering into property * Improve Downloader logging * Notify clients only in case Downloader was started In case nothing was done there is nothing to notify about * Do not start Downloaders for failed downloads In case there were failed chapter downloads in the queue the DownloadManager still created a Downloader and started it. This Downloader would than immediately call "onComplete", since there is no available download, which then would refresh the Downloaders again which created an infinite loop until the failed download got removed from the queue * Retry download in case it failed it gets re-added to the queue In case a failed downloaded that was still in the queue was tried to get added to the queue again, nothing happened. Instead of doing nothing, the download should get retried. Thus, it also provides the logic to easily retry a failed download by just "adding" the chapter to the queue again. Currently, to retry a failed download, the download has to be removed from the queue and then get re-added. * Rename function "unqueue" to "dequeue" * Move "dequeue" function * Extract dequeue logic into function * Improve DownloadManager logging * Override "toString" of DownloadChapter --- .../manga/controller/DownloadController.kt | 4 +- .../suwayomi/tachidesk/manga/impl/Chapter.kt | 1 - .../manga/impl/download/DownloadManager.kt | 77 +++++++++++++------ .../manga/impl/download/Downloader.kt | 33 +++++--- .../impl/download/model/DownloadChapter.kt | 6 +- 5 files changed, 82 insertions(+), 39 deletions(-) diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/DownloadController.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/DownloadController.kt index e501a720..6dac4cb3 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/DownloadController.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/DownloadController.kt @@ -147,7 +147,7 @@ object DownloadController { val input = json.decodeFromString(ctx.body()) ctx.future( future { - DownloadManager.unqueue(input) + DownloadManager.dequeue(input) } ) }, @@ -167,7 +167,7 @@ object DownloadController { } }, behaviorOf = { ctx, chapterIndex, mangaId -> - DownloadManager.unqueue(chapterIndex, mangaId) + DownloadManager.dequeue(chapterIndex, mangaId) ctx.status(200) }, diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/Chapter.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/Chapter.kt index 14a68e4c..9b621616 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/Chapter.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/Chapter.kt @@ -226,7 +226,6 @@ object Chapter { logger.info { "downloadNewChapters($mangaId): Downloading \"${chapterIdsToDownload.size}\" new chapter(s)..." } DownloadManager.enqueue(EnqueueInput(chapterIdsToDownload)) - DownloadManager.start() } fun modifyChapter( diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/DownloadManager.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/DownloadManager.kt index 517f4ae0..0173ed6e 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/DownloadManager.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/DownloadManager.kt @@ -29,6 +29,8 @@ import org.jetbrains.exposed.sql.transactions.transaction import suwayomi.tachidesk.graphql.subscriptions.downloadSubscriptionSource import suwayomi.tachidesk.manga.impl.download.model.DownloadChapter import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Downloading +import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Error +import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Queued import suwayomi.tachidesk.manga.impl.download.model.DownloadStatus import suwayomi.tachidesk.manga.impl.download.model.Status import suwayomi.tachidesk.manga.model.dataclass.ChapterDataClass @@ -38,7 +40,6 @@ import suwayomi.tachidesk.manga.model.table.MangaTable import suwayomi.tachidesk.manga.model.table.toDataClass import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.api.get -import java.io.IOException import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CopyOnWriteArrayList import kotlin.reflect.jvm.jvmName @@ -52,7 +53,7 @@ object DownloadManager { private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) private val clients = ConcurrentHashMap() private val downloadQueue = CopyOnWriteArrayList() - private val downloaders = ConcurrentHashMap() + private val downloaders = ConcurrentHashMap() private const val downloadQueueKey = "downloadQueueKey" private val sharedPreferences = @@ -73,7 +74,6 @@ object DownloadManager { if (downloadQueue.size > 0) { logger.info { "restoreAndResumeDownloads: Restored download queue, starting downloads..." } - start() } } @@ -152,10 +152,13 @@ object DownloadManager { scope.launch { downloaderWatch.sample(1.seconds).collect { val runningDownloaders = downloaders.values.filter { it.isActive } - logger.info { "Running: ${runningDownloaders.size}, Queued: ${downloadQueue.size}" } + val availableDownloads = downloadQueue.filter { it.state != Error } + + logger.info { "Running: ${runningDownloaders.size}, Queued: ${availableDownloads.size}, Failed: ${downloadQueue.size - availableDownloads.size}" } + if (runningDownloaders.size < MAX_SOURCES_IN_PARAllEL) { - downloadQueue.asSequence() - .map { it.manga.sourceId.toLong() } + availableDownloads.asSequence() + .map { it.manga.sourceId } .distinct() .minus( runningDownloaders.map { it.sourceId }.toSet() @@ -178,7 +181,7 @@ object DownloadManager { } } - private fun getDownloader(sourceId: Long) = downloaders.getOrPut(sourceId) { + private fun getDownloader(sourceId: String) = downloaders.getOrPut(sourceId) { Downloader( scope = scope, sourceId = sourceId, @@ -233,15 +236,6 @@ object DownloadManager { addMultipleToQueue(inputPairs) } - fun unqueue(input: EnqueueInput) { - if (input.chapterIds.isNullOrEmpty()) return - - downloadQueue.removeIf { it.chapter.id in input.chapterIds } - saveDownloadQueue() - - notifyAllClients() - } - /** * Tries to add multiple inputs to queue * If any of inputs was actually added to queue, starts the queue @@ -262,26 +256,52 @@ object DownloadManager { * If chapter is added, returns the created DownloadChapter, otherwise returns null */ private fun addToQueue(manga: MangaDataClass, chapter: ChapterDataClass): DownloadChapter? { - if (downloadQueue.none { it.mangaId == manga.id && it.chapterIndex == chapter.index }) { - val downloadChapter = DownloadChapter( + val downloadChapter = downloadQueue.firstOrNull { it.mangaId == manga.id && it.chapterIndex == chapter.index } + + val addToQueue = downloadChapter == null + if (addToQueue) { + val newDownloadChapter = DownloadChapter( chapter.index, manga.id, chapter, manga ) - downloadQueue.add(downloadChapter) + downloadQueue.add(newDownloadChapter) saveDownloadQueue() - downloadSubscriptionSource.publish(downloadChapter) - logger.debug { "Added chapter ${chapter.id} to download queue (${manga.title} | ${chapter.name})" } + downloadSubscriptionSource.publish(newDownloadChapter) + logger.debug { "Added chapter ${chapter.id} to download queue ($newDownloadChapter)" } + return newDownloadChapter + } + + val retryDownload = downloadChapter?.state == Error + if (retryDownload) { + logger.debug { "Chapter ${chapter.id} download failed, retry download ($downloadChapter)" } + + downloadChapter?.state = Queued + downloadChapter?.progress = 0f + return downloadChapter } - logger.debug { "Chapter ${chapter.id} already present in queue (${manga.title} | ${chapter.name})" } + + logger.debug { "Chapter ${chapter.id} already present in queue ($downloadChapter)" } return null } - fun unqueue(chapterIndex: Int, mangaId: Int) { - downloadQueue.removeIf { it.mangaId == mangaId && it.chapterIndex == chapterIndex } + fun dequeue(input: EnqueueInput) { + if (input.chapterIds.isNullOrEmpty()) return + dequeue(downloadQueue.filter { it.chapter.id in input.chapterIds }.toSet()) + } + + fun dequeue(chapterIndex: Int, mangaId: Int) { + dequeue(downloadQueue.filter { it.mangaId == mangaId && it.chapterIndex == chapterIndex }.toSet()) + } + + private fun dequeue(chapterDownloads: Set) { + logger.debug { "dequeue ${chapterDownloads.size} chapters [${chapterDownloads.joinToString(separator = ", ") { "$it" }}]" } + + downloadQueue.removeAll(chapterDownloads) saveDownloadQueue() + notifyAllClients() } @@ -289,18 +309,25 @@ object DownloadManager { require(to >= 0) { "'to' must be over or equal to 0" } val download = downloadQueue.find { it.mangaId == mangaId && it.chapterIndex == chapterIndex } ?: return + + logger.debug { "reorder download $download from ${downloadQueue.indexOf(download)} to $to" } + downloadQueue -= download downloadQueue.add(to, download) saveDownloadQueue() } fun start() { + logger.debug { "start" } + scope.launch { downloaderWatch.emit(Unit) } } suspend fun stop() { + logger.debug { "stop" } + coroutineScope { downloaders.map { (_, downloader) -> async { @@ -312,6 +339,8 @@ object DownloadManager { } suspend fun clear() { + logger.debug { "clear" } + stop() downloadQueue.clear() saveDownloadQueue() diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/Downloader.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/Downloader.kt index ddcc549a..e00ac578 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/Downloader.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/Downloader.kt @@ -29,23 +29,26 @@ import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Queued import suwayomi.tachidesk.manga.model.table.ChapterTable import java.util.concurrent.CopyOnWriteArrayList -private val logger = KotlinLogging.logger {} - class Downloader( private val scope: CoroutineScope, - val sourceId: Long, + val sourceId: String, private val downloadQueue: CopyOnWriteArrayList, private val notifier: (immediate: Boolean) -> Unit, private val onComplete: () -> Unit ) { + private val logger = KotlinLogging.logger("${Downloader::class.java.name} source($sourceId)") + private var job: Job? = null + private val availableSourceDownloads + get() = downloadQueue.filter { it.manga.sourceId == sourceId } + class StopDownloadException : Exception("Cancelled download") class PauseDownloadException : Exception("Pause download") private suspend fun step(download: DownloadChapter?, immediate: Boolean) { notifier(immediate) currentCoroutineContext().ensureActive() - if (download != null && download != downloadQueue.firstOrNull { it.manga.sourceId.toLong() == sourceId && it.state != Error }) { + if (download != null && download != availableSourceDownloads.firstOrNull { it.state != Error }) { if (download in downloadQueue) { throw PauseDownloadException() } else { @@ -64,26 +67,32 @@ class Downloader( }.also { job -> job.invokeOnCompletion { if (it !is CancellationException) { + logger.debug { "completed" } onComplete() } } } + logger.debug { "started" } + notifier(false) } - - notifier(false) } suspend fun stop() { job?.cancelAndJoin() + logger.debug { "stopped" } } private suspend fun run() { while (downloadQueue.isNotEmpty() && currentCoroutineContext().isActive) { - val download = downloadQueue.firstOrNull { - it.manga.sourceId.toLong() == sourceId && - (it.state == Queued || (it.state == Error && it.tries < 3)) // 3 re-tries per download + val download = availableSourceDownloads.firstOrNull { + (it.state == Queued || (it.state == Error && it.tries < 3)) // 3 re-tries per download } ?: break + val logContext = "${logger.name} - downloadChapter($download))" + val downloadLogger = KotlinLogging.logger(logContext) + + downloadLogger.debug { "start" } + try { download.state = Downloading step(download, true) @@ -102,13 +111,15 @@ class Downloader( downloadQueue.removeIf { it.mangaId == download.mangaId && it.chapterIndex == download.chapterIndex } step(null, false) + downloadLogger.debug { "finished" } } catch (e: CancellationException) { logger.debug("Downloader was stopped") - downloadQueue.filter { it.state == Downloading }.forEach { it.state = Queued } + availableSourceDownloads.filter { it.state == Downloading }.forEach { it.state = Queued } } catch (e: PauseDownloadException) { + downloadLogger.debug { "paused" } download.state = Queued } catch (e: Exception) { - logger.info("Downloader faced an exception", e) + downloadLogger.warn("failed due to", e) download.tries++ download.state = Error } finally { diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/model/DownloadChapter.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/model/DownloadChapter.kt index f101150b..2cd82654 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/model/DownloadChapter.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/model/DownloadChapter.kt @@ -19,4 +19,8 @@ class DownloadChapter( var state: DownloadState = Queued, var progress: Float = 0f, var tries: Int = 0 -) +) { + override fun toString(): String { + return "${manga.title} ($mangaId) - ${chapter.name} (${chapter.id}) | state= $state, tries= $tries, progress= $progress" + } +}