返回

SpringBoot与ElasticSearch、ActiveMQ、RocketMQ的整合及多环境配置、响应式框架WebFlux、服务器端主动推送SSE技术、生产环境部署、Actuator监控平台

1、SpringBoot 与 ElasticSearch 框架的整合

(1)主要的搜索框架:MySQL、Solr、ElasticSearch

  • MySQL:使用 like 进行模糊查询,存在性能问题
  • Solr:底层使用 Lucene,适用于中小规模数据量场景
  • ElasticSearch:适用于数据量特别大的场景,PB、TB 级别。使用纯 Java开发,ElasticSearch 从 4 版本升级到 5 版本改动较大,但是 5 版本后,改动不大

(2)ElasticSearch 主要特点

  • 全文检索、 结构化检索
  • 数据统计、分析
  • 接近实时处理,分布式搜索,可部署数百台服务器,处理 PB 级别的数据
  • 搜索纠错,自动完成
  • 适用场景:日志搜索,数据聚合,数据监控,报表统计分析
  • 应用实例:维基百科、Stack Overflow、GitHub

(3)ElasticSearch 6.x 新特性

  • 6.2.x 版本基于 Lucene 7.x,更快,性能进一步提升。其中对应的序列化组件,升级到 Jackson 2.8。6.x 版本不再支持一个索引库里面多个type,所以一个 index 索引库只能存在 1 个 type。MySQL 与 ElasticSearch 的对应关系如下:
MySQL:			database	table					record
ElasticSearch:	index		type(只能存在一个)    	document
复制
  • 推荐使用 5.0 版本推出的 Java REST/HTTP 客户端,依赖少,比 Transport 使用更方便,在基准测试中,性能并不输于 Transport 客户端。推荐使用这种方式进行开发使用,在节点故障和特定响应代码的情况下进行故障转移,失败的连接处罚(失败的节点是否重试取决于失败的连续次数,失败的次数越多,客户端再次尝试同一节点之前等待的时间越长)。

(4)快速部署 ElasticSearch 5.6.x

  1. 配置 JDK 1.8
  2. 使用 wget 下载 ElasticSearch 安装包:
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.8.tar.gz
复制
  1. 解压安装包:
tar -zxvf elasticsearch-5.6.8.tar.gz
复制
  1. 外网访问配置:
~]$ vim elasticsearch/conf/elasticsearch.yml

network.host: 0.0.0.0
复制
  1. 报错一:
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c5330000, 986513408, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 986513408 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /usr/local/software/temp/elasticsearch-6.2.2/hs_err_pid1912.log
复制

解决方案:内存不够,购买阿里云、腾讯云,亚马逊云的机器可以动态增加内存

  1. 报错二:
[root@iZwz95j86y235aroi85ht0Z bin]# ./elasticsearch
[2018-02-22T20:14:04,870][WARN ][o.e.b.ElasticsearchUncaughtExceptionHandler] [] uncaught exception in thread [main]
org.elasticsearch.bootstrap.StartupException: java.lang.RuntimeException: can not run elasticsearch as root
	at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:125) ~[elasticsearch-6.2.2.jar:6.2.2]
	at org.elasticsearch.bootstrap.Elasticsearch.execute(Elasticsearch.java:112) ~[elasticsearch-6.2.2.jar:6.2.2]
	at org.elasticsearch.cli.EnvironmentAwareCommand.execute(EnvironmentAwareCommand.java:86) ~[elasticsearch-6.2.2.jar:6.2.2]
	at org.elasticsearch.cli.Command.mainWithoutErrorHandling(Command.java:124) ~[elasticsearch-cli-6.2.2.jar:6.2.2]
复制

解决方案:只能使用非 root 用户运行 ElasticSearch,需要添加普通用户:

useradd -m es
passwd es
复制
  1. 报错三:
~]$ ./elasticsearch
Exception in thread "main" java.nio.file.AccessDeniedException: /usr/local/software/temp/elasticsearch-6.2.2/config/jvm.options
复制

解决方案:ElasticSearch 目录权限不够:

chmod 777 -R 当前es目录
复制
  1. 集群测试 测试工具:Postman 工具 查看集群状态:localhost:9200/_cat/health?v 查看索引列表:localhost:9200/_cat/indices?v

(5)SpringBoot 整合 ElasticSearch

  1. pom.xml 文件中添加 Maven 依赖
  
	org.springframework.boot  
	spring-boot-starter-data-elasticsearch  
