客户管理系统开发定制Kafka详细教程入门

1 ——客户管理系统开发定制消息中间件

1.1 客户管理系统开发定制消息队列的作用

1.2 客户管理系统开发定制消息队列的概念——MQ

  • Message
客户管理系统开发定制在互联网中,客户管理系统开发定制多台设备产生通信的数据的总称:客户管理系统开发定制可以是视频、文本、音频等等。
  • 1
  • Quene
客户管理系统开发定制一种特殊的线性表,客户管理系统开发定制满足先进先出的原则。
  • 1

1.3 客户管理系统开发定制消息队列的种类

MQ分为两种:P2P : peer to peerPub/Sub : 客户管理系统开发定制发布与订阅
  • 1
  • 2
  • 3

1.3.1 peer to peer

1.3.2 pub/sub

1.3.3 客户管理系统开发定制二者之间的区别

共同点:客户管理系统开发定制消息生产者将消息生产到队列,客户管理系统开发定制消息消费者从队列中消费消息。不同点:p2p:客户管理系统开发定制一个生产者生产的消息客户管理系统开发定制只能被一个客户管理系统开发定制消费者消费。打电话pub/sub:客户管理系统开发定制每个消息都可以有多个消费者消费,客户管理系统开发定制说明消息存在队列中。客户管理系统开发定制所以他使用偏移量方式管理消息
  • 1
  • 2
  • 3
  • 4
  • 5

1.3.4 客户管理系统开发定制常见的消息队列

RabbitMQ : erlang编写。客户管理系统开发定制支持负载均衡,客户管理系统开发定制数据持久化。pub/sub\p2p

Redis : kv的nosql客户管理系统开发定制的缓存数据库,客户管理系统开发定制但是也支持pub/sub。客户管理系统开发定制对于短消息(小于10kb)的性能RabbitMQ 还好。

zeroMQ:轻量级的MQ。P2P

ActiveMQ:JMS实现,P2P,持久化,客户管理系统开发定制分布式事务

Kafka/Jafka:客户管理系统开发定制高性能跨语言分布式基于发布/客户管理系统开发定制订阅的全分布式支持数据持久化,客户管理系统开发定制也可以离线或者实时处理数据的。

RocketMQ:纯Java实现。发布/订阅,本地事务和分布式事务

2 Kafka快速入门

2.1 介绍

kafka是分布式的基于消息的发布-订阅的消息队列。LinkedIn(领英),Scala编写的

2.2 三大特点

  • 高吞吐量

可满足每秒百万级别的消息的生产和消费

  • 持久性

具备一套完整的消息的存储机制,可以确保消息数据的高效的安全的持久化

  • 分布式

既有扩展以及容错性。

2.3 kafka服务

  • topic : 主题,kafka处理的消息分为不同的分类,分类就是按照主题来划分。
  • broker:消息服务器的代理。kafka集群中的一个节点一般我都门都叫做一个broker;主要是用来存储消息。存在硬盘中。
  • partition:分区。Topic的在物理上的分组。一个topic在broker上被分为1个或者多个partition。分区在创建主题的时候指定的。
  • message:消息,通信的基本单位,每个消息属于某一个partition
  • Producer: 生产者,消息和数据都是由这个组件产生的,由它发送到kafka集群中的。
  • Consumer:消费者,消息和数据都是由这个组件来消费的。
  • Zookeeper: 他需要zk来做分布式协调

3 Kafka安装

3.1 安装

