kafka-node
kafka 是一个高吞吐的分布式发布订阅消息系统
当瞬发数据量非常庞大的时候,就可以使用 kafka 做消息队列, 由生产者把数据放入 kafka 服务的消息队列中,由 kafka 来控制调度消费者来消费掉这些数据,这个过程如果由 API 服务器直接做的话,很可能会因为并发量过大导致操作速度慢,甚至于宕机.
如果我们用 kafka 来控制这个过程,就会变的非常安全, kafka 的高吞吐量能够轻松容纳每秒数百万次的消息.
首先, kafka 是一项服务进程,所以我们要先安装到本地或者服务器上
- 使用 homebrew 安装
会自动安装依赖zookeeper
- 安装配置文件位置
1
| /usr/local/etc/kafka|zookeeper
|
- 启动 zookeeper
1 2
| cd /usr/local/Cellar/kafka/0.10.0.1 ./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
|
- 启动 kafka 服务
1
| ./bin/kafka-server-start /usr/local/etc/kafka/server.properties &
|
- 创建 topic
1
| ./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
|
- 查看创建的topic
1
| ./bin/kafka-topics --list --zookeeper localhost:2181
|
- 生产数据
1
| ./bin/kafka-console-producer --broker-list localhost:9092 --topic test1
|
- 消费数据
1
| ./bin/kafka-console-consumer --zookeeper localhost:2181 --topic test1 --from-beginning
|
到这里 kafka 的基本安装就完成了,接下来是编写生产和消费的代码, 环境使用的是 node.js,框架是 kafka-node.
编写业务逻辑代码
在生产中先创建一个连接 kafka 的对接对象
1 2 3
| let client = new kafka.KafkaClient({ kafkaHost: confKafka.connData })
|
这里的 kafkaHost 是一个 URL 字符串.而且要求 kafka 版本要比较高才可以用这个对象.
然后写具体逻辑
生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| this.km = new kafka.KeyedMessage('key', 'message') const payloads = [{ topic: 'test1', messages: JSON.stringify(params), partition: 0 },{ topic: 'test2', messages: JSON.stringify(params), partition: 0 },{ topic: 'test3', messages: JSON.stringify(params), partition: 0 }]
const producer = new kafka.Producer(this.client) producer.on('ready', function () {3 producer.send(payloads, function (err, data) { console.log(111, err, data) }) })
producer.on('error', function (err) { console.log(222, err) })
|
消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| this.client = new kafka.KafkaClient({ kafkaHost: config.connData }) let payloads = [{ topic: 'test1', offset: 0, partition: 0 }] let options = { autoCommit: true } const consumer = new kafka.Consumer(this.client, payloads, options)
consumer.on('message', async (message) => { let params = JSON.parse(message.value)
await this.ttn.registerDevice(params.devID, params.eui)
consumer.setOffset(message.topic, message.partition, message.offset) console.log(message) }) consumer.on('offsetOutOfRange', function (err) { console.log('offsetOutOfRange', err) }) consumer.on('err', function (err) { console.log(err) })
|
这样就可以完成生产者生产资料,发送到 kafka, 然后 kafka 调用消费者,让消费者去消费这些资料