Producer 端: Kafka 的发送端发送消息,如果是默认参数什么都不设置,则消息如果在网络没有抖动的时候,可以一批批的按消息发送的顺序被发送到 Kafka 服务器端。但是,一旦网络波动了,则消息就可能出现失序。所以,要严格保证 Kafka 发消息有序,首先要考虑同步发送消息。
Broker 端: 设置一个分区
Consumer 端: 一个consumer
如上图所示,有2个Broker,4个Partition,每个Partition都是leader-follower结构,follower只负责同步leader的数据,leader故障时选取一个ISR(in-sync replica)作为leader。
消费组机制 一个partition只能分配给一个消费组的一个consumer,也就是说一个消费组中,不可能2个consumer去消费一个partition。所以需要注意consumer数量,不要多于partition数量。
producer ACK机制 Kafka的ack机制,指的是producer的消息发送确认机制,这直接影响到Kafka集群的吞吐量和消息可靠性。而吞吐量和可靠性就像硬币的两面,两者不可兼得,只能平衡。ack有3个可选值,分别是1,0,-1(all)。
min.insync.replicas
broker端有min.insync.replicas
配置,比如副本数为3,min.insync.replicas
配置2,表示至少2个ISR(包括leader)确认收到消息,leader才回复producer消息提交成功。min.insync.replicas
只有在producer的ack配置为-1(即all)才会生效。
min.insync.replicas
默认值是1,可以在创建topic指定(跟着topic走),也可以在配置文件中指定(所有topic使用)。
这个配置的目的是保持提升可用性,如果一个副本挂了,仍然可以工作。
每个partition的leader或者follower都有LEO(日志末端位移),HW(高水位):
—-已提交数据—- | (HW)—–未提交数据—— | (LEO) |
leader的HW决定了消费者可以消费到的数据,跟不上leader速度(消息数量落后leader太多)的ISR或者超过一定时间未从leader拉取消息将会被剔除,数据同步跟上之后会再次成为ISR。
发送数据时,当所有ISR确认收到数据后,leader才commit。
如果一个消费组的消费者数量超过了partition数量,则会有空闲的消费者,即在一个消费组里,一个partition只能被一个消费者消费。
如下图所示,每个partition都是一个有序的不可变的记录序列,每个消息都会保留在磁盘一定的时间,在保留时间内,消费者都会消费到,超过保留时间,消息将会从磁盘删除。
Rebalance is the re-assignment of partition ownership among consumers within a given consumer group. Remember that every consumer in a consumer group is assigned one or more topic partitions exclusively.
A Rebalance happens when:
为了提高磁盘访问速度,现代操作系统在磁盘上构建一层page cache作为缓存,在读取磁盘时,一般会进行磁盘预读,即读取随后几个页面大小的磁盘到page cache。写磁盘时会写page cache,page cache会异步刷新到磁盘上。
kafka使用Memory-mapped file(内存映射文件)提高磁盘访问速度,即将page cache映射到进程地址空间,避免了数据从内核拷贝到用户空间,应用可以直接像访问内存一样访问文件。
发送文件一般涉及到如下几步:
使用Linux的sendfile api可以降低系统调用次数和内存拷贝次数,即零拷贝:
Properties props = new Properties();
props.put("bootstrap.servers", "xxxx:port");
props.put("group.id", "cgi-xxx");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "xxx_biz";
try {
// 获取topic的partition信息
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
List<TopicPartition> topicPartitions = new ArrayList<>();
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date now = new Date();
long nowTime = now.getTime();
System.out.println("当前时间: " + df.format(now));
long fetchDataTime = nowTime - 1000 * 60 * 300; // 计算30分钟之前的时间戳
for(PartitionInfo partitionInfo : partitionInfos) {
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
timestampsToSearch.put(new TopicPartition(partitionInfo.topic(),
partitionInfo.partition()), fetchDataTime);
}
consumer.assign(topicPartitions);
// 获取每个partition一个小时之前的偏移量
Map<TopicPartition, OffsetAndTimestamp> map = consumer.offsetsForTimes(timestampsToSearch);
OffsetAndTimestamp offsetTimestamp = null;
System.out.println("开始设置各分区初始偏移量...");
for(Map.Entry<TopicPartition, OffsetAndTimestamp> entry : map.entrySet()) {
// 如果设置的查询偏移量的时间点大于最大的索引记录时间,那么value就为空
offsetTimestamp = entry.getValue();
if(offsetTimestamp != null) {
int partition = entry.getKey().partition();
long timestamp = offsetTimestamp.timestamp();
long offset = offsetTimestamp.offset();
System.out.println("partition = " + partition +
", time = " + df.format(new Date(timestamp))+
", offset = " + offset);
// 设置读取消息的偏移量
consumer.seek(entry.getKey(), offset);
}
}
System.out.println("设置各分区初始偏移量结束...");
while(true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("partition = " + record.partition() +
", offset = " + record.offset()+", msg="+record);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
Connect用在多数据源之间进行数据转换,比如将Oracle的数据转到Mysql,用户自定义Source(拉取)任务(connector)和Sink(转存)任务,将任务提交到connect服务(connect是一个独立配置、启动的服务),connect调度Source和Sink任务,通过Kafka的消息队列功能,完成数据转换。具体可以参考kafka connect文档(https://zditect.com/code/kafka-connect-tutorial.html)。 connect提供Standalone和Distributed模式,对外提供Restful的接口,实现对connector的管理。