##1. 解压安装[root@hadoop software]# tar -zxvf kafka_2.11-1.1.1.tgz -C /opt/apps/[root@hadoop apps]# mv kafka_2.11-1.1.1/ kafka-1.1.1/[root@hadoop kafka-1.1.1]# vi /etc/profile## 自定义环境变量export JAVA_HOME=/opt/apps/jdk1.8.0_261export HADOOP_HOME=/opt/apps/hadoop-2.8.1export HIVE_HOME=/opt/apps/hive-1.2.1export HBASE_HOME=/opt/apps/hbase-1.2.1export COLLECT_HOME=/opt/apps/collect-appexport FRP_HOME=/opt/apps/frpexport SCRIPT_HOME=/opt/apps/scriptsexport SQOOP_HOME=/opt/apps/sqoop-1.4.7export AZKABAN_HOME=/opt/apps/azkaban-solo-serverexport SCALA_HOME=/opt/apps/scala-2.11.8export SPARK_HOME=/opt/apps/spark-2.2.0export KAFKA_HOME=/opt/apps/kafka-1.1.1export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin:$HBASE_HOME/binexport PATH=$PATH:$COLLECT_HOME:$FRP_HOME:$SCRIPT_HOME:$SQOOP_HOME/bin:$KAFKA_HOME/binexport CLASS_PATH=.:$JAVA_HOME/libexport FLUME_HOME=/opt/apps/flume-1.9.0export PATH=$PATH:/opt/apps/flume-1.9.0/bin:$AZKABAN_HOME/bin:$SCALA_HOME/bin:$SPARK_HOME/bin:$SPARK_HOME/sbin##2. server.properties# The id of the broker. This must be set to a unique integer for each broker.broker.id=1 ## kafka的broker id的值,是一个整数,在集群中不能出现重复############################# Log Basics ############################## A comma separated list of directories under which to store log fileslog.dirs=/opt/apps/kafka-1.1.1/logs ## kafka的日志文件的路径############################# Zookeeper #############################zookeeper.connect=hadoop:2181/kafka ## kafka的一些数据在zk中的目录##3. 接下来是全分布式的操作##3.1 拷贝,将本机的kafka-1.1.1目录拷贝到其他的节点上scp -r $KAFKA_HOME/ hadoop2:/opt/appsscp -r $KAFKA_HOME/ hadoop3:/opt/apps##3.2 修改server.properties# The id of the broker. This must be set to a unique integer for each broker.broker.id=2 ## kafka的broker id的值,是一个整数,在集群中不能出现重复
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

3.2 启动测试

##1. 如果是全分布式,请先启动zk的集群,如果不是全分布式请先启动kafka自带的zk的脚本。[root@hadoop bin]# sh zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties##2. 启动kafka的服务[root@hadoop bin]# sh kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties##3. 测试zk客户端连接zk服务[root@hadoop bin]# sh zookeeper-shell.sh localhost:2181
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

4 Kafka的基本操作

4.0 kafka在zookeeper中的目录说明

cluster

​ - id : {“version”:“1”,“id”:“_8TJvXJoQPqD1b2Vr-BVBA”} -> 包含了集群版本和集群的id

controller : {“version”:1,“brokerid”:1,“timestamp”:“1623914309134”} -> 控制partition的leader选举,topic的crud。当前集群中的brokerid=1的broker充当了controller的角色

controller_epoch :1。表示controller的纪元。表示controller的迭代。每当controller换一个broker,值自增1

brokers

​ -ids [1, 2, 3] -> 当前kafka中的broker的实例列表

​ -topics:[hadoop] --> 当前的kafka的主题列表

​ -seqid : 系统的序列id

consumers :

​ 老版本的kafka中适用于存放kafka的消费者信息,主要保存从就偏移量。

​ 新版本基本不用,消费的偏移量记录在集群中的broker节点的磁盘中,$KAFKA_HOME/logs/__consumer_offsets

config : 存放配置信息

4.1 Topic操作

4.1.1 创建主题

[root@hadoop bin]# kafka-topics.sh \> --create \ ## 创建事件> --topic hadoop \ ## 主题名称> --zookeeper hadoop/kafka \ ## kafka关联zk的地址> --partitions 3 \ ## 分区数> --replication-factor 1 ## 副本因子的个数必须要小于等于broker的实例的个数Created topic "hadoop".
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

4.1.2 列举主题列表

[root@hadoop kafka-1.1.1]# kafka-topics.sh \> --list \> --zookeeper hadoop:2181/kafkahadoop
  • 1
  • 2
  • 3
  • 4

4.1.3 查看主题详情

[root@hadoop kafka-1.1.1]# kafka-topics.sh \> --describe \> --topic hadoop \> --zookeeper hadoop:2181/kafkaTopic:hadoop    PartitionCount:3        ReplicationFactor:3     Configs:        Topic: hadoop   Partition: 0    Leader: 1       Replicas: 1,3,2     Isr: 1,3,2        Topic: hadoop   Partition: 1    Leader: 2       Replicas: 2,1,3     Isr: 2,1,3        Topic: hadoop   Partition: 2    Leader: 3       Replicas: 3,2,1     Isr: 3,2,1        Partition: 当前的主题的分区号Replicas:副本因子,当前kafka对应的分区所在的broker实例的brokeridLeader:当前的kafka对应分区的broker中的leader,只有leader才负责处理读写请求Isr:该分区存活的副本对用的broker的borkerid
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

4.1.4 修改主题

[root@hadoop kafka-1.1.1]# kafka-topics.sh \> --alter \> --topic hadoop \> --partitions 4 \> --zookeeper hadoop:2181/kafkatip: 修改分区,只能+,不能-
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

4.1.5 删除主题

