一般我们在 Java 项目中做并发编程,基本都是通过创建线程的方式来执行(JDK21支持了虚拟线程),但是线程有如下问题
- 线程是不能无限创建的,而是受到操作系统的限制
- 线程切换的时候有较高的上下文切换的成本
而协程可以理解为轻量级的线程,可以在一个线程中执行多个任务,而不需要线程切换的开销,同时也避免了线程数量的限制
这里看一下kotlin中的协程,首先需要引入单独的依赖包
1 2 3 4
| dependencies { implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0-RC2") }
|
如果需要在主线程中异步执行一些任务,但是主线程要同步等待异步任务结果,可以使用runBlocking
(结构化并发)
无需结果的任务执行
对于不需要获取结果的任务,可以使用如下形式:
1 2 3 4 5 6 7 8
| runBlocking { launch { } launch { } }
|
下面看一个具体的使用例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| fun main() { println("start ${Thread.currentThread()} ${System.currentTimeMillis()}") runBlocking { launch { delay(2000) println("task 1 ${Thread.currentThread()} ${System.currentTimeMillis()}") } launch { delay(1000) println("task 2 ${Thread.currentThread()} ${System.currentTimeMillis()}") } println("runBlocking ${Thread.currentThread()} ${System.currentTimeMillis()}") } println("done ${Thread.currentThread()} ${System.currentTimeMillis()}") }
|
执行结果
1 2 3 4 5 6
| // 可以看到线程ID都是相同的,说明都是在同一个线程内执行完成,但是任务明显不都是同步顺序执行的 start Thread[#1,main,5,main] 1704114721498 runBlocking Thread[#1,main,5,main] 1704114721583 task 2 Thread[#1,main,5,main] 1704114722600 -- 启动后,过了1s执行task2 task 1 Thread[#1,main,5,main] 1704114723592 -- 启动后,过了2s执行task1 done Thread[#1,main,5,main] 1704114723593 -- 等待task1与task2都执行完成
|
需要结果的任务执行
如果需要获取协程中执行的结果,那么上面的形式可以进行如下修改:
- 将 launch 改为 async
- async返回结果类型为 Deferred<T>
- 调用 Deferred#await方法同步获取结果
(这部分类似 Java 中的 Future 和 Future#get)
1 2 3 4 5 6 7 8 9 10
| runBlocking { val defer1: Deferred<T> = async { } val defer2: Deferred<T> = async { } val result1 = defer1.await() }
|
下面同样用一个例子看一下具体使用
1 2 3 4 5 6 7 8 9 10 11 12
| fun main() { runBlocking { val count: Deferred<Int> = async { println("start ${Thread.currentThread()}") Random.nextInt() } println("runBlocking ${Thread.currentThread()}") println("count: ${count.await()} ${Thread.currentThread()}") } }
|
执行结果如下
1 2 3
| runBlocking Thread[#1,main,5,main] start Thread[#1,main,5,main] count: -779821096 Thread[#1,main,5,main]
|
协程通信
协程间通信可以使用 channel(类似为一个队列)
channel类型可以分为
Unlimited channel : channel中元素无限制,放入过多元素且没有及时消费会出现OOM
Buffered channel: 可以指定缓存的元素数量,当发送到数量上限时,发送调用会被挂起
Rendezvous channel: 可以认为可以缓存的数量为0,发送调用会被挂起,直到有消费方进行消费
Conflated channel: 每次发送到channel的元素都会覆盖之前的元素,所以发送调用永远不会被挂起
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| fun main() = runBlocking<Unit> { val channel = Channel<String>() launch { channel.send("A1") channel.send("A2") log("A done") } launch { channel.send("B1") log("B done") } launch { repeat(3) { val x = channel.receive() log(x) } } }
|
输出:
1 2 3 4 5
| [main] A1 [main] B1 [main] A done [main] B done [main] A2
|
协程分派
之前看的任务都是在主线程(main)中执行的,我们也可以协程分派到不同的线程中进行执行,下面说的配置同时适用于 launch 和 async
首先,可以在launch中指定分派策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
runBlocking { launch { delay(2000) println("task 1 ${Thread.currentThread()} ${System.currentTimeMillis()}") } launch(Dispatchers.IO) { delay(1000) println("task 2 ${Thread.currentThread()} ${System.currentTimeMillis()}") } println("runBlocking ${Thread.currentThread()} ${System.currentTimeMillis()}") }
|
执行结果
1 2 3 4 5
| start Thread[#1,main,5,main] 1704115043212 runBlocking Thread[#1,main,5,main] 1704115043297 task 2 Thread[#21,DefaultDispatcher-worker-1,5,main] 1704115044305 task 1 Thread[#1,main,5,main] 1704115045304 done Thread[#1,main,5,main] 1704115045306
|
如果觉得默认提供的不适用,也可以使用自定义的线程池
1 2 3 4 5 6 7 8 9 10 11
| ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, ArrayBlockingQueue(20)) .asCoroutineDispatcher().use { dispatcher -> runBlocking { launch(dispatcher) { TimeUnit.SECONDS.sleep(1) println("task 2 ${Thread.currentThread()} ${System.currentTimeMillis()}") } println("runBlocking ${Thread.currentThread()} ${System.currentTimeMillis()}") } }
|
如果挂起后再执行时切换线程,可以使用 launch的第二个参数(start = CoroutineStart.UNDISPATCHED),当然还有一些其他的选项可以自行参考一下文档~
1 2 3 4 5 6 7 8 9 10
| ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, ArrayBlockingQueue(20)) .asCoroutineDispatcher().use { dispatcher -> runBlocking { launch(context = dispatcher, start = CoroutineStart.UNDISPATCHED) { println("task 2 ${Thread.currentThread()} ${System.currentTimeMillis()}") } println("runBlocking ${Thread.currentThread()} ${System.currentTimeMillis()}") } }
|
runBlocking中要切换线程上下文,可以使用 withContext
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, ArrayBlockingQueue(20)) .asCoroutineDispatcher().use { dispatcher -> runBlocking { withContext(Dispatchers.IO) { println("task 2.1 ${Thread.currentThread()} ${System.currentTimeMillis()}") }
launch(context = dispatcher, start = CoroutineStart.UNDISPATCHED) { TimeUnit.SECONDS.sleep(1) println("task 2.2 ${Thread.currentThread()} ${System.currentTimeMillis()}") } println("runBlocking ${Thread.currentThread()} ${System.currentTimeMillis()}") } }
start Thread[#1,main,5,main] 1704117793052 task 2.1 Thread[#21,DefaultDispatcher-worker-1,5,main] 1704117793154 task 2.2 Thread[#1,main,5,main] 1704117794162 runBlocking Thread[#1,main,5,main] 1704117794163 done Thread[#1,main,5,main] 1704117794165
|