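Atguigu (尚硅谷) Big Data Technology: Kafka, Chapter 4: Kafka API in Practice
4.1 Environment Setup
1) Create a Java project in Eclipse.
2) Create a lib folder in the project root directory.
3) Unpack the Kafka installation package, copy the jar files from its libs directory into the project's lib directory, and add them to the build path.
4) Start the ZooKeeper and Kafka clusters, then open a console consumer on the Kafka cluster: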
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first
4.2 Kafka Producer Java API
4.2.1 Creating a Producer (Deprecated API)
package com.atguigu.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class OldProducer {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Broker list used to fetch cluster metadata
        properties.put("metadata.broker.list", "hadoop102:9092");
        // Wait for the partition leader to acknowledge each message
        properties.put("request.required.acks", "1");
        // Encode messages as strings
        properties.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<Integer, String> producer =
                new Producer<Integer, String>(new ProducerConfig(properties));

        // No key is given, so only the topic and the value are set
        KeyedMessage<Integer, String> message =
                new KeyedMessage<Integer, String>("first", "hello world");
        producer.send(message);

        // Release the producer's network resources
        producer.close();
    }
}
4.2.2 Creating a Producer (New API)
package com.atguigu.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Host name and port of the Kafka broker
        props.put("bootstrap.servers", "hadoop103:9092");
        // Wait for acknowledgement from all in-sync replicas
        props.put("acks", "all");
        // Maximum number of retries for a failed send
        props.put("retries", 0);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // Time in ms to wait for more records before sending a batch
        props.put("linger.ms", 1);
        // Total memory in bytes for buffering unsent records
        props.put("buffer.memory", 33554432);
        // Key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 50; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i));
        }

        // close() flushes any buffered records before returning
        producer.close();
    }
}
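The producer settings above can be collected in one place so that several producer classes can share them. The following is a minimal sketch under stated assumptions: `ProducerConfigSketch` and `producerProps` are hypothetical names that do not appear in the original, and all values are passed as strings, which Kafka's configuration parsing also accepts. It uses only the JDK, so it runs without the Kafka jars.

```java
import java.util.Properties;

/** Hypothetical helper (not part of Kafka): builds the settings used by NewProducer. */
final class ProducerConfigSketch {

    /** Returns the producer settings from this section for the given broker address. */
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("acks", "all");               // wait for all in-sync replicas
        props.setProperty("retries", "0");              // do not retry failed sends
        props.setProperty("batch.size", "16384");       // bytes per batch
        props.setProperty("linger.ms", "1");            // wait up to 1 ms to fill a batch
        props.setProperty("buffer.memory", "33554432"); // bytes for unsent records
        String serializer = "org.apache.kafka.common.serialization.StringSerializer";
        props.setProperty("key.serializer", serializer);
        props.setProperty("value.serializer", serializer);
        return props;
    }

    public static void main(String[] args) {
        // Print every setting so the configuration can be checked by eye
        producerProps("hadoop103:9092").forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The returned `Properties` object could then be passed to `new KafkaProducer<>(...)` in place of the inline `props.put` calls.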
4.2.3 Creating a Producer with a Callback (New API)
package com.atguigu.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CallBackProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Host name and port of the Kafka broker
        props.put("bootstrap.servers", "hadoop103:9092");
        // Wait for acknowledgement from all in-sync replicas
        props.put("acks", "all");
        // Maximum number of retries for a failed send
        props.put("retries", 0);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // Time in ms to wait for more records before sending a batch
        props.put("linger.ms", 1);
        // Total memory in bytes for buffering unsent records
        props.put("buffer.memory", 33554432);
        // Key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        for (int i = 0; i < 50; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() {

                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        // The send failed; report the cause
                        exception.printStackTrace();
                    } else if (metadata != null) {
                        // Print the partition and offset the record was written to
                        System.err.println(metadata.partition() + "---" + metadata.offset());
                    }
                }
            });
        }

        kafkaProducer.close();
    }
}
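The callback contract used above, where a non-null exception signals failure and otherwise the result carries the write position, can be tried out without a running cluster. This is only a sketch: `SendCallback` and `fakeSend` are hypothetical stand-ins written for illustration, not Kafka classes, and the "send" is simulated in memory.

```java
import java.util.ArrayList;
import java.util.List;

final class CallbackSketch {

    /** Hypothetical stand-in for Kafka's Callback: exception is null on success. */
    interface SendCallback {
        void onCompletion(Long offset, Exception exception);
    }

    /** Simulated send: succeeds with the given offset, or fails when the offset is negative. */
    static void fakeSend(long offset, SendCallback callback) {
        if (offset >= 0) {
            callback.onCompletion(offset, null);
        } else {
            callback.onCompletion(null, new IllegalStateException("send failed"));
        }
    }

    /** Runs three simulated sends and returns what the callback recorded for each. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        SendCallback callback = (offset, exception) -> {
            if (exception != null) {
                log.add("error: " + exception.getMessage()); // check the failure case first
            } else {
                log.add("ok: offset " + offset);
            }
        };
        fakeSend(0, callback);
        fakeSend(-1, callback);
        fakeSend(1, callback);
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Checking the exception before touching the result keeps the callback safe when a send fails.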
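4.2.4 Producer with a Custom Partitioner
0) Requirement: store all data in partition 0 of the topic.
1) Define a class that implements the Partitioner interface and override its method (deprecated API)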
package com.atguigu.kafka;

import kafka.producer.Partitioner;

public class CustomPartitioner implements Partitioner {

    public CustomPartitioner() {
        super();
    }

    @Override
    public int partition(Object key, int numPartitions) {
        // Choose the partition: always partition 0
        return 0;
    }
}
2) Custom partitioner (new API)
package com.atguigu.kafka;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class CustomPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Choose the partition: always partition 0
        return 0;
    }

    @Override
    public void close() {
        // Nothing to release
    }
}
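To see what the fixed rule above replaces, it helps to compare it with a hash-based choice, which spreads keys over all partitions. The sketch below is illustrative only: `PartitionChoiceSketch` is a hypothetical class, and `Arrays.hashCode` is a stand-in for the murmur2 hash that Kafka's default partitioner applies to the key bytes, so the partition numbers it prints will not match a real cluster.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class PartitionChoiceSketch {

    /** Same rule as CustomPartitioner: every record goes to partition 0. */
    static int fixedPartition() {
        return 0;
    }

    /**
     * Hash-based choice: equal keys always map to the same partition.
     * Arrays.hashCode is an assumption made for this sketch, not Kafka's hash.
     */
    static int hashedPartition(byte[] keyBytes, int numPartitions) {
        int hash = Arrays.hashCode(keyBytes);
        // Clear the sign bit so the remainder is never negative
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3; // the topic "first" has 3 partitions in this chapter
        for (int i = 0; i < 5; i++) {
            byte[] key = Integer.toString(i).getBytes(StandardCharsets.UTF_8);
            System.out.println("key " + i
                    + " fixed=" + fixedPartition()
                    + " hashed=" + hashedPartition(key, numPartitions));
        }
    }
}
```

Routing everything to one partition preserves a single total order but gives up the parallelism that multiple partitions provide.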
3) Use the partitioner in the producer code
package com.atguigu.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionerProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Host name and port of the Kafka broker
        props.put("bootstrap.servers", "hadoop103:9092");
        // Wait for acknowledgement from all in-sync replicas
        props.put("acks", "all");
        // Maximum number of retries for a failed send
        props.put("retries", 0);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // Time in ms to wait for more records before sending a batch
        props.put("linger.ms", 1);
        // Total memory in bytes for buffering unsent records
        props.put("buffer.memory", 33554432);
        // Key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Custom partitioner, given by its fully qualified class name
        props.put("partitioner.class", "com.atguigu.kafka.CustomPartitioner");

        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, String>("first", "1", "atguigu"));

        producer.close();
    }
}
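Because `partitioner.class` is given as a string, the class has to be created by name at run time, which is why the fully qualified name matters and why the class needs a public no-argument constructor. The sketch below shows that general mechanism with plain Java reflection. It is an assumption about the approach for illustration, not Kafka's actual loading code, and `ClassNameLoadingSketch` is a hypothetical name.

```java
final class ClassNameLoadingSketch {

    /**
     * Creates an instance of the named class through its no-argument constructor.
     * Fails with IllegalArgumentException if the class is missing or cannot be constructed.
     */
    static Object instantiate(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot create " + className, e);
        }
    }

    public static void main(String[] args) {
        // A JDK class is used here so the sketch runs without the Kafka jars
        Object list = instantiate("java.util.ArrayList");
        System.out.println(list.getClass().getName());
    }
}
```

A typo in the class name or package therefore shows up only when the producer is constructed, not at compile time.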
4) Test
(1) On hadoop102, watch the log files of the three partitions of topic first under /opt/module/kafka/logs/ for changes:
[atguigu@hadoop102 first-0]$ tail -f 00000000000000000000.log
[atguigu@hadoop102 first-1]$ tail -f 00000000000000000000.log
[atguigu@hadoop102 first-2]$ tail -f 00000000000000000000.log
(2) All the data is written to the specified partition (partition 0).
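The paths watched in this test follow Kafka's on-disk naming: each partition has a directory named after the topic and the partition number, and each log segment file is named after the offset of its first record, zero-padded to 20 digits. A minimal sketch of that naming rule follows; `SegmentNameSketch` and its method names are hypothetical helpers introduced for illustration.

```java
final class SegmentNameSketch {

    /** Directory name of a partition, e.g. topic "first", partition 0. */
    static String partitionDir(String topic, int partition) {
        return topic + "-" + partition;
    }

    /** Segment file name: the base offset padded with zeros to 20 digits, plus ".log". */
    static String segmentFileName(long baseOffset) {
        return String.format("%020d.log", baseOffset);
    }

    public static void main(String[] args) {
        // Path of the first segment of each partition of topic "first"
        for (int partition = 0; partition < 3; partition++) {
            System.out.println(partitionDir("first", partition) + "/" + segmentFileName(0));
        }
    }
}
```

With the custom partitioner in place, only the file under first-0 should grow while the producer runs.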
4.3 Kafka Consumer Java API
0) Start a console producer
[atguigu@hadoop104 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
>hello world
1) Create a consumer (deprecated API)
package com.atguigu.kafka.consume;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class CustomConsumer {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "hadoop102:2181");
        properties.put("group.id", "g1");
        properties.put("zookeeper.session.timeout.ms", "500");
        properties.put("zookeeper.sync.time.ms", "250");
        properties.put("auto.commit.interval.ms", "1000");

        // Create the consumer connector
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

        // Request one stream for the topic "first"
        HashMap<String, Integer> topicCount = new HashMap<>();
        topicCount.put("first", 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCount);
        KafkaStream<byte[], byte[]> stream = consumerMap.get("first").get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        // hasNext() blocks until the next message arrives
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}
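The consumer above turns each message into text with `new String(bytes)`, which uses the platform's default charset. A minimal sketch of decoding with an explicit charset follows, assuming the producer wrote the messages as UTF-8 (the original does not state the encoding); `MessageDecodeSketch` is a hypothetical helper.

```java
import java.nio.charset.StandardCharsets;

final class MessageDecodeSketch {

    /** Decodes a message payload as UTF-8, independent of the platform default charset. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = "hello world".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(payload));
    }
}
```

Naming the charset avoids garbled non-ASCII text when the consumer runs on a machine whose default encoding differs from the producer's.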
2) Official example (offsets maintained automatically) (new API)
package com.atguigu.kafka.consume;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CustomNewConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker address; not every broker needs to be listed
        props.put("bootstrap.servers", "hadoop102:9092");
        // Consumer group id
        props.put("group.id", "test");
        // Whether to commit offsets automatically
        props.put("enable.auto.commit", "true");
        // Interval between automatic offset commits, in ms
        props.put("auto.commit.interval.ms", "1000");
        // Key deserializer class
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Value deserializer class
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Topics to subscribe to; several can be subscribed at once
        consumer.subscribe(Arrays.asList("first", "second", "third"));

        while (true) {
            // Fetch records, waiting at most 100 ms
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}
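With `enable.auto.commit` set to true, offsets are committed as a side effect of calling `poll`, once `auto.commit.interval.ms` has passed since the last commit. The sketch below is a simplified in-memory model of that timing, not Kafka's implementation: `AutoCommitSketch` is a hypothetical class, time is passed in by the caller, and a single partition is assumed.

```java
final class AutoCommitSketch {

    private final long intervalMs;
    private long lastCommitMs;
    private long position = 0;    // offset of the next record to hand out
    private long committed = -1;  // -1 means nothing has been committed yet

    AutoCommitSketch(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.lastCommitMs = startMs;
    }

    /**
     * Models one poll call: if the commit interval has elapsed, commit the
     * position reached by earlier polls, then hand out recordsReturned more records.
     */
    void poll(long nowMs, int recordsReturned) {
        if (nowMs - lastCommitMs >= intervalMs) {
            committed = position;
            lastCommitMs = nowMs;
        }
        position += recordsReturned;
    }

    long committed() {
        return committed;
    }

    long position() {
        return position;
    }

    public static void main(String[] args) {
        AutoCommitSketch consumer = new AutoCommitSketch(1000, 0);
        consumer.poll(0, 5);     // interval not elapsed: nothing committed
        consumer.poll(500, 5);   // still within the interval
        consumer.poll(1000, 5);  // interval elapsed: commits the 10 records from earlier polls
        System.out.println("committed=" + consumer.committed() + " position=" + consumer.position());
    }
}
```

In this model the records handed out after the last commit are not yet covered by it, so a consumer that stops at that point would read them again after a restart.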
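This tutorial is produced by the Atguigu (尚硅谷) Education Big Data Research Institute. Please credit the source when reposting. Follow the Atguigu official account (atguigu) to learn more.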