[root@hadoop kafka-1.1.1]# kafka-topics.sh \> --delete \> --topic hadoop \> --zookeeper hadoop:2181/kafkaTopic hadoop is marked for deletion.Note: This will have no impact if delete.topic.enable is not set to true.
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

4.2 测试kafka的生产与消费能力

4.2.1 开启生产端

[root@hadoop bin]# sh kafka-console-producer.sh \> --topic hadoop \> --broker-list hadoop:9092> 
  • 1
  • 2
  • 3
  • 4

4.2.2 开启消费端

[root@hadoop bin]# sh kafka-console-consumer.sh \> --topic hadoop \> --bootstrap-server hadoop:9092
  • 1
  • 2
  • 3
  • 4

4.2.3 消费者消费总结

​ kafka消费者在消费数据的时候,一般都是分组的。这个分组叫做消费者组。不同组的消费相互之间不影响。相同组内的消费,同组内的偏移量相互影响;但是需要注意的是,如果你的分区是3个,那么你的消费者组内的消费者最优是3个,因为再多也没有用了,因为组内的消费者,一个消费者消费一个主题的分区。

[root@hadoop bin]# sh kafka-console-consumer.sh \> --topic hadoop \> --group default \ ##指定到default的消费者组> --bootstrap-server hadoop:9092 \> --partition 2 ## 消费分区2> --offset ealiest ## 从什么位置开始消费
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

5 Kafka的API操作

5.1 导入依赖

<!-- kafka --><dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka_2.11</artifactId>    <version>${kafka.version}</version></dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

5.2 生产者和消费者

5.2.1 Demo1_Producer

