查看原文
其他

(eblog)8、消息异步通知、细节调整

吕一明 MarkerHub 2022-11-04

小Hub领读:

继续我们的eblog,今天来完成博客文章收藏,用户中心的设置!


项目名称:eblog

项目 Git 仓库:https://github.com/MarkerHub/eblog(给个 star 支持哈)

项目演示地址:https://markerhub.com:8082

前几篇项目讲解文章:

1、(eblog)Github 上最值得学习的 Springboot 开源博客项目!

2、eblog小 Hub 手把手教你如何从 0 搭建一个开源项目架构

3、eblog整合Redis,以及项目优雅的异常处理与返回结果封装

4、(eblog)用Redis的zset有序集合实现一个本周热议功能

5、(eblog)自定义Freemaker标签实现博客首页数据填充



1、细节调整

这一次作业,我们来修复一下bug,还有一些细节调整,因为博客的功能其实不多,业务逻辑也不复杂,后面我们还有搜索、群聊等功能,都是大模块。

文章收藏

文章收藏的js其实已经写好的了,只是有些条件没有触发而已,是什么条件呢,我们先来找到收藏的js先:

  • static/res/mods/jie.js

可以看到什么触发加载收藏的条件有两个:

  • 是否有id为LAY_jieAdmin的元素

  • layui.cache.user.uid是否为-1

LAY_jieAdmin是为了限定只有文章详情页才加载这个js,而其他页面不需要;那layui.cache.user.uid是哪里设置的呢?大家还记得我们一开始给html分模块的时候吗,我们在layout.ftl宏中有一段js,原本uid的值就是-1,所以我们需要把登录之后的值附上去。

  • templates/inc/layout.ftl

  1. <script>

  2. layui.cache.page = 'jie';

  3. layui.cache.user = {

  4. username: '${profile.username!"游客"}'

  5. ,uid: ${profile.id!'-1'}

  6. ,avatar: '${profile.avatar!"/res/images/avatar/00.jpg"}'

  7. ,experience: 0

  8. ,sex: '${profile.sex!'未知'}'

  9. };

  10. layui.config({

  11. version: "3.0.0"

  12. ,base: '/res/mods/'

  13. }).extend({

  14. fly: 'index'

  15. }).use('fly').use('jie').use('user');

  16. </script>

熟系freemarker语法的同学应该都懂${profile.id!'-1'}是啥意思了,!后面表示当值为空的默认值。 好,改好了之后刷新一下,你会发现有个弹窗提示"请求异常,请重试",我们暂时先不管,先把收藏功能搞定先,看看是不是收藏功能controller还没有导致的。

从上图可以看出,我已经把查看是否收藏功能的链接改了一下

  • /collection/find/

功能代码其实很简单,就从UserCollection表中查询是否有记录就行了,如果有表明已经收藏了,js会渲染出取消收藏的按钮,如果没有记录,就会渲染收藏的按钮。

  • com.example.controller.PostController

  1. @ResponseBody

  2. @PostMapping("/collection/find/")

  3. public Result collectionFind(Long cid) {

  4. int count = userCollectionService.count(new QueryWrapper<UserCollection>()

  5. .eq("post_id", cid)

  6. .eq("user_id", getProfileId()));


  7. return Result.succ(MapUtil.of("collection", count > 0));

  8. }

根据js,我直接返回的是data中放一个参数collection是否为true就行了。渲染效果如下: 

然后点击按钮,发现有两个链接(我改了一下链接前缀):

  • /collection/add/

  • /collection/remove/

