Kafka Partition Counts and Multi-threaded Consumption
A typical use of the high-level consumer API looks like this:
Properties props = new Properties();
props.put("zookeeper.connect", "xxxx:2181");
props.put("zookeeper.connectiontimeout.ms", "1000000");
props.put("group.id", "test_group");
props.put("zookeeper.session.timeout.ms", "40000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
// key: topic name, value: number of streams (consumer threads) for that topic
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("test", Integer.valueOf(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumerConnector.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get("test").get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
    try {
        String msg = new String(it.next().message(), "utf-8").trim();
        System.out.println("receive:" + msg);
    } catch (UnsupportedEncodingException e) {
        e.printStackTrace();
    }
}
This is the textbook way to consume data from Kafka, but it is plainly single-threaded and cannot be used as-is in production.
Before going multi-threaded, it helps to keep Kafka's basics in mind: a topic is divided into partitions, and consumption is organized around consumer groups and streams. A multi-threaded consumer can be structured as follows:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafakConsumer implements Runnable {

    private ConsumerConfig consumerConfig;
    private static String topic = "blog";
    private Properties props;
    private final int a_numThreads = 6;

    public KafakConsumer() {
        props = new Properties();
        props.put("zookeeper.connect", "xxx:2181,yyy:2181,zzz:2181");
        props.put("group.id", "blog");
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        consumerConfig = new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(a_numThreads));
        ConsumerConnector consumer =
                kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        ExecutorService executor = Executors.newFixedThreadPool(a_numThreads);
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new KafkaConsumerThread(stream));
        }
    }

    public static void main(String[] args) {
        System.out.println(topic);
        Thread t = new Thread(new KafakConsumer());
        t.start();
    }
}
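One thing the class above never does is shut its pool down. The submit/shutdown lifecycle of the `ExecutorService` can be sketched without any Kafka dependency, using plain `Runnable` workers in place of the `KafkaStream` consumers (the class and method names here are illustrative, not from the article's code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the thread-pool lifecycle used by the consumer:
// submit one worker per stream, then shut down and wait for completion.
public class PoolLifecycleSketch {

    static boolean runAndShutdown(int nThreads, int nStreams) {
        ExecutorService executor = Executors.newFixedThreadPool(nThreads);
        for (int s = 0; s < nStreams; s++) {
            final int id = s;
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()
                            + " handling stream " + id);
                }
            });
        }
        // Stop accepting new tasks, then wait for in-flight workers to finish.
        executor.shutdown();
        try {
            return executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated cleanly: " + runAndShutdown(6, 6));
    }
}
```

In a real consumer the workers loop forever on their stream iterators, so `shutdown()` alone is not enough; the `ConsumerConnector` must also be shut down so the iterators unblock. This sketch only shows the pool side of that handshake.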
The per-stream worker, KafkaConsumerThread, is:
public class KafkaConsumerThread implements Runnable {

    private final KafkaStream<byte[], byte[]> stream;

    public KafkaConsumerThread(KafkaStream<byte[], byte[]> stream) {
        this.stream = stream;
    }

    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> mam = it.next();
            System.out.println(Thread.currentThread().getName() + ": partition[" + mam.partition() + "],"
                    + "offset[" + mam.offset() + "], " + new String(mam.message()));
        }
    }
}
The producer side looks like this:
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer implements Runnable {

    private Producer<String, String> producer = null;
    private ProducerConfig config = null;

    public KafkaProducer() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "*****:2181,****:2181,****:2181");
        // Serializer class; the default is kafka.serializer.DefaultEncoder (raw byte[]).
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // "sync" or "async"; the default is sync. Async raises send throughput
        // but can lose messages that have not yet been flushed.
        props.put("producer.type", "sync");
        // Compression codec: 0 = none (default), 1 = gzip, 2 = snappy. The codec is
        // recorded in the message itself, so decompression on the consumer side is
        // transparent and needs no configuration.
        props.put("compression.codec", "1");
        // Broker list used to fetch metadata; it need not name every broker.
        props.put("broker.list", "****:6667,***:6667,****:6667");
        config = new ProducerConfig(props);
    }

    @Override
    public void run() {
        producer = new Producer<String, String>(config);
        for (int i = 1; i <= 6; i++) {           // six distinct keys, aimed at six partitions
            List<KeyedMessage<String, String>> messageList = new ArrayList<KeyedMessage<String, String>>();
            for (int j = 0; j < 6; j++) {        // six messages per key
                // KeyedMessage(String topic, K key, V message)
                messageList.add(new KeyedMessage<String, String>(
                        "blog", "partition[" + i + "]", "message[The " + i + " message]"));
            }
            producer.send(messageList);
        }
    }

    public static void main(String[] args) {
        Thread t = new Thread(new KafkaProducer());
        t.start();
    }
}
The producer above also improves on the traditional sender in two ways. First, it sends in batches (from the decompiled source):
public void send(List<KeyedMessage<K, V>> messages) {
    underlying().send(JavaConversions$.MODULE$.asScalaBuffer(messages).toSeq());
}
rather than one message at a time:
public void send(KeyedMessage<K, V> message) {
    underlying().send(Predef$.MODULE$.wrapRefArray((Object[]) new KeyedMessage[] { message }));
}
Second, note which KeyedMessage constructor is used:
public KeyedMessage(String topic, K key, V message) {
    this(topic, key, key, message);
}

public KeyedMessage(String topic, V message) {
    this(topic, null, null, message);
}
Run the producer and then the consumer; the consumer prints:
pool-2-thread-5: partition[5],offset[0], message[The 5 message]
pool-2-thread-1: partition[2],offset[0], message[The 2 message]
pool-2-thread-2: partition[1],offset[0], message[The 1 message]
pool-2-thread-5: partition[4],offset[0], message[The 4 message]
pool-2-thread-1: partition[3],offset[0], message[The 3 message]
pool-2-thread-4: partition[6],offset[0], message[The 6 message]
All six partitions' data was consumed, but look at which threads did the work: pool-2-thread-1 consumed both partition[2] and partition[3]; pool-2-thread-2 consumed partition[1]; pool-2-thread-4 consumed partition[6]; and pool-2-thread-5 consumed partition[4] and partition[5].
This confirms the relationship between consumer threads and partitions: a partition can be consumed by only one thread, but one thread may consume several partitions. Although the pool was sized at six threads, not all of them ended up consuming, which comes down to the pool's scheduling; there is no strict one-thread-per-partition mapping.
It is also worth examining part of the consumer's startup log, which sheds light on how Kafka assigns partitions at startup. The log shows:
1)Consumer happy_Connor-PC-1445916157267-b9cce79d rebalancing the following partitions: ArrayBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) for topic blog with consumers: List(happy_Connor-PC-1445916157267-b9cce79d-0, happy_Connor-PC-1445916157267-b9cce79d-1, happy_Connor-PC-1445916157267-b9cce79d-2, happy_Connor-PC-1445916157267-b9cce79d-3, happy_Connor-PC-1445916157267-b9cce79d-4, happy_Connor-PC-1445916157267-b9cce79d-5)
Here happy_Connor-PC-1445916157267-b9cce79d identifies the consumer instance, and the topic has ten partitions available ("the following partitions: ArrayBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) for topic blog"). Six consumer streams were registered: happy_Connor-PC-1445916157267-b9cce79d-0 through happy_Connor-PC-1445916157267-b9cce79d-5. The number of streams is set by the second argument of topicCountMap.put(String topic, Integer count); which threads actually do the consuming is then decided by the thread pool's scheduling.
2) Each stream claims ownership of one or more partitions via ZooKeeper;
3) Which partitions actually hold data is decided by the producer. Even with ten partitions configured, the consumer streams claim partitions through ZooKeeper, but during actual consumption only partitions that contain data yield messages. (In this example data was pushed to only six partitions, so even though ten are declared, messages arrive from just six.)
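The rebalance shown in the log is, in effect, a range-style assignment: the sorted partition list is split across the consumer streams in order. A minimal simulation of that splitting (the names are mine, not Kafka's) shows why, with 10 partitions and 6 streams, some streams own two partitions and others only one:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative range-style assignment: nPart partitions are split across
// nStreams streams in order; the first (nPart % nStreams) streams get one extra.
public class RangeAssignSketch {

    static List<List<Integer>> assign(int nPart, int nStreams) {
        List<List<Integer>> owned = new ArrayList<List<Integer>>();
        int per = nPart / nStreams;
        int extra = nPart % nStreams;
        int next = 0;
        for (int s = 0; s < nStreams; s++) {
            List<Integer> mine = new ArrayList<Integer>();
            int take = per + (s < extra ? 1 : 0);
            for (int k = 0; k < take; k++) {
                mine.add(next++);
            }
            owned.add(mine);
        }
        return owned;
    }

    public static void main(String[] args) {
        List<List<Integer>> owned = assign(10, 6);
        for (int s = 0; s < owned.size(); s++) {
            System.out.println("stream-" + s + " -> partitions " + owned.get(s));
        }
    }
}
```

With assign(10, 6), the first four streams each own two partitions and the last two own one, matching the observation above: a partition belongs to exactly one stream, while a stream may own several.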
If the second argument to topicCountMap is changed to 1 and the producer is changed to send to seven partitions, the consumer prints:
pool-2-thread-1: partition[6],offset[0], message[The 6 message]
pool-2-thread-1: partition[3],offset[0], message[The 3 message]
pool-2-thread-1: partition[2],offset[0], message[The 2 message]
pool-2-thread-1: partition[5],offset[0], message[The 5 message]
pool-2-thread-1: partition[4],offset[0], message[The 4 message]
pool-2-thread-1: partition[7],offset[0], message[The 7 message]
pool-2-thread-1: partition[1],offset[0], message[The 1 message]
Finally, a variant worth noting: if messages are sent without a key, via the two-argument KeyedMessage constructor, the data lands in a single partition of the topic:

// send keyless data to the topic; it goes to a single partition
List<KeyedMessage<String, String>> messageList = new ArrayList<KeyedMessage<String, String>>();
for (int i = 1; i <= 10; i++) {
    messageList.add(new KeyedMessage<String, String>("blog6", "This is message " + i));
}
producer.send(messageList);

The constructor used here is:

public KeyedMessage(String topic, V message) {
    this(topic, null, null, message);
}

And on the consumer side, the stream count can even be set higher than the thread-pool size:

topicCountMap.put(topic, new Integer(7));
// ......................
ExecutorService executor = Executors.newFixedThreadPool(5);