MQTTClientGJ 0.0.26

MQTTClientGJ 0.0.26

Alfian Losarialfian 维护。



  • 作者:
  • Alfian Losari

Build and Smoketest Status Documentation Maintenance GitHub Release Date GitHub last commit Discord : Gojek Courier

概述

Courier 是一个用于创建使用 MQTT 的长连接库,MQTT 是物联网消息的标准。长连接是客户端与服务器之间建立的持久连接,用于双向通信。长连接通过使用存活包(keepalive packets)来持续维护,以便即时更新。这也节省了移动设备上的电池和流量。

详细文档

在此处找到详细文档 - https://gojek.github.io/courier-iOS/

端到端的 courier 示例 - https://gojek.github.io/courier/docs/Introduction

开始使用

配置 Courier,在 iOS 设备和代理之间通过双向长连接订阅、发送和接收消息。

示例应用

您可以通过运行示例应用来连接任何可以配置的代理。从方案中选择 CourierE2EApp

安装

Courier 支持使用 Cocoapods 和 SPM 作为依赖管理器。它分为 5 个模块

  • CourierCore:包含 Courier 的公共 API,如协议和数据类型。其他模块对此模块有基本依赖。如果您想在项目中不添加 Courier 实现的情况下实现接口,则可以使用此模块。
  • CourierMQTT:使用 MQTT 实现了 CourierClientCourierSession。此模块依赖 MQTTClientGJ
  • MQTTClientGJ:开源库MQTT-Client-Framework 的分支版本。它增加了连接和空闲超时等特性。还修复了 MQTTSocketEncoderConnack 状态 5 在调用 MQTTTransportDidClose 之前未完成解码的竞态条件崩溃错误。
  • CourierProtobuf:使用 Protofobuf 实现了 ProtobufMessageAdapter。它依赖于 SwiftProtobuf 库,这是 optional 的,如果您的应用程序使用 protobuf 进行数据序列化,则可以使用此模块。
  • CourierMQTTChuck:可以用来检查底层 MQTT 连接的所有出站或入站数据包。它拦截所有数据包,持久化它们,并提供 UI 以访问所有发送或接收的 MQTT 数据包。它还提供了多个其他功能,如搜索、分享和清除数据。底层使用了 SwiftUI

Cocoapods

// Podfile
target 'Example-App' do
  use_frameworks!
  pod 'CourierCore'
  pod 'CourierMQTT'
  pod 'CourierProtobuf' #optional
  pod 'CourierMQTTChuck' #optional
end

Swift 包管理器(SPM)

简单地将包依赖项添加到您的 Package.swift 文件中,并在必要的目标中依赖于 CourierCoreCourierMQTT

dependencies: [
    .package(url: "https://github.com/gojek/courier-iOS", branch: "main")
]

实现 IConnectionServiceProvider 以提供用于身份验证的 ConnectOptions

要连接到 MQTT 代理,您需要实现 IConnectionServiceProvider。首先,您需要实现 IConnectionServiceProvider/clientId 以返回一个独一无二的字符串来标识您的客户端。这对于连接到代理的每个设备都必须是唯一的。

var clientId: String {
    UIDevice.current.identifierForVendor?.uuidString ?? UUID().uuidString
}

接下来,您需要实现 IConnectionServiceProvider/getConnectOptions(completion:) 方法。您需要提供 ConnectOptions 实例,该实例将用于连接到代理。此方法提供了一个逃逸闭包,以防您需要异步从远程 API 获取凭证。

func getConnectOptions(completion: @escaping (Result<ConnectOptions, AuthError>) -> Void) {
    executeNetworkRequest { (response: ConnectOptions) in
        completion(.success(connectOptions))
    } failure: { _, _, error in
        completion(.failure(error))
    }
}

以下是在 ConnectOptions 中需要提供的数据。

/// IP Host address of the broker
public let host: String
/// Port of the broker
public let port: UInt16
/// Keep Alive interval used to ping the broker over time to maintain the long run connection
public let keepAlive: UInt16
/// Unique Client ID used by broker to identify connected clients
public let clientId: String
/// Username of the client
public let username: String
/// Password of the client used for authentication by the broker
public let password: String
/// Tells broker whether to clear the previous session by the clients
public let isCleanSession: Bool