分别代表这收藏和取消收藏,所以我们分别写这两个controller,注意都是ajax请求来的。收藏的逻辑也比较简单,首先判断一下是否已经收藏过了,已经收藏就返回提示已经收藏,未收藏就添加一天记录即可。

  • com.example.controller.PostController

  1. @ResponseBody

  2. @PostMapping("/collection/add/")

  3. public Result collectionAdd(Long cid) {

  4. Post post = postService.getById(cid);


  5. Assert.isTrue(post != null, "该帖子已被删除");


  6. int count = userCollectionService.count(new QueryWrapper<UserCollection>()

  7. .eq("post_id", cid)

  8. .eq("user_id", getProfileId()));


  9. if(count > 0) {

  10. return Result.fail("你已经收藏");

  11. }


  12. UserCollection collection = new UserCollection();

  13. collection.setUserId(getProfileId());

  14. collection.setCreated(new Date());

  15. collection.setModified(new Date());


  16. collection.setPostId(post.getId());

  17. collection.setPostUserId(post.getUserId());


  18. userCollectionService.save(collection);


  19. return Result.succ(MapUtil.of("collection", true));

  20. }

  • com.example.controller.PostController

取消收藏的逻辑:删除一条记录即可

  1. @ResponseBody

  2. @PostMapping("/collection/remove/")

  3. public Result collectionRemove(Long cid) {

  4. Post post = postService.getById(Long.valueOf(cid));


  5. Assert.isTrue(post != null, "该帖子已被删除");


  6. boolean hasRemove = userCollectionService.remove(new QueryWrapper<UserCollection>()

  7. .eq("post_id", cid)

  8. .eq("user_id", getProfileId()));


  9. return Result.succ(hasRemove);

  10. }

ok,收藏设计到的3个方法已经开发完毕,点击文章详情页的收藏和取消收藏,都能正常执行代码!无bug~

消息未读

到了这时候,你发现,刷新页面之后,还是有弹窗提示,这是啥问题?浏览器打开F12,切换到Network标签,因为我们猜想应该是一些异步请求出了异常,所以触发了弹窗提示。接下来我们就要找到这个请求,Network下,我们再点击XHR,因为这表示是发起的异步请求的链接。从这里我们就看到了一个nums/的请求是404,具体的请求其实是:http://localhost:8080/message/nums/,

然后我们再全局搜索/message/nums找到发起这个异步请求的js地方:

所以我们确定了这个弹窗应该就是这引起的了。所以我们去写一下这个方法。这是新消息通知,我们之前在用户中心弄过一个我的消息,但是好像没有状态(已读和未读),所以我需要在UserMessage上添加一个status字段标识已读和未读。记得数据库要添加字段。

  • com.example.entity.UserMessage

  1. /**

  2. * 状态:0未读,1已读

  3. */

  4. private Integer status;

然后我们再查下当前用户的状态未0的消息数量出来,就是新消息通知的数量了。

  • com.example.controller.IndexController

  1. @ResponseBody

  2. @PostMapping("/message/nums/")

  3. public Object messageNums() throws IOException {

  4. int count = userMessageService.count(new QueryWrapper<UserMessage>()

  5. .eq("to_user_id", getProfileId())

  6. .eq("status", 0)

  7. );

  8. return MapUtil.builder().put("status", 0).put("count", count).build();

  9. }

返回值是啥我是根据js推算出来的,js需要啥结果我就返回啥结果。重新运行代码之后,我们发现弹窗没有了,页面展示效果如下: 

消息通知

至此,新消息通知已经ok,接下来我们搞一个高大上一点的功能。我们刷微博简书头条等网站的时候,如果收到消息通知,一般来说不用我们刷新页面,而是实时给我们展示有消息来了,会突然有个新消息通知的图标提示我们,这是怎么做到的呢,结合我们之前学过的知识。我们可以找到几种方案来实现这个功能:

  • ajax定时加载刷新

  • websocket双工通讯

  • 长链接

我们课程中有节课是专门讲解websocket的,接下来我们就使用这个技术来实现这个功能。

同学们可以去回顾一下websocket的知识:

  • https://gitee.com/lv-success/git-third/tree/master/course-15-websocket/springboot-websocket

上面是一个springboot集成ws的demo,接下来我们安装这个例子的步骤把ws集成到我们现有的项目里面。

