!
也想出现在这里? 联系我们
广告区块

【Springboot】微服务学习笔记十:微服务项目整合Kafka实现文章上下架功能

【Springboot】微服务学习笔记十:微服务项目整合Kafka实现文章上下架功能

个人简介:

> ?个人主页:赵四司机
> ?学习方向:JAVA后端开发
> ?种一棵树最好的时间是十年前,其次是现在!
> ⏰往期文章:SpringBoot项目整合微信支付
> ?喜欢的话麻烦点点关注喔,你们的支持是我的最大动力。

前言:

1.前面基于Springboot的单体项目介绍已经完结了,至于项目中的其他功能实现我这里就不打算介绍了,因为涉及的知识点不难,而且都是简单的CRUD操作,假如有兴趣的话可以私信我我再看看要不要写几篇文章做个介绍。

2.完成上一阶段的学习,我就投入到了微服务的学习当中,所用教程为B站上面黑马的微服务教程。由于我的记性不是很好,所以对于新事物的学习我比较喜欢做笔记以加强理解,在这里我会将笔记的重点内容做个总结发布到“微服务学习”笔记栏目中。我是赵四,一名有追求的程序员,希望大家能多多支持,能给我点个关注就更好了。

目录

一:Kafka消息发送快速入门

1.传递字符串消息

(1)发送消息

(2)监听消息

(3)测试结果

2.传递对象消息

(1)修改生产者代码

(2)结果测试

二:功能引入

1.需求分析

2.逻辑分析

三:前期准备

1.引入依赖

2.定义常量

3.Kafka配置信息

四:代码实现

1.自媒体端

2.移动端


一:Kafka消息发送快速入门

1.传递字符串消息

(1)发送消息

创建一个Controller包并编写一个测试类用于发送消息

  1. package com.my.kafka.controller;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.kafka.core.KafkaTemplate;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. @RestController
  7. public class HelloController {
  8. @Autowired
  9. private KafkaTemplate kafkaTemplate;
  10. @GetMapping("hello")
  11. public String helloProducer(){
  12. kafkaTemplate.send("my-topic","Hello~");
  13. return "ok";
  14. }
  15. }

(2)监听消息

编写测试类用于接收消息:

  1. package com.my.kafka.listener;
  2. import org.junit.platform.commons.util.StringUtils;
  3. import org.springframework.kafka.annotation.KafkaListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class HelloListener {
  7. @KafkaListener(topics = "my-topic")
  8. public void helloListener(String message) {
  9. if(StringUtils.isNotBlank(message)) {
  10. System.out.println(message);
  11. }
  12. }
  13. }

(3)测试结果

打开浏览器输入localhost:9991/hello,然后到控制台查看消息,可以看到成功消息监听到并且进行了消费。

【Springboot】微服务学习笔记十:微服务项目整合Kafka实现文章上下架功能

2.传递对象消息

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式:

方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,这里不做介绍。

方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式。

(1)修改生产者代码

  1. @GetMapping("hello")
  2. public String helloProducer(){
  3. User user = new User();
  4. user.setName("赵四");
  5. user.setAge(20);
  6. kafkaTemplate.send("my-topic", JSON.toJSONString(user));
  7. return "ok";
  8. }

(2)结果测试

【Springboot】微服务学习笔记十:微服务项目整合Kafka实现文章上下架功能

可以看到成功接收都对象参数,后期要使用该对象只需要将其转换成User对象即可。

二:功能引入

1.需求分析

发布文章之后,可能会由于文章出现某些错误或者其他原因,我们会在文章管理端实现文章的上下架功能(见下图),也即当管理端实现对文章下架之后移动端将不会再展示该文章,只有该文章重新被上架之后才能在移动端看到该文章信息。

【Springboot】微服务学习笔记十:微服务项目整合Kafka实现文章上下架功能

2.逻辑分析

【Springboot】微服务学习笔记十:微服务项目整合Kafka实现文章上下架功能

后端接收到前端传过来的参数之后要先做一个校验,参数不为空才能继续往下执行,首先应该根据前端传过来的文章id(自媒体端文章id)查询自媒体数据库的文章信息并判断该文章是否已是发布状态,因为只有审核成功并成功发布了的文章才能进行上下架操作。自媒体端微服务对文章上下架状态进行修改之后便可以向Kafka发送一条消息,该消息为Map对象,里面存储的数据为移动端的文章id以及前端传过来的上下架参数enable,当然要将该Map对象转换成JSON字符串才能进行发送。

