diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/DownloadController.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/DownloadController.kt index e501a720..6dac4cb3 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/DownloadController.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/DownloadController.kt @@ -147,7 +147,7 @@ object DownloadController { val input = json.decodeFromString(ctx.body()) ctx.future( future { - DownloadManager.unqueue(input) + DownloadManager.dequeue(input) } ) }, @@ -167,7 +167,7 @@ object DownloadController { } }, behaviorOf = { ctx, chapterIndex, mangaId -> - DownloadManager.unqueue(chapterIndex, mangaId) + DownloadManager.dequeue(chapterIndex, mangaId) ctx.status(200) }, diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/Chapter.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/Chapter.kt index 14a68e4c..9b621616 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/Chapter.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/Chapter.kt @@ -226,7 +226,6 @@ object Chapter { logger.info { "downloadNewChapters($mangaId): Downloading \"${chapterIdsToDownload.size}\" new chapter(s)..." } DownloadManager.enqueue(EnqueueInput(chapterIdsToDownload)) - DownloadManager.start() } fun modifyChapter( diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/DownloadManager.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/DownloadManager.kt index 517f4ae0..0173ed6e 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/DownloadManager.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/DownloadManager.kt @@ -29,6 +29,8 @@ import org.jetbrains.exposed.sql.transactions.transaction import suwayomi.tachidesk.graphql.subscriptions.downloadSubscriptionSource import suwayomi.tachidesk.manga.impl.download.model.DownloadChapter import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Downloading +import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Error +import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Queued import suwayomi.tachidesk.manga.impl.download.model.DownloadStatus import suwayomi.tachidesk.manga.impl.download.model.Status import suwayomi.tachidesk.manga.model.dataclass.ChapterDataClass @@ -38,7 +40,6 @@ import suwayomi.tachidesk.manga.model.table.MangaTable import suwayomi.tachidesk.manga.model.table.toDataClass import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.api.get -import java.io.IOException import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CopyOnWriteArrayList import kotlin.reflect.jvm.jvmName @@ -52,7 +53,7 @@ object DownloadManager { private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) private val clients = ConcurrentHashMap() private val downloadQueue = CopyOnWriteArrayList() - private val downloaders = ConcurrentHashMap() + private val downloaders = ConcurrentHashMap() private const val downloadQueueKey = "downloadQueueKey" private val sharedPreferences = @@ -73,7 +74,6 @@ object DownloadManager { if (downloadQueue.size > 0) { logger.info { "restoreAndResumeDownloads: Restored download queue, starting downloads..." } - start() } } @@ -152,10 +152,13 @@ object DownloadManager { scope.launch { downloaderWatch.sample(1.seconds).collect { val runningDownloaders = downloaders.values.filter { it.isActive } - logger.info { "Running: ${runningDownloaders.size}, Queued: ${downloadQueue.size}" } + val availableDownloads = downloadQueue.filter { it.state != Error } + + logger.info { "Running: ${runningDownloaders.size}, Queued: ${availableDownloads.size}, Failed: ${downloadQueue.size - availableDownloads.size}" } + if (runningDownloaders.size < MAX_SOURCES_IN_PARAllEL) { - downloadQueue.asSequence() - .map { it.manga.sourceId.toLong() } + availableDownloads.asSequence() + .map { it.manga.sourceId } .distinct() .minus( runningDownloaders.map { it.sourceId }.toSet() @@ -178,7 +181,7 @@ object DownloadManager { } } - private fun getDownloader(sourceId: Long) = downloaders.getOrPut(sourceId) { + private fun getDownloader(sourceId: String) = downloaders.getOrPut(sourceId) { Downloader( scope = scope, sourceId = sourceId, @@ -233,15 +236,6 @@ object DownloadManager { addMultipleToQueue(inputPairs) } - fun unqueue(input: EnqueueInput) { - if (input.chapterIds.isNullOrEmpty()) return - - downloadQueue.removeIf { it.chapter.id in input.chapterIds } - saveDownloadQueue() - - notifyAllClients() - } - /** * Tries to add multiple inputs to queue * If any of inputs was actually added to queue, starts the queue @@ -262,26 +256,52 @@ object DownloadManager { * If chapter is added, returns the created DownloadChapter, otherwise returns null */ private fun addToQueue(manga: MangaDataClass, chapter: ChapterDataClass): DownloadChapter? { - if (downloadQueue.none { it.mangaId == manga.id && it.chapterIndex == chapter.index }) { - val downloadChapter = DownloadChapter( + val downloadChapter = downloadQueue.firstOrNull { it.mangaId == manga.id && it.chapterIndex == chapter.index } + + val addToQueue = downloadChapter == null + if (addToQueue) { + val newDownloadChapter = DownloadChapter( chapter.index, manga.id, chapter, manga ) - downloadQueue.add(downloadChapter) + downloadQueue.add(newDownloadChapter) saveDownloadQueue() - downloadSubscriptionSource.publish(downloadChapter) - logger.debug { "Added chapter ${chapter.id} to download queue (${manga.title} | ${chapter.name})" } + downloadSubscriptionSource.publish(newDownloadChapter) + logger.debug { "Added chapter ${chapter.id} to download queue ($newDownloadChapter)" } + return newDownloadChapter + } + + val retryDownload = downloadChapter?.state == Error + if (retryDownload) { + logger.debug { "Chapter ${chapter.id} download failed, retry download ($downloadChapter)" } + + downloadChapter?.state = Queued + downloadChapter?.progress = 0f + return downloadChapter } - logger.debug { "Chapter ${chapter.id} already present in queue (${manga.title} | ${chapter.name})" } + + logger.debug { "Chapter ${chapter.id} already present in queue ($downloadChapter)" } return null } - fun unqueue(chapterIndex: Int, mangaId: Int) { - downloadQueue.removeIf { it.mangaId == mangaId && it.chapterIndex == chapterIndex } + fun dequeue(input: EnqueueInput) { + if (input.chapterIds.isNullOrEmpty()) return + dequeue(downloadQueue.filter { it.chapter.id in input.chapterIds }.toSet()) + } + + fun dequeue(chapterIndex: Int, mangaId: Int) { + dequeue(downloadQueue.filter { it.mangaId == mangaId && it.chapterIndex == chapterIndex }.toSet()) + } + + private fun dequeue(chapterDownloads: Set) { + logger.debug { "dequeue ${chapterDownloads.size} chapters [${chapterDownloads.joinToString(separator = ", ") { "$it" }}]" } + + downloadQueue.removeAll(chapterDownloads) saveDownloadQueue() + notifyAllClients() } @@ -289,18 +309,25 @@ object DownloadManager { require(to >= 0) { "'to' must be over or equal to 0" } val download = downloadQueue.find { it.mangaId == mangaId && it.chapterIndex == chapterIndex } ?: return + + logger.debug { "reorder download $download from ${downloadQueue.indexOf(download)} to $to" } + downloadQueue -= download downloadQueue.add(to, download) saveDownloadQueue() } fun start() { + logger.debug { "start" } + scope.launch { downloaderWatch.emit(Unit) } } suspend fun stop() { + logger.debug { "stop" } + coroutineScope { downloaders.map { (_, downloader) -> async { @@ -312,6 +339,8 @@ object DownloadManager { } suspend fun clear() { + logger.debug { "clear" } + stop() downloadQueue.clear() saveDownloadQueue() diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/Downloader.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/Downloader.kt index ddcc549a..e00ac578 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/Downloader.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/Downloader.kt @@ -29,23 +29,26 @@ import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Queued import suwayomi.tachidesk.manga.model.table.ChapterTable import java.util.concurrent.CopyOnWriteArrayList -private val logger = KotlinLogging.logger {} - class Downloader( private val scope: CoroutineScope, - val sourceId: Long, + val sourceId: String, private val downloadQueue: CopyOnWriteArrayList, private val notifier: (immediate: Boolean) -> Unit, private val onComplete: () -> Unit ) { + private val logger = KotlinLogging.logger("${Downloader::class.java.name} source($sourceId)") + private var job: Job? = null + private val availableSourceDownloads + get() = downloadQueue.filter { it.manga.sourceId == sourceId } + class StopDownloadException : Exception("Cancelled download") class PauseDownloadException : Exception("Pause download") private suspend fun step(download: DownloadChapter?, immediate: Boolean) { notifier(immediate) currentCoroutineContext().ensureActive() - if (download != null && download != downloadQueue.firstOrNull { it.manga.sourceId.toLong() == sourceId && it.state != Error }) { + if (download != null && download != availableSourceDownloads.firstOrNull { it.state != Error }) { if (download in downloadQueue) { throw PauseDownloadException() } else { @@ -64,26 +67,32 @@ class Downloader( }.also { job -> job.invokeOnCompletion { if (it !is CancellationException) { + logger.debug { "completed" } onComplete() } } } + logger.debug { "started" } + notifier(false) } - - notifier(false) } suspend fun stop() { job?.cancelAndJoin() + logger.debug { "stopped" } } private suspend fun run() { while (downloadQueue.isNotEmpty() && currentCoroutineContext().isActive) { - val download = downloadQueue.firstOrNull { - it.manga.sourceId.toLong() == sourceId && - (it.state == Queued || (it.state == Error && it.tries < 3)) // 3 re-tries per download + val download = availableSourceDownloads.firstOrNull { + (it.state == Queued || (it.state == Error && it.tries < 3)) // 3 re-tries per download } ?: break + val logContext = "${logger.name} - downloadChapter($download))" + val downloadLogger = KotlinLogging.logger(logContext) + + downloadLogger.debug { "start" } + try { download.state = Downloading step(download, true) @@ -102,13 +111,15 @@ class Downloader( downloadQueue.removeIf { it.mangaId == download.mangaId && it.chapterIndex == download.chapterIndex } step(null, false) + downloadLogger.debug { "finished" } } catch (e: CancellationException) { logger.debug("Downloader was stopped") - downloadQueue.filter { it.state == Downloading }.forEach { it.state = Queued } + availableSourceDownloads.filter { it.state == Downloading }.forEach { it.state = Queued } } catch (e: PauseDownloadException) { + downloadLogger.debug { "paused" } download.state = Queued } catch (e: Exception) { - logger.info("Downloader faced an exception", e) + downloadLogger.warn("failed due to", e) download.tries++ download.state = Error } finally { diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/model/DownloadChapter.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/model/DownloadChapter.kt index f101150b..2cd82654 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/model/DownloadChapter.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/model/DownloadChapter.kt @@ -19,4 +19,8 @@ class DownloadChapter( var state: DownloadState = Queued, var progress: Float = 0f, var tries: Int = 0 -) +) { + override fun toString(): String { + return "${manga.title} ($mangaId) - ${chapter.name} (${chapter.id}) | state= $state, tries= $tries, progress= $progress" + } +}