Spark Streaming with Kafka Programming

2018-05-08 15:55:06
Runtime Environment
  1. JDK environment

    [root@cdh1 kafka]# java -version
    java version "1.8.0_112"
    Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
    Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
  2. Maven dependencies

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
WordCount Example
package cn.spark.streaming;


import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

import scala.Tuple2;

public class KafkaDirectWordCount {

    public static void main(String[] args) throws InterruptedException {

        SparkConf conf = new SparkConf()
                .setAppName("KafkaDirectWordCount")
                .setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        String brokers = "192.168.10.140:9092,192.168.10.141:9092,192.168.10.142:9092";

        // Consumer configuration for the Kafka 0.10 direct stream
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "KafkaDirectWordCount");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Disable auto commit; offsets are committed manually after each batch (see foreachRDD below)
        kafkaParams.put("enable.auto.commit", false);
        kafkaParams.put("auto.offset.reset", "latest"); // earliest / latest / none

        Collection<String> topics = new HashSet<String>();
        topics.add("topicA");

        // Optionally start from explicit offsets instead of auto.offset.reset
        // (see the sketch after this listing):
        // Map<TopicPartition, Long> offsets = new HashMap<>();
        // offsets.put(new TopicPartition("topicA", 0), 2L);

        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
        );

        // Split each record value into words
        JavaDStream<String> words = messages.flatMap(new FlatMapFunction<ConsumerRecord<String, String>, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<String> call(ConsumerRecord<String, String> line) throws Exception {
                return Arrays.asList(line.value().split(" ")).iterator();
            }
        });

        // Map each word to a (word, 1) pair
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Sum the counts for each word within the batch
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        wordCounts.print();

        // Log the offset range processed by each partition, then commit the offsets back to Kafka
        messages.foreachRDD(rdd -> {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            rdd.foreachPartition(consumerRecords -> {
                OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
                System.out.println(
                        o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
            });

            ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
        });

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }

}
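
The commented-out offsets map in main() points at another way to start the stream: instead of relying on auto.offset.reset, you can hand the direct stream an explicit starting offset per partition. Below is a minimal sketch of that variant, using the Subscribe overload that also accepts a Map<TopicPartition, Long> (it additionally needs import org.apache.kafka.common.TopicPartition). The partition number and offset value are placeholders; in practice they would come from wherever a previous run persisted its offsets.

// Sketch only: resume from explicitly stored offsets instead of auto.offset.reset.
// Requires: import org.apache.kafka.common.TopicPartition;
// The offset value below is a placeholder for whatever was persisted earlier.
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
fromOffsets.put(new TopicPartition("topicA", 0), 2L);

JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        // Same Subscribe strategy as above, plus the per-partition starting offsets
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams, fromOffsets)
);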

Start a console producer before running the job:

cd /usr/local/kafka
bin/kafka-console-producer.sh --broker-list 192.168.10.140:9092,192.168.10.141:9092,192.168.10.142:9092 --topic topicA
> hello word hello me
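
If everything is wired up, each 5-second batch that receives the line above prints the per-batch counts via wordCounts.print(), followed by the offset range logged in foreachRDD, roughly like this (the batch time and offset numbers depend on your run and on how many partitions topicA has):

-------------------------------------------
Time: <batch time> ms
-------------------------------------------
(hello,2)
(word,1)
(me,1)

topicA 0 <fromOffset> <untilOffset>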