文章微服务监听到Kafka发送过来的消息之后将JSON字符串转换成Map对象之后再获取相关参数对移动端文章的上下架状态进行修改。

三:前期准备

1.引入依赖

  1. dependency>
  2. groupId>org.springframework.kafkagroupId>
  3. artifactId>spring-kafkaartifactId>
  4. dependency>
  5. dependency>
  6. groupId>org.apache.kafkagroupId>
  7. artifactId>kafka-clientsartifactId>
  8. dependency>

2.定义常量

  1. package com.my.common.constans;
  2. public class WmNewsMessageConstants {
  3. public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
  4. }

3.Kafka配置信息

由于我是用Nacos来作为注册中心,所以配置信息放置在Nacos上面即可。

(1)自媒体端配置

  1. spring:
  2. kafka:
  3. bootstrap-servers: 4.234.52.122:9092
  4. producer:
  5. retries: 10
  6. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  7. value-serializer: org.apache.kafka.common.serialization.StringSerializer

(2)移动端配置

  1. spring:
  2. kafka:
  3. bootstrap-servers: 4.234.52.122:9092
  4. consumer:
  5. group-id: ${spring.application.name}-test
  6. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  7. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

四:代码实现

1.自媒体端

  1. @Autowired
  2. private KafkaTemplate kafkaTemplate;
  3. /**
  4. * 文章下架或上架
  5. * @param id
  6. * @param enable
  7. * @return
  8. */
  9. @Override
  10. public ResponseResult downOrUp(Integer id,Integer enable) {
  11. log.info("执行文章上下架操作...");
  12. if(id == null || enable == null) {
  13. return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
  14. }
  15. //根据id获取文章
  16. WmNews news = getById(id);
  17. if(news == null) {
  18. return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章信息不存在");
  19. }
  20. //获取当前文章状态
  21. Short status = news.getStatus();
  22. if(!status.equals(WmNews.Status.PUBLISHED.getCode())) {
  23. return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"文章非发布状态,不能上下架");
  24. }
  25. //更改文章状态
  26. news.setEnable(enable.shortValue());
  27. updateById(news);
  28. log.info("更改文章上架状态{}-->{}",status,news.getEnable());
  29. //发送消息到Kafka
  30. Map map = new HashMap();
  31. map.put("articleId",news.getArticleId());
  32. map.put("enable",enable.shortValue());
  33. kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
  34. log.info("发送消息到Kafka...");
  35. return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
  36. }

2.移动端

(1)设置监听器

  1. package com.my.article.listener;
  2. import com.baomidou.mybatisplus.core.toolkit.StringUtils;
  3. import com.my.article.service.ApArticleService;
  4. import com.my.common.constans.WmNewsMessageConstants;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Component;
  8. import org.springframework.kafka.annotation.KafkaListener;
  9. @Slf4j
  10. @Component
  11. public class EnableListener {
  12. @Autowired
  13. private ApArticleService apArticleService;
  14. @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
  15. public void downOrUp(String message) {
  16. if(StringUtils.isNotBlank(message)) {
  17. log.info("监听到消息{}",message);
  18. apArticleService.downOrUp(message);
  19. }
  20. }
  21. }

(2)获取消息并修改文章状态

  1. /**
  2. * 文章上下架
  3. * @param message
  4. * @return
  5. */
  6. @Override
  7. public ResponseResult downOrUp(String message) {
  8. Map map = JSON.parseObject(message, Map.class);
  9. //获取文章id
  10. Long articleId = (Long) map.get("articleId");
  11. //获取文章待修改状态
  12. Integer enable = (Integer) map.get("enable");
  13. //查询文章配置
  14. ApArticleConfig apArticleConfig = apArticleConfigMapper.selectOne
  15. (Wrappers.lambdaQuery().eq(ApArticleConfig::getArticleId, articleId));
  16. if(apArticleConfig != null) {
  17. //上架
  18. if(enable == 1) {
  19. log.info("文章重新上架");
  20. apArticleConfig.setIsDown(false);
  21. apArticleConfigMapper.updateById(apArticleConfig);
  22. }
  23. //下架
  24. if(enable == 0) {
  25. log.info("文章下架");
  26. apArticleConfig.setIsDown(true);
  27. apArticleConfigMapper.updateById(apArticleConfig);
  28. }
  29. }
  30. else {
  31. throw new RuntimeException("文章信息不存在");
  32. }
  33. return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
  34. }
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_45750572/article/details/126018426
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。
0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
有新私信 私信列表
搜索