Kotlin学习:5.2、异步数据流 Flow

2年前未命名215
Kotlin学习:5.2、异步数据流 Flow 清风徐来辽 于2023-02-28 22:18:08发布 219 收藏 1 分类专栏: kotlin 文章标签: kotlin kotlin 专栏收录该内容 19 篇文章 0 订阅 订阅专栏

Flow 一、Flow1、Flow是什么东西?2、实现功能3、特点4、冷流和热流5、流的连续性6、流的构建器7、流的上下文8、指定流所在协程9、流的取消9.1、超时取消9.2、主动取消9.3、密集型任务的取消 10、背压和优化10.1、buffer 操作符10.2、 flowOn10.3、conflate 操作符10.4、collectLatest 操作符 二、操作符1、变换操作符1.1、buffer (缓存)1.2、map (变换)1.2.1、map1.2.2、mapNotNull (不空的下发)1.2.3、mapLatest 1.3、transform (一转多)1.4、reduce (累*加减乘除)1.5、fold(累*加减乘除 and 拼接)1.6、flatMapConcat (有序变换)1.7、flatMapMerge (无序变换)1.8、flatMapLatest (截留) 2、过滤型操作符2.1、take (截留)2.1.2、takeWhile 2.2、filter(满足条件下发)2.2.2、filterNotNull (不空的下发)2.2.3、filterNot(符合条件的值将被丢弃)2.2.4、filterInstance (筛选符合类型的值) 2.3、skip 和 drop(跳过)2.3.2、dropWhile 2.4、distinctUntilChanged (过滤重复)2.4.2、distinctUntilChangedBy 2.5、single (判断是否一个事件)2.6、first (截留第一个事件)2.7、debounce (防抖动)2.8、conflate2.9、sample (周期采样) 3、组合型操作符3.1、count (计数)3.2、zip (合并元素)3.3、combine(合并元素)3.4、merge (合并成流)3.5、flattenConcat (展平流)3.6、flattenMerge(展平流) 4、异常操作符4.1、catch (拦截异常)4.2、retry (重试)4.2.2、retryWhen 4.3、withTimeout (超时) 5、辅助操作符5.1、onXXX5.2、delay (延时)5.3、measureTimeMillis (计时) 参考地址

一、Flow 1、Flow是什么东西?

Flow 是有点类似 RxJava 的 Observable

都有冷流和热流之分; 都有流式构建结构; 都包含 map、filter 等操作符。

区别于Observable,Flow可以配合挂起函数使用

2、实现功能

异步返回多个值

可以实现下载功能等,Observable 下发数组时可以实现什么功能,他就能实现什么功能 当文件下载时,对应的后台下载进度,就可以通过Flow里面的emit发送数据,通过collect接收对应的数据。

转:https://blog.csdn.net/qq_30382601/article/details/121825461

3、特点 flow{…}块中的代码可以挂起使用flow,suspend修饰符可以省略流使用emit函数发射值流使用collect的函数收集值flow类似冷流,flow中代码直到流被收集(调用collect)的时候才运行,类似lazy,什么时候用,什么时候执行。流的连续性:流收集都是按顺序收集的flowOn可更改流发射的上下文,即可以指定在主线程或子线程中执行与之相对的是热流,我们即将介绍的 StateFlow 和 SharedFlow 是热流,在垃圾回收之前,都是存在内存之中,并且处于活跃状态的。

转:https://blog.csdn.net/zx_android/article/details/122744370

4、冷流和热流

冷流 冷流类似冷启动,代码在被用到才会执行,如你需要使用的数据在网络,需要先请求网络才能得到数据 Flow是一种类似于序列的冷流,flow构建器中的代码直到流被收集的时候才运行。

热流 热流类似热启动,代码在用到之前已经准备好,如你请求过网络,数据已经缓存在本地,你只需直接使用即可

5、流的连续性

流的连续性:流收集都是按顺序收集的

6、流的构建器

如下三种为冷流构建器