复制
  1. application.properties 配置文件
spring.data.elasticsearch.cluster-name=elasticsearch
spring.data.elasticsearch.cluster-nodes=127.0.0.1:9300
spring.data.elasticsearch.repositories.enabled=true
复制
  1. Article.java
package net.xdclass.base_project.domain;

import java.io.Serializable;
import org.springframework.data.elasticsearch.annotations.Document;

/**
 * 功能描述:文章对象
 */
// "blog" 与 "article" 都要小写
@Document(indexName = "blog", type = "article")
public class Article implements Serializable{

	private static final long serialVersionUID = 1L;
	private long id;
	private String title;
	private String summary;
	private String content;
	private int pv;

	public long getId() {
		return id;
	}
	public void setId(long id) {
		this.id = id;
	}
	public String getTitle() {
		return title;
	}
	public void setTitle(String title) {
		this.title = title;
	}
	public String getSummary() {
		return summary;
	}
	public void setSummary(String summary) {
		this.summary = summary;
	}
	public String getContent() {
		return content;
	}
	public void setContent(String content) {
		this.content = content;
	}
	public int getPv() {
		return pv;
	}
	public void setPv(int pv) {
		this.pv = pv;
	}
}
复制
  1. ArticleRepository.java
package net.xdclass.base_project.repository;

import net.xdclass.base_project.domain.Article;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Repository;

@Component 
//@Repository
public interface ArticleRepository extends ElasticsearchRepository {

}
复制
  1. ArticleController.java
package net.xdclass.base_project.controller;

import net.xdclass.base_project.domain.Article;
import net.xdclass.base_project.domain.JsonData;
import net.xdclass.base_project.repository.ArticleRepository;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/v1/article")
public class ArticleController {

	@Autowired
	private ArticleRepository articleRepository;
	
	@GetMapping("save")
	public Object save(long id,String title){
		Article article = new Article();
		article.setId(id);
		article.setPv(123);
		article.setContent("springboot整合elasticsearch,这个是新版本 2018年录制");
		article.setTitle(title);
		article.setSummary("搜索框架整合");
		articleRepository.save(article);
		return JsonData.buildSuccess();
	}
	
	@GetMapping("search")
	public Object search(String title){
		// 搜索全部文档
		// QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
		// 单个匹配,搜索标题为 title 的文档
		QueryBuilder queryBuilder = QueryBuilders.matchQuery("title", title); 
		Iterable
list = articleRepository.search(queryBuilder); return JsonData.buildSuccess(list); } }
复制
  1. 查看 ElasticSearch 中存放的数据: 查看索引信息:http://localhost:9200/_cat/indices?v 查看某个索引库结构:http://localhost:9200/blog 查看某个对象:http://localhost:9200/blog/article/1
2、SpringBoot 与 ActiveMQ 的整合

(1)JMS(Java Message Service,Java 消息服务),Java 平台中关于面向消息中间件的接口,是一种与厂商无关的 API,用来访问消息、收发系统消息,它类似于JDBC(Java Database Connectivity)。这里,JDBC 是可以用来访问许多不同关系型数据库的接口。 示例:微信支付加消息队列:

  1. 基本概念:
  • JMS提供者:Apache ActiveMQ、RabbitMQ、Kafka、Notify、MetaQ、RocketMQ
  • JMS生产者(Message Producer)
  • JMS消费者(Message Consumer)
  • JMS消息
  • JMS队列
  • JMS主题
  1. 编程模型(MQ中需要用的一些类):
  • ConnectionFactory:连接工厂,JMS 用它创建连接
  • Connection:JMS 客户端到 JMS Provider 的连接
  • Session:一个发送或接收消息的线程
  • Destination:消息的目的地,消息发送给谁.
  • MessageConsumer / MessageProducer:消息的接收者,消费者
  • JMS消息通常有两种类型:点对点(Point-to-Point)、发布/订阅(Publish/Subscribe)
  • 点对点类型:
  • 发布/订阅类型:
  1. 特点: 跨平台、多语言、多项目、解耦、分布式事务、流量控制、最终一致性、RPC调用(上下游对接,数据源变动后通知下游)

