其他
初创公司5大Java服务困局,阿里工程师如何打破?
// 抢取订单函数
public synchronized void grabOrder(Long orderId, Long userId) {
// 获取订单信息
OrderDO order = orderDAO.get(orderId);
if (Objects.isNull(order)) {
throw new BizRuntimeException(String.format("订单(%s)不存在", orderId));
}
// 检查订单状态
if (!Objects.equals(order.getStatus, OrderStatus.WAITING_TO_GRAB.getValue())) {
throw new BizRuntimeException(String.format("订单(%s)已被抢", orderId));
}
// 设置订单被抢
orderDAO.setGrabed(orderId, userId);
}
// 抢取订单函数
public void grabOrder(Long orderId, Long userId) {
Long lockId = orderDistributedLock.lock(orderId);
try {
grabOrderWithoutLock(orderId, userId);
} finally {
orderDistributedLock.unlock(orderId, lockId);
}
}
// 不带锁的抢取订单函数
private void grabOrderWithoutLock(Long orderId, Long userId) {
// 获取订单信息
OrderDO order = orderDAO.get(orderId);
if (Objects.isNull(order)) {
throw new BizRuntimeException(String.format("订单(%s)不存在", orderId));
}
// 检查订单状态
if (!Objects.equals(order.getStatus, OrderStatus.WAITING_TO_GRAB.getValue())) {
throw new BizRuntimeException(String.format("订单(%s)已被抢", orderId));
}
// 设置订单被抢
orderDAO.setGrabed(orderId, userId);
}
可靠性、高容错性:一台服务器的崩溃,不会影响其它服务器,其它服务器仍能提供服务。 可扩展性:如果系统服务能力不足,可以水平扩展更多服务器。 灵活性:可以很容易的安装、实施、扩容和升级系统。 性能高:拥有多台服务器的计算能力,比单台服务器处理速度更快。 性价比高:分布式系统对服务器硬件要求很低,可以选用廉价服务器搭建分布式集群,从而得到更好的性价比。
排查难度高:由于系统分布在多台服务器上,故障排查和问题诊断难度较高。 软件支持少:分布式系统解决方案的软件支持较少。 建设成本高:需要多台服务器搭建分布式系统。
基于数据库实现的分布式锁; 基于Redis实现的分布式锁; 基于Zookeeper实现的分布式锁。
// 登录函数(示意写法)
public UserVO login(String phoneNumber, String verifyCode) {
// 检查验证码
if (!checkVerifyCode(phoneNumber, verifyCode)) {
throw new ExampleException("验证码错误");
}
// 检查用户存在
UserDO user = userDAO.getByPhoneNumber(phoneNumber);
if (Objects.nonNull(user)) {
return transUser(user);
}
// 创建新用户
return createNewUser(user);
}
// 创建新用户函数
private UserVO createNewUser(String phoneNumber) {
// 创建新用户
UserDO user = new UserDO();
...
userDAO.insert(user);
// 绑定优惠券
couponService.bindCoupon(user.getId(), CouponType.NEW_USER);
// 返回新用户
return transUser(user);
}
// 创建新用户函数
private UserVO createNewUser(String phoneNumber) {
// 创建新用户
UserDO user = new UserDO();
...
userDAO.insert(user);
// 绑定优惠券
executorService.execute(()->couponService.bindCoupon(user.getId(), CouponType.NEW_USER));
// 返回新用户
return transUser(user);
}
// 创建新用户函数
private UserVO createNewUser(String phoneNumber) {
// 创建新用户
UserDO user = new UserDO();
...
userDAO.insert(user);
// 发送优惠券消息
Long userId = user.getId();
CouponMessageDataVO data = new CouponMessageDataVO();
data.setUserId(userId);
data.setCouponType(CouponType.NEW_USER);
Message message = new Message(TOPIC, TAG, userId, JSON.toJSONBytes(data));
SendResult result = metaqTemplate.sendMessage(message);
if (!Objects.equals(result, SendStatus.SEND_OK)) {
log.error("发送用户({})绑定优惠券消息失败:{}", userId, JSON.toJSONString(result));
}
// 返回新用户
return transUser(user);
}
// 优惠券服务类
@Slf4j
@Service
public class CouponService extends DefaultMessageListener<String> {
// 消息处理函数
@Override
@Transactional(rollbackFor = Exception.class)
public void onReceiveMessages(MetaqMessage<String> message) {
// 获取消息体
String body = message.getBody();
if (StringUtils.isBlank(body)) {
log.warn("获取消息({})体为空", message.getId());
return;
}
// 解析消息数据
CouponMessageDataVO data = JSON.parseObject(body, CouponMessageDataVO.class);
if (Objects.isNull(data)) {
log.warn("解析消息({})体为空", message.getId());
return;
}
// 绑定优惠券
bindCoupon(data.getUserId(), data.getCouponType());
}
}
如果系统发生重启或崩溃,导致消息处理函数执行失败,不会确认消息已消费;由于MetaQ支持多服务订阅同一队列,该消息可以转到别的服务进行消费,亦或等到本服务恢复正常后再进行消费。 消费者可多服务、多线程进行消费消息,即便消息处理时间较长,也不容易引起消息积压;即便引起消息积压,也可以通过扩充服务实例的方式解决。 如果需要重新消费该消息,只需要在MetaQ管理平台上点击"消息验证"即可。
/** 完成采购动作函数(此处省去获取采购单/验证状态/锁定采购单等逻辑) */
public void finishPurchase(PurchaseOrder order) {
// 完成相关处理
......
// 回流采购单(调用HTTP接口)
backflowPurchaseOrder(order);
// 设置完成状态
purchaseOrderDAO.setStatus(order.getId(), PurchaseOrderStatus.FINISHED.getValue());
}
该函数可能耗费时间较长,导致完成采购接口成为慢接口; 该函数可能失败抛出异常,导致客户调用完成采购接口失败。
/** 完成采购动作函数(此处省去获取采购单/验证状态/锁定采购单等逻辑) */
public void finishPurchase(PurchaseOrder order) {
// 完成相关处理
......
// 设置完成状态
purchaseOrderDAO.setStatus(order.getId(), PurchaseOrderStatus.FINISHED.getValue());
}
/** 执行回流动作函数(此处省去获取采购单/验证状态/锁定采购单等逻辑) */
public void executeBackflow(PurchaseOrder order) {
// 回流采购单(调用HTTP接口)
backflowPurchaseOrder(order);
// 设置回流状态
purchaseOrderDAO.setStatus(order.getId(), PurchaseOrderStatus.BACKFLOWED.getValue());
}
状态必须是一个持久状态,而不能是一个临时状态; 终结状态不能是中间状态,不能继续进行流程流转; 状态划分合理,不要把多个状态强制合并为一个状态; 状态尽量精简,同一状态的不同情况可以用其它字段表示。
每个动作执行前,必须检查当前状态和触发动作状态的一致性; 状态机的状态更改,只能通过动作进行,其它操作都是不符合规范的; 需要添加分布式锁保证动作的原子性,添加数据库事务保证数据的一致性; 类似的动作(比如操作用户、请求参数、动作含义等)可以合并为一个动作,并根据动作执行结果转向不同的状态。
/** 执行回流动作函数(此处省去获取采购单/验证状态/锁定采购单等逻辑) */
public void executeBackflow(PurchaseOrder order) {
// 完成原始采购单
rawPurchaseOrderDAO.setStatus(order.getRawId(), RawPurchaseOrderStatus.FINISHED.getValue());
// 设置回流状态
purchaseOrderDAO.setStatus(order.getId(), PurchaseOrderStatus.BACKFLOWED.getValue());
}
直接暴露数据库表,容易产生数据安全问题; 多个系统操作同一数据库表,容易造成数据库表数据混乱; 操作同一个数据库表的代码,分布在不同的系统中,不便于管理和维护; 具有数据库表这样的强关联,无法实现系统间的隔离和解耦。
/** 采购单服务接口 */
public interface PurchaseOrderService {
/** 完成采购单函数 */
public void finishPurchaseOrder(Long orderId);
}
/** 采购单服务实现 */
@Service("purchaseOrderService")
public class PurchaseOrderServiceImpl implements PurchaseOrderService {
/** 完成采购单函数 */
@Override
@Transactional(rollbackFor = Exception.class)
public void finishPurchaseOrder(Long orderId) {
// 相关处理
...
// 完成采购单
purchaseOrderService.finishPurchaseOrder(order.getRawId());
}
}
/** 执行回流动作函数(此处省去获取采购单/验证状态/锁定采购单等逻辑) */
public void executeBackflow(PurchaseOrder order) {
// 完成采购单
purchaseOrderService.finishPurchaseOrder(order.getRawId());
// 设置回流状态
purchaseOrderDAO.setStatus(order.getId(), PurchaseOrderStatus.BACKFLOWED.getValue());
}
HTTP/HTTPS接口; WebService接口; Dubbo/HSF接口; CORBA接口。
MetaQ的消息通知; CORBA消息通知。
/** 订单DAO接口 */
public interface OrderDAO {
/** 查询过期订单函数 */
@Select("select * from t_order where status = 5 and gmt_create < date_sub(current_timestamp, interval 30 day)")
public List<OrderDO> queryTimeout();
}
/** 订单服务接口 */
public interface OrderService {
/** 查询过期订单函数 */
public List<OrderVO> queryTimeout();
}
数据量太大,导致服务端的内存溢出; 数据量太大,导致查询接口超时、返回数据超时等; 数据量太大,导致客户端的内存溢出。
/** 订单DAO接口 */
public interface OrderDAO {
/** 查询过期订单函数 */
@Select("select * from t_order where status = 5 and gmt_create < date_sub(current_timestamp, interval 30 day) limit 0, #{maxCount}")
public List<OrderDO> queryTimeout(@Param("maxCount") Integer maxCount);
}
/** 订单服务接口 */
public interface OrderService {
/** 查询过期订单函数 */
public List<OrderVO> queryTimeout(Integer maxCount);
}
/** 订单DAO接口 */
public interface OrderDAO {
/** 统计过期订单函数 */
@Select("select count(*) from t_order where status = 5 and gmt_create < date_sub(current_timestamp, interval 30 day)")
public Long countTimeout();
/** 查询过期订单函数 */
@Select("select * from t_order where status = 5 and gmt_create < date_sub(current_timestamp, interval 30 day) limit #{startIndex}, #{pageSize}")
public List<OrderDO> queryTimeout(@Param("startIndex") Long startIndex, @Param("pageSize") Integer pageSize);
}
/** 订单服务接口 */
public interface OrderService {
/** 查询过期订单函数 */
public PageData<OrderVO> queryTimeout(Long startIndex, Integer pageSize);
}
/** 订单DAO接口 */
public interface OrderDAO {
/** 查询过期订单函数 */
@Select("select * from t_order where status = 5 and gmt_create < date_sub(current_timestamp, interval 30 day) limit #{startIndex}, #{pageSize}")
public List<OrderDO> queryTimeout(@Param("startIndex") Long startIndex, @Param("pageSize") Integer pageSize);
/** 设置订单超时关闭 */
@Update("update t_order set status = 10 where id = #{orderId} and status = 5")
public Long setTimeoutClosed(@Param("orderId") Long orderId)
}
/** 关闭过期订单作业类 */
public class CloseTimeoutOrderJob extends Job {
/** 分页数量 */
private static final int PAGE_COUNT = 100;
/** 分页大小 */
private static final int PAGE_SIZE = 1000;
/** 作业执行函数 */
@Override
public void execute() {
for (int i = 0; i < PAGE_COUNT; i++) {
// 查询处理订单
List<OrderDO> orderList = orderDAO.queryTimeout(i * PAGE_COUNT, PAGE_SIZE);
for (OrderDO order : orderList) {
// 进行超时关闭
......
orderDAO.setTimeoutClosed(order.getId());
}
// 检查处理完毕
if(orderList.size() < PAGE_SIZE) {
break;
}
}
}
}
/** 订单DAO接口 */
public interface OrderDAO {
/** 查询过期订单函数 */
@Select("select * from t_order where status = 5 and gmt_create < date_sub(current_timestamp, interval 30 day) limit 0, #{maxCount}")
public List<OrderDO> queryTimeout(@Param("maxCount") Integer maxCount);
/** 设置订单超时关闭 */
@Update("update t_order set status = 10 where id = #{orderId} and status = 5")
public Long setTimeoutClosed(@Param("orderId") Long orderId)
}
/** 关闭过期订单作业(定时作业) */
public class CloseTimeoutOrderJob extends Job {
/** 分页数量 */
private static final int PAGE_COUNT = 100;
/** 分页大小 */
private static final int PAGE_SIZE = 1000;
/** 作业执行函数 */
@Override
public void execute() {
for (int i = 0; i < PAGE_COUNT; i++) {
// 查询处理订单
List<OrderDO> orderList = orderDAO.queryTimeout(PAGE_SIZE);
for (OrderDO order : orderList) {
// 进行超时关闭
......
orderDAO.setTimeoutClosed(order.getId());
}
// 检查处理完毕
if(orderList.size() < PAGE_SIZE) {
break;
}
}
}
}
微信后台回复“11”满足你的好奇和想象还有双11干货集锦大放送