第一步:导入jar包

  • pom.xml

  1. <!-- ws -->

  2. <dependency>

  3. <groupId>org.springframework.boot</groupId>

  4. <artifactId>spring-boot-starter-websocket</artifactId>

  5. </dependency>

第二步:编写ws配置

  • com.example.config.WebSocketConfig

  1. @Configuration

  2. @EnableWebSocketMessageBroker//注解表示开启使用STOMP协议来传输基于代理的消息,Broker就是代理的意思。

  3. public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {


  4. /***

  5. * 注册 Stomp的端点

  6. * addEndpoint:添加STOMP协议的端点。提供WebSocket或SockJS客户端访问的地址

  7. * withSockJS:使用SockJS协议

  8. * @param registry

  9. */

  10. public void registerStompEndpoints(StompEndpointRegistry registry) {


  11. registry.addEndpoint("/websocket")

  12. .withSockJS();

  13. }


  14. /**

  15. * 配置消息代理

  16. * 启动Broker,消息的发送的地址符合配置的前缀来的消息才发送到这个broker

  17. */

  18. public void configureMessageBroker(MessageBrokerRegistry registry) {

  19. registry.enableSimpleBroker("/user/", "/topic");//推送消息前缀

  20. registry.setApplicationDestinationPrefixes("/app");

  21. }


  22. }

我们来解析一下这是啥意思,首先@EnableWebSocketMessageBroker,springboot的手动配置大家还记得吧?这个也是开启ws消息代理,然后继承WebSocketMessageBrokerConfigurer重写registerStompEndpoints和configureMessageBroker方法,registerStompEndpoints方法是注册端点,addEndpoint("/websocket")表示注册一个端点叫websocket,那么前端就能通过这个链接连接到服务器实现双工通讯了。.withSockJS()意思是使用SockJs协议,回顾一下:

  • SockJs是解决浏览器不支持ws的情况

  • Stompjs是简化文本传输的格式

configureMessageBroker是配置消息代理,上面我们配置了/user,/topic都是需要消息代理的链接。前端/app链接前缀过来的消息都会进入消息代理。

有了这两步骤后,我们就可以使用ws了,我们先来写一下前端:

因为我们的消息通知是在头部的用户名称那里,所有的页面都有,所以我们把js写在layout.ftl上。

  1. $(function () {

  2. var elemUser = $('.fly-nav-user');


  3. if(layui.cache.user.uid !== -1 && elemUser[0]){

  4. var socket = new SockJS("/websocket");

  5. stompClient = Stomp.over(socket);

  6. stompClient.connect({},function (frame) {

  7. //subscribe订阅

  8. stompClient.subscribe('/user/' + ${profile.id} + '/messCount',function (res) {

  9. showTips(res.body);

  10. })

  11. })

  12. }

  13. });

前面的if判断,我是根据收藏那里来写的,var socket = new SockJS("/websocket");表示建立端点链接,这样前端就会和后端建立ws双工通道,stompClient = Stomp.over(socket);表示切换成stomp文本传输协议传输内容。stompClient.connect表示建立连接触发的方法,这个方法里面有个stompClient.subscribe,差不多就表示订阅这个消息队列的意思,当后端往/user/{userId}/messCount里面发送消息时候,当前用户就能接收到消息,res.body就是返回的内容,然后就是showTips方法,这个方法其实就是渲染新消息通知的样式,我们从之前的新消息通知那里吧对应的js复制过来即可:

  1. function showTips(count) {

  2. var msg = $('<a class="fly-nav-msg" href="javascript:;">'+ count +'</a>');


  3. var elemUser = $('.fly-nav-user');

  4. elemUser.append(msg);

  5. msg.on('click', function(){

  6. location.href = '/center/message/';

  7. });

  8. layer.tips('你有 '+ count +' 条未读消息', msg, {

  9. tips: 3

  10. ,tipsMore: true

  11. ,fixed: true

  12. });

  13. msg.on('mouseenter', function(){

  14. layer.closeAll('tips');

  15. })

  16. }