(2)ActiveMQ 5.x 消息队列

  • ActiveMQ 5.x 特点:
  1. 支持来自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各种跨语言客户端和协议
  2. 支持许多高级功能,如消息组,虚拟目标,通配符和复合目标
  3. 完全支持JMS 1.1和J2EE 1.4,支持瞬态,持久,事务和XA消息
  4. Spring支持,ActiveMQ可以轻松嵌入到Spring应用程序中,并使用Spring的XML配置机制进行配置
  5. 支持在流行的J2EE服务器(如TomEE,Geronimo,JBoss,GlassFish和WebLogic)中进行测试
  6. 使用JDBC和高性能日志支持非常快速的持久化
  • ActiveMQ 5.x 安装:
  1. 下载地址:http://activemq.apache.org/activemq-5153-release.html
  2. 快速开始:http://activemq.apache.org/getting-started.html
  3. 如果是32位的机器,就双击 win32 目录下的 activemq.bat,如果是 64 位机器,则双击 win64 目录下的 activemq.bat
  4. bin 目录里面启动,选择对应的系统版本和位数,启动命令:activeMQ start
  5. 启动后访问路径:http://127.0.0.1:8161/
  6. 用户名和密码默认都是 admin
  • ActiveMQ 5.x 面板:
  1. Name:队列名称
  2. Number Of Pending Messages:等待消费的消息个数
  3. Number Of Consumers:当前连接的消费者数目
  4. Messages Enqueued:进入队列的消息总个数,包括出队列的和待消费的,这个数量只增不减
  5. Messages Dequeued:已经消费的消息数量

(3)SpringBoot 整合 ActiveMQ 之点对点消息和发布订阅模式

  • pom.xml 中加入 Maven 依赖
## pom.xml


  4.0.0
  net.xdclass
  base_project
  0.0.1-SNAPSHOT
  
	
		org.springframework.boot
		spring-boot-starter-parent
		2.0.1.RELEASE
	
	  
	
		
			org.springframework.boot
			spring-boot-starter-web
		
		
			org.springframework.boot
			spring-boot-devtools
			true
		
		
			org.springframework.boot
			spring-boot-starter-test
			test
		
		
		  
            org.springframework.boot  
            spring-boot-starter-activemq  
        
        
          
            org.apache.activemq  
            activemq-pool  
        
	

	
		
			
				org.springframework.boot
				spring-boot-maven-plugin
			
		
	
复制
  • application.properties 配置
## application.properties

# 整合jms测试,安装在别的机器,防火墙和端口号记得开放
spring.activemq.broker-url=tcp://127.0.0.1:61616

# 集群配置
# spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617)

spring.activemq.user=admin
spring.activemq.password=admin
# 下列配置要增加依赖
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100

# default point to point
# spring.jms.pub-sub-domain=true
复制
  • 各部分代码
## XdclassApplication.java

package net.xdclass.base_project;

import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

@SpringBootApplication
@EnableJms				// 开启支持 JMS
public class XdclassApplication {

	@Bean
	public Topic topic(){
		return new ActiveMQTopic("video.topic");
	}
	
	@Bean
	public Queue queue(){
		return new ActiveMQQueue("common.queue");
	}

	@Bean
	public JmsListenerContainerFactory jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
	}
	
	public static void main(String[] args) {
		SpringApplication.run(XdclassApplication.class, args);
	}
}
复制
## CommonConsumer.java

package net.xdclass.base_project.jms;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class CommonConsumer {

	@JmsListener(destination="common.queue")
	public void receiveQueue(String text){
		System.out.println("CommonConsumer收到的报文为:"+text);
	}
}
复制
## OrderConsumer.java

package net.xdclass.base_project.jms;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class OrderConsumer {

	@JmsListener(destination="order.queue")
	public void receiveQueue(String text){
		System.out.println("OrderConsumer收到的报文为:"+text);
	}
}
复制
## TopicSub.java

package net.xdclass.base_project.jms;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class TopicSub {

	@JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic")
	public void receive1(String text){
		System.out.println("video.topic 消费者:receive1="+text);
	}
	
	@JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic")
	public void receive2(String text){
		System.out.println("video.topic 消费者:receive2="+text);
	}
	
	@JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic")
	public void receive3(String text){
		System.out.println("video.topic 消费者:receive3="+text);
	}
}
复制
## ProducerService.java

package net.xdclass.base_project.service;

import javax.jms.Destination;

/**
 * 功能描述:消息生产
 */
public interface ProducerService {

	/**
	 * 功能描述:指定消息队列,还有消息
	 * @param destination
	 * @param message
	 */
	public void sendMessage(Destination destination, final String message);
	
	/**
	 * 功能描述:使用默认消息队列, 发送消息
	 * @param message
	 */
	public void sendMessage( final String message);
	
