查看原文
其他

Spring Boot WebFlux + Server-sent事件示例

Yunooa SpringForAll社区 2020-10-17

在本文中,我们将向您展示如何使用服务器发送的事件开发响应式Web应用程序。

  • Spring Boot 2.1.2.RELEASE

  • Spring WebFlux 5.1.4.RELEASE

  • Thymeleaf 3.0.11.RELEASE

  • JUnit 5.3.2

  • Maven 3

在Spring中,返回JSON和标头 MediaType.TEXTEVENTSTREAM_VALUE

  1. @RestController

  2. public class CommentController {


  3. @GetMapping(path = "/comment/stream",

  4. produces = MediaType.TEXT_EVENT_STREAM_VALUE)

  5. public Flux<Comment> feed() {

  6. //...

  7. }

  8. }

在Javascript中,用于EventSource向上述端点发送请求。

  1. function loadComments () {

  2. this.source = null;

  3. this.start = function () {

  4. this.source = new EventSource("/comment/stream");

  5. this.source.addEventListener("message", function (event) {

  6. var comment = JSON.parse(event.data);

  7. //... update somewhere

  8. });

  9. this.source.onerror = function () {

  10. this.close();

  11. };

  12. };


  13. this.stop = function() {

  14. this.source.close();

  15. }

  16. }

  17. comment = new loadComments();

  18. window.onload = function() {

  19. comment.start();

  20. };

  21. window.onbeforeunload = function() {

  22. comment.stop();

  23. }

1.项目目录


2. Maven的pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>

  2. <project xmlns="http://maven.apache.org/POM/4.0.0"

  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0

  5. http://maven.apache.org/xsd/maven-4.0.0.xsd">

  6. <modelVersion>4.0.0</modelVersion>


  7. <groupId>com.mkyong.spring.reactive</groupId>

  8. <artifactId>webflux-thymeleaf-sse</artifactId>

  9. <version>1.0</version>


  10. <properties>

  11. <java.version>1.8</java.version>

  12. <junit-jupiter.version>5.3.2</junit-jupiter.version>

  13. </properties>


  14. <parent>

  15. <groupId>org.springframework.boot</groupId>

  16. <artifactId>spring-boot-starter-parent</artifactId>

  17. <version>2.1.2.RELEASE</version>

  18. </parent>


  19. <dependencies>


  20. <!-- webflux reactive -->

  21. <dependency>

  22. <groupId>org.springframework.boot</groupId>

  23. <artifactId>spring-boot-starter-webflux</artifactId>

  24. </dependency>


  25. <!-- thymeleaf -->

  26. <dependency>

  27. <groupId>org.springframework.boot</groupId>

  28. <artifactId>spring-boot-starter-thymeleaf</artifactId>

  29. </dependency>


  30. <!-- exclude junit 4, prefer junit 5 -->

  31. <dependency>

  32. <groupId>org.springframework.boot</groupId>

  33. <artifactId>spring-boot-starter-test</artifactId>

  34. <scope>test</scope>

  35. <exclusions>

  36. <exclusion>

  37. <groupId>junit</groupId>

  38. <artifactId>junit</artifactId>

  39. </exclusion>

  40. </exclusions>

  41. </dependency>


  42. <!-- junit 5 -->

  43. <dependency>

  44. <groupId>org.junit.jupiter</groupId>

  45. <artifactId>junit-jupiter-engine</artifactId>

  46. <version>${junit-jupiter.version}</version>

  47. <scope>test</scope>

  48. </dependency>


  49. <dependency>

  50. <groupId>org.springframework.boot</groupId>

  51. <artifactId>spring-boot-devtools</artifactId>

  52. <optional>true</optional>

  53. </dependency>


  54. </dependencies>


  55. <build>

  56. <plugins>

  57. <plugin>

  58. <groupId>org.springframework.boot</groupId>

  59. <artifactId>spring-boot-maven-plugin</artifactId>

  60. </plugin>


  61. <plugin>

  62. <groupId>org.apache.maven.plugins</groupId>

  63. <artifactId>maven-surefire-plugin</artifactId>

  64. <version>2.22.0</version>

  65. </plugin>


  66. </plugins>

  67. </build>


  68. </project>