flow{emit} .collect{}flowOf(***).collect{}(***).asFlow().collect{} @Test fun `test flow builder`() = runBlocking<Unit> { flowOf("one", "two", "three") .onEach { delay(1000) } .collect { value -> println(value) } (1..3).asFlow().collect { value -> println(value) } flow<Int> { for (i in 11..13) { delay(1000) //假装在一些重要的事情 emit(i) //发射,产生一个元素 } }.collect { value -> println(value) } } 7、流的上下文

flowOn (多用于切线程)

流的收集总是在调用协程的上下文中发生,流的该属性称为上下文保存。

fun simpleFlow3() = flow<Int> { println("Flow started ${Thread.currentThread().name}") for (i in 1..3) { delay(1000) emit(i) } } @Test fun `test flow context`() = runBlocking<Unit> { simpleFlow3() .collect { value -> println("Collected $value ${Thread.currentThread().name}") } }

如下:流的发射和接收在一个协程内

Flow started Test worker @coroutine#1 Collected 1 Test worker @coroutine#1 Collected 2 Test worker @coroutine#1 Collected 3 Test worker @coroutine#1

flow{…}构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发生(emit)

如下这种写法不被允许

fun simpleFlow4() = flow<Int> { withContext(Dispatchers.Default) { println("Flow started ${Thread.currentThread().name}") for (i in 1..3) { delay(1000) emit(i) } } }

那么如何切换协程上下文呢? flowOn操作符,该函数用于更改流发射的上下文

fun simpleFlow5() = flow<Int> { println("Flow started ${Thread.currentThread().name}") for (i in 1..3) { delay(1000) emit(i) } }.flowOn(Dispatchers.Default) @Test fun `test flow context`() = runBlocking<Unit> { simpleFlow5() .collect { value -> println("Collected $value ${Thread.currentThread().name}") } }

如下:切换上下文成功

Flow started DefaultDispatcher-worker-2 @coroutine#2 Collected 1 Test worker @coroutine#1 Collected 2 Test worker @coroutine#1 Collected 3 Test worker @coroutine#1 8、指定流所在协程

launchIn 用于指定协程作用域通知flow执行

使用 launchIn 替换 collect 在单独的协程中启动收集流

指定协程 //事件源 private fun events() = (1..3) .asFlow() .onEach { delay(100) } .flowOn(Dispatchers.Default) @Test fun `test flow launch`() = runBlocking<Unit> { val job = events() .onEach { event -> println("Event: $event ${Thread.currentThread().name}") } // .collect {} .launchIn(CoroutineScope(Dispatchers.IO)) //这里使用另一个上下文传入Flow // .launchIn(this)//这里使用当前上下文传入Flow job.join() }

打印:

Event: 1 DefaultDispatcher-worker-2 @coroutine#2 Event: 2 DefaultDispatcher-worker-1 @coroutine#2 Event: 3 DefaultDispatcher-worker-3 @coroutine#2 也可以指定当前协程中执行 @Test fun `test flow launch`() = runBlocking<Unit> { val job = events() .onEach { event -> println("Event: $event ${Thread.currentThread().name}") } // .collect {} // .launchIn(CoroutineScope(Dispatchers.IO)) //这里使用另一个上下文传入Flow .launchIn(this)//这里使用当前上下文传入Flow // job.join() } Event: 1 Test worker @coroutine#2 Event: 2 Test worker @coroutine#2 Event: 3 Test worker @coroutine#2 9、流的取消

流采用和协程同样的协作取消。流可以在挂起函数的挂起的时候取消。

9.1、超时取消

withTimeoutOrNull 不能取消密集型任务

fun simpleFlow6() = flow<Int> { for (i in 1..300) { delay(1000) emit(i) println("Emitting $i") } } @Test fun `test cancel flow`() = runBlocking<Unit> { withTimeoutOrNull(2500) { simpleFlow6().collect { value -> println(value) } } println("Done") } 9.2、主动取消

cancel

@Test fun `test cancel flow `() = runBlocking<Unit> { simpleFlow6() .collect { value -> if (value == 3) { cancel() } println(value) } println("Done") } 9.3、密集型任务的取消

密集型任务需要流的取消检测

cancel + cancellable

@Test fun `test cancel flow check`() = runBlocking<Unit> { (1..5).asFlow().cancellable().collect { value -> println(value) if (value == 3) cancel() println("cancel check ${coroutineContext[Job]?.isActive}") } } 10、背压和优化 什么是背压?

生产者生产的效率大于消费者消费的效率,元素积压

例,演示背压

fun simpleFlow8() = flow<Int> { for (i in 1..10) { // emit 上面这段代码在collect之前执行 delay(100) emit(i) // 调用collect // emit下面这段代码在 collect 之后执行 println("Emitting $i ${Thread.currentThread().name}") } } @Test fun `test flow back pressure`() = runBlocking<Unit> { val time = measureTimeMillis { simpleFlow8() .collect { value -> delay(200) //处理这个元素消耗 200ms println("Collected $value ${Thread.currentThread().name}") } } println("Collected in $time ms") } Collected 1 Test worker @coroutine#1 Emitting 1 Test worker @coroutine#1 Collected 2 Test worker @coroutine#1 Emitting 2 Test worker @coroutine#1 Collected 3 Test worker @coroutine#1 Emitting 3 Test worker @coroutine#1 Collected 4 Test worker @coroutine#1 Emitting 4 Test worker @coroutine#1 Collected 5 Test worker @coroutine#1 Emitting 5 Test worker @coroutine#1 Collected 6 Test worker @coroutine#1 Emitting 6 Test worker @coroutine#1 Collected 7 Test worker @coroutine#1 Emitting 7 Test worker @coroutine#1 Collected 8 Test worker @coroutine#1 Emitting 8 Test worker @coroutine#1 Collected 9 Test worker @coroutine#1 Emitting 9 Test worker @coroutine#1 Collected 10 Test worker @coroutine#1 Emitting 10 Test worker @coroutine#1 Collected in 3169 ms 如何解决背压?

通过缓存进行性能优化

10.1、buffer 操作符

并发运行流中发射元素的代码

注意:for (i in 1…10) 这里用的是 1到 10,原因是 for循环 有耗时问题,通过打印时间戳在 for (i in 1…x) 上下,发现 for (i in 1…x) 这行代码有时耗时超过200毫秒,目前不知是何问题,特此记录,为方便对比优化时长,使用1到10.

@Test fun `test flow back pressure buffer`() = runBlocking<Unit> { val time = measureTimeMillis { simpleFlow8() .buffer(10) //缓存发射事件 .collect { value -> delay(200) //处理这个元素消耗 200ms println("Collected $value ${Thread.currentThread().name}") } } println("Collected in $time ms") } Emitting 1 Test worker @coroutine#2 Emitting 2 Test worker @coroutine#2 Collected 1 Test worker @coroutine#1 Emitting 3 Test worker @coroutine#2 Emitting 4 Test worker @coroutine#2 Collected 2 Test worker @coroutine#1 Emitting 5 Test worker @coroutine#2 Emitting 6 Test worker @coroutine#2 Collected 3 Test worker @coroutine#1 Emitting 7 Test worker @coroutine#2 Emitting 8 Test worker @coroutine#2 Collected 4 Test worker @coroutine#1 Emitting 9 Test worker @coroutine#2 Emitting 10 Test worker @coroutine#2 Collected 5 Test worker @coroutine#1 Collected 6 Test worker @coroutine#1 Collected 7 Test worker @coroutine#1 Collected 8 Test worker @coroutine#1 Collected 9 Test worker @coroutine#1 Collected 10 Test worker @coroutine#1 Collected in 2398 ms

10.2、 flowOn

flowOn(),修改流上下文,达到异步处理的效果,从而优化背压

@Test fun `test flow back pressure flowOn`() = runBlocking<Unit> { val time = measureTimeMillis { simpleFlow8() .flowOn(Dispatchers.IO) .collect { value -> delay(200) //处理这个元素消耗 200ms println("Collected $value ${Thread.currentThread().name}") } } println("Collected in $time ms") } Emitting 1 DefaultDispatcher-worker-1 @coroutine#2 Emitting 2 DefaultDispatcher-worker-1 @coroutine#2 Collected 1 Test worker @coroutine#1 Emitting 3 DefaultDispatcher-worker-1 @coroutine#2 Emitting 4 DefaultDispatcher-worker-1 @coroutine#2 Collected 2 Test worker @coroutine#1 Emitting 5 DefaultDispatcher-worker-1 @coroutine#2 Emitting 6 DefaultDispatcher-worker-1 @coroutine#2 Collected 3 Test worker @coroutine#1 Emitting 7 DefaultDispatcher-worker-1 @coroutine#2 Emitting 8 DefaultDispatcher-worker-1 @coroutine#2 Collected 4 Test worker @coroutine#1 Emitting 9 DefaultDispatcher-worker-1 @coroutine#2 Emitting 10 DefaultDispatcher-worker-1 @coroutine#2 Collected 5 Test worker @coroutine#1 Collected 6 Test worker @coroutine#1 Collected 7 Test worker @coroutine#1 Collected 8 Test worker @coroutine#1 Collected 9 Test worker @coroutine#1 Collected 10 Test worker @coroutine#1 Collected in 2385 ms 10.3、conflate 操作符

conflate(),合并发射项,处理最新的值,不对每个值进行处理;

@Test fun `test flow back pressure conflate`() = runBlocking<Unit> { val time = measureTimeMillis { simpleFlow8() .conflate() .collect { value -> delay(200) //处理这个元素消耗 200ms println("Collected $value ${Thread.currentThread().name}") } } println("Collected in $time ms") } Emitting 1 Test worker @coroutine#2 Emitting 2 Test worker @coroutine#2 Collected 1 Test worker @coroutine#1 Emitting 3 Test worker @coroutine#2 Emitting 4 Test worker @coroutine#2 Collected 2 Test worker @coroutine#1 Emitting 5 Test worker @coroutine#2 Emitting 6 Test worker @coroutine#2 Collected 4 Test worker @coroutine#1 Emitting 7 Test worker @coroutine#2 Emitting 8 Test worker @coroutine#2 Collected 6 Test worker @coroutine#1 Emitting 9 Test worker @coroutine#2 Emitting 10 Test worker @coroutine#2 Collected 8 Test worker @coroutine#1 Collected 10 Test worker @coroutine#1 Collected in 1554 ms 10.4、collectLatest 操作符

collectLatest(),取消并重新发射最后一个值

@Test fun `test flow back pressure collectLatest`() = runBlocking<Unit> { val time = measureTimeMillis { simpleFlow8() .collectLatest { value -> delay(200) //处理这个元素消耗 200ms println("Collected $value ${Thread.currentThread().name}") } } println("Collected in $time ms") } Emitting 1 Test worker @coroutine#2 Emitting 2 Test worker @coroutine#2 Emitting 3 Test worker @coroutine#2 Emitting 4 Test worker @coroutine#2 Emitting 5 Test worker @coroutine#2 Emitting 6 Test worker @coroutine#2 Emitting 7 Test worker @coroutine#2 Emitting 8 Test worker @coroutine#2 Emitting 9 Test worker @coroutine#2 Emitting 10 Test worker @coroutine#2 Collected 10 Test worker @coroutine#12 Collected in 1648 ms 二、操作符 1、变换操作符 1.1、buffer (缓存)

上面背压有栗子

1.2、map (变换) 1.2.1、map

map 是变换元素

data class Student(var name: String, var age: Int) private suspend fun performRequest(age: Int): Student { delay(500) return Student("这是name", age) } @Test fun `test map flow operator`() = runBlocking<Unit> { (1..3).asFlow() .map { request -> performRequest(request) } .collect { value -> println(value) } } Student(name=这是name, age=1) Student(name=这是name, age=2) Student(name=这是name, age=3) 1.2.2、mapNotNull (不空的下发) @Test fun `test mapNotNull flow operator`() = runBlocking<Unit> { flow { emit(1) emit(3) emit(2) } .mapNotNull { request -> if (1 == request) { null } else { Student("这是name", request) } } .collect { value -> println(value) } } Student(name=这是name, age=3) Student(name=这是name, age=2) 1.2.3、mapLatest

当有新值发送时,如果上个转换还没结束,会取消掉,用法同map

@Test fun `test mapLatest flow operator`() = runBlocking<Unit> { flow { emit(1) emit(2) emit(3) }.mapLatest { if (2 == it) delay(100L) "it is $it" }.collect { println(it) } } 1.3、transform (一转多) @Test fun `test transform flow operator`() = runBlocking<Unit> { (1..3).asFlow() .transform { request -> emit("Making request $request") emit(performRequest(request)) }.collect { value -> println(value) } } Making request 1 Student(name=这是name, age=1) Making request 2 Student(name=这是name, age=2) Making request 3 Student(name=这是name, age=3) 1.4、reduce (累*加减乘除) @Test fun `test reduce operator`() = runBlocking<Unit> { println(flow<Int> { emit(1) emit(1) emit(2) emit(3) emit(3) emit(4) }.reduce { accumulator, value -> accumulator + value }) } 1.5、fold(累*加减乘除 and 拼接) 加 @Test fun `test fold + operator`() = runBlocking<Unit> { println(flow<Int> { emit(1) emit(1) emit(2) emit(3) emit(3) emit(4) }.fold(3) { accumulator, value -> accumulator + value }) } 17 减 @Test fun `test fold - operator`() = runBlocking<Unit> { println(flow<Int> { emit(2) emit(3) }.fold(18) { accumulator, value -> accumulator - value }) } 13 乘 @Test fun `test fold multiply by operator`() = runBlocking<Unit> { println(flow<Int> { emit(1) emit(1) emit(2) emit(3) }.fold(3) { accumulator, value -> accumulator * value }) } 18 除 @Test fun `test fold devide operator`() = runBlocking<Unit> { println(flow<Int> { emit(2) emit(3) }.fold(18) { accumulator, value -> accumulator / value }) } 3 拼接 @Test fun `test fold joint operator`() = runBlocking<Unit> { println(flow<Int> { emit(1) emit(2) emit(3) }.fold("拼接") { accumulator, value -> return@fold "$accumulator =+= $value" }) } 拼接 =+= 1 =+= 2 =+= 3 1.6、flatMapConcat (有序变换)

元素会变换完,以流的形式继续下发,并且某个元素需要耗时,它后面的元素会等待。

@Test fun `test flatMapConcat operator`() = runBlocking<Unit> { (1..5).asFlow() .onEach { delay(100) } .flatMapConcat { num -> flow { if (3==num){ delay(200) } emit("num: $num") } }.collect { println("value -> $it") } } value -> num: 1 value -> num: 2 value -> num: 3 value -> num: 4 value -> num: 5 1.7、flatMapMerge (无序变换)

元素会变换完,以流的形式继续下发,并且某个元素需要耗时,它后面的元素不会等待。

@Test fun `test flatMapMerge operator`() = runBlocking<Unit> { (1..5).asFlow() .onEach { delay(100) } .flatMapMerge() { num -> flow { if (3==num){ delay(200) } emit("num: $num") } }.collect { println("value -> $it") } } value -> num: 1 value -> num: 2 value -> num: 4 value -> num: 3 value -> num: 5 1.8、flatMapLatest (截留)

快速执行的事件都正常下发, 当有新值发送时,如果上个转换还没结束,会上取消掉上一个,直接下发新值。

@Test fun `test flatMapLatest operator`() = runBlocking<Unit> { (1..5).asFlow() .onEach { delay(100) } .flatMapLatest() { num -> flow { if (3 == num) { delay(200) } emit("num: $num") emit("num2: $num") } }.collect { println("value -> $it") } } value -> num: 1 value -> num2: 1 value -> num: 2 value -> num2: 2 value -> num: 4 value -> num2: 4 value -> num: 5 value -> num2: 5 2、过滤型操作符 2.1、take (截留)

跟Rxjava一样

fun numbers() = flow<Int> { try { emit(1) emit(2) println("This line will not execute") emit(3) } finally { println("Finally in numbers") } } @Test fun `test limit length operator`() = runBlocking<Unit> { //take(2),表示 当计数元素被消耗时,原始流被取消 numbers().take(2).collect { value -> println(value) } } 1 2 Finally in numbers 2.1.2、takeWhile

找到第一个不满足条件的值,发送它之前的值,和dropWhile相反

@Test fun `test takeWhile operator`() = runBlocking<Unit> { flow<Int> { emit(2) emit(1) emit(3) emit(4) emit(1) }.takeWhile { it < 2 } .collect { value -> println(value) } }

如上什么也不会输出;

@Test fun `test takeWhile operator`() = runBlocking<Unit> { flow<Int> { emit(1) emit(2) emit(3) emit(4) emit(1) }.takeWhile { it < 2 } .collect { value -> println(value) } }

会输出 1

2.2、filter(满足条件下发)

跟Rxjava一样

@Test fun `test filter operator`() = runBlocking<Unit> { numbers().filter { it == 2 }.collect { value -> println(value) } } 2.2.2、filterNotNull (不空的下发) @Test fun `test filterNotNull flow operator`() = runBlocking<Unit> { flow { emit(1) emit(3) emit(null) emit(2) } .filterNotNull () .collect { value -> println(value) } } 1 3 2 2.2.3、filterNot(符合条件的值将被丢弃)

筛选不符合条件的值,相当于filter取反

@Test fun `test filterNot operator`() = runBlocking<Unit> { flow<Int> { emit(1) emit(2) emit(3) }.filterNot { it > 2 }.collect { value -> println(value) } } 1 2 2.2.4、filterInstance (筛选符合类型的值)

对标rxjava中的ofType

筛选符合类型的值(不符合类型的值将被丢弃)

@Test fun `test filterInstance operator`() = runBlocking<Unit> { flow<Any> { emit(1) emit("2") emit(3) emit("str") }.filterIsInstance<String>() .collect { value -> println(value) } } 2 str 2.3、skip 和 drop(跳过)

@Test fun `test skip operator`() = runBlocking<Unit> { numbers() .drop(2) .collect { value -> println(value) } }

输出

3 2.3.2、dropWhile

找到第一个不满足条件的值,继续发送它和它之后的值

@Test fun `test dropWhile operator`() = runBlocking<Unit> { numbers() .dropWhile { it <= 2 } .collect { value -> println(value) } } This line will not execute 3 Finally in numbers 2.4、distinctUntilChanged (过滤重复) @Test fun `test distinctUntilChanged operator`() = runBlocking<Unit> { flow<Int> { emit(1) emit(1) emit(2) emit(3) emit(3) emit(4) } .distinctUntilChanged() .collect { value -> println(value) } } 2.4.2、distinctUntilChangedBy

判断两个连续值是否重复,可以设置是否丢弃重复值。 去重规则有点复杂,没完全懂

@Test fun `test distinctUntilChangedBy operator`() = runBlocking<Unit> { flowOf( Student(name = "Jack", age = 11), Student(name = "Tom", age = 10), Student(name = "Jack", age = 12), Student(name = "Jack", age = 13), Student(name = "Tom", age = 11) ) .distinctUntilChangedBy { it.name == "Jack" } .collect { //第三个Stu将被丢弃 println(it.toString()) } } Student(name=Jack, age=11) Student(name=Tom, age=10) Student(name=Jack, age=12) Student(name=Tom, age=11) 2.5、single (判断是否一个事件)

用于确保 flow 输出值唯一。若只有一个值,则可以正常执行,若输出的值不止只有一个的时候,就会抛出异常:

@Test fun `test single operator`() = runBlocking<Unit> { try { println(flow<Int> { emit(1) emit(1) emit(2) emit(3) emit(3) emit(4) }.single()) } catch (e: Exception) { println("e =$e") } }

如果一个事件,就正常执行;否则异常。

e =java.lang.IllegalArgumentException: Flow has more than one element 2.6、first (截留第一个事件) @Test fun `test first operator`() = runBlocking<Unit> { println(flow<Int> { emit(1) emit(1) emit(2) emit(3) emit(3) emit(4) }.first()) } 1 2.7、debounce (防抖动) @Test fun `test debounce operator`() = runBlocking<Unit> { flowOf( Student(name = "Jack", age = 11), Student(name = "Tom", age = 10), Student(name = "Jack", age = 12), Student(name = "Jack", age = 13), Student(name = "Tom", age = 11) ) .onEach { if (it.name == "Jack" && it.age == 13) delay(500) } .debounce(500) .collect { //第三个Stu将被丢弃 println(it.toString()) } } Student(name=Jack, age=12) Student(name=Tom, age=11) 2.8、conflate

见 10.3、conflate

仅保留最新值, 内部就是 buffer(CONFLATED``)

2.9、sample (周期采样)

固定周期采样 ,给定一个时间周期,保留周期内最后发出的值,其他的值将被丢弃

sample操作符与debounce操作符有点像,但是却限制了一个周期性时间,sample操作符获取的是一个周期内的最新的数据,可以理解为debounce操作符增加了周期的限制。

@Test fun `test sample operator`() = runBlocking<Unit> { flow { repeat(10) { delay(50) emit(it) } }.sample(100).collect { println(it) } } 0 2 4 6 8 3、组合型操作符 3.1、count (计数) @Test fun `test count operator`() = runBlocking<Unit> { println(flow<Int> { emit(1) emit(1) emit(2) emit(3) emit(3) emit(4) } .count()) } 3.2、zip (合并元素)

跟Rxjava一样

@Test fun `test zip operator`() = runBlocking<Unit> { val nameFlow = mutableListOf("小红", "小黑").asFlow() val numFlow = (1..3).asFlow() nameFlow.zip(numFlow) { string, num -> "$string:$num" }.collect { println("value -> $it") } } 3.3、combine(合并元素) @Test fun `test combine operator`() = runBlocking<Unit> { val nameFlow = mutableListOf("小红", "小黑").asFlow() val numFlow = (1..3).asFlow() nameFlow.combine(numFlow) { string, num -> "$string:$num" }.collect { println("value -> $it") } } value -> 小红:1 value -> 小黑:2 value -> 小黑:3 3.4、merge (合并成流)

merge 是将两个flow合并起来,将每个值依次发出来

@Test fun `test merge operator`() = runBlocking<Unit> { val flow1 = listOf(1, 2) .asFlow() val flow2 = listOf("one", "two", "three") .asFlow() merge(flow1, flow2) .collect { value -> println(value) } } 1 2 one two three 3.5、flattenConcat (展平流)

展平操作符 flattenConcat 以顺序方式将给定的流展开为单个流,通俗点讲,减少层级 ,感觉和merge这么像呢,这个不太理解啥用

@Test fun `test flattenConcat operator`() = runBlocking<Unit> { val flow1 = listOf(1, 2) .asFlow() val flow2 = listOf("one", "two", "three") .asFlow() val flow3 = listOf("x", "xx", "xxx") .asFlow() flowOf(flow1, flow2, flow3) .flattenConcat() .collect { value -> println(value) } } 1 2 one two three x xx xxx 3.6、flattenMerge(展平流)

flattenMerge 作用和 flattenConcat 一样,但是可以设置并发收集流的数量

@Test fun `test flattenMerge operator`() = runBlocking<Unit> { val flow1 = listOf(1, 2) .asFlow() val flow2 = listOf("one", "two", "three") .asFlow() val flow3 = listOf("x", "xx", "xxx") .asFlow() flowOf(flow1, flow2, flow3) .flattenMerge(2) .collect { value -> println(value) } } 1 2 one two three x xx xxx 4、异常操作符 4.1、catch (拦截异常)

对标rxjava 中的 onErrorResumeNext

Exception、Throwable、Error 都会拦截

@Test fun `test catch operator`() = runBlocking<Unit> { (1..5).asFlow() .onEach { delay(100) } .onEach { if (2 == it) throw NullPointerException() } .catch { emit(110) println("e == $it") } .collect { println("value -> $it") } } @Test fun `test catch operator`() = runBlocking<Unit> { (1..5).asFlow() .onEach { delay(100) } .onEach { if (2 == it) // throw Exception("测试 异常") // throw Throwable("测试 异常") throw Error("测试 错误") } .catch { emit(110) println("e == $it") } .collect { println("value -> $it") } } value -> 1 value -> 110 e == java.lang.Error: 测试 错误 4.2、retry (重试)

所有异常错误都拦截

拦截次数 @Test fun `test retry operator`() = runBlocking<Unit> { flow<Any> { emit(1) emit(2) throw Exception("异常") emit(3) }.retry(2) .catch { emit(110) } .collect { value -> println(value) } } 拦截条件 @Test fun `test retry 2 operator`() = runBlocking<Unit> { flow<Any> { emit(1) emit(2) throw Error("异常") emit(3) }.retry { it.message == "异常" } .catch { emit(110) } .collect { value -> println(value) } }

如上,满足拦截条件,所以会一直打印日志

1 2 1 2 1 2 1 2 1 ... 不杀死程序一直打印 4.2.2、retryWhen 4.3、withTimeout (超时) @Test fun `test retry 2 operator`() = runBlocking<Unit> { withTimeout(2500) { flow<Any> { emit(1) throw Error("异常") }.retry { it.message == "异常" } .catch { emit(110) } .collect { value -> println(value) } } }

输出:

1 1 ... 好多个 1 1 1 Timed out waiting for 2500 ms kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 2500 ms (Coroutine boundary) at com.yoshin.kt.kotlindemo20220713.ExampleUnitTest$test retry 2 operator$1.invokeSuspend(ExampleUnitTest.kt:928) Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 2500 ms at app//kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:184) at app//kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:154) at app//kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:508) at app//kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:284) at app//kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:108) at java.base@11.0.13/java.lang.Thread.run(Thread.java:834) 5、辅助操作符 5.1、onXXX

onXXX 的方法包含

onCompletion 流完成时调用 onStart 流开始时调用 onEach 元素下发时调用,每次下发都调用

对比rxjava 中: onCompletion == doOnComplete onStart == doOnSubscribe 或者 doOnLifecycle onEach == doNext

@Test fun `test do operator`() = runBlocking<Unit> { (1..5).asFlow() .onCompletion { println(" onCompletion == $it ") } .onStart { println(" onStart ") } .onEach { println(" onEach == $it ") } .collect { println("value -> $it") } } 5.2、delay (延时)

延时

private fun events() = (1..3) .asFlow() .onEach { delay(100) } .flowOn(Dispatchers.Default) 5.3、measureTimeMillis (计时)

测量代码用时

@Test fun `test flow back pressure`() = runBlocking<Unit> { val time = measureTimeMillis { simpleFlow8() .collect { value -> delay(200) //处理这个元素消耗 200ms println("Collected $value ${Thread.currentThread().name}") } } println("Collected in $time ms") } 参考地址

笔记大部分内容来自动脑学院的文章和视频

动脑学院 :https://blog.csdn.net/qq_30382601/article/details/121825461

Kotlin 之 协程(三)Flow异步流 :https://blog.csdn.net/zx_android/article/details/122744370

Android Kotlin之Flow数据流:https://blog.csdn.net/u013700502/article/details/120526170

标签: [db:标签TAG]

相关文章

8 个很棒的 Vue 开发技巧

8 个很棒的 Vue 开发技巧...

【Linux】进程信号万字详解(下)

【Linux】进程信号万字详解(下)...

OpenAI ChatGPT 人工智能机器人注册使用,能以中文对答如流的机器人

OpenAI ChatGPT 人工智能机器人注册使用,能以中文对答如流的机器人...

学习 Python 之 Pygame 开发魂斗罗(一)

学习 Python 之 Pygame 开发魂斗罗(一)...

如何轻松解决C盘爆满情况——SpaceSniffer

如何轻松解决C盘爆满情况——SpaceSniffer...

用OpeAI API打造ChatGPT桌面端应用

用OpeAI API打造ChatGPT桌面端应用...