编程知识 cdmana.com

(十七) 整合spring cloud云架构 -消息驱动 Spring Cloud Stream

在使用spring cloud云架构的时候,我们不得不使用Spring cloud Stream,因为消息中间件的使用在项目中无处不在,我们公司后面做了娱乐方面的APP,在使用spring cloud做架构的时候,其中消息的异步通知,业务的异步处理都需要使用消息中间件机制。spring cloud的官方给出的集成建议(使用rabbit mq和kafka),我看了一下源码和配置,只要把rabbit mq集成,kafka只是换了一个pom配置jar包而已,闲话少说,我们就直接进入配置实施:

  1. 简介:

Spring cloud Stream 数据流操作开发包,封装了与Redis,Rabbit、Kafka等发送接收消息。

  1. 使用工具:

rabbit,具体的下载和安装细节我这里不做太多讲解,网上的实例太多了

  1. 创建commonservice-mq-producer消息的发送者项目,在pom里面配置stream-rabbit的依赖
1.  <span style="font-size: 16px;"><!-- 引入MQ消息驱动的微服务包,引入stream只需要进行配置化即可,是对rabbit、kafka很好的封装 --> 
2.  <dependency> 
3.   <groupId>org.springframework.cloud</groupId> 
4.   <artifactId>spring-cloud-starter-stream-rabbit</artifactId> 
5.  </dependency></span>
  1. 在yml文件里面配置rabbit mq
1.  <span style="font-size: 16px;">server: 
2.   port: 5666 
3.  spring: 
4.   application: 
5.   name: commonservice-mq-producer 
6.   profiles: 
7.   active: dev 
8.   cloud: 
9.   config: 
10.   discovery: 
11.   enabled: true 
12.   service-id: commonservice-config-server 
13.   <span style="color: #ff0000;"># rabbitmq和kafka都有相关配置的默认值,如果修改,可以再次进行配置 
14.   stream: 
15.   bindings: 
16.   mqScoreOutput: 
17.   destination: honghu_exchange 
18.   contentType: application/json 

20.   rabbitmq: 
21.   host: localhost 
22.   port: 5672 
23.   username: honghu 
24.   password: honghu</span> 
25.  eureka: 
26.   client: 
27.   service-url: 
28.   defaultZone: http://honghu:123456@localhost:8761/eureka 
29.   instance: 
30.   prefer-ip-address: true</span>
  1. 定义接口ProducerService
1.  <span style="font-size: 16px;">package com.honghu.cloud.producer; 

3.  import org.springframework.cloud.stream.annotation.Output; 
4.  import org.springframework.messaging.SubscribableChannel; 

6.  public interface ProducerService { 

8.   String SCORE_OUPUT = "mqScoreOutput"; 

10.   @Output(ProducerService.SCORE_OUPUT) 
11.   SubscribableChannel sendMessage(); 
12.  }</span>
  1. 定义绑定
1.  <span style="font-size: 16px;">package com.honghu.cloud.producer; 

3.  import org.springframework.cloud.stream.annotation.EnableBinding; 

5.  @EnableBinding(ProducerService.class) 
6.  public class SendServerConfig { 

8.  }</span>
  1. 定义发送消息业务ProducerController
1.  <span style="font-size: 16px;">package com.honghu.cloud.controller; 

4.  import org.springframework.beans.factory.annotation.Autowired; 
5.  import org.springframework.integration.support.MessageBuilder; 
6.  import org.springframework.messaging.Message; 
7.  import org.springframework.web.bind.annotation.PathVariable; 
8.  import org.springframework.web.bind.annotation.RequestBody; 
9.  import org.springframework.web.bind.annotation.RequestMapping; 
10.  import org.springframework.web.bind.annotation.RequestMethod; 
11.  import org.springframework.web.bind.annotation.RestController; 

13.  import com.honghu.cloud.common.code.ResponseCode; 
14.  import com.honghu.cloud.common.code.ResponseVO; 
15.  import com.honghu.cloud.entity.User; 
16.  import com.honghu.cloud.producer.ProducerService; 

18.  import net.sf.json.JSONObject; 

20.  @RestController 
21.  @RequestMapping(value = "producer") 
22.  public class ProducerController { 

24.   @Autowired 
25.   private ProducerService producerService; 

28.   /** 
29.   * 通过get方式发送</span>对象<span > 
30.   * @param name 路径参数 
31.   * @return 成功|失败 
32.   */ 
33.   @RequestMapping(value = "/sendObj", method = RequestMethod.GET) 
34.   public ResponseVO sendObj() { 
35.   User user = new User(1, "hello User"); 
36.   <span style="color: #ff0000;">Message<User> msg = MessageBuilder.withPayload(user).build();</span> 
37.   boolean result = producerService.sendMessage().send(msg); 
38.   if(result){ 
39.   return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false); 
40.   } 
41.   return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false); 
42.   } 