显示项目依赖项。

  1. $ mvn dependency:tree


  2. [INFO] com.mkyong.spring.reactive:webflux-thymeleaf-sse:jar:1.0

  3. [INFO] +- org.springframework.boot:spring-boot-starter-webflux:jar:2.1.2.RELEASE:compile

  4. [INFO] | +- org.springframework.boot:spring-boot-starter:jar:2.1.2.RELEASE:compile

  5. [INFO] | | +- org.springframework.boot:spring-boot-starter-logging:jar:2.1.2.RELEASE:compile

  6. [INFO] | | | +- ch.qos.logback:logback-classic:jar:1.2.3:compile

  7. [INFO] | | | | \- ch.qos.logback:logback-core:jar:1.2.3:compile

  8. [INFO] | | | +- org.apache.logging.log4j:log4j-to-slf4j:jar:2.11.1:compile

  9. [INFO] | | | | \- org.apache.logging.log4j:log4j-api:jar:2.11.1:compile

  10. [INFO] | | | \- org.slf4j:jul-to-slf4j:jar:1.7.25:compile

  11. [INFO] | | +- javax.annotation:javax.annotation-api:jar:1.3.2:compile

  12. [INFO] | | \- org.yaml:snakeyaml:jar:1.23:runtime

  13. [INFO] | +- org.springframework.boot:spring-boot-starter-json:jar:2.1.2.RELEASE:compile

  14. [INFO] | | +- com.fasterxml.jackson.core:jackson-databind:jar:2.9.8:compile

  15. [INFO] | | | +- com.fasterxml.jackson.core:jackson-annotations:jar:2.9.0:compile

  16. [INFO] | | | \- com.fasterxml.jackson.core:jackson-core:jar:2.9.8:compile

  17. [INFO] | | +- com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.9.8:compile

  18. [INFO] | | +- com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.9.8:compile

  19. [INFO] | | \- com.fasterxml.jackson.module:jackson-module-parameter-names:jar:2.9.8:compile

  20. [INFO] | +- org.springframework.boot:spring-boot-starter-reactor-netty:jar:2.1.2.RELEASE:compile

  21. [INFO] | | \- io.projectreactor.netty:reactor-netty:jar:0.8.4.RELEASE:compile

  22. [INFO] | | +- io.netty:netty-codec-http:jar:4.1.31.Final:compile

  23. [INFO] | | | \- io.netty:netty-codec:jar:4.1.31.Final:compile

  24. [INFO] | | +- io.netty:netty-codec-http2:jar:4.1.31.Final:compile

  25. [INFO] | | +- io.netty:netty-handler:jar:4.1.31.Final:compile

  26. [INFO] | | | +- io.netty:netty-buffer:jar:4.1.31.Final:compile

  27. [INFO] | | | \- io.netty:netty-transport:jar:4.1.31.Final:compile

  28. [INFO] | | | \- io.netty:netty-resolver:jar:4.1.31.Final:compile

  29. [INFO] | | +- io.netty:netty-handler-proxy:jar:4.1.31.Final:compile

  30. [INFO] | | | \- io.netty:netty-codec-socks:jar:4.1.31.Final:compile

  31. [INFO] | | \- io.netty:netty-transport-native-epoll:jar:linux-x86_64:4.1.31.Final:compile

  32. [INFO] | | +- io.netty:netty-common:jar:4.1.31.Final:compile

  33. [INFO] | | \- io.netty:netty-transport-native-unix-common:jar:4.1.31.Final:compile

  34. [INFO] | +- org.hibernate.validator:hibernate-validator:jar:6.0.14.Final:compile

  35. [INFO] | | +- javax.validation:validation-api:jar:2.0.1.Final:compile

  36. [INFO] | | +- org.jboss.logging:jboss-logging:jar:3.3.2.Final:compile

  37. [INFO] | | \- com.fasterxml:classmate:jar:1.4.0:compile

  38. [INFO] | +- org.springframework:spring-web:jar:5.1.4.RELEASE:compile

  39. [INFO] | | \- org.springframework:spring-beans:jar:5.1.4.RELEASE:compile

  40. [INFO] | +- org.springframework:spring-webflux:jar:5.1.4.RELEASE:compile

  41. [INFO] | | \- io.projectreactor:reactor-core:jar:3.2.5.RELEASE:compile

  42. [INFO] | | \- org.reactivestreams:reactive-streams:jar:1.0.2:compile

  43. [INFO] | \- org.synchronoss.cloud:nio-multipart-parser:jar:1.1.0:compile

  44. [INFO] | +- org.slf4j:slf4j-api:jar:1.7.25:compile

  45. [INFO] | \- org.synchronoss.cloud:nio-stream-storage:jar:1.1.3:compile

  46. [INFO] +- org.springframework.boot:spring-boot-starter-thymeleaf:jar:2.1.2.RELEASE:compile

  47. [INFO] | +- org.thymeleaf:thymeleaf-spring5:jar:3.0.11.RELEASE:compile

  48. [INFO] | | \- org.thymeleaf:thymeleaf:jar:3.0.11.RELEASE:compile

  49. [INFO] | | +- org.attoparser:attoparser:jar:2.0.5.RELEASE:compile

  50. [INFO] | | \- org.unbescape:unbescape:jar:1.1.6.RELEASE:compile

  51. [INFO] | \- org.thymeleaf.extras:thymeleaf-extras-java8time:jar:3.0.2.RELEASE:compile

  52. [INFO] +- org.springframework.boot:spring-boot-starter-test:jar:2.1.2.RELEASE:test

  53. [INFO] | +- org.springframework.boot:spring-boot-test:jar:2.1.2.RELEASE:test

  54. [INFO] | +- org.springframework.boot:spring-boot-test-autoconfigure:jar:2.1.2.RELEASE:test

  55. [INFO] | +- com.jayway.jsonpath:json-path:jar:2.4.0:test

  56. [INFO] | | \- net.minidev:json-smart:jar:2.3:test

  57. [INFO] | | \- net.minidev:accessors-smart:jar:1.2:test

  58. [INFO] | | \- org.ow2.asm:asm:jar:5.0.4:test

  59. [INFO] | +- org.assertj:assertj-core:jar:3.11.1:test

  60. [INFO] | +- org.mockito:mockito-core:jar:2.23.4:test

  61. [INFO] | | +- net.bytebuddy:byte-buddy:jar:1.9.7:test

  62. [INFO] | | +- net.bytebuddy:byte-buddy-agent:jar:1.9.7:test

  63. [INFO] | | \- org.objenesis:objenesis:jar:2.6:test

  64. [INFO] | +- org.hamcrest:hamcrest-core:jar:1.3:test

  65. [INFO] | +- org.hamcrest:hamcrest-library:jar:1.3:test

  66. [INFO] | +- org.skyscreamer:jsonassert:jar:1.5.0:test

  67. [INFO] | | \- com.vaadin.external.google:android-json:jar:0.0.20131108.vaadin1:test

  68. [INFO] | +- org.springframework:spring-core:jar:5.1.4.RELEASE:compile

  69. [INFO] | | \- org.springframework:spring-jcl:jar:5.1.4.RELEASE:compile

  70. [INFO] | +- org.springframework:spring-test:jar:5.1.4.RELEASE:test

  71. [INFO] | \- org.xmlunit:xmlunit-core:jar:2.6.2:test

  72. [INFO] | \- javax.xml.bind:jaxb-api:jar:2.3.1:test

  73. [INFO] | \- javax.activation:javax.activation-api:jar:1.2.0:test

  74. [INFO] +- org.junit.jupiter:junit-jupiter-engine:jar:5.3.2:test

  75. [INFO] | +- org.apiguardian:apiguardian-api:jar:1.0.0:test

  76. [INFO] | +- org.junit.platform:junit-platform-engine:jar:1.3.2:test

  77. [INFO] | | +- org.junit.platform:junit-platform-commons:jar:1.3.2:test

  78. [INFO] | | \- org.opentest4j:opentest4j:jar:1.1.1:test

  79. [INFO] | \- org.junit.jupiter:junit-jupiter-api:jar:5.3.2:test

  80. [INFO] \- org.springframework.boot:spring-boot-devtools:jar:2.1.2.RELEASE:compile (optional)

  81. [INFO] +- org.springframework.boot:spring-boot:jar:2.1.2.RELEASE:compile

  82. [INFO] | \- org.springframework:spring-context:jar:5.1.4.RELEASE:compile

  83. [INFO] | +- org.springframework:spring-aop:jar:5.1.4.RELEASE:compile

  84. [INFO] | \- org.springframework:spring-expression:jar:5.1.4.RELEASE:compile

  85. [INFO] \- org.springframework.boot:spring-boot-autoconfigure:jar:2.1.2.RELEASE:compile

