威尼斯 0.9

威尼斯 0.9

测试已测试
语言语言 CC
许可证 MIT
发布上次发布2015年11月

Paolo Faria维护。



  • 页面上
  • Paolo Faria

威尼斯

codecov.io

威尼斯 是一个纯 Swift/C 库,为 Swift 2 提供了 CSP

功能

  • [x] 无 Foundation 依赖(Linux 兼容
  • [x] 协程
  • [x] 协程预分配
  • [x] 通道
  • [x] 可靠性通道
  • [x] 接收通道
  • [x] 发送通道
  • [x] 通道迭代
  • [x] 选择
  • [x] 计时器
  • [x] 报时器
  • [x] 文件描述符轮询
  • [x] IP
  • [x] TCP 套接字
  • [ ] UDP 套接字
  • [ ] UNIX 套接字
  • [ ] 文件

威尼斯 封装了一个修改过的 C 库版本 libmill

产品

威尼斯 是以下产品的基:

性能

威尼斯 的速度非常快,因为它使用由 libmill 管理的轻量级协程而非由操作系统管理的线程。命令行应用程序中的 Chinese Whispers 示例显示了您如何创建高达 100,000 个并发协程(在 2015 年早期 8 GB 的 MacBook Pro 上进行了测试)。

您在自己的计算机上运行性能测试并亲眼目睹。只需运行 PerformanceTests.swift 中的测试。

使用方法

co

func doSomething() {
    print("did something")
}

// call sync
doSomething()

// call async
co(doSomething())

// async closure
co {
    print("did something else")
}

napwakeUp

co {
    // wakes up 1 second from now
    wakeUp(now + 1 * second)
    print("yawn")
}

// nap for two seconds so the program
// doesn't terminate before the print
nap(2 * second)

after

after 在指定持续时间后运行协程。

after(1 * second) {
    print("yoo")
}

// same as

co {
    nap(1 * second)
    print("yoo")
}

Channel

通道是类型化的,返回包装值或 nil 的可选值,如果通道已关闭且缓冲区中无余值。

let messages = Channel<String>()
co(messages <- "ping")
let message = <-messages
print(message!)

// without operators

let messages = Channel<String>()
co(messages.receive("ping"))
let message = messages.send()
print(message!)

// buffered channels

let messages = Channel<String>(bufferSize: 2)

messages <- "buffered"
messages <- "channel"

print(!<-messages)
print(!<-messages)

ReceivingChannelSendingChannel

您可以获取具有只接收或只发送通道功能的通道的引用。

func receiveOnly(channel: ReceivingChannel<String>) {
    // can only receive
    channel <- "yo"
}

func sendOnly(channel: SendingChannel<String>) {
    // can only send
    <-channel
}

let channel = Channel<String>(bufferSize: 1)
receiveOnly(channel.receivingChannel)
sendOnly(channel.sendingChannel)

FallibleChannel

可靠性通道接受值和错误。

struct Error : ErrorType {}

let channel = FallibleChannel<String>(bufferSize: 2)

channel <- "yo"
channel <- Error()

do {
    let yo = try <-channel
    try <-channel // will throw
} catch {
    print("error")
}

选择

有时 选择 可能会与同名的系统库函数冲突。为了解决这个问题,您可以使用 Venice.select 或更简短的别名 sel 来调用 Venice 的 select

let channel = Channel<String>()
let fallibleChannel = FallibleChannel<String>()

select { when in
    when.receiveFrom(channel) { value in
        print("received \(value)")
    }
    when.receiveFrom(fallibleChannel) { result in
        result.success { value in
            print(value)
        }
        result.failure { error in
            print(error)
        }
    }
    when.send("value", to: channel) {
        print("sent value")
    }
    when.send("value", to: fallibleChannel) {
        print("sent value")
    }
    when.throwError(Error(), into: fallibleChannel) {
        print("threw error")
    }
    when.timeout(now + 1 * second) {
        print("timeout")
    }
    when.otherwise {
        print("default case")
    }
}

您可以通过将其设置为 nil 来禁用一个信道选择

var channelA: Channel<String>? = Channel<String>()
var channelB: Channel<String>? = Channel<String>()

if arc4random_uniform(2) == 0 {
    channelA = nil
    print("disabled channel a")
} else {
    channelB = nil
    print("disabled channel b")
}

co { channelA?.receive("a") }
co { channelB?.receive("b") }

sel { when in
    when.receiveFrom(channelA) { value in
        print("received \(value) from channel a")
    }
    when.receiveFrom(channelB) { value in
        print("received \(value) from channel b")
    }
}

另一种禁用信道选择的方法是将其 case 放入 if 中。

let channelA = Channel<String>()
let channelB = Channel<String>()

co(channelA <- "a")
co(channelB <- "b")

select { when in
    if arc4random_uniform(2) == 0 {
        print("disabled channel b")
        when.receiveFrom(channelA) { value in
            print("received \(value) from channel a")
        }
    } else {
        print("disabled channel a")
        when.receiveFrom(channelB) { value in
            print("received \(value) from channel b")
        }
    }
}

forSelect

很多时候我们需要在 while 循环中包装我们的 select。为了使这种模式更容易工作,我们可以使用 forSelect。当您调用 done() 之前,forSelect 将循环。

func flipCoin(result: FallibleChannel<String>) {
    if arc4random_uniform(2) == 0 {
        result <- "Success"
    } else {
        result <- Error(description: "Something went wrong")
    }
}

let results = FallibleChannel<String>()

co(flipCoin(results))

forSelect { when, done in
    when.receiveFrom(results) { result in
        result.success { value in
            print(value)
            done()
        }
        result.failure { error in
            print("\(error). Retrying...")
            co(flipCoin(results))
        }
    }
}

IP

// local
do {
    // all network interfaces
    let ip1 = try IP(port: 5555, mode: .IPV4)

    // specific network interface
    let ip2 = try IP(networkInterface: "en0", port: 5555, mode: .IPV6)
} catch {
    // something bad happened :(
}

// remote
do {
    // if the deadline is reached the call will throw
    let ip3 = try IP(address: "127.0.0.1", port: 5555, mode: .IPV4, deadline: now + 10 * second)
} catch {
    // something bad happened :(
}

TCP

// server
do {
    let ip = try IP(port: 5555)
    let serverSocket = try TCPServerSocket(ip: ip)
    let clientSocket = try serverSocket.accept()

    let yo = try clientSocket.receiveString(untilDelimiter: "\n")
} catch {
    // something bad happened :(
}

// client
do {
    let ip = try IP(address: "127.0.0.1", port: 5555)
    let clientSocket = try TCPClientSocket(ip: ip)
    let deadline = now + 10 * second

    // calls to send append the data to an internal
    // buffer to minimize system calls
    try clientSocket.sendString("yo\n", deadline: deadline)
    // flush actually sends all data in the buffer
    try clientSocket.flush()
} catch {
    // something bad happened :(
}

安装

手动

如果希望不使用依赖管理器,可以将 Venice 手动集成到您的项目中。

嵌入式框架

  • 打开终端,使用 cd 命令进入您的顶级项目目录,如果您还没有将项目初始化为 Git 仓库,请运行以下命令:
$ git init
  • 通过运行以下命令将 Venice 添加为 Git 子模块
$ git submodule add https://github.com/Zewo/Venice.git
  • 打开新的 Venice 文件夹,并将 Venice.xcodeproj 拖入您的应用程序 Xcode 项目的项目导航器中。

    它应该位于您的应用程序蓝色项目图标下面。它是在所有其他 Xcode 组的上面还是下面无关紧要。

  • 在项目导航器中选择 Venice.xcodeproj,并验证部署目标与您的应用程序目标匹配。

  • 接下来,在项目导航器中(蓝色项目图标)选择您的应用程序项目以导航到目标配置窗口,并在侧边栏中下方的“目标”标题下选择应用程序目标。
  • 在该窗口的选项卡栏中,打开“通用”面板。
  • 在“嵌入式二进制文件”部分下点击 + 按钮。
  • 您将看到两个不同的 Venice.xcodeproj 文件夹,每个文件夹中都有两个不同的版本 Venice.framework 存储在 Products 文件夹内。

    哪个 Products 文件夹无关紧要,但您选择的 Venice.framework 是顶层还是底层则很重要。

  • 针对 OS X 选择顶部的 Venice.framework,针对 iOS 选择底部的那个。

    您可以通过检查项目的构建日志来验证您选择了哪一个。构建目标将列为 Venice iOSVenice OSX

  • 一切就绪!

Venice.framework 将自动添加为目标依赖项、链接框架和嵌入框架,在复制文件构建阶段,这您需要构建模拟器和设备上的项目。

命令行应用程序

在命令行应用程序中使用 Venice

检查 Command Line Applications 目录获取示例。

示例

示例 01-15 来自于gobyexample,并使用威尼斯从Go翻译成Swift。Xcode项目中包含一个包含以下所有示例的重磅区域。至少编译一次Venice OSX目标,然后您可以自由地与重磅区域示例进行交互。

01 - 协程

协程是一个轻量级的执行线程。

func f(from: String) {
    for i in 0 ..< 4 {
        print("\(from): \(i)")
        yield
    }
}

假设我们有一个函数调用 f(s)。以下是常规方式的调用,它是同步执行的。

f("direct")

使用 co(f(s)) 协程来调用该函数。这个新的协程将与调用它的协程同时执行。

co(f("coroutine"))

您还可以使用闭包来启动协程。

co {
    print("going")
}

现在我们的两个函数调用正在非同步的不同协程中运行,所以执行会跳转到这里。我们在程序退出前等待1秒。

nap(1 * second)
print("done")

当我们运行此程序时,首先看到阻塞调用的输出,然后是两个协程交互的输出。这种交错反映了运行时并发执行的协程。

输出

direct: 0
direct: 1
direct: 2
direct: 3
coroutine: 0
going
coroutine: 1
coroutine: 2
coroutine: 3
done

02 - 通道

通道是将并发协程连接的管道。您可以从一个协程向通道发送值,并将这些值接收进另一个协程。

使用 Channel() 创建一个新通道。通道通过它们携带的值进行类型化。

let messages = Channel<String>()

使用 channel <- value 语法向通道发送值。这里,我们从一个新的协程向上面创建的 messages 通道发送 "ping"

co(messages <- "ping")

<-channel 语法接收通道中的值。这里我们将接收上面发送的 "ping" 消息并将其打印出来。

let message = <-messages
print(message!)

当我们运行程序时,“ping”消息成功地通过我们的通道从一个协程传递到另一个协程。默认情况下,发送和接收会阻塞,直到发送者和接收者都准备好。这种特性使得我们可以在程序末尾等待“ping”消息,而无需使用任何其他同步机制。

从通道接收到的值是 Optional。如果您尝试从一个没有剩余值的缓冲区关闭的通道中获取值,它将返回 nil。如果您确定有一个值被包装在 Optional 中,您可以使用 !<- 操作符,它返回一个隐式解包的选项。

输出

ping

03 - 通道缓冲

默认情况下,通道是无缓冲的,这意味着只有在准备好接收值时的接收(let value = <-channel)才会接收从通道发送的值。有缓冲的通道在没有任何相应接收者的情况下接受有限数量的值。

这里我们创建了一个能够缓冲2个值的字符串通道。

let messages = Channel<String>(bufferSize: 2)

因为这个通道是有缓冲的,所以我们可以在没有相应并发接收的情况下将值发送到通道中。

messages <- "buffered"
messages <- "channel"

稍后我们可以像以往一样接收这两个值。

print(!<-messages)
print(!<-messages)

输出

buffered
channel

04 - 通道同步

我们可以使用通道在协程之间同步执行。以下是一个使用阻塞接收以等待协程完成的示例。

这是我们将在一个协程中运行的函数。使用 done 通道通知另一个协程该函数的工作已完成。

func worker(done: Channel<Bool>) {
    print("working...")
    nap(1 * second)
    print("done")
    done <- true // Send a value to notify that we're done.
}

启动一个工作协程,向其提供通知的通道。

let done = Channel<Bool>(bufferSize: 1)
co(worker(done))

在接收到工作从通道发送的通知之前阻塞。

<-done

如果您从程序中删除了<-done行,程序将在工作器启动之前就退出。

输出

working...
done

05 - 通道方向

在将通道用作函数参数时,您可以指定通道是仅用于发送还是接收值。这种特定性提高了程序的类型安全性。

ping函数仅接受接收值的通道。尝试从该通道接收值将导致编译时错误。

func ping(pings: ReceivingChannel<String>, message: String) {
    pings <- message
}

pong函数接受一个仅发送值(pings)的通道和一个仅接收值(pongs)的通道。

func pong(pings: SendingChannel<String>, _ pongs: ReceivingChannel<String>) {
    let message = !<-pings
    pongs <- message
}

let pings = Channel<String>(bufferSize: 1)
let pongs = Channel<String>(bufferSize: 1)

ping(pings.receivingChannel, message: "passed message")
pong(pings.sendingChannel, pongs.receivingChannel)

print(!<-pongs)

输出

passed message

06 - Select

Select允许您等待多个通道操作。将协程和通道与select结合使用是极其强大的功能。

在我们的例子中,我们将跨两个通道进行选择。

let channel1 = Channel<String>()
let channel2 = Channel<String>()

每个通道在一段时间后将接收一个值,以模拟例如阻塞RPC操作在并发协程中执行。

co {
    nap(1 * second)
    channel1 <- "one"
}

co {
    nap(2 * second)
    channel2 <- "two"
}

我们将使用select同时等待这两个值,每当值到达时打印它们。

for _ in 0 ..< 2 {
    select { when in
        when.receiveFrom(channel1) { message1 in
            print("received \(message1)")
        }
        when.receiveFrom(channel2) { message2 in
            print("received \(message2)")
        }
    }
}

我们接收到的值依次是"one""two",正如预期的那样。请注意,总执行时间仅为约2秒,因为1秒和2秒的nap并发执行。

输出

received one
received two

07 - 超时

超时对于连接到外部资源或需要限制执行时间的程序很重要。由于通道和select,实现超时既简单又优雅。

在我们的例子中,假设我们正在执行一个外部调用,该调用在2秒后将结果返回到一个名为channel1的通道上。

let channel1 = Channel<String>(bufferSize: 1)

co {
    nap(2 * second)
    channel1 <- "result 1"
}

以下是实现超时的selectreceiveFrom(channel1)等待结果,而timeout(now + 1 * second)等待在1秒超时后发送一个值。由于select按照第一个准备好接收的接收操作来执行,因此如果操作超过允许的1秒,我们将采用超时情况。

select { when in
    when.receiveFrom(channel1) { result in
        print(result)
    }
    when.timeout(now + 1 * second) {
        print("timeout 1")
    }
}

如果我们允许更长的3秒超时,那么从channel2的接收将成功,我们将打印结果。

let channel2 = Channel<String>(bufferSize: 1)

co {
    nap(2 * second)
    channel2 <- "result 2"
}

select { when in
    when.receiveFrom(channel2) { result in
        print(result)
    }
    when.timeout(now + 3 * second) {
        print("timeout 2")
    }
}

运行此程序将显示第一个操作超时,第二个成功。

使用此select超时模式需要通过通道进行结果通信。这在一般情况下是一个好主意,因为其他重要特性也是基于通道和select的。我们将接下来看两个这样的例子:计时器和报时器。

输出

timeout 1
result 2

08 - 非阻塞通道操作

通道的发送和接收是阻塞的。然而,我们可以使用带有otherwise子句的select来实现非阻塞发送、接收,甚至非阻塞多路select

let messages = Channel<String>()
let signals = Channel<Bool>()

这里是一个非阻塞接收示例。如果数组messages上有可用值,则select将采用带有该值的receiveFrom(messages)情况。如果没有,它将立即采用otherwise情况。

select { when in
    when.receiveFrom(messages) { message in
        print("received message \(message)")
    }
    when.otherwise {
        print("no message received")
    }
}

非阻塞发送的工作方式类似。

let message = "hi"

select { when in
    when.send(message, to: messages) {
        print("sent message \(message)")
    }
    when.otherwise {
        print("no message sent")
    }
}

我们可以在otherwise子句上方使用多个情况来实现多路非阻塞select。在这里,我们尝试在messagessignals上执行非阻塞接收。

select { when in
    when.receiveFrom(messages) { message in
        print("received message \(message)")
    }
    when.receiveFrom(signals) { signal in
        print("received signal \(signal)")
    }
    when.otherwise {
        print("no activity")
    }
}

输出

no message received
no message sent
no activity

09 - 关闭通道

关闭一个通道表示不能再向其中发送更多值。这可以用来将完成的信号传递给通道的接收端。

在这个例子中,我们将使用一个jobs通道来将需要完成的任务传递给一个工作协程。当我们没有更多任务给工作协程时,我们将关闭jobs通道。

let jobs = Channel<Int>(bufferSize: 5)
let done = Channel<Bool>()

以下是一个工作协程的示例。它通过j = <-jobs重复从jobs接收数据。如果jobs已经被关闭,并且通道中的所有值都已经被接收,则返回值将是nil。我们使用这个值在完成所有任务后,在done上发出通知。

co {
    while true {
        if let job = <-jobs {
            print("received job \(job)")
        } else {
            print("received all jobs")
            done <- true
            return
        }
    }
}

这个示例通过jobs通道发送了3个任务给工作协程,然后关闭了它。

for job in 1 ... 3 {
    print("sent job \(job)")
    jobs <- job
}

jobs.close()
print("sent all jobs")

我们使用之前看到的同步方法等待工作协程。

<-done

关闭通道的概念自然地引出了我们下一个例子:遍历通道。

输出

sent job 1
received job 1
sent job 2
received job 2
sent job 3
received job 3
sent all jobs
received job 3
received all jobs

10 - 遍历通道

我们可以使用for in来遍历从通道接收到的值。我们将遍历queue通道中的2个值。

let queue =  Channel<String>(bufferSize: 2)

queue <- "one"
queue <- "two"
queue.close()

这个for in循环将在从queue接收每个元素时遍历。因为我们之前已经关闭了通道,所以在收到2个元素后,迭代将终止。如果我们没有关闭它,循环中的第3次接收将导致阻塞。

for element in queue {
    print(element)
}

这个例子还展示了关闭一个非空的通道但仍然接收剩余值是可能的。

输出

one
two

11 - 定时器

我们经常希望在未来的某个时刻执行代码,或者每隔一段时间重复执行。定时器和滴答器功能使得这两个任务变得容易。我们先看看定时器,然后再看看滴答器。

定时器代表未来的一个单一事件。你告诉定时器你想要等待多长时间,然后它提供一个在这个时间会发出通知的通道。这个定时器将等待2秒钟。

let timer1 = Timer(deadline: now + 2 * second)

<-timer1.channel在定时器通道上阻塞,直到定时器发送一个值,表明定时器已过期。

<-timer1.channel
print("Timer 1 expired")

如果你只想等待,你可以使用nap。定时器可能有用的一部分是因为你可以在它到期之前取消定时器。这里有一个例子来演示这一过程。

let timer2 = Timer(deadline: now + 1 * second)

co {
    <-timer2.channel
    print("Timer 2 expired")
}

let stop2 = timer2.stop()

if stop2 {
    print("Timer 2 stopped")
}

第一个定时器将在我们启动程序后大约2秒到期,但第二个应该在它有机会到期之前被停止。

输出

Timer 1 expired
Timer 2 stopped

12 - 滴答器

定时器用于当你想要在未来某个时间做一次事情时 - 滴答器用于当你想要以固定间隔重复执行时。这里有一个滴答器的示例,它会定期滴答,直到我们停止它。

滴答器使用与定时器类似的机制:一个发送值的通道。在这里,我们将使用通道的内置函数generator来迭代每500ms接收到的值。

let ticker = Ticker(period: 500 * millisecond)

co {
    for time in ticker.channel {
        print("Tick at \(time)")
    }
}

滴答器可以像定时器一样停止。一旦滴答器停止,它将不会在通道上收到任何更多值。我们将滴答器在1600ms后停止。

nap(1600 * millisecond)
ticker.stop()
print("Ticker stopped")

当我们运行这个程序时,滴答器应该在停止之前滴答3次。

输出

Tick at 37024098
Tick at 37024599
Tick at 37025105
Ticker stopped

13 - 工作池

在这个例子中,我们将看看如何使用协程和通道实现工作池

以下是工作进程,我们将运行多个并发实例。这些工作进程将在jobs通道上接收工作,并将相应的结果发送到results。我们将为每个工作暂停一秒来模拟一个昂贵的任务。

func worker(id: Int, jobs: Channel<Int>, results: Channel<Int>) {
    for job in jobs {
        print("worker \(id) processing job \(job)")
        nap(1 * second)
        results <- job * 2
    }
}

为了使用我们的工作进程池,我们需要向它们发送工作并收集结果。为此我们创建了2个通道。

let jobs = Channel<Int>(bufferSize: 100)
let results = Channel<Int>(bufferSize: 100)

这启动了3个工作进程,初始时由于还没有工作而处于阻塞状态。

for workerId in 1 ... 3 {
    co(worker(workerId, jobs: jobs, results: results))
}

这里我们发送了9个工作项,然后关闭这个通道以表示我们所有的工作。

for job in 1 ... 9 {
    jobs <- job
}

jobs.close()

最后,我们收集了所有工作的结果。

for _ in 1 ... 9 {
    <-results
}

我们的运行程序显示了9个工作项被各个工作进程执行。由于有3个工作进程并行操作,该程序只需要大约3秒钟,尽管总共需要大约9秒钟的工作。

输出

worker 1 processing job 1
worker 2 processing job 2
worker 3 processing job 3
worker 1 processing job 4
worker 2 processing job 5
worker 3 processing job 6
worker 1 processing job 7
worker 2 processing job 8
worker 3 processing job 9

14 - 速率限制

速率限制是一个重要的机制,用于控制资源利用率和维持服务质量。威尼斯优雅地支持通过协程、通道和定时器实现速率限制。

首先,我们将看看基本的速率限制。假设我们想要限制对传入请求的处理。我们将通过同名通道处理这些请求。

var requests = Channel<Int>(bufferSize: 5)

for request in 1 ... 5 {
    requests <- request
}

requests.close()

这个limiter通道每200毫秒接收一个值。这是我们的速率限制方案中的调节器。

let limiter = Ticker(period: 200 * millisecond)

通过在每个请求前在一个接收动作上阻塞limiter通道,我们可以将自己限制在每200毫秒处理一个请求。

for request in requests {
    <-limiter.channel
    print("request \(request) \(now)")
}

print("")

我们可能在速率限制方案中想要允许短暂的请求突发,同时保持整体速率限制。我们可以通过在速率限制器通道中缓冲来实现这一点。这个burstyLimiter通道将允许最多3个事件的突发。

let burstyLimiter = Channel<Int>(bufferSize: 3)

填充这个通道以代表允许的突发。

for _ in 0 ..< 3 {
    burstyLimiter <- now
}

每200毫秒,我们将尝试向burstyLimiter添加一个新值,直到其容量的3个。

co {
    for time in Ticker(period: 200 * millisecond).channel {
        burstyLimiter <- time
    }
}

现在模拟5个更多的传入请求。这5个中的前3个将受益于burstyLimiter的突发能力。

let burstyRequests = Channel<Int>(bufferSize: 5)

for request in 1 ... 5 {
    burstyRequests <- request
}

burstyRequests.close()

for request in burstyRequests {
    <-burstyLimiter
    print("request \(request) \(now)")
}

运行我们的程序,我们看到第一批请求每隔200毫秒处理一次,正如预期的那样。

对于第二批请求,由于突发速率限制,我们立即处理前3个,然后以大约每个200毫秒的延迟处理剩下的2个。

输出

request 1 37221046
request 2 37221251
request 3 37221453
request 4 37221658
request 5 37221860

request 1 37221863
request 2 37221864
request 3 37221865
request 4 37222064
request 5 37222265

15 - 带状态的协程

在这个例子中,我们的状态将由一个单一的协程拥有。这将保证数据在并发访问时不会被损坏。为了读取或写入该状态,其他协程会向拥有状态的协程发送消息,并接收相应的回复。这些ReadOperationWriteOperation结构体封装了请求和所有权的协程响应的方式。

struct ReadOperation {
    let key: Int
    let responses: Channel<Int>
}

struct WriteOperation {
    let key: Int
    let value: Int
    let responses: Channel<Bool>
}

我们将计数我们执行的操作数。

var operations = 0

reads通道将由其他协程用来发出读和写请求,分别。

let reads = Channel<ReadOperation>()
let writes = Channel<WriteOperation>()

这是我们拥有state(该状态属于状态保全协程的私有字典)的协程。这个协程重复选择在reads通道上,对到达的请求进行响应。响应通过先执行请求的操作,然后在responses通道上发送一个值以指示成功(以及在 readability 情况下的期望值)来执行。

co {
    var state: [Int: Int] = [:]
    while true {
        select { when in
            when.receiveFrom(reads) { read in
                read.responses <- state[read.key] ?? 0
            }
            when.receiveFrom(writes) { write in
                state[write.key] = write.value
                write.responses <- true
            }
        }
    }
}

这个协程通过reads通道开始启动100个协程,向拥有状态的协程发出读取请求。每个读取都需要构建一个ReadOperation,通过reads通道发送它,然后在提供的responses通道上接收结果。

for _ in 0 ..< 100 {
    co {
        while true {
            let read = ReadOperation(
                key: Int(arc4random_uniform(5)),
                responses: Channel<Int>()
            )
            reads <- read
            <-read.responses
            operations++
        }
    }
}

我们以类似的方式启动了10个写入。

for _ in 0 ..< 10 {
    co {
        while true {
            let write = WriteOperation(
                key: Int(arc4random_uniform(5)),
                value: Int(arc4random_uniform(100)),
                responses: Channel<Bool>()
            )
            writes <- write
            <-write.responses
            operations++
        }
    }
}

让协程工作一秒钟。

nap(1 * second)

最后,捕获并报告 操作 计数。

print("operations: \(operations)")

输出

operations: 55798

16 - 中国 whispers

func whisper(left: ReceivingChannel<Int>, _ right: SendingChannel<Int>) {
    left <- 1 + !<-right
}

let n = 1000

let leftmost = Channel<Int>()
var right = leftmost
var left = leftmost

for _ in 0 ..< n {
    right = Channel<Int>()
    co(whisper(left.receivingChannel, right.sendingChannel))
    left = right
}

co {
    right <- 1
}

print(!<-leftmost)

输出

1001

17 - 乒乓球

final class Ball { var hits: Int = 0 }

func player(name: String, table: Channel<Ball>) {
    while true {
        let ball = !<-table
        ball.hits++
        print("\(name) \(ball.hits)")
        nap(100 * millisecond)
        table <- ball
    }
}

let table = Channel<Ball>()

co(player("ping", table: table))
co(player("pong", table: table))

table <- Ball()
nap(1 * second)
<-table

输出

ping 1
pong 2
ping 3
pong 4
ping 5
pong 6
ping 7
pong 8
ping 9
pong 10
ping 11

18 - 禁用通道选择

var channelA: Channel<String>? = Channel<String>()
var channelB: Channel<String>? = Channel<String>()

if arc4random_uniform(2) == 0 {
    channelA = nil
    print("disabled channel a")
} else {
    channelB = nil
    print("disabled channel b")
}

co { channelA?.receive("a") }
co { channelB?.receive("b") }

select { when in
    when.receiveFrom(channelA) { value in
        print("received \(value) from channel a")
    }
    when.receiveFrom(channelB) { value in
        print("received \(value) from channel b")
    }
}

输出

disabled channel b
received a from channel a

disabled channel a
received b from channel b

19 - 斐波那契

func fibonacci(n: Int, channel: Channel<Int>) {
    var x = 0
    var y = 1
    for _ in 0 ..< n {
        channel <- x
        (x, y) = (y, x + y)
    }
    channel.close()
}

let fibonacciChannel = Channel<Int>(bufferSize: 10)

co(fibonacci(fibonacciChannel.bufferSize, channel: fibonacciChannel))

for n in fibonacciChannel {
    print(n)
}

输出

0
1
1
2
3
5
8
13
21
34

20 - 炸弹

let tick = Ticker(period: 100 * millisecond).channel
let boom = Timer(deadline: now + 500 * millisecond).channel

var done = false
while !done {
    select { when in
        when.receiveFrom(tick) { _ in
            print("tick")
        }
        when.receiveFrom(boom) { _ in
            print("BOOM!")
            done = true
        }
        when.otherwise {
            print("    .")
            nap(50 * millisecond)
        }
    }
}

输出

    .
    .
tick
    .
    .
tick
    .
    .
tick
    .
    .
tick
    .
BOOM!

21 - 可靠性通道

func flipCoin(result: FallibleChannel<String>) {
    struct Error : ErrorType, CustomStringConvertible { let description: String }
    if arc4random_uniform(2) == 0 {
        result <- "Success"
    } else {
        result <- Error(description: "Something went wrong.")
    }
}

let results = FallibleChannel<String>()
var done = false

co(flipCoin(results))

while !done {
    do {
        let value = try !<-results
        print(value)
        done = true
    } catch {
        print("\(error) Retrying...")
        co(flipCoin(results))
    }
}

输出

Something went wrong. Retrying...
Something went wrong. Retrying...
Something went wrong. Retrying...
Something went wrong. Retrying...
Something went wrong. Retrying...
Success

22 - 选择和可靠性通道

struct Error : ErrorType, CustomStringConvertible { let description: String }

func flipCoin(result: FallibleChannel<String>) {
    if arc4random_uniform(2) == 0 {
        result <- "Success"
    } else {
        result <- Error(description: "Something went wrong")
    }
}

let results = FallibleChannel<String>()

co(flipCoin(results))

select { when in
    when.receiveFrom(results) { result in
        result.success { value in
            print(value)
        }
        result.failure { error in
            print(error)
        }
    }
}

输出

Success

Something went wrong

23 - 树

extension CollectionType where Index == Int {
    func shuffle() -> [Generator.Element] {
        var list = Array(self)
        list.shuffleInPlace()
        return list
    }
}

extension MutableCollectionType where Index == Int {
    mutating func shuffleInPlace() {
        if count < 2 { return }

        for i in 0..<count - 1 {
            let j = Int(arc4random_uniform(UInt32(count - i))) + i
            guard i != j else { continue }
            swap(&self[i], &self[j])
        }
    }
}

final class Tree<T> {
    var left: Tree?
    var value: T
    var right: Tree?

    init(left: Tree?, value: T, right: Tree?) {
        self.left = left
        self.value = value
        self.right = right
    }
}

深度优先遍历树,通过通道发送每个值。

func walk<T>(tree: Tree<T>?, channel: Channel<T>) {
    if let tree = tree {
        walk(tree.left, channel: channel)
        channel <- tree.value
        walk(tree.right, channel: channel)
    }
}

在新的协程中启动遍历,并返回只读值的通道。

func walker<T>(tree: Tree<T>?) -> SendingChannel<T> {
    let channel = Channel<T>()
    co {
        walk(tree, channel: channel)
        channel.close()
    }
    return channel.sendingChannel
}

从同时运行的两个漫步者读取值,如果tree1和tree2内容相同则返回true。

func ==<T : Equatable>(tree1: Tree<T>, tree2: Tree<T>) -> Bool {
    let channel1 = walker(tree1)
    let channel2 = walker(tree2)
    while true {
        let value1 = <-channel1
        let value2 = <-channel2
        if value1 == nil || value2 == nil {
            return value1 == value2
        }
        if value1 != value2 {
            break
        }
    }
    return false
}

返回一个新构造的二叉树,包含值1*k,2*k,…,n*k。

func newTree(n n: Int, k: Int) -> Tree<Int> {
    var tree: Tree<Int>?
    for value in (1...n).shuffle() {
        tree = insert(tree, value: value * k)
    }
    return tree!
}

在树中插入一个值

func insert(tree: Tree<Int>?, value: Int) -> Tree<Int> {
    if let tree = tree {
        if value < tree.value {
            tree.left = insert(tree.left, value: value)
            return tree
        } else {
            tree.right = insert(tree.right, value: value)
            return tree
        }
    } else {
        return Tree<Int>(left: nil, value: value, right: nil)
    }
}

let tree = newTree(n: 100, k: 1)

print("Same contents \(tree == newTree(n: 100, k: 1))")
print("Differing sizes \(tree == newTree(n: 99, k: 1))")
print("Differing values \(tree == newTree(n: 100, k: 2))")
print("Dissimilar \(tree == newTree(n: 101, k: 2))")

输出

Same contents true
Differing sizes false
Differing values false
Dissimilar false

24 - 禁用通道选择 ||

let channelA = Channel<String>()
let channelB = Channel<String>()

co(channelA <- "a")
co(channelB <- "b")

select { when in
    if arc4random_uniform(2) == 0 {
        print("disabled channel b")
        when.receiveFrom(channelA) { value in
            print("received \(value) from channel a")
        }
    } else {
        print("disabled channel a")
        when.receiveFrom(channelB) { value in
            print("received \(value) from channel b")
        }
    }
}

输出

disabled channel b
received a from channel a

disabled channel a
received b from channel b

25 - 假RSS客户端

struct Item : Equatable {
    let domain: String
    let title: String
    let GUID: String
}

func ==(lhs: Item, rhs: Item) -> Bool {
    return lhs.GUID == rhs.GUID
}

struct FetchResponse {
    let items: [Item]
    let nextFetchTime: Int
}

protocol FetcherType {
    func fetch() -> Result<FetchResponse>
}

struct Fetcher : FetcherType {
    let domain: String

    func randomItems() -> [Item] {
        let items = [
            Item(domain: domain, title: "Swift 2.0", GUID: "1"),
            Item(domain: domain, title: "Strings in Swift 2", GUID: "2"),
            Item(domain: domain, title: "Swift-er SDK", GUID: "3"),
            Item(domain: domain, title: "Swift 2 Apps in the App Store", GUID: "4"),
            Item(domain: domain, title: "Literals in Playgrounds", GUID: "5"),
            Item(domain: domain, title: "Swift Open Source", GUID: "6")
        ]
        return [Item](items[0..<Int(arc4random_uniform(UInt32(items.count)))])
    }

    func fetch() -> Result<FetchResponse> {
        if arc4random_uniform(2) == 0 {
            let fetchResponse = FetchResponse(
                items: randomItems(),
                nextFetchTime: now + 300 * millisecond
            )
            return Result.Value(fetchResponse)
        } else {
            struct Error : ErrorType, CustomStringConvertible { let description: String }
            return Result.Error(Error(description: "Network Error"))
        }
    }
}

protocol SubscriptionType {
    var updates: SendingChannel<Item> { get }
    func close() -> ErrorType?
}

struct Subscription : SubscriptionType {
    let fetcher: FetcherType
    let items = Channel<Item>()
    let closing = Channel<Channel<ErrorType?>>()

    init(fetcher: FetcherType) {
        self.fetcher = fetcher
        co(self.getUpdates())
    }

    var updates: SendingChannel<Item> {
        return self.items.sendingChannel
    }

    func getUpdates() {
        let maxPendingItems = 10
        let fetchDone = Channel<Result<FetchResponse>>(bufferSize: 1)

        var lastError: ErrorType?
        var pendingItems: [Item] = []
        var seenItems: [Item] = []
        var nextFetchTime = now
        var fetching = false

        forSelect { when, done in
            when.receiveFrom(closing) { errorChannel in
                errorChannel <- lastError
                self.items.close()
                done()
            }

            if !fetching && pendingItems.count < maxPendingItems {
                when.timeout(nextFetchTime) {
                    fetching = true
                    co {
                        fetchDone <- self.fetcher.fetch()
                    }
                }
            }

            when.receiveFrom(fetchDone) { fetchResult in
                fetching = false
                fetchResult.success { response in
                    for item in response.items {
                        if !seenItems.contains(item) {
                            pendingItems.append(item)
                            seenItems.append(item)
                        }
                    }
                    lastError = nil
                    nextFetchTime = response.nextFetchTime
                }
                fetchResult.failure { error in
                    lastError = error
                    nextFetchTime = now + 1 * second
                }
            }

            if let item = pendingItems.first {
                when.send(item, to: items) {
                    pendingItems.removeFirst()
                }
            }
        }
    }

    func close() -> ErrorType? {
        let errorChannel = Channel<ErrorType?>()
        closing <- errorChannel
        return !<-errorChannel
    }
}

let fetcher = Fetcher(domain: "developer.apple.com/swift/blog/")
let subscription = Subscription(fetcher: fetcher)

after(5 * second) {
    if let lastError = subscription.close() {
        print("Closed with last error: \(lastError)")
    } else {
        print("Closed with no last error")
    }
}

for item in subscription.updates {
    print("\(item.domain): \(item.title)")
}

输出

developer.apple.com/swift/blog/: Swift 2.0
developer.apple.com/swift/blog/: Strings in Swift 2
developer.apple.com/swift/blog/: Swift-er SDK
developer.apple.com/swift/blog/: Swift 2 Apps in the App Store
Closed with last error: Network Error

许可证

威尼斯 以下MIT许可证发布。详细信息请参阅LICENSE。