ok,那前端我们已经可以连上ws实现双工通讯,并且监听了/user/{userId}/messCount这个队列,所以后端往这里面发送消息前端就能收到然后实现showTips方法。 那后端什么时候该发送消息给前端呢?

  • 有人评论了作者文章,或者回复作者的评论

  • 系统消息等

ok,我们先来写一个wsService,写一个发送消息数量给前端的方法。

  • com.example.service.WsService

  1. void sendMessCountToUser(Long userId, Integer count);

他的实现类复杂嘛?其实不复杂,我们先来看下参数,userId,就是限定要给谁发送消息,count是消息数量,这里我们考虑多种情况,但count不为空时候,我们返回count数量的,当count为空时候,我们搜索userId所有未读的消息数量然后返回。

  • com.example.service.impl.WsServiceImpl

  1. @Slf4j

  2. @Service

  3. public class WsServiceImpl implements WsService {


  4. @Autowired

  5. private SimpMessagingTemplate messagingTemplate ;


  6. @Autowired

  7. UserMessageService userMessageService;


  8. /**

  9. * 订阅链接为/user/{userId}/messCount的用户能收到消息

  10. * /user为默认前缀

  11. *

  12. * @param userId

  13. * @param count

  14. */

  15. @Async

  16. public void sendMessCountToUser(Long userId, Integer count) {

  17. if(count == null) {

  18. count = userMessageService.count(new QueryWrapper<UserMessage>()

  19. .eq("status", 0)

  20. .eq("to_user_id", userId));

  21. }


  22. this.messagingTemplate.convertAndSendToUser(userId.toString(),"/messCount", count);

  23. log.info("ws发送消息成功------------> {}, 数量:{}", userId, count);

  24. }


  25. }

发送ws消息,用的是SimpMessagingTemplate,convertAndSendToUser方法会自动在前面添加前缀/user,然后是userId,加上后面的后缀/messCount,所以加起来的链接其实就是/user/{userId}/messCount,那么我们在需要发送消息的地方调用这个方法即可。 然后这里还有个内容要点,就是这里我用了一个@Async表示异步,从线程角度来说就是新起一个线程来执行这个方法,从而保证不影响调用方的事务和执行时间等。

那么我们来说下@Async的用法

异步@Async

其实这里我原本是想用队列来实现的,也能表示异步。本着让同学们接触到更多知识,我们这里就用了@Aysnc注解来实现,后面我们还是会用到MQ的,同学们别急。

使用这个注解我们需要开启异步配置。注解是@EnableAsync

  • com.example.config.AsyncConfig

  1. @EnableAsync

  2. @Configuration

  3. public class AsyncConfig {


  4. @Bean

  5. AsyncTaskExecutor asyncTaskExecutor() {

  6. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

  7. executor.setCorePoolSize(100);

  8. executor.setQueueCapacity(25);

  9. executor.setMaxPoolSize(500);

  10. return executor;

  11. }

  12. }

所以使用了@EnableAsync注解之后我们就可以使用@Aysnc注解来实现异步了,asyncTaskExecutor()其实就是我用来重写AsyncTaskExecutor用的,定义了最大线程组等信息。另外Async其实还可以配置很多信息,比如异步线程出错时候的处理(重试等),大家课后可以多查询一下资料,这注解我在工作中运用其实还比较多的。 好了,上面我们已经把发送ws消息的方法改成了异步方法,会起一个线程执行发送。我们现在需要调用的地方其实就是在评论那里。

  • com.example.controller.PostController#reply(Long, Long, String)

这样有人评论文章或者回复评论的时候,都能实时收到消息了,我们来演示一下效果:

至此,我们实现了传说中的实时通知功能!要膨胀了~

文章阅读量

