From a4b9d4a7bda3503c86fcd4387f30ea1256fc1e4d Mon Sep 17 00:00:00 2001 From: khalil Date: Fri, 10 Jan 2025 17:57:49 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B9=B8=E8=BF=90=E7=A4=BC=E7=89=A9-mq?= =?UTF-8?q?=E5=BC=82=E6=AD=A5=E7=BB=93=E7=AE=97=EF=BC=8CLucky24Record?= =?UTF-8?q?=E5=8A=A0=E4=B8=8AroomUid=E5=92=8CmessId?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/accompany/common/redis/RedisKey.java | 2 + .../sharding/model/Lucky24Record.java | 2 + .../business/message/Lucky24Message.java | 27 ++++ .../service/gift/Lucky24GiftSendService.java | 92 ++++++++++++- .../service/gift/Lucky24MessageService.java | 124 ++++++++++++++++++ .../gift/SuperLuckyGiftSendService.java | 6 + .../service/lucky/Lucky24RecordService.java | 23 ++++ .../lucky/Lucky24SettlementService.java | 7 +- .../business/service/mq/RocketMQService.java | 27 ++-- .../com/accompany/mq/constant/MqConstant.java | 3 + .../mq/listener/AbstractMessageListener.java | 4 +- .../mq/consumer/Lucky24MessageConsumer.java | 29 ++++ .../scheduler/task/luckyBag/Lucky24Task.java | 40 ++++++ 13 files changed, 364 insertions(+), 22 deletions(-) create mode 100644 accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/Lucky24Message.java create mode 100644 accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/Lucky24MessageService.java create mode 100644 accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/Lucky24MessageConsumer.java diff --git a/accompany-base/accompany-core/src/main/java/com/accompany/common/redis/RedisKey.java b/accompany-base/accompany-core/src/main/java/com/accompany/common/redis/RedisKey.java index 29f9bcf24..90133a2e3 100644 --- a/accompany-base/accompany-core/src/main/java/com/accompany/common/redis/RedisKey.java +++ b/accompany-base/accompany-core/src/main/java/com/accompany/common/redis/RedisKey.java @@ -1355,6 +1355,8 @@ public enum RedisKey { lucky_24_user_history, lucky_24_user_lock, lucky_24_robot_push_msg, + lucky_24_status, // 礼物消息的状态 + lock_lucky_24_message, // 消费送礼物消息锁 family_diamond_settlement, diff --git a/accompany-base/accompany-sharding/accompany-sharding-sdk/src/main/java/com/accompany/sharding/model/Lucky24Record.java b/accompany-base/accompany-sharding/accompany-sharding-sdk/src/main/java/com/accompany/sharding/model/Lucky24Record.java index 1264c77dd..12db05f6a 100644 --- a/accompany-base/accompany-sharding/accompany-sharding-sdk/src/main/java/com/accompany/sharding/model/Lucky24Record.java +++ b/accompany-base/accompany-sharding/accompany-sharding-sdk/src/main/java/com/accompany/sharding/model/Lucky24Record.java @@ -16,6 +16,7 @@ public class Lucky24Record { private Integer partitionId; private Long uid; private Long receiverUid; + private Long roomUid; private Integer giftId; private Long giftGoldPrice; private Integer giftNum; @@ -25,4 +26,5 @@ public class Lucky24Record { private Integer afterMultiple; private Long winGoldNum; private Date createTime; + private String messId; } diff --git a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/Lucky24Message.java b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/Lucky24Message.java new file mode 100644 index 000000000..06b7a56d2 --- /dev/null +++ b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/Lucky24Message.java @@ -0,0 +1,27 @@ +package com.accompany.business.message; + +import com.accompany.mq.model.BaseMqMessage; +import lombok.Data; + +import java.util.Date; + +/** + * 礼物消息 + */ +@Data +public class Lucky24Message extends BaseMqMessage { + private Integer partitionId; + private Long uid; + private Long receiverUid; + private Long roomUid; + private Integer giftId; + private Long giftGoldPrice; + private Integer giftNum; + private Integer poolId; + private Boolean isSupplement; + private Integer drawMultiple; + private Integer afterMultiple; + private Long winGoldNum; + private Long createTime; + private String messId; +} diff --git a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/Lucky24GiftSendService.java b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/Lucky24GiftSendService.java index 13bf4a84a..103ca7137 100644 --- a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/Lucky24GiftSendService.java +++ b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/Lucky24GiftSendService.java @@ -3,28 +3,35 @@ package com.accompany.business.service.gift; import com.accompany.business.dto.lucky.Lucky24GiftConfig; import com.accompany.business.dto.lucky.Lucky24Result; import com.accompany.business.dto.lucky.SuperLuckyGiftIncomeAllot; +import com.accompany.business.message.GiftMessage; +import com.accompany.business.message.Lucky24Message; import com.accompany.business.model.Gift; +import com.accompany.business.model.UserPurse; import com.accompany.business.model.lucky.Lucky24Pool; import com.accompany.business.service.lucky.*; +import com.accompany.business.service.mq.RocketMQService; import com.accompany.common.constant.Constant; +import com.accompany.common.redis.RedisKey; import com.accompany.common.status.BusiStatus; import com.accompany.core.exception.ServiceException; import com.accompany.core.model.Room; import com.accompany.core.model.Users; import com.accompany.core.service.SysConfService; +import com.accompany.core.service.common.JedisService; +import com.accompany.core.util.DoubleUtil; +import com.accompany.sharding.model.Lucky24Record; import com.alibaba.fastjson.JSON; +import com.baomidou.mybatisplus.core.incrementer.DefaultIdentifierGenerator; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RDeque; +import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import java.math.BigDecimal; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.OptionalDouble; +import java.util.*; import java.util.stream.Collectors; @Slf4j @@ -47,11 +54,14 @@ public class Lucky24GiftSendService { private Lucky24SettlementService settlementService; @Autowired private Lucky24RobotMsgService robotMsgService; + @Autowired + private RocketMQService rocketMQService; + @Autowired + private JedisService jedisService; public void draw(long senderUid, Users sender, Integer partitionId, Room room, List receiveList, Gift gift, int everyGiftNum, Date sendGiftTime) { - int totalGiftNum = everyGiftNum * receiveList.size(); long everyoneGoldNum = everyGiftNum * gift.getGoldPrice(); Lucky24GiftConfig config = getConfig(); @@ -63,14 +73,82 @@ public class Lucky24GiftSendService { log.info("[lucky24] uid {}, partitionId {}, addStockGoldNum {}, afterStock {}", senderUid, partitionId, incomeAllot.getRemainValue(), afterStock); - Map winGoldNumMap = receiveList.parallelStream() + Map recordMap = receiveList.parallelStream() + .collect(Collectors.toMap(receiverUid-> receiverUid, + receiverUid-> drawMultiple2(config, partitionConfig, senderUid, sender, partitionId, gift, everyGiftNum, receiverUid, everyoneGoldNum, room, sendGiftTime))); + log.info("[lucky24] uid {}, totalWinGoldNum {}", senderUid, JSON.toJSONString(recordMap)); + + sendMq(recordMap); + + /*Map winGoldNumMap = receiveList.parallelStream() .collect(Collectors.toMap(receiverUid-> receiverUid, receiverUid-> drawMultiple(config, partitionConfig, senderUid, sender, partitionId, gift, everyGiftNum, receiverUid, everyoneGoldNum, room, sendGiftTime))); log.info("[lucky24] uid {}, totalWinGoldNUm {}", senderUid, JSON.toJSONString(winGoldNumMap)); - settlementService.settlement(config, senderUid, room, gift, everyGiftNum, totalGiftNum, winGoldNumMap, incomeAllot, sendGiftTime); + settlementService.settlement(config, senderUid, room, gift, everyGiftNum, totalGiftNum, winGoldNumMap, incomeAllot, sendGiftTime);*/ } + private void sendMq(Map recordMap) { + Map caches = new HashMap<>(recordMap.size()); + List messageList = new ArrayList<>(); + + DefaultIdentifierGenerator idGenerator = DefaultIdentifierGenerator.getInstance(); + + for (Lucky24Record record: recordMap.values()){ + String id = idGenerator.nextUUID(null); + + Lucky24Message message = new Lucky24Message(); + message.setMessId(id); + message.setPartitionId(record.getPartitionId()); + message.setUid(record.getUid()); + message.setReceiverUid(record.getReceiverUid()); + message.setRoomUid(record.getRoomUid()); + message.setGiftId(record.getGiftId()); + message.setGiftGoldPrice(record.getGiftGoldPrice()); + message.setGiftNum(record.getGiftNum()); + message.setPoolId(record.getPoolId()); + message.setIsSupplement(record.getIsSupplement()); + message.setDrawMultiple(record.getDrawMultiple()); + message.setAfterMultiple(record.getAfterMultiple()); + message.setWinGoldNum(record.getWinGoldNum()); + message.setCreateTime(record.getCreateTime().getTime()); + + messageList.add(message); + + caches.put(id, JSON.toJSONString(message)); + } + + jedisService.hwrite(RedisKey.lucky_24_status.getKey(), caches); + + rocketMQService.asyncSendBatchLucky24Message(messageList); + } + + public Lucky24Record drawMultiple2(Lucky24GiftConfig config, Lucky24GiftConfig partitionConfig, long senderUid, Users sender, int partitionId, + Gift gift, int giftNum, long receiverUid, long everyoneGoldNum, Room room, Date sendGiftTime) { + Lucky24Result drawResult = poolService.drawMultipleFromPool(config, senderUid, partitionId); + long curTimes = userMetaService.updateUserTimes(senderUid); + Long supplementMultiple = judgeSupplement(config, partitionConfig.getSupplement(), senderUid, receiverUid, curTimes, drawResult); + long drawMultiple = null != supplementMultiple? supplementMultiple: drawResult.getOutput(); + long afterMultiple = drawMultiple; + if (drawMultiple > 0){ + BigDecimal winGoldNum = BigDecimal.valueOf(everyoneGoldNum * drawMultiple); + BigDecimal afterStock = stockService.subStock(partitionId, winGoldNum); + if (afterStock.compareTo(BigDecimal.ZERO) < 0){ + log.info("[lucky24] drawMultiple sender {} receiver {} 产出大于库存 winGoldNum {} afterStock {}", + senderUid, receiverUid, winGoldNum, afterStock); + afterStock = stockService.addStock(partitionId, winGoldNum); + robotMsgService.pushStockNotEnough(partitionId, afterStock); + afterMultiple = 0; + } + } + long winGoldNum = afterMultiple * everyoneGoldNum; + userMetaService.updateUserMeta(config, senderUid, partitionId, curTimes, everyoneGoldNum, winGoldNum); + return recordService.buildRecord(senderUid, sender.getPartitionId(), gift, giftNum, null != room? room.getUid(): null, + receiverUid, drawResult.getPoolId(), null != supplementMultiple || Boolean.TRUE.equals(drawResult.getIsSupplement()), + drawMultiple, afterMultiple, sendGiftTime); + } + + @Deprecated private long drawMultiple(Lucky24GiftConfig config, Lucky24GiftConfig partitionConfig, long senderUid, Users sender, int partitionId, Gift gift, int giftNum, long receiverUid, long everyoneGoldNum, Room room, Date sendGiftTime) { Lucky24Result drawResult = poolService.drawMultipleFromPool(config, senderUid, partitionId); diff --git a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/Lucky24MessageService.java b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/Lucky24MessageService.java new file mode 100644 index 000000000..3f17b9813 --- /dev/null +++ b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/Lucky24MessageService.java @@ -0,0 +1,124 @@ +package com.accompany.business.service.gift; + +import com.accompany.business.dto.lucky.Lucky24GiftConfig; +import com.accompany.business.dto.lucky.SuperLuckyGiftIncomeAllot; +import com.accompany.business.message.Lucky24Message; +import com.accompany.business.model.Gift; +import com.accompany.business.service.lucky.Lucky24IncomeAllotService; +import com.accompany.business.service.lucky.Lucky24RecordService; +import com.accompany.business.service.lucky.Lucky24RobotMsgService; +import com.accompany.business.service.lucky.Lucky24SettlementService; +import com.accompany.business.service.room.RoomService; +import com.accompany.business.service.user.UsersService; +import com.accompany.common.redis.RedisKey; +import com.accompany.core.model.Room; +import com.accompany.core.model.Users; +import com.accompany.core.service.base.BaseService; +import com.accompany.sharding.model.Lucky24Record; +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.Collections; +import java.util.Date; + +@Slf4j +@Service +public class Lucky24MessageService extends BaseService { + + @Autowired + private Lucky24RecordService recordService; + @Autowired + private Lucky24IncomeAllotService incomeAllotService; + @Autowired + private SuperLuckyGiftSendService superLuckyGiftSendService; + @Autowired + private Lucky24SettlementService settlementService; + @Autowired + private Lucky24RobotMsgService robotMsgService; + @Autowired + private Lucky24GiftSendService sendService; + @Autowired + private RoomService roomService; + @Autowired + private GiftService giftService; + @Autowired + private UsersService usersService; + + public void handleGiftMessage(Lucky24Message giftMessage) { + // 防止消息被重复消费 + if (!jedisLockService.isExist(RedisKey.lock_lucky_24_message.getKey(giftMessage.getMessId()), 30)) { + logger.warn("handleLucky24Message giftMessage had handle, mess: " + giftMessage); + return; + } + + if (!jedisService.hexists(RedisKey.lucky_24_status.getKey(), giftMessage.getMessId())){ + logger.warn("handleLucky24Message giftMessage had handle, mess: " + giftMessage); + return; + } + + logger.info("【处理lucky24 mq】 开始处理 giftMessage: {}", JSON.toJSONString(giftMessage)); + + Room room = null != giftMessage.getRoomUid()? roomService.getRoomByUid(giftMessage.getRoomUid()): null; + Gift gift = giftService.getGiftById(giftMessage.getGiftId()); + Date createTime = new Date(giftMessage.getCreateTime()); + + Lucky24Record record = insertRecord(giftMessage); + + log.info("【处理lucky24 mq】 record 插入成功 messId:{} recordId:{} record:{}", + giftMessage.getMessId(), record.getId(), JSON.toJSONString(record)); + + // 收礼者收益 + Lucky24GiftConfig config = sendService.getConfig(); + Lucky24GiftConfig partitionConfig = config.getRatioByPartitionId(giftMessage.getPartitionId()); + SuperLuckyGiftIncomeAllot receiverIncomeAllot = incomeAllotService.calculate(partitionConfig, gift, giftMessage.getGiftNum(), Collections.singletonList(record.getReceiverUid())); + superLuckyGiftSendService.syncSettlement(giftMessage.getUid(), gift, giftMessage.getGiftNum(), giftMessage.getGiftNum(), room, receiverIncomeAllot, createTime); + + logger.info("【处理lucky24 mq】 收礼收益已发放 messId: {} incomeAllot: {}", giftMessage.getMessId(), JSON.toJSONString(receiverIncomeAllot)); + + if (record.getWinGoldNum() > 0L){ + settlementService.syncSendReward(partitionConfig, record.getUid(), room, gift, record.getWinGoldNum(), record.getAfterMultiple()); + logger.info("【处理lucky24 mq】 送礼收益已发放 messId: {} senderUid: {} winGoldNum: {} afterMultiple: {}", + giftMessage.getMessId(), record.getUid(), record.getWinGoldNum(), record.getAfterMultiple()); + } + + // 后面都是异步发消息 + if (record.getAfterMultiple() >= config.getWarnMulti()){ + long totalGoldNum = giftMessage.getGiftNum() * giftMessage.getGiftGoldPrice(); + robotMsgService.pushSuperMulti(record.getUid(), record.getReceiverUid(), record.getAfterMultiple(), totalGoldNum, record.getWinGoldNum(), + record.getRoomUid()); + } + + Users u = usersService.getNotNullUsersByUid(record.getUid()); + if (null != u && config.getFollowErbanNoList().contains(u.getErbanNo())){ + robotMsgService.pushFollowUser(record.getUid(), record.getReceiverUid(), record.getRoomUid()); + } + + // 删除该标识,表示消息已经消费过 + jedisService.hdel(RedisKey.lucky_24_status.getKey(), giftMessage.getMessId()); + } + + private Lucky24Record insertRecord(Lucky24Message giftMessage) { + Lucky24Record record = new Lucky24Record(); + record.setMessId(giftMessage.getMessId()); + record.setPartitionId(giftMessage.getPartitionId()); + record.setUid(giftMessage.getUid()); + record.setReceiverUid(giftMessage.getReceiverUid()); + record.setRoomUid(giftMessage.getRoomUid()); + record.setGiftId(giftMessage.getGiftId()); + record.setGiftGoldPrice(giftMessage.getGiftGoldPrice()); + record.setGiftNum(giftMessage.getGiftNum()); + record.setPoolId(giftMessage.getPoolId()); + record.setIsSupplement(giftMessage.getIsSupplement()); + record.setDrawMultiple(giftMessage.getDrawMultiple()); + record.setAfterMultiple(giftMessage.getAfterMultiple()); + record.setWinGoldNum(giftMessage.getWinGoldNum()); + record.setCreateTime(new Date(giftMessage.getCreateTime())); + + recordService.insertRecord(record); + + return record; + } + +} diff --git a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/SuperLuckyGiftSendService.java b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/SuperLuckyGiftSendService.java index 147806843..ff53352ed 100644 --- a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/SuperLuckyGiftSendService.java +++ b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/SuperLuckyGiftSendService.java @@ -177,6 +177,12 @@ public class SuperLuckyGiftSendService { room, incomeAllot, sendGiftTime); } + public void syncSettlement(Long senderUid, Gift gift, int everyGiftNum, int totalGiftNum, + Room room, SuperLuckyGiftIncomeAllot incomeAllot, Date sendGiftTime){ + settlement(senderUid, gift, everyGiftNum, totalGiftNum, room, incomeAllot, sendGiftTime); + } + + @Async //@Transactional(rollbackFor = Exception.class, transactionManager = "mybatisplusTransactionManager") public void settlement(Long senderUid, Gift gift, int everyGiftNum, int totalGiftNum, diff --git a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/lucky/Lucky24RecordService.java b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/lucky/Lucky24RecordService.java index 2a726a386..d100cd5ba 100644 --- a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/lucky/Lucky24RecordService.java +++ b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/lucky/Lucky24RecordService.java @@ -33,6 +33,25 @@ public class Lucky24RecordService extends ServiceImpl= config.getSpecialTipMulti() ? 2 : 1); + superLuckyGiftSendService.sendTip(senderUid, room, BigDecimal.valueOf(winGoldNum), BigDecimal.valueOf(afterMultiple), + afterMultiple >= config.getSpecialTipMulti() ? 2 : 1); if (winGoldNum >= config.getAllRoomChatToastValue()){ superLuckyGiftSendService.sendAllRoomScreen(senderUid, room, gift, BigDecimal.valueOf(winGoldNum)); diff --git a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java index b8fada2b2..8a9f7bc71 100644 --- a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java +++ b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java @@ -30,18 +30,6 @@ public class RocketMQService { @Autowired private MQMessageProducer mqMessageProducer; - /** - * 送礼物消息,发送到MQ - * - * @param giftMessage - */ - public void sendGiftMessage(GiftMessage giftMessage) { - mqMessageProducer.send(MqConstant.GIFT_TOPIC, giftMessage, sendResult -> log.info("sendGiftMessage success message: {}", JSON.toJSONString(giftMessage)), throwable -> { - log.error("sendGiftMessage fail message: {}", JSON.toJSONString(giftMessage), throwable); - //SpringContextHolder.getBean(GiftMessageMQListener.class).onMessage(giftMessage); - }); - } - /** * 送礼物消息,发送到MQ * @@ -59,6 +47,21 @@ public class RocketMQService { } } + /** + * 送礼物消息,发送到MQ + * + * @param lucky24Messages + */ + public void asyncSendBatchLucky24Message(Collection lucky24Messages) { + for (Lucky24Message lucky24Message : lucky24Messages){ + mqMessageProducer.sendOrderly(MqConstant.LUCKY_24_TOPIC, lucky24Message, lucky24Message.getReceiverUid().toString(), + sendResult -> log.info("sendLucky24Message success message: {} queue {}", JSON.toJSONString(lucky24Message), + sendResult.getMessageQueue().getQueueId()), throwable -> { + log.error("sendLucky24Message fail message: {}", JSON.toJSONString(lucky24Message), throwable); + }); + } + } + /** * 发送开箱子中奖消息 */ diff --git a/accompany-mq/accompany-mq-sdk/src/main/java/com/accompany/mq/constant/MqConstant.java b/accompany-mq/accompany-mq-sdk/src/main/java/com/accompany/mq/constant/MqConstant.java index db6891a00..b52a78bd7 100644 --- a/accompany-mq/accompany-mq-sdk/src/main/java/com/accompany/mq/constant/MqConstant.java +++ b/accompany-mq/accompany-mq-sdk/src/main/java/com/accompany/mq/constant/MqConstant.java @@ -32,4 +32,7 @@ public interface MqConstant { String ACT_TASK_REWARD_CONSUME_GROUP = "act_task_reward_consume_group"; + String LUCKY_24_TOPIC = "lucky_24_topic"; + String LUCKY_24_CONSUME_GROUP = "lucky_24_group"; + } diff --git a/accompany-mq/accompany-mq-service/src/main/java/com/accompany/mq/listener/AbstractMessageListener.java b/accompany-mq/accompany-mq-service/src/main/java/com/accompany/mq/listener/AbstractMessageListener.java index b62c80571..d6d188dc3 100644 --- a/accompany-mq/accompany-mq-service/src/main/java/com/accompany/mq/listener/AbstractMessageListener.java +++ b/accompany-mq/accompany-mq-service/src/main/java/com/accompany/mq/listener/AbstractMessageListener.java @@ -32,8 +32,8 @@ public abstract class AbstractMessageListener implement @Override public void onMessage(String message) { try { - log.info("====mq message start===="); - log.info("text message : {}", message); + //log.info("====mq message start===="); + //log.info("text message : {}", message); if (!message.startsWith(StrUtil.DELIM_START) || !message.endsWith(StrUtil.DELIM_END)) { return; } diff --git a/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/Lucky24MessageConsumer.java b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/Lucky24MessageConsumer.java new file mode 100644 index 000000000..92b06552b --- /dev/null +++ b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/Lucky24MessageConsumer.java @@ -0,0 +1,29 @@ +package com.accompany.mq.consumer; + +import com.accompany.business.message.Lucky24Message; +import com.accompany.business.service.gift.Lucky24MessageService; +import com.accompany.mq.constant.MqConstant; +import com.accompany.mq.listener.AbstractMessageListener; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@ConditionalOnProperty(name = "spring.application.name", havingValue = "web") +@RocketMQMessageListener(topic = MqConstant.LUCKY_24_TOPIC, consumerGroup = MqConstant.LUCKY_24_CONSUME_GROUP, consumeMode = ConsumeMode.ORDERLY) +public class Lucky24MessageConsumer extends AbstractMessageListener { + + @Autowired + private Lucky24MessageService lucky24MessageService; + + @Override + public void onMessage(Lucky24Message giftMessage) { + log.info("onMessage lucky24Message: {}", giftMessage.toString()); + lucky24MessageService.handleGiftMessage(giftMessage); + } + +} diff --git a/accompany-scheduler/accompany-scheduler-service/src/main/java/com/accompany/scheduler/task/luckyBag/Lucky24Task.java b/accompany-scheduler/accompany-scheduler-service/src/main/java/com/accompany/scheduler/task/luckyBag/Lucky24Task.java index aff7202f4..eab55c516 100644 --- a/accompany-scheduler/accompany-scheduler-service/src/main/java/com/accompany/scheduler/task/luckyBag/Lucky24Task.java +++ b/accompany-scheduler/accompany-scheduler-service/src/main/java/com/accompany/scheduler/task/luckyBag/Lucky24Task.java @@ -1,9 +1,15 @@ package com.accompany.scheduler.task.luckyBag; +import com.accompany.business.message.GiftMessage; +import com.accompany.business.message.Lucky24Message; +import com.accompany.business.service.gift.Lucky24MessageService; import com.accompany.business.service.lucky.Lucky24RecordService; +import com.accompany.common.redis.RedisKey; import com.accompany.common.utils.DateTimeUtil; import com.accompany.core.model.PartitionInfo; +import com.accompany.core.service.common.JedisService; import com.accompany.core.service.partition.PartitionInfoService; +import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; @@ -15,6 +21,7 @@ import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.Date; import java.util.List; +import java.util.Map; import java.util.concurrent.ThreadPoolExecutor; @Component @@ -28,6 +35,39 @@ public class Lucky24Task { @Resource(name = "bizExecutor") private ThreadPoolExecutor bizExecutor; + @Autowired + private JedisService jedisService; + @Autowired + private Lucky24MessageService lucky24MessageService; + + /** + * 重新消费队列的消息 + */ + @Scheduled(cron = "0 */5 * * * ?") + public void retryLucky24Queue() { + log.info("retryLucky24Queue start ..."); + Map map = jedisService.hgetAll(RedisKey.lucky_24_status.getKey()); + if (map == null || map.size() == 0) { + return; + } + long curTime = System.currentTimeMillis(); + long gapTime = 1000 * 60 * 10; // 十分钟内没被消费 + + map.entrySet().parallelStream().forEach(entry -> { + try { + String messId = entry.getKey(); + String val = entry.getValue(); + Lucky24Message giftMessage = JSON.parseObject(val, Lucky24Message.class); + if (curTime - giftMessage.getCreateTime() > gapTime) { + lucky24MessageService.handleGiftMessage(giftMessage); + } + } catch (Exception e) { + log.error("retryLucky24Queue error", e); + } + }); + log.info("retryLucky24Queue end ..."); + } + @Scheduled(cron = "0 2 * * * ? ") public void lucky24RecordStat() { Date now = new Date();