Kafka learning (7): producer and consumer code

First, open the Kafka API documentation page, which has detailed examples for everything (the link below is the 0.10.0 javadoc; pick the page matching your version):

http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer

For a producer to send a message, three steps are involved: first acquire a connection, then send the message, and finally close the connection.

Below we build an example in which a producer sends a single message from Java code.

Create a Spring Boot project and add the Kafka client dependency to its jar dependencies:

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>2.2.0</version>
		</dependency>

Note that the version of the client jar should match the version of the deployed Kafka cluster; otherwise you can easily run into problems that are hard to troubleshoot.

Here is a simple send example:

package com.example.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Test {
	public static void main(String[] args) {
		Properties props = new Properties();
		// Connect to the Kafka cluster
		props.setProperty("bootstrap.servers", "master:9092,slave01:9092,slave02:9092");
		KafkaProducer<String, String> producer = new KafkaProducer<>(props);
		// Send one record
		producer.send(new ProducerRecord<String, String>("mytopic2", "hello"));
		// Close the producer
		producer.close();
	}
}

Running it throws an error:

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value.
	at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474)
	at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
	at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:396)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
	at com.example.producer.Test.main(Test.java:13)

The cause is that data passed between systems is generally serialized, and the producer requires explicit key and value serializers. Add the following configuration:

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
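
The same settings can also be written with the constants from ProducerConfig and the serializer class itself, which avoids typos in the string keys (this assumes imports of org.apache.kafka.clients.producer.ProducerConfig and org.apache.kafka.common.serialization.StringSerializer):

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());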

Run it again, and it still fails:

00:42:42.927 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [master:9092, slave01:9092, slave02:9092]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

00:42:43.048 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bufferpool-wait-time
00:42:43.056 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name buffer-exhausted-records
00:42:45.617 [main] WARN org.apache.kafka.clients.ClientUtils - Couldn't resolve server master:9092 from bootstrap.servers as DNS resolution failed for master
00:42:48.170 [main] WARN org.apache.kafka.clients.ClientUtils - Couldn't resolve server slave01:9092 from bootstrap.servers as DNS resolution failed for slave01
00:42:50.721 [main] WARN org.apache.kafka.clients.ClientUtils - Couldn't resolve server slave02:9092 from bootstrap.servers as DNS resolution failed for slave02
00:42:50.723 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 0 ms.
00:42:50.726 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Kafka producer has been closed
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:430)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
	at com.example.producer.Test.main(Test.java:16)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
	at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:90)
	at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:49)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:408)
	... 2 more

This failure is a connection problem: the warnings show that the bootstrap hostnames master, slave01 and slave02 could not be resolved, so no broker was reachable; on top of that, the topic mytopic2 had not been created yet. I pointed bootstrap.servers at reachable addresses (127.0.0.1:9092, 127.0.0.1:9093, 127.0.0.1:9094) and switched to the existing topic mytopic.
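
If you would rather keep using mytopic2, create the topic first. With Kafka 2.2 the topic script can talk to a broker directly (the partition and replication counts below are only illustrative):

kafka-topics.bat --bootstrap-server localhost:9092 --create --topic mytopic2 --partitions 2 --replication-factor 2

After these changes the send succeeds, as shown below.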

16:03:11.739 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [127.0.0.1:9092, 127.0.0.1:9093, 127.0.0.1:9094]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

16:03:11.956 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bufferpool-wait-time
16:03:11.972 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name buffer-exhausted-records
16:03:11.989 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name errors
16:03:12.028 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name produce-throttle-time
16:03:12.542 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-closed:
16:03:12.543 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-created:
16:03:12.543 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name successful-authentication:
16:03:12.544 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name successful-reauthentication:
16:03:12.545 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name successful-authentication-no-reauth:
16:03:12.545 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name failed-authentication:
16:03:12.546 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name failed-reauthentication:
16:03:12.546 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name reauthentication-latency:
16:03:12.547 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:
16:03:12.548 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:
16:03:12.549 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-received:
16:03:12.550 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name select-time:
16:03:12.551 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name io-time:
16:03:12.562 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name batch-size
16:03:12.563 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name compression-rate
16:03:12.564 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name queue-time
16:03:12.564 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name request-time
16:03:12.565 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-per-request
16:03:12.566 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name record-retries
16:03:12.568 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name record-size
16:03:12.570 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name batch-split-rate
16:03:12.573 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Starting Kafka producer I/O thread.
16:03:12.575 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initialize connection to node 127.0.0.1:9094 (id: -3 rack: null) for sending metadata request
16:03:12.577 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node 127.0.0.1:9094 (id: -3 rack: null) using address /127.0.0.1
16:03:12.578 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.2.0
16:03:12.578 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 05fcfde8f69b0349
16:03:12.586 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Kafka producer started
16:03:12.606 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--3.bytes-sent
16:03:12.607 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--3.bytes-received
16:03:12.609 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--3.latency
16:03:12.610 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -3
16:03:12.828 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node -3. Fetching API versions.
16:03:12.828 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node -3.
16:03:12.833 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initialize connection to node 127.0.0.1:9093 (id: -2 rack: null) for sending metadata request
16:03:12.834 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node 127.0.0.1:9093 (id: -2 rack: null) using address /127.0.0.1
16:03:12.838 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--2.bytes-sent
16:03:12.840 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--2.bytes-received
16:03:12.842 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--2.latency
16:03:12.843 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -2
16:03:12.843 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node -2. Fetching API versions.
16:03:12.843 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node -2.
16:03:12.843 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initialize connection to node 127.0.0.1:9092 (id: -1 rack: null) for sending metadata request
16:03:12.844 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node 127.0.0.1:9092 (id: -1 rack: null) using address /127.0.0.1
16:03:12.844 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
16:03:12.846 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received
16:03:12.847 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
16:03:12.847 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
16:03:12.855 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Recorded API versions for node -3: (Produce(0): 0 to 7 [usable: 7], Fetch(1): 0 to 10 [usable: 10], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 7 [usable: 7], LeaderAndIsr(4): 0 to 2 [usable: 2], StopReplica(5): 0 to 1 [usable: 1], UpdateMetadata(6): 0 to 5 [usable: 5], ControlledShutdown(7): 0 to 2 [usable: 2], OffsetCommit(8): 0 to 6 [usable: 6], OffsetFetch(9): 0 to 5 [usable: 5], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 4 [usable: 4], Heartbeat(12): 0 to 2 [usable: 2], LeaveGroup(13): 0 to 2 [usable: 2], SyncGroup(14): 0 to 2 [usable: 2], DescribeGroups(15): 0 to 2 [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 [usable: 3], DeleteTopics(20): 0 to 3 [usable: 3], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 to 2 [usable: 2], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 2], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 1], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1], ElectPreferredLeaders(43): 0 [usable: 0])
16:03:12.855 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions.
16:03:12.855 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node -1.
16:03:12.856 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending metadata request (type=MetadataRequest, topics=mytopic) to node 127.0.0.1:9094 (id: -3 rack: null)
16:03:12.858 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Recorded API versions for node -2: (Produce(0): 0 to 7 [usable: 7], Fetch(1): 0 to 10 [usable: 10], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 7 [usable: 7], LeaderAndIsr(4): 0 to 2 [usable: 2], StopReplica(5): 0 to 1 [usable: 1], UpdateMetadata(6): 0 to 5 [usable: 5], ControlledShutdown(7): 0 to 2 [usable: 2], OffsetCommit(8): 0 to 6 [usable: 6], OffsetFetch(9): 0 to 5 [usable: 5], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 4 [usable: 4], Heartbeat(12): 0 to 2 [usable: 2], LeaveGroup(13): 0 to 2 [usable: 2], SyncGroup(14): 0 to 2 [usable: 2], DescribeGroups(15): 0 to 2 [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 [usable: 3], DeleteTopics(20): 0 to 3 [usable: 3], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 to 2 [usable: 2], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 2], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 1], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1], ElectPreferredLeaders(43): 0 [usable: 0])
16:03:12.866 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Recorded API versions for node -1: (Produce(0): 0 to 7 [usable: 7], Fetch(1): 0 to 10 [usable: 10], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 7 [usable: 7], LeaderAndIsr(4): 0 to 2 [usable: 2], StopReplica(5): 0 to 1 [usable: 1], UpdateMetadata(6): 0 to 5 [usable: 5], ControlledShutdown(7): 0 to 2 [usable: 2], OffsetCommit(8): 0 to 6 [usable: 6], OffsetFetch(9): 0 to 5 [usable: 5], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 4 [usable: 4], Heartbeat(12): 0 to 2 [usable: 2], LeaveGroup(13): 0 to 2 [usable: 2], SyncGroup(14): 0 to 2 [usable: 2], DescribeGroups(15): 0 to 2 [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 [usable: 3], DeleteTopics(20): 0 to 3 [usable: 3], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 to 2 [usable: 2], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 2], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 1], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1], ElectPreferredLeaders(43): 0 [usable: 0])
16:03:12.897 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updating last seen epoch from null to 0 for partition mytopic-0
16:03:12.897 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updating last seen epoch from null to 0 for partition mytopic-1
16:03:12.901 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: pqU8TrnFQvumP_0Nu9V0ag
16:03:12.901 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to MetadataCache{cluster=Cluster(id = pqU8TrnFQvumP_0Nu9V0ag, nodes = [127.0.0.1:9094 (id: 3 rack: null), 127.0.0.1:9092 (id: 1 rack: null), 127.0.0.1:9093 (id: 2 rack: null)], partitions = [Partition(topic = mytopic, partition = 0, leader = 3, replicas = [3,1], isr = [3,1], offlineReplicas = []), Partition(topic = mytopic, partition = 1, leader = 1, replicas = [1,3], isr = [1,3], offlineReplicas = [])], controller = 127.0.0.1:9092 (id: 1 rack: null))}
16:03:12.925 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node 127.0.0.1:9094 (id: 3 rack: null) using address /127.0.0.1
16:03:12.926 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-3.bytes-sent
16:03:12.926 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
16:03:12.927 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-3.bytes-received
16:03:12.927 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-3.latency
16:03:12.927 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 3
16:03:12.928 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node 3. Fetching API versions.
16:03:12.928 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node 3.
16:03:12.928 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Beginning shutdown of Kafka producer I/O thread, sending remaining records.
16:03:12.930 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Recorded API versions for node 3: (Produce(0): 0 to 7 [usable: 7], Fetch(1): 0 to 10 [usable: 10], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 7 [usable: 7], LeaderAndIsr(4): 0 to 2 [usable: 2], StopReplica(5): 0 to 1 [usable: 1], UpdateMetadata(6): 0 to 5 [usable: 5], ControlledShutdown(7): 0 to 2 [usable: 2], OffsetCommit(8): 0 to 6 [usable: 6], OffsetFetch(9): 0 to 5 [usable: 5], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 4 [usable: 4], Heartbeat(12): 0 to 2 [usable: 2], LeaveGroup(13): 0 to 2 [usable: 2], SyncGroup(14): 0 to 2 [usable: 2], DescribeGroups(15): 0 to 2 [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 [usable: 3], DeleteTopics(20): 0 to 3 [usable: 3], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 to 2 [usable: 2], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 2], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 1], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1], ElectPreferredLeaders(43): 0 [usable: 0])
16:03:12.935 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.mytopic.records-per-batch
16:03:12.936 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.mytopic.bytes
16:03:12.936 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.mytopic.compression-rate
16:03:12.936 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.mytopic.record-retries
16:03:12.937 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.mytopic.record-errors
16:03:13.005 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-closed:
16:03:13.005 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-created:
16:03:13.006 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name successful-authentication:
16:03:13.006 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name successful-reauthentication:
16:03:13.007 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name successful-authentication-no-reauth:
16:03:13.008 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name failed-authentication:
16:03:13.009 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name failed-reauthentication:
16:03:13.009 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name reauthentication-latency:
16:03:13.009 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent-received:
16:03:13.010 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent:
16:03:13.010 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-received:
16:03:13.012 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name select-time:
16:03:13.013 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name io-time:
16:03:13.014 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--3.bytes-sent
16:03:13.014 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--3.bytes-received
16:03:13.014 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--3.latency
16:03:13.015 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--2.bytes-sent
16:03:13.015 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--2.bytes-received
16:03:13.015 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--2.latency
16:03:13.016 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-sent
16:03:13.016 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-received
16:03:13.016 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.latency
16:03:13.017 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-3.bytes-sent
16:03:13.017 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-3.bytes-received
16:03:13.017 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-3.latency
16:03:13.018 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Shutdown of Kafka producer I/O thread has completed.
16:03:13.018 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Kafka producer has been closed

Then start a console consumer with the following command, and you can see the newly produced message:

kafka-console-consumer.bat --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic mytopic
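
By default the console consumer only shows messages produced after it starts; to also replay messages already in the topic, add the --from-beginning flag:

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic mytopic --from-beginning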

Now modify the code above to send 100 messages in a for loop. You will see that the order in which the consumer receives the messages does not match the loop order.

	public static void main(String[] args) {
		Properties props = new Properties();
		// Connect to the Kafka cluster
		props.setProperty("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

		KafkaProducer<String, String> producer = new KafkaProducer<>(props);
		// Send 100 records
		for (int i = 0; i < 100; i++) {
			producer.send(new ProducerRecord<String, String>("mytopic", "hello" + i));
		}
		// Close the producer
		producer.close();
	}
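
Incidentally, send() returns a Future<RecordMetadata>, and blocking on it shows exactly which partition and offset each record was written to. A minimal sketch (main would need to declare throws Exception, and RecordMetadata is imported from org.apache.kafka.clients.producer):

		for (int i = 0; i < 100; i++) {
			// get() blocks until the broker acknowledges the record
			RecordMetadata meta = producer.send(new ProducerRecord<String, String>("mytopic", "hello" + i)).get();
			System.out.println("hello" + i + " -> partition " + meta.partition() + ", offset " + meta.offset());
		}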

The reason is that the messages are stored in different partitions, and the consumer pulls from all of those partitions. Kafka guarantees ordering only within a single partition; since the consumer is not pinned to one partition, it reads across all of them and the interleaved output appears out of order. If the topic had only one partition, consumption would follow production order.

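If related messages must stay in order without limiting the topic to one partition, a common approach is to send them with the same key: under the default partitioner, records with equal keys hash to the same partition and therefore keep their relative order. A minimal sketch (the key "user-42" is just an illustrative value):

		// All records share one key, so the default partitioner sends them
		// to the same partition and they are consumed in production order.
		producer.send(new ProducerRecord<String, String>("mytopic", "user-42", "hello" + i));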

Below is the consumer code:

package com.example.producer;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerTest {
	public static void main(String[] args) {
		Properties props = new Properties();
		// Connect to the Kafka cluster: specify the broker addresses
		props.setProperty("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
		// Key deserializer
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		// Value deserializer
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

		// Every consumer must belong to a consumer group, so group.id is required
		props.put("group.id", "test");
		// Construct the consumer instance
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

		// Subscribe to one or more topics, option 1:
		// List<String> topics = new ArrayList<String>();
		// topics.add("mytopic");

		// Subscribe to one or more topics, option 2:
		List<String> topics = Arrays.asList("mytopic");
		consumer.subscribe(topics);
		try {
			while (true) {
				// poll blocks for up to 100 ms and may return 0 to N records per call
				ConsumerRecords<String, String> records = consumer.poll(100);
				// Iterate over the returned records
				for (ConsumerRecord<String, String> record : records) {
					System.out.println("Consumed: " + record.value());
				}
			}
		} finally {
			// Close the consumer
			consumer.close();
		}
	}
}
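
Note that the poll(long) overload used above is deprecated in kafka-clients 2.2; the replacement takes a java.time.Duration (import java.time.Duration):

				ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));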

Run the consumer code and you can see the messages being consumed.

In practice, the data we consume from Kafka usually has to be written to a database. That write can fail, but by then Kafka already considers the message consumed (with auto-commit enabled, the offset is committed in the background), so the message is effectively lost.
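
A common mitigation, sketched below, is to disable auto-commit and commit offsets only after processing succeeds, so a failed batch is redelivered instead of lost. writeToDb is a hypothetical stand-in for your persistence code:

		// Before creating the consumer: do not let it commit offsets on its own
		props.put("enable.auto.commit", "false");

		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
			for (ConsumerRecord<String, String> record : records) {
				writeToDb(record.value()); // hypothetical DB write; throws on failure
			}
			// Commit only after the whole batch was persisted; on failure the
			// uncommitted records are consumed again (at-least-once delivery).
			consumer.commitSync();
		}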

Reposted from: https://my.oschina.net/xiaoyoung/blog/3037315