威尼斯 是一个纯 Swift/C 库,为 Swift 2 提供了 CSP。
Foundation
依赖(Linux 兼容)威尼斯 封装了一个修改过的 C 库版本 libmill。
威尼斯 是以下产品的基:
威尼斯 的速度非常快,因为它使用由 libmill 管理的轻量级协程而非由操作系统管理的线程。命令行应用程序中的 Chinese Whispers 示例显示了您如何创建高达 100,000 个并发协程(在 2015 年早期 8 GB 的 MacBook Pro 上进行了测试)。
您在自己的计算机上运行性能测试并亲眼目睹。只需运行 PerformanceTests.swift
中的测试。
co
func doSomething() {
print("did something")
}
// call sync
doSomething()
// call async
co(doSomething())
// async closure
co {
print("did something else")
}
nap
和 wakeUp
co {
// wakes up 1 second from now
wakeUp(now + 1 * second)
print("yawn")
}
// nap for two seconds so the program
// doesn't terminate before the print
nap(2 * second)
after
after
在指定持续时间后运行协程。
after(1 * second) {
print("yoo")
}
// same as
co {
nap(1 * second)
print("yoo")
}
Channel
通道是类型化的,返回包装值或 nil 的可选值,如果通道已关闭且缓冲区中无余值。
let messages = Channel<String>()
co(messages <- "ping")
let message = <-messages
print(message!)
// without operators
let messages = Channel<String>()
co(messages.receive("ping"))
let message = messages.send()
print(message!)
// buffered channels
let messages = Channel<String>(bufferSize: 2)
messages <- "buffered"
messages <- "channel"
print(!<-messages)
print(!<-messages)
ReceivingChannel
和 SendingChannel
您可以获取具有只接收或只发送通道功能的通道的引用。
func receiveOnly(channel: ReceivingChannel<String>) {
// can only receive
channel <- "yo"
}
func sendOnly(channel: SendingChannel<String>) {
// can only send
<-channel
}
let channel = Channel<String>(bufferSize: 1)
receiveOnly(channel.receivingChannel)
sendOnly(channel.sendingChannel)
FallibleChannel
可靠性通道接受值和错误。
struct Error : ErrorType {}
let channel = FallibleChannel<String>(bufferSize: 2)
channel <- "yo"
channel <- Error()
do {
let yo = try <-channel
try <-channel // will throw
} catch {
print("error")
}
选择
有时 选择
可能会与同名的系统库函数冲突。为了解决这个问题,您可以使用 Venice.select
或更简短的别名 sel
来调用 Venice 的 select
。
let channel = Channel<String>()
let fallibleChannel = FallibleChannel<String>()
select { when in
when.receiveFrom(channel) { value in
print("received \(value)")
}
when.receiveFrom(fallibleChannel) { result in
result.success { value in
print(value)
}
result.failure { error in
print(error)
}
}
when.send("value", to: channel) {
print("sent value")
}
when.send("value", to: fallibleChannel) {
print("sent value")
}
when.throwError(Error(), into: fallibleChannel) {
print("threw error")
}
when.timeout(now + 1 * second) {
print("timeout")
}
when.otherwise {
print("default case")
}
}
您可以通过将其设置为 nil 来禁用一个信道选择
var channelA: Channel<String>? = Channel<String>()
var channelB: Channel<String>? = Channel<String>()
if arc4random_uniform(2) == 0 {
channelA = nil
print("disabled channel a")
} else {
channelB = nil
print("disabled channel b")
}
co { channelA?.receive("a") }
co { channelB?.receive("b") }
sel { when in
when.receiveFrom(channelA) { value in
print("received \(value) from channel a")
}
when.receiveFrom(channelB) { value in
print("received \(value) from channel b")
}
}
另一种禁用信道选择的方法是将其 case 放入 if 中。
let channelA = Channel<String>()
let channelB = Channel<String>()
co(channelA <- "a")
co(channelB <- "b")
select { when in
if arc4random_uniform(2) == 0 {
print("disabled channel b")
when.receiveFrom(channelA) { value in
print("received \(value) from channel a")
}
} else {
print("disabled channel a")
when.receiveFrom(channelB) { value in
print("received \(value) from channel b")
}
}
}
forSelect
很多时候我们需要在 while 循环中包装我们的 select。为了使这种模式更容易工作,我们可以使用 forSelect
。当您调用 done()
之前,forSelect
将循环。
func flipCoin(result: FallibleChannel<String>) {
if arc4random_uniform(2) == 0 {
result <- "Success"
} else {
result <- Error(description: "Something went wrong")
}
}
let results = FallibleChannel<String>()
co(flipCoin(results))
forSelect { when, done in
when.receiveFrom(results) { result in
result.success { value in
print(value)
done()
}
result.failure { error in
print("\(error). Retrying...")
co(flipCoin(results))
}
}
}
IP
// local
do {
// all network interfaces
let ip1 = try IP(port: 5555, mode: .IPV4)
// specific network interface
let ip2 = try IP(networkInterface: "en0", port: 5555, mode: .IPV6)
} catch {
// something bad happened :(
}
// remote
do {
// if the deadline is reached the call will throw
let ip3 = try IP(address: "127.0.0.1", port: 5555, mode: .IPV4, deadline: now + 10 * second)
} catch {
// something bad happened :(
}
TCP
// server
do {
let ip = try IP(port: 5555)
let serverSocket = try TCPServerSocket(ip: ip)
let clientSocket = try serverSocket.accept()
let yo = try clientSocket.receiveString(untilDelimiter: "\n")
} catch {
// something bad happened :(
}
// client
do {
let ip = try IP(address: "127.0.0.1", port: 5555)
let clientSocket = try TCPClientSocket(ip: ip)
let deadline = now + 10 * second
// calls to send append the data to an internal
// buffer to minimize system calls
try clientSocket.sendString("yo\n", deadline: deadline)
// flush actually sends all data in the buffer
try clientSocket.flush()
} catch {
// something bad happened :(
}
如果希望不使用依赖管理器,可以将 Venice 手动集成到您的项目中。
cd
命令进入您的顶级项目目录,如果您还没有将项目初始化为 Git 仓库,请运行以下命令:$ git init
$ git submodule add https://github.com/Zewo/Venice.git
打开新的 Venice
文件夹,并将 Venice.xcodeproj
拖入您的应用程序 Xcode 项目的项目导航器中。
它应该位于您的应用程序蓝色项目图标下面。它是在所有其他 Xcode 组的上面还是下面无关紧要。
在项目导航器中选择 Venice.xcodeproj
,并验证部署目标与您的应用程序目标匹配。
+
按钮。您将看到两个不同的 Venice.xcodeproj
文件夹,每个文件夹中都有两个不同的版本 Venice.framework
存储在 Products
文件夹内。
哪个
Products
文件夹无关紧要,但您选择的Venice.framework
是顶层还是底层则很重要。
针对 OS X 选择顶部的 Venice.framework
,针对 iOS 选择底部的那个。
您可以通过检查项目的构建日志来验证您选择了哪一个。构建目标将列为
Venice iOS
或Venice OSX
。
一切就绪!
Venice.framework
将自动添加为目标依赖项、链接框架和嵌入框架,在复制文件构建阶段,这您需要构建模拟器和设备上的项目。
在命令行应用程序中使用 Venice
检查 Command Line Applications
目录获取示例。
示例 01-15 来自于gobyexample,并使用威尼斯从Go翻译成Swift。Xcode项目中包含一个包含以下所有示例的重磅区域。至少编译一次Venice OSX目标,然后您可以自由地与重磅区域示例进行交互。
协程是一个轻量级的执行线程。
func f(from: String) {
for i in 0 ..< 4 {
print("\(from): \(i)")
yield
}
}
假设我们有一个函数调用 f(s)
。以下是常规方式的调用,它是同步执行的。
f("direct")
使用 co(f(s))
协程来调用该函数。这个新的协程将与调用它的协程同时执行。
co(f("coroutine"))
您还可以使用闭包来启动协程。
co {
print("going")
}
现在我们的两个函数调用正在非同步的不同协程中运行,所以执行会跳转到这里。我们在程序退出前等待1秒。
nap(1 * second)
print("done")
当我们运行此程序时,首先看到阻塞调用的输出,然后是两个协程交互的输出。这种交错反映了运行时并发执行的协程。
direct: 0
direct: 1
direct: 2
direct: 3
coroutine: 0
going
coroutine: 1
coroutine: 2
coroutine: 3
done
通道是将并发协程连接的管道。您可以从一个协程向通道发送值,并将这些值接收进另一个协程。
使用 Channel
创建一个新通道。通道通过它们携带的值进行类型化。
let messages = Channel<String>()
使用 channel <- value
语法向通道发送值。这里,我们从一个新的协程向上面创建的 messages
通道发送 "ping"
。
co(messages <- "ping")
<-channel
语法接收通道中的值。这里我们将接收上面发送的 "ping"
消息并将其打印出来。
let message = <-messages
print(message!)
当我们运行程序时,“ping”消息成功地通过我们的通道从一个协程传递到另一个协程。默认情况下,发送和接收会阻塞,直到发送者和接收者都准备好。这种特性使得我们可以在程序末尾等待“ping”消息,而无需使用任何其他同步机制。
从通道接收到的值是 Optional
。如果您尝试从一个没有剩余值的缓冲区关闭的通道中获取值,它将返回 nil
。如果您确定有一个值被包装在 Optional
中,您可以使用 !<-
操作符,它返回一个隐式解包的选项。
ping
默认情况下,通道是无缓冲的,这意味着只有在准备好接收值时的接收(let value = <-channel
)才会接收从通道发送的值。有缓冲的通道在没有任何相应接收者的情况下接受有限数量的值。
这里我们创建了一个能够缓冲2个值的字符串通道。
let messages = Channel<String>(bufferSize: 2)
因为这个通道是有缓冲的,所以我们可以在没有相应并发接收的情况下将值发送到通道中。
messages <- "buffered"
messages <- "channel"
稍后我们可以像以往一样接收这两个值。
print(!<-messages)
print(!<-messages)
buffered
channel
我们可以使用通道在协程之间同步执行。以下是一个使用阻塞接收以等待协程完成的示例。
这是我们将在一个协程中运行的函数。使用 done
通道通知另一个协程该函数的工作已完成。
func worker(done: Channel<Bool>) {
print("working...")
nap(1 * second)
print("done")
done <- true // Send a value to notify that we're done.
}
启动一个工作协程,向其提供通知的通道。
let done = Channel<Bool>(bufferSize: 1)
co(worker(done))
在接收到工作从通道发送的通知之前阻塞。
<-done
如果您从程序中删除了<-done
行,程序将在工作器启动之前就退出。
working...
done
在将通道用作函数参数时,您可以指定通道是仅用于发送还是接收值。这种特定性提高了程序的类型安全性。
此ping
函数仅接受接收值的通道。尝试从该通道接收值将导致编译时错误。
func ping(pings: ReceivingChannel<String>, message: String) {
pings <- message
}
pong
函数接受一个仅发送值(pings
)的通道和一个仅接收值(pongs
)的通道。
func pong(pings: SendingChannel<String>, _ pongs: ReceivingChannel<String>) {
let message = !<-pings
pongs <- message
}
let pings = Channel<String>(bufferSize: 1)
let pongs = Channel<String>(bufferSize: 1)
ping(pings.receivingChannel, message: "passed message")
pong(pings.sendingChannel, pongs.receivingChannel)
print(!<-pongs)
passed message
Select允许您等待多个通道操作。将协程和通道与select结合使用是极其强大的功能。
在我们的例子中,我们将跨两个通道进行选择。
let channel1 = Channel<String>()
let channel2 = Channel<String>()
每个通道在一段时间后将接收一个值,以模拟例如阻塞RPC操作在并发协程中执行。
co {
nap(1 * second)
channel1 <- "one"
}
co {
nap(2 * second)
channel2 <- "two"
}
我们将使用select
同时等待这两个值,每当值到达时打印它们。
for _ in 0 ..< 2 {
select { when in
when.receiveFrom(channel1) { message1 in
print("received \(message1)")
}
when.receiveFrom(channel2) { message2 in
print("received \(message2)")
}
}
}
我们接收到的值依次是"one"
和"two"
,正如预期的那样。请注意,总执行时间仅为约2秒,因为1秒和2秒的nap
并发执行。
received one
received two
超时对于连接到外部资源或需要限制执行时间的程序很重要。由于通道和select
,实现超时既简单又优雅。
在我们的例子中,假设我们正在执行一个外部调用,该调用在2秒后将结果返回到一个名为channel1
的通道上。
let channel1 = Channel<String>(bufferSize: 1)
co {
nap(2 * second)
channel1 <- "result 1"
}
以下是实现超时的select
。 receiveFrom(channel1)
等待结果,而timeout(now + 1 * second)
等待在1秒超时后发送一个值。由于select
按照第一个准备好接收的接收操作来执行,因此如果操作超过允许的1秒,我们将采用超时情况。
select { when in
when.receiveFrom(channel1) { result in
print(result)
}
when.timeout(now + 1 * second) {
print("timeout 1")
}
}
如果我们允许更长的3秒超时,那么从channel2
的接收将成功,我们将打印结果。
let channel2 = Channel<String>(bufferSize: 1)
co {
nap(2 * second)
channel2 <- "result 2"
}
select { when in
when.receiveFrom(channel2) { result in
print(result)
}
when.timeout(now + 3 * second) {
print("timeout 2")
}
}
运行此程序将显示第一个操作超时,第二个成功。
使用此select超时模式需要通过通道进行结果通信。这在一般情况下是一个好主意,因为其他重要特性也是基于通道和select的。我们将接下来看两个这样的例子:计时器和报时器。
timeout 1
result 2
通道的发送和接收是阻塞的。然而,我们可以使用带有otherwise
子句的select
来实现非阻塞发送、接收,甚至非阻塞多路select
。
let messages = Channel<String>()
let signals = Channel<Bool>()
这里是一个非阻塞接收示例。如果数组messages
上有可用值,则select
将采用带有该值的receiveFrom(messages)
情况。如果没有,它将立即采用otherwise
情况。
select { when in
when.receiveFrom(messages) { message in
print("received message \(message)")
}
when.otherwise {
print("no message received")
}
}
非阻塞发送的工作方式类似。
let message = "hi"
select { when in
when.send(message, to: messages) {
print("sent message \(message)")
}
when.otherwise {
print("no message sent")
}
}
我们可以在otherwise
子句上方使用多个情况来实现多路非阻塞select。在这里,我们尝试在messages
和signals
上执行非阻塞接收。
select { when in
when.receiveFrom(messages) { message in
print("received message \(message)")
}
when.receiveFrom(signals) { signal in
print("received signal \(signal)")
}
when.otherwise {
print("no activity")
}
}
no message received
no message sent
no activity
关闭一个通道表示不能再向其中发送更多值。这可以用来将完成的信号传递给通道的接收端。
在这个例子中,我们将使用一个jobs
通道来将需要完成的任务传递给一个工作协程。当我们没有更多任务给工作协程时,我们将关闭jobs
通道。
let jobs = Channel<Int>(bufferSize: 5)
let done = Channel<Bool>()
以下是一个工作协程的示例。它通过j = <-jobs
重复从jobs
接收数据。如果jobs
已经被关闭,并且通道中的所有值都已经被接收,则返回值将是nil
。我们使用这个值在完成所有任务后,在done
上发出通知。
co {
while true {
if let job = <-jobs {
print("received job \(job)")
} else {
print("received all jobs")
done <- true
return
}
}
}
这个示例通过jobs
通道发送了3个任务给工作协程,然后关闭了它。
for job in 1 ... 3 {
print("sent job \(job)")
jobs <- job
}
jobs.close()
print("sent all jobs")
我们使用之前看到的同步方法等待工作协程。
<-done
关闭通道的概念自然地引出了我们下一个例子:遍历通道。
sent job 1
received job 1
sent job 2
received job 2
sent job 3
received job 3
sent all jobs
received job 3
received all jobs
我们可以使用for in
来遍历从通道接收到的值。我们将遍历queue
通道中的2个值。
let queue = Channel<String>(bufferSize: 2)
queue <- "one"
queue <- "two"
queue.close()
这个for in
循环将在从queue
接收每个元素时遍历。因为我们之前已经关闭了通道,所以在收到2个元素后,迭代将终止。如果我们没有关闭它,循环中的第3次接收将导致阻塞。
for element in queue {
print(element)
}
这个例子还展示了关闭一个非空的通道但仍然接收剩余值是可能的。
one
two
我们经常希望在未来的某个时刻执行代码,或者每隔一段时间重复执行。定时器和滴答器功能使得这两个任务变得容易。我们先看看定时器,然后再看看滴答器。
定时器代表未来的一个单一事件。你告诉定时器你想要等待多长时间,然后它提供一个在这个时间会发出通知的通道。这个定时器将等待2秒钟。
let timer1 = Timer(deadline: now + 2 * second)
<-timer1.channel
在定时器通道上阻塞,直到定时器发送一个值,表明定时器已过期。
<-timer1.channel
print("Timer 1 expired")
如果你只想等待,你可以使用nap
。定时器可能有用的一部分是因为你可以在它到期之前取消定时器。这里有一个例子来演示这一过程。
let timer2 = Timer(deadline: now + 1 * second)
co {
<-timer2.channel
print("Timer 2 expired")
}
let stop2 = timer2.stop()
if stop2 {
print("Timer 2 stopped")
}
第一个定时器将在我们启动程序后大约2秒到期,但第二个应该在它有机会到期之前被停止。
Timer 1 expired
Timer 2 stopped
定时器用于当你想要在未来某个时间做一次事情时 - 滴答器用于当你想要以固定间隔重复执行时。这里有一个滴答器的示例,它会定期滴答,直到我们停止它。
滴答器使用与定时器类似的机制:一个发送值的通道。在这里,我们将使用通道的内置函数generator
来迭代每500ms接收到的值。
let ticker = Ticker(period: 500 * millisecond)
co {
for time in ticker.channel {
print("Tick at \(time)")
}
}
滴答器可以像定时器一样停止。一旦滴答器停止,它将不会在通道上收到任何更多值。我们将滴答器在1600ms后停止。
nap(1600 * millisecond)
ticker.stop()
print("Ticker stopped")
当我们运行这个程序时,滴答器应该在停止之前滴答3次。
Tick at 37024098
Tick at 37024599
Tick at 37025105
Ticker stopped
在这个例子中,我们将看看如何使用协程和通道实现工作池。
以下是工作进程,我们将运行多个并发实例。这些工作进程将在jobs
通道上接收工作,并将相应的结果发送到results
。我们将为每个工作暂停一秒来模拟一个昂贵的任务。
func worker(id: Int, jobs: Channel<Int>, results: Channel<Int>) {
for job in jobs {
print("worker \(id) processing job \(job)")
nap(1 * second)
results <- job * 2
}
}
为了使用我们的工作进程池,我们需要向它们发送工作并收集结果。为此我们创建了2个通道。
let jobs = Channel<Int>(bufferSize: 100)
let results = Channel<Int>(bufferSize: 100)
这启动了3个工作进程,初始时由于还没有工作而处于阻塞状态。
for workerId in 1 ... 3 {
co(worker(workerId, jobs: jobs, results: results))
}
这里我们发送了9个工作项,然后关闭这个通道以表示我们所有的工作。
for job in 1 ... 9 {
jobs <- job
}
jobs.close()
最后,我们收集了所有工作的结果。
for _ in 1 ... 9 {
<-results
}
我们的运行程序显示了9个工作项被各个工作进程执行。由于有3个工作进程并行操作,该程序只需要大约3秒钟,尽管总共需要大约9秒钟的工作。
worker 1 processing job 1
worker 2 processing job 2
worker 3 processing job 3
worker 1 processing job 4
worker 2 processing job 5
worker 3 processing job 6
worker 1 processing job 7
worker 2 processing job 8
worker 3 processing job 9
速率限制是一个重要的机制,用于控制资源利用率和维持服务质量。威尼斯优雅地支持通过协程、通道和定时器实现速率限制。
首先,我们将看看基本的速率限制。假设我们想要限制对传入请求的处理。我们将通过同名通道处理这些请求。
var requests = Channel<Int>(bufferSize: 5)
for request in 1 ... 5 {
requests <- request
}
requests.close()
这个limiter
通道每200毫秒接收一个值。这是我们的速率限制方案中的调节器。
let limiter = Ticker(period: 200 * millisecond)
通过在每个请求前在一个接收动作上阻塞limiter
通道,我们可以将自己限制在每200毫秒处理一个请求。
for request in requests {
<-limiter.channel
print("request \(request) \(now)")
}
print("")
我们可能在速率限制方案中想要允许短暂的请求突发,同时保持整体速率限制。我们可以通过在速率限制器通道中缓冲来实现这一点。这个burstyLimiter
通道将允许最多3个事件的突发。
let burstyLimiter = Channel<Int>(bufferSize: 3)
填充这个通道以代表允许的突发。
for _ in 0 ..< 3 {
burstyLimiter <- now
}
每200毫秒,我们将尝试向burstyLimiter
添加一个新值,直到其容量的3个。
co {
for time in Ticker(period: 200 * millisecond).channel {
burstyLimiter <- time
}
}
现在模拟5个更多的传入请求。这5个中的前3个将受益于burstyLimiter
的突发能力。
let burstyRequests = Channel<Int>(bufferSize: 5)
for request in 1 ... 5 {
burstyRequests <- request
}
burstyRequests.close()
for request in burstyRequests {
<-burstyLimiter
print("request \(request) \(now)")
}
运行我们的程序,我们看到第一批请求每隔200毫秒处理一次,正如预期的那样。
对于第二批请求,由于突发速率限制,我们立即处理前3个,然后以大约每个200毫秒的延迟处理剩下的2个。
request 1 37221046
request 2 37221251
request 3 37221453
request 4 37221658
request 5 37221860
request 1 37221863
request 2 37221864
request 3 37221865
request 4 37222064
request 5 37222265
在这个例子中,我们的状态将由一个单一的协程拥有。这将保证数据在并发访问时不会被损坏。为了读取或写入该状态,其他协程会向拥有状态的协程发送消息,并接收相应的回复。这些ReadOperation
和WriteOperation
结构体封装了请求和所有权的协程响应的方式。
struct ReadOperation {
let key: Int
let responses: Channel<Int>
}
struct WriteOperation {
let key: Int
let value: Int
let responses: Channel<Bool>
}
我们将计数我们执行的操作数。
var operations = 0
reads
和
let reads = Channel<ReadOperation>()
let writes = Channel<WriteOperation>()
这是我们拥有state
(该状态属于状态保全协程的私有字典)的协程。这个协程重复选择在reads
和responses
通道上发送一个值以指示成功(以及在 readability 情况下的期望值)来执行。
co {
var state: [Int: Int] = [:]
while true {
select { when in
when.receiveFrom(reads) { read in
read.responses <- state[read.key] ?? 0
}
when.receiveFrom(writes) { write in
state[write.key] = write.value
write.responses <- true
}
}
}
}
这个协程通过reads
通道开始启动100个协程,向拥有状态的协程发出读取请求。每个读取都需要构建一个ReadOperation
,通过reads
通道发送它,然后在提供的responses
通道上接收结果。
for _ in 0 ..< 100 {
co {
while true {
let read = ReadOperation(
key: Int(arc4random_uniform(5)),
responses: Channel<Int>()
)
reads <- read
<-read.responses
operations++
}
}
}
我们以类似的方式启动了10个写入。
for _ in 0 ..< 10 {
co {
while true {
let write = WriteOperation(
key: Int(arc4random_uniform(5)),
value: Int(arc4random_uniform(100)),
responses: Channel<Bool>()
)
writes <- write
<-write.responses
operations++
}
}
}
让协程工作一秒钟。
nap(1 * second)
最后,捕获并报告 操作
计数。
print("operations: \(operations)")
operations: 55798
func whisper(left: ReceivingChannel<Int>, _ right: SendingChannel<Int>) {
left <- 1 + !<-right
}
let n = 1000
let leftmost = Channel<Int>()
var right = leftmost
var left = leftmost
for _ in 0 ..< n {
right = Channel<Int>()
co(whisper(left.receivingChannel, right.sendingChannel))
left = right
}
co {
right <- 1
}
print(!<-leftmost)
1001
final class Ball { var hits: Int = 0 }
func player(name: String, table: Channel<Ball>) {
while true {
let ball = !<-table
ball.hits++
print("\(name) \(ball.hits)")
nap(100 * millisecond)
table <- ball
}
}
let table = Channel<Ball>()
co(player("ping", table: table))
co(player("pong", table: table))
table <- Ball()
nap(1 * second)
<-table
ping 1
pong 2
ping 3
pong 4
ping 5
pong 6
ping 7
pong 8
ping 9
pong 10
ping 11
var channelA: Channel<String>? = Channel<String>()
var channelB: Channel<String>? = Channel<String>()
if arc4random_uniform(2) == 0 {
channelA = nil
print("disabled channel a")
} else {
channelB = nil
print("disabled channel b")
}
co { channelA?.receive("a") }
co { channelB?.receive("b") }
select { when in
when.receiveFrom(channelA) { value in
print("received \(value) from channel a")
}
when.receiveFrom(channelB) { value in
print("received \(value) from channel b")
}
}
disabled channel b
received a from channel a
或
disabled channel a
received b from channel b
func fibonacci(n: Int, channel: Channel<Int>) {
var x = 0
var y = 1
for _ in 0 ..< n {
channel <- x
(x, y) = (y, x + y)
}
channel.close()
}
let fibonacciChannel = Channel<Int>(bufferSize: 10)
co(fibonacci(fibonacciChannel.bufferSize, channel: fibonacciChannel))
for n in fibonacciChannel {
print(n)
}
0
1
1
2
3
5
8
13
21
34
let tick = Ticker(period: 100 * millisecond).channel
let boom = Timer(deadline: now + 500 * millisecond).channel
var done = false
while !done {
select { when in
when.receiveFrom(tick) { _ in
print("tick")
}
when.receiveFrom(boom) { _ in
print("BOOM!")
done = true
}
when.otherwise {
print(" .")
nap(50 * millisecond)
}
}
}
.
.
tick
.
.
tick
.
.
tick
.
.
tick
.
BOOM!
func flipCoin(result: FallibleChannel<String>) {
struct Error : ErrorType, CustomStringConvertible { let description: String }
if arc4random_uniform(2) == 0 {
result <- "Success"
} else {
result <- Error(description: "Something went wrong.")
}
}
let results = FallibleChannel<String>()
var done = false
co(flipCoin(results))
while !done {
do {
let value = try !<-results
print(value)
done = true
} catch {
print("\(error) Retrying...")
co(flipCoin(results))
}
}
Something went wrong. Retrying...
Something went wrong. Retrying...
Something went wrong. Retrying...
Something went wrong. Retrying...
Something went wrong. Retrying...
Success
struct Error : ErrorType, CustomStringConvertible { let description: String }
func flipCoin(result: FallibleChannel<String>) {
if arc4random_uniform(2) == 0 {
result <- "Success"
} else {
result <- Error(description: "Something went wrong")
}
}
let results = FallibleChannel<String>()
co(flipCoin(results))
select { when in
when.receiveFrom(results) { result in
result.success { value in
print(value)
}
result.failure { error in
print(error)
}
}
}
Success
或
Something went wrong
extension CollectionType where Index == Int {
func shuffle() -> [Generator.Element] {
var list = Array(self)
list.shuffleInPlace()
return list
}
}
extension MutableCollectionType where Index == Int {
mutating func shuffleInPlace() {
if count < 2 { return }
for i in 0..<count - 1 {
let j = Int(arc4random_uniform(UInt32(count - i))) + i
guard i != j else { continue }
swap(&self[i], &self[j])
}
}
}
final class Tree<T> {
var left: Tree?
var value: T
var right: Tree?
init(left: Tree?, value: T, right: Tree?) {
self.left = left
self.value = value
self.right = right
}
}
深度优先遍历树,通过通道发送每个值。
func walk<T>(tree: Tree<T>?, channel: Channel<T>) {
if let tree = tree {
walk(tree.left, channel: channel)
channel <- tree.value
walk(tree.right, channel: channel)
}
}
在新的协程中启动遍历,并返回只读值的通道。
func walker<T>(tree: Tree<T>?) -> SendingChannel<T> {
let channel = Channel<T>()
co {
walk(tree, channel: channel)
channel.close()
}
return channel.sendingChannel
}
从同时运行的两个漫步者读取值,如果tree1和tree2内容相同则返回true。
func ==<T : Equatable>(tree1: Tree<T>, tree2: Tree<T>) -> Bool {
let channel1 = walker(tree1)
let channel2 = walker(tree2)
while true {
let value1 = <-channel1
let value2 = <-channel2
if value1 == nil || value2 == nil {
return value1 == value2
}
if value1 != value2 {
break
}
}
return false
}
返回一个新构造的二叉树,包含值1*k,2*k,…,n*k。
func newTree(n n: Int, k: Int) -> Tree<Int> {
var tree: Tree<Int>?
for value in (1...n).shuffle() {
tree = insert(tree, value: value * k)
}
return tree!
}
在树中插入一个值
func insert(tree: Tree<Int>?, value: Int) -> Tree<Int> {
if let tree = tree {
if value < tree.value {
tree.left = insert(tree.left, value: value)
return tree
} else {
tree.right = insert(tree.right, value: value)
return tree
}
} else {
return Tree<Int>(left: nil, value: value, right: nil)
}
}
let tree = newTree(n: 100, k: 1)
print("Same contents \(tree == newTree(n: 100, k: 1))")
print("Differing sizes \(tree == newTree(n: 99, k: 1))")
print("Differing values \(tree == newTree(n: 100, k: 2))")
print("Dissimilar \(tree == newTree(n: 101, k: 2))")
Same contents true
Differing sizes false
Differing values false
Dissimilar false
let channelA = Channel<String>()
let channelB = Channel<String>()
co(channelA <- "a")
co(channelB <- "b")
select { when in
if arc4random_uniform(2) == 0 {
print("disabled channel b")
when.receiveFrom(channelA) { value in
print("received \(value) from channel a")
}
} else {
print("disabled channel a")
when.receiveFrom(channelB) { value in
print("received \(value) from channel b")
}
}
}
disabled channel b
received a from channel a
或
disabled channel a
received b from channel b
struct Item : Equatable {
let domain: String
let title: String
let GUID: String
}
func ==(lhs: Item, rhs: Item) -> Bool {
return lhs.GUID == rhs.GUID
}
struct FetchResponse {
let items: [Item]
let nextFetchTime: Int
}
protocol FetcherType {
func fetch() -> Result<FetchResponse>
}
struct Fetcher : FetcherType {
let domain: String
func randomItems() -> [Item] {
let items = [
Item(domain: domain, title: "Swift 2.0", GUID: "1"),
Item(domain: domain, title: "Strings in Swift 2", GUID: "2"),
Item(domain: domain, title: "Swift-er SDK", GUID: "3"),
Item(domain: domain, title: "Swift 2 Apps in the App Store", GUID: "4"),
Item(domain: domain, title: "Literals in Playgrounds", GUID: "5"),
Item(domain: domain, title: "Swift Open Source", GUID: "6")
]
return [Item](items[0..<Int(arc4random_uniform(UInt32(items.count)))])
}
func fetch() -> Result<FetchResponse> {
if arc4random_uniform(2) == 0 {
let fetchResponse = FetchResponse(
items: randomItems(),
nextFetchTime: now + 300 * millisecond
)
return Result.Value(fetchResponse)
} else {
struct Error : ErrorType, CustomStringConvertible { let description: String }
return Result.Error(Error(description: "Network Error"))
}
}
}
protocol SubscriptionType {
var updates: SendingChannel<Item> { get }
func close() -> ErrorType?
}
struct Subscription : SubscriptionType {
let fetcher: FetcherType
let items = Channel<Item>()
let closing = Channel<Channel<ErrorType?>>()
init(fetcher: FetcherType) {
self.fetcher = fetcher
co(self.getUpdates())
}
var updates: SendingChannel<Item> {
return self.items.sendingChannel
}
func getUpdates() {
let maxPendingItems = 10
let fetchDone = Channel<Result<FetchResponse>>(bufferSize: 1)
var lastError: ErrorType?
var pendingItems: [Item] = []
var seenItems: [Item] = []
var nextFetchTime = now
var fetching = false
forSelect { when, done in
when.receiveFrom(closing) { errorChannel in
errorChannel <- lastError
self.items.close()
done()
}
if !fetching && pendingItems.count < maxPendingItems {
when.timeout(nextFetchTime) {
fetching = true
co {
fetchDone <- self.fetcher.fetch()
}
}
}
when.receiveFrom(fetchDone) { fetchResult in
fetching = false
fetchResult.success { response in
for item in response.items {
if !seenItems.contains(item) {
pendingItems.append(item)
seenItems.append(item)
}
}
lastError = nil
nextFetchTime = response.nextFetchTime
}
fetchResult.failure { error in
lastError = error
nextFetchTime = now + 1 * second
}
}
if let item = pendingItems.first {
when.send(item, to: items) {
pendingItems.removeFirst()
}
}
}
}
func close() -> ErrorType? {
let errorChannel = Channel<ErrorType?>()
closing <- errorChannel
return !<-errorChannel
}
}
let fetcher = Fetcher(domain: "developer.apple.com/swift/blog/")
let subscription = Subscription(fetcher: fetcher)
after(5 * second) {
if let lastError = subscription.close() {
print("Closed with last error: \(lastError)")
} else {
print("Closed with no last error")
}
}
for item in subscription.updates {
print("\(item.domain): \(item.title)")
}
developer.apple.com/swift/blog/: Swift 2.0
developer.apple.com/swift/blog/: Strings in Swift 2
developer.apple.com/swift/blog/: Swift-er SDK
developer.apple.com/swift/blog/: Swift 2 Apps in the App Store
Closed with last error: Network Error
威尼斯 以下MIT许可证发布。详细信息请参阅LICENSE。