	/**
	 * 功能描述:消息发布者
	 * @param msg
	 */
	public void publish(String msg);
}
复制
## ProducerServiceImpl.java

package net.xdclass.base_project.service.impl;

import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.Topic;
import net.xdclass.base_project.service.ProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

/**
 * 功能描述:消息生产者
 */
@Service
public class ProducerServiceImpl implements ProducerService{

	@Autowired
	private Queue queue;
	
	// 用来发送消息到broker的对象
	@Autowired
	private JmsMessagingTemplate jmsTemplate;
	
	// 发送消息,destination是发送到的队列,message是待发送的消息
	@Override
	public void sendMessage(Destination destination, String message) {
		jmsTemplate.convertAndSend(destination, message);
	}
	
	// 发送消息,destination是发送到的队列,message是待发送的消息
	@Override
	public void sendMessage(final String message) {
		jmsTemplate.convertAndSend( message);
	}

	//=======发布订阅相关代码=========
	@Autowired
	private Topic topic;
	
	 @Override
	public void publish(String msg) {
		this.jmsTemplate.convertAndSend(this.topic, msg);
	}
}
复制
## OrderController.java

package net.xdclass.base_project.controller;

import javax.jms.Destination;
import net.xdclass.base_project.domain.JsonData;
import net.xdclass.base_project.service.ProducerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 功能描述:模拟微信支付回调
 */
@RestController
@RequestMapping("/api/v1")
public class OrderController {
	
	@Autowired
	private ProducerService producerService;
	
	/**
	 * 功能描述:微信支付回调接口
	 * @param msg 支付信息
	 * @return
	 */
	@GetMapping("order")
	public Object order(String msg){
		Destination destination = new ActiveMQQueue("order.queue");
		producerService.sendMessage(destination, msg);
       	return JsonData.buildSuccess();
	}
	
	@GetMapping("common")
	public Object common(String msg){
		producerService.sendMessage(msg);	
       return JsonData.buildSuccess();
	}
	