3. Spring Boot + Spring WebFlux

3.1 Spring基于WebFlux注释的控制器。启用数据流。写produces = MediaType.TEXTEVENTSTREAM_VALUE

  1. CommentController.java


  2. package com.mkyong.reactive.controller;

  3. import com.mkyong.reactive.model.Comment;

  4. import com.mkyong.reactive.repository.CommentRepository;

  5. import org.springframework.beans.factory.annotation.Autowired;

  6. import org.springframework.http.MediaType;

  7. import org.springframework.web.bind.annotation.GetMapping;

  8. import org.springframework.web.bind.annotation.RestController;

  9. import reactor.core.publisher.Flux;


  10. @RestController

  11. public class CommentController {


  12. @Autowired

  13. private CommentRepository commentRepository;


  14. @GetMapping(path = "/comment/stream",

  15. produces = MediaType.TEXT_EVENT_STREAM_VALUE)

  16. public Flux<Comment> feed() {

  17. return this.commentRepository.findAll();

  18. }


  19. }

  20. MainController.java


  21. package com.mkyong.reactive.controller;


  22. import org.springframework.stereotype.Controller;

  23. import org.springframework.ui.Model;

  24. import org.springframework.web.bind.annotation.GetMapping;


  25. @Controller

  26. public class MainController {


  27. @GetMapping("/")

  28. public String index(final Model model) {

  29. return "index";

  30. }


  31. }

