0%

初学 kafka

kafka-node

kafka 是一个高吞吐的分布式发布订阅消息系统
当瞬发数据量非常庞大的时候,就可以使用 kafka 做消息队列, 由生产者把数据放入 kafka 服务的消息队列中,由 kafka 来控制调度消费者来消费掉这些数据,这个过程如果由 API 服务器直接做的话,很可能会因为并发量过大导致操作速度慢,甚至于宕机.

如果我们用 kafka 来控制这个过程,就会变的非常安全, kafka 的高吞吐量能够轻松容纳每秒数百万次的消息.

首先, kafka 是一项服务进程,所以我们要先安装到本地或者服务器上

  1. 使用 homebrew 安装
1
brew install kafka

会自动安装依赖zookeeper

  1. 安装配置文件位置
1
/usr/local/etc/kafka|zookeeper
  1. 启动 zookeeper
1
2
cd /usr/local/Cellar/kafka/0.10.0.1
./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
  1. 启动 kafka 服务
1
./bin/kafka-server-start /usr/local/etc/kafka/server.properties &
  1. 创建 topic
1
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
  1. 查看创建的topic
1
./bin/kafka-topics --list --zookeeper localhost:2181
  1. 生产数据
1
./bin/kafka-console-producer --broker-list localhost:9092 --topic test1
  1. 消费数据
1
./bin/kafka-console-consumer --zookeeper localhost:2181 --topic test1 --from-beginning

到这里 kafka 的基本安装就完成了,接下来是编写生产和消费的代码, 环境使用的是 node.js,框架是 kafka-node.

编写业务逻辑代码

在生产中先创建一个连接 kafka 的对接对象

1
2
3
let client  =  new kafka.KafkaClient({
kafkaHost: confKafka.connData
})

这里的 kafkaHost 是一个 URL 字符串.而且要求 kafka 版本要比较高才可以用这个对象.
然后写具体逻辑

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
this.km  =  new kafka.KeyedMessage('key', 'message')
const payloads = [{
topic: 'test1',
messages: JSON.stringify(params),
partition: 0
},{
topic: 'test2',
messages: JSON.stringify(params),
partition: 0
},{
topic: 'test3',
messages: JSON.stringify(params),
partition: 0
}]

const producer = new kafka.Producer(this.client)
producer.on('ready', function () {3
// producer.createTopics('test3', false, function (err, data) {
// console.log(err, data)
// })
producer.send(payloads, function (err, data) {
console.log(111, err, data)
})
})

producer.on('error', function (err) {
console.log(222, err)
})

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
this.client  =  new kafka.KafkaClient({
kafkaHost: config.connData
})
let payloads = [{
topic: 'test1',
offset: 0,
partition: 0
}]
let options = {
autoCommit: true
}
const consumer = new kafka.Consumer(this.client, payloads, options)
// 业务处理
consumer.on('message', async (message) => {
let params = JSON.parse(message.value)
// 注册到TTN(业务逻辑)
// todo
await this.ttn.registerDevice(params.devID, params.eui)
// 保存到本地
// todo
// 成功后提交offset
consumer.setOffset(message.topic, message.partition, message.offset)
console.log(message)
})
consumer.on('offsetOutOfRange', function (err) {
console.log('offsetOutOfRange', err)
})
consumer.on('err', function (err) {
console.log(err)
})

这样就可以完成生产者生产资料,发送到 kafka, 然后 kafka 调用消费者,让消费者去消费这些资料