	/**
	 * 功能描述:微信支付回调接口
	 * @param msg 支付信息
	 * @return
	 */
	@GetMapping("comment")
	public Object comment(String msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException{
	  
      // 创建一个消息实例,包含 topic、tag 和 消息体
      Message message = new Message("commentTopic","add", msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
      //同步的方式,会有返回结果,发送的是普通消息
      SendResult result = msgProducer.getProducer().send(message);
      System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
      return JsonData.buildSuccess();
	}
}
复制
  • 模拟请求:http://localhost:8080/api/v1/order?msg=12312321321312
3、SpringBoot 与 RocketMQ 的整合

(1)RocketMQ Apache RocketMQ 是阿里开源的一款高性能、高吞吐量的分布式消息中间件。 特点:

  • 在高压下 1 毫秒内的响应延迟超过99.6%
  • 适合金融类业务,高可用性跟踪和审计功能
  • 支持发布订阅模式和点对点模式
  • 支持拉 pull 和推 push 两种消息模式
  • 单一队列支持百万消息访问量
  • 支持单 master 节点,多 master 节点,多 master 多 slave 节点

概念:

  • Producer:消息生产者
  • Producer Group:消息生产者组,发送同类消息的一个消息生产者组
  • Consumer:消费者
  • Consumer Group:消费同个消息的多个实例
  • Tag:标签,子主题(二级分类),用于区分同一个主题下的不同业务的消息
  • Topic:主题
  • Message:消息
  • Broker:MQ程序,接收生产的消息,提供给消费者消费的程序
  • Name Server:给生产和消费者提供路由信息,提供轻量级的服务发现和路由

官网地址:http://rocketmq.apache.org/。

(2)RocketMQ 本地部署

  • 解压压缩包
  • 进入 bin 目录,启动 namesrv:
nohup sh mqnamesrv &
复制
  • 查看日志:
tail -f nohup.out
复制

结尾:The Name Server boot success. serializeType=JSON 表示启动成功。

  • 启动 broker:
nohup sh mqbroker -n 127.0.0.1:9876 &
复制
  • 关闭 nameserver broker:
sh mqshutdown namesrv
sh mqshutdown broker
复制

(3)RocketMQ 可视化控制台

  • 下载源码:https://github.com/apache/rocketmq-externals
  • 编译打包:
mvn clean package -Dmaven.test.skip=true
复制
  • target 目录通过java -jar的方式运行
  • 如果无法连接获取 broker 的信息,则修改配置文件中 namesrvAddr 为本地 IP 地址及端口号,并在程序中 src/main/resources/application.properties 添加如下配置:
rocketmq.config.namesrvAddr=192.168.0.101:9876
复制
  • 访问地址:http://localhost:8080
  • 注意:在阿里云,腾讯云或者虚拟机,记得检查端口号和防火墙是否启动

(4)SpringBoot 整合 RocketMQ

## pom.xml


  4.0.0
  net.xdclass
  base_project
  0.0.1-SNAPSHOT
  
	
		org.springframework.boot
		spring-boot-starter-parent
		2.0.1.RELEASE
	

	
		4.1.0-incubating
	
	  
	
		
			org.springframework.boot
			spring-boot-starter-web
		
		
			org.springframework.boot
			spring-boot-devtools
			true
		
		
			org.springframework.boot
			spring-boot-starter-test
			test
		
		
		  
	    	org.apache.rocketmq  
	    	rocketmq-client  
	    	${rocketmq.version}  
		  
		  
	    	org.apache.rocketmq  
	    	rocketmq-common  
	    	${rocketmq.version}  
		
	

	
		
			
				org.springframework.boot
				spring-boot-maven-plugin
			
		
	
复制
## application.properties

#通过触发器,去控制什么时候进行热加载部署新的文件
spring.devtools.restart.trigger-file=trigger.txt

#自定义启动banner文件的路径
spring.banner.location=banner.txt

# 消费者的组名
apache.rocketmq.consumer.PushConsumer=orderConsumer
# 生产者的组名
apache.rocketmq.producer.producerGroup=Producer
# NameServer地址
apache.rocketmq.namesrvAddr=127.0.0.1:9876
复制
## MsgProducer.java

package net.xdclass.base_project.jms;

import javax.annotation.PostConstruct;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class MsgProducer {
	 // 生产者的组名
    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;

    // NameServer 地址
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    private  DefaultMQProducer producer ;
    	
    public DefaultMQProducer getProducer(){
    	return this.producer;
    }
        
    @PostConstruct
    public void init() {
        //生产者的组名
    	producer = new DefaultMQProducer(producerGroup);
        //指定NameServer地址,多个地址以 ; 隔开
    	//如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876"); 
        producer.setNamesrvAddr(namesrvAddr);        
        producer.setVipChannelEnabled(false);
        
        try {
            // Producer对象在使用之前必须要调用start初始化,只能初始化一次
            producer.start();
        } catch (Exception e) {
            e.printStackTrace();
        } 
        
        // producer.shutdown();  一般在应用上下文,关闭的时候进行关闭,用上下文监听器
    }
}
复制
## MsgConsumer.java

package net.xdclass.base_project.jms;

import javax.annotation.PostConstruct;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class MsgConsumer {
    // 消费者的组名
    @Value("${apache.rocketmq.consumer.PushConsumer}")
    private String consumerGroup;

    // NameServer 地址
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @PostConstruct
    public void defaultMQPushConsumer() {
        //消费者的组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        //指定NameServer地址,多个地址以 ; 隔开
        consumer.setNamesrvAddr(namesrvAddr);
        
        try {
            //设置consumer所订阅的Topic和Tag,*代表全部的Tag
            consumer.subscribe("testTopic", "*");
          
            //CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,跳过历史消息
            //CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                       
            //MessageListenerOrderly 这个是有序的
            //MessageListenerConcurrently 这个是无序的,并行的方式处理,效率高很多
            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                try {
                    for (MessageExt messageExt : list) {           
                        System.out.println("messageExt: " + messageExt);//输出消息内容
                        String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);                        
                        System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);//输出消息内容
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
            });            
            
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
复制
## OrderController.java

package net.xdclass.base_project.controller;

import java.io.UnsupportedEncodingException;
import net.xdclass.base_project.domain.JsonData;
import net.xdclass.base_project.jms.MsgProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 功能描述:模拟微信支付回调
 */
@RestController
@RequestMapping("/api/v1")
public class OrderController {
	
	@Autowired
	private MsgProducer msgProducer;
	
	/**
	 * 功能描述:微信支付回调接口
	 * @param msg 支付信息
	 * @param tag 消息二级分类
	 * @return
	 */
	@GetMapping("order")
	public Object order(String msg, String tag) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException{
	  
	   // 创建一个消息实例,包含 topic、tag 和 消息体
       Message message = new Message("testTopic",tag, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));       
       SendResult result = msgProducer.getProducer().send(message);       
       System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());     
       return JsonData.buildSuccess();
	}

	/**
	 * 功能描述:微信支付回调接口
	 * @param msg 支付信息
	 * @return
	 */
	@GetMapping("comment")
	public Object comment(String msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException{
	  
	  // 创建一个消息实例,包含 topic、tag 和 消息体
      Message message = new Message("commentTopic","add", msg.getBytes(RemotingHelper.DEFAULT_CHARSET));      
      //同步的方式,会有返回结果,发送的是普通消息
      SendResult result = msgProducer.getProducer().send(message);      
      System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());    
      return JsonData.buildSuccess();
	}
}
复制
  • 报错一:org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [497]ms, Topic: TopicTest, BrokersSent: [chenyaowudeMacBook-Air.local, chenyaowudeMacBook-Air.local, chenyaowudeMacBook-Air.local] 解决方案:多网卡问题处理。
  1. 在 producer 部分设置:
producer.setVipChannelEnabled(false);
复制
  1. 编辑 RocketMQ 配置文件:broker.conf(下列 IP 为自己的 IP)
namesrvAddr = 192.168.0.101:9876
brokerIP1 = 192.168.0.101
复制
  • 报错二:DESC: service not available now, maybe disk full, CL: 解决方案:修改启动脚本 runbroker.sh,在里面增加一句话即可:
JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98"
复制

即将磁盘保护的百分比设置成 98%(默认 0.9),只有磁盘空间使用率达到 98% 时才拒绝接收 producer 消息。

4、SpringBoot 多环境配置
  • 不同环境使用不同配置,例如数据库配置,在开发的时候,我们一般用开发数据库,而在生产环境的时候,我们用正式的数据库
  • 配置文件存放路径,可以存放在 classpath 根目录的 “/config” 包下,也可以存放在 classpath 的根目录下
  • SpringBoot 允许通过命名约定按照一定的格式(application-{profile}.properties)来定义多个配置文件
## application-dev.properties

test.url=dev.com
复制
## application-test.properties

test.url=test.com
复制
## application.properties

test.url=local

#指定哪个profile
#spring.profiles.active=dev
复制
## OrderController.java

package net.xdclass.base_project.controller;

import net.xdclass.base_project.domain.JsonData;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/v1")
public class OrderController {
	