使用 CourierClientFactory 配置和创建 MQTT CourierClient 实例

接下来,我们需要创建使用 MQTT 作为其实现的 CourierClient 实例。初始化 CourierClientFactory 实例并调用 CourierClientFactory/makeMQTTClient(config:)。我们需要传递具有多个可定制的参数的 MQTTClientConfig 实例。

let clientFactory = CourierClientFactory()
let courierClient = clientFactory.makeMQTTClient(
    config: MQTTClientConfig(
        authService: HiveMQAuthService(),
        messageAdapters: [
            JSONMessageAdapter(),
            ProtobufMessageAdapter()
        ],
        autoReconnectInterval: 1,
        maxAutoReconnectInterval: 30
    )
)
  • MQTTClientConfig/messageAdapters:我们需要传递一个 MessageAdapter 数组。当从代理接收并向代理发送消息时将使用它。《CourierMQTT》为符合 Codable 协议的 JSON(JSONMessageAdapter)和 Plist(《PlistMessageAdapter》)格式提供了内置的消息适配器。您只能使用其中之一,因为两者都实现了到 Codable 以避免任何冲突。若使用 protobuf,请导入 CourierProtobuf 并传递 ProtobufMessageAdapter
  • MQTTClientConfig/authService:我们需要传递我们的 IConnectionServiceProvider 协议实现,以向客户端提供 ConnectOptions。
  • MQTTClientConfig/autoReconnectInterval 用于在连接丢失的情况下重新连接到代理的间隔。它将在每次尝试成功建立连接之前乘以2,直到成功连接。上限基于 MQTTClientConfig/maxAutoReconnectInterval

在CourierClient中管理连接生命周期

要连接到代理,只需调用 connect 方法

courierClient.connect()

要断开连接,只需调用 disconnect 方法

courierClient.disconnect()

要获取 ConnectionState,可以访问 CourierSession/connectionState 属性

courierClient.connectionState

您还可以使用 CourierSession/connectionStatePublisher 属性订阅 ConnectionState 发布者。Courier 提供的 observable API 与 Apple Combine 非常相似,尽管它内部使用 RxSwift 实现,因此我们可以支持 iOS 12

courierClient.connectionStatePublisher
    .sink { [weak self] self?.handleConnectionStateEvent($0) }
    .store(in: &cancellables)

由于 MQTT 支持 QoS 1 和 QoS 2 消息,以确保在没有互联网连接且用户重新连接到代理时的可靠性,我们也在本地缓存中持久化这些消息。要断开连接并删除所有这些缓存,可以调用。

courierClient.destroy()

使用 Courier 时,您需要注意以下几点

  • Courier 总是在应用进入后台时断开连接,因为 iOS 不支持在后台长时间运行套接字连接。
  • 如果存在要订阅的主题,Courier 总是在应用进入前台时自动重新连接。
  • Courier 使用 Reachability 框架处理由于互联网连接不良/丢失导致的重新连接。
  • Courier 使用可配置的 TTL 持久化 QoS > 0 消息,以防没有活跃订阅 Observable/Publisher。

MQTT中的 QoS 级别

服务质量(QoS)级别是消息发送方和接收方之间的一种协议,它定义了特定消息的投递保证。MQTT 中有 3 个 QoS 级别

  • 最多一次(0)
  • 至少一次(1)
  • 恰好一次(2)。

当在 MQTT 中讨论 QoS 时,您需要考虑消息投递的两个方面

  • 从发布客户端到代理的消息投递。
  • 从代理到订阅客户端的消息投递。

您可以从 HiveMQ 网站了解更多有关 MQTT QoS 详细信息。

从代理订阅主题

要从代理订阅一个主题,我们可以调用 CourierSession/subscribe(_:) 并传递包含主题字符串和 QoS 枚举的元组。

courierClient.subscribe(("chat/user1", QoS.zero))

