From e5209260979d8b4249a00639765e8b405011c679 Mon Sep 17 00:00:00 2001 From: khalil <842328916@qq.com> Date: Thu, 11 Sep 2025 14:01:41 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B4=A6=E5=8D=95-=E9=9B=AA=E8=8A=B1=E4=B8=BB?= =?UTF-8?q?=E9=94=AE-mq-=E5=90=88=E5=B9=B6topic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/accompany/common/redis/RedisKey.java | 1 - .../service/gift/BillMessageService.java | 66 ------------------- .../business/service/mq/RocketMQService.java | 2 +- .../com/accompany/mq/constant/MqConstant.java | 3 - .../mq/consumer/BillMessageV2Consumer.java | 28 -------- .../accompany/scheduler/task/BillTask.java | 4 +- 6 files changed, 3 insertions(+), 101 deletions(-) delete mode 100644 accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/BillMessageV2Consumer.java diff --git a/accompany-base/accompany-core/src/main/java/com/accompany/common/redis/RedisKey.java b/accompany-base/accompany-core/src/main/java/com/accompany/common/redis/RedisKey.java index 85268aae7..6e05b35b1 100644 --- a/accompany-base/accompany-core/src/main/java/com/accompany/common/redis/RedisKey.java +++ b/accompany-base/accompany-core/src/main/java/com/accompany/common/redis/RedisKey.java @@ -1390,7 +1390,6 @@ public enum RedisKey { charge_floating_count, //充值飘屏计数器 bill_record_message, - lock_bill_record_message, //小游戏重复订单 diff --git a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/BillMessageService.java b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/BillMessageService.java index bbcd9011c..2b9d5392c 100644 --- a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/BillMessageService.java +++ b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/BillMessageService.java @@ -4,8 +4,6 @@ import com.accompany.business.message.BillMessage; import com.accompany.business.service.mq.RocketMQService; import com.accompany.business.service.user.UsersService; import com.accompany.common.redis.RedisKey; -import com.accompany.common.status.BusiStatus; -import com.accompany.core.exception.ServiceException; import com.accompany.core.model.Users; import com.accompany.sharding.mapper.BillRecordMapper; import com.accompany.sharding.model.BillRecord; @@ -13,21 +11,13 @@ import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.incrementer.DefaultIdentifierGenerator; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.redisson.api.RLock; import org.redisson.api.RMap; import org.redisson.api.RedissonClient; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Service; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - @Slf4j @Service public class BillMessageService implements InitializingBean { @@ -59,39 +49,6 @@ public class BillMessageService implements InitializingBean { }; public void handleBillMessage(BillMessage billMessage) { - // 防止消息被重复消费 - boolean locked = false; - RLock lock = recordMessMap.getLock(RedisKey.lock_bill_record_message.getKey(billMessage.getMessId())); - - try { - locked = lock.tryLock(15, 5, TimeUnit.SECONDS); - if (!locked) { - log.warn("handleBillMessage billMessage had handle, mess: " + billMessage); - return; - } - - if (!recordMessMap.containsKey(billMessage.getMessId())){ - log.warn("handleBillMessage billMessage had handle, mess: " + billMessage); - return; - } - - BillRecord billRecord = insertBillRecord(billMessage); - log.info("【处理账单mq】 billRecord 插入成功 id:{} messId: {} mess:{}", - billRecord.getBillId(), billMessage.getMessId(), JSON.toJSONString(billMessage)); - - recordMessMap.fastRemove(billMessage.getMessId()); - - } catch (InterruptedException e) { - log.error("handleBillMessage mess {} 持有所异常 ", JSON.toJSONString(billMessage), e); - throw new RuntimeException(e); - } finally { - if (locked) { - lock.unlock(); - } - } - } - - public void handleBillMessageV2(BillMessage billMessage) { BillRecord billRecord = insertBillRecordIgnore(billMessage); log.info("【处理账单mq】 billRecord 插入成功 id:{} messId: {} mess:{}", @@ -100,29 +57,6 @@ public class BillMessageService implements InitializingBean { recordMessMap.fastRemove(billMessage.getMessId()); } - private BillRecord insertBillRecord(BillMessage billMessage) { - BillRecord billRecord = new BillRecord(); - BeanUtils.copyProperties(billMessage, billRecord); - - Users u = usersService.getUsersByUid(billMessage.getUid()); - if (null != u){ - billRecord.setPartitionId(u.getPartitionId()); - } - - for (int i = 0; i < 3; i++) { - try { - billRecord.setBillId(null); - billRecordMapper.insert(billRecord); - return billRecord; - } catch (DuplicateKeyException ignore){ - log.error("[insertBillRecord] 插入账单失败", ignore); - } - } - - log.error(String.format("[insertBillRecord] 插入账单3次都失败 %s", JSON.toJSONString(billRecord))); - throw new ServiceException(BusiStatus.SERVERBUSY); - } - private BillRecord insertBillRecordIgnore(BillMessage billMessage) { long startTime = System.currentTimeMillis(); long copyPropertiesTime = 0; diff --git a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java index 11bb323ec..0a91641bf 100644 --- a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java +++ b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java @@ -29,7 +29,7 @@ public class RocketMQService { * 送消息,发送到MQ */ public void sendBillRecordMessage(BillMessage billMessage) { - mqMessageProducer.send(MqConstant.BILL_RECORD_V2_TOPIC, billMessage, + mqMessageProducer.send(MqConstant.BILL_RECORD_TOPIC, billMessage, sendResult -> log.info("sendBillRecordMessage success message: {} queue {}", JSON.toJSONString(billMessage), sendResult.getMessageQueue().getQueueId()), throwable -> log.error("sendBillRecordMessage fail message: {}", JSON.toJSONString(billMessage), throwable)); } diff --git a/accompany-mq/accompany-mq-sdk/src/main/java/com/accompany/mq/constant/MqConstant.java b/accompany-mq/accompany-mq-sdk/src/main/java/com/accompany/mq/constant/MqConstant.java index f66014730..0ffc9f161 100644 --- a/accompany-mq/accompany-mq-sdk/src/main/java/com/accompany/mq/constant/MqConstant.java +++ b/accompany-mq/accompany-mq-sdk/src/main/java/com/accompany/mq/constant/MqConstant.java @@ -35,9 +35,6 @@ public interface MqConstant { String BILL_RECORD_TOPIC = "bill_record_topic"; String BILL_RECORD_CONSUME_GROUP = "bill_record_consume_group"; - String BILL_RECORD_V2_TOPIC = "bill_record_v2_topic"; - String BILL_RECORD_V2_CONSUME_GROUP = "bill_record_v2_consume_group"; - String BRAVO_TOPIC = "bravo_topic"; String BRAVO_CONSUME_GROUP = "bravo_consume_group"; diff --git a/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/BillMessageV2Consumer.java b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/BillMessageV2Consumer.java deleted file mode 100644 index 65f2a6386..000000000 --- a/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/BillMessageV2Consumer.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.accompany.mq.consumer; - -import com.accompany.business.message.BillMessage; -import com.accompany.business.service.gift.BillMessageService; -import com.accompany.mq.constant.MqConstant; -import com.accompany.mq.listener.AbstractMessageListener; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -@Slf4j -@Component -@ConditionalOnProperty(name = "spring.application.name", havingValue = "web") -@RocketMQMessageListener(topic = MqConstant.BILL_RECORD_V2_TOPIC, consumerGroup = MqConstant.BILL_RECORD_V2_CONSUME_GROUP) -public class BillMessageV2Consumer extends AbstractMessageListener { - - @Autowired - private BillMessageService billMessageService; - - @Override - public void onMessage(BillMessage billMessage) { - log.info("onMessage billMessage v2: {}", billMessage.toString()); - billMessageService.handleBillMessageV2(billMessage); - } - -} diff --git a/accompany-scheduler/accompany-scheduler-service/src/main/java/com/accompany/scheduler/task/BillTask.java b/accompany-scheduler/accompany-scheduler-service/src/main/java/com/accompany/scheduler/task/BillTask.java index 689f7af6d..10e204859 100644 --- a/accompany-scheduler/accompany-scheduler-service/src/main/java/com/accompany/scheduler/task/BillTask.java +++ b/accompany-scheduler/accompany-scheduler-service/src/main/java/com/accompany/scheduler/task/BillTask.java @@ -24,7 +24,7 @@ public class BillTask extends BaseTask { @Scheduled(cron = "0 */5 * * * ?") public void retryBillQueue() { log.info("retryBillQueue start ..."); - Map map = billMessageService.getRecordMessMap().randomEntries(20000); + Map map = billMessageService.getRecordMessMap().randomEntries(10000); if (CollectionUtils.isEmpty(map)) { return; } @@ -36,7 +36,7 @@ public class BillTask extends BaseTask { String messId = entry.getKey(); BillMessage val = entry.getValue(); if (curTime - val.getCreateTime().getTime() > gapTime) { - billMessageService.handleBillMessageV2(val); + billMessageService.handleBillMessage(val); } } catch (Exception e) { log.error("retryBillQueue error", e);