尚硅谷大數據技術之電信客服
3.2?數據采集/消費(存儲)
歡迎來到數據采集模塊(消費),在企業中你要清楚流式數據采集框架flume和kafka的定位是什么。我們在此需要將實時數據通過flume采集到kafka然后供給給hbase消費。
flume:cloudera公司研發
適合下游數據消費者不多的情況;
適合數據安全性要求不高的操作;
適合與Hadoop生態圈對接的操作。
kafka:linkedin公司研發
適合數據下游消費眾多的情況;
適合數據安全性要求較高的操作(支持replication);
因此我們常用的一種模型是:
線上數據 --> flume?--> kafka?--> flume(根據情景增刪該流程) -->?HDFS
消費存儲模塊流程如圖2所示:
圖2 消費存儲模塊流程圖
3.2.1?數據采集
思路:
- a) 配置kafka,啟動zookeeper和kafka集群;
- b) 創建kafka主題;
- c) 啟動kafka控制臺消費者(此消費者只用于測試使用);
- d)配置flume,監控日志文件;
- e) 啟動flume監控任務;
- f)運行日志生產腳本;
- g)觀察測試。
1)啟動zookeeper,kafka集群
$/opt/module/kafka/bin/kafka-server-start.sh?/opt/module/kafka/config/server.properties |
2)創建kafka主題
$ /opt/module/kafka/bin/kafka-topics.sh --zookeeper hadoop102:2181 --topic calllog --create --replication-factor 1 --partitions 3 |
檢查一下是否創建主題成功:
$ /opt/module/kafka/bin/kafka-topics.sh --zookeeper hadoop102:2181 --list |
3)啟動kafka控制臺消費者,等待flume信息的輸入
$ /opt/module/kafka/bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 -topic?calllog --from-beginning |
4)配置flume(flume-kafka.conf)
# define a1.sources = r1 a1.sinks = k1 a1.channels = c1
# source a1.sources.r1.type = exec a1.sources.r1.command = tail -F -c +0 /home/atguigu/call/calllog.csv a1.sources.r1.shell = /bin/bash -c
# sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092 a1.sinks.k1.kafka.topic = calllog a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1
# channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100
# bind a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 |
5)啟動flume
$ /opt/module/flume/bin/flume-ng agent --conf /opt/module/flume/conf/ --name a1 --conf-file?/home/atguigu/calllog/flume2kafka.conf |
6)運行生產日志的任務腳本,觀察kafka控制臺消費者是否成功顯示產生的數據
$ sh /home/atguigu/calllog/productlog.sh |