## 概要

`Channel` 提供在协程间传递消息的原语,容量与类型决定背压策略:`RENDEZVOUS`(零缓冲)、`BUFFERED`(固定缓冲)、`UNLIMITED`(不限缓冲)与 `CONFLATED`(保留最新)。本文以可复制代码验证不同策略的吞吐与尾延迟权衡,并演示 `select` 的多路竞争。


## 环境校验

  • Kotlin/JVM: 1.9+(或 2.0+)
  • `kotlinx-coroutines-core`: 1.8.x+

Gradle:

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1")
}

## 不同容量的背压行为

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

suspend fun demo(capacity: Int) = coroutineScope {
    val ch = Channel<Int>(capacity)
    val prod = launch {
        repeat(20) { i ->
            ch.send(i) // 根据容量可能挂起形成背压
            println("send $i")
        }
        ch.close()
    }
    val cons = launch {
        for (x in ch) { delay(50); println("recv $x") }
    }
    prod.join(); cons.join()
}

fun main() = runBlocking {
    println("-- RENDEZVOUS(0) --"); demo(Channel.RENDEZVOUS)
    println("-- BUFFERED(64) --"); demo(64)
    println("-- UNLIMITED --")
    val unlimited = Channel<Int>(Channel.UNLIMITED)
    launch { repeat(10) { unlimited.send(it) }; unlimited.close() }
    for (x in unlimited) { delay(30); println("recv $x") }
}

要点:

  • `RENDEZVOUS` 提供最强背压,发送与接收一对一同步;`BUFFERED` 在容量内快速吞吐;`UNLIMITED` 可能占用内存,需谨慎。

## CONFLATED 保留最新值

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val ch = Channel<Int>(Channel.CONFLATED)
    launch {
        repeat(10) { i -> ch.send(i); delay(10) }
        ch.close()
    }
    for (x in ch) { delay(50); println("recv $x") }
}

要点:强调时效性,处理中间值会被覆盖。


## select 多路竞争与降级策略

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.select

fun main() = runBlocking {
    val fast = Channel<String>(Channel.BUFFERED)
    val slow = Channel<String>(Channel.RENDEZVOUS)

    launch { repeat(5) { fast.send("F$it"); delay(10) }; fast.close() }
    launch { repeat(5) { delay(60); slow.send("S$it") }; slow.close() }

    repeat(10) {
        val v = select<String> {
            fast.onReceiveCatching { it.getOrNull()?.let { "FAST:$it" } ?: "FAST:end" }
            slow.onReceiveCatching { it.getOrNull()?.let { "SLOW:$it" } ?: "SLOW:end" }
        }
        println(v)
        if (v.endsWith("end")) break
    }
}

要点:

  • 先到先服务:通过 `select` 将更快通道优先消费;慢通道在数据到达时再处理。
  • 结合降级策略:若慢通道长时间无数据,可走缓存或默认值路径。

## 结论

  • `Channel` 的容量与类型是背压设计的核心;选择需结合吞吐、尾延迟与内存边界。
  • `select` 能实现多路竞争与优先级控制,适合聚合多源事件。

## 参考

  • kotlinx-coroutines channels 与 selects 文档

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论
立即
投稿

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部