	@Value("${test.url}")
	private String domain;

	/**
	 * 功能描述:微信支付回调接口
	 * @param msg 支付信息
	 * @return
	 */
	@GetMapping("order")
	public Object order(String msg){
       return JsonData.buildSuccess(domain);
	}	
}
复制5、SpringBoot 响应式框架 WebFlux
  • Spring WebFlux 是 Spring Framework 5.0 中引入的新的反应式 Web 框架,与 SpringMVC 不同,它不需要 Servlet API,完全异步和非阻塞,并通过 Reactor 实现 Reactive Streams规范。
  • Flux 和 Mono 就简单业务而言:和其他普通对象差别不大,而对复杂请求业务,就可以提升性能。
  • Mono 表示的是包含 0 或者 1 个元素的异步序列,Mono表示单一对象 User Flux 表示的是包含 0 到 N 个元素的异步序列,Flux表示ListFlux 和 Mono 之间可以进行转换。
  • Spring WebFlux 有两种风格:基于功能和基于注解的。基于注解非常接近 SpringMVC 模型。
  • Spring WebFlux 项目不严格依赖于 Servlet API,因此不能作为 war 文件部署,也不能使用 src/main/webapp 目录。
  • Spring WebFlux 可以整合多个模板引擎,除了 REST Web 服务外,还可以使用 Spring WebFlux 提供动态 HTML 内容。Spring WebFlux 支持各种模板技术,包括Thymeleaf,FreeMarker。
  • WebFlux 中,请求和响应不再是 WebMVC 中的 ServletRequest 和 ServletResponse,而是 ServerRequest 和 ServerResponse。
  • pom.xml 中的依赖,如果同时存在 spring-boot-starter-web,则会优先用 spring-boot-starter-web。
## pom.xml