3.2在repository,返回一个Flux对象。

  1. CommentRepository.java


  2. package com.mkyong.reactive.repository;


  3. import com.mkyong.reactive.model.Comment;

  4. import reactor.core.publisher.Flux;


  5. public interface CommentRepository {


  6. Flux<Comment> findAll();


  7. }


  8. ReactiveCommentRepository.java

  9. package com.mkyong.reactive.repository;


  10. import com.mkyong.reactive.model.Comment;

  11. import com.mkyong.reactive.utils.CommentGenerator;

  12. import org.springframework.stereotype.Repository;

  13. import reactor.core.publisher.Flux;


  14. import java.time.Duration;

  15. import java.util.Arrays;

  16. import java.util.List;


  17. @Repository

  18. public class ReactiveCommentRepository implements CommentRepository {


  19. @Override

  20. public Flux<Comment> findAll() {


  21. //simulate data streaming every 1 second.

  22. return Flux.interval(Duration.ofSeconds(1))

  23. .onBackpressureDrop()

  24. .map(this::generateComment)

  25. .flatMapIterable(x -> x);


  26. }


  27. private List<Comment> generateComment(long interval) {


  28. Comment obj = new Comment(

  29. CommentGenerator.randomAuthor(),

  30. CommentGenerator.randomMessage(),

  31. CommentGenerator.getCurrentTimeStamp());

  32. return Arrays.asList(obj);


  33. }


  34. }

3.3一个用于生成随机注释的utils类。

  1. CommentGenerator.java


  2. package com.mkyong.reactive.utils;


  3. import java.time.LocalDateTime;

  4. import java.time.format.DateTimeFormatter;

  5. import java.util.Arrays;

  6. import java.util.List;

  7. import java.util.Random;


  8. public class CommentGenerator {


  9. private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");

  10. private static final Random RANDOM = new Random(System.currentTimeMillis());


  11. private static final List<String> COMMENT_AUTHOR =

  12. Arrays.asList(

  13. "Mkyong", "Oliver", "Jack", "Harry", "Jacob",

  14. "Isla", "Emily", "Poppy", "Ava", "Isabella");



  15. private static final List<String> COMMENT_MESSAGE =

  16. Arrays.asList(

  17. "I Love this!",

  18. "Me too!",

  19. "Wow",

  20. "True!",

  21. "Hello everyone here?",

  22. "Good!");


  23. public static String randomAuthor() {

  24. return COMMENT_AUTHOR.get(RANDOM.nextInt(COMMENT_AUTHOR.size()));

  25. }


  26. public static String randomMessage() {

  27. return COMMENT_MESSAGE.get(RANDOM.nextInt(COMMENT_MESSAGE.size()));

  28. }


  29. public static String getCurrentTimeStamp() {

  30. return dtf.format(LocalDateTime.now());

  31. }

  32. }

3.4评论模型。

  1. package com.mkyong.reactive.model;


  2. public class Comment {


  3. private String author;

  4. private String message;

  5. private String timestamp;


  6. //getter, setter and constructor

  7. }

3.5启动Spring Boot。

  1. package com.mkyong.reactive;


  2. import org.springframework.boot.SpringApplication;

  3. import org.springframework.boot.autoconfigure.SpringBootApplication;


  4. @SpringBootApplication

  5. public class CommentWebApplication {


  6. public static void main(String[] args) {

  7. SpringApplication.run(CommentWebApplication.class, args);

  8. }


  9. }

4. Thymeleaf