您还可以订阅多个主题,调用 CourierSession/subscribe(_:) 并传递一个包含主题字符串和 QoS 枚举元组的数组。

courierClient.subscribe([
    ("chat/user1", QoS.zero),
    ("order/1234", QoS.one),
    ("order/123456", QoS.two),
])

从已订阅的主题接收消息

在您已订阅主题后,您需要通过传递相关主题来订阅一个消息发布者,使用 CourierSession/messagePublisher(topic:)。此方法使用 Generic 将二进制数据序列化到类型。确保您已提供可以解码数据到类型的关联 MessageAdapter。

courierClient.messagePublisher(topic: topic)
    .sink { [weak self] (note: Note) in
        self?.messages.insert(Message(id: UUID().uuidString, name: "Protobuf: \(note.title)", timestamp: Date()), at: 0)
    }.store(in: &cancellables)

此方法返回 AnyPublisher,您可以通过类似于 AnyPublisher/filter(predicate:)AnyPublisher/map(transform:) 的操作符进行链式调用。

Courier 提供的响应式 API 与 Apple Combine 非常相似,尽管它内部使用 RxSwift 实现,因此我们可以支持 iOS 12,只支持 filtermap 操作符。

从主题取消订阅

要从主题取消订阅,我们可以调用 CourierSession/unsubscribe(_:) 并传递一个主题字符串。

courierClient.unsubscribe("chat/user1")

您也可以取消订阅多个主题,调用 CourierSession/unsubscribe(_:) 并传递一个包含主题字符串和 QoS 枚举元组的数组。

courierClient.unsubscribe([
    "chat/user1",
    "order/"
])

向代理发送消息

要向代理发送消息,首先请确保您已经提供了一个能够将您的对象编码为二进制数据格式的MessageAdapter。例如,如果您有一个要发送为JSON的数据结构。请确保它符合Encodable协议,并在创建CourierClient实例时在MQTTClientConfig中传递JSONMessageAdapter

您只需调用CourierSession/publishMessage(_:topic:qos:),传入主题字符串和QoS枚举。这是一个throwing函数,在编码失败时可能会抛出异常。

let message = Message(
    id: UUID().uuidString,
    name: message,
    timestamp: Date()
)
        
try? courierService?.publishMessage(
    message,
    topic: "chat/1234",
    qos: QoS.zero
)

监听Courier内部事件

要监听Courier系统事件,如CourierEvent/connectionSuccessCourierEvent/connectionAttempt等,您可以在CourierEvent枚举中声明的更多情况下实现ICourierEventHandler协议,并实现ICourierEventHandler/onEvent(_:)方法。此方法将在任何Courier系统事件发生时被调用。

最后,确保对实例有强引用,并调用CourierEventManager/addEventHandler(_:)传入该实例。

courierClient.addEventHandler(analytics)

监控Courier MQTT数据包日志

CourierMQTTChuck用于检查底层MQTT连接的所有传入或传出的数据包。

它拦截所有数据包,持久化它们,并提供一个UI来访问所有发送或接收的MQTT数据包。它还提供诸如搜索、分享和清除数据等多种其他功能。

它使用SwiftUI作为底层数据库,最低iOS部署版本为15,您仍可以在iOS 11上构建它来访问记录器,而无需视图。

Simulator Screen Shot - iPhone 14 Pro - 2023-04-10 at 17 26 54

Simulator Screen Shot - iPhone 14 Pro - 2023-04-10 at 17 27 31

用法

将依赖项添加到 CourierMQTTChuck 并像这样简单声明日志记录器:

import CourierMQTTChuck

let logger = MQTTChuckLogger()

然后在 SwiftUI 视图中传递 logger 声明 MQTTChuckView

.sheet(isPresented: $showChuckView, content: {
            NavigationView {
                MQTTChuckView(logger: logger)
            }
        })

贡献指南

阅读我们的 贡献指南 了解我们的开发流程、如何提出问题修复和改进方案,以及如何构建和测试您对 Courier iOS 库的更改。

许可证

Courier 中的所有模块除了 MQTTClientGJ 都遵循 MIT 许可证。MQTTClientGJ 使用 Eclipse 许可证