A Hands-On Guide to Upgrading a Single-Tenant Project to a SaaS Architecture
In past projects, a system often starts out serving a single customer (tenant), because that is all the business requires at first. As the business and the customer base grow, problems follow. The typical symptoms:
High cost: a single-tenant project needs a dedicated software instance and infrastructure for every customer, so it consumes more hardware and software resources and costs more.
Hard to maintain: as the number of customers grows, maintaining a single-tenant project gets harder and harder. Each customer has its own customization needs, so each customer's copy of the software must be customized and maintained separately, which takes significant people and time.
Poor scalability: when a single-tenant codebase is stretched to serve many customers, performance and scalability problems appear; every customer adds load, and system performance degrades.
Poor data isolation: if, to save cost, several customers end up sharing one software instance and infrastructure, data isolation becomes weak; one customer could reach another customer's data, increasing the risk of leaks and security incidents.
Little room for customization: a single-tenant project usually ships only generic features and configuration, making it hard to satisfy each customer's unique needs; heavy customization development can cost a great deal of time and resources.
So how do we make one codebase serve different tenants, keep the system stable and efficient, and do it with minimal changes to the existing code (including multi-tenant support for Redis and RocketMQ)?
Customer A currently uses the project, which includes a user query feature. The current project structure:
study
|-- study-api # internal API module
|-- study-service # internal service module
|-- study-web # external interface module
|-- study-common # common package
There is currently one database, study_tenant1, containing the table class_user_info:
CREATE TABLE `class_user_info` (
`id` bigint(20) NOT NULL,
`user_name` varchar(30) NOT NULL,
`password` varchar(10) NOT NULL,
`user_type` char(1) NOT NULL,
`mobile` varchar(11) NOT NULL,
`salt` varchar(30) NOT NULL,
`created_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`created_by` varchar(30) NOT NULL,
`updated_by` varchar(30) NOT NULL,
`updated_time` datetime NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Add study_tenant2 as customer B's database, with the same schema as study_tenant1, and add a shared tenant database, study_common, with the following new tables:
-- tenant table
CREATE TABLE `study_tenant` (
`id` bigint(20) NOT NULL,
`tenant_name` varchar(20) NOT NULL,
`tenant_code` varchar(20) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- tenant domain table
CREATE TABLE `study_tenant_domain` (
`id` bigint(20) NOT NULL,
`tenant_code` varchar(20) NOT NULL,
`domain` varchar(100) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Add a study-datasource module to handle the multiple datasources:
study
|-- study-api # internal API module
|-- study-service # internal service module
|-- study-web # external interface module
|-- study-common # common package
|-- study-datasource # multi-datasource handling module
Before adding the configuration classes, the original single-tenant Nacos configuration must first be changed, as follows:
spring:
  redis:
    study_common:
      host: 127.0.0.1
      port: 6379
      database: 1
    study_tenant1:
      host: 127.0.0.1
      port: 6379
      database: 2
    study_tenant2:
      host: 127.0.0.1
      port: 6379
      database: 3
  datasource:
    study_common:
      url: jdbc:mysql://192.168.0.168:3306/study_common?useUnicode=true&characterEncoding=utf-8&useSSL=false&autoReconnect=true&allowMultiQueries=true&serverTimezone=GMT%2B8
      username: root
      password: root
      type: com.zaxxer.hikari.HikariDataSource
      hikari:
        connection-init-sql: SET NAMES utf8mb4 COLLATE utf8mb4_unicode_ci
        minimum-idle: 10
        maximum-pool-size: 15
        idle-timeout: 30000
        max-lifetime: 180000
        connection-timeout: 30000
        connection-test-query: select 'connected'
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
    study_tenant1:
      url: jdbc:mysql://192.168.0.168:3306/study_tenant1?useUnicode=true&characterEncoding=utf-8&useSSL=false&autoReconnect=true&allowMultiQueries=true&serverTimezone=GMT%2B8
      username: root
      password: root
      type: com.zaxxer.hikari.HikariDataSource
      hikari:
        connection-init-sql: SET NAMES utf8mb4 COLLATE utf8mb4_unicode_ci
        minimum-idle: 10
        maximum-pool-size: 15
        idle-timeout: 30000
        max-lifetime: 180000
        connection-timeout: 30000
        connection-test-query: select 'connected'
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
    study_tenant2:
      url: jdbc:mysql://192.168.0.168:3306/study_tenant2?useUnicode=true&characterEncoding=utf-8&useSSL=false&autoReconnect=true&allowMultiQueries=true&serverTimezone=GMT%2B8
      username: root
      password: root
      type: com.zaxxer.hikari.HikariDataSource
      hikari:
        connection-init-sql: SET NAMES utf8mb4 COLLATE utf8mb4_unicode_ci
        minimum-idle: 10
        maximum-pool-size: 15
        idle-timeout: 30000
        max-lifetime: 180000
        connection-timeout: 30000
        connection-test-query: select 'connected'
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
rocketmq:
  tenant:
    study_common: 127.0.0.1:9876
    study_tenant1: 127.0.0.1:9876
    study_tenant2: 127.0.0.1:9876
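The spring.datasource, spring.redis, and rocketmq.tenant maps above are bound by hand-rolled properties classes (MultiTenantDbProperties, MultiRedisProperties, MultiMqProperties) that the configuration classes below inject but that the article does not list. As a sketch of the shape, here is a hypothetical version of the MQ one; the field name is an assumption inferred from the `multiMqProperties.getTenant()` calls below:

```java
import java.util.Map;

// Sketch of the binding class behind the rocketmq.tenant block above.
// In the real project this would carry @ConfigurationProperties(prefix = "rocketmq").
public class MultiMqProperties {

    // tenantCode -> RocketMQ name-server address, e.g. study_tenant1 -> 127.0.0.1:9876
    private Map<String, String> tenant;

    public Map<String, String> getTenant() {
        return tenant;
    }

    public void setTenant(Map<String, String> tenant) {
        this.tenant = tenant;
    }
}
```

The DB and Redis variants would follow the same pattern, holding a Map of MultiDbProperties and RedisProperties per tenant key. Next, add the dynamic datasource routing class: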
package com.jw.config.mysql;
import com.jw.DataSourceHolder;
import com.jw.utils.DsUtils;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import java.util.Map;
@Configuration
@DependsOn("dsInit")
public class DynamicDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
// The routing key is the tenant code held in the thread-local;
// the empty string falls back to the common library (study_common).
Object o = DataSourceHolder.peek();
return o == null ? "" : o;
}
public DynamicDataSource() {
Map<Object,Object> map = DsUtils.getDsMap();
super.setTargetDataSources(map);
}
}
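DynamicDataSource resolves its routing key through DataSourceHolder, a small thread-local holder used throughout this article but not listed. A minimal sketch, assuming stack semantics so that push/peek/clear match the calls made by the web interceptor and MQ listeners below (a plain ThreadLocal&lt;String&gt; would also work):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Thread-local holder for the current tenant code (the routing key).
// push/peek/clear mirror the calls made elsewhere in this article;
// the stack semantics here are an assumption of this sketch.
public final class DataSourceHolder {

    private static final ThreadLocal<Deque<String>> HOLDER =
            ThreadLocal.withInitial(ArrayDeque::new);

    private DataSourceHolder() {
    }

    public static void push(String tenantCode) {
        HOLDER.get().push(tenantCode);
    }

    public static String peek() {
        Deque<String> stack = HOLDER.get();
        return stack.isEmpty() ? null : stack.peek();
    }

    public static void clear() {
        // Remove the whole entry to avoid leaks on pooled threads.
        HOLDER.remove();
    }
}
```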
The datasource loading configuration class:
package com.jw.config.mysql;
import com.alibaba.nacos.common.utils.MapUtil;
import com.jw.properties.MultiDbProperties;
import com.jw.properties.MultiTenantDbProperties;
import com.jw.utils.DsUtils;
import com.zaxxer.hikari.HikariDataSource;
import jakarta.annotation.Resource;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.*;
/**
* @author :j
* @date :Created in 2021/4/10 16:29
* @description:
*/
@Configuration
@EnableConfigurationProperties(MultiTenantDbProperties.class)
public class DataSourceConfiguration {
@Resource
private MultiTenantDbProperties multiTenantDbProperties;
@Bean("dsInit")
public void setDataSource() throws Exception {
try {
Map<String, MultiDbProperties> map = multiTenantDbProperties.getDatasource();
if (MapUtil.isNotEmpty(map)) {
Iterator<String> iterator = map.keySet().iterator();
while(iterator.hasNext()){
String key = iterator.next();
MultiDbProperties multiDbProperties = map.get(key);
HikariDataSource hikariDataSource = getDataSource(multiDbProperties);
if (Objects.equals(key, multiTenantDbProperties.getCommon_db())) {
DsUtils.put("", hikariDataSource);
}else {
DsUtils.put(key, hikariDataSource);
}
}
}
} catch (Exception e) {
throw new Exception("Failed to load datasources", e);
}
}
public HikariDataSource getDataSource(MultiDbProperties multiDbProperties ) {
HikariDataSource hikariDataSource = new HikariDataSource();
hikariDataSource.setJdbcUrl(multiDbProperties.getUrl());
hikariDataSource.setUsername(multiDbProperties.getUsername());
String password = multiDbProperties.getPassword();
hikariDataSource.setPassword(password);
hikariDataSource.setConnectionInitSql(multiDbProperties.getHikari().getConnectionInitSql());
hikariDataSource.setMinimumIdle(multiDbProperties.getHikari().getMinimumIdle());
hikariDataSource.setMaximumPoolSize(multiDbProperties.getHikari().getMaximumPoolSize());
hikariDataSource.setIdleTimeout(multiDbProperties.getHikari().getIdleTimeout());
hikariDataSource.setMaxLifetime(multiDbProperties.getHikari().getMaxLifetime());
hikariDataSource.setConnectionTimeout(multiDbProperties.getHikari().getConnectionTimeout());
hikariDataSource.setConnectionTestQuery(multiDbProperties.getHikari().getConnectionTestQuery());
return hikariDataSource;
}
}
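DataSourceConfiguration hands each pool to DsUtils, which is also not listed in the article. It only needs to be a static registry that DynamicDataSource drains at construction time; a sketch under that assumption (the empty-string key holds the common library, matching the "" fallback in determineCurrentLookupKey):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import javax.sql.DataSource;

// Static registry of tenantCode -> DataSource, filled once at startup by
// DataSourceConfiguration and read by DynamicDataSource's constructor.
// The article only shows DsUtils.put(...) and DsUtils.getDsMap() being called.
public final class DsUtils {

    // Filled single-threaded at startup, so a plain map is enough here.
    private static final Map<Object, Object> DS_MAP = new LinkedHashMap<>();

    private DsUtils() {
    }

    public static void put(String tenantCode, DataSource dataSource) {
        DS_MAP.put(tenantCode, dataSource);
    }

    public static Map<Object, Object> getDsMap() {
        return DS_MAP;
    }
}
```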
Add the Redis multi-tenant support configuration classes:
package com.jw.config.redis;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Maps;
import com.jw.properties.MultiRedisProperties;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import java.util.Iterator;
import java.util.Map;
@Configuration
@EnableConfigurationProperties(MultiRedisProperties.class)
public class RedisCustomizedConfiguration {
/**
* @param multiRedisProperties
* @return
*/
@Bean("multiRedisLettuceConnectionFactory")
public MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory(MultiRedisProperties multiRedisProperties) throws Exception{
//read the per-tenant Redis configuration
Map<String, LettuceConnectionFactory> connectionFactoryMap = Maps.newConcurrentMap();
Map<String, RedisProperties> multi = multiRedisProperties.getRedis();
if(MapUtil.isNotEmpty(multi)) {
try {
Iterator<String> iterator = multi.keySet().iterator();
while (iterator.hasNext()) {
String key = iterator.next();
RedisProperties redisProperties = multi.get(key);
RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration();
configuration.setHostName(redisProperties.getHost());
configuration.setPort(redisProperties.getPort());
configuration.setDatabase(redisProperties.getDatabase());
if (StrUtil.isNotEmpty(redisProperties.getPassword())) {
configuration.setPassword(redisProperties.getPassword());
}
LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(configuration);
connectionFactory.afterPropertiesSet();
// cache a connection factory per tenant
connectionFactoryMap.put(key, connectionFactory);
}
} catch (Exception e) {
throw new Exception(e);
}
}
return new MultiRedisLettuceConnectionFactory(connectionFactoryMap);
}
}
package com.jw.config.redis;
import com.alibaba.cloud.commons.lang.StringUtils;
import com.jw.DataSourceHolder;
import com.jw.properties.MultiRedisProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import java.util.Map;
@Slf4j
public class MultiRedisLettuceConnectionFactory implements RedisConnectionFactory {
private final Map<String, LettuceConnectionFactory> connectionFactoryMap;
public MultiRedisLettuceConnectionFactory(Map<String, LettuceConnectionFactory> connectionFactoryMap) {
this.connectionFactoryMap = connectionFactoryMap;
}
private LettuceConnectionFactory currentLettuceConnectionFactory() {
// Route to the tenant's factory when a tenant is bound to the thread, otherwise the default.
String currentRedis = DataSourceHolder.peek();
if (StringUtils.isNotBlank(currentRedis)) {
return connectionFactoryMap.get(currentRedis);
}
return connectionFactoryMap.get(MultiRedisProperties.DEFAULT);
}
@Override
public RedisConnection getConnection() {
return currentLettuceConnectionFactory().getConnection();
}
@Override
public RedisClusterConnection getClusterConnection() {
return currentLettuceConnectionFactory().getClusterConnection();
}
@Override
public boolean getConvertPipelineAndTxResults() {
return currentLettuceConnectionFactory().getConvertPipelineAndTxResults();
}
@Override
public RedisSentinelConnection getSentinelConnection() {
return currentLettuceConnectionFactory().getSentinelConnection();
}
@Override
public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
return currentLettuceConnectionFactory().translateExceptionIfPossible(ex);
}
}
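With the routing factory in place, templates are wired over it exactly as over a single connection factory. The article does not show this wiring, so the following is a sketch under that assumption (class name hypothetical, bean name matching the factory registered above):

```java
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;

// Hypothetical wiring: every operation on this template resolves the tenant's
// connection factory at call time via DataSourceHolder, falling back to the default.
@Configuration
public class MultiRedisTemplateConfiguration {

    @Bean
    public StringRedisTemplate stringRedisTemplate(
            @Qualifier("multiRedisLettuceConnectionFactory") MultiRedisLettuceConnectionFactory factory) {
        return new StringRedisTemplate(factory);
    }
}
```

Because MultiRedisLettuceConnectionFactory implements RedisConnectionFactory, calling code does not change. Next, the RocketMQ consumers are initialized, one per tenant name server.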
package com.jw.config.rocketmq;
import cn.hutool.core.map.MapUtil;
import com.google.common.collect.Maps;
import com.jw.dto.MQProperties;
import com.jw.context.ApplicationContextHelper;
import com.jw.listener.AbstractBroadCastListener;
import com.jw.properties.MultiMqProperties;
import jakarta.annotation.Resource;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import java.util.Iterator;
import java.util.Map;
@Configuration
public class ConsumerBroadCastInitializer {
private final static Logger LOG = LogManager.getLogger(ConsumerBroadCastInitializer.class);
@Resource
private ApplicationContextHelper applicationContextHelper;
@Resource
private MultiMqProperties multiMqProperties;
@Bean(value = "initBroadCastConsumer")
public void initConsumer() throws Exception{
Map<String,String> nameServerMap = Maps.newConcurrentMap();
Map<String, String> map = multiMqProperties.getTenant();
if (MapUtil.isNotEmpty(map)) {
try {
Iterator<String> iterator = map.keySet().iterator();
while (iterator.hasNext()) {
// collect tenantCode -> name-server address
String key = iterator.next();
String host = map.get(key);
nameServerMap.put(key, host);
}
} catch (Exception e) {
throw new Exception(e);
}
Map<String, AbstractBroadCastListener> consumerListeners = ApplicationContextHelper.getApplicationContext()
.getBeansOfType(AbstractBroadCastListener.class);
if (consumerListeners.size() > 0) {
Iterator<String> iterator = nameServerMap.keySet().iterator();
while (iterator.hasNext()) {
String tenantCode = iterator.next();
String nameServer = nameServerMap.get(tenantCode);
for (AbstractBroadCastListener consumerListener : consumerListeners.values()) {
MQProperties mqProperties = consumerListener.getMQProperties(tenantCode);
String groupName = applicationContextHelper.getApplicationName()
+ "_" + mqProperties.getTopicName() + "_" + mqProperties.getTagsName();
try {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(nameServer);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe(mqProperties.getTopicName(), mqProperties.getTagsName());
consumer.registerMessageListener(consumerListener);
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
}
}
package com.jw.config.rocketmq;
import cn.hutool.core.map.MapUtil;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.jw.context.ApplicationContextHelper;
import com.jw.dto.MQProperties;
import com.jw.listener.AbstractClusterListener;
import com.jw.properties.MultiMqProperties;
import jakarta.annotation.Resource;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@Configuration
@EnableConfigurationProperties(MultiMqProperties.class)
public class ConsumerClusterInitializer{
private final static Logger LOG = LogManager.getLogger(ConsumerClusterInitializer.class);
@Resource
private ApplicationContextHelper applicationContextHelper;
@Resource
private MultiMqProperties multiMqProperties;
@Bean(value = "initClusterConsumer")
public void initConsumer() throws Exception {
Map<String,String> nameServerMap = Maps.newConcurrentMap();
Map<String, String> map = multiMqProperties.getTenant();
if (MapUtil.isNotEmpty(map)) {
try {
Iterator<String> iterator = map.keySet().iterator();
while (iterator.hasNext()) {
// collect tenantCode -> name-server address
String key = iterator.next();
String host = map.get(key);
nameServerMap.put(key, host);
}
} catch (Exception e) {
throw new Exception(e);
}
Map<String, AbstractClusterListener> consumerListeners = ApplicationContextHelper.getApplicationContext()
.getBeansOfType(AbstractClusterListener.class);
if (consumerListeners.size() > 0) {
Iterator<String> iterator = nameServerMap.keySet().iterator();
//The set handles listeners such as settleTenantListener, which must not be initialized once per tenant.
//Their topics are the only ones without a tenant prefix, so the topic name tells us whether a listener has already been registered.
Set<String> set = Sets.newHashSet();
while (iterator.hasNext()) {
String tenantCode = iterator.next();
String nameServer = nameServerMap.get(tenantCode);
for (AbstractClusterListener consumerListener : consumerListeners.values()) {
MQProperties mqProperties = consumerListener.getMQProperties(tenantCode);
if (set.contains(mqProperties.getTopicName())) {
continue;
}
set.add(mqProperties.getTopicName());
String groupName = tenantCode
+ "_" + mqProperties.getTopicName() + "_" + mqProperties.getTagsName();
try {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(nameServer);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe(mqProperties.getTopicName(), mqProperties.getTagsName());
consumer.registerMessageListener(consumerListener);
consumer.setConsumeThreadMin(100);
consumer.setConsumeThreadMax(600);
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
}
}
package com.jw.config.rocketmq;
import cn.hutool.core.map.MapUtil;
import com.google.common.collect.Maps;
import com.jw.context.ApplicationContextHelper;
import com.jw.dto.MQProperties;
import com.jw.listener.AbstractOrderListener;
import com.jw.properties.MultiMqProperties;
import jakarta.annotation.Resource;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import java.util.Iterator;
import java.util.Map;
@Configuration
public class ConsumerOrderInitializer {
private final static Logger LOG = LogManager.getLogger(ConsumerOrderInitializer.class);
@Resource
private ApplicationContextHelper applicationContextHelper;
@Resource
private MultiMqProperties multiMqProperties;
@Bean(value = "initOrderConsumer")
public void initConsumer() throws Exception {
Map<String,String> nameServerMap = Maps.newConcurrentMap();
Map<String, String> map = multiMqProperties.getTenant();
if (MapUtil.isNotEmpty(map)) {
try {
Iterator<String> iterator = map.keySet().iterator();
while (iterator.hasNext()) {
// collect tenantCode -> name-server address
String key = iterator.next();
String host = map.get(key);
nameServerMap.put(key, host);
}
} catch (Exception e) {
throw new Exception(e);
}
Map<String, AbstractOrderListener> consumerListeners = ApplicationContextHelper.getApplicationContext()
.getBeansOfType(AbstractOrderListener.class);
if (consumerListeners.size() > 0) {
Iterator<String> iterator = nameServerMap.keySet().iterator();
while (iterator.hasNext()) {
String tenantCode = iterator.next();
String nameServer = nameServerMap.get(tenantCode);
for (AbstractOrderListener consumerListener : consumerListeners.values()) {
MQProperties mqProperties = consumerListener.getMQProperties(tenantCode);
String groupName = applicationContextHelper.getApplicationName()
+ "_" + mqProperties.getTopicName() + (mqProperties.getTagsName().equals("*") ? "" : "_" + mqProperties.getTagsName());
try {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(nameServer);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(mqProperties.getTopicName(), mqProperties.getTagsName());
consumer.registerMessageListener(consumerListener);
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
}
}
Add the RocketMQ base listener classes for broadcast, cluster, and ordered consumption:
package com.jw.listener;
import com.alibaba.fastjson.JSON;
import com.jw.DataSourceHolder;
import com.jw.dto.MQProperties;
import com.jw.dto.MessageBody;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* Base listener class for broadcast consumption
*/
@Slf4j
public abstract class AbstractBroadCastListener implements MessageListenerConcurrently {
private final static Logger LOG = LogManager.getLogger(AbstractBroadCastListener.class);
/**
* Returns the topic and tag configuration for the given tenant
*
* @return
*/
public abstract MQProperties getMQProperties(String db);
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
String msg = new String(messageExt.getBody());
int count = messageExt.getReconsumeTimes();
if (count > 3) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
try {
MessageBody messageBody = JSON.parseObject(msg, MessageBody.class);
/**
* Read the tenantCode from the message and bind it to the thread-local
*/
String tenantCode = messageBody.getTenantCode();
DataSourceHolder.push(tenantCode);
idempotent(messageBody);
onMessage(messageBody);
} catch (Exception e) {
log.error("MQ consumption error:", e);
}finally {
DataSourceHolder.clear();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
/**
* Message handling
*
* @param msg message content
*/
public abstract void onMessage(MessageBody msg);
/**
* Idempotency handling.
* Implementations that do not need idempotency can simply return true.
* @param messageBody
*/
public abstract boolean idempotent(MessageBody messageBody);
}
package com.jw.listener;
import com.alibaba.fastjson.JSON;
import com.jw.DataSourceHolder;
import com.jw.dto.MQProperties;
import com.jw.dto.MessageBody;
import com.jw.exception.MqImportantException;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* Base listener class for cluster consumption
*/
@Slf4j
public abstract class AbstractClusterListener implements MessageListenerConcurrently {
/**
* Returns the topic and tag configuration for the given tenant
*
* @return
*/
public abstract MQProperties getMQProperties(String db);
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
String msg = new String(messageExt.getBody());
MessageBody messageBody = JSON.parseObject(msg, MessageBody.class);
int count = messageExt.getReconsumeTimes();
if (count > 5) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
try {
String tenantCode = messageBody.getTenantCode();
/**
* Read the tenantCode from the message and bind it to the thread-local
*/
DataSourceHolder.push(tenantCode);
beforeBusiness(messageBody);
idempotentMsg(messageBody);
onMessage(messageBody);
}catch (MqImportantException e){
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}catch (Exception e) {
log.error("MQ consumption error, message data: {}", messageBody.getData(), e);
try {
deleteIdempotent(messageBody.getMessageId());
} catch (Exception ex) {
log.error("MQ failed to remove idempotency record: {}", messageBody.getData(), ex);
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}finally {
DataSourceHolder.clear();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
/**
* Pre-processing hook
* @param messageBody
*/
public abstract void beforeBusiness(MessageBody messageBody);
/**
* Remove the idempotency record
* @param msgId
* @throws Exception
*/
public abstract void deleteIdempotent(String msgId) throws Exception;
/**
* Message handling
*
* @param msg message content
*/
public abstract void onMessage(MessageBody msg) throws Exception;
/**
* Idempotency handling.
* Implementations that do not need idempotency can simply return true.
* @param messageBody
*/
public abstract boolean idempotentMsg(MessageBody messageBody) throws MqImportantException;
}
package com.jw.listener;
import com.alibaba.fastjson.JSON;
import com.jw.DataSourceHolder;
import com.jw.dto.MQProperties;
import com.jw.dto.MessageBody;
import com.jw.exception.MqImportantException;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* Base listener class for ordered consumption
*/
@Slf4j
public abstract class AbstractOrderListener implements MessageListenerOrderly {
/**
* Returns the topic and tag configuration for the given tenant
*
* @return
*/
public abstract MQProperties getMQProperties(String db);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt messageExt : msgs) {
String msg = new String(messageExt.getBody());
String msgId = "";
int count = messageExt.getReconsumeTimes();
if (count > 5) {
return ConsumeOrderlyStatus.SUCCESS;
}
try {
MessageBody messageBody = JSON.parseObject(msg, MessageBody.class);
msgId = messageBody.getMessageId();
/**
* Read the tenantCode from the message and bind it to the thread-local
*/
String tenantCode = messageBody.getTenantCode();
DataSourceHolder.push(tenantCode);
beforeBusiness(messageBody);
idempotent(messageBody);
onMessage(messageBody);
} catch (MqImportantException e){
return ConsumeOrderlyStatus.SUCCESS;
}catch (Exception e) {
log.error("MQ consumption error:", e);
try {
deleteIdempotent(msgId);
} catch (Exception ex) {
log.error("Failed to remove idempotency record, message: {}", JSON.toJSONString(messageExt), ex);
}
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}finally {
DataSourceHolder.clear();
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
public abstract void beforeBusiness(MessageBody messageBody);
public abstract void deleteIdempotent(String msgId) throws Exception;
/**
* Message handling
*
* @param msg message content
*/
public abstract boolean onMessage(MessageBody msg) throws Exception;
/**
* Idempotency handling.
* Implementations that do not need idempotency can simply return true.
* @param messageBody
*/
public abstract boolean idempotent(MessageBody messageBody) throws MqImportantException;
}
Add the RocketMQ message producer classes:
package com.jw.service;
public interface MessageProducer {
void sendMsg(String topicName, String tagName, Object message) throws Exception;
/**
* Ordered message sending
* @param topic
* @param tag
* @param data
* @param bussinessQueueId id used to pick the queue, keeping messages with the same id in order
* @param delayTimeLeve message delay level, required
* @throws Exception
*/
void sendOrderMessage(String topic, String tag, Object data, Long bussinessQueueId, int delayTimeLeve) throws Exception;
}
package com.jw.service.impl;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Maps;
import com.jw.DataSourceHolder;
import com.jw.context.ApplicationContextHelper;
import com.jw.dto.MessageBody;
import com.jw.properties.MultiMqProperties;
import com.jw.service.MessageProducer;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@Lazy
@Service
@Slf4j
@EnableConfigurationProperties(MultiMqProperties.class)
public class DefaultMessageProducer implements MessageProducer {
private final static Logger LOG = LogManager.getLogger(DefaultMessageProducer.class);
private Map<String,DefaultMQProducer> producerMap = Maps.newConcurrentMap();
@Autowired
private ApplicationContextHelper applicationContextHelper;
@Resource
private MultiMqProperties multiMqProperties;
@PostConstruct
public void init() throws Exception{
try {
Map<String, String> map = multiMqProperties.getTenant();
if (MapUtil.isNotEmpty(map)) {
Iterator<String> iterator = map.keySet().iterator();
while (iterator.hasNext()) {
// create and start one producer per tenant name server
String key = iterator.next();
String host = map.get(key);
DefaultMQProducer producer = new DefaultMQProducer(key + "_" + applicationContextHelper.getApplicationName());
producer.setNamesrvAddr(host);
producer.start();
// cache the producer by tenant code
producerMap.put(key, producer);
}
}
}catch(Exception e){
throw new Exception(e);
}
}
@Override
public void sendMsg(String topicName, String tagName, Object message) throws Exception {
String db = DataSourceHolder.peek();
Message msg = generateMessage(db,db+ StrUtil.UNDERLINE+topicName, tagName, message);
SendResult sendResult = producerMap.get(db).send(msg);
log.info("Message sent: " + msg.getTopic() + ":" + msg.getTags() + ", result: " + (JSON.toJSONString(sendResult)));
}
@Override
public void sendOrderMessage(String topic,String tag ,Object data,Long bussinessQueueId,int delayTimeLeve) throws Exception {
String db = DataSourceHolder.peek();
Message message = generateMessage(db,db+ StrUtil.UNDERLINE+topic, tag, data);
message.setDelayTimeLevel(delayTimeLeve);
SendResult sendResult = producerMap.get(db).send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object queueId) {
Long sortId = (Long) queueId;
Long index = sortId % list.size();
return list.get(index.intValue());
}
}, bussinessQueueId);
}
private Message generateMessage(String tenantCode,String topic,String tag,Object data) throws Exception{
Message msg = new Message();
MessageBody messageBody = new MessageBody(tenantCode, UUID.randomUUID().toString(),tag, data);
msg.setTopic(topic);
msg.setTags(tag);
msg.setBody(JSONObject.toJSONString(messageBody).getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.setWaitStoreMsgOK(true);
msg.setFlag(0);
return msg;
}
}
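generateMessage wraps every payload in a MessageBody, which is how the tenant code travels from producer to the consumers above. The DTO itself is not listed; a minimal sketch whose field set is inferred from the constructor call (tenantCode, messageId, tag, data) and the getters used by the listeners:

```java
// Envelope for every MQ message: the tenant travels with the payload so the
// consumer can restore the thread-local before touching DB/Redis.
// The field set is an assumption inferred from the calls in this article.
public class MessageBody {

    private String tenantCode; // routing key, set by the producer
    private String messageId;  // UUID used for idempotency checks
    private String tag;
    private Object data;

    public MessageBody() {
    }

    public MessageBody(String tenantCode, String messageId, String tag, Object data) {
        this.tenantCode = tenantCode;
        this.messageId = messageId;
        this.tag = tag;
        this.data = data;
    }

    public String getTenantCode() { return tenantCode; }
    public void setTenantCode(String tenantCode) { this.tenantCode = tenantCode; }
    public String getMessageId() { return messageId; }
    public void setMessageId(String messageId) { this.messageId = messageId; }
    public String getTag() { return tag; }
    public void setTag(String tag) { this.tag = tag; }
    public Object getData() { return data; }
    public void setData(Object data) { this.data = data; }
}
```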
With the remaining DTO and Properties classes added, the study-datasource module is complete.
The study-service module can then depend on the study-datasource package:
<dependency>
<groupId>com.jw</groupId>
<artifactId>study-datasource</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
If we now call the user query endpoint, it fails, because no tenant datasource has been selected for routing:
Caused by: java.lang.IllegalStateException: Cannot determine target DataSource for lookup key []
at org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource.determineTargetDataSource(AbstractRoutingDataSource.java:232)
at org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource.getConnection(AbstractRoutingDataSource.java:194)
at org.springframework.jdbc.datasource.DataSourceUtils.fetchConnection(DataSourceUtils.java:160)
at org.springframework.jdbc.datasource.DataSourceUtils.doGetConnection(DataSourceUtils.java:118)
at org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:81)
So we first seed the tenant and domain tables in study_common:
INSERT INTO `study_common`.`study_tenant` (`id`, `tenant_name`, `tenant_code`) VALUES ('1', 'study_tenant1', 'study_tenant1');
INSERT INTO `study_common`.`study_tenant` (`id`, `tenant_name`, `tenant_code`) VALUES ('2', 'study_tenant2', 'study_tenant2');
INSERT INTO `study_common`.`study_tenant_domain` (`id`, `tenant_code`, `domain`) VALUES ('1', 'study_tenant1', 'http://0:0:0:0:0:0:0:1');
INSERT INTO `study_common`.`study_tenant_domain` (`id`, `tenant_code`, `domain`) VALUES ('2', 'study_tenant2', 'http://127.0.0.1');
Add a request interceptor that captures the request's domain and looks up which tenant it belongs to. Since this lookup runs on every request, the domain mapping can be kept in memory to avoid pressuring the database.
package com.jw.inteceptor;
import com.jw.DataSourceHolder;
import com.jw.remote.TestRemote;
import jakarta.annotation.Resource;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;
@Component
public class RequestInteceptor implements HandlerInterceptor {
@Resource
private TestRemote testRemote;
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// Note: getRemoteHost() returns the calling host, which works for local testing;
// in production the requested domain usually comes from request.getServerName().
String host = request.getRemoteHost();
String domain = request.getScheme() + "://" + host;
String tenantCode = testRemote.getTenantCode(domain);
DataSourceHolder.push(tenantCode);
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
DataSourceHolder.clear();
}
}
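The in-memory caching suggested above can be a simple concurrent map in front of the remote lookup. A minimal sketch, assuming hypothetical names (this class is not part of the original project):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical in-memory cache so the domain -> tenantCode lookup does not
// hit the database (or the feign call) on every request.
public class TenantDomainCache {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Function<String, String> loader; // e.g. the remote/DB lookup

    public TenantDomainCache(Function<String, String> loader) {
        this.loader = loader;
    }

    public String getTenantCode(String domain) {
        // Loads once per domain; later calls are served from memory.
        return cache.computeIfAbsent(domain, loader);
    }

    public void evict(String domain) {
        // Call this when a tenant's domain mapping changes.
        cache.remove(domain);
    }
}
```

The interceptor would then call getTenantCode(domain) instead of going through Feign each time; explicit eviction (or a TTL cache such as Caffeine) keeps the mapping fresh.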
package com.jw.config;
import com.jw.inteceptor.RequestInteceptor;
import jakarta.annotation.Resource;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
public class WebMvcConfig implements WebMvcConfigurer {

    @Resource
    private RequestInteceptor requestInteceptor;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(this.requestInteceptor)
                .addPathPatterns("/**")
                .excludePathPatterns("/swagger-resources/**")
                .excludePathPatterns("/webjars/**")
                .excludePathPatterns("/v2/**")
                .excludePathPatterns("/**/getVersion")
                .excludePathPatterns("/swagger-ui.html/**");
    }

    // Required: keeps Swagger's static resources reachable past the interceptor.
    @Override
    public void addResourceHandlers(ResourceHandlerRegistry registry) {
        registry.addResourceHandler("swagger-ui.html")
                .addResourceLocations("classpath:/META-INF/resources/");
        registry.addResourceHandler("/webjars/**")
                .addResourceLocations("classpath:/META-INF/resources/webjars/");
    }
}
Next, add a Feign request interceptor. Feign calls travel over HTTP, so thread-local variables do not propagate to the downstream service; the interceptor therefore copies the thread-local tenant code into a request header.
package com.jw.inteceptor;
import com.jw.DataSourceHolder;
import feign.RequestInterceptor;
import feign.RequestTemplate;

public class FeignInterceptor implements RequestInterceptor {

    @Override
    public void apply(RequestTemplate requestTemplate) {
        // Read the tenant code from the thread-local and forward it in a
        // header, so the downstream service can restore it on its own thread.
        String dbName = DataSourceHolder.peek();
        requestTemplate.header("tenantCode", dbName);
    }
}
package com.jw.config;
import com.jw.inteceptor.FeignInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FeignInteceptorConfig {

    @Bean
    public FeignInterceptor feignInterceptor() {
        return new FeignInterceptor();
    }
}
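Both interceptors rely on DataSourceHolder's push/peek/clear. Assuming it is the ThreadLocal-based holder set up earlier in the article, a minimal sketch of such a holder looks like this (a per-thread stack is used so nested switches can be unwound):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal sketch of a ThreadLocal-backed tenant holder (structure assumed;
// the article's actual class may differ in detail).
public final class DataSourceHolder {
    // One stack per thread: the top entry is the active tenant/data source key.
    private static final ThreadLocal<Deque<String>> HOLDER =
            ThreadLocal.withInitial(ArrayDeque::new);

    public static void push(String tenantCode) {
        HOLDER.get().push(tenantCode);
    }

    public static String peek() {
        // Returns null when no tenant has been pushed on this thread.
        return HOLDER.get().peek();
    }

    public static void clear() {
        // Remove the whole entry to avoid leaks on pooled request threads.
        HOLDER.remove();
    }
}
```

The routing data source's determineCurrentLookupKey() would then simply return DataSourceHolder.peek().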
On the receiving service, add an interceptor that pulls the tenant code (set by the Feign interceptor) back out of the request header.
package com.jw.inteceptor;
import cn.hutool.core.util.StrUtil;
import com.jw.DataSourceHolder;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;
import java.util.Objects;

/**
 * @author :j
 * @date :Created in 2022/12/16 15:05
 */
@Component
public class RequestInteceptor implements HandlerInterceptor {

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        String dbName = request.getHeader("tenantCode");
        // For testing: fall back to the default (common) data source when no
        // tenant code was supplied or the caller explicitly asked for "common".
        if (StrUtil.isEmpty(dbName) || Objects.equals(dbName, "common")) {
            dbName = "";
        }
        DataSourceHolder.push(dbName);
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        DataSourceHolder.clear();
    }
}
package com.jw.config;
import com.jw.inteceptor.RequestInteceptor;
import jakarta.annotation.Resource;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
public class WebMvcConfig implements WebMvcConfigurer {

    @Resource
    private RequestInteceptor requestInteceptor;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(this.requestInteceptor)
                .addPathPatterns("/**");
    }
}
With this in place, testing shows that requests arriving via different domains are routed to the corresponding tenant databases. Throughout the change, no existing business code was modified; everything was purely additive.
The project's existing Redis usage likewise required no changes: with the two Redis configuration classes shown earlier, RedisTemplate automatically resolves the right Redis instance from the thread-local tenant. RocketMQ works the same way: the tenant code held in the thread-local is attached to each outgoing message, and the consumer listener copies it from the incoming message back into its own thread-local before processing. In the end, only small changes to the existing RocketMQ send and listener base classes were needed, with no business-level changes at all. Because there are quite a few configuration and helper classes, not all of the source is reproduced here; if anything is unclear, you can leave a comment under the official account post. This migration approach has been validated in production.
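The RocketMQ propagation just described can be sketched as follows. A plain Map stands in for the message's user properties here so the sketch is self-contained; with the real client you would call Message.putUserProperty on send and MessageExt.getUserProperty in the listener, and the property name "tenantCode" is an assumption:

```java
import java.util.Map;

// Sketch of tenant propagation through MQ messages (names hypothetical).
public class TenantMessageSupport {
    static final String TENANT_KEY = "tenantCode"; // assumed user-property name

    // Producer side: stamp the current thread's tenant onto the outgoing message.
    public static void beforeSend(Map<String, String> userProps, String currentTenant) {
        userProps.put(TENANT_KEY, currentTenant);
    }

    // Consumer side: read the tenant back so the listener can push it into its
    // own thread-local before any repository/Redis call (and clear it after).
    public static String onConsume(Map<String, String> userProps) {
        return userProps.getOrDefault(TENANT_KEY, "");
    }
}
```

These two hooks are the "small changes to the send and listener base classes" mentioned above: the base producer calls beforeSend, the base listener calls onConsume and then DataSourceHolder.push/clear around the business handler.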