接下来的任务,我们是要完善一下文章阅读量。之前访问文章,阅读量都没增加,现在我们来补上 这个一个bug。怎么做呢?是每访问一次我们就直接修改数据库?这里我们使用缓存在解决这个问题,每次访问,我们就直接缓存的阅读量增一,然后在某一时刻再同步到数据库中即可。访问文章时候,我们把缓存中的阅读量传到vo中,具体咋样的呢,我们找到之前写的com.example.controller.PostController#view方法,然后我加了这一句代码:

技术要把vo的viewCount值修改成缓存的数量。

  • com.example.service.impl.PostServiceImpl#setViewCount

  1. @Override

  2. public void setViewCount(Post post) {

  3. // 从缓存中获取阅读数量

  4. Integer viewCount = (Integer) redisUtil.hget("rank_post_" + post.getId(), "post:viewCount");


  5. if(viewCount != null) {

  6. post.setViewCount((Integer) viewCount + 1);

  7. } else {

  8. post.setViewCount(post.getViewCount() + 1);

  9. }


  10. // 设置新的阅读

  11. redisUtil.hset("rank_post_" + post.getId(), "post:viewCount", post.getViewCount());

  12. }

从代码中可以看到,我们先从缓存中获取ViewCount,然后设置post.setViewCount,最后再把加一之后的值同步到redis中。 ok,这一步还是比较简单的,接下来我们要起一个定时器,然后定时吧缓存中的阅读量同步到数据库中,实现数据同步。

  • com.example.schedules.ScheduledTasks

  1. @Slf4j

  2. @Component

  3. public class ScheduledTasks {


  4. @Autowired

  5. RedisUtil redisUtil;


  6. @Autowired

  7. private RedisTemplate redisTemplate;


  8. @Autowired

  9. PostService postService;


  10. /**

  11. * 阅读数量同步任务

  12. * 每天2点同步

  13. */

  14. // @Scheduled(cron = "0 0 2 * * ?")

  15. @Scheduled(cron = "0 0/1 * * * *")//一分钟(测试用)

  16. public void postViewCountSync() {

  17. Set<String> keys = redisTemplate.keys("rank_post_*");

  18. List<String> ids = new ArrayList<>();

  19. for (String key : keys) {

  20. String postId = key.substring("rank_post_".length());


  21. if(redisUtil.hHasKey("rank_post_" + postId, "post:viewCount")){

  22. ids.add(postId);


  23. }

  24. }


  25. if(ids.isEmpty()) return;


  26. List<Post> posts = postService.list(new QueryWrapper<Post>().in("id", ids));


  27. Iterator<Post> it = posts.iterator();

  28. List<String> syncKeys = new ArrayList<>();


  29. while (it.hasNext()) {

  30. Post post = it.next();

  31. Object count =redisUtil.hget("rank_post_" + post.getId(), "post:viewCount");

  32. if(count != null) {

  33. post.setViewCount(Integer.valueOf(count.toString()));


  34. syncKeys.add("rank_post_" + post.getId());

  35. } else {

  36. //不需要同步的

  37. }

  38. }


  39. if(posts.isEmpty()) return;


  40. boolean isSuccess = postService.updateBatchById(posts);

  41. if(isSuccess) {

  42. for(Post post : posts) {

  43. // 删除缓存中的阅读数量,防止重复同步(根据实际情况来)

  44. redisUtil.hdel("rank_post_" + post.getId(), "post:viewCount");

  45. }

  46. }


  47. log.info("同步文章阅读成功 ------> {}", syncKeys);

  48. }

  49. }

为何获取所有需要同步阅读的列表,我们用了keys命令,实际上当redis的缓存越来越大的时候,我们是不能再使用这keys命令的,因为keys命令会检索所有的key,是个耗时的过程,而redis又是个单线程的中间件,会影响其他命令的执行。所以理论上我们需要用scan命令。考虑到这里博客只是个简单业务,redis不会很大,所以就直接用了keys命令,后期大家可以自行优化。 然后获取到列表后,然后就是获取所有的实体,然后批量更新阅读量。



 给eblog一个star,感谢支持哈

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存