Fix/downloader (#622)
* Change log level of download error * Change type of sourceId in Downloader Unclear why it was converted to Long since it just got converted back to String anyway when it was used in the Downloader * Only stop downloads from source of the Downloader The downloader just changed the state of all downloads, ignoring if they are from the source the Downloader is for or not * Remove unnecessary DownloadManager::start calls In case chapters were added to the queue the DownloadManager will start itself * Extract download filtering into property * Improve Downloader logging * Notify clients only in case Downloader was started In case nothing was done there is nothing to notify about * Do not start Downloaders for failed downloads In case there were failed chapter downloads in the queue the DownloadManager still created a Downloader and started it. This Downloader would than immediately call "onComplete", since there is no available download, which then would refresh the Downloaders again which created an infinite loop until the failed download got removed from the queue * Retry download in case it failed it gets re-added to the queue In case a failed downloaded that was still in the queue was tried to get added to the queue again, nothing happened. Instead of doing nothing, the download should get retried. Thus, it also provides the logic to easily retry a failed download by just "adding" the chapter to the queue again. Currently, to retry a failed download, the download has to be removed from the queue and then get re-added. * Rename function "unqueue" to "dequeue" * Move "dequeue" function * Extract dequeue logic into function * Improve DownloadManager logging * Override "toString" of DownloadChapter
This commit is contained in:
@@ -147,7 +147,7 @@ object DownloadController {
|
||||
val input = json.decodeFromString<EnqueueInput>(ctx.body())
|
||||
ctx.future(
|
||||
future {
|
||||
DownloadManager.unqueue(input)
|
||||
DownloadManager.dequeue(input)
|
||||
}
|
||||
)
|
||||
},
|
||||
@@ -167,7 +167,7 @@ object DownloadController {
|
||||
}
|
||||
},
|
||||
behaviorOf = { ctx, chapterIndex, mangaId ->
|
||||
DownloadManager.unqueue(chapterIndex, mangaId)
|
||||
DownloadManager.dequeue(chapterIndex, mangaId)
|
||||
|
||||
ctx.status(200)
|
||||
},
|
||||
|
||||
@@ -226,7 +226,6 @@ object Chapter {
|
||||
logger.info { "downloadNewChapters($mangaId): Downloading \"${chapterIdsToDownload.size}\" new chapter(s)..." }
|
||||
|
||||
DownloadManager.enqueue(EnqueueInput(chapterIdsToDownload))
|
||||
DownloadManager.start()
|
||||
}
|
||||
|
||||
fun modifyChapter(
|
||||
|
||||
@@ -29,6 +29,8 @@ import org.jetbrains.exposed.sql.transactions.transaction
|
||||
import suwayomi.tachidesk.graphql.subscriptions.downloadSubscriptionSource
|
||||
import suwayomi.tachidesk.manga.impl.download.model.DownloadChapter
|
||||
import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Downloading
|
||||
import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Error
|
||||
import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Queued
|
||||
import suwayomi.tachidesk.manga.impl.download.model.DownloadStatus
|
||||
import suwayomi.tachidesk.manga.impl.download.model.Status
|
||||
import suwayomi.tachidesk.manga.model.dataclass.ChapterDataClass
|
||||
@@ -38,7 +40,6 @@ import suwayomi.tachidesk.manga.model.table.MangaTable
|
||||
import suwayomi.tachidesk.manga.model.table.toDataClass
|
||||
import uy.kohesive.injekt.Injekt
|
||||
import uy.kohesive.injekt.api.get
|
||||
import java.io.IOException
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
import kotlin.reflect.jvm.jvmName
|
||||
@@ -52,7 +53,7 @@ object DownloadManager {
|
||||
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
|
||||
private val clients = ConcurrentHashMap<String, WsContext>()
|
||||
private val downloadQueue = CopyOnWriteArrayList<DownloadChapter>()
|
||||
private val downloaders = ConcurrentHashMap<Long, Downloader>()
|
||||
private val downloaders = ConcurrentHashMap<String, Downloader>()
|
||||
|
||||
private const val downloadQueueKey = "downloadQueueKey"
|
||||
private val sharedPreferences =
|
||||
@@ -73,7 +74,6 @@ object DownloadManager {
|
||||
|
||||
if (downloadQueue.size > 0) {
|
||||
logger.info { "restoreAndResumeDownloads: Restored download queue, starting downloads..." }
|
||||
start()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -152,10 +152,13 @@ object DownloadManager {
|
||||
scope.launch {
|
||||
downloaderWatch.sample(1.seconds).collect {
|
||||
val runningDownloaders = downloaders.values.filter { it.isActive }
|
||||
logger.info { "Running: ${runningDownloaders.size}, Queued: ${downloadQueue.size}" }
|
||||
val availableDownloads = downloadQueue.filter { it.state != Error }
|
||||
|
||||
logger.info { "Running: ${runningDownloaders.size}, Queued: ${availableDownloads.size}, Failed: ${downloadQueue.size - availableDownloads.size}" }
|
||||
|
||||
if (runningDownloaders.size < MAX_SOURCES_IN_PARAllEL) {
|
||||
downloadQueue.asSequence()
|
||||
.map { it.manga.sourceId.toLong() }
|
||||
availableDownloads.asSequence()
|
||||
.map { it.manga.sourceId }
|
||||
.distinct()
|
||||
.minus(
|
||||
runningDownloaders.map { it.sourceId }.toSet()
|
||||
@@ -178,7 +181,7 @@ object DownloadManager {
|
||||
}
|
||||
}
|
||||
|
||||
private fun getDownloader(sourceId: Long) = downloaders.getOrPut(sourceId) {
|
||||
private fun getDownloader(sourceId: String) = downloaders.getOrPut(sourceId) {
|
||||
Downloader(
|
||||
scope = scope,
|
||||
sourceId = sourceId,
|
||||
@@ -233,15 +236,6 @@ object DownloadManager {
|
||||
addMultipleToQueue(inputPairs)
|
||||
}
|
||||
|
||||
fun unqueue(input: EnqueueInput) {
|
||||
if (input.chapterIds.isNullOrEmpty()) return
|
||||
|
||||
downloadQueue.removeIf { it.chapter.id in input.chapterIds }
|
||||
saveDownloadQueue()
|
||||
|
||||
notifyAllClients()
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to add multiple inputs to queue
|
||||
* If any of inputs was actually added to queue, starts the queue
|
||||
@@ -262,26 +256,52 @@ object DownloadManager {
|
||||
* If chapter is added, returns the created DownloadChapter, otherwise returns null
|
||||
*/
|
||||
private fun addToQueue(manga: MangaDataClass, chapter: ChapterDataClass): DownloadChapter? {
|
||||
if (downloadQueue.none { it.mangaId == manga.id && it.chapterIndex == chapter.index }) {
|
||||
val downloadChapter = DownloadChapter(
|
||||
val downloadChapter = downloadQueue.firstOrNull { it.mangaId == manga.id && it.chapterIndex == chapter.index }
|
||||
|
||||
val addToQueue = downloadChapter == null
|
||||
if (addToQueue) {
|
||||
val newDownloadChapter = DownloadChapter(
|
||||
chapter.index,
|
||||
manga.id,
|
||||
chapter,
|
||||
manga
|
||||
)
|
||||
downloadQueue.add(downloadChapter)
|
||||
downloadQueue.add(newDownloadChapter)
|
||||
saveDownloadQueue()
|
||||
downloadSubscriptionSource.publish(downloadChapter)
|
||||
logger.debug { "Added chapter ${chapter.id} to download queue (${manga.title} | ${chapter.name})" }
|
||||
downloadSubscriptionSource.publish(newDownloadChapter)
|
||||
logger.debug { "Added chapter ${chapter.id} to download queue ($newDownloadChapter)" }
|
||||
return newDownloadChapter
|
||||
}
|
||||
|
||||
val retryDownload = downloadChapter?.state == Error
|
||||
if (retryDownload) {
|
||||
logger.debug { "Chapter ${chapter.id} download failed, retry download ($downloadChapter)" }
|
||||
|
||||
downloadChapter?.state = Queued
|
||||
downloadChapter?.progress = 0f
|
||||
|
||||
return downloadChapter
|
||||
}
|
||||
logger.debug { "Chapter ${chapter.id} already present in queue (${manga.title} | ${chapter.name})" }
|
||||
|
||||
logger.debug { "Chapter ${chapter.id} already present in queue ($downloadChapter)" }
|
||||
return null
|
||||
}
|
||||
|
||||
fun unqueue(chapterIndex: Int, mangaId: Int) {
|
||||
downloadQueue.removeIf { it.mangaId == mangaId && it.chapterIndex == chapterIndex }
|
||||
fun dequeue(input: EnqueueInput) {
|
||||
if (input.chapterIds.isNullOrEmpty()) return
|
||||
dequeue(downloadQueue.filter { it.chapter.id in input.chapterIds }.toSet())
|
||||
}
|
||||
|
||||
fun dequeue(chapterIndex: Int, mangaId: Int) {
|
||||
dequeue(downloadQueue.filter { it.mangaId == mangaId && it.chapterIndex == chapterIndex }.toSet())
|
||||
}
|
||||
|
||||
private fun dequeue(chapterDownloads: Set<DownloadChapter>) {
|
||||
logger.debug { "dequeue ${chapterDownloads.size} chapters [${chapterDownloads.joinToString(separator = ", ") { "$it" }}]" }
|
||||
|
||||
downloadQueue.removeAll(chapterDownloads)
|
||||
saveDownloadQueue()
|
||||
|
||||
notifyAllClients()
|
||||
}
|
||||
|
||||
@@ -289,18 +309,25 @@ object DownloadManager {
|
||||
require(to >= 0) { "'to' must be over or equal to 0" }
|
||||
val download = downloadQueue.find { it.mangaId == mangaId && it.chapterIndex == chapterIndex }
|
||||
?: return
|
||||
|
||||
logger.debug { "reorder download $download from ${downloadQueue.indexOf(download)} to $to" }
|
||||
|
||||
downloadQueue -= download
|
||||
downloadQueue.add(to, download)
|
||||
saveDownloadQueue()
|
||||
}
|
||||
|
||||
fun start() {
|
||||
logger.debug { "start" }
|
||||
|
||||
scope.launch {
|
||||
downloaderWatch.emit(Unit)
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun stop() {
|
||||
logger.debug { "stop" }
|
||||
|
||||
coroutineScope {
|
||||
downloaders.map { (_, downloader) ->
|
||||
async {
|
||||
@@ -312,6 +339,8 @@ object DownloadManager {
|
||||
}
|
||||
|
||||
suspend fun clear() {
|
||||
logger.debug { "clear" }
|
||||
|
||||
stop()
|
||||
downloadQueue.clear()
|
||||
saveDownloadQueue()
|
||||
|
||||
@@ -29,23 +29,26 @@ import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Queued
|
||||
import suwayomi.tachidesk.manga.model.table.ChapterTable
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
|
||||
private val logger = KotlinLogging.logger {}
|
||||
|
||||
class Downloader(
|
||||
private val scope: CoroutineScope,
|
||||
val sourceId: Long,
|
||||
val sourceId: String,
|
||||
private val downloadQueue: CopyOnWriteArrayList<DownloadChapter>,
|
||||
private val notifier: (immediate: Boolean) -> Unit,
|
||||
private val onComplete: () -> Unit
|
||||
) {
|
||||
private val logger = KotlinLogging.logger("${Downloader::class.java.name} source($sourceId)")
|
||||
|
||||
private var job: Job? = null
|
||||
private val availableSourceDownloads
|
||||
get() = downloadQueue.filter { it.manga.sourceId == sourceId }
|
||||
|
||||
class StopDownloadException : Exception("Cancelled download")
|
||||
class PauseDownloadException : Exception("Pause download")
|
||||
|
||||
private suspend fun step(download: DownloadChapter?, immediate: Boolean) {
|
||||
notifier(immediate)
|
||||
currentCoroutineContext().ensureActive()
|
||||
if (download != null && download != downloadQueue.firstOrNull { it.manga.sourceId.toLong() == sourceId && it.state != Error }) {
|
||||
if (download != null && download != availableSourceDownloads.firstOrNull { it.state != Error }) {
|
||||
if (download in downloadQueue) {
|
||||
throw PauseDownloadException()
|
||||
} else {
|
||||
@@ -64,26 +67,32 @@ class Downloader(
|
||||
}.also { job ->
|
||||
job.invokeOnCompletion {
|
||||
if (it !is CancellationException) {
|
||||
logger.debug { "completed" }
|
||||
onComplete()
|
||||
}
|
||||
}
|
||||
}
|
||||
logger.debug { "started" }
|
||||
notifier(false)
|
||||
}
|
||||
|
||||
notifier(false)
|
||||
}
|
||||
|
||||
suspend fun stop() {
|
||||
job?.cancelAndJoin()
|
||||
logger.debug { "stopped" }
|
||||
}
|
||||
|
||||
private suspend fun run() {
|
||||
while (downloadQueue.isNotEmpty() && currentCoroutineContext().isActive) {
|
||||
val download = downloadQueue.firstOrNull {
|
||||
it.manga.sourceId.toLong() == sourceId &&
|
||||
(it.state == Queued || (it.state == Error && it.tries < 3)) // 3 re-tries per download
|
||||
val download = availableSourceDownloads.firstOrNull {
|
||||
(it.state == Queued || (it.state == Error && it.tries < 3)) // 3 re-tries per download
|
||||
} ?: break
|
||||
|
||||
val logContext = "${logger.name} - downloadChapter($download))"
|
||||
val downloadLogger = KotlinLogging.logger(logContext)
|
||||
|
||||
downloadLogger.debug { "start" }
|
||||
|
||||
try {
|
||||
download.state = Downloading
|
||||
step(download, true)
|
||||
@@ -102,13 +111,15 @@ class Downloader(
|
||||
|
||||
downloadQueue.removeIf { it.mangaId == download.mangaId && it.chapterIndex == download.chapterIndex }
|
||||
step(null, false)
|
||||
downloadLogger.debug { "finished" }
|
||||
} catch (e: CancellationException) {
|
||||
logger.debug("Downloader was stopped")
|
||||
downloadQueue.filter { it.state == Downloading }.forEach { it.state = Queued }
|
||||
availableSourceDownloads.filter { it.state == Downloading }.forEach { it.state = Queued }
|
||||
} catch (e: PauseDownloadException) {
|
||||
downloadLogger.debug { "paused" }
|
||||
download.state = Queued
|
||||
} catch (e: Exception) {
|
||||
logger.info("Downloader faced an exception", e)
|
||||
downloadLogger.warn("failed due to", e)
|
||||
download.tries++
|
||||
download.state = Error
|
||||
} finally {
|
||||
|
||||
+5
-1
@@ -19,4 +19,8 @@ class DownloadChapter(
|
||||
var state: DownloadState = Queued,
|
||||
var progress: Float = 0f,
|
||||
var tries: Int = 0
|
||||
)
|
||||
) {
|
||||
override fun toString(): String {
|
||||
return "${manga.title} ($mangaId) - ${chapter.name} (${chapter.id}) | state= $state, tries= $tries, progress= $progress"
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user