查看原文
其他

万字长文!手把手教您轻松实现单租户到SaaS架构的升级!

老猿人 码农闲谈AI 2024-01-22
背景

在以往的项目当中,刚开始可能因为业务量需求而只支持单个客户或者说租户使用。但随着业务量的增长及客户的增加。问题也随之而来。往往表现的问题有:

成本较高:单租户项目需要为每个租户提供独立的软件实例和基础设施,因此需要更多的硬件和软件资源,导致成本较高。

维护困难:随着租户数量的增加,单租户项目的维护将变得越来越困难。每个租户都有自己的定制需求,需要对每个租户的软件进行定制和维护,这需要大量的人力和时间资源。

扩展性差:单租户项目在面对大量租户时,可能会遇到性能和可扩展性问题。每个租户都会对系统产生负载,导致系统的性能下降。

数据隔离差:由于所有租户共享同一个软件实例和基础设施,因此数据隔离性较差。每个租户都可以访问其他租户的数据,这增加了数据泄露和安全风险。

定制化程度低:单租户项目通常只能提供一些通用的功能和配置,难以满足每个租户的独特需求。如果需要进行大量的定制化开发,可能需要耗费大量的时间和资源

那么如何让同一套代码适应不同租户的需求,同时保持系统的稳定性和高效性以及在原有代码上以改动(包括redis,rocketmq的多租户支持)最小的方式解决这个问题呢?

具体实现

目前有A客户在使用当前项目,且有业务为用户查询业务,项目当前结构为:

study   |-- study-api  #内部调用api模块   |-- study-service #内部服务模块   |-- study-web #对外接口模块   |-- study-common #公共包

当前有数据库study_tenant1,库中有表class_user_info表

