Fix/backup import failure not resetting status (#746)
* Reset backup status to idle in case of an exception * Rename "performRestore" function * Set backup status to failure on exception Makes it possible to detect if the restore failed or not after the first status was received * Set backup status to success on completion Since the status is not provided over a subscription, but over a query that should be pulled, it is not really easily detectable if a restore finished or not, since both states will be indicated by "idle" * Correctly wait for first new status when triggering backup import The status is only "Idle" in case no backup import has ever run. Once the first backup process finished it is either "Failure" or "Success" * Rename "ProtoBackupImport::restore" function * Add id to restore process Makes it possible to differentiate between backup restore processes.
This commit is contained in:
@@ -1,13 +1,9 @@
|
||||
package suwayomi.tachidesk.graphql.mutations
|
||||
|
||||
import io.javalin.http.UploadedFile
|
||||
import kotlinx.coroutines.DelicateCoroutinesApi
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.flow.first
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.withTimeout
|
||||
import suwayomi.tachidesk.graphql.server.TemporaryFileStorage
|
||||
import suwayomi.tachidesk.graphql.types.BackupRestoreState
|
||||
import suwayomi.tachidesk.graphql.types.BackupRestoreStatus
|
||||
import suwayomi.tachidesk.graphql.types.toStatus
|
||||
import suwayomi.tachidesk.manga.impl.backup.BackupFlags
|
||||
@@ -25,26 +21,23 @@ class BackupMutation {
|
||||
|
||||
data class RestoreBackupPayload(
|
||||
val clientMutationId: String?,
|
||||
val status: BackupRestoreStatus,
|
||||
val id: String,
|
||||
val status: BackupRestoreStatus?,
|
||||
)
|
||||
|
||||
@OptIn(DelicateCoroutinesApi::class)
|
||||
fun restoreBackup(input: RestoreBackupInput): CompletableFuture<RestoreBackupPayload> {
|
||||
val (clientMutationId, backup) = input
|
||||
|
||||
return future {
|
||||
GlobalScope.launch {
|
||||
ProtoBackupImport.performRestore(backup.content)
|
||||
val restoreId = ProtoBackupImport.restore(backup.content)
|
||||
|
||||
withTimeout(10.seconds) {
|
||||
ProtoBackupImport.notifyFlow.first {
|
||||
ProtoBackupImport.getRestoreState(restoreId) != null
|
||||
}
|
||||
}
|
||||
|
||||
val status =
|
||||
withTimeout(10.seconds) {
|
||||
ProtoBackupImport.backupRestoreState.first {
|
||||
it != ProtoBackupImport.BackupRestoreState.Idle
|
||||
}.toStatus()
|
||||
}
|
||||
|
||||
RestoreBackupPayload(clientMutationId, status)
|
||||
RestoreBackupPayload(clientMutationId, restoreId, ProtoBackupImport.getRestoreState(restoreId)?.toStatus())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ class BackupQuery {
|
||||
)
|
||||
}
|
||||
|
||||
fun restoreStatus(): BackupRestoreStatus {
|
||||
return ProtoBackupImport.backupRestoreState.value.toStatus()
|
||||
fun restoreStatus(id: String): BackupRestoreStatus? {
|
||||
return ProtoBackupImport.getRestoreState(id)?.toStatus()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,8 @@ import suwayomi.tachidesk.manga.impl.backup.proto.ProtoBackupImport
|
||||
|
||||
enum class BackupRestoreState {
|
||||
IDLE,
|
||||
SUCCESS,
|
||||
FAILURE,
|
||||
RESTORING_CATEGORIES,
|
||||
RESTORING_MANGA,
|
||||
}
|
||||
@@ -22,6 +24,18 @@ fun ProtoBackupImport.BackupRestoreState.toStatus(): BackupRestoreStatus {
|
||||
totalManga = 0,
|
||||
mangaProgress = 0,
|
||||
)
|
||||
is ProtoBackupImport.BackupRestoreState.Success ->
|
||||
BackupRestoreStatus(
|
||||
state = BackupRestoreState.SUCCESS,
|
||||
totalManga = 0,
|
||||
mangaProgress = 0,
|
||||
)
|
||||
is ProtoBackupImport.BackupRestoreState.Failure ->
|
||||
BackupRestoreStatus(
|
||||
state = BackupRestoreState.FAILURE,
|
||||
totalManga = 0,
|
||||
mangaProgress = 0,
|
||||
)
|
||||
is ProtoBackupImport.BackupRestoreState.RestoringCategories ->
|
||||
BackupRestoreStatus(
|
||||
state = BackupRestoreState.RESTORING_CATEGORIES,
|
||||
|
||||
@@ -29,7 +29,7 @@ object BackupController {
|
||||
behaviorOf = { ctx ->
|
||||
ctx.future(
|
||||
future {
|
||||
ProtoBackupImport.performRestore(ctx.bodyAsInputStream())
|
||||
ProtoBackupImport.restoreLegacy(ctx.bodyAsInputStream())
|
||||
},
|
||||
)
|
||||
},
|
||||
@@ -55,7 +55,7 @@ object BackupController {
|
||||
// TODO: rewrite this with ctx.uploadedFiles(), don't call the multipart field "backup.proto.gz"
|
||||
ctx.future(
|
||||
future {
|
||||
ProtoBackupImport.performRestore(ctx.uploadedFile("backup.proto.gz")!!.content)
|
||||
ProtoBackupImport.restoreLegacy(ctx.uploadedFile("backup.proto.gz")!!.content)
|
||||
},
|
||||
)
|
||||
},
|
||||
|
||||
+155
-61
@@ -7,7 +7,14 @@ package suwayomi.tachidesk.manga.impl.backup.proto
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
|
||||
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.DelicateCoroutinesApi
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.channels.BufferOverflow.DROP_OLDEST
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import mu.KotlinLogging
|
||||
@@ -20,6 +27,7 @@ import org.jetbrains.exposed.sql.insertAndGetId
|
||||
import org.jetbrains.exposed.sql.select
|
||||
import org.jetbrains.exposed.sql.transactions.transaction
|
||||
import org.jetbrains.exposed.sql.update
|
||||
import suwayomi.tachidesk.graphql.types.toStatus
|
||||
import suwayomi.tachidesk.manga.impl.Category
|
||||
import suwayomi.tachidesk.manga.impl.CategoryManga
|
||||
import suwayomi.tachidesk.manga.impl.Manga.clearThumbnail
|
||||
@@ -38,9 +46,13 @@ import suwayomi.tachidesk.manga.model.table.MangaTable
|
||||
import java.io.InputStream
|
||||
import java.lang.Integer.max
|
||||
import java.util.Date
|
||||
import java.util.Timer
|
||||
import java.util.TimerTask
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
object ProtoBackupImport : ProtoBackupBase() {
|
||||
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
|
||||
|
||||
private val logger = KotlinLogging.logger {}
|
||||
|
||||
private var restoreAmount = 0
|
||||
@@ -52,78 +64,160 @@ object ProtoBackupImport : ProtoBackupBase() {
|
||||
sealed class BackupRestoreState {
|
||||
data object Idle : BackupRestoreState()
|
||||
|
||||
data object Success : BackupRestoreState()
|
||||
|
||||
data object Failure : BackupRestoreState()
|
||||
|
||||
data class RestoringCategories(val totalManga: Int) : BackupRestoreState()
|
||||
|
||||
data class RestoringManga(val current: Int, val totalManga: Int, val title: String) : BackupRestoreState()
|
||||
}
|
||||
|
||||
val backupRestoreState = MutableStateFlow<BackupRestoreState>(BackupRestoreState.Idle)
|
||||
private val backupRestoreIdToState = mutableMapOf<String, BackupRestoreState>()
|
||||
|
||||
suspend fun performRestore(sourceStream: InputStream): ValidationResult {
|
||||
return backupMutex.withLock {
|
||||
val backupString = sourceStream.source().gzip().buffer().use { it.readByteArray() }
|
||||
val backup = parser.decodeFromByteArray(BackupSerializer, backupString)
|
||||
val notifyFlow = MutableSharedFlow<Unit>(extraBufferCapacity = 1, onBufferOverflow = DROP_OLDEST)
|
||||
|
||||
val validationResult = validate(backup)
|
||||
fun getRestoreState(id: String): BackupRestoreState? {
|
||||
return backupRestoreIdToState[id]
|
||||
}
|
||||
|
||||
restoreAmount = backup.backupManga.size + 1 // +1 for categories
|
||||
private fun updateRestoreState(
|
||||
id: String,
|
||||
state: BackupRestoreState,
|
||||
) {
|
||||
backupRestoreIdToState[id] = state
|
||||
|
||||
backupRestoreState.value = BackupRestoreState.RestoringCategories(backup.backupManga.size)
|
||||
// Restore categories
|
||||
if (backup.backupCategories.isNotEmpty()) {
|
||||
restoreCategories(backup.backupCategories)
|
||||
}
|
||||
|
||||
val categoryMapping =
|
||||
transaction {
|
||||
backup.backupCategories.associate {
|
||||
val dbCategory = CategoryTable.select { CategoryTable.name eq it.name }.firstOrNull()
|
||||
val categoryId =
|
||||
dbCategory?.let {
|
||||
categoryResultRow ->
|
||||
categoryResultRow[CategoryTable.id].value
|
||||
} ?: Category.DEFAULT_CATEGORY_ID
|
||||
it.order to categoryId
|
||||
}
|
||||
}
|
||||
|
||||
// Store source mapping for error messages
|
||||
sourceMapping = backup.getSourceMap()
|
||||
|
||||
// Restore individual manga
|
||||
backup.backupManga.forEachIndexed { index, manga ->
|
||||
backupRestoreState.value =
|
||||
BackupRestoreState.RestoringManga(
|
||||
current = index + 1,
|
||||
totalManga = backup.backupManga.size,
|
||||
title = manga.title,
|
||||
)
|
||||
restoreManga(
|
||||
backupManga = manga,
|
||||
backupCategories = backup.backupCategories,
|
||||
categoryMapping = categoryMapping,
|
||||
)
|
||||
}
|
||||
|
||||
logger.info {
|
||||
"""
|
||||
Restore Errors:
|
||||
${errors.joinToString("\n") { "${it.first} - ${it.second}" }}
|
||||
Restore Summary:
|
||||
- Missing Sources:
|
||||
${validationResult.missingSources.joinToString("\n ")}
|
||||
- Titles missing Sources:
|
||||
${validationResult.mangasMissingSources.joinToString("\n ")}
|
||||
- Missing Trackers:
|
||||
${validationResult.missingTrackers.joinToString("\n ")}
|
||||
""".trimIndent()
|
||||
}
|
||||
backupRestoreState.value = BackupRestoreState.Idle
|
||||
|
||||
validationResult
|
||||
scope.launch {
|
||||
notifyFlow.emit(Unit)
|
||||
}
|
||||
}
|
||||
|
||||
private fun cleanupRestoreState(id: String) {
|
||||
val timer = Timer()
|
||||
val delay = 1000L * 60 // 60 seconds
|
||||
|
||||
timer.schedule(
|
||||
object : TimerTask() {
|
||||
override fun run() {
|
||||
logger.debug { "cleanupRestoreState: $id (${getRestoreState(id)?.toStatus()?.state})" }
|
||||
backupRestoreIdToState.remove(id)
|
||||
}
|
||||
},
|
||||
delay,
|
||||
)
|
||||
}
|
||||
|
||||
@OptIn(DelicateCoroutinesApi::class)
|
||||
suspend fun restore(sourceStream: InputStream): String {
|
||||
val restoreId = System.currentTimeMillis().toString()
|
||||
|
||||
logger.info { "restore($restoreId): queued" }
|
||||
|
||||
updateRestoreState(restoreId, BackupRestoreState.Idle)
|
||||
|
||||
GlobalScope.launch {
|
||||
restoreLegacy(sourceStream, restoreId)
|
||||
}
|
||||
|
||||
return restoreId
|
||||
}
|
||||
|
||||
suspend fun restoreLegacy(
|
||||
sourceStream: InputStream,
|
||||
restoreId: String = "legacy",
|
||||
): ValidationResult {
|
||||
return backupMutex.withLock {
|
||||
try {
|
||||
logger.info { "restore($restoreId): restoring..." }
|
||||
performRestore(restoreId, sourceStream)
|
||||
} catch (e: Exception) {
|
||||
logger.error(e) { "restore($restoreId): failed due to" }
|
||||
|
||||
updateRestoreState(restoreId, BackupRestoreState.Failure)
|
||||
ValidationResult(
|
||||
emptyList(),
|
||||
emptyList(),
|
||||
emptyList(),
|
||||
emptyList(),
|
||||
)
|
||||
} finally {
|
||||
logger.info { "restore($restoreId): finished with state ${getRestoreState(restoreId)?.toStatus()?.state}" }
|
||||
cleanupRestoreState(restoreId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun performRestore(
|
||||
id: String,
|
||||
sourceStream: InputStream,
|
||||
): ValidationResult {
|
||||
val backupString = sourceStream.source().gzip().buffer().use { it.readByteArray() }
|
||||
val backup = parser.decodeFromByteArray(BackupSerializer, backupString)
|
||||
|
||||
val validationResult = validate(backup)
|
||||
|
||||
restoreAmount = backup.backupManga.size + 1 // +1 for categories
|
||||
|
||||
updateRestoreState(id, BackupRestoreState.RestoringCategories(backup.backupManga.size))
|
||||
// Restore categories
|
||||
if (backup.backupCategories.isNotEmpty()) {
|
||||
restoreCategories(backup.backupCategories)
|
||||
}
|
||||
|
||||
val categoryMapping =
|
||||
transaction {
|
||||
backup.backupCategories.associate {
|
||||
val dbCategory =
|
||||
CategoryTable.select { CategoryTable.name eq it.name }
|
||||
.firstOrNull()
|
||||
val categoryId =
|
||||
dbCategory?.let { categoryResultRow ->
|
||||
categoryResultRow[CategoryTable.id].value
|
||||
} ?: Category.DEFAULT_CATEGORY_ID
|
||||
it.order to categoryId
|
||||
}
|
||||
}
|
||||
|
||||
// Store source mapping for error messages
|
||||
sourceMapping = backup.getSourceMap()
|
||||
|
||||
// Restore individual manga
|
||||
backup.backupManga.forEachIndexed { index, manga ->
|
||||
updateRestoreState(
|
||||
id,
|
||||
BackupRestoreState.RestoringManga(
|
||||
current = index + 1,
|
||||
totalManga = backup.backupManga.size,
|
||||
title = manga.title,
|
||||
),
|
||||
)
|
||||
|
||||
restoreManga(
|
||||
backupManga = manga,
|
||||
backupCategories = backup.backupCategories,
|
||||
categoryMapping = categoryMapping,
|
||||
)
|
||||
}
|
||||
|
||||
logger.info {
|
||||
"""
|
||||
Restore Errors:
|
||||
${errors.joinToString("\n") { "${it.first} - ${it.second}" }}
|
||||
Restore Summary:
|
||||
- Missing Sources:
|
||||
${validationResult.missingSources.joinToString("\n ")}
|
||||
- Titles missing Sources:
|
||||
${validationResult.mangasMissingSources.joinToString("\n ")}
|
||||
- Missing Trackers:
|
||||
${validationResult.missingTrackers.joinToString("\n ")}
|
||||
""".trimIndent()
|
||||
}
|
||||
|
||||
updateRestoreState(id, BackupRestoreState.Success)
|
||||
|
||||
return validationResult
|
||||
}
|
||||
|
||||
private fun restoreCategories(backupCategories: List<BackupCategory>) {
|
||||
val dbCategories = Category.getCategoryList()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user