
1. 大资料领域资料型别
1.1 有界资料
一般批处理(一个档案 或者一批档案),不管档案多大,都是可以度量
mapreduce hive sparkcore sparksql
1.2 无界资料
源源不断的流水一样 (流资料)
Storm SparkStreaming
2. 讯息伫列(Message Queue)
讯息 Message网络中的两台计算机或者两个通讯装置之间传递的资料,例如说:文字、音乐、视讯等内容伫列 Queue一种特殊的线性表(资料元素首尾相接),特殊之处在于只允许在首部移除元素和在尾部追加元素。入队、出队。讯息伫列 MQ讯息+伫列储存讯息的伫列讯息的传输过程中的容器主要提供生产、消费界面供外部呼叫做资料的储存和获取3. 讯息伫列的分类
3.1 点对点(P2P)
一个生产者生产的讯息只能被一个消费者消费3.2 释出订阅(Pub/Sub)
讯息伫列(Queue)、主题(Topic)、释出者(Publisher)、订阅者(Subscriber)
讯息的释出者讯息的订阅者每个讯息可以有多个消费者,彼此互不影响。比如我释出一个微博:关注我的人都能够看到。
4. Kafka的简介
在大资料领域呢,为了满足日益增长的资料量,也有一款可以满足百万级别讯息的生成和消费,分散式、持久稳定的产品——KafkaKafka是分散式的释出—订阅讯息系统(基于PS的一个讯息伫列)它最初由LinkedIn(领英)公司释出,使用Scala语言编写Kafka是一个高吞吐量的、永续性的、分散式释出订阅讯息系统它主要用于处理活跃的资料(登入、浏览、点选、分享、喜欢等使用者行为产生的资料5. Kafka的特点
高吞吐量可以满足每秒百万级 别讯息的生产和消费(生产消费 )永续性有一套完善的讯息储存机制,确保资料的高效安全的持久化 (资料的储存)分散式基于分散式的扩充套件和容错机制;Kafka的资料都会复制到几台服务器上。当某一台故障失效时,生产者和消费者转而使用其它的机器——整体健壮性6. Kafka的元件
一个讯息伫列需要哪些部分?生产消费讯息类别储存等等Topic(主题)Kafka处理的讯息的不同分类Broker (讯息代理)Kafka丛集中的一个kafka服务节点称为一个broker,主要储存讯息资料,存在硬盘中。每个topic都是有分割槽的Partition (物理上的分割槽)一个topic在broker中被分为1个或者多个partition,分割槽在建立topic的时候指定Message (讯息)讯息,是通讯的基本单位,每个讯息都属于一个partition7. Kafka的服务
Producer : 讯息和资料的生产者,向Kafka的一个topic释出讯息Consumer :讯息和资料的消费者,定于topic并处理其释出的讯息Zookeeper :协调kafka的正常执行
8. Kafka的安装
8.1 单机版的安装
准备kafkakafka_2.10-0.10.0.1.tgz解压kafkatar -zxvf kafka_2.10-0.10.0.1.tgz -C /opt/重新命名mv kafka_2.10-0.10.0.1.tgz kafka配置环境变数export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
编辑server.propertiesbroker.id=1
log.dirs=/opt/kafka/logs
zookeeper.connect=uplooking03:2181,uplooking04:2181,uplooking05:2181
listeners=PLAINTEXT://:9092
启动kafka-server服务kafka-server-start.sh [-daemon] server.properties
停止kafka服务kafka-server-stop.sh
8.2 丛集的安装
只需要在每个机器上修改对应的 ==broker.id=1== 即可
9. Kafka中Topic的操作
建立topickafka-topics.sh --create --topic t1 --partitions 3 --replication-factor 1 --zookeeper uplooking03:2181,uplooking04:2181
==注意: 建立topic过程的问题,replication-factor个数不能超过brokerserver的个数==检视topickafka-topics.sh --list --zookeeper uplooking03
检视具体topic的详情kafka-topics.sh --describe --topic t1 --zookeeper uplooking04:2181
PartitionCount:topic对应的partition的个数
ReplicationFactor:topic对应的副本因子,说白就是副本个数
Partition:partition编号,从0开始递增
Leader:当前partition起作用的breaker.id
Replicas: 当前副本资料存在的breaker.id,是一个列表,排在最前面的其作用
Isr:当前kakfa丛集中可用的breaker.id列表
修改topic(不能修改replication-factor,以及只能对partition个数进行增加,不能减少 )kafka-topics.sh --alter --topic t1 --partitions 4 --zookeeper uplooking03
删除Topickafka-topics.sh --delete --topic t1 --zookeeper uplooking03
ps:这种删除只是标记删除,要想彻底删除必须设定一个属性,在server.properties中配置delete.topic.enable=true,否则只是标记删除配置完成之后,需要重启kafka服务10. Kafka中的生产者和消费者界面
自己写程式码实现kafka提供的讯息生产和消费的界面kafka自身也实现了自身的生产和消费的界面,给出了两个工具(kafka-console-producer.sh , kafka-console-consumer.sh)11. Kafka自带的生产和消费讯息的工具
11.1 kafka-console-producer.sh(生产工具)
kafka-console-producer.sh --topic t1 --broker-list uplooking03:9092,uploo
king04:9092,uplooking05:9092
11.2 kafka-console-consumer.sh(消费工具)
kafka-console-consumer.sh --zookeeper uplooking03 --topic t1
--from-beginning:从头开始消费
--blacklist:黑名单过滤(kafka-console-consumer.sh --zookeeper uplooking03 --blacklist t1,t3)
--whitelist:白名单过滤(kafka-console-consumer.sh --zookeeper uplooking03 --whitelist t2)
ps:--topic|--blacklist|--whitelist 只能出现其中一个
12. ==Flume与Kafka的整合==
配置flume的agent配置档案touch flume-kafka.properties# 对各个元件的描述说明
# 其中a1为agent的名字
# r1是a1的source的代号名字
# c1是a1的channel的代号名字
# k1是a1的sink的代号名字
############################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 用于描述source的,型别是netcat网络
a1.sources.r1.type = netcat
# source监听的网络ip地址和埠号
a1.sources.r1.bind = uplooking01
a1.sources.r1.port = 44444
# 用于描述sink,型别是kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = hadoop
a1.sinks.k1.brokerList = uplooking03:9092,uplooking04:9092,uplooking05:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 2
# 用于描述channel,在内存中做资料的临时的储存
a1.channels.c1.type = memory
# 该内存中最大的储存容量,1000个events事件
a1.channels.c1.capacity = 1000
# 能够同时对100个events事件监管事务
a1.channels.c1.transactionCapacity = 100
# 将a1中的各个元件建立关联关系,将source和sink都指向了同一个channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动flume开始采集资料[[email protected]:/opt/flume/conf]
flume-ng agent --name a1 --conf-file flume-kafka.properties
开启Kafka讯息消费工具[[email protected]:/opt/flume/conf]
kafka-console-consumer.sh --zookeeper uplooking03 --topic hadoop
给flume监听的Source传送资料[[email protected]:/]
nc uplooking01 44444
现在就可以到kafka的消费工具(kafka-console-consumer.sh)中区检视nc传送的资料13. Kafka的API操作(生产者和消费者)
org.apache.kafka
kafka_2.10
0.10.0.1
13.1 Kafka的生产者
建立生产者的配置档案 producer.propertiesbootstrap.servers=uplooking03:9092,uplooking04:9092,uplooking05:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
建立生产者并且传送资料到topic中public class MyKafkaProducer {
public static void main(String[] args) throws IOException {
Properties prop = new Properties();
prop.load(MyKafkaProducer.class.getClassLoader().getResourceAsStream("producer.properties"));
KafkaProducer kafkaProducer = new KafkaProducer(prop);
kafkaProducer.send(new ProducerRecord("hadoop", "name", "admin123"));
kafkaProducer.close();
}
}
13.2 Kafka的消费者
建立消费者的配置档案consumer.propertieszookeeper.connect=uplooking03:2181,uplooking04:2181,uplooking05:2181
group.id=test-consumer-group
bootstrap.servers=uplooking03:9092,uplooking04:9092,uplooking05:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
建立讯息消费者消费topic中的资料public static void main(String[] args) throws Exception {
Properties prop = new Properties();
prop.load(MyKafkaConsumer.class.getClassLoader().getResourceAsStream("consumer.properties"));
KafkaConsumer kafkaConsumer = new KafkaConsumer(prop);
Collection topics = new ArrayList();
topics.add("hadoop");
kafkaConsumer.subscribe(topics);
while (true) {
ConsumerRecords records = kafkaConsumer.poll(1000);
for (ConsumerRecord record : records) {
System.out.println(record.value());
}
}
}
自定义分割槽(MyCustomPartition)package com.uplooking.bigdata.kafka.partition;
public class MyCustomPartition implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//获取分割槽数, 分割槽编号一般都是从0开始
int partitionSize = cluster.partitionCountForTopic(topic);
int keyHash = Math.abs(key.hashCode());
int valueHash = Math.abs(value.hashCode());
return keyHash % partitionSize;
}
public void close() {
}
public void configure(Map configs) {
}
}
配置自定义分割槽(producer.properties)partitioner.class=com.uplooking.bigdata.kafka.partition.MyCustomPartition
end:如果你觉得本文对你有帮助的话,记得点赞转发,你的支援就是我更新动力。





