CREATE TABLE `class_user_info` (  `id` bigint(20) NOT NULL,  `user_name` varchar(30) NOT NULL,  `password` varchar(10) NOT NULL,  `user_type` char(1) NOT NULL,  `mobile` varchar(11) NOT NULL,  `salt` varchar(30) NOT NULL,  `created_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,  `created_by` varchar(30) NOT NULL,  `updated_by` varchar(30) NOT NULL,  `updated_time` datetime NOT NULL,  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;


那么我们如何以现有业务代码改动最小的基础上,使客户B也使用这套代码,而不用独立再部署一套呢?



01新增租户库及study-datasource模块

新增study_tenant2作为客户B的库。库表结构与study_tenant1一致. 并新增租户公共库study_common。公共库新增表结构如下:

-- 租户表CREATE TABLE `study_tenant` (  `id` bigint(20) NOT NULL,  `tenant_name` varchar(20) NOT NULL,  `tenant_code` varchar(20) NOT NULL,  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;-- 租户域名表CREATE TABLE `study_tenant_domain` (  `id` bigint(20) NOT NULL,  `tenant_code` varchar(20) NOT NULL,  `domain` varchar(100) NOT NULL,  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;




新增study-datasrouce模块相关处理

study   |-- study-api  #内部调用api模块   |-- study-service #内部服务模块   |-- study-web #对外接口模块   |-- study-common #公共包   |-- study-datasource # 多数据源处理包


02study-datasource新增配置类

在新增配置类前,需要现将原有单租户nacos配置进行改动,改动如下:

spring:  redis:    study_common:      host: 127.0.0.1      port: 6379      database: 1    study_tenant1:      host: 127.0.0.1      port: 6379      database: 2    study_tenant2:      host: 127.0.0.1      port: 6379      database: 3  datasource:    study_common:      url: jdbc:mysql://192.168.0.168:3306/study_common?usseUnicode=true&characterEncoding=utf-8&useSSL=false&autoReconnect=true&allowMultiQueries=true&serverTimezone=GMT%2B8      username: root      password: root      type: com.zaxxer.hikari.HikariDataSource      hikari:        connection-init-sql: SET NAMES utf8mb4 COLLATE utf8mb4_unicode_ci        minimum-idle: 10        maximum-pool-size: 15        idle-timeout: 30000        max-lifetime: 180000        connection-timeout: 30000        connection-test-query: select 'connected'      jackson:        date-format: yyyy-MM-dd HH:mm:ss    study-tenant1:      url: jdbc:mysql://192.168.0.168:3306/study_tenant1?usseUnicode=true&characterEncoding=utf-8&useSSL=false&autoReconnect=true&allowMultiQueries=true&serverTimezone=GMT%2B8      username: root      password: root      type: com.zaxxer.hikari.HikariDataSource      hikari:        connection-init-sql: SET NAMES utf8mb4 COLLATE utf8mb4_unicode_ci        minimum-idle: 10        maximum-pool-size: 15        idle-timeout: 30000        max-lifetime: 180000        connection-timeout: 30000        connection-test-query: select 'connected'      jackson:        date-format: yyyy-MM-dd HH:mm:ss    study_tenant2:      url: jdbc:mysql://192.168.0.168:3306/study_tenant2?usseUnicode=true&characterEncoding=utf-8&useSSL=false&autoReconnect=true&allowMultiQueries=true&serverTimezone=GMT%2B8      username: root      password: root      type: com.zaxxer.hikari.HikariDataSource      hikari:        connection-init-sql: SET NAMES utf8mb4 COLLATE utf8mb4_unicode_ci        minimum-idle: 10        maximum-pool-size: 15        idle-timeout: 30000        max-lifetime: 180000        connection-timeout: 30000        connection-test-query: select 'connected'      jackson:        date-format: yyyy-MM-dd HH:mm:ssrocketmq:  tenant:    study_common: 127.0.0.1:9876    study_tenant1: 127.0.0.1:9876    study_tenant2: 127.0.0.1:9876


spring数据源切换配置类

 

package com.jw.config.mysql;import com.jw.DataSourceHolder;import com.jw.utils.DsUtils;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.DependsOn;import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;import java.util.Map;@Configuration@DependsOn("dsInit")public class DynamicDataSource extends AbstractRoutingDataSource {    @Override    protected Object determineCurrentLookupKey() {        Object o = DataSourceHolder.peek();        return o==null?"":o;    }    public DynamicDataSource() {        Map<Object,Object> map = DsUtils.getDsMap();        super.setTargetDataSources(map);    }}



数据源加载配置类


package com.jw.config.mysql;

import cn.hutool.crypto.SecureUtil;import com.alibaba.nacos.common.utils.MapUtil;import com.jw.properties.MultiDbProperties;import com.jw.properties.MultiTenantDbProperties;import com.jw.utils.DsUtils;import com.zaxxer.hikari.HikariDataSource;import jakarta.annotation.Resource;import org.springframework.boot.context.properties.EnableConfigurationProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;

import java.nio.charset.Charset;import java.util.*;
/** * @author :j * @date :Created in 2021/4/10 16:29 * @description: */@Configuration@EnableConfigurationProperties(MultiTenantDbProperties.class)public class DataSourceConfiguration {
   @Resource    private MultiTenantDbProperties multiTenantDbProperties;

   @Bean("dsInit")    public void setDataSource() throws Exception {        try {            Map<String, MultiDbProperties> map = multiTenantDbProperties.getDatasource();            if (MapUtil.isNotEmpty(map)) {                Iterator<String> iterator = map.keySet().iterator();                while(iterator.hasNext()){                    String key = iterator.next();                    MultiDbProperties multiDbProperties = map.get(key);                    HikariDataSource hikariDataSource = getDataSource(multiDbProperties);                    if (Objects.equals(key, multiTenantDbProperties.getCommon_db())) {                        DsUtils.put("", hikariDataSource);                    }else {                        DsUtils.put(key, hikariDataSource);                    }                }
           }        } catch (Exception e) {            throw new Exception("数据源加载出错");        }
   }
   public HikariDataSource getDataSource(MultiDbProperties multiDbProperties ) {        HikariDataSource hikariDataSource = new HikariDataSource();        hikariDataSource.setJdbcUrl(multiDbProperties.getUrl());        hikariDataSource.setUsername(multiDbProperties.getUsername());        String password = multiDbProperties.getPassword();        hikariDataSource.setPassword(password);        hikariDataSource.setConnectionInitSql(multiDbProperties.getHikari().getConnectionInitSql());        hikariDataSource.setMinimumIdle(multiDbProperties.getHikari().getMinimumIdle());        hikariDataSource.setMaximumPoolSize(multiDbProperties.getHikari().getMaximumPoolSize());        hikariDataSource.setIdleTimeout(multiDbProperties.getHikari().getIdleTimeout());        hikariDataSource.setMaxLifetime(multiDbProperties.getHikari().getMaxLifetime());        hikariDataSource.setConnectionTimeout(multiDbProperties.getHikari().getConnectionTimeout());        hikariDataSource.setConnectionTestQuery(multiDbProperties.getHikari().getConnectionTestQuery());        return hikariDataSource;    }




}



新增redis多租户支持配置类

package com.jw.config.redis;
import cn.hutool.core.map.MapUtil;import cn.hutool.core.util.StrUtil;import com.google.common.collect.Maps;
import com.jw.properties.MultiRedisProperties;import com.zaxxer.hikari.HikariDataSource;import jakarta.annotation.Resource;import org.springframework.boot.autoconfigure.data.redis.RedisProperties;import org.springframework.boot.context.properties.EnableConfigurationProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.DependsOn;import org.springframework.data.redis.connection.RedisStandaloneConfiguration;import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import javax.sql.DataSource;import java.sql.Connection;import java.sql.ResultSet;import java.sql.SQLException;import java.sql.Statement;import java.util.Iterator;import java.util.Map;

@Configuration@EnableConfigurationProperties(MultiRedisProperties.class)public class RedisCustomizedConfiguration {
   @Resource    private DataSource dataSource;

   /**     * @param multiRedisProperties     * @return     */    @Bean("multiRedisLettuceConnectionFactory")    public MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory(MultiRedisProperties multiRedisProperties) throws Exception{        //读取配置        Map<String, LettuceConnectionFactory> connectionFactoryMap = Maps.newConcurrentMap();        Map<String, RedisProperties> multi = multiRedisProperties.getRedis();        if(MapUtil.isNotEmpty(multi)) {            try {                Iterator<String> iterator = multi.keySet().iterator();                while (iterator.hasNext()) {                    String key = iterator.next();                    RedisProperties redisProperties = multi.get(key);                    RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration();                    configuration.setHostName(redisProperties.getHost());                    configuration.setPort(redisProperties.getPort());                    configuration.setDatabase(redisProperties.getDatabase());                    if (StrUtil.isNotEmpty(redisProperties.getPassword())) {                        configuration.setPassword(redisProperties.getPassword());                    }                    LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(configuration);                    connectionFactory.afterPropertiesSet();                    // 配置redisTemplate                    connectionFactoryMap.put(key, connectionFactory);                }            } catch (Exception e) {                throw new Exception(e);            }        }        return new MultiRedisLettuceConnectionFactory(connectionFactoryMap);    }


}


package com.jw.config.redis;

import com.alibaba.cloud.commons.lang.StringUtils;import com.jw.DataSourceHolder;import com.jw.properties.MultiRedisProperties;import lombok.extern.slf4j.Slf4j;import org.springframework.dao.DataAccessException;import org.springframework.data.redis.connection.*;import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import java.util.Map;

@Slf4jpublic class MultiRedisLettuceConnectionFactory implements   RedisConnectionFactory {    private final Map<String, LettuceConnectionFactory> connectionFactoryMap;
   private static final ThreadLocal<String> currentRedis = new ThreadLocal<>();
   public MultiRedisLettuceConnectionFactory(Map<String, LettuceConnectionFactory> connectionFactoryMap) {        this.connectionFactoryMap = connectionFactoryMap;    }
   private LettuceConnectionFactory currentLettuceConnectionFactory() {        String currentRedis = DataSourceHolder.peek();        if (StringUtils.isNotBlank(currentRedis)) {            MultiRedisLettuceConnectionFactory.currentRedis.remove();            return connectionFactoryMap.get(currentRedis);        }        return connectionFactoryMap.get(MultiRedisProperties.DEFAULT);    }

   @Override    public RedisConnection getConnection() {        return currentLettuceConnectionFactory().getConnection();    }
   @Override    public RedisClusterConnection getClusterConnection() {        return currentLettuceConnectionFactory().getClusterConnection();    }
   @Override    public boolean getConvertPipelineAndTxResults() {        return currentLettuceConnectionFactory().getConvertPipelineAndTxResults();    }
   @Override    public RedisSentinelConnection getSentinelConnection() {        return currentLettuceConnectionFactory().getSentinelConnection();    }
   @Override    public DataAccessException translateExceptionIfPossible(RuntimeException ex) {        return currentLettuceConnectionFactory().translateExceptionIfPossible(ex);    }}


新增rocketmq多租户支持配置加载类(广播类消息,集群类消息,有序类消息)
package com.jw.config.rocketmq;
import cn.hutool.core.map.MapUtil;import com.google.common.collect.Maps;
import com.jw.dto.MQProperties;import com.jw.context.ApplicationContextHelper;import com.jw.listener.AbstractBroadCastListener;import com.jw.properties.MultiMqProperties;import jakarta.annotation.Resource;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.DependsOn;
import java.util.Iterator;import java.util.Map;
@Configurationpublic class ConsumerBroadCastInitializer  {
   private final static Logger LOG = LogManager.getLogger(ConsumerBroadCastInitializer.class);
   @Resource    private ApplicationContextHelper applicationContextHelper;    @Resource    private MultiMqProperties multiMqProperties;    @Bean(value = "initBroadCastConsumer")    public void initConsumer() throws Exception{        Map<String,String> nameServerMap = Maps.newConcurrentMap();        Map<String, String> map = multiMqProperties.getTenant();        if (MapUtil.isNotEmpty(map)) {            try {                Iterator<String> iterator = map.keySet().iterator();                while (iterator.hasNext()) {                    // 配置redisTemplate                    String key = iterator.next();                    String host = map.get(key);                    nameServerMap.put(key, host);                }

           } catch (Exception e) {                throw new Exception(e);            }            Map<String, AbstractBroadCastListener> consumerListeners = ApplicationContextHelper.getApplicationContext()                    .getBeansOfType(AbstractBroadCastListener.class);            if (consumerListeners.size() > 0) {                Iterator<String> iterator = nameServerMap.keySet().iterator();                while (iterator.hasNext()) {                    String tenantCode = iterator.next();                    String nameServer = nameServerMap.get(tenantCode);                    for (AbstractBroadCastListener consumerListener : consumerListeners.values()) {                        MQProperties mqProperties = consumerListener.getMQProperties(tenantCode);                        String groupName = applicationContextHelper.getApplicationName()                                + "_" + mqProperties.getTopicName() + "_" + mqProperties.getTagsName();                        try {                            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);                            consumer.setNamesrvAddr(nameServer);                            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);                            consumer.setMessageModel(MessageModel.BROADCASTING);                            consumer.subscribe(mqProperties.getTopicName(), mqProperties.getTagsName());                            consumer.registerMessageListener(consumerListener);                            consumer.start();                        } catch (Exception e) {                            e.printStackTrace();                        }                    }                }            }        }    }}
package com.jw.config.rocketmq;
import cn.hutool.core.map.MapUtil;import com.google.common.collect.Maps;import com.google.common.collect.Sets;import com.jw.context.ApplicationContextHelper;import com.jw.dto.MQProperties;import com.jw.listener.AbstractClusterListener;import com.jw.properties.MultiMqProperties;import jakarta.annotation.Resource;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import org.springframework.boot.context.properties.EnableConfigurationProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.DependsOn;
import java.util.Iterator;import java.util.Map;import java.util.Set;
@Configuration@EnableConfigurationProperties(MultiMqProperties.class)public class ConsumerClusterInitializer{
   private final static Logger LOG = LogManager.getLogger(ConsumerClusterInitializer.class);
   @Resource    private ApplicationContextHelper applicationContextHelper;    @Resource    private MultiMqProperties multiMqProperties;
   @Bean(value = "initClusterConsumer")    public void initConsumer() throws Exception {        Map<String,String> nameServerMap = Maps.newConcurrentMap();        Map<String, String> map = multiMqProperties.getTenant();        if (MapUtil.isNotEmpty(map)) {            try {                Iterator<String> iterator = map.keySet().iterator();                while (iterator.hasNext()) {                    // 配置redisTemplate                    String key = iterator.next();                    String host = map.get(key);                    nameServerMap.put(key, host);                }

           } catch (Exception e) {                throw new Exception(e);            }            Map<String, AbstractClusterListener> consumerListeners = ApplicationContextHelper.getApplicationContext()                    .getBeansOfType(AbstractClusterListener.class);            if (consumerListeners.size() > 0) {                Iterator<String> iterator = nameServerMap.keySet().iterator();                //set是用来处理settleTenantListener类的。这个类是不用初始化多份的                //全部topic,只有这个类的topic是没有拼接租户前缀的,所以我们可以通过topic来区分是否加载了多次                Set<String> set = Sets.newHashSet();                while (iterator.hasNext()) {                    String tenantCode = iterator.next();                    String nameServer = nameServerMap.get(tenantCode);                    for (AbstractClusterListener consumerListener : consumerListeners.values()) {                        MQProperties mqProperties = consumerListener.getMQProperties(tenantCode);                        if (set.contains(mqProperties.getTopicName())) {                            continue;                        }                        set.add(mqProperties.getTopicName());                        String groupName = tenantCode                                + "_" + mqProperties.getTopicName() + "_" + mqProperties.getTagsName();                        try {                            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);                            consumer.setNamesrvAddr(nameServer);                            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);                            consumer.setMessageModel(MessageModel.CLUSTERING);                            consumer.subscribe(mqProperties.getTopicName(), mqProperties.getTagsName());                            consumer.registerMessageListener(consumerListener);                            consumer.setConsumeThreadMin(100);                            consumer.setConsumeThreadMax(600);                            consumer.start();                        } catch (Exception e) {                            e.printStackTrace();                        }                    }                }            }        }    }}
package com.jw.config.rocketmq;
import cn.hutool.core.map.MapUtil;import com.google.common.collect.Maps;
import com.jw.context.ApplicationContextHelper;import com.jw.dto.MQProperties;import com.jw.listener.AbstractOrderListener;import com.jw.properties.MultiMqProperties;import jakarta.annotation.Resource;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.DependsOn;
import java.util.Iterator;import java.util.Map;
@Configurationpublic class ConsumerOrderInitializer {
   private final static Logger LOG = LogManager.getLogger(ConsumerOrderInitializer.class);
   @Resource    private ApplicationContextHelper applicationContextHelper;    @Resource    private MultiMqProperties multiMqProperties;    @Bean(value = "initOrderConsumer")    public void initConsumer() throws Exception {        Map<String,String> nameServerMap = Maps.newConcurrentMap();        Map<String, String> map = multiMqProperties.getTenant();        if (MapUtil.isNotEmpty(map)) {            try {                Iterator<String> iterator = map.keySet().iterator();                while (iterator.hasNext()) {                    // 配置redisTemplate                    String key = iterator.next();                    String host = map.get(key);                    nameServerMap.put(key, host);                }

           } catch (Exception e) {                throw new Exception(e);            }            Map<String, AbstractOrderListener> consumerListeners = ApplicationContextHelper.getApplicationContext()                    .getBeansOfType(AbstractOrderListener.class);            if (consumerListeners.size() > 0) {                Iterator<String> iterator = nameServerMap.keySet().iterator();                while (iterator.hasNext()) {                    String tenantCode = iterator.next();                    String nameServer = nameServerMap.get(tenantCode);                    for (AbstractOrderListener consumerListener : consumerListeners.values()) {                        MQProperties mqProperties = consumerListener.getMQProperties(tenantCode);                        String groupName = applicationContextHelper.getApplicationName()                                + "_" + mqProperties.getTopicName() + (mqProperties.getTagsName().equals("*") ? "" : "_" + mqProperties.getTagsName());                        try {                            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);                            consumer.setNamesrvAddr(nameServer);                            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);                            consumer.subscribe(mqProperties.getTopicName(), mqProperties.getTagsName());                            consumer.registerMessageListener(consumerListener);                            consumer.setMessageModel(MessageModel.CLUSTERING);                            consumer.start();
                       } catch (Exception e) {                            e.printStackTrace();                        }                    }                }            }        }    }}

新增rocketmq广播消费,集群消费,有序消息消费基类

package com.jw.listener;
import com.alibaba.fastjson.JSON;
import com.jw.DataSourceHolder;import com.jw.dto.MQProperties;import com.jw.dto.MessageBody;import lombok.extern.slf4j.Slf4j;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/** * 广播消息模式监听基类 */@Slf4jpublic abstract class AbstractBroadCastListener implements MessageListenerConcurrently {
   private final static Logger LOG = LogManager.getLogger(AbstractBroadCastListener.class);

   /**     * 初始化主题以及标签配置     *     * @return     */    public abstract MQProperties getMQProperties(String db);
   @Override    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {        for (MessageExt messageExt : msgs) {            String msg = new String(messageExt.getBody());            int count = messageExt.getReconsumeTimes();            if (count > 3) {                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;            }            try {                MessageBody messageBody = JSON.parseObject(msg, MessageBody.class);                /**                 * 监听你到tenantCode标识,设置到线程变量                 */                String tenantCode = messageBody.getTenantCode();                DataSourceHolder.push(tenantCode);                idempotent(messageBody);                onMessage(messageBody);            } catch (Exception e) {               log.error("MQ消费异常:", e);                log.error("",e);            }finally {                DataSourceHolder.clear();            }        }        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;    }
   /**     * 消息接收处理     *     * @param msg 消息内容     */    public abstract void onMessage(MessageBody msg);
   /**     * 消息幂等处理     * 不需要幂等的消息,则可不重写该方法     * @param messageBody     */    public abstract boolean idempotent(MessageBody messageBody);


}


package com.jw.listener;
import com.alibaba.fastjson.JSON;import com.jw.DataSourceHolder;import com.jw.dto.MQProperties;import com.jw.dto.MessageBody;import com.jw.exception.MqImportantException;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/** * 集群消费模式监听基类 */@Slf4jpublic abstract class AbstractClusterListener  implements MessageListenerConcurrently {


   /**     * 初始化主题以及标签配置     *     * @return     */    public abstract MQProperties getMQProperties(String db);
   @Override    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {        for (MessageExt messageExt : msgs) {            String msg = new String(messageExt.getBody());            MessageBody messageBody = JSON.parseObject(msg, MessageBody.class);            int count = messageExt.getReconsumeTimes();            if (count > 5) {                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;            }            try {                String tenantCode = messageBody.getTenantCode();                /**                 * 监听你到tenantCode标识,设置到线程变量                 */                DataSourceHolder.push(tenantCode);                beforeBusiness(messageBody);                idempotentMsg(messageBody);                onMessage(messageBody);            }catch (MqImportantException e){                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;            }catch (Exception e) {                log.error("MQ消费异常,消费数据:{}", messageBody.getData());                log.error("",e);                e.printStackTrace();                try {                    deleteIdempotent(messageBody.getMessageId());                } catch (Exception ex) {                    log.error("MQ删除幂等失败:{}", messageBody.getData());                    log.error("",e);                }                return ConsumeConcurrentlyStatus.RECONSUME_LATER;            }finally {                DataSourceHolder.clear();            }        }        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;    }
   /**     *  前置作业     * @param messageBody     */    public abstract void beforeBusiness(MessageBody messageBody);

   /**     * 删除幂等     * @param msgId     * @throws Exception     */    public abstract void deleteIdempotent(String msgId) throws Exception;
   /**     * 消息接收处理     *     * @param msg 消息内容     */    public abstract void onMessage(MessageBody msg) throws Exception;
   /**     * 消息幂等处理     * 不需要幂等的消息,则可不重写该方法     * @param messageBody     */    public abstract boolean idempotentMsg(MessageBody messageBody) throws MqImportantException;

}


package com.jw.listener;
import com.alibaba.fastjson.JSON;import com.jw.DataSourceHolder;import com.jw.dto.MQProperties;import com.jw.dto.MessageBody;import com.jw.exception.MqImportantException;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/** * 集群消费模式监听基类 */@Slf4jpublic abstract class AbstractOrderListener implements MessageListenerOrderly {


   /**     * 初始化主题以及标签配置     *     * @return     */    public abstract MQProperties getMQProperties(String db);
   @Override    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext  context) {        for (MessageExt messageExt : msgs) {            String msg = new String(messageExt.getBody());            String msgId = "";            int count = messageExt.getReconsumeTimes();            if (count > 5) {                return ConsumeOrderlyStatus.SUCCESS;            }            try {                MessageBody messageBody = JSON.parseObject(msg, MessageBody.class);                /**                 * 监听你到tenantCode标识,设置到线程变量                 */                String tenantCode = messageBody.getTenantCode();                DataSourceHolder.push(tenantCode);                beforeBusiness(messageBody);                idempotent(messageBody);                onMessage(messageBody);            } catch (MqImportantException e){                return ConsumeOrderlyStatus.SUCCESS;            }catch (Exception e) {               log.error("MQ消费异常:");                log.error("",e);                try {                    deleteIdempotent(msgId);                } catch (Exception ex) {                    log.error("",e);                   log.error("删除幂等失败,内容:{}",JSON.toJSONString(messageExt),e);                }                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;            }finally {                DataSourceHolder.clear();            }        }        return ConsumeOrderlyStatus.SUCCESS;    }
   public abstract void beforeBusiness(MessageBody messageBody);

   public abstract void deleteIdempotent(String msgId) throws Exception;
   /**     * 消息接收处理     *     * @param msg 消息内容     */    public abstract boolean onMessage(MessageBody msg) throws Exception;
   /**     * 消息幂等处理     * 不需要幂等的消息,则可不重写该方法     * @param messageBody     */    public abstract boolean idempotent(MessageBody messageBody) throws MqImportantException;

}

新增rocketmq发送消息类

package com.jw.service;
public interface MessageProducer {
   void sendMsg(String topicName, String tagName, Object message) throws Exception;


   /**     * 循序消息发送     * @param topic     * @param tag     * @param data     * @param bussinessQueueId     * @param delayTimeLeve 消息延迟时间,必传     * @throws Exception     */    void sendOrderMessage(String topic, String tag, Object data, Long bussinessQueueId, int delayTimeLeve) throws Exception;}


package com.jw.service.impl;
import cn.hutool.core.map.MapUtil;import cn.hutool.core.util.StrUtil;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.google.common.collect.Maps;import com.jw.DataSourceHolder;import com.jw.context.ApplicationContextHelper;import com.jw.dto.MessageBody;import com.jw.properties.MultiMqProperties;import com.jw.service.MessageProducer;import jakarta.annotation.PostConstruct;import jakarta.annotation.Resource;import lombok.extern.slf4j.Slf4j;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.MessageQueueSelector;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.common.message.MessageQueue;import org.apache.rocketmq.remoting.common.RemotingHelper;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.context.properties.EnableConfigurationProperties;import org.springframework.context.annotation.Lazy;import org.springframework.stereotype.Service;

import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.UUID;
@Lazy@Service@Slf4j@EnableConfigurationProperties(MultiMqProperties.class)public class DefaultMessageProducer implements MessageProducer {
   private final static Logger LOG = LogManager.getLogger(DefaultMessageProducer.class);
   private Map<String,DefaultMQProducer> producerMap = Maps.newConcurrentMap();
   @Autowired    private ApplicationContextHelper applicationContextHelper;
   @Resource    private MultiMqProperties multiMqProperties;
   @PostConstruct    public void init() throws Exception{        try {            Map<String, String> map = multiMqProperties.getTenant();            if (MapUtil.isNotEmpty(map)) {                Iterator<String> iterator = map.keySet().iterator();                while (iterator.hasNext()) {                    // 配置redisTemplate                    String key = iterator.next();                    String host = map.get(key);                    DefaultMQProducer producer = new DefaultMQProducer(key + "_" + applicationContextHelper.getApplicationName());                    producer.setNamesrvAddr(host);                    producer.start();                    // 配置redisTemplate                    producerMap.put(key, producer);                }

           }        }catch(Exception e){            throw new Exception(e);        }    }
   @Override    public void sendMsg(String topicName, String tagName, Object message) throws Exception {        String db = DataSourceHolder.peek();        Message msg  = generateMessage(db,db+ StrUtil.UNDERLINE+topicName, tagName, message);        SendResult sendResult = producerMap.get(db).send(msg);        log.info("消息发送:" + msg.getTopic() + ":" + msg.getTags() + ",发送结果:" + (JSON.toJSONString(sendResult)));    }

   @Override    public void sendOrderMessage(String topic,String tag ,Object data,Long bussinessQueueId,int delayTimeLeve) throws Exception {        String db = DataSourceHolder.peek();        Message message = generateMessage(db,db+ StrUtil.UNDERLINE+topic, tag, data);        message.setDelayTimeLevel(delayTimeLeve);        SendResult sendResult = producerMap.get(db).send(message, new MessageQueueSelector() {            @Override            public MessageQueue select(List<MessageQueue> list, Message message, Object queueId) {                Long sortId = (Long) queueId;                Long index = sortId % list.size();                return list.get(index.intValue());            }        }, bussinessQueueId);      }

   private Message generateMessage(String tenantCode,String topic,String tag,Object data)  throws Exception{        Message msg = new Message();        MessageBody messageBody = new MessageBody(tenantCode, UUID.randomUUID().toString(),tag, data);        msg.setTopic(topic);        msg.setTags(tag);        msg.setBody(JSONObject.toJSONString(messageBody).getBytes(RemotingHelper.DEFAULT_CHARSET));        msg.setWaitStoreMsgOK(true);        msg.setFlag(0);        return msg;    }}



加上其它附加DTO,Properties相关类,study-datasource模块最终结构如下图所示:


然后就可以在study-service模块引入study-datasource包了

<dependency>            <groupId>com.jw</groupId>            <artifactId>study-datasource</artifactId>            <version>1.0.0-SNAPSHOT</version>        </dependency>接下来去请求用户查询接口,接口则会报错。因为我们没有指定要具体路由的租户数据源
Caused by: java.lang.IllegalStateException: Cannot determine target DataSource for lookup key [] at org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource.determineTargetDataSource(AbstractRoutingDataSource.java:232) at org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource.getConnection(AbstractRoutingDataSource.java:194) at org.springframework.jdbc.datasource.DataSourceUtils.fetchConnection(DataSourceUtils.java:160) at org.springframework.jdbc.datasource.DataSourceUtils.doGetConnection(DataSourceUtils.java:118) at org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:81)


那么如何来确定到底是属于哪个租户在使用系统呢?这就需要用到study_common库中的study_tenant表跟study_tenant_domain表来控制了。分别在表中新增如下数据:
INSERT INTO `study_common`.`study_tenant` (`id`, `tenant_name`, `tenant_code`) VALUES ('1', 'study_tenant1', 'study_tenant1');INSERT INTO `study_common`.`study_tenant` (`id`, `tenant_name`, `tenant_code`) VALUES ('2', 'study_tenant2', 'study_tenant2');INSERT INTO `study_common`.`study_tenant_domain` (`id`, `tenant_code`, `domain`) VALUES ('1', 'study_tenant1',  'http://0:0:0:0:0:0:0:1');INSERT INTO `study_common`.`study_tenant_domain` (`id`, `tenant_code`,  `domain`) VALUES ('2', 'study_tenant2', 'http://127.0.0.1')


03study-web新增拦截器

新增拦截器,拦截当前请求域名,根据域名查询当前所属哪个租户。此处为频繁查询,可把域名放置在内存中,避免给数据库造成压力


package com.jw.inteceptor;
import com.jw.DataSourceHolder;import com.jw.remote.TestRemote;import jakarta.annotation.Resource;import jakarta.servlet.http.HttpServletRequest;import jakarta.servlet.http.HttpServletResponse;import org.springframework.stereotype.Component;import org.springframework.web.servlet.HandlerInterceptor;

@Componentpublic class RequestInteceptor implements HandlerInterceptor {    @Resource    private TestRemote testRemote;    @Override    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {        HttpServletRequest httpServletRequest = (HttpServletRequest)request;        String host = request.getRemoteHost();        String domain = request.getScheme() + "://" + host;        String tenantCode = testRemote.getTenantCode(domain);        DataSourceHolder.push(tenantCode);        return true;    }
   @Override    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {        DataSourceHolder.clear();    }}
package com.jw.config;
import com.jw.inteceptor.RequestInteceptor;import jakarta.annotation.Resource;import org.springframework.context.annotation.Configuration;import org.springframework.web.servlet.config.annotation.InterceptorRegistry;import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry;import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configurationpublic class WebMvcConfig implements WebMvcConfigurer {    @Resource    private RequestInteceptor requestInteceptor;
   @Override    public void addInterceptors(InterceptorRegistry registry) {     registry .addInterceptor(this.requestInteceptor)            .addPathPatterns("/**")            .excludePathPatterns("/swagger-resources/**")             .excludePathPatterns( "/webjars/**")             .excludePathPatterns("/v2/**")             .excludePathPatterns("/**/getVersion")            .excludePathPatterns("/swagger-ui.html/**")     ;
}
   // 必须添加    @Override    public void addResourceHandlers(ResourceHandlerRegistry registry) {        registry.addResourceHandler("swagger-ui.html")                .addResourceLocations("classpath:/META-INF/resources/");        registry.addResourceHandler("/webjars/**")                .addResourceLocations("classpath:/META-INF/resources/webjars/");
   }}

新增feign请求拦截器,因为feign请求为http调用,线程变量是传递不过去的,所以通过拦截feign请求,把线程变量放在请求的header里面

package com.jw.inteceptor;

import com.jw.DataSourceHolder;import feign.RequestInterceptor;import feign.RequestTemplate;


public class FeignInterceptor implements RequestInterceptor{    @Override    public void apply(RequestTemplate requestTemplate) {        //获取到线程变量重新设置到线程变量中,防止中途丢失        String dbName = DataSourceHolder.peek();        requestTemplate.header("tenantCode",dbName);    }
}


package com.jw.config;
import com.jw.inteceptor.FeignInterceptor;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;
@Configurationpublic class FeignInteceptorConfig {
   @Bean    public FeignInterceptor feignInterceptor(){        return new FeignInterceptor();    }}


04study-service新增拦截器

新增拦截器,拦截当前请求,从header中取出在feign拦截器中设置的租户code

package com.jw.inteceptor;
import cn.hutool.core.util.StrUtil;
import com.jw.DataSourceHolder;import jakarta.servlet.http.HttpServletRequest;import jakarta.servlet.http.HttpServletResponse;import org.springframework.stereotype.Component;import org.springframework.web.servlet.HandlerInterceptor;
import java.util.Objects;
/*** @author :j* @date :Created in 2022/12/16 15:05* @description: */@Componentpublic class RequestInteceptor implements HandlerInterceptor {    @Override    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {        HttpServletRequest httpServletRequest = (HttpServletRequest)request;        String dbName = httpServletRequest.getHeader("tenantCode");        //测试用        if (StrUtil.isEmpty(dbName) || Objects.equals(dbName,"common")) {            dbName = "";        }        DataSourceHolder.push(dbName);        return true;    }
   @Override    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {        DataSourceHolder.clear();    }}}


package com.jw.config;import com.jw.inteceptor.RequestInteceptor;import jakarta.annotation.Resource;import org.springframework.context.annotation.Configuration;import org.springframework.web.servlet.config.annotation.InterceptorRegistry;import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry;import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;@Configurationpublic class WebMvcConfig implements WebMvcConfigurer {    @Resource    private RequestInteceptor requestInteceptor;    @Override    public void addInterceptors(InterceptorRegistry registry) {     registry .addInterceptor(this.requestInteceptor)            .addPathPatterns("/**")     ;}    @Override    public void addResourceHandlers(ResourceHandlerRegistry registry) {    }


经过这样配置后,通过测试可以发现,使用不同的域名请求到了不同租户数据库当中。在这整个过程中,原有业务代码是没有做任何改动的,都是新增的。

结尾

在这整个改造过程中,项目中使用的redis也是不需要做任何改动的。通过上面redis的两个配置类,在使用RedisTemplate时会自动的根据线程变量找到对应的redis。Rocketmq同理,也是通过在发送消息的时候带上线程的线程变量,然后再在消费监听的时候把当前消息传递过来的线程变量设置到当前监听的线程变量中。如此,这样改造下来,只需要在原有Rocketmq的发送及监听的基类上做些许改动其它不用做任何业务改动的前提下实现多租户系统。因为配置类及相关附属类有些多,相关源码文中没有放全。如有问题,您也可以在公众号下方留言。该改造方案经过生产实战验证可靠并且有效。

整理不易,喜欢点个关注吧,更多好文等你来分享



END
点击公众号发送"多租户"获取所有源码往期回顾



关于接口性能优化你不得不知道的事!
一行搞定!注解带你轻松实现redis分布式锁
【实用技巧】微信小程序循环中实现多个倒计时,让你轻松管理时间!
MySQL面试秘籍:如何吊打面试官并轻松拿下心仪职位!
微信小程序如何优雅的处理token失效
分布式系统必知必会:解析分布式系统遇到的一系列问题及解决方案
超全最新最实用AI工具集合192个,涉及多个领域,太牛逼了!!
redis导致生产事故,CTO:下次这样用别干了
[附源码]SpringBoot整合Sentinel限流及Sentinel-Dashboard持久化规则到Nacos
一文搞懂SpringBoot实现动态切换数据源
中小型分布式系统分布式ID生产实战
k8s图形化管理工具kuboard
超强大CICD平台Drone,看如何让系统自动化!
Sharding-jdbc在多数据源下的读写分离实现


继续滑动看下一个

万字长文!手把手教您轻松实现单租户到SaaS架构的升级!

老猿人 码农闲谈AI
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存