Streaming Big Data Processing
Kafka
event streaming
Kafka is a distributed system consisting of servers and clients that communicate via a high-performance TCP network protocol.
- Server
  - Broker
  - Kafka Connect
- Client
Concepts
Event
Producer (publisher)
Consumer
Topic: events are organized into topics. Topics in Kafka are always multi-producer and multi-subscriber.
Events in a topic can be ==read as often as needed==—unlike traditional messaging systems, events are ==not deleted after consumption==.
Instead, you define for how long Kafka should retain your events through a per-topic configuration setting, after which old events will be discarded.
Kafka's performance is effectively constant with respect to data size, so ==storing data for a long time is perfectly fine==.
Topics are partitioned for scalability.
To make your data fault-tolerant and highly available, every topic can be replicated, even across geo-regions or datacenters.
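A minimal sketch of the per-topic retention setting mentioned above, assuming the kafka-python client, a broker at localhost:9092, and a made-up topic name and 7-day value:

from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType

# connect to any one broker of the cluster
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
# retention.ms controls how long events in this topic are kept before being discarded
admin.alter_configs([ConfigResource(
    ConfigResourceType.TOPIC,
    'events-demo',                                         # hypothetical topic name
    configs={'retention.ms': str(7 * 24 * 3600 * 1000)},   # keep events for 7 days
)])
admin.close()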
Installation and Startup
Linux
Start the services:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
You can add the bin directory to PATH, so the command scripts can be invoked directly later on.
Docker
Create a cluster:
version: "2.2"
services:
  zookeeper:
    image: bitnami/zookeeper:3.7
    ports:
      - 2181:2181
    volumes:
      - zookeeper_data:/bitnami
    networks:
      - kafka
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  node-01:
    image: bitnami/kafka:2
    volumes:
      - data01:/bitnami
      - ./server01.properties:/bitnami/kafka/config/server.properties
    networks:
      - kafka
    ports:
      - 9092:9092
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_LISTENERS=PLAINTEXT://node-01:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://node-01:9092
    depends_on:
      - zookeeper
  node-02: 
    environment:
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://node-02:9093
    ports:
      - 9093:9093   
volumes:
  zookeeper_data:
    driver: local
  data01:
    driver: local
  ......
networks:
  kafka:
    driver: bridge
Because all nodes need to be reachable from outside, each service's port must be mapped to a different host port. When connecting from outside, only one of the nodes needs to be specified; see the sketch below.
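A minimal client-side sketch of that last point, assuming kafka-python and that the advertised hostnames node-01/node-02 resolve on the client machine (e.g. via /etc/hosts):

from kafka import KafkaConsumer

# bootstrap with a single node; the client discovers the remaining brokers
# from the cluster metadata (their advertised listeners) returned by this node
consumer = KafkaConsumer('first-topic', bootstrap_servers=['node-01:9092'])
print(consumer.topics())   # topic names visible in the cluster
consumer.close()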
Additional options: set the container name and the container hostname.
container_name: kafka-zookeeper
hostname: zookeeper
bitnami/bitnami-docker-kafka: Bitnami Docker Image for Kafka (github.com)
Administration
CLI
Topic management
Create a topic:
# --zookeeper is for 2.x, --bootstrap-server for 3.x; use one or the other (see the note below)
kafka-topics.sh --create --topic topic-name \
                --zookeeper HOST1:2181,HOST2:PORT2,... \
                --bootstrap-server HOST1:9092,... \
                --partitions 1 \
                --replication-factor 2
For 2.x, the --zookeeper option is required and --bootstrap-server is not supported; for 3.x, specifying only --bootstrap-server is enough. Bootstrap server vs zookeeper in kafka? - Stack Overflow
Status monitoring:
kafka-topics.sh ... --list                            # list all topic names
kafka-topics.sh ... --describe [--topic topic-name]   # print topic details
# Topic: first-topic  TopicId: CjMZOiXeSXOaaXBlfZx_fQ PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
# Topic: first-topic  Partition: 0    Leader: 0       Replicas: 0     Isr: 0
Writing data (producer)
Start a console producer to write data (terminate with Ctrl+C).
kafka-console-producer.sh --topic topic-name \
                          --zookeeper node1:2181,node2:2181,... \
                          --bootstrap-server node1:9092,...
For 2.x, the --zookeeper option is required and --bootstrap-server is optional; for 3.x, specifying only --bootstrap-server is enough (the same applies to the consumer options).
Reading data (consumer)
kafka-console-consumer.sh --topic topic-name \
                          --from-beginning \
                          --bootstrap-server ... --zookeeper ...
--from-beginning: read from the beginning of the data retained by Kafka (by default, consumption starts from the current point in time).
API
There are multiple Python client libraries available:
- Kafka-Python — an open-source, community-based library.
- PyKafka — maintained by Parse.ly and claimed to be a more Pythonic API; unlike Kafka-Python, it cannot create topics dynamically.
- Confluent Python Kafka — offered by Confluent as a thin wrapper around librdkafka, so its performance is better than the other two.
Kafka-Python
Admin
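A minimal sketch with kafka-python's KafkaAdminClient (the broker address and topic parameters are assumptions):

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
# create a topic with 1 partition and replication factor 1
admin.create_topics([NewTopic(name='parsed_recipes', num_partitions=1, replication_factor=1)])
print(admin.list_topics())                         # all topic names
print(admin.describe_topics(['parsed_recipes']))   # partition/replica details
admin.close()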
Producer
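A minimal producer sketch with kafka-python, serializing values with msgpack so they match the consumer example below (the broker address and record contents are assumptions):

import msgpack
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=msgpack.dumps,   # must mirror the consumer's value_deserializer
)
producer.send('parsed_recipes', {'title': 'pancakes', 'calories': 350})
producer.flush()   # block until buffered records have been sent
producer.close()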
Consumer
Getting started with Apache Kafka in Python | by Adnan Siddiqi | Towards Data Science
import msgpack
from time import sleep
from kafka import KafkaConsumer

if __name__ == '__main__':
    parsed_topic_name = 'parsed_recipes'
    # Notify if a recipe has more than 200 calories
    calories_threshold = 200
    consumer = KafkaConsumer(
        parsed_topic_name,
        auto_offset_reset='earliest',
        bootstrap_servers=['localhost:9092'],
        api_version=(0, 10),
        consumer_timeout_ms=1000,
        value_deserializer=msgpack.loads  # decoder: must match how the producer serialized values (see Decoding below)
    )
    for msg in consumer:
        record = msg.value  # already deserialized into a dict by value_deserializer
        calories = int(record['calories'])
        title = record['title']
        if calories > calories_threshold:
            print('Alert: {} calories count is {}'.format(title, calories))
        sleep(3)
    consumer.close()
Decoding
Depending on how the records were serialized by the producer, a matching decoder may need to be chosen (see the sketch below):
- plain text: no decoding needed;
- msgpack: decode with msgpack.loads;
- JSON text: decode with json.loads, which turns a JSON object into a dict.
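A sketch of the corresponding value_deserializer choices (the lambdas, topic name, and broker address are illustrative assumptions):

import json
import msgpack
from kafka import KafkaConsumer

topic, servers = 'parsed_recipes', ['localhost:9092']

# plain text: just decode the raw bytes
text_consumer = KafkaConsumer(topic, bootstrap_servers=servers,
                              value_deserializer=lambda v: v.decode('utf-8'))
# msgpack-serialized values
msgpack_consumer = KafkaConsumer(topic, bootstrap_servers=servers,
                                 value_deserializer=msgpack.loads)
# JSON text: parse each value into a dict
json_consumer = KafkaConsumer(topic, bootstrap_servers=servers,
                              value_deserializer=lambda v: json.loads(v.decode('utf-8')))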
Kafka Streams: processing data
Kafka Connect: importing and exporting data (event streams)
message queue
Integration with Elasticsearch
当Elasticsearch遇见Kafka--Kafka Connect - 云+社区 - 腾讯云 (tencent.com)
当Elasticsearch遇见Kafka--Logstash kafka input插件 - 云+社区 - 腾讯云 (tencent.com)
Integration with Spark
- spark-streaming-kafka: Kafka data source for Spark Streaming (DStream API);
- spark-sql-kafka: Kafka data source for Spark Structured Streaming (see the sketch below).
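A minimal PySpark sketch of the spark-sql-kafka path (the package coordinates must match your Spark/Scala build; the broker address and topic name are assumptions):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName('kafka-structured-streaming')
         # assumption: pick the spark-sql-kafka artifact matching your Spark/Scala version
         .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1')
         .getOrCreate())

# subscribe to a Kafka topic as a streaming DataFrame
df = (spark.readStream
      .format('kafka')
      .option('kafka.bootstrap.servers', 'node-01:9092')
      .option('subscribe', 'parsed_recipes')
      .load())

# key/value arrive as binary; cast them to strings before further processing
query = (df.selectExpr('CAST(key AS STRING)', 'CAST(value AS STRING)')
         .writeStream
         .format('console')
         .outputMode('append')
         .start())
query.awaitTermination()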