Merged manga implementation, man this took forever to make and bugfix, its not even done
This commit is contained in:
@@ -104,17 +104,30 @@ suspend fun Completable.awaitSuspending(subscribeOn: Scheduler? = null) {
|
||||
|
||||
suspend fun Completable.awaitCompleted(): Unit = suspendCancellableCoroutine { cont ->
|
||||
subscribe(object : CompletableSubscriber {
|
||||
override fun onSubscribe(s: Subscription) { cont.unsubscribeOnCancellation(s) }
|
||||
override fun onCompleted() { cont.resume(Unit) }
|
||||
override fun onError(e: Throwable) { cont.resumeWithException(e) }
|
||||
override fun onSubscribe(s: Subscription) {
|
||||
cont.unsubscribeOnCancellation(s)
|
||||
}
|
||||
|
||||
override fun onCompleted() {
|
||||
cont.resume(Unit)
|
||||
}
|
||||
|
||||
override fun onError(e: Throwable) {
|
||||
cont.resumeWithException(e)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont ->
|
||||
cont.unsubscribeOnCancellation(
|
||||
subscribe(object : SingleSubscriber<T>() {
|
||||
override fun onSuccess(t: T) { cont.resume(t) }
|
||||
override fun onError(error: Throwable) { cont.resumeWithException(error) }
|
||||
override fun onSuccess(t: T) {
|
||||
cont.resume(t)
|
||||
}
|
||||
|
||||
override fun onError(error: Throwable) {
|
||||
cont.resumeWithException(error)
|
||||
}
|
||||
})
|
||||
)
|
||||
}
|
||||
@@ -129,7 +142,11 @@ suspend fun <T> Observable<T>.awaitFirstOrDefault(default: T): T = firstOrDefaul
|
||||
suspend fun <T> Observable<T>.awaitFirstOrNull(): T? = firstOrDefault(null).awaitOne()
|
||||
|
||||
@OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class)
|
||||
suspend fun <T> Observable<T>.awaitFirstOrElse(defaultValue: () -> T): T = switchIfEmpty(Observable.fromCallable(defaultValue)).first().awaitOne()
|
||||
suspend fun <T> Observable<T>.awaitFirstOrElse(defaultValue: () -> T): T = switchIfEmpty(
|
||||
Observable.fromCallable(
|
||||
defaultValue
|
||||
)
|
||||
).first().awaitOne()
|
||||
|
||||
@OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class)
|
||||
suspend fun <T> Observable<T>.awaitLast(): T = last().awaitOne()
|
||||
@@ -141,11 +158,24 @@ suspend fun <T> Observable<T>.awaitSingle(): T = single().awaitOne()
|
||||
private suspend fun <T> Observable<T>.awaitOne(): T = suspendCancellableCoroutine { cont ->
|
||||
cont.unsubscribeOnCancellation(
|
||||
subscribe(object : Subscriber<T>() {
|
||||
override fun onStart() { request(1) }
|
||||
override fun onNext(t: T) { cont.resume(t) }
|
||||
override fun onCompleted() { if (cont.isActive) cont.resumeWithException(IllegalStateException("Should have invoked onNext")) }
|
||||
override fun onStart() {
|
||||
request(1)
|
||||
}
|
||||
|
||||
override fun onNext(t: T) {
|
||||
cont.resume(t)
|
||||
}
|
||||
|
||||
override fun onCompleted() {
|
||||
if (cont.isActive) cont.resumeWithException(
|
||||
IllegalStateException(
|
||||
"Should have invoked onNext"
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
override fun onError(e: Throwable) {
|
||||
/*
|
||||
/*
|
||||
* Rx1 observable throws NoSuchElementException if cancellation happened before
|
||||
* element emission. To mitigate this we try to atomically resume continuation with exception:
|
||||
* if resume failed, then we know that continuation successfully cancelled itself
|
||||
@@ -185,7 +215,7 @@ fun <T : Any> Observable<T>.asFlow(): Flow<T> = callbackFlow {
|
||||
fun <T : Any> Flow<T>.asObservable(backpressureMode: Emitter.BackpressureMode = Emitter.BackpressureMode.NONE): Observable<T> {
|
||||
return Observable.create(
|
||||
{ emitter ->
|
||||
/*
|
||||
/*
|
||||
* ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
|
||||
* asObservable is already invoked from unconfined
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user