流式大数据处理

Kafka

event streaming

Kafka is a distributed system consisting of servers and clients that communicate via a high-performance TCP network protocol.

  • Server
    • Broker
    • Kafka Connect
  • Client

Concepts

Event

Publisher

Consumer

Topic: event organized as topics. Topics in Kafka are always multi-producer and multi-subscriber.

Events in a topic can be ==read as often as needed==—unlike traditional messaging systems, events are ==not deleted after consumption==.

Instead, you define for how long Kafka should retain your events through a per-topic configuration setting, after which old events will be discarded.

Kafka's performance is effectively constant with respect to data size, so ==storing data for a long time is perfectly fine==.

Topics are partitioned, for scalability.

To make your data fault-tolerant and highly-available, every topic can be replicated, even across geo-regions or datacenters

安装运行

Linux

启动服务
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

可将bin目录加入路径,后续直接引用命令脚本。

docker

创建集群:

version: "2.2"
services:
  zookeeper:
    image: bitnami/zookeeper:3.7
    ports:
      - 2181:2181
    volumes:
      - zookeeper_data:/bitnami
    networks:
      - kafka
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  node-01:
    image: bitnami/kafka:2
    volumes:
      - data01:/bitnami
      - ./server01.properties:/bitnami/kafka/config/server.properties
    networks:
      - kafka
    ports:
      - 9092:9092
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_LISTENERS=PLAINTEXT://node-01:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://node-01:9092
    depends_on:
      - zookeeper
  node-02: 
    environment:
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://node-02:9093
    ports:
      - 9093:9093   
volumes:
  zookeeper_data:
    driver: local
  data01:
    driver: local
  ......
networks:
  kafka:
    driver: bridge

由于外部需要访问所有节点,因此需要将每个服务的端口映射到宿主机的不同端口。外部访问时,仅指定其中一台节点。

额外配置项:指定容器名和容器主机名。

container_name: kafka-zookeeper
hostname: zookeeper

bitnami/bitnami-docker-kafka: Bitnami Docker Image for Kafka (github.com)

管理

CLI

主题管理

创建主题:

kafka-topics.sh --create --topic topic-name \
                --zookeeper HOST1:2181,HOST2:PORT2,...\
                --bootstrap-server HOST1:9092,...\ # [3.x]
                --partitions 1 \
                --replication-factor 2

对于2.x版本,必须指定zookeeper选项,不支持bootstrap-server选项;对于3.x版本可仅指定bootstrap-serverBootstrap server vs zookeeper in kafka? - Stack Overflow

状态监控:

kafka-topics.sh ... --list all # 列出所有主题名称
kafka-topics.sh ... --describe [--topic topic] # 输出主题信息--desc
# Topic: first-topic  TopicId: CjMZOiXeSXOaaXBlfZx_fQ PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
# Topic: first-topic  Partition: 0    Leader: 0       Replicas: 0     Isr: 0
写入数据(生产者)

启动生产者以写入数据(通过Ctrl+C终止)。

kafka-console-producer.sh --topic topic-name \
                          --zookeeper node1:2181,node2:2181,... \
                          --bootstrap-server node1:9092,...

对于2.x版本,必须指定zookeeper选项,bootstrap-server为可选项;对于3.x版本可仅指定bootstrap-server(消费者参数同理)。

读取数据(消费者)
kafka-console-consumer.sh --topic topic-name \
                          --from-beginning \
                          --bootstrap-server ... --zookeeper ...

--from-beginning:从kafka缓存的数据开始处开始读取(默认从当前时间点读取)。

API

There are multiple Python libraries available for usage:

  • Kafka-Python — An open-source community-based library.
  • PyKafka — This library is maintained by Parsly and it’s claimed to be a Pythonic API. Unlike Kafka-Python you can’t create dynamic topics.
  • Confluent Python Kafka:- It is offered by Confluent as a thin wrapper around librdkafka, hence it’s performance is better than the two.

Kafka-Python

Admin
Producer
Consumer

Getting started with Apache Kafka in Python | by Adnan Siddiqi | Towards Data Science

import json
from time import sleep

from kafka import KafkaConsumer
import msgpack 

if __name__ == '__main__':
    parsed_topic_name = 'parsed_recipes'
    # Notify if a recipe has more than 200 calories
    calories_threshold = 200

    consumer = KafkaConsumer(
       parsed_topic_name, 
       auto_offset_reset='earliest',
       bootstrap_servers=['localhost:9092'], 
       api_version=(0, 10), 
       consumer_timeout_ms=1000,
       value_deserializer=msgpack.loads  # 解码:可能采用其他解码方法
    )
    for msg in consumer:
        record = json.loads(msg.value)
        calories = int(record['calories'])
        title = record['title']

        if calories > calories_threshold:
            print('Alert: {} calories count is {}'.format(title, calories))
        sleep(3)

    if consumer is not None:
        consumer.close()
解码方法

根据Kafka的数据序列化方法,可能需要选择合适的解码方法:

  • 纯文本,无需解码;
  • msgpack:使用msgpack.loads解码;如果kafka记录的value字段为JSON对象文本,则会自动将其转换为字典。
Kafka Stream:处理数据
Kafka Connect:导入导出数据(事件流)

Apache Kafka

message queue

对接Elasticsearch

当Elasticsearch遇见Kafka--Kafka Connect - 云+社区 - 腾讯云 (tencent.com)

当Elasticsearch遇见Kafka--Logstash kafka input插件 - 云+社区 - 腾讯云 (tencent.com)

对接Spark

  • spark-streaming-kafka:Spark流处理对接Kafka数据源;
  • spark-sql-kafka:Spark结构化流处理对接Kafka数据源;