package com.zxy.bigdata.kafka.day2;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.common.serialization.IntegerSerializer;import org.apache.kafka.common.serialization.StringSerializer;import java.io.IOException;import java.util.Properties;public class Demo1_Producer {    public static void main(String[] args) throws IOException, InterruptedException {        // 1.创建配置属性对象        Properties properties = new Properties();        properties.load(Demo1_Producer.class.getClassLoader().getResourceAsStream("producer.properties"));        /* 2.创建生产者对象         * K : 向topic发送的消息的key的类型         * V : 向topic发送的消息的value的类型         */        Producer<Integer, String> producer = new KafkaProducer(properties);        //3. 发送消息        for(int start = 0; start < 20; start++) {            ProducerRecord<Integer, String> record = new ProducerRecord<>("hadoop", "ni dead bu dead");            producer.send(record);        }        Thread.sleep(10000);        //4 释放资源        producer.close();    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

5.2.2 producer.properties

bootstrap.servers=hadoop:9092key.serializer=org.apache.kafka.common.serialization.IntegerSerializervalue.serializer=org.apache.kafka.common.serialization.StringSerializeracks=[0|-1|1|all] ## 消息确认机制				## 0 : 不做确认,只管发送				## -1|all : 首先保证leader将数据写入到磁盘,并确认;还要保证等待数据同步到其他的非leader节点。				## 1 : 只确保leader写入数据完毕。后期leader和其他节点自动完成同步。batch.size=1024  ## 每个分区内的用户缓存未发送记录的容量linger.ms=10     ## 无论你的缓冲区是否填满,都会延迟10ms发送请求buffer.memory=10240 ## 设置你的生产者的所有的缓存空间retries=0       ## 发送消息失败之后重试的次数
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

5.2.3 Demo2_Consumer

package com.zxy.bigdata.kafka.day2;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.io.IOException;import java.util.Arrays;import java.util.Properties;public class Demo2_Consumer {    public static void main(String[] args) throws IOException {        // 1.创建配置属性对象        Properties properties = new Properties();        properties.load(Demo1_Producer.class.getClassLoader().getResourceAsStream("consumer.properties"));        //2. 创建消费者对象        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(properties);        //3. 订阅主题        consumer.subscribe(Arrays.asList("hadoop"));        //4. 拉取        while (true) {            ConsumerRecords<Integer, String> records = consumer.poll(1000);            for (ConsumerRecord<Integer, String> record : records) {                Integer key = record.key();                String value = record.value();                int partition = record.partition();                long offset = record.offset();                String topic = record.topic();                System.out.println(key + "-->" + value + "-->" + partition + "-->" + offset + "-->" + topic);            }        }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

5.2.4 consumer.properties

bootstrap.servers=hadoop:9092group.id=hzbigdata2101key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializervalue.deserializer=org.apache.kafka.common.serialization.StringDeserializerauto.offset.reset=earliest    ## ealiest latest none
  • 1
  • 2
  • 3
  • 4
  • 5

5.3 操作Topic

5.3.1 创建主题

package com.zxy.bigdata.kafka.day3;import org.apache.kafka.clients.admin.AdminClient;import org.apache.kafka.clients.admin.NewTopic;import java.util.Arrays;import java.util.Properties;public class Demo1_Admin {    public static void main(String[] args) {        //1. 创建一个管理员对象        Properties properties = new Properties();        properties.setProperty("bootstrap.servers", "hadoop:9092"); // 146.56.208.76        AdminClient admin = AdminClient.create(properties);        //2. 创建主题        admin.createTopics(Arrays.asList(new NewTopic("spark", 3, (short)1)));        //3. 释放资源        admin.close();    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

5.3.2 查询主题

package com.zxy.bigdata.kafka.day3;import org.apache.kafka.clients.admin.AdminClient;import org.apache.kafka.clients.admin.ListTopicsResult;import org.apache.kafka.clients.admin.NewTopic;import org.apache.kafka.common.KafkaFuture;import java.util.Arrays;import java.util.Properties;import java.util.Set;import java.util.concurrent.ExecutionException;public class Demo2_Admin {    public static void main(String[] args) throws ExecutionException, InterruptedException {        //1. 创建一个管理员对象        Properties properties = new Properties();        properties.setProperty("bootstrap.servers", "hadoop:9092"); // 146.56.208.76        AdminClient admin = AdminClient.create(properties);        //2. 查询主题        ListTopicsResult listTopicsResult = admin.listTopics();        //2.1 获取到你所有的主题名称        KafkaFuture<Set<String>> names = listTopicsResult.names();        Set<String> topics = names.get();        //2.2 遍历        for (String topic : topics) {            System.out.println(topic);        }                //3. 释放资源        admin.close();    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

5.3.3 查询详情

package com.zxy.bigdata.kafka.day3;import org.apache.kafka.clients.admin.AdminClient;import org.apache.kafka.clients.admin.DescribeTopicsResult;import org.apache.kafka.clients.admin.ListTopicsResult;import org.apache.kafka.clients.admin.TopicDescription;import org.apache.kafka.common.KafkaFuture;import org.apache.kafka.common.TopicPartitionInfo;import java.util.*;import java.util.concurrent.ExecutionException;public class Demo3_Admin {    public static void main(String[] args) throws ExecutionException, InterruptedException {        //1. 创建一个管理员对象        Properties properties = new Properties();        properties.setProperty("bootstrap.servers", "hadoop:9092"); // 146.56.208.76        AdminClient admin = AdminClient.create(properties);        //2. 查询主题        DescribeTopicsResult describeTopicsResult = admin.describeTopics(Arrays.asList("hadoop"));        KafkaFuture<Map<String, TopicDescription>> mapResult = describeTopicsResult.all();        Map<String, TopicDescription> map = mapResult.get();        //3. 遍历        for (Map.Entry<String, TopicDescription> entry:map.entrySet()) {            System.out.println(entry.getKey() + "  ----->");            TopicDescription value = entry.getValue();            System.out.println(value.name());            List<TopicPartitionInfo> partitions = value.partitions();            for (TopicPartitionInfo info : partitions) {                System.out.println(info.partition() + ":" +info.leader()+":"+info.replicas());            }        }        //3. 释放资源        admin.close();    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

5.4 record进行主题的分区策略

每条ProducerRecord(topic名称,可选的partition编号,以及一组key和value)

  1. 如果指定了partition,按照指定的分区编号发送
  2. 如果没有指定partition,但是指定了key,使用key进行hash,根据hash结果选择partition
  3. 如果没有指定partition也没有指定key,那么是轮循的方式选择partition

5.5 分区器

5.5.1 核心类

public interface Partitioner extends Configurable, Closeable {    /**     * Compute the partition for the given record.     *     * @param topic The topic name     * @param key The key to partition on (or null if no key)     * @param keyBytes The serialized key to partition on( or null if no key)     * @param value The value to partition on or null     * @param valueBytes The serialized value to partition on or null     * @param cluster The current cluster metadata     */    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);    /**     * This is called when partitioner is closed.     */    public void close();}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

5.5.2 自定义分区器

5.5.2.1 随机分区器
  • 代码
public class RandomPartitioner implements Partitioner {    private Random random = new Random();    @Override    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {        //1. 根据主题获取到当前的主题的分区数量        Integer partitionNum = cluster.partitionCountForTopic(topic);        //2. 随机分区        int partitionId = random.nextInt(partitionNum);        return partitionId;    }    @Override    public void close() {    }    @Override    public void configure(Map<String, ?> configs) {    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 注册
partitioner.class=com.zxy.bigdata.kafka.day3.RandomPartitioner
  • 1
5.5.2.2 Hash分区器
  • 编码
package com.zxy.bigdata.kafka.day3;import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import java.util.Map;public class HashPartitioner implements Partitioner {    @Override    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {        //1. 根据主题获取到当前的主题的分区数量        Integer partitionNum = cluster.partitionCountForTopic(topic);        int partitionId = 0;        if (key != null) partitionId = Math.abs(key.hashCode()) % partitionNum;        System.out.println("key = " + key + ", partition : " + partitionId);        return partitionId;    }    @Override    public void close() {    }    @Override    public void configure(Map<String, ?> configs) {    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 注册
partitioner.class=com.zxy.bigdata.kafka.day3.HashPartitioner
  • 1

6 Flume + Kafka

flume采集日志:既可以做离线的数据分析,也可以做实时的数据分析

6.1 创建新主题

[root@hadoop bin]# kafka-topics.sh \> --create \> --topic flume_kafka \ > --zookeeper hadoop/kafka \ > --partitions 3 \> --replication-factor 1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

6.2 flume-kafka-sink.conf

# 给source、channel、sink命名a1.channels = channel1a1.sources = source1a1.sinks = sink1# 1. source# source的类型a1.sources.source1.type = netcata1.sources.source1.bind = hadoopa1.sources.source1.port = 6666# 2. channela1.channels.channel1.type = memorya1.channels.channel1.capacity = 10000a1.channels.channel1.transactionCapacity = 10000a1.channels.channel1.byteCapacityBufferPercentage = 20a1.channels.channel1.byteCapacity = 800000# 3. sinka1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSinka1.sinks.sink1.kafka.bootstrap.servers = hadoop:9092a1.sinks.sink1.kafka.topic = flume_kafkaa1.sinks.sink1.flumeBatchSize = 100a1.sinks.sink1.kafka.producer.acks = 1a1.sinks.sink1.kafka.producer.linger.ms = 1a1.sinks.sink1.kafka.producer.compression.type = snappya1.sources.source1.channels = channel1a1.sinks.sink1.channel = channel1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

6.3 测试

##1. 开启kafka##2. 开启flumenohup flume-ng agent -n a1 -c /opt/apps/flume-1.9.0/conf -f /opt/apps/flume-1.9.0/conf/flume-kafka-sink.conf > /dev/null 2>&1 &##3. 开启消费者[root@hadoop bin]# sh kafka-console-consumer.sh \> --topic flume_kafka \> --bootstrap-server hadoop:9092##4. 下载netcat[root@hadoop ~]# yum -y install telnet[root@hadoop ~]# telnet 10.206.0.4 6666
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

7 Kafka架构

7.1 使用kafka的组件

7.1.1 replica

每个分区根据副本因子N,都会有N个副本。比如再broker1上面有一个topic1,副本因子2。那么就会由两个borker包含一样的数据,其中一个是leader,另一个是follower

7.1.2 segment

分区物理上由segment组成。segment中存放着message

7.1.3 leader

每个分区有多个副本,其中只有一个作为leader。Leader负责数据的读写

7.1.4 follower

​ 跟随leader,follower所有的读写都是由leader路由,leader广播给所有的follower。当leader宕机了,就会从剩下的follower中选取一个新的leader。如果当某个follower太慢、宕机、卡住这种发生的时候,leader就无情的叫他抛弃。“in sync replicas”(isr)列表中剔除

7.1.5 offset

​ kafka的存储文件都是按照${offset}.log来命名,以偏移量命名最大的好处方面查找。比如你想找2019位置的数据你只需要找到2019范围的文件即可。000000000000.log、0000000002022.log、00000000003033.log

7.2 架构重画

7.3 分布式模型——生产过程中需要主要注意的事项

​ kafka的分布式主要指的是分区被分布在多台server(broker),同时每个分区都有Leader和follower(不是必须的),这个理解大佬和马仔的关系。大佬负责读写(处理数据),马仔负责同步。在大佬挂掉的情况,马仔也可以成为大佬。

​ kafka的分区日志(message)被分布在kafka集群的服务器上。每个服务器都处理和共享这些分区请求。每个分区是被复制到一系列配置好的服务器上来进行容错。

​ 每个分区都有一个leader和0到多个follower。leader负责读写请求处理,follower负责同步。正常情况下每个服务器都能作为分区的一个leader和其他分区的follower。当然这些都是从内部的说法。从外部而言kafka是一个去中心化的集群。

7.4 Kafka的文件存储

​ 每个分区有多个segment file。创建分区的时候默认就会生成一个segment file 默认大小1G,当producer向分区生产数据的时候,如果内存满了,就会将数据溢出到segment file中。当segment file存储数据超过了1G的时候,就会生成第二个segment file:1G。一个segment file实际分为xxxx.log和xxxx.lindex。

​ 每个segment file都有自己的命名规则,第N个segment file的命名就是以第(N -1) 个的segment file的最后一条小的的offset。

​ 在.index文件中,存储的k-v格式的数据,其中key代表.log文件中顺序开始的第n条消息,value表示的该消息对应位置的偏移量。但是.index并不是对每条消息都记录,他是隔一些消息记录一次,避免占用太多的内存。

7.5 topic中的partition

7.5.1 为什么分区?

​ 为什么要分区?将一个完整大文件,划分到多个目录下管理,这个目录就是我所谓的分区。

​ 分区的好处?1.方便在集群中扩展;2. 提高并发

7.5.2 单点partition的存储分布

​ 如果kafka只有一个broker,这种一般称之为伪分布式。默认的分布式日志:/var/log/kafka-log目录为数据存储目录。我们的数据实际是存放在这个下面:

topic-partitionId

e.g.
hadoop-0

hadoop-1

hadoop-2

7.5.3 多点partition的存储分布

几乎和单点一样,不过每个节点上都应该有日志目录

7.5.4 分区分配broker的策略

  1. 将所有的broker(n个)和partition排序
  2. 将第i个partition分配到(i mod n)个broker上
  3. 副本将i个partition的第j个副本分配到((i + j)mod n)个broker
  • 举例
## test的topic, 4个分区, 2个副本。Topic:test    PartitionCount:4        ReplicationFactor:2     Configs:        Topic: test   Partition: 0    Leader: 0       Replicas: 0,1     Isr: 0,3        Topic: test   Partition: 1    Leader: 1       Replicas: 1,2     Isr: 1,2         Topic: test   Partition: 2    Leader: 2       Replicas: 2,3     Isr: 2,1        Topic: test   Partition: 3    Leader: 3       Replicas: 3,0     Isr: 3,0                第1个partition分配到第(0 % 4)= 0第2个partition分配到第(1 % 4)= 1第1个partition分配到第(2 % 4)= 2第1个partition分配到第(3 % 4)= 3
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

7.5.5 partition中文件存储

7.5.5.1 分区中的文件存储1

每个分区都是由一堆的segment file构成(默认是一个file 一个 GB).

每个partition其实是被平均分配给很多的segment。但是实际每个segment file消息数量不一定相等

每个partition只需要支持顺序读写,segment file的生命周期是由服务区端配置决定的,默认是7天。

这样的好处是快速删除无用文件,提高磁盘利用率

7.5.5.2 kafka中的segment

7.5.6 segment的文件结构

7.5.7 message的物理结构

关键字说明
8 byte offset在分区之内每个消息都有一个有序的id号,这就是offset。他可以唯一的确定消息在分区的位置。
4 byte message size消息的大小
4 byte CRC32校验消息
1 byte “magic”表示的发布的kafka的协议的版本
1 byte “attribute”独立版本、压缩类型、编码类型
4 byte key lengthkey的长度,当key为-1的时候,这个K字段就不填。
K byte key可选
Value bytes表示实际的消息的数据

7.6 Consumer Group

​ kafka提供的一组可扩展的并且具有一定的容错性的消费者机制。既然是一个组,这个组中一定包含了多个消费者,这些消费者共享了同一个公共的ID(Group ID)。这些组内的消费者是协调在一起来订阅主题的(分区)。当然,一个组内的消费者实例只能消费一个分区。

  • consumer group下必须得有一个或多个consumer instance。这个consumer instance可以是一个线程,也可以是一个进程。
  • group.id是一个字符串,是一个消费者组的唯一标识。
  • consumer group下订阅的主题,一般情况下保证消费者数量和分区数量一一对应可以达到性能最优化

7.7 offset的维护

​ consumer消费数据的时候使用的。

​ 那么kafka默认是定期的帮助你提交offset(enable.auto.commit=true),你也可以手动提交,在实际的生产环境开发中我们都是手动管理偏移量。另外kafka也会定期的把consumer group的消费情况保存起来,做成要给offset map,如下图:

​ 有一个叫做hzbigdata2101的消费者组,消费的Topic-A的两个分区,对Topic-A-0的分区消费到偏移量为9的位置,对Topic-A-1的消费到偏移量为6的位置。

消息队列

什么是

消息中间件,是利用高效可靠的消息传递机制进行异步的数据传输,并基于数据通信进行分布式系统的集成。通过提供那个消息队列模型和消息传递机制,可以在分布式环境下扩展进程间的通信。

消息中间件的传递模式

1.点对点模式

消息生产者讲消息发送到队列种,消息消费者从队列中接收消息。消息可以在队列中进行异步传输。

2.发布/订阅模式

发布订阅模式是通过一个内容节点来发布和订阅消息,这个内容结点成为主题,消息发布者将消息发布到某个主题,消息订阅者订阅这个主题的消息,主题相当于一个中介。主题的消息的发布与订阅相互独立,该模式的消息的传播是一对多的模式。

消息中间件可以做什么

应用程序之间可以不采取直接通信,而是采取消息中间件作为中介,做到数据的异步通信。开发人员不需要考虑网络协议和远程调用的问题,只需要通过各种消息中间件提供的API,就可以完成简单的消息推送,和消息接收等业务功能。

消息的生产者将消息存储到队列中,消息的消费者不一定马上消费消息,可以的能到自己想要用到这个消息的时候,再从响应的队列中获取消息。这样的设计可以很好的解决大数据传输过程中占用资源的问题,使数据传递和平台分开,不需要分配资源用于数据传输,有效的利用当前的资源。

常见的消息队列

Message Quene(MQ)

  • Message
在互联网中,多台设备产生通信的数据的总称:可以是视频、文本、音频等等。
  • 1
  • Quene
一种特殊的线性表,满足先进先出的原则。
  • 1

Kafka介绍

kafka是分布式的基于消息的发布-订阅的消息队列。LinkedIn(领英),Scala编写的

2.2 三大特点

  • 高吞吐量

可满足每秒百万级别的消息的生产和消费

  • 持久性

具备一套完整的消息的存储机制,可以确保消息数据的高效的安全的持久化

  • 分布式

既有扩展以及容错性。

2.3 kafka服务

  • topic : 主题,kafka处理的消息分为不同的分类,分类就是按照主题来划分。
  • broker:消息服务器的代理。kafka集群中的一个节点一般我都门都叫做一个broker;主要是用来存储消息。存在硬盘中。
  • partition:分区。Topic的在物理上的分组。一个topic在broker上被分为1个或者多个partition。分区在创建主题的时候指定的。
  • message:消息,通信的基本单位,每个消息属于某一个partition
  • Producer: 生产者,消息和数据都是由这个组件产生的,由它发送到kafka集群中的。
  • Consumer:消费者,消息和数据都是由这个组件来消费的。
  • Zookeeper: 他需要zk来做分布式协调

Kafka架构

RabbitMQ消息队列

使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现

AMQP协议更多的用于企业系统中,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。AMQP(高级消息队列协议)是一个进程间传递一部消息的网络协议。发布者发布消息,经过交换机,交换机根据路由规则将收到的消息分发给交换机绑定的队列,最后AMQP代理会将消息传递给此队列的消费者,或者消费者按照需求自动获取。

RocketMQ消息队列

RocketMQ是阿里开源的消息中间件,目前也已经孵化为Apache的顶级项目,它是纯Java开发,具有高吞吐量、高可用性、适合大闺蜜分布式系统的特点。

该消息队列的设计思路起源于Kafka,他对消息的可靠传输和事务的性能做了优化,目前被阿里集团广泛应用于交易、充值、流计算、消息推送、日志流处理、binglog分发等场景。

ActiveMQ消息队列

Apache的开源产品,完全支持JMS规范的消息中间件,是一个纯Java的程序,因此只需要操作系统支持Java虚拟机的,ActiveMQ即可执行。

JMS即为Java Message Service的应用程序接口,是一个Java平台中关于面向消息中间件的API,用于在两个程序之间,或分布式系统中发送消息,进行异步通信。

1.Distination目的地,消息发送者需要指定Destination才能发送消息,接收者需要指定Destination才能接收消息

2.Producer是生产者,生产数据,Consumer是消费者,接收数据

3.Message是消息封装一次后通信的内容

zeroMQ

正确的说,zeroMQ不是TCP,不是socket,也不是消息队列,而是这些的综合体

zeroMQ又称Ø MQ

Ø 也是一个非常奇妙的符号:

1.Ø 是一种权衡

2.Ø 暗合“零代理”、“零延迟”

3.Ø 的目标是“零管理、零消耗、零浪费”

4.Ø 符合简约注意,以减低复杂度作为力量的源泉,而不是增加新的功能

zeroMQ与消息队列相比的特点:

1.点对点无中间节点

致力于把点对点模式做到极致

2.强调消息收发机制

对通信的模式做了总结

例如数据的推送,接收 等

  • PUB and SUB
  • REQ and REP
  • REQ and XREP
  • XREQ and REP
  • XREQ and XREP
  • XREQ and XREQ
  • XREP and XREP
  • PUSH and PULL
  • PAIR and PAIR

3.以统一接口支持多种底层通信方式

不管是线程间通信,进程间通信还是跨主机通信,zeroMQ都使用同一套API进行调用,在调用的时候只需要修改对应的协议名称即可

Redis实现消息队列

Redis实现轻量级的消息队列与消息中间件,没有高级特性也没有ACK保证,无法做到数据不重不漏,如果业务简单而且对消息的可靠性要求不严格的可以试用。

Redis存储类型:

Redis中列表List类型是按照插入顺序的字符串链表,和数据结构中的普遍链表一样,可以在头部left和尾部right添加新的元素。插入时如何键不存在Redis将为该键创建一个新的链表。如果链表中所有元素均被删除,那么该键也被删除。

Redis的列表List可以包含的最大元素数量为4294967295,从元素插入和删除的效率来看,如果是在链表的两头插入或删除元素将是非常高效的操作。即使链表中已经存储了数百万条记录,该操作也能在常量时间内完成。然后需要说明的是,如果元素插入或删除操作是作用于链表中间,那将是非常低效的。

Redis中对列表List的操作命令中,L表示从左侧头部开始插入和弹出,R表示从右侧尾部开始插入和弹出。

Redis提供了两种方式来做消息队列,一种是生产消费模式,另一种是发布订阅模式。

生产消费模式

生产消费模式会让一个或多个客户端监听消息队列,一旦消息到达,消费者马上消费,谁先抢到算谁的。如果队列中没有消息,消费者会继续监听。

PUSH/POP

Redis数据结构的列表List提供了push和pup命令,遵循着先入先出FIFO的原则。使用push/pop方式的优点在于消息可以持久化,缺点是一条消息只能被一个消费者接收,消费者完全靠手速来获取,是一种比较简陋的消息队列。

Redis的队列list是有序的且可以重复的,作为消息队列使用时可使用rpush/lpush操作入队,使用lpop/rpop操作出队。当发布消息是执行lpush命令,将消息从列表左侧加入队列。消息接收方执行rpop命令从列表右侧弹出消息。

如果队列空了怎么办呢?

如果队列空了,消费者会陷入pop死循环,即使没有数据也不会停止。空轮询不但消耗消费者的CPU资源还会影响Redis的性能。傻瓜式的做法是让消费者的线程按照一定的时间间隔不停的循环和监控队列,虽然可行但显然会造成不必要的资源浪费,而且循环周期也很难确定。

对于消费者而言存在一个问题,需要不停的调用rpop查看列表中是否有待处理的消息。每调用一次都会发起一次连接,势必造成不必要的资源浪费。如果使用休眠的方式让消费者线程间隔一段时间再消费,但这样做也有两个问题:

如果生产者速度大于消费者消费的速度,消息队列长度会一直增大,时间久了会占用大量内存空间。
如果休眠时间过长,就无法处理一些时效性的消息。如果休眠时间过短也会在连接上造成比较大的开销。
当LPOP返回一个元素给客户端时会从List中将该元素移除,这意味着该元素只存在于客户端的上下文中。如果客户端在处理这个返回元素的过程中崩溃了,这个元素就会永远的丢失掉。

pulsar消息队列

待补充

offset维护

维护offset的原因

由于消费者在消费消费过程中可能会出现断电宕机等故障,consumer恢复后需要从故障前的位置继续消费,所以consumer需要实时记录自己消费到哪个offset,以便故障恢复后继续消费。

维护offset的方式

Kafaka 0.9版本之前,consumer默认将offset保存在zookeeper中,从0.9版本之后,consumer默认将offset保存在Kafka一个内置的topic中,该topic为“_consumer_offsets”

offset常识

消费者提交消费位移时提交得是当前消费到最新消息得的offset + 1而不是offset

offset更新的方式,不区分是哪种API,大概分为两类:

自动提交,设置enable.auto.commit=true,更新的频率根据参数确定。这种方式也称为[at most once],fetch一旦接收到消息就可以更新offset,无论是否消费成功。

手动提交,设置enable.auto.commit=false,这种方式称为[at lwast once].fetch到消息后,等消息完成再调用方法,手动更新offset;如果offset消费失败,则offset也不会更新,此条消息会被重复消费一次。

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发