45.   /** 
46.   * 通过get方式发送字符串消息 
47.   * @param name 路径参数 
48.   * @return 成功|失败 
49.   */ 
50.   @RequestMapping(value = "/send/{name}", method = RequestMethod.GET) 
51.   public ResponseVO send(@PathVariable(value = "name", required = true) String name) { 
52.   Message msg = MessageBuilder.withPayload(name.getBytes()).build(); 
53.   boolean result = producerService.sendMessage().send(msg); 
54.   if(result){ 
55.   return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false); 
56.   } 
57.   return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false); 
58.   } 

60.   /** 
61.   * 通过post方式发送</span>json对象<span > 
62.   * @param name 路径参数 
63.   * @return 成功|失败 
64.   */ 
65.   @RequestMapping(value = "/sendJsonObj", method = RequestMethod.POST) 
66.   public ResponseVO sendJsonObj(@RequestBody JSONObject jsonObj) { 
67.   Message<JSONObject> msg = MessageBuilder.withPayload(jsonObj).build(); 
68.   boolean result = producerService.sendMessage().send(msg); 
69.   if(result){ 
70.   return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false); 
71.   } 
72.   return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false); 
73.   } 
74.  } 
75.  </span>
  1. 创建commonservice-mq-consumer1消息的消费者项目,在pom里面配置stream-rabbit的依赖
1.  <!-- 引入MQ消息驱动的微服务包,引入stream只需要进行配置化即可,是对rabbit、kafka很好的封装 --> 
2.  <dependency> 
3.   <groupId>org.springframework.cloud</groupId> 
4.   <artifactId>spring-cloud-starter-stream-rabbit</artifactId> 
5.  </dependency>
  1. 在yml文件中配置:
1.  server: 
2.   port: 5111 
3.  spring: 
4.   application: 
5.   name: commonservice-mq-consumer1 
6.   profiles: 
7.   active: dev 
8.   cloud: 
9.   config: 
10.   discovery: 
11.   enabled: true 
12.   service-id: commonservice-config-server 

14.   <span style="color: #ff0000;">stream: 
15.   bindings: 
16.   mqScoreInput: 
17.   group: honghu_queue 
18.   destination: honghu_exchange 
19.   contentType: application/json 

21.   rabbitmq: 
22.   host: localhost 
23.   port: 5672 
24.   username: honghu 
25.   password: honghu</span> 
26.  eureka: 
27.   client: 
28.   service-url: 
29.   defaultZone: http://honghu:123456@localhost:8761/eureka 
30.   instance: 
31.   prefer-ip-address: true
  1. 定义接口ConsumerService
1.  package com.honghu.cloud.consumer; 

3.  import org.springframework.cloud.stream.annotation.Input; 
4.  import org.springframework.messaging.SubscribableChannel; 

6.  public interface ConsumerService { 

8.   <span style="color: #ff0000;">String SCORE_INPUT = "mqScoreInput"; 

10.   @Input(ConsumerService.SCORE_INPUT) 
11.   SubscribableChannel sendMessage();</span> 

13.  }
  1. 定义启动类和消息消费
1.  package com.honghu.cloud; 

3.  import org.springframework.boot.SpringApplication; 
4.  import org.springframework.boot.autoconfigure.SpringBootApplication; 
5.  import org.springframework.cloud.netflix.eureka.EnableEurekaClient; 
6.  import org.springframework.cloud.stream.annotation.EnableBinding; 
7.  import org.springframework.cloud.stream.annotation.StreamListener; 

9.  import com.honghu.cloud.consumer.ConsumerService; 
10.  import com.honghu.cloud.entity.User; 

12.  @EnableEurekaClient 
13.  @SpringBootApplication 
14.  @EnableBinding(ConsumerService.class) //可以绑定多个接口 
15.  public class ConsumerApplication { 

17.   public static void main(String[] args) { 
18.   SpringApplication.run(ConsumerApplication.class, args); 
19.   } 

21.   <span style="color: #ff0000;">@StreamListener(ConsumerService.SCORE_INPUT) 
22.   public void onMessage(Object obj) { 
23.   System.out.println("消费者1,接收到的消息:" + obj); 
24.   }</span> 

26.  }
  1. 分别启动commonservice-mq-producer、commonservice-mq-consumer1
  2. 通过postman来验证消息的发送和接收

 

 

 

 

 

可以看到接收到了消息,下一章我们介绍mq的集群方案。

到此,整个消息中心方案集成完毕(企业架构源码可以加求球:叁五三陆二肆柒二伍玖)

欢迎大家和我一起学习spring cloud构建微服务云架构,我这边会将近期研发的spring cloud微服务云架构的搭建过程和精髓记录下来,帮助更多有兴趣研发spring cloud框架的朋友,大家来一起探讨spring cloud架构的搭建过程及如何运用于企业项目。

版权声明
本文为[黑骑士1203]所创,转载请带上原文链接,感谢
https://segmentfault.com/a/1190000038149594

Scroll to Top