SpringBoot与ElasticSearch、ActiveMQ、RocketMQ的整合及多环境配置、响应式框架WebFlux、服务器端主动推送SSE技术、生产环境部署、Actuator监控平台
- 管理员
- 405阅读
- 2022.07.29
(1)主要的搜索框架:MySQL、Solr、ElasticSearch
- MySQL:使用 like 进行模糊查询,存在性能问题
- Solr:底层使用 Lucene,适用于中小规模数据量场景
- ElasticSearch:适用于数据量特别大的场景,PB、TB 级别。使用纯 Java开发,ElasticSearch 从 4 版本升级到 5 版本改动较大,但是 5 版本后,改动不大
(2)ElasticSearch 主要特点
- 全文检索、 结构化检索
- 数据统计、分析
- 接近实时处理,分布式搜索,可部署数百台服务器,处理 PB 级别的数据
- 搜索纠错,自动完成
- 适用场景:日志搜索,数据聚合,数据监控,报表统计分析
- 应用实例:维基百科、Stack Overflow、GitHub
(3)ElasticSearch 6.x 新特性
- 6.2.x 版本基于 Lucene 7.x,更快,性能进一步提升。其中对应的序列化组件,升级到 Jackson 2.8。6.x 版本不再支持一个索引库里面多个type,所以一个 index 索引库只能存在 1 个 type。MySQL 与 ElasticSearch 的对应关系如下:
MySQL: database table record ElasticSearch: index type(只能存在一个) document复制
- 推荐使用 5.0 版本推出的 Java REST/HTTP 客户端,依赖少,比 Transport 使用更方便,在基准测试中,性能并不输于 Transport 客户端。推荐使用这种方式进行开发使用,在节点故障和特定响应代码的情况下进行故障转移,失败的连接处罚(失败的节点是否重试取决于失败的连续次数,失败的次数越多,客户端再次尝试同一节点之前等待的时间越长)。
(4)快速部署 ElasticSearch 5.6.x
- 配置 JDK 1.8
- 使用 wget 下载 ElasticSearch 安装包:
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.8.tar.gz复制
- 解压安装包:
tar -zxvf elasticsearch-5.6.8.tar.gz复制
- 外网访问配置:
~]$ vim elasticsearch/conf/elasticsearch.yml network.host: 0.0.0.0复制
- 报错一:
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c5330000, 986513408, 0) failed; error='Cannot allocate memory' (errno=12) # # There is insufficient memory for the Java Runtime Environment to continue. # Native memory allocation (mmap) failed to map 986513408 bytes for committing reserved memory. # An error report file with more information is saved as: # /usr/local/software/temp/elasticsearch-6.2.2/hs_err_pid1912.log复制
解决方案:内存不够,购买阿里云、腾讯云,亚马逊云的机器可以动态增加内存
- 报错二:
[root@iZwz95j86y235aroi85ht0Z bin]# ./elasticsearch [2018-02-22T20:14:04,870][WARN ][o.e.b.ElasticsearchUncaughtExceptionHandler] [] uncaught exception in thread [main] org.elasticsearch.bootstrap.StartupException: java.lang.RuntimeException: can not run elasticsearch as root at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:125) ~[elasticsearch-6.2.2.jar:6.2.2] at org.elasticsearch.bootstrap.Elasticsearch.execute(Elasticsearch.java:112) ~[elasticsearch-6.2.2.jar:6.2.2] at org.elasticsearch.cli.EnvironmentAwareCommand.execute(EnvironmentAwareCommand.java:86) ~[elasticsearch-6.2.2.jar:6.2.2] at org.elasticsearch.cli.Command.mainWithoutErrorHandling(Command.java:124) ~[elasticsearch-cli-6.2.2.jar:6.2.2]复制
解决方案:只能使用非 root 用户运行 ElasticSearch,需要添加普通用户:
useradd -m es passwd es复制
- 报错三:
~]$ ./elasticsearch Exception in thread "main" java.nio.file.AccessDeniedException: /usr/local/software/temp/elasticsearch-6.2.2/config/jvm.options复制
解决方案:ElasticSearch 目录权限不够:
chmod 777 -R 当前es目录复制
- 集群测试 测试工具:Postman 工具 查看集群状态:localhost:9200/_cat/health?v 查看索引列表:localhost:9200/_cat/indices?v
(5)SpringBoot 整合 ElasticSearch
- pom.xml 文件中添加 Maven 依赖
复制org.springframework.boot spring-boot-starter-data-elasticsearch
- application.properties 配置文件
spring.data.elasticsearch.cluster-name=elasticsearch spring.data.elasticsearch.cluster-nodes=127.0.0.1:9300 spring.data.elasticsearch.repositories.enabled=true复制
- Article.java
package net.xdclass.base_project.domain; import java.io.Serializable; import org.springframework.data.elasticsearch.annotations.Document; /** * 功能描述:文章对象 */ // "blog" 与 "article" 都要小写 @Document(indexName = "blog", type = "article") public class Article implements Serializable{ private static final long serialVersionUID = 1L; private long id; private String title; private String summary; private String content; private int pv; public long getId() { return id; } public void setId(long id) { this.id = id; } public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } public String getSummary() { return summary; } public void setSummary(String summary) { this.summary = summary; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } public int getPv() { return pv; } public void setPv(int pv) { this.pv = pv; } }复制
- ArticleRepository.java
package net.xdclass.base_project.repository; import net.xdclass.base_project.domain.Article; import org.springframework.data.elasticsearch.annotations.Document; import org.springframework.data.elasticsearch.repository.ElasticsearchRepository; import org.springframework.stereotype.Component; import org.springframework.stereotype.Repository; @Component //@Repository public interface ArticleRepository extends ElasticsearchRepository复制{ }
- ArticleController.java
package net.xdclass.base_project.controller; import net.xdclass.base_project.domain.Article; import net.xdclass.base_project.domain.JsonData; import net.xdclass.base_project.repository.ArticleRepository; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/api/v1/article") public class ArticleController { @Autowired private ArticleRepository articleRepository; @GetMapping("save") public Object save(long id,String title){ Article article = new Article(); article.setId(id); article.setPv(123); article.setContent("springboot整合elasticsearch,这个是新版本 2018年录制"); article.setTitle(title); article.setSummary("搜索框架整合"); articleRepository.save(article); return JsonData.buildSuccess(); } @GetMapping("search") public Object search(String title){ // 搜索全部文档 // QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); // 单个匹配,搜索标题为 title 的文档 QueryBuilder queryBuilder = QueryBuilders.matchQuery("title", title); Iterable复制list = articleRepository.search(queryBuilder); return JsonData.buildSuccess(list); } }
- 查看 ElasticSearch 中存放的数据: 查看索引信息:http://localhost:9200/_cat/indices?v 查看某个索引库结构:http://localhost:9200/blog 查看某个对象:http://localhost:9200/blog/article/1
(1)JMS(Java Message Service,Java 消息服务),Java 平台中关于面向消息中间件的接口,是一种与厂商无关的 API,用来访问消息、收发系统消息,它类似于JDBC(Java Database Connectivity)。这里,JDBC 是可以用来访问许多不同关系型数据库的接口。 示例:微信支付加消息队列:
- 基本概念:
- JMS提供者:Apache ActiveMQ、RabbitMQ、Kafka、Notify、MetaQ、RocketMQ
- JMS生产者(Message Producer)
- JMS消费者(Message Consumer)
- JMS消息
- JMS队列
- JMS主题
- 编程模型(MQ中需要用的一些类):
- ConnectionFactory:连接工厂,JMS 用它创建连接
- Connection:JMS 客户端到 JMS Provider 的连接
- Session:一个发送或接收消息的线程
- Destination:消息的目的地,消息发送给谁.
- MessageConsumer / MessageProducer:消息的接收者,消费者
- JMS消息通常有两种类型:点对点(Point-to-Point)、发布/订阅(Publish/Subscribe)
- 点对点类型:
- 发布/订阅类型:
- 特点: 跨平台、多语言、多项目、解耦、分布式事务、流量控制、最终一致性、RPC调用(上下游对接,数据源变动后通知下游)
(2)ActiveMQ 5.x 消息队列
- ActiveMQ 5.x 特点:
- 支持来自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各种跨语言客户端和协议
- 支持许多高级功能,如消息组,虚拟目标,通配符和复合目标
- 完全支持JMS 1.1和J2EE 1.4,支持瞬态,持久,事务和XA消息
- Spring支持,ActiveMQ可以轻松嵌入到Spring应用程序中,并使用Spring的XML配置机制进行配置
- 支持在流行的J2EE服务器(如TomEE,Geronimo,JBoss,GlassFish和WebLogic)中进行测试
- 使用JDBC和高性能日志支持非常快速的持久化
- ActiveMQ 5.x 安装:
- 下载地址:http://activemq.apache.org/activemq-5153-release.html
- 快速开始:http://activemq.apache.org/getting-started.html
- 如果是32位的机器,就双击 win32 目录下的 activemq.bat,如果是 64 位机器,则双击 win64 目录下的 activemq.bat
- bin 目录里面启动,选择对应的系统版本和位数,启动命令:activeMQ start
- 启动后访问路径:http://127.0.0.1:8161/
- 用户名和密码默认都是 admin
- ActiveMQ 5.x 面板:
- Name:队列名称
- Number Of Pending Messages:等待消费的消息个数
- Number Of Consumers:当前连接的消费者数目
- Messages Enqueued:进入队列的消息总个数,包括出队列的和待消费的,这个数量只增不减
- Messages Dequeued:已经消费的消息数量
(3)SpringBoot 整合 ActiveMQ 之点对点消息和发布订阅模式
- pom.xml 中加入 Maven 依赖
## pom.xml复制4.0.0 net.xdclass base_project 0.0.1-SNAPSHOT org.springframework.boot spring-boot-starter-parent 2.0.1.RELEASE org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-devtools true org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-starter-activemq org.apache.activemq activemq-pool org.springframework.boot spring-boot-maven-plugin
- application.properties 配置
## application.properties # 整合jms测试,安装在别的机器,防火墙和端口号记得开放 spring.activemq.broker-url=tcp://127.0.0.1:61616 # 集群配置 # spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617) spring.activemq.user=admin spring.activemq.password=admin # 下列配置要增加依赖 spring.activemq.pool.enabled=true spring.activemq.pool.max-connections=100 # default point to point # spring.jms.pub-sub-domain=true复制
- 各部分代码
## XdclassApplication.java package net.xdclass.base_project; import javax.jms.ConnectionFactory; import javax.jms.Queue; import javax.jms.Topic; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; @SpringBootApplication @EnableJms // 开启支持 JMS public class XdclassApplication { @Bean public Topic topic(){ return new ActiveMQTopic("video.topic"); } @Bean public Queue queue(){ return new ActiveMQQueue("common.queue"); } @Bean public JmsListenerContainerFactory> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setPubSubDomain(true); bean.setConnectionFactory(activeMQConnectionFactory); return bean; } public static void main(String[] args) { SpringApplication.run(XdclassApplication.class, args); } }复制
## CommonConsumer.java package net.xdclass.base_project.jms; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; @Component public class CommonConsumer { @JmsListener(destination="common.queue") public void receiveQueue(String text){ System.out.println("CommonConsumer收到的报文为:"+text); } }复制
## OrderConsumer.java package net.xdclass.base_project.jms; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; @Component public class OrderConsumer { @JmsListener(destination="order.queue") public void receiveQueue(String text){ System.out.println("OrderConsumer收到的报文为:"+text); } }复制
## TopicSub.java package net.xdclass.base_project.jms; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; @Component public class TopicSub { @JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic") public void receive1(String text){ System.out.println("video.topic 消费者:receive1="+text); } @JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic") public void receive2(String text){ System.out.println("video.topic 消费者:receive2="+text); } @JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic") public void receive3(String text){ System.out.println("video.topic 消费者:receive3="+text); } }复制
## ProducerService.java package net.xdclass.base_project.service; import javax.jms.Destination; /** * 功能描述:消息生产 */ public interface ProducerService { /** * 功能描述:指定消息队列,还有消息 * @param destination * @param message */ public void sendMessage(Destination destination, final String message); /** * 功能描述:使用默认消息队列, 发送消息 * @param message */ public void sendMessage( final String message); /** * 功能描述:消息发布者 * @param msg */ public void publish(String msg); }复制
## ProducerServiceImpl.java package net.xdclass.base_project.service.impl; import javax.jms.Destination; import javax.jms.Queue; import javax.jms.Topic; import net.xdclass.base_project.service.ProducerService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Service; /** * 功能描述:消息生产者 */ @Service public class ProducerServiceImpl implements ProducerService{ @Autowired private Queue queue; // 用来发送消息到broker的对象 @Autowired private JmsMessagingTemplate jmsTemplate; // 发送消息,destination是发送到的队列,message是待发送的消息 @Override public void sendMessage(Destination destination, String message) { jmsTemplate.convertAndSend(destination, message); } // 发送消息,destination是发送到的队列,message是待发送的消息 @Override public void sendMessage(final String message) { jmsTemplate.convertAndSend( message); } //=======发布订阅相关代码========= @Autowired private Topic topic; @Override public void publish(String msg) { this.jmsTemplate.convertAndSend(this.topic, msg); } }复制
## OrderController.java package net.xdclass.base_project.controller; import javax.jms.Destination; import net.xdclass.base_project.domain.JsonData; import net.xdclass.base_project.service.ProducerService; import org.apache.activemq.command.ActiveMQQueue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * 功能描述:模拟微信支付回调 */ @RestController @RequestMapping("/api/v1") public class OrderController { @Autowired private ProducerService producerService; /** * 功能描述:微信支付回调接口 * @param msg 支付信息 * @return */ @GetMapping("order") public Object order(String msg){ Destination destination = new ActiveMQQueue("order.queue"); producerService.sendMessage(destination, msg); return JsonData.buildSuccess(); } @GetMapping("common") public Object common(String msg){ producerService.sendMessage(msg); return JsonData.buildSuccess(); } /** * 功能描述:微信支付回调接口 * @param msg 支付信息 * @return */ @GetMapping("comment") public Object comment(String msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException{ // 创建一个消息实例,包含 topic、tag 和 消息体 Message message = new Message("commentTopic","add", msg.getBytes(RemotingHelper.DEFAULT_CHARSET)); //同步的方式,会有返回结果,发送的是普通消息 SendResult result = msgProducer.getProducer().send(message); System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus()); return JsonData.buildSuccess(); } }复制
- 模拟请求:http://localhost:8080/api/v1/order?msg=12312321321312
(1)RocketMQ Apache RocketMQ 是阿里开源的一款高性能、高吞吐量的分布式消息中间件。 特点:
- 在高压下 1 毫秒内的响应延迟超过99.6%
- 适合金融类业务,高可用性跟踪和审计功能
- 支持发布订阅模式和点对点模式
- 支持拉 pull 和推 push 两种消息模式
- 单一队列支持百万消息访问量
- 支持单 master 节点,多 master 节点,多 master 多 slave 节点
概念:
- Producer:消息生产者
- Producer Group:消息生产者组,发送同类消息的一个消息生产者组
- Consumer:消费者
- Consumer Group:消费同个消息的多个实例
- Tag:标签,子主题(二级分类),用于区分同一个主题下的不同业务的消息
- Topic:主题
- Message:消息
- Broker:MQ程序,接收生产的消息,提供给消费者消费的程序
- Name Server:给生产和消费者提供路由信息,提供轻量级的服务发现和路由
官网地址:http://rocketmq.apache.org/。
(2)RocketMQ 本地部署
- 解压压缩包
- 进入 bin 目录,启动 namesrv:
nohup sh mqnamesrv &复制
- 查看日志:
tail -f nohup.out复制
结尾:The Name Server boot success. serializeType=JSON 表示启动成功。
- 启动 broker:
nohup sh mqbroker -n 127.0.0.1:9876 &复制
- 关闭 nameserver broker:
sh mqshutdown namesrv sh mqshutdown broker复制
(3)RocketMQ 可视化控制台
- 下载源码:https://github.com/apache/rocketmq-externals
- 编译打包:
mvn clean package -Dmaven.test.skip=true复制
- target 目录通过java -jar的方式运行
- 如果无法连接获取 broker 的信息,则修改配置文件中 namesrvAddr 为本地 IP 地址及端口号,并在程序中 src/main/resources/application.properties 添加如下配置:
rocketmq.config.namesrvAddr=192.168.0.101:9876复制
- 访问地址:http://localhost:8080
- 注意:在阿里云,腾讯云或者虚拟机,记得检查端口号和防火墙是否启动
(4)SpringBoot 整合 RocketMQ
## pom.xml复制4.0.0 net.xdclass base_project 0.0.1-SNAPSHOT org.springframework.boot spring-boot-starter-parent 2.0.1.RELEASE 4.1.0-incubating org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-devtools true org.springframework.boot spring-boot-starter-test test org.apache.rocketmq rocketmq-client ${rocketmq.version} org.apache.rocketmq rocketmq-common ${rocketmq.version} org.springframework.boot spring-boot-maven-plugin
## application.properties #通过触发器,去控制什么时候进行热加载部署新的文件 spring.devtools.restart.trigger-file=trigger.txt #自定义启动banner文件的路径 spring.banner.location=banner.txt # 消费者的组名 apache.rocketmq.consumer.PushConsumer=orderConsumer # 生产者的组名 apache.rocketmq.producer.producerGroup=Producer # NameServer地址 apache.rocketmq.namesrvAddr=127.0.0.1:9876复制
## MsgProducer.java package net.xdclass.base_project.jms; import javax.annotation.PostConstruct; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @Component public class MsgProducer { // 生产者的组名 @Value("${apache.rocketmq.producer.producerGroup}") private String producerGroup; // NameServer 地址 @Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr; private DefaultMQProducer producer ; public DefaultMQProducer getProducer(){ return this.producer; } @PostConstruct public void init() { //生产者的组名 producer = new DefaultMQProducer(producerGroup); //指定NameServer地址,多个地址以 ; 隔开 //如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876"); producer.setNamesrvAddr(namesrvAddr); producer.setVipChannelEnabled(false); try { // Producer对象在使用之前必须要调用start初始化,只能初始化一次 producer.start(); } catch (Exception e) { e.printStackTrace(); } // producer.shutdown(); 一般在应用上下文,关闭的时候进行关闭,用上下文监听器 } }复制
## MsgConsumer.java package net.xdclass.base_project.jms; import javax.annotation.PostConstruct; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @Component public class MsgConsumer { // 消费者的组名 @Value("${apache.rocketmq.consumer.PushConsumer}") private String consumerGroup; // NameServer 地址 @Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr; @PostConstruct public void defaultMQPushConsumer() { //消费者的组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); //指定NameServer地址,多个地址以 ; 隔开 consumer.setNamesrvAddr(namesrvAddr); try { //设置consumer所订阅的Topic和Tag,*代表全部的Tag consumer.subscribe("testTopic", "*"); //CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,跳过历史消息 //CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //MessageListenerOrderly 这个是有序的 //MessageListenerConcurrently 这个是无序的,并行的方式处理,效率高很多 consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> { try { for (MessageExt messageExt : list) { System.out.println("messageExt: " + messageExt);//输出消息内容 String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET); System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);//输出消息内容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功 }); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } }复制
## OrderController.java package net.xdclass.base_project.controller; import java.io.UnsupportedEncodingException; import net.xdclass.base_project.domain.JsonData; import net.xdclass.base_project.jms.MsgProducer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * 功能描述:模拟微信支付回调 */ @RestController @RequestMapping("/api/v1") public class OrderController { @Autowired private MsgProducer msgProducer; /** * 功能描述:微信支付回调接口 * @param msg 支付信息 * @param tag 消息二级分类 * @return */ @GetMapping("order") public Object order(String msg, String tag) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException{ // 创建一个消息实例,包含 topic、tag 和 消息体 Message message = new Message("testTopic",tag, msg.getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult result = msgProducer.getProducer().send(message); System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus()); return JsonData.buildSuccess(); } /** * 功能描述:微信支付回调接口 * @param msg 支付信息 * @return */ @GetMapping("comment") public Object comment(String msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException{ // 创建一个消息实例,包含 topic、tag 和 消息体 Message message = new Message("commentTopic","add", msg.getBytes(RemotingHelper.DEFAULT_CHARSET)); //同步的方式,会有返回结果,发送的是普通消息 SendResult result = msgProducer.getProducer().send(message); System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus()); return JsonData.buildSuccess(); } }复制
- 报错一:org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [497]ms, Topic: TopicTest, BrokersSent: [chenyaowudeMacBook-Air.local, chenyaowudeMacBook-Air.local, chenyaowudeMacBook-Air.local] 解决方案:多网卡问题处理。
- 在 producer 部分设置:
producer.setVipChannelEnabled(false);复制
- 编辑 RocketMQ 配置文件:broker.conf(下列 IP 为自己的 IP)
namesrvAddr = 192.168.0.101:9876 brokerIP1 = 192.168.0.101复制
- 报错二:DESC: service not available now, maybe disk full, CL: 解决方案:修改启动脚本 runbroker.sh,在里面增加一句话即可:
JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98"复制
即将磁盘保护的百分比设置成 98%(默认 0.9),只有磁盘空间使用率达到 98% 时才拒绝接收 producer 消息。
4、SpringBoot 多环境配置- 不同环境使用不同配置,例如数据库配置,在开发的时候,我们一般用开发数据库,而在生产环境的时候,我们用正式的数据库
- 配置文件存放路径,可以存放在 classpath 根目录的 “/config” 包下,也可以存放在 classpath 的根目录下
- SpringBoot 允许通过命名约定按照一定的格式(application-{profile}.properties)来定义多个配置文件
## application-dev.properties test.url=dev.com复制
## application-test.properties test.url=test.com复制
## application.properties test.url=local #指定哪个profile #spring.profiles.active=dev复制
## OrderController.java package net.xdclass.base_project.controller; import net.xdclass.base_project.domain.JsonData; import org.springframework.beans.factory.annotation.Value; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/api/v1") public class OrderController { @Value("${test.url}") private String domain; /** * 功能描述:微信支付回调接口 * @param msg 支付信息 * @return */ @GetMapping("order") public Object order(String msg){ return JsonData.buildSuccess(domain); } }复制5、SpringBoot 响应式框架 WebFlux
- Spring WebFlux 是 Spring Framework 5.0 中引入的新的反应式 Web 框架,与 SpringMVC 不同,它不需要 Servlet API,完全异步和非阻塞,并通过 Reactor 实现 Reactive Streams规范。
- Flux 和 Mono 就简单业务而言:和其他普通对象差别不大,而对复杂请求业务,就可以提升性能。
-
Mono 表示的是包含 0 或者 1 个元素的异步序列,Mono
表示单一对象 User Flux 表示的是包含 0 到 N 个元素的异步序列,Flux 表示List Flux 和 Mono 之间可以进行转换。 - Spring WebFlux 有两种风格:基于功能和基于注解的。基于注解非常接近 SpringMVC 模型。
- Spring WebFlux 项目不严格依赖于 Servlet API,因此不能作为 war 文件部署,也不能使用 src/main/webapp 目录。
- Spring WebFlux 可以整合多个模板引擎,除了 REST Web 服务外,还可以使用 Spring WebFlux 提供动态 HTML 内容。Spring WebFlux 支持各种模板技术,包括Thymeleaf,FreeMarker。
- WebFlux 中,请求和响应不再是 WebMVC 中的 ServletRequest 和 ServletResponse,而是 ServerRequest 和 ServerResponse。
- pom.xml 中的依赖,如果同时存在 spring-boot-starter-web,则会优先用 spring-boot-starter-web。
## pom.xml复制4.0.0 net.xdclass base_project 0.0.1-SNAPSHOT org.springframework.boot spring-boot-starter-parent 2.0.1.RELEASE org.springframework.boot spring-boot-starter-webflux org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-maven-plugin
## UserService.java package net.xdclass.base_project.service; import java.util.Collection; import java.util.HashMap; import java.util.Map; import net.xdclass.base_project.domain.User; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @Service public class UserService { private static final Map复制dataMap = new HashMap<>(); static{ dataMap.put("1", new User("1", "小X老师")); dataMap.put("2", new User("2", "小D老师")); dataMap.put("3", new User("3", "小C老师")); dataMap.put("4", new User("4", "小L老师")); dataMap.put("5", new User("5", "小A老师")); dataMap.put("6", new User("6", "小S老师")); dataMap.put("7", new User("7", "小S老师")); } /** * 功能描述:返回用户列表 * @return */ public Flux list(){ Collection list = UserService.dataMap.values(); return Flux.fromIterable(list); } /** * 功能描述:根据id查找用户 * @param id * @return */ public Mono getById(final String id){ return Mono.justOrEmpty(UserService.dataMap.get(id)); } /** * 功能描述:根据id删除用户 * @param id * @return */ public Mono del(final String id){ return Mono.justOrEmpty(UserService.dataMap.remove(id)); } }
## UserController.java package net.xdclass.base_project.controller; import java.time.Duration; import net.xdclass.base_project.domain.User; import net.xdclass.base_project.service.UserService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @RestController @RequestMapping("/api/v1/user") public class UserController { //@Autowired //private UserService userService; private final UserService userService; public UserController(final UserService userService) { this.userService = userService; } @GetMapping("/test") public Mono复制test(){ return Mono.just("hello 小D课堂"); } /** * 功能描述:根据id找用户 * @param id * @return */ @GetMapping("find") public Mono findByid(final String id){ return userService.getById(id); } /** * 功能描述:删除用户 * @param id * @return */ @GetMapping("del") public Mono del(final String id){ return userService.del(id); } /** * 功能描述:列表 * @return */ @GetMapping(value="list",produces=MediaType.APPLICATION_STREAM_JSON_VALUE) public Flux list(){ return userService.list().delayElements(Duration.ofSeconds(2)); } }
- 启动方式默认是 Netty,8080端口,访问地址:http://localhost:8080/api/v1/user/test。
- SpringBoot WebFlux 响应式客户端 WebClient:
## WebClientTest.java package base_project.base; import org.junit.Test; import org.springframework.http.MediaType; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; @RunWith(SpringRunner.class) // 底层用junit SpringJUnit4ClassRunner @SpringBootTest(classes={XdclassApplication.class}) // 启动整个springboot工程 public class WebClientTest { @Test public void testBase(){ Mono复制6、SpringBoot 服务器端主动推送 SSE 技术resp = WebClient.create() .get() // 多个参数也可以直接放到map中,参数名与placeholder对应上即可 .uri("http://localhost:8080/api/v1/user/find?id=1") .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(String.class); System.out.println(resp.block()); } @Test public void testPlaceHolder(){ Mono resp = WebClient.create() .get() // 使用占位符 .uri("http://localhost:8080/api/v1/user/find?id={id}",1) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(String.class); System.out.println(resp.block()); } }
服务器端常用推送技术介绍:Ajax 定时拉取,WebSocket,SSE 轮询
- 客户端轮询:Ajax 定时拉取
- 服务端主动推送:WebSocket
- 全双工的,本质上是一个额外的 TCP 连接,建立和关闭时握手使用 HTTP 协议,其他数据传输不使用 HTTP 协议
- 更加复杂一些,适用于需要进行复杂双向数据通讯的场景
- 服务端主动推送:SSE(Server Send Event)
- HTML5 新标准,用来从服务端实时推送数据到浏览器端
- 直接建立在当前 HTTP 连接上,本质上是保持一个 HTTP 长连接,轻量协议
- 简单的服务器数据推送的场景,使用服务器推送事件
## SSEController.java package net.xdclass.base_project.controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/sse") public class SSEController { // 需要把 response 的类型改为 text/event-stream,才是 SSE 的类型 @RequestMapping(value = "/get_data", produces = "text/event-stream;charset=UTF-8") public String push() { try { Thread.sleep(1000); //第三方数据源调用 } catch (InterruptedException e) { e.printStackTrace(); } return "data:xdclass 行情" + Math.random() + "\n\n"; } }复制
## index.html复制Insert title here 模拟股票行情xdclass test
访问地址:http://localhost:8080/index.html
7、SpringBoot 阿里云服务器生产环境部署- 去除相关生产环境不需要的 jar 包,比如热部署 dev-tool。
## pom.xml复制4.0.0 net.xdclass base_project 0.0.1-SNAPSHOT org.springframework.boot spring-boot-starter-parent 2.0.1.RELEASE org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-maven-plugin
- 本地 maven 打包成 jar 包
mvn clean package -Dmaven.test.skip=true 跳过测试复制
- 打包指定配置文件
- 使用 Maven 的 profiles
- 使用 SpringBoot 的 profile=active
- 服务器端安装及配置 JDK
- Linux 下使用 wget 下载 JDK8。
- 配置环境变量:
vim /etc/profile export JAVA_HOME=/usr/local/software/jdk8 export PATH=$PATH:$JAVA_HOME/bin export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export JAVA_HOME PATH CLASSPATH复制
- 使用source /etc/profile,让配置立刻生效。
- 向服务器上传 jar 包。上传工具:(windows)WinSCP、SecurityCRT、(mac)Filezilla。
- 远程登录服务器:ssh [email protected]。
- 运行 jar 包:java -jar xxxx.jar,运行方式可以是:守护进程、系统服务、shell 脚本。
- 访问路径:http://120.79.160.143:8080/api/v1/user/find。
- 服务无法访问的原因:
- 阿里云防火墙是否开启,可以选择关闭,关闭是不安全的,可以选择开放端口
- 阿里云的安全访问组,开启对应的端口,如果应用是以 80 端口启动,则默认可以访问
成熟的互联网公司应该有的架构:
- 本地提交生产代码到 gitlab仓库
- Jenkins 自动化构建
- 运维或者开发人员发布
- Spring Boot包含许多附加功能,可帮助您在将应用程序投入生产时监视和管理应用程序。 可以选择使用 HTTP 端点或 JMX 来管理和监控您的应用程序,自动应用于审计,健康和指标收集,一句话,SpringBoot 提供用于监控和管理生产环境的模块。
- pom.xml 中加入依赖
复制org.springframework.boot spring-boot-starter-actuator
- 出于安全考虑,除 /actuator/health 和 /actuator/info 之外的所有执行器默认都是禁用的。management.endpoints.web.exposure.include 属性可用于启用执行器。 在 application.properties 中加入如下配置:
#开启监控端点 management.endpoints.web.exposure.include=*复制
- 在设置 management.endpoints.web.exposure.include 之前,要确保暴露的执行器不包含敏感信息,或通过将其放置在防火墙进行控制,不对外进行使用。禁用的端点将从应用程序上下文中完全删除,可以只暴露部分访问接口:
management.endpoints.web.exposure.include=* // 开启全部 management.endpoints.web.exposure.include=metrics // 开启某个 management.endpoints.web.exposure.exclude=metrics // 关闭某个复制
- 或者可以使用 Spring-Boot-Admin 进行管理,相关资料:https://www.cnblogs.com/ityouknow/p/8440455.html。
- 或者自己编写脚本监控 CPU、内存、磁盘使用状况和 Nginx 服务器的 HTTP 响应状态码:200,404,5xx。
- 常用的 URL: /actuator/health:查看应用健康指标 /actuator/metrics:查看应用基本指标列表 /actuator/metrics/{name}:通过上述列表查看具体指标 /actuator/env:显示来自 Spring 的 ConfigurableEnvironment 属性值