  4.0.0
  net.xdclass
  base_project
  0.0.1-SNAPSHOT
  
	
		org.springframework.boot
		spring-boot-starter-parent
		2.0.1.RELEASE
	  
				
		
		
			org.springframework.boot
			spring-boot-starter-webflux
		
		
			org.springframework.boot
			spring-boot-starter-test
			test
		
	

	
		
			
				org.springframework.boot
				spring-boot-maven-plugin
			
		
	
复制
## UserService.java

package net.xdclass.base_project.service;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import net.xdclass.base_project.domain.User;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public class UserService {
	
	private static final Map dataMap = new HashMap<>();
	
	static{
		dataMap.put("1", new User("1", "小X老师"));
		dataMap.put("2", new User("2", "小D老师"));
		dataMap.put("3", new User("3", "小C老师"));
		dataMap.put("4", new User("4", "小L老师"));
		dataMap.put("5", new User("5", "小A老师"));
		dataMap.put("6", new User("6", "小S老师"));
		dataMap.put("7", new User("7", "小S老师"));
	}
	
	/**
	 * 功能描述:返回用户列表
	 * @return
	 */
	public Flux list(){
		Collection list = UserService.dataMap.values();
		return Flux.fromIterable(list);
	}
	
	/**
	 * 功能描述:根据id查找用户
	 * @param id
	 * @return
	 */
	public Mono getById(final String id){
		return Mono.justOrEmpty(UserService.dataMap.get(id));
	}
	
   /**
    * 功能描述:根据id删除用户
    * @param id
    * @return
    */
	public Mono del(final String id){
		return Mono.justOrEmpty(UserService.dataMap.remove(id));
	}
}
复制
## UserController.java

package net.xdclass.base_project.controller;

import java.time.Duration;
import net.xdclass.base_project.domain.User;
import net.xdclass.base_project.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/api/v1/user")
public class UserController {
	
	//@Autowired
	//private UserService userService;
	
	private final UserService userService;
	
	public UserController(final UserService userService) {
		this.userService = userService;
	}
	
	@GetMapping("/test")
	public Mono test(){
		return Mono.just("hello 小D课堂");
	}
	
	/**
	 * 功能描述:根据id找用户
	 * @param id
	 * @return
	 */
	@GetMapping("find")
	public Mono findByid(final String id){
		return userService.getById(id);
	}
	
	/**
	 * 功能描述:删除用户
	 * @param id
	 * @return
	 */
	@GetMapping("del")
	public Mono del(final String id){
		return userService.del(id);
	}
	
	/**
	 * 功能描述:列表
	 * @return
	 */
	@GetMapping(value="list",produces=MediaType.APPLICATION_STREAM_JSON_VALUE)
	public Flux list(){
		return userService.list().delayElements(Duration.ofSeconds(2));
	}
}
复制
  • 启动方式默认是 Netty,8080端口,访问地址:http://localhost:8080/api/v1/user/test。
  • SpringBoot WebFlux 响应式客户端 WebClient:
## WebClientTest.java

package base_project.base;

import org.junit.Test;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

@RunWith(SpringRunner.class)  							// 底层用junit  SpringJUnit4ClassRunner
@SpringBootTest(classes={XdclassApplication.class})		// 启动整个springboot工程
public class WebClientTest {

	@Test
  	public void testBase(){
    	Mono resp = WebClient.create()
        	.get()
        	// 多个参数也可以直接放到map中,参数名与placeholder对应上即可
        	.uri("http://localhost:8080/api/v1/user/find?id=1")
        	.accept(MediaType.APPLICATION_JSON)
        	.retrieve()
        	.bodyToMono(String.class);
    	System.out.println(resp.block());
  	}
	
	@Test
 	public void testPlaceHolder(){
    	Mono resp = WebClient.create()
        	.get()
        	// 使用占位符
        	.uri("http://localhost:8080/api/v1/user/find?id={id}",1) 
        	.accept(MediaType.APPLICATION_JSON)
        	.retrieve()
        	.bodyToMono(String.class);
    	System.out.println(resp.block());	 
	}
}
复制6、SpringBoot 服务器端主动推送 SSE 技术

服务器端常用推送技术介绍:Ajax 定时拉取,WebSocket,SSE 轮询

  1. 客户端轮询:Ajax 定时拉取
  2. 服务端主动推送:WebSocket
  • 全双工的,本质上是一个额外的 TCP 连接,建立和关闭时握手使用 HTTP 协议,其他数据传输不使用 HTTP 协议
  • 更加复杂一些,适用于需要进行复杂双向数据通讯的场景
  1. 服务端主动推送:SSE(Server Send Event)
  • HTML5 新标准,用来从服务端实时推送数据到浏览器端
  • 直接建立在当前 HTTP 连接上,本质上是保持一个 HTTP 长连接,轻量协议
  • 简单的服务器数据推送的场景,使用服务器推送事件
## SSEController.java

package net.xdclass.base_project.controller;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/sse")
public class SSEController {
	// 需要把 response 的类型改为 text/event-stream,才是 SSE 的类型
    @RequestMapping(value = "/get_data", produces = "text/event-stream;charset=UTF-8")
    public String push() {
    	  
          try {
              Thread.sleep(1000); 
              //第三方数据源调用
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
          return "data:xdclass 行情" + Math.random() + "\n\n";
    }
}
复制
## index.html



	
		
