即将推出 - 打包和制作!
包含头文件
#include <kafkalite.h>
使用基本文件夹创建一个上下文
KLContext *context = kl_context_new("/tmp/kafka/", NULL);
A KLContext 管理一个或多个主题,消息可以被发布到这些主题,也可以从这些主题中消费。可以认为 KLContext 是一个主题组,其中正在编写的消息将被序列化。这允许客户端配置一组主题的并发写入级别(为了增加并行发布,可以简单地创建新的上下文)。消息可以被并发消费。
创建主题以发布消息
KLTopic *topic1 = kl_topic_open(context, "mytopic");
每个主题都是一个单独的队列,消息将按照它们入队的顺序被写入。可以在任何时间并发地打开和关闭主题。
重复调用打开主题时会返回原始实例。内部维护一个引用计数,确保只有在所有主题的所有者都调用主题的对应 kl_topic_close 时,才会对主题进行解分配。
当完成为主题调用
kl_topic_close(topic1)
关闭主题并从调用者手中放弃主题的所有权。
在关闭后使用此主题可能导致由于可能被解分配而导致的异常。内部主题的引用计数会递减(如果需要,则进行解分配)。
向主题发布消息
char *message = "hello world";
kl_topic_publish(topic1, message, strlen(message) + 1 /* + 1 for the null char */);
此应用程序会将消息附加到主题。消息非常简单,其内容的格式或结构不重要。(待办事项:是否也应允许在发布中允许 messageType(int),以允许在消费期间轻松和简单地过滤,而不会产生序列化的开销?)
验证和/或消费消息
从主题中消费消息是通过迭代器完成的。一个主题可以有多个消费者。每个消费者提供了一个迭代器接口,可以从主题的任何点开始,并按照发布的顺序消费消息。为了消费消息
KLIterator *iterator = kl_iterator_new(context, "mytopic", 0);
bool hasMore = kl_iterator_forward(iterator);
if (hasMore)
{
size_t msgsize = kl_iterator_msgsize(iterator);
KLMessage *message = (KLMessage *)malloc(sizeof(KLMessage) + msgsize);
kl_iterator_message(iterator, message);
// Do something in the message...
printf("Message: %s\n", message->data);
}
可以创建多个迭代器以从不同的偏移量开始,并且可以并发迭代和消费消息。
当完成为上下文调用
kl_context_destroy(context)
这会关闭上下文以及所有相关的主题。对上下文或此上下文中分配的主题的任何引用(使用 kl_topic_open)现在将不可用。