diff --git a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/GiftSendService.java b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/GiftSendService.java index 36afd1088..4aca72fdc 100644 --- a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/GiftSendService.java +++ b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/GiftSendService.java @@ -182,22 +182,23 @@ public class GiftSendService extends BaseService { * * @param sendUid * @param giftId - * @param goldNum * @param giftNum * @param giftSource * @return */ - private UserPurse reduceStockV5(long sendUid, int partitionId, int giftId, Long goldNum, int giftNum, int giftSource, BillObjTypeEnum objTypeEnum) { + private UserPurse reduceStockV5(long sendUid, int partitionId, int giftId, + Long[] receiverList, Long everyGoldNum, Long totalGoldNum, + int giftNum, int giftSource, BillObjTypeEnum objTypeEnum) { if (giftSource == Constant.GiftSource.COMMON || giftSource == Constant.GiftSource.ROOMPHOTO) { - if (goldNum <= 0) { + if (totalGoldNum <= 0) { throw new ServiceException(BusiStatus.PARAMETERILLEGAL); } if (giftSendConsumeGoldService.getGiftSendGoldConsumeSwitch(partitionId)){ - return giftSendConsumeGoldService.exchangeGoldToSend(sendUid, goldNum); + return giftSendConsumeGoldService.exchangeGoldToSend(sendUid, totalGoldNum); } else { //账单等mq //因为外层有事务 - return userPurseService.subDiamondWithoutTx(sendUid, goldNum.doubleValue(), objTypeEnum, BusiStatus.PURSEMONEYNOTENOUGH, UserPurse::getDiamonds); + return userPurseService.subDiamond(sendUid, totalGoldNum.doubleValue(), objTypeEnum, BusiStatus.PURSEMONEYNOTENOUGH, UserPurse::getDiamonds); } } else if (giftSource == Constant.GiftSource.BACKPACK) { @@ -506,7 +507,8 @@ public class GiftSendService extends BaseService { int giftSource, byte sendType, Date sendGiftTime) { //扣 BillObjTypeEnum objTypeEnum = null != room? BillObjTypeEnum.GIFT_ROOM_PAY: BillObjTypeEnum.GIFT_PERSON_PAY; - UserPurse after = reduceStockV5(sender.getUid(), sender.getPartitionId(), gift.getGiftId(), totalGoldNum, totalGiftNum, giftSource, objTypeEnum); + UserPurse after = reduceStockV5(sender.getUid(), sender.getPartitionId(), gift.getGiftId(), + recvUids, everyGoldNum, totalGoldNum, totalGiftNum, giftSource, objTypeEnum); // if (gift.getGiftType() == Constant.GiftType.LUCKY_BAG || gift.getGiftType() == Constant.GiftType.LUCKY_BAG_LINEAR) { @@ -521,7 +523,7 @@ public class GiftSendService extends BaseService { Map caches = giftMessages.stream().collect(Collectors.toMap(GiftMessage::getMessId, JSON::toJSONString)); jedisService.hwrite(RedisKey.mq_gift_status.getKey(), caches); - rocketMQService.sendBatchGiftMessage(caches.values()); + rocketMQService.sendBatchGiftMessage(giftMessages); } private void sendMq(Users sender, Gift gift, Long[] recvUids, Room room, Integer everyGiftNum, Long everyGoldNum, diff --git a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java index 391ca917c..0317a24a5 100644 --- a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java +++ b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java @@ -47,11 +47,13 @@ public class RocketMQService { * * @param giftMessages */ - public void sendBatchGiftMessage(Collection giftMessages) { + public void sendBatchGiftMessage(List giftMessages) { + Long uid = giftMessages.get(0).getRecvUid(); + List> messageList = giftMessages.stream() - .map(giftMessage -> MessageBuilder.withPayload(giftMessage).build()) + .map(giftMessage -> MessageBuilder.withPayload(JSON.toJSONString(giftMessage)).build()) .collect(Collectors.toList()); - SendResult sendResult = rocketMQTemplate.syncSend(MqConstant.GIFT_TOPIC, messageList); + SendResult sendResult = rocketMQTemplate.syncSendOrderly(MqConstant.GIFT_TOPIC, messageList, uid.toString()); if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())){ log.info("sendGiftMessage success result: {} message: {}", JSON.toJSONString(sendResult), messageList); } else { diff --git a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/purse/UserPurseService.java b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/purse/UserPurseService.java index 7388a6f69..61eca5fd5 100644 --- a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/purse/UserPurseService.java +++ b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/purse/UserPurseService.java @@ -147,12 +147,6 @@ public class UserPurseService extends ServiceImpl { return SpringContextHolder.getBean(UserPurseService.class).subDiamond(uid, diamondNum, objTypeEnum, BusiStatus.PURSEMONEYNOTENOUGH, billConsumer); } - @Frozen - //@Transactional(rollbackFor = Exception.class, transactionManager = "mybatisplusTransactionManager") - public UserPurse subDiamondWithoutTx(Long uid, Double diamondNum, BillObjTypeEnum objTypeEnum, BusiStatus busiStatus, Consumer billConsumer) { - return subDiamond(uid, diamondNum, objTypeEnum, busiStatus, billConsumer); - } - @Frozen @Transactional(rollbackFor = Exception.class, transactionManager = "mybatisplusTransactionManager") public UserPurse subDiamond(Long uid, Double diamondNum, BillObjTypeEnum objTypeEnum, BusiStatus busiStatus, Consumer billConsumer) { diff --git a/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/GiftMessageConsumer.java b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/GiftMessageConsumer.java index 9c8069332..5ca27f732 100644 --- a/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/GiftMessageConsumer.java +++ b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/GiftMessageConsumer.java @@ -5,6 +5,7 @@ import com.accompany.business.service.gift.GiftMessageService; import com.accompany.mq.constant.MqConstant; import com.accompany.mq.listener.AbstractMessageListener; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -13,7 +14,8 @@ import org.springframework.stereotype.Component; @Slf4j @Component @ConditionalOnProperty(name = "spring.application.name", havingValue = "web") -@RocketMQMessageListener(topic = MqConstant.GIFT_TOPIC, consumerGroup = MqConstant.GIFT_CONSUME_GROUP) +//@RocketMQMessageListener(topic = MqConstant.GIFT_TOPIC, consumerGroup = MqConstant.GIFT_CONSUME_GROUP) +@RocketMQMessageListener(topic = MqConstant.GIFT_TOPIC, consumerGroup = MqConstant.GIFT_CONSUME_GROUP, consumeMode = ConsumeMode.ORDERLY) public class GiftMessageConsumer extends AbstractMessageListener { @Autowired