		Insert title here
		  
	
	
		模拟股票行情
 		
xdclass test
复制

访问地址:http://localhost:8080/index.html

7、SpringBoot 阿里云服务器生产环境部署
  1. 去除相关生产环境不需要的 jar 包,比如热部署 dev-tool。
## pom.xml


  4.0.0
  net.xdclass
  base_project
  0.0.1-SNAPSHOT
  
	
		org.springframework.boot
		spring-boot-starter-parent
		2.0.1.RELEASE
	
	  
	
		
			org.springframework.boot
			spring-boot-starter-web
		
	

	
		
			
				org.springframework.boot
				spring-boot-maven-plugin
			
		
	
复制
  1. 本地 maven 打包成 jar 包
mvn clean package -Dmaven.test.skip=true 跳过测试
复制
  1. 打包指定配置文件
  • 使用 Maven 的 profiles
  • 使用 SpringBoot 的 profile=active
  1. 服务器端安装及配置 JDK
  • Linux 下使用 wget 下载 JDK8。
  • 配置环境变量:
vim /etc/profile
	
export JAVA_HOME=/usr/local/software/jdk8
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME PATH CLASSPATH
复制
  • 使用source /etc/profile,让配置立刻生效。
  1. 向服务器上传 jar 包。上传工具:(windows)WinSCP、SecurityCRT、(mac)Filezilla。
  2. 远程登录服务器:ssh [email protected]
  3. 运行 jar 包:java -jar xxxx.jar,运行方式可以是:守护进程、系统服务、shell 脚本。
  4. 访问路径:http://120.79.160.143:8080/api/v1/user/find。
  5. 服务无法访问的原因:
  • 阿里云防火墙是否开启,可以选择关闭,关闭是不安全的,可以选择开放端口
  • 阿里云的安全访问组,开启对应的端口,如果应用是以 80 端口启动,则默认可以访问

成熟的互联网公司应该有的架构:

  1. 本地提交生产代码到 gitlab仓库
  2. Jenkins 自动化构建
  3. 运维或者开发人员发布
8、SpringBoot Actuator 监控平台
  • Spring Boot包含许多附加功能,可帮助您在将应用程序投入生产时监视和管理应用程序。 可以选择使用 HTTP 端点或 JMX 来管理和监控您的应用程序,自动应用于审计,健康和指标收集,一句话,SpringBoot 提供用于监控和管理生产环境的模块。
  • pom.xml 中加入依赖

  
    org.springframework.boot  
    spring-boot-starter-actuator  
复制
  • 出于安全考虑,除 /actuator/health 和 /actuator/info 之外的所有执行器默认都是禁用的。management.endpoints.web.exposure.include 属性可用于启用执行器。 在 application.properties 中加入如下配置:
#开启监控端点
management.endpoints.web.exposure.include=*
复制
  • 在设置 management.endpoints.web.exposure.include 之前,要确保暴露的执行器不包含敏感信息,或通过将其放置在防火墙进行控制,不对外进行使用。禁用的端点将从应用程序上下文中完全删除,可以只暴露部分访问接口:
management.endpoints.web.exposure.include=*				// 开启全部
management.endpoints.web.exposure.include=metrics		// 开启某个
management.endpoints.web.exposure.exclude=metrics		// 关闭某个
复制
  • 或者可以使用 Spring-Boot-Admin 进行管理,相关资料:https://www.cnblogs.com/ityouknow/p/8440455.html。
  • 或者自己编写脚本监控 CPU、内存、磁盘使用状况和 Nginx 服务器的 HTTP 响应状态码:200,404,5xx。
  • 常用的 URL: /actuator/health:查看应用健康指标 /actuator/metrics:查看应用基本指标列表 /actuator/metrics/{name}:通过上述列表查看具体指标 /actuator/env:显示来自 Spring 的 ConfigurableEnvironment 属性值
9、后端开发技术栈

相关知识