百里香叶模板中没有特殊的反应标签,只使用正常循环。

  1. <!DOCTYPE html>

  2. <html>

  3. <head>

  4. <meta charset="utf-8">

  5. <meta http-equiv="X-UA-Compatible" content="IE=edge">

  6. <meta name="viewport" content="width=device-width, initial-scale=1">

  7. <link data-th-href="@{/css/bootstrap.min.css}" rel="stylesheet">

  8. <link data-th-href="@{/css/main.css}" rel="stylesheet">

  9. </head>

  10. <body>


  11. <div class="container">


  12. <div class="row">

  13. <div id="title">

  14. <h1>Spring WebFlux + Server Sent Events</h1>

  15. </div>


  16. <table id="comments" class="table table-striped">

  17. <thead>

  18. <tr>

  19. <th width="10%">Author</th>

  20. <th width="60%">Message</th>

  21. <th width="30%">Date</th>

  22. </tr>

  23. </thead>

  24. <tbody>

  25. <tr class="result" data-th-each="comment : ${comments}">

  26. <td>[[${comment.author}]]</td>

  27. <td>[[${comment.message}]]</td>

  28. <td>[[${comment.timestamp}]]</td>

  29. </tr>

  30. </tbody>

  31. </table>

  32. </div>


  33. </div>


  34. <script data-th-src="@{/js/main.js}"></script>


  35. </body>


  36. </html>

5. JavaScript EventSource

关键是使用Javascript EventSource类发送请求并监听message事件,并将流数据反应更新到表中。

  1. function loadComments () {


  2. this.source = null;


  3. this.start = function () {


  4. var commentTable = document.getElementById("comments");


  5. this.source = new EventSource("/comment/stream");


  6. this.source.addEventListener("message", function (event) {


  7. // These events are JSON, so parsing and DOM fiddling are needed

  8. var comment = JSON.parse(event.data);


  9. var row = commentTable.getElementsByTagName("tbody")[0].insertRow(0);

  10. var cell0 = row.insertCell(0);

  11. var cell1 = row.insertCell(1);

  12. var cell2 = row.insertCell(2);


  13. cell0.className = "author-style";

  14. cell0.innerHTML = comment.author;


  15. cell1.className = "text";

  16. cell1.innerHTML = comment.message;


  17. cell2.className = "date";

  18. cell2.innerHTML = comment.timestamp;


  19. });


  20. this.source.onerror = function () {

  21. this.close();

  22. };


  23. };


  24. this.stop = function() {

  25. this.source.close();

  26. }


  27. }


  28. comment = new loadComments();


  29. /*

  30. * Register callbacks for starting and stopping the SSE controller.

  31. */

  32. window.onload = function() {

  33. comment.start();

  34. };

  35. window.onbeforeunload = function() {

  36. comment.stop();

  37. }

6.单元测试

WebTestClient单元测试流式响应

  1. package com.mkyong.reactive;


  2. import com.mkyong.reactive.model.Comment;

  3. import org.junit.jupiter.api.Test;

  4. import org.springframework.beans.factory.annotation.Autowired;

  5. import org.springframework.boot.test.context.SpringBootTest;

  6. import org.springframework.http.MediaType;

  7. import org.springframework.test.web.reactive.server.WebTestClient;


  8. import java.util.List;


  9. import static org.junit.jupiter.api.Assertions.assertEquals;


  10. @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)

  11. public class TestCommentWebApplication {


  12. @Autowired

  13. private WebTestClient webClient;


  14. @Test

  15. public void testCommentStream() {


  16. List<Comment> comments = webClient

  17. .get().uri("/comment/stream")

  18. .accept(MediaType.valueOf(MediaType.TEXT_EVENT_STREAM_VALUE))

  19. .exchange()

  20. .expectStatus().isOk()

  21. .returnResult(Comment.class)

  22. .getResponseBody()

  23. .take(3) // take 3 comment objects

  24. .collectList()

  25. .block();


  26. comments.forEach(x -> System.out.println(x));


  27. assertEquals(3, comments.size());


  28. }


  29. }

7.Demo

  1. $ mvn spring-boot:run


  2. 2019-02-11 15:41:17.657 INFO 257192 --- [ restartedMain] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8080

URL = http:// localhost:8080 数据是流式传输,随机注释将每1秒显示一次。

8.下载源代码

  • $ git clone https://github.com/mkyong/spring-boot.git

  • $ cd webflux-thymeleaf-serversentevent

  • $ mvn spring-boot:run

原文链接:https://www.mkyong.com/spring-boot/spring-boot-webflux-server-sent-events-example/

作者: mkyong

译者:Yunooa


推荐: Spring Cloud Alibaba基础教程:使用Nacos实现服务注册与发现

上一篇:Spring Boot with Hibernate




 关注公众号

点击原文阅读更多



    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存