前言

由于我们的新项目使用的是spring-boot,而又要同步新项目中建的数据到老的系统当中.原来已经有一部分的同步代码,使用的是kafka. 其实只是做数据的同步,我觉得选MQ没必要使用kafka.首先数据量不大,其实搞kafka又要搞集群,ZK.只是用做一些简单数据同步的话,有点大材小用.
没办法,咱只是个打工的,领导让搞就搞吧.刚开始的时候发现有一个spring-integration-kafka,描述中说是基于spring-kafka做了一次重写.但是我看了官方文档.实在是搞的有点头大.功能一直没实现.文档写的也不是很漂亮,也可能是刚起步,有很多的问题.我这里只能放弃了,使用了spring-kafka.
实现方法
pom.xml文件如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.linuxsogood.sync</groupId>
<artifactId>linuxsogood-sync</artifactId>
<version>1.0.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.0.RELEASE</version>
</parent>
<properties>
<java.version>1.8</java.version>
<!-- 依赖版本 -->
<mybatis.version>3.3.1</mybatis.version>
<mybatis.spring.version>1.2.4</mybatis.spring.version>
<mapper.version>3.3.6</mapper.version>
<pagehelper.version>4.1.1</pagehelper.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-freemarker</artifactId>
</dependency>
<!--<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>2.0.1.RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>4.3.1.RELEASE</version>
<scope>compile</scope>
</dependency>-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.0.RELEASE</version>
</dependency>
<!--<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>1.1.0.RELEASE</version>
</dependency>-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.5.2</version>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>4.2.3.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>sqljdbc4</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.0.11</version>
</dependency>
<!--Mybatis-->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>${mybatis.version}</version>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
<version>${mybatis.spring.version}</version>
</dependency>
<!--<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.1.1</version>
</dependency>-->
<!-- Mybatis Generator -->
<dependency>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-core</artifactId>
<version>1.3.2</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<!--分页插件-->
<dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper</artifactId>
<version>${pagehelper.version}</version>
</dependency>
<!--通用Mapper-->
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper</artifactId>
<version>${mapper.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>repo.spring.io.milestone</id>
<name>Spring Framework Maven Milestone Repository</name>
<url>https://repo.spring.io/libs-milestone</url>
</repository>
</repositories>
<build>
<finalName>mybatis_generator</finalName>
<plugins>
<plugin>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-maven-plugin</artifactId>
<version>1.3.2</version>
<configuration>
<verbose>true</verbose>
<overwrite>true</overwrite>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>org.linuxsogood.sync.Starter</mainClass>
</configuration>
</plugin>
</plugins>
</build>
</project>
orm层使用了MyBatis,又使用了通用Mapper和分页插件.
kafka消费端配置
import org.linuxsogood.sync.listener.Listener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.broker.address}")
private String brokerAddress;
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "firehome-group");
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return propsMap;
}
@Bean
public Listener listener() {
return new Listener();
}
}
生产者的配置.
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${kafka.broker.address}")
private String brokerAddress;
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
}
}
监听,监听里面,写的就是业务逻辑了,从kafka里面得到数据后,具体怎么去处理. 如果需要开启kafka处理消息的广播模式,多个监听要监听不同的group,即方法上的注解@KafkaListener里的group一定要不一样.如果多个监听里的group写的一样,就会造成只有一个监听能处理其中的消息,另外监听就不能处理消息了.也即是kafka的分布式消息处理方式.
在同一个group里的监听,共同处理接收到的消息,会根据一定的算法来处理.如果不在一个组,但是监听的是同一个topic的话,就会形成广播模式
import com.alibaba.fastjson.JSON;
import org.linuxsogood.qilian.enums.CupMessageType;
import org.linuxsogood.qilian.kafka.MessageWrapper;
import org.linuxsogood.qilian.model.store.Store;
import org.linuxsogood.sync.mapper.StoreMapper;
import org.linuxsogood.sync.model.StoreExample;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.List;
import java.util.Optional;
public class Listener {
private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class);
@Autowired
private StoreMapper storeMapper;
/**
* 监听kafka消息,如果有消息则消费,同步数据到新烽火的库
* @param record 消息实体bean
*/
@KafkaListener(topics = "linuxsogood-topic", group = "sync-group")
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
try {
MessageWrapper messageWrapper = JSON.parseObject(message.toString(), MessageWrapper.class);
CupMessageType type = messageWrapper.getType();
//判断消息的数据类型,不同的数据入不同的表
if (CupMessageType.STORE == type) {
proceedStore(messageWrapper);
}
} catch (Exception e) {
LOGGER.error("将接收到的消息保存到数据库时异常, 消息:{}, 异常:{}",message.toString(),e);
}
}
}
/**
* 消息是店铺类型,店铺消息处理入库
* @param messageWrapper 从kafka中得到的消息
*/
private void proceedStore(MessageWrapper messageWrapper) {
Object data = messageWrapper.getData();
Store cupStore = JSON.parseObject(data.toString(), Store.class);
StoreExample storeExample = new StoreExample();
String storeName = StringUtils.isBlank(cupStore.getStoreOldName()) ? cupStore.getStoreName() : cupStore.getStoreOldName();
storeExample.createCriteria().andStoreNameEqualTo(storeName);
List<org.linuxsogood.sync.model.Store> stores = storeMapper.selectByExample(storeExample);
org.linuxsogood.sync.model.Store convertStore = new org.linuxsogood.sync.model.Store();
org.linuxsogood.sync.model.Store store = convertStore.convert(cupStore);
//如果查询不到记录则新增
if (stores.size() == 0) {
storeMapper.insert(store);
} else {
store.setStoreId(stores.get(0).getStoreId());
storeMapper.updateByPrimaryKey(store);
}
}
}
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对的支持。
# springboot整合kafka
# spring
# boot用kafka
# springboot集成kafka
# SpringBoot实现MQTT消息发送和接收方式
# Springboot详解RocketMQ实现消息发送与接收流程
# SpringBoot整合RocketMQ实现消息发送和接收的详细步骤
# SpringBoot webSocket实现发送广播、点对点消息和Android接收
# Spring Boot实现消息的发送和接收使用实战指南
# 的是
# 就会
# 多个
# 又要
# 分页
# 使用了
# 有一
# 是个
# 文档
# 大材小用
# 我觉得
# 有很多
# 没办法
# 就不能
# 只有一个
# 重写
# 刚开始
# 这篇文章
# 即是
# 很漂亮
相关文章:
移动端手机网站制作软件,掌上时代,移动端网站的谷歌SEO该如何做?
极客网站有哪些,DoNews、36氪、爱范儿、虎嗅、雷锋网、极客公园这些互联网媒体网站有什么差异?
潮流网站制作头像软件下载,适合母子的网名有哪些?
如何快速生成橙子建站落地页链接?
佛山网站制作系统,佛山企业变更地址网上办理步骤?
如何选择服务器才能高效搭建专属网站?
如何用PHP快速搭建CMS系统?
导航网站建站方案与优化指南:一站式高效搭建技巧解析
如何零成本快速生成个人自助网站?
内部网站制作流程,如何建立公司内部网站?
建站之星IIS配置教程:代码生成技巧与站点搭建指南
制作网站建设的公司有哪些,网站建设比较好的公司都有哪些?
深圳网站制作平台,深圳市做网站好的公司有哪些?
如何在建站之星网店版论坛获取技术支持?
如何续费美橙建站之星域名及服务?
javascript中对象的定义、使用以及对象和原型链操作小结
建站之星多图banner生成与模板自定义指南
建站主机服务器选型指南与性能优化方案解析
微信网站制作公司有哪些,民生银行办理公司开户怎么在微信网页上查询进度?
视频网站app制作软件,有什么好的视频聊天网站或者软件?
再谈Python中的字符串与字符编码(推荐)
宝塔建站助手安装配置与建站模板使用全流程解析
建站之星后台密码遗忘?如何快速找回?
建站VPS配置与SEO优化指南:关键词排名提升策略
免费网站制作appp,免费制作app哪个平台好?
如何通过宝塔面板实现本地网站访问?
宝塔建站后网页无法访问如何解决?
深圳企业网站制作设计,在深圳如何网上全流程注册公司?
济南网站建设制作公司,室内设计网站一般都有哪些功能?
如何使用Golang安装API文档生成工具_快速生成接口文档
C#如何序列化对象为XML XmlSerializer用法
专业网站制作服务公司,有哪些网站可以免费发布招聘信息?
如何在Windows服务器上快速搭建网站?
建站之星如何一键生成手机站?
如何在Golang中使用replace替换模块_指定本地或远程路径
建站之星代理费用多少?最新价格详情介绍
如何选择高性价比服务器搭建个人网站?
建站之星后台搭建步骤解析:模板选择与产品管理实操指南
c++怎么使用类型萃取type_traits_c++ 模板元编程类型判断【方法】
详解一款开源免费的.NET文档操作组件DocX(.NET组件介绍之一)
品牌网站制作公司有哪些,买正品品牌一般去哪个网站买?
网站制作企业,网站的banner和导航栏是指什么?
沈阳个人网站制作公司,哪个网站能考到沈阳事业编招聘的信息?
东莞专业网站制作公司有哪些,东莞招聘网站哪个好?
电商平台网站制作流程,电商网站如何制作?
专业商城网站制作公司有哪些,pi商城官网是哪个?
Android自定义listview布局实现上拉加载下拉刷新功能
网页设计与网站制作内容,怎样注册网站?
网站设计制作企业有哪些,抖音官网主页怎么设置?
怎么制作一个起泡网,水泡粪全漏粪育肥舍冬季氨气超过25ppm,可以有哪些措施降低舍内氨气水平?
*请认真填写需求信息,我们会在24小时内与您取得联系。