Streaming Big Data Processing
Kafka
event streaming
Kafka is a distributed system consisting of servers and clients that communicate via a high-performance TCP network protocol.
- Server: a Kafka cluster consists of one or more servers
  - Broker: the storage layer
  - Kafka Connect: continuously imports/exports data as event streams, integrating Kafka with external systems
- Client: applications that write (produce) and read (consume) events
Concepts
Event
Producer (publisher)
Consumer
Topic: events are organized into topics. Topics in Kafka are always multi-producer and multi-subscriber.
Events in a topic can be ==read as often as needed==—unlike traditional messaging systems, events are ==not deleted after consumption==.
Instead, you define for how long Kafka should retain your events through a per-topic configuration setting, after which old events will be discarded.
Kafka's performance is effectively constant with respect to data size, so ==storing data for a long time is perfectly fine==.
Topics are partitioned for scalability.
To make your data fault-tolerant and highly available, every topic can be replicated, even across geo-regions or datacenters.
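As a minimal sketch of these per-topic knobs (partition count, replication factor, retention), assuming the third-party kafka-python package and a broker at `localhost:9092` (both assumptions, not part of the notes above):

```python
from kafka.admin import KafkaAdminClient, NewTopic

# Assumed broker address; adjust to your cluster.
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

# Partition count and replication factor are chosen per topic;
# replication_factor must not exceed the number of brokers.
# retention.ms controls how long events are kept before old ones are discarded.
topic = NewTopic(
    name='first-topic',
    num_partitions=3,
    replication_factor=1,
    topic_configs={'retention.ms': str(7 * 24 * 3600 * 1000)},  # keep events for 7 days
)
admin.create_topics([topic])
admin.close()
```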
Installation and Running
Linux
Starting the services
```shell
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
```
You can add the `bin` directory to the PATH so that the command scripts can be invoked directly afterwards.
Docker
Create a cluster:
```yaml
version: "2.2"
services:
  zookeeper:
    image: bitnami/zookeeper:3.7
    ports:
      - 2181:2181
    volumes:
      - zookeeper_data:/bitnami
    networks:
      - kafka
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  node-01:
    image: bitnami/kafka:2
    volumes:
      - data01:/bitnami
      - ./server01.properties:/bitnami/kafka/config/server.properties
    networks:
      - kafka
    ports:
      - 9092:9092
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_LISTENERS=PLAINTEXT://node-01:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://node-01:9092
    depends_on:
      - zookeeper
  node-02:
    environment:
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://node-02:9093
    ports:
      - 9093:9093
volumes:
  zookeeper_data:
    driver: local
  data01:
    driver: local
  # ......
networks:
  kafka:
    driver: bridge
```
Since external clients need to reach every node, each service's port must be mapped to a different host port. When connecting from outside, only one of the nodes needs to be specified.
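To illustrate that last point, a minimal sketch using the kafka-python package (an assumption; the hostname and port match the compose file above): a client is given a single bootstrap address and discovers the rest of the brokers from the cluster metadata, so the advertised names (node-01, node-02, ...) must be resolvable from the client host, e.g. via /etc/hosts.

```python
from kafka import KafkaConsumer

# One reachable broker is enough as the bootstrap address; the client then
# fetches the full cluster layout (the advertised listeners of all brokers).
consumer = KafkaConsumer(bootstrap_servers=['node-01:9092'])
print(consumer.topics())  # topic names discovered from cluster metadata
consumer.close()
```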
Additional settings: specify the container name and the container hostname.
```yaml
container_name: kafka-zookeeper
hostname: zookeeper
```
bitnami/bitnami-docker-kafka: Bitnami Docker Image for Kafka (github.com)
Administration
CLI
Topic management
Create a topic:
```shell
# 2.x (ZooKeeper-based)
kafka-topics.sh --create --topic topic-name --partitions 1 --replication-factor 2 \
    --zookeeper HOST1:2181,HOST2:PORT2,...
# 3.x
kafka-topics.sh --create --topic topic-name --partitions 1 --replication-factor 2 \
    --bootstrap-server HOST1:9092,...
```
For 2.x, the `--zookeeper` option must be specified (older 2.x releases do not support `--bootstrap-server`); for 3.x, `--bootstrap-server` alone is enough (`--zookeeper` has been removed). See: Bootstrap server vs zookeeper in kafka? - Stack Overflow
Status monitoring:
```shell
kafka-topics.sh ... --list                       # list all topic names
kafka-topics.sh ... --describe [--topic topic]   # show topic details
# Topic: first-topic TopicId: CjMZOiXeSXOaaXBlfZx_fQ PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
# Topic: first-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
```
Writing data (producer)
Start a console producer to write data (terminate with Ctrl+C).
```shell
# older releases
kafka-console-producer.sh --topic topic-name --broker-list node1:9092,node2:9092,...
# newer releases
kafka-console-producer.sh --topic topic-name --bootstrap-server node1:9092,...
```
Older releases require the `--broker-list` option for the console producer (it connects to the brokers directly, not to ZooKeeper); newer 2.x/3.x releases use `--bootstrap-server` instead. The console consumer options follow the same pattern.
Reading data (consumer)
```shell
kafka-console-consumer.sh --topic topic-name \
    --from-beginning \
    --bootstrap-server node1:9092,...
```
`--from-beginning`: read from the earliest data Kafka has retained (by default, consumption starts from the current position, so only new messages are read).
API
There are multiple Python libraries available:
- Kafka-Python — an open-source, community-based library.
- PyKafka — maintained by Parse.ly and claimed to be a Pythonic API. Unlike Kafka-Python, you can't create dynamic topics.
- Confluent Python Kafka — offered by Confluent as a thin wrapper around librdkafka, hence its performance is better than the other two.
Kafka-Python
Admin
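A minimal sketch of topic administration with kafka-python's `KafkaAdminClient`; the broker address and topic name are assumptions:

```python
from kafka.admin import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

print(admin.list_topics())                     # names of all existing topics
print(admin.describe_topics(['first-topic']))  # partition / replica layout
admin.delete_topics(['first-topic'])           # remove a topic
admin.close()
```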
Producer
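A minimal producer sketch with kafka-python, mirroring the consumer example below; the topic name and broker address are the same assumptions used there, and values are serialized to JSON text before being sent:

```python
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),  # dict -> JSON bytes
)

record = {'title': 'pasta', 'calories': 250}
producer.send('parsed_recipes', value=record)  # send() is asynchronous
producer.flush()   # block until buffered records have been delivered
producer.close()
```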
Consumer
Getting started with Apache Kafka in Python | by Adnan Siddiqi | Towards Data Science
```python
import json
from time import sleep

from kafka import KafkaConsumer
import msgpack

if __name__ == '__main__':
    parsed_topic_name = 'parsed_recipes'
    # Notify if a recipe has more than 200 calories
    calories_threshold = 200

    consumer = KafkaConsumer(
        parsed_topic_name,
        auto_offset_reset='earliest',
        bootstrap_servers=['localhost:9092'],
        api_version=(0, 10),
        consumer_timeout_ms=1000,
        # value_deserializer=msgpack.loads,  # decoding: enable only if the producer
        #                                    # serialized values with msgpack
    )
    for msg in consumer:
        record = json.loads(msg.value)  # values here are JSON text, so decode with json
        calories = int(record['calories'])
        title = record['title']
        if calories > calories_threshold:
            print('Alert: {} calories count is {}'.format(title, calories))
        sleep(3)

    if consumer is not None:
        consumer.close()
```
Decoding methods
Depending on how the data was serialized when it was written to Kafka, a matching decoding method may be needed (see the sketch below):
- Plain text: no decoding needed;
- `msgpack`: decode with `msgpack.loads`;
- JSON: if the record's `value` field is JSON object text, decoding (e.g. `json.loads`) converts it into a dict.
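Possible `value_deserializer` choices for the consumer above, written as sketches; pick the one that matches how the producer serialized the values:

```python
import json
import msgpack
from kafka import KafkaConsumer

# Plain text: just decode the raw bytes.
plain_text = lambda v: v.decode('utf-8')

# JSON text: bytes -> str -> dict.
json_value = lambda v: json.loads(v.decode('utf-8'))

# msgpack binary: unpack directly; a packed mapping comes back as a dict.
msgpack_value = msgpack.loads

consumer = KafkaConsumer(
    'parsed_recipes',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=json_value,  # msg.value is then already a dict
)
```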
Kafka Streams: processing data (stream processing)
Kafka Connect: importing and exporting data (as event streams)
message queue
Integrating with Elasticsearch
当Elasticsearch遇见Kafka--Kafka Connect - 云+社区 - 腾讯云 (tencent.com)
当Elasticsearch遇见Kafka--Logstash kafka input插件 - 云+社区 - 腾讯云 (tencent.com)
Integrating with Spark
- `spark-streaming-kafka`: Kafka data source for Spark Streaming (DStreams);
- `spark-sql-kafka`: Kafka data source for Spark Structured Streaming (see the sketch below).
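A minimal Structured Streaming read sketch using the `spark-sql-kafka` data source; the topic name and broker address are assumptions, and the `spark-sql-kafka-0-10` package must be on Spark's classpath (e.g. via `--packages`):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("kafka-structured-streaming").getOrCreate()

# Subscribe to a Kafka topic as a streaming DataFrame.
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "parsed_recipes")
      .option("startingOffsets", "earliest")
      .load())

# Kafka delivers key/value as binary; cast to strings for downstream processing.
events = df.select(col("key").cast("string"), col("value").cast("string"))

# Print each micro-batch to the console (for demonstration only).
query = events.writeStream.format("console").outputMode("append").start()
query.awaitTermination()
```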