送礼-gift mq改顺序
This commit is contained in:
@@ -182,22 +182,23 @@ public class GiftSendService extends BaseService {
|
||||
*
|
||||
* @param sendUid
|
||||
* @param giftId
|
||||
* @param goldNum
|
||||
* @param giftNum
|
||||
* @param giftSource
|
||||
* @return
|
||||
*/
|
||||
private UserPurse reduceStockV5(long sendUid, int partitionId, int giftId, Long goldNum, int giftNum, int giftSource, BillObjTypeEnum objTypeEnum) {
|
||||
private UserPurse reduceStockV5(long sendUid, int partitionId, int giftId,
|
||||
Long[] receiverList, Long everyGoldNum, Long totalGoldNum,
|
||||
int giftNum, int giftSource, BillObjTypeEnum objTypeEnum) {
|
||||
if (giftSource == Constant.GiftSource.COMMON || giftSource == Constant.GiftSource.ROOMPHOTO) {
|
||||
if (goldNum <= 0) {
|
||||
if (totalGoldNum <= 0) {
|
||||
throw new ServiceException(BusiStatus.PARAMETERILLEGAL);
|
||||
}
|
||||
if (giftSendConsumeGoldService.getGiftSendGoldConsumeSwitch(partitionId)){
|
||||
return giftSendConsumeGoldService.exchangeGoldToSend(sendUid, goldNum);
|
||||
return giftSendConsumeGoldService.exchangeGoldToSend(sendUid, totalGoldNum);
|
||||
} else {
|
||||
//账单等mq
|
||||
//因为外层有事务
|
||||
return userPurseService.subDiamondWithoutTx(sendUid, goldNum.doubleValue(), objTypeEnum, BusiStatus.PURSEMONEYNOTENOUGH, UserPurse::getDiamonds);
|
||||
return userPurseService.subDiamond(sendUid, totalGoldNum.doubleValue(), objTypeEnum, BusiStatus.PURSEMONEYNOTENOUGH, UserPurse::getDiamonds);
|
||||
}
|
||||
|
||||
} else if (giftSource == Constant.GiftSource.BACKPACK) {
|
||||
@@ -506,7 +507,8 @@ public class GiftSendService extends BaseService {
|
||||
int giftSource, byte sendType, Date sendGiftTime) {
|
||||
//扣
|
||||
BillObjTypeEnum objTypeEnum = null != room? BillObjTypeEnum.GIFT_ROOM_PAY: BillObjTypeEnum.GIFT_PERSON_PAY;
|
||||
UserPurse after = reduceStockV5(sender.getUid(), sender.getPartitionId(), gift.getGiftId(), totalGoldNum, totalGiftNum, giftSource, objTypeEnum);
|
||||
UserPurse after = reduceStockV5(sender.getUid(), sender.getPartitionId(), gift.getGiftId(),
|
||||
recvUids, everyGoldNum, totalGoldNum, totalGiftNum, giftSource, objTypeEnum);
|
||||
|
||||
//
|
||||
if (gift.getGiftType() == Constant.GiftType.LUCKY_BAG || gift.getGiftType() == Constant.GiftType.LUCKY_BAG_LINEAR) {
|
||||
@@ -521,7 +523,7 @@ public class GiftSendService extends BaseService {
|
||||
Map<String, String> caches = giftMessages.stream().collect(Collectors.toMap(GiftMessage::getMessId, JSON::toJSONString));
|
||||
jedisService.hwrite(RedisKey.mq_gift_status.getKey(), caches);
|
||||
|
||||
rocketMQService.sendBatchGiftMessage(caches.values());
|
||||
rocketMQService.sendBatchGiftMessage(giftMessages);
|
||||
}
|
||||
|
||||
private void sendMq(Users sender, Gift gift, Long[] recvUids, Room room, Integer everyGiftNum, Long everyGoldNum,
|
||||
|
@@ -47,11 +47,13 @@ public class RocketMQService {
|
||||
*
|
||||
* @param giftMessages
|
||||
*/
|
||||
public void sendBatchGiftMessage(Collection<String> giftMessages) {
|
||||
public void sendBatchGiftMessage(List<GiftMessage> giftMessages) {
|
||||
Long uid = giftMessages.get(0).getRecvUid();
|
||||
|
||||
List<Message<String>> messageList = giftMessages.stream()
|
||||
.map(giftMessage -> MessageBuilder.withPayload(giftMessage).build())
|
||||
.map(giftMessage -> MessageBuilder.withPayload(JSON.toJSONString(giftMessage)).build())
|
||||
.collect(Collectors.toList());
|
||||
SendResult sendResult = rocketMQTemplate.syncSend(MqConstant.GIFT_TOPIC, messageList);
|
||||
SendResult sendResult = rocketMQTemplate.syncSendOrderly(MqConstant.GIFT_TOPIC, messageList, uid.toString());
|
||||
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())){
|
||||
log.info("sendGiftMessage success result: {} message: {}", JSON.toJSONString(sendResult), messageList);
|
||||
} else {
|
||||
|
@@ -147,12 +147,6 @@ public class UserPurseService extends ServiceImpl<UserPurseMapper,UserPurse> {
|
||||
return SpringContextHolder.getBean(UserPurseService.class).subDiamond(uid, diamondNum, objTypeEnum, BusiStatus.PURSEMONEYNOTENOUGH, billConsumer);
|
||||
}
|
||||
|
||||
@Frozen
|
||||
//@Transactional(rollbackFor = Exception.class, transactionManager = "mybatisplusTransactionManager")
|
||||
public UserPurse subDiamondWithoutTx(Long uid, Double diamondNum, BillObjTypeEnum objTypeEnum, BusiStatus busiStatus, Consumer<UserPurse> billConsumer) {
|
||||
return subDiamond(uid, diamondNum, objTypeEnum, busiStatus, billConsumer);
|
||||
}
|
||||
|
||||
@Frozen
|
||||
@Transactional(rollbackFor = Exception.class, transactionManager = "mybatisplusTransactionManager")
|
||||
public UserPurse subDiamond(Long uid, Double diamondNum, BillObjTypeEnum objTypeEnum, BusiStatus busiStatus, Consumer<UserPurse> billConsumer) {
|
||||
|
@@ -5,6 +5,7 @@ import com.accompany.business.service.gift.GiftMessageService;
|
||||
import com.accompany.mq.constant.MqConstant;
|
||||
import com.accompany.mq.listener.AbstractMessageListener;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
@@ -13,7 +14,8 @@ import org.springframework.stereotype.Component;
|
||||
@Slf4j
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "spring.application.name", havingValue = "web")
|
||||
@RocketMQMessageListener(topic = MqConstant.GIFT_TOPIC, consumerGroup = MqConstant.GIFT_CONSUME_GROUP)
|
||||
//@RocketMQMessageListener(topic = MqConstant.GIFT_TOPIC, consumerGroup = MqConstant.GIFT_CONSUME_GROUP)
|
||||
@RocketMQMessageListener(topic = MqConstant.GIFT_TOPIC, consumerGroup = MqConstant.GIFT_CONSUME_GROUP, consumeMode = ConsumeMode.ORDERLY)
|
||||
public class GiftMessageConsumer extends AbstractMessageListener<GiftMessage> {
|
||||
|
||||
@Autowired
|
||||
|
Reference in New Issue
Block a user